Skip to main content

nodedb_crdt/
dead_letter.rs

// SPDX-License-Identifier: BUSL-1.1

//! Dead-letter queue for rejected CRDT deltas.
//!
//! When a delta fails constraint validation at commit time, it's not silently
//! dropped — it's routed to a dead-letter queue with actionable compensation
//! hints that tell the application exactly how to recover.
//!
//! ## Developer Experience
//!
//! The DX of handling constraint rejections must be pristine. If developers
//! have to write massive amounts of complex undo/compensation logic, they
//! will abandon the database. The DLQ provides:
//!
//! 1. The original delta that was rejected.
//! 2. Which constraint was violated and why.
//! 3. A machine-readable `CompensationHint` suggesting how to fix it.
18
19use std::collections::VecDeque;
20use std::time::{SystemTime, UNIX_EPOCH};
21
22use serde::{Deserialize, Serialize};
23
24use crate::constraint::Constraint;
25use crate::error::{CrdtError, Result};
26
27/// Suggested action the application should take to resolve a constraint violation.
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub enum CompensationHint {
30    /// Retry with a different value for the conflicting field.
31    /// Example: UNIQUE violation — suggest appending a suffix.
32    RetryWithDifferentValue {
33        field: String,
34        conflicting_value: String,
35        suggestion: String,
36    },
37
38    /// Delete the conflicting row first, then retry.
39    /// Example: UNIQUE violation where the application wants to "upsert".
40    DeleteThenRetry {
41        collection: String,
42        conflicting_key: String,
43    },
44
45    /// The referenced row doesn't exist — create it first.
46    /// Example: FK violation — the parent row is missing.
47    CreateReferencedRow {
48        ref_collection: String,
49        ref_key: String,
50        missing_value: String,
51    },
52
53    /// The field must not be empty — provide a value.
54    ProvideRequiredField { field: String },
55
56    /// No automatic compensation available — manual intervention required.
57    ManualIntervention { reason: String },
58}
59
60/// A rejected delta with metadata for debugging and recovery.
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct DeadLetter {
63    /// Unique ID for this dead letter entry.
64    pub id: u64,
65
66    /// The peer that produced this delta.
67    pub peer_id: u64,
68
69    /// The authenticated user_id that submitted this delta (0 = unauthenticated/legacy).
70    #[serde(default)]
71    pub user_id: u64,
72
73    /// The tenant this delta belongs to (0 = system).
74    #[serde(default)]
75    pub tenant_id: u64,
76
77    /// The raw delta bytes that were rejected.
78    pub delta: Vec<u8>,
79
80    /// The constraint that was violated.
81    pub violated_constraint: String,
82
83    /// The collection where the violation occurred.
84    pub collection: String,
85
86    /// Human-readable explanation of why the delta was rejected.
87    pub reason: String,
88
89    /// Machine-readable hint for how to fix the violation.
90    pub hint: CompensationHint,
91
92    /// Timestamp when the rejection occurred (unix millis).
93    pub rejected_at: u64,
94
95    /// Number of times this delta has been retried.
96    pub retry_count: u32,
97}
98
99/// Bounded dead-letter queue.
100///
101/// Stores rejected deltas for later inspection and retry. The queue is bounded
102/// to prevent unbounded memory growth from a flood of invalid deltas.
103pub struct DeadLetterQueue {
104    entries: VecDeque<DeadLetter>,
105    capacity: usize,
106    next_id: u64,
107}
108
109impl DeadLetterQueue {
110    /// Create a new DLQ with the given capacity.
111    pub fn new(capacity: usize) -> Self {
112        Self {
113            entries: VecDeque::with_capacity(capacity.min(1024)),
114            capacity,
115            next_id: 1,
116        }
117    }
118
119    /// Enqueue a rejected delta with full auth context.
120    #[allow(clippy::too_many_arguments)]
121    pub fn enqueue(
122        &mut self,
123        peer_id: u64,
124        user_id: u64,
125        tenant_id: u64,
126        delta: Vec<u8>,
127        constraint: &Constraint,
128        reason: String,
129        hint: CompensationHint,
130    ) -> Result<u64> {
131        if self.entries.len() >= self.capacity {
132            return Err(CrdtError::DlqFull {
133                capacity: self.capacity,
134                pending: self.entries.len(),
135            });
136        }
137
138        let id = self.next_id;
139        self.next_id += 1;
140
141        let now = SystemTime::now()
142            .duration_since(UNIX_EPOCH)
143            .unwrap_or_default()
144            .as_millis() as u64;
145
146        self.entries.push_back(DeadLetter {
147            id,
148            peer_id,
149            user_id,
150            tenant_id,
151            delta,
152            violated_constraint: constraint.name.clone(),
153            collection: constraint.collection.clone(),
154            reason,
155            hint,
156            rejected_at: now,
157            retry_count: 0,
158        });
159
160        Ok(id)
161    }
162
163    /// Peek at the oldest dead letter without removing it.
164    pub fn peek(&self) -> Option<&DeadLetter> {
165        self.entries.front()
166    }
167
168    /// Dequeue the oldest dead letter for retry or inspection.
169    pub fn dequeue(&mut self) -> Option<DeadLetter> {
170        self.entries.pop_front()
171    }
172
173    /// Get a dead letter by ID.
174    pub fn get(&self, id: u64) -> Option<&DeadLetter> {
175        self.entries.iter().find(|dl| dl.id == id)
176    }
177
178    /// Remove a dead letter by ID (e.g., after successful manual resolution).
179    pub fn remove(&mut self, id: u64) -> Option<DeadLetter> {
180        if let Some(pos) = self.entries.iter().position(|dl| dl.id == id) {
181            self.entries.remove(pos)
182        } else {
183            None
184        }
185    }
186
187    /// Drop every entry for `(tenant_id, collection)`. Returns the
188    /// number of entries removed. Called during collection hard-delete
189    /// so stale rejected deltas don't resurface if a collection of the
190    /// same name is recreated inside the retention window's shadow.
191    pub fn purge_collection(&mut self, tenant_id: u64, collection: &str) -> usize {
192        let before = self.entries.len();
193        self.entries
194            .retain(|e| !(e.tenant_id == tenant_id && e.collection == collection));
195        before - self.entries.len()
196    }
197
198    /// Drain all entries for a specific peer.
199    pub fn drain_peer(&mut self, peer_id: u64) -> Vec<DeadLetter> {
200        let mut drained = Vec::new();
201        self.entries.retain(|dl| {
202            if dl.peer_id == peer_id {
203                drained.push(dl.clone());
204                false
205            } else {
206                true
207            }
208        });
209        drained
210    }
211
212    /// Number of pending dead letters.
213    pub fn len(&self) -> usize {
214        self.entries.len()
215    }
216
217    pub fn is_empty(&self) -> bool {
218        self.entries.is_empty()
219    }
220
221    /// Capacity of the queue.
222    pub fn capacity(&self) -> usize {
223        self.capacity
224    }
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraint::{Constraint, ConstraintKind};

    /// Build a UNIQUE constraint fixture for the given collection and field.
    fn unique_constraint(
        name: &'static str,
        collection: &'static str,
        field: &'static str,
    ) -> Constraint {
        Constraint {
            name: name.into(),
            collection: collection.into(),
            field: field.into(),
            kind: ConstraintKind::Unique,
        }
    }

    /// Default fixture: UNIQUE(email) on `users`.
    fn test_constraint() -> Constraint {
        unique_constraint("users_email_unique", "users", "email")
    }

    /// Hint used where the test does not care about the hint's content.
    fn manual_hint() -> CompensationHint {
        CompensationHint::ManualIntervention {
            reason: "test".into(),
        }
    }

    #[test]
    fn enqueue_and_dequeue() {
        let mut dlq = DeadLetterQueue::new(10);
        let c = test_constraint();

        let id = dlq
            .enqueue(
                42,
                0,
                0,
                b"delta-bytes".to_vec(),
                &c,
                "email already exists".into(),
                CompensationHint::RetryWithDifferentValue {
                    field: "email".into(),
                    conflicting_value: "alice@example.com".into(),
                    suggestion: "alice+1@example.com".into(),
                },
            )
            .unwrap();

        assert_eq!(dlq.len(), 1);
        assert_eq!(id, 1);

        // Peeking exposes the entry without consuming it.
        assert_eq!(dlq.peek().map(|dl| dl.id), Some(1));
        assert_eq!(dlq.len(), 1);

        let dl = dlq.dequeue().unwrap();
        assert_eq!(dl.peer_id, 42);
        assert_eq!(dl.violated_constraint, "users_email_unique");
        assert_eq!(dl.collection, "users");
        assert_eq!(dl.delta, b"delta-bytes".to_vec());
        assert_eq!(dl.retry_count, 0);
        assert!(dlq.is_empty());
        assert!(dlq.peek().is_none());
    }

    #[test]
    fn capacity_enforced() {
        let mut dlq = DeadLetterQueue::new(2);
        let c = test_constraint();
        assert_eq!(dlq.capacity(), 2);

        dlq.enqueue(1, 0, 0, vec![], &c, "r1".into(), manual_hint())
            .unwrap();
        dlq.enqueue(2, 0, 0, vec![], &c, "r2".into(), manual_hint())
            .unwrap();

        let err = dlq.enqueue(3, 0, 0, vec![], &c, "r3".into(), manual_hint());
        assert!(matches!(err, Err(CrdtError::DlqFull { .. })));
        assert_eq!(dlq.len(), 2);

        // Freeing a slot makes room again, and the rejected attempt must not
        // have consumed an ID.
        assert_eq!(dlq.dequeue().map(|dl| dl.id), Some(1));
        let id = dlq
            .enqueue(3, 0, 0, vec![], &c, "r3".into(), manual_hint())
            .unwrap();
        assert_eq!(id, 3);
    }

    #[test]
    fn purge_collection_drops_only_matching_entries() {
        let mut dlq = DeadLetterQueue::new(10);
        let users_c = test_constraint();
        let orders_c = unique_constraint("orders_sku_unique", "orders", "sku");

        // Two entries for tenant 1 / users, one for tenant 1 / orders,
        // one for tenant 2 / users — only the first two should be
        // dropped by `purge_collection(1, "users")`.
        dlq.enqueue(10, 0, 1, vec![], &users_c, "a".into(), manual_hint())
            .unwrap();
        dlq.enqueue(11, 0, 1, vec![], &users_c, "b".into(), manual_hint())
            .unwrap();
        dlq.enqueue(12, 0, 1, vec![], &orders_c, "c".into(), manual_hint())
            .unwrap();
        dlq.enqueue(13, 0, 2, vec![], &users_c, "d".into(), manual_hint())
            .unwrap();

        let removed = dlq.purge_collection(1, "users");
        assert_eq!(removed, 2);
        assert_eq!(dlq.len(), 2);

        // Idempotent — repeated call is a no-op.
        assert_eq!(dlq.purge_collection(1, "users"), 0);

        // Remaining entries are the orders row (t1) and users row (t2).
        let remaining: Vec<_> = std::iter::from_fn(|| dlq.dequeue()).collect();
        assert_eq!(remaining.len(), 2);
        assert!(
            remaining
                .iter()
                .any(|d| d.collection == "orders" && d.tenant_id == 1)
        );
        assert!(
            remaining
                .iter()
                .any(|d| d.collection == "users" && d.tenant_id == 2)
        );
    }

    #[test]
    fn drain_by_peer() {
        let mut dlq = DeadLetterQueue::new(10);
        let c = test_constraint();

        dlq.enqueue(1, 0, 0, vec![], &c, "a".into(), manual_hint())
            .unwrap();
        dlq.enqueue(2, 0, 0, vec![], &c, "b".into(), manual_hint())
            .unwrap();
        dlq.enqueue(1, 0, 0, vec![], &c, "c".into(), manual_hint())
            .unwrap();

        let peer1 = dlq.drain_peer(1);
        assert_eq!(peer1.len(), 2);
        assert_eq!(dlq.len(), 1);

        // Drained entries come back oldest-first; the other peer's entry stays.
        let drained_reasons: Vec<&str> = peer1.iter().map(|d| d.reason.as_str()).collect();
        assert_eq!(drained_reasons, ["a", "c"]);
        assert_eq!(dlq.peek().map(|d| d.peer_id), Some(2));

        // Nothing left for that peer.
        assert!(dlq.drain_peer(1).is_empty());
    }

    #[test]
    fn remove_by_id() {
        let mut dlq = DeadLetterQueue::new(10);
        let c = test_constraint();

        let id1 = dlq
            .enqueue(1, 0, 0, vec![], &c, "a".into(), manual_hint())
            .unwrap();
        let id2 = dlq
            .enqueue(1, 0, 0, vec![], &c, "b".into(), manual_hint())
            .unwrap();

        assert_eq!(dlq.get(id2).map(|d| d.reason.as_str()), Some("b"));

        let removed = dlq.remove(id1).unwrap();
        assert_eq!(removed.reason, "a");
        assert_eq!(dlq.len(), 1);

        // The removed ID is gone for both lookup and a second removal.
        assert!(dlq.get(id1).is_none());
        assert!(dlq.remove(id1).is_none());
        assert_eq!(dlq.len(), 1);
    }
}