1use super::{state, ContainerMetrics, ContainerState, DockerContainerMachine, StartResult};
2use anyhow::{Context, Result};
3use bollard::container::{
4 Config, CreateContainerOptions, LogsOptions, StartContainerOptions, WaitContainerOptions,
5};
6use bollard::image::CreateImageOptions;
7use bollard::service::{HostConfig, Mount, MountTypeEnum};
8use bollard::volume::CreateVolumeOptions;
9use chrono::Utc;
10use futures::StreamExt;
11use serde_json::Value;
12use std::collections::HashMap;
13use std::path::PathBuf;
14use std::time::Duration;
15use stormchaser_model::dsl::CommonContainerSpec;
16use tokio::time::sleep;
17use tracing::{error, info};
18use uuid::Uuid;
19
20impl DockerContainerMachine<state::Initialized> {
21 pub fn adopt(self, container_name: String) -> DockerContainerMachine<state::Running> {
23 info!("Adopting orphaned Docker container {}", container_name);
24 DockerContainerMachine {
25 nats: self.nats.clone(),
26 docker: self.docker,
27 metadata: self.metadata,
28 state: state::Running {
29 container_name,
30 dispatched_at: Utc::now(),
31 volumes_to_cleanup: Vec::new(),
32 storage_names: Vec::new(),
33 mounts: Vec::new(),
34 },
35 }
36 }
37
38 pub async fn clean_up(self, container_name: &str) -> Result<()> {
40 info!("Cleaning up orphaned Docker container {}", container_name);
41 let _ = self.docker.stop_container(container_name, None).await;
42 let _ = self.docker.remove_container(container_name, None).await;
43 Ok(())
44 }
45
46 pub async fn start(self) -> Result<StartResult> {
48 let container_name = format!(
49 "storm-{}-{}",
50 self.metadata.step_dsl.name.to_lowercase().replace('_', "-"),
51 &self.metadata.step_id.to_string()[..8]
52 );
53
54 let spec: CommonContainerSpec = serde_json::from_value(self.metadata.step_dsl.spec.clone())
55 .context("Failed to parse RunContainer spec as CommonContainerSpec")?;
56
57 let mut mounts = Vec::new();
58 let mut storage_names = Vec::new();
59 let mut volumes_to_cleanup = Vec::new();
60
61 if let Some(storage_mounts) = &spec.storage_mounts {
62 for mount in storage_mounts {
63 storage_names.push(mount.name.clone());
64 let volume_name = format!(
65 "sfs-{}-{}",
66 mount.name.to_lowercase().replace('_', "-"),
67 &self.metadata.run_id.to_string()[..8]
68 );
69
70 info!("Ensuring Docker volume exists: {}", volume_name);
71 self.docker
72 .create_volume(CreateVolumeOptions {
73 name: volume_name.clone(),
74 ..Default::default()
75 })
76 .await?;
77
78 volumes_to_cleanup.push(volume_name.clone());
79
80 mounts.push(Mount {
81 target: Some(mount.mount_path.clone()),
82 source: Some(volume_name.clone()),
83 typ: Some(MountTypeEnum::VOLUME),
84 read_only: mount.read_only,
85 ..Default::default()
86 });
87
88 if let Some(storage_data) = &self.metadata.storage {
89 if let Some(urls) = storage_data.get(&mount.name) {
90 let has_state =
91 urls.get("expected_hash").and_then(|h| h.as_str()).is_some();
92
93 if has_state {
94 if let Some(get_url) = urls.get("get_url").and_then(|u| u.as_str()) {
95 info!(
96 "Unparking storage '{}' (resume state) for volume '{}'",
97 mount.name, volume_name
98 );
99 if let Some(nats) = &self.nats {
100 let unpacking_event = serde_json::json!({
101 "run_id": self.metadata.run_id,
102 "step_id": self.metadata.step_id,
103 "status": "unpacking_sfs",
104 "timestamp": chrono::Utc::now(),
105 });
106 let _ = nats
107 .publish(
108 "stormchaser.step.unpacking_sfs",
109 unpacking_event.to_string().into(),
110 )
111 .await;
112 }
113 self.unpark_storage(&volume_name, &mount.mount_path, get_url)
114 .await?;
115 }
116 } else if let Some(provision) =
117 urls.get("provision").and_then(|p| p.as_array())
118 {
119 for prov in provision {
120 if let (Some(url), Some(dest)) = (
121 prov.get("url").and_then(|u| u.as_str()),
122 prov.get("destination").and_then(|d| d.as_str()),
123 ) {
124 info!(
125 "Provisioning storage '{}' from URL '{}' into destination '{}'",
126 mount.name, url, dest
127 );
128 if let Some(nats) = &self.nats {
129 let unpacking_event = serde_json::json!({
130 "run_id": self.metadata.run_id,
131 "step_id": self.metadata.step_id,
132 "status": "unpacking_sfs",
133 "timestamp": chrono::Utc::now(),
134 });
135 let _ = nats
136 .publish(
137 "stormchaser.step.unpacking_sfs",
138 unpacking_event.to_string().into(),
139 )
140 .await;
141 }
142 let mut full_dest = PathBuf::from(&mount.mount_path);
143 if dest != "/" && !dest.is_empty() {
144 let relative_dest =
145 dest.trim_start_matches('/').replace('/', "");
146 full_dest.push(relative_dest);
147 }
148
149 self.unpark_storage(
150 &volume_name,
151 full_dest.to_str().unwrap_or(&mount.mount_path),
152 url,
153 )
154 .await?;
155 }
156 }
157 }
158 }
159 }
160 }
161 }
162
163 info!("Pulling image {}", spec.image);
164 self.pull_image(&spec.image).await?;
165
166 let network_mode = self.get_network_mode().await;
167 let config =
168 self.build_container_config(&spec, mounts.clone(), network_mode, &storage_names)?;
169
170 info!("Creating container {}", container_name);
171 let dispatched_at = Utc::now();
172 self.docker
173 .create_container(
174 Some(CreateContainerOptions {
175 name: container_name.clone(),
176 ..Default::default()
177 }),
178 config,
179 )
180 .await?;
181
182 info!("Starting container {}", container_name);
183 self.docker
184 .start_container(&container_name, None::<StartContainerOptions<String>>)
185 .await?;
186
187 if let Some(nats) = &self.nats {
188 let running_event = serde_json::json!({
189 "run_id": self.metadata.run_id,
190 "step_id": self.metadata.step_id,
191 "status": "running",
192 "timestamp": chrono::Utc::now(),
193 });
194 let _ = nats
195 .publish("stormchaser.step.running", running_event.to_string().into())
196 .await;
197 }
198
199 Ok(StartResult::Running(DockerContainerMachine {
200 nats: self.nats.clone(),
201 docker: self.docker,
202 metadata: self.metadata,
203 state: state::Running {
204 container_name,
205 dispatched_at,
206 volumes_to_cleanup,
207 storage_names,
208 mounts,
209 },
210 }))
211 }
212
213 async fn unpark_storage(
214 &self,
215 volume_name: &str,
216 mount_path: &str,
217 get_url: &str,
218 ) -> Result<()> {
219 let agent_image = "stormchaser-agent:v1";
220 let unpark_container_name = format!("unpark-{}", Uuid::new_v4());
221 let config = Config {
222 image: Some(agent_image.to_string()),
223 cmd: Some(vec![
224 "/usr/local/bin/stormchaser-agent".to_string(),
225 "unpark".to_string(),
226 "--url".to_string(),
227 get_url.to_string(),
228 "--destination".to_string(),
229 mount_path.to_string(),
230 ]),
231 host_config: Some(HostConfig {
232 mounts: Some(vec![Mount {
233 target: Some(mount_path.to_string()),
234 source: Some(volume_name.to_string()),
235 typ: Some(MountTypeEnum::VOLUME),
236 ..Default::default()
237 }]),
238 network_mode: self.get_network_mode().await,
239 ..Default::default()
240 }),
241 ..Default::default()
242 };
243
244 self.docker
245 .create_container(
246 Some(CreateContainerOptions {
247 name: unpark_container_name.clone(),
248 ..Default::default()
249 }),
250 config,
251 )
252 .await?;
253
254 self.docker
255 .start_container(
256 &unpark_container_name,
257 None::<StartContainerOptions<String>>,
258 )
259 .await?;
260
261 let mut wait_stream = self.docker.wait_container(
262 &unpark_container_name,
263 Some(WaitContainerOptions {
264 condition: "not-running",
265 }),
266 );
267
268 if let Some(wait_result) = wait_stream.next().await {
269 match wait_result {
270 Ok(res) if res.status_code == 0 => {
271 info!("Unpark successful for {}", volume_name);
272 sleep(Duration::from_secs(15)).await;
274 let _ = self
275 .docker
276 .remove_container(&unpark_container_name, None)
277 .await;
278 Ok(())
279 }
280 res => {
281 error!("Unpark failed for {}: {:?}", volume_name, res);
282 Err(anyhow::anyhow!(
283 "Unpark failed for {}: {:?}",
284 volume_name,
285 res
286 ))
287 }
288 }
289 } else {
290 Err(anyhow::anyhow!("Wait stream ended unexpectedly"))
291 }
292 }
293
294 async fn pull_image(&self, image: &str) -> Result<()> {
295 let mut pull_stream = self.docker.create_image(
296 Some(CreateImageOptions {
297 from_image: image.to_string(),
298 ..Default::default()
299 }),
300 None,
301 None,
302 );
303 while let Some(pull_result) = pull_stream.next().await {
304 if let Err(e) = pull_result {
305 error!("Error pulling image {}: {:?}", image, e);
306 anyhow::bail!("Failed to pull image {}: {}", image, e);
307 }
308 }
309 Ok(())
310 }
311}
312
313impl DockerContainerMachine<state::Running> {
314 pub async fn wait(self) -> Result<DockerContainerMachine<state::Finished>> {
316 let container_name = self.state.container_name.clone();
317 let dispatched_at = self.state.dispatched_at;
318 let volumes_to_cleanup = self.state.volumes_to_cleanup.clone();
319 let storage_names = self.state.storage_names.clone();
320 let mounts = self.state.mounts.clone();
321
322 let mut wait_stream = self.docker.wait_container(
323 &container_name,
324 Some(WaitContainerOptions {
325 condition: "not-running",
326 }),
327 );
328
329 let exit_code = if let Some(wait_result) = wait_stream.next().await {
330 match wait_result {
331 Ok(response) => Some(response.status_code),
332 Err(bollard::errors::Error::DockerContainerWaitError { error: _, code }) => {
333 Some(code)
334 }
335 Err(e) => {
336 error!("Error waiting for container {}: {:?}", container_name, e);
337 None
338 }
339 }
340 } else {
341 None
342 };
343
344 let latency_ms = (dispatched_at - self.metadata.received_at)
345 .num_milliseconds()
346 .max(0) as u64;
347 let duration_ms = (Utc::now() - dispatched_at).num_milliseconds().max(0) as u64;
348
349 let mut metrics = ContainerMetrics {
350 exit_code,
351 duration_ms,
352 latency_ms,
353 ..Default::default()
354 };
355
356 let spec: Option<CommonContainerSpec> =
358 serde_json::from_value(self.metadata.step_dsl.spec.clone()).ok();
359
360 if !storage_names.is_empty() || !self.metadata.step_dsl.reports.is_empty() {
361 let agent_image = "stormchaser-agent:v1";
362 let park_container_name = format!("park-{}", Uuid::new_v4());
363
364 let mut parking_urls = HashMap::new();
365 let mut mount_paths = HashMap::new();
366 let mut artifact_urls = HashMap::new();
367
368 if let Some(storage_data) = &self.metadata.storage {
369 for name in &storage_names {
370 if let Some(urls) = storage_data.get(name) {
371 parking_urls.insert(name.clone(), urls.clone());
372 if let Some(artifacts) = urls.get("artifacts").and_then(|a| a.as_object()) {
373 let allowed_artifacts = self.metadata.step_dsl.artifacts.as_ref();
374 for (art_name, art_val) in artifacts {
375 if let Some(allowed) = allowed_artifacts {
376 if !allowed.contains(art_name) {
377 continue;
378 }
379 } else {
380 continue;
382 }
383
384 let mut cloned_art = art_val.clone();
385 if let Some(m) = spec
386 .as_ref()
387 .and_then(|s| s.storage_mounts.as_ref())
388 .and_then(|ms| ms.iter().find(|m| &m.name == name))
389 {
390 if let Some(p) = art_val.get("path").and_then(|p| p.as_str()) {
391 let abs_path = std::path::Path::new(&m.mount_path).join(p);
392 cloned_art["path"] =
393 Value::String(abs_path.to_string_lossy().to_string());
394 }
395 }
396 artifact_urls.insert(art_name.clone(), cloned_art);
397 }
398 }
399 }
400 if let Some(m) = spec
401 .as_ref()
402 .and_then(|s| s.storage_mounts.as_ref())
403 .and_then(|ms| ms.iter().find(|m| &m.name == name))
404 {
405 mount_paths.insert(name.clone(), m.mount_path.clone());
406 }
407 }
408 }
409
410 let mut agent_args = vec![
411 "run".to_string(),
412 "--parking-urls".to_string(),
413 serde_json::to_string(&parking_urls)?,
414 "--mount-paths".to_string(),
415 serde_json::to_string(&mount_paths)?,
416 ];
417
418 if !artifact_urls.is_empty() {
419 agent_args.push("--artifact-urls".to_string());
420 agent_args.push(serde_json::to_string(&artifact_urls)?);
421 }
422
423 if let Some(reports) = &self.metadata.test_report_urls {
424 if !reports.is_empty() {
425 agent_args.push("--report-urls".to_string());
426 agent_args.push(serde_json::to_string(reports)?);
427 }
428 }
429
430 if !self.metadata.step_dsl.reports.is_empty() {
431 agent_args.push("--test-reports".to_string());
432 agent_args.push(serde_json::to_string(&self.metadata.step_dsl.reports)?);
433 }
434
435 if exit_code != Some(0) {
436 }
439
440 agent_args.push("--".to_string());
441 agent_args.push("/bin/true".to_string());
442
443 let agent_config = Config {
444 image: Some(agent_image.to_string()),
445 cmd: Some(agent_args),
446 entrypoint: Some(vec!["/usr/local/bin/stormchaser-agent".to_string()]),
447 host_config: Some(HostConfig {
448 mounts: Some(mounts),
449 network_mode: self.get_network_mode().await,
450 ..Default::default()
451 }),
452 ..Default::default()
453 };
454
455 info!("Running agent for parking/reports: {}", park_container_name);
456 if let Some(nats) = &self.nats {
457 let packing_event = serde_json::json!({
458 "run_id": self.metadata.run_id,
459 "step_id": self.metadata.step_id,
460 "status": "packing_sfs",
461 "timestamp": chrono::Utc::now(),
462 });
463 let _ = nats
464 .publish(
465 "stormchaser.step.packing_sfs",
466 packing_event.to_string().into(),
467 )
468 .await;
469 }
470
471 self.docker
472 .create_container(
473 Some(CreateContainerOptions {
474 name: park_container_name.clone(),
475 ..Default::default()
476 }),
477 agent_config,
478 )
479 .await?;
480
481 self.docker
482 .start_container(&park_container_name, None::<StartContainerOptions<String>>)
483 .await?;
484
485 let mut agent_wait_stream = self.docker.wait_container(
486 &park_container_name,
487 Some(WaitContainerOptions {
488 condition: "not-running",
489 }),
490 );
491
492 let _ = agent_wait_stream.next().await;
493
494 if let Ok(Some(artifacts)) = self.get_artifact_meta(&park_container_name).await {
495 metrics.artifacts = Some(artifacts);
496 }
497 if let Ok(Some(hashes)) = self.get_storage_hashes(&park_container_name).await {
498 metrics.storage_hashes = Some(hashes);
499 }
500 if let Ok(Some(reports)) = self.get_test_reports(&park_container_name).await {
501 metrics.test_reports = Some(reports);
502 }
503
504 sleep(Duration::from_secs(15)).await;
506 let _ = self
507 .docker
508 .remove_container(&park_container_name, None)
509 .await;
510 } else {
511 if let Ok(Some(artifacts)) = self.get_artifact_meta(&container_name).await {
513 metrics.artifacts = Some(artifacts);
514 }
515 if let Ok(Some(hashes)) = self.get_storage_hashes(&container_name).await {
516 metrics.storage_hashes = Some(hashes);
517 }
518 if let Ok(Some(reports)) = self.get_test_reports(&container_name).await {
519 metrics.test_reports = Some(reports);
520 }
521 }
522
523 sleep(Duration::from_secs(15)).await;
526 let _ = self.docker.remove_container(&container_name, None).await;
527
528 for vol in volumes_to_cleanup {
529 info!("Cleaning up volume: {}", vol);
530 let _ = self.docker.remove_volume(&vol, None).await;
531 }
532
533 let result = if exit_code == Some(0) {
534 info!("Container {} completed successfully", container_name);
535 ContainerState::Succeeded(metrics)
536 } else {
537 let error_msg = format!("Container exited with code {:?}", exit_code);
538 error!("Container {} failed: {}", container_name, error_msg);
539 ContainerState::Failed(error_msg, metrics)
540 };
541
542 Ok(DockerContainerMachine {
543 nats: self.nats.clone(),
544 docker: self.docker,
545 metadata: self.metadata,
546 state: state::Finished { result },
547 })
548 }
549
550 async fn get_artifact_meta(
551 &self,
552 container_name: &str,
553 ) -> Result<Option<HashMap<String, Value>>> {
554 let mut logs = self.docker.logs(
555 container_name,
556 Some(LogsOptions::<String> {
557 stdout: true,
558 stderr: true,
559 ..Default::default()
560 }),
561 );
562
563 while let Some(log_result) = logs.next().await {
564 if let Ok(output) = log_result {
565 let line = output.to_string();
566 if line.contains("Parked artifacts:") {
567 if let Some(json_start) = line.find('{') {
568 let json_part = &line[json_start..];
569 if let Ok(meta) = serde_json::from_str(json_part) {
570 return Ok(Some(meta));
571 }
572 }
573 }
574 }
575 }
576
577 Ok(None)
578 }
579
580 async fn get_storage_hashes(
581 &self,
582 container_name: &str,
583 ) -> Result<Option<HashMap<String, String>>> {
584 let mut logs = self.docker.logs(
585 container_name,
586 Some(LogsOptions::<String> {
587 stdout: true,
588 stderr: true,
589 ..Default::default()
590 }),
591 );
592
593 while let Some(log_result) = logs.next().await {
594 if let Ok(output) = log_result {
595 let line = output.to_string();
596 if line.contains("Parked storage hashes:") {
597 if let Some(json_start) = line.find('{') {
598 let json_part = &line[json_start..];
599 if let Ok(hashes) = serde_json::from_str(json_part) {
600 return Ok(Some(hashes));
601 }
602 }
603 }
604 }
605 }
606
607 Ok(None)
608 }
609
610 async fn get_test_reports(&self, container_name: &str) -> Result<Option<Value>> {
611 let mut logs = self.docker.logs(
612 container_name,
613 Some(LogsOptions::<String> {
614 stdout: true,
615 stderr: true,
616 ..Default::default()
617 }),
618 );
619
620 while let Some(log_result) = logs.next().await {
621 if let Ok(output) = log_result {
622 let line = output.to_string();
623 if line.contains("Collected test reports JSON:") {
624 if let Some(json_start) = line.find('{') {
625 let json_part = &line[json_start..];
626 if let Ok(reports) = serde_json::from_str(json_part) {
627 return Ok(Some(reports));
628 }
629 }
630 }
631 }
632 }
633
634 Ok(None)
635 }
636}
637
638impl DockerContainerMachine<state::Finished> {
639 pub fn into_result(self) -> ContainerState {
641 self.state.result
642 }
643}
644
#[cfg(test)]
mod tests {
    use super::*;
    use crate::container_machine::ContainerMetadata;
    use bollard::Docker;
    use stormchaser_model::dsl::Step;
    use uuid::Uuid;

