1use std::{
2 collections::BTreeSet,
3 convert::TryInto,
4 io,
5 path::Path,
6 sync::{Arc, Condvar, Mutex},
7 time::Duration,
8};
9
10use borsh::{BorshDeserialize, BorshSerialize};
11use rocksdb::{
12 DBIteratorWithThreadMode, DBWithThreadMode, Direction, IteratorMode, MultiThreaded, Options,
13 WriteBatchWithTransaction, WriteOptions,
14};
15use thiserror::Error;
16
17use crate::event_queue::{EventQueue, EventQueueError};
18
/// An event together with the numeric id the queue stored it under.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DurableEvent<T> {
    /// Id assigned by `DurableEventQueue::push`; callers pass it back to `ack` / `nack`.
    pub id: u64,
    /// The caller-supplied payload.
    pub event: T,
}
27
/// Options accepted by `DurableEventQueue::open_with_options`.
///
/// The derived `Default` leaves `lazy` as `false`, which selects the eager mode.
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct DurableEventQueueOptions {
    /// When `true`, the queue is loaded in lazy mode instead of eager mode.
    pub lazy: bool,
}

impl DurableEventQueueOptions {
    /// Returns options with lazy mode switched on.
    pub fn lazy() -> Self {
        let lazy = true;
        Self { lazy }
    }
}
41
/// Errors returned by the durable event queue.
#[derive(Error, Debug)]
pub enum DurableEventQueueError {
    /// An error reported by the in-memory `EventQueue` used in eager mode.
    #[error("event queue error: {0}")]
    EventQueue(#[from] EventQueueError),

    /// An error reported by RocksDB.
    #[error("rocksdb error: {0}")]
    RocksDb(#[from] rocksdb::Error),

    /// One of the queue's internal mutexes was poisoned.
    #[error("durable event queue lock was poisoned")]
    PoisonedLock,

    /// The event could not be Borsh-serialized while being pushed.
    #[error("failed to serialize durable event: {source}")]
    Serialize { source: io::Error },

    /// The bytes stored under `id` could not be Borsh-deserialized.
    #[error("failed to deserialize durable event id {id}: {source}")]
    Deserialize { id: u64, source: io::Error },

    /// A stored key was not exactly 8 bytes long, so it cannot be decoded as a `u64` id.
    #[error("invalid durable event key length: expected 8 bytes, got {actual}")]
    InvalidKeyLength { actual: usize },

    /// The next id would not fit in a `u64`.
    #[error("event id space exhausted (u64 overflow)")]
    IdOverflow,

    /// `nack` was called for an id that has no entry in the database.
    #[error("no durable event with id {id}; it may have already been acked")]
    UnknownEvent { id: u64 },
}
77
/// A queue of events persisted in RocksDB under monotonically assigned `u64` ids.
pub struct DurableEventQueue<T> {
    /// Handle to the backing RocksDB database; keys are the big-endian bytes of the event id.
    db: Arc<DBWithThreadMode<MultiThreaded>>,
    /// Eager or lazy delivery state, chosen when the queue is opened.
    mode: DurableEventQueueMode<T>,
    /// Id that the next `push` will assign; the mutex is held for the whole push.
    next_id: Mutex<u64>,
}
84
/// How events are handed out to consumers.
enum DurableEventQueueMode<T> {
    /// Events are mirrored into an in-memory `EventQueue` (filled on open and on every push).
    Eager { queue: EventQueue<DurableEvent<T>> },
    /// Events are read from the database on demand; only bookkeeping is kept in memory.
    Lazy { state: LazyQueueState },
}
89
/// In-memory bookkeeping for a queue opened in lazy mode.
struct LazyQueueState {
    /// Ids handed out by `pop_lazy` that have not been acked or nacked since.
    in_flight: Mutex<BTreeSet<u64>>,
    /// Id at which the next `pop_lazy` scan starts.
    next_scan_id: Mutex<u64>,
    /// Counter bumped by `notify`; waiters compare it with the value they observed earlier.
    generation: Mutex<u64>,
    /// Signalled by `notify` and waited on (together with `generation`) by blocked pollers.
    condvar: Condvar,
}
96
97impl Default for LazyQueueState {
98 fn default() -> Self {
99 Self::new(0)
100 }
101}
102
103impl LazyQueueState {
104 fn new(next_scan_id: u64) -> Self {
105 Self {
106 in_flight: Mutex::new(BTreeSet::new()),
107 next_scan_id: Mutex::new(next_scan_id),
108 generation: Mutex::new(0),
109 condvar: Condvar::new(),
110 }
111 }
112
113 fn scan_cursor(&self) -> Result<u64, DurableEventQueueError> {
114 Ok(*self
115 .next_scan_id
116 .lock()
117 .map_err(|_| DurableEventQueueError::PoisonedLock)?)
118 }
119
120 fn set_scan_cursor(&self, next_scan_id: u64) -> Result<(), DurableEventQueueError> {
121 *self
122 .next_scan_id
123 .lock()
124 .map_err(|_| DurableEventQueueError::PoisonedLock)? = next_scan_id;
125 Ok(())
126 }
127
128 fn rewind_scan_cursor(&self, id: u64) -> Result<(), DurableEventQueueError> {
129 let mut next_scan_id = self
130 .next_scan_id
131 .lock()
132 .map_err(|_| DurableEventQueueError::PoisonedLock)?;
133 *next_scan_id = (*next_scan_id).min(id);
134 Ok(())
135 }
136
137 fn notify(&self) -> Result<(), DurableEventQueueError> {
138 let mut generation = self
139 .generation
140 .lock()
141 .map_err(|_| DurableEventQueueError::PoisonedLock)?;
142 *generation = generation.wrapping_add(1);
143 self.condvar.notify_one();
144 Ok(())
145 }
146
147 fn generation(&self) -> Result<u64, DurableEventQueueError> {
148 Ok(*self
149 .generation
150 .lock()
151 .map_err(|_| DurableEventQueueError::PoisonedLock)?)
152 }
153
154 fn wait_for_generation_change(
155 &self,
156 observed_generation: u64,
157 ) -> Result<(), DurableEventQueueError> {
158 let generation = self
159 .generation
160 .lock()
161 .map_err(|_| DurableEventQueueError::PoisonedLock)?;
162 let (_generation, _) = self
163 .condvar
164 .wait_timeout_while(generation, Duration::from_millis(100), |generation| {
165 *generation == observed_generation
166 })
167 .map_err(|_| DurableEventQueueError::PoisonedLock)?;
168 Ok(())
169 }
170}
171
/// Iterator over the events stored in the database, yielding decoded `DurableEvent`s.
pub struct DurableEventQueueIterator<'a, T> {
    /// Underlying RocksDB iterator over raw key/value pairs.
    inner: DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>,
    /// Ties the iterator to the event type without storing a value of it.
    _event: std::marker::PhantomData<T>,
}
177
178impl<T> DurableEventQueue<T>
179where
180 T: BorshSerialize + BorshDeserialize + Clone,
181{
182 pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, DurableEventQueueError> {
187 Self::open_with_options(path, DurableEventQueueOptions::default())
188 }
189
190 pub fn open_with_options<P: AsRef<Path>>(
192 path: P,
193 options: DurableEventQueueOptions,
194 ) -> Result<Self, DurableEventQueueError> {
195 let mut opts = Options::default();
196 opts.create_if_missing(true);
197
198 let db = Arc::new(DBWithThreadMode::<MultiThreaded>::open(&opts, path)?);
199 Self::load_from_db(db, options)
200 }
201
202 fn load_from_db(
203 db: Arc<DBWithThreadMode<MultiThreaded>>,
204 options: DurableEventQueueOptions,
205 ) -> Result<Self, DurableEventQueueError> {
206 if options.lazy {
207 return Self::load_lazy_from_db(db);
208 }
209
210 Self::load_eager_from_db(db)
211 }
212
213 fn load_eager_from_db(
214 db: Arc<DBWithThreadMode<MultiThreaded>>,
215 ) -> Result<Self, DurableEventQueueError> {
216 let queue = EventQueue::new();
217 let mut next_id = 0;
218
219 for item in db.iterator(IteratorMode::Start) {
220 let (key, value) = item?;
221 let id = decode_key(key.as_ref())?;
222 let event = T::try_from_slice(value.as_ref())
223 .map_err(|source| DurableEventQueueError::Deserialize { id, source })?;
224 queue.push(DurableEvent { id, event })?;
225 next_id = next_id.max(id.saturating_add(1));
226 }
227
228 Ok(Self {
229 db,
230 mode: DurableEventQueueMode::Eager { queue },
231 next_id: Mutex::new(next_id),
232 })
233 }
234
235 fn load_lazy_from_db(
236 db: Arc<DBWithThreadMode<MultiThreaded>>,
237 ) -> Result<Self, DurableEventQueueError> {
238 let mut first_id = None;
239 let mut next_id = 0;
240
241 for item in db.iterator(IteratorMode::Start) {
242 let (key, _) = item?;
243 let id = decode_key(key.as_ref())?;
244 first_id.get_or_insert(id);
245 next_id = next_id.max(id.saturating_add(1));
246 }
247
248 Ok(Self {
249 db,
250 mode: DurableEventQueueMode::Lazy {
251 state: LazyQueueState::new(first_id.unwrap_or(next_id)),
252 },
253 next_id: Mutex::new(next_id),
254 })
255 }
256
257 pub fn push(&self, event: T) -> Result<DurableEvent<T>, DurableEventQueueError> {
265 let mut next_id = self
266 .next_id
267 .lock()
268 .map_err(|_| DurableEventQueueError::PoisonedLock)?;
269 let id = *next_id;
270 let next = id
271 .checked_add(1)
272 .ok_or(DurableEventQueueError::IdOverflow)?;
273
274 let durable_event = DurableEvent { id, event };
275 let value = borsh::to_vec(&durable_event.event)
276 .map_err(|source| DurableEventQueueError::Serialize { source })?;
277
278 self.db.put_opt(encode_key(id), value, &sync_writes())?;
279 *next_id = next;
280 match &self.mode {
281 DurableEventQueueMode::Eager { queue } => queue.push(durable_event.clone())?,
282 DurableEventQueueMode::Lazy { state } => state.notify()?,
283 }
284
285 Ok(durable_event)
286 }
287
288 pub fn get(&self, id: u64) -> Result<Option<DurableEvent<T>>, DurableEventQueueError> {
290 let Some(raw) = self.db.get(encode_key(id))? else {
291 return Ok(None);
292 };
293 let event = T::try_from_slice(&raw)
294 .map_err(|source| DurableEventQueueError::Deserialize { id, source })?;
295 Ok(Some(DurableEvent { id, event }))
296 }
297
298 pub fn poll(&self) -> Result<Option<DurableEvent<T>>, DurableEventQueueError> {
300 match &self.mode {
301 DurableEventQueueMode::Eager { queue } => Ok(queue.poll()?),
302 DurableEventQueueMode::Lazy { state } => {
303 let observed_generation = state.generation()?;
304 if let Some(event) = self.pop_lazy(state)? {
305 return Ok(Some(event));
306 }
307
308 state.wait_for_generation_change(observed_generation)?;
309 self.pop_lazy(state)
310 }
311 }
312 }
313
314 pub fn pop(&self) -> Result<Option<DurableEvent<T>>, DurableEventQueueError> {
316 match &self.mode {
317 DurableEventQueueMode::Eager { queue } => Ok(queue.pop()?),
318 DurableEventQueueMode::Lazy { state } => self.pop_lazy(state),
319 }
320 }
321
    /// Scans the database forward from the scan cursor and hands out the first event
    /// that is not already in flight.
    ///
    /// Returns `Ok(None)` when the scan reaches the end without finding one. The
    /// `in_flight` lock is held for the whole scan; the cursor lock is only taken
    /// briefly, and always while `in_flight` is held.
    fn pop_lazy(
        &self,
        state: &LazyQueueState,
    ) -> Result<Option<DurableEvent<T>>, DurableEventQueueError> {
        let mut in_flight = state
            .in_flight
            .lock()
            .map_err(|_| DurableEventQueueError::PoisonedLock)?;
        let start_scan_id = state.scan_cursor()?;
        let start_key = encode_key(start_scan_id);
        // Stays at the starting cursor if the scan visits no keys at all.
        let mut next_scan_id = start_scan_id;

        for item in self
            .db
            .iterator(IteratorMode::From(&start_key, Direction::Forward))
        {
            let (key, value) = item?;
            let id = decode_key(key.as_ref())?;
            // Advance past every visited key, including in-flight ones that are skipped.
            next_scan_id = id.saturating_add(1);
            if in_flight.contains(&id) {
                continue;
            }

            let event = T::try_from_slice(value.as_ref())
                .map_err(|source| DurableEventQueueError::Deserialize { id, source })?;
            // Mark the event as handed out and remember where the next scan resumes.
            in_flight.insert(id);
            state.set_scan_cursor(next_scan_id)?;
            return Ok(Some(DurableEvent { id, event }));
        }

        // Nothing ready: store the cursor just past the last key that was visited.
        state.set_scan_cursor(next_scan_id)?;
        Ok(None)
    }
355
356 pub fn ack(&self, id: u64) -> Result<(), DurableEventQueueError> {
362 self.db.delete_opt(encode_key(id), &sync_writes())?;
363 if let DurableEventQueueMode::Lazy { state } = &self.mode {
364 state
365 .in_flight
366 .lock()
367 .map_err(|_| DurableEventQueueError::PoisonedLock)?
368 .remove(&id);
369 }
370 Ok(())
371 }
372
373 pub fn ack_many<I>(&self, ids: I) -> Result<(), DurableEventQueueError>
378 where
379 I: IntoIterator<Item = u64>,
380 {
381 let ids = ids.into_iter().collect::<Vec<_>>();
382 let mut batch = WriteBatchWithTransaction::<false>::default();
383 for id in &ids {
384 batch.delete(encode_key(*id));
385 }
386 self.db.write_opt(batch, &sync_writes())?;
387
388 if let DurableEventQueueMode::Lazy { state } = &self.mode {
389 let mut in_flight = state
390 .in_flight
391 .lock()
392 .map_err(|_| DurableEventQueueError::PoisonedLock)?;
393 for id in ids {
394 in_flight.remove(&id);
395 }
396 }
397
398 Ok(())
399 }
400
401 pub fn nack(&self, id: u64) -> Result<(), DurableEventQueueError> {
406 let raw = self.db.get(encode_key(id))?;
407 match &self.mode {
408 DurableEventQueueMode::Eager { queue } => {
409 let raw = raw.ok_or(DurableEventQueueError::UnknownEvent { id })?;
410 let event = T::try_from_slice(&raw)
411 .map_err(|source| DurableEventQueueError::Deserialize { id, source })?;
412 queue.push_front(DurableEvent { id, event })?;
413 }
414 DurableEventQueueMode::Lazy { state } => {
415 if raw.is_none() {
416 return Err(DurableEventQueueError::UnknownEvent { id });
417 }
418 {
419 let mut in_flight = state
420 .in_flight
421 .lock()
422 .map_err(|_| DurableEventQueueError::PoisonedLock)?;
423 in_flight.remove(&id);
424 state.rewind_scan_cursor(id)?;
425 }
426 state.notify()?;
427 }
428 }
429 Ok(())
430 }
431
432 pub fn len(&self) -> Result<usize, DurableEventQueueError> {
438 let mut count = 0;
439 for item in self.db.iterator(IteratorMode::Start) {
440 item?;
441 count += 1;
442 }
443 Ok(count)
444 }
445
446 pub fn is_empty(&self) -> Result<bool, DurableEventQueueError> {
448 let mut iter = self.db.iterator(IteratorMode::Start);
449 match iter.next() {
450 Some(Ok(_)) => Ok(false),
451 Some(Err(err)) => Err(DurableEventQueueError::RocksDb(err)),
452 None => Ok(true),
453 }
454 }
455
456 pub fn ready_len(&self) -> Result<usize, DurableEventQueueError> {
458 match &self.mode {
459 DurableEventQueueMode::Eager { queue } => Ok(queue.len()?),
460 DurableEventQueueMode::Lazy { state } => {
461 let in_flight = state
462 .in_flight
463 .lock()
464 .map_err(|_| DurableEventQueueError::PoisonedLock)?;
465 let mut count = 0;
466 for item in self.db.iterator(IteratorMode::Start) {
467 let (key, _) = item?;
468 let id = decode_key(key.as_ref())?;
469 if !in_flight.contains(&id) {
470 count += 1;
471 }
472 }
473 Ok(count)
474 }
475 }
476 }
477
478 pub fn iterator(&self) -> DurableEventQueueIterator<'_, T> {
480 DurableEventQueueIterator {
481 inner: self.db.iterator(IteratorMode::Start),
482 _event: std::marker::PhantomData,
483 }
484 }
485}
486
487impl<'a, T> Iterator for DurableEventQueueIterator<'a, T>
488where
489 T: BorshDeserialize,
490{
491 type Item = Result<DurableEvent<T>, DurableEventQueueError>;
492
493 fn next(&mut self) -> Option<Self::Item> {
494 let (key, value) = match self.inner.next()? {
495 Ok(kv) => kv,
496 Err(err) => return Some(Err(DurableEventQueueError::RocksDb(err))),
497 };
498 let id = match decode_key(key.as_ref()) {
499 Ok(id) => id,
500 Err(err) => return Some(Err(err)),
501 };
502 let event = match T::try_from_slice(value.as_ref()) {
503 Ok(event) => event,
504 Err(source) => {
505 return Some(Err(DurableEventQueueError::Deserialize { id, source }));
506 }
507 };
508 Some(Ok(DurableEvent { id, event }))
509 }
510}
511
/// Encodes an event id as its 8 big-endian bytes, so byte-wise key order equals
/// numeric id order.
fn encode_key(id: u64) -> [u8; 8] {
    u64::to_be_bytes(id)
}
515
516fn decode_key(key: &[u8]) -> Result<u64, DurableEventQueueError> {
517 let bytes: [u8; 8] = key
518 .try_into()
519 .map_err(|_| DurableEventQueueError::InvalidKeyLength { actual: key.len() })?;
520 Ok(u64::from_be_bytes(bytes))
521}
522
523fn sync_writes() -> WriteOptions {
524 let mut opts = WriteOptions::default();
525 opts.set_sync(true);
526 opts
527}
528
529#[cfg(test)]
530mod tests {
531 use std::{
532 sync::{
533 atomic::{AtomicUsize, Ordering},
534 Arc,
535 },
536 thread,
537 time::{Duration, Instant},
538 };
539
540 use super::*;
541
    /// Minimal Borsh-serializable payload used by most tests.
    #[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
    struct TestEvent {
        value: String,
    }
546
    /// Number of times `CountingEvent` has been deserialized since the last reset.
    static COUNTING_EVENT_DESERIALIZE_COUNT: AtomicUsize = AtomicUsize::new(0);

    /// Payload whose hand-written deserializer counts its invocations, so tests can
    /// observe when stored values are actually decoded.
    #[derive(Debug, Clone, PartialEq, Eq, BorshSerialize)]
    struct CountingEvent {
        value: String,
    }

    impl BorshDeserialize for CountingEvent {
        /// Bumps the global counter, then reads the single `String` field.
        fn deserialize_reader<R: std::io::Read>(reader: &mut R) -> Result<Self, borsh::io::Error> {
            COUNTING_EVENT_DESERIALIZE_COUNT.fetch_add(1, Ordering::SeqCst);
            Ok(Self {
                value: String::deserialize_reader(reader)?,
            })
        }
    }
562
    /// A pushed event stays in the database across reopen until it is acked.
    #[test]
    fn push_persists_event_until_ack() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();

        let pushed = queue
            .push(TestEvent {
                value: "first".to_string(),
            })
            .unwrap();

        assert_eq!(pushed.id, 0);
        assert_eq!(queue.len().unwrap(), 1);
        // Polling hands the event out but does not delete it from storage.
        assert_eq!(queue.poll().unwrap(), Some(pushed.clone()));
        assert_eq!(queue.ready_len().unwrap(), 0);
        assert_eq!(queue.len().unwrap(), 1);

        drop(queue);

        // The un-acked event is delivered again after reopening.
        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
        assert_eq!(queue.poll().unwrap(), Some(pushed.clone()));

        queue.ack(pushed.id).unwrap();
        drop(queue);

        let queue = DurableEventQueue::<TestEvent>::open(temp_dir.path()).unwrap();
        assert!(queue.is_empty().unwrap());
    }
