// ant_node/payment/verifier.rs

1//! Payment verifier with LRU cache and EVM verification.
2//!
3//! This is the core payment verification logic for ant-node.
4//! All new data requires EVM payment on Arbitrum (no free tier).
5
6use crate::ant_protocol::CLOSE_GROUP_SIZE;
7use crate::error::{Error, Result};
8use crate::logging::{debug, info};
9use crate::payment::cache::{CacheStats, VerifiedCache, XorName};
10use crate::payment::proof::{
11    deserialize_merkle_proof, deserialize_proof, detect_proof_type, ProofType,
12};
13use crate::payment::single_node::SingleNodePayment;
14use ant_protocol::payment::verify::{verify_quote_content, verify_quote_signature};
15use evmlib::common::Amount;
16use evmlib::contract::payment_vault;
17use evmlib::merkle_batch_payment::{OnChainPaymentInfo, PoolHash};
18use evmlib::Network as EvmNetwork;
19use evmlib::ProofOfPayment;
20use evmlib::RewardsAddress;
21use lru::LruCache;
22use parking_lot::{Mutex, RwLock};
23use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes;
24use saorsa_core::identity::PeerId;
25use saorsa_core::P2PNode;
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28use std::time::{Duration, SystemTime};
29
/// Minimum allowed size for a payment proof in bytes.
///
/// This minimum ensures the proof contains at least a basic cryptographic hash or identifier.
/// Proofs smaller than this are rejected as they cannot contain sufficient payment information.
pub const MIN_PAYMENT_PROOF_SIZE_BYTES: usize = 32;

/// Maximum allowed size for a payment proof in bytes (256 KB).
///
/// Single-node proofs with 7 ML-DSA-65 quotes reach ~40 KB.
/// Merkle proofs include 16 candidate nodes (each with ~1,952-byte ML-DSA pub key
/// and ~3,309-byte signature) plus merkle branch hashes, totaling ~130 KB.
/// 256 KB provides headroom while still capping memory during verification.
pub const MAX_PAYMENT_PROOF_SIZE_BYTES: usize = 262_144;

/// Maximum age of a payment quote before it's considered expired (24 hours).
///
/// Prevents replaying old cheap quotes against nearly-full nodes. Past-side
/// clock skew is absorbed entirely by this window — there is no separate
/// past-skew tolerance. Enforced by `validate_quote_timestamps`.
const QUOTE_MAX_AGE_SECS: u64 = 86_400;

/// Maximum tolerated forward skew when a quote's timestamp is ahead of the
/// verifying node's wall clock (300 seconds). Applies exclusively to the
/// future direction; past-dated quotes are governed by `QUOTE_MAX_AGE_SECS`.
/// Also enforced by `validate_quote_timestamps`.
const QUOTE_FUTURE_SKEW_TOLERANCE_SECS: u64 = 300;
54
55/// Configuration for EVM payment verification.
56///
57/// EVM verification is always on. All new data requires on-chain
58/// payment verification. The network field selects which EVM chain to use.
/// Configuration for EVM payment verification.
///
/// EVM verification is always on. All new data requires on-chain
/// payment verification. The network field selects which EVM chain to use.
/// The `Default` impl selects Arbitrum One.
#[derive(Debug, Clone)]
pub struct EvmVerifierConfig {
    /// EVM network to use (Arbitrum One, Arbitrum Sepolia, etc.)
    pub network: EvmNetwork,
}
64
65impl Default for EvmVerifierConfig {
66    fn default() -> Self {
67        Self {
68            network: EvmNetwork::ArbitrumOne,
69        }
70    }
71}
72
73/// Configuration for the payment verifier.
74///
75/// All new data requires EVM payment on Arbitrum. The cache stores
76/// previously verified payments to avoid redundant on-chain lookups.
/// Configuration for the payment verifier.
///
/// All new data requires EVM payment on Arbitrum. The cache stores
/// previously verified payments to avoid redundant on-chain lookups.
#[derive(Debug, Clone)]
pub struct PaymentVerifierConfig {
    /// EVM verifier configuration.
    pub evm: EvmVerifierConfig,
    /// Cache capacity (number of `XorName` values to cache).
    /// Passed to `VerifiedCache::with_capacity` in [`PaymentVerifier::new`].
    pub cache_capacity: usize,
    /// Local node's rewards address.
    /// The verifier rejects payments that don't include this node as a recipient.
    pub local_rewards_address: RewardsAddress,
}
87
88/// Status returned by payment verification.
/// Status returned by payment verification.
///
/// Produced by `check_payment_required` (never `PaymentVerified`) and by
/// `verify_payment` (any variant except a bare `PaymentRequired` success).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PaymentStatus {
    /// Data was found in local cache - previously paid.
    CachedAsVerified,
    /// New data - payment required.
    PaymentRequired,
    /// Payment was provided and verified.
    PaymentVerified,
}
98
99impl PaymentStatus {
100    /// Returns true if the data can be stored (cached or payment verified).
101    #[must_use]
102    pub fn can_store(&self) -> bool {
103        matches!(self, Self::CachedAsVerified | Self::PaymentVerified)
104    }
105
106    /// Returns true if this status indicates the data was already paid for.
107    #[must_use]
108    pub fn is_cached(&self) -> bool {
109        matches!(self, Self::CachedAsVerified)
110    }
111}
112
/// Default capacity for the merkle pool cache (number of pool hashes to cache).
/// Also sizes `closeness_pass_cache` and `inflight_closeness` in
/// [`PaymentVerifier::new`].
const DEFAULT_POOL_CACHE_CAPACITY: usize = 1_000;
115
/// Main payment verifier for ant-node.
///
/// Uses:
/// 1. LRU cache for fast lookups of previously verified `XorName` values
/// 2. EVM payment verification for new data (always required)
/// 3. Pool-level cache for merkle batch payments (avoids repeated on-chain queries)
///
/// Every field is individually synchronized (`Mutex`/`RwLock`), so all
/// methods take `&self`.
pub struct PaymentVerifier {
    /// LRU cache of verified `XorName` values.
    cache: VerifiedCache,
    /// LRU cache of verified merkle pool hashes → on-chain payment info.
    pool_cache: Mutex<LruCache<PoolHash, OnChainPaymentInfo>>,
    /// LRU cache of pool hashes whose candidate closeness has already been
    /// verified by this node. Collapses the per-chunk Kademlia lookup cost
    /// within a batch (256 chunks × 1 pool = 1 lookup instead of 256).
    closeness_pass_cache: Mutex<LruCache<PoolHash, ()>>,
    /// In-flight closeness lookups, keyed by pool hash. Lets concurrent PUTs
    /// for the same pool coalesce onto a single Kademlia lookup AND share
    /// its result — on both success and failure — which bounds `DoS`
    /// amplification to one lookup per unique `pool_hash` regardless of
    /// concurrency.
    inflight_closeness: Mutex<LruCache<PoolHash, Arc<ClosenessSlot>>>,
    /// P2P node handle, attached post-construction so merkle verification can
    /// check that candidate `pub_keys` map to peers actually close to the pool
    /// midpoint in the live DHT. `None` in unit tests that don't exercise
    /// merkle verification; production startup MUST call [`attach_p2p_node`].
    p2p_node: RwLock<Option<Arc<P2PNode>>>,
    /// Configuration.
    config: PaymentVerifierConfig,
}
145
/// Shared state for an inflight closeness verification. The leader publishes
/// its result via the `OnceLock`; waiters read that result directly instead
/// of racing on a cache re-check. Wrapped in an `Arc` and held both by the
/// leader's drop guard and by each waiting task.
struct ClosenessSlot {
    /// Wake-up signal only — carries no data; waiters read `result` after it fires.
    notify: Arc<tokio::sync::Notify>,
    /// `Some(Ok(()))` on success, `Some(Err(msg))` on failure, `None` if the
    /// leader disappeared without publishing (panic, cancellation).
    result: std::sync::OnceLock<std::result::Result<(), String>>,
}
156
157impl ClosenessSlot {
158    fn new() -> Self {
159        Self {
160            notify: Arc::new(tokio::sync::Notify::new()),
161            result: std::sync::OnceLock::new(),
162        }
163    }
164
165    /// Build an owned `Notified` future that snapshots the `notify_waiters`
166    /// counter at call time. Awaiting this future after dropping external
167    /// locks is race-free: if `notify_waiters` fires between construction
168    /// and the first poll, the snapshot mismatch resolves the future
169    /// immediately.
170    fn notified_owned(&self) -> tokio::sync::futures::OwnedNotified {
171        Arc::clone(&self.notify).notified_owned()
172    }
173}
174
/// Drop guard that publishes the leader's result, clears the inflight slot,
/// and wakes all waiters. Fires on every exit path: success, failure, panic,
/// future-cancellation.
///
/// The guard owns its own `Arc<ClosenessSlot>` so `notify_waiters` still
/// fires even if LRU pressure evicted the slot before the leader finished.
/// Waiters see the published result via `result.get()`; the `Notify` is only
/// the wake-up signal.
struct InflightGuard<'a> {
    /// The verifier's inflight map that this guard cleans up on drop.
    slot_cache: &'a Mutex<LruCache<PoolHash, Arc<ClosenessSlot>>>,
    /// Key under which this leader's slot was registered.
    pool_hash: PoolHash,
    /// The leader's own slot, kept alive independently of the LRU entry.
    slot: Arc<ClosenessSlot>,
}
188
189impl InflightGuard<'_> {
190    /// Publish the leader's result. Called exactly once by the leader on
191    /// every successful or explicit-error exit. If dropped without calling
192    /// (panic, cancellation) the guard still wakes waiters but leaves
193    /// `result` empty, which waiters treat as a transient failure and retry.
194    fn publish(&self, result: &Result<()>) {
195        let stored: std::result::Result<(), String> = match result {
196            Ok(()) => Ok(()),
197            Err(e) => Err(e.to_string()),
198        };
199        let _ = self.slot.result.set(stored);
200    }
201}
202
203impl Drop for InflightGuard<'_> {
204    fn drop(&mut self) {
205        // Remove the slot entry if it's still ours. A separate leader may
206        // have inserted a new slot for the same pool_hash after LRU
207        // eviction — don't pop someone else's entry.
208        {
209            let mut cache = self.slot_cache.lock();
210            if let Some(existing) = cache.peek(&self.pool_hash) {
211                if Arc::ptr_eq(existing, &self.slot) {
212                    cache.pop(&self.pool_hash);
213                }
214            }
215        }
216        // Wake every waiter registered against OUR slot, regardless of
217        // whether the cache entry is still ours.
218        self.slot.notify.notify_waiters();
219    }
220}
221
222impl PaymentVerifier {
    /// Create a new payment verifier.
    ///
    /// Sizes the verified-`XorName` LRU from `config.cache_capacity` and the
    /// three pool-level LRUs (`pool_cache`, `closeness_pass_cache`,
    /// `inflight_closeness`) from `DEFAULT_POOL_CACHE_CAPACITY`. No
    /// `P2PNode` is attached yet; production startup must follow up with
    /// [`Self::attach_p2p_node`].
    #[must_use]
    pub fn new(config: PaymentVerifierConfig) -> Self {
        // Compile-time proof that the `unwrap_or(MIN)` fallback below can
        // never actually fire.
        const _: () = assert!(
            DEFAULT_POOL_CACHE_CAPACITY > 0,
            "pool cache capacity must be > 0"
        );
        let cache = VerifiedCache::with_capacity(config.cache_capacity);
        let pool_cache_size =
            NonZeroUsize::new(DEFAULT_POOL_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
        // All three pool-level caches share the same capacity.
        let pool_cache = Mutex::new(LruCache::new(pool_cache_size));
        let closeness_pass_cache = Mutex::new(LruCache::new(pool_cache_size));
        let inflight_closeness = Mutex::new(LruCache::new(pool_cache_size));

        let cache_capacity = config.cache_capacity;
        info!("Payment verifier initialized (cache_capacity={cache_capacity}, evm=always-on, pool_cache={DEFAULT_POOL_CACHE_CAPACITY})");

        // Loud warning if a production binary was accidentally built with
        // `test-utils`: that feature flips the closeness-check fail-open
        // switch, disabling the pay-yourself defence when P2PNode isn't
        // attached. Safe in tests, never intended for prod.
        #[cfg(feature = "test-utils")]
        crate::logging::error!(
            "PaymentVerifier: built with `test-utils` feature — merkle closeness \
             defence falls back to fail-open when no P2PNode is attached. This \
             feature is for test binaries only; production nodes must be built \
             without it."
        );

        Self {
            cache,
            pool_cache,
            closeness_pass_cache,
            inflight_closeness,
            p2p_node: RwLock::new(None),
            config,
        }
    }
261
262    /// Attach the node's [`P2PNode`] handle so merkle-payment verification can
263    /// check candidate `pub_keys` against the DHT's actual closest peers to the
264    /// pool midpoint.
265    ///
266    /// Production startup MUST call this once the `P2PNode` exists. Without
267    /// it, the closeness check fails CLOSED in release builds (rejects the
268    /// PUT with a visible error) and fails open in test builds. Idempotent:
269    /// calling twice replaces the handle.
270    pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
271        *self.p2p_node.write() = Some(node);
272        debug!("PaymentVerifier: P2PNode attached for merkle closeness checks");
273    }
274
275    /// Check if payment is required for the given `XorName`.
276    ///
277    /// This is the main entry point for payment verification:
278    /// 1. Check LRU cache (fast path)
279    /// 2. If not cached, payment is required
280    ///
281    /// # Arguments
282    ///
283    /// * `xorname` - The content-addressed name of the data
284    ///
285    /// # Returns
286    ///
287    /// * `PaymentStatus::CachedAsVerified` - Found in local cache (previously paid)
288    /// * `PaymentStatus::PaymentRequired` - Not cached (payment required)
289    pub fn check_payment_required(&self, xorname: &XorName) -> PaymentStatus {
290        // Check LRU cache (fast path)
291        if self.cache.contains(xorname) {
292            if crate::logging::enabled!(crate::logging::Level::DEBUG) {
293                debug!("Data {} found in verified cache", hex::encode(xorname));
294            }
295            return PaymentStatus::CachedAsVerified;
296        }
297
298        // Not in cache - payment required
299        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
300            debug!(
301                "Data {} not in cache - payment required",
302                hex::encode(xorname)
303            );
304        }
305        PaymentStatus::PaymentRequired
306    }
307
    /// Verify that a PUT request has valid payment.
    ///
    /// This is the complete payment verification flow:
    /// 1. Check if data is in cache (previously paid)
    /// 2. If not, verify the provided payment proof
    ///
    /// # Arguments
    ///
    /// * `xorname` - The content-addressed name of the data
    /// * `payment_proof` - Optional payment proof (required if not in cache)
    ///
    /// # Returns
    ///
    /// * `Ok(PaymentStatus)` - Verification succeeded
    /// * `Err(Error::Payment)` - No payment and not cached, or payment invalid
    ///
    /// # Errors
    ///
    /// Returns an error if payment is required but not provided, or if payment is invalid.
    pub async fn verify_payment(
        &self,
        xorname: &XorName,
        payment_proof: Option<&[u8]>,
    ) -> Result<PaymentStatus> {
        // First check if payment is required
        let status = self.check_payment_required(xorname);

        match status {
            PaymentStatus::CachedAsVerified => {
                // No payment needed - already in cache
                Ok(status)
            }
            PaymentStatus::PaymentRequired => {
                // EVM verification is always on — verify the proof
                if let Some(proof) = payment_proof {
                    let proof_len = proof.len();
                    // Size gates run before any deserialization so undersized
                    // or oversized proofs are rejected cheaply.
                    if proof_len < MIN_PAYMENT_PROOF_SIZE_BYTES {
                        return Err(Error::Payment(format!(
                            "Payment proof too small: {proof_len} bytes (min {MIN_PAYMENT_PROOF_SIZE_BYTES})"
                        )));
                    }
                    if proof_len > MAX_PAYMENT_PROOF_SIZE_BYTES {
                        return Err(Error::Payment(format!(
                            "Payment proof too large: {proof_len} bytes (max {MAX_PAYMENT_PROOF_SIZE_BYTES} bytes)"
                        )));
                    }

                    // Detect proof type from version tag byte
                    match detect_proof_type(proof) {
                        Some(ProofType::Merkle) => {
                            self.verify_merkle_payment(xorname, proof).await?;
                        }
                        Some(ProofType::SingleNode) => {
                            let (payment, tx_hashes) = deserialize_proof(proof).map_err(|e| {
                                Error::Payment(format!("Failed to deserialize payment proof: {e}"))
                            })?;

                            // tx hashes are informational only here; the
                            // verification path goes through the quotes.
                            if !tx_hashes.is_empty() {
                                debug!("Proof includes {} transaction hash(es)", tx_hashes.len());
                            }

                            self.verify_evm_payment(xorname, &payment).await?;
                        }
                        None => {
                            let tag = proof.first().copied().unwrap_or(0);
                            return Err(Error::Payment(format!(
                                "Unknown payment proof type tag: 0x{tag:02x}"
                            )));
                        }
                        // ant-protocol marks `ProofType` as `#[non_exhaustive]`.
                        // A future proof variant that this node does not yet
                        // understand must be rejected, not silently accepted.
                        Some(_) => {
                            let tag = proof.first().copied().unwrap_or(0);
                            return Err(Error::Payment(format!(
                                "Unsupported payment proof type tag: 0x{tag:02x} (this node's protocol version does not handle it — upgrade ant-node)"
                            )));
                        }
                    }

                    // Cache the verified xorname
                    self.cache.insert(*xorname);

                    Ok(PaymentStatus::PaymentVerified)
                } else {
                    // No payment provided in production mode
                    let xorname_hex = hex::encode(xorname);
                    Err(Error::Payment(format!(
                        "Payment required for new data {xorname_hex}"
                    )))
                }
            }
            // check_payment_required never returns this variant; treat it as
            // an internal invariant violation rather than a storable state.
            PaymentStatus::PaymentVerified => Err(Error::Payment(
                "Unexpected PaymentVerified status from check_payment_required".to_string(),
            )),
        }
    }