    /// Image used by every container these tests create.
    const TEST_IMAGE: &str = "alpine:latest";

    /// Builds a Docker client from the local default connection settings.
    async fn get_docker() -> Docker {
        Docker::connect_with_local_defaults().unwrap()
    }

    /// Builds step metadata for a `RunContainer` step running `cmd` in `image`, with every
    /// optional spec field set to null and no storage or report configuration.
    fn create_test_metadata(image: &str, cmd: Vec<&str>) -> ContainerMetadata {
        let step_dsl = Step {
            name: "test_step".to_string(),
            r#type: "RunContainer".to_string(),
            spec: serde_json::json!({
                "image": image,
                "command": cmd,
                "cpu_limit": null,
                "memory_limit": null,
                "env": null,
                "ports": null,
                "storage_mounts": null
            }),
            condition: None,
            params: HashMap::new(),
            strategy: None,
            aggregation: vec![],
            iterate: None,
            iterate_as: None,
            steps: None,
            next: vec![],
            on_failure: None,
            retry: None,
            timeout: None,
            allow_failure: None,
            start_marker: None,
            end_marker: None,
            outputs: vec![],
            reports: vec![],
            artifacts: None,
        };

        ContainerMetadata {
            run_id: Uuid::new_v4(),
            step_id: Uuid::new_v4(),
            step_dsl,
            received_at: chrono::Utc::now(),
            encryption_key: None,
            storage: None,
            test_report_urls: None,
        }
    }