591
    /// `get` reads an event by id without consuming it from the ready queue.
    #[test]
    fn get_returns_event_by_id_without_polling() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();

        let first = queue
            .push(TestEvent {
                value: "first".to_string(),
            })
            .unwrap();
        let second = queue
            .push(TestEvent {
                value: "second".to_string(),
            })
            .unwrap();

        assert_eq!(queue.get(second.id).unwrap(), Some(second.clone()));
        // Both events are still ready and are delivered in push order.
        assert_eq!(queue.ready_len().unwrap(), 2);
        assert_eq!(queue.poll().unwrap(), Some(first));
        assert_eq!(queue.poll().unwrap(), Some(second));
    }
613
    /// After an event is acked, `get` for its id returns `None`.
    #[test]
    fn get_returns_none_after_ack() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();

        let pushed = queue
            .push(TestEvent {
                value: "first".to_string(),
            })
            .unwrap();

        assert_eq!(queue.get(pushed.id).unwrap(), Some(pushed.clone()));
        queue.ack(pushed.id).unwrap();
        assert_eq!(queue.get(pushed.id).unwrap(), None);
    }
629
    /// Pushing when the id counter is at `u64::MAX` fails with `IdOverflow` and writes nothing.
    #[test]
    fn push_at_id_max_overflows() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();

        // Force the counter to its last value through the private field.
        *queue.next_id.lock().unwrap() = u64::MAX;

        let err = queue
            .push(TestEvent {
                value: "boom".to_string(),
            })
            .unwrap_err();

        assert!(
            matches!(err, DurableEventQueueError::IdOverflow),
            "expected IdOverflow, got {err}"
        );
        assert!(queue.is_empty().unwrap(), "no event should be written");
    }
