# azoth-bus
Multi-consumer pub/sub event bus built on Azoth's embedded database primitives.
## Overview
Azoth Bus provides durable, multi-consumer event streaming with independent cursors, load-balanced consumer groups, and automatic retention policies. It's built as a pure compositional layer on top of Azoth's LMDB-backed event log, requiring no modifications to Azoth core.
New in 0.2: Real-time notifications - consumers wake instantly when events are published, eliminating polling overhead.
## Quick Start
```rust
// Note: crate paths, constructor arguments, and payload types are illustrative.
use azoth::AzothDb;
use azoth_bus::EventBus;
use std::sync::Arc;

// Open database and create bus with notification support
let db = Arc::new(AzothDb::new("./data")?);
let bus = EventBus::with_notifications(db.clone());

// Publish events using the bus API (auto-notifies consumers)
bus.publish("orders:created", b"order payload")?;

// Subscribe and consume
let mut consumer = bus.subscribe("orders", "quickstart")?;
while let Some(event) = consumer.next()? {
    println!("received {:?}", event);
    consumer.ack(&event)?;
}
```
## Features
### Real-Time Event Streaming (NEW)
- Instant wake-up: Consumers wake immediately when events are published - no polling required
- Notification-based: Uses `tokio::sync::Notify` for sub-millisecond latency
- Atomic publish + notify: Events are committed and consumers notified in one operation
- Async-safe: `publish_async()` and `ack_async()` for use in async contexts
### Multi-Consumer Event Processing
- Independent cursors: Each consumer tracks its own position
- Stream filtering: Subscribe to `"orders"` to automatically see only `"orders:*"` events
- Composable filters: Combine prefix, exact, and/or filters for fine-grained control
- Lag monitoring: Track how far behind each consumer is from the head
### Consumer Groups (Load Balancing)
- Parallel processing: Multiple workers claim and process events concurrently
- No duplicates: Atomic claim mechanism ensures each event is processed exactly once
- Fault tolerance: Lease-based expiration automatically reclaims stale claims
- Nack and retry: Failed events can be nacked and retried with forward progress semantics
### Async Support
- Non-blocking consumption: Use `next_async()` for async/await workloads
- Pluggable wake strategies: Poll-based (default) or notification-based (Tokio `Notify`)
- Stream trait: `futures::Stream` implementation for ergonomic async iteration
### Stream Processing & Actor Patterns
- Transform pipelines: Chain agents that consume, transform, and publish to other streams
- Actor mailboxes: Messages accumulate durably until agents process them
- Offline tolerance: Agents can go offline and catch up on missed messages later
- Independent cursors: Each agent tracks its own position in each stream
### Retention Policies
- Automatic cleanup: Configure per-stream retention (KeepAll, KeepCount, KeepDays)
- Safe compaction: Never deletes events still needed by slow consumers
- Background compaction: Optional continuous compaction task
## Usage
### Publishing Events (Recommended)
Use `bus.publish()` for automatic event formatting and consumer notification:
```rust
// Note: stream keys and payload types are illustrative.
// Synchronous publish (for non-async contexts)
let event_id = bus.publish("orders:created", b"payload")?;

// Async publish (for async contexts - uses spawn_blocking internally)
let event_id = bus.publish_async("orders:created", b"payload").await?;

// Batch publish (atomic)
let event_ids = bus.publish_batch(&[("orders:created", b"a"), ("orders:shipped", b"b")])?;
```
### Basic Consumer
```rust
// Note: consumer names and info fields are illustrative.
// Subscribe to a stream (auto-filters to "orders:*" events)
let mut consumer = bus.subscribe("orders", "order-processor")?;

// Read and acknowledge events
while let Some(event) = consumer.next()? {
    handle_order(&event)?;   // your processing logic
    consumer.ack(&event)?;   // advance this consumer's cursor
}

// Check consumer status
let info = bus.consumer_info("orders", "order-processor")?;
println!("cursor = {:?}, lag = {}", info.cursor, info.lag);
```
### Real-Time Async Processing
For instant wake-up when events arrive:
```rust
// Create bus with notification support
let bus = EventBus::with_notifications(db.clone());

// Create consumer (do this before entering async context)
let mut consumer = bus.subscribe("orders", "realtime")?;

// In async context: consumer wakes instantly on publish
loop {
    if let Some(event) = consumer.next_async().await? {
        consumer.ack_async(&event).await?;
    }
}
```
### Consumer Groups
```rust
// Note: method names, lease duration, and handler are illustrative.
use std::time::Duration;

// Create a consumer group
let group = bus.consumer_group("orders", "billing", Duration::from_secs(30));
// Join as a member
let mut worker = group.join("worker-1")?;
// Claim and process events
loop {
    if let Some(event) = worker.claim()? {
        process(&event)?;      // on failure, call worker.nack(&event)? instead
        worker.ack(&event)?;
    }
}
```
### Stream API
Use `futures::Stream` for ergonomic async iteration:
```rust
// Note: exact stream-constructor signatures are illustrative.
use futures::{pin_mut, StreamExt};

// Manual acknowledgment stream
let stream = event_stream(consumer);
pin_mut!(stream);
while let Some(event) = stream.next().await {
    // handle the event, then acknowledge it explicitly
}

// Auto-acknowledging stream (fire-and-forget)
let stream = auto_ack_stream(consumer);
pin_mut!(stream);
while let Some(event) = stream.next().await {
    // event is already acknowledged
}
```
### Stream Processing Pipeline
Build data transformation pipelines where agents consume, transform, and forward:
```rust
// Sketch: `doubler` and `printer` are consumers on "numbers" and "doubled";
// `transform` stands in for your own logic.
// Agent 1: Consume from "numbers", transform, publish to "doubled"
tokio::spawn(async move {
    while let Some(event) = doubler.next_async().await.unwrap() {
        bus.publish_async("doubled:value", &transform(&event)).await.unwrap();
        doubler.ack_async(&event).await.unwrap();
    }
});
// Agent 2: Consume from "doubled", print each result, then ack (same loop shape)
```
### Actor Mailbox Pattern
Use streams as durable mailboxes - messages accumulate until actors process them:
```rust
// Note: stream and actor names are illustrative.
// Send messages while the actor is offline
bus.publish("inbox:msg", b"hello")?;
bus.publish("inbox:msg", b"world")?;

// Later: actor comes online and processes accumulated messages
let mut actor = bus.subscribe("inbox", "greeter")?;

// Check how many messages are waiting
let info = bus.consumer_info("inbox", "greeter")?;
println!("{} messages waiting", info.lag);

// Process all waiting messages
while let Some(event) = actor.next()? {
    actor.ack(&event)?;
}
```
Key properties:
- Durable: Messages survive process restarts
- Independent cursors: Each actor tracks its own position
- Offline tolerance: Actors can go offline and catch up later
- Buffered or real-time: Use `Poll` for buffered, `Notify` for instant wake-up
### Retention Policies
```rust
// Note: policy enum and background-task API are illustrative.
use azoth_bus::{RetentionManager, RetentionPolicy};

let mgr = RetentionManager::new(db.clone());

// Set retention per stream
mgr.set_retention("orders", RetentionPolicy::KeepCount(10_000))?;
mgr.set_retention("audit", RetentionPolicy::KeepAll)?;

// Run compaction
let stats = mgr.compact()?;
println!("compaction stats: {:?}", stats);

// Or run continuous background compaction
tokio::spawn(mgr.run_background_compaction());
```
### Event Filters
```rust
// Additional filtering within a stream (filter constructor name is illustrative)
let consumer = bus.subscribe("orders", "eu-only")?
    .with_filter(EventFilter::prefix("orders:eu:"));
```
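Filters can also be combined. A minimal sketch of composing them, assuming an `EventFilter` type with `prefix`, `exact`, `or`, and `and` constructors (names inferred from the feature list and test coverage, not confirmed against the actual API):

```rust
// Hypothetical combinators; check the crate's filter module for the real names.
let eu_or_us = EventFilter::or(
    EventFilter::prefix("orders:eu:"),
    EventFilter::prefix("orders:us:"),
);
let consumer = bus.subscribe("orders", "regional")?.with_filter(eu_or_us);
```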
### Wake Strategies
Azoth Bus supports two wake strategies for async consumers:
| Strategy | Use Case | Latency | CPU Usage |
|---|---|---|---|
| `Poll` (default) | Simple setups, testing | ~10ms (configurable) | Higher |
| `Notify` | Real-time, production | Sub-millisecond | Minimal |
```rust
use azoth_bus::{EventBus, WakeStrategy};
use std::time::Duration;

// Polling (checks every 10ms by default)
let bus = EventBus::new(db.clone());

// Custom poll interval (exact variant shape is illustrative)
let bus = EventBus::with_wake_strategy(db.clone(), WakeStrategy::Poll(Duration::from_millis(50)));

// Notification-based (instant wake-up)
let bus = EventBus::with_notifications(db.clone());
```
## Architecture
Azoth Bus stores all metadata in LMDB using structured key prefixes:
```
bus:consumer:{stream}:{name}:cursor    -> Last acked event ID
bus:consumer:{stream}:{name}:meta      -> Consumer metadata (JSON)
bus:group:{stream}:{group}:cursor      -> Next event to claim
bus:group:{stream}:{group}:claim:{id}  -> Claim info with lease
bus:group:{stream}:{group}:member:{id} -> Member metadata
bus:group:{stream}:{group}:reclaim     -> Nacked events for retry
bus:stream:{name}:config               -> Retention policy (JSON)
```
Event iteration uses Azoth's existing event log APIs. Cursor updates use Azoth's transaction system for atomicity.
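As a concrete illustration of the layout, a consumer's cursor key can be built with a plain `format!`; this helper is illustrative only, since the crate composes keys internally:

```rust
/// Builds the LMDB key holding a consumer's cursor, following the
/// documented `bus:consumer:{stream}:{name}:cursor` layout.
fn consumer_cursor_key(stream: &str, name: &str) -> String {
    format!("bus:consumer:{stream}:{name}:cursor")
}

assert_eq!(
    consumer_cursor_key("orders", "order-processor"),
    "bus:consumer:orders:order-processor:cursor"
);
```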
## Examples
Runnable examples cover:

- Simple consumer
- Multi-consumer with independent cursors
- Async consumption with polling
- Real-time notifications (instant wake-up)
- Stream processing pipeline (Generator -> Doubler -> Printer)
- Actor mailbox pattern (buffered message consumption)
- Consumer groups (load balancing)
- Retention policies
## Testing
Test Coverage (43 tests):
- Core consumer functionality (creation, ack, seek, filtering)
- Independent cursors and lag monitoring
- Event filtering (prefix, exact, and, or)
- Async notifications (poll and notify strategies)
- Retention policies (KeepAll, KeepCount)
- Consumer groups (claims, releases, expiration)
- Nack/reclaim with forward progress semantics
- LMDB cursor edge cases (empty ranges, sequential calls)
- Publish API (publish, publish_batch, publish_raw)
- Stream API (event_stream, auto_ack_stream)
## Implementation Notes
### Consumer Group Forward Progress
Consumer groups prioritize making forward progress over retrying failed events:
- Fresh events are claimed first (advancing the cursor)
- Nacked events are pushed to a reclaim list
- Once caught up, the reclaim list is retried (LIFO)
This ensures transient failures don't block new event processing.
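A minimal sketch of that priority order, using plain Rust types as stand-ins for the crate's actual group state:

```rust
/// Illustrates the claim order: fresh events first, then nacked events (LIFO).
struct GroupState {
    next_fresh: u64,   // group cursor: next unclaimed event ID
    head: u64,         // newest event ID in the log
    reclaim: Vec<u64>, // event IDs that were nacked and await retry
}

impl GroupState {
    fn next_claim(&mut self) -> Option<u64> {
        if self.next_fresh <= self.head {
            // Fresh events take priority, so the cursor keeps advancing.
            let id = self.next_fresh;
            self.next_fresh += 1;
            Some(id)
        } else {
            // Only once caught up do we retry nacked events, newest first (LIFO).
            self.reclaim.pop()
        }
    }
}
```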
### Async Transaction Safety
Azoth's Transaction API panics when called from async contexts to prevent deadlocks. Use:
- `bus.publish()` / `consumer.ack()` - for synchronous code
- `bus.publish_async()` / `consumer.ack_async()` - for async code
Consumer creation (`bus.subscribe()`) uses synchronous transactions, so create consumers before entering the async runtime or use `spawn_blocking`.
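If a consumer must be created from inside an async task, one option is to hop onto a blocking thread; a minimal sketch, assuming the bus handle can be cloned or shared via `Arc`:

```rust
// Run the synchronous subscribe transaction off the async executor.
let bus_handle = bus.clone(); // assumption: EventBus is Clone (or wrap it in Arc)
let mut consumer = tokio::task::spawn_blocking(move || {
    bus_handle.subscribe("orders", "late-subscriber")
})
.await??; // first ? handles the JoinError, second ? the bus's own error
```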
### LMDB Cursor Workarounds
The implementation works around two LMDB cursor issues:
- Sequential iterator creation: Sequential calls to `iter_events()` with different positions can fail. The implementation uses batch iteration with larger limits instead.
- Empty range scans: LMDB panics when scanning empty key ranges. The `list_consumers()` function uses `std::panic::catch_unwind` to handle this gracefully (sketched below).
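The second workaround boils down to wrapping the range scan in `catch_unwind` and treating a panic as an empty result; a sketch of that pattern (the scan closure is a stand-in, not the crate's actual signature):

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

// Treat a panic from an empty-range scan as "no consumers found".
fn list_consumers_safely(scan: impl FnOnce() -> Vec<String>) -> Vec<String> {
    catch_unwind(AssertUnwindSafe(scan)).unwrap_or_default()
}
```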
## Known Limitations
- KeepCount retention: Works globally on the entire event log, not per-stream (all streams share the same log)
- KeepDays retention: Not yet implemented (requires event timestamps)
- Actual deletion: Compaction calculates what to delete but doesn't execute deletion yet (requires an `event_log.delete_range()` implementation)
## Performance
- Cursor updates: Atomic via LMDB write locks
- Event iteration: Efficient sequential LMDB reads
- Concurrent consumers: Supported via LMDB MVCC
- Notification latency: Sub-millisecond with `WakeStrategy::Notify`
- Target throughput: 50k+ events/sec per consumer
## License
MIT OR Apache-2.0