floxide_core/distributed/
liveness_store.rs1use crate::distributed::LivenessStoreError;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13
14#[derive(Debug, Clone, Default, Serialize, Deserialize)]
15pub enum WorkerStatus {
16 #[default]
17 Idle,
18 InProgress,
19 Retrying(usize, usize), }
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct WorkerHealth {
25 pub worker_id: usize,
27 pub last_heartbeat: DateTime<Utc>,
29 pub error_count: usize,
31 pub status: WorkerStatus,
33 pub current_work_item: Option<String>,
35 pub current_work_item_run_id: Option<String>,
37}
38
39impl Default for WorkerHealth {
40 fn default() -> Self {
41 Self {
42 worker_id: 0,
43 last_heartbeat: chrono::Utc::now(),
44 error_count: 0,
45 status: WorkerStatus::Idle,
46 current_work_item: None,
47 current_work_item_run_id: None,
48 }
49 }
50}
51
52#[async_trait]
56pub trait LivenessStore {
57 async fn update_heartbeat(
59 &self,
60 worker_id: usize,
61 timestamp: DateTime<Utc>,
62 ) -> Result<(), LivenessStoreError>;
63 async fn get_heartbeat(
65 &self,
66 worker_id: usize,
67 ) -> Result<Option<DateTime<Utc>>, LivenessStoreError>;
68 async fn list_workers(&self) -> Result<Vec<usize>, LivenessStoreError>;
70 async fn update_health(
72 &self,
73 worker_id: usize,
74 health: WorkerHealth,
75 ) -> Result<(), LivenessStoreError>;
76 async fn get_health(
78 &self,
79 worker_id: usize,
80 ) -> Result<Option<WorkerHealth>, LivenessStoreError>;
81 async fn list_health(&self) -> Result<Vec<WorkerHealth>, LivenessStoreError>;
83}
84
85#[derive(Clone, Default)]
87pub struct InMemoryLivenessStore {
88 inner: Arc<Mutex<HashMap<usize, DateTime<Utc>>>>,
89 health: Arc<Mutex<HashMap<usize, WorkerHealth>>>,
90}
91
92#[async_trait]
93impl LivenessStore for InMemoryLivenessStore {
94 async fn update_heartbeat(
95 &self,
96 worker_id: usize,
97 timestamp: DateTime<Utc>,
98 ) -> Result<(), LivenessStoreError> {
99 let mut map = self.inner.lock().await;
100 map.insert(worker_id, timestamp);
101 Ok(())
102 }
103 async fn get_heartbeat(
104 &self,
105 worker_id: usize,
106 ) -> Result<Option<DateTime<Utc>>, LivenessStoreError> {
107 let map = self.inner.lock().await;
108 Ok(map.get(&worker_id).cloned())
109 }
110 async fn list_workers(&self) -> Result<Vec<usize>, LivenessStoreError> {
111 let map = self.inner.lock().await;
112 Ok(map.keys().cloned().collect())
113 }
114 async fn update_health(
115 &self,
116 worker_id: usize,
117 health: WorkerHealth,
118 ) -> Result<(), LivenessStoreError> {
119 let mut map = self.health.lock().await;
120 map.insert(worker_id, health);
121 Ok(())
122 }
123 async fn get_health(
124 &self,
125 worker_id: usize,
126 ) -> Result<Option<WorkerHealth>, LivenessStoreError> {
127 let map = self.health.lock().await;
128 Ok(map.get(&worker_id).cloned())
129 }
130 async fn list_health(&self) -> Result<Vec<WorkerHealth>, LivenessStoreError> {
131 let map = self.health.lock().await;
132 Ok(map.values().cloned().collect())
133 }
134}