1use crate::container_machine::{state, DockerContainerMachine, StartResult};
2use anyhow::{Context, Result};
3use bollard::container::{
4 Config, CreateContainerOptions, StartContainerOptions, WaitContainerOptions,
5};
6use bollard::image::CreateImageOptions;
7use bollard::service::{HostConfig, Mount, MountTypeEnum};
8use bollard::volume::CreateVolumeOptions;
9use chrono::Utc;
10use cloudevents::EventBuilder;
11use futures::StreamExt;
12use std::path::{Path, PathBuf};
13use std::time::Duration;
14use stormchaser_model::dsl::CommonContainerSpec;
15use stormchaser_model::events::StepRunningEvent;
16use tokio::time::sleep;
17use tracing::{error, info};
18use uuid::Uuid;
19
20impl DockerContainerMachine<state::Initialized> {
21 pub fn adopt(self, container_name: String) -> DockerContainerMachine<state::Running> {
23 info!("Adopting orphaned Docker container {}", container_name);
24 DockerContainerMachine {
25 nats: self.nats.clone(),
26 docker: self.docker,
27 metadata: self.metadata,
28 state: state::Running {
29 container_name,
30 dispatched_at: Utc::now(),
31 volumes_to_cleanup: Vec::new(),
32 storage_names: Vec::new(),
33 mounts: Vec::new(),
34 },
35 }
36 }
37
38 pub async fn clean_up(self, container_name: &str) -> Result<()> {
40 info!("Cleaning up orphaned Docker container {}", container_name);
41 let _ = self.docker.stop_container(container_name, None).await;
42 let _ = self.docker.remove_container(container_name, None).await;
43 Ok(())
44 }
45
46 pub async fn start(self) -> Result<StartResult> {
48 let container_name = format!(
49 "storm-{}-{}",
50 self.metadata.step_dsl.name.to_lowercase().replace('_', "-"),
51 &self.metadata.step_id.to_string()[..8]
52 );
53
54 let spec: CommonContainerSpec = serde_json::from_value(self.metadata.step_dsl.spec.clone())
55 .context("Failed to parse RunContainer spec as CommonContainerSpec")?;
56
57 let mut mounts = Vec::new();
58 let mut storage_names = Vec::new();
59 let mut volumes_to_cleanup = Vec::new();
60
61 if let Some(storage_mounts) = &spec.storage_mounts {
62 for mount in storage_mounts {
63 storage_names.push(mount.name.clone());
64 let volume_name = format!(
65 "sfs-{}-{}",
66 mount.name.to_lowercase().replace('_', "-"),
67 &self.metadata.run_id.to_string()[..8]
68 );
69
70 info!("Ensuring Docker volume exists: {}", volume_name);
71 self.docker
72 .create_volume(CreateVolumeOptions {
73 name: volume_name.clone(),
74 ..Default::default()
75 })
76 .await?;
77
78 volumes_to_cleanup.push(volume_name.clone());
79
80 mounts.push(Mount {
81 target: Some(mount.mount_path.clone()),
82 source: Some(volume_name.clone()),
83 typ: Some(MountTypeEnum::VOLUME),
84 read_only: mount.read_only,
85 ..Default::default()
86 });
87
88 if let Some(storage_data) = &self.metadata.storage {
89 if let Some(urls) = storage_data.get(&mount.name) {
90 let has_state =
91 urls.get("expected_hash").and_then(|h| h.as_str()).is_some();
92
93 if has_state {
94 if let Some(get_url) = urls.get("get_url").and_then(|u| u.as_str()) {
95 info!(
96 "Unparking storage '{}' (resume state) for volume '{}'",
97 mount.name, volume_name
98 );
99 if let Some(nats) = &self.nats {
100 let unpacking_event = serde_json::json!({
101 "run_id": self.metadata.run_id,
102 "step_id": self.metadata.step_id,
103 "status": "unpacking_sfs",
104 "timestamp": chrono::Utc::now(),
105 });
106 if let Ok(ce) = cloudevents::EventBuilderV10::new()
107 .id(uuid::Uuid::new_v4().to_string())
108 .ty("stormchaser.v1.step.unpacking_sfs")
109 .source("/stormchaser/runner")
110 .time(chrono::Utc::now())
111 .data("application/json", unpacking_event)
112 .build()
113 {
114 if let Ok(payload_bytes) = serde_json::to_vec(&ce) {
115 let _ = nats
116 .publish(
117 "stormchaser.v1.step.unpacking_sfs",
118 payload_bytes.into(),
119 )
120 .await;
121 }
122 }
123 }
124 self.unpark_storage(
125 &volume_name,
126 &mount.mount_path,
127 &mount.mount_path,
128 get_url,
129 false,
130 None,
131 )
132 .await?;
133 }
134 } else if let Some(provision) =
135 urls.get("provision").and_then(|p| p.as_array())
136 {
137 for prov in provision {
138 if let (Some(url), Some(dest)) = (
139 prov.get("url").and_then(|u| u.as_str()),
140 prov.get("destination").and_then(|d| d.as_str()),
141 ) {
142 info!(
143 "Provisioning storage '{}' from URL '{}' into destination '{}'",
144 mount.name, url, dest
145 );
146 if let Some(nats) = &self.nats {
147 let unpacking_event = serde_json::json!({
148 "run_id": self.metadata.run_id,
149 "step_id": self.metadata.step_id,
150 "status": "unpacking_sfs",
151 "timestamp": chrono::Utc::now(),
152 });
153 if let Ok(ce) = cloudevents::EventBuilderV10::new()
154 .id(uuid::Uuid::new_v4().to_string())
155 .ty("stormchaser.v1.step.unpacking_sfs")
156 .source("/stormchaser/runner")
157 .time(chrono::Utc::now())
158 .data("application/json", unpacking_event)
159 .build()
160 {
161 if let Ok(payload_bytes) = serde_json::to_vec(&ce) {
162 let _ = nats
163 .publish(
164 "stormchaser.v1.step.unpacking_sfs",
165 payload_bytes.into(),
166 )
167 .await;
168 }
169 }
170 }
171 let mut full_dest = PathBuf::from(&mount.mount_path);
172 if dest != "/" && !dest.is_empty() {
173 let relative_dest = dest.trim_start_matches('/');
174 for component in Path::new(relative_dest).components() {
176 match component {
177 std::path::Component::ParentDir => {
178 anyhow::bail!(
179 "Provision destination '{}' contains illegal path traversal (..)",
180 dest
181 );
182 }
183 std::path::Component::Prefix(_) => {
184 anyhow::bail!(
185 "Provision destination '{}' contains an illegal absolute path prefix",
186 dest
187 );
188 }
189 _ => {}
190 }
191 }
192 full_dest.push(relative_dest);
193 }
194
195 let resource_type =
196 prov.get("resource_type").and_then(|r| r.as_str());
197 let is_extract = resource_type != Some("artifact");
198 let prov_mode = prov
199 .get("mode")
200 .and_then(|m| m.as_str())
201 .map(str::to_owned);
202 info!(
203 "Provisioning resource_type: {:?}, is_extract: {}",
204 resource_type, is_extract
205 );
206
207 let full_dest_str = full_dest.to_string_lossy().into_owned();
208 self.unpark_storage(
209 &volume_name,
210 &mount.mount_path,
211 &full_dest_str,
212 url,
213 !is_extract,
214 prov_mode.as_deref(),
215 )
216 .await?;
217 }
218 }
219 }
220 }
221 }
222 }
223 }
224
225 info!("Pulling image {}", spec.image);
226 self.pull_image(&spec.image).await?;
227
228 let network_mode = self.get_network_mode().await;
229 let config =
230 self.build_container_config(&spec, mounts.clone(), network_mode, &storage_names)?;
231
232 info!("Creating container {}", container_name);
233 let dispatched_at = Utc::now();
234 self.docker
235 .create_container(
236 Some(CreateContainerOptions {
237 name: container_name.clone(),
238 ..Default::default()
239 }),
240 config,
241 )
242 .await?;
243
244 info!("Starting container {}", container_name);
245 self.docker
246 .start_container(&container_name, None::<StartContainerOptions<String>>)
247 .await?;
248
249 if let Some(nats) = &self.nats {
250 let running_event = StepRunningEvent {
251 run_id: stormchaser_model::RunId::new(self.metadata.run_id),
252 step_id: stormchaser_model::StepInstanceId::new(self.metadata.step_id),
253 event_type: "stormchaser.v1.step.running".to_string(),
254 timestamp: chrono::Utc::now(),
255 };
256 let _ = stormchaser_model::nats::publish_cloudevent(
257 &async_nats::jetstream::new(nats.clone()),
258 "stormchaser.v1.step.running",
259 "stormchaser.v1.step.running",
260 "/stormchaser",
261 serde_json::to_value(running_event).unwrap(),
262 Some("1.0"),
263 None,
264 )
265 .await;
266 }
267
268 Ok(StartResult::Running(DockerContainerMachine {
269 nats: self.nats.clone(),
270 docker: self.docker,
271 metadata: self.metadata,
272 state: state::Running {
273 container_name,
274 dispatched_at,
275 volumes_to_cleanup,
276 storage_names,
277 mounts,
278 },
279 }))
280 }
281
282 async fn unpark_storage(
283 &self,
284 volume_name: &str,
285 volume_mount_path: &str,
286 destination_path: &str,
287 get_url: &str,
288 no_extract: bool,
289 mode: Option<&str>,
290 ) -> Result<()> {
291 let agent_image = "stormchaser-agent:v1";
292 let unpark_container_name = format!("unpark-{}", Uuid::new_v4());
293
294 let mut cmd = vec![
295 "/usr/local/bin/stormchaser-agent".to_string(),
296 "unpark".to_string(),
297 "--url".to_string(),
298 get_url.to_string(),
299 "--destination".to_string(),
300 destination_path.to_string(),
301 ];
302 if no_extract {
303 cmd.push("--no-extract".to_string());
304 }
305 if let Some(m) = mode {
306 cmd.push("--mode".to_string());
307 cmd.push(m.to_string());
308 }
309
310 let config = Config {
311 image: Some(agent_image.to_string()),
312 cmd: Some(cmd),
313 host_config: Some(HostConfig {
314 mounts: Some(vec![Mount {
315 target: Some(volume_mount_path.to_string()),
316 source: Some(volume_name.to_string()),
317 typ: Some(MountTypeEnum::VOLUME),
318 ..Default::default()
319 }]),
320 network_mode: self.get_network_mode().await,
321 ..Default::default()
322 }),
323 ..Default::default()
324 };
325
326 self.docker
327 .create_container(
328 Some(CreateContainerOptions {
329 name: unpark_container_name.clone(),
330 ..Default::default()
331 }),
332 config,
333 )
334 .await?;
335
336 self.docker
337 .start_container(
338 &unpark_container_name,
339 None::<StartContainerOptions<String>>,
340 )
341 .await?;
342
343 let mut wait_stream = self.docker.wait_container(
344 &unpark_container_name,
345 Some(WaitContainerOptions {
346 condition: "not-running",
347 }),
348 );
349
350 if let Some(wait_result) = wait_stream.next().await {
351 match wait_result {
352 Ok(res) if res.status_code == 0 => {
353 info!("Unpark successful for {}", volume_name);
354 sleep(Duration::from_secs(15)).await;
356 if let Err(e) = self
357 .docker
358 .remove_container(&unpark_container_name, None)
359 .await
360 {
361 error!(
362 "Failed to remove unpark container {}: {:?}",
363 unpark_container_name, e
364 );
365 }
366 Ok(())
367 }
368 res => {
369 error!("Unpark failed for {}: {:?}", volume_name, res);
370 if let Err(e) = self
371 .docker
372 .remove_container(&unpark_container_name, None)
373 .await
374 {
375 error!(
376 "Failed to remove unpark container {}: {:?}",
377 unpark_container_name, e
378 );
379 }
380 Err(anyhow::anyhow!(
381 "Unpark failed for {}: {:?}",
382 volume_name,
383 res
384 ))
385 }
386 }
387 } else {
388 if let Err(e) = self
389 .docker
390 .remove_container(&unpark_container_name, None)
391 .await
392 {
393 error!(
394 "Failed to remove unpark container {}: {:?}",
395 unpark_container_name, e
396 );
397 }
398 Err(anyhow::anyhow!("Wait stream ended unexpectedly"))
399 }
400 }
401
402 async fn pull_image(&self, image: &str) -> Result<()> {
403 let (from_image, tag) = if image.contains('@') {
408 (image, "")
410 } else {
411 match image.rsplit_once(':') {
412 Some((repo, t)) if !t.contains('/') => (repo, t),
415 _ => (image, "latest"),
416 }
417 };
418
419 let mut pull_stream = self.docker.create_image(
420 Some(CreateImageOptions {
421 from_image: from_image.to_string(),
422 tag: tag.to_string(),
423 ..Default::default()
424 }),
425 None,
426 None,
427 );
428 while let Some(pull_result) = pull_stream.next().await {
429 if let Err(e) = pull_result {
430 error!("Error pulling image {}: {:?}", image, e);
431 anyhow::bail!("Failed to pull image {}: {}", image, e);
432 }
433 }
434 Ok(())
435 }
436}