ultrafast_gateway/advanced_routing.rs

//! # Advanced Routing and Load Balancing Module
//!
//! This module provides intelligent request routing, load balancing, and provider
//! health monitoring for the Ultrafast Gateway. It includes circuit breakers,
//! health checks, and adaptive routing strategies.
//!
//! ## Overview
//!
//! The advanced routing system provides:
//! - **Intelligent Provider Selection**: Route requests to the best available provider
//! - **Health Monitoring**: Real-time provider health tracking
//! - **Circuit Breakers**: Automatic failover and recovery
//! - **Load Balancing**: Distribute requests across healthy providers
//! - **Adaptive Routing**: Dynamic routing based on performance metrics
//! - **Failover Strategies**: Automatic fallback to backup providers
//!
//! ## Routing Strategies
//!
//! The system supports multiple routing strategies:
//!
//! - **Single Provider**: Route all requests to a single provider
//! - **Load Balancing**: Distribute requests across multiple providers
//! - **Failover**: Primary provider with automatic fallback
//! - **A/B Testing**: Route requests to different providers for testing
//! - **Geographic Routing**: Route based on geographic location
//! - **Cost-Based Routing**: Route to the most cost-effective provider
//!
//! ## Health Monitoring
//!
//! Real-time provider health tracking includes (a sketch of the metric updates follows this list):
//!
//! - **Latency Monitoring**: Track response times for each provider
//! - **Success Rate Tracking**: Monitor success/failure ratios
//! - **Consecutive Failures**: Track consecutive failure counts
//! - **Automatic Health Checks**: Periodic provider health verification
//! - **Degraded State Detection**: Identify underperforming providers
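//!
//! As a minimal sketch (the helper names here are illustrative, not part of the
//! module API), the latency and success-rate metrics are maintained as
//! exponential moving averages:
//!
//! ```rust
//! // Latency: 0.8 weight on history, 0.2 on the newest sample.
//! fn update_latency(current_avg_ms: f64, sample_ms: f64) -> f64 {
//!     if current_avg_ms == 0.0 {
//!         sample_ms
//!     } else {
//!         current_avg_ms * 0.8 + sample_ms * 0.2
//!     }
//! }
//!
//! // Success rate: decays toward 1.0 on success and toward 0.0 on failure.
//! fn update_success_rate(current: f64, success: bool) -> f64 {
//!     if success {
//!         current * 0.9 + 0.1
//!     } else {
//!         current * 0.9
//!     }
//! }
//!
//! let avg = update_latency(100.0, 200.0); // 120.0
//! let rate = update_success_rate(0.9, false); // 0.81
//! ```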
//!
//! ## Circuit Breaker Pattern
//!
//! The system implements circuit breakers for each provider (see the example below):
//!
//! - **Closed State**: Normal operation, requests allowed
//! - **Open State**: Provider failing, requests blocked
//! - **Half-Open State**: Testing if provider has recovered
//! - **Automatic Recovery**: Automatic state transitions based on health
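//!
//! The sketch below drives the `ProviderCircuitBreaker` state machine defined in
//! this module through those states (with the default configuration, five
//! consecutive failures open the breaker):
//!
//! ```rust
//! use ultrafast_gateway::advanced_routing::{
//!     CircuitBreaker, CircuitState, ProviderCircuitBreaker,
//! };
//!
//! let mut breaker = ProviderCircuitBreaker::new(CircuitBreaker::default());
//! assert!(breaker.can_execute()); // Closed: requests allowed
//!
//! for _ in 0..5 {
//!     breaker.record_failure();
//! }
//! assert_eq!(breaker.get_state(), CircuitState::Open);
//! assert!(!breaker.can_execute()); // Open: blocked until the recovery timeout elapses
//!
//! breaker.record_success();
//! assert_eq!(breaker.get_state(), CircuitState::Closed);
//! ```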
//!
//! ## Usage
//!
//! ```rust
//! use std::time::Duration;
//! use ultrafast_gateway::advanced_routing::{AdvancedRouter, RoutingConfig};
//! use ultrafast_models_sdk::routing::RoutingStrategy;
//!
//! // Create advanced router
//! let config = RoutingConfig {
//!     strategy: RoutingStrategy::LoadBalancing,
//!     health_check_interval: Duration::from_secs(30),
//!     failover_threshold: 0.8,
//! };
//!
//! let router = AdvancedRouter::new(RoutingStrategy::LoadBalancing, config);
//!
//! // Select provider for request
//! let selection = router.select_provider(&providers, &context).await;
//!
//! // Update provider health after request
//! router.update_provider_health("openai", true, 150).await;
//! ```
//!
//! ## Health Check Configuration
//!
//! Health checks can be configured via `RoutingConfig`:
//!
//! ```toml
//! [routing]
//! strategy = "load_balancing"
//! health_check_interval = "30s"
//! failover_threshold = 0.8
//! ```
//!
//! ## Performance Metrics
//!
//! The routing system tracks comprehensive metrics (see the snapshot example below):
//!
//! - **Provider Latency**: Average response times per provider
//! - **Success Rates**: Request success percentages
//! - **Failure Counts**: Consecutive failure tracking
//! - **Health Status**: Current health state of each provider
//! - **Routing Decisions**: Provider selection statistics
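//!
//! A snapshot of these metrics can be pulled from the router; this fragment
//! assumes a `router` built as in the usage example above:
//!
//! ```rust
//! let stats = router.get_routing_stats().await;
//! println!(
//!     "healthy: {}/{}, avg latency: {:.1} ms",
//!     stats.healthy_providers, stats.total_providers, stats.average_latency_ms
//! );
//! for (provider_id, health) in &stats.provider_details {
//!     println!("{provider_id}: {:?} ({:.0}% success)", health.status, health.success_rate * 100.0);
//! }
//! ```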
//!
//! ## Failover Strategies
//!
//! The system supports multiple failover strategies (a sketch of the threshold-based
//! check follows this list):
//!
//! - **Immediate Failover**: Switch to backup on first failure
//! - **Threshold-Based**: Switch after N consecutive failures
//! - **Latency-Based**: Switch when latency exceeds threshold
//! - **Cost-Based**: Switch to cheaper provider when possible
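//!
//! The threshold-based check used by `HealthChecker::should_use_provider`
//! combines a success-rate floor with a cap on consecutive failures; as an
//! illustrative sketch (the function name is not part of the API):
//!
//! ```rust
//! fn should_route_to(success_rate: f64, consecutive_failures: u32, failover_threshold: f64) -> bool {
//!     success_rate >= failover_threshold && consecutive_failures < 5
//! }
//!
//! assert!(should_route_to(0.95, 0, 0.8));
//! assert!(!should_route_to(0.70, 2, 0.8)); // below the 0.8 success-rate floor
//! ```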
//!
//! ## Monitoring and Alerts
//!
//! The routing system provides monitoring capabilities:
//!
//! - **Health Status Endpoints**: Real-time provider health
//! - **Performance Metrics**: Detailed routing statistics
//! - **Circuit Breaker Status**: Current state of each provider
//! - **Alert Integration**: Notifications for unhealthy providers

// Advanced routing and load balancing module with provider health checks
use crate::config::RoutingConfig;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use ultrafast_models_sdk::providers::{HealthStatus, Provider};
use ultrafast_models_sdk::routing::{ProviderSelection, Router, RoutingContext, RoutingStrategy};

/// Global health checker storage for thread-safe access.
///
/// Uses `OnceLock` to ensure the health checker is initialized only once
/// and shared across all threads.
static HEALTH_CHECKER: OnceLock<Arc<RwLock<HealthChecker>>> = OnceLock::new();

/// Get the global health checker instance.
///
/// Returns a reference to the global health checker, initializing it
/// if it hasn't been initialized yet.
fn get_health_checker() -> &'static Arc<RwLock<HealthChecker>> {
    HEALTH_CHECKER.get_or_init(|| Arc::new(RwLock::new(HealthChecker::new())))
}

/// Health status for a specific provider.
///
/// Tracks the current health state, performance metrics, and failure
/// statistics for a provider.
///
/// # Example
///
/// ```rust
/// let status = ProviderHealthStatus {
///     provider_id: "openai".to_string(),
///     status: HealthStatus::Healthy,
///     last_check_timestamp: chrono::Utc::now().timestamp() as u64,
///     consecutive_failures: 0,
///     average_latency_ms: 150.0,
///     success_rate: 0.99,
/// };
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderHealthStatus {
    /// Provider identifier
    pub provider_id: String,
    /// Current health status (Healthy, Degraded, Unhealthy, Unknown)
    pub status: HealthStatus,
    /// Unix timestamp (seconds) of the last health check
    pub last_check_timestamp: u64,
    /// Number of consecutive failures
    pub consecutive_failures: u32,
    /// Average response latency in milliseconds
    pub average_latency_ms: f64,
    /// Success rate as a fraction between 0.0 and 1.0
    pub success_rate: f64,
}

impl Default for ProviderHealthStatus {
    fn default() -> Self {
        Self {
            provider_id: String::new(),
            status: HealthStatus::Unknown,
            last_check_timestamp: chrono::Utc::now().timestamp() as u64,
            consecutive_failures: 0,
            average_latency_ms: 0.0,
            success_rate: 1.0,
        }
    }
}

/// Health checker for monitoring provider health and performance.
///
/// Manages health checks, tracks performance metrics, and provides
/// health status information for routing decisions.
///
/// # Thread Safety
///
/// A `HealthChecker` has no internal locking; the gateway shares a single
/// instance behind an `Arc<RwLock<HealthChecker>>` (see `get_health_checker`)
/// so it can be used safely from multiple tasks.
///
/// # Example
///
/// ```rust
/// let mut health_checker = HealthChecker::new();
/// health_checker.set_config(routing_config);
///
/// let status = health_checker.check_provider_health("openai", &provider).await;
/// ```
#[derive(Debug)]
pub struct HealthChecker {
    /// Health status for each provider
    provider_health: HashMap<String, ProviderHealthStatus>,
    /// Routing configuration for health check settings
    config: Option<RoutingConfig>,
}

impl Default for HealthChecker {
    fn default() -> Self {
        Self::new()
    }
}

impl HealthChecker {
    /// Create a new health checker instance.
    ///
    /// Initializes an empty health checker with no configuration.
    /// Use `set_config()` to configure health check behavior.
    ///
    /// # Returns
    ///
    /// Returns a new `HealthChecker` instance.
    pub fn new() -> Self {
        Self {
            provider_health: HashMap::new(),
            config: None,
        }
    }

    /// Set the routing configuration for health checks.
    ///
    /// Configures health check intervals, thresholds, and other
    /// routing-related settings.
    ///
    /// # Arguments
    ///
    /// * `config` - Routing configuration with health check settings
    pub fn set_config(&mut self, config: RoutingConfig) {
        self.config = Some(config);
    }

    /// Check the health of a specific provider.
    ///
    /// Performs a health check on the provider and updates the health
    /// status with current metrics.
    ///
    /// # Arguments
    ///
    /// * `provider_id` - The provider identifier
    /// * `provider` - The provider instance to check
    ///
    /// # Returns
    ///
    /// Returns the updated health status for the provider.
    pub async fn check_provider_health(
        &mut self,
        provider_id: &str,
        provider: &Arc<dyn Provider>,
    ) -> ProviderHealthStatus {
        let start = Instant::now();

        match provider.health_check().await {
            Ok(health) => {
                let latency = start.elapsed().as_millis() as f64;

                let status = self
                    .provider_health
                    .entry(provider_id.to_string())
                    .or_default();
                status.provider_id = provider_id.to_string();
                status.status = health.status;
                status.last_check_timestamp = chrono::Utc::now().timestamp() as u64;
                status.consecutive_failures = 0;

                // Update average latency with an exponential moving average
                if status.average_latency_ms == 0.0 {
                    status.average_latency_ms = latency;
                } else {
                    status.average_latency_ms = status.average_latency_ms * 0.8 + latency * 0.2;
                }

                // Update success rate (EMA nudged toward 1.0 on success)
                status.success_rate = status.success_rate * 0.9 + 0.1;

                status.clone()
            }
            Err(_) => {
                let status = self
                    .provider_health
                    .entry(provider_id.to_string())
                    .or_default();
                status.provider_id = provider_id.to_string();
                status.status = HealthStatus::Unhealthy;
                status.last_check_timestamp = chrono::Utc::now().timestamp() as u64;
                status.consecutive_failures += 1;

                // Update success rate (EMA decayed toward 0.0 on failure)
                status.success_rate *= 0.9;

                status.clone()
            }
        }
    }

    /// Return the IDs of providers currently considered usable: status
    /// `Healthy` or `Degraded` with fewer than three consecutive failures.
    pub fn get_healthy_providers(&self) -> Vec<String> {
        self.provider_health
            .iter()
            .filter(|(_, health)| {
                matches!(
                    health.status,
                    HealthStatus::Healthy | HealthStatus::Degraded
                ) && health.consecutive_failures < 3
            })
            .map(|(id, _)| id.clone())
            .collect()
    }

    /// Decide whether a provider should receive traffic based on its success
    /// rate (compared against the configured failover threshold, default 0.8)
    /// and its consecutive-failure count. Unknown providers are allowed.
    pub fn should_use_provider(&self, provider_id: &str) -> bool {
        if let Some(health) = self.provider_health.get(provider_id) {
            let config = self.config.as_ref();
            let failover_threshold = config.map(|c| c.failover_threshold).unwrap_or(0.8);

            health.success_rate >= failover_threshold && health.consecutive_failures < 5
        } else {
            true // Allow unknown providers to be tried
        }
    }

    /// Return a snapshot of the health status for all tracked providers.
    pub fn get_provider_stats(&self) -> HashMap<String, ProviderHealthStatus> {
        self.provider_health.clone()
    }

    /// Check whether a provider is healthy enough to route to; providers that
    /// have never been checked are treated as healthy.
    pub fn is_provider_healthy(&self, provider_id: &str) -> bool {
        if let Some(health) = self.provider_health.get(provider_id) {
            matches!(
                health.status,
                HealthStatus::Healthy | HealthStatus::Degraded
            ) && health.consecutive_failures < 3
        } else {
            true
        }
    }
}

/// Router that layers provider health tracking and failover on top of the
/// base SDK `Router`.
pub struct AdvancedRouter {
    /// Underlying routing strategy implementation from the models SDK
    base_router: Router,
    /// Shared global health checker
    health_checker: Arc<RwLock<HealthChecker>>,
    /// Routing configuration (health check interval, failover threshold, ...)
    config: RoutingConfig,
}

impl AdvancedRouter {
    /// Create a new advanced router with the given strategy and configuration.
    ///
    /// The router shares the global health checker instance.
    pub fn new(strategy: RoutingStrategy, config: RoutingConfig) -> Self {
        let health_checker = get_health_checker().clone();

        Self {
            base_router: Router::new(strategy),
            health_checker,
            config,
        }
    }

    /// Select a provider for a request, considering only providers that the
    /// health checker currently reports as healthy.
    pub async fn select_provider(
        &self,
        providers: &HashMap<String, Arc<dyn Provider>>,
        context: &RoutingContext,
    ) -> Option<ProviderSelection> {
        // First, filter out unhealthy providers
        let health_checker = self.health_checker.read().await;
        let healthy_provider_ids: Vec<String> = providers
            .keys()
            .filter(|id| health_checker.is_provider_healthy(id))
            .cloned()
            .collect();

        drop(health_checker);

        if healthy_provider_ids.is_empty() {
            tracing::warn!("No healthy providers available");
            return None;
        }

        // Use the base router to select from healthy providers
        self.base_router
            .select_provider(&healthy_provider_ids, context)
    }

    /// Record the outcome of a request against a provider, updating its
    /// consecutive-failure count, success rate, and average latency.
    pub async fn update_provider_health(&self, provider_id: &str, success: bool, latency_ms: u64) {
        let mut health_checker = self.health_checker.write().await;
        let status = health_checker
            .provider_health
            .entry(provider_id.to_string())
            .or_default();

        if success {
            status.consecutive_failures = 0;
            status.success_rate = status.success_rate * 0.9 + 0.1;
        } else {
            status.consecutive_failures += 1;
            status.success_rate *= 0.9;
        }

        // Update average latency with an exponential moving average
        if status.average_latency_ms == 0.0 {
            status.average_latency_ms = latency_ms as f64;
        } else {
            status.average_latency_ms = status.average_latency_ms * 0.8 + latency_ms as f64 * 0.2;
        }

        status.last_check_timestamp = chrono::Utc::now().timestamp() as u64;
    }

    /// Spawn a background task that periodically checks the health of all
    /// providers at the configured `health_check_interval`.
    pub async fn start_health_monitoring(&self, providers: HashMap<String, Arc<dyn Provider>>) {
        let health_checker = self.health_checker.clone();
        let interval = self.config.health_check_interval;

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;

                let mut checker = health_checker.write().await;

                for (provider_id, provider) in &providers {
                    let _health_status = checker.check_provider_health(provider_id, provider).await;

                    tracing::debug!(
                        provider = provider_id,
                        status = ?_health_status.status,
                        latency_ms = _health_status.average_latency_ms,
                        "Provider health check completed"
                    );
                }

                drop(checker);

                // Brief pause after each round of checks to avoid hammering providers
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        });
    }

    /// Aggregate the current provider health data into a `RoutingStats` summary.
    pub async fn get_routing_stats(&self) -> RoutingStats {
        let health_checker = self.health_checker.read().await;
        let provider_stats = health_checker.get_provider_stats();

        let total_providers = provider_stats.len();
        let healthy_providers = provider_stats
            .values()
            .filter(|status| matches!(status.status, HealthStatus::Healthy))
            .count();
        let degraded_providers = provider_stats
            .values()
            .filter(|status| matches!(status.status, HealthStatus::Degraded))
            .count();
        let unhealthy_providers = provider_stats
            .values()
            .filter(|status| matches!(status.status, HealthStatus::Unhealthy))
            .count();

        let average_latency = if !provider_stats.is_empty() {
            provider_stats
                .values()
                .map(|status| status.average_latency_ms)
                .sum::<f64>()
                / provider_stats.len() as f64
        } else {
            0.0
        };

        RoutingStats {
            total_providers,
            healthy_providers,
            degraded_providers,
            unhealthy_providers,
            average_latency_ms: average_latency,
            provider_details: provider_stats,
        }
    }
}

/// Aggregated routing and provider health statistics, as returned by
/// `AdvancedRouter::get_routing_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingStats {
    pub total_providers: usize,
    pub healthy_providers: usize,
    pub degraded_providers: usize,
    pub unhealthy_providers: usize,
    pub average_latency_ms: f64,
    pub provider_details: HashMap<String, ProviderHealthStatus>,
}

// Failover and circuit breaker functionality

/// Configuration for a provider circuit breaker.
#[derive(Debug, Clone)]
pub struct CircuitBreaker {
    /// Consecutive failures required to open the circuit
    failure_threshold: u32,
    /// How long to wait before probing an open circuit again
    recovery_timeout: Duration,
    #[allow(dead_code)]
    half_open_max_calls: u32,
}

impl Default for CircuitBreaker {
    fn default() -> Self {
        Self {
            failure_threshold: 5,
            recovery_timeout: Duration::from_secs(60),
            half_open_max_calls: 3,
        }
    }
}

/// Circuit breaker states.
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitState {
    Closed,   // Normal operation
    Open,     // Failing, rejecting requests
    HalfOpen, // Testing if service has recovered
}

/// Per-provider circuit breaker state machine.
pub struct ProviderCircuitBreaker {
    state: CircuitState,
    failure_count: u32,
    last_failure_time: Option<Instant>,
    config: CircuitBreaker,
}

impl ProviderCircuitBreaker {
    /// Create a circuit breaker in the closed state with the given configuration.
    pub fn new(config: CircuitBreaker) -> Self {
        Self {
            state: CircuitState::Closed,
            failure_count: 0,
            last_failure_time: None,
            config,
        }
    }

    /// Check whether a request may be executed, transitioning from open to
    /// half-open once the recovery timeout has elapsed.
    pub fn can_execute(&mut self) -> bool {
        match self.state {
            CircuitState::Closed => true,
            CircuitState::Open => {
                if let Some(last_failure) = self.last_failure_time {
                    if last_failure.elapsed() >= self.config.recovery_timeout {
                        self.state = CircuitState::HalfOpen;
                        self.failure_count = 0;
                        true
                    } else {
                        false
                    }
                } else {
                    false
                }
            }
            CircuitState::HalfOpen => true,
        }
    }

    /// Record a successful call, closing the circuit and clearing failure state.
    pub fn record_success(&mut self) {
        self.failure_count = 0;
        self.state = CircuitState::Closed;
        self.last_failure_time = None;
    }

    /// Record a failed call, opening the circuit once the failure threshold is
    /// reached (or immediately if the call was a half-open probe).
    pub fn record_failure(&mut self) {
        self.failure_count += 1;
        self.last_failure_time = Some(Instant::now());

        match self.state {
            CircuitState::Closed => {
                if self.failure_count >= self.config.failure_threshold {
                    self.state = CircuitState::Open;
                }
            }
            CircuitState::HalfOpen => {
                self.state = CircuitState::Open;
            }
            CircuitState::Open => {
                // Already open, do nothing
            }
        }
    }

    /// Return the current circuit state.
    pub fn get_state(&self) -> CircuitState {
        self.state.clone()
    }
}
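
// A minimal test sketch (an addition, not part of the original module) that
// exercises the circuit breaker transitions and the optimistic defaults for
// unknown providers; the test names and the shortened recovery timeout are
// illustrative assumptions.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn circuit_breaker_opens_after_threshold_and_recovers() {
        let config = CircuitBreaker {
            failure_threshold: 2,
            recovery_timeout: Duration::from_secs(0),
            half_open_max_calls: 3,
        };
        let mut breaker = ProviderCircuitBreaker::new(config);

        // Closed: requests allowed.
        assert!(breaker.can_execute());

        // Two failures reach the threshold and open the circuit.
        breaker.record_failure();
        breaker.record_failure();
        assert_eq!(breaker.get_state(), CircuitState::Open);

        // With a zero recovery timeout the next probe moves the breaker to half-open.
        assert!(breaker.can_execute());
        assert_eq!(breaker.get_state(), CircuitState::HalfOpen);

        // A successful probe closes the circuit again.
        breaker.record_success();
        assert_eq!(breaker.get_state(), CircuitState::Closed);
    }

    #[test]
    fn unknown_providers_are_allowed_by_default() {
        let checker = HealthChecker::new();
        assert!(checker.is_provider_healthy("never-seen"));
        assert!(checker.should_use_provider("never-seen"));
    }
}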