Event Queues
Small Rust queue crate used by Arch services.
The package name is arch-event-queues; the Rust import path is arch_event_queues.
It exposes both an in-memory FIFO queue and a RocksDB-backed durable queue.
    [dependencies]
    arch-event-queues = "0.1"
Modules
In-memory queue
EventQueue<T> is a thread-safe FIFO queue backed by Arc<Mutex<VecDeque<T>>> and a Condvar.
Use it when events only need to live in memory and can be lost on process exit.
Main operations:
- push(event): appends an event to the back of the queue and wakes one waiter.
- push_front(event): inserts an event at the front and wakes one waiter.
- pop(): returns the next event immediately, or None if the queue is empty.
- poll(): waits up to 100 ms for an event, then returns Some(event) or None.
- len() / is_empty(): inspect the current in-memory queue length.
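    use arch_event_queues::EventQueue; // assumes a crate-root re-export

    // The String payload is illustrative; any event type T works.
    let queue = EventQueue::new();
    queue
        .push("hello".to_string())
        .unwrap();
    // Assumes pop() returns Option<T>, as the operations list describes.
    assert_eq!(queue.pop(), Some("hello".to_string()));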
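To make the locking and wake-up behaviour concrete, here is a minimal standard-library sketch of a queue built from Arc, Mutex, VecDeque, and Condvar. It is not the crate's source: SketchQueue and its signatures are hypothetical, errors from poisoned locks are simply unwrapped, and the poll timeout is a parameter rather than the fixed 100 ms described above.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

/// Hypothetical stand-in for `EventQueue<T>`; not the crate's real source.
pub struct SketchQueue<T> {
    // The mutex guards the deque; the condvar lets `poll` sleep until a push.
    inner: Arc<(Mutex<VecDeque<T>>, Condvar)>,
}

impl<T> Clone for SketchQueue<T> {
    /// Clones share the same underlying queue.
    fn clone(&self) -> Self {
        SketchQueue { inner: Arc::clone(&self.inner) }
    }
}

impl<T> SketchQueue<T> {
    pub fn new() -> Self {
        SketchQueue { inner: Arc::new((Mutex::new(VecDeque::new()), Condvar::new())) }
    }

    /// Append to the back and wake one waiting consumer.
    pub fn push(&self, event: T) {
        let (lock, cvar) = &*self.inner;
        let mut guard = lock.lock().unwrap();
        guard.push_back(event);
        cvar.notify_one();
    }

    /// Insert at the front and wake one waiting consumer.
    pub fn push_front(&self, event: T) {
        let (lock, cvar) = &*self.inner;
        let mut guard = lock.lock().unwrap();
        guard.push_front(event);
        cvar.notify_one();
    }

    /// Non-blocking: returns `None` when the queue is empty.
    pub fn pop(&self) -> Option<T> {
        let (lock, _) = &*self.inner;
        let mut guard = lock.lock().unwrap();
        guard.pop_front()
    }

    /// Wait up to `timeout` for an event, then return it or `None`.
    pub fn poll(&self, timeout: Duration) -> Option<T> {
        let (lock, cvar) = &*self.inner;
        let guard = lock.lock().unwrap();
        // Sleeps while the queue is empty; handles spurious wake-ups.
        let (mut guard, _timed_out) = cvar
            .wait_timeout_while(guard, timeout, |q| q.is_empty())
            .unwrap();
        guard.pop_front()
    }
}
```

A producer thread can hold a clone of the queue and call push while a consumer loops on poll; because poll re-checks the queue after every wake-up, a timeout and an empty queue both yield None.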
Durable queue
DurableEventQueue<T> stores each event in RocksDB before making it available to consumers. Events remain durable until ack(id) or ack_many(ids) deletes them from RocksDB.
Use it when events must be replayed after restart and the consumer can tolerate at-least-once delivery.
Event payloads must implement:
BorshSerialize + BorshDeserialize + Clone
Main operations:
- open(path): opens or creates a dedicated RocksDB queue database in eager mode.
- open_with_options(path, DurableEventQueueOptions::lazy()): opens in lazy mode.
- push(event): assigns a monotonically increasing u64 id, sync-writes the payload to RocksDB, then makes it available.
- pop(): returns the next available durable event immediately.
- poll(): waits up to 100 ms for an event when none is currently available.
- get(id): reads an unacked event by id without polling it.
- ack(id): marks an event handled by deleting it from RocksDB.
- ack_many(ids): deletes multiple ids in one RocksDB batch.
- nack(id): makes an in-flight event available again without deleting it.
- len() / is_empty(): inspect unacked events stored in RocksDB.
- ready_len(): counts events currently available to process.
- iterator(): iterates all unacked events in id order.
ack and ack_many are intentionally idempotent: acknowledging an unknown or already-acked id succeeds.
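    use tempfile::tempdir; // assumed: tempdir() from the tempfile crate
    use arch_event_queues::DurableEventQueue; // assumes a crate-root re-export

    let dir = tempdir().unwrap();
    let queue = DurableEventQueue::open(dir.path()).unwrap();
    // Illustrative payload; push is assumed to return the stored DurableEvent.
    let queued = queue
        .push("hello".to_string())
        .unwrap();
    let next = queue.pop().unwrap().expect("an event should be available");
    assert_eq!(next.id, queued.id);
    // Delete the event from RocksDB once it has been handled.
    queue.ack(next.id).unwrap();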
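The idempotent ack and the id-ordered iteration described above can be modelled without RocksDB. The sketch below is an assumption-heavy illustration, not the crate's implementation: SketchStore, id_to_key, and key_to_id are hypothetical names, a BTreeMap stands in for RocksDB's byte-ordered keyspace, and big-endian key encoding is one common way to make byte order match numeric order (whether this crate encodes keys that way is not stated in this README).

```rust
use std::collections::BTreeMap;

/// Big-endian bytes sort bytewise in the same order as the numeric ids.
fn id_to_key(id: u64) -> [u8; 8] {
    id.to_be_bytes()
}

/// Inverse of `id_to_key`.
fn key_to_id(key: [u8; 8]) -> u64 {
    u64::from_be_bytes(key)
}

/// Hypothetical store; `BTreeMap` stands in for RocksDB's sorted keyspace.
struct SketchStore {
    entries: BTreeMap<[u8; 8], Vec<u8>>,
    next_id: u64,
}

impl SketchStore {
    fn new() -> Self {
        SketchStore { entries: BTreeMap::new(), next_id: 0 }
    }

    /// Assign the next id and store the already-serialized payload under it.
    fn push(&mut self, payload: Vec<u8>) -> u64 {
        let id = self.next_id;
        self.next_id += 1;
        self.entries.insert(id_to_key(id), payload);
        id
    }

    /// Idempotent: removing a missing key is a no-op, so repeats succeed.
    fn ack(&mut self, id: u64) {
        self.entries.remove(&id_to_key(id));
    }

    /// Delete several ids in one pass (a real store would use a write batch).
    fn ack_many(&mut self, ids: &[u64]) {
        for id in ids {
            self.ack(*id);
        }
    }

    /// Unacked ids in key order, which equals ascending id order.
    fn ids(&self) -> Vec<u64> {
        self.entries.keys().map(|k| key_to_id(*k)).collect()
    }
}
```

Little-endian keys would break the ordering at byte boundaries (255 would sort after 256), which is why the sketch uses to_be_bytes.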
Delivery Semantics
The durable queue provides at-least-once delivery:
- push writes the event to RocksDB using synchronous writes before enqueueing it.
- An event returned by pop or poll is still stored on disk until it is acked.
- If the process exits before ack, reopening the queue replays the unacked event.
- Consumers should make handlers idempotent or deduplicate by DurableEvent.id.
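As an illustration of the last point, a consumer can remember which ids it has already applied and skip replays. This is a minimal sketch under stated assumptions: SketchEvent is a hypothetical mirror of DurableEvent (only the id field is named in this README; payload is invented for the example), and the seen-set lives in memory, whereas a real consumer would likely need to persist it alongside its own state to survive a restart.

```rust
use std::collections::HashSet;

/// Hypothetical mirror of `DurableEvent`; only `id` is named in the README.
#[derive(Clone, Debug)]
struct SketchEvent {
    id: u64,
    payload: String, // assumed field, for illustration only
}

/// Applies each event id at most once, even if the queue replays it.
struct DedupingConsumer {
    seen: HashSet<u64>,
    applied: Vec<String>,
}

impl DedupingConsumer {
    fn new() -> Self {
        DedupingConsumer { seen: HashSet::new(), applied: Vec::new() }
    }

    /// Returns true if the event was applied, false if it was a duplicate.
    fn handle(&mut self, event: &SketchEvent) -> bool {
        // `insert` returns false when the id was already present.
        if !self.seen.insert(event.id) {
            return false;
        }
        self.applied.push(event.payload.clone());
        true
    }
}
```

In a consumer loop, the event would be acked after handle returns in either case, since a duplicate has by definition already been processed.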
The queue database should be dedicated to this queue. On open, every key in the database is treated as a durable queue entry.
Eager vs Lazy Mode
DurableEventQueue::open(path) uses eager mode. On startup, it scans RocksDB, deserializes every unacked event, and fills the in-memory ready queue. This makes polling cheap after open, but startup cost grows with the number and size of unacked events.
DurableEventQueueOptions::lazy() uses lazy mode. Startup scans keys to find ids but does not deserialize event payloads until they are popped. This is better for large backlogs or large payloads. Lazy mode tracks in-flight ids in memory so a popped event is not delivered again until it is acked, nacked, or the process restarts.
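    use tempfile::tempdir; // assumed: tempdir() from the tempfile crate
    use arch_event_queues::{DurableEventQueue, DurableEventQueueOptions}; // assumed paths

    let dir = tempdir().unwrap();
    // String is an illustrative payload type.
    let queue = DurableEventQueue::<String>::open_with_options(dir.path(), DurableEventQueueOptions::lazy())
        .unwrap();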
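The difference between the two modes can be sketched with an in-memory model. Everything here is hypothetical and simplified: a BTreeMap stands in for RocksDB, decode stands in for Borsh deserialization, and nack is assumed to put the event back at the front of the ready list (the README does not say where a nacked event is re-queued).

```rust
use std::collections::{BTreeMap, HashSet, VecDeque};

/// Stand-in for RocksDB: id -> serialized payload.
type Store = BTreeMap<u64, Vec<u8>>;

/// Stand-in for Borsh deserialization.
fn decode(bytes: &[u8]) -> String {
    String::from_utf8_lossy(bytes).into_owned()
}

/// Eager mode: every unacked payload is decoded at open time.
struct EagerReady {
    ready: VecDeque<(u64, String)>,
}

fn open_eager(store: &Store) -> EagerReady {
    EagerReady {
        ready: store.iter().map(|(id, bytes)| (*id, decode(bytes))).collect(),
    }
}

/// Lazy mode: only ids are collected at open; payloads are decoded on pop.
struct LazyReady {
    ready_ids: VecDeque<u64>,
    in_flight: HashSet<u64>,
}

fn open_lazy(store: &Store) -> LazyReady {
    LazyReady {
        ready_ids: store.keys().copied().collect(),
        in_flight: HashSet::new(),
    }
}

impl LazyReady {
    /// Decode the next ready event and mark it in flight.
    fn pop(&mut self, store: &Store) -> Option<(u64, String)> {
        while let Some(id) = self.ready_ids.pop_front() {
            // Skip ids whose entry was deleted after the key scan.
            if let Some(bytes) = store.get(&id) {
                self.in_flight.insert(id);
                return Some((id, decode(bytes)));
            }
        }
        None
    }

    /// Delete the entry and forget the in-flight marker.
    fn ack(&mut self, store: &mut Store, id: u64) {
        store.remove(&id);
        self.in_flight.remove(&id);
    }

    /// Make an in-flight event ready again (front placement is an assumption).
    fn nack(&mut self, id: u64) {
        if self.in_flight.remove(&id) {
            self.ready_ids.push_front(id);
        }
    }
}
```

Because the in-flight set exists only in memory, reopening the model with open_lazy makes every unacked id ready again, which matches the replay-on-restart behaviour described above.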
Running Tests
From the arch-network workspace, this crate is consumed through a path dependency:
    arch-event-queues = { path = "../event-queues" }