Skip to main content

wfe_containerd/
step.rs

1use std::collections::HashMap;
2use std::path::Path;
3
4use async_trait::async_trait;
5use tonic::transport::{Channel, Endpoint, Uri};
6use wfe_core::WfeError;
7use wfe_core::models::ExecutionResult;
8use wfe_core::traits::step::{StepBody, StepExecutionContext};
9
10use wfe_containerd_protos::containerd::services::containers::v1::{
11    Container, CreateContainerRequest, DeleteContainerRequest, container::Runtime,
12    containers_client::ContainersClient,
13};
14use wfe_containerd_protos::containerd::services::content::v1::{
15    ReadContentRequest, content_client::ContentClient,
16};
17use wfe_containerd_protos::containerd::services::images::v1::{
18    GetImageRequest, images_client::ImagesClient,
19};
20use wfe_containerd_protos::containerd::services::snapshots::v1::{
21    MountsRequest, PrepareSnapshotRequest, snapshots_client::SnapshotsClient,
22};
23use wfe_containerd_protos::containerd::services::tasks::v1::{
24    CreateTaskRequest, DeleteTaskRequest, StartRequest, WaitRequest, tasks_client::TasksClient,
25};
26use wfe_containerd_protos::containerd::services::version::v1::version_client::VersionClient;
27
28use crate::config::ContainerdConfig;
29
/// Snapshotter used when creating container root filesystems.
const DEFAULT_SNAPSHOTTER: &str = "overlayfs";