650
    /// A popped event becomes ready again after `nack` and can then be acked.
    #[test]
    fn nack_requeues_event_for_reprocessing() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();

        let pushed = queue
            .push(TestEvent {
                value: "retry me".to_string(),
            })
            .unwrap();

        let taken = queue.pop().unwrap().unwrap();
        assert_eq!(taken, pushed);
        assert_eq!(queue.ready_len().unwrap(), 0);

        queue.nack(pushed.id).unwrap();
        assert_eq!(queue.ready_len().unwrap(), 1);

        let retried = queue.pop().unwrap().unwrap();
        assert_eq!(retried, pushed);

        // The event stays stored until it is acked.
        assert_eq!(queue.len().unwrap(), 1);
        queue.ack(pushed.id).unwrap();
        assert!(queue.is_empty().unwrap());
    }
680
    /// In eager mode a nacked event is delivered before events that were already ready.
    #[test]
    fn nack_requeues_event_before_ready_events() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();

        let first = queue
            .push(TestEvent {
                value: "first".to_string(),
            })
            .unwrap();
        let second = queue
            .push(TestEvent {
                value: "second".to_string(),
            })
            .unwrap();

        assert_eq!(queue.pop().unwrap().unwrap(), first);
        queue.nack(first.id).unwrap();

        assert_eq!(queue.pop().unwrap().unwrap(), first);
        assert_eq!(queue.pop().unwrap().unwrap(), second);
    }