    /// Starts a container running `cmd`, waits for it to finish, and returns the container
    /// name together with the final state.
    async fn run_to_completion(docker: &Docker, cmd: Vec<&str>) -> (String, ContainerState) {
        let metadata = create_test_metadata(TEST_IMAGE, cmd);
        let machine = DockerContainerMachine::new(docker.clone(), metadata, None);

        let running_machine = match machine.start().await.expect("Failed to start container") {
            StartResult::Running(m) => m,
            StartResult::Failed(_) => panic!("Container failed to start"),
        };
        let container_name = running_machine.state.container_name.clone();

        let finished_machine = running_machine
            .wait()
            .await
            .expect("Failed to wait on container");

        (container_name, finished_machine.into_result())
    }

    /// Asserts that inspecting `container_name` fails, i.e. the container no longer exists.
    async fn assert_container_absent(docker: &Docker, container_name: &str, message: &str) {
        let inspect = docker.inspect_container(container_name, None).await;
        assert!(inspect.is_err(), "{}", message);
    }

    #[tokio::test]
    async fn test_container_lifecycle_success() {
        let docker = get_docker().await;
        let (container_name, state) = run_to_completion(&docker, vec!["echo", "hello"]).await;

        match state {
            ContainerState::Succeeded(metrics) => {
                assert_eq!(metrics.exit_code, Some(0));
            }
            ContainerState::Failed(msg, _) => panic!("Container failed: {}", msg),
        }

        assert_container_absent(&docker, &container_name, "Container should be removed").await;
    }