/// Default containerd namespace, sent with each request in the
/// `containerd-namespace` gRPC metadata header.
const DEFAULT_NAMESPACE: &str = "default";
35
36pub struct ContainerdStep {
37    config: ContainerdConfig,
38}
39
40impl ContainerdStep {
41    pub fn new(config: ContainerdConfig) -> Self {
42        Self { config }
43    }
44
45    /// Connect to the containerd daemon and return a raw tonic `Channel`.
46    ///
47    /// Supports Unix socket paths (bare `/path` or `unix:///path`) and
48    /// TCP/HTTP endpoints.
49    pub(crate) async fn connect(addr: &str) -> Result<Channel, WfeError> {
50        let channel = if addr.starts_with('/') || addr.starts_with("unix://") {
51            let socket_path = addr.strip_prefix("unix://").unwrap_or(addr).to_string();
52
53            if !Path::new(&socket_path).exists() {
54                return Err(WfeError::StepExecution(format!(
55                    "containerd socket not found: {socket_path}"
56                )));
57            }
58
59            Endpoint::try_from("http://[::]:50051")
60                .map_err(|e| WfeError::StepExecution(format!("failed to create endpoint: {e}")))?
61                .connect_with_connector(tower::service_fn(move |_: Uri| {
62                    let path = socket_path.clone();
63                    async move {
64                        tokio::net::UnixStream::connect(path)
65                            .await
66                            .map(hyper_util::rt::TokioIo::new)
67                    }
68                }))
69                .await
70                .map_err(|e| {
71                    WfeError::StepExecution(format!(
72                        "failed to connect to containerd via Unix socket at {addr}: {e}"
73                    ))
74                })?
75        } else {
76            let connect_addr = if addr.starts_with("tcp://") {
77                addr.replacen("tcp://", "http://", 1)
78            } else {
79                addr.to_string()
80            };
81
82            Endpoint::from_shared(connect_addr.clone())
83                .map_err(|e| {
84                    WfeError::StepExecution(format!(
85                        "invalid containerd endpoint {connect_addr}: {e}"
86                    ))
87                })?
88                .timeout(std::time::Duration::from_secs(30))
89                .connect()
90                .await
91                .map_err(|e| {
92                    WfeError::StepExecution(format!(
93                        "failed to connect to containerd at {connect_addr}: {e}"
94                    ))
95                })?
96        };
97
98        Ok(channel)
99    }
100
101    /// Check whether an image exists in containerd's image store.
102    ///
103    /// Image pulling via raw containerd gRPC is complex (content store +
104    /// snapshots + transfer). For now we only verify the image exists and
105    /// return an error if it does not. Images must be pre-pulled via
106    /// `ctr image pull` or `nerdctl pull`.
107    ///
108    /// TODO: implement full image pull via TransferService or content ingest.
109    async fn ensure_image(channel: &Channel, image: &str, namespace: &str) -> Result<(), WfeError> {
110        let mut client = ImagesClient::new(channel.clone());
111
112        let mut req = tonic::Request::new(GetImageRequest {
113            name: image.to_string(),
114        });
115        req.metadata_mut()
116            .insert("containerd-namespace", namespace.parse().unwrap());
117
118        match client.get(req).await {
119            Ok(_) => Ok(()),
120            Err(status) => Err(WfeError::StepExecution(format!(
121                "image '{image}' not found in containerd (namespace={namespace}). \
122                 Pre-pull it with: ctr -n {namespace} image pull {image}  \
123                 (gRPC status: {status})"
124            ))),
125        }
126    }
127
128    /// Resolve the snapshot chain ID for an image.
129    ///
130    /// This reads the image manifest and config from the content store to
131    /// compute the chain ID of the topmost layer. The chain ID is used as
132    /// the parent snapshot when preparing a writable rootfs for a container.
133    ///
134    /// Chain ID computation follows the OCI image spec:
135    ///   chain_id[0] = diff_id[0]
136    ///   chain_id[n] = sha256(chain_id[n-1] + " " + diff_id[n])
137    async fn resolve_image_chain_id(
138        channel: &Channel,
139        image: &str,
140        namespace: &str,
141    ) -> Result<String, WfeError> {
142        use sha2::{Digest, Sha256};
143
144        // 1. Get the image record to find the manifest digest.
145        let mut images_client = ImagesClient::new(channel.clone());
146        let req = Self::with_namespace(
147            GetImageRequest {
148                name: image.to_string(),
149            },
150            namespace,
151        );
152        let image_resp = images_client
153            .get(req)
154            .await
155            .map_err(|e| WfeError::StepExecution(format!("failed to get image '{image}': {e}")))?;
156        let img = image_resp
157            .into_inner()
158            .image
159            .ok_or_else(|| WfeError::StepExecution(format!("image '{image}' has no record")))?;
160        let target = img.target.ok_or_else(|| {
161            WfeError::StepExecution(format!("image '{image}' has no target descriptor"))
162        })?;
163
164        // The target might be an index (multi-platform) or a manifest.
165        // Read the content and determine based on mediaType.
166        let manifest_digest = target.digest.clone();
167        let manifest_bytes = Self::read_content(channel, &manifest_digest, namespace).await?;
168        let manifest_json: serde_json::Value = serde_json::from_slice(&manifest_bytes)
169            .map_err(|e| WfeError::StepExecution(format!("failed to parse manifest: {e}")))?;
170
171        // 2. If it's an index, pick the matching platform manifest.
172        let manifest_json = if manifest_json.get("manifests").is_some() {
173            // OCI image index — find the platform-matching manifest.
174            let arch = std::env::consts::ARCH;
175            let oci_arch = match arch {
176                "aarch64" => "arm64",
177                "x86_64" => "amd64",
178                other => other,
179            };
180            let manifests = manifest_json["manifests"].as_array().ok_or_else(|| {
181                WfeError::StepExecution("image index has no manifests array".to_string())
182            })?;
183            let platform_manifest = manifests
184                .iter()
185                .find(|m| {
186                    m.get("platform")
187                        .and_then(|p| p.get("architecture"))
188                        .and_then(|a| a.as_str())
189                        == Some(oci_arch)
190                })
191                .ok_or_else(|| {
192                    WfeError::StepExecution(format!(
193                        "no manifest for architecture '{oci_arch}' in image index"
194                    ))
195                })?;
196            let digest = platform_manifest["digest"].as_str().ok_or_else(|| {
197                WfeError::StepExecution("platform manifest has no digest".to_string())
198            })?;
199            let bytes = Self::read_content(channel, digest, namespace).await?;
200            serde_json::from_slice(&bytes).map_err(|e| {
201                WfeError::StepExecution(format!("failed to parse platform manifest: {e}"))
202            })?
203        } else {
204            manifest_json
205        };
206
207        // 3. Get the config digest from the manifest.
208        let config_digest = manifest_json["config"]["digest"]
209            .as_str()
210            .ok_or_else(|| WfeError::StepExecution("manifest has no config.digest".to_string()))?;
211
212        // 4. Read the image config.
213        let config_bytes = Self::read_content(channel, config_digest, namespace).await?;
214        let config_json: serde_json::Value = serde_json::from_slice(&config_bytes)
215            .map_err(|e| WfeError::StepExecution(format!("failed to parse image config: {e}")))?;
216
217        // 5. Extract diff_ids and compute chain ID.
218        let diff_ids = config_json["rootfs"]["diff_ids"]
219            .as_array()
220            .ok_or_else(|| {
221                WfeError::StepExecution("image config has no rootfs.diff_ids".to_string())
222            })?;
223
224        if diff_ids.is_empty() {
225            return Err(WfeError::StepExecution(
226                "image has no layers (empty diff_ids)".to_string(),
227            ));
228        }
229
230        let mut chain_id = diff_ids[0]
231            .as_str()
232            .ok_or_else(|| WfeError::StepExecution("diff_id is not a string".to_string()))?
233            .to_string();
234
235        for diff_id in &diff_ids[1..] {
236            let diff = diff_id
237                .as_str()
238                .ok_or_else(|| WfeError::StepExecution("diff_id is not a string".to_string()))?;
239            let mut hasher = Sha256::new();
240            hasher.update(format!("{chain_id} {diff}"));
241            chain_id = format!("sha256:{:x}", hasher.finalize());
242        }
243
244        tracing::debug!(image = image, chain_id = %chain_id, "resolved image chain ID");
245        Ok(chain_id)
246    }
247
248    /// Read content from the containerd content store by digest.
249    async fn read_content(
250        channel: &Channel,
251        digest: &str,
252        namespace: &str,
253    ) -> Result<Vec<u8>, WfeError> {
254        use tokio_stream::StreamExt;
255
256        let mut client = ContentClient::new(channel.clone());
257        let req = Self::with_namespace(
258            ReadContentRequest {
259                digest: digest.to_string(),
260                offset: 0,
261                size: 0, // read all
262            },
263            namespace,
264        );
265
266        let mut stream = client
267            .read(req)
268            .await
269            .map_err(|e| WfeError::StepExecution(format!("failed to read content {digest}: {e}")))?
270            .into_inner();
271
272        let mut data = Vec::new();
273        while let Some(chunk) = stream.next().await {
274            let chunk = chunk.map_err(|e| {
275                WfeError::StepExecution(format!("error reading content {digest}: {e}"))
276            })?;
277            data.extend_from_slice(&chunk.data);
278        }
279
280        Ok(data)
281    }
282
283    /// Build a minimal OCI runtime spec as a `prost_types::Any`.
284    ///
285    /// The spec is serialized as JSON and wrapped in a protobuf Any with
286    /// the containerd OCI spec type URL.
287    pub(crate) fn build_oci_spec(&self, merged_env: &HashMap<String, String>) -> prost_types::Any {
288        // Build the args array for the process.
289        let args: Vec<String> = if let Some(ref run) = self.config.run {
290            vec!["/bin/sh".to_string(), "-c".to_string(), run.clone()]
291        } else if let Some(ref command) = self.config.command {
292            command.clone()
293        } else {
294            vec![]
295        };
296
297        // Build env in KEY=VALUE form.
298        let env: Vec<String> = merged_env.iter().map(|(k, v)| format!("{k}={v}")).collect();
299
300        // Build mounts.
301        let mut mounts = vec![
302            serde_json::json!({
303                "destination": "/proc",
304                "type": "proc",
305                "source": "proc",
306                "options": ["nosuid", "noexec", "nodev"]
307            }),
308            serde_json::json!({
309                "destination": "/dev",
310                "type": "tmpfs",
311                "source": "tmpfs",
312                "options": ["nosuid", "strictatime", "mode=755", "size=65536k"]
313            }),
314            serde_json::json!({
315                "destination": "/sys",
316                "type": "sysfs",
317                "source": "sysfs",
318                "options": ["nosuid", "noexec", "nodev", "ro"]
319            }),
320        ];
321
322        for vol in &self.config.volumes {
323            let mut opts = vec!["rbind".to_string()];
324            if vol.readonly {
325                opts.push("ro".to_string());
326            }
327            mounts.push(serde_json::json!({
328                "destination": vol.target,
329                "type": "bind",
330                "source": vol.source,
331                "options": opts,
332            }));
333        }
334
335        // Parse user / group.
336        let (uid, gid) = parse_user_spec(&self.config.user);
337
338        let mut process = serde_json::json!({
339            "terminal": false,
340            "user": {
341                "uid": uid,
342                "gid": gid,
343            },
344            "args": args,
345            "env": env,
346            "cwd": self.config.working_dir.as_deref().unwrap_or("/"),
347        });
348
349        // Add capabilities. When running as root, grant the default Docker
350        // capability set so tools like apt-get work. Non-root gets nothing.
351        let caps = if uid == 0 {
352            serde_json::json!([
353                "CAP_AUDIT_WRITE",
354                "CAP_CHOWN",
355                "CAP_DAC_OVERRIDE",
356                "CAP_FOWNER",
357                "CAP_FSETID",
358                "CAP_KILL",
359                "CAP_MKNOD",
360                "CAP_NET_BIND_SERVICE",
361                "CAP_NET_RAW",
362                "CAP_SETFCAP",
363                "CAP_SETGID",
364                "CAP_SETPCAP",
365                "CAP_SETUID",
366                "CAP_SYS_CHROOT",
367            ])
368        } else {
369            serde_json::json!([])
370        };
371        process["capabilities"] = serde_json::json!({
372            "bounding": caps,
373            "effective": caps,
374            "inheritable": caps,
375            "permitted": caps,
376            "ambient": caps,
377        });
378
379        let spec = serde_json::json!({
380            "ociVersion": "1.0.2",
381            "process": process,
382            "root": {
383                "path": "rootfs",
384                "readonly": false,
385            },
386            "mounts": mounts,
387            "linux": {
388                "namespaces": [
389                    { "type": "pid" },
390                    { "type": "ipc" },
391                    { "type": "uts" },
392                    { "type": "mount" },
393                ],
394            },
395        });
396
397        let json_bytes = serde_json::to_vec(&spec).expect("OCI spec serialization cannot fail");
398
399        prost_types::Any {
400            type_url: "types.containerd.io/opencontainers/runtime-spec/1/Spec".to_string(),
401            value: json_bytes,
402        }
403    }
404
405    /// Inject a `containerd-namespace` header into a tonic request.
406    pub(crate) fn with_namespace<T>(req: T, namespace: &str) -> tonic::Request<T> {
407        let mut request = tonic::Request::new(req);
408        request
409            .metadata_mut()
410            .insert("containerd-namespace", namespace.parse().unwrap());
411        request
412    }
413
414    /// Start a long-running service container via the containerd gRPC API.
415    ///
416    /// Used by `ContainerdServiceProvider` to provision infrastructure services.
417    /// The container runs on the host network so its ports are accessible on 127.0.0.1.
418    /// Unlike step execution, this does NOT wait for the container to exit.
419    pub async fn run_service(
420        addr: &str,
421        container_id: &str,
422        image: &str,
423        env: &std::collections::HashMap<String, String>,
424    ) -> Result<(), WfeError> {
425        let namespace = DEFAULT_NAMESPACE;
426        let channel = Self::connect(addr).await?;
427
428        // Verify image exists.
429        Self::ensure_image(&channel, image, namespace).await?;
430
431        // Build a config for host-network service container.
432        let config = ContainerdConfig {
433            image: image.to_string(),
434            command: None,
435            run: None,
436            env: env.clone(),
437            volumes: vec![],
438            working_dir: None,
439            user: "0:0".to_string(),
440            network: "host".to_string(),
441            memory: None,
442            cpu: None,
443            pull: "if-not-present".to_string(),
444            containerd_addr: addr.to_string(),
445            cli: "nerdctl".to_string(),
446            tls: Default::default(),
447            registry_auth: Default::default(),
448            timeout_ms: None,
449        };
450
451        let step = Self::new(config);
452        let oci_spec = step.build_oci_spec(env);
453
454        // Create container.
455        let mut containers_client = ContainersClient::new(channel.clone());
456        let create_req = Self::with_namespace(
457            CreateContainerRequest {
458                container: Some(Container {
459                    id: container_id.to_string(),
460                    image: image.to_string(),
461                    runtime: Some(Runtime {
462                        name: "io.containerd.runc.v2".to_string(),
463                        options: None,
464                    }),
465                    spec: Some(oci_spec),
466                    snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
467                    snapshot_key: container_id.to_string(),
468                    labels: HashMap::new(),
469                    created_at: None,
470                    updated_at: None,
471                    extensions: HashMap::new(),
472                    sandbox: String::new(),
473                }),
474            },
475            namespace,
476        );
477        containers_client.create(create_req).await.map_err(|e| {
478            WfeError::StepExecution(format!("failed to create service container: {e}"))
479        })?;
480
481        // Prepare snapshot.
482        let mut snapshots_client = SnapshotsClient::new(channel.clone());
483        let mounts = {
484            let mounts_req = Self::with_namespace(
485                MountsRequest {
486                    snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
487                    key: container_id.to_string(),
488                },
489                namespace,
490            );
491            match snapshots_client.mounts(mounts_req).await {
492                Ok(resp) => resp.into_inner().mounts,
493                Err(_) => {
494                    let parent = Self::resolve_image_chain_id(&channel, image, namespace).await?;
495                    let prepare_req = Self::with_namespace(
496                        PrepareSnapshotRequest {
497                            snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
498                            key: container_id.to_string(),
499                            parent,
500                            labels: HashMap::new(),
501                        },
502                        namespace,
503                    );
504                    snapshots_client
505                        .prepare(prepare_req)
506                        .await
507                        .map_err(|e| {
508                            WfeError::StepExecution(format!("failed to prepare snapshot: {e}"))
509                        })?
510                        .into_inner()
511                        .mounts
512                }
513            }
514        };
515
516        // Create and start task (no stdout/stderr capture for services).
517        let mut tasks_client = TasksClient::new(channel.clone());
518        let create_task_req = Self::with_namespace(
519            CreateTaskRequest {
520                container_id: container_id.to_string(),
521                rootfs: mounts,
522                stdin: String::new(),
523                stdout: String::new(),
524                stderr: String::new(),
525                terminal: false,
526                checkpoint: None,
527                options: None,
528                runtime_path: String::new(),
529            },
530            namespace,
531        );
532        tasks_client
533            .create(create_task_req)
534            .await
535            .map_err(|e| WfeError::StepExecution(format!("failed to create service task: {e}")))?;
536
537        let start_req = Self::with_namespace(
538            StartRequest {
539                container_id: container_id.to_string(),
540                exec_id: String::new(),
541            },
542            namespace,
543        );
544        tasks_client
545            .start(start_req)
546            .await
547            .map_err(|e| WfeError::StepExecution(format!("failed to start service task: {e}")))?;
548
549        tracing::info!(container_id = %container_id, image = %image, "service container started");
550        Ok(())
551    }
552
553    /// Stop and clean up a service container via the containerd gRPC API.
554    pub async fn cleanup_service(addr: &str, container_id: &str) -> Result<(), WfeError> {
555        let channel = Self::connect(addr).await?;
556        Self::cleanup(&channel, container_id, DEFAULT_NAMESPACE).await
557    }
558
559    /// Parse `##wfe[output key=value]` lines from stdout.
560    pub fn parse_outputs(stdout: &str) -> HashMap<String, String> {
561        let mut outputs = HashMap::new();
562        for line in stdout.lines() {
563            if let Some(rest) = line.strip_prefix("##wfe[output ")
564                && let Some(rest) = rest.strip_suffix(']')
565                && let Some(eq_pos) = rest.find('=')
566            {
567                let name = rest[..eq_pos].trim().to_string();
568                let value = rest[eq_pos + 1..].to_string();
569                outputs.insert(name, value);
570            }
571        }
572        outputs
573    }
574
575    /// Build the output data JSON value from step execution results.
576    pub fn build_output_data(
577        step_name: &str,
578        stdout: &str,
579        stderr: &str,
580        exit_code: i32,
581        parsed_outputs: &HashMap<String, String>,
582    ) -> serde_json::Value {
583        let mut outputs = serde_json::Map::new();
584        for (key, value) in parsed_outputs {
585            outputs.insert(key.clone(), serde_json::Value::String(value.clone()));
586        }
587        outputs.insert(
588            format!("{step_name}.stdout"),
589            serde_json::Value::String(stdout.to_string()),
590        );
591        outputs.insert(
592            format!("{step_name}.stderr"),
593            serde_json::Value::String(stderr.to_string()),
594        );
595        outputs.insert(
596            format!("{step_name}.exit_code"),
597            serde_json::Value::Number(serde_json::Number::from(exit_code)),
598        );
599        serde_json::Value::Object(outputs)
600    }
601}
602
/// Split a `uid:gid` user specification into numeric IDs.
///
/// Each component that is not a valid `u32` becomes `65534` (conventionally
/// the `nobody` user/group). Input that is not exactly two colon-separated
/// fields (e.g. `1000` or `1:2:3`) yields `(65534, 65534)`.
fn parse_user_spec(user: &str) -> (u32, u32) {
    const NOBODY: u32 = 65534;

    match user.split_once(':') {
        // Exactly two fields: the remainder after the first ':' has no more.
        Some((uid, gid)) if !gid.contains(':') => (
            uid.parse().unwrap_or(NOBODY),
            gid.parse().unwrap_or(NOBODY),
        ),
        _ => (NOBODY, NOBODY),
    }
}
614
615#[async_trait]
616impl StepBody for ContainerdStep {
617    async fn run(
618        &mut self,
619        context: &StepExecutionContext<'_>,
620    ) -> wfe_core::Result<ExecutionResult> {
621        let step_name = context.step.name.as_deref().unwrap_or("unknown");
622        let namespace = DEFAULT_NAMESPACE;
623
624        // 1. Connect to containerd.
625        let addr = &self.config.containerd_addr;
626        tracing::info!(addr = %addr, "connecting to containerd daemon");
627        let channel = Self::connect(addr).await?;
628
629        // Verify connectivity.
630        {
631            let mut version_client = VersionClient::new(channel.clone());
632            let req = Self::with_namespace((), namespace);
633            match version_client.version(req).await {
634                Ok(resp) => {
635                    let v = resp.into_inner();
636                    tracing::info!(
637                        version = %v.version,
638                        revision = %v.revision,
639                        "connected to containerd"
640                    );
641                }
642                Err(e) => {
643                    return Err(WfeError::StepExecution(format!(
644                        "containerd version check failed: {e}"
645                    )));
646                }
647            }
648        }
649
650        // 2. Ensure image exists (based on pull policy).
651        let should_check = !matches!(self.config.pull.as_str(), "never");
652        if should_check {
653            Self::ensure_image(&channel, &self.config.image, namespace).await?;
654        }
655
656        // Generate a unique container ID.
657        let container_id = format!("wfe-{}", uuid::Uuid::new_v4());
658
659        // 3. Merge environment variables.
660        let mut merged_env: HashMap<String, String> = HashMap::new();
661        if let Some(data_obj) = context.workflow.data.as_object() {
662            for (key, value) in data_obj {
663                let env_key = key.to_uppercase();
664                let env_val = match value {
665                    serde_json::Value::String(s) => s.clone(),
666                    other => other.to_string(),
667                };
668                merged_env.insert(env_key, env_val);
669            }
670        }
671        // Config env overrides workflow data.
672        for (key, value) in &self.config.env {
673            merged_env.insert(key.clone(), value.clone());
674        }
675
676        // 4. Build OCI spec.
677        let oci_spec = self.build_oci_spec(&merged_env);
678
679        // 5. Create container.
680        tracing::info!(container_id = %container_id, image = %self.config.image, "creating container");
681        let mut containers_client = ContainersClient::new(channel.clone());
682        let create_req = Self::with_namespace(
683            CreateContainerRequest {
684                container: Some(Container {
685                    id: container_id.clone(),
686                    image: self.config.image.clone(),
687                    runtime: Some(Runtime {
688                        name: "io.containerd.runc.v2".to_string(),
689                        options: None,
690                    }),
691                    spec: Some(oci_spec),
692                    snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
693                    snapshot_key: container_id.clone(),
694                    labels: HashMap::new(),
695                    created_at: None,
696                    updated_at: None,
697                    extensions: HashMap::new(),
698                    sandbox: String::new(),
699                }),
700            },
701            namespace,
702        );
703
704        containers_client
705            .create(create_req)
706            .await
707            .map_err(|e| WfeError::StepExecution(format!("failed to create container: {e}")))?;
708
709        // 6. Prepare snapshot with the image's layers as parent.
710        let mut snapshots_client = SnapshotsClient::new(channel.clone());
711
712        let mounts = {
713            // First try: see if a snapshot was already prepared for this container.
714            let mounts_req = Self::with_namespace(
715                MountsRequest {
716                    snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
717                    key: container_id.clone(),
718                },
719                namespace,
720            );
721
722            match snapshots_client.mounts(mounts_req).await {
723                Ok(resp) => resp.into_inner().mounts,
724                Err(_) => {
725                    // Resolve the image's chain ID to use as snapshot parent.
726                    let parent = if should_check {
727                        Self::resolve_image_chain_id(&channel, &self.config.image, namespace)
728                            .await?
729                    } else {
730                        String::new()
731                    };
732
733                    let prepare_req = Self::with_namespace(
734                        PrepareSnapshotRequest {
735                            snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
736                            key: container_id.clone(),
737                            parent,
738                            labels: HashMap::new(),
739                        },
740                        namespace,
741                    );
742                    snapshots_client
743                        .prepare(prepare_req)
744                        .await
745                        .map_err(|e| {
746                            WfeError::StepExecution(format!("failed to prepare snapshot: {e}"))
747                        })?
748                        .into_inner()
749                        .mounts
750                }
751            }
752        };
753
754        // 7. Create FIFO paths for stdout/stderr capture.
755        // Use WFE_IO_DIR if set (e.g., a shared mount with a remote containerd daemon),
756        // otherwise fall back to the system temp directory.
757        let io_base = std::env::var("WFE_IO_DIR")
758            .map(std::path::PathBuf::from)
759            .unwrap_or_else(|_| std::env::temp_dir());
760        let tmp_dir = io_base.join(format!("wfe-io-{container_id}"));
761        std::fs::create_dir_all(&tmp_dir)
762            .map_err(|e| WfeError::StepExecution(format!("failed to create IO temp dir: {e}")))?;
763
764        let stdout_path = tmp_dir.join("stdout");
765        let stderr_path = tmp_dir.join("stderr");
766
767        // Create empty files for the shim to write stdout/stderr to.
768        // We use regular files instead of FIFOs because FIFOs don't work
769        // across filesystem boundaries (e.g., virtiofs mounts with Lima VMs).
770        for path in [&stdout_path, &stderr_path] {
771            let _ = std::fs::remove_file(path);
772            std::fs::File::create(path).map_err(|e| {
773                WfeError::StepExecution(format!("failed to create IO file {}: {e}", path.display()))
774            })?;
775            // Ensure the remote shim can write to it.
776            #[cfg(unix)]
777            {
778                use std::os::unix::fs::PermissionsExt;
779                std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o666)).ok();
780            }
781        }
782
783        let stdout_str = stdout_path.to_string_lossy().to_string();
784        let stderr_str = stderr_path.to_string_lossy().to_string();
785
786        // 8. Create task.
787        let mut tasks_client = TasksClient::new(channel.clone());
788
789        let create_task_req = Self::with_namespace(
790            CreateTaskRequest {
791                container_id: container_id.clone(),
792                rootfs: mounts,
793                stdin: String::new(),
794                stdout: stdout_str.clone(),
795                stderr: stderr_str.clone(),
796                terminal: false,
797                checkpoint: None,
798                options: None,
799                runtime_path: String::new(),
800            },
801            namespace,
802        );
803
804        tasks_client
805            .create(create_task_req)
806            .await
807            .map_err(|e| WfeError::StepExecution(format!("failed to create task: {e}")))?;
808
809        // Start the task.
810        let start_req = Self::with_namespace(
811            StartRequest {
812                container_id: container_id.clone(),
813                exec_id: String::new(),
814            },
815            namespace,
816        );
817
818        tasks_client
819            .start(start_req)
820            .await
821            .map_err(|e| WfeError::StepExecution(format!("failed to start task: {e}")))?;
822
823        tracing::info!(container_id = %container_id, "task started");
824
825        // 9. Wait for task completion (with optional timeout).
826        let wait_req = Self::with_namespace(
827            WaitRequest {
828                container_id: container_id.clone(),
829                exec_id: String::new(),
830            },
831            namespace,
832        );
833
834        let wait_result = if let Some(timeout_ms) = self.config.timeout_ms {
835            let duration = std::time::Duration::from_millis(timeout_ms);
836            match tokio::time::timeout(duration, tasks_client.wait(wait_req)).await {
837                Ok(result) => result,
838                Err(_) => {
839                    // Attempt cleanup before returning timeout error.
840                    let _ = Self::cleanup(&channel, &container_id, namespace).await;
841                    let _ = std::fs::remove_dir_all(&tmp_dir);
842                    return Err(WfeError::StepExecution(format!(
843                        "container execution timed out after {timeout_ms}ms"
844                    )));
845                }
846            }
847        } else {
848            tasks_client.wait(wait_req).await
849        };
850
851        let exit_status = match wait_result {
852            Ok(resp) => resp.into_inner().exit_status,
853            Err(e) => {
854                let _ = Self::cleanup(&channel, &container_id, namespace).await;
855                let _ = std::fs::remove_dir_all(&tmp_dir);
856                return Err(WfeError::StepExecution(format!(
857                    "failed waiting for task: {e}"
858                )));
859            }
860        };
861
862        // 10. Read captured output from files.
863        let stdout_content = tokio::fs::read_to_string(&stdout_path)
864            .await
865            .unwrap_or_default();
866        let stderr_content = tokio::fs::read_to_string(&stderr_path)
867            .await
868            .unwrap_or_default();
869
870        // 11. Cleanup: delete task, then container.
871        if let Err(e) = Self::cleanup(&channel, &container_id, namespace).await {
872            tracing::warn!(container_id = %container_id, error = %e, "cleanup failed");
873        }
874        let _ = std::fs::remove_dir_all(&tmp_dir);
875
876        // 12. Check exit status.
877        let exit_code = exit_status as i32;
878        if exit_code != 0 {
879            return Err(WfeError::StepExecution(format!(
880                "container exited with code {exit_code}\nstdout: {stdout_content}\nstderr: {stderr_content}"
881            )));
882        }
883
884        // 13. Parse outputs and build result.
885        let parsed = Self::parse_outputs(&stdout_content);
886        let output_data = Self::build_output_data(
887            step_name,
888            &stdout_content,
889            &stderr_content,
890            exit_code,
891            &parsed,
892        );
893
894        Ok(ExecutionResult {
895            proceed: true,
896            output_data: Some(output_data),
897            ..Default::default()
898        })
899    }
900}
901
902impl ContainerdStep {
903    /// Delete the task and container, best-effort.
904    pub(crate) async fn cleanup(
905        channel: &Channel,
906        container_id: &str,
907        namespace: &str,
908    ) -> Result<(), WfeError> {
909        let mut tasks_client = TasksClient::new(channel.clone());
910        let mut containers_client = ContainersClient::new(channel.clone());
911
912        // Delete task (ignore errors — it may already be gone).
913        let del_task_req = Self::with_namespace(
914            DeleteTaskRequest {
915                container_id: container_id.to_string(),
916            },
917            namespace,
918        );
919        let _ = tasks_client.delete(del_task_req).await;
920
921        // Delete container.
922        let del_container_req = Self::with_namespace(
923            DeleteContainerRequest {
924                id: container_id.to_string(),
925            },
926            namespace,
927        );
928        containers_client
929            .delete(del_container_req)
930            .await
931            .map_err(|e| WfeError::StepExecution(format!("failed to delete container: {e}")))?;
932
933        Ok(())
934    }
935}
936
#[cfg(test)]
mod tests {
    //! Unit tests for the daemon-independent parts of `ContainerdStep`:
    //! output-marker parsing, output-data assembly, user-spec parsing,
    //! OCI runtime-spec generation, and connection failures.

