Skip to main content

a3s_code_core/
session_lane_queue.rs

1//! Session Lane Queue - a3s-lane backed command queue
2//!
3//! Provides per-session command queues with lane-based priority scheduling,
4//! backed by a3s-lane directly (no intermediate wrapper).
5//!
6//! ## Features
7//!
8//! - Per-session QueueManager with independent queue, metrics, DLQ
9//! - Preserves Internal/External/Hybrid task handler modes
10//! - Dead letter queue for failed commands
11//! - Metrics collection and alerts
12//! - Retry policies and rate limiting
13
14use crate::agent::AgentEvent;
15use crate::queue::SessionLane;
16use crate::queue::{
17    ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionCommand, SessionQueueConfig,
18    TaskHandlerMode,
19};
20use a3s_lane::{
21    AlertManager, Command as LaneCommand, DeadLetter, EventEmitter, LaneConfig, LaneError,
22    LocalStorage, MetricsSnapshot, PriorityBoostConfig, QueueManager, QueueManagerBuilder,
23    QueueMetrics, RateLimitConfig, Result as LaneResult, RetryPolicy,
24};
25use anyhow::Result;
26use async_trait::async_trait;
27use serde_json::Value;
28use std::collections::HashMap;
29use std::sync::Arc;
30use std::time::{Duration, Instant};
31use tokio::sync::{broadcast, oneshot, RwLock};
32
33// ============================================================================
34// SessionLane → a3s-lane mapping
35// ============================================================================
36
37impl SessionLane {
38    /// Get a3s-lane lane ID string
39    fn lane_id(self) -> &'static str {
40        match self {
41            SessionLane::Control => "control",
42            SessionLane::Query => "query",
43            SessionLane::Execute => "skill",
44            SessionLane::Generate => "prompt",
45        }
46    }
47
48    /// Get priority value (lower = higher priority)
49    fn lane_priority(self) -> u8 {
50        match self {
51            SessionLane::Control => 1,
52            SessionLane::Query => 2,
53            SessionLane::Execute => 4,
54            SessionLane::Generate => 5,
55        }
56    }
57}
58
59// ============================================================================
60// Pending External Task
61// ============================================================================
62
63/// A pending external task waiting for completion
64struct PendingExternalTask {
65    task: ExternalTask,
66    result_tx: oneshot::Sender<Result<Value>>,
67}
68
69// ============================================================================
70// Session Command Adapter
71// ============================================================================
72
73/// Adapter that wraps SessionCommand for a3s-lane execution
74pub struct SessionCommandAdapter {
75    inner: Box<dyn SessionCommand>,
76    task_id: String,
77    handler_mode: TaskHandlerMode,
78    session_id: String,
79    lane: SessionLane,
80    timeout_ms: u64,
81    external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
82    event_tx: broadcast::Sender<AgentEvent>,
83}
84
85impl SessionCommandAdapter {
86    #[allow(clippy::too_many_arguments)]
87    fn new(
88        inner: Box<dyn SessionCommand>,
89        task_id: String,
90        handler_mode: TaskHandlerMode,
91        session_id: String,
92        lane: SessionLane,
93        timeout_ms: u64,
94        external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
95        event_tx: broadcast::Sender<AgentEvent>,
96    ) -> Self {
97        Self {
98            inner,
99            task_id,
100            handler_mode,
101            session_id,
102            lane,
103            timeout_ms,
104            external_tasks,
105            event_tx,
106        }
107    }
108
109    /// Register as external task and wait for completion
110    async fn register_and_wait(&self) -> LaneResult<Value> {
111        let (tx, rx) = oneshot::channel();
112        let task = ExternalTask {
113            task_id: self.task_id.clone(),
114            session_id: self.session_id.clone(),
115            lane: self.lane,
116            command_type: self.inner.command_type().to_string(),
117            payload: self.inner.payload(),
118            timeout_ms: self.timeout_ms,
119            created_at: Some(Instant::now()),
120        };
121        {
122            let mut tasks = self.external_tasks.write().await;
123            tasks.insert(
124                self.task_id.clone(),
125                PendingExternalTask {
126                    task: task.clone(),
127                    result_tx: tx,
128                },
129            );
130        }
131        let _ = self.event_tx.send(AgentEvent::ExternalTaskPending {
132            task_id: task.task_id.clone(),
133            session_id: task.session_id.clone(),
134            lane: task.lane,
135            command_type: task.command_type.clone(),
136            payload: task.payload.clone(),
137            timeout_ms: task.timeout_ms,
138        });
139        match tokio::time::timeout(Duration::from_millis(self.timeout_ms), rx).await {
140            Ok(Ok(result)) => result.map_err(|e| LaneError::CommandError(e.to_string())),
141            Ok(Err(_)) => Err(LaneError::CommandError("Channel closed".to_string())),
142            Err(_) => {
143                let mut tasks = self.external_tasks.write().await;
144                tasks.remove(&self.task_id);
145                Err(LaneError::Timeout(Duration::from_millis(self.timeout_ms)))
146            }
147        }
148    }
149
150    /// Execute internally with external notification (Hybrid mode)
151    async fn execute_with_notification(&self) -> LaneResult<Value> {
152        let task = ExternalTask {
153            task_id: self.task_id.clone(),
154            session_id: self.session_id.clone(),
155            lane: self.lane,
156            command_type: self.inner.command_type().to_string(),
157            payload: self.inner.payload(),
158            timeout_ms: self.timeout_ms,
159            created_at: Some(Instant::now()),
160        };
161        let _ = self.event_tx.send(AgentEvent::ExternalTaskPending {
162            task_id: task.task_id.clone(),
163            session_id: task.session_id.clone(),
164            lane: task.lane,
165            command_type: task.command_type.clone(),
166            payload: task.payload.clone(),
167            timeout_ms: task.timeout_ms,
168        });
169        let result = self
170            .inner
171            .execute()
172            .await
173            .map_err(|e| LaneError::CommandError(e.to_string()));
174        let _ = self.event_tx.send(AgentEvent::ExternalTaskCompleted {
175            task_id: self.task_id.clone(),
176            session_id: self.session_id.clone(),
177            success: result.is_ok(),
178        });
179        result
180    }
181}
182
183#[async_trait]
184impl LaneCommand for SessionCommandAdapter {
185    async fn execute(&self) -> LaneResult<Value> {
186        match self.handler_mode {
187            TaskHandlerMode::Internal => self
188                .inner
189                .execute()
190                .await
191                .map_err(|e| LaneError::CommandError(e.to_string())),
192            TaskHandlerMode::External => self.register_and_wait().await,
193            TaskHandlerMode::Hybrid => self.execute_with_notification().await,
194        }
195    }
196    fn command_type(&self) -> &str {
197        self.inner.command_type()
198    }
199}
200
201// ============================================================================
202// Event Bridge
203// ============================================================================
204
205/// Bridge that translates a3s-lane events to AgentEvent
206pub struct EventBridge {
207    event_tx: broadcast::Sender<AgentEvent>,
208}
209
210impl EventBridge {
211    pub fn new(event_tx: broadcast::Sender<AgentEvent>) -> Self {
212        Self { event_tx }
213    }
214
215    pub fn emit_dead_letter(&self, dead_letter: &DeadLetter) {
216        let _ = self.event_tx.send(AgentEvent::CommandDeadLettered {
217            command_id: dead_letter.command_id.clone(),
218            command_type: dead_letter.command_type.clone(),
219            lane: dead_letter.lane_id.clone(),
220            error: dead_letter.error.clone(),
221            attempts: dead_letter.attempts,
222        });
223    }
224
225    pub fn emit_retry(
226        &self,
227        command_id: &str,
228        command_type: &str,
229        lane: &str,
230        attempt: u32,
231        delay_ms: u64,
232    ) {
233        let _ = self.event_tx.send(AgentEvent::CommandRetry {
234            command_id: command_id.to_string(),
235            command_type: command_type.to_string(),
236            lane: lane.to_string(),
237            attempt,
238            delay_ms,
239        });
240    }
241
242    pub fn emit_alert(&self, level: &str, alert_type: &str, message: &str) {
243        let _ = self.event_tx.send(AgentEvent::QueueAlert {
244            level: level.to_string(),
245            alert_type: alert_type.to_string(),
246            message: message.to_string(),
247        });
248    }
249}
250
251// ============================================================================
252// Session Lane Queue
253// ============================================================================
254
255/// Per-session command queue backed by a3s-lane with external task handling
256pub struct SessionLaneQueue {
257    session_id: String,
258    manager: Arc<QueueManager>,
259    metrics: Option<QueueMetrics>,
260    external_tasks: Arc<RwLock<HashMap<String, PendingExternalTask>>>,
261    lane_handlers: Arc<RwLock<HashMap<SessionLane, LaneHandlerConfig>>>,
262    event_tx: broadcast::Sender<AgentEvent>,
263    event_bridge: Arc<EventBridge>,
264    task_id_counter: Arc<std::sync::atomic::AtomicU64>,  // Fast task ID generation
265}
266
267impl SessionLaneQueue {
268    /// Create a new session lane queue
269    pub async fn new(
270        session_id: &str,
271        config: SessionQueueConfig,
272        event_tx: broadcast::Sender<AgentEvent>,
273    ) -> Result<Self> {
274        let (manager, metrics) = Self::build_queue_manager(&config).await?;
275        let mut lane_handlers = HashMap::new();
276        for lane in [
277            SessionLane::Control,
278            SessionLane::Query,
279            SessionLane::Execute,
280            SessionLane::Generate,
281        ] {
282            lane_handlers.insert(lane, config.handler_config(lane));
283        }
284        let event_bridge = Arc::new(EventBridge::new(event_tx.clone()));
285        Ok(Self {
286            session_id: session_id.to_string(),
287            manager: Arc::new(manager),
288            metrics,
289            external_tasks: Arc::new(RwLock::new(HashMap::new())),
290            lane_handlers: Arc::new(RwLock::new(lane_handlers)),
291            event_tx,
292            event_bridge,
293            task_id_counter: Arc::new(std::sync::atomic::AtomicU64::new(1)),
294        })
295    }
296
297    /// Build a3s-lane QueueManager directly from SessionQueueConfig
298    async fn build_queue_manager(
299        config: &SessionQueueConfig,
300    ) -> Result<(QueueManager, Option<QueueMetrics>)> {
301        let emitter = EventEmitter::new(100);
302        let mut builder = QueueManagerBuilder::new(emitter);
303        let default_timeout = config.default_timeout_ms.map(Duration::from_millis);
304        let default_retry = Some(RetryPolicy::exponential(3));
305
306        for lane in [
307            SessionLane::Control,
308            SessionLane::Query,
309            SessionLane::Execute,
310            SessionLane::Generate,
311        ] {
312            // Apply user-configured concurrency limits
313            let max_concurrency = match lane {
314                SessionLane::Control => config.control_max_concurrency,
315                SessionLane::Query => config.query_max_concurrency,
316                SessionLane::Execute => config.execute_max_concurrency,
317                SessionLane::Generate => config.generate_max_concurrency,
318            };
319
320            // Create LaneConfig with user-specified max_concurrency
321            let mut cfg = LaneConfig::new(1, max_concurrency);
322
323            if let Some(timeout) = default_timeout {
324                cfg = cfg.with_timeout(timeout);
325            }
326            if let Some(ref retry) = default_retry {
327                cfg = cfg.with_retry_policy(retry.clone());
328            }
329            if lane == SessionLane::Generate {
330                cfg = cfg.with_rate_limit(RateLimitConfig::per_minute(60));
331                cfg = cfg
332                    .with_priority_boost(PriorityBoostConfig::standard(Duration::from_secs(300)));
333            }
334            builder = builder.with_lane(lane.lane_id(), cfg, lane.lane_priority());
335        }
336
337        if config.enable_dlq {
338            builder = builder.with_dlq(config.dlq_max_size.unwrap_or(1000));
339        }
340
341        let metrics = if config.enable_metrics {
342            let m = QueueMetrics::local();
343            builder = builder.with_metrics(m.clone());
344            Some(m)
345        } else {
346            None
347        };
348
349        if config.enable_alerts {
350            builder = builder.with_alerts(Arc::new(AlertManager::with_queue_depth_alerts(50, 100)));
351        }
352
353        if let Some(ref storage_path) = config.storage_path {
354            builder = builder.with_storage(Arc::new(
355                LocalStorage::new(storage_path.to_path_buf()).await?,
356            ));
357        }
358
359        let manager = builder.build().await?;
360        Ok((manager, metrics))
361    }
362
363    pub async fn start(&self) -> Result<()> {
364        self.manager.start().await.map_err(|e| anyhow::anyhow!("Lane manager start failed: {}", e))
365    }
366    pub async fn stop(&self) {
367        self.manager.shutdown().await;
368    }
369
370    pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
371        self.lane_handlers.write().await.insert(lane, config);
372    }
373
374    pub async fn get_lane_handler(&self, lane: SessionLane) -> LaneHandlerConfig {
375        self.lane_handlers
376            .read()
377            .await
378            .get(&lane)
379            .cloned()
380            .unwrap_or_default()
381    }
382
383    /// Submit a command to a specific lane
384    pub async fn submit(
385        &self,
386        lane: SessionLane,
387        command: Box<dyn SessionCommand>,
388    ) -> oneshot::Receiver<Result<Value>> {
389        let (result_tx, result_rx) = oneshot::channel();
390        let handler_config = self.get_lane_handler(lane).await;
391
392        // Fast task ID generation using atomic counter instead of UUID
393        let task_id = format!(
394            "{}-{}",
395            self.session_id,
396            self.task_id_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
397        );
398
399        let adapter = SessionCommandAdapter::new(
400            command,
401            task_id,
402            handler_config.mode,
403            self.session_id.clone(),
404            lane,
405            handler_config.timeout_ms,
406            Arc::clone(&self.external_tasks),
407            self.event_tx.clone(),
408        );
409        match self.manager.submit(lane.lane_id(), Box::new(adapter)).await {
410            Ok(lane_rx) => {
411                tokio::spawn(async move {
412                    match lane_rx.await {
413                        Ok(Ok(value)) => {
414                            let _ = result_tx.send(Ok(value));
415                        }
416                        Ok(Err(e)) => {
417                            let _ = result_tx.send(Err(anyhow::anyhow!("{}", e)));
418                        }
419                        Err(_) => {
420                            let _ = result_tx.send(Err(anyhow::anyhow!("Channel closed")));
421                        }
422                    }
423                });
424            }
425            Err(e) => {
426                let _ = result_tx.send(Err(e.into()));
427            }
428        }
429        result_rx
430    }
431
432    pub async fn submit_by_tool(
433        &self,
434        tool_name: &str,
435        command: Box<dyn SessionCommand>,
436    ) -> oneshot::Receiver<Result<Value>> {
437        self.submit(SessionLane::from_tool_name(tool_name), command)
438            .await
439    }
440
441    /// Submit multiple commands to the same lane in batch (optimized)
442    ///
443    /// This is more efficient than calling submit() multiple times because:
444    /// - Handler config is fetched only once
445    /// - Task IDs are generated in batch
446    /// - Reduces lock contention
447    pub async fn submit_batch(
448        &self,
449        lane: SessionLane,
450        commands: Vec<Box<dyn SessionCommand>>,
451    ) -> Vec<oneshot::Receiver<Result<Value>>> {
452        if commands.is_empty() {
453            return Vec::new();
454        }
455
456        // Fetch handler config once for all commands
457        let handler_config = self.get_lane_handler(lane).await;
458
459        let mut receivers = Vec::with_capacity(commands.len());
460
461        for command in commands {
462            let (result_tx, result_rx) = oneshot::channel();
463
464            // Fast task ID generation using atomic counter
465            let task_id = format!(
466                "{}-{}",
467                self.session_id,
468                self.task_id_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
469            );
470
471            let adapter = SessionCommandAdapter::new(
472                command,
473                task_id,
474                handler_config.mode,
475                self.session_id.clone(),
476                lane,
477                handler_config.timeout_ms,
478                Arc::clone(&self.external_tasks),
479                self.event_tx.clone(),
480            );
481
482            match self.manager.submit(lane.lane_id(), Box::new(adapter)).await {
483                Ok(lane_rx) => {
484                    tokio::spawn(async move {
485                        match lane_rx.await {
486                            Ok(Ok(value)) => {
487                                let _ = result_tx.send(Ok(value));
488                            }
489                            Ok(Err(e)) => {
490                                let _ = result_tx.send(Err(anyhow::anyhow!("{}", e)));
491                            }
492                            Err(_) => {
493                                let _ = result_tx.send(Err(anyhow::anyhow!("Channel closed")));
494                            }
495                        }
496                    });
497                }
498                Err(e) => {
499                    let _ = result_tx.send(Err(e.into()));
500                }
501            }
502
503            receivers.push(result_rx);
504        }
505
506        receivers
507    }
508
509    /// Submit multiple commands by tool name in batch (optimized)
510    pub async fn submit_batch_by_tool(
511        &self,
512        tool_name: &str,
513        commands: Vec<Box<dyn SessionCommand>>,
514    ) -> Vec<oneshot::Receiver<Result<Value>>> {
515        self.submit_batch(SessionLane::from_tool_name(tool_name), commands)
516            .await
517    }
518
519    pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
520        let pending = { self.external_tasks.write().await.remove(task_id) };
521        if let Some(pending) = pending {
522            let _ = self.event_tx.send(AgentEvent::ExternalTaskCompleted {
523                task_id: task_id.to_string(),
524                session_id: self.session_id.clone(),
525                success: result.success,
526            });
527            let final_result = if result.success {
528                Ok(result.result)
529            } else {
530                Err(anyhow::anyhow!(result
531                    .error
532                    .unwrap_or_else(|| "External task failed".to_string())))
533            };
534            let _ = pending.result_tx.send(final_result);
535            true
536        } else {
537            false
538        }
539    }
540
541    pub async fn stats(&self) -> crate::queue::SessionQueueStats {
542        let lane_stats = self.manager.stats().await.ok();
543        let external_tasks = self.external_tasks.read().await;
544        let mut total_pending = 0;
545        let mut total_active = 0;
546        let mut lanes = HashMap::new();
547        if let Some(stats) = lane_stats {
548            for (lane_id, lane_stat) in stats.lanes {
549                total_pending += lane_stat.pending;
550                total_active += lane_stat.active;
551                let session_lane = match lane_id.as_str() {
552                    "control" => SessionLane::Control,
553                    "query" => SessionLane::Query,
554                    "skill" => SessionLane::Execute,
555                    "prompt" => SessionLane::Generate,
556                    _ => continue,
557                };
558                let handler_mode = self.get_lane_handler(session_lane).await.mode;
559                lanes.insert(
560                    format!("{:?}", session_lane),
561                    crate::queue::LaneStatus {
562                        lane: session_lane,
563                        pending: lane_stat.pending,
564                        active: lane_stat.active,
565                        max_concurrency: lane_stat.max,
566                        handler_mode,
567                    },
568                );
569            }
570        }
571        crate::queue::SessionQueueStats {
572            total_pending,
573            total_active,
574            external_pending: external_tasks.len(),
575            lanes,
576        }
577    }
578
579    pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
580        self.external_tasks
581            .read()
582            .await
583            .values()
584            .map(|p| p.task.clone())
585            .collect()
586    }
587
588    pub fn session_id(&self) -> &str {
589        &self.session_id
590    }
591
592    /// Access the event bridge for emitting queue lifecycle events
593    pub fn event_bridge(&self) -> &EventBridge {
594        &self.event_bridge
595    }
596
597    pub async fn dead_letters(&self) -> Vec<DeadLetter> {
598        if let Some(dlq) = self.manager.queue().dlq() {
599            dlq.list().await
600        } else {
601            Vec::new()
602        }
603    }
604
605    pub async fn metrics_snapshot(&self) -> Option<MetricsSnapshot> {
606        if let Some(ref m) = self.metrics {
607            Some(m.snapshot().await)
608        } else {
609            None
610        }
611    }
612
613    pub async fn drain(&self, timeout: Duration) -> Result<()> {
614        Ok(self.manager.drain(timeout).await?)
615    }
616    pub fn is_shutting_down(&self) -> bool {
617        self.manager.is_shutting_down()
618    }
619}
620
621#[cfg(test)]
622mod tests {
623    use super::*;
624    use crate::queue::SessionCommand;
625
626    struct TestCommand {
627        value: Value,
628    }
629
630    #[async_trait]
631    impl SessionCommand for TestCommand {
632        async fn execute(&self) -> Result<Value> {
633            Ok(self.value.clone())
634        }
635        fn command_type(&self) -> &str {
636            "test"
637        }
638        fn payload(&self) -> Value {
639            self.value.clone()
640        }
641    }
642
643    #[tokio::test]
644    async fn test_session_lane_queue_creation() {
645        let (tx, _) = broadcast::channel(100);
646        let q = SessionLaneQueue::new("test-session", SessionQueueConfig::default(), tx)
647            .await
648            .unwrap();
649        assert_eq!(q.session_id(), "test-session");
650    }
651
652    #[tokio::test]
653    async fn test_submit_and_execute() {
654        let (tx, _) = broadcast::channel(100);
655        let q = SessionLaneQueue::new("s", SessionQueueConfig::default(), tx)
656            .await
657            .unwrap();
658        q.start().await.unwrap();
659        let cmd = Box::new(TestCommand {
660            value: serde_json::json!({"result": "success"}),
661        });
662        let rx = q.submit(SessionLane::Query, cmd).await;
663        let result = tokio::time::timeout(Duration::from_secs(2), rx)
664            .await
665            .unwrap()
666            .unwrap();
667        assert_eq!(result.unwrap()["result"], "success");
668        q.stop().await;
669    }
670
671    #[tokio::test]
672    async fn test_stats() {
673        let (tx, _) = broadcast::channel(100);
674        let q = SessionLaneQueue::new("s", SessionQueueConfig::default(), tx)
675            .await
676            .unwrap();
677        q.start().await.unwrap();
678        let stats = q.stats().await;
679        assert_eq!(stats.total_pending, 0);
680        assert_eq!(stats.total_active, 0);
681        assert_eq!(stats.external_pending, 0);
682        q.stop().await;
683    }
684
685    #[tokio::test]
686    async fn test_lane_handler_config() {
687        let (tx, _) = broadcast::channel(100);
688        let q = SessionLaneQueue::new("s", SessionQueueConfig::default(), tx)
689            .await
690            .unwrap();
691        assert_eq!(
692            q.get_lane_handler(SessionLane::Execute).await.mode,
693            TaskHandlerMode::Internal
694        );
695        q.set_lane_handler(
696            SessionLane::Execute,
697            LaneHandlerConfig {
698                mode: TaskHandlerMode::External,
699                timeout_ms: 30000,
700            },
701        )
702        .await;
703        let h = q.get_lane_handler(SessionLane::Execute).await;
704        assert_eq!(h.mode, TaskHandlerMode::External);
705        assert_eq!(h.timeout_ms, 30000);
706    }
707
708    #[tokio::test]
709    async fn test_submit_by_tool() {
710        let (tx, _) = broadcast::channel(100);
711        let q = SessionLaneQueue::new("s", SessionQueueConfig::default(), tx)
712            .await
713            .unwrap();
714        q.start().await.unwrap();
715        let cmd = Box::new(TestCommand {
716            value: serde_json::json!({"tool": "read"}),
717        });
718        let rx = q.submit_by_tool("read", cmd).await;
719        let result = tokio::time::timeout(Duration::from_secs(2), rx)
720            .await
721            .unwrap()
722            .unwrap();
723        assert_eq!(result.unwrap()["tool"], "read");
724        q.stop().await;
725    }
726
727    #[tokio::test]
728    async fn test_dead_letters_empty() {
729        let (tx, _) = broadcast::channel(100);
730        let q = SessionLaneQueue::new("s", SessionQueueConfig::default(), tx)
731            .await
732            .unwrap();
733        assert!(q.dead_letters().await.is_empty());
734    }
735
736    #[tokio::test]
737    async fn test_metrics_snapshot() {
738        let (tx, _) = broadcast::channel(100);
739        let cfg = SessionQueueConfig {
740            enable_metrics: true,
741            ..Default::default()
742        };
743        let q = SessionLaneQueue::new("s", cfg, tx).await.unwrap();
744        q.start().await.unwrap();
745        assert!(q.metrics_snapshot().await.is_some());
746        q.stop().await;
747    }
748
749    #[tokio::test]
750    async fn test_is_shutting_down() {
751        let (tx, _) = broadcast::channel(100);
752        let q = SessionLaneQueue::new("s", SessionQueueConfig::default(), tx)
753            .await
754            .unwrap();
755        assert!(!q.is_shutting_down());
756        q.stop().await;
757        assert!(q.is_shutting_down());
758    }
759
760    #[tokio::test]
761    async fn test_pending_external_tasks_empty() {
762        let (tx, _) = broadcast::channel(100);
763        let q = SessionLaneQueue::new("s", SessionQueueConfig::default(), tx)
764            .await
765            .unwrap();
766        assert!(q.pending_external_tasks().await.is_empty());
767    }
768
769    #[tokio::test]
770    async fn test_complete_external_task_nonexistent() {
771        let (tx, _) = broadcast::channel(100);
772        let q = SessionLaneQueue::new("s", SessionQueueConfig::default(), tx)
773            .await
774            .unwrap();
775        let r = ExternalTaskResult {
776            success: true,
777            result: serde_json::json!("ok"),
778            error: None,
779        };
780        assert!(!q.complete_external_task("nope", r).await);
781    }
782
783    #[test]
784    fn test_command_payload() {
785        let cmd = TestCommand {
786            value: serde_json::json!({"k": "v"}),
787        };
788        assert_eq!(cmd.payload(), serde_json::json!({"k": "v"}));
789        assert_eq!(cmd.command_type(), "test");
790    }
791
792    #[test]
793    fn test_event_bridge_new() {
794        let (tx, _) = broadcast::channel(100);
795        let _b = EventBridge::new(tx);
796        // EventBridge constructed successfully
797    }
798
799    #[test]
800    fn test_event_bridge_emit_dead_letter() {
801        let (tx, mut rx) = broadcast::channel(100);
802        let b = EventBridge::new(tx);
803        b.emit_dead_letter(&DeadLetter {
804            command_id: "c1".to_string(),
805            command_type: "t".to_string(),
806            lane_id: "control".to_string(),
807            error: "err".to_string(),
808            attempts: 3,
809            failed_at: chrono::Utc::now(),
810        });
811        match rx.try_recv().unwrap() {
812            AgentEvent::CommandDeadLettered {
813                command_id,
814                attempts,
815                ..
816            } => {
817                assert_eq!(command_id, "c1");
818                assert_eq!(attempts, 3);
819            }
820            _ => panic!("wrong event"),
821        }
822    }
823
824    #[test]
825    fn test_event_bridge_emit_retry() {
826        let (tx, mut rx) = broadcast::channel(100);
827        let b = EventBridge::new(tx);
828        b.emit_retry("c1", "t", "query", 2, 1000);
829        match rx.try_recv().unwrap() {
830            AgentEvent::CommandRetry {
831                attempt, delay_ms, ..
832            } => {
833                assert_eq!(attempt, 2);
834                assert_eq!(delay_ms, 1000);
835            }
836            _ => panic!("wrong event"),
837        }
838    }
839
840    #[test]
841    fn test_event_bridge_emit_alert() {
842        let (tx, mut rx) = broadcast::channel(100);
843        let b = EventBridge::new(tx);
844        b.emit_alert("warning", "queue_full", "at capacity");
845        match rx.try_recv().unwrap() {
846            AgentEvent::QueueAlert { level, .. } => assert_eq!(level, "warning"),
847            _ => panic!("wrong event"),
848        }
849    }
850
851    #[test]
852    fn test_lane_mapping() {
853        assert_eq!(SessionLane::Control.lane_id(), "control");
854        assert_eq!(SessionLane::Query.lane_id(), "query");
855        assert_eq!(SessionLane::Execute.lane_id(), "skill");
856        assert_eq!(SessionLane::Generate.lane_id(), "prompt");
857    }
858
859    #[test]
860    fn test_lane_priority() {
861        assert!(SessionLane::Control.lane_priority() < SessionLane::Query.lane_priority());
862        assert!(SessionLane::Query.lane_priority() < SessionLane::Execute.lane_priority());
863        assert!(SessionLane::Execute.lane_priority() < SessionLane::Generate.lane_priority());
864    }
865
866    // Note: lane_config() method was removed, config is now handled by SessionQueueConfig
867
868    #[tokio::test]
869    async fn test_build_queue_manager_default() {
870        let (_, metrics) = SessionLaneQueue::build_queue_manager(&SessionQueueConfig::default())
871            .await
872            .unwrap();
873        assert!(metrics.is_none());
874    }
875
876    #[tokio::test]
877    async fn test_build_queue_manager_with_metrics() {
878        let cfg = SessionQueueConfig {
879            enable_metrics: true,
880            ..Default::default()
881        };
882        let (_, metrics) = SessionLaneQueue::build_queue_manager(&cfg).await.unwrap();
883        assert!(metrics.is_some());
884    }
885
886    #[tokio::test]
887    async fn test_build_queue_manager_with_dlq() {
888        let cfg = SessionQueueConfig {
889            enable_dlq: true,
890            dlq_max_size: Some(500),
891            ..Default::default()
892        };
893        assert!(SessionLaneQueue::build_queue_manager(&cfg).await.is_ok());
894    }
895}