1#[cfg(feature = "distributed")]
4use crate::boost::PriorityBooster;
5use crate::config::LaneConfig;
6use crate::dlq::{DeadLetter, DeadLetterQueue};
7use crate::error::{LaneError, Result};
8use crate::event::{events, EventEmitter, EventStream, LaneEvent};
9#[cfg(feature = "distributed")]
10use crate::ratelimit::RateLimiter;
11use crate::retry::RetryPolicy;
12use crate::storage::{Storage, StoredCommand};
13#[cfg(feature = "telemetry")]
14use crate::telemetry;
15use async_trait::async_trait;
16use chrono::Utc;
17use serde::{Deserialize, Serialize};
18use std::collections::{HashMap, VecDeque};
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21#[cfg(feature = "telemetry")]
22use std::time::Instant;
23use tokio::sync::{Mutex, Semaphore};
24use uuid::Uuid;
25
/// Identifier of a lane (a plain `String`).
pub type LaneId = String;

/// Identifier of a queued command (a plain `String`).
pub type CommandId = String;

/// Lane priority. The scheduler sorts lanes ascending by this value, so
/// lower numbers are served first.
pub type Priority = u8;
34
35pub mod priorities {
37 use super::Priority;
38
39 pub const SYSTEM: Priority = 0;
40 pub const CONTROL: Priority = 1;
41 pub const QUERY: Priority = 2;
42 pub const SESSION: Priority = 3;
43 pub const SKILL: Priority = 4;
44 pub const PROMPT: Priority = 5;
45}
46
/// A unit of work that can be queued on a lane and executed asynchronously.
#[async_trait]
pub trait Command: Send + Sync {
    /// Runs the command, yielding its JSON result or an error.
    async fn execute(&self) -> Result<serde_json::Value>;

