1use std::collections::VecDeque;
8
/// A single unit of deferred work held by [`DeferredQueue`].
///
/// The queue itself only reads `next_retry_ms` (in `poll_ready`) and
/// `ttl_deadline_ms` (in `expire`), and only writes `attempt` and
/// `next_retry_ms` (in `enqueue_retry`). Every other field is carried
/// through untouched for the caller's benefit.
#[derive(Debug, Clone)]
pub struct DeferredEntry {
    /// Queue-assigned identifier; `DeferredQueue::enqueue` hands these out
    /// sequentially starting at 1.
    pub id: u64,

    /// Identifier of the peer associated with this entry.
    /// NOTE(review): presumably the peer the delta came from — confirm with the caller.
    pub peer_id: u64,

    /// Identifier of the user associated with this entry (opaque to the queue).
    pub user_id: u64,

    /// Identifier of the tenant associated with this entry (opaque to the queue).
    pub tenant_id: u32,

    /// Raw payload bytes; the queue never inspects them.
    pub delta: Vec<u8>,

    /// Name of the collection the payload refers to (opaque to the queue).
    pub collection: String,

    /// Name of the constraint associated with this entry.
    /// NOTE(review): presumably the constraint whose violation caused the
    /// deferral — confirm with the caller.
    pub constraint_name: String,

    /// Attempt counter. `DeferredQueue::enqueue_retry` uses the current value
    /// as the backoff exponent and then increments it.
    pub attempt: u32,

    /// Retry budget for this entry. The queue stores it but never checks it;
    /// enforcing the limit is left to the caller.
    pub max_retries: u32,

    /// Timestamp in milliseconds at or after which `poll_ready` hands the
    /// entry out.
    pub next_retry_ms: u64,

