1use crate::serve::backends::{PrivacyTier, ServingBackend};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
12use std::time::{Duration, Instant};
13
/// Lock-free counters describing the request queue of a single serving backend.
///
/// Every counter is an atomic updated with `Ordering::SeqCst`, so the metrics can be
/// mutated through a shared reference (`&self`).
#[derive(Debug, Default)]
pub struct QueueMetrics {
    /// Requests started via `enqueue` and not yet completed via `dequeue`.
    depth: AtomicUsize,
    /// Number of completions recorded via `dequeue`.
    total_requests: AtomicU64,
    /// Sum of all latencies (milliseconds) passed to `dequeue`.
    total_latency_ms: AtomicU64,
    /// Completions since the last `reset_recent` or `take_recent` call.
    recent_requests: AtomicU64,
}

impl QueueMetrics {
    /// Creates metrics with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Records that a request entered the queue (increments the depth gauge).
    pub fn enqueue(&self) {
        self.depth.fetch_add(1, Ordering::SeqCst);
    }

    /// Records that a request finished after `latency_ms` milliseconds.
    ///
    /// Decrements the depth gauge and bumps the total, latency and recent counters.
    /// The depth saturates at zero. A completion without a matching `enqueue` leaves the
    /// depth at 0 instead of wrapping to `usize::MAX`, which would make the queue look
    /// permanently full.
    pub fn dequeue(&self, latency_ms: u64) {
        // `checked_sub` yields `None` at zero, so `fetch_update` leaves the value untouched
        // and returns `Err`; that outcome is expected and deliberately ignored.
        let _ = self
            .depth
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |depth| depth.checked_sub(1));
        self.total_requests.fetch_add(1, Ordering::SeqCst);
        self.total_latency_ms.fetch_add(latency_ms, Ordering::SeqCst);
        self.recent_requests.fetch_add(1, Ordering::SeqCst);
    }

    /// Current number of in-flight requests.
    #[must_use]
    pub fn depth(&self) -> usize {
        self.depth.load(Ordering::SeqCst)
    }

    /// Mean latency in milliseconds over all recorded completions, or `0.0` if none.
    ///
    /// The two counters are loaded separately. Under concurrent updates the ratio may
    /// therefore combine values observed at slightly different moments.
    #[must_use]
    pub fn avg_latency_ms(&self) -> f64 {
        let total = self.total_requests.load(Ordering::SeqCst);
        if total == 0 {
            0.0
        } else {
            self.total_latency_ms.load(Ordering::SeqCst) as f64 / total as f64
        }
    }

    /// Total number of completions recorded since creation.
    #[must_use]
    pub fn total_requests(&self) -> u64 {
        self.total_requests.load(Ordering::SeqCst)
    }

    /// Zeroes the recent-completions counter without reading it.
    pub fn reset_recent(&self) {
        self.recent_requests.store(0, Ordering::SeqCst);
    }

    /// Returns the completions recorded since the last reset/take and atomically
    /// resets that counter to zero.
    #[must_use]
    pub fn take_recent(&self) -> u64 {
        self.recent_requests.swap(0, Ordering::SeqCst)
    }
}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
92pub struct RouterConfig {
93 pub spillover_threshold: usize,
95 pub max_queue_depth: usize,
97 pub latency_sla_ms: u64,
99 pub privacy: PrivacyTier,
101 pub local_backend: ServingBackend,
103 pub spillover_backends: Vec<ServingBackend>,
105 pub spillover_enabled: bool,
107}
108
109impl Default for RouterConfig {
110 fn default() -> Self {
111 Self {
112 spillover_threshold: 10,
113 max_queue_depth: 50,
114 latency_sla_ms: 1000, privacy: PrivacyTier::Standard,
116 local_backend: ServingBackend::Realizar,
117 spillover_backends: vec![
118 ServingBackend::Groq, ServingBackend::Together, ServingBackend::Fireworks, ],
122 spillover_enabled: true,
123 }
124 }
125}
126
127impl RouterConfig {
128 #[must_use]
130 pub fn sovereign() -> Self {
131 Self {
132 privacy: PrivacyTier::Sovereign,
133 spillover_backends: vec![ServingBackend::Ollama, ServingBackend::LlamaCpp],
134 spillover_enabled: true,
135 ..Default::default()
136 }
137 }
138
139 #[must_use]
141 pub fn with_threshold(threshold: usize) -> Self {
142 Self { spillover_threshold: threshold, ..Default::default() }
143 }
144}
145
146#[derive(Debug, Clone, PartialEq, Eq)]
152pub enum RoutingDecision {
153 Local(ServingBackend),
155 Spillover(ServingBackend),
157 Reject(RejectReason),
159}
160
/// Why a request was refused.
///
/// NOTE(review): the routing code in this module only produces `QueueFull`;
/// `NoBackends` and `PrivacyViolation` are not emitted there.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum RejectReason {
    /// The local queue reached its maximum depth and no spillover was possible.
    QueueFull,
    /// No backend could take the request.
    NoBackends,
    /// Serving the request would break the configured privacy constraints.
    PrivacyViolation,
}

