1use crate::queue::CommandQueue;
4use crate::QueueStats;
5use std::sync::Arc;
6use std::time::Duration;
7use tracing::{debug, warn};
8
/// Tuning knobs for [`QueueMonitor`].
///
/// `PartialEq`/`Eq` are derived so whole configurations can be compared
/// directly instead of field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MonitorConfig {
    /// Period of the ticker that drives the periodic health check.
    pub interval: Duration,
    /// A warning is logged when the total number of pending commands across
    /// all lanes is strictly greater than this value.
    pub pending_warning_threshold: usize,
    /// A warning is logged when the total number of active commands across
    /// all lanes is strictly greater than this value.
    pub active_warning_threshold: usize,
}
19
20impl Default for MonitorConfig {
21 fn default() -> Self {
22 Self {
23 interval: Duration::from_secs(10),
24 pending_warning_threshold: 100,
25 active_warning_threshold: 50,
26 }
27 }
28}
29
30pub struct QueueMonitor {
32 queue: Arc<CommandQueue>,
33 config: MonitorConfig,
34}
35
36impl QueueMonitor {
37 pub fn new(queue: Arc<CommandQueue>) -> Self {
39 Self::with_config(queue, MonitorConfig::default())
40 }
41
42 pub fn with_config(queue: Arc<CommandQueue>, config: MonitorConfig) -> Self {
44 Self { queue, config }
45 }
46
47 pub async fn start(self: Arc<Self>) {
49 let mut ticker = tokio::time::interval(self.config.interval);
50
51 tokio::spawn(async move {
52 loop {
53 ticker.tick().await;
54 self.check_health().await;
55 }
56 });
57 }
58
59 async fn check_health(&self) {
61 let status = self.queue.status().await;
62
63 let mut total_pending = 0;
64 let mut total_active = 0;
65
66 for (lane_id, lane_status) in status.iter() {
67 total_pending += lane_status.pending;
68 total_active += lane_status.active;
69
70 debug!(
71 "Lane {}: pending={}, active={}, max={}",
72 lane_id, lane_status.pending, lane_status.active, lane_status.max
73 );
74
75 if lane_status.active >= lane_status.max {
77 warn!("Lane {} is at maximum capacity", lane_id);
78 }
79 }
80
81 if total_pending > self.config.pending_warning_threshold {
83 warn!(
84 "High number of pending commands: {} (threshold: {})",
85 total_pending, self.config.pending_warning_threshold
86 );
87 }
88
89 if total_active > self.config.active_warning_threshold {
90 warn!(
91 "High number of active commands: {} (threshold: {})",
92 total_active, self.config.active_warning_threshold
93 );
94 }
95 }
96
97 pub async fn stats(&self) -> QueueStats {
99 let lane_status = self.queue.status().await;
100
101 let mut total_pending = 0;
102 let mut total_active = 0;
103
104 for status in lane_status.values() {
105 total_pending += status.pending;
106 total_active += status.active;
107 }
108
109 let dead_letter_count = match self.queue.dlq() {
110 Some(dlq) => dlq.len().await,
111 None => 0,
112 };
113
114 QueueStats {
115 total_pending,
116 total_active,
117 dead_letter_count,
118 lanes: lane_status,
119 }
120 }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::LaneConfig;
    use crate::error::Result;
    use crate::event::EventEmitter;
    use crate::queue::{lane_ids, priorities, Command, Lane};
    use async_trait::async_trait;

    /// Builds a queue with the SYSTEM, CONTROL, QUERY and PROMPT lanes
    /// registered, each configured with `LaneConfig::new(1, max)` where `max`
    /// is 5, 3, 10 and 2 respectively.
    async fn make_queue_with_lanes() -> Arc<CommandQueue> {
        let emitter = EventEmitter::new(100);
        let queue = Arc::new(CommandQueue::new(emitter));

        let lanes = [
            (lane_ids::SYSTEM, priorities::SYSTEM, 5),
            (lane_ids::CONTROL, priorities::CONTROL, 3),
            (lane_ids::QUERY, priorities::QUERY, 10),
            (lane_ids::PROMPT, priorities::PROMPT, 2),
        ];

        for (id, priority, max) in lanes {
            let config = LaneConfig::new(1, max);
            queue
                .register_lane(Arc::new(Lane::new(id, config, priority)))
                .await;
        }

        queue
    }

    /// Minimal command that returns a fixed JSON value.
    struct TestCommand {
        result: serde_json::Value,
    }

    #[async_trait]
    impl Command for TestCommand {
        async fn execute(&self) -> Result<serde_json::Value> {
            Ok(self.result.clone())
        }
        fn command_type(&self) -> &str {
            "test"
        }
    }

    /// Submits `count` empty-result test commands to `lane`.
    ///
    /// The value returned by `submit` is deliberately discarded: these tests
    /// only inspect the queue counters.
    async fn submit_test_commands(queue: &Arc<CommandQueue>, lane: &'static str, count: usize) {
        for _ in 0..count {
            let cmd = Box::new(TestCommand {
                result: serde_json::json!({}),
            });
            let _ = queue.submit(lane, cmd).await;
        }
    }

    // ---- MonitorConfig ----

    #[test]
    fn test_monitor_config_default() {
        let config = MonitorConfig::default();
        assert_eq!(config.interval, Duration::from_secs(10));
        assert_eq!(config.pending_warning_threshold, 100);
        assert_eq!(config.active_warning_threshold, 50);
    }

    #[test]
    fn test_monitor_config_custom() {
        let config = MonitorConfig {
            interval: Duration::from_secs(5),
            pending_warning_threshold: 50,
            active_warning_threshold: 20,
        };
        assert_eq!(config.interval, Duration::from_secs(5));
        assert_eq!(config.pending_warning_threshold, 50);
        assert_eq!(config.active_warning_threshold, 20);
    }

    #[test]
    fn test_monitor_config_clone() {
        let config = MonitorConfig {
            interval: Duration::from_millis(500),
            pending_warning_threshold: 10,
            active_warning_threshold: 5,
        };
        let cloned = config.clone();
        assert_eq!(cloned.interval, Duration::from_millis(500));
        assert_eq!(cloned.pending_warning_threshold, 10);
        assert_eq!(cloned.active_warning_threshold, 5);
    }

    #[test]
    fn test_monitor_config_debug() {
        let config = MonitorConfig::default();
        let debug_str = format!("{:?}", config);
        assert!(debug_str.contains("MonitorConfig"));
        assert!(debug_str.contains("interval"));
        assert!(debug_str.contains("pending_warning_threshold"));
        assert!(debug_str.contains("active_warning_threshold"));
    }

    // ---- QueueMonitor construction ----

    #[tokio::test]
    async fn test_monitor_new_default_config() {
        let queue = make_queue_with_lanes().await;
        let monitor = QueueMonitor::new(Arc::clone(&queue));
        assert_eq!(monitor.config.interval, Duration::from_secs(10));
        assert_eq!(monitor.config.pending_warning_threshold, 100);
        assert_eq!(monitor.config.active_warning_threshold, 50);
    }

    #[tokio::test]
    async fn test_monitor_with_config() {
        let queue = make_queue_with_lanes().await;
        let config = MonitorConfig {
            interval: Duration::from_secs(1),
            pending_warning_threshold: 10,
            active_warning_threshold: 5,
        };
        let monitor = QueueMonitor::with_config(Arc::clone(&queue), config);
        assert_eq!(monitor.config.interval, Duration::from_secs(1));
        assert_eq!(monitor.config.pending_warning_threshold, 10);
        assert_eq!(monitor.config.active_warning_threshold, 5);
    }

    // ---- QueueMonitor::stats ----

    #[tokio::test]
    async fn test_monitor_stats_empty_queue() {
        let queue = make_queue_with_lanes().await;
        let monitor = QueueMonitor::new(queue);

        let stats = monitor.stats().await;
        assert_eq!(stats.total_pending, 0);
        assert_eq!(stats.total_active, 0);
        assert_eq!(stats.dead_letter_count, 0);
        assert_eq!(stats.lanes.len(), 4);
    }

    #[tokio::test]
    async fn test_monitor_stats_with_pending_commands() {
        let queue = make_queue_with_lanes().await;

        submit_test_commands(&queue, lane_ids::QUERY, 3).await;
        submit_test_commands(&queue, lane_ids::PROMPT, 2).await;

        let monitor = QueueMonitor::new(queue);
        let stats = monitor.stats().await;

        assert_eq!(stats.total_pending, 5);
        assert_eq!(stats.total_active, 0);
        assert_eq!(stats.lanes[lane_ids::QUERY].pending, 3);
        assert_eq!(stats.lanes[lane_ids::PROMPT].pending, 2);
        assert_eq!(stats.lanes[lane_ids::SYSTEM].pending, 0);
        assert_eq!(stats.lanes[lane_ids::CONTROL].pending, 0);
    }

    #[tokio::test]
    async fn test_monitor_stats_lane_concurrency() {
        let queue = make_queue_with_lanes().await;
        let monitor = QueueMonitor::new(queue);

        let stats = monitor.stats().await;
        assert_eq!(stats.lanes[lane_ids::SYSTEM].max, 5);
        assert_eq!(stats.lanes[lane_ids::CONTROL].max, 3);
        assert_eq!(stats.lanes[lane_ids::QUERY].max, 10);
        assert_eq!(stats.lanes[lane_ids::PROMPT].max, 2);
    }

    #[tokio::test]
    async fn test_monitor_stats_no_lanes() {
        let emitter = EventEmitter::new(100);
        let queue = Arc::new(CommandQueue::new(emitter));
        let monitor = QueueMonitor::new(queue);

        let stats = monitor.stats().await;
        assert_eq!(stats.total_pending, 0);
        assert_eq!(stats.total_active, 0);
        assert_eq!(stats.dead_letter_count, 0);
        assert!(stats.lanes.is_empty());
    }

    // ---- QueueMonitor::check_health ----
    //
    // These are smoke tests: `check_health` only logs, so each test passes as
    // long as the call completes without panicking.

    #[tokio::test]
    async fn test_check_health_no_warnings() {
        let queue = make_queue_with_lanes().await;
        let monitor = QueueMonitor::new(queue);

        monitor.check_health().await;
    }

    #[tokio::test]
    async fn test_check_health_with_pending_below_threshold() {
        let queue = make_queue_with_lanes().await;

        submit_test_commands(&queue, lane_ids::QUERY, 5).await;

        let monitor = QueueMonitor::new(queue);

        monitor.check_health().await;
    }

    #[tokio::test]
    async fn test_check_health_with_pending_above_threshold() {
        let queue = make_queue_with_lanes().await;

        let config = MonitorConfig {
            interval: Duration::from_secs(10),
            pending_warning_threshold: 3,
            active_warning_threshold: 50,
        };

        submit_test_commands(&queue, lane_ids::QUERY, 5).await;

        let monitor = QueueMonitor::with_config(queue, config);

        monitor.check_health().await;
    }

    #[tokio::test]
    async fn test_check_health_at_exact_threshold() {
        let queue = make_queue_with_lanes().await;

        let config = MonitorConfig {
            interval: Duration::from_secs(10),
            pending_warning_threshold: 3,
            active_warning_threshold: 50,
        };

        submit_test_commands(&queue, lane_ids::QUERY, 3).await;

        let monitor = QueueMonitor::with_config(queue, config);
        monitor.check_health().await;
    }

    // ---- QueueMonitor::start ----

    #[tokio::test]
    async fn test_monitor_start_runs_periodically() {
        let queue = make_queue_with_lanes().await;
        let config = MonitorConfig {
            interval: Duration::from_millis(30),
            pending_warning_threshold: 100,
            active_warning_threshold: 50,
        };
        let monitor = Arc::new(QueueMonitor::with_config(queue, config));

        // `start` consumes its `Arc`, so keep a second handle for the asserts.
        let monitor_ref = Arc::clone(&monitor);
        monitor.start().await;

        // Long enough for several 30 ms ticks to fire.
        tokio::time::sleep(Duration::from_millis(100)).await;

        let stats = monitor_ref.stats().await;
        assert_eq!(stats.total_pending, 0);
        assert_eq!(stats.total_active, 0);
    }

    #[tokio::test]
    async fn test_monitor_start_with_commands() {
        let queue = make_queue_with_lanes().await;

        submit_test_commands(&queue, lane_ids::QUERY, 3).await;

        let config = MonitorConfig {
            interval: Duration::from_millis(20),
            pending_warning_threshold: 2,
            active_warning_threshold: 50,
        };
        let monitor = Arc::new(QueueMonitor::with_config(queue, config));

        // `start` consumes its `Arc`, so keep a second handle for the asserts.
        let monitor_ref = Arc::clone(&monitor);
        monitor.start().await;

        // Long enough for several 20 ms ticks to fire.
        tokio::time::sleep(Duration::from_millis(80)).await;

        let stats = monitor_ref.stats().await;
        assert_eq!(stats.total_pending, 3);
    }

    // ---- QueueStats ----

    #[test]
    fn test_queue_stats_default() {
        let stats = QueueStats::default();
        assert_eq!(stats.total_pending, 0);
        assert_eq!(stats.total_active, 0);
        assert_eq!(stats.dead_letter_count, 0);
        assert!(stats.lanes.is_empty());
    }

    #[test]
    fn test_queue_stats_serialization() {
        let mut lanes = std::collections::HashMap::new();
        lanes.insert(
            "query".to_string(),
            crate::queue::LaneStatus {
                pending: 5,
                active: 2,
                min: 1,
                max: 10,
            },
        );

        let stats = QueueStats {
            total_pending: 5,
            total_active: 2,
            dead_letter_count: 0,
            lanes,
        };

        let json = serde_json::to_string(&stats).unwrap();
        let parsed: QueueStats = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.total_pending, 5);
        assert_eq!(parsed.total_active, 2);
        assert_eq!(parsed.dead_letter_count, 0);
        assert_eq!(parsed.lanes["query"].pending, 5);
        assert_eq!(parsed.lanes["query"].max, 10);
    }

    #[test]
    fn test_queue_stats_clone() {
        let stats = QueueStats {
            total_pending: 10,
            total_active: 3,
            dead_letter_count: 0,
            lanes: std::collections::HashMap::new(),
        };
        let cloned = stats.clone();
        assert_eq!(cloned.total_pending, 10);
        assert_eq!(cloned.total_active, 3);
        assert_eq!(cloned.dead_letter_count, 0);
    }
}