Skip to main content

commonware_storage/queue/
storage.rs

1//! Queue storage implementation.
2
3use super::{metrics, Error};
4use crate::{
5    journal::contiguous::{variable, Reader as _},
6    rmap::RMap,
7    Context, Persistable,
8};
9use commonware_codec::CodecShared;
10use commonware_runtime::{buffer::paged::CacheRef, telemetry::metrics::GaugeExt};
11use std::num::{NonZeroU64, NonZeroUsize};
12use tracing::debug;
13
14/// Configuration for [Queue].
15#[derive(Clone)]
16pub struct Config<C> {
17    /// The storage partition name for the queue's journal.
18    pub partition: String,
19
20    /// The number of items to store in each journal section.
21    ///
22    /// Larger values reduce file overhead but increase minimum pruning granularity.
23    /// Once set, this value cannot be changed across restarts.
24    pub items_per_section: NonZeroU64,
25
26    /// Optional zstd compression level for stored items.
27    ///
28    /// If set, items will be compressed before storage. Higher values provide
29    /// better compression but use more CPU.
30    pub compression: Option<u8>,
31
32    /// Codec configuration for encoding/decoding items.
33    pub codec_config: C,
34
35    /// Page cache for buffering reads from the underlying journal.
36    pub page_cache: CacheRef,
37
38    /// Write buffer size for each section.
39    pub write_buffer: NonZeroUsize,
40}
41
42/// A durable, at-least-once delivery queue with per-item acknowledgment.
43///
44/// Items are durably stored in a journal and survive crashes. The reader must
45/// acknowledge each item individually after processing. Items can be acknowledged
46/// out of order, enabling parallel processing.
47///
48/// # Operations
49///
50/// - [append](Self::append) / [commit](Self::commit): Write items to the journal
51///   buffer, then persist. Items are readable immediately after append (before commit),
52///   but are lost on restart if not committed.
53/// - [enqueue](Self::enqueue): Append + commit in one step; the item is durable before return.
54/// - [dequeue](Self::dequeue): Return the next unacked item in FIFO order.
55/// - [ack](Self::ack) / [ack_up_to](Self::ack_up_to): Mark items as processed (in-memory only).
56/// - [sync](Self::sync): Commit, then prune completed sections below the ack floor.
57///
58/// # Acknowledgment
59///
60/// Acks are tracked in-memory with an `ack_floor` (all positions below are acked)
61/// plus an [RMap] of acked positions above the floor. When items are acked
62/// contiguously from the floor, the floor advances automatically.
63///
64/// Acks are **not** persisted. The durable equivalent is the journal's pruning
65/// boundary, advanced by [sync](Self::sync). On restart, all non-pruned
66/// items are re-delivered regardless of prior ack state.
67///
68/// # Crash Recovery
69///
70/// On restart, `ack_floor` is set to the journal's pruning boundary.
71/// Items that were pruned are gone; everything else is re-delivered.
72/// Applications must handle duplicates (idempotent processing).
73pub struct Queue<E: Context, V: CodecShared> {
74    /// The underlying journal storing queue items.
75    journal: variable::Journal<E, V>,
76
77    /// Position of the next item to dequeue.
78    ///
79    /// Invariant: `read_pos <= journal.size()`. Note that `ack_up_to` can advance
80    /// `ack_floor` past `read_pos`; in this case, `dequeue` skips the already-acked items.
81    read_pos: u64,
82
83    /// All items at positions < ack_floor are considered acknowledged.
84    ///
85    /// On restart, this is initialized to `journal.bounds().start`.
86    ack_floor: u64,
87
88    /// Ranges of acknowledged items at positions >= ack_floor (in-memory only).
89    ///
90    /// When an item at position == ack_floor is acked, the floor advances
91    /// and any contiguous acked items are consumed. Lost on restart.
92    acked_above: RMap,
93
94    /// Metrics for monitoring queue state.
95    metrics: metrics::Metrics,
96}
97
98impl<E: Context, V: CodecShared> Queue<E, V> {
99    /// Initialize a queue from storage.
100    ///
101    /// On first initialization, creates an empty queue. On restart, begins reading
102    /// from the journal's pruning boundary (providing at-least-once delivery for
103    /// all non-pruned items).
104    ///
105    /// # Errors
106    ///
107    /// Returns an error if the underlying journal cannot be initialized.
108    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
109        // Initialize metrics before creating sub-contexts
110        let metrics = metrics::Metrics::init(&context);
111
112        let journal = variable::Journal::init(
113            context.child("journal"),
114            variable::Config {
115                partition: cfg.partition,
116                items_per_section: cfg.items_per_section,
117                compression: cfg.compression,
118                codec_config: cfg.codec_config,
119                page_cache: cfg.page_cache,
120                write_buffer: cfg.write_buffer,
121            },
122        )
123        .await?;
124
125        // On restart, ack_floor is the pruning boundary (items below are deleted).
126        // acked_above is empty (in-memory state lost on restart).
127        let bounds = journal.reader().await.bounds();
128        let acked_above = RMap::new();
129
130        debug!(floor = bounds.start, size = bounds.end, "queue initialized");
131
132        // Set initial metric values
133        let _ = metrics.tip.try_set(bounds.end);
134        let _ = metrics.floor.try_set(bounds.start);
135        let _ = metrics.next.try_set(bounds.start);
136
137        Ok(Self {
138            journal,
139            read_pos: bounds.start,
140            ack_floor: bounds.start,
141            acked_above,
142            metrics,
143        })
144    }
145
146    /// Returns whether a specific position has been acknowledged.
147    pub fn is_acked(&self, position: u64) -> bool {
148        position < self.ack_floor || self.acked_above.get(&position).is_some()
149    }
150
151    /// Append an item without persisting. Call [Self::commit] or [Self::sync]
152    /// afterwards to make it durable. The item is readable immediately but
153    /// is not guaranteed to survive a crash until committed or the journal
154    /// auto-syncs at a section boundary (see [`variable::Journal`] invariant 1).
155    ///
156    /// # Errors
157    ///
158    /// Returns an error if the underlying storage operation fails.
159    pub async fn append(&mut self, item: V) -> Result<u64, Error> {
160        let pos = self.journal.append(&item).await?;
161        let _ = self.metrics.tip.try_set(pos + 1);
162        debug!(pos, "appended item");
163        Ok(pos)
164    }
165
166    /// Append and commit an item in one step, returning its position.
167    /// The item is durable before this method returns.
168    ///
169    /// # Errors
170    ///
171    /// Returns an error if the underlying storage operation fails.
172    pub async fn enqueue(&mut self, item: V) -> Result<u64, Error> {
173        let pos = self.append(item).await?;
174        self.commit().await?;
175        Ok(pos)
176    }
177
178    /// Dequeue the next unacknowledged item, returning its position and value.
179    /// Returns `None` when all items have been read or acknowledged.
180    /// Already-acked items are skipped automatically.
181    ///
182    /// # Errors
183    ///
184    /// Returns an error if the underlying storage operation fails.
185    pub async fn dequeue(&mut self) -> Result<Option<(u64, V)>, Error> {
186        let reader = self.journal.reader().await;
187        let size = reader.bounds().end;
188
189        // Fast-forward above ack floor
190        if self.read_pos < self.ack_floor {
191            self.read_pos = self.ack_floor;
192        }
193
194        // Fast-forward past the ack range containing read_pos (if any).
195        if let Some((_, end)) = self.acked_above.get(&self.read_pos) {
196            self.read_pos = end.saturating_add(1);
197        }
198
199        // If the read position is greater than the size of the journal, return None.
200        let _ = self.metrics.next.try_set(self.read_pos);
201        if self.read_pos >= size {
202            return Ok(None);
203        }
204
205        let item = reader.read(self.read_pos).await?;
206        let pos = self.read_pos;
207        self.read_pos += 1;
208        let _ = self.metrics.next.try_set(self.read_pos);
209        debug!(position = pos, "dequeued item");
210        Ok(Some((pos, item)))
211    }
212
213    /// Mark the item at `position` as processed (in-memory only).
214    /// The item will be skipped on subsequent dequeues. If this creates a
215    /// contiguous run from the ack floor, the floor advances automatically.
216    ///
217    /// # Errors
218    ///
219    /// Returns [Error::PositionOutOfRange] if `position >= queue size`.
220    pub async fn ack(&mut self, position: u64) -> Result<(), Error> {
221        let size = self.journal.size().await;
222        if position >= size {
223            return Err(Error::PositionOutOfRange(position, size));
224        }
225
226        // Already acked (below floor)
227        if position < self.ack_floor {
228            return Ok(());
229        }
230
231        // Already acked (above floor)
232        if self.acked_above.get(&position).is_some() {
233            return Ok(());
234        }
235
236        // Check if we can advance the floor
237        if position == self.ack_floor {
238            // Advance floor, consuming any contiguous acked items
239            let next = position + 1;
240            let final_floor = match self.acked_above.get(&next) {
241                Some((_, end)) => end + 1,
242                None => next,
243            };
244            self.acked_above.remove(next, final_floor - 1);
245            self.ack_floor = final_floor;
246            let _ = self.metrics.floor.try_set(self.ack_floor);
247            debug!(floor = self.ack_floor, "advanced ack floor");
248        } else {
249            // Floor is not advancing, so add to acked_above
250            self.acked_above.insert(position);
251            debug!(position, "acked item above floor");
252        }
253        Ok(())
254    }
255
256    /// Acknowledge all items in `[ack_floor, up_to)` by advancing the floor
257    /// directly. More efficient than calling [Self::ack] in a loop.
258    ///
259    /// # Errors
260    ///
261    /// Returns [Error::PositionOutOfRange] if `up_to > queue size`.
262    pub async fn ack_up_to(&mut self, up_to: u64) -> Result<(), Error> {
263        let size = self.journal.size().await;
264        if up_to > size {
265            return Err(Error::PositionOutOfRange(up_to, size));
266        }
267
268        // Nothing to do if up_to is at or below current floor
269        if up_to <= self.ack_floor {
270            return Ok(());
271        }
272
273        // Determine final floor: either up_to, or past any contiguous acked range at up_to
274        let final_floor = match self.acked_above.get(&up_to) {
275            Some((_, end)) => end + 1,
276            None => up_to,
277        };
278
279        // Remove all entries covered by the new floor and advance
280        self.acked_above.remove(self.ack_floor, final_floor - 1);
281        self.ack_floor = final_floor;
282        let _ = self.metrics.floor.try_set(self.ack_floor);
283        debug!(floor = self.ack_floor, "batch acked up to");
284        Ok(())
285    }
286
287    /// Returns the current read position.
288    ///
289    /// This is the position of the next item that will be checked by [Queue::dequeue].
290    pub const fn read_position(&self) -> u64 {
291        self.read_pos
292    }
293
294    /// Returns the current ack floor.
295    ///
296    /// All items at positions less than this value are considered acknowledged.
297    pub const fn ack_floor(&self) -> u64 {
298        self.ack_floor
299    }
300
301    /// Returns the total number of items that have been enqueued.
302    ///
303    /// This count is not affected by pruning. It represents the position that the
304    /// next enqueued item will receive.
305    pub async fn size(&self) -> u64 {
306        self.journal.size().await
307    }
308
309    /// Returns whether all enqueued items have been acknowledged.
310    pub async fn is_empty(&self) -> bool {
311        // If acked_above is non-empty, there's a gap at ack_floor (otherwise floor
312        // would have advanced). So all items acked implies ack_floor == size.
313        self.ack_floor >= self.journal.size().await
314    }
315
316    /// Reset the read position to the ack floor so [Self::dequeue] re-delivers
317    /// all unacknowledged items from the beginning.
318    pub fn reset(&mut self) {
319        let old_pos = self.read_pos;
320        self.read_pos = self.ack_floor;
321        let _ = self.metrics.next.try_set(self.read_pos);
322        debug!(
323            old_read_pos = old_pos,
324            new_read_pos = self.read_pos,
325            "reset read position"
326        );
327    }
328
329    /// Returns the number of items not yet read (test-only).
330    #[cfg(test)]
331    pub(crate) async fn pending(&self) -> u64 {
332        self.journal.size().await.saturating_sub(self.read_pos)
333    }
334
335    /// Returns the count of acknowledged items above the ack floor (test-only).
336    #[cfg(test)]
337    pub(crate) fn acked_above_count(&self) -> usize {
338        self.acked_above
339            .iter()
340            .map(|(&s, &e)| (e - s + 1) as usize)
341            .sum()
342    }
343}
344
345impl<E: Context + Send, V: CodecShared + Send> Persistable for Queue<E, V> {
346    type Error = Error;
347
348    async fn commit(&self) -> Result<(), Error> {
349        self.journal.commit().await?;
350        Ok(())
351    }
352
353    async fn sync(&self) -> Result<(), Error> {
354        self.journal.sync().await?;
355        self.journal.prune(self.ack_floor).await?;
356        Ok(())
357    }
358
359    async fn destroy(self) -> Result<(), Error> {
360        self.journal.destroy().await?;
361        Ok(())
362    }
363}
364
365#[cfg(test)]
366mod tests {
367    use super::*;
368    use commonware_codec::RangeCfg;
369    use commonware_macros::test_traced;
370    use commonware_runtime::{
371        buffer::paged::CacheRef, deterministic, BufferPooler, Metrics as _, Runner, Supervisor as _,
372    };
373    use commonware_utils::{NZUsize, NZU16, NZU64};
374    use std::num::NonZeroU16;
375
376    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
377    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
378
379    fn test_config(partition: &str, pooler: &impl BufferPooler) -> Config<(RangeCfg<usize>, ())> {
380        Config {
381            partition: partition.into(),
382            items_per_section: NZU64!(10),
383            compression: None,
384            codec_config: ((0..).into(), ()),
385            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
386            write_buffer: NZUsize!(4096),
387        }
388    }
389
390    #[test_traced]
391    fn test_basic_enqueue_dequeue() {
392        let executor = deterministic::Runner::default();
393        executor.start(|context| async move {
394            let cfg = test_config("test_basic", &context);
395            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
396                .await
397                .unwrap();
398
399            // Queue should be empty initially
400            assert!(queue.is_empty().await);
401            assert_eq!(queue.pending().await, 0);
402            assert_eq!(queue.size().await, 0);
403
404            // Enqueue items
405            let pos0 = queue.enqueue(b"item0".to_vec()).await.unwrap();
406            let pos1 = queue.enqueue(b"item1".to_vec()).await.unwrap();
407            let pos2 = queue.enqueue(b"item2".to_vec()).await.unwrap();
408
409            assert_eq!(pos0, 0);
410            assert_eq!(pos1, 1);
411            assert_eq!(pos2, 2);
412            assert_eq!(queue.size().await, 3);
413            assert_eq!(queue.pending().await, 3);
414            assert!(!queue.is_empty().await);
415
416            // Dequeue items
417            let (p, item) = queue.dequeue().await.unwrap().unwrap();
418            assert_eq!(p, 0);
419            assert_eq!(item, b"item0");
420            assert_eq!(queue.pending().await, 2);
421
422            let (p, item) = queue.dequeue().await.unwrap().unwrap();
423            assert_eq!(p, 1);
424            assert_eq!(item, b"item1");
425            assert_eq!(queue.pending().await, 1);
426
427            let (p, item) = queue.dequeue().await.unwrap().unwrap();
428            assert_eq!(p, 2);
429            assert_eq!(item, b"item2");
430            assert_eq!(queue.pending().await, 0);
431
432            // Queue still has unacked items
433            assert!(!queue.is_empty().await);
434            assert!(queue.dequeue().await.unwrap().is_none());
435        });
436    }
437
438    #[test_traced]
439    fn test_append_commit_batch() {
440        let executor = deterministic::Runner::default();
441        executor.start(|context| async move {
442            let cfg = test_config("test_batch", &context);
443            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
444                .await
445                .unwrap();
446
447            // Append multiple items, then commit once
448            for i in 0..5u8 {
449                queue.append(vec![i]).await.unwrap();
450            }
451            queue.commit().await.unwrap();
452            assert_eq!(queue.size().await, 5);
453
454            // Dequeue and verify order
455            for i in 0..5 {
456                let (pos, item) = queue.dequeue().await.unwrap().unwrap();
457                assert_eq!(pos, i);
458                assert_eq!(item, vec![i as u8]);
459            }
460
461            // Mix batch and single enqueue
462            for i in 5..8u8 {
463                queue.append(vec![i]).await.unwrap();
464            }
465            queue.commit().await.unwrap();
466            queue.enqueue(vec![8]).await.unwrap();
467            assert_eq!(queue.size().await, 9);
468
469            queue.ack_up_to(9).await.unwrap();
470            assert!(queue.is_empty().await);
471        });
472    }
473
474    #[test_traced]
475    fn test_append_commit_persistence() {
476        let executor = deterministic::Runner::default();
477        executor.start(|context| async move {
478            let cfg = test_config("test_batch_persist", &context);
479
480            {
481                let mut queue = Queue::<_, Vec<u8>>::init(context.child("first"), cfg.clone())
482                    .await
483                    .unwrap();
484                for i in 0..4u8 {
485                    queue.append(vec![i]).await.unwrap();
486                }
487                queue.commit().await.unwrap();
488                queue.sync().await.unwrap();
489            }
490
491            {
492                let mut queue = Queue::<_, Vec<u8>>::init(context.child("second"), cfg)
493                    .await
494                    .unwrap();
495                assert_eq!(queue.size().await, 4);
496                for i in 0..4 {
497                    let (pos, item) = queue.dequeue().await.unwrap().unwrap();
498                    assert_eq!(pos, i);
499                    assert_eq!(item, vec![i as u8]);
500                }
501            }
502        });
503    }
504
505    #[test_traced]
506    fn test_sequential_ack() {
507        let executor = deterministic::Runner::default();
508        executor.start(|context| async move {
509            let cfg = test_config("test_seq_ack", &context);
510            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
511                .await
512                .unwrap();
513
514            // Enqueue items
515            for i in 0..5u8 {
516                queue.enqueue(vec![i]).await.unwrap();
517            }
518
519            // Dequeue and ack sequentially
520            for i in 0..5 {
521                let (pos, _) = queue.dequeue().await.unwrap().unwrap();
522                assert_eq!(pos, i);
523                queue.ack(pos).await.unwrap();
524                assert_eq!(queue.ack_floor(), i + 1);
525            }
526
527            // All items acked
528            assert!(queue.is_empty().await);
529            assert_eq!(queue.ack_floor(), 5);
530        });
531    }
532
533    #[test_traced]
534    fn test_out_of_order_ack() {
535        let executor = deterministic::Runner::default();
536        executor.start(|context| async move {
537            let cfg = test_config("test_ooo_ack", &context);
538            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
539                .await
540                .unwrap();
541
542            // Enqueue items
543            for i in 0..5u8 {
544                queue.enqueue(vec![i]).await.unwrap();
545            }
546
547            // Dequeue all
548            for _ in 0..5 {
549                queue.dequeue().await.unwrap();
550            }
551
552            // Ack out of order: 2, 4, 1, 3, 0
553            queue.ack(2).await.unwrap();
554            assert_eq!(queue.ack_floor(), 0); // Floor doesn't move
555            assert!(queue.is_acked(2));
556
557            queue.ack(4).await.unwrap();
558            assert_eq!(queue.ack_floor(), 0);
559            assert!(queue.is_acked(4));
560
561            queue.ack(1).await.unwrap();
562            assert_eq!(queue.ack_floor(), 0);
563
564            queue.ack(3).await.unwrap();
565            assert_eq!(queue.ack_floor(), 0);
566
567            // Ack 0 - floor should advance to 5 (consuming 1,2,3,4)
568            queue.ack(0).await.unwrap();
569            assert_eq!(queue.ack_floor(), 5);
570            assert!(queue.is_empty().await);
571        });
572    }
573
574    #[test_traced]
575    fn test_ack_up_to() {
576        let executor = deterministic::Runner::default();
577        executor.start(|context| async move {
578            let cfg = test_config("test_ack_up_to", &context);
579            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
580                .await
581                .unwrap();
582
583            // Enqueue items
584            for i in 0..10u8 {
585                queue.enqueue(vec![i]).await.unwrap();
586            }
587
588            // Batch ack items 0-4
589            queue.ack_up_to(5).await.unwrap();
590            assert_eq!(queue.ack_floor(), 5);
591
592            // Items 0-4 should be acked
593            for i in 0..5 {
594                assert!(queue.is_acked(i));
595            }
596            // Items 5-9 should not be acked
597            for i in 5..10 {
598                assert!(!queue.is_acked(i));
599            }
600
601            // Dequeue should start at 5
602            let (p, _) = queue.dequeue().await.unwrap().unwrap();
603            assert_eq!(p, 5);
604        });
605    }
606
607    #[test_traced]
608    fn test_ack_up_to_with_existing_acks() {
609        let executor = deterministic::Runner::default();
610        executor.start(|context| async move {
611            let cfg = test_config("test_ack_up_to_existing", &context);
612            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
613                .await
614                .unwrap();
615
616            // Enqueue items
617            for i in 0..10u8 {
618                queue.enqueue(vec![i]).await.unwrap();
619            }
620
621            // Ack some items out of order first
622            queue.ack(7).await.unwrap();
623            queue.ack(8).await.unwrap();
624            assert_eq!(queue.acked_above_count(), 2);
625
626            // Batch ack up to 5
627            queue.ack_up_to(5).await.unwrap();
628            assert_eq!(queue.ack_floor(), 5);
629            assert_eq!(queue.acked_above_count(), 2);
630
631            // Now batch ack up to 9 - should consume the acked_above entries
632            queue.ack_up_to(9).await.unwrap();
633            assert_eq!(queue.ack_floor(), 9);
634            assert_eq!(queue.acked_above_count(), 0);
635        });
636    }
637
638    #[test_traced]
639    fn test_ack_up_to_coalesces_with_acked_above() {
640        let executor = deterministic::Runner::default();
641        executor.start(|context| async move {
642            let cfg = test_config("test_ack_up_to_coalesce", &context);
643            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
644                .await
645                .unwrap();
646
647            // Enqueue items
648            for i in 0..10u8 {
649                queue.enqueue(vec![i]).await.unwrap();
650            }
651
652            // Ack items 5, 6, 7 first
653            queue.ack(5).await.unwrap();
654            queue.ack(6).await.unwrap();
655            queue.ack(7).await.unwrap();
656            assert_eq!(queue.ack_floor(), 0);
657
658            // Batch ack up to 5 - should coalesce with 5, 6, 7
659            queue.ack_up_to(5).await.unwrap();
660            assert_eq!(queue.ack_floor(), 8); // Consumed 5, 6, 7
661        });
662    }
663
664    #[test_traced]
665    fn test_ack_up_to_errors() {
666        let executor = deterministic::Runner::default();
667        executor.start(|context| async move {
668            let cfg = test_config("test_ack_up_to_errors", &context);
669            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
670                .await
671                .unwrap();
672
673            queue.enqueue(b"item0".to_vec()).await.unwrap();
674            queue.enqueue(b"item1".to_vec()).await.unwrap();
675
676            // Can't ack_up_to beyond queue size
677            let err = queue.ack_up_to(5).await.unwrap_err();
678            assert!(matches!(err, Error::PositionOutOfRange(5, 2)));
679
680            // Can ack_up_to at queue size
681            queue.ack_up_to(2).await.unwrap();
682            assert_eq!(queue.ack_floor(), 2);
683
684            // Acking up_to at or below floor is a no-op
685            queue.ack_up_to(1).await.unwrap();
686            assert_eq!(queue.ack_floor(), 2);
687        });
688    }
689
690    #[test_traced]
691    fn test_dequeue_skips_acked() {
692        let executor = deterministic::Runner::default();
693        executor.start(|context| async move {
694            let cfg = test_config("test_skip_acked", &context);
695            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
696                .await
697                .unwrap();
698
699            // Enqueue items 0-4
700            for i in 0..5u8 {
701                queue.enqueue(vec![i]).await.unwrap();
702            }
703
704            // Ack items 1 and 3 before reading
705            queue.ack(1).await.unwrap();
706            queue.ack(3).await.unwrap();
707
708            // Dequeue should skip 1 and 3
709            let (p, item) = queue.dequeue().await.unwrap().unwrap();
710            assert_eq!(p, 0);
711            assert_eq!(item, vec![0]);
712
713            let (p, item) = queue.dequeue().await.unwrap().unwrap();
714            assert_eq!(p, 2); // Skipped 1
715            assert_eq!(item, vec![2]);
716
717            let (p, item) = queue.dequeue().await.unwrap().unwrap();
718            assert_eq!(p, 4); // Skipped 3
719            assert_eq!(item, vec![4]);
720
721            assert!(queue.dequeue().await.unwrap().is_none());
722        });
723    }
724
725    #[test_traced]
726    fn test_ack_errors() {
727        let executor = deterministic::Runner::default();
728        executor.start(|context| async move {
729            let cfg = test_config("test_ack_errors", &context);
730            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
731                .await
732                .unwrap();
733
734            queue.enqueue(b"item0".to_vec()).await.unwrap();
735            queue.enqueue(b"item1".to_vec()).await.unwrap();
736
737            // Can't ack position beyond queue size
738            let err = queue.ack(5).await.unwrap_err();
739            assert!(matches!(err, Error::PositionOutOfRange(5, 2)));
740
741            // Can ack unread items
742            queue.ack(1).await.unwrap();
743            assert!(queue.is_acked(1));
744
745            // Double ack is a no-op
746            queue.ack(1).await.unwrap();
747        });
748    }
749
750    #[test_traced]
751    fn test_prune() {
752        let executor = deterministic::Runner::default();
753        executor.start(|context| async move {
754            let cfg = test_config("test_prune", &context);
755            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
756                .await
757                .unwrap();
758
759            // Enqueue items (more than items_per_section to test pruning)
760            for i in 0..25u8 {
761                queue.enqueue(vec![i]).await.unwrap();
762            }
763            queue.sync().await.unwrap();
764
765            // Read and ack some items
766            for i in 0..15 {
767                queue.dequeue().await.unwrap();
768                queue.ack(i).await.unwrap();
769            }
770            assert_eq!(queue.ack_floor(), 15);
771
772            // Items 15+ should still be readable
773            let (p, item) = queue.dequeue().await.unwrap().unwrap();
774            assert_eq!(p, 15);
775            assert_eq!(item, vec![15]);
776        });
777    }
778
779    #[test_traced]
780    fn test_ack_across_sections() {
781        let executor = deterministic::Runner::default();
782        executor.start(|context| async move {
783            let cfg = test_config("test_multi_prune", &context);
784            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
785                .await
786                .unwrap();
787
788            // Enqueue many items across multiple sections (items_per_section = 10)
789            for i in 0..50u8 {
790                queue.enqueue(vec![i]).await.unwrap();
791            }
792            queue.sync().await.unwrap();
793
794            // First batch: ack items 0-14
795            for i in 0..15 {
796                queue.dequeue().await.unwrap();
797                queue.ack(i).await.unwrap();
798            }
799            assert_eq!(queue.ack_floor(), 15);
800
801            // Verify items 15+ still readable
802            let (p, item) = queue.dequeue().await.unwrap().unwrap();
803            assert_eq!(p, 15);
804            assert_eq!(item, vec![15]);
805
806            // Second batch: ack items 15-29
807            queue.ack(15).await.unwrap();
808            for i in 16..30 {
809                queue.dequeue().await.unwrap();
810                queue.ack(i).await.unwrap();
811            }
812            assert_eq!(queue.ack_floor(), 30);
813
814            // Verify items 30+ still readable
815            let (p, item) = queue.dequeue().await.unwrap().unwrap();
816            assert_eq!(p, 30);
817            assert_eq!(item, vec![30]);
818
819            // Third batch: ack remaining items
820            queue.ack(30).await.unwrap();
821            for i in 31..50 {
822                queue.dequeue().await.unwrap();
823                queue.ack(i).await.unwrap();
824            }
825            assert_eq!(queue.ack_floor(), 50);
826
827            // Queue should be empty now
828            assert!(queue.is_empty().await);
829            assert!(queue.dequeue().await.unwrap().is_none());
830        });
831    }
832
833    #[test_traced]
834    fn test_crash_recovery_replays_from_pruning_boundary() {
835        // On restart, ack_floor = pruning_boundary. Items not pruned are re-delivered.
836        let executor = deterministic::Runner::default();
837        executor.start(|context| async move {
838            let cfg = test_config("test_recovery_replay", &context);
839
840            // First session: enqueue items, ack some (but not enough to prune)
841            {
842                let mut queue = Queue::<_, Vec<u8>>::init(context.child("first"), cfg.clone())
843                    .await
844                    .unwrap();
845
846                for i in 0..5u8 {
847                    queue.enqueue(vec![i]).await.unwrap();
848                }
849
850                // Ack items 0, 1, 2 - but items_per_section=10, so no pruning
851                queue.ack(0).await.unwrap();
852                queue.ack(1).await.unwrap();
853                queue.ack(2).await.unwrap();
854                assert_eq!(queue.ack_floor(), 3);
855
856                queue.sync().await.unwrap();
857            }
858
859            // Second session: all items are re-delivered (no pruning occurred)
860            {
861                let mut queue = Queue::<_, Vec<u8>>::init(context.child("second"), cfg.clone())
862                    .await
863                    .unwrap();
864
865                // ack_floor = pruning_boundary = 0 (nothing was pruned)
866                assert_eq!(queue.ack_floor(), 0);
867
868                // All items re-delivered
869                for i in 0..5 {
870                    let (p, _) = queue.dequeue().await.unwrap().unwrap();
871                    assert_eq!(p, i);
872                }
873            }
874        });
875    }
876
877    #[test_traced]
878    fn test_crash_recovery_with_pruning() {
879        // Items pruned before crash are not re-delivered.
880        let executor = deterministic::Runner::default();
881        executor.start(|context| async move {
882            let cfg = test_config("test_recovery_pruned", &context);
883
884            // First session: enqueue many items, ack enough to trigger pruning
885            let expected_pruning_boundary = {
886                let mut queue = Queue::<_, Vec<u8>>::init(context.child("first"), cfg.clone())
887                    .await
888                    .unwrap();
889
890                // Enqueue items across multiple sections (items_per_section = 10)
891                for i in 0..25u8 {
892                    queue.enqueue(vec![i]).await.unwrap();
893                }
894
895                // Ack items 0-14 to advance floor past section 0
896                for i in 0..15 {
897                    queue.ack(i).await.unwrap();
898                }
899                assert_eq!(queue.ack_floor(), 15);
900
901                // Sync triggers pruning
902                queue.sync().await.unwrap();
903
904                // Verify pruning occurred
905                let pruning_boundary = queue.journal.bounds().await.start;
906                assert!(pruning_boundary > 0, "expected some pruning to occur");
907
908                pruning_boundary
909            };
910
911            // Second session: only non-pruned items are available
912            {
913                let mut queue = Queue::<_, Vec<u8>>::init(context.child("second"), cfg.clone())
914                    .await
915                    .unwrap();
916
917                // ack_floor = pruning_boundary (items 0-9 were pruned)
918                let pruning_boundary = queue.journal.bounds().await.start;
919                assert_eq!(queue.ack_floor(), pruning_boundary);
920                assert_eq!(pruning_boundary, expected_pruning_boundary);
921
922                // Items from pruning_boundary to 24 are re-delivered
923                for i in pruning_boundary..25 {
924                    let (p, item) = queue.dequeue().await.unwrap().unwrap();
925                    assert_eq!(p, i);
926                    assert_eq!(item, vec![i as u8]);
927                }
928
929                assert!(queue.dequeue().await.unwrap().is_none());
930            }
931        });
932    }
933
934    #[test_traced]
935    fn test_reset() {
936        let executor = deterministic::Runner::default();
937        executor.start(|context| async move {
938            let cfg = test_config("test_reset", &context);
939            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
940                .await
941                .unwrap();
942
943            // Enqueue items
944            for i in 0..5u8 {
945                queue.enqueue(vec![i]).await.unwrap();
946            }
947
948            // Read some
949            queue.dequeue().await.unwrap();
950            queue.dequeue().await.unwrap();
951            queue.dequeue().await.unwrap();
952            assert_eq!(queue.read_position(), 3);
953
954            // Reset without ack - should go back to 0
955            queue.reset();
956            assert_eq!(queue.read_position(), 0);
957
958            // Verify we can re-read
959            let (p, item) = queue.dequeue().await.unwrap().unwrap();
960            assert_eq!(p, 0);
961            assert_eq!(item, vec![0]);
962        });
963    }
964
965    #[test_traced]
966    fn test_reset_with_ack() {
967        let executor = deterministic::Runner::default();
968        executor.start(|context| async move {
969            let cfg = test_config("test_reset_ack", &context);
970            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
971                .await
972                .unwrap();
973
974            // Enqueue items
975            for i in 0..10u8 {
976                queue.enqueue(vec![i]).await.unwrap();
977            }
978
979            // Read and ack some
980            for i in 0..5 {
981                queue.dequeue().await.unwrap();
982                queue.ack(i).await.unwrap();
983            }
984            assert_eq!(queue.ack_floor(), 5);
985            assert_eq!(queue.read_position(), 5);
986
987            // Read a few more
988            queue.dequeue().await.unwrap();
989            queue.dequeue().await.unwrap();
990            assert_eq!(queue.read_position(), 7);
991
992            // Reset - should go back to ack floor
993            queue.reset();
994            assert_eq!(queue.read_position(), 5);
995
996            // Next dequeue should return item 5
997            let (p, item) = queue.dequeue().await.unwrap().unwrap();
998            assert_eq!(p, 5);
999            assert_eq!(item, vec![5]);
1000        });
1001    }
1002
1003    #[test_traced]
1004    fn test_empty_queue_operations() {
1005        let executor = deterministic::Runner::default();
1006        executor.start(|context| async move {
1007            let cfg = test_config("test_empty", &context);
1008            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
1009                .await
1010                .unwrap();
1011
1012            // Operations on empty queue
1013            assert!(queue.is_empty().await);
1014            assert!(queue.dequeue().await.unwrap().is_none());
1015            queue.sync().await.unwrap();
1016            queue.reset();
1017        });
1018    }
1019
1020    #[test_traced]
1021    fn test_persistence() {
1022        let executor = deterministic::Runner::default();
1023        executor.start(|context| async move {
1024            let cfg = test_config("test_persist", &context);
1025
1026            // First session
1027            {
1028                let mut queue = Queue::<_, Vec<u8>>::init(context.child("first"), cfg.clone())
1029                    .await
1030                    .unwrap();
1031
1032                queue.enqueue(b"item0".to_vec()).await.unwrap();
1033                queue.enqueue(b"item1".to_vec()).await.unwrap();
1034                queue.sync().await.unwrap();
1035            }
1036
1037            // Second session - data should persist
1038            {
1039                let mut queue = Queue::<_, Vec<u8>>::init(context.child("second"), cfg.clone())
1040                    .await
1041                    .unwrap();
1042
1043                assert_eq!(queue.size().await, 2);
1044
1045                let (_, item) = queue.dequeue().await.unwrap().unwrap();
1046                assert_eq!(item, b"item0");
1047
1048                let (_, item) = queue.dequeue().await.unwrap().unwrap();
1049                assert_eq!(item, b"item1");
1050            }
1051        });
1052    }
1053
1054    #[test_traced]
1055    fn test_large_queue_with_sparse_acks() {
1056        let executor = deterministic::Runner::default();
1057        executor.start(|context| async move {
1058            let cfg = test_config("test_sparse", &context);
1059            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
1060                .await
1061                .unwrap();
1062
1063            // Enqueue many items
1064            for i in 0..100u8 {
1065                queue.enqueue(vec![i]).await.unwrap();
1066            }
1067
1068            // Ack every 3rd item (sparse acking)
1069            for i in (0..100).step_by(3) {
1070                queue.ack(i).await.unwrap();
1071            }
1072
1073            // Dequeue should skip acked items
1074            let mut received = Vec::new();
1075            while let Some((pos, _)) = queue.dequeue().await.unwrap() {
1076                received.push(pos);
1077            }
1078
1079            // Should have received all items not divisible by 3
1080            let expected: Vec<u64> = (0..100).filter(|x| x % 3 != 0).collect();
1081            assert_eq!(received, expected);
1082        });
1083    }
1084
1085    #[test_traced]
1086    fn test_acked_above_coalescing() {
1087        let executor = deterministic::Runner::default();
1088        executor.start(|context| async move {
1089            let cfg = test_config("test_coalesce", &context);
1090            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
1091                .await
1092                .unwrap();
1093
1094            // Enqueue items
1095            for i in 0..10u8 {
1096                queue.enqueue(vec![i]).await.unwrap();
1097            }
1098
1099            // Ack items 1-8 (not 0)
1100            for i in 1..9 {
1101                queue.ack(i).await.unwrap();
1102            }
1103
1104            // Acked_above should have items 1-8
1105            assert_eq!(queue.ack_floor(), 0);
1106            assert!(queue.acked_above_count() > 0);
1107
1108            // Now ack 0 - floor should advance to 9, consuming all acked_above
1109            queue.ack(0).await.unwrap();
1110            assert_eq!(queue.ack_floor(), 9);
1111            assert_eq!(queue.acked_above_count(), 0);
1112        });
1113    }
1114
1115    #[test_traced]
1116    fn test_ack_up_to_past_read_pos() {
1117        let executor = deterministic::Runner::default();
1118        executor.start(|context| async move {
1119            let cfg = test_config("test_ack_up_to_past_read_pos", &context);
1120            let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
1121                .await
1122                .unwrap();
1123
1124            for i in 0..10u8 {
1125                queue.enqueue(vec![i]).await.unwrap();
1126            }
1127
1128            // Read only 3 items
1129            for _ in 0..3 {
1130                queue.dequeue().await.unwrap();
1131            }
1132            assert_eq!(queue.read_position(), 3);
1133
1134            // Batch ack past read position
1135            queue.ack_up_to(7).await.unwrap();
1136            assert_eq!(queue.ack_floor(), 7);
1137
1138            // Dequeue should skip 3-6 and return 7
1139            let (pos, item) = queue.dequeue().await.unwrap().unwrap();
1140            assert_eq!(pos, 7);
1141            assert_eq!(item, vec![7]);
1142        });
1143    }
1144
1145    #[test_traced]
1146    fn test_metrics() {
1147        let executor = deterministic::Runner::default();
1148        executor.start(|context| async move {
1149            let cfg = test_config("test-metrics", &context);
1150            let ctx = context.child("test_metrics");
1151            let mut queue = Queue::<_, Vec<u8>>::init(ctx, cfg).await.unwrap();
1152
1153            let encoded = context.encode();
1154            assert!(
1155                encoded.contains("test_metrics_tip 0"),
1156                "expected tip 0: {encoded}"
1157            );
1158            assert!(
1159                encoded.contains("test_metrics_floor 0"),
1160                "expected floor 0: {encoded}"
1161            );
1162            assert!(
1163                encoded.contains("test_metrics_next 0"),
1164                "expected next 0: {encoded}"
1165            );
1166
1167            // Append updates tip without enqueue
1168            queue.append(vec![0]).await.unwrap();
1169            let encoded = context.encode();
1170            assert!(
1171                encoded.contains("test_metrics_tip 1"),
1172                "expected tip 1: {encoded}"
1173            );
1174            queue.commit().await.unwrap();
1175
1176            // Enqueue updates tip further
1177            for i in 1..10u8 {
1178                queue.enqueue(vec![i]).await.unwrap();
1179            }
1180            let encoded = context.encode();
1181            assert!(
1182                encoded.contains("test_metrics_tip 10"),
1183                "expected tip 10: {encoded}"
1184            );
1185
1186            // Multiple dequeues advance next
1187            queue.dequeue().await.unwrap();
1188            queue.dequeue().await.unwrap();
1189            let encoded = context.encode();
1190            assert!(
1191                encoded.contains("test_metrics_next 2"),
1192                "expected next 2: {encoded}"
1193            );
1194
1195            // Sequential ack advances floor
1196            queue.ack(0).await.unwrap();
1197            queue.ack(1).await.unwrap();
1198            let encoded = context.encode();
1199            assert!(
1200                encoded.contains("test_metrics_floor 2"),
1201                "expected floor 2: {encoded}"
1202            );
1203
1204            // Out-of-order ack: floor stays until gap fills
1205            queue.ack(4).await.unwrap();
1206            queue.ack(6).await.unwrap();
1207            let encoded = context.encode();
1208            assert!(
1209                encoded.contains("test_metrics_floor 2"),
1210                "expected floor still 2: {encoded}"
1211            );
1212
1213            // Fill gap coalesces floor forward
1214            queue.ack(2).await.unwrap();
1215            queue.ack(3).await.unwrap();
1216            let encoded = context.encode();
1217            assert!(
1218                encoded.contains("test_metrics_floor 5"),
1219                "expected floor 5: {encoded}"
1220            );
1221
1222            // ack_up_to advances floor past sparse ack at 6
1223            queue.ack_up_to(8).await.unwrap();
1224            let encoded = context.encode();
1225            assert!(
1226                encoded.contains("test_metrics_floor 8"),
1227                "expected floor 8: {encoded}"
1228            );
1229
1230            // Ack remaining
1231            queue.ack(8).await.unwrap();
1232            queue.ack(9).await.unwrap();
1233            let encoded = context.encode();
1234            assert!(
1235                encoded.contains("test_metrics_floor 10"),
1236                "expected floor 10: {encoded}"
1237            );
1238
1239            // Reset brings next back to floor
1240            queue.reset();
1241            let encoded = context.encode();
1242            assert!(
1243                encoded.contains("test_metrics_next 10"),
1244                "expected next 10: {encoded}"
1245            );
1246        });
1247    }
1248
1249    #[test_traced]
1250    fn test_metrics_next_updates_on_fast_forward() {
1251        let executor = deterministic::Runner::default();
1252        executor.start(|context| async move {
1253            let cfg = test_config("test-ff", &context);
1254            let ctx = context.child("test_ff");
1255            let mut queue = Queue::<_, Vec<u8>>::init(ctx, cfg).await.unwrap();
1256
1257            // Enqueue 3 items, dequeue and ack only the first
1258            for i in 0..3u8 {
1259                queue.enqueue(vec![i]).await.unwrap();
1260            }
1261            let (pos, _) = queue.dequeue().await.unwrap().unwrap();
1262            queue.ack(pos).await.unwrap();
1263
1264            let encoded = context.encode();
1265            assert!(
1266                encoded.contains("test_ff_next 1"),
1267                "expected next 1: {encoded}"
1268            );
1269
1270            // Ack remaining items out-of-order to advance floor to 3
1271            queue.ack(2).await.unwrap();
1272            queue.ack(1).await.unwrap();
1273            assert_eq!(queue.ack_floor(), 3);
1274
1275            // next metric is still 1 (no dequeue yet)
1276            let encoded = context.encode();
1277            assert!(
1278                encoded.contains("test_ff_next 1"),
1279                "expected next still 1: {encoded}"
1280            );
1281
1282            // Dequeue returns None but fast-forwards read_pos to ack_floor
1283            assert!(queue.dequeue().await.unwrap().is_none());
1284            let encoded = context.encode();
1285            assert!(
1286                encoded.contains("test_ff_next 3"),
1287                "expected next 3 after fast-forward: {encoded}"
1288            );
1289        });
1290    }
1291}