Skip to main content

a3s_lane/
lib.rs

1//! # A3S Lane
2//!
3//! A priority-based command queue for async task scheduling.
4//!
5//! ## Core (always compiled)
6//!
7//! - Priority-based scheduling with per-lane concurrency control
8//! - Command timeout and retry policies (exponential backoff, fixed delay)
9//! - Dead letter queue for permanently failed commands
10//! - Persistent storage (pluggable `Storage` trait, `LocalStorage` included)
11//! - Event system for queue lifecycle notifications
12//! - Graceful shutdown with drain support
13//!
14//! ## Feature Flags
15//!
16//! | Feature | Default | Dependencies | Description |
17//! |---------|---------|-------------|-------------|
18//! | `metrics` | ✅ | — | `MetricsBackend` trait, `LocalMetrics`, latency histograms |
19//! | `monitoring` | ✅ | `metrics` | `AlertManager`, `QueueMonitor` with depth/latency thresholds |
20//! | `telemetry` | ✅ | `opentelemetry`, `dashmap` | OpenTelemetry spans and `OtelMetricsBackend` |
21//! | `distributed` | ✅ | `num_cpus` | Partitioning, rate limiting, priority boosting, `DistributedQueue` |
22//!
23//! ## Quick Start
24//!
25//! ```rust,ignore
26//! use a3s_lane::{QueueManagerBuilder, EventEmitter, Command, Result};
27//! use async_trait::async_trait;
28//!
29//! struct MyCommand { data: String }
30//!
31//! #[async_trait]
32//! impl Command for MyCommand {
33//!     async fn execute(&self) -> Result<serde_json::Value> {
34//!         Ok(serde_json::json!({"processed": self.data}))
35//!     }
36//!     fn command_type(&self) -> &str { "my_command" }
37//! }
38//!
39//! #[tokio::main]
40//! async fn main() -> Result<()> {
41//!     let manager = QueueManagerBuilder::new(EventEmitter::new(100))
42//!         .with_default_lanes()
43//!         .build()
44//!         .await?;
45//!
46//!     manager.start().await?;
47//!
48//!     let rx = manager.submit("query", Box::new(MyCommand { data: "hello".into() })).await?;
49//!     let result = rx.await??;
50//!     println!("Result: {}", result);
51//!     Ok(())
52//! }
53//! ```
54
55// Core modules (always compiled)
56pub mod config;
57pub mod dlq;
58pub mod error;
59pub mod event;
60pub mod manager;
61pub mod queue;
62pub mod retry;
63pub mod storage;
64
65// Feature-gated modules
66#[cfg(feature = "monitoring")]
67pub mod alerts;
68#[cfg(feature = "distributed")]
69pub mod boost;
70#[cfg(feature = "distributed")]
71pub mod distributed;
72#[cfg(feature = "metrics")]
73pub mod metrics;
74#[cfg(feature = "monitoring")]
75pub mod monitor;
76#[cfg(feature = "distributed")]
77pub mod partition;
78#[cfg(feature = "distributed")]
79pub mod ratelimit;
80#[cfg(feature = "telemetry")]
81pub mod telemetry;
82
83// Core re-exports
84pub use config::LaneConfig;
85pub use dlq::{DeadLetter, DeadLetterQueue};
86pub use error::{LaneError, Result};
87pub use event::{EventEmitter, EventPayload, EventStream, LaneEvent};
88pub use manager::{QueueManager, QueueManagerBuilder};
89pub use queue::{
90    lane_ids, priorities, Command, CommandId, CommandQueue, JsonCommand, Lane, LaneId, LaneStatus,
91    Priority,
92};
93pub use retry::RetryPolicy;
94pub use storage::{LocalStorage, Storage, StoredCommand, StoredDeadLetter};
95
96// Feature-gated re-exports
97#[cfg(feature = "monitoring")]
98pub use alerts::{Alert, AlertLevel, AlertManager, LatencyAlertConfig, QueueDepthAlertConfig};
99#[cfg(feature = "distributed")]
100pub use boost::{PriorityBoostConfig, PriorityBooster};
101#[cfg(feature = "distributed")]
102pub use distributed::{
103    CommandEnvelope, CommandResult, DistributedQueue, LocalDistributedQueue, WorkerId, WorkerPool,
104};
105#[cfg(feature = "metrics")]
106pub use metrics::{
107    metric_names, HistogramPercentiles, HistogramStats, LocalMetrics, MetricsBackend,
108    MetricsSnapshot, QueueMetrics,
109};
110#[cfg(feature = "monitoring")]
111pub use monitor::{MonitorConfig, QueueMonitor};
112#[cfg(feature = "distributed")]
113pub use partition::{
114    CustomPartitioner, HashPartitioner, PartitionConfig, PartitionId, PartitionStrategy,
115    Partitioner, RoundRobinPartitioner,
116};
117#[cfg(feature = "distributed")]
118pub use ratelimit::{RateLimitConfig, RateLimiter, SlidingWindowLimiter, TokenBucketLimiter};
119#[cfg(feature = "telemetry")]
120pub use telemetry::OtelMetricsBackend;
121
122use serde::{Deserialize, Serialize};
123use std::collections::HashMap;
124
125/// Queue statistics snapshot
126///
127/// Provides a point-in-time view of the queue state across all lanes.
128///
129/// # Fields
130///
131/// * `total_pending` - Total number of commands waiting to be executed across all lanes
132/// * `total_active` - Total number of commands currently executing across all lanes
133/// * `dead_letter_count` - Total number of permanently failed commands in the dead letter queue
134/// * `lanes` - Per-lane status information (pending, active, min/max concurrency)
135///
136/// # Example
137///
138/// ```rust,ignore
139/// let stats = manager.stats().await?;
140/// println!("Queue has {} pending and {} active commands",
141///     stats.total_pending, stats.total_active);
142///
143/// for (lane_id, status) in &stats.lanes {
144///     println!("{}: {} pending, {} active (max: {})",
145///         lane_id, status.pending, status.active, status.max);
146/// }
147/// ```
148#[derive(Debug, Clone, Default, Serialize, Deserialize)]
149pub struct QueueStats {
150    pub total_pending: usize,
151    pub total_active: usize,
152    pub dead_letter_count: usize,
153    pub lanes: HashMap<String, LaneStatus>,
154}
155
156#[cfg(test)]
157mod tests {
158    use super::*;
159
160    #[tokio::test]
161    async fn test_queue_manager_builder() {
162        let emitter = EventEmitter::new(100);
163        let manager = QueueManagerBuilder::new(emitter)
164            .with_default_lanes()
165            .build()
166            .await
167            .unwrap();
168
169        let stats = manager.stats().await.unwrap();
170        assert_eq!(stats.lanes.len(), 6);
171    }
172
173    #[test]
174    fn test_queue_stats_default() {
175        let stats = QueueStats::default();
176        assert_eq!(stats.total_pending, 0);
177        assert_eq!(stats.total_active, 0);
178        assert!(stats.lanes.is_empty());
179    }
180
181    #[test]
182    fn test_queue_stats_serialization() {
183        let mut lanes = HashMap::new();
184        lanes.insert(
185            "query".to_string(),
186            LaneStatus {
187                pending: 5,
188                active: 2,
189                min: 1,
190                max: 10,
191            },
192        );
193
194        let stats = QueueStats {
195            total_pending: 5,
196            total_active: 2,
197            dead_letter_count: 0,
198            lanes,
199        };
200
201        let json = serde_json::to_string(&stats).unwrap();
202        let parsed: QueueStats = serde_json::from_str(&json).unwrap();
203        assert_eq!(parsed.total_pending, 5);
204        assert_eq!(parsed.total_active, 2);
205    }
206}