Skip to main content

stormchaser_runner_docker/container_machine/transitions/
initialized.rs

1use crate::container_machine::{state, DockerContainerMachine, StartResult};
2use anyhow::{Context, Result};
3use bollard::container::{
4    Config, CreateContainerOptions, StartContainerOptions, WaitContainerOptions,
5};
6use bollard::image::CreateImageOptions;
7use bollard::service::{HostConfig, Mount, MountTypeEnum};
8use bollard::volume::CreateVolumeOptions;
9use chrono::Utc;
10use cloudevents::EventBuilder;
11use futures::StreamExt;
12use std::path::{Component, Path, PathBuf};
13use std::time::Duration;
14use stormchaser_model::dsl::{CommonContainerSpec, StorageMount};
15use stormchaser_model::events::StepRunningEvent;
16use stormchaser_model::events::{EventSource, EventType, SchemaVersion, StepEventType};
17use stormchaser_model::nats::publish_cloudevent;
18use stormchaser_model::nats::NatsSubject;
19use stormchaser_model::step::StepStatus;
20use stormchaser_model::{RunId, StepInstanceId, APPLICATION_JSON};
21use tokio::{fs, time::sleep};
22use tracing::{error, info};
23use uuid::Uuid;
24
/// Arguments for a single run of the storage-unpark helper container.
///
/// Every field is either borrowed or `Copy`, so the struct itself is cheap to copy.
// NOTE(review): the derived `Debug` prints `get_url` verbatim; if that is a pre-signed URL,
// avoid logging this struct where logs reach shared sinks.
#[derive(Debug, Clone, Copy)]
struct UnparkParams<'a> {
    /// Docker volume name (or host path when `is_bind_mount` is set) mounted into the helper.
    source_name: &'a str,
    /// Path inside the helper container at which `source_name` is mounted.
    volume_mount_path: &'a str,
    /// Path inside the helper container handed to the agent as `--destination`.
    destination_path: &'a str,
    /// URL handed to the agent as `--url`.
    get_url: &'a str,
    /// When true the agent is invoked with `--no-extract`.
    no_extract: bool,
    /// Optional value forwarded to the agent as `--mode`.
    mode: Option<&'a str>,
    /// Mount `source_name` as a bind mount rather than a named volume.
    is_bind_mount: bool,
}
34
35impl DockerContainerMachine<state::Initialized> {
36    /// Adopts an already-running container by name, transitioning the state machine to `Running`.
37    pub fn adopt(self, container_name: String) -> DockerContainerMachine<state::Running> {
38        info!("Adopting orphaned Docker container {}", container_name);
39        DockerContainerMachine {
40            nats: self.nats.clone(),
41            docker: self.docker,
42            metadata: self.metadata,
43            state: state::Running {
44                container_name,
45                dispatched_at: Utc::now(),
46                volumes_to_cleanup: Vec::new(),
47                storage_names: Vec::new(),
48                mounts: Vec::new(),
49            },
50        }
51    }
52
53    /// Cleans up an orphaned container without running it through the state machine.
54    pub async fn clean_up(self, container_name: &str) -> Result<()> {
55        info!("Cleaning up orphaned Docker container {}", container_name);
56        let _ = self.docker.stop_container(container_name, None).await;
57        let _ = self.docker.remove_container(container_name, None).await;
58        Ok(())
59    }
60
61    /// Starts a new Docker container, pulling images and unparking storage as necessary.
62    pub async fn start(self) -> Result<StartResult> {
63        let container_name = format!(
64            "storm-{}-{}",
65            self.metadata.step_dsl.name.to_lowercase().replace('_', "-"),
66            &self.metadata.step_id.to_string()[..8]
67        );
68
69        let spec: CommonContainerSpec = serde_json::from_value(self.metadata.step_dsl.spec.clone())
70            .context("Failed to parse RunContainer spec as CommonContainerSpec")?;
71
72        let sfs_host_path = std::env::var("STORMCHASER_SFS_HOST_PATH").ok();
73        let (mounts, storage_names, volumes_to_cleanup) = self
74            .setup_storage_mounts(&spec, sfs_host_path.as_deref())
75            .await?;
76
77        info!("Pulling image {}", spec.image);
78        self.pull_image(&spec.image).await?;
79
80        let network_mode = self.get_network_mode().await;
81        let config =
82            self.build_container_config(&spec, mounts.clone(), network_mode, &storage_names)?;
83
84        info!("Creating container {}", container_name);
85        let dispatched_at = Utc::now();
86        self.docker
87            .create_container(
88                Some(CreateContainerOptions {
89                    name: container_name.clone(),
90                    ..Default::default()
91                }),
92                config,
93            )
94            .await?;
95
96        info!("Starting container {}", container_name);
97        self.docker
98            .start_container(&container_name, None::<StartContainerOptions<String>>)
99            .await?;
100
101        if let Some(nats) = &self.nats {
102            let running_event = StepRunningEvent {
103                run_id: RunId::new(self.metadata.run_id),
104                step_id: StepInstanceId::new(self.metadata.step_id),
105                event_type: EventType::Step(StepEventType::Running),
106                runner_id: None,
107                timestamp: chrono::Utc::now(),
108            };
109            let _ = publish_cloudevent(
110                &async_nats::jetstream::new(nats.clone()),
111                NatsSubject::StepRunning,
112                EventType::Step(StepEventType::Running),
113                EventSource::System,
114                serde_json::to_value(running_event).unwrap(),
115                Some(SchemaVersion::new("1.0".to_string())),
116                None,
117            )
118            .await;
119        }
120
121        Ok(StartResult::Running(DockerContainerMachine {
122            nats: self.nats.clone(),
123            docker: self.docker,
124            metadata: self.metadata,
125            state: state::Running {
126                container_name,
127                dispatched_at,
128                volumes_to_cleanup,
129                storage_names,
130                mounts,
131            },
132        }))
133    }
134
135    async fn unpark_storage(&self, params: UnparkParams<'_>) -> Result<()> {
136        let volume_name = params.source_name;
137        let agent_image = "stormchaser-agent:v1";
138        let unpark_container_name = format!("unpark-{}", Uuid::new_v4());
139
140        let mut cmd = vec![
141            "/usr/local/bin/stormchaser-agent".to_string(),
142            "unpark".to_string(),
143            "--url".to_string(),
144            params.get_url.to_string(),
145            "--destination".to_string(),
146            params.destination_path.to_string(),
147        ];
148        if params.no_extract {
149            cmd.push("--no-extract".to_string());
150        }
151        if let Some(m) = params.mode {
152            cmd.push("--mode".to_string());
153            cmd.push(m.to_string());
154        }
155
156        let config = Config {
157            image: Some(agent_image.to_string()),
158            cmd: Some(cmd),
159            host_config: Some(HostConfig {
160                mounts: Some(vec![Mount {
161                    target: Some(params.volume_mount_path.to_string()),
162                    source: Some(params.source_name.to_string()),
163                    typ: Some(if params.is_bind_mount {
164                        MountTypeEnum::BIND
165                    } else {
166                        MountTypeEnum::VOLUME
167                    }),
168                    ..Default::default()
169                }]),
170                network_mode: self.get_network_mode().await,
171                ..Default::default()
172            }),
173            ..Default::default()
174        };
175
176        self.docker
177            .create_container(
178                Some(CreateContainerOptions {
179                    name: unpark_container_name.clone(),
180                    ..Default::default()
181                }),
182                config,
183            )
184            .await?;
185
186        self.docker
187            .start_container(
188                &unpark_container_name,
189                None::<StartContainerOptions<String>>,
190            )
191            .await?;
192
193        let mut wait_stream = self.docker.wait_container(
194            &unpark_container_name,
195            Some(WaitContainerOptions {
196                condition: "not-running",
197            }),
198        );
199
200        if let Some(wait_result) = wait_stream.next().await {
201            match wait_result {
202                Ok(res) if res.status_code == 0 => {
203                    info!("Unpark successful for {}", volume_name);
204                    // Wait for logs
205                    sleep(Duration::from_secs(15)).await;
206                    if let Err(e) = self
207                        .docker
208                        .remove_container(&unpark_container_name, None)
209                        .await
210                    {
211                        error!(
212                            "Failed to remove unpark container {}: {:?}",
213                            unpark_container_name, e
214                        );
215                    }
216                    Ok(())
217                }
218                res => {
219                    error!("Unpark failed for {}: {:?}", volume_name, res);
220                    if let Err(e) = self
221                        .docker
222                        .remove_container(&unpark_container_name, None)
223                        .await
224                    {
225                        error!(
226                            "Failed to remove unpark container {}: {:?}",
227                            unpark_container_name, e
228                        );
229                    }
230                    Err(anyhow::anyhow!(
231                        "Unpark failed for {}: {:?}",
232                        volume_name,
233                        res
234                    ))
235                }
236            }
237        } else {
238            if let Err(e) = self
239                .docker
240                .remove_container(&unpark_container_name, None)
241                .await
242            {
243                error!(
244                    "Failed to remove unpark container {}: {:?}",
245                    unpark_container_name, e
246                );
247            }
248            Err(anyhow::anyhow!("Wait stream ended unexpectedly"))
249        }
250    }
251
252    async fn setup_single_mount(
253        &self,
254        mount: &StorageMount,
255        sfs_host_path: Option<&str>,
256        mounts: &mut Vec<Mount>,
257        storage_names: &mut Vec<String>,
258        volumes_to_cleanup: &mut Vec<String>,
259    ) -> Result<()> {
260        storage_names.push(mount.name.clone());
261
262        let (source_name, is_bind_mount) = if let Some(host_path) = &sfs_host_path {
263            let source_path = self.build_bind_mount_source(host_path, &mount.name).await?;
264            (source_path, true)
265        } else {
266            let volume_name = format!(
267                "sfs-{}-{}",
268                mount.name.to_lowercase().replace('_', "-"),
269                &self.metadata.run_id.to_string()[..8]
270            );
271
272            info!("Ensuring Docker volume exists: {}", volume_name);
273            self.docker
274                .create_volume(CreateVolumeOptions {
275                    name: volume_name.clone(),
276                    ..Default::default()
277                })
278                .await?;
279
280            volumes_to_cleanup.push(volume_name.clone());
281            (volume_name, false)
282        };
283
284        mounts.push(Mount {
285            target: Some(mount.mount_path.clone()),
286            source: Some(source_name.clone()),
287            typ: Some(if is_bind_mount {
288                MountTypeEnum::BIND
289            } else {
290                MountTypeEnum::VOLUME
291            }),
292            read_only: mount.read_only,
293            ..Default::default()
294        });
295
296        if let Some(storage_data) = &self.metadata.storage {
297            if let Some(urls) = storage_data.get(&mount.name) {
298                let has_state = urls.get("expected_hash").and_then(|h| h.as_str()).is_some();
299
300                if has_state {
301                    if is_bind_mount {
302                        info!(
303                            "Skipping unpark for storage '{}' due to STORMCHASER_SFS_HOST_PATH",
304                            mount.name
305                        );
306                    } else if let Some(get_url) = urls.get("get_url").and_then(|u| u.as_str()) {
307                        info!(
308                            "Unparking storage '{}' (resume state) for volume '{}'",
309                            mount.name, source_name
310                        );
311                        if let Some(nats) = &self.nats {
312                            let unpacking_event = serde_json::json!({
313                                "run_id": self.metadata.run_id,
314                                "step_id": self.metadata.step_id,
315                                "status": StepStatus::UnpackingSfs,
316                                "timestamp": chrono::Utc::now(),
317                            });
318                            if let Ok(ce) = cloudevents::EventBuilderV10::new()
319                                .id(uuid::Uuid::new_v4().to_string())
320                                .ty("stormchaser.v1.step.unpacking_sfs")
321                                .source("/stormchaser/runner")
322                                .time(chrono::Utc::now())
323                                .data(APPLICATION_JSON, unpacking_event)
324                                .build()
325                            {
326                                if let Ok(payload_bytes) = serde_json::to_vec(&ce) {
327                                    let _ = nats
328                                        .publish(
329                                            "stormchaser.v1.step.unpacking_sfs",
330                                            payload_bytes.into(),
331                                        )
332                                        .await;
333                                }
334                            }
335                        }
336                        self.unpark_storage(UnparkParams {
337                            source_name: &source_name,
338                            volume_mount_path: &mount.mount_path,
339                            destination_path: &mount.mount_path,
340                            get_url,
341                            no_extract: false,
342                            mode: None,
343                            is_bind_mount,
344                        })
345                        .await?;
346                    }
347                } else if let Some(provision) = urls.get("provision").and_then(|p| p.as_array()) {
348                    for prov in provision {
349                        if let (Some(url), Some(dest)) = (
350                            prov.get("url").and_then(|u| u.as_str()),
351                            prov.get("destination").and_then(|d| d.as_str()),
352                        ) {
353                            info!(
354                                "Provisioning storage '{}' from URL '{}' into destination '{}'",
355                                mount.name, url, dest
356                            );
357                            if let Some(nats) = &self.nats {
358                                let unpacking_event = serde_json::json!({
359                                    "run_id": self.metadata.run_id,
360                                    "step_id": self.metadata.step_id,
361                                    "status": StepStatus::UnpackingSfs,
362                                    "timestamp": chrono::Utc::now(),
363                                });
364                                if let Ok(ce) = cloudevents::EventBuilderV10::new()
365                                    .id(uuid::Uuid::new_v4().to_string())
366                                    .ty("stormchaser.v1.step.unpacking_sfs")
367                                    .source("/stormchaser/runner")
368                                    .time(chrono::Utc::now())
369                                    .data(APPLICATION_JSON, unpacking_event)
370                                    .build()
371                                {
372                                    if let Ok(payload_bytes) = serde_json::to_vec(&ce) {
373                                        let _ = nats
374                                            .publish(
375                                                "stormchaser.v1.step.unpacking_sfs",
376                                                payload_bytes.into(),
377                                            )
378                                            .await;
379                                    }
380                                }
381                            }
382                            let mut full_dest = PathBuf::from(&mount.mount_path);
383                            if dest != "/" && !dest.is_empty() {
384                                let relative_dest = dest.trim_start_matches('/');
385                                // Reject path traversal and Windows-style drive prefixes.
386                                for component in Path::new(relative_dest).components() {
387                                    match component {
388                                        std::path::Component::ParentDir => {
389                                            anyhow::bail!(
390                                                        "Provision destination '{}' contains illegal path traversal (..)",
391                                                        dest
392                                                    );
393                                        }
394                                        std::path::Component::Prefix(_) => {
395                                            anyhow::bail!(
396                                                        "Provision destination '{}' contains an illegal absolute path prefix",
397                                                        dest
398                                                    );
399                                        }
400                                        _ => {}
401                                    }
402                                }
403                                full_dest.push(relative_dest);
404                            }
405
406                            let resource_type = prov.get("resource_type").and_then(|r| r.as_str());
407                            let is_extract = resource_type != Some("artifact");
408                            let prov_mode =
409                                prov.get("mode").and_then(|m| m.as_str()).map(str::to_owned);
410                            info!(
411                                "Provisioning resource_type: {:?}, is_extract: {}",
412                                resource_type, is_extract
413                            );
414
415                            let full_dest_str = full_dest.to_string_lossy().into_owned();
416                            self.unpark_storage(UnparkParams {
417                                source_name: &source_name,
418                                volume_mount_path: &mount.mount_path,
419                                destination_path: &full_dest_str,
420                                get_url: url,
421                                no_extract: !is_extract,
422                                mode: prov_mode.as_deref(),
423                                is_bind_mount,
424                            })
425                            .await?;
426                        }
427                    }
428                }
429            }
430        }
431
432        Ok(())
433    }
434
435    fn validate_bind_mount_name(mount_name: &str) -> Result<()> {
436        if mount_name.is_empty() || mount_name.contains('/') || mount_name.contains('\\') {
437            anyhow::bail!(
438                "Storage mount name '{}' is not valid for bind mounts",
439                mount_name
440            );
441        }
442
443        let mut components = Path::new(mount_name).components();
444        match (components.next(), components.next()) {
445            (Some(Component::Normal(_)), None) => Ok(()),
446            _ => anyhow::bail!(
447                "Storage mount name '{}' contains illegal path components",
448                mount_name
449            ),
450        }
451    }
452
453    async fn build_bind_mount_source(&self, host_path: &str, mount_name: &str) -> Result<String> {
454        Self::validate_bind_mount_name(mount_name)?;
455
456        let run_dir = PathBuf::from(host_path).join(self.metadata.run_id.to_string());
457        let source_path = run_dir.join(mount_name);
458        if !source_path.starts_with(&run_dir) {
459            anyhow::bail!(
460                "Storage mount name '{}' escapes bind mount root '{}'",
461                mount_name,
462                run_dir.display()
463            );
464        }
465
466        fs::create_dir_all(&source_path).await.with_context(|| {
467            format!(
468                "Failed to create bind mount directory '{}' for storage '{}'",
469                source_path.display(),
470                mount_name
471            )
472        })?;
473
474        Ok(source_path.to_string_lossy().into_owned())
475    }
476
477    async fn setup_storage_mounts(
478        &self,
479        spec: &CommonContainerSpec,
480        sfs_host_path: Option<&str>,
481    ) -> Result<(Vec<Mount>, Vec<String>, Vec<String>)> {
482        let mut mounts = Vec::new();
483        let mut storage_names = Vec::new();
484        let mut volumes_to_cleanup = Vec::new();
485
486        if let Some(storage_mounts) = &spec.storage_mounts {
487            for mount in storage_mounts {
488                self.setup_single_mount(
489                    mount,
490                    sfs_host_path,
491                    &mut mounts,
492                    &mut storage_names,
493                    &mut volumes_to_cleanup,
494                )
495                .await?;
496            }
497        }
498        Ok((mounts, storage_names, volumes_to_cleanup))
499    }
500    async fn pull_image(&self, image: &str) -> Result<()> {
501        // Parse image reference robustly:
502        // - Digest-pinned: `repo@sha256:...` → from_image=full ref, tag=""
503        // - Tagged:        `repo:tag`         → from_image=repo, tag=tag
504        // - Bare:          `repo`             → from_image=repo, tag="latest"
505        let (from_image, tag) = if image.contains('@') {
506            // Digest reference — pass the full string and let Docker handle it
507            (image, "")
508        } else {
509            match image.rsplit_once(':') {
510                // Only treat it as a tag if the part after `:` contains no `/`
511                // (to avoid splitting registry hosts like `registry.example.com:5000/repo`)
512                Some((repo, t)) if !t.contains('/') => (repo, t),
513                _ => (image, "latest"),
514            }
515        };
516
517        let mut pull_stream = self.docker.create_image(
518            Some(CreateImageOptions {
519                from_image: from_image.to_string(),
520                tag: tag.to_string(),
521                ..Default::default()
522            }),
523            None,
524            None,
525        );
526        while let Some(pull_result) = pull_stream.next().await {
527            if let Err(e) = pull_result {
528                error!("Error pulling image {}: {:?}", image, e);
529                anyhow::bail!("Failed to pull image {}: {}", image, e);
530            }
531        }
532        Ok(())
533    }
534}
535
#[cfg(test)]
mod tests {
    use super::*;
    use crate::container_machine::ContainerMetadata;
    use bollard::Docker;
    use std::collections::HashMap;
    use stormchaser_model::dsl::Step;
    use tempfile::tempdir;

