# azoth-bus
Multi-consumer pub/sub event bus built on Azoth's embedded database primitives.
## Overview
Azoth Bus provides durable, multi-consumer event streaming with independent cursors, load-balanced consumer groups, and automatic retention policies. It's built as a pure compositional layer on top of Azoth's LMDB-backed event log, requiring no modifications to Azoth core.
## Quick Start

```rust
use azoth::{AppendEvent, Database}; // identifiers illustrative
use azoth_bus::EventBus;
use std::sync::Arc;

// Open database and create bus
let db = Arc::new(Database::open("./data")?);
let bus = EventBus::new(db.clone());

// Publish events (builder name is illustrative)
AppendEvent::new("orders:created", b"order #42").execute(&db)?;

// Subscribe and consume
let mut consumer = bus.subscribe("orders", "my-consumer")?;
while let Some(event) = consumer.next()? {
    println!("got event {}", event.id);
    consumer.ack(event.id)?;
}
```
## Features

### Multi-Consumer Event Processing
- Independent cursors: Each consumer tracks its own position
- Stream filtering: Subscribe to `"orders"` to automatically see only `"orders:*"` events
- Composable filters: Combine prefix, exact, and/or filters for fine-grained control
- Lag monitoring: Track how far behind each consumer is from the head
### Consumer Groups (Load Balancing)
- Parallel processing: Multiple workers claim and process events concurrently
- No duplicates: Atomic claim mechanism ensures each event is delivered to exactly one worker at a time
- Fault tolerance: Lease-based expiration automatically reclaims stale claims
- Nack and retry: Failed events can be nacked and retried with forward progress semantics
### Async Support
- Non-blocking consumption: Use `next_async()` for async/await workloads
- Pluggable wake strategies: Poll-based (default) or notification-based (Tokio `Notify`)
### Retention Policies
- Automatic cleanup: Configure per-stream retention (KeepAll, KeepCount, KeepDays)
- Safe compaction: Never deletes events still needed by slow consumers
- Background compaction: Optional continuous compaction task
## Usage

### Basic Consumer
```rust
// Subscribe to stream (auto-filters to "orders:*" events)
let mut consumer = bus.subscribe("orders", "order-processor")?;

// Read and acknowledge events
while let Some(event) = consumer.next()? {
    handle(&event); // your processing logic
    consumer.ack(event.id)?;
}

// Check consumer status
let info = bus.consumer_info("orders", "order-processor")?;
println!("lag: {}", info.lag);
```
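A consumer's cursor can also be moved explicitly; the test suite exercises a `seek` operation alongside ack. A minimal sketch, assuming `seek` takes an event ID:

```rust
// Rewind and replay the stream from the first event
// (seek(event_id) is an assumed signature)
consumer.seek(0)?;
while let Some(event) = consumer.next()? {
    consumer.ack(event.id)?;
}
```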
### Consumer Groups
```rust
use std::time::Duration;

// Create consumer group with a 30s claim lease (arguments illustrative)
let group = bus.consumer_group("orders", "billing", Duration::from_secs(30));

// Join as a member
let mut worker = group.join("worker-1")?;

// Claim and process events
loop {
    if let Some(event) = worker.claim()? {
        handle(&event);
        worker.ack(event.id)?;
    }
}
```
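When processing fails, a claim can be nacked instead of acked; the event then lands on the group's reclaim list and is retried once the group has caught up (see Implementation Notes). A sketch, assuming `nack` mirrors `ack`:

```rust
if let Some(event) = worker.claim()? {
    match handle(&event) {
        Ok(()) => worker.ack(event.id)?,
        // Push onto the reclaim list; fresh events keep flowing
        Err(_) => worker.nack(event.id)?,
    }
}
```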
### Async Consumption
```rust
// Use async iterator
while let Some(event) = consumer.next_async().await? {
    consumer.ack(event.id)?;
}
```
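On Tokio, the poll-based default can be swapped for a notification-based wake. A sketch, assuming a `WakeStrategy` enum and a `with_wake_strategy` builder method (both names are assumptions; only the poll/Notify split is documented above):

```rust
use azoth_bus::WakeStrategy; // assumed name

// Wake the consumer via tokio::sync::Notify instead of polling
let mut consumer = bus
    .subscribe("orders", "async-worker")?
    .with_wake_strategy(WakeStrategy::Notify);

while let Some(event) = consumer.next_async().await? {
    consumer.ack(event.id)?;
}
```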
### Retention Policies
```rust
use azoth_bus::{RetentionManager, RetentionPolicy};
use std::time::Duration;

let mgr = RetentionManager::new(db.clone());

// Set retention per stream (policy values illustrative)
mgr.set_retention("orders", RetentionPolicy::KeepCount(100_000))?;
mgr.set_retention("audit", RetentionPolicy::KeepAll)?;

// Run compaction
let stats = mgr.compact()?;
println!("compacted {} events", stats.deleted);

// Or run continuous background compaction
// (task entry point is an assumed name)
tokio::spawn(mgr.compact_loop(Duration::from_secs(60)));
```
### Event Filters
```rust
// Additional filtering within a stream
// (EventFilter constructor is an assumed name)
let consumer = bus.subscribe("orders", "eu-orders")?
    .with_filter(EventFilter::prefix("orders:eu:"));
```
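Filters compose, per the feature list above. A sketch combining prefix and exact matchers with `and`/`or` (constructor and combinator names are assumptions):

```rust
// EU events that are either order creations or cancellations
// (EventFilter constructors/combinators are assumed names)
let filter = EventFilter::prefix("orders:eu:")
    .and(EventFilter::exact("orders:eu:created")
        .or(EventFilter::exact("orders:eu:cancelled")));
let consumer = bus.subscribe("orders", "eu-audit")?.with_filter(filter);
```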
## Architecture
Azoth Bus stores all metadata in LMDB using structured key prefixes:

```
bus:consumer:{stream}:{name}:cursor     → Last acked event ID
bus:consumer:{stream}:{name}:meta       → Consumer metadata (JSON)
bus:group:{stream}:{group}:cursor       → Next event to claim
bus:group:{stream}:{group}:claim:{id}   → Claim info with lease
bus:group:{stream}:{group}:member:{id}  → Member metadata
bus:group:{stream}:{group}:reclaim      → Nacked events for retry
bus:stream:{name}:config                → Retention policy (JSON)
```
Event iteration uses Azoth's existing event log APIs. Cursor updates use Azoth's transaction system for atomicity.
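As an illustration, acking an event reduces to a single transactional key write. A minimal sketch, assuming a `begin_write` transaction handle with `put`/`commit` (those names are stand-ins for Azoth's actual transaction API):

```rust
// Ack = write the consumer's cursor key inside one transaction,
// so readers never observe a partially updated cursor.
// (begin_write/put/commit are assumed names)
fn ack_event(db: &Database, stream: &str, name: &str, id: u64) -> anyhow::Result<()> {
    let key = format!("bus:consumer:{stream}:{name}:cursor");
    let mut txn = db.begin_write()?;
    txn.put(key.as_bytes(), &id.to_be_bytes())?;
    txn.commit()?;
    Ok(())
}
```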
## Examples

Runnable examples cover:

- Simple consumer
- Multi-consumer with independent cursors
- Async consumption
- Consumer groups (load balancing)
- Retention policies
## Testing
Test Coverage (33 tests):
- Core consumer functionality (creation, ack, seek, filtering)
- Independent cursors and lag monitoring
- Event filtering (prefix, exact, and, or)
- Async notifications (poll and notify strategies)
- Retention policies (KeepAll, KeepCount)
- Consumer groups (claims, releases, expiration)
- Nack/reclaim with forward progress semantics
- LMDB cursor edge cases (empty ranges, sequential calls)
## Implementation Notes

### Consumer Group Forward Progress
Consumer groups prioritize making forward progress over retrying failed events:
- Fresh events are claimed first (advancing the cursor)
- Nacked events are pushed to a reclaim list
- Once caught up, the reclaim list is retried (LIFO)
This ensures transient failures don't block new event processing.
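A sketch of that claim ordering; `next_fresh` and `pop_reclaim` are hypothetical helpers standing in for the real cursor advance and reclaim-list pop:

```rust
// Fresh events first; fall back to the reclaim list only when caught up.
fn claim_next(group: &mut GroupState) -> Result<Option<Event>> {
    if let Some(event) = group.next_fresh()? {
        // Advances the group cursor past this event
        return Ok(Some(event));
    }
    // Caught up with the head: retry the most recently nacked event (LIFO)
    Ok(group.pop_reclaim())
}
```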
### LMDB Cursor Workarounds
The implementation works around two LMDB cursor issues:
- Sequential iterator creation: Sequential calls to `iter_events()` with different start positions can fail. The implementation uses batch iteration with larger limits instead.
- Empty range scans: LMDB panics when scanning empty key ranges. The `list_consumers()` function uses `std::panic::catch_unwind` to handle this gracefully, as sketched below.
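For illustration, the guard looks roughly like this; `scan_prefix` is a stand-in for the underlying range scan:

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

// A panic from scanning an empty key range becomes an empty result
// instead of unwinding into the caller. (scan_prefix is a stand-in.)
let consumers = catch_unwind(AssertUnwindSafe(|| {
    db.scan_prefix(b"bus:consumer:orders:")
}))
.unwrap_or_default();
```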
## Known Limitations
- KeepCount retention: Works globally on the entire event log, not per-stream (all streams share the same log)
- KeepDays retention: Not yet implemented (requires event timestamps)
- Actual deletion: Compaction calculates what to delete but doesn't execute deletion yet (requires an `event_log.delete_range()` implementation)
## Future Features
- Named streams: Multiple logical event logs for true isolation
- Value encryption: At-rest encryption for sensitive event data
## Performance
- Cursor updates: Atomic via LMDB write locks
- Event iteration: Efficient sequential LMDB reads
- Concurrent consumers: Supported via LMDB MVCC
- Target throughput: 50k+ events/sec per consumer
## License
MIT OR Apache-2.0