Skip to main content

batuta/serve/
router.rs

1//! Spillover Router for Hybrid Cloud
2//!
3//! Implements dynamic routing logic to "spill" excess local traffic
4//! to remote APIs when local queue depth exceeds thresholds.
5//!
6//! Toyota Way: "Heijunka" (Level Loading) across backends.
7
8use crate::serve::backends::{PrivacyTier, ServingBackend};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
14// ============================================================================
15// SERVE-RTR-001: Queue Metrics
16// ============================================================================
17
/// Atomic request counters for a single backend.
///
/// All fields are atomics, so every method takes `&self` and the struct can be
/// updated through a shared reference.
#[derive(Debug, Default)]
pub struct QueueMetrics {
    /// Current queue depth (requests started but not yet completed)
    depth: AtomicUsize,
    /// Total requests processed
    total_requests: AtomicU64,
    /// Total latency in milliseconds (for averaging)
    total_latency_ms: AtomicU64,
    /// Requests completed since the last `reset_recent` / `take_recent`
    recent_requests: AtomicU64,
}

impl QueueMetrics {
    /// Create new metrics with every counter at zero
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Increment queue depth (request started)
    pub fn enqueue(&self) {
        self.depth.fetch_add(1, Ordering::SeqCst);
    }

    /// Record a completed request: decrement queue depth and accumulate its latency.
    ///
    /// The depth saturates at zero. A completion that arrives without a matching
    /// [`enqueue`](Self::enqueue) is still counted in the totals but leaves the
    /// depth at zero.
    pub fn dequeue(&self, latency_ms: u64) {
        // A plain `fetch_sub` wraps to `usize::MAX` when the depth is already 0,
        // which would make the backend look permanently full. `checked_sub`
        // yields `None` at zero, so `fetch_update` leaves the value untouched;
        // the returned `Err` only reports that no update happened.
        let _ = self
            .depth
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |depth| depth.checked_sub(1));
        self.total_requests.fetch_add(1, Ordering::SeqCst);
        self.total_latency_ms.fetch_add(latency_ms, Ordering::SeqCst);
        self.recent_requests.fetch_add(1, Ordering::SeqCst);
    }

    /// Get current queue depth
    #[must_use]
    pub fn depth(&self) -> usize {
        self.depth.load(Ordering::SeqCst)
    }

    /// Get average latency in milliseconds, or `0.0` if nothing has completed yet
    #[must_use]
    pub fn avg_latency_ms(&self) -> f64 {
        let total = self.total_requests.load(Ordering::SeqCst);
        if total == 0 {
            0.0
        } else {
            self.total_latency_ms.load(Ordering::SeqCst) as f64 / total as f64
        }
    }

    /// Get total requests processed
    #[must_use]
    pub fn total_requests(&self) -> u64 {
        self.total_requests.load(Ordering::SeqCst)
    }

    /// Reset recent request counter (for rate calculation)
    pub fn reset_recent(&self) {
        self.recent_requests.store(0, Ordering::SeqCst);
    }

    /// Get recent requests and reset the counter to zero in one atomic swap
    #[must_use]
    pub fn take_recent(&self) -> u64 {
        self.recent_requests.swap(0, Ordering::SeqCst)
    }
}
85
86// ============================================================================
87// SERVE-RTR-002: Router Configuration
88// ============================================================================
89
90/// Spillover routing configuration
91#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct RouterConfig {
93    /// Queue depth threshold before spillover
94    pub spillover_threshold: usize,
95    /// Maximum queue depth (reject requests)
96    pub max_queue_depth: usize,
97    /// Target latency SLA in milliseconds
98    pub latency_sla_ms: u64,
99    /// Privacy tier for routing decisions
100    pub privacy: PrivacyTier,
101    /// Preferred local backend
102    pub local_backend: ServingBackend,
103    /// Spillover backends in priority order
104    pub spillover_backends: Vec<ServingBackend>,
105    /// Enable spillover (can disable for testing)
106    pub spillover_enabled: bool,
107}
108
109impl Default for RouterConfig {
110    fn default() -> Self {
111        Self {
112            spillover_threshold: 10,
113            max_queue_depth: 50,
114            latency_sla_ms: 1000, // 1 second
115            privacy: PrivacyTier::Standard,
116            local_backend: ServingBackend::Realizar,
117            spillover_backends: vec![
118                ServingBackend::Groq,      // Fastest
119                ServingBackend::Together,  // Cost-effective
120                ServingBackend::Fireworks, // Good balance
121            ],
122            spillover_enabled: true,
123        }
124    }
125}
126
127impl RouterConfig {
128    /// Create sovereign config (no spillover to public APIs)
129    #[must_use]
130    pub fn sovereign() -> Self {
131        Self {
132            privacy: PrivacyTier::Sovereign,
133            spillover_backends: vec![ServingBackend::Ollama, ServingBackend::LlamaCpp],
134            spillover_enabled: true,
135            ..Default::default()
136        }
137    }
138
139    /// Create config with custom threshold
140    #[must_use]
141    pub fn with_threshold(threshold: usize) -> Self {
142        Self { spillover_threshold: threshold, ..Default::default() }
143    }
144}
145
146// ============================================================================
147// SERVE-RTR-003: Routing Decision
148// ============================================================================
149
150/// Routing decision result
151#[derive(Debug, Clone, PartialEq, Eq)]
152pub enum RoutingDecision {
153    /// Route to primary local backend
154    Local(ServingBackend),
155    /// Spillover to remote backend
156    Spillover(ServingBackend),
157    /// Reject request (queue full)
158    Reject(RejectReason),
159}
160
/// Why a request was refused
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RejectReason {
    /// The queue has reached its maximum depth
    QueueFull,
    /// There is no backend to send the request to
    NoBackends,
    /// Serving the request would break a privacy constraint
    PrivacyViolation,
}

