1use std::collections::HashMap;
2use std::path::Path;
3
4use async_trait::async_trait;
5use tonic::transport::{Channel, Endpoint, Uri};
6use wfe_core::WfeError;
7use wfe_core::models::ExecutionResult;
8use wfe_core::traits::step::{StepBody, StepExecutionContext};
9
10use wfe_containerd_protos::containerd::services::containers::v1::{
11 Container, CreateContainerRequest, DeleteContainerRequest, container::Runtime,
12 containers_client::ContainersClient,
13};
14use wfe_containerd_protos::containerd::services::content::v1::{
15 ReadContentRequest, WriteAction, WriteContentRequest, content_client::ContentClient,
16};
17use wfe_containerd_protos::containerd::services::diff::v1::{
18 ApplyRequest, diff_client::DiffClient,
19};
20use wfe_containerd_protos::containerd::services::images::v1::{
21 GetImageRequest, images_client::ImagesClient,
22};
23use wfe_containerd_protos::containerd::services::snapshots::v1::{
24 MountsRequest, PrepareSnapshotRequest, snapshots_client::SnapshotsClient,
25};
26use wfe_containerd_protos::containerd::types::Descriptor;
27use wfe_containerd_protos::containerd::services::tasks::v1::{
28 CreateTaskRequest, DeleteTaskRequest, StartRequest, WaitRequest, tasks_client::TasksClient,
29};
30use wfe_containerd_protos::containerd::services::version::v1::version_client::VersionClient;
31
32use crate::config::ContainerdConfig;
33
34const DEFAULT_NAMESPACE: &str = "default";
36
37const DEFAULT_SNAPSHOTTER: &str = "overlayfs";
39
40pub struct ContainerdStep {
42 config: ContainerdConfig,
43 artifact_applies: Vec<(String, Descriptor)>,
46}
47
48impl ContainerdStep {
49 pub fn new(config: ContainerdConfig) -> Self {
50 Self {
51 config,
52 artifact_applies: Vec::new(),
53 }
54 }
55
56 pub(crate) async fn connect(addr: &str) -> Result<Channel, WfeError> {
61 let channel = if addr.starts_with('/') || addr.starts_with("unix://") {
62 let socket_path = addr.strip_prefix("unix://").unwrap_or(addr).to_string();
63
64 if !Path::new(&socket_path).exists() {
65 return Err(WfeError::StepExecution(format!(
66 "containerd socket not found: {socket_path}"
67 )));
68 }
69
70 Endpoint::try_from("http://[::]:50051")
71 .map_err(|e| WfeError::StepExecution(format!("failed to create endpoint: {e}")))?
72 .connect_with_connector(tower::service_fn(move |_: Uri| {
73 let path = socket_path.clone();
74 async move {
75 tokio::net::UnixStream::connect(path)
76 .await
77 .map(hyper_util::rt::TokioIo::new)
78 }
79 }))
80 .await
81 .map_err(|e| {
82 WfeError::StepExecution(format!(
83 "failed to connect to containerd via Unix socket at {addr}: {e}"
84 ))
85 })?
86 } else {
87 let connect_addr = if addr.starts_with("tcp://") {
88 addr.replacen("tcp://", "http://", 1)
89 } else {
90 addr.to_string()
91 };
92
93 Endpoint::from_shared(connect_addr.clone())
94 .map_err(|e| {
95 WfeError::StepExecution(format!(
96 "invalid containerd endpoint {connect_addr}: {e}"
97 ))
98 })?
99 .timeout(std::time::Duration::from_secs(30))
100 .connect()
101 .await
102 .map_err(|e| {
103 WfeError::StepExecution(format!(
104 "failed to connect to containerd at {connect_addr}: {e}"
105 ))
106 })?
107 };
108
109 Ok(channel)
110 }
111
112 async fn upload_content(
118 channel: &Channel,
119 namespace: &str,
120 data: &[u8],
121 ) -> Result<(String, i64), WfeError> {
122 use sha2::{Digest, Sha256};
123
124 let digest = format!("sha256:{:x}", Sha256::digest(data));
125 let size = data.len() as i64;
126
127 let mut client = ContentClient::new(channel.clone());
128
129 let mut requests = Vec::new();
131
132 requests.push(WriteContentRequest {
134 action: WriteAction::Commit as i32,
135 r#ref: digest.clone(),
136 total: size,
137 expected: digest.clone(),
138 offset: 0,
139 data: data.to_vec(),
140 labels: HashMap::new(),
141 });
142
143 let stream = tokio_stream::iter(requests);
144 let req = Self::with_namespace(stream, namespace);
145
146 let mut response = client
147 .write(req)
148 .await
149 .map_err(|e| WfeError::StepExecution(format!("content write failed: {e}")))?
150 .into_inner();
151
152 while let Some(resp) = response
154 .message()
155 .await
156 .map_err(|e| WfeError::StepExecution(format!("content write stream error: {e}")))?
157 {
158 if resp.action == WriteAction::Commit as i32 {
159 break;
160 }
161 }
162
163 Ok((digest, size))
164 }
165
166 async fn apply_diff(
171 channel: &Channel,
172 namespace: &str,
173 mounts: Vec<wfe_containerd_protos::containerd::types::Mount>,
174 descriptor: Descriptor,
175 ) -> Result<(), WfeError> {
176 let mut client = DiffClient::new(channel.clone());
177
178 let req = Self::with_namespace(
179 ApplyRequest {
180 diff: Some(descriptor),
181 mounts,
182 payloads: HashMap::new(),
183 sync_fs: false,
184 },
185 namespace,
186 );
187
188 client
189 .apply(req)
190 .await
191 .map_err(|e| WfeError::StepExecution(format!("Diff.Apply failed: {e}")))?;
192
193 Ok(())
194 }
195
196 async fn ensure_image(channel: &Channel, image: &str, namespace: &str) -> Result<(), WfeError> {
205 let mut client = ImagesClient::new(channel.clone());
206
207 let mut req = tonic::Request::new(GetImageRequest {
208 name: image.to_string(),
209 });
210 req.metadata_mut()
211 .insert("containerd-namespace", namespace.parse().unwrap());
212
213 match client.get(req).await {
214 Ok(_) => Ok(()),
215 Err(status) => Err(WfeError::StepExecution(format!(
216 "image '{image}' not found in containerd (namespace={namespace}). \
217 Pre-pull it with: ctr -n {namespace} image pull {image} \
218 (gRPC status: {status})"
219 ))),
220 }
221 }
222
223 async fn resolve_image_chain_id(
233 channel: &Channel,
234 image: &str,
235 namespace: &str,
236 ) -> Result<String, WfeError> {
237 use sha2::{Digest, Sha256};
238
239 let mut images_client = ImagesClient::new(channel.clone());
241 let req = Self::with_namespace(
242 GetImageRequest {
243 name: image.to_string(),
244 },
245 namespace,
246 );
247 let image_resp = images_client
248 .get(req)
249 .await
250 .map_err(|e| WfeError::StepExecution(format!("failed to get image '{image}': {e}")))?;
251 let img = image_resp
252 .into_inner()
253 .image
254 .ok_or_else(|| WfeError::StepExecution(format!("image '{image}' has no record")))?;
255 let target = img.target.ok_or_else(|| {
256 WfeError::StepExecution(format!("image '{image}' has no target descriptor"))
257 })?;
258
259 let manifest_digest = target.digest.clone();
262 let manifest_bytes = Self::read_content(channel, &manifest_digest, namespace).await?;
263 let manifest_json: serde_json::Value = serde_json::from_slice(&manifest_bytes)
264 .map_err(|e| WfeError::StepExecution(format!("failed to parse manifest: {e}")))?;
265
266 let manifest_json = if manifest_json.get("manifests").is_some() {
268 let arch = std::env::consts::ARCH;
270 let oci_arch = match arch {
271 "aarch64" => "arm64",
272 "x86_64" => "amd64",
273 other => other,
274 };
275 let manifests = manifest_json["manifests"].as_array().ok_or_else(|| {
276 WfeError::StepExecution("image index has no manifests array".to_string())
277 })?;
278 let platform_manifest = manifests
279 .iter()
280 .find(|m| {
281 m.get("platform")
282 .and_then(|p| p.get("architecture"))
283 .and_then(|a| a.as_str())
284 == Some(oci_arch)
285 })
286 .ok_or_else(|| {
287 WfeError::StepExecution(format!(
288 "no manifest for architecture '{oci_arch}' in image index"
289 ))
290 })?;
291 let digest = platform_manifest["digest"].as_str().ok_or_else(|| {
292 WfeError::StepExecution("platform manifest has no digest".to_string())
293 })?;
294 let bytes = Self::read_content(channel, digest, namespace).await?;
295 serde_json::from_slice(&bytes).map_err(|e| {
296 WfeError::StepExecution(format!("failed to parse platform manifest: {e}"))
297 })?
298 } else {
299 manifest_json
300 };
301
302 let config_digest = manifest_json["config"]["digest"]
304 .as_str()
305 .ok_or_else(|| WfeError::StepExecution("manifest has no config.digest".to_string()))?;
306
307 let config_bytes = Self::read_content(channel, config_digest, namespace).await?;
309 let config_json: serde_json::Value = serde_json::from_slice(&config_bytes)
310 .map_err(|e| WfeError::StepExecution(format!("failed to parse image config: {e}")))?;
311
312 let diff_ids = config_json["rootfs"]["diff_ids"]
314 .as_array()
315 .ok_or_else(|| {
316 WfeError::StepExecution("image config has no rootfs.diff_ids".to_string())
317 })?;
318
319 if diff_ids.is_empty() {
320 return Err(WfeError::StepExecution(
321 "image has no layers (empty diff_ids)".to_string(),
322 ));
323 }
324
325 let mut chain_id = diff_ids[0]
326 .as_str()
327 .ok_or_else(|| WfeError::StepExecution("diff_id is not a string".to_string()))?
328 .to_string();
329
330 for diff_id in &diff_ids[1..] {
331 let diff = diff_id
332 .as_str()
333 .ok_or_else(|| WfeError::StepExecution("diff_id is not a string".to_string()))?;
334 let mut hasher = Sha256::new();
335 hasher.update(format!("{chain_id} {diff}"));
336 chain_id = format!("sha256:{:x}", hasher.finalize());
337 }
338
339 tracing::debug!(image = image, chain_id = %chain_id, "resolved image chain ID");
340 Ok(chain_id)
341 }
342
343 async fn read_content(
345 channel: &Channel,
346 digest: &str,
347 namespace: &str,
348 ) -> Result<Vec<u8>, WfeError> {
349 use tokio_stream::StreamExt;
350
351 let mut client = ContentClient::new(channel.clone());
352 let req = Self::with_namespace(
353 ReadContentRequest {
354 digest: digest.to_string(),
355 offset: 0,
356 size: 0, },
358 namespace,
359 );
360
361 let mut stream = client
362 .read(req)
363 .await
364 .map_err(|e| WfeError::StepExecution(format!("failed to read content {digest}: {e}")))?
365 .into_inner();
366
367 let mut data = Vec::new();
368 while let Some(chunk) = stream.next().await {
369 let chunk = chunk.map_err(|e| {
370 WfeError::StepExecution(format!("error reading content {digest}: {e}"))
371 })?;
372 data.extend_from_slice(&chunk.data);
373 }
374
375 Ok(data)
376 }
377
378 pub(crate) fn build_oci_spec(&self, merged_env: &HashMap<String, String>) -> prost_types::Any {
383 let args: Vec<String> = if let Some(ref run) = self.config.run {
385 vec!["/bin/sh".to_string(), "-c".to_string(), run.clone()]
386 } else if let Some(ref command) = self.config.command {
387 command.clone()
388 } else {
389 vec![]
390 };
391
392 let env: Vec<String> = merged_env.iter().map(|(k, v)| format!("{k}={v}")).collect();
394
395 let mut mounts = vec![
397 serde_json::json!({
398 "destination": "/proc",
399 "type": "proc",
400 "source": "proc",
401 "options": ["nosuid", "noexec", "nodev"]
402 }),
403 serde_json::json!({
404 "destination": "/dev",
405 "type": "tmpfs",
406 "source": "tmpfs",
407 "options": ["nosuid", "strictatime", "mode=755", "size=65536k"]
408 }),
409 serde_json::json!({
410 "destination": "/sys",
411 "type": "sysfs",
412 "source": "sysfs",
413 "options": ["nosuid", "noexec", "nodev", "ro"]
414 }),
415 ];
416
417 for vol in &self.config.volumes {
418 let mut opts = vec!["rbind".to_string()];
419 if vol.readonly {
420 opts.push("ro".to_string());
421 }
422 mounts.push(serde_json::json!({
423 "destination": vol.target,
424 "type": "bind",
425 "source": vol.source,
426 "options": opts,
427 }));
428 }
429
430 let (uid, gid) = parse_user_spec(&self.config.user);
432
433 let mut process = serde_json::json!({
434 "terminal": false,
435 "user": {
436 "uid": uid,
437 "gid": gid,
438 },
439 "args": args,
440 "env": env,
441 "cwd": self.config.working_dir.as_deref().unwrap_or("/"),
442 });
443
444 let caps = if uid == 0 {
447 serde_json::json!([
448 "CAP_AUDIT_WRITE",
449 "CAP_CHOWN",
450 "CAP_DAC_OVERRIDE",
451 "CAP_FOWNER",
452 "CAP_FSETID",
453 "CAP_KILL",
454 "CAP_MKNOD",
455 "CAP_NET_BIND_SERVICE",
456 "CAP_NET_RAW",
457 "CAP_SETFCAP",
458 "CAP_SETGID",
459 "CAP_SETPCAP",
460 "CAP_SETUID",
461 "CAP_SYS_CHROOT",
462 ])
463 } else {
464 serde_json::json!([])
465 };
466 process["capabilities"] = serde_json::json!({
467 "bounding": caps,
468 "effective": caps,
469 "inheritable": caps,
470 "permitted": caps,
471 "ambient": caps,
472 });
473
474 let spec = serde_json::json!({
475 "ociVersion": "1.0.2",
476 "process": process,
477 "root": {
478 "path": "rootfs",
479 "readonly": false,
480 },
481 "mounts": mounts,
482 "linux": {
483 "namespaces": [
484 { "type": "pid" },
485 { "type": "ipc" },
486 { "type": "uts" },
487 { "type": "mount" },
488 ],
489 },
490 });
491
492 let json_bytes = serde_json::to_vec(&spec).expect("OCI spec serialization cannot fail");
493
494 prost_types::Any {
495 type_url: "types.containerd.io/opencontainers/runtime-spec/1/Spec".to_string(),
496 value: json_bytes,
497 }
498 }
499
500 pub(crate) fn with_namespace<T>(req: T, namespace: &str) -> tonic::Request<T> {
502 let mut request = tonic::Request::new(req);
503 request
504 .metadata_mut()
505 .insert("containerd-namespace", namespace.parse().unwrap());
506 request
507 }
508
509 pub async fn run_service(
515 addr: &str,
516 container_id: &str,
517 image: &str,
518 env: &std::collections::HashMap<String, String>,
519 ) -> Result<(), WfeError> {
520 let namespace = DEFAULT_NAMESPACE;
521 let channel = Self::connect(addr).await?;
522
523 Self::ensure_image(&channel, image, namespace).await?;
525
526 let config = ContainerdConfig {
528 image: image.to_string(),
529 command: None,
530 run: None,
531 env: env.clone(),
532 volumes: vec![],
533 working_dir: None,
534 user: "0:0".to_string(),
535 network: "host".to_string(),
536 memory: None,
537 cpu: None,
538 pull: "if-not-present".to_string(),
539 containerd_addr: addr.to_string(),
540 cli: "nerdctl".to_string(),
541 tls: Default::default(),
542 registry_auth: Default::default(),
543 inputs: None,
544 timeout_ms: None,
545 };
546
547 let step = Self::new(config);
548 let oci_spec = step.build_oci_spec(env);
549
550 let mut containers_client = ContainersClient::new(channel.clone());
552 let create_req = Self::with_namespace(
553 CreateContainerRequest {
554 container: Some(Container {
555 id: container_id.to_string(),
556 image: image.to_string(),
557 runtime: Some(Runtime {
558 name: "io.containerd.runc.v2".to_string(),
559 options: None,
560 }),
561 spec: Some(oci_spec),
562 snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
563 snapshot_key: container_id.to_string(),
564 labels: HashMap::new(),
565 created_at: None,
566 updated_at: None,
567 extensions: HashMap::new(),
568 sandbox: String::new(),
569 }),
570 },
571 namespace,
572 );
573 containers_client.create(create_req).await.map_err(|e| {
574 WfeError::StepExecution(format!("failed to create service container: {e}"))
575 })?;
576
577 let mut snapshots_client = SnapshotsClient::new(channel.clone());
579 let mounts = {
580 let mounts_req = Self::with_namespace(
581 MountsRequest {
582 snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
583 key: container_id.to_string(),
584 },
585 namespace,
586 );
587 match snapshots_client.mounts(mounts_req).await {
588 Ok(resp) => resp.into_inner().mounts,
589 Err(_) => {
590 let parent = Self::resolve_image_chain_id(&channel, image, namespace).await?;
591 let prepare_req = Self::with_namespace(
592 PrepareSnapshotRequest {
593 snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
594 key: container_id.to_string(),
595 parent,
596 labels: HashMap::new(),
597 },
598 namespace,
599 );
600 snapshots_client
601 .prepare(prepare_req)
602 .await
603 .map_err(|e| {
604 WfeError::StepExecution(format!("failed to prepare snapshot: {e}"))
605 })?
606 .into_inner()
607 .mounts
608 }
609 }
610 };
611
612 let mut tasks_client = TasksClient::new(channel.clone());
614 let create_task_req = Self::with_namespace(
615 CreateTaskRequest {
616 container_id: container_id.to_string(),
617 rootfs: mounts,
618 stdin: String::new(),
619 stdout: String::new(),
620 stderr: String::new(),
621 terminal: false,
622 checkpoint: None,
623 options: None,
624 runtime_path: String::new(),
625 },
626 namespace,
627 );
628 tasks_client
629 .create(create_task_req)
630 .await
631 .map_err(|e| WfeError::StepExecution(format!("failed to create service task: {e}")))?;
632
633 let start_req = Self::with_namespace(
634 StartRequest {
635 container_id: container_id.to_string(),
636 exec_id: String::new(),
637 },
638 namespace,
639 );
640 tasks_client
641 .start(start_req)
642 .await
643 .map_err(|e| WfeError::StepExecution(format!("failed to start service task: {e}")))?;
644
645 tracing::info!(container_id = %container_id, image = %image, "service container started");
646 Ok(())
647 }
648
649 pub async fn cleanup_service(addr: &str, container_id: &str) -> Result<(), WfeError> {
651 let channel = Self::connect(addr).await?;
652 Self::cleanup(&channel, container_id, DEFAULT_NAMESPACE).await
653 }
654
655 pub fn parse_outputs(stdout: &str) -> HashMap<String, String> {
657 let mut outputs = HashMap::new();
658 for line in stdout.lines() {
659 if let Some(rest) = line.strip_prefix("##wfe[output ")
660 && let Some(rest) = rest.strip_suffix(']')
661 && let Some(eq_pos) = rest.find('=')
662 {
663 let name = rest[..eq_pos].trim().to_string();
664 let value = rest[eq_pos + 1..].to_string();
665 outputs.insert(name, value);
666 }
667 }
668 outputs
669 }
670
671 pub fn build_output_data(
673 step_name: &str,
674 stdout: &str,
675 stderr: &str,
676 exit_code: i32,
677 parsed_outputs: &HashMap<String, String>,
678 ) -> serde_json::Value {
679 let mut outputs = serde_json::Map::new();
680 for (key, value) in parsed_outputs {
681 outputs.insert(key.clone(), serde_json::Value::String(value.clone()));
682 }
683 outputs.insert(
684 format!("{step_name}.stdout"),
685 serde_json::Value::String(stdout.to_string()),
686 );
687 outputs.insert(
688 format!("{step_name}.stderr"),
689 serde_json::Value::String(stderr.to_string()),
690 );
691 outputs.insert(
692 format!("{step_name}.exit_code"),
693 serde_json::Value::Number(serde_json::Number::from(exit_code)),
694 );
695 serde_json::Value::Object(outputs)
696 }
697}
698
699fn parse_user_spec(user: &str) -> (u32, u32) {
701 let parts: Vec<&str> = user.split(':').collect();
702 if parts.len() == 2 {
703 let uid = parts[0].parse().unwrap_or(65534);
704 let gid = parts[1].parse().unwrap_or(65534);
705 (uid, gid)
706 } else {
707 (65534, 65534)
708 }
709}
710
711#[async_trait]
712impl StepBody for ContainerdStep {
713 async fn mount_artifacts(
714 &mut self,
715 context: &StepExecutionContext<'_>,
716 ) -> wfe_core::Result<()> {
717 let Some(ref inputs) = self.config.inputs else {
718 return Ok(());
719 };
720
721 if inputs.is_empty() {
722 return Ok(());
723 }
724
725 let Some(volume) = context.artifact_volume else {
726 return Err(WfeError::StepExecution(
727 "artifact volume required but not provided".to_string(),
728 ));
729 };
730
731 let addr = &self.config.containerd_addr;
732 let channel = Self::connect(addr).await?;
733 let namespace = DEFAULT_NAMESPACE;
734
735 for (name, container_target) in inputs {
736 let prefix = container_target.strip_prefix('/').unwrap_or(container_target);
737 let repackaged = volume
738 .repackage_with_prefix(name, prefix)
739 .map_err(|e| WfeError::StepExecution(format!("failed to repackage artifact: {e}")))?;
740
741 let (digest, size) = Self::upload_content(&channel, namespace, &repackaged).await?;
742
743 self.artifact_applies.push((
744 container_target.clone(),
745 Descriptor {
746 media_type: "application/vnd.oci.image.layer.v1.tar+gzip".to_string(),
747 digest,
748 size,
749 annotations: HashMap::new(),
750 },
751 ));
752 }
753
754 Ok(())
755 }
756
757 async fn unmount_artifacts(
758 &mut self,
759 _context: &StepExecutionContext<'_>,
760 ) -> wfe_core::Result<()> {
761 self.artifact_applies.clear();
764 Ok(())
765 }
766
767 async fn run(
768 &mut self,
769 context: &StepExecutionContext<'_>,
770 ) -> wfe_core::Result<ExecutionResult> {
771 let step_name = context.step.name.as_deref().unwrap_or("unknown");
772 let namespace = DEFAULT_NAMESPACE;
773
774 let addr = &self.config.containerd_addr;
776 tracing::info!(addr = %addr, "connecting to containerd daemon");
777 let channel = Self::connect(addr).await?;
778
779 {
781 let mut version_client = VersionClient::new(channel.clone());
782 let req = Self::with_namespace((), namespace);
783 match version_client.version(req).await {
784 Ok(resp) => {
785 let v = resp.into_inner();
786 tracing::info!(
787 version = %v.version,
788 revision = %v.revision,
789 "connected to containerd"
790 );
791 }
792 Err(e) => {
793 return Err(WfeError::StepExecution(format!(
794 "containerd version check failed: {e}"
795 )));
796 }
797 }
798 }
799
800 let should_check = !matches!(self.config.pull.as_str(), "never");
802 if should_check {
803 Self::ensure_image(&channel, &self.config.image, namespace).await?;
804 }
805
806 let container_id = format!("wfe-{}", uuid::Uuid::new_v4());
808
809 let mut merged_env: HashMap<String, String> = HashMap::new();
811 if let Some(data_obj) = context.workflow.data.as_object() {
812 for (key, value) in data_obj {
813 let env_key = key.to_uppercase();
814 let env_val = match value {
815 serde_json::Value::String(s) => s.clone(),
816 other => other.to_string(),
817 };
818 merged_env.insert(env_key, env_val);
819 }
820 }
821 for (key, value) in &self.config.env {
823 merged_env.insert(key.clone(), value.clone());
824 }
825
826 let oci_spec = self.build_oci_spec(&merged_env);
828
829 tracing::info!(container_id = %container_id, image = %self.config.image, "creating container");
831 let mut containers_client = ContainersClient::new(channel.clone());
832 let create_req = Self::with_namespace(
833 CreateContainerRequest {
834 container: Some(Container {
835 id: container_id.clone(),
836 image: self.config.image.clone(),
837 runtime: Some(Runtime {
838 name: "io.containerd.runc.v2".to_string(),
839 options: None,
840 }),
841 spec: Some(oci_spec),
842 snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
843 snapshot_key: container_id.clone(),
844 labels: HashMap::new(),
845 created_at: None,
846 updated_at: None,
847 extensions: HashMap::new(),
848 sandbox: String::new(),
849 }),
850 },
851 namespace,
852 );
853
854 containers_client
855 .create(create_req)
856 .await
857 .map_err(|e| WfeError::StepExecution(format!("failed to create container: {e}")))?;
858
859 let mut snapshots_client = SnapshotsClient::new(channel.clone());
861
862 let mounts = {
863 let mounts_req = Self::with_namespace(
865 MountsRequest {
866 snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
867 key: container_id.clone(),
868 },
869 namespace,
870 );
871
872 match snapshots_client.mounts(mounts_req).await {
873 Ok(resp) => resp.into_inner().mounts,
874 Err(_) => {
875 let parent = if should_check {
877 Self::resolve_image_chain_id(&channel, &self.config.image, namespace)
878 .await?
879 } else {
880 String::new()
881 };
882
883 let prepare_req = Self::with_namespace(
884 PrepareSnapshotRequest {
885 snapshotter: DEFAULT_SNAPSHOTTER.to_string(),
886 key: container_id.clone(),
887 parent,
888 labels: HashMap::new(),
889 },
890 namespace,
891 );
892 snapshots_client
893 .prepare(prepare_req)
894 .await
895 .map_err(|e| {
896 WfeError::StepExecution(format!("failed to prepare snapshot: {e}"))
897 })?
898 .into_inner()
899 .mounts
900 }
901 }
902 };
903
904 for (_target, descriptor) in &self.artifact_applies {
906 Self::apply_diff(&channel, namespace, mounts.clone(), descriptor.clone()).await?;
907 }
908
909 let io_base = std::env::var("WFE_IO_DIR")
913 .map(std::path::PathBuf::from)
914 .unwrap_or_else(|_| std::env::temp_dir());
915 let tmp_dir = io_base.join(format!("wfe-io-{container_id}"));
916 std::fs::create_dir_all(&tmp_dir)
917 .map_err(|e| WfeError::StepExecution(format!("failed to create IO temp dir: {e}")))?;
918
919 let stdout_path = tmp_dir.join("stdout");
920 let stderr_path = tmp_dir.join("stderr");
921
922 for path in [&stdout_path, &stderr_path] {
926 let _ = std::fs::remove_file(path);
927 std::fs::File::create(path).map_err(|e| {
928 WfeError::StepExecution(format!("failed to create IO file {}: {e}", path.display()))
929 })?;
930 #[cfg(unix)]
932 {
933 use std::os::unix::fs::PermissionsExt;
934 std::fs::set_permissions(path, std::fs::Permissions::from_mode(0o666)).ok();
935 }
936 }
937
938 let stdout_str = stdout_path.to_string_lossy().to_string();
939 let stderr_str = stderr_path.to_string_lossy().to_string();
940
941 let mut tasks_client = TasksClient::new(channel.clone());
943
944 let create_task_req = Self::with_namespace(
945 CreateTaskRequest {
946 container_id: container_id.clone(),
947 rootfs: mounts,
948 stdin: String::new(),
949 stdout: stdout_str.clone(),
950 stderr: stderr_str.clone(),
951 terminal: false,
952 checkpoint: None,
953 options: None,
954 runtime_path: String::new(),
955 },
956 namespace,
957 );
958
959 tasks_client
960 .create(create_task_req)
961 .await
962 .map_err(|e| WfeError::StepExecution(format!("failed to create task: {e}")))?;
963
964 let start_req = Self::with_namespace(
966 StartRequest {
967 container_id: container_id.clone(),
968 exec_id: String::new(),
969 },
970 namespace,
971 );
972
973 tasks_client
974 .start(start_req)
975 .await
976 .map_err(|e| WfeError::StepExecution(format!("failed to start task: {e}")))?;
977
978 tracing::info!(container_id = %container_id, "task started");
979
980 let wait_req = Self::with_namespace(
982 WaitRequest {
983 container_id: container_id.clone(),
984 exec_id: String::new(),
985 },
986 namespace,
987 );
988
989 let wait_result = if let Some(timeout_ms) = self.config.timeout_ms {
990 let duration = std::time::Duration::from_millis(timeout_ms);
991 match tokio::time::timeout(duration, tasks_client.wait(wait_req)).await {
992 Ok(result) => result,
993 Err(_) => {
994 let _ = Self::cleanup(&channel, &container_id, namespace).await;
996 let _ = std::fs::remove_dir_all(&tmp_dir);
997 return Err(WfeError::StepExecution(format!(
998 "container execution timed out after {timeout_ms}ms"
999 )));
1000 }
1001 }
1002 } else {
1003 tasks_client.wait(wait_req).await
1004 };
1005
1006 let exit_status = match wait_result {
1007 Ok(resp) => resp.into_inner().exit_status,
1008 Err(e) => {
1009 let _ = Self::cleanup(&channel, &container_id, namespace).await;
1010 let _ = std::fs::remove_dir_all(&tmp_dir);
1011 return Err(WfeError::StepExecution(format!(
1012 "failed waiting for task: {e}"
1013 )));
1014 }
1015 };
1016
1017 let stdout_content = tokio::fs::read_to_string(&stdout_path)
1019 .await
1020 .unwrap_or_default();
1021 let stderr_content = tokio::fs::read_to_string(&stderr_path)
1022 .await
1023 .unwrap_or_default();
1024
1025 if let Err(e) = Self::cleanup(&channel, &container_id, namespace).await {
1027 tracing::warn!(container_id = %container_id, error = %e, "cleanup failed");
1028 }
1029 let _ = std::fs::remove_dir_all(&tmp_dir);
1030
1031 let exit_code = exit_status as i32;
1033 if exit_code != 0 {
1034 return Err(WfeError::StepExecution(format!(
1035 "container exited with code {exit_code}\nstdout: {stdout_content}\nstderr: {stderr_content}"
1036 )));
1037 }
1038
1039 let parsed = Self::parse_outputs(&stdout_content);
1041 let output_data = Self::build_output_data(
1042 step_name,
1043 &stdout_content,
1044 &stderr_content,
1045 exit_code,
1046 &parsed,
1047 );
1048
1049 Ok(ExecutionResult {
1050 proceed: true,
1051 output_data: Some(output_data),
1052 ..Default::default()
1053 })
1054 }
1055}
1056
1057impl ContainerdStep {
1058 pub(crate) async fn cleanup(
1060 channel: &Channel,
1061 container_id: &str,
1062 namespace: &str,
1063 ) -> Result<(), WfeError> {
1064 let mut tasks_client = TasksClient::new(channel.clone());
1065 let mut containers_client = ContainersClient::new(channel.clone());
1066
1067 let del_task_req = Self::with_namespace(
1069 DeleteTaskRequest {
1070 container_id: container_id.to_string(),
1071 },
1072 namespace,
1073 );
1074 let _ = tasks_client.delete(del_task_req).await;
1075
1076 let del_container_req = Self::with_namespace(
1078 DeleteContainerRequest {
1079 id: container_id.to_string(),
1080 },
1081 namespace,
1082 );
1083 containers_client
1084 .delete(del_container_req)
1085 .await
1086 .map_err(|e| WfeError::StepExecution(format!("failed to delete container: {e}")))?;
1087
1088 Ok(())
1089 }
1090}
1091
1092#[cfg(test)]
1093mod tests {
1094 use super::*;
1095 use crate::config::{TlsConfig, VolumeMountConfig};
1096 use pretty_assertions::assert_eq;
1097
1098 fn minimal_config() -> ContainerdConfig {
1099 ContainerdConfig {
1100 image: "alpine:3.18".to_string(),
1101 command: None,
1102 run: Some("echo hello".to_string()),
1103 env: HashMap::new(),
1104 volumes: vec![],
1105 working_dir: None,
1106 user: "65534:65534".to_string(),
1107 network: "none".to_string(),
1108 memory: None,
1109 cpu: None,
1110 pull: "if-not-present".to_string(),
1111 containerd_addr: "/run/containerd/containerd.sock".to_string(),
1112 cli: "nerdctl".to_string(),
1113 tls: TlsConfig::default(),
1114 registry_auth: HashMap::new(),
1115 inputs: None,
1116 timeout_ms: None,
1117 }
1118 }
1119
1120 #[test]
1123 fn parse_outputs_empty() {
1124 let outputs = ContainerdStep::parse_outputs("");
1125 assert!(outputs.is_empty());
1126 }
1127
1128 #[test]
1129 fn parse_outputs_single() {
1130 let stdout = "some log line\n##wfe[output version=1.2.3]\nmore logs\n";
1131 let outputs = ContainerdStep::parse_outputs(stdout);
1132 assert_eq!(outputs.len(), 1);
1133 assert_eq!(outputs.get("version").unwrap(), "1.2.3");
1134 }
1135
1136 #[test]
1137 fn parse_outputs_multiple() {
1138 let stdout = "##wfe[output foo=bar]\n##wfe[output baz=qux]\n";
1139 let outputs = ContainerdStep::parse_outputs(stdout);
1140 assert_eq!(outputs.len(), 2);
1141 assert_eq!(outputs.get("foo").unwrap(), "bar");
1142 assert_eq!(outputs.get("baz").unwrap(), "qux");
1143 }
1144
1145 #[test]
1146 fn parse_outputs_mixed_with_regular_stdout() {
1147 let stdout = "Starting container...\n\
1148 Pulling image...\n\
1149 ##wfe[output digest=sha256:abc123]\n\
1150 Running tests...\n\
1151 ##wfe[output result=pass]\n\
1152 Done.\n";
1153 let outputs = ContainerdStep::parse_outputs(stdout);
1154 assert_eq!(outputs.len(), 2);
1155 assert_eq!(outputs.get("digest").unwrap(), "sha256:abc123");
1156 assert_eq!(outputs.get("result").unwrap(), "pass");
1157 }
1158
1159 #[test]
1160 fn parse_outputs_no_wfe_lines() {
1161 let stdout = "line 1\nline 2\nline 3\n";
1162 let outputs = ContainerdStep::parse_outputs(stdout);
1163 assert!(outputs.is_empty());
1164 }
1165
1166 #[test]
1167 fn parse_outputs_value_with_equals_sign() {
1168 let stdout = "##wfe[output url=https://example.com?a=1&b=2]\n";
1169 let outputs = ContainerdStep::parse_outputs(stdout);
1170 assert_eq!(outputs.len(), 1);
1171 assert_eq!(outputs.get("url").unwrap(), "https://example.com?a=1&b=2");
1172 }
1173
1174 #[test]
1175 fn parse_outputs_ignores_malformed_lines() {
1176 let stdout = "##wfe[output ]\n\
1177 ##wfe[output no_equals]\n\
1178 ##wfe[output valid=yes]\n\
1179 ##wfe[output_extra bad=val]\n";
1180 let outputs = ContainerdStep::parse_outputs(stdout);
1181 assert_eq!(outputs.len(), 1);
1182 assert_eq!(outputs.get("valid").unwrap(), "yes");
1183 }
1184
1185 #[test]
1186 fn parse_outputs_overwrites_duplicate_keys() {
1187 let stdout = "##wfe[output key=first]\n##wfe[output key=second]\n";
1188 let outputs = ContainerdStep::parse_outputs(stdout);
1189 assert_eq!(outputs.len(), 1);
1190 assert_eq!(outputs.get("key").unwrap(), "second");
1191 }
1192
1193 #[test]
1196 fn build_output_data_basic() {
1197 let parsed = HashMap::from([("result".to_string(), "success".to_string())]);
1198 let data = ContainerdStep::build_output_data("my_step", "hello world\n", "", 0, &parsed);
1199
1200 let obj = data.as_object().unwrap();
1201 assert_eq!(obj.get("result").unwrap(), "success");
1202 assert_eq!(obj.get("my_step.stdout").unwrap(), "hello world\n");
1203 assert_eq!(obj.get("my_step.stderr").unwrap(), "");
1204 assert_eq!(obj.get("my_step.exit_code").unwrap(), 0);
1205 }
1206
1207 #[test]
1208 fn build_output_data_no_parsed_outputs() {
1209 let data = ContainerdStep::build_output_data("step1", "out", "err", 1, &HashMap::new());
1210
1211 let obj = data.as_object().unwrap();
1212 assert_eq!(obj.len(), 3); assert_eq!(obj.get("step1.stdout").unwrap(), "out");
1214 assert_eq!(obj.get("step1.stderr").unwrap(), "err");
1215 assert_eq!(obj.get("step1.exit_code").unwrap(), 1);
1216 }
1217
1218 #[test]
1219 fn build_output_data_with_multiple_parsed_outputs() {
1220 let parsed = HashMap::from([
1221 ("a".to_string(), "1".to_string()),
1222 ("b".to_string(), "2".to_string()),
1223 ("c".to_string(), "3".to_string()),
1224 ]);
1225 let data = ContainerdStep::build_output_data("s", "", "", 0, &parsed);
1226
1227 let obj = data.as_object().unwrap();
1228 assert_eq!(obj.get("a").unwrap(), "1");
1229 assert_eq!(obj.get("b").unwrap(), "2");
1230 assert_eq!(obj.get("c").unwrap(), "3");
1231 assert_eq!(obj.len(), 6);
1233 }
1234
1235 #[test]
1236 fn build_output_data_negative_exit_code() {
1237 let data = ContainerdStep::build_output_data("s", "", "", -1, &HashMap::new());
1238 let obj = data.as_object().unwrap();
1239 assert_eq!(obj.get("s.exit_code").unwrap(), -1);
1240 }
1241
1242 #[test]
1245 fn parse_user_spec_normal() {
1246 assert_eq!(parse_user_spec("1000:1000"), (1000, 1000));
1247 }
1248
1249 #[test]
1250 fn parse_user_spec_root() {
1251 assert_eq!(parse_user_spec("0:0"), (0, 0));
1252 }
1253
1254 #[test]
1255 fn parse_user_spec_default() {
1256 assert_eq!(parse_user_spec("65534:65534"), (65534, 65534));
1257 }
1258
1259 #[test]
1260 fn parse_user_spec_invalid_falls_back() {
1261 assert_eq!(parse_user_spec("abc"), (65534, 65534));
1262 }
1263
1264 #[test]
1267 fn build_oci_spec_minimal() {
1268 let step = ContainerdStep::new(minimal_config());
1269 let env = HashMap::new();
1270 let spec = step.build_oci_spec(&env);
1271
1272 assert_eq!(
1273 spec.type_url,
1274 "types.containerd.io/opencontainers/runtime-spec/1/Spec"
1275 );
1276 assert!(!spec.value.is_empty());
1277
1278 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1280 assert_eq!(parsed["ociVersion"], "1.0.2");
1281 assert_eq!(parsed["process"]["args"][0], "/bin/sh");
1282 assert_eq!(parsed["process"]["args"][1], "-c");
1283 assert_eq!(parsed["process"]["args"][2], "echo hello");
1284 assert_eq!(parsed["process"]["user"]["uid"], 65534);
1285 assert_eq!(parsed["process"]["user"]["gid"], 65534);
1286 assert_eq!(parsed["process"]["cwd"], "/");
1287 }
1288
1289 #[test]
1290 fn build_oci_spec_with_command() {
1291 let mut config = minimal_config();
1292 config.run = None;
1293 config.command = Some(vec![
1294 "echo".to_string(),
1295 "hello".to_string(),
1296 "world".to_string(),
1297 ]);
1298 let step = ContainerdStep::new(config);
1299 let spec = step.build_oci_spec(&HashMap::new());
1300
1301 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1302 assert_eq!(parsed["process"]["args"][0], "echo");
1303 assert_eq!(parsed["process"]["args"][1], "hello");
1304 assert_eq!(parsed["process"]["args"][2], "world");
1305 }
1306
1307 #[test]
1308 fn build_oci_spec_with_env() {
1309 let step = ContainerdStep::new(minimal_config());
1310 let env = HashMap::from([
1311 ("FOO".to_string(), "bar".to_string()),
1312 ("BAZ".to_string(), "qux".to_string()),
1313 ]);
1314 let spec = step.build_oci_spec(&env);
1315
1316 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1317 let env_arr: Vec<String> = parsed["process"]["env"]
1318 .as_array()
1319 .unwrap()
1320 .iter()
1321 .map(|v| v.as_str().unwrap().to_string())
1322 .collect();
1323
1324 assert!(env_arr.contains(&"FOO=bar".to_string()));
1325 assert!(env_arr.contains(&"BAZ=qux".to_string()));
1326 }
1327
1328 #[test]
1329 fn build_oci_spec_with_working_dir() {
1330 let mut config = minimal_config();
1331 config.working_dir = Some("/app".to_string());
1332 let step = ContainerdStep::new(config);
1333 let spec = step.build_oci_spec(&HashMap::new());
1334
1335 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1336 assert_eq!(parsed["process"]["cwd"], "/app");
1337 }
1338
1339 #[test]
1340 fn build_oci_spec_with_user() {
1341 let mut config = minimal_config();
1342 config.user = "1000:2000".to_string();
1343 let step = ContainerdStep::new(config);
1344 let spec = step.build_oci_spec(&HashMap::new());
1345
1346 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1347 assert_eq!(parsed["process"]["user"]["uid"], 1000);
1348 assert_eq!(parsed["process"]["user"]["gid"], 2000);
1349 }
1350
1351 #[test]
1352 fn build_oci_spec_with_volumes() {
1353 let mut config = minimal_config();
1354 config.volumes = vec![
1355 VolumeMountConfig {
1356 source: "/host/data".to_string(),
1357 target: "/container/data".to_string(),
1358 readonly: false,
1359 },
1360 VolumeMountConfig {
1361 source: "/host/config".to_string(),
1362 target: "/etc/config".to_string(),
1363 readonly: true,
1364 },
1365 ];
1366 let step = ContainerdStep::new(config);
1367 let spec = step.build_oci_spec(&HashMap::new());
1368
1369 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1370 let mounts = parsed["mounts"].as_array().unwrap();
1371 assert_eq!(mounts.len(), 5);
1373
1374 let bind_mounts: Vec<&serde_json::Value> =
1375 mounts.iter().filter(|m| m["type"] == "bind").collect();
1376 assert_eq!(bind_mounts.len(), 2);
1377
1378 let ro_mount = bind_mounts
1379 .iter()
1380 .find(|m| m["destination"] == "/etc/config")
1381 .unwrap();
1382 let opts: Vec<String> = ro_mount["options"]
1383 .as_array()
1384 .unwrap()
1385 .iter()
1386 .map(|v| v.as_str().unwrap().to_string())
1387 .collect();
1388 assert!(opts.contains(&"ro".to_string()));
1389 }
1390
1391 #[test]
1392 fn build_oci_spec_no_command_no_run() {
1393 let mut config = minimal_config();
1394 config.run = None;
1395 config.command = None;
1396 let step = ContainerdStep::new(config);
1397 let spec = step.build_oci_spec(&HashMap::new());
1398
1399 let parsed: serde_json::Value = serde_json::from_slice(&spec.value).unwrap();
1400 assert!(parsed["process"]["args"].as_array().unwrap().is_empty());
1401 }
1402
1403 #[tokio::test]
1406 async fn connect_to_missing_unix_socket_returns_error() {
1407 let err = ContainerdStep::connect("/tmp/nonexistent-wfe-containerd-test.sock")
1408 .await
1409 .unwrap_err();
1410 let msg = format!("{err}");
1411 assert!(
1412 msg.contains("socket not found"),
1413 "expected 'socket not found' error, got: {msg}"
1414 );
1415 }
1416
1417 #[tokio::test]
1418 async fn connect_to_missing_unix_socket_with_scheme_returns_error() {
1419 let err = ContainerdStep::connect("unix:///tmp/nonexistent-wfe-containerd-test.sock")
1420 .await
1421 .unwrap_err();
1422 let msg = format!("{err}");
1423 assert!(
1424 msg.contains("socket not found"),
1425 "expected 'socket not found' error, got: {msg}"
1426 );
1427 }
1428
1429 #[tokio::test]
1430 async fn connect_to_invalid_tcp_returns_error() {
1431 let err = ContainerdStep::connect("tcp://127.0.0.1:1")
1432 .await
1433 .unwrap_err();
1434 let msg = format!("{err}");
1435 assert!(
1436 msg.contains("failed to connect"),
1437 "expected connection error, got: {msg}"
1438 );
1439 }
1440
1441 #[test]
1444 fn new_creates_step_with_config() {
1445 let config = minimal_config();
1446 let step = ContainerdStep::new(config);
1447 assert_eq!(step.config.image, "alpine:3.18");
1448 assert_eq!(
1449 step.config.containerd_addr,
1450 "/run/containerd/containerd.sock"
1451 );
1452 }
1453}
1454
1455#[cfg(test)]
1457mod e2e_tests {
1458 use super::*;
1459
1460 fn containerd_addr() -> Option<String> {
1462 let addr = std::env::var("WFE_CONTAINERD_ADDR").unwrap_or_else(|_| {
1463 format!(
1464 "unix://{}/.lima/wfe-test/sock/containerd.sock",
1465 std::env::var("HOME").unwrap_or_else(|_| "/root".to_string())
1466 )
1467 });
1468
1469 let socket_path = addr.strip_prefix("unix://").unwrap_or(addr.as_str());
1470
1471 if Path::new(socket_path).exists() {
1472 Some(addr)
1473 } else {
1474 None
1475 }
1476 }
1477
1478 #[tokio::test]
1479 async fn e2e_version_check() {
1480 let Some(addr) = containerd_addr() else {
1481 eprintln!("SKIP: containerd socket not available");
1482 return;
1483 };
1484
1485 let channel = ContainerdStep::connect(&addr).await.unwrap();
1486 let mut client = VersionClient::new(channel);
1487
1488 let req = ContainerdStep::with_namespace((), DEFAULT_NAMESPACE);
1489 let resp = client.version(req).await.unwrap();
1490 let version = resp.into_inner();
1491
1492 assert!(!version.version.is_empty(), "version should not be empty");
1493 assert!(!version.revision.is_empty(), "revision should not be empty");
1494 eprintln!(
1495 "containerd version={} revision={}",
1496 version.version, version.revision
1497 );
1498 }
1499}