Skip to main content

wfe_containerd/
step.rs

1use std::collections::HashMap;
2use std::path::Path;
3
4use async_trait::async_trait;
5use tonic::transport::{Channel, Endpoint, Uri};
6use wfe_core::WfeError;
7use wfe_core::models::ExecutionResult;
8use wfe_core::traits::step::{StepBody, StepExecutionContext};
9
10use wfe_containerd_protos::containerd::services::containers::v1::{
11    Container, CreateContainerRequest, DeleteContainerRequest, container::Runtime,
12    containers_client::ContainersClient,
13};
14use wfe_containerd_protos::containerd::services::content::v1::{
15    ReadContentRequest, WriteAction, WriteContentRequest, content_client::ContentClient,
16};
17use wfe_containerd_protos::containerd::services::diff::v1::{
18    ApplyRequest, diff_client::DiffClient,
19};
20use wfe_containerd_protos::containerd::services::images::v1::{
21    GetImageRequest, images_client::ImagesClient,
22};
23use wfe_containerd_protos::containerd::services::snapshots::v1::{
24    MountsRequest, PrepareSnapshotRequest, snapshots_client::SnapshotsClient,
25};
26use wfe_containerd_protos::containerd::types::Descriptor;
27use wfe_containerd_protos::containerd::services::tasks::v1::{
28    CreateTaskRequest, DeleteTaskRequest, StartRequest, WaitRequest, tasks_client::TasksClient,
29};
30use wfe_containerd_protos::containerd::services::version::v1::version_client::VersionClient;
31
32use crate::config::ContainerdConfig;
33
/// containerd namespace used for the requests issued from this file.
const DEFAULT_NAMESPACE: &str = "default";