    use super::*;
    use crate::config::{TlsConfig, VolumeMountConfig};
    use pretty_assertions::assert_eq;

    /// A config that runs `echo hello` in `alpine:3.18` as uid/gid 65534.
    fn minimal_config() -> ContainerdConfig {
        ContainerdConfig {
            image: "alpine:3.18".to_string(),
            command: None,
            run: Some("echo hello".to_string()),
            env: HashMap::new(),
            volumes: vec![],
            working_dir: None,
            user: "65534:65534".to_string(),
            network: "none".to_string(),
            memory: None,
            cpu: None,
            pull: "if-not-present".to_string(),
            containerd_addr: "/run/containerd/containerd.sock".to_string(),
            cli: "nerdctl".to_string(),
            tls: TlsConfig::default(),
            registry_auth: HashMap::new(),
            timeout_ms: None,
        }
    }

    /// Builds an owned string map from borrowed key/value pairs.
    fn string_map(entries: &[(&str, &str)]) -> HashMap<String, String> {
        entries
            .iter()
            .map(|&(key, value)| (key.to_owned(), value.to_owned()))
            .collect()
    }

    /// Collects a JSON array of strings into owned `String`s.
    fn strings(value: &serde_json::Value) -> Vec<String> {
        value
            .as_array()
            .unwrap()
            .iter()
            .map(|item| item.as_str().unwrap().to_owned())
            .collect()
    }