703
    /// Nacking an id that was never stored yields `UnknownEvent` carrying that id.
    #[test]
    fn nack_unknown_id_returns_error() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = DurableEventQueue::<TestEvent>::open(temp_dir.path()).unwrap();

        let err = queue.nack(99).unwrap_err();
        assert!(
            matches!(err, DurableEventQueueError::UnknownEvent { id: 99 }),
            "expected UnknownEvent(99), got {err}"
        );
    }
715
    /// `ack_many` deletes every listed event; only the unlisted one survives a reopen.
    #[test]
    fn ack_many_removes_multiple_events() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();

        let first = queue
            .push(TestEvent {
                value: "first".to_string(),
            })
            .unwrap();
        let second = queue
            .push(TestEvent {
                value: "second".to_string(),
            })
            .unwrap();
        let third = queue
            .push(TestEvent {
                value: "third".to_string(),
            })
            .unwrap();

        queue.ack_many([first.id, third.id]).unwrap();
        // Reopen so the eager in-memory queue is rebuilt from what is stored.
        drop(queue);

        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
        assert_eq!(queue.pop().unwrap(), Some(second));
        assert_eq!(queue.pop().unwrap(), None);
    }
744
    /// `ack_many` succeeds for duplicate, unknown and already-acked ids.
    #[test]
    fn ack_many_is_idempotent_for_unknown_ids() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();

        let pushed = queue
            .push(TestEvent {
                value: "first".to_string(),
            })
            .unwrap();

        // Duplicate and never-stored ids in the same batch, then the same ids again.
        queue.ack_many([pushed.id, 99, pushed.id]).unwrap();
        queue.ack_many([pushed.id, 99]).unwrap();
        drop(queue);

        let queue = DurableEventQueue::<TestEvent>::open(temp_dir.path()).unwrap();
        assert!(queue.is_empty().unwrap());
    }
