1use crate::container_machine::{state, ContainerMetrics, ContainerState, DockerContainerMachine};
2use anyhow::Result;
3use bollard::container::{
4 Config, CreateContainerOptions, LogsOptions, StartContainerOptions, WaitContainerOptions,
5};
6use bollard::service::HostConfig;
7use chrono::Utc;
8use cloudevents::EventBuilder;
9use futures::StreamExt;
10use serde_json::Value;
11use std::collections::HashMap;
12use std::time::Duration;
13use stormchaser_model::dsl::CommonContainerSpec;
14use stormchaser_model::step::StepStatus;
15use stormchaser_model::APPLICATION_JSON;
16use tokio::time::sleep;
17use tracing::{error, info};
18use uuid::Uuid;
19
20impl DockerContainerMachine<state::Running> {
21 pub async fn wait(self) -> Result<DockerContainerMachine<state::Finished>> {
23 let container_name = self.state.container_name.clone();
24 let dispatched_at = self.state.dispatched_at;
25 let volumes_to_cleanup = self.state.volumes_to_cleanup.clone();
26 let storage_names = self.state.storage_names.clone();
27 let mounts = self.state.mounts.clone();
28
29 let mut wait_stream = self.docker.wait_container(
30 &container_name,
31 Some(WaitContainerOptions {
32 condition: "not-running",
33 }),
34 );
35
36 let exit_code = if let Some(wait_result) = wait_stream.next().await {
37 match wait_result {
38 Ok(response) => Some(response.status_code),
39 Err(bollard::errors::Error::DockerContainerWaitError { error: _, code }) => {
40 Some(code)
41 }
42 Err(e) => {
43 error!("Error waiting for container {}: {:?}", container_name, e);
44 None
45 }
46 }
47 } else {
48 None
49 };
50
51 let latency_ms = (dispatched_at - self.metadata.received_at)
52 .num_milliseconds()
53 .max(0) as u64;
54 let duration_ms = (Utc::now() - dispatched_at).num_milliseconds().max(0) as u64;
55
56 let mut metrics = ContainerMetrics {
57 exit_code,
58 duration_ms,
59 latency_ms,
60 ..Default::default()
61 };
62
63 let spec: Option<CommonContainerSpec> =
65 serde_json::from_value(self.metadata.step_dsl.spec.clone()).ok();
66
67 let (artifacts, storage_hashes, test_reports) = self
68 .collect_metrics(
69 &container_name,
70 &storage_names,
71 mounts,
72 spec.as_ref(),
73 exit_code,
74 )
75 .await?;
76
77 if let Some(a) = artifacts {
78 metrics.artifacts = Some(a);
79 }
80 if let Some(h) = storage_hashes {
81 metrics.storage_hashes = Some(h);
82 }
83 if let Some(r) = test_reports {
84 metrics.test_reports = Some(r);
85 }
86
87 sleep(Duration::from_secs(15)).await;
90 let _ = self.docker.remove_container(&container_name, None).await;
91
92 for vol in volumes_to_cleanup {
93 info!("Cleaning up volume: {}", vol);
94 let _ = self.docker.remove_volume(&vol, None).await;
95 }
96
97 let result = if exit_code == Some(0) {
98 info!("Container {} completed successfully", container_name);
99 ContainerState::Succeeded(metrics)
100 } else {
101 let error_msg = format!("Container exited with code {:?}", exit_code);
102 error!("Container {} failed: {}", container_name, error_msg);
103 ContainerState::Failed(error_msg, metrics)
104 };
105
106 Ok(DockerContainerMachine {
107 nats: self.nats.clone(),
108 docker: self.docker,
109 metadata: self.metadata,
110 state: state::Finished { result },
111 })
112 }
113
114 async fn collect_metrics(
115 &self,
116 container_name: &str,
117 storage_names: &[String],
118 mounts: Vec<bollard::service::Mount>,
119 spec: Option<&CommonContainerSpec>,
120 exit_code: Option<i64>,
121 ) -> Result<(
122 Option<HashMap<String, Value>>,
123 Option<HashMap<String, String>>,
124 Option<Value>,
125 )> {
126 let mut artifacts_out = None;
127 let mut hashes_out = None;
128 let mut reports_out = None;
129
130 if !storage_names.is_empty() || !self.metadata.step_dsl.reports.is_empty() {
131 let (a, h, r) = self
132 .run_parking_agent(storage_names, mounts, spec, exit_code)
133 .await?;
134 artifacts_out = a;
135 hashes_out = h;
136 reports_out = r;
137 } else {
138 if let Ok(Some(artifacts)) = self.get_artifact_meta(container_name).await {
140 artifacts_out = Some(artifacts);
141 }
142 if let Ok(Some(hashes)) = self.get_storage_hashes(container_name).await {
143 hashes_out = Some(hashes);
144 }
145 if let Ok(Some(reports)) = self.get_test_reports(container_name).await {
146 reports_out = Some(reports);
147 }
148 }
149
150 Ok((artifacts_out, hashes_out, reports_out))
151 }
152
153 async fn run_parking_agent(
154 &self,
155 storage_names: &[String],
156 mounts: Vec<bollard::service::Mount>,
157 spec: Option<&CommonContainerSpec>,
158 exit_code: Option<i64>,
159 ) -> Result<(
160 Option<HashMap<String, Value>>,
161 Option<HashMap<String, String>>,
162 Option<Value>,
163 )> {
164 let mut artifacts_out = None;
165 let mut hashes_out = None;
166 let mut reports_out = None;
167
168 let agent_image = "stormchaser-agent:v1";
169 let park_container_name = format!("park-{}", Uuid::new_v4());
170
171 let sfs_host_path = std::env::var("STORMCHASER_SFS_HOST_PATH").ok();
172 let (parking_urls, mount_paths, artifact_urls) =
173 self.build_parking_payloads(storage_names, spec, sfs_host_path.as_deref());
174
175 let mut agent_args = vec![
176 "run".to_string(),
177 "--parking-urls".to_string(),
178 serde_json::to_string(&parking_urls)?,
179 "--mount-paths".to_string(),
180 serde_json::to_string(&mount_paths)?,
181 ];
182
183 if !artifact_urls.is_empty() {
184 agent_args.push("--artifact-urls".to_string());
185 agent_args.push(serde_json::to_string(&artifact_urls)?);
186 }
187
188 if let Some(reports) = &self.metadata.test_report_urls {
189 if !reports.is_empty() {
190 agent_args.push("--report-urls".to_string());
191 agent_args.push(serde_json::to_string(reports)?);
192 }
193 }
194
195 if !self.metadata.step_dsl.reports.is_empty() {
196 agent_args.push("--test-reports".to_string());
197 agent_args.push(serde_json::to_string(&self.metadata.step_dsl.reports)?);
198 }
199
200 if exit_code != Some(0) {
201 }
204
205 agent_args.push("--".to_string());
206 agent_args.push("/bin/true".to_string());
207
208 let agent_config = Config {
209 image: Some(agent_image.to_string()),
210 cmd: Some(agent_args),
211 entrypoint: Some(vec!["/usr/local/bin/stormchaser-agent".to_string()]),
212 host_config: Some(HostConfig {
213 mounts: Some(mounts),
214 network_mode: self.get_network_mode().await,
215 ..Default::default()
216 }),
217 ..Default::default()
218 };
219
220 info!("Running agent for parking/reports: {}", park_container_name);
221 if let Some(nats) = &self.nats {
222 let packing_event = serde_json::json!({
223 "run_id": self.metadata.run_id,
224 "step_id": self.metadata.step_id,
225 "status": StepStatus::PackingSfs,
226 "timestamp": chrono::Utc::now(),
227 });
228 if let Ok(ce) = cloudevents::EventBuilderV10::new()
229 .id(uuid::Uuid::new_v4().to_string())
230 .ty("stormchaser.v1.step.packing_sfs")
231 .source("/stormchaser/runner")
232 .time(chrono::Utc::now())
233 .data(APPLICATION_JSON, packing_event)
234 .build()
235 {
236 if let Ok(payload_bytes) = serde_json::to_vec(&ce) {
237 let _ = nats
238 .publish("stormchaser.v1.step.packing_sfs", payload_bytes.into())
239 .await;
240 }
241 }
242 }
243
244 self.docker
245 .create_container(
246 Some(CreateContainerOptions {
247 name: park_container_name.clone(),
248 ..Default::default()
249 }),
250 agent_config,
251 )
252 .await?;
253
254 self.docker
255 .start_container(&park_container_name, None::<StartContainerOptions<String>>)
256 .await?;
257
258 let mut agent_wait_stream = self.docker.wait_container(
259 &park_container_name,
260 Some(WaitContainerOptions {
261 condition: "not-running",
262 }),
263 );
264
265 let _ = agent_wait_stream.next().await;
266
267 if let Ok(Some(artifacts)) = self.get_artifact_meta(&park_container_name).await {
268 artifacts_out = Some(artifacts);
269 }
270 if let Ok(Some(hashes)) = self.get_storage_hashes(&park_container_name).await {
271 hashes_out = Some(hashes);
272 }
273 if let Ok(Some(reports)) = self.get_test_reports(&park_container_name).await {
274 reports_out = Some(reports);
275 }
276
277 sleep(Duration::from_secs(15)).await;
279 let _ = self
280 .docker
281 .remove_container(&park_container_name, None)
282 .await;
283
284 Ok((artifacts_out, hashes_out, reports_out))
285 }
286 async fn get_artifact_meta(
287 &self,
288 container_name: &str,
289 ) -> Result<Option<HashMap<String, Value>>> {
290 let mut logs = self.docker.logs(
291 container_name,
292 Some(LogsOptions::<String> {
293 stdout: true,
294 stderr: true,
295 ..Default::default()
296 }),
297 );
298
299 while let Some(log_result) = logs.next().await {
300 if let Ok(output) = log_result {
301 let line = output.to_string();
302 if line.contains("Parked artifacts:") {
303 if let Some(json_start) = line.find('{') {
304 let json_part = &line[json_start..];
305 if let Ok(meta) = serde_json::from_str(json_part) {
306 return Ok(Some(meta));
307 }
308 }
309 }
310 }
311 }
312
313 Ok(None)
314 }
315
316 async fn get_storage_hashes(
317 &self,
318 container_name: &str,
319 ) -> Result<Option<HashMap<String, String>>> {
320 let mut logs = self.docker.logs(
321 container_name,
322 Some(LogsOptions::<String> {
323 stdout: true,
324 stderr: true,
325 ..Default::default()
326 }),
327 );
328
329 while let Some(log_result) = logs.next().await {
330 if let Ok(output) = log_result {
331 let line = output.to_string();
332 if line.contains("Parked storage hashes:") {
333 if let Some(json_start) = line.find('{') {
334 let json_part = &line[json_start..];
335 if let Ok(hashes) = serde_json::from_str(json_part) {
336 return Ok(Some(hashes));
337 }
338 }
339 }
340 }
341 }
342
343 Ok(None)
344 }
345
346 async fn get_test_reports(&self, container_name: &str) -> Result<Option<Value>> {
347 let mut logs = self.docker.logs(
348 container_name,
349 Some(LogsOptions::<String> {
350 stdout: true,
351 stderr: true,
352 ..Default::default()
353 }),
354 );
355
356 while let Some(log_result) = logs.next().await {
357 if let Ok(output) = log_result {
358 let line = output.to_string();
359 if line.contains("Collected test reports JSON:") {
360 if let Some(json_start) = line.find('{') {
361 let json_part = &line[json_start..];
362 if let Ok(reports) = serde_json::from_str(json_part) {
363 return Ok(Some(reports));
364 }
365 }
366 }
367 }
368 }
369
370 Ok(None)
371 }
372
373 fn build_parking_payloads(
374 &self,
375 storage_names: &[String],
376 spec: Option<&CommonContainerSpec>,
377 sfs_host_path: Option<&str>,
378 ) -> (
379 HashMap<String, Value>,
380 HashMap<String, String>,
381 HashMap<String, Value>,
382 ) {
383 let mut parking_urls = HashMap::new();
384 let mut mount_paths = HashMap::new();
385 let mut artifact_urls = HashMap::new();
386
387 if let Some(storage_data) = &self.metadata.storage {
388 for name in storage_names {
389 if let Some(urls) = storage_data.get(name) {
390 let mut cloned_urls = urls.clone();
391 if sfs_host_path.is_some() {
392 if let Some(obj) = cloned_urls.as_object_mut() {
393 obj.remove("put_url");
394 }
395 }
396 parking_urls.insert(name.clone(), cloned_urls);
397 if let Some(artifacts) = urls.get("artifacts").and_then(|a| a.as_object()) {
398 let allowed_artifacts = self.metadata.step_dsl.artifacts.as_ref();
399 for (art_name, art_val) in artifacts {
400 if let Some(allowed) = allowed_artifacts {
401 if !allowed.contains(art_name) {
402 continue;
403 }
404 } else {
405 continue;
407 }
408
409 let mut cloned_art = art_val.clone();
410 if let Some(m) = spec
411 .as_ref()
412 .and_then(|s| s.storage_mounts.as_ref())
413 .and_then(|ms| ms.iter().find(|m| &m.name == name))
414 {
415 if let Some(p) = art_val.get("path").and_then(|p| p.as_str()) {
416 let abs_path = std::path::Path::new(&m.mount_path).join(p);
417 cloned_art["path"] =
418 Value::String(abs_path.to_string_lossy().to_string());
419 }
420 }
421 artifact_urls.insert(art_name.clone(), cloned_art);
422 }
423 }
424 }
425 if let Some(m) = spec
426 .as_ref()
427 .and_then(|s| s.storage_mounts.as_ref())
428 .and_then(|ms| ms.iter().find(|m| &m.name == name))
429 {
430 mount_paths.insert(name.clone(), m.mount_path.clone());
431 }
432 }
433 }
434
435 (parking_urls, mount_paths, artifact_urls)
436 }
437}
438
#[cfg(test)]
mod tests {
    use super::*;
    use crate::container_machine::ContainerMetadata;
    use bollard::Docker;
    use stormchaser_model::dsl::Step;

