// commonware_storage/queue/shared.rs

//! Shared queue with split writer/reader handles.
//!
//! Provides concurrent access to a [Queue] with multiple writers and a single reader.
//! The reader can await new items using [Reader::recv], which integrates
//! with `select!` for multiplexing with other futures.
//!
//! Writers can be cloned to allow multiple tasks to enqueue items concurrently.

use super::{Config, Error, Queue};
use crate::{Context, Persistable};
use commonware_codec::CodecShared;
use commonware_utils::{channel::mpsc, sync::AsyncMutex};
use std::{ops::Range, sync::Arc};
use tracing::debug;

/// Writer handle for enqueueing items.
///
/// This handle can be cloned to allow multiple tasks to enqueue items concurrently.
/// All clones share the same underlying queue and notification channel.
pub struct Writer<E: Context, V: CodecShared> {
    /// Queue shared with the single [Reader] and all writer clones; every
    /// operation takes this lock, serializing appends/commits with dequeues.
    queue: Arc<AsyncMutex<Queue<E, V>>>,
    /// Best-effort wake-up channel to the [Reader]. Writers only `try_send`
    /// (fire-and-forget), so a full channel never blocks an enqueue.
    notify: mpsc::Sender<()>,
}
25impl<E: Context, V: CodecShared> Clone for Writer<E, V> {
26    fn clone(&self) -> Self {
27        Self {
28            queue: self.queue.clone(),
29            notify: self.notify.clone(),
30        }
31    }
32}
34impl<E: Context, V: CodecShared> Writer<E, V> {
35    /// Enqueue an item, returning its position. The lock is held for the
36    /// full append + commit, so no reader can see the item until it is durable.
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if the underlying storage operation fails.
41    pub async fn enqueue(&self, item: V) -> Result<u64, Error> {
42        let pos = self.queue.lock().await.enqueue(item).await?;
43
44        // Fire-and-forget so the writer never blocks on reader wake-up.
45        // The reader always checks the queue under lock, so a missed
46        // notification never causes a missed item.
47        let _ = self.notify.try_send(());
48
49        debug!(position = pos, "writer: enqueued item");
50        Ok(pos)
51    }
52
53    /// Enqueue a batch of items with a single commit, returning positions
54    /// `[start, end)`. The lock is held for the full batch, so no reader can
55    /// see any item until the entire batch is durable.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if any append or the final commit fails.
60    pub async fn enqueue_bulk(
61        &self,
62        items: impl IntoIterator<Item = V>,
63    ) -> Result<Range<u64>, Error> {
64        let mut queue = self.queue.lock().await;
65        let start = queue.size().await;
66        for item in items {
67            queue.append(item).await?;
68        }
69        let end = queue.size().await;
70        if end > start {
71            queue.commit().await?;
72        }
73        drop(queue);
74
75        if start < end {
76            let _ = self.notify.try_send(());
77        }
78        debug!(start, end, "writer: enqueued bulk");
79        Ok(start..end)
80    }
81
82    /// Append an item without committing, returning its position. The item
83    /// is immediately visible to the reader but is **not durable** until
84    /// [Self::commit] is called or the underlying journal auto-syncs at a
85    /// section boundary (see [`variable::Journal`](crate::journal::contiguous::variable::Journal)
86    /// invariant 1).
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if the underlying storage operation fails.
91    pub async fn append(&self, item: V) -> Result<u64, Error> {
92        let pos = self.queue.lock().await.append(item).await?;
93        let _ = self.notify.try_send(());
94        debug!(position = pos, "writer: appended item");
95        Ok(pos)
96    }
97
98    /// See [Queue::commit](super::Queue::commit).
99    pub async fn commit(&self) -> Result<(), Error> {
100        self.queue.lock().await.commit().await
101    }
102
103    /// See [Queue::sync](super::Queue::sync).
104    pub async fn sync(&self) -> Result<(), Error> {
105        self.queue.lock().await.sync().await
106    }
107
108    /// Returns the total number of items that have been enqueued.
109    pub async fn size(&self) -> u64 {
110        self.queue.lock().await.size().await
111    }
112}
/// Reader handle for dequeuing and acknowledging items.
///
/// There should only be one reader per shared queue.
pub struct Reader<E: Context, V: CodecShared> {
    /// Queue shared with all [Writer] clones; dequeues and acks take this lock.
    queue: Arc<AsyncMutex<Queue<E, V>>>,
    /// Wake-up channel: writers signal here after enqueueing so [Reader::recv]
    /// can block without polling the queue.
    notify: mpsc::Receiver<()>,
}
impl<E: Context, V: CodecShared> Reader<E, V> {
    /// Receive the next unacknowledged item, waiting if necessary.
    ///
    /// This method is designed for use with `select!`. It will:
    /// 1. Return immediately if an unacked item is available
    /// 2. Wait for the writer to enqueue new items if the queue is empty
    /// 3. Return `None` if the writer is dropped (no more items will arrive)
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage operation fails.
    pub async fn recv(&mut self) -> Result<Option<(u64, V)>, Error> {
        loop {
            // Try to dequeue an item. Checking the queue (under lock) before
            // waiting is what makes a dropped notification harmless: any item
            // already enqueued is found here regardless of channel state.
            if let Some(item) = self.queue.lock().await.dequeue().await? {
                return Ok(Some(item));
            }

            // No item available, wait for notification
            // Returns None if writer is dropped
            if self.notify.recv().await.is_none() {
                // Writer dropped, drain any remaining items. This final
                // dequeue covers an item enqueued between the failed dequeue
                // above and the channel closing; later calls keep draining via
                // the dequeue at the top of the loop.
                return self.queue.lock().await.dequeue().await;
            }
        }
    }

