use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use crate::runtime::queue_telemetry::{NackOutcomeLabel, QueueTelemetryCounters};
use crate::storage::queue::lifecycle::{
DeliveryId, DlqTarget, MessageId, PendingDeliveryView, QueueSide, QueueStore, QueueStoreError,
QueueTxn, Result,
};
use crate::storage::queue::mode::QueueMode;
use crate::storage::schema::Value;
use crate::telemetry::operator_event::OperatorEvent;
/// Time source used by the lifecycle when computing and checking lock deadlines.
///
/// Abstracted behind a trait so a controllable clock can be injected through
/// `QueueLifecycle::with_clock`.
pub(crate) trait Clock: Send + Sync {
/// Returns the instant the caller should treat as "now".
fn now(&self) -> Instant;
}
/// `Clock` implementation backed by the process's real monotonic clock.
pub(crate) struct SystemClock;
impl Clock for SystemClock {
/// Returns `Instant::now()`.
fn now(&self) -> Instant {
Instant::now()
}
}
/// Settings that control how a `QueueLifecycle` locks, retries and retires deliveries.
#[derive(Debug, Clone)]
pub(crate) struct LifecycleConfig {
/// Added to "now" to form the lock deadline when a message is delivered or re-claimed.
pub(crate) lock_duration: Duration,
/// Destination for deliveries that exhaust their attempts; `None` means they are
/// retired without being copied anywhere.
pub(crate) dlq_target: Option<DlqTarget>,
/// Selects which store calls are used for availability and retirement.
pub(crate) mode: QueueMode,
}
impl Default for LifecycleConfig {
    /// Builds the baseline configuration: a 30 second lock, no dead-letter
    /// target, and `QueueMode::Work`.
    fn default() -> Self {
        let lock_duration = Duration::from_secs(30);
        let mode = QueueMode::Work;
        Self {
            lock_duration,
            dlq_target: None,
            mode,
        }
    }
}
/// What happened to a delivery after a nack or an attempt-exhausting claim.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum RetirementOutcome {
/// The pending row was released so the message can be delivered again.
Requeued,
/// The delivery was retired and its payload enqueued on the given DLQ target.
MovedToDlq(DlqTarget),
/// The delivery was retired with no DLQ target configured.
Dropped,
}
/// One message handed out by `QueueLifecycle::deliver`.
#[derive(Debug, Clone)]
pub(crate) struct Delivery {
/// Identifier returned by the store when the message was marked pending; used to ack or nack.
pub(crate) delivery_id: DeliveryId,
/// The message payload as read from the store.
pub(crate) payload: Value,
}
/// Read-only snapshot of an available message, as produced by `QueueLifecycle::peek`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct QueueMessageView {
/// Identifier of the message within its queue.
pub(crate) message_id: MessageId,
/// The message payload as read from the store.
pub(crate) payload: Value,
/// Attempt limit the store reports for this message.
pub(crate) max_attempts: u32,
}
/// Drives message delivery, acknowledgement, retry and retirement on top of a `QueueStore`.
pub(crate) struct QueueLifecycle<S: QueueStore> {
/// Backing store that holds messages, pending rows and DLQ entries.
store: S,
/// Lock duration, DLQ target and queue mode applied to every operation.
config: LifecycleConfig,
/// Time source used for lock deadlines and expiry checks.
clock: Arc<dyn Clock>,
/// Append-only log of retirement outcomes written by `record`.
outcomes: Mutex<Vec<RetirementOutcome>>,
/// Optional counters notified on deliver and nack; `None` disables telemetry.
telemetry: Option<Arc<QueueTelemetryCounters>>,
}
impl<S: QueueStore> QueueLifecycle<S> {
/// Creates a lifecycle over `store` with `config`, using the real `SystemClock`.
pub(crate) fn new(store: S, config: LifecycleConfig) -> Self {
Self::with_clock(store, config, Arc::new(SystemClock))
}
/// Creates a lifecycle over `store` with `config` and an explicit time source.
///
/// The outcome log starts empty and telemetry is disabled until `with_telemetry` is called.
pub(crate) fn with_clock(
store: S,
config: LifecycleConfig,
clock: Arc<dyn Clock>,
) -> Self {
Self {
store,
config,
clock,
outcomes: Mutex::new(Vec::new()),
telemetry: None,
}
}
/// Attaches telemetry counters and returns the lifecycle, builder style.
// NOTE(review): `dead_code` is allowed, presumably because not every build wires telemetry in — confirm.
#[allow(dead_code)]
pub(crate) fn with_telemetry(mut self, telemetry: Arc<QueueTelemetryCounters>) -> Self {
self.telemetry = Some(telemetry);
self
}
/// Hands out up to `count` messages from `queue` to `group`.
///
/// Expired locks on the queue are reclaimed first. Candidate messages come from
/// `available_messages` in `Work` mode and `available_messages_for_group` in
/// `Fanout` mode, always from the left side. Each delivered message is marked
/// pending with a deadline of `now + lock_duration` before its payload is read.
///
/// A `count` of zero returns immediately without touching the store or the clock.
/// When at least one message was delivered and telemetry is attached, the batch
/// size is reported through `record_delivered`.
///
/// # Errors
/// Propagates store errors from the reclaim sweep and from `mark_pending`.
/// Returns `QueueStoreError::UnknownQueue` when a candidate's payload cannot be read.
// NOTE(review): a missing payload is reported as `UnknownQueue` after the message
// has already been marked pending — confirm that variant and ordering are intended.
pub(crate) fn deliver(
    &self,
    txn: &QueueTxn,
    queue: &str,
    group: &str,
    count: usize,
) -> Result<Vec<Delivery>> {
    if count == 0 {
        return Ok(Vec::new());
    }

    let now = self.clock.now();
    tracing::debug!(
        target: "reddb::queue_lifecycle",
        queue = queue,
        group = group,
        "queue lock reclaim sweep"
    );
    self.store.reclaim_expired(txn, queue, now)?;

    let candidates = match self.config.mode {
        QueueMode::Fanout => self
            .store
            .available_messages_for_group(queue, group, QueueSide::Left),
        QueueMode::Work => self.store.available_messages(queue, QueueSide::Left),
    };

    // The iterator is lazy and `collect` stops at the first `Err`, so store calls
    // happen in the same order as a plain loop with early return.
    let deliveries = candidates
        .into_iter()
        .take(count)
        .map(|message_id| -> Result<Delivery> {
            let lock_until = now + self.config.lock_duration;
            let delivery_id = self
                .store
                .mark_pending(txn, queue, message_id, group, lock_until)?;
            match self.store.read_message(queue, message_id) {
                Some(payload) => Ok(Delivery {
                    delivery_id,
                    payload,
                }),
                None => Err(QueueStoreError::UnknownQueue(queue.to_string())),
            }
        })
        .collect::<Result<Vec<Delivery>>>()?;

    if let Some(telemetry) = self.telemetry.as_ref() {
        let handed_out = deliveries.len();
        if handed_out != 0 {
            telemetry.record_delivered(
                queue,
                group,
                self.config.mode.as_str(),
                handed_out as u64,
            );
        }
    }
    Ok(deliveries)
}
/// Acknowledges `delivery_id`, retiring it according to the configured queue mode.
///
/// # Errors
/// Propagates whatever the store's retirement call returns.
pub(crate) fn ack(&self, txn: &QueueTxn, delivery_id: &str) -> Result<()> {
self.retire(txn, delivery_id)
}
/// Retires `delivery_id` through the store call that matches the queue mode:
/// `ack_pending` for `Work`, `retire_for_group` for `Fanout`.
fn retire(&self, txn: &QueueTxn, delivery_id: &str) -> Result<()> {
match self.config.mode {
QueueMode::Work => self.store.ack_pending(txn, delivery_id),
QueueMode::Fanout => self.store.retire_for_group(txn, delivery_id),
}
}
/// Negatively acknowledges `delivery_id`: bumps its attempt counter and then
/// either requeues or retires it.
///
/// After the bump, the message's `max_attempts` is read from the store:
/// * attempts below the limit — the pending row is released (`Requeued`);
/// * limit reached with a DLQ target configured — the pending payload is read,
///   the delivery is retired, the payload is enqueued on the DLQ (`MovedToDlq`)
///   and a `QueueDlqPromoted` operator event is emitted;
/// * limit reached with no DLQ target — the delivery is retired (`Dropped`).
///
/// The outcome is appended to the outcome log, and when telemetry is attached it
/// is reported through `record_nacked`.
///
/// # Errors
/// Propagates store errors. Returns `QueueStoreError::UnknownDelivery` when the
/// pending payload needed for DLQ promotion cannot be read.
pub(crate) fn nack(&self, txn: &QueueTxn, delivery_id: &str) -> Result<()> {
    let bumped = self.store.bump_attempt(txn, delivery_id)?;
    let max_attempts = self
        .store
        .read_max_attempts(&bumped.queue, bumped.message_id);
    let attempts = bumped.attempts;

    // The telemetry label is decided in the same branch that decides the outcome.
    // Re-reading the tail of the shared outcome log instead would race with other
    // threads appending to it between `record` and the read.
    let label = if attempts >= max_attempts {
        match &self.config.dlq_target {
            Some(target) => {
                // Read the payload before retiring: retirement removes the pending row.
                let payload = self
                    .store
                    .read_pending_payload(delivery_id)
                    .ok_or_else(|| QueueStoreError::UnknownDelivery(delivery_id.to_string()))?;
                self.retire(txn, delivery_id)?;
                self.store.enqueue_dlq(txn, target, payload)?;
                self.record(RetirementOutcome::MovedToDlq(target.clone()));
                // NOTE(review): queue, group and message_id are emitted blank even
                // though `bumped` carries the queue and message id — confirm whether
                // these placeholders are intentional.
                OperatorEvent::QueueDlqPromoted {
                    queue: String::new(),
                    group: String::new(),
                    dlq: target.clone(),
                    message_id: 0,
                    attempts,
                    reason: format!("lifecycle_nack:{delivery_id}"),
                }
                .emit_global();
                NackOutcomeLabel::Dlq
            }
            None => {
                self.retire(txn, delivery_id)?;
                self.record(RetirementOutcome::Dropped);
                NackOutcomeLabel::Drop
            }
        }
    } else {
        self.store.release_pending(txn, delivery_id)?;
        self.record(RetirementOutcome::Requeued);
        NackOutcomeLabel::Retry
    };

    if let Some(telemetry) = self.telemetry.as_ref() {
        telemetry.record_nacked("", "", self.config.mode.as_str(), label);
    }
    Ok(())
}
/// Returns up to `count` available messages from the left side of `queue`
/// without marking anything pending.
///
/// Messages whose payload cannot be read are skipped. The `count` limit is applied
/// after that skip, so an unreadable message does not shrink the result while
/// further readable messages are available. Each view also carries the attempt
/// limit the store reports for the message.
///
/// A `count` of zero returns an empty vector without consulting the store.
/// `_txn` is accepted but not used.
pub(crate) fn peek(
    &self,
    queue: &str,
    count: usize,
    _txn: &QueueTxn,
) -> Vec<QueueMessageView> {
    if count == 0 {
        return Vec::new();
    }
    self.store
        .available_messages(queue, QueueSide::Left)
        .into_iter()
        .filter_map(|message_id| {
            let payload = self.store.read_message(queue, message_id)?;
            Some(QueueMessageView {
                message_id,
                payload,
                max_attempts: self.store.read_max_attempts(queue, message_id),
            })
        })
        // Limit after filtering; the chain is lazy, so no more messages are read
        // than needed to fill `count` views.
        .take(count)
        .collect()
}
/// Returns the payload of the pending delivery `delivery_id`, or `None` when the
/// store has no pending payload for it. `_txn` is accepted but not used.
pub(crate) fn read(&self, delivery_id: &str, _txn: &QueueTxn) -> Option<Value> {
self.store.read_pending_payload(delivery_id)
}
/// Re-claims pending deliveries on `queue` whose lock deadline passed at least
/// `min_idle_ms` milliseconds ago, returning the ids that were re-claimed.
///
/// Candidates are processed in ascending deadline order. Each has its attempt
/// counter bumped. If the bumped count exceeds the message's `max_attempts`, the
/// delivery is retired — enqueued on the DLQ target when one is configured,
/// otherwise dropped — and left out of the result. Otherwise its lock is renewed
/// to `now + lock_duration`.
///
/// `_new_consumer` is accepted but not used.
///
/// # Errors
/// Propagates store errors. Returns `QueueStoreError::UnknownDelivery` when the
/// pending payload needed for DLQ promotion cannot be read.
pub(crate) fn claim(
    &self,
    queue: &str,
    _new_consumer: &str,
    min_idle_ms: u64,
    txn: &QueueTxn,
) -> Result<Vec<DeliveryId>> {
    let now = self.clock.now();
    let threshold = Duration::from_millis(min_idle_ms);
    let mut candidates: Vec<PendingDeliveryView> = self
        .store
        .pending_deliveries_for_queue(queue)
        .into_iter()
        .filter(|p| {
            // Same predicate as `deadline + threshold <= now`, but written as a
            // subtraction: `Instant + Duration` may panic when the sum is not
            // representable, and `min_idle_ms` is caller-supplied.
            now.checked_duration_since(p.deadline)
                .map_or(false, |overdue| overdue >= threshold)
        })
        .collect();
    candidates.sort_by_key(|p| p.deadline);

    let mut claimed = Vec::with_capacity(candidates.len());
    for entry in candidates {
        let bumped = self.store.bump_attempt(txn, &entry.delivery_id)?;
        let max_attempts = self
            .store
            .read_max_attempts(&bumped.queue, bumped.message_id);
        // Strictly greater: a delivery whose bumped count equals the limit is
        // still handed out one more time.
        if bumped.attempts > max_attempts {
            match &self.config.dlq_target {
                Some(target) => {
                    // Read the payload before retiring the pending row.
                    let payload = self
                        .store
                        .read_pending_payload(&entry.delivery_id)
                        .ok_or_else(|| {
                            QueueStoreError::UnknownDelivery(entry.delivery_id.clone())
                        })?;
                    self.retire(txn, &entry.delivery_id)?;
                    self.store.enqueue_dlq(txn, target, payload)?;
                    self.record(RetirementOutcome::MovedToDlq(target.clone()));
                }
                None => {
                    self.retire(txn, &entry.delivery_id)?;
                    self.record(RetirementOutcome::Dropped);
                }
            }
            continue;
        }
        let new_deadline = now + self.config.lock_duration;
        // NOTE(review): the id returned by `mark_pending` is discarded and the
        // original delivery id is reported — confirm the store keeps that id valid.
        self.store
            .mark_pending(txn, &entry.queue, entry.message_id, &entry.group, new_deadline)?;
        claimed.push(entry.delivery_id);
    }
    Ok(claimed)
}
/// Purges `queue` by delegating to the store's `purge_queue`, returning the count it reports.
///
/// # Errors
/// Propagates whatever `purge_queue` returns.
pub(crate) fn purge(&self, queue: &str, txn: &QueueTxn) -> Result<usize> {
self.store.purge_queue(txn, queue)
}
/// Appends `outcome` to the in-memory outcome log.
///
/// A poisoned lock is recovered instead of panicking. Callers invoke this after
/// the store has already been mutated, so a panic here would abort an operation
/// that has effectively completed. The guarded value is a plain `Vec`, which a
/// panic elsewhere cannot leave half-updated.
// NOTE(review): nothing in this file drains the log, so it grows with every
// recorded outcome — confirm that is acceptable outside tests.
fn record(&self, outcome: RetirementOutcome) {
    self.outcomes
        .lock()
        .unwrap_or_else(|poisoned| poisoned.into_inner())
        .push(outcome);
}
/// Test-only: returns a copy of every outcome recorded so far, in insertion order.
///
/// # Panics
/// Panics if the outcome lock is poisoned.
#[cfg(test)]
pub(crate) fn recorded_outcomes(&self) -> Vec<RetirementOutcome> {
self.outcomes.lock().expect("outcomes poisoned").clone()
}
/// Test-only: borrows the backing store so tests can inspect its state directly.
#[cfg(test)]
pub(crate) fn store_ref(&self) -> &S {
&self.store
}
}
/// Test-only clock whose reading changes only when `advance` is called.
#[cfg(test)]
pub(crate) struct TestClock {
/// The instant currently reported by `now`.
inner: Mutex<Instant>,
}
#[cfg(test)]
impl TestClock {
    /// Creates a clock frozen at the instant of construction.
    pub(crate) fn new() -> Self {
        let start = Instant::now();
        Self {
            inner: Mutex::new(start),
        }
    }