    /// Metadata for a `RunContainer` step with a single storage mount named `workspace`.
    fn create_test_metadata_with_mounts() -> ContainerMetadata {
        create_test_metadata_with_mount_name("workspace")
    }

    /// Metadata for a `RunContainer` step (`alpine:latest`, `echo hello`) whose only storage
    /// mount is called `mount_name` and is mounted at `/workspace`. Run and step ids are random.
    fn create_test_metadata_with_mount_name(mount_name: &str) -> ContainerMetadata {
        let step_dsl = Step {
            name: "test_step".to_string(),
            r#type: "RunContainer".to_string(),
            spec: serde_json::json!({
                "image": "alpine:latest",
                "command": ["echo", "hello"],
                "storage_mounts": [
                    { "name": mount_name, "mount_path": "/workspace" }
                ]
            }),
            condition: None,
            params: HashMap::new(),
            strategy: None,
            aggregation: vec![],
            iterate: None,
            iterate_as: None,
            steps: None,
            next: vec![],
            on_failure: None,
            retry: None,
            timeout: None,
            allow_failure: None,
            start_marker: None,
            end_marker: None,
            outputs: vec![],
            reports: vec![],
            artifacts: None,
        };

        ContainerMetadata {
            run_id: Uuid::new_v4(),
            step_id: Uuid::new_v4(),
            step_dsl,
            received_at: chrono::Utc::now(),
            encryption_key: None,
            storage: None,
            test_report_urls: None,
        }
    }