    /// Builds the OCI spec for `config` and decodes its JSON payload.
    fn oci_json(config: ContainerdConfig, env: &HashMap<String, String>) -> serde_json::Value {
        let step = ContainerdStep::new(config);
        let spec = step.build_oci_spec(env);
        serde_json::from_slice(&spec.value).unwrap()
    }

    /// Connects to `addr`, expects failure, and returns the rendered error.
    async fn connect_error(addr: &str) -> String {
        ContainerdStep::connect(addr).await.unwrap_err().to_string()
    }

    // ── parse_outputs ──────────────────────────────────────────────────

    #[test]
    fn parse_outputs_empty() {
        assert!(ContainerdStep::parse_outputs("").is_empty());
    }

    #[test]
    fn parse_outputs_single() {
        let found =
            ContainerdStep::parse_outputs("some log line\n##wfe[output version=1.2.3]\nmore logs\n");
        assert_eq!(found, string_map(&[("version", "1.2.3")]));
    }

    #[test]
    fn parse_outputs_multiple() {
        let found = ContainerdStep::parse_outputs("##wfe[output foo=bar]\n##wfe[output baz=qux]\n");
        assert_eq!(found, string_map(&[("foo", "bar"), ("baz", "qux")]));
    }

    #[test]
    fn parse_outputs_mixed_with_regular_stdout() {
        let log = concat!(
            "Starting container...\n",
            "Pulling image...\n",
            "##wfe[output digest=sha256:abc123]\n",
            "Running tests...\n",
            "##wfe[output result=pass]\n",
            "Done.\n",
        );
        let found = ContainerdStep::parse_outputs(log);
        assert_eq!(
            found,
            string_map(&[("digest", "sha256:abc123"), ("result", "pass")])
        );
    }