    /// Try to dequeue the next unacknowledged item without waiting.
    ///
    /// Returns `None` immediately if no unacked item is available.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying storage operation fails.
    pub async fn try_recv(&mut self) -> Result<Option<(u64, V)>, Error> {
        // Drain pending notification (capacity is 1, so at most 1 buffered).
        // If a writer notifies after this drain but before the dequeue below,
        // the dequeue still observes its item; a leftover notification only
        // causes a spurious wake-up in [Self::recv], which re-checks anyway.
        let _ = self.notify.try_recv();

        self.queue.lock().await.dequeue().await
    }

    /// See [Queue::ack].
    ///
    /// # Errors
    ///
    /// Returns [super::Error::PositionOutOfRange] if the position is invalid.
    pub async fn ack(&self, position: u64) -> Result<(), Error> {
        self.queue.lock().await.ack(position).await
    }

    /// See [Queue::ack_up_to].
    ///
    /// # Errors
    ///
    /// Returns [super::Error::PositionOutOfRange] if `up_to` is invalid.
    pub async fn ack_up_to(&self, up_to: u64) -> Result<(), Error> {
        self.queue.lock().await.ack_up_to(up_to).await
    }

    /// See [Queue::ack_floor].
    pub async fn ack_floor(&self) -> u64 {
        self.queue.lock().await.ack_floor()
    }

    /// See [Queue::read_position].
    pub async fn read_position(&self) -> u64 {
        self.queue.lock().await.read_position()
    }

    /// See [Queue::is_empty].
    pub async fn is_empty(&self) -> bool {
        self.queue.lock().await.is_empty().await
    }