    /// Returns a string label naming the kind of command.
    fn command_type(&self) -> &str;
}
56
57pub struct JsonCommand {
62 command_type: String,
63 payload: serde_json::Value,
64}
65
66impl JsonCommand {
67 pub fn new(command_type: impl Into<String>, payload: serde_json::Value) -> Self {
69 Self {
70 command_type: command_type.into(),
71 payload,
72 }
73 }
74}
75
76#[async_trait]
77impl Command for JsonCommand {
78 async fn execute(&self) -> Result<serde_json::Value> {
79 Ok(self.payload.clone())
80 }
81
82 fn command_type(&self) -> &str {
83 &self.command_type
84 }
85}
86
/// Internal bookkeeping for one queued command.
struct CommandWrapper {
    /// Unique id assigned when the command is enqueued.
    id: CommandId,
    /// The command to execute.
    command: Arc<dyn Command>,
    /// Sender for the final result; taken (`Option::take`) when the result is delivered.
    result_tx: Option<tokio::sync::oneshot::Sender<Result<serde_json::Value>>>,
    /// Execution timeout for each attempt; `None` runs the command without a timeout.
    timeout: Option<std::time::Duration>,
    /// Policy consulted after a failed attempt to decide on retry and delay.
    retry_policy: RetryPolicy,
    /// Starts at 0 and is incremented each time the command is re-queued for retry.
    attempt: u32,
    /// Id of the lane that owns this command.
    lane_id: LaneId,
    /// Cached result of `Command::command_type` taken at enqueue time.
    command_type: String,
    /// Enqueue timestamp, passed to the priority booster (only with the `distributed` feature).
    #[cfg(feature = "distributed")]
    enqueue_time: chrono::DateTime<chrono::Utc>,
}
101
102#[allow(dead_code)]
104struct LaneState {
105 config: LaneConfig,
107
108 priority: Priority,
110
111 pending: VecDeque<CommandWrapper>,
113
114 active: usize,
116
117 semaphore: Arc<Semaphore>,
119
120 is_pressured: bool,
122}
123
124impl LaneState {
125 fn new(config: LaneConfig, priority: Priority) -> Self {
126 let semaphore = Arc::new(Semaphore::new(config.max_concurrency));
127 Self {
128 config,
129 priority,
130 pending: VecDeque::new(),
131 active: 0,
132 semaphore,
133 is_pressured: false,
134 }
135 }
136
137 fn has_capacity(&self) -> bool {
138 self.active < self.config.max_concurrency
139 }
140
141 fn has_pending(&self) -> bool {
142 !self.pending.is_empty()
143 }
144}
145
/// A named queue of commands with its own configuration and priority.
pub struct Lane {
    /// Lane identifier, returned by `Lane::id`.
    id: LaneId,
    /// Lane state (config, priority, pending queue, active count) behind an async mutex.
    state: Arc<Mutex<LaneState>>,
    /// Optional storage backend; when set, enqueued commands are also saved to it.
    storage: Option<Arc<dyn Storage>>,
    /// Rate limiter consulted when dequeuing (only with the `distributed` feature).
    #[cfg(feature = "distributed")]
    rate_limiter: RateLimiter,
    /// Optional booster used to compute the effective priority (only with the `distributed` feature).
    #[cfg(feature = "distributed")]
    booster: Option<PriorityBooster>,
}
158
159impl Lane {
160 pub fn new(id: impl Into<String>, config: LaneConfig, priority: Priority) -> Self {
162 #[cfg(feature = "distributed")]
163 let rate_limiter = config
164 .rate_limit
165 .as_ref()
166 .map(RateLimiter::token_bucket)
167 .unwrap_or_default();
168 #[cfg(feature = "distributed")]
169 let booster = config
170 .priority_boost
171 .as_ref()
172 .map(|pb| PriorityBooster::new(pb.clone()));
173 Self {
174 id: id.into(),
175 state: Arc::new(Mutex::new(LaneState::new(config, priority))),
176 storage: None,
177 #[cfg(feature = "distributed")]
178 rate_limiter,
179 #[cfg(feature = "distributed")]
180 booster,
181 }
182 }
183
184 pub fn with_storage(
186 id: impl Into<String>,
187 config: LaneConfig,
188 priority: Priority,
189 storage: Arc<dyn Storage>,
190 ) -> Self {
191 #[cfg(feature = "distributed")]
192 let rate_limiter = config
193 .rate_limit
194 .as_ref()
195 .map(RateLimiter::token_bucket)
196 .unwrap_or_default();
197 #[cfg(feature = "distributed")]
198 let booster = config
199 .priority_boost
200 .as_ref()
201 .map(|pb| PriorityBooster::new(pb.clone()));
202 Self {
203 id: id.into(),
204 state: Arc::new(Mutex::new(LaneState::new(config, priority))),
205 storage: Some(storage),
206 #[cfg(feature = "distributed")]
207 rate_limiter,
208 #[cfg(feature = "distributed")]
209 booster,
210 }
211 }
212
213 pub fn id(&self) -> &str {
215 &self.id
216 }
217
218 pub async fn priority(&self) -> Priority {
220 self.state.lock().await.priority
221 }
222
223 pub async fn effective_priority(&self) -> Priority {
225 let state = self.state.lock().await;
226 let base = state.priority;
227 #[cfg(feature = "distributed")]
228 if let Some(booster) = &self.booster {
229 if let Some(front) = state.pending.front() {
230 return booster.calculate_priority(base, front.enqueue_time);
231 }
232 }
233 base
234 }
235
236 pub async fn enqueue(
238 &self,
239 command: Box<dyn Command>,
240 ) -> tokio::sync::oneshot::Receiver<Result<serde_json::Value>> {
241 let (tx, rx) = tokio::sync::oneshot::channel();
242 let state = self.state.lock().await;
243 let timeout = state.config.default_timeout;
244 let retry_policy = state.config.retry_policy.clone();
245 drop(state);
246
247 let command_id = Uuid::new_v4().to_string();
248 let command_type = command.command_type().to_string();
249 let wrapper = CommandWrapper {
250 id: command_id.clone(),
251 command: Arc::from(command),
252 result_tx: Some(tx),
253 timeout,
254 retry_policy,
255 attempt: 0,
256 lane_id: self.id.clone(),
257 command_type: command_type.clone(),
258 #[cfg(feature = "distributed")]
259 enqueue_time: Utc::now(),
260 };
261
262 if let Some(storage) = &self.storage {
264 let stored_cmd = StoredCommand {
265 id: command_id,
266 command_type,
267 lane_id: self.id.clone(),
268 payload: serde_json::json!({}), retry_count: 0,
270 created_at: Utc::now(),
271 last_attempt_at: None,
272 };
273 let _ = storage.save_command(stored_cmd).await;
275 }
276
277 let mut state = self.state.lock().await;
278 state.pending.push_back(wrapper);
279
280 rx
281 }
282
283 async fn retry_command(&self, mut wrapper: CommandWrapper, delay: std::time::Duration) {
285 wrapper.attempt += 1;
286
287 let state_clone = Arc::clone(&self.state);
289 tokio::spawn(async move {
290 tokio::time::sleep(delay).await;
291 let mut state = state_clone.lock().await;
292 state.pending.push_back(wrapper);
293 });
294 }
295
296 async fn try_dequeue(&self) -> Option<CommandWrapper> {
298 #[cfg(feature = "distributed")]
300 if !self.rate_limiter.try_acquire().await {
301 return None;
302 }
303 let mut state = self.state.lock().await;
304 if state.has_capacity() && state.has_pending() {
305 state.active += 1;
306 state.pending.pop_front()
307 } else {
308 None
309 }
310 }
311
312 async fn mark_completed(&self) {
314 let mut state = self.state.lock().await;
315 state.active = state.active.saturating_sub(1);
316 }
317
318 pub async fn status(&self) -> LaneStatus {
320 let state = self.state.lock().await;
321 LaneStatus {
322 pending: state.pending.len(),
323 active: state.active,
324 min: state.config.min_concurrency,
325 max: state.config.max_concurrency,
326 }
327 }
328
329 async fn check_pressure(&self) -> Option<&'static str> {
335 let mut state = self.state.lock().await;
336 let threshold = match state.config.pressure_threshold {
337 Some(t) => t,
338 None => return None,
339 };
340 let pending = state.pending.len();
341 let was_pressured = state.is_pressured;
342 if pending >= threshold && !was_pressured {
343 state.is_pressured = true;
344 Some(events::QUEUE_LANE_PRESSURE)
345 } else if pending == 0 && was_pressured {
346 state.is_pressured = false;
347 Some(events::QUEUE_LANE_IDLE)
348 } else {
349 None
350 }
351 }
352}
353
/// Point-in-time snapshot of a lane, as returned by `Lane::status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaneStatus {
    /// Number of commands waiting in the lane's queue.
    pub pending: usize,
    /// Number of commands currently counted as running.
    pub active: usize,
    /// Configured `min_concurrency` of the lane.
    pub min: usize,
    /// Configured `max_concurrency` of the lane.
    pub max: usize,
}
362
/// Registry of lanes plus the scheduler that runs their commands.
#[allow(dead_code)]
pub struct CommandQueue {
    /// Registered lanes, keyed by lane id.
    lanes: Arc<Mutex<HashMap<LaneId, Arc<Lane>>>>,
    /// Emitter used for all queue and command lifecycle events.
    event_emitter: EventEmitter,
    /// Optional dead-letter queue that receives commands which failed for good.
    dlq: Option<DeadLetterQueue>,
    /// Optional storage backend, exposed through `CommandQueue::storage`.
    storage: Option<Arc<dyn Storage>>,
    /// Set by `shutdown`; once true, `submit` rejects new commands.
    is_shutting_down: Arc<AtomicBool>,
    /// Notification handle created by the constructors.
    // NOTE(review): no code in this part of the file signals or awaits it.
    shutdown_notify: Arc<tokio::sync::Notify>,
}
373
374impl CommandQueue {
375 pub fn new(event_emitter: EventEmitter) -> Self {
377 Self {
378 lanes: Arc::new(Mutex::new(HashMap::new())),
379 event_emitter,
380 dlq: None,
381 storage: None,
382 is_shutting_down: Arc::new(AtomicBool::new(false)),
383 shutdown_notify: Arc::new(tokio::sync::Notify::new()),
384 }
385 }
386
387 pub fn with_dlq(event_emitter: EventEmitter, dlq_size: usize) -> Self {
389 Self {
390 lanes: Arc::new(Mutex::new(HashMap::new())),
391 event_emitter,
392 dlq: Some(DeadLetterQueue::new(dlq_size)),
393 storage: None,
394 is_shutting_down: Arc::new(AtomicBool::new(false)),
395 shutdown_notify: Arc::new(tokio::sync::Notify::new()),
396 }
397 }
398
399 pub fn with_storage(event_emitter: EventEmitter, storage: Arc<dyn Storage>) -> Self {
401 Self {
402 lanes: Arc::new(Mutex::new(HashMap::new())),
403 event_emitter,
404 dlq: None,
405 storage: Some(storage),
406 is_shutting_down: Arc::new(AtomicBool::new(false)),
407 shutdown_notify: Arc::new(tokio::sync::Notify::new()),
408 }
409 }
410
411 pub fn with_dlq_and_storage(
413 event_emitter: EventEmitter,
414 dlq_size: usize,
415 storage: Arc<dyn Storage>,
416 ) -> Self {
417 Self {
418 lanes: Arc::new(Mutex::new(HashMap::new())),
419 event_emitter,
420 dlq: Some(DeadLetterQueue::new(dlq_size)),
421 storage: Some(storage),
422 is_shutting_down: Arc::new(AtomicBool::new(false)),
423 shutdown_notify: Arc::new(tokio::sync::Notify::new()),
424 }
425 }
426
427 pub fn storage(&self) -> Option<&Arc<dyn Storage>> {
429 self.storage.as_ref()
430 }
431
432 pub fn dlq(&self) -> Option<&DeadLetterQueue> {
434 self.dlq.as_ref()
435 }
436
437 pub fn is_shutting_down(&self) -> bool {
439 self.is_shutting_down.load(Ordering::SeqCst)
440 }
441
442 pub async fn shutdown(&self) {
444 self.is_shutting_down.store(true, Ordering::SeqCst);
445 self.event_emitter
446 .emit(LaneEvent::empty(events::QUEUE_SHUTDOWN_STARTED));
447 }
448
449 pub async fn drain(&self, timeout: std::time::Duration) -> Result<()> {
451 let start = std::time::Instant::now();
452
453 loop {
454 let lanes = self.lanes.lock().await;
456 let mut all_idle = true;
457
458 for lane in lanes.values() {
459 let status = lane.status().await;
460 if status.pending > 0 || status.active > 0 {
461 all_idle = false;
462 break;
463 }
464 }
465 drop(lanes);
466
467 if all_idle {
468 return Ok(());
469 }
470
471 if start.elapsed() >= timeout {
473 return Err(LaneError::Timeout(timeout));
474 }
475
476 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
478 }
479 }
480
481 pub async fn register_lane(&self, lane: Arc<Lane>) {
483 let mut lanes = self.lanes.lock().await;
484 lanes.insert(lane.id().to_string(), lane);
485 }
486
487 pub async fn submit(
489 &self,
490 lane_id: &str,
491 command: Box<dyn Command>,
492 ) -> Result<tokio::sync::oneshot::Receiver<Result<serde_json::Value>>> {
493 if self.is_shutting_down() {
495 return Err(LaneError::ShutdownInProgress);
496 }
497
498 let lanes = self.lanes.lock().await;
499 let lane = lanes
500 .get(lane_id)
501 .ok_or_else(|| LaneError::LaneNotFound(lane_id.to_string()))?;
502 let rx = lane.enqueue(command).await;
503
504 self.event_emitter.emit(LaneEvent::with_map(
505 events::QUEUE_COMMAND_SUBMITTED,
506 HashMap::from([("lane_id".to_string(), serde_json::json!(lane_id))]),
507 ));
508
509 Ok(rx)
510 }
511
512 pub async fn start_scheduler(self: Arc<Self>) {
514 tokio::spawn(async move {
515 loop {
516 self.schedule_next().await;
517 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
518 }
519 });
520 }
521
/// Runs a single scheduling pass.
///
/// 1. Emits a pressure or idle event for every lane whose `check_pressure`
///    reports a transition.
/// 2. Orders lanes by ascending effective priority.
/// 3. Starts at most one command: the first lane (in that order) that
///    yields a command from `try_dequeue` gets it executed on a spawned
///    task, then the pass ends.
///
/// The spawned task applies the command's timeout (if any) and then either
/// delivers the result, re-queues the command for retry, or reports the
/// final failure (optionally pushing to the dead-letter queue).
// NOTE(review): the registry lock taken on the first line is held for the
// whole pass, including the awaits on each lane's own state lock.
async fn schedule_next(&self) {
    let lanes = self.lanes.lock().await;

    // Phase 1: report pressure/idle transitions for every lane.
    for (lane_id, lane) in lanes.iter() {
        if let Some(event_key) = lane.check_pressure().await {
            self.event_emitter.emit(LaneEvent::with_map(
                event_key,
                HashMap::from([("lane_id".to_string(), serde_json::json!(lane_id))]),
            ));
        }
    }

    // Phase 2: compute each lane's effective priority.
    let mut lane_priorities = Vec::new();
    for lane in lanes.values() {
        let priority = lane.effective_priority().await;
        lane_priorities.push((priority, Arc::clone(lane)));
    }

    // Ascending sort: the lowest numeric priority is tried first.
    lane_priorities.sort_by_key(|(priority, _)| *priority);

    // Phase 3: start one command from the first lane that has one ready.
    for (_, lane) in lane_priorities {
        if let Some(mut wrapper) = lane.try_dequeue().await {
            // Copy everything the spawned task needs, since `wrapper` is
            // moved into it.
            let lane_clone = Arc::clone(&lane);
            let timeout = wrapper.timeout;
            let retry_policy = wrapper.retry_policy.clone();
            let attempt = wrapper.attempt;
            let dlq = self.dlq.clone();
            let command_id = wrapper.id.clone();
            let command_type = wrapper.command_type.clone();
            let lane_id = wrapper.lane_id.clone();
            let storage = lane.storage.clone();
            let event_emitter = self.event_emitter.clone();

            event_emitter.emit(LaneEvent::with_map(
                events::QUEUE_COMMAND_STARTED,
                HashMap::from([
                    ("lane_id".to_string(), serde_json::json!(lane_id)),
                    ("command_id".to_string(), serde_json::json!(command_id)),
                    ("command_type".to_string(), serde_json::json!(command_type)),
                ]),
            ));

            tokio::spawn(async move {
                #[cfg(feature = "telemetry")]
                let exec_start = Instant::now();

                // Run the command; an elapsed timeout becomes `LaneError::Timeout`.
                let result = match timeout {
                    Some(dur) => {
                        match tokio::time::timeout(dur, wrapper.command.execute()).await {
                            Ok(r) => r,
                            Err(_) => Err(LaneError::Timeout(dur)),
                        }
                    }
                    None => wrapper.command.execute().await,
                };

                match result {
                    Ok(value) => {
                        // Success: drop the stored record (best-effort),
                        // emit the completion event, deliver the value.
                        if let Some(storage) = &storage {
                            let _ = storage.remove_command(&command_id).await;
                        }

                        #[cfg(feature = "telemetry")]
                        telemetry::record_complete(
                            &lane_id,
                            exec_start.elapsed().as_secs_f64(),
                        );

                        event_emitter.emit(LaneEvent::with_map(
                            events::QUEUE_COMMAND_COMPLETED,
                            HashMap::from([
                                ("lane_id".to_string(), serde_json::json!(lane_id)),
                                ("command_id".to_string(), serde_json::json!(command_id)),
                            ]),
                        ));

                        // A send error only means the receiver was dropped.
                        if let Some(tx) = wrapper.result_tx.take() {
                            let _ = tx.send(Ok(value));
                        }
                        lane_clone.mark_completed().await;
                    }
                    Err(err) => {
                        if retry_policy.should_retry(attempt) {
                            // Retry: the wrapper, still holding `result_tx`,
                            // is re-queued after the delay, so the caller
                            // keeps waiting on the same receiver.
                            let delay = retry_policy.delay_for_attempt(attempt + 1);

                            tracing::info!(
                                command_id = %command_id,
                                retry_attempt = attempt + 1,
                                "a3s.lane.retry: retrying command"
                            );

                            event_emitter.emit(LaneEvent::with_map(
                                events::QUEUE_COMMAND_RETRY,
                                HashMap::from([
                                    ("lane_id".to_string(), serde_json::json!(lane_id)),
                                    ("command_id".to_string(), serde_json::json!(command_id)),
                                    ("attempt".to_string(), serde_json::json!(attempt + 1)),
                                ]),
                            ));

                            lane_clone.retry_command(wrapper, delay).await;
                            lane_clone.mark_completed().await;
                        } else {
                            // Final failure: no retries left.
                            #[cfg(feature = "telemetry")]
                            telemetry::record_failure(&lane_id);

                            if let Some(storage) = &storage {
                                let _ = storage.remove_command(&command_id).await;
                            }

                            // Capture these before `err` is moved into the
                            // result channel below.
                            let error_msg = err.to_string();
                            let is_timeout = matches!(err, LaneError::Timeout(_));

                            if let Some(dlq) = dlq {
                                let dead_letter = DeadLetter {
                                    command_id: command_id.clone(),
                                    command_type: command_type.clone(),
                                    lane_id: lane_id.clone(),
                                    error: error_msg.clone(),
                                    attempts: attempt + 1,
                                    failed_at: Utc::now(),
                                };
                                dlq.push(dead_letter).await;

                                event_emitter.emit(LaneEvent::with_map(
                                    events::QUEUE_COMMAND_DEAD_LETTERED,
                                    HashMap::from([
                                        ("lane_id".to_string(), serde_json::json!(lane_id)),
                                        (
                                            "command_id".to_string(),
                                            serde_json::json!(command_id),
                                        ),
                                        (
                                            "command_type".to_string(),
                                            serde_json::json!(command_type),
                                        ),
                                    ]),
                                ));
                            }

                            // Timeouts get their own event key; every other
                            // error is reported as a plain failure.
                            event_emitter.emit(LaneEvent::with_map(
                                if is_timeout {
                                    events::QUEUE_COMMAND_TIMEOUT
                                } else {
                                    events::QUEUE_COMMAND_FAILED
                                },
                                HashMap::from([
                                    ("lane_id".to_string(), serde_json::json!(lane_id)),
                                    ("command_id".to_string(), serde_json::json!(command_id)),
                                    ("error".to_string(), serde_json::json!(error_msg)),
                                ]),
                            ));

                            if let Some(tx) = wrapper.result_tx.take() {
                                let _ = tx.send(Err(err));
                            }
                            lane_clone.mark_completed().await;
                        }
                    }
                }
            });
            // Only one command is started per pass.
            break;
        }
    }
}
692
693 pub fn subscribe_stream(&self) -> EventStream {
695 self.event_emitter.subscribe_stream()
696 }
697
698 pub fn subscribe_filtered(
700 &self,
701 filter: impl Fn(&LaneEvent) -> bool + Send + Sync + 'static,
702 ) -> EventStream {
703 self.event_emitter.subscribe_filtered(filter)
704 }
705
706 pub async fn status(&self) -> HashMap<LaneId, LaneStatus> {
708 let lanes = self.lanes.lock().await;
709 let mut status = HashMap::new();
710
711 for (id, lane) in lanes.iter() {
712 status.insert(id.clone(), lane.status().await);
713 }
714
715 status
716 }
717}
718
/// String identifiers for lanes; the names match the constants in `priorities`.
pub mod lane_ids {
    /// Lane id `"system"`.
    pub const SYSTEM: &str = "system";
    /// Lane id `"control"`.
    pub const CONTROL: &str = "control";
    /// Lane id `"query"`.
    pub const QUERY: &str = "query";
    /// Lane id `"session"`.
    pub const SESSION: &str = "session";
    /// Lane id `"skill"`.
    pub const SKILL: &str = "skill";
    /// Lane id `"prompt"`.
    pub const PROMPT: &str = "prompt";
}
728
729#[cfg(test)]
730mod tests {
731 use super::*;
732
733 struct TestCommand {
735 result: serde_json::Value,
736 delay_ms: Option<u64>,
737 }
738
739 impl TestCommand {
740 fn new(result: serde_json::Value) -> Self {
741 Self {
742 result,
743 delay_ms: None,
744 }
745 }
746
747 fn with_delay(result: serde_json::Value, delay_ms: u64) -> Self {
748 Self {
749 result,
750 delay_ms: Some(delay_ms),
751 }
752 }
753 }
754
755 #[async_trait]
756 impl Command for TestCommand {
757 async fn execute(&self) -> Result<serde_json::Value> {
758 if let Some(delay) = self.delay_ms {
759 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
760 }
761 Ok(self.result.clone())
762 }
763
764 fn command_type(&self) -> &str {
765 "test"
766 }
767 }
768
769 struct FailingCommand {
771 message: String,
772 }
773
774 #[async_trait]
775 impl Command for FailingCommand {
776 async fn execute(&self) -> Result<serde_json::Value> {
777 Err(LaneError::Other(self.message.clone()))
778 }
779
780 fn command_type(&self) -> &str {
781 "failing"
782 }
783 }
784
/// Pins the numeric value of each priority constant and their strict ordering.
#[test]
fn test_priorities() {
    let expected_values = [
        (priorities::SYSTEM, 0),
        (priorities::CONTROL, 1),
        (priorities::QUERY, 2),
        (priorities::SESSION, 3),
        (priorities::SKILL, 4),
        (priorities::PROMPT, 5),
    ];
    for (actual, expected) in expected_values {
        assert_eq!(actual, expected);
    }

    // Ordering is checked at compile time.
    const _: () = {
        assert!(
            priorities::SYSTEM < priorities::CONTROL
                && priorities::CONTROL < priorities::QUERY
                && priorities::QUERY < priorities::SESSION
                && priorities::SESSION < priorities::SKILL
                && priorities::SKILL < priorities::PROMPT
        );
    };
}
804
/// Pins the string value of each built-in lane id.
#[test]
fn test_lane_ids() {
    let expected_ids = [
        (lane_ids::SYSTEM, "system"),
        (lane_ids::CONTROL, "control"),
        (lane_ids::QUERY, "query"),
        (lane_ids::SESSION, "session"),
        (lane_ids::SKILL, "skill"),
        (lane_ids::PROMPT, "prompt"),
    ];
    for (actual, expected) in expected_ids {
        assert_eq!(actual, expected);
    }
}
814
/// A new lane reports the id it was created with.
#[test]
fn test_lane_new() {
    let lane = Lane::new("test-lane", LaneConfig::new(1, 4), priorities::QUERY);

    assert_eq!(lane.id(), "test-lane");
}
822
823 #[tokio::test]
824 async fn test_lane_priority() {
825 let config = LaneConfig::new(1, 4);
826 let lane = Lane::new("test", config, priorities::SESSION);
827
828 assert_eq!(lane.priority().await, priorities::SESSION);
829 }
830
831 #[tokio::test]
832 async fn test_lane_status_initial() {
833 let config = LaneConfig::new(2, 8);
834 let lane = Lane::new("test", config, priorities::QUERY);
835
836 let status = lane.status().await;
837 assert_eq!(status.pending, 0);
838 assert_eq!(status.active, 0);
839 assert_eq!(status.min, 2);
840 assert_eq!(status.max, 8);
841 }
842
843 #[tokio::test]
844 async fn test_lane_enqueue() {
845 let config = LaneConfig::new(1, 4);
846 let lane = Lane::new("test", config, priorities::QUERY);
847
848 let cmd = Box::new(TestCommand::new(serde_json::json!({"result": "ok"})));
849 let _rx = lane.enqueue(cmd).await;
850
851 let status = lane.status().await;
852 assert_eq!(status.pending, 1);
853 }
854
/// `LaneStatus` serializes every field to JSON and round-trips unchanged.
///
/// Nothing here awaits, so this is a plain `#[test]` like the other
/// synchronous tests in this module.
#[test]
fn test_lane_status_serialization() {
    let status = LaneStatus {
        pending: 5,
        active: 2,
        min: 1,
        max: 8,
    };

    let json = serde_json::to_string(&status).unwrap();
    assert!(json.contains("\"pending\":5"));
    assert!(json.contains("\"active\":2"));
    assert!(json.contains("\"min\":1"));
    assert!(json.contains("\"max\":8"));

    // Check every field after the round trip, not only pending and active.
    let parsed: LaneStatus = serde_json::from_str(&json).unwrap();
    assert_eq!(parsed.pending, 5);
    assert_eq!(parsed.active, 2);
    assert_eq!(parsed.min, 1);
    assert_eq!(parsed.max, 8);
}
874
875 #[tokio::test]
876 async fn test_command_queue_new() {
877 let emitter = EventEmitter::new(100);
878 let queue = CommandQueue::new(emitter);
879
880 let status = queue.status().await;
881 assert!(status.is_empty());
882 }
883
884 #[tokio::test]
885 async fn test_command_queue_register_lane() {
886 let emitter = EventEmitter::new(100);
887 let queue = CommandQueue::new(emitter);
888
889 let config = LaneConfig::new(1, 4);
890 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
891
892 queue.register_lane(lane).await;
893
894 let status = queue.status().await;
895 assert!(status.contains_key("test-lane"));
896 }
897
898 #[tokio::test]
899 async fn test_command_queue_submit() {
900 let emitter = EventEmitter::new(100);
901 let queue = CommandQueue::new(emitter);
902
903 let config = LaneConfig::new(1, 4);
904 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
905 queue.register_lane(lane).await;
906
907 let cmd = Box::new(TestCommand::new(serde_json::json!({"status": "ok"})));
908 let result = queue.submit("test-lane", cmd).await;
909
910 assert!(result.is_ok());
911 }
912
913 #[tokio::test]
914 async fn test_command_queue_submit_unknown_lane() {
915 let emitter = EventEmitter::new(100);
916 let queue = CommandQueue::new(emitter);
917
918 let cmd = Box::new(TestCommand::new(serde_json::json!({})));
919 let result = queue.submit("nonexistent", cmd).await;
920
921 assert!(result.is_err());
922 if let Err(LaneError::LaneNotFound(id)) = result {
923 assert_eq!(id, "nonexistent");
924 } else {
925 panic!("Expected LaneNotFound error");
926 }
927 }
928
929 #[tokio::test]
930 async fn test_command_queue_multiple_lanes() {
931 let emitter = EventEmitter::new(100);
932 let queue = CommandQueue::new(emitter);
933
934 let configs = vec![
936 ("system", priorities::SYSTEM, 1),
937 ("control", priorities::CONTROL, 8),
938 ("query", priorities::QUERY, 4),
939 ];
940
941 for (id, priority, max) in configs {
942 let config = LaneConfig::new(1, max);
943 let lane = Arc::new(Lane::new(id, config, priority));
944 queue.register_lane(lane).await;
945 }
946
947 let status = queue.status().await;
948 assert_eq!(status.len(), 3);
949 assert!(status.contains_key("system"));
950 assert!(status.contains_key("control"));
951 assert!(status.contains_key("query"));
952 }
953
954 #[tokio::test]
955 async fn test_command_queue_status() {
956 let emitter = EventEmitter::new(100);
957 let queue = CommandQueue::new(emitter);
958
959 let config = LaneConfig::new(2, 16);
960 let lane = Arc::new(Lane::new("query", config, priorities::QUERY));
961 queue.register_lane(lane).await;
962
963 let status = queue.status().await;
964 let lane_status = status.get("query").unwrap();
965
966 assert_eq!(lane_status.min, 2);
967 assert_eq!(lane_status.max, 16);
968 assert_eq!(lane_status.pending, 0);
969 assert_eq!(lane_status.active, 0);
970 }
971
/// Capacity is available until `active` reaches `max_concurrency` (2 here).
#[test]
fn test_lane_state_has_capacity() {
    let mut state = LaneState::new(LaneConfig::new(1, 2), priorities::QUERY);

    // Freshly created: nothing active yet.
    assert!(state.has_capacity());

    state.active = 1;
    let below_limit = state.has_capacity();
    assert!(below_limit);

    state.active = 2;
    let at_limit = state.has_capacity();
    assert!(!at_limit);
}
983
/// A fresh lane state has nothing pending.
///
/// Nothing here awaits, so this is a plain `#[test]`, matching
/// `test_lane_state_has_capacity`.
#[test]
fn test_lane_state_has_pending() {
    let config = LaneConfig::new(1, 4);
    let state = LaneState::new(config, priorities::QUERY);

    assert!(!state.has_pending());
    assert_eq!(state.pending.len(), 0);
}
992
/// The `Debug` output names the type and its main fields.
#[test]
fn test_lane_status_debug() {
    let rendered = format!(
        "{:?}",
        LaneStatus {
            pending: 3,
            active: 1,
            min: 1,
            max: 4,
        }
    );

    for needle in ["LaneStatus", "pending", "active"] {
        assert!(rendered.contains(needle));
    }
}
1007
/// Cloning a `LaneStatus` copies every field.
#[test]
fn test_lane_status_clone() {
    let original = LaneStatus {
        pending: 5,
        active: 2,
        min: 1,
        max: 8,
    };

    let copy = original.clone();
    assert_eq!(copy.pending, 5);
    assert_eq!(copy.active, 2);
    assert_eq!(copy.min, 1);
    assert_eq!(copy.max, 8);
}
1023
1024 #[tokio::test]
1025 async fn test_command_execution() {
1026 let cmd = TestCommand::new(serde_json::json!({"value": 42}));
1027 let result = cmd.execute().await;
1028
1029 assert!(result.is_ok());
1030 assert_eq!(result.unwrap(), serde_json::json!({"value": 42}));
1031 }
1032
/// Each test command reports its fixed type label.
///
/// Nothing here awaits, so this is a plain `#[test]` like the other
/// synchronous tests in this module.
#[test]
fn test_command_type() {
    let cmd = TestCommand::new(serde_json::json!({}));
    assert_eq!(cmd.command_type(), "test");

    let failing = FailingCommand {
        message: "error".to_string(),
    };
    assert_eq!(failing.command_type(), "failing");
}
1043
1044 #[tokio::test]
1045 async fn test_failing_command() {
1046 let cmd = FailingCommand {
1047 message: "Something went wrong".to_string(),
1048 };
1049
1050 let result = cmd.execute().await;
1051 assert!(result.is_err());
1052
1053 if let Err(LaneError::Other(msg)) = result {
1054 assert_eq!(msg, "Something went wrong");
1055 } else {
1056 panic!("Expected Other error");
1057 }
1058 }
1059
1060 #[tokio::test]
1061 async fn test_command_with_delay() {
1062 let cmd = TestCommand::with_delay(serde_json::json!({"delayed": true}), 10);
1063
1064 let start = std::time::Instant::now();
1065 let result = cmd.execute().await;
1066 let elapsed = start.elapsed();
1067
1068 assert!(result.is_ok());
1069 assert!(elapsed.as_millis() >= 10);
1070 }
1071
/// `Priority` is interchangeable with `u8`.
#[test]
fn test_priority_type() {
    let level: Priority = 5;
    assert_eq!(level, 5u8);
}
1077
/// `LaneId` is interchangeable with `String`.
#[test]
fn test_lane_id_type() {
    let lane_id: LaneId = String::from("test-lane");
    assert_eq!(lane_id, "test-lane");
}
1083
/// `CommandId` is interchangeable with `String`.
#[test]
fn test_command_id_type() {
    let command_id: CommandId = String::from("cmd-123");
    assert_eq!(command_id, "cmd-123");
}
1089
1090 #[tokio::test]
1091 async fn test_command_timeout() {
1092 let emitter = EventEmitter::new(100);
1093 let queue = Arc::new(CommandQueue::new(emitter));
1094
1095 let config = LaneConfig::new(1, 4).with_timeout(std::time::Duration::from_millis(50));
1096 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1097 queue.register_lane(lane).await;
1098
1099 Arc::clone(&queue).start_scheduler().await;
1101
1102 let cmd = Box::new(TestCommand::with_delay(
1104 serde_json::json!({"result": "ok"}),
1105 200,
1106 ));
1107 let rx = queue.submit("test-lane", cmd).await.unwrap();
1108
1109 let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
1111 .await
1112 .expect("Timeout waiting for result")
1113 .expect("Channel closed");
1114
1115 assert!(result.is_err());
1117 if let Err(LaneError::Timeout(dur)) = result {
1118 assert_eq!(dur, std::time::Duration::from_millis(50));
1119 } else {
1120 panic!("Expected Timeout error");
1121 }
1122 }
1123
1124 #[tokio::test]
1125 async fn test_command_no_timeout() {
1126 let emitter = EventEmitter::new(100);
1127 let queue = Arc::new(CommandQueue::new(emitter));
1128
1129 let config = LaneConfig::new(1, 4);
1130 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1131 queue.register_lane(lane).await;
1132
1133 Arc::clone(&queue).start_scheduler().await;
1134
1135 let cmd = Box::new(TestCommand::with_delay(
1137 serde_json::json!({"result": "ok"}),
1138 50,
1139 ));
1140 let rx = queue.submit("test-lane", cmd).await.unwrap();
1141
1142 let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
1143 .await
1144 .expect("Timeout waiting for result")
1145 .expect("Channel closed");
1146
1147 assert!(result.is_ok());
1149 assert_eq!(result.unwrap()["result"], "ok");
1150 }
1151
1152 #[tokio::test]
1153 async fn test_command_completes_before_timeout() {
1154 let emitter = EventEmitter::new(100);
1155 let queue = Arc::new(CommandQueue::new(emitter));
1156
1157 let config = LaneConfig::new(1, 4).with_timeout(std::time::Duration::from_secs(5));
1158 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1159 queue.register_lane(lane).await;
1160
1161 Arc::clone(&queue).start_scheduler().await;
1162
1163 let cmd = Box::new(TestCommand::with_delay(
1165 serde_json::json!({"result": "fast"}),
1166 10,
1167 ));
1168 let rx = queue.submit("test-lane", cmd).await.unwrap();
1169
1170 let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
1171 .await
1172 .expect("Timeout waiting for result")
1173 .expect("Channel closed");
1174
1175 assert!(result.is_ok());
1177 assert_eq!(result.unwrap()["result"], "fast");
1178 }
1179
1180 #[tokio::test]
1181 async fn test_command_retry_on_failure() {
1182 use std::sync::atomic::{AtomicU32, Ordering};
1183
1184 struct RetryableCommand {
1186 attempts: Arc<AtomicU32>,
1187 }
1188
1189 #[async_trait]
1190 impl Command for RetryableCommand {
1191 async fn execute(&self) -> Result<serde_json::Value> {
1192 let attempt = self.attempts.fetch_add(1, Ordering::SeqCst);
1193 if attempt < 2 {
1194 Err(LaneError::Other(format!("Attempt {} failed", attempt)))
1195 } else {
1196 Ok(serde_json::json!({"success": true, "attempts": attempt + 1}))
1197 }
1198 }
1199
1200 fn command_type(&self) -> &str {
1201 "retryable"
1202 }
1203 }
1204
1205 let emitter = EventEmitter::new(100);
1206 let queue = Arc::new(CommandQueue::new(emitter));
1207
1208 let retry_policy = RetryPolicy::fixed(3, std::time::Duration::from_millis(10));
1209 let config = LaneConfig::new(1, 4).with_retry_policy(retry_policy);
1210 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1211 queue.register_lane(lane).await;
1212
1213 Arc::clone(&queue).start_scheduler().await;
1214
1215 let attempts = Arc::new(AtomicU32::new(0));
1216 let cmd = Box::new(RetryableCommand {
1217 attempts: Arc::clone(&attempts),
1218 });
1219 let rx = queue.submit("test-lane", cmd).await.unwrap();
1220
1221 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
1223 .await
1224 .expect("Timeout waiting for result")
1225 .expect("Channel closed");
1226
1227 assert!(result.is_ok());
1228 let value = result.unwrap();
1229 assert_eq!(value["success"], true);
1230 assert_eq!(value["attempts"], 3); }
1232
1233 #[tokio::test]
1234 async fn test_command_retry_exhausted() {
1235 struct AlwaysFailCommand;
1237
1238 #[async_trait]
1239 impl Command for AlwaysFailCommand {
1240 async fn execute(&self) -> Result<serde_json::Value> {
1241 Err(LaneError::Other("Always fails".to_string()))
1242 }
1243
1244 fn command_type(&self) -> &str {
1245 "always_fail"
1246 }
1247 }
1248
1249 let emitter = EventEmitter::new(100);
1250 let queue = Arc::new(CommandQueue::new(emitter));
1251
1252 let retry_policy = RetryPolicy::fixed(2, std::time::Duration::from_millis(10));
1253 let config = LaneConfig::new(1, 4).with_retry_policy(retry_policy);
1254 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1255 queue.register_lane(lane).await;
1256
1257 Arc::clone(&queue).start_scheduler().await;
1258
1259 let cmd = Box::new(AlwaysFailCommand);
1260 let rx = queue.submit("test-lane", cmd).await.unwrap();
1261
1262 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
1264 .await
1265 .expect("Timeout waiting for result")
1266 .expect("Channel closed");
1267
1268 assert!(result.is_err());
1269 if let Err(LaneError::Other(msg)) = result {
1270 assert_eq!(msg, "Always fails");
1271 } else {
1272 panic!("Expected Other error");
1273 }
1274 }
1275
1276 #[tokio::test]
1277 async fn test_command_no_retry_on_success() {
1278 use std::sync::atomic::{AtomicU32, Ordering};
1279
1280 struct CountingCommand {
1281 counter: Arc<AtomicU32>,
1282 }
1283
1284 #[async_trait]
1285 impl Command for CountingCommand {
1286 async fn execute(&self) -> Result<serde_json::Value> {
1287 let count = self.counter.fetch_add(1, Ordering::SeqCst);
1288 Ok(serde_json::json!({"count": count + 1}))
1289 }
1290
1291 fn command_type(&self) -> &str {
1292 "counting"
1293 }
1294 }
1295
1296 let emitter = EventEmitter::new(100);
1297 let queue = Arc::new(CommandQueue::new(emitter));
1298
1299 let retry_policy = RetryPolicy::exponential(3);
1300 let config = LaneConfig::new(1, 4).with_retry_policy(retry_policy);
1301 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1302 queue.register_lane(lane).await;
1303
1304 Arc::clone(&queue).start_scheduler().await;
1305
1306 let counter = Arc::new(AtomicU32::new(0));
1307 let cmd = Box::new(CountingCommand {
1308 counter: Arc::clone(&counter),
1309 });
1310 let rx = queue.submit("test-lane", cmd).await.unwrap();
1311
1312 let result = tokio::time::timeout(std::time::Duration::from_secs(1), rx)
1313 .await
1314 .expect("Timeout waiting for result")
1315 .expect("Channel closed");
1316
1317 assert!(result.is_ok());
1318 assert_eq!(result.unwrap()["count"], 1);
1319
1320 assert_eq!(counter.load(Ordering::SeqCst), 1);
1322 }
1323
1324 #[tokio::test]
1325 async fn test_dlq_integration() {
1326 struct FailCommand;
1328
1329 #[async_trait]
1330 impl Command for FailCommand {
1331 async fn execute(&self) -> Result<serde_json::Value> {
1332 Err(LaneError::Other("Permanent failure".to_string()))
1333 }
1334
1335 fn command_type(&self) -> &str {
1336 "fail_command"
1337 }
1338 }
1339
1340 let emitter = EventEmitter::new(100);
1341 let queue = Arc::new(CommandQueue::with_dlq(emitter, 100));
1342
1343 let retry_policy = RetryPolicy::fixed(2, std::time::Duration::from_millis(10));
1344 let config = LaneConfig::new(1, 4).with_retry_policy(retry_policy);
1345 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1346 queue.register_lane(lane).await;
1347
1348 Arc::clone(&queue).start_scheduler().await;
1349
1350 let cmd = Box::new(FailCommand);
1352 let rx = queue.submit("test-lane", cmd).await.unwrap();
1353
1354 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
1356 .await
1357 .expect("Timeout waiting for result")
1358 .expect("Channel closed");
1359
1360 assert!(result.is_err());
1361
1362 let dlq = queue.dlq().expect("DLQ should exist");
1364 tokio::time::sleep(std::time::Duration::from_millis(50)).await; assert_eq!(dlq.len().await, 1);
1367
1368 let letters = dlq.list().await;
1369 assert_eq!(letters[0].command_type, "fail_command");
1370 assert_eq!(letters[0].lane_id, "test-lane");
1371 assert_eq!(letters[0].attempts, 3); assert!(letters[0].error.contains("Permanent failure"));
1373 }
1374
1375 #[tokio::test]
1376 async fn test_no_dlq_without_configuration() {
1377 let emitter = EventEmitter::new(100);
1378 let queue = Arc::new(CommandQueue::new(emitter));
1379
1380 assert!(queue.dlq().is_none());
1381 }
1382
1383 #[tokio::test]
1384 async fn test_shutdown_rejects_new_commands() {
1385 let emitter = EventEmitter::new(100);
1386 let queue = Arc::new(CommandQueue::new(emitter));
1387
1388 let config = LaneConfig::new(1, 4);
1389 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1390 queue.register_lane(lane).await;
1391
1392 queue.shutdown().await;
1394 assert!(queue.is_shutting_down());
1395
1396 let cmd = Box::new(TestCommand::new(serde_json::json!({"test": "data"})));
1398 let result = queue.submit("test-lane", cmd).await;
1399
1400 assert!(result.is_err());
1401 if let Err(LaneError::ShutdownInProgress) = result {
1402 } else {
1404 panic!("Expected ShutdownInProgress error");
1405 }
1406 }
1407
1408 #[tokio::test]
1409 async fn test_drain_waits_for_completion() {
1410 let emitter = EventEmitter::new(100);
1411 let queue = Arc::new(CommandQueue::new(emitter));
1412
1413 let config = LaneConfig::new(1, 4);
1414 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1415 queue.register_lane(lane).await;
1416
1417 Arc::clone(&queue).start_scheduler().await;
1418
1419 let cmd = Box::new(TestCommand::with_delay(
1421 serde_json::json!({"result": "ok"}),
1422 100,
1423 ));
1424 let rx = queue.submit("test-lane", cmd).await.unwrap();
1425
1426 queue.shutdown().await;
1428
1429 let drain_result = queue.drain(std::time::Duration::from_secs(2)).await;
1431 assert!(drain_result.is_ok());
1432
1433 let result = tokio::time::timeout(std::time::Duration::from_millis(100), rx)
1435 .await
1436 .expect("Timeout")
1437 .expect("Channel closed");
1438 assert!(result.is_ok());
1439 }
1440
1441 #[tokio::test]
1442 async fn test_drain_timeout() {
1443 let emitter = EventEmitter::new(100);
1444 let queue = Arc::new(CommandQueue::new(emitter));
1445
1446 let config = LaneConfig::new(1, 4);
1447 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1448 queue.register_lane(lane).await;
1449
1450 Arc::clone(&queue).start_scheduler().await;
1451
1452 let cmd = Box::new(TestCommand::with_delay(
1454 serde_json::json!({"result": "ok"}),
1455 5000,
1456 ));
1457 let _rx = queue.submit("test-lane", cmd).await.unwrap();
1458
1459 queue.shutdown().await;
1461
1462 let drain_result = queue.drain(std::time::Duration::from_millis(50)).await;
1464 assert!(drain_result.is_err());
1465 if let Err(LaneError::Timeout(_)) = drain_result {
1466 } else {
1468 panic!("Expected Timeout error");
1469 }
1470 }
1471
1472 #[tokio::test]
1473 async fn test_is_shutting_down() {
1474 let emitter = EventEmitter::new(100);
1475 let queue = Arc::new(CommandQueue::new(emitter));
1476
1477 assert!(!queue.is_shutting_down());
1478
1479 queue.shutdown().await;
1480 assert!(queue.is_shutting_down());
1481 }
1482
1483 #[tokio::test]
1486 async fn test_lane_pressure_emits_on_threshold() {
1487 use crate::event::events;
1488
1489 let emitter = EventEmitter::new(100);
1490 let queue = Arc::new(CommandQueue::new(emitter.clone()));
1491
1492 let config = LaneConfig::new(1, 4).with_pressure_threshold(2);
1493 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1494 queue.register_lane(lane).await;
1495
1496 let mut stream = emitter.subscribe_filtered(|e| e.key == events::QUEUE_LANE_PRESSURE);
1498
1499 for _ in 0..2 {
1501 let cmd = Box::new(TestCommand::new(serde_json::json!({})));
1502 let _ = queue.submit("test-lane", cmd).await.unwrap();
1503 }
1504
1505 Arc::clone(&queue).start_scheduler().await;
1507
1508 let event = tokio::time::timeout(std::time::Duration::from_secs(1), stream.recv())
1509 .await
1510 .expect("No pressure event received within timeout")
1511 .expect("Stream ended");
1512
1513 assert_eq!(event.key, events::QUEUE_LANE_PRESSURE);
1514 }
1515
1516 #[tokio::test]
1517 async fn test_lane_idle_emits_when_drained() {
1518 use crate::event::events;
1519
1520 let emitter = EventEmitter::new(100);
1521 let queue = Arc::new(CommandQueue::new(emitter.clone()));
1522
1523 let config = LaneConfig::new(1, 4).with_pressure_threshold(1);
1524 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1525 queue.register_lane(lane).await;
1526
1527 let mut stream = emitter.subscribe_filtered(|e| {
1529 e.key == events::QUEUE_LANE_PRESSURE || e.key == events::QUEUE_LANE_IDLE
1530 });
1531
1532 let cmd = Box::new(TestCommand::new(serde_json::json!({})));
1534 let _ = queue.submit("test-lane", cmd).await.unwrap();
1535
1536 Arc::clone(&queue).start_scheduler().await;
1538
1539 let pressure = tokio::time::timeout(std::time::Duration::from_secs(1), stream.recv())
1541 .await
1542 .expect("No pressure event")
1543 .expect("Stream ended");
1544 assert_eq!(pressure.key, events::QUEUE_LANE_PRESSURE);
1545
1546 let idle = tokio::time::timeout(std::time::Duration::from_secs(1), stream.recv())
1548 .await
1549 .expect("No idle event")
1550 .expect("Stream ended");
1551 assert_eq!(idle.key, events::QUEUE_LANE_IDLE);
1552 }
1553
1554 #[tokio::test]
1555 async fn test_lane_no_pressure_without_threshold() {
1556 use crate::event::events;
1557
1558 let emitter = EventEmitter::new(100);
1559 let queue = Arc::new(CommandQueue::new(emitter.clone()));
1560
1561 let config = LaneConfig::new(1, 4);
1563 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1564 queue.register_lane(lane).await;
1565
1566 let mut stream = emitter.subscribe_filtered(|e| {
1567 e.key == events::QUEUE_LANE_PRESSURE || e.key == events::QUEUE_LANE_IDLE
1568 });
1569
1570 for _ in 0..5 {
1572 let cmd = Box::new(TestCommand::new(serde_json::json!({})));
1573 let _ = queue.submit("test-lane", cmd).await.unwrap();
1574 }
1575
1576 Arc::clone(&queue).start_scheduler().await;
1577
1578 tokio::time::sleep(std::time::Duration::from_millis(200)).await;
1580
1581 let result =
1583 tokio::time::timeout(std::time::Duration::from_millis(50), stream.recv()).await;
1584
1585 assert!(
1586 result.is_err(),
1587 "Should not receive pressure/idle events without threshold"
1588 );
1589 }
1590
1591 #[tokio::test]
1596 async fn test_event_emitted_on_submit() {
1597 use crate::event::events;
1598
1599 let emitter = EventEmitter::new(100);
1600 let mut rx = emitter.subscribe();
1601 let queue = CommandQueue::new(emitter);
1602
1603 let config = LaneConfig::new(1, 4);
1604 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1605 queue.register_lane(lane).await;
1606
1607 let cmd = Box::new(TestCommand::new(serde_json::json!({"result": "ok"})));
1608 let _ = queue.submit("test-lane", cmd).await.unwrap();
1609
1610 let event = tokio::time::timeout(std::time::Duration::from_millis(200), async {
1611 rx.recv().await.unwrap()
1612 })
1613 .await
1614 .expect("QUEUE_COMMAND_SUBMITTED event not received");
1615
1616 assert_eq!(event.key, events::QUEUE_COMMAND_SUBMITTED);
1617 }
1618
1619 #[tokio::test]
1620 async fn test_event_emitted_on_complete() {
1621 use crate::event::{events, EventStream};
1622
1623 let emitter = EventEmitter::new(100);
1624 let queue = Arc::new(CommandQueue::new(emitter.clone()));
1625
1626 let config = LaneConfig::new(1, 4);
1627 let lane = Arc::new(Lane::new("test-lane", config, priorities::QUERY));
1628 queue.register_lane(lane).await;
1629 Arc::clone(&queue).start_scheduler().await;
1630
1631 let mut stream: EventStream =
1632 emitter.subscribe_filtered(|e| e.key == events::QUEUE_COMMAND_COMPLETED);
1633
1634 let cmd = Box::new(TestCommand::new(serde_json::json!({"result": "ok"})));
1635 let rx = queue.submit("test-lane", cmd).await.unwrap();
1636
1637 let _ = tokio::time::timeout(std::time::Duration::from_secs(1), rx).await;
1639
1640 let event = tokio::time::timeout(std::time::Duration::from_millis(200), stream.recv())
1641 .await
1642 .expect("QUEUE_COMMAND_COMPLETED event not received")
1643 .expect("stream closed");
1644
1645 assert_eq!(event.key, events::QUEUE_COMMAND_COMPLETED);
1646 }
1647
1648 #[tokio::test]
1649 async fn test_event_emitted_on_shutdown() {
1650 use crate::event::events;
1651
1652 let emitter = EventEmitter::new(100);
1653 let mut rx = emitter.subscribe();
1654 let queue = CommandQueue::new(emitter);
1655
1656 queue.shutdown().await;
1657
1658 let event = tokio::time::timeout(std::time::Duration::from_millis(100), async {
1659 rx.recv().await.unwrap()
1660 })
1661 .await
1662 .expect("QUEUE_SHUTDOWN_STARTED event not received");
1663
1664 assert_eq!(event.key, events::QUEUE_SHUTDOWN_STARTED);
1665 }
1666
#[cfg(feature = "distributed")]
#[tokio::test]
async fn test_rate_limit_blocks_dequeue() {
    use crate::ratelimit::RateLimitConfig;

    // Lane limited via `RateLimitConfig::per_second(1)`.
    let lane = Lane::new(
        "test",
        LaneConfig::new(1, 10).with_rate_limit(RateLimitConfig::per_second(1)),
        priorities::QUERY,
    );

    // Queue two commands; the receivers are kept alive until the end.
    let first_cmd = Box::new(TestCommand::new(serde_json::json!(1)));
    let _rx1 = lane.enqueue(first_cmd).await;
    let second_cmd = Box::new(TestCommand::new(serde_json::json!(2)));
    let _rx2 = lane.enqueue(second_cmd).await;

    // First dequeue passes; mark it completed before trying again.
    let first = lane.try_dequeue().await;
    assert!(first.is_some(), "first dequeue should succeed");
    lane.mark_completed().await;

    // An immediate second dequeue must be refused.
    let second = lane.try_dequeue().await;
    assert!(second.is_none(), "second dequeue should be rate-limited");
}
1693
#[cfg(feature = "distributed")]
#[tokio::test]
async fn test_effective_priority_no_boost_when_fresh() {
    use crate::boost::PriorityBoostConfig;
    use std::time::Duration;

    // Boost settings use 10 s and 9 s durations, far longer than this test.
    let boost = PriorityBoostConfig::new(Duration::from_secs(10))
        .with_boost(Duration::from_secs(9), 2);
    let lane = Lane::new(
        "test",
        LaneConfig::new(1, 4).with_priority_boost(boost),
        priorities::QUERY,
    );

    // With nothing enqueued, the lane reports its base priority.
    assert_eq!(lane.effective_priority().await, priorities::QUERY);

    // A freshly enqueued command must not change the priority either.
    let fresh_cmd = Box::new(TestCommand::new(serde_json::json!(1)));
    let _rx = lane.enqueue(fresh_cmd).await;
    assert_eq!(lane.effective_priority().await, priorities::QUERY);
}
1716
#[cfg(feature = "distributed")]
#[tokio::test]
async fn test_effective_priority_boosted_when_past_deadline() {
    use crate::boost::PriorityBoostConfig;
    use std::time::Duration;

    // Standard boost configuration built from a 1 ms duration.
    let boost = PriorityBoostConfig::standard(Duration::from_millis(1));
    let lane = Lane::new(
        "test",
        LaneConfig::new(1, 4).with_priority_boost(boost),
        priorities::PROMPT,
    );

    let queued_cmd = Box::new(TestCommand::new(serde_json::json!(1)));
    let _rx = lane.enqueue(queued_cmd).await;

    // Sleep well past the 1 ms configured above.
    tokio::time::sleep(Duration::from_millis(10)).await;

    // The lane started at `priorities::PROMPT`; it must now report 0
    // (the same value as `priorities::SYSTEM`).
    assert_eq!(lane.effective_priority().await, 0);
}
1737}