bzzz-core 0.1.0

Bzzz core library - Declarative orchestration engine for AI Agents
Documentation
//! Agent Lifecycle Heartbeat Module
//!
//! Provides task-bound heartbeat monitoring for worker lifecycle.
//!
//! # Design
//!
//! This is a minimal implementation bound to a single SwarmFile execution:
//! - Tracks worker lifecycle state (Pending → Running → Completed/Failed/Timeout)
//! - Counts heartbeat ticks driven by the caller; the monitor stores a configurable
//!   interval but does not schedule ticks itself
//! - Simple status reporting for CLI output
//!
//! # Constitution Alignment
//!
//! #2 Runtime Reliable - Real heartbeat execution, no mock implementations.

use std::sync::Arc;
use std::time::{Duration, Instant};

use tokio::sync::{Mutex, RwLock};

/// Lifecycle phase of a single monitored worker.
///
/// The expected progression is `Pending` → `Running` → one of the final
/// phases (`Completed`, `Failed`, `Timeout`); this type does not enforce it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WorkerLifecycleState {
    /// Registered, not started yet.
    Pending,
    /// Currently executing.
    Running,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
    /// Exceeded its timeout.
    Timeout,
}

impl WorkerLifecycleState {
    /// Returns `true` for the final phases: `Completed`, `Failed` and `Timeout`.
    pub fn is_terminal(&self) -> bool {
        // Exhaustive on purpose: adding a variant forces a decision here.
        match *self {
            Self::Pending | Self::Running => false,
            Self::Completed | Self::Failed | Self::Timeout => true,
        }
    }

    /// Lowercase label for this phase, used in human-readable output.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Pending => "pending",
            Self::Running => "running",
            Self::Completed => "completed",
            Self::Failed => "failed",
            Self::Timeout => "timeout",
        }
    }
}

/// Lifecycle and heartbeat timing recorded for one worker.
#[derive(Debug, Clone)]
pub struct WorkerHeartbeat {
    /// Identifier the worker is tracked under.
    pub name: String,
    /// Phase the worker is currently in.
    pub state: WorkerLifecycleState,
    /// When the worker entered its current phase.
    pub state_changed_at: Instant,
    /// When a heartbeat was most recently recorded for the worker.
    pub last_heartbeat: Instant,
}

/// Task-bound monitor that tracks worker lifecycle phases and counts
/// heartbeat ticks for a single execution.
pub struct HeartbeatMonitor {
    /// Configured period between heartbeat ticks (5s unless overridden).
    interval: Duration,
    /// Per-worker records, shared behind an async read/write lock.
    workers: Arc<RwLock<Vec<WorkerHeartbeat>>>,
    /// Expected number of workers as given to the constructor; this is what
    /// the summary reports as its total, independent of registrations.
    worker_count: usize,
    /// Creation instant; elapsed time is measured from here.
    start_time: Instant,
    /// Number of heartbeat ticks recorded so far.
    heartbeat_count: Arc<Mutex<u64>>,
}

impl HeartbeatMonitor {
    /// Interval used when [`with_interval`](Self::with_interval) is not called.
    const DEFAULT_INTERVAL: Duration = Duration::from_secs(5);

    /// Create a monitor for a task expected to run `worker_count` workers.
    ///
    /// The heartbeat interval starts at 5 seconds. `worker_count` is reported
    /// verbatim as [`HeartbeatSummary::total_workers`]; it is not compared with
    /// the number of workers actually registered.
    pub fn new(worker_count: usize) -> Self {
        Self {
            interval: Self::DEFAULT_INTERVAL,
            workers: Arc::new(RwLock::new(Vec::new())),
            worker_count,
            start_time: Instant::now(),
            heartbeat_count: Arc::new(Mutex::new(0)),
        }
    }

    /// Builder-style setter for the heartbeat interval.
    #[must_use]
    pub fn with_interval(mut self, interval: Duration) -> Self {
        self.interval = interval;
        self
    }

    /// Configured heartbeat interval.
    ///
    /// The monitor never schedules ticks on its own; whoever drives
    /// [`tick`](Self::tick) periodically should use this value as its period.
    pub fn interval(&self) -> Duration {
        self.interval
    }

    /// Register `name` in the [`WorkerLifecycleState::Pending`] state.
    ///
    /// Names are not deduplicated: registering the same name twice creates two
    /// entries, and [`update_state`](Self::update_state) only affects the first.
    pub async fn register_worker(&self, name: &str) {
        let mut workers = self.workers.write().await;
        // Single timestamp so both fields agree exactly.
        let now = Instant::now();
        workers.push(WorkerHeartbeat {
            name: name.to_string(),
            state: WorkerLifecycleState::Pending,
            state_changed_at: now,
            last_heartbeat: now,
        });
    }

