Skip to main content

commonware_storage/queue/
mod.rs

1//! A durable, at-least-once delivery queue backed by a [`variable::Journal`](crate::journal::contiguous::variable).
2//!
3//! [Queue] provides a persistent message queue with at-least-once delivery semantics.
4//! Items are durably stored in a journal and will survive crashes. The reader must
5//! explicitly acknowledge each item after processing. On restart, all non-pruned
6//! items are re-delivered (acknowledged or not).
7//!
8//! # Concurrent Access
9//!
10//! For concurrent access from separate writer and reader tasks, use the [shared] module.
11//! Writers can be cloned for multiple producer tasks.
12//!
13//! ```rust,ignore
14//! use commonware_storage::queue::shared;
15//! use commonware_macros::select;
16//!
17//! let (writer, mut reader) = shared::init(context, config).await?;
18//!
19//! // Writer task (can clone for multiple producers)
20//! writer.enqueue(item).await?;
21//!
22//! // Reader task
23//! loop {
24//!     select! {
25//!         result = reader.recv() => {
26//!             let Some((pos, item)) = result? else { break };
27//!             // Process item...
28//!             reader.ack(pos).await?;
29//!         }
30//!         _ = shutdown => break,
31//!     }
32//! }
33//! ```
34//!
35//! # Example
36//!
37//! ```rust
38//! use commonware_codec::RangeCfg;
39//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
40//! use commonware_storage::{queue::{Queue, Config}};
41//! use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
42//!
43//! let executor = deterministic::Runner::default();
44//! executor.start(|context| async move {
45//!     // Create a page cache
46//!     let page_cache = CacheRef::from_pooler(
47//!         &context,
48//!         NonZeroU16::new(1024).unwrap(),
49//!         NonZeroUsize::new(10).unwrap(),
50//!     );
51//!
52//!     // Create a queue
53//!     let mut queue = Queue::<_, Vec<u8>>::init(context, Config {
54//!         partition: "my-queue".into(),
55//!         items_per_section: NonZeroU64::new(1000).unwrap(),
56//!         compression: None,
57//!         codec_config: ((0..).into(), ()), // RangeCfg for Vec length, () for u8
58//!         page_cache,
59//!         write_buffer: NonZeroUsize::new(4096).unwrap(),
60//!     }).await.unwrap();
61//!
62//!     // Enqueue items
63//!     queue.enqueue(b"task1".to_vec()).await.unwrap();
64//!     queue.enqueue(b"task2".to_vec()).await.unwrap();
65//!
66//!     // Dequeue and process items (can be done out of order)
67//!     while let Some((position, item)) = queue.dequeue().await.unwrap() {
68//!         // Process the item...
69//!         println!("Processing item at position {}", position);
70//!
71//!         // Acknowledge after successful processing
72//!         queue.ack(position).await.unwrap();
73//!     }
74//! });
75//! ```
76
77#[cfg(test)]
78mod conformance;
79mod metrics;
80pub mod shared;
81mod storage;
82
83pub use shared::{Reader, Writer};
84pub use storage::{Config, Queue};
85use thiserror::Error;
86
87/// Errors that can occur when interacting with [Queue].
88#[derive(Debug, Error)]
89pub enum Error {
90    #[error("journal error: {0}")]
91    Journal(#[from] crate::journal::Error),
92    #[error("position out of range: {0} (queue size is {1})")]
93    PositionOutOfRange(u64, u64),
94}