use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, RwLock};
/// Lifecycle phase of a single monitored worker.
///
/// `Pending` and `Running` are non-terminal; `Completed`, `Failed` and
/// `Timeout` are terminal (see [`WorkerLifecycleState::is_terminal`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerLifecycleState {
    /// Non-terminal state; rendered as `"pending"`.
    Pending,
    /// Non-terminal state; rendered as `"running"`.
    Running,
    /// Terminal state; rendered as `"completed"`.
    Completed,
    /// Terminal state; rendered as `"failed"`.
    Failed,
    /// Terminal state; rendered as `"timeout"`.
    Timeout,
}

impl WorkerLifecycleState {
    /// Returns `true` when the state is one of the terminal variants
    /// (`Completed`, `Failed`, `Timeout`) and `false` otherwise.
    pub fn is_terminal(&self) -> bool {
        // Spelled out exhaustively so every variant is classified explicitly.
        match self {
            Self::Completed | Self::Failed | Self::Timeout => true,
            Self::Pending | Self::Running => false,
        }
    }

    /// Returns the lowercase, static label for this state.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Timeout => "timeout",
        }
    }
}
/// Per-worker record kept by `HeartbeatMonitor`.
#[derive(Debug, Clone)]
pub struct WorkerHeartbeat {
    /// Worker identifier; `HeartbeatMonitor::update_state` looks workers up by
    /// this value.
    pub name: String,
    /// Current lifecycle state of the worker.
    pub state: WorkerLifecycleState,
    /// When `state` was last written (set at registration and on every state
    /// update).
    pub state_changed_at: Instant,
    /// When the worker was last marked alive (set at registration, on every
    /// state update, and by `HeartbeatMonitor::tick` while the worker is
    /// `Running`).
    pub last_heartbeat: Instant,
}
/// Tracks lifecycle state and heartbeat timestamps for a set of named workers.
///
/// The monitor does not schedule anything itself: heartbeats only advance when
/// a caller invokes `tick`.
pub struct HeartbeatMonitor {
    /// Heartbeat interval (5 seconds unless overridden via `with_interval`).
    interval: Duration,
    /// Registered workers, in registration order, behind a shared async
    /// read/write lock.
    workers: Arc<RwLock<Vec<WorkerHeartbeat>>>,
    /// Worker count passed to `new`; reported as `total_workers` in summaries
    /// regardless of how many workers have actually been registered.
    worker_count: usize,
    /// Creation time of the monitor; basis for `elapsed_ms` in summaries.
    start_time: Instant,
    /// Number of `tick` calls so far, behind a shared async mutex.
    heartbeat_count: Arc<Mutex<u64>>,
}
impl HeartbeatMonitor {
    /// Creates a monitor that expects `worker_count` workers.
    ///
    /// The heartbeat interval defaults to 5 seconds, no workers are registered
    /// yet, the beat counter starts at zero and the elapsed-time clock starts
    /// at the moment of this call.
    pub fn new(worker_count: usize) -> Self {
        Self {
            interval: Duration::from_secs(5),
            workers: Arc::new(RwLock::new(Vec::new())),
            worker_count,
            start_time: Instant::now(),
            heartbeat_count: Arc::new(Mutex::new(0)),
        }
    }

    /// Builder-style override of the heartbeat interval.
    pub fn with_interval(mut self, interval: Duration) -> Self {
        self.interval = interval;
        self
    }

    /// Returns the configured heartbeat interval.
    ///
    /// The monitor never sleeps on this value itself; it is exposed so that
    /// whatever drives [`HeartbeatMonitor::tick`] can read the configuration.
    pub fn interval(&self) -> Duration {
        self.interval
    }

    /// Registers a new worker named `name` in the `Pending` state.
    ///
    /// Names are not checked for uniqueness; if the same name is registered
    /// twice, [`HeartbeatMonitor::update_state`] only affects the first entry.
    pub async fn register_worker(&self, name: &str) {
        let mut workers = self.workers.write().await;
        // Sample the clock once so both timestamps of a fresh record agree.
        let now = Instant::now();
        workers.push(WorkerHeartbeat {
            name: name.to_string(),
            state: WorkerLifecycleState::Pending,
            state_changed_at: now,
            last_heartbeat: now,
        });
    }

