Skip to main content

a3s_lane/
manager.rs

1//! Queue manager provides high-level queue management
2
3#[cfg(feature = "monitoring")]
4use crate::alerts::AlertManager;
5use crate::config::LaneConfig;
6use crate::error::Result;
7use crate::event::{EventEmitter, EventStream, LaneEvent};
8#[cfg(feature = "metrics")]
9use crate::metrics::QueueMetrics;
10use crate::queue::{lane_ids, priorities, Command, CommandQueue, Lane};
11use crate::storage::Storage;
12use crate::QueueStats;
13use std::collections::HashMap;
14use std::sync::Arc;
15
16/// Queue manager
17#[allow(dead_code)]
18pub struct QueueManager {
19    queue: Arc<CommandQueue>,
20    scheduler_handle: tokio::sync::Mutex<Option<()>>,
21    #[cfg(feature = "metrics")]
22    metrics: Option<QueueMetrics>,
23    #[cfg(feature = "monitoring")]
24    alerts: Option<Arc<AlertManager>>,
25}
26
27impl QueueManager {
28    /// Create a new queue manager
29    #[allow(dead_code)]
30    pub(crate) fn new(queue: Arc<CommandQueue>) -> Self {
31        Self {
32            queue,
33            scheduler_handle: tokio::sync::Mutex::new(None),
34            #[cfg(feature = "metrics")]
35            metrics: None,
36            #[cfg(feature = "monitoring")]
37            alerts: None,
38        }
39    }
40
41    /// Create a new queue manager with metrics and alerts
42    pub(crate) fn with_observability(
43        queue: Arc<CommandQueue>,
44        #[cfg(feature = "metrics")] metrics: Option<QueueMetrics>,
45        #[cfg(feature = "monitoring")] alerts: Option<Arc<AlertManager>>,
46    ) -> Self {
47        Self {
48            queue,
49            scheduler_handle: tokio::sync::Mutex::new(None),
50            #[cfg(feature = "metrics")]
51            metrics,
52            #[cfg(feature = "monitoring")]
53            alerts,
54        }
55    }
56
57    /// Start the queue scheduler
58    pub async fn start(&self) -> Result<()> {
59        tracing::info!("Starting queue scheduler");
60        let queue = Arc::clone(&self.queue);
61        queue.start_scheduler().await;
62        Ok(())
63    }
64
65    /// Submit a command to a lane
66    pub async fn submit(
67        &self,
68        lane_id: &str,
69        command: Box<dyn Command>,
70    ) -> Result<tokio::sync::oneshot::Receiver<Result<serde_json::Value>>> {
71        #[cfg(feature = "telemetry")]
72        crate::telemetry::record_submit(lane_id);
73        self.queue.submit(lane_id, command).await
74    }
75
76    /// Get queue statistics
77    pub async fn stats(&self) -> Result<QueueStats> {
78        let lane_status = self.queue.status().await;
79
80        let mut total_pending = 0;
81        let mut total_active = 0;
82
83        for status in lane_status.values() {
84            total_pending += status.pending;
85            total_active += status.active;
86        }
87
88        let dead_letter_count = match self.queue.dlq() {
89            Some(dlq) => dlq.len().await,
90            None => 0,
91        };
92
93        Ok(QueueStats {
94            total_pending,
95            total_active,
96            dead_letter_count,
97            lanes: lane_status,
98        })
99    }
100
101    /// Get the underlying command queue
102    pub fn queue(&self) -> Arc<CommandQueue> {
103        Arc::clone(&self.queue)
104    }
105
106    /// Subscribe to all queue lifecycle events as an `EventStream` (implements `Stream`)
107    pub fn subscribe(&self) -> EventStream {
108        self.queue.subscribe_stream()
109    }
110
111    /// Subscribe to filtered queue lifecycle events as an `EventStream`
112    pub fn subscribe_filtered(
113        &self,
114        filter: impl Fn(&LaneEvent) -> bool + Send + Sync + 'static,
115    ) -> EventStream {
116        self.queue.subscribe_filtered(filter)
117    }
118
119    /// Initiate graceful shutdown - stop accepting new commands
120    pub async fn shutdown(&self) {
121        self.queue.shutdown().await;
122    }
123
124    /// Wait for all pending commands to complete (with timeout)
125    pub async fn drain(&self, timeout: std::time::Duration) -> Result<()> {
126        self.queue.drain(timeout).await
127    }
128
129    /// Check if shutdown is in progress
130    pub fn is_shutting_down(&self) -> bool {
131        self.queue.is_shutting_down()
132    }
133
134    /// Get the metrics collector (if configured)
135    #[cfg(feature = "metrics")]
136    pub fn metrics(&self) -> Option<&QueueMetrics> {
137        self.metrics.as_ref()
138    }
139
140    /// Get the alert manager (if configured)
141    #[cfg(feature = "monitoring")]
142    pub fn alerts(&self) -> Option<&Arc<AlertManager>> {
143        self.alerts.as_ref()
144    }
145}
146
147/// Queue manager builder provides a high-level API for managing the command queue
148pub struct QueueManagerBuilder {
149    event_emitter: EventEmitter,
150    lane_configs: HashMap<String, (LaneConfig, u8)>,
151    storage: Option<Arc<dyn Storage>>,
152    dlq_size: Option<usize>,
153    #[cfg(feature = "metrics")]
154    metrics: Option<QueueMetrics>,
155    #[cfg(feature = "monitoring")]
156    alerts: Option<Arc<AlertManager>>,
157}
158
159impl QueueManagerBuilder {
160    /// Create a new queue manager builder
161    pub fn new(event_emitter: EventEmitter) -> Self {
162        Self {
163            event_emitter,
164            lane_configs: HashMap::new(),
165            storage: None,
166            dlq_size: None,
167            #[cfg(feature = "metrics")]
168            metrics: None,
169            #[cfg(feature = "monitoring")]
170            alerts: None,
171        }
172    }
173
174    /// Add a lane configuration
175    pub fn with_lane(mut self, id: impl Into<String>, config: LaneConfig, priority: u8) -> Self {
176        self.lane_configs.insert(id.into(), (config, priority));
177        self
178    }
179
180    /// Add storage backend for persistence
181    pub fn with_storage(mut self, storage: Arc<dyn Storage>) -> Self {
182        self.storage = Some(storage);
183        self
184    }
185
186    /// Add dead letter queue with specified size
187    pub fn with_dlq(mut self, size: usize) -> Self {
188        self.dlq_size = Some(size);
189        self
190    }
191
192    /// Add metrics collection
193    #[cfg(feature = "metrics")]
194    pub fn with_metrics(mut self, metrics: QueueMetrics) -> Self {
195        self.metrics = Some(metrics);
196        self
197    }
198
199    /// Add alert manager
200    #[cfg(feature = "monitoring")]
201    pub fn with_alerts(mut self, alerts: Arc<AlertManager>) -> Self {
202        self.alerts = Some(alerts);
203        self
204    }
205
206    /// Add default lanes (system, control, query, session, skill, prompt)
207    pub fn with_default_lanes(mut self) -> Self {
208        self.lane_configs.insert(
209            lane_ids::SYSTEM.to_string(),
210            (LaneConfig::new(1, 5), priorities::SYSTEM),
211        );
212        self.lane_configs.insert(
213            lane_ids::CONTROL.to_string(),
214            (LaneConfig::new(1, 3), priorities::CONTROL),
215        );
216        self.lane_configs.insert(
217            lane_ids::QUERY.to_string(),
218            (LaneConfig::new(1, 10), priorities::QUERY),
219        );
220        self.lane_configs.insert(
221            lane_ids::SESSION.to_string(),
222            (LaneConfig::new(1, 5), priorities::SESSION),
223        );
224        self.lane_configs.insert(
225            lane_ids::SKILL.to_string(),
226            (LaneConfig::new(1, 3), priorities::SKILL),
227        );
228        self.lane_configs.insert(
229            lane_ids::PROMPT.to_string(),
230            (LaneConfig::new(1, 2), priorities::PROMPT),
231        );
232        self
233    }
234
235    /// Build the queue manager
236    pub async fn build(self) -> Result<QueueManager> {
237        // Create queue with appropriate configuration
238        let queue = match (self.dlq_size, self.storage.clone()) {
239            (Some(dlq_size), Some(storage)) => Arc::new(CommandQueue::with_dlq_and_storage(
240                self.event_emitter,
241                dlq_size,
242                storage.clone(),
243            )),
244            (Some(dlq_size), None) => {
245                Arc::new(CommandQueue::with_dlq(self.event_emitter, dlq_size))
246            }
247            (None, Some(storage)) => Arc::new(CommandQueue::with_storage(
248                self.event_emitter,
249                storage.clone(),
250            )),
251            (None, None) => Arc::new(CommandQueue::new(self.event_emitter)),
252        };
253
254        // Register all lanes
255        for (id, (config, priority)) in self.lane_configs {
256            let lane = if let Some(storage) = &self.storage {
257                Arc::new(Lane::with_storage(id, config, priority, storage.clone()))
258            } else {
259                Arc::new(Lane::new(id, config, priority))
260            };
261            queue.register_lane(lane).await;
262        }
263
264        Ok(QueueManager::with_observability(
265            queue,
266            #[cfg(feature = "metrics")]
267            self.metrics,
268            #[cfg(feature = "monitoring")]
269            self.alerts,
270        ))
271    }
272}
273
274#[cfg(test)]
275mod tests {
276    use super::*;
277    use crate::error::LaneError;
278    use async_trait::async_trait;
279
280    struct TestCommand {
281        result: serde_json::Value,
282    }
283
284    #[async_trait]
285    impl Command for TestCommand {
286        async fn execute(&self) -> Result<serde_json::Value> {
287            Ok(self.result.clone())
288        }
289        fn command_type(&self) -> &str {
290            "test"
291        }
292    }
293
294    struct FailingCommand {
295        message: String,
296    }
297
298    #[async_trait]
299    impl Command for FailingCommand {
300        async fn execute(&self) -> Result<serde_json::Value> {
301            Err(LaneError::Other(self.message.clone()))
302        }
303        fn command_type(&self) -> &str {
304            "failing"
305        }
306    }
307
308    /// Helper: build a QueueManager with standard lanes
309    async fn make_manager() -> QueueManager {
310        let emitter = EventEmitter::new(100);
311        let queue = Arc::new(CommandQueue::new(emitter));
312
313        let lanes = [
314            (lane_ids::SYSTEM, priorities::SYSTEM, 5),
315            (lane_ids::CONTROL, priorities::CONTROL, 3),
316            (lane_ids::QUERY, priorities::QUERY, 10),
317            (lane_ids::PROMPT, priorities::PROMPT, 2),
318        ];
319
320        for (id, priority, max) in lanes {
321            let config = LaneConfig::new(1, max);
322            queue
323                .register_lane(Arc::new(Lane::new(id, config, priority)))
324                .await;
325        }
326
327        QueueManager::new(queue)
328    }
329
330    // ========================================================================
331    // subscribe() Tests
332    // ========================================================================
333
334    #[tokio::test]
335    async fn test_manager_subscribe_yields_events() {
336        use crate::event::events;
337        use tokio_stream::StreamExt;
338
339        let manager = make_manager().await;
340        let mut stream = manager.subscribe();
341
342        manager.start().await.unwrap();
343
344        let cmd = Box::new(TestCommand {
345            result: serde_json::json!({}),
346        });
347        let _ = manager.submit(lane_ids::QUERY, cmd).await.unwrap();
348
349        // The first event emitted is QUEUE_COMMAND_SUBMITTED
350        let event = tokio::time::timeout(std::time::Duration::from_secs(1), stream.next())
351            .await
352            .expect("No event received via manager.subscribe()")
353            .expect("Stream ended");
354
355        assert_eq!(event.key, events::QUEUE_COMMAND_SUBMITTED);
356    }
357
358    #[tokio::test]
359    async fn test_manager_subscribe_filtered() {
360        use crate::event::events;
361        use tokio_stream::StreamExt;
362
363        let manager = make_manager().await;
364        // Only capture completed events; submitted/started events are filtered out
365        let mut stream = manager.subscribe_filtered(|e| e.key == events::QUEUE_COMMAND_COMPLETED);
366
367        manager.start().await.unwrap();
368
369        let cmd = Box::new(TestCommand {
370            result: serde_json::json!({"done": true}),
371        });
372        let rx = manager.submit(lane_ids::QUERY, cmd).await.unwrap();
373
374        // Wait for command to finish
375        let _ = tokio::time::timeout(std::time::Duration::from_secs(1), rx).await;
376
377        let event = tokio::time::timeout(std::time::Duration::from_secs(1), stream.next())
378            .await
379            .expect("No completed event received via manager.subscribe_filtered()")
380            .expect("Stream ended");
381
382        assert_eq!(event.key, events::QUEUE_COMMAND_COMPLETED);
383    }
384
385    // ========================================================================
386    // Builder Tests
387    // ========================================================================
388
389    #[tokio::test]
390    async fn test_queue_manager_builder() {
391        let emitter = EventEmitter::new(100);
392        let manager = QueueManagerBuilder::new(emitter)
393            .with_default_lanes()
394            .build()
395            .await
396            .unwrap();
397
398        let stats = manager.stats().await.unwrap();
399        assert_eq!(stats.lanes.len(), 6);
400    }
401
402    #[tokio::test]
403    async fn test_queue_manager_builder_custom_lanes() {
404        let emitter = EventEmitter::new(100);
405        let manager = QueueManagerBuilder::new(emitter)
406            .with_lane("custom1", LaneConfig::new(1, 4), 0)
407            .with_lane("custom2", LaneConfig::new(2, 8), 1)
408            .build()
409            .await
410            .unwrap();
411
412        let stats = manager.stats().await.unwrap();
413        assert_eq!(stats.lanes.len(), 2);
414        assert!(stats.lanes.contains_key("custom1"));
415        assert!(stats.lanes.contains_key("custom2"));
416    }
417
418    // ========================================================================
419    // Construction Tests
420    // ========================================================================
421
422    #[tokio::test]
423    async fn test_manager_new() {
424        let manager = make_manager().await;
425
426        let stats = manager.stats().await.unwrap();
427        assert_eq!(stats.total_pending, 0);
428        assert_eq!(stats.total_active, 0);
429        assert_eq!(stats.lanes.len(), 4);
430    }
431
432    #[tokio::test]
433    async fn test_manager_queue_accessor() {
434        let manager = make_manager().await;
435
436        let queue = manager.queue();
437        let status = queue.status().await;
438        assert_eq!(status.len(), 4);
439    }
440
441    // ========================================================================
442    // stats() Tests
443    // ========================================================================
444
445    #[tokio::test]
446    async fn test_manager_stats_empty() {
447        let manager = make_manager().await;
448
449        let stats = manager.stats().await.unwrap();
450        assert_eq!(stats.total_pending, 0);
451        assert_eq!(stats.total_active, 0);
452
453        // Check each lane
454        assert_eq!(stats.lanes[lane_ids::SYSTEM].pending, 0);
455        assert_eq!(stats.lanes[lane_ids::SYSTEM].max, 5);
456        assert_eq!(stats.lanes[lane_ids::CONTROL].max, 3);
457        assert_eq!(stats.lanes[lane_ids::QUERY].max, 10);
458        assert_eq!(stats.lanes[lane_ids::PROMPT].max, 2);
459    }
460
461    #[tokio::test]
462    async fn test_manager_stats_with_pending() {
463        let manager = make_manager().await;
464
465        // Submit commands without starting scheduler
466        for _ in 0..4 {
467            let cmd = Box::new(TestCommand {
468                result: serde_json::json!({}),
469            });
470            let _ = manager.submit(lane_ids::QUERY, cmd).await;
471        }
472
473        let stats = manager.stats().await.unwrap();
474        assert_eq!(stats.total_pending, 4);
475        assert_eq!(stats.total_active, 0);
476        assert_eq!(stats.lanes[lane_ids::QUERY].pending, 4);
477    }
478
479    #[tokio::test]
480    async fn test_manager_stats_multiple_lanes() {
481        let manager = make_manager().await;
482
483        let _ = manager
484            .submit(
485                lane_ids::SYSTEM,
486                Box::new(TestCommand {
487                    result: serde_json::json!({}),
488                }),
489            )
490            .await;
491        let _ = manager
492            .submit(
493                lane_ids::QUERY,
494                Box::new(TestCommand {
495                    result: serde_json::json!({}),
496                }),
497            )
498            .await;
499        let _ = manager
500            .submit(
501                lane_ids::QUERY,
502                Box::new(TestCommand {
503                    result: serde_json::json!({}),
504                }),
505            )
506            .await;
507        let _ = manager
508            .submit(
509                lane_ids::PROMPT,
510                Box::new(TestCommand {
511                    result: serde_json::json!({}),
512                }),
513            )
514            .await;
515
516        let stats = manager.stats().await.unwrap();
517        assert_eq!(stats.total_pending, 4);
518        assert_eq!(stats.lanes[lane_ids::SYSTEM].pending, 1);
519        assert_eq!(stats.lanes[lane_ids::QUERY].pending, 2);
520        assert_eq!(stats.lanes[lane_ids::PROMPT].pending, 1);
521        assert_eq!(stats.lanes[lane_ids::CONTROL].pending, 0);
522    }
523
524    // ========================================================================
525    // submit() Tests
526    // ========================================================================
527
528    #[tokio::test]
529    async fn test_manager_submit_valid_lane() {
530        let manager = make_manager().await;
531
532        let cmd = Box::new(TestCommand {
533            result: serde_json::json!({"data": "ok"}),
534        });
535        let result = manager.submit(lane_ids::QUERY, cmd).await;
536
537        assert!(result.is_ok());
538    }
539
540    #[tokio::test]
541    async fn test_manager_submit_unknown_lane() {
542        let manager = make_manager().await;
543
544        let cmd = Box::new(TestCommand {
545            result: serde_json::json!({}),
546        });
547        let result = manager.submit("nonexistent-lane", cmd).await;
548
549        assert!(result.is_err());
550    }
551
552    #[tokio::test]
553    async fn test_manager_submit_and_execute() {
554        let manager = make_manager().await;
555        manager.start().await.unwrap();
556
557        let cmd = Box::new(TestCommand {
558            result: serde_json::json!({"key": "value"}),
559        });
560        let rx = manager.submit(lane_ids::QUERY, cmd).await.unwrap();
561
562        let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
563            .await
564            .expect("Timeout")
565            .expect("Channel closed");
566        assert!(result.is_ok());
567        assert_eq!(result.unwrap(), serde_json::json!({"key": "value"}));
568    }
569
570    #[tokio::test]
571    async fn test_manager_submit_failing_command() {
572        let manager = make_manager().await;
573        manager.start().await.unwrap();
574
575        let cmd = Box::new(FailingCommand {
576            message: "manager test failure".to_string(),
577        });
578        let rx = manager.submit(lane_ids::QUERY, cmd).await.unwrap();
579
580        let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
581            .await
582            .expect("Timeout")
583            .expect("Channel closed");
584        assert!(result.is_err());
585    }
586
587    #[tokio::test]
588    async fn test_manager_submit_multiple_commands() {
589        let manager = make_manager().await;
590        manager.start().await.unwrap();
591
592        let mut receivers = Vec::new();
593        for i in 0..5 {
594            let cmd = Box::new(TestCommand {
595                result: serde_json::json!({"index": i}),
596            });
597            let rx = manager.submit(lane_ids::QUERY, cmd).await.unwrap();
598            receivers.push(rx);
599        }
600
601        for (i, rx) in receivers.into_iter().enumerate() {
602            let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
603                .await
604                .expect("Timeout")
605                .expect("Channel closed");
606            assert!(result.is_ok());
607            let val = result.unwrap();
608            assert_eq!(val["index"], i);
609        }
610    }
611
612    // ========================================================================
613    // start() Tests
614    // ========================================================================
615
616    #[tokio::test]
617    async fn test_manager_start() {
618        let manager = make_manager().await;
619
620        let result = manager.start().await;
621        assert!(result.is_ok());
622    }
623
624    #[tokio::test]
625    async fn test_manager_start_drains_pending() {
626        let manager = make_manager().await;
627
628        // Submit before start
629        let cmd = Box::new(TestCommand {
630            result: serde_json::json!({"queued": true}),
631        });
632        let rx = manager.submit(lane_ids::QUERY, cmd).await.unwrap();
633
634        // Verify pending
635        let stats = manager.stats().await.unwrap();
636        assert_eq!(stats.total_pending, 1);
637
638        // Start scheduler
639        manager.start().await.unwrap();
640
641        // Command should now execute
642        let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
643            .await
644            .expect("Timeout")
645            .expect("Channel closed");
646        assert!(result.is_ok());
647        assert_eq!(result.unwrap()["queued"], true);
648    }
649
650    // ========================================================================
651    // queue() Accessor Tests
652    // ========================================================================
653
654    #[tokio::test]
655    async fn test_manager_queue_returns_same_instance() {
656        let manager = make_manager().await;
657
658        let q1 = manager.queue();
659        let q2 = manager.queue();
660
661        // Both should point to the same underlying data
662        assert!(Arc::ptr_eq(&q1, &q2));
663    }
664
665    #[tokio::test]
666    async fn test_manager_queue_can_submit_directly() {
667        let manager = make_manager().await;
668        manager.start().await.unwrap();
669
670        // Submit directly via underlying queue
671        let queue = manager.queue();
672        let cmd = Box::new(TestCommand {
673            result: serde_json::json!({"direct": true}),
674        });
675        let rx = queue.submit(lane_ids::SYSTEM, cmd).await.unwrap();
676
677        let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
678            .await
679            .expect("Timeout")
680            .expect("Channel closed");
681        assert!(result.is_ok());
682        assert_eq!(result.unwrap()["direct"], true);
683    }
684
685    // ========================================================================
686    // Shutdown Tests
687    // ========================================================================
688
689    #[tokio::test]
690    async fn test_manager_shutdown() {
691        let manager = make_manager().await;
692
693        assert!(!manager.is_shutting_down());
694
695        manager.shutdown().await;
696        assert!(manager.is_shutting_down());
697    }
698
699    #[tokio::test]
700    async fn test_manager_shutdown_rejects_commands() {
701        let manager = make_manager().await;
702
703        manager.shutdown().await;
704
705        let cmd = Box::new(TestCommand {
706            result: serde_json::json!({}),
707        });
708        let result = manager.submit(lane_ids::QUERY, cmd).await;
709
710        assert!(result.is_err());
711    }
712
713    #[tokio::test]
714    async fn test_manager_drain() {
715        let manager = make_manager().await;
716        manager.start().await.unwrap();
717
718        // Submit a command
719        let cmd = Box::new(TestCommand {
720            result: serde_json::json!({"test": "data"}),
721        });
722        let _rx = manager.submit(lane_ids::QUERY, cmd).await.unwrap();
723
724        // Shutdown and drain
725        manager.shutdown().await;
726        let drain_result = manager.drain(std::time::Duration::from_secs(2)).await;
727
728        assert!(drain_result.is_ok());
729    }
730
731    // ========================================================================
732    // Storage Tests
733    // ========================================================================
734
735    #[tokio::test]
736    async fn test_manager_with_storage() {
737        use crate::storage::LocalStorage;
738        use tempfile::TempDir;
739
740        let temp_dir = TempDir::new().unwrap();
741        let storage = Arc::new(
742            LocalStorage::new(temp_dir.path().to_path_buf())
743                .await
744                .unwrap(),
745        );
746
747        let emitter = EventEmitter::new(100);
748        let manager = QueueManagerBuilder::new(emitter)
749            .with_storage(storage.clone())
750            .with_lane(lane_ids::QUERY, LaneConfig::new(1, 10), priorities::QUERY)
751            .build()
752            .await
753            .unwrap();
754
755        manager.start().await.unwrap();
756
757        // Submit a command
758        let cmd = Box::new(TestCommand {
759            result: serde_json::json!({"stored": true}),
760        });
761        let rx = manager.submit(lane_ids::QUERY, cmd).await.unwrap();
762
763        // Wait for command to complete
764        let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
765            .await
766            .expect("Timeout")
767            .expect("Channel closed");
768        assert!(result.is_ok());
769
770        // Verify command was removed from storage after completion
771        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
772        let stored_commands = storage.load_commands().await.unwrap();
773        assert_eq!(stored_commands.len(), 0);
774    }
775
776    #[tokio::test]
777    async fn test_manager_with_storage_and_dlq() {
778        use crate::storage::LocalStorage;
779        use tempfile::TempDir;
780
781        let temp_dir = TempDir::new().unwrap();
782        let storage = Arc::new(
783            LocalStorage::new(temp_dir.path().to_path_buf())
784                .await
785                .unwrap(),
786        );
787
788        let emitter = EventEmitter::new(100);
789        let manager = QueueManagerBuilder::new(emitter)
790            .with_storage(storage.clone())
791            .with_dlq(100)
792            .with_lane(lane_ids::QUERY, LaneConfig::new(1, 10), priorities::QUERY)
793            .build()
794            .await
795            .unwrap();
796
797        manager.start().await.unwrap();
798
799        // Submit a failing command
800        let cmd = Box::new(FailingCommand {
801            message: "test error".to_string(),
802        });
803        let rx = manager.submit(lane_ids::QUERY, cmd).await.unwrap();
804
805        // Wait for command to fail
806        let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
807            .await
808            .expect("Timeout")
809            .expect("Channel closed");
810        assert!(result.is_err());
811
812        // Verify command was removed from storage after failure
813        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
814        let stored_commands = storage.load_commands().await.unwrap();
815        assert_eq!(stored_commands.len(), 0);
816
817        // Verify DLQ has the failed command
818        let stats = manager.stats().await.unwrap();
819        assert_eq!(stats.dead_letter_count, 1);
820    }
821
822    // ========================================================================
823    // Observability Tests
824    // ========================================================================
825
826    #[cfg(feature = "metrics")]
827    #[tokio::test]
828    async fn test_manager_with_metrics() {
829        let emitter = EventEmitter::new(100);
830        let metrics = QueueMetrics::local();
831
832        let manager = QueueManagerBuilder::new(emitter)
833            .with_metrics(metrics.clone())
834            .with_lane(lane_ids::QUERY, LaneConfig::new(1, 10), priorities::QUERY)
835            .build()
836            .await
837            .unwrap();
838
839        assert!(manager.metrics().is_some());
840
841        // Verify metrics are accessible
842        let mgr_metrics = manager.metrics().unwrap();
843        let snapshot = mgr_metrics.snapshot().await;
844        assert!(snapshot.counters.is_empty());
845    }
846
847    #[cfg(feature = "monitoring")]
848    #[tokio::test]
849    async fn test_manager_with_alerts() {
850        let emitter = EventEmitter::new(100);
851        let alerts = Arc::new(AlertManager::with_queue_depth_alerts(100, 200));
852
853        let manager = QueueManagerBuilder::new(emitter)
854            .with_alerts(alerts.clone())
855            .with_lane(lane_ids::QUERY, LaneConfig::new(1, 10), priorities::QUERY)
856            .build()
857            .await
858            .unwrap();
859
860        assert!(manager.alerts().is_some());
861
862        // Verify alerts are accessible
863        let mgr_alerts = manager.alerts().unwrap();
864        let config = mgr_alerts.queue_depth_config().await;
865        assert_eq!(config.warning_threshold, 100);
866        assert_eq!(config.critical_threshold, 200);
867    }
868
869    #[cfg(all(feature = "metrics", feature = "monitoring"))]
870    #[tokio::test]
871    async fn test_manager_with_metrics_and_alerts() {
872        let emitter = EventEmitter::new(100);
873        let metrics = QueueMetrics::local();
874        let alerts = Arc::new(AlertManager::with_latency_alerts(100.0, 500.0));
875
876        let manager = QueueManagerBuilder::new(emitter)
877            .with_metrics(metrics)
878            .with_alerts(alerts)
879            .with_lane(lane_ids::QUERY, LaneConfig::new(1, 10), priorities::QUERY)
880            .build()
881            .await
882            .unwrap();
883
884        assert!(manager.metrics().is_some());
885        assert!(manager.alerts().is_some());
886    }
887
888    #[cfg(all(feature = "metrics", feature = "monitoring"))]
889    #[tokio::test]
890    async fn test_manager_without_observability() {
891        let emitter = EventEmitter::new(100);
892        let manager = QueueManagerBuilder::new(emitter)
893            .with_lane(lane_ids::QUERY, LaneConfig::new(1, 10), priorities::QUERY)
894            .build()
895            .await
896            .unwrap();
897
898        assert!(manager.metrics().is_none());
899        assert!(manager.alerts().is_none());
900    }
901}