405
    /// Get cache statistics.
    ///
    /// Snapshot of the verified-`XorName` cache only; the merkle pool
    /// caches are not included.
    #[must_use]
    pub fn cache_stats(&self) -> CacheStats {
        self.cache.stats()
    }
411
    /// Get the number of cached entries.
    ///
    /// Counts the verified-`XorName` cache only; the merkle pool caches
    /// are not included.
    #[must_use]
    pub fn cache_len(&self) -> usize {
        self.cache.len()
    }
417
    /// Pre-populate the payment cache for a given address.
    ///
    /// This marks the address as already paid, so subsequent `verify_payment`
    /// calls will return `CachedAsVerified` without on-chain verification.
    /// Useful for test setups where real EVM payment is not needed.
    /// Compiled only for tests and the `test-utils` feature.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn cache_insert(&self, xorname: XorName) {
        self.cache.insert(xorname);
    }
427
428    /// Pre-populate the merkle pool cache. Testing helper that lets e2e tests
429    /// bypass the on-chain `completedMerklePayments` lookup when the point of
430    /// the test is to exercise merkle-verification logic BEFORE the on-chain
431    /// call (e.g. the pay-yourself closeness check).
432    #[cfg(any(test, feature = "test-utils"))]
433    pub fn pool_cache_insert(&self, pool_hash: PoolHash, info: OnChainPaymentInfo) {
434        let mut cache = self.pool_cache.lock();
435        cache.put(pool_hash, info);
436    }
437
    /// Verify a single-node EVM payment proof.
    ///
    /// Verification steps:
    /// 1. Exactly `CLOSE_GROUP_SIZE` quotes are present
    /// 2. All quotes target the correct content address (xorname binding)
    /// 3. Quote timestamps are fresh (not expired or future-dated)
    /// 4. Peer ID bindings match the ML-DSA-65 public keys
    /// 5. This node is among the quoted recipients
    /// 6. All ML-DSA-65 signatures are valid (offloaded to `spawn_blocking`)
    /// 7. The median-priced quote was paid at least 3x its price on-chain
    ///    (looked up via `completedPayments(quoteHash)` on the payment vault)
    ///
    /// Steps run roughly cheapest-first, so structurally invalid proofs
    /// never reach the CPU-heavy signature checks or the on-chain lookup.
    ///
    /// For unit tests that don't need on-chain verification, pre-populate
    /// the cache so `verify_payment` returns `CachedAsVerified` before
    /// reaching this method.
    async fn verify_evm_payment(&self, xorname: &XorName, payment: &ProofOfPayment) -> Result<()> {
        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
            let xorname_hex = hex::encode(xorname);
            let quote_count = payment.peer_quotes.len();
            debug!("Verifying EVM payment for {xorname_hex} with {quote_count} quotes");
        }

        Self::validate_quote_structure(payment)?;
        Self::validate_quote_content(payment, xorname)?;
        Self::validate_quote_timestamps(payment)?;
        Self::validate_peer_bindings(payment)?;
        self.validate_local_recipient(payment)?;

        // Verify quote signatures (CPU-bound, run off async runtime).
        // Quotes are cloned because spawn_blocking requires 'static data.
        let peer_quotes = payment.peer_quotes.clone();
        tokio::task::spawn_blocking(move || {
            for (encoded_peer_id, quote) in &peer_quotes {
                if !verify_quote_signature(quote) {
                    return Err(Error::Payment(
                        format!("Quote ML-DSA-65 signature verification failed for peer {encoded_peer_id:?}"),
                    ));
                }
            }
            Ok(())
        })
        .await
        // Outer `?` is the JoinError (task panicked/cancelled); inner `?`
        // is the first signature failure, if any.
        .map_err(|e| Error::Payment(format!("Signature verification task failed: {e}")))??;

        // Reconstruct the SingleNodePayment to identify the median quote.
        // from_quotes() sorts by price and marks the median for 3x payment.
        let quotes_with_prices: Vec<_> = payment
            .peer_quotes
            .iter()
            .map(|(_, quote)| (quote.clone(), quote.price))
            .collect();
        let single_payment = SingleNodePayment::from_quotes(quotes_with_prices).map_err(|e| {
            Error::Payment(format!(
                "Failed to reconstruct payment for verification: {e}"
            ))
        })?;

        // Verify the median quote was paid at least 3x its price on-chain
        // via completedPayments(quoteHash) on the payment vault contract.
        let verified_amount = single_payment
            .verify(&self.config.evm.network)
            .await
            .map_err(|e| {
                let xorname_hex = hex::encode(xorname);
                Error::Payment(format!(
                    "Median quote payment verification failed for {xorname_hex}: {e}"
                ))
            })?;

        if crate::logging::enabled!(crate::logging::Level::INFO) {
            let xorname_hex = hex::encode(xorname);
            info!("EVM payment verified for {xorname_hex} (median paid {verified_amount} atto)");
        }
        Ok(())
    }
512
513    /// Validate quote count, uniqueness, and basic structure.
514    fn validate_quote_structure(payment: &ProofOfPayment) -> Result<()> {
515        if payment.peer_quotes.is_empty() {
516            return Err(Error::Payment("Payment has no quotes".to_string()));
517        }
518
519        let quote_count = payment.peer_quotes.len();
520        if quote_count != CLOSE_GROUP_SIZE {
521            return Err(Error::Payment(format!(
522                "Payment must have exactly {CLOSE_GROUP_SIZE} quotes, got {quote_count}"
523            )));
524        }
525
526        let mut seen: Vec<&evmlib::EncodedPeerId> = Vec::with_capacity(quote_count);
527        for (encoded_peer_id, _) in &payment.peer_quotes {
528            if seen.contains(&encoded_peer_id) {
529                return Err(Error::Payment(format!(
530                    "Duplicate peer ID in payment quotes: {encoded_peer_id:?}"
531                )));
532            }
533            seen.push(encoded_peer_id);
534        }
535
536        Ok(())
537    }
538
539    /// Verify all quotes target the correct content address.
540    fn validate_quote_content(payment: &ProofOfPayment, xorname: &XorName) -> Result<()> {
541        for (encoded_peer_id, quote) in &payment.peer_quotes {
542            if !verify_quote_content(quote, xorname) {
543                let expected_hex = hex::encode(xorname);
544                let actual_hex = hex::encode(quote.content.0);
545                return Err(Error::Payment(format!(
546                    "Quote content address mismatch for peer {encoded_peer_id:?}: expected {expected_hex}, got {actual_hex}"
547                )));
548            }
549        }
550        Ok(())
551    }
552
553    /// Verify quote freshness — reject stale quotes and ones too far in the future.
554    ///
555    /// A quote whose timestamp is in the past is accepted as long as its age
556    /// does not exceed `QUOTE_MAX_AGE_SECS`. A quote whose timestamp is in
557    /// the future relative to this node is accepted only if the forward skew
558    /// does not exceed `QUOTE_FUTURE_SKEW_TOLERANCE_SECS`.
559    fn validate_quote_timestamps(payment: &ProofOfPayment) -> Result<()> {
560        let now = SystemTime::now();
561        let max_age = Duration::from_secs(QUOTE_MAX_AGE_SECS);
562        let max_future_skew = Duration::from_secs(QUOTE_FUTURE_SKEW_TOLERANCE_SECS);
563
564        for (encoded_peer_id, quote) in &payment.peer_quotes {
565            match now.duration_since(quote.timestamp) {
566                Ok(age) => {
567                    if age > max_age {
568                        return Err(Error::Payment(format!(
569                            "Quote from peer {encoded_peer_id:?} expired: age {}s exceeds max {QUOTE_MAX_AGE_SECS}s",
570                            age.as_secs()
571                        )));
572                    }
573                }
574                Err(future) => {
575                    let skew = future.duration();
576                    if skew > max_future_skew {
577                        return Err(Error::Payment(format!(
578                            "Quote from peer {encoded_peer_id:?} has timestamp {}s in the future \
579                             (exceeds {QUOTE_FUTURE_SKEW_TOLERANCE_SECS}s tolerance)",
580                            skew.as_secs()
581                        )));
582                    }
583                }
584            }
585        }
586        Ok(())
587    }
588
589    /// Verify each quote's `pub_key` matches the claimed peer ID via BLAKE3.
590    fn validate_peer_bindings(payment: &ProofOfPayment) -> Result<()> {
591        for (encoded_peer_id, quote) in &payment.peer_quotes {
592            let expected_peer_id = peer_id_from_public_key_bytes(&quote.pub_key)
593                .map_err(|e| Error::Payment(format!("Invalid ML-DSA public key in quote: {e}")))?;
594
595            if expected_peer_id.as_bytes() != encoded_peer_id.as_bytes() {
596                let expected_hex = expected_peer_id.to_hex();
597                let actual_hex = hex::encode(encoded_peer_id.as_bytes());
598                return Err(Error::Payment(format!(
599                    "Quote pub_key does not belong to claimed peer {encoded_peer_id:?}: \
600                     BLAKE3(pub_key) = {expected_hex}, peer_id = {actual_hex}"
601                )));
602            }
603        }
604        Ok(())
605    }
606
607    /// Minimum number of candidate `pub_keys` (out of 16) whose derived `PeerId`
608    /// must match the DHT's actual closest peers to the pool midpoint address.
609    ///
610    /// Set below 16/16 to absorb normal routing-table skew between the
611    /// payer's view and this node's view — on a well-connected network the
612    /// divergence between two nodes' closest-set views is typically 1-2
613    /// peers, occasionally 3 during churn. 13/16 tolerates 3 divergent
614    /// peers while still limiting how many candidates an attacker can
615    /// fabricate before the check bites. A lower threshold (e.g. 9/16)
616    /// would let an attacker who controls 7 real neighbourhood peers plant
617    /// 7 fabricated candidates and still pass.
618    ///
619    /// This is the pure "fabricated key" defence; it does not stop an
620    /// attacker who can grind the pool midpoint address to land near 13
621    /// pre-chosen keys AND run those keys as Sybil DHT participants. That
622    /// requires an orthogonal Sybil-resistance layer and is out of scope
623    /// for this check.
624    const CANDIDATE_CLOSENESS_REQUIRED: usize = 13;
625
626    /// Timeout for the authoritative network lookup used by the closeness
627    /// check.
628    ///
629    /// Iterative Kademlia lookups can cascade through `MAX_ITERATIONS = 20`
630    /// rounds in saorsa-core's `find_closest_nodes_network`, and a single
631    /// unresponsive peer's dial can take 20–30s before timing out. On a
632    /// young network (e.g. fresh testnet, NAT-simulated peers in 30% of
633    /// the swarm) iterations average ~10s each — captured trace from
634    /// STG-01 EWR-3 ant-node-1 just before a pre-fix timeout:
635    ///
636    /// ```text
637    /// Iter 0: +0.0s | Iter 1: +0.2s | Iter 2: +6.6s | Iter 3: +13.1s
638    /// Iter 4: +20.9s | Iter 5: +39.8s | Iter 6: +50.8s | [60s wall]
639    /// ```
640    ///
641    /// 60s caps the lookup at ~7 iterations and rejects honest pools whose
642    /// candidates only emerge after iteration 7. 240s gives ~1.2× headroom
643    /// over the ~200s natural worst-case runtime on a 1k-node testnet.
644    ///
645    /// `DoS` amplification stays bounded at roughly one in-flight lookup
646    /// per unique `pool_hash` under typical load, via
647    /// [`closeness_pass_cache`] + [`inflight_closeness`]. The bound is
648    /// "typical" because `inflight_closeness` is an LRU and a sustained
649    /// flood of unique `pool_hash` entries can evict an in-flight slot,
650    /// at which point a second leader can race for the same pool (see
651    /// [`InflightGuard::drop`]). At steady state the pool cache and pool
652    /// signature verification gate keep this rare in practice.
653    const CLOSENESS_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(240);
654
    /// Width of the storer's authoritative network lookup, in peers.
    ///
    /// The client over-queries `2 * CANDIDATES_PER_POOL = 32` peers via
    /// `find_closest_peers(addr, 32)` (see
    /// `ant-client/ant-core/src/data/client/merkle.rs::get_merkle_candidate_pool`)
    /// and selects 16 valid responders by XOR distance — so truly-close
    /// peers that are slow, NAT'd, or briefly unreachable get filtered
    /// out and replaced by peers from positions 17–32 of the network's
    /// actual ranking. The storer must therefore verify against the same
    /// wider window: a pool containing peers from positions 17–32 is
    /// honest (those peers really exist in the network's closest-32 set),
    /// it's just that the client's quote-collection step couldn't reach
    /// the peers at positions <17 in time.
    ///
    /// Empirical effect on STG-01 (1k-node testnet, 30% NAT-simulated):
    /// widening from K=16 to K=32 dropped client-side closeness
    /// mismatches from ~115 to ~31 per 5 min, a 73% reduction.
    ///
    /// Performance note: `count` does not just truncate the lookup —
    /// `find_closest_nodes_network` keeps iterating until either
    /// `MAX_ITERATIONS` is reached or `best_nodes.len() >= count`. K=32
    /// can therefore extend lookups by a few iterations on sparse
    /// networks vs K=16, which reinforces (rather than undermines) the
    /// timeout bump above.
    ///
    /// Security: the pay-yourself attack still requires the attacker's
    /// fabricated `PeerId`s to land in the storer's authoritative top-K.
    /// K=32 doubles the window vs K=16 (≈1 extra bit of grinding), but
    /// the dominant cost is still Sybil-grinding midpoint addresses or
    /// running real nodes near the target — same security floor.
    /// `CANDIDATE_CLOSENESS_REQUIRED` (13/16) is unchanged.
    ///
    /// NOTE: this is a floor, not a fixed width — `closeness_lookup_count`
    /// widens the lookup further if a future protocol bump ever grows the
    /// candidate pool beyond this value.
    const CLOSENESS_LOOKUP_WIDTH: usize = 2 * evmlib::merkle_payments::CANDIDATES_PER_POOL;
