Skip to main content

ant_node/replication/
quorum.rs

1//! Quorum verification logic (Section 9).
2//!
3//! Single-round batched verification: presence + paid-list evidence collected
4//! in one request round to `VerifyTargets = PaidTargets ∪ QuorumTargets`.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8
9use crate::logging::{debug, warn};
10use saorsa_core::identity::PeerId;
11use saorsa_core::P2PNode;
12
13use crate::ant_protocol::XorName;
14use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
15use crate::replication::protocol::{
16    ReplicationMessage, ReplicationMessageBody, VerificationRequest, VerificationResponse,
17};
18use crate::replication::types::{KeyVerificationEvidence, PaidListEvidence, PresenceEvidence};
19
20// ---------------------------------------------------------------------------
21// Verification targets
22// ---------------------------------------------------------------------------
23
24/// Targets for verifying a set of keys.
25#[derive(Debug)]
26pub struct VerificationTargets {
27    /// Per-key: closest `CLOSE_GROUP_SIZE` peers (excluding self) for presence
28    /// quorum.
29    pub quorum_targets: HashMap<XorName, Vec<PeerId>>,
30    /// Per-key: `PaidCloseGroup` peers for paid-list majority.
31    pub paid_targets: HashMap<XorName, Vec<PeerId>>,
32    /// Per-key: self-inclusive paid close-group size used to compute
33    /// `ConfirmNeeded(K)`.
34    pub paid_group_sizes: HashMap<XorName, usize>,
35    /// Union of all target peers across all keys.
36    pub all_peers: HashSet<PeerId>,
37    /// Which keys each peer should be queried about.
38    pub peer_to_keys: HashMap<PeerId, Vec<XorName>>,
39    /// Which keys need paid-list checks from which peers.
40    pub peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>>,
41}
42
43/// Compute verification targets for a batch of keys.
44///
45/// For each key, determines the `QuorumTargets` (closest `CLOSE_GROUP_SIZE`
46/// peers excluding self) and `PaidTargets` (`PaidCloseGroup` excluding self),
47/// then unions them into per-peer request batches.
48pub async fn compute_verification_targets(
49    keys: &[XorName],
50    p2p_node: &Arc<P2PNode>,
51    config: &ReplicationConfig,
52    self_id: &PeerId,
53) -> VerificationTargets {
54    let dht = p2p_node.dht_manager();
55    let mut targets = VerificationTargets {
56        quorum_targets: HashMap::new(),
57        paid_targets: HashMap::new(),
58        paid_group_sizes: HashMap::new(),
59        all_peers: HashSet::new(),
60        peer_to_keys: HashMap::new(),
61        peer_to_paid_keys: HashMap::new(),
62    };
63
64    for &key in keys {
65        // QuorumTargets: up to CLOSE_GROUP_SIZE nearest peers for K, excluding
66        // self.
67        let closest = dht
68            .find_closest_nodes_local(&key, config.close_group_size)
69            .await;
70        let quorum_peers: Vec<PeerId> = closest
71            .iter()
72            .filter(|n| n.peer_id != *self_id)
73            .map(|n| n.peer_id)
74            .collect();
75
76        // PaidTargets: PaidCloseGroup(K) excluding self.
77        let paid_closest = dht
78            .find_closest_nodes_local_with_self(&key, config.paid_list_close_group_size)
79            .await;
80        let paid_group_size = paid_closest.len();
81        let paid_peers: Vec<PeerId> = paid_closest
82            .iter()
83            .filter(|n| n.peer_id != *self_id)
84            .map(|n| n.peer_id)
85            .collect();
86
87        // VerifyTargets = PaidTargets ∪ QuorumTargets
88        for &peer in &quorum_peers {
89            targets.all_peers.insert(peer);
90            targets.peer_to_keys.entry(peer).or_default().push(key);
91        }
92        for &peer in &paid_peers {
93            targets.all_peers.insert(peer);
94            targets.peer_to_keys.entry(peer).or_default().push(key);
95            targets
96                .peer_to_paid_keys
97                .entry(peer)
98                .or_default()
99                .insert(key);
100        }
101
102        targets.quorum_targets.insert(key, quorum_peers);
103        targets.paid_targets.insert(key, paid_peers);
104        targets.paid_group_sizes.insert(key, paid_group_size);
105    }
106
107    // Deduplicate keys per peer (a peer in both quorum and paid targets for
108    // the same key would have it listed twice).
109    for keys_list in targets.peer_to_keys.values_mut() {
110        keys_list.sort_unstable();
111        keys_list.dedup();
112    }
113
114    targets
115}
116
117/// Compute presence-only verification targets for locally paid keys.
118///
119/// Local `PaidForList` membership authorizes the key already; this target set
120/// is only used to discover peers that can serve the record bytes.
121pub async fn compute_presence_targets(
122    keys: &[XorName],
123    p2p_node: &Arc<P2PNode>,
124    config: &ReplicationConfig,
125    self_id: &PeerId,
126) -> VerificationTargets {
127    let dht = p2p_node.dht_manager();
128    let mut targets = VerificationTargets {
129        quorum_targets: HashMap::new(),
130        paid_targets: HashMap::new(),
131        paid_group_sizes: HashMap::new(),
132        all_peers: HashSet::new(),
133        peer_to_keys: HashMap::new(),
134        peer_to_paid_keys: HashMap::new(),
135    };
136
137    for &key in keys {
138        let closest = dht
139            .find_closest_nodes_local(&key, config.close_group_size)
140            .await;
141        let quorum_peers: Vec<PeerId> = closest
142            .iter()
143            .filter(|n| n.peer_id != *self_id)
144            .map(|n| n.peer_id)
145            .collect();
146
147        for &peer in &quorum_peers {
148            targets.all_peers.insert(peer);
149            targets.peer_to_keys.entry(peer).or_default().push(key);
150        }
151
152        targets.quorum_targets.insert(key, quorum_peers);
153    }
154
155    for keys_list in targets.peer_to_keys.values_mut() {
156        keys_list.sort_unstable();
157        keys_list.dedup();
158    }
159
160    targets
161}
162
163// ---------------------------------------------------------------------------
164// Verification outcome
165// ---------------------------------------------------------------------------
166
/// Outcome of verifying a single key.
///
/// Returned by [`evaluate_key_evidence`], which checks the presence quorum
/// first, then the paid-list majority, then whether either path can still
/// succeed.
#[derive(Debug, Clone)]
pub enum KeyVerificationOutcome {
    /// Presence quorum passed: enough quorum targets answered `Present`.
    QuorumVerified {
        /// Peers that responded `Present` (verified fetch sources). Drawn
        /// from both quorum and paid targets, de-duplicated.
        sources: Vec<PeerId>,
    },
    /// Paid-list authorization succeeded: enough paid targets confirmed the
    /// key, while the presence quorum was not reached.
    PaidListVerified {
        /// Peers that responded `Present` (potential fetch sources, may be
        /// empty).
        sources: Vec<PeerId>,
    },
    /// Quorum failed definitively (both paths impossible, even if every
    /// still-unresolved peer were to answer positively).
    QuorumFailed,
    /// Inconclusive (timeout with neither success nor fail-fast): at least
    /// one path could still succeed once unresolved peers answer.
    QuorumInconclusive,
}
186
187// ---------------------------------------------------------------------------
188// Evidence evaluation (pure logic, no I/O)
189// ---------------------------------------------------------------------------
190
191/// Evaluate verification evidence for a single key.
192///
193/// Returns the outcome based on Section 9 rules:
194/// - **Step 10**: If presence positives >= `QuorumNeeded(K)`, `QuorumVerified`.
195/// - **Step 9**: If paid confirmations >= `ConfirmNeeded(K)`,
196///   `PaidListVerified`.
197/// - **Step 14**: Fail fast when both paths are impossible.
198/// - **Step 15**: Otherwise inconclusive.
199#[must_use]
200pub fn evaluate_key_evidence(
201    key: &XorName,
202    evidence: &KeyVerificationEvidence,
203    targets: &VerificationTargets,
204    config: &ReplicationConfig,
205) -> KeyVerificationOutcome {
206    let quorum_peers = targets
207        .quorum_targets
208        .get(key)
209        .map_or(&[][..], Vec::as_slice);
210
211    // Count presence evidence from QuorumTargets.
212    let mut presence_positive = 0usize;
213    let mut presence_unresolved = 0usize;
214
215    for peer in quorum_peers {
216        match evidence.presence.get(peer) {
217            Some(PresenceEvidence::Present) => presence_positive += 1,
218            Some(PresenceEvidence::Absent) => {}
219            Some(PresenceEvidence::Unresolved) | None => {
220                presence_unresolved += 1;
221            }
222        }
223    }
224
225    // Also collect Present peers from paid targets for fetch sources.
226    let paid_peers = targets.paid_targets.get(key).map_or(&[][..], Vec::as_slice);
227    let present_peers = collect_present_sources(evidence, quorum_peers, paid_peers);
228
229    // Count paid-list evidence from PaidTargets.
230    let mut paid_confirmed = 0usize;
231    let mut paid_unresolved = 0usize;
232
233    for peer in paid_peers {
234        match evidence.paid_list.get(peer) {
235            Some(PaidListEvidence::Confirmed) => paid_confirmed += 1,
236            Some(PaidListEvidence::NotFound) => {}
237            Some(PaidListEvidence::Unresolved) | None => paid_unresolved += 1,
238        }
239    }
240
241    let quorum_needed = config.quorum_needed(quorum_peers.len());
242    let paid_group_size = targets
243        .paid_group_sizes
244        .get(key)
245        .copied()
246        .unwrap_or(paid_peers.len());
247    let confirm_needed = ReplicationConfig::confirm_needed(paid_group_size);
248
249    // Step 10: Presence quorum reached.
250    // quorum_needed == 0 means zero targets exist — quorum is impossible,
251    // not trivially met.
252    if quorum_needed > 0 && presence_positive >= quorum_needed {
253        return KeyVerificationOutcome::QuorumVerified {
254            sources: present_peers,
255        };
256    }
257
258    // Step 9: Paid-list majority reached.
259    // confirm_needed from 0 paid peers is 1, so this naturally fails with
260    // 0 confirmed — no special guard needed. But be explicit for clarity.
261    if paid_group_size > 0 && paid_confirmed >= confirm_needed {
262        return KeyVerificationOutcome::PaidListVerified {
263            sources: present_peers,
264        };
265    }
266
267    // Step 14: Fail fast when both paths are impossible.
268    let paid_possible = paid_group_size > 0 && paid_confirmed + paid_unresolved >= confirm_needed;
269    let quorum_possible =
270        quorum_needed > 0 && presence_positive + presence_unresolved >= quorum_needed;
271
272    if !paid_possible && !quorum_possible {
273        return KeyVerificationOutcome::QuorumFailed;
274    }
275
276    // Step 15: Neither success nor fail-fast.
277    KeyVerificationOutcome::QuorumInconclusive
278}
279
280/// Return peers that gave positive presence evidence for a key.
281///
282/// Only peers in the computed verification target sets are considered.
283#[must_use]
284pub fn present_sources_for_key(
285    key: &XorName,
286    evidence: &KeyVerificationEvidence,
287    targets: &VerificationTargets,
288) -> Vec<PeerId> {
289    let quorum_peers = targets
290        .quorum_targets
291        .get(key)
292        .map_or(&[][..], Vec::as_slice);
293    let paid_peers = targets.paid_targets.get(key).map_or(&[][..], Vec::as_slice);
294
295    collect_present_sources(evidence, quorum_peers, paid_peers)
296}
297
298fn collect_present_sources(
299    evidence: &KeyVerificationEvidence,
300    quorum_peers: &[PeerId],
301    paid_peers: &[PeerId],
302) -> Vec<PeerId> {
303    let mut present_peers = Vec::new();
304    let mut seen = HashSet::new();
305
306    for peer in quorum_peers.iter().chain(paid_peers.iter()) {
307        if matches!(evidence.presence.get(peer), Some(PresenceEvidence::Present))
308            && seen.insert(*peer)
309        {
310            present_peers.push(*peer);
311        }
312    }
313
314    present_peers
315}
316
317// ---------------------------------------------------------------------------
318// Network verification round
319// ---------------------------------------------------------------------------
320
321/// Send batched verification requests to all peers and collect evidence.
322///
323/// Implements Section 9 requirement: one request per peer carrying many keys.
324/// Returns per-key evidence aggregated from all peer responses.
325pub async fn run_verification_round(
326    keys: &[XorName],
327    targets: &VerificationTargets,
328    p2p_node: &Arc<P2PNode>,
329    config: &ReplicationConfig,
330) -> HashMap<XorName, KeyVerificationEvidence> {
331    // Initialize empty evidence for all keys.
332    let mut evidence: HashMap<XorName, KeyVerificationEvidence> = keys
333        .iter()
334        .map(|&k| {
335            (
336                k,
337                KeyVerificationEvidence {
338                    presence: HashMap::new(),
339                    paid_list: HashMap::new(),
340                },
341            )
342        })
343        .collect();
344
345    // Send one batched request per peer.
346    let mut handles = Vec::new();
347
348    for (&peer, peer_keys) in &targets.peer_to_keys {
349        let paid_check_keys = targets.peer_to_paid_keys.get(&peer);
350
351        // Build paid_list_check_indices: which of this peer's keys need
352        // paid-list status.
353        let mut paid_indices = Vec::new();
354        for (i, key) in peer_keys.iter().enumerate() {
355            if let Some(paid_keys) = paid_check_keys {
356                if paid_keys.contains(key) {
357                    if let Ok(idx) = u32::try_from(i) {
358                        paid_indices.push(idx);
359                    }
360                }
361            }
362        }
363
364        let request = VerificationRequest {
365            keys: peer_keys.clone(),
366            paid_list_check_indices: paid_indices,
367        };
368
369        let msg = ReplicationMessage {
370            request_id: rand::random(),
371            body: ReplicationMessageBody::VerificationRequest(request),
372        };
373
374        let p2p = Arc::clone(p2p_node);
375        let timeout = config.verification_request_timeout;
376        let peer_id = peer;
377
378        handles.push(tokio::spawn(async move {
379            let encoded = match msg.encode() {
380                Ok(data) => data,
381                Err(e) => {
382                    warn!("Failed to encode verification request: {e}");
383                    return (peer_id, None);
384                }
385            };
386
387            match p2p
388                .send_request(&peer_id, REPLICATION_PROTOCOL_ID, encoded, timeout)
389                .await
390            {
391                Ok(response) => match ReplicationMessage::decode(&response.data) {
392                    Ok(decoded) => (peer_id, Some(decoded)),
393                    Err(e) => {
394                        warn!("Failed to decode verification response from {peer_id}: {e}");
395                        (peer_id, None)
396                    }
397                },
398                Err(e) => {
399                    debug!("Verification request to {peer_id} failed: {e}");
400                    (peer_id, None)
401                }
402            }
403        }));
404    }
405
406    // Collect responses.
407    for handle in handles {
408        let (peer, response) = match handle.await {
409            Ok(result) => result,
410            Err(e) => {
411                warn!("Verification task panicked: {e}");
412                continue;
413            }
414        };
415
416        let Some(msg) = response else {
417            // Timeout/error: mark all keys for this peer as unresolved.
418            mark_peer_unresolved(&peer, targets, &mut evidence);
419            continue;
420        };
421
422        if let ReplicationMessageBody::VerificationResponse(resp) = msg.body {
423            process_verification_response(&peer, &resp, targets, &mut evidence);
424        }
425    }
426
427    evidence
428}
429
430/// Mark all keys for a peer as unresolved (timeout / decode failure).
431fn mark_peer_unresolved(
432    peer: &PeerId,
433    targets: &VerificationTargets,
434    evidence: &mut HashMap<XorName, KeyVerificationEvidence>,
435) {
436    if let Some(peer_keys) = targets.peer_to_keys.get(peer) {
437        let is_paid_peer = targets.peer_to_paid_keys.get(peer);
438        for key in peer_keys {
439            if let Some(ev) = evidence.get_mut(key) {
440                ev.presence.insert(*peer, PresenceEvidence::Unresolved);
441                if is_paid_peer.is_some_and(|ks| ks.contains(key)) {
442                    ev.paid_list.insert(*peer, PaidListEvidence::Unresolved);
443                }
444            }
445        }
446    }
447}
448
449/// Process a single peer's verification response into the evidence map.
450fn process_verification_response(
451    peer: &PeerId,
452    response: &VerificationResponse,
453    targets: &VerificationTargets,
454    evidence: &mut HashMap<XorName, KeyVerificationEvidence>,
455) {
456    let Some(peer_keys) = targets.peer_to_keys.get(peer) else {
457        return;
458    };
459
460    // Use a HashSet for O(1) key membership checks instead of linear scan,
461    // preventing CPU amplification from large responses.
462    let peer_keys_set: HashSet<&XorName> = peer_keys.iter().collect();
463
464    // Cap results at 2x requested keys to limit processing of stuffed
465    // responses while still tolerating some unsolicited entries.
466    let max_results = peer_keys.len().saturating_mul(2);
467    let results = if response.results.len() > max_results {
468        warn!(
469            "Peer {peer} sent {} verification results but only {} keys were requested — truncating",
470            response.results.len(),
471            peer_keys.len(),
472        );
473        &response.results[..max_results]
474    } else {
475        &response.results
476    };
477
478    // Match response results to requested keys.
479    for result in results {
480        if !peer_keys_set.contains(&result.key) {
481            continue; // Ignore unsolicited key results.
482        }
483
484        if let Some(ev) = evidence.get_mut(&result.key) {
485            // Presence evidence.
486            let presence = if result.present {
487                PresenceEvidence::Present
488            } else {
489                PresenceEvidence::Absent
490            };
491            ev.presence.insert(*peer, presence);
492
493            // Paid-list evidence (only if requested).
494            if let Some(is_paid) = result.paid {
495                let paid = if is_paid {
496                    PaidListEvidence::Confirmed
497                } else {
498                    PaidListEvidence::NotFound
499                };
500                ev.paid_list.insert(*peer, paid);
501            }
502        }
503    }
504
505    // Keys that were requested but not in response -> unresolved.
506    let is_paid_peer = targets.peer_to_paid_keys.get(peer);
507    for key in peer_keys {
508        if let Some(ev) = evidence.get_mut(key) {
509            ev.presence
510                .entry(*peer)
511                .or_insert(PresenceEvidence::Unresolved);
512            if is_paid_peer.is_some_and(|ks| ks.contains(key)) {
513                ev.paid_list
514                    .entry(*peer)
515                    .or_insert(PaidListEvidence::Unresolved);
516            }
517        }
518    }
519}
520
521// ---------------------------------------------------------------------------
522// Tests
523// ---------------------------------------------------------------------------
524
525#[cfg(test)]
526#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
527mod tests {
528    use super::*;
529    use crate::replication::protocol::KeyVerificationResult;
530
531    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
532    fn peer_id_from_byte(b: u8) -> PeerId {
533        let mut bytes = [0u8; 32];
534        bytes[0] = b;
535        PeerId::from_bytes(bytes)
536    }
537
538    /// Build an `XorName` from a single byte (repeated to 32 bytes).
539    fn xor_name_from_byte(b: u8) -> XorName {
540        [b; 32]
541    }
542
543    /// Helper: build minimal `VerificationTargets` for a single key with
544    /// explicit quorum and paid peer lists.
545    fn single_key_targets(
546        key: &XorName,
547        quorum_peers: Vec<PeerId>,
548        paid_peers: Vec<PeerId>,
549    ) -> VerificationTargets {
550        let mut all_peers = HashSet::new();
551        let mut peer_to_keys: HashMap<PeerId, Vec<XorName>> = HashMap::new();
552        let mut peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>> = HashMap::new();
553
554        for &p in &quorum_peers {
555            all_peers.insert(p);
556            peer_to_keys.entry(p).or_default().push(*key);
557        }
558        for &p in &paid_peers {
559            all_peers.insert(p);
560            peer_to_keys.entry(p).or_default().push(*key);
561            peer_to_paid_keys.entry(p).or_default().insert(*key);
562        }
563
564        // Deduplicate keys per peer.
565        for keys_list in peer_to_keys.values_mut() {
566            keys_list.sort_unstable();
567            keys_list.dedup();
568        }
569
570        let paid_group_size = paid_peers.len();
571        VerificationTargets {
572            quorum_targets: std::iter::once((key.to_owned(), quorum_peers)).collect(),
573            paid_group_sizes: std::iter::once((key.to_owned(), paid_group_size)).collect(),
574            paid_targets: std::iter::once((key.to_owned(), paid_peers)).collect(),
575            all_peers,
576            peer_to_keys,
577            peer_to_paid_keys,
578        }
579    }
580
581    /// Helper: build `KeyVerificationEvidence` from presence and paid-list
582    /// maps.
583    fn build_evidence(
584        presence: Vec<(PeerId, PresenceEvidence)>,
585        paid_list: Vec<(PeerId, PaidListEvidence)>,
586    ) -> KeyVerificationEvidence {
587        KeyVerificationEvidence {
588            presence: presence.into_iter().collect(),
589            paid_list: paid_list.into_iter().collect(),
590        }
591    }
592
593    #[test]
594    fn present_sources_for_key_filters_targets_and_deduplicates() {
595        let key = xor_name_from_byte(0x11);
596        let q_present = peer_id_from_byte(1);
597        let overlap = peer_id_from_byte(2);
598        let q_absent = peer_id_from_byte(3);
599        let q_unresolved = peer_id_from_byte(4);
600        let paid_present = peer_id_from_byte(5);
601        let paid_absent = peer_id_from_byte(6);
602        let outside_target = peer_id_from_byte(7);
603
604        let targets = single_key_targets(
605            &key,
606            vec![q_present, overlap, q_absent, q_unresolved],
607            vec![overlap, paid_present, paid_absent],
608        );
609        let evidence = build_evidence(
610            vec![
611                (q_present, PresenceEvidence::Present),
612                (overlap, PresenceEvidence::Present),
613                (q_absent, PresenceEvidence::Absent),
614                (q_unresolved, PresenceEvidence::Unresolved),
615                (paid_present, PresenceEvidence::Present),
616                (paid_absent, PresenceEvidence::Absent),
617                (outside_target, PresenceEvidence::Present),
618            ],
619            vec![],
620        );
621
622        let sources = present_sources_for_key(&key, &evidence, &targets);
623
624        assert_eq!(
625            sources,
626            vec![q_present, overlap, paid_present],
627            "sources should preserve quorum-first order, de-duplicate overlap, and ignore non-target/negative evidence"
628        );
629    }
630
631    // -----------------------------------------------------------------------
632    // evaluate_key_evidence: QuorumVerified
633    // -----------------------------------------------------------------------
634
635    #[test]
636    fn quorum_verified_with_enough_present_responses() {
637        let key = xor_name_from_byte(0x10);
638        let config = ReplicationConfig::default();
639
640        // 7 quorum peers, threshold = min(4, floor(7/2)+1) = 4
641        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
642        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
643
644        // 4 peers say Present, 3 say Absent.
645        let evidence = build_evidence(
646            vec![
647                (quorum_peers[0], PresenceEvidence::Present),
648                (quorum_peers[1], PresenceEvidence::Present),
649                (quorum_peers[2], PresenceEvidence::Present),
650                (quorum_peers[3], PresenceEvidence::Present),
651                (quorum_peers[4], PresenceEvidence::Absent),
652                (quorum_peers[5], PresenceEvidence::Absent),
653                (quorum_peers[6], PresenceEvidence::Absent),
654            ],
655            vec![],
656        );
657
658        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
659        assert!(
660            matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 4),
661            "expected QuorumVerified with 4 sources, got {outcome:?}"
662        );
663    }
664
665    // -----------------------------------------------------------------------
666    // evaluate_key_evidence: PaidListVerified
667    // -----------------------------------------------------------------------
668
669    #[test]
670    fn paid_list_verified_with_enough_confirmations() {
671        let key = xor_name_from_byte(0x20);
672        let config = ReplicationConfig::default();
673
674        // 5 paid peers, confirm_needed = floor(5/2)+1 = 3
675        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
676        // No quorum peers (or quorum fails).
677        let quorum_peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
678        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
679
680        // Quorum: all Absent (fails presence path).
681        // Paid: 3 Confirmed, 2 NotFound -> majority reached.
682        let evidence = build_evidence(
683            vec![
684                (quorum_peers[0], PresenceEvidence::Absent),
685                (quorum_peers[1], PresenceEvidence::Absent),
686                (quorum_peers[2], PresenceEvidence::Absent),
687            ],
688            vec![
689                (paid_peers[0], PaidListEvidence::Confirmed),
690                (paid_peers[1], PaidListEvidence::Confirmed),
691                (paid_peers[2], PaidListEvidence::Confirmed),
692                (paid_peers[3], PaidListEvidence::NotFound),
693                (paid_peers[4], PaidListEvidence::NotFound),
694            ],
695        );
696
697        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
698        assert!(
699            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
700            "expected PaidListVerified, got {outcome:?}"
701        );
702    }
703
704    // -----------------------------------------------------------------------
705    // evaluate_key_evidence: QuorumFailed
706    // -----------------------------------------------------------------------
707
708    #[test]
709    fn quorum_failed_when_both_paths_impossible() {
710        let key = xor_name_from_byte(0x30);
711        let config = ReplicationConfig::default();
712
713        // 5 quorum peers, quorum_needed = min(4, floor(5/2)+1) = min(4,3) = 3
714        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
715        // 3 paid peers, confirm_needed = floor(3/2)+1 = 2
716        let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
717        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
718
719        // Presence: all 5 Absent (0 positive, 0 unresolved) -> can't reach 3.
720        // Paid: all 3 NotFound (0 confirmed, 0 unresolved) -> can't reach 2.
721        let evidence = build_evidence(
722            vec![
723                (quorum_peers[0], PresenceEvidence::Absent),
724                (quorum_peers[1], PresenceEvidence::Absent),
725                (quorum_peers[2], PresenceEvidence::Absent),
726                (quorum_peers[3], PresenceEvidence::Absent),
727                (quorum_peers[4], PresenceEvidence::Absent),
728            ],
729            vec![
730                (paid_peers[0], PaidListEvidence::NotFound),
731                (paid_peers[1], PaidListEvidence::NotFound),
732                (paid_peers[2], PaidListEvidence::NotFound),
733            ],
734        );
735
736        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
737        assert!(
738            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
739            "expected QuorumFailed, got {outcome:?}"
740        );
741    }
742
743    // -----------------------------------------------------------------------
744    // evaluate_key_evidence: QuorumInconclusive
745    // -----------------------------------------------------------------------
746
747    #[test]
748    fn quorum_inconclusive_with_unresolved_peers() {
749        let key = xor_name_from_byte(0x40);
750        let config = ReplicationConfig::default();
751
752        // 5 quorum peers, quorum_needed = min(4, 3) = 3
753        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
754        // 3 paid peers, confirm_needed = 2
755        let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
756        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
757
758        // Presence: 2 Present, 1 Absent, 2 Unresolved.
759        // positive=2, unresolved=2 -> 2+2=4 >= 3 -> quorum still possible.
760        // Paid: 1 Confirmed, 1 Unresolved, 1 NotFound.
761        // confirmed=1, unresolved=1 -> 1+1=2 >= 2 -> paid still possible.
762        // Neither path reached yet -> Inconclusive.
763        let evidence = build_evidence(
764            vec![
765                (quorum_peers[0], PresenceEvidence::Present),
766                (quorum_peers[1], PresenceEvidence::Present),
767                (quorum_peers[2], PresenceEvidence::Absent),
768                (quorum_peers[3], PresenceEvidence::Unresolved),
769                (quorum_peers[4], PresenceEvidence::Unresolved),
770            ],
771            vec![
772                (paid_peers[0], PaidListEvidence::Confirmed),
773                (paid_peers[1], PaidListEvidence::Unresolved),
774                (paid_peers[2], PaidListEvidence::NotFound),
775            ],
776        );
777
778        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
779        assert!(
780            matches!(outcome, KeyVerificationOutcome::QuorumInconclusive),
781            "expected QuorumInconclusive, got {outcome:?}"
782        );
783    }
784
785    // -----------------------------------------------------------------------
786    // Dynamic thresholds with undersized sets
787    // -----------------------------------------------------------------------
788
789    #[test]
790    fn quorum_verified_with_undersized_quorum_targets() {
791        let key = xor_name_from_byte(0x50);
792        let config = ReplicationConfig::default();
793
794        // Only 2 quorum peers (undersized).
795        // quorum_needed = min(4, floor(2/2)+1) = min(4, 2) = 2
796        let quorum_peers: Vec<PeerId> = (1..=2).map(peer_id_from_byte).collect();
797        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
798
799        // Both Present -> 2 >= 2 -> QuorumVerified.
800        let evidence = build_evidence(
801            vec![
802                (quorum_peers[0], PresenceEvidence::Present),
803                (quorum_peers[1], PresenceEvidence::Present),
804            ],
805            vec![],
806        );
807
808        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
809        assert!(
810            matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 2),
811            "expected QuorumVerified with 2 sources, got {outcome:?}"
812        );
813    }
814
815    #[test]
816    fn paid_list_verified_with_single_paid_peer() {
817        let key = xor_name_from_byte(0x60);
818        let config = ReplicationConfig::default();
819
820        // 1 paid peer, confirm_needed = floor(1/2)+1 = 1
821        let paid_peers = vec![peer_id_from_byte(10)];
822        // No quorum targets -> quorum path impossible from the start.
823        let targets = single_key_targets(&key, vec![], paid_peers.clone());
824
825        let evidence = build_evidence(vec![], vec![(paid_peers[0], PaidListEvidence::Confirmed)]);
826
827        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
828        assert!(
829            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
830            "expected PaidListVerified with single peer, got {outcome:?}"
831        );
832    }
833
834    #[test]
835    fn paid_list_majority_uses_self_inclusive_paid_group_size() {
836        let key = xor_name_from_byte(0x61);
837        let config = ReplicationConfig::default();
838
839        // Real target computation uses PaidCloseGroup(K), which is
840        // self-inclusive. If self is in a 20-node paid group and does not
841        // already have local paid-list state, 10 remote confirmations are not
842        // enough: ConfirmNeeded(20) is 11.
843        let paid_peers: Vec<PeerId> = (1..=19).map(peer_id_from_byte).collect();
844        let mut targets = single_key_targets(&key, vec![], paid_peers.clone());
845        targets.paid_group_sizes.insert(key, 20);
846
847        let ten_confirmations = build_evidence(
848            vec![],
849            paid_peers
850                .iter()
851                .enumerate()
852                .map(|(i, p)| {
853                    (
854                        *p,
855                        if i < 10 {
856                            PaidListEvidence::Confirmed
857                        } else {
858                            PaidListEvidence::NotFound
859                        },
860                    )
861                })
862                .collect(),
863        );
864        let outcome = evaluate_key_evidence(&key, &ten_confirmations, &targets, &config);
865        assert!(
866            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
867            "10/20 paid confirmations must not authorize the key, got {outcome:?}"
868        );
869
870        let eleven_confirmations = build_evidence(
871            vec![],
872            paid_peers
873                .iter()
874                .enumerate()
875                .map(|(i, p)| {
876                    (
877                        *p,
878                        if i < 11 {
879                            PaidListEvidence::Confirmed
880                        } else {
881                            PaidListEvidence::NotFound
882                        },
883                    )
884                })
885                .collect(),
886        );
887        let outcome = evaluate_key_evidence(&key, &eleven_confirmations, &targets, &config);
888        assert!(
889            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
890            "11/20 paid confirmations should authorize the key, got {outcome:?}"
891        );
892    }
893
894    #[test]
895    fn quorum_fails_with_zero_targets_no_paid() {
896        let key = xor_name_from_byte(0x70);
897        let config = ReplicationConfig::default();
898
899        // No quorum peers, no paid peers.
900        // quorum_needed(0) = min(4, 1) = 1, but 0 positive + 0 unresolved < 1.
901        // confirm_needed(0) = 1, but 0 confirmed + 0 unresolved < 1.
902        let targets = single_key_targets(&key, vec![], vec![]);
903
904        let evidence = build_evidence(vec![], vec![]);
905
906        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
907        assert!(
908            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
909            "expected QuorumFailed with zero targets, got {outcome:?}"
910        );
911    }
912
913    #[test]
914    fn quorum_verified_beats_paid_list_when_both_satisfied() {
915        // When both presence quorum AND paid-list majority are satisfied,
916        // QuorumVerified takes precedence (evaluated first).
917        let key = xor_name_from_byte(0x80);
918        let config = ReplicationConfig::default();
919
920        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
921        let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
922        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
923
924        // quorum_needed(5) = min(4, 3) = 3; all 5 Present -> quorum met.
925        // confirm_needed(3) = 2; all 3 Confirmed -> paid met.
926        let evidence = build_evidence(
927            vec![
928                (quorum_peers[0], PresenceEvidence::Present),
929                (quorum_peers[1], PresenceEvidence::Present),
930                (quorum_peers[2], PresenceEvidence::Present),
931                (quorum_peers[3], PresenceEvidence::Present),
932                (quorum_peers[4], PresenceEvidence::Present),
933            ],
934            vec![
935                (paid_peers[0], PaidListEvidence::Confirmed),
936                (paid_peers[1], PaidListEvidence::Confirmed),
937                (paid_peers[2], PaidListEvidence::Confirmed),
938            ],
939        );
940
941        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
942        assert!(
943            matches!(outcome, KeyVerificationOutcome::QuorumVerified { .. }),
944            "QuorumVerified should take precedence over PaidListVerified, got {outcome:?}"
945        );
946    }
947
948    // -----------------------------------------------------------------------
949    // process_verification_response
950    // -----------------------------------------------------------------------
951
952    #[test]
953    fn process_response_populates_evidence() {
954        let key = xor_name_from_byte(0x90);
955        let peer = peer_id_from_byte(1);
956
957        let targets = single_key_targets(&key, vec![peer], vec![peer]);
958
959        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
960            key,
961            KeyVerificationEvidence {
962                presence: HashMap::new(),
963                paid_list: HashMap::new(),
964            },
965        ))
966        .collect();
967
968        let response = VerificationResponse {
969            results: vec![KeyVerificationResult {
970                key,
971                present: true,
972                paid: Some(true),
973            }],
974        };
975
976        process_verification_response(&peer, &response, &targets, &mut evidence);
977
978        let ev = evidence.get(&key).expect("evidence for key");
979        assert_eq!(
980            ev.presence.get(&peer),
981            Some(&PresenceEvidence::Present),
982            "presence should be Present"
983        );
984        assert_eq!(
985            ev.paid_list.get(&peer),
986            Some(&PaidListEvidence::Confirmed),
987            "paid_list should be Confirmed"
988        );
989    }
990
991    #[test]
992    fn process_response_missing_key_gets_unresolved() {
993        let key = xor_name_from_byte(0xA0);
994        let peer = peer_id_from_byte(2);
995
996        let targets = single_key_targets(&key, vec![peer], vec![peer]);
997
998        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
999            key,
1000            KeyVerificationEvidence {
1001                presence: HashMap::new(),
1002                paid_list: HashMap::new(),
1003            },
1004        ))
1005        .collect();
1006
1007        // Empty response: peer did not include our key.
1008        let response = VerificationResponse { results: vec![] };
1009
1010        process_verification_response(&peer, &response, &targets, &mut evidence);
1011
1012        let ev = evidence.get(&key).expect("evidence for key");
1013        assert_eq!(
1014            ev.presence.get(&peer),
1015            Some(&PresenceEvidence::Unresolved),
1016            "missing key in response should be Unresolved"
1017        );
1018        assert_eq!(
1019            ev.paid_list.get(&peer),
1020            Some(&PaidListEvidence::Unresolved),
1021            "missing paid key in response should be Unresolved"
1022        );
1023    }
1024
1025    #[test]
1026    fn process_response_ignores_unsolicited_keys() {
1027        let key = xor_name_from_byte(0xB0);
1028        let unsolicited_key = xor_name_from_byte(0xB1);
1029        let peer = peer_id_from_byte(3);
1030
1031        let targets = single_key_targets(&key, vec![peer], vec![]);
1032
1033        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
1034            key,
1035            KeyVerificationEvidence {
1036                presence: HashMap::new(),
1037                paid_list: HashMap::new(),
1038            },
1039        ))
1040        .collect();
1041
1042        // Response includes an unsolicited key.
1043        let response = VerificationResponse {
1044            results: vec![
1045                KeyVerificationResult {
1046                    key: unsolicited_key,
1047                    present: true,
1048                    paid: None,
1049                },
1050                KeyVerificationResult {
1051                    key,
1052                    present: false,
1053                    paid: None,
1054                },
1055            ],
1056        };
1057
1058        process_verification_response(&peer, &response, &targets, &mut evidence);
1059
1060        // Unsolicited key should not appear in evidence.
1061        assert!(
1062            !evidence.contains_key(&unsolicited_key),
1063            "unsolicited key should not be in evidence"
1064        );
1065
1066        let ev = evidence.get(&key).expect("evidence for key");
1067        assert_eq!(
1068            ev.presence.get(&peer),
1069            Some(&PresenceEvidence::Absent),
1070            "solicited key should have Absent"
1071        );
1072    }
1073
1074    // -----------------------------------------------------------------------
1075    // mark_peer_unresolved
1076    // -----------------------------------------------------------------------
1077
1078    #[test]
1079    fn mark_unresolved_sets_all_keys_for_peer() {
1080        let key_a = xor_name_from_byte(0xC0);
1081        let key_b = xor_name_from_byte(0xC1);
1082        let peer = peer_id_from_byte(5);
1083
1084        // Peer is a quorum target for key_a and a paid target for key_b.
1085        let targets = VerificationTargets {
1086            quorum_targets: std::iter::once((key_a, vec![peer])).collect(),
1087            paid_targets: std::iter::once((key_b, vec![peer])).collect(),
1088            paid_group_sizes: [(key_a, 0), (key_b, 1)].into_iter().collect(),
1089            all_peers: std::iter::once(peer).collect(),
1090            peer_to_keys: std::iter::once((peer, vec![key_a, key_b])).collect(),
1091            peer_to_paid_keys: std::iter::once((peer, std::iter::once(key_b).collect())).collect(),
1092        };
1093
1094        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
1095            (
1096                key_a,
1097                KeyVerificationEvidence {
1098                    presence: HashMap::new(),
1099                    paid_list: HashMap::new(),
1100                },
1101            ),
1102            (
1103                key_b,
1104                KeyVerificationEvidence {
1105                    presence: HashMap::new(),
1106                    paid_list: HashMap::new(),
1107                },
1108            ),
1109        ]
1110        .into_iter()
1111        .collect();
1112
1113        mark_peer_unresolved(&peer, &targets, &mut evidence);
1114
1115        let ev_a = evidence.get(&key_a).expect("evidence for key_a");
1116        assert_eq!(
1117            ev_a.presence.get(&peer),
1118            Some(&PresenceEvidence::Unresolved)
1119        );
1120        // key_a is not in peer_to_paid_keys, so no paid_list entry.
1121        assert!(!ev_a.paid_list.contains_key(&peer));
1122
1123        let ev_b = evidence.get(&key_b).expect("evidence for key_b");
1124        assert_eq!(
1125            ev_b.presence.get(&peer),
1126            Some(&PresenceEvidence::Unresolved)
1127        );
1128        assert_eq!(
1129            ev_b.paid_list.get(&peer),
1130            Some(&PaidListEvidence::Unresolved)
1131        );
1132    }
1133
1134    // -----------------------------------------------------------------------
1135    // Section 18 scenarios
1136    // -----------------------------------------------------------------------
1137
1138    /// Scenario 4: All peers respond Absent with no paid confirmations.
1139    /// Both presence and paid-list paths are impossible -> `QuorumFailed`.
1140    #[test]
1141    fn scenario_4_quorum_fail_transitions_to_abandoned() {
1142        let key = xor_name_from_byte(0xD0);
1143        let config = ReplicationConfig::default();
1144
1145        // 7 quorum peers, threshold = min(4, floor(7/2)+1) = 4
1146        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1147        // 5 paid peers, confirm_needed = floor(5/2)+1 = 3
1148        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1149        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1150
1151        // All quorum peers respond Absent, all paid peers respond NotFound.
1152        let evidence = build_evidence(
1153            quorum_peers
1154                .iter()
1155                .map(|p| (*p, PresenceEvidence::Absent))
1156                .collect(),
1157            paid_peers
1158                .iter()
1159                .map(|p| (*p, PaidListEvidence::NotFound))
1160                .collect(),
1161        );
1162
1163        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1164        assert!(
1165            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
1166            "all-Absent with no paid confirmations should yield QuorumFailed, got {outcome:?}"
1167        );
1168    }
1169
1170    /// Scenario 16: All peers unresolved (timeout). Neither success nor
1171    /// fail-fast is possible because unresolved counts keep both paths alive.
1172    #[test]
1173    fn scenario_16_timeout_yields_inconclusive() {
1174        let key = xor_name_from_byte(0xD1);
1175        let config = ReplicationConfig::default();
1176
1177        // 7 quorum peers, quorum_needed = 4
1178        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1179        // 5 paid peers, confirm_needed = 3
1180        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1181        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1182
1183        // Every peer is Unresolved (simulating full timeout).
1184        let evidence = build_evidence(
1185            quorum_peers
1186                .iter()
1187                .map(|p| (*p, PresenceEvidence::Unresolved))
1188                .collect(),
1189            paid_peers
1190                .iter()
1191                .map(|p| (*p, PaidListEvidence::Unresolved))
1192                .collect(),
1193        );
1194
1195        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1196        assert!(
1197            matches!(outcome, KeyVerificationOutcome::QuorumInconclusive),
1198            "all-unresolved should yield QuorumInconclusive, got {outcome:?}"
1199        );
1200    }
1201
1202    /// Scenario 27: A single verification round collects both presence
1203    /// evidence from `QuorumTargets` and paid-list confirmations from
1204    /// `PaidTargets`. Paid-list success triggers `PaidListVerified` even when
1205    /// presence quorum fails.
1206    #[test]
1207    fn scenario_27_single_round_collects_both_presence_and_paid() {
1208        let key = xor_name_from_byte(0xD2);
1209        let config = ReplicationConfig::default();
1210
1211        // 7 quorum peers: only 1 Present (quorum_needed=4, so quorum fails).
1212        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1213        // 5 paid peers: 3 Confirmed (confirm_needed=3, so paid passes).
1214        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1215        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1216
1217        let evidence = build_evidence(
1218            vec![
1219                (quorum_peers[0], PresenceEvidence::Present),
1220                (quorum_peers[1], PresenceEvidence::Absent),
1221                (quorum_peers[2], PresenceEvidence::Absent),
1222                (quorum_peers[3], PresenceEvidence::Absent),
1223                (quorum_peers[4], PresenceEvidence::Absent),
1224                (quorum_peers[5], PresenceEvidence::Absent),
1225                (quorum_peers[6], PresenceEvidence::Absent),
1226            ],
1227            vec![
1228                (paid_peers[0], PaidListEvidence::Confirmed),
1229                (paid_peers[1], PaidListEvidence::Confirmed),
1230                (paid_peers[2], PaidListEvidence::Confirmed),
1231                (paid_peers[3], PaidListEvidence::NotFound),
1232                (paid_peers[4], PaidListEvidence::NotFound),
1233            ],
1234        );
1235
1236        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1237        assert!(
1238            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
1239            "paid-list majority should trigger PaidListVerified when quorum fails, got {outcome:?}"
1240        );
1241    }
1242
1243    /// Scenario 28: With |QuorumTargets|=3,
1244    /// `QuorumNeeded` = min(4, floor(3/2)+1) = min(4, 2) = 2.
1245    /// 2 Present responses should pass.
1246    #[test]
1247    fn scenario_28_dynamic_threshold_with_3_targets() {
1248        let key = xor_name_from_byte(0xD3);
1249        let config = ReplicationConfig::default();
1250
1251        let quorum_peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
1252        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
1253
1254        // Verify the dynamic threshold is indeed 2.
1255        assert_eq!(config.quorum_needed(3), 2, "quorum_needed(3) should be 2");
1256
1257        // 2 Present, 1 Absent -> 2 >= 2 -> QuorumVerified.
1258        let evidence = build_evidence(
1259            vec![
1260                (quorum_peers[0], PresenceEvidence::Present),
1261                (quorum_peers[1], PresenceEvidence::Present),
1262                (quorum_peers[2], PresenceEvidence::Absent),
1263            ],
1264            vec![],
1265        );
1266
1267        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1268        assert!(
1269            matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 2),
1270            "2 Present in 3-target set should QuorumVerify, got {outcome:?}"
1271        );
1272    }
1273
1274    /// Helper: build `VerificationTargets` for two keys with shared or
1275    /// separate peer sets.
1276    fn two_key_targets(
1277        key_a: &XorName,
1278        key_b: &XorName,
1279        quorum_peers_a: Vec<PeerId>,
1280        quorum_peers_b: Vec<PeerId>,
1281        paid_peers_a: Vec<PeerId>,
1282        paid_peers_b: Vec<PeerId>,
1283    ) -> VerificationTargets {
1284        let mut all_peers = HashSet::new();
1285        let mut peer_to_keys: HashMap<PeerId, Vec<XorName>> = HashMap::new();
1286        let mut peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>> = HashMap::new();
1287
1288        for &p in &quorum_peers_a {
1289            all_peers.insert(p);
1290            peer_to_keys.entry(p).or_default().push(*key_a);
1291        }
1292        for &p in &quorum_peers_b {
1293            all_peers.insert(p);
1294            peer_to_keys.entry(p).or_default().push(*key_b);
1295        }
1296        for &p in &paid_peers_a {
1297            all_peers.insert(p);
1298            peer_to_keys.entry(p).or_default().push(*key_a);
1299            peer_to_paid_keys.entry(p).or_default().insert(*key_a);
1300        }
1301        for &p in &paid_peers_b {
1302            all_peers.insert(p);
1303            peer_to_keys.entry(p).or_default().push(*key_b);
1304            peer_to_paid_keys.entry(p).or_default().insert(*key_b);
1305        }
1306
1307        for keys_list in peer_to_keys.values_mut() {
1308            keys_list.sort_unstable();
1309            keys_list.dedup();
1310        }
1311
1312        let mut quorum_targets = HashMap::new();
1313        quorum_targets.insert(*key_a, quorum_peers_a);
1314        quorum_targets.insert(*key_b, quorum_peers_b);
1315
1316        let mut paid_targets = HashMap::new();
1317        let paid_group_size_a = paid_peers_a.len();
1318        let paid_group_size_b = paid_peers_b.len();
1319        paid_targets.insert(*key_a, paid_peers_a);
1320        paid_targets.insert(*key_b, paid_peers_b);
1321
1322        VerificationTargets {
1323            quorum_targets,
1324            paid_targets,
1325            paid_group_sizes: [(*key_a, paid_group_size_a), (*key_b, paid_group_size_b)]
1326                .into_iter()
1327                .collect(),
1328            all_peers,
1329            peer_to_keys,
1330            peer_to_paid_keys,
1331        }
1332    }
1333
1334    /// Scenario 33: `process_verification_response` correctly attributes
1335    /// per-key evidence when a single peer responds for multiple keys.
1336    #[test]
1337    fn scenario_33_batched_response_per_key_evidence() {
1338        let key_a = xor_name_from_byte(0xD4);
1339        let key_b = xor_name_from_byte(0xD5);
1340        let peer = peer_id_from_byte(1);
1341
1342        // Peer is a quorum+paid target for both keys.
1343        let targets = two_key_targets(
1344            &key_a,
1345            &key_b,
1346            vec![peer],
1347            vec![peer],
1348            vec![peer],
1349            vec![peer],
1350        );
1351
1352        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
1353            (
1354                key_a,
1355                KeyVerificationEvidence {
1356                    presence: HashMap::new(),
1357                    paid_list: HashMap::new(),
1358                },
1359            ),
1360            (
1361                key_b,
1362                KeyVerificationEvidence {
1363                    presence: HashMap::new(),
1364                    paid_list: HashMap::new(),
1365                },
1366            ),
1367        ]
1368        .into_iter()
1369        .collect();
1370
1371        // Peer responds: key_a Present+Confirmed, key_b Absent+NotFound.
1372        let response = VerificationResponse {
1373            results: vec![
1374                KeyVerificationResult {
1375                    key: key_a,
1376                    present: true,
1377                    paid: Some(true),
1378                },
1379                KeyVerificationResult {
1380                    key: key_b,
1381                    present: false,
1382                    paid: Some(false),
1383                },
1384            ],
1385        };
1386
1387        process_verification_response(&peer, &response, &targets, &mut evidence);
1388
1389        // key_a: Present + Confirmed.
1390        let ev_a = evidence.get(&key_a).expect("evidence for key_a");
1391        assert_eq!(ev_a.presence.get(&peer), Some(&PresenceEvidence::Present));
1392        assert_eq!(
1393            ev_a.paid_list.get(&peer),
1394            Some(&PaidListEvidence::Confirmed)
1395        );
1396
1397        // key_b: Absent + NotFound.
1398        let ev_b = evidence.get(&key_b).expect("evidence for key_b");
1399        assert_eq!(ev_b.presence.get(&peer), Some(&PresenceEvidence::Absent));
1400        assert_eq!(ev_b.paid_list.get(&peer), Some(&PaidListEvidence::NotFound));
1401    }
1402
1403    /// Scenario 34: Peer responds for `key_a` but omits `key_b`.
1404    /// `key_a` gets explicit evidence, `key_b` gets Unresolved.
1405    #[test]
1406    fn scenario_34_partial_response_unresolved_per_key() {
1407        let key_a = xor_name_from_byte(0xD6);
1408        let key_b = xor_name_from_byte(0xD7);
1409        let peer = peer_id_from_byte(2);
1410
1411        // Peer is a quorum target for both keys, paid target for key_b only.
1412        let targets = two_key_targets(&key_a, &key_b, vec![peer], vec![peer], vec![], vec![peer]);
1413
1414        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
1415            (
1416                key_a,
1417                KeyVerificationEvidence {
1418                    presence: HashMap::new(),
1419                    paid_list: HashMap::new(),
1420                },
1421            ),
1422            (
1423                key_b,
1424                KeyVerificationEvidence {
1425                    presence: HashMap::new(),
1426                    paid_list: HashMap::new(),
1427                },
1428            ),
1429        ]
1430        .into_iter()
1431        .collect();
1432
1433        // Peer responds only for key_a, omits key_b entirely.
1434        let response = VerificationResponse {
1435            results: vec![KeyVerificationResult {
1436                key: key_a,
1437                present: true,
1438                paid: None,
1439            }],
1440        };
1441
1442        process_verification_response(&peer, &response, &targets, &mut evidence);
1443
1444        // key_a: explicit Present.
1445        let ev_a = evidence.get(&key_a).expect("evidence for key_a");
1446        assert_eq!(
1447            ev_a.presence.get(&peer),
1448            Some(&PresenceEvidence::Present),
1449            "key_a should have explicit Present"
1450        );
1451
1452        // key_b: missing from response -> Unresolved for both presence and
1453        // paid_list.
1454        let ev_b = evidence.get(&key_b).expect("evidence for key_b");
1455        assert_eq!(
1456            ev_b.presence.get(&peer),
1457            Some(&PresenceEvidence::Unresolved),
1458            "omitted key_b should get Unresolved presence"
1459        );
1460        assert_eq!(
1461            ev_b.paid_list.get(&peer),
1462            Some(&PaidListEvidence::Unresolved),
1463            "omitted key_b (paid target) should get Unresolved paid_list"
1464        );
1465    }
1466
1467    /// Scenario 42: `QuorumVerified` outcome populates sources correctly,
1468    /// which downstream uses to add the key to `PaidForList`.
1469    #[test]
1470    fn scenario_42_quorum_pass_derives_paid_list_auth() {
1471        let key = xor_name_from_byte(0xD8);
1472        let config = ReplicationConfig::default();
1473
1474        // 5 quorum peers, quorum_needed = min(4, 3) = 3.
1475        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
1476        // 3 paid peers (some overlap with quorum peers for realistic scenario).
1477        let paid_peers: Vec<PeerId> = (3..=5).map(peer_id_from_byte).collect();
1478        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1479
1480        // 4 quorum peers Present, 1 Absent -> quorum met.
1481        // Also mark paid_peers[0] (peer 3) as Present so it's collected from
1482        // paid targets too.
1483        let evidence = build_evidence(
1484            vec![
1485                (quorum_peers[0], PresenceEvidence::Present),
1486                (quorum_peers[1], PresenceEvidence::Present),
1487                (quorum_peers[2], PresenceEvidence::Present), // peer 3
1488                (quorum_peers[3], PresenceEvidence::Present), // peer 4
1489                (quorum_peers[4], PresenceEvidence::Absent),  // peer 5
1490            ],
1491            vec![
1492                (paid_peers[0], PaidListEvidence::NotFound),
1493                (paid_peers[1], PaidListEvidence::NotFound),
1494                (paid_peers[2], PaidListEvidence::NotFound),
1495            ],
1496        );
1497
1498        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1499        match outcome {
1500            KeyVerificationOutcome::QuorumVerified { ref sources } => {
1501                // Sources should include peers that responded Present from
1502                // both quorum and paid targets.
1503                assert!(
1504                    sources.len() >= 4,
1505                    "QuorumVerified sources should contain at least the 4 quorum-positive peers, got {}",
1506                    sources.len()
1507                );
1508                // The sources list is used downstream to authorize
1509                // PaidForList insertion. Verify specific peers are present.
1510                assert!(
1511                    sources.contains(&quorum_peers[0]),
1512                    "source peer 1 should be in sources"
1513                );
1514                assert!(
1515                    sources.contains(&quorum_peers[1]),
1516                    "source peer 2 should be in sources"
1517                );
1518            }
1519            other => panic!("expected QuorumVerified, got {other:?}"),
1520        }
1521    }
1522
1523    /// Scenario 44: Paid-list cold-start recovery via replica majority.
1524    ///
1525    /// Multiple nodes restart simultaneously and lose their `PaidForList`
1526    /// (persistence corrupted). Key `K` still has `>= QuorumNeeded(K)`
1527    /// replicas in the close group. During neighbor-sync verification,
1528    /// presence quorum passes and all verifying nodes re-derive `K` into
1529    /// their `PaidForList` via close-group replica majority (Section 7.2
1530    /// rule 4).
1531    ///
1532    /// This test verifies that when paid-list evidence is entirely
1533    /// `NotFound` (simulating data loss) but presence evidence meets
1534    /// quorum, the outcome is `QuorumVerified` with sources that enable
1535    /// `PaidForList` re-derivation.
1536    #[test]
1537    fn scenario_44_cold_start_recovery_via_replica_majority() {
1538        let key = xor_name_from_byte(0xD9);
1539        let config = ReplicationConfig::default();
1540
1541        // 7 quorum peers, quorum_needed = min(4, floor(7/2)+1) = 4.
1542        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1543        // 10 paid peers (wider group), confirm_needed = floor(10/2)+1 = 6.
1544        let paid_peers: Vec<PeerId> = (10..=19).map(peer_id_from_byte).collect();
1545        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1546
1547        // Cold-start scenario: ALL paid-list entries are lost across every
1548        // peer in PaidCloseGroup. Every paid peer reports NotFound.
1549        let paid_evidence: Vec<(PeerId, PaidListEvidence)> = paid_peers
1550            .iter()
1551            .map(|p| (*p, PaidListEvidence::NotFound))
1552            .collect();
1553
1554        // But the replicas still exist: 5 out of 7 quorum peers report
1555        // Present (>= QuorumNeeded(K) = 4).
1556        let presence_evidence = vec![
1557            (quorum_peers[0], PresenceEvidence::Present),
1558            (quorum_peers[1], PresenceEvidence::Present),
1559            (quorum_peers[2], PresenceEvidence::Present),
1560            (quorum_peers[3], PresenceEvidence::Present),
1561            (quorum_peers[4], PresenceEvidence::Present),
1562            (quorum_peers[5], PresenceEvidence::Absent),
1563            (quorum_peers[6], PresenceEvidence::Absent),
1564        ];
1565
1566        let evidence = build_evidence(presence_evidence, paid_evidence);
1567        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1568
1569        match outcome {
1570            KeyVerificationOutcome::QuorumVerified { ref sources } => {
1571                // Quorum passed despite total paid-list loss. The caller
1572                // re-derives PaidForList from close-group replica majority.
1573                assert!(
1574                    sources.len() >= 4,
1575                    "QuorumVerified should have >= 4 sources (the presence-positive peers), got {}",
1576                    sources.len()
1577                );
1578
1579                // Verify the specific Present peers are in sources.
1580                for (i, peer) in quorum_peers.iter().enumerate().take(5) {
1581                    assert!(
1582                        sources.contains(peer),
1583                        "quorum_peer[{i}] responded Present and should be a fetch source"
1584                    );
1585                }
1586
1587                // Absent peers are NOT sources.
1588                assert!(
1589                    !sources.contains(&quorum_peers[5]),
1590                    "absent peer should not be a fetch source"
1591                );
1592                assert!(
1593                    !sources.contains(&quorum_peers[6]),
1594                    "absent peer should not be a fetch source"
1595                );
1596            }
1597            other => panic!(
1598                "Cold-start recovery should succeed via replica majority \
1599                 (QuorumVerified), got {other:?}"
1600            ),
1601        }
1602    }
1603
1604    /// Scenario 20: Unknown replica key found in local `PaidForList` bypasses
1605    /// presence quorum.
1606    ///
1607    /// When a key's paid-list evidence shows confirmation from enough peers,
1608    /// `PaidListVerified` is returned even without a single presence-positive
1609    /// response.  This models the local-hit fast-path: the caller already
1610    /// checked the local paid list and the network confirms majority — no
1611    /// presence quorum needed.
1612    #[test]
1613    fn scenario_20_paid_list_local_hit_bypasses_presence_quorum() {
1614        let key = xor_name_from_byte(0xE0);
1615        let config = ReplicationConfig::default();
1616
1617        // 7 quorum peers, quorum_needed = 4.
1618        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1619        // 5 paid peers, confirm_needed = floor(5/2)+1 = 3.
1620        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1621        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1622
1623        // ALL quorum peers Absent (presence quorum impossible) but 3/5 paid
1624        // peers confirm → PaidListVerified.
1625        let evidence = build_evidence(
1626            quorum_peers
1627                .iter()
1628                .map(|p| (*p, PresenceEvidence::Absent))
1629                .collect(),
1630            vec![
1631                (paid_peers[0], PaidListEvidence::Confirmed),
1632                (paid_peers[1], PaidListEvidence::Confirmed),
1633                (paid_peers[2], PaidListEvidence::Confirmed),
1634                (paid_peers[3], PaidListEvidence::NotFound),
1635                (paid_peers[4], PaidListEvidence::NotFound),
1636            ],
1637        );
1638
1639        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1640        assert!(
1641            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
1642            "paid-list majority should bypass failed presence quorum, got {outcome:?}"
1643        );
1644    }
1645
1646    /// Scenario 22: Paid-list confirmation below threshold AND presence quorum
1647    /// fails → `QuorumFailed`.
1648    ///
1649    /// Neither path can succeed: presence peers are all Absent (can't reach
1650    /// `quorum_needed`) and paid confirmations are below `confirm_needed`.
1651    #[test]
1652    fn scenario_22_paid_list_rejection_below_threshold() {
1653        let key = xor_name_from_byte(0xE2);
1654        let config = ReplicationConfig::default();
1655
1656        // 7 quorum peers, quorum_needed = 4.
1657        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1658        // 5 paid peers, confirm_needed = 3.
1659        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1660        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1661
1662        // All quorum peers Absent; only 2/5 paid confirmations (below 3).
1663        let evidence = build_evidence(
1664            quorum_peers
1665                .iter()
1666                .map(|p| (*p, PresenceEvidence::Absent))
1667                .collect(),
1668            vec![
1669                (paid_peers[0], PaidListEvidence::Confirmed),
1670                (paid_peers[1], PaidListEvidence::Confirmed),
1671                (paid_peers[2], PaidListEvidence::NotFound),
1672                (paid_peers[3], PaidListEvidence::NotFound),
1673                (paid_peers[4], PaidListEvidence::NotFound),
1674            ],
1675        );
1676
1677        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1678        assert!(
1679            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
1680            "below-threshold paid confirmations with all-Absent quorum should yield QuorumFailed, got {outcome:?}"
1681        );
1682    }
1683}