impl std::fmt::Display for RejectReason {
    /// Writes the fixed, human-readable message for this reason.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::QueueFull => "Queue full, try again later",
            Self::NoBackends => "No backends available",
            Self::PrivacyViolation => "Request violates privacy constraints",
        };
        f.write_str(message)
    }
}
181
182// ============================================================================
183// SERVE-RTR-004: Spillover Router
184// ============================================================================
185
186/// Spillover router for hybrid cloud routing
187pub struct SpilloverRouter {
188    config: RouterConfig,
189    /// Metrics per backend
190    metrics: HashMap<ServingBackend, QueueMetrics>,
191    /// Last metrics window time
192    last_window: std::sync::RwLock<Instant>,
193    /// Window duration for rate calculation
194    window_duration: Duration,
195}
196
197impl SpilloverRouter {
198    /// Create a new spillover router
199    #[must_use]
200    pub fn new(config: RouterConfig) -> Self {
201        let mut metrics = HashMap::new();
202        metrics.insert(config.local_backend, QueueMetrics::new());
203        for backend in &config.spillover_backends {
204            metrics.insert(*backend, QueueMetrics::new());
205        }
206
207        Self {
208            config,
209            metrics,
210            last_window: std::sync::RwLock::new(Instant::now()),
211            window_duration: Duration::from_secs(60),
212        }
213    }
214
215    /// Create with default config
216    #[must_use]
217    pub fn with_defaults() -> Self {
218        Self::new(RouterConfig::default())
219    }
220
221    /// Route a request
222    #[must_use]
223    pub fn route(&self) -> RoutingDecision {
224        // Get local queue metrics
225        let local_depth = self.backend_depth(self.config.local_backend);
226
227        // Check if we should reject
228        if local_depth >= self.config.max_queue_depth {
229            // Check spillover backends
230            if self.config.spillover_enabled {
231                if let Some(backend) = self.find_available_spillover() {
232                    return RoutingDecision::Spillover(backend);
233                }
234            }
235            return RoutingDecision::Reject(RejectReason::QueueFull);
236        }
237
238        // Check if we should spillover
239        if self.config.spillover_enabled && local_depth >= self.config.spillover_threshold {
240            if let Some(backend) = self.find_available_spillover() {
241                return RoutingDecision::Spillover(backend);
242            }
243        }
244
245        // Route to local
246        RoutingDecision::Local(self.config.local_backend)
247    }
248
249    /// Get queue depth for a backend, returning 0 if unknown
250    fn backend_depth(&self, backend: ServingBackend) -> usize {
251        self.metrics.get(&backend).map_or(0, QueueMetrics::depth)
252    }
253
254    /// Find an available spillover backend
255    fn find_available_spillover(&self) -> Option<ServingBackend> {
256        self.config
257            .spillover_backends
258            .iter()
259            .copied()
260            .filter(|b| self.config.privacy.allows(*b))
261            .find(|b| self.backend_depth(*b) < self.config.max_queue_depth)
262    }
263
264    /// Record request start
265    pub fn start_request(&self, backend: ServingBackend) {
266        if let Some(metrics) = self.metrics.get(&backend) {
267            metrics.enqueue();
268        }
269    }
270
271    /// Record request completion
272    pub fn complete_request(&self, backend: ServingBackend, latency_ms: u64) {
273        if let Some(metrics) = self.metrics.get(&backend) {
274            metrics.dequeue(latency_ms);
275        }
276    }
277
278    /// Get current queue depth for a backend
279    #[must_use]
280    pub fn queue_depth(&self, backend: ServingBackend) -> usize {
281        self.backend_depth(backend)
282    }
283
284    /// Get total local queue depth
285    #[must_use]
286    pub fn local_queue_depth(&self) -> usize {
287        self.queue_depth(self.config.local_backend)
288    }
289
290    /// Get router statistics
291    #[must_use]
292    pub fn stats(&self) -> RouterStats {
293        let local_latency =
294            self.metrics.get(&self.config.local_backend).map_or(0.0, QueueMetrics::avg_latency_ms);
295
296        let spillover_depth: usize =
297            self.config.spillover_backends.iter().map(|b| self.backend_depth(*b)).sum();
298
299        RouterStats {
300            local_queue_depth: self.local_queue_depth(),
301            local_avg_latency_ms: local_latency,
302            spillover_queue_depth: spillover_depth,
303            spillover_threshold: self.config.spillover_threshold,
304            max_queue_depth: self.config.max_queue_depth,
305            spillover_enabled: self.config.spillover_enabled,
306        }
307    }
308
309    /// Get config
310    #[must_use]
311    pub fn config(&self) -> &RouterConfig {
312        &self.config
313    }
314
315    /// Check if currently spilling over
316    #[must_use]
317    pub fn is_spilling(&self) -> bool {
318        self.local_queue_depth() >= self.config.spillover_threshold
319    }
320}
321
322impl Default for SpilloverRouter {
323    fn default() -> Self {
324        Self::with_defaults()
325    }
326}
327
/// Point-in-time snapshot of router state.
#[derive(Debug, Clone, Default)]
pub struct RouterStats {
    /// Queue depth of the local backend
    pub local_queue_depth: usize,
    /// Average latency of the local backend in milliseconds
    pub local_avg_latency_ms: f64,
    /// Combined queue depth of the spillover backends
    pub spillover_queue_depth: usize,
    /// Local queue depth at which spillover starts
    pub spillover_threshold: usize,
    /// Queue depth at which a backend is considered full
    pub max_queue_depth: usize,
    /// Whether spillover is enabled
    pub spillover_enabled: bool,
}

