Skip to main content

a3s_lane/
monitor.rs

//! Queue monitor for tracking queue metrics and health
2
3use crate::queue::CommandQueue;
4use crate::QueueStats;
5use std::sync::Arc;
6use std::time::Duration;
7use tracing::{debug, warn};
8
/// Queue monitor configuration.
///
/// Holds the sampling period of the background health check and the totals
/// above which the monitor logs a warning. Configurations compare equal when
/// all three fields are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MonitorConfig {
    /// Monitoring interval: the period of the ticker that drives health checks.
    pub interval: Duration,
    /// Warning threshold for pending commands. A warning is logged when the
    /// pending total summed over all lanes is strictly greater than this value.
    pub pending_warning_threshold: usize,
    /// Warning threshold for active commands. A warning is logged when the
    /// active total summed over all lanes is strictly greater than this value.
    pub active_warning_threshold: usize,
}
19
20impl Default for MonitorConfig {
21    fn default() -> Self {
22        Self {
23            interval: Duration::from_secs(10),
24            pending_warning_threshold: 100,
25            active_warning_threshold: 50,
26        }
27    }
28}
29
30/// Queue monitor
31pub struct QueueMonitor {
32    queue: Arc<CommandQueue>,
33    config: MonitorConfig,
34}
35
36impl QueueMonitor {
37    /// Create a new queue monitor
38    pub fn new(queue: Arc<CommandQueue>) -> Self {
39        Self::with_config(queue, MonitorConfig::default())
40    }
41
42    /// Create a new queue monitor with custom configuration
43    pub fn with_config(queue: Arc<CommandQueue>, config: MonitorConfig) -> Self {
44        Self { queue, config }
45    }
46
47    /// Start monitoring
48    pub async fn start(self: Arc<Self>) {
49        let mut ticker = tokio::time::interval(self.config.interval);
50
51        tokio::spawn(async move {
52            loop {
53                ticker.tick().await;
54                self.check_health().await;
55            }
56        });
57    }
58
59    /// Check queue health
60    async fn check_health(&self) {
61        let status = self.queue.status().await;
62
63        let mut total_pending = 0;
64        let mut total_active = 0;
65
66        for (lane_id, lane_status) in status.iter() {
67            total_pending += lane_status.pending;
68            total_active += lane_status.active;
69
70            debug!(
71                "Lane {}: pending={}, active={}, max={}",
72                lane_id, lane_status.pending, lane_status.active, lane_status.max
73            );
74
75            // Check if lane is at capacity
76            if lane_status.active >= lane_status.max {
77                warn!("Lane {} is at maximum capacity", lane_id);
78            }
79        }
80
81        // Check global thresholds
82        if total_pending > self.config.pending_warning_threshold {
83            warn!(
84                "High number of pending commands: {} (threshold: {})",
85                total_pending, self.config.pending_warning_threshold
86            );
87        }
88
89        if total_active > self.config.active_warning_threshold {
90            warn!(
91                "High number of active commands: {} (threshold: {})",
92                total_active, self.config.active_warning_threshold
93            );
94        }
95    }
96
97    /// Get current statistics
98    pub async fn stats(&self) -> QueueStats {
99        let lane_status = self.queue.status().await;
100
101        let mut total_pending = 0;
102        let mut total_active = 0;
103
104        for status in lane_status.values() {
105            total_pending += status.pending;
106            total_active += status.active;
107        }
108
109        let dead_letter_count = match self.queue.dlq() {
110            Some(dlq) => dlq.len().await,
111            None => 0,
112        };
113
114        QueueStats {
115            total_pending,
116            total_active,
117            dead_letter_count,
118            lanes: lane_status,
119        }
120    }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::LaneConfig;
    use crate::error::Result;
    use crate::event::EventEmitter;
    use crate::queue::{lane_ids, priorities, Command, Lane};
    use async_trait::async_trait;

    // ------------------------------------------------------------------------
    // Shared fixtures
    // ------------------------------------------------------------------------

    /// Command used by the tests; `execute` returns a clone of `result`.
    struct TestCommand {
        result: serde_json::Value,
    }

    #[async_trait]
    impl Command for TestCommand {
        async fn execute(&self) -> Result<serde_json::Value> {
            let value = self.result.clone();
            Ok(value)
        }
        fn command_type(&self) -> &str {
            "test"
        }
    }

    /// Build a queue with the SYSTEM, CONTROL, QUERY and PROMPT lanes registered.
    async fn make_queue_with_lanes() -> Arc<CommandQueue> {
        let queue = Arc::new(CommandQueue::new(EventEmitter::new(100)));

        // (lane id, priority, max). The third value is what the lane later
        // reports as `max` in its status.
        let specs = [
            (lane_ids::SYSTEM, priorities::SYSTEM, 5),
            (lane_ids::CONTROL, priorities::CONTROL, 3),
            (lane_ids::QUERY, priorities::QUERY, 10),
            (lane_ids::PROMPT, priorities::PROMPT, 2),
        ];

        for (lane_id, priority, max) in specs {
            let lane = Lane::new(lane_id, LaneConfig::new(1, max), priority);
            queue.register_lane(Arc::new(lane)).await;
        }

        queue
    }

    /// Submit `count` test commands with an empty JSON object result to
    /// `lane`, discarding whatever each submit call returns.
    async fn enqueue(queue: &Arc<CommandQueue>, lane: &'static str, count: usize) {
        for _ in 0..count {
            let cmd = Box::new(TestCommand {
                result: serde_json::json!({}),
            });
            let _ = queue.submit(lane, cmd).await;
        }
    }

    /// Assert all three fields of a `MonitorConfig` in one call.
    fn assert_config(config: &MonitorConfig, interval: Duration, pending: usize, active: usize) {
        assert_eq!(config.interval, interval);
        assert_eq!(config.pending_warning_threshold, pending);
        assert_eq!(config.active_warning_threshold, active);
    }

    // ------------------------------------------------------------------------
    // MonitorConfig
    // ------------------------------------------------------------------------

    #[test]
    fn test_monitor_config_default() {
        assert_config(&MonitorConfig::default(), Duration::from_secs(10), 100, 50);
    }

    #[test]
    fn test_monitor_config_custom() {
        let custom = MonitorConfig {
            interval: Duration::from_secs(5),
            pending_warning_threshold: 50,
            active_warning_threshold: 20,
        };
        assert_config(&custom, Duration::from_secs(5), 50, 20);
    }

    #[test]
    fn test_monitor_config_clone() {
        let original = MonitorConfig {
            interval: Duration::from_millis(500),
            pending_warning_threshold: 10,
            active_warning_threshold: 5,
        };
        let copy = original.clone();
        assert_config(&copy, Duration::from_millis(500), 10, 5);
    }

    #[test]
    fn test_monitor_config_debug() {
        let rendered = format!("{:?}", MonitorConfig::default());
        for needle in [
            "MonitorConfig",
            "interval",
            "pending_warning_threshold",
            "active_warning_threshold",
        ] {
            assert!(rendered.contains(needle));
        }
    }

    // ------------------------------------------------------------------------
    // QueueMonitor construction
    // ------------------------------------------------------------------------

    #[tokio::test]
    async fn test_monitor_new_default_config() {
        let queue = make_queue_with_lanes().await;
        let monitor = QueueMonitor::new(Arc::clone(&queue));
        assert_config(&monitor.config, Duration::from_secs(10), 100, 50);
    }

    #[tokio::test]
    async fn test_monitor_with_config() {
        let queue = make_queue_with_lanes().await;
        let custom = MonitorConfig {
            interval: Duration::from_secs(1),
            pending_warning_threshold: 10,
            active_warning_threshold: 5,
        };
        let monitor = QueueMonitor::with_config(Arc::clone(&queue), custom);
        assert_config(&monitor.config, Duration::from_secs(1), 10, 5);
    }

    // ------------------------------------------------------------------------
    // stats()
    // ------------------------------------------------------------------------

    #[tokio::test]
    async fn test_monitor_stats_empty_queue() {
        let monitor = QueueMonitor::new(make_queue_with_lanes().await);

        let stats = monitor.stats().await;
        assert_eq!(stats.total_pending, 0);
        assert_eq!(stats.total_active, 0);
        assert_eq!(stats.lanes.len(), 4);
    }

    #[tokio::test]
    async fn test_monitor_stats_with_pending_commands() {
        let queue = make_queue_with_lanes().await;

        // The scheduler is never started, so submitted commands stay pending.
        enqueue(&queue, lane_ids::QUERY, 3).await;
        enqueue(&queue, lane_ids::PROMPT, 2).await;

        let stats = QueueMonitor::new(queue).stats().await;

        assert_eq!(stats.total_pending, 5);
        assert_eq!(stats.total_active, 0);

        let expected_pending = [
            (lane_ids::QUERY, 3),
            (lane_ids::PROMPT, 2),
            (lane_ids::SYSTEM, 0),
            (lane_ids::CONTROL, 0),
        ];
        for (lane, pending) in expected_pending {
            assert_eq!(stats.lanes[lane].pending, pending);
        }
    }

    #[tokio::test]
    async fn test_monitor_stats_lane_concurrency() {
        let monitor = QueueMonitor::new(make_queue_with_lanes().await);
        let stats = monitor.stats().await;

        let expected_max = [
            (lane_ids::SYSTEM, 5),
            (lane_ids::CONTROL, 3),
            (lane_ids::QUERY, 10),
            (lane_ids::PROMPT, 2),
        ];
        for (lane, max) in expected_max {
            assert_eq!(stats.lanes[lane].max, max);
        }
    }

    #[tokio::test]
    async fn test_monitor_stats_no_lanes() {
        let bare_queue = Arc::new(CommandQueue::new(EventEmitter::new(100)));
        let monitor = QueueMonitor::new(bare_queue);

        let stats = monitor.stats().await;
        assert_eq!(stats.total_pending, 0);
        assert_eq!(stats.total_active, 0);
        assert!(stats.lanes.is_empty());
    }

    // ------------------------------------------------------------------------
    // check_health()
    // ------------------------------------------------------------------------

    #[tokio::test]
    async fn test_check_health_no_warnings() {
        let monitor = QueueMonitor::new(make_queue_with_lanes().await);

        // An empty queue must be handled without panicking.
        monitor.check_health().await;
    }

    #[tokio::test]
    async fn test_check_health_with_pending_below_threshold() {
        let queue = make_queue_with_lanes().await;

        // Five pending commands is well under the default threshold of 100.
        enqueue(&queue, lane_ids::QUERY, 5).await;

        let monitor = QueueMonitor::new(queue);

        // Only needs to run to completion.
        monitor.check_health().await;
    }

    #[tokio::test]
    async fn test_check_health_with_pending_above_threshold() {
        let queue = make_queue_with_lanes().await;

        // Five pending commands against a threshold of three.
        enqueue(&queue, lane_ids::QUERY, 5).await;

        let low_threshold = MonitorConfig {
            interval: Duration::from_secs(10),
            pending_warning_threshold: 3,
            active_warning_threshold: 50,
        };
        let monitor = QueueMonitor::with_config(queue, low_threshold);

        // Expected to log a warning and return normally.
        monitor.check_health().await;
    }

    #[tokio::test]
    async fn test_check_health_at_exact_threshold() {
        let queue = make_queue_with_lanes().await;

        // Exactly at the threshold: the comparison is strict, so no warning
        // is expected.
        enqueue(&queue, lane_ids::QUERY, 3).await;

        let exact_threshold = MonitorConfig {
            interval: Duration::from_secs(10),
            pending_warning_threshold: 3,
            active_warning_threshold: 50,
        };
        let monitor = QueueMonitor::with_config(queue, exact_threshold);
        monitor.check_health().await;
    }

    // ------------------------------------------------------------------------
    // start()
    // ------------------------------------------------------------------------

    #[tokio::test]
    async fn test_monitor_start_runs_periodically() {
        let queue = make_queue_with_lanes().await;
        let fast = MonitorConfig {
            interval: Duration::from_millis(30),
            pending_warning_threshold: 100,
            active_warning_threshold: 50,
        };
        let monitor = Arc::new(QueueMonitor::with_config(queue, fast));

        // `start` consumes an Arc, so hand it a clone and keep ours.
        Arc::clone(&monitor).start().await;

        // Give the background task time for a few cycles.
        tokio::time::sleep(Duration::from_millis(100)).await;

        // The retained handle must still answer queries.
        let stats = monitor.stats().await;
        assert_eq!(stats.total_pending, 0);
        assert_eq!(stats.total_active, 0);
    }

    #[tokio::test]
    async fn test_monitor_start_with_commands() {
        let queue = make_queue_with_lanes().await;

        // Three pending commands, one more than the threshold below.
        enqueue(&queue, lane_ids::QUERY, 3).await;

        let noisy = MonitorConfig {
            interval: Duration::from_millis(20),
            pending_warning_threshold: 2, // lower than the pending count, so warnings fire
            active_warning_threshold: 50,
        };
        let monitor = Arc::new(QueueMonitor::with_config(queue, noisy));

        // `start` consumes an Arc, so hand it a clone and keep ours.
        Arc::clone(&monitor).start().await;

        // Let several health checks run; warnings must not cause a panic.
        tokio::time::sleep(Duration::from_millis(80)).await;

        let stats = monitor.stats().await;
        assert_eq!(stats.total_pending, 3);
    }

    // ------------------------------------------------------------------------
    // QueueStats
    // ------------------------------------------------------------------------

    #[test]
    fn test_queue_stats_default() {
        let empty = QueueStats::default();
        assert_eq!(empty.total_pending, 0);
        assert_eq!(empty.total_active, 0);
        assert!(empty.lanes.is_empty());
    }

    #[test]
    fn test_queue_stats_serialization() {
        let query_lane = crate::queue::LaneStatus {
            pending: 5,
            active: 2,
            min: 1,
            max: 10,
        };
        let mut lanes = std::collections::HashMap::new();
        lanes.insert("query".to_string(), query_lane);

        let stats = QueueStats {
            total_pending: 5,
            total_active: 2,
            dead_letter_count: 0,
            lanes,
        };

        // Round-trip through JSON and compare the fields that matter.
        let encoded = serde_json::to_string(&stats).unwrap();
        let decoded: QueueStats = serde_json::from_str(&encoded).unwrap();
        assert_eq!(decoded.total_pending, 5);
        assert_eq!(decoded.total_active, 2);
        assert_eq!(decoded.lanes["query"].pending, 5);
        assert_eq!(decoded.lanes["query"].max, 10);
    }

    #[test]
    fn test_queue_stats_clone() {
        let original = QueueStats {
            total_pending: 10,
            total_active: 3,
            dead_letter_count: 0,
            lanes: std::collections::HashMap::new(),
        };
        let copy = original.clone();
        assert_eq!(copy.total_pending, 10);
        assert_eq!(copy.total_active, 3);
    }
}