Skip to main content

ant_node/replication/
scheduling.rs

1//! Scheduling and queue management (Section 12).
2//!
3//! Manages `PendingVerify`, `FetchQueue`, and `InFlightFetch` queues for the
4//! replication pipeline. Each key progresses through at most one queue at a
5//! time, with strict dedup across all three stages.
6
7use std::collections::{BinaryHeap, HashMap, HashSet};
8use std::time::{Duration, Instant};
9
10use crate::logging::debug;
11
12use crate::ant_protocol::XorName;
13use crate::replication::types::{
14    FetchCandidate, HintPipeline, VerificationEntry, VerificationState,
15};
16use saorsa_core::identity::PeerId;
17
/// Memory backstop: the most keys `pending_verify` may hold across all peers.
///
/// Without this bound, any peer in the local routing table could flood us with
/// `NeighborSyncRequest` messages. Each message is limited only by
/// `MAX_REPLICATION_MESSAGE_SIZE` (≈ 10 MiB, roughly 320k 32-byte hints).
/// The flood would grow the map without limit, exhaust node memory, and set
/// off a self-amplifying storm of outbound verification requests.
///
/// At `131_072` (2^17) entries the cap sits far above any legitimate
/// aggregate need, yet keeps worst-case memory to a few tens of MiB. A
/// `VerificationEntry` is on the order of a few hundred bytes, and its inner
/// collections are filled only from close-group-sized verification evidence,
/// never from attacker-supplied hint volume.
///
/// On its own this cap is **not** enough. If admission simply rejected at
/// capacity, one malicious routing-table peer could occupy every slot with
/// cheap junk that still passes admission. Honest peers' hints would then be
/// starved until the 30-minute `evict_stale` backstop fires, after which the
/// attacker refills immediately. Fairness between honest sources is enforced
/// by [`MAX_PENDING_VERIFY_PER_PEER`]; this value only bounds memory.
pub const MAX_PENDING_VERIFY: usize = 128 * 1024;
40
/// Per-source cap on `pending_verify` entries attributed to a single
/// `hint_sender` peer.
///
/// This quota is the actual D1 defence. Every pending entry records the peer
/// that hinted it (`VerificationEntry::hint_sender`), and no single source may
/// occupy more than this many slots. A flooding peer therefore exhausts only
/// its own allowance. Honest sources are counted separately, so they keep
/// theirs.
///
/// The value sits comfortably above any legitimate per-peer working set: a
/// healthy neighbour syncs at most a few thousand keys to us per cycle. It is
/// still small enough that `MAX_PENDING_VERIFY / MAX_PENDING_VERIFY_PER_PEER`
/// distinct malicious peers would be needed to approach the global cap.
///
/// Known residual risk (accepted; follow-up): at the current ratio, about 16
/// distinct `PeerId`s that are *all* in the victim's routing table at the same
/// time (gated by `sender_in_rt`) could together reach the global
/// `MAX_PENDING_VERIFY` backstop. `hint_sender` is the cryptographically
/// authenticated connection identity, not a forgeable payload field. The
/// attack therefore needs ~16 real Kademlia-adjacent Sybil nodes, a large step
/// up from the single-peer attack this closes. Even then the worst case is the
/// bounded memory backstop, not silent permanent starvation of non-Sybil
/// peers, each of which keeps its own quota.
///
/// Further hardening is tracked separately and is deliberately out of scope
/// for this `DoS` fix: either reserved headroom for under-quota sources, or a
/// per-source cap that scales with distinct-source pressure.
pub const MAX_PENDING_VERIFY_PER_PEER: usize = 8 * 1024;
67
/// Hard upper bound on the number of keys held in `fetch_queue`.
///
/// `fetch_queue` is fed only by `enqueue_fetch`, either directly or through
/// `promote_pending_to_fetch`. That path is reached **after** a key passes
/// quorum verification in `run_verification_cycle`. Attacker junk keys (no
/// real holder) fail quorum and never reach this stage, so the bounded and
/// fair `pending_verify` upstream is the primary protection. This global cap
/// remains as a defence-in-depth memory backstop.
///
/// Once the queue is full:
/// * `enqueue_fetch` drops the *new* key and returns `false`. This is the
///   same no-op contract it uses for cross-queue dedup hits.
/// * `promote_pending_to_fetch` instead leaves the key in `pending_verify`,
///   so the verified work is retried on a later cycle rather than lost.
pub const MAX_FETCH_QUEUE: usize = 131_072;
78
/// Result of offering a key to [`ReplicationQueues::add_pending_verify`].
///
/// Separates "the key is already being handled" from "the key was turned away
/// because a queue capacity bound was hit". Bootstrap drain accounting and
/// source-side retry logic MUST count `CapacityRejected` as outstanding work.
/// Treating it like a dedup hit was the silent-drop regression introduced
/// when the queues first became bounded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdmissionResult {
    /// The key was newly inserted into `pending_verify`.
    Admitted,
    /// The key already sits in some pipeline stage. That existing entry is
    /// left untouched and nothing needs to be retried.
    AlreadyPresent,
    /// The global or per-source capacity bound refused the key. Callers MUST
    /// treat this as work still to do, never as silently completed.
    CapacityRejected,
}