/// Snapshotter named when creating containers and preparing their rootfs
/// snapshots.
const DEFAULT_SNAPSHOTTER: &str = "overlayfs";
39
40/// Containerdstep.
41pub struct ContainerdStep {
42    config: ContainerdConfig,
43    /// Tracks artifact content descriptors uploaded during mount_artifacts.
44    /// Each tuple is (container_target, descriptor).
45    artifact_applies: Vec<(String, Descriptor)>,
46}
47
48impl ContainerdStep {
49    pub fn new(config: ContainerdConfig) -> Self {
50        Self {
51            config,
52            artifact_applies: Vec::new(),
53        }
54    }
55
56    /// Connect to the containerd daemon and return a raw tonic `Channel`.
57    ///
58    /// Supports Unix socket paths (bare `/path` or `unix:///path`) and
59    /// TCP/HTTP endpoints.
60    pub(crate) async fn connect(addr: &str) -> Result<Channel, WfeError> {
61        let channel = if addr.starts_with('/') || addr.starts_with("unix://") {
62            let socket_path = addr.strip_prefix("unix://").unwrap_or(addr).to_string();
63
64            if !Path::new(&socket_path).exists() {
65                return Err(WfeError::StepExecution(format!(
66                    "containerd socket not found: {socket_path}"
67                )));
68            }
69
70            Endpoint::try_from("http://[::]:50051")
71                .map_err(|e| WfeError::StepExecution(format!("failed to create endpoint: {e}")))?
72                .connect_with_connector(tower::service_fn(move |_: Uri| {
73                    let path = socket_path.clone();
74                    async move {
75                        tokio::net::UnixStream::connect(path)
76                            .await
77                            .map(hyper_util::rt::TokioIo::new)
78                    }
79                }))
80                .await
81                .map_err(|e| {
82                    WfeError::StepExecution(format!(
83                        "failed to connect to containerd via Unix socket at {addr}: {e}"
84                    ))
85                })?
86        } else {
87            let connect_addr = if addr.starts_with("tcp://") {
88                addr.replacen("tcp://", "http://", 1)
89            } else {
90                addr.to_string()
91            };
92
93            Endpoint::from_shared(connect_addr.clone())
94                .map_err(|e| {
95                    WfeError::StepExecution(format!(
96                        "invalid containerd endpoint {connect_addr}: {e}"
97                    ))
98                })?
99                .timeout(std::time::Duration::from_secs(30))
100                .connect()
101                .await
102                .map_err(|e| {
103                    WfeError::StepExecution(format!(
104                        "failed to connect to containerd at {connect_addr}: {e}"
105                    ))
106                })?
107        };
108
109        Ok(channel)
110    }
111
112    /// Upload bytes to containerd's content store.
113    ///
114    /// Computes the sha256 digest client-side, streams the data via the
115    /// `Content.Write` bidirectional gRPC, and commits with the expected
116    /// digest. Returns the digest and size.
117    async fn upload_content(
118        channel: &Channel,
119        namespace: &str,
120        data: &[u8],
121    ) -> Result<(String, i64), WfeError> {
122        use sha2::{Digest, Sha256};
123
124        let digest = format!("sha256:{:x}", Sha256::digest(data));
125        let size = data.len() as i64;
126
127        let mut client = ContentClient::new(channel.clone());
128
129        // Create a stream of WriteContentRequest messages.
130        let mut requests = Vec::new();
131
132        // Write the data in a single COMMIT message (small enough for artifacts).
133        requests.push(WriteContentRequest {
134            action: WriteAction::Commit as i32,
135            r#ref: digest.clone(),
136            total: size,
137            expected: digest.clone(),
138            offset: 0,
139            data: data.to_vec(),
140            labels: HashMap::new(),
141        });
142
143        let stream = tokio_stream::iter(requests);
144        let req = Self::with_namespace(stream, namespace);
145
146        let mut response = client
147            .write(req)
148            .await
149            .map_err(|e| WfeError::StepExecution(format!("content write failed: {e}")))?
150            .into_inner();
151
152        // Wait for the commit response.
153        while let Some(resp) = response
154            .message()
155            .await
156            .map_err(|e| WfeError::StepExecution(format!("content write stream error: {e}")))?
157        {
158            if resp.action == WriteAction::Commit as i32 {
159                break;
160            }
161        }
162
163        Ok((digest, size))
164    }
165
166    /// Apply a diff (tar archive) to a set of snapshot mounts.
167    ///
168    /// Uses containerd's `Diff.Apply` RPC to extract the artifact into the
169    /// container's rootfs snapshot.
170    async fn apply_diff(
171        channel: &Channel,
172        namespace: &str,
173        mounts: Vec<wfe_containerd_protos::containerd::types::Mount>,
174        descriptor: Descriptor,
175    ) -> Result<(), WfeError> {
176        let mut client = DiffClient::new(channel.clone());
177
178        let req = Self::with_namespace(
179            ApplyRequest {
180                diff: Some(descriptor),
181                mounts,
182                payloads: HashMap::new(),
183                sync_fs: false,
184            },
185            namespace,
186        );
187
188        client
189            .apply(req)
190            .await
191            .map_err(|e| WfeError::StepExecution(format!("Diff.Apply failed: {e}")))?;
192
193        Ok(())
194    }
195
196    /// Check whether an image exists in containerd's image store.
197    ///
198    /// Image pulling via raw containerd gRPC is complex (content store +
199    /// snapshots + transfer). For now we only verify the image exists and
200    /// return an error if it does not. Images must be pre-pulled via
201    /// `ctr image pull` or `nerdctl pull`.
202    ///
203    /// TODO: implement full image pull via TransferService or content ingest.
204    async fn ensure_image(channel: &Channel, image: &str, namespace: &str) -> Result<(), WfeError> {
205        let mut client = ImagesClient::new(channel.clone());
206
207        let mut req = tonic::Request::new(GetImageRequest {
208            name: image.to_string(),
209        });
210        req.metadata_mut()
211            .insert("containerd-namespace", namespace.parse().unwrap());
212
213        match client.get(req).await {
214            Ok(_) => Ok(()),
215            Err(status) => Err(WfeError::StepExecution(format!(
216                "image '{image}' not found in containerd (namespace={namespace}). \
217                 Pre-pull it with: ctr -n {namespace} image pull {image}  \
218                 (gRPC status: {status})"
219            ))),
220        }
221    }
222
223    /// Resolve the snapshot chain ID for an image.
224    ///
225    /// This reads the image manifest and config from the content store to
226    /// compute the chain ID of the topmost layer. The chain ID is used as
227    /// the parent snapshot when preparing a writable rootfs for a container.
228    ///
229    /// Chain ID computation follows the OCI image spec:
230    ///   chain_id[0] = diff_id[0]
231    ///   chain_id[n] = sha256(chain_id[n-1] + " " + diff_id[n])
232    async fn resolve_image_chain_id(
233        channel: &Channel,
234        image: &str,
235        namespace: &str,
236    ) -> Result<String, WfeError> {
237        use sha2::{Digest, Sha256};
238
239        // 1. Get the image record to find the manifest digest.
240        let mut images_client = ImagesClient::new(channel.clone());
241        let req = Self::with_namespace(
242            GetImageRequest {
243                name: image.to_string(),
244            },
245            namespace,
246        );
247        let image_resp = images_client
248            .get(req)
249            .await
250            .map_err(|e| WfeError::StepExecution(format!("failed to get image '{image}': {e}")))?;
251        let img = image_resp
252            .into_inner()
253            .image
254            .ok_or_else(|| WfeError::StepExecution(format!("image '{image}' has no record")))?;
255        let target = img.target.ok_or_else(|| {
256            WfeError::StepExecution(format!("image '{image}' has no target descriptor"))
257        })?;
258
259        // The target might be an index (multi-platform) or a manifest.
260        // Read the content and determine based on mediaType.
261        let manifest_digest = target.digest.clone();
262        let manifest_bytes = Self::read_content(channel, &manifest_digest, namespace).await?;
263        let manifest_json: serde_json::Value = serde_json::from_slice(&manifest_bytes)
264            .map_err(|e| WfeError::StepExecution(format!("failed to parse manifest: {e}")))?;
265
266        // 2. If it's an index, pick the matching platform manifest.
267        let manifest_json = if manifest_json.get("manifests").is_some() {
268            // OCI image index — find the platform-matching manifest.
269            let arch = std::env::consts::ARCH;
270            let oci_arch = match arch {
271                "aarch64" => "arm64",
272                "x86_64" => "amd64",
273                other => other,
274            };
275            let manifests = manifest_json["manifests"].as_array().ok_or_else(|| {
276                WfeError::StepExecution("image index has no manifests array".to_string())
277            })?;
278            let platform_manifest = manifests
279                .iter()
280                .find(|m| {
281                    m.get("platform")
282                        .and_then(|p| p.get("architecture"))
283                        .and_then(|a| a.as_str())
284                        == Some(oci_arch)
285                })
286                .ok_or_else(|| {
287                    WfeError::StepExecution(format!(
288                        "no manifest for architecture '{oci_arch}' in image index"
289                    ))
290                })?;
291            let digest = platform_manifest["digest"].as_str().ok_or_else(|| {
292                WfeError::StepExecution("platform manifest has no digest".to_string())
293            })?;
294            let bytes = Self::read_content(channel, digest, namespace).await?;
295            serde_json::from_slice(&bytes).map_err(|e| {
296                WfeError::StepExecution(format!("failed to parse platform manifest: {e}"))
297            })?
298        } else {
299            manifest_json
300        };
301
302        // 3. Get the config digest from the manifest.
303        let config_digest = manifest_json["config"]["digest"]
304            .as_str()
305            .ok_or_else(|| WfeError::StepExecution("manifest has no config.digest".to_string()))?;
306
307        // 4. Read the image config.
308        let config_bytes = Self::read_content(channel, config_digest, namespace).await?;
309        let config_json: serde_json::Value = serde_json::from_slice(&config_bytes)
310            .map_err(|e| WfeError::StepExecution(format!("failed to parse image config: {e}")))?;
311
312        // 5. Extract diff_ids and compute chain ID.
313        let diff_ids = config_json["rootfs"]["diff_ids"]
314            .as_array()
315            .ok_or_else(|| {
316                WfeError::StepExecution("image config has no rootfs.diff_ids".to_string())
317            })?;
318
319        if diff_ids.is_empty() {
320            return Err(WfeError::StepExecution(
321                "image has no layers (empty diff_ids)".to_string(),
322            ));
323        }
324
325        let mut chain_id = diff_ids[0]
326            .as_str()
327            .ok_or_else(|| WfeError::StepExecution("diff_id is not a string".to_string()))?
328            .to_string();
329
330        for diff_id in &diff_ids[1..] {
331            let diff = diff_id
332                .as_str()
333                .ok_or_else(|| WfeError::StepExecution("diff_id is not a string".to_string()))?;
334            let mut hasher = Sha256::new();
335            hasher.update(format!("{chain_id} {diff}"));
336            chain_id = format!("sha256:{:x}", hasher.finalize());
337        }
338
339        tracing::debug!(image = image, chain_id = %chain_id, "resolved image chain ID");
340        Ok(chain_id)
341    }
342
343    /// Read content from the containerd content store by digest.
344    async fn read_content(
345        channel: &Channel,
346        digest: &str,
347        namespace: &str,
348    ) -> Result<Vec<u8>, WfeError> {
349        use tokio_stream::StreamExt;
350
351        let mut client = ContentClient::new(channel.clone());
352        let req = Self::with_namespace(
353            ReadContentRequest {
354                digest: digest.to_string(),
355                offset: 0,
356                size: 0, // read all
357            },
358            namespace,
359        );
360
361        let mut stream = client
362            .read(req)
363            .await
364            .map_err(|e| WfeError::StepExecution(format!("failed to read content {digest}: {e}")))?
365            .into_inner();
366
367        let mut data = Vec::new();
368        while let Some(chunk) = stream.next().await {
369            let chunk = chunk.map_err(|e| {
370                WfeError::StepExecution(format!("error reading content {digest}: {e}"))
371            })?;
372            data.extend_from_slice(&chunk.data);
373        }
374
375        Ok(data)
376    }
377
378    /// Build a minimal OCI runtime spec as a `prost_types::Any`.
379    ///
380    /// The spec is serialized as JSON and wrapped in a protobuf Any with
381    /// the containerd OCI spec type URL.
382    pub(crate) fn build_oci_spec(&self, merged_env: &HashMap<String, String>) -> prost_types::Any {
383        // Build the args array for the process.
384        let args: Vec<String> = if let Some(ref run) = self.config.run {
385            vec!["/bin/sh".to_string(), "-c".to_string(), run.clone()]
386        } else if let Some(ref command) = self.config.command {
387            command.clone()
388        } else {
389            vec![]
390        };
391
392        // Build env in KEY=VALUE form.
393        let env: Vec<String> = merged_env.iter().map(|(k, v)| format!("{k}={v}")).collect();
394
395        // Build mounts.
396        let mut mounts = vec![
397            serde_json::json!({
398                "destination": "/proc",
399                "type": "proc",
400                "source": "proc",
401                "options": ["nosuid", "noexec", "nodev"]
402            }),
403            serde_json::json!({
404                "destination": "/dev",
405                "type": "tmpfs",
406                "source": "tmpfs",
407                "options": ["nosuid", "strictatime", "mode=755", "size=65536k"]
408            }),
409            serde_json::json!({
410                "destination": "/sys",
411                "type": "sysfs",
412                "source": "sysfs",
413                "options": ["nosuid", "noexec", "nodev", "ro"]
414            }),
415        ];
416
417        for vol in &self.config.volumes {
418            let mut opts = vec!["rbind".to_string()];
419            if vol.readonly {
420                opts.push("ro".to_string());
421            }
422            mounts.push(serde_json::json!({
423                "destination": vol.target,
424                "type": "bind",
425                "source": vol.source,
426                "options": opts,
427            }));
428        }
429
430        // Parse user / group.
431        let (uid, gid) = parse_user_spec(&self.config.user);
432
433        let mut process = serde_json::json!({
434            "terminal": false,
435            "user": {
436                "uid": uid,
437                "gid": gid,
438            },
439            "args": args,
440            "env": env,
441            "cwd": self.config.working_dir.as_deref().unwrap_or("/"),
442        });
443
444        // Add capabilities. When running as root, grant the default Docker
445        // capability set so tools like apt-get work. Non-root gets nothing.
446        let caps = if uid == 0 {
447            serde_json::json!([
448                "CAP_AUDIT_WRITE",
449                "CAP_CHOWN",
450                "CAP_DAC_OVERRIDE",
451                "CAP_FOWNER",
452                "CAP_FSETID",
453                "CAP_KILL",
454                "CAP_MKNOD",
455                "CAP_NET_BIND_SERVICE",
456                "CAP_NET_RAW",
457                "CAP_SETFCAP",
458                "CAP_SETGID",
459                "CAP_SETPCAP",
460                "CAP_SETUID",
461                "CAP_SYS_CHROOT",
462            ])
463        } else {
464            serde_json::json!([])
465        };
466        process["capabilities"] = serde_json::json!({
467            "bounding": caps,
468            "effective": caps,
469            "inheritable": caps,
470            "permitted": caps,
471            "ambient": caps,
472        });
473
474        let spec = serde_json::json!({
475            "ociVersion": "1.0.2",
476            "process": process,
477            "root": {
478                "path": "rootfs",
479                "readonly": false,
480            },
481            "mounts": mounts,
482            "linux": {
483                "namespaces": [
484                    { "type": "pid" },
485                    { "type": "ipc" },
486                    { "type": "uts" },
487                    { "type": "mount" },
488                ],
489            },
490        });
491
492        let json_bytes = serde_json::to_vec(&spec).expect("OCI spec serialization cannot fail");
493
494        prost_types::Any {
495            type_url: "types.containerd.io/opencontainers/runtime-spec/1/Spec".to_string(),
496            value: json_bytes,
497        }
498    }
499
500    /// Inject a `containerd-namespace` header into a tonic request.
501    pub(crate) fn with_namespace<T>(req: T, namespace: &str) -> tonic::Request<T> {
502        let mut request = tonic::Request::new(req);
503        request
504            .metadata_mut()
505            .insert("containerd-namespace", namespace.parse().unwrap());
506        request
507    }
508
509    /// Start a long-running service container via the containerd gRPC API.
510    ///
511    /// Used by `ContainerdServiceProvider` to provision infrastructure services.
512    /// The container runs on the host network so its ports are accessible on 127.0.0.1.
513    /// Unlike step execution, this does NOT wait for the container to exit.
514    pub async fn run_service(
515        addr: &str,
516        container_id: &str,
517        image: &str,
518        env: &std::collections::HashMap<String, String>,
519    ) -> Result<(), WfeError> {
520        let namespace = DEFAULT_NAMESPACE;
521        let channel = Self::connect(addr).await?;
522
523        // Verify image exists.
524        Self::ensure_image(&channel, image, namespace).await?;
525
526        // Build a config for host-network service container.
527        let config = ContainerdConfig {
528            image: image.to_string(),
529            command: None,
530            run: None,
531            env: env.clone(),
532            volumes: vec![],
533            working_dir: None,
534            user: "0:0".to_string(),
535            network: "host".to_string(),
536            memory: None,
537            cpu: None,
538            pull: "if-not-present".to_string(),
539            containerd_addr: addr.to_string(),
540            cli: "nerdctl".to_string(),
541            tls: Default::default(),
542            registry_auth: Default::default(),
543            inputs: None,
544            timeout_ms: None,
545        };
546
547        let step = Self::new(config);
548        let oci_spec = step.build_oci_spec(env);
549
550        // Create container.
551        let mut containers_client = ContainersClient::new(channel.clone());
552        let create_req = Self::with_namespace(
553            CreateContainerRequest {
554                container: Some(Container {
555                    id: container_id.to_string(),
556                    image: image.to_string(),
557                    runtime: Some(Runtime {
558                        name: "io.containerd.runc.v2".to_string(),
559                        options: None,
560                    }),
561                    spec: Some(oci_spec),
562                    snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
563                    snapshot_key: container_id.to_string(),
564                    labels: HashMap::new(),
565                    created_at: None,
566                    updated_at: None,
567                    extensions: HashMap::new(),
568                    sandbox: String::new(),
569                }),
570            },
571            namespace,
572        );
573        containers_client.create(create_req).await.map_err(|e| {
574            WfeError::StepExecution(format!("failed to create service container: {e}"))
575        })?;
576
577        // Prepare snapshot.
578        let mut snapshots_client = SnapshotsClient::new(channel.clone());
579        let mounts = {
580            let mounts_req = Self::with_namespace(
581                MountsRequest {
582                    snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
583                    key: container_id.to_string(),
584                },
585                namespace,
586            );
587            match snapshots_client.mounts(mounts_req).await {
588                Ok(resp) => resp.into_inner().mounts,
589                Err(_) => {
590                    let parent = Self::resolve_image_chain_id(&channel, image, namespace).await?;
591                    let prepare_req = Self::with_namespace(
592                        PrepareSnapshotRequest {
593                            snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
594                            key: container_id.to_string(),
595                            parent,
596                            labels: HashMap::new(),
597                        },
598                        namespace,
599                    );
600                    snapshots_client
601                        .prepare(prepare_req)
602                        .await
603                        .map_err(|e| {
604                            WfeError::StepExecution(format!("failed to prepare snapshot: {e}"))
605                        })?
606                        .into_inner()
607                        .mounts
608                }
609            }
610        };
611
612        // Create and start task (no stdout/stderr capture for services).
613        let mut tasks_client = TasksClient::new(channel.clone());
614        let create_task_req = Self::with_namespace(
615            CreateTaskRequest {
616                container_id: container_id.to_string(),
617                rootfs: mounts,
618                stdin: String::new(),
619                stdout: String::new(),
620                stderr: String::new(),
621                terminal: false,
622                checkpoint: None,
623                options: None,
624                runtime_path: String::new(),
625            },
626            namespace,
627        );
628        tasks_client
629            .create(create_task_req)
630            .await
631            .map_err(|e| WfeError::StepExecution(format!("failed to create service task: {e}")))?;
632
633        let start_req = Self::with_namespace(
634            StartRequest {
635                container_id: container_id.to_string(),
636                exec_id: String::new(),
637            },
638            namespace,
639        );
640        tasks_client
641            .start(start_req)
642            .await
643            .map_err(|e| WfeError::StepExecution(format!("failed to start service task: {e}")))?;
644
645        tracing::info!(container_id = %container_id, image = %image, "service container started");
646        Ok(())
647    }
648
649    /// Stop and clean up a service container via the containerd gRPC API.
650    pub async fn cleanup_service(addr: &str, container_id: &str) -> Result<(), WfeError> {
651        let channel = Self::connect(addr).await?;
652        Self::cleanup(&channel, container_id, DEFAULT_NAMESPACE).await
653    }
654
655    /// Parse `##wfe[output key=value]` lines from stdout.
656    pub fn parse_outputs(stdout: &str) -> HashMap<String, String> {
657        let mut outputs = HashMap::new();
658        for line in stdout.lines() {
659            if let Some(rest) = line.strip_prefix("##wfe[output ")
660                && let Some(rest) = rest.strip_suffix(']')
661                && let Some(eq_pos) = rest.find('=')
662            {
663                let name = rest[..eq_pos].trim().to_string();
664                let value = rest[eq_pos + 1..].to_string();
665                outputs.insert(name, value);
666            }
667        }
668        outputs
669    }
670
671    /// Build the output data JSON value from step execution results.
672    pub fn build_output_data(
673        step_name: &str,
674        stdout: &str,
675        stderr: &str,
676        exit_code: i32,
677        parsed_outputs: &HashMap<String, String>,
678    ) -> serde_json::Value {
679        let mut outputs = serde_json::Map::new();
680        for (key, value) in parsed_outputs {
681            outputs.insert(key.clone(), serde_json::Value::String(value.clone()));
682        }
683        outputs.insert(
684            format!("{step_name}.stdout"),
685            serde_json::Value::String(stdout.to_string()),
686        );
687        outputs.insert(
688            format!("{step_name}.stderr"),
689            serde_json::Value::String(stderr.to_string()),
690        );
691        outputs.insert(
692            format!("{step_name}.exit_code"),
693            serde_json::Value::Number(serde_json::Number::from(exit_code)),
694        );
695        serde_json::Value::Object(outputs)
696    }
697}
698
/// Split a `uid:gid` specification into numeric ids.
///
/// The input must consist of exactly two `:`-separated fields; anything
/// else yields `(65534, 65534)`. A field that is not a valid `u32` is
/// replaced by `65534` on its own, leaving the other field untouched.
fn parse_user_spec(user: &str) -> (u32, u32) {
    const FALLBACK_ID: u32 = 65534;

    let mut fields = user.split(':');
    match (fields.next(), fields.next(), fields.next()) {
        (Some(uid), Some(gid), None) => (
            uid.parse().unwrap_or(FALLBACK_ID),
            gid.parse().unwrap_or(FALLBACK_ID),
        ),
        _ => (FALLBACK_ID, FALLBACK_ID),
    }
}
710
711#[async_trait]
712impl StepBody for ContainerdStep {
713    async fn mount_artifacts(
714        &mut self,
715        context: &StepExecutionContext<'_>,
716    ) -> wfe_core::Result<()> {
717        let Some(ref inputs) = self.config.inputs else {
718            return Ok(());
719        };
720
721        if inputs.is_empty() {
722            return Ok(());
723        }
724
725        let Some(volume) = context.artifact_volume else {
726            return Err(WfeError::StepExecution(
727                "artifact volume required but not provided".to_string(),
728            ));
729        };
730
731        let addr = &self.config.containerd_addr;
732        let channel = Self::connect(addr).await?;
733        let namespace = DEFAULT_NAMESPACE;
734
735        for (name, container_target) in inputs {
736            let prefix = container_target.strip_prefix('/').unwrap_or(container_target);
737            let repackaged = volume
738                .repackage_with_prefix(name, prefix)
739                .map_err(|e| WfeError::StepExecution(format!("failed to repackage artifact: {e}")))?;
740
741            let (digest, size) = Self::upload_content(&channel, namespace, &repackaged).await?;
742
743            self.artifact_applies.push((
744                container_target.clone(),
745                Descriptor {
746                    media_type: "application/vnd.oci.image.layer.v1.tar+gzip".to_string(),
747                    digest,
748                    size,
749                    annotations: HashMap::new(),
750                },
751            ));
752        }
753
754        Ok(())
755    }
756
757    async fn unmount_artifacts(
758        &mut self,
759        _context: &StepExecutionContext<'_>,
760    ) -> wfe_core::Result<()> {
761        // Content store objects are best-effort cleaned up by containerd's GC.
762        // We don't have a bulk-delete API for individual digests.
763        self.artifact_applies.clear();
764        Ok(())
765    }
766
767    async fn run(
768        &mut self,
769        context: &StepExecutionContext<'_>,
770    ) -> wfe_core::Result<ExecutionResult> {
771        let step_name = context.step.name.as_deref().unwrap_or("unknown");
772        let namespace = DEFAULT_NAMESPACE;
773
774        // 1. Connect to containerd.
775        let addr = &self.config.containerd_addr;
776        tracing::info!(addr = %addr, "connecting to containerd daemon");
777        let channel = Self::connect(addr).await?;
778
779        // Verify connectivity.
780        {
781            let mut version_client = VersionClient::new(channel.clone());
782            let req = Self::with_namespace((), namespace);
783            match version_client.version(req).await {
784                Ok(resp) => {
785                    let v = resp.into_inner();
786                    tracing::info!(
787                        version = %v.version,
788                        revision = %v.revision,
789                        "connected to containerd"
790                    );
791                }
792                Err(e) => {
793                    return Err(WfeError::StepExecution(format!(
794                        "containerd version check failed: {e}"
795                    )));
796                }
797            }
798        }
799
800        // 2. Ensure image exists (based on pull policy).
801        let should_check = !matches!(self.config.pull.as_str(), "never");
802        if should_check {
803            Self::ensure_image(&channel, &self.config.image, namespace).await?;
804        }
805
806        // Generate a unique container ID.
807        let container_id = format!("wfe-{}", uuid::Uuid::new_v4());
808
809        // 3. Merge environment variables.
810        let mut merged_env: HashMap<String, String> = HashMap::new();
811        if let Some(data_obj) = context.workflow.data.as_object() {
812            for (key, value) in data_obj {
813                let env_key = key.to_uppercase();
814                let env_val = match value {
815                    serde_json::Value::String(s) => s.clone(),
816                    other => other.to_string(),
817                };
818                merged_env.insert(env_key, env_val);
819            }
820        }
821        // Config env overrides workflow data.
822        for (key, value) in &self.config.env {
823            merged_env.insert(key.clone(), value.clone());
824        }
825
826        // 4. Build OCI spec.
827        let oci_spec = self.build_oci_spec(&merged_env);
828
829        // 5. Create container.
830        tracing::info!(container_id = %container_id, image = %self.config.image, "creating container");
831        let mut containers_client = ContainersClient::new(channel.clone());
832        let create_req = Self::with_namespace(
833            CreateContainerRequest {
834                container: Some(Container {
835                    id: container_id.clone(),
836                    image: self.config.image.clone(),
837                    runtime: Some(Runtime {
838                        name: "io.containerd.runc.v2".to_string(),
839                        options: None,
840                    }),
841                    spec: Some(oci_spec),
842                    snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
843                    snapshot_key: container_id.clone(),
844                    labels: HashMap::new(),
845                    created_at: None,
846                    updated_at: None,
847                    extensions: HashMap::new(),
848                    sandbox: String::new(),
849                }),
850            },
851            namespace,
852        );
853
854        containers_client
855            .create(create_req)
856            .await
857            .map_err(|e| WfeError::StepExecution(format!("failed to create container: {e}")))?;
858
859        // 6. Prepare snapshot with the image's layers as parent.
860        let mut snapshots_client = SnapshotsClient::new(channel.clone());
861
862        let mounts = {
863            // First try: see if a snapshot was already prepared for this container.
864            let mounts_req = Self::with_namespace(
865                MountsRequest {
866                    snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
867                    key: container_id.clone(),
868                },
869                namespace,
870            );
871
872            match snapshots_client.mounts(mounts_req).await {
873                Ok(resp) => resp.into_inner().mounts,
874                Err(_) => {
875                    // Resolve the image's chain ID to use as snapshot parent.
876                    let parent = if should_check {
877                        Self::resolve_image_chain_id(&channel, &self.config.image, namespace)
878                            .await?
879                    } else {
880                        String::new()
881                    };
882
883                    let prepare_req = Self::with_namespace(
884                        PrepareSnapshotRequest {
885                            snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
886                            key: container_id.clone(),
887                            parent,
888                            labels: HashMap::new(),
889                        },
890                        namespace,
891                    );
892                    snapshots_client
893                        .prepare(prepare_req)
894                        .await
895                        .map_err(|e| {
896                            WfeError::StepExecution(format!("failed to prepare snapshot: {e}"))
897                        })?
898                        .into_inner()
899                        .mounts
900                }
901            }
902        };
903
904        // 6b. Apply artifact diffs to the snapshot.
905        for (_target, descriptor) in &self.artifact_applies {
906            Self::apply_diff(&channel, namespace, mounts.clone(), descriptor.clone()).await?;
907        }
908
909        // 7. Create FIFO paths for stdout/stderr capture.
910        // Use WFE_IO_DIR if set (e.g., a shared mount with a remote containerd daemon),
911        // otherwise fall back to the system temp directory.
912        let io_base = std::env::var("WFE_IO_DIR")
913            .map(std::path::PathBuf::from)
914            .unwrap_or_else(|_| std::env::temp_dir());
915        let tmp_dir = io_base.join(format!("wfe-io-{container_id}"));
916        std::fs::create_dir_all(&tmp_dir)
917            .map_err(|e| WfeError::StepExecution(format!("failed to create IO temp dir: {e}")))?;
918
919        let stdout_path = tmp_dir.join("stdout");
920        let stderr_path = tmp_dir.join("stderr");
921
922        // Create empty files for the shim to write stdout/stderr to.
923        // We use regular files instead of FIFOs because FIFOs don't work
924        // across filesystem boundaries (e.g., virtiofs mounts with Lima VMs).
925        for path in [&stdout_path, &stderr_path] {
926            let _ = std::fs::remove_file(path);
927            std::fs::File::create(path).map_err(|e| {
928                WfeError::StepExecution(format!("failed to create IO file {}: {e}", path.display()))
929            })?;
930            // Ensure the remote shim can write to it.
931            #[cfg(unix)]
932            {
933                use std::os::unix::fs::PermissionsExt;
934                std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o666)).ok();
935            }
936        }
937
938        let stdout_str = stdout_path.to_string_lossy().to_string();
939        let stderr_str = stderr_path.to_string_lossy().to_string();
940
941        // 8. Create task.
942        let mut tasks_client = TasksClient::new(channel.clone());
943
944        let create_task_req = Self::with_namespace(
945            CreateTaskRequest {
946                container_id: container_id.clone(),
947                rootfs: mounts,
948                stdin: String::new(),
949                stdout: stdout_str.clone(),
950                stderr: stderr_str.clone(),
951                terminal: false,
952                checkpoint: None,
953                options: None,
954                runtime_path: String::new(),
955            },
956            namespace,
957        );
958
959        tasks_client
960            .create(create_task_req)
961            .await
962            .map_err(|e| WfeError::StepExecution(format!("failed to create task: {e}")))?;
963
964        // Start the task.
965        let start_req = Self::with_namespace(
966            StartRequest {
967                container_id: container_id.clone(),
968                exec_id: String::new(),
969            },
970            namespace,
971        );
972
973        tasks_client
974            .start(start_req)
975            .await
976            .map_err(|e| WfeError::StepExecution(format!("failed to start task: {e}")))?;
977
978        tracing::info!(container_id = %container_id, "task started");
979
980        // 9. Wait for task completion (with optional timeout).
981        let wait_req = Self::with_namespace(
982            WaitRequest {
983                container_id: container_id.clone(),
984                exec_id: String::new(),
985            },
986            namespace,
987        );
988
989        let wait_result = if let Some(timeout_ms) = self.config.timeout_ms {
990            let duration = std::time::Duration::from_millis(timeout_ms);
991            match tokio::time::timeout(duration, tasks_client.wait(wait_req)).await {
992                Ok(result) => result,
993                Err(_) => {
994                    // Attempt cleanup before returning timeout error.
995                    let _ = Self::cleanup(&channel, &container_id, namespace).await;
996                    let _ = std::fs::remove_dir_all(&tmp_dir);
997                    return Err(WfeError::StepExecution(format!(
998                        "container execution timed out after {timeout_ms}ms"
999                    )));
1000                }
1001            }
1002        } else {
1003            tasks_client.wait(wait_req).await
1004        };
1005
1006        let exit_status = match wait_result {
1007            Ok(resp) => resp.into_inner().exit_status,
1008            Err(e) => {
1009                let _ = Self::cleanup(&channel, &container_id, namespace).await;
1010                let _ = std::fs::remove_dir_all(&tmp_dir);
1011                return Err(WfeError::StepExecution(format!(
1012                    "failed waiting for task: {e}"
1013                )));
1014            }
1015        };
1016
1017        // 10. Read captured output from files.
1018        let stdout_content = tokio::fs::read_to_string(&stdout_path)
1019            .await
1020            .unwrap_or_default();
1021        let stderr_content = tokio::fs::read_to_string(&stderr_path)
1022            .await
1023            .unwrap_or_default();
1024
1025        // 11. Cleanup: delete task, then container.
1026        if let Err(e) = Self::cleanup(&channel, &container_id, namespace).await {
1027            tracing::warn!(container_id = %container_id, error = %e, "cleanup failed");
1028        }
1029        let _ = std::fs::remove_dir_all(&tmp_dir);
1030
1031        // 12. Check exit status.
1032        let exit_code = exit_status as i32;
1033        if exit_code != 0 {
1034            return Err(WfeError::StepExecution(format!(
1035                "container exited with code {exit_code}\nstdout: {stdout_content}\nstderr: {stderr_content}"
1036            )));
1037        }
1038
1039        // 13. Parse outputs and build result.
1040        let parsed = Self::parse_outputs(&stdout_content);
1041        let output_data = Self::build_output_data(
1042            step_name,
1043            &stdout_content,
1044            &stderr_content,
1045            exit_code,
1046            &parsed,
1047        );
1048
1049        Ok(ExecutionResult {
1050            proceed: true,
1051            output_data: Some(output_data),
1052            ..Default::default()
1053        })
1054    }
1055}
1056
1057impl ContainerdStep {
1058    /// Delete the task and container, best-effort.
1059    pub(crate) async fn cleanup(
1060        channel: &Channel,
1061        container_id: &str,
1062        namespace: &str,
1063    ) -> Result<(), WfeError> {
1064        let mut tasks_client = TasksClient::new(channel.clone());
1065        let mut containers_client = ContainersClient::new(channel.clone());
1066
1067        // Delete task (ignore errors — it may already be gone).
1068        let del_task_req = Self::with_namespace(
1069            DeleteTaskRequest {
1070                container_id: container_id.to_string(),
1071            },
1072            namespace,
1073        );
1074        let _ = tasks_client.delete(del_task_req).await;
1075
1076        // Delete container.
1077        let del_container_req = Self::with_namespace(
1078            DeleteContainerRequest {
1079                id: container_id.to_string(),
1080            },
1081            namespace,
1082        );
1083        containers_client
1084            .delete(del_container_req)
1085            .await
1086            .map_err(|e| WfeError::StepExecution(format!("failed to delete container: {e}")))?;
1087
1088        Ok(())
1089    }
1090}
1091
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{TlsConfig, VolumeMountConfig};
    use pretty_assertions::assert_eq;

    // ── shared helpers ─────────────────────────────────────────────────

    /// Baseline configuration shared by the tests; individual tests tweak
    /// single fields on top of it.
    fn minimal_config() -> ContainerdConfig {
        ContainerdConfig {
            image: String::from("alpine:3.18"),
            command: None,
            run: Some(String::from("echo hello")),
            env: HashMap::new(),
            volumes: Vec::new(),
            working_dir: None,
            user: String::from("65534:65534"),
            network: String::from("none"),
            memory: None,
            cpu: None,
            pull: String::from("if-not-present"),
            containerd_addr: String::from("/run/containerd/containerd.sock"),
            cli: String::from("nerdctl"),
            tls: TlsConfig::default(),
            registry_auth: HashMap::new(),
            inputs: None,
            timeout_ms: None,
        }
    }

    /// Builds the OCI spec for `config` with `env` and decodes its JSON payload.
    fn spec_json(config: ContainerdConfig, env: &HashMap<String, String>) -> serde_json::Value {
        let spec = ContainerdStep::new(config).build_oci_spec(env);
        serde_json::from_slice(&spec.value).unwrap()
    }

    /// Collects a JSON array of strings into owned `String`s.
    fn string_items(value: &serde_json::Value) -> Vec<String> {
        value
            .as_array()
            .unwrap()
            .iter()
            .map(|item| item.as_str().unwrap().to_string())
            .collect()
    }

    /// Connects to `addr`, expects failure, and returns the rendered error.
    async fn connect_error_message(addr: &str) -> String {
        ContainerdStep::connect(addr).await.unwrap_err().to_string()
    }

    // ── parse_outputs ──────────────────────────────────────────────────

    #[test]
    fn parse_outputs_empty() {
        assert!(ContainerdStep::parse_outputs("").is_empty());
    }

    #[test]
    fn parse_outputs_single() {
        let found =
            ContainerdStep::parse_outputs("some log line\n##wfe[output version=1.2.3]\nmore logs\n");
        assert_eq!(found.len(), 1);
        assert_eq!(found.get("version").map(String::as_str), Some("1.2.3"));
    }

    #[test]
    fn parse_outputs_multiple() {
        let found = ContainerdStep::parse_outputs("##wfe[output foo=bar]\n##wfe[output baz=qux]\n");
        assert_eq!(found.len(), 2);
        assert_eq!(found.get("foo").map(String::as_str), Some("bar"));
        assert_eq!(found.get("baz").map(String::as_str), Some("qux"));
    }

    #[test]
    fn parse_outputs_mixed_with_regular_stdout() {
        let log = concat!(
            "Starting container...\n",
            "Pulling image...\n",
            "##wfe[output digest=sha256:abc123]\n",
            "Running tests...\n",
            "##wfe[output result=pass]\n",
            "Done.\n",
        );
        let found = ContainerdStep::parse_outputs(log);
        assert_eq!(found.len(), 2);
        assert_eq!(found.get("digest").map(String::as_str), Some("sha256:abc123"));
        assert_eq!(found.get("result").map(String::as_str), Some("pass"));
    }

    #[test]
    fn parse_outputs_no_wfe_lines() {
        assert!(ContainerdStep::parse_outputs("line 1\nline 2\nline 3\n").is_empty());
    }

    #[test]
    fn parse_outputs_value_with_equals_sign() {
        let found = ContainerdStep::parse_outputs("##wfe[output url=https://example.com?a=1&b=2]\n");
        assert_eq!(found.len(), 1);
        assert_eq!(
            found.get("url").map(String::as_str),
            Some("https://example.com?a=1&b=2")
        );
    }

    #[test]
    fn parse_outputs_ignores_malformed_lines() {
        // Only the third line is a well-formed output directive.
        let log = concat!(
            "##wfe[output ]\n",
            "##wfe[output no_equals]\n",
            "##wfe[output valid=yes]\n",
            "##wfe[output_extra bad=val]\n",
        );
        let found = ContainerdStep::parse_outputs(log);
        assert_eq!(found.len(), 1);
        assert_eq!(found.get("valid").map(String::as_str), Some("yes"));
    }

    #[test]
    fn parse_outputs_overwrites_duplicate_keys() {
        let found =
            ContainerdStep::parse_outputs("##wfe[output key=first]\n##wfe[output key=second]\n");
        assert_eq!(found.len(), 1);
        assert_eq!(found.get("key").map(String::as_str), Some("second"));
    }

    // ── build_output_data ──────────────────────────────────────────────

    #[test]
    fn build_output_data_basic() {
        let mut parsed = HashMap::new();
        parsed.insert("result".to_string(), "success".to_string());
        let data = ContainerdStep::build_output_data("my_step", "hello world\n", "", 0, &parsed);

        let fields = data.as_object().unwrap();
        assert_eq!(fields.get("result").unwrap(), "success");
        assert_eq!(fields.get("my_step.stdout").unwrap(), "hello world\n");
        assert_eq!(fields.get("my_step.stderr").unwrap(), "");
        assert_eq!(fields.get("my_step.exit_code").unwrap(), 0);
    }

    #[test]
    fn build_output_data_no_parsed_outputs() {
        let data = ContainerdStep::build_output_data("step1", "out", "err", 1, &HashMap::new());

        let fields = data.as_object().unwrap();
        // Exactly the three standard keys: stdout, stderr, exit_code.
        assert_eq!(fields.len(), 3);
        assert_eq!(fields.get("step1.stdout").unwrap(), "out");
        assert_eq!(fields.get("step1.stderr").unwrap(), "err");
        assert_eq!(fields.get("step1.exit_code").unwrap(), 1);
    }

    #[test]
    fn build_output_data_with_multiple_parsed_outputs() {
        let pairs = [("a", "1"), ("b", "2"), ("c", "3")];
        let parsed: HashMap<String, String> = pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();
        let data = ContainerdStep::build_output_data("s", "", "", 0, &parsed);

        let fields = data.as_object().unwrap();
        for (key, expected) in pairs {
            assert_eq!(fields.get(key).unwrap(), expected);
        }
        // Three parsed outputs plus the three standard keys.
        assert_eq!(fields.len(), 6);
    }

    #[test]
    fn build_output_data_negative_exit_code() {
        let data = ContainerdStep::build_output_data("s", "", "", -1, &HashMap::new());
        let fields = data.as_object().unwrap();
        assert_eq!(fields.get("s.exit_code").unwrap(), -1);
    }

    // ── parse_user_spec ────────────────────────────────────────────────

    #[test]
    fn parse_user_spec_normal() {
        let ids = parse_user_spec("1000:1000");
        assert_eq!(ids, (1000, 1000));
    }

    #[test]
    fn parse_user_spec_root() {
        let ids = parse_user_spec("0:0");
        assert_eq!(ids, (0, 0));
    }

    #[test]
    fn parse_user_spec_default() {
        let ids = parse_user_spec("65534:65534");
        assert_eq!(ids, (65534, 65534));
    }

    #[test]
    fn parse_user_spec_invalid_falls_back() {
        let ids = parse_user_spec("abc");
        assert_eq!(ids, (65534, 65534));
    }

    // ── build_oci_spec ─────────────────────────────────────────────────

    #[test]
    fn build_oci_spec_minimal() {
        let spec = ContainerdStep::new(minimal_config()).build_oci_spec(&HashMap::new());

        assert_eq!(
            spec.type_url,
            "types.containerd.io/opencontainers/runtime-spec/1/Spec"
        );
        assert!(!spec.value.is_empty());

        // Decode the payload and check the process section.
        let doc: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
        assert_eq!(doc["ociVersion"], "1.0.2");

        let process = &doc["process"];
        for (idx, expected) in ["/bin/sh", "-c", "echo hello"].into_iter().enumerate() {
            assert_eq!(process["args"][idx], expected);
        }
        assert_eq!(process["user"]["uid"], 65534);
        assert_eq!(process["user"]["gid"], 65534);
        assert_eq!(process["cwd"], "/");
    }

    #[test]
    fn build_oci_spec_with_command() {
        let words = ["echo", "hello", "world"];
        let config = ContainerdConfig {
            run: None,
            command: Some(words.iter().map(|w| w.to_string()).collect()),
            ..minimal_config()
        };

        let doc = spec_json(config, &HashMap::new());
        for (idx, expected) in words.into_iter().enumerate() {
            assert_eq!(doc["process"]["args"][idx], expected);
        }
    }

    #[test]
    fn build_oci_spec_with_env() {
        let mut env = HashMap::new();
        env.insert("FOO".to_string(), "bar".to_string());
        env.insert("BAZ".to_string(), "qux".to_string());

        let doc = spec_json(minimal_config(), &env);
        let entries = string_items(&doc["process"]["env"]);

        assert!(entries.iter().any(|e| e == "FOO=bar"));
        assert!(entries.iter().any(|e| e == "BAZ=qux"));
    }

    #[test]
    fn build_oci_spec_with_working_dir() {
        let config = ContainerdConfig {
            working_dir: Some("/app".to_string()),
            ..minimal_config()
        };

        let doc = spec_json(config, &HashMap::new());
        assert_eq!(doc["process"]["cwd"], "/app");
    }

    #[test]
    fn build_oci_spec_with_user() {
        let config = ContainerdConfig {
            user: "1000:2000".to_string(),
            ..minimal_config()
        };

        let doc = spec_json(config, &HashMap::new());
        assert_eq!(doc["process"]["user"]["uid"], 1000);
        assert_eq!(doc["process"]["user"]["gid"], 2000);
    }

    #[test]
    fn build_oci_spec_with_volumes() {
        let writable = VolumeMountConfig {
            source: "/host/data".to_string(),
            target: "/container/data".to_string(),
            readonly: false,
        };
        let read_only = VolumeMountConfig {
            source: "/host/config".to_string(),
            target: "/etc/config".to_string(),
            readonly: true,
        };
        let config = ContainerdConfig {
            volumes: vec![writable, read_only],
            ..minimal_config()
        };

        let doc = spec_json(config, &HashMap::new());
        let mounts = doc["mounts"].as_array().unwrap();
        // 3 default + 2 user = 5
        assert_eq!(mounts.len(), 5);

        let binds: Vec<&serde_json::Value> = mounts
            .iter()
            .filter(|entry| entry["type"] == "bind")
            .collect();
        assert_eq!(binds.len(), 2);

        let config_mount = binds
            .iter()
            .find(|entry| entry["destination"] == "/etc/config")
            .unwrap();
        let options = string_items(&config_mount["options"]);
        assert!(options.iter().any(|opt| opt == "ro"));
    }

    #[test]
    fn build_oci_spec_no_command_no_run() {
        let config = ContainerdConfig {
            run: None,
            command: None,
            ..minimal_config()
        };

        let doc = spec_json(config, &HashMap::new());
        assert!(doc["process"]["args"].as_array().unwrap().is_empty());
    }

    // ── connect ────────────────────────────────────────────────────────

    #[tokio::test]
    async fn connect_to_missing_unix_socket_returns_error() {
        let msg = connect_error_message("/tmp/nonexistent-wfe-containerd-test.sock").await;
        assert!(
            msg.contains("socket not found"),
            "expected 'socket not found' error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn connect_to_missing_unix_socket_with_scheme_returns_error() {
        let msg = connect_error_message("unix:///tmp/nonexistent-wfe-containerd-test.sock").await;
        assert!(
            msg.contains("socket not found"),
            "expected 'socket not found' error, got: {msg}"
        );
    }

    #[tokio::test]
    async fn connect_to_invalid_tcp_returns_error() {
        let msg = connect_error_message("tcp://127.0.0.1:1").await;
        assert!(
            msg.contains("failed to connect"),
            "expected connection error, got: {msg}"
        );
    }

    // ── ContainerdStep::new ────────────────────────────────────────────

    #[test]
    fn new_creates_step_with_config() {
        let step = ContainerdStep::new(minimal_config());
        assert_eq!(step.config.image, "alpine:3.18");
        assert_eq!(
            step.config.containerd_addr,
            "/run/containerd/containerd.sock"
        );
    }
}
1454
/// Integration tests that require a live containerd daemon.
///
/// Each test skips itself (prints a `SKIP` line and returns) when no socket
/// can be found, so the suite stays green on machines without containerd.
#[cfg(test)]
mod e2e_tests {
    use super::*;

