// nexus/queue/mod.rs
1//! Request queue for burst traffic handling
2//!
3//! Bounded dual-priority queue using tokio channels. Requests are queued when
4//! all backends are at capacity and drained as capacity becomes available.
5
6use crate::api::ChatCompletionRequest;
7use crate::config::QueueConfig;
8use crate::routing::reconciler::intent::RoutingIntent;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::sync::Arc;
11use std::time::Instant;
12use thiserror::Error;
13use tokio::sync::{mpsc, oneshot};
14
/// Priority level for queued requests
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Priority {
    High,
    Normal,
}

impl Priority {
    /// Parse priority from a header value.
    ///
    /// Matching is ASCII case-insensitive and ignores surrounding whitespace;
    /// anything other than "high" (including the empty string) defaults to
    /// `Normal`. Uses `eq_ignore_ascii_case` to avoid the per-call `String`
    /// allocation that `to_lowercase()` incurred.
    pub fn from_header(value: &str) -> Self {
        if value.trim().eq_ignore_ascii_case("high") {
            Priority::High
        } else {
            Priority::Normal
        }
    }
}
31
/// A request waiting in the queue.
///
/// Dropping `response_tx` without sending closes the oneshot channel; the
/// waiting handler observes a receive error and falls back to its own
/// timeout guard (see the note in `queue_drain_loop`).
pub struct QueuedRequest {
    /// Routing intent from the reconciler pipeline
    pub intent: RoutingIntent,
    /// Original chat completion request
    pub request: ChatCompletionRequest,
    /// Channel to send the response back to the waiting handler
    pub response_tx: oneshot::Sender<QueueResponse>,
    /// When the request was enqueued (used for drain-side timeout checks)
    pub enqueued_at: Instant,
    /// Request priority
    pub priority: Priority,
}
45
46impl std::fmt::Debug for QueuedRequest {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("QueuedRequest")
49            .field("priority", &self.priority)
50            .field("enqueued_at", &self.enqueued_at)
51            .finish()
52    }
53}
54
/// Response sent back through the oneshot channel: either a ready-to-return
/// axum response or an API error for the handler to convert.
pub type QueueResponse = Result<axum::response::Response, crate::api::ApiError>;
57
/// Errors from queue operations
#[derive(Debug, Error)]
pub enum QueueError {
    /// Queue is full (total depth == max_size).
    /// Carries the configured limit so callers can report it.
    #[error("Queue is full ({max_size} requests)")]
    Full { max_size: u32 },

