Skip to main content

a3s_lane/
queue.rs

1//! Core queue implementation with lanes and priority scheduling
2
3#[cfg(feature = "distributed")]
4use crate::boost::PriorityBooster;
5use crate::config::LaneConfig;
6use crate::dlq::{DeadLetter, DeadLetterQueue};
7use crate::error::{LaneError, Result};
8use crate::event::{events, EventEmitter, EventStream, LaneEvent};
9#[cfg(feature = "distributed")]
10use crate::ratelimit::RateLimiter;
11use crate::retry::RetryPolicy;
12use crate::storage::{Storage, StoredCommand};
13#[cfg(feature = "telemetry")]
14use crate::telemetry;
15use async_trait::async_trait;
16use chrono::Utc;
17use serde::{Deserialize, Serialize};
18use std::collections::{HashMap, VecDeque};
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21#[cfg(feature = "telemetry")]
22use std::time::Instant;
23use tokio::sync::{Mutex, Semaphore};
24use uuid::Uuid;
25
/// Lane identifier; lanes are registered with the queue under this key.
pub type LaneId = String;

/// Command identifier, generated for each command when it is enqueued.
pub type CommandId = String;

/// Lane priority (lower number = higher priority)
pub type Priority = u8;
34
35/// Lane priorities
36pub mod priorities {
37    use super::Priority;
38
39    pub const SYSTEM: Priority = 0;
40    pub const CONTROL: Priority = 1;
41    pub const QUERY: Priority = 2;
42    pub const SESSION: Priority = 3;
43    pub const SKILL: Priority = 4;
44    pub const PROMPT: Priority = 5;
45}
46
/// Command to be executed
///
/// Implementors must be `Send + Sync`: the queue keeps each command behind an
/// `Arc<dyn Command>` and runs it on a spawned task.
#[async_trait]
pub trait Command: Send + Sync {
    /// Execute the command
    ///
    /// Returns the command's JSON result on success, or an error describing
    /// why execution failed.
    async fn execute(&self) -> Result<serde_json::Value>;

    /// Get command type (for logging/debugging)
    fn command_type(&self) -> &str;
}
56
57/// A simple JSON-based command for SDK usage.
58///
59/// Returns the payload as-is when executed. Useful for language bindings
60/// (Python, Node.js) where commands are represented as JSON data.
61pub struct JsonCommand {
62    command_type: String,
63    payload: serde_json::Value,
64}
65
66impl JsonCommand {
67    /// Create a new JSON command.
68    pub fn new(command_type: impl Into<String>, payload: serde_json::Value) -> Self {
69        Self {
70            command_type: command_type.into(),
71            payload,
72        }
73    }
74}
75
76#[async_trait]
77impl Command for JsonCommand {
78    async fn execute(&self) -> Result<serde_json::Value> {
79        Ok(self.payload.clone())
80    }
81
82    fn command_type(&self) -> &str {
83        &self.command_type
84    }
85}
86
/// Command wrapper
///
/// Bundles a queued command with the per-submission bookkeeping the lane and
/// the scheduler need to run it, retry it, and deliver its result.
struct CommandWrapper {
    /// Unique id generated when the command is enqueued.
    id: CommandId,
    /// The command to execute.
    command: Arc<dyn Command>,
    /// Sender for the final result; wrapped in `Option` so it can be taken
    /// out before sending.
    result_tx: Option<tokio::sync::oneshot::Sender<Result<serde_json::Value>>>,
    /// Execution time limit, copied from the lane's `default_timeout`
    /// (`None` = no limit applied).
    timeout: Option<std::time::Duration>,
    /// Retry policy, copied from the lane configuration at enqueue time.
    retry_policy: RetryPolicy,
    /// Retry counter: 0 on first submission, incremented each time the
    /// command is re-enqueued for retry.
    attempt: u32,
    /// Id of the lane the command was enqueued on.
    lane_id: LaneId,
    /// Cached `Command::command_type` value, used in events and dead letters.
    command_type: String,
    /// Submission time used by the priority booster to calculate deadline proximity
    #[cfg(feature = "distributed")]
    enqueue_time: chrono::DateTime<chrono::Utc>,
}
101
/// Lane state
///
/// The mutable part of a lane, kept behind the lane's mutex.
#[allow(dead_code)]
struct LaneState {
    /// Lane configuration
    config: LaneConfig,

    /// Priority
    priority: Priority,

    /// Pending commands (FIFO queue)
    pending: VecDeque<CommandWrapper>,

    /// Active command count
    ///
    /// Incremented when a command is dequeued for execution and decremented
    /// (saturating at zero) when it is marked completed.
    active: usize,

    /// Semaphore for concurrency control
    ///
    /// Created with `max_concurrency` permits.
    /// NOTE(review): nothing in the code visible here acquires a permit;
    /// concurrency is enforced through `active` — confirm whether this field
    /// is still needed.
    semaphore: Arc<Semaphore>,