    /// Move the first worker named `name` into `state`, refreshing both its
    /// state-change and heartbeat timestamps.
    ///
    /// Unknown names are ignored silently.
    pub async fn update_state(&self, name: &str, state: WorkerLifecycleState) {
        let mut workers = self.workers.write().await;
        if let Some(worker) = workers.iter_mut().find(|w| w.name == name) {
            let now = Instant::now();
            worker.state = state;
            worker.state_changed_at = now;
            worker.last_heartbeat = now;
        }
    }

    /// Record one heartbeat tick and refresh `last_heartbeat` for every worker
    /// currently in [`WorkerLifecycleState::Running`].
    pub async fn tick(&self) {
        // The counter guard is a temporary dropped at the end of this statement,
        // so the two locks are never held together. Holding the counter while
        // taking `workers` (the reverse of the order `summary` used) could
        // deadlock concurrent `tick`/`summary` calls.
        *self.heartbeat_count.lock().await += 1;

        let mut workers = self.workers.write().await;
        let now = Instant::now();
        workers
            .iter_mut()
            .filter(|w| w.state == WorkerLifecycleState::Running)
            .for_each(|w| w.last_heartbeat = now);
    }

    /// Snapshot of per-state worker counts, elapsed time and tick count.
    ///
    /// The workers lock is released before the tick counter is read, so this
    /// method never holds both locks at once.
    pub async fn summary(&self) -> HeartbeatSummary {
        let (mut pending, mut running, mut completed, mut failed, mut timeout) = (0, 0, 0, 0, 0);
        // Single pass; the read guard lives only for the duration of the loop.
        for worker in self.workers.read().await.iter() {
            match worker.state {
                WorkerLifecycleState::Pending => pending += 1,
                WorkerLifecycleState::Running => running += 1,
                WorkerLifecycleState::Completed => completed += 1,
                WorkerLifecycleState::Failed => failed += 1,
                WorkerLifecycleState::Timeout => timeout += 1,
            }
        }
        let heartbeat_count = *self.heartbeat_count.lock().await;

        HeartbeatSummary {
            total_workers: self.worker_count,
            pending,
            running,
            completed,
            failed,
            timeout,
            // `as_millis` returns u128; saturate rather than silently truncate.
            elapsed_ms: u64::try_from(self.start_time.elapsed().as_millis()).unwrap_or(u64::MAX),
            heartbeat_count,
        }
    }

    /// Print a one-line status summary to stdout for CLI output.
    pub async fn print_status(&self) {
        let summary = self.summary().await;
        println!(
            "  [Heartbeat] {} workers: {} pending, {} running, {} completed, {} failed, {} timeout | {}ms elapsed | {} beats",
            summary.total_workers,
            summary.pending,
            summary.running,
            summary.completed,
            summary.failed,
            summary.timeout,
            summary.elapsed_ms,
            summary.heartbeat_count
        );
    }
}