687
    /// Maximum waiter → leader retries when the leader's future was cancelled
    /// or panicked before publishing a result. Beyond this the waiter returns
    /// a visible error rather than spinning indefinitely through a
    /// cancellation cascade.
    ///
    /// The retry loop runs `MAX_LEADER_RETRIES + 1` attempts in total (the
    /// initial attempt plus the retries), since it iterates
    /// `0..=MAX_LEADER_RETRIES` inclusive.
    ///
    /// Worst-case waiter wall-clock is `(MAX_LEADER_RETRIES + 1) *
    /// CLOSENESS_LOOKUP_TIMEOUT` (one wait per attempt). Kept low (1)
    /// because the only realistic trigger is leader future-cancellation,
    /// which should be extraordinarily rare; under sustained adversarial
    /// cancellation a higher cap doesn't add resilience, it just hides
    /// the symptom. With `CLOSENESS_LOOKUP_TIMEOUT = 240s` this caps a
    /// single user-visible verification at ~8 min worst case (vs ~20 min
    /// at the previous value of 4).
    const MAX_LEADER_RETRIES: usize = 1;
702
703    /// Compute the storer's authoritative-lookup width for a candidate pool.
704    ///
705    /// Returns `max(CLOSENESS_LOOKUP_WIDTH, pool_len)`: matches the client's
706    /// over-query width today, and scales with the pool if a future protocol
707    /// bump grows pool size beyond `CLOSENESS_LOOKUP_WIDTH`. Truncating to
708    /// `CLOSENESS_LOOKUP_WIDTH` in that future case would re-open the
709    /// K-too-small failure mode (the storer would reject honest pools whose
710    /// candidates legitimately span a wider XOR range than the storer
711    /// fetched). Pinned by `closeness_lookup_count_uses_max_of_width_and_pool_len`.
712    const fn closeness_lookup_count(pool_len: usize) -> usize {
713        if Self::CLOSENESS_LOOKUP_WIDTH > pool_len {
714            Self::CLOSENESS_LOOKUP_WIDTH
715        } else {
716            pool_len
717        }
718    }
719
720    /// Verify that the candidate pool's `pub_keys` correspond to peers that
721    /// are actually XOR-closest to the pool midpoint address, by querying
722    /// the DHT for its closest peers to that address and requiring that a
723    /// majority of the candidates match.
724    ///
725    /// **What this blocks**: the "pay yourself" attack. Candidate signatures
726    /// only cover `(price, reward_address, timestamp)` and the `pub_key` bytes —
727    /// nothing ties a candidate to a network-registered identity or to the
728    /// pool neighbourhood. Without this check an attacker can generate 16
729    /// ML-DSA keypairs locally, point all 16 `reward_address` fields at a
730    /// single attacker-controlled wallet, submit the merkle payment, and drain
731    /// their own payment back out.
732    ///
733    /// **How it blocks**: each candidate's `PeerId = BLAKE3(pub_key)`; the DHT
734    /// is the authoritative source of "which peers exist at this XOR
735    /// coordinate". If the attacker's 16 fabricated `PeerId`s are not among
736    /// the peers the network actually lists as closest to the pool address,
737    /// the pool is forged.
738    ///
739    /// **Scope**: a `MerklePaymentProof` carries exactly one `winner_pool`
740    /// (the pool the smart contract selected for the batch). Every storing
741    /// node that receives the proof independently re-runs this check against
742    /// that same pool, so a forged pool is rejected at every node it
743    /// reaches.
744    ///
745    /// **Known limitation — Sybil-grinding**: `midpoint_proof.address()` is a
746    /// BLAKE3 hash of attacker-controllable inputs (leaf bytes, tree root,
747    /// timestamp). A determined attacker who *also* runs Sybil DHT nodes can
748    /// grind the midpoint until it lands in a region where 13 of their
749    /// Sybil keys are the true network-closest — at which point this check
750    /// passes for the attacker. Closing that gap requires binding the
751    /// midpoint to an attacker-uncontrolled value (e.g. a block hash at
752    /// payment time or an on-chain VRF) or a Sybil-resistant identity
753    /// layer. This defence raises the attack cost from "free" to "run N
754    /// Sybil nodes AND grind", which is a meaningful but not complete
755    /// improvement.
756    async fn verify_merkle_candidate_closeness(
757        &self,
758        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
759        pool_hash: PoolHash,
760    ) -> Result<()> {
761        // Fast path: this node already verified this pool successfully.
762        // A batch of 256 chunks shares one winner_pool, so without this cache
763        // we'd pay a Kademlia lookup per chunk.
764        if self.closeness_pass_cache.lock().get(&pool_hash).is_some() {
765            return Ok(());
766        }
767
768        // Single-flight: on each attempt, either claim leadership by
769        // inserting a fresh `ClosenessSlot`, or wait on an existing leader
770        // and read its published result. The leader holds an `Arc` to the
771        // slot independent of the LruCache so waiters are still woken if
772        // eviction pressure kicked the cache entry.
773        //
774        // The `notified_owned()` future snapshots the `notify_waiters`
775        // counter at the moment of construction (while we hold the lock),
776        // which makes the subsequent `.await` race-free: if the leader
777        // calls `notify_waiters` between our construction and our poll, the
778        // counter has advanced and the future resolves immediately on first
779        // poll.
780        //
781        // Bounded retry: if we're a waiter and the leader gets cancelled or
782        // panics (slot.result.get() == None after wake-up), we loop back to
783        // claim leadership. `MAX_LEADER_RETRIES` bounds the attempts so
784        // adversarial cancellation cascades cannot spin this indefinitely.
785        for attempt in 0..=Self::MAX_LEADER_RETRIES {
786            // Release the mutex guard explicitly before any await below.
787            // Clippy wants `if let ... else` written as `map_or_else`, but
788            // any such rewrite re-borrows the locked `inflight` inside the
789            // closure and fails the borrow checker — so the lint is
790            // silenced here.
791            #[allow(clippy::option_if_let_else)]
792            let (waiter_slot, leader_slot) = {
793                let mut inflight = self.inflight_closeness.lock();
794                let chosen = if let Some(existing) = inflight.get(&pool_hash) {
795                    (Some(Arc::clone(existing)), None)
796                } else {
797                    let slot = Arc::new(ClosenessSlot::new());
798                    inflight.put(pool_hash, Arc::clone(&slot));
799                    (None, Some(slot))
800                };
801                drop(inflight);
802                chosen
803            };
804
805            if let Some(slot) = waiter_slot {
806                // Build the owned-notified future BEFORE awaiting, so it
807                // snapshots the `notify_waiters` counter now. The slot
808                // already existed when we locked, so the leader is either
809                // running or finished; in both cases the snapshot + counter
810                // check ensures we wake up correctly.
811                let notified = slot.notified_owned();
812                notified.await;
813
814                // Leader published a result — use it directly.
815                if let Some(result) = slot.result.get() {
816                    return result.clone().map_err(Error::Payment);
817                }
818                // Leader disappeared without publishing (panic or
819                // cancellation). Slot was cleared by the leader's drop
820                // guard; loop to become the new leader — unless we've
821                // hit the retry bound (see MAX_LEADER_RETRIES).
822                if attempt == Self::MAX_LEADER_RETRIES {
823                    return Err(Error::Payment(
824                        "Merkle candidate pool rejected: closeness leader \
825                         repeatedly failed to publish a result (likely \
826                         repeated cancellation or panic)."
827                            .into(),
828                    ));
829                }
830                continue;
831            }
832
833            // Leader path. Drop guard clears the slot and wakes waiters on
834            // every exit (success, failure, panic, cancellation).
835            let Some(slot) = leader_slot else {
836                // Unreachable by construction.
837                return Err(Error::Payment(
838                    "internal error: neither leader nor waiter in closeness check".into(),
839                ));
840            };
841            let guard = InflightGuard {
842                slot_cache: &self.inflight_closeness,
843                pool_hash,
844                slot,
845            };
846
847            let result = self.verify_merkle_candidate_closeness_inner(pool).await;
848            guard.publish(&result);
849            if result.is_ok() {
850                self.closeness_pass_cache.lock().put(pool_hash, ());
851            }
852            return result;
853        }
854        // Unreachable: the for-loop body always either `return`s or `continue`s,
855        // and the waiter branch's `continue` only runs when `attempt <
856        // Self::MAX_LEADER_RETRIES`. The last iteration's waiter branch returns
857        // via the retry-bound check; the leader branch always returns.
858        Err(Error::Payment(
859            "internal error: closeness retry loop exited without returning".into(),
860        ))
861    }
862
863    /// Inner closeness check: the actual DHT lookup + set-membership test.
864    /// Wrapped by [`verify_merkle_candidate_closeness`] with a pass-cache and
865    /// single-flight guard so a batch of chunks and a storm of forged PUTs
866    /// don't multiply the lookup cost.
867    /// Derive each candidate's `PeerId` from its `pub_key` and reject the
868    /// pool if any `PeerId` appears more than once.
869    ///
870    /// This is a pure-validation pre-check, runnable without a `P2PNode`:
871    /// catches the case where one real peer's `pub_key` is repeated to
872    /// inflate the closeness match count, without paying for a Kademlia
873    /// lookup. An honest pool has [`evmlib::merkle_payments::CANDIDATES_PER_POOL`]
874    /// distinct candidate `pub_keys` by construction.
875    fn derive_distinct_candidate_peer_ids(
876        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
877    ) -> Result<Vec<PeerId>> {
878        let mut candidate_peer_ids = Vec::with_capacity(pool.candidate_nodes.len());
879        let mut seen = std::collections::HashSet::with_capacity(pool.candidate_nodes.len());
880        for candidate in &pool.candidate_nodes {
881            let pid = peer_id_from_public_key_bytes(&candidate.pub_key).map_err(|e| {
882                Error::Payment(format!(
883                    "Invalid ML-DSA public key in merkle candidate: {e}"
884                ))
885            })?;
886            if !seen.insert(pid) {
887                return Err(Error::Payment(
888                    "Merkle candidate pool rejected: duplicate candidate PeerId. An \
889                     honest pool has 16 distinct candidate pub_keys; duplicates would \
890                     let a single real peer satisfy the closeness threshold by being \
891                     counted multiple times."
892                        .into(),
893                ));
894            }
895            candidate_peer_ids.push(pid);
896        }
897        Ok(candidate_peer_ids)
898    }
899
900    /// Pure-logic closeness check: given the pool's candidate peer IDs and
901    /// the storer's authoritative network view (top-K closest peers to the
902    /// pool midpoint), decide whether the pool passes the
903    /// `CANDIDATE_CLOSENESS_REQUIRED`-of-N threshold.
904    ///
905    /// Extracted from `verify_merkle_candidate_closeness_inner` so tests
906    /// can exercise the matching logic without standing up a real DHT.
907    /// Mirrors the runtime path exactly: same sparse-network short-circuit,
908    /// same set-membership check, same error strings.
909    fn check_closeness_match(
910        candidate_peer_ids: &[PeerId],
911        network_peer_ids: &[PeerId],
912        pool_address: &[u8; 32],
913    ) -> Result<()> {
914        // Sparse-network short-circuit: if the DHT itself returned fewer
915        // peers than the closeness threshold, the proof can never pass —
916        // not because the candidates are forged, but because we don't
917        // have an authoritative view to compare against. Surface this
918        // distinct cause so operators can tell "retry once the network
919        // settles" apart from "this peer sent a forged pool".
920        if network_peer_ids.len() < Self::CANDIDATE_CLOSENESS_REQUIRED {
921            debug!(
922                "Merkle closeness deferred: network lookup returned {} peers \
923                 for pool midpoint {} (need at least {} to verify)",
924                network_peer_ids.len(),
925                hex::encode(pool_address),
926                Self::CANDIDATE_CLOSENESS_REQUIRED,
927            );
928            return Err(Error::Payment(format!(
929                "Merkle candidate pool rejected: authoritative DHT lookup returned \
930                 only {} peers, less than the {} required to verify candidate \
931                 closeness. Retry once the routing table populates further.",
932                network_peer_ids.len(),
933                Self::CANDIDATE_CLOSENESS_REQUIRED,
934            )));
935        }
936
937        // Set-membership check against the returned closest-peers list.
938        // Candidate `PeerId`s are deduplicated upstream, so each match
939        // corresponds to a distinct peer.
940        let network_set: std::collections::HashSet<PeerId> =
941            network_peer_ids.iter().copied().collect();
942        let matched = candidate_peer_ids
943            .iter()
944            .filter(|pid| network_set.contains(pid))
945            .count();
946
947        if matched < Self::CANDIDATE_CLOSENESS_REQUIRED {
948            debug!(
949                "Merkle closeness rejected: {matched}/{} candidates match the DHT's closest peers \
950                 for pool midpoint {} (required: {}, network returned {} peers)",
951                candidate_peer_ids.len(),
952                hex::encode(pool_address),
953                Self::CANDIDATE_CLOSENESS_REQUIRED,
954                network_peer_ids.len(),
955            );
956            return Err(Error::Payment(
957                "Merkle candidate pool rejected: candidate pub_keys do not match the \
958                 network's closest peers to the pool midpoint address. Pools must be \
959                 collected from the pool-address close group, not fabricated off-network."
960                    .into(),
961            ));
962        }
963
964        debug!(
965            "Merkle closeness passed: {matched}/{} candidates matched the DHT's closest peers \
966             for pool midpoint {}",
967            candidate_peer_ids.len(),
968            hex::encode(pool_address),
969        );
970        Ok(())
971    }
972
    /// Inner closeness check: the actual DHT lookup + set-membership test.
    ///
    /// Wrapped by [`verify_merkle_candidate_closeness`] with a pass-cache and
    /// single-flight guard so a batch of chunks and a storm of forged PUTs
    /// don't multiply the lookup cost.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Payment`] when the pool has duplicate candidate
    /// `PeerId`s, when no `P2PNode` is attached (non-test builds fail
    /// closed), when the authoritative network lookup fails or times out, or
    /// when too few candidates match the network's closest-peer set.
    #[allow(clippy::too_many_lines)]
    async fn verify_merkle_candidate_closeness_inner(
        &self,
        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
    ) -> Result<()> {
        // Pre-check: catch malformed/hostile pools (duplicate candidate
        // PeerIds) before paying for the Kademlia lookup. Runs in unit
        // tests without a P2PNode too.
        let candidate_peer_ids = Self::derive_distinct_candidate_peer_ids(pool)?;

        // Release the RwLock guard before any await to avoid holding it
        // across an iterative Kademlia lookup.
        let attached = self.p2p_node.read().as_ref().map(Arc::clone);
        let Some(p2p_node) = attached else {
            // Production must call attach_p2p_node at startup. Fail CLOSED
            // to avoid silently disabling the defence if a startup path
            // regresses and loses the attach call. Unit-test builds that
            // construct a PaymentVerifier directly without exercising merkle
            // verification are opted-in via `test-utils` to fall back to
            // fail-open.
            #[cfg(any(test, feature = "test-utils"))]
            {
                crate::logging::warn!(
                    "PaymentVerifier: no P2PNode attached; merkle pay-yourself \
                     defence SKIPPED (test build). Production startup MUST call \
                     PaymentVerifier::attach_p2p_node."
                );
                return Ok(());
            }
            #[cfg(not(any(test, feature = "test-utils")))]
            {
                crate::logging::error!(
                    "PaymentVerifier: no P2PNode attached; rejecting merkle \
                     payment. This is a node-startup bug — \
                     PaymentVerifier::attach_p2p_node must be called before \
                     any PUT handler runs."
                );
                return Err(Error::Payment(
                    "Merkle candidate pool rejected: verifier is not wired to \
                     the P2P layer; cannot verify candidate closeness."
                        .into(),
                ));
            }
        };

        let pool_address = pool.midpoint_proof.address();
        // Match the client's over-query width. The client's
        // `get_merkle_candidate_pool` queries 2 × `CANDIDATES_PER_POOL` peers
        // and picks the 16 closest *valid responders* — so legitimate pools
        // routinely include peers from positions 17–32 of the network's true
        // ranking when the closer peers are slow or NAT-stuck. The storer
        // must look at the same window or it will reject honest pools with
        // no security benefit.
        //
        // `pool.candidate_nodes` is currently a fixed-size array of length
        // `CANDIDATES_PER_POOL` (= 16), so `closeness_lookup_count` always
        // evaluates to `CLOSENESS_LOOKUP_WIDTH` today. The compile-time
        // `const _: () = assert!(WIDTH >= CANDIDATES_PER_POOL)` in the test
        // module pins that invariant. Taking the max is belt-and-braces
        // for a hypothetical future protocol that grows pool size to a
        // `Vec`-typed candidate set: the storer would scale its lookup with
        // the pool rather than truncating, which would otherwise re-open the
        // K-too-small failure mode.
        let lookup_count = Self::closeness_lookup_count(pool.candidate_nodes.len());
        let network_lookup = p2p_node
            .dht_manager()
            .find_closest_nodes_network(&pool_address.0, lookup_count);
        // Bound the iterative lookup; a hung DHT query must not pin the PUT
        // handler indefinitely.
        let network_peers =
            match tokio::time::timeout(Self::CLOSENESS_LOOKUP_TIMEOUT, network_lookup).await {
                Ok(Ok(peers)) => peers,
                Ok(Err(e)) => {
                    debug!(
                        "Merkle closeness network-lookup failed for pool midpoint {}: {e}",
                        hex::encode(pool_address.0),
                    );
                    return Err(Error::Payment(
                        "Merkle candidate pool rejected: could not verify candidate \
                     closeness against the authoritative network view."
                            .into(),
                    ));
                }
                Err(_) => {
                    debug!(
                        "Merkle closeness network-lookup timeout ({:?}) for pool midpoint {}",
                        Self::CLOSENESS_LOOKUP_TIMEOUT,
                        hex::encode(pool_address.0),
                    );
                    return Err(Error::Payment(
                        "Merkle candidate pool rejected: authoritative network lookup \
                     timed out. Retry once the network lookup completes."
                            .into(),
                    ));
                }
            };

        let network_peer_ids: Vec<PeerId> = network_peers.iter().map(|n| n.peer_id).collect();
        Self::check_closeness_match(&candidate_peer_ids, &network_peer_ids, &pool_address.0)
    }
