Skip to main content

ant_node/replication/
scheduling.rs

1//! Scheduling and queue management (Section 12).
2//!
3//! Manages `PendingVerify`, `FetchQueue`, and `InFlightFetch` queues for the
4//! replication pipeline. Each key progresses through at most one queue at a
5//! time, with strict dedup across all three stages.
6
7use std::collections::{BinaryHeap, HashMap, HashSet};
8use std::time::{Duration, Instant};
9
10use crate::logging::debug;
11
12use crate::ant_protocol::XorName;
13use crate::replication::types::{FetchCandidate, VerificationEntry};
14use saorsa_core::identity::PeerId;
15
16// ---------------------------------------------------------------------------
17// In-flight entry
18// ---------------------------------------------------------------------------
19
20/// An in-flight fetch entry tracking an active download.
21#[derive(Debug, Clone)]
22pub struct InFlightEntry {
23    /// The key being fetched.
24    pub key: XorName,
25    /// The peer we are currently fetching from.
26    pub source: PeerId,
27    /// When the fetch started.
28    pub started_at: Instant,
29    /// All verified sources for this key.
30    pub all_sources: Vec<PeerId>,
31    /// Sources already attempted (failed or in progress).
32    pub tried: HashSet<PeerId>,
33}
34
35// ---------------------------------------------------------------------------
36// Central queue manager
37// ---------------------------------------------------------------------------
38
39/// Central queue manager for the replication pipeline.
40///
41/// Maintains three stages of the pipeline with global dedup:
42/// 1. **`PendingVerify`** -- keys awaiting quorum verification.
43/// 2. **`FetchQueue`** -- quorum-passed keys waiting for a fetch slot.
44/// 3. **`InFlightFetch`** -- keys actively being downloaded.
45pub struct ReplicationQueues {
46    /// Keys awaiting quorum result (dedup by key).
47    // TODO: Add capacity bound to prevent unbounded growth under network flood.
48    // Consider evicting farthest-distance entries when at capacity.
49    pending_verify: HashMap<XorName, VerificationEntry>,
50    /// Presence-quorum-passed or paid-list-authorized keys waiting for fetch.
51    // TODO: Add capacity bound (e.g. MAX_FETCH_QUEUE_SIZE) to prevent
52    // unbounded growth. Reject or evict farthest-distance candidates when full.
53    fetch_queue: BinaryHeap<FetchCandidate>,
54    /// Keys present in `fetch_queue` for O(1) dedup.
55    fetch_queue_keys: HashSet<XorName>,
56    /// Active downloads keyed by `XorName`.
57    in_flight_fetch: HashMap<XorName, InFlightEntry>,
58}
59
60impl Default for ReplicationQueues {
61    fn default() -> Self {
62        Self::new()
63    }
64}
65
66impl ReplicationQueues {
67    /// Create new empty queues.
68    #[must_use]
69    pub fn new() -> Self {
70        Self {
71            pending_verify: HashMap::new(),
72            fetch_queue: BinaryHeap::new(),
73            fetch_queue_keys: HashSet::new(),
74            in_flight_fetch: HashMap::new(),
75        }
76    }
77
78    // -----------------------------------------------------------------------
79    // PendingVerify
80    // -----------------------------------------------------------------------
81
82    /// Add a key to pending verification if not already present in any queue.
83    ///
84    /// Returns `true` if the key was newly added (Rule 8: cross-queue dedup).
85    pub fn add_pending_verify(&mut self, key: XorName, entry: VerificationEntry) -> bool {
86        if self.contains_key(&key) {
87            return false;
88        }
89        self.pending_verify.insert(key, entry);
90        true
91    }
92
93    /// Get a reference to a pending verification entry.
94    #[must_use]
95    pub fn get_pending(&self, key: &XorName) -> Option<&VerificationEntry> {
96        self.pending_verify.get(key)
97    }
98
99    /// Get a mutable reference to a pending verification entry.
100    pub fn get_pending_mut(&mut self, key: &XorName) -> Option<&mut VerificationEntry> {
101        self.pending_verify.get_mut(key)
102    }
103
104    /// Remove a key from pending verification.
105    pub fn remove_pending(&mut self, key: &XorName) -> Option<VerificationEntry> {
106        self.pending_verify.remove(key)
107    }
108
109    /// Collect all pending verification keys (for batch processing).
110    #[must_use]
111    pub fn pending_keys(&self) -> Vec<XorName> {
112        self.pending_verify.keys().copied().collect()
113    }
114
115    /// Number of keys in pending verification.
116    #[must_use]
117    pub fn pending_count(&self) -> usize {
118        self.pending_verify.len()
119    }
120
121    // -----------------------------------------------------------------------
122    // FetchQueue
123    // -----------------------------------------------------------------------
124
125    /// Enqueue a key for fetch with its distance and verified sources.
126    ///
127    /// No-op if the key is already in any pipeline stage (Rule 8: cross-queue
128    /// dedup).
129    pub fn enqueue_fetch(&mut self, key: XorName, distance: XorName, sources: Vec<PeerId>) {
130        if self.pending_verify.contains_key(&key)
131            || self.fetch_queue_keys.contains(&key)
132            || self.in_flight_fetch.contains_key(&key)
133        {
134            return;
135        }
136        self.fetch_queue_keys.insert(key);
137        self.fetch_queue.push(FetchCandidate {
138            key,
139            distance,
140            sources,
141        });
142    }
143
144    /// Dequeue the nearest fetch candidate.
145    ///
146    /// Returns `None` when the queue is empty.  Silently skips candidates
147    /// that are somehow already in-flight.  Concurrency is enforced by the
148    /// fetch worker, not by this method.
149    pub fn dequeue_fetch(&mut self) -> Option<FetchCandidate> {
150        while let Some(candidate) = self.fetch_queue.pop() {
151            self.fetch_queue_keys.remove(&candidate.key);
152            if !self.in_flight_fetch.contains_key(&candidate.key) {
153                return Some(candidate);
154            }
155        }
156        None
157    }
158
159    /// Number of keys waiting in the fetch queue.
160    #[must_use]
161    pub fn fetch_queue_count(&self) -> usize {
162        self.fetch_queue.len()
163    }
164
165    // -----------------------------------------------------------------------
166    // InFlightFetch
167    // -----------------------------------------------------------------------
168
169    /// Mark a key as in-flight (actively being fetched from `source`).
170    pub fn start_fetch(&mut self, key: XorName, source: PeerId, all_sources: Vec<PeerId>) {
171        let mut tried = HashSet::new();
172        tried.insert(source);
173        self.in_flight_fetch.insert(
174            key,
175            InFlightEntry {
176                key,
177                source,
178                started_at: Instant::now(),
179                all_sources,
180                tried,
181            },
182        );
183    }
184
185    /// Mark a fetch as completed (success or permanent failure).
186    pub fn complete_fetch(&mut self, key: &XorName) -> Option<InFlightEntry> {
187        self.in_flight_fetch.remove(key)
188    }
189
190    /// Mark the current fetch attempt as failed and try the next untried source.
191    ///
192    /// Returns the next source peer if one is available, or `None` if all
193    /// sources have been exhausted.
194    pub fn retry_fetch(&mut self, key: &XorName) -> Option<PeerId> {
195        let entry = self.in_flight_fetch.get_mut(key)?;
196        entry.tried.insert(entry.source);
197
198        let next = entry
199            .all_sources
200            .iter()
201            .find(|p| !entry.tried.contains(p))
202            .copied();
203
204        if let Some(next_peer) = next {
205            entry.source = next_peer;
206            entry.tried.insert(next_peer);
207            Some(next_peer)
208        } else {
209            None
210        }
211    }
212
213    /// Number of in-flight fetches.
214    #[must_use]
215    pub fn in_flight_count(&self) -> usize {
216        self.in_flight_fetch.len()
217    }
218
219    // -----------------------------------------------------------------------
220    // Cross-queue queries
221    // -----------------------------------------------------------------------
222
223    /// Check if a key is present in any pipeline stage.
224    #[must_use]
225    pub fn contains_key(&self, key: &XorName) -> bool {
226        self.pending_verify.contains_key(key)
227            || self.fetch_queue_keys.contains(key)
228            || self.in_flight_fetch.contains_key(key)
229    }
230
231    /// Check if all bootstrap-related work is done.
232    ///
233    /// Returns `true` when none of the given bootstrap keys remain in any queue.
234    #[must_use]
235    pub fn is_bootstrap_work_empty(&self, bootstrap_keys: &HashSet<XorName>) -> bool {
236        !bootstrap_keys.iter().any(|k| self.contains_key(k))
237    }
238
239    /// Evict stale pending-verification entries older than `max_age`.
240    pub fn evict_stale(&mut self, max_age: Duration) {
241        let now = Instant::now();
242        let before = self.pending_verify.len();
243        self.pending_verify
244            .retain(|_, entry| now.duration_since(entry.created_at) < max_age);
245        let evicted = before.saturating_sub(self.pending_verify.len());
246        if evicted > 0 {
247            debug!("Evicted {evicted} stale pending-verification entries");
248        }
249    }
250}
251
252// ---------------------------------------------------------------------------
253// Tests
254// ---------------------------------------------------------------------------
255
256#[cfg(test)]
257#[allow(clippy::unwrap_used, clippy::expect_used)]
258mod tests {
259    use std::collections::HashSet;
260    use std::time::{Duration, Instant};
261
262    use super::*;
263    use crate::replication::types::{HintPipeline, VerificationState};
264
265    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
266    fn peer_id_from_byte(b: u8) -> PeerId {
267        let mut bytes = [0u8; 32];
268        bytes[0] = b;
269        PeerId::from_bytes(bytes)
270    }
271
272    /// Build an `XorName` from a single byte (repeated to 32 bytes).
273    fn xor_name_from_byte(b: u8) -> XorName {
274        [b; 32]
275    }
276
277    /// Create a minimal `VerificationEntry` for testing.
278    fn test_entry(sender_byte: u8) -> VerificationEntry {
279        VerificationEntry {
280            state: VerificationState::PendingVerify,
281            pipeline: HintPipeline::Replica,
282            verified_sources: Vec::new(),
283            tried_sources: HashSet::new(),
284            created_at: Instant::now(),
285            hint_sender: peer_id_from_byte(sender_byte),
286        }
287    }
288
289    // -- add_pending_verify dedup ------------------------------------------
290
291    #[test]
292    fn add_pending_verify_new_key_succeeds() {
293        let mut queues = ReplicationQueues::new();
294        let key = xor_name_from_byte(0x01);
295        assert!(queues.add_pending_verify(key, test_entry(1)));
296        assert_eq!(queues.pending_count(), 1);
297    }
298
299    #[test]
300    fn add_pending_verify_duplicate_rejected() {
301        let mut queues = ReplicationQueues::new();
302        let key = xor_name_from_byte(0x01);
303        assert!(queues.add_pending_verify(key, test_entry(1)));
304        assert!(!queues.add_pending_verify(key, test_entry(2)));
305        assert_eq!(queues.pending_count(), 1);
306    }
307
308    #[test]
309    fn add_pending_verify_rejected_if_in_fetch_queue() {
310        let mut queues = ReplicationQueues::new();
311        let key = xor_name_from_byte(0x02);
312        let distance = xor_name_from_byte(0x10);
313        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(1)]);
314
315        assert!(
316            !queues.add_pending_verify(key, test_entry(1)),
317            "should reject key already in fetch queue"
318        );
319    }
320
321    #[test]
322    fn add_pending_verify_rejected_if_in_flight() {
323        let mut queues = ReplicationQueues::new();
324        let key = xor_name_from_byte(0x03);
325        let source = peer_id_from_byte(1);
326        queues.start_fetch(key, source, vec![source]);
327
328        assert!(
329            !queues.add_pending_verify(key, test_entry(1)),
330            "should reject key already in-flight"
331        );
332    }
333
334    // -- enqueue/dequeue ordering -----------------------------------------
335
336    #[test]
337    fn dequeue_returns_nearest_first() {
338        let mut queues = ReplicationQueues::new();
339
340        let near_key = xor_name_from_byte(0x01);
341        let far_key = xor_name_from_byte(0x02);
342        let near_dist = [0x00; 32]; // nearest
343        let far_dist = [0xFF; 32]; // farthest
344
345        queues.enqueue_fetch(far_key, far_dist, vec![peer_id_from_byte(1)]);
346        queues.enqueue_fetch(near_key, near_dist, vec![peer_id_from_byte(2)]);
347
348        let first = queues.dequeue_fetch().expect("should dequeue");
349        assert_eq!(first.key, near_key, "nearest key should dequeue first");
350
351        let second = queues.dequeue_fetch().expect("should dequeue");
352        assert_eq!(second.key, far_key, "farthest key should dequeue second");
353    }
354
355    #[test]
356    fn enqueue_dedup_prevents_duplicates() {
357        let mut queues = ReplicationQueues::new();
358        let key = xor_name_from_byte(0x01);
359
360        queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(1)]);
361        queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(2)]);
362
363        assert_eq!(
364            queues.fetch_queue_count(),
365            1,
366            "duplicate enqueue should be ignored"
367        );
368    }
369
370    // -- in-flight tracking -----------------------------------------------
371
372    #[test]
373    fn start_and_complete_fetch() {
374        let mut queues = ReplicationQueues::new();
375        let key = xor_name_from_byte(0x01);
376        let source = peer_id_from_byte(1);
377
378        queues.start_fetch(key, source, vec![source]);
379        assert_eq!(queues.in_flight_count(), 1);
380
381        let completed = queues.complete_fetch(&key);
382        assert!(completed.is_some());
383        assert_eq!(queues.in_flight_count(), 0);
384    }
385
386    #[test]
387    fn complete_nonexistent_returns_none() {
388        let mut queues = ReplicationQueues::new();
389        let key = xor_name_from_byte(0x99);
390        assert!(queues.complete_fetch(&key).is_none());
391    }
392
393    // -- retry_fetch ------------------------------------------------------
394
395    #[test]
396    fn retry_fetch_returns_next_untried_source() {
397        let mut queues = ReplicationQueues::new();
398        let key = xor_name_from_byte(0x01);
399        let source_a = peer_id_from_byte(1);
400        let source_b = peer_id_from_byte(2);
401        let source_c = peer_id_from_byte(3);
402
403        queues.start_fetch(key, source_a, vec![source_a, source_b, source_c]);
404
405        // First retry: should skip source_a (already tried), return source_b.
406        let next = queues.retry_fetch(&key);
407        assert_eq!(next, Some(source_b));
408
409        // Second retry: should return source_c.
410        let next = queues.retry_fetch(&key);
411        assert_eq!(next, Some(source_c));
412
413        // Third retry: all exhausted.
414        let next = queues.retry_fetch(&key);
415        assert!(next.is_none(), "all sources exhausted");
416    }
417
418    #[test]
419    fn retry_fetch_nonexistent_returns_none() {
420        let mut queues = ReplicationQueues::new();
421        assert!(queues.retry_fetch(&xor_name_from_byte(0xFF)).is_none());
422    }
423
424    // -- contains_key across pipelines ------------------------------------
425
426    #[test]
427    fn contains_key_in_pending() {
428        let mut queues = ReplicationQueues::new();
429        let key = xor_name_from_byte(0x01);
430        queues.add_pending_verify(key, test_entry(1));
431        assert!(queues.contains_key(&key));
432    }
433
434    #[test]
435    fn contains_key_in_fetch_queue() {
436        let mut queues = ReplicationQueues::new();
437        let key = xor_name_from_byte(0x02);
438        queues.enqueue_fetch(key, [0x10; 32], vec![peer_id_from_byte(1)]);
439        assert!(queues.contains_key(&key));
440    }
441
442    #[test]
443    fn contains_key_in_flight() {
444        let mut queues = ReplicationQueues::new();
445        let key = xor_name_from_byte(0x03);
446        queues.start_fetch(key, peer_id_from_byte(1), vec![]);
447        assert!(queues.contains_key(&key));
448    }
449
450    #[test]
451    fn contains_key_absent() {
452        let queues = ReplicationQueues::new();
453        assert!(!queues.contains_key(&xor_name_from_byte(0xFF)));
454    }
455
456    // -- bootstrap work empty ---------------------------------------------
457
458    #[test]
459    fn bootstrap_work_empty_when_no_keys_present() {
460        let queues = ReplicationQueues::new();
461        let bootstrap_keys: HashSet<XorName> = [xor_name_from_byte(0x01), xor_name_from_byte(0x02)]
462            .into_iter()
463            .collect();
464        assert!(queues.is_bootstrap_work_empty(&bootstrap_keys));
465    }
466
467    #[test]
468    fn bootstrap_work_not_empty_when_key_in_pending() {
469        let mut queues = ReplicationQueues::new();
470        let key = xor_name_from_byte(0x01);
471        queues.add_pending_verify(key, test_entry(1));
472
473        let bootstrap_keys: HashSet<XorName> = std::iter::once(key).collect();
474        assert!(!queues.is_bootstrap_work_empty(&bootstrap_keys));
475    }
476
477    // -- evict_stale ------------------------------------------------------
478
479    #[test]
480    fn evict_stale_removes_old_entries() {
481        let mut queues = ReplicationQueues::new();
482        let key = xor_name_from_byte(0x01);
483
484        // Create entry with a backdated timestamp. Use a small subtraction
485        // to avoid `checked_sub` returning `None` on freshly-booted CI runners.
486        let mut entry = test_entry(1);
487        entry.created_at = Instant::now()
488            .checked_sub(Duration::from_secs(2))
489            .unwrap_or_else(Instant::now);
490        queues.pending_verify.insert(key, entry);
491
492        assert_eq!(queues.pending_count(), 1);
493        queues.evict_stale(Duration::from_secs(1));
494        assert_eq!(
495            queues.pending_count(),
496            0,
497            "entry older than max_age should be evicted"
498        );
499    }
500
501    #[test]
502    fn evict_stale_keeps_fresh_entries() {
503        let mut queues = ReplicationQueues::new();
504        let key = xor_name_from_byte(0x01);
505        queues.add_pending_verify(key, test_entry(1));
506
507        queues.evict_stale(Duration::from_secs(3600));
508        assert_eq!(
509            queues.pending_count(),
510            1,
511            "fresh entry should not be evicted"
512        );
513    }
514
515    // -- remove_pending ---------------------------------------------------
516
517    #[test]
518    fn remove_pending_returns_entry() {
519        let mut queues = ReplicationQueues::new();
520        let key = xor_name_from_byte(0x01);
521        queues.add_pending_verify(key, test_entry(1));
522
523        let removed = queues.remove_pending(&key);
524        assert!(removed.is_some());
525        assert_eq!(queues.pending_count(), 0);
526    }
527
528    #[test]
529    fn remove_pending_nonexistent_returns_none() {
530        let mut queues = ReplicationQueues::new();
531        assert!(queues.remove_pending(&xor_name_from_byte(0xFF)).is_none());
532    }
533
534    // -----------------------------------------------------------------------
535    // Section 18 scenarios
536    // -----------------------------------------------------------------------
537
538    /// Scenario 8: A key already in `PendingVerify` cannot be enqueued into
539    /// `FetchQueue` (cross-queue dedup). Also, a key in `FetchQueue` cannot be
540    /// re-added to `PendingVerify`.
541    #[test]
542    fn scenario_8_duplicate_key_not_double_queued() {
543        let mut queues = ReplicationQueues::new();
544        let key = xor_name_from_byte(0xE0);
545        let distance = xor_name_from_byte(0x10);
546
547        // Step 1: Add to PendingVerify.
548        assert!(
549            queues.add_pending_verify(key, test_entry(1)),
550            "first add to PendingVerify should succeed"
551        );
552        assert!(
553            queues.contains_key(&key),
554            "key should be present in pipeline"
555        );
556
557        // Step 2: Attempt to enqueue fetch while still in PendingVerify.
558        // enqueue_fetch checks all three stages (pending_verify,
559        // fetch_queue_keys, in_flight), so this is a no-op while the key
560        // is still in PendingVerify.
561        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(2)]);
562        // Verify the key is still tracked via the cross-stage check.
563        assert!(queues.contains_key(&key), "key should still be in pipeline");
564
565        // Step 3: Remove from PendingVerify, add to FetchQueue.
566        queues.remove_pending(&key);
567        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(3)]);
568        assert_eq!(queues.fetch_queue_count(), 1);
569
570        // Step 4: Attempt to re-add to PendingVerify -> should fail.
571        assert!(
572            !queues.add_pending_verify(key, test_entry(4)),
573            "key in FetchQueue should be rejected from PendingVerify"
574        );
575
576        // Step 5: Dequeue, start fetch -> key is in-flight.
577        let candidate = queues.dequeue_fetch().expect("should dequeue");
578        queues.start_fetch(
579            candidate.key,
580            candidate.sources[0],
581            candidate.sources.clone(),
582        );
583
584        // Step 6: Attempt to add to PendingVerify while in-flight -> reject.
585        assert!(
586            !queues.add_pending_verify(key, test_entry(5)),
587            "key in-flight should be rejected from PendingVerify"
588        );
589
590        // Step 7: Attempt to enqueue fetch while in-flight -> no-op.
591        queues.enqueue_fetch(key, distance, vec![peer_id_from_byte(6)]);
592        // fetch_queue should still be empty (the enqueue was a no-op).
593        assert_eq!(
594            queues.fetch_queue_count(),
595            0,
596            "enqueue_fetch should be no-op for in-flight key"
597        );
598    }
599
600    /// Scenario 8 (continued): Verify that pipeline field for a key
601    /// admitted as both replica and paid hint collapses to Replica only,
602    /// because cross-set precedence in admission gives replica priority.
603    #[test]
604    fn scenario_8_replica_and_paid_hint_collapses_to_replica() {
605        let mut queues = ReplicationQueues::new();
606        let key = xor_name_from_byte(0xE1);
607
608        // Simulate admission result: key was in both replica_hints and
609        // paid_hints, so admission gives it HintPipeline::Replica.
610        let entry = VerificationEntry {
611            state: VerificationState::PendingVerify,
612            pipeline: HintPipeline::Replica, // Cross-set precedence result.
613            verified_sources: Vec::new(),
614            tried_sources: HashSet::new(),
615            created_at: Instant::now(),
616            hint_sender: peer_id_from_byte(1),
617        };
618
619        assert!(queues.add_pending_verify(key, entry));
620
621        let pending = queues.get_pending(&key).expect("should be pending");
622        assert_eq!(
623            pending.pipeline,
624            HintPipeline::Replica,
625            "key in both hint sets should be Replica pipeline"
626        );
627
628        // A second add (e.g. from paid hints arriving separately) is rejected.
629        let paid_entry = VerificationEntry {
630            state: VerificationState::PendingVerify,
631            pipeline: HintPipeline::PaidOnly,
632            verified_sources: Vec::new(),
633            tried_sources: HashSet::new(),
634            created_at: Instant::now(),
635            hint_sender: peer_id_from_byte(2),
636        };
637
638        assert!(
639            !queues.add_pending_verify(key, paid_entry),
640            "duplicate key should be rejected regardless of pipeline"
641        );
642
643        // Pipeline stays Replica.
644        let pending = queues.get_pending(&key).expect("should still be pending");
645        assert_eq!(
646            pending.pipeline,
647            HintPipeline::Replica,
648            "pipeline should remain Replica after duplicate rejection"
649        );
650    }
651
652    /// Scenario 3: Neighbor-sync unknown key transitions through the full
653    /// state machine to stored.
654    ///
655    /// Exercises the complete queue pipeline that a key follows when it
656    /// arrives as a neighbor-sync hint, passes quorum verification, is
657    /// fetched, and completes:
658    ///   `PendingVerify` → (quorum pass) → `QueuedForFetch` → `Fetching` → `Stored`
659    #[test]
660    fn scenario_3_neighbor_sync_quorum_pass_full_pipeline() {
661        let mut queues = ReplicationQueues::new();
662        let key = xor_name_from_byte(0x03);
663        let distance = xor_name_from_byte(0x01);
664        let source_a = peer_id_from_byte(1);
665        let source_b = peer_id_from_byte(2);
666        let hint_sender = peer_id_from_byte(3);
667
668        // Stage 1: Hint admitted → PendingVerify
669        let entry = VerificationEntry {
670            state: VerificationState::PendingVerify,
671            pipeline: HintPipeline::Replica,
672            verified_sources: Vec::new(),
673            tried_sources: HashSet::new(),
674            created_at: Instant::now(),
675            hint_sender,
676        };
677        assert!(
678            queues.add_pending_verify(key, entry),
679            "new key should be admitted to PendingVerify"
680        );
681        assert!(queues.contains_key(&key));
682        assert_eq!(queues.pending_count(), 1);
683
684        // Stage 2: Quorum passes — remove from pending and enqueue for fetch
685        // with the verified sources discovered during the quorum round.
686        let removed = queues.remove_pending(&key);
687        assert!(removed.is_some(), "key should exist in pending");
688        assert_eq!(queues.pending_count(), 0);
689
690        queues.enqueue_fetch(key, distance, vec![source_a, source_b]);
691        assert_eq!(queues.fetch_queue_count(), 1);
692        assert!(
693            queues.contains_key(&key),
694            "key should be in pipeline (fetch queue)"
695        );
696
697        // Stage 3: Dequeue → Fetching
698        let candidate = queues.dequeue_fetch().expect("should dequeue");
699        assert_eq!(candidate.key, key);
700        assert_eq!(candidate.sources.len(), 2);
701        queues.start_fetch(key, source_a, candidate.sources);
702        assert_eq!(queues.in_flight_count(), 1);
703        assert_eq!(queues.fetch_queue_count(), 0);
704        assert!(
705            queues.contains_key(&key),
706            "key should be in pipeline (in-flight)"
707        );
708
709        // Stage 4: Fetch completes → Stored
710        let completed = queues.complete_fetch(&key);
711        assert!(
712            completed.is_some(),
713            "should have in-flight entry to complete"
714        );
715        assert_eq!(queues.in_flight_count(), 0);
716        assert!(
717            !queues.contains_key(&key),
718            "key should be fully processed out of pipeline"
719        );
720    }
721}