ultrafast_gateway/advanced_routing.rs
//! # Advanced Routing and Load Balancing Module
//!
//! This module provides intelligent request routing, load balancing, and provider
//! health monitoring for the Ultrafast Gateway. It includes circuit breakers,
//! health checks, and adaptive routing strategies.
//!
//! ## Overview
//!
//! The advanced routing system provides:
//! - **Intelligent Provider Selection**: Route requests to the best available provider
//! - **Health Monitoring**: Real-time provider health tracking
//! - **Circuit Breakers**: Automatic failover and recovery
//! - **Load Balancing**: Distribute requests across healthy providers
//! - **Adaptive Routing**: Dynamic routing based on performance metrics
//! - **Failover Strategies**: Automatic fallback to backup providers
//!
//! ## Routing Strategies
//!
//! The system supports multiple routing strategies:
//!
//! - **Single Provider**: Route all requests to a single provider
//! - **Load Balancing**: Distribute requests across multiple providers
//! - **Failover**: Primary provider with automatic fallback
//! - **A/B Testing**: Route requests to different providers for testing
//! - **Geographic Routing**: Route based on geographic location
//! - **Cost-Based Routing**: Route to the most cost-effective provider
//!
//! ## Health Monitoring
//!
//! Real-time provider health tracking includes:
//!
//! - **Latency Monitoring**: Track response times for each provider
//! - **Success Rate Tracking**: Monitor success/failure ratios
//! - **Consecutive Failures**: Track consecutive failure counts
//! - **Automatic Health Checks**: Periodic provider health verification
//! - **Degraded State Detection**: Identify underperforming providers
//!
//! ## Circuit Breaker Pattern
//!
//! The system implements a circuit breaker for each provider:
//!
//! - **Closed State**: Normal operation, requests allowed
//! - **Open State**: Provider failing, requests blocked
//! - **Half-Open State**: Testing if provider has recovered
//! - **Automatic Recovery**: Automatic state transitions based on health
//!
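//! The state machine can be exercised directly via the `ProviderCircuitBreaker`
//! type defined below; a minimal sketch (timing and error handling elided):
//!
//! ```rust,ignore
//! use ultrafast_gateway::advanced_routing::{
//!     CircuitBreaker, CircuitState, ProviderCircuitBreaker,
//! };
//!
//! let mut breaker = ProviderCircuitBreaker::new(CircuitBreaker::default());
//! assert!(breaker.can_execute()); // Closed: requests flow normally
//!
//! // Five consecutive failures (the default threshold) open the circuit.
//! for _ in 0..5 {
//!     breaker.record_failure();
//! }
//! assert_eq!(breaker.get_state(), CircuitState::Open);
//!
//! // Once `recovery_timeout` elapses, the next `can_execute()` call moves the
//! // breaker to Half-Open; a recorded success then closes it again.
//! ```
//!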
//! ## Usage
//!
//! The example below is illustrative (`providers` and `context` are assumed
//! to be constructed elsewhere), so it is marked `ignore`:
//!
//! ```rust,ignore
//! use std::time::Duration;
//!
//! use ultrafast_gateway::advanced_routing::AdvancedRouter;
//! use ultrafast_gateway::config::RoutingConfig;
//! use ultrafast_models_sdk::routing::RoutingStrategy;
//!
//! // Create the advanced router.
//! let config = RoutingConfig {
//!     strategy: RoutingStrategy::LoadBalancing,
//!     health_check_interval: Duration::from_secs(30),
//!     failover_threshold: 0.8,
//! };
//!
//! let router = AdvancedRouter::new(RoutingStrategy::LoadBalancing, config);
//!
//! // Select a provider for the request.
//! let selection = router.select_provider(&providers, &context).await;
//!
//! // Update provider health after the request completes (success, 150 ms).
//! router.update_provider_health("openai", true, 150).await;
//! ```
//!
//! ## Health Check Configuration
//!
//! Health checks can be configured via `RoutingConfig`:
//!
//! ```toml
//! [routing]
//! strategy = "load_balancing"
//! health_check_interval = "30s"
//! failover_threshold = 0.8
//! ```
//!
//! ## Performance Metrics
//!
//! The routing system tracks comprehensive metrics:
//!
//! - **Provider Latency**: Average response times per provider
//! - **Success Rates**: Smoothed request success ratios (0.0 to 1.0)
//! - **Failure Counts**: Consecutive failure tracking
//! - **Health Status**: Current health state of each provider
//! - **Routing Decisions**: Provider selection statistics
//!
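//! Latency and success rate are smoothed with exponential moving averages;
//! the weights below mirror the updates performed in `check_provider_health`
//! and `update_provider_health` in this module:
//!
//! ```rust,ignore
//! // Latency EMA (alpha = 0.2):
//! avg_latency_ms = avg_latency_ms * 0.8 + sample_ms * 0.2;
//!
//! // Success-rate EMA (alpha = 0.1): decays toward 1.0 on success,
//! // toward 0.0 on failure.
//! success_rate = if success {
//!     success_rate * 0.9 + 0.1
//! } else {
//!     success_rate * 0.9
//! };
//! ```
//!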
//! ## Failover Strategies
//!
//! The system supports multiple failover strategies:
//!
//! - **Immediate Failover**: Switch to backup on first failure
//! - **Threshold-Based**: Switch after N consecutive failures (see the sketch after this list)
//! - **Latency-Based**: Switch when latency exceeds threshold
//! - **Cost-Based**: Switch to cheaper provider when possible
//!
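//! Threshold-based failover is the strategy implemented by
//! `HealthChecker::should_use_provider` below: a provider is skipped once its
//! smoothed success rate falls below `failover_threshold` (default 0.8) or it
//! reaches five consecutive failures. A minimal sketch:
//!
//! ```rust,ignore
//! let checker = HealthChecker::new();
//!
//! // Providers with no recorded history are allowed so they can be tried.
//! assert!(checker.should_use_provider("openai"));
//! ```
//!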
//! ## Monitoring and Alerts
//!
//! The routing system provides monitoring capabilities:
//!
//! - **Health Status Endpoints**: Real-time provider health
//! - **Performance Metrics**: Detailed routing statistics
//! - **Circuit Breaker Status**: Current state of each provider
//! - **Alert Integration**: Notifications for unhealthy providers

// Advanced routing and load balancing module with provider health checks
use crate::config::RoutingConfig;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use ultrafast_models_sdk::providers::{HealthStatus, Provider};
use ultrafast_models_sdk::routing::{ProviderSelection, Router, RoutingContext, RoutingStrategy};

/// Global health checker storage for thread-safe access.
///
/// Uses `OnceLock` to ensure the health checker is initialized only once
/// and shared across all threads.
static HEALTH_CHECKER: OnceLock<Arc<RwLock<HealthChecker>>> = OnceLock::new();

/// Get the global health checker instance.
///
/// Returns a reference to the global health checker, initializing it on
/// first access. Every `AdvancedRouter` created in this process shares
/// this single instance.
fn get_health_checker() -> &'static Arc<RwLock<HealthChecker>> {
    HEALTH_CHECKER.get_or_init(|| Arc::new(RwLock::new(HealthChecker::new())))
}

/// Health status for a specific provider.
///
/// Tracks the current health state, performance metrics, and failure
/// statistics for a provider.
///
/// # Example
///
/// ```rust,ignore
/// let status = ProviderHealthStatus {
///     provider_id: "openai".to_string(),
///     status: HealthStatus::Healthy,
///     last_check_timestamp: chrono::Utc::now().timestamp() as u64,
///     consecutive_failures: 0,
///     average_latency_ms: 150.0,
///     success_rate: 0.99,
/// };
/// ```
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProviderHealthStatus {
    /// Provider identifier
    pub provider_id: String,
    /// Current health status (Healthy, Degraded, Unhealthy, Unknown)
    pub status: HealthStatus,
    /// Unix timestamp (seconds) of the last health check
    pub last_check_timestamp: u64,
    /// Number of consecutive failures
    pub consecutive_failures: u32,
    /// Average response latency in milliseconds (exponential moving average)
    pub average_latency_ms: f64,
    /// Success rate as a fraction from 0.0 to 1.0
    pub success_rate: f64,
}

impl Default for ProviderHealthStatus {
    fn default() -> Self {
        Self {
            provider_id: String::new(),
            status: HealthStatus::Unknown,
            last_check_timestamp: chrono::Utc::now().timestamp() as u64,
            consecutive_failures: 0,
            average_latency_ms: 0.0,
            success_rate: 1.0,
        }
    }
}

/// Health checker for monitoring provider health and performance.
///
/// Manages health checks, tracks performance metrics, and provides
/// health status information for routing decisions.
///
/// # Thread Safety
///
/// The checker itself holds no locks; callers wrap it in `Arc<RwLock<_>>`
/// (see `HEALTH_CHECKER`) for concurrent use.
///
/// # Example
///
/// ```rust,ignore
/// let mut health_checker = HealthChecker::new();
/// health_checker.set_config(routing_config);
///
/// let status = health_checker.check_provider_health("openai", &provider).await;
/// ```
#[derive(Debug)]
pub struct HealthChecker {
    /// Health status for each provider
    provider_health: HashMap<String, ProviderHealthStatus>,
    /// Routing configuration for health check settings
    config: Option<RoutingConfig>,
}

impl Default for HealthChecker {
    fn default() -> Self {
        Self::new()
    }
}

impl HealthChecker {
    /// Create a new health checker instance.
    ///
    /// Initializes an empty health checker with no configuration.
    /// Use `set_config()` to configure health check behavior.
    ///
    /// # Returns
    ///
    /// Returns a new `HealthChecker` instance.
    pub fn new() -> Self {
        Self {
            provider_health: HashMap::new(),
            config: None,
        }
    }

    /// Set the routing configuration for health checks.
    ///
    /// Configures health check intervals, thresholds, and other
    /// routing-related settings.
    ///
    /// # Arguments
    ///
    /// * `config` - Routing configuration with health check settings
    pub fn set_config(&mut self, config: RoutingConfig) {
        self.config = Some(config);
    }

    /// Check the health of a specific provider.
    ///
    /// Performs a health check on the provider and updates the stored health
    /// status with current metrics.
    ///
    /// # Arguments
    ///
    /// * `provider_id` - The provider identifier
    /// * `provider` - The provider instance to check
    ///
    /// # Returns
    ///
    /// Returns the updated health status for the provider.
    pub async fn check_provider_health(
        &mut self,
        provider_id: &str,
        provider: &Arc<dyn Provider>,
    ) -> ProviderHealthStatus {
        let start = Instant::now();

        match provider.health_check().await {
            Ok(health) => {
                let latency = start.elapsed().as_millis() as f64;

                let status = self
                    .provider_health
                    .entry(provider_id.to_string())
                    .or_default();
                status.provider_id = provider_id.to_string();
                status.status = health.status;
                status.last_check_timestamp = chrono::Utc::now().timestamp() as u64;
                status.consecutive_failures = 0;

                // Update average latency with an exponential moving average
                // (alpha = 0.2); seed with the first sample.
                if status.average_latency_ms == 0.0 {
                    status.average_latency_ms = latency;
                } else {
                    status.average_latency_ms = status.average_latency_ms * 0.8 + latency * 0.2;
                }

                // Move the smoothed success rate toward 1.0.
                status.success_rate = status.success_rate * 0.9 + 0.1;

                status.clone()
            }
            Err(_) => {
                let status = self
                    .provider_health
                    .entry(provider_id.to_string())
                    .or_default();
                status.provider_id = provider_id.to_string();
                status.status = HealthStatus::Unhealthy;
                status.last_check_timestamp = chrono::Utc::now().timestamp() as u64;
                status.consecutive_failures += 1;

                // Move the smoothed success rate toward 0.0.
                status.success_rate *= 0.9;

                status.clone()
            }
        }
    }

    /// Return the IDs of providers currently considered usable
    /// (Healthy or Degraded with fewer than three consecutive failures).
    pub fn get_healthy_providers(&self) -> Vec<String> {
        self.provider_health
            .iter()
            .filter(|(_, health)| {
                matches!(
                    health.status,
                    HealthStatus::Healthy | HealthStatus::Degraded
                ) && health.consecutive_failures < 3
            })
            .map(|(id, _)| id.clone())
            .collect()
    }

    /// Decide whether a provider should receive traffic based on its smoothed
    /// success rate and consecutive-failure count.
    pub fn should_use_provider(&self, provider_id: &str) -> bool {
        if let Some(health) = self.provider_health.get(provider_id) {
            let config = self.config.as_ref();
            let failover_threshold = config.map(|c| c.failover_threshold).unwrap_or(0.8);

            health.success_rate >= failover_threshold && health.consecutive_failures < 5
        } else {
            true // Allow unknown providers to be tried
        }
    }

    /// Return a snapshot of the health status for all tracked providers.
    pub fn get_provider_stats(&self) -> HashMap<String, ProviderHealthStatus> {
        self.provider_health.clone()
    }

    /// Check whether a single provider is currently considered healthy;
    /// unknown providers default to healthy.
    pub fn is_provider_healthy(&self, provider_id: &str) -> bool {
        if let Some(health) = self.provider_health.get(provider_id) {
            matches!(
                health.status,
                HealthStatus::Healthy | HealthStatus::Degraded
            ) && health.consecutive_failures < 3
        } else {
            true
        }
    }
}

/// Router that layers health-aware filtering on top of the SDK's base
/// routing strategies.
pub struct AdvancedRouter {
    base_router: Router,
    health_checker: Arc<RwLock<HealthChecker>>,
    config: RoutingConfig,
}

impl AdvancedRouter {
    /// Create a new advanced router with the given strategy and configuration.
    ///
    /// All routers share the process-wide global health checker.
    pub fn new(strategy: RoutingStrategy, config: RoutingConfig) -> Self {
        let health_checker = get_health_checker().clone();

        Self {
            base_router: Router::new(strategy),
            health_checker,
            config,
        }
    }

    /// Select a provider for the request, considering only providers the
    /// health checker currently deems usable.
    pub async fn select_provider(
        &self,
        providers: &HashMap<String, Arc<dyn Provider>>,
        context: &RoutingContext,
    ) -> Option<ProviderSelection> {
        // First, filter out unhealthy providers.
        let health_checker = self.health_checker.read().await;
        let healthy_provider_ids: Vec<String> = providers
            .keys()
            .filter(|id| health_checker.is_provider_healthy(id))
            .cloned()
            .collect();

        drop(health_checker);

        if healthy_provider_ids.is_empty() {
            tracing::warn!("No healthy providers available");
            return None;
        }

        // Use the base router to select from the healthy providers.
        self.base_router
            .select_provider(&healthy_provider_ids, context)
    }

    /// Record the outcome of a request against a provider, updating its
    /// smoothed success rate, latency, and failure counters.
    pub async fn update_provider_health(&self, provider_id: &str, success: bool, latency_ms: u64) {
        let mut health_checker = self.health_checker.write().await;
        let status = health_checker
            .provider_health
            .entry(provider_id.to_string())
            .or_default();

        if success {
            status.consecutive_failures = 0;
            status.success_rate = status.success_rate * 0.9 + 0.1;
        } else {
            status.consecutive_failures += 1;
            status.success_rate *= 0.9;
        }

        // Update the latency moving average (alpha = 0.2); seed with the
        // first sample.
        if status.average_latency_ms == 0.0 {
            status.average_latency_ms = latency_ms as f64;
        } else {
            status.average_latency_ms = status.average_latency_ms * 0.8 + latency_ms as f64 * 0.2;
        }

        status.last_check_timestamp = chrono::Utc::now().timestamp() as u64;
    }

    /// Spawn a background task that periodically health-checks every provider.
    pub async fn start_health_monitoring(&self, providers: HashMap<String, Arc<dyn Provider>>) {
        let health_checker = self.health_checker.clone();
        let interval = self.config.health_check_interval;

        tokio::spawn(async move {
            let mut interval_timer = tokio::time::interval(interval);

            loop {
                interval_timer.tick().await;

                let mut checker = health_checker.write().await;

                for (provider_id, provider) in &providers {
                    let health_status = checker.check_provider_health(provider_id, provider).await;

                    tracing::debug!(
                        provider = provider_id,
                        status = ?health_status.status,
                        latency_ms = health_status.average_latency_ms,
                        "Provider health check completed"
                    );

                    // Small delay between provider checks to avoid
                    // overwhelming the backends.
                    tokio::time::sleep(Duration::from_millis(100)).await;
                }

                drop(checker);
            }
        });
    }

    /// Build an aggregate snapshot of provider health for monitoring
    /// endpoints.
    pub async fn get_routing_stats(&self) -> RoutingStats {
        let health_checker = self.health_checker.read().await;
        let provider_stats = health_checker.get_provider_stats();

        let total_providers = provider_stats.len();
        let healthy_providers = provider_stats
            .values()
            .filter(|status| matches!(status.status, HealthStatus::Healthy))
            .count();
        let degraded_providers = provider_stats
            .values()
            .filter(|status| matches!(status.status, HealthStatus::Degraded))
            .count();
        let unhealthy_providers = provider_stats
            .values()
            .filter(|status| matches!(status.status, HealthStatus::Unhealthy))
            .count();

        let average_latency = if !provider_stats.is_empty() {
            provider_stats
                .values()
                .map(|status| status.average_latency_ms)
                .sum::<f64>()
                / provider_stats.len() as f64
        } else {
            0.0
        };

        RoutingStats {
            total_providers,
            healthy_providers,
            degraded_providers,
            unhealthy_providers,
            average_latency_ms: average_latency,
            provider_details: provider_stats,
        }
    }
}

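/// Aggregated routing statistics returned by `AdvancedRouter::get_routing_stats`.
///
/// The struct derives `Serialize`, so it can be exposed directly from a
/// monitoring endpoint. A minimal sketch, assuming `serde_json` is available
/// as a dependency:
///
/// ```rust,ignore
/// let stats = router.get_routing_stats().await;
/// let body = serde_json::to_string_pretty(&stats)?;
/// println!("{body}");
/// ```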
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoutingStats {
    pub total_providers: usize,
    pub healthy_providers: usize,
    pub degraded_providers: usize,
    pub unhealthy_providers: usize,
    pub average_latency_ms: f64,
    pub provider_details: HashMap<String, ProviderHealthStatus>,
}

// Failover and circuit breaker functionality

/// Configuration for a provider circuit breaker.
#[derive(Debug, Clone)]
pub struct CircuitBreaker {
    /// Consecutive failures required to open the circuit.
    failure_threshold: u32,
    /// How long the circuit stays open before a recovery probe is allowed.
    recovery_timeout: Duration,
    /// Maximum probe calls allowed while half-open (not yet enforced;
    /// see `can_execute`).
    #[allow(dead_code)]
    half_open_max_calls: u32,
}

impl Default for CircuitBreaker {
    fn default() -> Self {
        Self {
            failure_threshold: 5,
            recovery_timeout: Duration::from_secs(60),
            half_open_max_calls: 3,
        }
    }
}

/// State of a provider circuit breaker.
#[derive(Debug, Clone, PartialEq)]
pub enum CircuitState {
    Closed,   // Normal operation
    Open,     // Failing, rejecting requests
    HalfOpen, // Testing if service has recovered
}

/// Per-provider circuit breaker state machine.
pub struct ProviderCircuitBreaker {
    state: CircuitState,
    failure_count: u32,
    last_failure_time: Option<Instant>,
    config: CircuitBreaker,
}

impl ProviderCircuitBreaker {
    pub fn new(config: CircuitBreaker) -> Self {
        Self {
            state: CircuitState::Closed,
            failure_count: 0,
            last_failure_time: None,
            config,
        }
    }

    /// Return whether a request may be attempted, transitioning from Open to
    /// Half-Open once the recovery timeout has elapsed.
    pub fn can_execute(&mut self) -> bool {
        match self.state {
            CircuitState::Closed => true,
            CircuitState::Open => {
                if let Some(last_failure) = self.last_failure_time {
                    if last_failure.elapsed() >= self.config.recovery_timeout {
                        self.state = CircuitState::HalfOpen;
                        self.failure_count = 0;
                        true
                    } else {
                        false
                    }
                } else {
                    false
                }
            }
            CircuitState::HalfOpen => true,
        }
    }

    /// Record a successful call: reset failures and close the circuit.
    pub fn record_success(&mut self) {
        self.failure_count = 0;
        self.state = CircuitState::Closed;
        self.last_failure_time = None;
    }

    /// Record a failed call, opening the circuit once the failure threshold
    /// is reached (a single failure while Half-Open reopens it immediately).
    pub fn record_failure(&mut self) {
        self.failure_count += 1;
        self.last_failure_time = Some(Instant::now());

        match self.state {
            CircuitState::Closed => {
                if self.failure_count >= self.config.failure_threshold {
                    self.state = CircuitState::Open;
                }
            }
            CircuitState::HalfOpen => {
                self.state = CircuitState::Open;
            }
            CircuitState::Open => {
                // Already open, do nothing
            }
        }
    }

    /// Return the current circuit state.
    pub fn get_state(&self) -> CircuitState {
        self.state.clone()
    }
}
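
// A minimal test sketch exercising the circuit breaker state machine and the
// health checker defaults defined above. The timing-dependent Open -> HalfOpen
// transition is left untested here because it requires waiting out
// `recovery_timeout`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn circuit_breaker_opens_after_threshold_and_closes_on_success() {
        let mut breaker = ProviderCircuitBreaker::new(CircuitBreaker::default());
        assert_eq!(breaker.get_state(), CircuitState::Closed);
        assert!(breaker.can_execute());

        // Five consecutive failures (the default threshold) open the circuit.
        for _ in 0..5 {
            breaker.record_failure();
        }
        assert_eq!(breaker.get_state(), CircuitState::Open);
        assert!(!breaker.can_execute());

        // A recorded success closes the circuit again.
        breaker.record_success();
        assert_eq!(breaker.get_state(), CircuitState::Closed);
        assert!(breaker.can_execute());
    }

    #[test]
    fn unknown_providers_are_allowed_by_default() {
        let checker = HealthChecker::new();
        assert!(checker.should_use_provider("never-seen"));
        assert!(checker.is_provider_healthy("never-seen"));
        assert!(checker.get_healthy_providers().is_empty());
    }
}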