Skip to main content

commonware_storage/queue/
shared.rs

1//! Shared queue with split writer/reader handles.
2//!
3//! Provides concurrent access to a [Queue] with multiple writers and a single reader.
4//! The reader can await new items using [Reader::recv], which integrates
5//! with `select!` for multiplexing with other futures.
6//!
7//! Writers can be cloned to allow multiple tasks to enqueue items concurrently.
8
9use super::{Config, Error, Queue};
10use crate::Persistable;
11use commonware_codec::CodecShared;
12use commonware_runtime::{Clock, Metrics, Storage};
13use commonware_utils::{channel::mpsc, sync::AsyncMutex};
14use std::{ops::Range, sync::Arc};
15use tracing::debug;
16
/// Writer handle for enqueueing items.
///
/// This handle can be cloned to allow multiple tasks to enqueue items concurrently.
/// All clones share the same underlying queue and notification channel.
pub struct Writer<E: Clock + Storage + Metrics, V: CodecShared> {
    // Queue shared with the reader and every writer clone; all operations
    // go through this lock so appends and commits are serialized.
    queue: Arc<AsyncMutex<Queue<E, V>>>,
    // Best-effort wake-up channel to the reader (capacity 1, see `init`);
    // sends use `try_send`, so a full channel is never an error.
    notify: mpsc::Sender<()>,
}
25
26impl<E: Clock + Storage + Metrics, V: CodecShared> Clone for Writer<E, V> {
27    fn clone(&self) -> Self {
28        Self {
29            queue: self.queue.clone(),
30            notify: self.notify.clone(),
31        }
32    }
33}
34
35impl<E: Clock + Storage + Metrics, V: CodecShared> Writer<E, V> {
36    /// Enqueue an item, returning its position. The lock is held for the
37    /// full append + commit, so no reader can see the item until it is durable.
38    ///
39    /// # Errors
40    ///
41    /// Returns an error if the underlying storage operation fails.
42    pub async fn enqueue(&self, item: V) -> Result<u64, Error> {
43        let pos = self.queue.lock().await.enqueue(item).await?;
44
45        // Fire-and-forget so the writer never blocks on reader wake-up.
46        // The reader always checks the queue under lock, so a missed
47        // notification never causes a missed item.
48        let _ = self.notify.try_send(());
49
50        debug!(position = pos, "writer: enqueued item");
51        Ok(pos)
52    }
53
54    /// Enqueue a batch of items with a single commit, returning positions
55    /// `[start, end)`. The lock is held for the full batch, so no reader can
56    /// see any item until the entire batch is durable.
57    ///
58    /// # Errors
59    ///
60    /// Returns an error if any append or the final commit fails.
61    pub async fn enqueue_bulk(
62        &self,
63        items: impl IntoIterator<Item = V>,
64    ) -> Result<Range<u64>, Error> {
65        let mut queue = self.queue.lock().await;
66        let start = queue.size().await;
67        for item in items {
68            queue.append(item).await?;
69        }
70        let end = queue.size().await;
71        if end > start {
72            queue.commit().await?;
73        }
74        drop(queue);
75
76        if start < end {
77            let _ = self.notify.try_send(());
78        }
79        debug!(start, end, "writer: enqueued bulk");
80        Ok(start..end)
81    }
82
83    /// Append an item without committing, returning its position. The item
84    /// is immediately visible to the reader but is **not durable** until
85    /// [Self::commit] is called or the underlying journal auto-syncs at a
86    /// section boundary (see [`variable::Journal`](crate::journal::contiguous::variable::Journal)
87    /// invariant 1).
88    ///
89    /// # Errors
90    ///
91    /// Returns an error if the underlying storage operation fails.
92    pub async fn append(&self, item: V) -> Result<u64, Error> {
93        let pos = self.queue.lock().await.append(item).await?;
94        let _ = self.notify.try_send(());
95        debug!(position = pos, "writer: appended item");
96        Ok(pos)
97    }
98
99    /// See [Queue::commit](super::Queue::commit).
100    pub async fn commit(&self) -> Result<(), Error> {
101        self.queue.lock().await.commit().await
102    }
103
104    /// See [Queue::sync](super::Queue::sync).
105    pub async fn sync(&self) -> Result<(), Error> {
106        self.queue.lock().await.sync().await
107    }
108
109    /// Returns the total number of items that have been enqueued.
110    pub async fn size(&self) -> u64 {
111        self.queue.lock().await.size().await
112    }
113}
114
/// Reader handle for dequeuing and acknowledging items.
///
/// There should only be one reader per shared queue.
pub struct Reader<E: Clock + Storage + Metrics, V: CodecShared> {
    // Queue shared with every writer handle; all operations take this lock.
    queue: Arc<AsyncMutex<Queue<E, V>>>,
    // Receives best-effort wake-ups from writers; yields `None` once every
    // writer handle has been dropped.
    notify: mpsc::Receiver<()>,
}
122
123impl<E: Clock + Storage + Metrics, V: CodecShared> Reader<E, V> {
124    /// Receive the next unacknowledged item, waiting if necessary.
125    ///
126    /// This method is designed for use with `select!`. It will:
127    /// 1. Return immediately if an unacked item is available
128    /// 2. Wait for the writer to enqueue new items if the queue is empty
129    /// 3. Return `None` if the writer is dropped (no more items will arrive)
130    ///
131    /// # Errors
132    ///
133    /// Returns an error if the underlying storage operation fails.
134    pub async fn recv(&mut self) -> Result<Option<(u64, V)>, Error> {
135        loop {
136            // Try to dequeue an item
137            if let Some(item) = self.queue.lock().await.dequeue().await? {
138                return Ok(Some(item));
139            }
140
141            // No item available, wait for notification
142            // Returns None if writer is dropped
143            if self.notify.recv().await.is_none() {
144                // Writer dropped, drain any remaining items
145                return self.queue.lock().await.dequeue().await;
146            }
147        }
148    }
149
150    /// Try to dequeue the next unacknowledged item without waiting.
151    ///
152    /// Returns `None` immediately if no unacked item is available.
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if the underlying storage operation fails.
157    pub async fn try_recv(&mut self) -> Result<Option<(u64, V)>, Error> {
158        // Drain pending notification (capacity is 1, so at most 1 buffered).
159        let _ = self.notify.try_recv();
160
161        self.queue.lock().await.dequeue().await
162    }
163
164    /// See [Queue::ack].
165    ///
166    /// # Errors
167    ///
168    /// Returns [super::Error::PositionOutOfRange] if the position is invalid.
169    pub async fn ack(&self, position: u64) -> Result<(), Error> {
170        self.queue.lock().await.ack(position).await
171    }
172
173    /// See [Queue::ack_up_to].
174    ///
175    /// # Errors
176    ///
177    /// Returns [super::Error::PositionOutOfRange] if `up_to` is invalid.
178    pub async fn ack_up_to(&self, up_to: u64) -> Result<(), Error> {
179        self.queue.lock().await.ack_up_to(up_to).await
180    }
181
182    /// See [Queue::ack_floor].
183    pub async fn ack_floor(&self) -> u64 {
184        self.queue.lock().await.ack_floor()
185    }
186
187    /// See [Queue::read_position].
188    pub async fn read_position(&self) -> u64 {
189        self.queue.lock().await.read_position()
190    }
191
192    /// See [Queue::is_empty].
193    pub async fn is_empty(&self) -> bool {
194        self.queue.lock().await.is_empty().await
195    }
196
197    /// See [Queue::reset].
198    pub async fn reset(&self) {
199        self.queue.lock().await.reset();
200    }
201}
202
203/// Initialize a shared queue and split into writer and reader handles.
204///
205/// # Example
206///
207/// ```rust,ignore
208/// use commonware_macros::select;
209///
210/// let (writer, mut reader) = shared::init(context, config).await?;
211///
212/// // Writer task (clone for multiple producers)
213/// writer.enqueue(item).await?;
214///
215/// // Reader task
216/// loop {
217///     select! {
218///         result = reader.recv() => {
219///             let Some((pos, item)) = result? else { break };
220///             // Process item...
221///             reader.ack(pos).await?;
222///         }
223///         _ = shutdown => break,
224///     }
225/// }
226/// ```
227pub async fn init<E: Clock + Storage + Metrics, V: CodecShared>(
228    context: E,
229    cfg: Config<V::Cfg>,
230) -> Result<(Writer<E, V>, Reader<E, V>), Error> {
231    let queue = Arc::new(AsyncMutex::new(Queue::init(context, cfg).await?));
232    let (notify_tx, notify_rx) = mpsc::channel(1);
233
234    let writer = Writer {
235        queue: queue.clone(),
236        notify: notify_tx,
237    };
238
239    let reader = Reader {
240        queue,
241        notify: notify_rx,
242    };
243
244    Ok((writer, reader))
245}
246
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_codec::RangeCfg;
    use commonware_macros::{select, test_traced};
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, BufferPooler, Runner, Spawner,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use std::num::{NonZeroU16, NonZeroUsize};

    // Page geometry for the test page cache.
    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    // Build a small queue config; `partition` keys each test into its own
    // storage namespace so tests do not share state.
    fn test_config(partition: &str, pooler: &impl BufferPooler) -> Config<(RangeCfg<usize>, ())> {
        Config {
            partition: partition.into(),
            items_per_section: NZU64!(10),
            compression: None,
            codec_config: ((0..).into(), ()),
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(4096),
        }
    }

    // Round-trip a single item: enqueue -> recv -> ack.
    #[test_traced]
    fn test_shared_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_basic", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // Enqueue from writer
            let pos = writer.enqueue(b"hello".to_vec()).await.unwrap();
            assert_eq!(pos, 0);

            // Receive from reader
            let (recv_pos, item) = reader.recv().await.unwrap().unwrap();
            assert_eq!(recv_pos, 0);
            assert_eq!(item, b"hello".to_vec());

            // Ack the item
            reader.ack(recv_pos).await.unwrap();
            assert!(reader.is_empty().await);
        });
    }

    // Appended (uncommitted) items are visible to the reader before commit.
    #[test_traced]
    fn test_shared_append_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_append_commit", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // Append several items without committing
            for i in 0..5u8 {
                let pos = writer.append(vec![i]).await.unwrap();
                assert_eq!(pos, i as u64);
            }

            // Reader can see them before commit
            let (pos, item) = reader.recv().await.unwrap().unwrap();
            assert_eq!(pos, 0);
            assert_eq!(item, vec![0]);

            // Commit to make durable
            writer.commit().await.unwrap();

            // Remaining items still readable
            for i in 1..5 {
                let (pos, item) = reader.recv().await.unwrap().unwrap();
                assert_eq!(pos, i);
                assert_eq!(item, vec![i as u8]);
                reader.ack(pos).await.unwrap();
            }

            // Item 0 was received before the loop; ack it last.
            reader.ack(0).await.unwrap();
            assert!(reader.is_empty().await);
        });
    }

    // Bulk enqueue returns the half-open position range and all items are readable.
    #[test_traced]
    fn test_shared_enqueue_bulk() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_bulk", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            let range = writer
                .enqueue_bulk((0..5u8).map(|i| vec![i]))
                .await
                .unwrap();
            assert_eq!(range, 0..5);

            for i in 0..5 {
                let (pos, item) = reader.recv().await.unwrap().unwrap();
                assert_eq!(pos, i);
                assert_eq!(item, vec![i as u8]);
                reader.ack(pos).await.unwrap();
            }
            assert!(reader.is_empty().await);
        });
    }

    // A writer task and the reader run concurrently; ordering is preserved
    // because there is a single writer.
    #[test_traced]
    fn test_shared_concurrent() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_concurrent", &context);
            let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();

            // Spawn writer task
            let writer_handle = context.with_label("writer").spawn(|_ctx| async move {
                for i in 0..10u8 {
                    writer.enqueue(vec![i]).await.unwrap();
                }
                writer
            });

            // Reader receives items as they come
            let mut received = Vec::new();
            for _ in 0..10 {
                let (pos, item) = reader.recv().await.unwrap().unwrap();
                received.push((pos, item.clone()));
                reader.ack(pos).await.unwrap();
            }

            // Verify all items received in order
            for (i, (pos, item)) in received.iter().enumerate() {
                assert_eq!(*pos, i as u64);
                assert_eq!(*item, vec![i as u8]);
            }

            // Keep the writer alive until the reader is done, then join.
            let _ = writer_handle.await.unwrap();
        });
    }

    // recv() integrates with select!; the sleep arm must not win.
    #[test_traced]
    fn test_shared_select() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_select", &context);
            let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();

            // Enqueue an item
            writer.enqueue(b"test".to_vec()).await.unwrap();

            // Use select to receive with timeout
            let result = select! {
                item = reader.recv() => item,
                _ = context.sleep(std::time::Duration::from_secs(1)) => {
                    panic!("timeout")
                },
            };

            let (pos, item) = result.unwrap().unwrap();
            assert_eq!(pos, 0);
            assert_eq!(item, b"test".to_vec());

            reader.ack(pos).await.unwrap();
        });
    }

    // After the writer drops, the reader drains remaining items and then
    // recv() resolves to None.
    #[test_traced]
    fn test_shared_writer_dropped() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_writer_dropped", &context);
            let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();

            // Enqueue items then drop writer
            writer.enqueue(b"item1".to_vec()).await.unwrap();
            writer.enqueue(b"item2".to_vec()).await.unwrap();

            // Get the queue before dropping writer
            let queue = writer.queue.clone();
            drop(writer);

            // Reader should still get existing items
            let (pos1, _) = reader.recv().await.unwrap().unwrap();
            reader.ack(pos1).await.unwrap();

            let (pos2, _) = reader.recv().await.unwrap().unwrap();
            reader.ack(pos2).await.unwrap();

            // Next recv should return None (writer dropped, queue empty)
            let result = reader.recv().await.unwrap();
            assert!(result.is_none());

            // Dropping the reader leaves the held Arc as the sole reference,
            // proving no handle leaked a clone.
            drop(reader);
            let _ = Arc::try_unwrap(queue)
                .unwrap_or_else(|_| panic!("queue should have a single reference"))
                .into_inner();
        });
    }

    // try_recv never blocks: None on empty, Some when an item is pending.
    #[test_traced]
    fn test_shared_try_recv() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_try_recv", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // try_recv on empty queue returns None
            let result = reader.try_recv().await.unwrap();
            assert!(result.is_none());

            // Enqueue and try_recv
            writer.enqueue(b"item".to_vec()).await.unwrap();
            let (pos, item) = reader.try_recv().await.unwrap().unwrap();
            assert_eq!(pos, 0);
            assert_eq!(item, b"item".to_vec());

            reader.ack(pos).await.unwrap();
        });
    }

    // Two cloned writers enqueue concurrently; all items arrive, though
    // interleaving (and thus per-item order) is unspecified.
    #[test_traced]
    fn test_shared_multiple_writers() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_multi_writer", &context);
            let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();

            // Clone writer for second task
            let writer2 = writer.clone();

            // Spawn two writer tasks
            let handle1 = context.with_label("writer1").spawn(|_ctx| async move {
                for i in 0..5u8 {
                    writer.enqueue(vec![i]).await.unwrap();
                }
                writer
            });

            let handle2 = context.with_label("writer2").spawn(|_ctx| async move {
                for i in 5..10u8 {
                    writer2.enqueue(vec![i]).await.unwrap();
                }
            });

            // Reader receives all 10 items
            let mut received = Vec::new();
            for _ in 0..10 {
                let (pos, item) = reader.recv().await.unwrap().unwrap();
                received.push(item[0]);
                reader.ack(pos).await.unwrap();
            }

            // All items should be received (order may vary due to concurrent writes)
            received.sort();
            assert_eq!(received, (0..10u8).collect::<Vec<_>>());

            let _ = handle1.await.unwrap();
            handle2.await.unwrap();
        });
    }
}