impl std::fmt::Display for RejectReason {
    /// Writes a short, human-readable explanation of the rejection.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::QueueFull => "Queue full, try again later",
            Self::NoBackends => "No backends available",
            Self::PrivacyViolation => "Request violates privacy constraints",
        };
        f.write_str(message)
    }
}
181
182pub struct SpilloverRouter {
188 config: RouterConfig,
189 metrics: HashMap<ServingBackend, QueueMetrics>,
191 last_window: std::sync::RwLock<Instant>,
193 window_duration: Duration,
195}
196
197impl SpilloverRouter {
198 #[must_use]
200 pub fn new(config: RouterConfig) -> Self {
201 let mut metrics = HashMap::new();
202 metrics.insert(config.local_backend, QueueMetrics::new());
203 for backend in &config.spillover_backends {
204 metrics.insert(*backend, QueueMetrics::new());
205 }
206
207 Self {
208 config,
209 metrics,
210 last_window: std::sync::RwLock::new(Instant::now()),
211 window_duration: Duration::from_secs(60),
212 }
213 }
214
215 #[must_use]
217 pub fn with_defaults() -> Self {
218 Self::new(RouterConfig::default())
219 }
220
221 #[must_use]
223 pub fn route(&self) -> RoutingDecision {
224 let local_depth = self.backend_depth(self.config.local_backend);
226
227 if local_depth >= self.config.max_queue_depth {
229 if self.config.spillover_enabled {
231 if let Some(backend) = self.find_available_spillover() {
232 return RoutingDecision::Spillover(backend);
233 }
234 }
235 return RoutingDecision::Reject(RejectReason::QueueFull);
236 }
237
238 if self.config.spillover_enabled && local_depth >= self.config.spillover_threshold {
240 if let Some(backend) = self.find_available_spillover() {
241 return RoutingDecision::Spillover(backend);
242 }
243 }
244
245 RoutingDecision::Local(self.config.local_backend)
247 }
248
249 fn backend_depth(&self, backend: ServingBackend) -> usize {
251 self.metrics.get(&backend).map_or(0, QueueMetrics::depth)
252 }
253
254 fn find_available_spillover(&self) -> Option<ServingBackend> {
256 self.config
257 .spillover_backends
258 .iter()
259 .copied()
260 .filter(|b| self.config.privacy.allows(*b))
261 .find(|b| self.backend_depth(*b) < self.config.max_queue_depth)
262 }
263
264 pub fn start_request(&self, backend: ServingBackend) {
266 if let Some(metrics) = self.metrics.get(&backend) {
267 metrics.enqueue();
268 }
269 }
270
271 pub fn complete_request(&self, backend: ServingBackend, latency_ms: u64) {
273 if let Some(metrics) = self.metrics.get(&backend) {
274 metrics.dequeue(latency_ms);
275 }
276 }
277
278 #[must_use]
280 pub fn queue_depth(&self, backend: ServingBackend) -> usize {
281 self.backend_depth(backend)
282 }
283
284 #[must_use]
286 pub fn local_queue_depth(&self) -> usize {
287 self.queue_depth(self.config.local_backend)
288 }
289
290 #[must_use]
292 pub fn stats(&self) -> RouterStats {
293 let local_latency =
294 self.metrics.get(&self.config.local_backend).map_or(0.0, QueueMetrics::avg_latency_ms);
295
296 let spillover_depth: usize =
297 self.config.spillover_backends.iter().map(|b| self.backend_depth(*b)).sum();
298
299 RouterStats {
300 local_queue_depth: self.local_queue_depth(),
301 local_avg_latency_ms: local_latency,
302 spillover_queue_depth: spillover_depth,
303 spillover_threshold: self.config.spillover_threshold,
304 max_queue_depth: self.config.max_queue_depth,
305 spillover_enabled: self.config.spillover_enabled,
306 }
307 }
308
309 #[must_use]
311 pub fn config(&self) -> &RouterConfig {
312 &self.config
313 }
314
315 #[must_use]
317 pub fn is_spilling(&self) -> bool {
318 self.local_queue_depth() >= self.config.spillover_threshold
319 }
320}
321
322impl Default for SpilloverRouter {
323 fn default() -> Self {
324 Self::with_defaults()
325 }
326}
327
/// Point-in-time snapshot of router state, as produced by `SpilloverRouter::stats`.
#[derive(Debug, Clone, Default)]
pub struct RouterStats {
    /// In-flight requests on the local backend.
    pub local_queue_depth: usize,
    /// Mean completed-request latency on the local backend, in milliseconds.
    pub local_avg_latency_ms: f64,
    /// In-flight requests summed over all spillover backends.
    pub spillover_queue_depth: usize,
    /// Configured spillover threshold.
    pub spillover_threshold: usize,
    /// Configured maximum local queue depth.
    pub max_queue_depth: usize,
    /// Whether spillover is enabled.
    pub spillover_enabled: bool,
}