    /// See [Queue::reset].
    pub async fn reset(&self) {
        self.queue.lock().await.reset();
    }
}
202/// Initialize a shared queue and split into writer and reader handles.
203///
204/// # Example
205///
206/// ```rust,ignore
207/// use commonware_macros::select;
208///
209/// let (writer, mut reader) = shared::init(context, config).await?;
210///
211/// // Writer task (clone for multiple producers)
212/// writer.enqueue(item).await?;
213///
214/// // Reader task
215/// loop {
216///     select! {
217///         result = reader.recv() => {
218///             let Some((pos, item)) = result? else { break };
219///             // Process item...
220///             reader.ack(pos).await?;
221///         }
222///         _ = shutdown => break,
223///     }
224/// }
225/// ```
226pub async fn init<E: Context, V: CodecShared>(
227    context: E,
228    cfg: Config<V::Cfg>,
229) -> Result<(Writer<E, V>, Reader<E, V>), Error> {
230    let queue = Arc::new(AsyncMutex::new(Queue::init(context, cfg).await?));
231    let (notify_tx, notify_rx) = mpsc::channel(1);
232
233    let writer = Writer {
234        queue: queue.clone(),
235        notify: notify_tx,
236    };
237
238    let reader = Reader {
239        queue,
240        notify: notify_rx,
241    };
242
243    Ok((writer, reader))
244}
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_codec::RangeCfg;
    use commonware_macros::{select, test_traced};
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, BufferPooler, Clock, Metrics, Runner, Spawner,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use std::num::{NonZeroU16, NonZeroUsize};

    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Build a small queue config for a test, keyed by `partition` so each
    /// test gets its own storage namespace.
    fn test_config(partition: &str, pooler: &impl BufferPooler) -> Config<(RangeCfg<usize>, ())> {
        Config {
            partition: partition.into(),
            items_per_section: NZU64!(10),
            compression: None,
            codec_config: ((0..).into(), ()),
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(4096),
        }
    }

    /// Single enqueue -> recv -> ack round trip.
    #[test_traced]
    fn test_shared_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_basic", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // Enqueue from writer
            let pos = writer.enqueue(b"hello".to_vec()).await.unwrap();
            assert_eq!(pos, 0);

            // Receive from reader
            let (recv_pos, item) = reader.recv().await.unwrap().unwrap();
            assert_eq!(recv_pos, 0);
            assert_eq!(item, b"hello".to_vec());

            // Ack the item
            reader.ack(recv_pos).await.unwrap();
            assert!(reader.is_empty().await);
        });
    }

    /// Uncommitted appends are visible to the reader before `commit`.
    #[test_traced]
    fn test_shared_append_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_append_commit", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // Append several items without committing
            for i in 0..5u8 {
                let pos = writer.append(vec![i]).await.unwrap();
                assert_eq!(pos, i as u64);
            }

            // Reader can see them before commit
            let (pos, item) = reader.recv().await.unwrap().unwrap();
            assert_eq!(pos, 0);
            assert_eq!(item, vec![0]);

            // Commit to make durable
            writer.commit().await.unwrap();

            // Remaining items still readable
            for i in 1..5 {
                let (pos, item) = reader.recv().await.unwrap().unwrap();
                assert_eq!(pos, i);
                assert_eq!(item, vec![i as u8]);
                reader.ack(pos).await.unwrap();
            }

            // Position 0 was received first but acked last.
            reader.ack(0).await.unwrap();
            assert!(reader.is_empty().await);
        });
    }

    /// `enqueue_bulk` returns the `[start, end)` range and all items arrive.
    #[test_traced]
    fn test_shared_enqueue_bulk() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_bulk", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            let range = writer
                .enqueue_bulk((0..5u8).map(|i| vec![i]))
                .await
                .unwrap();
            assert_eq!(range, 0..5);

            for i in 0..5 {
                let (pos, item) = reader.recv().await.unwrap().unwrap();
                assert_eq!(pos, i);
                assert_eq!(item, vec![i as u8]);
                reader.ack(pos).await.unwrap();
            }
            assert!(reader.is_empty().await);
        });
    }

    /// Writer and reader run in separate tasks; items arrive in order.
    #[test_traced]
    fn test_shared_concurrent() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_concurrent", &context);
            let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();

            // Spawn writer task
            let writer_handle = context.with_label("writer").spawn(|_ctx| async move {
                for i in 0..10u8 {
                    writer.enqueue(vec![i]).await.unwrap();
                }
                writer
            });

            // Reader receives items as they come
            let mut received = Vec::new();
            for _ in 0..10 {
                let (pos, item) = reader.recv().await.unwrap().unwrap();
                received.push((pos, item.clone()));
                reader.ack(pos).await.unwrap();
            }

            // Verify all items received in order
            for (i, (pos, item)) in received.iter().enumerate() {
                assert_eq!(*pos, i as u64);
                assert_eq!(*item, vec![i as u8]);
            }

            // Join the writer task (it returns the Writer) before teardown.
            let _ = writer_handle.await.unwrap();
        });
    }

    /// `recv` composes with `select!` against a timeout future.
    #[test_traced]
    fn test_shared_select() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_select", &context);
            let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();

            // Enqueue an item
            writer.enqueue(b"test".to_vec()).await.unwrap();

            // Use select to receive with timeout
            let result = select! {
                item = reader.recv() => item,
                _ = context.sleep(std::time::Duration::from_secs(1)) => {
                    panic!("timeout")
                },
            };

            let (pos, item) = result.unwrap().unwrap();
            assert_eq!(pos, 0);
            assert_eq!(item, b"test".to_vec());

            reader.ack(pos).await.unwrap();
        });
    }

    /// After the writer is dropped, the reader drains remaining items and
    /// then `recv` resolves to `None`.
    #[test_traced]
    fn test_shared_writer_dropped() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_writer_dropped", &context);
            let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();

            // Enqueue items then drop writer
            writer.enqueue(b"item1".to_vec()).await.unwrap();
            writer.enqueue(b"item2".to_vec()).await.unwrap();

            // Get the queue before dropping writer
            let queue = writer.queue.clone();
            drop(writer);

            // Reader should still get existing items
            let (pos1, _) = reader.recv().await.unwrap().unwrap();
            reader.ack(pos1).await.unwrap();

            let (pos2, _) = reader.recv().await.unwrap().unwrap();
            reader.ack(pos2).await.unwrap();

            // Next recv should return None (writer dropped, queue empty)
            let result = reader.recv().await.unwrap();
            assert!(result.is_none());

            // With both handles gone, ours must be the sole Arc reference.
            drop(reader);
            let _ = Arc::try_unwrap(queue)
                .unwrap_or_else(|_| panic!("queue should have a single reference"))
                .into_inner();
        });
    }

    /// `try_recv` never blocks: `None` when empty, item when available.
    #[test_traced]
    fn test_shared_try_recv() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_try_recv", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // try_recv on empty queue returns None
            let result = reader.try_recv().await.unwrap();
            assert!(result.is_none());

            // Enqueue and try_recv
            writer.enqueue(b"item".to_vec()).await.unwrap();
            let (pos, item) = reader.try_recv().await.unwrap().unwrap();
            assert_eq!(pos, 0);
            assert_eq!(item, b"item".to_vec());

            reader.ack(pos).await.unwrap();
        });
    }

    /// Cloned writers in separate tasks: all items arrive exactly once,
    /// though interleaving order is unspecified.
    #[test_traced]
    fn test_shared_multiple_writers() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_multi_writer", &context);
            let (writer, mut reader) = init(context.clone(), cfg).await.unwrap();

            // Clone writer for second task
            let writer2 = writer.clone();

            // Spawn two writer tasks
            let handle1 = context.with_label("writer1").spawn(|_ctx| async move {
                for i in 0..5u8 {
                    writer.enqueue(vec![i]).await.unwrap();
                }
                writer
            });

            let handle2 = context.with_label("writer2").spawn(|_ctx| async move {
                for i in 5..10u8 {
                    writer2.enqueue(vec![i]).await.unwrap();
                }
            });

            // Reader receives all 10 items
            let mut received = Vec::new();
            for _ in 0..10 {
                let (pos, item) = reader.recv().await.unwrap().unwrap();
                received.push(item[0]);
                reader.ack(pos).await.unwrap();
            }

            // All items should be received (order may vary due to concurrent writes)
            received.sort();
            assert_eq!(received, (0..10u8).collect::<Vec<_>>());

            let _ = handle1.await.unwrap();
            handle2.await.unwrap();
        });
    }
}