Skip to main content

wfe_containerd/
service_provider.rs

1use std::collections::HashMap;
2use std::sync::Mutex;
3
4use async_trait::async_trait;
5use wfe_core::models::service::{ServiceDefinition, ServiceEndpoint};
6use wfe_core::traits::ServiceProvider;
7
/// Provisions infrastructure services as containerd containers on the host network.
///
/// Services are accessible via `127.0.0.1` on their declared ports.
/// Connection info is injected as `SVC_{NAME}_HOST` / `SVC_{NAME}_PORT` env vars
/// into workflow data.
#[derive(Debug)]
pub struct ContainerdServiceProvider {
    /// Address of the containerd endpoint, stored exactly as given to the constructor.
    containerd_addr: String,
    /// Service container ids started per workflow, keyed by workflow id, so they can be
    /// removed again at teardown. The mutex gives interior mutability behind `&self`.
    running: Mutex<HashMap<String, Vec<String>>>,
}
18
19impl ContainerdServiceProvider {
20    pub fn new(containerd_addr: impl Into<String>) -> Self {
21        Self {
22            containerd_addr: containerd_addr.into(),
23            running: Mutex::new(HashMap::new()),
24        }
25    }
26
27    /// Get the containerd address this provider connects to.
28    pub fn containerd_addr(&self) -> &str {
29        &self.containerd_addr
30    }
31}
32
33#[async_trait]
34impl ServiceProvider for ContainerdServiceProvider {
35    fn can_provision(&self, _services: &[ServiceDefinition]) -> bool {
36        true // containerd can run any OCI image
37    }
38
39    async fn provision(
40        &self,
41        workflow_id: &str,
42        services: &[ServiceDefinition],
43    ) -> wfe_core::Result<Vec<ServiceEndpoint>> {
44        let mut endpoints = Vec::new();
45        let mut container_ids = Vec::new();
46
47        for svc in services {
48            let container_id = format!("wfe-svc-{}-{}", svc.name, workflow_id);
49
50            // Create and start the service container via containerd gRPC.
51            // This reuses the same connection and container lifecycle as ContainerdStep
52            // but starts the container without waiting for it to exit.
53            crate::step::ContainerdStep::run_service(
54                &self.containerd_addr,
55                &container_id,
56                &svc.image,
57                &svc.env,
58            )
59            .await?;
60
61            container_ids.push(container_id);
62
63            endpoints.push(ServiceEndpoint {
64                name: svc.name.clone(),
65                host: "127.0.0.1".into(),
66                ports: svc.ports.clone(),
67            });
68        }
69
70        self.running
71            .lock()
72            .unwrap()
73            .insert(workflow_id.into(), container_ids);
74
75        Ok(endpoints)
76    }
77
78    async fn teardown(&self, workflow_id: &str) -> wfe_core::Result<()> {
79        let ids = self
80            .running
81            .lock()
82            .unwrap()
83            .remove(workflow_id)
84            .unwrap_or_default();
85
86        for container_id in ids {
87            crate::step::ContainerdStep::cleanup_service(&self.containerd_addr, &container_id)
88                .await
89                .ok();
90        }
91
92        Ok(())
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;
    use wfe_core::models::service::ServicePort;

    /// Socket path handed to providers that are only constructed, never connected.
    const SOCKET: &str = "/run/containerd/containerd.sock";

    /// Builds a provider configured with [`SOCKET`].
    fn socket_provider() -> ContainerdServiceProvider {
        ContainerdServiceProvider::new(SOCKET)
    }

    /// A fully populated service definition is accepted.
    #[test]
    fn can_provision_always_true() {
        let postgres = ServiceDefinition {
            name: "postgres".into(),
            image: "postgres:15".into(),
            ports: vec![ServicePort::tcp(5432)],
            env: Default::default(),
            readiness: None,
            command: vec![],
            args: vec![],
            memory: None,
            cpu: None,
        };
        let services = [postgres];

        let accepted = socket_provider().can_provision(&services);

        assert!(accepted);
    }

    /// An empty service list is accepted as well.
    #[test]
    fn can_provision_empty_services() {
        let accepted = socket_provider().can_provision(&[]);

        assert!(accepted);
    }

    /// A freshly constructed provider tracks no containers.
    #[test]
    fn running_map_starts_empty() {
        let provider = socket_provider();
        let tracked = provider.running.lock().unwrap();

        assert!(tracked.is_empty());
    }

    /// The accessor returns the address exactly as it was passed in.
    #[test]
    fn containerd_addr_accessor() {
        let provider = ContainerdServiceProvider::new("http://127.0.0.1:2500");

        assert_eq!(provider.containerd_addr(), "http://127.0.0.1:2500");
    }
}