use std::collections::HashSet;
use crate::error::ToolError;
use super::backend::SpoolBackend;
use super::item::SpoolItem;
use super::{DrainOnRecovery, OnFull, OrderingMode, SpoolSpec};
/// Decision returned by `SpoolEngine::admit` for a message that is about to be spooled.
#[derive(Debug, Clone, PartialEq)]
pub enum Admission {
/// No byte ceiling is configured, or the current spool size plus the incoming bytes
/// stays within it.
Accept,
/// The ceiling would be exceeded and the configured policy is `OnFull::StopAcking`.
RejectStopAck,
/// The ceiling would be exceeded and the configured policy is `OnFull::DropToDlq`;
/// carries the records describing items evicted from the spool.
AcceptAfterEvict(Vec<DeadLetter>),
/// The ceiling would be exceeded and the configured policy is `OnFull::AlertOnly`;
/// `spool_bytes` is the spool size measured before the incoming message is added.
AcceptWithAlert { spool_bytes: u64 },
}
/// Record describing a spooled item that was taken out of the live spool.
///
/// All fields are plain strings and integers, so the type derives `Eq` in addition to
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeadLetter {
    /// `message_id` copied from the spooled item.
    pub message_id: String,
    /// `dedup_key` copied from the spooled item.
    pub dedup_key: String,
    /// `recv_seq` copied from the spooled item.
    pub recv_seq: u64,
    /// Value of the item's `spool_ref()` at the time the record was built.
    pub spool_ref: String,
    /// Attempt counter of the item at the time the record was built.
    pub attempts: u32,
    /// Why the item was removed; this file uses `"on_full_evicted"`,
    /// `"retention_expired"` and `"max_replay_attempts"`.
    pub reason: String,
}
impl DeadLetter {
fn from_item(item: &SpoolItem, reason: impl Into<String>) -> Self {
Self {
message_id: item.message_id.clone(),
dedup_key: item.dedup_key.clone(),
recv_seq: item.recv_seq,
spool_ref: item.spool_ref(),
attempts: item.attempts,
reason: reason.into(),
}
}
}
/// Summary of one `SpoolEngine::drain` pass.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct DrainReport {
/// Number of items whose dispatch returned `Ok`.
pub replayed: u64,
/// Number of items skipped because their dedup key had already been dispatched.
pub deduped: u64,
/// Records for items given up on after reaching the replay-attempt limit.
pub dead_lettered: Vec<DeadLetter>,
/// Number of items counted as left behind by the pass (failed below the attempt
/// limit, or held back behind a stalled ordering lane).
pub remaining: u64,
/// `true` exactly when `remaining` is zero at the end of the pass.
pub fully_drained: bool,
}
/// Summary of one `SpoolEngine::gc_expired` sweep.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct GcReport {
/// Records for items whose age reached the configured maximum; empty when no
/// maximum age is configured.
pub expired: Vec<DeadLetter>,
/// Backend `total_bytes()` read at the end of the sweep.
pub spool_bytes: u64,
}
/// Output type of the future returned by the `dispatch` callback given to `SpoolEngine::drain`.
pub type DrainResult = Result<(), ToolError>;
/// Spool engine: admission against a byte ceiling, spooling, age-based expiry and replay.
pub struct SpoolEngine {
/// Retention, ordering and drain settings consulted by the operations below.
spec: SpoolSpec,
/// Store holding the live spool.
backend: Box<dyn SpoolBackend>,
/// Store that receives items which are evicted, expired or out of replay attempts.
dlq: Box<dyn SpoolBackend>,
/// Dedup keys already dispatched; kept in memory only and never pruned in this file.
seen: HashSet<String>,
}
impl SpoolEngine {
pub fn new(spec: SpoolSpec, backend: Box<dyn SpoolBackend>, dlq: Box<dyn SpoolBackend>) -> Self {
Self {
spec,
backend,
dlq,
seen: HashSet::new(),
}
}
pub fn spec(&self) -> &SpoolSpec {
&self.spec
}
/// Records `dedup_key` as already dispatched so that a later `drain` drops any
/// spooled item carrying the same key instead of replaying it.
pub fn mark_dispatched(&mut self, dedup_key: &str) {
    let owned_key = String::from(dedup_key);
    self.seen.insert(owned_key);
}
/// Returns the live spool backend's `total_bytes()`.
///
/// # Errors
/// Propagates the backend error unchanged.
pub async fn spool_bytes(&self) -> Result<u64, ToolError> {
    let total = self.backend.total_bytes().await?;
    Ok(total)
}
/// Returns the live spool backend's `len()`.
///
/// # Errors
/// Propagates the backend error unchanged.
pub async fn len(&self) -> Result<usize, ToolError> {
    let count = self.backend.len().await?;
    Ok(count)
}
/// Returns the live spool backend's `is_empty()`.
///
/// # Errors
/// Propagates the backend error unchanged.
pub async fn is_empty(&self) -> Result<bool, ToolError> {
    let empty = self.backend.is_empty().await?;
    Ok(empty)
}
/// Returns the largest sequence number that `super::recv_seq_from_object_key`
/// extracts from the keys currently listed by the backend.
///
/// Keys for which the helper yields `None` are ignored; the result is `0` when no
/// key yields a sequence number (including an empty spool).
///
/// # Errors
/// Propagates a failure to list the backend.
pub async fn high_water_recv_seq(&self) -> Result<u64, ToolError> {
    let metas = self.backend.list().await?;
    let high_water = metas
        .iter()
        .filter_map(|meta| super::recv_seq_from_object_key(&meta.key))
        .max()
        .unwrap_or(0);
    Ok(high_water)
}
/// Decides whether a message of `incoming_bytes` may be added to the spool.
///
/// - With no `retention.max_bytes`, or when the current spool size plus
///   `incoming_bytes` fits under it, the answer is [`Admission::Accept`].
/// - Otherwise the answer follows `retention.on_full`:
///   `StopAcking` -> [`Admission::RejectStopAck`],
///   `AlertOnly` -> [`Admission::AcceptWithAlert`] carrying the current size,
///   `DropToDlq` -> items are moved to the DLQ, in `backend.list()` order, until the
///   incoming message fits or the listing is exhausted, and
///   [`Admission::AcceptAfterEvict`] carries one record per item actually moved.
///
/// An item is only removed from the spool after it has been written to the DLQ, and it
/// only counts as freed space once its delete succeeded. Items that cannot be read,
/// cannot be written to the DLQ, or cannot be deleted are left in place and skipped.
///
/// `AcceptAfterEvict` is returned even when eviction could not free enough space.
///
/// NOTE(review): "oldest first" relies on `backend.list()` returning keys in age
/// order — confirm against the backend contract.
///
/// # Parameters
/// - `now_ms`: currently unused; kept so callers do not have to change.
/// - `incoming_bytes`: size of the message about to be spooled.
///
/// # Errors
/// Propagates failures of `backend.total_bytes()` and `backend.list()`.
pub async fn admit(&self, now_ms: u64, incoming_bytes: u64) -> Result<Admission, ToolError> {
    // No admission policy reads the clock yet.
    let _ = now_ms;

    let max_bytes = match self.spec.retention.max_bytes {
        Some(limit) => limit,
        None => return Ok(Admission::Accept),
    };
    let current = self.backend.total_bytes().await?;
    // Saturating: a huge `incoming_bytes` must read as "does not fit", not wrap around.
    if current.saturating_add(incoming_bytes) <= max_bytes {
        return Ok(Admission::Accept);
    }

    match self.spec.retention.on_full {
        OnFull::StopAcking => Ok(Admission::RejectStopAck),
        OnFull::AlertOnly => Ok(Admission::AcceptWithAlert { spool_bytes: current }),
        OnFull::DropToDlq => {
            let mut evicted = Vec::new();
            let mut freed = 0u64;
            for meta in self.backend.list().await? {
                let projected = current.saturating_sub(freed).saturating_add(incoming_bytes);
                if projected <= max_bytes {
                    break;
                }
                // Unreadable item: leave it alone rather than destroying it unseen.
                let item = match self.backend.get(&meta.key).await {
                    Ok(item) => item,
                    Err(_) => continue,
                };
                // The DLQ copy must exist before the spool copy is removed.
                if self.dlq.put(&item).await.is_err() {
                    continue;
                }
                // Space is only freed if the delete really happened.
                if self.backend.delete(&meta.key).await.is_err() {
                    continue;
                }
                freed = freed.saturating_add(meta.size);
                evicted.push(DeadLetter::from_item(&item, "on_full_evicted"));
            }
            Ok(Admission::AcceptAfterEvict(evicted))
        }
    }
}
/// Writes `item` to the live spool and returns a reference describing what was stored
/// (the item's `spool_ref()`, `sha256` and `recv_seq`).
///
/// # Errors
/// Propagates the backend `put` failure; nothing is returned in that case.
pub async fn spool(&mut self, item: &SpoolItem) -> Result<SpooledRef, ToolError> {
    self.backend.put(item).await?;
    let receipt = SpooledRef {
        spool_ref: item.spool_ref(),
        sha256: item.sha256.clone(),
        recv_seq: item.recv_seq,
    };
    Ok(receipt)
}
/// Moves every spooled item whose age has reached `retention.max_age_hours` to the DLQ.
///
/// Age is `now_ms - item.spooled_at_ms` (saturating at zero). With no maximum age
/// configured nothing is scanned and the report only carries the current spool size.
///
/// An item is deleted from the spool only after its DLQ write succeeded, and it is
/// reported in `expired` only once that delete succeeded. Items that cannot be read,
/// written to the DLQ, or deleted are left in place and picked up by a later sweep.
///
/// # Errors
/// Propagates failures of `backend.list()` and `backend.total_bytes()`.
pub async fn gc_expired(&mut self, now_ms: u64) -> Result<GcReport, ToolError> {
    let max_age_ms = match self.spec.retention.max_age_hours {
        Some(hours) => hours.saturating_mul(3_600_000),
        None => {
            return Ok(GcReport {
                expired: Vec::new(),
                spool_bytes: self.backend.total_bytes().await?,
            });
        }
    };

    let mut expired = Vec::new();
    for meta in self.backend.list().await? {
        let item = match self.backend.get(&meta.key).await {
            Ok(item) => item,
            Err(_) => continue,
        };
        if now_ms.saturating_sub(item.spooled_at_ms) < max_age_ms {
            continue;
        }
        // Never drop the spool copy unless the DLQ copy was written.
        if self.dlq.put(&item).await.is_err() {
            continue;
        }
        if self.backend.delete(&meta.key).await.is_ok() {
            expired.push(DeadLetter::from_item(&item, "retention_expired"));
        }
    }

    Ok(GcReport {
        expired,
        spool_bytes: self.backend.total_bytes().await?,
    })
}
pub async fn drain<F, Fut>(&mut self, mut dispatch: F) -> Result<DrainReport, ToolError>
where
F: FnMut(SpoolItem) -> Fut,
Fut: std::future::Future<Output = DrainResult>,
{
let metas = self.backend.list().await?; let mut items = Vec::with_capacity(metas.len());
for m in metas {
if let Ok(it) = self.backend.get(&m.key).await {
items.push((m.key, it));
}
}
let mut report = DrainReport::default();
let max_attempts = self.spec.drain.max_replay_attempts.max(1);
let lane_of = |it: &SpoolItem| -> String {
match self.spec.ordering {
OrderingMode::PerKey => it
.lane(OrderingMode::PerKey)
.unwrap_or_else(|| "__global__".to_string()),
OrderingMode::Global | OrderingMode::None => "__global__".to_string(),
}
};
let mut stalled_lanes: HashSet<String> = HashSet::new();
let ordered = !matches!(self.spec.ordering, OrderingMode::None);
for (key, mut item) in items {
let lane = lane_of(&item);
if ordered && stalled_lanes.contains(&lane) {
report.remaining += 1;
continue; }
if self.seen.contains(&item.dedup_key) {
self.backend.delete(&key).await.ok();
report.deduped += 1;
continue;
}
match dispatch(item.clone()).await {
Ok(()) => {
self.seen.insert(item.dedup_key.clone());
self.backend.delete(&key).await.ok(); report.replayed += 1;
}
Err(_) => {
item.attempts = item.attempts.saturating_add(1);
if item.attempts >= max_attempts {
self.dlq.put(&item).await.ok();
self.backend.delete(&key).await.ok();
report.dead_lettered.push(DeadLetter::from_item(&item, "max_replay_attempts"));
} else {
self.backend.put(&item).await.ok();
report.remaining += 1;
if ordered {
stalled_lanes.insert(lane);
}
}
}
}
}
report.fully_drained = report.remaining == 0;
Ok(report)
}
/// Returns `true` when `drain.on_recovery` is `DrainOnRecovery::OrderedThenLive`.
pub fn drain_before_live(&self) -> bool {
    let policy = &self.spec.drain.on_recovery;
    matches!(policy, DrainOnRecovery::OrderedThenLive)
}
}
/// Reference returned by `SpoolEngine::spool` for an item that was written to the spool.
///
/// All fields are plain strings and integers, so the type derives `Eq` in addition to
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpooledRef {
    /// Value of the item's `spool_ref()`.
    pub spool_ref: String,
    /// `sha256` copied from the spooled item.
    pub sha256: String,
    /// `recv_seq` copied from the spooled item.
    pub recv_seq: u64,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::spool::backend::{SpoolBackend, SpoolMeta};
use crate::tools::source::PolledMessage;
use std::sync::Arc;
use tokio::sync::Mutex;
/// In-memory `SpoolBackend` used by the tests; clones share the same underlying map.
#[derive(Default, Clone)]
struct MemBackend {
/// Stored items keyed by `SpoolItem::object_key()`; a `BTreeMap`, so listing walks
/// the keys in sorted order.
items: Arc<Mutex<std::collections::BTreeMap<String, SpoolItem>>>,
}
#[async_trait::async_trait]
impl SpoolBackend for MemBackend {
    /// Backend identifier.
    fn kind(&self) -> &'static str {
        "mem"
    }

    /// Stores a clone of `item` under its `object_key()`, replacing any previous entry.
    async fn put(&self, item: &SpoolItem) -> Result<(), ToolError> {
        let mut store = self.items.lock().await;
        store.insert(item.object_key(), item.clone());
        Ok(())
    }

    /// Lists every stored key in map order, with the length of `to_bytes()` as size.
    async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError> {
        let store = self.items.lock().await;
        let mut metas = Vec::with_capacity(store.len());
        for (key, stored) in store.iter() {
            metas.push(SpoolMeta {
                key: key.clone(),
                size: stored.to_bytes().len() as u64,
            });
        }
        Ok(metas)
    }

    /// Returns a clone of the item stored under `key`, or `ExecutionFailed` if absent.
    async fn get(&self, key: &str) -> Result<SpoolItem, ToolError> {
        let store = self.items.lock().await;
        match store.get(key) {
            Some(found) => Ok(found.clone()),
            None => Err(ToolError::ExecutionFailed(format!("no {key}"))),
        }
    }

    /// Removes `key` if present; a missing key is not an error.
    async fn delete(&self, key: &str) -> Result<(), ToolError> {
        let mut store = self.items.lock().await;
        store.remove(key);
        Ok(())
    }
}
/// Builds a `PolledMessage` with the given id and payload; when `key` is present it
/// is stored as the `ordering_key` header. All other fields are null / `None`.
fn msg(id: &str, data: serde_json::Value, key: Option<&str>) -> PolledMessage {
    let headers: serde_json::Map<String, serde_json::Value> = key
        .into_iter()
        .map(|k| ("ordering_key".to_string(), serde_json::Value::from(k)))
        .collect();
    let id = id.to_string();
    PolledMessage {
        id,
        data,
        headers,
        attributes: serde_json::Value::Null,
        metadata: serde_json::Value::Null,
        ack_id: None,
    }
}
/// Builds a test `SpoolItem` for message `id` whose payload is `{"seq": seq}`.
///
/// `seq` is passed twice to `SpoolItem::new`; presumably once as the receive sequence
/// and once as the spool timestamp — verify against the constructor's signature.
fn item(seq: u64, id: &str, key: Option<&str>) -> SpoolItem {
    let payload = serde_json::json!({"seq": seq});
    let polled = msg(id, payload, key);
    let ordering_key = key.map(str::to_string);
    SpoolItem::new(
        "subscriptions/t",
        "nats",
        polled,
        None,
        seq,
        ordering_key,
        "default",
        "circuit_open",
        seq,
    )
}
fn engine(spec: SpoolSpec) -> SpoolEngine {
SpoolEngine::new(spec, Box::new(MemBackend::default()), Box::new(MemBackend::default()))
}
fn spec_with(ordering: OrderingMode, max_attempts: u32) -> SpoolSpec {
let mut s = SpoolSpec::off();
s.mode = super::super::SpoolMode::BufferAndAck;
s.ordering = ordering;
s.drain.max_replay_attempts = max_attempts;
s.retention.max_bytes = None; s
}
#[tokio::test]
async fn drain_replays_in_global_order_then_gcs() {
let mut eng = engine(spec_with(OrderingMode::Global, 3));
for (seq, id) in [(3, "c"), (1, "a"), (2, "b")] {
eng.spool(&item(seq, id, None)).await.unwrap();
}
let order = Arc::new(Mutex::new(Vec::<u64>::new()));
let o2 = order.clone();
let report = eng
.drain(move |it| {
let o = o2.clone();
async move {
o.lock().await.push(it.recv_seq);
Ok(())
}
})
.await
.unwrap();
assert_eq!(report.replayed, 3);
assert!(report.fully_drained);
assert_eq!(*order.lock().await, vec![1, 2, 3]); assert!(eng.is_empty().await.unwrap()); }
#[tokio::test]
async fn drain_idempotency_skips_already_dispatched() {
let mut eng = engine(spec_with(OrderingMode::None, 3));
let mut a = item(1, "dup", None);
a.dedup_key = "dup".into();
let mut b = item(2, "dup", None);
b.dedup_key = "dup".into();
eng.spool(&a).await.unwrap();
eng.spool(&b).await.unwrap();
let count = Arc::new(Mutex::new(0u32));
let c2 = count.clone();
let report = eng
.drain(move |_it| {
let c = c2.clone();
async move {
*c.lock().await += 1;
Ok(())
}
})
.await
.unwrap();
assert_eq!(*count.lock().await, 1); assert_eq!(report.replayed, 1);
assert_eq!(report.deduped, 1);
}
#[tokio::test]
async fn poison_message_dead_letters_after_max_attempts() {
let mut eng = engine(spec_with(OrderingMode::None, 2));
eng.spool(&item(1, "poison", None)).await.unwrap();
let _ = eng.drain(|_it| async { Err(ToolError::ExecutionFailed("down".into())) }).await.unwrap();
let r2 = eng.drain(|_it| async { Err(ToolError::ExecutionFailed("down".into())) }).await.unwrap();
assert_eq!(r2.dead_lettered.len(), 1);
assert_eq!(r2.dead_lettered[0].reason, "max_replay_attempts");
assert!(eng.is_empty().await.unwrap()); }
#[tokio::test]
async fn global_order_stops_on_transient_failure_preserving_order() {
let mut eng = engine(spec_with(OrderingMode::Global, 5));
for (seq, id) in [(1, "a"), (2, "b"), (3, "c")] {
eng.spool(&item(seq, id, None)).await.unwrap();
}
let seen = Arc::new(Mutex::new(Vec::<u64>::new()));
let s2 = seen.clone();
let report = eng
.drain(move |it| {
let s = s2.clone();
async move {
s.lock().await.push(it.recv_seq);
if it.recv_seq == 2 {
Err(ToolError::ExecutionFailed("still down".into()))
} else {
Ok(())
}
}
})
.await
.unwrap();
assert_eq!(*seen.lock().await, vec![1, 2]);
assert_eq!(report.replayed, 1);
assert!(!report.fully_drained);
assert_eq!(report.remaining, 2); }
/// Under per-key ordering a failure on key "B" does not hold back the items of key "A".
#[tokio::test]
async fn per_key_lanes_are_independent() {
    let mut spool_engine = engine(spec_with(OrderingMode::PerKey, 5));
    let arrivals = [(1, "a1", "A"), (2, "b1", "B"), (3, "a2", "A")];
    for (seq, id, lane_key) in arrivals {
        spool_engine.spool(&item(seq, id, Some(lane_key))).await.unwrap();
    }

    let report = spool_engine
        .drain(|it| async move {
            match it.ordering_key.as_deref() {
                Some("B") => Err(ToolError::ExecutionFailed("B down".into())),
                _ => Ok(()),
            }
        })
        .await
        .unwrap();

    assert_eq!(report.replayed, 2);
    assert_eq!(report.remaining, 1);
}
/// With a 10-byte ceiling and `StopAcking`, a 100-byte message is rejected.
#[tokio::test]
async fn admit_stop_acking_when_ceiling_hit() {
    let mut spec = spec_with(OrderingMode::None, 3);
    spec.retention.on_full = OnFull::StopAcking;
    spec.retention.max_bytes = Some(10);
    let mut spool_engine = engine(spec);
    spool_engine.spool(&item(1, "a", None)).await.unwrap();

    let decision = spool_engine.admit(0, 100).await.unwrap();

    assert_eq!(decision, Admission::RejectStopAck);
}
/// With the ceiling lowered to half of the spooled bytes, `DropToDlq` evicts starting
/// with the first-listed item (`recv_seq` 1) and tags every record `on_full_evicted`.
#[tokio::test]
async fn admit_drop_to_dlq_evicts_oldest() {
    let mut spec = spec_with(OrderingMode::None, 3);
    spec.retention.on_full = OnFull::DropToDlq;
    let mut eng = engine(spec);
    eng.spool(&item(1, "a", None)).await.unwrap();
    eng.spool(&item(2, "b", None)).await.unwrap();

    // Lower the ceiling only after spooling, so both items are already present.
    let bytes = eng.spool_bytes().await.unwrap();
    eng.spec.retention.max_bytes = Some(bytes / 2);

    match eng.admit(0, 1).await.unwrap() {
        Admission::AcceptAfterEvict(evicted) => {
            assert!(!evicted.is_empty());
            // The test name promises oldest-first: the lowest recv_seq must go first.
            assert_eq!(evicted[0].recv_seq, 1);
            assert!(evicted.iter().all(|d| d.reason == "on_full_evicted"));
        }
        other => panic!("expected eviction, got {other:?}"),
    }
}
#[tokio::test]
async fn gc_expired_removes_old_items() {
let mut s = spec_with(OrderingMode::None, 3);
s.retention.max_age_hours = Some(1);
let mut eng = engine(s);
eng.spool(&item(1, "old", None)).await.unwrap();
let report = eng.gc_expired(2 * 3_600_000).await.unwrap();
assert_eq!(report.expired.len(), 1);
assert_eq!(report.expired[0].reason, "retention_expired");
assert!(eng.is_empty().await.unwrap());
}
/// The high-water mark is 0 for an empty spool and the largest spooled sequence
/// number afterwards, regardless of spooling order.
#[tokio::test]
async fn high_water_recv_seq_recovers_max_from_backlog() {
    let mut spool_engine = engine(spec_with(OrderingMode::Global, 3));
    let empty_mark = spool_engine.high_water_recv_seq().await.unwrap();
    assert_eq!(empty_mark, 0);

    let backlog = [(3, "c"), (1, "a"), (42, "z"), (2, "b")];
    for (seq, id) in backlog {
        spool_engine.spool(&item(seq, id, None)).await.unwrap();
    }

    let recovered_mark = spool_engine.high_water_recv_seq().await.unwrap();
    assert_eq!(recovered_mark, 42);
}
/// A dedup key registered through `mark_dispatched` makes a later drain drop the
/// matching spooled item instead of replaying it.
#[tokio::test]
async fn mark_dispatched_dedupes_subsequent_spool_replay() {
    let mut spool_engine = engine(spec_with(OrderingMode::None, 3));
    spool_engine.mark_dispatched("m1");

    let mut already_sent = item(1, "m1", None);
    already_sent.dedup_key = "m1".into();
    spool_engine.spool(&already_sent).await.unwrap();

    let report = spool_engine
        .drain(|_it| async { Ok(()) })
        .await
        .unwrap();

    assert_eq!(report.replayed, 0);
    assert_eq!(report.deduped, 1);
}
}