763
    /// Events written in eager mode are delivered in push order when reopened in lazy mode.
    #[test]
    fn lazy_recovered_events_keep_fifo_order() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();

        let first = queue
            .push(TestEvent {
                value: "first".to_string(),
            })
            .unwrap();
        let second = queue
            .push(TestEvent {
                value: "second".to_string(),
            })
            .unwrap();

        drop(queue);

        let queue =
            DurableEventQueue::open_with_options(temp_dir.path(), DurableEventQueueOptions::lazy())
                .unwrap();

        assert_eq!(queue.pop().unwrap(), Some(first));
        assert_eq!(queue.pop().unwrap(), Some(second));
        assert_eq!(queue.pop().unwrap(), None);
    }
790
    /// Opening in lazy mode and calling `ready_len` decode no values; the first `pop` decodes one.
    #[test]
    fn lazy_open_does_not_deserialize_recovered_values_until_poll() {
        let temp_dir = tempfile::tempdir().unwrap();
        let event = CountingEvent {
            value: "large payload".to_string(),
        };
        {
            let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
            queue.push(event.clone()).unwrap();
        }

        // Reset after the setup above so only the lazy queue's work is counted.
        COUNTING_EVENT_DESERIALIZE_COUNT.store(0, Ordering::SeqCst);
        let queue = DurableEventQueue::<CountingEvent>::open_with_options(
            temp_dir.path(),
            DurableEventQueueOptions::lazy(),
        )
        .unwrap();

        assert_eq!(COUNTING_EVENT_DESERIALIZE_COUNT.load(Ordering::SeqCst), 0);
        assert_eq!(queue.ready_len().unwrap(), 1);
        assert_eq!(COUNTING_EVENT_DESERIALIZE_COUNT.load(Ordering::SeqCst), 0);
        assert_eq!(queue.pop().unwrap().unwrap().event, event);
        assert_eq!(COUNTING_EVENT_DESERIALIZE_COUNT.load(Ordering::SeqCst), 1);
    }