    /// Moves the clock forward by `by`.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    pub(crate) fn advance(&self, by: Duration) {
        let mut current = self.inner.lock().expect("clock poisoned");
        *current = *current + by;
    }
}
#[cfg(test)]
impl Clock for TestClock {
    /// Reports the instant currently stored in the clock.
    ///
    /// # Panics
    /// Panics if the internal lock is poisoned.
    fn now(&self) -> Instant {
        let current = self.inner.lock().expect("clock poisoned");
        *current
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::queue::lifecycle::{InMemoryQueueStore, TombstoneRecord};
/// Builds an in-memory store whose queue `"q"` holds `messages` in the given
/// order, each id paired with a text payload.
fn store_with(messages: &[(MessageId, &str)]) -> InMemoryQueueStore {
    let store = InMemoryQueueStore::new();
    let mut ids: Vec<MessageId> = Vec::with_capacity(messages.len());
    for &(id, _) in messages {
        ids.push(id);
    }
    store.seed_queue("q", ids);
    for &(id, payload) in messages {
        store.seed_payload("q", id, Value::text(payload));
    }
    store
}
/// Wraps `store` in a lifecycle using the default configuration and the system clock.
fn lifecycle(store: InMemoryQueueStore) -> QueueLifecycle<InMemoryQueueStore> {
QueueLifecycle::new(store, LifecycleConfig::default())
}
/// A single work-mode `deliver` of count 1 yields the first seeded message with a non-empty delivery id.
#[test]
fn work_deliver_returns_one_message_to_one_consumer() {
let lc = lifecycle(store_with(&[(1, "first"), (2, "second")]));
let deliveries = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver");
assert_eq!(deliveries.len(), 1);
assert_eq!(deliveries[0].payload, Value::text("first"));
assert!(!deliveries[0].delivery_id.is_empty());
}
/// Two consecutive work-mode deliveries return distinct delivery ids and distinct payloads.
#[test]
fn work_second_consumer_gets_a_different_message() {
let lc = lifecycle(store_with(&[(1, "first"), (2, "second")]));
let a = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver a");
let b = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver b");
assert_eq!(a.len(), 1);
assert_eq!(b.len(), 1);
assert_ne!(a[0].delivery_id, b[0].delivery_id);
assert_ne!(a[0].payload, b[0].payload);
assert_eq!(b[0].payload, Value::text("second"));
}
/// Once the only message is pending, a further `deliver` returns an empty batch.
#[test]
fn work_exhausted_queue_returns_empty() {
let lc = lifecycle(store_with(&[(1, "only")]));
let first = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("first");
assert_eq!(first.len(), 1);
let empty = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("empty");
assert!(empty.is_empty(), "exhausted queue should yield no deliveries");
}
/// Requesting zero messages succeeds and returns nothing.
#[test]
fn deliver_with_count_zero_is_noop() {
let lc = lifecycle(store_with(&[(1, "first")]));
let got = lc.deliver(&QueueTxn::new(),"q", "workers", 0).expect("zero count");
assert!(got.is_empty());
}
/// After a delivery is acked, the message is not delivered again.
#[test]
fn ack_retires_message_no_longer_redeliverable() {
let lc = lifecycle(store_with(&[(1, "first")]));
let delivered = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver");
let delivery_id = delivered[0].delivery_id.clone();
lc.ack(&QueueTxn::new(),&delivery_id).expect("ack");
let again = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("redeliver attempt");
assert!(again.is_empty(), "acked message must not redeliver");
}
/// Acking an id the store has never issued fails with `UnknownDelivery`.
#[test]
fn ack_unknown_delivery_id_errors() {
let store = InMemoryQueueStore::new();
let lc = QueueLifecycle::new(store, LifecycleConfig::default());
let err = lc.ack(&QueueTxn::new(),"does-not-exist").unwrap_err();
assert!(matches!(err, QueueStoreError::UnknownDelivery(_)));
}
/// Asking for more messages than exist returns all of them, in queue order, with distinct ids.
#[test]
fn deliver_count_larger_than_available_returns_all_available() {
let lc = lifecycle(store_with(&[(1, "a"), (2, "b")]));
let got = lc.deliver(&QueueTxn::new(),"q", "workers", 10).expect("deliver");
assert_eq!(got.len(), 2);
assert_eq!(got[0].payload, Value::text("a"));
assert_eq!(got[1].payload, Value::text("b"));
assert_ne!(got[0].delivery_id, got[1].delivery_id);
}
/// Delivering from a queue that was never seeded succeeds with an empty batch.
#[test]
fn deliver_on_unknown_queue_returns_empty() {
let store = InMemoryQueueStore::new();
let lc = QueueLifecycle::new(store, LifecycleConfig::default());
let got = lc.deliver(&QueueTxn::new(),"missing", "workers", 5).expect("deliver");
assert!(got.is_empty());
}
/// Seeds `max_attempts` for message 1 of queue `"q"` on `store` and returns an
/// otherwise-default config with the given optional DLQ target.
fn config_with(
store: &InMemoryQueueStore,
max_attempts: u32,
dlq: Option<&str>,
) -> LifecycleConfig {
store.seed_max_attempts("q", 1, max_attempts);
LifecycleConfig {
dlq_target: dlq.map(|s| s.to_string()),
..LifecycleConfig::default()
}
}
/// A nack below the attempt limit requeues the message: it is redelivered with
/// the same payload under a new delivery id, and `Requeued` is recorded.
#[test]
fn nack_below_max_requeues_same_message() {
let store = store_with(&[(1, "payload")]);
let cfg = config_with(&store, 3, None);
let lc = QueueLifecycle::new(store, cfg);
let first = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver-1");
assert_eq!(first[0].payload, Value::text("payload"));
lc.nack(&QueueTxn::new(),&first[0].delivery_id).expect("nack-1");
let second = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver-2");
assert_eq!(second.len(), 1, "requeued message must redeliver");
assert_eq!(second[0].payload, Value::text("payload"));
assert_ne!(second[0].delivery_id, first[0].delivery_id);
assert_eq!(lc.recorded_outcomes(), vec![RetirementOutcome::Requeued]);
}
/// With a limit of three and no DLQ, three nacks record two requeues then a drop,
/// after which nothing is deliverable.
#[test]
fn three_nacks_at_max_three_yield_two_requeues_then_retire() {
let store = store_with(&[(1, "payload")]);
let cfg = config_with(&store, 3, None);
let lc = QueueLifecycle::new(store, cfg);
for _ in 0..2 {
let d = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver");
lc.nack(&QueueTxn::new(),&d[0].delivery_id).expect("nack");
}
let third = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver-3");
lc.nack(&QueueTxn::new(),&third[0].delivery_id).expect("nack-3");
assert_eq!(
lc.recorded_outcomes(),
vec![
RetirementOutcome::Requeued,
RetirementOutcome::Requeued,
RetirementOutcome::Dropped,
]
);
assert!(lc.deliver(&QueueTxn::new(),"q", "workers", 1).unwrap().is_empty());
}
/// With a limit of two and a DLQ target, the second nack moves the original
/// payload to the DLQ exactly once and the message is no longer deliverable.
#[test]
fn nack_at_max_with_dlq_promotes_to_dlq_target() {
let store = store_with(&[(1, "orders/42")]);
let cfg = config_with(&store, 2, Some("orders.dlq"));
let lc = QueueLifecycle::new(store, cfg);
let a = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver-a");
lc.nack(&QueueTxn::new(),&a[0].delivery_id).expect("nack-a");
let b = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver-b");
lc.nack(&QueueTxn::new(),&b[0].delivery_id).expect("nack-b");
assert_eq!(
lc.recorded_outcomes(),
vec![
RetirementOutcome::Requeued,
RetirementOutcome::MovedToDlq("orders.dlq".to_string()),
]
);
let dlq = lc.store.dlq_snapshot();
assert_eq!(dlq.len(), 1, "exactly one DLQ enqueue");
assert_eq!(dlq[0].target, "orders.dlq");
assert_eq!(dlq[0].original, Value::text("orders/42"));
assert!(lc.deliver(&QueueTxn::new(),"q", "workers", 1).unwrap().is_empty());
}
/// With a limit of one and no DLQ target, the first nack drops the message
/// without any DLQ enqueue.
#[test]
fn nack_at_max_without_dlq_drops_silently() {
let store = store_with(&[(1, "ephemeral")]);
let cfg = config_with(&store, 1, None);
let lc = QueueLifecycle::new(store, cfg);
let d = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver");
lc.nack(&QueueTxn::new(),&d[0].delivery_id).expect("nack");
assert_eq!(lc.recorded_outcomes(), vec![RetirementOutcome::Dropped]);
assert!(lc.store.dlq_snapshot().is_empty(), "no DLQ enqueue when target unset");
assert!(lc.deliver(&QueueTxn::new(),"q", "workers", 1).unwrap().is_empty());
}
/// Nacking an unknown id fails with `UnknownDelivery` and records no outcome.
#[test]
fn nack_unknown_delivery_id_errors() {
let store = InMemoryQueueStore::new();
let cfg = config_with(&store, 3, None);
let lc = QueueLifecycle::new(store, cfg);
let err = lc.nack(&QueueTxn::new(),"nope").unwrap_err();
assert!(matches!(err, QueueStoreError::UnknownDelivery(_)));
assert!(lc.recorded_outcomes().is_empty());
}
/// Default configuration switched to `QueueMode::Fanout`.
fn fanout_config() -> LifecycleConfig {
LifecycleConfig {
mode: QueueMode::Fanout,
..LifecycleConfig::default()
}
}
/// Seeds `max_attempts` for message 1 of queue `"q"` on `store` and returns a
/// fanout-mode config with the given optional DLQ target.
fn fanout_config_with(
store: &InMemoryQueueStore,
max_attempts: u32,
dlq: Option<&str>,
) -> LifecycleConfig {
store.seed_max_attempts("q", 1, max_attempts);
LifecycleConfig {
mode: QueueMode::Fanout,
dlq_target: dlq.map(|s| s.to_string()),
..LifecycleConfig::default()
}
}
/// In fanout mode, two groups each receive the same payload under different delivery ids.
#[test]
fn fanout_two_groups_both_receive_same_message() {
let lc = QueueLifecycle::new(store_with(&[(1, "shared")]), fanout_config());
let a = lc.deliver(&QueueTxn::new(),"q", "subs.a", 1).expect("deliver a");
let b = lc.deliver(&QueueTxn::new(),"q", "subs.b", 1).expect("deliver b");
assert_eq!(a.len(), 1);
assert_eq!(b.len(), 1);
assert_eq!(a[0].payload, Value::text("shared"));
assert_eq!(b[0].payload, Value::text("shared"));
assert_ne!(a[0].delivery_id, b[0].delivery_id);
}
/// An ack from one fanout group retires the message for that group only; the
/// other group's delivery can still be acked.
#[test]
fn fanout_ack_by_one_group_leaves_other_pending_intact() {
let lc = QueueLifecycle::new(store_with(&[(1, "shared")]), fanout_config());
let a = lc.deliver(&QueueTxn::new(),"q", "subs.a", 1).expect("deliver a");
let b = lc.deliver(&QueueTxn::new(),"q", "subs.b", 1).expect("deliver b");
lc.ack(&QueueTxn::new(),&a[0].delivery_id).expect("ack a");
assert!(lc.deliver(&QueueTxn::new(),"q", "subs.a", 1).unwrap().is_empty());
lc.ack(&QueueTxn::new(),&b[0].delivery_id).expect("ack b still valid");
}
/// A nack from one fanout group requeues the message for that group while the
/// other group's original delivery id stays ackable.
#[test]
fn fanout_nack_by_one_group_does_not_touch_other() {
let store = store_with(&[(1, "shared")]);
let cfg = fanout_config_with(&store, 3, None);
let lc = QueueLifecycle::new(store, cfg);
let a = lc.deliver(&QueueTxn::new(),"q", "subs.a", 1).expect("deliver a");
let b = lc.deliver(&QueueTxn::new(),"q", "subs.b", 1).expect("deliver b");
let b_delivery = b[0].delivery_id.clone();
lc.nack(&QueueTxn::new(),&a[0].delivery_id).expect("nack a");
assert_eq!(lc.recorded_outcomes(), vec![RetirementOutcome::Requeued]);
let a2 = lc.deliver(&QueueTxn::new(),"q", "subs.a", 1).expect("a redeliver");
assert_eq!(a2.len(), 1);
assert_eq!(a2[0].payload, Value::text("shared"));
lc.ack(&QueueTxn::new(),&b_delivery).expect("ack b's original delivery_id");
}
/// A terminal nack in fanout mode with a DLQ target promotes the payload and
/// retires the message for the nacking group only; another group still receives it.
#[test]
fn fanout_terminal_nack_with_dlq_only_retires_caller_group() {
let store = store_with(&[(1, "orders/42")]);
let cfg = fanout_config_with(&store, 1, Some("orders.dlq"));
let lc = QueueLifecycle::new(store, cfg);
let a = lc.deliver(&QueueTxn::new(),"q", "subs.a", 1).expect("deliver a");
lc.nack(&QueueTxn::new(),&a[0].delivery_id).expect("nack a");
assert_eq!(
lc.recorded_outcomes(),
vec![RetirementOutcome::MovedToDlq("orders.dlq".to_string())]
);
let dlq = lc.store.dlq_snapshot();
assert_eq!(dlq.len(), 1);
assert_eq!(dlq[0].original, Value::text("orders/42"));
assert!(lc.deliver(&QueueTxn::new(),"q", "subs.a", 1).unwrap().is_empty());
let b = lc.deliver(&QueueTxn::new(),"q", "subs.b", 1).expect("deliver b");
assert_eq!(b.len(), 1);
assert_eq!(b[0].payload, Value::text("orders/42"));
}
/// A terminal nack in fanout mode without a DLQ drops the message for the
/// nacking group only; another group still receives it and nothing reaches a DLQ.
#[test]
fn fanout_terminal_nack_no_dlq_drops_for_caller_group_only() {
let store = store_with(&[(1, "p")]);
let cfg = fanout_config_with(&store, 1, None);
let lc = QueueLifecycle::new(store, cfg);
let a = lc.deliver(&QueueTxn::new(),"q", "subs.a", 1).expect("deliver a");
lc.nack(&QueueTxn::new(),&a[0].delivery_id).expect("nack a");
assert_eq!(lc.recorded_outcomes(), vec![RetirementOutcome::Dropped]);
assert!(lc.store.dlq_snapshot().is_empty());
assert!(lc.deliver(&QueueTxn::new(),"q", "subs.a", 1).unwrap().is_empty());
let b = lc.deliver(&QueueTxn::new(),"q", "subs.b", 1).expect("deliver b");
assert_eq!(b.len(), 1);
assert_eq!(b[0].payload, Value::text("p"));
}
/// In fanout mode each group receives the full set of messages, in the same order.
#[test]
fn fanout_does_not_share_pending_across_groups_with_work_semantics() {
let lc = QueueLifecycle::new(store_with(&[(1, "x"), (2, "y")]), fanout_config());
let a = lc.deliver(&QueueTxn::new(),"q", "subs.a", 2).expect("deliver a");
let b = lc.deliver(&QueueTxn::new(),"q", "subs.b", 2).expect("deliver b");
assert_eq!(a.len(), 2);
assert_eq!(b.len(), 2);
let a_payloads: Vec<_> = a.iter().map(|d| d.payload.clone()).collect();
let b_payloads: Vec<_> = b.iter().map(|d| d.payload.clone()).collect();
assert_eq!(a_payloads, b_payloads);
}
/// Attempts accumulate across redeliveries even though each redelivery gets a new
/// delivery id: with a limit of three, the third nack promotes to the DLQ.
#[test]
fn nack_requeue_preserves_attempt_count_across_new_delivery_id() {
let store = store_with(&[(1, "p")]);
let cfg = config_with(&store, 3, Some("dlq"));
let lc = QueueLifecycle::new(store, cfg);
let mut ids = Vec::new();
for _ in 0..3 {
let d = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver");
assert_eq!(d.len(), 1, "should always redeliver until retired");
ids.push(d[0].delivery_id.clone());
lc.nack(&QueueTxn::new(),&d[0].delivery_id).expect("nack");
}
assert_ne!(ids[0], ids[1]);
assert_ne!(ids[1], ids[2]);
assert_eq!(
lc.recorded_outcomes(),
vec![
RetirementOutcome::Requeued,
RetirementOutcome::Requeued,
RetirementOutcome::MovedToDlq("dlq".to_string()),
]
);
assert_eq!(lc.store.dlq_snapshot().len(), 1);
}
/// Default configuration with `lock_duration` replaced by `lock`.
fn config_with_lock(lock: Duration) -> LifecycleConfig {
LifecycleConfig {
lock_duration: lock,
..LifecycleConfig::default()
}
}
/// A pending message stays locked until its 100 ms deadline passes on the test
/// clock; the next `deliver` after expiry hands it out again under a new id.
#[test]
fn expired_pending_is_reclaimed_lazily_on_next_deliver() {
let clock = Arc::new(TestClock::new());
let lc = QueueLifecycle::with_clock(
store_with(&[(1, "only")]),
config_with_lock(Duration::from_millis(100)),
clock.clone() as Arc<dyn Clock>,
);
let first = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("first");
assert_eq!(first.len(), 1);
assert_eq!(first[0].payload, Value::text("only"));
clock.advance(Duration::from_millis(50));
let blocked = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("still locked");
assert!(blocked.is_empty(), "lock still held — must not redeliver");
clock.advance(Duration::from_millis(60));
let second = lc.deliver(&QueueTxn::new(),"q", "workers", 1).expect("after expiry");
assert_eq!(second.len(), 1, "expired lock must release for redelivery");
assert_eq!(second[0].payload, Value::text("only"));
assert_ne!(second[0].delivery_id, first[0].delivery_id);
}
/// A second lifecycle built over a clone of the same store still sees the
/// unexpired lock: it delivers nothing and can ack the original delivery id.
#[test]
fn partial_advance_then_recreate_module_keeps_lock_held() {
let clock = Arc::new(TestClock::new());
let store = store_with(&[(1, "in-flight")]);
let lc1 = QueueLifecycle::with_clock(
store.clone(),
config_with_lock(Duration::from_secs(30)),
clock.clone() as Arc<dyn Clock>,
);
let first = lc1.deliver(&QueueTxn::new(),"q", "workers", 1).expect("deliver");
assert_eq!(first.len(), 1);
let original_delivery_id = first[0].delivery_id.clone();
clock.advance(Duration::from_secs(5));
let lc2 = QueueLifecycle::with_clock(
store.clone(),
config_with_lock(Duration::from_secs(30)),
clock.clone() as Arc<dyn Clock>,
);
let again = lc2.deliver(&QueueTxn::new(),"q", "workers", 1).expect("post-restart deliver");
assert!(
again.is_empty(),
"pending row must survive Module recreation while deadline holds"
);
lc2.ack(&QueueTxn::new(),&original_delivery_id).expect("original delivery still ackable");
}
/// `peek` returns the first two messages in order, leaves them deliverable, and
/// records no tombstones on the transaction it was given.
#[test]
fn peek_returns_available_slice_without_mutating_state() {
let store = store_with(&[(1, "a"), (2, "b"), (3, "c")]);
let lc = lifecycle(store);
let t = QueueTxn::new();
let view = lc.peek("q", 2, &t);
assert_eq!(view.len(), 2);
assert_eq!(view[0].message_id, 1);
assert_eq!(view[0].payload, Value::text("a"));
assert_eq!(view[1].message_id, 2);
assert_eq!(view[1].payload, Value::text("b"));
let delivered = lc.deliver(&QueueTxn::new(), "q", "workers", 1).expect("deliver");
assert_eq!(delivered.len(), 1);
assert_eq!(delivered[0].payload, Value::text("a"));
assert!(t.recorded_tombstones().is_empty(), "peek must not tombstone");
}
/// Peeking zero messages returns an empty list.
#[test]
fn peek_count_zero_returns_empty() {
let lc = lifecycle(store_with(&[(1, "a")]));
let got = lc.peek("q", 0, &QueueTxn::new());
assert!(got.is_empty());
}
/// Peeking a queue that was never seeded returns an empty list.
#[test]
fn peek_unknown_queue_returns_empty() {
let store = InMemoryQueueStore::new();
let lc = QueueLifecycle::new(store, LifecycleConfig::default());
let got = lc.peek("missing", 5, &QueueTxn::new());
assert!(got.is_empty());
}
/// Peeking more than is available returns every available message.
#[test]
fn peek_count_larger_than_available_returns_all() {
let lc = lifecycle(store_with(&[(1, "a"), (2, "b")]));
let view = lc.peek("q", 10, &QueueTxn::new());
assert_eq!(view.len(), 2);
}
/// A message that is currently pending does not appear in `peek` results.
#[test]
fn peek_skips_pending_messages() {
let lc = lifecycle(store_with(&[(1, "a"), (2, "b"), (3, "c")]));
let _ = lc.deliver(&QueueTxn::new(), "q", "workers", 1).expect("deliver");
let view = lc.peek("q", 5, &QueueTxn::new());
let ids: Vec<_> = view.iter().map(|v| v.message_id).collect();
assert_eq!(ids, vec![2, 3]);
}
/// Each peeked view carries its own attempt limit: 7 for the seeded message and
/// 3 for the message with no explicit seed.
#[test]
fn peek_carries_per_message_max_attempts() {
let store = store_with(&[(1, "a"), (2, "b")]);
store.seed_max_attempts("q", 1, 7);
let lc = lifecycle(store);
let view = lc.peek("q", 2, &QueueTxn::new());
assert_eq!(view[0].message_id, 1);
assert_eq!(view[0].max_attempts, 7);
assert_eq!(view[1].message_id, 2);
assert_eq!(view[1].max_attempts, 3);
}
/// `peek` on a fanout queue lists every message and does not stop two groups
/// from each receiving both messages afterwards.
#[test]
fn peek_works_on_fanout_queue_and_does_not_mutate() {
let store = store_with(&[(1, "x"), (2, "y")]);
let lc = QueueLifecycle::new(store, fanout_config());
let view = lc.peek("q", 5, &QueueTxn::new());
assert_eq!(view.len(), 2);
assert_eq!(view[0].payload, Value::text("x"));
assert_eq!(view[1].payload, Value::text("y"));
let a = lc.deliver(&QueueTxn::new(), "q", "subs.a", 2).expect("deliver a");
let b = lc.deliver(&QueueTxn::new(), "q", "subs.b", 2).expect("deliver b");
assert_eq!(a.len(), 2);
assert_eq!(b.len(), 2);
}
/// `read` returns the pending payload without changing the lock deadline or
/// recording tombstones, and the delivery stays ackable afterwards.
#[test]
fn read_returns_payload_for_pending_delivery_without_locking() {
let lc = lifecycle(store_with(&[(1, "payload-1")]));
let t = QueueTxn::new();
let delivered = lc.deliver(&t, "q", "workers", 1).expect("deliver");
let id = delivered[0].delivery_id.clone();
let deadline_before = lc.store_ref().read_lock_deadline(&id);
let got = lc.read(&id, &t);
assert_eq!(got, Some(Value::text("payload-1")));
assert_eq!(lc.store_ref().read_lock_deadline(&id), deadline_before);
assert!(t.recorded_tombstones().is_empty(), "read must not tombstone");
lc.ack(&QueueTxn::new(), &id).expect("delivery still ackable after read");
}
/// `read` on an id that was never issued returns `None`.
#[test]
fn read_returns_none_for_unknown_delivery() {
let lc = lifecycle(store_with(&[(1, "p")]));
assert!(lc.read("does-not-exist", &QueueTxn::new()).is_none());
}
/// Purging a queue that was never seeded reports zero and records no tombstones.
#[test]
fn purge_on_empty_queue_returns_zero_and_records_no_tombstones() {
let store = InMemoryQueueStore::new();
let lc = QueueLifecycle::new(store, LifecycleConfig::default());
let t = QueueTxn::new();
let n = lc.purge("missing", &t).expect("purge unknown");
assert_eq!(n, 0);
assert!(t.recorded_tombstones().is_empty());
}
/// Purging three available messages reports three, removes their payloads and
/// availability, and records one tombstone per message in id order.
#[test]
fn purge_on_seeded_queue_with_no_pending_removes_all_and_returns_count() {
let lc = lifecycle(store_with(&[(1, "a"), (2, "b"), (3, "c")]));
let t = QueueTxn::new();
let n = lc.purge("q", &t).expect("purge");
assert_eq!(n, 3);
assert!(
lc.store_ref().available_messages("q", QueueSide::Left).is_empty(),
"no available messages after purge",
);
for id in [1u64, 2, 3] {
assert!(
lc.store_ref().read_message("q", id).is_none(),
"message {id} payload should be purged",
);
}
assert_eq!(
t.recorded_tombstones(),
vec![
TombstoneRecord { queue: "q".to_string(), message_id: 1 },
TombstoneRecord { queue: "q".to_string(), message_id: 2 },
TombstoneRecord { queue: "q".to_string(), message_id: 3 },
],
);
}
/// Purge also removes pending rows: both messages are tombstoned, the pending
/// lock disappears, and a second purge finds nothing left.
#[test]
fn purge_removes_pending_rows_and_records_tombstone_per_message() {
let lc = lifecycle(store_with(&[(1, "first"), (2, "second")]));
let delivered = lc.deliver(&QueueTxn::new(), "q", "workers", 1).expect("deliver");
assert_eq!(delivered.len(), 1);
let pending_id = delivered[0].delivery_id.clone();
let t = QueueTxn::new();
let n = lc.purge("q", &t).expect("purge");
assert_eq!(n, 2);
assert!(
lc.store_ref().read_lock_deadline(&pending_id).is_none(),
"pending row should be gone after purge",
);
let t2 = QueueTxn::new();
assert_eq!(lc.purge("q", &t2).expect("re-purge"), 0);
assert!(t2.recorded_tombstones().is_empty());
assert_eq!(
t.recorded_tombstones(),
vec![
TombstoneRecord { queue: "q".to_string(), message_id: 1 },
TombstoneRecord { queue: "q".to_string(), message_id: 2 },
],
);
}
/// Purging a fanout queue counts and tombstones each message once, regardless of
/// how many groups hold it pending, and leaves nothing deliverable to any group.
#[test]
fn purge_on_fanout_queue_tombstones_each_message_once() {
let lc = QueueLifecycle::new(store_with(&[(1, "shared"), (2, "other")]), fanout_config());
let _a = lc.deliver(&QueueTxn::new(), "q", "subs.a", 2).expect("deliver a");
let _b = lc.deliver(&QueueTxn::new(), "q", "subs.b", 2).expect("deliver b");
let t = QueueTxn::new();
let n = lc.purge("q", &t).expect("purge");
assert_eq!(n, 2, "two unique message ids — not four pending rows");
assert_eq!(
t.recorded_tombstones(),
vec![
TombstoneRecord { queue: "q".to_string(), message_id: 1 },
TombstoneRecord { queue: "q".to_string(), message_id: 2 },
],
);
assert!(lc.deliver(&QueueTxn::new(), "q", "subs.a", 5).unwrap().is_empty());
assert!(lc.deliver(&QueueTxn::new(), "q", "subs.b", 5).unwrap().is_empty());
}
/// Builds a lifecycle over `store` with `cfg`, driven by the shared test clock.
fn lifecycle_with_clock(
store: InMemoryQueueStore,
cfg: LifecycleConfig,
clock: Arc<TestClock>,
) -> QueueLifecycle<InMemoryQueueStore> {
QueueLifecycle::with_clock(store, cfg, clock as Arc<dyn Clock>)
}
/// `claim` returns nothing when no delivery is pending, for both a seeded and an unknown queue.
#[test]
fn claim_on_queue_with_no_pending_returns_empty() {
let lc = lifecycle(store_with(&[(1, "a")]));
let claimed = lc.claim("q", "consumer-x", 0, &QueueTxn::new()).expect("claim");
assert!(claimed.is_empty());
let claimed = lc.claim("missing", "c", 0, &QueueTxn::new()).expect("claim");
assert!(claimed.is_empty());
}
/// A delivery whose lock has not expired is not claimed, even with a zero idle
/// threshold, and its deadline is left untouched.
#[test]
fn claim_on_idle_but_not_expired_returns_empty() {
let clock = Arc::new(TestClock::new());
let store = store_with(&[(1, "payload")]);
let lc = lifecycle_with_clock(
store,
LifecycleConfig {
lock_duration: Duration::from_secs(30),
..LifecycleConfig::default()
},
clock.clone(),
);
let d = lc.deliver(&QueueTxn::new(), "q", "workers", 1).expect("deliver");
let original_deadline = lc.store_ref().read_lock_deadline(&d[0].delivery_id);
clock.advance(Duration::from_secs(5));
let claimed = lc.claim("q", "consumer-x", 0, &QueueTxn::new()).expect("claim");
assert!(claimed.is_empty(), "lock still held; nothing to claim");
assert_eq!(lc.store_ref().read_lock_deadline(&d[0].delivery_id), original_deadline);
}
/// Claiming an expired delivery returns its id, moves its deadline forward and
/// records no tombstones.
#[test]
fn claim_on_expired_delivery_reassigns_and_bumps_attempt() {
let clock = Arc::new(TestClock::new());
let store = store_with(&[(1, "payload")]);
store.seed_max_attempts("q", 1, 10);
let lc = lifecycle_with_clock(
store,
LifecycleConfig {
lock_duration: Duration::from_millis(100),
..LifecycleConfig::default()
},
clock.clone(),
);
let d = lc.deliver(&QueueTxn::new(), "q", "workers", 1).expect("deliver");
let id = d[0].delivery_id.clone();
let pre_deadline = lc.store_ref().read_lock_deadline(&id).expect("pre-deadline");
clock.advance(Duration::from_millis(500));
let t = QueueTxn::new();
let claimed = lc.claim("q", "consumer-x", 100, &t).expect("claim");
assert_eq!(claimed, vec![id.clone()], "expired delivery must be claimed");
let post_deadline = lc.store_ref().read_lock_deadline(&id).expect("post-deadline");
assert!(post_deadline > pre_deadline, "deadline must move forward on claim");
assert!(t.recorded_tombstones().is_empty(), "happy-path claim must not tombstone");
}
/// With a limit of one and no DLQ, the first claim succeeds and the second
/// retires the delivery: nothing is returned, the pending row is gone, `Dropped`
/// is recorded and one tombstone is written.
#[test]
fn claim_that_exhausts_attempts_retires_delivery() {
let clock = Arc::new(TestClock::new());
let store = store_with(&[(1, "payload")]);
store.seed_max_attempts("q", 1, 1);
let lc = lifecycle_with_clock(
store,
LifecycleConfig {
lock_duration: Duration::from_millis(100),
..LifecycleConfig::default()
},
clock.clone(),
);
let d = lc.deliver(&QueueTxn::new(), "q", "workers", 1).expect("deliver");
let id = d[0].delivery_id.clone();
clock.advance(Duration::from_millis(500));
let c1 = lc.claim("q", "consumer-x", 100, &QueueTxn::new()).expect("claim-1");
assert_eq!(c1, vec![id.clone()]);
clock.advance(Duration::from_millis(500));
let t = QueueTxn::new();
let c2 = lc.claim("q", "consumer-y", 100, &t).expect("claim-2");
assert!(c2.is_empty(), "exhausted delivery must not be returned");
assert!(
lc.store_ref().read_lock_deadline(&id).is_none(),
"pending row must be gone after retirement",
);
assert_eq!(
lc.recorded_outcomes(),
vec![RetirementOutcome::Dropped],
);
assert_eq!(t.recorded_tombstones().len(), 1);
}
/// With a limit of one and a DLQ target, the second claim promotes the original
/// payload to the DLQ and returns nothing.
#[test]
fn claim_that_exhausts_attempts_with_dlq_target_promotes_to_dlq() {
let clock = Arc::new(TestClock::new());
let store = store_with(&[(1, "payload")]);
store.seed_max_attempts("q", 1, 1);
let lc = lifecycle_with_clock(
store,
LifecycleConfig {
lock_duration: Duration::from_millis(100),
dlq_target: Some("q.dlq".to_string()),
..LifecycleConfig::default()
},
clock.clone(),
);
let d = lc.deliver(&QueueTxn::new(), "q", "workers", 1).expect("deliver");
let _id = d[0].delivery_id.clone();
clock.advance(Duration::from_millis(500));
let _ = lc.claim("q", "consumer-x", 100, &QueueTxn::new()).expect("claim-1");
clock.advance(Duration::from_millis(500));
let c2 = lc.claim("q", "consumer-y", 100, &QueueTxn::new()).expect("claim-2");
assert!(c2.is_empty());
assert_eq!(
lc.recorded_outcomes(),
vec![RetirementOutcome::MovedToDlq("q.dlq".to_string())],
);
let dlq = lc.store_ref().dlq_snapshot();
assert_eq!(dlq.len(), 1);
assert_eq!(dlq[0].target, "q.dlq");
assert_eq!(dlq[0].original, Value::text("payload"));
}
/// Deliveries 50 ms past their deadline are not claimed with a 200 ms idle
/// threshold, but both are claimed with a threshold of zero.
#[test]
fn claim_respects_min_idle_threshold_per_delivery() {
let clock = Arc::new(TestClock::new());
let store = store_with(&[(1, "a"), (2, "b")]);
let lc = lifecycle_with_clock(
store,
LifecycleConfig {
lock_duration: Duration::from_millis(100),
..LifecycleConfig::default()
},
clock.clone(),
);
let d = lc.deliver(&QueueTxn::new(), "q", "workers", 2).expect("deliver");
assert_eq!(d.len(), 2);
clock.advance(Duration::from_millis(150));
let none = lc.claim("q", "consumer-x", 200, &QueueTxn::new()).expect("strict");
assert!(none.is_empty(), "min_idle 200ms not yet satisfied past deadline");
let both = lc.claim("q", "consumer-x", 0, &QueueTxn::new()).expect("loose");
assert_eq!(both.len(), 2);
}
/// Once a delivery has been acked, `read` no longer returns its payload.
#[test]
fn read_returns_none_after_ack() {
let lc = lifecycle(store_with(&[(1, "p")]));
let t = QueueTxn::new();
let delivered = lc.deliver(&t, "q", "workers", 1).expect("deliver");
let id = delivered[0].delivery_id.clone();
lc.ack(&t, &id).expect("ack");
assert!(
lc.read(&id, &QueueTxn::new()).is_none(),
"retired delivery must not be readable",
);
}
}