Skip to main content

ant_node/replication/
quorum.rs

1//! Quorum verification logic (Section 9).
2//!
3//! Single-round batched verification: presence + paid-list evidence collected
4//! in one request round to `VerifyTargets = PaidTargets ∪ QuorumTargets`.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::Instant;
9
10use crate::logging::{debug, info, warn};
11use saorsa_core::identity::PeerId;
12use saorsa_core::P2PNode;
13
14use crate::ant_protocol::XorName;
15use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
16use crate::replication::protocol::{
17    ReplicationMessage, ReplicationMessageBody, VerificationRequest, VerificationResponse,
18};
19use crate::replication::types::{KeyVerificationEvidence, PaidListEvidence, PresenceEvidence};
20
21/// Verification round duration that is worth surfacing at info level.
22const VERIFICATION_ROUND_SLOW_LOG_MS: u128 = 500;
23
24// ---------------------------------------------------------------------------
25// Verification targets
26// ---------------------------------------------------------------------------
27
28/// Targets for verifying a set of keys.
29#[derive(Debug, Default)]
30pub struct VerificationTargets {
31    /// Per-key: closest `CLOSE_GROUP_SIZE` peers (excluding self) for presence
32    /// quorum.
33    pub quorum_targets: HashMap<XorName, Vec<PeerId>>,
34    /// Per-key: `PaidCloseGroup` peers for paid-list majority.
35    pub paid_targets: HashMap<XorName, Vec<PeerId>>,
36    /// Per-key: self-inclusive paid close-group size used to compute
37    /// `ConfirmNeeded(K)`.
38    pub paid_group_sizes: HashMap<XorName, usize>,
39    /// Union of all target peers across all keys.
40    pub all_peers: HashSet<PeerId>,
41    /// Which keys each peer should be queried about.
42    pub peer_to_keys: HashMap<PeerId, Vec<XorName>>,
43    /// Which keys need paid-list checks from which peers.
44    pub peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>>,
45}
46
47/// Compute verification targets for a batch of keys.
48///
49/// For each key, determines the `QuorumTargets` (closest `CLOSE_GROUP_SIZE`
50/// peers excluding self) and `PaidTargets` (`PaidCloseGroup` excluding self),
51/// then unions them into per-peer request batches.
52pub async fn compute_verification_targets(
53    keys: &[XorName],
54    p2p_node: &Arc<P2PNode>,
55    config: &ReplicationConfig,
56    self_id: &PeerId,
57) -> VerificationTargets {
58    let dht = p2p_node.dht_manager();
59    let mut targets = VerificationTargets {
60        quorum_targets: HashMap::new(),
61        paid_targets: HashMap::new(),
62        paid_group_sizes: HashMap::new(),
63        all_peers: HashSet::new(),
64        peer_to_keys: HashMap::new(),
65        peer_to_paid_keys: HashMap::new(),
66    };
67
68    for &key in keys {
69        // QuorumTargets: up to CLOSE_GROUP_SIZE nearest peers for K, excluding
70        // self.
71        let closest = dht
72            .find_closest_nodes_local(&key, config.close_group_size)
73            .await;
74        let quorum_peers: Vec<PeerId> = closest
75            .iter()
76            .filter(|n| n.peer_id != *self_id)
77            .map(|n| n.peer_id)
78            .collect();
79
80        // PaidTargets: PaidCloseGroup(K) excluding self.
81        let paid_closest = dht
82            .find_closest_nodes_local_with_self(&key, config.paid_list_close_group_size)
83            .await;
84        let paid_group_size = paid_closest.len();
85        let paid_peers: Vec<PeerId> = paid_closest
86            .iter()
87            .filter(|n| n.peer_id != *self_id)
88            .map(|n| n.peer_id)
89            .collect();
90
91        // VerifyTargets = PaidTargets ∪ QuorumTargets
92        for &peer in &quorum_peers {
93            targets.all_peers.insert(peer);
94            targets.peer_to_keys.entry(peer).or_default().push(key);
95        }
96        for &peer in &paid_peers {
97            targets.all_peers.insert(peer);
98            targets.peer_to_keys.entry(peer).or_default().push(key);
99            targets
100                .peer_to_paid_keys
101                .entry(peer)
102                .or_default()
103                .insert(key);
104        }
105
106        targets.quorum_targets.insert(key, quorum_peers);
107        targets.paid_targets.insert(key, paid_peers);
108        targets.paid_group_sizes.insert(key, paid_group_size);
109    }
110
111    // Deduplicate keys per peer (a peer in both quorum and paid targets for
112    // the same key would have it listed twice).
113    for keys_list in targets.peer_to_keys.values_mut() {
114        keys_list.sort_unstable();
115        keys_list.dedup();
116    }
117
118    targets
119}
120
121/// Compute presence-only verification targets for locally paid keys.
122///
123/// Local `PaidForList` membership authorizes the key already; this target set
124/// is only used to discover peers that can serve the record bytes.
125pub async fn compute_presence_targets(
126    keys: &[XorName],
127    p2p_node: &Arc<P2PNode>,
128    config: &ReplicationConfig,
129    self_id: &PeerId,
130) -> VerificationTargets {
131    let dht = p2p_node.dht_manager();
132    let mut targets = VerificationTargets {
133        quorum_targets: HashMap::new(),
134        paid_targets: HashMap::new(),
135        paid_group_sizes: HashMap::new(),
136        all_peers: HashSet::new(),
137        peer_to_keys: HashMap::new(),
138        peer_to_paid_keys: HashMap::new(),
139    };
140
141    for &key in keys {
142        let closest = dht
143            .find_closest_nodes_local(&key, config.close_group_size)
144            .await;
145        let quorum_peers: Vec<PeerId> = closest
146            .iter()
147            .filter(|n| n.peer_id != *self_id)
148            .map(|n| n.peer_id)
149            .collect();
150
151        for &peer in &quorum_peers {
152            targets.all_peers.insert(peer);
153            targets.peer_to_keys.entry(peer).or_default().push(key);
154        }
155
156        targets.quorum_targets.insert(key, quorum_peers);
157    }
158
159    for keys_list in targets.peer_to_keys.values_mut() {
160        keys_list.sort_unstable();
161        keys_list.dedup();
162    }
163
164    targets
165}
166
167// ---------------------------------------------------------------------------
168// Verification outcome
169// ---------------------------------------------------------------------------
170
171/// Outcome of verifying a single key.
172#[derive(Debug, Clone)]
173pub enum KeyVerificationOutcome {
174    /// Presence quorum passed.
175    QuorumVerified {
176        /// Peers that responded `Present` (verified fetch sources).
177        sources: Vec<PeerId>,
178    },
179    /// Paid-list authorization succeeded.
180    PaidListVerified {
181        /// Peers that responded `Present` (potential fetch sources, may be
182        /// empty).
183        sources: Vec<PeerId>,
184    },
185    /// Quorum failed definitively (both paths impossible).
186    QuorumFailed,
187    /// Inconclusive (timeout with neither success nor fail-fast).
188    QuorumInconclusive,
189}
190
191// ---------------------------------------------------------------------------
192// Evidence evaluation (pure logic, no I/O)
193// ---------------------------------------------------------------------------
194
195/// Evaluate verification evidence for a single key.
196///
197/// Returns the outcome based on Section 9 rules:
198/// - **Step 10**: If presence positives >= `QuorumNeeded(K)`, `QuorumVerified`.
199/// - **Step 9**: If paid confirmations >= `ConfirmNeeded(K)`,
200///   `PaidListVerified`.
201/// - **Step 14**: Fail fast when both paths are impossible.
202/// - **Step 15**: Otherwise inconclusive.
203#[must_use]
204pub fn evaluate_key_evidence(
205    key: &XorName,
206    evidence: &KeyVerificationEvidence,
207    targets: &VerificationTargets,
208    config: &ReplicationConfig,
209) -> KeyVerificationOutcome {
210    let quorum_peers = targets
211        .quorum_targets
212        .get(key)
213        .map_or(&[][..], Vec::as_slice);
214
215    // Count presence evidence from QuorumTargets.
216    let mut presence_positive = 0usize;
217    let mut presence_unresolved = 0usize;
218
219    for peer in quorum_peers {
220        match evidence.presence.get(peer) {
221            Some(PresenceEvidence::Present) => presence_positive += 1,
222            Some(PresenceEvidence::Absent) => {}
223            Some(PresenceEvidence::Unresolved) | None => {
224                presence_unresolved += 1;
225            }
226        }
227    }
228
229    // Also collect Present peers from paid targets for fetch sources.
230    let paid_peers = targets.paid_targets.get(key).map_or(&[][..], Vec::as_slice);
231    let present_peers = collect_present_sources(evidence, quorum_peers, paid_peers);
232
233    // Count paid-list evidence from PaidTargets.
234    let mut paid_confirmed = 0usize;
235    let mut paid_unresolved = 0usize;
236
237    for peer in paid_peers {
238        match evidence.paid_list.get(peer) {
239            Some(PaidListEvidence::Confirmed) => paid_confirmed += 1,
240            Some(PaidListEvidence::NotFound) => {}
241            Some(PaidListEvidence::Unresolved) | None => paid_unresolved += 1,
242        }
243    }
244
245    let quorum_needed = config.quorum_needed(quorum_peers.len());
246    let paid_group_size = targets
247        .paid_group_sizes
248        .get(key)
249        .copied()
250        .unwrap_or(paid_peers.len());
251    let confirm_needed = ReplicationConfig::confirm_needed(paid_group_size);
252
253    // Step 10: Presence quorum reached.
254    // quorum_needed == 0 means zero targets exist — quorum is impossible,
255    // not trivially met.
256    if quorum_needed > 0 && presence_positive >= quorum_needed {
257        return KeyVerificationOutcome::QuorumVerified {
258            sources: present_peers,
259        };
260    }
261
262    // Step 9: Paid-list majority reached.
263    // confirm_needed from 0 paid peers is 1, so this naturally fails with
264    // 0 confirmed — no special guard needed. But be explicit for clarity.
265    if paid_group_size > 0 && paid_confirmed >= confirm_needed {
266        return KeyVerificationOutcome::PaidListVerified {
267            sources: present_peers,
268        };
269    }
270
271    // Step 14: Fail fast when both paths are impossible.
272    let paid_possible = paid_group_size > 0 && paid_confirmed + paid_unresolved >= confirm_needed;
273    let quorum_possible =
274        quorum_needed > 0 && presence_positive + presence_unresolved >= quorum_needed;
275
276    if !paid_possible && !quorum_possible {
277        return KeyVerificationOutcome::QuorumFailed;
278    }
279
280    // Step 15: Neither success nor fail-fast.
281    KeyVerificationOutcome::QuorumInconclusive
282}
283
284/// Return peers that gave positive presence evidence for a key.
285///
286/// Only peers in the computed verification target sets are considered.
287#[must_use]
288pub fn present_sources_for_key(
289    key: &XorName,
290    evidence: &KeyVerificationEvidence,
291    targets: &VerificationTargets,
292) -> Vec<PeerId> {
293    let quorum_peers = targets
294        .quorum_targets
295        .get(key)
296        .map_or(&[][..], Vec::as_slice);
297    let paid_peers = targets.paid_targets.get(key).map_or(&[][..], Vec::as_slice);
298
299    collect_present_sources(evidence, quorum_peers, paid_peers)
300}
301
302fn collect_present_sources(
303    evidence: &KeyVerificationEvidence,
304    quorum_peers: &[PeerId],
305    paid_peers: &[PeerId],
306) -> Vec<PeerId> {
307    let mut present_peers = Vec::new();
308    let mut seen = HashSet::new();
309
310    for peer in quorum_peers.iter().chain(paid_peers.iter()) {
311        if matches!(evidence.presence.get(peer), Some(PresenceEvidence::Present))
312            && seen.insert(*peer)
313        {
314            present_peers.push(*peer);
315        }
316    }
317
318    present_peers
319}
320
321// ---------------------------------------------------------------------------
322// Network verification round
323// ---------------------------------------------------------------------------
324
325/// Send batched verification requests to all peers and collect evidence.
326///
327/// Implements Section 9 requirement: one request per peer carrying many keys.
328/// Returns per-key evidence aggregated from all peer responses.
329pub async fn run_verification_round(
330    keys: &[XorName],
331    targets: &VerificationTargets,
332    p2p_node: &Arc<P2PNode>,
333    config: &ReplicationConfig,
334) -> HashMap<XorName, KeyVerificationEvidence> {
335    let started = Instant::now();
336    let peer_count = targets.peer_to_keys.len();
337    let requested_key_refs = targets.peer_to_keys.values().map(Vec::len).sum::<usize>();
338
339    // Initialize empty evidence for all keys.
340    let mut evidence: HashMap<XorName, KeyVerificationEvidence> = keys
341        .iter()
342        .map(|&k| {
343            (
344                k,
345                KeyVerificationEvidence {
346                    presence: HashMap::new(),
347                    paid_list: HashMap::new(),
348                },
349            )
350        })
351        .collect();
352
353    // Send one batched request per peer.
354    let mut handles = Vec::new();
355
356    for (&peer, peer_keys) in &targets.peer_to_keys {
357        let paid_check_keys = targets.peer_to_paid_keys.get(&peer);
358
359        // Build paid_list_check_indices: which of this peer's keys need
360        // paid-list status.
361        let mut paid_indices = Vec::new();
362        for (i, key) in peer_keys.iter().enumerate() {
363            if let Some(paid_keys) = paid_check_keys {
364                if paid_keys.contains(key) {
365                    if let Ok(idx) = u32::try_from(i) {
366                        paid_indices.push(idx);
367                    }
368                }
369            }
370        }
371
372        let request = VerificationRequest {
373            keys: peer_keys.clone(),
374            paid_list_check_indices: paid_indices,
375        };
376
377        let msg = ReplicationMessage {
378            request_id: rand::random(),
379            body: ReplicationMessageBody::VerificationRequest(request),
380        };
381
382        let p2p = Arc::clone(p2p_node);
383        let timeout = config.verification_request_timeout;
384        let peer_id = peer;
385
386        handles.push(tokio::spawn(async move {
387            let encoded = match msg.encode() {
388                Ok(data) => data,
389                Err(e) => {
390                    warn!("Failed to encode verification request: {e}");
391                    return (peer_id, None);
392                }
393            };
394
395            match p2p
396                .send_request(&peer_id, REPLICATION_PROTOCOL_ID, encoded, timeout)
397                .await
398            {
399                Ok(response) => match ReplicationMessage::decode(&response.data) {
400                    Ok(decoded) => (peer_id, Some(decoded)),
401                    Err(e) => {
402                        warn!("Failed to decode verification response from {peer_id}: {e}");
403                        (peer_id, None)
404                    }
405                },
406                Err(e) => {
407                    debug!("Verification request to {peer_id} failed: {e}");
408                    (peer_id, None)
409                }
410            }
411        }));
412    }
413
414    // Collect responses.
415    for handle in handles {
416        let (peer, response) = match handle.await {
417            Ok(result) => result,
418            Err(e) => {
419                warn!("Verification task panicked: {e}");
420                continue;
421            }
422        };
423
424        let Some(msg) = response else {
425            // Timeout/error: mark all keys for this peer as unresolved.
426            mark_peer_unresolved(&peer, targets, &mut evidence);
427            continue;
428        };
429
430        if let ReplicationMessageBody::VerificationResponse(resp) = msg.body {
431            process_verification_response(&peer, &resp, targets, &mut evidence);
432        }
433    }
434
435    let elapsed_ms = started.elapsed().as_millis();
436    if elapsed_ms >= VERIFICATION_ROUND_SLOW_LOG_MS {
437        info!(
438            target: "ant_node::replication::verification",
439            "Slow quorum verification round: keys={}, peers={peer_count}, requested_key_refs={requested_key_refs}, elapsed_ms={elapsed_ms}",
440            keys.len(),
441        );
442    } else {
443        debug!(
444            target: "ant_node::replication::verification",
445            "Quorum verification round: keys={}, peers={peer_count}, requested_key_refs={requested_key_refs}, elapsed_ms={elapsed_ms}",
446            keys.len(),
447        );
448    }
449
450    evidence
451}
452
453/// Mark all keys for a peer as unresolved (timeout / decode failure).
454fn mark_peer_unresolved(
455    peer: &PeerId,
456    targets: &VerificationTargets,
457    evidence: &mut HashMap<XorName, KeyVerificationEvidence>,
458) {
459    if let Some(peer_keys) = targets.peer_to_keys.get(peer) {
460        let is_paid_peer = targets.peer_to_paid_keys.get(peer);
461        for key in peer_keys {
462            if let Some(ev) = evidence.get_mut(key) {
463                ev.presence.insert(*peer, PresenceEvidence::Unresolved);
464                if is_paid_peer.is_some_and(|ks| ks.contains(key)) {
465                    ev.paid_list.insert(*peer, PaidListEvidence::Unresolved);
466                }
467            }
468        }
469    }
470}
471
472/// Process a single peer's verification response into the evidence map.
473fn process_verification_response(
474    peer: &PeerId,
475    response: &VerificationResponse,
476    targets: &VerificationTargets,
477    evidence: &mut HashMap<XorName, KeyVerificationEvidence>,
478) {
479    let Some(peer_keys) = targets.peer_to_keys.get(peer) else {
480        return;
481    };
482
483    // Use a HashSet for O(1) key membership checks instead of linear scan,
484    // preventing CPU amplification from large responses.
485    let peer_keys_set: HashSet<&XorName> = peer_keys.iter().collect();
486
487    // Cap results at 2x requested keys to limit processing of stuffed
488    // responses while still tolerating some unsolicited entries.
489    let max_results = peer_keys.len().saturating_mul(2);
490    let results = if response.results.len() > max_results {
491        warn!(
492            "Peer {peer} sent {} verification results but only {} keys were requested — truncating",
493            response.results.len(),
494            peer_keys.len(),
495        );
496        &response.results[..max_results]
497    } else {
498        &response.results
499    };
500
501    // Match response results to requested keys.
502    for result in results {
503        if !peer_keys_set.contains(&result.key) {
504            continue; // Ignore unsolicited key results.
505        }
506
507        if let Some(ev) = evidence.get_mut(&result.key) {
508            // Presence evidence.
509            let presence = if result.present {
510                PresenceEvidence::Present
511            } else {
512                PresenceEvidence::Absent
513            };
514            ev.presence.insert(*peer, presence);
515
516            // Paid-list evidence (only if requested).
517            if let Some(is_paid) = result.paid {
518                let paid = if is_paid {
519                    PaidListEvidence::Confirmed
520                } else {
521                    PaidListEvidence::NotFound
522                };
523                ev.paid_list.insert(*peer, paid);
524            }
525        }
526    }
527
528    // Keys that were requested but not in response -> unresolved.
529    let is_paid_peer = targets.peer_to_paid_keys.get(peer);
530    for key in peer_keys {
531        if let Some(ev) = evidence.get_mut(key) {
532            ev.presence
533                .entry(*peer)
534                .or_insert(PresenceEvidence::Unresolved);
535            if is_paid_peer.is_some_and(|ks| ks.contains(key)) {
536                ev.paid_list
537                    .entry(*peer)
538                    .or_insert(PaidListEvidence::Unresolved);
539            }
540        }
541    }
542}
543
544// ---------------------------------------------------------------------------
545// Tests
546// ---------------------------------------------------------------------------
547
548#[cfg(test)]
549#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
550mod tests {
551    use super::*;
552    use crate::replication::protocol::KeyVerificationResult;
553
554    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
555    fn peer_id_from_byte(b: u8) -> PeerId {
556        let mut bytes = [0u8; 32];
557        bytes[0] = b;
558        PeerId::from_bytes(bytes)
559    }
560
561    /// Build an `XorName` from a single byte (repeated to 32 bytes).
562    fn xor_name_from_byte(b: u8) -> XorName {
563        [b; 32]
564    }
565
566    /// Helper: build minimal `VerificationTargets` for a single key with
567    /// explicit quorum and paid peer lists.
568    fn single_key_targets(
569        key: &XorName,
570        quorum_peers: Vec<PeerId>,
571        paid_peers: Vec<PeerId>,
572    ) -> VerificationTargets {
573        let mut all_peers = HashSet::new();
574        let mut peer_to_keys: HashMap<PeerId, Vec<XorName>> = HashMap::new();
575        let mut peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>> = HashMap::new();
576
577        for &p in &quorum_peers {
578            all_peers.insert(p);
579            peer_to_keys.entry(p).or_default().push(*key);
580        }
581        for &p in &paid_peers {
582            all_peers.insert(p);
583            peer_to_keys.entry(p).or_default().push(*key);
584            peer_to_paid_keys.entry(p).or_default().insert(*key);
585        }
586
587        // Deduplicate keys per peer.
588        for keys_list in peer_to_keys.values_mut() {
589            keys_list.sort_unstable();
590            keys_list.dedup();
591        }
592
593        let paid_group_size = paid_peers.len();
594        VerificationTargets {
595            quorum_targets: std::iter::once((key.to_owned(), quorum_peers)).collect(),
596            paid_group_sizes: std::iter::once((key.to_owned(), paid_group_size)).collect(),
597            paid_targets: std::iter::once((key.to_owned(), paid_peers)).collect(),
598            all_peers,
599            peer_to_keys,
600            peer_to_paid_keys,
601        }
602    }
603
604    /// Helper: build `KeyVerificationEvidence` from presence and paid-list
605    /// maps.
606    fn build_evidence(
607        presence: Vec<(PeerId, PresenceEvidence)>,
608        paid_list: Vec<(PeerId, PaidListEvidence)>,
609    ) -> KeyVerificationEvidence {
610        KeyVerificationEvidence {
611            presence: presence.into_iter().collect(),
612            paid_list: paid_list.into_iter().collect(),
613        }
614    }
615
616    #[test]
617    fn present_sources_for_key_filters_targets_and_deduplicates() {
618        let key = xor_name_from_byte(0x11);
619        let q_present = peer_id_from_byte(1);
620        let overlap = peer_id_from_byte(2);
621        let q_absent = peer_id_from_byte(3);
622        let q_unresolved = peer_id_from_byte(4);
623        let paid_present = peer_id_from_byte(5);
624        let paid_absent = peer_id_from_byte(6);
625        let outside_target = peer_id_from_byte(7);
626
627        let targets = single_key_targets(
628            &key,
629            vec![q_present, overlap, q_absent, q_unresolved],
630            vec![overlap, paid_present, paid_absent],
631        );
632        let evidence = build_evidence(
633            vec![
634                (q_present, PresenceEvidence::Present),
635                (overlap, PresenceEvidence::Present),
636                (q_absent, PresenceEvidence::Absent),
637                (q_unresolved, PresenceEvidence::Unresolved),
638                (paid_present, PresenceEvidence::Present),
639                (paid_absent, PresenceEvidence::Absent),
640                (outside_target, PresenceEvidence::Present),
641            ],
642            vec![],
643        );
644
645        let sources = present_sources_for_key(&key, &evidence, &targets);
646
647        assert_eq!(
648            sources,
649            vec![q_present, overlap, paid_present],
650            "sources should preserve quorum-first order, de-duplicate overlap, and ignore non-target/negative evidence"
651        );
652    }
653
654    // -----------------------------------------------------------------------
655    // evaluate_key_evidence: QuorumVerified
656    // -----------------------------------------------------------------------
657
658    #[test]
659    fn quorum_verified_with_enough_present_responses() {
660        let key = xor_name_from_byte(0x10);
661        let config = ReplicationConfig::default();
662
663        // 7 quorum peers, threshold = min(4, floor(7/2)+1) = 4
664        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
665        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
666
667        // 4 peers say Present, 3 say Absent.
668        let evidence = build_evidence(
669            vec![
670                (quorum_peers[0], PresenceEvidence::Present),
671                (quorum_peers[1], PresenceEvidence::Present),
672                (quorum_peers[2], PresenceEvidence::Present),
673                (quorum_peers[3], PresenceEvidence::Present),
674                (quorum_peers[4], PresenceEvidence::Absent),
675                (quorum_peers[5], PresenceEvidence::Absent),
676                (quorum_peers[6], PresenceEvidence::Absent),
677            ],
678            vec![],
679        );
680
681        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
682        assert!(
683            matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 4),
684            "expected QuorumVerified with 4 sources, got {outcome:?}"
685        );
686    }
687
688    // -----------------------------------------------------------------------
689    // evaluate_key_evidence: PaidListVerified
690    // -----------------------------------------------------------------------
691
692    #[test]
693    fn paid_list_verified_with_enough_confirmations() {
694        let key = xor_name_from_byte(0x20);
695        let config = ReplicationConfig::default();
696
697        // 5 paid peers, confirm_needed = floor(5/2)+1 = 3
698        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
699        // No quorum peers (or quorum fails).
700        let quorum_peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
701        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
702
703        // Quorum: all Absent (fails presence path).
704        // Paid: 3 Confirmed, 2 NotFound -> majority reached.
705        let evidence = build_evidence(
706            vec![
707                (quorum_peers[0], PresenceEvidence::Absent),
708                (quorum_peers[1], PresenceEvidence::Absent),
709                (quorum_peers[2], PresenceEvidence::Absent),
710            ],
711            vec![
712                (paid_peers[0], PaidListEvidence::Confirmed),
713                (paid_peers[1], PaidListEvidence::Confirmed),
714                (paid_peers[2], PaidListEvidence::Confirmed),
715                (paid_peers[3], PaidListEvidence::NotFound),
716                (paid_peers[4], PaidListEvidence::NotFound),
717            ],
718        );
719
720        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
721        assert!(
722            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
723            "expected PaidListVerified, got {outcome:?}"
724        );
725    }
726
727    // -----------------------------------------------------------------------
728    // evaluate_key_evidence: QuorumFailed
729    // -----------------------------------------------------------------------
730
731    #[test]
732    fn quorum_failed_when_both_paths_impossible() {
733        let key = xor_name_from_byte(0x30);
734        let config = ReplicationConfig::default();
735
736        // 5 quorum peers, quorum_needed = min(4, floor(5/2)+1) = min(4,3) = 3
737        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
738        // 3 paid peers, confirm_needed = floor(3/2)+1 = 2
739        let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
740        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
741
742        // Presence: all 5 Absent (0 positive, 0 unresolved) -> can't reach 3.
743        // Paid: all 3 NotFound (0 confirmed, 0 unresolved) -> can't reach 2.
744        let evidence = build_evidence(
745            vec![
746                (quorum_peers[0], PresenceEvidence::Absent),
747                (quorum_peers[1], PresenceEvidence::Absent),
748                (quorum_peers[2], PresenceEvidence::Absent),
749                (quorum_peers[3], PresenceEvidence::Absent),
750                (quorum_peers[4], PresenceEvidence::Absent),
751            ],
752            vec![
753                (paid_peers[0], PaidListEvidence::NotFound),
754                (paid_peers[1], PaidListEvidence::NotFound),
755                (paid_peers[2], PaidListEvidence::NotFound),
756            ],
757        );
758
759        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
760        assert!(
761            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
762            "expected QuorumFailed, got {outcome:?}"
763        );
764    }
765
766    // -----------------------------------------------------------------------
767    // evaluate_key_evidence: QuorumInconclusive
768    // -----------------------------------------------------------------------
769
770    #[test]
771    fn quorum_inconclusive_with_unresolved_peers() {
772        let key = xor_name_from_byte(0x40);
773        let config = ReplicationConfig::default();
774
775        // 5 quorum peers, quorum_needed = min(4, 3) = 3
776        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
777        // 3 paid peers, confirm_needed = 2
778        let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
779        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
780
781        // Presence: 2 Present, 1 Absent, 2 Unresolved.
782        // positive=2, unresolved=2 -> 2+2=4 >= 3 -> quorum still possible.
783        // Paid: 1 Confirmed, 1 Unresolved, 1 NotFound.
784        // confirmed=1, unresolved=1 -> 1+1=2 >= 2 -> paid still possible.
785        // Neither path reached yet -> Inconclusive.
786        let evidence = build_evidence(
787            vec![
788                (quorum_peers[0], PresenceEvidence::Present),
789                (quorum_peers[1], PresenceEvidence::Present),
790                (quorum_peers[2], PresenceEvidence::Absent),
791                (quorum_peers[3], PresenceEvidence::Unresolved),
792                (quorum_peers[4], PresenceEvidence::Unresolved),
793            ],
794            vec![
795                (paid_peers[0], PaidListEvidence::Confirmed),
796                (paid_peers[1], PaidListEvidence::Unresolved),
797                (paid_peers[2], PaidListEvidence::NotFound),
798            ],
799        );
800
801        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
802        assert!(
803            matches!(outcome, KeyVerificationOutcome::QuorumInconclusive),
804            "expected QuorumInconclusive, got {outcome:?}"
805        );
806    }
807
808    // -----------------------------------------------------------------------
809    // Dynamic thresholds with undersized sets
810    // -----------------------------------------------------------------------
811
812    #[test]
813    fn quorum_verified_with_undersized_quorum_targets() {
814        let key = xor_name_from_byte(0x50);
815        let config = ReplicationConfig::default();
816
817        // Only 2 quorum peers (undersized).
818        // quorum_needed = min(4, floor(2/2)+1) = min(4, 2) = 2
819        let quorum_peers: Vec<PeerId> = (1..=2).map(peer_id_from_byte).collect();
820        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
821
822        // Both Present -> 2 >= 2 -> QuorumVerified.
823        let evidence = build_evidence(
824            vec![
825                (quorum_peers[0], PresenceEvidence::Present),
826                (quorum_peers[1], PresenceEvidence::Present),
827            ],
828            vec![],
829        );
830
831        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
832        assert!(
833            matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 2),
834            "expected QuorumVerified with 2 sources, got {outcome:?}"
835        );
836    }
837
838    #[test]
839    fn paid_list_verified_with_single_paid_peer() {
840        let key = xor_name_from_byte(0x60);
841        let config = ReplicationConfig::default();
842
843        // 1 paid peer, confirm_needed = floor(1/2)+1 = 1
844        let paid_peers = vec![peer_id_from_byte(10)];
845        // No quorum targets -> quorum path impossible from the start.
846        let targets = single_key_targets(&key, vec![], paid_peers.clone());
847
848        let evidence = build_evidence(vec![], vec![(paid_peers[0], PaidListEvidence::Confirmed)]);
849
850        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
851        assert!(
852            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
853            "expected PaidListVerified with single peer, got {outcome:?}"
854        );
855    }
856
857    #[test]
858    fn paid_list_majority_uses_self_inclusive_paid_group_size() {
859        let key = xor_name_from_byte(0x61);
860        let config = ReplicationConfig::default();
861
862        // Real target computation uses PaidCloseGroup(K), which is
863        // self-inclusive. If self is in a 20-node paid group and does not
864        // already have local paid-list state, 10 remote confirmations are not
865        // enough: ConfirmNeeded(20) is 11.
866        let paid_peers: Vec<PeerId> = (1..=19).map(peer_id_from_byte).collect();
867        let mut targets = single_key_targets(&key, vec![], paid_peers.clone());
868        targets.paid_group_sizes.insert(key, 20);
869
870        let ten_confirmations = build_evidence(
871            vec![],
872            paid_peers
873                .iter()
874                .enumerate()
875                .map(|(i, p)| {
876                    (
877                        *p,
878                        if i < 10 {
879                            PaidListEvidence::Confirmed
880                        } else {
881                            PaidListEvidence::NotFound
882                        },
883                    )
884                })
885                .collect(),
886        );
887        let outcome = evaluate_key_evidence(&key, &ten_confirmations, &targets, &config);
888        assert!(
889            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
890            "10/20 paid confirmations must not authorize the key, got {outcome:?}"
891        );
892
893        let eleven_confirmations = build_evidence(
894            vec![],
895            paid_peers
896                .iter()
897                .enumerate()
898                .map(|(i, p)| {
899                    (
900                        *p,
901                        if i < 11 {
902                            PaidListEvidence::Confirmed
903                        } else {
904                            PaidListEvidence::NotFound
905                        },
906                    )
907                })
908                .collect(),
909        );
910        let outcome = evaluate_key_evidence(&key, &eleven_confirmations, &targets, &config);
911        assert!(
912            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
913            "11/20 paid confirmations should authorize the key, got {outcome:?}"
914        );
915    }
916
917    #[test]
918    fn quorum_fails_with_zero_targets_no_paid() {
919        let key = xor_name_from_byte(0x70);
920        let config = ReplicationConfig::default();
921
922        // No quorum peers, no paid peers.
923        // quorum_needed(0) = min(4, 1) = 1, but 0 positive + 0 unresolved < 1.
924        // confirm_needed(0) = 1, but 0 confirmed + 0 unresolved < 1.
925        let targets = single_key_targets(&key, vec![], vec![]);
926
927        let evidence = build_evidence(vec![], vec![]);
928
929        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
930        assert!(
931            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
932            "expected QuorumFailed with zero targets, got {outcome:?}"
933        );
934    }
935
936    #[test]
937    fn quorum_verified_beats_paid_list_when_both_satisfied() {
938        // When both presence quorum AND paid-list majority are satisfied,
939        // QuorumVerified takes precedence (evaluated first).
940        let key = xor_name_from_byte(0x80);
941        let config = ReplicationConfig::default();
942
943        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
944        let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
945        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
946
947        // quorum_needed(5) = min(4, 3) = 3; all 5 Present -> quorum met.
948        // confirm_needed(3) = 2; all 3 Confirmed -> paid met.
949        let evidence = build_evidence(
950            vec![
951                (quorum_peers[0], PresenceEvidence::Present),
952                (quorum_peers[1], PresenceEvidence::Present),
953                (quorum_peers[2], PresenceEvidence::Present),
954                (quorum_peers[3], PresenceEvidence::Present),
955                (quorum_peers[4], PresenceEvidence::Present),
956            ],
957            vec![
958                (paid_peers[0], PaidListEvidence::Confirmed),
959                (paid_peers[1], PaidListEvidence::Confirmed),
960                (paid_peers[2], PaidListEvidence::Confirmed),
961            ],
962        );
963
964        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
965        assert!(
966            matches!(outcome, KeyVerificationOutcome::QuorumVerified { .. }),
967            "QuorumVerified should take precedence over PaidListVerified, got {outcome:?}"
968        );
969    }
970
971    // -----------------------------------------------------------------------
972    // process_verification_response
973    // -----------------------------------------------------------------------
974
975    #[test]
976    fn process_response_populates_evidence() {
977        let key = xor_name_from_byte(0x90);
978        let peer = peer_id_from_byte(1);
979
980        let targets = single_key_targets(&key, vec![peer], vec![peer]);
981
982        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
983            key,
984            KeyVerificationEvidence {
985                presence: HashMap::new(),
986                paid_list: HashMap::new(),
987            },
988        ))
989        .collect();
990
991        let response = VerificationResponse {
992            results: vec![KeyVerificationResult {
993                key,
994                present: true,
995                paid: Some(true),
996            }],
997        };
998
999        process_verification_response(&peer, &response, &targets, &mut evidence);
1000
1001        let ev = evidence.get(&key).expect("evidence for key");
1002        assert_eq!(
1003            ev.presence.get(&peer),
1004            Some(&PresenceEvidence::Present),
1005            "presence should be Present"
1006        );
1007        assert_eq!(
1008            ev.paid_list.get(&peer),
1009            Some(&PaidListEvidence::Confirmed),
1010            "paid_list should be Confirmed"
1011        );
1012    }
1013
1014    #[test]
1015    fn process_response_missing_key_gets_unresolved() {
1016        let key = xor_name_from_byte(0xA0);
1017        let peer = peer_id_from_byte(2);
1018
1019        let targets = single_key_targets(&key, vec![peer], vec![peer]);
1020
1021        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
1022            key,
1023            KeyVerificationEvidence {
1024                presence: HashMap::new(),
1025                paid_list: HashMap::new(),
1026            },
1027        ))
1028        .collect();
1029
1030        // Empty response: peer did not include our key.
1031        let response = VerificationResponse { results: vec![] };
1032
1033        process_verification_response(&peer, &response, &targets, &mut evidence);
1034
1035        let ev = evidence.get(&key).expect("evidence for key");
1036        assert_eq!(
1037            ev.presence.get(&peer),
1038            Some(&PresenceEvidence::Unresolved),
1039            "missing key in response should be Unresolved"
1040        );
1041        assert_eq!(
1042            ev.paid_list.get(&peer),
1043            Some(&PaidListEvidence::Unresolved),
1044            "missing paid key in response should be Unresolved"
1045        );
1046    }
1047
1048    #[test]
1049    fn process_response_ignores_unsolicited_keys() {
1050        let key = xor_name_from_byte(0xB0);
1051        let unsolicited_key = xor_name_from_byte(0xB1);
1052        let peer = peer_id_from_byte(3);
1053
1054        let targets = single_key_targets(&key, vec![peer], vec![]);
1055
1056        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
1057            key,
1058            KeyVerificationEvidence {
1059                presence: HashMap::new(),
1060                paid_list: HashMap::new(),
1061            },
1062        ))
1063        .collect();
1064
1065        // Response includes an unsolicited key.
1066        let response = VerificationResponse {
1067            results: vec![
1068                KeyVerificationResult {
1069                    key: unsolicited_key,
1070                    present: true,
1071                    paid: None,
1072                },
1073                KeyVerificationResult {
1074                    key,
1075                    present: false,
1076                    paid: None,
1077                },
1078            ],
1079        };
1080
1081        process_verification_response(&peer, &response, &targets, &mut evidence);
1082
1083        // Unsolicited key should not appear in evidence.
1084        assert!(
1085            !evidence.contains_key(&unsolicited_key),
1086            "unsolicited key should not be in evidence"
1087        );
1088
1089        let ev = evidence.get(&key).expect("evidence for key");
1090        assert_eq!(
1091            ev.presence.get(&peer),
1092            Some(&PresenceEvidence::Absent),
1093            "solicited key should have Absent"
1094        );
1095    }
1096
1097    // -----------------------------------------------------------------------
1098    // mark_peer_unresolved
1099    // -----------------------------------------------------------------------
1100
1101    #[test]
1102    fn mark_unresolved_sets_all_keys_for_peer() {
1103        let key_a = xor_name_from_byte(0xC0);
1104        let key_b = xor_name_from_byte(0xC1);
1105        let peer = peer_id_from_byte(5);
1106
1107        // Peer is a quorum target for key_a and a paid target for key_b.
1108        let targets = VerificationTargets {
1109            quorum_targets: std::iter::once((key_a, vec![peer])).collect(),
1110            paid_targets: std::iter::once((key_b, vec![peer])).collect(),
1111            paid_group_sizes: [(key_a, 0), (key_b, 1)].into_iter().collect(),
1112            all_peers: std::iter::once(peer).collect(),
1113            peer_to_keys: std::iter::once((peer, vec![key_a, key_b])).collect(),
1114            peer_to_paid_keys: std::iter::once((peer, std::iter::once(key_b).collect())).collect(),
1115        };
1116
1117        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
1118            (
1119                key_a,
1120                KeyVerificationEvidence {
1121                    presence: HashMap::new(),
1122                    paid_list: HashMap::new(),
1123                },
1124            ),
1125            (
1126                key_b,
1127                KeyVerificationEvidence {
1128                    presence: HashMap::new(),
1129                    paid_list: HashMap::new(),
1130                },
1131            ),
1132        ]
1133        .into_iter()
1134        .collect();
1135
1136        mark_peer_unresolved(&peer, &targets, &mut evidence);
1137
1138        let ev_a = evidence.get(&key_a).expect("evidence for key_a");
1139        assert_eq!(
1140            ev_a.presence.get(&peer),
1141            Some(&PresenceEvidence::Unresolved)
1142        );
1143        // key_a is not in peer_to_paid_keys, so no paid_list entry.
1144        assert!(!ev_a.paid_list.contains_key(&peer));
1145
1146        let ev_b = evidence.get(&key_b).expect("evidence for key_b");
1147        assert_eq!(
1148            ev_b.presence.get(&peer),
1149            Some(&PresenceEvidence::Unresolved)
1150        );
1151        assert_eq!(
1152            ev_b.paid_list.get(&peer),
1153            Some(&PaidListEvidence::Unresolved)
1154        );
1155    }
1156
1157    // -----------------------------------------------------------------------
1158    // Section 18 scenarios
1159    // -----------------------------------------------------------------------
1160
1161    /// Scenario 4: All peers respond Absent with no paid confirmations.
1162    /// Both presence and paid-list paths are impossible -> `QuorumFailed`.
1163    #[test]
1164    fn scenario_4_quorum_fail_transitions_to_abandoned() {
1165        let key = xor_name_from_byte(0xD0);
1166        let config = ReplicationConfig::default();
1167
1168        // 7 quorum peers, threshold = min(4, floor(7/2)+1) = 4
1169        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1170        // 5 paid peers, confirm_needed = floor(5/2)+1 = 3
1171        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1172        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1173
1174        // All quorum peers respond Absent, all paid peers respond NotFound.
1175        let evidence = build_evidence(
1176            quorum_peers
1177                .iter()
1178                .map(|p| (*p, PresenceEvidence::Absent))
1179                .collect(),
1180            paid_peers
1181                .iter()
1182                .map(|p| (*p, PaidListEvidence::NotFound))
1183                .collect(),
1184        );
1185
1186        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1187        assert!(
1188            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
1189            "all-Absent with no paid confirmations should yield QuorumFailed, got {outcome:?}"
1190        );
1191    }
1192
1193    /// Scenario 16: All peers unresolved (timeout). Neither success nor
1194    /// fail-fast is possible because unresolved counts keep both paths alive.
1195    #[test]
1196    fn scenario_16_timeout_yields_inconclusive() {
1197        let key = xor_name_from_byte(0xD1);
1198        let config = ReplicationConfig::default();
1199
1200        // 7 quorum peers, quorum_needed = 4
1201        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1202        // 5 paid peers, confirm_needed = 3
1203        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1204        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1205
1206        // Every peer is Unresolved (simulating full timeout).
1207        let evidence = build_evidence(
1208            quorum_peers
1209                .iter()
1210                .map(|p| (*p, PresenceEvidence::Unresolved))
1211                .collect(),
1212            paid_peers
1213                .iter()
1214                .map(|p| (*p, PaidListEvidence::Unresolved))
1215                .collect(),
1216        );
1217
1218        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1219        assert!(
1220            matches!(outcome, KeyVerificationOutcome::QuorumInconclusive),
1221            "all-unresolved should yield QuorumInconclusive, got {outcome:?}"
1222        );
1223    }
1224
1225    /// Scenario 27: A single verification round collects both presence
1226    /// evidence from `QuorumTargets` and paid-list confirmations from
1227    /// `PaidTargets`. Paid-list success triggers `PaidListVerified` even when
1228    /// presence quorum fails.
1229    #[test]
1230    fn scenario_27_single_round_collects_both_presence_and_paid() {
1231        let key = xor_name_from_byte(0xD2);
1232        let config = ReplicationConfig::default();
1233
1234        // 7 quorum peers: only 1 Present (quorum_needed=4, so quorum fails).
1235        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1236        // 5 paid peers: 3 Confirmed (confirm_needed=3, so paid passes).
1237        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1238        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1239
1240        let evidence = build_evidence(
1241            vec![
1242                (quorum_peers[0], PresenceEvidence::Present),
1243                (quorum_peers[1], PresenceEvidence::Absent),
1244                (quorum_peers[2], PresenceEvidence::Absent),
1245                (quorum_peers[3], PresenceEvidence::Absent),
1246                (quorum_peers[4], PresenceEvidence::Absent),
1247                (quorum_peers[5], PresenceEvidence::Absent),
1248                (quorum_peers[6], PresenceEvidence::Absent),
1249            ],
1250            vec![
1251                (paid_peers[0], PaidListEvidence::Confirmed),
1252                (paid_peers[1], PaidListEvidence::Confirmed),
1253                (paid_peers[2], PaidListEvidence::Confirmed),
1254                (paid_peers[3], PaidListEvidence::NotFound),
1255                (paid_peers[4], PaidListEvidence::NotFound),
1256            ],
1257        );
1258
1259        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1260        assert!(
1261            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
1262            "paid-list majority should trigger PaidListVerified when quorum fails, got {outcome:?}"
1263        );
1264    }
1265
1266    /// Scenario 28: With |QuorumTargets|=3,
1267    /// `QuorumNeeded` = min(4, floor(3/2)+1) = min(4, 2) = 2.
1268    /// 2 Present responses should pass.
1269    #[test]
1270    fn scenario_28_dynamic_threshold_with_3_targets() {
1271        let key = xor_name_from_byte(0xD3);
1272        let config = ReplicationConfig::default();
1273
1274        let quorum_peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
1275        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
1276
1277        // Verify the dynamic threshold is indeed 2.
1278        assert_eq!(config.quorum_needed(3), 2, "quorum_needed(3) should be 2");
1279
1280        // 2 Present, 1 Absent -> 2 >= 2 -> QuorumVerified.
1281        let evidence = build_evidence(
1282            vec![
1283                (quorum_peers[0], PresenceEvidence::Present),
1284                (quorum_peers[1], PresenceEvidence::Present),
1285                (quorum_peers[2], PresenceEvidence::Absent),
1286            ],
1287            vec![],
1288        );
1289
1290        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1291        assert!(
1292            matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 2),
1293            "2 Present in 3-target set should QuorumVerify, got {outcome:?}"
1294        );
1295    }
1296
1297    /// Helper: build `VerificationTargets` for two keys with shared or
1298    /// separate peer sets.
1299    fn two_key_targets(
1300        key_a: &XorName,
1301        key_b: &XorName,
1302        quorum_peers_a: Vec<PeerId>,
1303        quorum_peers_b: Vec<PeerId>,
1304        paid_peers_a: Vec<PeerId>,
1305        paid_peers_b: Vec<PeerId>,
1306    ) -> VerificationTargets {
1307        let mut all_peers = HashSet::new();
1308        let mut peer_to_keys: HashMap<PeerId, Vec<XorName>> = HashMap::new();
1309        let mut peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>> = HashMap::new();
1310
1311        for &p in &quorum_peers_a {
1312            all_peers.insert(p);
1313            peer_to_keys.entry(p).or_default().push(*key_a);
1314        }
1315        for &p in &quorum_peers_b {
1316            all_peers.insert(p);
1317            peer_to_keys.entry(p).or_default().push(*key_b);
1318        }
1319        for &p in &paid_peers_a {
1320            all_peers.insert(p);
1321            peer_to_keys.entry(p).or_default().push(*key_a);
1322            peer_to_paid_keys.entry(p).or_default().insert(*key_a);
1323        }
1324        for &p in &paid_peers_b {
1325            all_peers.insert(p);
1326            peer_to_keys.entry(p).or_default().push(*key_b);
1327            peer_to_paid_keys.entry(p).or_default().insert(*key_b);
1328        }
1329
1330        for keys_list in peer_to_keys.values_mut() {
1331            keys_list.sort_unstable();
1332            keys_list.dedup();
1333        }
1334
1335        let mut quorum_targets = HashMap::new();
1336        quorum_targets.insert(*key_a, quorum_peers_a);
1337        quorum_targets.insert(*key_b, quorum_peers_b);
1338
1339        let mut paid_targets = HashMap::new();
1340        let paid_group_size_a = paid_peers_a.len();
1341        let paid_group_size_b = paid_peers_b.len();
1342        paid_targets.insert(*key_a, paid_peers_a);
1343        paid_targets.insert(*key_b, paid_peers_b);
1344
1345        VerificationTargets {
1346            quorum_targets,
1347            paid_targets,
1348            paid_group_sizes: [(*key_a, paid_group_size_a), (*key_b, paid_group_size_b)]
1349                .into_iter()
1350                .collect(),
1351            all_peers,
1352            peer_to_keys,
1353            peer_to_paid_keys,
1354        }
1355    }
1356
1357    /// Scenario 33: `process_verification_response` correctly attributes
1358    /// per-key evidence when a single peer responds for multiple keys.
1359    #[test]
1360    fn scenario_33_batched_response_per_key_evidence() {
1361        let key_a = xor_name_from_byte(0xD4);
1362        let key_b = xor_name_from_byte(0xD5);
1363        let peer = peer_id_from_byte(1);
1364
1365        // Peer is a quorum+paid target for both keys.
1366        let targets = two_key_targets(
1367            &key_a,
1368            &key_b,
1369            vec![peer],
1370            vec![peer],
1371            vec![peer],
1372            vec![peer],
1373        );
1374
1375        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
1376            (
1377                key_a,
1378                KeyVerificationEvidence {
1379                    presence: HashMap::new(),
1380                    paid_list: HashMap::new(),
1381                },
1382            ),
1383            (
1384                key_b,
1385                KeyVerificationEvidence {
1386                    presence: HashMap::new(),
1387                    paid_list: HashMap::new(),
1388                },
1389            ),
1390        ]
1391        .into_iter()
1392        .collect();
1393
1394        // Peer responds: key_a Present+Confirmed, key_b Absent+NotFound.
1395        let response = VerificationResponse {
1396            results: vec![
1397                KeyVerificationResult {
1398                    key: key_a,
1399                    present: true,
1400                    paid: Some(true),
1401                },
1402                KeyVerificationResult {
1403                    key: key_b,
1404                    present: false,
1405                    paid: Some(false),
1406                },
1407            ],
1408        };
1409
1410        process_verification_response(&peer, &response, &targets, &mut evidence);
1411
1412        // key_a: Present + Confirmed.
1413        let ev_a = evidence.get(&key_a).expect("evidence for key_a");
1414        assert_eq!(ev_a.presence.get(&peer), Some(&PresenceEvidence::Present));
1415        assert_eq!(
1416            ev_a.paid_list.get(&peer),
1417            Some(&PaidListEvidence::Confirmed)
1418        );
1419
1420        // key_b: Absent + NotFound.
1421        let ev_b = evidence.get(&key_b).expect("evidence for key_b");
1422        assert_eq!(ev_b.presence.get(&peer), Some(&PresenceEvidence::Absent));
1423        assert_eq!(ev_b.paid_list.get(&peer), Some(&PaidListEvidence::NotFound));
1424    }
1425
1426    /// Scenario 34: Peer responds for `key_a` but omits `key_b`.
1427    /// `key_a` gets explicit evidence, `key_b` gets Unresolved.
1428    #[test]
1429    fn scenario_34_partial_response_unresolved_per_key() {
1430        let key_a = xor_name_from_byte(0xD6);
1431        let key_b = xor_name_from_byte(0xD7);
1432        let peer = peer_id_from_byte(2);
1433
1434        // Peer is a quorum target for both keys, paid target for key_b only.
1435        let targets = two_key_targets(&key_a, &key_b, vec![peer], vec![peer], vec![], vec![peer]);
1436
1437        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
1438            (
1439                key_a,
1440                KeyVerificationEvidence {
1441                    presence: HashMap::new(),
1442                    paid_list: HashMap::new(),
1443                },
1444            ),
1445            (
1446                key_b,
1447                KeyVerificationEvidence {
1448                    presence: HashMap::new(),
1449                    paid_list: HashMap::new(),
1450                },
1451            ),
1452        ]
1453        .into_iter()
1454        .collect();
1455
1456        // Peer responds only for key_a, omits key_b entirely.
1457        let response = VerificationResponse {
1458            results: vec![KeyVerificationResult {
1459                key: key_a,
1460                present: true,
1461                paid: None,
1462            }],
1463        };
1464
1465        process_verification_response(&peer, &response, &targets, &mut evidence);
1466
1467        // key_a: explicit Present.
1468        let ev_a = evidence.get(&key_a).expect("evidence for key_a");
1469        assert_eq!(
1470            ev_a.presence.get(&peer),
1471            Some(&PresenceEvidence::Present),
1472            "key_a should have explicit Present"
1473        );
1474
1475        // key_b: missing from response -> Unresolved for both presence and
1476        // paid_list.
1477        let ev_b = evidence.get(&key_b).expect("evidence for key_b");
1478        assert_eq!(
1479            ev_b.presence.get(&peer),
1480            Some(&PresenceEvidence::Unresolved),
1481            "omitted key_b should get Unresolved presence"
1482        );
1483        assert_eq!(
1484            ev_b.paid_list.get(&peer),
1485            Some(&PaidListEvidence::Unresolved),
1486            "omitted key_b (paid target) should get Unresolved paid_list"
1487        );
1488    }
1489
1490    /// Scenario 42: `QuorumVerified` outcome populates sources correctly,
1491    /// which downstream uses to add the key to `PaidForList`.
1492    #[test]
1493    fn scenario_42_quorum_pass_derives_paid_list_auth() {
1494        let key = xor_name_from_byte(0xD8);
1495        let config = ReplicationConfig::default();
1496
1497        // 5 quorum peers, quorum_needed = min(4, 3) = 3.
1498        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
1499        // 3 paid peers (some overlap with quorum peers for realistic scenario).
1500        let paid_peers: Vec<PeerId> = (3..=5).map(peer_id_from_byte).collect();
1501        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1502
1503        // 4 quorum peers Present, 1 Absent -> quorum met.
1504        // Also mark paid_peers[0] (peer 3) as Present so it's collected from
1505        // paid targets too.
1506        let evidence = build_evidence(
1507            vec![
1508                (quorum_peers[0], PresenceEvidence::Present),
1509                (quorum_peers[1], PresenceEvidence::Present),
1510                (quorum_peers[2], PresenceEvidence::Present), // peer 3
1511                (quorum_peers[3], PresenceEvidence::Present), // peer 4
1512                (quorum_peers[4], PresenceEvidence::Absent),  // peer 5
1513            ],
1514            vec![
1515                (paid_peers[0], PaidListEvidence::NotFound),
1516                (paid_peers[1], PaidListEvidence::NotFound),
1517                (paid_peers[2], PaidListEvidence::NotFound),
1518            ],
1519        );
1520
1521        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1522        match outcome {
1523            KeyVerificationOutcome::QuorumVerified { ref sources } => {
1524                // Sources should include peers that responded Present from
1525                // both quorum and paid targets.
1526                assert!(
1527                    sources.len() >= 4,
1528                    "QuorumVerified sources should contain at least the 4 quorum-positive peers, got {}",
1529                    sources.len()
1530                );
1531                // The sources list is used downstream to authorize
1532                // PaidForList insertion. Verify specific peers are present.
1533                assert!(
1534                    sources.contains(&quorum_peers[0]),
1535                    "source peer 1 should be in sources"
1536                );
1537                assert!(
1538                    sources.contains(&quorum_peers[1]),
1539                    "source peer 2 should be in sources"
1540                );
1541            }
1542            other => panic!("expected QuorumVerified, got {other:?}"),
1543        }
1544    }
1545
1546    /// Scenario 44: Paid-list cold-start recovery via replica majority.
1547    ///
1548    /// Multiple nodes restart simultaneously and lose their `PaidForList`
1549    /// (persistence corrupted). Key `K` still has `>= QuorumNeeded(K)`
1550    /// replicas in the close group. During neighbor-sync verification,
1551    /// presence quorum passes and all verifying nodes re-derive `K` into
1552    /// their `PaidForList` via close-group replica majority (Section 7.2
1553    /// rule 4).
1554    ///
1555    /// This test verifies that when paid-list evidence is entirely
1556    /// `NotFound` (simulating data loss) but presence evidence meets
1557    /// quorum, the outcome is `QuorumVerified` with sources that enable
1558    /// `PaidForList` re-derivation.
1559    #[test]
1560    fn scenario_44_cold_start_recovery_via_replica_majority() {
1561        let key = xor_name_from_byte(0xD9);
1562        let config = ReplicationConfig::default();
1563
1564        // 7 quorum peers, quorum_needed = min(4, floor(7/2)+1) = 4.
1565        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1566        // 10 paid peers (wider group), confirm_needed = floor(10/2)+1 = 6.
1567        let paid_peers: Vec<PeerId> = (10..=19).map(peer_id_from_byte).collect();
1568        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1569
1570        // Cold-start scenario: ALL paid-list entries are lost across every
1571        // peer in PaidCloseGroup. Every paid peer reports NotFound.
1572        let paid_evidence: Vec<(PeerId, PaidListEvidence)> = paid_peers
1573            .iter()
1574            .map(|p| (*p, PaidListEvidence::NotFound))
1575            .collect();
1576
1577        // But the replicas still exist: 5 out of 7 quorum peers report
1578        // Present (>= QuorumNeeded(K) = 4).
1579        let presence_evidence = vec![
1580            (quorum_peers[0], PresenceEvidence::Present),
1581            (quorum_peers[1], PresenceEvidence::Present),
1582            (quorum_peers[2], PresenceEvidence::Present),
1583            (quorum_peers[3], PresenceEvidence::Present),
1584            (quorum_peers[4], PresenceEvidence::Present),
1585            (quorum_peers[5], PresenceEvidence::Absent),
1586            (quorum_peers[6], PresenceEvidence::Absent),
1587        ];
1588
1589        let evidence = build_evidence(presence_evidence, paid_evidence);
1590        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1591
1592        match outcome {
1593            KeyVerificationOutcome::QuorumVerified { ref sources } => {
1594                // Quorum passed despite total paid-list loss. The caller
1595                // re-derives PaidForList from close-group replica majority.
1596                assert!(
1597                    sources.len() >= 4,
1598                    "QuorumVerified should have >= 4 sources (the presence-positive peers), got {}",
1599                    sources.len()
1600                );
1601
1602                // Verify the specific Present peers are in sources.
1603                for (i, peer) in quorum_peers.iter().enumerate().take(5) {
1604                    assert!(
1605                        sources.contains(peer),
1606                        "quorum_peer[{i}] responded Present and should be a fetch source"
1607                    );
1608                }
1609
1610                // Absent peers are NOT sources.
1611                assert!(
1612                    !sources.contains(&quorum_peers[5]),
1613                    "absent peer should not be a fetch source"
1614                );
1615                assert!(
1616                    !sources.contains(&quorum_peers[6]),
1617                    "absent peer should not be a fetch source"
1618                );
1619            }
1620            other => panic!(
1621                "Cold-start recovery should succeed via replica majority \
1622                 (QuorumVerified), got {other:?}"
1623            ),
1624        }
1625    }
1626
1627    /// Scenario 20: Unknown replica key found in local `PaidForList` bypasses
1628    /// presence quorum.
1629    ///
1630    /// When a key's paid-list evidence shows confirmation from enough peers,
1631    /// `PaidListVerified` is returned even without a single presence-positive
1632    /// response.  This models the local-hit fast-path: the caller already
1633    /// checked the local paid list and the network confirms majority — no
1634    /// presence quorum needed.
1635    #[test]
1636    fn scenario_20_paid_list_local_hit_bypasses_presence_quorum() {
1637        let key = xor_name_from_byte(0xE0);
1638        let config = ReplicationConfig::default();
1639
1640        // 7 quorum peers, quorum_needed = 4.
1641        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1642        // 5 paid peers, confirm_needed = floor(5/2)+1 = 3.
1643        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1644        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1645
1646        // ALL quorum peers Absent (presence quorum impossible) but 3/5 paid
1647        // peers confirm → PaidListVerified.
1648        let evidence = build_evidence(
1649            quorum_peers
1650                .iter()
1651                .map(|p| (*p, PresenceEvidence::Absent))
1652                .collect(),
1653            vec![
1654                (paid_peers[0], PaidListEvidence::Confirmed),
1655                (paid_peers[1], PaidListEvidence::Confirmed),
1656                (paid_peers[2], PaidListEvidence::Confirmed),
1657                (paid_peers[3], PaidListEvidence::NotFound),
1658                (paid_peers[4], PaidListEvidence::NotFound),
1659            ],
1660        );
1661
1662        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1663        assert!(
1664            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
1665            "paid-list majority should bypass failed presence quorum, got {outcome:?}"
1666        );
1667    }
1668
1669    /// Scenario 22: Paid-list confirmation below threshold AND presence quorum
1670    /// fails → `QuorumFailed`.
1671    ///
1672    /// Neither path can succeed: presence peers are all Absent (can't reach
1673    /// `quorum_needed`) and paid confirmations are below `confirm_needed`.
1674    #[test]
1675    fn scenario_22_paid_list_rejection_below_threshold() {
1676        let key = xor_name_from_byte(0xE2);
1677        let config = ReplicationConfig::default();
1678
1679        // 7 quorum peers, quorum_needed = 4.
1680        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1681        // 5 paid peers, confirm_needed = 3.
1682        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1683        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1684
1685        // All quorum peers Absent; only 2/5 paid confirmations (below 3).
1686        let evidence = build_evidence(
1687            quorum_peers
1688                .iter()
1689                .map(|p| (*p, PresenceEvidence::Absent))
1690                .collect(),
1691            vec![
1692                (paid_peers[0], PaidListEvidence::Confirmed),
1693                (paid_peers[1], PaidListEvidence::Confirmed),
1694                (paid_peers[2], PaidListEvidence::NotFound),
1695                (paid_peers[3], PaidListEvidence::NotFound),
1696                (paid_peers[4], PaidListEvidence::NotFound),
1697            ],
1698        );
1699
1700        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1701        assert!(
1702            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
1703            "below-threshold paid confirmations with all-Absent quorum should yield QuorumFailed, got {outcome:?}"
1704        );
1705    }
1706}