Skip to main content

ant_core/data/client/
quote.rs

1//! Quote and payment operations.
2//!
3//! Handles requesting storage quotes from network nodes and
4//! managing payment for data storage.
5
6use crate::data::client::peer_xor_distance;
7use crate::data::client::Client;
8use crate::data::error::{Error, Result};
9use ant_protocol::evm::{Amount, PaymentQuote};
10use ant_protocol::transport::{DHTNode, MultiAddr, P2PNode, PeerId, WitnessedCloseGroup};
11use ant_protocol::{
12    compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
13    ChunkQuoteRequest, ChunkQuoteResponse, CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE,
14};
15use futures::stream::{FuturesUnordered, StreamExt};
16use std::collections::{HashMap, HashSet};
17use std::sync::Arc;
18use std::time::Duration;
19use tracing::{debug, info, warn};
20
21/// Fault-tolerant quote collection asks one extra close group of peers and
22/// keeps the closest successful `CLOSE_GROUP_SIZE` responders. This remains
23/// useful for merkle preflight probes, but single-node payments deliberately
24/// ask only the actual close group.
25const FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER: usize = 2;
26
27/// Witnessed close-group quorum as a fraction of the initial close group.
28/// For today's `CLOSE_GROUP_SIZE = 7`, this yields the requested 5-of-7
29/// quorum.
30const WITNESSED_QUORUM_NUMERATOR: usize = 2;
31const WITNESSED_QUORUM_DENOMINATOR: usize = 3;
32
33/// Number of closest nodes each initial witnessed responder contributes.
34const SINGLE_NODE_WITNESSED_VIEW_COUNT: usize = 20;
35
36/// Index of the paid median quote after sorting by quoted price.
37const MEDIAN_QUOTE_INDEX: usize = CLOSE_GROUP_SIZE / 2;
38
39/// Overall timeout for collecting quote responses. Must accommodate
40/// connect_with_fallback cascade (direct 5s + hole-punch 15s×3 + relay 30s ≈
41/// 80s) plus the per-peer quote timeout.
42const QUOTE_COLLECTION_TIMEOUT_SECS: u64 = 120;
43
44/// ML-DSA-65 public key length in bytes. Mirrors the same value defined as
45/// `pub const ML_DSA_65_PUBLIC_KEY_SIZE` in `saorsa-pqc::pqc::types`, which
46/// the storer's `peer_id_from_public_key_bytes` enforces. We keep a local
47/// copy here rather than adding a direct `saorsa-pqc` dep — the constant
48/// is FIPS-mandated for ML-DSA-65 and won't change unless we change variant.
49///
50/// TODO: switch to `saorsa_pqc::pqc::types::ML_DSA_65_PUBLIC_KEY_SIZE` once
51/// `ant-protocol` re-exports it (`pqc::ops::ML_DSA_65_PUBLIC_KEY_SIZE`).
52const ML_DSA_PUB_KEY_LEN: usize = 1952;
53
54/// Check that a quote's `pub_key` is well-formed and BLAKE3-hashes to the
55/// claimed `peer_id`.
56///
57/// The storer node enforces both constraints in `ant-node/src/payment/verifier.rs`
58/// (via `peer_id_from_public_key_bytes` and `validate_peer_bindings`): every
59/// quote inside a `ProofOfPayment` must (a) have a 1952-byte `pub_key` parsable
60/// as ML-DSA-65 and (b) satisfy `BLAKE3(pub_key) == peer_id`. A single quote
61/// failing either check causes the storer to reject the entire close-group
62/// proof and burn the chunk's payment.
63///
64/// We mirror the cheap structural check here. The storer also runs
65/// `verify_quote_content` and `verify_quote_signature`; those are ML-DSA
66/// verifications (~1 ms per requested quote) and are deliberately NOT mirrored
67/// on the client to keep upload latency unchanged. They are tracked as a
68/// follow-up if a real attack surfaces them.
69fn quote_binding_is_valid(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
70    if quote.pub_key.len() != ML_DSA_PUB_KEY_LEN {
71        return false;
72    }
73    compute_address(&quote.pub_key) == *peer_id.as_bytes()
74}
75
76/// Classification of a `ChunkQuoteResponse::Success` body for a single peer.
77///
78/// Mirrors the storer-side `validate_peer_bindings` check from
79/// `ant-node/src/payment/verifier.rs` — the cheap BLAKE3 binding —
80/// so we drop misbehaving peers' quotes before payment.
81///
82/// We deliberately do NOT mirror the storer's `verify_quote_signature`
83/// (ML-DSA-65 verify, ~1 ms × CLOSE_GROUP_SIZE × every chunk) or
84/// `verify_quote_content`. Those are useful defense-in-depth for an
85/// attacker who self-consistently crafts a signed-but-stolen or wrong-
86/// content quote, but they are NOT cheap and are out of scope for this
87/// fix. Adding them changes upload latency materially. Track them as a
88/// follow-up if a real attack surfaces them.
89///
90/// Pulling the logic out of the async closure lets us unit-test the
91/// primary defense (not just the post-collect defensive filter).
92///
93/// # Returns
94///
95/// - `Ok((quote, price))` — the response is honoured as a quote.
96/// - `Err(Error::AlreadyStored)` — the peer claims the chunk is already
97///   present AND the quote it provided binds to its peer ID. Vote counts.
98/// - `Err(Error::BadQuoteBinding { .. })` — bad binding (mirrors the
99///   storer-side rejection). Outer collector counts these via the typed
100///   variant (no string matching).
101/// - `Err(Error::Serialization(...))` — the quote bytes did not deserialize.
102fn classify_quote_response(
103    peer_id: &PeerId,
104    quote_bytes: &[u8],
105    already_stored: bool,
106) -> std::result::Result<(PaymentQuote, Amount), Error> {
107    let payment_quote = rmp_serde::from_slice::<PaymentQuote>(quote_bytes).map_err(|e| {
108        Error::Serialization(format!("Failed to deserialize quote from {peer_id}: {e}"))
109    })?;
110
111    // Peer binding: BLAKE3(pub_key) must equal peer_id. This is the
112    // exact mitigation Chris and the AI investigation requested for the
113    // 2026-04-30 production failure: drop crossed-key peers before they
114    // poison the close-group ProofOfPayment.
115    if !quote_binding_is_valid(peer_id, &payment_quote) {
116        let derived = compute_address(&payment_quote.pub_key);
117        warn!(
118            "Dropping response from {peer_id} — quote.pub_key BLAKE3 mismatch \
119             (peer is signing quotes with another peer's key); the storer \
120             would reject this proof"
121        );
122        return Err(Error::BadQuoteBinding {
123            peer_id: peer_id.to_string(),
124            detail: format!(
125                "BLAKE3(pub_key)={} pub_key_len={}",
126                hex::encode(derived),
127                payment_quote.pub_key.len(),
128            ),
129        });
130    }
131
132    if already_stored {
133        debug!("Peer {peer_id} already has chunk");
134        return Err(Error::AlreadyStored);
135    }
136    let price = payment_quote.price;
137    debug!("Received quote from {peer_id}: price = {price}");
138    Ok((payment_quote, price))
139}
140
141/// Drop quotes whose `pub_key` does not BLAKE3-hash to the peer that supplied
142/// them. Logs each dropped quote at WARN.
143fn drop_quotes_with_bad_bindings(
144    quotes: &mut Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>,
145) -> usize {
146    let before = quotes.len();
147    quotes.retain(|(peer_id, _, quote, _)| {
148        if quote_binding_is_valid(peer_id, quote) {
149            true
150        } else {
151            warn!(
152                "Dropping quote from peer {peer_id} — quote.pub_key BLAKE3 mismatch \
153                 (peer is signing quotes with another peer's key); the storer would \
154                 reject this proof"
155            );
156            false
157        }
158    });
159    before - quotes.len()
160}
161
162#[allow(clippy::too_many_arguments)]
163async fn request_store_quote_from_peer(
164    node: Arc<P2PNode>,
165    peer_id: PeerId,
166    peer_addrs: Vec<MultiAddr>,
167    request_id: u64,
168    address: [u8; 32],
169    data_size: u64,
170    data_type: u32,
171    per_peer_timeout: Duration,
172) -> StoreQuoteRequestResult {
173    let request = ChunkQuoteRequest {
174        address,
175        data_size,
176        data_type,
177    };
178    let message = ChunkMessage {
179        request_id,
180        body: ChunkMessageBody::QuoteRequest(request),
181    };
182
183    let message_bytes = match message.encode() {
184        Ok(bytes) => bytes,
185        Err(e) => {
186            return (
187                peer_id,
188                peer_addrs,
189                Err(Error::Protocol(format!(
190                    "Failed to encode quote request for {peer_id}: {e}"
191                ))),
192            );
193        }
194    };
195
196    let result = send_and_await_chunk_response(
197        &node,
198        &peer_id,
199        message_bytes,
200        request_id,
201        per_peer_timeout,
202        &peer_addrs,
203        |body| match body {
204            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
205                quote,
206                already_stored,
207            }) => Some(classify_quote_response(&peer_id, &quote, already_stored)),
208            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err(
209                Error::Protocol(format!("Quote error from {peer_id}: {e}")),
210            )),
211            _ => None,
212        },
213        |e| Error::Network(format!("Failed to send quote request to {peer_id}: {e}")),
214        || Error::Timeout(format!("Timeout waiting for quote from {peer_id}")),
215    )
216    .await;
217
218    (peer_id, peer_addrs, result)
219}
220
221#[allow(clippy::too_many_arguments)]
222fn record_store_quote_result(
223    peer_id: PeerId,
224    addrs: Vec<MultiAddr>,
225    quote_result: Result<(PaymentQuote, Amount)>,
226    address: &[u8; 32],
227    quotes: &mut Vec<StoreQuote>,
228    already_stored_peers: &mut Vec<(PeerId, [u8; 32])>,
229    failures: &mut Vec<String>,
230    bad_quote_count: &mut usize,
231) {
232    match quote_result {
233        Ok((quote, price)) => {
234            quotes.push((peer_id, addrs, quote, price));
235        }
236        Err(Error::AlreadyStored) => {
237            info!("Peer {peer_id} reports chunk already stored");
238            let dist = peer_xor_distance(&peer_id, address);
239            already_stored_peers.push((peer_id, dist));
240        }
241        Err(e) => {
242            if matches!(&e, Error::BadQuoteBinding { .. }) {
243                *bad_quote_count += 1;
244            }
245            warn!("Failed to get quote from {peer_id}: {e}");
246            failures.push(format!("{peer_id}: {e}"));
247        }
248    }
249}
250
251fn witnessed_quote_launch_budget(
252    successful_quotes: usize,
253    in_flight: usize,
254    remaining_peers: usize,
255) -> usize {
256    CLOSE_GROUP_SIZE
257        .saturating_sub(successful_quotes.saturating_add(in_flight))
258        .min(remaining_peers)
259}
260
261fn single_node_quote_query_count() -> usize {
262    CLOSE_GROUP_SIZE
263}
264
265fn fault_tolerant_quote_query_count() -> usize {
266    CLOSE_GROUP_SIZE * FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER
267}
268
269fn witnessed_close_group_quorum() -> usize {
270    (CLOSE_GROUP_SIZE * WITNESSED_QUORUM_NUMERATOR).div_ceil(WITNESSED_QUORUM_DENOMINATOR)
271}
272
273fn witnessed_close_group_quorum_for_missing_views(missing_views: usize) -> usize {
274    witnessed_close_group_quorum()
275        .saturating_sub(missing_views)
276        .max(1)
277}
278
279fn missing_witnessed_responder_views(witnessed: &WitnessedCloseGroup) -> usize {
280    witnessed
281        .initial_closest
282        .len()
283        .saturating_sub(witnessed.responder_views.len())
284}
285
286fn witnessed_close_group_quorum_for_transcript(witnessed: &WitnessedCloseGroup) -> usize {
287    witnessed_close_group_quorum_for_missing_views(missing_witnessed_responder_views(witnessed))
288}
289
290fn peer_list(peers: &[PeerId]) -> Vec<String> {
291    peers.iter().map(ToString::to_string).collect()
292}
293
294pub(crate) type StoreQuote = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
295type StoreQuoteRequestResult = (PeerId, Vec<MultiAddr>, Result<(PaymentQuote, Amount)>);
296type VotersByPeer = HashMap<PeerId, HashSet<PeerId>>;
297type WitnessedVoteData = (HashMap<PeerId, DHTNode>, VotersByPeer, Vec<(PeerId, usize)>);
298
299pub(crate) struct StoreQuotePlan {
300    pub(crate) quotes: Vec<StoreQuote>,
301    pub(crate) put_peers: Vec<(PeerId, Vec<MultiAddr>)>,
302}
303
304#[derive(Debug, Clone)]
305struct WitnessedQuoteCandidate {
306    node: DHTNode,
307    votes: usize,
308    voters: HashSet<PeerId>,
309}
310
311#[derive(Debug, Clone)]
312struct WitnessedQuotePeer {
313    peer_id: PeerId,
314    addrs: Vec<MultiAddr>,
315    voters: HashSet<PeerId>,
316}
317
318#[derive(Debug, Clone)]
319struct WitnessedQuoteSelection {
320    quote_peers: Vec<WitnessedQuotePeer>,
321    initial_put_peers: Vec<(PeerId, Vec<MultiAddr>)>,
322    quorum: usize,
323}
324
325enum QuoteSelectionPolicy {
326    ClosestByDistance,
327    WitnessedMedianVoters {
328        voters_by_peer: VotersByPeer,
329        quorum: usize,
330    },
331}
332
333fn witnessed_initial_peers(witnessed: &WitnessedCloseGroup) -> Vec<String> {
334    witnessed
335        .initial_closest
336        .iter()
337        .map(|node| node.peer_id.to_string())
338        .collect()
339}
340
341fn witnessed_responder_views(witnessed: &WitnessedCloseGroup) -> Vec<String> {
342    witnessed
343        .responder_views
344        .iter()
345        .map(|view| {
346            let peers = view
347                .closest
348                .iter()
349                .map(|node| node.peer_id)
350                .collect::<Vec<_>>();
351            format!("{}=>{:?}", view.responder, peer_list(&peers))
352        })
353        .collect()
354}
355
356fn merge_witnessed_node(nodes: &mut HashMap<PeerId, DHTNode>, node: DHTNode) {
357    match nodes.entry(node.peer_id) {
358        std::collections::hash_map::Entry::Occupied(mut entry) => {
359            entry.get_mut().merge_from(node);
360        }
361        std::collections::hash_map::Entry::Vacant(entry) => {
362            entry.insert(node);
363        }
364    }
365}
366
367fn sort_vote_counts_by_distance(vote_counts: &mut [(PeerId, usize)], address: &[u8; 32]) {
368    vote_counts.sort_by(|left, right| {
369        peer_xor_distance(&left.0, address)
370            .cmp(&peer_xor_distance(&right.0, address))
371            .then_with(|| left.0.as_bytes().cmp(right.0.as_bytes()))
372    });
373}
374
375fn witnessed_vote_counts_and_nodes(
376    witnessed: &WitnessedCloseGroup,
377    address: &[u8; 32],
378) -> WitnessedVoteData {
379    let mut known_nodes = HashMap::new();
380    for node in &witnessed.initial_closest {
381        merge_witnessed_node(&mut known_nodes, node.clone());
382    }
383
384    let mut voters_by_peer: HashMap<PeerId, HashSet<PeerId>> = HashMap::new();
385    for view in &witnessed.responder_views {
386        let mut voted = HashSet::new();
387        for node in &view.closest {
388            merge_witnessed_node(&mut known_nodes, node.clone());
389            if voted.insert(node.peer_id) {
390                voters_by_peer
391                    .entry(node.peer_id)
392                    .or_default()
393                    .insert(view.responder);
394            }
395        }
396    }
397
398    let mut vote_counts: Vec<(PeerId, usize)> = voters_by_peer
399        .iter()
400        .map(|(peer_id, voters)| (*peer_id, voters.len()))
401        .collect();
402    sort_vote_counts_by_distance(&mut vote_counts, address);
403    (known_nodes, voters_by_peer, vote_counts)
404}
405
406fn witnessed_consensus_candidates(
407    witnessed: &WitnessedCloseGroup,
408    address: &[u8; 32],
409    quorum: usize,
410) -> Vec<WitnessedQuoteCandidate> {
411    let (known_nodes, voters_by_peer, vote_counts) =
412        witnessed_vote_counts_and_nodes(witnessed, address);
413    let mut candidates = vote_counts
414        .iter()
415        .filter_map(|(peer_id, votes)| {
416            if *votes < quorum {
417                return None;
418            }
419            known_nodes.get(peer_id).cloned().and_then(|node| {
420                voters_by_peer
421                    .get(peer_id)
422                    .cloned()
423                    .map(|voters| WitnessedQuoteCandidate {
424                        node,
425                        votes: *votes,
426                        voters,
427                    })
428            })
429        })
430        .collect::<Vec<_>>();
431
432    candidates.sort_by(|left, right| {
433        peer_xor_distance(&left.node.peer_id, address)
434            .cmp(&peer_xor_distance(&right.node.peer_id, address))
435            .then_with(|| right.votes.cmp(&left.votes))
436            .then_with(|| {
437                left.node
438                    .peer_id
439                    .as_bytes()
440                    .cmp(right.node.peer_id.as_bytes())
441            })
442    });
443    candidates
444}
445
446fn witnessed_vote_counts(witnessed: &WitnessedCloseGroup, address: &[u8; 32]) -> Vec<String> {
447    let (_, _, vote_counts) = witnessed_vote_counts_and_nodes(witnessed, address);
448    vote_counts
449        .iter()
450        .map(|(peer_id, votes)| format!("{peer_id}:{votes}"))
451        .collect()
452}
453
454fn witnessed_consensus(
455    witnessed: &WitnessedCloseGroup,
456    address: &[u8; 32],
457    quorum: usize,
458) -> Vec<String> {
459    witnessed_consensus_candidates(witnessed, address, quorum)
460        .iter()
461        .map(|candidate| format!("{}:{}", candidate.node.peer_id, candidate.votes))
462        .collect()
463}
464
465fn witnessed_close_group_diagnostics(
466    address: &[u8; 32],
467    witnessed: &WitnessedCloseGroup,
468    quorum: usize,
469) -> String {
470    format!(
471        "target={}, initial={:?}, responder_views={:?}, vote_counts={:?}, quorum={}, final={:?}",
472        hex::encode(address),
473        witnessed_initial_peers(witnessed),
474        witnessed_responder_views(witnessed),
475        witnessed_vote_counts(witnessed, address),
476        quorum,
477        witnessed_consensus(witnessed, address, quorum)
478    )
479}
480
481fn witnessed_quote_selection_or_error(
482    address: &[u8; 32],
483    witnessed: &WitnessedCloseGroup,
484    required: usize,
485    quorum: usize,
486) -> Result<WitnessedQuoteSelection> {
487    let candidates = witnessed_consensus_candidates(witnessed, address, quorum);
488    if candidates.len() < required {
489        return Err(Error::InsufficientPeers(format!(
490            "Witnessed close group inconclusive before payment: got {}/{} quorum-recognised peers. {}",
491            candidates.len(),
492            required,
493            witnessed_close_group_diagnostics(address, witnessed, quorum)
494        )));
495    }
496
497    let initial_put_peers = witnessed
498        .initial_closest
499        .iter()
500        .take(CLOSE_GROUP_SIZE)
501        .map(|node| (node.peer_id, node.addresses_by_priority()))
502        .collect::<Vec<_>>();
503
504    if initial_put_peers.len() < CLOSE_GROUP_SIZE {
505        return Err(Error::InsufficientPeers(format!(
506            "Witnessed close group returned only {}/{} initial PUT peers before payment. {}",
507            initial_put_peers.len(),
508            CLOSE_GROUP_SIZE,
509            witnessed_close_group_diagnostics(address, witnessed, quorum)
510        )));
511    }
512
513    let quote_peers = candidates
514        .into_iter()
515        .map(|candidate| WitnessedQuotePeer {
516            peer_id: candidate.node.peer_id,
517            addrs: candidate.node.addresses_by_priority(),
518            voters: candidate.voters,
519        })
520        .collect();
521
522    Ok(WitnessedQuoteSelection {
523        quote_peers,
524        initial_put_peers,
525        quorum,
526    })
527}
528
529pub(crate) fn median_paid_quote_issuer(
530    quotes: &[(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)],
531) -> Option<(PeerId, Amount)> {
532    if quotes.len() <= MEDIAN_QUOTE_INDEX {
533        return None;
534    }
535
536    let mut by_price: Vec<(usize, PeerId, Amount)> = quotes
537        .iter()
538        .enumerate()
539        .map(|(index, (peer_id, _, _, price))| (index, *peer_id, *price))
540        .collect();
541    by_price.sort_by_key(|(index, _, price)| (*price, *index));
542    by_price
543        .get(MEDIAN_QUOTE_INDEX)
544        .map(|(_, peer_id, price)| (*peer_id, *price))
545}
546
547fn sort_quotes_by_distance(quotes: &mut [StoreQuote], address: &[u8; 32]) {
548    quotes.sort_by(|left, right| {
549        peer_xor_distance(&left.0, address)
550            .cmp(&peer_xor_distance(&right.0, address))
551            .then_with(|| left.0.as_bytes().cmp(right.0.as_bytes()))
552    });
553}
554
555fn median_paid_quote_issuer_for_indices(
556    quotes: &[StoreQuote],
557    indices: &[usize],
558) -> Option<(PeerId, Amount)> {
559    if indices.len() <= MEDIAN_QUOTE_INDEX {
560        return None;
561    }
562
563    let mut by_price: Vec<(usize, PeerId, Amount)> = indices
564        .iter()
565        .enumerate()
566        .map(|(selected_index, quote_index)| {
567            let (peer_id, _, _, price) = &quotes[*quote_index];
568            (selected_index, *peer_id, *price)
569        })
570        .collect();
571    by_price.sort_by_key(|(selected_index, _, price)| (*price, *selected_index));
572    by_price
573        .get(MEDIAN_QUOTE_INDEX)
574        .map(|(_, peer_id, price)| (*peer_id, *price))
575}
576
577fn median_issuer_voter_support(
578    quotes: &[StoreQuote],
579    indices: &[usize],
580    voters_by_peer: &VotersByPeer,
581) -> Option<(PeerId, usize)> {
582    let (median_peer_id, _) = median_paid_quote_issuer_for_indices(quotes, indices)?;
583    let voters = voters_by_peer.get(&median_peer_id)?;
584    Some((median_peer_id, voters.len()))
585}
586
/// Calls `visit` once for every `subset_size`-element subset of the indices
/// `0..quote_count`, in lexicographic order.
///
/// `current` is the working prefix: start with an empty vector and
/// `start_index = 0`. Each visited slice is strictly ascending, and
/// `current` is restored to its initial contents before the call returns.
///
/// Nothing is visited when no subset can be completed, i.e. when more
/// elements are still needed than `quote_count` can supply, or when
/// `current` already holds more than `subset_size` entries. Checked
/// subtraction keeps both cases from underflowing.
fn visit_quote_subsets<F>(
    quote_count: usize,
    subset_size: usize,
    start_index: usize,
    current: &mut Vec<usize>,
    visit: &mut F,
) where
    F: FnMut(&[usize]),
{
    let Some(remaining) = subset_size.checked_sub(current.len()) else {
        return;
    };
    if remaining == 0 {
        visit(current);
        return;
    }

    // The next element may start no later than `quote_count - remaining`,
    // otherwise there would not be enough indices left to finish the subset.
    let Some(last_start) = quote_count.checked_sub(remaining) else {
        return;
    };
    for index in start_index..=last_start {
        current.push(index);
        visit_quote_subsets(quote_count, subset_size, index + 1, current, visit);
        current.pop();
    }
}
609
610fn select_closest_quotes(mut quotes: Vec<StoreQuote>, address: &[u8; 32]) -> Vec<StoreQuote> {
611    sort_quotes_by_distance(&mut quotes, address);
612    quotes.truncate(CLOSE_GROUP_SIZE);
613    quotes
614}
615
616fn select_witnessed_median_voter_quotes(
617    mut quotes: Vec<StoreQuote>,
618    address: &[u8; 32],
619    voters_by_peer: &VotersByPeer,
620    required_support: usize,
621) -> Option<Vec<StoreQuote>> {
622    if quotes.len() < CLOSE_GROUP_SIZE {
623        return None;
624    }
625
626    sort_quotes_by_distance(&mut quotes, address);
627
628    let mut best_indices: Option<(usize, Vec<usize>)> = None;
629    let mut current_indices = Vec::with_capacity(CLOSE_GROUP_SIZE);
630    visit_quote_subsets(
631        quotes.len(),
632        CLOSE_GROUP_SIZE,
633        0,
634        &mut current_indices,
635        &mut |indices| {
636            let Some((_, support)) = median_issuer_voter_support(&quotes, indices, voters_by_peer)
637            else {
638                return;
639            };
640            if support < required_support {
641                return;
642            }
643            match &best_indices {
644                Some((best_support, best)) if *best_support > support => {}
645                Some((best_support, best))
646                    if *best_support == support && best.as_slice() <= indices => {}
647                _ => best_indices = Some((support, indices.to_vec())),
648            }
649        },
650    );
651
652    best_indices.map(|(_, indices)| {
653        indices
654            .into_iter()
655            .map(|index| quotes[index].clone())
656            .collect()
657    })
658}
659
660fn put_peers_with_median_voters_first(
661    quotes: &[StoreQuote],
662    put_peers: &[(PeerId, Vec<MultiAddr>)],
663    voters_by_peer: &VotersByPeer,
664    required_support: usize,
665) -> Option<Vec<(PeerId, Vec<MultiAddr>)>> {
666    let (median_peer_id, _) = median_paid_quote_issuer(quotes)?;
667    let voters = voters_by_peer.get(&median_peer_id)?;
668
669    let mut supporting_peers = Vec::new();
670    let mut fallback_peers = Vec::new();
671    for (peer_id, addrs) in put_peers {
672        let peer = (*peer_id, addrs.clone());
673        if voters.contains(peer_id) {
674            supporting_peers.push(peer);
675        } else {
676            fallback_peers.push(peer);
677        }
678    }
679
680    if supporting_peers.len() < required_support {
681        return None;
682    }
683
684    supporting_peers.extend(fallback_peers);
685    Some(supporting_peers)
686}
687
688impl Client {
689    /// Get storage quotes from the closest peers for a given address.
690    ///
691    /// Builds a quorum-witnessed candidate set with at least
692    /// `CLOSE_GROUP_SIZE` peers, requests quotes from all of them concurrently,
693    /// and returns the closest supported `CLOSE_GROUP_SIZE` successful
694    /// responders. When multiple sets are possible, the client prefers the
695    /// one with the strongest paid-median voter support, then the closest
696    /// peers by XOR distance.
697    ///
698    /// Returns `Error::AlreadyStored` early if `CLOSE_GROUP_MAJORITY` peers
699    /// report the chunk is already stored.
700    ///
701    /// # Errors
702    ///
703    /// Returns an error if insufficient quotes can be collected.
704    pub async fn get_store_quotes(
705        &self,
706        address: &[u8; 32],
707        data_size: u64,
708        data_type: u32,
709    ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
710        Ok(self
711            .get_store_quote_plan(address, data_size, data_type)
712            .await?
713            .quotes)
714    }
715
716    /// Get storage quotes plus PUT targets ordered for paid-median acceptance.
717    ///
718    /// Quote order is preserved for proof construction because tied quote
719    /// prices rely on stable median selection. PUT target order is separate:
720    /// peers that voted for the paid median issuer are placed first so the
721    /// initial write wave is locally acceptable to a storage majority.
722    pub(crate) async fn get_store_quote_plan(
723        &self,
724        address: &[u8; 32],
725        data_size: u64,
726        data_type: u32,
727    ) -> Result<StoreQuotePlan> {
728        let witnessed_selection = self.select_witnessed_quote_selection(address).await?;
729        let voters_by_peer: VotersByPeer = witnessed_selection
730            .quote_peers
731            .iter()
732            .map(|peer| (peer.peer_id, peer.voters.clone()))
733            .collect();
734        let remote_peers = witnessed_selection
735            .quote_peers
736            .into_iter()
737            .map(|peer| (peer.peer_id, peer.addrs))
738            .collect();
739        let initial_put_peers = witnessed_selection.initial_put_peers;
740        let quorum = witnessed_selection.quorum;
741        let quotes = self
742            .collect_store_quotes_from_remote_peers(
743                address,
744                data_size,
745                data_type,
746                remote_peers,
747                QuoteSelectionPolicy::WitnessedMedianVoters {
748                    voters_by_peer: voters_by_peer.clone(),
749                    quorum,
750                },
751            )
752            .await?;
753        let put_peers = put_peers_with_median_voters_first(
754            &quotes,
755            &initial_put_peers,
756            &voters_by_peer,
757            quorum,
758        )
759        .ok_or_else(|| {
760            Error::InsufficientPeers(format!(
761                "Collected {} witnessed quotes, but fewer than {} initial witness PUT peers \
762                 voted for the paid median issuer for {}",
763                quotes.len(),
764                quorum,
765                hex::encode(address)
766            ))
767        })?;
768
769        Ok(StoreQuotePlan { quotes, put_peers })
770    }
771
772    /// Get storage quotes with the previous over-query behaviour.
773    ///
774    /// Merkle preflight uses quote responses only as an already-stored probe;
775    /// the actual payment still happens through merkle candidate pools. Keep
776    /// the extra peer buffer there so merkle upload behaviour remains
777    /// unchanged when a few peers are slow or return unusable quote bindings.
778    pub(crate) async fn get_store_quotes_with_fault_tolerance(
779        &self,
780        address: &[u8; 32],
781        data_size: u64,
782        data_type: u32,
783    ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
784        let peer_query_count = fault_tolerant_quote_query_count();
785        let remote_peers = self
786            .network()
787            .find_closest_peers(address, peer_query_count)
788            .await?;
789
790        self.collect_store_quotes_from_remote_peers(
791            address,
792            data_size,
793            data_type,
794            remote_peers,
795            QuoteSelectionPolicy::ClosestByDistance,
796        )
797        .await
798    }
799
800    async fn select_witnessed_quote_selection(
801        &self,
802        address: &[u8; 32],
803    ) -> Result<WitnessedQuoteSelection> {
804        let required = single_node_quote_query_count();
805        let witnessed = self
806            .network()
807            .find_witnessed_close_group_with_view_count(
808                address,
809                required,
810                SINGLE_NODE_WITNESSED_VIEW_COUNT,
811            )
812            .await
813            .map_err(|e| {
814                Error::InsufficientPeers(format!(
815                    "Witnessed close group lookup failed before payment for target {}: {e}",
816                    hex::encode(address)
817                ))
818            })?;
819        let base_quorum = witnessed_close_group_quorum();
820        let missing_views = missing_witnessed_responder_views(&witnessed);
821        let quorum = witnessed_close_group_quorum_for_transcript(&witnessed);
822
823        if missing_views > 0 {
824            warn!(
825                target = %hex::encode(address),
826                initial = witnessed.initial_closest.len(),
827                responder_views = witnessed.responder_views.len(),
828                missing_views = missing_views,
829                base_quorum = base_quorum,
830                adjusted_quorum = quorum,
831                "Witnessed close group transcript is missing responder views; lowering SNP witness quorum"
832            );
833        }
834
835        debug!(
836            target = %hex::encode(address),
837            quorum = quorum,
838            view_count = SINGLE_NODE_WITNESSED_VIEW_COUNT,
839            initial = ?witnessed_initial_peers(&witnessed),
840            responder_views = ?witnessed_responder_views(&witnessed),
841            vote_counts = ?witnessed_vote_counts(&witnessed, address),
842            final_witnessed_set = ?witnessed_consensus(&witnessed, address, quorum),
843            "Witnessed close group selected for SNP quote collection"
844        );
845
846        witnessed_quote_selection_or_error(address, &witnessed, required, quorum)
847    }
848
849    #[allow(clippy::too_many_lines)]
850    async fn collect_store_quotes_from_remote_peers(
851        &self,
852        address: &[u8; 32],
853        data_size: u64,
854        data_type: u32,
855        remote_peers: Vec<(PeerId, Vec<MultiAddr>)>,
856        quote_selection_policy: QuoteSelectionPolicy,
857    ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
858        let peer_query_count = remote_peers.len();
859
860        let node = self.network().node();
861
862        debug!(
863            "Requesting quotes from up to {peer_query_count} peers for address {} (size: {data_size})",
864            hex::encode(address)
865        );
866
867        if remote_peers.len() < CLOSE_GROUP_SIZE {
868            return Err(Error::InsufficientPeers(format!(
869                "Found {} peers, need {CLOSE_GROUP_SIZE}",
870                remote_peers.len()
871            )));
872        }
873        debug_assert!(peer_query_count >= CLOSE_GROUP_SIZE);
874
875        let per_peer_timeout = Duration::from_secs(self.config().quote_timeout_secs);
876        let overall_timeout = Duration::from_secs(QUOTE_COLLECTION_TIMEOUT_SECS);
877
878        // Collect quote responses. SNP/witnessed collection deliberately tries
879        // the closest witnessed peers first and only falls back to further
880        // witnessed peers when a closer peer fails to produce a usable quote.
881        let mut quotes = Vec::with_capacity(peer_query_count);
882        let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new();
883        let mut failures: Vec<String> = Vec::new();
884
885        // Track storer-rejecting peers separately (binding, content, signature
886        // failures) so we can surface their count in diagnostics — they're a
887        // special class of failure (peer misconfigured or hostile, not
888        // network-broken) and the user benefits from seeing them called out.
889        let mut bad_quote_count = 0usize;
890
891        let staged_witnessed_collection = matches!(
892            &quote_selection_policy,
893            QuoteSelectionPolicy::WitnessedMedianVoters { .. }
894        );
895
896        if staged_witnessed_collection {
897            let mut quote_futures = FuturesUnordered::new();
898            let mut next_peer_index = 0usize;
899            let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
900                tokio::time::timeout(overall_timeout, async {
901                    loop {
902                        let launch_count = witnessed_quote_launch_budget(
903                            quotes.len(),
904                            quote_futures.len(),
905                            remote_peers.len().saturating_sub(next_peer_index),
906                        );
907                        for _ in 0..launch_count {
908                            let (peer_id, peer_addrs) = &remote_peers[next_peer_index];
909                            next_peer_index += 1;
910                            quote_futures.push(request_store_quote_from_peer(
911                                node.clone(),
912                                *peer_id,
913                                peer_addrs.clone(),
914                                self.next_request_id(),
915                                *address,
916                                data_size,
917                                data_type,
918                                per_peer_timeout,
919                            ));
920                        }
921
922                        if quotes.len() >= CLOSE_GROUP_SIZE || quote_futures.is_empty() {
923                            break;
924                        }
925
926                        let Some((peer_id, addrs, quote_result)) = quote_futures.next().await
927                        else {
928                            break;
929                        };
930                        record_store_quote_result(
931                            peer_id,
932                            addrs,
933                            quote_result,
934                            address,
935                            &mut quotes,
936                            &mut already_stored_peers,
937                            &mut failures,
938                            &mut bad_quote_count,
939                        );
940                    }
941                    Ok(())
942                })
943                .await;
944
945            match collect_result {
946                Err(_elapsed) => {
947                    warn!(
948                        "Quote collection timed out after {overall_timeout:?} for address {}",
949                        hex::encode(address)
950                    );
951                }
952                Ok(Err(e)) => return Err(e),
953                Ok(Ok(())) => {}
954            }
955        } else {
956            // Merkle preflight keeps the previous behaviour: query the full
957            // over-query set concurrently because those quote responses are
958            // only used as an already-stored probe.
959            let mut quote_futures = FuturesUnordered::new();
960
961            for (peer_id, peer_addrs) in &remote_peers {
962                quote_futures.push(request_store_quote_from_peer(
963                    node.clone(),
964                    *peer_id,
965                    peer_addrs.clone(),
966                    self.next_request_id(),
967                    *address,
968                    data_size,
969                    data_type,
970                    per_peer_timeout,
971                ));
972            }
973
974            let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
975                tokio::time::timeout(overall_timeout, async {
976                    while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
977                        record_store_quote_result(
978                            peer_id,
979                            addrs,
980                            quote_result,
981                            address,
982                            &mut quotes,
983                            &mut already_stored_peers,
984                            &mut failures,
985                            &mut bad_quote_count,
986                        );
987                    }
988                    Ok(())
989                })
990                .await;
991
992            match collect_result {
993                Err(_elapsed) => {
994                    warn!(
995                        "Quote collection timed out after {overall_timeout:?} for address {}",
996                        hex::encode(address)
997                    );
998                    // Fall through to check if we have enough quotes despite timeout.
999                    // The timeout fires when slow peers haven't responded yet, but we
1000                    // may already have enough successful quotes from fast peers.
1001                }
1002                Ok(Err(e)) => return Err(e),
1003                Ok(Ok(())) => {}
1004            }
1005        }
1006
1007        // Defensive double-check: the per-peer handler already filters
1008        // bad-binding responses into `failures`, but if any path slipped a bad
1009        // quote into `quotes` (e.g. a future refactor) this catches it before
1010        // we sort by distance and return. `bad_dropped` should be 0 in normal
1011        // operation; non-zero indicates an upstream regression worth investigating.
1012        let bad_dropped = drop_quotes_with_bad_bindings(&mut quotes);
1013        if bad_dropped > 0 {
1014            warn!(
1015                "Defensive filter dropped {bad_dropped} quotes with mismatched peer bindings \
1016                 for address {} — the per-peer handler should have caught these earlier \
1017                 (this indicates an upstream regression)",
1018                hex::encode(address),
1019            );
1020            bad_quote_count += bad_dropped;
1021        }
1022
1023        // Check already-stored: only count votes from the closest CLOSE_GROUP_SIZE peers.
1024        if !already_stored_peers.is_empty() {
1025            let mut all_peers_by_distance: Vec<(bool, [u8; 32])> = Vec::new();
1026            for (peer_id, _, _, _) in &quotes {
1027                all_peers_by_distance.push((false, peer_xor_distance(peer_id, address)));
1028            }
1029            for (_, dist) in &already_stored_peers {
1030                all_peers_by_distance.push((true, *dist));
1031            }
1032            all_peers_by_distance.sort_by_key(|a| a.1);
1033
1034            let close_group_stored = all_peers_by_distance
1035                .iter()
1036                .take(CLOSE_GROUP_SIZE)
1037                .filter(|(is_stored, _)| *is_stored)
1038                .count();
1039
1040            if close_group_stored >= CLOSE_GROUP_MAJORITY {
1041                debug!(
1042                    "Chunk {} already stored ({close_group_stored}/{CLOSE_GROUP_SIZE} close-group peers confirm)",
1043                    hex::encode(address)
1044                );
1045                return Err(Error::AlreadyStored);
1046            }
1047        }
1048
1049        let already_stored_count = already_stored_peers.len();
1050        let failure_count = failures.len();
1051        let quote_count = quotes.len();
1052        let total_responses = quote_count + failure_count + already_stored_count;
1053
1054        if quotes.len() >= CLOSE_GROUP_SIZE {
1055            let selected_quotes = match quote_selection_policy {
1056                QuoteSelectionPolicy::ClosestByDistance => select_closest_quotes(quotes, address),
1057                QuoteSelectionPolicy::WitnessedMedianVoters {
1058                    voters_by_peer,
1059                    quorum,
1060                } => select_witnessed_median_voter_quotes(quotes, address, &voters_by_peer, quorum)
1061                    .ok_or_else(|| {
1062                        Error::InsufficientPeers(format!(
1063                            "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} whose paid \
1064                                 median issuer is recognised by at least {} \
1065                                 selected witness peers ({total_responses} responses: \
1066                                 {already_stored_count} already_stored, {failure_count} failed \
1067                                 including {bad_quote_count} with mismatched peer bindings). \
1068                                 Failures: [{}]",
1069                            quorum,
1070                            failures.join("; ")
1071                        ))
1072                    })?,
1073            };
1074
1075            info!(
1076                "Collected {} quotes for address {} ({total_responses} responses: \
1077                 {quote_count} ok, {already_stored_count} already_stored, {failure_count} failed, \
1078                 {bad_quote_count} bad-binding)",
1079                selected_quotes.len(),
1080                hex::encode(address),
1081            );
1082            return Ok(selected_quotes);
1083        }
1084
1085        Err(Error::InsufficientPeers(format!(
1086            "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} ({total_responses} responses: \
1087             {already_stored_count} already_stored, {failure_count} failed including \
1088             {bad_quote_count} with mismatched peer bindings). Failures: [{}]",
1089            failures.join("; ")
1090        )))
1091    }
1092}
1093
1094#[cfg(test)]
1095#[allow(clippy::unwrap_used, clippy::expect_used)]
1096mod tests {
1097    //! Test fixtures use real ML-DSA-65 keypairs (1952-byte public keys), the
1098    //! same key material that ships on the wire. The "bad" quote is built by
1099    //! **swapping** the public key field with a different real keypair's
1100    //! public key — the exact shape produced by the Apr 30 production
1101    //! failure (an operator running two co-located identities with crossed
1102    //! quote-signing keys). Signatures are not exercised here because this
1103    //! filter only mirrors `validate_peer_bindings` (BLAKE3 binding); see
1104    //! the doc-comment on `quote_binding_is_valid` for why
1105    //! `verify_quote_signature` and `verify_quote_content` are deliberately
1106    //! NOT mirrored.
1107
1108    use super::*;
1109    use ant_protocol::evm::RewardsAddress;
1110    use ant_protocol::pqc::ops::{MlDsaOperations, MlDsaPublicKey};
1111    use ant_protocol::transport::{DHTNode, MlDsa65, ResponderView, WitnessedCloseGroup};
1112    use std::time::SystemTime;
1113    use xor_name::XorName;
1114
    /// A real ML-DSA-65 keypair plus its derived peer ID.
    ///
    /// Only the public half is kept; `gen_keypair` discards the secret key.
    struct Keypair {
        /// Peer ID computed from `pub_key_bytes` via `compute_address`.
        peer_id: PeerId,
        /// Raw bytes of the ML-DSA-65 public key.
        pub_key_bytes: Vec<u8>,
    }
1120
1121    fn gen_keypair() -> Keypair {
1122        let ml_dsa = MlDsa65::new();
1123        let (pub_key, _sk) = ml_dsa.generate_keypair().expect("ML-DSA-65 keygen");
1124        let pub_key_bytes = pub_key.as_bytes().to_vec();
1125        let peer_id = PeerId::from_bytes(compute_address(&pub_key_bytes));
1126        Keypair {
1127            peer_id,
1128            pub_key_bytes,
1129        }
1130    }
1131
1132    /// Build a quote tuple whose `pub_key` correctly hashes to its peer_id.
1133    /// Signature is left empty: this filter does not verify signatures.
1134    fn good_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
1135        let kp = gen_keypair();
1136        let quote = PaymentQuote {
1137            content: XorName([0u8; 32]),
1138            timestamp: SystemTime::UNIX_EPOCH,
1139            price: Amount::ZERO,
1140            rewards_address: RewardsAddress::new([0u8; 20]),
1141            pub_key: kp.pub_key_bytes,
1142            signature: Vec::new(),
1143        };
1144        (kp.peer_id, Vec::new(), quote, Amount::ZERO)
1145    }
1146
1147    /// Build a quote tuple where the quote carries a different keypair's
1148    /// `pub_key` than the peer_id derives from. Mirrors the production
1149    /// failure shape: peer A advertised on the transport, but the quote
1150    /// carries peer B's key.
1151    fn bad_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
1152        let claimed = gen_keypair();
1153        let signing = gen_keypair();
1154        assert_ne!(claimed.pub_key_bytes, signing.pub_key_bytes);
1155        assert_ne!(claimed.peer_id.as_bytes(), signing.peer_id.as_bytes());
1156        let quote = PaymentQuote {
1157            content: XorName([0u8; 32]),
1158            timestamp: SystemTime::UNIX_EPOCH,
1159            price: Amount::ZERO,
1160            rewards_address: RewardsAddress::new([0u8; 20]),
1161            pub_key: signing.pub_key_bytes,
1162            signature: Vec::new(),
1163        };
1164        (claimed.peer_id, Vec::new(), quote, Amount::ZERO)
1165    }
1166
1167    fn witnessed_test_node(seed: u8) -> DHTNode {
1168        DHTNode {
1169            peer_id: PeerId::from_bytes([seed; 32]),
1170            addresses: Vec::new(),
1171            address_types: Vec::new(),
1172            distance: None,
1173            reliability: 1.0,
1174        }
1175    }
1176
1177    fn witnessed_test_nodes(seeds: &[u8]) -> Vec<DHTNode> {
1178        seeds.iter().copied().map(witnessed_test_node).collect()
1179    }
1180
1181    fn witnessed_test_view(responder: u8, closest: &[u8]) -> ResponderView {
1182        ResponderView {
1183            responder: PeerId::from_bytes([responder; 32]),
1184            closest: witnessed_test_nodes(closest),
1185        }
1186    }
1187
1188    fn synthetic_peer(seed: u8) -> PeerId {
1189        PeerId::from_bytes([seed; 32])
1190    }
1191
1192    fn synthetic_quote(seed: u8, price: u64) -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
1193        let amount = Amount::from(price);
1194        let quote = PaymentQuote {
1195            content: XorName([0u8; 32]),
1196            timestamp: SystemTime::UNIX_EPOCH,
1197            price: amount,
1198            rewards_address: RewardsAddress::new([0u8; 20]),
1199            pub_key: Vec::new(),
1200            signature: Vec::new(),
1201        };
1202        (synthetic_peer(seed), Vec::new(), quote, amount)
1203    }
1204
1205    fn synthetic_voters(seeds: &[u8]) -> HashSet<PeerId> {
1206        seeds.iter().copied().map(synthetic_peer).collect()
1207    }
1208
1209    fn quote_peer_seeds(quotes: &[(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)]) -> Vec<u8> {
1210        quotes
1211            .iter()
1212            .map(|(peer_id, _, _, _)| peer_id.as_bytes()[0])
1213            .collect()
1214    }
1215
1216    fn put_peer_seeds(peers: &[(PeerId, Vec<MultiAddr>)]) -> Vec<u8> {
1217        peers
1218            .iter()
1219            .map(|(peer_id, _)| peer_id.as_bytes()[0])
1220            .collect()
1221    }
1222
1223    fn put_peers_from_seeds(seeds: &[u8]) -> Vec<(PeerId, Vec<MultiAddr>)> {
1224        seeds
1225            .iter()
1226            .copied()
1227            .map(|seed| (synthetic_peer(seed), Vec::new()))
1228            .collect()
1229    }
1230
1231    /// Independent re-implementation of the storer-side binding spec
1232    /// (`ant-node/src/payment/verifier.rs::validate_peer_bindings` +
1233    /// `peer_id_from_public_key_bytes`):
1234    /// (a) `pub_key` parses as ML-DSA-65 (length 1952), and
1235    /// (b) `BLAKE3(pub_key) == peer_id`.
1236    ///
1237    /// Re-derived from spec, NOT delegating to `quote_binding_is_valid`,
1238    /// so cross-checks are not "function == itself".
1239    fn storer_binding_would_accept(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
1240        if MlDsaPublicKey::from_bytes(&quote.pub_key).is_err() {
1241            return false;
1242        }
1243        compute_address(&quote.pub_key) == *peer_id.as_bytes()
1244    }
1245
1246    // ============================================================
1247    // Tests for `quote_binding_is_valid` (the predicate)
1248    // ============================================================
1249
1250    #[test]
1251    fn binding_accepts_real_self_consistent_keypair() {
1252        let (peer_id, _, quote, _) = good_quote_real();
1253        // Property under test: the predicate accepts a quote whose pub_key
1254        // genuinely belongs to the claimed peer.
1255        assert!(quote_binding_is_valid(&peer_id, &quote));
1256        // Cross-check against the independent full storer-spec implementation.
1257        assert!(storer_binding_would_accept(&peer_id, &quote));
1258    }
1259
1260    #[test]
1261    fn binding_rejects_real_crossed_keypair() {
1262        let (peer_id, _, quote, _) = bad_quote_real();
1263        assert!(!quote_binding_is_valid(&peer_id, &quote));
1264        assert!(!storer_binding_would_accept(&peer_id, &quote));
1265    }
1266
1267    #[test]
1268    fn binding_rejects_oversize_pubkey() {
1269        // A pub_key longer than ML-DSA-65 (1952 bytes) must be rejected
1270        // even if BLAKE3 happens to agree, because the storer rejects on
1271        // length first via `peer_id_from_public_key_bytes`.
1272        let oversized = vec![0u8; ML_DSA_PUB_KEY_LEN + 1];
1273        let peer_id = PeerId::from_bytes(compute_address(&oversized));
1274        let quote = PaymentQuote {
1275            content: XorName([0u8; 32]),
1276            timestamp: SystemTime::UNIX_EPOCH,
1277            price: Amount::ZERO,
1278            rewards_address: RewardsAddress::new([0u8; 20]),
1279            pub_key: oversized,
1280            signature: Vec::new(),
1281        };
1282        // BLAKE3(pub_key) DOES equal the peer_id we constructed, so the
1283        // bare hash check would pass — but the length guard must reject.
1284        assert_eq!(compute_address(&quote.pub_key), *peer_id.as_bytes());
1285        assert!(
1286            !quote_binding_is_valid(&peer_id, &quote),
1287            "predicate must reject oversize pub_key even when BLAKE3 happens to match"
1288        );
1289        assert!(!storer_binding_would_accept(&peer_id, &quote));
1290    }
1291
1292    #[test]
1293    fn binding_rejects_undersize_pubkey() {
1294        let undersized = vec![0u8; ML_DSA_PUB_KEY_LEN - 1];
1295        let peer_id = PeerId::from_bytes(compute_address(&undersized));
1296        let quote = PaymentQuote {
1297            content: XorName([0u8; 32]),
1298            timestamp: SystemTime::UNIX_EPOCH,
1299            price: Amount::ZERO,
1300            rewards_address: RewardsAddress::new([0u8; 20]),
1301            pub_key: undersized,
1302            signature: Vec::new(),
1303        };
1304        assert!(!quote_binding_is_valid(&peer_id, &quote));
1305        assert!(!storer_binding_would_accept(&peer_id, &quote));
1306    }
1307
1308    // ============================================================
1309    // Tests for the filter (`drop_quotes_with_bad_bindings`)
1310    // ============================================================
1311
1312    #[test]
1313    fn quote_query_counts_keep_single_node_close_group_only() {
1314        assert_eq!(single_node_quote_query_count(), CLOSE_GROUP_SIZE);
1315        assert_eq!(SINGLE_NODE_WITNESSED_VIEW_COUNT, 20);
1316        assert!(SINGLE_NODE_WITNESSED_VIEW_COUNT > single_node_quote_query_count());
1317        assert_eq!(witnessed_close_group_quorum(), 5);
1318        assert_eq!(witnessed_close_group_quorum_for_missing_views(0), 5);
1319        assert_eq!(witnessed_close_group_quorum_for_missing_views(1), 4);
1320        assert_eq!(witnessed_close_group_quorum_for_missing_views(2), 3);
1321        assert_eq!(
1322            fault_tolerant_quote_query_count(),
1323            CLOSE_GROUP_SIZE * FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER
1324        );
1325        assert!(fault_tolerant_quote_query_count() > single_node_quote_query_count());
1326    }
1327
1328    #[test]
1329    fn witnessed_quote_launch_budget_keeps_exact_quote_window() {
1330        assert_eq!(
1331            witnessed_quote_launch_budget(0, 0, CLOSE_GROUP_SIZE * 2),
1332            CLOSE_GROUP_SIZE,
1333            "initial SNP quote fetch should launch the closest seven peers"
1334        );
1335        assert_eq!(
1336            witnessed_quote_launch_budget(1, CLOSE_GROUP_SIZE - 1, CLOSE_GROUP_SIZE),
1337            0,
1338            "a successful quote should not launch an extra fallback"
1339        );
1340        assert_eq!(
1341            witnessed_quote_launch_budget(0, CLOSE_GROUP_SIZE - 1, CLOSE_GROUP_SIZE),
1342            1,
1343            "a failed in-flight quote should launch the next closest fallback"
1344        );
1345        assert_eq!(
1346            witnessed_quote_launch_budget(CLOSE_GROUP_SIZE - 1, 0, 3),
1347            1,
1348            "only one more peer is needed for the seventh quote"
1349        );
1350        assert_eq!(
1351            witnessed_quote_launch_budget(0, 0, CLOSE_GROUP_SIZE - 1),
1352            CLOSE_GROUP_SIZE - 1,
1353            "launch budget is capped by remaining candidates"
1354        );
1355    }
1356
1357    #[test]
1358    fn witnessed_candidates_sort_by_xor_distance_then_votes() {
1359        let address = [0u8; 32];
1360        let witnessed = WitnessedCloseGroup {
1361            target: address,
1362            k: CLOSE_GROUP_SIZE,
1363            initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1364            responder_views: vec![
1365                witnessed_test_view(1, &[1, 9]),
1366                witnessed_test_view(2, &[1, 9]),
1367                witnessed_test_view(3, &[1, 9]),
1368                witnessed_test_view(4, &[1, 9]),
1369                witnessed_test_view(5, &[1, 9]),
1370                witnessed_test_view(6, &[9]),
1371                witnessed_test_view(7, &[9]),
1372            ],
1373        };
1374
1375        let candidates =
1376            witnessed_consensus_candidates(&witnessed, &address, witnessed_close_group_quorum());
1377
1378        assert_eq!(
1379            candidates
1380                .iter()
1381                .map(|candidate| candidate.node.peer_id.as_bytes()[0])
1382                .collect::<Vec<_>>(),
1383            vec![1, 9],
1384            "XOR closeness must be the primary sort before quote collection"
1385        );
1386    }
1387
1388    #[test]
1389    fn witnessed_quote_peers_error_is_typed_and_pre_payment_when_consensus_is_short() {
1390        let address = [0u8; 32];
1391        let responder_views = (1..=7)
1392            .map(|responder| witnessed_test_view(responder, &[1, 2, 3, 4]))
1393            .collect();
1394        let witnessed = WitnessedCloseGroup {
1395            target: address,
1396            k: CLOSE_GROUP_SIZE,
1397            initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1398            responder_views,
1399        };
1400
1401        let err = witnessed_quote_selection_or_error(
1402            &address,
1403            &witnessed,
1404            CLOSE_GROUP_SIZE,
1405            witnessed_close_group_quorum(),
1406        )
1407        .expect_err("short witnessed consensus must fail before payment");
1408
1409        match err {
1410            Error::InsufficientPeers(message) => {
1411                assert!(message.contains("before payment"));
1412                assert!(message.contains("vote_counts"));
1413                assert!(message.contains("quorum"));
1414            }
1415            other => panic!("expected typed InsufficientPeers error, got {other:?}"),
1416        }
1417    }
1418
1419    #[test]
1420    fn witnessed_quote_peers_include_quorum_fallback_candidates() {
1421        const EXTRA_QUORUM_CANDIDATES: usize = 1;
1422
1423        let address = [0u8; 32];
1424        let witnessed = WitnessedCloseGroup {
1425            target: address,
1426            k: CLOSE_GROUP_SIZE,
1427            initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1428            responder_views: vec![
1429                witnessed_test_view(1, &[1, 2, 3, 4, 5, 6, 7]),
1430                witnessed_test_view(2, &[1, 2, 3, 4, 5, 6, 8]),
1431                witnessed_test_view(3, &[1, 2, 3, 4, 5, 7, 8]),
1432                witnessed_test_view(4, &[1, 2, 3, 4, 6, 7, 8]),
1433                witnessed_test_view(5, &[1, 2, 3, 5, 6, 7, 8]),
1434                witnessed_test_view(6, &[1, 2, 4, 5, 6, 7, 8]),
1435                witnessed_test_view(7, &[1, 3, 4, 5, 6, 7, 8]),
1436            ],
1437        };
1438
1439        let selection = witnessed_quote_selection_or_error(
1440            &address,
1441            &witnessed,
1442            CLOSE_GROUP_SIZE,
1443            witnessed_close_group_quorum(),
1444        )
1445        .expect("fallback candidates should be retained for quote collection");
1446
1447        assert_eq!(
1448            selection.quote_peers.len(),
1449            CLOSE_GROUP_SIZE + EXTRA_QUORUM_CANDIDATES
1450        );
1451        assert_eq!(
1452            selection
1453                .quote_peers
1454                .iter()
1455                .map(|peer| peer.peer_id.as_bytes()[0])
1456                .collect::<Vec<_>>(),
1457            vec![1, 2, 3, 4, 5, 6, 7, 8]
1458        );
1459        assert_eq!(
1460            put_peer_seeds(&selection.initial_put_peers),
1461            vec![1, 2, 3, 4, 5, 6, 7]
1462        );
1463    }
1464
1465    #[test]
1466    fn witnessed_quote_peers_lower_quorum_for_missing_responder_views() {
1467        let address = [0u8; 32];
1468        let witnessed = WitnessedCloseGroup {
1469            target: address,
1470            k: CLOSE_GROUP_SIZE,
1471            initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1472            responder_views: vec![
1473                witnessed_test_view(1, &[1, 2, 3, 4, 5, 6, 7]),
1474                witnessed_test_view(2, &[1, 2, 3, 4, 5, 6, 8]),
1475                witnessed_test_view(3, &[1, 2, 3, 4, 5, 7, 8]),
1476                witnessed_test_view(4, &[1, 2, 3, 4, 6, 7, 8]),
1477                witnessed_test_view(5, &[1, 2, 3, 5, 6, 7, 8]),
1478                witnessed_test_view(6, &[1, 2, 4, 5, 6, 7, 8]),
1479            ],
1480        };
1481        let quorum = witnessed_close_group_quorum_for_transcript(&witnessed);
1482
1483        assert_eq!(missing_witnessed_responder_views(&witnessed), 1);
1484        assert_eq!(quorum, 4);
1485
1486        let selection =
1487            witnessed_quote_selection_or_error(&address, &witnessed, CLOSE_GROUP_SIZE, quorum)
1488                .expect(
1489                    "one missing responder view should lower quorum and still select candidates",
1490                );
1491
1492        assert_eq!(
1493            selection
1494                .quote_peers
1495                .iter()
1496                .map(|peer| peer.peer_id.as_bytes()[0])
1497                .collect::<Vec<_>>(),
1498            vec![1, 2, 3, 4, 5, 6, 7, 8]
1499        );
1500        assert_eq!(selection.quorum, quorum);
1501    }
1502
1503    #[test]
1504    fn witnessed_quote_selection_keeps_closest_set_with_median_voter_quorum() {
1505        const MEDIAN_ISSUER_SEED: u8 = 7;
1506        const FAR_SUPPORTING_VOTER_SEED: u8 = 20;
1507        const UNSUCCESSFUL_SUPPORTING_VOTER_SEED: u8 = 21;
1508
1509        let address = [0u8; 32];
1510        let quotes = vec![
1511            synthetic_quote(1, 10),
1512            synthetic_quote(2, 20),
1513            synthetic_quote(3, 30),
1514            synthetic_quote(6, 50),
1515            synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1516            synthetic_quote(8, 60),
1517            synthetic_quote(9, 70),
1518            synthetic_quote(FAR_SUPPORTING_VOTER_SEED, 80),
1519        ];
1520        let mut voters_by_peer = HashMap::new();
1521        voters_by_peer.insert(
1522            synthetic_peer(MEDIAN_ISSUER_SEED),
1523            synthetic_voters(&[
1524                1,
1525                2,
1526                3,
1527                MEDIAN_ISSUER_SEED,
1528                FAR_SUPPORTING_VOTER_SEED,
1529                UNSUCCESSFUL_SUPPORTING_VOTER_SEED,
1530            ]),
1531        );
1532
1533        let quorum = witnessed_close_group_quorum();
1534        let selected =
1535            select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer, quorum)
1536                .expect("a supported close-group quote set should be selected");
1537
1538        assert_eq!(quote_peer_seeds(&selected), vec![1, 2, 3, 6, 7, 8, 9]);
1539        let (median_peer_id, _) =
1540            median_paid_quote_issuer(&selected).expect("selected quotes have a median");
1541        assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
1542        assert!(voters_by_peer[&median_peer_id].len() >= quorum);
1543    }
1544
1545    #[test]
1546    fn witnessed_quote_selection_uses_direct_median_witness_recognition() {
1547        const MEDIAN_ISSUER_SEED: u8 = 7;
1548
1549        let address = [0u8; 32];
1550        let quotes = vec![
1551            synthetic_quote(1, 10),
1552            synthetic_quote(2, 20),
1553            synthetic_quote(3, 30),
1554            synthetic_quote(4, 50),
1555            synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1556            synthetic_quote(8, 60),
1557            synthetic_quote(9, 70),
1558        ];
1559        let mut voters_by_peer = HashMap::new();
1560        voters_by_peer.insert(
1561            synthetic_peer(MEDIAN_ISSUER_SEED),
1562            synthetic_voters(&[20, 21, 22, 23, 24]),
1563        );
1564
1565        let quorum = witnessed_close_group_quorum();
1566        let selected =
1567            select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer, quorum)
1568                .expect("direct witness recognition should support the paid median issuer");
1569
1570        let (median_peer_id, _) =
1571            median_paid_quote_issuer(&selected).expect("selected quotes have a median");
1572        let selected_peers = selected
1573            .iter()
1574            .map(|(peer_id, _, _, _)| *peer_id)
1575            .collect::<HashSet<_>>();
1576        assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
1577        assert_eq!(
1578            voters_by_peer[&median_peer_id]
1579                .intersection(&selected_peers)
1580                .count(),
1581            0,
1582            "recognising witnesses need not also be selected quote issuers"
1583        );
1584        assert_eq!(voters_by_peer[&median_peer_id].len(), quorum);
1585    }
1586
1587    #[test]
1588    fn witnessed_quote_selection_rejects_median_without_witness_quorum() {
1589        const MEDIAN_ISSUER_SEED: u8 = 7;
1590
1591        let address = [0u8; 32];
1592        let quotes = vec![
1593            synthetic_quote(1, 10),
1594            synthetic_quote(2, 20),
1595            synthetic_quote(3, 30),
1596            synthetic_quote(6, 50),
1597            synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1598            synthetic_quote(8, 60),
1599            synthetic_quote(9, 70),
1600            synthetic_quote(10, 80),
1601        ];
1602        let mut voters_by_peer = HashMap::new();
1603        voters_by_peer.insert(
1604            synthetic_peer(MEDIAN_ISSUER_SEED),
1605            synthetic_voters(&[1, 2, 3, 20]),
1606        );
1607
1608        let selected = select_witnessed_median_voter_quotes(
1609            quotes,
1610            &address,
1611            &voters_by_peer,
1612            witnessed_close_group_quorum(),
1613        );
1614
1615        assert!(
1616            selected.is_none(),
1617            "the selector must not return a paid quote set when fewer than the \
1618             witnessed median voter quorum recognised the paid median issuer"
1619        );
1620    }
1621
1622    #[test]
1623    fn put_peers_prioritise_median_voters_without_reordering_quotes() {
1624        const MEDIAN_ISSUER_SEED: u8 = 7;
1625
1626        let quotes = vec![
1627            synthetic_quote(1, 10),
1628            synthetic_quote(2, 20),
1629            synthetic_quote(3, 30),
1630            synthetic_quote(4, 50),
1631            synthetic_quote(5, 60),
1632            synthetic_quote(6, 70),
1633            synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1634        ];
1635        let mut voters_by_peer = HashMap::new();
1636        voters_by_peer.insert(
1637            synthetic_peer(MEDIAN_ISSUER_SEED),
1638            synthetic_voters(&[3, 4, 5, 6, MEDIAN_ISSUER_SEED]),
1639        );
1640
1641        let put_candidates = put_peers_from_seeds(&[1, 2, 3, 4, 5, 6, 7]);
1642        let put_peers = put_peers_with_median_voters_first(
1643            &quotes,
1644            &put_candidates,
1645            &voters_by_peer,
1646            witnessed_close_group_quorum(),
1647        )
1648        .expect("median voters should produce an ordered PUT set");
1649
1650        assert_eq!(quote_peer_seeds(&quotes), vec![1, 2, 3, 4, 5, 6, 7]);
1651        let (median_peer_id, _) =
1652            median_paid_quote_issuer(&quotes).expect("selected quotes have a median");
1653        assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
1654        assert_eq!(put_peer_seeds(&put_peers), vec![3, 4, 5, 6, 7, 1, 2]);
1655    }
1656
1657    #[test]
1658    fn filter_drops_only_bad_bindings_and_leaves_storer_acceptable_quotes() {
1659        let mut quotes = vec![
1660            good_quote_real(),
1661            bad_quote_real(),
1662            good_quote_real(),
1663            bad_quote_real(),
1664            good_quote_real(),
1665        ];
1666
1667        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1668
1669        assert_eq!(dropped, 2, "two crossed-key quotes must be dropped");
1670        assert_eq!(quotes.len(), 3, "three real-key quotes must remain");
1671
1672        // Cross-checked invariant: every retained quote would be accepted by
1673        // a storer running the full spec. The defensive filter only checks
1674        // the binding, so this asserts the binding-only filter is correct
1675        // for binding-only failures (other failure modes are filtered by
1676        // the per-peer classifier upstream).
1677        for (peer_id, _, quote, _) in &quotes {
1678            assert!(
1679                storer_binding_would_accept(peer_id, quote),
1680                "every retained quote must satisfy the full storer-side spec"
1681            );
1682        }
1683    }
1684
1685    #[test]
1686    fn filter_is_noop_when_all_quotes_are_storer_acceptable() {
1687        let mut quotes: Vec<_> = (0..5).map(|_| good_quote_real()).collect();
1688        let before = quotes.len();
1689        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1690        assert_eq!(dropped, 0);
1691        assert_eq!(quotes.len(), before);
1692        for (peer_id, _, quote, _) in &quotes {
1693            assert!(storer_binding_would_accept(peer_id, quote));
1694        }
1695    }
1696
1697    #[test]
1698    fn filter_drops_all_when_every_responder_is_bad() {
1699        // The "all hostile" case: every peer returned a bad binding. The
1700        // patch should leave us with zero quotes (not panic, not skip the
1701        // filter, not return malformed quotes). The caller then surfaces
1702        // InsufficientPeers.
1703        let mut quotes: Vec<_> = (0..fault_tolerant_quote_query_count())
1704            .map(|_| bad_quote_real())
1705            .collect();
1706        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1707        assert_eq!(dropped, fault_tolerant_quote_query_count());
1708        assert!(quotes.is_empty());
1709    }
1710
1711    #[test]
1712    fn filter_preserves_quote_payload_byte_for_byte() {
1713        // After filtering, the retained quotes must be untouched — pub_key,
1714        // signature, content, timestamp, price, rewards_address. The patch
1715        // is a filter, not a transformation; this test catches any future
1716        // regression that mutates a retained quote.
1717        let (peer_id, addrs, original_quote, amount) = good_quote_real();
1718        let mut quotes = vec![(peer_id, addrs.clone(), original_quote.clone(), amount)];
1719        let _ = drop_quotes_with_bad_bindings(&mut quotes);
1720
1721        let (kept_peer, kept_addrs, kept_quote, kept_amount) =
1722            quotes.pop().expect("the good quote must survive filtering");
1723        assert_eq!(kept_peer.as_bytes(), peer_id.as_bytes());
1724        assert_eq!(kept_addrs.len(), addrs.len());
1725        assert_eq!(kept_amount, amount);
1726        assert_eq!(kept_quote.pub_key, original_quote.pub_key);
1727        assert_eq!(kept_quote.signature, original_quote.signature);
1728        assert_eq!(kept_quote.content.0, original_quote.content.0);
1729        assert_eq!(kept_quote.timestamp, original_quote.timestamp);
1730        assert_eq!(kept_quote.price, original_quote.price);
1731        assert_eq!(kept_quote.rewards_address, original_quote.rewards_address);
1732    }
1733
1734    // ============================================================
1735    // The Apr 30 production-failure repro
1736    // ============================================================
1737
1738    /// Repro of the production failure from 2026-04-30 testnet runs.
1739    ///
1740    /// An external operator on `75.48.86.24` ran two co-located ant-node
1741    /// identities (peer `0755ecb55b…` and peer `073db92f…`) that crossed
1742    /// their quote-signing keys. Every chunk whose XOR-closest set happened
1743    /// to include peer `0755ecb5` got a payment proof with one malformed
1744    /// quote, and the storer's `validate_peer_bindings` rejected the
1745    /// entire close-group proof — burning the chunk's payment.
1746    ///
1747    /// This test proves the fault-tolerant quote path still fixes that failure
1748    /// shape:
1749    ///
1750    /// 1. We assemble `2x CLOSE_GROUP_SIZE` real ML-DSA-65 quotes — the same
1751    ///    buffer merkle preflight and merkle-mode estimates retain for probes.
1752    /// 2. One of them is a *crossed-key* quote — the production failure shape.
1753    /// 3. We run an independent `storer_would_accept` check (re-derived from
1754    ///    the storer spec, not from `quote_binding_is_valid`) over the
1755    ///    pre-filter set; we confirm the bad peer is rejected, proving the
1756    ///    storer **would** burn the chunk's payment if we proceeded unfiltered.
1757    /// 4. We run `drop_quotes_with_bad_bindings`.
1758    /// 5. We re-run `storer_would_accept` over the post-filter set; we confirm
1759    ///    EVERY remaining quote would be accepted, proving the filtered set
1760    ///    will not trigger the `validate_peer_bindings` rejection that caused
1761    ///    the Apr 30 outage.
1762    /// 6. We confirm the post-filter set has at least `CLOSE_GROUP_SIZE`
1763    ///    quotes — the over-query buffer (2x) is sufficient.
1764    #[test]
1765    fn repro_apr_30_storer_would_have_rejected_pre_filter_and_accepts_post_filter() {
1766        let over_query_count = fault_tolerant_quote_query_count();
1767        let mut quotes: Vec<_> = (0..over_query_count - 1)
1768            .map(|_| good_quote_real())
1769            .collect();
1770        // Splice the crossed-key quote in the middle (mirrors the random
1771        // position the bad peer takes in the DHT-returned closest set).
1772        quotes.insert(over_query_count / 2, bad_quote_real());
1773        assert_eq!(quotes.len(), over_query_count);
1774
1775        // Step 1: prove the storer would reject the pre-filter set.
1776        let storer_would_reject_count = quotes
1777            .iter()
1778            .filter(|(p, _, q, _)| !storer_binding_would_accept(p, q))
1779            .count();
1780        assert_eq!(
1781            storer_would_reject_count, 1,
1782            "exactly one quote (the crossed-key one) must be rejected by the storer spec"
1783        );
1784
1785        // Step 2: run the patched filter.
1786        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1787        assert_eq!(dropped, 1, "exactly the crossed-key quote must be filtered");
1788
1789        // Step 3: prove the storer would accept every survivor under the FULL spec.
1790        for (peer_id, _, quote, _) in &quotes {
1791            assert!(
1792                storer_binding_would_accept(peer_id, quote),
1793                "every post-filter quote must be accepted by the storer spec — \
1794                 this is what the filter guarantees before any quote set is used"
1795            );
1796        }
1797
1798        // Step 4: prove the over-query buffer is sufficient to refill.
1799        assert!(
1800            quotes.len() >= CLOSE_GROUP_SIZE,
1801            "after filtering, at least CLOSE_GROUP_SIZE good quotes must remain \
1802             so a fault-tolerant probe can still return a full close group"
1803        );
1804    }
1805
1806    /// When more than the over-query buffer of peers misbehave, the filter
1807    /// must NOT silently produce a short proof. The downstream caller in
1808    /// `get_store_quotes` must see fewer than `CLOSE_GROUP_SIZE` survivors
1809    /// and return `InsufficientPeers`.
1810    #[test]
1811    fn filter_leaves_short_set_when_too_many_bad_peers() {
1812        let good_count = CLOSE_GROUP_SIZE - 1;
1813        let bad_count = fault_tolerant_quote_query_count() - good_count;
1814        let mut quotes: Vec<_> = std::iter::repeat_with(bad_quote_real)
1815            .take(bad_count)
1816            .chain(std::iter::repeat_with(good_quote_real).take(good_count))
1817            .collect();
1818
1819        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1820        assert_eq!(dropped, bad_count);
1821        assert!(
1822            quotes.len() < CLOSE_GROUP_SIZE,
1823            "this is the precondition for InsufficientPeers downstream"
1824        );
1825        // Sanity: every survivor is storer-acceptable under the full spec.
1826        for (peer_id, _, quote, _) in &quotes {
1827            assert!(storer_binding_would_accept(peer_id, quote));
1828        }
1829    }
1830
1831    // ============================================================
1832    // Tests for the per-peer response classifier (the PRIMARY defense).
1833    //
1834    // These tests exercise the production code path that runs inside
1835    // get_store_quotes' per-peer async closure. The defensive
1836    // `drop_quotes_with_bad_bindings` is a second line of defence —
1837    // these tests make sure the FIRST line is what actually catches
1838    // misbehaving peers in production. Without these, a regression
1839    // that removes the per-peer check could be masked by the post-
1840    // collect filter and pass the rest of the suite.
1841    // ============================================================
1842
1843    /// Helper: serialize a `PaymentQuote` to bytes the way the wire layer
1844    /// does (rmp_serde / msgpack), to feed into `classify_quote_response`.
1845    fn serialize_quote(quote: &PaymentQuote) -> Vec<u8> {
1846        rmp_serde::to_vec(quote).expect("serialize quote")
1847    }
1848
1849    #[test]
1850    fn classifier_accepts_real_self_consistent_quote() {
1851        let (peer_id, _, quote, _) = good_quote_real();
1852        let bytes = serialize_quote(&quote);
1853        let result = classify_quote_response(&peer_id, &bytes, false);
1854        match result {
1855            Ok((q, price)) => {
1856                assert_eq!(q.pub_key, quote.pub_key);
1857                assert_eq!(price, quote.price);
1858            }
1859            Err(e) => panic!("expected Ok, got {e}"),
1860        }
1861    }
1862
1863    #[test]
1864    fn classifier_rejects_crossed_keypair_with_typed_error() {
1865        let (peer_id, _, quote, _) = bad_quote_real();
1866        let bytes = serialize_quote(&quote);
1867        let result = classify_quote_response(&peer_id, &bytes, false);
1868        match result {
1869            Err(Error::BadQuoteBinding {
1870                peer_id: pid,
1871                detail,
1872            }) => {
1873                assert_eq!(pid, peer_id.to_string());
1874                assert!(
1875                    detail.contains("BLAKE3(pub_key)="),
1876                    "diagnostic detail must include the derived peer id: {detail}"
1877                );
1878            }
1879            other => panic!("expected BadQuoteBinding for crossed-key quote, got {other:?}"),
1880        }
1881    }
1882
1883    /// CRITICAL: a misbehaving peer that votes `already_stored=true` must
1884    /// NOT be allowed to influence the close-group "already stored"
1885    /// majority decision. The bind-check runs before the AlreadyStored
1886    /// short-circuit, so a crossed-key peer voting "already stored" is
1887    /// classified as `BadQuoteBinding`, not `AlreadyStored`.
1888    ///
1889    /// This locks in a specific reviewer concern from round 1:
1890    ///   "A peer with a crossed/garbage signing key could simply respond
1891    ///   already_stored=true and its vote enters already_stored_peers
1892    ///   unfiltered."
1893    #[test]
1894    fn classifier_rejects_already_stored_vote_from_bad_binding_peer() {
1895        let (peer_id, _, quote, _) = bad_quote_real();
1896        let bytes = serialize_quote(&quote);
1897        // The peer claims already_stored=true, but its quote has a crossed key.
1898        let result = classify_quote_response(&peer_id, &bytes, true);
1899        assert!(
1900            matches!(result, Err(Error::BadQuoteBinding { .. })),
1901            "crossed-key peer must be classified BadQuoteBinding even when \
1902             voting already_stored=true; got {result:?}"
1903        );
1904    }
1905
1906    /// An honest peer's `already_stored=true` vote IS honoured (after
1907    /// passing the bind-check). This is the contrast to the test above.
1908    #[test]
1909    fn classifier_honours_already_stored_vote_from_good_binding_peer() {
1910        let (peer_id, _, quote, _) = good_quote_real();
1911        let bytes = serialize_quote(&quote);
1912        let result = classify_quote_response(&peer_id, &bytes, true);
1913        assert!(
1914            matches!(result, Err(Error::AlreadyStored)),
1915            "honest peer's already_stored vote must be honoured; got {result:?}"
1916        );
1917    }
1918
1919    #[test]
1920    fn classifier_returns_serialization_error_on_bad_bytes() {
1921        let (peer_id, _, _, _) = good_quote_real();
1922        let garbage = b"this is not a valid msgpack PaymentQuote".to_vec();
1923        let result = classify_quote_response(&peer_id, &garbage, false);
1924        assert!(
1925            matches!(result, Err(Error::Serialization(_))),
1926            "garbage bytes must produce a Serialization error; got {result:?}"
1927        );
1928    }
1929
1930    /// Cross-validate the classifier's binding verdict against the
1931    /// independent storer-spec re-derivation across mixed responders.
1932    #[test]
1933    fn classifier_verdict_matches_storer_binding_spec_for_mixed_responders() {
1934        let mut responders: Vec<(PeerId, PaymentQuote)> = (0..12)
1935            .map(|_| {
1936                let (p, _, q, _) = good_quote_real();
1937                (p, q)
1938            })
1939            .collect();
1940        for _ in 0..4 {
1941            let (p, _, q, _) = bad_quote_real();
1942            responders.push((p, q));
1943        }
1944
1945        for (peer_id, quote) in &responders {
1946            let bytes = serialize_quote(quote);
1947            let storer_verdict = storer_binding_would_accept(peer_id, quote);
1948            let classifier_verdict = classify_quote_response(peer_id, &bytes, false).is_ok();
1949            assert_eq!(
1950                classifier_verdict, storer_verdict,
1951                "classifier and storer-binding-spec must agree on every responder \
1952                 (peer_id={}, storer={storer_verdict}, classifier={classifier_verdict})",
1953                peer_id
1954            );
1955        }
1956    }
1957}