Skip to main content

stormchaser_runner_docker/container_machine/transitions/
running.rs

1use crate::container_machine::{state, ContainerMetrics, ContainerState, DockerContainerMachine};
2use anyhow::Result;
3use bollard::container::{
4    Config, CreateContainerOptions, LogsOptions, StartContainerOptions, WaitContainerOptions,
5};
6use bollard::service::HostConfig;
7use chrono::Utc;
8use cloudevents::EventBuilder;
9use futures::StreamExt;
10use serde_json::Value;
11use std::collections::HashMap;
12use std::time::Duration;
13use stormchaser_model::dsl::CommonContainerSpec;
14use stormchaser_model::step::StepStatus;
15use stormchaser_model::APPLICATION_JSON;
16use tokio::time::sleep;
17use tracing::{error, info};
18use uuid::Uuid;
19
20impl DockerContainerMachine<state::Running> {
21    /// Waits for the running container to finish executing, collects metrics and artifacts, and cleans it up.
22    pub async fn wait(self) -> Result<DockerContainerMachine<state::Finished>> {
23        let container_name = self.state.container_name.clone();
24        let dispatched_at = self.state.dispatched_at;
25        let volumes_to_cleanup = self.state.volumes_to_cleanup.clone();
26        let storage_names = self.state.storage_names.clone();
27        let mounts = self.state.mounts.clone();
28
29        let mut wait_stream = self.docker.wait_container(
30            &container_name,
31            Some(WaitContainerOptions {
32                condition: "not-running",
33            }),
34        );
35
36        let exit_code = if let Some(wait_result) = wait_stream.next().await {
37            match wait_result {
38                Ok(response) => Some(response.status_code),
39                Err(bollard::errors::Error::DockerContainerWaitError { error: _, code }) => {
40                    Some(code)
41                }
42                Err(e) => {
43                    error!("Error waiting for container {}: {:?}", container_name, e);
44                    None
45                }
46            }
47        } else {
48            None
49        };
50
51        let latency_ms = (dispatched_at - self.metadata.received_at)
52            .num_milliseconds()
53            .max(0) as u64;
54        let duration_ms = (Utc::now() - dispatched_at).num_milliseconds().max(0) as u64;
55
56        let mut metrics = ContainerMetrics {
57            exit_code,
58            duration_ms,
59            latency_ms,
60            ..Default::default()
61        };
62
63        // Capture metadata
64        let spec: Option<CommonContainerSpec> =
65            serde_json::from_value(self.metadata.step_dsl.spec.clone()).ok();
66
67        let (artifacts, storage_hashes, test_reports) = self
68            .collect_metrics(
69                &container_name,
70                &storage_names,
71                mounts,
72                spec.as_ref(),
73                exit_code,
74            )
75            .await?;
76
77        if let Some(a) = artifacts {
78            metrics.artifacts = Some(a);
79        }
80        if let Some(h) = storage_hashes {
81            metrics.storage_hashes = Some(h);
82        }
83        if let Some(r) = test_reports {
84            metrics.test_reports = Some(r);
85        }
86
87        // Cleanup volume and container
88        // Wait a bit for log collector (Alloy) to catch the final logs before we delete the container
89        sleep(Duration::from_secs(15)).await;
90        let _ = self.docker.remove_container(&container_name, None).await;
91
92        for vol in volumes_to_cleanup {
93            info!("Cleaning up volume: {}", vol);
94            let _ = self.docker.remove_volume(&vol, None).await;
95        }
96
97        let result = if exit_code == Some(0) {
98            info!("Container {} completed successfully", container_name);
99            ContainerState::Succeeded(metrics)
100        } else {
101            let error_msg = format!("Container exited with code {:?}", exit_code);
102            error!("Container {} failed: {}", container_name, error_msg);
103            ContainerState::Failed(error_msg, metrics)
104        };
105
106        Ok(DockerContainerMachine {
107            nats: self.nats.clone(),
108            docker: self.docker,
109            metadata: self.metadata,
110            state: state::Finished { result },
111        })
112    }
113
114    async fn collect_metrics(
115        &self,
116        container_name: &str,
117        storage_names: &[String],
118        mounts: Vec<bollard::service::Mount>,
119        spec: Option<&CommonContainerSpec>,
120        exit_code: Option<i64>,
121    ) -> Result<(
122        Option<HashMap<String, Value>>,
123        Option<HashMap<String, String>>,
124        Option<Value>,
125    )> {
126        let mut artifacts_out = None;
127        let mut hashes_out = None;
128        let mut reports_out = None;
129
130        if !storage_names.is_empty() || !self.metadata.step_dsl.reports.is_empty() {
131            let (a, h, r) = self
132                .run_parking_agent(storage_names, mounts, spec, exit_code)
133                .await?;
134            artifacts_out = a;
135            hashes_out = h;
136            reports_out = r;
137        } else {
138            // For adopted containers without parking, we still want to grab logs if possible
139            if let Ok(Some(artifacts)) = self.get_artifact_meta(container_name).await {
140                artifacts_out = Some(artifacts);
141            }
142            if let Ok(Some(hashes)) = self.get_storage_hashes(container_name).await {
143                hashes_out = Some(hashes);
144            }
145            if let Ok(Some(reports)) = self.get_test_reports(container_name).await {
146                reports_out = Some(reports);
147            }
148        }
149
150        Ok((artifacts_out, hashes_out, reports_out))
151    }
152
153    async fn run_parking_agent(
154        &self,
155        storage_names: &[String],
156        mounts: Vec<bollard::service::Mount>,
157        spec: Option<&CommonContainerSpec>,
158        exit_code: Option<i64>,
159    ) -> Result<(
160        Option<HashMap<String, Value>>,
161        Option<HashMap<String, String>>,
162        Option<Value>,
163    )> {
164        let mut artifacts_out = None;
165        let mut hashes_out = None;
166        let mut reports_out = None;
167
168        let agent_image = "stormchaser-agent:v1";
169        let park_container_name = format!("park-{}", Uuid::new_v4());
170
171        let sfs_host_path = std::env::var("STORMCHASER_SFS_HOST_PATH").ok();
172        let (parking_urls, mount_paths, artifact_urls) =
173            self.build_parking_payloads(storage_names, spec, sfs_host_path.as_deref());
174
175        let mut agent_args = vec![
176            "run".to_string(),
177            "--parking-urls".to_string(),
178            serde_json::to_string(&parking_urls)?,
179            "--mount-paths".to_string(),
180            serde_json::to_string(&mount_paths)?,
181        ];
182
183        if !artifact_urls.is_empty() {
184            agent_args.push("--artifact-urls".to_string());
185            agent_args.push(serde_json::to_string(&artifact_urls)?);
186        }
187
188        if let Some(reports) = &self.metadata.test_report_urls {
189            if !reports.is_empty() {
190                agent_args.push("--report-urls".to_string());
191                agent_args.push(serde_json::to_string(reports)?);
192            }
193        }
194
195        if !self.metadata.step_dsl.reports.is_empty() {
196            agent_args.push("--test-reports".to_string());
197            agent_args.push(serde_json::to_string(&self.metadata.step_dsl.reports)?);
198        }
199
200        if exit_code != Some(0) {
201            // If it failed, don't fail parking
202            // (This matches original logic but might need review)
203        }
204
205        agent_args.push("--".to_string());
206        agent_args.push("/bin/true".to_string());
207
208        let agent_config = Config {
209            image: Some(agent_image.to_string()),
210            cmd: Some(agent_args),
211            entrypoint: Some(vec!["/usr/local/bin/stormchaser-agent".to_string()]),
212            host_config: Some(HostConfig {
213                mounts: Some(mounts),
214                network_mode: self.get_network_mode().await,
215                ..Default::default()
216            }),
217            ..Default::default()
218        };
219
220        info!("Running agent for parking/reports: {}", park_container_name);
221        if let Some(nats) = &self.nats {
222            let packing_event = serde_json::json!({
223                "run_id": self.metadata.run_id,
224                "step_id": self.metadata.step_id,
225                "status": StepStatus::PackingSfs,
226                "timestamp": chrono::Utc::now(),
227            });
228            if let Ok(ce) = cloudevents::EventBuilderV10::new()
229                .id(uuid::Uuid::new_v4().to_string())
230                .ty("stormchaser.v1.step.packing_sfs")
231                .source("/stormchaser/runner")
232                .time(chrono::Utc::now())
233                .data(APPLICATION_JSON, packing_event)
234                .build()
235            {
236                if let Ok(payload_bytes) = serde_json::to_vec(&ce) {
237                    let _ = nats
238                        .publish("stormchaser.v1.step.packing_sfs", payload_bytes.into())
239                        .await;
240                }
241            }
242        }
243
244        self.docker
245            .create_container(
246                Some(CreateContainerOptions {
247                    name: park_container_name.clone(),
248                    ..Default::default()
249                }),
250                agent_config,
251            )
252            .await?;
253
254        self.docker
255            .start_container(&park_container_name, None::<StartContainerOptions<String>>)
256            .await?;
257
258        let mut agent_wait_stream = self.docker.wait_container(
259            &park_container_name,
260            Some(WaitContainerOptions {
261                condition: "not-running",
262            }),
263        );
264
265        let _ = agent_wait_stream.next().await;
266
267        if let Ok(Some(artifacts)) = self.get_artifact_meta(&park_container_name).await {
268            artifacts_out = Some(artifacts);
269        }
270        if let Ok(Some(hashes)) = self.get_storage_hashes(&park_container_name).await {
271            hashes_out = Some(hashes);
272        }
273        if let Ok(Some(reports)) = self.get_test_reports(&park_container_name).await {
274            reports_out = Some(reports);
275        }
276
277        // Wait for logs
278        sleep(Duration::from_secs(15)).await;
279        let _ = self
280            .docker
281            .remove_container(&park_container_name, None)
282            .await;
283
284        Ok((artifacts_out, hashes_out, reports_out))
285    }
286    async fn get_artifact_meta(
287        &self,
288        container_name: &str,
289    ) -> Result<Option<HashMap<String, Value>>> {
290        let mut logs = self.docker.logs(
291            container_name,
292            Some(LogsOptions::<String> {
293                stdout: true,
294                stderr: true,
295                ..Default::default()
296            }),
297        );
298
299        while let Some(log_result) = logs.next().await {
300            if let Ok(output) = log_result {
301                let line = output.to_string();
302                if line.contains("Parked artifacts:") {
303                    if let Some(json_start) = line.find('{') {
304                        let json_part = &line[json_start..];
305                        if let Ok(meta) = serde_json::from_str(json_part) {
306                            return Ok(Some(meta));
307                        }
308                    }
309                }
310            }
311        }
312
313        Ok(None)
314    }
315
316    async fn get_storage_hashes(
317        &self,
318        container_name: &str,
319    ) -> Result<Option<HashMap<String, String>>> {
320        let mut logs = self.docker.logs(
321            container_name,
322            Some(LogsOptions::<String> {
323                stdout: true,
324                stderr: true,
325                ..Default::default()
326            }),
327        );
328
329        while let Some(log_result) = logs.next().await {
330            if let Ok(output) = log_result {
331                let line = output.to_string();
332                if line.contains("Parked storage hashes:") {
333                    if let Some(json_start) = line.find('{') {
334                        let json_part = &line[json_start..];
335                        if let Ok(hashes) = serde_json::from_str(json_part) {
336                            return Ok(Some(hashes));
337                        }
338                    }
339                }
340            }
341        }
342
343        Ok(None)
344    }
345
346    async fn get_test_reports(&self, container_name: &str) -> Result<Option<Value>> {
347        let mut logs = self.docker.logs(
348            container_name,
349            Some(LogsOptions::<String> {
350                stdout: true,
351                stderr: true,
352                ..Default::default()
353            }),
354        );
355
356        while let Some(log_result) = logs.next().await {
357            if let Ok(output) = log_result {
358                let line = output.to_string();
359                if line.contains("Collected test reports JSON:") {
360                    if let Some(json_start) = line.find('{') {
361                        let json_part = &line[json_start..];
362                        if let Ok(reports) = serde_json::from_str(json_part) {
363                            return Ok(Some(reports));
364                        }
365                    }
366                }
367            }
368        }
369
370        Ok(None)
371    }
372
373    fn build_parking_payloads(
374        &self,
375        storage_names: &[String],
376        spec: Option<&CommonContainerSpec>,
377        sfs_host_path: Option<&str>,
378    ) -> (
379        HashMap<String, Value>,
380        HashMap<String, String>,
381        HashMap<String, Value>,
382    ) {
383        let mut parking_urls = HashMap::new();
384        let mut mount_paths = HashMap::new();
385        let mut artifact_urls = HashMap::new();
386
387        if let Some(storage_data) = &self.metadata.storage {
388            for name in storage_names {
389                if let Some(urls) = storage_data.get(name) {
390                    let mut cloned_urls = urls.clone();
391                    if sfs_host_path.is_some() {
392                        if let Some(obj) = cloned_urls.as_object_mut() {
393                            obj.remove("put_url");
394                        }
395                    }
396                    parking_urls.insert(name.clone(), cloned_urls);
397                    if let Some(artifacts) = urls.get("artifacts").and_then(|a| a.as_object()) {
398                        let allowed_artifacts = self.metadata.step_dsl.artifacts.as_ref();
399                        for (art_name, art_val) in artifacts {
400                            if let Some(allowed) = allowed_artifacts {
401                                if !allowed.contains(art_name) {
402                                    continue;
403                                }
404                            } else {
405                                // If step.artifacts is not explicitly defined, we assume it publishes NO artifacts
406                                continue;
407                            }
408
409                            let mut cloned_art = art_val.clone();
410                            if let Some(m) = spec
411                                .as_ref()
412                                .and_then(|s| s.storage_mounts.as_ref())
413                                .and_then(|ms| ms.iter().find(|m| &m.name == name))
414                            {
415                                if let Some(p) = art_val.get("path").and_then(|p| p.as_str()) {
416                                    let abs_path = std::path::Path::new(&m.mount_path).join(p);
417                                    cloned_art["path"] =
418                                        Value::String(abs_path.to_string_lossy().to_string());
419                                }
420                            }
421                            artifact_urls.insert(art_name.clone(), cloned_art);
422                        }
423                    }
424                }
425                if let Some(m) = spec
426                    .as_ref()
427                    .and_then(|s| s.storage_mounts.as_ref())
428                    .and_then(|ms| ms.iter().find(|m| &m.name == name))
429                {
430                    mount_paths.insert(name.clone(), m.mount_path.clone());
431                }
432            }
433        }
434
435        (parking_urls, mount_paths, artifact_urls)
436    }
437}
438
#[cfg(test)]
mod tests {
    use super::*;
    use crate::container_machine::ContainerMetadata;
    use bollard::Docker;
    use stormchaser_model::dsl::Step;

