use std::{
collections::BTreeSet,
convert::TryInto,
io,
path::Path,
sync::{Arc, Condvar, Mutex},
time::Duration,
};
use borsh::{BorshDeserialize, BorshSerialize};
use rocksdb::{
DBIteratorWithThreadMode, DBWithThreadMode, Direction, IteratorMode, MultiThreaded, Options,
WriteBatchWithTransaction, WriteOptions,
};
use thiserror::Error;
use crate::event_queue::{EventQueue, EventQueueError};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DurableEvent<T> {
pub id: u64,
pub event: T,
}
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
pub struct DurableEventQueueOptions {
pub lazy: bool,
}
impl DurableEventQueueOptions {
pub fn lazy() -> Self {
Self { lazy: true }
}
}
#[derive(Error, Debug)]
pub enum DurableEventQueueError {
#[error("event queue error: {0}")]
EventQueue(#[from] EventQueueError),
#[error("rocksdb error: {0}")]
RocksDb(#[from] rocksdb::Error),
#[error("durable event queue lock was poisoned")]
PoisonedLock,
#[error("failed to serialize durable event: {source}")]
Serialize { source: io::Error },
#[error("failed to deserialize durable event id {id}: {source}")]
Deserialize { id: u64, source: io::Error },
#[error("invalid durable event key length: expected 8 bytes, got {actual}")]
InvalidKeyLength { actual: usize },
#[error("event id space exhausted (u64 overflow)")]
IdOverflow,
#[error("no durable event with id {id}; it may have already been acked")]
UnknownEvent { id: u64 },
}
pub struct DurableEventQueue<T> {
db: Arc<DBWithThreadMode<MultiThreaded>>,
mode: DurableEventQueueMode<T>,
next_id: Mutex<u64>,
}
enum DurableEventQueueMode<T> {
Eager { queue: EventQueue<DurableEvent<T>> },
Lazy { state: LazyQueueState },
}
struct LazyQueueState {
in_flight: Mutex<BTreeSet<u64>>,
next_scan_id: Mutex<u64>,
generation: Mutex<u64>,
condvar: Condvar,
}
impl Default for LazyQueueState {
fn default() -> Self {
Self::new(0)
}
}
impl LazyQueueState {
fn new(next_scan_id: u64) -> Self {
Self {
in_flight: Mutex::new(BTreeSet::new()),
next_scan_id: Mutex::new(next_scan_id),
generation: Mutex::new(0),
condvar: Condvar::new(),
}
}
fn scan_cursor(&self) -> Result<u64, DurableEventQueueError> {
Ok(*self
.next_scan_id
.lock()
.map_err(|_| DurableEventQueueError::PoisonedLock)?)
}
fn set_scan_cursor(&self, next_scan_id: u64) -> Result<(), DurableEventQueueError> {
*self
.next_scan_id
.lock()
.map_err(|_| DurableEventQueueError::PoisonedLock)? = next_scan_id;
Ok(())
}
fn rewind_scan_cursor(&self, id: u64) -> Result<(), DurableEventQueueError> {
let mut next_scan_id = self
.next_scan_id
.lock()
.map_err(|_| DurableEventQueueError::PoisonedLock)?;
*next_scan_id = (*next_scan_id).min(id);
Ok(())
}
fn notify(&self) -> Result<(), DurableEventQueueError> {
let mut generation = self
.generation
.lock()
.map_err(|_| DurableEventQueueError::PoisonedLock)?;
*generation = generation.wrapping_add(1);
self.condvar.notify_one();
Ok(())
}
fn generation(&self) -> Result<u64, DurableEventQueueError> {
Ok(*self
.generation
.lock()
.map_err(|_| DurableEventQueueError::PoisonedLock)?)
}
fn wait_for_generation_change(
&self,
observed_generation: u64,
) -> Result<(), DurableEventQueueError> {
let generation = self
.generation
.lock()
.map_err(|_| DurableEventQueueError::PoisonedLock)?;
let (_generation, _) = self
.condvar
.wait_timeout_while(generation, Duration::from_millis(100), |generation| {
*generation == observed_generation
})
.map_err(|_| DurableEventQueueError::PoisonedLock)?;
Ok(())
}
}
pub struct DurableEventQueueIterator<'a, T> {
inner: DBIteratorWithThreadMode<'a, DBWithThreadMode<MultiThreaded>>,
_event: std::marker::PhantomData<T>,
}
impl<T> DurableEventQueue<T>
where
T: BorshSerialize + BorshDeserialize + Clone,
{
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self, DurableEventQueueError> {
Self::open_with_options(path, DurableEventQueueOptions::default())
}
pub fn open_with_options<P: AsRef<Path>>(
path: P,
options: DurableEventQueueOptions,
) -> Result<Self, DurableEventQueueError> {
let mut opts = Options::default();
opts.create_if_missing(true);
let db = Arc::new(DBWithThreadMode::<MultiThreaded>::open(&opts, path)?);
Self::load_from_db(db, options)
}
fn load_from_db(
db: Arc<DBWithThreadMode<MultiThreaded>>,
options: DurableEventQueueOptions,
) -> Result<Self, DurableEventQueueError> {
if options.lazy {
return Self::load_lazy_from_db(db);
}
Self::load_eager_from_db(db)
}
fn load_eager_from_db(
db: Arc<DBWithThreadMode<MultiThreaded>>,
) -> Result<Self, DurableEventQueueError> {
let queue = EventQueue::new();
let mut next_id = 0;
for item in db.iterator(IteratorMode::Start) {
let (key, value) = item?;
let id = decode_key(key.as_ref())?;
let event = T::try_from_slice(value.as_ref())
.map_err(|source| DurableEventQueueError::Deserialize { id, source })?;
queue.push(DurableEvent { id, event })?;
next_id = next_id.max(id.saturating_add(1));
}
Ok(Self {
db,
mode: DurableEventQueueMode::Eager { queue },
next_id: Mutex::new(next_id),
})
}
fn load_lazy_from_db(
db: Arc<DBWithThreadMode<MultiThreaded>>,
) -> Result<Self, DurableEventQueueError> {
let mut first_id = None;
let mut next_id = 0;
for item in db.iterator(IteratorMode::Start) {
let (key, _) = item?;
let id = decode_key(key.as_ref())?;
first_id.get_or_insert(id);
next_id = next_id.max(id.saturating_add(1));
}
Ok(Self {
db,
mode: DurableEventQueueMode::Lazy {
state: LazyQueueState::new(first_id.unwrap_or(next_id)),
},
next_id: Mutex::new(next_id),
})
}
pub fn push(&self, event: T) -> Result<DurableEvent<T>, DurableEventQueueError> {
let mut next_id = self
.next_id
.lock()
.map_err(|_| DurableEventQueueError::PoisonedLock)?;
let id = *next_id;
let next = id
.checked_add(1)
.ok_or(DurableEventQueueError::IdOverflow)?;
let durable_event = DurableEvent { id, event };
let value = borsh::to_vec(&durable_event.event)
.map_err(|source| DurableEventQueueError::Serialize { source })?;
self.db.put_opt(encode_key(id), value, &sync_writes())?;
*next_id = next;
match &self.mode {
DurableEventQueueMode::Eager { queue } => queue.push(durable_event.clone())?,
DurableEventQueueMode::Lazy { state } => state.notify()?,
}
Ok(durable_event)
}
pub fn get(&self, id: u64) -> Result<Option<DurableEvent<T>>, DurableEventQueueError> {
let Some(raw) = self.db.get(encode_key(id))? else {
return Ok(None);
};
let event = T::try_from_slice(&raw)
.map_err(|source| DurableEventQueueError::Deserialize { id, source })?;
Ok(Some(DurableEvent { id, event }))
}
pub fn poll(&self) -> Result<Option<DurableEvent<T>>, DurableEventQueueError> {
match &self.mode {
DurableEventQueueMode::Eager { queue } => Ok(queue.poll()?),
DurableEventQueueMode::Lazy { state } => {
let observed_generation = state.generation()?;
if let Some(event) = self.pop_lazy(state)? {
return Ok(Some(event));
}
state.wait_for_generation_change(observed_generation)?;
self.pop_lazy(state)
}
}
}
pub fn pop(&self) -> Result<Option<DurableEvent<T>>, DurableEventQueueError> {
match &self.mode {
DurableEventQueueMode::Eager { queue } => Ok(queue.pop()?),
DurableEventQueueMode::Lazy { state } => self.pop_lazy(state),
}
}
fn pop_lazy(
&self,
state: &LazyQueueState,
) -> Result<Option<DurableEvent<T>>, DurableEventQueueError> {
let mut in_flight = state
.in_flight
.lock()
.map_err(|_| DurableEventQueueError::PoisonedLock)?;
let start_scan_id = state.scan_cursor()?;
let start_key = encode_key(start_scan_id);
let mut next_scan_id = start_scan_id;
for item in self
.db
.iterator(IteratorMode::From(&start_key, Direction::Forward))
{
let (key, value) = item?;
let id = decode_key(key.as_ref())?;
next_scan_id = id.saturating_add(1);
if in_flight.contains(&id) {
continue;
}
let event = T::try_from_slice(value.as_ref())
.map_err(|source| DurableEventQueueError::Deserialize { id, source })?;
in_flight.insert(id);
state.set_scan_cursor(next_scan_id)?;
return Ok(Some(DurableEvent { id, event }));
}
state.set_scan_cursor(next_scan_id)?;
Ok(None)
}
pub fn ack(&self, id: u64) -> Result<(), DurableEventQueueError> {
self.db.delete_opt(encode_key(id), &sync_writes())?;
if let DurableEventQueueMode::Lazy { state } = &self.mode {
state
.in_flight
.lock()
.map_err(|_| DurableEventQueueError::PoisonedLock)?
.remove(&id);
}
Ok(())
}
pub fn ack_many<I>(&self, ids: I) -> Result<(), DurableEventQueueError>
where
I: IntoIterator<Item = u64>,
{
let ids = ids.into_iter().collect::<Vec<_>>();
let mut batch = WriteBatchWithTransaction::<false>::default();
for id in &ids {
batch.delete(encode_key(*id));
}
self.db.write_opt(batch, &sync_writes())?;
if let DurableEventQueueMode::Lazy { state } = &self.mode {
let mut in_flight = state
.in_flight
.lock()
.map_err(|_| DurableEventQueueError::PoisonedLock)?;
for id in ids {
in_flight.remove(&id);
}
}
Ok(())
}
pub fn nack(&self, id: u64) -> Result<(), DurableEventQueueError> {
let raw = self.db.get(encode_key(id))?;
match &self.mode {
DurableEventQueueMode::Eager { queue } => {
let raw = raw.ok_or(DurableEventQueueError::UnknownEvent { id })?;
let event = T::try_from_slice(&raw)
.map_err(|source| DurableEventQueueError::Deserialize { id, source })?;
queue.push_front(DurableEvent { id, event })?;
}
DurableEventQueueMode::Lazy { state } => {
if raw.is_none() {
return Err(DurableEventQueueError::UnknownEvent { id });
}
{
let mut in_flight = state
.in_flight
.lock()
.map_err(|_| DurableEventQueueError::PoisonedLock)?;
in_flight.remove(&id);
state.rewind_scan_cursor(id)?;
}
state.notify()?;
}
}
Ok(())
}
pub fn len(&self) -> Result<usize, DurableEventQueueError> {
let mut count = 0;
for item in self.db.iterator(IteratorMode::Start) {
item?;
count += 1;
}
Ok(count)
}
pub fn is_empty(&self) -> Result<bool, DurableEventQueueError> {
let mut iter = self.db.iterator(IteratorMode::Start);
match iter.next() {
Some(Ok(_)) => Ok(false),
Some(Err(err)) => Err(DurableEventQueueError::RocksDb(err)),
None => Ok(true),
}
}
pub fn ready_len(&self) -> Result<usize, DurableEventQueueError> {
match &self.mode {
DurableEventQueueMode::Eager { queue } => Ok(queue.len()?),
DurableEventQueueMode::Lazy { state } => {
let in_flight = state
.in_flight
.lock()
.map_err(|_| DurableEventQueueError::PoisonedLock)?;
let mut count = 0;
for item in self.db.iterator(IteratorMode::Start) {
let (key, _) = item?;
let id = decode_key(key.as_ref())?;
if !in_flight.contains(&id) {
count += 1;
}
}
Ok(count)
}
}
}
pub fn iterator(&self) -> DurableEventQueueIterator<'_, T> {
DurableEventQueueIterator {
inner: self.db.iterator(IteratorMode::Start),
_event: std::marker::PhantomData,
}
}
}
impl<'a, T> Iterator for DurableEventQueueIterator<'a, T>
where
T: BorshDeserialize,
{
type Item = Result<DurableEvent<T>, DurableEventQueueError>;
fn next(&mut self) -> Option<Self::Item> {
let (key, value) = match self.inner.next()? {
Ok(kv) => kv,
Err(err) => return Some(Err(DurableEventQueueError::RocksDb(err))),
};
let id = match decode_key(key.as_ref()) {
Ok(id) => id,
Err(err) => return Some(Err(err)),
};
let event = match T::try_from_slice(value.as_ref()) {
Ok(event) => event,
Err(source) => {
return Some(Err(DurableEventQueueError::Deserialize { id, source }));
}
};
Some(Ok(DurableEvent { id, event }))
}
}
fn encode_key(id: u64) -> [u8; 8] {
id.to_be_bytes()
}
fn decode_key(key: &[u8]) -> Result<u64, DurableEventQueueError> {
let bytes: [u8; 8] = key
.try_into()
.map_err(|_| DurableEventQueueError::InvalidKeyLength { actual: key.len() })?;
Ok(u64::from_be_bytes(bytes))
}
fn sync_writes() -> WriteOptions {
let mut opts = WriteOptions::default();
opts.set_sync(true);
opts
}
#[cfg(test)]
mod tests {
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
thread,
time::{Duration, Instant},
};
use super::*;
#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize, BorshDeserialize)]
struct TestEvent {
value: String,
}
static COUNTING_EVENT_DESERIALIZE_COUNT: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug, Clone, PartialEq, Eq, BorshSerialize)]
struct CountingEvent {
value: String,
}
impl BorshDeserialize for CountingEvent {
fn deserialize_reader<R: std::io::Read>(reader: &mut R) -> Result<Self, borsh::io::Error> {
COUNTING_EVENT_DESERIALIZE_COUNT.fetch_add(1, Ordering::SeqCst);
Ok(Self {
value: String::deserialize_reader(reader)?,
})
}
}
#[test]
fn push_persists_event_until_ack() {
let temp_dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
let pushed = queue
.push(TestEvent {
value: "first".to_string(),
})
.unwrap();
assert_eq!(pushed.id, 0);
assert_eq!(queue.len().unwrap(), 1);
assert_eq!(queue.poll().unwrap(), Some(pushed.clone()));
assert_eq!(queue.ready_len().unwrap(), 0);
assert_eq!(queue.len().unwrap(), 1);
drop(queue);
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
assert_eq!(queue.poll().unwrap(), Some(pushed.clone()));
queue.ack(pushed.id).unwrap();
drop(queue);
let queue = DurableEventQueue::<TestEvent>::open(temp_dir.path()).unwrap();
assert!(queue.is_empty().unwrap());
}
#[test]
fn get_returns_event_by_id_without_polling() {
let temp_dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
let first = queue
.push(TestEvent {
value: "first".to_string(),
})
.unwrap();
let second = queue
.push(TestEvent {
value: "second".to_string(),
})
.unwrap();
assert_eq!(queue.get(second.id).unwrap(), Some(second.clone()));
assert_eq!(queue.ready_len().unwrap(), 2);
assert_eq!(queue.poll().unwrap(), Some(first));
assert_eq!(queue.poll().unwrap(), Some(second));
}
#[test]
fn get_returns_none_after_ack() {
let temp_dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
let pushed = queue
.push(TestEvent {
value: "first".to_string(),
})
.unwrap();
assert_eq!(queue.get(pushed.id).unwrap(), Some(pushed.clone()));
queue.ack(pushed.id).unwrap();
assert_eq!(queue.get(pushed.id).unwrap(), None);
}
#[test]
fn push_at_id_max_overflows() {
let temp_dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
*queue.next_id.lock().unwrap() = u64::MAX;
let err = queue
.push(TestEvent {
value: "boom".to_string(),
})
.unwrap_err();
assert!(
matches!(err, DurableEventQueueError::IdOverflow),
"expected IdOverflow, got {err}"
);
assert!(queue.is_empty().unwrap(), "no event should be written");
}
#[test]
fn nack_requeues_event_for_reprocessing() {
let temp_dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
let pushed = queue
.push(TestEvent {
value: "retry me".to_string(),
})
.unwrap();
let taken = queue.pop().unwrap().unwrap();
assert_eq!(taken, pushed);
assert_eq!(queue.ready_len().unwrap(), 0);
queue.nack(pushed.id).unwrap();
assert_eq!(queue.ready_len().unwrap(), 1);
let retried = queue.pop().unwrap().unwrap();
assert_eq!(retried, pushed);
assert_eq!(queue.len().unwrap(), 1);
queue.ack(pushed.id).unwrap();
assert!(queue.is_empty().unwrap());
}
#[test]
fn nack_requeues_event_before_ready_events() {
let temp_dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
let first = queue
.push(TestEvent {
value: "first".to_string(),
})
.unwrap();
let second = queue
.push(TestEvent {
value: "second".to_string(),
})
.unwrap();
assert_eq!(queue.pop().unwrap().unwrap(), first);
queue.nack(first.id).unwrap();
assert_eq!(queue.pop().unwrap().unwrap(), first);
assert_eq!(queue.pop().unwrap().unwrap(), second);
}
#[test]
fn nack_unknown_id_returns_error() {
let temp_dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::<TestEvent>::open(temp_dir.path()).unwrap();
let err = queue.nack(99).unwrap_err();
assert!(
matches!(err, DurableEventQueueError::UnknownEvent { id: 99 }),
"expected UnknownEvent(99), got {err}"
);
}
#[test]
fn ack_many_removes_multiple_events() {
let temp_dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
let first = queue
.push(TestEvent {
value: "first".to_string(),
})
.unwrap();
let second = queue
.push(TestEvent {
value: "second".to_string(),
})
.unwrap();
let third = queue
.push(TestEvent {
value: "third".to_string(),
})
.unwrap();
queue.ack_many([first.id, third.id]).unwrap();
drop(queue);
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
assert_eq!(queue.pop().unwrap(), Some(second));
assert_eq!(queue.pop().unwrap(), None);
}
#[test]
fn ack_many_is_idempotent_for_unknown_ids() {
let temp_dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
let pushed = queue
.push(TestEvent {
value: "first".to_string(),
})
.unwrap();
queue.ack_many([pushed.id, 99, pushed.id]).unwrap();
queue.ack_many([pushed.id, 99]).unwrap();
drop(queue);
let queue = DurableEventQueue::<TestEvent>::open(temp_dir.path()).unwrap();
assert!(queue.is_empty().unwrap());
}
#[test]
fn lazy_recovered_events_keep_fifo_order() {
let temp_dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
let first = queue
.push(TestEvent {
value: "first".to_string(),
})
.unwrap();
let second = queue
.push(TestEvent {
value: "second".to_string(),
})
.unwrap();
drop(queue);
let queue =
DurableEventQueue::open_with_options(temp_dir.path(), DurableEventQueueOptions::lazy())
.unwrap();
assert_eq!(queue.pop().unwrap(), Some(first));
assert_eq!(queue.pop().unwrap(), Some(second));
assert_eq!(queue.pop().unwrap(), None);
}
#[test]
fn lazy_open_does_not_deserialize_recovered_values_until_poll() {
let temp_dir = tempfile::tempdir().unwrap();
let event = CountingEvent {
value: "large payload".to_string(),
};
{
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
queue.push(event.clone()).unwrap();
}
COUNTING_EVENT_DESERIALIZE_COUNT.store(0, Ordering::SeqCst);
let queue = DurableEventQueue::<CountingEvent>::open_with_options(
temp_dir.path(),
DurableEventQueueOptions::lazy(),
)
.unwrap();
assert_eq!(COUNTING_EVENT_DESERIALIZE_COUNT.load(Ordering::SeqCst), 0);
assert_eq!(queue.ready_len().unwrap(), 1);
assert_eq!(COUNTING_EVENT_DESERIALIZE_COUNT.load(Ordering::SeqCst), 0);
assert_eq!(queue.pop().unwrap().unwrap().event, event);
assert_eq!(COUNTING_EVENT_DESERIALIZE_COUNT.load(Ordering::SeqCst), 1);
}
#[test]
fn lazy_poll_skips_in_flight_events_without_duplicating() {
let temp_dir = tempfile::tempdir().unwrap();
let queue =
DurableEventQueue::open_with_options(temp_dir.path(), DurableEventQueueOptions::lazy())
.unwrap();
let first = queue
.push(TestEvent {
value: "first".to_string(),
})
.unwrap();
let second = queue
.push(TestEvent {
value: "second".to_string(),
})
.unwrap();
assert_eq!(queue.pop().unwrap(), Some(first.clone()));
assert_eq!(queue.pop().unwrap(), Some(second.clone()));
assert_eq!(queue.ready_len().unwrap(), 0);
queue.nack(first.id).unwrap();
assert_eq!(queue.pop().unwrap(), Some(first));
assert_eq!(queue.pop().unwrap(), None);
queue.nack(second.id).unwrap();
assert_eq!(queue.pop().unwrap(), Some(second));
}
#[test]
fn lazy_nack_makes_event_available_before_later_ready_events() {
let temp_dir = tempfile::tempdir().unwrap();
let queue =
DurableEventQueue::open_with_options(temp_dir.path(), DurableEventQueueOptions::lazy())
.unwrap();
let first = queue
.push(TestEvent {
value: "first".to_string(),
})
.unwrap();
let second = queue
.push(TestEvent {
value: "second".to_string(),
})
.unwrap();
assert_eq!(queue.pop().unwrap(), Some(first.clone()));
queue.nack(first.id).unwrap();
assert_eq!(queue.pop().unwrap(), Some(first));
assert_eq!(queue.pop().unwrap(), Some(second));
}
#[test]
fn lazy_scan_cursor_advances_and_rewinds_on_nack() {
let temp_dir = tempfile::tempdir().unwrap();
let queue =
DurableEventQueue::open_with_options(temp_dir.path(), DurableEventQueueOptions::lazy())
.unwrap();
let first = queue
.push(TestEvent {
value: "first".to_string(),
})
.unwrap();
let second = queue
.push(TestEvent {
value: "second".to_string(),
})
.unwrap();
let state = match &queue.mode {
DurableEventQueueMode::Lazy { state } => state,
DurableEventQueueMode::Eager { .. } => panic!("expected lazy queue"),
};
assert_eq!(state.scan_cursor().unwrap(), first.id);
assert_eq!(queue.pop().unwrap(), Some(first.clone()));
assert_eq!(state.scan_cursor().unwrap(), second.id);
assert_eq!(queue.pop().unwrap(), Some(second.clone()));
assert_eq!(state.scan_cursor().unwrap(), second.id + 1);
queue.nack(first.id).unwrap();
assert_eq!(state.scan_cursor().unwrap(), first.id);
assert_eq!(queue.pop().unwrap(), Some(first.clone()));
assert_eq!(state.scan_cursor().unwrap(), second.id);
queue.ack(first.id).unwrap();
queue.nack(second.id).unwrap();
assert_eq!(state.scan_cursor().unwrap(), second.id);
assert_eq!(queue.pop().unwrap(), Some(second.clone()));
assert_eq!(state.scan_cursor().unwrap(), second.id + 1);
}
#[test]
fn lazy_ack_many_removes_in_flight_and_ready_events() {
let temp_dir = tempfile::tempdir().unwrap();
let queue =
DurableEventQueue::open_with_options(temp_dir.path(), DurableEventQueueOptions::lazy())
.unwrap();
let first = queue
.push(TestEvent {
value: "first".to_string(),
})
.unwrap();
let second = queue
.push(TestEvent {
value: "second".to_string(),
})
.unwrap();
let third = queue
.push(TestEvent {
value: "third".to_string(),
})
.unwrap();
assert_eq!(queue.pop().unwrap(), Some(first.clone()));
queue.ack_many([first.id, third.id]).unwrap();
assert_eq!(queue.pop().unwrap(), Some(second));
assert_eq!(queue.pop().unwrap(), None);
}
#[test]
fn lazy_push_wakes_blocking_poll() {
let temp_dir = tempfile::tempdir().unwrap();
let queue = Arc::new(
DurableEventQueue::open_with_options(temp_dir.path(), DurableEventQueueOptions::lazy())
.unwrap(),
);
let polling_queue = Arc::clone(&queue);
let handle = thread::spawn(move || polling_queue.poll().unwrap());
thread::sleep(Duration::from_millis(20));
let started = Instant::now();
let pushed = queue
.push(TestEvent {
value: "wake".to_string(),
})
.unwrap();
assert_eq!(handle.join().unwrap(), Some(pushed));
assert!(
started.elapsed() < Duration::from_millis(80),
"poll should wake after push instead of waiting for the full timeout"
);
}
#[test]
fn recovered_events_keep_fifo_order() {
let temp_dir = tempfile::tempdir().unwrap();
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
let first = queue
.push(TestEvent {
value: "first".to_string(),
})
.unwrap();
let second = queue
.push(TestEvent {
value: "second".to_string(),
})
.unwrap();
drop(queue);
let queue = DurableEventQueue::open(temp_dir.path()).unwrap();
assert_eq!(queue.pop().unwrap(), Some(first));
assert_eq!(queue.pop().unwrap(), Some(second));
assert_eq!(queue.pop().unwrap(), None);
}
}