1use super::{metrics, Error};
4use crate::{
5 journal::contiguous::{variable, Reader as _},
6 rmap::RMap,
7 Context, Persistable,
8};
9use commonware_codec::CodecShared;
10use commonware_runtime::{buffer::paged::CacheRef, telemetry::metrics::GaugeExt};
11use std::num::{NonZeroU64, NonZeroUsize};
12use tracing::debug;
13
14#[derive(Clone)]
16pub struct Config<C> {
17 pub partition: String,
19
20 pub items_per_section: NonZeroU64,
25
26 pub compression: Option<u8>,
31
32 pub codec_config: C,
34
35 pub page_cache: CacheRef,
37
38 pub write_buffer: NonZeroUsize,
40}
41
42pub struct Queue<E: Context, V: CodecShared> {
74 journal: variable::Journal<E, V>,
76
77 read_pos: u64,
82
83 ack_floor: u64,
87
88 acked_above: RMap,
93
94 metrics: metrics::Metrics,
96}
97
98impl<E: Context, V: CodecShared> Queue<E, V> {
99 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
109 let metrics = metrics::Metrics::init(&context);
111
112 let journal = variable::Journal::init(
113 context.child("journal"),
114 variable::Config {
115 partition: cfg.partition,
116 items_per_section: cfg.items_per_section,
117 compression: cfg.compression,
118 codec_config: cfg.codec_config,
119 page_cache: cfg.page_cache,
120 write_buffer: cfg.write_buffer,
121 },
122 )
123 .await?;
124
125 let bounds = journal.reader().await.bounds();
128 let acked_above = RMap::new();
129
130 debug!(floor = bounds.start, size = bounds.end, "queue initialized");
131
132 let _ = metrics.tip.try_set(bounds.end);
134 let _ = metrics.floor.try_set(bounds.start);
135 let _ = metrics.next.try_set(bounds.start);
136
137 Ok(Self {
138 journal,
139 read_pos: bounds.start,
140 ack_floor: bounds.start,
141 acked_above,
142 metrics,
143 })
144 }
145
146 pub fn is_acked(&self, position: u64) -> bool {
148 position < self.ack_floor || self.acked_above.get(&position).is_some()
149 }
150
151 pub async fn append(&mut self, item: V) -> Result<u64, Error> {
160 let pos = self.journal.append(&item).await?;
161 let _ = self.metrics.tip.try_set(pos + 1);
162 debug!(pos, "appended item");
163 Ok(pos)
164 }
165
166 pub async fn enqueue(&mut self, item: V) -> Result<u64, Error> {
173 let pos = self.append(item).await?;
174 self.commit().await?;
175 Ok(pos)
176 }
177
178 pub async fn dequeue(&mut self) -> Result<Option<(u64, V)>, Error> {
186 let reader = self.journal.reader().await;
187 let size = reader.bounds().end;
188
189 if self.read_pos < self.ack_floor {
191 self.read_pos = self.ack_floor;
192 }
193
194 if let Some((_, end)) = self.acked_above.get(&self.read_pos) {
196 self.read_pos = end.saturating_add(1);
197 }
198
199 let _ = self.metrics.next.try_set(self.read_pos);
201 if self.read_pos >= size {
202 return Ok(None);
203 }
204
205 let item = reader.read(self.read_pos).await?;
206 let pos = self.read_pos;
207 self.read_pos += 1;
208 let _ = self.metrics.next.try_set(self.read_pos);
209 debug!(position = pos, "dequeued item");
210 Ok(Some((pos, item)))
211 }
212
213 pub async fn ack(&mut self, position: u64) -> Result<(), Error> {
221 let size = self.journal.size().await;
222 if position >= size {
223 return Err(Error::PositionOutOfRange(position, size));
224 }
225
226 if position < self.ack_floor {
228 return Ok(());
229 }
230
231 if self.acked_above.get(&position).is_some() {
233 return Ok(());
234 }
235
236 if position == self.ack_floor {
238 let next = position + 1;
240 let final_floor = match self.acked_above.get(&next) {
241 Some((_, end)) => end + 1,
242 None => next,
243 };
244 self.acked_above.remove(next, final_floor - 1);
245 self.ack_floor = final_floor;
246 let _ = self.metrics.floor.try_set(self.ack_floor);
247 debug!(floor = self.ack_floor, "advanced ack floor");
248 } else {
249 self.acked_above.insert(position);
251 debug!(position, "acked item above floor");
252 }
253 Ok(())
254 }
255
256 pub async fn ack_up_to(&mut self, up_to: u64) -> Result<(), Error> {
263 let size = self.journal.size().await;
264 if up_to > size {
265 return Err(Error::PositionOutOfRange(up_to, size));
266 }
267
268 if up_to <= self.ack_floor {
270 return Ok(());
271 }
272
273 let final_floor = match self.acked_above.get(&up_to) {
275 Some((_, end)) => end + 1,
276 None => up_to,
277 };
278
279 self.acked_above.remove(self.ack_floor, final_floor - 1);
281 self.ack_floor = final_floor;
282 let _ = self.metrics.floor.try_set(self.ack_floor);
283 debug!(floor = self.ack_floor, "batch acked up to");
284 Ok(())
285 }
286
287 pub const fn read_position(&self) -> u64 {
291 self.read_pos
292 }
293
294 pub const fn ack_floor(&self) -> u64 {
298 self.ack_floor
299 }
300
301 pub async fn size(&self) -> u64 {
306 self.journal.size().await
307 }
308
309 pub async fn is_empty(&self) -> bool {
311 self.ack_floor >= self.journal.size().await
314 }
315
316 pub fn reset(&mut self) {
319 let old_pos = self.read_pos;
320 self.read_pos = self.ack_floor;
321 let _ = self.metrics.next.try_set(self.read_pos);
322 debug!(
323 old_read_pos = old_pos,
324 new_read_pos = self.read_pos,
325 "reset read position"
326 );
327 }
328
329 #[cfg(test)]
331 pub(crate) async fn pending(&self) -> u64 {
332 self.journal.size().await.saturating_sub(self.read_pos)
333 }
334
335 #[cfg(test)]
337 pub(crate) fn acked_above_count(&self) -> usize {
338 self.acked_above
339 .iter()
340 .map(|(&s, &e)| (e - s + 1) as usize)
341 .sum()
342 }
343}
344
345impl<E: Context + Send, V: CodecShared + Send> Persistable for Queue<E, V> {
346 type Error = Error;
347
348 async fn commit(&self) -> Result<(), Error> {
349 self.journal.commit().await?;
350 Ok(())
351 }
352
353 async fn sync(&self) -> Result<(), Error> {
354 self.journal.sync().await?;
355 self.journal.prune(self.ack_floor).await?;
356 Ok(())
357 }
358
359 async fn destroy(self) -> Result<(), Error> {
360 self.journal.destroy().await?;
361 Ok(())
362 }
363}
364
365#[cfg(test)]
366mod tests {
367 use super::*;
368 use commonware_codec::RangeCfg;
369 use commonware_macros::test_traced;
370 use commonware_runtime::{
371 buffer::paged::CacheRef, deterministic, BufferPooler, Metrics as _, Runner, Supervisor as _,
372 };
373 use commonware_utils::{NZUsize, NZU16, NZU64};
374 use std::num::NonZeroU16;
375
376 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
377 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
378
379 fn test_config(partition: &str, pooler: &impl BufferPooler) -> Config<(RangeCfg<usize>, ())> {
380 Config {
381 partition: partition.into(),
382 items_per_section: NZU64!(10),
383 compression: None,
384 codec_config: ((0..).into(), ()),
385 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
386 write_buffer: NZUsize!(4096),
387 }
388 }
389
390 #[test_traced]
391 fn test_basic_enqueue_dequeue() {
392 let executor = deterministic::Runner::default();
393 executor.start(|context| async move {
394 let cfg = test_config("test_basic", &context);
395 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
396 .await
397 .unwrap();
398
399 assert!(queue.is_empty().await);
401 assert_eq!(queue.pending().await, 0);
402 assert_eq!(queue.size().await, 0);
403
404 let pos0 = queue.enqueue(b"item0".to_vec()).await.unwrap();
406 let pos1 = queue.enqueue(b"item1".to_vec()).await.unwrap();
407 let pos2 = queue.enqueue(b"item2".to_vec()).await.unwrap();
408
409 assert_eq!(pos0, 0);
410 assert_eq!(pos1, 1);
411 assert_eq!(pos2, 2);
412 assert_eq!(queue.size().await, 3);
413 assert_eq!(queue.pending().await, 3);
414 assert!(!queue.is_empty().await);
415
416 let (p, item) = queue.dequeue().await.unwrap().unwrap();
418 assert_eq!(p, 0);
419 assert_eq!(item, b"item0");
420 assert_eq!(queue.pending().await, 2);
421
422 let (p, item) = queue.dequeue().await.unwrap().unwrap();
423 assert_eq!(p, 1);
424 assert_eq!(item, b"item1");
425 assert_eq!(queue.pending().await, 1);
426
427 let (p, item) = queue.dequeue().await.unwrap().unwrap();
428 assert_eq!(p, 2);
429 assert_eq!(item, b"item2");
430 assert_eq!(queue.pending().await, 0);
431
432 assert!(!queue.is_empty().await);
434 assert!(queue.dequeue().await.unwrap().is_none());
435 });
436 }
437
438 #[test_traced]
439 fn test_append_commit_batch() {
440 let executor = deterministic::Runner::default();
441 executor.start(|context| async move {
442 let cfg = test_config("test_batch", &context);
443 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
444 .await
445 .unwrap();
446
447 for i in 0..5u8 {
449 queue.append(vec![i]).await.unwrap();
450 }
451 queue.commit().await.unwrap();
452 assert_eq!(queue.size().await, 5);
453
454 for i in 0..5 {
456 let (pos, item) = queue.dequeue().await.unwrap().unwrap();
457 assert_eq!(pos, i);
458 assert_eq!(item, vec![i as u8]);
459 }
460
461 for i in 5..8u8 {
463 queue.append(vec![i]).await.unwrap();
464 }
465 queue.commit().await.unwrap();
466 queue.enqueue(vec![8]).await.unwrap();
467 assert_eq!(queue.size().await, 9);
468
469 queue.ack_up_to(9).await.unwrap();
470 assert!(queue.is_empty().await);
471 });
472 }
473
474 #[test_traced]
475 fn test_append_commit_persistence() {
476 let executor = deterministic::Runner::default();
477 executor.start(|context| async move {
478 let cfg = test_config("test_batch_persist", &context);
479
480 {
481 let mut queue = Queue::<_, Vec<u8>>::init(context.child("first"), cfg.clone())
482 .await
483 .unwrap();
484 for i in 0..4u8 {
485 queue.append(vec![i]).await.unwrap();
486 }
487 queue.commit().await.unwrap();
488 queue.sync().await.unwrap();
489 }
490
491 {
492 let mut queue = Queue::<_, Vec<u8>>::init(context.child("second"), cfg)
493 .await
494 .unwrap();
495 assert_eq!(queue.size().await, 4);
496 for i in 0..4 {
497 let (pos, item) = queue.dequeue().await.unwrap().unwrap();
498 assert_eq!(pos, i);
499 assert_eq!(item, vec![i as u8]);
500 }
501 }
502 });
503 }
504
505 #[test_traced]
506 fn test_sequential_ack() {
507 let executor = deterministic::Runner::default();
508 executor.start(|context| async move {
509 let cfg = test_config("test_seq_ack", &context);
510 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
511 .await
512 .unwrap();
513
514 for i in 0..5u8 {
516 queue.enqueue(vec![i]).await.unwrap();
517 }
518
519 for i in 0..5 {
521 let (pos, _) = queue.dequeue().await.unwrap().unwrap();
522 assert_eq!(pos, i);
523 queue.ack(pos).await.unwrap();
524 assert_eq!(queue.ack_floor(), i + 1);
525 }
526
527 assert!(queue.is_empty().await);
529 assert_eq!(queue.ack_floor(), 5);
530 });
531 }
532
533 #[test_traced]
534 fn test_out_of_order_ack() {
535 let executor = deterministic::Runner::default();
536 executor.start(|context| async move {
537 let cfg = test_config("test_ooo_ack", &context);
538 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
539 .await
540 .unwrap();
541
542 for i in 0..5u8 {
544 queue.enqueue(vec![i]).await.unwrap();
545 }
546
547 for _ in 0..5 {
549 queue.dequeue().await.unwrap();
550 }
551
552 queue.ack(2).await.unwrap();
554 assert_eq!(queue.ack_floor(), 0); assert!(queue.is_acked(2));
556
557 queue.ack(4).await.unwrap();
558 assert_eq!(queue.ack_floor(), 0);
559 assert!(queue.is_acked(4));
560
561 queue.ack(1).await.unwrap();
562 assert_eq!(queue.ack_floor(), 0);
563
564 queue.ack(3).await.unwrap();
565 assert_eq!(queue.ack_floor(), 0);
566
567 queue.ack(0).await.unwrap();
569 assert_eq!(queue.ack_floor(), 5);
570 assert!(queue.is_empty().await);
571 });
572 }
573
574 #[test_traced]
575 fn test_ack_up_to() {
576 let executor = deterministic::Runner::default();
577 executor.start(|context| async move {
578 let cfg = test_config("test_ack_up_to", &context);
579 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
580 .await
581 .unwrap();
582
583 for i in 0..10u8 {
585 queue.enqueue(vec![i]).await.unwrap();
586 }
587
588 queue.ack_up_to(5).await.unwrap();
590 assert_eq!(queue.ack_floor(), 5);
591
592 for i in 0..5 {
594 assert!(queue.is_acked(i));
595 }
596 for i in 5..10 {
598 assert!(!queue.is_acked(i));
599 }
600
601 let (p, _) = queue.dequeue().await.unwrap().unwrap();
603 assert_eq!(p, 5);
604 });
605 }
606
607 #[test_traced]
608 fn test_ack_up_to_with_existing_acks() {
609 let executor = deterministic::Runner::default();
610 executor.start(|context| async move {
611 let cfg = test_config("test_ack_up_to_existing", &context);
612 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
613 .await
614 .unwrap();
615
616 for i in 0..10u8 {
618 queue.enqueue(vec![i]).await.unwrap();
619 }
620
621 queue.ack(7).await.unwrap();
623 queue.ack(8).await.unwrap();
624 assert_eq!(queue.acked_above_count(), 2);
625
626 queue.ack_up_to(5).await.unwrap();
628 assert_eq!(queue.ack_floor(), 5);
629 assert_eq!(queue.acked_above_count(), 2);
630
631 queue.ack_up_to(9).await.unwrap();
633 assert_eq!(queue.ack_floor(), 9);
634 assert_eq!(queue.acked_above_count(), 0);
635 });
636 }
637
638 #[test_traced]
639 fn test_ack_up_to_coalesces_with_acked_above() {
640 let executor = deterministic::Runner::default();
641 executor.start(|context| async move {
642 let cfg = test_config("test_ack_up_to_coalesce", &context);
643 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
644 .await
645 .unwrap();
646
647 for i in 0..10u8 {
649 queue.enqueue(vec![i]).await.unwrap();
650 }
651
652 queue.ack(5).await.unwrap();
654 queue.ack(6).await.unwrap();
655 queue.ack(7).await.unwrap();
656 assert_eq!(queue.ack_floor(), 0);
657
658 queue.ack_up_to(5).await.unwrap();
660 assert_eq!(queue.ack_floor(), 8); });
662 }
663
664 #[test_traced]
665 fn test_ack_up_to_errors() {
666 let executor = deterministic::Runner::default();
667 executor.start(|context| async move {
668 let cfg = test_config("test_ack_up_to_errors", &context);
669 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
670 .await
671 .unwrap();
672
673 queue.enqueue(b"item0".to_vec()).await.unwrap();
674 queue.enqueue(b"item1".to_vec()).await.unwrap();
675
676 let err = queue.ack_up_to(5).await.unwrap_err();
678 assert!(matches!(err, Error::PositionOutOfRange(5, 2)));
679
680 queue.ack_up_to(2).await.unwrap();
682 assert_eq!(queue.ack_floor(), 2);
683
684 queue.ack_up_to(1).await.unwrap();
686 assert_eq!(queue.ack_floor(), 2);
687 });
688 }
689
690 #[test_traced]
691 fn test_dequeue_skips_acked() {
692 let executor = deterministic::Runner::default();
693 executor.start(|context| async move {
694 let cfg = test_config("test_skip_acked", &context);
695 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
696 .await
697 .unwrap();
698
699 for i in 0..5u8 {
701 queue.enqueue(vec![i]).await.unwrap();
702 }
703
704 queue.ack(1).await.unwrap();
706 queue.ack(3).await.unwrap();
707
708 let (p, item) = queue.dequeue().await.unwrap().unwrap();
710 assert_eq!(p, 0);
711 assert_eq!(item, vec![0]);
712
713 let (p, item) = queue.dequeue().await.unwrap().unwrap();
714 assert_eq!(p, 2); assert_eq!(item, vec![2]);
716
717 let (p, item) = queue.dequeue().await.unwrap().unwrap();
718 assert_eq!(p, 4); assert_eq!(item, vec![4]);
720
721 assert!(queue.dequeue().await.unwrap().is_none());
722 });
723 }
724
725 #[test_traced]
726 fn test_ack_errors() {
727 let executor = deterministic::Runner::default();
728 executor.start(|context| async move {
729 let cfg = test_config("test_ack_errors", &context);
730 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
731 .await
732 .unwrap();
733
734 queue.enqueue(b"item0".to_vec()).await.unwrap();
735 queue.enqueue(b"item1".to_vec()).await.unwrap();
736
737 let err = queue.ack(5).await.unwrap_err();
739 assert!(matches!(err, Error::PositionOutOfRange(5, 2)));
740
741 queue.ack(1).await.unwrap();
743 assert!(queue.is_acked(1));
744
745 queue.ack(1).await.unwrap();
747 });
748 }
749
750 #[test_traced]
751 fn test_prune() {
752 let executor = deterministic::Runner::default();
753 executor.start(|context| async move {
754 let cfg = test_config("test_prune", &context);
755 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
756 .await
757 .unwrap();
758
759 for i in 0..25u8 {
761 queue.enqueue(vec![i]).await.unwrap();
762 }
763 queue.sync().await.unwrap();
764
765 for i in 0..15 {
767 queue.dequeue().await.unwrap();
768 queue.ack(i).await.unwrap();
769 }
770 assert_eq!(queue.ack_floor(), 15);
771
772 let (p, item) = queue.dequeue().await.unwrap().unwrap();
774 assert_eq!(p, 15);
775 assert_eq!(item, vec![15]);
776 });
777 }
778
779 #[test_traced]
780 fn test_ack_across_sections() {
781 let executor = deterministic::Runner::default();
782 executor.start(|context| async move {
783 let cfg = test_config("test_multi_prune", &context);
784 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
785 .await
786 .unwrap();
787
788 for i in 0..50u8 {
790 queue.enqueue(vec![i]).await.unwrap();
791 }
792 queue.sync().await.unwrap();
793
794 for i in 0..15 {
796 queue.dequeue().await.unwrap();
797 queue.ack(i).await.unwrap();
798 }
799 assert_eq!(queue.ack_floor(), 15);
800
801 let (p, item) = queue.dequeue().await.unwrap().unwrap();
803 assert_eq!(p, 15);
804 assert_eq!(item, vec![15]);
805
806 queue.ack(15).await.unwrap();
808 for i in 16..30 {
809 queue.dequeue().await.unwrap();
810 queue.ack(i).await.unwrap();
811 }
812 assert_eq!(queue.ack_floor(), 30);
813
814 let (p, item) = queue.dequeue().await.unwrap().unwrap();
816 assert_eq!(p, 30);
817 assert_eq!(item, vec![30]);
818
819 queue.ack(30).await.unwrap();
821 for i in 31..50 {
822 queue.dequeue().await.unwrap();
823 queue.ack(i).await.unwrap();
824 }
825 assert_eq!(queue.ack_floor(), 50);
826
827 assert!(queue.is_empty().await);
829 assert!(queue.dequeue().await.unwrap().is_none());
830 });
831 }
832
833 #[test_traced]
834 fn test_crash_recovery_replays_from_pruning_boundary() {
835 let executor = deterministic::Runner::default();
837 executor.start(|context| async move {
838 let cfg = test_config("test_recovery_replay", &context);
839
840 {
842 let mut queue = Queue::<_, Vec<u8>>::init(context.child("first"), cfg.clone())
843 .await
844 .unwrap();
845
846 for i in 0..5u8 {
847 queue.enqueue(vec![i]).await.unwrap();
848 }
849
850 queue.ack(0).await.unwrap();
852 queue.ack(1).await.unwrap();
853 queue.ack(2).await.unwrap();
854 assert_eq!(queue.ack_floor(), 3);
855
856 queue.sync().await.unwrap();
857 }
858
859 {
861 let mut queue = Queue::<_, Vec<u8>>::init(context.child("second"), cfg.clone())
862 .await
863 .unwrap();
864
865 assert_eq!(queue.ack_floor(), 0);
867
868 for i in 0..5 {
870 let (p, _) = queue.dequeue().await.unwrap().unwrap();
871 assert_eq!(p, i);
872 }
873 }
874 });
875 }
876
877 #[test_traced]
878 fn test_crash_recovery_with_pruning() {
879 let executor = deterministic::Runner::default();
881 executor.start(|context| async move {
882 let cfg = test_config("test_recovery_pruned", &context);
883
884 let expected_pruning_boundary = {
886 let mut queue = Queue::<_, Vec<u8>>::init(context.child("first"), cfg.clone())
887 .await
888 .unwrap();
889
890 for i in 0..25u8 {
892 queue.enqueue(vec![i]).await.unwrap();
893 }
894
895 for i in 0..15 {
897 queue.ack(i).await.unwrap();
898 }
899 assert_eq!(queue.ack_floor(), 15);
900
901 queue.sync().await.unwrap();
903
904 let pruning_boundary = queue.journal.bounds().await.start;
906 assert!(pruning_boundary > 0, "expected some pruning to occur");
907
908 pruning_boundary
909 };
910
911 {
913 let mut queue = Queue::<_, Vec<u8>>::init(context.child("second"), cfg.clone())
914 .await
915 .unwrap();
916
917 let pruning_boundary = queue.journal.bounds().await.start;
919 assert_eq!(queue.ack_floor(), pruning_boundary);
920 assert_eq!(pruning_boundary, expected_pruning_boundary);
921
922 for i in pruning_boundary..25 {
924 let (p, item) = queue.dequeue().await.unwrap().unwrap();
925 assert_eq!(p, i);
926 assert_eq!(item, vec![i as u8]);
927 }
928
929 assert!(queue.dequeue().await.unwrap().is_none());
930 }
931 });
932 }
933
934 #[test_traced]
935 fn test_reset() {
936 let executor = deterministic::Runner::default();
937 executor.start(|context| async move {
938 let cfg = test_config("test_reset", &context);
939 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
940 .await
941 .unwrap();
942
943 for i in 0..5u8 {
945 queue.enqueue(vec![i]).await.unwrap();
946 }
947
948 queue.dequeue().await.unwrap();
950 queue.dequeue().await.unwrap();
951 queue.dequeue().await.unwrap();
952 assert_eq!(queue.read_position(), 3);
953
954 queue.reset();
956 assert_eq!(queue.read_position(), 0);
957
958 let (p, item) = queue.dequeue().await.unwrap().unwrap();
960 assert_eq!(p, 0);
961 assert_eq!(item, vec![0]);
962 });
963 }
964
965 #[test_traced]
966 fn test_reset_with_ack() {
967 let executor = deterministic::Runner::default();
968 executor.start(|context| async move {
969 let cfg = test_config("test_reset_ack", &context);
970 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
971 .await
972 .unwrap();
973
974 for i in 0..10u8 {
976 queue.enqueue(vec![i]).await.unwrap();
977 }
978
979 for i in 0..5 {
981 queue.dequeue().await.unwrap();
982 queue.ack(i).await.unwrap();
983 }
984 assert_eq!(queue.ack_floor(), 5);
985 assert_eq!(queue.read_position(), 5);
986
987 queue.dequeue().await.unwrap();
989 queue.dequeue().await.unwrap();
990 assert_eq!(queue.read_position(), 7);
991
992 queue.reset();
994 assert_eq!(queue.read_position(), 5);
995
996 let (p, item) = queue.dequeue().await.unwrap().unwrap();
998 assert_eq!(p, 5);
999 assert_eq!(item, vec![5]);
1000 });
1001 }
1002
1003 #[test_traced]
1004 fn test_empty_queue_operations() {
1005 let executor = deterministic::Runner::default();
1006 executor.start(|context| async move {
1007 let cfg = test_config("test_empty", &context);
1008 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
1009 .await
1010 .unwrap();
1011
1012 assert!(queue.is_empty().await);
1014 assert!(queue.dequeue().await.unwrap().is_none());
1015 queue.sync().await.unwrap();
1016 queue.reset();
1017 });
1018 }
1019
1020 #[test_traced]
1021 fn test_persistence() {
1022 let executor = deterministic::Runner::default();
1023 executor.start(|context| async move {
1024 let cfg = test_config("test_persist", &context);
1025
1026 {
1028 let mut queue = Queue::<_, Vec<u8>>::init(context.child("first"), cfg.clone())
1029 .await
1030 .unwrap();
1031
1032 queue.enqueue(b"item0".to_vec()).await.unwrap();
1033 queue.enqueue(b"item1".to_vec()).await.unwrap();
1034 queue.sync().await.unwrap();
1035 }
1036
1037 {
1039 let mut queue = Queue::<_, Vec<u8>>::init(context.child("second"), cfg.clone())
1040 .await
1041 .unwrap();
1042
1043 assert_eq!(queue.size().await, 2);
1044
1045 let (_, item) = queue.dequeue().await.unwrap().unwrap();
1046 assert_eq!(item, b"item0");
1047
1048 let (_, item) = queue.dequeue().await.unwrap().unwrap();
1049 assert_eq!(item, b"item1");
1050 }
1051 });
1052 }
1053
1054 #[test_traced]
1055 fn test_large_queue_with_sparse_acks() {
1056 let executor = deterministic::Runner::default();
1057 executor.start(|context| async move {
1058 let cfg = test_config("test_sparse", &context);
1059 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
1060 .await
1061 .unwrap();
1062
1063 for i in 0..100u8 {
1065 queue.enqueue(vec![i]).await.unwrap();
1066 }
1067
1068 for i in (0..100).step_by(3) {
1070 queue.ack(i).await.unwrap();
1071 }
1072
1073 let mut received = Vec::new();
1075 while let Some((pos, _)) = queue.dequeue().await.unwrap() {
1076 received.push(pos);
1077 }
1078
1079 let expected: Vec<u64> = (0..100).filter(|x| x % 3 != 0).collect();
1081 assert_eq!(received, expected);
1082 });
1083 }
1084
1085 #[test_traced]
1086 fn test_acked_above_coalescing() {
1087 let executor = deterministic::Runner::default();
1088 executor.start(|context| async move {
1089 let cfg = test_config("test_coalesce", &context);
1090 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
1091 .await
1092 .unwrap();
1093
1094 for i in 0..10u8 {
1096 queue.enqueue(vec![i]).await.unwrap();
1097 }
1098
1099 for i in 1..9 {
1101 queue.ack(i).await.unwrap();
1102 }
1103
1104 assert_eq!(queue.ack_floor(), 0);
1106 assert!(queue.acked_above_count() > 0);
1107
1108 queue.ack(0).await.unwrap();
1110 assert_eq!(queue.ack_floor(), 9);
1111 assert_eq!(queue.acked_above_count(), 0);
1112 });
1113 }
1114
1115 #[test_traced]
1116 fn test_ack_up_to_past_read_pos() {
1117 let executor = deterministic::Runner::default();
1118 executor.start(|context| async move {
1119 let cfg = test_config("test_ack_up_to_past_read_pos", &context);
1120 let mut queue = Queue::<_, Vec<u8>>::init(context.child("storage"), cfg)
1121 .await
1122 .unwrap();
1123
1124 for i in 0..10u8 {
1125 queue.enqueue(vec![i]).await.unwrap();
1126 }
1127
1128 for _ in 0..3 {
1130 queue.dequeue().await.unwrap();
1131 }
1132 assert_eq!(queue.read_position(), 3);
1133
1134 queue.ack_up_to(7).await.unwrap();
1136 assert_eq!(queue.ack_floor(), 7);
1137
1138 let (pos, item) = queue.dequeue().await.unwrap().unwrap();
1140 assert_eq!(pos, 7);
1141 assert_eq!(item, vec![7]);
1142 });
1143 }
1144
1145 #[test_traced]
1146 fn test_metrics() {
1147 let executor = deterministic::Runner::default();
1148 executor.start(|context| async move {
1149 let cfg = test_config("test-metrics", &context);
1150 let ctx = context.child("test_metrics");
1151 let mut queue = Queue::<_, Vec<u8>>::init(ctx, cfg).await.unwrap();
1152
1153 let encoded = context.encode();
1154 assert!(
1155 encoded.contains("test_metrics_tip 0"),
1156 "expected tip 0: {encoded}"
1157 );
1158 assert!(
1159 encoded.contains("test_metrics_floor 0"),
1160 "expected floor 0: {encoded}"
1161 );
1162 assert!(
1163 encoded.contains("test_metrics_next 0"),
1164 "expected next 0: {encoded}"
1165 );
1166
1167 queue.append(vec![0]).await.unwrap();
1169 let encoded = context.encode();
1170 assert!(
1171 encoded.contains("test_metrics_tip 1"),
1172 "expected tip 1: {encoded}"
1173 );
1174 queue.commit().await.unwrap();
1175
1176 for i in 1..10u8 {
1178 queue.enqueue(vec![i]).await.unwrap();
1179 }
1180 let encoded = context.encode();
1181 assert!(
1182 encoded.contains("test_metrics_tip 10"),
1183 "expected tip 10: {encoded}"
1184 );
1185
1186 queue.dequeue().await.unwrap();
1188 queue.dequeue().await.unwrap();
1189 let encoded = context.encode();
1190 assert!(
1191 encoded.contains("test_metrics_next 2"),
1192 "expected next 2: {encoded}"
1193 );
1194
1195 queue.ack(0).await.unwrap();
1197 queue.ack(1).await.unwrap();
1198 let encoded = context.encode();
1199 assert!(
1200 encoded.contains("test_metrics_floor 2"),
1201 "expected floor 2: {encoded}"
1202 );
1203
1204 queue.ack(4).await.unwrap();
1206 queue.ack(6).await.unwrap();
1207 let encoded = context.encode();
1208 assert!(
1209 encoded.contains("test_metrics_floor 2"),
1210 "expected floor still 2: {encoded}"
1211 );
1212
1213 queue.ack(2).await.unwrap();
1215 queue.ack(3).await.unwrap();
1216 let encoded = context.encode();
1217 assert!(
1218 encoded.contains("test_metrics_floor 5"),
1219 "expected floor 5: {encoded}"
1220 );
1221
1222 queue.ack_up_to(8).await.unwrap();
1224 let encoded = context.encode();
1225 assert!(
1226 encoded.contains("test_metrics_floor 8"),
1227 "expected floor 8: {encoded}"
1228 );
1229
1230 queue.ack(8).await.unwrap();
1232 queue.ack(9).await.unwrap();
1233 let encoded = context.encode();
1234 assert!(
1235 encoded.contains("test_metrics_floor 10"),
1236 "expected floor 10: {encoded}"
1237 );
1238
1239 queue.reset();
1241 let encoded = context.encode();
1242 assert!(
1243 encoded.contains("test_metrics_next 10"),
1244 "expected next 10: {encoded}"
1245 );
1246 });
1247 }
1248
1249 #[test_traced]
1250 fn test_metrics_next_updates_on_fast_forward() {
1251 let executor = deterministic::Runner::default();
1252 executor.start(|context| async move {
1253 let cfg = test_config("test-ff", &context);
1254 let ctx = context.child("test_ff");
1255 let mut queue = Queue::<_, Vec<u8>>::init(ctx, cfg).await.unwrap();
1256
1257 for i in 0..3u8 {
1259 queue.enqueue(vec![i]).await.unwrap();
1260 }
1261 let (pos, _) = queue.dequeue().await.unwrap().unwrap();
1262 queue.ack(pos).await.unwrap();
1263
1264 let encoded = context.encode();
1265 assert!(
1266 encoded.contains("test_ff_next 1"),
1267 "expected next 1: {encoded}"
1268 );
1269
1270 queue.ack(2).await.unwrap();
1272 queue.ack(1).await.unwrap();
1273 assert_eq!(queue.ack_floor(), 3);
1274
1275 let encoded = context.encode();
1277 assert!(
1278 encoded.contains("test_ff_next 1"),
1279 "expected next still 1: {encoded}"
1280 );
1281
1282 assert!(queue.dequeue().await.unwrap().is_none());
1284 let encoded = context.encode();
1285 assert!(
1286 encoded.contains("test_ff_next 3"),
1287 "expected next 3 after fast-forward: {encoded}"
1288 );
1289 });
1290 }
1291}