1use super::{metrics, Error};
4use crate::{
5 journal::contiguous::{variable, Reader as _},
6 rmap::RMap,
7 Persistable,
8};
9use commonware_codec::CodecShared;
10use commonware_runtime::{
11 buffer::paged::CacheRef, telemetry::metrics::status::GaugeExt, Clock, Metrics, Storage,
12};
13use std::num::{NonZeroU64, NonZeroUsize};
14use tracing::debug;
15
/// Configuration for a [`Queue`].
///
/// Every field is forwarded unchanged to the configuration of the underlying
/// variable journal when the queue is initialized.
#[derive(Clone)]
pub struct Config<C> {
    /// Storage partition handed to the journal.
    pub partition: String,

    /// Number of items per journal section, as passed to the journal.
    pub items_per_section: NonZeroU64,

    /// Optional compression setting passed to the journal.
    // NOTE(review): presumably a compression level; confirm against the journal docs.
    pub compression: Option<u8>,

    /// Codec configuration the journal uses for items.
    pub codec_config: C,

    /// Page cache handle passed to the journal.
    pub page_cache: CacheRef,

    /// Write buffer size passed to the journal.
    pub write_buffer: NonZeroUsize,
}
43
44pub struct Queue<E: Clock + Storage + Metrics, V: CodecShared> {
76 journal: variable::Journal<E, V>,
78
79 read_pos: u64,
84
85 ack_floor: u64,
89
90 acked_above: RMap,
95
96 metrics: metrics::Metrics,
98}
99
100impl<E: Clock + Storage + Metrics, V: CodecShared> Queue<E, V> {
101 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
111 let metrics = metrics::Metrics::init(&context);
113
114 let journal = variable::Journal::init(
115 context.with_label("journal"),
116 variable::Config {
117 partition: cfg.partition,
118 items_per_section: cfg.items_per_section,
119 compression: cfg.compression,
120 codec_config: cfg.codec_config,
121 page_cache: cfg.page_cache,
122 write_buffer: cfg.write_buffer,
123 },
124 )
125 .await?;
126
127 let bounds = journal.reader().await.bounds();
130 let acked_above = RMap::new();
131
132 debug!(floor = bounds.start, size = bounds.end, "queue initialized");
133
134 let _ = metrics.tip.try_set(bounds.end);
136 let _ = metrics.floor.try_set(bounds.start);
137 let _ = metrics.next.try_set(bounds.start);
138
139 Ok(Self {
140 journal,
141 read_pos: bounds.start,
142 ack_floor: bounds.start,
143 acked_above,
144 metrics,
145 })
146 }
147
148 pub fn is_acked(&self, position: u64) -> bool {
150 position < self.ack_floor || self.acked_above.get(&position).is_some()
151 }
152
153 pub async fn append(&mut self, item: V) -> Result<u64, Error> {
162 let pos = self.journal.append(&item).await?;
163 let _ = self.metrics.tip.try_set(pos + 1);
164 debug!(pos, "appended item");
165 Ok(pos)
166 }
167
168 pub async fn enqueue(&mut self, item: V) -> Result<u64, Error> {
175 let pos = self.append(item).await?;
176 self.commit().await?;
177 Ok(pos)
178 }
179
180 pub async fn dequeue(&mut self) -> Result<Option<(u64, V)>, Error> {
188 let reader = self.journal.reader().await;
189 let size = reader.bounds().end;
190
191 if self.read_pos < self.ack_floor {
193 self.read_pos = self.ack_floor;
194 }
195
196 if let Some((_, end)) = self.acked_above.get(&self.read_pos) {
198 self.read_pos = end.saturating_add(1);
199 }
200
201 let _ = self.metrics.next.try_set(self.read_pos);
203 if self.read_pos >= size {
204 return Ok(None);
205 }
206
207 let item = reader.read(self.read_pos).await?;
208 let pos = self.read_pos;
209 self.read_pos += 1;
210 let _ = self.metrics.next.try_set(self.read_pos);
211 debug!(position = pos, "dequeued item");
212 Ok(Some((pos, item)))
213 }
214
215 pub async fn ack(&mut self, position: u64) -> Result<(), Error> {
223 let size = self.journal.size().await;
224 if position >= size {
225 return Err(Error::PositionOutOfRange(position, size));
226 }
227
228 if position < self.ack_floor {
230 return Ok(());
231 }
232
233 if self.acked_above.get(&position).is_some() {
235 return Ok(());
236 }
237
238 if position == self.ack_floor {
240 let next = position + 1;
242 let final_floor = match self.acked_above.get(&next) {
243 Some((_, end)) => end + 1,
244 None => next,
245 };
246 self.acked_above.remove(next, final_floor - 1);
247 self.ack_floor = final_floor;
248 let _ = self.metrics.floor.try_set(self.ack_floor);
249 debug!(floor = self.ack_floor, "advanced ack floor");
250 } else {
251 self.acked_above.insert(position);
253 debug!(position, "acked item above floor");
254 }
255 Ok(())
256 }
257
258 pub async fn ack_up_to(&mut self, up_to: u64) -> Result<(), Error> {
265 let size = self.journal.size().await;
266 if up_to > size {
267 return Err(Error::PositionOutOfRange(up_to, size));
268 }
269
270 if up_to <= self.ack_floor {
272 return Ok(());
273 }
274
275 let final_floor = match self.acked_above.get(&up_to) {
277 Some((_, end)) => end + 1,
278 None => up_to,
279 };
280
281 self.acked_above.remove(self.ack_floor, final_floor - 1);
283 self.ack_floor = final_floor;
284 let _ = self.metrics.floor.try_set(self.ack_floor);
285 debug!(floor = self.ack_floor, "batch acked up to");
286 Ok(())
287 }
288
/// Returns the current read position, i.e. the position `dequeue` will
/// consider next (before any skipping of acknowledged items).
pub const fn read_position(&self) -> u64 {
    self.read_pos
}
295
/// Returns the ack floor: every position strictly below it is treated as
/// acknowledged.
pub const fn ack_floor(&self) -> u64 {
    self.ack_floor
}
302
/// Returns the size reported by the underlying journal.
pub async fn size(&self) -> u64 {
    self.journal.size().await
}
310
311 pub async fn is_empty(&self) -> bool {
313 self.ack_floor >= self.journal.size().await
316 }
317
318 pub fn reset(&mut self) {
321 let old_pos = self.read_pos;
322 self.read_pos = self.ack_floor;
323 let _ = self.metrics.next.try_set(self.read_pos);
324 debug!(
325 old_read_pos = old_pos,
326 new_read_pos = self.read_pos,
327 "reset read position"
328 );
329 }
330
/// Test helper: number of positions between the read position and the journal
/// size (zero if the read position is at or past the size).
#[cfg(test)]
pub(crate) async fn pending(&self) -> u64 {
    let size = self.journal.size().await;
    size.saturating_sub(self.read_pos)
}
336
/// Test helper: total number of positions covered by the ranges recorded
/// above the ack floor (each range counted inclusively).
#[cfg(test)]
pub(crate) fn acked_above_count(&self) -> usize {
    let mut total = 0usize;
    for (&start, &end) in self.acked_above.iter() {
        total += (end - start + 1) as usize;
    }
    total
}
345}
346
347impl<E: Clock + Storage + Metrics + Send, V: CodecShared + Send> Persistable for Queue<E, V> {
348 type Error = Error;
349
350 async fn commit(&self) -> Result<(), Error> {
351 self.journal.commit().await?;
352 Ok(())
353 }
354
355 async fn sync(&self) -> Result<(), Error> {
356 self.journal.sync().await?;
357 self.journal.prune(self.ack_floor).await?;
358 Ok(())
359 }
360
361 async fn destroy(self) -> Result<(), Error> {
362 self.journal.destroy().await?;
363 Ok(())
364 }
365}
366
367#[cfg(test)]
368mod tests {
369 use super::*;
370 use commonware_codec::RangeCfg;
371 use commonware_macros::test_traced;
372 use commonware_runtime::{buffer::paged::CacheRef, deterministic, BufferPooler, Runner};
373 use commonware_utils::{NZUsize, NZU16, NZU64};
374 use std::num::NonZeroU16;
375
376 const PAGE_SIZE: NonZeroU16 = NZU16!(1024);
377 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10);
378
379 fn test_config(partition: &str, pooler: &impl BufferPooler) -> Config<(RangeCfg<usize>, ())> {
380 Config {
381 partition: partition.into(),
382 items_per_section: NZU64!(10),
383 compression: None,
384 codec_config: ((0..).into(), ()),
385 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
386 write_buffer: NZUsize!(4096),
387 }
388 }
389
390 #[test_traced]
391 fn test_basic_enqueue_dequeue() {
392 let executor = deterministic::Runner::default();
393 executor.start(|context| async move {
394 let cfg = test_config("test_basic", &context);
395 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
396 .await
397 .unwrap();
398
399 assert!(queue.is_empty().await);
401 assert_eq!(queue.pending().await, 0);
402 assert_eq!(queue.size().await, 0);
403
404 let pos0 = queue.enqueue(b"item0".to_vec()).await.unwrap();
406 let pos1 = queue.enqueue(b"item1".to_vec()).await.unwrap();
407 let pos2 = queue.enqueue(b"item2".to_vec()).await.unwrap();
408
409 assert_eq!(pos0, 0);
410 assert_eq!(pos1, 1);
411 assert_eq!(pos2, 2);
412 assert_eq!(queue.size().await, 3);
413 assert_eq!(queue.pending().await, 3);
414 assert!(!queue.is_empty().await);
415
416 let (p, item) = queue.dequeue().await.unwrap().unwrap();
418 assert_eq!(p, 0);
419 assert_eq!(item, b"item0");
420 assert_eq!(queue.pending().await, 2);
421
422 let (p, item) = queue.dequeue().await.unwrap().unwrap();
423 assert_eq!(p, 1);
424 assert_eq!(item, b"item1");
425 assert_eq!(queue.pending().await, 1);
426
427 let (p, item) = queue.dequeue().await.unwrap().unwrap();
428 assert_eq!(p, 2);
429 assert_eq!(item, b"item2");
430 assert_eq!(queue.pending().await, 0);
431
432 assert!(!queue.is_empty().await);
434 assert!(queue.dequeue().await.unwrap().is_none());
435 });
436 }
437
438 #[test_traced]
439 fn test_append_commit_batch() {
440 let executor = deterministic::Runner::default();
441 executor.start(|context| async move {
442 let cfg = test_config("test_batch", &context);
443 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
444 .await
445 .unwrap();
446
447 for i in 0..5u8 {
449 queue.append(vec![i]).await.unwrap();
450 }
451 queue.commit().await.unwrap();
452 assert_eq!(queue.size().await, 5);
453
454 for i in 0..5 {
456 let (pos, item) = queue.dequeue().await.unwrap().unwrap();
457 assert_eq!(pos, i);
458 assert_eq!(item, vec![i as u8]);
459 }
460
461 for i in 5..8u8 {
463 queue.append(vec![i]).await.unwrap();
464 }
465 queue.commit().await.unwrap();
466 queue.enqueue(vec![8]).await.unwrap();
467 assert_eq!(queue.size().await, 9);
468
469 queue.ack_up_to(9).await.unwrap();
470 assert!(queue.is_empty().await);
471 });
472 }
473
474 #[test_traced]
475 fn test_append_commit_persistence() {
476 let executor = deterministic::Runner::default();
477 executor.start(|context| async move {
478 let cfg = test_config("test_batch_persist", &context);
479
480 {
481 let mut queue = Queue::<_, Vec<u8>>::init(context.with_label("first"), cfg.clone())
482 .await
483 .unwrap();
484 for i in 0..4u8 {
485 queue.append(vec![i]).await.unwrap();
486 }
487 queue.commit().await.unwrap();
488 queue.sync().await.unwrap();
489 }
490
491 {
492 let mut queue = Queue::<_, Vec<u8>>::init(context.with_label("second"), cfg)
493 .await
494 .unwrap();
495 assert_eq!(queue.size().await, 4);
496 for i in 0..4 {
497 let (pos, item) = queue.dequeue().await.unwrap().unwrap();
498 assert_eq!(pos, i);
499 assert_eq!(item, vec![i as u8]);
500 }
501 }
502 });
503 }
504
505 #[test_traced]
506 fn test_sequential_ack() {
507 let executor = deterministic::Runner::default();
508 executor.start(|context| async move {
509 let cfg = test_config("test_seq_ack", &context);
510 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
511 .await
512 .unwrap();
513
514 for i in 0..5u8 {
516 queue.enqueue(vec![i]).await.unwrap();
517 }
518
519 for i in 0..5 {
521 let (pos, _) = queue.dequeue().await.unwrap().unwrap();
522 assert_eq!(pos, i);
523 queue.ack(pos).await.unwrap();
524 assert_eq!(queue.ack_floor(), i + 1);
525 }
526
527 assert!(queue.is_empty().await);
529 assert_eq!(queue.ack_floor(), 5);
530 });
531 }
532
533 #[test_traced]
534 fn test_out_of_order_ack() {
535 let executor = deterministic::Runner::default();
536 executor.start(|context| async move {
537 let cfg = test_config("test_ooo_ack", &context);
538 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
539 .await
540 .unwrap();
541
542 for i in 0..5u8 {
544 queue.enqueue(vec![i]).await.unwrap();
545 }
546
547 for _ in 0..5 {
549 queue.dequeue().await.unwrap();
550 }
551
552 queue.ack(2).await.unwrap();
554 assert_eq!(queue.ack_floor(), 0); assert!(queue.is_acked(2));
556
557 queue.ack(4).await.unwrap();
558 assert_eq!(queue.ack_floor(), 0);
559 assert!(queue.is_acked(4));
560
561 queue.ack(1).await.unwrap();
562 assert_eq!(queue.ack_floor(), 0);
563
564 queue.ack(3).await.unwrap();
565 assert_eq!(queue.ack_floor(), 0);
566
567 queue.ack(0).await.unwrap();
569 assert_eq!(queue.ack_floor(), 5);
570 assert!(queue.is_empty().await);
571 });
572 }
573
574 #[test_traced]
575 fn test_ack_up_to() {
576 let executor = deterministic::Runner::default();
577 executor.start(|context| async move {
578 let cfg = test_config("test_ack_up_to", &context);
579 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
580 .await
581 .unwrap();
582
583 for i in 0..10u8 {
585 queue.enqueue(vec![i]).await.unwrap();
586 }
587
588 queue.ack_up_to(5).await.unwrap();
590 assert_eq!(queue.ack_floor(), 5);
591
592 for i in 0..5 {
594 assert!(queue.is_acked(i));
595 }
596 for i in 5..10 {
598 assert!(!queue.is_acked(i));
599 }
600
601 let (p, _) = queue.dequeue().await.unwrap().unwrap();
603 assert_eq!(p, 5);
604 });
605 }
606
607 #[test_traced]
608 fn test_ack_up_to_with_existing_acks() {
609 let executor = deterministic::Runner::default();
610 executor.start(|context| async move {
611 let cfg = test_config("test_ack_up_to_existing", &context);
612 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
613 .await
614 .unwrap();
615
616 for i in 0..10u8 {
618 queue.enqueue(vec![i]).await.unwrap();
619 }
620
621 queue.ack(7).await.unwrap();
623 queue.ack(8).await.unwrap();
624 assert_eq!(queue.acked_above_count(), 2);
625
626 queue.ack_up_to(5).await.unwrap();
628 assert_eq!(queue.ack_floor(), 5);
629 assert_eq!(queue.acked_above_count(), 2);
630
631 queue.ack_up_to(9).await.unwrap();
633 assert_eq!(queue.ack_floor(), 9);
634 assert_eq!(queue.acked_above_count(), 0);
635 });
636 }
637
638 #[test_traced]
639 fn test_ack_up_to_coalesces_with_acked_above() {
640 let executor = deterministic::Runner::default();
641 executor.start(|context| async move {
642 let cfg = test_config("test_ack_up_to_coalesce", &context);
643 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
644 .await
645 .unwrap();
646
647 for i in 0..10u8 {
649 queue.enqueue(vec![i]).await.unwrap();
650 }
651
652 queue.ack(5).await.unwrap();
654 queue.ack(6).await.unwrap();
655 queue.ack(7).await.unwrap();
656 assert_eq!(queue.ack_floor(), 0);
657
658 queue.ack_up_to(5).await.unwrap();
660 assert_eq!(queue.ack_floor(), 8); });
662 }
663
664 #[test_traced]
665 fn test_ack_up_to_errors() {
666 let executor = deterministic::Runner::default();
667 executor.start(|context| async move {
668 let cfg = test_config("test_ack_up_to_errors", &context);
669 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
670 .await
671 .unwrap();
672
673 queue.enqueue(b"item0".to_vec()).await.unwrap();
674 queue.enqueue(b"item1".to_vec()).await.unwrap();
675
676 let err = queue.ack_up_to(5).await.unwrap_err();
678 assert!(matches!(err, Error::PositionOutOfRange(5, 2)));
679
680 queue.ack_up_to(2).await.unwrap();
682 assert_eq!(queue.ack_floor(), 2);
683
684 queue.ack_up_to(1).await.unwrap();
686 assert_eq!(queue.ack_floor(), 2);
687 });
688 }
689
690 #[test_traced]
691 fn test_dequeue_skips_acked() {
692 let executor = deterministic::Runner::default();
693 executor.start(|context| async move {
694 let cfg = test_config("test_skip_acked", &context);
695 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
696 .await
697 .unwrap();
698
699 for i in 0..5u8 {
701 queue.enqueue(vec![i]).await.unwrap();
702 }
703
704 queue.ack(1).await.unwrap();
706 queue.ack(3).await.unwrap();
707
708 let (p, item) = queue.dequeue().await.unwrap().unwrap();
710 assert_eq!(p, 0);
711 assert_eq!(item, vec![0]);
712
713 let (p, item) = queue.dequeue().await.unwrap().unwrap();
714 assert_eq!(p, 2); assert_eq!(item, vec![2]);
716
717 let (p, item) = queue.dequeue().await.unwrap().unwrap();
718 assert_eq!(p, 4); assert_eq!(item, vec![4]);
720
721 assert!(queue.dequeue().await.unwrap().is_none());
722 });
723 }
724
725 #[test_traced]
726 fn test_ack_errors() {
727 let executor = deterministic::Runner::default();
728 executor.start(|context| async move {
729 let cfg = test_config("test_ack_errors", &context);
730 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
731 .await
732 .unwrap();
733
734 queue.enqueue(b"item0".to_vec()).await.unwrap();
735 queue.enqueue(b"item1".to_vec()).await.unwrap();
736
737 let err = queue.ack(5).await.unwrap_err();
739 assert!(matches!(err, Error::PositionOutOfRange(5, 2)));
740
741 queue.ack(1).await.unwrap();
743 assert!(queue.is_acked(1));
744
745 queue.ack(1).await.unwrap();
747 });
748 }
749
750 #[test_traced]
751 fn test_prune() {
752 let executor = deterministic::Runner::default();
753 executor.start(|context| async move {
754 let cfg = test_config("test_prune", &context);
755 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
756 .await
757 .unwrap();
758
759 for i in 0..25u8 {
761 queue.enqueue(vec![i]).await.unwrap();
762 }
763 queue.sync().await.unwrap();
764
765 for i in 0..15 {
767 queue.dequeue().await.unwrap();
768 queue.ack(i).await.unwrap();
769 }
770 assert_eq!(queue.ack_floor(), 15);
771
772 let (p, item) = queue.dequeue().await.unwrap().unwrap();
774 assert_eq!(p, 15);
775 assert_eq!(item, vec![15]);
776 });
777 }
778
779 #[test_traced]
780 fn test_ack_across_sections() {
781 let executor = deterministic::Runner::default();
782 executor.start(|context| async move {
783 let cfg = test_config("test_multi_prune", &context);
784 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
785 .await
786 .unwrap();
787
788 for i in 0..50u8 {
790 queue.enqueue(vec![i]).await.unwrap();
791 }
792 queue.sync().await.unwrap();
793
794 for i in 0..15 {
796 queue.dequeue().await.unwrap();
797 queue.ack(i).await.unwrap();
798 }
799 assert_eq!(queue.ack_floor(), 15);
800
801 let (p, item) = queue.dequeue().await.unwrap().unwrap();
803 assert_eq!(p, 15);
804 assert_eq!(item, vec![15]);
805
806 queue.ack(15).await.unwrap();
808 for i in 16..30 {
809 queue.dequeue().await.unwrap();
810 queue.ack(i).await.unwrap();
811 }
812 assert_eq!(queue.ack_floor(), 30);
813
814 let (p, item) = queue.dequeue().await.unwrap().unwrap();
816 assert_eq!(p, 30);
817 assert_eq!(item, vec![30]);
818
819 queue.ack(30).await.unwrap();
821 for i in 31..50 {
822 queue.dequeue().await.unwrap();
823 queue.ack(i).await.unwrap();
824 }
825 assert_eq!(queue.ack_floor(), 50);
826
827 assert!(queue.is_empty().await);
829 assert!(queue.dequeue().await.unwrap().is_none());
830 });
831 }
832
833 #[test_traced]
834 fn test_crash_recovery_replays_from_pruning_boundary() {
835 let executor = deterministic::Runner::default();
837 executor.start(|context| async move {
838 let cfg = test_config("test_recovery_replay", &context);
839
840 {
842 let mut queue = Queue::<_, Vec<u8>>::init(context.with_label("first"), cfg.clone())
843 .await
844 .unwrap();
845
846 for i in 0..5u8 {
847 queue.enqueue(vec![i]).await.unwrap();
848 }
849
850 queue.ack(0).await.unwrap();
852 queue.ack(1).await.unwrap();
853 queue.ack(2).await.unwrap();
854 assert_eq!(queue.ack_floor(), 3);
855
856 queue.sync().await.unwrap();
857 }
858
859 {
861 let mut queue =
862 Queue::<_, Vec<u8>>::init(context.with_label("second"), cfg.clone())
863 .await
864 .unwrap();
865
866 assert_eq!(queue.ack_floor(), 0);
868
869 for i in 0..5 {
871 let (p, _) = queue.dequeue().await.unwrap().unwrap();
872 assert_eq!(p, i);
873 }
874 }
875 });
876 }
877
878 #[test_traced]
879 fn test_crash_recovery_with_pruning() {
880 let executor = deterministic::Runner::default();
882 executor.start(|context| async move {
883 let cfg = test_config("test_recovery_pruned", &context);
884
885 let expected_pruning_boundary = {
887 let mut queue = Queue::<_, Vec<u8>>::init(context.with_label("first"), cfg.clone())
888 .await
889 .unwrap();
890
891 for i in 0..25u8 {
893 queue.enqueue(vec![i]).await.unwrap();
894 }
895
896 for i in 0..15 {
898 queue.ack(i).await.unwrap();
899 }
900 assert_eq!(queue.ack_floor(), 15);
901
902 queue.sync().await.unwrap();
904
905 let pruning_boundary = queue.journal.bounds().await.start;
907 assert!(pruning_boundary > 0, "expected some pruning to occur");
908
909 pruning_boundary
910 };
911
912 {
914 let mut queue =
915 Queue::<_, Vec<u8>>::init(context.with_label("second"), cfg.clone())
916 .await
917 .unwrap();
918
919 let pruning_boundary = queue.journal.bounds().await.start;
921 assert_eq!(queue.ack_floor(), pruning_boundary);
922 assert_eq!(pruning_boundary, expected_pruning_boundary);
923
924 for i in pruning_boundary..25 {
926 let (p, item) = queue.dequeue().await.unwrap().unwrap();
927 assert_eq!(p, i);
928 assert_eq!(item, vec![i as u8]);
929 }
930
931 assert!(queue.dequeue().await.unwrap().is_none());
932 }
933 });
934 }
935
936 #[test_traced]
937 fn test_reset() {
938 let executor = deterministic::Runner::default();
939 executor.start(|context| async move {
940 let cfg = test_config("test_reset", &context);
941 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
942 .await
943 .unwrap();
944
945 for i in 0..5u8 {
947 queue.enqueue(vec![i]).await.unwrap();
948 }
949
950 queue.dequeue().await.unwrap();
952 queue.dequeue().await.unwrap();
953 queue.dequeue().await.unwrap();
954 assert_eq!(queue.read_position(), 3);
955
956 queue.reset();
958 assert_eq!(queue.read_position(), 0);
959
960 let (p, item) = queue.dequeue().await.unwrap().unwrap();
962 assert_eq!(p, 0);
963 assert_eq!(item, vec![0]);
964 });
965 }
966
967 #[test_traced]
968 fn test_reset_with_ack() {
969 let executor = deterministic::Runner::default();
970 executor.start(|context| async move {
971 let cfg = test_config("test_reset_ack", &context);
972 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
973 .await
974 .unwrap();
975
976 for i in 0..10u8 {
978 queue.enqueue(vec![i]).await.unwrap();
979 }
980
981 for i in 0..5 {
983 queue.dequeue().await.unwrap();
984 queue.ack(i).await.unwrap();
985 }
986 assert_eq!(queue.ack_floor(), 5);
987 assert_eq!(queue.read_position(), 5);
988
989 queue.dequeue().await.unwrap();
991 queue.dequeue().await.unwrap();
992 assert_eq!(queue.read_position(), 7);
993
994 queue.reset();
996 assert_eq!(queue.read_position(), 5);
997
998 let (p, item) = queue.dequeue().await.unwrap().unwrap();
1000 assert_eq!(p, 5);
1001 assert_eq!(item, vec![5]);
1002 });
1003 }
1004
1005 #[test_traced]
1006 fn test_empty_queue_operations() {
1007 let executor = deterministic::Runner::default();
1008 executor.start(|context| async move {
1009 let cfg = test_config("test_empty", &context);
1010 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
1011 .await
1012 .unwrap();
1013
1014 assert!(queue.is_empty().await);
1016 assert!(queue.dequeue().await.unwrap().is_none());
1017 queue.sync().await.unwrap();
1018 queue.reset();
1019 });
1020 }
1021
1022 #[test_traced]
1023 fn test_persistence() {
1024 let executor = deterministic::Runner::default();
1025 executor.start(|context| async move {
1026 let cfg = test_config("test_persist", &context);
1027
1028 {
1030 let mut queue = Queue::<_, Vec<u8>>::init(context.with_label("first"), cfg.clone())
1031 .await
1032 .unwrap();
1033
1034 queue.enqueue(b"item0".to_vec()).await.unwrap();
1035 queue.enqueue(b"item1".to_vec()).await.unwrap();
1036 queue.sync().await.unwrap();
1037 }
1038
1039 {
1041 let mut queue =
1042 Queue::<_, Vec<u8>>::init(context.with_label("second"), cfg.clone())
1043 .await
1044 .unwrap();
1045
1046 assert_eq!(queue.size().await, 2);
1047
1048 let (_, item) = queue.dequeue().await.unwrap().unwrap();
1049 assert_eq!(item, b"item0");
1050
1051 let (_, item) = queue.dequeue().await.unwrap().unwrap();
1052 assert_eq!(item, b"item1");
1053 }
1054 });
1055 }
1056
1057 #[test_traced]
1058 fn test_large_queue_with_sparse_acks() {
1059 let executor = deterministic::Runner::default();
1060 executor.start(|context| async move {
1061 let cfg = test_config("test_sparse", &context);
1062 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
1063 .await
1064 .unwrap();
1065
1066 for i in 0..100u8 {
1068 queue.enqueue(vec![i]).await.unwrap();
1069 }
1070
1071 for i in (0..100).step_by(3) {
1073 queue.ack(i).await.unwrap();
1074 }
1075
1076 let mut received = Vec::new();
1078 while let Some((pos, _)) = queue.dequeue().await.unwrap() {
1079 received.push(pos);
1080 }
1081
1082 let expected: Vec<u64> = (0..100).filter(|x| x % 3 != 0).collect();
1084 assert_eq!(received, expected);
1085 });
1086 }
1087
1088 #[test_traced]
1089 fn test_acked_above_coalescing() {
1090 let executor = deterministic::Runner::default();
1091 executor.start(|context| async move {
1092 let cfg = test_config("test_coalesce", &context);
1093 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
1094 .await
1095 .unwrap();
1096
1097 for i in 0..10u8 {
1099 queue.enqueue(vec![i]).await.unwrap();
1100 }
1101
1102 for i in 1..9 {
1104 queue.ack(i).await.unwrap();
1105 }
1106
1107 assert_eq!(queue.ack_floor(), 0);
1109 assert!(queue.acked_above_count() > 0);
1110
1111 queue.ack(0).await.unwrap();
1113 assert_eq!(queue.ack_floor(), 9);
1114 assert_eq!(queue.acked_above_count(), 0);
1115 });
1116 }
1117
1118 #[test_traced]
1119 fn test_ack_up_to_past_read_pos() {
1120 let executor = deterministic::Runner::default();
1121 executor.start(|context| async move {
1122 let cfg = test_config("test_ack_up_to_past_read_pos", &context);
1123 let mut queue = Queue::<_, Vec<u8>>::init(context.clone(), cfg)
1124 .await
1125 .unwrap();
1126
1127 for i in 0..10u8 {
1128 queue.enqueue(vec![i]).await.unwrap();
1129 }
1130
1131 for _ in 0..3 {
1133 queue.dequeue().await.unwrap();
1134 }
1135 assert_eq!(queue.read_position(), 3);
1136
1137 queue.ack_up_to(7).await.unwrap();
1139 assert_eq!(queue.ack_floor(), 7);
1140
1141 let (pos, item) = queue.dequeue().await.unwrap().unwrap();
1143 assert_eq!(pos, 7);
1144 assert_eq!(item, vec![7]);
1145 });
1146 }
1147
1148 #[test_traced]
1149 fn test_metrics() {
1150 let executor = deterministic::Runner::default();
1151 executor.start(|context| async move {
1152 let cfg = test_config("test-metrics", &context);
1153 let ctx = context.with_label("test_metrics");
1154 let mut queue = Queue::<_, Vec<u8>>::init(ctx, cfg).await.unwrap();
1155
1156 let encoded = context.encode();
1157 assert!(
1158 encoded.contains("test_metrics_tip 0"),
1159 "expected tip 0: {encoded}"
1160 );
1161 assert!(
1162 encoded.contains("test_metrics_floor 0"),
1163 "expected floor 0: {encoded}"
1164 );
1165 assert!(
1166 encoded.contains("test_metrics_next 0"),
1167 "expected next 0: {encoded}"
1168 );
1169
1170 queue.append(vec![0]).await.unwrap();
1172 let encoded = context.encode();
1173 assert!(
1174 encoded.contains("test_metrics_tip 1"),
1175 "expected tip 1: {encoded}"
1176 );
1177 queue.commit().await.unwrap();
1178
1179 for i in 1..10u8 {
1181 queue.enqueue(vec![i]).await.unwrap();
1182 }
1183 let encoded = context.encode();
1184 assert!(
1185 encoded.contains("test_metrics_tip 10"),
1186 "expected tip 10: {encoded}"
1187 );
1188
1189 queue.dequeue().await.unwrap();
1191 queue.dequeue().await.unwrap();
1192 let encoded = context.encode();
1193 assert!(
1194 encoded.contains("test_metrics_next 2"),
1195 "expected next 2: {encoded}"
1196 );
1197
1198 queue.ack(0).await.unwrap();
1200 queue.ack(1).await.unwrap();
1201 let encoded = context.encode();
1202 assert!(
1203 encoded.contains("test_metrics_floor 2"),
1204 "expected floor 2: {encoded}"
1205 );
1206
1207 queue.ack(4).await.unwrap();
1209 queue.ack(6).await.unwrap();
1210 let encoded = context.encode();
1211 assert!(
1212 encoded.contains("test_metrics_floor 2"),
1213 "expected floor still 2: {encoded}"
1214 );
1215
1216 queue.ack(2).await.unwrap();
1218 queue.ack(3).await.unwrap();
1219 let encoded = context.encode();
1220 assert!(
1221 encoded.contains("test_metrics_floor 5"),
1222 "expected floor 5: {encoded}"
1223 );
1224
1225 queue.ack_up_to(8).await.unwrap();
1227 let encoded = context.encode();
1228 assert!(
1229 encoded.contains("test_metrics_floor 8"),
1230 "expected floor 8: {encoded}"
1231 );
1232
1233 queue.ack(8).await.unwrap();
1235 queue.ack(9).await.unwrap();
1236 let encoded = context.encode();
1237 assert!(
1238 encoded.contains("test_metrics_floor 10"),
1239 "expected floor 10: {encoded}"
1240 );
1241
1242 queue.reset();
1244 let encoded = context.encode();
1245 assert!(
1246 encoded.contains("test_metrics_next 10"),
1247 "expected next 10: {encoded}"
1248 );
1249 });
1250 }
1251
1252 #[test_traced]
1253 fn test_metrics_next_updates_on_fast_forward() {
1254 let executor = deterministic::Runner::default();
1255 executor.start(|context| async move {
1256 let cfg = test_config("test-ff", &context);
1257 let ctx = context.with_label("test_ff");
1258 let mut queue = Queue::<_, Vec<u8>>::init(ctx, cfg).await.unwrap();
1259
1260 for i in 0..3u8 {
1262 queue.enqueue(vec![i]).await.unwrap();
1263 }
1264 let (pos, _) = queue.dequeue().await.unwrap().unwrap();
1265 queue.ack(pos).await.unwrap();
1266
1267 let encoded = context.encode();
1268 assert!(
1269 encoded.contains("test_ff_next 1"),
1270 "expected next 1: {encoded}"
1271 );
1272
1273 queue.ack(2).await.unwrap();
1275 queue.ack(1).await.unwrap();
1276 assert_eq!(queue.ack_floor(), 3);
1277
1278 let encoded = context.encode();
1280 assert!(
1281 encoded.contains("test_ff_next 1"),
1282 "expected next still 1: {encoded}"
1283 );
1284
1285 assert!(queue.dequeue().await.unwrap().is_none());
1287 let encoded = context.encode();
1288 assert!(
1289 encoded.contains("test_ff_next 3"),
1290 "expected next 3 after fast-forward: {encoded}"
1291 );
1292 });
1293 }
1294}