    /// Sets the state of the first worker named `name` and refreshes both of
    /// its timestamps.
    ///
    /// Unknown names are silently ignored, and no validation of the transition
    /// is performed.
    pub async fn update_state(&self, name: &str, state: WorkerLifecycleState) {
        let mut workers = self.workers.write().await;
        if let Some(worker) = workers.iter_mut().find(|w| w.name == name) {
            let now = Instant::now();
            worker.state = state;
            worker.state_changed_at = now;
            worker.last_heartbeat = now;
        }
    }

    /// Records one heartbeat: increments the beat counter and refreshes
    /// `last_heartbeat` for every worker currently in the `Running` state.
    pub async fn tick(&self) {
        // Release the counter lock before waiting on `workers`. `summary` also
        // touches both locks, and holding one while awaiting the other in
        // opposite orders can deadlock two concurrent tasks.
        {
            let mut count = self.heartbeat_count.lock().await;
            *count += 1;
        }

        let mut workers = self.workers.write().await;
        let now = Instant::now();
        for worker in workers
            .iter_mut()
            .filter(|w| w.state == WorkerLifecycleState::Running)
        {
            worker.last_heartbeat = now;
        }
    }

    /// Builds a snapshot of per-state worker counts, elapsed time and the
    /// number of beats recorded so far.
    ///
    /// `total_workers` is the count passed to [`HeartbeatMonitor::new`], not
    /// the number of workers that have been registered.
    pub async fn summary(&self) -> HeartbeatSummary {
        // Read the counter first; the guard is dropped at the end of this
        // statement, so the two locks are never held at the same time.
        let heartbeat_count = *self.heartbeat_count.lock().await;

        let mut pending = 0usize;
        let mut running = 0usize;
        let mut completed = 0usize;
        let mut failed = 0usize;
        let mut timeout = 0usize;
        {
            let workers = self.workers.read().await;
            // Single pass over the list instead of one scan per state.
            for worker in workers.iter() {
                match worker.state {
                    WorkerLifecycleState::Pending => pending += 1,
                    WorkerLifecycleState::Running => running += 1,
                    WorkerLifecycleState::Completed => completed += 1,
                    WorkerLifecycleState::Failed => failed += 1,
                    WorkerLifecycleState::Timeout => timeout += 1,
                }
            }
        }

        // `as_millis` yields a u128; clamp to u64::MAX rather than truncating.
        let elapsed_ms = self
            .start_time
            .elapsed()
            .as_millis()
            .min(u128::from(u64::MAX)) as u64;

        HeartbeatSummary {
            total_workers: self.worker_count,
            pending,
            running,
            completed,
            failed,
            timeout,
            elapsed_ms,
            heartbeat_count,
        }
    }

    /// Prints a one-line status built from [`HeartbeatMonitor::summary`] to
    /// standard output.
    pub async fn print_status(&self) {
        let summary = self.summary().await;
        println!(
            " [Heartbeat] {} workers: {} pending, {} running, {} completed, {} failed, {} timeout | {}ms elapsed | {} beats",
            summary.total_workers,
            summary.pending,
            summary.running,
            summary.completed,
            summary.failed,
            summary.timeout,
            summary.elapsed_ms,
            summary.heartbeat_count
        );
    }
}
/// Point-in-time snapshot produced by `HeartbeatMonitor::summary`.
#[derive(Debug, Clone)]
pub struct HeartbeatSummary {
    /// Worker count the monitor was created with.
    pub total_workers: usize,
    /// Number of registered workers in the `Pending` state.
    pub pending: usize,
    /// Number of registered workers in the `Running` state.
    pub running: usize,
    /// Number of registered workers in the `Completed` state.
    pub completed: usize,
    /// Number of registered workers in the `Failed` state.
    pub failed: usize,
    /// Number of registered workers in the `Timeout` state.
    pub timeout: usize,
    /// Milliseconds elapsed since the monitor was created.
    pub elapsed_ms: u64,
    /// Number of heartbeat ticks recorded so far.
    pub heartbeat_count: u64,
}
#[cfg(test)]
mod tests {
    //! Unit tests for the lifecycle state helpers and `HeartbeatMonitor`.

    use super::*;

    #[test]
    fn test_lifecycle_state_terminal() {
        assert!(WorkerLifecycleState::Completed.is_terminal());
        assert!(WorkerLifecycleState::Failed.is_terminal());
        assert!(WorkerLifecycleState::Timeout.is_terminal());
        assert!(!WorkerLifecycleState::Pending.is_terminal());
        assert!(!WorkerLifecycleState::Running.is_terminal());
    }

