use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;
use crate::storage::schema::Value;
pub(crate) type QueueId = String;
pub(crate) type MessageId = u64;
pub(crate) type ConsumerGroupId = String;
pub(crate) type DeliveryId = String;
pub(crate) type DlqTarget = String;
pub(crate) use super::deque::QueueSide;
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum QueueStoreError {
UnknownDelivery(DeliveryId),
UnknownQueue(QueueId),
ReplicaImmutable,
}
impl std::fmt::Display for QueueStoreError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UnknownDelivery(id) => write!(f, "unknown delivery {id}"),
Self::UnknownQueue(q) => write!(f, "unknown queue {q}"),
Self::ReplicaImmutable => write!(
f,
"replica QueueStore is immutable — decisions live on the primary"
),
}
}
}
impl std::error::Error for QueueStoreError {}
pub(crate) type Result<T> = std::result::Result<T, QueueStoreError>;
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct TombstoneRecord {
pub(crate) queue: QueueId,
pub(crate) message_id: MessageId,
}
pub(crate) struct QueueTxn {
tombstones: Mutex<Vec<TombstoneRecord>>,
}
impl QueueTxn {
pub(crate) fn new() -> Self {
Self {
tombstones: Mutex::new(Vec::new()),
}
}
pub(crate) fn record_pending_tombstone(&self, queue: &str, message_id: MessageId) {
self.tombstones
.lock()
.expect("queue txn poisoned")
.push(TombstoneRecord {
queue: queue.to_string(),
message_id,
});
}
pub(crate) fn recorded_tombstones(&self) -> Vec<TombstoneRecord> {
self.tombstones
.lock()
.expect("queue txn poisoned")
.clone()
}
}
impl Default for QueueTxn {
fn default() -> Self {
Self::new()
}
}
pub(crate) trait QueueStore {
fn available_messages(&self, queue: &str, side: QueueSide) -> Vec<MessageId>;
fn find_pending_by_key(
&self,
queue: &str,
message_id: MessageId,
group: &str,
) -> Option<DeliveryId>;
fn mark_pending(
&self,
txn: &QueueTxn,
queue: &str,
message_id: MessageId,
group: &str,
deadline: Instant,
) -> Result<DeliveryId>;
fn release_pending(&self, txn: &QueueTxn, delivery_id: &str) -> Result<()>;
fn ack_pending(&self, txn: &QueueTxn, delivery_id: &str) -> Result<()>;
fn retire_for_group(&self, txn: &QueueTxn, delivery_id: &str) -> Result<()>;
fn available_messages_for_group(
&self,
queue: &str,
group: &str,
side: QueueSide,
) -> Vec<MessageId>;
fn bump_attempt(&self, txn: &QueueTxn, delivery_id: &str) -> Result<BumpedAttempt>;
fn read_max_attempts(&self, queue: &str, message_id: MessageId) -> u32;
fn enqueue_dlq(&self, txn: &QueueTxn, dlq_target: &str, original: Value) -> Result<()>;
fn read_lock_deadline(&self, delivery_id: &str) -> Option<Instant>;
fn read_message(&self, queue: &str, message_id: MessageId) -> Option<Value>;
fn read_pending_payload(&self, delivery_id: &str) -> Option<Value>;
fn reclaim_expired(&self, txn: &QueueTxn, queue: &str, now: Instant) -> Result<()>;
fn purge_queue(&self, txn: &QueueTxn, queue: &str) -> Result<usize>;
fn pending_deliveries_for_queue(&self, queue: &str) -> Vec<PendingDeliveryView>;
}
#[derive(Debug, Clone)]
pub(crate) struct PendingDeliveryView {
pub(crate) delivery_id: DeliveryId,
pub(crate) queue: QueueId,
pub(crate) message_id: MessageId,
pub(crate) group: ConsumerGroupId,
pub(crate) deadline: Instant,
}
#[derive(Debug, Clone)]
struct PendingDelivery {
queue: QueueId,
message_id: MessageId,
group: ConsumerGroupId,
deadline: Instant,
attempts: u32,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct BumpedAttempt {
pub(crate) attempts: u32,
pub(crate) queue: QueueId,
pub(crate) message_id: MessageId,
}
pub(crate) const DEFAULT_READ_MAX_ATTEMPTS: u32 = 3;
#[derive(Debug, Clone)]
pub(crate) struct DlqRecord {
pub target: DlqTarget,
pub original: Value,
}
#[derive(Default)]
struct State {
queues: HashMap<QueueId, Vec<MessageId>>,
pending: HashMap<DeliveryId, PendingDelivery>,
by_key: HashMap<(QueueId, MessageId, ConsumerGroupId), DeliveryId>,
payloads: HashMap<(QueueId, MessageId), Value>,
attempts: HashMap<(QueueId, MessageId, ConsumerGroupId), u32>,
acked: std::collections::HashSet<(QueueId, MessageId, ConsumerGroupId)>,
max_attempts: HashMap<(QueueId, MessageId), u32>,
dlq: Vec<DlqRecord>,
}
#[derive(Clone)]
pub(crate) struct InMemoryQueueStore {
state: Arc<Mutex<State>>,
counter: Arc<AtomicU64>,
}
impl InMemoryQueueStore {
pub(crate) fn new() -> Self {
Self {
state: Arc::new(Mutex::new(State::default())),
counter: Arc::new(AtomicU64::new(0)),
}
}
pub(crate) fn seed_queue(&self, queue: &str, messages: Vec<MessageId>) {
let mut state = self.state.lock().expect("state poisoned");
state.queues.insert(queue.to_string(), messages);
}
pub(crate) fn dlq_snapshot(&self) -> Vec<DlqRecord> {
self.state.lock().expect("state poisoned").dlq.clone()
}
pub(crate) fn seed_max_attempts(&self, queue: &str, message_id: MessageId, max_attempts: u32) {
let mut state = self.state.lock().expect("state poisoned");
state
.max_attempts
.insert((queue.to_string(), message_id), max_attempts);
}
pub(crate) fn seed_payload(&self, queue: &str, message_id: MessageId, payload: Value) {
let mut state = self.state.lock().expect("state poisoned");
state
.payloads
.insert((queue.to_string(), message_id), payload);
}
fn next_delivery_id(&self) -> DeliveryId {
let n = self.counter.fetch_add(1, Ordering::Relaxed);
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
let mut bytes = [0u8; 16];
bytes[..8].copy_from_slice(&n.to_le_bytes());
bytes[8..].copy_from_slice(&nanos.to_le_bytes());
let hash = blake3::hash(&bytes);
base32_lower(&hash.as_bytes()[..15])
}
}
impl QueueStore for InMemoryQueueStore {
fn available_messages(&self, queue: &str, side: QueueSide) -> Vec<MessageId> {
let state = self.state.lock().expect("state poisoned");
let Some(msgs) = state.queues.get(queue) else {
return Vec::new();
};
let pending: std::collections::HashSet<MessageId> = state
.pending
.values()
.filter(|p| p.queue == queue)
.map(|p| p.message_id)
.collect();
let mut out: Vec<MessageId> = msgs
.iter()
.copied()
.filter(|m| !pending.contains(m))
.collect();
if matches!(side, QueueSide::Right) {
out.reverse();
}
out
}
fn mark_pending(
&self,
_txn: &QueueTxn,
queue: &str,
message_id: MessageId,
group: &str,
deadline: Instant,
) -> Result<DeliveryId> {
let key = (queue.to_string(), message_id, group.to_string());
{
let mut state = self.state.lock().expect("state poisoned");
if !state.queues.contains_key(queue) {
return Err(QueueStoreError::UnknownQueue(queue.to_string()));
}
if let Some(existing) = state.by_key.get(&key).cloned() {
if let Some(entry) = state.pending.get_mut(&existing) {
entry.deadline = deadline;
}
return Ok(existing);
}
}
let delivery_id = self.next_delivery_id();
let mut state = self.state.lock().expect("state poisoned");
let attempts = state.attempts.get(&key).copied().unwrap_or(0);
state.pending.insert(
delivery_id.clone(),
PendingDelivery {
queue: queue.to_string(),
message_id,
group: group.to_string(),
deadline,
attempts,
},
);
state.by_key.insert(key, delivery_id.clone());
Ok(delivery_id)
}
fn find_pending_by_key(
&self,
queue: &str,
message_id: MessageId,
group: &str,
) -> Option<DeliveryId> {
let state = self.state.lock().expect("state poisoned");
state
.by_key
.get(&(queue.to_string(), message_id, group.to_string()))
.cloned()
}
fn release_pending(&self, _txn: &QueueTxn, delivery_id: &str) -> Result<()> {
let mut state = self.state.lock().expect("state poisoned");
if let Some(entry) = state.pending.remove(delivery_id) {
let key = (entry.queue, entry.message_id, entry.group);
state.by_key.remove(&key);
}
Ok(())
}
fn ack_pending(&self, txn: &QueueTxn, delivery_id: &str) -> Result<()> {
let mut state = self.state.lock().expect("state poisoned");
let entry = state
.pending
.remove(delivery_id)
.ok_or_else(|| QueueStoreError::UnknownDelivery(delivery_id.to_string()))?;
let key = (entry.queue.clone(), entry.message_id, entry.group);
state.by_key.remove(&key);
state.attempts.remove(&key);
if let Some(msgs) = state.queues.get_mut(&entry.queue) {
msgs.retain(|m| *m != entry.message_id);
}
state.payloads.remove(&(entry.queue.clone(), entry.message_id));
drop(state);
txn.record_pending_tombstone(&entry.queue, entry.message_id);
Ok(())
}
fn retire_for_group(&self, _txn: &QueueTxn, delivery_id: &str) -> Result<()> {
let mut state = self.state.lock().expect("state poisoned");
let entry = state
.pending
.remove(delivery_id)
.ok_or_else(|| QueueStoreError::UnknownDelivery(delivery_id.to_string()))?;
let key = (entry.queue, entry.message_id, entry.group);
state.by_key.remove(&key);
state.attempts.remove(&key);
state.acked.insert(key);
Ok(())
}
fn available_messages_for_group(
&self,
queue: &str,
group: &str,
side: QueueSide,
) -> Vec<MessageId> {
let state = self.state.lock().expect("state poisoned");
let Some(msgs) = state.queues.get(queue) else {
return Vec::new();
};
let pending: std::collections::HashSet<MessageId> = state
.pending
.values()
.filter(|p| p.queue == queue && p.group == group)
.map(|p| p.message_id)
.collect();
let mut out: Vec<MessageId> = msgs
.iter()
.copied()
.filter(|m| !pending.contains(m))
.filter(|m| !state.acked.contains(&(queue.to_string(), *m, group.to_string())))
.collect();
if matches!(side, QueueSide::Right) {
out.reverse();
}
out
}
fn bump_attempt(&self, _txn: &QueueTxn, delivery_id: &str) -> Result<BumpedAttempt> {
let mut state = self.state.lock().expect("state poisoned");
let entry = state
.pending
.get_mut(delivery_id)
.ok_or_else(|| QueueStoreError::UnknownDelivery(delivery_id.to_string()))?;
entry.attempts += 1;
let count = entry.attempts;
let queue = entry.queue.clone();
let message_id = entry.message_id;
let key = (queue.clone(), message_id, entry.group.clone());
state.attempts.insert(key, count);
Ok(BumpedAttempt {
attempts: count,
queue,
message_id,
})
}
fn read_max_attempts(&self, queue: &str, message_id: MessageId) -> u32 {
let state = self.state.lock().expect("state poisoned");
state
.max_attempts
.get(&(queue.to_string(), message_id))
.copied()
.unwrap_or(DEFAULT_READ_MAX_ATTEMPTS)
}
fn enqueue_dlq(&self, _txn: &QueueTxn, dlq_target: &str, original: Value) -> Result<()> {
let mut state = self.state.lock().expect("state poisoned");
state.dlq.push(DlqRecord {
target: dlq_target.to_string(),
original,
});
Ok(())
}
fn read_lock_deadline(&self, delivery_id: &str) -> Option<Instant> {
let state = self.state.lock().expect("state poisoned");
state.pending.get(delivery_id).map(|p| p.deadline)
}
fn read_message(&self, queue: &str, message_id: MessageId) -> Option<Value> {
let state = self.state.lock().expect("state poisoned");
state
.payloads
.get(&(queue.to_string(), message_id))
.cloned()
}
fn read_pending_payload(&self, delivery_id: &str) -> Option<Value> {
let state = self.state.lock().expect("state poisoned");
let entry = state.pending.get(delivery_id)?;
state
.payloads
.get(&(entry.queue.clone(), entry.message_id))
.cloned()
}
fn reclaim_expired(&self, _txn: &QueueTxn, queue: &str, now: Instant) -> Result<()> {
let mut state = self.state.lock().expect("state poisoned");
let expired: Vec<DeliveryId> = state
.pending
.iter()
.filter(|(_, p)| p.queue == queue && p.deadline <= now)
.map(|(id, _)| id.clone())
.collect();
for id in expired {
if let Some(entry) = state.pending.remove(&id) {
let key = (entry.queue, entry.message_id, entry.group);
state.by_key.remove(&key);
}
}
Ok(())
}
fn purge_queue(&self, txn: &QueueTxn, queue: &str) -> Result<usize> {
let mut message_ids: Vec<MessageId> = {
let state = self.state.lock().expect("state poisoned");
let mut ids: Vec<MessageId> = state
.queues
.get(queue)
.map(|v| v.clone())
.unwrap_or_default();
for pending in state.pending.values() {
if pending.queue == queue && !ids.contains(&pending.message_id) {
ids.push(pending.message_id);
}
}
ids
};
message_ids.sort_unstable();
message_ids.dedup();
{
let mut state = self.state.lock().expect("state poisoned");
let pending_to_remove: Vec<DeliveryId> = state
.pending
.iter()
.filter(|(_, p)| p.queue == queue)
.map(|(id, _)| id.clone())
.collect();
for id in pending_to_remove {
if let Some(entry) = state.pending.remove(&id) {
let key = (entry.queue, entry.message_id, entry.group);
state.by_key.remove(&key);
state.attempts.remove(&key);
}
}
state.acked.retain(|(q, _, _)| q != queue);
state.queues.remove(queue);
state.payloads.retain(|(q, _), _| q != queue);
}
for message_id in &message_ids {
txn.record_pending_tombstone(queue, *message_id);
}
Ok(message_ids.len())
}
fn pending_deliveries_for_queue(&self, queue: &str) -> Vec<PendingDeliveryView> {
let state = self.state.lock().expect("state poisoned");
state
.pending
.iter()
.filter(|(_, p)| p.queue == queue)
.map(|(id, p)| PendingDeliveryView {
delivery_id: id.clone(),
queue: p.queue.clone(),
message_id: p.message_id,
group: p.group.clone(),
deadline: p.deadline,
})
.collect()
}
}
fn base32_lower(bytes: &[u8]) -> String {
const ALPHABET: &[u8; 32] = b"abcdefghijklmnopqrstuvwxyz234567";
let mut out = String::with_capacity((bytes.len() * 8 + 4) / 5);
let mut buf: u32 = 0;
let mut bits: u32 = 0;
for &b in bytes {
buf = (buf << 8) | b as u32;
bits += 8;
while bits >= 5 {
bits -= 5;
let idx = ((buf >> bits) & 0x1f) as usize;
out.push(ALPHABET[idx] as char);
}
}
if bits > 0 {
let idx = ((buf << (5 - bits)) & 0x1f) as usize;
out.push(ALPHABET[idx] as char);
}
out
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
fn deadline_in(ms: u64) -> Instant {
Instant::now() + Duration::from_millis(ms)
}
fn txn() -> QueueTxn {
QueueTxn::new()
}
#[test]
fn delivery_id_is_opaque_base32() {
let store = InMemoryQueueStore::new();
store.seed_queue("q", vec![1]);
let t = txn();
let id = store
.mark_pending(&t, "q", 1, "g", deadline_in(1000))
.expect("mark");
assert!(!id.is_empty(), "delivery_id is empty");
assert!(
id.chars()
.all(|c| matches!(c, 'a'..='z' | '2'..='7')),
"delivery_id {id} not base32-lower"
);
}
#[test]
fn delivery_ids_are_unique() {
let store = InMemoryQueueStore::new();
store.seed_queue("q", vec![1, 2]);
let t = txn();
let a = store.mark_pending(&t, "q", 1, "g", deadline_in(1000)).unwrap();
let b = store.mark_pending(&t, "q", 2, "g", deadline_in(1000)).unwrap();
assert_ne!(a, b);
}
#[test]
fn mark_pending_is_idempotent_on_same_key() {
let store = InMemoryQueueStore::new();
store.seed_queue("q", vec![1]);
let t = txn();
let a = store.mark_pending(&t, "q", 1, "g", deadline_in(1000)).unwrap();
let b = store.mark_pending(&t, "q", 1, "g", deadline_in(2000)).unwrap();
assert_eq!(a, b, "same (queue, msg, group) should return same delivery_id");
}
#[test]
fn release_pending_is_noop_on_unknown_id() {
let store = InMemoryQueueStore::new();
let t = txn();
assert!(store.release_pending(&t, "does-not-exist").is_ok());
}
#[test]
fn bump_attempt_returns_new_count() {
let store = InMemoryQueueStore::new();
store.seed_queue("q", vec![1]);
let t = txn();
let id = store.mark_pending(&t, "q", 1, "g", deadline_in(1000)).unwrap();
let first = store.bump_attempt(&t, &id).unwrap();
assert_eq!(first.attempts, 1);
assert_eq!(first.queue, "q");
assert_eq!(first.message_id, 1);
assert_eq!(store.bump_attempt(&t, &id).unwrap().attempts, 2);
assert_eq!(store.bump_attempt(&t, &id).unwrap().attempts, 3);
}
#[test]
fn ack_and_delete_records_one_pending_tombstone_per_call() {
let store = InMemoryQueueStore::new();
store.seed_queue("q", vec![1, 2, 3]);
store.seed_payload("q", 1, Value::text("p1"));
store.seed_payload("q", 2, Value::text("p2"));
store.seed_payload("q", 3, Value::text("p3"));
let t = txn();
let d1 = store.mark_pending(&t, "q", 1, "g", deadline_in(1000)).unwrap();
let d2 = store.mark_pending(&t, "q", 2, "g", deadline_in(1000)).unwrap();
let d3 = store.mark_pending(&t, "q", 3, "g", deadline_in(1000)).unwrap();
assert!(
t.recorded_tombstones().is_empty(),
"mark_pending must not record tombstones"
);
store.ack_pending(&t, &d1).unwrap();
store.ack_pending(&t, &d2).unwrap();
assert_eq!(
t.recorded_tombstones(),
vec![
TombstoneRecord { queue: "q".to_string(), message_id: 1 },
TombstoneRecord { queue: "q".to_string(), message_id: 2 },
],
"each ack_pending must record exactly one tombstone, in order",
);
store.release_pending(&t, &d3).unwrap();
assert_eq!(t.recorded_tombstones().len(), 2, "release_pending must not record");
let d3 = store.mark_pending(&t, "q", 3, "g", deadline_in(1000)).unwrap();
store.bump_attempt(&t, &d3).unwrap();
assert_eq!(t.recorded_tombstones().len(), 2, "bump_attempt must not record");
store.retire_for_group(&t, &d3).unwrap();
assert_eq!(t.recorded_tombstones().len(), 2, "retire_for_group must not record");
store
.reclaim_expired(&t, "q", Instant::now() + Duration::from_secs(60))
.unwrap();
assert_eq!(t.recorded_tombstones().len(), 2, "reclaim_expired must not record");
}
#[test]
fn read_max_attempts_defaults_to_three_when_not_seeded() {
let store = InMemoryQueueStore::new();
assert_eq!(
store.read_max_attempts("q", 1),
DEFAULT_READ_MAX_ATTEMPTS,
"unseeded message must return the crate-wide default",
);
assert_eq!(DEFAULT_READ_MAX_ATTEMPTS, 3);
}
#[test]
fn seed_max_attempts_overrides_default_per_message() {
let store = InMemoryQueueStore::new();
store.seed_max_attempts("q", 1, 7);
store.seed_max_attempts("q", 2, 1);
assert_eq!(store.read_max_attempts("q", 1), 7);
assert_eq!(store.read_max_attempts("q", 2), 1);
assert_eq!(store.read_max_attempts("other", 1), DEFAULT_READ_MAX_ATTEMPTS);
}
#[test]
fn bump_attempt_unknown_id_errors() {
let store = InMemoryQueueStore::new();
let t = txn();
let err = store.bump_attempt(&t, "nope").unwrap_err();
assert!(matches!(err, QueueStoreError::UnknownDelivery(_)));
}
#[test]
fn enqueue_dlq_records_original() {
let store = InMemoryQueueStore::new();
let t = txn();
store.enqueue_dlq(&t, "orders.dlq", Value::text("payload-1")).unwrap();
store.enqueue_dlq(&t, "orders.dlq", Value::Integer(42)).unwrap();
let snap = store.dlq_snapshot();
assert_eq!(snap.len(), 2);
assert_eq!(snap[0].target, "orders.dlq");
assert_eq!(snap[0].original, Value::text("payload-1"));
assert_eq!(snap[1].original, Value::Integer(42));
}
#[test]
fn available_messages_skips_pending() {
let store = InMemoryQueueStore::new();
store.seed_queue("q", vec![1, 2, 3]);
let t = txn();
let _ = store.mark_pending(&t, "q", 2, "g", deadline_in(1000)).unwrap();
let avail = store.available_messages("q", QueueSide::Left);
assert_eq!(avail, vec![1, 3]);
let avail_right = store.available_messages("q", QueueSide::Right);
assert_eq!(avail_right, vec![3, 1]);
}
#[test]
fn release_returns_message_to_available() {
let store = InMemoryQueueStore::new();
store.seed_queue("q", vec![1]);
let t = txn();
let id = store.mark_pending(&t, "q", 1, "g", deadline_in(1000)).unwrap();
assert!(store.available_messages("q", QueueSide::Left).is_empty());
store.release_pending(&t, &id).unwrap();
assert_eq!(store.available_messages("q", QueueSide::Left), vec![1]);
}
#[test]
fn read_lock_deadline_reflects_pending_state() {
let store = InMemoryQueueStore::new();
store.seed_queue("q", vec![1]);
let t = txn();
let dl = deadline_in(1000);
let id = store.mark_pending(&t, "q", 1, "g", dl).unwrap();
assert_eq!(store.read_lock_deadline(&id), Some(dl));
store.release_pending(&t, &id).unwrap();
assert_eq!(store.read_lock_deadline(&id), None);
}
#[test]
fn mark_pending_refreshes_deadline_on_repeat() {
let store = InMemoryQueueStore::new();
store.seed_queue("q", vec![1]);
let t = txn();
let d1 = deadline_in(1000);
let d2 = deadline_in(5000);
let id = store.mark_pending(&t, "q", 1, "g", d1).unwrap();
let id2 = store.mark_pending(&t, "q", 1, "g", d2).unwrap();
assert_eq!(id, id2);
assert_eq!(store.read_lock_deadline(&id), Some(d2));
}
#[test]
fn mark_pending_unknown_queue_errors() {
let store = InMemoryQueueStore::new();
let t = txn();
let err = store.mark_pending(&t, "missing", 1, "g", deadline_in(1000)).unwrap_err();
assert!(matches!(err, QueueStoreError::UnknownQueue(_)));
}
#[test]
fn base32_lower_roundtrip_shape() {
let s = base32_lower(&[0u8; 15]);
assert_eq!(s.len(), 24);
assert!(s.chars().all(|c| matches!(c, 'a'..='z' | '2'..='7')));
}
}