use crate::distributed::LivenessStoreError;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
/// Activity state reported for a worker.
///
/// The value is serializable so it can be stored alongside the rest of a
/// [`WorkerHealth`] record.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub enum WorkerStatus {
    /// The worker is idle. This is the variant returned by
    /// `WorkerStatus::default()`.
    #[default]
    Idle,
    /// The worker is in progress on something.
    InProgress,
    /// The worker is retrying.
    ///
    /// NOTE(review): the meaning of the two counters is not visible in this
    /// file; presumably (current attempt, maximum attempts) — confirm against
    /// the code that constructs this variant.
    Retrying(usize, usize),
}
/// Health record describing a single worker.
///
/// Records are stored and returned by value through the health methods of
/// [`LivenessStore`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerHealth {
    /// Numeric identifier of the worker this record describes.
    pub worker_id: usize,
    /// UTC timestamp of the last heartbeat carried by this record.
    pub last_heartbeat: DateTime<Utc>,
    /// Error counter for the worker.
    ///
    /// NOTE(review): whether this is cumulative or reset on success is not
    /// visible here — confirm with the code that updates it.
    pub error_count: usize,
    /// Current activity state of the worker.
    pub status: WorkerStatus,
    /// The work item currently associated with the worker, if any
    /// (presumably an item identifier — confirm against callers).
    pub current_work_item: Option<String>,
    /// Run identifier for the current work item, if any
    /// (presumably distinguishes separate runs of the same item — confirm).
    pub current_work_item_run_id: Option<String>,
}
impl Default for WorkerHealth {
fn default() -> Self {
Self {
worker_id: 0,
last_heartbeat: chrono::Utc::now(),
error_count: 0,
status: WorkerStatus::Idle,
current_work_item: None,
current_work_item_run_id: None,
}
}
}
/// Asynchronous storage interface for worker heartbeats and health records.
///
/// Heartbeats and health records are addressed by the same numeric worker id
/// but are written and read through separate methods. Every method reports
/// failures as a [`LivenessStoreError`].
#[async_trait]
pub trait LivenessStore {
    /// Stores `timestamp` as the heartbeat of `worker_id`.
    ///
    /// # Errors
    /// Returns a [`LivenessStoreError`] if the store cannot record the value.
    async fn update_heartbeat(
        &self,
        worker_id: usize,
        timestamp: DateTime<Utc>,
    ) -> Result<(), LivenessStoreError>;

    /// Looks up the heartbeat stored for `worker_id`.
    ///
    /// The `Option` in the return type allows for a worker with no stored
    /// heartbeat; the in-memory implementation in this file returns
    /// `Ok(None)` in that case.
    ///
    /// # Errors
    /// Returns a [`LivenessStoreError`] if the lookup fails.
    async fn get_heartbeat(
        &self,
        worker_id: usize,
    ) -> Result<Option<DateTime<Utc>>, LivenessStoreError>;

    /// Lists the ids of the workers known to the store.
    ///
    /// In the in-memory implementation in this file these are the ids that
    /// have a stored heartbeat.
    ///
    /// # Errors
    /// Returns a [`LivenessStoreError`] if the listing fails.
    async fn list_workers(&self) -> Result<Vec<usize>, LivenessStoreError>;

    /// Stores `health` as the health record of `worker_id`.
    ///
    /// # Errors
    /// Returns a [`LivenessStoreError`] if the store cannot record the value.
    async fn update_health(
        &self,
        worker_id: usize,
        health: WorkerHealth,
    ) -> Result<(), LivenessStoreError>;

    /// Looks up the health record stored for `worker_id`.
    ///
    /// The `Option` in the return type allows for a worker with no stored
    /// record; the in-memory implementation in this file returns `Ok(None)`
    /// in that case.
    ///
    /// # Errors
    /// Returns a [`LivenessStoreError`] if the lookup fails.
    async fn get_health(
        &self,
        worker_id: usize,
    ) -> Result<Option<WorkerHealth>, LivenessStoreError>;

    /// Lists every stored health record.
    ///
    /// # Errors
    /// Returns a [`LivenessStoreError`] if the listing fails.
    async fn list_health(&self) -> Result<Vec<WorkerHealth>, LivenessStoreError>;
}
/// [`LivenessStore`] implementation that keeps everything in process memory.
///
/// Both maps sit behind an `Arc`, so cloning the store yields another handle
/// to the same underlying data rather than a copy of it. `Default` produces
/// a store with two empty maps.
#[derive(Clone, Default)]
pub struct InMemoryLivenessStore {
    /// Heartbeat timestamp per worker id, guarded by an async mutex.
    inner: Arc<Mutex<HashMap<usize, DateTime<Utc>>>>,
    /// Health record per worker id, guarded by its own async mutex and
    /// updated independently of `inner`.
    health: Arc<Mutex<HashMap<usize, WorkerHealth>>>,
}
#[async_trait]
impl LivenessStore for InMemoryLivenessStore {
    /// Inserts `timestamp` into the heartbeat map under `worker_id`,
    /// replacing any value already stored for that id. Always returns `Ok`.
    async fn update_heartbeat(
        &self,
        worker_id: usize,
        timestamp: DateTime<Utc>,
    ) -> Result<(), LivenessStoreError> {
        // The guard is a temporary, so the lock is released at the end of
        // this statement.
        self.inner.lock().await.insert(worker_id, timestamp);
        Ok(())
    }

    /// Returns the heartbeat stored for `worker_id`, or `Ok(None)` when the
    /// heartbeat map has no entry for that id.
    async fn get_heartbeat(
        &self,
        worker_id: usize,
    ) -> Result<Option<DateTime<Utc>>, LivenessStoreError> {
        let heartbeats = self.inner.lock().await;
        let last_seen = heartbeats.get(&worker_id).cloned();
        Ok(last_seen)
    }

    /// Returns the ids that currently have an entry in the heartbeat map.
    ///
    /// Ids that only have a health record are not included. The order of the
    /// returned ids follows `HashMap` iteration order and is unspecified.
    async fn list_workers(&self) -> Result<Vec<usize>, LivenessStoreError> {
        let heartbeats = self.inner.lock().await;
        let worker_ids: Vec<usize> = heartbeats.keys().copied().collect();
        Ok(worker_ids)
    }

    /// Inserts `health` into the health map under `worker_id`, replacing any
    /// record already stored for that id. The heartbeat map is not touched.
    /// Always returns `Ok`.
    async fn update_health(
        &self,
        worker_id: usize,
        health: WorkerHealth,
    ) -> Result<(), LivenessStoreError> {
        // The guard is a temporary, so the lock is released at the end of
        // this statement.
        self.health.lock().await.insert(worker_id, health);
        Ok(())
    }

    /// Returns a clone of the health record stored for `worker_id`, or
    /// `Ok(None)` when the health map has no entry for that id.
    async fn get_health(
        &self,
        worker_id: usize,
    ) -> Result<Option<WorkerHealth>, LivenessStoreError> {
        let records = self.health.lock().await;
        let record = records.get(&worker_id).cloned();
        Ok(record)
    }

    /// Returns clones of all stored health records.
    ///
    /// The order follows `HashMap` iteration order and is unspecified.
    async fn list_health(&self) -> Result<Vec<WorkerHealth>, LivenessStoreError> {
        let records = self.health.lock().await;
        let snapshot: Vec<WorkerHealth> = records.values().cloned().collect();
        Ok(snapshot)
    }
}