Skip to main content

ant_node/replication/
quorum.rs

1//! Quorum verification logic (Section 9).
2//!
3//! Single-round batched verification: presence + paid-list evidence collected
4//! in one request round to `VerifyTargets = PaidTargets ∪ QuorumTargets`.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8use std::time::Instant;
9
10use crate::logging::{debug, info, warn};
11use saorsa_core::identity::PeerId;
12use saorsa_core::P2PNode;
13
14use crate::ant_protocol::XorName;
15use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
16use crate::replication::protocol::{
17    ReplicationMessage, ReplicationMessageBody, VerificationRequest, VerificationResponse,
18};
19use crate::replication::types::{KeyVerificationEvidence, PaidListEvidence, PresenceEvidence};
20
21/// Verification round duration that is worth surfacing at info level.
22const VERIFICATION_ROUND_SLOW_LOG_MS: u128 = 500;
23
24// ---------------------------------------------------------------------------
25// Verification targets
26// ---------------------------------------------------------------------------
27
28/// Targets for verifying a set of keys.
29#[derive(Debug, Default)]
30pub struct VerificationTargets {
31    /// Per-key: closest `CLOSE_GROUP_SIZE` peers (excluding self) for presence
32    /// quorum.
33    pub quorum_targets: HashMap<XorName, Vec<PeerId>>,
34    /// Per-key: `PaidCloseGroup` peers for paid-list majority.
35    pub paid_targets: HashMap<XorName, Vec<PeerId>>,
36    /// Per-key: self-inclusive paid close-group size used to compute
37    /// `ConfirmNeeded(K)`.
38    pub paid_group_sizes: HashMap<XorName, usize>,
39    /// Union of all target peers across all keys.
40    pub all_peers: HashSet<PeerId>,
41    /// Which keys each peer should be queried about.
42    pub peer_to_keys: HashMap<PeerId, Vec<XorName>>,
43    /// Which keys need paid-list checks from which peers.
44    pub peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>>,
45}
46
47/// Compute verification targets for a batch of keys.
48///
49/// For each key, determines the `QuorumTargets` (closest `CLOSE_GROUP_SIZE`
50/// peers excluding self) and `PaidTargets` (`PaidCloseGroup` excluding self),
51/// then unions them into per-peer request batches.
52pub async fn compute_verification_targets(
53    keys: &[XorName],
54    p2p_node: &Arc<P2PNode>,
55    config: &ReplicationConfig,
56    self_id: &PeerId,
57) -> VerificationTargets {
58    let dht = p2p_node.dht_manager();
59    let mut targets = VerificationTargets {
60        quorum_targets: HashMap::new(),
61        paid_targets: HashMap::new(),
62        paid_group_sizes: HashMap::new(),
63        all_peers: HashSet::new(),
64        peer_to_keys: HashMap::new(),
65        peer_to_paid_keys: HashMap::new(),
66    };
67
68    for &key in keys {
69        // QuorumTargets: up to CLOSE_GROUP_SIZE nearest peers for K, excluding
70        // self.
71        let closest = dht
72            .find_closest_nodes_local(&key, config.close_group_size)
73            .await;
74        let quorum_peers: Vec<PeerId> = closest
75            .iter()
76            .filter(|n| n.peer_id != *self_id)
77            .map(|n| n.peer_id)
78            .collect();
79
80        // PaidTargets: PaidCloseGroup(K) excluding self.
81        let paid_closest = dht
82            .find_closest_nodes_local_with_self(&key, config.paid_list_close_group_size)
83            .await;
84        let paid_group_size = paid_closest.len();
85        let paid_peers: Vec<PeerId> = paid_closest
86            .iter()
87            .filter(|n| n.peer_id != *self_id)
88            .map(|n| n.peer_id)
89            .collect();
90
91        // VerifyTargets = PaidTargets ∪ QuorumTargets
92        for &peer in &quorum_peers {
93            targets.all_peers.insert(peer);
94            targets.peer_to_keys.entry(peer).or_default().push(key);
95        }
96        for &peer in &paid_peers {
97            targets.all_peers.insert(peer);
98            targets.peer_to_keys.entry(peer).or_default().push(key);
99            targets
100                .peer_to_paid_keys
101                .entry(peer)
102                .or_default()
103                .insert(key);
104        }
105
106        targets.quorum_targets.insert(key, quorum_peers);
107        targets.paid_targets.insert(key, paid_peers);
108        targets.paid_group_sizes.insert(key, paid_group_size);
109    }
110
111    // Deduplicate keys per peer (a peer in both quorum and paid targets for
112    // the same key would have it listed twice).
113    for keys_list in targets.peer_to_keys.values_mut() {
114        keys_list.sort_unstable();
115        keys_list.dedup();
116    }
117
118    targets
119}
120
121/// Compute presence-only verification targets for locally paid keys.
122///
123/// Local `PaidForList` membership authorizes the key already; this target set
124/// is only used to discover peers that can serve the record bytes.
125pub async fn compute_presence_targets(
126    keys: &[XorName],
127    p2p_node: &Arc<P2PNode>,
128    config: &ReplicationConfig,
129    self_id: &PeerId,
130) -> VerificationTargets {
131    let dht = p2p_node.dht_manager();
132    let mut targets = VerificationTargets {
133        quorum_targets: HashMap::new(),
134        paid_targets: HashMap::new(),
135        paid_group_sizes: HashMap::new(),
136        all_peers: HashSet::new(),
137        peer_to_keys: HashMap::new(),
138        peer_to_paid_keys: HashMap::new(),
139    };
140
141    for &key in keys {
142        let closest = dht
143            .find_closest_nodes_local(&key, config.close_group_size)
144            .await;
145        let quorum_peers: Vec<PeerId> = closest
146            .iter()
147            .filter(|n| n.peer_id != *self_id)
148            .map(|n| n.peer_id)
149            .collect();
150
151        for &peer in &quorum_peers {
152            targets.all_peers.insert(peer);
153            targets.peer_to_keys.entry(peer).or_default().push(key);
154        }
155
156        targets.quorum_targets.insert(key, quorum_peers);
157    }
158
159    for keys_list in targets.peer_to_keys.values_mut() {
160        keys_list.sort_unstable();
161        keys_list.dedup();
162    }
163
164    targets
165}
166
167// ---------------------------------------------------------------------------
168// Verification outcome
169// ---------------------------------------------------------------------------
170
171/// Outcome of verifying a single key.
172#[derive(Debug, Clone)]
173pub enum KeyVerificationOutcome {
174    /// Presence quorum passed.
175    QuorumVerified {
176        /// Peers that responded `Present` (verified fetch sources).
177        sources: Vec<PeerId>,
178    },
179    /// Paid-list authorization succeeded.
180    PaidListVerified {
181        /// Peers that responded `Present` (potential fetch sources, may be
182        /// empty).
183        sources: Vec<PeerId>,
184    },
185    /// Quorum failed definitively (both paths impossible).
186    QuorumFailed,
187    /// Inconclusive (timeout with neither success nor fail-fast).
188    QuorumInconclusive,
189}
190
191// ---------------------------------------------------------------------------
192// Evidence evaluation (pure logic, no I/O)
193// ---------------------------------------------------------------------------
194
195/// Evaluate verification evidence for a single key.
196///
197/// Returns the outcome based on Section 9 rules:
198/// - **Step 10**: If presence positives >= `QuorumNeeded(K)`, `QuorumVerified`.
199/// - **Step 9**: If paid confirmations >= `ConfirmNeeded(K)`,
200///   `PaidListVerified`.
201/// - **Step 14**: Fail fast when both paths are impossible.
202/// - **Step 15**: Otherwise inconclusive.
203#[must_use]
204pub fn evaluate_key_evidence(
205    key: &XorName,
206    evidence: &KeyVerificationEvidence,
207    targets: &VerificationTargets,
208    config: &ReplicationConfig,
209) -> KeyVerificationOutcome {
210    evaluate_key_evidence_with_holder_check(key, evidence, targets, config, |_, _| true)
211}
212
213/// Variant of [`evaluate_key_evidence`] that consults a holder-credit
214/// predicate before counting a peer's Present evidence (v12 §6).
215///
216/// `holder_credit` is invoked as `(peer, key) -> bool`. Returning `false`
217/// downgrades a Present claim to Unresolved (we don't trust this peer's
218/// "I have it" without a recent commitment-bound audit proving it).
219/// Returning `true` keeps today's behaviour. Paid-list evidence is
220/// independent of holder credit (the paid-list lookup is a property of
221/// the receiving peer's own data, not a claim about K being present).
222///
223/// The non-`_with_holder_check` form preserves prior behaviour by
224/// passing a predicate that always returns true. New call sites that
225/// have a `RecentProvers` cache + commitment-by-peer table should pass
226/// a real predicate.
227#[must_use]
228pub fn evaluate_key_evidence_with_holder_check(
229    key: &XorName,
230    evidence: &KeyVerificationEvidence,
231    targets: &VerificationTargets,
232    config: &ReplicationConfig,
233    holder_credit: impl Fn(&PeerId, &XorName) -> bool,
234) -> KeyVerificationOutcome {
235    let quorum_peers = targets
236        .quorum_targets
237        .get(key)
238        .map_or(&[][..], Vec::as_slice);
239
240    // Count presence evidence from QuorumTargets. v12 §6: a peer that
241    // claims Present but is not commitment-credited for K is downgraded
242    // to Unresolved (we may have to retry once they re-prove storage).
243    let mut presence_positive = 0usize;
244    let mut presence_unresolved = 0usize;
245
246    for peer in quorum_peers {
247        match evidence.presence.get(peer) {
248            Some(PresenceEvidence::Present) => {
249                if holder_credit(peer, key) {
250                    presence_positive += 1;
251                } else {
252                    presence_unresolved += 1;
253                }
254            }
255            Some(PresenceEvidence::Absent) => {}
256            Some(PresenceEvidence::Unresolved) | None => {
257                presence_unresolved += 1;
258            }
259        }
260    }
261
262    // Also collect Present peers from paid targets for fetch sources.
263    let paid_peers = targets.paid_targets.get(key).map_or(&[][..], Vec::as_slice);
264    let present_peers = collect_present_sources(evidence, quorum_peers, paid_peers);
265
266    // Count paid-list evidence from PaidTargets.
267    let mut paid_confirmed = 0usize;
268    let mut paid_unresolved = 0usize;
269
270    for peer in paid_peers {
271        match evidence.paid_list.get(peer) {
272            Some(PaidListEvidence::Confirmed) => paid_confirmed += 1,
273            Some(PaidListEvidence::NotFound) => {}
274            Some(PaidListEvidence::Unresolved) | None => paid_unresolved += 1,
275        }
276    }
277
278    let quorum_needed = config.quorum_needed(quorum_peers.len());
279    let paid_group_size = targets
280        .paid_group_sizes
281        .get(key)
282        .copied()
283        .unwrap_or(paid_peers.len());
284    let confirm_needed = ReplicationConfig::confirm_needed(paid_group_size);
285
286    // Step 10: Presence quorum reached.
287    // quorum_needed == 0 means zero targets exist — quorum is impossible,
288    // not trivially met.
289    if quorum_needed > 0 && presence_positive >= quorum_needed {
290        return KeyVerificationOutcome::QuorumVerified {
291            sources: present_peers,
292        };
293    }
294
295    // Step 9: Paid-list majority reached.
296    // confirm_needed from 0 paid peers is 1, so this naturally fails with
297    // 0 confirmed — no special guard needed. But be explicit for clarity.
298    if paid_group_size > 0 && paid_confirmed >= confirm_needed {
299        return KeyVerificationOutcome::PaidListVerified {
300            sources: present_peers,
301        };
302    }
303
304    // Step 14: Fail fast when both paths are impossible.
305    let paid_possible = paid_group_size > 0 && paid_confirmed + paid_unresolved >= confirm_needed;
306    let quorum_possible =
307        quorum_needed > 0 && presence_positive + presence_unresolved >= quorum_needed;
308
309    if !paid_possible && !quorum_possible {
310        return KeyVerificationOutcome::QuorumFailed;
311    }
312
313    // Step 15: Neither success nor fail-fast.
314    KeyVerificationOutcome::QuorumInconclusive
315}
316
317/// Return peers that gave positive presence evidence for a key.
318///
319/// Only peers in the computed verification target sets are considered.
320#[must_use]
321pub fn present_sources_for_key(
322    key: &XorName,
323    evidence: &KeyVerificationEvidence,
324    targets: &VerificationTargets,
325) -> Vec<PeerId> {
326    let quorum_peers = targets
327        .quorum_targets
328        .get(key)
329        .map_or(&[][..], Vec::as_slice);
330    let paid_peers = targets.paid_targets.get(key).map_or(&[][..], Vec::as_slice);
331
332    collect_present_sources(evidence, quorum_peers, paid_peers)
333}
334
335fn collect_present_sources(
336    evidence: &KeyVerificationEvidence,
337    quorum_peers: &[PeerId],
338    paid_peers: &[PeerId],
339) -> Vec<PeerId> {
340    let mut present_peers = Vec::new();
341    let mut seen = HashSet::new();
342
343    for peer in quorum_peers.iter().chain(paid_peers.iter()) {
344        if matches!(evidence.presence.get(peer), Some(PresenceEvidence::Present))
345            && seen.insert(*peer)
346        {
347            present_peers.push(*peer);
348        }
349    }
350
351    present_peers
352}
353
354// ---------------------------------------------------------------------------
355// Network verification round
356// ---------------------------------------------------------------------------
357
358/// Send batched verification requests to all peers and collect evidence.
359///
360/// Implements Section 9 requirement: one request per peer carrying many keys.
361/// Returns per-key evidence aggregated from all peer responses.
362pub async fn run_verification_round(
363    keys: &[XorName],
364    targets: &VerificationTargets,
365    p2p_node: &Arc<P2PNode>,
366    config: &ReplicationConfig,
367) -> HashMap<XorName, KeyVerificationEvidence> {
368    let started = Instant::now();
369    let peer_count = targets.peer_to_keys.len();
370    let requested_key_refs = targets.peer_to_keys.values().map(Vec::len).sum::<usize>();
371
372    // Initialize empty evidence for all keys.
373    let mut evidence: HashMap<XorName, KeyVerificationEvidence> = keys
374        .iter()
375        .map(|&k| {
376            (
377                k,
378                KeyVerificationEvidence {
379                    presence: HashMap::new(),
380                    paid_list: HashMap::new(),
381                },
382            )
383        })
384        .collect();
385
386    // Send one batched request per peer.
387    let mut handles = Vec::new();
388
389    for (&peer, peer_keys) in &targets.peer_to_keys {
390        let paid_check_keys = targets.peer_to_paid_keys.get(&peer);
391
392        // Build paid_list_check_indices: which of this peer's keys need
393        // paid-list status.
394        let mut paid_indices = Vec::new();
395        for (i, key) in peer_keys.iter().enumerate() {
396            if let Some(paid_keys) = paid_check_keys {
397                if paid_keys.contains(key) {
398                    if let Ok(idx) = u32::try_from(i) {
399                        paid_indices.push(idx);
400                    }
401                }
402            }
403        }
404
405        let request = VerificationRequest {
406            keys: peer_keys.clone(),
407            paid_list_check_indices: paid_indices,
408        };
409
410        let msg = ReplicationMessage {
411            request_id: rand::random(),
412            body: ReplicationMessageBody::VerificationRequest(request),
413        };
414
415        let p2p = Arc::clone(p2p_node);
416        let timeout = config.verification_request_timeout;
417        let peer_id = peer;
418
419        handles.push(tokio::spawn(async move {
420            let encoded = match msg.encode() {
421                Ok(data) => data,
422                Err(e) => {
423                    warn!("Failed to encode verification request: {e}");
424                    return (peer_id, None);
425                }
426            };
427
428            match p2p
429                .send_request(&peer_id, REPLICATION_PROTOCOL_ID, encoded, timeout)
430                .await
431            {
432                Ok(response) => match ReplicationMessage::decode(&response.data) {
433                    Ok(decoded) => (peer_id, Some(decoded)),
434                    Err(e) => {
435                        warn!("Failed to decode verification response from {peer_id}: {e}");
436                        (peer_id, None)
437                    }
438                },
439                Err(e) => {
440                    debug!("Verification request to {peer_id} failed: {e}");
441                    (peer_id, None)
442                }
443            }
444        }));
445    }
446
447    // Collect responses.
448    for handle in handles {
449        let (peer, response) = match handle.await {
450            Ok(result) => result,
451            Err(e) => {
452                warn!("Verification task panicked: {e}");
453                continue;
454            }
455        };
456
457        let Some(msg) = response else {
458            // Timeout/error: mark all keys for this peer as unresolved.
459            mark_peer_unresolved(&peer, targets, &mut evidence);
460            continue;
461        };
462
463        if let ReplicationMessageBody::VerificationResponse(resp) = msg.body {
464            process_verification_response(&peer, &resp, targets, &mut evidence);
465        }
466    }
467
468    let elapsed_ms = started.elapsed().as_millis();
469    if elapsed_ms >= VERIFICATION_ROUND_SLOW_LOG_MS {
470        info!(
471            target: "ant_node::replication::verification",
472            "Slow quorum verification round: keys={}, peers={peer_count}, requested_key_refs={requested_key_refs}, elapsed_ms={elapsed_ms}",
473            keys.len(),
474        );
475    } else {
476        debug!(
477            target: "ant_node::replication::verification",
478            "Quorum verification round: keys={}, peers={peer_count}, requested_key_refs={requested_key_refs}, elapsed_ms={elapsed_ms}",
479            keys.len(),
480        );
481    }
482
483    evidence
484}
485
486/// Mark all keys for a peer as unresolved (timeout / decode failure).
487fn mark_peer_unresolved(
488    peer: &PeerId,
489    targets: &VerificationTargets,
490    evidence: &mut HashMap<XorName, KeyVerificationEvidence>,
491) {
492    if let Some(peer_keys) = targets.peer_to_keys.get(peer) {
493        let is_paid_peer = targets.peer_to_paid_keys.get(peer);
494        for key in peer_keys {
495            if let Some(ev) = evidence.get_mut(key) {
496                ev.presence.insert(*peer, PresenceEvidence::Unresolved);
497                if is_paid_peer.is_some_and(|ks| ks.contains(key)) {
498                    ev.paid_list.insert(*peer, PaidListEvidence::Unresolved);
499                }
500            }
501        }
502    }
503}
504
505/// Process a single peer's verification response into the evidence map.
506fn process_verification_response(
507    peer: &PeerId,
508    response: &VerificationResponse,
509    targets: &VerificationTargets,
510    evidence: &mut HashMap<XorName, KeyVerificationEvidence>,
511) {
512    let Some(peer_keys) = targets.peer_to_keys.get(peer) else {
513        return;
514    };
515
516    // Use a HashSet for O(1) key membership checks instead of linear scan,
517    // preventing CPU amplification from large responses.
518    let peer_keys_set: HashSet<&XorName> = peer_keys.iter().collect();
519
520    // Cap results at 2x requested keys to limit processing of stuffed
521    // responses while still tolerating some unsolicited entries.
522    let max_results = peer_keys.len().saturating_mul(2);
523    let results = if response.results.len() > max_results {
524        warn!(
525            "Peer {peer} sent {} verification results but only {} keys were requested — truncating",
526            response.results.len(),
527            peer_keys.len(),
528        );
529        &response.results[..max_results]
530    } else {
531        &response.results
532    };
533
534    // Match response results to requested keys.
535    for result in results {
536        if !peer_keys_set.contains(&result.key) {
537            continue; // Ignore unsolicited key results.
538        }
539
540        if let Some(ev) = evidence.get_mut(&result.key) {
541            // Presence evidence.
542            let presence = if result.present {
543                PresenceEvidence::Present
544            } else {
545                PresenceEvidence::Absent
546            };
547            ev.presence.insert(*peer, presence);
548
549            // Paid-list evidence (only if requested).
550            if let Some(is_paid) = result.paid {
551                let paid = if is_paid {
552                    PaidListEvidence::Confirmed
553                } else {
554                    PaidListEvidence::NotFound
555                };
556                ev.paid_list.insert(*peer, paid);
557            }
558        }
559    }
560
561    // Keys that were requested but not in response -> unresolved.
562    let is_paid_peer = targets.peer_to_paid_keys.get(peer);
563    for key in peer_keys {
564        if let Some(ev) = evidence.get_mut(key) {
565            ev.presence
566                .entry(*peer)
567                .or_insert(PresenceEvidence::Unresolved);
568            if is_paid_peer.is_some_and(|ks| ks.contains(key)) {
569                ev.paid_list
570                    .entry(*peer)
571                    .or_insert(PaidListEvidence::Unresolved);
572            }
573        }
574    }
575}
576
577// ---------------------------------------------------------------------------
578// Tests
579// ---------------------------------------------------------------------------
580
581#[cfg(test)]
582#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
583mod tests {
584    use super::*;
585    use crate::replication::protocol::KeyVerificationResult;
586
587    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
588    fn peer_id_from_byte(b: u8) -> PeerId {
589        let mut bytes = [0u8; 32];
590        bytes[0] = b;
591        PeerId::from_bytes(bytes)
592    }
593
594    /// Build an `XorName` from a single byte (repeated to 32 bytes).
595    fn xor_name_from_byte(b: u8) -> XorName {
596        [b; 32]
597    }
598
599    /// Helper: build minimal `VerificationTargets` for a single key with
600    /// explicit quorum and paid peer lists.
601    fn single_key_targets(
602        key: &XorName,
603        quorum_peers: Vec<PeerId>,
604        paid_peers: Vec<PeerId>,
605    ) -> VerificationTargets {
606        let mut all_peers = HashSet::new();
607        let mut peer_to_keys: HashMap<PeerId, Vec<XorName>> = HashMap::new();
608        let mut peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>> = HashMap::new();
609
610        for &p in &quorum_peers {
611            all_peers.insert(p);
612            peer_to_keys.entry(p).or_default().push(*key);
613        }
614        for &p in &paid_peers {
615            all_peers.insert(p);
616            peer_to_keys.entry(p).or_default().push(*key);
617            peer_to_paid_keys.entry(p).or_default().insert(*key);
618        }
619
620        // Deduplicate keys per peer.
621        for keys_list in peer_to_keys.values_mut() {
622            keys_list.sort_unstable();
623            keys_list.dedup();
624        }
625
626        let paid_group_size = paid_peers.len();
627        VerificationTargets {
628            quorum_targets: std::iter::once((key.to_owned(), quorum_peers)).collect(),
629            paid_group_sizes: std::iter::once((key.to_owned(), paid_group_size)).collect(),
630            paid_targets: std::iter::once((key.to_owned(), paid_peers)).collect(),
631            all_peers,
632            peer_to_keys,
633            peer_to_paid_keys,
634        }
635    }
636
637    /// Helper: build `KeyVerificationEvidence` from presence and paid-list
638    /// maps.
639    fn build_evidence(
640        presence: Vec<(PeerId, PresenceEvidence)>,
641        paid_list: Vec<(PeerId, PaidListEvidence)>,
642    ) -> KeyVerificationEvidence {
643        KeyVerificationEvidence {
644            presence: presence.into_iter().collect(),
645            paid_list: paid_list.into_iter().collect(),
646        }
647    }
648
649    #[test]
650    fn present_sources_for_key_filters_targets_and_deduplicates() {
651        let key = xor_name_from_byte(0x11);
652        let q_present = peer_id_from_byte(1);
653        let overlap = peer_id_from_byte(2);
654        let q_absent = peer_id_from_byte(3);
655        let q_unresolved = peer_id_from_byte(4);
656        let paid_present = peer_id_from_byte(5);
657        let paid_absent = peer_id_from_byte(6);
658        let outside_target = peer_id_from_byte(7);
659
660        let targets = single_key_targets(
661            &key,
662            vec![q_present, overlap, q_absent, q_unresolved],
663            vec![overlap, paid_present, paid_absent],
664        );
665        let evidence = build_evidence(
666            vec![
667                (q_present, PresenceEvidence::Present),
668                (overlap, PresenceEvidence::Present),
669                (q_absent, PresenceEvidence::Absent),
670                (q_unresolved, PresenceEvidence::Unresolved),
671                (paid_present, PresenceEvidence::Present),
672                (paid_absent, PresenceEvidence::Absent),
673                (outside_target, PresenceEvidence::Present),
674            ],
675            vec![],
676        );
677
678        let sources = present_sources_for_key(&key, &evidence, &targets);
679
680        assert_eq!(
681            sources,
682            vec![q_present, overlap, paid_present],
683            "sources should preserve quorum-first order, de-duplicate overlap, and ignore non-target/negative evidence"
684        );
685    }
686
687    // -----------------------------------------------------------------------
688    // evaluate_key_evidence: QuorumVerified
689    // -----------------------------------------------------------------------
690
691    #[test]
692    fn quorum_verified_with_enough_present_responses() {
693        let key = xor_name_from_byte(0x10);
694        let config = ReplicationConfig::default();
695
696        // 7 quorum peers, threshold = min(4, floor(7/2)+1) = 4
697        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
698        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
699
700        // 4 peers say Present, 3 say Absent.
701        let evidence = build_evidence(
702            vec![
703                (quorum_peers[0], PresenceEvidence::Present),
704                (quorum_peers[1], PresenceEvidence::Present),
705                (quorum_peers[2], PresenceEvidence::Present),
706                (quorum_peers[3], PresenceEvidence::Present),
707                (quorum_peers[4], PresenceEvidence::Absent),
708                (quorum_peers[5], PresenceEvidence::Absent),
709                (quorum_peers[6], PresenceEvidence::Absent),
710            ],
711            vec![],
712        );
713
714        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
715        assert!(
716            matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 4),
717            "expected QuorumVerified with 4 sources, got {outcome:?}"
718        );
719    }
720
721    // -----------------------------------------------------------------------
722    // v12 §6 holder-credit predicate downgrades uncredited peers
723    // -----------------------------------------------------------------------
724
725    #[test]
726    fn quorum_downgrades_uncredited_present_peers() {
727        // 7 quorum peers, threshold 4. 4 say Present, 3 say Absent —
728        // would normally pass. But with a holder-credit predicate that
729        // only credits 2 of them, presence_positive drops to 2 and the
730        // 2 uncredited Presents become Unresolved. Total = 2 positive
731        // + 2 unresolved + 3 absent = 5 valid → still possible →
732        // QuorumInconclusive (not yet failed, but not verified either).
733        let key = xor_name_from_byte(0x33);
734        let config = ReplicationConfig::default();
735        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
736        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
737
738        let evidence = build_evidence(
739            vec![
740                (quorum_peers[0], PresenceEvidence::Present),
741                (quorum_peers[1], PresenceEvidence::Present),
742                (quorum_peers[2], PresenceEvidence::Present),
743                (quorum_peers[3], PresenceEvidence::Present),
744                (quorum_peers[4], PresenceEvidence::Absent),
745                (quorum_peers[5], PresenceEvidence::Absent),
746                (quorum_peers[6], PresenceEvidence::Absent),
747            ],
748            vec![],
749        );
750
751        // Credit only the first two peers (the other two Presents are
752        // uncredited and will be downgraded to Unresolved).
753        let credit = |peer: &PeerId, _: &XorName| -> bool {
754            *peer == quorum_peers[0] || *peer == quorum_peers[1]
755        };
756        let outcome =
757            evaluate_key_evidence_with_holder_check(&key, &evidence, &targets, &config, credit);
758        assert!(
759            matches!(outcome, KeyVerificationOutcome::QuorumInconclusive),
760            "credit downgrade should drop presence_positive below threshold, got {outcome:?}"
761        );
762    }
763
764    #[test]
765    fn quorum_passes_when_all_present_peers_are_credited() {
766        let key = xor_name_from_byte(0x34);
767        let config = ReplicationConfig::default();
768        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
769        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
770
771        let evidence = build_evidence(
772            (0..4)
773                .map(|i| (quorum_peers[i], PresenceEvidence::Present))
774                .chain((4..7).map(|i| (quorum_peers[i], PresenceEvidence::Absent)))
775                .collect(),
776            vec![],
777        );
778
779        let credit = |_: &PeerId, _: &XorName| -> bool { true };
780        let outcome =
781            evaluate_key_evidence_with_holder_check(&key, &evidence, &targets, &config, credit);
782        assert!(
783            matches!(outcome, KeyVerificationOutcome::QuorumVerified { .. }),
784            "all-credited Present should pass quorum, got {outcome:?}"
785        );
786    }
787
788    #[test]
789    fn paid_list_path_unaffected_by_holder_credit() {
790        // v12 §6: holder-credit gates Present claims, NOT paid-list
791        // evidence (the paid-list lookup is the receiving peer's own
792        // data, not a claim about K). A peer with no credit at all
793        // can still contribute to paid-list majority.
794        let key = xor_name_from_byte(0x35);
795        let config = ReplicationConfig::default();
796        let quorum_peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
797        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
798        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
799
800        let evidence = build_evidence(
801            quorum_peers
802                .iter()
803                .map(|p| (*p, PresenceEvidence::Absent))
804                .collect(),
805            vec![
806                (paid_peers[0], PaidListEvidence::Confirmed),
807                (paid_peers[1], PaidListEvidence::Confirmed),
808                (paid_peers[2], PaidListEvidence::Confirmed),
809                (paid_peers[3], PaidListEvidence::NotFound),
810                (paid_peers[4], PaidListEvidence::NotFound),
811            ],
812        );
813
814        let credit = |_: &PeerId, _: &XorName| -> bool { false };
815        let outcome =
816            evaluate_key_evidence_with_holder_check(&key, &evidence, &targets, &config, credit);
817        assert!(
818            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
819            "paid-list path must not be gated by holder-credit, got {outcome:?}"
820        );
821    }
822
823    // -----------------------------------------------------------------------
824    // evaluate_key_evidence: PaidListVerified
825    // -----------------------------------------------------------------------
826
827    #[test]
828    fn paid_list_verified_with_enough_confirmations() {
829        let key = xor_name_from_byte(0x20);
830        let config = ReplicationConfig::default();
831
832        // 5 paid peers, confirm_needed = floor(5/2)+1 = 3
833        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
834        // No quorum peers (or quorum fails).
835        let quorum_peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
836        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
837
838        // Quorum: all Absent (fails presence path).
839        // Paid: 3 Confirmed, 2 NotFound -> majority reached.
840        let evidence = build_evidence(
841            vec![
842                (quorum_peers[0], PresenceEvidence::Absent),
843                (quorum_peers[1], PresenceEvidence::Absent),
844                (quorum_peers[2], PresenceEvidence::Absent),
845            ],
846            vec![
847                (paid_peers[0], PaidListEvidence::Confirmed),
848                (paid_peers[1], PaidListEvidence::Confirmed),
849                (paid_peers[2], PaidListEvidence::Confirmed),
850                (paid_peers[3], PaidListEvidence::NotFound),
851                (paid_peers[4], PaidListEvidence::NotFound),
852            ],
853        );
854
855        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
856        assert!(
857            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
858            "expected PaidListVerified, got {outcome:?}"
859        );
860    }
861
862    // -----------------------------------------------------------------------
863    // evaluate_key_evidence: QuorumFailed
864    // -----------------------------------------------------------------------
865
866    #[test]
867    fn quorum_failed_when_both_paths_impossible() {
868        let key = xor_name_from_byte(0x30);
869        let config = ReplicationConfig::default();
870
871        // 5 quorum peers, quorum_needed = min(4, floor(5/2)+1) = min(4,3) = 3
872        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
873        // 3 paid peers, confirm_needed = floor(3/2)+1 = 2
874        let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
875        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
876
877        // Presence: all 5 Absent (0 positive, 0 unresolved) -> can't reach 3.
878        // Paid: all 3 NotFound (0 confirmed, 0 unresolved) -> can't reach 2.
879        let evidence = build_evidence(
880            vec![
881                (quorum_peers[0], PresenceEvidence::Absent),
882                (quorum_peers[1], PresenceEvidence::Absent),
883                (quorum_peers[2], PresenceEvidence::Absent),
884                (quorum_peers[3], PresenceEvidence::Absent),
885                (quorum_peers[4], PresenceEvidence::Absent),
886            ],
887            vec![
888                (paid_peers[0], PaidListEvidence::NotFound),
889                (paid_peers[1], PaidListEvidence::NotFound),
890                (paid_peers[2], PaidListEvidence::NotFound),
891            ],
892        );
893
894        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
895        assert!(
896            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
897            "expected QuorumFailed, got {outcome:?}"
898        );
899    }
900
901    // -----------------------------------------------------------------------
902    // evaluate_key_evidence: QuorumInconclusive
903    // -----------------------------------------------------------------------
904
905    #[test]
906    fn quorum_inconclusive_with_unresolved_peers() {
907        let key = xor_name_from_byte(0x40);
908        let config = ReplicationConfig::default();
909
910        // 5 quorum peers, quorum_needed = min(4, 3) = 3
911        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
912        // 3 paid peers, confirm_needed = 2
913        let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
914        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
915
916        // Presence: 2 Present, 1 Absent, 2 Unresolved.
917        // positive=2, unresolved=2 -> 2+2=4 >= 3 -> quorum still possible.
918        // Paid: 1 Confirmed, 1 Unresolved, 1 NotFound.
919        // confirmed=1, unresolved=1 -> 1+1=2 >= 2 -> paid still possible.
920        // Neither path reached yet -> Inconclusive.
921        let evidence = build_evidence(
922            vec![
923                (quorum_peers[0], PresenceEvidence::Present),
924                (quorum_peers[1], PresenceEvidence::Present),
925                (quorum_peers[2], PresenceEvidence::Absent),
926                (quorum_peers[3], PresenceEvidence::Unresolved),
927                (quorum_peers[4], PresenceEvidence::Unresolved),
928            ],
929            vec![
930                (paid_peers[0], PaidListEvidence::Confirmed),
931                (paid_peers[1], PaidListEvidence::Unresolved),
932                (paid_peers[2], PaidListEvidence::NotFound),
933            ],
934        );
935
936        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
937        assert!(
938            matches!(outcome, KeyVerificationOutcome::QuorumInconclusive),
939            "expected QuorumInconclusive, got {outcome:?}"
940        );
941    }
942
943    // -----------------------------------------------------------------------
944    // Dynamic thresholds with undersized sets
945    // -----------------------------------------------------------------------
946
947    #[test]
948    fn quorum_verified_with_undersized_quorum_targets() {
949        let key = xor_name_from_byte(0x50);
950        let config = ReplicationConfig::default();
951
952        // Only 2 quorum peers (undersized).
953        // quorum_needed = min(4, floor(2/2)+1) = min(4, 2) = 2
954        let quorum_peers: Vec<PeerId> = (1..=2).map(peer_id_from_byte).collect();
955        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
956
957        // Both Present -> 2 >= 2 -> QuorumVerified.
958        let evidence = build_evidence(
959            vec![
960                (quorum_peers[0], PresenceEvidence::Present),
961                (quorum_peers[1], PresenceEvidence::Present),
962            ],
963            vec![],
964        );
965
966        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
967        assert!(
968            matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 2),
969            "expected QuorumVerified with 2 sources, got {outcome:?}"
970        );
971    }
972
973    #[test]
974    fn paid_list_verified_with_single_paid_peer() {
975        let key = xor_name_from_byte(0x60);
976        let config = ReplicationConfig::default();
977
978        // 1 paid peer, confirm_needed = floor(1/2)+1 = 1
979        let paid_peers = vec![peer_id_from_byte(10)];
980        // No quorum targets -> quorum path impossible from the start.
981        let targets = single_key_targets(&key, vec![], paid_peers.clone());
982
983        let evidence = build_evidence(vec![], vec![(paid_peers[0], PaidListEvidence::Confirmed)]);
984
985        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
986        assert!(
987            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
988            "expected PaidListVerified with single peer, got {outcome:?}"
989        );
990    }
991
992    #[test]
993    fn paid_list_majority_uses_self_inclusive_paid_group_size() {
994        let key = xor_name_from_byte(0x61);
995        let config = ReplicationConfig::default();
996
997        // Real target computation uses PaidCloseGroup(K), which is
998        // self-inclusive. If self is in a 20-node paid group and does not
999        // already have local paid-list state, 10 remote confirmations are not
1000        // enough: ConfirmNeeded(20) is 11.
1001        let paid_peers: Vec<PeerId> = (1..=19).map(peer_id_from_byte).collect();
1002        let mut targets = single_key_targets(&key, vec![], paid_peers.clone());
1003        targets.paid_group_sizes.insert(key, 20);
1004
1005        let ten_confirmations = build_evidence(
1006            vec![],
1007            paid_peers
1008                .iter()
1009                .enumerate()
1010                .map(|(i, p)| {
1011                    (
1012                        *p,
1013                        if i < 10 {
1014                            PaidListEvidence::Confirmed
1015                        } else {
1016                            PaidListEvidence::NotFound
1017                        },
1018                    )
1019                })
1020                .collect(),
1021        );
1022        let outcome = evaluate_key_evidence(&key, &ten_confirmations, &targets, &config);
1023        assert!(
1024            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
1025            "10/20 paid confirmations must not authorize the key, got {outcome:?}"
1026        );
1027
1028        let eleven_confirmations = build_evidence(
1029            vec![],
1030            paid_peers
1031                .iter()
1032                .enumerate()
1033                .map(|(i, p)| {
1034                    (
1035                        *p,
1036                        if i < 11 {
1037                            PaidListEvidence::Confirmed
1038                        } else {
1039                            PaidListEvidence::NotFound
1040                        },
1041                    )
1042                })
1043                .collect(),
1044        );
1045        let outcome = evaluate_key_evidence(&key, &eleven_confirmations, &targets, &config);
1046        assert!(
1047            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
1048            "11/20 paid confirmations should authorize the key, got {outcome:?}"
1049        );
1050    }
1051
1052    #[test]
1053    fn quorum_fails_with_zero_targets_no_paid() {
1054        let key = xor_name_from_byte(0x70);
1055        let config = ReplicationConfig::default();
1056
1057        // No quorum peers, no paid peers.
1058        // quorum_needed(0) = min(4, 1) = 1, but 0 positive + 0 unresolved < 1.
1059        // confirm_needed(0) = 1, but 0 confirmed + 0 unresolved < 1.
1060        let targets = single_key_targets(&key, vec![], vec![]);
1061
1062        let evidence = build_evidence(vec![], vec![]);
1063
1064        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1065        assert!(
1066            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
1067            "expected QuorumFailed with zero targets, got {outcome:?}"
1068        );
1069    }
1070
1071    #[test]
1072    fn quorum_verified_beats_paid_list_when_both_satisfied() {
1073        // When both presence quorum AND paid-list majority are satisfied,
1074        // QuorumVerified takes precedence (evaluated first).
1075        let key = xor_name_from_byte(0x80);
1076        let config = ReplicationConfig::default();
1077
1078        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
1079        let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
1080        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1081
1082        // quorum_needed(5) = min(4, 3) = 3; all 5 Present -> quorum met.
1083        // confirm_needed(3) = 2; all 3 Confirmed -> paid met.
1084        let evidence = build_evidence(
1085            vec![
1086                (quorum_peers[0], PresenceEvidence::Present),
1087                (quorum_peers[1], PresenceEvidence::Present),
1088                (quorum_peers[2], PresenceEvidence::Present),
1089                (quorum_peers[3], PresenceEvidence::Present),
1090                (quorum_peers[4], PresenceEvidence::Present),
1091            ],
1092            vec![
1093                (paid_peers[0], PaidListEvidence::Confirmed),
1094                (paid_peers[1], PaidListEvidence::Confirmed),
1095                (paid_peers[2], PaidListEvidence::Confirmed),
1096            ],
1097        );
1098
1099        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1100        assert!(
1101            matches!(outcome, KeyVerificationOutcome::QuorumVerified { .. }),
1102            "QuorumVerified should take precedence over PaidListVerified, got {outcome:?}"
1103        );
1104    }
1105
1106    // -----------------------------------------------------------------------
1107    // process_verification_response
1108    // -----------------------------------------------------------------------
1109
1110    #[test]
1111    fn process_response_populates_evidence() {
1112        let key = xor_name_from_byte(0x90);
1113        let peer = peer_id_from_byte(1);
1114
1115        let targets = single_key_targets(&key, vec![peer], vec![peer]);
1116
1117        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
1118            key,
1119            KeyVerificationEvidence {
1120                presence: HashMap::new(),
1121                paid_list: HashMap::new(),
1122            },
1123        ))
1124        .collect();
1125
1126        let response = VerificationResponse {
1127            results: vec![KeyVerificationResult {
1128                key,
1129                present: true,
1130                paid: Some(true),
1131            }],
1132        };
1133
1134        process_verification_response(&peer, &response, &targets, &mut evidence);
1135
1136        let ev = evidence.get(&key).expect("evidence for key");
1137        assert_eq!(
1138            ev.presence.get(&peer),
1139            Some(&PresenceEvidence::Present),
1140            "presence should be Present"
1141        );
1142        assert_eq!(
1143            ev.paid_list.get(&peer),
1144            Some(&PaidListEvidence::Confirmed),
1145            "paid_list should be Confirmed"
1146        );
1147    }
1148
1149    #[test]
1150    fn process_response_missing_key_gets_unresolved() {
1151        let key = xor_name_from_byte(0xA0);
1152        let peer = peer_id_from_byte(2);
1153
1154        let targets = single_key_targets(&key, vec![peer], vec![peer]);
1155
1156        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
1157            key,
1158            KeyVerificationEvidence {
1159                presence: HashMap::new(),
1160                paid_list: HashMap::new(),
1161            },
1162        ))
1163        .collect();
1164
1165        // Empty response: peer did not include our key.
1166        let response = VerificationResponse { results: vec![] };
1167
1168        process_verification_response(&peer, &response, &targets, &mut evidence);
1169
1170        let ev = evidence.get(&key).expect("evidence for key");
1171        assert_eq!(
1172            ev.presence.get(&peer),
1173            Some(&PresenceEvidence::Unresolved),
1174            "missing key in response should be Unresolved"
1175        );
1176        assert_eq!(
1177            ev.paid_list.get(&peer),
1178            Some(&PaidListEvidence::Unresolved),
1179            "missing paid key in response should be Unresolved"
1180        );
1181    }
1182
1183    #[test]
1184    fn process_response_ignores_unsolicited_keys() {
1185        let key = xor_name_from_byte(0xB0);
1186        let unsolicited_key = xor_name_from_byte(0xB1);
1187        let peer = peer_id_from_byte(3);
1188
1189        let targets = single_key_targets(&key, vec![peer], vec![]);
1190
1191        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
1192            key,
1193            KeyVerificationEvidence {
1194                presence: HashMap::new(),
1195                paid_list: HashMap::new(),
1196            },
1197        ))
1198        .collect();
1199
1200        // Response includes an unsolicited key.
1201        let response = VerificationResponse {
1202            results: vec![
1203                KeyVerificationResult {
1204                    key: unsolicited_key,
1205                    present: true,
1206                    paid: None,
1207                },
1208                KeyVerificationResult {
1209                    key,
1210                    present: false,
1211                    paid: None,
1212                },
1213            ],
1214        };
1215
1216        process_verification_response(&peer, &response, &targets, &mut evidence);
1217
1218        // Unsolicited key should not appear in evidence.
1219        assert!(
1220            !evidence.contains_key(&unsolicited_key),
1221            "unsolicited key should not be in evidence"
1222        );
1223
1224        let ev = evidence.get(&key).expect("evidence for key");
1225        assert_eq!(
1226            ev.presence.get(&peer),
1227            Some(&PresenceEvidence::Absent),
1228            "solicited key should have Absent"
1229        );
1230    }
1231
1232    // -----------------------------------------------------------------------
1233    // mark_peer_unresolved
1234    // -----------------------------------------------------------------------
1235
1236    #[test]
1237    fn mark_unresolved_sets_all_keys_for_peer() {
1238        let key_a = xor_name_from_byte(0xC0);
1239        let key_b = xor_name_from_byte(0xC1);
1240        let peer = peer_id_from_byte(5);
1241
1242        // Peer is a quorum target for key_a and a paid target for key_b.
1243        let targets = VerificationTargets {
1244            quorum_targets: std::iter::once((key_a, vec![peer])).collect(),
1245            paid_targets: std::iter::once((key_b, vec![peer])).collect(),
1246            paid_group_sizes: [(key_a, 0), (key_b, 1)].into_iter().collect(),
1247            all_peers: std::iter::once(peer).collect(),
1248            peer_to_keys: std::iter::once((peer, vec![key_a, key_b])).collect(),
1249            peer_to_paid_keys: std::iter::once((peer, std::iter::once(key_b).collect())).collect(),
1250        };
1251
1252        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
1253            (
1254                key_a,
1255                KeyVerificationEvidence {
1256                    presence: HashMap::new(),
1257                    paid_list: HashMap::new(),
1258                },
1259            ),
1260            (
1261                key_b,
1262                KeyVerificationEvidence {
1263                    presence: HashMap::new(),
1264                    paid_list: HashMap::new(),
1265                },
1266            ),
1267        ]
1268        .into_iter()
1269        .collect();
1270
1271        mark_peer_unresolved(&peer, &targets, &mut evidence);
1272
1273        let ev_a = evidence.get(&key_a).expect("evidence for key_a");
1274        assert_eq!(
1275            ev_a.presence.get(&peer),
1276            Some(&PresenceEvidence::Unresolved)
1277        );
1278        // key_a is not in peer_to_paid_keys, so no paid_list entry.
1279        assert!(!ev_a.paid_list.contains_key(&peer));
1280
1281        let ev_b = evidence.get(&key_b).expect("evidence for key_b");
1282        assert_eq!(
1283            ev_b.presence.get(&peer),
1284            Some(&PresenceEvidence::Unresolved)
1285        );
1286        assert_eq!(
1287            ev_b.paid_list.get(&peer),
1288            Some(&PaidListEvidence::Unresolved)
1289        );
1290    }
1291
1292    // -----------------------------------------------------------------------
1293    // Section 18 scenarios
1294    // -----------------------------------------------------------------------
1295
1296    /// Scenario 4: All peers respond Absent with no paid confirmations.
1297    /// Both presence and paid-list paths are impossible -> `QuorumFailed`.
1298    #[test]
1299    fn scenario_4_quorum_fail_transitions_to_abandoned() {
1300        let key = xor_name_from_byte(0xD0);
1301        let config = ReplicationConfig::default();
1302
1303        // 7 quorum peers, threshold = min(4, floor(7/2)+1) = 4
1304        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1305        // 5 paid peers, confirm_needed = floor(5/2)+1 = 3
1306        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1307        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1308
1309        // All quorum peers respond Absent, all paid peers respond NotFound.
1310        let evidence = build_evidence(
1311            quorum_peers
1312                .iter()
1313                .map(|p| (*p, PresenceEvidence::Absent))
1314                .collect(),
1315            paid_peers
1316                .iter()
1317                .map(|p| (*p, PaidListEvidence::NotFound))
1318                .collect(),
1319        );
1320
1321        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1322        assert!(
1323            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
1324            "all-Absent with no paid confirmations should yield QuorumFailed, got {outcome:?}"
1325        );
1326    }
1327
1328    /// Scenario 16: All peers unresolved (timeout). Neither success nor
1329    /// fail-fast is possible because unresolved counts keep both paths alive.
1330    #[test]
1331    fn scenario_16_timeout_yields_inconclusive() {
1332        let key = xor_name_from_byte(0xD1);
1333        let config = ReplicationConfig::default();
1334
1335        // 7 quorum peers, quorum_needed = 4
1336        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1337        // 5 paid peers, confirm_needed = 3
1338        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1339        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1340
1341        // Every peer is Unresolved (simulating full timeout).
1342        let evidence = build_evidence(
1343            quorum_peers
1344                .iter()
1345                .map(|p| (*p, PresenceEvidence::Unresolved))
1346                .collect(),
1347            paid_peers
1348                .iter()
1349                .map(|p| (*p, PaidListEvidence::Unresolved))
1350                .collect(),
1351        );
1352
1353        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1354        assert!(
1355            matches!(outcome, KeyVerificationOutcome::QuorumInconclusive),
1356            "all-unresolved should yield QuorumInconclusive, got {outcome:?}"
1357        );
1358    }
1359
1360    /// Scenario 27: A single verification round collects both presence
1361    /// evidence from `QuorumTargets` and paid-list confirmations from
1362    /// `PaidTargets`. Paid-list success triggers `PaidListVerified` even when
1363    /// presence quorum fails.
1364    #[test]
1365    fn scenario_27_single_round_collects_both_presence_and_paid() {
1366        let key = xor_name_from_byte(0xD2);
1367        let config = ReplicationConfig::default();
1368
1369        // 7 quorum peers: only 1 Present (quorum_needed=4, so quorum fails).
1370        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1371        // 5 paid peers: 3 Confirmed (confirm_needed=3, so paid passes).
1372        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1373        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1374
1375        let evidence = build_evidence(
1376            vec![
1377                (quorum_peers[0], PresenceEvidence::Present),
1378                (quorum_peers[1], PresenceEvidence::Absent),
1379                (quorum_peers[2], PresenceEvidence::Absent),
1380                (quorum_peers[3], PresenceEvidence::Absent),
1381                (quorum_peers[4], PresenceEvidence::Absent),
1382                (quorum_peers[5], PresenceEvidence::Absent),
1383                (quorum_peers[6], PresenceEvidence::Absent),
1384            ],
1385            vec![
1386                (paid_peers[0], PaidListEvidence::Confirmed),
1387                (paid_peers[1], PaidListEvidence::Confirmed),
1388                (paid_peers[2], PaidListEvidence::Confirmed),
1389                (paid_peers[3], PaidListEvidence::NotFound),
1390                (paid_peers[4], PaidListEvidence::NotFound),
1391            ],
1392        );
1393
1394        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1395        assert!(
1396            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
1397            "paid-list majority should trigger PaidListVerified when quorum fails, got {outcome:?}"
1398        );
1399    }
1400
1401    /// Scenario 28: With |QuorumTargets|=3,
1402    /// `QuorumNeeded` = min(4, floor(3/2)+1) = min(4, 2) = 2.
1403    /// 2 Present responses should pass.
1404    #[test]
1405    fn scenario_28_dynamic_threshold_with_3_targets() {
1406        let key = xor_name_from_byte(0xD3);
1407        let config = ReplicationConfig::default();
1408
1409        let quorum_peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
1410        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
1411
1412        // Verify the dynamic threshold is indeed 2.
1413        assert_eq!(config.quorum_needed(3), 2, "quorum_needed(3) should be 2");
1414
1415        // 2 Present, 1 Absent -> 2 >= 2 -> QuorumVerified.
1416        let evidence = build_evidence(
1417            vec![
1418                (quorum_peers[0], PresenceEvidence::Present),
1419                (quorum_peers[1], PresenceEvidence::Present),
1420                (quorum_peers[2], PresenceEvidence::Absent),
1421            ],
1422            vec![],
1423        );
1424
1425        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1426        assert!(
1427            matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 2),
1428            "2 Present in 3-target set should QuorumVerify, got {outcome:?}"
1429        );
1430    }
1431
1432    /// Helper: build `VerificationTargets` for two keys with shared or
1433    /// separate peer sets.
1434    fn two_key_targets(
1435        key_a: &XorName,
1436        key_b: &XorName,
1437        quorum_peers_a: Vec<PeerId>,
1438        quorum_peers_b: Vec<PeerId>,
1439        paid_peers_a: Vec<PeerId>,
1440        paid_peers_b: Vec<PeerId>,
1441    ) -> VerificationTargets {
1442        let mut all_peers = HashSet::new();
1443        let mut peer_to_keys: HashMap<PeerId, Vec<XorName>> = HashMap::new();
1444        let mut peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>> = HashMap::new();
1445
1446        for &p in &quorum_peers_a {
1447            all_peers.insert(p);
1448            peer_to_keys.entry(p).or_default().push(*key_a);
1449        }
1450        for &p in &quorum_peers_b {
1451            all_peers.insert(p);
1452            peer_to_keys.entry(p).or_default().push(*key_b);
1453        }
1454        for &p in &paid_peers_a {
1455            all_peers.insert(p);
1456            peer_to_keys.entry(p).or_default().push(*key_a);
1457            peer_to_paid_keys.entry(p).or_default().insert(*key_a);
1458        }
1459        for &p in &paid_peers_b {
1460            all_peers.insert(p);
1461            peer_to_keys.entry(p).or_default().push(*key_b);
1462            peer_to_paid_keys.entry(p).or_default().insert(*key_b);
1463        }
1464
1465        for keys_list in peer_to_keys.values_mut() {
1466            keys_list.sort_unstable();
1467            keys_list.dedup();
1468        }
1469
1470        let mut quorum_targets = HashMap::new();
1471        quorum_targets.insert(*key_a, quorum_peers_a);
1472        quorum_targets.insert(*key_b, quorum_peers_b);
1473
1474        let mut paid_targets = HashMap::new();
1475        let paid_group_size_a = paid_peers_a.len();
1476        let paid_group_size_b = paid_peers_b.len();
1477        paid_targets.insert(*key_a, paid_peers_a);
1478        paid_targets.insert(*key_b, paid_peers_b);
1479
1480        VerificationTargets {
1481            quorum_targets,
1482            paid_targets,
1483            paid_group_sizes: [(*key_a, paid_group_size_a), (*key_b, paid_group_size_b)]
1484                .into_iter()
1485                .collect(),
1486            all_peers,
1487            peer_to_keys,
1488            peer_to_paid_keys,
1489        }
1490    }
1491
1492    /// Scenario 33: `process_verification_response` correctly attributes
1493    /// per-key evidence when a single peer responds for multiple keys.
1494    #[test]
1495    fn scenario_33_batched_response_per_key_evidence() {
1496        let key_a = xor_name_from_byte(0xD4);
1497        let key_b = xor_name_from_byte(0xD5);
1498        let peer = peer_id_from_byte(1);
1499
1500        // Peer is a quorum+paid target for both keys.
1501        let targets = two_key_targets(
1502            &key_a,
1503            &key_b,
1504            vec![peer],
1505            vec![peer],
1506            vec![peer],
1507            vec![peer],
1508        );
1509
1510        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
1511            (
1512                key_a,
1513                KeyVerificationEvidence {
1514                    presence: HashMap::new(),
1515                    paid_list: HashMap::new(),
1516                },
1517            ),
1518            (
1519                key_b,
1520                KeyVerificationEvidence {
1521                    presence: HashMap::new(),
1522                    paid_list: HashMap::new(),
1523                },
1524            ),
1525        ]
1526        .into_iter()
1527        .collect();
1528
1529        // Peer responds: key_a Present+Confirmed, key_b Absent+NotFound.
1530        let response = VerificationResponse {
1531            results: vec![
1532                KeyVerificationResult {
1533                    key: key_a,
1534                    present: true,
1535                    paid: Some(true),
1536                },
1537                KeyVerificationResult {
1538                    key: key_b,
1539                    present: false,
1540                    paid: Some(false),
1541                },
1542            ],
1543        };
1544
1545        process_verification_response(&peer, &response, &targets, &mut evidence);
1546
1547        // key_a: Present + Confirmed.
1548        let ev_a = evidence.get(&key_a).expect("evidence for key_a");
1549        assert_eq!(ev_a.presence.get(&peer), Some(&PresenceEvidence::Present));
1550        assert_eq!(
1551            ev_a.paid_list.get(&peer),
1552            Some(&PaidListEvidence::Confirmed)
1553        );
1554
1555        // key_b: Absent + NotFound.
1556        let ev_b = evidence.get(&key_b).expect("evidence for key_b");
1557        assert_eq!(ev_b.presence.get(&peer), Some(&PresenceEvidence::Absent));
1558        assert_eq!(ev_b.paid_list.get(&peer), Some(&PaidListEvidence::NotFound));
1559    }
1560
1561    /// Scenario 34: Peer responds for `key_a` but omits `key_b`.
1562    /// `key_a` gets explicit evidence, `key_b` gets Unresolved.
1563    #[test]
1564    fn scenario_34_partial_response_unresolved_per_key() {
1565        let key_a = xor_name_from_byte(0xD6);
1566        let key_b = xor_name_from_byte(0xD7);
1567        let peer = peer_id_from_byte(2);
1568
1569        // Peer is a quorum target for both keys, paid target for key_b only.
1570        let targets = two_key_targets(&key_a, &key_b, vec![peer], vec![peer], vec![], vec![peer]);
1571
1572        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
1573            (
1574                key_a,
1575                KeyVerificationEvidence {
1576                    presence: HashMap::new(),
1577                    paid_list: HashMap::new(),
1578                },
1579            ),
1580            (
1581                key_b,
1582                KeyVerificationEvidence {
1583                    presence: HashMap::new(),
1584                    paid_list: HashMap::new(),
1585                },
1586            ),
1587        ]
1588        .into_iter()
1589        .collect();
1590
1591        // Peer responds only for key_a, omits key_b entirely.
1592        let response = VerificationResponse {
1593            results: vec![KeyVerificationResult {
1594                key: key_a,
1595                present: true,
1596                paid: None,
1597            }],
1598        };
1599
1600        process_verification_response(&peer, &response, &targets, &mut evidence);
1601
1602        // key_a: explicit Present.
1603        let ev_a = evidence.get(&key_a).expect("evidence for key_a");
1604        assert_eq!(
1605            ev_a.presence.get(&peer),
1606            Some(&PresenceEvidence::Present),
1607            "key_a should have explicit Present"
1608        );
1609
1610        // key_b: missing from response -> Unresolved for both presence and
1611        // paid_list.
1612        let ev_b = evidence.get(&key_b).expect("evidence for key_b");
1613        assert_eq!(
1614            ev_b.presence.get(&peer),
1615            Some(&PresenceEvidence::Unresolved),
1616            "omitted key_b should get Unresolved presence"
1617        );
1618        assert_eq!(
1619            ev_b.paid_list.get(&peer),
1620            Some(&PaidListEvidence::Unresolved),
1621            "omitted key_b (paid target) should get Unresolved paid_list"
1622        );
1623    }
1624
1625    /// Scenario 42: `QuorumVerified` outcome populates sources correctly,
1626    /// which downstream uses to add the key to `PaidForList`.
1627    #[test]
1628    fn scenario_42_quorum_pass_derives_paid_list_auth() {
1629        let key = xor_name_from_byte(0xD8);
1630        let config = ReplicationConfig::default();
1631
1632        // 5 quorum peers, quorum_needed = min(4, 3) = 3.
1633        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
1634        // 3 paid peers (some overlap with quorum peers for realistic scenario).
1635        let paid_peers: Vec<PeerId> = (3..=5).map(peer_id_from_byte).collect();
1636        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1637
1638        // 4 quorum peers Present, 1 Absent -> quorum met.
1639        // Also mark paid_peers[0] (peer 3) as Present so it's collected from
1640        // paid targets too.
1641        let evidence = build_evidence(
1642            vec![
1643                (quorum_peers[0], PresenceEvidence::Present),
1644                (quorum_peers[1], PresenceEvidence::Present),
1645                (quorum_peers[2], PresenceEvidence::Present), // peer 3
1646                (quorum_peers[3], PresenceEvidence::Present), // peer 4
1647                (quorum_peers[4], PresenceEvidence::Absent),  // peer 5
1648            ],
1649            vec![
1650                (paid_peers[0], PaidListEvidence::NotFound),
1651                (paid_peers[1], PaidListEvidence::NotFound),
1652                (paid_peers[2], PaidListEvidence::NotFound),
1653            ],
1654        );
1655
1656        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1657        match outcome {
1658            KeyVerificationOutcome::QuorumVerified { ref sources } => {
1659                // Sources should include peers that responded Present from
1660                // both quorum and paid targets.
1661                assert!(
1662                    sources.len() >= 4,
1663                    "QuorumVerified sources should contain at least the 4 quorum-positive peers, got {}",
1664                    sources.len()
1665                );
1666                // The sources list is used downstream to authorize
1667                // PaidForList insertion. Verify specific peers are present.
1668                assert!(
1669                    sources.contains(&quorum_peers[0]),
1670                    "source peer 1 should be in sources"
1671                );
1672                assert!(
1673                    sources.contains(&quorum_peers[1]),
1674                    "source peer 2 should be in sources"
1675                );
1676            }
1677            other => panic!("expected QuorumVerified, got {other:?}"),
1678        }
1679    }
1680
1681    /// Scenario 44: Paid-list cold-start recovery via replica majority.
1682    ///
1683    /// Multiple nodes restart simultaneously and lose their `PaidForList`
1684    /// (persistence corrupted). Key `K` still has `>= QuorumNeeded(K)`
1685    /// replicas in the close group. During neighbor-sync verification,
1686    /// presence quorum passes and all verifying nodes re-derive `K` into
1687    /// their `PaidForList` via close-group replica majority (Section 7.2
1688    /// rule 4).
1689    ///
1690    /// This test verifies that when paid-list evidence is entirely
1691    /// `NotFound` (simulating data loss) but presence evidence meets
1692    /// quorum, the outcome is `QuorumVerified` with sources that enable
1693    /// `PaidForList` re-derivation.
1694    #[test]
1695    fn scenario_44_cold_start_recovery_via_replica_majority() {
1696        let key = xor_name_from_byte(0xD9);
1697        let config = ReplicationConfig::default();
1698
1699        // 7 quorum peers, quorum_needed = min(4, floor(7/2)+1) = 4.
1700        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1701        // 10 paid peers (wider group), confirm_needed = floor(10/2)+1 = 6.
1702        let paid_peers: Vec<PeerId> = (10..=19).map(peer_id_from_byte).collect();
1703        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1704
1705        // Cold-start scenario: ALL paid-list entries are lost across every
1706        // peer in PaidCloseGroup. Every paid peer reports NotFound.
1707        let paid_evidence: Vec<(PeerId, PaidListEvidence)> = paid_peers
1708            .iter()
1709            .map(|p| (*p, PaidListEvidence::NotFound))
1710            .collect();
1711
1712        // But the replicas still exist: 5 out of 7 quorum peers report
1713        // Present (>= QuorumNeeded(K) = 4).
1714        let presence_evidence = vec![
1715            (quorum_peers[0], PresenceEvidence::Present),
1716            (quorum_peers[1], PresenceEvidence::Present),
1717            (quorum_peers[2], PresenceEvidence::Present),
1718            (quorum_peers[3], PresenceEvidence::Present),
1719            (quorum_peers[4], PresenceEvidence::Present),
1720            (quorum_peers[5], PresenceEvidence::Absent),
1721            (quorum_peers[6], PresenceEvidence::Absent),
1722        ];
1723
1724        let evidence = build_evidence(presence_evidence, paid_evidence);
1725        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1726
1727        match outcome {
1728            KeyVerificationOutcome::QuorumVerified { ref sources } => {
1729                // Quorum passed despite total paid-list loss. The caller
1730                // re-derives PaidForList from close-group replica majority.
1731                assert!(
1732                    sources.len() >= 4,
1733                    "QuorumVerified should have >= 4 sources (the presence-positive peers), got {}",
1734                    sources.len()
1735                );
1736
1737                // Verify the specific Present peers are in sources.
1738                for (i, peer) in quorum_peers.iter().enumerate().take(5) {
1739                    assert!(
1740                        sources.contains(peer),
1741                        "quorum_peer[{i}] responded Present and should be a fetch source"
1742                    );
1743                }
1744
1745                // Absent peers are NOT sources.
1746                assert!(
1747                    !sources.contains(&quorum_peers[5]),
1748                    "absent peer should not be a fetch source"
1749                );
1750                assert!(
1751                    !sources.contains(&quorum_peers[6]),
1752                    "absent peer should not be a fetch source"
1753                );
1754            }
1755            other => panic!(
1756                "Cold-start recovery should succeed via replica majority \
1757                 (QuorumVerified), got {other:?}"
1758            ),
1759        }
1760    }
1761
1762    /// Scenario 20: Unknown replica key found in local `PaidForList` bypasses
1763    /// presence quorum.
1764    ///
1765    /// When a key's paid-list evidence shows confirmation from enough peers,
1766    /// `PaidListVerified` is returned even without a single presence-positive
1767    /// response.  This models the local-hit fast-path: the caller already
1768    /// checked the local paid list and the network confirms majority — no
1769    /// presence quorum needed.
1770    #[test]
1771    fn scenario_20_paid_list_local_hit_bypasses_presence_quorum() {
1772        let key = xor_name_from_byte(0xE0);
1773        let config = ReplicationConfig::default();
1774
1775        // 7 quorum peers, quorum_needed = 4.
1776        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1777        // 5 paid peers, confirm_needed = floor(5/2)+1 = 3.
1778        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1779        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1780
1781        // ALL quorum peers Absent (presence quorum impossible) but 3/5 paid
1782        // peers confirm → PaidListVerified.
1783        let evidence = build_evidence(
1784            quorum_peers
1785                .iter()
1786                .map(|p| (*p, PresenceEvidence::Absent))
1787                .collect(),
1788            vec![
1789                (paid_peers[0], PaidListEvidence::Confirmed),
1790                (paid_peers[1], PaidListEvidence::Confirmed),
1791                (paid_peers[2], PaidListEvidence::Confirmed),
1792                (paid_peers[3], PaidListEvidence::NotFound),
1793                (paid_peers[4], PaidListEvidence::NotFound),
1794            ],
1795        );
1796
1797        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1798        assert!(
1799            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
1800            "paid-list majority should bypass failed presence quorum, got {outcome:?}"
1801        );
1802    }
1803
1804    /// Scenario 22: Paid-list confirmation below threshold AND presence quorum
1805    /// fails → `QuorumFailed`.
1806    ///
1807    /// Neither path can succeed: presence peers are all Absent (can't reach
1808    /// `quorum_needed`) and paid confirmations are below `confirm_needed`.
1809    #[test]
1810    fn scenario_22_paid_list_rejection_below_threshold() {
1811        let key = xor_name_from_byte(0xE2);
1812        let config = ReplicationConfig::default();
1813
1814        // 7 quorum peers, quorum_needed = 4.
1815        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1816        // 5 paid peers, confirm_needed = 3.
1817        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1818        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1819
1820        // All quorum peers Absent; only 2/5 paid confirmations (below 3).
1821        let evidence = build_evidence(
1822            quorum_peers
1823                .iter()
1824                .map(|p| (*p, PresenceEvidence::Absent))
1825                .collect(),
1826            vec![
1827                (paid_peers[0], PaidListEvidence::Confirmed),
1828                (paid_peers[1], PaidListEvidence::Confirmed),
1829                (paid_peers[2], PaidListEvidence::NotFound),
1830                (paid_peers[3], PaidListEvidence::NotFound),
1831                (paid_peers[4], PaidListEvidence::NotFound),
1832            ],
1833        );
1834
1835        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1836        assert!(
1837            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
1838            "below-threshold paid confirmations with all-Absent quorum should yield QuorumFailed, got {outcome:?}"
1839        );
1840    }
1841}