815
    /// In lazy mode, in-flight events are skipped by `pop` and only nacked ones come back.
    #[test]
    fn lazy_poll_skips_in_flight_events_without_duplicating() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue =
            DurableEventQueue::open_with_options(temp_dir.path(), DurableEventQueueOptions::lazy())
                .unwrap();

        let first = queue
            .push(TestEvent {
                value: "first".to_string(),
            })
            .unwrap();
        let second = queue
            .push(TestEvent {
                value: "second".to_string(),
            })
            .unwrap();

        assert_eq!(queue.pop().unwrap(), Some(first.clone()));
        assert_eq!(queue.pop().unwrap(), Some(second.clone()));
        assert_eq!(queue.ready_len().unwrap(), 0);

        queue.nack(first.id).unwrap();

        // `second` is still in flight, so only `first` is handed out again.
        assert_eq!(queue.pop().unwrap(), Some(first));
        assert_eq!(queue.pop().unwrap(), None);

        queue.nack(second.id).unwrap();
        assert_eq!(queue.pop().unwrap(), Some(second));
    }
846
    /// In lazy mode a nacked event is delivered before later events that were never popped.
    #[test]
    fn lazy_nack_makes_event_available_before_later_ready_events() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue =
            DurableEventQueue::open_with_options(temp_dir.path(), DurableEventQueueOptions::lazy())
                .unwrap();

        let first = queue
            .push(TestEvent {
                value: "first".to_string(),
            })
            .unwrap();
        let second = queue
            .push(TestEvent {
                value: "second".to_string(),
            })
            .unwrap();

        assert_eq!(queue.pop().unwrap(), Some(first.clone()));
        queue.nack(first.id).unwrap();

        assert_eq!(queue.pop().unwrap(), Some(first));
        assert_eq!(queue.pop().unwrap(), Some(second));
    }
