1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
//! A durable, at-least-once delivery queue backed by a [`variable::Journal`](crate::journal::contiguous::variable).
//!
//! [Queue] provides a persistent message queue with at-least-once delivery semantics.
//! Items are durably stored in a journal and will survive crashes. The reader must
//! explicitly acknowledge each item after processing. On restart, all non-pruned
//! items are re-delivered (acknowledged or not).
//!
//! # Concurrent Access
//!
//! For concurrent access from separate writer and reader tasks, use the [shared] module.
//! Writers can be cloned for multiple producer tasks.
//!
//! ```rust,ignore
//! use commonware_storage::queue::shared;
//! use commonware_macros::select;
//!
//! let (writer, mut reader) = shared::init(context, config).await?;
//!
//! // Writer task (can clone for multiple producers)
//! writer.enqueue(item).await?;
//!
//! // Reader task
//! loop {
//! select! {
//! result = reader.recv() => {
//! let Some((pos, item)) = result? else { break };
//! // Process item...
//! reader.ack(pos).await?;
//! }
//! _ = shutdown => break,
//! }
//! }
//! ```
//!
//! # Example
//!
//! ```rust
//! use commonware_codec::RangeCfg;
//! use commonware_runtime::{Spawner, Runner, deterministic, buffer::paged::CacheRef};
//! use commonware_storage::{queue::{Queue, Config}};
//! use std::num::{NonZeroU16, NonZeroU64, NonZeroUsize};
//!
//! let executor = deterministic::Runner::default();
//! executor.start(|context| async move {
//! // Create a page cache
//! let page_cache = CacheRef::from_pooler(
//! &context,
//! NonZeroU16::new(1024).unwrap(),
//! NonZeroUsize::new(10).unwrap(),
//! );
//!
//! // Create a queue
//! let mut queue = Queue::<_, Vec<u8>>::init(context, Config {
//! partition: "my-queue".into(),
//! items_per_section: NonZeroU64::new(1000).unwrap(),
//! compression: None,
//! codec_config: ((0..).into(), ()), // RangeCfg for Vec length, () for u8
//! page_cache,
//! write_buffer: NonZeroUsize::new(4096).unwrap(),
//! }).await.unwrap();
//!
//! // Enqueue items
//! queue.enqueue(b"task1".to_vec()).await.unwrap();
//! queue.enqueue(b"task2".to_vec()).await.unwrap();
//!
//! // Dequeue and process items (can be done out of order)
//! while let Some((position, item)) = queue.dequeue().await.unwrap() {
//! // Process the item...
//! println!("Processing item at position {}", position);
//!
//! // Acknowledge after successful processing
//! queue.ack(position).await.unwrap();
//! }
//! });
//! ```
pub use ;
pub use ;
use Error;
/// Errors that can occur when interacting with [Queue].