1use std::collections::HashMap;
2use std::path::Path;
3
4use async_trait::async_trait;
5use tonic::transport::{Channel, Endpoint, Uri};
6use wfe_core::WfeError;
7use wfe_core::models::ExecutionResult;
8use wfe_core::traits::step::{StepBody, StepExecutionContext};
9
10use wfe_containerd_protos::containerd::services::containers::v1::{
11 Container, CreateContainerRequest, DeleteContainerRequest, container::Runtime,
12 containers_client::ContainersClient,
13};
14use wfe_containerd_protos::containerd::services::content::v1::{
15 ReadContentRequest, content_client::ContentClient,
16};
17use wfe_containerd_protos::containerd::services::images::v1::{
18 GetImageRequest, images_client::ImagesClient,
19};
20use wfe_containerd_protos::containerd::services::snapshots::v1::{
21 MountsRequest, PrepareSnapshotRequest, snapshots_client::SnapshotsClient,
22};
23use wfe_containerd_protos::containerd::services::tasks::v1::{
24 CreateTaskRequest, DeleteTaskRequest, StartRequest, WaitRequest, tasks_client::TasksClient,
25};
26use wfe_containerd_protos::containerd::services::version::v1::version_client::VersionClient;
27
28use crate::config::ContainerdConfig;
29
30const DEFAULT_NAMESPACE: &str = "default";
32
33const DEFAULT_SNAPSHOTTER: &str = "overlayfs";
35
36pub struct ContainerdStep {
37 config: ContainerdConfig,
38}
39
40impl ContainerdStep {
41 pub fn new(config: ContainerdConfig) -> Self {
42 Self { config }
43 }
44
45 pub(crate) async fn connect(addr: &str) -> Result<Channel, WfeError> {
50 let channel = if addr.starts_with('/') || addr.starts_with("unix://") {
51 let socket_path = addr.strip_prefix("unix://").unwrap_or(addr).to_string();
52
53 if !Path::new(&socket_path).exists() {
54 return Err(WfeError::StepExecution(format!(
55 "containerd socket not found: {socket_path}"
56 )));
57 }
58
59 Endpoint::try_from("http://[::]:50051")
60 .map_err(|e| WfeError::StepExecution(format!("failed to create endpoint: {e}")))?
61 .connect_with_connector(tower::service_fn(move |_: Uri| {
62 let path = socket_path.clone();
63 async move {
64 tokio::net::UnixStream::connect(path)
65 .await
66 .map(hyper_util::rt::TokioIo::new)
67 }
68 }))
69 .await
70 .map_err(|e| {
71 WfeError::StepExecution(format!(
72 "failed to connect to containerd via Unix socket at {addr}: {e}"
73 ))
74 })?
75 } else {
76 let connect_addr = if addr.starts_with("tcp://") {
77 addr.replacen("tcp://", "http://", 1)
78 } else {
79 addr.to_string()
80 };
81
82 Endpoint::from_shared(connect_addr.clone())
83 .map_err(|e| {
84 WfeError::StepExecution(format!(
85 "invalid containerd endpoint {connect_addr}: {e}"
86 ))
87 })?
88 .timeout(std::time::Duration::from_secs(30))
89 .connect()
90 .await
91 .map_err(|e| {
92 WfeError::StepExecution(format!(
93 "failed to connect to containerd at {connect_addr}: {e}"
94 ))
95 })?
96 };
97
98 Ok(channel)
99 }
100
101 async fn ensure_image(channel: &Channel, image: &str, namespace: &str) -> Result<(), WfeError> {
110 let mut client = ImagesClient::new(channel.clone());
111
112 let mut req = tonic::Request::new(GetImageRequest {
113 name: image.to_string(),
114 });
115 req.metadata_mut()
116 .insert("containerd-namespace", namespace.parse().unwrap());
117
118 match client.get(req).await {
119 Ok(_) => Ok(()),
120 Err(status) => Err(WfeError::StepExecution(format!(
121 "image '{image}' not found in containerd (namespace={namespace}). \
122 Pre-pull it with: ctr -n {namespace} image pull {image} \
123 (gRPC status: {status})"
124 ))),
125 }
126 }
127
128 async fn resolve_image_chain_id(
138 channel: &Channel,
139 image: &str,
140 namespace: &str,
141 ) -> Result<String, WfeError> {
142 use sha2::{Digest, Sha256};
143
144 let mut images_client = ImagesClient::new(channel.clone());
146 let req = Self::with_namespace(
147 GetImageRequest {
148 name: image.to_string(),
149 },
150 namespace,
151 );
152 let image_resp = images_client
153 .get(req)
154 .await
155 .map_err(|e| WfeError::StepExecution(format!("failed to get image '{image}': {e}")))?;
156 let img = image_resp
157 .into_inner()
158 .image
159 .ok_or_else(|| WfeError::StepExecution(format!("image '{image}' has no record")))?;
160 let target = img.target.ok_or_else(|| {
161 WfeError::StepExecution(format!("image '{image}' has no target descriptor"))
162 })?;
163
164 let manifest_digest = target.digest.clone();
167 let manifest_bytes = Self::read_content(channel, &manifest_digest, namespace).await?;
168 let manifest_json: serde_json::Value = serde_json::from_slice(&manifest_bytes)
169 .map_err(|e| WfeError::StepExecution(format!("failed to parse manifest: {e}")))?;
170
171 let manifest_json = if manifest_json.get("manifests").is_some() {
173 let arch = std::env::consts::ARCH;
175 let oci_arch = match arch {
176 "aarch64" => "arm64",
177 "x86_64" => "amd64",
178 other => other,
179 };
180 let manifests = manifest_json["manifests"].as_array().ok_or_else(|| {
181 WfeError::StepExecution("image index has no manifests array".to_string())
182 })?;
183 let platform_manifest = manifests
184 .iter()
185 .find(|m| {
186 m.get("platform")
187 .and_then(|p| p.get("architecture"))
188 .and_then(|a| a.as_str())
189 == Some(oci_arch)
190 })
191 .ok_or_else(|| {
192 WfeError::StepExecution(format!(
193 "no manifest for architecture '{oci_arch}' in image index"
194 ))
195 })?;
196 let digest = platform_manifest["digest"].as_str().ok_or_else(|| {
197 WfeError::StepExecution("platform manifest has no digest".to_string())
198 })?;
199 let bytes = Self::read_content(channel, digest, namespace).await?;
200 serde_json::from_slice(&bytes).map_err(|e| {
201 WfeError::StepExecution(format!("failed to parse platform manifest: {e}"))
202 })?
203 } else {
204 manifest_json
205 };
206
207 let config_digest = manifest_json["config"]["digest"]
209 .as_str()
210 .ok_or_else(|| WfeError::StepExecution("manifest has no config.digest".to_string()))?;
211
212 let config_bytes = Self::read_content(channel, config_digest, namespace).await?;
214 let config_json: serde_json::Value = serde_json::from_slice(&config_bytes)
215 .map_err(|e| WfeError::StepExecution(format!("failed to parse image config: {e}")))?;
216
217 let diff_ids = config_json["rootfs"]["diff_ids"]
219 .as_array()
220 .ok_or_else(|| {
221 WfeError::StepExecution("image config has no rootfs.diff_ids".to_string())
222 })?;
223
224 if diff_ids.is_empty() {
225 return Err(WfeError::StepExecution(
226 "image has no layers (empty diff_ids)".to_string(),
227 ));
228 }
229
230 let mut chain_id = diff_ids[0]
231 .as_str()
232 .ok_or_else(|| WfeError::StepExecution("diff_id is not a string".to_string()))?
233 .to_string();
234
235 for diff_id in &diff_ids[1..] {
236 let diff = diff_id
237 .as_str()
238 .ok_or_else(|| WfeError::StepExecution("diff_id is not a string".to_string()))?;
239 let mut hasher = Sha256::new();
240 hasher.update(format!("{chain_id} {diff}"));
241 chain_id = format!("sha256:{:x}", hasher.finalize());
242 }
243
244 tracing::debug!(image = image, chain_id = %chain_id, "resolved image chain ID");
245 Ok(chain_id)
246 }
247
248 async fn read_content(
250 channel: &Channel,
251 digest: &str,
252 namespace: &str,
253 ) -> Result<Vec<u8>, WfeError> {
254 use tokio_stream::StreamExt;
255
256 let mut client = ContentClient::new(channel.clone());
257 let req = Self::with_namespace(
258 ReadContentRequest {
259 digest: digest.to_string(),
260 offset: 0,
261 size: 0, },
263 namespace,
264 );
265
266 let mut stream = client
267 .read(req)
268 .await
269 .map_err(|e| WfeError::StepExecution(format!("failed to read content {digest}: {e}")))?
270 .into_inner();
271
272 let mut data = Vec::new();
273 while let Some(chunk) = stream.next().await {
274 let chunk = chunk.map_err(|e| {
275 WfeError::StepExecution(format!("error reading content {digest}: {e}"))
276 })?;
277 data.extend_from_slice(&chunk.data);
278 }
279
280 Ok(data)
281 }
282
283 pub(crate) fn build_oci_spec(&self, merged_env: &HashMap<String, String>) -> prost_types::Any {
288 let args: Vec<String> = if let Some(ref run) = self.config.run {
290 vec!["/bin/sh".to_string(), "-c".to_string(), run.clone()]
291 } else if let Some(ref command) = self.config.command {
292 command.clone()
293 } else {
294 vec![]
295 };
296
297 let env: Vec<String> = merged_env.iter().map(|(k, v)| format!("{k}={v}")).collect();
299
300 let mut mounts = vec![
302 serde_json::json!({
303 "destination": "/proc",
304 "type": "proc",
305 "source": "proc",
306 "options": ["nosuid", "noexec", "nodev"]
307 }),
308 serde_json::json!({
309 "destination": "/dev",
310 "type": "tmpfs",
311 "source": "tmpfs",
312 "options": ["nosuid", "strictatime", "mode=755", "size=65536k"]
313 }),
314 serde_json::json!({
315 "destination": "/sys",
316 "type": "sysfs",
317 "source": "sysfs",
318 "options": ["nosuid", "noexec", "nodev", "ro"]
319 }),
320 ];
321
322 for vol in &self.config.volumes {
323 let mut opts = vec!["rbind".to_string()];
324 if vol.readonly {
325 opts.push("ro".to_string());
326 }
327 mounts.push(serde_json::json!({
328 "destination": vol.target,
329 "type": "bind",
330 "source": vol.source,
331 "options": opts,
332 }));
333 }
334
335 let (uid, gid) = parse_user_spec(&self.config.user);
337
338 let mut process = serde_json::json!({
339 "terminal": false,
340 "user": {
341 "uid": uid,
342 "gid": gid,
343 },
344 "args": args,
345 "env": env,
346 "cwd": self.config.working_dir.as_deref().unwrap_or("/"),
347 });
348
349 let caps = if uid == 0 {
352 serde_json::json!([
353 "CAP_AUDIT_WRITE",
354 "CAP_CHOWN",
355 "CAP_DAC_OVERRIDE",
356 "CAP_FOWNER",
357 "CAP_FSETID",
358 "CAP_KILL",
359 "CAP_MKNOD",
360 "CAP_NET_BIND_SERVICE",
361 "CAP_NET_RAW",
362 "CAP_SETFCAP",
363 "CAP_SETGID",
364 "CAP_SETPCAP",
365 "CAP_SETUID",
366 "CAP_SYS_CHROOT",
367 ])
368 } else {
369 serde_json::json!([])
370 };
371 process["capabilities"] = serde_json::json!({
372 "bounding": caps,
373 "effective": caps,
374 "inheritable": caps,
375 "permitted": caps,
376 "ambient": caps,
377 });
378
379 let spec = serde_json::json!({
380 "ociVersion": "1.0.2",
381 "process": process,
382 "root": {
383 "path": "rootfs",
384 "readonly": false,
385 },
386 "mounts": mounts,
387 "linux": {
388 "namespaces": [
389 { "type": "pid" },
390 { "type": "ipc" },
391 { "type": "uts" },
392 { "type": "mount" },
393 ],
394 },
395 });
396
397 let json_bytes = serde_json::to_vec(&spec).expect("OCI spec serialization cannot fail");
398
399 prost_types::Any {
400 type_url: "types.containerd.io/opencontainers/runtime-spec/1/Spec".to_string(),
401 value: json_bytes,
402 }
403 }
404
405 pub(crate) fn with_namespace<T>(req: T, namespace: &str) -> tonic::Request<T> {
407 let mut request = tonic::Request::new(req);
408 request
409 .metadata_mut()
410 .insert("containerd-namespace", namespace.parse().unwrap());
411 request
412 }
413
414 pub async fn run_service(
420 addr: &str,
421 container_id: &str,
422 image: &str,
423 env: &std::collections::HashMap<String, String>,
424 ) -> Result<(), WfeError> {
425 let namespace = DEFAULT_NAMESPACE;
426 let channel = Self::connect(addr).await?;
427
428 Self::ensure_image(&channel, image, namespace).await?;
430
431 let config = ContainerdConfig {
433 image: image.to_string(),
434 command: None,
435 run: None,
436 env: env.clone(),
437 volumes: vec![],
438 working_dir: None,
439 user: "0:0".to_string(),
440 network: "host".to_string(),
441 memory: None,
442 cpu: None,
443 pull: "if-not-present".to_string(),
444 containerd_addr: addr.to_string(),
445 cli: "nerdctl".to_string(),
446 tls: Default::default(),
447 registry_auth: Default::default(),
448 timeout_ms: None,
449 };
450
451 let step = Self::new(config);
452 let oci_spec = step.build_oci_spec(env);
453
454 let mut containers_client = ContainersClient::new(channel.clone());
456 let create_req = Self::with_namespace(
457 CreateContainerRequest {
458 container: Some(Container {
459 id: container_id.to_string(),
460 image: image.to_string(),
461 runtime: Some(Runtime {
462 name: "io.containerd.runc.v2".to_string(),
463 options: None,
464 }),
465 spec: Some(oci_spec),
466 snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
467 snapshot_key: container_id.to_string(),
468 labels: HashMap::new(),
469 created_at: None,
470 updated_at: None,
471 extensions: HashMap::new(),
472 sandbox: String::new(),
473 }),
474 },
475 namespace,
476 );
477 containers_client.create(create_req).await.map_err(|e| {
478 WfeError::StepExecution(format!("failed to create service container: {e}"))
479 })?;
480
481 let mut snapshots_client = SnapshotsClient::new(channel.clone());
483 let mounts = {
484 let mounts_req = Self::with_namespace(
485 MountsRequest {
486 snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
487 key: container_id.to_string(),
488 },
489 namespace,
490 );
491 match snapshots_client.mounts(mounts_req).await {
492 Ok(resp) => resp.into_inner().mounts,
493 Err(_) => {
494 let parent = Self::resolve_image_chain_id(&channel, image, namespace).await?;
495 let prepare_req = Self::with_namespace(
496 PrepareSnapshotRequest {
497 snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
498 key: container_id.to_string(),
499 parent,
500 labels: HashMap::new(),
501 },
502 namespace,
503 );
504 snapshots_client
505 .prepare(prepare_req)
506 .await
507 .map_err(|e| {
508 WfeError::StepExecution(format!("failed to prepare snapshot: {e}"))
509 })?
510 .into_inner()
511 .mounts
512 }
513 }
514 };
515
516 let mut tasks_client = TasksClient::new(channel.clone());
518 let create_task_req = Self::with_namespace(
519 CreateTaskRequest {
520 container_id: container_id.to_string(),
521 rootfs: mounts,
522 stdin: String::new(),
523 stdout: String::new(),
524 stderr: String::new(),
525 terminal: false,
526 checkpoint: None,
527 options: None,
528 runtime_path: String::new(),
529 },
530 namespace,
531 );
532 tasks_client
533 .create(create_task_req)
534 .await
535 .map_err(|e| WfeError::StepExecution(format!("failed to create service task: {e}")))?;
536
537 let start_req = Self::with_namespace(
538 StartRequest {
539 container_id: container_id.to_string(),
540 exec_id: String::new(),
541 },
542 namespace,
543 );
544 tasks_client
545 .start(start_req)
546 .await
547 .map_err(|e| WfeError::StepExecution(format!("failed to start service task: {e}")))?;
548
549 tracing::info!(container_id = %container_id, image = %image, "service container started");
550 Ok(())
551 }
552
553 pub async fn cleanup_service(addr: &str, container_id: &str) -> Result<(), WfeError> {
555 let channel = Self::connect(addr).await?;
556 Self::cleanup(&channel, container_id, DEFAULT_NAMESPACE).await
557 }
558
559 pub fn parse_outputs(stdout: &str) -> HashMap<String, String> {
561 let mut outputs = HashMap::new();
562 for line in stdout.lines() {
563 if let Some(rest) = line.strip_prefix("##wfe[output ")
564 && let Some(rest) = rest.strip_suffix(']')
565 && let Some(eq_pos) = rest.find('=')
566 {
567 let name = rest[..eq_pos].trim().to_string();
568 let value = rest[eq_pos + 1..].to_string();
569 outputs.insert(name, value);
570 }
571 }
572 outputs
573 }
574
575 pub fn build_output_data(
577 step_name: &str,
578 stdout: &str,
579 stderr: &str,
580 exit_code: i32,
581 parsed_outputs: &HashMap<String, String>,
582 ) -> serde_json::Value {
583 let mut outputs = serde_json::Map::new();
584 for (key, value) in parsed_outputs {
585 outputs.insert(key.clone(), serde_json::Value::String(value.clone()));
586 }
587 outputs.insert(
588 format!("{step_name}.stdout"),
589 serde_json::Value::String(stdout.to_string()),
590 );
591 outputs.insert(
592 format!("{step_name}.stderr"),
593 serde_json::Value::String(stderr.to_string()),
594 );
595 outputs.insert(
596 format!("{step_name}.exit_code"),
597 serde_json::Value::Number(serde_json::Number::from(exit_code)),
598 );
599 serde_json::Value::Object(outputs)
600 }
601}
602
603fn parse_user_spec(user: &str) -> (u32, u32) {
605 let parts: Vec<&str> = user.split(':').collect();
606 if parts.len() == 2 {
607 let uid = parts[0].parse().unwrap_or(65534);
608 let gid = parts[1].parse().unwrap_or(65534);
609 (uid, gid)
610 } else {
611 (65534, 65534)
612 }
613}
614
615#[async_trait]
616impl StepBody for ContainerdStep {
617 async fn run(
618 &mut self,
619 context: &StepExecutionContext<'_>,
620 ) -> wfe_core::Result<ExecutionResult> {
621 let step_name = context.step.name.as_deref().unwrap_or("unknown");
622 let namespace = DEFAULT_NAMESPACE;
623
624 let addr = &self.config.containerd_addr;
626 tracing::info!(addr = %addr, "connecting to containerd daemon");
627 let channel = Self::connect(addr).await?;
628
629 {
631 let mut version_client = VersionClient::new(channel.clone());
632 let req = Self::with_namespace((), namespace);
633 match version_client.version(req).await {
634 Ok(resp) => {
635 let v = resp.into_inner();
636 tracing::info!(
637 version = %v.version,
638 revision = %v.revision,
639 "connected to containerd"
640 );
641 }
642 Err(e) => {
643 return Err(WfeError::StepExecution(format!(
644 "containerd version check failed: {e}"
645 )));
646 }
647 }
648 }
649
650 let should_check = !matches!(self.config.pull.as_str(), "never");
652 if should_check {
653 Self::ensure_image(&channel, &self.config.image, namespace).await?;
654 }
655
656 let container_id = format!("wfe-{}", uuid::Uuid::new_v4());
658
659 let mut merged_env: HashMap<String, String> = HashMap::new();
661 if let Some(data_obj) = context.workflow.data.as_object() {
662 for (key, value) in data_obj {
663 let env_key = key.to_uppercase();
664 let env_val = match value {
665 serde_json::Value::String(s) => s.clone(),
666 other => other.to_string(),
667 };
668 merged_env.insert(env_key, env_val);
669 }
670 }
671 for (key, value) in &self.config.env {
673 merged_env.insert(key.clone(), value.clone());
674 }
675
676 let oci_spec = self.build_oci_spec(&merged_env);
678
679 tracing::info!(container_id = %container_id, image = %self.config.image, "creating container");
681 let mut containers_client = ContainersClient::new(channel.clone());
682 let create_req = Self::with_namespace(
683 CreateContainerRequest {
684 container: Some(Container {
685 id: container_id.clone(),
686 image: self.config.image.clone(),
687 runtime: Some(Runtime {
688 name: "io.containerd.runc.v2".to_string(),
689 options: None,
690 }),
691 spec: Some(oci_spec),
692 snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
693 snapshot_key: container_id.clone(),
694 labels: HashMap::new(),
695 created_at: None,
696 updated_at: None,
697 extensions: HashMap::new(),
698 sandbox: String::new(),
699 }),
700 },
701 namespace,
702 );
703
704 containers_client
705 .create(create_req)
706 .await
707 .map_err(|e| WfeError::StepExecution(format!("failed to create container: {e}")))?;
708
709 let mut snapshots_client = SnapshotsClient::new(channel.clone());
711
712 let mounts = {
713 let mounts_req = Self::with_namespace(
715 MountsRequest {
716 snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
717 key: container_id.clone(),
718 },
719 namespace,
720 );
721
722 match snapshots_client.mounts(mounts_req).await {
723 Ok(resp) => resp.into_inner().mounts,
724 Err(_) => {
725 let parent = if should_check {
727 Self::resolve_image_chain_id(&channel, &self.config.image, namespace)
728 .await?
729 } else {
730 String::new()
731 };
732
733 let prepare_req = Self::with_namespace(
734 PrepareSnapshotRequest {
735 snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
736 key: container_id.clone(),
737 parent,
738 labels: HashMap::new(),
739 },
740 namespace,
741 );
742 snapshots_client
743 .prepare(prepare_req)
744 .await
745 .map_err(|e| {
746 WfeError::StepExecution(format!("failed to prepare snapshot: {e}"))
747 })?
748 .into_inner()
749 .mounts
750 }
751 }
752 };
753
754 let io_base = std::env::var("WFE_IO_DIR")
758 .map(std::path::PathBuf::from)
759 .unwrap_or_else(|_| std::env::temp_dir());
760 let tmp_dir = io_base.join(format!("wfe-io-{container_id}"));
761 std::fs::create_dir_all(&tmp_dir)
762 .map_err(|e| WfeError::StepExecution(format!("failed to create IO temp dir: {e}")))?;
763
764 let stdout_path = tmp_dir.join("stdout");
765 let stderr_path = tmp_dir.join("stderr");
766
767 for path in [&stdout_path, &stderr_path] {
771 let _ = std::fs::remove_file(path);
772 std::fs::File::create(path).map_err(|e| {
773 WfeError::StepExecution(format!("failed to create IO file {}: {e}", path.display()))
774 })?;
775 #[cfg(unix)]
777 {
778 use std::os::unix::fs::PermissionsExt;
779 std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o666)).ok();
780 }
781 }
782
783 let stdout_str = stdout_path.to_string_lossy().to_string();
784 let stderr_str = stderr_path.to_string_lossy().to_string();
785
786 let mut tasks_client = TasksClient::new(channel.clone());
788
789 let create_task_req = Self::with_namespace(
790 CreateTaskRequest {
791 container_id: container_id.clone(),
792 rootfs: mounts,
793 stdin: String::new(),
794 stdout: stdout_str.clone(),
795 stderr: stderr_str.clone(),
796 terminal: false,
797 checkpoint: None,
798 options: None,
799 runtime_path: String::new(),
800 },
801 namespace,
802 );
803
804 tasks_client
805 .create(create_task_req)
806 .await
807 .map_err(|e| WfeError::StepExecution(format!("failed to create task: {e}")))?;
808
809 let start_req = Self::with_namespace(
811 StartRequest {
812 container_id: container_id.clone(),
813 exec_id: String::new(),
814 },
815 namespace,
816 );
817
818 tasks_client
819 .start(start_req)
820 .await
821 .map_err(|e| WfeError::StepExecution(format!("failed to start task: {e}")))?;
822
823 tracing::info!(container_id = %container_id, "task started");
824
825 let wait_req = Self::with_namespace(
827 WaitRequest {
828 container_id: container_id.clone(),
829 exec_id: String::new(),
830 },
831 namespace,
832 );
833
834 let wait_result = if let Some(timeout_ms) = self.config.timeout_ms {
835 let duration = std::time::Duration::from_millis(timeout_ms);
836 match tokio::time::timeout(duration, tasks_client.wait(wait_req)).await {
837 Ok(result) => result,
838 Err(_) => {
839 let _ = Self::cleanup(&channel, &container_id, namespace).await;
841 let _ = std::fs::remove_dir_all(&tmp_dir);
842 return Err(WfeError::StepExecution(format!(
843 "container execution timed out after {timeout_ms}ms"
844 )));
845 }
846 }
847 } else {
848 tasks_client.wait(wait_req).await
849 };
850
851 let exit_status = match wait_result {
852 Ok(resp) => resp.into_inner().exit_status,
853 Err(e) => {
854 let _ = Self::cleanup(&channel, &container_id, namespace).await;
855 let _ = std::fs::remove_dir_all(&tmp_dir);
856 return Err(WfeError::StepExecution(format!(
857 "failed waiting for task: {e}"
858 )));
859 }
860 };
861
862 let stdout_content = tokio::fs::read_to_string(&stdout_path)
864 .await
865 .unwrap_or_default();
866 let stderr_content = tokio::fs::read_to_string(&stderr_path)
867 .await
868 .unwrap_or_default();
869
870 if let Err(e) = Self::cleanup(&channel, &container_id, namespace).await {
872 tracing::warn!(container_id = %container_id, error = %e, "cleanup failed");
873 }
874 let _ = std::fs::remove_dir_all(&tmp_dir);
875
876 let exit_code = exit_status as i32;
878 if exit_code != 0 {
879 return Err(WfeError::StepExecution(format!(
880 "container exited with code {exit_code}\nstdout: {stdout_content}\nstderr: {stderr_content}"
881 )));
882 }
883
884 let parsed = Self::parse_outputs(&stdout_content);
886 let output_data = Self::build_output_data(
887 step_name,
888 &stdout_content,
889 &stderr_content,
890 exit_code,
891 &parsed,
892 );
893
894 Ok(ExecutionResult {
895 proceed: true,
896 output_data: Some(output_data),
897 ..Default::default()
898 })
899 }
900}
901
902impl ContainerdStep {
903 pub(crate) async fn cleanup(
905 channel: &Channel,
906 container_id: &str,
907 namespace: &str,
908 ) -> Result<(), WfeError> {
909 let mut tasks_client = TasksClient::new(channel.clone());
910 let mut containers_client = ContainersClient::new(channel.clone());
911
912 let del_task_req = Self::with_namespace(
914 DeleteTaskRequest {
915 container_id: container_id.to_string(),
916 },
917 namespace,
918 );
919 let _ = tasks_client.delete(del_task_req).await;
920
921 let del_container_req = Self::with_namespace(
923 DeleteContainerRequest {
924 id: container_id.to_string(),
925 },
926 namespace,
927 );
928 containers_client
929 .delete(del_container_req)
930 .await
931 .map_err(|e| WfeError::StepExecution(format!("failed to delete container: {e}")))?;
932
933 Ok(())
934 }
935}
936
937#[cfg(test)]
938mod tests {
939 use super::*;
940 use crate::config::{TlsConfig, VolumeMountConfig};
941 use pretty_assertions::assert_eq;
942
943 fn minimal_config() -> ContainerdConfig {
944 ContainerdConfig {
945 image: "alpine:3.18".to_string(),
946 command: None,
947 run: Some("echo hello".to_string()),
948 env: HashMap::new(),
949 volumes: vec![],
950 working_dir: None,
951 user: "65534:65534".to_string(),
952 network: "none".to_string(),
953 memory: None,
954 cpu: None,
955 pull: "if-not-present".to_string(),
956 containerd_addr: "/run/containerd/containerd.sock".to_string(),
957 cli: "nerdctl".to_string(),
958 tls: TlsConfig::default(),
959 registry_auth: HashMap::new(),
960 timeout_ms: None,
961 }
962 }
963
964 #[test]
967 fn parse_outputs_empty() {
968 let outputs = ContainerdStep::parse_outputs("");
969 assert!(outputs.is_empty());
970 }
971
972 #[test]
973 fn parse_outputs_single() {
974 let stdout = "some log line\n##wfe[output version=1.2.3]\nmore logs\n";
975 let outputs = ContainerdStep::parse_outputs(stdout);
976 assert_eq!(outputs.len(), 1);
977 assert_eq!(outputs.get("version").unwrap(), "1.2.3");
978 }
979
980 #[test]
981 fn parse_outputs_multiple() {
982 let stdout = "##wfe[output foo=bar]\n##wfe[output baz=qux]\n";
983 let outputs = ContainerdStep::parse_outputs(stdout);
984 assert_eq!(outputs.len(), 2);
985 assert_eq!(outputs.get("foo").unwrap(), "bar");
986 assert_eq!(outputs.get("baz").unwrap(), "qux");
987 }
988
989 #[test]
990 fn parse_outputs_mixed_with_regular_stdout() {
991 let stdout = "Starting container...\n\
992 Pulling image...\n\
993 ##wfe[output digest=sha256:abc123]\n\
994 Running tests...\n\
995 ##wfe[output result=pass]\n\
996 Done.\n";
997 let outputs = ContainerdStep::parse_outputs(stdout);
998 assert_eq!(outputs.len(), 2);
999 assert_eq!(outputs.get("digest").unwrap(), "sha256:abc123");
1000 assert_eq!(outputs.get("result").unwrap(), "pass");
1001 }
1002
1003 #[test]
1004 fn parse_outputs_no_wfe_lines() {
1005 let stdout = "line 1\nline 2\nline 3\n";
1006 let outputs = ContainerdStep::parse_outputs(stdout);
1007 assert!(outputs.is_empty());
1008 }
1009
1010 #[test]
1011 fn parse_outputs_value_with_equals_sign() {
1012 let stdout = "##wfe[output url=https://example.com?a=1&b=2]\n";
1013 let outputs = ContainerdStep::parse_outputs(stdout);
1014 assert_eq!(outputs.len(), 1);
1015 assert_eq!(outputs.get("url").unwrap(), "https://example.com?a=1&b=2");
1016 }
1017
1018 #[test]
1019 fn parse_outputs_ignores_malformed_lines() {
1020 let stdout = "##wfe[output ]\n\
1021 ##wfe[output no_equals]\n\
1022 ##wfe[output valid=yes]\n\
1023 ##wfe[output_extra bad=val]\n";
1024 let outputs = ContainerdStep::parse_outputs(stdout);
1025 assert_eq!(outputs.len(), 1);
1026 assert_eq!(outputs.get("valid").unwrap(), "yes");
1027 }
1028
1029 #[test]
1030 fn parse_outputs_overwrites_duplicate_keys() {
1031 let stdout = "##wfe[output key=first]\n##wfe[output key=second]\n";
1032 let outputs = ContainerdStep::parse_outputs(stdout);
1033 assert_eq!(outputs.len(), 1);
1034 assert_eq!(outputs.get("key").unwrap(), "second");
1035 }
1036
1037 #[test]
1040 fn build_output_data_basic() {
1041 let parsed = HashMap::from([("result".to_string(), "success".to_string())]);
1042 let data = ContainerdStep::build_output_data("my_step", "hello world\n", "", 0, &parsed);
1043
1044 let obj = data.as_object().unwrap();
1045 assert_eq!(obj.get("result").unwrap(), "success");
1046 assert_eq!(obj.get("my_step.stdout").unwrap(), "hello world\n");
1047 assert_eq!(obj.get("my_step.stderr").unwrap(), "");
1048 assert_eq!(obj.get("my_step.exit_code").unwrap(), 0);
1049 }
1050
1051 #[test]
1052 fn build_output_data_no_parsed_outputs() {
1053 let data = ContainerdStep::build_output_data("step1", "out", "err", 1, &HashMap::new());
1054
1055 let obj = data.as_object().unwrap();
1056 assert_eq!(obj.len(), 3); assert_eq!(obj.get("step1.stdout").unwrap(), "out");
1058 assert_eq!(obj.get("step1.stderr").unwrap(), "err");
1059 assert_eq!(obj.get("step1.exit_code").unwrap(), 1);
1060 }
1061
1062 #[test]
1063 fn build_output_data_with_multiple_parsed_outputs() {
1064 let parsed = HashMap::from([
1065 ("a".to_string(), "1".to_string()),
1066 ("b".to_string(), "2".to_string()),
1067 ("c".to_string(), "3".to_string()),
1068 ]);
1069 let data = ContainerdStep::build_output_data("s", "", "", 0, &parsed);
1070
1071 let obj = data.as_object().unwrap();
1072 assert_eq!(obj.get("a").unwrap(), "1");
1073 assert_eq!(obj.get("b").unwrap(), "2");
1074 assert_eq!(obj.get("c").unwrap(), "3");
1075 assert_eq!(obj.len(), 6);
1077 }
1078
1079 #[test]
1080 fn build_output_data_negative_exit_code() {
1081 let data = ContainerdStep::build_output_data("s", "", "", -1, &HashMap::new());
1082 let obj = data.as_object().unwrap();
1083 assert_eq!(obj.get("s.exit_code").unwrap(), -1);
1084 }
1085
1086 #[test]
1089 fn parse_user_spec_normal() {
1090 assert_eq!(parse_user_spec("1000:1000"), (1000, 1000));
1091 }
1092
1093 #[test]
1094 fn parse_user_spec_root() {
1095 assert_eq!(parse_user_spec("0:0"), (0, 0));
1096 }
1097
1098 #[test]
1099 fn parse_user_spec_default() {
1100 assert_eq!(parse_user_spec("65534:65534"), (65534, 65534));
1101 }
1102
1103 #[test]
1104 fn parse_user_spec_invalid_falls_back() {
1105 assert_eq!(parse_user_spec("abc"), (65534, 65534));
1106 }
1107
1108 #[test]
1111 fn build_oci_spec_minimal() {
1112 let step = ContainerdStep::new(minimal_config());
1113 let env = HashMap::new();
1114 let spec = step.build_oci_spec(&env);
1115
1116 assert_eq!(
1117 spec.type_url,
1118 "types.containerd.io/opencontainers/runtime-spec/1/Spec"
1119 );
1120 assert!(!spec.value.is_empty());
1121
1122 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1124 assert_eq!(parsed["ociVersion"], "1.0.2");
1125 assert_eq!(parsed["process"]["args"][0], "/bin/sh");
1126 assert_eq!(parsed["process"]["args"][1], "-c");
1127 assert_eq!(parsed["process"]["args"][2], "echo hello");
1128 assert_eq!(parsed["process"]["user"]["uid"], 65534);
1129 assert_eq!(parsed["process"]["user"]["gid"], 65534);
1130 assert_eq!(parsed["process"]["cwd"], "/");
1131 }
1132
1133 #[test]
1134 fn build_oci_spec_with_command() {
1135 let mut config = minimal_config();
1136 config.run = None;
1137 config.command = Some(vec![
1138 "echo".to_string(),
1139 "hello".to_string(),
1140 "world".to_string(),
1141 ]);
1142 let step = ContainerdStep::new(config);
1143 let spec = step.build_oci_spec(&HashMap::new());
1144
1145 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1146 assert_eq!(parsed["process"]["args"][0], "echo");
1147 assert_eq!(parsed["process"]["args"][1], "hello");
1148 assert_eq!(parsed["process"]["args"][2], "world");
1149 }
1150
1151 #[test]
1152 fn build_oci_spec_with_env() {
1153 let step = ContainerdStep::new(minimal_config());
1154 let env = HashMap::from([
1155 ("FOO".to_string(), "bar".to_string()),
1156 ("BAZ".to_string(), "qux".to_string()),
1157 ]);
1158 let spec = step.build_oci_spec(&env);
1159
1160 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1161 let env_arr: Vec<String> = parsed["process"]["env"]
1162 .as_array()
1163 .unwrap()
1164 .iter()
1165 .map(|v| v.as_str().unwrap().to_string())
1166 .collect();
1167
1168 assert!(env_arr.contains(&"FOO=bar".to_string()));
1169 assert!(env_arr.contains(&"BAZ=qux".to_string()));
1170 }
1171
1172 #[test]
1173 fn build_oci_spec_with_working_dir() {
1174 let mut config = minimal_config();
1175 config.working_dir = Some("/app".to_string());
1176 let step = ContainerdStep::new(config);
1177 let spec = step.build_oci_spec(&HashMap::new());
1178
1179 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1180 assert_eq!(parsed["process"]["cwd"], "/app");
1181 }
1182
1183 #[test]
1184 fn build_oci_spec_with_user() {
1185 let mut config = minimal_config();
1186 config.user = "1000:2000".to_string();
1187 let step = ContainerdStep::new(config);
1188 let spec = step.build_oci_spec(&HashMap::new());
1189
1190 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1191 assert_eq!(parsed["process"]["user"]["uid"], 1000);
1192 assert_eq!(parsed["process"]["user"]["gid"], 2000);
1193 }
1194
1195 #[test]
1196 fn build_oci_spec_with_volumes() {
1197 let mut config = minimal_config();
1198 config.volumes = vec![
1199 VolumeMountConfig {
1200 source: "/host/data".to_string(),
1201 target: "/container/data".to_string(),
1202 readonly: false,
1203 },
1204 VolumeMountConfig {
1205 source: "/host/config".to_string(),
1206 target: "/etc/config".to_string(),
1207 readonly: true,
1208 },
1209 ];
1210 let step = ContainerdStep::new(config);
1211 let spec = step.build_oci_spec(&HashMap::new());
1212
1213 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1214 let mounts = parsed["mounts"].as_array().unwrap();
1215 assert_eq!(mounts.len(), 5);
1217
1218 let bind_mounts: Vec<&serde_json::Value> =
1219 mounts.iter().filter(|m| m["type"] == "bind").collect();
1220 assert_eq!(bind_mounts.len(), 2);
1221
1222 let ro_mount = bind_mounts
1223 .iter()
1224 .find(|m| m["destination"] == "/etc/config")
1225 .unwrap();
1226 let opts: Vec<String> = ro_mount["options"]
1227 .as_array()
1228 .unwrap()
1229 .iter()
1230 .map(|v| v.as_str().unwrap().to_string())
1231 .collect();
1232 assert!(opts.contains(&"ro".to_string()));
1233 }
1234
1235 #[test]
1236 fn build_oci_spec_no_command_no_run() {
1237 let mut config = minimal_config();
1238 config.run = None;
1239 config.command = None;
1240 let step = ContainerdStep::new(config);
1241 let spec = step.build_oci_spec(&HashMap::new());
1242
1243 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1244 assert!(parsed["process"]["args"].as_array().unwrap().is_empty());
1245 }
1246
1247 #[tokio::test]
1250 async fn connect_to_missing_unix_socket_returns_error() {
1251 let err = ContainerdStep::connect("/tmp/nonexistent-wfe-containerd-test.sock")
1252 .await
1253 .unwrap_err();
1254 let msg = format!("{err}");
1255 assert!(
1256 msg.contains("socket not found"),
1257 "expected 'socket not found' error, got: {msg}"
1258 );
1259 }
1260
1261 #[tokio::test]
1262 async fn connect_to_missing_unix_socket_with_scheme_returns_error() {
1263 let err = ContainerdStep::connect("unix:///tmp/nonexistent-wfe-containerd-test.sock")
1264 .await
1265 .unwrap_err();
1266 let msg = format!("{err}");
1267 assert!(
1268 msg.contains("socket not found"),
1269 "expected 'socket not found' error, got: {msg}"
1270 );
1271 }
1272
1273 #[tokio::test]
1274 async fn connect_to_invalid_tcp_returns_error() {
1275 let err = ContainerdStep::connect("tcp://127.0.0.1:1")
1276 .await
1277 .unwrap_err();
1278 let msg = format!("{err}");
1279 assert!(
1280 msg.contains("failed to connect"),
1281 "expected connection error, got: {msg}"
1282 );
1283 }
1284
1285 #[test]
1288 fn new_creates_step_with_config() {
1289 let config = minimal_config();
1290 let step = ContainerdStep::new(config);
1291 assert_eq!(step.config.image, "alpine:3.18");
1292 assert_eq!(
1293 step.config.containerd_addr,
1294 "/run/containerd/containerd.sock"
1295 );
1296 }
1297}
1298
1299#[cfg(test)]
1301mod e2e_tests {
1302 use super::*;
1303
1304 fn containerd_addr() -> Option<String> {
1306 let addr = std::env::var("WFE_CONTAINERD_ADDR").unwrap_or_else(|_| {
1307 format!(
1308 "unix://{}/.lima/wfe-test/sock/containerd.sock",
1309 std::env::var("HOME").unwrap_or_else(|_| "/root".to_string())
1310 )
1311 });
1312
1313 let socket_path = addr.strip_prefix("unix://").unwrap_or(addr.as_str());
1314
1315 if Path::new(socket_path).exists() {
1316 Some(addr)
1317 } else {
1318 None
1319 }
1320 }
1321
1322 #[tokio::test]
1323 async fn e2e_version_check() {
1324 let Some(addr) = containerd_addr() else {
1325 eprintln!("SKIP: containerd socket not available");
1326 return;
1327 };
1328
1329 let channel = ContainerdStep::connect(&addr).await.unwrap();
1330 let mut client = VersionClient::new(channel);
1331
1332 let req = ContainerdStep::with_namespace((), DEFAULT_NAMESPACE);
1333 let resp = client.version(req).await.unwrap();
1334 let version = resp.into_inner();
1335
1336 assert!(!version.version.is_empty(), "version should not be empty");
1337 assert!(!version.revision.is_empty(), "revision should not be empty");
1338 eprintln!(
1339 "containerd version={} revision={}",
1340 version.version, version.revision
1341 );
1342 }
1343}