impl RouterStats {
    /// Local queue depth as a percentage of `max_queue_depth`, or `0.0` when the max is 0.
    ///
    /// Nothing bounds `local_queue_depth` here, so the value can exceed `100.0`.
    #[must_use]
    pub fn utilization(&self) -> f64 {
        if self.max_queue_depth == 0 {
            0.0
        } else {
            (self.local_queue_depth as f64 / self.max_queue_depth as f64) * 100.0
        }
    }

    /// Returns `true` once the local depth reaches 80% (rounded down) of the spillover
    /// threshold.
    ///
    /// The cutoff `floor(threshold * 80 / 100)` is computed without forming
    /// `threshold * 80`. That product overflows for very large thresholds, such as
    /// `usize::MAX` used to effectively disable spillover.
    #[must_use]
    pub fn near_spillover(&self) -> bool {
        let threshold = self.spillover_threshold;
        // Split threshold = 100q + r so that floor(80t/100) = 80q + floor(80r/100), exactly.
        let cutoff = threshold / 100 * 80 + threshold % 100 * 80 / 100;
        self.local_queue_depth >= cutoff
    }
}
356
#[cfg(test)]
// Test names embed upper-case spec IDs such as `SERVE_RTR_001`.
#[allow(non_snake_case)]
mod tests {
    use super::*;

    /// Router with default settings except the given threshold and max depth.
    fn router_with(threshold: usize, max_depth: usize) -> SpilloverRouter {
        SpilloverRouter::new(RouterConfig {
            spillover_threshold: threshold,
            max_queue_depth: max_depth,
            ..Default::default()
        })
    }

