use std::collections::VecDeque;
use std::time::{Duration, Instant};
use tracing::{debug, warn};
const DEFAULT_MAX_RETRIES: u32 = 5;
const BASE_BACKOFF: Duration = Duration::from_millis(100);
const MAX_BACKOFF: Duration = Duration::from_secs(10);
#[derive(Debug, Clone)]
pub struct RetryEntry {
pub tenant_id: u64,
pub collection: String,
pub row_id: String,
pub operation: String,
pub trigger_name: String,
pub new_fields: Option<std::collections::HashMap<String, nodedb_types::Value>>,
pub old_fields: Option<std::collections::HashMap<String, nodedb_types::Value>>,
pub attempts: u32,
pub last_error: String,
pub next_retry_at: Instant,
pub source_lsn: u64,
pub source_sequence: u64,
pub cascade_depth: u32,
}
pub struct TriggerRetryQueue {
queue: VecDeque<RetryEntry>,
max_retries: u32,
}
impl TriggerRetryQueue {
pub fn new() -> Self {
Self {
queue: VecDeque::new(),
max_retries: DEFAULT_MAX_RETRIES,
}
}
pub fn enqueue(&mut self, mut entry: RetryEntry) {
entry.attempts += 1;
let backoff = compute_backoff(entry.attempts);
entry.next_retry_at = Instant::now() + backoff;
debug!(
trigger = %entry.trigger_name,
collection = %entry.collection,
attempt = entry.attempts,
backoff_ms = backoff.as_millis(),
"trigger retry enqueued"
);
self.queue.push_back(entry);
}
pub fn drain_due(&mut self) -> (Vec<RetryEntry>, Vec<RetryEntry>) {
let now = Instant::now();
let mut ready = Vec::new();
let mut exhausted = Vec::new();
while self.queue.front().is_some_and(|e| e.next_retry_at <= now) {
let Some(entry) = self.queue.pop_front() else {
break;
};
if entry.attempts >= self.max_retries {
warn!(
trigger = %entry.trigger_name,
collection = %entry.collection,
attempts = entry.attempts,
"trigger exhausted max retries → DLQ"
);
exhausted.push(entry);
} else {
ready.push(entry);
}
}
(ready, exhausted)
}
pub fn len(&self) -> usize {
self.queue.len()
}
pub fn is_empty(&self) -> bool {
self.queue.is_empty()
}
pub fn next_retry_delay(&self) -> Option<Duration> {
self.queue.front().map(|e| {
let now = Instant::now();
if e.next_retry_at > now {
e.next_retry_at - now
} else {
Duration::ZERO
}
})
}
}
impl Default for TriggerRetryQueue {
fn default() -> Self {
Self::new()
}
}
fn compute_backoff(attempt: u32) -> Duration {
let multiplier = 1u64 << (attempt.saturating_sub(1).min(20));
let delay = BASE_BACKOFF.saturating_mul(multiplier as u32);
delay.min(MAX_BACKOFF)
}
#[cfg(test)]
mod tests {
use super::*;
fn make_entry(trigger: &str) -> RetryEntry {
RetryEntry {
tenant_id: 1,
collection: "orders".into(),
row_id: "order-1".into(),
operation: "INSERT".into(),
trigger_name: trigger.into(),
new_fields: None,
old_fields: None,
attempts: 0,
last_error: "timeout".into(),
next_retry_at: Instant::now(),
source_lsn: 100,
source_sequence: 1,
cascade_depth: 0,
}
}
#[test]
fn backoff_increases_exponentially() {
assert_eq!(compute_backoff(1), Duration::from_millis(100));
assert_eq!(compute_backoff(2), Duration::from_millis(200));
assert_eq!(compute_backoff(3), Duration::from_millis(400));
assert_eq!(compute_backoff(4), Duration::from_millis(800));
assert_eq!(compute_backoff(5), Duration::from_millis(1600));
}
#[test]
fn backoff_capped() {
let big = compute_backoff(30);
assert!(big <= MAX_BACKOFF);
}
#[test]
fn enqueue_and_drain() {
let mut q = TriggerRetryQueue::new();
let mut entry = make_entry("t1");
entry.next_retry_at = Instant::now() - Duration::from_secs(1); q.enqueue(entry);
std::thread::sleep(Duration::from_millis(110));
let (ready, exhausted) = q.drain_due();
assert_eq!(ready.len(), 1);
assert!(exhausted.is_empty());
assert_eq!(ready[0].attempts, 1);
}
#[test]
fn exhausted_after_max_retries() {
let mut q = TriggerRetryQueue::new();
q.max_retries = 2;
let mut entry = make_entry("t1");
entry.attempts = 2; entry.next_retry_at = Instant::now() - Duration::from_secs(1);
q.queue.push_back(entry);
let (ready, exhausted) = q.drain_due();
assert!(ready.is_empty());
assert_eq!(exhausted.len(), 1);
}
}