1071
    /// Verify a merkle batch payment proof.
    ///
    /// This verification flow:
    /// 1. Deserialize the `MerklePaymentProof` and check it targets `xorname`
    /// 2. Verify every candidate's ML-DSA-65 signature (cheap, local — runs
    ///    before any RPC so garbage proofs cannot trigger on-chain lookups)
    /// 3. Run the pay-yourself closeness defence against the live DHT
    /// 4. Check pool cache for previously verified pool hash
    /// 5. If not cached, query on-chain for payment info and cache it for
    ///    subsequent chunk verifications in the same batch
    /// 6. Validate the proof against on-chain data: candidate timestamps,
    ///    cryptographic merkle proofs, paid-node count, and per-node
    ///    addresses/amounts from the contract's median16 formula
    #[allow(clippy::too_many_lines)]
    async fn verify_merkle_payment(&self, xorname: &XorName, proof_bytes: &[u8]) -> Result<()> {
        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
            debug!("Verifying merkle payment for {}", hex::encode(xorname));
        }

        // Deserialize the merkle proof
        let merkle_proof = deserialize_merkle_proof(proof_bytes)
            .map_err(|e| Error::Payment(format!("Failed to deserialize merkle proof: {e}")))?;

        // Verify the address in the proof matches the xorname being stored
        if merkle_proof.address.0 != *xorname {
            let proof_hex = hex::encode(merkle_proof.address.0);
            let store_hex = hex::encode(xorname);
            return Err(Error::Payment(format!(
                "Merkle proof address mismatch: proof is for {proof_hex}, but storing {store_hex}"
            )));
        }

        let pool_hash = merkle_proof.winner_pool_hash();

        // Run cheap local checks BEFORE expensive on-chain queries.
        // This prevents DoS via garbage proofs that trigger RPC lookups.
        for candidate in &merkle_proof.winner_pool.candidate_nodes {
            if !crate::payment::verify_merkle_candidate_signature(candidate) {
                return Err(Error::Payment(format!(
                    "Invalid ML-DSA-65 signature on merkle candidate node (reward: {})",
                    candidate.reward_address
                )));
            }
        }

        // Pay-yourself defence: the candidate pub_keys must map to peers the
        // live DHT actually considers closest to the pool midpoint. Without
        // this, an attacker can point all 16 reward_address fields at a
        // self-owned wallet and drain their own payment. Every storing node
        // runs this check against the single `winner_pool` in the proof, so a
        // forged pool is rejected everywhere it lands. The pass cache and
        // single-flight keyed on pool_hash collapse the Kademlia lookup cost
        // within a batch and across concurrent PUTs for the same pool.
        self.verify_merkle_candidate_closeness(&merkle_proof.winner_pool, pool_hash)
            .await?;

        // Check pool cache first
        let cached_info = {
            let mut pool_cache = self.pool_cache.lock();
            pool_cache.get(&pool_hash).cloned()
        };

        let payment_info = if let Some(info) = cached_info {
            debug!("Pool cache hit for hash {}", hex::encode(pool_hash));
            info
        } else {
            // Query on-chain for completed merkle payment
            let info =
                payment_vault::get_completed_merkle_payment(&self.config.evm.network, pool_hash)
                    .await
                    .map_err(|e| {
                        let pool_hex = hex::encode(pool_hash);
                        Error::Payment(format!(
                            "Failed to query merkle payment info for pool {pool_hex}: {e}"
                        ))
                    })?;

            // Re-shape the contract's response into evmlib's native struct.
            let paid_node_addresses: Vec<_> = info
                .paidNodeAddresses
                .iter()
                .map(|pna| (pna.rewardsAddress, usize::from(pna.poolIndex), pna.amount))
                .collect();

            let on_chain_info = OnChainPaymentInfo {
                depth: info.depth,
                merkle_payment_timestamp: info.merklePaymentTimestamp,
                paid_node_addresses,
            };

            // Cache the pool info for subsequent chunks in the same batch
            {
                let mut pool_cache = self.pool_cache.lock();
                pool_cache.put(pool_hash, on_chain_info.clone());
            }

            debug!(
                "Queried on-chain merkle payment info for pool {}: depth={}, timestamp={}, paid_nodes={}",
                hex::encode(pool_hash),
                on_chain_info.depth,
                on_chain_info.merkle_payment_timestamp,
                on_chain_info.paid_node_addresses.len()
            );

            on_chain_info
        };

        // Verify timestamp consistency (signatures already checked above before RPC).
        for candidate in &merkle_proof.winner_pool.candidate_nodes {
            if candidate.merkle_payment_timestamp != payment_info.merkle_payment_timestamp {
                return Err(Error::Payment(format!(
                    "Candidate timestamp mismatch: expected {}, got {} (reward: {})",
                    payment_info.merkle_payment_timestamp,
                    candidate.merkle_payment_timestamp,
                    candidate.reward_address
                )));
            }
        }

        // Get the root from the winner pool's midpoint proof
        let smart_contract_root = merkle_proof.winner_pool.midpoint_proof.root();

        // Verify the cryptographic merkle proofs (address belongs to tree,
        // midpoint belongs to tree, roots match, timestamps valid).
        evmlib::merkle_payments::verify_merkle_proof(
            &merkle_proof.address,
            &merkle_proof.data_proof,
            &merkle_proof.winner_pool.midpoint_proof,
            payment_info.depth,
            smart_contract_root,
            payment_info.merkle_payment_timestamp,
        )
        .map_err(|e| {
            let xorname_hex = hex::encode(xorname);
            Error::Payment(format!(
                "Merkle proof verification failed for {xorname_hex}: {e}"
            ))
        })?;

        // Verify paid node count matches depth
        let expected_depth = payment_info.depth as usize;
        let actual_paid = payment_info.paid_node_addresses.len();
        if actual_paid != expected_depth {
            return Err(Error::Payment(format!(
                "Wrong number of paid nodes: expected {expected_depth}, got {actual_paid}"
            )));
        }

        // Compute expected per-node payment using the contract formula:
        // totalAmount = median16(candidate_prices) * (1 << depth)
        // amountPerNode = totalAmount / depth
        let expected_per_node = if payment_info.depth > 0 {
            let mut candidate_prices: Vec<Amount> = merkle_proof
                .winner_pool
                .candidate_nodes
                .iter()
                .map(|c| c.price)
                .collect();
            candidate_prices.sort_unstable(); // ascending
            // Upper median (index 8 of 16) — matches Solidity's median16 (k = 8)
            let median_price = *candidate_prices
                .get(candidate_prices.len() / 2)
                .ok_or_else(|| Error::Payment("empty candidate pool in merkle proof".into()))?;
            let shift = u32::from(payment_info.depth);
            // checked_shl guards against an absurd depth overflowing the
            // 2^depth multiplier.
            let multiplier = 1u64
                .checked_shl(shift)
                .ok_or_else(|| Error::Payment("merkle proof depth too large".into()))?;
            let total_amount = median_price * Amount::from(multiplier);
            total_amount / Amount::from(u64::from(payment_info.depth))
        } else {
            Amount::ZERO
        };

        // Verify paid node indices, addresses, and amounts against the candidate pool.
        //
        // Each paid node must:
        // 1. Have a valid index within the candidate pool
        // 2. Match the expected reward address at that index
        // 3. Have been paid at least the expected per-node amount from the
        //    contract formula: median16(prices) * 2^depth / depth
        //
        // Note: unlike single-node payments, merkle proofs are NOT bound to a
        // specific storing node. The contract pays `depth` random nodes from the
        // winner pool; the storing node is whichever close-group peer the client
        // routes the chunk to. There is no local-recipient check here because
        // any node that can verify the merkle proof is allowed to store the chunk.
        // Replay protection comes from the per-address proof binding (each proof
        // is for a specific XorName in the paid tree).
        for (addr, idx, paid_amount) in &payment_info.paid_node_addresses {
            let node = merkle_proof
                .winner_pool
                .candidate_nodes
                .get(*idx)
                .ok_or_else(|| {
                    Error::Payment(format!(
                        "Paid node index {idx} out of bounds for pool size {}",
                        merkle_proof.winner_pool.candidate_nodes.len()
                    ))
                })?;
            if node.reward_address != *addr {
                return Err(Error::Payment(format!(
                    "Paid node address mismatch at index {idx}: expected {addr}, got {}",
                    node.reward_address
                )));
            }
            if *paid_amount < expected_per_node {
                return Err(Error::Payment(format!(
                    "Underpayment for node at index {idx}: paid {paid_amount}, \
                     expected at least {expected_per_node} \
                     (median16 formula, depth={})",
                    payment_info.depth
                )));
            }
        }

        if crate::logging::enabled!(crate::logging::Level::INFO) {
            info!(
                "Merkle payment verified for {} (pool: {})",
                hex::encode(xorname),
                hex::encode(pool_hash)
            );
        }

        Ok(())
    }
