Skip to main content

a3s_code_core/
session_lane_queue.rs

//! Session Lane Queue - a3s-lane backed command queue
//!
//! Provides per-session command queues with lane-based priority scheduling,
//! backed by a3s-lane directly (no intermediate wrapper).
//!
//! ## Features
//!
//! - Per-session QueueManager with independent queue, metrics, DLQ
//! - Preserves Internal/External/Hybrid task handler modes
//! - Dead letter queue for failed commands
//! - Metrics collection and alerts
//! - Retry policies and rate limiting
13
use crate::agent::AgentEvent;
use crate::queue::SessionLane;
use crate::queue::{
    ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionCommand, SessionQueueConfig,
    TaskHandlerMode,
};
use a3s_lane::{
    AlertManager, Command as LaneCommand, DeadLetter, EventEmitter, LaneConfig, LaneError,
    LocalStorage, MetricsSnapshot, PriorityBoostConfig, QueueManager, QueueManagerBuilder,
    QueueMetrics, RateLimitConfig, Result as LaneResult, RetryPolicy,
};
use anyhow::Result;
use async_trait::async_trait;
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, oneshot, RwLock};
32
// ============================================================================
// SessionLane → a3s-lane mapping
// ============================================================================
36
37impl SessionLane {
38    /// Get a3s-lane lane ID string
39    fn lane_id(self) -> &'static str {
40        match self {
41            SessionLane::Control => "control",
42            SessionLane::Query => "query",
43            SessionLane::Execute => "skill",
44            SessionLane::Generate => "prompt",
45        }
46    }
47
48    /// Get priority value (lower = higher priority)
49    fn lane_priority(self) -> u8 {
50        match self {
51            SessionLane::Control => 1,
52            SessionLane::Query => 2,
53            SessionLane::Execute => 4,
54            SessionLane::Generate => 5,
55        }
56    }
57}
58
// ============================================================================
// Pending External Task
// ============================================================================
62
/// A pending external task waiting for completion.
///
/// Entries live in the shared `external_tasks` map, keyed by task ID, from the
/// moment the task is registered until it is completed externally or its wait
/// times out.
struct PendingExternalTask {
    /// Description of the task; cloned out by `pending_external_tasks()`.
    task: ExternalTask,
    /// One-shot sender used to deliver the final result to the waiting command.
    result_tx: oneshot::Sender<Result<Value>>,
}
68
// ============================================================================
// Session Command Adapter
// ============================================================================
72
73/// Adapter that wraps SessionCommand for a3s-lane execution
74pub struct SessionCommandAdapter {
75    inner: Box<dyn SessionCommand>,
76    task_id: String,
77    handler_mode: TaskHandlerMode,
78    session_id: String,
79    lane: SessionLane,
80    timeout_ms: u64,
81    external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
82    event_tx: broadcast::Sender<AgentEvent>,
83}
84
85impl SessionCommandAdapter {
86    #[allow(clippy::too_many_arguments)]
87    fn new(
88        inner: Box<dyn SessionCommand>,
89        task_id: String,
90        handler_mode: TaskHandlerMode,
91        session_id: String,
92        lane: SessionLane,
93        timeout_ms: u64,
94        external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
95        event_tx: broadcast::Sender<AgentEvent>,
96    ) -> Self {
97        Self {
98            inner,
99            task_id,
100            handler_mode,
101            session_id,
102            lane,
103            timeout_ms,
104            external_tasks,
105            event_tx,
106        }
107    }
108
109    /// Register as external task and wait for completion
110    async fn register_and_wait(&self) -> LaneResult<Value> {
111        let (tx, rx) = oneshot::channel();
112        let task = ExternalTask {
113            task_id: self.task_id.clone(),
114            session_id: self.session_id.clone(),
115            lane: self.lane,
116            command_type: self.inner.command_type().to_string(),
117            payload: self.inner.payload(),
118            timeout_ms: self.timeout_ms,
119            created_at: Some(Instant::now()),
120        };
121        {
122            let mut tasks = self.external_tasks.write().await;
123            tasks.insert(
124                self.task_id.clone(),
125                PendingExternalTask {
126                    task: task.clone(),
127                    result_tx: tx,
128                },
129            );
130        }
131        let _ = self.event_tx.send(AgentEvent::ExternalTaskPending {
132            task_id: task.task_id.clone(),
133            session_id: task.session_id.clone(),
134            lane: task.lane,
135            command_type: task.command_type.clone(),
136            payload: task.payload.clone(),
137            timeout_ms: task.timeout_ms,
138        });
139        match tokio::time::timeout(Duration::from_millis(self.timeout_ms), rx).await {
140            Ok(Ok(result)) => result.map_err(|e| LaneError::CommandError(e.to_string())),
141            Ok(Err(_)) => Err(LaneError::CommandError("Channel closed".to_string())),
142            Err(_) => {
143                let mut tasks = self.external_tasks.write().await;
144                tasks.remove(&self.task_id);
145                Err(LaneError::Timeout(Duration::from_millis(self.timeout_ms)))
146            }
147        }
148    }
149
150    /// Execute internally with external notification (Hybrid mode)
151    async fn execute_with_notification(&self) -> LaneResult<Value> {
152        let task = ExternalTask {
153            task_id: self.task_id.clone(),
154            session_id: self.session_id.clone(),
155            lane: self.lane,
156            command_type: self.inner.command_type().to_string(),
157            payload: self.inner.payload(),
158            timeout_ms: self.timeout_ms,
159            created_at: Some(Instant::now()),
160        };
161        let _ = self.event_tx.send(AgentEvent::ExternalTaskPending {
162            task_id: task.task_id.clone(),
163            session_id: task.session_id.clone(),
164            lane: task.lane,
165            command_type: task.command_type.clone(),
166            payload: task.payload.clone(),
167            timeout_ms: task.timeout_ms,
168        });
169        let result = self
170            .inner
171            .execute()
172            .await
173            .map_err(|e| LaneError::CommandError(e.to_string()));
174        let _ = self.event_tx.send(AgentEvent::ExternalTaskCompleted {
175            task_id: self.task_id.clone(),
176            session_id: self.session_id.clone(),
177            success: result.is_ok(),
178        });
179        result
180    }
181}
182
183#[async_trait]
184impl LaneCommand for SessionCommandAdapter {
185    async fn execute(&self) -> LaneResult<Value> {
186        match self.handler_mode {
187            TaskHandlerMode::Internal => self
188                .inner
189                .execute()
190                .await
191                .map_err(|e| LaneError::CommandError(e.to_string())),
192            TaskHandlerMode::External => self.register_and_wait().await,
193            TaskHandlerMode::Hybrid => self.execute_with_notification().await,
194        }
195    }
196    fn command_type(&self) -> &str {
197        self.inner.command_type()
198    }
199}
200
// ============================================================================
// Event Bridge
// ============================================================================
204
205/// Bridge that translates a3s-lane events to AgentEvent
206pub struct EventBridge {
207    event_tx: broadcast::Sender<AgentEvent>,
208}
209
210impl EventBridge {
211    pub fn new(event_tx: broadcast::Sender<AgentEvent>) -> Self {
212        Self { event_tx }
213    }
214
215    pub fn emit_dead_letter(&self, dead_letter: &DeadLetter) {
216        let _ = self.event_tx.send(AgentEvent::CommandDeadLettered {
217            command_id: dead_letter.command_id.clone(),
218            command_type: dead_letter.command_type.clone(),
219            lane: dead_letter.lane_id.clone(),
220            error: dead_letter.error.clone(),
221            attempts: dead_letter.attempts,
222        });
223    }
224
225    pub fn emit_retry(
226        &self,
227        command_id: &str,
228        command_type: &str,
229        lane: &str,
230        attempt: u32,
231        delay_ms: u64,
232    ) {
233        let _ = self.event_tx.send(AgentEvent::CommandRetry {
234            command_id: command_id.to_string(),
235            command_type: command_type.to_string(),
236            lane: lane.to_string(),
237            attempt,
238            delay_ms,
239        });
240    }
241
242    pub fn emit_alert(&self, level: &str, alert_type: &str, message: &str) {
243        let _ = self.event_tx.send(AgentEvent::QueueAlert {
244            level: level.to_string(),
245            alert_type: alert_type.to_string(),
246            message: message.to_string(),
247        });
248    }
249}
250
// ============================================================================
// Session Lane Queue
// ============================================================================
254
255/// Per-session command queue backed by a3s-lane with external task handling
256pub struct SessionLaneQueue {
257    session_id: String,
258    manager: Arc<QueueManager>,
259    metrics: Option<QueueMetrics>,
260    external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
261    lane_handlers: Arc<RwLock<HashMap<SessionLane, LaneHandlerConfig>>>,
262    event_tx: broadcast::Sender<AgentEvent>,
263    event_bridge: Arc<EventBridge>,
264    task_id_counter: Arc<std::sync::atomic::AtomicU64>, // Fast task ID generation
265}
266
267impl SessionLaneQueue {
268    /// Create a new session lane queue
269    pub async fn new(
270        session_id: &str,
271        config: SessionQueueConfig,
272        event_tx: broadcast::Sender<AgentEvent>,
273    ) -> Result<Self> {
274        let (manager, metrics) = Self::build_queue_manager(&config).await?;
275        let mut lane_handlers = HashMap::new();
276        for lane in [
277            SessionLane::Control,
278            SessionLane::Query,
279            SessionLane::Execute,
280            SessionLane::Generate,
281        ] {
282            lane_handlers.insert(lane, config.handler_config(lane));
283        }
284        let event_bridge = Arc::new(EventBridge::new(event_tx.clone()));
285        Ok(Self {
286            session_id: session_id.to_string(),
287            manager: Arc::new(manager),
288            metrics,
289            external_tasks: Arc::new(RwLock::new(HashMap::new())),
290            lane_handlers: Arc::new(RwLock::new(lane_handlers)),
291            event_tx,
292            event_bridge,
293            task_id_counter: Arc::new(std::sync::atomic::AtomicU64::new(1)),
294        })
295    }
296
297    /// Build a3s-lane QueueManager directly from SessionQueueConfig
298    async fn build_queue_manager(
299        config: &SessionQueueConfig,
300    ) -> Result<(QueueManager, Option<QueueMetrics>)> {
301        let emitter = EventEmitter::new(100);
302        let mut builder = QueueManagerBuilder::new(emitter);
303        let default_timeout = config.default_timeout_ms.map(Duration::from_millis);
304        let default_retry = Some(RetryPolicy::exponential(3));
305
306        for lane in [
307            SessionLane::Control,
308            SessionLane::Query,
309            SessionLane::Execute,
310            SessionLane::Generate,
311        ] {
312            // Apply user-configured concurrency limits
313            let max_concurrency = match lane {
314                SessionLane::Control => config.control_max_concurrency,
315                SessionLane::Query => config.query_max_concurrency,
316                SessionLane::Execute => config.execute_max_concurrency,
317                SessionLane::Generate => config.generate_max_concurrency,
318            };
319
320            // Create LaneConfig with user-specified max_concurrency
321            let mut cfg = LaneConfig::new(1, max_concurrency);
322
323            if let Some(timeout) = default_timeout {
324                cfg = cfg.with_timeout(timeout);
325            }
326            if let Some(ref retry) = default_retry {
327                cfg = cfg.with_retry_policy(retry.clone());
328            }
329            if lane == SessionLane::Generate {
330                cfg = cfg.with_rate_limit(RateLimitConfig::per_minute(60));
331                cfg = cfg
332                    .with_priority_boost(PriorityBoostConfig::standard(Duration::from_secs(300)));
333            }
334            builder = builder.with_lane(lane.lane_id(), cfg, lane.lane_priority());
335        }
336
337        if config.enable_dlq {
338            builder = builder.with_dlq(config.dlq_max_size.unwrap_or(1000));
339        }
340
341        let metrics = if config.enable_metrics {
342            let m = QueueMetrics::local();
343            builder = builder.with_metrics(m.clone());
344            Some(m)
345        } else {
346            None
347        };
348
349        if config.enable_alerts {
350            builder = builder.with_alerts(Arc::new(AlertManager::with_queue_depth_alerts(50, 100)));
351        }
352
353        if let Some(ref storage_path) = config.storage_path {
354            builder = builder.with_storage(Arc::new(
355                LocalStorage::new(storage_path.to_path_buf()).await?,
356            ));
357        }
358
359        let manager = builder.build().await?;
360        Ok((manager, metrics))
361    }
362
363    pub async fn start(&self) -> Result<()> {
364        self.manager
365            .start()
366            .await
367            .map_err(|e| anyhow::anyhow!("Lane manager start failed: {}", e))
368    }
369    pub async fn stop(&self) {
370        self.manager.shutdown().await;
371    }
372
373    pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
374        self.lane_handlers.write().await.insert(lane, config);
375    }
376
377    pub async fn get_lane_handler(&self, lane: SessionLane) -> LaneHandlerConfig {
378        self.lane_handlers
379            .read()
380            .await
381            .get(&lane)
382            .cloned()
383            .unwrap_or_default()
384    }
385
386    /// Submit a command to a specific lane
387    pub async fn submit(
388        &self,
389        lane: SessionLane,
390        command: Box<dyn SessionCommand>,
391    ) -> oneshot::Receiver<Result<Value>> {
392        let (result_tx, result_rx) = oneshot::channel();
393        let handler_config = self.get_lane_handler(lane).await;
394
395        // Fast task ID generation using atomic counter instead of UUID
396        let task_id = format!(
397            "{}-{}",
398            self.session_id,
399            self.task_id_counter
400                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
401        );
402
403        let adapter = SessionCommandAdapter::new(
404            command,
405            task_id,
406            handler_config.mode,
407            self.session_id.clone(),
408            lane,
409            handler_config.timeout_ms,
410            Arc::clone(&self.external_tasks),
411            self.event_tx.clone(),
412        );
413        match self.manager.submit(lane.lane_id(), Box::new(adapter)).await {
414            Ok(lane_rx) => {
415                tokio::spawn(async move {
416                    match lane_rx.await {
417                        Ok(Ok(value)) => {
418                            let _ = result_tx.send(Ok(value));
419                        }
420                        Ok(Err(e)) => {
421                            let _ = result_tx.send(Err(anyhow::anyhow!("{}", e)));
422                        }
423                        Err(_) => {
424                            let _ = result_tx.send(Err(anyhow::anyhow!("Channel closed")));
425                        }
426                    }
427                });
428            }
429            Err(e) => {
430                let _ = result_tx.send(Err(e.into()));
431            }
432        }
433        result_rx
434    }
435
436    pub async fn submit_by_tool(
437        &self,
438        tool_name: &str,
439        command: Box<dyn SessionCommand>,
440    ) -> oneshot::Receiver<Result<Value>> {
441        self.submit(SessionLane::from_tool_name(tool_name), command)
442            .await
443    }
444
445    /// Submit multiple commands to the same lane in batch (optimized)
446    ///
447    /// This is more efficient than calling submit() multiple times because:
448    /// - Handler config is fetched only once
449    /// - Task IDs are generated in batch
450    /// - Reduces lock contention
451    pub async fn submit_batch(
452        &self,
453        lane: SessionLane,
454        commands: Vec<Box<dyn SessionCommand>>,
455    ) -> Vec<oneshot::Receiver<Result<Value>>> {
456        if commands.is_empty() {
457            return Vec::new();
458        }
459
460        // Fetch handler config once for all commands
461        let handler_config = self.get_lane_handler(lane).await;
462
463        let mut receivers = Vec::with_capacity(commands.len());
464
465        for command in commands {
466            let (result_tx, result_rx) = oneshot::channel();
467
468            // Fast task ID generation using atomic counter
469            let task_id = format!(
470                "{}-{}",
471                self.session_id,
472                self.task_id_counter
473                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
474            );
475
476            let adapter = SessionCommandAdapter::new(
477                command,
478                task_id,
479                handler_config.mode,
480                self.session_id.clone(),
481                lane,
482                handler_config.timeout_ms,
483                Arc::clone(&self.external_tasks),
484                self.event_tx.clone(),
485            );
486
487            match self.manager.submit(lane.lane_id(), Box::new(adapter)).await {
488                Ok(lane_rx) => {
489                    tokio::spawn(async move {
490                        match lane_rx.await {
491                            Ok(Ok(value)) => {
492                                let _ = result_tx.send(Ok(value));
493                            }
494                            Ok(Err(e)) => {
495                                let _ = result_tx.send(Err(anyhow::anyhow!("{}", e)));
496                            }
497                            Err(_) => {
498                                let _ = result_tx.send(Err(anyhow::anyhow!("Channel closed")));
499                            }
500                        }
501                    });
502                }
503                Err(e) => {
504                    let _ = result_tx.send(Err(e.into()));
505                }
506            }
507
508            receivers.push(result_rx);
509        }
510
511        receivers
512    }
513
514    /// Submit multiple commands by tool name in batch (optimized)
515    pub async fn submit_batch_by_tool(
516        &self,
517        tool_name: &str,
518        commands: Vec<Box<dyn SessionCommand>>,
519    ) -> Vec<oneshot::Receiver<Result<Value>>> {
520        self.submit_batch(SessionLane::from_tool_name(tool_name), commands)
521            .await
522    }
523
524    pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
525        let pending = { self.external_tasks.write().await.remove(task_id) };
526        if let Some(pending) = pending {
527            let _ = self.event_tx.send(AgentEvent::ExternalTaskCompleted {
528                task_id: task_id.to_string(),
529                session_id: self.session_id.clone(),
530                success: result.success,
531            });
532            let final_result = if result.success {
533                Ok(result.result)
534            } else {
535                Err(anyhow::anyhow!(result
536                    .error
537                    .unwrap_or_else(|| "External task failed".to_string())))
538            };
539            let _ = pending.result_tx.send(final_result);
540            true
541        } else {
542            false
543        }
544    }
545
546    pub async fn stats(&self) -> crate::queue::SessionQueueStats {
547        let lane_stats = self.manager.stats().await.ok();
548        let external_tasks = self.external_tasks.read().await;
549        let mut total_pending = 0;
550        let mut total_active = 0;
551        let mut lanes = HashMap::new();
552        if let Some(stats) = lane_stats {
553            for (lane_id, lane_stat) in stats.lanes {
554                total_pending += lane_stat.pending;
555                total_active += lane_stat.active;
556                let session_lane = match lane_id.as_str() {
557                    "control" => SessionLane::Control,
558                    "query" => SessionLane::Query,
559                    "skill" => SessionLane::Execute,
560                    "prompt" => SessionLane::Generate,
561                    _ => continue,
562                };
563                let handler_mode = self.get_lane_handler(session_lane).await.mode;
564                lanes.insert(
565                    format!("{:?}", session_lane),
566                    crate::queue::LaneStatus {
567                        lane: session_lane,
568                        pending: lane_stat.pending,
569                        active: lane_stat.active,
570                        max_concurrency: lane_stat.max,
571                        handler_mode,
572                    },
573                );
574            }
575        }
576        crate::queue::SessionQueueStats {
577            total_pending,
578            total_active,
579            external_pending: external_tasks.len(),
580            lanes,
581        }
582    }
583
584    pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
585        self.external_tasks
586            .read()
587            .await
588            .values()
589            .map(|p| p.task.clone())
590            .collect()
591    }
592
593    pub fn session_id(&self) -> &str {
594        &self.session_id
595    }
596
597    /// Access the event bridge for emitting queue lifecycle events
598    pub fn event_bridge(&self) -> &EventBridge {
599        &self.event_bridge
600    }
601
602    /// Subscribe to queue events (CommandDeadLettered, CommandRetry, QueueAlert, etc.)
603    pub fn subscribe(&self) -> broadcast::Receiver<AgentEvent> {
604        self.event_tx.subscribe()
605    }
606
607    pub async fn dead_letters(&self) -> Vec<DeadLetter> {
608        if let Some(dlq) = self.manager.queue().dlq() {
609            dlq.list().await
610        } else {
611            Vec::new()
612        }
613    }
614
615    pub async fn metrics_snapshot(&self) -> Option<MetricsSnapshot> {
616        if let Some(ref m) = self.metrics {
617            Some(m.snapshot().await)
618        } else {
619            None
620        }
621    }
622
623    pub async fn drain(&self, timeout: Duration) -> Result<()> {
624        Ok(self.manager.drain(timeout).await?)
625    }
626    pub fn is_shutting_down(&self) -> bool {
627        self.manager.is_shutting_down()
628    }
629}
630
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queue::SessionCommand;

    /// Command that returns its stored value both as payload and as result.
    struct TestCommand {
        value: Value,
    }

    #[async_trait]
    impl SessionCommand for TestCommand {
        async fn execute(&self) -> Result<Value> {
            Ok(self.value.clone())
        }
        fn command_type(&self) -> &str {
            "test"
        }
        fn payload(&self) -> Value {
            self.value.clone()
        }
    }

    /// Builds a queue for `session_id` with `config`; the event receiver is
    /// dropped immediately, as no test here listens to queue events.
    async fn queue_with(session_id: &str, config: SessionQueueConfig) -> SessionLaneQueue {
        let (tx, _) = broadcast::channel(100);
        SessionLaneQueue::new(session_id, config, tx).await.unwrap()
    }

    /// Builds a queue for session `"s"` with the default configuration.
    async fn default_queue() -> SessionLaneQueue {
        queue_with("s", SessionQueueConfig::default()).await
    }

    #[tokio::test]
    async fn test_session_lane_queue_creation() {
        let q = queue_with("test-session", SessionQueueConfig::default()).await;
        assert_eq!(q.session_id(), "test-session");
    }

    #[tokio::test]
    async fn test_submit_and_execute() {
        let q = default_queue().await;
        q.start().await.unwrap();
        let cmd = Box::new(TestCommand {
            value: serde_json::json!({"result": "success"}),
        });
        let rx = q.submit(SessionLane::Query, cmd).await;
        let result = tokio::time::timeout(Duration::from_secs(2), rx)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(result.unwrap()["result"], "success");
        q.stop().await;
    }

    #[tokio::test]
    async fn test_stats() {
        let q = default_queue().await;
        q.start().await.unwrap();
        let stats = q.stats().await;
        assert_eq!(stats.total_pending, 0);
        assert_eq!(stats.total_active, 0);
        assert_eq!(stats.external_pending, 0);
        q.stop().await;
    }

    #[tokio::test]
    async fn test_lane_handler_config() {
        let q = default_queue().await;
        let before = q.get_lane_handler(SessionLane::Execute).await;
        assert_eq!(before.mode, TaskHandlerMode::Internal);

        let external = LaneHandlerConfig {
            mode: TaskHandlerMode::External,
            timeout_ms: 30000,
        };
        q.set_lane_handler(SessionLane::Execute, external).await;

        let after = q.get_lane_handler(SessionLane::Execute).await;
        assert_eq!(after.mode, TaskHandlerMode::External);
        assert_eq!(after.timeout_ms, 30000);
    }

    #[tokio::test]
    async fn test_submit_by_tool() {
        let q = default_queue().await;
        q.start().await.unwrap();
        let cmd = Box::new(TestCommand {
            value: serde_json::json!({"tool": "read"}),
        });
        let rx = q.submit_by_tool("read", cmd).await;
        let result = tokio::time::timeout(Duration::from_secs(2), rx)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(result.unwrap()["tool"], "read");
        q.stop().await;
    }

    #[tokio::test]
    async fn test_dead_letters_empty() {
        let q = default_queue().await;
        assert!(q.dead_letters().await.is_empty());
    }

    #[tokio::test]
    async fn test_metrics_snapshot() {
        let cfg = SessionQueueConfig {
            enable_metrics: true,
            ..Default::default()
        };
        let q = queue_with("s", cfg).await;
        q.start().await.unwrap();
        assert!(q.metrics_snapshot().await.is_some());
        q.stop().await;
    }

    #[tokio::test]
    async fn test_is_shutting_down() {
        let q = default_queue().await;
        assert!(!q.is_shutting_down());
        q.stop().await;
        assert!(q.is_shutting_down());
    }

    #[tokio::test]
    async fn test_pending_external_tasks_empty() {
        let q = default_queue().await;
        assert!(q.pending_external_tasks().await.is_empty());
    }

    #[tokio::test]
    async fn test_complete_external_task_nonexistent() {
        let q = default_queue().await;
        let r = ExternalTaskResult {
            success: true,
            result: serde_json::json!("ok"),
            error: None,
        };
        assert!(!q.complete_external_task("nope", r).await);
    }

    #[test]
    fn test_command_payload() {
        let cmd = TestCommand {
            value: serde_json::json!({"k": "v"}),
        };
        assert_eq!(cmd.payload(), serde_json::json!({"k": "v"}));
        assert_eq!(cmd.command_type(), "test");
    }

    #[test]
    fn test_event_bridge_new() {
        let (tx, _) = broadcast::channel(100);
        // Construction alone is the check: it must not panic.
        let _b = EventBridge::new(tx);
    }

    #[test]
    fn test_event_bridge_emit_dead_letter() {
        let (tx, mut rx) = broadcast::channel(100);
        let b = EventBridge::new(tx);
        let letter = DeadLetter {
            command_id: "c1".to_string(),
            command_type: "t".to_string(),
            lane_id: "control".to_string(),
            error: "err".to_string(),
            attempts: 3,
            failed_at: chrono::Utc::now(),
        };
        b.emit_dead_letter(&letter);
        if let AgentEvent::CommandDeadLettered {
            command_id,
            attempts,
            ..
        } = rx.try_recv().unwrap()
        {
            assert_eq!(command_id, "c1");
            assert_eq!(attempts, 3);
        } else {
            panic!("wrong event");
        }
    }

    #[test]
    fn test_event_bridge_emit_retry() {
        let (tx, mut rx) = broadcast::channel(100);
        let b = EventBridge::new(tx);
        b.emit_retry("c1", "t", "query", 2, 1000);
        if let AgentEvent::CommandRetry {
            attempt, delay_ms, ..
        } = rx.try_recv().unwrap()
        {
            assert_eq!(attempt, 2);
            assert_eq!(delay_ms, 1000);
        } else {
            panic!("wrong event");
        }
    }

    #[test]
    fn test_event_bridge_emit_alert() {
        let (tx, mut rx) = broadcast::channel(100);
        let b = EventBridge::new(tx);
        b.emit_alert("warning", "queue_full", "at capacity");
        if let AgentEvent::QueueAlert { level, .. } = rx.try_recv().unwrap() {
            assert_eq!(level, "warning");
        } else {
            panic!("wrong event");
        }
    }

    #[test]
    fn test_lane_mapping() {
        let expected = [
            (SessionLane::Control, "control"),
            (SessionLane::Query, "query"),
            (SessionLane::Execute, "skill"),
            (SessionLane::Generate, "prompt"),
        ];
        for (lane, id) in expected {
            assert_eq!(lane.lane_id(), id);
        }
    }

    #[test]
    fn test_lane_priority() {
        let control = SessionLane::Control.lane_priority();
        let query = SessionLane::Query.lane_priority();
        let execute = SessionLane::Execute.lane_priority();
        let generate = SessionLane::Generate.lane_priority();
        assert!(control < query);
        assert!(query < execute);
        assert!(execute < generate);
    }

    // Note: lane_config() method was removed, config is now handled by SessionQueueConfig

    #[tokio::test]
    async fn test_build_queue_manager_default() {
        let built = SessionLaneQueue::build_queue_manager(&SessionQueueConfig::default()).await;
        let (_, metrics) = built.unwrap();
        assert!(metrics.is_none());
    }

    #[tokio::test]
    async fn test_build_queue_manager_with_metrics() {
        let cfg = SessionQueueConfig {
            enable_metrics: true,
            ..Default::default()
        };
        let (_, metrics) = SessionLaneQueue::build_queue_manager(&cfg).await.unwrap();
        assert!(metrics.is_some());
    }

    #[tokio::test]
    async fn test_build_queue_manager_with_dlq() {
        let cfg = SessionQueueConfig {
            enable_dlq: true,
            dlq_max_size: Some(500),
            ..Default::default()
        };
        assert!(SessionLaneQueue::build_queue_manager(&cfg).await.is_ok());
    }
}
901            ..Default::default()
902        };
903        assert!(SessionLaneQueue::build_queue_manager(&cfg).await.is_ok());
904    }
905}