1use crate::agent::AgentEvent;
15use crate::queue::SessionLane;
16use crate::queue::{
17 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionCommand, SessionQueueConfig,
18 TaskHandlerMode,
19};
20use a3s_lane::{
21 AlertManager, Command as LaneCommand, DeadLetter, EventEmitter, LaneConfig, LaneError,
22 LocalStorage, MetricsSnapshot, PriorityBoostConfig, QueueManager, QueueManagerBuilder,
23 QueueMetrics, RateLimitConfig, Result as LaneResult, RetryPolicy,
24};
25use anyhow::Result;
26use async_trait::async_trait;
27use serde_json::Value;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31use tokio::sync::{broadcast, oneshot, RwLock};
32
33impl SessionLane {
38 fn lane_id(self) -> &'static str {
40 match self {
41 SessionLane::Control => "control",
42 SessionLane::Query => "query",
43 SessionLane::Execute => "skill",
44 SessionLane::Generate => "prompt",
45 }
46 }
47
48 fn lane_config(self) -> LaneConfig {
50 match self {
51 SessionLane::Control => LaneConfig::new(1, 2),
52 SessionLane::Query => LaneConfig::new(1, 4),
53 SessionLane::Execute => LaneConfig::new(1, 2),
54 SessionLane::Generate => LaneConfig::new(1, 1),
55 }
56 }
57
58 fn lane_priority(self) -> u8 {
60 match self {
61 SessionLane::Control => 1,
62 SessionLane::Query => 2,
63 SessionLane::Execute => 4,
64 SessionLane::Generate => 5,
65 }
66 }
67}
68
69struct PendingExternalTask {
75 task: ExternalTask,
76 result_tx: oneshot::Sender<Result<Value>>,
77}
78
79pub struct SessionCommandAdapter {
85 inner: Box<dyn SessionCommand>,
86 task_id: String,
87 handler_mode: TaskHandlerMode,
88 session_id: String,
89 lane: SessionLane,
90 timeout_ms: u64,
91 external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
92 event_tx: broadcast::Sender<AgentEvent>,
93}
94
95impl SessionCommandAdapter {
96 #[allow(clippy::too_many_arguments)]
97 fn new(
98 inner: Box<dyn SessionCommand>,
99 task_id: String,
100 handler_mode: TaskHandlerMode,
101 session_id: String,
102 lane: SessionLane,
103 timeout_ms: u64,
104 external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
105 event_tx: broadcast::Sender<AgentEvent>,
106 ) -> Self {
107 Self {
108 inner,
109 task_id,
110 handler_mode,
111 session_id,
112 lane,
113 timeout_ms,
114 external_tasks,
115 event_tx,
116 }
117 }
118
119 async fn register_and_wait(&self) -> LaneResult<Value> {
121 let (tx, rx) = oneshot::channel();
122 let task = ExternalTask {
123 task_id: self.task_id.clone(),
124 session_id: self.session_id.clone(),
125 lane: self.lane,
126 command_type: self.inner.command_type().to_string(),
127 payload: self.inner.payload(),
128 timeout_ms: self.timeout_ms,
129 created_at: Some(Instant::now()),
130 };
131 {
132 let mut tasks = self.external_tasks.write().await;
133 tasks.insert(
134 self.task_id.clone(),
135 PendingExternalTask {
136 task: task.clone(),
137 result_tx: tx,
138 },
139 );
140 }
141 let _ = self.event_tx.send(AgentEvent::ExternalTaskPending {
142 task_id: task.task_id.clone(),
143 session_id: task.session_id.clone(),
144 lane: task.lane,
145 command_type: task.command_type.clone(),
146 payload: task.payload.clone(),
147 timeout_ms: task.timeout_ms,
148 });
149 match tokio::time::timeout(Duration::from_millis(self.timeout_ms), rx).await {
150 Ok(Ok(result)) => result.map_err(|e| LaneError::CommandError(e.to_string())),
151 Ok(Err(_)) => Err(LaneError::CommandError("Channel closed".to_string())),
152 Err(_) => {
153 let mut tasks = self.external_tasks.write().await;
154 tasks.remove(&self.task_id);
155 Err(LaneError::Timeout(Duration::from_millis(self.timeout_ms)))
156 }
157 }
158 }
159
160 async fn execute_with_notification(&self) -> LaneResult<Value> {
162 let task = ExternalTask {
163 task_id: self.task_id.clone(),
164 session_id: self.session_id.clone(),
165 lane: self.lane,
166 command_type: self.inner.command_type().to_string(),
167 payload: self.inner.payload(),
168 timeout_ms: self.timeout_ms,
169 created_at: Some(Instant::now()),
170 };
171 let _ = self.event_tx.send(AgentEvent::ExternalTaskPending {
172 task_id: task.task_id.clone(),
173 session_id: task.session_id.clone(),
174 lane: task.lane,
175 command_type: task.command_type.clone(),
176 payload: task.payload.clone(),
177 timeout_ms: task.timeout_ms,
178 });
179 let result = self
180 .inner
181 .execute()
182 .await
183 .map_err(|e| LaneError::CommandError(e.to_string()));
184 let _ = self.event_tx.send(AgentEvent::ExternalTaskCompleted {
185 task_id: self.task_id.clone(),
186 session_id: self.session_id.clone(),
187 success: result.is_ok(),
188 });
189 result
190 }
191}
192
193#[async_trait]
194impl LaneCommand for SessionCommandAdapter {
195 async fn execute(&self) -> LaneResult<Value> {
196 match self.handler_mode {
197 TaskHandlerMode::Internal => self
198 .inner
199 .execute()
200 .await
201 .map_err(|e| LaneError::CommandError(e.to_string())),
202 TaskHandlerMode::External => self.register_and_wait().await,
203 TaskHandlerMode::Hybrid => self.execute_with_notification().await,
204 }
205 }
206 fn command_type(&self) -> &str {
207 self.inner.command_type()
208 }
209}
210
211pub struct EventBridge {
217 event_tx: broadcast::Sender<AgentEvent>,
218}
219
220impl EventBridge {
221 pub fn new(event_tx: broadcast::Sender<AgentEvent>) -> Self {
222 Self { event_tx }
223 }
224
225 pub fn emit_dead_letter(&self, dead_letter: &DeadLetter) {
226 let _ = self.event_tx.send(AgentEvent::CommandDeadLettered {
227 command_id: dead_letter.command_id.clone(),
228 command_type: dead_letter.command_type.clone(),
229 lane: dead_letter.lane_id.clone(),
230 error: dead_letter.error.clone(),
231 attempts: dead_letter.attempts,
232 });
233 }
234
235 pub fn emit_retry(
236 &self,
237 command_id: &str,
238 command_type: &str,
239 lane: &str,
240 attempt: u32,
241 delay_ms: u64,
242 ) {
243 let _ = self.event_tx.send(AgentEvent::CommandRetry {
244 command_id: command_id.to_string(),
245 command_type: command_type.to_string(),
246 lane: lane.to_string(),
247 attempt,
248 delay_ms,
249 });
250 }
251
252 pub fn emit_alert(&self, level: &str, alert_type: &str, message: &str) {
253 let _ = self.event_tx.send(AgentEvent::QueueAlert {
254 level: level.to_string(),
255 alert_type: alert_type.to_string(),
256 message: message.to_string(),
257 });
258 }
259}
260
261pub struct SessionLaneQueue {
267 session_id: String,
268 manager: Arc<QueueManager>,
269 metrics: Option<QueueMetrics>,
270 external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
271 lane_handlers: Arc<RwLock<HashMap<SessionLane, LaneHandlerConfig>>>,
272 event_tx: broadcast::Sender<AgentEvent>,
273 event_bridge: Arc<EventBridge>,
274}
275
276impl SessionLaneQueue {
277 pub async fn new(
279 session_id: &str,
280 config: SessionQueueConfig,
281 event_tx: broadcast::Sender<AgentEvent>,
282 ) -> Result<Self> {
283 let (manager, metrics) = Self::build_queue_manager(&config).await?;
284 let mut lane_handlers = HashMap::new();
285 for lane in [
286 SessionLane::Control,
287 SessionLane::Query,
288 SessionLane::Execute,
289 SessionLane::Generate,
290 ] {
291 lane_handlers.insert(lane, config.handler_config(lane));
292 }
293 let event_bridge = Arc::new(EventBridge::new(event_tx.clone()));
294 Ok(Self {
295 session_id: session_id.to_string(),
296 manager: Arc::new(manager),
297 metrics,
298 external_tasks: Arc::new(RwLock::new(HashMap::new())),
299 lane_handlers: Arc::new(RwLock::new(lane_handlers)),
300 event_tx,
301 event_bridge,
302 })
303 }
304
305 async fn build_queue_manager(
307 config: &SessionQueueConfig,
308 ) -> Result<(QueueManager, Option<QueueMetrics>)> {
309 let emitter = EventEmitter::new(100);
310 let mut builder = QueueManagerBuilder::new(emitter);
311 let default_timeout = config.default_timeout_ms.map(Duration::from_millis);
312 let default_retry = Some(RetryPolicy::exponential(3));
313
314 for lane in [
315 SessionLane::Control,
316 SessionLane::Query,
317 SessionLane::Execute,
318 SessionLane::Generate,
319 ] {
320 let mut cfg = lane.lane_config();
321 if let Some(timeout) = default_timeout {
322 cfg = cfg.with_timeout(timeout);
323 }
324 if let Some(ref retry) = default_retry {
325 cfg = cfg.with_retry_policy(retry.clone());
326 }
327 if lane == SessionLane::Generate {
328 cfg = cfg.with_rate_limit(RateLimitConfig::per_minute(60));
329 cfg = cfg
330 .with_priority_boost(PriorityBoostConfig::standard(Duration::from_secs(300)));
331 }
332 builder = builder.with_lane(lane.lane_id(), cfg, lane.lane_priority());
333 }
334
335 if config.enable_dlq {
336 builder = builder.with_dlq(config.dlq_max_size.unwrap_or(1000));
337 }
338
339 let metrics = if config.enable_metrics {
340 let m = QueueMetrics::local();
341 builder = builder.with_metrics(m.clone());
342 Some(m)
343 } else {
344 None
345 };
346
347 if config.enable_alerts {
348 builder = builder.with_alerts(Arc::new(AlertManager::with_queue_depth_alerts(50, 100)));
349 }
350
351 if let Some(ref storage_path) = config.storage_path {
352 builder = builder.with_storage(Arc::new(
353 LocalStorage::new(storage_path.to_path_buf()).await?,
354 ));
355 }
356
357 let manager = builder.build().await?;
358 Ok((manager, metrics))
359 }
360
361 pub async fn start(&self) -> Result<()> {
362 self.manager.start().await
363 }
364 pub async fn stop(&self) {
365 self.manager.shutdown().await;
366 }
367
368 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
369 self.lane_handlers.write().await.insert(lane, config);
370 }
371
372 pub async fn get_lane_handler(&self, lane: SessionLane) -> LaneHandlerConfig {
373 self.lane_handlers
374 .read()
375 .await
376 .get(&lane)
377 .cloned()
378 .unwrap_or_default()
379 }
380
381 pub async fn submit(
383 &self,
384 lane: SessionLane,
385 command: Box<dyn SessionCommand>,
386 ) -> oneshot::Receiver<Result<Value>> {
387 let (result_tx, result_rx) = oneshot::channel();
388 let handler_config = self.get_lane_handler(lane).await;
389 let task_id = uuid::Uuid::new_v4().to_string();
390 let adapter = SessionCommandAdapter::new(
391 command,
392 task_id,
393 handler_config.mode,
394 self.session_id.clone(),
395 lane,
396 handler_config.timeout_ms,
397 Arc::clone(&self.external_tasks),
398 self.event_tx.clone(),
399 );
400 match self.manager.submit(lane.lane_id(), Box::new(adapter)).await {
401 Ok(lane_rx) => {
402 tokio::spawn(async move {
403 match lane_rx.await {
404 Ok(Ok(value)) => {
405 let _ = result_tx.send(Ok(value));
406 }
407 Ok(Err(e)) => {
408 let _ = result_tx.send(Err(anyhow::anyhow!("{}", e)));
409 }
410 Err(_) => {
411 let _ = result_tx.send(Err(anyhow::anyhow!("Channel closed")));
412 }
413 }
414 });
415 }
416 Err(e) => {
417 let _ = result_tx.send(Err(e.into()));
418 }
419 }
420 result_rx
421 }
422
423 pub async fn submit_by_tool(
424 &self,
425 tool_name: &str,
426 command: Box<dyn SessionCommand>,
427 ) -> oneshot::Receiver<Result<Value>> {
428 self.submit(SessionLane::from_tool_name(tool_name), command)
429 .await
430 }
431
432 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
433 let pending = { self.external_tasks.write().await.remove(task_id) };
434 if let Some(pending) = pending {
435 let _ = self.event_tx.send(AgentEvent::ExternalTaskCompleted {
436 task_id: task_id.to_string(),
437 session_id: self.session_id.clone(),
438 success: result.success,
439 });
440 let final_result = if result.success {
441 Ok(result.result)
442 } else {
443 Err(anyhow::anyhow!(result
444 .error
445 .unwrap_or_else(|| "External task failed".to_string())))
446 };
447 let _ = pending.result_tx.send(final_result);
448 true
449 } else {
450 false
451 }
452 }
453
454 pub async fn stats(&self) -> crate::queue::SessionQueueStats {
455 let lane_stats = self.manager.stats().await.ok();
456 let external_tasks = self.external_tasks.read().await;
457 let mut total_pending = 0;
458 let mut total_active = 0;
459 let mut lanes = HashMap::new();
460 if let Some(stats) = lane_stats {
461 for (lane_id, lane_stat) in stats.lanes {
462 total_pending += lane_stat.pending;
463 total_active += lane_stat.active;
464 let session_lane = match lane_id.as_str() {
465 "control" => SessionLane::Control,
466 "query" => SessionLane::Query,
467 "skill" => SessionLane::Execute,
468 "prompt" => SessionLane::Generate,
469 _ => continue,
470 };
471 let handler_mode = self.get_lane_handler(session_lane).await.mode;
472 lanes.insert(
473 format!("{:?}", session_lane),
474 crate::queue::LaneStatus {
475 lane: session_lane,
476 pending: lane_stat.pending,
477 active: lane_stat.active,
478 max_concurrency: lane_stat.max,
479 handler_mode,
480 },
481 );
482 }
483 }
484 crate::queue::SessionQueueStats {
485 total_pending,
486 total_active,
487 external_pending: external_tasks.len(),
488 lanes,
489 }
490 }
491
492 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
493 self.external_tasks
494 .read()
495 .await
496 .values()
497 .map(|p| p.task.clone())
498 .collect()
499 }
500
501 pub fn session_id(&self) -> &str {
502 &self.session_id
503 }
504
505 pub fn event_bridge(&self) -> &EventBridge {
507 &self.event_bridge
508 }
509
510 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
511 if let Some(dlq) = self.manager.queue().dlq() {
512 dlq.list().await
513 } else {
514 Vec::new()
515 }
516 }
517
518 pub async fn metrics_snapshot(&self) -> Option<MetricsSnapshot> {
519 if let Some(ref m) = self.metrics {
520 Some(m.snapshot().await)
521 } else {
522 None
523 }
524 }
525
526 pub async fn drain(&self, timeout: Duration) -> Result<()> {
527 Ok(self.manager.drain(timeout).await?)
528 }
529 pub fn is_shutting_down(&self) -> bool {
530 self.manager.is_shutting_down()
531 }
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queue::SessionCommand;

    /// Command that returns its stored JSON value both as result and payload.
    struct TestCommand {
        value: Value,
    }

    #[async_trait]
    impl SessionCommand for TestCommand {
        async fn execute(&self) -> Result<Value> {
            Ok(self.value.clone())
        }
        fn command_type(&self) -> &str {
            "test"
        }
        fn payload(&self) -> Value {
            self.value.clone()
        }
    }

    /// Builds a queue on a fresh broadcast channel whose receiver is dropped.
    async fn make_queue(session_id: &str, config: SessionQueueConfig) -> SessionLaneQueue {
        let (event_tx, _) = broadcast::channel(100);
        SessionLaneQueue::new(session_id, config, event_tx)
            .await
            .unwrap()
    }

    /// Builds a queue for session `"s"` with the default configuration.
    async fn default_queue() -> SessionLaneQueue {
        make_queue("s", SessionQueueConfig::default()).await
    }

    /// Default configuration with metrics switched on.
    fn metrics_config() -> SessionQueueConfig {
        SessionQueueConfig {
            enable_metrics: true,
            ..Default::default()
        }
    }

    /// Waits up to two seconds for a submitted command and unwraps its value.
    async fn value_within_two_secs(rx: oneshot::Receiver<Result<Value>>) -> Value {
        tokio::time::timeout(Duration::from_secs(2), rx)
            .await
            .unwrap()
            .unwrap()
            .unwrap()
    }

    /// Creates a bridge together with a receiver subscribed to its channel.
    fn bridge_with_receiver() -> (EventBridge, broadcast::Receiver<AgentEvent>) {
        let (event_tx, event_rx) = broadcast::channel(100);
        (EventBridge::new(event_tx), event_rx)
    }

    #[tokio::test]
    async fn test_session_lane_queue_creation() {
        let queue = make_queue("test-session", SessionQueueConfig::default()).await;
        assert_eq!(queue.session_id(), "test-session");
    }

    #[tokio::test]
    async fn test_submit_and_execute() {
        let queue = default_queue().await;
        queue.start().await.unwrap();
        let command = Box::new(TestCommand {
            value: serde_json::json!({"result": "success"}),
        });
        let rx = queue.submit(SessionLane::Query, command).await;
        let value = value_within_two_secs(rx).await;
        assert_eq!(value["result"], "success");
        queue.stop().await;
    }

    #[tokio::test]
    async fn test_stats() {
        let queue = default_queue().await;
        queue.start().await.unwrap();
        let stats = queue.stats().await;
        assert_eq!(stats.total_pending, 0);
        assert_eq!(stats.total_active, 0);
        assert_eq!(stats.external_pending, 0);
        queue.stop().await;
    }

    #[tokio::test]
    async fn test_lane_handler_config() {
        let queue = default_queue().await;
        assert_eq!(
            queue.get_lane_handler(SessionLane::Execute).await.mode,
            TaskHandlerMode::Internal
        );

        let external = LaneHandlerConfig {
            mode: TaskHandlerMode::External,
            timeout_ms: 30000,
        };
        queue.set_lane_handler(SessionLane::Execute, external).await;

        let handler = queue.get_lane_handler(SessionLane::Execute).await;
        assert_eq!(handler.mode, TaskHandlerMode::External);
        assert_eq!(handler.timeout_ms, 30000);
    }

    #[tokio::test]
    async fn test_submit_by_tool() {
        let queue = default_queue().await;
        queue.start().await.unwrap();
        let command = Box::new(TestCommand {
            value: serde_json::json!({"tool": "read"}),
        });
        let rx = queue.submit_by_tool("read", command).await;
        let value = value_within_two_secs(rx).await;
        assert_eq!(value["tool"], "read");
        queue.stop().await;
    }

    #[tokio::test]
    async fn test_dead_letters_empty() {
        let queue = default_queue().await;
        assert!(queue.dead_letters().await.is_empty());
    }

    #[tokio::test]
    async fn test_metrics_snapshot() {
        let queue = make_queue("s", metrics_config()).await;
        queue.start().await.unwrap();
        assert!(queue.metrics_snapshot().await.is_some());
        queue.stop().await;
    }

    #[tokio::test]
    async fn test_is_shutting_down() {
        let queue = default_queue().await;
        assert!(!queue.is_shutting_down());
        queue.stop().await;
        assert!(queue.is_shutting_down());
    }

    #[tokio::test]
    async fn test_pending_external_tasks_empty() {
        let queue = default_queue().await;
        assert!(queue.pending_external_tasks().await.is_empty());
    }

    #[tokio::test]
    async fn test_complete_external_task_nonexistent() {
        let queue = default_queue().await;
        let outcome = ExternalTaskResult {
            success: true,
            result: serde_json::json!("ok"),
            error: None,
        };
        let completed = queue.complete_external_task("nope", outcome).await;
        assert!(!completed);
    }

    #[test]
    fn test_command_payload() {
        let command = TestCommand {
            value: serde_json::json!({"k": "v"}),
        };
        assert_eq!(command.payload(), serde_json::json!({"k": "v"}));
        assert_eq!(command.command_type(), "test");
    }

    #[test]
    fn test_event_bridge_new() {
        let (event_tx, _) = broadcast::channel(100);
        let _bridge = EventBridge::new(event_tx);
    }

    #[test]
    fn test_event_bridge_emit_dead_letter() {
        let (bridge, mut events) = bridge_with_receiver();
        let dead_letter = DeadLetter {
            command_id: "c1".to_string(),
            command_type: "t".to_string(),
            lane_id: "control".to_string(),
            error: "err".to_string(),
            attempts: 3,
            failed_at: chrono::Utc::now(),
        };
        bridge.emit_dead_letter(&dead_letter);

        let received = events.try_recv().unwrap();
        match received {
            AgentEvent::CommandDeadLettered {
                command_id,
                attempts,
                ..
            } => {
                assert_eq!(command_id, "c1");
                assert_eq!(attempts, 3);
            }
            _ => panic!("wrong event"),
        }
    }

    #[test]
    fn test_event_bridge_emit_retry() {
        let (bridge, mut events) = bridge_with_receiver();
        bridge.emit_retry("c1", "t", "query", 2, 1000);

        let received = events.try_recv().unwrap();
        match received {
            AgentEvent::CommandRetry {
                attempt, delay_ms, ..
            } => {
                assert_eq!(attempt, 2);
                assert_eq!(delay_ms, 1000);
            }
            _ => panic!("wrong event"),
        }
    }

    #[test]
    fn test_event_bridge_emit_alert() {
        let (bridge, mut events) = bridge_with_receiver();
        bridge.emit_alert("warning", "queue_full", "at capacity");

        let received = events.try_recv().unwrap();
        match received {
            AgentEvent::QueueAlert { level, .. } => assert_eq!(level, "warning"),
            _ => panic!("wrong event"),
        }
    }

    #[test]
    fn test_lane_mapping() {
        let expected = [
            (SessionLane::Control, "control"),
            (SessionLane::Query, "query"),
            (SessionLane::Execute, "skill"),
            (SessionLane::Generate, "prompt"),
        ];
        for (lane, id) in expected {
            assert_eq!(lane.lane_id(), id);
        }
    }

    #[test]
    fn test_lane_priority() {
        let control = SessionLane::Control.lane_priority();
        let query = SessionLane::Query.lane_priority();
        let execute = SessionLane::Execute.lane_priority();
        let generate = SessionLane::Generate.lane_priority();
        assert!(control < query);
        assert!(query < execute);
        assert!(execute < generate);
    }

    #[test]
    fn test_lane_config() {
        assert_eq!(SessionLane::Control.lane_config().max_concurrency, 2);
        assert_eq!(SessionLane::Query.lane_config().max_concurrency, 4);
        assert_eq!(SessionLane::Generate.lane_config().max_concurrency, 1);
    }

    #[tokio::test]
    async fn test_build_queue_manager_default() {
        let config = SessionQueueConfig::default();
        let (_, metrics) = SessionLaneQueue::build_queue_manager(&config)
            .await
            .unwrap();
        assert!(metrics.is_none());
    }

    #[tokio::test]
    async fn test_build_queue_manager_with_metrics() {
        let config = metrics_config();
        let (_, metrics) = SessionLaneQueue::build_queue_manager(&config)
            .await
            .unwrap();
        assert!(metrics.is_some());
    }

    #[tokio::test]
    async fn test_build_queue_manager_with_dlq() {
        let config = SessionQueueConfig {
            enable_dlq: true,
            dlq_max_size: Some(500),
            ..Default::default()
        };
        let built = SessionLaneQueue::build_queue_manager(&config).await;
        assert!(built.is_ok());
    }
}
813}