Skip to main content

nodedb_crdt/
deferred.rs

1//! Deferred retry queue for CASCADE_DEFER policy.
2//!
3//! When a foreign key violation occurs and CASCADE_DEFER is the configured policy,
4//! the delta is enqueued here with exponential backoff. The queue polls for ready
5//! entries and expires entries that exceed their TTL.
6
7use std::collections::VecDeque;
8
/// A delta parked for a later retry under the CASCADE_DEFER policy.
#[derive(Debug, Clone)]
pub struct DeferredEntry {
    /// Unique entry ID, assigned by [`DeferredQueue::enqueue`].
    pub id: u64,

    /// The peer that produced this delta.
    pub peer_id: u64,

    /// The authenticated user_id that submitted this delta (0 = unauthenticated).
    pub user_id: u64,

    /// The tenant this delta belongs to (0 = system).
    pub tenant_id: u32,

    /// The raw delta bytes.
    pub delta: Vec<u8>,

    /// The collection being modified.
    pub collection: String,

    /// The constraint name that triggered deferral.
    pub constraint_name: String,

    /// Current attempt number (0-indexed). Incremented by
    /// [`DeferredQueue::enqueue_retry`].
    pub attempt: u32,

    /// Maximum allowed retries. The queue stores this value but does not act on
    /// it; deciding when to stop re-enqueueing is left to the caller.
    pub max_retries: u32,

    /// Milliseconds since epoch when this entry should be retried.
    pub next_retry_ms: u64,

    /// Milliseconds since epoch when this entry expires (given up).
    pub ttl_deadline_ms: u64,
}

/// Queue of deferred entries awaiting retry.
///
/// Entries are kept in insertion order. `capacity` is advisory: it bounds the
/// initial allocation and is reported by [`DeferredQueue::capacity`], but
/// enqueueing never rejects or evicts entries.
#[derive(Debug)]
pub struct DeferredQueue {
    entries: VecDeque<DeferredEntry>,
    capacity: usize,
    next_id: u64,
}

impl DeferredQueue {
    /// Base delay used by [`DeferredQueue::enqueue_retry`] for attempt 0.
    const BACKOFF_BASE_MS: u64 = 500;

    /// Upper bound on any single backoff interval.
    const BACKOFF_CAP_MS: u64 = 30_000;

    /// Create a new deferred queue with the given (advisory) capacity.
    ///
    /// At most 1024 slots are pre-allocated regardless of `capacity`, so a very
    /// large configured capacity does not translate into a large up-front
    /// allocation.
    pub fn new(capacity: usize) -> Self {
        Self {
            entries: VecDeque::with_capacity(capacity.min(1024)),
            capacity,
            next_id: 1,
        }
    }

    /// Enqueue a deferred entry and return its newly assigned ID.
    ///
    /// IDs start at 1 and increase by one per call. Deadline arithmetic
    /// saturates at `u64::MAX`, so an extremely large `ttl_secs` or
    /// `first_retry_after_ms` means "effectively never" instead of overflowing.
    ///
    /// # Arguments
    ///
    /// * `peer_id` — the peer that produced the delta
    /// * `user_id` — the authenticated user that submitted the delta (0 = unauthenticated)
    /// * `tenant_id` — the tenant the delta belongs to (0 = system)
    /// * `delta` — the raw delta bytes
    /// * `collection` — the target collection
    /// * `constraint_name` — the constraint that was violated
    /// * `attempt` — current attempt number (usually 0 for first deferral)
    /// * `max_retries` — maximum retries allowed
    /// * `now_ms` — current time in milliseconds since epoch
    /// * `first_retry_after_ms` — milliseconds to wait before first retry
    /// * `ttl_secs` — seconds from now until this entry expires
    // The flat argument list is part of the public signature; keep it as is.
    #[allow(clippy::too_many_arguments)]
    pub fn enqueue(
        &mut self,
        peer_id: u64,
        user_id: u64,
        tenant_id: u32,
        delta: Vec<u8>,
        collection: String,
        constraint_name: String,
        attempt: u32,
        max_retries: u32,
        now_ms: u64,
        first_retry_after_ms: u64,
        ttl_secs: u64,
    ) -> u64 {
        let id = self.next_id;
        self.next_id += 1;

        // Saturate rather than wrap: a wrapped deadline would land in the past
        // and the entry would be expired on the very next sweep.
        let next_retry_ms = now_ms.saturating_add(first_retry_after_ms);
        let ttl_deadline_ms = now_ms.saturating_add(ttl_secs.saturating_mul(1000));

        self.entries.push_back(DeferredEntry {
            id,
            peer_id,
            user_id,
            tenant_id,
            delta,
            collection,
            constraint_name,
            attempt,
            max_retries,
            next_retry_ms,
            ttl_deadline_ms,
        });
        id
    }