1291
1292    /// Verify this node is among the paid recipients.
1293    fn validate_local_recipient(&self, payment: &ProofOfPayment) -> Result<()> {
1294        let local_addr = &self.config.local_rewards_address;
1295        let is_recipient = payment
1296            .peer_quotes
1297            .iter()
1298            .any(|(_, quote)| quote.rewards_address == *local_addr);
1299        if !is_recipient {
1300            return Err(Error::Payment(
1301                "Payment proof does not include this node as a recipient".to_string(),
1302            ));
1303        }
1304        Ok(())
1305    }
1306}
1307
1308#[cfg(test)]
1309#[allow(clippy::expect_used, clippy::panic)]
1310mod tests {
1311    use super::*;
1312    use evmlib::merkle_payments::MerklePaymentCandidatePool;
1313
1314    /// Create a verifier for unit tests. EVM is always on, but tests can
1315    /// pre-populate the cache to bypass on-chain verification.
1316    fn create_test_verifier() -> PaymentVerifier {
1317        let config = PaymentVerifierConfig {
1318            evm: EvmVerifierConfig::default(),
1319            cache_capacity: 100,
1320            local_rewards_address: RewardsAddress::new([1u8; 20]),
1321        };
1322        PaymentVerifier::new(config)
1323    }
1324
1325    #[test]
1326    fn test_payment_required_for_new_data() {
1327        let verifier = create_test_verifier();
1328        let xorname = [1u8; 32];
1329
1330        // All uncached data requires payment
1331        let status = verifier.check_payment_required(&xorname);
1332        assert_eq!(status, PaymentStatus::PaymentRequired);
1333    }
1334
1335    #[test]
1336    fn test_cache_hit() {
1337        let verifier = create_test_verifier();
1338        let xorname = [1u8; 32];
1339
1340        // Manually add to cache
1341        verifier.cache.insert(xorname);
1342
1343        // Should return CachedAsVerified
1344        let status = verifier.check_payment_required(&xorname);
1345        assert_eq!(status, PaymentStatus::CachedAsVerified);
1346    }
1347
1348    #[tokio::test]
1349    async fn test_verify_payment_without_proof_rejected() {
1350        let verifier = create_test_verifier();
1351        let xorname = [1u8; 32];
1352
1353        // No proof provided => should return an error (EVM is always on)
1354        let result = verifier.verify_payment(&xorname, None).await;
1355        assert!(
1356            result.is_err(),
1357            "Expected Err without proof, got: {result:?}"
1358        );
1359    }
1360
1361    #[tokio::test]
1362    async fn test_verify_payment_cached() {
1363        let verifier = create_test_verifier();
1364        let xorname = [1u8; 32];
1365
1366        // Add to cache — simulates previously-paid data
1367        verifier.cache.insert(xorname);
1368
1369        // Should succeed without payment (cached)
1370        let result = verifier.verify_payment(&xorname, None).await;
1371        assert!(result.is_ok());
1372        assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified);
1373    }
1374
1375    #[test]
1376    fn test_payment_status_can_store() {
1377        assert!(PaymentStatus::CachedAsVerified.can_store());
1378        assert!(PaymentStatus::PaymentVerified.can_store());
1379        assert!(!PaymentStatus::PaymentRequired.can_store());
1380    }
1381
1382    #[test]
1383    fn test_payment_status_is_cached() {
1384        assert!(PaymentStatus::CachedAsVerified.is_cached());
1385        assert!(!PaymentStatus::PaymentVerified.is_cached());
1386        assert!(!PaymentStatus::PaymentRequired.is_cached());
1387    }
1388
1389    #[tokio::test]
1390    async fn test_cache_preload_bypasses_evm() {
1391        let verifier = create_test_verifier();
1392        let xorname = [42u8; 32];
1393
1394        // Not yet cached — should require payment
1395        assert_eq!(
1396            verifier.check_payment_required(&xorname),
1397            PaymentStatus::PaymentRequired
1398        );
1399
1400        // Pre-populate cache (simulates a previous successful payment)
1401        verifier.cache.insert(xorname);
1402
1403        // Now the xorname should be cached
1404        assert_eq!(
1405            verifier.check_payment_required(&xorname),
1406            PaymentStatus::CachedAsVerified
1407        );
1408    }
1409
1410    #[tokio::test]
1411    async fn test_proof_too_small() {
1412        let verifier = create_test_verifier();
1413        let xorname = [1u8; 32];
1414
1415        // Proof smaller than MIN_PAYMENT_PROOF_SIZE_BYTES
1416        let small_proof = vec![0u8; MIN_PAYMENT_PROOF_SIZE_BYTES - 1];
1417        let result = verifier.verify_payment(&xorname, Some(&small_proof)).await;
1418        assert!(result.is_err());
1419        let err_msg = format!("{}", result.expect_err("should fail"));
1420        assert!(
1421            err_msg.contains("too small"),
1422            "Error should mention 'too small': {err_msg}"
1423        );
1424    }
1425
1426    #[tokio::test]
1427    async fn test_proof_too_large() {
1428        let verifier = create_test_verifier();
1429        let xorname = [2u8; 32];
1430
1431        // Proof larger than MAX_PAYMENT_PROOF_SIZE_BYTES
1432        let large_proof = vec![0u8; MAX_PAYMENT_PROOF_SIZE_BYTES + 1];
1433        let result = verifier.verify_payment(&xorname, Some(&large_proof)).await;
1434        assert!(result.is_err());
1435        let err_msg = format!("{}", result.expect_err("should fail"));
1436        assert!(
1437            err_msg.contains("too large"),
1438            "Error should mention 'too large': {err_msg}"
1439        );
1440    }
1441
1442    #[tokio::test]
1443    async fn test_proof_at_min_boundary_unknown_tag() {
1444        let verifier = create_test_verifier();
1445        let xorname = [3u8; 32];
1446
1447        // Exactly MIN_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected
1448        let boundary_proof = vec![0xFFu8; MIN_PAYMENT_PROOF_SIZE_BYTES];
1449        let result = verifier
1450            .verify_payment(&xorname, Some(&boundary_proof))
1451            .await;
1452        assert!(result.is_err());
1453        let err_msg = format!("{}", result.expect_err("should fail"));
1454        assert!(
1455            err_msg.contains("Unknown payment proof type tag"),
1456            "Error should mention unknown tag: {err_msg}"
1457        );
1458    }
1459
1460    #[tokio::test]
1461    async fn test_proof_at_max_boundary_unknown_tag() {
1462        let verifier = create_test_verifier();
1463        let xorname = [4u8; 32];
1464
1465        // Exactly MAX_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected
1466        let boundary_proof = vec![0xFFu8; MAX_PAYMENT_PROOF_SIZE_BYTES];
1467        let result = verifier
1468            .verify_payment(&xorname, Some(&boundary_proof))
1469            .await;
1470        assert!(result.is_err());
1471        let err_msg = format!("{}", result.expect_err("should fail"));
1472        assert!(
1473            err_msg.contains("Unknown payment proof type tag"),
1474            "Error should mention unknown tag: {err_msg}"
1475        );
1476    }
1477
1478    #[tokio::test]
1479    async fn test_malformed_single_node_proof() {
1480        let verifier = create_test_verifier();
1481        let xorname = [5u8; 32];
1482
1483        // Valid tag (0x01) but garbage payload — should fail deserialization
1484        let mut garbage = vec![crate::ant_protocol::PROOF_TAG_SINGLE_NODE];
1485        garbage.extend_from_slice(&[0xAB; 63]);
1486        let result = verifier.verify_payment(&xorname, Some(&garbage)).await;
1487        assert!(result.is_err());
1488        let err_msg = format!("{}", result.expect_err("should fail"));
1489        assert!(
1490            err_msg.contains("deserialize") || err_msg.contains("Failed"),
1491            "Error should mention deserialization failure: {err_msg}"
1492        );
1493    }
1494
1495    #[test]
1496    fn test_cache_len_getter() {
1497        let verifier = create_test_verifier();
1498        assert_eq!(verifier.cache_len(), 0);
1499
1500        verifier.cache.insert([10u8; 32]);
1501        assert_eq!(verifier.cache_len(), 1);
1502
1503        verifier.cache.insert([20u8; 32]);
1504        assert_eq!(verifier.cache_len(), 2);
1505    }
1506
1507    #[test]
1508    fn test_cache_stats_after_operations() {
1509        let verifier = create_test_verifier();
1510        let xorname = [7u8; 32];
1511
1512        // Miss
1513        verifier.check_payment_required(&xorname);
1514        let stats = verifier.cache_stats();
1515        assert_eq!(stats.misses, 1);
1516        assert_eq!(stats.hits, 0);
1517
1518        // Insert and hit
1519        verifier.cache.insert(xorname);
1520        verifier.check_payment_required(&xorname);
1521        let stats = verifier.cache_stats();
1522        assert_eq!(stats.hits, 1);
1523        assert_eq!(stats.misses, 1);
1524        assert_eq!(stats.additions, 1);
1525    }
1526
1527    #[tokio::test]
1528    async fn test_concurrent_cache_lookups() {
1529        let verifier = std::sync::Arc::new(create_test_verifier());
1530
1531        // Pre-populate cache for all 10 xornames
1532        for i in 0..10u8 {
1533            verifier.cache.insert([i; 32]);
1534        }
1535
1536        let mut handles = Vec::new();
1537        for i in 0..10u8 {
1538            let v = verifier.clone();
1539            handles.push(tokio::spawn(async move {
1540                let xorname = [i; 32];
1541                v.verify_payment(&xorname, None).await
1542            }));
1543        }
1544
1545        for handle in handles {
1546            let result = handle.await.expect("task panicked");
1547            assert!(result.is_ok());
1548            assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified);
1549        }
1550
1551        assert_eq!(verifier.cache_len(), 10);
1552    }
1553
1554    #[test]
1555    fn test_default_evm_config() {
1556        let _config = EvmVerifierConfig::default();
1557        // EVM is always on — default network is ArbitrumOne
1558    }
1559
1560    #[test]
1561    fn test_real_ml_dsa_proof_size_within_limits() {
1562        use crate::payment::metrics::QuotingMetricsTracker;
1563        use crate::payment::proof::PaymentProof;
1564        use crate::payment::quote::{QuoteGenerator, XorName};
1565        use alloy::primitives::FixedBytes;
1566        use evmlib::{EncodedPeerId, RewardsAddress};
1567        use saorsa_core::MlDsa65;
1568        use saorsa_pqc::pqc::types::MlDsaSecretKey;
1569        use saorsa_pqc::pqc::MlDsaOperations;
1570
1571        let ml_dsa = MlDsa65::new();
1572        let mut peer_quotes = Vec::new();
1573
1574        for i in 0..5u8 {
1575            let (public_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
1576
1577            let rewards_address = RewardsAddress::new([i; 20]);
1578            let metrics_tracker = QuotingMetricsTracker::new(0);
1579            let mut generator = QuoteGenerator::new(rewards_address, metrics_tracker);
1580
1581            let pub_key_bytes = public_key.as_bytes().to_vec();
1582            let sk_bytes = secret_key.as_bytes().to_vec();
1583            generator.set_signer(pub_key_bytes, move |msg| {
1584                let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("sk parse");
1585                let ml_dsa = MlDsa65::new();
1586                ml_dsa.sign(&sk, msg).expect("sign").as_bytes().to_vec()
1587            });
1588
1589            let content: XorName = [i; 32];
1590            let quote = generator.create_quote(content, 4096, 0).expect("quote");
1591
1592            peer_quotes.push((EncodedPeerId::new(rand::random()), quote));
1593        }
1594
1595        let proof = PaymentProof {
1596            proof_of_payment: ProofOfPayment { peer_quotes },
1597            tx_hashes: vec![FixedBytes::from([0xABu8; 32])],
1598        };
1599
1600        let proof_bytes =
1601            crate::payment::proof::serialize_single_node_proof(&proof).expect("serialize");
1602
1603        // 7 ML-DSA-65 quotes with ~1952-byte pub keys and ~3309-byte signatures
1604        // should produce a proof in the 30-80 KB range
1605        assert!(
1606            proof_bytes.len() > 20_000,
1607            "Real 7-quote ML-DSA proof should be > 20 KB, got {} bytes",
1608            proof_bytes.len()
1609        );
1610        assert!(
1611            proof_bytes.len() < MAX_PAYMENT_PROOF_SIZE_BYTES,
1612            "Real 7-quote ML-DSA proof ({} bytes) should fit within {} byte limit",
1613            proof_bytes.len(),
1614            MAX_PAYMENT_PROOF_SIZE_BYTES
1615        );
1616    }
1617
1618    #[tokio::test]
1619    async fn test_content_address_mismatch_rejected() {
1620        use crate::payment::proof::{serialize_single_node_proof, PaymentProof};
1621        use evmlib::{EncodedPeerId, PaymentQuote, RewardsAddress};
1622        use std::time::SystemTime;
1623
1624        let verifier = create_test_verifier();
1625
1626        // The xorname we're trying to store
1627        let target_xorname = [0xAAu8; 32];
1628
1629        // Create a quote for a DIFFERENT xorname
1630        let wrong_xorname = [0xBBu8; 32];
1631        let quote = PaymentQuote {
1632            content: xor_name::XorName(wrong_xorname),
1633            timestamp: SystemTime::now(),
1634            price: Amount::from(1u64),
1635            rewards_address: RewardsAddress::new([1u8; 20]),
1636            pub_key: vec![0u8; 64],
1637            signature: vec![0u8; 64],
1638        };
1639
1640        // Build CLOSE_GROUP_SIZE quotes with distinct peer IDs
1641        let mut peer_quotes = Vec::new();
1642        for _ in 0..CLOSE_GROUP_SIZE {
1643            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1644        }
1645
1646        let proof = PaymentProof {
1647            proof_of_payment: ProofOfPayment { peer_quotes },
1648            tx_hashes: vec![],
1649        };
1650
1651        let proof_bytes = serialize_single_node_proof(&proof).expect("serialize proof");
1652
1653        let result = verifier
1654            .verify_payment(&target_xorname, Some(&proof_bytes))
1655            .await;
1656
1657        assert!(result.is_err(), "Should reject mismatched content address");
1658        let err_msg = format!("{}", result.expect_err("should be error"));
1659        assert!(
1660            err_msg.contains("content address mismatch"),
1661            "Error should mention 'content address mismatch': {err_msg}"
1662        );
1663    }
1664
1665    /// Helper: create a fake quote with the given xorname and timestamp.
1666    fn make_fake_quote(
1667        xorname: [u8; 32],
1668        timestamp: SystemTime,
1669        rewards_address: RewardsAddress,
1670    ) -> evmlib::PaymentQuote {
1671        use evmlib::PaymentQuote;
1672
1673        PaymentQuote {
1674            content: xor_name::XorName(xorname),
1675            timestamp,
1676            price: Amount::from(1u64),
1677            rewards_address,
1678            pub_key: vec![0u8; 64],
1679            signature: vec![0u8; 64],
1680        }
1681    }
1682
1683    /// Helper: wrap quotes into a tagged serialized `PaymentProof`.
1684    fn serialize_proof(peer_quotes: Vec<(evmlib::EncodedPeerId, evmlib::PaymentQuote)>) -> Vec<u8> {
1685        use crate::payment::proof::{serialize_single_node_proof, PaymentProof};
1686
1687        let proof = PaymentProof {
1688            proof_of_payment: ProofOfPayment { peer_quotes },
1689            tx_hashes: vec![],
1690        };
1691        serialize_single_node_proof(&proof).expect("serialize proof")
1692    }
1693
1694    #[tokio::test]
1695    async fn test_expired_quote_rejected() {
1696        use evmlib::{EncodedPeerId, RewardsAddress};
1697        use std::time::Duration;
1698
1699        let verifier = create_test_verifier();
1700        let xorname = [0xCCu8; 32];
1701        let rewards_addr = RewardsAddress::new([1u8; 20]);
1702
1703        // Create a quote that's 25 hours old (exceeds 24-hour max)
1704        let old_timestamp = SystemTime::now() - Duration::from_secs(25 * 3600);
1705        let quote = make_fake_quote(xorname, old_timestamp, rewards_addr);
1706
1707        let mut peer_quotes = Vec::new();
1708        for _ in 0..CLOSE_GROUP_SIZE {
1709            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1710        }
1711
1712        let proof_bytes = serialize_proof(peer_quotes);
1713        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1714
1715        assert!(result.is_err(), "Should reject expired quote");
1716        let err_msg = format!("{}", result.expect_err("should fail"));
1717        assert!(
1718            err_msg.contains("expired"),
1719            "Error should mention 'expired': {err_msg}"
1720        );
1721    }
1722
1723    #[tokio::test]
1724    async fn test_future_timestamp_rejected() {
1725        use evmlib::{EncodedPeerId, RewardsAddress};
1726        use std::time::Duration;
1727
1728        let verifier = create_test_verifier();
1729        let xorname = [0xDDu8; 32];
1730        let rewards_addr = RewardsAddress::new([1u8; 20]);
1731
1732        // Create a quote with a timestamp 1 hour in the future
1733        let future_timestamp = SystemTime::now() + Duration::from_secs(3600);
1734        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
1735
1736        let mut peer_quotes = Vec::new();
1737        for _ in 0..CLOSE_GROUP_SIZE {
1738            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1739        }
1740
1741        let proof_bytes = serialize_proof(peer_quotes);
1742        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1743
1744        assert!(result.is_err(), "Should reject future-timestamped quote");
1745        let err_msg = format!("{}", result.expect_err("should fail"));
1746        assert!(
1747            err_msg.contains("future"),
1748            "Error should mention 'future': {err_msg}"
1749        );
1750    }
1751
1752    #[tokio::test]
1753    async fn test_quote_within_clock_skew_tolerance_accepted() {
1754        use evmlib::{EncodedPeerId, RewardsAddress};
1755        use std::time::Duration;
1756
1757        let verifier = create_test_verifier();
1758        let xorname = [0xD1u8; 32];
1759        let rewards_addr = RewardsAddress::new([1u8; 20]);
1760
1761        // Quote 30 seconds in the future — well within 300s tolerance
1762        let future_timestamp = SystemTime::now() + Duration::from_secs(30);
1763        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
1764
1765        let mut peer_quotes = Vec::new();
1766        for _ in 0..CLOSE_GROUP_SIZE {
1767            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1768        }
1769
1770        let proof_bytes = serialize_proof(peer_quotes);
1771        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1772
1773        // Should NOT fail at timestamp check (will fail later at pub_key binding)
1774        let err_msg = format!("{}", result.expect_err("should fail at later check"));
1775        assert!(
1776            !err_msg.contains("future"),
1777            "Should pass timestamp check (within tolerance), but got: {err_msg}"
1778        );
1779    }
1780
1781    #[tokio::test]
1782    async fn test_quote_just_beyond_clock_skew_tolerance_rejected() {
1783        use evmlib::{EncodedPeerId, RewardsAddress};
1784        use std::time::Duration;
1785
1786        let verifier = create_test_verifier();
1787        let xorname = [0xD2u8; 32];
1788        let rewards_addr = RewardsAddress::new([1u8; 20]);
1789
1790        // Quote 360 seconds in the future — exceeds 300s tolerance
1791        let future_timestamp = SystemTime::now() + Duration::from_secs(360);
1792        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
1793
1794        let mut peer_quotes = Vec::new();
1795        for _ in 0..CLOSE_GROUP_SIZE {
1796            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1797        }
1798
1799        let proof_bytes = serialize_proof(peer_quotes);
1800        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1801
1802        assert!(
1803            result.is_err(),
1804            "Should reject quote beyond clock skew tolerance"
1805        );
1806        let err_msg = format!("{}", result.expect_err("should fail"));
1807        assert!(
1808            err_msg.contains("future"),
1809            "Error should mention 'future': {err_msg}"
1810        );
1811    }
1812
1813    #[tokio::test]
1814    async fn test_quote_23h_old_still_accepted() {
1815        use evmlib::{EncodedPeerId, RewardsAddress};
1816        use std::time::Duration;
1817
1818        let verifier = create_test_verifier();
1819        let xorname = [0xD3u8; 32];
1820        let rewards_addr = RewardsAddress::new([1u8; 20]);
1821
1822        // Quote 23 hours old — within 24h max age
1823        let old_timestamp = SystemTime::now() - Duration::from_secs(23 * 3600);
1824        let quote = make_fake_quote(xorname, old_timestamp, rewards_addr);
1825
1826        let mut peer_quotes = Vec::new();
1827        for _ in 0..CLOSE_GROUP_SIZE {
1828            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1829        }
1830
1831        let proof_bytes = serialize_proof(peer_quotes);
1832        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1833
1834        // Should NOT fail at timestamp check (will fail later at pub_key binding)
1835        let err_msg = format!("{}", result.expect_err("should fail at later check"));
1836        assert!(
1837            !err_msg.contains("expired"),
1838            "Should pass expiry check (23h < 24h), but got: {err_msg}"
1839        );
1840    }
1841
1842    /// Helper: build an `EncodedPeerId` that matches the BLAKE3 hash of an ML-DSA public key.
1843    fn encoded_peer_id_for_pub_key(pub_key: &[u8]) -> evmlib::EncodedPeerId {
1844        let ant_peer_id = peer_id_from_public_key_bytes(pub_key).expect("valid ML-DSA pub key");
1845        evmlib::EncodedPeerId::new(*ant_peer_id.as_bytes())
1846    }
1847
1848    #[tokio::test]
1849    async fn test_local_not_in_paid_set_rejected() {
1850        use evmlib::RewardsAddress;
1851        use saorsa_core::MlDsa65;
1852        use saorsa_pqc::pqc::MlDsaOperations;
1853
1854        // Verifier with a local rewards address set
1855        let local_addr = RewardsAddress::new([0xAAu8; 20]);
1856        let config = PaymentVerifierConfig {
1857            evm: EvmVerifierConfig {
1858                network: EvmNetwork::ArbitrumOne,
1859            },
1860            cache_capacity: 100,
1861            local_rewards_address: local_addr,
1862        };
1863        let verifier = PaymentVerifier::new(config);
1864
1865        let xorname = [0xEEu8; 32];
1866        // Quotes pay a DIFFERENT rewards address
1867        let other_addr = RewardsAddress::new([0xBBu8; 20]);
1868
1869        // Use real ML-DSA keys so the pub_key→peer_id binding check passes
1870        let ml_dsa = MlDsa65::new();
1871        let mut peer_quotes = Vec::new();
1872        for _ in 0..CLOSE_GROUP_SIZE {
1873            let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen");
1874            let pub_key_bytes = public_key.as_bytes().to_vec();
1875            let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes);
1876
1877            let mut quote = make_fake_quote(xorname, SystemTime::now(), other_addr);
1878            quote.pub_key = pub_key_bytes;
1879
1880            peer_quotes.push((encoded, quote));
1881        }
1882
1883        let proof_bytes = serialize_proof(peer_quotes);
1884        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1885
1886        assert!(result.is_err(), "Should reject payment not addressed to us");
1887        let err_msg = format!("{}", result.expect_err("should fail"));
1888        assert!(
1889            err_msg.contains("does not include this node as a recipient"),
1890            "Error should mention recipient rejection: {err_msg}"
1891        );
1892    }
1893
1894    #[tokio::test]
1895    async fn test_wrong_peer_binding_rejected() {
1896        use evmlib::{EncodedPeerId, RewardsAddress};
1897        use saorsa_core::MlDsa65;
1898        use saorsa_pqc::pqc::MlDsaOperations;
1899
1900        let verifier = create_test_verifier();
1901        let xorname = [0xFFu8; 32];
1902        let rewards_addr = RewardsAddress::new([1u8; 20]);
1903
1904        // Generate a real ML-DSA keypair so pub_key is valid
1905        let ml_dsa = MlDsa65::new();
1906        let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen");
1907        let pub_key_bytes = public_key.as_bytes().to_vec();
1908
1909        // Create a quote with a real pub_key but attach it to a random peer ID
1910        // whose identity multihash does NOT match BLAKE3(pub_key)
1911        let mut quote = make_fake_quote(xorname, SystemTime::now(), rewards_addr);
1912        quote.pub_key = pub_key_bytes;
1913
1914        // Use random ed25519 peer IDs — they won't match BLAKE3(pub_key)
1915        let mut peer_quotes = Vec::new();
1916        for _ in 0..CLOSE_GROUP_SIZE {
1917            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1918        }
1919
1920        let proof_bytes = serialize_proof(peer_quotes);
1921        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1922
1923        assert!(result.is_err(), "Should reject wrong peer binding");
1924        let err_msg = format!("{}", result.expect_err("should fail"));
1925        assert!(
1926            err_msg.contains("pub_key does not belong to claimed peer"),
1927            "Error should mention binding mismatch: {err_msg}"
1928        );
1929    }
1930
1931    // =========================================================================
1932    // Merkle-tagged proof tests
1933    // =========================================================================
1934
1935    #[tokio::test]
1936    async fn test_merkle_tagged_proof_invalid_data_rejected() {
1937        use crate::ant_protocol::PROOF_TAG_MERKLE;
1938
1939        let verifier = create_test_verifier();
1940        let xorname = [0xA1u8; 32];
1941
1942        // Build a merkle-tagged proof with garbage body.
1943        // The tag byte is correct but the body is not valid msgpack.
1944        let mut merkle_garbage = Vec::with_capacity(64);
1945        merkle_garbage.push(PROOF_TAG_MERKLE);
1946        merkle_garbage.extend_from_slice(&[0xAB; 63]);
1947
1948        let result = verifier
1949            .verify_payment(&xorname, Some(&merkle_garbage))
1950            .await;
1951
1952        assert!(
1953            result.is_err(),
1954            "Should reject merkle proof with invalid body"
1955        );
1956        let err_msg = format!("{}", result.expect_err("should fail"));
1957        assert!(
1958            err_msg.contains("deserialize") || err_msg.contains("merkle proof"),
1959            "Error should mention deserialization failure: {err_msg}"
1960        );
1961    }
1962
1963    #[tokio::test]
1964    async fn test_single_node_tagged_proof_deserialization() {
1965        use crate::payment::proof::serialize_single_node_proof;
1966        use evmlib::{EncodedPeerId, RewardsAddress};
1967
1968        let verifier = create_test_verifier();
1969        let xorname = [0xA2u8; 32];
1970        let rewards_addr = RewardsAddress::new([1u8; 20]);
1971
1972        // Build a valid tagged single-node proof
1973        let quote = make_fake_quote(xorname, SystemTime::now(), rewards_addr);
1974        let mut peer_quotes = Vec::new();
1975        for _ in 0..CLOSE_GROUP_SIZE {
1976            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1977        }
1978
1979        let proof = crate::payment::proof::PaymentProof {
1980            proof_of_payment: ProofOfPayment {
1981                peer_quotes: peer_quotes.clone(),
1982            },
1983            tx_hashes: vec![],
1984        };
1985
1986        let tagged_bytes = serialize_single_node_proof(&proof).expect("serialize tagged proof");
1987
1988        // detect_proof_type should identify it as SingleNode
1989        assert_eq!(
1990            crate::payment::proof::detect_proof_type(&tagged_bytes),
1991            Some(crate::payment::proof::ProofType::SingleNode)
1992        );
1993
1994        // verify_payment should process it through the single-node path.
1995        // It will fail at quote validation (fake pub_key), but we verify
1996        // it passes the deserialization stage by checking the error type.
1997        let result = verifier.verify_payment(&xorname, Some(&tagged_bytes)).await;
1998
1999        assert!(result.is_err(), "Should fail at quote validation stage");
2000        let err_msg = format!("{}", result.expect_err("should fail"));
2001        // It should NOT be a deserialization error — it should get further
2002        assert!(
2003            !err_msg.contains("deserialize"),
2004            "Should pass deserialization but fail later: {err_msg}"
2005        );
2006    }
2007
2008    #[test]
2009    fn test_pool_cache_insert_and_lookup() {
2010        use evmlib::merkle_batch_payment::PoolHash;
2011
2012        // Verify the pool_cache field exists and works correctly.
2013        // Insert a pool hash, then verify it's present on lookup.
2014        let verifier = create_test_verifier();
2015
2016        let pool_hash: PoolHash = [0xBBu8; 32];
2017        let payment_info = evmlib::merkle_payments::OnChainPaymentInfo {
2018            depth: 4,
2019            merkle_payment_timestamp: 1_700_000_000,
2020            paid_node_addresses: vec![],
2021        };
2022
2023        // Insert into pool cache
2024        {
2025            let mut cache = verifier.pool_cache.lock();
2026            cache.put(pool_hash, payment_info);
2027        }
2028
2029        // First lookup — should find it
2030        {
2031            let found = verifier.pool_cache.lock().get(&pool_hash).cloned();
2032            assert!(found.is_some(), "Pool hash should be in cache after insert");
2033            let info = found.expect("cached info");
2034            assert_eq!(info.depth, 4);
2035            assert_eq!(info.merkle_payment_timestamp, 1_700_000_000);
2036        }
2037
2038        // Second lookup — same result (no double-query needed)
2039        {
2040            let found = verifier.pool_cache.lock().get(&pool_hash).cloned();
2041            assert!(
2042                found.is_some(),
2043                "Pool hash should still be in cache on second lookup"
2044            );
2045        }
2046
2047        // Different pool hash — should NOT be found
2048        let other_hash: PoolHash = [0xCCu8; 32];
2049        {
2050            let found = verifier.pool_cache.lock().get(&other_hash).cloned();
2051            assert!(found.is_none(), "Unknown pool hash should not be in cache");
2052        }
2053    }
2054
2055    #[tokio::test]
2056    async fn closeness_pass_cache_short_circuits_second_call() {
2057        // When a pool_hash is in the closeness_pass_cache, the outer
2058        // verify_merkle_candidate_closeness must return Ok(()) without
2059        // running the inner lookup — even if no P2PNode is attached.
2060        // That second half (no-p2p → would normally fail-closed in release)
2061        // is the proof the cache short-circuit ran first.
2062        let verifier = create_test_verifier();
2063        let pool_hash = [0xAAu8; 32];
2064        verifier.closeness_pass_cache.lock().put(pool_hash, ());
2065
2066        // Construct a dummy pool — contents don't matter because the cache
2067        // hit means we never look at them.
2068        let pool = MerklePaymentCandidatePool {
2069            midpoint_proof: fake_midpoint_proof(),
2070            candidate_nodes: make_candidate_nodes(1_700_000_000),
2071        };
2072
2073        let result = verifier
2074            .verify_merkle_candidate_closeness(&pool, pool_hash)
2075            .await;
2076        assert!(
2077            result.is_ok(),
2078            "cached pool hash must bypass the inner check and return Ok(()), got: {result:?}"
2079        );
2080    }
2081
2082    #[tokio::test]
2083    async fn closeness_single_flight_concurrent_readers_share_one_verification() {
2084        // Two concurrent callers for the same pool_hash should produce the
2085        // same outcome, and the cache should end up populated exactly once.
2086        // We use the test-utils fail-open path to short-circuit the inner
2087        // DHT lookup; the purpose of this test is the single-flight
2088        // plumbing, not the lookup itself.
2089        let verifier = Arc::new(create_test_verifier());
2090        let pool_hash = [0x77u8; 32];
2091        let pool = MerklePaymentCandidatePool {
2092            midpoint_proof: fake_midpoint_proof(),
2093            candidate_nodes: make_candidate_nodes(1_700_000_000),
2094        };
2095
2096        let v1 = Arc::clone(&verifier);
2097        let p1 = pool.clone();
2098        let v2 = Arc::clone(&verifier);
2099        let p2 = pool.clone();
2100
2101        let (r1, r2) = tokio::join!(
2102            async move { v1.verify_merkle_candidate_closeness(&p1, pool_hash).await },
2103            async move { v2.verify_merkle_candidate_closeness(&p2, pool_hash).await },
2104        );
2105
2106        assert_eq!(r1.is_ok(), r2.is_ok(), "concurrent callers must agree");
2107        assert!(
2108            r1.is_ok(),
2109            "both callers must succeed on the test-utils path"
2110        );
2111        assert!(
2112            verifier
2113                .closeness_pass_cache
2114                .lock()
2115                .get(&pool_hash)
2116                .is_some(),
2117            "success path must populate the pass cache"
2118        );
2119        assert!(
2120            verifier.inflight_closeness.lock().get(&pool_hash).is_none(),
2121            "inflight slot must be cleared after the leader finishes"
2122        );
2123    }
2124
    #[tokio::test]
    async fn closeness_waiter_reads_leaders_published_failure() {
        // Prove the waiter path actually surfaces a failure published by a
        // concurrent leader, without running its own inner check. Insert a
        // slot, spawn a waiter (which will park on notified_owned), then
        // publish failure + notify from the outside — simulating what the
        // leader's `publish` + drop-guard pair does.
        let verifier = Arc::new(create_test_verifier());
        let pool_hash = [0x55u8; 32];
        // Pre-registering the slot makes the spawned task take the waiter
        // path (it finds an existing inflight entry) instead of becoming
        // the leader itself.
        let slot = Arc::new(ClosenessSlot::new());
        verifier
            .inflight_closeness
            .lock()
            .put(pool_hash, Arc::clone(&slot));

        // Pool contents are irrelevant here: the waiter never runs the
        // inner check, it only reads the published result.
        let pool = MerklePaymentCandidatePool {
            midpoint_proof: fake_midpoint_proof(),
            candidate_nodes: make_candidate_nodes(1_700_000_000),
        };

        let verifier_c = Arc::clone(&verifier);
        let pool_c = pool.clone();
        let waiter = tokio::spawn(async move {
            verifier_c
                .verify_merkle_candidate_closeness(&pool_c, pool_hash)
                .await
        });

        // Yield so the waiter can run up to its `notified_owned().await`.
        // A few yields cover both single-threaded and multi-threaded tokio
        // runtimes regardless of scheduling.
        for _ in 0..5 {
            tokio::task::yield_now().await;
        }

        // Simulate the leader's `publish` + drop-guard: publish the result,
        // clear the slot, wake waiters. Order matters — the result is set
        // before notify_waiters so a woken waiter always finds it.
        slot.result
            .set(Err("forged pool: not close enough".to_string()))
            .expect("set once");
        verifier.inflight_closeness.lock().pop(&pool_hash);
        slot.notify.notify_waiters();

        let result = waiter.await.expect("task panicked");
        let err = result.expect_err("waiter must return the leader's published failure");
        assert!(
            err.to_string().contains("forged pool"),
            "waiter must surface the leader's error message, got: {err}"
        );
    }
