wfe_containerd/
service_provider.rs1use std::collections::HashMap;
2use std::sync::Mutex;
3
4use async_trait::async_trait;
5use wfe_core::models::service::{ServiceDefinition, ServiceEndpoint};
6use wfe_core::traits::ServiceProvider;
7
8pub struct ContainerdServiceProvider {
14 containerd_addr: String,
15 running: Mutex<HashMap<String, Vec<String>>>,
17}
18
19impl ContainerdServiceProvider {
20 pub fn new(containerd_addr: impl Into<String>) -> Self {
21 Self {
22 containerd_addr: containerd_addr.into(),
23 running: Mutex::new(HashMap::new()),
24 }
25 }
26
27 pub fn containerd_addr(&self) -> &str {
29 &self.containerd_addr
30 }
31}
32
33#[async_trait]
34impl ServiceProvider for ContainerdServiceProvider {
35 fn can_provision(&self, _services: &[ServiceDefinition]) -> bool {
36 true }
38
39 async fn provision(
40 &self,
41 workflow_id: &str,
42 services: &[ServiceDefinition],
43 ) -> wfe_core::Result<Vec<ServiceEndpoint>> {
44 let mut endpoints = Vec::new();
45 let mut container_ids = Vec::new();
46
47 for svc in services {
48 let container_id = format!("wfe-svc-{}-{}", svc.name, workflow_id);
49
50 crate::step::ContainerdStep::run_service(
54 &self.containerd_addr,
55 &container_id,
56 &svc.image,
57 &svc.env,
58 )
59 .await?;
60
61 container_ids.push(container_id);
62
63 endpoints.push(ServiceEndpoint {
64 name: svc.name.clone(),
65 host: "127.0.0.1".into(),
66 ports: svc.ports.clone(),
67 });
68 }
69
70 self.running
71 .lock()
72 .unwrap()
73 .insert(workflow_id.into(), container_ids);
74
75 Ok(endpoints)
76 }
77
78 async fn teardown(&self, workflow_id: &str) -> wfe_core::Result<()> {
79 let ids = self
80 .running
81 .lock()
82 .unwrap()
83 .remove(workflow_id)
84 .unwrap_or_default();
85
86 for container_id in ids {
87 crate::step::ContainerdStep::cleanup_service(&self.containerd_addr, &container_id)
88 .await
89 .ok();
90 }
91
92 Ok(())
93 }
94}
95
96#[cfg(test)]
97mod tests {
98 use super::*;
99 use wfe_core::models::service::ServicePort;
100
101 #[test]
102 fn can_provision_always_true() {
103 let provider = ContainerdServiceProvider::new("/run/containerd/containerd.sock");
104 let services = vec![ServiceDefinition {
105 name: "postgres".into(),
106 image: "postgres:15".into(),
107 ports: vec![ServicePort::tcp(5432)],
108 env: Default::default(),
109 readiness: None,
110 command: vec![],
111 args: vec![],
112 memory: None,
113 cpu: None,
114 }];
115 assert!(provider.can_provision(&services));
116 }
117
118 #[test]
119 fn can_provision_empty_services() {
120 let provider = ContainerdServiceProvider::new("/run/containerd/containerd.sock");
121 assert!(provider.can_provision(&[]));
122 }
123
124 #[test]
125 fn running_map_starts_empty() {
126 let provider = ContainerdServiceProvider::new("/run/containerd/containerd.sock");
127 assert!(provider.running.lock().unwrap().is_empty());
128 }
129
130 #[test]
131 fn containerd_addr_accessor() {
132 let provider = ContainerdServiceProvider::new("http://127.0.0.1:2500");
133 assert_eq!(provider.containerd_addr(), "http://127.0.0.1:2500");
134 }
135}