1#[cfg(feature = "monitoring")]
4use crate::alerts::AlertManager;
5use crate::config::LaneConfig;
6use crate::error::Result;
7use crate::event::{EventEmitter, EventStream, LaneEvent};
8#[cfg(feature = "metrics")]
9use crate::metrics::QueueMetrics;
10use crate::queue::{lane_ids, priorities, Command, CommandQueue, Lane};
11use crate::storage::Storage;
12use crate::QueueStats;
13use std::collections::HashMap;
14use std::sync::Arc;
15
16#[allow(dead_code)]
18pub struct QueueManager {
19 queue: Arc<CommandQueue>,
20 scheduler_handle: tokio::sync::Mutex<Option<()>>,
21 #[cfg(feature = "metrics")]
22 metrics: Option<QueueMetrics>,
23 #[cfg(feature = "monitoring")]
24 alerts: Option<Arc<AlertManager>>,
25}
26
27impl QueueManager {
28 #[allow(dead_code)]
30 pub(crate) fn new(queue: Arc<CommandQueue>) -> Self {
31 Self {
32 queue,
33 scheduler_handle: tokio::sync::Mutex::new(None),
34 #[cfg(feature = "metrics")]
35 metrics: None,
36 #[cfg(feature = "monitoring")]
37 alerts: None,
38 }
39 }
40
41 pub(crate) fn with_observability(
43 queue: Arc<CommandQueue>,
44 #[cfg(feature = "metrics")] metrics: Option<QueueMetrics>,
45 #[cfg(feature = "monitoring")] alerts: Option<Arc<AlertManager>>,
46 ) -> Self {
47 Self {
48 queue,
49 scheduler_handle: tokio::sync::Mutex::new(None),
50 #[cfg(feature = "metrics")]
51 metrics,
52 #[cfg(feature = "monitoring")]
53 alerts,
54 }
55 }
56
57 pub async fn start(&self) -> Result<()> {
59 tracing::info!("Starting queue scheduler");
60 let queue = Arc::clone(&self.queue);
61 queue.start_scheduler().await;
62 Ok(())
63 }
64
65 pub async fn submit(
67 &self,
68 lane_id: &str,
69 command: Box<dyn Command>,
70 ) -> Result<tokio::sync::oneshot::Receiver<Result<serde_json::Value>>> {
71 #[cfg(feature = "telemetry")]
72 crate::telemetry::record_submit(lane_id);
73 self.queue.submit(lane_id, command).await
74 }
75
76 pub async fn stats(&self) -> Result<QueueStats> {
78 let lane_status = self.queue.status().await;
79
80 let mut total_pending = 0;
81 let mut total_active = 0;
82
83 for status in lane_status.values() {
84 total_pending += status.pending;
85 total_active += status.active;
86 }
87
88 let dead_letter_count = match self.queue.dlq() {
89 Some(dlq) => dlq.len().await,
90 None => 0,
91 };
92
93 Ok(QueueStats {
94 total_pending,
95 total_active,
96 dead_letter_count,
97 lanes: lane_status,
98 })
99 }
100
101 pub fn queue(&self) -> Arc<CommandQueue> {
103 Arc::clone(&self.queue)
104 }
105
106 pub fn subscribe(&self) -> EventStream {
108 self.queue.subscribe_stream()
109 }
110
111 pub fn subscribe_filtered(
113 &self,
114 filter: impl Fn(&LaneEvent) -> bool + Send + Sync + 'static,
115 ) -> EventStream {
116 self.queue.subscribe_filtered(filter)
117 }
118
119 pub async fn shutdown(&self) {
121 self.queue.shutdown().await;
122 }
123
124 pub async fn drain(&self, timeout: std::time::Duration) -> Result<()> {
126 self.queue.drain(timeout).await
127 }
128
129 pub fn is_shutting_down(&self) -> bool {
131 self.queue.is_shutting_down()
132 }
133
134 #[cfg(feature = "metrics")]
136 pub fn metrics(&self) -> Option<&QueueMetrics> {
137 self.metrics.as_ref()
138 }
139
140 #[cfg(feature = "monitoring")]
142 pub fn alerts(&self) -> Option<&Arc<AlertManager>> {
143 self.alerts.as_ref()
144 }
145}
146
147pub struct QueueManagerBuilder {
149 event_emitter: EventEmitter,
150 lane_configs: HashMap<String, (LaneConfig, u8)>,
151 storage: Option<Arc<dyn Storage>>,
152 dlq_size: Option<usize>,
153 #[cfg(feature = "metrics")]
154 metrics: Option<QueueMetrics>,
155 #[cfg(feature = "monitoring")]
156 alerts: Option<Arc<AlertManager>>,
157}
158
159impl QueueManagerBuilder {
160 pub fn new(event_emitter: EventEmitter) -> Self {
162 Self {
163 event_emitter,
164 lane_configs: HashMap::new(),
165 storage: None,
166 dlq_size: None,
167 #[cfg(feature = "metrics")]
168 metrics: None,
169 #[cfg(feature = "monitoring")]
170 alerts: None,
171 }
172 }
173
174 pub fn with_lane(mut self, id: impl Into<String>, config: LaneConfig, priority: u8) -> Self {
176 self.lane_configs.insert(id.into(), (config, priority));
177 self
178 }
179
180 pub fn with_storage(mut self, storage: Arc<dyn Storage>) -> Self {
182 self.storage = Some(storage);
183 self
184 }
185
186 pub fn with_dlq(mut self, size: usize) -> Self {
188 self.dlq_size = Some(size);
189 self
190 }
191
192 #[cfg(feature = "metrics")]
194 pub fn with_metrics(mut self, metrics: QueueMetrics) -> Self {
195 self.metrics = Some(metrics);
196 self
197 }
198
199 #[cfg(feature = "monitoring")]
201 pub fn with_alerts(mut self, alerts: Arc<AlertManager>) -> Self {
202 self.alerts = Some(alerts);
203 self
204 }
205
206 pub fn with_default_lanes(mut self) -> Self {
208 self.lane_configs.insert(
209 lane_ids::SYSTEM.to_string(),
210 (LaneConfig::new(1, 5), priorities::SYSTEM),
211 );
212 self.lane_configs.insert(
213 lane_ids::CONTROL.to_string(),
214 (LaneConfig::new(1, 3), priorities::CONTROL),
215 );
216 self.lane_configs.insert(
217 lane_ids::QUERY.to_string(),
218 (LaneConfig::new(1, 10), priorities::QUERY),
219 );
220 self.lane_configs.insert(
221 lane_ids::SESSION.to_string(),
222 (LaneConfig::new(1, 5), priorities::SESSION),
223 );
224 self.lane_configs.insert(
225 lane_ids::SKILL.to_string(),
226 (LaneConfig::new(1, 3), priorities::SKILL),
227 );
228 self.lane_configs.insert(
229 lane_ids::PROMPT.to_string(),
230 (LaneConfig::new(1, 2), priorities::PROMPT),
231 );
232 self
233 }
234
235 pub async fn build(self) -> Result<QueueManager> {
237 let queue = match (self.dlq_size, self.storage.clone()) {
239 (Some(dlq_size), Some(storage)) => Arc::new(CommandQueue::with_dlq_and_storage(
240 self.event_emitter,
241 dlq_size,
242 storage.clone(),
243 )),
244 (Some(dlq_size), None) => {
245 Arc::new(CommandQueue::with_dlq(self.event_emitter, dlq_size))
246 }
247 (None, Some(storage)) => Arc::new(CommandQueue::with_storage(
248 self.event_emitter,
249 storage.clone(),
250 )),
251 (None, None) => Arc::new(CommandQueue::new(self.event_emitter)),
252 };
253
254 for (id, (config, priority)) in self.lane_configs {
256 let lane = if let Some(storage) = &self.storage {
257 Arc::new(Lane::with_storage(id, config, priority, storage.clone()))
258 } else {
259 Arc::new(Lane::new(id, config, priority))
260 };
261 queue.register_lane(lane).await;
262 }
263
264 Ok(QueueManager::with_observability(
265 queue,
266 #[cfg(feature = "metrics")]
267 self.metrics,
268 #[cfg(feature = "monitoring")]
269 self.alerts,
270 ))
271 }
272}
273
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::LaneError;
    use async_trait::async_trait;
    use serde_json::{json, Value};
    use std::time::Duration;

    /// Receiver type handed back by a successful `submit`.
    type Reply = tokio::sync::oneshot::Receiver<Result<Value>>;

    /// Command that succeeds with a fixed JSON payload.
    struct TestCommand {
        result: Value,
    }

    #[async_trait]
    impl Command for TestCommand {
        async fn execute(&self) -> Result<Value> {
            Ok(self.result.clone())
        }
        fn command_type(&self) -> &str {
            "test"
        }
    }

    /// Command that always fails with `LaneError::Other(message)`.
    struct FailingCommand {
        message: String,
    }

    #[async_trait]
    impl Command for FailingCommand {
        async fn execute(&self) -> Result<Value> {
            Err(LaneError::Other(self.message.clone()))
        }
        fn command_type(&self) -> &str {
            "failing"
        }
    }

    // ------------------------------------------------------------------
    // Helpers
    // ------------------------------------------------------------------

    /// Boxes a `TestCommand` that resolves to `result`.
    fn ok_command(result: Value) -> Box<dyn Command> {
        Box::new(TestCommand { result })
    }

    /// Boxes a `FailingCommand` carrying `message`.
    fn failing_command(message: &str) -> Box<dyn Command> {
        Box::new(FailingCommand {
            message: message.to_string(),
        })
    }

    /// Waits up to `secs` seconds for a command result, panicking on timeout
    /// or when the sending side was dropped.
    async fn recv_within(rx: Reply, secs: u64) -> Result<Value> {
        tokio::time::timeout(Duration::from_secs(secs), rx)
            .await
            .expect("Timeout")
            .expect("Channel closed")
    }

    /// Manager over a plain queue with the SYSTEM, CONTROL, QUERY and PROMPT
    /// lanes registered; the scheduler is not started.
    async fn make_manager() -> QueueManager {
        let queue = Arc::new(CommandQueue::new(EventEmitter::new(100)));

        for (id, priority, max) in [
            (lane_ids::SYSTEM, priorities::SYSTEM, 5),
            (lane_ids::CONTROL, priorities::CONTROL, 3),
            (lane_ids::QUERY, priorities::QUERY, 10),
            (lane_ids::PROMPT, priorities::PROMPT, 2),
        ] {
            let lane = Lane::new(id, LaneConfig::new(1, max), priority);
            queue.register_lane(Arc::new(lane)).await;
        }

        QueueManager::new(queue)
    }

    /// Builder pre-loaded with a single QUERY lane.
    fn query_lane_builder() -> QueueManagerBuilder {
        QueueManagerBuilder::new(EventEmitter::new(100)).with_lane(
            lane_ids::QUERY,
            LaneConfig::new(1, 10),
            priorities::QUERY,
        )
    }

    /// Local storage rooted in a fresh temporary directory. The directory
    /// guard is returned so the caller keeps it alive for the whole test.
    async fn temp_storage() -> (tempfile::TempDir, Arc<crate::storage::LocalStorage>) {
        let dir = tempfile::TempDir::new().unwrap();
        let storage = crate::storage::LocalStorage::new(dir.path().to_path_buf())
            .await
            .unwrap();
        (dir, Arc::new(storage))
    }

    // ------------------------------------------------------------------
    // Event subscription
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_manager_subscribe_yields_events() {
        use crate::event::events;
        use tokio_stream::StreamExt;

        let manager = make_manager().await;
        let mut stream = manager.subscribe();
        manager.start().await.unwrap();

        let _ = manager
            .submit(lane_ids::QUERY, ok_command(json!({})))
            .await
            .unwrap();

        let event = tokio::time::timeout(Duration::from_secs(1), stream.next())
            .await
            .expect("No event received via manager.subscribe()")
            .expect("Stream ended");

        assert_eq!(event.key, events::QUEUE_COMMAND_SUBMITTED);
    }

    #[tokio::test]
    async fn test_manager_subscribe_filtered() {
        use crate::event::events;
        use tokio_stream::StreamExt;

        let manager = make_manager().await;
        let mut stream =
            manager.subscribe_filtered(|event| event.key == events::QUEUE_COMMAND_COMPLETED);
        manager.start().await.unwrap();

        let rx = manager
            .submit(lane_ids::QUERY, ok_command(json!({"done": true})))
            .await
            .unwrap();

        // Only wait for the command to finish; its outcome is not checked here.
        let _ = tokio::time::timeout(Duration::from_secs(1), rx).await;

        let event = tokio::time::timeout(Duration::from_secs(1), stream.next())
            .await
            .expect("No completed event received via manager.subscribe_filtered()")
            .expect("Stream ended");

        assert_eq!(event.key, events::QUEUE_COMMAND_COMPLETED);
    }

    // ------------------------------------------------------------------
    // Builder
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_queue_manager_builder() {
        let manager = QueueManagerBuilder::new(EventEmitter::new(100))
            .with_default_lanes()
            .build()
            .await
            .unwrap();

        let stats = manager.stats().await.unwrap();
        assert_eq!(stats.lanes.len(), 6);
    }

    #[tokio::test]
    async fn test_queue_manager_builder_custom_lanes() {
        let manager = QueueManagerBuilder::new(EventEmitter::new(100))
            .with_lane("custom1", LaneConfig::new(1, 4), 0)
            .with_lane("custom2", LaneConfig::new(2, 8), 1)
            .build()
            .await
            .unwrap();

        let stats = manager.stats().await.unwrap();
        assert_eq!(stats.lanes.len(), 2);
        assert!(stats.lanes.contains_key("custom1"));
        assert!(stats.lanes.contains_key("custom2"));
    }

    // ------------------------------------------------------------------
    // Construction
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_manager_new() {
        let manager = make_manager().await;

        let stats = manager.stats().await.unwrap();
        assert_eq!(stats.total_pending, 0);
        assert_eq!(stats.total_active, 0);
        assert_eq!(stats.lanes.len(), 4);
    }

    #[tokio::test]
    async fn test_manager_queue_accessor() {
        let manager = make_manager().await;

        let status = manager.queue().status().await;
        assert_eq!(status.len(), 4);
    }

    // ------------------------------------------------------------------
    // Stats
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_manager_stats_empty() {
        let manager = make_manager().await;

        let stats = manager.stats().await.unwrap();
        assert_eq!(stats.total_pending, 0);
        assert_eq!(stats.total_active, 0);

        assert_eq!(stats.lanes[lane_ids::SYSTEM].pending, 0);
        assert_eq!(stats.lanes[lane_ids::SYSTEM].max, 5);
        assert_eq!(stats.lanes[lane_ids::CONTROL].max, 3);
        assert_eq!(stats.lanes[lane_ids::QUERY].max, 10);
        assert_eq!(stats.lanes[lane_ids::PROMPT].max, 2);
    }

    #[tokio::test]
    async fn test_manager_stats_with_pending() {
        let manager = make_manager().await;

        // The scheduler is not running, so submissions stay pending.
        for _ in 0..4 {
            let _ = manager.submit(lane_ids::QUERY, ok_command(json!({}))).await;
        }

        let stats = manager.stats().await.unwrap();
        assert_eq!(stats.total_pending, 4);
        assert_eq!(stats.total_active, 0);
        assert_eq!(stats.lanes[lane_ids::QUERY].pending, 4);
    }

    #[tokio::test]
    async fn test_manager_stats_multiple_lanes() {
        let manager = make_manager().await;

        for lane in [
            lane_ids::SYSTEM,
            lane_ids::QUERY,
            lane_ids::QUERY,
            lane_ids::PROMPT,
        ] {
            let _ = manager.submit(lane, ok_command(json!({}))).await;
        }

        let stats = manager.stats().await.unwrap();
        assert_eq!(stats.total_pending, 4);
        assert_eq!(stats.lanes[lane_ids::SYSTEM].pending, 1);
        assert_eq!(stats.lanes[lane_ids::QUERY].pending, 2);
        assert_eq!(stats.lanes[lane_ids::PROMPT].pending, 1);
        assert_eq!(stats.lanes[lane_ids::CONTROL].pending, 0);
    }

    // ------------------------------------------------------------------
    // Submit
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_manager_submit_valid_lane() {
        let manager = make_manager().await;

        let result = manager
            .submit(lane_ids::QUERY, ok_command(json!({"data": "ok"})))
            .await;

        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_manager_submit_unknown_lane() {
        let manager = make_manager().await;

        let result = manager
            .submit("nonexistent-lane", ok_command(json!({})))
            .await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_manager_submit_and_execute() {
        let manager = make_manager().await;
        manager.start().await.unwrap();

        let rx = manager
            .submit(lane_ids::QUERY, ok_command(json!({"key": "value"})))
            .await
            .unwrap();

        let result = recv_within(rx, 1).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), json!({"key": "value"}));
    }

    #[tokio::test]
    async fn test_manager_submit_failing_command() {
        let manager = make_manager().await;
        manager.start().await.unwrap();

        let rx = manager
            .submit(lane_ids::QUERY, failing_command("manager test failure"))
            .await
            .unwrap();

        let result = recv_within(rx, 1).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_manager_submit_multiple_commands() {
        let manager = make_manager().await;
        manager.start().await.unwrap();

        let mut receivers = Vec::new();
        for i in 0..5 {
            let rx = manager
                .submit(lane_ids::QUERY, ok_command(json!({"index": i})))
                .await
                .unwrap();
            receivers.push(rx);
        }

        for (i, rx) in receivers.into_iter().enumerate() {
            let result = recv_within(rx, 2).await;
            assert!(result.is_ok());
            let val = result.unwrap();
            assert_eq!(val["index"], i);
        }
    }

    // ------------------------------------------------------------------
    // Start
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_manager_start() {
        let manager = make_manager().await;

        let result = manager.start().await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_manager_start_drains_pending() {
        let manager = make_manager().await;

        // Queue a command before the scheduler exists.
        let rx = manager
            .submit(lane_ids::QUERY, ok_command(json!({"queued": true})))
            .await
            .unwrap();

        let stats = manager.stats().await.unwrap();
        assert_eq!(stats.total_pending, 1);

        manager.start().await.unwrap();

        let result = recv_within(rx, 1).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap()["queued"], true);
    }

    // ------------------------------------------------------------------
    // Queue accessor
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_manager_queue_returns_same_instance() {
        let manager = make_manager().await;

        let first = manager.queue();
        let second = manager.queue();

        assert!(Arc::ptr_eq(&first, &second));
    }

    #[tokio::test]
    async fn test_manager_queue_can_submit_directly() {
        let manager = make_manager().await;
        manager.start().await.unwrap();

        let queue = manager.queue();
        let rx = queue
            .submit(lane_ids::SYSTEM, ok_command(json!({"direct": true})))
            .await
            .unwrap();

        let result = recv_within(rx, 1).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap()["direct"], true);
    }

    // ------------------------------------------------------------------
    // Shutdown and drain
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_manager_shutdown() {
        let manager = make_manager().await;

        assert!(!manager.is_shutting_down());

        manager.shutdown().await;
        assert!(manager.is_shutting_down());
    }

    #[tokio::test]
    async fn test_manager_shutdown_rejects_commands() {
        let manager = make_manager().await;

        manager.shutdown().await;

        let result = manager.submit(lane_ids::QUERY, ok_command(json!({}))).await;

        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_manager_drain() {
        let manager = make_manager().await;
        manager.start().await.unwrap();

        let _rx = manager
            .submit(lane_ids::QUERY, ok_command(json!({"test": "data"})))
            .await
            .unwrap();

        manager.shutdown().await;
        let drain_result = manager.drain(Duration::from_secs(2)).await;

        assert!(drain_result.is_ok());
    }

    // ------------------------------------------------------------------
    // Storage
    // ------------------------------------------------------------------

    #[tokio::test]
    async fn test_manager_with_storage() {
        let (_temp_dir, storage) = temp_storage().await;

        let manager = query_lane_builder()
            .with_storage(storage.clone())
            .build()
            .await
            .unwrap();

        manager.start().await.unwrap();

        let rx = manager
            .submit(lane_ids::QUERY, ok_command(json!({"stored": true})))
            .await
            .unwrap();

        let result = recv_within(rx, 1).await;
        assert!(result.is_ok());

        // Pause briefly before inspecting storage; nothing should remain stored.
        tokio::time::sleep(Duration::from_millis(100)).await;
        let stored_commands = storage.load_commands().await.unwrap();
        assert_eq!(stored_commands.len(), 0);
    }

    #[tokio::test]
    async fn test_manager_with_storage_and_dlq() {
        let (_temp_dir, storage) = temp_storage().await;

        let manager = query_lane_builder()
            .with_storage(storage.clone())
            .with_dlq(100)
            .build()
            .await
            .unwrap();

        manager.start().await.unwrap();

        let rx = manager
            .submit(lane_ids::QUERY, failing_command("test error"))
            .await
            .unwrap();

        let result = recv_within(rx, 1).await;
        assert!(result.is_err());

        // Pause briefly before inspecting storage; nothing should remain stored.
        tokio::time::sleep(Duration::from_millis(100)).await;
        let stored_commands = storage.load_commands().await.unwrap();
        assert_eq!(stored_commands.len(), 0);

        // The failed command is expected to show up in the dead-letter count.
        let stats = manager.stats().await.unwrap();
        assert_eq!(stats.dead_letter_count, 1);
    }

    // ------------------------------------------------------------------
    // Observability
    // ------------------------------------------------------------------

    #[cfg(feature = "metrics")]
    #[tokio::test]
    async fn test_manager_with_metrics() {
        let metrics = QueueMetrics::local();

        let manager = query_lane_builder()
            .with_metrics(metrics.clone())
            .build()
            .await
            .unwrap();

        assert!(manager.metrics().is_some());

        let mgr_metrics = manager.metrics().unwrap();
        let snapshot = mgr_metrics.snapshot().await;
        assert!(snapshot.counters.is_empty());
    }

    #[cfg(feature = "monitoring")]
    #[tokio::test]
    async fn test_manager_with_alerts() {
        let alerts = Arc::new(AlertManager::with_queue_depth_alerts(100, 200));

        let manager = query_lane_builder()
            .with_alerts(alerts.clone())
            .build()
            .await
            .unwrap();

        assert!(manager.alerts().is_some());

        let mgr_alerts = manager.alerts().unwrap();
        let config = mgr_alerts.queue_depth_config().await;
        assert_eq!(config.warning_threshold, 100);
        assert_eq!(config.critical_threshold, 200);
    }

    #[cfg(all(feature = "metrics", feature = "monitoring"))]
    #[tokio::test]
    async fn test_manager_with_metrics_and_alerts() {
        let metrics = QueueMetrics::local();
        let alerts = Arc::new(AlertManager::with_latency_alerts(100.0, 500.0));

        let manager = query_lane_builder()
            .with_metrics(metrics)
            .with_alerts(alerts)
            .build()
            .await
            .unwrap();

        assert!(manager.metrics().is_some());
        assert!(manager.alerts().is_some());
    }

    #[cfg(all(feature = "metrics", feature = "monitoring"))]
    #[tokio::test]
    async fn test_manager_without_observability() {
        let manager = query_lane_builder().build().await.unwrap();

        assert!(manager.metrics().is_none());
        assert!(manager.alerts().is_none());
    }
}