1use std::collections::VecDeque;
10
11#[derive(Debug, Clone)]
13pub struct DeferredEntry {
14 pub id: u64,
16
17 pub peer_id: u64,
19
20 pub user_id: u64,
22
23 pub tenant_id: u64,
25
26 pub delta: Vec<u8>,
28
29 pub collection: String,
31
32 pub constraint_name: String,
34
35 pub attempt: u32,
37
38 pub max_retries: u32,
40
41 pub next_retry_ms: u64,
43
44 pub ttl_deadline_ms: u64,
46}
47
48#[derive(Debug)]
50pub struct DeferredQueue {
51 entries: VecDeque<DeferredEntry>,
52 capacity: usize,
53 next_id: u64,
54}
55
56impl DeferredQueue {
57 pub fn new(capacity: usize) -> Self {
59 Self {
60 entries: VecDeque::with_capacity(capacity.min(1024)),
61 capacity,
62 next_id: 1,
63 }
64 }
65
66 #[allow(clippy::too_many_arguments)]
80 pub fn enqueue(
81 &mut self,
82 peer_id: u64,
83 user_id: u64,
84 tenant_id: u64,
85 delta: Vec<u8>,
86 collection: String,
87 constraint_name: String,
88 attempt: u32,
89 max_retries: u32,
90 now_ms: u64,
91 first_retry_after_ms: u64,
92 ttl_secs: u64,
93 ) -> u64 {
94 let id = self.next_id;
95 self.next_id += 1;
96
97 let entry = DeferredEntry {
98 id,
99 peer_id,
100 user_id,
101 tenant_id,
102 delta,
103 collection,
104 constraint_name,
105 attempt,
106 max_retries,
107 next_retry_ms: now_ms + first_retry_after_ms,
108 ttl_deadline_ms: now_ms + (ttl_secs * 1000),
109 };
110
111 self.entries.push_back(entry);
112 id
113 }
114
115 pub fn poll_ready(&mut self, now_ms: u64) -> Vec<DeferredEntry> {
119 let mut ready = Vec::new();
120
121 while let Some(front) = self.entries.front() {
122 if front.next_retry_ms <= now_ms
123 && let Some(entry) = self.entries.pop_front()
124 {
125 ready.push(entry);
126 } else {
127 break;
128 }
129 }
130
131 ready
132 }
133
134 pub fn expire(&mut self, now_ms: u64) -> Vec<DeferredEntry> {
139 let mut expired = Vec::new();
140
141 self.entries.retain(|entry| {
142 if entry.ttl_deadline_ms <= now_ms {
143 expired.push(entry.clone());
144 false
145 } else {
146 true
147 }
148 });
149
150 expired
151 }
152
153 pub fn enqueue_retry(&mut self, mut entry: DeferredEntry, now_ms: u64) {
157 let base_ms = 500u64;
158 let backoff = base_ms
159 .saturating_mul(2_u64.saturating_pow(entry.attempt))
160 .min(30_000);
161
162 entry.attempt += 1;
163 entry.next_retry_ms = now_ms + backoff;
164
165 self.entries.push_back(entry);
166 }
167
168 pub fn len(&self) -> usize {
170 self.entries.len()
171 }
172
173 pub fn is_empty(&self) -> bool {
174 self.entries.is_empty()
175 }
176
177 pub fn capacity(&self) -> usize {
179 self.capacity
180 }
181
182 pub fn clear(&mut self) {
184 self.entries.clear();
185 }
186}
187
188#[cfg(test)]
189mod tests {
190 use super::*;
191
192 #[test]
193 fn enqueue_and_poll_ready() {
194 let mut queue = DeferredQueue::new(10);
195 let now = 1000;
196
197 queue.enqueue(
199 42,
200 0,
201 0,
202 b"delta".to_vec(),
203 "posts".to_string(),
204 "posts_author_fk".to_string(),
205 0,
206 3,
207 now,
208 500,
209 60,
210 );
211
212 assert!(queue.poll_ready(now).is_empty());
214
215 let ready = queue.poll_ready(now + 500);
217 assert_eq!(ready.len(), 1);
218 assert_eq!(ready[0].attempt, 0);
219 assert!(queue.is_empty());
220 }
221
222 #[test]
223 fn expire_past_ttl() {
224 let mut queue = DeferredQueue::new(10);
225 let now = 1000;
226
227 queue.enqueue(
229 42,
230 0,
231 0,
232 b"delta".to_vec(),
233 "posts".to_string(),
234 "posts_author_fk".to_string(),
235 0,
236 3,
237 now,
238 500,
239 10,
240 );
241
242 assert!(queue.expire(now + 5000).is_empty());
244 assert_eq!(queue.len(), 1);
245
246 let expired = queue.expire(now + 11_000);
248 assert_eq!(expired.len(), 1);
249 assert!(queue.is_empty());
250 }
251
252 #[test]
253 fn exponential_backoff() {
254 let mut queue = DeferredQueue::new(10);
255 let now = 1000;
256
257 let id = queue.enqueue(
259 42,
260 0,
261 0,
262 b"delta".to_vec(),
263 "posts".to_string(),
264 "posts_author_fk".to_string(),
265 0,
266 3,
267 now,
268 500,
269 60,
270 );
271
272 let ready = queue.poll_ready(now + 500);
274 assert_eq!(ready.len(), 1);
275 let entry = &ready[0];
276 assert_eq!(entry.id, id);
277 assert_eq!(entry.attempt, 0);
278
279 let entry_clone = ready[0].clone();
281 queue.enqueue_retry(entry_clone, now + 500);
282
283 let ready = queue.poll_ready(now + 1500);
285 assert_eq!(ready.len(), 1);
286 assert_eq!(ready[0].attempt, 1);
287
288 queue.enqueue_retry(ready[0].clone(), now + 1500);
290
291 let ready = queue.poll_ready(now + 3500);
293 assert_eq!(ready.len(), 1);
294 assert_eq!(ready[0].attempt, 2);
295 }
296
297 #[test]
298 fn max_retries_respected() {
299 let mut queue = DeferredQueue::new(10);
300 let now = 1000;
301
302 queue.enqueue(
304 42,
305 0,
306 0,
307 b"delta".to_vec(),
308 "posts".to_string(),
309 "posts_author_fk".to_string(),
310 0,
311 3,
312 now,
313 500,
314 60,
315 );
316
317 let ready = queue.poll_ready(now + 500);
318 let mut entry = ready[0].clone();
319 assert_eq!(entry.max_retries, 3);
320
321 for _ in 0..3 {
323 entry.attempt += 1;
324 assert!(entry.attempt <= entry.max_retries);
325 }
326
327 assert_eq!(entry.attempt, 3);
329 assert!(entry.attempt >= entry.max_retries);
330 }
331
332 #[test]
333 fn fifo_ordering() {
334 let mut queue = DeferredQueue::new(10);
335 let now = 1000;
336
337 for i in 0..3 {
339 queue.enqueue(
340 40 + i,
341 0,
342 0,
343 format!("delta{}", i).into_bytes(),
344 "posts".to_string(),
345 "posts_author_fk".to_string(),
346 0,
347 3,
348 now,
349 500,
350 60,
351 );
352 }
353
354 assert_eq!(queue.len(), 3);
355
356 let ready = queue.poll_ready(now + 500);
357 assert_eq!(ready.len(), 3);
358
359 assert_eq!(ready[0].peer_id, 40);
361 assert_eq!(ready[1].peer_id, 41);
362 assert_eq!(ready[2].peer_id, 42);
363 }
364
365 #[test]
366 fn capacity_limit() {
367 let mut queue = DeferredQueue::new(2);
368 let now = 1000;
369
370 queue.enqueue(
371 1,
372 0,
373 0,
374 b"d1".to_vec(),
375 "c".into(),
376 "cn".into(),
377 0,
378 3,
379 now,
380 500,
381 60,
382 );
383 queue.enqueue(
384 2,
385 0,
386 0,
387 b"d2".to_vec(),
388 "c".into(),
389 "cn".into(),
390 0,
391 3,
392 now,
393 500,
394 60,
395 );
396
397 assert_eq!(queue.len(), 2);
398
399 queue.enqueue(
401 3,
402 0,
403 0,
404 b"d3".to_vec(),
405 "c".into(),
406 "cn".into(),
407 0,
408 3,
409 now,
410 500,
411 60,
412 );
413 assert_eq!(queue.len(), 3); }
415
416 #[test]
417 fn clear_empties_queue() {
418 let mut queue = DeferredQueue::new(10);
419 let now = 1000;
420
421 queue.enqueue(
422 1,
423 0,
424 0,
425 b"d1".to_vec(),
426 "c".into(),
427 "cn".into(),
428 0,
429 3,
430 now,
431 500,
432 60,
433 );
434 queue.enqueue(
435 2,
436 0,
437 0,
438 b"d2".to_vec(),
439 "c".into(),
440 "cn".into(),
441 0,
442 3,
443 now,
444 500,
445 60,
446 );
447
448 assert_eq!(queue.len(), 2);
449 queue.clear();
450 assert!(queue.is_empty());
451 }
452}