// arch_event_queues/durable_event_queue.rs

1use std::{
2    collections::BTreeSet,
3    convert::TryInto,
4    io,
5    path::Path,
6    sync::{Arc, Condvar, Mutex},
7    time::Duration,
8};
9
10use borsh::{BorshDeserialize, BorshSerialize};
11use rocksdb::{
12    DBIteratorWithThreadMode, DBWithThreadMode, Direction, IteratorMode, MultiThreaded, Options,
13    WriteBatchWithTransaction, WriteOptions,
14};
15use thiserror::Error;
16
17use crate::event_queue::{EventQueue, EventQueueError};
18
/// A payload paired with the identifier the durable queue stored it under.
///
/// This is the value handed back by the queue's push/pop/poll/get operations;
/// the `id` is what callers pass to `ack`/`nack` afterwards.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DurableEvent<T> {
    /// Identifier assigned by the queue when the event was pushed.
    pub id: u64,
    /// The caller-supplied payload.
    pub event: T,
}
27
/// Settings that control how a durable event queue is opened.
///
/// The `Default` value has `lazy == false`, which selects eager loading.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct DurableEventQueueOptions {
    /// When `true`, persisted events are read from RocksDB on demand instead
    /// of being loaded into memory when the queue is opened.
    pub lazy: bool,
}