impl RouterStats {
    /// Queue utilization as percentage of `max_queue_depth`.
    ///
    /// Returns `0.0` when `max_queue_depth` is zero, avoiding a division by zero.
    #[must_use]
    pub fn utilization(&self) -> f64 {
        if self.max_queue_depth == 0 {
            0.0
        } else {
            (self.local_queue_depth as f64 / self.max_queue_depth as f64) * 100.0
        }
    }

    /// Check if approaching spillover.
    ///
    /// True once the local queue depth reaches 80% of the spillover threshold,
    /// with the 80% mark rounded down. For a threshold of 0 or 1 that mark is 0,
    /// so this is always true.
    #[must_use]
    pub fn near_spillover(&self) -> bool {
        // Widen to u128 first: `threshold * 80` overflows `usize` for large
        // thresholds (e.g. `usize::MAX` used as "never spill"), which panics in
        // debug builds and wraps in release builds. Results for values that did
        // not overflow are unchanged.
        let near_mark = self.spillover_threshold as u128 * 80 / 100;
        self.local_queue_depth as u128 >= near_mark
    }
}
356
357// ============================================================================
358// Tests
359// ============================================================================
360
#[cfg(test)]
// Test names embed the requirement ID (e.g. `SERVE_RTR_001`) in upper case.
#[allow(non_snake_case)]
mod tests {
    use super::*;