    #[test]
    fn parse_outputs_no_wfe_lines() {
        assert!(ContainerdStep::parse_outputs("line 1\nline 2\nline 3\n").is_empty());
    }

    #[test]
    fn parse_outputs_value_with_equals_sign() {
        let found = ContainerdStep::parse_outputs("##wfe[output url=https://example.com?a=1&b=2]\n");
        assert_eq!(found, string_map(&[("url", "https://example.com?a=1&b=2")]));
    }

    #[test]
    fn parse_outputs_ignores_malformed_lines() {
        let log = concat!(
            "##wfe[output ]\n",
            "##wfe[output no_equals]\n",
            "##wfe[output valid=yes]\n",
            "##wfe[output_extra bad=val]\n",
        );
        let found = ContainerdStep::parse_outputs(log);
        assert_eq!(found, string_map(&[("valid", "yes")]));
    }

    #[test]
    fn parse_outputs_overwrites_duplicate_keys() {
        let found = ContainerdStep::parse_outputs("##wfe[output key=first]\n##wfe[output key=second]\n");
        assert_eq!(found, string_map(&[("key", "second")]));
    }

    // ── build_output_data ──────────────────────────────────────────────

    #[test]
    fn build_output_data_basic() {
        let outputs = string_map(&[("result", "success")]);
        let data = ContainerdStep::build_output_data("my_step", "hello world\n", "", 0, &outputs);
        let fields = data.as_object().unwrap();

        assert_eq!(fields.get("result").unwrap(), "success");
        assert_eq!(fields.get("my_step.stdout").unwrap(), "hello world\n");
        assert_eq!(fields.get("my_step.stderr").unwrap(), "");
        assert_eq!(fields.get("my_step.exit_code").unwrap(), 0);
    }