    /// Metadata for a single-container step that mounts one storage named
    /// `workspace` at `/workspace` and has both a get and a put URL for it.
    fn create_test_metadata() -> ContainerMetadata {
        let mut storage = HashMap::new();
        storage.insert(
            "workspace".to_string(),
            serde_json::json!({
                "get_url": "http://s3/get",
                "put_url": "http://s3/put"
            }),
        );

        let step_dsl = Step {
            name: "test_step".to_string(),
            r#type: "RunContainer".to_string(),
            spec: serde_json::json!({
                "image": "alpine:latest",
                "command": ["echo", "hello"],
                "storage_mounts": [
                    {
                        "name": "workspace",
                        "mount_path": "/workspace"
                    }
                ]
            }),
            condition: None,
            params: HashMap::new(),
            strategy: None,
            aggregation: vec![],
            iterate: None,
            iterate_as: None,
            steps: None,
            next: vec![],
            on_failure: None,
            retry: None,
            timeout: None,
            allow_failure: None,
            start_marker: None,
            end_marker: None,
            outputs: vec![],
            reports: vec![],
            artifacts: None,
        };

        ContainerMetadata {
            run_id: Uuid::new_v4(),
            step_id: Uuid::new_v4(),
            step_dsl,
            received_at: chrono::Utc::now(),
            encryption_key: None,
            storage: Some(storage),
            test_report_urls: None,
        }
    }