    /// Queue is disabled via configuration
    #[error("Request queuing is disabled")]
    Disabled,
}
69
/// Bounded dual-priority request queue.
///
/// High-priority requests are dequeued before normal-priority requests.
/// Total depth across both channels respects `max_size` from config.
pub struct RequestQueue {
    // Sender for the high-priority lane.
    high_tx: mpsc::Sender<QueuedRequest>,
    // Receivers sit behind async mutexes so `try_dequeue` can take `&self`.
    high_rx: tokio::sync::Mutex<mpsc::Receiver<QueuedRequest>>,
    // Sender for the normal-priority lane.
    normal_tx: mpsc::Sender<QueuedRequest>,
    normal_rx: tokio::sync::Mutex<mpsc::Receiver<QueuedRequest>>,
    // Combined depth of both lanes; kept consistent by enqueue/try_dequeue.
    depth: Arc<AtomicUsize>,
    // Source of `max_size`, `max_wait_seconds`, and the enabled flag.
    config: QueueConfig,
}
82
83impl RequestQueue {
84    /// Create a new RequestQueue from configuration.
85    pub fn new(config: QueueConfig) -> Self {
86        let capacity = config.max_size as usize;
87        // Split capacity: high gets up to full capacity, normal gets up to full capacity.
88        // Total depth is tracked atomically and enforced in enqueue().
89        let (high_tx, high_rx) = mpsc::channel(capacity.max(1));
90        let (normal_tx, normal_rx) = mpsc::channel(capacity.max(1));
91
92        Self {
93            high_tx,
94            high_rx: tokio::sync::Mutex::new(high_rx),
95            normal_tx,
96            normal_rx: tokio::sync::Mutex::new(normal_rx),
97            depth: Arc::new(AtomicUsize::new(0)),
98            config,
99        }
100    }
101
102    /// Enqueue a request. Returns QueueError::Full if at capacity.
103    pub fn enqueue(&self, request: QueuedRequest) -> Result<(), QueueError> {
104        if !self.config.is_enabled() {
105            return Err(QueueError::Disabled);
106        }
107
108        // CAS loop to atomically check-and-increment depth, preventing TOCTOU race
109        loop {
110            let current = self.depth.load(Ordering::SeqCst);
111            if current >= self.config.max_size as usize {
112                return Err(QueueError::Full {
113                    max_size: self.config.max_size,
114                });
115            }
116            if self
117                .depth
118                .compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
119                .is_ok()
120            {
121                break;
122            }
123        }
124        metrics::gauge!("nexus_queue_depth").set(self.depth() as f64);
125
126        let tx = match request.priority {
127            Priority::High => &self.high_tx,
128            Priority::Normal => &self.normal_tx,
129        };
130
131        // try_send is non-blocking; if the channel is full, we already checked
132        // depth so this should succeed unless there's a race (acceptable).
133        if tx.try_send(request).is_err() {
134            self.depth.fetch_sub(1, Ordering::SeqCst);
135            metrics::gauge!("nexus_queue_depth").set(self.depth() as f64);
136            return Err(QueueError::Full {
137                max_size: self.config.max_size,
138            });
139        }
140
141        Ok(())
142    }
143
144    /// Try to dequeue a request. High priority is drained first.
145    pub async fn try_dequeue(&self) -> Option<QueuedRequest> {
146        // Try high priority first
147        {
148            let mut rx = self.high_rx.lock().await;
149            if let Ok(req) = rx.try_recv() {
150                self.depth.fetch_sub(1, Ordering::SeqCst);
151                metrics::gauge!("nexus_queue_depth").set(self.depth() as f64);
152                return Some(req);
153            }
154        }
155
156        // Then normal priority
157        {
158            let mut rx = self.normal_rx.lock().await;
159            if let Ok(req) = rx.try_recv() {
160                self.depth.fetch_sub(1, Ordering::SeqCst);
161                metrics::gauge!("nexus_queue_depth").set(self.depth() as f64);
162                return Some(req);
163            }
164        }
165
166        None
167    }
168
169    /// Current queue depth (high + normal)
170    pub fn depth(&self) -> usize {
171        self.depth.load(Ordering::SeqCst)
172    }
173
174    /// Queue configuration
175    pub fn config(&self) -> &QueueConfig {
176        &self.config
177    }
178}
179
180/// Background drain loop that processes queued requests as capacity becomes
181/// available.
182///
183/// Watches for backend capacity, dequeues requests, re-runs the reconciler
184/// pipeline, and sends responses via oneshot channels.
185pub async fn queue_drain_loop(
186    queue: Arc<RequestQueue>,
187    state: Arc<crate::api::AppState>,
188    cancel: tokio_util::sync::CancellationToken,
189) {
190    use crate::routing::reconciler::intent::TierEnforcementMode;
191    use crate::routing::RequestRequirements;
192    use std::time::Duration;
193
194    tracing::info!("Queue drain loop started");
195
196    loop {
197        tokio::select! {
198            _ = cancel.cancelled() => {
199                tracing::info!("Queue drain loop shutting down");
200                // Drain remaining requests with 503
201                drain_remaining(&queue).await;
202                break;
203            }
204            _ = tokio::time::sleep(Duration::from_millis(50)) => {
205                // Process queued requests
206                while let Some(queued) = queue.try_dequeue().await {
207                    let max_wait = Duration::from_secs(
208                        queue.config().max_wait_seconds
209                    );
210
211                    // Drain-side timeout: skip requests already expired before processing.
212                    // The handler also has its own timeout guard (tokio::time::timeout on
213                    // the oneshot rx), so a request may time out on either side.
214                    if queued.enqueued_at.elapsed() > max_wait {
215                        tracing::warn!(
216                            priority = ?queued.priority,
217                            waited_ms = queued.enqueued_at.elapsed().as_millis() as u64,
218                            "Queued request timed out"
219                        );
220                        let retry_after =
221                            queue.config().max_wait_seconds.to_string();
222                        let error_response = build_timeout_response(&retry_after);
223                        let _ = queued.response_tx.send(Ok(error_response));
224                        continue;
225                    }
226
227                    // Re-run routing
228                    let requirements =
229                        RequestRequirements::from_request(&queued.request);
230                    let result = state.router.select_backend(
231                        &requirements,
232                        Some(TierEnforcementMode::Strict),
233                    );
234
235                    match result {
236                        Ok(routing_result) => {
237                            // Route the request through the agent
238                            let response = process_queued_request(
239                                &state,
240                                &routing_result,
241                                &queued.request,
242                            )
243                            .await;
244                            let _ = queued.response_tx.send(response);
245                        }
246                        Err(_) => {
247                            // Still no capacity, re-enqueue if not timed out
248                            if queued.enqueued_at.elapsed() < max_wait {
249                                let re_queued = QueuedRequest {
250                                    intent: queued.intent,
251                                    request: queued.request,
252                                    response_tx: queued.response_tx,
253                                    enqueued_at: queued.enqueued_at,
254                                    priority: queued.priority,
255                                };
256                                if queue.enqueue(re_queued).is_err() {
257                                    tracing::warn!(
258                                        "Failed to re-enqueue request"
259                                    );
260                                }
261                            } else {
262                                let retry_after = queue
263                                    .config()
264                                    .max_wait_seconds
265                                    .to_string();
266                                let error_response =
267                                    build_timeout_response(&retry_after);
268                                let _ =
269                                    queued.response_tx.send(Ok(error_response));
270                            }
271                        }
272                    }
273                }
274            }
275        }
276    }
277
278    tracing::info!("Queue drain loop stopped");
279}
280
281/// Process a queued request by forwarding to the selected backend.
282async fn process_queued_request(
283    state: &Arc<crate::api::AppState>,
284    routing_result: &crate::routing::RoutingResult,
285    request: &ChatCompletionRequest,
286) -> QueueResponse {
287    let backend = &routing_result.backend;
288    let mut request = request.clone();
289    request.model = routing_result.actual_model.clone();
290
291    let _ = state.registry.increment_pending(&backend.id);
292
293    let result = if let Some(agent) = state.registry.get_agent(&backend.id) {
294        agent
295            .chat_completion(request, None)
296            .await
297            .map_err(crate::api::ApiError::from_agent_error)
298    } else {
299        Err(crate::api::ApiError::bad_gateway(
300            "Agent not found for backend",
301        ))
302    };
303
304    let _ = state.registry.decrement_pending(&backend.id);
305
306    match result {
307        Ok(response) => Ok(axum::response::Json(response).into_response()),
308        Err(e) => Err(e),
309    }
310}
311
312use axum::response::IntoResponse;
313
314/// Build a 503 response with retry_after header for timed-out requests.
315pub fn build_timeout_response(retry_after: &str) -> axum::response::Response {
316    let error = crate::api::ApiError::service_unavailable("Request timed out in queue");
317    let mut response = error.into_response();
318    if let Ok(val) = axum::http::HeaderValue::from_str(retry_after) {
319        response
320            .headers_mut()
321            .insert(axum::http::header::RETRY_AFTER, val);
322    }
323    response
324}
325
326/// Drain remaining requests on shutdown with 503 responses.
327async fn drain_remaining(queue: &Arc<RequestQueue>) {
328    while let Some(queued) = queue.try_dequeue().await {
329        let error_response = build_timeout_response("5");
330        let _ = queued.response_tx.send(Ok(error_response));
331    }
332}
333
334#[cfg(test)]
335mod tests {
336    use super::*;
337    use crate::api::ChatCompletionRequest;
338    use crate::config::QueueConfig;
339    use crate::routing::reconciler::intent::RoutingIntent;
340    use crate::routing::RequestRequirements;
341    use std::time::{Duration, Instant};
342
    /// Build an enabled QueueConfig with the given size and wait limits.
    fn make_config(max_size: u32, max_wait_seconds: u64) -> QueueConfig {
        QueueConfig {
            enabled: true,
            max_size,
            max_wait_seconds,
        }
    }
350
    /// Build a minimal RoutingIntent fixture for queue tests.
    fn make_intent() -> RoutingIntent {
        RoutingIntent::new(
            "req-1".to_string(),
            "llama3:8b".to_string(),
            "llama3:8b".to_string(),
            RequestRequirements {
                model: "llama3:8b".to_string(),
                estimated_tokens: 100,
                needs_vision: false,
                needs_tools: false,
                needs_json_mode: false,
                prefers_streaming: false,
            },
            vec![],
        )
    }
367
    /// Build a minimal ChatCompletionRequest via its serde representation.
    fn make_request() -> ChatCompletionRequest {
        serde_json::from_value(serde_json::json!({
            "model": "llama3:8b",
            "messages": [{"role": "user", "content": "hello"}]
        }))
        .unwrap()
    }
375
    /// Build a QueuedRequest plus the receiver half of its response channel.
    /// The receiver must be kept alive if a test asserts on sent responses.
    fn make_queued(priority: Priority) -> (QueuedRequest, oneshot::Receiver<QueueResponse>) {
        let (tx, rx) = oneshot::channel();
        let req = QueuedRequest {
            intent: make_intent(),
            request: make_request(),
            response_tx: tx,
            enqueued_at: Instant::now(),
            priority,
        };
        (req, rx)
    }
387
388    // ========================================================================
389    // T021: Unit tests for RequestQueue
390    // ========================================================================
391
392    #[tokio::test]
393    async fn fifo_ordering_normal_priority() {
394        let queue = RequestQueue::new(make_config(10, 30));
395
396        let (req1, _rx1) = make_queued(Priority::Normal);
397        let (req2, _rx2) = make_queued(Priority::Normal);
398        let (req3, _rx3) = make_queued(Priority::Normal);
399
400        let t1 = req1.enqueued_at;
401        let t2 = req2.enqueued_at;
402        let t3 = req3.enqueued_at;
403
404        queue.enqueue(req1).unwrap();
405        queue.enqueue(req2).unwrap();
406        queue.enqueue(req3).unwrap();
407
408        assert_eq!(queue.depth(), 3);
409
410        let d1 = queue.try_dequeue().await.unwrap();
411        assert_eq!(d1.enqueued_at, t1);
412        let d2 = queue.try_dequeue().await.unwrap();
413        assert_eq!(d2.enqueued_at, t2);
414        let d3 = queue.try_dequeue().await.unwrap();
415        assert_eq!(d3.enqueued_at, t3);
416
417        assert_eq!(queue.depth(), 0);
418    }
419
    #[tokio::test]
    async fn capacity_limits_reject_when_full() {
        let queue = RequestQueue::new(make_config(2, 30));

        let (req1, _rx1) = make_queued(Priority::Normal);
        let (req2, _rx2) = make_queued(Priority::Normal);
        let (req3, _rx3) = make_queued(Priority::Normal);

        queue.enqueue(req1).unwrap();
        queue.enqueue(req2).unwrap();
        // Third enqueue exceeds max_size (2) and must be rejected.
        let result = queue.enqueue(req3);

        assert!(result.is_err());
        assert!(matches!(result, Err(QueueError::Full { max_size: 2 })));
        // The rejected enqueue must not change depth.
        assert_eq!(queue.depth(), 2);
    }
436
    // Verifies the high lane is always drained before the normal lane,
    // while the normal lane keeps its own FIFO order.
    #[tokio::test]
    async fn priority_ordering_high_drains_first() {
        let queue = RequestQueue::new(make_config(10, 30));

        let (normal1, _rx1) = make_queued(Priority::Normal);
        let (high1, _rx2) = make_queued(Priority::High);
        let (normal2, _rx3) = make_queued(Priority::Normal);

        // Enqueue normal, then high, then normal
        queue.enqueue(normal1).unwrap();
        queue.enqueue(high1).unwrap();
        queue.enqueue(normal2).unwrap();

        assert_eq!(queue.depth(), 3);

        // High priority should dequeue first
        let d1 = queue.try_dequeue().await.unwrap();
        assert_eq!(d1.priority, Priority::High);

        // Then normal in FIFO order
        let d2 = queue.try_dequeue().await.unwrap();
        assert_eq!(d2.priority, Priority::Normal);

        let d3 = queue.try_dequeue().await.unwrap();
        assert_eq!(d3.priority, Priority::Normal);
    }
463
    // Depth must track every enqueue/dequeue across both priority lanes.
    #[tokio::test]
    async fn depth_accuracy() {
        let queue = RequestQueue::new(make_config(10, 30));
        assert_eq!(queue.depth(), 0);

        let (req1, _rx1) = make_queued(Priority::Normal);
        queue.enqueue(req1).unwrap();
        assert_eq!(queue.depth(), 1);

        let (req2, _rx2) = make_queued(Priority::High);
        queue.enqueue(req2).unwrap();
        assert_eq!(queue.depth(), 2);

        queue.try_dequeue().await;
        assert_eq!(queue.depth(), 1);

        queue.try_dequeue().await;
        assert_eq!(queue.depth(), 0);
    }
483
    // max_size == 0 is treated as "queuing disabled" by is_enabled().
    #[tokio::test]
    async fn max_size_zero_rejects_immediately() {
        let queue = RequestQueue::new(make_config(0, 30));
        let (req, _rx) = make_queued(Priority::Normal);
        let result = queue.enqueue(req);
        assert!(result.is_err());
        assert!(matches!(result, Err(QueueError::Disabled)));
    }

    // enabled == false rejects regardless of remaining capacity.
    #[tokio::test]
    async fn disabled_queue_rejects() {
        let config = QueueConfig {
            enabled: false,
            max_size: 100,
            max_wait_seconds: 30,
        };
        let queue = RequestQueue::new(config);
        let (req, _rx) = make_queued(Priority::Normal);
        let result = queue.enqueue(req);
        assert!(result.is_err());
        assert!(matches!(result, Err(QueueError::Disabled)));
    }

    // try_dequeue on an empty queue is non-blocking and returns None.
    #[tokio::test]
    async fn empty_dequeue_returns_none() {
        let queue = RequestQueue::new(make_config(10, 30));
        let result = queue.try_dequeue().await;
        assert!(result.is_none());
    }
513
    // ========================================================================
    // T022: Unit tests for queue timeout
    // ========================================================================

    // The 503 built for timed-out requests must carry the Retry-After header.
    #[tokio::test]
    async fn timeout_response_has_retry_after() {
        let response = build_timeout_response("30");
        assert_eq!(
            response.status(),
            axum::http::StatusCode::SERVICE_UNAVAILABLE
        );
        let retry = response
            .headers()
            .get(axum::http::header::RETRY_AFTER)
            .unwrap();
        assert_eq!(retry.to_str().unwrap(), "30");
    }

    #[tokio::test]
    async fn enqueued_request_timeout_detection() {
        // Simulate a request that was enqueued 2 seconds ago with 1s max wait
        let config = make_config(10, 1);
        let max_wait = Duration::from_secs(config.max_wait_seconds);

        let enqueued_at = Instant::now() - Duration::from_secs(2);
        let elapsed = enqueued_at.elapsed();
        assert!(elapsed > max_wait, "Request should be timed out");
    }
542
    #[tokio::test]
    async fn timeout_completes_within_time_limit() {
        // This test verifies that timeout detection is fast (<2x max_wait)
        let max_wait_ms = 100u64;
        let config = QueueConfig {
            enabled: true,
            max_size: 10,
            max_wait_seconds: 0, // We use ms-based detection in test
        };

        let _queue = RequestQueue::new(config);
        let (req, rx) = make_queued(Priority::Normal);

        // Override enqueued_at to be in the past
        let timed_out_req = QueuedRequest {
            intent: req.intent,
            request: req.request,
            response_tx: req.response_tx,
            enqueued_at: Instant::now() - Duration::from_millis(max_wait_ms * 2),
            priority: req.priority,
        };

        // Directly test timeout detection
        let max_wait = Duration::from_millis(max_wait_ms);
        assert!(timed_out_req.enqueued_at.elapsed() > max_wait);

        // Send timeout response through the oneshot, as the drain loop would
        let retry_after = "1";
        let error_response = build_timeout_response(retry_after);
        let _ = timed_out_req.response_tx.send(Ok(error_response));

        // Verify response received within time
        let start = Instant::now();
        let result = rx.await;
        assert!(result.is_ok());
        assert!(start.elapsed() < Duration::from_millis(max_wait_ms * 2));
    }
580
581    // ========================================================================
582    // T028: Priority header parsing tests
583    // ========================================================================
584
585    #[test]
586    fn priority_from_header_high() {
587        assert_eq!(Priority::from_header("high"), Priority::High);
588        assert_eq!(Priority::from_header("HIGH"), Priority::High);
589        assert_eq!(Priority::from_header(" High "), Priority::High);
590    }
591
592    #[test]
593    fn priority_from_header_normal() {
594        assert_eq!(Priority::from_header("normal"), Priority::Normal);
595        assert_eq!(Priority::from_header("NORMAL"), Priority::Normal);
596    }
597
598    #[test]
599    fn priority_from_header_invalid_defaults_to_normal() {
600        assert_eq!(Priority::from_header(""), Priority::Normal);
601        assert_eq!(Priority::from_header("urgent"), Priority::Normal);
602        assert_eq!(Priority::from_header("low"), Priority::Normal);
603        assert_eq!(Priority::from_header("123"), Priority::Normal);
604    }
605
    // ========================================================================
    // Concurrent enqueue: CAS loop prevents depth exceeding max_size
    // ========================================================================

    #[tokio::test]
    async fn concurrent_enqueue_respects_max_size() {
        let queue = Arc::new(RequestQueue::new(make_config(10, 30)));
        let mut handles = vec![];

        // 50 tasks race to enqueue into a queue of capacity 10.
        for _ in 0..50 {
            let q = Arc::clone(&queue);
            handles.push(tokio::spawn(async move {
                let (req, _rx) = make_queued(Priority::Normal);
                q.enqueue(req)
            }));
        }

        let results: Vec<_> = futures::future::join_all(handles).await;
        let successes = results
            .iter()
            .filter(|r| r.as_ref().unwrap().is_ok())
            .count();

        assert_eq!(successes, 10, "Exactly max_size requests should succeed");
        assert_eq!(queue.depth(), 10);
    }
632
    // config() must echo back the values the queue was constructed with.
    #[test]
    fn test_config_returns_queue_config() {
        let config = make_config(50, 15);
        let queue = RequestQueue::new(config);
        let returned = queue.config();
        assert_eq!(returned.max_size, 50);
        assert_eq!(returned.max_wait_seconds, 15);
    }
641
    // High priority jumps ahead even when enqueued after a normal request.
    #[tokio::test]
    async fn test_enqueue_high_priority_dequeued_first() {
        let queue = Arc::new(RequestQueue::new(make_config(10, 30)));

        // Enqueue a normal priority request
        let (req1, _rx1) = make_queued(Priority::Normal);
        queue.enqueue(req1).unwrap();

        // Enqueue a high priority request
        let (req2, _rx2) = make_queued(Priority::High);
        queue.enqueue(req2).unwrap();

        assert_eq!(queue.depth(), 2);

        // High priority should be dequeued first
        let first = queue.try_dequeue().await.unwrap();
        assert_eq!(first.priority, Priority::High);
    }
660
    // Full shape of the timeout 503: status, Retry-After header, and body text.
    #[tokio::test]
    async fn test_queue_timeout_response() {
        let response = build_timeout_response("10");
        assert_eq!(
            response.status(),
            axum::http::StatusCode::SERVICE_UNAVAILABLE
        );
        let retry = response
            .headers()
            .get(axum::http::header::RETRY_AFTER)
            .expect("should have Retry-After header");
        assert_eq!(retry.to_str().unwrap(), "10");

        // Verify body contains error message
        let body = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
        let msg = json["error"]["message"].as_str().unwrap();
        assert!(
            msg.contains("timed out"),
            "expected timeout message, got: {msg}"
        );
    }
685
    // Disabled flag wins before any capacity accounting happens.
    #[tokio::test]
    async fn test_queue_enqueue_disabled() {
        let config = QueueConfig {
            enabled: false,
            max_size: 10,
            max_wait_seconds: 30,
        };
        let queue = RequestQueue::new(config);
        let (req, _rx) = make_queued(Priority::Normal);
        let result = queue.enqueue(req);
        assert!(matches!(result, Err(QueueError::Disabled)));
    }

    // Full error carries the configured max_size and leaves depth untouched.
    #[tokio::test]
    async fn test_queue_enqueue_full() {
        let queue = RequestQueue::new(make_config(2, 30));
        let (r1, _rx1) = make_queued(Priority::Normal);
        let (r2, _rx2) = make_queued(Priority::Normal);
        let (r3, _rx3) = make_queued(Priority::Normal);

        queue.enqueue(r1).unwrap();
        queue.enqueue(r2).unwrap();
        let result = queue.enqueue(r3);
        assert!(matches!(result, Err(QueueError::Full { max_size: 2 })));
        assert_eq!(queue.depth(), 2);
    }
712
713    #[test]
714    fn test_queued_request_debug_format() {
715        let (req, _rx) = make_queued(Priority::High);
716        let debug = format!("{:?}", req);
717        assert!(debug.contains("High"));
718        assert!(debug.contains("enqueued_at"));
719    }
720
721    #[test]
722    fn test_queue_error_display_full() {
723        let err = QueueError::Full { max_size: 42 };
724        assert!(err.to_string().contains("42"));
725    }
726
727    #[test]
728    fn test_queue_error_display_disabled() {
729        let err = QueueError::Disabled;
730        assert!(err.to_string().contains("disabled"));
731    }
732
    // End-to-end happy path for process_queued_request: a stub agent is
    // registered, routing points at it, and the canned completion must come
    // back as a 200 JSON response.
    #[tokio::test]
    async fn test_process_queued_request_success_with_agent() {
        use crate::agent::types::{AgentCapabilities, AgentProfile, PrivacyZone};
        use crate::agent::{
            AgentError, HealthStatus, InferenceAgent, ModelCapability, StreamChunk,
        };
        use crate::api::types::{
            ChatCompletionResponse, ChatMessage, Choice, MessageContent, Usage,
        };
        use crate::config::NexusConfig;
        use crate::registry::{Backend, BackendType, DiscoverySource, Registry};
        use async_trait::async_trait;
        use axum::http::HeaderMap;
        use futures_util::stream::BoxStream;

        // Minimal stub agent returning a fixed completion; streaming is
        // unsupported on purpose since this path never streams.
        struct QueueSuccessAgent;

        #[async_trait]
        impl InferenceAgent for QueueSuccessAgent {
            fn id(&self) -> &str {
                "queue-agent"
            }
            fn name(&self) -> &str {
                "Queue Agent"
            }
            fn profile(&self) -> AgentProfile {
                AgentProfile {
                    backend_type: "ollama".to_string(),
                    version: None,
                    privacy_zone: PrivacyZone::Restricted,
                    capabilities: AgentCapabilities::default(),
                    capability_tier: Some(1),
                }
            }
            async fn health_check(&self) -> Result<HealthStatus, AgentError> {
                Ok(HealthStatus::Healthy { model_count: 1 })
            }
            async fn list_models(&self) -> Result<Vec<ModelCapability>, AgentError> {
                Ok(vec![])
            }
            async fn chat_completion(
                &self,
                _req: ChatCompletionRequest,
                _h: Option<&HeaderMap>,
            ) -> Result<ChatCompletionResponse, AgentError> {
                Ok(ChatCompletionResponse {
                    id: "queue-cmpl".to_string(),
                    object: "chat.completion".to_string(),
                    created: 1234567890,
                    model: "llama3:8b".to_string(),
                    choices: vec![Choice {
                        index: 0,
                        message: ChatMessage {
                            role: "assistant".to_string(),
                            content: MessageContent::Text {
                                content: "Queued response".to_string(),
                            },
                            name: None,
                            function_call: None,
                        },
                        finish_reason: Some("stop".to_string()),
                    }],
                    usage: Some(Usage {
                        prompt_tokens: 5,
                        completion_tokens: 3,
                        total_tokens: 8,
                    }),
                    extra: std::collections::HashMap::new(),
                })
            }
            async fn chat_completion_stream(
                &self,
                _req: ChatCompletionRequest,
                _h: Option<&HeaderMap>,
            ) -> Result<BoxStream<'static, Result<StreamChunk, AgentError>>, AgentError>
            {
                Err(AgentError::Unsupported("streaming"))
            }
        }

        // Register the stub agent under a backend the router can resolve.
        let registry = Arc::new(Registry::new());
        let backend = Backend::new(
            "queue-agent".to_string(),
            "Queue Agent Backend".to_string(),
            "http://localhost:11434".to_string(),
            BackendType::Ollama,
            vec![],
            DiscoverySource::Static,
            std::collections::HashMap::new(),
        );
        let agent: Arc<dyn InferenceAgent> = Arc::new(QueueSuccessAgent);
        registry.add_backend_with_agent(backend, agent).unwrap();

        let config = Arc::new(NexusConfig::default());
        let state = Arc::new(crate::api::AppState::new(Arc::clone(&registry), config));

        // Hand-build a RoutingResult pointing at the registered backend.
        let backend_arc = registry
            .get_all_backends()
            .into_iter()
            .find(|b| b.id == "queue-agent")
            .unwrap();
        let routing_result = crate::routing::RoutingResult {
            backend: Arc::new(backend_arc),
            actual_model: "llama3:8b".to_string(),
            fallback_used: false,
            route_reason: "test".to_string(),
            cost_estimated: None,
            budget_status: crate::routing::reconciler::intent::BudgetStatus::Normal,
            budget_utilization: None,
            budget_remaining: None,
        };

        let request = make_request();
        let result = process_queued_request(&state, &routing_result, &request).await;
        assert!(result.is_ok(), "should succeed with agent");
        let response = result.unwrap();
        assert_eq!(response.status(), axum::http::StatusCode::OK);

        let body = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
        assert_eq!(json["id"], "queue-cmpl");
    }
857
858    #[tokio::test]
859    async fn test_process_queued_request_agent_not_found() {
860        use crate::config::NexusConfig;
861        use crate::registry::{Backend, BackendType, DiscoverySource, Registry};
862
863        let registry = Arc::new(Registry::new());
864        let backend = Backend::new(
865            "ghost".to_string(),
866            "ghost-backend".to_string(),
867            "http://localhost:9999".to_string(),
868            BackendType::Ollama,
869            vec![],
870            DiscoverySource::Static,
871            std::collections::HashMap::new(),
872        );
873        registry.add_backend(backend).unwrap();
874
875        let config = Arc::new(NexusConfig::default());
876        let state = Arc::new(crate::api::AppState::new(Arc::clone(&registry), config));
877
878        // Build a RoutingResult pointing to the backend with no agent registered
879        let backend_arc = registry
880            .get_all_backends()
881            .into_iter()
882            .find(|b| b.id == "ghost")
883            .expect("backend must exist");
884        let routing_result = crate::routing::RoutingResult {
885            backend: Arc::new(backend_arc),
886            actual_model: "llama3:8b".to_string(),
887            fallback_used: false,
888            route_reason: "test".to_string(),
889            cost_estimated: None,
890            budget_status: crate::routing::reconciler::intent::BudgetStatus::Normal,
891            budget_utilization: None,
892            budget_remaining: None,
893        };
894
895        let request = make_request();
896        let result = process_queued_request(&state, &routing_result, &request).await;
897        assert!(result.is_err(), "should fail when agent is not registered");
898    }
899
900    #[tokio::test]
901    async fn test_drain_remaining_on_shutdown() {
902        let queue = Arc::new(RequestQueue::new(make_config(10, 30)));
903
904        let (r1, rx1) = make_queued(Priority::Normal);
905        let (r2, rx2) = make_queued(Priority::High);
906        queue.enqueue(r1).unwrap();
907        queue.enqueue(r2).unwrap();
908        assert_eq!(queue.depth(), 2);
909
910        drain_remaining(&queue).await;
911        assert_eq!(queue.depth(), 0);
912
913        // Both receivers should get 503 timeout responses
914        let resp1 = rx1.await.expect("should receive response").unwrap();
915        assert_eq!(resp1.status(), axum::http::StatusCode::SERVICE_UNAVAILABLE);
916        let resp2 = rx2.await.expect("should receive response").unwrap();
917        assert_eq!(resp2.status(), axum::http::StatusCode::SERVICE_UNAVAILABLE);
918    }
919
920    #[tokio::test]
921    async fn test_queue_drain_loop_processes_request() {
922        use crate::config::NexusConfig;
923        use crate::registry::Registry;
924        use tokio_util::sync::CancellationToken;
925
926        let queue = Arc::new(RequestQueue::new(make_config(10, 1)));
927        let registry = Arc::new(Registry::new());
928        let config = Arc::new(NexusConfig::default());
929        let state = Arc::new(crate::api::AppState::new(Arc::clone(&registry), config));
930
931        // Enqueue a request — no backends registered so routing will fail,
932        // and since max_wait_seconds=1, after the request expires the drain
933        // loop will send a timeout response.
934        let (req, rx) = make_queued(Priority::Normal);
935        // Set enqueued_at to the past so it's already expired
936        let timed_out = QueuedRequest {
937            intent: req.intent,
938            request: req.request,
939            response_tx: req.response_tx,
940            enqueued_at: Instant::now() - Duration::from_secs(5),
941            priority: req.priority,
942        };
943        queue.enqueue(timed_out).unwrap();
944
945        let cancel = CancellationToken::new();
946        let cancel_clone = cancel.clone();
947        let q = Arc::clone(&queue);
948        let s = Arc::clone(&state);
949
950        let handle = tokio::spawn(async move {
951            queue_drain_loop(q, s, cancel_clone).await;
952        });
953
954        // Wait for the drain loop to process the expired request
955        let result = tokio::time::timeout(Duration::from_secs(3), rx).await;
956        assert!(result.is_ok(), "should receive response within timeout");
957        let response = result.unwrap().unwrap().unwrap();
958        assert_eq!(
959            response.status(),
960            axum::http::StatusCode::SERVICE_UNAVAILABLE
961        );
962
963        cancel.cancel();
964        let _ = tokio::time::timeout(Duration::from_secs(2), handle).await;
965    }
966}