    /// Picks the containerd address to test against.
    ///
    /// `WFE_CONTAINERD_ADDR` wins when set; otherwise the Lima test VM socket
    /// under `$HOME` (or `/root` when `HOME` is unset) is used. The address is
    /// returned only if the path — with any `unix://` prefix removed — exists
    /// on disk; an address with a different scheme is checked as a plain
    /// filesystem path as well.
    fn containerd_addr() -> Option<String> {
        let addr = match std::env::var("WFE_CONTAINERD_ADDR") {
            Ok(configured) => configured,
            Err(_) => {
                let home = std::env::var("HOME").unwrap_or_else(|_| "/root".to_string());
                format!("unix://{home}/.lima/wfe-test/sock/containerd.sock")
            }
        };

        let socket_present = {
            let fs_path = addr.strip_prefix("unix://").unwrap_or(addr.as_str());
            Path::new(fs_path).exists()
        };

        socket_present.then_some(addr)
    }

    #[tokio::test]
    async fn e2e_version_check() {
        let addr = match containerd_addr() {
            Some(found) => found,
            None => {
                eprintln!("SKIP: containerd socket not available");
                return;
            }
        };

        let channel = ContainerdStep::connect(&addr).await.unwrap();
        let mut version_api = VersionClient::new(channel);

        let info = version_api
            .version(ContainerdStep::with_namespace((), DEFAULT_NAMESPACE))
            .await
            .unwrap()
            .into_inner();

        assert!(!info.version.is_empty(), "version should not be empty");
        assert!(!info.revision.is_empty(), "revision should not be empty");
        eprintln!(
            "containerd version={} revision={}",
            info.version, info.revision
        );
    }
}