1use crate::agent::AgentEvent;
15use crate::queue::SessionLane;
16use crate::queue::{
17 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionCommand, SessionQueueConfig,
18 TaskHandlerMode,
19};
20use a3s_lane::{
21 AlertManager, Command as LaneCommand, DeadLetter, EventEmitter, LaneConfig, LaneError,
22 LocalStorage, MetricsSnapshot, PriorityBoostConfig, QueueManager, QueueManagerBuilder,
23 QueueMetrics, RateLimitConfig, Result as LaneResult, RetryPolicy,
24};
25use anyhow::Result;
26use async_trait::async_trait;
27use serde_json::Value;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31use tokio::sync::{broadcast, oneshot, RwLock};
32
33impl SessionLane {
38 fn lane_id(self) -> &'static str {
40 match self {
41 SessionLane::Control => "control",
42 SessionLane::Query => "query",
43 SessionLane::Execute => "skill",
44 SessionLane::Generate => "prompt",
45 }
46 }
47
48 fn lane_priority(self) -> u8 {
50 match self {
51 SessionLane::Control => 1,
52 SessionLane::Query => 2,
53 SessionLane::Execute => 4,
54 SessionLane::Generate => 5,
55 }
56 }
57}
58
59struct PendingExternalTask {
65 task: ExternalTask,
66 result_tx: oneshot::Sender<Result<Value>>,
67}
68
69pub struct SessionCommandAdapter {
75 inner: Box<dyn SessionCommand>,
76 task_id: String,
77 handler_mode: TaskHandlerMode,
78 session_id: String,
79 lane: SessionLane,
80 timeout_ms: u64,
81 external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
82 event_tx: broadcast::Sender<AgentEvent>,
83}
84
85impl SessionCommandAdapter {
86 #[allow(clippy::too_many_arguments)]
87 fn new(
88 inner: Box<dyn SessionCommand>,
89 task_id: String,
90 handler_mode: TaskHandlerMode,
91 session_id: String,
92 lane: SessionLane,
93 timeout_ms: u64,
94 external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
95 event_tx: broadcast::Sender<AgentEvent>,
96 ) -> Self {
97 Self {
98 inner,
99 task_id,
100 handler_mode,
101 session_id,
102 lane,
103 timeout_ms,
104 external_tasks,
105 event_tx,
106 }
107 }
108
109 async fn register_and_wait(&self) -> LaneResult<Value> {
111 let (tx, rx) = oneshot::channel();
112 let task = ExternalTask {
113 task_id: self.task_id.clone(),
114 session_id: self.session_id.clone(),
115 lane: self.lane,
116 command_type: self.inner.command_type().to_string(),
117 payload: self.inner.payload(),
118 timeout_ms: self.timeout_ms,
119 created_at: Some(Instant::now()),
120 };
121 {
122 let mut tasks = self.external_tasks.write().await;
123 tasks.insert(
124 self.task_id.clone(),
125 PendingExternalTask {
126 task: task.clone(),
127 result_tx: tx,
128 },
129 );
130 }
131 let _ = self.event_tx.send(AgentEvent::ExternalTaskPending {
132 task_id: task.task_id.clone(),
133 session_id: task.session_id.clone(),
134 lane: task.lane,
135 command_type: task.command_type.clone(),
136 payload: task.payload.clone(),
137 timeout_ms: task.timeout_ms,
138 });
139 match tokio::time::timeout(Duration::from_millis(self.timeout_ms), rx).await {
140 Ok(Ok(result)) => result.map_err(|e| LaneError::CommandError(e.to_string())),
141 Ok(Err(_)) => Err(LaneError::CommandError("Channel closed".to_string())),
142 Err(_) => {
143 let mut tasks = self.external_tasks.write().await;
144 tasks.remove(&self.task_id);
145 Err(LaneError::Timeout(Duration::from_millis(self.timeout_ms)))
146 }
147 }
148 }
149
150 async fn execute_with_notification(&self) -> LaneResult<Value> {
152 let task = ExternalTask {
153 task_id: self.task_id.clone(),
154 session_id: self.session_id.clone(),
155 lane: self.lane,
156 command_type: self.inner.command_type().to_string(),
157 payload: self.inner.payload(),
158 timeout_ms: self.timeout_ms,
159 created_at: Some(Instant::now()),
160 };
161 let _ = self.event_tx.send(AgentEvent::ExternalTaskPending {
162 task_id: task.task_id.clone(),
163 session_id: task.session_id.clone(),
164 lane: task.lane,
165 command_type: task.command_type.clone(),
166 payload: task.payload.clone(),
167 timeout_ms: task.timeout_ms,
168 });
169 let result = self
170 .inner
171 .execute()
172 .await
173 .map_err(|e| LaneError::CommandError(e.to_string()));
174 let _ = self.event_tx.send(AgentEvent::ExternalTaskCompleted {
175 task_id: self.task_id.clone(),
176 session_id: self.session_id.clone(),
177 success: result.is_ok(),
178 });
179 result
180 }
181}
182
183#[async_trait]
184impl LaneCommand for SessionCommandAdapter {
185 async fn execute(&self) -> LaneResult<Value> {
186 match self.handler_mode {
187 TaskHandlerMode::Internal => self
188 .inner
189 .execute()
190 .await
191 .map_err(|e| LaneError::CommandError(e.to_string())),
192 TaskHandlerMode::External => self.register_and_wait().await,
193 TaskHandlerMode::Hybrid => self.execute_with_notification().await,
194 }
195 }
196 fn command_type(&self) -> &str {
197 self.inner.command_type()
198 }
199}
200
201pub struct EventBridge {
207 event_tx: broadcast::Sender<AgentEvent>,
208}
209
210impl EventBridge {
211 pub fn new(event_tx: broadcast::Sender<AgentEvent>) -> Self {
212 Self { event_tx }
213 }
214
215 pub fn emit_dead_letter(&self, dead_letter: &DeadLetter) {
216 let _ = self.event_tx.send(AgentEvent::CommandDeadLettered {
217 command_id: dead_letter.command_id.clone(),
218 command_type: dead_letter.command_type.clone(),
219 lane: dead_letter.lane_id.clone(),
220 error: dead_letter.error.clone(),
221 attempts: dead_letter.attempts,
222 });
223 }
224
225 pub fn emit_retry(
226 &self,
227 command_id: &str,
228 command_type: &str,
229 lane: &str,
230 attempt: u32,
231 delay_ms: u64,
232 ) {
233 let _ = self.event_tx.send(AgentEvent::CommandRetry {
234 command_id: command_id.to_string(),
235 command_type: command_type.to_string(),
236 lane: lane.to_string(),
237 attempt,
238 delay_ms,
239 });
240 }
241
242 pub fn emit_alert(&self, level: &str, alert_type: &str, message: &str) {
243 let _ = self.event_tx.send(AgentEvent::QueueAlert {
244 level: level.to_string(),
245 alert_type: alert_type.to_string(),
246 message: message.to_string(),
247 });
248 }
249}
250
251pub struct SessionLaneQueue {
257 session_id: String,
258 manager: Arc<QueueManager>,
259 metrics: Option<QueueMetrics>,
260 external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
261 lane_handlers: Arc<RwLock<HashMap<SessionLane, LaneHandlerConfig>>>,
262 event_tx: broadcast::Sender<AgentEvent>,
263 event_bridge: Arc<EventBridge>,
264 task_id_counter: Arc<std::sync::atomic::AtomicU64>, }
266
267impl SessionLaneQueue {
268 pub async fn new(
270 session_id: &str,
271 config: SessionQueueConfig,
272 event_tx: broadcast::Sender<AgentEvent>,
273 ) -> Result<Self> {
274 let (manager, metrics) = Self::build_queue_manager(&config).await?;
275 let mut lane_handlers = HashMap::new();
276 for lane in [
277 SessionLane::Control,
278 SessionLane::Query,
279 SessionLane::Execute,
280 SessionLane::Generate,
281 ] {
282 lane_handlers.insert(lane, config.handler_config(lane));
283 }
284 let event_bridge = Arc::new(EventBridge::new(event_tx.clone()));
285 Ok(Self {
286 session_id: session_id.to_string(),
287 manager: Arc::new(manager),
288 metrics,
289 external_tasks: Arc::new(RwLock::new(HashMap::new())),
290 lane_handlers: Arc::new(RwLock::new(lane_handlers)),
291 event_tx,
292 event_bridge,
293 task_id_counter: Arc::new(std::sync::atomic::AtomicU64::new(1)),
294 })
295 }
296
297 async fn build_queue_manager(
299 config: &SessionQueueConfig,
300 ) -> Result<(QueueManager, Option<QueueMetrics>)> {
301 let emitter = EventEmitter::new(100);
302 let mut builder = QueueManagerBuilder::new(emitter);
303 let default_timeout = config.default_timeout_ms.map(Duration::from_millis);
304 let default_retry = Some(RetryPolicy::exponential(3));
305
306 for lane in [
307 SessionLane::Control,
308 SessionLane::Query,
309 SessionLane::Execute,
310 SessionLane::Generate,
311 ] {
312 let max_concurrency = match lane {
314 SessionLane::Control => config.control_max_concurrency,
315 SessionLane::Query => config.query_max_concurrency,
316 SessionLane::Execute => config.execute_max_concurrency,
317 SessionLane::Generate => config.generate_max_concurrency,
318 };
319
320 let mut cfg = LaneConfig::new(1, max_concurrency);
322
323 if let Some(timeout) = default_timeout {
324 cfg = cfg.with_timeout(timeout);
325 }
326 if let Some(ref retry) = default_retry {
327 cfg = cfg.with_retry_policy(retry.clone());
328 }
329 if lane == SessionLane::Generate {
330 cfg = cfg.with_rate_limit(RateLimitConfig::per_minute(60));
331 cfg = cfg
332 .with_priority_boost(PriorityBoostConfig::standard(Duration::from_secs(300)));
333 }
334 builder = builder.with_lane(lane.lane_id(), cfg, lane.lane_priority());
335 }
336
337 if config.enable_dlq {
338 builder = builder.with_dlq(config.dlq_max_size.unwrap_or(1000));
339 }
340
341 let metrics = if config.enable_metrics {
342 let m = QueueMetrics::local();
343 builder = builder.with_metrics(m.clone());
344 Some(m)
345 } else {
346 None
347 };
348
349 if config.enable_alerts {
350 builder = builder.with_alerts(Arc::new(AlertManager::with_queue_depth_alerts(50, 100)));
351 }
352
353 if let Some(ref storage_path) = config.storage_path {
354 builder = builder.with_storage(Arc::new(
355 LocalStorage::new(storage_path.to_path_buf()).await?,
356 ));
357 }
358
359 let manager = builder.build().await?;
360 Ok((manager, metrics))
361 }
362
363 pub async fn start(&self) -> Result<()> {
364 self.manager.start().await.map_err(|e| anyhow::anyhow!("Lane manager start failed: {}", e))
365 }
366 pub async fn stop(&self) {
367 self.manager.shutdown().await;
368 }
369
370 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
371 self.lane_handlers.write().await.insert(lane, config);
372 }
373
374 pub async fn get_lane_handler(&self, lane: SessionLane) -> LaneHandlerConfig {
375 self.lane_handlers
376 .read()
377 .await
378 .get(&lane)
379 .cloned()
380 .unwrap_or_default()
381 }
382
383 pub async fn submit(
385 &self,
386 lane: SessionLane,
387 command: Box<dyn SessionCommand>,
388 ) -> oneshot::Receiver<Result<Value>> {
389 let (result_tx, result_rx) = oneshot::channel();
390 let handler_config = self.get_lane_handler(lane).await;
391
392 let task_id = format!(
394 "{}-{}",
395 self.session_id,
396 self.task_id_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
397 );
398
399 let adapter = SessionCommandAdapter::new(
400 command,
401 task_id,
402 handler_config.mode,
403 self.session_id.clone(),
404 lane,
405 handler_config.timeout_ms,
406 Arc::clone(&self.external_tasks),
407 self.event_tx.clone(),
408 );
409 match self.manager.submit(lane.lane_id(), Box::new(adapter)).await {
410 Ok(lane_rx) => {
411 tokio::spawn(async move {
412 match lane_rx.await {
413 Ok(Ok(value)) => {
414 let _ = result_tx.send(Ok(value));
415 }
416 Ok(Err(e)) => {
417 let _ = result_tx.send(Err(anyhow::anyhow!("{}", e)));
418 }
419 Err(_) => {
420 let _ = result_tx.send(Err(anyhow::anyhow!("Channel closed")));
421 }
422 }
423 });
424 }
425 Err(e) => {
426 let _ = result_tx.send(Err(e.into()));
427 }
428 }
429 result_rx
430 }
431
432 pub async fn submit_by_tool(
433 &self,
434 tool_name: &str,
435 command: Box<dyn SessionCommand>,
436 ) -> oneshot::Receiver<Result<Value>> {
437 self.submit(SessionLane::from_tool_name(tool_name), command)
438 .await
439 }
440
441 pub async fn submit_batch(
448 &self,
449 lane: SessionLane,
450 commands: Vec<Box<dyn SessionCommand>>,
451 ) -> Vec<oneshot::Receiver<Result<Value>>> {
452 if commands.is_empty() {
453 return Vec::new();
454 }
455
456 let handler_config = self.get_lane_handler(lane).await;
458
459 let mut receivers = Vec::with_capacity(commands.len());
460
461 for command in commands {
462 let (result_tx, result_rx) = oneshot::channel();
463
464 let task_id = format!(
466 "{}-{}",
467 self.session_id,
468 self.task_id_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
469 );
470
471 let adapter = SessionCommandAdapter::new(
472 command,
473 task_id,
474 handler_config.mode,
475 self.session_id.clone(),
476 lane,
477 handler_config.timeout_ms,
478 Arc::clone(&self.external_tasks),
479 self.event_tx.clone(),
480 );
481
482 match self.manager.submit(lane.lane_id(), Box::new(adapter)).await {
483 Ok(lane_rx) => {
484 tokio::spawn(async move {
485 match lane_rx.await {
486 Ok(Ok(value)) => {
487 let _ = result_tx.send(Ok(value));
488 }
489 Ok(Err(e)) => {
490 let _ = result_tx.send(Err(anyhow::anyhow!("{}", e)));
491 }
492 Err(_) => {
493 let _ = result_tx.send(Err(anyhow::anyhow!("Channel closed")));
494 }
495 }
496 });
497 }
498 Err(e) => {
499 let _ = result_tx.send(Err(e.into()));
500 }
501 }
502
503 receivers.push(result_rx);
504 }
505
506 receivers
507 }
508
509 pub async fn submit_batch_by_tool(
511 &self,
512 tool_name: &str,
513 commands: Vec<Box<dyn SessionCommand>>,
514 ) -> Vec<oneshot::Receiver<Result<Value>>> {
515 self.submit_batch(SessionLane::from_tool_name(tool_name), commands)
516 .await
517 }
518
519 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
520 let pending = { self.external_tasks.write().await.remove(task_id) };
521 if let Some(pending) = pending {
522 let _ = self.event_tx.send(AgentEvent::ExternalTaskCompleted {
523 task_id: task_id.to_string(),
524 session_id: self.session_id.clone(),
525 success: result.success,
526 });
527 let final_result = if result.success {
528 Ok(result.result)
529 } else {
530 Err(anyhow::anyhow!(result
531 .error
532 .unwrap_or_else(|| "External task failed".to_string())))
533 };
534 let _ = pending.result_tx.send(final_result);
535 true
536 } else {
537 false
538 }
539 }
540
541 pub async fn stats(&self) -> crate::queue::SessionQueueStats {
542 let lane_stats = self.manager.stats().await.ok();
543 let external_tasks = self.external_tasks.read().await;
544 let mut total_pending = 0;
545 let mut total_active = 0;
546 let mut lanes = HashMap::new();
547 if let Some(stats) = lane_stats {
548 for (lane_id, lane_stat) in stats.lanes {
549 total_pending += lane_stat.pending;
550 total_active += lane_stat.active;
551 let session_lane = match lane_id.as_str() {
552 "control" => SessionLane::Control,
553 "query" => SessionLane::Query,
554 "skill" => SessionLane::Execute,
555 "prompt" => SessionLane::Generate,
556 _ => continue,
557 };
558 let handler_mode = self.get_lane_handler(session_lane).await.mode;
559 lanes.insert(
560 format!("{:?}", session_lane),
561 crate::queue::LaneStatus {
562 lane: session_lane,
563 pending: lane_stat.pending,
564 active: lane_stat.active,
565 max_concurrency: lane_stat.max,
566 handler_mode,
567 },
568 );
569 }
570 }
571 crate::queue::SessionQueueStats {
572 total_pending,
573 total_active,
574 external_pending: external_tasks.len(),
575 lanes,
576 }
577 }
578
579 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
580 self.external_tasks
581 .read()
582 .await
583 .values()
584 .map(|p| p.task.clone())
585 .collect()
586 }
587
588 pub fn session_id(&self) -> &str {
589 &self.session_id
590 }
591
592 pub fn event_bridge(&self) -> &EventBridge {
594 &self.event_bridge
595 }
596
597 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
598 if let Some(dlq) = self.manager.queue().dlq() {
599 dlq.list().await
600 } else {
601 Vec::new()
602 }
603 }
604
605 pub async fn metrics_snapshot(&self) -> Option<MetricsSnapshot> {
606 if let Some(ref m) = self.metrics {
607 Some(m.snapshot().await)
608 } else {
609 None
610 }
611 }
612
613 pub async fn drain(&self, timeout: Duration) -> Result<()> {
614 Ok(self.manager.drain(timeout).await?)
615 }
616 pub fn is_shutting_down(&self) -> bool {
617 self.manager.is_shutting_down()
618 }
619}
620
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queue::SessionCommand;

    /// Command that succeeds with a fixed JSON value and reports the same
    /// value as its payload.
    struct TestCommand {
        value: Value,
    }

    #[async_trait]
    impl SessionCommand for TestCommand {
        async fn execute(&self) -> Result<Value> {
            Ok(self.value.clone())
        }
        fn command_type(&self) -> &str {
            "test"
        }
        fn payload(&self) -> Value {
            self.value.clone()
        }
    }

    /// Builds a queue for `session_id`; the event receiver is dropped at once.
    async fn make_queue(session_id: &str, config: SessionQueueConfig) -> SessionLaneQueue {
        let (event_tx, _) = broadcast::channel(100);
        SessionLaneQueue::new(session_id, config, event_tx)
            .await
            .unwrap()
    }

    /// Builds a queue for session `"s"` with the default configuration.
    async fn default_queue() -> SessionLaneQueue {
        make_queue("s", SessionQueueConfig::default()).await
    }

    /// Default configuration with metrics switched on.
    fn metrics_config() -> SessionQueueConfig {
        SessionQueueConfig {
            enable_metrics: true,
            ..Default::default()
        }
    }

    /// Waits up to two seconds for a submitted command and unwraps its value.
    async fn value_within_2s(rx: oneshot::Receiver<Result<Value>>) -> Value {
        tokio::time::timeout(Duration::from_secs(2), rx)
            .await
            .unwrap()
            .unwrap()
            .unwrap()
    }

    /// Builds an event bridge together with a receiver subscribed to it.
    fn bridge_and_receiver() -> (EventBridge, broadcast::Receiver<AgentEvent>) {
        let (event_tx, event_rx) = broadcast::channel(100);
        (EventBridge::new(event_tx), event_rx)
    }

    #[tokio::test]
    async fn test_session_lane_queue_creation() {
        let queue = make_queue("test-session", SessionQueueConfig::default()).await;
        assert_eq!(queue.session_id(), "test-session");
    }

    #[tokio::test]
    async fn test_submit_and_execute() {
        let queue = default_queue().await;
        queue.start().await.unwrap();
        let command = Box::new(TestCommand {
            value: serde_json::json!({"result": "success"}),
        });
        let rx = queue.submit(SessionLane::Query, command).await;
        assert_eq!(value_within_2s(rx).await["result"], "success");
        queue.stop().await;
    }

    #[tokio::test]
    async fn test_stats() {
        let queue = default_queue().await;
        queue.start().await.unwrap();
        let stats = queue.stats().await;
        assert_eq!(stats.total_pending, 0);
        assert_eq!(stats.total_active, 0);
        assert_eq!(stats.external_pending, 0);
        queue.stop().await;
    }

    #[tokio::test]
    async fn test_lane_handler_config() {
        let queue = default_queue().await;
        let before = queue.get_lane_handler(SessionLane::Execute).await;
        assert_eq!(before.mode, TaskHandlerMode::Internal);

        let external = LaneHandlerConfig {
            mode: TaskHandlerMode::External,
            timeout_ms: 30000,
        };
        queue.set_lane_handler(SessionLane::Execute, external).await;

        let after = queue.get_lane_handler(SessionLane::Execute).await;
        assert_eq!(after.mode, TaskHandlerMode::External);
        assert_eq!(after.timeout_ms, 30000);
    }

    #[tokio::test]
    async fn test_submit_by_tool() {
        let queue = default_queue().await;
        queue.start().await.unwrap();
        let command = Box::new(TestCommand {
            value: serde_json::json!({"tool": "read"}),
        });
        let rx = queue.submit_by_tool("read", command).await;
        assert_eq!(value_within_2s(rx).await["tool"], "read");
        queue.stop().await;
    }

    #[tokio::test]
    async fn test_dead_letters_empty() {
        let queue = default_queue().await;
        assert!(queue.dead_letters().await.is_empty());
    }

    #[tokio::test]
    async fn test_metrics_snapshot() {
        let queue = make_queue("s", metrics_config()).await;
        queue.start().await.unwrap();
        assert!(queue.metrics_snapshot().await.is_some());
        queue.stop().await;
    }

    #[tokio::test]
    async fn test_is_shutting_down() {
        let queue = default_queue().await;
        assert!(!queue.is_shutting_down());
        queue.stop().await;
        assert!(queue.is_shutting_down());
    }

    #[tokio::test]
    async fn test_pending_external_tasks_empty() {
        let queue = default_queue().await;
        assert!(queue.pending_external_tasks().await.is_empty());
    }

    #[tokio::test]
    async fn test_complete_external_task_nonexistent() {
        let queue = default_queue().await;
        let outcome = ExternalTaskResult {
            success: true,
            result: serde_json::json!("ok"),
            error: None,
        };
        assert!(!queue.complete_external_task("nope", outcome).await);
    }

    #[test]
    fn test_command_payload() {
        let command = TestCommand {
            value: serde_json::json!({"k": "v"}),
        };
        assert_eq!(command.payload(), serde_json::json!({"k": "v"}));
        assert_eq!(command.command_type(), "test");
    }

    #[test]
    fn test_event_bridge_new() {
        let (event_tx, _) = broadcast::channel(100);
        let _bridge = EventBridge::new(event_tx);
    }

    #[test]
    fn test_event_bridge_emit_dead_letter() {
        let (bridge, mut events) = bridge_and_receiver();
        let dead_letter = DeadLetter {
            command_id: "c1".to_string(),
            command_type: "t".to_string(),
            lane_id: "control".to_string(),
            error: "err".to_string(),
            attempts: 3,
            failed_at: chrono::Utc::now(),
        };
        bridge.emit_dead_letter(&dead_letter);

        if let AgentEvent::CommandDeadLettered {
            command_id,
            attempts,
            ..
        } = events.try_recv().unwrap()
        {
            assert_eq!(command_id, "c1");
            assert_eq!(attempts, 3);
        } else {
            panic!("wrong event");
        }
    }

    #[test]
    fn test_event_bridge_emit_retry() {
        let (bridge, mut events) = bridge_and_receiver();
        bridge.emit_retry("c1", "t", "query", 2, 1000);

        if let AgentEvent::CommandRetry {
            attempt, delay_ms, ..
        } = events.try_recv().unwrap()
        {
            assert_eq!(attempt, 2);
            assert_eq!(delay_ms, 1000);
        } else {
            panic!("wrong event");
        }
    }

    #[test]
    fn test_event_bridge_emit_alert() {
        let (bridge, mut events) = bridge_and_receiver();
        bridge.emit_alert("warning", "queue_full", "at capacity");

        if let AgentEvent::QueueAlert { level, .. } = events.try_recv().unwrap() {
            assert_eq!(level, "warning");
        } else {
            panic!("wrong event");
        }
    }

    #[test]
    fn test_lane_mapping() {
        let expected = [
            (SessionLane::Control, "control"),
            (SessionLane::Query, "query"),
            (SessionLane::Execute, "skill"),
            (SessionLane::Generate, "prompt"),
        ];
        for (lane, id) in expected {
            assert_eq!(lane.lane_id(), id);
        }
    }

    #[test]
    fn test_lane_priority() {
        let control = SessionLane::Control.lane_priority();
        let query = SessionLane::Query.lane_priority();
        let execute = SessionLane::Execute.lane_priority();
        let generate = SessionLane::Generate.lane_priority();
        assert!(control < query);
        assert!(query < execute);
        assert!(execute < generate);
    }

    #[tokio::test]
    async fn test_build_queue_manager_default() {
        let built = SessionLaneQueue::build_queue_manager(&SessionQueueConfig::default()).await;
        let (_, metrics) = built.unwrap();
        assert!(metrics.is_none());
    }

    #[tokio::test]
    async fn test_build_queue_manager_with_metrics() {
        let built = SessionLaneQueue::build_queue_manager(&metrics_config()).await;
        let (_, metrics) = built.unwrap();
        assert!(metrics.is_some());
    }

    #[tokio::test]
    async fn test_build_queue_manager_with_dlq() {
        let config = SessionQueueConfig {
            enable_dlq: true,
            dlq_max_size: Some(500),
            ..Default::default()
        };
        assert!(SessionLaneQueue::build_queue_manager(&config)
            .await
            .is_ok());
    }
}