Skip to main content

nodedb_crdt/
deferred.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Deferred retry queue for CASCADE_DEFER policy.
4//!
5//! When a foreign key violation occurs and CASCADE_DEFER is the configured policy,
6//! the delta is enqueued here with exponential backoff. The queue polls for ready
7//! entries and expires entries that exceed their TTL.
8
9use std::collections::VecDeque;
10
11/// A deferred entry awaiting retry due to a CASCADE_DEFER policy.
12#[derive(Debug, Clone)]
13pub struct DeferredEntry {
14    /// Unique entry ID.
15    pub id: u64,
16
17    /// The peer that produced this delta.
18    pub peer_id: u64,
19
20    /// The authenticated user_id that submitted this delta (0 = unauthenticated).
21    pub user_id: u64,
22
23    /// The tenant this delta belongs to (0 = system).
24    pub tenant_id: u64,
25
26    /// The raw delta bytes.
27    pub delta: Vec<u8>,
28
29    /// The collection being modified.
30    pub collection: String,
31
32    /// The constraint name that triggered deferral.
33    pub constraint_name: String,
34
35    /// Current attempt number (0-indexed).
36    pub attempt: u32,
37
38    /// Maximum allowed retries.
39    pub max_retries: u32,
40
41    /// Milliseconds since epoch when this entry should be retried.
42    pub next_retry_ms: u64,
43
44    /// Milliseconds since epoch when this entry expires (given up).
45    pub ttl_deadline_ms: u64,
46}
47
48/// Queue of deferred entries awaiting retry.
49#[derive(Debug)]
50pub struct DeferredQueue {
51    entries: VecDeque<DeferredEntry>,
52    capacity: usize,
53    next_id: u64,
54}
55
56impl DeferredQueue {
57    /// Create a new deferred queue with the given capacity.
58    pub fn new(capacity: usize) -> Self {
59        Self {
60            entries: VecDeque::with_capacity(capacity.min(1024)),
61            capacity,
62            next_id: 1,
63        }
64    }
65
66    /// Enqueue a deferred entry.
67    ///
68    /// # Arguments
69    ///
70    /// * `peer_id` — the peer that produced the delta
71    /// * `delta` — the raw delta bytes
72    /// * `collection` — the target collection
73    /// * `constraint_name` — the constraint that was violated
74    /// * `attempt` — current attempt number (usually 0 for first deferral)
75    /// * `max_retries` — maximum retries allowed
76    /// * `now_ms` — current time in milliseconds since epoch
77    /// * `first_retry_after_ms` — milliseconds to wait before first retry
78    /// * `ttl_secs` — seconds from now until this entry expires
79    #[allow(clippy::too_many_arguments)]
80    pub fn enqueue(
81        &mut self,
82        peer_id: u64,
83        user_id: u64,
84        tenant_id: u64,
85        delta: Vec<u8>,
86        collection: String,
87        constraint_name: String,
88        attempt: u32,
89        max_retries: u32,
90        now_ms: u64,
91        first_retry_after_ms: u64,
92        ttl_secs: u64,
93    ) -> u64 {
94        let id = self.next_id;
95        self.next_id += 1;
96
97        let entry = DeferredEntry {
98            id,
99            peer_id,
100            user_id,
101            tenant_id,
102            delta,
103            collection,
104            constraint_name,
105            attempt,
106            max_retries,
107            next_retry_ms: now_ms + first_retry_after_ms,
108            ttl_deadline_ms: now_ms + (ttl_secs * 1000),
109        };
110
111        self.entries.push_back(entry);
112        id
113    }
114
115    /// Poll for entries ready for retry.
116    ///
117    /// Returns all entries whose `next_retry_ms <= now_ms`.
118    pub fn poll_ready(&mut self, now_ms: u64) -> Vec<DeferredEntry> {
119        let mut ready = Vec::new();
120
121        while let Some(front) = self.entries.front() {
122            if front.next_retry_ms <= now_ms
123                && let Some(entry) = self.entries.pop_front()
124            {
125                ready.push(entry);
126            } else {
127                break;
128            }
129        }
130
131        ready
132    }
133
134    /// Expire entries past their TTL deadline.
135    ///
136    /// Returns all entries whose `ttl_deadline_ms <= now_ms`.
137    /// These should be routed to the DLQ as unrecoverable.
138    pub fn expire(&mut self, now_ms: u64) -> Vec<DeferredEntry> {
139        let mut expired = Vec::new();
140
141        self.entries.retain(|entry| {
142            if entry.ttl_deadline_ms <= now_ms {
143                expired.push(entry.clone());
144                false
145            } else {
146                true
147            }
148        });
149
150        expired
151    }
152
153    /// Re-enqueue an entry for retry after the next backoff interval.
154    ///
155    /// The backoff is calculated as: `base_ms * 2^attempt`, capped at 30s.
156    pub fn enqueue_retry(&mut self, mut entry: DeferredEntry, now_ms: u64) {
157        let base_ms = 500u64;
158        let backoff = base_ms
159            .saturating_mul(2_u64.saturating_pow(entry.attempt))
160            .min(30_000);
161
162        entry.attempt += 1;
163        entry.next_retry_ms = now_ms + backoff;
164
165        self.entries.push_back(entry);
166    }
167
168    /// Number of pending deferred entries.
169    pub fn len(&self) -> usize {
170        self.entries.len()
171    }
172
173    pub fn is_empty(&self) -> bool {
174        self.entries.is_empty()
175    }
176
177    /// Capacity of the queue.
178    pub fn capacity(&self) -> usize {
179        self.capacity
180    }
181
182    /// Clear all entries.
183    pub fn clear(&mut self) {
184        self.entries.clear();
185    }
186}
187
188#[cfg(test)]
189mod tests {
190    use super::*;
191
192    #[test]
193    fn enqueue_and_poll_ready() {
194        let mut queue = DeferredQueue::new(10);
195        let now = 1000;
196
197        // Enqueue an entry ready to retry at now + 500ms
198        queue.enqueue(
199            42,
200            0,
201            0,
202            b"delta".to_vec(),
203            "posts".to_string(),
204            "posts_author_fk".to_string(),
205            0,
206            3,
207            now,
208            500,
209            60,
210        );
211
212        // At time=now, entry is not ready
213        assert!(queue.poll_ready(now).is_empty());
214
215        // At time=now+500, entry is ready
216        let ready = queue.poll_ready(now + 500);
217        assert_eq!(ready.len(), 1);
218        assert_eq!(ready[0].attempt, 0);
219        assert!(queue.is_empty());
220    }
221
222    #[test]
223    fn expire_past_ttl() {
224        let mut queue = DeferredQueue::new(10);
225        let now = 1000;
226
227        // Enqueue with TTL of 10 seconds
228        queue.enqueue(
229            42,
230            0,
231            0,
232            b"delta".to_vec(),
233            "posts".to_string(),
234            "posts_author_fk".to_string(),
235            0,
236            3,
237            now,
238            500,
239            10,
240        );
241
242        // Before TTL expires
243        assert!(queue.expire(now + 5000).is_empty());
244        assert_eq!(queue.len(), 1);
245
246        // After TTL expires
247        let expired = queue.expire(now + 11_000);
248        assert_eq!(expired.len(), 1);
249        assert!(queue.is_empty());
250    }
251
252    #[test]
253    fn exponential_backoff() {
254        let mut queue = DeferredQueue::new(10);
255        let now = 1000;
256
257        // Enqueue initial entry
258        let id = queue.enqueue(
259            42,
260            0,
261            0,
262            b"delta".to_vec(),
263            "posts".to_string(),
264            "posts_author_fk".to_string(),
265            0,
266            3,
267            now,
268            500,
269            60,
270        );
271
272        // Poll and get the entry
273        let ready = queue.poll_ready(now + 500);
274        assert_eq!(ready.len(), 1);
275        let entry = &ready[0];
276        assert_eq!(entry.id, id);
277        assert_eq!(entry.attempt, 0);
278
279        // Re-enqueue for retry (attempt 1)
280        let entry_clone = ready[0].clone();
281        queue.enqueue_retry(entry_clone, now + 500);
282
283        // Attempt 1: backoff = 500 * 2^1 = 1000ms
284        let ready = queue.poll_ready(now + 1500);
285        assert_eq!(ready.len(), 1);
286        assert_eq!(ready[0].attempt, 1);
287
288        // Re-enqueue for retry (attempt 2)
289        queue.enqueue_retry(ready[0].clone(), now + 1500);
290
291        // Attempt 2: backoff = 500 * 2^2 = 2000ms
292        let ready = queue.poll_ready(now + 3500);
293        assert_eq!(ready.len(), 1);
294        assert_eq!(ready[0].attempt, 2);
295    }
296
297    #[test]
298    fn max_retries_respected() {
299        let mut queue = DeferredQueue::new(10);
300        let now = 1000;
301
302        // Enqueue with max_retries=3
303        queue.enqueue(
304            42,
305            0,
306            0,
307            b"delta".to_vec(),
308            "posts".to_string(),
309            "posts_author_fk".to_string(),
310            0,
311            3,
312            now,
313            500,
314            60,
315        );
316
317        let ready = queue.poll_ready(now + 500);
318        let mut entry = ready[0].clone();
319        assert_eq!(entry.max_retries, 3);
320
321        // Retry loop: attempt 0, 1, 2, 3
322        for _ in 0..3 {
323            entry.attempt += 1;
324            assert!(entry.attempt <= entry.max_retries);
325        }
326
327        // After max_retries, this entry should not be re-enqueued
328        assert_eq!(entry.attempt, 3);
329        assert!(entry.attempt >= entry.max_retries);
330    }
331
332    #[test]
333    fn fifo_ordering() {
334        let mut queue = DeferredQueue::new(10);
335        let now = 1000;
336
337        // Enqueue three entries with same retry time
338        for i in 0..3 {
339            queue.enqueue(
340                40 + i,
341                0,
342                0,
343                format!("delta{}", i).into_bytes(),
344                "posts".to_string(),
345                "posts_author_fk".to_string(),
346                0,
347                3,
348                now,
349                500,
350                60,
351            );
352        }
353
354        assert_eq!(queue.len(), 3);
355
356        let ready = queue.poll_ready(now + 500);
357        assert_eq!(ready.len(), 3);
358
359        // Verify FIFO order by peer_id
360        assert_eq!(ready[0].peer_id, 40);
361        assert_eq!(ready[1].peer_id, 41);
362        assert_eq!(ready[2].peer_id, 42);
363    }
364
365    #[test]
366    fn capacity_limit() {
367        let mut queue = DeferredQueue::new(2);
368        let now = 1000;
369
370        queue.enqueue(
371            1,
372            0,
373            0,
374            b"d1".to_vec(),
375            "c".into(),
376            "cn".into(),
377            0,
378            3,
379            now,
380            500,
381            60,
382        );
383        queue.enqueue(
384            2,
385            0,
386            0,
387            b"d2".to_vec(),
388            "c".into(),
389            "cn".into(),
390            0,
391            3,
392            now,
393            500,
394            60,
395        );
396
397        assert_eq!(queue.len(), 2);
398
399        // Capacity is 2, so this still works (no error checking in enqueue)
400        queue.enqueue(
401            3,
402            0,
403            0,
404            b"d3".to_vec(),
405            "c".into(),
406            "cn".into(),
407            0,
408            3,
409            now,
410            500,
411            60,
412        );
413        assert_eq!(queue.len(), 3); // The queue doesn't enforce capacity strictly; it's advisory.
414    }
415
416    #[test]
417    fn clear_empties_queue() {
418        let mut queue = DeferredQueue::new(10);
419        let now = 1000;
420
421        queue.enqueue(
422            1,
423            0,
424            0,
425            b"d1".to_vec(),
426            "c".into(),
427            "cn".into(),
428            0,
429            3,
430            now,
431            500,
432            60,
433        );
434        queue.enqueue(
435            2,
436            0,
437            0,
438            b"d2".to_vec(),
439            "c".into(),
440            "cn".into(),
441            0,
442            3,
443            now,
444            500,
445            60,
446        );
447
448        assert_eq!(queue.len(), 2);
449        queue.clear();
450        assert!(queue.is_empty());
451    }
452}