    /// Metadata for a single-container step with one `workspace` storage mounted at
    /// `/workspace` and both a download and an upload URL.
    fn create_test_metadata() -> ContainerMetadata {
        let storage: HashMap<_, _> = std::iter::once((
            "workspace".to_string(),
            serde_json::json!({
                "get_url": "http://s3/get",
                "put_url": "http://s3/put"
            }),
        ))
        .collect();

        let spec = serde_json::json!({
            "image": "alpine:latest",
            "command": ["echo", "hello"],
            "storage_mounts": [
                {
                    "name": "workspace",
                    "mount_path": "/workspace"
                }
            ]
        });

        let step_dsl = Step {
            name: "test_step".to_string(),
            r#type: "RunContainer".to_string(),
            spec,
            condition: None,
            params: HashMap::new(),
            strategy: None,
            aggregation: vec![],
            iterate: None,
            iterate_as: None,
            steps: None,
            next: vec![],
            on_failure: None,
            retry: None,
            timeout: None,
            allow_failure: None,
            start_marker: None,
            end_marker: None,
            outputs: vec![],
            reports: vec![],
            artifacts: None,
        };

        ContainerMetadata {
            run_id: Uuid::new_v4(),
            step_id: Uuid::new_v4(),
            step_dsl,
            received_at: chrono::Utc::now(),
            encryption_key: None,
            storage: Some(storage),
            test_report_urls: None,
        }
    }

