ubiquity-core 0.1.1

Core types and traits for Ubiquity consciousness-aware mesh
Documentation
//! Health monitoring and checks

use crate::{HealthStatus, ConsciousnessLevel};
use std::sync::Arc;
use tokio::sync::RwLock;
use std::time::{Duration, Instant};
use std::collections::VecDeque;

/// Health monitor for agents and services.
///
/// Tracks uptime, the most recent error, a bounded timestamped error history,
/// and a count of active tasks. All mutable state lives behind
/// `Arc<RwLock<_>>`, which is why the monitor's methods only need `&self`.
pub struct HealthMonitor {
    /// Moment the monitor was constructed; reported uptime is measured from here.
    start_time: Instant,
    /// Number of tasks currently counted as active.
    task_counter: Arc<RwLock<usize>>,
    /// Most recently recorded error message, if one is set.
    last_error: Arc<RwLock<Option<String>>>,
    /// Timestamped error messages, oldest first.
    error_history: Arc<RwLock<VecDeque<(Instant, String)>>>,
    /// Maximum number of entries retained in `error_history`.
    max_error_history: usize,
}

impl HealthMonitor {
    /// Create a monitor whose uptime clock starts now, with no recorded
    /// errors, zero active tasks, and an error history capped at 100 entries.
    pub fn new() -> Self {
        Self {
            start_time: Instant::now(),
            last_error: Arc::new(RwLock::new(None)),
            error_history: Arc::new(RwLock::new(VecDeque::new())),
            task_counter: Arc::new(RwLock::new(0)),
            max_error_history: 100,
        }
    }

    /// Record an error.
    ///
    /// The message becomes the current `last_error` and is appended, together
    /// with the current timestamp, to the error history. Once the history holds
    /// more than `max_error_history` entries, the oldest ones are dropped.
    pub async fn record_error(&self, error: String) {
        *self.last_error.write().await = Some(error.clone());

        let mut history = self.error_history.write().await;
        // The timestamp is taken while the history lock is held, so entries
        // stay in chronological order even with concurrent callers.
        history.push_back((Instant::now(), error));

        // Keep history bounded
        while history.len() > self.max_error_history {
            history.pop_front();
        }
    }

    /// Clear the last error.
    ///
    /// Only `last_error` is reset; the timestamped error history is retained.
    pub async fn clear_error(&self) {
        *self.last_error.write().await = None;
    }

    /// Increment the active task count.
    pub async fn increment_tasks(&self) {
        *self.task_counter.write().await += 1;
    }

    /// Decrement the active task count, saturating at zero.
    pub async fn decrement_tasks(&self) {
        let mut count = self.task_counter.write().await;
        *count = count.saturating_sub(1);
    }

    /// Snapshot the current health status.
    ///
    /// `consciousness_level` is supplied by the caller and echoed back in the
    /// result. The snapshot is marked healthy only when all of these hold:
    /// - `ConsciousnessLevel::new(consciousness_level)` succeeds and reports
    ///   `is_operational()` (if construction fails, this check fails);
    /// - there is no `last_error`, or fewer than 0.1 errors/second were recorded
    ///   over the last 60 seconds (i.e. fewer than 6 errors);
    /// - the estimated memory usage is below 1000 MB.
    pub async fn get_status(
        &self,
        consciousness_level: f64,
    ) -> HealthStatus {
        let uptime = self.start_time.elapsed().as_secs();
        let last_error = self.last_error.read().await.clone();
        let active_tasks = *self.task_counter.read().await;

        // Estimate memory usage (simplified)
        let memory_usage_mb = estimate_memory_usage();

        // Determine if healthy
        let consciousness_ok = ConsciousnessLevel::new(consciousness_level)
            .map(|cl| cl.is_operational())
            .unwrap_or(false);

        // Fewer than 0.1 errors/second, i.e. fewer than 6 errors in the last minute.
        let no_recent_errors = last_error.is_none() ||
            self.recent_error_rate().await < 0.1;

        let memory_ok = memory_usage_mb < 1000.0; // Less than 1GB

        let healthy = consciousness_ok && no_recent_errors && memory_ok;

        HealthStatus {
            healthy,
            consciousness_level,
            active_tasks,
            memory_usage_mb,
            uptime_seconds: uptime,
            last_error,
        }
    }