impl AdmissionResult {
    /// Returns `true` exactly when the result is [`AdmissionResult::Admitted`].
    ///
    /// Convenience for call sites that only care whether an insert happened.
    /// Both `AlreadyPresent` and `CapacityRejected` map to `false`.
    #[must_use]
    pub fn admitted(self) -> bool {
        self == Self::Admitted
    }
}
106
107// ---------------------------------------------------------------------------
108// In-flight entry
109// ---------------------------------------------------------------------------
110
111/// An in-flight fetch entry tracking an active download.
112#[derive(Debug, Clone)]
113pub struct InFlightEntry {
114    /// The key being fetched.
115    pub key: XorName,
116    /// The peer we are currently fetching from.
117    pub source: PeerId,
118    /// When the fetch started.
119    pub started_at: Instant,
120    /// All verified sources for this key.
121    pub all_sources: Vec<PeerId>,
122    /// Sources already attempted (failed or in progress).
123    pub tried: HashSet<PeerId>,
124}
125
126// ---------------------------------------------------------------------------
127// Central queue manager
128// ---------------------------------------------------------------------------
129
130/// Central queue manager for the replication pipeline.
131///
132/// Maintains three stages of the pipeline with global dedup:
133/// 1. **`PendingVerify`** -- keys awaiting quorum verification.
134/// 2. **`FetchQueue`** -- quorum-passed keys waiting for a fetch slot.
135/// 3. **`InFlightFetch`** -- keys actively being downloaded.
136pub struct ReplicationQueues {
137    /// Keys awaiting quorum result (dedup by key).
138    ///
139    /// Capacity-bounded by [`MAX_PENDING_VERIFY`]: admissions are rejected
140    /// once full, preventing unbounded growth under a network hint flood.
141    pending_verify: HashMap<XorName, VerificationEntry>,
142    /// Presence-quorum-passed or paid-list-authorized keys waiting for fetch.
143    ///
144    /// Capacity-bounded by [`MAX_FETCH_QUEUE`]: enqueues are dropped once
145    /// full, preventing unbounded growth under a network hint flood.
146    fetch_queue: BinaryHeap<FetchCandidate>,
147    /// Keys present in `fetch_queue` for O(1) dedup.
148    fetch_queue_keys: HashSet<XorName>,
149    /// Active downloads keyed by `XorName`.
150    in_flight_fetch: HashMap<XorName, InFlightEntry>,
151    /// Number of `pending_verify` entries currently attributed to each
152    /// `hint_sender` peer. Maintained in lockstep with `pending_verify`
153    /// (insert/remove/evict) so the per-peer quota
154    /// ([`MAX_PENDING_VERIFY_PER_PEER`]) can be enforced in O(1). An entry is
155    /// removed from this map when its count reaches zero so the map itself is
156    /// bounded by the number of distinct currently-pending sources.
157    pending_per_sender: HashMap<PeerId, usize>,
158}
159
160impl Default for ReplicationQueues {
161    fn default() -> Self {
162        Self::new()
163    }
164}
165
166impl ReplicationQueues {
167    /// Create new empty queues.
168    #[must_use]
169    pub fn new() -> Self {
170        Self {
171            pending_verify: HashMap::new(),
172            fetch_queue: BinaryHeap::new(),
173            fetch_queue_keys: HashSet::new(),
174            in_flight_fetch: HashMap::new(),
175            pending_per_sender: HashMap::new(),
176        }
177    }
178
179    // -----------------------------------------------------------------------
180    // PendingVerify
181    // -----------------------------------------------------------------------
182
183    /// Add a key to pending verification if not already present in any queue.
184    ///
185    /// Returns an [`AdmissionResult`] distinguishing the three outcomes:
186    /// * `Admitted` — newly inserted.
187    /// * `AlreadyPresent` — Rule 8 cross-queue dedup (the key is already in
188    ///   `pending_verify`, `fetch_queue`, or `in_flight_fetch`); the existing
189    ///   entry remains and there is no work to retry.
190    /// * `CapacityRejected` — global or per-source bound hit; the work is
191    ///   genuinely lost and the caller (e.g. bootstrap drain accounting,
192    ///   source-side retry) MUST treat this as still-outstanding work, not as
193    ///   "done". Without this distinction a bootstrap snapshot whose hints
194    ///   are capacity-rejected would silently mark itself drained.
195    pub fn add_pending_verify(
196        &mut self,
197        key: XorName,
198        entry: VerificationEntry,
199    ) -> AdmissionResult {
200        if self.contains_key(&key) {
201            return AdmissionResult::AlreadyPresent;
202        }
203        if self.pending_verify.len() >= MAX_PENDING_VERIFY {
204            debug!(
205                "pending_verify at global capacity ({MAX_PENDING_VERIFY}); rejecting key {}",
206                hex::encode(key)
207            );
208            return AdmissionResult::CapacityRejected;
209        }
210        let sender = entry.hint_sender;
211        let sender_count = self.pending_per_sender.get(&sender).copied().unwrap_or(0);
212        if sender_count >= MAX_PENDING_VERIFY_PER_PEER {
213            debug!(
214                "peer {sender} at per-source pending cap ({MAX_PENDING_VERIFY_PER_PEER}); \
215                 rejecting key {} (honest peers are unaffected)",
216                hex::encode(key)
217            );
218            return AdmissionResult::CapacityRejected;
219        }
220        self.pending_verify.insert(key, entry);
221        *self.pending_per_sender.entry(sender).or_insert(0) += 1;
222        AdmissionResult::Admitted
223    }
224
225    /// Decrement (and prune at zero) the per-sender counter for `sender`.
226    ///
227    /// Kept private so the counter can only move in lockstep with
228    /// `pending_verify` mutations. The decrement uses `saturating_sub` so a
229    /// hypothetical future invariant break (a release without a matching
230    /// admission) self-heals to zero instead of panicking on `usize`
231    /// underflow; `debug_assert!` still surfaces such a break in test builds.
232    fn release_sender_slot(pending_per_sender: &mut HashMap<PeerId, usize>, sender: &PeerId) {
233        if let Some(count) = pending_per_sender.get_mut(sender) {
234            debug_assert!(*count > 0, "per-sender counter underflow for {sender}");
235            *count = count.saturating_sub(1);
236            if *count == 0 {
237                pending_per_sender.remove(sender);
238            }
239        }
240    }
241
242    /// Get a reference to a pending verification entry.
243    #[must_use]
244    pub fn get_pending(&self, key: &XorName) -> Option<&VerificationEntry> {
245        self.pending_verify.get(key)
246    }
247
248    /// Advance a pending entry's verification `state`, returning the entry's
249    /// `pipeline` (so the caller can branch on it) when the key was found.
250    ///
251    /// Replaces a prior `get_pending_mut` which handed out `&mut VerificationEntry`
252    /// and relied on a doc-comment to keep callers from re-assigning
253    /// `hint_sender`. The per-source quota counter (`pending_per_sender`) is
254    /// keyed by `hint_sender` recorded at admission; re-attributing a live
255    /// entry to a different peer would orphan a count and silently desync
256    /// the quota — exactly the silent-starvation class this fix prevents.
257    /// Narrowing the mutation API to a single setter makes that mistake
258    /// impossible to commit by accident.
259    pub fn set_pending_state(
260        &mut self,
261        key: &XorName,
262        state: VerificationState,
263    ) -> Option<HintPipeline> {
264        let entry = self.pending_verify.get_mut(key)?;
265        entry.state = state;
266        Some(entry.pipeline)
267    }
268
269    /// Remove a key from pending verification.
270    pub fn remove_pending(&mut self, key: &XorName) -> Option<VerificationEntry> {
271        let removed = self.pending_verify.remove(key);
272        if let Some(entry) = &removed {
273            Self::release_sender_slot(&mut self.pending_per_sender, &entry.hint_sender);
274        }
275        removed
276    }
277
278    /// Collect all pending verification keys (for batch processing).
279    #[must_use]
280    pub fn pending_keys(&self) -> Vec<XorName> {
281        self.pending_verify.keys().copied().collect()
282    }
283
284    /// Number of keys in pending verification.
285    #[must_use]
286    pub fn pending_count(&self) -> usize {
287        self.pending_verify.len()
288    }
289
290    // -----------------------------------------------------------------------
291    // FetchQueue
292    // -----------------------------------------------------------------------
293
294    /// Enqueue a key for fetch with its distance and verified sources.
295    ///
296    /// Returns `true` if the candidate was enqueued, `false` if it was
297    /// already present in any pipeline stage (Rule 8: cross-queue dedup) or
298    /// the `fetch_queue` is at [`MAX_FETCH_QUEUE`].
299    ///
300    /// Callers that have removed the key from `pending_verify` immediately
301    /// before this call should prefer [`promote_pending_to_fetch`](Self::promote_pending_to_fetch),
302    /// which performs the move atomically and leaves the pending entry in
303    /// place when the fetch queue is full (so verified work is retried on
304    /// the next cycle instead of being silently lost).
305    pub fn enqueue_fetch(&mut self, key: XorName, distance: XorName, sources: Vec<PeerId>) -> bool {
306        if self.pending_verify.contains_key(&key)
307            || self.fetch_queue_keys.contains(&key)
308            || self.in_flight_fetch.contains_key(&key)
309        {
310            return false;
311        }
312        if self.fetch_queue.len() >= MAX_FETCH_QUEUE {
313            debug!(
314                "fetch_queue at capacity ({MAX_FETCH_QUEUE}); dropping new key {}",
315                hex::encode(key)
316            );
317            return false;
318        }
319        self.fetch_queue_keys.insert(key);
320        self.fetch_queue.push(FetchCandidate {
321            key,
322            distance,
323            sources,
324        });
325        true
326    }
327
328    /// Atomically promote a key from `pending_verify` to `fetch_queue`.
329    ///
330    /// Checks `fetch_queue` capacity FIRST, then removes the pending entry
331    /// and enqueues the fetch candidate. If `fetch_queue` is full, the
332    /// pending entry is **left in place** so the next verification cycle
333    /// can retry — preventing the silent-drop regression where a verified
334    /// key removed from `pending_verify` could be dropped by a full fetch
335    /// queue and lost from every stage.
336    ///
337    /// Returns `true` on successful promotion, `false` when the fetch queue
338    /// is at capacity (pending entry preserved).
339    pub fn promote_pending_to_fetch(
340        &mut self,
341        key: XorName,
342        distance: XorName,
343        sources: Vec<PeerId>,
344    ) -> bool {
345        if self.fetch_queue.len() >= MAX_FETCH_QUEUE {
346            debug!(
347                "fetch_queue at capacity ({MAX_FETCH_QUEUE}); leaving {} pending \
348                 for retry next cycle",
349                hex::encode(key)
350            );
351            return false;
352        }
353        // Capacity confirmed; safe to release the pending slot and enqueue.
354        let _ = self.remove_pending(&key);
355        // enqueue_fetch returns false only on capacity or already-queued; the
356        // capacity check above and the just-removed pending state make this
357        // succeed. If a concurrent path put the key into fetch_queue/in_flight
358        // between, dropping the duplicate is fine.
359        self.enqueue_fetch(key, distance, sources)
360    }
361
362    /// Dequeue the nearest fetch candidate.
363    ///
364    /// Returns `None` when the queue is empty.  Silently skips candidates
365    /// that are somehow already in-flight.  Concurrency is enforced by the
366    /// fetch worker, not by this method.
367    pub fn dequeue_fetch(&mut self) -> Option<FetchCandidate> {
368        while let Some(candidate) = self.fetch_queue.pop() {
369            self.fetch_queue_keys.remove(&candidate.key);
370            if !self.in_flight_fetch.contains_key(&candidate.key) {
371                return Some(candidate);
372            }
373        }
374        None
375    }
376
377    /// Number of keys waiting in the fetch queue.
378    #[must_use]
379    pub fn fetch_queue_count(&self) -> usize {
380        self.fetch_queue.len()
381    }
382
383    // -----------------------------------------------------------------------
384    // InFlightFetch
385    // -----------------------------------------------------------------------
386
387    /// Mark a key as in-flight (actively being fetched from `source`).
388    pub fn start_fetch(&mut self, key: XorName, source: PeerId, all_sources: Vec<PeerId>) {
389        let mut tried = HashSet::new();
390        tried.insert(source);
391        self.in_flight_fetch.insert(
392            key,
393            InFlightEntry {
394                key,
395                source,
396                started_at: Instant::now(),
397                all_sources,
398                tried,
399            },
400        );
401    }
402
403    /// Mark a fetch as completed (success or permanent failure).
404    pub fn complete_fetch(&mut self, key: &XorName) -> Option<InFlightEntry> {
405        self.in_flight_fetch.remove(key)
406    }
407
408    /// Mark the current fetch attempt as failed and try the next untried source.
409    ///
410    /// Returns the next source peer if one is available, or `None` if all
411    /// sources have been exhausted.
412    pub fn retry_fetch(&mut self, key: &XorName) -> Option<PeerId> {
413        let entry = self.in_flight_fetch.get_mut(key)?;
414        entry.tried.insert(entry.source);
415
416        let next = entry
417            .all_sources
418            .iter()
419            .find(|p| !entry.tried.contains(p))
420            .copied();
421
422        if let Some(next_peer) = next {
423            entry.source = next_peer;
424            entry.tried.insert(next_peer);
425            Some(next_peer)
426        } else {
427            None
428        }
429    }
430
431    /// Number of in-flight fetches.
432    #[must_use]
433    pub fn in_flight_count(&self) -> usize {
434        self.in_flight_fetch.len()
435    }
436
437    // -----------------------------------------------------------------------
438    // Cross-queue queries
439    // -----------------------------------------------------------------------
440
441    /// Check if a key is present in any pipeline stage.
442    #[must_use]
443    pub fn contains_key(&self, key: &XorName) -> bool {
444        self.pending_verify.contains_key(key)
445            || self.fetch_queue_keys.contains(key)
446            || self.in_flight_fetch.contains_key(key)
447    }
448
449    /// Check if all bootstrap-related work is done.
450    ///
451    /// Returns `true` when none of the given bootstrap keys remain in any queue.
452    #[must_use]
453    pub fn is_bootstrap_work_empty(&self, bootstrap_keys: &HashSet<XorName>) -> bool {
454        !bootstrap_keys.iter().any(|k| self.contains_key(k))
455    }
456
457    /// Evict stale pending-verification entries older than `max_age`.
458    pub fn evict_stale(&mut self, max_age: Duration) {
459        let now = Instant::now();
460        let before = self.pending_verify.len();
461        let pending_per_sender = &mut self.pending_per_sender;
462        self.pending_verify.retain(|_, entry| {
463            let fresh = now.duration_since(entry.created_at) < max_age;
464            if !fresh {
465                Self::release_sender_slot(pending_per_sender, &entry.hint_sender);
466            }
467            fresh
468        });
469        let evicted = before.saturating_sub(self.pending_verify.len());
470        if evicted > 0 {
471            debug!("Evicted {evicted} stale pending-verification entries");
472        }
473    }
474
475    /// Number of `pending_verify` entries currently attributed to `sender`.
476    /// Exposed for tests and observability of the per-source fairness quota.
477    #[must_use]
478    pub fn pending_count_for_sender(&self, sender: &PeerId) -> usize {
479        self.pending_per_sender.get(sender).copied().unwrap_or(0)
480    }
481}
482
483// ---------------------------------------------------------------------------
484// Tests
485// ---------------------------------------------------------------------------
486
487#[cfg(test)]
488#[allow(clippy::unwrap_used, clippy::expect_used)]
489mod tests {
490    use std::collections::HashSet;
491    use std::time::{Duration, Instant};
492
493    use super::*;
494
495    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
496    fn peer_id_from_byte(b: u8) -> PeerId {
497        let mut bytes = [0u8; 32];
498        bytes[0] = b;
499        PeerId::from_bytes(bytes)
500    }
501
502    /// Build an `XorName` from a single byte (repeated to 32 bytes).
503    fn xor_name_from_byte(b: u8) -> XorName {
504        [b; 32]
505    }
506
507    /// Create a minimal `VerificationEntry` for testing.
508    fn test_entry(sender_byte: u8) -> VerificationEntry {
509        VerificationEntry {
510            state: VerificationState::PendingVerify,
511            pipeline: HintPipeline::Replica,
512            verified_sources: Vec::new(),
513            tried_sources: HashSet::new(),
514            created_at: Instant::now(),
515            hint_sender: peer_id_from_byte(sender_byte),
516        }
517    }
518
519    // -- add_pending_verify dedup ------------------------------------------
520
521    #[test]
522    fn add_pending_verify_new_key_succeeds() {
523        let mut queues = ReplicationQueues::new();
524        let key = xor_name_from_byte(0x01);
525        assert!(queues.add_pending_verify(key, test_entry(1)).admitted());
526        assert_eq!(queues.pending_count(), 1);
527    }
528
529    #[test]
530    fn add_pending_verify_duplicate_rejected() {
531        let mut queues = ReplicationQueues::new();
532        let key = xor_name_from_byte(0x01);
533        assert!(queues.add_pending_verify(key, test_entry(1)).admitted());
534        assert!(!queues.add_pending_verify(key, test_entry(2)).admitted());
535        assert_eq!(queues.pending_count(), 1);
536    }
537
538    #[test]
539    fn add_pending_verify_rejected_if_in_fetch_queue() {
540        let mut queues = ReplicationQueues::new();
541        let key = xor_name_from_byte(0x02);
542        let distance = xor_name_from_byte(0x10);
543        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(1)]);
544
545        assert!(
546            !queues.add_pending_verify(key, test_entry(1)).admitted(),
547            "should reject key already in fetch queue"
548        );
549    }
550
551    #[test]
552    fn add_pending_verify_rejected_if_in_flight() {
553        let mut queues = ReplicationQueues::new();
554        let key = xor_name_from_byte(0x03);
555        let source = peer_id_from_byte(1);
556        queues.start_fetch(key, source, vec![source]);
557
558        assert!(
559            !queues.add_pending_verify(key, test_entry(1)).admitted(),
560            "should reject key already in-flight"
561        );
562    }
563
564    // -- enqueue/dequeue ordering -----------------------------------------
565
566    #[test]
567    fn dequeue_returns_nearest_first() {
568        let mut queues = ReplicationQueues::new();
569
570        let near_key = xor_name_from_byte(0x01);
571        let far_key = xor_name_from_byte(0x02);
572        let near_dist = [0x00; 32]; // nearest
573        let far_dist = [0xFF; 32]; // farthest
574
575        queues.enqueue_fetch(far_key, far_dist, vec![peer_id_from_byte(1)]);
576        queues.enqueue_fetch(near_key, near_dist, vec![peer_id_from_byte(2)]);
577
578        let first = queues.dequeue_fetch().expect("should dequeue");
579        assert_eq!(first.key, near_key, "nearest key should dequeue first");
580
581        let second = queues.dequeue_fetch().expect("should dequeue");
582        assert_eq!(second.key, far_key, "farthest key should dequeue second");
583    }
584
585    #[test]
586    fn enqueue_dedup_prevents_duplicates() {
587        let mut queues = ReplicationQueues::new();
588        let key = xor_name_from_byte(0x01);
589
590        queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(1)]);
591        queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(2)]);
592
593        assert_eq!(
594            queues.fetch_queue_count(),
595            1,
596            "duplicate enqueue should be ignored"
597        );
598    }
599
600    // -- in-flight tracking -----------------------------------------------
601
602    #[test]
603    fn start_and_complete_fetch() {
604        let mut queues = ReplicationQueues::new();
605        let key = xor_name_from_byte(0x01);
606        let source = peer_id_from_byte(1);
607
608        queues.start_fetch(key, source, vec![source]);
609        assert_eq!(queues.in_flight_count(), 1);
610
611        let completed = queues.complete_fetch(&key);
612        assert!(completed.is_some());
613        assert_eq!(queues.in_flight_count(), 0);
614    }
615
616    #[test]
617    fn complete_nonexistent_returns_none() {
618        let mut queues = ReplicationQueues::new();
619        let key = xor_name_from_byte(0x99);
620        assert!(queues.complete_fetch(&key).is_none());
621    }
622
623    // -- retry_fetch ------------------------------------------------------
624
625    #[test]
626    fn retry_fetch_returns_next_untried_source() {
627        let mut queues = ReplicationQueues::new();
628        let key = xor_name_from_byte(0x01);
629        let source_a = peer_id_from_byte(1);
630        let source_b = peer_id_from_byte(2);
631        let source_c = peer_id_from_byte(3);
632
633        queues.start_fetch(key, source_a, vec![source_a, source_b, source_c]);
634
635        // First retry: should skip source_a (already tried), return source_b.
636        let next = queues.retry_fetch(&key);
637        assert_eq!(next, Some(source_b));
638
639        // Second retry: should return source_c.
640        let next = queues.retry_fetch(&key);
641        assert_eq!(next, Some(source_c));
642
643        // Third retry: all exhausted.
644        let next = queues.retry_fetch(&key);
645        assert!(next.is_none(), "all sources exhausted");
646    }
647
648    #[test]
649    fn retry_fetch_nonexistent_returns_none() {
650        let mut queues = ReplicationQueues::new();
651        assert!(queues.retry_fetch(&xor_name_from_byte(0xFF)).is_none());
652    }
653
654    // -- contains_key across pipelines ------------------------------------
655
656    #[test]
657    fn contains_key_in_pending() {
658        let mut queues = ReplicationQueues::new();
659        let key = xor_name_from_byte(0x01);
660        queues.add_pending_verify(key, test_entry(1));
661        assert!(queues.contains_key(&key));
662    }
663
664    #[test]
665    fn contains_key_in_fetch_queue() {
666        let mut queues = ReplicationQueues::new();
667        let key = xor_name_from_byte(0x02);
668        queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(1)]);
669        assert!(queues.contains_key(&key));
670    }
671
672    #[test]
673    fn contains_key_in_flight() {
674        let mut queues = ReplicationQueues::new();
675        let key = xor_name_from_byte(0x03);
676        queues.start_fetch(key, peer_id_from_byte(1), vec![]);
677        assert!(queues.contains_key(&key));
678    }
679
680    #[test]
681    fn contains_key_absent() {
682        let queues = ReplicationQueues::new();
683        assert!(!queues.contains_key(&xor_name_from_byte(0xFF)));
684    }
685
686    // -- bootstrap work empty ---------------------------------------------
687
688    #[test]
689    fn bootstrap_work_empty_when_no_keys_present() {
690        let queues = ReplicationQueues::new();
691        let bootstrap_keys: HashSet<XorName> = [xor_name_from_byte(0x01), xor_name_from_byte(0x02)]
692            .into_iter()
693            .collect();
694        assert!(queues.is_bootstrap_work_empty(&bootstrap_keys));
695    }
696
697    #[test]
698    fn bootstrap_work_not_empty_when_key_in_pending() {
699        let mut queues = ReplicationQueues::new();
700        let key = xor_name_from_byte(0x01);
701        queues.add_pending_verify(key, test_entry(1));
702
703        let bootstrap_keys: HashSet<XorName> = std::iter::once(key).collect();
704        assert!(!queues.is_bootstrap_work_empty(&bootstrap_keys));
705    }
706
707    // -- evict_stale ------------------------------------------------------
708
709    #[test]
710    fn evict_stale_removes_old_entries() {
711        let mut queues = ReplicationQueues::new();
712        let key = xor_name_from_byte(0x01);
713
714        // Go through the public `add_pending_verify` so the per-sender
715        // counter is correctly bumped — the entry's `hint_sender` slot must
716        // be released by `evict_stale` and we want to exercise that path.
717        let mut entry = test_entry(1);
718        let sender = entry.hint_sender;
719        // Backdate via the same defensive checked_sub used elsewhere so
720        // freshly-booted CI clocks don't trip us up.
721        entry.created_at = Instant::now()
722            .checked_sub(Duration::from_secs(2))
723            .unwrap_or_else(Instant::now);
724        assert!(queues.add_pending_verify(key, entry).admitted());
725
726        assert_eq!(queues.pending_count(), 1);
727        assert_eq!(queues.pending_count_for_sender(&sender), 1);
728
729        queues.evict_stale(Duration::from_secs(1));
730        assert_eq!(
731            queues.pending_count(),
732            0,
733            "entry older than max_age should be evicted"
734        );
735        // Per-sender counter must be released alongside the map removal.
736        assert_eq!(
737            queues.pending_count_for_sender(&sender),
738            0,
739            "evict_stale must release the per-sender slot"
740        );
741    }
742
743    #[test]
744    fn evict_stale_keeps_fresh_entries() {
745        let mut queues = ReplicationQueues::new();
746        let key = xor_name_from_byte(0x01);
747        queues.add_pending_verify(key, test_entry(1));
748
749        queues.evict_stale(Duration::from_secs(3600));
750        assert_eq!(
751            queues.pending_count(),
752            1,
753            "fresh entry should not be evicted"
754        );
755    }
756
757    // -- remove_pending ---------------------------------------------------
758
759    #[test]
760    fn remove_pending_returns_entry() {
761        let mut queues = ReplicationQueues::new();
762        let key = xor_name_from_byte(0x01);
763        queues.add_pending_verify(key, test_entry(1));
764
765        let removed = queues.remove_pending(&key);
766        assert!(removed.is_some());
767        assert_eq!(queues.pending_count(), 0);
768    }
769
770    #[test]
771    fn remove_pending_nonexistent_returns_none() {
772        let mut queues = ReplicationQueues::new();
773        assert!(queues.remove_pending(&xor_name_from_byte(0xFF)).is_none());
774    }
775
776    // -----------------------------------------------------------------------
777    // Section 18 scenarios
778    // -----------------------------------------------------------------------
779
780    /// Scenario 8: A key already in `PendingVerify` cannot be enqueued into
781    /// `FetchQueue` (cross-queue dedup). Also, a key in `FetchQueue` cannot be
782    /// re-added to `PendingVerify`.
783    #[test]
784    fn scenario_8_duplicate_key_not_double_queued() {
785        let mut queues = ReplicationQueues::new();
786        let key = xor_name_from_byte(0xE0);
787        let distance = xor_name_from_byte(0x10);
788
789        // Step 1: Add to PendingVerify.
790        assert!(
791            queues.add_pending_verify(key, test_entry(1)).admitted(),
792            "first add to PendingVerify should succeed"
793        );
794        assert!(
795            queues.contains_key(&key),
796            "key should be present in pipeline"
797        );
798
799        // Step 2: Attempt to enqueue fetch while still in PendingVerify.
800        // enqueue_fetch checks all three stages (pending_verify,
801        // fetch_queue_keys, in_flight), so this is a no-op while the key
802        // is still in PendingVerify.
803        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(2)]);
804        // Verify the key is still tracked via the cross-stage check.
805        assert!(queues.contains_key(&key), "key should still be in pipeline");
806
807        // Step 3: Remove from PendingVerify, add to FetchQueue.
808        queues.remove_pending(&key);
809        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(3)]);
810        assert_eq!(queues.fetch_queue_count(), 1);
811
812        // Step 4: Attempt to re-add to PendingVerify -> should fail.
813        assert!(
814            !queues.add_pending_verify(key, test_entry(4)).admitted(),
815            "key in FetchQueue should be rejected from PendingVerify"
816        );
817
818        // Step 5: Dequeue, start fetch -> key is in-flight.
819        let candidate = queues.dequeue_fetch().expect("should dequeue");
820        queues.start_fetch(
821            candidate.key,
822            candidate.sources[0],
823            candidate.sources.clone(),
824        );
825
826        // Step 6: Attempt to add to PendingVerify while in-flight -> reject.
827        assert!(
828            !queues.add_pending_verify(key, test_entry(5)).admitted(),
829            "key in-flight should be rejected from PendingVerify"
830        );
831
832        // Step 7: Attempt to enqueue fetch while in-flight -> no-op.
833        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(6)]);
834        // fetch_queue should still be empty (the enqueue was a no-op).
835        assert_eq!(
836            queues.fetch_queue_count(),
837            0,
838            "enqueue_fetch should be no-op for in-flight key"
839        );
840    }
841
842    /// Scenario 8 (continued): Verify that pipeline field for a key
843    /// admitted as both replica and paid hint collapses to Replica only,
844    /// because cross-set precedence in admission gives replica priority.
845    #[test]
846    fn scenario_8_replica_and_paid_hint_collapses_to_replica() {
847        let mut queues = ReplicationQueues::new();
848        let key = xor_name_from_byte(0xE1);
849
850        // Simulate admission result: key was in both replica_hints and
851        // paid_hints, so admission gives it HintPipeline::Replica.
852        let entry = VerificationEntry {
853            state: VerificationState::PendingVerify,
854            pipeline: HintPipeline::Replica, // Cross-set precedence result.
855            verified_sources: Vec::new(),
856            tried_sources: HashSet::new(),
857            created_at: Instant::now(),
858            hint_sender: peer_id_from_byte(1),
859        };
860
861        assert!(queues.add_pending_verify(key, entry).admitted());
862
863        let pending = queues.get_pending(&key).expect("should be pending");
864        assert_eq!(
865            pending.pipeline,
866            HintPipeline::Replica,
867            "key in both hint sets should be Replica pipeline"
868        );
869
870        // A second add (e.g. from paid hints arriving separately) is rejected.
871        let paid_entry = VerificationEntry {
872            state: VerificationState::PendingVerify,
873            pipeline: HintPipeline::PaidOnly,
874            verified_sources: Vec::new(),
875            tried_sources: HashSet::new(),
876            created_at: Instant::now(),
877            hint_sender: peer_id_from_byte(2),
878        };
879
880        assert!(
881            !queues.add_pending_verify(key, paid_entry).admitted(),
882            "duplicate key should be rejected regardless of pipeline"
883        );
884
885        // Pipeline stays Replica.
886        let pending = queues.get_pending(&key).expect("should still be pending");
887        assert_eq!(
888            pending.pipeline,
889            HintPipeline::Replica,
890            "pipeline should remain Replica after duplicate rejection"
891        );
892    }
893
894    /// Scenario 3: Neighbor-sync unknown key transitions through the full
895    /// state machine to stored.
896    ///
897    /// Exercises the complete queue pipeline that a key follows when it
898    /// arrives as a neighbor-sync hint, passes quorum verification, is
899    /// fetched, and completes:
900    ///   `PendingVerify` → (quorum pass) → `QueuedForFetch` → `Fetching` → `Stored`
901    #[test]
902    fn scenario_3_neighbor_sync_quorum_pass_full_pipeline() {
903        let mut queues = ReplicationQueues::new();
904        let key = xor_name_from_byte(0x03);
905        let distance = xor_name_from_byte(0x01);
906        let source_a = peer_id_from_byte(1);
907        let source_b = peer_id_from_byte(2);
908        let hint_sender = peer_id_from_byte(3);
909
910        // Stage 1: Hint admitted → PendingVerify
911        let entry = VerificationEntry {
912            state: VerificationState::PendingVerify,
913            pipeline: HintPipeline::Replica,
914            verified_sources: Vec::new(),
915            tried_sources: HashSet::new(),
916            created_at: Instant::now(),
917            hint_sender,
918        };
919        assert!(
920            queues.add_pending_verify(key, entry).admitted(),
921            "new key should be admitted to PendingVerify"
922        );
923        assert!(queues.contains_key(&key));
924        assert_eq!(queues.pending_count(), 1);
925
926        // Stage 2: Quorum passes — remove from pending and enqueue for fetch
927        // with the verified sources discovered during the quorum round.
928        let removed = queues.remove_pending(&key);
929        assert!(removed.is_some(), "key should exist in pending");
930        assert_eq!(queues.pending_count(), 0);
931
932        queues.enqueue_fetch(key, distance, vec![source_a, source_b]);
933        assert_eq!(queues.fetch_queue_count(), 1);
934        assert!(
935            queues.contains_key(&key),
936            "key should be in pipeline (fetch queue)"
937        );
938
939        // Stage 3: Dequeue → Fetching
940        let candidate = queues.dequeue_fetch().expect("should dequeue");
941        assert_eq!(candidate.key, key);
942        assert_eq!(candidate.sources.len(), 2);
943        queues.start_fetch(key, source_a, candidate.sources);
944        assert_eq!(queues.in_flight_count(), 1);
945        assert_eq!(queues.fetch_queue_count(), 0);
946        assert!(
947            queues.contains_key(&key),
948            "key should be in pipeline (in-flight)"
949        );
950
951        // Stage 4: Fetch completes → Stored
952        let completed = queues.complete_fetch(&key);
953        assert!(
954            completed.is_some(),
955            "should have in-flight entry to complete"
956        );
957        assert_eq!(queues.in_flight_count(), 0);
958        assert!(
959            !queues.contains_key(&key),
960            "key should be fully processed out of pipeline"
961        );
962    }
963}