Skip to main content

ant_core/data/client/
quote.rs

1//! Quote and payment operations.
2//!
3//! Handles requesting storage quotes from network nodes and
4//! managing payment for data storage.
5
6use crate::data::client::peer_xor_distance;
7use crate::data::client::Client;
8use crate::data::client::PUT_TARGET_WIDTH;
9use crate::data::error::{Error, Result};
10use ant_protocol::evm::{Amount, PaymentQuote};
11use ant_protocol::transport::{
12    DHTNode, MultiAddr, P2PNode, PeerId, ResponderView, WitnessedCloseGroup,
13};
14use ant_protocol::{
15    compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
16    ChunkQuoteRequest, ChunkQuoteResponse, CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE,
17};
18use futures::stream::{FuturesUnordered, StreamExt};
19use std::collections::{HashMap, HashSet};
20use std::sync::Arc;
21use std::time::Duration;
22use tracing::{debug, info, warn};
23
24/// Fault-tolerant quote collection asks one extra close group of peers and
25/// keeps the closest successful `CLOSE_GROUP_SIZE` responders. This remains
26/// useful for merkle preflight probes, but single-node payments deliberately
27/// ask only the actual close group.
28const FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER: usize = 2;
29
30/// Witnessed close-group quorum as a fraction of the initial close group.
31/// For today's `CLOSE_GROUP_SIZE = 7`, this yields the requested 5-of-7
32/// quorum.
33const WITNESSED_QUORUM_NUMERATOR: usize = 2;
34const WITNESSED_QUORUM_DENOMINATOR: usize = 3;
35
36/// Number of closest nodes each initial witnessed responder contributes.
37const SINGLE_NODE_WITNESSED_VIEW_COUNT: usize = 20;
38
39/// Index of the paid median quote after sorting by quoted price.
40const MEDIAN_QUOTE_INDEX: usize = CLOSE_GROUP_SIZE / 2;
41
42/// Overall timeout for collecting quote responses. Must accommodate
43/// connect_with_fallback cascade (direct 5s + hole-punch 15s×3 + relay 30s ≈
44/// 80s) plus the per-peer quote timeout.
45const QUOTE_COLLECTION_TIMEOUT_SECS: u64 = 120;
46
47/// ML-DSA-65 public key length in bytes. Mirrors the same value defined as
48/// `pub const ML_DSA_65_PUBLIC_KEY_SIZE` in `saorsa-pqc::pqc::types`, which
49/// the storer's `peer_id_from_public_key_bytes` enforces. We keep a local
50/// copy here rather than adding a direct `saorsa-pqc` dep — the constant
51/// is FIPS-mandated for ML-DSA-65 and won't change unless we change variant.
52///
53/// TODO: switch to `saorsa_pqc::pqc::types::ML_DSA_65_PUBLIC_KEY_SIZE` once
54/// `ant-protocol` re-exports it (`pqc::ops::ML_DSA_65_PUBLIC_KEY_SIZE`).
55const ML_DSA_PUB_KEY_LEN: usize = 1952;
56
57/// Check that a quote's `pub_key` is well-formed and BLAKE3-hashes to the
58/// claimed `peer_id`.
59///
60/// The storer node enforces both constraints in `ant-node/src/payment/verifier.rs`
61/// (via `peer_id_from_public_key_bytes` and `validate_peer_bindings`): every
62/// quote inside a `ProofOfPayment` must (a) have a 1952-byte `pub_key` parsable
63/// as ML-DSA-65 and (b) satisfy `BLAKE3(pub_key) == peer_id`. A single quote
64/// failing either check causes the storer to reject the entire close-group
65/// proof and burn the chunk's payment.
66///
67/// We mirror the cheap structural check here. The storer also runs
68/// `verify_quote_content` and `verify_quote_signature`; those are ML-DSA
69/// verifications (~1 ms per requested quote) and are deliberately NOT mirrored
70/// on the client to keep upload latency unchanged. They are tracked as a
71/// follow-up if a real attack surfaces them.
72fn quote_binding_is_valid(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
73    if quote.pub_key.len() != ML_DSA_PUB_KEY_LEN {
74        return false;
75    }
76    compute_address(&quote.pub_key) == *peer_id.as_bytes()
77}
78
79/// Classification of a `ChunkQuoteResponse::Success` body for a single peer.
80///
81/// Mirrors the storer-side `validate_peer_bindings` check from
82/// `ant-node/src/payment/verifier.rs` — the cheap BLAKE3 binding —
83/// so we drop misbehaving peers' quotes before payment.
84///
85/// We deliberately do NOT mirror the storer's `verify_quote_signature`
86/// (ML-DSA-65 verify, ~1 ms × CLOSE_GROUP_SIZE × every chunk) or
87/// `verify_quote_content`. Those are useful defense-in-depth for an
88/// attacker who self-consistently crafts a signed-but-stolen or wrong-
89/// content quote, but they are NOT cheap and are out of scope for this
90/// fix. Adding them changes upload latency materially. Track them as a
91/// follow-up if a real attack surfaces them.
92///
93/// Pulling the logic out of the async closure lets us unit-test the
94/// primary defense (not just the post-collect defensive filter).
95///
96/// # Returns
97///
98/// - `Ok((quote, price))` — the response is honoured as a quote.
99/// - `Err(Error::AlreadyStored)` — the peer claims the chunk is already
100///   present AND the quote it provided binds to its peer ID. Vote counts.
101/// - `Err(Error::BadQuoteBinding { .. })` — bad binding (mirrors the
102///   storer-side rejection). Outer collector counts these via the typed
103///   variant (no string matching).
104/// - `Err(Error::Serialization(...))` — the quote bytes did not deserialize.
105fn classify_quote_response(
106    peer_id: &PeerId,
107    quote_bytes: &[u8],
108    already_stored: bool,
109) -> std::result::Result<(PaymentQuote, Amount), Error> {
110    let payment_quote = rmp_serde::from_slice::<PaymentQuote>(quote_bytes).map_err(|e| {
111        Error::Serialization(format!("Failed to deserialize quote from {peer_id}: {e}"))
112    })?;
113
114    // Peer binding: BLAKE3(pub_key) must equal peer_id. This is the
115    // exact mitigation Chris and the AI investigation requested for the
116    // 2026-04-30 production failure: drop crossed-key peers before they
117    // poison the close-group ProofOfPayment.
118    if !quote_binding_is_valid(peer_id, &payment_quote) {
119        let derived = compute_address(&payment_quote.pub_key);
120        warn!(
121            "Dropping response from {peer_id} — quote.pub_key BLAKE3 mismatch \
122             (peer is signing quotes with another peer's key); the storer \
123             would reject this proof"
124        );
125        return Err(Error::BadQuoteBinding {
126            peer_id: peer_id.to_string(),
127            detail: format!(
128                "BLAKE3(pub_key)={} pub_key_len={}",
129                hex::encode(derived),
130                payment_quote.pub_key.len(),
131            ),
132        });
133    }
134
135    if already_stored {
136        debug!("Peer {peer_id} already has chunk");
137        return Err(Error::AlreadyStored);
138    }
139    let price = payment_quote.price;
140    debug!("Received quote from {peer_id}: price = {price}");
141    Ok((payment_quote, price))
142}
143
144/// Drop quotes whose `pub_key` does not BLAKE3-hash to the peer that supplied
145/// them. Logs each dropped quote at WARN.
146fn drop_quotes_with_bad_bindings(
147    quotes: &mut Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>,
148) -> usize {
149    let before = quotes.len();
150    quotes.retain(|(peer_id, _, quote, _)| {
151        if quote_binding_is_valid(peer_id, quote) {
152            true
153        } else {
154            warn!(
155                "Dropping quote from peer {peer_id} — quote.pub_key BLAKE3 mismatch \
156                 (peer is signing quotes with another peer's key); the storer would \
157                 reject this proof"
158            );
159            false
160        }
161    });
162    before - quotes.len()
163}
164
165#[allow(clippy::too_many_arguments)]
166async fn request_store_quote_from_peer(
167    node: Arc<P2PNode>,
168    peer_id: PeerId,
169    peer_addrs: Vec<MultiAddr>,
170    request_id: u64,
171    address: [u8; 32],
172    data_size: u64,
173    data_type: u32,
174    per_peer_timeout: Duration,
175) -> StoreQuoteRequestResult {
176    let request = ChunkQuoteRequest {
177        address,
178        data_size,
179        data_type,
180    };
181    let message = ChunkMessage {
182        request_id,
183        body: ChunkMessageBody::QuoteRequest(request),
184    };
185
186    let message_bytes = match message.encode() {
187        Ok(bytes) => bytes,
188        Err(e) => {
189            return (
190                peer_id,
191                peer_addrs,
192                Err(Error::Protocol(format!(
193                    "Failed to encode quote request for {peer_id}: {e}"
194                ))),
195            );
196        }
197    };
198
199    let result = send_and_await_chunk_response(
200        &node,
201        &peer_id,
202        message_bytes,
203        request_id,
204        per_peer_timeout,
205        &peer_addrs,
206        |body| match body {
207            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
208                quote,
209                already_stored,
210            }) => Some(classify_quote_response(&peer_id, &quote, already_stored)),
211            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err(
212                Error::Protocol(format!("Quote error from {peer_id}: {e}")),
213            )),
214            _ => None,
215        },
216        |e| Error::Network(format!("Failed to send quote request to {peer_id}: {e}")),
217        || Error::Timeout(format!("Timeout waiting for quote from {peer_id}")),
218    )
219    .await;
220
221    (peer_id, peer_addrs, result)
222}
223
224#[allow(clippy::too_many_arguments)]
225fn record_store_quote_result(
226    peer_id: PeerId,
227    addrs: Vec<MultiAddr>,
228    quote_result: Result<(PaymentQuote, Amount)>,
229    address: &[u8; 32],
230    quotes: &mut Vec<StoreQuote>,
231    already_stored_peers: &mut Vec<(PeerId, [u8; 32])>,
232    failures: &mut Vec<String>,
233    bad_quote_count: &mut usize,
234) {
235    match quote_result {
236        Ok((quote, price)) => {
237            quotes.push((peer_id, addrs, quote, price));
238        }
239        Err(Error::AlreadyStored) => {
240            info!("Peer {peer_id} reports chunk already stored");
241            let dist = peer_xor_distance(&peer_id, address);
242            already_stored_peers.push((peer_id, dist));
243        }
244        Err(e) => {
245            if matches!(&e, Error::BadQuoteBinding { .. }) {
246                *bad_quote_count += 1;
247            }
248            warn!("Failed to get quote from {peer_id}: {e}");
249            failures.push(format!("{peer_id}: {e}"));
250        }
251    }
252}
253
254fn witnessed_quote_launch_budget(
255    successful_quotes: usize,
256    in_flight: usize,
257    remaining_peers: usize,
258) -> usize {
259    CLOSE_GROUP_SIZE
260        .saturating_sub(successful_quotes.saturating_add(in_flight))
261        .min(remaining_peers)
262}
263
264fn single_node_quote_query_count() -> usize {
265    CLOSE_GROUP_SIZE
266}
267
268fn fault_tolerant_quote_query_count() -> usize {
269    CLOSE_GROUP_SIZE * FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER
270}
271
272fn witnessed_close_group_quorum() -> usize {
273    (CLOSE_GROUP_SIZE * WITNESSED_QUORUM_NUMERATOR).div_ceil(WITNESSED_QUORUM_DENOMINATOR)
274}
275
276fn witnessed_close_group_quorum_for_missing_views(missing_views: usize) -> usize {
277    witnessed_close_group_quorum()
278        .saturating_sub(missing_views)
279        .max(1)
280}
281
282fn missing_witnessed_responder_views(witnessed: &WitnessedCloseGroup) -> usize {
283    witnessed
284        .initial_closest
285        .len()
286        .saturating_sub(witnessed.responder_views.len())
287}
288
289fn witnessed_close_group_quorum_for_transcript(witnessed: &WitnessedCloseGroup) -> usize {
290    witnessed_close_group_quorum_for_missing_views(missing_witnessed_responder_views(witnessed))
291}
292
293/// Restrict a witnessed transcript to its closest `CLOSE_GROUP_SIZE` peers.
294///
295/// The witnessed query is widened to `PUT_TARGET_WIDTH` peers so we
296/// have addresses for the full PUT-target set, but the consensus/quorum/quote
297/// logic must still run on the close group only. Keeping just the closest-7
298/// initial peers and the responder views contributed by those peers leaves the
299/// `missing_witnessed_responder_views` math — and the quorum derived from it —
300/// byte-for-byte identical to a `CLOSE_GROUP_SIZE`-wide query.
301fn scope_witnessed_to_close_group(witnessed: &WitnessedCloseGroup) -> WitnessedCloseGroup {
302    let initial_closest: Vec<DHTNode> = witnessed
303        .initial_closest
304        .iter()
305        .take(CLOSE_GROUP_SIZE)
306        .cloned()
307        .collect();
308    let scope: HashSet<PeerId> = initial_closest.iter().map(|node| node.peer_id).collect();
309    let responder_views: Vec<ResponderView> = witnessed
310        .responder_views
311        .iter()
312        .filter(|view| scope.contains(&view.responder))
313        .cloned()
314        .collect();
315    WitnessedCloseGroup {
316        target: witnessed.target,
317        k: CLOSE_GROUP_SIZE,
318        initial_closest,
319        responder_views,
320    }
321}
322
323fn peer_list(peers: &[PeerId]) -> Vec<String> {
324    peers.iter().map(ToString::to_string).collect()
325}
326
327pub(crate) type StoreQuote = (PeerId, Vec<MultiAddr>, PaymentQuote, Amount);
328type StoreQuoteRequestResult = (PeerId, Vec<MultiAddr>, Result<(PaymentQuote, Amount)>);
329type VotersByPeer = HashMap<PeerId, HashSet<PeerId>>;
330type WitnessedVoteData = (HashMap<PeerId, DHTNode>, VotersByPeer, Vec<(PeerId, usize)>);
331
332pub(crate) struct StoreQuotePlan {
333    pub(crate) quotes: Vec<StoreQuote>,
334    pub(crate) put_peers: Vec<(PeerId, Vec<MultiAddr>)>,
335}
336
337#[derive(Debug, Clone)]
338struct WitnessedQuoteCandidate {
339    node: DHTNode,
340    votes: usize,
341    voters: HashSet<PeerId>,
342}
343
344#[derive(Debug, Clone)]
345struct WitnessedQuotePeer {
346    peer_id: PeerId,
347    addrs: Vec<MultiAddr>,
348    voters: HashSet<PeerId>,
349}
350
351#[derive(Debug, Clone)]
352struct WitnessedQuoteSelection {
353    quote_peers: Vec<WitnessedQuotePeer>,
354    initial_put_peers: Vec<(PeerId, Vec<MultiAddr>)>,
355    quorum: usize,
356}
357
358enum QuoteSelectionPolicy {
359    ClosestByDistance,
360    WitnessedMedianVoters {
361        voters_by_peer: VotersByPeer,
362        quorum: usize,
363    },
364}
365
366fn witnessed_initial_peers(witnessed: &WitnessedCloseGroup) -> Vec<String> {
367    witnessed
368        .initial_closest
369        .iter()
370        .map(|node| node.peer_id.to_string())
371        .collect()
372}
373
374fn witnessed_responder_views(witnessed: &WitnessedCloseGroup) -> Vec<String> {
375    witnessed
376        .responder_views
377        .iter()
378        .map(|view| {
379            let peers = view
380                .closest
381                .iter()
382                .map(|node| node.peer_id)
383                .collect::<Vec<_>>();
384            format!("{}=>{:?}", view.responder, peer_list(&peers))
385        })
386        .collect()
387}
388
389fn merge_witnessed_node(nodes: &mut HashMap<PeerId, DHTNode>, node: DHTNode) {
390    match nodes.entry(node.peer_id) {
391        std::collections::hash_map::Entry::Occupied(mut entry) => {
392            entry.get_mut().merge_from(node);
393        }
394        std::collections::hash_map::Entry::Vacant(entry) => {
395            entry.insert(node);
396        }
397    }
398}
399
400fn sort_vote_counts_by_distance(vote_counts: &mut [(PeerId, usize)], address: &[u8; 32]) {
401    vote_counts.sort_by(|left, right| {
402        peer_xor_distance(&left.0, address)
403            .cmp(&peer_xor_distance(&right.0, address))
404            .then_with(|| left.0.as_bytes().cmp(right.0.as_bytes()))
405    });
406}
407
408fn witnessed_vote_counts_and_nodes(
409    witnessed: &WitnessedCloseGroup,
410    address: &[u8; 32],
411) -> WitnessedVoteData {
412    let mut known_nodes = HashMap::new();
413    for node in &witnessed.initial_closest {
414        merge_witnessed_node(&mut known_nodes, node.clone());
415    }
416
417    let mut voters_by_peer: HashMap<PeerId, HashSet<PeerId>> = HashMap::new();
418    for view in &witnessed.responder_views {
419        let mut voted = HashSet::new();
420        for node in &view.closest {
421            merge_witnessed_node(&mut known_nodes, node.clone());
422            if voted.insert(node.peer_id) {
423                voters_by_peer
424                    .entry(node.peer_id)
425                    .or_default()
426                    .insert(view.responder);
427            }
428        }
429    }
430
431    let mut vote_counts: Vec<(PeerId, usize)> = voters_by_peer
432        .iter()
433        .map(|(peer_id, voters)| (*peer_id, voters.len()))
434        .collect();
435    sort_vote_counts_by_distance(&mut vote_counts, address);
436    (known_nodes, voters_by_peer, vote_counts)
437}
438
439fn witnessed_consensus_candidates(
440    witnessed: &WitnessedCloseGroup,
441    address: &[u8; 32],
442    quorum: usize,
443) -> Vec<WitnessedQuoteCandidate> {
444    let (known_nodes, voters_by_peer, vote_counts) =
445        witnessed_vote_counts_and_nodes(witnessed, address);
446    let mut candidates = vote_counts
447        .iter()
448        .filter_map(|(peer_id, votes)| {
449            if *votes < quorum {
450                return None;
451            }
452            known_nodes.get(peer_id).cloned().and_then(|node| {
453                voters_by_peer
454                    .get(peer_id)
455                    .cloned()
456                    .map(|voters| WitnessedQuoteCandidate {
457                        node,
458                        votes: *votes,
459                        voters,
460                    })
461            })
462        })
463        .collect::<Vec<_>>();
464
465    candidates.sort_by(|left, right| {
466        peer_xor_distance(&left.node.peer_id, address)
467            .cmp(&peer_xor_distance(&right.node.peer_id, address))
468            .then_with(|| right.votes.cmp(&left.votes))
469            .then_with(|| {
470                left.node
471                    .peer_id
472                    .as_bytes()
473                    .cmp(right.node.peer_id.as_bytes())
474            })
475    });
476    candidates
477}
478
479fn witnessed_vote_counts(witnessed: &WitnessedCloseGroup, address: &[u8; 32]) -> Vec<String> {
480    let (_, _, vote_counts) = witnessed_vote_counts_and_nodes(witnessed, address);
481    vote_counts
482        .iter()
483        .map(|(peer_id, votes)| format!("{peer_id}:{votes}"))
484        .collect()
485}
486
487fn witnessed_consensus(
488    witnessed: &WitnessedCloseGroup,
489    address: &[u8; 32],
490    quorum: usize,
491) -> Vec<String> {
492    witnessed_consensus_candidates(witnessed, address, quorum)
493        .iter()
494        .map(|candidate| format!("{}:{}", candidate.node.peer_id, candidate.votes))
495        .collect()
496}
497
498fn witnessed_close_group_diagnostics(
499    address: &[u8; 32],
500    witnessed: &WitnessedCloseGroup,
501    quorum: usize,
502) -> String {
503    format!(
504        "target={}, initial={:?}, responder_views={:?}, vote_counts={:?}, quorum={}, final={:?}",
505        hex::encode(address),
506        witnessed_initial_peers(witnessed),
507        witnessed_responder_views(witnessed),
508        witnessed_vote_counts(witnessed, address),
509        quorum,
510        witnessed_consensus(witnessed, address, quorum)
511    )
512}
513
514fn witnessed_quote_selection_or_error(
515    address: &[u8; 32],
516    witnessed: &WitnessedCloseGroup,
517    required: usize,
518    quorum: usize,
519) -> Result<WitnessedQuoteSelection> {
520    let candidates = witnessed_consensus_candidates(witnessed, address, quorum);
521    if candidates.len() < required {
522        return Err(Error::InsufficientPeers(format!(
523            "Witnessed close group inconclusive before payment: got {}/{} quorum-recognised peers. {}",
524            candidates.len(),
525            required,
526            witnessed_close_group_diagnostics(address, witnessed, quorum)
527        )));
528    }
529
530    let initial_put_peers = witnessed
531        .initial_closest
532        .iter()
533        .take(CLOSE_GROUP_SIZE)
534        .map(|node| (node.peer_id, node.addresses_by_priority()))
535        .collect::<Vec<_>>();
536
537    if initial_put_peers.len() < CLOSE_GROUP_SIZE {
538        return Err(Error::InsufficientPeers(format!(
539            "Witnessed close group returned only {}/{} initial PUT peers before payment. {}",
540            initial_put_peers.len(),
541            CLOSE_GROUP_SIZE,
542            witnessed_close_group_diagnostics(address, witnessed, quorum)
543        )));
544    }
545
546    let quote_peers = candidates
547        .into_iter()
548        .map(|candidate| WitnessedQuotePeer {
549            peer_id: candidate.node.peer_id,
550            addrs: candidate.node.addresses_by_priority(),
551            voters: candidate.voters,
552        })
553        .collect();
554
555    Ok(WitnessedQuoteSelection {
556        quote_peers,
557        initial_put_peers,
558        quorum,
559    })
560}
561
562pub(crate) fn median_paid_quote_issuer(
563    quotes: &[(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)],
564) -> Option<(PeerId, Amount)> {
565    if quotes.len() <= MEDIAN_QUOTE_INDEX {
566        return None;
567    }
568
569    let mut by_price: Vec<(usize, PeerId, Amount)> = quotes
570        .iter()
571        .enumerate()
572        .map(|(index, (peer_id, _, _, price))| (index, *peer_id, *price))
573        .collect();
574    by_price.sort_by_key(|(index, _, price)| (*price, *index));
575    by_price
576        .get(MEDIAN_QUOTE_INDEX)
577        .map(|(_, peer_id, price)| (*peer_id, *price))
578}
579
580fn sort_quotes_by_distance(quotes: &mut [StoreQuote], address: &[u8; 32]) {
581    quotes.sort_by(|left, right| {
582        peer_xor_distance(&left.0, address)
583            .cmp(&peer_xor_distance(&right.0, address))
584            .then_with(|| left.0.as_bytes().cmp(right.0.as_bytes()))
585    });
586}
587
588fn median_paid_quote_issuer_for_indices(
589    quotes: &[StoreQuote],
590    indices: &[usize],
591) -> Option<(PeerId, Amount)> {
592    if indices.len() <= MEDIAN_QUOTE_INDEX {
593        return None;
594    }
595
596    let mut by_price: Vec<(usize, PeerId, Amount)> = indices
597        .iter()
598        .enumerate()
599        .map(|(selected_index, quote_index)| {
600            let (peer_id, _, _, price) = &quotes[*quote_index];
601            (selected_index, *peer_id, *price)
602        })
603        .collect();
604    by_price.sort_by_key(|(selected_index, _, price)| (*price, *selected_index));
605    by_price
606        .get(MEDIAN_QUOTE_INDEX)
607        .map(|(_, peer_id, price)| (*peer_id, *price))
608}
609
610fn median_issuer_voter_support(
611    quotes: &[StoreQuote],
612    indices: &[usize],
613    voters_by_peer: &VotersByPeer,
614) -> Option<(PeerId, usize)> {
615    let (median_peer_id, _) = median_paid_quote_issuer_for_indices(quotes, indices)?;
616    let voters = voters_by_peer.get(&median_peer_id)?;
617    Some((median_peer_id, voters.len()))
618}
619
/// Call `visit` once for every `subset_size`-element combination of the
/// indices `0..quote_count`, in lexicographic order.
///
/// `current` is the scratch buffer holding the partial combination and
/// `start_index` is the smallest index that may still be appended; top-level
/// callers pass an empty buffer and `0`. The buffer is restored to its
/// original contents before the function returns.
///
/// Nothing is visited when the request cannot be satisfied: when
/// `subset_size` exceeds `quote_count`, or when `current` already holds more
/// than `subset_size` entries. (Both cases previously underflowed a `usize`
/// subtraction.)
fn visit_quote_subsets<F>(
    quote_count: usize,
    subset_size: usize,
    start_index: usize,
    current: &mut Vec<usize>,
    visit: &mut F,
) where
    F: FnMut(&[usize]),
{
    if current.len() == subset_size {
        visit(current);
        return;
    }

    // Slots still to fill; `None` means `current` is already over-full.
    let Some(remaining) = subset_size.checked_sub(current.len()) else {
        return;
    };
    // Largest index that still leaves room for the remaining slots; `None`
    // means there are not enough indices to complete any combination.
    let Some(last_start) = quote_count.checked_sub(remaining) else {
        return;
    };

    for index in start_index..=last_start {
        current.push(index);
        visit_quote_subsets(quote_count, subset_size, index + 1, current, visit);
        current.pop();
    }
}
642
643fn select_closest_quotes(mut quotes: Vec<StoreQuote>, address: &[u8; 32]) -> Vec<StoreQuote> {
644    sort_quotes_by_distance(&mut quotes, address);
645    quotes.truncate(CLOSE_GROUP_SIZE);
646    quotes
647}
648
649fn select_witnessed_median_voter_quotes(
650    mut quotes: Vec<StoreQuote>,
651    address: &[u8; 32],
652    voters_by_peer: &VotersByPeer,
653    required_support: usize,
654) -> Option<Vec<StoreQuote>> {
655    if quotes.len() < CLOSE_GROUP_SIZE {
656        return None;
657    }
658
659    sort_quotes_by_distance(&mut quotes, address);
660
661    let mut best_indices: Option<(usize, Vec<usize>)> = None;
662    let mut current_indices = Vec::with_capacity(CLOSE_GROUP_SIZE);
663    visit_quote_subsets(
664        quotes.len(),
665        CLOSE_GROUP_SIZE,
666        0,
667        &mut current_indices,
668        &mut |indices| {
669            let Some((_, support)) = median_issuer_voter_support(&quotes, indices, voters_by_peer)
670            else {
671                return;
672            };
673            if support < required_support {
674                return;
675            }
676            match &best_indices {
677                Some((best_support, best)) if *best_support > support => {}
678                Some((best_support, best))
679                    if *best_support == support && best.as_slice() <= indices => {}
680                _ => best_indices = Some((support, indices.to_vec())),
681            }
682        },
683    );
684
685    best_indices.map(|(_, indices)| {
686        indices
687            .into_iter()
688            .map(|index| quotes[index].clone())
689            .collect()
690    })
691}
692
693fn put_peers_with_median_voters_first(
694    quotes: &[StoreQuote],
695    put_peers: &[(PeerId, Vec<MultiAddr>)],
696    voters_by_peer: &VotersByPeer,
697    required_support: usize,
698) -> Option<Vec<(PeerId, Vec<MultiAddr>)>> {
699    let (median_peer_id, _) = median_paid_quote_issuer(quotes)?;
700    let voters = voters_by_peer.get(&median_peer_id)?;
701
702    let mut supporting_peers = Vec::new();
703    let mut fallback_peers = Vec::new();
704    for (peer_id, addrs) in put_peers {
705        let peer = (*peer_id, addrs.clone());
706        if voters.contains(peer_id) {
707            supporting_peers.push(peer);
708        } else {
709            fallback_peers.push(peer);
710        }
711    }
712
713    if supporting_peers.len() < required_support {
714        return None;
715    }
716
717    supporting_peers.extend(fallback_peers);
718    Some(supporting_peers)
719}
720
721impl Client {
722    /// Get storage quotes from the closest peers for a given address.
723    ///
724    /// Builds a quorum-witnessed candidate set with at least
725    /// `CLOSE_GROUP_SIZE` peers, requests quotes from all of them concurrently,
726    /// and returns the closest supported `CLOSE_GROUP_SIZE` successful
727    /// responders. When multiple sets are possible, the client prefers the
728    /// one with the strongest paid-median voter support, then the closest
729    /// peers by XOR distance.
730    ///
731    /// Returns `Error::AlreadyStored` early if `CLOSE_GROUP_MAJORITY` peers
732    /// report the chunk is already stored.
733    ///
734    /// # Errors
735    ///
736    /// Returns an error if insufficient quotes can be collected.
737    pub async fn get_store_quotes(
738        &self,
739        address: &[u8; 32],
740        data_size: u64,
741        data_type: u32,
742    ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
743        Ok(self
744            .get_store_quote_plan(address, data_size, data_type)
745            .await?
746            .quotes)
747    }
748
749    /// Get storage quotes plus PUT targets ordered for paid-median acceptance.
750    ///
751    /// Quote order is preserved for proof construction because tied quote
752    /// prices rely on stable median selection. PUT target order is separate:
753    /// peers that voted for the paid median issuer are placed first so the
754    /// initial write wave is locally acceptable to a storage majority.
755    pub(crate) async fn get_store_quote_plan(
756        &self,
757        address: &[u8; 32],
758        data_size: u64,
759        data_type: u32,
760    ) -> Result<StoreQuotePlan> {
761        let witnessed_selection = self.select_witnessed_quote_selection(address).await?;
762        let voters_by_peer: VotersByPeer = witnessed_selection
763            .quote_peers
764            .iter()
765            .map(|peer| (peer.peer_id, peer.voters.clone()))
766            .collect();
767        let remote_peers = witnessed_selection
768            .quote_peers
769            .into_iter()
770            .map(|peer| (peer.peer_id, peer.addrs))
771            .collect();
772        let initial_put_peers = witnessed_selection.initial_put_peers;
773        let quorum = witnessed_selection.quorum;
774        let quotes = self
775            .collect_store_quotes_from_remote_peers(
776                address,
777                data_size,
778                data_type,
779                remote_peers,
780                QuoteSelectionPolicy::WitnessedMedianVoters {
781                    voters_by_peer: voters_by_peer.clone(),
782                    quorum,
783                },
784            )
785            .await?;
786        let put_peers = put_peers_with_median_voters_first(
787            &quotes,
788            &initial_put_peers,
789            &voters_by_peer,
790            quorum,
791        )
792        .ok_or_else(|| {
793            Error::InsufficientPeers(format!(
794                "Collected {} witnessed quotes, but fewer than {} initial witness PUT peers \
795                 voted for the paid median issuer for {}",
796                quotes.len(),
797                quorum,
798                hex::encode(address)
799            ))
800        })?;
801
802        Ok(StoreQuotePlan { quotes, put_peers })
803    }
804
805    /// Get storage quotes with the previous over-query behaviour.
806    ///
807    /// Merkle preflight uses quote responses only as an already-stored probe;
808    /// the actual payment still happens through merkle candidate pools. Keep
809    /// the extra peer buffer there so merkle upload behaviour remains
810    /// unchanged when a few peers are slow or return unusable quote bindings.
811    pub(crate) async fn get_store_quotes_with_fault_tolerance(
812        &self,
813        address: &[u8; 32],
814        data_size: u64,
815        data_type: u32,
816    ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
817        let peer_query_count = fault_tolerant_quote_query_count();
818        let remote_peers = self
819            .network()
820            .find_closest_peers(address, peer_query_count)
821            .await?;
822
823        self.collect_store_quotes_from_remote_peers(
824            address,
825            data_size,
826            data_type,
827            remote_peers,
828            QuoteSelectionPolicy::ClosestByDistance,
829        )
830        .await
831    }
832
833    async fn select_witnessed_quote_selection(
834        &self,
835        address: &[u8; 32],
836    ) -> Result<WitnessedQuoteSelection> {
837        // The quote/quorum/consensus scope is the closest CLOSE_GROUP_SIZE.
838        let required = single_node_quote_query_count();
839        // Contact the closest PUT_TARGET_WIDTH peers directly so the whole
840        // PUT-target set's addresses arrive in this single query. A network
841        // with fewer than that near the target can't satisfy the wide lookup,
842        // so fall back to the close-group width — the upload still proceeds with
843        // a narrower (but valid) PUT-target set rather than failing.
844        let witnessed = match self
845            .network()
846            .find_witnessed_close_group_with_view_count(
847                address,
848                PUT_TARGET_WIDTH,
849                SINGLE_NODE_WITNESSED_VIEW_COUNT,
850            )
851            .await
852        {
853            Ok(witnessed) => witnessed,
854            Err(wide_err) => {
855                debug!(
856                    target = %hex::encode(address),
857                    "Wide witnessed lookup ({PUT_TARGET_WIDTH}) failed ({wide_err}); \
858                     retrying at close-group width ({required})"
859                );
860                self.network()
861                    .find_witnessed_close_group_with_view_count(
862                        address,
863                        required,
864                        SINGLE_NODE_WITNESSED_VIEW_COUNT,
865                    )
866                    .await
867                    .map_err(|e| {
868                        Error::InsufficientPeers(format!(
869                            "Witnessed close group lookup failed before payment for target {}: {e}",
870                            hex::encode(address)
871                        ))
872                    })?
873            }
874        };
875        // Run quoting/quorum on the closest CLOSE_GROUP_SIZE only, so payment
876        // semantics are unaffected by the wider PUT query.
877        let witnessed_quote = scope_witnessed_to_close_group(&witnessed);
878        let base_quorum = witnessed_close_group_quorum();
879        let missing_views = missing_witnessed_responder_views(&witnessed_quote);
880        let quorum = witnessed_close_group_quorum_for_transcript(&witnessed_quote);
881
882        if missing_views > 0 {
883            warn!(
884                target = %hex::encode(address),
885                initial = witnessed_quote.initial_closest.len(),
886                responder_views = witnessed_quote.responder_views.len(),
887                missing_views = missing_views,
888                base_quorum = base_quorum,
889                adjusted_quorum = quorum,
890                "Witnessed close group transcript is missing responder views; lowering SNP witness quorum"
891            );
892        }
893
894        debug!(
895            target = %hex::encode(address),
896            quorum = quorum,
897            view_count = SINGLE_NODE_WITNESSED_VIEW_COUNT,
898            initial = ?witnessed_initial_peers(&witnessed_quote),
899            responder_views = ?witnessed_responder_views(&witnessed_quote),
900            vote_counts = ?witnessed_vote_counts(&witnessed_quote, address),
901            final_witnessed_set = ?witnessed_consensus(&witnessed_quote, address, quorum),
902            "Witnessed close group selected for SNP quote collection"
903        );
904
905        let mut selection =
906            witnessed_quote_selection_or_error(address, &witnessed_quote, required, quorum)?;
907        // Widen the PUT-target set to the closest PUT_TARGET_WIDTH
908        // directly-contacted peers; the quote set above stays the closest
909        // CLOSE_GROUP_SIZE. The same proof is reused on all of them.
910        selection.initial_put_peers = witnessed
911            .initial_closest
912            .iter()
913            .take(PUT_TARGET_WIDTH)
914            .map(|node| (node.peer_id, node.addresses_by_priority()))
915            .collect();
916        Ok(selection)
917    }
918
919    #[allow(clippy::too_many_lines)]
920    async fn collect_store_quotes_from_remote_peers(
921        &self,
922        address: &[u8; 32],
923        data_size: u64,
924        data_type: u32,
925        remote_peers: Vec<(PeerId, Vec<MultiAddr>)>,
926        quote_selection_policy: QuoteSelectionPolicy,
927    ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
928        let peer_query_count = remote_peers.len();
929
930        let node = self.network().node();
931
932        debug!(
933            "Requesting quotes from up to {peer_query_count} peers for address {} (size: {data_size})",
934            hex::encode(address)
935        );
936
937        if remote_peers.len() < CLOSE_GROUP_SIZE {
938            return Err(Error::InsufficientPeers(format!(
939                "Found {} peers, need {CLOSE_GROUP_SIZE}",
940                remote_peers.len()
941            )));
942        }
943        debug_assert!(peer_query_count >= CLOSE_GROUP_SIZE);
944
945        let per_peer_timeout = Duration::from_secs(self.config().quote_timeout_secs);
946        let overall_timeout = Duration::from_secs(QUOTE_COLLECTION_TIMEOUT_SECS);
947
948        // Collect quote responses. SNP/witnessed collection deliberately tries
949        // the closest witnessed peers first and only falls back to further
950        // witnessed peers when a closer peer fails to produce a usable quote.
951        let mut quotes = Vec::with_capacity(peer_query_count);
952        let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new();
953        let mut failures: Vec<String> = Vec::new();
954
955        // Track storer-rejecting peers separately (binding, content, signature
956        // failures) so we can surface their count in diagnostics — they're a
957        // special class of failure (peer misconfigured or hostile, not
958        // network-broken) and the user benefits from seeing them called out.
959        let mut bad_quote_count = 0usize;
960
961        let staged_witnessed_collection = matches!(
962            &quote_selection_policy,
963            QuoteSelectionPolicy::WitnessedMedianVoters { .. }
964        );
965
966        if staged_witnessed_collection {
967            let mut quote_futures = FuturesUnordered::new();
968            let mut next_peer_index = 0usize;
969            let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
970                tokio::time::timeout(overall_timeout, async {
971                    loop {
972                        let launch_count = witnessed_quote_launch_budget(
973                            quotes.len(),
974                            quote_futures.len(),
975                            remote_peers.len().saturating_sub(next_peer_index),
976                        );
977                        for _ in 0..launch_count {
978                            let (peer_id, peer_addrs) = &remote_peers[next_peer_index];
979                            next_peer_index += 1;
980                            quote_futures.push(request_store_quote_from_peer(
981                                node.clone(),
982                                *peer_id,
983                                peer_addrs.clone(),
984                                self.next_request_id(),
985                                *address,
986                                data_size,
987                                data_type,
988                                per_peer_timeout,
989                            ));
990                        }
991
992                        if quotes.len() >= CLOSE_GROUP_SIZE || quote_futures.is_empty() {
993                            break;
994                        }
995
996                        let Some((peer_id, addrs, quote_result)) = quote_futures.next().await
997                        else {
998                            break;
999                        };
1000                        record_store_quote_result(
1001                            peer_id,
1002                            addrs,
1003                            quote_result,
1004                            address,
1005                            &mut quotes,
1006                            &mut already_stored_peers,
1007                            &mut failures,
1008                            &mut bad_quote_count,
1009                        );
1010                    }
1011                    Ok(())
1012                })
1013                .await;
1014
1015            match collect_result {
1016                Err(_elapsed) => {
1017                    warn!(
1018                        "Quote collection timed out after {overall_timeout:?} for address {}",
1019                        hex::encode(address)
1020                    );
1021                }
1022                Ok(Err(e)) => return Err(e),
1023                Ok(Ok(())) => {}
1024            }
1025        } else {
1026            // Merkle preflight keeps the previous behaviour: query the full
1027            // over-query set concurrently because those quote responses are
1028            // only used as an already-stored probe.
1029            let mut quote_futures = FuturesUnordered::new();
1030
1031            for (peer_id, peer_addrs) in &remote_peers {
1032                quote_futures.push(request_store_quote_from_peer(
1033                    node.clone(),
1034                    *peer_id,
1035                    peer_addrs.clone(),
1036                    self.next_request_id(),
1037                    *address,
1038                    data_size,
1039                    data_type,
1040                    per_peer_timeout,
1041                ));
1042            }
1043
1044            let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
1045                tokio::time::timeout(overall_timeout, async {
1046                    while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
1047                        record_store_quote_result(
1048                            peer_id,
1049                            addrs,
1050                            quote_result,
1051                            address,
1052                            &mut quotes,
1053                            &mut already_stored_peers,
1054                            &mut failures,
1055                            &mut bad_quote_count,
1056                        );
1057                    }
1058                    Ok(())
1059                })
1060                .await;
1061
1062            match collect_result {
1063                Err(_elapsed) => {
1064                    warn!(
1065                        "Quote collection timed out after {overall_timeout:?} for address {}",
1066                        hex::encode(address)
1067                    );
1068                    // Fall through to check if we have enough quotes despite timeout.
1069                    // The timeout fires when slow peers haven't responded yet, but we
1070                    // may already have enough successful quotes from fast peers.
1071                }
1072                Ok(Err(e)) => return Err(e),
1073                Ok(Ok(())) => {}
1074            }
1075        }
1076
1077        // Defensive double-check: the per-peer handler already filters
1078        // bad-binding responses into `failures`, but if any path slipped a bad
1079        // quote into `quotes` (e.g. a future refactor) this catches it before
1080        // we sort by distance and return. `bad_dropped` should be 0 in normal
1081        // operation; non-zero indicates an upstream regression worth investigating.
1082        let bad_dropped = drop_quotes_with_bad_bindings(&mut quotes);
1083        if bad_dropped > 0 {
1084            warn!(
1085                "Defensive filter dropped {bad_dropped} quotes with mismatched peer bindings \
1086                 for address {} — the per-peer handler should have caught these earlier \
1087                 (this indicates an upstream regression)",
1088                hex::encode(address),
1089            );
1090            bad_quote_count += bad_dropped;
1091        }
1092
1093        // Check already-stored: only count votes from the closest CLOSE_GROUP_SIZE peers.
1094        if !already_stored_peers.is_empty() {
1095            let mut all_peers_by_distance: Vec<(bool, [u8; 32])> = Vec::new();
1096            for (peer_id, _, _, _) in &quotes {
1097                all_peers_by_distance.push((false, peer_xor_distance(peer_id, address)));
1098            }
1099            for (_, dist) in &already_stored_peers {
1100                all_peers_by_distance.push((true, *dist));
1101            }
1102            all_peers_by_distance.sort_by_key(|a| a.1);
1103
1104            let close_group_stored = all_peers_by_distance
1105                .iter()
1106                .take(CLOSE_GROUP_SIZE)
1107                .filter(|(is_stored, _)| *is_stored)
1108                .count();
1109
1110            if close_group_stored >= CLOSE_GROUP_MAJORITY {
1111                debug!(
1112                    "Chunk {} already stored ({close_group_stored}/{CLOSE_GROUP_SIZE} close-group peers confirm)",
1113                    hex::encode(address)
1114                );
1115                return Err(Error::AlreadyStored);
1116            }
1117        }
1118
1119        let already_stored_count = already_stored_peers.len();
1120        let failure_count = failures.len();
1121        let quote_count = quotes.len();
1122        let total_responses = quote_count + failure_count + already_stored_count;
1123
1124        if quotes.len() >= CLOSE_GROUP_SIZE {
1125            let selected_quotes = match quote_selection_policy {
1126                QuoteSelectionPolicy::ClosestByDistance => select_closest_quotes(quotes, address),
1127                QuoteSelectionPolicy::WitnessedMedianVoters {
1128                    voters_by_peer,
1129                    quorum,
1130                } => select_witnessed_median_voter_quotes(quotes, address, &voters_by_peer, quorum)
1131                    .ok_or_else(|| {
1132                        Error::InsufficientPeers(format!(
1133                            "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} whose paid \
1134                                 median issuer is recognised by at least {} \
1135                                 selected witness peers ({total_responses} responses: \
1136                                 {already_stored_count} already_stored, {failure_count} failed \
1137                                 including {bad_quote_count} with mismatched peer bindings). \
1138                                 Failures: [{}]",
1139                            quorum,
1140                            failures.join("; ")
1141                        ))
1142                    })?,
1143            };
1144
1145            info!(
1146                "Collected {} quotes for address {} ({total_responses} responses: \
1147                 {quote_count} ok, {already_stored_count} already_stored, {failure_count} failed, \
1148                 {bad_quote_count} bad-binding)",
1149                selected_quotes.len(),
1150                hex::encode(address),
1151            );
1152            return Ok(selected_quotes);
1153        }
1154
1155        Err(Error::InsufficientPeers(format!(
1156            "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} ({total_responses} responses: \
1157             {already_stored_count} already_stored, {failure_count} failed including \
1158             {bad_quote_count} with mismatched peer bindings). Failures: [{}]",
1159            failures.join("; ")
1160        )))
1161    }
1162}
1163
1164#[cfg(test)]
1165#[allow(clippy::unwrap_used, clippy::expect_used)]
1166mod tests {
1167    //! Test fixtures use real ML-DSA-65 keypairs (1952-byte public keys), the
1168    //! same key material that ships on the wire. The "bad" quote is built by
1169    //! **swapping** the public key field with a different real keypair's
1170    //! public key — the exact shape produced by the Apr 30 production
1171    //! failure (an operator running two co-located identities with crossed
1172    //! quote-signing keys). Signatures are not exercised here because this
1173    //! filter only mirrors `validate_peer_bindings` (BLAKE3 binding); see
1174    //! the doc-comment on `quote_binding_is_valid` for why
1175    //! `verify_quote_signature` and `verify_quote_content` are deliberately
1176    //! NOT mirrored.
1177
1178    use super::*;
1179    use ant_protocol::evm::RewardsAddress;
1180    use ant_protocol::pqc::ops::{MlDsaOperations, MlDsaPublicKey};
1181    use ant_protocol::transport::{DHTNode, MlDsa65, ResponderView, WitnessedCloseGroup};
1182    use std::time::SystemTime;
1183    use xor_name::XorName;
1184
    /// A real ML-DSA-65 keypair plus its derived peer ID.
    ///
    /// Only the public half is kept; `gen_keypair` discards the secret key
    /// because these fixtures never sign anything.
    struct Keypair {
        /// Peer ID derived from `pub_key_bytes` (see `gen_keypair`).
        peer_id: PeerId,
        /// Raw bytes of the public key.
        pub_key_bytes: Vec<u8>,
    }
1190
1191    fn gen_keypair() -> Keypair {
1192        let ml_dsa = MlDsa65::new();
1193        let (pub_key, _sk) = ml_dsa.generate_keypair().expect("ML-DSA-65 keygen");
1194        let pub_key_bytes = pub_key.as_bytes().to_vec();
1195        let peer_id = PeerId::from_bytes(compute_address(&pub_key_bytes));
1196        Keypair {
1197            peer_id,
1198            pub_key_bytes,
1199        }
1200    }
1201
1202    /// Build a quote tuple whose `pub_key` correctly hashes to its peer_id.
1203    /// Signature is left empty: this filter does not verify signatures.
1204    fn good_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
1205        let kp = gen_keypair();
1206        let quote = PaymentQuote {
1207            content: XorName([0u8; 32]),
1208            timestamp: SystemTime::UNIX_EPOCH,
1209            price: Amount::ZERO,
1210            rewards_address: RewardsAddress::new([0u8; 20]),
1211            pub_key: kp.pub_key_bytes,
1212            signature: Vec::new(),
1213        };
1214        (kp.peer_id, Vec::new(), quote, Amount::ZERO)
1215    }
1216
1217    /// Build a quote tuple where the quote carries a different keypair's
1218    /// `pub_key` than the peer_id derives from. Mirrors the production
1219    /// failure shape: peer A advertised on the transport, but the quote
1220    /// carries peer B's key.
1221    fn bad_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
1222        let claimed = gen_keypair();
1223        let signing = gen_keypair();
1224        assert_ne!(claimed.pub_key_bytes, signing.pub_key_bytes);
1225        assert_ne!(claimed.peer_id.as_bytes(), signing.peer_id.as_bytes());
1226        let quote = PaymentQuote {
1227            content: XorName([0u8; 32]),
1228            timestamp: SystemTime::UNIX_EPOCH,
1229            price: Amount::ZERO,
1230            rewards_address: RewardsAddress::new([0u8; 20]),
1231            pub_key: signing.pub_key_bytes,
1232            signature: Vec::new(),
1233        };
1234        (claimed.peer_id, Vec::new(), quote, Amount::ZERO)
1235    }
1236
1237    fn witnessed_test_node(seed: u8) -> DHTNode {
1238        DHTNode {
1239            peer_id: PeerId::from_bytes([seed; 32]),
1240            addresses: Vec::new(),
1241            address_types: Vec::new(),
1242            distance: None,
1243            reliability: 1.0,
1244        }
1245    }
1246
1247    fn witnessed_test_nodes(seeds: &[u8]) -> Vec<DHTNode> {
1248        seeds.iter().copied().map(witnessed_test_node).collect()
1249    }
1250
1251    fn witnessed_test_view(responder: u8, closest: &[u8]) -> ResponderView {
1252        ResponderView {
1253            responder: PeerId::from_bytes([responder; 32]),
1254            closest: witnessed_test_nodes(closest),
1255        }
1256    }
1257
1258    fn synthetic_peer(seed: u8) -> PeerId {
1259        PeerId::from_bytes([seed; 32])
1260    }
1261
1262    fn synthetic_quote(seed: u8, price: u64) -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
1263        let amount = Amount::from(price);
1264        let quote = PaymentQuote {
1265            content: XorName([0u8; 32]),
1266            timestamp: SystemTime::UNIX_EPOCH,
1267            price: amount,
1268            rewards_address: RewardsAddress::new([0u8; 20]),
1269            pub_key: Vec::new(),
1270            signature: Vec::new(),
1271        };
1272        (synthetic_peer(seed), Vec::new(), quote, amount)
1273    }
1274
1275    fn synthetic_voters(seeds: &[u8]) -> HashSet<PeerId> {
1276        seeds.iter().copied().map(synthetic_peer).collect()
1277    }
1278
1279    fn quote_peer_seeds(quotes: &[(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)]) -> Vec<u8> {
1280        quotes
1281            .iter()
1282            .map(|(peer_id, _, _, _)| peer_id.as_bytes()[0])
1283            .collect()
1284    }
1285
1286    fn put_peer_seeds(peers: &[(PeerId, Vec<MultiAddr>)]) -> Vec<u8> {
1287        peers
1288            .iter()
1289            .map(|(peer_id, _)| peer_id.as_bytes()[0])
1290            .collect()
1291    }
1292
1293    fn put_peers_from_seeds(seeds: &[u8]) -> Vec<(PeerId, Vec<MultiAddr>)> {
1294        seeds
1295            .iter()
1296            .copied()
1297            .map(|seed| (synthetic_peer(seed), Vec::new()))
1298            .collect()
1299    }
1300
1301    /// Independent re-implementation of the storer-side binding spec
1302    /// (`ant-node/src/payment/verifier.rs::validate_peer_bindings` +
1303    /// `peer_id_from_public_key_bytes`):
1304    /// (a) `pub_key` parses as ML-DSA-65 (length 1952), and
1305    /// (b) `BLAKE3(pub_key) == peer_id`.
1306    ///
1307    /// Re-derived from spec, NOT delegating to `quote_binding_is_valid`,
1308    /// so cross-checks are not "function == itself".
1309    fn storer_binding_would_accept(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
1310        if MlDsaPublicKey::from_bytes(&quote.pub_key).is_err() {
1311            return false;
1312        }
1313        compute_address(&quote.pub_key) == *peer_id.as_bytes()
1314    }
1315
1316    // ============================================================
1317    // Tests for `quote_binding_is_valid` (the predicate)
1318    // ============================================================
1319
1320    #[test]
1321    fn binding_accepts_real_self_consistent_keypair() {
1322        let (peer_id, _, quote, _) = good_quote_real();
1323        // Property under test: the predicate accepts a quote whose pub_key
1324        // genuinely belongs to the claimed peer.
1325        assert!(quote_binding_is_valid(&peer_id, &quote));
1326        // Cross-check against the independent full storer-spec implementation.
1327        assert!(storer_binding_would_accept(&peer_id, &quote));
1328    }
1329
1330    #[test]
1331    fn binding_rejects_real_crossed_keypair() {
1332        let (peer_id, _, quote, _) = bad_quote_real();
1333        assert!(!quote_binding_is_valid(&peer_id, &quote));
1334        assert!(!storer_binding_would_accept(&peer_id, &quote));
1335    }
1336
1337    #[test]
1338    fn binding_rejects_oversize_pubkey() {
1339        // A pub_key longer than ML-DSA-65 (1952 bytes) must be rejected
1340        // even if BLAKE3 happens to agree, because the storer rejects on
1341        // length first via `peer_id_from_public_key_bytes`.
1342        let oversized = vec![0u8; ML_DSA_PUB_KEY_LEN + 1];
1343        let peer_id = PeerId::from_bytes(compute_address(&oversized));
1344        let quote = PaymentQuote {
1345            content: XorName([0u8; 32]),
1346            timestamp: SystemTime::UNIX_EPOCH,
1347            price: Amount::ZERO,
1348            rewards_address: RewardsAddress::new([0u8; 20]),
1349            pub_key: oversized,
1350            signature: Vec::new(),
1351        };
1352        // BLAKE3(pub_key) DOES equal the peer_id we constructed, so the
1353        // bare hash check would pass — but the length guard must reject.
1354        assert_eq!(compute_address(&quote.pub_key), *peer_id.as_bytes());
1355        assert!(
1356            !quote_binding_is_valid(&peer_id, &quote),
1357            "predicate must reject oversize pub_key even when BLAKE3 happens to match"
1358        );
1359        assert!(!storer_binding_would_accept(&peer_id, &quote));
1360    }
1361
1362    #[test]
1363    fn binding_rejects_undersize_pubkey() {
1364        let undersized = vec![0u8; ML_DSA_PUB_KEY_LEN - 1];
1365        let peer_id = PeerId::from_bytes(compute_address(&undersized));
1366        let quote = PaymentQuote {
1367            content: XorName([0u8; 32]),
1368            timestamp: SystemTime::UNIX_EPOCH,
1369            price: Amount::ZERO,
1370            rewards_address: RewardsAddress::new([0u8; 20]),
1371            pub_key: undersized,
1372            signature: Vec::new(),
1373        };
1374        assert!(!quote_binding_is_valid(&peer_id, &quote));
1375        assert!(!storer_binding_would_accept(&peer_id, &quote));
1376    }
1377
1378    // ============================================================
1379    // Tests for the filter (`drop_quotes_with_bad_bindings`)
1380    // ============================================================
1381
1382    #[test]
1383    fn quote_query_counts_keep_single_node_close_group_only() {
1384        assert_eq!(single_node_quote_query_count(), CLOSE_GROUP_SIZE);
1385        assert_eq!(SINGLE_NODE_WITNESSED_VIEW_COUNT, 20);
1386        assert!(SINGLE_NODE_WITNESSED_VIEW_COUNT > single_node_quote_query_count());
1387        assert_eq!(witnessed_close_group_quorum(), 5);
1388        assert_eq!(witnessed_close_group_quorum_for_missing_views(0), 5);
1389        assert_eq!(witnessed_close_group_quorum_for_missing_views(1), 4);
1390        assert_eq!(witnessed_close_group_quorum_for_missing_views(2), 3);
1391        assert_eq!(
1392            fault_tolerant_quote_query_count(),
1393            CLOSE_GROUP_SIZE * FAULT_TOLERANT_QUOTE_QUERY_MULTIPLIER
1394        );
1395        assert!(fault_tolerant_quote_query_count() > single_node_quote_query_count());
1396    }
1397
1398    #[test]
1399    fn witnessed_quote_launch_budget_keeps_exact_quote_window() {
1400        assert_eq!(
1401            witnessed_quote_launch_budget(0, 0, CLOSE_GROUP_SIZE * 2),
1402            CLOSE_GROUP_SIZE,
1403            "initial SNP quote fetch should launch the closest seven peers"
1404        );
1405        assert_eq!(
1406            witnessed_quote_launch_budget(1, CLOSE_GROUP_SIZE - 1, CLOSE_GROUP_SIZE),
1407            0,
1408            "a successful quote should not launch an extra fallback"
1409        );
1410        assert_eq!(
1411            witnessed_quote_launch_budget(0, CLOSE_GROUP_SIZE - 1, CLOSE_GROUP_SIZE),
1412            1,
1413            "a failed in-flight quote should launch the next closest fallback"
1414        );
1415        assert_eq!(
1416            witnessed_quote_launch_budget(CLOSE_GROUP_SIZE - 1, 0, 3),
1417            1,
1418            "only one more peer is needed for the seventh quote"
1419        );
1420        assert_eq!(
1421            witnessed_quote_launch_budget(0, 0, CLOSE_GROUP_SIZE - 1),
1422            CLOSE_GROUP_SIZE - 1,
1423            "launch budget is capped by remaining candidates"
1424        );
1425    }
1426
1427    #[test]
1428    fn witnessed_candidates_sort_by_xor_distance_then_votes() {
1429        let address = [0u8; 32];
1430        let witnessed = WitnessedCloseGroup {
1431            target: address,
1432            k: CLOSE_GROUP_SIZE,
1433            initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1434            responder_views: vec![
1435                witnessed_test_view(1, &[1, 9]),
1436                witnessed_test_view(2, &[1, 9]),
1437                witnessed_test_view(3, &[1, 9]),
1438                witnessed_test_view(4, &[1, 9]),
1439                witnessed_test_view(5, &[1, 9]),
1440                witnessed_test_view(6, &[9]),
1441                witnessed_test_view(7, &[9]),
1442            ],
1443        };
1444
1445        let candidates =
1446            witnessed_consensus_candidates(&witnessed, &address, witnessed_close_group_quorum());
1447
1448        assert_eq!(
1449            candidates
1450                .iter()
1451                .map(|candidate| candidate.node.peer_id.as_bytes()[0])
1452                .collect::<Vec<_>>(),
1453            vec![1, 9],
1454            "XOR closeness must be the primary sort before quote collection"
1455        );
1456    }
1457
1458    /// Ascending seeds `1..=count`, each a valid `u8` peer seed.
1459    fn ascending_seeds(count: usize) -> Vec<u8> {
1460        (1..=count)
1461            .map(|n| u8::try_from(n).expect("test seed fits in u8"))
1462            .collect()
1463    }
1464
1465    #[test]
1466    fn scope_witnessed_to_close_group_matches_native_close_group_query() {
1467        // How many of the closest-`CLOSE_GROUP_SIZE` responders returned a view.
1468        // The remainder are "missing", so the scoped transcript also exercises
1469        // the missing-views quorum adjustment.
1470        const RESPONDED_IN_SCOPE: usize = 5;
1471        // Responders past the close group whose views scoping must drop.
1472        const OUT_OF_SCOPE_RESPONDERS: usize = 2;
1473
1474        let address = [0u8; 32];
1475        let close_seeds = ascending_seeds(CLOSE_GROUP_SIZE);
1476        // Each view's closest list mixes in-group (1, 2) and far (8, 9) peers so
1477        // candidate selection is non-trivial and must survive scoping verbatim.
1478        let view_closest = [1, 2, 8, 9];
1479        let in_scope_views = || -> Vec<ResponderView> {
1480            ascending_seeds(RESPONDED_IN_SCOPE)
1481                .into_iter()
1482                .map(|responder| witnessed_test_view(responder, &view_closest))
1483                .collect()
1484        };
1485
1486        // A wide PUT_TARGET_WIDTH-peer transcript, ordered closest-first (seed n
1487        // == PeerId [n; 32], whose XOR distance to the zero address is n). Two
1488        // responders past the close group (out of scope) must be dropped.
1489        let mut wide_views = in_scope_views();
1490        for offset in 1..=OUT_OF_SCOPE_RESPONDERS {
1491            let responder =
1492                u8::try_from(CLOSE_GROUP_SIZE + offset).expect("out-of-scope seed fits in u8");
1493            wide_views.push(witnessed_test_view(responder, &[1, 2, 3]));
1494        }
1495        let wide = WitnessedCloseGroup {
1496            target: address,
1497            k: PUT_TARGET_WIDTH,
1498            initial_closest: witnessed_test_nodes(&ascending_seeds(PUT_TARGET_WIDTH)),
1499            responder_views: wide_views,
1500        };
1501
1502        // The hand-built equivalent: a native CLOSE_GROUP_SIZE-wide query with
1503        // the same in-scope responders.
1504        let native = WitnessedCloseGroup {
1505            target: address,
1506            k: CLOSE_GROUP_SIZE,
1507            initial_closest: witnessed_test_nodes(&close_seeds),
1508            responder_views: in_scope_views(),
1509        };
1510
1511        let scoped = scope_witnessed_to_close_group(&wide);
1512
1513        // Target preserved; k and the initial set collapse to the close group.
1514        assert_eq!(scoped.target, wide.target);
1515        assert_eq!(scoped.k, CLOSE_GROUP_SIZE);
1516        assert_eq!(
1517            scoped
1518                .initial_closest
1519                .iter()
1520                .map(|node| node.peer_id.as_bytes()[0])
1521                .collect::<Vec<_>>(),
1522            close_seeds,
1523            "initial set must be the closest CLOSE_GROUP_SIZE, in order"
1524        );
1525
1526        // Out-of-close-group responder views are dropped; the in-scope ones keep
1527        // their closest lists untouched (scoping filters by responder only).
1528        assert_eq!(
1529            scoped
1530                .responder_views
1531                .iter()
1532                .map(|view| view.responder.as_bytes()[0])
1533                .collect::<Vec<_>>(),
1534            ascending_seeds(RESPONDED_IN_SCOPE),
1535            "only responders inside the close group survive"
1536        );
1537        assert_eq!(
1538            scoped.responder_views[0]
1539                .closest
1540                .iter()
1541                .map(|node| node.peer_id.as_bytes()[0])
1542                .collect::<Vec<_>>(),
1543            view_closest.to_vec(),
1544            "a surviving view's closest set must be preserved verbatim"
1545        );
1546
1547        // The quorum math and candidate consensus run on the close group only
1548        // and are byte-for-byte identical to the native CLOSE_GROUP_SIZE query.
1549        assert_eq!(
1550            missing_witnessed_responder_views(&scoped),
1551            missing_witnessed_responder_views(&native),
1552        );
1553        let quorum = witnessed_close_group_quorum_for_transcript(&scoped);
1554        assert_eq!(quorum, witnessed_close_group_quorum_for_transcript(&native));
1555        let candidate_seeds = |group: &WitnessedCloseGroup| {
1556            witnessed_consensus_candidates(group, &address, quorum)
1557                .iter()
1558                .map(|candidate| candidate.node.peer_id.as_bytes()[0])
1559                .collect::<Vec<_>>()
1560        };
1561        assert_eq!(
1562            candidate_seeds(&scoped),
1563            candidate_seeds(&native),
1564            "scoped consensus must match a native close-group query"
1565        );
1566    }
1567
1568    #[test]
1569    fn witnessed_quote_peers_error_is_typed_and_pre_payment_when_consensus_is_short() {
1570        let address = [0u8; 32];
1571        let responder_views = (1..=7)
1572            .map(|responder| witnessed_test_view(responder, &[1, 2, 3, 4]))
1573            .collect();
1574        let witnessed = WitnessedCloseGroup {
1575            target: address,
1576            k: CLOSE_GROUP_SIZE,
1577            initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1578            responder_views,
1579        };
1580
1581        let err = witnessed_quote_selection_or_error(
1582            &address,
1583            &witnessed,
1584            CLOSE_GROUP_SIZE,
1585            witnessed_close_group_quorum(),
1586        )
1587        .expect_err("short witnessed consensus must fail before payment");
1588
1589        match err {
1590            Error::InsufficientPeers(message) => {
1591                assert!(message.contains("before payment"));
1592                assert!(message.contains("vote_counts"));
1593                assert!(message.contains("quorum"));
1594            }
1595            other => panic!("expected typed InsufficientPeers error, got {other:?}"),
1596        }
1597    }
1598
1599    #[test]
1600    fn witnessed_quote_peers_include_quorum_fallback_candidates() {
1601        const EXTRA_QUORUM_CANDIDATES: usize = 1;
1602
1603        let address = [0u8; 32];
1604        let witnessed = WitnessedCloseGroup {
1605            target: address,
1606            k: CLOSE_GROUP_SIZE,
1607            initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1608            responder_views: vec![
1609                witnessed_test_view(1, &[1, 2, 3, 4, 5, 6, 7]),
1610                witnessed_test_view(2, &[1, 2, 3, 4, 5, 6, 8]),
1611                witnessed_test_view(3, &[1, 2, 3, 4, 5, 7, 8]),
1612                witnessed_test_view(4, &[1, 2, 3, 4, 6, 7, 8]),
1613                witnessed_test_view(5, &[1, 2, 3, 5, 6, 7, 8]),
1614                witnessed_test_view(6, &[1, 2, 4, 5, 6, 7, 8]),
1615                witnessed_test_view(7, &[1, 3, 4, 5, 6, 7, 8]),
1616            ],
1617        };
1618
1619        let selection = witnessed_quote_selection_or_error(
1620            &address,
1621            &witnessed,
1622            CLOSE_GROUP_SIZE,
1623            witnessed_close_group_quorum(),
1624        )
1625        .expect("fallback candidates should be retained for quote collection");
1626
1627        assert_eq!(
1628            selection.quote_peers.len(),
1629            CLOSE_GROUP_SIZE + EXTRA_QUORUM_CANDIDATES
1630        );
1631        assert_eq!(
1632            selection
1633                .quote_peers
1634                .iter()
1635                .map(|peer| peer.peer_id.as_bytes()[0])
1636                .collect::<Vec<_>>(),
1637            vec![1, 2, 3, 4, 5, 6, 7, 8]
1638        );
1639        assert_eq!(
1640            put_peer_seeds(&selection.initial_put_peers),
1641            vec![1, 2, 3, 4, 5, 6, 7]
1642        );
1643    }
1644
1645    #[test]
1646    fn witnessed_quote_peers_lower_quorum_for_missing_responder_views() {
1647        let address = [0u8; 32];
1648        let witnessed = WitnessedCloseGroup {
1649            target: address,
1650            k: CLOSE_GROUP_SIZE,
1651            initial_closest: witnessed_test_nodes(&[1, 2, 3, 4, 5, 6, 7]),
1652            responder_views: vec![
1653                witnessed_test_view(1, &[1, 2, 3, 4, 5, 6, 7]),
1654                witnessed_test_view(2, &[1, 2, 3, 4, 5, 6, 8]),
1655                witnessed_test_view(3, &[1, 2, 3, 4, 5, 7, 8]),
1656                witnessed_test_view(4, &[1, 2, 3, 4, 6, 7, 8]),
1657                witnessed_test_view(5, &[1, 2, 3, 5, 6, 7, 8]),
1658                witnessed_test_view(6, &[1, 2, 4, 5, 6, 7, 8]),
1659            ],
1660        };
1661        let quorum = witnessed_close_group_quorum_for_transcript(&witnessed);
1662
1663        assert_eq!(missing_witnessed_responder_views(&witnessed), 1);
1664        assert_eq!(quorum, 4);
1665
1666        let selection =
1667            witnessed_quote_selection_or_error(&address, &witnessed, CLOSE_GROUP_SIZE, quorum)
1668                .expect(
1669                    "one missing responder view should lower quorum and still select candidates",
1670                );
1671
1672        assert_eq!(
1673            selection
1674                .quote_peers
1675                .iter()
1676                .map(|peer| peer.peer_id.as_bytes()[0])
1677                .collect::<Vec<_>>(),
1678            vec![1, 2, 3, 4, 5, 6, 7, 8]
1679        );
1680        assert_eq!(selection.quorum, quorum);
1681    }
1682
1683    #[test]
1684    fn witnessed_quote_selection_keeps_closest_set_with_median_voter_quorum() {
1685        const MEDIAN_ISSUER_SEED: u8 = 7;
1686        const FAR_SUPPORTING_VOTER_SEED: u8 = 20;
1687        const UNSUCCESSFUL_SUPPORTING_VOTER_SEED: u8 = 21;
1688
1689        let address = [0u8; 32];
1690        let quotes = vec![
1691            synthetic_quote(1, 10),
1692            synthetic_quote(2, 20),
1693            synthetic_quote(3, 30),
1694            synthetic_quote(6, 50),
1695            synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1696            synthetic_quote(8, 60),
1697            synthetic_quote(9, 70),
1698            synthetic_quote(FAR_SUPPORTING_VOTER_SEED, 80),
1699        ];
1700        let mut voters_by_peer = HashMap::new();
1701        voters_by_peer.insert(
1702            synthetic_peer(MEDIAN_ISSUER_SEED),
1703            synthetic_voters(&[
1704                1,
1705                2,
1706                3,
1707                MEDIAN_ISSUER_SEED,
1708                FAR_SUPPORTING_VOTER_SEED,
1709                UNSUCCESSFUL_SUPPORTING_VOTER_SEED,
1710            ]),
1711        );
1712
1713        let quorum = witnessed_close_group_quorum();
1714        let selected =
1715            select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer, quorum)
1716                .expect("a supported close-group quote set should be selected");
1717
1718        assert_eq!(quote_peer_seeds(&selected), vec![1, 2, 3, 6, 7, 8, 9]);
1719        let (median_peer_id, _) =
1720            median_paid_quote_issuer(&selected).expect("selected quotes have a median");
1721        assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
1722        assert!(voters_by_peer[&median_peer_id].len() >= quorum);
1723    }
1724
1725    #[test]
1726    fn witnessed_quote_selection_uses_direct_median_witness_recognition() {
1727        const MEDIAN_ISSUER_SEED: u8 = 7;
1728
1729        let address = [0u8; 32];
1730        let quotes = vec![
1731            synthetic_quote(1, 10),
1732            synthetic_quote(2, 20),
1733            synthetic_quote(3, 30),
1734            synthetic_quote(4, 50),
1735            synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1736            synthetic_quote(8, 60),
1737            synthetic_quote(9, 70),
1738        ];
1739        let mut voters_by_peer = HashMap::new();
1740        voters_by_peer.insert(
1741            synthetic_peer(MEDIAN_ISSUER_SEED),
1742            synthetic_voters(&[20, 21, 22, 23, 24]),
1743        );
1744
1745        let quorum = witnessed_close_group_quorum();
1746        let selected =
1747            select_witnessed_median_voter_quotes(quotes, &address, &voters_by_peer, quorum)
1748                .expect("direct witness recognition should support the paid median issuer");
1749
1750        let (median_peer_id, _) =
1751            median_paid_quote_issuer(&selected).expect("selected quotes have a median");
1752        let selected_peers = selected
1753            .iter()
1754            .map(|(peer_id, _, _, _)| *peer_id)
1755            .collect::<HashSet<_>>();
1756        assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
1757        assert_eq!(
1758            voters_by_peer[&median_peer_id]
1759                .intersection(&selected_peers)
1760                .count(),
1761            0,
1762            "recognising witnesses need not also be selected quote issuers"
1763        );
1764        assert_eq!(voters_by_peer[&median_peer_id].len(), quorum);
1765    }
1766
1767    #[test]
1768    fn witnessed_quote_selection_rejects_median_without_witness_quorum() {
1769        const MEDIAN_ISSUER_SEED: u8 = 7;
1770
1771        let address = [0u8; 32];
1772        let quotes = vec![
1773            synthetic_quote(1, 10),
1774            synthetic_quote(2, 20),
1775            synthetic_quote(3, 30),
1776            synthetic_quote(6, 50),
1777            synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1778            synthetic_quote(8, 60),
1779            synthetic_quote(9, 70),
1780            synthetic_quote(10, 80),
1781        ];
1782        let mut voters_by_peer = HashMap::new();
1783        voters_by_peer.insert(
1784            synthetic_peer(MEDIAN_ISSUER_SEED),
1785            synthetic_voters(&[1, 2, 3, 20]),
1786        );
1787
1788        let selected = select_witnessed_median_voter_quotes(
1789            quotes,
1790            &address,
1791            &voters_by_peer,
1792            witnessed_close_group_quorum(),
1793        );
1794
1795        assert!(
1796            selected.is_none(),
1797            "the selector must not return a paid quote set when fewer than the \
1798             witnessed median voter quorum recognised the paid median issuer"
1799        );
1800    }
1801
1802    #[test]
1803    fn put_peers_prioritise_median_voters_without_reordering_quotes() {
1804        const MEDIAN_ISSUER_SEED: u8 = 7;
1805
1806        let quotes = vec![
1807            synthetic_quote(1, 10),
1808            synthetic_quote(2, 20),
1809            synthetic_quote(3, 30),
1810            synthetic_quote(4, 50),
1811            synthetic_quote(5, 60),
1812            synthetic_quote(6, 70),
1813            synthetic_quote(MEDIAN_ISSUER_SEED, 40),
1814        ];
1815        let mut voters_by_peer = HashMap::new();
1816        voters_by_peer.insert(
1817            synthetic_peer(MEDIAN_ISSUER_SEED),
1818            synthetic_voters(&[3, 4, 5, 6, MEDIAN_ISSUER_SEED]),
1819        );
1820
1821        let put_candidates = put_peers_from_seeds(&[1, 2, 3, 4, 5, 6, 7]);
1822        let put_peers = put_peers_with_median_voters_first(
1823            &quotes,
1824            &put_candidates,
1825            &voters_by_peer,
1826            witnessed_close_group_quorum(),
1827        )
1828        .expect("median voters should produce an ordered PUT set");
1829
1830        assert_eq!(quote_peer_seeds(&quotes), vec![1, 2, 3, 4, 5, 6, 7]);
1831        let (median_peer_id, _) =
1832            median_paid_quote_issuer(&quotes).expect("selected quotes have a median");
1833        assert_eq!(median_peer_id, synthetic_peer(MEDIAN_ISSUER_SEED));
1834        assert_eq!(put_peer_seeds(&put_peers), vec![3, 4, 5, 6, 7, 1, 2]);
1835    }
1836
1837    #[test]
1838    fn filter_drops_only_bad_bindings_and_leaves_storer_acceptable_quotes() {
1839        let mut quotes = vec![
1840            good_quote_real(),
1841            bad_quote_real(),
1842            good_quote_real(),
1843            bad_quote_real(),
1844            good_quote_real(),
1845        ];
1846
1847        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1848
1849        assert_eq!(dropped, 2, "two crossed-key quotes must be dropped");
1850        assert_eq!(quotes.len(), 3, "three real-key quotes must remain");
1851
1852        // Cross-checked invariant: every retained quote would be accepted by
1853        // a storer running the full spec. The defensive filter only checks
1854        // the binding, so this asserts the binding-only filter is correct
1855        // for binding-only failures (other failure modes are filtered by
1856        // the per-peer classifier upstream).
1857        for (peer_id, _, quote, _) in &quotes {
1858            assert!(
1859                storer_binding_would_accept(peer_id, quote),
1860                "every retained quote must satisfy the full storer-side spec"
1861            );
1862        }
1863    }
1864
1865    #[test]
1866    fn filter_is_noop_when_all_quotes_are_storer_acceptable() {
1867        let mut quotes: Vec<_> = (0..5).map(|_| good_quote_real()).collect();
1868        let before = quotes.len();
1869        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1870        assert_eq!(dropped, 0);
1871        assert_eq!(quotes.len(), before);
1872        for (peer_id, _, quote, _) in &quotes {
1873            assert!(storer_binding_would_accept(peer_id, quote));
1874        }
1875    }
1876
1877    #[test]
1878    fn filter_drops_all_when_every_responder_is_bad() {
1879        // The "all hostile" case: every peer returned a bad binding. The
1880        // patch should leave us with zero quotes (not panic, not skip the
1881        // filter, not return malformed quotes). The caller then surfaces
1882        // InsufficientPeers.
1883        let mut quotes: Vec<_> = (0..fault_tolerant_quote_query_count())
1884            .map(|_| bad_quote_real())
1885            .collect();
1886        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1887        assert_eq!(dropped, fault_tolerant_quote_query_count());
1888        assert!(quotes.is_empty());
1889    }
1890
1891    #[test]
1892    fn filter_preserves_quote_payload_byte_for_byte() {
1893        // After filtering, the retained quotes must be untouched — pub_key,
1894        // signature, content, timestamp, price, rewards_address. The patch
1895        // is a filter, not a transformation; this test catches any future
1896        // regression that mutates a retained quote.
1897        let (peer_id, addrs, original_quote, amount) = good_quote_real();
1898        let mut quotes = vec![(peer_id, addrs.clone(), original_quote.clone(), amount)];
1899        let _ = drop_quotes_with_bad_bindings(&mut quotes);
1900
1901        let (kept_peer, kept_addrs, kept_quote, kept_amount) =
1902            quotes.pop().expect("the good quote must survive filtering");
1903        assert_eq!(kept_peer.as_bytes(), peer_id.as_bytes());
1904        assert_eq!(kept_addrs.len(), addrs.len());
1905        assert_eq!(kept_amount, amount);
1906        assert_eq!(kept_quote.pub_key, original_quote.pub_key);
1907        assert_eq!(kept_quote.signature, original_quote.signature);
1908        assert_eq!(kept_quote.content.0, original_quote.content.0);
1909        assert_eq!(kept_quote.timestamp, original_quote.timestamp);
1910        assert_eq!(kept_quote.price, original_quote.price);
1911        assert_eq!(kept_quote.rewards_address, original_quote.rewards_address);
1912    }
1913
1914    // ============================================================
1915    // The Apr 30 production-failure repro
1916    // ============================================================
1917
1918    /// Repro of the production failure from 2026-04-30 testnet runs.
1919    ///
1920    /// An external operator on `75.48.86.24` ran two co-located ant-node
1921    /// identities (peer `0755ecb55b…` and peer `073db92f…`) that crossed
1922    /// their quote-signing keys. Every chunk whose XOR-closest set happened
1923    /// to include peer `0755ecb5` got a payment proof with one malformed
1924    /// quote, and the storer's `validate_peer_bindings` rejected the
1925    /// entire close-group proof — burning the chunk's payment.
1926    ///
1927    /// This test proves the fault-tolerant quote path still fixes that failure
1928    /// shape:
1929    ///
1930    /// 1. We assemble `2x CLOSE_GROUP_SIZE` real ML-DSA-65 quotes — the same
1931    ///    buffer merkle preflight and merkle-mode estimates retain for probes.
1932    /// 2. One of them is a *crossed-key* quote — the production failure shape.
1933    /// 3. We run an independent `storer_would_accept` check (re-derived from
1934    ///    the storer spec, not from `quote_binding_is_valid`) over the
1935    ///    pre-filter set; we confirm the bad peer is rejected, proving the
1936    ///    storer **would** burn the chunk's payment if we proceeded unfiltered.
1937    /// 4. We run `drop_quotes_with_bad_bindings`.
1938    /// 5. We re-run `storer_would_accept` over the post-filter set; we confirm
1939    ///    EVERY remaining quote would be accepted, proving the filtered set
1940    ///    will not trigger the `validate_peer_bindings` rejection that caused
1941    ///    the Apr 30 outage.
1942    /// 6. We confirm the post-filter set has at least `CLOSE_GROUP_SIZE`
1943    ///    quotes — the over-query buffer (2x) is sufficient.
1944    #[test]
1945    fn repro_apr_30_storer_would_have_rejected_pre_filter_and_accepts_post_filter() {
1946        let over_query_count = fault_tolerant_quote_query_count();
1947        let mut quotes: Vec<_> = (0..over_query_count - 1)
1948            .map(|_| good_quote_real())
1949            .collect();
1950        // Splice the crossed-key quote in the middle (mirrors the random
1951        // position the bad peer takes in the DHT-returned closest set).
1952        quotes.insert(over_query_count / 2, bad_quote_real());
1953        assert_eq!(quotes.len(), over_query_count);
1954
1955        // Step 1: prove the storer would reject the pre-filter set.
1956        let storer_would_reject_count = quotes
1957            .iter()
1958            .filter(|(p, _, q, _)| !storer_binding_would_accept(p, q))
1959            .count();
1960        assert_eq!(
1961            storer_would_reject_count, 1,
1962            "exactly one quote (the crossed-key one) must be rejected by the storer spec"
1963        );
1964
1965        // Step 2: run the patched filter.
1966        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
1967        assert_eq!(dropped, 1, "exactly the crossed-key quote must be filtered");
1968
1969        // Step 3: prove the storer would accept every survivor under the FULL spec.
1970        for (peer_id, _, quote, _) in &quotes {
1971            assert!(
1972                storer_binding_would_accept(peer_id, quote),
1973                "every post-filter quote must be accepted by the storer spec — \
1974                 this is what the filter guarantees before any quote set is used"
1975            );
1976        }
1977
1978        // Step 4: prove the over-query buffer is sufficient to refill.
1979        assert!(
1980            quotes.len() >= CLOSE_GROUP_SIZE,
1981            "after filtering, at least CLOSE_GROUP_SIZE good quotes must remain \
1982             so a fault-tolerant probe can still return a full close group"
1983        );
1984    }
1985
1986    /// When more than the over-query buffer of peers misbehave, the filter
1987    /// must NOT silently produce a short proof. The downstream caller in
1988    /// `get_store_quotes` must see fewer than `CLOSE_GROUP_SIZE` survivors
1989    /// and return `InsufficientPeers`.
1990    #[test]
1991    fn filter_leaves_short_set_when_too_many_bad_peers() {
1992        let good_count = CLOSE_GROUP_SIZE - 1;
1993        let bad_count = fault_tolerant_quote_query_count() - good_count;
1994        let mut quotes: Vec<_> = std::iter::repeat_with(bad_quote_real)
1995            .take(bad_count)
1996            .chain(std::iter::repeat_with(good_quote_real).take(good_count))
1997            .collect();
1998
1999        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
2000        assert_eq!(dropped, bad_count);
2001        assert!(
2002            quotes.len() < CLOSE_GROUP_SIZE,
2003            "this is the precondition for InsufficientPeers downstream"
2004        );
2005        // Sanity: every survivor is storer-acceptable under the full spec.
2006        for (peer_id, _, quote, _) in &quotes {
2007            assert!(storer_binding_would_accept(peer_id, quote));
2008        }
2009    }
2010
2011    // ============================================================
2012    // Tests for the per-peer response classifier (the PRIMARY defense).
2013    //
2014    // These tests exercise the production code path that runs inside
2015    // get_store_quotes' per-peer async closure. The defensive
2016    // `drop_quotes_with_bad_bindings` is a second line of defence —
2017    // these tests make sure the FIRST line is what actually catches
2018    // misbehaving peers in production. Without these, a regression
2019    // that removes the per-peer check could be masked by the post-
2020    // collect filter and pass the rest of the suite.
2021    // ============================================================
2022
2023    /// Helper: serialize a `PaymentQuote` to bytes the way the wire layer
2024    /// does (rmp_serde / msgpack), to feed into `classify_quote_response`.
2025    fn serialize_quote(quote: &PaymentQuote) -> Vec<u8> {
2026        rmp_serde::to_vec(quote).expect("serialize quote")
2027    }
2028
2029    #[test]
2030    fn classifier_accepts_real_self_consistent_quote() {
2031        let (peer_id, _, quote, _) = good_quote_real();
2032        let bytes = serialize_quote(&quote);
2033        let result = classify_quote_response(&peer_id, &bytes, false);
2034        match result {
2035            Ok((q, price)) => {
2036                assert_eq!(q.pub_key, quote.pub_key);
2037                assert_eq!(price, quote.price);
2038            }
2039            Err(e) => panic!("expected Ok, got {e}"),
2040        }
2041    }
2042
2043    #[test]
2044    fn classifier_rejects_crossed_keypair_with_typed_error() {
2045        let (peer_id, _, quote, _) = bad_quote_real();
2046        let bytes = serialize_quote(&quote);
2047        let result = classify_quote_response(&peer_id, &bytes, false);
2048        match result {
2049            Err(Error::BadQuoteBinding {
2050                peer_id: pid,
2051                detail,
2052            }) => {
2053                assert_eq!(pid, peer_id.to_string());
2054                assert!(
2055                    detail.contains("BLAKE3(pub_key)="),
2056                    "diagnostic detail must include the derived peer id: {detail}"
2057                );
2058            }
2059            other => panic!("expected BadQuoteBinding for crossed-key quote, got {other:?}"),
2060        }
2061    }
2062
2063    /// CRITICAL: a misbehaving peer that votes `already_stored=true` must
2064    /// NOT be allowed to influence the close-group "already stored"
2065    /// majority decision. The bind-check runs before the AlreadyStored
2066    /// short-circuit, so a crossed-key peer voting "already stored" is
2067    /// classified as `BadQuoteBinding`, not `AlreadyStored`.
2068    ///
2069    /// This locks in a specific reviewer concern from round 1:
2070    ///   "A peer with a crossed/garbage signing key could simply respond
2071    ///   already_stored=true and its vote enters already_stored_peers
2072    ///   unfiltered."
2073    #[test]
2074    fn classifier_rejects_already_stored_vote_from_bad_binding_peer() {
2075        let (peer_id, _, quote, _) = bad_quote_real();
2076        let bytes = serialize_quote(&quote);
2077        // The peer claims already_stored=true, but its quote has a crossed key.
2078        let result = classify_quote_response(&peer_id, &bytes, true);
2079        assert!(
2080            matches!(result, Err(Error::BadQuoteBinding { .. })),
2081            "crossed-key peer must be classified BadQuoteBinding even when \
2082             voting already_stored=true; got {result:?}"
2083        );
2084    }
2085
2086    /// An honest peer's `already_stored=true` vote IS honoured (after
2087    /// passing the bind-check). This is the contrast to the test above.
2088    #[test]
2089    fn classifier_honours_already_stored_vote_from_good_binding_peer() {
2090        let (peer_id, _, quote, _) = good_quote_real();
2091        let bytes = serialize_quote(&quote);
2092        let result = classify_quote_response(&peer_id, &bytes, true);
2093        assert!(
2094            matches!(result, Err(Error::AlreadyStored)),
2095            "honest peer's already_stored vote must be honoured; got {result:?}"
2096        );
2097    }
2098
2099    #[test]
2100    fn classifier_returns_serialization_error_on_bad_bytes() {
2101        let (peer_id, _, _, _) = good_quote_real();
2102        let garbage = b"this is not a valid msgpack PaymentQuote".to_vec();
2103        let result = classify_quote_response(&peer_id, &garbage, false);
2104        assert!(
2105            matches!(result, Err(Error::Serialization(_))),
2106            "garbage bytes must produce a Serialization error; got {result:?}"
2107        );
2108    }
2109
2110    /// Cross-validate the classifier's binding verdict against the
2111    /// independent storer-spec re-derivation across mixed responders.
2112    #[test]
2113    fn classifier_verdict_matches_storer_binding_spec_for_mixed_responders() {
2114        let mut responders: Vec<(PeerId, PaymentQuote)> = (0..12)
2115            .map(|_| {
2116                let (p, _, q, _) = good_quote_real();
2117                (p, q)
2118            })
2119            .collect();
2120        for _ in 0..4 {
2121            let (p, _, q, _) = bad_quote_real();
2122            responders.push((p, q));
2123        }
2124
2125        for (peer_id, quote) in &responders {
2126            let bytes = serialize_quote(quote);
2127            let storer_verdict = storer_binding_would_accept(peer_id, quote);
2128            let classifier_verdict = classify_quote_response(peer_id, &bytes, false).is_ok();
2129            assert_eq!(
2130                classifier_verdict, storer_verdict,
2131                "classifier and storer-binding-spec must agree on every responder \
2132                 (peer_id={}, storer={storer_verdict}, classifier={classifier_verdict})",
2133                peer_id
2134            );
2135        }
2136    }
2137}