Skip to main content

stormchaser_runner_docker/container_machine/
transitions.rs

1use super::{state, ContainerMetrics, ContainerState, DockerContainerMachine, StartResult};
2use anyhow::{Context, Result};
3use bollard::container::{
4    Config, CreateContainerOptions, LogsOptions, StartContainerOptions, WaitContainerOptions,
5};
6use bollard::image::CreateImageOptions;
7use bollard::service::{HostConfig, Mount, MountTypeEnum};
8use bollard::volume::CreateVolumeOptions;
9use chrono::Utc;
10use futures::StreamExt;
11use serde_json::Value;
12use std::collections::HashMap;
13use std::path::PathBuf;
14use std::time::Duration;
15use stormchaser_model::dsl::CommonContainerSpec;
16use tokio::time::sleep;
17use tracing::{error, info};
18use uuid::Uuid;
19
20impl DockerContainerMachine<state::Initialized> {
21    /// Adopts an already-running container by name, transitioning the state machine to `Running`.
22    pub fn adopt(self, container_name: String) -> DockerContainerMachine<state::Running> {
23        info!("Adopting orphaned Docker container {}", container_name);
24        DockerContainerMachine {
25            nats: self.nats.clone(),
26            docker: self.docker,
27            metadata: self.metadata,
28            state: state::Running {
29                container_name,
30                dispatched_at: Utc::now(),
31                volumes_to_cleanup: Vec::new(),
32                storage_names: Vec::new(),
33                mounts: Vec::new(),
34            },
35        }
36    }
37
38    /// Cleans up an orphaned container without running it through the state machine.
39    pub async fn clean_up(self, container_name: &str) -> Result<()> {
40        info!("Cleaning up orphaned Docker container {}", container_name);
41        let _ = self.docker.stop_container(container_name, None).await;
42        let _ = self.docker.remove_container(container_name, None).await;
43        Ok(())
44    }
45
46    /// Starts a new Docker container, pulling images and unparking storage as necessary.
47    pub async fn start(self) -> Result<StartResult> {
48        let container_name = format!(
49            "storm-{}-{}",
50            self.metadata.step_dsl.name.to_lowercase().replace('_', "-"),
51            &self.metadata.step_id.to_string()[..8]
52        );
53
54        let spec: CommonContainerSpec = serde_json::from_value(self.metadata.step_dsl.spec.clone())
55            .context("Failed to parse RunContainer spec as CommonContainerSpec")?;
56
57        let mut mounts = Vec::new();
58        let mut storage_names = Vec::new();
59        let mut volumes_to_cleanup = Vec::new();
60
61        if let Some(storage_mounts) = &spec.storage_mounts {
62            for mount in storage_mounts {
63                storage_names.push(mount.name.clone());
64                let volume_name = format!(
65                    "sfs-{}-{}",
66                    mount.name.to_lowercase().replace('_', "-"),
67                    &self.metadata.run_id.to_string()[..8]
68                );
69
70                info!("Ensuring Docker volume exists: {}", volume_name);
71                self.docker
72                    .create_volume(CreateVolumeOptions {
73                        name: volume_name.clone(),
74                        ..Default::default()
75                    })
76                    .await?;
77
78                volumes_to_cleanup.push(volume_name.clone());
79
80                mounts.push(Mount {
81                    target: Some(mount.mount_path.clone()),
82                    source: Some(volume_name.clone()),
83                    typ: Some(MountTypeEnum::VOLUME),
84                    read_only: mount.read_only,
85                    ..Default::default()
86                });
87
88                if let Some(storage_data) = &self.metadata.storage {
89                    if let Some(urls) = storage_data.get(&mount.name) {
90                        let has_state =
91                            urls.get("expected_hash").and_then(|h| h.as_str()).is_some();
92
93                        if has_state {
94                            if let Some(get_url) = urls.get("get_url").and_then(|u| u.as_str()) {
95                                info!(
96                                    "Unparking storage '{}' (resume state) for volume '{}'",
97                                    mount.name, volume_name
98                                );
99                                if let Some(nats) = &self.nats {
100                                    let unpacking_event = serde_json::json!({
101                                        "run_id": self.metadata.run_id,
102                                        "step_id": self.metadata.step_id,
103                                        "status": "unpacking_sfs",
104                                        "timestamp": chrono::Utc::now(),
105                                    });
106                                    let _ = nats
107                                        .publish(
108                                            "stormchaser.step.unpacking_sfs",
109                                            unpacking_event.to_string().into(),
110                                        )
111                                        .await;
112                                }
113                                self.unpark_storage(&volume_name, &mount.mount_path, get_url)
114                                    .await?;
115                            }
116                        } else if let Some(provision) =
117                            urls.get("provision").and_then(|p| p.as_array())
118                        {
119                            for prov in provision {
120                                if let (Some(url), Some(dest)) = (
121                                    prov.get("url").and_then(|u| u.as_str()),
122                                    prov.get("destination").and_then(|d| d.as_str()),
123                                ) {
124                                    info!(
125                                        "Provisioning storage '{}' from URL '{}' into destination '{}'",
126                                        mount.name, url, dest
127                                    );
128                                    if let Some(nats) = &self.nats {
129                                        let unpacking_event = serde_json::json!({
130                                            "run_id": self.metadata.run_id,
131                                            "step_id": self.metadata.step_id,
132                                            "status": "unpacking_sfs",
133                                            "timestamp": chrono::Utc::now(),
134                                        });
135                                        let _ = nats
136                                            .publish(
137                                                "stormchaser.step.unpacking_sfs",
138                                                unpacking_event.to_string().into(),
139                                            )
140                                            .await;
141                                    }
142                                    let mut full_dest = PathBuf::from(&mount.mount_path);
143                                    if dest != "/" && !dest.is_empty() {
144                                        let relative_dest =
145                                            dest.trim_start_matches('/').replace('/', "");
146                                        full_dest.push(relative_dest);
147                                    }
148
149                                    self.unpark_storage(
150                                        &volume_name,
151                                        full_dest.to_str().unwrap_or(&mount.mount_path),
152                                        url,
153                                    )
154                                    .await?;
155                                }
156                            }
157                        }
158                    }
159                }
160            }
161        }
162
163        info!("Pulling image {}", spec.image);
164        self.pull_image(&spec.image).await?;
165
166        let network_mode = self.get_network_mode().await;
167        let config =
168            self.build_container_config(&spec, mounts.clone(), network_mode, &storage_names)?;
169
170        info!("Creating container {}", container_name);
171        let dispatched_at = Utc::now();
172        self.docker
173            .create_container(
174                Some(CreateContainerOptions {
175                    name: container_name.clone(),
176                    ..Default::default()
177                }),
178                config,
179            )
180            .await?;
181
182        info!("Starting container {}", container_name);
183        self.docker
184            .start_container(&container_name, None::<StartContainerOptions<String>>)
185            .await?;
186
187        if let Some(nats) = &self.nats {
188            let running_event = serde_json::json!({
189                "run_id": self.metadata.run_id,
190                "step_id": self.metadata.step_id,
191                "status": "running",
192                "timestamp": chrono::Utc::now(),
193            });
194            let _ = nats
195                .publish("stormchaser.step.running", running_event.to_string().into())
196                .await;
197        }
198
199        Ok(StartResult::Running(DockerContainerMachine {
200            nats: self.nats.clone(),
201            docker: self.docker,
202            metadata: self.metadata,
203            state: state::Running {
204                container_name,
205                dispatched_at,
206                volumes_to_cleanup,
207                storage_names,
208                mounts,
209            },
210        }))
211    }
212
213    async fn unpark_storage(
214        &self,
215        volume_name: &str,
216        mount_path: &str,
217        get_url: &str,
218    ) -> Result<()> {
219        let agent_image = "stormchaser-agent:v1";
220        let unpark_container_name = format!("unpark-{}", Uuid::new_v4());
221        let config = Config {
222            image: Some(agent_image.to_string()),
223            cmd: Some(vec![
224                "/usr/local/bin/stormchaser-agent".to_string(),
225                "unpark".to_string(),
226                "--url".to_string(),
227                get_url.to_string(),
228                "--destination".to_string(),
229                mount_path.to_string(),
230            ]),
231            host_config: Some(HostConfig {
232                mounts: Some(vec![Mount {
233                    target: Some(mount_path.to_string()),
234                    source: Some(volume_name.to_string()),
235                    typ: Some(MountTypeEnum::VOLUME),
236                    ..Default::default()
237                }]),
238                network_mode: self.get_network_mode().await,
239                ..Default::default()
240            }),
241            ..Default::default()
242        };
243
244        self.docker
245            .create_container(
246                Some(CreateContainerOptions {
247                    name: unpark_container_name.clone(),
248                    ..Default::default()
249                }),
250                config,
251            )
252            .await?;
253
254        self.docker
255            .start_container(
256                &unpark_container_name,
257                None::<StartContainerOptions<String>>,
258            )
259            .await?;
260
261        let mut wait_stream = self.docker.wait_container(
262            &unpark_container_name,
263            Some(WaitContainerOptions {
264                condition: "not-running",
265            }),
266        );
267
268        if let Some(wait_result) = wait_stream.next().await {
269            match wait_result {
270                Ok(res) if res.status_code == 0 => {
271                    info!("Unpark successful for {}", volume_name);
272                    // Wait for logs
273                    sleep(Duration::from_secs(15)).await;
274                    let _ = self
275                        .docker
276                        .remove_container(&unpark_container_name, None)
277                        .await;
278                    Ok(())
279                }
280                res => {
281                    error!("Unpark failed for {}: {:?}", volume_name, res);
282                    Err(anyhow::anyhow!(
283                        "Unpark failed for {}: {:?}",
284                        volume_name,
285                        res
286                    ))
287                }
288            }
289        } else {
290            Err(anyhow::anyhow!("Wait stream ended unexpectedly"))
291        }
292    }
293
294    async fn pull_image(&self, image: &str) -> Result<()> {
295        let mut pull_stream = self.docker.create_image(
296            Some(CreateImageOptions {
297                from_image: image.to_string(),
298                ..Default::default()
299            }),
300            None,
301            None,
302        );
303        while let Some(pull_result) = pull_stream.next().await {
304            if let Err(e) = pull_result {
305                error!("Error pulling image {}: {:?}", image, e);
306                anyhow::bail!("Failed to pull image {}: {}", image, e);
307            }
308        }
309        Ok(())
310    }
311}
312
313impl DockerContainerMachine<state::Running> {
314    /// Waits for the running container to finish executing, collects metrics and artifacts, and cleans it up.
315    pub async fn wait(self) -> Result<DockerContainerMachine<state::Finished>> {
316        let container_name = self.state.container_name.clone();
317        let dispatched_at = self.state.dispatched_at;
318        let volumes_to_cleanup = self.state.volumes_to_cleanup.clone();
319        let storage_names = self.state.storage_names.clone();
320        let mounts = self.state.mounts.clone();
321
322        let mut wait_stream = self.docker.wait_container(
323            &container_name,
324            Some(WaitContainerOptions {
325                condition: "not-running",
326            }),
327        );
328
329        let exit_code = if let Some(wait_result) = wait_stream.next().await {
330            match wait_result {
331                Ok(response) => Some(response.status_code),
332                Err(bollard::errors::Error::DockerContainerWaitError { error: _, code }) => {
333                    Some(code)
334                }
335                Err(e) => {
336                    error!("Error waiting for container {}: {:?}", container_name, e);
337                    None
338                }
339            }
340        } else {
341            None
342        };
343
344        let latency_ms = (dispatched_at - self.metadata.received_at)
345            .num_milliseconds()
346            .max(0) as u64;
347        let duration_ms = (Utc::now() - dispatched_at).num_milliseconds().max(0) as u64;
348
349        let mut metrics = ContainerMetrics {
350            exit_code,
351            duration_ms,
352            latency_ms,
353            ..Default::default()
354        };
355
356        // Capture metadata
357        let spec: Option<CommonContainerSpec> =
358            serde_json::from_value(self.metadata.step_dsl.spec.clone()).ok();
359
360        if !storage_names.is_empty() || !self.metadata.step_dsl.reports.is_empty() {
361            let agent_image = "stormchaser-agent:v1";
362            let park_container_name = format!("park-{}", Uuid::new_v4());
363
364            let mut parking_urls = HashMap::new();
365            let mut mount_paths = HashMap::new();
366            let mut artifact_urls = HashMap::new();
367
368            if let Some(storage_data) = &self.metadata.storage {
369                for name in &storage_names {
370                    if let Some(urls) = storage_data.get(name) {
371                        parking_urls.insert(name.clone(), urls.clone());
372                        if let Some(artifacts) = urls.get("artifacts").and_then(|a| a.as_object()) {
373                            let allowed_artifacts = self.metadata.step_dsl.artifacts.as_ref();
374                            for (art_name, art_val) in artifacts {
375                                if let Some(allowed) = allowed_artifacts {
376                                    if !allowed.contains(art_name) {
377                                        continue;
378                                    }
379                                } else {
380                                    // If step.artifacts is not explicitly defined, we assume it publishes NO artifacts
381                                    continue;
382                                }
383
384                                let mut cloned_art = art_val.clone();
385                                if let Some(m) = spec
386                                    .as_ref()
387                                    .and_then(|s| s.storage_mounts.as_ref())
388                                    .and_then(|ms| ms.iter().find(|m| &m.name == name))
389                                {
390                                    if let Some(p) = art_val.get("path").and_then(|p| p.as_str()) {
391                                        let abs_path = std::path::Path::new(&m.mount_path).join(p);
392                                        cloned_art["path"] =
393                                            Value::String(abs_path.to_string_lossy().to_string());
394                                    }
395                                }
396                                artifact_urls.insert(art_name.clone(), cloned_art);
397                            }
398                        }
399                    }
400                    if let Some(m) = spec
401                        .as_ref()
402                        .and_then(|s| s.storage_mounts.as_ref())
403                        .and_then(|ms| ms.iter().find(|m| &m.name == name))
404                    {
405                        mount_paths.insert(name.clone(), m.mount_path.clone());
406                    }
407                }
408            }
409
410            let mut agent_args = vec![
411                "run".to_string(),
412                "--parking-urls".to_string(),
413                serde_json::to_string(&parking_urls)?,
414                "--mount-paths".to_string(),
415                serde_json::to_string(&mount_paths)?,
416            ];
417
418            if !artifact_urls.is_empty() {
419                agent_args.push("--artifact-urls".to_string());
420                agent_args.push(serde_json::to_string(&artifact_urls)?);
421            }
422
423            if let Some(reports) = &self.metadata.test_report_urls {
424                if !reports.is_empty() {
425                    agent_args.push("--report-urls".to_string());
426                    agent_args.push(serde_json::to_string(reports)?);
427                }
428            }
429
430            if !self.metadata.step_dsl.reports.is_empty() {
431                agent_args.push("--test-reports".to_string());
432                agent_args.push(serde_json::to_string(&self.metadata.step_dsl.reports)?);
433            }
434
435            if exit_code != Some(0) {
436                // If it failed, don't fail parking
437                // (This matches original logic but might need review)
438            }
439
440            agent_args.push("--".to_string());
441            agent_args.push("/bin/true".to_string());
442
443            let agent_config = Config {
444                image: Some(agent_image.to_string()),
445                cmd: Some(agent_args),
446                entrypoint: Some(vec!["/usr/local/bin/stormchaser-agent".to_string()]),
447                host_config: Some(HostConfig {
448                    mounts: Some(mounts),
449                    network_mode: self.get_network_mode().await,
450                    ..Default::default()
451                }),
452                ..Default::default()
453            };
454
455            info!("Running agent for parking/reports: {}", park_container_name);
456            if let Some(nats) = &self.nats {
457                let packing_event = serde_json::json!({
458                    "run_id": self.metadata.run_id,
459                    "step_id": self.metadata.step_id,
460                    "status": "packing_sfs",
461                    "timestamp": chrono::Utc::now(),
462                });
463                let _ = nats
464                    .publish(
465                        "stormchaser.step.packing_sfs",
466                        packing_event.to_string().into(),
467                    )
468                    .await;
469            }
470
471            self.docker
472                .create_container(
473                    Some(CreateContainerOptions {
474                        name: park_container_name.clone(),
475                        ..Default::default()
476                    }),
477                    agent_config,
478                )
479                .await?;
480
481            self.docker
482                .start_container(&park_container_name, None::<StartContainerOptions<String>>)
483                .await?;
484
485            let mut agent_wait_stream = self.docker.wait_container(
486                &park_container_name,
487                Some(WaitContainerOptions {
488                    condition: "not-running",
489                }),
490            );
491
492            let _ = agent_wait_stream.next().await;
493
494            if let Ok(Some(artifacts)) = self.get_artifact_meta(&park_container_name).await {
495                metrics.artifacts = Some(artifacts);
496            }
497            if let Ok(Some(hashes)) = self.get_storage_hashes(&park_container_name).await {
498                metrics.storage_hashes = Some(hashes);
499            }
500            if let Ok(Some(reports)) = self.get_test_reports(&park_container_name).await {
501                metrics.test_reports = Some(reports);
502            }
503
504            // Wait for logs
505            sleep(Duration::from_secs(15)).await;
506            let _ = self
507                .docker
508                .remove_container(&park_container_name, None)
509                .await;
510        } else {
511            // For adopted containers without parking, we still want to grab logs if possible
512            if let Ok(Some(artifacts)) = self.get_artifact_meta(&container_name).await {
513                metrics.artifacts = Some(artifacts);
514            }
515            if let Ok(Some(hashes)) = self.get_storage_hashes(&container_name).await {
516                metrics.storage_hashes = Some(hashes);
517            }
518            if let Ok(Some(reports)) = self.get_test_reports(&container_name).await {
519                metrics.test_reports = Some(reports);
520            }
521        }
522
523        // Cleanup volume and container
524        // Wait a bit for log collector (Alloy) to catch the final logs before we delete the container
525        sleep(Duration::from_secs(15)).await;
526        let _ = self.docker.remove_container(&container_name, None).await;
527
528        for vol in volumes_to_cleanup {
529            info!("Cleaning up volume: {}", vol);
530            let _ = self.docker.remove_volume(&vol, None).await;
531        }
532
533        let result = if exit_code == Some(0) {
534            info!("Container {} completed successfully", container_name);
535            ContainerState::Succeeded(metrics)
536        } else {
537            let error_msg = format!("Container exited with code {:?}", exit_code);
538            error!("Container {} failed: {}", container_name, error_msg);
539            ContainerState::Failed(error_msg, metrics)
540        };
541
542        Ok(DockerContainerMachine {
543            nats: self.nats.clone(),
544            docker: self.docker,
545            metadata: self.metadata,
546            state: state::Finished { result },
547        })
548    }
549
550    async fn get_artifact_meta(
551        &self,
552        container_name: &str,
553    ) -> Result<Option<HashMap<String, Value>>> {
554        let mut logs = self.docker.logs(
555            container_name,
556            Some(LogsOptions::<String> {
557                stdout: true,
558                stderr: true,
559                ..Default::default()
560            }),
561        );
562
563        while let Some(log_result) = logs.next().await {
564            if let Ok(output) = log_result {
565                let line = output.to_string();
566                if line.contains("Parked artifacts:") {
567                    if let Some(json_start) = line.find('{') {
568                        let json_part = &line[json_start..];
569                        if let Ok(meta) = serde_json::from_str(json_part) {
570                            return Ok(Some(meta));
571                        }
572                    }
573                }
574            }
575        }
576
577        Ok(None)
578    }
579
580    async fn get_storage_hashes(
581        &self,
582        container_name: &str,
583    ) -> Result<Option<HashMap<String, String>>> {
584        let mut logs = self.docker.logs(
585            container_name,
586            Some(LogsOptions::<String> {
587                stdout: true,
588                stderr: true,
589                ..Default::default()
590            }),
591        );
592
593        while let Some(log_result) = logs.next().await {
594            if let Ok(output) = log_result {
595                let line = output.to_string();
596                if line.contains("Parked storage hashes:") {
597                    if let Some(json_start) = line.find('{') {
598                        let json_part = &line[json_start..];
599                        if let Ok(hashes) = serde_json::from_str(json_part) {
600                            return Ok(Some(hashes));
601                        }
602                    }
603                }
604            }
605        }
606
607        Ok(None)
608    }
609
610    async fn get_test_reports(&self, container_name: &str) -> Result<Option<Value>> {
611        let mut logs = self.docker.logs(
612            container_name,
613            Some(LogsOptions::<String> {
614                stdout: true,
615                stderr: true,
616                ..Default::default()
617            }),
618        );
619
620        while let Some(log_result) = logs.next().await {
621            if let Ok(output) = log_result {
622                let line = output.to_string();
623                if line.contains("Collected test reports JSON:") {
624                    if let Some(json_start) = line.find('{') {
625                        let json_part = &line[json_start..];
626                        if let Ok(reports) = serde_json::from_str(json_part) {
627                            return Ok(Some(reports));
628                        }
629                    }
630                }
631            }
632        }
633
634        Ok(None)
635    }
636}
637
638impl DockerContainerMachine<state::Finished> {
639    /// Consumes the state machine, returning the final `ContainerState` result.
640    pub fn into_result(self) -> ContainerState {
641        self.state.result
642    }
643}
644
645#[cfg(test)]
646mod tests {
647    use super::*;
648    use crate::container_machine::ContainerMetadata;
649    use bollard::Docker;
650    use stormchaser_model::dsl::Step;
651    use uuid::Uuid;
652
653    async fn get_docker() -> Docker {
654        Docker::connect_with_local_defaults().unwrap()
655    }
656
657    fn create_test_metadata(image: &str, cmd: Vec<&str>) -> ContainerMetadata {
658        let spec = serde_json::json!({
659            "image": image,
660            "command": cmd,
661            "cpu_limit": null,
662            "memory_limit": null,
663            "env": null,
664            "ports": null,
665            "storage_mounts": null
666        });
667
668        ContainerMetadata {
669            run_id: Uuid::new_v4(),
670            step_id: Uuid::new_v4(),
671            step_dsl: Step {
672                name: "test_step".to_string(),
673                r#type: "RunContainer".to_string(),
674                spec,
675                condition: None,
676                params: HashMap::new(),
677                strategy: None,
678                aggregation: vec![],
679                iterate: None,
680                iterate_as: None,
681                steps: None,
682                next: vec![],
683                on_failure: None,
684                retry: None,
685                timeout: None,
686                allow_failure: None,
687                start_marker: None,
688                end_marker: None,
689                outputs: vec![],
690                reports: vec![],
691                artifacts: None,
692            },
693            received_at: chrono::Utc::now(),
694            encryption_key: None,
695            storage: None,
696            test_report_urls: None,
697        }
698    }
699
700    #[tokio::test]
701    async fn test_container_lifecycle_success() {
702        let docker = get_docker().await;
703        let metadata = create_test_metadata("alpine:latest", vec!["echo", "hello"]);
704
705        let machine = DockerContainerMachine::new(docker.clone(), metadata, None);
706
707        // Test Start
708        let start_res = machine.start().await.expect("Failed to start container");
709        let running_machine = match start_res {
710            StartResult::Running(m) => m,
711            StartResult::Failed(_) => panic!("Container failed to start"),
712        };
713
714        let container_name = running_machine.state.container_name.clone();
715
716        // Test Wait
717        let finished_machine = running_machine
718            .wait()
719            .await
720            .expect("Failed to wait on container");
721        let state = finished_machine.into_result();
722
723        match state {
724            ContainerState::Succeeded(metrics) => {
725                assert_eq!(metrics.exit_code, Some(0));
726            }
727            ContainerState::Failed(msg, _) => panic!("Container failed: {}", msg),
728        }
729
730        // Ensure container is cleaned up
731        let inspect = docker.inspect_container(&container_name, None).await;
732        assert!(inspect.is_err(), "Container should be removed");
733    }
734
735    #[tokio::test]
736    async fn test_container_lifecycle_failure() {
737        let docker = get_docker().await;
738        let metadata = create_test_metadata("alpine:latest", vec!["false"]);
739
740        let machine = DockerContainerMachine::new(docker.clone(), metadata, None);
741
742        let start_res = machine.start().await.expect("Failed to start container");
743        let running_machine = match start_res {
744            StartResult::Running(m) => m,
745            StartResult::Failed(_) => panic!("Container failed to start"),
746        };
747
748        let container_name = running_machine.state.container_name.clone();
749
750        let finished_machine = running_machine
751            .wait()
752            .await
753            .expect("Failed to wait on container");
754        let state = finished_machine.into_result();
755
756        match state {
757            ContainerState::Failed(msg, metrics) => {
758                assert!(
759                    msg.contains("Some(1)"),
760                    "Unexpected failure message: {}",
761                    msg
762                );
763                assert_eq!(metrics.exit_code, Some(1));
764            }
765            ContainerState::Succeeded(_) => panic!("Container should have failed"),
766        }
767
768        // Ensure container is cleaned up
769        let inspect = docker.inspect_container(&container_name, None).await;
770        assert!(inspect.is_err(), "Container should be removed");
771    }
772
773    #[tokio::test]
774    async fn test_adopt_container() {
775        let docker = get_docker().await;
776        let metadata = create_test_metadata("alpine:latest", vec!["echo", "hello"]);
777        let machine = DockerContainerMachine::new(docker.clone(), metadata, None);
778
779        let running_machine = machine.adopt("my-adopted-container".to_string());
780        assert_eq!(running_machine.state.container_name, "my-adopted-container");
781        assert!(running_machine.state.volumes_to_cleanup.is_empty());
782    }
783
784    #[tokio::test]
785    async fn test_clean_up_orphaned_container() {
786        let docker = get_docker().await;
787        let container_name = format!("test-cleanup-{}", Uuid::new_v4());
788
789        // Create a real container to clean up
790        use futures::StreamExt;
791        let _ = docker
792            .create_image(
793                Some(bollard::image::CreateImageOptions {
794                    from_image: "alpine:latest",
795                    ..Default::default()
796                }),
797                None,
798                None,
799            )
800            .collect::<Vec<_>>()
801            .await;
802
803        let _ = docker
804            .create_container(
805                Some(bollard::container::CreateContainerOptions {
806                    name: container_name.as_str(),
807                    ..Default::default()
808                }),
809                bollard::container::Config {
810                    image: Some("alpine:latest".to_string()),
811                    cmd: Some(vec!["sleep".to_string(), "1000".to_string()]),
812                    ..Default::default()
813                },
814            )
815            .await
816            .unwrap();
817
818        let metadata = create_test_metadata("alpine:latest", vec!["sleep", "1000"]);
819        let machine = DockerContainerMachine::new(docker.clone(), metadata, None);
820
821        machine.clean_up(&container_name).await.unwrap();
822
823        // Verify it is gone
824        let inspect = docker.inspect_container(&container_name, None).await;
825        assert!(inspect.is_err(), "Container should be cleaned up");
826    }
827
828    #[tokio::test]
829    async fn test_into_result() {
830        use super::super::{state, ContainerMetrics, ContainerState, DockerContainerMachine};
831        let docker = get_docker().await;
832        let metadata = create_test_metadata("alpine:latest", vec!["echo", "hello"]);
833        let machine = DockerContainerMachine {
834            nats: None,
835            docker,
836            metadata,
837            state: state::Finished {
838                result: ContainerState::Succeeded(ContainerMetrics::default()),
839            },
840        };
841        let state = machine.into_result();
842        match state {
843            ContainerState::Succeeded(_) => {}
844            _ => panic!("Expected Succeeded"),
845        }
846    }
847}