arch-event-queues 0.1.0

In-memory and RocksDB-backed event queues for Arch services.
Documentation

Event Queues

docs.rs

Small Rust queue crate used by Arch services.

The package name is arch-event-queues; the Rust import path is arch_event_queues. It exposes both an in-memory FIFO queue and a RocksDB-backed durable queue.

arch-event-queues = "0.1"

Modules

In-memory queue

EventQueue<T> is a thread-safe FIFO queue backed by Arc<Mutex<VecDeque<T>>> and a Condvar.

Use it when events only need to live in memory and can be lost on process exit.

Main operations:

  • push(event): appends an event to the back of the queue and wakes one waiter.
  • push_front(event): inserts an event at the front and wakes one waiter.
  • pop(): returns the next event immediately, or None if the queue is empty.
  • poll(): waits up to 100 ms for an event, then returns Some(event) or None.
  • len() / is_empty(): inspect the current in-memory queue length.
use arch_event_queues::EventQueue;

#[derive(Debug, PartialEq, Eq)]
struct Event {
    payload: String,
}

let queue = EventQueue::new();
queue
    .push(Event {
        payload: "created".to_string(),
    })
    .unwrap();

assert_eq!(
    queue.pop().unwrap(),
    Some(Event {
        payload: "created".to_string(),
    })
);

Durable queue

DurableEventQueue<T> stores each event in RocksDB before making it available to consumers. Events remain durable until ack(id) or ack_many(ids) deletes them from RocksDB.

Use it when events must be replayed after restart and the consumer can tolerate at-least-once delivery.

Event payloads must implement:

borsh::BorshSerialize + borsh::BorshDeserialize + Clone

Main operations:

  • open(path): opens or creates a dedicated RocksDB queue database in eager mode.
  • open_with_options(path, DurableEventQueueOptions::lazy()): opens in lazy mode.
  • push(event): assigns a monotonically increasing u64 id, sync-writes the payload to RocksDB, then makes it available.
  • pop(): returns the next available durable event immediately.
  • poll(): waits up to 100 ms for an event when none is currently available.
  • get(id): reads an unacked event by id without polling it.
  • ack(id): marks an event handled by deleting it from RocksDB.
  • ack_many(ids): deletes multiple ids in one RocksDB batch.
  • nack(id): makes an in-flight event available again without deleting it.
  • len() / is_empty(): inspect unacked events stored in RocksDB.
  • ready_len(): counts events currently available to process.
  • iterator(): iterates all unacked events in id order.

ack and ack_many are intentionally idempotent: acknowledging an unknown or already-acked id succeeds.

use borsh::{BorshDeserialize, BorshSerialize};
use arch_event_queues::DurableEventQueue;

#[derive(Clone, Debug, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
struct Job {
    payload: String,
}

let dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::open(dir.path()).unwrap();

let queued = queue
    .push(Job {
        payload: "index block".to_string(),
    })
    .unwrap();

let next = queue.pop().unwrap().expect("queued job");
assert_eq!(next.id, queued.id);

queue.ack(next.id).unwrap();

Delivery Semantics

The durable queue provides at-least-once delivery:

  • push writes the event to RocksDB using synchronous writes before enqueueing it.
  • An event returned by pop or poll is still stored on disk until it is acked.
  • If the process exits before ack, reopening the queue replays the unacked event.
  • Consumers should make handlers idempotent or deduplicate by DurableEvent.id.

The queue database should be dedicated to this queue. On open, every key in the database is treated as a durable queue entry.

Eager vs Lazy Mode

DurableEventQueue::open(path) uses eager mode. On startup, it scans RocksDB, deserializes every unacked event, and fills the in-memory ready queue. This makes polling cheap after open, but startup cost grows with the number and size of unacked events.

DurableEventQueueOptions::lazy() uses lazy mode. Startup scans keys to find ids but does not deserialize event payloads until they are popped. This is better for large backlogs or large payloads. Lazy mode tracks in-flight ids in memory so a popped event is not delivered again until it is acked, nacked, or the process restarts.

use arch_event_queues::{DurableEventQueue, DurableEventQueueOptions};

#[derive(Clone, borsh::BorshSerialize, borsh::BorshDeserialize)]
struct Job;

let dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::<Job>::open_with_options(
    dir.path(),
    DurableEventQueueOptions::lazy(),
)
.unwrap();

Running Tests

cargo test

From the arch-network workspace, this crate is consumed through a path dependency:

arch-event-queues = { path = "../event-queues" }