    /// Decodes the container spec embedded in the step DSL of `metadata`.
    fn parse_spec(metadata: &ContainerMetadata) -> CommonContainerSpec {
        serde_json::from_value(metadata.step_dsl.spec.clone()).unwrap()
    }

    #[tokio::test]
    async fn test_setup_storage_mounts_named_volume() {
        let docker = Docker::connect_with_local_defaults().unwrap();
        let metadata = create_test_metadata_with_mounts();
        let run_id_prefix = metadata.run_id.to_string()[..8].to_owned();
        let spec = parse_spec(&metadata);
        let machine = DockerContainerMachine::new(docker.clone(), metadata, None);

        let (mounts, storage_names, volumes_to_cleanup) =
            machine.setup_storage_mounts(&spec, None).await.unwrap();

        assert_eq!(mounts.len(), 1);
        assert_eq!(storage_names.len(), 1);
        assert_eq!(volumes_to_cleanup.len(), 1);
        assert_eq!(storage_names[0], "workspace");

        let volume_mount = &mounts[0];
        assert_eq!(volume_mount.target.as_deref(), Some("/workspace"));
        assert_eq!(volume_mount.typ, Some(MountTypeEnum::VOLUME));

        let volume_source = volume_mount.source.as_deref().unwrap();
        assert!(volume_source.starts_with("sfs-workspace-"));
        assert!(volume_source.contains(run_id_prefix.as_str()));

        // Best-effort removal of the volume the test created.
        for volume in &volumes_to_cleanup {
            let _ = docker.remove_volume(volume, None).await;
        }
    }