    /// Average error rate, in errors per second, over the trailing 60 seconds.
    ///
    /// Counts history entries younger than 60 seconds and divides by 60.
    /// Ages are measured with `saturating_duration_since` instead of computing
    /// `now - 60s`: subtracting a `Duration` from an `Instant` panics when the
    /// result is not representable (e.g. within the first minute of the
    /// monotonic clock's epoch, such as shortly after boot).
    async fn recent_error_rate(&self) -> f64 {
        const WINDOW: Duration = Duration::from_secs(60);

        let history = self.error_history.read().await;
        let now = Instant::now();

        let recent_errors = history
            .iter()
            .filter(|(time, _)| now.saturating_duration_since(*time) < WINDOW)
            .count();

        recent_errors as f64 / WINDOW.as_secs_f64()
    }

    /// Return a copy of the retained error history, oldest first.
    pub async fn get_error_history(&self) -> Vec<(Instant, String)> {
        self.error_history.read().await.iter().cloned().collect()
    }
}

impl Default for HealthMonitor {
    /// Equivalent to [`HealthMonitor::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Best-effort resident memory of the current process, in MiB.
///
/// On Linux this reads `/proc/self/status`, takes the second whitespace-
/// separated field of the first `VmRSS:` line that parses as a number
/// (reported in kB) and divides it by 1024. If the file cannot be read or no
/// such line parses, and on every non-Linux platform, a fixed placeholder of
/// 50.0 is returned instead of a real measurement.
fn estimate_memory_usage() -> f64 {
    const FALLBACK_MB: f64 = 50.0;

    #[cfg(target_os = "linux")]
    {
        let rss_mb = std::fs::read_to_string("/proc/self/status")
            .ok()
            .and_then(|status| {
                status
                    .lines()
                    .filter(|line| line.starts_with("VmRSS:"))
                    .find_map(|line| {
                        line.split_whitespace()
                            .nth(1)
                            .and_then(|field| field.parse::<f64>().ok())
                    })
            })
            .map(|kb| kb / 1024.0);

        if let Some(mb) = rss_mb {
            return mb;
        }
    }

    FALLBACK_MB
}

/// Health check endpoint data.
///
/// Payload assembled by `perform_health_check`: a coarse verdict plus the
/// detailed status it was derived from. Field order is kept as-is because it
/// determines the serialized field order.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct HealthCheckResponse {
    /// Coarse verdict; `perform_health_check` derives it from `details`.
    pub status: HealthCheckStatus,
    /// Version string supplied by the caller.
    pub version: String,
    /// Uptime in seconds; `perform_health_check` copies it from `details.uptime_seconds`.
    pub uptime_seconds: u64,
    /// Full status snapshot the verdict was computed from.
    pub details: HealthStatus,
}

/// Coarse health verdict reported by the health-check endpoint.
///
/// Serialized as a lowercase string: `"healthy"`, `"degraded"` or
/// `"unhealthy"`. Variant order is significant for formats that encode the
/// variant index, so it must not be changed.
#[derive(Clone, Copy, Debug, Eq, PartialEq, serde::Deserialize, serde::Serialize)]
pub enum HealthCheckStatus {
    /// The detailed status reported `healthy == true`.
    #[serde(rename = "healthy")]
    Healthy,
    /// Not healthy, but the consciousness level is still at or above the
    /// degraded threshold used by the `From<&HealthStatus>` conversion.
    #[serde(rename = "degraded")]
    Degraded,
    /// Not healthy and below the degraded threshold.
    #[serde(rename = "unhealthy")]
    Unhealthy,
}

