A3S Lane
Overview
A3S Lane provides a lane-based priority command queue designed for managing concurrent async operations with different priority levels. In the A3S ecosystem, each a3s-code agent session gets its own a3s-lane instance — ensuring control commands (pause/cancel) always preempt LLM generation tasks. Commands are organized into lanes, each with configurable concurrency limits and priority.
Basic Usage
use a3s_lane::{Command, EventEmitter, QueueManagerBuilder};
use async_trait::async_trait;

// Define a command (trait signature illustrative)
struct MyCommand;

#[async_trait]
impl Command for MyCommand {
    async fn execute(&self) -> a3s_lane::Result<String> {
        Ok("done".into())
    }
}

// Build a manager with the 6 default lanes, start it, and submit
let emitter = EventEmitter::new();
let manager = QueueManagerBuilder::new(emitter)
    .with_default_lanes()
    .build()
    .await?;
manager.start().await;
let result = manager.submit("query", Box::new(MyCommand)).await?;
Features
- Priority-based Scheduling: Commands execute based on lane priority (lower = higher priority)
- Concurrency Control: Per-lane min/max concurrency limits
- 6 Built-in Lanes: system, control, query, session, skill, prompt
- Command Timeout: Configurable timeout per lane with automatic cancellation
- Retry Policies: Exponential backoff, fixed delay, or custom retry strategies
- Dead Letter Queue: Capture permanently failed commands for inspection
- Persistent Storage: Optional pluggable storage backend (LocalStorage included)
- Graceful Shutdown: Drain pending commands before shutdown
- Multi-core Parallelism: Automatic CPU core detection and parallel processing
- Queue Partitioning: Distribute commands across workers for scalability
- Rate Limiting: Token bucket and sliding window rate limiters per lane
- Priority Boosting: Deadline-based automatic priority adjustment
- Distributed Queue Support: Pluggable interface for multi-machine processing
- Metrics Collection: Local in-memory metrics with pluggable backend support
- Latency Histograms: Track command execution and wait time with percentiles (p50, p90, p95, p99)
- Queue Depth Alerts: Configurable warning and critical thresholds
- Latency Alerts: Monitor and alert on command execution latency
- Event System: Subscribe to queue events for monitoring
- Health Monitoring: Track queue depth and active command counts
- Builder Pattern: Flexible queue configuration
- OpenTelemetry Integration: Native OTLP metrics export via OtelMetricsBackend
- Python SDK: pip install a3s-lane — async queue management from Python
- Node.js SDK: npm install @a3s-lab/lane — native bindings for Node.js
- Async-first: Built on Tokio for high-performance async operations
Quality Metrics
Test Coverage
230 comprehensive unit tests with 96.48% line coverage:
| Module | Lines | Coverage | Functions | Coverage |
|---|---|---|---|---|
| dlq.rs | 157 | 100.00% | 34 | 100.00% |
| error.rs | 34 | 100.00% | 8 | 100.00% |
| lib.rs | 39 | 100.00% | 4 | 100.00% |
| retry.rs | 111 | 100.00% | 16 | 100.00% |
| manager.rs | 572 | 99.65% | 81 | 100.00% |
| queue.rs | 822 | 98.91% | 120 | 99.17% |
| monitor.rs | 325 | 98.46% | 45 | 97.78% |
| event.rs | 169 | 97.63% | 29 | 100.00% |
| metrics.rs | 453 | 96.69% | 82 | 93.90% |
| alerts.rs | 432 | 96.53% | 79 | 93.67% |
| boost.rs | 184 | 95.11% | 26 | 88.46% |
| storage.rs | 193 | 94.82% | 30 | 83.33% |
| distributed.rs | 227 | 92.07% | 34 | 82.35% |
| config.rs | 118 | 88.98% | 18 | 88.89% |
| ratelimit.rs | 257 | 88.33% | 53 | 86.79% |
| partition.rs | 143 | 86.71% | 28 | 82.14% |
| TOTAL | 4236 | 96.48% | 687 | 94.18% |
Run the coverage report (command illustrative; requires cargo-llvm-cov):
cargo llvm-cov --html
Performance Benchmarks
Criterion-based benchmarks measure throughput, concurrency scaling, and overhead:
# Run all benchmarks
cargo bench

# Run a specific benchmark
cargo bench --bench queue_benchmark
Benchmark suites:
- Throughput: Measures commands/second for 1K, 10K, 50K, and 100K command batches
- Concurrency scaling: Tests performance with 1, 2, 4, 8, and 16 concurrent lanes
- Priority scheduling: Compares overhead of priority-based vs FIFO scheduling
- Metrics overhead: Measures performance impact of metrics collection
Results are saved to target/criterion/ with detailed HTML reports.
Architecture
Lane Priority Model
| Lane | Priority | Default Max Concurrency | Use Case |
|---|---|---|---|
| system | 0 (highest) | 5 | System-level operations |
| control | 1 | 3 | Control commands (pause, resume, cancel) |
| query | 2 | 10 | Read-only queries |
| session | 3 | 5 | Session management |
| skill | 4 | 3 | Skill/tool execution |
| prompt | 5 (lowest) | 2 | LLM prompt processing |
Components
┌─────────────────────────────────────────┐
│ QueueManager │
│ ┌─────────────────────────────────┐ │
│ │ CommandQueue │ │
│ │ ┌───────┐ ┌───────┐ ┌───────┐ │ │
│ │ │system │ │control│ │ query │ │ │
│ │ │ P=0 │ │ P=1 │ │ P=2 │ │ │
│ │ └───────┘ └───────┘ └───────┘ │ │
│ │ ┌───────┐ ┌───────┐ ┌───────┐ │ │
│ │ │session│ │ skill │ │prompt │ │ │
│ │ │ P=3 │ │ P=4 │ │ P=5 │ │ │
│ │ └───────┘ └───────┘ └───────┘ │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘
↓ schedule_next()
Priority-based command selection
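The selection rule can be pictured with a minimal std-only sketch (illustrative only, not the crate's actual internals): among lanes that have queued commands and spare concurrency, the lowest priority number wins.

```rust
// Illustrative sketch of priority-based lane selection: pick the
// highest-priority lane (lowest number) that has queued work and has
// not yet hit its max concurrency.
struct Lane {
    id: &'static str,
    priority: u8,
    queued: usize,
    active: usize,
    max_concurrency: usize,
}

fn schedule_next(lanes: &[Lane]) -> Option<&'static str> {
    lanes
        .iter()
        .filter(|l| l.queued > 0 && l.active < l.max_concurrency)
        .min_by_key(|l| l.priority)
        .map(|l| l.id)
}

fn main() {
    let lanes = vec![
        // control is saturated (active == max), so it is skipped
        Lane { id: "control", priority: 1, queued: 2, active: 3, max_concurrency: 3 },
        Lane { id: "query", priority: 2, queued: 5, active: 1, max_concurrency: 10 },
        Lane { id: "prompt", priority: 5, queued: 1, active: 0, max_concurrency: 2 },
    ];
    println!("{:?}", schedule_next(&lanes)); // query wins despite control's higher priority
}
```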
Quick Start
Installation
Add to your Cargo.toml:
[dependencies]
a3s-lane = "0.2"
Custom Lanes
use a3s_lane::{EventEmitter, LaneConfig, QueueManagerBuilder};

// Lane names and limits are illustrative
let emitter = EventEmitter::new();
let manager = QueueManagerBuilder::new(emitter)
    .with_lane("critical", LaneConfig::new(1, 5), 0)
    .with_lane("default", LaneConfig::new(1, 10), 1)
    .with_lane("background", LaneConfig::new(0, 2), 2)
    .build()
    .await?;
Queue Monitoring
use a3s_lane::{MonitorConfig, QueueMonitor};
use std::time::Duration;
use std::sync::Arc;

// Field names illustrative; see MonitorConfig docs
let config = MonitorConfig {
    interval: Duration::from_secs(5),
    ..Default::default()
};
let monitor = Arc::new(QueueMonitor::with_config(queue, config));

// Start monitoring (runs in background)
monitor.clone().start().await;

// Get current stats
let stats = monitor.stats().await;
println!("{stats:?}");
Event Subscription
use a3s_lane::{EventEmitter, LaneEvent};

let emitter = EventEmitter::new();

// Subscribe to all events
let mut receiver = emitter.subscribe();

// Subscribe with a filter (predicate illustrative)
let mut filtered = emitter.subscribe_filtered(|event: &LaneEvent| true);

// In another task, consume events
tokio::spawn(async move {
    while let Ok(event) = receiver.recv().await {
        println!("event: {event:?}");
    }
});
Reliability Features
Command Timeout
use a3s_lane::{EventEmitter, LaneConfig, QueueManagerBuilder};
use std::time::Duration;

let emitter = EventEmitter::new();

// Configure a lane with a 30-second timeout
let config = LaneConfig::new(1, 5)
    .with_timeout(Duration::from_secs(30));

let manager = QueueManagerBuilder::new(emitter)
    .with_lane("slow", config, 3) // lane name illustrative
    .build()
    .await?;
Retry Policies
use a3s_lane::{EventEmitter, LaneConfig, QueueManagerBuilder, RetryPolicy};
use std::time::Duration;

let emitter = EventEmitter::new();

// Exponential backoff: 3 retries with 100ms initial delay, 2x multiplier
let retry_policy = RetryPolicy::exponential(3);

// Or fixed delay: 5 retries with 1 second delay
let retry_policy = RetryPolicy::fixed(5, Duration::from_secs(1));

let config = LaneConfig::new(1, 5)
    .with_retry_policy(retry_policy);

let manager = QueueManagerBuilder::new(emitter)
    .with_lane("flaky", config, 2) // lane name illustrative
    .build()
    .await?;
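The exponential policy's delay schedule (100ms initial delay, 2x multiplier, 30s cap, per the RetryPolicy API reference below) works out as in this std-only sketch; the crate computes this internally, so the function is illustrative:

```rust
// Sketch of the documented exponential backoff schedule:
// delay(n) = 100ms * 2^n, capped at 30 seconds.
fn backoff_delay_ms(attempt: u32) -> u64 {
    const INITIAL_MS: u64 = 100;
    const MAX_MS: u64 = 30_000;
    INITIAL_MS.saturating_mul(2u64.saturating_pow(attempt)).min(MAX_MS)
}

fn main() {
    // 100, 200, 400, 800, ... capped at 30000
    for attempt in 0..10 {
        println!("retry {attempt}: wait {}ms", backoff_delay_ms(attempt));
    }
}
```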
Dead Letter Queue
use a3s_lane::{CommandQueue, DeadLetterQueue, EventEmitter};

let emitter = EventEmitter::new();
let dlq = DeadLetterQueue::new(1000); // Max 1000 dead letters

// Create a queue with the DLQ attached (constructor arguments illustrative)
let queue = CommandQueue::with_dlq(emitter, dlq.clone());

// Later: inspect failed commands
let dead_letters = dlq.list().await;
for letter in dead_letters {
    println!("failed: {letter:?}");
}
Graceful Shutdown
use a3s_lane::QueueManager;
use std::time::Duration;

// Initiate shutdown (stop accepting new commands)
manager.shutdown().await;

// Wait for pending commands to complete (with 30s timeout)
manager.drain(Duration::from_secs(30)).await?;
println!("all pending commands drained");
Scalability Features
Multi-core Parallelism
use a3s_lane::{LocalDistributedQueue, PartitionConfig};

// Automatically use all CPU cores
let partition_config = PartitionConfig::auto();
let distributed_queue = LocalDistributedQueue::new(partition_config);
println!("partitions: {}", distributed_queue.num_partitions());
Queue Partitioning
use a3s_lane::PartitionConfig;

// Round-robin partitioning across 8 workers
let config = PartitionConfig::round_robin(8);

// Hash-based partitioning (same command types go to the same partition)
let config = PartitionConfig::hash(8);
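The two strategies can be sketched in std-only Rust (illustrative; the crate's actual hashing and counter handling may differ): round-robin cycles a shared counter, while hash partitioning keys on the command type so equal keys always land in the same partition.

```rust
// Illustrative partition-index selection for round-robin and hash strategies.
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::atomic::{AtomicUsize, Ordering};

static COUNTER: AtomicUsize = AtomicUsize::new(0);

// Round-robin: each call advances to the next partition.
fn round_robin_partition(n: usize) -> usize {
    COUNTER.fetch_add(1, Ordering::Relaxed) % n
}

// Hash: the same key always maps to the same partition.
fn hash_partition(key: &str, n: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() as usize) % n
}

fn main() {
    assert_eq!(hash_partition("prompt", 8), hash_partition("prompt", 8));
    println!(
        "rr: {} {} {}",
        round_robin_partition(3),
        round_robin_partition(3),
        round_robin_partition(3)
    );
}
```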
Rate Limiting
use a3s_lane::{LaneConfig, RateLimitConfig};

// Limit to 100 commands per second
let rate_limit = RateLimitConfig::per_second(100);
let config = LaneConfig::new(1, 5)
    .with_rate_limit(rate_limit);

// Limit to 1000 commands per minute
let rate_limit = RateLimitConfig::per_minute(1000);
let config = LaneConfig::new(1, 5)
    .with_rate_limit(rate_limit);
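The idea behind the token-bucket algorithm mentioned in Features can be shown with a deterministic std-only sketch (the crate's limiter is more sophisticated; this is not its implementation):

```rust
// Minimal token-bucket sketch: tokens refill continuously up to a
// capacity; each command consumes one token or is rejected.
struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_ms: f64,
    last_ms: u64,
}

impl TokenBucket {
    fn per_second(n: u32) -> Self {
        TokenBucket {
            capacity: n as f64,
            tokens: n as f64,
            refill_per_ms: n as f64 / 1000.0,
            last_ms: 0,
        }
    }

    // `now_ms` is passed in explicitly so the sketch is deterministic.
    fn try_acquire(&mut self, now_ms: u64) -> bool {
        let elapsed = (now_ms - self.last_ms) as f64;
        self.last_ms = now_ms;
        self.tokens = (self.tokens + elapsed * self.refill_per_ms).min(self.capacity);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut bucket = TokenBucket::per_second(2);
    assert!(bucket.try_acquire(0));   // 2 tokens -> 1
    assert!(bucket.try_acquire(0));   // 1 token  -> 0
    assert!(!bucket.try_acquire(0));  // empty: rejected
    assert!(bucket.try_acquire(500)); // 500ms refills one token
    println!("token bucket behaves as expected");
}
```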
Priority Boosting
use a3s_lane::{LaneConfig, PriorityBoostConfig};
use std::time::Duration;

// Standard boost: priority increases as the deadline approaches
let boost = PriorityBoostConfig::standard(Duration::from_secs(60));
let config = LaneConfig::new(1, 5)
    .with_priority_boost(boost);

// Aggressive boost: faster priority escalation
let boost = PriorityBoostConfig::aggressive(Duration::from_secs(60));
let config = LaneConfig::new(1, 5)
    .with_priority_boost(boost);
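Deadline-based boosting can be pictured with a std-only sketch using the documented "standard" intervals (25%, 50%, 75% of the deadline; boost amounts here are illustrative, not the crate's actual values):

```rust
// Sketch of deadline-based priority boosting: as elapsed time crosses
// 25%, 50%, and 75% of the deadline, the effective priority number
// drops (lower = higher priority). Boost magnitudes are illustrative.
fn boosted_priority(base: u8, elapsed_ms: u64, deadline_ms: u64) -> u8 {
    let fraction = elapsed_ms as f64 / deadline_ms as f64;
    let boost = if fraction >= 0.75 {
        3
    } else if fraction >= 0.50 {
        2
    } else if fraction >= 0.25 {
        1
    } else {
        0
    };
    base.saturating_sub(boost)
}

fn main() {
    // A prompt-lane command (priority 5) with a 60s deadline:
    println!("{}", boosted_priority(5, 10_000, 60_000)); // early: still 5
    println!("{}", boosted_priority(5, 50_000, 60_000)); // past 75%: boosted to 2
}
```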
Custom Distributed Queue
use a3s_lane::DistributedQueue;
use async_trait::async_trait;

// Implement the DistributedQueue trait for your transport (skeleton;
// method signatures illustrative — see the API reference below):
struct MyDistributedQueue;

#[async_trait]
impl DistributedQueue for MyDistributedQueue {
    // enqueue(envelope), dequeue(partition_id), complete(result),
    // num_partitions(), worker_id()
}
Persistent Storage
use a3s_lane::{EventEmitter, LaneConfig, LocalStorage, QueueManagerBuilder};
use std::path::PathBuf;
use std::sync::Arc;

let emitter = EventEmitter::new();
let storage_dir = PathBuf::from("./lane-storage"); // path illustrative
let storage = Arc::new(LocalStorage::new(storage_dir));

// Create a queue with persistent storage
let manager = QueueManagerBuilder::new(emitter)
    .with_storage(storage.clone())
    .with_lane("durable", LaneConfig::new(1, 5), 2) // lane name illustrative
    .build()
    .await?;

// Commands are automatically persisted to disk.
// On restart, you can inspect stored commands:
let stored_commands = storage.load_commands().await?;
for cmd in stored_commands {
    println!("pending: {cmd:?}");
}
Custom Storage Backend:
use a3s_lane::Storage;
use async_trait::async_trait;

// Implement the Storage trait for your backend (skeleton; see the
// Storage trait API reference below for the full method list):
struct MyStorage;

#[async_trait]
impl Storage for MyStorage {
    // save_command(), load_commands(), remove_command(),
    // save_dead_letter(), load_dead_letters(),
    // clear_dead_letters(), clear_all()
}
Observability Features
Metrics Collection
use a3s_lane::{EventEmitter, LaneConfig, QueueManagerBuilder, QueueMetrics};

let emitter = EventEmitter::new();

// Create a local metrics collector
let metrics = QueueMetrics::local();

// Build a queue manager with metrics
let manager = QueueManagerBuilder::new(emitter)
    .with_metrics(metrics.clone())
    .with_lane("query", LaneConfig::new(1, 10), 2)
    .build()
    .await?;

// Record metrics manually (or integrate into command execution)
metrics.record_submit("query").await;
metrics.record_complete("query", 150).await; // 150ms latency

// Get a metrics snapshot
let snapshot = metrics.snapshot().await;
println!("{snapshot:?}");

// Get latency histogram stats (key and field names illustrative)
if let Some(stats) = snapshot.histograms.get("query") {
    println!("p50={} p99={}", stats.p50, stats.p99);
}
Custom Metrics Backend (Prometheus/OpenTelemetry):
use a3s_lane::MetricsBackend;
use async_trait::async_trait;

// Implement MetricsBackend to export to Prometheus or OpenTelemetry
// (skeleton; see the MetricsBackend trait API reference below):
struct MyMetricsBackend;

#[async_trait]
impl MetricsBackend for MyMetricsBackend {
    // increment_counter(), set_gauge(), record_histogram(),
    // get_counter(), get_gauge(), get_histogram_stats(),
    // reset(), snapshot()
}
Queue Depth Alerts
use a3s_lane::{AlertManager, EventEmitter, LaneConfig, QueueManagerBuilder};
use std::sync::Arc;

let emitter = EventEmitter::new();

// Create an alert manager with queue depth thresholds
// (warning at 100, critical at 500 — thresholds illustrative)
let alerts = Arc::new(AlertManager::with_queue_depth_alerts(100, 500));

// Add an alert callback (callback signature illustrative)
alerts.add_callback(|alert| println!("alert: {alert:?}")).await;

// Build a queue manager with alerts
let manager = QueueManagerBuilder::new(emitter)
    .with_alerts(alerts.clone())
    .with_lane("query", LaneConfig::new(1, 10), 2)
    .build()
    .await?;

// Check queue depth (triggers alerts if thresholds are exceeded)
alerts.check_queue_depth("query", 150).await; // Triggers warning
alerts.check_queue_depth("query", 600).await; // Triggers critical
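The warning/critical decision can be sketched as a simple threshold classification (illustrative; the crate's exact comparison semantics may differ):

```rust
// Sketch of the classification behind queue-depth alert checks:
// at or above critical -> critical, at or above warning -> warning,
// otherwise no alert.
#[derive(Debug, PartialEq)]
enum AlertLevel {
    None,
    Warning,
    Critical,
}

fn classify_depth(depth: usize, warn: usize, crit: usize) -> AlertLevel {
    if depth >= crit {
        AlertLevel::Critical
    } else if depth >= warn {
        AlertLevel::Warning
    } else {
        AlertLevel::None
    }
}

fn main() {
    println!("{:?}", classify_depth(150, 100, 500)); // Warning
    println!("{:?}", classify_depth(600, 100, 500)); // Critical
}
```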
Latency Alerts
use a3s_lane::AlertManager;

// Create an alert manager with latency thresholds
// (warning at 1s, critical at 5s — thresholds illustrative)
let alerts = AlertManager::with_latency_alerts(1_000, 5_000);

// Check latency after command execution
alerts.check_latency("prompt", 2_000).await; // Triggers warning
alerts.check_latency("prompt", 6_000).await; // Triggers critical
API Reference
QueueManagerBuilder
| Method | Description |
|---|---|
| new(emitter) | Create a new builder |
| with_lane(id, config, priority) | Add a custom lane |
| with_default_lanes() | Add the 6 default lanes |
| with_storage(storage) | Add persistent storage backend |
| with_dlq(size) | Add dead letter queue with max size |
| with_metrics(metrics) | Add metrics collection |
| with_alerts(alerts) | Add alert manager |
| build() | Build the QueueManager |
QueueManager
| Method | Description |
|---|---|
| start() | Start the scheduler |
| submit(lane_id, command) | Submit a command to a lane |
| stats() | Get queue statistics |
| queue() | Get underlying CommandQueue |
| metrics() | Get metrics collector (if configured) |
| alerts() | Get alert manager (if configured) |
| shutdown() | Initiate graceful shutdown (stop accepting new commands) |
| drain(timeout) | Wait for pending commands to complete with timeout |
| is_shutting_down() | Check if shutdown is in progress |
LaneConfig
| Method | Description |
|---|---|
| new(min, max) | Create config with min/max concurrency |
| with_timeout(duration) | Set command timeout for this lane |
| with_retry_policy(policy) | Set retry policy for this lane |
| with_rate_limit(config) | Set rate limiting for this lane |
| with_priority_boost(config) | Set priority boosting for this lane |
RetryPolicy
| Method | Description |
|---|---|
| exponential(max_retries) | Create exponential backoff policy (100ms initial, 30s max, 2x multiplier) |
| fixed(max_retries, delay) | Create fixed delay policy |
| none() | No retries |
DeadLetterQueue
| Method | Description |
|---|---|
| new(max_size) | Create DLQ with maximum size |
| push(letter) | Add a dead letter |
| pop() | Remove and return oldest dead letter |
| list() | Get all dead letters |
| clear() | Remove all dead letters |
| len() | Get current count |
Storage Trait
| Method | Description |
|---|---|
| save_command(cmd) | Persist a command to storage |
| load_commands() | Load all pending commands |
| remove_command(id) | Remove a completed command |
| save_dead_letter(letter) | Persist a dead letter |
| load_dead_letters() | Load all dead letters |
| clear_dead_letters() | Clear all dead letters |
| clear_all() | Clear all storage |
LocalStorage
| Method | Description |
|---|---|
new(path) |
Create local filesystem storage at path |
PartitionConfig
| Method | Description |
|---|---|
| auto() | Auto-detect CPU cores for optimal parallelism |
| round_robin(n) | Round-robin distribution across n partitions |
| hash(n) | Hash-based distribution across n partitions |
| none() | No partitioning (single partition) |
RateLimitConfig
| Method | Description |
|---|---|
| per_second(n) | Limit to n commands per second |
| per_minute(n) | Limit to n commands per minute |
| per_hour(n) | Limit to n commands per hour |
| unlimited() | No rate limiting |
PriorityBoostConfig
| Method | Description |
|---|---|
| standard(deadline) | Standard boost intervals (25%, 50%, 75%) |
| aggressive(deadline) | Aggressive boost intervals |
| disabled() | No priority boosting |
| with_boost(time, boost) | Add custom boost interval |
DistributedQueue Trait
| Method | Description |
|---|---|
| enqueue(envelope) | Enqueue command for processing |
| dequeue(partition_id) | Dequeue command from partition |
| complete(result) | Report command completion |
| num_partitions() | Get number of partitions |
| worker_id() | Get worker identifier |
LocalDistributedQueue
| Method | Description |
|---|---|
| auto() | Create with auto-detected CPU cores |
| new(config) | Create with custom partition config |
QueueMetrics
| Method | Description |
|---|---|
| local() | Create with local in-memory backend |
| new(backend) | Create with custom metrics backend |
| record_submit(lane_id) | Record command submission |
| record_complete(lane_id, latency_ms) | Record command completion with latency |
| record_failure(lane_id) | Record command failure |
| record_timeout(lane_id) | Record command timeout |
| record_retry(lane_id) | Record command retry |
| record_dead_letter(lane_id) | Record command sent to DLQ |
| set_queue_depth(lane_id, depth) | Update queue depth gauge |
| set_active_commands(lane_id, active) | Update active commands gauge |
| record_wait_time(lane_id, wait_time_ms) | Record command wait time |
| snapshot() | Get snapshot of all metrics |
| reset() | Reset all metrics |
MetricsBackend Trait
| Method | Description |
|---|---|
| increment_counter(name, value) | Increment a counter metric |
| set_gauge(name, value) | Set a gauge metric value |
| record_histogram(name, value) | Record histogram observation |
| get_counter(name) | Get current counter value |
| get_gauge(name) | Get current gauge value |
| get_histogram_stats(name) | Get histogram statistics |
| reset() | Reset all metrics |
| snapshot() | Export all metrics as snapshot |
AlertManager
| Method | Description |
|---|---|
| new() | Create with alerts disabled |
| with_queue_depth_alerts(warn, crit) | Create with queue depth alerts |
| with_latency_alerts(warn_ms, crit_ms) | Create with latency alerts |
| set_queue_depth_config(config) | Update queue depth alert config |
| set_latency_config(config) | Update latency alert config |
| add_callback(callback) | Add alert callback function |
| check_queue_depth(lane_id, depth) | Check queue depth and trigger alerts |
| check_latency(lane_id, latency_ms) | Check latency and trigger alerts |
QueueMonitor
| Method | Description |
|---|---|
| new(queue) | Create with default config |
| with_config(queue, config) | Create with custom config |
| start() | Start background monitoring |
| stats() | Get current statistics |
Command Trait
Commands are user-defined types implementing the async Command trait (signature illustrative; see Basic Usage above):
use async_trait::async_trait;

#[async_trait]
pub trait Command: Send + Sync {
    async fn execute(&self) -> a3s_lane::Result<String>;
}
SDK
Python
# API shape is illustrative; see the Python SDK docs for exact names.
import a3s_lane

# Create and start
lane = a3s_lane.Lane()
await lane.start()

# Submit a command
result = await lane.submit("query", {"action": "fetch"})

# Get stats
stats = await lane.stats()
print(stats)

# Shutdown
await lane.shutdown()
Node.js
// API shape is illustrative; see the Node.js SDK docs for exact names.
const { Lane } = require("@a3s-lab/lane");

// Create and start
const lane = new Lane();
await lane.start();

// Submit a command
const result = await lane.submit("query", { action: "fetch" });

// Get stats
const stats = await lane.stats();
console.log(stats);

// Shutdown
await lane.shutdown();
Development
Dependencies
| Dependency | Install | Purpose |
|---|---|---|
| cargo-llvm-cov | cargo install cargo-llvm-cov | Code coverage (optional) |
| lcov | brew install lcov / apt install lcov | Coverage report formatting (optional) |
| cargo-watch | cargo install cargo-watch | File watching (optional) |
Build Commands
# Build
cargo build

# Test (with colored progress display)
cargo test

# Test subsets
cargo test <module_name>

# Coverage (requires cargo-llvm-cov + lcov)
cargo llvm-cov --html

# Format & Lint
cargo fmt
cargo clippy

# Utilities: see the included justfile for the full recipe list
Project Structure
lane/
├── Cargo.toml
├── justfile
├── README.md
├── CLAUDE.md
├── .github/ # CI/CD workflows
│ ├── setup-workspace.sh
│ └── workflows/
│ ├── ci.yml
│ ├── release.yml
│ ├── publish-node.yml
│ └── publish-python.yml
├── sdk/ # Language SDKs
│ ├── node/ # Node.js SDK (napi-rs)
│ │ ├── Cargo.toml
│ │ ├── package.json
│ │ ├── src/
│ │ └── npm/ # Per-platform packages
│ └── python/ # Python SDK (PyO3)
│ ├── Cargo.toml
│ ├── pyproject.toml
│ └── src/
├── examples/ # Comprehensive feature demonstrations
│ ├── basic_usage.rs
│ ├── reliability.rs
│ ├── observability.rs
│ └── scalability.rs
├── benches/ # Performance benchmarks
│ └── queue_benchmark.rs
└── src/
├── lib.rs # Library entry point with module docs
├── config.rs # LaneConfig with timeout, retry, rate limit
├── error.rs # LaneError and Result types
├── event.rs # EventEmitter, LaneEvent
├── queue.rs # Lane, CommandQueue, Command trait
├── manager.rs # QueueManager, QueueManagerBuilder
├── monitor.rs # QueueMonitor, MonitorConfig
├── retry.rs # RetryPolicy (Phase 2)
├── dlq.rs # DeadLetterQueue (Phase 2)
├── storage.rs # Storage trait, LocalStorage (Phase 2)
├── partition.rs # Partitioning strategies (Phase 3)
├── distributed.rs # Distributed queue support (Phase 3)
├── ratelimit.rs # Rate limiting (Phase 3)
├── boost.rs # Priority boosting (Phase 3)
├── metrics.rs # Metrics collection (Phase 4)
├── alerts.rs # Alert management (Phase 4)
└── telemetry.rs # OpenTelemetry integration (Phase 5)
A3S Ecosystem
A3S Lane is the per-session scheduling layer of the A3S Agent OS. Each a3s-code agent session gets its own a3s-lane instance, ensuring control commands always preempt LLM generation tasks.
┌──────────────────────────────────────────────────────────┐
│ A3S Agent OS │
│ │
│ External: a3s-gateway (OS external gateway) │
│ │ │
│ Sandbox: a3s-box (MicroVM isolation) │
│ │ │
│ Application: SafeClaw (OS main app, multi-agent) │
│ │ │
│ Execution: a3s-code (AI agent instances) │
│ │ │
│ Scheduling: a3s-lane (per-session priority queue) │
│ ▲ │
│ │ You are here │
└──────────────────────────────────────────────────────────┘
| Project | Package | Relationship |
|---|---|---|
| code | a3s-code | Each a3s-code session creates its own SessionLaneQueue wrapping a3s-lane |
| SafeClaw | safeclaw | Coordinates multiple a3s-code sessions, each with independent a3s-lane instances |
Standalone Usage: a3s-lane also works independently for any priority-based async task scheduling:
- Web servers with request prioritization
- Background job processors
- Rate-limited API clients
- Any system needing lane-based concurrency control
Roadmap
Phase 1: Core ✅ (Complete)
- Priority-based lane scheduling
- Configurable concurrency per lane
- Event system for monitoring
- Queue manager with builder pattern
- Health monitoring with thresholds
- Async-first with Tokio
- 230 comprehensive tests
Phase 2: Reliability ✅ (Complete)
- Persistent queue storage (LocalStorage + pluggable Storage trait)
- Command timeout support with automatic cancellation
- Command retries with exponential backoff and fixed delay strategies
- Dead letter queue for permanently failed commands
- Graceful shutdown with drain and timeout
Phase 3: Scalability ✅ (Complete)
- Queue partitioning (round-robin, hash-based, custom strategies)
- Multi-core parallelism (automatic CPU core detection)
- Distributed queue support (pluggable DistributedQueue trait)
- Priority boosting (deadline-based automatic priority adjustment)
- Rate limiting per lane (token bucket and sliding window algorithms)
Phase 4: Observability ✅ (Complete)
- Metrics collection (LocalMetrics + pluggable MetricsBackend trait)
- Latency histograms with percentiles (p50, p90, p95, p99)
- Queue depth alerts with configurable warning/critical thresholds
- Latency alerts with warning and critical levels
- Prometheus/OpenTelemetry ready (implement MetricsBackend trait)
- Alert callbacks for custom notification handling
Documentation & Examples ✅ (Complete)
- Comprehensive README with all features documented
- 4 working examples demonstrating all major features
- Performance benchmarks with Criterion
- Detailed API reference
- Inline documentation for all public APIs
Phase 5: OpenTelemetry Integration ✅ (Complete)
- OTLP Metrics Export: OtelMetricsBackend implementing MetricsBackend for OpenTelemetry
  - Latency histograms (p50/p90/p95/p99) → OTLP Histogram
  - Queue depth gauges → OTLP Gauge
  - Command throughput counters → OTLP Counter
- Distributed Tracing: Propagate trace context through command lifecycle
- Prometheus Exporter: Via OpenTelemetry Prometheus bridge
- Alert Integration: Forward alerts to external systems via callbacks
Phase 6: SDK & CI/CD ✅ (Complete)
- Python SDK (PyO3/maturin): pip install a3s-lane — async queue management from Python
- Node.js SDK (napi-rs): npm install @a3s-lab/lane — native bindings for Node.js
- Multi-platform builds: 7 platforms (macOS arm64/x64, Linux x64/arm64 gnu/musl, Windows x64)
- GitHub Actions CI/CD: Automated publishing to crates.io, PyPI, and npm on tag push
- Per-platform npm packages: @a3s-lab/lane-{platform} with optionalDependencies
Examples
The examples/ directory contains comprehensive demonstrations of all features:
Basic Usage (examples/basic_usage.rs)
Demonstrates:
- Creating a queue manager with default lanes
- Submitting commands and handling results
- Checking queue statistics
- Graceful shutdown
Reliability Features (examples/reliability.rs)
Demonstrates:
- Command timeout configuration
- Retry policies with exponential backoff
- Dead letter queue for failed commands
- Graceful shutdown with drain
Observability Features (examples/observability.rs)
Demonstrates:
- Metrics collection with local backend
- Latency histogram tracking
- Queue depth and latency alerts
- Alert callbacks for notifications
Scalability Features (examples/scalability.rs)
Demonstrates:
- Rate limiting configuration
- Priority boosting for urgent commands
- Distributed queue with partitioning
- Multi-core parallelism
Benchmarks
Performance benchmarks are available using Criterion:
# Run all benchmarks
cargo bench

# Run a specific benchmark
cargo bench --bench queue_benchmark

# View HTML reports
open target/criterion/report/index.html
Benchmark suites included:
- Throughput Benchmarking
  - Tests: 10, 100, 1000 commands
  - Measures: Full lifecycle (create, start, execute, shutdown, drain)
  - Purpose: Understand end-to-end performance including overhead
- Concurrency Scaling
  - Tests: 1, 5, 10, 20 concurrent lanes
  - Measures: 100 commands with simulated work (100μs each)
  - Purpose: Evaluate multi-core scaling efficiency
- Priority Scheduling Overhead
  - Compares: 3 priority lanes vs a single lane
  - Measures: 30 commands distributed across priorities
  - Purpose: Quantify the cost of priority features
- Metrics Collection Overhead
  - Compares: With vs without metrics
  - Measures: 100 commands with/without observability
  - Purpose: Understand monitoring costs
- Rate Limiting
  - Tests: 100 commands with a 50 commands/sec limit
  - Measures: Impact of rate limiting on throughput
  - Purpose: Validate rate limiter behavior
Sample Results
Benchmarks run on Apple Silicon (M-series) with optimized release build:
| Benchmark | Commands | Time | Throughput |
|---|---|---|---|
| Full lifecycle | 10 | ~107ms | ~93 ops/sec |
| Full lifecycle | 100 | ~1.17s | ~85 ops/sec |
| Concurrent (1 lane) | 100 | ~10-15ms | ~6,600-10,000 ops/sec |
| Concurrent (10 lanes) | 100 | ~2-3ms | ~33,000-50,000 ops/sec |
| Priority scheduling | 30 | ~3-5ms | ~6,000-10,000 ops/sec |
| With metrics | 100 | ~1.2-1.3s | ~77-83 ops/sec |
| Without metrics | 100 | ~1.1-1.2s | ~83-90 ops/sec |
Notes:
- Full lifecycle benchmarks include manager creation, startup, execution, shutdown, and drain
- Concurrent benchmarks measure steady-state throughput with reused manager
- Metrics overhead is approximately 3-5% in typical workloads
- Actual performance varies based on hardware, command complexity, and workload patterns
Results include:
- Mean execution time with 95% confidence intervals
- Throughput measurements (ops/sec)
- Detailed statistical analysis
- Historical comparison charts
License
MIT