Skip to main content

ant_core/data/client/
quote.rs

1//! Quote and payment operations.
2//!
3//! Handles requesting storage quotes from network nodes and
4//! managing payment for data storage.
5
6use crate::data::client::peer_cache::record_peer_outcome;
7use crate::data::client::peer_xor_distance;
8use crate::data::client::Client;
9use crate::data::error::{Error, Result};
10use ant_protocol::evm::{Amount, PaymentQuote};
11use ant_protocol::transport::{MultiAddr, PeerId};
12use ant_protocol::{
13    compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
14    ChunkQuoteRequest, ChunkQuoteResponse, CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE,
15};
16use futures::stream::{FuturesUnordered, StreamExt};
17use std::time::{Duration, Instant};
18use tracing::{debug, info, warn};
19
/// Byte length of an ML-DSA-65 public key.
///
/// Local mirror of `pub const ML_DSA_65_PUBLIC_KEY_SIZE` from
/// `saorsa-pqc::pqc::types`, the length the storer's
/// `peer_id_from_public_key_bytes` enforces. The value is duplicated here
/// to avoid a direct `saorsa-pqc` dependency; it is fixed by FIPS for
/// ML-DSA-65, so it only changes if the signature variant changes.
///
/// TODO: replace with `saorsa_pqc::pqc::types::ML_DSA_65_PUBLIC_KEY_SIZE` once
/// `ant-protocol` re-exports it (`pqc::ops::ML_DSA_65_PUBLIC_KEY_SIZE`).
const ML_DSA_PUB_KEY_LEN: usize = 1_952;
29
30/// Check that a quote's `pub_key` is well-formed and BLAKE3-hashes to the
31/// claimed `peer_id`.
32///
33/// The storer node enforces both constraints in `ant-node/src/payment/verifier.rs`
34/// (via `peer_id_from_public_key_bytes` and `validate_peer_bindings`): every
35/// quote inside a `ProofOfPayment` must (a) have a 1952-byte `pub_key` parsable
36/// as ML-DSA-65 and (b) satisfy `BLAKE3(pub_key) == peer_id`. A single quote
37/// failing either check causes the storer to reject the entire close-group
38/// proof and burn the chunk's payment.
39///
40/// We mirror the cheap structural check here. The storer also runs
41/// `verify_quote_content` and `verify_quote_signature`; those are ML-DSA
42/// verifications (~1 ms × 14 quotes × every chunk) and are deliberately NOT
43/// mirrored on the client to keep upload latency unchanged. They are tracked
44/// as a follow-up if a real attack surfaces them.
45fn quote_binding_is_valid(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
46    if quote.pub_key.len() != ML_DSA_PUB_KEY_LEN {
47        return false;
48    }
49    compute_address(&quote.pub_key) == *peer_id.as_bytes()
50}
51
52/// Classification of a `ChunkQuoteResponse::Success` body for a single peer.
53///
54/// Mirrors the storer-side `validate_peer_bindings` check from
55/// `ant-node/src/payment/verifier.rs` — the cheap BLAKE3 binding —
56/// so we drop misbehaving peers' quotes before payment.
57///
58/// We deliberately do NOT mirror the storer's `verify_quote_signature`
59/// (ML-DSA-65 verify, ~1 ms × CLOSE_GROUP_SIZE × every chunk) or
60/// `verify_quote_content`. Those are useful defense-in-depth for an
61/// attacker who self-consistently crafts a signed-but-stolen or wrong-
62/// content quote, but they are NOT cheap and are out of scope for this
63/// fix. Adding them changes upload latency materially. Track them as a
64/// follow-up if a real attack surfaces them.
65///
66/// Pulling the logic out of the async closure lets us unit-test the
67/// primary defense (not just the post-collect defensive filter).
68///
69/// # Returns
70///
71/// - `Ok((quote, price))` — the response is honoured as a quote.
72/// - `Err(Error::AlreadyStored)` — the peer claims the chunk is already
73///   present AND the quote it provided binds to its peer ID. Vote counts.
74/// - `Err(Error::BadQuoteBinding { .. })` — bad binding (mirrors the
75///   storer-side rejection); the peer is treated as a failure so the
76///   AIMD cache learns to deprioritize it. Outer collector counts these
77///   via the typed variant (no string matching).
78/// - `Err(Error::Serialization(...))` — the quote bytes did not deserialize.
79fn classify_quote_response(
80    peer_id: &PeerId,
81    quote_bytes: &[u8],
82    already_stored: bool,
83) -> std::result::Result<(PaymentQuote, Amount), Error> {
84    let payment_quote = rmp_serde::from_slice::<PaymentQuote>(quote_bytes).map_err(|e| {
85        Error::Serialization(format!("Failed to deserialize quote from {peer_id}: {e}"))
86    })?;
87
88    // Peer binding: BLAKE3(pub_key) must equal peer_id. This is the
89    // exact mitigation Chris and the AI investigation requested for the
90    // 2026-04-30 production failure: drop crossed-key peers before they
91    // poison the close-group ProofOfPayment.
92    if !quote_binding_is_valid(peer_id, &payment_quote) {
93        let derived = compute_address(&payment_quote.pub_key);
94        warn!(
95            "Dropping response from {peer_id} — quote.pub_key BLAKE3 mismatch \
96             (peer is signing quotes with another peer's key); the storer \
97             would reject this proof"
98        );
99        return Err(Error::BadQuoteBinding {
100            peer_id: peer_id.to_string(),
101            detail: format!(
102                "BLAKE3(pub_key)={} pub_key_len={}",
103                hex::encode(derived),
104                payment_quote.pub_key.len(),
105            ),
106        });
107    }
108
109    if already_stored {
110        debug!("Peer {peer_id} already has chunk");
111        return Err(Error::AlreadyStored);
112    }
113    let price = payment_quote.price;
114    debug!("Received quote from {peer_id}: price = {price}");
115    Ok((payment_quote, price))
116}
117
118/// Map a per-peer quote-collection outcome to the AIMD-cache success flag.
119///
120/// `Ok(_)` and `AlreadyStored` are both *benign* outcomes — the peer is
121/// reachable and well-behaved — so we record them as successes (recording
122/// a smooth RTT). Every other variant (network/timeout/protocol/
123/// serialization, plus `BadQuoteBinding`) records as a failure so the
124/// local AIMD bootstrap cache learns to deprioritize peers that don't
125/// help us upload.
126///
127/// Pulled out of the per-peer closure for unit-testing.
128fn quote_outcome_is_success(result: &std::result::Result<(PaymentQuote, Amount), Error>) -> bool {
129    matches!(result, Ok(_) | Err(Error::AlreadyStored))
130}
131
132/// Drop quotes whose `pub_key` does not BLAKE3-hash to the peer that supplied
133/// them. Logs each dropped quote at WARN.
134fn drop_quotes_with_bad_bindings(
135    quotes: &mut Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>,
136) -> usize {
137    let before = quotes.len();
138    quotes.retain(|(peer_id, _, quote, _)| {
139        if quote_binding_is_valid(peer_id, quote) {
140            true
141        } else {
142            warn!(
143                "Dropping quote from peer {peer_id} — quote.pub_key BLAKE3 mismatch \
144                 (peer is signing quotes with another peer's key); the storer would \
145                 reject this proof"
146            );
147            false
148        }
149    });
150    before - quotes.len()
151}
152
153impl Client {
154    /// Get storage quotes from the closest peers for a given address.
155    ///
156    /// Queries 2x `CLOSE_GROUP_SIZE` peers from the DHT for fault tolerance,
157    /// requests quotes from all of them concurrently, and returns the
158    /// `CLOSE_GROUP_SIZE` closest successful responders sorted by XOR distance.
159    ///
160    /// Returns `Error::AlreadyStored` early if `CLOSE_GROUP_MAJORITY` peers
161    /// report the chunk is already stored.
162    ///
163    /// # Errors
164    ///
165    /// Returns an error if insufficient quotes can be collected.
166    #[allow(clippy::too_many_lines)]
167    pub async fn get_store_quotes(
168        &self,
169        address: &[u8; 32],
170        data_size: u64,
171        data_type: u32,
172    ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
173        let node = self.network().node();
174
175        // Over-query for fault tolerance: ask 2x peers, keep closest successful ones.
176        let over_query_count = CLOSE_GROUP_SIZE * 2;
177        debug!(
178            "Requesting quotes from up to {over_query_count} peers for address {} (size: {data_size})",
179            hex::encode(address)
180        );
181
182        let remote_peers = self
183            .network()
184            .find_closest_peers(address, over_query_count)
185            .await?;
186
187        if remote_peers.len() < CLOSE_GROUP_SIZE {
188            return Err(Error::InsufficientPeers(format!(
189                "Found {} peers, need {CLOSE_GROUP_SIZE}",
190                remote_peers.len()
191            )));
192        }
193
194        let per_peer_timeout = Duration::from_secs(self.config().quote_timeout_secs);
195        // Overall timeout for collecting all quotes. Must accommodate
196        // connect_with_fallback cascade (direct 5s + hole-punch 15s×3 + relay 30s ≈ 80s)
197        // plus the per-peer quote timeout. 120s is generous.
198        let overall_timeout = Duration::from_secs(120);
199
200        // Request quotes from all peers concurrently
201        let mut quote_futures = FuturesUnordered::new();
202
203        for (peer_id, peer_addrs) in &remote_peers {
204            let request_id = self.next_request_id();
205            let request = ChunkQuoteRequest {
206                address: *address,
207                data_size,
208                data_type,
209            };
210            let message = ChunkMessage {
211                request_id,
212                body: ChunkMessageBody::QuoteRequest(request),
213            };
214
215            let message_bytes = match message.encode() {
216                Ok(bytes) => bytes,
217                Err(e) => {
218                    warn!("Failed to encode quote request for {peer_id}: {e}");
219                    continue;
220                }
221            };
222
223            let peer_id_clone = *peer_id;
224            let addrs_clone = peer_addrs.clone();
225            let node_clone = node.clone();
226
227            let quote_future = async move {
228                let start = Instant::now();
229                let result = send_and_await_chunk_response(
230                    &node_clone,
231                    &peer_id_clone,
232                    message_bytes,
233                    request_id,
234                    per_peer_timeout,
235                    &addrs_clone,
236                    |body| match body {
237                        ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
238                            quote,
239                            already_stored,
240                        }) => Some(classify_quote_response(
241                            &peer_id_clone,
242                            &quote,
243                            already_stored,
244                        )),
245                        ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err(
246                            Error::Protocol(format!("Quote error from {peer_id_clone}: {e}")),
247                        )),
248                        _ => None,
249                    },
250                    |e| {
251                        Error::Network(format!(
252                            "Failed to send quote request to {peer_id_clone}: {e}"
253                        ))
254                    },
255                    || Error::Timeout(format!("Timeout waiting for quote from {peer_id_clone}")),
256                )
257                .await;
258
259                // Record the per-peer outcome for the AIMD bootstrap cache.
260                // See `quote_outcome_is_success` for the full classification.
261                let success = quote_outcome_is_success(&result);
262                let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
263                record_peer_outcome(&node_clone, peer_id_clone, &addrs_clone, success, rtt_ms)
264                    .await;
265
266                (peer_id_clone, addrs_clone, result)
267            };
268
269            quote_futures.push(quote_future);
270        }
271
272        // Collect all responses with an overall timeout to prevent indefinite stalls.
273        // Over-query means we have 2x peers, so we can tolerate failures.
274        let mut quotes = Vec::with_capacity(over_query_count);
275        let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new();
276        let mut failures: Vec<String> = Vec::new();
277
278        // Track storer-rejecting peers separately (binding, content, signature
279        // failures) so we can surface their count in diagnostics — they're a
280        // special class of failure (peer misconfigured or hostile, not
281        // network-broken) and the user benefits from seeing them called out.
282        let mut bad_quote_count = 0usize;
283
284        let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
285            tokio::time::timeout(overall_timeout, async {
286                while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
287                    match quote_result {
288                        Ok((quote, price)) => {
289                            quotes.push((peer_id, addrs, quote, price));
290                        }
291                        Err(Error::AlreadyStored) => {
292                            info!("Peer {peer_id} reports chunk already stored");
293                            let dist = peer_xor_distance(&peer_id, address);
294                            already_stored_peers.push((peer_id, dist));
295                        }
296                        Err(e) => {
297                            // Count bad-binding peers separately (typed
298                            // variant — no string sniffing). Treat as a
299                            // normal failure for InsufficientPeers reporting.
300                            if matches!(&e, Error::BadQuoteBinding { .. }) {
301                                bad_quote_count += 1;
302                            }
303                            warn!("Failed to get quote from {peer_id}: {e}");
304                            failures.push(format!("{peer_id}: {e}"));
305                        }
306                    }
307                }
308                Ok(())
309            })
310            .await;
311
312        match collect_result {
313            Err(_elapsed) => {
314                warn!(
315                    "Quote collection timed out after {overall_timeout:?} for address {}",
316                    hex::encode(address)
317                );
318                // Fall through to check if we have enough quotes despite timeout.
319                // The timeout fires when slow peers haven't responded yet, but we
320                // may already have enough successful quotes from fast peers.
321            }
322            Ok(Err(e)) => return Err(e),
323            Ok(Ok(())) => {}
324        }
325
326        // Defensive double-check: the per-peer handler already filters
327        // bad-binding responses into `failures`, but if any path slipped a bad
328        // quote into `quotes` (e.g. a future refactor) this catches it before
329        // we sort by distance and return. `bad_dropped` should be 0 in normal
330        // operation; non-zero indicates an upstream regression worth investigating.
331        let bad_dropped = drop_quotes_with_bad_bindings(&mut quotes);
332        if bad_dropped > 0 {
333            warn!(
334                "Defensive filter dropped {bad_dropped} quotes with mismatched peer bindings \
335                 for address {} — the per-peer handler should have caught these earlier \
336                 (this indicates an upstream regression)",
337                hex::encode(address),
338            );
339            bad_quote_count += bad_dropped;
340        }
341
342        // Check already-stored: only count votes from the closest CLOSE_GROUP_SIZE peers.
343        if !already_stored_peers.is_empty() {
344            let mut all_peers_by_distance: Vec<(bool, [u8; 32])> = Vec::new();
345            for (peer_id, _, _, _) in &quotes {
346                all_peers_by_distance.push((false, peer_xor_distance(peer_id, address)));
347            }
348            for (_, dist) in &already_stored_peers {
349                all_peers_by_distance.push((true, *dist));
350            }
351            all_peers_by_distance.sort_by_key(|a| a.1);
352
353            let close_group_stored = all_peers_by_distance
354                .iter()
355                .take(CLOSE_GROUP_SIZE)
356                .filter(|(is_stored, _)| *is_stored)
357                .count();
358
359            if close_group_stored >= CLOSE_GROUP_MAJORITY {
360                debug!(
361                    "Chunk {} already stored ({close_group_stored}/{CLOSE_GROUP_SIZE} close-group peers confirm)",
362                    hex::encode(address)
363                );
364                return Err(Error::AlreadyStored);
365            }
366        }
367
368        let already_stored_count = already_stored_peers.len();
369        let failure_count = failures.len();
370        let quote_count = quotes.len();
371        let total_responses = quote_count + failure_count + already_stored_count;
372
373        if quotes.len() >= CLOSE_GROUP_SIZE {
374            // Sort by XOR distance to target, keep the closest CLOSE_GROUP_SIZE.
375            quotes.sort_by(|a, b| {
376                let dist_a = peer_xor_distance(&a.0, address);
377                let dist_b = peer_xor_distance(&b.0, address);
378                dist_a.cmp(&dist_b)
379            });
380            quotes.truncate(CLOSE_GROUP_SIZE);
381
382            info!(
383                "Collected {} quotes for address {} ({total_responses} responses: \
384                 {quote_count} ok, {already_stored_count} already_stored, {failure_count} failed, \
385                 {bad_quote_count} bad-binding)",
386                quotes.len(),
387                hex::encode(address),
388            );
389            return Ok(quotes);
390        }
391
392        Err(Error::InsufficientPeers(format!(
393            "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} ({total_responses} responses: \
394             {already_stored_count} already_stored, {failure_count} failed including \
395             {bad_quote_count} with mismatched peer bindings). Failures: [{}]",
396            failures.join("; ")
397        )))
398    }
399}
400
401#[cfg(test)]
402#[allow(clippy::unwrap_used, clippy::expect_used)]
403mod tests {
404    //! Test fixtures use real ML-DSA-65 keypairs (1952-byte public keys), the
405    //! same key material that ships on the wire. The "bad" quote is built by
406    //! **swapping** the public key field with a different real keypair's
407    //! public key — the exact shape produced by the Apr 30 production
408    //! failure (an operator running two co-located identities with crossed
409    //! quote-signing keys). Signatures are not exercised here because this
410    //! filter only mirrors `validate_peer_bindings` (BLAKE3 binding); see
411    //! the doc-comment on `quote_binding_is_valid` for why
412    //! `verify_quote_signature` and `verify_quote_content` are deliberately
413    //! NOT mirrored.
414
415    use super::*;
416    use ant_protocol::evm::RewardsAddress;
417    use ant_protocol::pqc::ops::{MlDsaOperations, MlDsaPublicKey};
418    use ant_protocol::transport::MlDsa65;
419    use std::time::SystemTime;
420    use xor_name::XorName;
421
422    /// A real ML-DSA-65 keypair plus its derived peer ID.
423    struct Keypair {
424        peer_id: PeerId,
425        pub_key_bytes: Vec<u8>,
426    }
427
428    fn gen_keypair() -> Keypair {
429        let ml_dsa = MlDsa65::new();
430        let (pub_key, _sk) = ml_dsa.generate_keypair().expect("ML-DSA-65 keygen");
431        let pub_key_bytes = pub_key.as_bytes().to_vec();
432        let peer_id = PeerId::from_bytes(compute_address(&pub_key_bytes));
433        Keypair {
434            peer_id,
435            pub_key_bytes,
436        }
437    }
438
439    /// Build a quote tuple whose `pub_key` correctly hashes to its peer_id.
440    /// Signature is left empty: this filter does not verify signatures.
441    fn good_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
442        let kp = gen_keypair();
443        let quote = PaymentQuote {
444            content: XorName([0u8; 32]),
445            timestamp: SystemTime::UNIX_EPOCH,
446            price: Amount::ZERO,
447            rewards_address: RewardsAddress::new([0u8; 20]),
448            pub_key: kp.pub_key_bytes,
449            signature: Vec::new(),
450        };
451        (kp.peer_id, Vec::new(), quote, Amount::ZERO)
452    }
453
454    /// Build a quote tuple where the quote carries a different keypair's
455    /// `pub_key` than the peer_id derives from. Mirrors the production
456    /// failure shape: peer A advertised on the transport, but the quote
457    /// carries peer B's key.
458    fn bad_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
459        let claimed = gen_keypair();
460        let signing = gen_keypair();
461        assert_ne!(claimed.pub_key_bytes, signing.pub_key_bytes);
462        assert_ne!(claimed.peer_id.as_bytes(), signing.peer_id.as_bytes());
463        let quote = PaymentQuote {
464            content: XorName([0u8; 32]),
465            timestamp: SystemTime::UNIX_EPOCH,
466            price: Amount::ZERO,
467            rewards_address: RewardsAddress::new([0u8; 20]),
468            pub_key: signing.pub_key_bytes,
469            signature: Vec::new(),
470        };
471        (claimed.peer_id, Vec::new(), quote, Amount::ZERO)
472    }
473
474    /// Independent re-implementation of the storer-side binding spec
475    /// (`ant-node/src/payment/verifier.rs::validate_peer_bindings` +
476    /// `peer_id_from_public_key_bytes`):
477    /// (a) `pub_key` parses as ML-DSA-65 (length 1952), and
478    /// (b) `BLAKE3(pub_key) == peer_id`.
479    ///
480    /// Re-derived from spec, NOT delegating to `quote_binding_is_valid`,
481    /// so cross-checks are not "function == itself".
482    fn storer_binding_would_accept(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
483        if MlDsaPublicKey::from_bytes(&quote.pub_key).is_err() {
484            return false;
485        }
486        compute_address(&quote.pub_key) == *peer_id.as_bytes()
487    }
488
489    // ============================================================
490    // Tests for `quote_binding_is_valid` (the predicate)
491    // ============================================================
492
493    #[test]
494    fn binding_accepts_real_self_consistent_keypair() {
495        let (peer_id, _, quote, _) = good_quote_real();
496        // Property under test: the predicate accepts a quote whose pub_key
497        // genuinely belongs to the claimed peer.
498        assert!(quote_binding_is_valid(&peer_id, &quote));
499        // Cross-check against the independent full storer-spec implementation.
500        assert!(storer_binding_would_accept(&peer_id, &quote));
501    }
502
503    #[test]
504    fn binding_rejects_real_crossed_keypair() {
505        let (peer_id, _, quote, _) = bad_quote_real();
506        assert!(!quote_binding_is_valid(&peer_id, &quote));
507        assert!(!storer_binding_would_accept(&peer_id, &quote));
508    }
509
510    #[test]
511    fn binding_rejects_oversize_pubkey() {
512        // A pub_key longer than ML-DSA-65 (1952 bytes) must be rejected
513        // even if BLAKE3 happens to agree, because the storer rejects on
514        // length first via `peer_id_from_public_key_bytes`.
515        let oversized = vec![0u8; ML_DSA_PUB_KEY_LEN + 1];
516        let peer_id = PeerId::from_bytes(compute_address(&oversized));
517        let quote = PaymentQuote {
518            content: XorName([0u8; 32]),
519            timestamp: SystemTime::UNIX_EPOCH,
520            price: Amount::ZERO,
521            rewards_address: RewardsAddress::new([0u8; 20]),
522            pub_key: oversized,
523            signature: Vec::new(),
524        };
525        // BLAKE3(pub_key) DOES equal the peer_id we constructed, so the
526        // bare hash check would pass — but the length guard must reject.
527        assert_eq!(compute_address(&quote.pub_key), *peer_id.as_bytes());
528        assert!(
529            !quote_binding_is_valid(&peer_id, &quote),
530            "predicate must reject oversize pub_key even when BLAKE3 happens to match"
531        );
532        assert!(!storer_binding_would_accept(&peer_id, &quote));
533    }
534
535    #[test]
536    fn binding_rejects_undersize_pubkey() {
537        let undersized = vec![0u8; ML_DSA_PUB_KEY_LEN - 1];
538        let peer_id = PeerId::from_bytes(compute_address(&undersized));
539        let quote = PaymentQuote {
540            content: XorName([0u8; 32]),
541            timestamp: SystemTime::UNIX_EPOCH,
542            price: Amount::ZERO,
543            rewards_address: RewardsAddress::new([0u8; 20]),
544            pub_key: undersized,
545            signature: Vec::new(),
546        };
547        assert!(!quote_binding_is_valid(&peer_id, &quote));
548        assert!(!storer_binding_would_accept(&peer_id, &quote));
549    }
550
551    // ============================================================
552    // Tests for the filter (`drop_quotes_with_bad_bindings`)
553    // ============================================================
554
555    #[test]
556    fn filter_drops_only_bad_bindings_and_leaves_storer_acceptable_quotes() {
557        let mut quotes = vec![
558            good_quote_real(),
559            bad_quote_real(),
560            good_quote_real(),
561            bad_quote_real(),
562            good_quote_real(),
563        ];
564
565        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
566
567        assert_eq!(dropped, 2, "two crossed-key quotes must be dropped");
568        assert_eq!(quotes.len(), 3, "three real-key quotes must remain");
569
570        // Cross-checked invariant: every retained quote would be accepted by
571        // a storer running the full spec. The defensive filter only checks
572        // the binding, so this asserts the binding-only filter is correct
573        // for binding-only failures (other failure modes are filtered by
574        // the per-peer classifier upstream).
575        for (peer_id, _, quote, _) in &quotes {
576            assert!(
577                storer_binding_would_accept(peer_id, quote),
578                "every retained quote must satisfy the full storer-side spec"
579            );
580        }
581    }
582
583    #[test]
584    fn filter_is_noop_when_all_quotes_are_storer_acceptable() {
585        let mut quotes: Vec<_> = (0..5).map(|_| good_quote_real()).collect();
586        let before = quotes.len();
587        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
588        assert_eq!(dropped, 0);
589        assert_eq!(quotes.len(), before);
590        for (peer_id, _, quote, _) in &quotes {
591            assert!(storer_binding_would_accept(peer_id, quote));
592        }
593    }
594
595    #[test]
596    fn filter_drops_all_when_every_responder_is_bad() {
597        // The "all hostile" case: every over-queried peer returned a bad
598        // binding. The patch should leave us with zero quotes (not panic,
599        // not skip the filter, not return malformed quotes). The caller in
600        // get_store_quotes then surfaces InsufficientPeers.
601        let mut quotes: Vec<_> = (0..CLOSE_GROUP_SIZE * 2)
602            .map(|_| bad_quote_real())
603            .collect();
604        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
605        assert_eq!(dropped, CLOSE_GROUP_SIZE * 2);
606        assert!(quotes.is_empty());
607    }
608
609    #[test]
610    fn filter_preserves_quote_payload_byte_for_byte() {
611        // After filtering, the retained quotes must be untouched — pub_key,
612        // signature, content, timestamp, price, rewards_address. The patch
613        // is a filter, not a transformation; this test catches any future
614        // regression that mutates a retained quote.
615        let (peer_id, addrs, original_quote, amount) = good_quote_real();
616        let mut quotes = vec![(peer_id, addrs.clone(), original_quote.clone(), amount)];
617        let _ = drop_quotes_with_bad_bindings(&mut quotes);
618
619        let (kept_peer, kept_addrs, kept_quote, kept_amount) =
620            quotes.pop().expect("the good quote must survive filtering");
621        assert_eq!(kept_peer.as_bytes(), peer_id.as_bytes());
622        assert_eq!(kept_addrs.len(), addrs.len());
623        assert_eq!(kept_amount, amount);
624        assert_eq!(kept_quote.pub_key, original_quote.pub_key);
625        assert_eq!(kept_quote.signature, original_quote.signature);
626        assert_eq!(kept_quote.content.0, original_quote.content.0);
627        assert_eq!(kept_quote.timestamp, original_quote.timestamp);
628        assert_eq!(kept_quote.price, original_quote.price);
629        assert_eq!(kept_quote.rewards_address, original_quote.rewards_address);
630    }
631
632    // ============================================================
633    // The Apr 30 production-failure repro
634    // ============================================================
635
636    /// Repro of the production failure from 2026-04-30 testnet runs.
637    ///
638    /// An external operator on `75.48.86.24` ran two co-located ant-node
639    /// identities (peer `0755ecb55b…` and peer `073db92f…`) that crossed
640    /// their quote-signing keys. Every chunk whose XOR-closest set happened
641    /// to include peer `0755ecb5` got a payment proof with one malformed
642    /// quote, and the storer's `validate_peer_bindings` rejected the
643    /// entire close-group proof — burning the chunk's payment.
644    ///
645    /// This test is the strongest proof the patch fixes that failure shape:
646    ///
647    /// 1. We assemble `2x CLOSE_GROUP_SIZE` real ML-DSA-65 quotes — the same
648    ///    over-query buffer the production code uses (line 93 of this file).
649    /// 2. One of them is a *crossed-key* quote — the production failure shape.
650    /// 3. We run an independent `storer_would_accept` check (re-derived from
651    ///    the storer spec, not from `quote_binding_is_valid`) over the
652    ///    pre-filter set; we confirm the bad peer is rejected, proving the
653    ///    storer **would** burn the chunk's payment if we proceeded unfiltered.
654    /// 4. We run `drop_quotes_with_bad_bindings`.
655    /// 5. We re-run `storer_would_accept` over the post-filter set; we confirm
656    ///    EVERY remaining quote would be accepted, proving the patched
657    ///    `ProofOfPayment` will not trigger the `validate_peer_bindings`
658    ///    rejection that caused the Apr 30 outage.
659    /// 6. We confirm the post-filter set has at least `CLOSE_GROUP_SIZE`
660    ///    quotes — the over-query buffer (2x) is sufficient.
661    #[test]
662    fn repro_apr_30_storer_would_have_rejected_pre_filter_and_accepts_post_filter() {
663        let over_query_count = CLOSE_GROUP_SIZE * 2;
664        let mut quotes: Vec<_> = (0..over_query_count - 1)
665            .map(|_| good_quote_real())
666            .collect();
667        // Splice the crossed-key quote in the middle (mirrors the random
668        // position the bad peer takes in the DHT-returned closest set).
669        quotes.insert(over_query_count / 2, bad_quote_real());
670        assert_eq!(quotes.len(), over_query_count);
671
672        // Step 1: prove the storer would reject the pre-filter set.
673        let storer_would_reject_count = quotes
674            .iter()
675            .filter(|(p, _, q, _)| !storer_binding_would_accept(p, q))
676            .count();
677        assert_eq!(
678            storer_would_reject_count, 1,
679            "exactly one quote (the crossed-key one) must be rejected by the storer spec"
680        );
681
682        // Step 2: run the patched filter.
683        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
684        assert_eq!(dropped, 1, "exactly the crossed-key quote must be filtered");
685
686        // Step 3: prove the storer would accept every survivor under the FULL spec.
687        for (peer_id, _, quote, _) in &quotes {
688            assert!(
689                storer_binding_would_accept(peer_id, quote),
690                "every post-filter quote must be accepted by the storer spec — \
691                 this is what the patch guarantees: no more burned payments"
692            );
693        }
694
695        // Step 4: prove the over-query buffer is sufficient to refill.
696        assert!(
697            quotes.len() >= CLOSE_GROUP_SIZE,
698            "after filtering, at least CLOSE_GROUP_SIZE good quotes must remain \
699             so we can build a non-rejected ProofOfPayment"
700        );
701    }
702
703    /// When more than the over-query buffer of peers misbehave, the filter
704    /// must NOT silently produce a short proof. The downstream caller in
705    /// `get_store_quotes` must see fewer than `CLOSE_GROUP_SIZE` survivors
706    /// and return `InsufficientPeers`.
707    #[test]
708    fn filter_leaves_short_set_when_too_many_bad_peers() {
709        // Buffer is 2x; if more than half are bad, there's no way to refill.
710        let bad_count = CLOSE_GROUP_SIZE + 1;
711        let good_count = CLOSE_GROUP_SIZE - 1;
712        let mut quotes: Vec<_> = std::iter::repeat_with(bad_quote_real)
713            .take(bad_count)
714            .chain(std::iter::repeat_with(good_quote_real).take(good_count))
715            .collect();
716
717        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
718        assert_eq!(dropped, bad_count);
719        assert!(
720            quotes.len() < CLOSE_GROUP_SIZE,
721            "this is the precondition for InsufficientPeers downstream"
722        );
723        // Sanity: every survivor is storer-acceptable under the full spec.
724        for (peer_id, _, quote, _) in &quotes {
725            assert!(storer_binding_would_accept(peer_id, quote));
726        }
727    }
728
729    // ============================================================
730    // Tests for the per-peer response classifier (the PRIMARY defense).
731    //
732    // These tests exercise the production code path that runs inside
733    // get_store_quotes' per-peer async closure. The defensive
734    // `drop_quotes_with_bad_bindings` is a second line of defence —
735    // these tests make sure the FIRST line is what actually catches
736    // misbehaving peers in production. Without these, a regression
737    // that removes the per-peer check could be masked by the post-
738    // collect filter and pass the rest of the suite.
739    // ============================================================
740
741    /// Helper: serialize a `PaymentQuote` to bytes the way the wire layer
742    /// does (rmp_serde / msgpack), to feed into `classify_quote_response`.
743    fn serialize_quote(quote: &PaymentQuote) -> Vec<u8> {
744        rmp_serde::to_vec(quote).expect("serialize quote")
745    }
746
747    #[test]
748    fn classifier_accepts_real_self_consistent_quote() {
749        let (peer_id, _, quote, _) = good_quote_real();
750        let bytes = serialize_quote(&quote);
751        let result = classify_quote_response(&peer_id, &bytes, false);
752        match result {
753            Ok((q, price)) => {
754                assert_eq!(q.pub_key, quote.pub_key);
755                assert_eq!(price, quote.price);
756            }
757            Err(e) => panic!("expected Ok, got {e}"),
758        }
759    }
760
761    #[test]
762    fn classifier_rejects_crossed_keypair_with_typed_error() {
763        let (peer_id, _, quote, _) = bad_quote_real();
764        let bytes = serialize_quote(&quote);
765        let result = classify_quote_response(&peer_id, &bytes, false);
766        match result {
767            Err(Error::BadQuoteBinding {
768                peer_id: pid,
769                detail,
770            }) => {
771                assert_eq!(pid, peer_id.to_string());
772                assert!(
773                    detail.contains("BLAKE3(pub_key)="),
774                    "diagnostic detail must include the derived peer id: {detail}"
775                );
776            }
777            other => panic!("expected BadQuoteBinding for crossed-key quote, got {other:?}"),
778        }
779    }
780
781    /// CRITICAL: a misbehaving peer that votes `already_stored=true` must
782    /// NOT be allowed to influence the close-group "already stored"
783    /// majority decision. The bind-check runs before the AlreadyStored
784    /// short-circuit, so a crossed-key peer voting "already stored" is
785    /// classified as `BadQuoteBinding`, not `AlreadyStored`.
786    ///
787    /// This locks in a specific reviewer concern from round 1:
788    ///   "A peer with a crossed/garbage signing key could simply respond
789    ///   already_stored=true and its vote enters already_stored_peers
790    ///   unfiltered."
791    #[test]
792    fn classifier_rejects_already_stored_vote_from_bad_binding_peer() {
793        let (peer_id, _, quote, _) = bad_quote_real();
794        let bytes = serialize_quote(&quote);
795        // The peer claims already_stored=true, but its quote has a crossed key.
796        let result = classify_quote_response(&peer_id, &bytes, true);
797        assert!(
798            matches!(result, Err(Error::BadQuoteBinding { .. })),
799            "crossed-key peer must be classified BadQuoteBinding even when \
800             voting already_stored=true; got {result:?}"
801        );
802    }
803
804    /// An honest peer's `already_stored=true` vote IS honoured (after
805    /// passing the bind-check). This is the contrast to the test above.
806    #[test]
807    fn classifier_honours_already_stored_vote_from_good_binding_peer() {
808        let (peer_id, _, quote, _) = good_quote_real();
809        let bytes = serialize_quote(&quote);
810        let result = classify_quote_response(&peer_id, &bytes, true);
811        assert!(
812            matches!(result, Err(Error::AlreadyStored)),
813            "honest peer's already_stored vote must be honoured; got {result:?}"
814        );
815    }
816
817    #[test]
818    fn classifier_returns_serialization_error_on_bad_bytes() {
819        let (peer_id, _, _, _) = good_quote_real();
820        let garbage = b"this is not a valid msgpack PaymentQuote".to_vec();
821        let result = classify_quote_response(&peer_id, &garbage, false);
822        assert!(
823            matches!(result, Err(Error::Serialization(_))),
824            "garbage bytes must produce a Serialization error; got {result:?}"
825        );
826    }
827
828    // ============================================================
829    // AIMD attribution: every error variant is classified correctly
830    // for `record_peer_outcome` so misbehaving peers are deprioritized
831    // and reachable-but-already-storing peers stay reputable.
832    // ============================================================
833
834    #[test]
835    fn aimd_success_for_ok_result() {
836        let (_, _, quote, _) = good_quote_real();
837        let result: std::result::Result<(PaymentQuote, Amount), Error> =
838            Ok((quote.clone(), quote.price));
839        assert!(quote_outcome_is_success(&result));
840    }
841
842    #[test]
843    fn aimd_success_for_already_stored() {
844        let result: std::result::Result<(PaymentQuote, Amount), Error> = Err(Error::AlreadyStored);
845        assert!(
846            quote_outcome_is_success(&result),
847            "an honest peer reporting already_stored is a benign outcome — \
848             the peer is reachable and well-behaved, so the AIMD cache must \
849             keep them at high reputation"
850        );
851    }
852
853    #[test]
854    fn aimd_failure_for_bad_quote_binding() {
855        let result: std::result::Result<(PaymentQuote, Amount), Error> =
856            Err(Error::BadQuoteBinding {
857                peer_id: "abc123".to_string(),
858                detail: "test".to_string(),
859            });
860        assert!(
861            !quote_outcome_is_success(&result),
862            "BadQuoteBinding peers must be marked as failures so the AIMD \
863             bootstrap cache learns to stop asking them on every upload"
864        );
865    }
866
867    #[test]
868    fn aimd_failure_for_network_and_timeout_and_protocol_and_serialization() {
869        for err in [
870            Error::Network("net".to_string()),
871            Error::Timeout("to".to_string()),
872            Error::Protocol("proto".to_string()),
873            Error::Serialization("ser".to_string()),
874        ] {
875            let result: std::result::Result<(PaymentQuote, Amount), Error> = Err(err);
876            assert!(
877                !quote_outcome_is_success(&result),
878                "network-class errors must be classified as failures: {result:?}"
879            );
880        }
881    }
882
883    /// Cross-validate the classifier's binding verdict against the
884    /// independent storer-spec re-derivation across mixed responders.
885    #[test]
886    fn classifier_verdict_matches_storer_binding_spec_for_mixed_responders() {
887        let mut responders: Vec<(PeerId, PaymentQuote)> = (0..12)
888            .map(|_| {
889                let (p, _, q, _) = good_quote_real();
890                (p, q)
891            })
892            .collect();
893        for _ in 0..4 {
894            let (p, _, q, _) = bad_quote_real();
895            responders.push((p, q));
896        }
897
898        for (peer_id, quote) in &responders {
899            let bytes = serialize_quote(quote);
900            let storer_verdict = storer_binding_would_accept(peer_id, quote);
901            let classifier_verdict = classify_quote_response(peer_id, &bytes, false).is_ok();
902            assert_eq!(
903                classifier_verdict, storer_verdict,
904                "classifier and storer-binding-spec must agree on every responder \
905                 (peer_id={}, storer={storer_verdict}, classifier={classifier_verdict})",
906                peer_id
907            );
908        }
909    }
910}