    #[test]
    fn test_lifecycle_state_name() {
        // Every variant is covered so a mislabelled state cannot slip through.
        assert_eq!(WorkerLifecycleState::Pending.name(), "pending");
        assert_eq!(WorkerLifecycleState::Running.name(), "running");
        assert_eq!(WorkerLifecycleState::Completed.name(), "completed");
        assert_eq!(WorkerLifecycleState::Failed.name(), "failed");
        assert_eq!(WorkerLifecycleState::Timeout.name(), "timeout");
    }

    #[tokio::test]
    async fn test_heartbeat_monitor_creation() {
        let monitor = HeartbeatMonitor::new(5);
        assert_eq!(monitor.worker_count, 5);
        assert_eq!(monitor.interval, Duration::from_secs(5));
        assert!(monitor.workers.read().await.is_empty());
        assert_eq!(*monitor.heartbeat_count.lock().await, 0);
    }

    #[test]
    fn test_with_interval_overrides_default() {
        let monitor = HeartbeatMonitor::new(1).with_interval(Duration::from_millis(250));
        assert_eq!(monitor.interval, Duration::from_millis(250));
        assert_eq!(monitor.worker_count, 1);
    }

    #[tokio::test]
    async fn test_register_worker() {
        let monitor = HeartbeatMonitor::new(3);
        monitor.register_worker("worker-a").await;
        let workers = monitor.workers.read().await;
        assert_eq!(workers.len(), 1);
        assert_eq!(workers[0].name, "worker-a");
        assert_eq!(workers[0].state, WorkerLifecycleState::Pending);
    }

    #[tokio::test]
    async fn test_update_state() {
        let monitor = HeartbeatMonitor::new(2);
        monitor.register_worker("worker-1").await;
        monitor.update_state("worker-1", WorkerLifecycleState::Running).await;
        {
            let workers = monitor.workers.read().await;
            assert_eq!(workers[0].state, WorkerLifecycleState::Running);
        }
        monitor.update_state("worker-1", WorkerLifecycleState::Completed).await;
        {
            let workers = monitor.workers.read().await;
            assert_eq!(workers[0].state, WorkerLifecycleState::Completed);
        }
    }

    #[tokio::test]
    async fn test_update_state_unknown_worker_is_noop() {
        let monitor = HeartbeatMonitor::new(1);
        monitor.register_worker("worker-1").await;
        monitor.update_state("missing", WorkerLifecycleState::Failed).await;
        let workers = monitor.workers.read().await;
        assert_eq!(workers.len(), 1);
        assert_eq!(workers[0].state, WorkerLifecycleState::Pending);
    }

    #[tokio::test]
    async fn test_heartbeat_tick() {
        let monitor = HeartbeatMonitor::new(2);
        monitor.register_worker("worker-1").await;
        monitor.update_state("worker-1", WorkerLifecycleState::Running).await;
        monitor.tick().await;
        {
            let count = *monitor.heartbeat_count.lock().await;
            assert_eq!(count, 1);
        }
        monitor.tick().await;
        let count = *monitor.heartbeat_count.lock().await;
        assert_eq!(count, 2);
    }

    #[tokio::test]
    async fn test_tick_only_refreshes_running_workers() {
        let monitor = HeartbeatMonitor::new(2);
        monitor.register_worker("idle").await;
        monitor.register_worker("busy").await;
        monitor.update_state("busy", WorkerLifecycleState::Running).await;

        let before_tick = Instant::now();
        monitor.tick().await;

        let workers = monitor.workers.read().await;
        // `Instant` is monotonic: the untouched pending worker still carries a
        // timestamp from before the tick, the running one from at/after it.
        assert!(workers[0].last_heartbeat <= before_tick);
        assert!(workers[1].last_heartbeat >= before_tick);
    }

    #[tokio::test]
    async fn test_summary() {
        let monitor = HeartbeatMonitor::new(3);
        monitor.register_worker("w1").await;
        monitor.register_worker("w2").await;
        monitor.register_worker("w3").await;
        monitor.update_state("w1", WorkerLifecycleState::Completed).await;
        monitor.update_state("w2", WorkerLifecycleState::Running).await;
        monitor.update_state("w3", WorkerLifecycleState::Failed).await;
        let summary = monitor.summary().await;
        assert_eq!(summary.total_workers, 3);
        assert_eq!(summary.completed, 1);
        assert_eq!(summary.running, 1);
        assert_eq!(summary.failed, 1);
        assert_eq!(summary.pending, 0);
        assert_eq!(summary.timeout, 0);
        assert_eq!(summary.heartbeat_count, 0);
    }
}