    /// Create a router with custom threshold and max queue depth.
    /// Spillover is enabled by default.
    fn router_with(threshold: usize, max_depth: usize) -> SpilloverRouter {
        SpilloverRouter::new(RouterConfig {
            spillover_threshold: threshold,
            max_queue_depth: max_depth,
            ..Default::default()
        })
    }

    /// Enqueue `n` local (Realizar) requests on the router.
    fn fill_local_queue(router: &SpilloverRouter, n: usize) {
        for _ in 0..n {
            router.start_request(ServingBackend::Realizar);
        }
    }

    // ========================================================================
    // SERVE-RTR-001: Queue Metrics Tests
    // ========================================================================

    #[test]
    fn test_SERVE_RTR_001_metrics_new() {
        let metrics = QueueMetrics::new();
        assert_eq!(metrics.depth(), 0);
        assert_eq!(metrics.total_requests(), 0);
    }

    #[test]
    fn test_SERVE_RTR_001_enqueue_dequeue() {
        let metrics = QueueMetrics::new();
        metrics.enqueue();
        assert_eq!(metrics.depth(), 1);

        metrics.enqueue();
        assert_eq!(metrics.depth(), 2);

        metrics.dequeue(100);
        assert_eq!(metrics.depth(), 1);
        assert_eq!(metrics.total_requests(), 1);
    }

    #[test]
    fn test_SERVE_RTR_001_avg_latency() {
        let metrics = QueueMetrics::new();
        metrics.enqueue();
        metrics.dequeue(100);
        metrics.enqueue();
        metrics.dequeue(200);

        assert_eq!(metrics.avg_latency_ms(), 150.0);
    }

    #[test]
    fn test_SERVE_RTR_001_avg_latency_empty() {
        let metrics = QueueMetrics::new();
        assert_eq!(metrics.avg_latency_ms(), 0.0);
    }

    // ========================================================================
    // SERVE-RTR-002: Router Config Tests
    // ========================================================================

    #[test]
    fn test_SERVE_RTR_002_default_config() {
        let config = RouterConfig::default();
        assert_eq!(config.spillover_threshold, 10);
        assert_eq!(config.max_queue_depth, 50);
        assert!(config.spillover_enabled);
    }

    #[test]
    fn test_SERVE_RTR_002_sovereign_config() {
        let config = RouterConfig::sovereign();
        assert_eq!(config.privacy, PrivacyTier::Sovereign);
        // Should only have local backends
        for backend in &config.spillover_backends {
            assert!(backend.is_local());
        }
    }

    #[test]
    fn test_SERVE_RTR_002_custom_threshold() {
        let config = RouterConfig::with_threshold(5);
        assert_eq!(config.spillover_threshold, 5);
    }

    // ========================================================================
    // SERVE-RTR-003: Routing Decision Tests
    // ========================================================================

    #[test]
    fn test_SERVE_RTR_003_route_local_empty_queue() {
        let router = SpilloverRouter::with_defaults();
        let decision = router.route();
        assert!(matches!(decision, RoutingDecision::Local(_)));
    }

    #[test]
    fn test_SERVE_RTR_003_route_spillover_when_busy() {
        let router = router_with(2, 10);

        // Fill local queue past threshold
        fill_local_queue(&router, 3);

        let decision = router.route();
        assert!(matches!(decision, RoutingDecision::Spillover(_)));
    }

    #[test]
    fn test_SERVE_RTR_003_route_reject_when_full() {
        let router = SpilloverRouter::new(RouterConfig {
            spillover_threshold: 2,
            max_queue_depth: 3,
            spillover_enabled: false, // Disable spillover
            ..Default::default()
        });

        // Fill queue to max
        fill_local_queue(&router, 3);

        let decision = router.route();
        assert!(matches!(decision, RoutingDecision::Reject(RejectReason::QueueFull)));
    }

    // ========================================================================
    // SERVE-RTR-004: Spillover Router Tests
    // ========================================================================