impl DurableEventQueueOptions {
    /// Returns options with lazy loading switched on.
    pub fn lazy() -> Self {
        Self { lazy: true }
    }
}
41
42/// Error returned by durable queue operations.
43#[derive(Error, Debug)]
44pub enum DurableEventQueueError {
45    /// Error from the in-memory ready queue.
46    #[error("event queue error: {0}")]
47    EventQueue(#[from] EventQueueError),
48
49    /// Error returned by RocksDB.
50    #[error("rocksdb error: {0}")]
51    RocksDb(#[from] rocksdb::Error),
52
53    /// An internal durable queue lock was poisoned by a panicking thread.
54    #[error("durable event queue lock was poisoned")]
55    PoisonedLock,
56
57    /// Failed to serialize an event payload before writing it to RocksDB.
58    #[error("failed to serialize durable event: {source}")]
59    Serialize { source: io::Error },
60
61    /// Failed to deserialize a persisted event payload.
62    #[error("failed to deserialize durable event id {id}: {source}")]
63    Deserialize { id: u64, source: io::Error },
64
65    /// Found a RocksDB key that is not a valid durable event id.
66    #[error("invalid durable event key length: expected 8 bytes, got {actual}")]
67    InvalidKeyLength { actual: usize },
68
69    /// The queue cannot assign another event id.
70    #[error("event id space exhausted (u64 overflow)")]
71    IdOverflow,
72
73    /// The requested event id is not present in the durable store.
74    #[error("no durable event with id {id}; it may have already been acked")]
75    UnknownEvent { id: u64 },
76}
77
78/// RocksDB-backed event queue with at-least-once delivery.
79pub struct DurableEventQueue<T> {
80    db: Arc<DBWithThreadMode<MultiThreaded>>,
81    mode: DurableEventQueueMode<T>,
82    next_id: Mutex<u64>,
83}
84
85enum DurableEventQueueMode<T> {
86    Eager { queue: EventQueue<DurableEvent<T>> },
87    Lazy { state: LazyQueueState },
88}
89
90struct LazyQueueState {
91    in_flight: Mutex<BTreeSet<u64>>,
92    next_scan_id: Mutex<u64>,
93    generation: Mutex<u64>,
94    condvar: Condvar,
95}
96
97impl Default for LazyQueueState {
98    fn default() -> Self {
99        Self::new(0)
100    }
101}
102
103impl LazyQueueState {
104    fn new(next_scan_id: u64) -> Self {
105        Self {
106            in_flight: Mutex::new(BTreeSet::new()),
107            next_scan_id: Mutex::new(next_scan_id),
108            generation: Mutex::new(0),
109            condvar: Condvar::new(),
110        }
111    }
112
113    fn scan_cursor(&self) -> Result<u64, DurableEventQueueError> {
114        Ok(*self
115            .next_scan_id
116            .lock()
117            .map_err(|_| DurableEventQueueError::PoisonedLock)?)
118    }
119
120    fn set_scan_cursor(&self, next_scan_id: u64) -> Result<(), DurableEventQueueError> {
121        *self
122            .next_scan_id
123            .lock()
124            .map_err(|_| DurableEventQueueError::PoisonedLock)? = next_scan_id;
125        Ok(())
126    }
127
128    fn rewind_scan_cursor(&self, id: u64) -> Result<(), DurableEventQueueError> {
129        let mut next_scan_id = self
130            .next_scan_id
131            .lock()
132            .map_err(|_| DurableEventQueueError::PoisonedLock)?;
133        *next_scan_id = (*next_scan_id).min(id);
134        Ok(())
135    }
136
137    fn notify(&self) -> Result<(), DurableEventQueueError> {
138        let mut generation = self
139            .generation
140            .lock()
141            .map_err(|_| DurableEventQueueError::PoisonedLock)?;
142        *generation = generation.wrapping_add(1);
143        self.condvar.notify_one();
144        Ok(())
145    }
146
147    fn generation(&self) -> Result<u64, DurableEventQueueError> {
148        Ok(*self
149            .generation
150            .lock()
151            .map_err(|_| DurableEventQueueError::PoisonedLock)?)
152    }
153
154    fn wait_for_generation_change(
155        &self,
156        observed_generation: u64,
157    ) -> Result<(), DurableEventQueueError> {
158        let generation = self
159            .generation
160            .lock()
161            .map_err(|_| DurableEventQueueError::PoisonedLock)?;
162        let (_generation, _) = self
163            .condvar
164            .wait_timeout_while(generation, Duration::from_millis(100), |generation| {
165                *generation == observed_generation
166            })
167            .map_err(|_| DurableEventQueueError::PoisonedLock)?;
168        Ok(())
169    }
170}
171
172/// Iterator over unacked events stored in RocksDB.
173pub struct DurableEventQueueIterator<'a, T> {
174    inner: DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>,
175    _event: std::marker::PhantomData<T>,
176}
177
178impl<T> DurableEventQueue<T>
179where
180    T: BorshSerialize + BorshDeserialize + Clone,
181{
182    /// Opens a RocksDB database dedicated to this durable queue.
183    ///
184    /// The database must not be shared with unrelated data. Recovery scans all
185    /// keys in this DB and treats them as durable queue entries.
186    pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, DurableEventQueueError> {
187        Self::open_with_options(path, DurableEventQueueOptions::default())
188    }
189
190    /// Opens a durable queue with explicit options.
191    pub fn open_with_options<P: AsRef<Path>>(
192        path: P,
193        options: DurableEventQueueOptions,
194    ) -> Result<Self, DurableEventQueueError> {
195        let mut opts = Options::default();
196        opts.create_if_missing(true);
197
198        let db = Arc::new(DBWithThreadMode::<MultiThreaded>::open(&opts, path)?);
199        Self::load_from_db(db, options)
200    }
201
202    fn load_from_db(
203        db: Arc<DBWithThreadMode<MultiThreaded>>,
204        options: DurableEventQueueOptions,
205    ) -> Result<Self, DurableEventQueueError> {
206        if options.lazy {
207            return Self::load_lazy_from_db(db);
208        }
209
210        Self::load_eager_from_db(db)
211    }
212
213    fn load_eager_from_db(
214        db: Arc<DBWithThreadMode<MultiThreaded>>,
215    ) -> Result<Self, DurableEventQueueError> {
216        let queue = EventQueue::new();
217        let mut next_id = 0;
218
219        for item in db.iterator(IteratorMode::Start) {
220            let (key, value) = item?;
221            let id = decode_key(key.as_ref())?;
222            let event = T::try_from_slice(value.as_ref())
223                .map_err(|source| DurableEventQueueError::Deserialize { id, source })?;
224            queue.push(DurableEvent { id, event })?;
225            next_id = next_id.max(id.saturating_add(1));
226        }
227
228        Ok(Self {
229            db,
230            mode: DurableEventQueueMode::Eager { queue },
231            next_id: Mutex::new(next_id),
232        })
233    }
234
235    fn load_lazy_from_db(
236        db: Arc<DBWithThreadMode<MultiThreaded>>,
237    ) -> Result<Self, DurableEventQueueError> {
238        let mut first_id = None;
239        let mut next_id = 0;
240
241        for item in db.iterator(IteratorMode::Start) {
242            let (key, _) = item?;
243            let id = decode_key(key.as_ref())?;
244            first_id.get_or_insert(id);
245            next_id = next_id.max(id.saturating_add(1));
246        }
247
248        Ok(Self {
249            db,
250            mode: DurableEventQueueMode::Lazy {
251                state: LazyQueueState::new(first_id.unwrap_or(next_id)),
252            },
253            next_id: Mutex::new(next_id),
254        })
255    }
256
257    /// Durably stores `event` and enqueues it for consumption.
258    ///
259    /// # Partial-failure note
260    /// The DB write and the in-memory enqueue are not atomic. If `queue.push`
261    /// fails after `db.put` succeeds (only possible when the internal lock is
262    /// poisoned), the event is already durable and will be replayed on the next
263    /// `open` call. Callers should treat this as at-least-once delivery.
264    pub fn push(&self, event: T) -> Result<DurableEvent<T>, DurableEventQueueError> {
265        let mut next_id = self
266            .next_id
267            .lock()
268            .map_err(|_| DurableEventQueueError::PoisonedLock)?;
269        let id = *next_id;
270        let next = id
271            .checked_add(1)
272            .ok_or(DurableEventQueueError::IdOverflow)?;
273
274        let durable_event = DurableEvent { id, event };
275        let value = borsh::to_vec(&durable_event.event)
276            .map_err(|source| DurableEventQueueError::Serialize { source })?;
277
278        self.db.put_opt(encode_key(id), value, &sync_writes())?;
279        *next_id = next;
280        match &self.mode {
281            DurableEventQueueMode::Eager { queue } => queue.push(durable_event.clone())?,
282            DurableEventQueueMode::Lazy { state } => state.notify()?,
283        }
284
285        Ok(durable_event)
286    }
287
288    /// Reads an unacked event by id without marking it in flight.
289    pub fn get(&self, id: u64) -> Result<Option<DurableEvent<T>>, DurableEventQueueError> {
290        let Some(raw) = self.db.get(encode_key(id))? else {
291            return Ok(None);
292        };
293        let event = T::try_from_slice(&raw)
294            .map_err(|source| DurableEventQueueError::Deserialize { id, source })?;
295        Ok(Some(DurableEvent { id, event }))
296    }
297
298    /// Waits briefly for an available durable event.
299    pub fn poll(&self) -> Result<Option<DurableEvent<T>>, DurableEventQueueError> {
300        match &self.mode {
301            DurableEventQueueMode::Eager { queue } => Ok(queue.poll()?),
302            DurableEventQueueMode::Lazy { state } => {
303                let observed_generation = state.generation()?;
304                if let Some(event) = self.pop_lazy(state)? {
305                    return Ok(Some(event));
306                }
307
308                state.wait_for_generation_change(observed_generation)?;
309                self.pop_lazy(state)
310            }
311        }
312    }
313
314    /// Returns the next available durable event immediately.
315    pub fn pop(&self) -> Result<Option<DurableEvent<T>>, DurableEventQueueError> {
316        match &self.mode {
317            DurableEventQueueMode::Eager { queue } => Ok(queue.pop()?),
318            DurableEventQueueMode::Lazy { state } => self.pop_lazy(state),
319        }
320    }
321
322    fn pop_lazy(
323        &self,
324        state: &LazyQueueState,
325    ) -> Result<Option<DurableEvent<T>>, DurableEventQueueError> {
326        let mut in_flight = state
327            .in_flight
328            .lock()
329            .map_err(|_| DurableEventQueueError::PoisonedLock)?;
330        let start_scan_id = state.scan_cursor()?;
331        let start_key = encode_key(start_scan_id);
332        let mut next_scan_id = start_scan_id;
333
334        for item in self
335            .db
336            .iterator(IteratorMode::From(&start_key, Direction::Forward))
337        {
338            let (key, value) = item?;
339            let id = decode_key(key.as_ref())?;
340            next_scan_id = id.saturating_add(1);
341            if in_flight.contains(&id) {
342                continue;
343            }
344
345            let event = T::try_from_slice(value.as_ref())
346                .map_err(|source| DurableEventQueueError::Deserialize { id, source })?;
347            in_flight.insert(id);
348            state.set_scan_cursor(next_scan_id)?;
349            return Ok(Some(DurableEvent { id, event }));
350        }
351
352        state.set_scan_cursor(next_scan_id)?;
353        Ok(None)
354    }
355
356    /// Marks an event as handled by deleting it from RocksDB.
357    ///
358    /// This method is intentionally not strict for performance: acking an
359    /// unknown or already-acked id succeeds because RocksDB deletes are
360    /// idempotent and avoid an extra read.
361    pub fn ack(&self, id: u64) -> Result<(), DurableEventQueueError> {
362        self.db.delete_opt(encode_key(id), &sync_writes())?;
363        if let DurableEventQueueMode::Lazy { state } = &self.mode {
364            state
365                .in_flight
366                .lock()
367                .map_err(|_| DurableEventQueueError::PoisonedLock)?
368                .remove(&id);
369        }
370        Ok(())
371    }
372
373    /// Marks multiple events as handled with a single RocksDB batch write.
374    ///
375    /// Like `ack`, this is intentionally not strict: unknown or already-acked
376    /// ids succeed because RocksDB deletes are idempotent and avoid extra reads.
377    pub fn ack_many<I>(&self, ids: I) -> Result<(), DurableEventQueueError>
378    where
379        I: IntoIterator<Item = u64>,
380    {
381        let ids = ids.into_iter().collect::<Vec<_>>();
382        let mut batch = WriteBatchWithTransaction::<false>::default();
383        for id in &ids {
384            batch.delete(encode_key(*id));
385        }
386        self.db.write_opt(batch, &sync_writes())?;
387
388        if let DurableEventQueueMode::Lazy { state } = &self.mode {
389            let mut in_flight = state
390                .in_flight
391                .lock()
392                .map_err(|_| DurableEventQueueError::PoisonedLock)?;
393            for id in ids {
394                in_flight.remove(&id);
395            }
396        }
397
398        Ok(())
399    }
400
401    /// Re-enqueues an in-flight event at the front of the ready queue without
402    /// removing it from the durable store. Use this when a consumer cannot
403    /// handle an event and wants it available for the next `poll`/`pop` without
404    /// restarting.
405    pub fn nack(&self, id: u64) -> Result<(), DurableEventQueueError> {
406        let raw = self.db.get(encode_key(id))?;
407        match &self.mode {
408            DurableEventQueueMode::Eager { queue } => {
409                let raw = raw.ok_or(DurableEventQueueError::UnknownEvent { id })?;
410                let event = T::try_from_slice(&raw)
411                    .map_err(|source| DurableEventQueueError::Deserialize { id, source })?;
412                queue.push_front(DurableEvent { id, event })?;
413            }
414            DurableEventQueueMode::Lazy { state } => {
415                if raw.is_none() {
416                    return Err(DurableEventQueueError::UnknownEvent { id });
417                }
418                {
419                    let mut in_flight = state
420                        .in_flight
421                        .lock()
422                        .map_err(|_| DurableEventQueueError::PoisonedLock)?;
423                    in_flight.remove(&id);
424                    state.rewind_scan_cursor(id)?;
425                }
426                state.notify()?;
427            }
428        }
429        Ok(())
430    }
431
432    /// Counts unacked events in RocksDB.
433    ///
434    /// This scans the dedicated queue DB, so it is O(n) in the number of
435    /// unacked events. Prefer using it for diagnostics/tests rather than hot
436    /// path metrics.
437    pub fn len(&self) -> Result<usize, DurableEventQueueError> {
438        let mut count = 0;
439        for item in self.db.iterator(IteratorMode::Start) {
440            item?;
441            count += 1;
442        }
443        Ok(count)
444    }
445
446    /// Returns true when there are no unacked events in RocksDB.
447    pub fn is_empty(&self) -> Result<bool, DurableEventQueueError> {
448        let mut iter = self.db.iterator(IteratorMode::Start);
449        match iter.next() {
450            Some(Ok(_)) => Ok(false),
451            Some(Err(err)) => Err(DurableEventQueueError::RocksDb(err)),
452            None => Ok(true),
453        }
454    }
455
456    /// Counts events currently available for processing.
457    pub fn ready_len(&self) -> Result<usize, DurableEventQueueError> {
458        match &self.mode {
459            DurableEventQueueMode::Eager { queue } => Ok(queue.len()?),
460            DurableEventQueueMode::Lazy { state } => {
461                let in_flight = state
462                    .in_flight
463                    .lock()
464                    .map_err(|_| DurableEventQueueError::PoisonedLock)?;
465                let mut count = 0;
466                for item in self.db.iterator(IteratorMode::Start) {
467                    let (key, _) = item?;
468                    let id = decode_key(key.as_ref())?;
469                    if !in_flight.contains(&id) {
470                        count += 1;
471                    }
472                }
473                Ok(count)
474            }
475        }
476    }
477
478    /// Iterates over all unacked events in id order.
479    pub fn iterator(&self) -> DurableEventQueueIterator<'_, T> {
480        DurableEventQueueIterator {
481            inner: self.db.iterator(IteratorMode::Start),
482            _event: std::marker::PhantomData,
483        }
484    }
485}
486
487impl<'a, T> Iterator for DurableEventQueueIterator<'a, T>
488where
489    T: BorshDeserialize,
490{
491    type Item = Result<DurableEvent<T>, DurableEventQueueError>;
492
493    fn next(&mut self) -> Option<Self::Item> {
494        let (key, value) = match self.inner.next()? {
495            Ok(kv) => kv,
496            Err(err) => return Some(Err(DurableEventQueueError::RocksDb(err))),
497        };
498        let id = match decode_key(key.as_ref()) {
499            Ok(id) => id,
500            Err(err) => return Some(Err(err)),
501        };
502        let event = match T::try_from_slice(value.as_ref()) {
503            Ok(event) => event,
504            Err(source) => {
505                return Some(Err(DurableEventQueueError::Deserialize { id, source }));
506            }
507        };
508        Some(Ok(DurableEvent { id, event }))
509    }
510}
511
/// Encodes an event id as its 8-byte big-endian representation.
///
/// Big-endian keeps the byte-wise ordering of keys equal to the numeric
/// ordering of ids.
fn encode_key(id: u64) -> [u8; 8] {
    u64::to_be_bytes(id)
}
515
516fn decode_key(key: &[u8]) -> Result<u64, DurableEventQueueError> {
517    let bytes: [u8; 8] = key
518        .try_into()
519        .map_err(|_| DurableEventQueueError::InvalidKeyLength { actual: key.len() })?;
520    Ok(u64::from_be_bytes(bytes))
521}
522
523fn sync_writes() -> WriteOptions {
524    let mut opts = WriteOptions::default();
525    opts.set_sync(true);
526    opts
527}
528
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
        thread,
        time::{Duration, Instant},
    };

    use super::*;

    /// Plain payload used by most tests.
    #[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
    struct TestEvent {
        value: String,
    }

    /// Number of times a `CountingEvent` has been deserialized.
    static COUNTING_EVENT_DESERIALIZE_COUNT: AtomicUsize = AtomicUsize::new(0);

    /// Payload whose deserialization is counted, to observe lazy decoding.
    #[derive(Debug, Clone, PartialEq, Eq, BorshSerialize)]
    struct CountingEvent {
        value: String,
    }

    impl BorshDeserialize for CountingEvent {
        fn deserialize_reader<R: std::io::Read>(reader: &mut R) -> Result<Self, borsh::io::Error> {
            COUNTING_EVENT_DESERIALIZE_COUNT.fetch_add(1, Ordering::SeqCst);
            let value = String::deserialize_reader(reader)?;
            Ok(Self { value })
        }
    }

    /// Builds a `TestEvent` carrying `value`.
    fn event(value: &str) -> TestEvent {
        TestEvent {
            value: value.to_string(),
        }
    }

    /// Opens an eager queue of `TestEvent`s at `path`.
    fn open_eager(path: &std::path::Path) -> DurableEventQueue<TestEvent> {
        DurableEventQueue::open(path).unwrap()
    }

    /// Opens a lazy queue of `TestEvent`s at `path`.
    fn open_lazy(path: &std::path::Path) -> DurableEventQueue<TestEvent> {
        DurableEventQueue::open_with_options(path, DurableEventQueueOptions::lazy()).unwrap()
    }

    /// Returns the lazy state of `queue`, panicking for an eager queue.
    fn lazy_state(queue: &DurableEventQueue<TestEvent>) -> &LazyQueueState {
        match &queue.mode {
            DurableEventQueueMode::Lazy { state } => state,
            DurableEventQueueMode::Eager { .. } => panic!("expected lazy queue"),
        }
    }

    #[test]
    fn push_persists_event_until_ack() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_eager(temp_dir.path());

        let pushed = queue.push(event("first")).unwrap();

        assert_eq!(pushed.id, 0);
        assert_eq!(queue.len().unwrap(), 1);
        assert_eq!(queue.poll().unwrap(), Some(pushed.clone()));
        assert_eq!(queue.ready_len().unwrap(), 0);
        assert_eq!(queue.len().unwrap(), 1);

        drop(queue);

        // Unacked, so it comes back after a reopen.
        let queue = open_eager(temp_dir.path());
        assert_eq!(queue.poll().unwrap(), Some(pushed.clone()));

        queue.ack(pushed.id).unwrap();
        drop(queue);

        let queue = open_eager(temp_dir.path());
        assert!(queue.is_empty().unwrap());
    }

    #[test]
    fn get_returns_event_by_id_without_polling() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_eager(temp_dir.path());

        let first = queue.push(event("first")).unwrap();
        let second = queue.push(event("second")).unwrap();

        assert_eq!(queue.get(second.id).unwrap(), Some(second.clone()));
        assert_eq!(queue.ready_len().unwrap(), 2);
        assert_eq!(queue.poll().unwrap(), Some(first));
        assert_eq!(queue.poll().unwrap(), Some(second));
    }

    #[test]
    fn get_returns_none_after_ack() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_eager(temp_dir.path());

        let pushed = queue.push(event("first")).unwrap();

        assert_eq!(queue.get(pushed.id).unwrap(), Some(pushed.clone()));
        queue.ack(pushed.id).unwrap();
        assert_eq!(queue.get(pushed.id).unwrap(), None);
    }

    #[test]
    fn push_at_id_max_overflows() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_eager(temp_dir.path());

        // Manually seed next_id to u64::MAX so the first push hits the ceiling.
        *queue.next_id.lock().unwrap() = u64::MAX;

        let err = queue.push(event("boom")).unwrap_err();

        assert!(
            matches!(err, DurableEventQueueError::IdOverflow),
            "expected IdOverflow, got {err}"
        );
        assert!(queue.is_empty().unwrap(), "no event should be written");
    }

    #[test]
    fn nack_requeues_event_for_reprocessing() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_eager(temp_dir.path());

        let pushed = queue.push(event("retry me")).unwrap();

        // Consumer takes the event...
        let taken = queue.pop().unwrap().unwrap();
        assert_eq!(taken, pushed);
        assert_eq!(queue.ready_len().unwrap(), 0);

        // ...fails to process it and nacks.
        queue.nack(pushed.id).unwrap();
        assert_eq!(queue.ready_len().unwrap(), 1);

        // Event is available again with the same id.
        let retried = queue.pop().unwrap().unwrap();
        assert_eq!(retried, pushed);

        // Still durable until acked.
        assert_eq!(queue.len().unwrap(), 1);
        queue.ack(pushed.id).unwrap();
        assert!(queue.is_empty().unwrap());
    }

    #[test]
    fn nack_requeues_event_before_ready_events() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_eager(temp_dir.path());

        let first = queue.push(event("first")).unwrap();
        let second = queue.push(event("second")).unwrap();

        assert_eq!(queue.pop().unwrap().unwrap(), first);
        queue.nack(first.id).unwrap();

        assert_eq!(queue.pop().unwrap().unwrap(), first);
        assert_eq!(queue.pop().unwrap().unwrap(), second);
    }

    #[test]
    fn nack_unknown_id_returns_error() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_eager(temp_dir.path());

        let err = queue.nack(99).unwrap_err();
        assert!(
            matches!(err, DurableEventQueueError::UnknownEvent { id: 99 }),
            "expected UnknownEvent(99), got {err}"
        );
    }

    #[test]
    fn ack_many_removes_multiple_events() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_eager(temp_dir.path());

        let first = queue.push(event("first")).unwrap();
        let second = queue.push(event("second")).unwrap();
        let third = queue.push(event("third")).unwrap();

        queue.ack_many([first.id, third.id]).unwrap();
        drop(queue);

        // Reopen so the ready queue is rebuilt from what is still stored.
        let queue = open_eager(temp_dir.path());
        assert_eq!(queue.pop().unwrap(), Some(second));
        assert_eq!(queue.pop().unwrap(), None);
    }

    #[test]
    fn ack_many_is_idempotent_for_unknown_ids() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_eager(temp_dir.path());

        let pushed = queue.push(event("first")).unwrap();

        queue.ack_many([pushed.id, 99, pushed.id]).unwrap();
        queue.ack_many([pushed.id, 99]).unwrap();
        drop(queue);

        let queue = open_eager(temp_dir.path());
        assert!(queue.is_empty().unwrap());
    }

    #[test]
    fn lazy_recovered_events_keep_fifo_order() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_eager(temp_dir.path());

        let first = queue.push(event("first")).unwrap();
        let second = queue.push(event("second")).unwrap();

        drop(queue);

        let queue = open_lazy(temp_dir.path());

        assert_eq!(queue.pop().unwrap(), Some(first));
        assert_eq!(queue.pop().unwrap(), Some(second));
        assert_eq!(queue.pop().unwrap(), None);
    }

    #[test]
    fn lazy_open_does_not_deserialize_recovered_values_until_poll() {
        let temp_dir = tempfile::tempdir().unwrap();
        let event = CountingEvent {
            value: "large payload".to_string(),
        };
        {
            let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
            queue.push(event.clone()).unwrap();
        }

        COUNTING_EVENT_DESERIALIZE_COUNT.store(0, Ordering::SeqCst);
        let queue = DurableEventQueue::<CountingEvent>::open_with_options(
            temp_dir.path(),
            DurableEventQueueOptions::lazy(),
        )
        .unwrap();

        let decode_count = || COUNTING_EVENT_DESERIALIZE_COUNT.load(Ordering::SeqCst);
        assert_eq!(decode_count(), 0);
        assert_eq!(queue.ready_len().unwrap(), 1);
        assert_eq!(decode_count(), 0);
        assert_eq!(queue.pop().unwrap().unwrap().event, event);
        assert_eq!(decode_count(), 1);
    }

    #[test]
    fn lazy_poll_skips_in_flight_events_without_duplicating() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_lazy(temp_dir.path());

        let first = queue.push(event("first")).unwrap();
        let second = queue.push(event("second")).unwrap();

        assert_eq!(queue.pop().unwrap(), Some(first.clone()));
        assert_eq!(queue.pop().unwrap(), Some(second.clone()));
        assert_eq!(queue.ready_len().unwrap(), 0);

        queue.nack(first.id).unwrap();

        assert_eq!(queue.pop().unwrap(), Some(first));
        assert_eq!(queue.pop().unwrap(), None);

        queue.nack(second.id).unwrap();
        assert_eq!(queue.pop().unwrap(), Some(second));
    }

    #[test]
    fn lazy_nack_makes_event_available_before_later_ready_events() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_lazy(temp_dir.path());

        let first = queue.push(event("first")).unwrap();
        let second = queue.push(event("second")).unwrap();

        assert_eq!(queue.pop().unwrap(), Some(first.clone()));
        queue.nack(first.id).unwrap();

        assert_eq!(queue.pop().unwrap(), Some(first));
        assert_eq!(queue.pop().unwrap(), Some(second));
    }

    #[test]
    fn lazy_scan_cursor_advances_and_rewinds_on_nack() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_lazy(temp_dir.path());

        let first = queue.push(event("first")).unwrap();
        let second = queue.push(event("second")).unwrap();

        let state = lazy_state(&queue);

        assert_eq!(state.scan_cursor().unwrap(), first.id);
        assert_eq!(queue.pop().unwrap(), Some(first.clone()));
        assert_eq!(state.scan_cursor().unwrap(), second.id);
        assert_eq!(queue.pop().unwrap(), Some(second.clone()));
        assert_eq!(state.scan_cursor().unwrap(), second.id + 1);

        queue.nack(first.id).unwrap();
        assert_eq!(state.scan_cursor().unwrap(), first.id);
        assert_eq!(queue.pop().unwrap(), Some(first.clone()));
        assert_eq!(state.scan_cursor().unwrap(), second.id);

        queue.ack(first.id).unwrap();
        queue.nack(second.id).unwrap();
        assert_eq!(state.scan_cursor().unwrap(), second.id);
        assert_eq!(queue.pop().unwrap(), Some(second.clone()));
        assert_eq!(state.scan_cursor().unwrap(), second.id + 1);
    }

    #[test]
    fn lazy_ack_many_removes_in_flight_and_ready_events() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_lazy(temp_dir.path());

        let first = queue.push(event("first")).unwrap();
        let second = queue.push(event("second")).unwrap();
        let third = queue.push(event("third")).unwrap();

        assert_eq!(queue.pop().unwrap(), Some(first.clone()));
        queue.ack_many([first.id, third.id]).unwrap();

        assert_eq!(queue.pop().unwrap(), Some(second));
        assert_eq!(queue.pop().unwrap(), None);
    }

    #[test]
    fn lazy_push_wakes_blocking_poll() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = Arc::new(open_lazy(temp_dir.path()));
        let polling_queue = Arc::clone(&queue);

        let handle = thread::spawn(move || polling_queue.poll().unwrap());
        // Give the poller time to find the queue empty and start waiting.
        thread::sleep(Duration::from_millis(20));
        let started = Instant::now();
        let pushed = queue.push(event("wake")).unwrap();

        assert_eq!(handle.join().unwrap(), Some(pushed));
        // NOTE(review): this bound sits close to the poll timeout and includes
        // a synced write, so it may be timing-sensitive on slow disks.
        assert!(
            started.elapsed() < Duration::from_millis(80),
            "poll should wake after push instead of waiting for the full timeout"
        );
    }

    #[test]
    fn recovered_events_keep_fifo_order() {
        let temp_dir = tempfile::tempdir().unwrap();
        let queue = open_eager(temp_dir.path());

        let first = queue.push(event("first")).unwrap();
        let second = queue.push(event("second")).unwrap();

        drop(queue);

        let queue = open_eager(temp_dir.path());

        assert_eq!(queue.pop().unwrap(), Some(first));
        assert_eq!(queue.pop().unwrap(), Some(second));
        assert_eq!(queue.pop().unwrap(), None);
    }
}
992}