Skip to main content

stormchaser_runner_docker/container_machine/transitions/running.rs

1use crate::container_machine::{state, ContainerMetrics, ContainerState, DockerContainerMachine};
2use anyhow::Result;
3use bollard::container::{
4    Config, CreateContainerOptions, LogsOptions, StartContainerOptions, WaitContainerOptions,
5};
6use bollard::service::HostConfig;
7use chrono::Utc;
8use cloudevents::EventBuilder;
9use futures::StreamExt;
10use serde_json::Value;
11use std::collections::HashMap;
12use std::time::Duration;
13use stormchaser_model::dsl::CommonContainerSpec;
14use tokio::time::sleep;
15use tracing::{error, info};
16use uuid::Uuid;
17
18impl DockerContainerMachine<state::Running> {
19    /// Waits for the running container to finish executing, collects metrics and artifacts, and cleans it up.
20    pub async fn wait(self) -> Result<DockerContainerMachine<state::Finished>> {
21        let container_name = self.state.container_name.clone();
22        let dispatched_at = self.state.dispatched_at;
23        let volumes_to_cleanup = self.state.volumes_to_cleanup.clone();
24        let storage_names = self.state.storage_names.clone();
25        let mounts = self.state.mounts.clone();
26
27        let mut wait_stream = self.docker.wait_container(
28            &container_name,
29            Some(WaitContainerOptions {
30                condition: "not-running",
31            }),
32        );
33
34        let exit_code = if let Some(wait_result) = wait_stream.next().await {
35            match wait_result {
36                Ok(response) => Some(response.status_code),
37                Err(bollard::errors::Error::DockerContainerWaitError { error: _, code }) => {
38                    Some(code)
39                }
40                Err(e) => {
41                    error!("Error waiting for container {}: {:?}", container_name, e);
42                    None
43                }
44            }
45        } else {
46            None
47        };
48
49        let latency_ms = (dispatched_at - self.metadata.received_at)
50            .num_milliseconds()
51            .max(0) as u64;
52        let duration_ms = (Utc::now() - dispatched_at).num_milliseconds().max(0) as u64;
53
54        let mut metrics = ContainerMetrics {
55            exit_code,
56            duration_ms,
57            latency_ms,
58            ..Default::default()
59        };
60
61        // Capture metadata
62        let spec: Option<CommonContainerSpec> =
63            serde_json::from_value(self.metadata.step_dsl.spec.clone()).ok();
64
65        if !storage_names.is_empty() || !self.metadata.step_dsl.reports.is_empty() {
66            let agent_image = "stormchaser-agent:v1";
67            let park_container_name = format!("park-{}", Uuid::new_v4());
68
69            let mut parking_urls = HashMap::new();
70            let mut mount_paths = HashMap::new();
71            let mut artifact_urls = HashMap::new();
72
73            if let Some(storage_data) = &self.metadata.storage {
74                for name in &storage_names {
75                    if let Some(urls) = storage_data.get(name) {
76                        parking_urls.insert(name.clone(), urls.clone());
77                        if let Some(artifacts) = urls.get("artifacts").and_then(|a| a.as_object()) {
78                            let allowed_artifacts = self.metadata.step_dsl.artifacts.as_ref();
79                            for (art_name, art_val) in artifacts {
80                                if let Some(allowed) = allowed_artifacts {
81                                    if !allowed.contains(art_name) {
82                                        continue;
83                                    }
84                                } else {
85                                    // If step.artifacts is not explicitly defined, we assume it publishes NO artifacts
86                                    continue;
87                                }
88
89                                let mut cloned_art = art_val.clone();
90                                if let Some(m) = spec
91                                    .as_ref()
92                                    .and_then(|s| s.storage_mounts.as_ref())
93                                    .and_then(|ms| ms.iter().find(|m| &m.name == name))
94                                {
95                                    if let Some(p) = art_val.get("path").and_then(|p| p.as_str()) {
96                                        let abs_path = std::path::Path::new(&m.mount_path).join(p);
97                                        cloned_art["path"] =
98                                            Value::String(abs_path.to_string_lossy().to_string());
99                                    }
100                                }
101                                artifact_urls.insert(art_name.clone(), cloned_art);
102                            }
103                        }
104                    }
105                    if let Some(m) = spec
106                        .as_ref()
107                        .and_then(|s| s.storage_mounts.as_ref())
108                        .and_then(|ms| ms.iter().find(|m| &m.name == name))
109                    {
110                        mount_paths.insert(name.clone(), m.mount_path.clone());
111                    }
112                }
113            }
114
115            let mut agent_args = vec![
116                "run".to_string(),
117                "--parking-urls".to_string(),
118                serde_json::to_string(&parking_urls)?,
119                "--mount-paths".to_string(),
120                serde_json::to_string(&mount_paths)?,
121            ];
122
123            if !artifact_urls.is_empty() {
124                agent_args.push("--artifact-urls".to_string());
125                agent_args.push(serde_json::to_string(&artifact_urls)?);
126            }
127
128            if let Some(reports) = &self.metadata.test_report_urls {
129                if !reports.is_empty() {
130                    agent_args.push("--report-urls".to_string());
131                    agent_args.push(serde_json::to_string(reports)?);
132                }
133            }
134
135            if !self.metadata.step_dsl.reports.is_empty() {
136                agent_args.push("--test-reports".to_string());
137                agent_args.push(serde_json::to_string(&self.metadata.step_dsl.reports)?);
138            }
139
140            if exit_code != Some(0) {
141                // If it failed, don't fail parking
142                // (This matches original logic but might need review)
143            }
144
145            agent_args.push("--".to_string());
146            agent_args.push("/bin/true".to_string());
147
148            let agent_config = Config {
149                image: Some(agent_image.to_string()),
150                cmd: Some(agent_args),
151                entrypoint: Some(vec!["/usr/local/bin/stormchaser-agent".to_string()]),
152                host_config: Some(HostConfig {
153                    mounts: Some(mounts),
154                    network_mode: self.get_network_mode().await,
155                    ..Default::default()
156                }),
157                ..Default::default()
158            };
159
160            info!("Running agent for parking/reports: {}", park_container_name);
161            if let Some(nats) = &self.nats {
162                let packing_event = serde_json::json!({
163                    "run_id": self.metadata.run_id,
164                    "step_id": self.metadata.step_id,
165                    "status": "packing_sfs",
166                    "timestamp": chrono::Utc::now(),
167                });
168                if let Ok(ce) = cloudevents::EventBuilderV10::new()
169                    .id(uuid::Uuid::new_v4().to_string())
170                    .ty("stormchaser.v1.step.packing_sfs")
171                    .source("/stormchaser/runner")
172                    .time(chrono::Utc::now())
173                    .data("application/json", packing_event)
174                    .build()
175                {
176                    if let Ok(payload_bytes) = serde_json::to_vec(&ce) {
177                        let _ = nats
178                            .publish("stormchaser.v1.step.packing_sfs", payload_bytes.into())
179                            .await;
180                    }
181                }
182            }
183
184            self.docker
185                .create_container(
186                    Some(CreateContainerOptions {
187                        name: park_container_name.clone(),
188                        ..Default::default()
189                    }),
190                    agent_config,
191                )
192                .await?;
193
194            self.docker
195                .start_container(&park_container_name, None::<StartContainerOptions<String>>)
196                .await?;
197
198            let mut agent_wait_stream = self.docker.wait_container(
199                &park_container_name,
200                Some(WaitContainerOptions {
201                    condition: "not-running",
202                }),
203            );
204
205            let _ = agent_wait_stream.next().await;
206
207            if let Ok(Some(artifacts)) = self.get_artifact_meta(&park_container_name).await {
208                metrics.artifacts = Some(artifacts);
209            }
210            if let Ok(Some(hashes)) = self.get_storage_hashes(&park_container_name).await {
211                metrics.storage_hashes = Some(hashes);
212            }
213            if let Ok(Some(reports)) = self.get_test_reports(&park_container_name).await {
214                metrics.test_reports = Some(reports);
215            }
216
217            // Wait for logs
218            sleep(Duration::from_secs(15)).await;
219            let _ = self
220                .docker
221                .remove_container(&park_container_name, None)
222                .await;
223        } else {
224            // For adopted containers without parking, we still want to grab logs if possible
225            if let Ok(Some(artifacts)) = self.get_artifact_meta(&container_name).await {
226                metrics.artifacts = Some(artifacts);
227            }
228            if let Ok(Some(hashes)) = self.get_storage_hashes(&container_name).await {
229                metrics.storage_hashes = Some(hashes);
230            }
231            if let Ok(Some(reports)) = self.get_test_reports(&container_name).await {
232                metrics.test_reports = Some(reports);
233            }
234        }
235
236        // Cleanup volume and container
237        // Wait a bit for log collector (Alloy) to catch the final logs before we delete the container
238        sleep(Duration::from_secs(15)).await;
239        let _ = self.docker.remove_container(&container_name, None).await;
240
241        for vol in volumes_to_cleanup {
242            info!("Cleaning up volume: {}", vol);
243            let _ = self.docker.remove_volume(&vol, None).await;
244        }
245
246        let result = if exit_code == Some(0) {
247            info!("Container {} completed successfully", container_name);
248            ContainerState::Succeeded(metrics)
249        } else {
250            let error_msg = format!("Container exited with code {:?}", exit_code);
251            error!("Container {} failed: {}", container_name, error_msg);
252            ContainerState::Failed(error_msg, metrics)
253        };
254
255        Ok(DockerContainerMachine {
256            nats: self.nats.clone(),
257            docker: self.docker,
258            metadata: self.metadata,
259            state: state::Finished { result },
260        })
261    }
262
263    async fn get_artifact_meta(
264        &self,
265        container_name: &str,
266    ) -> Result<Option<HashMap<String, Value>>> {
267        let mut logs = self.docker.logs(
268            container_name,
269            Some(LogsOptions::<String> {
270                stdout: true,
271                stderr: true,
272                ..Default::default()
273            }),
274        );
275
276        while let Some(log_result) = logs.next().await {
277            if let Ok(output) = log_result {
278                let line = output.to_string();
279                if line.contains("Parked artifacts:") {
280                    if let Some(json_start) = line.find('{') {
281                        let json_part = &line[json_start..];
282                        if let Ok(meta) = serde_json::from_str(json_part) {
283                            return Ok(Some(meta));
284                        }
285                    }
286                }
287            }
288        }
289
290        Ok(None)
291    }
292
293    async fn get_storage_hashes(
294        &self,
295        container_name: &str,
296    ) -> Result<Option<HashMap<String, String>>> {
297        let mut logs = self.docker.logs(
298            container_name,
299            Some(LogsOptions::<String> {
300                stdout: true,
301                stderr: true,
302                ..Default::default()
303            }),
304        );
305
306        while let Some(log_result) = logs.next().await {
307            if let Ok(output) = log_result {
308                let line = output.to_string();
309                if line.contains("Parked storage hashes:") {
310                    if let Some(json_start) = line.find('{') {
311                        let json_part = &line[json_start..];
312                        if let Ok(hashes) = serde_json::from_str(json_part) {
313                            return Ok(Some(hashes));
314                        }
315                    }
316                }
317            }
318        }
319
320        Ok(None)
321    }
322
323    async fn get_test_reports(&self, container_name: &str) -> Result<Option<Value>> {
324        let mut logs = self.docker.logs(
325            container_name,
326            Some(LogsOptions::<String> {
327                stdout: true,
328                stderr: true,
329                ..Default::default()
330            }),
331        );
332
333        while let Some(log_result) = logs.next().await {
334            if let Ok(output) = log_result {
335                let line = output.to_string();
336                if line.contains("Collected test reports JSON:") {
337                    if let Some(json_start) = line.find('{') {
338                        let json_part = &line[json_start..];
339                        if let Ok(reports) = serde_json::from_str(json_part) {
340                            return Ok(Some(reports));
341                        }
342                    }
343                }
344            }
345        }
346
347        Ok(None)
348    }
349}