871
    /// Checks the lazy scan cursor directly: it advances past each popped id and is
    /// rewound to a nacked id only when that id is behind it.
    #[test]
    fn lazy_scan_cursor_advances_and_rewinds_on_nack() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue =
            DurableEventQueue::open_with_options(temp_dir.path(), DurableEventQueueOptions::lazy())
                .unwrap();

        let first = queue
            .push(TestEvent {
                value: "first".to_string(),
            })
            .unwrap();
        let second = queue
            .push(TestEvent {
                value: "second".to_string(),
            })
            .unwrap();

        // Reach into the private lazy state to observe the cursor.
        let state = match &queue.mode {
            DurableEventQueueMode::Lazy { state } => state,
            DurableEventQueueMode::Eager { .. } => panic!("expected lazy queue"),
        };

        assert_eq!(state.scan_cursor().unwrap(), first.id);
        assert_eq!(queue.pop().unwrap(), Some(first.clone()));
        assert_eq!(state.scan_cursor().unwrap(), second.id);
        assert_eq!(queue.pop().unwrap(), Some(second.clone()));
        assert_eq!(state.scan_cursor().unwrap(), second.id + 1);

        queue.nack(first.id).unwrap();
        assert_eq!(state.scan_cursor().unwrap(), first.id);
        assert_eq!(queue.pop().unwrap(), Some(first.clone()));
        assert_eq!(state.scan_cursor().unwrap(), second.id);

        queue.ack(first.id).unwrap();
        // The cursor already sits at `second.id`, so this nack leaves it where it is.
        queue.nack(second.id).unwrap();
        assert_eq!(state.scan_cursor().unwrap(), second.id);
        assert_eq!(queue.pop().unwrap(), Some(second.clone()));
        assert_eq!(state.scan_cursor().unwrap(), second.id + 1);
    }