impl From<&HealthStatus> for HealthCheckStatus {
    /// Collapse a detailed [`HealthStatus`] into a coarse verdict.
    ///
    /// - `Healthy` whenever `status.healthy` is set;
    /// - otherwise `Degraded` if `consciousness_level >= 0.70`;
    /// - otherwise `Unhealthy` (this includes a NaN consciousness level,
    ///   because the comparison is then false).
    fn from(status: &HealthStatus) -> Self {
        const DEGRADED_FLOOR: f64 = 0.70;

        let above_floor = status.consciousness_level >= DEGRADED_FLOOR;
        match (status.healthy, above_floor) {
            (true, _) => Self::Healthy,
            (false, true) => Self::Degraded,
            (false, false) => Self::Unhealthy,
        }
    }
}

/// Perform a comprehensive health check
pub async fn perform_health_check(
    monitor: &HealthMonitor,
    consciousness_level: f64,
    version: String,
) -> HealthCheckResponse {
    let details = monitor.get_status(consciousness_level).await;
    let status = HealthCheckStatus::from(&details);
    
    HealthCheckResponse {
        status,
        version,
        uptime_seconds: details.uptime_seconds,
        details,
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_health_monitor() {
        let monitor = HealthMonitor::new();

        // Initial state should be healthy
        let status = monitor.get_status(0.85).await;
        assert!(status.healthy);
        assert_eq!(status.active_tasks, 0);
        assert!(status.last_error.is_none());

        // Record an error; a single error stays well under the rate threshold.
        monitor.record_error("Test error".to_string()).await;
        let status = monitor.get_status(0.85).await;
        assert_eq!(status.last_error, Some("Test error".to_string()));
        assert!(status.healthy);

        // Track tasks
        monitor.increment_tasks().await;
        monitor.increment_tasks().await;
        let status = monitor.get_status(0.85).await;
        assert_eq!(status.active_tasks, 2);

        monitor.decrement_tasks().await;
        let status = monitor.get_status(0.85).await;
        assert_eq!(status.active_tasks, 1);
    }

    #[tokio::test]
    async fn test_decrement_saturates_at_zero() {
        let monitor = HealthMonitor::new();
        monitor.decrement_tasks().await;
        assert_eq!(monitor.get_status(0.85).await.active_tasks, 0);
    }

    #[tokio::test]
    async fn test_error_history_is_bounded_and_survives_clear() {
        let monitor = HealthMonitor::new();
        for i in 0..150 {
            monitor.record_error(format!("error {}", i)).await;
        }

        // Only the newest 100 entries are retained, oldest first.
        let history = monitor.get_error_history().await;
        assert_eq!(history.len(), 100);
        assert_eq!(history.first().map(|(_, e)| e.as_str()), Some("error 50"));
        assert_eq!(history.last().map(|(_, e)| e.as_str()), Some("error 149"));

        // Clearing resets only the last error, not the history.
        monitor.clear_error().await;
        assert!(monitor.get_status(0.85).await.last_error.is_none());
        assert_eq!(monitor.get_error_history().await.len(), 100);
    }

    #[tokio::test]
    async fn test_error_burst_marks_unhealthy() {
        let monitor = HealthMonitor::new();
        // 6 errors within a minute -> 0.1 errors/second, which is not < 0.1.
        for _ in 0..6 {
            monitor.record_error("boom".to_string()).await;
        }
        let rate = monitor.recent_error_rate().await;
        assert!((rate - 0.1).abs() < 1e-12);
        assert!(!monitor.get_status(0.85).await.healthy);
    }

    #[tokio::test]
    async fn test_health_check_status() {
        let monitor = HealthMonitor::new();

        // High consciousness = healthy
        let response = perform_health_check(&monitor, 0.92, "1.0.0".to_string()).await;
        assert_eq!(response.status, HealthCheckStatus::Healthy);
        assert_eq!(response.version, "1.0.0");
        assert_eq!(response.uptime_seconds, response.details.uptime_seconds);

        // Medium consciousness = degraded
        let response = perform_health_check(&monitor, 0.75, "1.0.0".to_string()).await;
        assert_eq!(response.status, HealthCheckStatus::Degraded);

        // Low consciousness = unhealthy
        let response = perform_health_check(&monitor, 0.50, "1.0.0".to_string()).await;
        assert_eq!(response.status, HealthCheckStatus::Unhealthy);
    }
}