Skip to main content

ant_node/replication/
quorum.rs

1//! Quorum verification logic (Section 9).
2//!
3//! Single-round batched verification: presence + paid-list evidence collected
4//! in one request round to `VerifyTargets = PaidTargets ∪ QuorumTargets`.
5
6use std::collections::{HashMap, HashSet};
7use std::sync::Arc;
8
9use crate::logging::{debug, warn};
10use saorsa_core::identity::PeerId;
11use saorsa_core::P2PNode;
12
13use crate::ant_protocol::XorName;
14use crate::replication::config::{ReplicationConfig, REPLICATION_PROTOCOL_ID};
15use crate::replication::protocol::{
16    ReplicationMessage, ReplicationMessageBody, VerificationRequest, VerificationResponse,
17};
18use crate::replication::types::{KeyVerificationEvidence, PaidListEvidence, PresenceEvidence};
19
20// ---------------------------------------------------------------------------
21// Verification targets
22// ---------------------------------------------------------------------------
23
24/// Targets for verifying a set of keys.
25#[derive(Debug)]
26pub struct VerificationTargets {
27    /// Per-key: closest `CLOSE_GROUP_SIZE` peers (excluding self) for presence
28    /// quorum.
29    pub quorum_targets: HashMap<XorName, Vec<PeerId>>,
30    /// Per-key: `PaidCloseGroup` peers for paid-list majority.
31    pub paid_targets: HashMap<XorName, Vec<PeerId>>,
32    /// Union of all target peers across all keys.
33    pub all_peers: HashSet<PeerId>,
34    /// Which keys each peer should be queried about.
35    pub peer_to_keys: HashMap<PeerId, Vec<XorName>>,
36    /// Which keys need paid-list checks from which peers.
37    pub peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>>,
38}
39
40/// Compute verification targets for a batch of keys.
41///
42/// For each key, determines the `QuorumTargets` (closest `CLOSE_GROUP_SIZE`
43/// peers excluding self) and `PaidTargets` (`PaidCloseGroup` excluding self),
44/// then unions them into per-peer request batches.
45pub async fn compute_verification_targets(
46    keys: &[XorName],
47    p2p_node: &Arc<P2PNode>,
48    config: &ReplicationConfig,
49    self_id: &PeerId,
50) -> VerificationTargets {
51    let dht = p2p_node.dht_manager();
52    let mut targets = VerificationTargets {
53        quorum_targets: HashMap::new(),
54        paid_targets: HashMap::new(),
55        all_peers: HashSet::new(),
56        peer_to_keys: HashMap::new(),
57        peer_to_paid_keys: HashMap::new(),
58    };
59
60    for &key in keys {
61        // QuorumTargets: up to CLOSE_GROUP_SIZE nearest peers for K, excluding
62        // self.
63        let closest = dht
64            .find_closest_nodes_local(&key, config.close_group_size)
65            .await;
66        let quorum_peers: Vec<PeerId> = closest
67            .iter()
68            .filter(|n| n.peer_id != *self_id)
69            .map(|n| n.peer_id)
70            .collect();
71
72        // PaidTargets: PaidCloseGroup(K) excluding self.
73        let paid_closest = dht
74            .find_closest_nodes_local_with_self(&key, config.paid_list_close_group_size)
75            .await;
76        let paid_peers: Vec<PeerId> = paid_closest
77            .iter()
78            .filter(|n| n.peer_id != *self_id)
79            .map(|n| n.peer_id)
80            .collect();
81
82        // VerifyTargets = PaidTargets ∪ QuorumTargets
83        for &peer in &quorum_peers {
84            targets.all_peers.insert(peer);
85            targets.peer_to_keys.entry(peer).or_default().push(key);
86        }
87        for &peer in &paid_peers {
88            targets.all_peers.insert(peer);
89            targets.peer_to_keys.entry(peer).or_default().push(key);
90            targets
91                .peer_to_paid_keys
92                .entry(peer)
93                .or_default()
94                .insert(key);
95        }
96
97        targets.quorum_targets.insert(key, quorum_peers);
98        targets.paid_targets.insert(key, paid_peers);
99    }
100
101    // Deduplicate keys per peer (a peer in both quorum and paid targets for
102    // the same key would have it listed twice).
103    for keys_list in targets.peer_to_keys.values_mut() {
104        keys_list.sort_unstable();
105        keys_list.dedup();
106    }
107
108    targets
109}
110
111// ---------------------------------------------------------------------------
112// Verification outcome
113// ---------------------------------------------------------------------------
114
115/// Outcome of verifying a single key.
116#[derive(Debug, Clone)]
117pub enum KeyVerificationOutcome {
118    /// Presence quorum passed.
119    QuorumVerified {
120        /// Peers that responded `Present` (verified fetch sources).
121        sources: Vec<PeerId>,
122    },
123    /// Paid-list authorization succeeded.
124    PaidListVerified {
125        /// Peers that responded `Present` (potential fetch sources, may be
126        /// empty).
127        sources: Vec<PeerId>,
128    },
129    /// Quorum failed definitively (both paths impossible).
130    QuorumFailed,
131    /// Inconclusive (timeout with neither success nor fail-fast).
132    QuorumInconclusive,
133}
134
135// ---------------------------------------------------------------------------
136// Evidence evaluation (pure logic, no I/O)
137// ---------------------------------------------------------------------------
138
139/// Evaluate verification evidence for a single key.
140///
141/// Returns the outcome based on Section 9 rules:
142/// - **Step 10**: If presence positives >= `QuorumNeeded(K)`, `QuorumVerified`.
143/// - **Step 9**: If paid confirmations >= `ConfirmNeeded(K)`,
144///   `PaidListVerified`.
145/// - **Step 14**: Fail fast when both paths are impossible.
146/// - **Step 15**: Otherwise inconclusive.
147#[must_use]
148pub fn evaluate_key_evidence(
149    key: &XorName,
150    evidence: &KeyVerificationEvidence,
151    targets: &VerificationTargets,
152    config: &ReplicationConfig,
153) -> KeyVerificationOutcome {
154    let quorum_peers = targets
155        .quorum_targets
156        .get(key)
157        .map_or(&[][..], Vec::as_slice);
158
159    // Count presence evidence from QuorumTargets.
160    let mut presence_positive = 0usize;
161    let mut presence_unresolved = 0usize;
162    let mut present_peers = Vec::new();
163
164    for peer in quorum_peers {
165        match evidence.presence.get(peer) {
166            Some(PresenceEvidence::Present) => {
167                presence_positive += 1;
168                present_peers.push(*peer);
169            }
170            Some(PresenceEvidence::Absent) => {}
171            Some(PresenceEvidence::Unresolved) | None => {
172                presence_unresolved += 1;
173            }
174        }
175    }
176
177    // Also collect Present peers from paid targets for fetch sources.
178    let paid_peers = targets.paid_targets.get(key).map_or(&[][..], Vec::as_slice);
179
180    for peer in paid_peers {
181        if matches!(evidence.presence.get(peer), Some(PresenceEvidence::Present))
182            && !present_peers.contains(peer)
183        {
184            present_peers.push(*peer);
185        }
186    }
187
188    // Count paid-list evidence from PaidTargets.
189    let mut paid_confirmed = 0usize;
190    let mut paid_unresolved = 0usize;
191
192    for peer in paid_peers {
193        match evidence.paid_list.get(peer) {
194            Some(PaidListEvidence::Confirmed) => paid_confirmed += 1,
195            Some(PaidListEvidence::NotFound) => {}
196            Some(PaidListEvidence::Unresolved) | None => paid_unresolved += 1,
197        }
198    }
199
200    let quorum_needed = config.quorum_needed(quorum_peers.len());
201    let paid_group_size = paid_peers.len();
202    let confirm_needed = ReplicationConfig::confirm_needed(paid_group_size);
203
204    // Step 10: Presence quorum reached.
205    // quorum_needed == 0 means zero targets exist — quorum is impossible,
206    // not trivially met.
207    if quorum_needed > 0 && presence_positive >= quorum_needed {
208        return KeyVerificationOutcome::QuorumVerified {
209            sources: present_peers,
210        };
211    }
212
213    // Step 9: Paid-list majority reached.
214    // confirm_needed from 0 paid peers is 1, so this naturally fails with
215    // 0 confirmed — no special guard needed. But be explicit for clarity.
216    if paid_group_size > 0 && paid_confirmed >= confirm_needed {
217        return KeyVerificationOutcome::PaidListVerified {
218            sources: present_peers,
219        };
220    }
221
222    // Step 14: Fail fast when both paths are impossible.
223    let paid_possible = paid_group_size > 0 && paid_confirmed + paid_unresolved >= confirm_needed;
224    let quorum_possible =
225        quorum_needed > 0 && presence_positive + presence_unresolved >= quorum_needed;
226
227    if !paid_possible && !quorum_possible {
228        return KeyVerificationOutcome::QuorumFailed;
229    }
230
231    // Step 15: Neither success nor fail-fast.
232    KeyVerificationOutcome::QuorumInconclusive
233}
234
235// ---------------------------------------------------------------------------
236// Network verification round
237// ---------------------------------------------------------------------------
238
239/// Send batched verification requests to all peers and collect evidence.
240///
241/// Implements Section 9 requirement: one request per peer carrying many keys.
242/// Returns per-key evidence aggregated from all peer responses.
243pub async fn run_verification_round(
244    keys: &[XorName],
245    targets: &VerificationTargets,
246    p2p_node: &Arc<P2PNode>,
247    config: &ReplicationConfig,
248) -> HashMap<XorName, KeyVerificationEvidence> {
249    // Initialize empty evidence for all keys.
250    let mut evidence: HashMap<XorName, KeyVerificationEvidence> = keys
251        .iter()
252        .map(|&k| {
253            (
254                k,
255                KeyVerificationEvidence {
256                    presence: HashMap::new(),
257                    paid_list: HashMap::new(),
258                },
259            )
260        })
261        .collect();
262
263    // Send one batched request per peer.
264    let mut handles = Vec::new();
265
266    for (&peer, peer_keys) in &targets.peer_to_keys {
267        let paid_check_keys = targets.peer_to_paid_keys.get(&peer);
268
269        // Build paid_list_check_indices: which of this peer's keys need
270        // paid-list status.
271        let mut paid_indices = Vec::new();
272        for (i, key) in peer_keys.iter().enumerate() {
273            if let Some(paid_keys) = paid_check_keys {
274                if paid_keys.contains(key) {
275                    if let Ok(idx) = u32::try_from(i) {
276                        paid_indices.push(idx);
277                    }
278                }
279            }
280        }
281
282        let request = VerificationRequest {
283            keys: peer_keys.clone(),
284            paid_list_check_indices: paid_indices,
285        };
286
287        let msg = ReplicationMessage {
288            request_id: rand::random(),
289            body: ReplicationMessageBody::VerificationRequest(request),
290        };
291
292        let p2p = Arc::clone(p2p_node);
293        let timeout = config.verification_request_timeout;
294        let peer_id = peer;
295
296        handles.push(tokio::spawn(async move {
297            let encoded = match msg.encode() {
298                Ok(data) => data,
299                Err(e) => {
300                    warn!("Failed to encode verification request: {e}");
301                    return (peer_id, None);
302                }
303            };
304
305            match p2p
306                .send_request(&peer_id, REPLICATION_PROTOCOL_ID, encoded, timeout)
307                .await
308            {
309                Ok(response) => match ReplicationMessage::decode(&response.data) {
310                    Ok(decoded) => (peer_id, Some(decoded)),
311                    Err(e) => {
312                        warn!("Failed to decode verification response from {peer_id}: {e}");
313                        (peer_id, None)
314                    }
315                },
316                Err(e) => {
317                    debug!("Verification request to {peer_id} failed: {e}");
318                    (peer_id, None)
319                }
320            }
321        }));
322    }
323
324    // Collect responses.
325    for handle in handles {
326        let (peer, response) = match handle.await {
327            Ok(result) => result,
328            Err(e) => {
329                warn!("Verification task panicked: {e}");
330                continue;
331            }
332        };
333
334        let Some(msg) = response else {
335            // Timeout/error: mark all keys for this peer as unresolved.
336            mark_peer_unresolved(&peer, targets, &mut evidence);
337            continue;
338        };
339
340        if let ReplicationMessageBody::VerificationResponse(resp) = msg.body {
341            process_verification_response(&peer, &resp, targets, &mut evidence);
342        }
343    }
344
345    evidence
346}
347
348/// Mark all keys for a peer as unresolved (timeout / decode failure).
349fn mark_peer_unresolved(
350    peer: &PeerId,
351    targets: &VerificationTargets,
352    evidence: &mut HashMap<XorName, KeyVerificationEvidence>,
353) {
354    if let Some(peer_keys) = targets.peer_to_keys.get(peer) {
355        let is_paid_peer = targets.peer_to_paid_keys.get(peer);
356        for key in peer_keys {
357            if let Some(ev) = evidence.get_mut(key) {
358                ev.presence.insert(*peer, PresenceEvidence::Unresolved);
359                if is_paid_peer.is_some_and(|ks| ks.contains(key)) {
360                    ev.paid_list.insert(*peer, PaidListEvidence::Unresolved);
361                }
362            }
363        }
364    }
365}
366
367/// Process a single peer's verification response into the evidence map.
368fn process_verification_response(
369    peer: &PeerId,
370    response: &VerificationResponse,
371    targets: &VerificationTargets,
372    evidence: &mut HashMap<XorName, KeyVerificationEvidence>,
373) {
374    let Some(peer_keys) = targets.peer_to_keys.get(peer) else {
375        return;
376    };
377
378    // Use a HashSet for O(1) key membership checks instead of linear scan,
379    // preventing CPU amplification from large responses.
380    let peer_keys_set: HashSet<&XorName> = peer_keys.iter().collect();
381
382    // Cap results at 2x requested keys to limit processing of stuffed
383    // responses while still tolerating some unsolicited entries.
384    let max_results = peer_keys.len().saturating_mul(2);
385    let results = if response.results.len() > max_results {
386        warn!(
387            "Peer {peer} sent {} verification results but only {} keys were requested — truncating",
388            response.results.len(),
389            peer_keys.len(),
390        );
391        &response.results[..max_results]
392    } else {
393        &response.results
394    };
395
396    // Match response results to requested keys.
397    for result in results {
398        if !peer_keys_set.contains(&result.key) {
399            continue; // Ignore unsolicited key results.
400        }
401
402        if let Some(ev) = evidence.get_mut(&result.key) {
403            // Presence evidence.
404            let presence = if result.present {
405                PresenceEvidence::Present
406            } else {
407                PresenceEvidence::Absent
408            };
409            ev.presence.insert(*peer, presence);
410
411            // Paid-list evidence (only if requested).
412            if let Some(is_paid) = result.paid {
413                let paid = if is_paid {
414                    PaidListEvidence::Confirmed
415                } else {
416                    PaidListEvidence::NotFound
417                };
418                ev.paid_list.insert(*peer, paid);
419            }
420        }
421    }
422
423    // Keys that were requested but not in response -> unresolved.
424    let is_paid_peer = targets.peer_to_paid_keys.get(peer);
425    for key in peer_keys {
426        if let Some(ev) = evidence.get_mut(key) {
427            ev.presence
428                .entry(*peer)
429                .or_insert(PresenceEvidence::Unresolved);
430            if is_paid_peer.is_some_and(|ks| ks.contains(key)) {
431                ev.paid_list
432                    .entry(*peer)
433                    .or_insert(PaidListEvidence::Unresolved);
434            }
435        }
436    }
437}
438
439// ---------------------------------------------------------------------------
440// Tests
441// ---------------------------------------------------------------------------
442
443#[cfg(test)]
444#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
445mod tests {
446    use super::*;
447    use crate::replication::protocol::KeyVerificationResult;
448
449    /// Build a `PeerId` from a single byte (zero-padded to 32 bytes).
450    fn peer_id_from_byte(b: u8) -> PeerId {
451        let mut bytes = [0u8; 32];
452        bytes[0] = b;
453        PeerId::from_bytes(bytes)
454    }
455
456    /// Build an `XorName` from a single byte (repeated to 32 bytes).
457    fn xor_name_from_byte(b: u8) -> XorName {
458        [b; 32]
459    }
460
461    /// Helper: build minimal `VerificationTargets` for a single key with
462    /// explicit quorum and paid peer lists.
463    fn single_key_targets(
464        key: &XorName,
465        quorum_peers: Vec<PeerId>,
466        paid_peers: Vec<PeerId>,
467    ) -> VerificationTargets {
468        let mut all_peers = HashSet::new();
469        let mut peer_to_keys: HashMap<PeerId, Vec<XorName>> = HashMap::new();
470        let mut peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>> = HashMap::new();
471
472        for &p in &quorum_peers {
473            all_peers.insert(p);
474            peer_to_keys.entry(p).or_default().push(*key);
475        }
476        for &p in &paid_peers {
477            all_peers.insert(p);
478            peer_to_keys.entry(p).or_default().push(*key);
479            peer_to_paid_keys.entry(p).or_default().insert(*key);
480        }
481
482        // Deduplicate keys per peer.
483        for keys_list in peer_to_keys.values_mut() {
484            keys_list.sort_unstable();
485            keys_list.dedup();
486        }
487
488        VerificationTargets {
489            quorum_targets: std::iter::once((key.to_owned(), quorum_peers)).collect(),
490            paid_targets: std::iter::once((key.to_owned(), paid_peers)).collect(),
491            all_peers,
492            peer_to_keys,
493            peer_to_paid_keys,
494        }
495    }
496
497    /// Helper: build `KeyVerificationEvidence` from presence and paid-list
498    /// maps.
499    fn build_evidence(
500        presence: Vec<(PeerId, PresenceEvidence)>,
501        paid_list: Vec<(PeerId, PaidListEvidence)>,
502    ) -> KeyVerificationEvidence {
503        KeyVerificationEvidence {
504            presence: presence.into_iter().collect(),
505            paid_list: paid_list.into_iter().collect(),
506        }
507    }
508
509    // -----------------------------------------------------------------------
510    // evaluate_key_evidence: QuorumVerified
511    // -----------------------------------------------------------------------
512
513    #[test]
514    fn quorum_verified_with_enough_present_responses() {
515        let key = xor_name_from_byte(0x10);
516        let config = ReplicationConfig::default();
517
518        // 7 quorum peers, threshold = min(4, floor(7/2)+1) = 4
519        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
520        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
521
522        // 4 peers say Present, 3 say Absent.
523        let evidence = build_evidence(
524            vec![
525                (quorum_peers[0], PresenceEvidence::Present),
526                (quorum_peers[1], PresenceEvidence::Present),
527                (quorum_peers[2], PresenceEvidence::Present),
528                (quorum_peers[3], PresenceEvidence::Present),
529                (quorum_peers[4], PresenceEvidence::Absent),
530                (quorum_peers[5], PresenceEvidence::Absent),
531                (quorum_peers[6], PresenceEvidence::Absent),
532            ],
533            vec![],
534        );
535
536        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
537        assert!(
538            matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 4),
539            "expected QuorumVerified with 4 sources, got {outcome:?}"
540        );
541    }
542
543    // -----------------------------------------------------------------------
544    // evaluate_key_evidence: PaidListVerified
545    // -----------------------------------------------------------------------
546
547    #[test]
548    fn paid_list_verified_with_enough_confirmations() {
549        let key = xor_name_from_byte(0x20);
550        let config = ReplicationConfig::default();
551
552        // 5 paid peers, confirm_needed = floor(5/2)+1 = 3
553        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
554        // No quorum peers (or quorum fails).
555        let quorum_peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
556        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
557
558        // Quorum: all Absent (fails presence path).
559        // Paid: 3 Confirmed, 2 NotFound -> majority reached.
560        let evidence = build_evidence(
561            vec![
562                (quorum_peers[0], PresenceEvidence::Absent),
563                (quorum_peers[1], PresenceEvidence::Absent),
564                (quorum_peers[2], PresenceEvidence::Absent),
565            ],
566            vec![
567                (paid_peers[0], PaidListEvidence::Confirmed),
568                (paid_peers[1], PaidListEvidence::Confirmed),
569                (paid_peers[2], PaidListEvidence::Confirmed),
570                (paid_peers[3], PaidListEvidence::NotFound),
571                (paid_peers[4], PaidListEvidence::NotFound),
572            ],
573        );
574
575        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
576        assert!(
577            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
578            "expected PaidListVerified, got {outcome:?}"
579        );
580    }
581
582    // -----------------------------------------------------------------------
583    // evaluate_key_evidence: QuorumFailed
584    // -----------------------------------------------------------------------
585
586    #[test]
587    fn quorum_failed_when_both_paths_impossible() {
588        let key = xor_name_from_byte(0x30);
589        let config = ReplicationConfig::default();
590
591        // 5 quorum peers, quorum_needed = min(4, floor(5/2)+1) = min(4,3) = 3
592        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
593        // 3 paid peers, confirm_needed = floor(3/2)+1 = 2
594        let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
595        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
596
597        // Presence: all 5 Absent (0 positive, 0 unresolved) -> can't reach 3.
598        // Paid: all 3 NotFound (0 confirmed, 0 unresolved) -> can't reach 2.
599        let evidence = build_evidence(
600            vec![
601                (quorum_peers[0], PresenceEvidence::Absent),
602                (quorum_peers[1], PresenceEvidence::Absent),
603                (quorum_peers[2], PresenceEvidence::Absent),
604                (quorum_peers[3], PresenceEvidence::Absent),
605                (quorum_peers[4], PresenceEvidence::Absent),
606            ],
607            vec![
608                (paid_peers[0], PaidListEvidence::NotFound),
609                (paid_peers[1], PaidListEvidence::NotFound),
610                (paid_peers[2], PaidListEvidence::NotFound),
611            ],
612        );
613
614        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
615        assert!(
616            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
617            "expected QuorumFailed, got {outcome:?}"
618        );
619    }
620
621    // -----------------------------------------------------------------------
622    // evaluate_key_evidence: QuorumInconclusive
623    // -----------------------------------------------------------------------
624
625    #[test]
626    fn quorum_inconclusive_with_unresolved_peers() {
627        let key = xor_name_from_byte(0x40);
628        let config = ReplicationConfig::default();
629
630        // 5 quorum peers, quorum_needed = min(4, 3) = 3
631        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
632        // 3 paid peers, confirm_needed = 2
633        let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
634        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
635
636        // Presence: 2 Present, 1 Absent, 2 Unresolved.
637        // positive=2, unresolved=2 -> 2+2=4 >= 3 -> quorum still possible.
638        // Paid: 1 Confirmed, 1 Unresolved, 1 NotFound.
639        // confirmed=1, unresolved=1 -> 1+1=2 >= 2 -> paid still possible.
640        // Neither path reached yet -> Inconclusive.
641        let evidence = build_evidence(
642            vec![
643                (quorum_peers[0], PresenceEvidence::Present),
644                (quorum_peers[1], PresenceEvidence::Present),
645                (quorum_peers[2], PresenceEvidence::Absent),
646                (quorum_peers[3], PresenceEvidence::Unresolved),
647                (quorum_peers[4], PresenceEvidence::Unresolved),
648            ],
649            vec![
650                (paid_peers[0], PaidListEvidence::Confirmed),
651                (paid_peers[1], PaidListEvidence::Unresolved),
652                (paid_peers[2], PaidListEvidence::NotFound),
653            ],
654        );
655
656        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
657        assert!(
658            matches!(outcome, KeyVerificationOutcome::QuorumInconclusive),
659            "expected QuorumInconclusive, got {outcome:?}"
660        );
661    }
662
663    // -----------------------------------------------------------------------
664    // Dynamic thresholds with undersized sets
665    // -----------------------------------------------------------------------
666
667    #[test]
668    fn quorum_verified_with_undersized_quorum_targets() {
669        let key = xor_name_from_byte(0x50);
670        let config = ReplicationConfig::default();
671
672        // Only 2 quorum peers (undersized).
673        // quorum_needed = min(4, floor(2/2)+1) = min(4, 2) = 2
674        let quorum_peers: Vec<PeerId> = (1..=2).map(peer_id_from_byte).collect();
675        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
676
677        // Both Present -> 2 >= 2 -> QuorumVerified.
678        let evidence = build_evidence(
679            vec![
680                (quorum_peers[0], PresenceEvidence::Present),
681                (quorum_peers[1], PresenceEvidence::Present),
682            ],
683            vec![],
684        );
685
686        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
687        assert!(
688            matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 2),
689            "expected QuorumVerified with 2 sources, got {outcome:?}"
690        );
691    }
692
693    #[test]
694    fn paid_list_verified_with_single_paid_peer() {
695        let key = xor_name_from_byte(0x60);
696        let config = ReplicationConfig::default();
697
698        // 1 paid peer, confirm_needed = floor(1/2)+1 = 1
699        let paid_peers = vec![peer_id_from_byte(10)];
700        // No quorum targets -> quorum path impossible from the start.
701        let targets = single_key_targets(&key, vec![], paid_peers.clone());
702
703        let evidence = build_evidence(vec![], vec![(paid_peers[0], PaidListEvidence::Confirmed)]);
704
705        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
706        assert!(
707            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
708            "expected PaidListVerified with single peer, got {outcome:?}"
709        );
710    }
711
712    #[test]
713    fn quorum_fails_with_zero_targets_no_paid() {
714        let key = xor_name_from_byte(0x70);
715        let config = ReplicationConfig::default();
716
717        // No quorum peers, no paid peers.
718        // quorum_needed(0) = min(4, 1) = 1, but 0 positive + 0 unresolved < 1.
719        // confirm_needed(0) = 1, but 0 confirmed + 0 unresolved < 1.
720        let targets = single_key_targets(&key, vec![], vec![]);
721
722        let evidence = build_evidence(vec![], vec![]);
723
724        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
725        assert!(
726            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
727            "expected QuorumFailed with zero targets, got {outcome:?}"
728        );
729    }
730
731    #[test]
732    fn quorum_verified_beats_paid_list_when_both_satisfied() {
733        // When both presence quorum AND paid-list majority are satisfied,
734        // QuorumVerified takes precedence (evaluated first).
735        let key = xor_name_from_byte(0x80);
736        let config = ReplicationConfig::default();
737
738        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
739        let paid_peers: Vec<PeerId> = (10..=12).map(peer_id_from_byte).collect();
740        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
741
742        // quorum_needed(5) = min(4, 3) = 3; all 5 Present -> quorum met.
743        // confirm_needed(3) = 2; all 3 Confirmed -> paid met.
744        let evidence = build_evidence(
745            vec![
746                (quorum_peers[0], PresenceEvidence::Present),
747                (quorum_peers[1], PresenceEvidence::Present),
748                (quorum_peers[2], PresenceEvidence::Present),
749                (quorum_peers[3], PresenceEvidence::Present),
750                (quorum_peers[4], PresenceEvidence::Present),
751            ],
752            vec![
753                (paid_peers[0], PaidListEvidence::Confirmed),
754                (paid_peers[1], PaidListEvidence::Confirmed),
755                (paid_peers[2], PaidListEvidence::Confirmed),
756            ],
757        );
758
759        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
760        assert!(
761            matches!(outcome, KeyVerificationOutcome::QuorumVerified { .. }),
762            "QuorumVerified should take precedence over PaidListVerified, got {outcome:?}"
763        );
764    }
765
766    // -----------------------------------------------------------------------
767    // process_verification_response
768    // -----------------------------------------------------------------------
769
770    #[test]
771    fn process_response_populates_evidence() {
772        let key = xor_name_from_byte(0x90);
773        let peer = peer_id_from_byte(1);
774
775        let targets = single_key_targets(&key, vec![peer], vec![peer]);
776
777        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
778            key,
779            KeyVerificationEvidence {
780                presence: HashMap::new(),
781                paid_list: HashMap::new(),
782            },
783        ))
784        .collect();
785
786        let response = VerificationResponse {
787            results: vec![KeyVerificationResult {
788                key,
789                present: true,
790                paid: Some(true),
791            }],
792        };
793
794        process_verification_response(&peer, &response, &targets, &mut evidence);
795
796        let ev = evidence.get(&key).expect("evidence for key");
797        assert_eq!(
798            ev.presence.get(&peer),
799            Some(&PresenceEvidence::Present),
800            "presence should be Present"
801        );
802        assert_eq!(
803            ev.paid_list.get(&peer),
804            Some(&PaidListEvidence::Confirmed),
805            "paid_list should be Confirmed"
806        );
807    }
808
809    #[test]
810    fn process_response_missing_key_gets_unresolved() {
811        let key = xor_name_from_byte(0xA0);
812        let peer = peer_id_from_byte(2);
813
814        let targets = single_key_targets(&key, vec![peer], vec![peer]);
815
816        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
817            key,
818            KeyVerificationEvidence {
819                presence: HashMap::new(),
820                paid_list: HashMap::new(),
821            },
822        ))
823        .collect();
824
825        // Empty response: peer did not include our key.
826        let response = VerificationResponse { results: vec![] };
827
828        process_verification_response(&peer, &response, &targets, &mut evidence);
829
830        let ev = evidence.get(&key).expect("evidence for key");
831        assert_eq!(
832            ev.presence.get(&peer),
833            Some(&PresenceEvidence::Unresolved),
834            "missing key in response should be Unresolved"
835        );
836        assert_eq!(
837            ev.paid_list.get(&peer),
838            Some(&PaidListEvidence::Unresolved),
839            "missing paid key in response should be Unresolved"
840        );
841    }
842
843    #[test]
844    fn process_response_ignores_unsolicited_keys() {
845        let key = xor_name_from_byte(0xB0);
846        let unsolicited_key = xor_name_from_byte(0xB1);
847        let peer = peer_id_from_byte(3);
848
849        let targets = single_key_targets(&key, vec![peer], vec![]);
850
851        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = std::iter::once((
852            key,
853            KeyVerificationEvidence {
854                presence: HashMap::new(),
855                paid_list: HashMap::new(),
856            },
857        ))
858        .collect();
859
860        // Response includes an unsolicited key.
861        let response = VerificationResponse {
862            results: vec![
863                KeyVerificationResult {
864                    key: unsolicited_key,
865                    present: true,
866                    paid: None,
867                },
868                KeyVerificationResult {
869                    key,
870                    present: false,
871                    paid: None,
872                },
873            ],
874        };
875
876        process_verification_response(&peer, &response, &targets, &mut evidence);
877
878        // Unsolicited key should not appear in evidence.
879        assert!(
880            !evidence.contains_key(&unsolicited_key),
881            "unsolicited key should not be in evidence"
882        );
883
884        let ev = evidence.get(&key).expect("evidence for key");
885        assert_eq!(
886            ev.presence.get(&peer),
887            Some(&PresenceEvidence::Absent),
888            "solicited key should have Absent"
889        );
890    }
891
892    // -----------------------------------------------------------------------
893    // mark_peer_unresolved
894    // -----------------------------------------------------------------------
895
896    #[test]
897    fn mark_unresolved_sets_all_keys_for_peer() {
898        let key_a = xor_name_from_byte(0xC0);
899        let key_b = xor_name_from_byte(0xC1);
900        let peer = peer_id_from_byte(5);
901
902        // Peer is a quorum target for key_a and a paid target for key_b.
903        let targets = VerificationTargets {
904            quorum_targets: std::iter::once((key_a, vec![peer])).collect(),
905            paid_targets: std::iter::once((key_b, vec![peer])).collect(),
906            all_peers: std::iter::once(peer).collect(),
907            peer_to_keys: std::iter::once((peer, vec![key_a, key_b])).collect(),
908            peer_to_paid_keys: std::iter::once((peer, std::iter::once(key_b).collect())).collect(),
909        };
910
911        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
912            (
913                key_a,
914                KeyVerificationEvidence {
915                    presence: HashMap::new(),
916                    paid_list: HashMap::new(),
917                },
918            ),
919            (
920                key_b,
921                KeyVerificationEvidence {
922                    presence: HashMap::new(),
923                    paid_list: HashMap::new(),
924                },
925            ),
926        ]
927        .into_iter()
928        .collect();
929
930        mark_peer_unresolved(&peer, &targets, &mut evidence);
931
932        let ev_a = evidence.get(&key_a).expect("evidence for key_a");
933        assert_eq!(
934            ev_a.presence.get(&peer),
935            Some(&PresenceEvidence::Unresolved)
936        );
937        // key_a is not in peer_to_paid_keys, so no paid_list entry.
938        assert!(!ev_a.paid_list.contains_key(&peer));
939
940        let ev_b = evidence.get(&key_b).expect("evidence for key_b");
941        assert_eq!(
942            ev_b.presence.get(&peer),
943            Some(&PresenceEvidence::Unresolved)
944        );
945        assert_eq!(
946            ev_b.paid_list.get(&peer),
947            Some(&PaidListEvidence::Unresolved)
948        );
949    }
950
951    // -----------------------------------------------------------------------
952    // Section 18 scenarios
953    // -----------------------------------------------------------------------
954
955    /// Scenario 4: All peers respond Absent with no paid confirmations.
956    /// Both presence and paid-list paths are impossible -> `QuorumFailed`.
957    #[test]
958    fn scenario_4_quorum_fail_transitions_to_abandoned() {
959        let key = xor_name_from_byte(0xD0);
960        let config = ReplicationConfig::default();
961
962        // 7 quorum peers, threshold = min(4, floor(7/2)+1) = 4
963        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
964        // 5 paid peers, confirm_needed = floor(5/2)+1 = 3
965        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
966        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
967
968        // All quorum peers respond Absent, all paid peers respond NotFound.
969        let evidence = build_evidence(
970            quorum_peers
971                .iter()
972                .map(|p| (*p, PresenceEvidence::Absent))
973                .collect(),
974            paid_peers
975                .iter()
976                .map(|p| (*p, PaidListEvidence::NotFound))
977                .collect(),
978        );
979
980        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
981        assert!(
982            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
983            "all-Absent with no paid confirmations should yield QuorumFailed, got {outcome:?}"
984        );
985    }
986
987    /// Scenario 16: All peers unresolved (timeout). Neither success nor
988    /// fail-fast is possible because unresolved counts keep both paths alive.
989    #[test]
990    fn scenario_16_timeout_yields_inconclusive() {
991        let key = xor_name_from_byte(0xD1);
992        let config = ReplicationConfig::default();
993
994        // 7 quorum peers, quorum_needed = 4
995        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
996        // 5 paid peers, confirm_needed = 3
997        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
998        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
999
1000        // Every peer is Unresolved (simulating full timeout).
1001        let evidence = build_evidence(
1002            quorum_peers
1003                .iter()
1004                .map(|p| (*p, PresenceEvidence::Unresolved))
1005                .collect(),
1006            paid_peers
1007                .iter()
1008                .map(|p| (*p, PaidListEvidence::Unresolved))
1009                .collect(),
1010        );
1011
1012        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1013        assert!(
1014            matches!(outcome, KeyVerificationOutcome::QuorumInconclusive),
1015            "all-unresolved should yield QuorumInconclusive, got {outcome:?}"
1016        );
1017    }
1018
1019    /// Scenario 27: A single verification round collects both presence
1020    /// evidence from `QuorumTargets` and paid-list confirmations from
1021    /// `PaidTargets`. Paid-list success triggers `PaidListVerified` even when
1022    /// presence quorum fails.
1023    #[test]
1024    fn scenario_27_single_round_collects_both_presence_and_paid() {
1025        let key = xor_name_from_byte(0xD2);
1026        let config = ReplicationConfig::default();
1027
1028        // 7 quorum peers: only 1 Present (quorum_needed=4, so quorum fails).
1029        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1030        // 5 paid peers: 3 Confirmed (confirm_needed=3, so paid passes).
1031        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1032        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1033
1034        let evidence = build_evidence(
1035            vec![
1036                (quorum_peers[0], PresenceEvidence::Present),
1037                (quorum_peers[1], PresenceEvidence::Absent),
1038                (quorum_peers[2], PresenceEvidence::Absent),
1039                (quorum_peers[3], PresenceEvidence::Absent),
1040                (quorum_peers[4], PresenceEvidence::Absent),
1041                (quorum_peers[5], PresenceEvidence::Absent),
1042                (quorum_peers[6], PresenceEvidence::Absent),
1043            ],
1044            vec![
1045                (paid_peers[0], PaidListEvidence::Confirmed),
1046                (paid_peers[1], PaidListEvidence::Confirmed),
1047                (paid_peers[2], PaidListEvidence::Confirmed),
1048                (paid_peers[3], PaidListEvidence::NotFound),
1049                (paid_peers[4], PaidListEvidence::NotFound),
1050            ],
1051        );
1052
1053        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1054        assert!(
1055            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
1056            "paid-list majority should trigger PaidListVerified when quorum fails, got {outcome:?}"
1057        );
1058    }
1059
1060    /// Scenario 28: With |QuorumTargets|=3,
1061    /// `QuorumNeeded` = min(4, floor(3/2)+1) = min(4, 2) = 2.
1062    /// 2 Present responses should pass.
1063    #[test]
1064    fn scenario_28_dynamic_threshold_with_3_targets() {
1065        let key = xor_name_from_byte(0xD3);
1066        let config = ReplicationConfig::default();
1067
1068        let quorum_peers: Vec<PeerId> = (1..=3).map(peer_id_from_byte).collect();
1069        let targets = single_key_targets(&key, quorum_peers.clone(), vec![]);
1070
1071        // Verify the dynamic threshold is indeed 2.
1072        assert_eq!(config.quorum_needed(3), 2, "quorum_needed(3) should be 2");
1073
1074        // 2 Present, 1 Absent -> 2 >= 2 -> QuorumVerified.
1075        let evidence = build_evidence(
1076            vec![
1077                (quorum_peers[0], PresenceEvidence::Present),
1078                (quorum_peers[1], PresenceEvidence::Present),
1079                (quorum_peers[2], PresenceEvidence::Absent),
1080            ],
1081            vec![],
1082        );
1083
1084        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1085        assert!(
1086            matches!(outcome, KeyVerificationOutcome::QuorumVerified { ref sources } if sources.len() == 2),
1087            "2 Present in 3-target set should QuorumVerify, got {outcome:?}"
1088        );
1089    }
1090
1091    /// Helper: build `VerificationTargets` for two keys with shared or
1092    /// separate peer sets.
1093    fn two_key_targets(
1094        key_a: &XorName,
1095        key_b: &XorName,
1096        quorum_peers_a: Vec<PeerId>,
1097        quorum_peers_b: Vec<PeerId>,
1098        paid_peers_a: Vec<PeerId>,
1099        paid_peers_b: Vec<PeerId>,
1100    ) -> VerificationTargets {
1101        let mut all_peers = HashSet::new();
1102        let mut peer_to_keys: HashMap<PeerId, Vec<XorName>> = HashMap::new();
1103        let mut peer_to_paid_keys: HashMap<PeerId, HashSet<XorName>> = HashMap::new();
1104
1105        for &p in &quorum_peers_a {
1106            all_peers.insert(p);
1107            peer_to_keys.entry(p).or_default().push(*key_a);
1108        }
1109        for &p in &quorum_peers_b {
1110            all_peers.insert(p);
1111            peer_to_keys.entry(p).or_default().push(*key_b);
1112        }
1113        for &p in &paid_peers_a {
1114            all_peers.insert(p);
1115            peer_to_keys.entry(p).or_default().push(*key_a);
1116            peer_to_paid_keys.entry(p).or_default().insert(*key_a);
1117        }
1118        for &p in &paid_peers_b {
1119            all_peers.insert(p);
1120            peer_to_keys.entry(p).or_default().push(*key_b);
1121            peer_to_paid_keys.entry(p).or_default().insert(*key_b);
1122        }
1123
1124        for keys_list in peer_to_keys.values_mut() {
1125            keys_list.sort_unstable();
1126            keys_list.dedup();
1127        }
1128
1129        let mut quorum_targets = HashMap::new();
1130        quorum_targets.insert(*key_a, quorum_peers_a);
1131        quorum_targets.insert(*key_b, quorum_peers_b);
1132
1133        let mut paid_targets = HashMap::new();
1134        paid_targets.insert(*key_a, paid_peers_a);
1135        paid_targets.insert(*key_b, paid_peers_b);
1136
1137        VerificationTargets {
1138            quorum_targets,
1139            paid_targets,
1140            all_peers,
1141            peer_to_keys,
1142            peer_to_paid_keys,
1143        }
1144    }
1145
1146    /// Scenario 33: `process_verification_response` correctly attributes
1147    /// per-key evidence when a single peer responds for multiple keys.
1148    #[test]
1149    fn scenario_33_batched_response_per_key_evidence() {
1150        let key_a = xor_name_from_byte(0xD4);
1151        let key_b = xor_name_from_byte(0xD5);
1152        let peer = peer_id_from_byte(1);
1153
1154        // Peer is a quorum+paid target for both keys.
1155        let targets = two_key_targets(
1156            &key_a,
1157            &key_b,
1158            vec![peer],
1159            vec![peer],
1160            vec![peer],
1161            vec![peer],
1162        );
1163
1164        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
1165            (
1166                key_a,
1167                KeyVerificationEvidence {
1168                    presence: HashMap::new(),
1169                    paid_list: HashMap::new(),
1170                },
1171            ),
1172            (
1173                key_b,
1174                KeyVerificationEvidence {
1175                    presence: HashMap::new(),
1176                    paid_list: HashMap::new(),
1177                },
1178            ),
1179        ]
1180        .into_iter()
1181        .collect();
1182
1183        // Peer responds: key_a Present+Confirmed, key_b Absent+NotFound.
1184        let response = VerificationResponse {
1185            results: vec![
1186                KeyVerificationResult {
1187                    key: key_a,
1188                    present: true,
1189                    paid: Some(true),
1190                },
1191                KeyVerificationResult {
1192                    key: key_b,
1193                    present: false,
1194                    paid: Some(false),
1195                },
1196            ],
1197        };
1198
1199        process_verification_response(&peer, &response, &targets, &mut evidence);
1200
1201        // key_a: Present + Confirmed.
1202        let ev_a = evidence.get(&key_a).expect("evidence for key_a");
1203        assert_eq!(ev_a.presence.get(&peer), Some(&PresenceEvidence::Present));
1204        assert_eq!(
1205            ev_a.paid_list.get(&peer),
1206            Some(&PaidListEvidence::Confirmed)
1207        );
1208
1209        // key_b: Absent + NotFound.
1210        let ev_b = evidence.get(&key_b).expect("evidence for key_b");
1211        assert_eq!(ev_b.presence.get(&peer), Some(&PresenceEvidence::Absent));
1212        assert_eq!(ev_b.paid_list.get(&peer), Some(&PaidListEvidence::NotFound));
1213    }
1214
1215    /// Scenario 34: Peer responds for `key_a` but omits `key_b`.
1216    /// `key_a` gets explicit evidence, `key_b` gets Unresolved.
1217    #[test]
1218    fn scenario_34_partial_response_unresolved_per_key() {
1219        let key_a = xor_name_from_byte(0xD6);
1220        let key_b = xor_name_from_byte(0xD7);
1221        let peer = peer_id_from_byte(2);
1222
1223        // Peer is a quorum target for both keys, paid target for key_b only.
1224        let targets = two_key_targets(&key_a, &key_b, vec![peer], vec![peer], vec![], vec![peer]);
1225
1226        let mut evidence: HashMap<XorName, KeyVerificationEvidence> = [
1227            (
1228                key_a,
1229                KeyVerificationEvidence {
1230                    presence: HashMap::new(),
1231                    paid_list: HashMap::new(),
1232                },
1233            ),
1234            (
1235                key_b,
1236                KeyVerificationEvidence {
1237                    presence: HashMap::new(),
1238                    paid_list: HashMap::new(),
1239                },
1240            ),
1241        ]
1242        .into_iter()
1243        .collect();
1244
1245        // Peer responds only for key_a, omits key_b entirely.
1246        let response = VerificationResponse {
1247            results: vec![KeyVerificationResult {
1248                key: key_a,
1249                present: true,
1250                paid: None,
1251            }],
1252        };
1253
1254        process_verification_response(&peer, &response, &targets, &mut evidence);
1255
1256        // key_a: explicit Present.
1257        let ev_a = evidence.get(&key_a).expect("evidence for key_a");
1258        assert_eq!(
1259            ev_a.presence.get(&peer),
1260            Some(&PresenceEvidence::Present),
1261            "key_a should have explicit Present"
1262        );
1263
1264        // key_b: missing from response -> Unresolved for both presence and
1265        // paid_list.
1266        let ev_b = evidence.get(&key_b).expect("evidence for key_b");
1267        assert_eq!(
1268            ev_b.presence.get(&peer),
1269            Some(&PresenceEvidence::Unresolved),
1270            "omitted key_b should get Unresolved presence"
1271        );
1272        assert_eq!(
1273            ev_b.paid_list.get(&peer),
1274            Some(&PaidListEvidence::Unresolved),
1275            "omitted key_b (paid target) should get Unresolved paid_list"
1276        );
1277    }
1278
1279    /// Scenario 42: `QuorumVerified` outcome populates sources correctly,
1280    /// which downstream uses to add the key to `PaidForList`.
1281    #[test]
1282    fn scenario_42_quorum_pass_derives_paid_list_auth() {
1283        let key = xor_name_from_byte(0xD8);
1284        let config = ReplicationConfig::default();
1285
1286        // 5 quorum peers, quorum_needed = min(4, 3) = 3.
1287        let quorum_peers: Vec<PeerId> = (1..=5).map(peer_id_from_byte).collect();
1288        // 3 paid peers (some overlap with quorum peers for realistic scenario).
1289        let paid_peers: Vec<PeerId> = (3..=5).map(peer_id_from_byte).collect();
1290        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1291
1292        // 4 quorum peers Present, 1 Absent -> quorum met.
1293        // Also mark paid_peers[0] (peer 3) as Present so it's collected from
1294        // paid targets too.
1295        let evidence = build_evidence(
1296            vec![
1297                (quorum_peers[0], PresenceEvidence::Present),
1298                (quorum_peers[1], PresenceEvidence::Present),
1299                (quorum_peers[2], PresenceEvidence::Present), // peer 3
1300                (quorum_peers[3], PresenceEvidence::Present), // peer 4
1301                (quorum_peers[4], PresenceEvidence::Absent),  // peer 5
1302            ],
1303            vec![
1304                (paid_peers[0], PaidListEvidence::NotFound),
1305                (paid_peers[1], PaidListEvidence::NotFound),
1306                (paid_peers[2], PaidListEvidence::NotFound),
1307            ],
1308        );
1309
1310        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1311        match outcome {
1312            KeyVerificationOutcome::QuorumVerified { ref sources } => {
1313                // Sources should include peers that responded Present from
1314                // both quorum and paid targets.
1315                assert!(
1316                    sources.len() >= 4,
1317                    "QuorumVerified sources should contain at least the 4 quorum-positive peers, got {}",
1318                    sources.len()
1319                );
1320                // The sources list is used downstream to authorize
1321                // PaidForList insertion. Verify specific peers are present.
1322                assert!(
1323                    sources.contains(&quorum_peers[0]),
1324                    "source peer 1 should be in sources"
1325                );
1326                assert!(
1327                    sources.contains(&quorum_peers[1]),
1328                    "source peer 2 should be in sources"
1329                );
1330            }
1331            other => panic!("expected QuorumVerified, got {other:?}"),
1332        }
1333    }
1334
1335    /// Scenario 44: Paid-list cold-start recovery via replica majority.
1336    ///
1337    /// Multiple nodes restart simultaneously and lose their `PaidForList`
1338    /// (persistence corrupted). Key `K` still has `>= QuorumNeeded(K)`
1339    /// replicas in the close group. During neighbor-sync verification,
1340    /// presence quorum passes and all verifying nodes re-derive `K` into
1341    /// their `PaidForList` via close-group replica majority (Section 7.2
1342    /// rule 4).
1343    ///
1344    /// This test verifies that when paid-list evidence is entirely
1345    /// `NotFound` (simulating data loss) but presence evidence meets
1346    /// quorum, the outcome is `QuorumVerified` with sources that enable
1347    /// `PaidForList` re-derivation.
1348    #[test]
1349    fn scenario_44_cold_start_recovery_via_replica_majority() {
1350        let key = xor_name_from_byte(0xD9);
1351        let config = ReplicationConfig::default();
1352
1353        // 7 quorum peers, quorum_needed = min(4, floor(7/2)+1) = 4.
1354        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1355        // 10 paid peers (wider group), confirm_needed = floor(10/2)+1 = 6.
1356        let paid_peers: Vec<PeerId> = (10..=19).map(peer_id_from_byte).collect();
1357        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1358
1359        // Cold-start scenario: ALL paid-list entries are lost across every
1360        // peer in PaidCloseGroup. Every paid peer reports NotFound.
1361        let paid_evidence: Vec<(PeerId, PaidListEvidence)> = paid_peers
1362            .iter()
1363            .map(|p| (*p, PaidListEvidence::NotFound))
1364            .collect();
1365
1366        // But the replicas still exist: 5 out of 7 quorum peers report
1367        // Present (>= QuorumNeeded(K) = 4).
1368        let presence_evidence = vec![
1369            (quorum_peers[0], PresenceEvidence::Present),
1370            (quorum_peers[1], PresenceEvidence::Present),
1371            (quorum_peers[2], PresenceEvidence::Present),
1372            (quorum_peers[3], PresenceEvidence::Present),
1373            (quorum_peers[4], PresenceEvidence::Present),
1374            (quorum_peers[5], PresenceEvidence::Absent),
1375            (quorum_peers[6], PresenceEvidence::Absent),
1376        ];
1377
1378        let evidence = build_evidence(presence_evidence, paid_evidence);
1379        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1380
1381        match outcome {
1382            KeyVerificationOutcome::QuorumVerified { ref sources } => {
1383                // Quorum passed despite total paid-list loss. The caller
1384                // re-derives PaidForList from close-group replica majority.
1385                assert!(
1386                    sources.len() >= 4,
1387                    "QuorumVerified should have >= 4 sources (the presence-positive peers), got {}",
1388                    sources.len()
1389                );
1390
1391                // Verify the specific Present peers are in sources.
1392                for (i, peer) in quorum_peers.iter().enumerate().take(5) {
1393                    assert!(
1394                        sources.contains(peer),
1395                        "quorum_peer[{i}] responded Present and should be a fetch source"
1396                    );
1397                }
1398
1399                // Absent peers are NOT sources.
1400                assert!(
1401                    !sources.contains(&quorum_peers[5]),
1402                    "absent peer should not be a fetch source"
1403                );
1404                assert!(
1405                    !sources.contains(&quorum_peers[6]),
1406                    "absent peer should not be a fetch source"
1407                );
1408            }
1409            other => panic!(
1410                "Cold-start recovery should succeed via replica majority \
1411                 (QuorumVerified), got {other:?}"
1412            ),
1413        }
1414    }
1415
1416    /// Scenario 20: Unknown replica key found in local `PaidForList` bypasses
1417    /// presence quorum.
1418    ///
1419    /// When a key's paid-list evidence shows confirmation from enough peers,
1420    /// `PaidListVerified` is returned even without a single presence-positive
1421    /// response.  This models the local-hit fast-path: the caller already
1422    /// checked the local paid list and the network confirms majority — no
1423    /// presence quorum needed.
1424    #[test]
1425    fn scenario_20_paid_list_local_hit_bypasses_presence_quorum() {
1426        let key = xor_name_from_byte(0xE0);
1427        let config = ReplicationConfig::default();
1428
1429        // 7 quorum peers, quorum_needed = 4.
1430        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1431        // 5 paid peers, confirm_needed = floor(5/2)+1 = 3.
1432        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1433        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1434
1435        // ALL quorum peers Absent (presence quorum impossible) but 3/5 paid
1436        // peers confirm → PaidListVerified.
1437        let evidence = build_evidence(
1438            quorum_peers
1439                .iter()
1440                .map(|p| (*p, PresenceEvidence::Absent))
1441                .collect(),
1442            vec![
1443                (paid_peers[0], PaidListEvidence::Confirmed),
1444                (paid_peers[1], PaidListEvidence::Confirmed),
1445                (paid_peers[2], PaidListEvidence::Confirmed),
1446                (paid_peers[3], PaidListEvidence::NotFound),
1447                (paid_peers[4], PaidListEvidence::NotFound),
1448            ],
1449        );
1450
1451        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1452        assert!(
1453            matches!(outcome, KeyVerificationOutcome::PaidListVerified { .. }),
1454            "paid-list majority should bypass failed presence quorum, got {outcome:?}"
1455        );
1456    }
1457
1458    /// Scenario 22: Paid-list confirmation below threshold AND presence quorum
1459    /// fails → `QuorumFailed`.
1460    ///
1461    /// Neither path can succeed: presence peers are all Absent (can't reach
1462    /// `quorum_needed`) and paid confirmations are below `confirm_needed`.
1463    #[test]
1464    fn scenario_22_paid_list_rejection_below_threshold() {
1465        let key = xor_name_from_byte(0xE2);
1466        let config = ReplicationConfig::default();
1467
1468        // 7 quorum peers, quorum_needed = 4.
1469        let quorum_peers: Vec<PeerId> = (1..=7).map(peer_id_from_byte).collect();
1470        // 5 paid peers, confirm_needed = 3.
1471        let paid_peers: Vec<PeerId> = (10..=14).map(peer_id_from_byte).collect();
1472        let targets = single_key_targets(&key, quorum_peers.clone(), paid_peers.clone());
1473
1474        // All quorum peers Absent; only 2/5 paid confirmations (below 3).
1475        let evidence = build_evidence(
1476            quorum_peers
1477                .iter()
1478                .map(|p| (*p, PresenceEvidence::Absent))
1479                .collect(),
1480            vec![
1481                (paid_peers[0], PaidListEvidence::Confirmed),
1482                (paid_peers[1], PaidListEvidence::Confirmed),
1483                (paid_peers[2], PaidListEvidence::NotFound),
1484                (paid_peers[3], PaidListEvidence::NotFound),
1485                (paid_peers[4], PaidListEvidence::NotFound),
1486            ],
1487        );
1488
1489        let outcome = evaluate_key_evidence(&key, &evidence, &targets, &config);
1490        assert!(
1491            matches!(outcome, KeyVerificationOutcome::QuorumFailed),
1492            "below-threshold paid confirmations with all-Absent quorum should yield QuorumFailed, got {outcome:?}"
1493        );
1494    }
1495}