    #[test]
    fn build_output_data_no_parsed_outputs() {
        let data = ContainerdStep::build_output_data("step1", "out", "err", 1, &HashMap::new());
        let fields = data.as_object().unwrap();

        // Only the three standard keys: stdout, stderr, exit_code.
        assert_eq!(fields.len(), 3);
        assert_eq!(fields.get("step1.stdout").unwrap(), "out");
        assert_eq!(fields.get("step1.stderr").unwrap(), "err");
        assert_eq!(fields.get("step1.exit_code").unwrap(), 1);
    }

    #[test]
    fn build_output_data_with_multiple_parsed_outputs() {
        let outputs = string_map(&[("a", "1"), ("b", "2"), ("c", "3")]);
        let data = ContainerdStep::build_output_data("s", "", "", 0, &outputs);
        let fields = data.as_object().unwrap();

        for (key, value) in [("a", "1"), ("b", "2"), ("c", "3")] {
            assert_eq!(fields.get(key).unwrap(), value);
        }
        // Three parsed outputs plus the three standard keys.
        assert_eq!(fields.len(), 6);
    }

    #[test]
    fn build_output_data_negative_exit_code() {
        let data = ContainerdStep::build_output_data("s", "", "", -1, &HashMap::new());
        let fields = data.as_object().unwrap();
        assert_eq!(fields.get("s.exit_code").unwrap(), -1);
    }

