1use crate::container_machine::{state, DockerContainerMachine, StartResult};
2use anyhow::{Context, Result};
3use bollard::container::{
4 Config, CreateContainerOptions, StartContainerOptions, WaitContainerOptions,
5};
6use bollard::image::CreateImageOptions;
7use bollard::service::{HostConfig, Mount, MountTypeEnum};
8use bollard::volume::CreateVolumeOptions;
9use chrono::Utc;
10use cloudevents::EventBuilder;
11use futures::StreamExt;
12use std::path::{Component, Path, PathBuf};
13use std::time::Duration;
14use stormchaser_model::dsl::{CommonContainerSpec, StorageMount};
15use stormchaser_model::events::StepRunningEvent;
16use stormchaser_model::events::{EventSource, EventType, SchemaVersion, StepEventType};
17use stormchaser_model::nats::publish_cloudevent;
18use stormchaser_model::nats::NatsSubject;
19use stormchaser_model::step::StepStatus;
20use stormchaser_model::{RunId, StepInstanceId, APPLICATION_JSON};
21use tokio::{fs, time::sleep};
22use tracing::{error, info};
23use uuid::Uuid;
24
/// Arguments for one run of the `stormchaser-agent unpark` helper container.
///
/// Every string is borrowed from the caller for the duration of the call.
struct UnparkParams<'a> {
    /// Docker mount source: a volume name, or a host path when `is_bind_mount` is set.
    source_name: &'a str,
    /// Selects a bind mount (`true`) or a named-volume mount (`false`) for `source_name`.
    is_bind_mount: bool,
    /// Path inside the helper container at which `source_name` is mounted.
    volume_mount_path: &'a str,
    /// Value passed to the agent's `--destination` flag.
    destination_path: &'a str,
    /// Value passed to the agent's `--url` flag.
    get_url: &'a str,
    /// When set, `--no-extract` is appended to the agent command line.
    no_extract: bool,
    /// When present, `--mode <value>` is appended to the agent command line.
    mode: Option<&'a str>,
}
34
35impl DockerContainerMachine<state::Initialized> {
36 pub fn adopt(self, container_name: String) -> DockerContainerMachine<state::Running> {
38 info!("Adopting orphaned Docker container {}", container_name);
39 DockerContainerMachine {
40 nats: self.nats.clone(),
41 docker: self.docker,
42 metadata: self.metadata,
43 state: state::Running {
44 container_name,
45 dispatched_at: Utc::now(),
46 volumes_to_cleanup: Vec::new(),
47 storage_names: Vec::new(),
48 mounts: Vec::new(),
49 },
50 }
51 }
52
53 pub async fn clean_up(self, container_name: &str) -> Result<()> {
55 info!("Cleaning up orphaned Docker container {}", container_name);
56 let _ = self.docker.stop_container(container_name, None).await;
57 let _ = self.docker.remove_container(container_name, None).await;
58 Ok(())
59 }
60
61 pub async fn start(self) -> Result<StartResult> {
63 let container_name = format!(
64 "storm-{}-{}",
65 self.metadata.step_dsl.name.to_lowercase().replace('_', "-"),
66 &self.metadata.step_id.to_string()[..8]
67 );
68
69 let spec: CommonContainerSpec = serde_json::from_value(self.metadata.step_dsl.spec.clone())
70 .context("Failed to parse RunContainer spec as CommonContainerSpec")?;
71
72 let sfs_host_path = std::env::var("STORMCHASER_SFS_HOST_PATH").ok();
73 let (mounts, storage_names, volumes_to_cleanup) = self
74 .setup_storage_mounts(&spec, sfs_host_path.as_deref())
75 .await?;
76
77 info!("Pulling image {}", spec.image);
78 self.pull_image(&spec.image).await?;
79
80 let network_mode = self.get_network_mode().await;
81 let config =
82 self.build_container_config(&spec, mounts.clone(), network_mode, &storage_names)?;
83
84 info!("Creating container {}", container_name);
85 let dispatched_at = Utc::now();
86 self.docker
87 .create_container(
88 Some(CreateContainerOptions {
89 name: container_name.clone(),
90 ..Default::default()
91 }),
92 config,
93 )
94 .await?;
95
96 info!("Starting container {}", container_name);
97 self.docker
98 .start_container(&container_name, None::<StartContainerOptions<String>>)
99 .await?;
100
101 if let Some(nats) = &self.nats {
102 let running_event = StepRunningEvent {
103 run_id: RunId::new(self.metadata.run_id),
104 step_id: StepInstanceId::new(self.metadata.step_id),
105 event_type: EventType::Step(StepEventType::Running),
106 runner_id: None,
107 timestamp: chrono::Utc::now(),
108 };
109 let _ = publish_cloudevent(
110 &async_nats::jetstream::new(nats.clone()),
111 NatsSubject::StepRunning,
112 EventType::Step(StepEventType::Running),
113 EventSource::System,
114 serde_json::to_value(running_event).unwrap(),
115 Some(SchemaVersion::new("1.0".to_string())),
116 None,
117 )
118 .await;
119 }
120
121 Ok(StartResult::Running(DockerContainerMachine {
122 nats: self.nats.clone(),
123 docker: self.docker,
124 metadata: self.metadata,
125 state: state::Running {
126 container_name,
127 dispatched_at,
128 volumes_to_cleanup,
129 storage_names,
130 mounts,
131 },
132 }))
133 }
134
135 async fn unpark_storage(&self, params: UnparkParams<'_>) -> Result<()> {
136 let volume_name = params.source_name;
137 let agent_image = "stormchaser-agent:v1";
138 let unpark_container_name = format!("unpark-{}", Uuid::new_v4());
139
140 let mut cmd = vec![
141 "/usr/local/bin/stormchaser-agent".to_string(),
142 "unpark".to_string(),
143 "--url".to_string(),
144 params.get_url.to_string(),
145 "--destination".to_string(),
146 params.destination_path.to_string(),
147 ];
148 if params.no_extract {
149 cmd.push("--no-extract".to_string());
150 }
151 if let Some(m) = params.mode {
152 cmd.push("--mode".to_string());
153 cmd.push(m.to_string());
154 }
155
156 let config = Config {
157 image: Some(agent_image.to_string()),
158 cmd: Some(cmd),
159 host_config: Some(HostConfig {
160 mounts: Some(vec![Mount {
161 target: Some(params.volume_mount_path.to_string()),
162 source: Some(params.source_name.to_string()),
163 typ: Some(if params.is_bind_mount {
164 MountTypeEnum::BIND
165 } else {
166 MountTypeEnum::VOLUME
167 }),
168 ..Default::default()
169 }]),
170 network_mode: self.get_network_mode().await,
171 ..Default::default()
172 }),
173 ..Default::default()
174 };
175
176 self.docker
177 .create_container(
178 Some(CreateContainerOptions {
179 name: unpark_container_name.clone(),
180 ..Default::default()
181 }),
182 config,
183 )
184 .await?;
185
186 self.docker
187 .start_container(
188 &unpark_container_name,
189 None::<StartContainerOptions<String>>,
190 )
191 .await?;
192
193 let mut wait_stream = self.docker.wait_container(
194 &unpark_container_name,
195 Some(WaitContainerOptions {
196 condition: "not-running",
197 }),
198 );
199
200 if let Some(wait_result) = wait_stream.next().await {
201 match wait_result {
202 Ok(res) if res.status_code == 0 => {
203 info!("Unpark successful for {}", volume_name);
204 sleep(Duration::from_secs(15)).await;
206 if let Err(e) = self
207 .docker
208 .remove_container(&unpark_container_name, None)
209 .await
210 {
211 error!(
212 "Failed to remove unpark container {}: {:?}",
213 unpark_container_name, e
214 );
215 }
216 Ok(())
217 }
218 res => {
219 error!("Unpark failed for {}: {:?}", volume_name, res);
220 if let Err(e) = self
221 .docker
222 .remove_container(&unpark_container_name, None)
223 .await
224 {
225 error!(
226 "Failed to remove unpark container {}: {:?}",
227 unpark_container_name, e
228 );
229 }
230 Err(anyhow::anyhow!(
231 "Unpark failed for {}: {:?}",
232 volume_name,
233 res
234 ))
235 }
236 }
237 } else {
238 if let Err(e) = self
239 .docker
240 .remove_container(&unpark_container_name, None)
241 .await
242 {
243 error!(
244 "Failed to remove unpark container {}: {:?}",
245 unpark_container_name, e
246 );
247 }
248 Err(anyhow::anyhow!("Wait stream ended unexpectedly"))
249 }
250 }
251
252 async fn setup_single_mount(
253 &self,
254 mount: &StorageMount,
255 sfs_host_path: Option<&str>,
256 mounts: &mut Vec<Mount>,
257 storage_names: &mut Vec<String>,
258 volumes_to_cleanup: &mut Vec<String>,
259 ) -> Result<()> {
260 storage_names.push(mount.name.clone());
261
262 let (source_name, is_bind_mount) = if let Some(host_path) = &sfs_host_path {
263 let source_path = self.build_bind_mount_source(host_path, &mount.name).await?;
264 (source_path, true)
265 } else {
266 let volume_name = format!(
267 "sfs-{}-{}",
268 mount.name.to_lowercase().replace('_', "-"),
269 &self.metadata.run_id.to_string()[..8]
270 );
271
272 info!("Ensuring Docker volume exists: {}", volume_name);
273 self.docker
274 .create_volume(CreateVolumeOptions {
275 name: volume_name.clone(),
276 ..Default::default()
277 })
278 .await?;
279
280 volumes_to_cleanup.push(volume_name.clone());
281 (volume_name, false)
282 };
283
284 mounts.push(Mount {
285 target: Some(mount.mount_path.clone()),
286 source: Some(source_name.clone()),
287 typ: Some(if is_bind_mount {
288 MountTypeEnum::BIND
289 } else {
290 MountTypeEnum::VOLUME
291 }),
292 read_only: mount.read_only,
293 ..Default::default()
294 });
295
296 if let Some(storage_data) = &self.metadata.storage {
297 if let Some(urls) = storage_data.get(&mount.name) {
298 let has_state = urls.get("expected_hash").and_then(|h| h.as_str()).is_some();
299
300 if has_state {
301 if is_bind_mount {
302 info!(
303 "Skipping unpark for storage '{}' due to STORMCHASER_SFS_HOST_PATH",
304 mount.name
305 );
306 } else if let Some(get_url) = urls.get("get_url").and_then(|u| u.as_str()) {
307 info!(
308 "Unparking storage '{}' (resume state) for volume '{}'",
309 mount.name, source_name
310 );
311 if let Some(nats) = &self.nats {
312 let unpacking_event = serde_json::json!({
313 "run_id": self.metadata.run_id,
314 "step_id": self.metadata.step_id,
315 "status": StepStatus::UnpackingSfs,
316 "timestamp": chrono::Utc::now(),
317 });
318 if let Ok(ce) = cloudevents::EventBuilderV10::new()
319 .id(uuid::Uuid::new_v4().to_string())
320 .ty("stormchaser.v1.step.unpacking_sfs")
321 .source("/stormchaser/runner")
322 .time(chrono::Utc::now())
323 .data(APPLICATION_JSON, unpacking_event)
324 .build()
325 {
326 if let Ok(payload_bytes) = serde_json::to_vec(&ce) {
327 let _ = nats
328 .publish(
329 "stormchaser.v1.step.unpacking_sfs",
330 payload_bytes.into(),
331 )
332 .await;
333 }
334 }
335 }
336 self.unpark_storage(UnparkParams {
337 source_name: &source_name,
338 volume_mount_path: &mount.mount_path,
339 destination_path: &mount.mount_path,
340 get_url,
341 no_extract: false,
342 mode: None,
343 is_bind_mount,
344 })
345 .await?;
346 }
347 } else if let Some(provision) = urls.get("provision").and_then(|p| p.as_array()) {
348 for prov in provision {
349 if let (Some(url), Some(dest)) = (
350 prov.get("url").and_then(|u| u.as_str()),
351 prov.get("destination").and_then(|d| d.as_str()),
352 ) {
353 info!(
354 "Provisioning storage '{}' from URL '{}' into destination '{}'",
355 mount.name, url, dest
356 );
357 if let Some(nats) = &self.nats {
358 let unpacking_event = serde_json::json!({
359 "run_id": self.metadata.run_id,
360 "step_id": self.metadata.step_id,
361 "status": StepStatus::UnpackingSfs,
362 "timestamp": chrono::Utc::now(),
363 });
364 if let Ok(ce) = cloudevents::EventBuilderV10::new()
365 .id(uuid::Uuid::new_v4().to_string())
366 .ty("stormchaser.v1.step.unpacking_sfs")
367 .source("/stormchaser/runner")
368 .time(chrono::Utc::now())
369 .data(APPLICATION_JSON, unpacking_event)
370 .build()
371 {
372 if let Ok(payload_bytes) = serde_json::to_vec(&ce) {
373 let _ = nats
374 .publish(
375 "stormchaser.v1.step.unpacking_sfs",
376 payload_bytes.into(),
377 )
378 .await;
379 }
380 }
381 }
382 let mut full_dest = PathBuf::from(&mount.mount_path);
383 if dest != "/" && !dest.is_empty() {
384 let relative_dest = dest.trim_start_matches('/');
385 for component in Path::new(relative_dest).components() {
387 match component {
388 std::path::Component::ParentDir => {
389 anyhow::bail!(
390 "Provision destination '{}' contains illegal path traversal (..)",
391 dest
392 );
393 }
394 std::path::Component::Prefix(_) => {
395 anyhow::bail!(
396 "Provision destination '{}' contains an illegal absolute path prefix",
397 dest
398 );
399 }
400 _ => {}
401 }
402 }
403 full_dest.push(relative_dest);
404 }
405
406 let resource_type = prov.get("resource_type").and_then(|r| r.as_str());
407 let is_extract = resource_type != Some("artifact");
408 let prov_mode =
409 prov.get("mode").and_then(|m| m.as_str()).map(str::to_owned);
410 info!(
411 "Provisioning resource_type: {:?}, is_extract: {}",
412 resource_type, is_extract
413 );
414
415 let full_dest_str = full_dest.to_string_lossy().into_owned();
416 self.unpark_storage(UnparkParams {
417 source_name: &source_name,
418 volume_mount_path: &mount.mount_path,
419 destination_path: &full_dest_str,
420 get_url: url,
421 no_extract: !is_extract,
422 mode: prov_mode.as_deref(),
423 is_bind_mount,
424 })
425 .await?;
426 }
427 }
428 }
429 }
430 }
431
432 Ok(())
433 }
434
435 fn validate_bind_mount_name(mount_name: &str) -> Result<()> {
436 if mount_name.is_empty() || mount_name.contains('/') || mount_name.contains('\\') {
437 anyhow::bail!(
438 "Storage mount name '{}' is not valid for bind mounts",
439 mount_name
440 );
441 }
442
443 let mut components = Path::new(mount_name).components();
444 match (components.next(), components.next()) {
445 (Some(Component::Normal(_)), None) => Ok(()),
446 _ => anyhow::bail!(
447 "Storage mount name '{}' contains illegal path components",
448 mount_name
449 ),
450 }
451 }
452
453 async fn build_bind_mount_source(&self, host_path: &str, mount_name: &str) -> Result<String> {
454 Self::validate_bind_mount_name(mount_name)?;
455
456 let run_dir = PathBuf::from(host_path).join(self.metadata.run_id.to_string());
457 let source_path = run_dir.join(mount_name);
458 if !source_path.starts_with(&run_dir) {
459 anyhow::bail!(
460 "Storage mount name '{}' escapes bind mount root '{}'",
461 mount_name,
462 run_dir.display()
463 );
464 }
465
466 fs::create_dir_all(&source_path).await.with_context(|| {
467 format!(
468 "Failed to create bind mount directory '{}' for storage '{}'",
469 source_path.display(),
470 mount_name
471 )
472 })?;
473
474 Ok(source_path.to_string_lossy().into_owned())
475 }
476
477 async fn setup_storage_mounts(
478 &self,
479 spec: &CommonContainerSpec,
480 sfs_host_path: Option<&str>,
481 ) -> Result<(Vec<Mount>, Vec<String>, Vec<String>)> {
482 let mut mounts = Vec::new();
483 let mut storage_names = Vec::new();
484 let mut volumes_to_cleanup = Vec::new();
485
486 if let Some(storage_mounts) = &spec.storage_mounts {
487 for mount in storage_mounts {
488 self.setup_single_mount(
489 mount,
490 sfs_host_path,
491 &mut mounts,
492 &mut storage_names,
493 &mut volumes_to_cleanup,
494 )
495 .await?;
496 }
497 }
498 Ok((mounts, storage_names, volumes_to_cleanup))
499 }
500 async fn pull_image(&self, image: &str) -> Result<()> {
501 let (from_image, tag) = if image.contains('@') {
506 (image, "")
508 } else {
509 match image.rsplit_once(':') {
510 Some((repo, t)) if !t.contains('/') => (repo, t),
513 _ => (image, "latest"),
514 }
515 };
516
517 let mut pull_stream = self.docker.create_image(
518 Some(CreateImageOptions {
519 from_image: from_image.to_string(),
520 tag: tag.to_string(),
521 ..Default::default()
522 }),
523 None,
524 None,
525 );
526 while let Some(pull_result) = pull_stream.next().await {
527 if let Err(e) = pull_result {
528 error!("Error pulling image {}: {:?}", image, e);
529 anyhow::bail!("Failed to pull image {}: {}", image, e);
530 }
531 }
532 Ok(())
533 }
534}
535
536#[cfg(test)]
537mod tests {
538 use super::*;
539 use crate::container_machine::ContainerMetadata;
540 use bollard::Docker;
541 use std::collections::HashMap;
542 use stormchaser_model::dsl::Step;
543 use tempfile::tempdir;
544
545 fn create_test_metadata_with_mounts() -> ContainerMetadata {
546 create_test_metadata_with_mount_name("workspace")
547 }
548
549 fn create_test_metadata_with_mount_name(mount_name: &str) -> ContainerMetadata {
550 let spec = serde_json::json!({
551 "image": "alpine:latest",
552 "command": ["echo", "hello"],
553 "storage_mounts": [
554 {
555 "name": mount_name,
556 "mount_path": "/workspace"
557 }
558 ]
559 });
560
561 ContainerMetadata {
562 run_id: Uuid::new_v4(),
563 step_id: Uuid::new_v4(),
564 step_dsl: Step {
565 name: "test_step".to_string(),
566 r#type: "RunContainer".to_string(),
567 spec,
568 condition: None,
569 params: HashMap::new(),
570 strategy: None,
571 aggregation: vec![],
572 iterate: None,
573 iterate_as: None,
574 steps: None,
575 next: vec![],
576 on_failure: None,
577 retry: None,
578 timeout: None,
579 allow_failure: None,
580 start_marker: None,
581 end_marker: None,
582 outputs: vec![],
583 reports: vec![],
584 artifacts: None,
585 },
586 received_at: chrono::Utc::now(),
587 encryption_key: None,
588 storage: None,
589 test_report_urls: None,
590 }
591 }
592
593 #[tokio::test]
594 async fn test_setup_storage_mounts_named_volume() {
595 let docker = Docker::connect_with_local_defaults().unwrap();
596 let metadata = create_test_metadata_with_mounts();
597 let run_id = metadata.run_id;
598 let machine = DockerContainerMachine::new(docker.clone(), metadata, None);
599
600 let spec: CommonContainerSpec =
601 serde_json::from_value(machine.metadata.step_dsl.spec.clone()).unwrap();
602
603 let (mounts, storage_names, volumes_to_cleanup) =
604 machine.setup_storage_mounts(&spec, None).await.unwrap();
605
606 assert_eq!(mounts.len(), 1);
607 assert_eq!(storage_names.len(), 1);
608 assert_eq!(volumes_to_cleanup.len(), 1);
609
610 assert_eq!(storage_names[0], "workspace");
611
612 let mount = &mounts[0];
613 assert_eq!(mount.target.as_deref(), Some("/workspace"));
614 assert_eq!(mount.typ, Some(MountTypeEnum::VOLUME));
615
616 let source = mount.source.as_ref().unwrap();
617 assert!(source.starts_with("sfs-workspace-"));
618 assert!(source.contains(&run_id.to_string()[..8]));
619
620 for vol in volumes_to_cleanup {
622 let _ = docker.remove_volume(&vol, None).await;
623 }
624 }
625
626 #[tokio::test]
627 async fn test_setup_storage_mounts_host_bind() {
628 let docker = Docker::connect_with_local_defaults().unwrap();
629 let metadata = create_test_metadata_with_mounts();
630 let run_id = metadata.run_id;
631 let machine = DockerContainerMachine::new(docker.clone(), metadata, None);
632
633 let spec: CommonContainerSpec =
634 serde_json::from_value(machine.metadata.step_dsl.spec.clone()).unwrap();
635
636 let host_path = tempdir().unwrap();
637 let sfs_host_path = Some(host_path.path().to_str().unwrap());
638 let (mounts, storage_names, volumes_to_cleanup) = machine
639 .setup_storage_mounts(&spec, sfs_host_path)
640 .await
641 .unwrap();
642
643 assert_eq!(mounts.len(), 1);
644 assert_eq!(storage_names.len(), 1);
645 assert_eq!(volumes_to_cleanup.len(), 0); assert_eq!(storage_names[0], "workspace");
648
649 let mount = &mounts[0];
650 assert_eq!(mount.target.as_deref(), Some("/workspace"));
651 assert_eq!(mount.typ, Some(MountTypeEnum::BIND));
652
653 let expected_source = host_path.path().join(run_id.to_string()).join("workspace");
654 assert!(expected_source.is_dir());
655 assert_eq!(mount.source.as_deref(), expected_source.to_str());
656 }
657
658 #[tokio::test]
659 async fn test_setup_storage_mounts_host_bind_rejects_invalid_mount_name() {
660 let docker = Docker::connect_with_local_defaults().unwrap();
661 let metadata = create_test_metadata_with_mount_name("../escape");
662 let machine = DockerContainerMachine::new(docker, metadata, None);
663 let spec: CommonContainerSpec =
664 serde_json::from_value(machine.metadata.step_dsl.spec.clone()).unwrap();
665 let host_path = tempdir().unwrap();
666
667 let err = machine
668 .setup_storage_mounts(&spec, host_path.path().to_str())
669 .await
670 .unwrap_err();
671
672 assert!(
673 err.to_string().contains("Storage mount name '../escape'"),
674 "unexpected error: {err}"
675 );
676 }
677}