1use std::collections::{BinaryHeap, HashMap, HashSet};
8use std::time::{Duration, Instant};
9
10use crate::logging::debug;
11
12use crate::ant_protocol::XorName;
13use crate::replication::types::{FetchCandidate, VerificationEntry};
14use saorsa_core::identity::PeerId;
15
/// Bookkeeping for a single key whose fetch is currently in progress.
///
/// Entries are created by [`ReplicationQueues::start_fetch`] and mutated by
/// [`ReplicationQueues::retry_fetch`] when the active source is replaced.
#[derive(Debug, Clone)]
pub struct InFlightEntry {
    /// Key being fetched.
    pub key: XorName,
    /// Peer the fetch is currently directed at.
    pub source: PeerId,
    /// Moment the entry was created by `start_fetch`; retries do not reset it.
    pub started_at: Instant,
    /// Every peer that may be asked for this key, in the order retries walk them.
    pub all_sources: Vec<PeerId>,
    /// Peers that have already been attempted, including the current `source`.
    pub tried: HashSet<PeerId>,
}
34
/// Tracks keys as they move through the replication stages: pending
/// verification, queued for fetch, and fetch in flight.
///
/// The admission methods (`add_pending_verify`, `enqueue_fetch`) refuse a key
/// that is already tracked in any stage, so a key is not queued twice through
/// them.
pub struct ReplicationQueues {
    /// Keys awaiting verification, with their per-key verification state.
    pending_verify: HashMap<XorName, VerificationEntry>,
    /// Priority queue of keys ready to be fetched. Pop order is whatever
    /// `FetchCandidate`'s `Ord` impl defines; the tests in this file expect
    /// the smallest distance to come out first.
    fetch_queue: BinaryHeap<FetchCandidate>,
    /// Keys currently held in `fetch_queue`, kept alongside the heap so
    /// membership can be checked without scanning it.
    fetch_queue_keys: HashSet<XorName>,
    /// Keys whose fetch has been started and not yet completed.
    in_flight_fetch: HashMap<XorName, InFlightEntry>,
}
59
impl Default for ReplicationQueues {
    /// Returns an empty set of queues; identical to [`ReplicationQueues::new`].
    fn default() -> Self {
        // Delegate so there is a single place that knows how to build the
        // empty state.
        Self::new()
    }
}
65
66impl ReplicationQueues {
67 #[must_use]
69 pub fn new() -> Self {
70 Self {
71 pending_verify: HashMap::new(),
72 fetch_queue: BinaryHeap::new(),
73 fetch_queue_keys: HashSet::new(),
74 in_flight_fetch: HashMap::new(),
75 }
76 }
77
78 pub fn add_pending_verify(&mut self, key: XorName, entry: VerificationEntry) -> bool {
86 if self.contains_key(&key) {
87 return false;
88 }
89 self.pending_verify.insert(key, entry);
90 true
91 }
92
93 #[must_use]
95 pub fn get_pending(&self, key: &XorName) -> Option<&VerificationEntry> {
96 self.pending_verify.get(key)
97 }
98
99 pub fn get_pending_mut(&mut self, key: &XorName) -> Option<&mut VerificationEntry> {
101 self.pending_verify.get_mut(key)
102 }
103
104 pub fn remove_pending(&mut self, key: &XorName) -> Option<VerificationEntry> {
106 self.pending_verify.remove(key)
107 }
108
109 #[must_use]
111 pub fn pending_keys(&self) -> Vec<XorName> {
112 self.pending_verify.keys().copied().collect()
113 }
114
115 #[must_use]
117 pub fn pending_count(&self) -> usize {
118 self.pending_verify.len()
119 }
120
121 pub fn enqueue_fetch(&mut self, key: XorName, distance: XorName, sources: Vec<PeerId>) {
130 if self.pending_verify.contains_key(&key)
131 || self.fetch_queue_keys.contains(&key)
132 || self.in_flight_fetch.contains_key(&key)
133 {
134 return;
135 }
136 self.fetch_queue_keys.insert(key);
137 self.fetch_queue.push(FetchCandidate {
138 key,
139 distance,
140 sources,
141 });
142 }
143
144 pub fn dequeue_fetch(&mut self) -> Option<FetchCandidate> {
150 while let Some(candidate) = self.fetch_queue.pop() {
151 self.fetch_queue_keys.remove(&candidate.key);
152 if !self.in_flight_fetch.contains_key(&candidate.key) {
153 return Some(candidate);
154 }
155 }
156 None
157 }
158
159 #[must_use]
161 pub fn fetch_queue_count(&self) -> usize {
162 self.fetch_queue.len()
163 }
164
165 pub fn start_fetch(&mut self, key: XorName, source: PeerId, all_sources: Vec<PeerId>) {
171 let mut tried = HashSet::new();
172 tried.insert(source);
173 self.in_flight_fetch.insert(
174 key,
175 InFlightEntry {
176 key,
177 source,
178 started_at: Instant::now(),
179 all_sources,
180 tried,
181 },
182 );
183 }
184
185 pub fn complete_fetch(&mut self, key: &XorName) -> Option<InFlightEntry> {
187 self.in_flight_fetch.remove(key)
188 }
189
190 pub fn retry_fetch(&mut self, key: &XorName) -> Option<PeerId> {
195 let entry = self.in_flight_fetch.get_mut(key)?;
196 entry.tried.insert(entry.source);
197
198 let next = entry
199 .all_sources
200 .iter()
201 .find(|p| !entry.tried.contains(p))
202 .copied();
203
204 if let Some(next_peer) = next {
205 entry.source = next_peer;
206 entry.tried.insert(next_peer);
207 Some(next_peer)
208 } else {
209 None
210 }
211 }
212
213 #[must_use]
215 pub fn in_flight_count(&self) -> usize {
216 self.in_flight_fetch.len()
217 }
218
219 #[must_use]
225 pub fn contains_key(&self, key: &XorName) -> bool {
226 self.pending_verify.contains_key(key)
227 || self.fetch_queue_keys.contains(key)
228 || self.in_flight_fetch.contains_key(key)
229 }
230
231 #[must_use]
235 pub fn is_bootstrap_work_empty(&self, bootstrap_keys: &HashSet<XorName>) -> bool {
236 !bootstrap_keys.iter().any(|k| self.contains_key(k))
237 }
238
239 pub fn evict_stale(&mut self, max_age: Duration) {
241 let now = Instant::now();
242 let before = self.pending_verify.len();
243 self.pending_verify
244 .retain(|_, entry| now.duration_since(entry.created_at) < max_age);
245 let evicted = before.saturating_sub(self.pending_verify.len());
246 if evicted > 0 {
247 debug!("Evicted {evicted} stale pending-verification entries");
248 }
249 }
250}
251
#[cfg(test)]
// Panicking on `unwrap`/`expect` is the desired failure mode inside tests.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use std::collections::HashSet;
    use std::time::{Duration, Instant};

    use super::*;
    use crate::replication::types::{HintPipeline, VerificationState};

    /// Builds a `PeerId` whose first byte is `b` and whose remaining bytes are zero.
    fn peer_id_from_byte(b: u8) -> PeerId {
        let mut bytes = [0u8; 32];
        bytes[0] = b;
        PeerId::from_bytes(bytes)
    }

    /// Builds an `XorName` with every byte set to `b`.
    fn xor_name_from_byte(b: u8) -> XorName {
        [b; 32]
    }

    /// Builds a fresh `PendingVerify` entry on the given pipeline, hinted by
    /// the peer derived from `sender_byte`.
    fn test_entry_on(pipeline: HintPipeline, sender_byte: u8) -> VerificationEntry {
        VerificationEntry {
            state: VerificationState::PendingVerify,
            pipeline,
            verified_sources: Vec::new(),
            tried_sources: HashSet::new(),
            created_at: Instant::now(),
            hint_sender: peer_id_from_byte(sender_byte),
        }
    }

    /// Builds a fresh `PendingVerify` entry on the `Replica` pipeline.
    fn test_entry(sender_byte: u8) -> VerificationEntry {
        test_entry_on(HintPipeline::Replica, sender_byte)
    }

    // ---- add_pending_verify -------------------------------------------------

    #[test]
    fn add_pending_verify_new_key_succeeds() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);

        assert!(queues.add_pending_verify(key, test_entry(1)));
        assert_eq!(queues.pending_count(), 1);
    }

    #[test]
    fn add_pending_verify_duplicate_rejected() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);

        assert!(queues.add_pending_verify(key, test_entry(1)));
        assert!(!queues.add_pending_verify(key, test_entry(2)));
        assert_eq!(queues.pending_count(), 1);
    }

    #[test]
    fn add_pending_verify_rejected_if_in_fetch_queue() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x02);
        let distance = xor_name_from_byte(0x10);
        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(1)]);

        assert!(
            !queues.add_pending_verify(key, test_entry(1)),
            "should reject key already in fetch queue"
        );
    }

    #[test]
    fn add_pending_verify_rejected_if_in_flight() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x03);
        let source = peer_id_from_byte(1);
        queues.start_fetch(key, source, vec![source]);

        assert!(
            !queues.add_pending_verify(key, test_entry(1)),
            "should reject key already in-flight"
        );
    }

    // ---- fetch queue --------------------------------------------------------

    #[test]
    fn dequeue_returns_nearest_first() {
        let mut queues = ReplicationQueues::new();

        let near_key = xor_name_from_byte(0x01);
        let far_key = xor_name_from_byte(0x02);
        let near_dist = [0x00; 32];
        let far_dist = [0xFF; 32];

        // Insert the far key first so insertion order cannot explain the result.
        queues.enqueue_fetch(far_key, far_dist, vec![peer_id_from_byte(1)]);
        queues.enqueue_fetch(near_key, near_dist, vec![peer_id_from_byte(2)]);

        let first = queues.dequeue_fetch().expect("should dequeue");
        assert_eq!(first.key, near_key, "nearest key should dequeue first");

        let second = queues.dequeue_fetch().expect("should dequeue");
        assert_eq!(second.key, far_key, "farthest key should dequeue second");
    }

    #[test]
    fn enqueue_dedup_prevents_duplicates() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);

        queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(1)]);
        queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(2)]);

        assert_eq!(
            queues.fetch_queue_count(),
            1,
            "duplicate enqueue should be ignored"
        );
    }

    // ---- in-flight ----------------------------------------------------------

    #[test]
    fn start_and_complete_fetch() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        let source = peer_id_from_byte(1);

        queues.start_fetch(key, source, vec![source]);
        assert_eq!(queues.in_flight_count(), 1);

        let completed = queues.complete_fetch(&key);
        assert!(completed.is_some());
        assert_eq!(queues.in_flight_count(), 0);
    }

    #[test]
    fn complete_nonexistent_returns_none() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x99);

        assert!(queues.complete_fetch(&key).is_none());
    }

    // ---- retry --------------------------------------------------------------

    #[test]
    fn retry_fetch_returns_next_untried_source() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        let source_a = peer_id_from_byte(1);
        let source_b = peer_id_from_byte(2);
        let source_c = peer_id_from_byte(3);

        queues.start_fetch(key, source_a, vec![source_a, source_b, source_c]);

        // A was tried at start, so the first retry moves on to B.
        let next = queues.retry_fetch(&key);
        assert_eq!(next, Some(source_b));

        // B is now tried as well, leaving C.
        let next = queues.retry_fetch(&key);
        assert_eq!(next, Some(source_c));

        let next = queues.retry_fetch(&key);
        assert!(next.is_none(), "all sources exhausted");
    }

    #[test]
    fn retry_fetch_nonexistent_returns_none() {
        let mut queues = ReplicationQueues::new();

        assert!(queues.retry_fetch(&xor_name_from_byte(0xFF)).is_none());
    }

    // ---- contains_key -------------------------------------------------------

    #[test]
    fn contains_key_in_pending() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        queues.add_pending_verify(key, test_entry(1));

        assert!(queues.contains_key(&key));
    }

    #[test]
    fn contains_key_in_fetch_queue() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x02);
        queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(1)]);

        assert!(queues.contains_key(&key));
    }

    #[test]
    fn contains_key_in_flight() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x03);
        queues.start_fetch(key, peer_id_from_byte(1), vec![]);

        assert!(queues.contains_key(&key));
    }

    #[test]
    fn contains_key_absent() {
        let queues = ReplicationQueues::new();

        assert!(!queues.contains_key(&xor_name_from_byte(0xFF)));
    }

    // ---- bootstrap work -----------------------------------------------------

    #[test]
    fn bootstrap_work_empty_when_no_keys_present() {
        let queues = ReplicationQueues::new();
        let bootstrap_keys: HashSet<XorName> = [xor_name_from_byte(0x01), xor_name_from_byte(0x02)]
            .into_iter()
            .collect();

        assert!(queues.is_bootstrap_work_empty(&bootstrap_keys));
    }

    #[test]
    fn bootstrap_work_not_empty_when_key_in_pending() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        queues.add_pending_verify(key, test_entry(1));

        let bootstrap_keys: HashSet<XorName> = std::iter::once(key).collect();
        assert!(!queues.is_bootstrap_work_empty(&bootstrap_keys));
    }

    // ---- evict_stale --------------------------------------------------------

    #[test]
    fn evict_stale_removes_old_entries() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);

        // Backdate the entry past the eviction threshold, then insert it
        // directly so the timestamp is not overwritten.
        let mut entry = test_entry(1);
        entry.created_at = Instant::now()
            .checked_sub(Duration::from_secs(2))
            .unwrap_or_else(Instant::now);
        queues.pending_verify.insert(key, entry);

        assert_eq!(queues.pending_count(), 1);
        queues.evict_stale(Duration::from_secs(1));
        assert_eq!(
            queues.pending_count(),
            0,
            "entry older than max_age should be evicted"
        );
    }

    #[test]
    fn evict_stale_keeps_fresh_entries() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        queues.add_pending_verify(key, test_entry(1));

        queues.evict_stale(Duration::from_secs(3600));
        assert_eq!(
            queues.pending_count(),
            1,
            "fresh entry should not be evicted"
        );
    }

    // ---- remove_pending -----------------------------------------------------

    #[test]
    fn remove_pending_returns_entry() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x01);
        queues.add_pending_verify(key, test_entry(1));

        let removed = queues.remove_pending(&key);
        assert!(removed.is_some());
        assert_eq!(queues.pending_count(), 0);
    }

    #[test]
    fn remove_pending_nonexistent_returns_none() {
        let mut queues = ReplicationQueues::new();

        assert!(queues.remove_pending(&xor_name_from_byte(0xFF)).is_none());
    }

    // ---- end-to-end scenarios -----------------------------------------------

    #[test]
    fn scenario_8_duplicate_key_not_double_queued() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0xE0);
        let distance = xor_name_from_byte(0x10);

        // Stage 1: the key enters PendingVerify.
        assert!(
            queues.add_pending_verify(key, test_entry(1)),
            "first add to PendingVerify should succeed"
        );
        assert!(
            queues.contains_key(&key),
            "key should be present in pipeline"
        );

        // Enqueueing while still pending must not create a second copy.
        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(2)]);
        assert_eq!(
            queues.fetch_queue_count(),
            0,
            "enqueue_fetch should be no-op for pending key"
        );
        assert!(queues.contains_key(&key), "key should still be in pipeline");

        // Stage 2: move the key from PendingVerify to the fetch queue.
        queues.remove_pending(&key);
        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(3)]);
        assert_eq!(queues.fetch_queue_count(), 1);

        assert!(
            !queues.add_pending_verify(key, test_entry(4)),
            "key in FetchQueue should be rejected from PendingVerify"
        );

        // Stage 3: move the key from the fetch queue to in-flight.
        let candidate = queues.dequeue_fetch().expect("should dequeue");
        let first_source = *candidate
            .sources
            .first()
            .expect("candidate should carry at least one source");
        queues.start_fetch(candidate.key, first_source, candidate.sources.clone());

        assert!(
            !queues.add_pending_verify(key, test_entry(5)),
            "key in-flight should be rejected from PendingVerify"
        );

        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(6)]);
        assert_eq!(
            queues.fetch_queue_count(),
            0,
            "enqueue_fetch should be no-op for in-flight key"
        );
    }

    #[test]
    fn scenario_8_replica_and_paid_hint_collapses_to_replica() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0xE1);

        // The Replica hint is admitted first.
        let entry = test_entry_on(HintPipeline::Replica, 1);
        assert!(queues.add_pending_verify(key, entry));

        let pending = queues.get_pending(&key).expect("should be pending");
        assert_eq!(
            pending.pipeline,
            HintPipeline::Replica,
            "key in both hint sets should be Replica pipeline"
        );

        // A later PaidOnly hint for the same key must be rejected.
        let paid_entry = test_entry_on(HintPipeline::PaidOnly, 2);
        assert!(
            !queues.add_pending_verify(key, paid_entry),
            "duplicate key should be rejected regardless of pipeline"
        );

        let pending = queues.get_pending(&key).expect("should still be pending");
        assert_eq!(
            pending.pipeline,
            HintPipeline::Replica,
            "pipeline should remain Replica after duplicate rejection"
        );
    }

    #[test]
    fn scenario_3_neighbor_sync_quorum_pass_full_pipeline() {
        let mut queues = ReplicationQueues::new();
        let key = xor_name_from_byte(0x03);
        let distance = xor_name_from_byte(0x01);
        let source_a = peer_id_from_byte(1);
        let source_b = peer_id_from_byte(2);

        // Step 1: admit to PendingVerify (hint sent by peer 3).
        assert!(
            queues.add_pending_verify(key, test_entry(3)),
            "new key should be admitted to PendingVerify"
        );
        assert!(queues.contains_key(&key));
        assert_eq!(queues.pending_count(), 1);

        // Step 2: verification passed, so the key leaves PendingVerify.
        let removed = queues.remove_pending(&key);
        assert!(removed.is_some(), "key should exist in pending");
        assert_eq!(queues.pending_count(), 0);

        // Step 3: queue the key for fetching from the verified sources.
        queues.enqueue_fetch(key, distance, vec![source_a, source_b]);
        assert_eq!(queues.fetch_queue_count(), 1);
        assert!(
            queues.contains_key(&key),
            "key should be in pipeline (fetch queue)"
        );

        // Step 4: dequeue and start the fetch.
        let candidate = queues.dequeue_fetch().expect("should dequeue");
        assert_eq!(candidate.key, key);
        assert_eq!(candidate.sources.len(), 2);
        queues.start_fetch(key, source_a, candidate.sources);
        assert_eq!(queues.in_flight_count(), 1);
        assert_eq!(queues.fetch_queue_count(), 0);
        assert!(
            queues.contains_key(&key),
            "key should be in pipeline (in-flight)"
        );

        // Step 5: the fetch completes and the key leaves the pipeline.
        let completed = queues.complete_fetch(&key);
        assert!(
            completed.is_some(),
            "should have in-flight entry to complete"
        );
        assert_eq!(queues.in_flight_count(), 0);
        assert!(
            !queues.contains_key(&key),
            "key should be fully processed out of pipeline"
        );
    }
}