Skip to main content

commonware_storage/queue/
shared.rs

//! Shared queue with split writer/reader handles.
//!
//! Provides concurrent access to a [Queue] with multiple writers and a single reader.
//! The reader can await new items using [Reader::recv], which integrates
//! with `select!` for multiplexing with other futures.
//!
//! Writers can be cloned to allow multiple tasks to enqueue items concurrently.
8
9use super::{Config, Error, Queue};
10use crate::{Context, Persistable};
11use commonware_codec::CodecShared;
12use commonware_utils::{channel::mpsc, sync::AsyncMutex};
13use std::{ops::Range, sync::Arc};
14use tracing::debug;
15
16/// Writer handle for enqueueing items.
17///
18/// This handle can be cloned to allow multiple tasks to enqueue items concurrently.
19/// All clones share the same underlying queue and notification channel.
20pub struct Writer<E: Context, V: CodecShared> {
21    queue: Arc<AsyncMutex<Queue<E, V>>>,
22    notify: mpsc::Sender<()>,
23}
24
25impl<E: Context, V: CodecShared> Clone for Writer<E, V> {
26    fn clone(&self) -> Self {
27        Self {
28            queue: self.queue.clone(),
29            notify: self.notify.clone(),
30        }
31    }
32}
33
34impl<E: Context, V: CodecShared> Writer<E, V> {
35    /// Enqueue an item, returning its position. The lock is held for the
36    /// full append + commit, so no reader can see the item until it is durable.
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if the underlying storage operation fails.
41    pub async fn enqueue(&self, item: V) -> Result<u64, Error> {
42        let pos = self.queue.lock().await.enqueue(item).await?;
43
44        // Fire-and-forget so the writer never blocks on reader wake-up.
45        // The reader always checks the queue under lock, so a missed
46        // notification never causes a missed item.
47        let _ = self.notify.try_send(());
48
49        debug!(position = pos, "writer: enqueued item");
50        Ok(pos)
51    }
52
53    /// Enqueue a batch of items with a single commit, returning positions
54    /// `[start, end)`. The lock is held for the full batch, so no reader can
55    /// see any item until the entire batch is durable.
56    ///
57    /// # Errors
58    ///
59    /// Returns an error if any append or the final commit fails.
60    pub async fn enqueue_bulk(
61        &self,
62        items: impl IntoIterator<Item = V>,
63    ) -> Result<Range<u64>, Error> {
64        let mut queue = self.queue.lock().await;
65        let start = queue.size().await;
66        for item in items {
67            queue.append(item).await?;
68        }
69        let end = queue.size().await;
70        if end > start {
71            queue.commit().await?;
72        }
73        drop(queue);
74
75        if start < end {
76            let _ = self.notify.try_send(());
77        }
78        debug!(start, end, "writer: enqueued bulk");
79        Ok(start..end)
80    }
81
82    /// Append an item without committing, returning its position. The item
83    /// is immediately visible to the reader but is **not durable** until
84    /// [Self::commit] is called or the underlying journal auto-syncs at a
85    /// section boundary (see [`variable::Journal`](crate::journal::contiguous::variable::Journal)
86    /// invariant 1).
87    ///
88    /// # Errors
89    ///
90    /// Returns an error if the underlying storage operation fails.
91    pub async fn append(&self, item: V) -> Result<u64, Error> {
92        let pos = self.queue.lock().await.append(item).await?;
93        let _ = self.notify.try_send(());
94        debug!(position = pos, "writer: appended item");
95        Ok(pos)
96    }
97
98    /// See [Queue::commit](super::Queue::commit).
99    pub async fn commit(&self) -> Result<(), Error> {
100        self.queue.lock().await.commit().await
101    }
102
103    /// See [Queue::sync](super::Queue::sync).
104    pub async fn sync(&self) -> Result<(), Error> {
105        self.queue.lock().await.sync().await
106    }
107
108    /// Returns the total number of items that have been enqueued.
109    pub async fn size(&self) -> u64 {
110        self.queue.lock().await.size().await
111    }
112}
113
114/// Reader handle for dequeuing and acknowledging items.
115///
116/// There should only be one reader per shared queue.
117pub struct Reader<E: Context, V: CodecShared> {
118    queue: Arc<AsyncMutex<Queue<E, V>>>,
119    notify: mpsc::Receiver<()>,
120}
121
122impl<E: Context, V: CodecShared> Reader<E, V> {
123    /// Receive the next unacknowledged item, waiting if necessary.
124    ///
125    /// This method is designed for use with `select!`. It will:
126    /// 1. Return immediately if an unacked item is available
127    /// 2. Wait for the writer to enqueue new items if the queue is empty
128    /// 3. Return `None` if the writer is dropped (no more items will arrive)
129    ///
130    /// # Errors
131    ///
132    /// Returns an error if the underlying storage operation fails.
133    pub async fn recv(&mut self) -> Result<Option<(u64, V)>, Error> {
134        loop {
135            // Try to dequeue an item
136            if let Some(item) = self.queue.lock().await.dequeue().await? {
137                return Ok(Some(item));
138            }
139
140            // No item available, wait for notification
141            // Returns None if writer is dropped
142            if self.notify.recv().await.is_none() {
143                // Writer dropped, drain any remaining items
144                return self.queue.lock().await.dequeue().await;
145            }
146        }
147    }
148
149    /// Try to dequeue the next unacknowledged item without waiting.
150    ///
151    /// Returns `None` immediately if no unacked item is available.
152    ///
153    /// # Errors
154    ///
155    /// Returns an error if the underlying storage operation fails.
156    pub async fn try_recv(&mut self) -> Result<Option<(u64, V)>, Error> {
157        // Drain pending notification (capacity is 1, so at most 1 buffered).
158        let _ = self.notify.try_recv();
159
160        self.queue.lock().await.dequeue().await
161    }
162
163    /// See [Queue::ack].
164    ///
165    /// # Errors
166    ///
167    /// Returns [super::Error::PositionOutOfRange] if the position is invalid.
168    pub async fn ack(&self, position: u64) -> Result<(), Error> {
169        self.queue.lock().await.ack(position).await
170    }
171
172    /// See [Queue::ack_up_to].
173    ///
174    /// # Errors
175    ///
176    /// Returns [super::Error::PositionOutOfRange] if `up_to` is invalid.
177    pub async fn ack_up_to(&self, up_to: u64) -> Result<(), Error> {
178        self.queue.lock().await.ack_up_to(up_to).await
179    }
180
181    /// See [Queue::ack_floor].
182    pub async fn ack_floor(&self) -> u64 {
183        self.queue.lock().await.ack_floor()
184    }
185
186    /// See [Queue::read_position].
187    pub async fn read_position(&self) -> u64 {
188        self.queue.lock().await.read_position()
189    }
190
191    /// See [Queue::is_empty].
192    pub async fn is_empty(&self) -> bool {
193        self.queue.lock().await.is_empty().await
194    }
195
196    /// See [Queue::reset].
197    pub async fn reset(&self) {
198        self.queue.lock().await.reset();
199    }
200}
201
202/// Initialize a shared queue and split into writer and reader handles.
203///
204/// # Example
205///
206/// ```rust,ignore
207/// use commonware_macros::select;
208///
209/// let (writer, mut reader) = shared::init(context, config).await?;
210///
211/// // Writer task (clone for multiple producers)
212/// writer.enqueue(item).await?;
213///
214/// // Reader task
215/// loop {
216///     select! {
217///         result = reader.recv() => {
218///             let Some((pos, item)) = result? else { break };
219///             // Process item...
220///             reader.ack(pos).await?;
221///         }
222///         _ = shutdown => break,
223///     }
224/// }
225/// ```
226pub async fn init<E: Context, V: CodecShared>(
227    context: E,
228    cfg: Config<V::Cfg>,
229) -> Result<(Writer<E, V>, Reader<E, V>), Error> {
230    let queue = Arc::new(AsyncMutex::new(Queue::init(context, cfg).await?));
231    let (notify_tx, notify_rx) = mpsc::channel(1);
232
233    let writer = Writer {
234        queue: queue.clone(),
235        notify: notify_tx,
236    };
237
238    let reader = Reader {
239        queue,
240        notify: notify_rx,
241    };
242
243    Ok((writer, reader))
244}
245
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_codec::RangeCfg;
    use commonware_macros::{select, test_traced};
    use commonware_runtime::{
        buffer::paged::CacheRef, deterministic, BufferPooler, Clock, Runner, Spawner,
        Supervisor as _,
    };
    use commonware_utils::{NZUsize, NZU16, NZU64};
    use std::num::{NonZeroU16, NonZeroUsize};

    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);

    /// Build a queue configuration for the given partition, with 10 items
    /// per section and no compression.
    fn test_config(partition: &str, pooler: &impl BufferPooler) -> Config<(RangeCfg<usize>, ())> {
        Config {
            partition: partition.into(),
            items_per_section: NZU64!(10),
            compression: None,
            codec_config: ((0..).into(), ()),
            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
            write_buffer: NZUsize!(4096),
        }
    }

    /// A single enqueue is received, acked, and leaves the queue empty.
    #[test_traced]
    fn test_shared_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_basic", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // Enqueue from writer
            let pos = writer.enqueue(b"hello".to_vec()).await.unwrap();
            assert_eq!(pos, 0);

            // Receive from reader
            let (recv_pos, item) = reader.recv().await.unwrap().unwrap();
            assert_eq!(recv_pos, 0);
            assert_eq!(item, b"hello".to_vec());

            // Ack the item
            reader.ack(recv_pos).await.unwrap();
            assert!(reader.is_empty().await);
        });
    }

    /// Appended items are readable both before and after an explicit commit.
    #[test_traced]
    fn test_shared_append_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_append_commit", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // Append several items without committing
            for i in 0..5u8 {
                let pos = writer.append(vec![i]).await.unwrap();
                assert_eq!(pos, i as u64);
            }

            // Reader can see them before commit
            let (pos, item) = reader.recv().await.unwrap().unwrap();
            assert_eq!(pos, 0);
            assert_eq!(item, vec![0]);

            // Commit to make durable
            writer.commit().await.unwrap();

            // Remaining items still readable
            for i in 1..5 {
                let (pos, item) = reader.recv().await.unwrap().unwrap();
                assert_eq!(pos, i);
                assert_eq!(item, vec![i as u8]);
                reader.ack(pos).await.unwrap();
            }

            // Position 0 was received first but is acked last.
            reader.ack(0).await.unwrap();
            assert!(reader.is_empty().await);
        });
    }

    /// A bulk enqueue returns the covered range and yields items in order.
    #[test_traced]
    fn test_shared_enqueue_bulk() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_bulk", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            let range = writer
                .enqueue_bulk((0..5u8).map(|i| vec![i]))
                .await
                .unwrap();
            assert_eq!(range, 0..5);

            for i in 0..5 {
                let (pos, item) = reader.recv().await.unwrap().unwrap();
                assert_eq!(pos, i);
                assert_eq!(item, vec![i as u8]);
                reader.ack(pos).await.unwrap();
            }
            assert!(reader.is_empty().await);
        });
    }

    /// A spawned writer and the reader make progress concurrently.
    #[test_traced]
    fn test_shared_concurrent() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_concurrent", &context);
            let (writer, mut reader) = init(context.child("storage"), cfg).await.unwrap();

            // Spawn writer task; it hands the writer back so the handle is
            // not dropped when the task finishes.
            let writer_handle = context.child("writer").spawn(|_ctx| async move {
                for i in 0..10u8 {
                    writer.enqueue(vec![i]).await.unwrap();
                }
                writer
            });

            // Reader receives items as they come
            let mut received = Vec::new();
            for _ in 0..10 {
                let (pos, item) = reader.recv().await.unwrap().unwrap();
                // `item` is not used again, so it is moved rather than cloned.
                received.push((pos, item));
                reader.ack(pos).await.unwrap();
            }

            // Verify all items received in order
            for (i, (pos, item)) in received.iter().enumerate() {
                assert_eq!(*pos, i as u64);
                assert_eq!(*item, vec![i as u8]);
            }

            let _ = writer_handle.await.unwrap();
        });
    }

    /// `recv` can be raced against another future with `select!`.
    #[test_traced]
    fn test_shared_select() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_select", &context);
            let (writer, mut reader) = init(context.child("storage"), cfg).await.unwrap();

            // Enqueue an item
            writer.enqueue(b"test".to_vec()).await.unwrap();

            // Use select to receive with timeout
            let result = select! {
                item = reader.recv() => item,
                _ = context.sleep(std::time::Duration::from_secs(1)) => {
                    panic!("timeout")
                },
            };

            let (pos, item) = result.unwrap().unwrap();
            assert_eq!(pos, 0);
            assert_eq!(item, b"test".to_vec());

            reader.ack(pos).await.unwrap();
        });
    }

    /// After the writer is dropped, pending items are still delivered and
    /// the next `recv` returns `None`.
    #[test_traced]
    fn test_shared_writer_dropped() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_writer_dropped", &context);
            let (writer, mut reader) = init(context.child("storage"), cfg).await.unwrap();

            // Enqueue items then drop writer
            writer.enqueue(b"item1".to_vec()).await.unwrap();
            writer.enqueue(b"item2".to_vec()).await.unwrap();

            // Keep a reference to the queue so it can be unwrapped at the end
            let queue = writer.queue.clone();
            drop(writer);

            // Reader should still get existing items
            let (pos1, _) = reader.recv().await.unwrap().unwrap();
            reader.ack(pos1).await.unwrap();

            let (pos2, _) = reader.recv().await.unwrap().unwrap();
            reader.ack(pos2).await.unwrap();

            // Next recv should return None (writer dropped, queue empty)
            let result = reader.recv().await.unwrap();
            assert!(result.is_none());

            // With both handles gone, the local `queue` is the only reference left.
            drop(reader);
            let _ = Arc::try_unwrap(queue)
                .unwrap_or_else(|_| panic!("queue should have a single reference"))
                .into_inner();
        });
    }

    /// `try_recv` returns `None` on an empty queue and the item once enqueued.
    #[test_traced]
    fn test_shared_try_recv() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_try_recv", &context);
            let (writer, mut reader) = init(context, cfg).await.unwrap();

            // try_recv on empty queue returns None
            let result = reader.try_recv().await.unwrap();
            assert!(result.is_none());

            // Enqueue and try_recv
            writer.enqueue(b"item".to_vec()).await.unwrap();
            let (pos, item) = reader.try_recv().await.unwrap().unwrap();
            assert_eq!(pos, 0);
            assert_eq!(item, b"item".to_vec());

            reader.ack(pos).await.unwrap();
        });
    }

    /// Two writer clones in separate tasks deliver every item to the reader.
    #[test_traced]
    fn test_shared_multiple_writers() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let cfg = test_config("test_shared_multi_writer", &context);
            let (writer, mut reader) = init(context.child("storage"), cfg).await.unwrap();

            // Clone writer for second task
            let writer2 = writer.clone();

            // Spawn two writer tasks; the first hands its writer back so at
            // least one handle outlives both tasks.
            let handle1 =
                context
                    .child("writer")
                    .with_attribute("index", 1)
                    .spawn(|_ctx| async move {
                        for i in 0..5u8 {
                            writer.enqueue(vec![i]).await.unwrap();
                        }
                        writer
                    });

            let handle2 =
                context
                    .child("writer")
                    .with_attribute("index", 2)
                    .spawn(|_ctx| async move {
                        for i in 5..10u8 {
                            writer2.enqueue(vec![i]).await.unwrap();
                        }
                    });

            // Reader receives all 10 items
            let mut received = Vec::new();
            for _ in 0..10 {
                let (pos, item) = reader.recv().await.unwrap().unwrap();
                received.push(item[0]);
                reader.ack(pos).await.unwrap();
            }

            // All items should be received (order may vary due to concurrent writes)
            received.sort();
            assert_eq!(received, (0..10u8).collect::<Vec<_>>());

            let _ = handle1.await.unwrap();
            handle2.await.unwrap();
        });
    }
}