use crate::api::ChatCompletionRequest;
use crate::config::QueueConfig;
use crate::routing::reconciler::intent::RoutingIntent;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
use tokio::sync::{mpsc, oneshot};

/// Priority lane assigned to a queued request; the high lane is drained first.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Priority {
    High,
    Normal,
}

impl Priority {
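    /// Parse a priority header value: `"high"` (case-insensitive, surrounding
    /// whitespace ignored) maps to [`Priority::High`]; anything else falls
    /// back to [`Priority::Normal`].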
    pub fn from_header(value: &str) -> Self {
        match value.trim().to_lowercase().as_str() {
            "high" => Priority::High,
            _ => Priority::Normal,
        }
    }
}

/// A request parked in the queue while it waits for backend capacity.
pub struct QueuedRequest {
    /// Routing intent computed when the request was admitted.
    pub intent: RoutingIntent,
    /// The original chat completion request.
    pub request: ChatCompletionRequest,
    /// Channel used to hand the final response back to the waiting handler.
    pub response_tx: oneshot::Sender<QueueResponse>,
    /// When the request entered the queue; used for timeout checks.
    pub enqueued_at: Instant,
    /// Lane the request is queued on.
    pub priority: Priority,
}

impl std::fmt::Debug for QueuedRequest {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("QueuedRequest")
            .field("priority", &self.priority)
            .field("enqueued_at", &self.enqueued_at)
            .finish()
    }
}

/// Result delivered back to the handler that queued the request.
pub type QueueResponse = Result<axum::response::Response, crate::api::ApiError>;

/// Errors returned when a request cannot be enqueued.
#[derive(Debug, Error)]
pub enum QueueError {
    #[error("Queue is full ({max_size} requests)")]
    Full { max_size: u32 },

    #[error("Request queuing is disabled")]
    Disabled,
}

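/// Two-lane request queue: bounded channels for high- and normal-priority
/// requests plus a shared atomic depth counter that enforces `max_size`
/// across both lanes.
///
/// A minimal usage sketch (assumes a `QueueConfig` and a `QueuedRequest` built
/// elsewhere; marked `ignore` because it is illustrative only):
///
/// ```ignore
/// let queue = RequestQueue::new(config);
/// queue.enqueue(queued_request)?;                  // Err(QueueError::Full/Disabled) on rejection
/// if let Some(next) = queue.try_dequeue().await {  // high-priority lane is checked first
///     // route `next`, then reply on `next.response_tx`
/// }
/// ```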
pub struct RequestQueue {
    high_tx: mpsc::Sender<QueuedRequest>,
    high_rx: tokio::sync::Mutex<mpsc::Receiver<QueuedRequest>>,
    normal_tx: mpsc::Sender<QueuedRequest>,
    normal_rx: tokio::sync::Mutex<mpsc::Receiver<QueuedRequest>>,
    depth: Arc<AtomicUsize>,
    config: QueueConfig,
}

impl RequestQueue {
    pub fn new(config: QueueConfig) -> Self {
        let capacity = config.max_size as usize;
        let (high_tx, high_rx) = mpsc::channel(capacity.max(1));
        let (normal_tx, normal_rx) = mpsc::channel(capacity.max(1));

        Self {
            high_tx,
            high_rx: tokio::sync::Mutex::new(high_rx),
            normal_tx,
            normal_rx: tokio::sync::Mutex::new(normal_rx),
            depth: Arc::new(AtomicUsize::new(0)),
            config,
        }
    }

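    /// Enqueue a request on the lane matching its priority. Fails with
    /// [`QueueError::Disabled`] when queuing is disabled in the configuration
    /// and with [`QueueError::Full`] once `max_size` requests are already
    /// queued.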
    pub fn enqueue(&self, request: QueuedRequest) -> Result<(), QueueError> {
        if !self.config.is_enabled() {
            return Err(QueueError::Disabled);
        }

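        // Reserve a depth slot with a compare-and-swap loop so that concurrent
        // enqueues can never push the combined depth past `max_size`.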
        loop {
            let current = self.depth.load(Ordering::SeqCst);
            if current >= self.config.max_size as usize {
                return Err(QueueError::Full {
                    max_size: self.config.max_size,
                });
            }
            if self
                .depth
                .compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
                .is_ok()
            {
                break;
            }
        }
        metrics::gauge!("nexus_queue_depth").set(self.depth() as f64);

        let tx = match request.priority {
            Priority::High => &self.high_tx,
            Priority::Normal => &self.normal_tx,
        };

        if tx.try_send(request).is_err() {
            self.depth.fetch_sub(1, Ordering::SeqCst);
            metrics::gauge!("nexus_queue_depth").set(self.depth() as f64);
            return Err(QueueError::Full {
                max_size: self.config.max_size,
            });
        }

        Ok(())
    }

    pub async fn try_dequeue(&self) -> Option<QueuedRequest> {
        // High-priority lane is drained first.
        {
            let mut rx = self.high_rx.lock().await;
            if let Ok(req) = rx.try_recv() {
                self.depth.fetch_sub(1, Ordering::SeqCst);
                metrics::gauge!("nexus_queue_depth").set(self.depth() as f64);
                return Some(req);
            }
        }

        // Fall back to the normal-priority lane.
        {
            let mut rx = self.normal_rx.lock().await;
            if let Ok(req) = rx.try_recv() {
                self.depth.fetch_sub(1, Ordering::SeqCst);
                metrics::gauge!("nexus_queue_depth").set(self.depth() as f64);
                return Some(req);
            }
        }

        None
    }

    /// Current number of queued requests across both lanes.
    pub fn depth(&self) -> usize {
        self.depth.load(Ordering::SeqCst)
    }

    /// Configuration the queue was built with.
    pub fn config(&self) -> &QueueConfig {
        &self.config
    }
}

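/// Background loop that drains the queue until the cancellation token fires.
///
/// Every 50ms it pulls queued requests, fails those that have waited longer
/// than `max_wait_seconds` with a timeout response, routes the rest, and
/// re-enqueues requests for which no backend is currently available. On
/// shutdown, any remaining requests are failed with the same timeout response.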
pub async fn queue_drain_loop(
    queue: Arc<RequestQueue>,
    state: Arc<crate::api::AppState>,
    cancel: tokio_util::sync::CancellationToken,
) {
    use crate::routing::reconciler::intent::TierEnforcementMode;
    use crate::routing::RequestRequirements;
    use std::time::Duration;

    tracing::info!("Queue drain loop started");

    loop {
        tokio::select! {
            _ = cancel.cancelled() => {
                tracing::info!("Queue drain loop shutting down");
                drain_remaining(&queue).await;
                break;
            }
            _ = tokio::time::sleep(Duration::from_millis(50)) => {
                while let Some(queued) = queue.try_dequeue().await {
                    let max_wait = Duration::from_secs(queue.config().max_wait_seconds);

                    if queued.enqueued_at.elapsed() > max_wait {
                        tracing::warn!(
                            priority = ?queued.priority,
                            waited_ms = queued.enqueued_at.elapsed().as_millis() as u64,
                            "Queued request timed out"
                        );
                        let retry_after = queue.config().max_wait_seconds.to_string();
                        let error_response = build_timeout_response(&retry_after);
                        let _ = queued.response_tx.send(Ok(error_response));
                        continue;
                    }

                    let requirements = RequestRequirements::from_request(&queued.request);
                    let result = state.router.select_backend(
                        &requirements,
                        Some(TierEnforcementMode::Strict),
                    );

                    match result {
                        Ok(routing_result) => {
                            let response = process_queued_request(
                                &state,
                                &routing_result,
                                &queued.request,
                            )
                            .await;
                            let _ = queued.response_tx.send(response);
                        }
                        Err(_) => {
                            if queued.enqueued_at.elapsed() < max_wait {
                                let re_queued = QueuedRequest {
                                    intent: queued.intent,
                                    request: queued.request,
                                    response_tx: queued.response_tx,
                                    enqueued_at: queued.enqueued_at,
                                    priority: queued.priority,
                                };
                                if queue.enqueue(re_queued).is_err() {
                                    tracing::warn!("Failed to re-enqueue request");
                                }
                            } else {
                                let retry_after =
                                    queue.config().max_wait_seconds.to_string();
                                let error_response = build_timeout_response(&retry_after);
                                let _ = queued.response_tx.send(Ok(error_response));
                            }
                        }
                    }
                }
            }
        }
    }

    tracing::info!("Queue drain loop stopped");
}

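/// Send a previously queued request to the selected backend's agent and wrap
/// the outcome as a [`QueueResponse`], incrementing and decrementing the
/// registry's pending counter around the call.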
async fn process_queued_request(
    state: &Arc<crate::api::AppState>,
    routing_result: &crate::routing::RoutingResult,
    request: &ChatCompletionRequest,
) -> QueueResponse {
    let backend = &routing_result.backend;
    let mut request = request.clone();
    request.model = routing_result.actual_model.clone();

    let _ = state.registry.increment_pending(&backend.id);
    let result = if let Some(agent) = state.registry.get_agent(&backend.id) {
        agent
            .chat_completion(request, None)
            .await
            .map_err(crate::api::ApiError::from_agent_error)
    } else {
        Err(crate::api::ApiError::bad_gateway(
            "Agent not found for backend",
        ))
    };

    let _ = state.registry.decrement_pending(&backend.id);

    match result {
        Ok(response) => Ok(axum::response::Json(response).into_response()),
        Err(e) => Err(e),
    }
}

use axum::response::IntoResponse;

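/// Build the 503 "Request timed out in queue" response, attaching a
/// `Retry-After` header when the provided value is a valid header value.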
pub fn build_timeout_response(retry_after: &str) -> axum::response::Response {
    let error = crate::api::ApiError::service_unavailable("Request timed out in queue");
    let mut response = error.into_response();
    if let Ok(val) = axum::http::HeaderValue::from_str(retry_after) {
        response
            .headers_mut()
            .insert(axum::http::header::RETRY_AFTER, val);
    }
    response
}

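/// Fail every request still sitting in the queue with a timeout response;
/// used when the drain loop shuts down.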
async fn drain_remaining(queue: &Arc<RequestQueue>) {
    while let Some(queued) = queue.try_dequeue().await {
        let error_response = build_timeout_response("5");
        let _ = queued.response_tx.send(Ok(error_response));
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::ChatCompletionRequest;
    use crate::config::QueueConfig;
    use crate::routing::reconciler::intent::RoutingIntent;
    use crate::routing::RequestRequirements;
    use std::time::{Duration, Instant};

    fn make_config(max_size: u32, max_wait_seconds: u64) -> QueueConfig {
        QueueConfig {
            enabled: true,
            max_size,
            max_wait_seconds,
        }
    }

    fn make_intent() -> RoutingIntent {
        RoutingIntent::new(
            "req-1".to_string(),
            "llama3:8b".to_string(),
            "llama3:8b".to_string(),
            RequestRequirements {
                model: "llama3:8b".to_string(),
                estimated_tokens: 100,
                needs_vision: false,
                needs_tools: false,
                needs_json_mode: false,
                prefers_streaming: false,
            },
            vec![],
        )
    }

    fn make_request() -> ChatCompletionRequest {
        serde_json::from_value(serde_json::json!({
            "model": "llama3:8b",
            "messages": [{"role": "user", "content": "hello"}]
        }))
        .unwrap()
    }

    fn make_queued(priority: Priority) -> (QueuedRequest, oneshot::Receiver<QueueResponse>) {
        let (tx, rx) = oneshot::channel();
        let req = QueuedRequest {
            intent: make_intent(),
            request: make_request(),
            response_tx: tx,
            enqueued_at: Instant::now(),
            priority,
        };
        (req, rx)
    }

    #[tokio::test]
    async fn fifo_ordering_normal_priority() {
        let queue = RequestQueue::new(make_config(10, 30));

        let (req1, _rx1) = make_queued(Priority::Normal);
        let (req2, _rx2) = make_queued(Priority::Normal);
        let (req3, _rx3) = make_queued(Priority::Normal);

        let t1 = req1.enqueued_at;
        let t2 = req2.enqueued_at;
        let t3 = req3.enqueued_at;

        queue.enqueue(req1).unwrap();
        queue.enqueue(req2).unwrap();
        queue.enqueue(req3).unwrap();

        assert_eq!(queue.depth(), 3);

        let d1 = queue.try_dequeue().await.unwrap();
        assert_eq!(d1.enqueued_at, t1);
        let d2 = queue.try_dequeue().await.unwrap();
        assert_eq!(d2.enqueued_at, t2);
        let d3 = queue.try_dequeue().await.unwrap();
        assert_eq!(d3.enqueued_at, t3);

        assert_eq!(queue.depth(), 0);
    }

    #[tokio::test]
    async fn capacity_limits_reject_when_full() {
        let queue = RequestQueue::new(make_config(2, 30));

        let (req1, _rx1) = make_queued(Priority::Normal);
        let (req2, _rx2) = make_queued(Priority::Normal);
        let (req3, _rx3) = make_queued(Priority::Normal);

        queue.enqueue(req1).unwrap();
        queue.enqueue(req2).unwrap();
        let result = queue.enqueue(req3);

        assert!(result.is_err());
        assert!(matches!(result, Err(QueueError::Full { max_size: 2 })));
        assert_eq!(queue.depth(), 2);
    }

    #[tokio::test]
    async fn priority_ordering_high_drains_first() {
        let queue = RequestQueue::new(make_config(10, 30));

        let (normal1, _rx1) = make_queued(Priority::Normal);
        let (high1, _rx2) = make_queued(Priority::High);
        let (normal2, _rx3) = make_queued(Priority::Normal);

        queue.enqueue(normal1).unwrap();
        queue.enqueue(high1).unwrap();
        queue.enqueue(normal2).unwrap();

        assert_eq!(queue.depth(), 3);

        let d1 = queue.try_dequeue().await.unwrap();
        assert_eq!(d1.priority, Priority::High);

        let d2 = queue.try_dequeue().await.unwrap();
        assert_eq!(d2.priority, Priority::Normal);

        let d3 = queue.try_dequeue().await.unwrap();
        assert_eq!(d3.priority, Priority::Normal);
    }

    #[tokio::test]
    async fn depth_accuracy() {
        let queue = RequestQueue::new(make_config(10, 30));
        assert_eq!(queue.depth(), 0);

        let (req1, _rx1) = make_queued(Priority::Normal);
        queue.enqueue(req1).unwrap();
        assert_eq!(queue.depth(), 1);

        let (req2, _rx2) = make_queued(Priority::High);
        queue.enqueue(req2).unwrap();
        assert_eq!(queue.depth(), 2);

        queue.try_dequeue().await;
        assert_eq!(queue.depth(), 1);

        queue.try_dequeue().await;
        assert_eq!(queue.depth(), 0);
    }

    #[tokio::test]
    async fn max_size_zero_rejects_immediately() {
        let queue = RequestQueue::new(make_config(0, 30));
        let (req, _rx) = make_queued(Priority::Normal);
        let result = queue.enqueue(req);
        assert!(result.is_err());
        assert!(matches!(result, Err(QueueError::Disabled)));
    }

    #[tokio::test]
    async fn disabled_queue_rejects() {
        let config = QueueConfig {
            enabled: false,
            max_size: 100,
            max_wait_seconds: 30,
        };
        let queue = RequestQueue::new(config);
        let (req, _rx) = make_queued(Priority::Normal);
        let result = queue.enqueue(req);
        assert!(result.is_err());
        assert!(matches!(result, Err(QueueError::Disabled)));
    }

    #[tokio::test]
    async fn empty_dequeue_returns_none() {
        let queue = RequestQueue::new(make_config(10, 30));
        let result = queue.try_dequeue().await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn timeout_response_has_retry_after() {
        let response = build_timeout_response("30");
        assert_eq!(
            response.status(),
            axum::http::StatusCode::SERVICE_UNAVAILABLE
        );
        let retry = response
            .headers()
            .get(axum::http::header::RETRY_AFTER)
            .unwrap();
        assert_eq!(retry.to_str().unwrap(), "30");
    }

    #[tokio::test]
    async fn enqueued_request_timeout_detection() {
        let config = make_config(10, 1);
        let max_wait = Duration::from_secs(config.max_wait_seconds);

        let enqueued_at = Instant::now() - Duration::from_secs(2);
        let elapsed = enqueued_at.elapsed();
        assert!(elapsed > max_wait, "Request should be timed out");
    }

    #[tokio::test]
    async fn timeout_completes_within_time_limit() {
        let max_wait_ms = 100u64;
        let config = QueueConfig {
            enabled: true,
            max_size: 10,
            max_wait_seconds: 0,
        };

        let _queue = RequestQueue::new(config);
        let (req, rx) = make_queued(Priority::Normal);

        let timed_out_req = QueuedRequest {
            intent: req.intent,
            request: req.request,
            response_tx: req.response_tx,
            enqueued_at: Instant::now() - Duration::from_millis(max_wait_ms * 2),
            priority: req.priority,
        };

        let max_wait = Duration::from_millis(max_wait_ms);
        assert!(timed_out_req.enqueued_at.elapsed() > max_wait);

        let retry_after = "1";
        let error_response = build_timeout_response(retry_after);
        let _ = timed_out_req.response_tx.send(Ok(error_response));

        let start = Instant::now();
        let result = rx.await;
        assert!(result.is_ok());
        assert!(start.elapsed() < Duration::from_millis(max_wait_ms * 2));
    }

    #[test]
    fn priority_from_header_high() {
        assert_eq!(Priority::from_header("high"), Priority::High);
        assert_eq!(Priority::from_header("HIGH"), Priority::High);
        assert_eq!(Priority::from_header(" High "), Priority::High);
    }

    #[test]
    fn priority_from_header_normal() {
        assert_eq!(Priority::from_header("normal"), Priority::Normal);
        assert_eq!(Priority::from_header("NORMAL"), Priority::Normal);
    }

    #[test]
    fn priority_from_header_invalid_defaults_to_normal() {
        assert_eq!(Priority::from_header(""), Priority::Normal);
        assert_eq!(Priority::from_header("urgent"), Priority::Normal);
        assert_eq!(Priority::from_header("low"), Priority::Normal);
        assert_eq!(Priority::from_header("123"), Priority::Normal);
    }

    #[tokio::test]
    async fn concurrent_enqueue_respects_max_size() {
        let queue = Arc::new(RequestQueue::new(make_config(10, 30)));
        let mut handles = vec![];

        for _ in 0..50 {
            let q = Arc::clone(&queue);
            handles.push(tokio::spawn(async move {
                let (req, _rx) = make_queued(Priority::Normal);
                q.enqueue(req)
            }));
        }

        let results: Vec<_> = futures::future::join_all(handles).await;
        let successes = results
            .iter()
            .filter(|r| r.as_ref().unwrap().is_ok())
            .count();

        assert_eq!(successes, 10, "Exactly max_size requests should succeed");
        assert_eq!(queue.depth(), 10);
    }

    #[test]
    fn test_config_returns_queue_config() {
        let config = make_config(50, 15);
        let queue = RequestQueue::new(config);
        let returned = queue.config();
        assert_eq!(returned.max_size, 50);
        assert_eq!(returned.max_wait_seconds, 15);
    }

    #[tokio::test]
    async fn test_enqueue_high_priority_dequeued_first() {
        let queue = Arc::new(RequestQueue::new(make_config(10, 30)));

        let (req1, _rx1) = make_queued(Priority::Normal);
        queue.enqueue(req1).unwrap();

        let (req2, _rx2) = make_queued(Priority::High);
        queue.enqueue(req2).unwrap();

        assert_eq!(queue.depth(), 2);

        let first = queue.try_dequeue().await.unwrap();
        assert_eq!(first.priority, Priority::High);
    }

    #[tokio::test]
    async fn test_queue_timeout_response() {
        let response = build_timeout_response("10");
        assert_eq!(
            response.status(),
            axum::http::StatusCode::SERVICE_UNAVAILABLE
        );
        let retry = response
            .headers()
            .get(axum::http::header::RETRY_AFTER)
            .expect("should have Retry-After header");
        assert_eq!(retry.to_str().unwrap(), "10");

        let body = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
        let msg = json["error"]["message"].as_str().unwrap();
        assert!(
            msg.contains("timed out"),
            "expected timeout message, got: {msg}"
        );
    }

    #[tokio::test]
    async fn test_queue_enqueue_disabled() {
        let config = QueueConfig {
            enabled: false,
            max_size: 10,
            max_wait_seconds: 30,
        };
        let queue = RequestQueue::new(config);
        let (req, _rx) = make_queued(Priority::Normal);
        let result = queue.enqueue(req);
        assert!(matches!(result, Err(QueueError::Disabled)));
    }

    #[tokio::test]
    async fn test_queue_enqueue_full() {
        let queue = RequestQueue::new(make_config(2, 30));
        let (r1, _rx1) = make_queued(Priority::Normal);
        let (r2, _rx2) = make_queued(Priority::Normal);
        let (r3, _rx3) = make_queued(Priority::Normal);

        queue.enqueue(r1).unwrap();
        queue.enqueue(r2).unwrap();
        let result = queue.enqueue(r3);
        assert!(matches!(result, Err(QueueError::Full { max_size: 2 })));
        assert_eq!(queue.depth(), 2);
    }

    #[test]
    fn test_queued_request_debug_format() {
        let (req, _rx) = make_queued(Priority::High);
        let debug = format!("{:?}", req);
        assert!(debug.contains("High"));
        assert!(debug.contains("enqueued_at"));
    }

    #[test]
    fn test_queue_error_display_full() {
        let err = QueueError::Full { max_size: 42 };
        assert!(err.to_string().contains("42"));
    }

    #[test]
    fn test_queue_error_display_disabled() {
        let err = QueueError::Disabled;
        assert!(err.to_string().contains("disabled"));
    }

    #[tokio::test]
    async fn test_process_queued_request_success_with_agent() {
        use crate::agent::types::{AgentCapabilities, AgentProfile, PrivacyZone};
        use crate::agent::{
            AgentError, HealthStatus, InferenceAgent, ModelCapability, StreamChunk,
        };
        use crate::api::types::{
            ChatCompletionResponse, ChatMessage, Choice, MessageContent, Usage,
        };
        use crate::config::NexusConfig;
        use crate::registry::{Backend, BackendType, DiscoverySource, Registry};
        use async_trait::async_trait;
        use axum::http::HeaderMap;
        use futures_util::stream::BoxStream;

        struct QueueSuccessAgent;

        #[async_trait]
        impl InferenceAgent for QueueSuccessAgent {
            fn id(&self) -> &str {
                "queue-agent"
            }
            fn name(&self) -> &str {
                "Queue Agent"
            }
            fn profile(&self) -> AgentProfile {
                AgentProfile {
                    backend_type: "ollama".to_string(),
                    version: None,
                    privacy_zone: PrivacyZone::Restricted,
                    capabilities: AgentCapabilities::default(),
                    capability_tier: Some(1),
                }
            }
            async fn health_check(&self) -> Result<HealthStatus, AgentError> {
                Ok(HealthStatus::Healthy { model_count: 1 })
            }
            async fn list_models(&self) -> Result<Vec<ModelCapability>, AgentError> {
                Ok(vec![])
            }
            async fn chat_completion(
                &self,
                _req: ChatCompletionRequest,
                _h: Option<&HeaderMap>,
            ) -> Result<ChatCompletionResponse, AgentError> {
                Ok(ChatCompletionResponse {
                    id: "queue-cmpl".to_string(),
                    object: "chat.completion".to_string(),
                    created: 1234567890,
                    model: "llama3:8b".to_string(),
                    choices: vec![Choice {
                        index: 0,
                        message: ChatMessage {
                            role: "assistant".to_string(),
                            content: MessageContent::Text {
                                content: "Queued response".to_string(),
                            },
                            name: None,
                            function_call: None,
                        },
                        finish_reason: Some("stop".to_string()),
                    }],
                    usage: Some(Usage {
                        prompt_tokens: 5,
                        completion_tokens: 3,
                        total_tokens: 8,
                    }),
                    extra: std::collections::HashMap::new(),
                })
            }
            async fn chat_completion_stream(
                &self,
                _req: ChatCompletionRequest,
                _h: Option<&HeaderMap>,
            ) -> Result<BoxStream<'static, Result<StreamChunk, AgentError>>, AgentError>
            {
                Err(AgentError::Unsupported("streaming"))
            }
        }

        let registry = Arc::new(Registry::new());
        let backend = Backend::new(
            "queue-agent".to_string(),
            "Queue Agent Backend".to_string(),
            "http://localhost:11434".to_string(),
            BackendType::Ollama,
            vec![],
            DiscoverySource::Static,
            std::collections::HashMap::new(),
        );
        let agent: Arc<dyn InferenceAgent> = Arc::new(QueueSuccessAgent);
        registry.add_backend_with_agent(backend, agent).unwrap();

        let config = Arc::new(NexusConfig::default());
        let state = Arc::new(crate::api::AppState::new(Arc::clone(&registry), config));

        let backend_arc = registry
            .get_all_backends()
            .into_iter()
            .find(|b| b.id == "queue-agent")
            .unwrap();
        let routing_result = crate::routing::RoutingResult {
            backend: Arc::new(backend_arc),
            actual_model: "llama3:8b".to_string(),
            fallback_used: false,
            route_reason: "test".to_string(),
            cost_estimated: None,
            budget_status: crate::routing::reconciler::intent::BudgetStatus::Normal,
            budget_utilization: None,
            budget_remaining: None,
        };

        let request = make_request();
        let result = process_queued_request(&state, &routing_result, &request).await;
        assert!(result.is_ok(), "should succeed with agent");
        let response = result.unwrap();
        assert_eq!(response.status(), axum::http::StatusCode::OK);

        let body = axum::body::to_bytes(response.into_body(), usize::MAX)
            .await
            .unwrap();
        let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
        assert_eq!(json["id"], "queue-cmpl");
    }

    #[tokio::test]
    async fn test_process_queued_request_agent_not_found() {
        use crate::config::NexusConfig;
        use crate::registry::{Backend, BackendType, DiscoverySource, Registry};

        let registry = Arc::new(Registry::new());
        let backend = Backend::new(
            "ghost".to_string(),
            "ghost-backend".to_string(),
            "http://localhost:9999".to_string(),
            BackendType::Ollama,
            vec![],
            DiscoverySource::Static,
            std::collections::HashMap::new(),
        );
        registry.add_backend(backend).unwrap();

        let config = Arc::new(NexusConfig::default());
        let state = Arc::new(crate::api::AppState::new(Arc::clone(&registry), config));

        let backend_arc = registry
            .get_all_backends()
            .into_iter()
            .find(|b| b.id == "ghost")
            .expect("backend must exist");
        let routing_result = crate::routing::RoutingResult {
            backend: Arc::new(backend_arc),
            actual_model: "llama3:8b".to_string(),
            fallback_used: false,
            route_reason: "test".to_string(),
            cost_estimated: None,
            budget_status: crate::routing::reconciler::intent::BudgetStatus::Normal,
            budget_utilization: None,
            budget_remaining: None,
        };

        let request = make_request();
        let result = process_queued_request(&state, &routing_result, &request).await;
        assert!(result.is_err(), "should fail when agent is not registered");
    }

    #[tokio::test]
    async fn test_drain_remaining_on_shutdown() {
        let queue = Arc::new(RequestQueue::new(make_config(10, 30)));

        let (r1, rx1) = make_queued(Priority::Normal);
        let (r2, rx2) = make_queued(Priority::High);
        queue.enqueue(r1).unwrap();
        queue.enqueue(r2).unwrap();
        assert_eq!(queue.depth(), 2);

        drain_remaining(&queue).await;
        assert_eq!(queue.depth(), 0);

        let resp1 = rx1.await.expect("should receive response").unwrap();
        assert_eq!(resp1.status(), axum::http::StatusCode::SERVICE_UNAVAILABLE);
        let resp2 = rx2.await.expect("should receive response").unwrap();
        assert_eq!(resp2.status(), axum::http::StatusCode::SERVICE_UNAVAILABLE);
    }

    #[tokio::test]
    async fn test_queue_drain_loop_processes_request() {
        use crate::config::NexusConfig;
        use crate::registry::Registry;
        use tokio_util::sync::CancellationToken;

        let queue = Arc::new(RequestQueue::new(make_config(10, 1)));
        let registry = Arc::new(Registry::new());
        let config = Arc::new(NexusConfig::default());
        let state = Arc::new(crate::api::AppState::new(Arc::clone(&registry), config));

        let (req, rx) = make_queued(Priority::Normal);
        let timed_out = QueuedRequest {
            intent: req.intent,
            request: req.request,
            response_tx: req.response_tx,
            enqueued_at: Instant::now() - Duration::from_secs(5),
            priority: req.priority,
        };
        queue.enqueue(timed_out).unwrap();

        let cancel = CancellationToken::new();
        let cancel_clone = cancel.clone();
        let q = Arc::clone(&queue);
        let s = Arc::clone(&state);

        let handle = tokio::spawn(async move {
            queue_drain_loop(q, s, cancel_clone).await;
        });

        let result = tokio::time::timeout(Duration::from_secs(3), rx).await;
        assert!(result.is_ok(), "should receive response within timeout");
        let response = result.unwrap().unwrap().unwrap();
        assert_eq!(
            response.status(),
            axum::http::StatusCode::SERVICE_UNAVAILABLE
        );

        cancel.cancel();
        let _ = tokio::time::timeout(Duration::from_secs(2), handle).await;
    }
}