    /// Starts `n` requests on the default local backend (`Realizar`).
    fn fill_local_queue(router: &SpilloverRouter, n: usize) {
        for _ in 0..n {
            router.start_request(ServingBackend::Realizar);
        }
    }

    #[test]
    fn test_SERVE_RTR_001_metrics_new() {
        let metrics = QueueMetrics::new();
        assert_eq!(metrics.depth(), 0);
        assert_eq!(metrics.total_requests(), 0);
    }

    #[test]
    fn test_SERVE_RTR_001_enqueue_dequeue() {
        let metrics = QueueMetrics::new();
        metrics.enqueue();
        assert_eq!(metrics.depth(), 1);

        metrics.enqueue();
        assert_eq!(metrics.depth(), 2);

        metrics.dequeue(100);
        assert_eq!(metrics.depth(), 1);
        assert_eq!(metrics.total_requests(), 1);
    }

    #[test]
    fn test_SERVE_RTR_001_avg_latency() {
        let metrics = QueueMetrics::new();
        metrics.enqueue();
        metrics.dequeue(100);
        metrics.enqueue();
        metrics.dequeue(200);

        assert_eq!(metrics.avg_latency_ms(), 150.0);
    }

    #[test]
    fn test_SERVE_RTR_001_avg_latency_empty() {
        let metrics = QueueMetrics::new();
        assert_eq!(metrics.avg_latency_ms(), 0.0);
    }

    #[test]
    fn test_SERVE_RTR_002_default_config() {
        let config = RouterConfig::default();
        assert_eq!(config.spillover_threshold, 10);
        assert_eq!(config.max_queue_depth, 50);
        assert!(config.spillover_enabled);
    }

    #[test]
    fn test_SERVE_RTR_002_sovereign_config() {
        let config = RouterConfig::sovereign();
        assert_eq!(config.privacy, PrivacyTier::Sovereign);
        for backend in &config.spillover_backends {
            assert!(backend.is_local());
        }
    }

    #[test]
    fn test_SERVE_RTR_002_custom_threshold() {
        let config = RouterConfig::with_threshold(5);
        assert_eq!(config.spillover_threshold, 5);
    }

    #[test]
    fn test_SERVE_RTR_003_route_local_empty_queue() {
        let router = SpilloverRouter::with_defaults();
        let decision = router.route();
        assert!(matches!(decision, RoutingDecision::Local(_)));
    }

    #[test]
    fn test_SERVE_RTR_003_route_spillover_when_busy() {
        let router = router_with(2, 10);

        fill_local_queue(&router, 3);

        let decision = router.route();
        assert!(matches!(decision, RoutingDecision::Spillover(_)));
    }

    #[test]
    fn test_SERVE_RTR_003_route_reject_when_full() {
        let router = SpilloverRouter::new(RouterConfig {
            spillover_threshold: 2,
            max_queue_depth: 3,
            spillover_enabled: false,
            ..Default::default()
        });

        fill_local_queue(&router, 3);

        let decision = router.route();
        assert!(matches!(decision, RoutingDecision::Reject(RejectReason::QueueFull)));
    }

    #[test]
    fn test_SERVE_RTR_003_route_local_when_spillover_disabled_below_max() {
        let router = SpilloverRouter::new(RouterConfig {
            spillover_threshold: 2,
            max_queue_depth: 10,
            spillover_enabled: false,
            ..Default::default()
        });

        // Past the threshold but below the max: with spillover off, stay local.
        fill_local_queue(&router, 5);

        assert_eq!(router.route(), RoutingDecision::Local(ServingBackend::Realizar));
    }

    #[test]
    fn test_SERVE_RTR_004_queue_depth() {
        let router = SpilloverRouter::with_defaults();
        assert_eq!(router.local_queue_depth(), 0);

        router.start_request(ServingBackend::Realizar);
        assert_eq!(router.local_queue_depth(), 1);

        router.complete_request(ServingBackend::Realizar, 50);
        assert_eq!(router.local_queue_depth(), 0);
    }