/// Point-in-time report produced by [`HeartbeatMonitor::summary`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HeartbeatSummary {
    /// Number of workers the monitor was created for (the `worker_count` given
    /// to [`HeartbeatMonitor::new`]). It is not derived from registrations, so
    /// it can differ from the sum of the per-state counts below.
    pub total_workers: usize,
    /// Registered workers in the `Pending` state.
    pub pending: usize,
    /// Registered workers in the `Running` state.
    pub running: usize,
    /// Registered workers that completed successfully.
    pub completed: usize,
    /// Registered workers that failed.
    pub failed: usize,
    /// Registered workers that timed out.
    pub timeout: usize,
    /// Milliseconds elapsed since the monitor was created.
    pub elapsed_ms: u64,
    /// Number of heartbeat ticks recorded.
    pub heartbeat_count: u64,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_lifecycle_state_terminal() {
        assert!(WorkerLifecycleState::Completed.is_terminal());
        assert!(WorkerLifecycleState::Failed.is_terminal());
        assert!(WorkerLifecycleState::Timeout.is_terminal());
        assert!(!WorkerLifecycleState::Pending.is_terminal());
        assert!(!WorkerLifecycleState::Running.is_terminal());
    }

    #[test]
    fn test_lifecycle_state_name() {
        // Cover every variant so a changed label is caught.
        assert_eq!(WorkerLifecycleState::Pending.name(), "pending");
        assert_eq!(WorkerLifecycleState::Running.name(), "running");
        assert_eq!(WorkerLifecycleState::Completed.name(), "completed");
        assert_eq!(WorkerLifecycleState::Failed.name(), "failed");
        assert_eq!(WorkerLifecycleState::Timeout.name(), "timeout");
    }

    #[tokio::test]
    async fn test_heartbeat_monitor_creation() {
        let monitor = HeartbeatMonitor::new(5);
        assert_eq!(monitor.worker_count, 5);
        // A fresh monitor has no registered workers and no recorded ticks.
        assert!(monitor.workers.read().await.is_empty());
        assert_eq!(*monitor.heartbeat_count.lock().await, 0);
    }

    #[test]
    fn test_with_interval() {
        assert_eq!(HeartbeatMonitor::new(1).interval, Duration::from_secs(5));
        let monitor = HeartbeatMonitor::new(1).with_interval(Duration::from_millis(250));
        assert_eq!(monitor.interval, Duration::from_millis(250));
    }

    #[tokio::test]
    async fn test_register_worker() {
        let monitor = HeartbeatMonitor::new(3);
        monitor.register_worker("worker-a").await;

        let workers = monitor.workers.read().await;
        assert_eq!(workers.len(), 1);
        assert_eq!(workers[0].name, "worker-a");
        assert_eq!(workers[0].state, WorkerLifecycleState::Pending);
    }

    #[tokio::test]
    async fn test_update_state() {
        let monitor = HeartbeatMonitor::new(2);
        monitor.register_worker("worker-1").await;

        monitor.update_state("worker-1", WorkerLifecycleState::Running).await;
        {
            let workers = monitor.workers.read().await;
            assert_eq!(workers[0].state, WorkerLifecycleState::Running);
        }

        monitor.update_state("worker-1", WorkerLifecycleState::Completed).await;
        {
            let workers = monitor.workers.read().await;
            assert_eq!(workers[0].state, WorkerLifecycleState::Completed);
        }
    }

    #[tokio::test]
    async fn test_update_unknown_worker_is_noop() {
        let monitor = HeartbeatMonitor::new(1);
        monitor.register_worker("known").await;
        monitor.update_state("missing", WorkerLifecycleState::Running).await;

        let workers = monitor.workers.read().await;
        assert_eq!(workers.len(), 1);
        assert_eq!(workers[0].state, WorkerLifecycleState::Pending);
    }

    #[tokio::test]
    async fn test_heartbeat_tick() {
        let monitor = HeartbeatMonitor::new(2);
        monitor.register_worker("worker-1").await;
        monitor.update_state("worker-1", WorkerLifecycleState::Running).await;

        monitor.tick().await;
        let count = *monitor.heartbeat_count.lock().await;
        assert_eq!(count, 1);
    }

    #[tokio::test]
    async fn test_tick_leaves_non_running_workers_untouched() {
        let monitor = HeartbeatMonitor::new(1);
        monitor.register_worker("idle").await;
        let before = {
            let workers = monitor.workers.read().await;
            workers[0].last_heartbeat
        };

        monitor.tick().await;
        monitor.tick().await;

        let after = {
            let workers = monitor.workers.read().await;
            workers[0].last_heartbeat
        };
        // Pending workers are not refreshed by `tick`, but every tick is counted.
        assert_eq!(before, after);
        assert_eq!(*monitor.heartbeat_count.lock().await, 2);
    }

    #[tokio::test]
    async fn test_summary() {
        let monitor = HeartbeatMonitor::new(3);
        monitor.register_worker("w1").await;
        monitor.register_worker("w2").await;
        monitor.register_worker("w3").await;

        monitor.update_state("w1", WorkerLifecycleState::Completed).await;
        monitor.update_state("w2", WorkerLifecycleState::Running).await;
        monitor.update_state("w3", WorkerLifecycleState::Failed).await;

        let summary = monitor.summary().await;
        assert_eq!(summary.total_workers, 3);
        assert_eq!(summary.completed, 1);
        assert_eq!(summary.running, 1);
        assert_eq!(summary.failed, 1);
        assert_eq!(summary.pending, 0);
        assert_eq!(summary.timeout, 0);
    }

    #[tokio::test]
    async fn test_summary_counts_pending_timeout_and_ticks() {
        let monitor = HeartbeatMonitor::new(4);
        for name in ["a", "b", "c", "d"] {
            monitor.register_worker(name).await;
        }
        monitor.update_state("b", WorkerLifecycleState::Timeout).await;
        monitor.update_state("c", WorkerLifecycleState::Timeout).await;
        monitor.tick().await;

        let summary = monitor.summary().await;
        assert_eq!(summary.total_workers, 4);
        assert_eq!(summary.pending, 2);
        assert_eq!(summary.timeout, 2);
        assert_eq!(summary.running, 0);
        assert_eq!(summary.completed, 0);
        assert_eq!(summary.failed, 0);
        assert_eq!(summary.heartbeat_count, 1);
    }
}