    /// Builds a running machine over the test metadata and returns the
    /// parking URLs and mount paths computed for the `workspace` storage.
    fn workspace_payloads(
        sfs_host_path: Option<&str>,
    ) -> (HashMap<String, Value>, HashMap<String, String>) {
        let docker = Docker::connect_with_local_defaults().unwrap();
        let init_machine =
            DockerContainerMachine::new(docker.clone(), create_test_metadata(), None);
        let machine = init_machine.adopt("test".to_string());

        let spec: CommonContainerSpec =
            serde_json::from_value(machine.metadata.step_dsl.spec.clone()).unwrap();
        let storage_names = vec!["workspace".to_string()];

        let (parking_urls, mount_paths, _) =
            machine.build_parking_payloads(&storage_names, Some(&spec), sfs_host_path);
        (parking_urls, mount_paths)
    }

    /// Without a host path the storage URLs are passed through untouched.
    #[tokio::test]
    async fn test_build_parking_payloads_named_volume() {
        let (parking_urls, mount_paths) = workspace_payloads(None);

        assert_eq!(parking_urls.len(), 1);
        assert_eq!(mount_paths.len(), 1);
        assert_eq!(mount_paths.get("workspace").unwrap(), "/workspace");

        let urls = parking_urls.get("workspace").unwrap();
        assert_eq!(
            urls.get("put_url").and_then(|v| v.as_str()),
            Some("http://s3/put")
        );
        assert_eq!(
            urls.get("get_url").and_then(|v| v.as_str()),
            Some("http://s3/get")
        );
    }

    /// With a host path set, `put_url` is dropped while `get_url` is kept.
    #[tokio::test]
    async fn test_build_parking_payloads_host_bind() {
        let (parking_urls, mount_paths) = workspace_payloads(Some("/tmp/stormchaser/sfs"));

        assert_eq!(parking_urls.len(), 1);
        assert_eq!(mount_paths.len(), 1);
        assert_eq!(mount_paths.get("workspace").unwrap(), "/workspace");

        let urls = parking_urls.get("workspace").unwrap();
        assert_eq!(
            urls.get("get_url").and_then(|v| v.as_str()),
            Some("http://s3/get")
        );
        assert_eq!(urls.get("put_url"), None);
    }

    /// Compile-only check that `run_parking_agent` exists on the running state.
    #[test]
    #[ignore]
    fn test_run_parking_agent_compiles() {
        let _f = DockerContainerMachine::<state::Running>::run_parking_agent;
    }
}