ant_node/replication/scheduling.rs
1//! Scheduling and queue management (Section 12).
2//!
3//! Manages `PendingVerify`, `FetchQueue`, and `InFlightFetch` queues for the
4//! replication pipeline. Each key progresses through at most one queue at a
5//! time, with strict dedup across all three stages.
6
7use std::collections::{BinaryHeap, HashMap, HashSet};
8use std::time::{Duration, Instant};
9
10use crate::logging::debug;
11
12use crate::ant_protocol::XorName;
13use crate::replication::types::{
14 FetchCandidate, HintPipeline, VerificationEntry, VerificationState,
15};
16use saorsa_core::identity::PeerId;
17
/// Global hard ceiling on how many keys `pending_verify` may hold at once.
///
/// Without some bound, any peer present in our routing table could flood us
/// with `NeighborSyncRequest` messages — each limited only by
/// `MAX_REPLICATION_MESSAGE_SIZE` (≈ 10 MiB, roughly 320k 32-byte hints per
/// message) — and grow this map indefinitely, exhausting node memory and
/// kicking off a self-amplifying storm of outbound verification requests.
///
/// `131_072` (2^17) entries comfortably exceeds any legitimate aggregate
/// demand while keeping worst-case memory to a few tens of MiB: a
/// `VerificationEntry` is on the order of a few hundred bytes, and its inner
/// collections are filled only from close-group-sized verification evidence,
/// never from the volume of hints an attacker sends.
///
/// On its own this global cap is **not** enough. If admissions were simply
/// refused once full, a single malicious routing-table peer could fill the
/// whole map with cheap junk that passes admission and starve every honest
/// peer's hints until the 30-minute `evict_stale` backstop fires — and then
/// refill it straight away. Fairness between honest sources is therefore
/// enforced by [`MAX_PENDING_VERIFY_PER_PEER`]; this value is purely the
/// memory backstop.
pub const MAX_PENDING_VERIFY: usize = 1 << 17;
40
/// Per-source hard cap on `pending_verify` entries attributed to any one
/// `hint_sender` peer.
///
/// This is the real D1 defence. Every pending entry remembers which peer
/// hinted it (`VerificationEntry::hint_sender`), and no single source may own
/// more than this many slots. A flooding peer can therefore only exhaust its
/// own quota; because sources are accounted independently, it cannot use its
/// quota to take slots away from other peers. The value sits well above any
/// legitimate per-peer working set (a healthy neighbour syncs at most a few
/// thousand keys to us per cycle), yet is small enough that approaching the
/// global cap would take `MAX_PENDING_VERIFY / MAX_PENDING_VERIFY_PER_PEER`
/// distinct malicious peers.
///
/// Residual risk (accepted, follow-up): at the current ratio, roughly 16
/// distinct `PeerId`s that are *all* in the victim's routing table at the
/// same time (admission is gated by `sender_in_rt`) could together still
/// reach the global [`MAX_PENDING_VERIFY`] backstop. While that backstop is
/// saturated, `add_pending_verify` rejects every new key, honest ones
/// included, before the per-source quota is even consulted. Because
/// `hint_sender` is the cryptographically authenticated connection identity
/// rather than a forgeable payload field, reaching that state means operating
/// ~16 real Kademlia-adjacent Sybil nodes. That is a large step up from the
/// pre-fix single-peer attack, and the worst case is bounded memory rather
/// than unbounded growth. A future hardening (reserved headroom for
/// under-quota sources, or a per-source cap that scales with distinct-source
/// pressure) is tracked separately and is intentionally out of scope for this
/// `DoS` fix.
pub const MAX_PENDING_VERIFY_PER_PEER: usize = 1 << 13;
67
/// Hard upper bound on the number of keys held in `fetch_queue`.
///
/// `fetch_queue` is only fed through `enqueue_fetch`, which runs **after** a
/// key has passed quorum verification in `run_verification_cycle`. Attacker
/// junk keys (with no real holder) fail quorum and never reach this stage, so
/// the bounded-and-fair `pending_verify` stage upstream is the primary
/// defence. This cap is kept as a defence-in-depth memory backstop.
///
/// Once the queue is full, `enqueue_fetch` drops new keys and returns
/// `false`, the same no-op contract it uses for cross-queue dedup.
/// `promote_pending_to_fetch` instead leaves the key in `pending_verify` so a
/// later cycle can retry.
pub const MAX_FETCH_QUEUE: usize = 1 << 17;
78
/// Result of [`ReplicationQueues::add_pending_verify`].
///
/// Separates "this key is already being worked on" from "this key was refused
/// because a queue capacity bound was hit". Bootstrap drain accounting and
/// source-side retry logic MUST treat `CapacityRejected` as work that is still
/// outstanding. Conflating it with a dedup hit is exactly the silent-drop
/// regression that appeared when the queues were first bounded.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AdmissionResult {
    /// A fresh entry was inserted into `pending_verify`.
    Admitted,
    /// The key already sits in some pipeline stage. The existing entry is kept
    /// untouched and no retry is needed.
    AlreadyPresent,
    /// The global or per-source capacity bound refused the entry. Callers MUST
    /// treat this as unfinished work, not as silently completed.
    CapacityRejected,
}