    #[test]
    fn test_SERVE_RTR_004_is_spilling() {
        let router = router_with(2, 50);

        assert!(!router.is_spilling());

        fill_local_queue(&router, 2);

        assert!(router.is_spilling());
    }

    #[test]
    fn test_SERVE_RTR_005_stats() {
        let router = SpilloverRouter::with_defaults();
        let stats = router.stats();
        assert_eq!(stats.local_queue_depth, 0);
        assert!(stats.spillover_enabled);
    }

    #[test]
    fn test_SERVE_RTR_005_utilization() {
        let router = router_with(10, 100);

        fill_local_queue(&router, 25);

        let stats = router.stats();
        assert_eq!(stats.utilization(), 25.0);
    }

    #[test]
    fn test_SERVE_RTR_005_near_spillover() {
        let router = router_with(10, 50);

        fill_local_queue(&router, 8);

        let stats = router.stats();
        assert!(stats.near_spillover());
    }

    #[test]
    fn test_SERVE_RTR_006_sovereign_no_public_spillover() {
        let router = SpilloverRouter::new(RouterConfig::sovereign());

        fill_local_queue(&router, 15);

        // Whatever the decision, it must never name a non-local spillover backend.
        match router.route() {
            RoutingDecision::Spillover(backend) => assert!(backend.is_local()),
            RoutingDecision::Local(_) | RoutingDecision::Reject(_) => {}
        }
    }

    #[test]
    fn test_SERVE_RTR_007_reject_reason_display() {
        assert!(RejectReason::QueueFull.to_string().contains("Queue"));
        assert!(RejectReason::NoBackends.to_string().contains("backend"));
        assert!(RejectReason::PrivacyViolation.to_string().contains("privacy"));
    }

    #[test]
    fn test_queue_metrics_reset_recent() {
        let metrics = QueueMetrics::new();
        metrics.enqueue();
        metrics.dequeue(50);
        assert_eq!(metrics.total_requests(), 1);
        metrics.reset_recent();
        // The recent counter is cleared; the lifetime total is not.
        assert_eq!(metrics.take_recent(), 0);
        assert_eq!(metrics.total_requests(), 1);
    }

    #[test]
    fn test_queue_metrics_take_recent() {
        let metrics = QueueMetrics::new();
        metrics.enqueue();
        metrics.dequeue(100);
        metrics.enqueue();
        metrics.dequeue(200);
        let recent = metrics.take_recent();
        assert_eq!(recent, 2);
        let recent_after = metrics.take_recent();
        assert_eq!(recent_after, 0);
    }

    #[test]
    fn test_queue_metrics_default() {
        let metrics = QueueMetrics::default();
        assert_eq!(metrics.depth(), 0);
        assert_eq!(metrics.total_requests(), 0);
        assert_eq!(metrics.avg_latency_ms(), 0.0);
    }

    #[test]
    fn test_router_stats_default() {
        let stats = RouterStats::default();
        assert_eq!(stats.local_queue_depth, 0);
        assert_eq!(stats.local_avg_latency_ms, 0.0);
        assert_eq!(stats.spillover_queue_depth, 0);
        assert_eq!(stats.spillover_threshold, 0);
        assert_eq!(stats.max_queue_depth, 0);
        assert!(!stats.spillover_enabled);
    }

    #[test]
    fn test_router_stats_utilization_zero_max() {
        let stats = RouterStats { max_queue_depth: 0, local_queue_depth: 5, ..Default::default() };
        assert_eq!(stats.utilization(), 0.0);
    }

    #[test]
    fn test_router_stats_near_spillover_false() {
        let stats =
            RouterStats { spillover_threshold: 100, local_queue_depth: 10, ..Default::default() };
        assert!(!stats.near_spillover());
    }

