
Module queue

A durable, at-least-once delivery queue backed by a variable::Journal.

Queue provides a persistent message queue with at-least-once delivery semantics. Items are stored durably in a journal and survive crashes. The reader must explicitly acknowledge each item after processing it. On restart, every item that has not been pruned is re-delivered, whether or not it was acknowledged before the restart.
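The restart behavior described above can be illustrated with a small in-memory model. This is a minimal sketch using only the standard library, not the crate's implementation: `ToyQueue`, its fields, and `demo` are hypothetical names, and it assumes that acknowledgments are tracked in memory only and that pruning advances only past a contiguous prefix of acknowledged items.

```rust
use std::collections::{BTreeSet, VecDeque};

/// Toy model of at-least-once delivery (hypothetical, std-only).
#[derive(Debug, Default)]
pub struct ToyQueue {
    /// Stands in for the durable journal; it survives `restart`.
    journal: VecDeque<Vec<u8>>,
    /// Position of the first retained (non-pruned) item.
    start: u64,
    /// Next position to deliver (in memory only, lost on restart).
    next: u64,
    /// Acknowledged positions not yet pruned (in memory only).
    acked: BTreeSet<u64>,
}

impl ToyQueue {
    /// Appends an item and returns its position.
    pub fn enqueue(&mut self, item: Vec<u8>) -> u64 {
        let pos = self.start + self.journal.len() as u64;
        self.journal.push_back(item);
        pos
    }

    /// Returns the next item that has not been acknowledged, if any.
    pub fn dequeue(&mut self) -> Option<(u64, Vec<u8>)> {
        loop {
            let pos = self.next;
            let idx = (pos - self.start) as usize;
            let item = self.journal.get(idx)?.clone();
            self.next += 1;
            if !self.acked.contains(&pos) {
                return Some((pos, item));
            }
        }
    }

    /// Acknowledges `pos`; returns false if `pos` is not a retained position.
    /// Assumption: pruning only removes a contiguous acknowledged prefix.
    pub fn ack(&mut self, pos: u64) -> bool {
        let end = self.start + self.journal.len() as u64;
        if pos < self.start || pos >= end {
            return false;
        }
        self.acked.insert(pos);
        while self.acked.remove(&self.start) {
            self.journal.pop_front();
            self.start += 1;
        }
        self.next = self.next.max(self.start);
        true
    }

    /// Simulates a crash and restart: only the journal and prune point survive.
    pub fn restart(&mut self) {
        self.acked.clear();
        self.next = self.start;
    }
}

/// Acks positions 2 and 0 (leaving a gap at 1), restarts, and returns
/// the positions delivered again after the restart.
pub fn demo() -> Vec<u64> {
    let mut q = ToyQueue::default();
    for item in [b"task1", b"task2", b"task3"] {
        q.enqueue(item.to_vec());
    }
    while q.dequeue().is_some() {}
    q.ack(2);
    q.ack(0);
    q.restart();
    let mut redelivered = Vec::new();
    while let Some((pos, _)) = q.dequeue() {
        redelivered.push(pos);
    }
    redelivered
}
```

In this model, `demo()` returns positions 1 and 2: position 0 was acknowledged and pruned, while position 2 was acknowledged but sits behind the unacknowledged position 1, so it is delivered again after the restart. Consumers of an at-least-once queue should therefore tolerate seeing the same item more than once.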

§Concurrent Access

For concurrent access from separate writer and reader tasks, use the shared module. Writers can be cloned for multiple producer tasks.

use commonware_storage::queue::shared;
use commonware_macros::select;

let (writer, mut reader) = shared::init(context, config).await?;

// Writer task (can clone for multiple producers)
writer.enqueue(item).await?;

// Reader task
loop {
    select! {
        result = reader.recv() => {
            let Some((pos, item)) = result? else { break };
            // Process item...
            reader.ack(pos).await?;
        }
        _ = shutdown => break,
    }
}

§Example

use commonware_codec::RangeCfg;
use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
use commonware_storage::queue::{Config, Queue};
use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};

let executor = deterministic::Runner::default();
executor.start(|context| async move {
    // Create a page cache
    let page_cache = CacheRef::from_pooler(
        &context,
        NonZeroU16::new(1024).unwrap(),
        NonZeroUsize::new(10).unwrap(),
    );

    // Create a queue
    let mut queue = Queue::<_, Vec<u8>>::init(context, Config {
        partition: "my-queue".into(),
        items_per_section: NonZeroU64::new(1000).unwrap(),
        compression: None,
        codec_config: ((0..).into(), ()), // RangeCfg for Vec length, () for u8
        page_cache,
        write_buffer: NonZeroUsize::new(4096).unwrap(),
    }).await.unwrap();

    // Enqueue items
    queue.enqueue(b"task1".to_vec()).await.unwrap();
    queue.enqueue(b"task2".to_vec()).await.unwrap();

    // Dequeue and process items (can be done out of order)
    while let Some((position, item)) = queue.dequeue().await.unwrap() {
        // Process the item...
        println!("Processing item at position {}", position);

        // Acknowledge after successful processing
        queue.ack(position).await.unwrap();
    }
});

Re-exports§

pub use shared::Reader;
pub use shared::Writer;

Modules§

shared
Shared queue with split writer/reader handles.

Structs§

Config
Configuration for Queue.
Queue
A durable, at-least-once delivery queue with per-item acknowledgment.

Enums§

Error
Errors that can occur when interacting with Queue.