Skip to main content

workflow_graph_queue/memory/
workers.rs

1use std::collections::HashMap;
2
3use tokio::sync::Mutex;
4
5use crate::error::RegistryError;
6use crate::traits::{WorkerInfo, WorkerRegistry, WorkerStatus};
7
8pub struct InMemoryWorkerRegistry {
9    workers: Mutex<HashMap<String, WorkerInfo>>,
10}
11
12impl InMemoryWorkerRegistry {
13    pub fn new() -> Self {
14        Self {
15            workers: Mutex::new(HashMap::new()),
16        }
17    }
18
19    fn now_ms() -> u64 {
20        std::time::SystemTime::now()
21            .duration_since(std::time::UNIX_EPOCH)
22            .unwrap_or_default()
23            .as_millis() as u64
24    }
25}
26
27impl Default for InMemoryWorkerRegistry {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl WorkerRegistry for InMemoryWorkerRegistry {
34    async fn register(&self, worker_id: &str, labels: &[String]) -> Result<(), RegistryError> {
35        let now = Self::now_ms();
36        self.workers.lock().await.insert(
37            worker_id.to_string(),
38            WorkerInfo {
39                worker_id: worker_id.to_string(),
40                labels: labels.to_vec(),
41                registered_at_ms: now,
42                last_heartbeat_ms: now,
43                current_job: None,
44                status: WorkerStatus::Idle,
45            },
46        );
47        Ok(())
48    }
49
50    async fn heartbeat(&self, worker_id: &str) -> Result<(), RegistryError> {
51        let mut workers = self.workers.lock().await;
52        let worker = workers
53            .get_mut(worker_id)
54            .ok_or_else(|| RegistryError::WorkerNotFound(worker_id.to_string()))?;
55        worker.last_heartbeat_ms = Self::now_ms();
56        Ok(())
57    }
58
59    async fn deregister(&self, worker_id: &str) -> Result<(), RegistryError> {
60        self.workers.lock().await.remove(worker_id);
61        Ok(())
62    }
63
64    async fn list_workers(&self) -> Result<Vec<WorkerInfo>, RegistryError> {
65        Ok(self.workers.lock().await.values().cloned().collect())
66    }
67
68    async fn mark_busy(&self, worker_id: &str, job_id: &str) -> Result<(), RegistryError> {
69        let mut workers = self.workers.lock().await;
70        let worker = workers
71            .get_mut(worker_id)
72            .ok_or_else(|| RegistryError::WorkerNotFound(worker_id.to_string()))?;
73        worker.status = WorkerStatus::Busy;
74        worker.current_job = Some(job_id.to_string());
75        worker.last_heartbeat_ms = Self::now_ms();
76        Ok(())
77    }
78
79    async fn mark_idle(&self, worker_id: &str) -> Result<(), RegistryError> {
80        let mut workers = self.workers.lock().await;
81        let worker = workers
82            .get_mut(worker_id)
83            .ok_or_else(|| RegistryError::WorkerNotFound(worker_id.to_string()))?;
84        worker.status = WorkerStatus::Idle;
85        worker.current_job = None;
86        worker.last_heartbeat_ms = Self::now_ms();
87        Ok(())
88    }
89}
90
#[cfg(test)]
mod tests {
    use super::*;

    /// A registered worker shows up in the listing with its id, labels, and
    /// an initial `Idle` status.
    #[tokio::test]
    async fn test_register_and_list() {
        let registry = InMemoryWorkerRegistry::new();
        let labels: [String; 2] = ["docker".into(), "linux".into()];
        registry.register("w1", &labels).await.unwrap();

        let listed = registry.list_workers().await.unwrap();
        assert_eq!(listed.len(), 1);

        let worker = &listed[0];
        assert_eq!(worker.worker_id, "w1");
        assert_eq!(worker.labels, vec!["docker", "linux"]);
        assert_eq!(worker.status, WorkerStatus::Idle);
    }

    /// `mark_busy` records the job and status; `mark_idle` clears them again.
    #[tokio::test]
    async fn test_mark_busy_and_idle() {
        let registry = InMemoryWorkerRegistry::new();
        registry.register("w1", &[]).await.unwrap();

        registry.mark_busy("w1", "j1").await.unwrap();
        let while_busy = registry.list_workers().await.unwrap();
        assert_eq!(while_busy[0].status, WorkerStatus::Busy);
        assert_eq!(while_busy[0].current_job.as_deref(), Some("j1"));

        registry.mark_idle("w1").await.unwrap();
        let after_idle = registry.list_workers().await.unwrap();
        assert_eq!(after_idle[0].status, WorkerStatus::Idle);
        assert!(after_idle[0].current_job.is_none());
    }

    /// Deregistering the only worker leaves the listing empty.
    #[tokio::test]
    async fn test_deregister() {
        let registry = InMemoryWorkerRegistry::new();
        registry.register("w1", &[]).await.unwrap();
        registry.deregister("w1").await.unwrap();

        let remaining = registry.list_workers().await.unwrap();
        assert!(remaining.is_empty());
    }
}
135}