Skip to main content

oxirs_embed/
monitoring_health.rs

//! Health check logic, alerting, and threshold monitoring for the embedding service.
//!
//! This module implements liveness, readiness, and comprehensive health checks,
//! together with the alert handler infrastructure.
5
6use anyhow::{anyhow, Result};
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use std::sync::{Arc, RwLock};
11
12use crate::monitoring_metrics::{MetricsCollector, PerformanceMetrics};
13
14// ====================================================================================
15// ALERT INFRASTRUCTURE
16// ====================================================================================
17
/// Alert handling trait.
///
/// An implementor receives each [`Alert`] by value and reports the outcome
/// through the returned `Result`. This file provides two implementations:
/// `ConsoleAlertHandler` and `SlackAlertHandler`.
pub trait AlertHandler {
    /// Handle a single alert, taking ownership of it.
    ///
    /// The trait allows implementations to fail; both implementations in this
    /// file always return `Ok(())`.
    fn handle_alert(&self, alert: Alert) -> Result<()>;
}
22
/// A single alert, as passed to [`AlertHandler::handle_alert`].
#[derive(Debug, Clone)]
pub struct Alert {
    /// Category of the alert.
    pub alert_type: AlertType,
    /// Human-readable description; this is the text the handlers in this file
    /// print or log.
    pub message: String,
    /// Severity level of the alert.
    pub severity: AlertSeverity,
    /// UTC timestamp attached to the alert.
    pub timestamp: DateTime<Utc>,
    /// Named numeric values attached to the alert.
    // NOTE(review): presumably the metric readings that triggered the alert —
    // confirm against the code that constructs `Alert`.
    pub metrics: HashMap<String, f64>,
}
32
/// Category of condition reported by an alert.
///
/// Besides `Debug` and `Clone`, the enum derives `PartialEq`, `Eq` and `Hash`
/// (matching the comparison derives on `HealthStatus` in this module) so that
/// alert types can be compared with `==` and used as `HashMap`/`HashSet` keys,
/// e.g. for de-duplicating or counting alerts per category.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum AlertType {
    HighLatency,
    LowThroughput,
    HighErrorRate,
    LowCacheHitRate,
    QualityDrift,
    PerformanceDrift,
    ResourceExhaustion,
    SystemFailure,
}
45
/// Alert severity levels.
///
/// Variants are declared in ascending order of urgency, and the derived
/// `PartialOrd`/`Ord` follow that declaration order, so
/// `Info < Warning < Critical < Emergency`. This lets callers filter with
/// comparisons such as `severity >= AlertSeverity::Warning`.
///
/// Keep new variants in urgency order; reordering changes the comparison
/// results.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum AlertSeverity {
    Info,
    Warning,
    Critical,
    Emergency,
}
54
/// Alert threshold configuration.
///
/// Plain data holder: this file only defines the limits and their defaults
/// (see the `Default` impl); the code that compares live metrics against
/// them is not part of this file.
#[derive(Debug, Clone)]
pub struct AlertThresholds {
    /// Maximum acceptable P95 latency (ms)
    pub max_p95_latency_ms: f64,
    /// Minimum acceptable throughput (req/s)
    pub min_throughput_rps: f64,
    /// Maximum acceptable error rate, as a fraction (the default `0.05` means 5%)
    pub max_error_rate: f64,
    /// Minimum acceptable cache hit rate, as a fraction (the default `0.8` means 80%)
    pub min_cache_hit_rate: f64,
    /// Maximum acceptable quality drift
    pub max_quality_drift: f64,
    /// Maximum acceptable memory usage (MB)
    pub max_memory_usage_mb: f64,
    /// Maximum acceptable GPU memory usage (MB)
    pub max_gpu_memory_mb: f64,
}
73
74impl Default for AlertThresholds {
75    fn default() -> Self {
76        Self {
77            max_p95_latency_ms: 500.0,
78            min_throughput_rps: 100.0,
79            max_error_rate: 0.05,    // 5%
80            min_cache_hit_rate: 0.8, // 80%
81            max_quality_drift: 0.1,
82            max_memory_usage_mb: 4096.0, // 4GB
83            max_gpu_memory_mb: 8192.0,   // 8GB
84        }
85    }
86}
87
/// Console alert handler: its `AlertHandler` implementation prints each alert
/// to standard output with `println!`.
pub struct ConsoleAlertHandler;
90
91impl AlertHandler for ConsoleAlertHandler {
92    fn handle_alert(&self, alert: Alert) -> Result<()> {
93        println!(
94            "ALERT [{}]: {} - {}",
95            format!("{:?}", alert.severity).to_uppercase(),
96            alert.message,
97            alert.timestamp.format("%Y-%m-%d %H:%M:%S UTC")
98        );
99        Ok(())
100    }
101}
102
/// Slack alert handler (placeholder).
///
/// The `AlertHandler` implementation in this file does not contact Slack; it
/// only emits a `tracing` log line describing what would be sent.
pub struct SlackAlertHandler {
    /// Webhook URL the alert would be delivered to.
    // NOTE(review): Slack incoming-webhook URLs embed a secret token — treat
    // this value as a credential when logging or serializing it.
    pub webhook_url: String,
}
107
108impl AlertHandler for SlackAlertHandler {
109    fn handle_alert(&self, alert: Alert) -> Result<()> {
110        // In production, this would send to Slack
111        tracing::info!(
112            "Would send Slack alert to {}: {}",
113            self.webhook_url,
114            alert.message
115        );
116        Ok(())
117    }
118}
119
120// ====================================================================================
121// HEALTH CHECK FUNCTIONALITY
122// ====================================================================================
123
/// Health status for the embedding service.
///
/// Used both as the overall status of a `HealthCheckResult` and as the status
/// of each individual `ComponentHealth`. In this file `HealthChecker` reports
/// `Unhealthy` when models are not loaded and `Degraded` when the latency,
/// error-rate or memory threshold is exceeded.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum HealthStatus {
    /// Service is healthy and operational
    Healthy,
    /// Service is degraded but operational
    Degraded,
    /// Service is unhealthy
    Unhealthy,
}
134
/// Health check result.
///
/// Returned by `HealthChecker::check_liveness`, `check_readiness` and
/// `check_health`. Serializable via serde.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCheckResult {
    /// Overall health status
    pub status: HealthStatus,
    /// Timestamp of health check (UTC, taken when the result is assembled)
    pub timestamp: DateTime<Utc>,
    /// Individual component health, keyed by component name
    /// (e.g. `"models"`, `"cache"`, `"latency"`)
    pub components: HashMap<String, ComponentHealth>,
    /// Additional details; the checks in this file always leave this map empty
    pub details: HashMap<String, String>,
}
147
/// Component health information: one entry of `HealthCheckResult::components`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentHealth {
    /// Component status
    pub status: HealthStatus,
    /// Component message (human-readable summary, e.g. `"Models not loaded"`)
    pub message: String,
    /// Last check time (UTC)
    pub last_check: DateTime<Utc>,
    /// Additional metrics: named numeric readings backing the status
    /// (e.g. `"p95"`, `"error_rate"`, `"hit_rate"`); may be empty
    pub metrics: HashMap<String, f64>,
}
160
/// Health checker for embedding service.
///
/// Holds the state and thresholds used by the liveness, readiness and
/// comprehensive health checks implemented on this type.
pub struct HealthChecker {
    /// Model load status (set through `set_models_loaded`; starts as `false`)
    models_loaded: Arc<RwLock<bool>>,
    /// Last successful request time (written by `update_last_request_time`;
    /// none of the checks in this file read it)
    last_request_time: Arc<RwLock<DateTime<Utc>>>,
    /// Error rate threshold, as a fraction of total requests
    error_rate_threshold: f64,
    /// Latency threshold (ms), compared against P95 latency
    latency_threshold_ms: f64,
    /// Memory threshold (MB)
    memory_threshold_mb: f64,
    /// Metrics collector (source of the cache hit rate and the Prometheus export)
    metrics: Arc<MetricsCollector>,
}
176
177impl HealthChecker {
178    /// Create a new health checker
179    pub fn new(metrics: Arc<MetricsCollector>) -> Self {
180        Self {
181            models_loaded: Arc::new(RwLock::new(false)),
182            last_request_time: Arc::new(RwLock::new(Utc::now())),
183            error_rate_threshold: 0.1,    // 10%
184            latency_threshold_ms: 1000.0, // 1 second
185            memory_threshold_mb: 8192.0,  // 8GB
186            metrics,
187        }
188    }
189
190    /// Set models loaded status
191    pub fn set_models_loaded(&self, loaded: bool) -> Result<()> {
192        let mut status = self
193            .models_loaded
194            .write()
195            .map_err(|e| anyhow!("Failed to write lock: {}", e))?;
196        *status = loaded;
197        Ok(())
198    }
199
200    /// Update last request time
201    pub fn update_last_request_time(&self) -> Result<()> {
202        let mut time = self
203            .last_request_time
204            .write()
205            .map_err(|e| anyhow!("Failed to write lock: {}", e))?;
206        *time = Utc::now();
207        Ok(())
208    }
209
210    /// Perform liveness check (basic service availability)
211    pub fn check_liveness(&self) -> HealthCheckResult {
212        let mut components = HashMap::new();
213
214        // Check if service is running (always healthy if we can respond)
215        components.insert(
216            "service".to_string(),
217            ComponentHealth {
218                status: HealthStatus::Healthy,
219                message: "Service is running".to_string(),
220                last_check: Utc::now(),
221                metrics: HashMap::new(),
222            },
223        );
224
225        HealthCheckResult {
226            status: HealthStatus::Healthy,
227            timestamp: Utc::now(),
228            components,
229            details: HashMap::new(),
230        }
231    }
232
233    /// Perform readiness check (service ready to handle requests)
234    pub fn check_readiness(&self) -> HealthCheckResult {
235        let mut components = HashMap::new();
236        let mut overall_status = HealthStatus::Healthy;
237
238        // Check if models are loaded
239        let models_loaded = self.models_loaded.read().map(|g| *g).unwrap_or(false);
240        if !models_loaded {
241            overall_status = HealthStatus::Unhealthy;
242            components.insert(
243                "models".to_string(),
244                ComponentHealth {
245                    status: HealthStatus::Unhealthy,
246                    message: "Models not loaded".to_string(),
247                    last_check: Utc::now(),
248                    metrics: HashMap::new(),
249                },
250            );
251        } else {
252            components.insert(
253                "models".to_string(),
254                ComponentHealth {
255                    status: HealthStatus::Healthy,
256                    message: "Models loaded and ready".to_string(),
257                    last_check: Utc::now(),
258                    metrics: HashMap::new(),
259                },
260            );
261        }
262
263        // Check cache availability
264        let cache_hit_rate = self.metrics.get_cache_hit_rate();
265        components.insert(
266            "cache".to_string(),
267            ComponentHealth {
268                status: HealthStatus::Healthy,
269                message: format!("Cache hit rate: {:.2}%", cache_hit_rate * 100.0),
270                last_check: Utc::now(),
271                metrics: [("hit_rate".to_string(), cache_hit_rate)]
272                    .into_iter()
273                    .collect(),
274            },
275        );
276
277        HealthCheckResult {
278            status: overall_status,
279            timestamp: Utc::now(),
280            components,
281            details: HashMap::new(),
282        }
283    }
284
285    /// Perform comprehensive health check
286    pub fn check_health(&self, performance_metrics: &PerformanceMetrics) -> HealthCheckResult {
287        let mut components = HashMap::new();
288        let mut overall_status = HealthStatus::Healthy;
289
290        // Check models
291        let models_loaded = self.models_loaded.read().map(|g| *g).unwrap_or(false);
292        if !models_loaded {
293            overall_status = HealthStatus::Unhealthy;
294            components.insert(
295                "models".to_string(),
296                ComponentHealth {
297                    status: HealthStatus::Unhealthy,
298                    message: "Models not loaded".to_string(),
299                    last_check: Utc::now(),
300                    metrics: HashMap::new(),
301                },
302            );
303        } else {
304            components.insert(
305                "models".to_string(),
306                ComponentHealth {
307                    status: HealthStatus::Healthy,
308                    message: "Models operational".to_string(),
309                    last_check: Utc::now(),
310                    metrics: HashMap::new(),
311                },
312            );
313        }
314
315        // Check latency
316        let latency_status =
317            if performance_metrics.latency.p95_latency_ms > self.latency_threshold_ms {
318                if overall_status == HealthStatus::Healthy {
319                    overall_status = HealthStatus::Degraded;
320                }
321                HealthStatus::Degraded
322            } else {
323                HealthStatus::Healthy
324            };
325
326        components.insert(
327            "latency".to_string(),
328            ComponentHealth {
329                status: latency_status,
330                message: format!(
331                    "P95 latency: {:.2}ms",
332                    performance_metrics.latency.p95_latency_ms
333                ),
334                last_check: Utc::now(),
335                metrics: [
336                    (
337                        "p50".to_string(),
338                        performance_metrics.latency.p50_latency_ms,
339                    ),
340                    (
341                        "p95".to_string(),
342                        performance_metrics.latency.p95_latency_ms,
343                    ),
344                    (
345                        "p99".to_string(),
346                        performance_metrics.latency.p99_latency_ms,
347                    ),
348                ]
349                .into_iter()
350                .collect(),
351            },
352        );
353
354        // Check error rate
355        let error_rate = if performance_metrics.throughput.total_requests > 0 {
356            performance_metrics.errors.total_errors as f64
357                / performance_metrics.throughput.total_requests as f64
358        } else {
359            0.0
360        };
361
362        let error_status = if error_rate > self.error_rate_threshold {
363            if overall_status == HealthStatus::Healthy {
364                overall_status = HealthStatus::Degraded;
365            }
366            HealthStatus::Degraded
367        } else {
368            HealthStatus::Healthy
369        };
370
371        components.insert(
372            "errors".to_string(),
373            ComponentHealth {
374                status: error_status,
375                message: format!("Error rate: {:.2}%", error_rate * 100.0),
376                last_check: Utc::now(),
377                metrics: [("error_rate".to_string(), error_rate)]
378                    .into_iter()
379                    .collect(),
380            },
381        );
382
383        // Check memory
384        let memory_status =
385            if performance_metrics.resources.memory_usage_mb > self.memory_threshold_mb {
386                if overall_status == HealthStatus::Healthy {
387                    overall_status = HealthStatus::Degraded;
388                }
389                HealthStatus::Degraded
390            } else {
391                HealthStatus::Healthy
392            };
393
394        components.insert(
395            "memory".to_string(),
396            ComponentHealth {
397                status: memory_status,
398                message: format!(
399                    "Memory usage: {:.2}MB / {:.2}MB",
400                    performance_metrics.resources.memory_usage_mb, self.memory_threshold_mb
401                ),
402                last_check: Utc::now(),
403                metrics: [
404                    (
405                        "usage_mb".to_string(),
406                        performance_metrics.resources.memory_usage_mb,
407                    ),
408                    ("threshold_mb".to_string(), self.memory_threshold_mb),
409                ]
410                .into_iter()
411                .collect(),
412            },
413        );
414
415        // Check cache
416        let cache_hit_rate = self.metrics.get_cache_hit_rate();
417        components.insert(
418            "cache".to_string(),
419            ComponentHealth {
420                status: HealthStatus::Healthy,
421                message: format!("Cache hit rate: {:.2}%", cache_hit_rate * 100.0),
422                last_check: Utc::now(),
423                metrics: [("hit_rate".to_string(), cache_hit_rate)]
424                    .into_iter()
425                    .collect(),
426            },
427        );
428
429        HealthCheckResult {
430            status: overall_status,
431            timestamp: Utc::now(),
432            components,
433            details: HashMap::new(),
434        }
435    }
436
437    /// Get metrics endpoint (Prometheus format)
438    pub fn get_metrics_endpoint(&self) -> Result<String> {
439        self.metrics.export_prometheus()
440    }
441}