use std::collections::HashMap;
use std::sync::Mutex;
use async_trait::async_trait;
use wfe_core::models::service::{ServiceDefinition, ServiceEndpoint};
use wfe_core::traits::ServiceProvider;
/// Service provider that runs workflow services as containers through containerd.
///
/// The provider remembers which container ids it started for each workflow so
/// that they can be cleaned up again when the workflow is torn down.
#[derive(Debug)]
pub struct ContainerdServiceProvider {
    /// Address of the containerd endpoint; handed unchanged to the container
    /// step helpers whenever a service is started or cleaned up.
    containerd_addr: String,
    /// Container ids started by this provider, keyed by workflow id.
    running: Mutex<HashMap<String, Vec<String>>>,
}
impl ContainerdServiceProvider {
pub fn new(containerd_addr: impl Into<String>) -> Self {
Self {
containerd_addr: containerd_addr.into(),
running: Mutex::new(HashMap::new()),
}
}
pub fn containerd_addr(&self) -> &str {
&self.containerd_addr
}
}
#[async_trait]
impl ServiceProvider for ContainerdServiceProvider {
    /// Reports whether this provider can provision the given services.
    ///
    /// Always returns `true`; the service definitions are not inspected.
    fn can_provision(&self, _services: &[ServiceDefinition]) -> bool {
        true
    }

    /// Starts one container per entry in `services` and returns their endpoints.
    ///
    /// Each container id has the form `wfe-svc-{service name}-{workflow_id}`.
    /// The ids of all started containers are recorded under `workflow_id` so
    /// that `teardown` can clean them up later. Every returned endpoint uses
    /// host `127.0.0.1` together with the ports declared on the service.
    ///
    /// # Errors
    ///
    /// Returns the error of the first `run_service` call that fails. Containers
    /// that were already started by this call are cleaned up (best effort)
    /// before the error is returned, so a failed call leaves none of its
    /// successfully started containers behind.
    async fn provision(
        &self,
        workflow_id: &str,
        services: &[ServiceDefinition],
    ) -> wfe_core::Result<Vec<ServiceEndpoint>> {
        let mut endpoints = Vec::with_capacity(services.len());
        let mut container_ids: Vec<String> = Vec::with_capacity(services.len());
        for svc in services {
            let container_id = format!("wfe-svc-{}-{}", svc.name, workflow_id);
            let started = crate::step::ContainerdStep::run_service(
                &self.containerd_addr,
                &container_id,
                &svc.image,
                &svc.env,
            )
            .await;
            if started.is_err() {
                // The ids collected so far are not yet in `running`, so
                // `teardown` could never find them: clean them up here.
                // The failing container itself is deliberately left alone: an
                // id collision with a container owned by an earlier call must
                // not delete that container.
                for earlier in &container_ids {
                    crate::step::ContainerdStep::cleanup_service(&self.containerd_addr, earlier)
                        .await
                        .ok();
                }
            }
            started?;
            container_ids.push(container_id);
            endpoints.push(ServiceEndpoint {
                name: svc.name.clone(),
                host: "127.0.0.1".into(),
                ports: svc.ports.clone(),
            });
        }
        // Extend rather than overwrite, so ids tracked by an earlier call for
        // the same workflow stay reachable by `teardown`. A poisoned lock is
        // recovered because the map holds only plain strings.
        self.running
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .entry(workflow_id.to_owned())
            .or_default()
            .extend(container_ids);
        Ok(endpoints)
    }

    /// Cleans up every container recorded for `workflow_id`.
    ///
    /// The workflow's entry is removed from the tracking map first; an unknown
    /// workflow id is a no-op. Cleanup is best effort: errors from individual
    /// `cleanup_service` calls are ignored so the remaining containers are
    /// still attempted, and the method always returns `Ok(())`.
    async fn teardown(&self, workflow_id: &str) -> wfe_core::Result<()> {
        // The guard is a temporary of this statement, so the lock is released
        // before any `.await` below. Recovering from poisoning keeps teardown
        // from panicking and skipping cleanup.
        let ids = self
            .running
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .remove(workflow_id)
            .unwrap_or_default();
        for container_id in ids {
            crate::step::ContainerdStep::cleanup_service(&self.containerd_addr, &container_id)
                .await
                .ok();
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use wfe_core::models::service::ServicePort;

    /// Socket path used by tests that only need a constructed provider.
    const SOCKET: &str = "/run/containerd/containerd.sock";

    /// Builds the single postgres service definition used as a fixture.
    fn postgres_service() -> ServiceDefinition {
        ServiceDefinition {
            name: "postgres".into(),
            image: "postgres:15".into(),
            ports: vec![ServicePort::tcp(5432)],
            env: Default::default(),
            readiness: None,
            command: vec![],
            args: vec![],
            memory: None,
            cpu: None,
        }
    }

    /// A non-empty service list is accepted.
    #[test]
    fn can_provision_always_true() {
        let services = vec![postgres_service()];
        let provider = ContainerdServiceProvider::new(SOCKET);
        assert!(provider.can_provision(&services));
    }

    /// An empty service list is accepted as well.
    #[test]
    fn can_provision_empty_services() {
        assert!(ContainerdServiceProvider::new(SOCKET).can_provision(&[]));
    }

    /// A freshly constructed provider tracks no containers.
    #[test]
    fn running_map_starts_empty() {
        let provider = ContainerdServiceProvider::new(SOCKET);
        let tracked = provider.running.lock().unwrap();
        assert!(tracked.is_empty());
    }

    /// The accessor returns the address given to the constructor.
    #[test]
    fn containerd_addr_accessor() {
        let addr = "http://127.0.0.1:2500";
        assert_eq!(ContainerdServiceProvider::new(addr).containerd_addr(), addr);
    }
}