    #[test]
    fn test_SERVE_RTR_004_queue_depth() {
        let router = SpilloverRouter::with_defaults();
        assert_eq!(router.local_queue_depth(), 0);

        router.start_request(ServingBackend::Realizar);
        assert_eq!(router.local_queue_depth(), 1);

        router.complete_request(ServingBackend::Realizar, 50);
        assert_eq!(router.local_queue_depth(), 0);
    }

    #[test]
    fn test_SERVE_RTR_004_is_spilling() {
        let router = router_with(2, 50);

        assert!(!router.is_spilling());

        fill_local_queue(&router, 2);

        assert!(router.is_spilling());
    }

    // ========================================================================
    // SERVE-RTR-005: Statistics Tests
    // ========================================================================

    #[test]
    fn test_SERVE_RTR_005_stats() {
        let router = SpilloverRouter::with_defaults();
        let stats = router.stats();
        assert_eq!(stats.local_queue_depth, 0);
        assert!(stats.spillover_enabled);
    }

    #[test]
    fn test_SERVE_RTR_005_utilization() {
        let router = router_with(10, 100);

        // Add 25 requests
        fill_local_queue(&router, 25);

        let stats = router.stats();
        assert_eq!(stats.utilization(), 25.0);
    }

    #[test]
    fn test_SERVE_RTR_005_near_spillover() {
        let router = router_with(10, 50);

        // 80% of threshold = 8
        fill_local_queue(&router, 8);

        let stats = router.stats();
        assert!(stats.near_spillover());
    }

    // ========================================================================
    // SERVE-RTR-006: Privacy Tests
    // ========================================================================

    #[test]
    fn test_SERVE_RTR_006_sovereign_no_public_spillover() {
        let router = SpilloverRouter::new(RouterConfig::sovereign());

        // Fill local queue
        fill_local_queue(&router, 15);

        let decision = router.route();
        // Should spillover to local backend only
        match decision {
            RoutingDecision::Spillover(backend) => assert!(backend.is_local()),
            RoutingDecision::Local(_) => {}  // Also acceptable
            RoutingDecision::Reject(_) => {} // If no local backends available
        }
    }

    // ========================================================================
    // SERVE-RTR-007: Reject Reason Display Tests
    // ========================================================================

    #[test]
    fn test_SERVE_RTR_007_reject_reason_display() {
        assert!(RejectReason::QueueFull.to_string().contains("Queue"));
        assert!(RejectReason::NoBackends.to_string().contains("backend"));
        assert!(RejectReason::PrivacyViolation.to_string().contains("privacy"));
    }

    // ========================================================================
    // Additional coverage tests
    // ========================================================================

    #[test]
    fn test_queue_metrics_reset_recent() {
        let metrics = QueueMetrics::new();
        metrics.enqueue();
        metrics.dequeue(50);
        assert_eq!(metrics.total_requests(), 1);
        metrics.reset_recent();
        // The recent counter must actually be cleared; without this check a
        // no-op `reset_recent` would pass the test.
        assert_eq!(metrics.take_recent(), 0);
        // total_requests unchanged by the reset
        assert_eq!(metrics.total_requests(), 1);
    }

    #[test]
    fn test_queue_metrics_take_recent() {
        let metrics = QueueMetrics::new();
        metrics.enqueue();
        metrics.dequeue(100);
        metrics.enqueue();
        metrics.dequeue(200);
        let recent = metrics.take_recent();
        assert_eq!(recent, 2);
        // After take, recent should be 0
        let recent_after = metrics.take_recent();
        assert_eq!(recent_after, 0);
    }

    #[test]
    fn test_queue_metrics_default() {
        let metrics = QueueMetrics::default();
        assert_eq!(metrics.depth(), 0);
        assert_eq!(metrics.total_requests(), 0);
        assert_eq!(metrics.avg_latency_ms(), 0.0);
    }

    #[test]
    fn test_router_stats_default() {
        let stats = RouterStats::default();
        assert_eq!(stats.local_queue_depth, 0);
        assert_eq!(stats.local_avg_latency_ms, 0.0);
        assert_eq!(stats.spillover_queue_depth, 0);
        assert_eq!(stats.spillover_threshold, 0);
        assert_eq!(stats.max_queue_depth, 0);
        assert!(!stats.spillover_enabled);
    }

