Skip to main content

ant_core/data/client/
quote.rs

1//! Quote and payment operations.
2//!
3//! Handles requesting storage quotes from network nodes and
4//! managing payment for data storage.
5
6use crate::data::client::peer_cache::record_peer_outcome;
7use crate::data::client::Client;
8use crate::data::error::{Error, Result};
9use ant_protocol::evm::{Amount, PaymentQuote};
10use ant_protocol::transport::{MultiAddr, PeerId};
11use ant_protocol::{
12    compute_address, send_and_await_chunk_response, ChunkMessage, ChunkMessageBody,
13    ChunkQuoteRequest, ChunkQuoteResponse, CLOSE_GROUP_MAJORITY, CLOSE_GROUP_SIZE,
14};
15use futures::stream::{FuturesUnordered, StreamExt};
16use std::time::{Duration, Instant};
17use tracing::{debug, info, warn};
18
19/// Compute XOR distance between a peer's ID bytes and a target address.
20///
21/// Uses the first 32 bytes of the peer ID (or fewer if shorter) XORed with
22/// the target address. Returns a byte array suitable for lexicographic comparison.
23fn xor_distance(peer_id: &PeerId, target: &[u8; 32]) -> [u8; 32] {
24    let peer_bytes = peer_id.as_bytes();
25    let mut distance = [0u8; 32];
26    for (i, d) in distance.iter_mut().enumerate() {
27        let pb = peer_bytes.get(i).copied().unwrap_or(0);
28        *d = pb ^ target[i];
29    }
30    distance
31}
32
33/// ML-DSA-65 public key length in bytes. Mirrors the same value defined as
34/// `pub const ML_DSA_65_PUBLIC_KEY_SIZE` in `saorsa-pqc::pqc::types`, which
35/// the storer's `peer_id_from_public_key_bytes` enforces. We keep a local
36/// copy here rather than adding a direct `saorsa-pqc` dep — the constant
37/// is FIPS-mandated for ML-DSA-65 and won't change unless we change variant.
38///
39/// TODO: switch to `saorsa_pqc::pqc::types::ML_DSA_65_PUBLIC_KEY_SIZE` once
40/// `ant-protocol` re-exports it (`pqc::ops::ML_DSA_65_PUBLIC_KEY_SIZE`).
41const ML_DSA_PUB_KEY_LEN: usize = 1952;
42
43/// Check that a quote's `pub_key` is well-formed and BLAKE3-hashes to the
44/// claimed `peer_id`.
45///
46/// The storer node enforces both constraints in `ant-node/src/payment/verifier.rs`
47/// (via `peer_id_from_public_key_bytes` and `validate_peer_bindings`): every
48/// quote inside a `ProofOfPayment` must (a) have a 1952-byte `pub_key` parsable
49/// as ML-DSA-65 and (b) satisfy `BLAKE3(pub_key) == peer_id`. A single quote
50/// failing either check causes the storer to reject the entire close-group
51/// proof and burn the chunk's payment.
52///
53/// We mirror the cheap structural check here. The storer also runs
54/// `verify_quote_content` and `verify_quote_signature`; those are ML-DSA
55/// verifications (~1 ms × 14 quotes × every chunk) and are deliberately NOT
56/// mirrored on the client to keep upload latency unchanged. They are tracked
57/// as a follow-up if a real attack surfaces them.
58fn quote_binding_is_valid(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
59    if quote.pub_key.len() != ML_DSA_PUB_KEY_LEN {
60        return false;
61    }
62    compute_address(&quote.pub_key) == *peer_id.as_bytes()
63}
64
65/// Classification of a `ChunkQuoteResponse::Success` body for a single peer.
66///
67/// Mirrors the storer-side `validate_peer_bindings` check from
68/// `ant-node/src/payment/verifier.rs` — the cheap BLAKE3 binding —
69/// so we drop misbehaving peers' quotes before payment.
70///
71/// We deliberately do NOT mirror the storer's `verify_quote_signature`
72/// (ML-DSA-65 verify, ~1 ms × CLOSE_GROUP_SIZE × every chunk) or
73/// `verify_quote_content`. Those are useful defense-in-depth for an
74/// attacker who self-consistently crafts a signed-but-stolen or wrong-
75/// content quote, but they are NOT cheap and are out of scope for this
76/// fix. Adding them changes upload latency materially. Track them as a
77/// follow-up if a real attack surfaces them.
78///
79/// Pulling the logic out of the async closure lets us unit-test the
80/// primary defense (not just the post-collect defensive filter).
81///
82/// # Returns
83///
84/// - `Ok((quote, price))` — the response is honoured as a quote.
85/// - `Err(Error::AlreadyStored)` — the peer claims the chunk is already
86///   present AND the quote it provided binds to its peer ID. Vote counts.
87/// - `Err(Error::BadQuoteBinding { .. })` — bad binding (mirrors the
88///   storer-side rejection); the peer is treated as a failure so the
89///   AIMD cache learns to deprioritize it. Outer collector counts these
90///   via the typed variant (no string matching).
91/// - `Err(Error::Serialization(...))` — the quote bytes did not deserialize.
92fn classify_quote_response(
93    peer_id: &PeerId,
94    quote_bytes: &[u8],
95    already_stored: bool,
96) -> std::result::Result<(PaymentQuote, Amount), Error> {
97    let payment_quote = rmp_serde::from_slice::<PaymentQuote>(quote_bytes).map_err(|e| {
98        Error::Serialization(format!("Failed to deserialize quote from {peer_id}: {e}"))
99    })?;
100
101    // Peer binding: BLAKE3(pub_key) must equal peer_id. This is the
102    // exact mitigation Chris and the AI investigation requested for the
103    // 2026-04-30 production failure: drop crossed-key peers before they
104    // poison the close-group ProofOfPayment.
105    if !quote_binding_is_valid(peer_id, &payment_quote) {
106        let derived = compute_address(&payment_quote.pub_key);
107        warn!(
108            "Dropping response from {peer_id} — quote.pub_key BLAKE3 mismatch \
109             (peer is signing quotes with another peer's key); the storer \
110             would reject this proof"
111        );
112        return Err(Error::BadQuoteBinding {
113            peer_id: peer_id.to_string(),
114            detail: format!(
115                "BLAKE3(pub_key)={} pub_key_len={}",
116                hex::encode(derived),
117                payment_quote.pub_key.len(),
118            ),
119        });
120    }
121
122    if already_stored {
123        debug!("Peer {peer_id} already has chunk");
124        return Err(Error::AlreadyStored);
125    }
126    let price = payment_quote.price;
127    debug!("Received quote from {peer_id}: price = {price}");
128    Ok((payment_quote, price))
129}
130
131/// Map a per-peer quote-collection outcome to the AIMD-cache success flag.
132///
133/// `Ok(_)` and `AlreadyStored` are both *benign* outcomes — the peer is
134/// reachable and well-behaved — so we record them as successes (recording
135/// a smooth RTT). Every other variant (network/timeout/protocol/
136/// serialization, plus `BadQuoteBinding`) records as a failure so the
137/// local AIMD bootstrap cache learns to deprioritize peers that don't
138/// help us upload.
139///
140/// Pulled out of the per-peer closure for unit-testing.
141fn quote_outcome_is_success(result: &std::result::Result<(PaymentQuote, Amount), Error>) -> bool {
142    matches!(result, Ok(_) | Err(Error::AlreadyStored))
143}
144
145/// Drop quotes whose `pub_key` does not BLAKE3-hash to the peer that supplied
146/// them. Logs each dropped quote at WARN.
147fn drop_quotes_with_bad_bindings(
148    quotes: &mut Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>,
149) -> usize {
150    let before = quotes.len();
151    quotes.retain(|(peer_id, _, quote, _)| {
152        if quote_binding_is_valid(peer_id, quote) {
153            true
154        } else {
155            warn!(
156                "Dropping quote from peer {peer_id} — quote.pub_key BLAKE3 mismatch \
157                 (peer is signing quotes with another peer's key); the storer would \
158                 reject this proof"
159            );
160            false
161        }
162    });
163    before - quotes.len()
164}
165
166impl Client {
167    /// Get storage quotes from the closest peers for a given address.
168    ///
169    /// Queries 2x `CLOSE_GROUP_SIZE` peers from the DHT for fault tolerance,
170    /// requests quotes from all of them concurrently, and returns the
171    /// `CLOSE_GROUP_SIZE` closest successful responders sorted by XOR distance.
172    ///
173    /// Returns `Error::AlreadyStored` early if `CLOSE_GROUP_MAJORITY` peers
174    /// report the chunk is already stored.
175    ///
176    /// # Errors
177    ///
178    /// Returns an error if insufficient quotes can be collected.
179    #[allow(clippy::too_many_lines)]
180    pub async fn get_store_quotes(
181        &self,
182        address: &[u8; 32],
183        data_size: u64,
184        data_type: u32,
185    ) -> Result<Vec<(PeerId, Vec<MultiAddr>, PaymentQuote, Amount)>> {
186        let node = self.network().node();
187
188        // Over-query for fault tolerance: ask 2x peers, keep closest successful ones.
189        let over_query_count = CLOSE_GROUP_SIZE * 2;
190        debug!(
191            "Requesting quotes from up to {over_query_count} peers for address {} (size: {data_size})",
192            hex::encode(address)
193        );
194
195        let remote_peers = self
196            .network()
197            .find_closest_peers(address, over_query_count)
198            .await?;
199
200        if remote_peers.len() < CLOSE_GROUP_SIZE {
201            return Err(Error::InsufficientPeers(format!(
202                "Found {} peers, need {CLOSE_GROUP_SIZE}",
203                remote_peers.len()
204            )));
205        }
206
207        let per_peer_timeout = Duration::from_secs(self.config().quote_timeout_secs);
208        // Overall timeout for collecting all quotes. Must accommodate
209        // connect_with_fallback cascade (direct 5s + hole-punch 15s×3 + relay 30s ≈ 80s)
210        // plus the per-peer quote timeout. 120s is generous.
211        let overall_timeout = Duration::from_secs(120);
212
213        // Request quotes from all peers concurrently
214        let mut quote_futures = FuturesUnordered::new();
215
216        for (peer_id, peer_addrs) in &remote_peers {
217            let request_id = self.next_request_id();
218            let request = ChunkQuoteRequest {
219                address: *address,
220                data_size,
221                data_type,
222            };
223            let message = ChunkMessage {
224                request_id,
225                body: ChunkMessageBody::QuoteRequest(request),
226            };
227
228            let message_bytes = match message.encode() {
229                Ok(bytes) => bytes,
230                Err(e) => {
231                    warn!("Failed to encode quote request for {peer_id}: {e}");
232                    continue;
233                }
234            };
235
236            let peer_id_clone = *peer_id;
237            let addrs_clone = peer_addrs.clone();
238            let node_clone = node.clone();
239
240            let quote_future = async move {
241                let start = Instant::now();
242                let result = send_and_await_chunk_response(
243                    &node_clone,
244                    &peer_id_clone,
245                    message_bytes,
246                    request_id,
247                    per_peer_timeout,
248                    &addrs_clone,
249                    |body| match body {
250                        ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
251                            quote,
252                            already_stored,
253                        }) => Some(classify_quote_response(
254                            &peer_id_clone,
255                            &quote,
256                            already_stored,
257                        )),
258                        ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(e)) => Some(Err(
259                            Error::Protocol(format!("Quote error from {peer_id_clone}: {e}")),
260                        )),
261                        _ => None,
262                    },
263                    |e| {
264                        Error::Network(format!(
265                            "Failed to send quote request to {peer_id_clone}: {e}"
266                        ))
267                    },
268                    || Error::Timeout(format!("Timeout waiting for quote from {peer_id_clone}")),
269                )
270                .await;
271
272                // Record the per-peer outcome for the AIMD bootstrap cache.
273                // See `quote_outcome_is_success` for the full classification.
274                let success = quote_outcome_is_success(&result);
275                let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
276                record_peer_outcome(&node_clone, peer_id_clone, &addrs_clone, success, rtt_ms)
277                    .await;
278
279                (peer_id_clone, addrs_clone, result)
280            };
281
282            quote_futures.push(quote_future);
283        }
284
285        // Collect all responses with an overall timeout to prevent indefinite stalls.
286        // Over-query means we have 2x peers, so we can tolerate failures.
287        let mut quotes = Vec::with_capacity(over_query_count);
288        let mut already_stored_peers: Vec<(PeerId, [u8; 32])> = Vec::new();
289        let mut failures: Vec<String> = Vec::new();
290
291        // Track storer-rejecting peers separately (binding, content, signature
292        // failures) so we can surface their count in diagnostics — they're a
293        // special class of failure (peer misconfigured or hostile, not
294        // network-broken) and the user benefits from seeing them called out.
295        let mut bad_quote_count = 0usize;
296
297        let collect_result: std::result::Result<std::result::Result<(), Error>, _> =
298            tokio::time::timeout(overall_timeout, async {
299                while let Some((peer_id, addrs, quote_result)) = quote_futures.next().await {
300                    match quote_result {
301                        Ok((quote, price)) => {
302                            quotes.push((peer_id, addrs, quote, price));
303                        }
304                        Err(Error::AlreadyStored) => {
305                            info!("Peer {peer_id} reports chunk already stored");
306                            let dist = xor_distance(&peer_id, address);
307                            already_stored_peers.push((peer_id, dist));
308                        }
309                        Err(e) => {
310                            // Count bad-binding peers separately (typed
311                            // variant — no string sniffing). Treat as a
312                            // normal failure for InsufficientPeers reporting.
313                            if matches!(&e, Error::BadQuoteBinding { .. }) {
314                                bad_quote_count += 1;
315                            }
316                            warn!("Failed to get quote from {peer_id}: {e}");
317                            failures.push(format!("{peer_id}: {e}"));
318                        }
319                    }
320                }
321                Ok(())
322            })
323            .await;
324
325        match collect_result {
326            Err(_elapsed) => {
327                warn!(
328                    "Quote collection timed out after {overall_timeout:?} for address {}",
329                    hex::encode(address)
330                );
331                // Fall through to check if we have enough quotes despite timeout.
332                // The timeout fires when slow peers haven't responded yet, but we
333                // may already have enough successful quotes from fast peers.
334            }
335            Ok(Err(e)) => return Err(e),
336            Ok(Ok(())) => {}
337        }
338
339        // Defensive double-check: the per-peer handler already filters
340        // bad-binding responses into `failures`, but if any path slipped a bad
341        // quote into `quotes` (e.g. a future refactor) this catches it before
342        // we sort by distance and return. `bad_dropped` should be 0 in normal
343        // operation; non-zero indicates an upstream regression worth investigating.
344        let bad_dropped = drop_quotes_with_bad_bindings(&mut quotes);
345        if bad_dropped > 0 {
346            warn!(
347                "Defensive filter dropped {bad_dropped} quotes with mismatched peer bindings \
348                 for address {} — the per-peer handler should have caught these earlier \
349                 (this indicates an upstream regression)",
350                hex::encode(address),
351            );
352            bad_quote_count += bad_dropped;
353        }
354
355        // Check already-stored: only count votes from the closest CLOSE_GROUP_SIZE peers.
356        if !already_stored_peers.is_empty() {
357            let mut all_peers_by_distance: Vec<(bool, [u8; 32])> = Vec::new();
358            for (peer_id, _, _, _) in &quotes {
359                all_peers_by_distance.push((false, xor_distance(peer_id, address)));
360            }
361            for (_, dist) in &already_stored_peers {
362                all_peers_by_distance.push((true, *dist));
363            }
364            all_peers_by_distance.sort_by_key(|a| a.1);
365
366            let close_group_stored = all_peers_by_distance
367                .iter()
368                .take(CLOSE_GROUP_SIZE)
369                .filter(|(is_stored, _)| *is_stored)
370                .count();
371
372            if close_group_stored >= CLOSE_GROUP_MAJORITY {
373                debug!(
374                    "Chunk {} already stored ({close_group_stored}/{CLOSE_GROUP_SIZE} close-group peers confirm)",
375                    hex::encode(address)
376                );
377                return Err(Error::AlreadyStored);
378            }
379        }
380
381        let already_stored_count = already_stored_peers.len();
382        let failure_count = failures.len();
383        let quote_count = quotes.len();
384        let total_responses = quote_count + failure_count + already_stored_count;
385
386        if quotes.len() >= CLOSE_GROUP_SIZE {
387            // Sort by XOR distance to target, keep the closest CLOSE_GROUP_SIZE.
388            quotes.sort_by(|a, b| {
389                let dist_a = xor_distance(&a.0, address);
390                let dist_b = xor_distance(&b.0, address);
391                dist_a.cmp(&dist_b)
392            });
393            quotes.truncate(CLOSE_GROUP_SIZE);
394
395            info!(
396                "Collected {} quotes for address {} ({total_responses} responses: \
397                 {quote_count} ok, {already_stored_count} already_stored, {failure_count} failed, \
398                 {bad_quote_count} bad-binding)",
399                quotes.len(),
400                hex::encode(address),
401            );
402            return Ok(quotes);
403        }
404
405        Err(Error::InsufficientPeers(format!(
406            "Got {quote_count} quotes, need {CLOSE_GROUP_SIZE} ({total_responses} responses: \
407             {already_stored_count} already_stored, {failure_count} failed including \
408             {bad_quote_count} with mismatched peer bindings). Failures: [{}]",
409            failures.join("; ")
410        )))
411    }
412}
413
414#[cfg(test)]
415#[allow(clippy::unwrap_used, clippy::expect_used)]
416mod tests {
417    //! Test fixtures use real ML-DSA-65 keypairs (1952-byte public keys), the
418    //! same key material that ships on the wire. The "bad" quote is built by
419    //! **swapping** the public key field with a different real keypair's
420    //! public key — the exact shape produced by the Apr 30 production
421    //! failure (an operator running two co-located identities with crossed
422    //! quote-signing keys). Signatures are not exercised here because this
423    //! filter only mirrors `validate_peer_bindings` (BLAKE3 binding); see
424    //! the doc-comment on `quote_binding_is_valid` for why
425    //! `verify_quote_signature` and `verify_quote_content` are deliberately
426    //! NOT mirrored.
427
428    use super::*;
429    use ant_protocol::evm::RewardsAddress;
430    use ant_protocol::pqc::ops::{MlDsaOperations, MlDsaPublicKey};
431    use ant_protocol::transport::MlDsa65;
432    use std::time::SystemTime;
433    use xor_name::XorName;
434
435    /// A real ML-DSA-65 keypair plus its derived peer ID.
436    struct Keypair {
437        peer_id: PeerId,
438        pub_key_bytes: Vec<u8>,
439    }
440
441    fn gen_keypair() -> Keypair {
442        let ml_dsa = MlDsa65::new();
443        let (pub_key, _sk) = ml_dsa.generate_keypair().expect("ML-DSA-65 keygen");
444        let pub_key_bytes = pub_key.as_bytes().to_vec();
445        let peer_id = PeerId::from_bytes(compute_address(&pub_key_bytes));
446        Keypair {
447            peer_id,
448            pub_key_bytes,
449        }
450    }
451
452    /// Build a quote tuple whose `pub_key` correctly hashes to its peer_id.
453    /// Signature is left empty: this filter does not verify signatures.
454    fn good_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
455        let kp = gen_keypair();
456        let quote = PaymentQuote {
457            content: XorName([0u8; 32]),
458            timestamp: SystemTime::UNIX_EPOCH,
459            price: Amount::ZERO,
460            rewards_address: RewardsAddress::new([0u8; 20]),
461            pub_key: kp.pub_key_bytes,
462            signature: Vec::new(),
463        };
464        (kp.peer_id, Vec::new(), quote, Amount::ZERO)
465    }
466
467    /// Build a quote tuple where the quote carries a different keypair's
468    /// `pub_key` than the peer_id derives from. Mirrors the production
469    /// failure shape: peer A advertised on the transport, but the quote
470    /// carries peer B's key.
471    fn bad_quote_real() -> (PeerId, Vec<MultiAddr>, PaymentQuote, Amount) {
472        let claimed = gen_keypair();
473        let signing = gen_keypair();
474        assert_ne!(claimed.pub_key_bytes, signing.pub_key_bytes);
475        assert_ne!(claimed.peer_id.as_bytes(), signing.peer_id.as_bytes());
476        let quote = PaymentQuote {
477            content: XorName([0u8; 32]),
478            timestamp: SystemTime::UNIX_EPOCH,
479            price: Amount::ZERO,
480            rewards_address: RewardsAddress::new([0u8; 20]),
481            pub_key: signing.pub_key_bytes,
482            signature: Vec::new(),
483        };
484        (claimed.peer_id, Vec::new(), quote, Amount::ZERO)
485    }
486
487    /// Independent re-implementation of the storer-side binding spec
488    /// (`ant-node/src/payment/verifier.rs::validate_peer_bindings` +
489    /// `peer_id_from_public_key_bytes`):
490    /// (a) `pub_key` parses as ML-DSA-65 (length 1952), and
491    /// (b) `BLAKE3(pub_key) == peer_id`.
492    ///
493    /// Re-derived from spec, NOT delegating to `quote_binding_is_valid`,
494    /// so cross-checks are not "function == itself".
495    fn storer_binding_would_accept(peer_id: &PeerId, quote: &PaymentQuote) -> bool {
496        if MlDsaPublicKey::from_bytes(&quote.pub_key).is_err() {
497            return false;
498        }
499        compute_address(&quote.pub_key) == *peer_id.as_bytes()
500    }
501
502    // ============================================================
503    // Tests for `quote_binding_is_valid` (the predicate)
504    // ============================================================
505
506    #[test]
507    fn binding_accepts_real_self_consistent_keypair() {
508        let (peer_id, _, quote, _) = good_quote_real();
509        // Property under test: the predicate accepts a quote whose pub_key
510        // genuinely belongs to the claimed peer.
511        assert!(quote_binding_is_valid(&peer_id, &quote));
512        // Cross-check against the independent full storer-spec implementation.
513        assert!(storer_binding_would_accept(&peer_id, &quote));
514    }
515
516    #[test]
517    fn binding_rejects_real_crossed_keypair() {
518        let (peer_id, _, quote, _) = bad_quote_real();
519        assert!(!quote_binding_is_valid(&peer_id, &quote));
520        assert!(!storer_binding_would_accept(&peer_id, &quote));
521    }
522
523    #[test]
524    fn binding_rejects_oversize_pubkey() {
525        // A pub_key longer than ML-DSA-65 (1952 bytes) must be rejected
526        // even if BLAKE3 happens to agree, because the storer rejects on
527        // length first via `peer_id_from_public_key_bytes`.
528        let oversized = vec![0u8; ML_DSA_PUB_KEY_LEN + 1];
529        let peer_id = PeerId::from_bytes(compute_address(&oversized));
530        let quote = PaymentQuote {
531            content: XorName([0u8; 32]),
532            timestamp: SystemTime::UNIX_EPOCH,
533            price: Amount::ZERO,
534            rewards_address: RewardsAddress::new([0u8; 20]),
535            pub_key: oversized,
536            signature: Vec::new(),
537        };
538        // BLAKE3(pub_key) DOES equal the peer_id we constructed, so the
539        // bare hash check would pass — but the length guard must reject.
540        assert_eq!(compute_address(&quote.pub_key), *peer_id.as_bytes());
541        assert!(
542            !quote_binding_is_valid(&peer_id, &quote),
543            "predicate must reject oversize pub_key even when BLAKE3 happens to match"
544        );
545        assert!(!storer_binding_would_accept(&peer_id, &quote));
546    }
547
548    #[test]
549    fn binding_rejects_undersize_pubkey() {
550        let undersized = vec![0u8; ML_DSA_PUB_KEY_LEN - 1];
551        let peer_id = PeerId::from_bytes(compute_address(&undersized));
552        let quote = PaymentQuote {
553            content: XorName([0u8; 32]),
554            timestamp: SystemTime::UNIX_EPOCH,
555            price: Amount::ZERO,
556            rewards_address: RewardsAddress::new([0u8; 20]),
557            pub_key: undersized,
558            signature: Vec::new(),
559        };
560        assert!(!quote_binding_is_valid(&peer_id, &quote));
561        assert!(!storer_binding_would_accept(&peer_id, &quote));
562    }
563
564    // ============================================================
565    // Tests for the filter (`drop_quotes_with_bad_bindings`)
566    // ============================================================
567
568    #[test]
569    fn filter_drops_only_bad_bindings_and_leaves_storer_acceptable_quotes() {
570        let mut quotes = vec![
571            good_quote_real(),
572            bad_quote_real(),
573            good_quote_real(),
574            bad_quote_real(),
575            good_quote_real(),
576        ];
577
578        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
579
580        assert_eq!(dropped, 2, "two crossed-key quotes must be dropped");
581        assert_eq!(quotes.len(), 3, "three real-key quotes must remain");
582
583        // Cross-checked invariant: every retained quote would be accepted by
584        // a storer running the full spec. The defensive filter only checks
585        // the binding, so this asserts the binding-only filter is correct
586        // for binding-only failures (other failure modes are filtered by
587        // the per-peer classifier upstream).
588        for (peer_id, _, quote, _) in &quotes {
589            assert!(
590                storer_binding_would_accept(peer_id, quote),
591                "every retained quote must satisfy the full storer-side spec"
592            );
593        }
594    }
595
596    #[test]
597    fn filter_is_noop_when_all_quotes_are_storer_acceptable() {
598        let mut quotes: Vec<_> = (0..5).map(|_| good_quote_real()).collect();
599        let before = quotes.len();
600        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
601        assert_eq!(dropped, 0);
602        assert_eq!(quotes.len(), before);
603        for (peer_id, _, quote, _) in &quotes {
604            assert!(storer_binding_would_accept(peer_id, quote));
605        }
606    }
607
608    #[test]
609    fn filter_drops_all_when_every_responder_is_bad() {
610        // The "all hostile" case: every over-queried peer returned a bad
611        // binding. The patch should leave us with zero quotes (not panic,
612        // not skip the filter, not return malformed quotes). The caller in
613        // get_store_quotes then surfaces InsufficientPeers.
614        let mut quotes: Vec<_> = (0..CLOSE_GROUP_SIZE * 2)
615            .map(|_| bad_quote_real())
616            .collect();
617        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
618        assert_eq!(dropped, CLOSE_GROUP_SIZE * 2);
619        assert!(quotes.is_empty());
620    }
621
622    #[test]
623    fn filter_preserves_quote_payload_byte_for_byte() {
624        // After filtering, the retained quotes must be untouched — pub_key,
625        // signature, content, timestamp, price, rewards_address. The patch
626        // is a filter, not a transformation; this test catches any future
627        // regression that mutates a retained quote.
628        let (peer_id, addrs, original_quote, amount) = good_quote_real();
629        let mut quotes = vec![(peer_id, addrs.clone(), original_quote.clone(), amount)];
630        let _ = drop_quotes_with_bad_bindings(&mut quotes);
631
632        let (kept_peer, kept_addrs, kept_quote, kept_amount) =
633            quotes.pop().expect("the good quote must survive filtering");
634        assert_eq!(kept_peer.as_bytes(), peer_id.as_bytes());
635        assert_eq!(kept_addrs.len(), addrs.len());
636        assert_eq!(kept_amount, amount);
637        assert_eq!(kept_quote.pub_key, original_quote.pub_key);
638        assert_eq!(kept_quote.signature, original_quote.signature);
639        assert_eq!(kept_quote.content.0, original_quote.content.0);
640        assert_eq!(kept_quote.timestamp, original_quote.timestamp);
641        assert_eq!(kept_quote.price, original_quote.price);
642        assert_eq!(kept_quote.rewards_address, original_quote.rewards_address);
643    }
644
645    // ============================================================
646    // The Apr 30 production-failure repro
647    // ============================================================
648
649    /// Repro of the production failure from 2026-04-30 testnet runs.
650    ///
651    /// An external operator on `75.48.86.24` ran two co-located ant-node
652    /// identities (peer `0755ecb55b…` and peer `073db92f…`) that crossed
653    /// their quote-signing keys. Every chunk whose XOR-closest set happened
654    /// to include peer `0755ecb5` got a payment proof with one malformed
655    /// quote, and the storer's `validate_peer_bindings` rejected the
656    /// entire close-group proof — burning the chunk's payment.
657    ///
658    /// This test is the strongest proof the patch fixes that failure shape:
659    ///
660    /// 1. We assemble `2x CLOSE_GROUP_SIZE` real ML-DSA-65 quotes — the same
661    ///    over-query buffer the production code uses (line 93 of this file).
662    /// 2. One of them is a *crossed-key* quote — the production failure shape.
663    /// 3. We run an independent `storer_would_accept` check (re-derived from
664    ///    the storer spec, not from `quote_binding_is_valid`) over the
665    ///    pre-filter set; we confirm the bad peer is rejected, proving the
666    ///    storer **would** burn the chunk's payment if we proceeded unfiltered.
667    /// 4. We run `drop_quotes_with_bad_bindings`.
668    /// 5. We re-run `storer_would_accept` over the post-filter set; we confirm
669    ///    EVERY remaining quote would be accepted, proving the patched
670    ///    `ProofOfPayment` will not trigger the `validate_peer_bindings`
671    ///    rejection that caused the Apr 30 outage.
672    /// 6. We confirm the post-filter set has at least `CLOSE_GROUP_SIZE`
673    ///    quotes — the over-query buffer (2x) is sufficient.
674    #[test]
675    fn repro_apr_30_storer_would_have_rejected_pre_filter_and_accepts_post_filter() {
676        let over_query_count = CLOSE_GROUP_SIZE * 2;
677        let mut quotes: Vec<_> = (0..over_query_count - 1)
678            .map(|_| good_quote_real())
679            .collect();
680        // Splice the crossed-key quote in the middle (mirrors the random
681        // position the bad peer takes in the DHT-returned closest set).
682        quotes.insert(over_query_count / 2, bad_quote_real());
683        assert_eq!(quotes.len(), over_query_count);
684
685        // Step 1: prove the storer would reject the pre-filter set.
686        let storer_would_reject_count = quotes
687            .iter()
688            .filter(|(p, _, q, _)| !storer_binding_would_accept(p, q))
689            .count();
690        assert_eq!(
691            storer_would_reject_count, 1,
692            "exactly one quote (the crossed-key one) must be rejected by the storer spec"
693        );
694
695        // Step 2: run the patched filter.
696        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
697        assert_eq!(dropped, 1, "exactly the crossed-key quote must be filtered");
698
699        // Step 3: prove the storer would accept every survivor under the FULL spec.
700        for (peer_id, _, quote, _) in &quotes {
701            assert!(
702                storer_binding_would_accept(peer_id, quote),
703                "every post-filter quote must be accepted by the storer spec — \
704                 this is what the patch guarantees: no more burned payments"
705            );
706        }
707
708        // Step 4: prove the over-query buffer is sufficient to refill.
709        assert!(
710            quotes.len() >= CLOSE_GROUP_SIZE,
711            "after filtering, at least CLOSE_GROUP_SIZE good quotes must remain \
712             so we can build a non-rejected ProofOfPayment"
713        );
714    }
715
716    /// When more than the over-query buffer of peers misbehave, the filter
717    /// must NOT silently produce a short proof. The downstream caller in
718    /// `get_store_quotes` must see fewer than `CLOSE_GROUP_SIZE` survivors
719    /// and return `InsufficientPeers`.
720    #[test]
721    fn filter_leaves_short_set_when_too_many_bad_peers() {
722        // Buffer is 2x; if more than half are bad, there's no way to refill.
723        let bad_count = CLOSE_GROUP_SIZE + 1;
724        let good_count = CLOSE_GROUP_SIZE - 1;
725        let mut quotes: Vec<_> = std::iter::repeat_with(bad_quote_real)
726            .take(bad_count)
727            .chain(std::iter::repeat_with(good_quote_real).take(good_count))
728            .collect();
729
730        let dropped = drop_quotes_with_bad_bindings(&mut quotes);
731        assert_eq!(dropped, bad_count);
732        assert!(
733            quotes.len() < CLOSE_GROUP_SIZE,
734            "this is the precondition for InsufficientPeers downstream"
735        );
736        // Sanity: every survivor is storer-acceptable under the full spec.
737        for (peer_id, _, quote, _) in &quotes {
738            assert!(storer_binding_would_accept(peer_id, quote));
739        }
740    }
741
742    // ============================================================
743    // Tests for the per-peer response classifier (the PRIMARY defense).
744    //
745    // These tests exercise the production code path that runs inside
746    // get_store_quotes' per-peer async closure. The defensive
747    // `drop_quotes_with_bad_bindings` is a second line of defence —
748    // these tests make sure the FIRST line is what actually catches
749    // misbehaving peers in production. Without these, a regression
750    // that removes the per-peer check could be masked by the post-
751    // collect filter and pass the rest of the suite.
752    // ============================================================
753
754    /// Helper: serialize a `PaymentQuote` to bytes the way the wire layer
755    /// does (rmp_serde / msgpack), to feed into `classify_quote_response`.
756    fn serialize_quote(quote: &PaymentQuote) -> Vec<u8> {
757        rmp_serde::to_vec(quote).expect("serialize quote")
758    }
759
760    #[test]
761    fn classifier_accepts_real_self_consistent_quote() {
762        let (peer_id, _, quote, _) = good_quote_real();
763        let bytes = serialize_quote(&quote);
764        let result = classify_quote_response(&peer_id, &bytes, false);
765        match result {
766            Ok((q, price)) => {
767                assert_eq!(q.pub_key, quote.pub_key);
768                assert_eq!(price, quote.price);
769            }
770            Err(e) => panic!("expected Ok, got {e}"),
771        }
772    }
773
774    #[test]
775    fn classifier_rejects_crossed_keypair_with_typed_error() {
776        let (peer_id, _, quote, _) = bad_quote_real();
777        let bytes = serialize_quote(&quote);
778        let result = classify_quote_response(&peer_id, &bytes, false);
779        match result {
780            Err(Error::BadQuoteBinding {
781                peer_id: pid,
782                detail,
783            }) => {
784                assert_eq!(pid, peer_id.to_string());
785                assert!(
786                    detail.contains("BLAKE3(pub_key)="),
787                    "diagnostic detail must include the derived peer id: {detail}"
788                );
789            }
790            other => panic!("expected BadQuoteBinding for crossed-key quote, got {other:?}"),
791        }
792    }
793
794    /// CRITICAL: a misbehaving peer that votes `already_stored=true` must
795    /// NOT be allowed to influence the close-group "already stored"
796    /// majority decision. The bind-check runs before the AlreadyStored
797    /// short-circuit, so a crossed-key peer voting "already stored" is
798    /// classified as `BadQuoteBinding`, not `AlreadyStored`.
799    ///
800    /// This locks in a specific reviewer concern from round 1:
801    ///   "A peer with a crossed/garbage signing key could simply respond
802    ///   already_stored=true and its vote enters already_stored_peers
803    ///   unfiltered."
804    #[test]
805    fn classifier_rejects_already_stored_vote_from_bad_binding_peer() {
806        let (peer_id, _, quote, _) = bad_quote_real();
807        let bytes = serialize_quote(&quote);
808        // The peer claims already_stored=true, but its quote has a crossed key.
809        let result = classify_quote_response(&peer_id, &bytes, true);
810        assert!(
811            matches!(result, Err(Error::BadQuoteBinding { .. })),
812            "crossed-key peer must be classified BadQuoteBinding even when \
813             voting already_stored=true; got {result:?}"
814        );
815    }
816
817    /// An honest peer's `already_stored=true` vote IS honoured (after
818    /// passing the bind-check). This is the contrast to the test above.
819    #[test]
820    fn classifier_honours_already_stored_vote_from_good_binding_peer() {
821        let (peer_id, _, quote, _) = good_quote_real();
822        let bytes = serialize_quote(&quote);
823        let result = classify_quote_response(&peer_id, &bytes, true);
824        assert!(
825            matches!(result, Err(Error::AlreadyStored)),
826            "honest peer's already_stored vote must be honoured; got {result:?}"
827        );
828    }
829
830    #[test]
831    fn classifier_returns_serialization_error_on_bad_bytes() {
832        let (peer_id, _, _, _) = good_quote_real();
833        let garbage = b"this is not a valid msgpack PaymentQuote".to_vec();
834        let result = classify_quote_response(&peer_id, &garbage, false);
835        assert!(
836            matches!(result, Err(Error::Serialization(_))),
837            "garbage bytes must produce a Serialization error; got {result:?}"
838        );
839    }
840
841    // ============================================================
842    // AIMD attribution: every error variant is classified correctly
843    // for `record_peer_outcome` so misbehaving peers are deprioritized
844    // and reachable-but-already-storing peers stay reputable.
845    // ============================================================
846
847    #[test]
848    fn aimd_success_for_ok_result() {
849        let (_, _, quote, _) = good_quote_real();
850        let result: std::result::Result<(PaymentQuote, Amount), Error> =
851            Ok((quote.clone(), quote.price));
852        assert!(quote_outcome_is_success(&result));
853    }
854
855    #[test]
856    fn aimd_success_for_already_stored() {
857        let result: std::result::Result<(PaymentQuote, Amount), Error> = Err(Error::AlreadyStored);
858        assert!(
859            quote_outcome_is_success(&result),
860            "an honest peer reporting already_stored is a benign outcome — \
861             the peer is reachable and well-behaved, so the AIMD cache must \
862             keep them at high reputation"
863        );
864    }
865
866    #[test]
867    fn aimd_failure_for_bad_quote_binding() {
868        let result: std::result::Result<(PaymentQuote, Amount), Error> =
869            Err(Error::BadQuoteBinding {
870                peer_id: "abc123".to_string(),
871                detail: "test".to_string(),
872            });
873        assert!(
874            !quote_outcome_is_success(&result),
875            "BadQuoteBinding peers must be marked as failures so the AIMD \
876             bootstrap cache learns to stop asking them on every upload"
877        );
878    }
879
880    #[test]
881    fn aimd_failure_for_network_and_timeout_and_protocol_and_serialization() {
882        for err in [
883            Error::Network("net".to_string()),
884            Error::Timeout("to".to_string()),
885            Error::Protocol("proto".to_string()),
886            Error::Serialization("ser".to_string()),
887        ] {
888            let result: std::result::Result<(PaymentQuote, Amount), Error> = Err(err);
889            assert!(
890                !quote_outcome_is_success(&result),
891                "network-class errors must be classified as failures: {result:?}"
892            );
893        }
894    }
895
896    /// Cross-validate the classifier's binding verdict against the
897    /// independent storer-spec re-derivation across mixed responders.
898    #[test]
899    fn classifier_verdict_matches_storer_binding_spec_for_mixed_responders() {
900        let mut responders: Vec<(PeerId, PaymentQuote)> = (0..12)
901            .map(|_| {
902                let (p, _, q, _) = good_quote_real();
903                (p, q)
904            })
905            .collect();
906        for _ in 0..4 {
907            let (p, _, q, _) = bad_quote_real();
908            responders.push((p, q));
909        }
910
911        for (peer_id, quote) in &responders {
912            let bytes = serialize_quote(quote);
913            let storer_verdict = storer_binding_would_accept(peer_id, quote);
914            let classifier_verdict = classify_quote_response(peer_id, &bytes, false).is_ok();
915            assert_eq!(
916                classifier_verdict, storer_verdict,
917                "classifier and storer-binding-spec must agree on every responder \
918                 (peer_id={}, storer={storer_verdict}, classifier={classifier_verdict})",
919                peer_id
920            );
921        }
922    }
923}