    #[test]
    fn test_spillover_router_default() {
        let router = SpilloverRouter::default();
        assert_eq!(router.local_queue_depth(), 0);
        assert!(!router.is_spilling());
    }

    #[test]
    fn test_spillover_router_config_accessor() {
        let config = RouterConfig::with_threshold(42);
        let router = SpilloverRouter::new(config);
        assert_eq!(router.config().spillover_threshold, 42);
    }

    #[test]
    fn test_routing_decision_equality() {
        let local1 = RoutingDecision::Local(ServingBackend::Realizar);
        let local2 = RoutingDecision::Local(ServingBackend::Realizar);
        assert_eq!(local1, local2);

        let spillover1 = RoutingDecision::Spillover(ServingBackend::Groq);
        let spillover2 = RoutingDecision::Spillover(ServingBackend::Groq);
        assert_eq!(spillover1, spillover2);

        let reject1 = RoutingDecision::Reject(RejectReason::QueueFull);
        let reject2 = RoutingDecision::Reject(RejectReason::QueueFull);
        assert_eq!(reject1, reject2);
    }

    #[test]
    fn test_routing_decision_inequality() {
        let local = RoutingDecision::Local(ServingBackend::Realizar);
        let spillover = RoutingDecision::Spillover(ServingBackend::Groq);
        assert_ne!(local, spillover);
    }

    #[test]
    fn test_reject_reason_equality() {
        assert_eq!(RejectReason::QueueFull, RejectReason::QueueFull);
        assert_eq!(RejectReason::NoBackends, RejectReason::NoBackends);
        assert_eq!(RejectReason::PrivacyViolation, RejectReason::PrivacyViolation);
    }

    #[test]
    fn test_reject_reason_inequality() {
        assert_ne!(RejectReason::QueueFull, RejectReason::NoBackends);
        assert_ne!(RejectReason::NoBackends, RejectReason::PrivacyViolation);
    }

    #[test]
    fn test_queue_depth_for_unknown_backend() {
        let router = SpilloverRouter::with_defaults();
        // Anthropic is neither the local nor a default spillover backend.
        let depth = router.queue_depth(ServingBackend::Anthropic);
        assert_eq!(depth, 0);
    }

    #[test]
    fn test_start_request_unknown_backend() {
        let router = SpilloverRouter::with_defaults();
        router.start_request(ServingBackend::Anthropic);
        // Unregistered backends are ignored, so nothing is tracked.
        assert_eq!(router.queue_depth(ServingBackend::Anthropic), 0);
    }

    #[test]
    fn test_complete_request_unknown_backend() {
        let router = SpilloverRouter::with_defaults();
        router.complete_request(ServingBackend::Anthropic, 100);
        // Must be a silent no-op: no panic, and no registered queue is affected.
        assert_eq!(router.queue_depth(ServingBackend::Anthropic), 0);
        assert_eq!(router.local_queue_depth(), 0);
        assert_eq!(router.stats().local_avg_latency_ms, 0.0);
    }

    #[test]
    fn test_router_stats_local_avg_latency() {
        let router = SpilloverRouter::with_defaults();
        router.start_request(ServingBackend::Realizar);
        router.complete_request(ServingBackend::Realizar, 100);
        router.start_request(ServingBackend::Realizar);
        router.complete_request(ServingBackend::Realizar, 200);

        let stats = router.stats();
        assert_eq!(stats.local_avg_latency_ms, 150.0);
    }

    #[test]
    fn test_router_config_latency_sla() {
        let config = RouterConfig::default();
        assert_eq!(config.latency_sla_ms, 1000);
    }

    #[test]
    fn test_router_config_local_backend() {
        let config = RouterConfig::default();
        assert_eq!(config.local_backend, ServingBackend::Realizar);
    }

    #[test]
    fn test_router_config_spillover_backends() {
        let config = RouterConfig::default();
        assert!(!config.spillover_backends.is_empty());
        assert!(config.spillover_backends.contains(&ServingBackend::Groq));
    }
}