    #[tokio::test]
    async fn test_container_lifecycle_failure() {
        let docker = get_docker().await;
        let (container_name, state) = run_to_completion(&docker, vec!["false"]).await;

        match state {
            ContainerState::Failed(msg, metrics) => {
                assert!(
                    msg.contains("Some(1)"),
                    "Unexpected failure message: {}",
                    msg
                );
                assert_eq!(metrics.exit_code, Some(1));
            }
            ContainerState::Succeeded(_) => panic!("Container should have failed"),
        }

        assert_container_absent(&docker, &container_name, "Container should be removed").await;
    }

    #[tokio::test]
    async fn test_adopt_container() {
        let docker = get_docker().await;
        let metadata = create_test_metadata(TEST_IMAGE, vec!["echo", "hello"]);

        let running_machine = DockerContainerMachine::new(docker.clone(), metadata, None)
            .adopt("my-adopted-container".to_string());

        assert_eq!(running_machine.state.container_name, "my-adopted-container");
        assert!(running_machine.state.volumes_to_cleanup.is_empty());
    }

    #[tokio::test]
    async fn test_clean_up_orphaned_container() {
        use futures::StreamExt;

        let docker = get_docker().await;
        let container_name = format!("test-cleanup-{}", Uuid::new_v4());

        // Drain the pull stream so the image is present; pull errors are ignored here.
        let pull_options = bollard::image::CreateImageOptions {
            from_image: TEST_IMAGE,
            ..Default::default()
        };
        let _ = docker
            .create_image(Some(pull_options), None, None)
            .collect::<Vec<_>>()
            .await;

        // The container is created but never started.
        let create_options = bollard::container::CreateContainerOptions {
            name: container_name.as_str(),
            ..Default::default()
        };
        let container_config = bollard::container::Config {
            image: Some(TEST_IMAGE.to_string()),
            cmd: Some(vec!["sleep".to_string(), "1000".to_string()]),
            ..Default::default()
        };
        let _ = docker
            .create_container(Some(create_options), container_config)
            .await
            .unwrap();

        let metadata = create_test_metadata(TEST_IMAGE, vec!["sleep", "1000"]);
        let machine = DockerContainerMachine::new(docker.clone(), metadata, None);

        machine.clean_up(&container_name).await.unwrap();

        assert_container_absent(&docker, &container_name, "Container should be cleaned up").await;
    }

    #[tokio::test]
    async fn test_into_result() {
        let docker = get_docker().await;
        let metadata = create_test_metadata(TEST_IMAGE, vec!["echo", "hello"]);
        let machine = DockerContainerMachine {
            nats: None,
            docker,
            metadata,
            state: state::Finished {
                result: ContainerState::Succeeded(ContainerMetrics::default()),
            },
        };

        assert!(
            matches!(machine.into_result(), ContainerState::Succeeded(_)),
            "Expected Succeeded"
        );
    }
}