    // ── parse_user_spec ────────────────────────────────────────────────

    #[test]
    fn parse_user_spec_normal() {
        assert_eq!(parse_user_spec("1000:1000"), (1000, 1000));
    }

    #[test]
    fn parse_user_spec_root() {
        assert_eq!(parse_user_spec("0:0"), (0, 0));
    }

    #[test]
    fn parse_user_spec_default() {
        assert_eq!(parse_user_spec("65534:65534"), (65534, 65534));
    }

    #[test]
    fn parse_user_spec_invalid_falls_back() {
        assert_eq!(parse_user_spec("abc"), (65534, 65534));
    }

    // ── build_oci_spec ─────────────────────────────────────────────────

    #[test]
    fn build_oci_spec_minimal() {
        let step = ContainerdStep::new(minimal_config());
        let spec = step.build_oci_spec(&HashMap::new());

        assert_eq!(
            spec.type_url,
            "types.containerd.io/opencontainers/runtime-spec/1/Spec"
        );
        assert!(!spec.value.is_empty());

        // The payload must decode as JSON with the expected process section.
        let doc: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
        let process = &doc["process"];
        assert_eq!(doc["ociVersion"], "1.0.2");
        for (idx, arg) in ["/bin/sh", "-c", "echo hello"].into_iter().enumerate() {
            assert_eq!(process["args"][idx], arg);
        }
        assert_eq!(process["user"]["uid"], 65534);
        assert_eq!(process["user"]["gid"], 65534);
        assert_eq!(process["cwd"], "/");
    }