    /// Shared setup: an adopted machine built from the canned metadata, plus its parsed spec.
    fn adopted_machine_with_spec() -> (DockerContainerMachine<state::Running>, CommonContainerSpec)
    {
        let docker = Docker::connect_with_local_defaults().unwrap();
        let metadata = create_test_metadata();
        let machine =
            DockerContainerMachine::new(docker.clone(), metadata, None).adopt("test".to_string());
        let spec: CommonContainerSpec =
            serde_json::from_value(machine.metadata.step_dsl.spec.clone()).unwrap();
        (machine, spec)
    }

    #[tokio::test]
    async fn test_build_parking_payloads_named_volume() {
        let (machine, spec) = adopted_machine_with_spec();
        let storage_names = vec!["workspace".to_string()];

        let (parking_urls, mount_paths, _) =
            machine.build_parking_payloads(&storage_names, Some(&spec), None);

        assert_eq!(parking_urls.len(), 1);
        assert_eq!(mount_paths.len(), 1);
        assert_eq!(
            mount_paths.get("workspace").map(String::as_str),
            Some("/workspace")
        );

        // Without a host bind path both URLs are passed through untouched.
        let urls = parking_urls.get("workspace").unwrap();
        let put_url = urls.get("put_url").and_then(|v| v.as_str());
        let get_url = urls.get("get_url").and_then(|v| v.as_str());
        assert_eq!(put_url, Some("http://s3/put"));
        assert_eq!(get_url, Some("http://s3/get"));
    }

    #[tokio::test]
    async fn test_build_parking_payloads_host_bind() {
        let (machine, spec) = adopted_machine_with_spec();
        let storage_names = vec!["workspace".to_string()];

        let (parking_urls, mount_paths, _) = machine.build_parking_payloads(
            &storage_names,
            Some(&spec),
            Some("/tmp/stormchaser/sfs"),
        );

        assert_eq!(parking_urls.len(), 1);
        assert_eq!(mount_paths.len(), 1);
        assert_eq!(
            mount_paths.get("workspace").map(String::as_str),
            Some("/workspace")
        );

        let urls = parking_urls.get("workspace").unwrap();
        let get_url = urls.get("get_url").and_then(|v| v.as_str());
        assert_eq!(get_url, Some("http://s3/get"));
        // Should omit put_url because host bind mounts do not park their state
        assert!(urls.get("put_url").is_none());
    }

    /// Compile-time smoke check: only verifies that `run_parking_agent` exists with a nameable
    /// path; it is ignored at run time.
    #[test]
    #[ignore]
    fn test_run_parking_agent_compiles() {
        let _f = DockerContainerMachine::<state::Running>::run_parking_agent;
    }
}
563}