Skip to main content

commonware_storage/queue/
storage.rs

1//! Queue storage implementation.
2
3use super::{metrics, Error};
4use crate::{
5    journal::contiguous::{variable, Reader as _},
6    rmap::RMap,
7    Persistable,
8};
9use commonware_codec::CodecShared;
10use commonware_runtime::{
11    buffer::paged::CacheRef, telemetry::metrics::status::GaugeExt, Clock, Metrics, Storage,
12};
13use std::num::{NonZeroU64, NonZeroUsize};
14use tracing::debug;
15
16/// Configuration for [Queue].
17#[derive(Clone)]
18pub struct Config<C> {
19    /// The storage partition name for the queue's journal.
20    pub partition: String,
21
22    /// The number of items to store in each journal section.
23    ///
24    /// Larger values reduce file overhead but increase minimum pruning granularity.
25    /// Once set, this value cannot be changed across restarts.
26    pub items_per_section: NonZeroU64,
27
28    /// Optional zstd compression level for stored items.
29    ///
30    /// If set, items will be compressed before storage. Higher values provide
31    /// better compression but use more CPU.
32    pub compression: Option<u8>,
33
34    /// Codec configuration for encoding/decoding items.
35    pub codec_config: C,
36
37    /// Page cache for buffering reads from the underlying journal.
38    pub page_cache: CacheRef,
39
40    /// Write buffer size for each section.
41    pub write_buffer: NonZeroUsize,
42}
43
44/// A durable, at-least-once delivery queue with per-item acknowledgment.
45///
46/// Items are durably stored in a journal and survive crashes. The reader must
47/// acknowledge each item individually after processing. Items can be acknowledged
48/// out of order, enabling parallel processing.
49///
50/// # Operations
51///
52/// - [append](Self::append) / [commit](Self::commit): Write items to the journal
53///   buffer, then persist. Items are readable immediately after append (before commit),
54///   but are lost on restart if not committed.
55/// - [enqueue](Self::enqueue): Append + commit in one step; the item is durable before return.
56/// - [dequeue](Self::dequeue): Return the next unacked item in FIFO order.
57/// - [ack](Self::ack) / [ack_up_to](Self::ack_up_to): Mark items as processed (in-memory only).
58/// - [sync](Self::sync): Commit, then prune completed sections below the ack floor.
59///
60/// # Acknowledgment
61///
62/// Acks are tracked in-memory with an `ack_floor` (all positions below are acked)
63/// plus an [RMap] of acked positions above the floor. When items are acked
64/// contiguously from the floor, the floor advances automatically.
65///
66/// Acks are **not** persisted. The durable equivalent is the journal's pruning
67/// boundary, advanced by [sync](Self::sync). On restart, all non-pruned
68/// items are re-delivered regardless of prior ack state.
69///
70/// # Crash Recovery
71///
72/// On restart, `ack_floor` is set to the journal's pruning boundary.
73/// Items that were pruned are gone; everything else is re-delivered.
74/// Applications must handle duplicates (idempotent processing).
75pub struct Queue<E: Clock + Storage + Metrics, V: CodecShared> {
76    /// The underlying journal storing queue items.
77    journal: variable::Journal<E, V>,
78
79    /// Position of the next item to dequeue.
80    ///
81    /// Invariant: `read_pos <= journal.size()`. Note that `ack_up_to` can advance
82    /// `ack_floor` past `read_pos`; in this case, `dequeue` skips the already-acked items.
83    read_pos: u64,
84
85    /// All items at positions < ack_floor are considered acknowledged.
86    ///
87    /// On restart, this is initialized to `journal.bounds().start`.
88    ack_floor: u64,
89
90    /// Ranges of acknowledged items at positions >= ack_floor (in-memory only).
91    ///
92    /// When an item at position == ack_floor is acked, the floor advances
93    /// and any contiguous acked items are consumed. Lost on restart.
94    acked_above: RMap,
95
96    /// Metrics for monitoring queue state.
97    metrics: metrics::Metrics,
98}
99
100impl<E: Clock + Storage + Metrics, V: CodecShared> Queue<E, V> {
101    /// Initialize a queue from storage.
102    ///
103    /// On first initialization, creates an empty queue. On restart, begins reading
104    /// from the journal's pruning boundary (providing at-least-once delivery for
105    /// all non-pruned items).
106    ///
107    /// # Errors
108    ///
109    /// Returns an error if the underlying journal cannot be initialized.
110    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
111        // Initialize metrics before creating sub-contexts
112        let metrics = metrics::Metrics::init(&context);
113
114        let journal = variable::Journal::init(
115            context.with_label("journal"),
116            variable::Config {
117                partition: cfg.partition,
118                items_per_section: cfg.items_per_section,
119                compression: cfg.compression,
120                codec_config: cfg.codec_config,
121                page_cache: cfg.page_cache,
122                write_buffer: cfg.write_buffer,
123            },
124        )
125        .await?;
126
127        // On restart, ack_floor is the pruning boundary (items below are deleted).
128        // acked_above is empty (in-memory state lost on restart).
129        let bounds = journal.reader().await.bounds();
130        let acked_above = RMap::new();
131
132        debug!(floor = bounds.start, size = bounds.end, "queue initialized");
133
134        // Set initial metric values
135        let _ = metrics.tip.try_set(bounds.end);
136        let _ = metrics.floor.try_set(bounds.start);
137        let _ = metrics.next.try_set(bounds.start);
138
139        Ok(Self {
140            journal,
141            read_pos: bounds.start,
142            ack_floor: bounds.start,
143            acked_above,
144            metrics,
145        })
146    }
147
148    /// Returns whether a specific position has been acknowledged.
149    pub fn is_acked(&self, position: u64) -> bool {
150        position < self.ack_floor || self.acked_above.get(&position).is_some()
151    }
152
153    /// Append an item without persisting. Call [Self::commit] or [Self::sync]
154    /// afterwards to make it durable. The item is readable immediately but
155    /// is not guaranteed to survive a crash until committed or the journal
156    /// auto-syncs at a section boundary (see [`variable::Journal`] invariant 1).
157    ///
158    /// # Errors
159    ///
160    /// Returns an error if the underlying storage operation fails.
161    pub async fn append(&mut self, item: V) -> Result<u64, Error> {
162        let pos = self.journal.append(&item).await?;
163        let _ = self.metrics.tip.try_set(pos + 1);
164        debug!(pos, "appended item");
165        Ok(pos)
166    }
167
168    /// Append and commit an item in one step, returning its position.
169    /// The item is durable before this method returns.
170    ///
171    /// # Errors
172    ///
173    /// Returns an error if the underlying storage operation fails.
174    pub async fn enqueue(&mut self, item: V) -> Result<u64, Error> {
175        let pos = self.append(item).await?;
176        self.commit().await?;
177        Ok(pos)
178    }
179
180    /// Dequeue the next unacknowledged item, returning its position and value.
181    /// Returns `None` when all items have been read or acknowledged.
182    /// Already-acked items are skipped automatically.
183    ///
184    /// # Errors
185    ///
186    /// Returns an error if the underlying storage operation fails.
187    pub async fn dequeue(&mut self) -> Result<Option<(u64, V)>, Error> {
188        let reader = self.journal.reader().await;
189        let size = reader.bounds().end;
190
191        // Fast-forward above ack floor
192        if self.read_pos < self.ack_floor {
193            self.read_pos = self.ack_floor;
194        }
195
196        // Fast-forward past the ack range containing read_pos (if any).
197        if let Some((_, end)) = self.acked_above.get(&self.read_pos) {
198            self.read_pos = end.saturating_add(1);
199        }
200
201        // If the read position is greater than the size of the journal, return None.
202        let _ = self.metrics.next.try_set(self.read_pos);
203        if self.read_pos >= size {
204            return Ok(None);
205        }
206
207        let item = reader.read(self.read_pos).await?;
208        let pos = self.read_pos;
209        self.read_pos += 1;
210        let _ = self.metrics.next.try_set(self.read_pos);
211        debug!(position = pos, "dequeued item");
212        Ok(Some((pos, item)))
213    }
214
215    /// Mark the item at `position` as processed (in-memory only).
216    /// The item will be skipped on subsequent dequeues. If this creates a
217    /// contiguous run from the ack floor, the floor advances automatically.
218    ///
219    /// # Errors
220    ///
221    /// Returns [Error::PositionOutOfRange] if `position >= queue size`.
222    pub async fn ack(&mut self, position: u64) -> Result<(), Error> {
223        let size = self.journal.size().await;
224        if position >= size {
225            return Err(Error::PositionOutOfRange(position, size));
226        }
227
228        // Already acked (below floor)
229        if position < self.ack_floor {
230            return Ok(());
231        }
232
233        // Already acked (above floor)
234        if self.acked_above.get(&position).is_some() {
235            return Ok(());
236        }
237
238        // Check if we can advance the floor
239        if position == self.ack_floor {
240            // Advance floor, consuming any contiguous acked items
241            let next = position + 1;
242            let final_floor = match self.acked_above.get(&next) {
243                Some((_, end)) => end + 1,
244                None => next,
245            };
246            self.acked_above.remove(next, final_floor - 1);
247            self.ack_floor = final_floor;
248            let _ = self.metrics.floor.try_set(self.ack_floor);
249            debug!(floor = self.ack_floor, "advanced ack floor");
250        } else {
251            // Floor is not advancing, so add to acked_above
252            self.acked_above.insert(position);
253            debug!(position, "acked item above floor");
254        }
255        Ok(())
256    }
257
258    /// Acknowledge all items in `[ack_floor, up_to)` by advancing the floor
259    /// directly. More efficient than calling [Self::ack] in a loop.
260    ///
261    /// # Errors
262    ///
263    /// Returns [Error::PositionOutOfRange] if `up_to > queue size`.
264    pub async fn ack_up_to(&mut self, up_to: u64) -> Result<(), Error> {
265        let size = self.journal.size().await;
266        if up_to > size {
267            return Err(Error::PositionOutOfRange(up_to, size));
268        }
269
270        // Nothing to do if up_to is at or below current floor
271        if up_to <= self.ack_floor {
272            return Ok(());
273        }
274
275        // Determine final floor: either up_to, or past any contiguous acked range at up_to
276        let final_floor = match self.acked_above.get(&up_to) {
277            Some((_, end)) => end + 1,
278            None => up_to,
279        };
280
281        // Remove all entries covered by the new floor and advance
282        self.acked_above.remove(self.ack_floor, final_floor - 1);
283        self.ack_floor = final_floor;
284        let _ = self.metrics.floor.try_set(self.ack_floor);
285        debug!(floor = self.ack_floor, "batch acked up to");
286        Ok(())
287    }
288
289    /// Returns the current read position.
290    ///
291    /// This is the position of the next item that will be checked by [Queue::dequeue].
292    pub const fn read_position(&self) -> u64 {
293        self.read_pos
294    }
295
296    /// Returns the current ack floor.
297    ///
298    /// All items at positions less than this value are considered acknowledged.
299    pub const fn ack_floor(&self) -> u64 {
300        self.ack_floor
301    }
302
303    /// Returns the total number of items that have been enqueued.
304    ///
305    /// This count is not affected by pruning. It represents the position that the
306    /// next enqueued item will receive.
307    pub async fn size(&self) -> u64 {
308        self.journal.size().await
309    }
310
311    /// Returns whether all enqueued items have been acknowledged.
312    pub async fn is_empty(&self) -> bool {
313        // If acked_above is non-empty, there's a gap at ack_floor (otherwise floor
314        // would have advanced). So all items acked implies ack_floor == size.
315        self.ack_floor >= self.journal.size().await
316    }
317
318    /// Reset the read position to the ack floor so [Self::dequeue] re-delivers
319    /// all unacknowledged items from the beginning.
320    pub fn reset(&mut self) {
321        let old_pos = self.read_pos;
322        self.read_pos = self.ack_floor;
323        let _ = self.metrics.next.try_set(self.read_pos);
324        debug!(
325            old_read_pos = old_pos,
326            new_read_pos = self.read_pos,
327            "reset read position"
328        );
329    }
330
331    /// Returns the number of items not yet read (test-only).
332    #[cfg(test)]
333    pub(crate) async fn pending(&self) -> u64 {
334        self.journal.size().await.saturating_sub(self.read_pos)
335    }
336
337    /// Returns the count of acknowledged items above the ack floor (test-only).
338    #[cfg(test)]
339    pub(crate) fn acked_above_count(&self) -> usize {
340        self.acked_above
341            .iter()
342            .map(|(&s, &e)| (e - s + 1) as usize)
343            .sum()
344    }
345}
346
347impl<E: Clock + Storage + Metrics + Send, V: CodecShared + Send> Persistable for Queue<E, V> {
348    type Error = Error;
349
350    async fn commit(&self) -> Result<(), Error> {
351        self.journal.commit().await?;
352        Ok(())
353    }
354
355    async fn sync(&self) -> Result<(), Error> {
356        self.journal.sync().await?;
357        self.journal.prune(self.ack_floor).await?;
358        Ok(())
359    }
360
361    async fn destroy(self) -> Result<(), Error> {
362        self.journal.destroy().await?;
363        Ok(())
364    }
365}
366
367#[cfg(test)]
368mod tests {
369    use super::*;
370    use commonware_codec::RangeCfg;
371    use commonware_macros::test_traced;
372    use commonware_runtime::{buffer::paged::CacheRef, deterministic, BufferPooler, Runner};
373    use commonware_utils::{NZUsize, NZU16, NZU64};
374    use std::num::NonZeroU16;
375
376    const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
377    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
378
379    fn test_config(partition: &str, pooler: &impl BufferPooler) -> Config<(RangeCfg<usize>, ())> {
380        Config {
381            partition: partition.into(),
382            items_per_section: NZU64!(10),
383            compression: None,
384            codec_config: ((0..).into(), ()),
385            page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
386            write_buffer: NZUsize!(4096),
387        }
388    }
389
390    #[test_traced]
391    fn test_basic_enqueue_dequeue() {
392        let executor = deterministic::Runner::default();
393        executor.start(|context| async move {
394            let cfg = test_config("test_basic", &context);
395            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
396                .await
397                .unwrap();
398
399            // Queue should be empty initially
400            assert!(queue.is_empty().await);
401            assert_eq!(queue.pending().await, 0);
402            assert_eq!(queue.size().await, 0);
403
404            // Enqueue items
405            let pos0 = queue.enqueue(b"item0".to_vec()).await.unwrap();
406            let pos1 = queue.enqueue(b"item1".to_vec()).await.unwrap();
407            let pos2 = queue.enqueue(b"item2".to_vec()).await.unwrap();
408
409            assert_eq!(pos0, 0);
410            assert_eq!(pos1, 1);
411            assert_eq!(pos2, 2);
412            assert_eq!(queue.size().await, 3);
413            assert_eq!(queue.pending().await, 3);
414            assert!(!queue.is_empty().await);
415
416            // Dequeue items
417            let (p, item) = queue.dequeue().await.unwrap().unwrap();
418            assert_eq!(p, 0);
419            assert_eq!(item, b"item0");
420            assert_eq!(queue.pending().await, 2);
421
422            let (p, item) = queue.dequeue().await.unwrap().unwrap();
423            assert_eq!(p, 1);
424            assert_eq!(item, b"item1");
425            assert_eq!(queue.pending().await, 1);
426
427            let (p, item) = queue.dequeue().await.unwrap().unwrap();
428            assert_eq!(p, 2);
429            assert_eq!(item, b"item2");
430            assert_eq!(queue.pending().await, 0);
431
432            // Queue still has unacked items
433            assert!(!queue.is_empty().await);
434            assert!(queue.dequeue().await.unwrap().is_none());
435        });
436    }
437
438    #[test_traced]
439    fn test_append_commit_batch() {
440        let executor = deterministic::Runner::default();
441        executor.start(|context| async move {
442            let cfg = test_config("test_batch", &context);
443            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
444                .await
445                .unwrap();
446
447            // Append multiple items, then commit once
448            for i in 0..5u8 {
449                queue.append(vec![i]).await.unwrap();
450            }
451            queue.commit().await.unwrap();
452            assert_eq!(queue.size().await, 5);
453
454            // Dequeue and verify order
455            for i in 0..5 {
456                let (pos, item) = queue.dequeue().await.unwrap().unwrap();
457                assert_eq!(pos, i);
458                assert_eq!(item, vec![i as u8]);
459            }
460
461            // Mix batch and single enqueue
462            for i in 5..8u8 {
463                queue.append(vec![i]).await.unwrap();
464            }
465            queue.commit().await.unwrap();
466            queue.enqueue(vec![8]).await.unwrap();
467            assert_eq!(queue.size().await, 9);
468
469            queue.ack_up_to(9).await.unwrap();
470            assert!(queue.is_empty().await);
471        });
472    }
473
474    #[test_traced]
475    fn test_append_commit_persistence() {
476        let executor = deterministic::Runner::default();
477        executor.start(|context| async move {
478            let cfg = test_config("test_batch_persist", &context);
479
480            {
481                let mut queue = Queue::<_, Vec<u8>>::init(context.with_label("first"), cfg.clone())
482                    .await
483                    .unwrap();
484                for i in 0..4u8 {
485                    queue.append(vec![i]).await.unwrap();
486                }
487                queue.commit().await.unwrap();
488                queue.sync().await.unwrap();
489            }
490
491            {
492                let mut queue = Queue::<_, Vec<u8>>::init(context.with_label("second"), cfg)
493                    .await
494                    .unwrap();
495                assert_eq!(queue.size().await, 4);
496                for i in 0..4 {
497                    let (pos, item) = queue.dequeue().await.unwrap().unwrap();
498                    assert_eq!(pos, i);
499                    assert_eq!(item, vec![i as u8]);
500                }
501            }
502        });
503    }
504
505    #[test_traced]
506    fn test_sequential_ack() {
507        let executor = deterministic::Runner::default();
508        executor.start(|context| async move {
509            let cfg = test_config("test_seq_ack", &context);
510            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
511                .await
512                .unwrap();
513
514            // Enqueue items
515            for i in 0..5u8 {
516                queue.enqueue(vec![i]).await.unwrap();
517            }
518
519            // Dequeue and ack sequentially
520            for i in 0..5 {
521                let (pos, _) = queue.dequeue().await.unwrap().unwrap();
522                assert_eq!(pos, i);
523                queue.ack(pos).await.unwrap();
524                assert_eq!(queue.ack_floor(), i + 1);
525            }
526
527            // All items acked
528            assert!(queue.is_empty().await);
529            assert_eq!(queue.ack_floor(), 5);
530        });
531    }
532
533    #[test_traced]
534    fn test_out_of_order_ack() {
535        let executor = deterministic::Runner::default();
536        executor.start(|context| async move {
537            let cfg = test_config("test_ooo_ack", &context);
538            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
539                .await
540                .unwrap();
541
542            // Enqueue items
543            for i in 0..5u8 {
544                queue.enqueue(vec![i]).await.unwrap();
545            }
546
547            // Dequeue all
548            for _ in 0..5 {
549                queue.dequeue().await.unwrap();
550            }
551
552            // Ack out of order: 2, 4, 1, 3, 0
553            queue.ack(2).await.unwrap();
554            assert_eq!(queue.ack_floor(), 0); // Floor doesn't move
555            assert!(queue.is_acked(2));
556
557            queue.ack(4).await.unwrap();
558            assert_eq!(queue.ack_floor(), 0);
559            assert!(queue.is_acked(4));
560
561            queue.ack(1).await.unwrap();
562            assert_eq!(queue.ack_floor(), 0);
563
564            queue.ack(3).await.unwrap();
565            assert_eq!(queue.ack_floor(), 0);
566
567            // Ack 0 - floor should advance to 5 (consuming 1,2,3,4)
568            queue.ack(0).await.unwrap();
569            assert_eq!(queue.ack_floor(), 5);
570            assert!(queue.is_empty().await);
571        });
572    }
573
574    #[test_traced]
575    fn test_ack_up_to() {
576        let executor = deterministic::Runner::default();
577        executor.start(|context| async move {
578            let cfg = test_config("test_ack_up_to", &context);
579            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
580                .await
581                .unwrap();
582
583            // Enqueue items
584            for i in 0..10u8 {
585                queue.enqueue(vec![i]).await.unwrap();
586            }
587
588            // Batch ack items 0-4
589            queue.ack_up_to(5).await.unwrap();
590            assert_eq!(queue.ack_floor(), 5);
591
592            // Items 0-4 should be acked
593            for i in 0..5 {
594                assert!(queue.is_acked(i));
595            }
596            // Items 5-9 should not be acked
597            for i in 5..10 {
598                assert!(!queue.is_acked(i));
599            }
600
601            // Dequeue should start at 5
602            let (p, _) = queue.dequeue().await.unwrap().unwrap();
603            assert_eq!(p, 5);
604        });
605    }
606
607    #[test_traced]
608    fn test_ack_up_to_with_existing_acks() {
609        let executor = deterministic::Runner::default();
610        executor.start(|context| async move {
611            let cfg = test_config("test_ack_up_to_existing", &context);
612            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
613                .await
614                .unwrap();
615
616            // Enqueue items
617            for i in 0..10u8 {
618                queue.enqueue(vec![i]).await.unwrap();
619            }
620
621            // Ack some items out of order first
622            queue.ack(7).await.unwrap();
623            queue.ack(8).await.unwrap();
624            assert_eq!(queue.acked_above_count(), 2);
625
626            // Batch ack up to 5
627            queue.ack_up_to(5).await.unwrap();
628            assert_eq!(queue.ack_floor(), 5);
629            assert_eq!(queue.acked_above_count(), 2);
630
631            // Now batch ack up to 9 - should consume the acked_above entries
632            queue.ack_up_to(9).await.unwrap();
633            assert_eq!(queue.ack_floor(), 9);
634            assert_eq!(queue.acked_above_count(), 0);
635        });
636    }
637
638    #[test_traced]
639    fn test_ack_up_to_coalesces_with_acked_above() {
640        let executor = deterministic::Runner::default();
641        executor.start(|context| async move {
642            let cfg = test_config("test_ack_up_to_coalesce", &context);
643            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
644                .await
645                .unwrap();
646
647            // Enqueue items
648            for i in 0..10u8 {
649                queue.enqueue(vec![i]).await.unwrap();
650            }
651
652            // Ack items 5, 6, 7 first
653            queue.ack(5).await.unwrap();
654            queue.ack(6).await.unwrap();
655            queue.ack(7).await.unwrap();
656            assert_eq!(queue.ack_floor(), 0);
657
658            // Batch ack up to 5 - should coalesce with 5, 6, 7
659            queue.ack_up_to(5).await.unwrap();
660            assert_eq!(queue.ack_floor(), 8); // Consumed 5, 6, 7
661        });
662    }
663
664    #[test_traced]
665    fn test_ack_up_to_errors() {
666        let executor = deterministic::Runner::default();
667        executor.start(|context| async move {
668            let cfg = test_config("test_ack_up_to_errors", &context);
669            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
670                .await
671                .unwrap();
672
673            queue.enqueue(b"item0".to_vec()).await.unwrap();
674            queue.enqueue(b"item1".to_vec()).await.unwrap();
675
676            // Can't ack_up_to beyond queue size
677            let err = queue.ack_up_to(5).await.unwrap_err();
678            assert!(matches!(err, Error::PositionOutOfRange(5, 2)));
679
680            // Can ack_up_to at queue size
681            queue.ack_up_to(2).await.unwrap();
682            assert_eq!(queue.ack_floor(), 2);
683
684            // Acking up_to at or below floor is a no-op
685            queue.ack_up_to(1).await.unwrap();
686            assert_eq!(queue.ack_floor(), 2);
687        });
688    }
689
690    #[test_traced]
691    fn test_dequeue_skips_acked() {
692        let executor = deterministic::Runner::default();
693        executor.start(|context| async move {
694            let cfg = test_config("test_skip_acked", &context);
695            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
696                .await
697                .unwrap();
698
699            // Enqueue items 0-4
700            for i in 0..5u8 {
701                queue.enqueue(vec![i]).await.unwrap();
702            }
703
704            // Ack items 1 and 3 before reading
705            queue.ack(1).await.unwrap();
706            queue.ack(3).await.unwrap();
707
708            // Dequeue should skip 1 and 3
709            let (p, item) = queue.dequeue().await.unwrap().unwrap();
710            assert_eq!(p, 0);
711            assert_eq!(item, vec![0]);
712
713            let (p, item) = queue.dequeue().await.unwrap().unwrap();
714            assert_eq!(p, 2); // Skipped 1
715            assert_eq!(item, vec![2]);
716
717            let (p, item) = queue.dequeue().await.unwrap().unwrap();
718            assert_eq!(p, 4); // Skipped 3
719            assert_eq!(item, vec![4]);
720
721            assert!(queue.dequeue().await.unwrap().is_none());
722        });
723    }
724
725    #[test_traced]
726    fn test_ack_errors() {
727        let executor = deterministic::Runner::default();
728        executor.start(|context| async move {
729            let cfg = test_config("test_ack_errors", &context);
730            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
731                .await
732                .unwrap();
733
734            queue.enqueue(b"item0".to_vec()).await.unwrap();
735            queue.enqueue(b"item1".to_vec()).await.unwrap();
736
737            // Can't ack position beyond queue size
738            let err = queue.ack(5).await.unwrap_err();
739            assert!(matches!(err, Error::PositionOutOfRange(5, 2)));
740
741            // Can ack unread items
742            queue.ack(1).await.unwrap();
743            assert!(queue.is_acked(1));
744
745            // Double ack is a no-op
746            queue.ack(1).await.unwrap();
747        });
748    }
749
750    #[test_traced]
751    fn test_prune() {
752        let executor = deterministic::Runner::default();
753        executor.start(|context| async move {
754            let cfg = test_config("test_prune", &context);
755            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
756                .await
757                .unwrap();
758
759            // Enqueue items (more than items_per_section to test pruning)
760            for i in 0..25u8 {
761                queue.enqueue(vec![i]).await.unwrap();
762            }
763            queue.sync().await.unwrap();
764
765            // Read and ack some items
766            for i in 0..15 {
767                queue.dequeue().await.unwrap();
768                queue.ack(i).await.unwrap();
769            }
770            assert_eq!(queue.ack_floor(), 15);
771
772            // Items 15+ should still be readable
773            let (p, item) = queue.dequeue().await.unwrap().unwrap();
774            assert_eq!(p, 15);
775            assert_eq!(item, vec![15]);
776        });
777    }
778
779    #[test_traced]
780    fn test_ack_across_sections() {
781        let executor = deterministic::Runner::default();
782        executor.start(|context| async move {
783            let cfg = test_config("test_multi_prune", &context);
784            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
785                .await
786                .unwrap();
787
788            // Enqueue many items across multiple sections (items_per_section = 10)
789            for i in 0..50u8 {
790                queue.enqueue(vec![i]).await.unwrap();
791            }
792            queue.sync().await.unwrap();
793
794            // First batch: ack items 0-14
795            for i in 0..15 {
796                queue.dequeue().await.unwrap();
797                queue.ack(i).await.unwrap();
798            }
799            assert_eq!(queue.ack_floor(), 15);
800
801            // Verify items 15+ still readable
802            let (p, item) = queue.dequeue().await.unwrap().unwrap();
803            assert_eq!(p, 15);
804            assert_eq!(item, vec![15]);
805
806            // Second batch: ack items 15-29
807            queue.ack(15).await.unwrap();
808            for i in 16..30 {
809                queue.dequeue().await.unwrap();
810                queue.ack(i).await.unwrap();
811            }
812            assert_eq!(queue.ack_floor(), 30);
813
814            // Verify items 30+ still readable
815            let (p, item) = queue.dequeue().await.unwrap().unwrap();
816            assert_eq!(p, 30);
817            assert_eq!(item, vec![30]);
818
819            // Third batch: ack remaining items
820            queue.ack(30).await.unwrap();
821            for i in 31..50 {
822                queue.dequeue().await.unwrap();
823                queue.ack(i).await.unwrap();
824            }
825            assert_eq!(queue.ack_floor(), 50);
826
827            // Queue should be empty now
828            assert!(queue.is_empty().await);
829            assert!(queue.dequeue().await.unwrap().is_none());
830        });
831    }
832
833    #[test_traced]
834    fn test_crash_recovery_replays_from_pruning_boundary() {
835        // On restart, ack_floor = pruning_boundary. Items not pruned are re-delivered.
836        let executor = deterministic::Runner::default();
837        executor.start(|context| async move {
838            let cfg = test_config("test_recovery_replay", &context);
839
840            // First session: enqueue items, ack some (but not enough to prune)
841            {
842                let mut queue = Queue::<_, Vec<u8>>::init(context.with_label("first"), cfg.clone())
843                    .await
844                    .unwrap();
845
846                for i in 0..5u8 {
847                    queue.enqueue(vec![i]).await.unwrap();
848                }
849
850                // Ack items 0, 1, 2 - but items_per_section=10, so no pruning
851                queue.ack(0).await.unwrap();
852                queue.ack(1).await.unwrap();
853                queue.ack(2).await.unwrap();
854                assert_eq!(queue.ack_floor(), 3);
855
856                queue.sync().await.unwrap();
857            }
858
859            // Second session: all items are re-delivered (no pruning occurred)
860            {
861                let mut queue =
862                    Queue::<_, Vec<u8>>::init(context.with_label("second"), cfg.clone())
863                        .await
864                        .unwrap();
865
866                // ack_floor = pruning_boundary = 0 (nothing was pruned)
867                assert_eq!(queue.ack_floor(), 0);
868
869                // All items re-delivered
870                for i in 0..5 {
871                    let (p, _) = queue.dequeue().await.unwrap().unwrap();
872                    assert_eq!(p, i);
873                }
874            }
875        });
876    }
877
878    #[test_traced]
879    fn test_crash_recovery_with_pruning() {
880        // Items pruned before crash are not re-delivered.
881        let executor = deterministic::Runner::default();
882        executor.start(|context| async move {
883            let cfg = test_config("test_recovery_pruned", &context);
884
885            // First session: enqueue many items, ack enough to trigger pruning
886            let expected_pruning_boundary = {
887                let mut queue = Queue::<_, Vec<u8>>::init(context.with_label("first"), cfg.clone())
888                    .await
889                    .unwrap();
890
891                // Enqueue items across multiple sections (items_per_section = 10)
892                for i in 0..25u8 {
893                    queue.enqueue(vec![i]).await.unwrap();
894                }
895
896                // Ack items 0-14 to advance floor past section 0
897                for i in 0..15 {
898                    queue.ack(i).await.unwrap();
899                }
900                assert_eq!(queue.ack_floor(), 15);
901
902                // Sync triggers pruning
903                queue.sync().await.unwrap();
904
905                // Verify pruning occurred
906                let pruning_boundary = queue.journal.bounds().await.start;
907                assert!(pruning_boundary > 0, "expected some pruning to occur");
908
909                pruning_boundary
910            };
911
912            // Second session: only non-pruned items are available
913            {
914                let mut queue =
915                    Queue::<_, Vec<u8>>::init(context.with_label("second"), cfg.clone())
916                        .await
917                        .unwrap();
918
919                // ack_floor = pruning_boundary (items 0-9 were pruned)
920                let pruning_boundary = queue.journal.bounds().await.start;
921                assert_eq!(queue.ack_floor(), pruning_boundary);
922                assert_eq!(pruning_boundary, expected_pruning_boundary);
923
924                // Items from pruning_boundary to 24 are re-delivered
925                for i in pruning_boundary..25 {
926                    let (p, item) = queue.dequeue().await.unwrap().unwrap();
927                    assert_eq!(p, i);
928                    assert_eq!(item, vec![i as u8]);
929                }
930
931                assert!(queue.dequeue().await.unwrap().is_none());
932            }
933        });
934    }
935
936    #[test_traced]
937    fn test_reset() {
938        let executor = deterministic::Runner::default();
939        executor.start(|context| async move {
940            let cfg = test_config("test_reset", &context);
941            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
942                .await
943                .unwrap();
944
945            // Enqueue items
946            for i in 0..5u8 {
947                queue.enqueue(vec![i]).await.unwrap();
948            }
949
950            // Read some
951            queue.dequeue().await.unwrap();
952            queue.dequeue().await.unwrap();
953            queue.dequeue().await.unwrap();
954            assert_eq!(queue.read_position(), 3);
955
956            // Reset without ack - should go back to 0
957            queue.reset();
958            assert_eq!(queue.read_position(), 0);
959
960            // Verify we can re-read
961            let (p, item) = queue.dequeue().await.unwrap().unwrap();
962            assert_eq!(p, 0);
963            assert_eq!(item, vec![0]);
964        });
965    }
966
967    #[test_traced]
968    fn test_reset_with_ack() {
969        let executor = deterministic::Runner::default();
970        executor.start(|context| async move {
971            let cfg = test_config("test_reset_ack", &context);
972            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
973                .await
974                .unwrap();
975
976            // Enqueue items
977            for i in 0..10u8 {
978                queue.enqueue(vec![i]).await.unwrap();
979            }
980
981            // Read and ack some
982            for i in 0..5 {
983                queue.dequeue().await.unwrap();
984                queue.ack(i).await.unwrap();
985            }
986            assert_eq!(queue.ack_floor(), 5);
987            assert_eq!(queue.read_position(), 5);
988
989            // Read a few more
990            queue.dequeue().await.unwrap();
991            queue.dequeue().await.unwrap();
992            assert_eq!(queue.read_position(), 7);
993
994            // Reset - should go back to ack floor
995            queue.reset();
996            assert_eq!(queue.read_position(), 5);
997
998            // Next dequeue should return item 5
999            let (p, item) = queue.dequeue().await.unwrap().unwrap();
1000            assert_eq!(p, 5);
1001            assert_eq!(item, vec![5]);
1002        });
1003    }
1004
1005    #[test_traced]
1006    fn test_empty_queue_operations() {
1007        let executor = deterministic::Runner::default();
1008        executor.start(|context| async move {
1009            let cfg = test_config("test_empty", &context);
1010            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
1011                .await
1012                .unwrap();
1013
1014            // Operations on empty queue
1015            assert!(queue.is_empty().await);
1016            assert!(queue.dequeue().await.unwrap().is_none());
1017            queue.sync().await.unwrap();
1018            queue.reset();
1019        });
1020    }
1021
1022    #[test_traced]
1023    fn test_persistence() {
1024        let executor = deterministic::Runner::default();
1025        executor.start(|context| async move {
1026            let cfg = test_config("test_persist", &context);
1027
1028            // First session
1029            {
1030                let mut queue = Queue::<_, Vec<u8>>::init(context.with_label("first"), cfg.clone())
1031                    .await
1032                    .unwrap();
1033
1034                queue.enqueue(b"item0".to_vec()).await.unwrap();
1035                queue.enqueue(b"item1".to_vec()).await.unwrap();
1036                queue.sync().await.unwrap();
1037            }
1038
1039            // Second session - data should persist
1040            {
1041                let mut queue =
1042                    Queue::<_, Vec<u8>>::init(context.with_label("second"), cfg.clone())
1043                        .await
1044                        .unwrap();
1045
1046                assert_eq!(queue.size().await, 2);
1047
1048                let (_, item) = queue.dequeue().await.unwrap().unwrap();
1049                assert_eq!(item, b"item0");
1050
1051                let (_, item) = queue.dequeue().await.unwrap().unwrap();
1052                assert_eq!(item, b"item1");
1053            }
1054        });
1055    }
1056
1057    #[test_traced]
1058    fn test_large_queue_with_sparse_acks() {
1059        let executor = deterministic::Runner::default();
1060        executor.start(|context| async move {
1061            let cfg = test_config("test_sparse", &context);
1062            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
1063                .await
1064                .unwrap();
1065
1066            // Enqueue many items
1067            for i in 0..100u8 {
1068                queue.enqueue(vec![i]).await.unwrap();
1069            }
1070
1071            // Ack every 3rd item (sparse acking)
1072            for i in (0..100).step_by(3) {
1073                queue.ack(i).await.unwrap();
1074            }
1075
1076            // Dequeue should skip acked items
1077            let mut received = Vec::new();
1078            while let Some((pos, _)) = queue.dequeue().await.unwrap() {
1079                received.push(pos);
1080            }
1081
1082            // Should have received all items not divisible by 3
1083            let expected: Vec<u64> = (0..100).filter(|x| x % 3 != 0).collect();
1084            assert_eq!(received, expected);
1085        });
1086    }
1087
1088    #[test_traced]
1089    fn test_acked_above_coalescing() {
1090        let executor = deterministic::Runner::default();
1091        executor.start(|context| async move {
1092            let cfg = test_config("test_coalesce", &context);
1093            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
1094                .await
1095                .unwrap();
1096
1097            // Enqueue items
1098            for i in 0..10u8 {
1099                queue.enqueue(vec![i]).await.unwrap();
1100            }
1101
1102            // Ack items 1-8 (not 0)
1103            for i in 1..9 {
1104                queue.ack(i).await.unwrap();
1105            }
1106
1107            // Acked_above should have items 1-8
1108            assert_eq!(queue.ack_floor(), 0);
1109            assert!(queue.acked_above_count() > 0);
1110
1111            // Now ack 0 - floor should advance to 9, consuming all acked_above
1112            queue.ack(0).await.unwrap();
1113            assert_eq!(queue.ack_floor(), 9);
1114            assert_eq!(queue.acked_above_count(), 0);
1115        });
1116    }
1117
1118    #[test_traced]
1119    fn test_ack_up_to_past_read_pos() {
1120        let executor = deterministic::Runner::default();
1121        executor.start(|context| async move {
1122            let cfg = test_config("test_ack_up_to_past_read_pos", &context);
1123            let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
1124                .await
1125                .unwrap();
1126
1127            for i in 0..10u8 {
1128                queue.enqueue(vec![i]).await.unwrap();
1129            }
1130
1131            // Read only 3 items
1132            for _ in 0..3 {
1133                queue.dequeue().await.unwrap();
1134            }
1135            assert_eq!(queue.read_position(), 3);
1136
1137            // Batch ack past read position
1138            queue.ack_up_to(7).await.unwrap();
1139            assert_eq!(queue.ack_floor(), 7);
1140
1141            // Dequeue should skip 3-6 and return 7
1142            let (pos, item) = queue.dequeue().await.unwrap().unwrap();
1143            assert_eq!(pos, 7);
1144            assert_eq!(item, vec![7]);
1145        });
1146    }
1147
1148    #[test_traced]
1149    fn test_metrics() {
1150        let executor = deterministic::Runner::default();
1151        executor.start(|context| async move {
1152            let cfg = test_config("test-metrics", &context);
1153            let ctx = context.with_label("test_metrics");
1154            let mut queue = Queue::<_, Vec<u8>>::init(ctx, cfg).await.unwrap();
1155
1156            let encoded = context.encode();
1157            assert!(
1158                encoded.contains("test_metrics_tip 0"),
1159                "expected tip 0: {encoded}"
1160            );
1161            assert!(
1162                encoded.contains("test_metrics_floor 0"),
1163                "expected floor 0: {encoded}"
1164            );
1165            assert!(
1166                encoded.contains("test_metrics_next 0"),
1167                "expected next 0: {encoded}"
1168            );
1169
1170            // Append updates tip without enqueue
1171            queue.append(vec![0]).await.unwrap();
1172            let encoded = context.encode();
1173            assert!(
1174                encoded.contains("test_metrics_tip 1"),
1175                "expected tip 1: {encoded}"
1176            );
1177            queue.commit().await.unwrap();
1178
1179            // Enqueue updates tip further
1180            for i in 1..10u8 {
1181                queue.enqueue(vec![i]).await.unwrap();
1182            }
1183            let encoded = context.encode();
1184            assert!(
1185                encoded.contains("test_metrics_tip 10"),
1186                "expected tip 10: {encoded}"
1187            );
1188
1189            // Multiple dequeues advance next
1190            queue.dequeue().await.unwrap();
1191            queue.dequeue().await.unwrap();
1192            let encoded = context.encode();
1193            assert!(
1194                encoded.contains("test_metrics_next 2"),
1195                "expected next 2: {encoded}"
1196            );
1197
1198            // Sequential ack advances floor
1199            queue.ack(0).await.unwrap();
1200            queue.ack(1).await.unwrap();
1201            let encoded = context.encode();
1202            assert!(
1203                encoded.contains("test_metrics_floor 2"),
1204                "expected floor 2: {encoded}"
1205            );
1206
1207            // Out-of-order ack: floor stays until gap fills
1208            queue.ack(4).await.unwrap();
1209            queue.ack(6).await.unwrap();
1210            let encoded = context.encode();
1211            assert!(
1212                encoded.contains("test_metrics_floor 2"),
1213                "expected floor still 2: {encoded}"
1214            );
1215
1216            // Fill gap coalesces floor forward
1217            queue.ack(2).await.unwrap();
1218            queue.ack(3).await.unwrap();
1219            let encoded = context.encode();
1220            assert!(
1221                encoded.contains("test_metrics_floor 5"),
1222                "expected floor 5: {encoded}"
1223            );
1224
1225            // ack_up_to advances floor past sparse ack at 6
1226            queue.ack_up_to(8).await.unwrap();
1227            let encoded = context.encode();
1228            assert!(
1229                encoded.contains("test_metrics_floor 8"),
1230                "expected floor 8: {encoded}"
1231            );
1232
1233            // Ack remaining
1234            queue.ack(8).await.unwrap();
1235            queue.ack(9).await.unwrap();
1236            let encoded = context.encode();
1237            assert!(
1238                encoded.contains("test_metrics_floor 10"),
1239                "expected floor 10: {encoded}"
1240            );
1241
1242            // Reset brings next back to floor
1243            queue.reset();
1244            let encoded = context.encode();
1245            assert!(
1246                encoded.contains("test_metrics_next 10"),
1247                "expected next 10: {encoded}"
1248            );
1249        });
1250    }
1251
1252    #[test_traced]
1253    fn test_metrics_next_updates_on_fast_forward() {
1254        let executor = deterministic::Runner::default();
1255        executor.start(|context| async move {
1256            let cfg = test_config("test-ff", &context);
1257            let ctx = context.with_label("test_ff");
1258            let mut queue = Queue::<_, Vec<u8>>::init(ctx, cfg).await.unwrap();
1259
1260            // Enqueue 3 items, dequeue and ack only the first
1261            for i in 0..3u8 {
1262                queue.enqueue(vec![i]).await.unwrap();
1263            }
1264            let (pos, _) = queue.dequeue().await.unwrap().unwrap();
1265            queue.ack(pos).await.unwrap();
1266
1267            let encoded = context.encode();
1268            assert!(
1269                encoded.contains("test_ff_next 1"),
1270                "expected next 1: {encoded}"
1271            );
1272
1273            // Ack remaining items out-of-order to advance floor to 3
1274            queue.ack(2).await.unwrap();
1275            queue.ack(1).await.unwrap();
1276            assert_eq!(queue.ack_floor(), 3);
1277
1278            // next metric is still 1 (no dequeue yet)
1279            let encoded = context.encode();
1280            assert!(
1281                encoded.contains("test_ff_next 1"),
1282                "expected next still 1: {encoded}"
1283            );
1284
1285            // Dequeue returns None but fast-forwards read_pos to ack_floor
1286            assert!(queue.dequeue().await.unwrap().is_none());
1287            let encoded = context.encode();
1288            assert!(
1289                encoded.contains("test_ff_next 3"),
1290                "expected next 3 after fast-forward: {encoded}"
1291            );
1292        });
1293    }
1294}