1use std::collections::VecDeque;
20use std::time::{SystemTime, UNIX_EPOCH};
21
22use serde::{Deserialize, Serialize};
23
24use crate::constraint::Constraint;
25use crate::error::{CrdtError, Result};
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
29pub enum CompensationHint {
30 RetryWithDifferentValue {
33 field: String,
34 conflicting_value: String,
35 suggestion: String,
36 },
37
38 DeleteThenRetry {
41 collection: String,
42 conflicting_key: String,
43 },
44
45 CreateReferencedRow {
48 ref_collection: String,
49 ref_key: String,
50 missing_value: String,
51 },
52
53 ProvideRequiredField { field: String },
55
56 ManualIntervention { reason: String },
58}
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
62pub struct DeadLetter {
63 pub id: u64,
65
66 pub peer_id: u64,
68
69 #[serde(default)]
71 pub user_id: u64,
72
73 #[serde(default)]
75 pub tenant_id: u64,
76
77 pub delta: Vec<u8>,
79
80 pub violated_constraint: String,
82
83 pub collection: String,
85
86 pub reason: String,
88
89 pub hint: CompensationHint,
91
92 pub rejected_at: u64,
94
95 pub retry_count: u32,
97}
98
99pub struct DeadLetterQueue {
104 entries: VecDeque<DeadLetter>,
105 capacity: usize,
106 next_id: u64,
107}
108
109impl DeadLetterQueue {
110 pub fn new(capacity: usize) -> Self {
112 Self {
113 entries: VecDeque::with_capacity(capacity.min(1024)),
114 capacity,
115 next_id: 1,
116 }
117 }
118
119 #[allow(clippy::too_many_arguments)]
121 pub fn enqueue(
122 &mut self,
123 peer_id: u64,
124 user_id: u64,
125 tenant_id: u64,
126 delta: Vec<u8>,
127 constraint: &Constraint,
128 reason: String,
129 hint: CompensationHint,
130 ) -> Result<u64> {
131 if self.entries.len() >= self.capacity {
132 return Err(CrdtError::DlqFull {
133 capacity: self.capacity,
134 pending: self.entries.len(),
135 });
136 }
137
138 let id = self.next_id;
139 self.next_id += 1;
140
141 let now = SystemTime::now()
142 .duration_since(UNIX_EPOCH)
143 .unwrap_or_default()
144 .as_millis() as u64;
145
146 self.entries.push_back(DeadLetter {
147 id,
148 peer_id,
149 user_id,
150 tenant_id,
151 delta,
152 violated_constraint: constraint.name.clone(),
153 collection: constraint.collection.clone(),
154 reason,
155 hint,
156 rejected_at: now,
157 retry_count: 0,
158 });
159
160 Ok(id)
161 }
162
163 pub fn peek(&self) -> Option<&DeadLetter> {
165 self.entries.front()
166 }
167
168 pub fn dequeue(&mut self) -> Option<DeadLetter> {
170 self.entries.pop_front()
171 }
172
173 pub fn get(&self, id: u64) -> Option<&DeadLetter> {
175 self.entries.iter().find(|dl| dl.id == id)
176 }
177
178 pub fn remove(&mut self, id: u64) -> Option<DeadLetter> {
180 if let Some(pos) = self.entries.iter().position(|dl| dl.id == id) {
181 self.entries.remove(pos)
182 } else {
183 None
184 }
185 }
186
187 pub fn purge_collection(&mut self, tenant_id: u64, collection: &str) -> usize {
192 let before = self.entries.len();
193 self.entries
194 .retain(|e| !(e.tenant_id == tenant_id && e.collection == collection));
195 before - self.entries.len()
196 }
197
198 pub fn drain_peer(&mut self, peer_id: u64) -> Vec<DeadLetter> {
200 let mut drained = Vec::new();
201 self.entries.retain(|dl| {
202 if dl.peer_id == peer_id {
203 drained.push(dl.clone());
204 false
205 } else {
206 true
207 }
208 });
209 drained
210 }
211
212 pub fn len(&self) -> usize {
214 self.entries.len()
215 }
216
217 pub fn is_empty(&self) -> bool {
218 self.entries.is_empty()
219 }
220
221 pub fn capacity(&self) -> usize {
223 self.capacity
224 }
225}
226
227#[cfg(test)]
228mod tests {
229 use super::*;
230 use crate::constraint::{Constraint, ConstraintKind};
231
232 fn test_constraint() -> Constraint {
233 Constraint {
234 name: "users_email_unique".into(),
235 collection: "users".into(),
236 field: "email".into(),
237 kind: ConstraintKind::Unique,
238 }
239 }
240
241 #[test]
242 fn enqueue_and_dequeue() {
243 let mut dlq = DeadLetterQueue::new(10);
244 let c = test_constraint();
245
246 let id = dlq
247 .enqueue(
248 42,
249 0,
250 0,
251 b"delta-bytes".to_vec(),
252 &c,
253 "email already exists".into(),
254 CompensationHint::RetryWithDifferentValue {
255 field: "email".into(),
256 conflicting_value: "alice@example.com".into(),
257 suggestion: "alice+1@example.com".into(),
258 },
259 )
260 .unwrap();
261
262 assert_eq!(dlq.len(), 1);
263 assert_eq!(id, 1);
264
265 let dl = dlq.dequeue().unwrap();
266 assert_eq!(dl.peer_id, 42);
267 assert_eq!(dl.violated_constraint, "users_email_unique");
268 assert!(dlq.is_empty());
269 }
270
271 #[test]
272 fn capacity_enforced() {
273 let mut dlq = DeadLetterQueue::new(2);
274 let c = test_constraint();
275 let hint = CompensationHint::ManualIntervention {
276 reason: "test".into(),
277 };
278
279 dlq.enqueue(1, 0, 0, vec![], &c, "r1".into(), hint.clone())
280 .unwrap();
281 dlq.enqueue(2, 0, 0, vec![], &c, "r2".into(), hint.clone())
282 .unwrap();
283
284 let err = dlq.enqueue(3, 0, 0, vec![], &c, "r3".into(), hint);
285 assert!(matches!(err, Err(CrdtError::DlqFull { .. })));
286 }
287
288 #[test]
289 fn purge_collection_drops_only_matching_entries() {
290 let mut dlq = DeadLetterQueue::new(10);
291 let users_c = Constraint {
292 name: "users_email_unique".into(),
293 collection: "users".into(),
294 field: "email".into(),
295 kind: ConstraintKind::Unique,
296 };
297 let orders_c = Constraint {
298 name: "orders_sku_unique".into(),
299 collection: "orders".into(),
300 field: "sku".into(),
301 kind: ConstraintKind::Unique,
302 };
303 let hint = CompensationHint::ManualIntervention { reason: "t".into() };
304
305 dlq.enqueue(10, 0, 1, vec![], &users_c, "a".into(), hint.clone())
309 .unwrap();
310 dlq.enqueue(11, 0, 1, vec![], &users_c, "b".into(), hint.clone())
311 .unwrap();
312 dlq.enqueue(12, 0, 1, vec![], &orders_c, "c".into(), hint.clone())
313 .unwrap();
314 dlq.enqueue(13, 0, 2, vec![], &users_c, "d".into(), hint.clone())
315 .unwrap();
316
317 let removed = dlq.purge_collection(1, "users");
318 assert_eq!(removed, 2);
319 assert_eq!(dlq.len(), 2);
320
321 assert_eq!(dlq.purge_collection(1, "users"), 0);
323
324 let remaining: Vec<_> = (0..dlq.len()).map(|_| dlq.dequeue().unwrap()).collect();
326 assert!(
327 remaining
328 .iter()
329 .any(|d| d.collection == "orders" && d.tenant_id == 1)
330 );
331 assert!(
332 remaining
333 .iter()
334 .any(|d| d.collection == "users" && d.tenant_id == 2)
335 );
336 }
337
338 #[test]
339 fn drain_by_peer() {
340 let mut dlq = DeadLetterQueue::new(10);
341 let c = test_constraint();
342 let hint = CompensationHint::ManualIntervention {
343 reason: "test".into(),
344 };
345
346 dlq.enqueue(1, 0, 0, vec![], &c, "a".into(), hint.clone())
347 .unwrap();
348 dlq.enqueue(2, 0, 0, vec![], &c, "b".into(), hint.clone())
349 .unwrap();
350 dlq.enqueue(1, 0, 0, vec![], &c, "c".into(), hint).unwrap();
351
352 let peer1 = dlq.drain_peer(1);
353 assert_eq!(peer1.len(), 2);
354 assert_eq!(dlq.len(), 1);
355 }
356
357 #[test]
358 fn remove_by_id() {
359 let mut dlq = DeadLetterQueue::new(10);
360 let c = test_constraint();
361 let hint = CompensationHint::ManualIntervention {
362 reason: "test".into(),
363 };
364
365 let id1 = dlq
366 .enqueue(1, 0, 0, vec![], &c, "a".into(), hint.clone())
367 .unwrap();
368 let _id2 = dlq.enqueue(1, 0, 0, vec![], &c, "b".into(), hint).unwrap();
369
370 let removed = dlq.remove(id1).unwrap();
371 assert_eq!(removed.reason, "a");
372 assert_eq!(dlq.len(), 1);
373 }
374}