commonware_storage/queue/mod.rs
1//! A durable, at-least-once delivery queue backed by a [`variable::Journal`](crate::journal::contiguous::variable).
2//!
3//! [Queue] provides a persistent message queue with at-least-once delivery semantics.
4//! Items are durably stored in a journal and will survive crashes. The reader must
5//! explicitly acknowledge each item after processing. On restart, all non-pruned
6//! items are re-delivered (acknowledged or not).
7//!
8//! # Concurrent Access
9//!
10//! For concurrent access from separate writer and reader tasks, use the [shared] module.
11//! Writers can be cloned for multiple producer tasks.
12//!
13//! ```rust,ignore
14//! use commonware_storage::queue::shared;
15//! use commonware_macros::select;
16//!
17//! let (writer, mut reader) = shared::init(context, config).await?;
18//!
19//! // Writer task (can clone for multiple producers)
20//! writer.enqueue(item).await?;
21//!
22//! // Reader task
23//! loop {
24//! select! {
25//! result = reader.recv() => {
26//! let Some((pos, item)) = result? else { break };
27//! // Process item...
28//! reader.ack(pos).await?;
29//! }
30//! _ = shutdown => break,
31//! }
32//! }
33//! ```
34//!
35//! # Example
36//!
37//! ```rust
38//! use commonware_codec::RangeCfg;
39//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
40//! use commonware_storage::{queue::{Queue, Config}};
41//! use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
42//!
43//! let executor = deterministic::Runner::default();
44//! executor.start(|context| async move {
45//! // Create a page cache
46//! let page_cache = CacheRef::from_pooler(
47//! &context,
48//! NonZeroU16::new(1024).unwrap(),
49//! NonZeroUsize::new(10).unwrap(),
50//! );
51//!
52//! // Create a queue
53//! let mut queue = Queue::<_, Vec<u8>>::init(context, Config {
54//! partition: "my-queue".into(),
55//! items_per_section: NonZeroU64::new(1000).unwrap(),
56//! compression: None,
57//! codec_config: ((0..).into(), ()), // RangeCfg for Vec length, () for u8
58//! page_cache,
59//! write_buffer: NonZeroUsize::new(4096).unwrap(),
60//! }).await.unwrap();
61//!
62//! // Enqueue items
63//! queue.enqueue(b"task1".to_vec()).await.unwrap();
64//! queue.enqueue(b"task2".to_vec()).await.unwrap();
65//!
66//! // Dequeue and process items (can be done out of order)
67//! while let Some((position, item)) = queue.dequeue().await.unwrap() {
68//! // Process the item...
69//! println!("Processing item at position {}", position);
70//!
71//! // Acknowledge after successful processing
72//! queue.ack(position).await.unwrap();
73//! }
74//! });
75//! ```
76
77#[cfg(test)]
78mod conformance;
79mod metrics;
80pub mod shared;
81mod storage;
82
83pub use shared::{Reader, Writer};
84pub use storage::{Config, Queue};
85use thiserror::Error;
86
87/// Errors that can occur when interacting with [Queue].
88#[derive(Debug, Error)]
89pub enum Error {
90 #[error("journal error: {0}")]
91 Journal(#[from] crate::journal::Error),
92 #[error("position out of range: {0} (queue size is {1})")]
93 PositionOutOfRange(u64, u64),
94}