    #[test]
    fn build_oci_spec_with_command() {
        let mut config = minimal_config();
        config.run = None;
        config.command = Some(
            ["echo", "hello", "world"]
                .iter()
                .map(|word| word.to_string())
                .collect(),
        );

        let doc = oci_json(config, &HashMap::new());
        for (idx, arg) in ["echo", "hello", "world"].into_iter().enumerate() {
            assert_eq!(doc["process"]["args"][idx], arg);
        }
    }

    #[test]
    fn build_oci_spec_with_env() {
        let env = string_map(&[("FOO", "bar"), ("BAZ", "qux")]);
        let doc = oci_json(minimal_config(), &env);
        let vars = strings(&doc["process"]["env"]);

        assert!(vars.iter().any(|var| var == "FOO=bar"));
        assert!(vars.iter().any(|var| var == "BAZ=qux"));
    }

    #[test]
    fn build_oci_spec_with_working_dir() {
        let config = ContainerdConfig {
            working_dir: Some("/app".to_string()),
            ..minimal_config()
        };
        let doc = oci_json(config, &HashMap::new());
        assert_eq!(doc["process"]["cwd"], "/app");
    }

    #[test]
    fn build_oci_spec_with_user() {
        let config = ContainerdConfig {
            user: "1000:2000".to_string(),
            ..minimal_config()
        };
        let doc = oci_json(config, &HashMap::new());
        let user = &doc["process"]["user"];
        assert_eq!(user["uid"], 1000);
        assert_eq!(user["gid"], 2000);
    }

    #[test]
    fn build_oci_spec_with_volumes() {
        let mount = |source: &str, target: &str, readonly: bool| VolumeMountConfig {
            source: source.to_string(),
            target: target.to_string(),
            readonly,
        };
        let config = ContainerdConfig {
            volumes: vec![
                mount("/host/data", "/container/data", false),
                mount("/host/config", "/etc/config", true),
            ],
            ..minimal_config()
        };

        let doc = oci_json(config, &HashMap::new());
        let mounts = doc["mounts"].as_array().unwrap();
        // Three built-in mounts plus the two configured volumes.
        assert_eq!(mounts.len(), 5);

        let binds: Vec<&serde_json::Value> =
            mounts.iter().filter(|entry| entry["type"] == "bind").collect();
        assert_eq!(binds.len(), 2);

        let readonly = binds
            .iter()
            .find(|entry| entry["destination"] == "/etc/config")
            .unwrap();
        let options = strings(&readonly["options"]);
        assert!(options.iter().any(|opt| opt == "ro"));
    }

    #[test]
    fn build_oci_spec_no_command_no_run() {
        let config = ContainerdConfig {
            run: None,
            command: None,
            ..minimal_config()
        };
        let doc = oci_json(config, &HashMap::new());
        assert!(doc["process"]["args"].as_array().unwrap().is_empty());
    }

    // ── connect ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn connect_to_missing_unix_socket_returns_error() {
        let msg = connect_error("/tmp/nonexistent-wfe-containerd-test.sock").await;
        assert!(
            msg.contains("socket not found"),
            "expected 'socket not found' error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn connect_to_missing_unix_socket_with_scheme_returns_error() {
        let msg = connect_error("unix:///tmp/nonexistent-wfe-containerd-test.sock").await;
        assert!(
            msg.contains("socket not found"),
            "expected 'socket not found' error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn connect_to_invalid_tcp_returns_error() {
        let msg = connect_error("tcp://127.0.0.1:1").await;
        assert!(
            msg.contains("failed to connect"),
            "expected connection error, got: {msg}"
        );
    }

    // ── ContainerdStep::new ────────────────────────────────────────────

    #[test]
    fn new_creates_step_with_config() {
        let step = ContainerdStep::new(minimal_config());
        let ContainerdConfig {
            image,
            containerd_addr,
            ..
        } = &step.config;
        assert_eq!(image, "alpine:3.18");
        assert_eq!(containerd_addr, "/run/containerd/containerd.sock");
    }
}
1298
/// End-to-end tests that talk to a running containerd daemon.
///
/// Each test silently skips (after printing a `SKIP:` line) when no daemon
/// socket can be found.
#[cfg(test)]
mod e2e_tests {
    use super::*;

    /// Locate the containerd socket to test against.
    ///
    /// Uses `WFE_CONTAINERD_ADDR` when set. Otherwise it falls back to the Lima
    /// `wfe-test` VM socket under `$HOME` (or `/root` when `HOME` is unset).
    /// Returns `None` when the resolved socket path does not exist.
    fn containerd_addr() -> Option<String> {
        let addr = match std::env::var("WFE_CONTAINERD_ADDR") {
            Ok(explicit) => explicit,
            Err(_) => {
                let home = std::env::var("HOME").unwrap_or_else(|_| "/root".to_string());
                format!("unix://{home}/.lima/wfe-test/sock/containerd.sock")
            }
        };

        let present = Path::new(addr.strip_prefix("unix://").unwrap_or(addr.as_str())).exists();
        present.then_some(addr)
    }

    #[tokio::test]
    async fn e2e_version_check() {
        let addr = match containerd_addr() {
            Some(found) => found,
            None => {
                eprintln!("SKIP: containerd socket not available");
                return;
            }
        };

        let mut client = VersionClient::new(ContainerdStep::connect(&addr).await.unwrap());
        let info = client
            .version(ContainerdStep::with_namespace((), DEFAULT_NAMESPACE))
            .await
            .unwrap()
            .into_inner();

        assert!(!info.version.is_empty(), "version should not be empty");
        assert!(!info.revision.is_empty(), "revision should not be empty");
        eprintln!(
            "containerd version={} revision={}",
            info.version, info.revision
        );
    }
}