    #[tokio::test]
    async fn test_setup_storage_mounts_host_bind() {
        let docker = Docker::connect_with_local_defaults().unwrap();
        let metadata = create_test_metadata_with_mounts();
        let run_id = metadata.run_id;
        let spec = parse_spec(&metadata);
        let machine = DockerContainerMachine::new(docker.clone(), metadata, None);

        let host_root = tempdir().unwrap();
        let (mounts, storage_names, volumes_to_cleanup) = machine
            .setup_storage_mounts(&spec, Some(host_root.path().to_str().unwrap()))
            .await
            .unwrap();

        assert_eq!(mounts.len(), 1);
        assert_eq!(storage_names.len(), 1);
        // Bind mounts never create Docker volumes, so nothing needs cleaning up.
        assert_eq!(volumes_to_cleanup.len(), 0);
        assert_eq!(storage_names[0], "workspace");

        let bind_mount = &mounts[0];
        assert_eq!(bind_mount.target.as_deref(), Some("/workspace"));
        assert_eq!(bind_mount.typ, Some(MountTypeEnum::BIND));

        let expected_dir = host_root
            .path()
            .join(run_id.to_string())
            .join("workspace");
        assert!(expected_dir.is_dir());
        assert_eq!(bind_mount.source.as_deref(), expected_dir.to_str());
    }

    #[tokio::test]
    async fn test_setup_storage_mounts_host_bind_rejects_invalid_mount_name() {
        let docker = Docker::connect_with_local_defaults().unwrap();
        let metadata = create_test_metadata_with_mount_name("../escape");
        let spec = parse_spec(&metadata);
        let machine = DockerContainerMachine::new(docker, metadata, None);
        let host_root = tempdir().unwrap();

        let failure = machine
            .setup_storage_mounts(&spec, host_root.path().to_str())
            .await
            .unwrap_err();

        let message = failure.to_string();
        assert!(
            message.contains("Storage mount name '../escape'"),
            "unexpected error: {failure}"
        );
    }
}