912
    /// In lazy mode `ack_many` removes both an in-flight event and a never-popped one.
    #[test]
    fn lazy_ack_many_removes_in_flight_and_ready_events() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue =
            DurableEventQueue::open_with_options(temp_dir.path(), DurableEventQueueOptions::lazy())
                .unwrap();

        let first = queue
            .push(TestEvent {
                value: "first".to_string(),
            })
            .unwrap();
        let second = queue
            .push(TestEvent {
                value: "second".to_string(),
            })
            .unwrap();
        let third = queue
            .push(TestEvent {
                value: "third".to_string(),
            })
            .unwrap();

        // `first` is in flight, `third` is still ready; both are acked together.
        assert_eq!(queue.pop().unwrap(), Some(first.clone()));
        queue.ack_many([first.id, third.id]).unwrap();

        assert_eq!(queue.pop().unwrap(), Some(second));
        assert_eq!(queue.pop().unwrap(), None);
    }
942
    /// A `poll` blocked on an empty lazy queue returns promptly once another thread pushes.
    ///
    /// NOTE(review): the assertion depends on wall-clock thresholds (20 ms head start for
    /// the poller, 80 ms bound measured around a push that performs a synced write), so it
    /// may be timing-sensitive on slow machines — confirm on CI.
    #[test]
    fn lazy_push_wakes_blocking_poll() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = Arc::new(
            DurableEventQueue::open_with_options(temp_dir.path(), DurableEventQueueOptions::lazy())
                .unwrap(),
        );
        let polling_queue = Arc::clone(&queue);

        let handle = thread::spawn(move || polling_queue.poll().unwrap());
        // Give the poller time to find the queue empty and start waiting.
        thread::sleep(Duration::from_millis(20));
        let started = Instant::now();
        let pushed = queue
            .push(TestEvent {
                value: "wake".to_string(),
            })
            .unwrap();

        assert_eq!(handle.join().unwrap(), Some(pushed));
        assert!(
            started.elapsed() < Duration::from_millis(80),
            "poll should wake after push instead of waiting for the full timeout"
        );
    }
967
    /// Events recovered by an eager reopen are delivered in the order they were pushed.
    #[test]
    fn recovered_events_keep_fifo_order() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();

        let first = queue
            .push(TestEvent {
                value: "first".to_string(),
            })
            .unwrap();
        let second = queue
            .push(TestEvent {
                value: "second".to_string(),
            })
            .unwrap();

        drop(queue);

        let queue = DurableEventQueue::open(temp_dir.path()).unwrap();

        assert_eq!(queue.pop().unwrap(), Some(first));
        assert_eq!(queue.pop().unwrap(), Some(second));
        assert_eq!(queue.pop().unwrap(), None);
    }
992}