workflow_graph_queue/memory/
workers.rs1use std::collections::HashMap;
2
3use tokio::sync::Mutex;
4
5use crate::error::RegistryError;
6use crate::traits::{WorkerInfo, WorkerRegistry, WorkerStatus};
7
8pub struct InMemoryWorkerRegistry {
9 workers: Mutex<HashMap<String, WorkerInfo>>,
10}
11
12impl InMemoryWorkerRegistry {
13 pub fn new() -> Self {
14 Self {
15 workers: Mutex::new(HashMap::new()),
16 }
17 }
18
19 fn now_ms() -> u64 {
20 std::time::SystemTime::now()
21 .duration_since(std::time::UNIX_EPOCH)
22 .unwrap_or_default()
23 .as_millis() as u64
24 }
25}
26
27impl Default for InMemoryWorkerRegistry {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl WorkerRegistry for InMemoryWorkerRegistry {
34 async fn register(&self, worker_id: &str, labels: &[String]) -> Result<(), RegistryError> {
35 let now = Self::now_ms();
36 self.workers.lock().await.insert(
37 worker_id.to_string(),
38 WorkerInfo {
39 worker_id: worker_id.to_string(),
40 labels: labels.to_vec(),
41 registered_at_ms: now,
42 last_heartbeat_ms: now,
43 current_job: None,
44 status: WorkerStatus::Idle,
45 },
46 );
47 Ok(())
48 }
49
50 async fn heartbeat(&self, worker_id: &str) -> Result<(), RegistryError> {
51 let mut workers = self.workers.lock().await;
52 let worker = workers
53 .get_mut(worker_id)
54 .ok_or_else(|| RegistryError::WorkerNotFound(worker_id.to_string()))?;
55 worker.last_heartbeat_ms = Self::now_ms();
56 Ok(())
57 }
58
59 async fn deregister(&self, worker_id: &str) -> Result<(), RegistryError> {
60 self.workers.lock().await.remove(worker_id);
61 Ok(())
62 }
63
64 async fn list_workers(&self) -> Result<Vec<WorkerInfo>, RegistryError> {
65 Ok(self.workers.lock().await.values().cloned().collect())
66 }
67
68 async fn mark_busy(&self, worker_id: &str, job_id: &str) -> Result<(), RegistryError> {
69 let mut workers = self.workers.lock().await;
70 let worker = workers
71 .get_mut(worker_id)
72 .ok_or_else(|| RegistryError::WorkerNotFound(worker_id.to_string()))?;
73 worker.status = WorkerStatus::Busy;
74 worker.current_job = Some(job_id.to_string());
75 worker.last_heartbeat_ms = Self::now_ms();
76 Ok(())
77 }
78
79 async fn mark_idle(&self, worker_id: &str) -> Result<(), RegistryError> {
80 let mut workers = self.workers.lock().await;
81 let worker = workers
82 .get_mut(worker_id)
83 .ok_or_else(|| RegistryError::WorkerNotFound(worker_id.to_string()))?;
84 worker.status = WorkerStatus::Idle;
85 worker.current_job = None;
86 worker.last_heartbeat_ms = Self::now_ms();
87 Ok(())
88 }
89}
90
91#[cfg(test)]
92mod tests {
93 use super::*;
94
95 #[tokio::test]
96 async fn test_register_and_list() {
97 let registry = InMemoryWorkerRegistry::new();
98 registry
99 .register("w1", &["docker".into(), "linux".into()])
100 .await
101 .unwrap();
102
103 let workers = registry.list_workers().await.unwrap();
104 assert_eq!(workers.len(), 1);
105 assert_eq!(workers[0].worker_id, "w1");
106 assert_eq!(workers[0].labels, vec!["docker", "linux"]);
107 assert_eq!(workers[0].status, WorkerStatus::Idle);
108 }
109
110 #[tokio::test]
111 async fn test_mark_busy_and_idle() {
112 let registry = InMemoryWorkerRegistry::new();
113 registry.register("w1", &[]).await.unwrap();
114
115 registry.mark_busy("w1", "j1").await.unwrap();
116 let workers = registry.list_workers().await.unwrap();
117 assert_eq!(workers[0].status, WorkerStatus::Busy);
118 assert_eq!(workers[0].current_job.as_deref(), Some("j1"));
119
120 registry.mark_idle("w1").await.unwrap();
121 let workers = registry.list_workers().await.unwrap();
122 assert_eq!(workers[0].status, WorkerStatus::Idle);
123 assert!(workers[0].current_job.is_none());
124 }
125
126 #[tokio::test]
127 async fn test_deregister() {
128 let registry = InMemoryWorkerRegistry::new();
129 registry.register("w1", &[]).await.unwrap();
130 registry.deregister("w1").await.unwrap();
131
132 let workers = registry.list_workers().await.unwrap();
133 assert!(workers.is_empty());
134 }
135}