    /// Timestamp in milliseconds at or after which `expire` removes the entry.
    pub ttl_deadline_ms: u64,
}
45
46#[derive(Debug)]
48pub struct DeferredQueue {
49 entries: VecDeque<DeferredEntry>,
50 capacity: usize,
51 next_id: u64,
52}
53
54impl DeferredQueue {
55 pub fn new(capacity: usize) -> Self {
57 Self {
58 entries: VecDeque::with_capacity(capacity.min(1024)),
59 capacity,
60 next_id: 1,
61 }
62 }
63
64 #[allow(clippy::too_many_arguments)]
78 pub fn enqueue(
79 &mut self,
80 peer_id: u64,
81 user_id: u64,
82 tenant_id: u32,
83 delta: Vec<u8>,
84 collection: String,
85 constraint_name: String,
86 attempt: u32,
87 max_retries: u32,
88 now_ms: u64,
89 first_retry_after_ms: u64,
90 ttl_secs: u64,
91 ) -> u64 {
92 let id = self.next_id;
93 self.next_id += 1;
94
95 let entry = DeferredEntry {
96 id,
97 peer_id,
98 user_id,
99 tenant_id,
100 delta,
101 collection,
102 constraint_name,
103 attempt,
104 max_retries,
105 next_retry_ms: now_ms + first_retry_after_ms,
106 ttl_deadline_ms: now_ms + (ttl_secs * 1000),
107 };
108
109 self.entries.push_back(entry);
110 id
111 }
112
113 pub fn poll_ready(&mut self, now_ms: u64) -> Vec<DeferredEntry> {
117 let mut ready = Vec::new();
118
119 while let Some(front) = self.entries.front() {
120 if front.next_retry_ms <= now_ms
121 && let Some(entry) = self.entries.pop_front()
122 {
123 ready.push(entry);
124 } else {
125 break;
126 }
127 }
128
129 ready
130 }
131
132 pub fn expire(&mut self, now_ms: u64) -> Vec<DeferredEntry> {
137 let mut expired = Vec::new();
138
139 self.entries.retain(|entry| {
140 if entry.ttl_deadline_ms <= now_ms {
141 expired.push(entry.clone());
142 false
143 } else {
144 true
145 }
146 });
147
148 expired
149 }
150
151 pub fn enqueue_retry(&mut self, mut entry: DeferredEntry, now_ms: u64) {
155 let base_ms = 500u64;
156 let backoff = base_ms
157 .saturating_mul(2_u64.saturating_pow(entry.attempt))
158 .min(30_000);
159
160 entry.attempt += 1;
161 entry.next_retry_ms = now_ms + backoff;
162
163 self.entries.push_back(entry);
164 }
165
166 pub fn len(&self) -> usize {
168 self.entries.len()
169 }
170
171 pub fn is_empty(&self) -> bool {
172 self.entries.is_empty()
173 }
174
175 pub fn capacity(&self) -> usize {
177 self.capacity
178 }
179
180 pub fn clear(&mut self) {
182 self.entries.clear();
183 }
184}
185
#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamp every test treats as "now", in milliseconds.
    const NOW: u64 = 1000;

    /// Enqueues a first-attempt entry (user 0, tenant 0, `max_retries` 3) at
    /// `NOW` with a 500 ms initial retry delay, and returns its id.
    fn push(
        queue: &mut DeferredQueue,
        peer_id: u64,
        delta: &[u8],
        collection: &str,
        constraint_name: &str,
        ttl_secs: u64,
    ) -> u64 {
        queue.enqueue(
            peer_id,
            0,
            0,
            delta.to_vec(),
            collection.to_string(),
            constraint_name.to_string(),
            0,
            3,
            NOW,
            500,
            ttl_secs,
        )
    }

    #[test]
    fn enqueue_and_poll_ready() {
        let mut queue = DeferredQueue::new(10);
        push(&mut queue, 42, b"delta", "posts", "posts_author_fk", 60);

        // Nothing is ready before the initial retry delay has elapsed.
        assert!(queue.poll_ready(NOW).is_empty());

        // Exactly at the retry time the entry is handed out.
        let ready = queue.poll_ready(NOW + 500);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].attempt, 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn expire_past_ttl() {
        let mut queue = DeferredQueue::new(10);
        push(&mut queue, 42, b"delta", "posts", "posts_author_fk", 10);

        // Half-way through the 10 s TTL: nothing expires.
        assert!(queue.expire(NOW + 5000).is_empty());
        assert_eq!(queue.len(), 1);

        // Past the TTL: the entry is removed and returned.
        let expired = queue.expire(NOW + 11_000);
        assert_eq!(expired.len(), 1);
        assert!(queue.is_empty());
    }

    #[test]
    fn exponential_backoff() {
        let mut queue = DeferredQueue::new(10);
        let id = push(&mut queue, 42, b"delta", "posts", "posts_author_fk", 60);

        let mut ready = queue.poll_ready(NOW + 500);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].id, id);
        assert_eq!(ready[0].attempt, 0);

        // First retry: attempt 0 -> 1.
        queue.enqueue_retry(ready.remove(0), NOW + 500);

        let mut ready = queue.poll_ready(NOW + 1500);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].attempt, 1);

        // Second retry: attempt 1 -> 2.
        queue.enqueue_retry(ready.remove(0), NOW + 1500);

        let ready = queue.poll_ready(NOW + 3500);
        assert_eq!(ready.len(), 1);
        assert_eq!(ready[0].attempt, 2);
    }

    #[test]
    fn max_retries_respected() {
        let mut queue = DeferredQueue::new(10);
        push(&mut queue, 42, b"delta", "posts", "posts_author_fk", 60);

        let mut entry = queue.poll_ready(NOW + 500).remove(0);
        assert_eq!(entry.max_retries, 3);

        // The queue never checks `max_retries`; this models the caller-side
        // bookkeeping for three attempts.
        for _ in 0..3 {
            entry.attempt += 1;
            assert!(entry.attempt <= entry.max_retries);
        }

        // Budget exhausted: a caller would stop retrying here.
        assert_eq!(entry.attempt, 3);
        assert!(entry.attempt >= entry.max_retries);
    }

    #[test]
    fn fifo_ordering() {
        let mut queue = DeferredQueue::new(10);

        for i in 0..3 {
            let payload = format!("delta{}", i);
            push(
                &mut queue,
                40 + i,
                payload.as_bytes(),
                "posts",
                "posts_author_fk",
                60,
            );
        }
        assert_eq!(queue.len(), 3);

        let ready = queue.poll_ready(NOW + 500);
        assert_eq!(ready.len(), 3);

        // Entries come back in insertion order.
        assert_eq!(ready[0].peer_id, 40);
        assert_eq!(ready[1].peer_id, 41);
        assert_eq!(ready[2].peer_id, 42);
    }

    #[test]
    fn capacity_limit() {
        let mut queue = DeferredQueue::new(2);

        push(&mut queue, 1, b"d1", "c", "cn", 60);
        push(&mut queue, 2, b"d2", "c", "cn", 60);
        assert_eq!(queue.len(), 2);

        // `enqueue` does not reject entries beyond the configured capacity.
        push(&mut queue, 3, b"d3", "c", "cn", 60);
        assert_eq!(queue.len(), 3);
    }

    #[test]
    fn clear_empties_queue() {
        let mut queue = DeferredQueue::new(10);

        push(&mut queue, 1, b"d1", "c", "cn", 60);
        push(&mut queue, 2, b"d2", "c", "cn", 60);
        assert_eq!(queue.len(), 2);

        queue.clear();
        assert!(queue.is_empty());
    }
}