    /// Poll for entries ready for retry.
    ///
    /// Removes and returns every entry whose `next_retry_ms <= now_ms`, in
    /// queue order. The whole queue is scanned, because retry times are not
    /// monotonic in insertion order (backoff differs per attempt), so a
    /// not-yet-due entry at the head must not hide due entries behind it.
    ///
    /// `ttl_deadline_ms` is not consulted here; use [`DeferredQueue::expire`]
    /// to drop entries that are past their TTL.
    pub fn poll_ready(&mut self, now_ms: u64) -> Vec<DeferredEntry> {
        self.extract_where(|entry| entry.next_retry_ms <= now_ms)
    }

    /// Expire entries past their TTL deadline.
    ///
    /// Removes and returns every entry whose `ttl_deadline_ms <= now_ms`, in
    /// queue order. These should be routed to the DLQ as unrecoverable.
    pub fn expire(&mut self, now_ms: u64) -> Vec<DeferredEntry> {
        self.extract_where(|entry| entry.ttl_deadline_ms <= now_ms)
    }

    /// Re-enqueue an entry for retry after the next backoff interval.
    ///
    /// The backoff is `500ms * 2^attempt` (using the attempt number the entry
    /// arrives with), capped at 30s. Afterwards `attempt` is incremented and
    /// `next_retry_ms` is set to `now_ms + backoff`; both saturate instead of
    /// overflowing. `max_retries` is not checked here.
    pub fn enqueue_retry(&mut self, mut entry: DeferredEntry, now_ms: u64) {
        let backoff = Self::BACKOFF_BASE_MS
            .saturating_mul(2_u64.saturating_pow(entry.attempt))
            .min(Self::BACKOFF_CAP_MS);

        entry.attempt = entry.attempt.saturating_add(1);
        entry.next_retry_ms = now_ms.saturating_add(backoff);

        self.entries.push_back(entry);
    }

    /// Number of pending deferred entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Whether the queue currently holds no entries.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Advisory capacity the queue was created with.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Clear all entries.
    pub fn clear(&mut self) {
        self.entries.clear();
    }

