floxide_core/distributed/
liveness_store.rs

1//! Liveness and health tracking for distributed workflow workers.
2//!
3//! This module defines the LivenessStore trait for tracking worker heartbeats and health status,
4//! and provides an in-memory implementation for testing and local development.
5
6use crate::distributed::LivenessStoreError;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::Mutex;
13
14#[derive(Debug, Clone, Default, Serialize, Deserialize)]
15pub enum WorkerStatus {
16    #[default]
17    Idle,
18    InProgress,
19    Retrying(usize, usize), // (attempt, max_attempts)
20}
21
/// Health and status information for a workflow worker.
///
/// One record describes a single worker: when it last sent a heartbeat, how many errors it
/// has counted, its current [`WorkerStatus`], and which work item (if any) it reports.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerHealth {
    /// Unique worker ID.
    pub worker_id: usize,
    /// Timestamp of the last heartbeat (UTC).
    pub last_heartbeat: DateTime<Utc>,
    /// Number of errors encountered by this worker.
    pub error_count: usize,
    /// Current status of the worker, as a [`WorkerStatus`] value (not a free-form string).
    pub status: WorkerStatus,
    /// Worker's current work item; `None` when no item is recorded.
    // NOTE(review): the format of this string is not visible here — confirm with the producer.
    pub current_work_item: Option<String>,
    /// Run ID of the worker's current work item; `None` when no run ID is recorded.
    pub current_work_item_run_id: Option<String>,
}
38
39impl Default for WorkerHealth {
40    fn default() -> Self {
41        Self {
42            worker_id: 0,
43            last_heartbeat: chrono::Utc::now(),
44            error_count: 0,
45            status: WorkerStatus::Idle,
46            current_work_item: None,
47            current_work_item_run_id: None,
48        }
49    }
50}
51
/// Trait for a distributed workflow liveness/health store.
///
/// Implementations track worker heartbeats and health for monitoring and fault detection.
/// Heartbeats and health records are addressed by the numeric worker ID; every method is
/// async and reports failure as a [`LivenessStoreError`].
#[async_trait]
pub trait LivenessStore {
    /// Update the heartbeat timestamp for a worker.
    ///
    /// `timestamp` is the caller-supplied UTC time to record for `worker_id`.
    async fn update_heartbeat(
        &self,
        worker_id: usize,
        timestamp: DateTime<Utc>,
    ) -> Result<(), LivenessStoreError>;
    /// Get the last heartbeat timestamp for a worker.
    ///
    /// The `Option` allows for a worker with no recorded heartbeat; presumably that case is
    /// reported as `Ok(None)` rather than as an error — confirm per backend.
    async fn get_heartbeat(
        &self,
        worker_id: usize,
    ) -> Result<Option<DateTime<Utc>>, LivenessStoreError>;
    /// List all known worker IDs.
    ///
    /// This signature does not specify the order of the returned IDs.
    async fn list_workers(&self) -> Result<Vec<usize>, LivenessStoreError>;
    /// Update the health status for a worker.
    ///
    /// Stores `health` as the record for `worker_id`.
    // NOTE(review): nothing in this signature ties `health.worker_id` to `worker_id`;
    // callers presumably keep the two equal — confirm.
    async fn update_health(
        &self,
        worker_id: usize,
        health: WorkerHealth,
    ) -> Result<(), LivenessStoreError>;
    /// Get the health status for a worker.
    ///
    /// The `Option` allows for a worker with no recorded health; presumably that case is
    /// reported as `Ok(None)` rather than as an error — confirm per backend.
    async fn get_health(
        &self,
        worker_id: usize,
    ) -> Result<Option<WorkerHealth>, LivenessStoreError>;
    /// List health status for all workers.
    ///
    /// This signature does not specify the order of the returned records.
    async fn list_health(&self) -> Result<Vec<WorkerHealth>, LivenessStoreError>;
}
84
/// In-memory implementation of LivenessStore for testing and local development.
///
/// Heartbeats and health records live in two separate maps, each behind its own async mutex.
/// Both maps sit behind an `Arc`, so cloning the store yields a handle to the same data
/// rather than an independent copy. `Default` produces a store with both maps empty.
#[derive(Clone, Default)]
pub struct InMemoryLivenessStore {
    /// Last heartbeat timestamp per worker ID.
    inner: Arc<Mutex<HashMap<usize, DateTime<Utc>>>>,
    /// Latest health record per worker ID.
    health: Arc<Mutex<HashMap<usize, WorkerHealth>>>,
}
91
92#[async_trait]
93impl LivenessStore for InMemoryLivenessStore {
94    async fn update_heartbeat(
95        &self,
96        worker_id: usize,
97        timestamp: DateTime<Utc>,
98    ) -> Result<(), LivenessStoreError> {
99        let mut map = self.inner.lock().await;
100        map.insert(worker_id, timestamp);
101        Ok(())
102    }
103    async fn get_heartbeat(
104        &self,
105        worker_id: usize,
106    ) -> Result<Option<DateTime<Utc>>, LivenessStoreError> {
107        let map = self.inner.lock().await;
108        Ok(map.get(&worker_id).cloned())
109    }
110    async fn list_workers(&self) -> Result<Vec<usize>, LivenessStoreError> {
111        let map = self.inner.lock().await;
112        Ok(map.keys().cloned().collect())
113    }
114    async fn update_health(
115        &self,
116        worker_id: usize,
117        health: WorkerHealth,
118    ) -> Result<(), LivenessStoreError> {
119        let mut map = self.health.lock().await;
120        map.insert(worker_id, health);
121        Ok(())
122    }
123    async fn get_health(
124        &self,
125        worker_id: usize,
126    ) -> Result<Option<WorkerHealth>, LivenessStoreError> {
127        let map = self.health.lock().await;
128        Ok(map.get(&worker_id).cloned())
129    }
130    async fn list_health(&self) -> Result<Vec<WorkerHealth>, LivenessStoreError> {
131        let map = self.health.lock().await;
132        Ok(map.values().cloned().collect())
133    }
134}