// a3s_code_core/session_lane_queue.rs

//! Session Lane Queue - a3s-lane backed command queue
//!
//! Provides per-session command queues with lane-based priority scheduling,
//! backed by a3s-lane directly (no intermediate wrapper).
//!
//! ## Features
//!
//! - Per-session QueueManager with independent queue, metrics, and DLQ
//! - Preserves Internal/External/Hybrid task handler modes
//! - Dead letter queue for failed commands
//! - Metrics collection and alerts
//! - Retry policies and rate limiting
13
14use crate::agent::AgentEvent;
15use crate::queue::SessionLane;
16use crate::queue::{
17    ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionCommand, SessionQueueConfig,
18    TaskHandlerMode,
19};
20use a3s_lane::{
21    AlertManager, Command as LaneCommand, DeadLetter, EventEmitter, LaneConfig, LaneError,
22    LocalStorage, MetricsSnapshot, PriorityBoostConfig, QueueManager, QueueManagerBuilder,
23    QueueMetrics, RateLimitConfig, Result as LaneResult, RetryPolicy,
24};
25use anyhow::Result;
26use async_trait::async_trait;
27use serde_json::Value;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31use tokio::sync::{broadcast, oneshot, RwLock};
32
33// ============================================================================
34// SessionLane → a3s-lane mapping
35// ============================================================================
36
37impl SessionLane {
38    /// Get a3s-lane lane ID string
39    fn lane_id(self) -> &'static str {
40        match self {
41            SessionLane::Control => "control",
42            SessionLane::Query => "query",
43            SessionLane::Execute => "skill",
44            SessionLane::Generate => "prompt",
45        }
46    }
47
48    /// Get default lane configuration for a3s-lane
49    fn lane_config(self) -> LaneConfig {
50        match self {
51            SessionLane::Control => LaneConfig::new(1, 2),
52            SessionLane::Query => LaneConfig::new(1, 4),
53            SessionLane::Execute => LaneConfig::new(1, 2),
54            SessionLane::Generate => LaneConfig::new(1, 1),
55        }
56    }
57
58    /// Get priority value (lower = higher priority)
59    fn lane_priority(self) -> u8 {
60        match self {
61            SessionLane::Control => 1,
62            SessionLane::Query => 2,
63            SessionLane::Execute => 4,
64            SessionLane::Generate => 5,
65        }
66    }
67}
68
69// ============================================================================
70// Pending External Task
71// ============================================================================
72
73/// A pending external task waiting for completion
74struct PendingExternalTask {
75    task: ExternalTask,
76    result_tx: oneshot::Sender<Result<Value>>,
77}
78
79// ============================================================================
80// Session Command Adapter
81// ============================================================================
82
83/// Adapter that wraps SessionCommand for a3s-lane execution
84pub struct SessionCommandAdapter {
85    inner: Box<dyn SessionCommand>,
86    task_id: String,
87    handler_mode: TaskHandlerMode,
88    session_id: String,
89    lane: SessionLane,
90    timeout_ms: u64,
91    external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
92    event_tx: broadcast::Sender<AgentEvent>,
93}
94
95impl SessionCommandAdapter {
96    #[allow(clippy::too_many_arguments)]
97    fn new(
98        inner: Box<dyn SessionCommand>,
99        task_id: String,
100        handler_mode: TaskHandlerMode,
101        session_id: String,
102        lane: SessionLane,
103        timeout_ms: u64,
104        external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
105        event_tx: broadcast::Sender<AgentEvent>,
106    ) -> Self {
107        Self {
108            inner,
109            task_id,
110            handler_mode,
111            session_id,
112            lane,
113            timeout_ms,
114            external_tasks,
115            event_tx,
116        }
117    }
118
119    /// Register as external task and wait for completion
120    async fn register_and_wait(&self) -> LaneResult<Value> {
121        let (tx, rx) = oneshot::channel();
122        let task = ExternalTask {
123            task_id: self.task_id.clone(),
124            session_id: self.session_id.clone(),
125            lane: self.lane,
126            command_type: self.inner.command_type().to_string(),
127            payload: self.inner.payload(),
128            timeout_ms: self.timeout_ms,
129            created_at: Some(Instant::now()),
130        };
131        {
132            let mut tasks = self.external_tasks.write().await;
133            tasks.insert(
134                self.task_id.clone(),
135                PendingExternalTask {
136                    task: task.clone(),
137                    result_tx: tx,
138                },
139            );
140        }
141        let _ = self.event_tx.send(AgentEvent::ExternalTaskPending {
142            task_id: task.task_id.clone(),
143            session_id: task.session_id.clone(),
144            lane: task.lane,
145            command_type: task.command_type.clone(),
146            payload: task.payload.clone(),
147            timeout_ms: task.timeout_ms,
148        });
149        match tokio::time::timeout(Duration::from_millis(self.timeout_ms), rx).await {
150            Ok(Ok(result)) => result.map_err(|e| LaneError::CommandError(e.to_string())),
151            Ok(Err(_)) => Err(LaneError::CommandError("Channel closed".to_string())),
152            Err(_) => {
153                let mut tasks = self.external_tasks.write().await;
154                tasks.remove(&self.task_id);
155                Err(LaneError::Timeout(Duration::from_millis(self.timeout_ms)))
156            }
157        }
158    }
159
160    /// Execute internally with external notification (Hybrid mode)
161    async fn execute_with_notification(&self) -> LaneResult<Value> {
162        let task = ExternalTask {
163            task_id: self.task_id.clone(),
164            session_id: self.session_id.clone(),
165            lane: self.lane,
166            command_type: self.inner.command_type().to_string(),
167            payload: self.inner.payload(),
168            timeout_ms: self.timeout_ms,
169            created_at: Some(Instant::now()),
170        };
171        let _ = self.event_tx.send(AgentEvent::ExternalTaskPending {
172            task_id: task.task_id.clone(),
173            session_id: task.session_id.clone(),
174            lane: task.lane,
175            command_type: task.command_type.clone(),
176            payload: task.payload.clone(),
177            timeout_ms: task.timeout_ms,
178        });
179        let result = self
180            .inner
181            .execute()
182            .await
183            .map_err(|e| LaneError::CommandError(e.to_string()));
184        let _ = self.event_tx.send(AgentEvent::ExternalTaskCompleted {
185            task_id: self.task_id.clone(),
186            session_id: self.session_id.clone(),
187            success: result.is_ok(),
188        });
189        result
190    }
191}
192
193#[async_trait]
194impl LaneCommand for SessionCommandAdapter {
195    async fn execute(&self) -> LaneResult<Value> {
196        match self.handler_mode {
197            TaskHandlerMode::Internal => self
198                .inner
199                .execute()
200                .await
201                .map_err(|e| LaneError::CommandError(e.to_string())),
202            TaskHandlerMode::External => self.register_and_wait().await,
203            TaskHandlerMode::Hybrid => self.execute_with_notification().await,
204        }
205    }
206    fn command_type(&self) -> &str {
207        self.inner.command_type()
208    }
209}
210
211// ============================================================================
212// Event Bridge
213// ============================================================================
214
215/// Bridge that translates a3s-lane events to AgentEvent
216pub struct EventBridge {
217    event_tx: broadcast::Sender<AgentEvent>,
218}
219
220impl EventBridge {
221    pub fn new(event_tx: broadcast::Sender<AgentEvent>) -> Self {
222        Self { event_tx }
223    }
224
225    pub fn emit_dead_letter(&self, dead_letter: &DeadLetter) {
226        let _ = self.event_tx.send(AgentEvent::CommandDeadLettered {
227            command_id: dead_letter.command_id.clone(),
228            command_type: dead_letter.command_type.clone(),
229            lane: dead_letter.lane_id.clone(),
230            error: dead_letter.error.clone(),
231            attempts: dead_letter.attempts,
232        });
233    }
234
235    pub fn emit_retry(
236        &self,
237        command_id: &str,
238        command_type: &str,
239        lane: &str,
240        attempt: u32,
241        delay_ms: u64,
242    ) {
243        let _ = self.event_tx.send(AgentEvent::CommandRetry {
244            command_id: command_id.to_string(),
245            command_type: command_type.to_string(),
246            lane: lane.to_string(),
247            attempt,
248            delay_ms,
249        });
250    }
251
252    pub fn emit_alert(&self, level: &str, alert_type: &str, message: &str) {
253        let _ = self.event_tx.send(AgentEvent::QueueAlert {
254            level: level.to_string(),
255            alert_type: alert_type.to_string(),
256            message: message.to_string(),
257        });
258    }
259}
260
261// ============================================================================
262// Session Lane Queue
263// ============================================================================
264
265/// Per-session command queue backed by a3s-lane with external task handling
266pub struct SessionLaneQueue {
267    session_id: String,
268    manager: Arc<QueueManager>,
269    metrics: Option<QueueMetrics>,
270    external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
271    lane_handlers: Arc<RwLock<HashMap<SessionLane, LaneHandlerConfig>>>,
272    event_tx: broadcast::Sender<AgentEvent>,
273    event_bridge: Arc<EventBridge>,
274}
275
276impl SessionLaneQueue {
277    /// Create a new session lane queue
278    pub async fn new(
279        session_id: &str,
280        config: SessionQueueConfig,
281        event_tx: broadcast::Sender<AgentEvent>,
282    ) -> Result<Self> {
283        let (manager, metrics) = Self::build_queue_manager(&config).await?;
284        let mut lane_handlers = HashMap::new();
285        for lane in [
286            SessionLane::Control,
287            SessionLane::Query,
288            SessionLane::Execute,
289            SessionLane::Generate,
290        ] {
291            lane_handlers.insert(lane, config.handler_config(lane));
292        }
293        let event_bridge = Arc::new(EventBridge::new(event_tx.clone()));
294        Ok(Self {
295            session_id: session_id.to_string(),
296            manager: Arc::new(manager),
297            metrics,
298            external_tasks: Arc::new(RwLock::new(HashMap::new())),
299            lane_handlers: Arc::new(RwLock::new(lane_handlers)),
300            event_tx,
301            event_bridge,
302        })
303    }
304
305    /// Build a3s-lane QueueManager directly from SessionQueueConfig
306    async fn build_queue_manager(
307        config: &SessionQueueConfig,
308    ) -> Result<(QueueManager, Option<QueueMetrics>)> {
309        let emitter = EventEmitter::new(100);
310        let mut builder = QueueManagerBuilder::new(emitter);
311        let default_timeout = config.default_timeout_ms.map(Duration::from_millis);
312        let default_retry = Some(RetryPolicy::exponential(3));
313
314        for lane in [
315            SessionLane::Control,
316            SessionLane::Query,
317            SessionLane::Execute,
318            SessionLane::Generate,
319        ] {
320            let mut cfg = lane.lane_config();
321            if let Some(timeout) = default_timeout {
322                cfg = cfg.with_timeout(timeout);
323            }
324            if let Some(ref retry) = default_retry {
325                cfg = cfg.with_retry_policy(retry.clone());
326            }
327            if lane == SessionLane::Generate {
328                cfg = cfg.with_rate_limit(RateLimitConfig::per_minute(60));
329                cfg = cfg
330                    .with_priority_boost(PriorityBoostConfig::standard(Duration::from_secs(300)));
331            }
332            builder = builder.with_lane(lane.lane_id(), cfg, lane.lane_priority());
333        }
334
335        if config.enable_dlq {
336            builder = builder.with_dlq(config.dlq_max_size.unwrap_or(1000));
337        }
338
339        let metrics = if config.enable_metrics {
340            let m = QueueMetrics::local();
341            builder = builder.with_metrics(m.clone());
342            Some(m)
343        } else {
344            None
345        };
346
347        if config.enable_alerts {
348            builder = builder.with_alerts(Arc::new(AlertManager::with_queue_depth_alerts(50, 100)));
349        }
350
351        if let Some(ref storage_path) = config.storage_path {
352            builder = builder.with_storage(Arc::new(
353                LocalStorage::new(storage_path.to_path_buf()).await?,
354            ));
355        }
356
357        let manager = builder.build().await?;
358        Ok((manager, metrics))
359    }
360
361    pub async fn start(&self) -> Result<()> {
362        self.manager.start().await
363    }
364    pub async fn stop(&self) {
365        self.manager.shutdown().await;
366    }
367
368    pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
369        self.lane_handlers.write().await.insert(lane, config);
370    }
371
372    pub async fn get_lane_handler(&self, lane: SessionLane) -> LaneHandlerConfig {
373        self.lane_handlers
374            .read()
375            .await
376            .get(&lane)
377            .cloned()
378            .unwrap_or_default()
379    }
380
381    /// Submit a command to a specific lane
382    pub async fn submit(
383        &self,
384        lane: SessionLane,
385        command: Box<dyn SessionCommand>,
386    ) -> oneshot::Receiver<Result<Value>> {
387        let (result_tx, result_rx) = oneshot::channel();
388        let handler_config = self.get_lane_handler(lane).await;
389        let task_id = uuid::Uuid::new_v4().to_string();
390        let adapter = SessionCommandAdapter::new(
391            command,
392            task_id,
393            handler_config.mode,
394            self.session_id.clone(),
395            lane,
396            handler_config.timeout_ms,
397            Arc::clone(&self.external_tasks),
398            self.event_tx.clone(),
399        );
400        match self.manager.submit(lane.lane_id(), Box::new(adapter)).await {
401            Ok(lane_rx) => {
402                tokio::spawn(async move {
403                    match lane_rx.await {
404                        Ok(Ok(value)) => {
405                            let _ = result_tx.send(Ok(value));
406                        }
407                        Ok(Err(e)) => {
408                            let _ = result_tx.send(Err(anyhow::anyhow!("{}", e)));
409                        }
410                        Err(_) => {
411                            let _ = result_tx.send(Err(anyhow::anyhow!("Channel closed")));
412                        }
413                    }
414                });
415            }
416            Err(e) => {
417                let _ = result_tx.send(Err(e.into()));
418            }
419        }
420        result_rx
421    }
422
423    pub async fn submit_by_tool(
424        &self,
425        tool_name: &str,
426        command: Box<dyn SessionCommand>,
427    ) -> oneshot::Receiver<Result<Value>> {
428        self.submit(SessionLane::from_tool_name(tool_name), command)
429            .await
430    }
431
432    pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
433        let pending = { self.external_tasks.write().await.remove(task_id) };
434        if let Some(pending) = pending {
435            let _ = self.event_tx.send(AgentEvent::ExternalTaskCompleted {
436                task_id: task_id.to_string(),
437                session_id: self.session_id.clone(),
438                success: result.success,
439            });
440            let final_result = if result.success {
441                Ok(result.result)
442            } else {
443                Err(anyhow::anyhow!(result
444                    .error
445                    .unwrap_or_else(|| "External task failed".to_string())))
446            };
447            let _ = pending.result_tx.send(final_result);
448            true
449        } else {
450            false
451        }
452    }
453
454    pub async fn stats(&self) -> crate::queue::SessionQueueStats {
455        let lane_stats = self.manager.stats().await.ok();
456        let external_tasks = self.external_tasks.read().await;
457        let mut total_pending = 0;
458        let mut total_active = 0;
459        let mut lanes = HashMap::new();
460        if let Some(stats) = lane_stats {
461            for (lane_id, lane_stat) in stats.lanes {
462                total_pending += lane_stat.pending;
463                total_active += lane_stat.active;
464                let session_lane = match lane_id.as_str() {
465                    "control" => SessionLane::Control,
466                    "query" => SessionLane::Query,
467                    "skill" => SessionLane::Execute,
468                    "prompt" => SessionLane::Generate,
469                    _ => continue,
470                };
471                let handler_mode = self.get_lane_handler(session_lane).await.mode;
472                lanes.insert(
473                    format!("{:?}", session_lane),
474                    crate::queue::LaneStatus {
475                        lane: session_lane,
476                        pending: lane_stat.pending,
477                        active: lane_stat.active,
478                        max_concurrency: lane_stat.max,
479                        handler_mode,
480                    },
481                );
482            }
483        }
484        crate::queue::SessionQueueStats {
485            total_pending,
486            total_active,
487            external_pending: external_tasks.len(),
488            lanes,
489        }
490    }
491
492    pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
493        self.external_tasks
494            .read()
495            .await
496            .values()
497            .map(|p| p.task.clone())
498            .collect()
499    }
500
501    pub fn session_id(&self) -> &str {
502        &self.session_id
503    }
504
505    /// Access the event bridge for emitting queue lifecycle events
506    pub fn event_bridge(&self) -> &EventBridge {
507        &self.event_bridge
508    }
509
510    pub async fn dead_letters(&self) -> Vec<DeadLetter> {
511        if let Some(dlq) = self.manager.queue().dlq() {
512            dlq.list().await
513        } else {
514            Vec::new()
515        }
516    }
517
518    pub async fn metrics_snapshot(&self) -> Option<MetricsSnapshot> {
519        if let Some(ref m) = self.metrics {
520            Some(m.snapshot().await)
521        } else {
522            None
523        }
524    }
525
526    pub async fn drain(&self, timeout: Duration) -> Result<()> {
527        Ok(self.manager.drain(timeout).await?)
528    }
529    pub fn is_shutting_down(&self) -> bool {
530        self.manager.is_shutting_down()
531    }
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queue::SessionCommand;

    /// Command stub that returns a fixed JSON value as both result and payload.
    struct TestCommand {
        value: Value,
    }

    #[async_trait]
    impl SessionCommand for TestCommand {
        async fn execute(&self) -> Result<Value> {
            Ok(self.value.clone())
        }
        fn command_type(&self) -> &str {
            "test"
        }
        fn payload(&self) -> Value {
            self.value.clone()
        }
    }

    /// Build a queue for `session_id` on a fresh broadcast channel whose
    /// receiver is dropped immediately.
    async fn open_queue(session_id: &str, config: SessionQueueConfig) -> SessionLaneQueue {
        let (tx, _) = broadcast::channel(100);
        SessionLaneQueue::new(session_id, config, tx).await.unwrap()
    }

    /// Wait up to two seconds for a submitted command and unwrap its value.
    async fn recv_value(rx: oneshot::Receiver<Result<Value>>) -> Value {
        tokio::time::timeout(Duration::from_secs(2), rx)
            .await
            .unwrap()
            .unwrap()
            .unwrap()
    }

    #[tokio::test]
    async fn test_session_lane_queue_creation() {
        let queue = open_queue("test-session", SessionQueueConfig::default()).await;
        assert_eq!(queue.session_id(), "test-session");
    }

    #[tokio::test]
    async fn test_submit_and_execute() {
        let queue = open_queue("s", SessionQueueConfig::default()).await;
        queue.start().await.unwrap();
        let command = Box::new(TestCommand {
            value: serde_json::json!({"result": "success"}),
        });
        let value = recv_value(queue.submit(SessionLane::Query, command).await).await;
        assert_eq!(value["result"], "success");
        queue.stop().await;
    }

    #[tokio::test]
    async fn test_stats() {
        let queue = open_queue("s", SessionQueueConfig::default()).await;
        queue.start().await.unwrap();
        let snapshot = queue.stats().await;
        assert_eq!(snapshot.total_pending, 0);
        assert_eq!(snapshot.total_active, 0);
        assert_eq!(snapshot.external_pending, 0);
        queue.stop().await;
    }

    #[tokio::test]
    async fn test_lane_handler_config() {
        let queue = open_queue("s", SessionQueueConfig::default()).await;
        let before = queue.get_lane_handler(SessionLane::Execute).await;
        assert_eq!(before.mode, TaskHandlerMode::Internal);

        let external = LaneHandlerConfig {
            mode: TaskHandlerMode::External,
            timeout_ms: 30000,
        };
        queue.set_lane_handler(SessionLane::Execute, external).await;

        let after = queue.get_lane_handler(SessionLane::Execute).await;
        assert_eq!(after.mode, TaskHandlerMode::External);
        assert_eq!(after.timeout_ms, 30000);
    }

    #[tokio::test]
    async fn test_submit_by_tool() {
        let queue = open_queue("s", SessionQueueConfig::default()).await;
        queue.start().await.unwrap();
        let command = Box::new(TestCommand {
            value: serde_json::json!({"tool": "read"}),
        });
        let value = recv_value(queue.submit_by_tool("read", command).await).await;
        assert_eq!(value["tool"], "read");
        queue.stop().await;
    }

    #[tokio::test]
    async fn test_dead_letters_empty() {
        let queue = open_queue("s", SessionQueueConfig::default()).await;
        assert!(queue.dead_letters().await.is_empty());
    }

    #[tokio::test]
    async fn test_metrics_snapshot() {
        let config = SessionQueueConfig {
            enable_metrics: true,
            ..Default::default()
        };
        let queue = open_queue("s", config).await;
        queue.start().await.unwrap();
        assert!(queue.metrics_snapshot().await.is_some());
        queue.stop().await;
    }

    #[tokio::test]
    async fn test_is_shutting_down() {
        let queue = open_queue("s", SessionQueueConfig::default()).await;
        assert!(!queue.is_shutting_down());
        queue.stop().await;
        assert!(queue.is_shutting_down());
    }

    #[tokio::test]
    async fn test_pending_external_tasks_empty() {
        let queue = open_queue("s", SessionQueueConfig::default()).await;
        assert!(queue.pending_external_tasks().await.is_empty());
    }

    #[tokio::test]
    async fn test_complete_external_task_nonexistent() {
        let queue = open_queue("s", SessionQueueConfig::default()).await;
        let outcome = ExternalTaskResult {
            success: true,
            result: serde_json::json!("ok"),
            error: None,
        };
        let accepted = queue.complete_external_task("nope", outcome).await;
        assert!(!accepted);
    }

    #[test]
    fn test_command_payload() {
        let command = TestCommand {
            value: serde_json::json!({"k": "v"}),
        };
        assert_eq!(command.payload(), serde_json::json!({"k": "v"}));
        assert_eq!(command.command_type(), "test");
    }

    #[test]
    fn test_event_bridge_new() {
        // Construction alone must not panic.
        let (tx, _) = broadcast::channel(100);
        let _bridge = EventBridge::new(tx);
    }

    #[test]
    fn test_event_bridge_emit_dead_letter() {
        let (tx, mut rx) = broadcast::channel(100);
        let bridge = EventBridge::new(tx);
        let letter = DeadLetter {
            command_id: "c1".to_string(),
            command_type: "t".to_string(),
            lane_id: "control".to_string(),
            error: "err".to_string(),
            attempts: 3,
            failed_at: chrono::Utc::now(),
        };
        bridge.emit_dead_letter(&letter);
        if let AgentEvent::CommandDeadLettered {
            command_id,
            attempts,
            ..
        } = rx.try_recv().unwrap()
        {
            assert_eq!(command_id, "c1");
            assert_eq!(attempts, 3);
        } else {
            panic!("wrong event");
        }
    }

    #[test]
    fn test_event_bridge_emit_retry() {
        let (tx, mut rx) = broadcast::channel(100);
        let bridge = EventBridge::new(tx);
        bridge.emit_retry("c1", "t", "query", 2, 1000);
        if let AgentEvent::CommandRetry {
            attempt, delay_ms, ..
        } = rx.try_recv().unwrap()
        {
            assert_eq!(attempt, 2);
            assert_eq!(delay_ms, 1000);
        } else {
            panic!("wrong event");
        }
    }

    #[test]
    fn test_event_bridge_emit_alert() {
        let (tx, mut rx) = broadcast::channel(100);
        let bridge = EventBridge::new(tx);
        bridge.emit_alert("warning", "queue_full", "at capacity");
        if let AgentEvent::QueueAlert { level, .. } = rx.try_recv().unwrap() {
            assert_eq!(level, "warning");
        } else {
            panic!("wrong event");
        }
    }

    #[test]
    fn test_lane_mapping() {
        let expected = [
            (SessionLane::Control, "control"),
            (SessionLane::Query, "query"),
            (SessionLane::Execute, "skill"),
            (SessionLane::Generate, "prompt"),
        ];
        for (lane, id) in expected {
            assert_eq!(lane.lane_id(), id);
        }
    }

    #[test]
    fn test_lane_priority() {
        // Each lane must rank strictly ahead of the one after it.
        let ordered = [
            SessionLane::Control,
            SessionLane::Query,
            SessionLane::Execute,
            SessionLane::Generate,
        ];
        for pair in ordered.windows(2) {
            assert!(pair[0].lane_priority() < pair[1].lane_priority());
        }
    }

    #[test]
    fn test_lane_config() {
        let expected = [
            (SessionLane::Control, 2),
            (SessionLane::Query, 4),
            (SessionLane::Generate, 1),
        ];
        for (lane, max_concurrency) in expected {
            assert_eq!(lane.lane_config().max_concurrency, max_concurrency);
        }
    }

    #[tokio::test]
    async fn test_build_queue_manager_default() {
        let config = SessionQueueConfig::default();
        let (_, metrics) = SessionLaneQueue::build_queue_manager(&config)
            .await
            .unwrap();
        assert!(metrics.is_none());
    }

    #[tokio::test]
    async fn test_build_queue_manager_with_metrics() {
        let config = SessionQueueConfig {
            enable_metrics: true,
            ..Default::default()
        };
        let (_, metrics) = SessionLaneQueue::build_queue_manager(&config)
            .await
            .unwrap();
        assert!(metrics.is_some());
    }

    #[tokio::test]
    async fn test_build_queue_manager_with_dlq() {
        let config = SessionQueueConfig {
            enable_dlq: true,
            dlq_max_size: Some(500),
            ..Default::default()
        };
        let built = SessionLaneQueue::build_queue_manager(&config).await;
        assert!(built.is_ok());
    }
}