    #[test]
    fn test_router_stats_utilization_zero_max() {
        let stats = RouterStats { max_queue_depth: 0, local_queue_depth: 5, ..Default::default() };
        assert_eq!(stats.utilization(), 0.0);
    }

    #[test]
    fn test_router_stats_near_spillover_false() {
        let stats =
            RouterStats { spillover_threshold: 100, local_queue_depth: 10, ..Default::default() };
        assert!(!stats.near_spillover());
    }

    #[test]
    fn test_spillover_router_default() {
        let router = SpilloverRouter::default();
        assert_eq!(router.local_queue_depth(), 0);
        assert!(!router.is_spilling());
    }

    #[test]
    fn test_spillover_router_config_accessor() {
        let config = RouterConfig::with_threshold(42);
        let router = SpilloverRouter::new(config);
        assert_eq!(router.config().spillover_threshold, 42);
    }

    #[test]
    fn test_routing_decision_equality() {
        let local1 = RoutingDecision::Local(ServingBackend::Realizar);
        let local2 = RoutingDecision::Local(ServingBackend::Realizar);
        assert_eq!(local1, local2);

        let spillover1 = RoutingDecision::Spillover(ServingBackend::Groq);
        let spillover2 = RoutingDecision::Spillover(ServingBackend::Groq);
        assert_eq!(spillover1, spillover2);

        let reject1 = RoutingDecision::Reject(RejectReason::QueueFull);
        let reject2 = RoutingDecision::Reject(RejectReason::QueueFull);
        assert_eq!(reject1, reject2);
    }

    #[test]
    fn test_routing_decision_inequality() {
        let local = RoutingDecision::Local(ServingBackend::Realizar);
        let spillover = RoutingDecision::Spillover(ServingBackend::Groq);
        assert_ne!(local, spillover);
    }

    #[test]
    fn test_reject_reason_equality() {
        assert_eq!(RejectReason::QueueFull, RejectReason::QueueFull);
        assert_eq!(RejectReason::NoBackends, RejectReason::NoBackends);
        assert_eq!(RejectReason::PrivacyViolation, RejectReason::PrivacyViolation);
    }

    #[test]
    fn test_reject_reason_inequality() {
        assert_ne!(RejectReason::QueueFull, RejectReason::NoBackends);
        assert_ne!(RejectReason::NoBackends, RejectReason::PrivacyViolation);
    }

    #[test]
    fn test_queue_depth_for_unknown_backend() {
        let router = SpilloverRouter::with_defaults();
        // Query depth for backend not in config
        let depth = router.queue_depth(ServingBackend::Anthropic);
        assert_eq!(depth, 0);
    }

    #[test]
    fn test_start_request_unknown_backend() {
        let router = SpilloverRouter::with_defaults();
        // Should not panic for unknown backend
        router.start_request(ServingBackend::Anthropic);
        assert_eq!(router.queue_depth(ServingBackend::Anthropic), 0);
    }

    #[test]
    fn test_complete_request_unknown_backend() {
        let router = SpilloverRouter::with_defaults();
        // Should not panic for unknown backend
        router.complete_request(ServingBackend::Anthropic, 100);
    }

    #[test]
    fn test_router_stats_local_avg_latency() {
        let router = SpilloverRouter::with_defaults();
        router.start_request(ServingBackend::Realizar);
        router.complete_request(ServingBackend::Realizar, 100);
        router.start_request(ServingBackend::Realizar);
        router.complete_request(ServingBackend::Realizar, 200);

        let stats = router.stats();
        assert_eq!(stats.local_avg_latency_ms, 150.0);
    }

    #[test]
    fn test_router_config_latency_sla() {
        let config = RouterConfig::default();
        assert_eq!(config.latency_sla_ms, 1000);
    }

    #[test]
    fn test_router_config_local_backend() {
        let config = RouterConfig::default();
        assert_eq!(config.local_backend, ServingBackend::Realizar);
    }

    #[test]
    fn test_router_config_spillover_backends() {
        let config = RouterConfig::default();
        assert!(!config.spillover_backends.is_empty());
        assert!(config.spillover_backends.contains(&ServingBackend::Groq));
    }
}