2175
2176    #[tokio::test]
2177    async fn closeness_rejects_pool_with_duplicate_candidate_pub_keys() {
2178        // An attacker who submits 16 copies of the same real peer's pub_key
2179        // would otherwise satisfy the 13/16 closeness threshold trivially:
2180        // that one peer's membership in the DHT-returned set would count
2181        // 16 times. The dedupe check in verify_merkle_candidate_closeness_inner
2182        // must reject the pool BEFORE the network lookup runs (so this test
2183        // works even with no P2PNode attached).
2184        let verifier = create_test_verifier();
2185        let pool_hash = [0xDDu8; 32];
2186
2187        // Build a normal pool, then overwrite every candidate's pub_key
2188        // with a single shared key so all 16 derive to the same PeerId.
2189        let mut candidates = make_candidate_nodes(1_700_000_000);
2190        let shared_pub_key = candidates
2191            .first()
2192            .expect("make_candidate_nodes returns CANDIDATES_PER_POOL entries")
2193            .pub_key
2194            .clone();
2195        for c in &mut candidates {
2196            c.pub_key = shared_pub_key.clone();
2197        }
2198        let pool = MerklePaymentCandidatePool {
2199            midpoint_proof: fake_midpoint_proof(),
2200            candidate_nodes: candidates,
2201        };
2202
2203        let result = verifier
2204            .verify_merkle_candidate_closeness(&pool, pool_hash)
2205            .await;
2206        let err = result.expect_err("duplicate candidate PeerIds must be rejected");
2207        let msg = err.to_string();
2208        assert!(
2209            msg.contains("duplicate candidate PeerId"),
2210            "rejection must be the duplicate-PeerId branch, got: {msg}"
2211        );
2212    }
2213
2214    /// Build a deterministic but otherwise-unused `MidpointProof` so unit
2215    /// tests can construct a `MerklePaymentCandidatePool` without spinning
2216    /// up a real merkle tree. The closeness path only calls `.address()`
2217    /// on it, which is a pure BLAKE3 of the branch's leaf/root/timestamp —
2218    /// the values don't need to be tree-valid for these tests.
2219    fn fake_midpoint_proof() -> evmlib::merkle_payments::MidpointProof {
2220        // Build a minimal tree of two leaves so we get a real branch.
2221        let leaves = vec![xor_name::XorName([1u8; 32]), xor_name::XorName([2u8; 32])];
2222        let tree = evmlib::merkle_payments::MerkleTree::from_xornames(leaves).expect("tree");
2223        let candidates = tree.reward_candidates(1_700_000_000).expect("candidates");
2224        candidates.first().expect("at least one").clone()
2225    }
2226
2227    // =========================================================================
2228    // Merkle verification unit tests
2229    // =========================================================================
2230
2231    /// Helper: build 16 validly-signed ML-DSA-65 candidate nodes.
2232    fn make_candidate_nodes(
2233        timestamp: u64,
2234    ) -> [evmlib::merkle_payments::MerklePaymentCandidateNode;
2235           evmlib::merkle_payments::CANDIDATES_PER_POOL] {
2236        use evmlib::merkle_payments::{MerklePaymentCandidateNode, CANDIDATES_PER_POOL};
2237        use saorsa_core::MlDsa65;
2238        use saorsa_pqc::pqc::types::MlDsaSecretKey;
2239        use saorsa_pqc::pqc::MlDsaOperations;
2240
2241        std::array::from_fn::<_, CANDIDATES_PER_POOL, _>(|i| {
2242            let ml_dsa = MlDsa65::new();
2243            let (pub_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
2244            let price = evmlib::common::Amount::from(1024u64);
2245            #[allow(clippy::cast_possible_truncation)]
2246            let reward_address = RewardsAddress::new([i as u8; 20]);
2247            let msg = MerklePaymentCandidateNode::bytes_to_sign(&price, &reward_address, timestamp);
2248            let sk = MlDsaSecretKey::from_bytes(secret_key.as_bytes()).expect("sk");
2249            let signature = ml_dsa.sign(&sk, &msg).expect("sign").as_bytes().to_vec();
2250
2251            MerklePaymentCandidateNode {
2252                pub_key: pub_key.as_bytes().to_vec(),
2253                price,
2254                reward_address,
2255                merkle_payment_timestamp: timestamp,
2256                signature,
2257            }
2258        })
2259    }
2260
2261    /// Helper: build a valid `MerklePaymentProof` with real ML-DSA-65
2262    /// signatures. Returns the raw proof, pool hash, xorname, and timestamp.
2263    fn make_valid_merkle_proof() -> (
2264        evmlib::merkle_payments::MerklePaymentProof,
2265        evmlib::merkle_batch_payment::PoolHash,
2266        [u8; 32],
2267        u64,
2268    ) {
2269        use evmlib::merkle_payments::{MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree};
2270
2271        let timestamp = std::time::SystemTime::now()
2272            .duration_since(std::time::UNIX_EPOCH)
2273            .expect("system time")
2274            .as_secs();
2275
2276        let addresses: Vec<xor_name::XorName> = (0..4u8)
2277            .map(|i| xor_name::XorName::from_content(&[i]))
2278            .collect();
2279        let tree = MerkleTree::from_xornames(addresses.clone()).expect("tree");
2280
2281        let candidate_nodes = make_candidate_nodes(timestamp);
2282
2283        let reward_candidates = tree
2284            .reward_candidates(timestamp)
2285            .expect("reward candidates");
2286        let midpoint_proof = reward_candidates
2287            .first()
2288            .expect("at least one candidate")
2289            .clone();
2290
2291        let pool = MerklePaymentCandidatePool {
2292            midpoint_proof,
2293            candidate_nodes,
2294        };
2295
2296        let first_address = *addresses.first().expect("first address");
2297        let address_proof = tree
2298            .generate_address_proof(0, first_address)
2299            .expect("proof");
2300
2301        let merkle_proof = MerklePaymentProof::new(first_address, address_proof, pool);
2302        let pool_hash = merkle_proof.winner_pool_hash();
2303        let xorname = first_address.0;
2304
2305        (merkle_proof, pool_hash, xorname, timestamp)
2306    }
2307
2308    /// Helper: build a minimal valid `MerklePaymentProof` with real ML-DSA-65
2309    /// signatures. Returns `(xorname, serialized_tagged_proof, pool_hash, timestamp)`.
2310    fn make_valid_merkle_proof_bytes() -> (
2311        [u8; 32],
2312        Vec<u8>,
2313        evmlib::merkle_batch_payment::PoolHash,
2314        u64,
2315    ) {
2316        let (merkle_proof, pool_hash, xorname, timestamp) = make_valid_merkle_proof();
2317        let tagged = crate::payment::proof::serialize_merkle_proof(&merkle_proof)
2318            .expect("serialize merkle proof");
2319        (xorname, tagged, pool_hash, timestamp)
2320    }
2321
2322    #[tokio::test]
2323    async fn test_merkle_address_mismatch_rejected() {
2324        let verifier = create_test_verifier();
2325        let (_correct_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
2326
2327        // Use a DIFFERENT xorname than what the proof was built for
2328        let wrong_xorname = [0xFFu8; 32];
2329
2330        let result = verifier
2331            .verify_payment(&wrong_xorname, Some(&tagged_proof))
2332            .await;
2333
2334        assert!(
2335            result.is_err(),
2336            "Should reject merkle proof address mismatch"
2337        );
2338        let err_msg = format!("{}", result.expect_err("should fail"));
2339        assert!(
2340            err_msg.contains("address mismatch") || err_msg.contains("Merkle proof address"),
2341            "Error should mention address mismatch: {err_msg}"
2342        );
2343    }
2344
2345    #[tokio::test]
2346    async fn test_merkle_malformed_body_rejected() {
2347        let verifier = create_test_verifier();
2348        let xorname = [0xA3u8; 32];
2349
2350        // Valid merkle tag but truncated/corrupted msgpack body
2351        let mut bad_proof = vec![crate::ant_protocol::PROOF_TAG_MERKLE];
2352        bad_proof.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
2353        bad_proof.extend_from_slice(&[0x00; 10]);
2354        // pad to minimum size
2355        while bad_proof.len() < MIN_PAYMENT_PROOF_SIZE_BYTES {
2356            bad_proof.push(0x00);
2357        }
2358
2359        let result = verifier.verify_payment(&xorname, Some(&bad_proof)).await;
2360
2361        assert!(result.is_err(), "Should reject malformed merkle body");
2362        let err_msg = format!("{}", result.expect_err("should fail"));
2363        assert!(
2364            err_msg.contains("deserialize") || err_msg.contains("Failed"),
2365            "Error should mention deserialization: {err_msg}"
2366        );
2367    }
2368
2369    #[test]
2370    fn test_merkle_proof_serialized_size_within_limits() {
2371        let (_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
2372
2373        // 16 ML-DSA-65 candidates (~1952 pub key + ~3309 sig each) ≈ 84 KB + tree data
2374        assert!(
2375            tagged_proof.len() >= MIN_PAYMENT_PROOF_SIZE_BYTES,
2376            "Merkle proof ({} bytes) should be >= min {} bytes",
2377            tagged_proof.len(),
2378            MIN_PAYMENT_PROOF_SIZE_BYTES
2379        );
2380        assert!(
2381            tagged_proof.len() <= MAX_PAYMENT_PROOF_SIZE_BYTES,
2382            "Merkle proof ({} bytes) should be <= max {} bytes",
2383            tagged_proof.len(),
2384            MAX_PAYMENT_PROOF_SIZE_BYTES
2385        );
2386    }
2387
2388    #[test]
2389    fn test_merkle_proof_tag_is_correct() {
2390        let (_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
2391
2392        assert_eq!(
2393            tagged_proof.first().copied(),
2394            Some(crate::ant_protocol::PROOF_TAG_MERKLE),
2395            "First byte must be the merkle tag"
2396        );
2397        assert_eq!(
2398            crate::payment::proof::detect_proof_type(&tagged_proof),
2399            Some(crate::payment::proof::ProofType::Merkle)
2400        );
2401    }
2402
2403    #[test]
2404    fn test_pool_cache_eviction() {
2405        use evmlib::merkle_batch_payment::PoolHash;
2406
2407        let config = PaymentVerifierConfig {
2408            evm: EvmVerifierConfig::default(),
2409            cache_capacity: 100,
2410            local_rewards_address: RewardsAddress::new([1u8; 20]),
2411        };
2412        let verifier = PaymentVerifier::new(config);
2413
2414        // Fill the pool cache to capacity (DEFAULT_POOL_CACHE_CAPACITY = 1000)
2415        for i in 0..DEFAULT_POOL_CACHE_CAPACITY {
2416            let mut hash: PoolHash = [0u8; 32];
2417            // Write index bytes into the hash
2418            let idx_bytes = i.to_le_bytes();
2419            for (j, b) in idx_bytes.iter().enumerate() {
2420                if j < 32 {
2421                    hash[j] = *b;
2422                }
2423            }
2424            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2425                depth: 4,
2426                merkle_payment_timestamp: 1_700_000_000,
2427                paid_node_addresses: vec![],
2428            };
2429            verifier.pool_cache.lock().put(hash, info);
2430        }
2431
2432        assert_eq!(
2433            verifier.pool_cache.lock().len(),
2434            DEFAULT_POOL_CACHE_CAPACITY
2435        );
2436
2437        // Insert one more — should evict the oldest
2438        let overflow_hash: PoolHash = [0xFFu8; 32];
2439        let info = evmlib::merkle_payments::OnChainPaymentInfo {
2440            depth: 8,
2441            merkle_payment_timestamp: 1_800_000_000,
2442            paid_node_addresses: vec![],
2443        };
2444        verifier.pool_cache.lock().put(overflow_hash, info);
2445
2446        // Size should still be at capacity (not capacity + 1)
2447        assert_eq!(
2448            verifier.pool_cache.lock().len(),
2449            DEFAULT_POOL_CACHE_CAPACITY
2450        );
2451
2452        // The new entry should be present
2453        let found = verifier.pool_cache.lock().get(&overflow_hash).cloned();
2454        assert!(
2455            found.is_some(),
2456            "Newly inserted pool hash should be present"
2457        );
2458        assert_eq!(found.expect("info").depth, 8);
2459    }
2460
2461    #[test]
2462    fn test_pool_cache_concurrent_access() {
2463        use evmlib::merkle_batch_payment::PoolHash;
2464        use std::sync::Arc;
2465
2466        let verifier = Arc::new(create_test_verifier());
2467
2468        let mut handles = Vec::new();
2469        for i in 0..20u8 {
2470            let v = verifier.clone();
2471            handles.push(std::thread::spawn(move || {
2472                let hash: PoolHash = [i; 32];
2473                let info = evmlib::merkle_payments::OnChainPaymentInfo {
2474                    depth: i,
2475                    merkle_payment_timestamp: u64::from(i) * 1000,
2476                    paid_node_addresses: vec![],
2477                };
2478                v.pool_cache.lock().put(hash, info);
2479
2480                // Read back
2481                let found = v.pool_cache.lock().get(&hash).cloned();
2482                assert!(found.is_some(), "Entry {i} should be readable after insert");
2483            }));
2484        }
2485
2486        for handle in handles {
2487            handle.join().expect("thread panicked");
2488        }
2489
2490        // All 20 entries should be present (well under 1000 capacity)
2491        assert_eq!(verifier.pool_cache.lock().len(), 20);
2492    }
2493
2494    #[tokio::test]
2495    async fn test_merkle_tampered_candidate_signature_rejected() {
2496        let verifier = create_test_verifier();
2497
2498        let (mut merkle_proof, _pool_hash, xorname, timestamp) = make_valid_merkle_proof();
2499
2500        // Tamper the first candidate's signature
2501        if let Some(byte) = merkle_proof
2502            .winner_pool
2503            .candidate_nodes
2504            .first_mut()
2505            .and_then(|c| c.signature.first_mut())
2506        {
2507            *byte ^= 0xFF;
2508        }
2509
2510        // Recompute pool hash after tampering (signature change alters the hash)
2511        let tampered_pool_hash = merkle_proof.winner_pool_hash();
2512
2513        // Pre-populate pool cache so we skip the on-chain query
2514        {
2515            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2516                depth: 4,
2517                merkle_payment_timestamp: timestamp,
2518                paid_node_addresses: vec![],
2519            };
2520            verifier.pool_cache.lock().put(tampered_pool_hash, info);
2521        }
2522
2523        let tagged =
2524            crate::payment::proof::serialize_merkle_proof(&merkle_proof).expect("serialize");
2525
2526        let result = verifier.verify_payment(&xorname, Some(&tagged)).await;
2527
2528        assert!(
2529            result.is_err(),
2530            "Should reject merkle proof with tampered candidate signature"
2531        );
2532        let err_msg = format!("{}", result.expect_err("should fail"));
2533        assert!(
2534            err_msg.contains("Invalid ML-DSA-65 signature"),
2535            "Error should mention invalid signature: {err_msg}"
2536        );
2537    }
2538
2539    #[tokio::test]
2540    async fn test_merkle_timestamp_mismatch_rejected() {
2541        let verifier = create_test_verifier();
2542
2543        let (xorname, tagged, pool_hash, timestamp) = make_valid_merkle_proof_bytes();
2544
2545        // Pre-populate pool cache with a DIFFERENT timestamp than the candidates
2546        {
2547            let mismatched_ts = timestamp + 9999;
2548            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2549                depth: 4,
2550                merkle_payment_timestamp: mismatched_ts,
2551                paid_node_addresses: vec![],
2552            };
2553            verifier.pool_cache.lock().put(pool_hash, info);
2554        }
2555
2556        let result = verifier.verify_payment(&xorname, Some(&tagged)).await;
2557
2558        assert!(
2559            result.is_err(),
2560            "Should reject merkle proof with timestamp mismatch"
2561        );
2562        let err_msg = format!("{}", result.expect_err("should fail"));
2563        assert!(
2564            err_msg.contains("timestamp mismatch"),
2565            "Error should mention timestamp mismatch: {err_msg}"
2566        );
2567    }
2568
2569    #[tokio::test]
2570    async fn test_merkle_paid_node_index_out_of_bounds_rejected() {
2571        let verifier = create_test_verifier();
2572        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2573
2574        // The test tree has 4 addresses → depth 2. We must match the tree depth
2575        // so verify_merkle_proof passes the depth check, then the paid node
2576        // index out-of-bounds check fires.
2577        {
2578            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2579                depth: 2,
2580                merkle_payment_timestamp: ts,
2581                paid_node_addresses: vec![
2582                    // First paid node: valid (matches candidate 0, amount matches formula)
2583                    // Expected per-node: median(1024) * 2^2 / 2 = 2048
2584                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(2048u64)),
2585                    // Second paid node: index 999 is way beyond CANDIDATES_PER_POOL (16)
2586                    (RewardsAddress::new([1u8; 20]), 999, Amount::from(2048u64)),
2587                ],
2588            };
2589            verifier.pool_cache.lock().put(pool_hash, info);
2590        }
2591
2592        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2593
2594        assert!(
2595            result.is_err(),
2596            "Should reject paid node index out of bounds"
2597        );
2598        let err_msg = format!("{}", result.expect_err("should fail"));
2599        assert!(
2600            err_msg.contains("out of bounds"),
2601            "Error should mention out of bounds: {err_msg}"
2602        );
2603    }
2604
2605    #[tokio::test]
2606    async fn test_merkle_paid_node_address_mismatch_rejected() {
2607        let verifier = create_test_verifier();
2608        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2609
2610        // Tree has depth 2, so provide 2 paid node entries.
2611        // Both use valid indices but the second has a wrong reward address.
2612        {
2613            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2614                depth: 2,
2615                merkle_payment_timestamp: ts,
2616                paid_node_addresses: vec![
2617                    // Index 0 with matching address [0x00; 20]
2618                    // Expected per-node: median(1024) * 2^2 / 2 = 2048
2619                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(2048u64)),
2620                    // Index 1 with WRONG address — candidate 1's address is [0x01; 20]
2621                    (RewardsAddress::new([0xFF; 20]), 1, Amount::from(2048u64)),
2622                ],
2623            };
2624            verifier.pool_cache.lock().put(pool_hash, info);
2625        }
2626
2627        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2628
2629        assert!(result.is_err(), "Should reject paid node address mismatch");
2630        let err_msg = format!("{}", result.expect_err("should fail"));
2631        assert!(
2632            err_msg.contains("address mismatch"),
2633            "Error should mention address mismatch: {err_msg}"
2634        );
2635    }
2636
2637    #[tokio::test]
2638    async fn test_merkle_wrong_depth_rejected() {
2639        let verifier = create_test_verifier();
2640        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2641
2642        // Pre-populate pool cache with depth=3 but only 1 paid node address
2643        // (depth must equal paid_node_addresses.len())
2644        {
2645            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2646                depth: 3,
2647                merkle_payment_timestamp: ts,
2648                paid_node_addresses: vec![(
2649                    RewardsAddress::new([0u8; 20]),
2650                    0,
2651                    Amount::from(1024u64),
2652                )],
2653            };
2654            verifier.pool_cache.lock().put(pool_hash, info);
2655        }
2656
2657        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2658
2659        assert!(
2660            result.is_err(),
2661            "Should reject mismatched depth vs paid node count"
2662        );
2663        let err_msg = format!("{}", result.expect_err("should fail"));
2664        assert!(
2665            err_msg.contains("Wrong number of paid nodes")
2666                || err_msg.contains("verification failed"),
2667            "Error should mention depth/count mismatch: {err_msg}"
2668        );
2669    }
2670
2671    #[tokio::test]
2672    async fn test_merkle_underpayment_rejected() {
2673        let verifier = create_test_verifier();
2674        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2675
2676        // Tree depth=2, so 2 paid nodes required. Candidates all quote price=1024.
2677        // Expected per-node: median(1024) * 2^2 / 2 = 2048.
2678        // Pay only 1 wei per node — far below the expected amount.
2679        {
2680            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2681                depth: 2,
2682                merkle_payment_timestamp: ts,
2683                paid_node_addresses: vec![
2684                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(1u64)),
2685                    (RewardsAddress::new([1u8; 20]), 1, Amount::from(1u64)),
2686                ],
2687            };
2688            verifier.pool_cache.lock().put(pool_hash, info);
2689        }
2690
2691        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2692
2693        assert!(
2694            result.is_err(),
2695            "Should reject merkle payment where paid amount < expected per-node amount"
2696        );
2697        let err_msg = format!("{}", result.expect_err("should fail"));
2698        assert!(
2699            err_msg.contains("Underpayment"),
2700            "Error should mention underpayment: {err_msg}"
2701        );
2702    }
2703
2704    // =========================================================================
2705    // Closeness-window constants regression tests
2706    //
2707    // These constants are load-bearing for both correctness (the storer
2708    // must look at the same window the client picks from, otherwise honest
2709    // pools are rejected) and DoS resistance (the timeout caps lookup
2710    // amplification per forged pool_hash). Pinning them with tests gives
2711    // future patches a one-line failure if either is silently changed
2712    // without updating the security argument in the doc comments.
2713    //
2714    // Empirical justification, captured during STG-01 investigation on
2715    // 2026-05-01:
2716    //
2717    //   - 60s timeout cut iterative lookups off after ~7 of 20 iterations
2718    //     (trace from EWR-3 ant-node-1 in CLOSENESS_LOOKUP_TIMEOUT doc).
2719    //   - K=16 storer window vs K=32 client over-query produced 73%
2720    //     false-positive mismatch rejections under realistic load
2721    //     (115 → 31 client mismatches per 5min after K=32 deploy).
2722    // =========================================================================
2723
2724    #[test]
2725    fn closeness_lookup_timeout_is_240s() {
2726        // Pin the timeout. If a future change drops it back to 60s the
2727        // failure mode from the trace in the doc comment will return.
2728        assert_eq!(
2729            PaymentVerifier::CLOSENESS_LOOKUP_TIMEOUT,
2730            std::time::Duration::from_secs(240),
2731            "CLOSENESS_LOOKUP_TIMEOUT must be 240s; if changing this, update \
2732             the iteration trace in the doc comment and re-validate on a \
2733             fresh testnet"
2734        );
2735    }
2736
2737    #[test]
2738    fn closeness_lookup_width_is_32() {
2739        // Pin the storer's lookup width. Must equal the client's
2740        // over-query factor (CANDIDATES_PER_POOL * 2 = 32) so the storer
2741        // sees the same peers the client legitimately picks from.
2742        assert_eq!(
2743            PaymentVerifier::CLOSENESS_LOOKUP_WIDTH,
2744            2 * evmlib::merkle_payments::CANDIDATES_PER_POOL,
2745            "CLOSENESS_LOOKUP_WIDTH must equal 2 * CANDIDATES_PER_POOL to \
2746             match the client's over-query in get_merkle_candidate_pool"
2747        );
2748    }
2749
2750    #[test]
2751    fn closeness_required_threshold_unchanged_at_13() {
2752        // Sanity-check that widening the lookup did not also lower the
2753        // matching threshold. The 13/16 floor is the security knob; the
2754        // window widening is purely a false-positive fix for honest pools.
2755        assert_eq!(
2756            PaymentVerifier::CANDIDATE_CLOSENESS_REQUIRED,
2757            13,
2758            "Widening the lookup window must not lower the matching \
2759             threshold — that would weaken the pay-yourself defence"
2760        );
2761    }
2762
2763    #[test]
2764    fn closeness_lookup_count_uses_max_of_width_and_pool_len() {
2765        // The honest case: a 16-candidate pool must trigger a 32-peer
2766        // network lookup. This is the K=16-rejects-honest-pool fix from
2767        // the STG-01 investigation — without it, the storer never
2768        // observes the peers at network-true positions 17–32 that the
2769        // client legitimately picks from.
2770        let standard =
2771            PaymentVerifier::closeness_lookup_count(evmlib::merkle_payments::CANDIDATES_PER_POOL);
2772        assert_eq!(
2773            standard, 32,
2774            "honest 16-candidate pool must trigger a 32-peer DHT lookup"
2775        );
2776
2777        // Future-proof: if a protocol bump ever produces a pool larger
2778        // than CLOSENESS_LOOKUP_WIDTH, lookup_count must scale with the
2779        // pool — not truncate to WIDTH. Truncating would let an attacker
2780        // hide candidates by padding the pool past the storer's window.
2781        assert_eq!(
2782            PaymentVerifier::closeness_lookup_count(64),
2783            64,
2784            "lookup_count must scale up if pool exceeds CLOSENESS_LOOKUP_WIDTH"
2785        );
2786
2787        // Lower bound (also covered by the const-assert below; pin the
2788        // runtime path too in case the const-assert is ever removed).
2789        assert_eq!(
2790            PaymentVerifier::closeness_lookup_count(1),
2791            PaymentVerifier::CLOSENESS_LOOKUP_WIDTH,
2792            "lookup_count must never drop below CLOSENESS_LOOKUP_WIDTH"
2793        );
2794    }
2795
    // Compile-time invariant: the `closeness_lookup_count` formula relies
    // on WIDTH being ≥ CANDIDATES_PER_POOL so we never request fewer peers
    // than the pool itself contains. Evaluated at compile time — a
    // violation fails the build rather than a test run.
    const _: () = assert!(
        PaymentVerifier::CLOSENESS_LOOKUP_WIDTH >= evmlib::merkle_payments::CANDIDATES_PER_POOL,
        "CLOSENESS_LOOKUP_WIDTH must be ≥ CANDIDATES_PER_POOL",
    );
