# a3s-lane
Lane-based priority queue for concurrent async tasks. Commands are organized into named lanes with configurable concurrency and priority — the highest-priority lane with pending work is always scheduled next.
Used in the A3S ecosystem to guarantee control commands (pause/cancel) always preempt LLM generation: control (P=1) beats prompt (P=5) regardless of arrival order.
## Install

```toml
[dependencies]
a3s-lane = "0.4"
```
All four features (distributed, metrics, monitoring, telemetry) are on by default. Core queue only:
```toml
a3s-lane = { version = "0.4", default-features = false }

# or pick selectively:
a3s-lane = { version = "0.4", default-features = false, features = ["metrics", "distributed"] }
```
## Usage
Implement the Command trait for each task type:
Then build a manager, start the scheduler, and submit:
```rust
use a3s_lane::*;
use async_trait::async_trait;
use std::time::Duration;

// `QueueManager` stands in for the crate's manager type.
let manager = QueueManager::new().with_default_lanes().build().await?;
let result = manager.submit(/* lane, command */).await??;
```
`submit()` returns a `oneshot::Receiver<Result<Value>>` — the `??` unwraps both the channel receive and the command result.
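That nested unwrap is just a `Result` inside a `Result`: receiving can fail (channel closed), and the command itself can fail. The same double-`?` pattern can be shown self-contained with a standard-library sync `mpsc` channel standing in for the async oneshot:

```rust
use std::sync::mpsc;

fn request() -> Result<u32, Box<dyn std::error::Error>> {
    let (tx, rx) = mpsc::channel::<Result<u32, String>>();

    // Scheduler side: run the command, send its result back.
    let _ = tx.send(Ok(42));

    // Caller side: the first `?` unwraps the channel receive,
    // the second unwraps the command's own result.
    let value = rx.recv()??;
    Ok(value)
}

fn main() {
    assert_eq!(request().unwrap(), 42);
    println!("ok");
}
```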
## Lane model

| Lane | Priority | Max concurrency | Use case |
|---|---|---|---|
| `system` | 0 (highest) | 5 | System-level ops |
| `control` | 1 | 3 | Pause / cancel |
| `query` | 2 | 10 | Read-only queries |
| `session` | 3 | 5 | Session management |
| `skill` | 4 | 3 | Tool execution |
| `prompt` | 5 (lowest) | 2 | LLM generation |
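The scheduling rule behind this table, pick the lowest priority number that has pending work and a free concurrency slot, can be sketched independently of the crate (the lane data below is illustrative):

```rust
use std::collections::BTreeMap;

// key = priority, value = (lane name, pending commands, free concurrency slots).
// BTreeMap iterates keys in ascending order, so the first match wins.
fn next_lane<'a>(lanes: &'a BTreeMap<u8, (&'a str, usize, usize)>) -> Option<&'a str> {
    lanes
        .iter()
        .find(|(_, (_, pending, free))| *pending > 0 && *free > 0)
        .map(|(_, (name, _, _))| *name)
}

fn main() {
    let mut lanes = BTreeMap::new();
    lanes.insert(5, ("prompt", 4, 2));  // LLM generation, lots of pending work
    lanes.insert(1, ("control", 1, 3)); // pause/cancel, one pending command
    lanes.insert(0, ("system", 0, 5));  // nothing pending

    // control (P=1) beats prompt (P=5) regardless of how much work prompt has.
    assert_eq!(next_lane(&lanes), Some("control"));
    println!("{:?}", next_lane(&lanes));
}
```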
Custom lanes replace or extend the defaults:
```rust
// `QueueManager` stands in for the crate's manager type.
let manager = QueueManager::new()
    .with_lane(/* LaneConfig */)
    .with_lane(/* LaneConfig */)
    .build()
    .await?;
```
## LaneConfig
All options use the builder pattern and can be chained:
```rust
LaneConfig::new(/* … */)
    .with_timeout(/* … */)
    .with_retry_policy(/* … */)        // 100ms initial, 2× backoff, 30s cap
    .with_pressure_threshold(/* … */)  // emit queue.lane.pressure / queue.lane.idle
    .with_rate_limit(/* … */)          // requires `distributed` feature
    .with_priority_boost(/* … */);
```
- `RetryPolicy`: `exponential(max_retries)`, `fixed(max_retries, delay)`, `none()`.
- `RateLimitConfig`: `per_second(n)`, `per_minute(n)`, `per_hour(n)`, `unlimited()`.
- `PriorityBoostConfig`: `standard(deadline)` (boosts at 75/50/25% of deadline remaining), `aggressive(deadline)`, `disabled()`.
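The exponential schedule described above (100 ms initial, doubling, 30 s cap) works out to the following delays; the constants come from that description, and this helper is an illustration rather than the crate's code:

```rust
use std::time::Duration;

// Delay before retry attempt `n` (0-based): initial * 2^n, capped.
fn backoff_delay(attempt: u32) -> Duration {
    let initial = Duration::from_millis(100);
    let cap = Duration::from_secs(30);
    initial.saturating_mul(2u32.saturating_pow(attempt)).min(cap)
}

fn main() {
    assert_eq!(backoff_delay(0), Duration::from_millis(100));
    assert_eq!(backoff_delay(1), Duration::from_millis(200));
    assert_eq!(backoff_delay(4), Duration::from_millis(1600));
    assert_eq!(backoff_delay(20), Duration::from_secs(30)); // capped
    println!("ok");
}
```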
## Events

`EventStream` implements `futures_core::Stream` — use `.next().await` via `StreamExt` or the `.recv()` convenience method. Subscribe directly from the manager without threading an `EventEmitter` manually:
```rust
use futures_util::StreamExt;

// All events
let mut stream = manager.subscribe();

// Filtered — only failures
let mut failures = manager.subscribe_filtered(/* filter */);

tokio::spawn(async move {
    while let Some(event) = stream.next().await {
        // handle the event
    }
});
```
Events emitted automatically at every queue stage:
| Event key | When | Payload fields |
|---|---|---|
| `queue.command.submitted` | `submit()` accepted | `lane_id` |
| `queue.command.started` | Scheduler dispatched | `lane_id`, `command_id`, `command_type` |
| `queue.command.completed` | Returned `Ok` | `lane_id`, `command_id` |
| `queue.command.retry` | Failed, will retry | `lane_id`, `command_id`, `attempt` |
| `queue.command.dead_lettered` | Moved to DLQ | `lane_id`, `command_id`, `command_type` |
| `queue.command.failed` | Terminal failure | `lane_id`, `command_id`, `error` |
| `queue.command.timeout` | Timed out | `lane_id`, `command_id`, `error` |
| `queue.shutdown.started` | `shutdown()` called | — |
| `queue.lane.pressure` | `pending >= threshold`, first crossing | `lane_id` |
| `queue.lane.idle` | `pending == 0` after being pressured | `lane_id` |

`queue.lane.pressure` and `queue.lane.idle` require `with_pressure_threshold(n)` on the lane config.
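The pressure/idle semantics above (fire `pressure` once on the first crossing, then `idle` once the lane drains) amount to a small hysteresis state machine; a self-contained sketch of that behavior, not the crate's internals:

```rust
// Tracks whether a lane is currently "pressured" so each event fires once
// per episode: `pressure` on the first crossing, `idle` once drained.
struct PressureTracker {
    threshold: usize,
    pressured: bool,
}

impl PressureTracker {
    fn new(threshold: usize) -> Self {
        Self { threshold, pressured: false }
    }

    // Returns the event to emit for this pending-count update, if any.
    fn update(&mut self, pending: usize) -> Option<&'static str> {
        if !self.pressured && pending >= self.threshold {
            self.pressured = true;
            Some("queue.lane.pressure")
        } else if self.pressured && pending == 0 {
            self.pressured = false;
            Some("queue.lane.idle")
        } else {
            None
        }
    }
}

fn main() {
    let mut t = PressureTracker::new(3);
    assert_eq!(t.update(2), None);                        // below threshold
    assert_eq!(t.update(3), Some("queue.lane.pressure")); // first crossing
    assert_eq!(t.update(5), None);                        // still pressured: no repeat
    assert_eq!(t.update(0), Some("queue.lane.idle"));     // drained
    assert_eq!(t.update(1), None);
    println!("ok");
}
```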
## Reliability

### Dead letter queue

```rust
let dlq = DeadLetterQueue::new();
let queue = /* queue builder */.with_dlq(dlq.clone());

// Inspect failed commands after running
for letter in dlq.list().await {
    // …
}
```
### Persistent storage

```rust
// `FileStorage` and `QueueManager` stand in for the crate's concrete types.
let storage = FileStorage::new(/* path */);
let manager = QueueManager::new()
    .with_storage(storage)
    .with_default_lanes()
    .build()
    .await?;
```

Custom backends: implement the `Storage` trait (`save_command`, `load_commands`, `remove_command`, `save_dead_letter`, `load_dead_letters`, `clear_all`).
### Graceful shutdown

```rust
manager.shutdown().await; // stop accepting new commands
manager.drain().await?;   // wait for in-flight commands to finish
```
## Observability

### Metrics

```rust
// `QueueManager` stands in for the crate's manager type.
let metrics = local(); // in-memory; or bring your own MetricsBackend
let manager = QueueManager::new()
    .with_metrics(metrics.clone())
    .build()
    .await?;

let snap = metrics.snapshot().await;
// snap.counters   → submit/complete/fail/timeout/retry/dead-letter counts per lane
// snap.histograms → latency p50/p90/p95/p99 per lane
```
OpenTelemetry OTLP export: use `OtelMetricsBackend` (requires the `telemetry` feature).

Custom backend: implement `MetricsBackend` (`increment_counter`, `set_gauge`, `record_histogram`, `snapshot`, `reset`).
### Alerts and monitoring

```rust
// `AlertManager` and `QueueManager` stand in for the crate's concrete types.
let alerts = AlertManager::new();
alerts.add_callback(/* … */).await;

let manager = QueueManager::new()
    .with_alerts(alerts)
    .build()
    .await?;
```
Background monitor (polls on an interval):
```rust
let monitor = Monitor::new(/* … */); // `Monitor` is a stand-in name
monitor.clone().start().await;

let stats = monitor.stats().await;
println!("{stats:?}");
```
## Scalability (`distributed` feature)

```rust
// Rate limiting — enforced at dequeue time, not submit time
LaneConfig::new(/* … */).with_rate_limit(RateLimitConfig::per_second(/* n */));

// Priority boost — commands approaching their deadline get elevated priority
LaneConfig::new(/* … */).with_priority_boost(PriorityBoostConfig::standard(/* deadline */));

// Multi-core partitioning — auto-detects CPU cores
let queue = PartitionedQueue::new(); // `PartitionedQueue` is a stand-in name
```

Custom distributed queue: implement `DistributedQueue` (`enqueue`, `dequeue`, `complete`, `num_partitions`, `worker_id`).
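Dequeue-time rate limiting means an over-budget command stays queued rather than being rejected at submit. The semantics can be sketched with a fixed-window counter; this is an illustration, not the crate's implementation:

```rust
use std::time::{Duration, Instant};

// Fixed-window limiter: allow at most `limit` dequeues per window.
struct RateLimiter {
    limit: u32,
    window: Duration,
    window_start: Instant,
    used: u32,
}

impl RateLimiter {
    fn per_second(limit: u32) -> Self {
        Self {
            limit,
            window: Duration::from_secs(1),
            window_start: Instant::now(),
            used: 0,
        }
    }

    // Called at dequeue time: when this returns false the command
    // simply stays in its lane until the next window.
    fn try_acquire(&mut self, now: Instant) -> bool {
        if now.duration_since(self.window_start) >= self.window {
            self.window_start = now;
            self.used = 0;
        }
        if self.used < self.limit {
            self.used += 1;
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut rl = RateLimiter::per_second(2);
    let t0 = Instant::now();
    assert!(rl.try_acquire(t0));
    assert!(rl.try_acquire(t0));
    assert!(!rl.try_acquire(t0)); // over the per-second budget
    assert!(rl.try_acquire(t0 + Duration::from_secs(1))); // new window
    println!("ok");
}
```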
## SDKs

See `sdk/python/` and `sdk/node/` for full examples and type definitions.
## Development

Optional: `cargo install cargo-llvm-cov` and `brew install lcov` (for HTML coverage).
## In the A3S ecosystem

a3s-lane is the scheduling layer of the A3S Agent OS. Each a3s-code agent session gets its own instance, ensuring control commands always preempt LLM work:

```
a3s-gateway → a3s-box (MicroVM) → SafeClaw → a3s-code → a3s-lane
                                                        ↑ here
```
Works standalone for any priority-based async scheduling: web servers, background job processors, rate-limited API clients.
## Benchmarks
Apple Silicon (M-series), release build, steady-state throughput with pre-warmed manager:
| Workload | Throughput |
|---|---|
| 100 commands, 10 lanes | ~33,000–50,000 ops/sec |
| 100 commands, 1 lane | ~6,600–10,000 ops/sec |
| Metrics overhead | ~3–5% |
Full lifecycle benchmarks (including manager create/start/shutdown) run at ~85–93 ops/sec — dominated by startup cost, not scheduling.
## License
MIT