    /// Remove and return, in queue order, every entry for which `is_match`
    /// returns true; the remaining entries keep their relative order.
    fn extract_where<F>(&mut self, mut is_match: F) -> Vec<DeferredEntry>
    where
        F: FnMut(&DeferredEntry) -> bool,
    {
        let mut taken = Vec::new();

        // Rotate the queue exactly once: each entry is popped from the front
        // and either handed out or pushed to the back. After `len` steps the
        // survivors are back in their original relative order, and nothing is
        // cloned or reallocated.
        for _ in 0..self.entries.len() {
            let Some(entry) = self.entries.pop_front() else {
                break;
            };
            if is_match(&entry) {
                taken.push(entry);
            } else {
                self.entries.push_back(entry);
            }
        }

        taken
    }
}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Enqueue a first-attempt entry (`attempt = 0`, `max_retries = 3`) for the
    /// `posts` collection, so each test only spells out the values it cares about.
    fn enqueue_at(
        queue: &mut DeferredQueue,
        peer_id: u64,
        now_ms: u64,
        first_retry_after_ms: u64,
        ttl_secs: u64,
    ) -> u64 {
        queue.enqueue(
            peer_id,
            0,
            0,
            format!("delta{}", peer_id).into_bytes(),
            "posts".to_string(),
            "posts_author_fk".to_string(),
            0,
            3,
            now_ms,
            first_retry_after_ms,
            ttl_secs,
        )
    }

    #[test]
    fn enqueue_and_poll_ready() {
        let mut queue = DeferredQueue::new(10);
        let now = 1000;

        // Entry becomes due at now + 500ms.
        enqueue_at(&mut queue, 42, now, 500, 60);

        // At time=now, entry is not ready and stays queued.
        assert!(queue.poll_ready(now).is_empty());
        assert_eq!(queue.len(), 1);

        // At time=now+500, entry is ready and leaves the queue.
        let ready = queue.poll_ready(now + 500);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].attempt, 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn expire_past_ttl() {
        let mut queue = DeferredQueue::new(10);
        let now = 1000;

        // TTL of 10 seconds => deadline at now + 10_000ms.
        enqueue_at(&mut queue, 42, now, 500, 10);

        // Before the deadline nothing expires.
        assert!(queue.expire(now + 5000).is_empty());
        assert!(queue.expire(now + 9_999).is_empty());
        assert_eq!(queue.len(), 1);

        // The deadline itself is inclusive.
        let expired = queue.expire(now + 10_000);
        assert_eq!(expired.len(), 1);
        assert!(queue.is_empty());
    }

    #[test]
    fn exponential_backoff() {
        let mut queue = DeferredQueue::new(10);
        let now = 1000;

        let id = enqueue_at(&mut queue, 42, now, 500, 60);

        // Initial deferral is due at now + 500.
        let mut ready = queue.poll_ready(now + 500);
        assert_eq!(ready.len(), 1);
        let entry = ready.remove(0);
        assert_eq!(entry.id, id);
        assert_eq!(entry.attempt, 0);

        // Re-enqueued with attempt 0: backoff = 500 * 2^0 = 500ms.
        queue.enqueue_retry(entry, now + 500);
        assert!(queue.poll_ready(now + 999).is_empty());
        let mut ready = queue.poll_ready(now + 1000);
        assert_eq!(ready.len(), 1);
        let entry = ready.remove(0);
        assert_eq!(entry.attempt, 1);

        // Re-enqueued with attempt 1: backoff = 500 * 2^1 = 1000ms.
        queue.enqueue_retry(entry, now + 1000);
        assert!(queue.poll_ready(now + 1999).is_empty());
        let ready = queue.poll_ready(now + 2000);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].attempt, 2);
        assert_eq!(ready[0].id, id);
    }

    #[test]
    fn backoff_is_capped() {
        let mut queue = DeferredQueue::new(10);
        let now = 1000;

        enqueue_at(&mut queue, 42, now, 500, 60);
        let mut entry = queue.poll_ready(now + 500).remove(0);

        // 500 * 2^10 = 512_000ms uncapped; the queue caps it at 30_000ms.
        entry.attempt = 10;
        queue.enqueue_retry(entry, 2000);

        assert!(queue.poll_ready(31_999).is_empty());
        let ready = queue.poll_ready(32_000);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].attempt, 11);
    }

    #[test]
    fn max_retries_respected() {
        let mut queue = DeferredQueue::new(10);
        let mut now = 1000;

        enqueue_at(&mut queue, 42, now, 500, 60);
        now += 500;

        let mut entry = queue.poll_ready(now).remove(0);
        assert_eq!(entry.max_retries, 3);

        // The queue itself never checks `max_retries`; the caller is expected
        // to stop re-enqueueing once `attempt` reaches it. Drive that loop
        // through the real retry path and count the round trips.
        let mut retries = 0;
        while entry.attempt < entry.max_retries {
            queue.enqueue_retry(entry, now);
            // Advance by the backoff cap so the entry is certainly due.
            now += 30_000;
            entry = queue
                .poll_ready(now)
                .pop()
                .expect("re-enqueued entry should be due");
            retries += 1;
        }

        // After max_retries, this entry should not be re-enqueued.
        assert_eq!(retries, 3);
        assert_eq!(entry.attempt, entry.max_retries);
        assert!(queue.is_empty());
    }

    #[test]
    fn fifo_ordering() {
        let mut queue = DeferredQueue::new(10);
        let now = 1000;

        // Three entries with the same retry time.
        for i in 0..3 {
            enqueue_at(&mut queue, 40 + i, now, 500, 60);
        }
        assert_eq!(queue.len(), 3);

        let ready = queue.poll_ready(now + 500);
        assert_eq!(ready.len(), 3);

        // Verify FIFO order by peer_id.
        assert_eq!(ready[0].peer_id, 40);
        assert_eq!(ready[1].peer_id, 41);
        assert_eq!(ready[2].peer_id, 42);
    }

    #[test]
    fn capacity_limit() {
        let mut queue = DeferredQueue::new(2);
        let now = 1000;

        enqueue_at(&mut queue, 1, now, 500, 60);
        enqueue_at(&mut queue, 2, now, 500, 60);
        assert_eq!(queue.len(), 2);

        // Capacity is advisory: a third enqueue is accepted, not rejected.
        enqueue_at(&mut queue, 3, now, 500, 60);
        assert_eq!(queue.len(), 3);
        assert_eq!(queue.capacity(), 2);
    }

    #[test]
    fn clear_empties_queue() {
        let mut queue = DeferredQueue::new(10);
        let now = 1000;

        enqueue_at(&mut queue, 1, now, 500, 60);
        enqueue_at(&mut queue, 2, now, 500, 60);

        assert_eq!(queue.len(), 2);
        queue.clear();
        assert!(queue.is_empty());
    }
}