impl AdmissionResult {
    /// Returns `true` exclusively for [`AdmissionResult::Admitted`].
    ///
    /// Convenience for call sites that only care whether the insert actually
    /// happened and do not need to tell the two rejection reasons apart.
    #[must_use]
    pub fn admitted(self) -> bool {
        match self {
            Self::Admitted => true,
            Self::AlreadyPresent | Self::CapacityRejected => false,
        }
    }
}
106
107// ---------------------------------------------------------------------------
108// In-flight entry
109// ---------------------------------------------------------------------------
110
111/// An in-flight fetch entry tracking an active download.
112#[derive(Debug, Clone)]
113pub struct InFlightEntry {
114 /// The key being fetched.
115 pub key: XorName,
116 /// The peer we are currently fetching from.
117 pub source: PeerId,
118 /// When the fetch started.
119 pub started_at: Instant,
120 /// All verified sources for this key.
121 pub all_sources: Vec<PeerId>,
122 /// Sources already attempted (failed or in progress).
123 pub tried: HashSet<PeerId>,
124}
125
126// ---------------------------------------------------------------------------
127// Central queue manager
128// ---------------------------------------------------------------------------
129
130/// Central queue manager for the replication pipeline.
131///
132/// Maintains three stages of the pipeline with global dedup:
133/// 1. **`PendingVerify`** -- keys awaiting quorum verification.
134/// 2. **`FetchQueue`** -- quorum-passed keys waiting for a fetch slot.
135/// 3. **`InFlightFetch`** -- keys actively being downloaded.
136pub struct ReplicationQueues {
137 /// Keys awaiting quorum result (dedup by key).
138 ///
139 /// Capacity-bounded by [`MAX_PENDING_VERIFY`]: admissions are rejected
140 /// once full, preventing unbounded growth under a network hint flood.
141 pending_verify: HashMap<XorName, VerificationEntry>,
142 /// Presence-quorum-passed or paid-list-authorized keys waiting for fetch.
143 ///
144 /// Capacity-bounded by [`MAX_FETCH_QUEUE`]: enqueues are dropped once
145 /// full, preventing unbounded growth under a network hint flood.
146 fetch_queue: BinaryHeap<FetchCandidate>,
147 /// Keys present in `fetch_queue` for O(1) dedup.
148 fetch_queue_keys: HashSet<XorName>,
149 /// Active downloads keyed by `XorName`.
150 in_flight_fetch: HashMap<XorName, InFlightEntry>,
151 /// Number of `pending_verify` entries currently attributed to each
152 /// `hint_sender` peer. Maintained in lockstep with `pending_verify`
153 /// (insert/remove/evict) so the per-peer quota
154 /// ([`MAX_PENDING_VERIFY_PER_PEER`]) can be enforced in O(1). An entry is
155 /// removed from this map when its count reaches zero so the map itself is
156 /// bounded by the number of distinct currently-pending sources.
157 pending_per_sender: HashMap<PeerId, usize>,
158}
159
160impl Default for ReplicationQueues {
161 fn default() -> Self {
162 Self::new()
163 }
164}
165
166impl ReplicationQueues {
167 /// Create new empty queues.
168 #[must_use]
169 pub fn new() -> Self {
170 Self {
171 pending_verify: HashMap::new(),
172 fetch_queue: BinaryHeap::new(),
173 fetch_queue_keys: HashSet::new(),
174 in_flight_fetch: HashMap::new(),
175 pending_per_sender: HashMap::new(),
176 }
177 }
178
179 // -----------------------------------------------------------------------
180 // PendingVerify
181 // -----------------------------------------------------------------------
182
183 /// Add a key to pending verification if not already present in any queue.
184 ///
185 /// Returns an [`AdmissionResult`] distinguishing the three outcomes:
186 /// * `Admitted` — newly inserted.
187 /// * `AlreadyPresent` — Rule 8 cross-queue dedup (the key is already in
188 /// `pending_verify`, `fetch_queue`, or `in_flight_fetch`); the existing
189 /// entry remains and there is no work to retry.
190 /// * `CapacityRejected` — global or per-source bound hit; the work is
191 /// genuinely lost and the caller (e.g. bootstrap drain accounting,
192 /// source-side retry) MUST treat this as still-outstanding work, not as
193 /// "done". Without this distinction a bootstrap snapshot whose hints
194 /// are capacity-rejected would silently mark itself drained.
195 pub fn add_pending_verify(
196 &mut self,
197 key: XorName,
198 entry: VerificationEntry,
199 ) -> AdmissionResult {
200 if self.contains_key(&key) {
201 return AdmissionResult::AlreadyPresent;
202 }
203 if self.pending_verify.len() >= MAX_PENDING_VERIFY {
204 debug!(
205 "pending_verify at global capacity ({MAX_PENDING_VERIFY}); rejecting key {}",
206 hex::encode(key)
207 );
208 return AdmissionResult::CapacityRejected;
209 }
210 let sender = entry.hint_sender;
211 let sender_count = self.pending_per_sender.get(&sender).copied().unwrap_or(0);
212 if sender_count >= MAX_PENDING_VERIFY_PER_PEER {
213 debug!(
214 "peer {sender} at per-source pending cap ({MAX_PENDING_VERIFY_PER_PEER}); \
215 rejecting key {} (honest peers are unaffected)",
216 hex::encode(key)
217 );
218 return AdmissionResult::CapacityRejected;
219 }
220 self.pending_verify.insert(key, entry);
221 *self.pending_per_sender.entry(sender).or_insert(0) += 1;
222 AdmissionResult::Admitted
223 }
224
225 /// Decrement (and prune at zero) the per-sender counter for `sender`.
226 ///
227 /// Kept private so the counter can only move in lockstep with
228 /// `pending_verify` mutations. The decrement uses `saturating_sub` so a
229 /// hypothetical future invariant break (a release without a matching
230 /// admission) self-heals to zero instead of panicking on `usize`
231 /// underflow; `debug_assert!` still surfaces such a break in test builds.
232 fn release_sender_slot(pending_per_sender: &mut HashMap<PeerId, usize>, sender: &PeerId) {
233 if let Some(count) = pending_per_sender.get_mut(sender) {
234 debug_assert!(*count > 0, "per-sender counter underflow for {sender}");
235 *count = count.saturating_sub(1);
236 if *count == 0 {
237 pending_per_sender.remove(sender);
238 }
239 }
240 }
241
242 /// Get a reference to a pending verification entry.
243 #[must_use]
244 pub fn get_pending(&self, key: &XorName) -> Option<&VerificationEntry> {
245 self.pending_verify.get(key)
246 }
247
248 /// Advance a pending entry's verification `state`, returning the entry's
249 /// `pipeline` (so the caller can branch on it) when the key was found.
250 ///
251 /// Replaces a prior `get_pending_mut` which handed out `&mut VerificationEntry`
252 /// and relied on a doc-comment to keep callers from re-assigning
253 /// `hint_sender`. The per-source quota counter (`pending_per_sender`) is
254 /// keyed by `hint_sender` recorded at admission; re-attributing a live
255 /// entry to a different peer would orphan a count and silently desync
256 /// the quota — exactly the silent-starvation class this fix prevents.
257 /// Narrowing the mutation API to a single setter makes that mistake
258 /// impossible to commit by accident.
259 pub fn set_pending_state(
260 &mut self,
261 key: &XorName,
262 state: VerificationState,
263 ) -> Option<HintPipeline> {
264 let entry = self.pending_verify.get_mut(key)?;
265 entry.state = state;
266 Some(entry.pipeline)
267 }
268
269 /// Remove a key from pending verification.
270 pub fn remove_pending(&mut self, key: &XorName) -> Option<VerificationEntry> {
271 let removed = self.pending_verify.remove(key);
272 if let Some(entry) = &removed {
273 Self::release_sender_slot(&mut self.pending_per_sender, &entry.hint_sender);
274 }
275 removed
276 }
277
278 /// Collect all pending verification keys (for batch processing).
279 #[must_use]
280 pub fn pending_keys(&self) -> Vec<XorName> {
281 self.pending_verify.keys().copied().collect()
282 }
283
284 /// Number of keys in pending verification.
285 #[must_use]
286 pub fn pending_count(&self) -> usize {
287 self.pending_verify.len()
288 }
289
290 // -----------------------------------------------------------------------
291 // FetchQueue
292 // -----------------------------------------------------------------------
293
294 /// Enqueue a key for fetch with its distance and verified sources.
295 ///
296 /// Returns `true` if the candidate was enqueued, `false` if it was
297 /// already present in any pipeline stage (Rule 8: cross-queue dedup) or
298 /// the `fetch_queue` is at [`MAX_FETCH_QUEUE`].
299 ///
300 /// Callers that have removed the key from `pending_verify` immediately
301 /// before this call should prefer [`promote_pending_to_fetch`](Self::promote_pending_to_fetch),
302 /// which performs the move atomically and leaves the pending entry in
303 /// place when the fetch queue is full (so verified work is retried on
304 /// the next cycle instead of being silently lost).
305 pub fn enqueue_fetch(&mut self, key: XorName, distance: XorName, sources: Vec<PeerId>) -> bool {
306 if self.pending_verify.contains_key(&key)
307 || self.fetch_queue_keys.contains(&key)
308 || self.in_flight_fetch.contains_key(&key)
309 {
310 return false;
311 }
312 if self.fetch_queue.len() >= MAX_FETCH_QUEUE {
313 debug!(
314 "fetch_queue at capacity ({MAX_FETCH_QUEUE}); dropping new key {}",
315 hex::encode(key)
316 );
317 return false;
318 }
319 self.fetch_queue_keys.insert(key);
320 self.fetch_queue.push(FetchCandidate {
321 key,
322 distance,
323 sources,
324 });
325 true
326 }
327
328 /// Atomically promote a key from `pending_verify` to `fetch_queue`.
329 ///
330 /// Checks `fetch_queue` capacity FIRST, then removes the pending entry
331 /// and enqueues the fetch candidate. If `fetch_queue` is full, the
332 /// pending entry is **left in place** so the next verification cycle
333 /// can retry — preventing the silent-drop regression where a verified
334 /// key removed from `pending_verify` could be dropped by a full fetch
335 /// queue and lost from every stage.
336 ///
337 /// Returns `true` on successful promotion, `false` when the fetch queue
338 /// is at capacity (pending entry preserved).
339 pub fn promote_pending_to_fetch(
340 &mut self,
341 key: XorName,
342 distance: XorName,
343 sources: Vec<PeerId>,
344 ) -> bool {
345 if self.fetch_queue.len() >= MAX_FETCH_QUEUE {
346 debug!(
347 "fetch_queue at capacity ({MAX_FETCH_QUEUE}); leaving {} pending \
348 for retry next cycle",
349 hex::encode(key)
350 );
351 return false;
352 }
353 // Capacity confirmed; safe to release the pending slot and enqueue.
354 let _ = self.remove_pending(&key);
355 // enqueue_fetch returns false only on capacity or already-queued; the
356 // capacity check above and the just-removed pending state make this
357 // succeed. If a concurrent path put the key into fetch_queue/in_flight
358 // between, dropping the duplicate is fine.
359 self.enqueue_fetch(key, distance, sources)
360 }
361
362 /// Dequeue the nearest fetch candidate.
363 ///
364 /// Returns `None` when the queue is empty. Silently skips candidates
365 /// that are somehow already in-flight. Concurrency is enforced by the
366 /// fetch worker, not by this method.
367 pub fn dequeue_fetch(&mut self) -> Option<FetchCandidate> {
368 while let Some(candidate) = self.fetch_queue.pop() {
369 self.fetch_queue_keys.remove(&candidate.key);
370 if !self.in_flight_fetch.contains_key(&candidate.key) {
371 return Some(candidate);
372 }
373 }
374 None
375 }
376
377 /// Number of keys waiting in the fetch queue.
378 #[must_use]
379 pub fn fetch_queue_count(&self) -> usize {
380 self.fetch_queue.len()
381 }
382
383 // -----------------------------------------------------------------------
384 // InFlightFetch
385 // -----------------------------------------------------------------------
386
387 /// Mark a key as in-flight (actively being fetched from `source`).
388 pub fn start_fetch(&mut self, key: XorName, source: PeerId, all_sources: Vec<PeerId>) {
389 let mut tried = HashSet::new();
390 tried.insert(source);
391 self.in_flight_fetch.insert(
392 key,
393 InFlightEntry {
394 key,
395 source,
396 started_at: Instant::now(),
397 all_sources,
398 tried,
399 },
400 );
401 }
402
403 /// Mark a fetch as completed (success or permanent failure).
404 pub fn complete_fetch(&mut self, key: &XorName) -> Option<InFlightEntry> {
405 self.in_flight_fetch.remove(key)
406 }
407
408 /// Mark the current fetch attempt as failed and try the next untried source.
409 ///
410 /// Returns the next source peer if one is available, or `None` if all
411 /// sources have been exhausted.
412 pub fn retry_fetch(&mut self, key: &XorName) -> Option<PeerId> {
413 let entry = self.in_flight_fetch.get_mut(key)?;
414 entry.tried.insert(entry.source);
415
416 let next = entry
417 .all_sources
418 .iter()
419 .find(|p| !entry.tried.contains(p))
420 .copied();
421
422 if let Some(next_peer) = next {
423 entry.source = next_peer;
424 entry.tried.insert(next_peer);
425 Some(next_peer)
426 } else {
427 None
428 }
429 }
430
431 /// Number of in-flight fetches.
432 #[must_use]
433 pub fn in_flight_count(&self) -> usize {
434 self.in_flight_fetch.len()
435 }
436
437 // -----------------------------------------------------------------------
438 // Cross-queue queries
439 // -----------------------------------------------------------------------
440
441 /// Check if a key is present in any pipeline stage.
442 #[must_use]
443 pub fn contains_key(&self, key: &XorName) -> bool {
444 self.pending_verify.contains_key(key)
445 || self.fetch_queue_keys.contains(key)
446 || self.in_flight_fetch.contains_key(key)
447 }
448
449 /// Check if all bootstrap-related work is done.
450 ///
451 /// Returns `true` when none of the given bootstrap keys remain in any queue.
452 #[must_use]
453 pub fn is_bootstrap_work_empty(&self, bootstrap_keys: &HashSet<XorName>) -> bool {
454 !bootstrap_keys.iter().any(|k| self.contains_key(k))
455 }
456
457 /// Evict stale pending-verification entries older than `max_age`.
458 pub fn evict_stale(&mut self, max_age: Duration) {
459 let now = Instant::now();
460 let before = self.pending_verify.len();
461 let pending_per_sender = &mut self.pending_per_sender;
462 self.pending_verify.retain(|_, entry| {
463 let fresh = now.duration_since(entry.created_at) < max_age;
464 if !fresh {
465 Self::release_sender_slot(pending_per_sender, &entry.hint_sender);
466 }
467 fresh
468 });
469 let evicted = before.saturating_sub(self.pending_verify.len());
470 if evicted > 0 {
471 debug!("Evicted {evicted} stale pending-verification entries");
472 }
473 }
474
475 /// Number of `pending_verify` entries currently attributed to `sender`.
476 /// Exposed for tests and observability of the per-source fairness quota.
477 #[must_use]
478 pub fn pending_count_for_sender(&self, sender: &PeerId) -> usize {
479 self.pending_per_sender.get(sender).copied().unwrap_or(0)
480 }
481}
482
483// ---------------------------------------------------------------------------
484// Tests
485// ---------------------------------------------------------------------------
486
487#[cfg(test)]
488#[allow(clippy::unwrap_used, clippy::expect_used)]
489mod tests {
490 use std::collections::HashSet;
491 use std::time::{Duration, Instant};
492
493 use super::*;
494
495 /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
496 fn peer_id_from_byte(b: u8) -> PeerId {
497 let mut bytes = [0u8; 32];
498 bytes[0] = b;
499 PeerId::from_bytes(bytes)
500 }
501
502 /// Build an `XorName` from a single byte (repeated to 32 bytes).
503 fn xor_name_from_byte(b: u8) -> XorName {
504 [b; 32]
505 }
506
507 /// Create a minimal `VerificationEntry` for testing.
508 fn test_entry(sender_byte: u8) -> VerificationEntry {
509 VerificationEntry {
510 state: VerificationState::PendingVerify,
511 pipeline: HintPipeline::Replica,
512 verified_sources: Vec::new(),
513 tried_sources: HashSet::new(),
514 created_at: Instant::now(),
515 hint_sender: peer_id_from_byte(sender_byte),
516 }
517 }
518
519 // -- add_pending_verify dedup ------------------------------------------
520
521 #[test]
522 fn add_pending_verify_new_key_succeeds() {
523 let mut queues = ReplicationQueues::new();
524 let key = xor_name_from_byte(0x01);
525 assert!(queues.add_pending_verify(key, test_entry(1)).admitted());
526 assert_eq!(queues.pending_count(), 1);
527 }
528
529 #[test]
530 fn add_pending_verify_duplicate_rejected() {
531 let mut queues = ReplicationQueues::new();
532 let key = xor_name_from_byte(0x01);
533 assert!(queues.add_pending_verify(key, test_entry(1)).admitted());
534 assert!(!queues.add_pending_verify(key, test_entry(2)).admitted());
535 assert_eq!(queues.pending_count(), 1);
536 }
537
538 #[test]
539 fn add_pending_verify_rejected_if_in_fetch_queue() {
540 let mut queues = ReplicationQueues::new();
541 let key = xor_name_from_byte(0x02);
542 let distance = xor_name_from_byte(0x10);
543 queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(1)]);
544
545 assert!(
546 !queues.add_pending_verify(key, test_entry(1)).admitted(),
547 "should reject key already in fetch queue"
548 );
549 }
550
551 #[test]
552 fn add_pending_verify_rejected_if_in_flight() {
553 let mut queues = ReplicationQueues::new();
554 let key = xor_name_from_byte(0x03);
555 let source = peer_id_from_byte(1);
556 queues.start_fetch(key, source, vec![source]);
557
558 assert!(
559 !queues.add_pending_verify(key, test_entry(1)).admitted(),
560 "should reject key already in-flight"
561 );
562 }
563
564 // -- enqueue/dequeue ordering -----------------------------------------
565
566 #[test]
567 fn dequeue_returns_nearest_first() {
568 let mut queues = ReplicationQueues::new();
569
570 let near_key = xor_name_from_byte(0x01);
571 let far_key = xor_name_from_byte(0x02);
572 let near_dist = [0x00; 32]; // nearest
573 let far_dist = [0xFF; 32]; // farthest
574
575 queues.enqueue_fetch(far_key, far_dist, vec![peer_id_from_byte(1)]);
576 queues.enqueue_fetch(near_key, near_dist, vec![peer_id_from_byte(2)]);
577
578 let first = queues.dequeue_fetch().expect("should dequeue");
579 assert_eq!(first.key, near_key, "nearest key should dequeue first");
580
581 let second = queues.dequeue_fetch().expect("should dequeue");
582 assert_eq!(second.key, far_key, "farthest key should dequeue second");
583 }
584
585 #[test]
586 fn enqueue_dedup_prevents_duplicates() {
587 let mut queues = ReplicationQueues::new();
588 let key = xor_name_from_byte(0x01);
589
590 queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(1)]);
591 queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(2)]);
592
593 assert_eq!(
594 queues.fetch_queue_count(),
595 1,
596 "duplicate enqueue should be ignored"
597 );
598 }
599
600 // -- in-flight tracking -----------------------------------------------
601
602 #[test]
603 fn start_and_complete_fetch() {
604 let mut queues = ReplicationQueues::new();
605 let key = xor_name_from_byte(0x01);
606 let source = peer_id_from_byte(1);
607
608 queues.start_fetch(key, source, vec![source]);
609 assert_eq!(queues.in_flight_count(), 1);
610
611 let completed = queues.complete_fetch(&key);
612 assert!(completed.is_some());
613 assert_eq!(queues.in_flight_count(), 0);
614 }
615
616 #[test]
617 fn complete_nonexistent_returns_none() {
618 let mut queues = ReplicationQueues::new();
619 let key = xor_name_from_byte(0x99);
620 assert!(queues.complete_fetch(&key).is_none());
621 }
622
623 // -- retry_fetch ------------------------------------------------------
624
625 #[test]
626 fn retry_fetch_returns_next_untried_source() {
627 let mut queues = ReplicationQueues::new();
628 let key = xor_name_from_byte(0x01);
629 let source_a = peer_id_from_byte(1);
630 let source_b = peer_id_from_byte(2);
631 let source_c = peer_id_from_byte(3);
632
633 queues.start_fetch(key, source_a, vec![source_a, source_b, source_c]);
634
635 // First retry: should skip source_a (already tried), return source_b.
636 let next = queues.retry_fetch(&key);
637 assert_eq!(next, Some(source_b));
638
639 // Second retry: should return source_c.
640 let next = queues.retry_fetch(&key);
641 assert_eq!(next, Some(source_c));
642
643 // Third retry: all exhausted.
644 let next = queues.retry_fetch(&key);
645 assert!(next.is_none(), "all sources exhausted");
646 }
647
648 #[test]
649 fn retry_fetch_nonexistent_returns_none() {
650 let mut queues = ReplicationQueues::new();
651 assert!(queues.retry_fetch(&xor_name_from_byte(0xFF)).is_none());
652 }
653
654 // -- contains_key across pipelines ------------------------------------
655
656 #[test]
657 fn contains_key_in_pending() {
658 let mut queues = ReplicationQueues::new();
659 let key = xor_name_from_byte(0x01);
660 queues.add_pending_verify(key, test_entry(1));
661 assert!(queues.contains_key(&key));
662 }
663
664 #[test]
665 fn contains_key_in_fetch_queue() {
666 let mut queues = ReplicationQueues::new();
667 let key = xor_name_from_byte(0x02);
668 queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(1)]);
669 assert!(queues.contains_key(&key));
670 }
671
672 #[test]
673 fn contains_key_in_flight() {
674 let mut queues = ReplicationQueues::new();
675 let key = xor_name_from_byte(0x03);
676 queues.start_fetch(key, peer_id_from_byte(1), vec![]);
677 assert!(queues.contains_key(&key));
678 }
679
680 #[test]
681 fn contains_key_absent() {
682 let queues = ReplicationQueues::new();
683 assert!(!queues.contains_key(&xor_name_from_byte(0xFF)));
684 }
685
686 // -- bootstrap work empty ---------------------------------------------
687
688 #[test]
689 fn bootstrap_work_empty_when_no_keys_present() {
690 let queues = ReplicationQueues::new();
691 let bootstrap_keys: HashSet<XorName> = [xor_name_from_byte(0x01), xor_name_from_byte(0x02)]
692 .into_iter()
693 .collect();
694 assert!(queues.is_bootstrap_work_empty(&bootstrap_keys));
695 }
696
697 #[test]
698 fn bootstrap_work_not_empty_when_key_in_pending() {
699 let mut queues = ReplicationQueues::new();
700 let key = xor_name_from_byte(0x01);
701 queues.add_pending_verify(key, test_entry(1));
702
703 let bootstrap_keys: HashSet<XorName> = std::iter::once(key).collect();
704 assert!(!queues.is_bootstrap_work_empty(&bootstrap_keys));
705 }
706
707 // -- evict_stale ------------------------------------------------------
708
709 #[test]
710 fn evict_stale_removes_old_entries() {
711 let mut queues = ReplicationQueues::new();
712 let key = xor_name_from_byte(0x01);
713
714 // Go through the public `add_pending_verify` so the per-sender
715 // counter is correctly bumped — the entry's `hint_sender` slot must
716 // be released by `evict_stale` and we want to exercise that path.
717 let mut entry = test_entry(1);
718 let sender = entry.hint_sender;
719 // Backdate via the same defensive checked_sub used elsewhere so
720 // freshly-booted CI clocks don't trip us up.
721 entry.created_at = Instant::now()
722 .checked_sub(Duration::from_secs(2))
723 .unwrap_or_else(Instant::now);
724 assert!(queues.add_pending_verify(key, entry).admitted());
725
726 assert_eq!(queues.pending_count(), 1);
727 assert_eq!(queues.pending_count_for_sender(&sender), 1);
728
729 queues.evict_stale(Duration::from_secs(1));
730 assert_eq!(
731 queues.pending_count(),
732 0,
733 "entry older than max_age should be evicted"
734 );
735 // Per-sender counter must be released alongside the map removal.
736 assert_eq!(
737 queues.pending_count_for_sender(&sender),
738 0,
739 "evict_stale must release the per-sender slot"
740 );
741 }
742
743 #[test]
744 fn evict_stale_keeps_fresh_entries() {
745 let mut queues = ReplicationQueues::new();
746 let key = xor_name_from_byte(0x01);
747 queues.add_pending_verify(key, test_entry(1));
748
749 queues.evict_stale(Duration::from_secs(3600));
750 assert_eq!(
751 queues.pending_count(),
752 1,
753 "fresh entry should not be evicted"
754 );
755 }
756
757 // -- remove_pending ---------------------------------------------------
758
759 #[test]
760 fn remove_pending_returns_entry() {
761 let mut queues = ReplicationQueues::new();
762 let key = xor_name_from_byte(0x01);
763 queues.add_pending_verify(key, test_entry(1));
764
765 let removed = queues.remove_pending(&key);
766 assert!(removed.is_some());
767 assert_eq!(queues.pending_count(), 0);
768 }
769
770 #[test]
771 fn remove_pending_nonexistent_returns_none() {
772 let mut queues = ReplicationQueues::new();
773 assert!(queues.remove_pending(&xor_name_from_byte(0xFF)).is_none());
774 }
775
776 // -----------------------------------------------------------------------
777 // Section 18 scenarios
778 // -----------------------------------------------------------------------
779
780 /// Scenario 8: A key already in `PendingVerify` cannot be enqueued into
781 /// `FetchQueue` (cross-queue dedup). Also, a key in `FetchQueue` cannot be
782 /// re-added to `PendingVerify`.
783 #[test]
784 fn scenario_8_duplicate_key_not_double_queued() {
785 let mut queues = ReplicationQueues::new();
786 let key = xor_name_from_byte(0xE0);
787 let distance = xor_name_from_byte(0x10);
788
789 // Step 1: Add to PendingVerify.
790 assert!(
791 queues.add_pending_verify(key, test_entry(1)).admitted(),
792 "first add to PendingVerify should succeed"
793 );
794 assert!(
795 queues.contains_key(&key),
796 "key should be present in pipeline"
797 );
798
799 // Step 2: Attempt to enqueue fetch while still in PendingVerify.
800 // enqueue_fetch checks all three stages (pending_verify,
801 // fetch_queue_keys, in_flight), so this is a no-op while the key
802 // is still in PendingVerify.
803 queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(2)]);
804 // Verify the key is still tracked via the cross-stage check.
805 assert!(queues.contains_key(&key), "key should still be in pipeline");
806
807 // Step 3: Remove from PendingVerify, add to FetchQueue.
808 queues.remove_pending(&key);
809 queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(3)]);
810 assert_eq!(queues.fetch_queue_count(), 1);
811
812 // Step 4: Attempt to re-add to PendingVerify -> should fail.
813 assert!(
814 !queues.add_pending_verify(key, test_entry(4)).admitted(),
815 "key in FetchQueue should be rejected from PendingVerify"
816 );
817
818 // Step 5: Dequeue, start fetch -> key is in-flight.
819 let candidate = queues.dequeue_fetch().expect("should dequeue");
820 queues.start_fetch(
821 candidate.key,
822 candidate.sources[0],
823 candidate.sources.clone(),
824 );
825
826 // Step 6: Attempt to add to PendingVerify while in-flight -> reject.
827 assert!(
828 !queues.add_pending_verify(key, test_entry(5)).admitted(),
829 "key in-flight should be rejected from PendingVerify"
830 );
831
832 // Step 7: Attempt to enqueue fetch while in-flight -> no-op.
833 queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(6)]);
834 // fetch_queue should still be empty (the enqueue was a no-op).
835 assert_eq!(
836 queues.fetch_queue_count(),
837 0,
838 "enqueue_fetch should be no-op for in-flight key"
839 );
840 }
841
842 /// Scenario 8 (continued): Verify that pipeline field for a key
843 /// admitted as both replica and paid hint collapses to Replica only,
844 /// because cross-set precedence in admission gives replica priority.
845 #[test]
846 fn scenario_8_replica_and_paid_hint_collapses_to_replica() {
847 let mut queues = ReplicationQueues::new();
848 let key = xor_name_from_byte(0xE1);
849
850 // Simulate admission result: key was in both replica_hints and
851 // paid_hints, so admission gives it HintPipeline::Replica.
852 let entry = VerificationEntry {
853 state: VerificationState::PendingVerify,
854 pipeline: HintPipeline::Replica, // Cross-set precedence result.
855 verified_sources: Vec::new(),
856 tried_sources: HashSet::new(),
857 created_at: Instant::now(),
858 hint_sender: peer_id_from_byte(1),
859 };
860
861 assert!(queues.add_pending_verify(key, entry).admitted());
862
863 let pending = queues.get_pending(&key).expect("should be pending");
864 assert_eq!(
865 pending.pipeline,
866 HintPipeline::Replica,
867 "key in both hint sets should be Replica pipeline"
868 );
869
870 // A second add (e.g. from paid hints arriving separately) is rejected.
871 let paid_entry = VerificationEntry {
872 state: VerificationState::PendingVerify,
873 pipeline: HintPipeline::PaidOnly,
874 verified_sources: Vec::new(),
875 tried_sources: HashSet::new(),
876 created_at: Instant::now(),
877 hint_sender: peer_id_from_byte(2),
878 };
879
880 assert!(
881 !queues.add_pending_verify(key, paid_entry).admitted(),
882 "duplicate key should be rejected regardless of pipeline"
883 );
884
885 // Pipeline stays Replica.
886 let pending = queues.get_pending(&key).expect("should still be pending");
887 assert_eq!(
888 pending.pipeline,
889 HintPipeline::Replica,
890 "pipeline should remain Replica after duplicate rejection"
891 );
892 }
893
894 /// Scenario 3: Neighbor-sync unknown key transitions through the full
895 /// state machine to stored.
896 ///
897 /// Exercises the complete queue pipeline that a key follows when it
898 /// arrives as a neighbor-sync hint, passes quorum verification, is
899 /// fetched, and completes:
900 /// `PendingVerify` → (quorum pass) → `QueuedForFetch` → `Fetching` → `Stored`
901 #[test]
902 fn scenario_3_neighbor_sync_quorum_pass_full_pipeline() {
903 let mut queues = ReplicationQueues::new();
904 let key = xor_name_from_byte(0x03);
905 let distance = xor_name_from_byte(0x01);
906 let source_a = peer_id_from_byte(1);
907 let source_b = peer_id_from_byte(2);
908 let hint_sender = peer_id_from_byte(3);
909
910 // Stage 1: Hint admitted → PendingVerify
911 let entry = VerificationEntry {
912 state: VerificationState::PendingVerify,
913 pipeline: HintPipeline::Replica,
914 verified_sources: Vec::new(),
915 tried_sources: HashSet::new(),
916 created_at: Instant::now(),
917 hint_sender,
918 };
919 assert!(
920 queues.add_pending_verify(key, entry).admitted(),
921 "new key should be admitted to PendingVerify"
922 );
923 assert!(queues.contains_key(&key));
924 assert_eq!(queues.pending_count(), 1);
925
926 // Stage 2: Quorum passes — remove from pending and enqueue for fetch
927 // with the verified sources discovered during the quorum round.
928 let removed = queues.remove_pending(&key);
929 assert!(removed.is_some(), "key should exist in pending");
930 assert_eq!(queues.pending_count(), 0);
931
932 queues.enqueue_fetch(key, distance, vec![source_a, source_b]);
933 assert_eq!(queues.fetch_queue_count(), 1);
934 assert!(
935 queues.contains_key(&key),
936 "key should be in pipeline (fetch queue)"
937 );
938
939 // Stage 3: Dequeue → Fetching
940 let candidate = queues.dequeue_fetch().expect("should dequeue");
941 assert_eq!(candidate.key, key);
942 assert_eq!(candidate.sources.len(), 2);
943 queues.start_fetch(key, source_a, candidate.sources);
944 assert_eq!(queues.in_flight_count(), 1);
945 assert_eq!(queues.fetch_queue_count(), 0);
946 assert!(
947 queues.contains_key(&key),
948 "key should be in pipeline (in-flight)"
949 );
950
951 // Stage 4: Fetch completes → Stored
952 let completed = queues.complete_fetch(&key);
953 assert!(
954 completed.is_some(),
955 "should have in-flight entry to complete"
956 );
957 assert_eq!(queues.in_flight_count(), 0);
958 assert!(
959 !queues.contains_key(&key),
960 "key should be fully processed out of pipeline"
961 );
962 }
963}