    /// True when the lane is currently considered under pressure
    is_pressured: bool,
}
123
124impl LaneState {
125    fn new(config: LaneConfig, priority: Priority) -> Self {
126        let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
127        Self {
128            config,
129            priority,
130            pending: VecDeque::new(),
131            active: 0,
132            semaphore,
133            is_pressured: false,
134        }
135    }
136
137    fn has_capacity(&self) -> bool {
138        self.active < self.config.max_concurrency
139    }
140
141    fn has_pending(&self) -> bool {
142        !self.pending.is_empty()
143    }
144}
145
/// Lane
///
/// A named FIFO queue of commands with its own priority and concurrency
/// limit. The mutable state lives behind an async mutex so the lane can be
/// shared through `Arc<Lane>`.
pub struct Lane {
    /// Identifier of this lane.
    id: LaneId,
    /// Mutable lane state (backlog, active count, pressure flag).
    state: Arc<Mutex<LaneState>>,
    /// Optional storage backend: enqueued commands are saved to it and
    /// removed again once they finish.
    storage: Option<Arc<dyn Storage>>,
    /// Rate limiter instantiated from LaneConfig.rate_limit (None = unlimited)
    #[cfg(feature = "distributed")]
    rate_limiter: RateLimiter,
    /// Priority booster instantiated from LaneConfig.priority_boost
    #[cfg(feature = "distributed")]
    booster: Option<PriorityBooster>,
}
158
159impl Lane {
160    /// Create a new lane
161    pub fn new(id: impl Into<String>, config: LaneConfig, priority: Priority) -> Self {
162        #[cfg(feature = "distributed")]
163        let rate_limiter = config
164            .rate_limit
165            .as_ref()
166            .map(RateLimiter::token_bucket)
167            .unwrap_or_default();
168        #[cfg(feature = "distributed")]
169        let booster = config
170            .priority_boost
171            .as_ref()
172            .map(|pb| PriorityBooster::new(pb.clone()));
173        Self {
174            id: id.into(),
175            state: Arc::new(Mutex::new(LaneState::new(config, priority))),
176            storage: None,
177            #[cfg(feature = "distributed")]
178            rate_limiter,
179            #[cfg(feature = "distributed")]
180            booster,
181        }
182    }
183
184    /// Create a new lane with storage
185    pub fn with_storage(
186        id: impl Into<String>,
187        config: LaneConfig,
188        priority: Priority,
189        storage: Arc<dyn Storage>,
190    ) -> Self {
191        #[cfg(feature = "distributed")]
192        let rate_limiter = config
193            .rate_limit
194            .as_ref()
195            .map(RateLimiter::token_bucket)
196            .unwrap_or_default();
197        #[cfg(feature = "distributed")]
198        let booster = config
199            .priority_boost
200            .as_ref()
201            .map(|pb| PriorityBooster::new(pb.clone()));
202        Self {
203            id: id.into(),
204            state: Arc::new(Mutex::new(LaneState::new(config, priority))),
205            storage: Some(storage),
206            #[cfg(feature = "distributed")]
207            rate_limiter,
208            #[cfg(feature = "distributed")]
209            booster,
210        }
211    }
212
213    /// Get lane ID
214    pub fn id(&self) -> &str {
215        &self.id
216    }
217
218    /// Get lane priority
219    pub async fn priority(&self) -> Priority {
220        self.state.lock().await.priority
221    }
222
223    /// Get effective priority, applying boost based on the front command's age
224    pub async fn effective_priority(&self) -> Priority {
225        let state = self.state.lock().await;
226        let base = state.priority;
227        #[cfg(feature = "distributed")]
228        if let Some(booster) = &self.booster {
229            if let Some(front) = state.pending.front() {
230                return booster.calculate_priority(base, front.enqueue_time);
231            }
232        }
233        base
234    }
235
236    /// Enqueue a command
237    pub async fn enqueue(
238        &self,
239        command: Box<dyn Command>,
240    ) -> tokio::sync::oneshot::Receiver<Result<serde_json::Value>> {
241        let (tx, rx) = tokio::sync::oneshot::channel();
242        let state = self.state.lock().await;
243        let timeout = state.config.default_timeout;
244        let retry_policy = state.config.retry_policy.clone();
245        drop(state);
246
247        let command_id = Uuid::new_v4().to_string();
248        let command_type = command.command_type().to_string();
249        let wrapper = CommandWrapper {
250            id: command_id.clone(),
251            command: Arc::from(command),
252            result_tx: Some(tx),
253            timeout,
254            retry_policy,
255            attempt: 0,
256            lane_id: self.id.clone(),
257            command_type: command_type.clone(),
258            #[cfg(feature = "distributed")]
259            enqueue_time: Utc::now(),
260        };
261
262        // Persist to storage if available
263        if let Some(storage) = &self.storage {
264            let stored_cmd = StoredCommand {
265                id: command_id,
266                command_type,
267                lane_id: self.id.clone(),
268                payload: serde_json::json!({}), // Empty payload for now
269                retry_count: 0,
270                created_at: Utc::now(),
271                last_attempt_at: None,
272            };
273            // Ignore storage errors to not block command execution
274            let _ = storage.save_command(stored_cmd).await;
275        }
276
277        let mut state = self.state.lock().await;
278        state.pending.push_back(wrapper);
279
280        rx
281    }
282
283    /// Re-enqueue a command for retry (internal use)
284    async fn retry_command(&self, mut wrapper: CommandWrapper, delay: std::time::Duration) {
285        wrapper.attempt += 1;
286
287        // Spawn a task to re-enqueue after delay
288        let state_clone = Arc::clone(&self.state);
289        tokio::spawn(async move {
290            tokio::time::sleep(delay).await;
291            let mut state = state_clone.lock().await;
292            state.pending.push_back(wrapper);
293        });
294    }
295
296    /// Try to dequeue a command for execution
297    async fn try_dequeue(&self) -> Option<CommandWrapper> {
298        // Check rate limiter before acquiring the state lock
299        #[cfg(feature = "distributed")]
300        if !self.rate_limiter.try_acquire().await {
301            return None;
302        }
303        let mut state = self.state.lock().await;
304        if state.has_capacity() && state.has_pending() {
305            state.active += 1;
306            state.pending.pop_front()
307        } else {
308            None
309        }
310    }
311
312    /// Mark a command as completed
313    async fn mark_completed(&self) {
314        let mut state = self.state.lock().await;
315        state.active = state.active.saturating_sub(1);
316    }
317
318    /// Get lane status
319    pub async fn status(&self) -> LaneStatus {
320        let state = self.state.lock().await;
321        LaneStatus {
322            pending: state.pending.len(),
323            active: state.active,
324            min: state.config.min_concurrency,
325            max: state.config.max_concurrency,
326        }
327    }
328
329    /// Check for pressure state transitions.
330    ///
331    /// Returns the event key to emit if a transition occurred, or `None` if no change.
332    /// - Transitions to pressured when `pending >= threshold` and was not already pressured.
333    /// - Transitions to idle when `pending == 0` and was previously pressured.
334    async fn check_pressure(&self) -> Option<&'static str> {
335        let mut state = self.state.lock().await;
336        let threshold = match state.config.pressure_threshold {
337            Some(t) => t,
338            None => return None,
339        };
340        let pending = state.pending.len();
341        let was_pressured = state.is_pressured;
342        if pending >= threshold && !was_pressured {
343            state.is_pressured = true;
344            Some(events::QUEUE_LANE_PRESSURE)
345        } else if pending == 0 && was_pressured {
346            state.is_pressured = false;
347            Some(events::QUEUE_LANE_IDLE)
348        } else {
349            None
350        }
351    }
352}
353
/// Lane status
///
/// Point-in-time snapshot of a lane; serializable so it can be reported
/// outside the process.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaneStatus {
    /// Number of commands waiting in the lane's backlog.
    pub pending: usize,
    /// Number of commands currently counted as executing.
    pub active: usize,
    /// Configured minimum concurrency of the lane.
    pub min: usize,
    /// Configured maximum concurrency of the lane.
    pub max: usize,
}
362
/// Command queue
///
/// Owns the registered lanes and dispatches their commands in priority order.
#[allow(dead_code)]
pub struct CommandQueue {
    /// Registered lanes, keyed by lane id.
    lanes: Arc<Mutex<HashMap<LaneId, Arc<Lane>>>>,
    /// Emitter used to publish queue lifecycle events.
    event_emitter: EventEmitter,
    /// Optional dead letter queue for commands that fail without a retry left.
    dlq: Option<DeadLetterQueue>,
    /// Optional storage backend, exposed through `storage()`.
    storage: Option<Arc<dyn Storage>>,
    /// Set once shutdown starts; `submit` rejects new commands while it is set.
    is_shutting_down: Arc<AtomicBool>,
    /// NOTE(review): not used by any code visible here — presumably meant to
    /// wake tasks waiting for shutdown; confirm.
    shutdown_notify: Arc<tokio::sync::Notify>,
}
373
374impl CommandQueue {
375    /// Create a new command queue
376    pub fn new(event_emitter: EventEmitter) -> Self {
377        Self {
378            lanes: Arc::new(Mutex::new(HashMap::new())),
379            event_emitter,
380            dlq: None,
381            storage: None,
382            is_shutting_down: Arc::new(AtomicBool::new(false)),
383            shutdown_notify: Arc::new(tokio::sync::Notify::new()),
384        }
385    }
386
387    /// Create a new command queue with a dead letter queue
388    pub fn with_dlq(event_emitter: EventEmitter, dlq_size: usize) -> Self {
389        Self {
390            lanes: Arc::new(Mutex::new(HashMap::new())),
391            event_emitter,
392            dlq: Some(DeadLetterQueue::new(dlq_size)),
393            storage: None,
394            is_shutting_down: Arc::new(AtomicBool::new(false)),
395            shutdown_notify: Arc::new(tokio::sync::Notify::new()),
396        }
397    }
398
399    /// Create a new command queue with storage
400    pub fn with_storage(event_emitter: EventEmitter, storage: Arc<dyn Storage>) -> Self {
401        Self {
402            lanes: Arc::new(Mutex::new(HashMap::new())),
403            event_emitter,
404            dlq: None,
405            storage: Some(storage),
406            is_shutting_down: Arc::new(AtomicBool::new(false)),
407            shutdown_notify: Arc::new(tokio::sync::Notify::new()),
408        }
409    }
410
411    /// Create a new command queue with both DLQ and storage
412    pub fn with_dlq_and_storage(
413        event_emitter: EventEmitter,
414        dlq_size: usize,
415        storage: Arc<dyn Storage>,
416    ) -> Self {
417        Self {
418            lanes: Arc::new(Mutex::new(HashMap::new())),
419            event_emitter,
420            dlq: Some(DeadLetterQueue::new(dlq_size)),
421            storage: Some(storage),
422            is_shutting_down: Arc::new(AtomicBool::new(false)),
423            shutdown_notify: Arc::new(tokio::sync::Notify::new()),
424        }
425    }
426
427    /// Get the storage backend
428    pub fn storage(&self) -> Option<&Arc<dyn Storage>> {
429        self.storage.as_ref()
430    }
431
432    /// Get the dead letter queue
433    pub fn dlq(&self) -> Option<&DeadLetterQueue> {
434        self.dlq.as_ref()
435    }
436
437    /// Check if shutdown is in progress
438    pub fn is_shutting_down(&self) -> bool {
439        self.is_shutting_down.load(Ordering::SeqCst)
440    }
441
442    /// Initiate graceful shutdown - stop accepting new commands
443    pub async fn shutdown(&self) {
444        self.is_shutting_down.store(true, Ordering::SeqCst);
445        self.event_emitter
446            .emit(LaneEvent::empty(events::QUEUE_SHUTDOWN_STARTED));
447    }
448
449    /// Wait for all pending commands to complete (with timeout)
450    pub async fn drain(&self, timeout: std::time::Duration) -> Result<()> {
451        let start = std::time::Instant::now();
452
453        loop {
454            // Check if all lanes are empty and idle
455            let lanes = self.lanes.lock().await;
456            let mut all_idle = true;
457
458            for lane in lanes.values() {
459                let status = lane.status().await;
460                if status.pending > 0 || status.active > 0 {
461                    all_idle = false;
462                    break;
463                }
464            }
465            drop(lanes);
466
467            if all_idle {
468                return Ok(());
469            }
470
471            // Check timeout
472            if start.elapsed() >= timeout {
473                return Err(LaneError::Timeout(timeout));
474            }
475
476            // Wait a bit before checking again
477            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
478        }
479    }
480
481    /// Register a lane
482    pub async fn register_lane(&self, lane: Arc<Lane>) {
483        let mut lanes = self.lanes.lock().await;
484        lanes.insert(lane.id().to_string(), lane);
485    }
486
487    /// Submit a command to a lane
488    pub async fn submit(
489        &self,
490        lane_id: &str,
491        command: Box<dyn Command>,
492    ) -> Result<tokio::sync::oneshot::Receiver<Result<serde_json::Value>>> {
493        // Reject new commands during shutdown
494        if self.is_shutting_down() {
495            return Err(LaneError::ShutdownInProgress);
496        }
497
498        let lanes = self.lanes.lock().await;
499        let lane = lanes
500            .get(lane_id)
501            .ok_or_else(|| LaneError::LaneNotFound(lane_id.to_string()))?;
502        let rx = lane.enqueue(command).await;
503
504        self.event_emitter.emit(LaneEvent::with_map(
505            events::QUEUE_COMMAND_SUBMITTED,
506            HashMap::from([("lane_id".to_string(), serde_json::json!(lane_id))]),
507        ));
508
509        Ok(rx)
510    }
511
512    /// Start the scheduler
513    pub async fn start_scheduler(self: Arc<Self>) {
514        tokio::spawn(async move {
515            loop {
516                self.schedule_next().await;
517                tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
518            }
519        });
520    }
521
522    /// Schedule the next command
523    async fn schedule_next(&self) {
524        // Find the highest-priority lane with pending commands.
525        // effective_priority applies any deadline-based boost configured on the lane.
526        let lanes = self.lanes.lock().await;
527
528        // Check pressure transitions for all lanes and emit events
529        for (lane_id, lane) in lanes.iter() {
530            if let Some(event_key) = lane.check_pressure().await {
531                self.event_emitter.emit(LaneEvent::with_map(
532                    event_key,
533                    HashMap::from([("lane_id".to_string(), serde_json::json!(lane_id))]),
534                ));
535            }
536        }
537
538        let mut lane_priorities = Vec::new();
539        for lane in lanes.values() {
540            let priority = lane.effective_priority().await;
541            lane_priorities.push((priority, Arc::clone(lane)));
542        }
543
544        // Sort by priority (lower number = higher priority)
545        lane_priorities.sort_by_key(|(priority, _)| *priority);
546
547        for (_, lane) in lane_priorities {
548            if let Some(mut wrapper) = lane.try_dequeue().await {
549                let lane_clone = Arc::clone(&lane);
550                let timeout = wrapper.timeout;
551                let retry_policy = wrapper.retry_policy.clone();
552                let attempt = wrapper.attempt;
553                let dlq = self.dlq.clone();
554                let command_id = wrapper.id.clone();
555                let command_type = wrapper.command_type.clone();
556                let lane_id = wrapper.lane_id.clone();
557                let storage = lane.storage.clone();
558                let event_emitter = self.event_emitter.clone();
559
560                event_emitter.emit(LaneEvent::with_map(
561                    events::QUEUE_COMMAND_STARTED,
562                    HashMap::from([
563                        ("lane_id".to_string(), serde_json::json!(lane_id)),
564                        ("command_id".to_string(), serde_json::json!(command_id)),
565                        ("command_type".to_string(), serde_json::json!(command_type)),
566                    ]),
567                ));
568
569                tokio::spawn(async move {
570                    #[cfg(feature = "telemetry")]
571                    let exec_start = Instant::now();
572
573                    let result = match timeout {
574                        Some(dur) => {
575                            match tokio::time::timeout(dur, wrapper.command.execute()).await {
576                                Ok(r) => r,
577                                Err(_) => Err(LaneError::Timeout(dur)),
578                            }
579                        }
580                        None => wrapper.command.execute().await,
581                    };
582
583                    match result {
584                        Ok(value) => {
585                            if let Some(storage) = &storage {
586                                let _ = storage.remove_command(&command_id).await;
587                            }
588
589                            #[cfg(feature = "telemetry")]
590                            telemetry::record_complete(
591                                &lane_id,
592                                exec_start.elapsed().as_secs_f64(),
593                            );
594
595                            event_emitter.emit(LaneEvent::with_map(
596                                events::QUEUE_COMMAND_COMPLETED,
597                                HashMap::from([
598                                    ("lane_id".to_string(), serde_json::json!(lane_id)),
599                                    ("command_id".to_string(), serde_json::json!(command_id)),
600                                ]),
601                            ));
602
603                            if let Some(tx) = wrapper.result_tx.take() {
604                                let _ = tx.send(Ok(value));
605                            }
606                            lane_clone.mark_completed().await;
607                        }
608                        Err(err) => {
609                            if retry_policy.should_retry(attempt) {
610                                let delay = retry_policy.delay_for_attempt(attempt + 1);
611
612                                tracing::info!(
613                                    command_id = %command_id,
614                                    retry_attempt = attempt + 1,
615                                    "a3s.lane.retry: retrying command"
616                                );
617
618                                event_emitter.emit(LaneEvent::with_map(
619                                    events::QUEUE_COMMAND_RETRY,
620                                    HashMap::from([
621                                        ("lane_id".to_string(), serde_json::json!(lane_id)),
622                                        ("command_id".to_string(), serde_json::json!(command_id)),
623                                        ("attempt".to_string(), serde_json::json!(attempt + 1)),
624                                    ]),
625                                ));
626
627                                lane_clone.retry_command(wrapper, delay).await;
628                                lane_clone.mark_completed().await;
629                            } else {
630                                #[cfg(feature = "telemetry")]
631                                telemetry::record_failure(&lane_id);
632
633                                if let Some(storage) = &storage {
634                                    let _ = storage.remove_command(&command_id).await;
635                                }
636
637                                let error_msg = err.to_string();
638                                let is_timeout = matches!(err, LaneError::Timeout(_));
639
640                                if let Some(dlq) = dlq {
641                                    let dead_letter = DeadLetter {
642                                        command_id: command_id.clone(),
643                                        command_type: command_type.clone(),
644                                        lane_id: lane_id.clone(),
645                                        error: error_msg.clone(),
646                                        attempts: attempt + 1,
647                                        failed_at: Utc::now(),
648                                    };
649                                    dlq.push(dead_letter).await;
650
651                                    event_emitter.emit(LaneEvent::with_map(
652                                        events::QUEUE_COMMAND_DEAD_LETTERED,
653                                        HashMap::from([
654                                            ("lane_id".to_string(), serde_json::json!(lane_id)),
655                                            (
656                                                "command_id".to_string(),
657                                                serde_json::json!(command_id),
658                                            ),
659                                            (
660                                                "command_type".to_string(),
661                                                serde_json::json!(command_type),
662                                            ),
663                                        ]),
664                                    ));
665                                }
666
667                                event_emitter.emit(LaneEvent::with_map(
668                                    if is_timeout {
669                                        events::QUEUE_COMMAND_TIMEOUT
670                                    } else {
671                                        events::QUEUE_COMMAND_FAILED
672                                    },
673                                    HashMap::from([
674                                        ("lane_id".to_string(), serde_json::json!(lane_id)),
675                                        ("command_id".to_string(), serde_json::json!(command_id)),
676                                        ("error".to_string(), serde_json::json!(error_msg)),
677                                    ]),
678                                ));
679
680                                if let Some(tx) = wrapper.result_tx.take() {
681                                    let _ = tx.send(Err(err));
682                                }
683                                lane_clone.mark_completed().await;
684                            }
685                        }
686                    }
687                });
688                break;
689            }
690        }
691    }
692
693    /// Subscribe to all queue lifecycle events as an `EventStream` (implements `Stream`)
694    pub fn subscribe_stream(&self) -> EventStream {
695        self.event_emitter.subscribe_stream()
696    }
697
698    /// Subscribe to filtered queue lifecycle events as an `EventStream`
699    pub fn subscribe_filtered(
700        &self,
701        filter: impl Fn(&LaneEvent) -> bool + Send + Sync + 'static,
702    ) -> EventStream {
703        self.event_emitter.subscribe_filtered(filter)
704    }
705
706    /// Get queue status for all lanes
707    pub async fn status(&self) -> HashMap<LaneId, LaneStatus> {
708        let lanes = self.lanes.lock().await;
709        let mut status = HashMap::new();
710
711        for (id, lane) in lanes.iter() {
712            status.insert(id.clone(), lane.status().await);
713        }
714
715        status
716    }
717}
718
/// Built-in lane IDs
///
/// String identifiers for the predefined lanes; the names mirror the
/// constants in `priorities`.
pub mod lane_ids {
    /// Id of the system lane.
    pub const SYSTEM: &str = "system";
    /// Id of the control lane.
    pub const CONTROL: &str = "control";
    /// Id of the query lane.
    pub const QUERY: &str = "query";
    /// Id of the session lane.
    pub const SESSION: &str = "session";
    /// Id of the skill lane.
    pub const SKILL: &str = "skill";
    /// Id of the prompt lane.
    pub const PROMPT: &str = "prompt";
}
728
729#[cfg(test)]
730mod tests {
731    use super::*;
732
733    /// Test command implementation
734    struct TestCommand {
735        result: serde_json::Value,
736        delay_ms: Option<u64>,
737    }
738
739    impl TestCommand {
740        fn new(result: serde_json::Value) -> Self {
741            Self {
742                result,
743                delay_ms: None,
744            }
745        }
746
747        fn with_delay(result: serde_json::Value, delay_ms: u64) -> Self {
748            Self {
749                result,
750                delay_ms: Some(delay_ms),
751            }
752        }
753    }
754
755    #[async_trait]
756    impl Command for TestCommand {
757        async fn execute(&self) -> Result<serde_json::Value> {
758            if let Some(delay) = self.delay_ms {
759                tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
760            }
761            Ok(self.result.clone())
762        }
763
764        fn command_type(&self) -> &str {
765            "test"
766        }
767    }
768
769    /// Failing test command
770    struct FailingCommand {
771        message: String,
772    }
773
774    #[async_trait]
775    impl Command for FailingCommand {
776        async fn execute(&self) -> Result<serde_json::Value> {
777            Err(LaneError::Other(self.message.clone()))
778        }
779
780        fn command_type(&self) -> &str {
781            "failing"
782        }
783    }
784
785    #[test]
786    fn test_priorities() {
787        assert_eq!(priorities::SYSTEM, 0);
788        assert_eq!(priorities::CONTROL, 1);
789        assert_eq!(priorities::QUERY, 2);
790        assert_eq!(priorities::SESSION, 3);
791        assert_eq!(priorities::SKILL, 4);
792        assert_eq!(priorities::PROMPT, 5);
793
794        // Verify priority ordering: system has highest priority (lowest number)
795        // Using const block to satisfy clippy assertions_on_constants
796        const _: () = {
797            assert!(priorities::SYSTEM < priorities::CONTROL);
798            assert!(priorities::CONTROL < priorities::QUERY);
799            assert!(priorities::QUERY < priorities::SESSION);
800            assert!(priorities::SESSION < priorities::SKILL);
801            assert!(priorities::SKILL < priorities::PROMPT);
802        };
803    }
804
805    #[test]
806    fn test_lane_ids() {
807        assert_eq!(lane_ids::SYSTEM, "system");
808        assert_eq!(lane_ids::CONTROL, "control");
809        assert_eq!(lane_ids::QUERY, "query");
810        assert_eq!(lane_ids::SESSION, "session");
811        assert_eq!(lane_ids::SKILL, "skill");
812        assert_eq!(lane_ids::PROMPT, "prompt");
813    }
814
815    #[test]
816    fn test_lane_new() {
817        let config = LaneConfig::new(1, 4);
818        let lane = Lane::new("test-lane", config, priorities::QUERY);
819
820        assert_eq!(lane.id(), "test-lane");
821    }
822
823    #[tokio::test]
824    async fn test_lane_priority() {
825        let config = LaneConfig::new(1, 4);
826        let lane = Lane::new("test", config, priorities::SESSION);
827
828        assert_eq!(lane.priority().await, priorities::SESSION);
829    }
830
831    #[tokio::test]
832    async fn test_lane_status_initial() {
833        let config = LaneConfig::new(2, 8);
834        let lane = Lane::new("test", config, priorities::QUERY);
835
836        let status = lane.status().await;
837        assert_eq!(status.pending, 0);
838        assert_eq!(status.active, 0);
839        assert_eq!(status.min, 2);
840        assert_eq!(status.max, 8);
841    }
842
843    #[tokio::test]
844    async fn test_lane_enqueue() {
845        let config = LaneConfig::new(1, 4);
846        let lane = Lane::new("test", config, priorities::QUERY);
847
848        let cmd = Box::new(TestCommand::new(serde_json::json!({"result": "ok"})));
849        let _rx = lane.enqueue(cmd).await;
850
851        let status = lane.status().await;
852        assert_eq!(status.pending, 1);
853    }
854
855    #[tokio::test]
856    async fn test_lane_status_serialization() {
857        let status = LaneStatus {
858            pending: 5,
859            active: 2,
860            min: 1,
861            max: 8,
862        };
863
864        let json = serde_json::to_string(&status).unwrap();
865        assert!(json.contains("\"pending\":5"));
866        assert!(json.contains("\"active\":2"));
867        assert!(json.contains("\"min\":1"));
868        assert!(json.contains("\"max\":8"));
869
870        let parsed: LaneStatus = serde_json::from_str(&json).unwrap();
871        assert_eq!(parsed.pending, 5);
872        assert_eq!(parsed.active, 2);
873    }
874
875    #[tokio::test]
876    async fn test_command_queue_new() {
877        let emitter = EventEmitter::new(100);
878        let queue = CommandQueue::new(emitter);
879
880        let status = queue.status().await;
881        assert!(status.is_empty());
882    }
883
884    #[tokio::test]
885    async fn test_command_queue_register_lane() {
886        let emitter = EventEmitter::new(100);
887        let queue = CommandQueue::new(emitter);
888
889        let config = LaneConfig::new(1, 4);
890        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
891
892        queue.register_lane(lane).await;
893
894        let status = queue.status().await;
895        assert!(status.contains_key("test-lane"));
896    }
897
898    #[tokio::test]
899    async fn test_command_queue_submit() {
900        let emitter = EventEmitter::new(100);
901        let queue = CommandQueue::new(emitter);
902
903        let config = LaneConfig::new(1, 4);
904        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
905        queue.register_lane(lane).await;
906
907        let cmd = Box::new(TestCommand::new(serde_json::json!({"status": "ok"})));
908        let result = queue.submit("test-lane", cmd).await;
909
910        assert!(result.is_ok());
911    }
912
913    #[tokio::test]
914    async fn test_command_queue_submit_unknown_lane() {
915        let emitter = EventEmitter::new(100);
916        let queue = CommandQueue::new(emitter);
917
918        let cmd = Box::new(TestCommand::new(serde_json::json!({})));
919        let result = queue.submit("nonexistent", cmd).await;
920
921        assert!(result.is_err());
922        if let Err(LaneError::LaneNotFound(id)) = result {
923            assert_eq!(id, "nonexistent");
924        } else {
925            panic!("Expected LaneNotFound error");
926        }
927    }
928
929    #[tokio::test]
930    async fn test_command_queue_multiple_lanes() {
931        let emitter = EventEmitter::new(100);
932        let queue = CommandQueue::new(emitter);
933
934        // Register multiple lanes
935        let configs = vec![
936            ("system", priorities::SYSTEM, 1),
937            ("control", priorities::CONTROL, 8),
938            ("query", priorities::QUERY, 4),
939        ];
940
941        for (id, priority, max) in configs {
942            let config = LaneConfig::new(1, max);
943            let lane = Arc::new(Lane::new(id, config, priority));
944            queue.register_lane(lane).await;
945        }
946
947        let status = queue.status().await;
948        assert_eq!(status.len(), 3);
949        assert!(status.contains_key("system"));
950        assert!(status.contains_key("control"));
951        assert!(status.contains_key("query"));
952    }
953
954    #[tokio::test]
955    async fn test_command_queue_status() {
956        let emitter = EventEmitter::new(100);
957        let queue = CommandQueue::new(emitter);
958
959        let config = LaneConfig::new(2, 16);
960        let lane = Arc::new(Lane::new("query", config, priorities::QUERY));
961        queue.register_lane(lane).await;
962
963        let status = queue.status().await;
964        let lane_status = status.get("query").unwrap();
965
966        assert_eq!(lane_status.min, 2);
967        assert_eq!(lane_status.max, 16);
968        assert_eq!(lane_status.pending, 0);
969        assert_eq!(lane_status.active, 0);
970    }
971
/// With a config of `LaneConfig::new(1, 2)`, `has_capacity` stays true until
/// `active` reaches 2.
#[test]
fn test_lane_state_has_capacity() {
    let mut state = LaneState::new(LaneConfig::new(1, 2), priorities::QUERY);

    // Freshly constructed state has room.
    assert!(state.has_capacity());

    // (active count to set, whether capacity should remain)
    for (active, expected) in [(1, true), (2, false)] {
        state.active = active;
        assert_eq!(state.has_capacity(), expected);
    }
}
983
984    #[tokio::test]
985    async fn test_lane_state_has_pending() {
986        let config = LaneConfig::new(1, 4);
987        let state = LaneState::new(config, priorities::QUERY);
988
989        assert!(!state.has_pending());
990        assert_eq!(state.pending.len(), 0);
991    }
992
/// The `Debug` rendering of `LaneStatus` names the type and its counters.
#[test]
fn test_lane_status_debug() {
    let rendered = format!(
        "{:?}",
        LaneStatus {
            pending: 3,
            active: 1,
            min: 1,
            max: 4,
        }
    );

    for needle in ["LaneStatus", "pending", "active"] {
        assert!(rendered.contains(needle));
    }
}
1007
/// Cloning a `LaneStatus` copies every field.
#[test]
fn test_lane_status_clone() {
    let original = LaneStatus {
        pending: 5,
        active: 2,
        min: 1,
        max: 8,
    };

    // Destructure the clone so each field is checked by name.
    let LaneStatus {
        pending,
        active,
        min,
        max,
    } = original.clone();

    assert_eq!(pending, 5);
    assert_eq!(active, 2);
    assert_eq!(min, 1);
    assert_eq!(max, 8);
}
1023
1024    #[tokio::test]
1025    async fn test_command_execution() {
1026        let cmd = TestCommand::new(serde_json::json!({"value": 42}));
1027        let result = cmd.execute().await;
1028
1029        assert!(result.is_ok());
1030        assert_eq!(result.unwrap(), serde_json::json!({"value": 42}));
1031    }
1032
1033    #[tokio::test]
1034    async fn test_command_type() {
1035        let cmd = TestCommand::new(serde_json::json!({}));
1036        assert_eq!(cmd.command_type(), "test");
1037
1038        let failing = FailingCommand {
1039            message: "error".to_string(),
1040        };
1041        assert_eq!(failing.command_type(), "failing");
1042    }
1043
1044    #[tokio::test]
1045    async fn test_failing_command() {
1046        let cmd = FailingCommand {
1047            message: "Something went wrong".to_string(),
1048        };
1049
1050        let result = cmd.execute().await;
1051        assert!(result.is_err());
1052
1053        if let Err(LaneError::Other(msg)) = result {
1054            assert_eq!(msg, "Something went wrong");
1055        } else {
1056            panic!("Expected Other error");
1057        }
1058    }
1059
1060    #[tokio::test]
1061    async fn test_command_with_delay() {
1062        let cmd = TestCommand::with_delay(serde_json::json!({"delayed": true}), 10);
1063
1064        let start = std::time::Instant::now();
1065        let result = cmd.execute().await;
1066        let elapsed = start.elapsed();
1067
1068        assert!(result.is_ok());
1069        assert!(elapsed.as_millis() >= 10);
1070    }
1071
/// `Priority` values compare equal to plain `u8` values.
#[test]
fn test_priority_type() {
    let level: Priority = 5;

    assert_eq!(5u8, level);
}
1077
/// `LaneId` can be built from an owned string and compared with a string slice.
#[test]
fn test_lane_id_type() {
    let lane_id: LaneId = String::from("test-lane");

    assert_eq!(lane_id, "test-lane");
}
1083
/// `CommandId` can be built from an owned string and compared with a string slice.
#[test]
fn test_command_id_type() {
    let command_id: CommandId = String::from("cmd-123");

    assert_eq!(command_id, "cmd-123");
}
1089
1090    #[tokio::test]
1091    async fn test_command_timeout() {
1092        let emitter = EventEmitter::new(100);
1093        let queue = Arc::new(CommandQueue::new(emitter));
1094
1095        let config = LaneConfig::new(1, 4).with_timeout(std::time::Duration::from_millis(50));
1096        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1097        queue.register_lane(lane).await;
1098
1099        // Start scheduler
1100        Arc::clone(&queue).start_scheduler().await;
1101
1102        // Submit a command that takes longer than timeout
1103        let cmd = Box::new(TestCommand::with_delay(
1104            serde_json::json!({"result": "ok"}),
1105            200,
1106        ));
1107        let rx = queue.submit("test-lane", cmd).await.unwrap();
1108
1109        // Wait for result
1110        let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
1111            .await
1112            .expect("Timeout waiting for result")
1113            .expect("Channel closed");
1114
1115        // Should be a timeout error
1116        assert!(result.is_err());
1117        if let Err(LaneError::Timeout(dur)) = result {
1118            assert_eq!(dur, std::time::Duration::from_millis(50));
1119        } else {
1120            panic!("Expected Timeout error");
1121        }
1122    }
1123
1124    #[tokio::test]
1125    async fn test_command_no_timeout() {
1126        let emitter = EventEmitter::new(100);
1127        let queue = Arc::new(CommandQueue::new(emitter));
1128
1129        let config = LaneConfig::new(1, 4);
1130        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1131        queue.register_lane(lane).await;
1132
1133        Arc::clone(&queue).start_scheduler().await;
1134
1135        // Submit a command with delay but no timeout configured
1136        let cmd = Box::new(TestCommand::with_delay(
1137            serde_json::json!({"result": "ok"}),
1138            50,
1139        ));
1140        let rx = queue.submit("test-lane", cmd).await.unwrap();
1141
1142        let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
1143            .await
1144            .expect("Timeout waiting for result")
1145            .expect("Channel closed");
1146
1147        // Should succeed
1148        assert!(result.is_ok());
1149        assert_eq!(result.unwrap()["result"], "ok");
1150    }
1151
1152    #[tokio::test]
1153    async fn test_command_completes_before_timeout() {
1154        let emitter = EventEmitter::new(100);
1155        let queue = Arc::new(CommandQueue::new(emitter));
1156
1157        let config = LaneConfig::new(1, 4).with_timeout(std::time::Duration::from_secs(5));
1158        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1159        queue.register_lane(lane).await;
1160
1161        Arc::clone(&queue).start_scheduler().await;
1162
1163        // Submit a fast command with long timeout
1164        let cmd = Box::new(TestCommand::with_delay(
1165            serde_json::json!({"result": "fast"}),
1166            10,
1167        ));
1168        let rx = queue.submit("test-lane", cmd).await.unwrap();
1169
1170        let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
1171            .await
1172            .expect("Timeout waiting for result")
1173            .expect("Channel closed");
1174
1175        // Should succeed
1176        assert!(result.is_ok());
1177        assert_eq!(result.unwrap()["result"], "fast");
1178    }
1179
1180    #[tokio::test]
1181    async fn test_command_retry_on_failure() {
1182        use std::sync::atomic::{AtomicU32, Ordering};
1183
1184        // Command that fails first 2 times, then succeeds
1185        struct RetryableCommand {
1186            attempts: Arc<AtomicU32>,
1187        }
1188
1189        #[async_trait]
1190        impl Command for RetryableCommand {
1191            async fn execute(&self) -> Result<serde_json::Value> {
1192                let attempt = self.attempts.fetch_add(1, Ordering::SeqCst);
1193                if attempt < 2 {
1194                    Err(LaneError::Other(format!("Attempt {} failed", attempt)))
1195                } else {
1196                    Ok(serde_json::json!({"success": true, "attempts": attempt + 1}))
1197                }
1198            }
1199
1200            fn command_type(&self) -> &str {
1201                "retryable"
1202            }
1203        }
1204
1205        let emitter = EventEmitter::new(100);
1206        let queue = Arc::new(CommandQueue::new(emitter));
1207
1208        let retry_policy = RetryPolicy::fixed(3, std::time::Duration::from_millis(10));
1209        let config = LaneConfig::new(1, 4).with_retry_policy(retry_policy);
1210        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1211        queue.register_lane(lane).await;
1212
1213        Arc::clone(&queue).start_scheduler().await;
1214
1215        let attempts = Arc::new(AtomicU32::new(0));
1216        let cmd = Box::new(RetryableCommand {
1217            attempts: Arc::clone(&attempts),
1218        });
1219        let rx = queue.submit("test-lane", cmd).await.unwrap();
1220
1221        // Wait for result (should succeed after retries)
1222        let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
1223            .await
1224            .expect("Timeout waiting for result")
1225            .expect("Channel closed");
1226
1227        assert!(result.is_ok());
1228        let value = result.unwrap();
1229        assert_eq!(value["success"], true);
1230        assert_eq!(value["attempts"], 3); // Failed twice, succeeded on 3rd attempt
1231    }
1232
1233    #[tokio::test]
1234    async fn test_command_retry_exhausted() {
1235        // Command that always fails
1236        struct AlwaysFailCommand;
1237
1238        #[async_trait]
1239        impl Command for AlwaysFailCommand {
1240            async fn execute(&self) -> Result<serde_json::Value> {
1241                Err(LaneError::Other("Always fails".to_string()))
1242            }
1243
1244            fn command_type(&self) -> &str {
1245                "always_fail"
1246            }
1247        }
1248
1249        let emitter = EventEmitter::new(100);
1250        let queue = Arc::new(CommandQueue::new(emitter));
1251
1252        let retry_policy = RetryPolicy::fixed(2, std::time::Duration::from_millis(10));
1253        let config = LaneConfig::new(1, 4).with_retry_policy(retry_policy);
1254        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1255        queue.register_lane(lane).await;
1256
1257        Arc::clone(&queue).start_scheduler().await;
1258
1259        let cmd = Box::new(AlwaysFailCommand);
1260        let rx = queue.submit("test-lane", cmd).await.unwrap();
1261
1262        // Wait for result (should fail after exhausting retries)
1263        let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
1264            .await
1265            .expect("Timeout waiting for result")
1266            .expect("Channel closed");
1267
1268        assert!(result.is_err());
1269        if let Err(LaneError::Other(msg)) = result {
1270            assert_eq!(msg, "Always fails");
1271        } else {
1272            panic!("Expected Other error");
1273        }
1274    }
1275
1276    #[tokio::test]
1277    async fn test_command_no_retry_on_success() {
1278        use std::sync::atomic::{AtomicU32, Ordering};
1279
1280        struct CountingCommand {
1281            counter: Arc<AtomicU32>,
1282        }
1283
1284        #[async_trait]
1285        impl Command for CountingCommand {
1286            async fn execute(&self) -> Result<serde_json::Value> {
1287                let count = self.counter.fetch_add(1, Ordering::SeqCst);
1288                Ok(serde_json::json!({"count": count + 1}))
1289            }
1290
1291            fn command_type(&self) -> &str {
1292                "counting"
1293            }
1294        }
1295
1296        let emitter = EventEmitter::new(100);
1297        let queue = Arc::new(CommandQueue::new(emitter));
1298
1299        let retry_policy = RetryPolicy::exponential(3);
1300        let config = LaneConfig::new(1, 4).with_retry_policy(retry_policy);
1301        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1302        queue.register_lane(lane).await;
1303
1304        Arc::clone(&queue).start_scheduler().await;
1305
1306        let counter = Arc::new(AtomicU32::new(0));
1307        let cmd = Box::new(CountingCommand {
1308            counter: Arc::clone(&counter),
1309        });
1310        let rx = queue.submit("test-lane", cmd).await.unwrap();
1311
1312        let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
1313            .await
1314            .expect("Timeout waiting for result")
1315            .expect("Channel closed");
1316
1317        assert!(result.is_ok());
1318        assert_eq!(result.unwrap()["count"], 1);
1319
1320        // Verify command was only executed once (no retries on success)
1321        assert_eq!(counter.load(Ordering::SeqCst), 1);
1322    }
1323
1324    #[tokio::test]
1325    async fn test_dlq_integration() {
1326        // Command that always fails
1327        struct FailCommand;
1328
1329        #[async_trait]
1330        impl Command for FailCommand {
1331            async fn execute(&self) -> Result<serde_json::Value> {
1332                Err(LaneError::Other("Permanent failure".to_string()))
1333            }
1334
1335            fn command_type(&self) -> &str {
1336                "fail_command"
1337            }
1338        }
1339
1340        let emitter = EventEmitter::new(100);
1341        let queue = Arc::new(CommandQueue::with_dlq(emitter, 100));
1342
1343        let retry_policy = RetryPolicy::fixed(2, std::time::Duration::from_millis(10));
1344        let config = LaneConfig::new(1, 4).with_retry_policy(retry_policy);
1345        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1346        queue.register_lane(lane).await;
1347
1348        Arc::clone(&queue).start_scheduler().await;
1349
1350        // Submit a failing command
1351        let cmd = Box::new(FailCommand);
1352        let rx = queue.submit("test-lane", cmd).await.unwrap();
1353
1354        // Wait for result
1355        let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
1356            .await
1357            .expect("Timeout waiting for result")
1358            .expect("Channel closed");
1359
1360        assert!(result.is_err());
1361
1362        // Check DLQ
1363        let dlq = queue.dlq().expect("DLQ should exist");
1364        tokio::time::sleep(std::time::Duration::from_millis(50)).await; // Give time for DLQ push
1365
1366        assert_eq!(dlq.len().await, 1);
1367
1368        let letters = dlq.list().await;
1369        assert_eq!(letters[0].command_type, "fail_command");
1370        assert_eq!(letters[0].lane_id, "test-lane");
1371        assert_eq!(letters[0].attempts, 3); // Initial + 2 retries
1372        assert!(letters[0].error.contains("Permanent failure"));
1373    }
1374
1375    #[tokio::test]
1376    async fn test_no_dlq_without_configuration() {
1377        let emitter = EventEmitter::new(100);
1378        let queue = Arc::new(CommandQueue::new(emitter));
1379
1380        assert!(queue.dlq().is_none());
1381    }
1382
1383    #[tokio::test]
1384    async fn test_shutdown_rejects_new_commands() {
1385        let emitter = EventEmitter::new(100);
1386        let queue = Arc::new(CommandQueue::new(emitter));
1387
1388        let config = LaneConfig::new(1, 4);
1389        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1390        queue.register_lane(lane).await;
1391
1392        // Initiate shutdown
1393        queue.shutdown().await;
1394        assert!(queue.is_shutting_down());
1395
1396        // Try to submit a command - should be rejected
1397        let cmd = Box::new(TestCommand::new(serde_json::json!({"test": "data"})));
1398        let result = queue.submit("test-lane", cmd).await;
1399
1400        assert!(result.is_err());
1401        if let Err(LaneError::ShutdownInProgress) = result {
1402            // Expected
1403        } else {
1404            panic!("Expected ShutdownInProgress error");
1405        }
1406    }
1407
1408    #[tokio::test]
1409    async fn test_drain_waits_for_completion() {
1410        let emitter = EventEmitter::new(100);
1411        let queue = Arc::new(CommandQueue::new(emitter));
1412
1413        let config = LaneConfig::new(1, 4);
1414        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1415        queue.register_lane(lane).await;
1416
1417        Arc::clone(&queue).start_scheduler().await;
1418
1419        // Submit a slow command
1420        let cmd = Box::new(TestCommand::with_delay(
1421            serde_json::json!({"result": "ok"}),
1422            100,
1423        ));
1424        let rx = queue.submit("test-lane", cmd).await.unwrap();
1425
1426        // Initiate shutdown
1427        queue.shutdown().await;
1428
1429        // Drain should wait for the command to complete
1430        let drain_result = queue.drain(std::time::Duration::from_secs(2)).await;
1431        assert!(drain_result.is_ok());
1432
1433        // Command should have completed
1434        let result = tokio::time::timeout(std::time::Duration::from_millis(100), rx)
1435            .await
1436            .expect("Timeout")
1437            .expect("Channel closed");
1438        assert!(result.is_ok());
1439    }
1440
1441    #[tokio::test]
1442    async fn test_drain_timeout() {
1443        let emitter = EventEmitter::new(100);
1444        let queue = Arc::new(CommandQueue::new(emitter));
1445
1446        let config = LaneConfig::new(1, 4);
1447        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1448        queue.register_lane(lane).await;
1449
1450        Arc::clone(&queue).start_scheduler().await;
1451
1452        // Submit a very slow command
1453        let cmd = Box::new(TestCommand::with_delay(
1454            serde_json::json!({"result": "ok"}),
1455            5000,
1456        ));
1457        let _rx = queue.submit("test-lane", cmd).await.unwrap();
1458
1459        // Initiate shutdown
1460        queue.shutdown().await;
1461
1462        // Drain with short timeout should fail
1463        let drain_result = queue.drain(std::time::Duration::from_millis(50)).await;
1464        assert!(drain_result.is_err());
1465        if let Err(LaneError::Timeout(_)) = drain_result {
1466            // Expected
1467        } else {
1468            panic!("Expected Timeout error");
1469        }
1470    }
1471
1472    #[tokio::test]
1473    async fn test_is_shutting_down() {
1474        let emitter = EventEmitter::new(100);
1475        let queue = Arc::new(CommandQueue::new(emitter));
1476
1477        assert!(!queue.is_shutting_down());
1478
1479        queue.shutdown().await;
1480        assert!(queue.is_shutting_down());
1481    }
1482
1483    // ── Pressure event tests ───────────────────────────────────────────────────
1484
1485    #[tokio::test]
1486    async fn test_lane_pressure_emits_on_threshold() {
1487        use crate::event::events;
1488
1489        let emitter = EventEmitter::new(100);
1490        let queue = Arc::new(CommandQueue::new(emitter.clone()));
1491
1492        let config = LaneConfig::new(1, 4).with_pressure_threshold(2);
1493        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1494        queue.register_lane(lane).await;
1495
1496        // Subscribe before enqueuing to capture all events
1497        let mut stream = emitter.subscribe_filtered(|e| e.key == events::QUEUE_LANE_PRESSURE);
1498
1499        // Enqueue 2 commands (meets threshold=2)
1500        for _ in 0..2 {
1501            let cmd = Box::new(TestCommand::new(serde_json::json!({})));
1502            let _ = queue.submit("test-lane", cmd).await.unwrap();
1503        }
1504
1505        // Start scheduler — first tick calls check_pressure → pending=2 >= 2 → emit PRESSURE
1506        Arc::clone(&queue).start_scheduler().await;
1507
1508        let event = tokio::time::timeout(std::time::Duration::from_secs(1), stream.recv())
1509            .await
1510            .expect("No pressure event received within timeout")
1511            .expect("Stream ended");
1512
1513        assert_eq!(event.key, events::QUEUE_LANE_PRESSURE);
1514    }
1515
1516    #[tokio::test]
1517    async fn test_lane_idle_emits_when_drained() {
1518        use crate::event::events;
1519
1520        let emitter = EventEmitter::new(100);
1521        let queue = Arc::new(CommandQueue::new(emitter.clone()));
1522
1523        let config = LaneConfig::new(1, 4).with_pressure_threshold(1);
1524        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1525        queue.register_lane(lane).await;
1526
1527        // Subscribe to both pressure and idle events
1528        let mut stream = emitter.subscribe_filtered(|e| {
1529            e.key == events::QUEUE_LANE_PRESSURE || e.key == events::QUEUE_LANE_IDLE
1530        });
1531
1532        // Enqueue 1 command (meets threshold=1)
1533        let cmd = Box::new(TestCommand::new(serde_json::json!({})));
1534        let _ = queue.submit("test-lane", cmd).await.unwrap();
1535
1536        // Start scheduler
1537        Arc::clone(&queue).start_scheduler().await;
1538
1539        // First event must be pressure
1540        let pressure = tokio::time::timeout(std::time::Duration::from_secs(1), stream.recv())
1541            .await
1542            .expect("No pressure event")
1543            .expect("Stream ended");
1544        assert_eq!(pressure.key, events::QUEUE_LANE_PRESSURE);
1545
1546        // After dequeue, pending=0 → idle event on the next scheduler tick
1547        let idle = tokio::time::timeout(std::time::Duration::from_secs(1), stream.recv())
1548            .await
1549            .expect("No idle event")
1550            .expect("Stream ended");
1551        assert_eq!(idle.key, events::QUEUE_LANE_IDLE);
1552    }
1553
1554    #[tokio::test]
1555    async fn test_lane_no_pressure_without_threshold() {
1556        use crate::event::events;
1557
1558        let emitter = EventEmitter::new(100);
1559        let queue = Arc::new(CommandQueue::new(emitter.clone()));
1560
1561        // No pressure threshold
1562        let config = LaneConfig::new(1, 4);
1563        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1564        queue.register_lane(lane).await;
1565
1566        let mut stream = emitter.subscribe_filtered(|e| {
1567            e.key == events::QUEUE_LANE_PRESSURE || e.key == events::QUEUE_LANE_IDLE
1568        });
1569
1570        // Enqueue several commands
1571        for _ in 0..5 {
1572            let cmd = Box::new(TestCommand::new(serde_json::json!({})));
1573            let _ = queue.submit("test-lane", cmd).await.unwrap();
1574        }
1575
1576        Arc::clone(&queue).start_scheduler().await;
1577
1578        // Allow time for scheduler ticks to run
1579        tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1580
1581        // No pressure/idle events should have been emitted
1582        let result =
1583            tokio::time::timeout(std::time::Duration::from_millis(50), stream.recv()).await;
1584
1585        assert!(
1586            result.is_err(),
1587            "Should not receive pressure/idle events without threshold"
1588        );
1589    }
1590
1591    // ── Bug-fix tests ──────────────────────────────────────────────────────────
1592
1593    /// Bug fix: EventEmitter.emit() is now called on submit, start, complete, retry,
1594    /// dead-letter, fail, and shutdown.
1595    #[tokio::test]
1596    async fn test_event_emitted_on_submit() {
1597        use crate::event::events;
1598
1599        let emitter = EventEmitter::new(100);
1600        let mut rx = emitter.subscribe();
1601        let queue = CommandQueue::new(emitter);
1602
1603        let config = LaneConfig::new(1, 4);
1604        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1605        queue.register_lane(lane).await;
1606
1607        let cmd = Box::new(TestCommand::new(serde_json::json!({"result": "ok"})));
1608        let _ = queue.submit("test-lane", cmd).await.unwrap();
1609
1610        let event = tokio::time::timeout(std::time::Duration::from_millis(200), async {
1611            rx.recv().await.unwrap()
1612        })
1613        .await
1614        .expect("QUEUE_COMMAND_SUBMITTED event not received");
1615
1616        assert_eq!(event.key, events::QUEUE_COMMAND_SUBMITTED);
1617    }
1618
1619    #[tokio::test]
1620    async fn test_event_emitted_on_complete() {
1621        use crate::event::{events, EventStream};
1622
1623        let emitter = EventEmitter::new(100);
1624        let queue = Arc::new(CommandQueue::new(emitter.clone()));
1625
1626        let config = LaneConfig::new(1, 4);
1627        let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1628        queue.register_lane(lane).await;
1629        Arc::clone(&queue).start_scheduler().await;
1630
1631        let mut stream: EventStream =
1632            emitter.subscribe_filtered(|e| e.key == events::QUEUE_COMMAND_COMPLETED);
1633
1634        let cmd = Box::new(TestCommand::new(serde_json::json!({"result": "ok"})));
1635        let rx = queue.submit("test-lane", cmd).await.unwrap();
1636
1637        // Wait for command to complete
1638        let _ = tokio::time::timeout(std::time::Duration::from_secs(1), rx).await;
1639
1640        let event = tokio::time::timeout(std::time::Duration::from_millis(200), stream.recv())
1641            .await
1642            .expect("QUEUE_COMMAND_COMPLETED event not received")
1643            .expect("stream closed");
1644
1645        assert_eq!(event.key, events::QUEUE_COMMAND_COMPLETED);
1646    }
1647
1648    #[tokio::test]
1649    async fn test_event_emitted_on_shutdown() {
1650        use crate::event::events;
1651
1652        let emitter = EventEmitter::new(100);
1653        let mut rx = emitter.subscribe();
1654        let queue = CommandQueue::new(emitter);
1655
1656        queue.shutdown().await;
1657
1658        let event = tokio::time::timeout(std::time::Duration::from_millis(100), async {
1659            rx.recv().await.unwrap()
1660        })
1661        .await
1662        .expect("QUEUE_SHUTDOWN_STARTED event not received");
1663
1664        assert_eq!(event.key, events::QUEUE_SHUTDOWN_STARTED);
1665    }
1666
/// With a rate limit of one per second, the first dequeue succeeds and an
/// immediate second dequeue is refused even though capacity is available.
#[cfg(feature = "distributed")]
#[tokio::test]
async fn test_rate_limit_blocks_dequeue() {
    use crate::ratelimit::RateLimitConfig;

    // Per the original test's note, the bucket starts full with a single token.
    let limited = LaneConfig::new(1, 10).with_rate_limit(RateLimitConfig::per_second(1));
    let lane = Lane::new("test", limited, priorities::QUERY);

    let first_cmd = Box::new(TestCommand::new(serde_json::json!(1)));
    let _first_rx = lane.enqueue(first_cmd).await;
    let second_cmd = Box::new(TestCommand::new(serde_json::json!(2)));
    let _second_rx = lane.enqueue(second_cmd).await;

    // The only available token is spent here.
    let first = lane.try_dequeue().await;
    assert!(first.is_some(), "first dequeue should succeed");
    // Free the slot again so that capacity cannot be what blocks the next dequeue.
    lane.mark_completed().await;

    // With no token left, the rate limiter must refuse the next dequeue.
    let second = lane.try_dequeue().await;
    assert!(second.is_none(), "second dequeue should be rate-limited");
}
1693
/// A lane with priority boosting configured keeps its base priority while it
/// is empty and right after a command has been enqueued.
#[cfg(feature = "distributed")]
#[tokio::test]
async fn test_effective_priority_no_boost_when_fresh() {
    use crate::boost::PriorityBoostConfig;
    use std::time::Duration;

    // 10 s deadline; per the original test's note the boost kicks in with 9 s
    // remaining, so a brand-new command (about 10 s left) is not boosted yet.
    let boost = PriorityBoostConfig::new(Duration::from_secs(10))
        .with_boost(Duration::from_secs(9), 2);
    let lane = Lane::new(
        "test",
        LaneConfig::new(1, 4).with_priority_boost(boost),
        priorities::QUERY,
    );

    // Empty lane: base priority.
    assert_eq!(lane.effective_priority().await, priorities::QUERY);

    // Freshly enqueued command: still base priority.
    let cmd = Box::new(TestCommand::new(serde_json::json!(1)));
    let _rx = lane.enqueue(cmd).await;
    assert_eq!(lane.effective_priority().await, priorities::QUERY);
}
1716
/// Once a queued command is past its deadline, the lane's effective priority
/// is boosted to 0.
#[cfg(feature = "distributed")]
#[tokio::test]
async fn test_effective_priority_boosted_when_past_deadline() {
    use crate::boost::PriorityBoostConfig;
    use std::time::Duration;

    // A 1 ms deadline is certain to have passed after the sleep below.
    let boost = PriorityBoostConfig::standard(Duration::from_millis(1));
    // Base priority is PROMPT (5), so any boost is clearly visible.
    let lane = Lane::new(
        "test",
        LaneConfig::new(1, 4).with_priority_boost(boost),
        priorities::PROMPT,
    );

    let cmd = Box::new(TestCommand::new(serde_json::json!(1)));
    let _rx = lane.enqueue(cmd).await;

    // Wait until the deadline has elapsed.
    tokio::time::sleep(Duration::from_millis(10)).await;

    assert_eq!(lane.effective_priority().await, 0);
}
1737}