2803
2804    // =========================================================================
2805    // Regression tests for the original STG-01 failure modes
2806    //
2807    // These tests use the extracted `check_closeness_match` helper to
2808    // exercise the matching logic directly with synthetic peer-ID sets,
    // without standing up a real DHT. They prove the dominant failure
    // mode observed on STG-01 on 2026-05-01 is fixed by the K=16 → K=32
    // change:
2812    //
2813    //   - "K=16 storer rejects honest pool whose candidates legitimately
2814    //     include peers from positions 17–32" (~73% of mismatches)
2815    //
2816    // and that the security floor (`CANDIDATE_CLOSENESS_REQUIRED = 13/16`)
2817    // still rejects forged pools at the wider window.
2818    //
2819    // Pool address used as the XOR midpoint: `[0u8; 32]`.
2820    // Synthetic PeerIds use distinct constant byte patterns so each test
2821    // can reason about which IDs are "in the network's top-K" vs not.
2822    // =========================================================================
2823
2824    /// Build a deterministic `PeerId` from a single byte tag.
2825    fn synthetic_peer_id(tag: u8) -> PeerId {
2826        let mut bytes = [0u8; 32];
2827        bytes[0] = tag;
2828        PeerId::from_bytes(bytes)
2829    }
2830
2831    /// Build a vector of synthetic `PeerId`s tagged with bytes 1..=n.
2832    fn synthetic_peer_ids(n: u8) -> Vec<PeerId> {
2833        (1..=n).map(synthetic_peer_id).collect()
2834    }
2835
2836    #[test]
2837    fn closeness_match_passes_when_all_16_candidates_in_top_16() {
2838        // Trivial case: every candidate is in the network's top-16.
2839        // Asserts the happy path still works after the refactor.
2840        let candidates = synthetic_peer_ids(16);
2841        let network = synthetic_peer_ids(16);
2842        let pool_address = [0u8; 32];
2843        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
2844        assert!(result.is_ok(), "all-in-top-16 pool must pass: {result:?}");
2845    }
2846
    #[test]
    fn closeness_match_passes_when_candidates_span_positions_1_to_15_and_17() {
        // STG-01 regression: the client's 16-candidate pool holds peers
        // at network-true positions 1..=15 plus ONE at position 17
        // (because the network-true position-16 peer was unresponsive
        // when the client over-queried 32). Under the post-fix K=32
        // lookup the storer's window includes position 17, so all 16
        // candidates match and the pool passes.
        //
        // NOTE: even a pre-fix K=16 storer would accept this mild skew
        // (15 of 16 matched, which is ≥ the 13/16 threshold). The severe
        // 4-peer skew that actually failed pre-fix is modeled in
        // `closeness_match_fails_at_k_16_passes_at_k_32_for_honest_skew`.
        let candidates = synthetic_peer_ids(15)
            .into_iter()
            .chain(std::iter::once(synthetic_peer_id(17)))
            .collect::<Vec<_>>();
        // Post-fix lookup window = 32, includes position 17.
        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
        let pool_address = [0u8; 32];
        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
        assert!(
            result.is_ok(),
            "pool with one candidate at position 17 must pass under K=32: {result:?}"
        );
    }
