Skip to main content

stormchaser_runner_docker/container_machine/transitions/
initialized.rs

1use crate::container_machine::{state, DockerContainerMachine, StartResult};
2use anyhow::{Context, Result};
3use bollard::container::{
4    Config, CreateContainerOptions, StartContainerOptions, WaitContainerOptions,
5};
6use bollard::image::CreateImageOptions;
7use bollard::service::{HostConfig, Mount, MountTypeEnum};
8use bollard::volume::CreateVolumeOptions;
9use chrono::Utc;
10use cloudevents::EventBuilder;
11use futures::StreamExt;
12use std::path::{Path, PathBuf};
13use std::time::Duration;
14use stormchaser_model::dsl::CommonContainerSpec;
15use stormchaser_model::events::StepRunningEvent;
16use tokio::time::sleep;
17use tracing::{error, info};
18use uuid::Uuid;
19
20impl DockerContainerMachine<state::Initialized> {
21    /// Adopts an already-running container by name, transitioning the state machine to `Running`.
22    pub fn adopt(self, container_name: String) -> DockerContainerMachine<state::Running> {
23        info!("Adopting orphaned Docker container {}", container_name);
24        DockerContainerMachine {
25            nats: self.nats.clone(),
26            docker: self.docker,
27            metadata: self.metadata,
28            state: state::Running {
29                container_name,
30                dispatched_at: Utc::now(),
31                volumes_to_cleanup: Vec::new(),
32                storage_names: Vec::new(),
33                mounts: Vec::new(),
34            },
35        }
36    }
37
38    /// Cleans up an orphaned container without running it through the state machine.
39    pub async fn clean_up(self, container_name: &str) -> Result<()> {
40        info!("Cleaning up orphaned Docker container {}", container_name);
41        let _ = self.docker.stop_container(container_name, None).await;
42        let _ = self.docker.remove_container(container_name, None).await;
43        Ok(())
44    }
45
46    /// Starts a new Docker container, pulling images and unparking storage as necessary.
47    pub async fn start(self) -> Result<StartResult> {
48        let container_name = format!(
49            "storm-{}-{}",
50            self.metadata.step_dsl.name.to_lowercase().replace('_', "-"),
51            &self.metadata.step_id.to_string()[..8]
52        );
53
54        let spec: CommonContainerSpec = serde_json::from_value(self.metadata.step_dsl.spec.clone())
55            .context("Failed to parse RunContainer spec as CommonContainerSpec")?;
56
57        let mut mounts = Vec::new();
58        let mut storage_names = Vec::new();
59        let mut volumes_to_cleanup = Vec::new();
60
61        if let Some(storage_mounts) = &spec.storage_mounts {
62            for mount in storage_mounts {
63                storage_names.push(mount.name.clone());
64                let volume_name = format!(
65                    "sfs-{}-{}",
66                    mount.name.to_lowercase().replace('_', "-"),
67                    &self.metadata.run_id.to_string()[..8]
68                );
69
70                info!("Ensuring Docker volume exists: {}", volume_name);
71                self.docker
72                    .create_volume(CreateVolumeOptions {
73                        name: volume_name.clone(),
74                        ..Default::default()
75                    })
76                    .await?;
77
78                volumes_to_cleanup.push(volume_name.clone());
79
80                mounts.push(Mount {
81                    target: Some(mount.mount_path.clone()),
82                    source: Some(volume_name.clone()),
83                    typ: Some(MountTypeEnum::VOLUME),
84                    read_only: mount.read_only,
85                    ..Default::default()
86                });
87
88                if let Some(storage_data) = &self.metadata.storage {
89                    if let Some(urls) = storage_data.get(&mount.name) {
90                        let has_state =
91                            urls.get("expected_hash").and_then(|h| h.as_str()).is_some();
92
93                        if has_state {
94                            if let Some(get_url) = urls.get("get_url").and_then(|u| u.as_str()) {
95                                info!(
96                                    "Unparking storage '{}' (resume state) for volume '{}'",
97                                    mount.name, volume_name
98                                );
99                                if let Some(nats) = &self.nats {
100                                    let unpacking_event = serde_json::json!({
101                                        "run_id": self.metadata.run_id,
102                                        "step_id": self.metadata.step_id,
103                                        "status": "unpacking_sfs",
104                                        "timestamp": chrono::Utc::now(),
105                                    });
106                                    if let Ok(ce) = cloudevents::EventBuilderV10::new()
107                                        .id(uuid::Uuid::new_v4().to_string())
108                                        .ty("stormchaser.v1.step.unpacking_sfs")
109                                        .source("/stormchaser/runner")
110                                        .time(chrono::Utc::now())
111                                        .data("application/json", unpacking_event)
112                                        .build()
113                                    {
114                                        if let Ok(payload_bytes) = serde_json::to_vec(&ce) {
115                                            let _ = nats
116                                                .publish(
117                                                    "stormchaser.v1.step.unpacking_sfs",
118                                                    payload_bytes.into(),
119                                                )
120                                                .await;
121                                        }
122                                    }
123                                }
124                                self.unpark_storage(
125                                    &volume_name,
126                                    &mount.mount_path,
127                                    &mount.mount_path,
128                                    get_url,
129                                    false,
130                                    None,
131                                )
132                                .await?;
133                            }
134                        } else if let Some(provision) =
135                            urls.get("provision").and_then(|p| p.as_array())
136                        {
137                            for prov in provision {
138                                if let (Some(url), Some(dest)) = (
139                                    prov.get("url").and_then(|u| u.as_str()),
140                                    prov.get("destination").and_then(|d| d.as_str()),
141                                ) {
142                                    info!(
143                                        "Provisioning storage '{}' from URL '{}' into destination '{}'",
144                                        mount.name, url, dest
145                                    );
146                                    if let Some(nats) = &self.nats {
147                                        let unpacking_event = serde_json::json!({
148                                            "run_id": self.metadata.run_id,
149                                            "step_id": self.metadata.step_id,
150                                            "status": "unpacking_sfs",
151                                            "timestamp": chrono::Utc::now(),
152                                        });
153                                        if let Ok(ce) = cloudevents::EventBuilderV10::new()
154                                            .id(uuid::Uuid::new_v4().to_string())
155                                            .ty("stormchaser.v1.step.unpacking_sfs")
156                                            .source("/stormchaser/runner")
157                                            .time(chrono::Utc::now())
158                                            .data("application/json", unpacking_event)
159                                            .build()
160                                        {
161                                            if let Ok(payload_bytes) = serde_json::to_vec(&ce) {
162                                                let _ = nats
163                                                    .publish(
164                                                        "stormchaser.v1.step.unpacking_sfs",
165                                                        payload_bytes.into(),
166                                                    )
167                                                    .await;
168                                            }
169                                        }
170                                    }
171                                    let mut full_dest = PathBuf::from(&mount.mount_path);
172                                    if dest != "/" && !dest.is_empty() {
173                                        let relative_dest = dest.trim_start_matches('/');
174                                        // Reject path traversal and Windows-style drive prefixes.
175                                        for component in Path::new(relative_dest).components() {
176                                            match component {
177                                                std::path::Component::ParentDir => {
178                                                    anyhow::bail!(
179                                                        "Provision destination '{}' contains illegal path traversal (..)",
180                                                        dest
181                                                    );
182                                                }
183                                                std::path::Component::Prefix(_) => {
184                                                    anyhow::bail!(
185                                                        "Provision destination '{}' contains an illegal absolute path prefix",
186                                                        dest
187                                                    );
188                                                }
189                                                _ => {}
190                                            }
191                                        }
192                                        full_dest.push(relative_dest);
193                                    }
194
195                                    let resource_type =
196                                        prov.get("resource_type").and_then(|r| r.as_str());
197                                    let is_extract = resource_type != Some("artifact");
198                                    let prov_mode = prov
199                                        .get("mode")
200                                        .and_then(|m| m.as_str())
201                                        .map(str::to_owned);
202                                    info!(
203                                        "Provisioning resource_type: {:?}, is_extract: {}",
204                                        resource_type, is_extract
205                                    );
206
207                                    let full_dest_str = full_dest.to_string_lossy().into_owned();
208                                    self.unpark_storage(
209                                        &volume_name,
210                                        &mount.mount_path,
211                                        &full_dest_str,
212                                        url,
213                                        !is_extract,
214                                        prov_mode.as_deref(),
215                                    )
216                                    .await?;
217                                }
218                            }
219                        }
220                    }
221                }
222            }
223        }
224
225        info!("Pulling image {}", spec.image);
226        self.pull_image(&spec.image).await?;
227
228        let network_mode = self.get_network_mode().await;
229        let config =
230            self.build_container_config(&spec, mounts.clone(), network_mode, &storage_names)?;
231
232        info!("Creating container {}", container_name);
233        let dispatched_at = Utc::now();
234        self.docker
235            .create_container(
236                Some(CreateContainerOptions {
237                    name: container_name.clone(),
238                    ..Default::default()
239                }),
240                config,
241            )
242            .await?;
243
244        info!("Starting container {}", container_name);
245        self.docker
246            .start_container(&container_name, None::<StartContainerOptions<String>>)
247            .await?;
248
249        if let Some(nats) = &self.nats {
250            let running_event = StepRunningEvent {
251                run_id: stormchaser_model::RunId::new(self.metadata.run_id),
252                step_id: stormchaser_model::StepInstanceId::new(self.metadata.step_id),
253                event_type: "stormchaser.v1.step.running".to_string(),
254                timestamp: chrono::Utc::now(),
255            };
256            let _ = stormchaser_model::nats::publish_cloudevent(
257                &async_nats::jetstream::new(nats.clone()),
258                "stormchaser.v1.step.running",
259                "stormchaser.v1.step.running",
260                "/stormchaser",
261                serde_json::to_value(running_event).unwrap(),
262                Some("1.0"),
263                None,
264            )
265            .await;
266        }
267
268        Ok(StartResult::Running(DockerContainerMachine {
269            nats: self.nats.clone(),
270            docker: self.docker,
271            metadata: self.metadata,
272            state: state::Running {
273                container_name,
274                dispatched_at,
275                volumes_to_cleanup,
276                storage_names,
277                mounts,
278            },
279        }))
280    }
281
282    async fn unpark_storage(
283        &self,
284        volume_name: &str,
285        volume_mount_path: &str,
286        destination_path: &str,
287        get_url: &str,
288        no_extract: bool,
289        mode: Option<&str>,
290    ) -> Result<()> {
291        let agent_image = "stormchaser-agent:v1";
292        let unpark_container_name = format!("unpark-{}", Uuid::new_v4());
293
294        let mut cmd = vec![
295            "/usr/local/bin/stormchaser-agent".to_string(),
296            "unpark".to_string(),
297            "--url".to_string(),
298            get_url.to_string(),
299            "--destination".to_string(),
300            destination_path.to_string(),
301        ];
302        if no_extract {
303            cmd.push("--no-extract".to_string());
304        }
305        if let Some(m) = mode {
306            cmd.push("--mode".to_string());
307            cmd.push(m.to_string());
308        }
309
310        let config = Config {
311            image: Some(agent_image.to_string()),
312            cmd: Some(cmd),
313            host_config: Some(HostConfig {
314                mounts: Some(vec![Mount {
315                    target: Some(volume_mount_path.to_string()),
316                    source: Some(volume_name.to_string()),
317                    typ: Some(MountTypeEnum::VOLUME),
318                    ..Default::default()
319                }]),
320                network_mode: self.get_network_mode().await,
321                ..Default::default()
322            }),
323            ..Default::default()
324        };
325
326        self.docker
327            .create_container(
328                Some(CreateContainerOptions {
329                    name: unpark_container_name.clone(),
330                    ..Default::default()
331                }),
332                config,
333            )
334            .await?;
335
336        self.docker
337            .start_container(
338                &unpark_container_name,
339                None::<StartContainerOptions<String>>,
340            )
341            .await?;
342
343        let mut wait_stream = self.docker.wait_container(
344            &unpark_container_name,
345            Some(WaitContainerOptions {
346                condition: "not-running",
347            }),
348        );
349
350        if let Some(wait_result) = wait_stream.next().await {
351            match wait_result {
352                Ok(res) if res.status_code == 0 => {
353                    info!("Unpark successful for {}", volume_name);
354                    // Wait for logs
355                    sleep(Duration::from_secs(15)).await;
356                    if let Err(e) = self
357                        .docker
358                        .remove_container(&unpark_container_name, None)
359                        .await
360                    {
361                        error!(
362                            "Failed to remove unpark container {}: {:?}",
363                            unpark_container_name, e
364                        );
365                    }
366                    Ok(())
367                }
368                res => {
369                    error!("Unpark failed for {}: {:?}", volume_name, res);
370                    if let Err(e) = self
371                        .docker
372                        .remove_container(&unpark_container_name, None)
373                        .await
374                    {
375                        error!(
376                            "Failed to remove unpark container {}: {:?}",
377                            unpark_container_name, e
378                        );
379                    }
380                    Err(anyhow::anyhow!(
381                        "Unpark failed for {}: {:?}",
382                        volume_name,
383                        res
384                    ))
385                }
386            }
387        } else {
388            if let Err(e) = self
389                .docker
390                .remove_container(&unpark_container_name, None)
391                .await
392            {
393                error!(
394                    "Failed to remove unpark container {}: {:?}",
395                    unpark_container_name, e
396                );
397            }
398            Err(anyhow::anyhow!("Wait stream ended unexpectedly"))
399        }
400    }
401
402    async fn pull_image(&self, image: &str) -> Result<()> {
403        // Parse image reference robustly:
404        // - Digest-pinned: `repo@sha256:...` → from_image=full ref, tag=""
405        // - Tagged:        `repo:tag`         → from_image=repo, tag=tag
406        // - Bare:          `repo`             → from_image=repo, tag="latest"
407        let (from_image, tag) = if image.contains('@') {
408            // Digest reference — pass the full string and let Docker handle it
409            (image, "")
410        } else {
411            match image.rsplit_once(':') {
412                // Only treat it as a tag if the part after `:` contains no `/`
413                // (to avoid splitting registry hosts like `registry.example.com:5000/repo`)
414                Some((repo, t)) if !t.contains('/') => (repo, t),
415                _ => (image, "latest"),
416            }
417        };
418
419        let mut pull_stream = self.docker.create_image(
420            Some(CreateImageOptions {
421                from_image: from_image.to_string(),
422                tag: tag.to_string(),
423                ..Default::default()
424            }),
425            None,
426            None,
427        );
428        while let Some(pull_result) = pull_stream.next().await {
429            if let Err(e) = pull_result {
430                error!("Error pulling image {}: {:?}", image, e);
431                anyhow::bail!("Failed to pull image {}: {}", image, e);
432            }
433        }
434        Ok(())
435    }
436}