2873
2874    #[test]
2875    fn closeness_match_fails_at_k_16_passes_at_k_32_for_honest_skew() {
2876        // The actual STG-01 failure mode: the client's 16 candidates
2877        // legitimately span network-true positions {1..=12, 17, 19, 21,
2878        // 23} — i.e. 12 positions in the storer's top-16 plus 4 in the
2879        // 17–32 window (because positions 13–16 were unresponsive when
2880        // the client over-queried).
2881        let candidates: Vec<PeerId> = (1..=12u8)
2882            .chain([17u8, 19, 21, 23])
2883            .map(synthetic_peer_id)
2884            .collect();
2885        let pool_address = [0u8; 32];
2886
2887        // Pre-fix (K=16): network = positions 1..=16. Only 12 of the 16
2888        // candidates appear — below the 13/16 threshold. This is the
2889        // exact false-positive rejection STG-01 was hitting.
2890        let network_pre_fix: Vec<PeerId> = (1..=16).map(synthetic_peer_id).collect();
2891        let result_pre_fix =
2892            PaymentVerifier::check_closeness_match(&candidates, &network_pre_fix, &pool_address);
2893        assert!(
2894            result_pre_fix.is_err(),
2895            "PRE-FIX: K=16 storer should reject the honest pool (this is \
2896             the bug we observed; if this assertion stops failing the \
2897             refactor lost the rejection logic): {result_pre_fix:?}"
2898        );
2899
2900        // Post-fix (K=32): network = positions 1..=32. All 16 candidates
2901        // appear (12 at 1..=12, 4 at 17/19/21/23). matched = 16 ≥ 13:
2902        // pool accepted. This is the fix.
2903        let network_post_fix: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
2904        let result_post_fix =
2905            PaymentVerifier::check_closeness_match(&candidates, &network_post_fix, &pool_address);
2906        assert!(
2907            result_post_fix.is_ok(),
2908            "POST-FIX: K=32 storer must accept the same honest pool: {result_post_fix:?}"
2909        );
2910    }
2911
2912    #[test]
2913    fn closeness_match_rejects_forged_pool_at_k_32() {
2914        // Security floor regression: a fully-forged pool whose candidate
2915        // PeerIds are network-disjoint must still be rejected at the
2916        // wider window K=32. The 13/16 threshold is the security knob;
2917        // widening the lookup window must not soften it.
2918        //
2919        // Tag bytes 100..=115 are deliberately disjoint from the network
2920        // set (1..=32).
2921        let forged_candidates: Vec<PeerId> = (100..=115).map(synthetic_peer_id).collect();
2922        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
2923        let pool_address = [0u8; 32];
2924
2925        let result =
2926            PaymentVerifier::check_closeness_match(&forged_candidates, &network, &pool_address);
2927        match result {
2928            Err(Error::Payment(msg)) => {
2929                assert!(
2930                    msg.contains("candidate pub_keys do not match"),
2931                    "expected forged-pool rejection message, got: {msg}"
2932                );
2933            }
2934            other => panic!(
2935                "forged pool with all candidates outside network's top-32 \
2936                 must be rejected at K=32 (security floor): {other:?}"
2937            ),
2938        }
2939    }
2940
2941    #[test]
2942    fn closeness_match_rejects_pool_at_exactly_12_of_16_match() {
2943        // Threshold sanity: a pool with exactly 12 of 16 candidates in
2944        // the network set must still be rejected (12 < 13).
2945        let mut candidates = synthetic_peer_ids(12);
2946        candidates.extend((100..=103).map(synthetic_peer_id)); // 4 disjoint
2947        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
2948        let pool_address = [0u8; 32];
2949
2950        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
2951        assert!(
2952            result.is_err(),
2953            "12/16 < threshold of 13/16 must reject regardless of K: {result:?}"
2954        );
2955    }
2956
2957    #[test]
2958    fn closeness_match_accepts_pool_at_exactly_13_of_16_match() {
2959        // Threshold sanity: a pool with exactly 13 of 16 candidates in
2960        // the network set must pass (13 ≥ 13).
2961        let mut candidates = synthetic_peer_ids(13);
2962        candidates.extend((100..=102).map(synthetic_peer_id)); // 3 disjoint
2963        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
2964        let pool_address = [0u8; 32];
2965
2966        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
2967        assert!(
2968            result.is_ok(),
2969            "13/16 ≥ threshold of 13/16 must accept: {result:?}"
2970        );
2971    }
2972
2973    #[test]
2974    fn closeness_match_returns_sparse_dht_error_when_lookup_too_small() {
2975        // The sparse-DHT short-circuit fires when the lookup returned
2976        // fewer peers than the threshold itself — even an all-matching
2977        // candidate set can't pass because the storer doesn't have an
2978        // authoritative view to compare against.
2979        let candidates = synthetic_peer_ids(16);
2980        let network = synthetic_peer_ids(12); // < CANDIDATE_CLOSENESS_REQUIRED
2981        let pool_address = [0u8; 32];
2982
2983        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
2984        match result {
2985            Err(Error::Payment(msg)) => {
2986                assert!(
2987                    msg.contains("authoritative DHT lookup returned only 12"),
2988                    "expected sparse-DHT error message, got: {msg}"
2989                );
2990            }
2991            other => panic!("expected sparse-DHT rejection, got: {other:?}"),
2992        }
2993    }
2994}