Skip to main content

ant_node/payment/
verifier.rs

1//! Payment verifier with LRU cache and EVM verification.
2//!
3//! This is the core payment verification logic for ant-node.
4//! All new data requires EVM payment on Arbitrum (no free tier).
5
6use crate::ant_protocol::CLOSE_GROUP_SIZE;
7use crate::error::{Error, Result};
8use crate::logging::{debug, info, warn};
9use crate::payment::cache::{CacheStats, VerifiedCache, XorName};
10use crate::payment::pricing::{calculate_price, derive_records_stored_from_price};
11use crate::payment::proof::{
12    deserialize_merkle_proof, deserialize_proof, detect_proof_type, ProofType,
13};
14use crate::payment::single_node::SingleNodePayment;
15use crate::storage::lmdb::LmdbStorage;
16use ant_protocol::payment::verify::{verify_quote_content, verify_quote_signature};
17use evmlib::common::Amount;
18use evmlib::contract::payment_vault;
19use evmlib::merkle_batch_payment::{OnChainPaymentInfo, PoolHash};
20use evmlib::Network as EvmNetwork;
21use evmlib::ProofOfPayment;
22use evmlib::RewardsAddress;
23use lru::LruCache;
24use parking_lot::{Mutex, RwLock};
25use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes;
26use saorsa_core::identity::PeerId;
27use saorsa_core::P2PNode;
28use std::num::NonZeroUsize;
29use std::sync::Arc;
30
31/// Minimum allowed size for a payment proof in bytes.
32///
33/// This minimum ensures the proof contains at least a basic cryptographic hash or identifier.
34/// Proofs smaller than this are rejected as they cannot contain sufficient payment information.
35pub const MIN_PAYMENT_PROOF_SIZE_BYTES: usize = 32;
36
37/// Maximum allowed size for a payment proof in bytes (256 KB).
38///
39/// Single-node proofs with 7 ML-DSA-65 quotes reach ~40 KB.
40/// Merkle proofs include 16 candidate nodes (each with ~1,952-byte ML-DSA pub key
41/// and ~3,309-byte signature) plus merkle branch hashes, totaling ~130 KB.
42/// 256 KB provides headroom while still capping memory during verification.
43pub const MAX_PAYMENT_PROOF_SIZE_BYTES: usize = 262_144;
44
45/// Maximum percentage by which a quote's paid price may fall *below* the node's
46/// current price before the quote is rejected as stale.
47///
48/// The freshness gate is one-directional and price-based, not a symmetric
49/// record-count delta:
50///
51/// - **Over-payment is always accepted.** If the client paid at least the
52///   node's current price (e.g. the node pruned records and is now cheaper),
53///   the quote is fine — a node has no reason to reject money.
54/// - **Only meaningful under-payment is rejected.** A quote priced below the
55///   current price by more than this percentage is rejected as stale.
56///
57/// Comparing prices instead of raw record counts makes the tolerance
58/// self-scaling against the quadratic pricing curve: at low/moderate fill the
59/// curve is nearly flat, so normal in-flight churn (the node storing a handful
60/// of replicated records between quoting and verifying) is a negligible price
61/// change and passes; at high fill the curve is steep, so the same percentage
62/// still catches genuinely stale, underpriced quotes.
63const QUOTE_PRICE_STALENESS_PCT_TOLERANCE: u64 = 25;
64
65/// Configuration for EVM payment verification.
66///
67/// EVM verification is always on. All new data requires on-chain
68/// payment verification. The network field selects which EVM chain to use.
69#[derive(Debug, Clone)]
70pub struct EvmVerifierConfig {
71    /// EVM network to use (Arbitrum One, Arbitrum Sepolia, etc.)
72    pub network: EvmNetwork,
73}
74
75impl Default for EvmVerifierConfig {
76    fn default() -> Self {
77        Self {
78            network: EvmNetwork::ArbitrumOne,
79        }
80    }
81}
82
83/// Configuration for the payment verifier.
84///
85/// All new data requires EVM payment on Arbitrum. The cache stores
86/// previously verified payments to avoid redundant on-chain lookups.
87#[derive(Debug, Clone)]
88pub struct PaymentVerifierConfig {
89    /// EVM verifier configuration.
90    pub evm: EvmVerifierConfig,
91    /// Cache capacity (number of `XorName` values to cache).
92    pub cache_capacity: usize,
93    /// Local node's rewards address.
94    /// The verifier rejects payments that don't include this node as a recipient.
95    pub local_rewards_address: RewardsAddress,
96}
97
98/// Status returned by payment verification.
99#[derive(Debug, Clone, Copy, PartialEq, Eq)]
100pub enum PaymentStatus {
101    /// Data was found in local cache - previously paid.
102    CachedAsVerified,
103    /// New data - payment required.
104    PaymentRequired,
105    /// Payment was provided and verified.
106    PaymentVerified,
107}
108
109impl PaymentStatus {
110    /// Returns true if the data can be stored (cached or payment verified).
111    #[must_use]
112    pub fn can_store(&self) -> bool {
113        matches!(self, Self::CachedAsVerified | Self::PaymentVerified)
114    }
115
116    /// Returns true if this status indicates the data was already paid for.
117    #[must_use]
118    pub fn is_cached(&self) -> bool {
119        matches!(self, Self::CachedAsVerified)
120    }
121}
122
123/// Default capacity for the merkle pool cache (number of pool hashes to cache).
124const DEFAULT_POOL_CACHE_CAPACITY: usize = 1_000;
125
126/// Main payment verifier for ant-node.
127///
128/// Uses:
129/// 1. LRU cache for fast lookups of previously verified `XorName` values
130/// 2. EVM payment verification for new data (always required)
131/// 3. Pool-level cache for merkle batch payments (avoids repeated on-chain queries)
132pub struct PaymentVerifier {
133    /// LRU cache of verified `XorName` values.
134    cache: VerifiedCache,
135    /// LRU cache of verified merkle pool hashes → on-chain payment info.
136    pool_cache: Mutex<LruCache<PoolHash, OnChainPaymentInfo>>,
137    /// LRU cache of pool hashes whose candidate closeness has already been
138    /// verified by this node. Collapses the per-chunk Kademlia lookup cost
139    /// within a batch (256 chunks × 1 pool = 1 lookup instead of 256).
140    closeness_pass_cache: Mutex<LruCache<PoolHash, ()>>,
141    /// In-flight closeness lookups, keyed by pool hash. Lets concurrent PUTs
142    /// for the same pool coalesce onto a single Kademlia lookup AND share
143    /// its result — on both success and failure — which bounds `DoS`
144    /// amplification to one lookup per unique `pool_hash` regardless of
145    /// concurrency.
146    inflight_closeness: Mutex<LruCache<PoolHash, Arc<ClosenessSlot>>>,
147    /// P2P node handle, attached post-construction so merkle verification can
148    /// check that candidate `pub_keys` map to peers actually close to the pool
149    /// midpoint in the live DHT. `None` in unit tests that don't exercise
150    /// merkle verification; production startup MUST call [`attach_p2p_node`].
151    p2p_node: RwLock<Option<Arc<P2PNode>>>,
152    /// LMDB storage handle, attached post-construction so the storage-delta
153    /// freshness check can read the authoritative on-disk record count without
154    /// depending on a side counter that may drift from replication/repair/prune
155    /// paths. `None` in unit tests that pre-set [`Self::test_records_override`];
156    /// production startup MUST call [`attach_storage`].
157    storage: RwLock<Option<Arc<LmdbStorage>>>,
158    /// Test-only override for the storage-delta freshness check.
159    ///
160    /// When `Some(n)`, `validate_quote_freshness` uses `n` as the current
161    /// record count instead of querying `storage.current_chunks()`. Set via
162    /// [`Self::set_records_stored_for_tests`] so unit tests that don't wire a
163    /// real `LmdbStorage` can still drive the freshness logic.
164    test_records_override: RwLock<Option<u64>>,
165    /// Configuration.
166    config: PaymentVerifierConfig,
167}
168
169/// Shared state for an inflight closeness verification. The leader publishes
170/// its result via the `OnceLock`; waiters read that result directly instead
171/// of racing on a cache re-check. Wrapped in an `Arc` and held both by the
172/// leader's drop guard and by each waiting task.
173struct ClosenessSlot {
174    notify: Arc<tokio::sync::Notify>,
175    /// `Some(Ok(()))` on success, `Some(Err(msg))` on failure, `None` if the
176    /// leader disappeared without publishing (panic, cancellation).
177    result: std::sync::OnceLock<std::result::Result<(), String>>,
178}
179
180impl ClosenessSlot {
181    fn new() -> Self {
182        Self {
183            notify: Arc::new(tokio::sync::Notify::new()),
184            result: std::sync::OnceLock::new(),
185        }
186    }
187
188    /// Build an owned `Notified` future that snapshots the `notify_waiters`
189    /// counter at call time. Awaiting this future after dropping external
190    /// locks is race-free: if `notify_waiters` fires between construction
191    /// and the first poll, the snapshot mismatch resolves the future
192    /// immediately.
193    fn notified_owned(&self) -> tokio::sync::futures::OwnedNotified {
194        Arc::clone(&self.notify).notified_owned()
195    }
196}
197
198/// Drop guard that publishes the leader's result, clears the inflight slot,
199/// and wakes all waiters. Fires on every exit path: success, failure, panic,
200/// future-cancellation.
201///
202/// The guard owns its own `Arc<ClosenessSlot>` so `notify_waiters` still
203/// fires even if LRU pressure evicted the slot before the leader finished.
204/// Waiters see the published result via `result.get()`; the `Notify` is only
205/// the wake-up signal.
206struct InflightGuard<'a> {
207    slot_cache: &'a Mutex<LruCache<PoolHash, Arc<ClosenessSlot>>>,
208    pool_hash: PoolHash,
209    slot: Arc<ClosenessSlot>,
210}
211
212impl InflightGuard<'_> {
213    /// Publish the leader's result. Called exactly once by the leader on
214    /// every successful or explicit-error exit. If dropped without calling
215    /// (panic, cancellation) the guard still wakes waiters but leaves
216    /// `result` empty, which waiters treat as a transient failure and retry.
217    fn publish(&self, result: &Result<()>) {
218        let stored: std::result::Result<(), String> = match result {
219            Ok(()) => Ok(()),
220            Err(e) => Err(e.to_string()),
221        };
222        let _ = self.slot.result.set(stored);
223    }
224}
225
226impl Drop for InflightGuard<'_> {
227    fn drop(&mut self) {
228        // Remove the slot entry if it's still ours. A separate leader may
229        // have inserted a new slot for the same pool_hash after LRU
230        // eviction — don't pop someone else's entry.
231        {
232            let mut cache = self.slot_cache.lock();
233            if let Some(existing) = cache.peek(&self.pool_hash) {
234                if Arc::ptr_eq(existing, &self.slot) {
235                    cache.pop(&self.pool_hash);
236                }
237            }
238        }
239        // Wake every waiter registered against OUR slot, regardless of
240        // whether the cache entry is still ours.
241        self.slot.notify.notify_waiters();
242    }
243}
244
245impl PaymentVerifier {
246    /// Create a new payment verifier.
247    #[must_use]
248    pub fn new(config: PaymentVerifierConfig) -> Self {
249        const _: () = assert!(
250            DEFAULT_POOL_CACHE_CAPACITY > 0,
251            "pool cache capacity must be > 0"
252        );
253        let cache = VerifiedCache::with_capacity(config.cache_capacity);
254        let pool_cache_size =
255            NonZeroUsize::new(DEFAULT_POOL_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
256        let pool_cache = Mutex::new(LruCache::new(pool_cache_size));
257        let closeness_pass_cache = Mutex::new(LruCache::new(pool_cache_size));
258        let inflight_closeness = Mutex::new(LruCache::new(pool_cache_size));
259
260        let cache_capacity = config.cache_capacity;
261        info!("Payment verifier initialized (cache_capacity={cache_capacity}, evm=always-on, pool_cache={DEFAULT_POOL_CACHE_CAPACITY})");
262
263        // Loud warning if a production binary was accidentally built with
264        // `test-utils`: that feature flips the closeness-check fail-open
265        // switch, disabling the pay-yourself defence when P2PNode isn't
266        // attached. Safe in tests, never intended for prod.
267        #[cfg(feature = "test-utils")]
268        crate::logging::error!(
269            "PaymentVerifier: built with `test-utils` feature — merkle closeness \
270             defence falls back to fail-open when no P2PNode is attached. This \
271             feature is for test binaries only; production nodes must be built \
272             without it."
273        );
274
275        Self {
276            cache,
277            pool_cache,
278            closeness_pass_cache,
279            inflight_closeness,
280            p2p_node: RwLock::new(None),
281            storage: RwLock::new(None),
282            test_records_override: RwLock::new(None),
283            config,
284        }
285    }
286
287    /// Attach the node's [`P2PNode`] handle so merkle-payment verification can
288    /// check candidate `pub_keys` against the DHT's actual closest peers to the
289    /// pool midpoint.
290    ///
291    /// Production startup MUST call this once the `P2PNode` exists. Without
292    /// it, the closeness check fails CLOSED in release builds (rejects the
293    /// PUT with a visible error) and fails open in test builds. Idempotent:
294    /// calling twice replaces the handle.
295    pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
296        *self.p2p_node.write() = Some(node);
297        debug!("PaymentVerifier: P2PNode attached for merkle closeness checks");
298    }
299
300    /// Attach the node's [`LmdbStorage`] handle so storage-delta freshness
301    /// checks can query the authoritative on-disk record count.
302    ///
303    /// Production startup MUST call this once the storage exists; otherwise
304    /// `validate_quote_freshness` falls back to treating the current count as
305    /// zero, which will reject all non-trivial quotes. Idempotent: calling
306    /// twice replaces the handle.
307    pub fn attach_storage(&self, storage: Arc<LmdbStorage>) {
308        *self.storage.write() = Some(storage);
309        debug!("PaymentVerifier: LmdbStorage attached for storage-delta freshness checks");
310    }
311
312    /// Test-only setter for the current record count used by storage-delta
313    /// freshness checks. Lets unit tests drive the freshness logic without
314    /// wiring a real `LmdbStorage`. Has no effect in production code because
315    /// production code is expected to call [`Self::attach_storage`] instead.
316    #[cfg(any(test, feature = "test-utils"))]
317    pub fn set_records_stored_for_tests(&self, count: u64) {
318        *self.test_records_override.write() = Some(count);
319    }
320
321    /// Snapshot the current record count for freshness comparisons.
322    ///
323    /// Prefers the attached `LmdbStorage` (authoritative — covers client PUTs,
324    /// replication stores, repair fetches, and prune deletes by definition).
325    /// Falls back to a test override if one was set. Returns `None` only when
326    /// no source is available (mis-configured production startup); the caller
327    /// treats that as "unknown" and skips storage-delta gating rather than
328    /// rejecting all quotes outright.
329    fn current_records_stored(&self) -> Option<u64> {
330        if let Some(storage) = self.storage.read().as_ref() {
331            match storage.current_chunks() {
332                Ok(n) => return Some(n),
333                Err(e) => {
334                    warn!(
335                        "PaymentVerifier: failed to read current_chunks() for freshness check: {e}"
336                    );
337                    return None;
338                }
339            }
340        }
341        *self.test_records_override.read()
342    }
343
344    /// Check if payment is required for the given `XorName`.
345    ///
346    /// This is the main entry point for payment verification:
347    /// 1. Check LRU cache (fast path)
348    /// 2. If not cached, payment is required
349    ///
350    /// # Arguments
351    ///
352    /// * `xorname` - The content-addressed name of the data
353    ///
354    /// # Returns
355    ///
356    /// * `PaymentStatus::CachedAsVerified` - Found in local cache (previously paid)
357    /// * `PaymentStatus::PaymentRequired` - Not cached (payment required)
358    pub fn check_payment_required(&self, xorname: &XorName) -> PaymentStatus {
359        // Check LRU cache (fast path)
360        if self.cache.contains(xorname) {
361            if crate::logging::enabled!(crate::logging::Level::DEBUG) {
362                debug!("Data {} found in verified cache", hex::encode(xorname));
363            }
364            return PaymentStatus::CachedAsVerified;
365        }
366
367        // Not in cache - payment required
368        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
369            debug!(
370                "Data {} not in cache - payment required",
371                hex::encode(xorname)
372            );
373        }
374        PaymentStatus::PaymentRequired
375    }
376
377    /// Verify that a PUT request has valid payment.
378    ///
379    /// This is the complete payment verification flow:
380    /// 1. Check if data is in cache (previously paid)
381    /// 2. If not, verify the provided payment proof
382    ///
383    /// # Arguments
384    ///
385    /// * `xorname` - The content-addressed name of the data
386    /// * `payment_proof` - Optional payment proof (required if not in cache)
387    ///
388    /// # Returns
389    ///
390    /// * `Ok(PaymentStatus)` - Verification succeeded
391    /// * `Err(Error::Payment)` - No payment and not cached, or payment invalid
392    ///
393    /// # Errors
394    ///
395    /// Returns an error if payment is required but not provided, or if payment is invalid.
396    pub async fn verify_payment(
397        &self,
398        xorname: &XorName,
399        payment_proof: Option<&[u8]>,
400    ) -> Result<PaymentStatus> {
401        // First check if payment is required
402        let status = self.check_payment_required(xorname);
403
404        match status {
405            PaymentStatus::CachedAsVerified => {
406                // No payment needed - already in cache
407                Ok(status)
408            }
409            PaymentStatus::PaymentRequired => {
410                // EVM verification is always on — verify the proof
411                if let Some(proof) = payment_proof {
412                    let proof_len = proof.len();
413                    if proof_len < MIN_PAYMENT_PROOF_SIZE_BYTES {
414                        return Err(Error::Payment(format!(
415                            "Payment proof too small: {proof_len} bytes (min {MIN_PAYMENT_PROOF_SIZE_BYTES})"
416                        )));
417                    }
418                    if proof_len > MAX_PAYMENT_PROOF_SIZE_BYTES {
419                        return Err(Error::Payment(format!(
420                            "Payment proof too large: {proof_len} bytes (max {MAX_PAYMENT_PROOF_SIZE_BYTES} bytes)"
421                        )));
422                    }
423
424                    // Detect proof type from version tag byte
425                    match detect_proof_type(proof) {
426                        Some(ProofType::Merkle) => {
427                            self.verify_merkle_payment(xorname, proof).await?;
428                        }
429                        Some(ProofType::SingleNode) => {
430                            let (payment, tx_hashes) = deserialize_proof(proof).map_err(|e| {
431                                Error::Payment(format!("Failed to deserialize payment proof: {e}"))
432                            })?;
433
434                            if !tx_hashes.is_empty() {
435                                debug!("Proof includes {} transaction hash(es)", tx_hashes.len());
436                            }
437
438                            self.verify_evm_payment(xorname, &payment).await?;
439                        }
440                        None => {
441                            let tag = proof.first().copied().unwrap_or(0);
442                            return Err(Error::Payment(format!(
443                                "Unknown payment proof type tag: 0x{tag:02x}"
444                            )));
445                        }
446                        // ant-protocol marks `ProofType` as `#[non_exhaustive]`.
447                        // A future proof variant that this node does not yet
448                        // understand must be rejected, not silently accepted.
449                        Some(_) => {
450                            let tag = proof.first().copied().unwrap_or(0);
451                            return Err(Error::Payment(format!(
452                                "Unsupported payment proof type tag: 0x{tag:02x} (this node's protocol version does not handle it — upgrade ant-node)"
453                            )));
454                        }
455                    }
456
457                    // Cache the verified xorname
458                    self.cache.insert(*xorname);
459
460                    Ok(PaymentStatus::PaymentVerified)
461                } else {
462                    // No payment provided in production mode
463                    let xorname_hex = hex::encode(xorname);
464                    Err(Error::Payment(format!(
465                        "Payment required for new data {xorname_hex}"
466                    )))
467                }
468            }
469            PaymentStatus::PaymentVerified => Err(Error::Payment(
470                "Unexpected PaymentVerified status from check_payment_required".to_string(),
471            )),
472        }
473    }
474
475    /// Get cache statistics.
476    #[must_use]
477    pub fn cache_stats(&self) -> CacheStats {
478        self.cache.stats()
479    }
480
481    /// Get the number of cached entries.
482    #[must_use]
483    pub fn cache_len(&self) -> usize {
484        self.cache.len()
485    }
486
487    /// Pre-populate the payment cache for a given address.
488    ///
489    /// This marks the address as already paid, so subsequent `verify_payment`
490    /// calls will return `CachedAsVerified` without on-chain verification.
491    /// Useful for test setups where real EVM payment is not needed.
492    #[cfg(any(test, feature = "test-utils"))]
493    pub fn cache_insert(&self, xorname: XorName) {
494        self.cache.insert(xorname);
495    }
496
497    /// Pre-populate the merkle pool cache. Testing helper that lets e2e tests
498    /// bypass the on-chain `completedMerklePayments` lookup when the point of
499    /// the test is to exercise merkle-verification logic BEFORE the on-chain
500    /// call (e.g. the pay-yourself closeness check).
501    #[cfg(any(test, feature = "test-utils"))]
502    pub fn pool_cache_insert(&self, pool_hash: PoolHash, info: OnChainPaymentInfo) {
503        let mut cache = self.pool_cache.lock();
504        cache.put(pool_hash, info);
505    }
506
507    /// Verify a single-node EVM payment proof.
508    ///
509    /// Verification steps:
510    /// 1. Exactly `CLOSE_GROUP_SIZE` quotes are present
511    /// 2. All quotes target the correct content address (xorname binding)
512    /// 3. Quote timestamps are fresh (not expired or future-dated)
513    /// 4. Peer ID bindings match the ML-DSA-65 public keys
514    /// 5. This node is among the quoted recipients
515    /// 6. All ML-DSA-65 signatures are valid (offloaded to `spawn_blocking`)
516    /// 7. The median-priced quote was paid at least 3x its price on-chain
517    ///    (looked up via `completedPayments(quoteHash)` on the payment vault)
518    ///
519    /// For unit tests that don't need on-chain verification, pre-populate
520    /// the cache so `verify_payment` returns `CachedAsVerified` before
521    /// reaching this method.
522    async fn verify_evm_payment(&self, xorname: &XorName, payment: &ProofOfPayment) -> Result<()> {
523        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
524            let xorname_hex = hex::encode(xorname);
525            let quote_count = payment.peer_quotes.len();
526            debug!("Verifying EVM payment for {xorname_hex} with {quote_count} quotes");
527        }
528
529        Self::validate_quote_structure(payment)?;
530        Self::validate_quote_content(payment, xorname)?;
531        self.validate_quote_freshness(payment)?;
532        Self::validate_peer_bindings(payment)?;
533        self.validate_local_recipient(payment)?;
534
535        // Verify quote signatures (CPU-bound, run off async runtime)
536        let peer_quotes = payment.peer_quotes.clone();
537        tokio::task::spawn_blocking(move || {
538            for (encoded_peer_id, quote) in &peer_quotes {
539                if !verify_quote_signature(quote) {
540                    return Err(Error::Payment(
541                        format!("Quote ML-DSA-65 signature verification failed for peer {encoded_peer_id:?}"),
542                    ));
543                }
544            }
545            Ok(())
546        })
547        .await
548        .map_err(|e| Error::Payment(format!("Signature verification task failed: {e}")))??;
549
550        // Reconstruct the SingleNodePayment to identify the median quote.
551        // from_quotes() sorts by price and marks the median for 3x payment.
552        let quotes_with_prices: Vec<_> = payment
553            .peer_quotes
554            .iter()
555            .map(|(_, quote)| (quote.clone(), quote.price))
556            .collect();
557        let single_payment = SingleNodePayment::from_quotes(quotes_with_prices).map_err(|e| {
558            Error::Payment(format!(
559                "Failed to reconstruct payment for verification: {e}"
560            ))
561        })?;
562
563        // Verify the median quote was paid at least 3x its price on-chain
564        // via completedPayments(quoteHash) on the payment vault contract.
565        let verified_amount = single_payment
566            .verify(&self.config.evm.network)
567            .await
568            .map_err(|e| {
569                let xorname_hex = hex::encode(xorname);
570                Error::Payment(format!(
571                    "Median quote payment verification failed for {xorname_hex}: {e}"
572                ))
573            })?;
574
575        if crate::logging::enabled!(crate::logging::Level::INFO) {
576            let xorname_hex = hex::encode(xorname);
577            info!("EVM payment verified for {xorname_hex} (median paid {verified_amount} atto)");
578        }
579        Ok(())
580    }
581
582    /// Validate quote count, uniqueness, and basic structure.
583    fn validate_quote_structure(payment: &ProofOfPayment) -> Result<()> {
584        if payment.peer_quotes.is_empty() {
585            return Err(Error::Payment("Payment has no quotes".to_string()));
586        }
587
588        let quote_count = payment.peer_quotes.len();
589        if quote_count != CLOSE_GROUP_SIZE {
590            return Err(Error::Payment(format!(
591                "Payment must have exactly {CLOSE_GROUP_SIZE} quotes, got {quote_count}"
592            )));
593        }
594
595        let mut seen: Vec<&evmlib::EncodedPeerId> = Vec::with_capacity(quote_count);
596        for (encoded_peer_id, _) in &payment.peer_quotes {
597            if seen.contains(&encoded_peer_id) {
598                return Err(Error::Payment(format!(
599                    "Duplicate peer ID in payment quotes: {encoded_peer_id:?}"
600                )));
601            }
602            seen.push(encoded_peer_id);
603        }
604
605        Ok(())
606    }
607
608    /// Verify all quotes target the correct content address.
609    fn validate_quote_content(payment: &ProofOfPayment, xorname: &XorName) -> Result<()> {
610        for (encoded_peer_id, quote) in &payment.peer_quotes {
611            if !verify_quote_content(quote, xorname) {
612                let expected_hex = hex::encode(xorname);
613                let actual_hex = hex::encode(quote.content.0);
614                return Err(Error::Payment(format!(
615                    "Quote content address mismatch for peer {encoded_peer_id:?}: expected {expected_hex}, got {actual_hex}"
616                )));
617            }
618        }
619        Ok(())
620    }
621
622    /// Verify quote freshness by price staleness, not wall-clock time and not a
623    /// symmetric record-count delta.
624    ///
625    /// The quote price encodes the quoting node's record count via the quadratic
626    /// pricing formula. We compute the price the node would charge *now* for its
627    /// current fullness and reject the quote only if the client under-paid that
628    /// current price by more than [`QUOTE_PRICE_STALENESS_PCT_TOLERANCE`]. This:
629    ///
630    /// - removes the platform clock dependency that caused Windows/UTC false
631    ///   rejections (timestamps are deliberately unused);
632    /// - never rejects an over-payment (the previous symmetric `abs_diff` check
633    ///   rejected quotes where the node had *fewer* records than when it quoted,
634    ///   i.e. the client paid for a fuller, pricier node — nonsensical to
635    ///   reject); and
636    /// - self-scales with the pricing curve, so benign in-flight churn (a node
637    ///   storing a few replicated records between quoting and verifying) — a
638    ///   negligible price move where the curve is flat — no longer rejects an
639    ///   otherwise-valid payment. On a fresh, rapidly-filling testnet that churn
640    ///   routinely exceeded the old fixed 5-record tolerance and rejected ~100%
641    ///   of uploads via the multiplicative per-chunk effect.
642    ///
643    /// The current record count comes from the attached [`LmdbStorage`] via
644    /// `current_chunks()` — an O(1) B-tree page-header read, authoritative
645    /// regardless of which path stored the record (client PUT, replication
646    /// store, repair fetch) or removed it (prune delete). If no storage source
647    /// is available (mis-configured production startup, or a unit test that
648    /// didn't set a test override), the gate is skipped entirely rather than
649    /// rejecting every quote — see [`Self::current_records_stored`].
650    fn validate_quote_freshness(&self, payment: &ProofOfPayment) -> Result<()> {
651        let Some(current_records) = self.current_records_stored() else {
652            debug!(
653                "PaymentVerifier: no record-count source attached; skipping \
654                 quote price-staleness check"
655            );
656            return Ok(());
657        };
658
659        // The price the node would charge right now for its current fullness,
660        // and the floor a quote may not drop below (one-directional: paying at
661        // or above `current_price` is always accepted).
662        let current_price = calculate_price(usize::try_from(current_records).unwrap_or(usize::MAX));
663        let min_acceptable_price = current_price.saturating_mul(Amount::from(
664            100u64.saturating_sub(QUOTE_PRICE_STALENESS_PCT_TOLERANCE),
665        )) / Amount::from(100u64);
666
667        for (encoded_peer_id, quote) in &payment.peer_quotes {
668            if quote.price < min_acceptable_price {
669                let quoted_records = derive_records_stored_from_price(quote.price);
670                return Err(Error::Payment(format!(
671                    "Quote from peer {encoded_peer_id:?} stale: paid price encodes \
672                     {quoted_records} records but node currently holds {current_records} \
673                     (paid {}, minimum acceptable {min_acceptable_price} at \
674                     {QUOTE_PRICE_STALENESS_PCT_TOLERANCE}% under-payment tolerance)",
675                    quote.price
676                )));
677            }
678        }
679        Ok(())
680    }
681
682    /// Verify each quote's `pub_key` matches the claimed peer ID via BLAKE3.
683    fn validate_peer_bindings(payment: &ProofOfPayment) -> Result<()> {
684        for (encoded_peer_id, quote) in &payment.peer_quotes {
685            let expected_peer_id = peer_id_from_public_key_bytes(&quote.pub_key)
686                .map_err(|e| Error::Payment(format!("Invalid ML-DSA public key in quote: {e}")))?;
687
688            if expected_peer_id.as_bytes() != encoded_peer_id.as_bytes() {
689                let expected_hex = expected_peer_id.to_hex();
690                let actual_hex = hex::encode(encoded_peer_id.as_bytes());
691                return Err(Error::Payment(format!(
692                    "Quote pub_key does not belong to claimed peer {encoded_peer_id:?}: \
693                     BLAKE3(pub_key) = {expected_hex}, peer_id = {actual_hex}"
694                )));
695            }
696        }
697        Ok(())
698    }
699
700    /// Minimum number of candidate `pub_keys` (out of 16) whose derived
701    /// `PeerId` must be among the DHT's actual closest peers to the pool
702    /// midpoint address for the pool to be accepted.
703    ///
704    /// Set to a simple majority (9/16). Two nodes' views of the closest set
705    /// to a midpoint diverge on a young, high-churn, NAT-heavy network — by
706    /// more than a near-unanimous threshold tolerates — so a stricter bar
707    /// rejected honest pools whose candidates are genuinely drawn from the
708    /// midpoint's close group but don't all reappear in this storer's own
709    /// lookup. A majority absorbs that divergence while still requiring most
710    /// candidates to be real peers the live DHT lists as closest.
711    ///
712    /// Security cost: a lower threshold widens the room for the "pay-yourself"
713    /// attack — an attacker running real neighbourhood peers needs fewer of
714    /// them to clear a majority than to clear a near-unanimous bar. No theft
715    /// of funds is possible regardless (payment binds on-chain to the rewards
716    /// address); the cost is that grinding storage payments back to your own
717    /// nodes gets cheaper. Each counted candidate must still be a peer the
718    /// live DHT actually returns as closest — a fabricated off-network key
719    /// cannot satisfy this — so the floor is "run N real top-K Sybil nodes
720    /// AND grind the midpoint", just with a smaller N. Pairs with the planned
721    /// pool-midpoint consensus-anchor work, which removes the midpoint
722    /// grinding freedom that makes a low threshold dangerous.
723    const CANDIDATE_CLOSENESS_REQUIRED: usize = 9;
724
725    /// Timeout for the authoritative network lookup used by the closeness
726    /// check.
727    ///
728    /// Iterative Kademlia lookups can cascade through `MAX_ITERATIONS = 20`
729    /// rounds in saorsa-core's `find_closest_nodes_network`, and a single
730    /// unresponsive peer's dial can take 20–30s before timing out. On a
731    /// young network (e.g. fresh testnet, NAT-simulated peers in 30% of
732    /// the swarm) iterations average ~10s each — captured trace from
733    /// STG-01 EWR-3 ant-node-1 just before a pre-fix timeout:
734    ///
735    /// ```text
736    /// Iter 0: +0.0s | Iter 1: +0.2s | Iter 2: +6.6s | Iter 3: +13.1s
737    /// Iter 4: +20.9s | Iter 5: +39.8s | Iter 6: +50.8s | [60s wall]
738    /// ```
739    ///
740    /// 60s caps the lookup at ~7 iterations and rejects honest pools whose
741    /// candidates only emerge after iteration 7. 240s gives ~1.2× headroom
742    /// over the ~200s natural worst-case runtime on a 1k-node testnet.
743    ///
744    /// `DoS` amplification stays bounded at roughly one in-flight lookup
745    /// per unique `pool_hash` under typical load, via
746    /// [`closeness_pass_cache`] + [`inflight_closeness`]. The bound is
747    /// "typical" because `inflight_closeness` is an LRU and a sustained
748    /// flood of unique `pool_hash` entries can evict an in-flight slot,
749    /// at which point a second leader can race for the same pool (see
750    /// [`InflightGuard::drop`]). At steady state the pool cache and pool
751    /// signature verification gate keep this rare in practice.
752    const CLOSENESS_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(240);
753
754    /// Width of the storer's authoritative network lookup, in peers.
755    ///
756    /// The client over-queries `2 * CANDIDATES_PER_POOL = 32` peers via
757    /// `find_closest_peers(addr, 32)` (see
758    /// `ant-client/ant-core/src/data/client/merkle.rs::get_merkle_candidate_pool`)
759    /// and selects 16 valid responders by XOR distance — so truly-close
760    /// peers that are slow, NAT'd, or briefly unreachable get filtered
761    /// out and replaced by peers from positions 17–32 of the network's
762    /// actual ranking. The storer must therefore verify against the same
763    /// wider window: a pool containing peers from positions 17–32 is
764    /// honest (those peers really exist in the network's closest-32 set),
765    /// it's just that the client's quote-collection step couldn't reach
766    /// the peers at positions <17 in time.
767    ///
768    /// Empirical effect on STG-01 (1k-node testnet, 30% NAT-simulated):
769    /// widening from K=16 to K=32 dropped client-side closeness
770    /// mismatches from ~115 to ~31 per 5 min, a 73% reduction.
771    ///
772    /// Performance note: `count` does not just truncate the lookup —
773    /// `find_closest_nodes_network` keeps iterating until either
774    /// `MAX_ITERATIONS` is reached or `best_nodes.len() >= count`. K=32
775    /// can therefore extend lookups by a few iterations on sparse
776    /// networks vs K=16, which reinforces (rather than undermines) the
777    /// timeout bump above.
778    ///
779    /// Security: the pay-yourself attack still requires the attacker's
780    /// fabricated `PeerId`s to land in the storer's authoritative top-K, so
781    /// the dominant cost is Sybil-grinding midpoint addresses or running real
782    /// nodes near the target. The leniency for honest divergence comes from
783    /// the `CANDIDATE_CLOSENESS_REQUIRED` majority threshold, not from this
784    /// window; widening the window further was measured as too heavy on the
785    /// lookup path.
786    const CLOSENESS_LOOKUP_WIDTH: usize = 2 * evmlib::merkle_payments::CANDIDATES_PER_POOL;
787
788    /// Maximum waiter → leader retries when the leader's future was cancelled
789    /// or panicked before publishing a result. Beyond this the waiter returns
790    /// a visible error rather than spinning indefinitely through a
791    /// cancellation cascade.
792    ///
793    /// Worst-case waiter wall-clock is `(MAX_LEADER_RETRIES + 1) *
794    /// CLOSENESS_LOOKUP_TIMEOUT` (one wait per attempt). Kept low (1)
795    /// because the only realistic trigger is leader future-cancellation,
796    /// which should be extraordinarily rare; under sustained adversarial
797    /// cancellation a higher cap doesn't add resilience, it just hides
798    /// the symptom. With `CLOSENESS_LOOKUP_TIMEOUT = 240s` this caps a
799    /// single user-visible verification at ~8 min worst case (vs ~20 min
800    /// at the previous value of 4).
801    const MAX_LEADER_RETRIES: usize = 1;
802
803    /// Compute the storer's authoritative-lookup width for a candidate pool.
804    ///
805    /// Returns `max(CLOSENESS_LOOKUP_WIDTH, pool_len)`: matches the client's
806    /// over-query width today, and scales with the pool if a future protocol
807    /// bump grows pool size beyond `CLOSENESS_LOOKUP_WIDTH`. Truncating to
808    /// `CLOSENESS_LOOKUP_WIDTH` in that future case would re-open the
809    /// K-too-small failure mode (the storer would reject honest pools whose
810    /// candidates legitimately span a wider XOR range than the storer
811    /// fetched). Pinned by `closeness_lookup_count_uses_max_of_width_and_pool_len`.
812    const fn closeness_lookup_count(pool_len: usize) -> usize {
813        if Self::CLOSENESS_LOOKUP_WIDTH > pool_len {
814            Self::CLOSENESS_LOOKUP_WIDTH
815        } else {
816            pool_len
817        }
818    }
819
820    /// Verify that the candidate pool's `pub_keys` correspond to peers that
821    /// are actually XOR-closest to the pool midpoint address, by querying
822    /// the DHT for its closest peers to that address and requiring that a
823    /// majority of the candidates match.
824    ///
825    /// **What this blocks**: the "pay yourself" attack. Candidate signatures
826    /// only cover `(price, reward_address, timestamp)` and the `pub_key` bytes —
827    /// nothing ties a candidate to a network-registered identity or to the
828    /// pool neighbourhood. Without this check an attacker can generate 16
829    /// ML-DSA keypairs locally, point all 16 `reward_address` fields at a
830    /// single attacker-controlled wallet, submit the merkle payment, and drain
831    /// their own payment back out.
832    ///
833    /// **How it blocks**: each candidate's `PeerId = BLAKE3(pub_key)`; the DHT
834    /// is the authoritative source of "which peers exist at this XOR
835    /// coordinate". If the attacker's 16 fabricated `PeerId`s are not among
836    /// the peers the network actually lists as closest to the pool address,
837    /// the pool is forged.
838    ///
839    /// **Scope**: a `MerklePaymentProof` carries exactly one `winner_pool`
840    /// (the pool the smart contract selected for the batch). Every storing
841    /// node that receives the proof independently re-runs this check against
842    /// that same pool, so a forged pool is rejected at every node it
843    /// reaches.
844    ///
845    /// **Known limitation — Sybil-grinding**: `midpoint_proof.address()` is a
846    /// BLAKE3 hash of attacker-controllable inputs (leaf bytes, tree root,
847    /// timestamp). A determined attacker who *also* runs Sybil DHT nodes can
848    /// grind the midpoint until it lands in a region where a majority of
849    /// their Sybil keys are the true network-closest — at which point this check
850    /// passes for the attacker. Closing that gap requires binding the
851    /// midpoint to an attacker-uncontrolled value (e.g. a block hash at
852    /// payment time or an on-chain VRF) or a Sybil-resistant identity
853    /// layer. This defence raises the attack cost from "free" to "run N
854    /// Sybil nodes AND grind", which is a meaningful but not complete
855    /// improvement.
856    async fn verify_merkle_candidate_closeness(
857        &self,
858        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
859        pool_hash: PoolHash,
860    ) -> Result<()> {
861        // Fast path: this node already verified this pool successfully.
862        // A batch of 256 chunks shares one winner_pool, so without this cache
863        // we'd pay a Kademlia lookup per chunk.
864        if self.closeness_pass_cache.lock().get(&pool_hash).is_some() {
865            return Ok(());
866        }
867
868        // Single-flight: on each attempt, either claim leadership by
869        // inserting a fresh `ClosenessSlot`, or wait on an existing leader
870        // and read its published result. The leader holds an `Arc` to the
871        // slot independent of the LruCache so waiters are still woken if
872        // eviction pressure kicked the cache entry.
873        //
874        // The `notified_owned()` future snapshots the `notify_waiters`
875        // counter at the moment of construction (while we hold the lock),
876        // which makes the subsequent `.await` race-free: if the leader
877        // calls `notify_waiters` between our construction and our poll, the
878        // counter has advanced and the future resolves immediately on first
879        // poll.
880        //
881        // Bounded retry: if we're a waiter and the leader gets cancelled or
882        // panics (slot.result.get() == None after wake-up), we loop back to
883        // claim leadership. `MAX_LEADER_RETRIES` bounds the attempts so
884        // adversarial cancellation cascades cannot spin this indefinitely.
885        for attempt in 0..=Self::MAX_LEADER_RETRIES {
886            // Release the mutex guard explicitly before any await below.
887            // Clippy wants `if let ... else` written as `map_or_else`, but
888            // any such rewrite re-borrows the locked `inflight` inside the
889            // closure and fails the borrow checker — so the lint is
890            // silenced here.
891            #[allow(clippy::option_if_let_else)]
892            let (waiter_slot, leader_slot) = {
893                let mut inflight = self.inflight_closeness.lock();
894                let chosen = if let Some(existing) = inflight.get(&pool_hash) {
895                    (Some(Arc::clone(existing)), None)
896                } else {
897                    let slot = Arc::new(ClosenessSlot::new());
898                    inflight.put(pool_hash, Arc::clone(&slot));
899                    (None, Some(slot))
900                };
901                drop(inflight);
902                chosen
903            };
904
905            if let Some(slot) = waiter_slot {
906                // Build the owned-notified future BEFORE awaiting, so it
907                // snapshots the `notify_waiters` counter now. The slot
908                // already existed when we locked, so the leader is either
909                // running or finished; in both cases the snapshot + counter
910                // check ensures we wake up correctly.
911                let notified = slot.notified_owned();
912                notified.await;
913
914                // Leader published a result — use it directly.
915                if let Some(result) = slot.result.get() {
916                    return result.clone().map_err(Error::Payment);
917                }
918                // Leader disappeared without publishing (panic or
919                // cancellation). Slot was cleared by the leader's drop
920                // guard; loop to become the new leader — unless we've
921                // hit the retry bound (see MAX_LEADER_RETRIES).
922                if attempt == Self::MAX_LEADER_RETRIES {
923                    return Err(Error::Payment(
924                        "Merkle candidate pool rejected: closeness leader \
925                         repeatedly failed to publish a result (likely \
926                         repeated cancellation or panic)."
927                            .into(),
928                    ));
929                }
930                continue;
931            }
932
933            // Leader path. Drop guard clears the slot and wakes waiters on
934            // every exit (success, failure, panic, cancellation).
935            let Some(slot) = leader_slot else {
936                // Unreachable by construction.
937                return Err(Error::Payment(
938                    "internal error: neither leader nor waiter in closeness check".into(),
939                ));
940            };
941            let guard = InflightGuard {
942                slot_cache: &self.inflight_closeness,
943                pool_hash,
944                slot,
945            };
946
947            let result = self.verify_merkle_candidate_closeness_inner(pool).await;
948            guard.publish(&result);
949            if result.is_ok() {
950                self.closeness_pass_cache.lock().put(pool_hash, ());
951            }
952            return result;
953        }
954        // Unreachable: the for-loop body always either `return`s or `continue`s,
955        // and the waiter branch's `continue` only runs when `attempt <
956        // Self::MAX_LEADER_RETRIES`. The last iteration's waiter branch returns
957        // via the retry-bound check; the leader branch always returns.
958        Err(Error::Payment(
959            "internal error: closeness retry loop exited without returning".into(),
960        ))
961    }
962
963    /// Inner closeness check: the actual DHT lookup + set-membership test.
964    /// Wrapped by [`verify_merkle_candidate_closeness`] with a pass-cache and
965    /// single-flight guard so a batch of chunks and a storm of forged PUTs
966    /// don't multiply the lookup cost.
967    /// Derive each candidate's `PeerId` from its `pub_key` and reject the
968    /// pool if any `PeerId` appears more than once.
969    ///
970    /// This is a pure-validation pre-check, runnable without a `P2PNode`:
971    /// catches the case where one real peer's `pub_key` is repeated to
972    /// inflate the closeness match count, without paying for a Kademlia
973    /// lookup. An honest pool has [`evmlib::merkle_payments::CANDIDATES_PER_POOL`]
974    /// distinct candidate `pub_keys` by construction.
975    fn derive_distinct_candidate_peer_ids(
976        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
977    ) -> Result<Vec<PeerId>> {
978        let mut candidate_peer_ids = Vec::with_capacity(pool.candidate_nodes.len());
979        let mut seen = std::collections::HashSet::with_capacity(pool.candidate_nodes.len());
980        for candidate in &pool.candidate_nodes {
981            let pid = peer_id_from_public_key_bytes(&candidate.pub_key).map_err(|e| {
982                Error::Payment(format!(
983                    "Invalid ML-DSA public key in merkle candidate: {e}"
984                ))
985            })?;
986            if !seen.insert(pid) {
987                return Err(Error::Payment(
988                    "Merkle candidate pool rejected: duplicate candidate PeerId. An \
989                     honest pool has 16 distinct candidate pub_keys; duplicates would \
990                     let a single real peer satisfy the closeness threshold by being \
991                     counted multiple times."
992                        .into(),
993                ));
994            }
995            candidate_peer_ids.push(pid);
996        }
997        Ok(candidate_peer_ids)
998    }
999
1000    /// Pure-logic closeness check: given the pool's candidate peer IDs and
1001    /// the storer's authoritative network view (closest peers to the pool
1002    /// midpoint), decide whether the pool passes the
1003    /// `CANDIDATE_CLOSENESS_REQUIRED`-of-N threshold.
1004    ///
1005    /// A candidate counts only if its `PeerId` is one of the peers the
1006    /// storer's own network lookup returned (exact set membership). This is
1007    /// the property that makes the gate meaningful: a passing candidate must
1008    /// be a real, reachable peer the live DHT actually routes to and lists
1009    /// among the closest — it cannot be a key fabricated off-network. The
1010    /// leniency in this check is purely the lowered threshold (a majority
1011    /// rather than near-unanimity), which tolerates the closest-set
1012    /// divergence between two nodes' views without admitting fabricated keys.
1013    ///
1014    /// Extracted from `verify_merkle_candidate_closeness_inner` so tests
1015    /// can exercise the matching logic without standing up a real DHT.
1016    /// Mirrors the runtime path exactly: same sparse-network short-circuit,
1017    /// same set-membership check, same error strings.
1018    fn check_closeness_match(
1019        candidate_peer_ids: &[PeerId],
1020        network_peer_ids: &[PeerId],
1021        pool_address: &[u8; 32],
1022    ) -> Result<()> {
1023        // Sparse-network short-circuit: if the DHT itself returned fewer
1024        // peers than the closeness threshold, the proof can never pass —
1025        // not because the candidates are forged, but because we don't
1026        // have an authoritative view to compare against. Surface this
1027        // distinct cause so operators can tell "retry once the network
1028        // settles" apart from "this peer sent a forged pool".
1029        if network_peer_ids.len() < Self::CANDIDATE_CLOSENESS_REQUIRED {
1030            debug!(
1031                "Merkle closeness deferred: network lookup returned {} peers \
1032                 for pool midpoint {} (need at least {} to verify)",
1033                network_peer_ids.len(),
1034                hex::encode(pool_address),
1035                Self::CANDIDATE_CLOSENESS_REQUIRED,
1036            );
1037            return Err(Error::Payment(format!(
1038                "Merkle candidate pool rejected: authoritative DHT lookup returned \
1039                 only {} peers, less than the {} required to verify candidate \
1040                 closeness. Retry once the routing table populates further.",
1041                network_peer_ids.len(),
1042                Self::CANDIDATE_CLOSENESS_REQUIRED,
1043            )));
1044        }
1045
1046        // Exact-match membership against the returned closest peers.
1047        // Candidate `PeerId`s are deduplicated upstream, so each match
1048        // corresponds to a distinct peer.
1049        let network_set: std::collections::HashSet<PeerId> =
1050            network_peer_ids.iter().copied().collect();
1051        let matched = candidate_peer_ids
1052            .iter()
1053            .filter(|pid| network_set.contains(pid))
1054            .count();
1055
1056        if matched < Self::CANDIDATE_CLOSENESS_REQUIRED {
1057            debug!(
1058                "Merkle closeness rejected: {matched}/{} candidates match the DHT's closest peers \
1059                 for pool midpoint {} (required: {}, network returned {} peers)",
1060                candidate_peer_ids.len(),
1061                hex::encode(pool_address),
1062                Self::CANDIDATE_CLOSENESS_REQUIRED,
1063                network_peer_ids.len(),
1064            );
1065            return Err(Error::Payment(
1066                "Merkle candidate pool rejected: candidate pub_keys do not match the \
1067                 network's closest peers to the pool midpoint address. Pools must be \
1068                 collected from the pool-address close group, not fabricated off-network."
1069                    .into(),
1070            ));
1071        }
1072
1073        debug!(
1074            "Merkle closeness passed: {matched}/{} candidates matched the DHT's closest peers \
1075             for pool midpoint {}",
1076            candidate_peer_ids.len(),
1077            hex::encode(pool_address),
1078        );
1079        Ok(())
1080    }
1081
1082    #[allow(clippy::too_many_lines)]
1083    async fn verify_merkle_candidate_closeness_inner(
1084        &self,
1085        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
1086    ) -> Result<()> {
1087        // Pre-check: catch malformed/hostile pools (duplicate candidate
1088        // PeerIds) before paying for the Kademlia lookup. Runs in unit
1089        // tests without a P2PNode too.
1090        let candidate_peer_ids = Self::derive_distinct_candidate_peer_ids(pool)?;
1091
1092        // Release the RwLock guard before any await to avoid holding it
1093        // across an iterative Kademlia lookup.
1094        let attached = self.p2p_node.read().as_ref().map(Arc::clone);
1095        let Some(p2p_node) = attached else {
1096            // Production must call attach_p2p_node at startup. Fail CLOSED
1097            // to avoid silently disabling the defence if a startup path
1098            // regresses and loses the attach call. Unit-test builds that
1099            // construct a PaymentVerifier directly without exercising merkle
1100            // verification are opted-in via `test-utils` to fall back to
1101            // fail-open.
1102            #[cfg(any(test, feature = "test-utils"))]
1103            {
1104                crate::logging::warn!(
1105                    "PaymentVerifier: no P2PNode attached; merkle pay-yourself \
1106                     defence SKIPPED (test build). Production startup MUST call \
1107                     PaymentVerifier::attach_p2p_node."
1108                );
1109                return Ok(());
1110            }
1111            #[cfg(not(any(test, feature = "test-utils")))]
1112            {
1113                crate::logging::error!(
1114                    "PaymentVerifier: no P2PNode attached; rejecting merkle \
1115                     payment. This is a node-startup bug — \
1116                     PaymentVerifier::attach_p2p_node must be called before \
1117                     any PUT handler runs."
1118                );
1119                return Err(Error::Payment(
1120                    "Merkle candidate pool rejected: verifier is not wired to \
1121                     the P2P layer; cannot verify candidate closeness."
1122                        .into(),
1123                ));
1124            }
1125        };
1126
1127        let pool_address = pool.midpoint_proof.address();
1128        // Match the client's over-query width. The client's
1129        // `get_merkle_candidate_pool` queries 2 × `CANDIDATES_PER_POOL` peers
1130        // and picks the 16 closest *valid responders* — so legitimate pools
1131        // routinely include peers from positions 17–32 of the network's true
1132        // ranking when the closer peers are slow or NAT-stuck. The storer
1133        // must look at the same window or it will reject honest pools with
1134        // no security benefit.
1135        //
1136        // `pool.candidate_nodes` is currently a fixed-size array of length
1137        // `CANDIDATES_PER_POOL` (= 16), so `.max(...)` always evaluates to
1138        // `CLOSENESS_LOOKUP_WIDTH` today. The compile-time
1139        // `const _: () = assert!(WIDTH >= CANDIDATES_PER_POOL)` in the test
1140        // module pins that invariant. The `.max(...)` form is belt-and-braces
1141        // for a hypothetical future protocol that grows pool size to a
1142        // `Vec`-typed candidate set: the storer would scale its lookup with
1143        // the pool rather than truncating, which would otherwise re-open the
1144        // K-too-small failure mode.
1145        let lookup_count = Self::closeness_lookup_count(pool.candidate_nodes.len());
1146        let network_lookup = p2p_node
1147            .dht_manager()
1148            .find_closest_nodes_network(&pool_address.0, lookup_count);
1149        let network_peers =
1150            match tokio::time::timeout(Self::CLOSENESS_LOOKUP_TIMEOUT, network_lookup).await {
1151                Ok(Ok(peers)) => peers,
1152                Ok(Err(e)) => {
1153                    debug!(
1154                        "Merkle closeness network-lookup failed for pool midpoint {}: {e}",
1155                        hex::encode(pool_address.0),
1156                    );
1157                    return Err(Error::Payment(
1158                        "Merkle candidate pool rejected: could not verify candidate \
1159                     closeness against the authoritative network view."
1160                            .into(),
1161                    ));
1162                }
1163                Err(_) => {
1164                    debug!(
1165                        "Merkle closeness network-lookup timeout ({:?}) for pool midpoint {}",
1166                        Self::CLOSENESS_LOOKUP_TIMEOUT,
1167                        hex::encode(pool_address.0),
1168                    );
1169                    return Err(Error::Payment(
1170                        "Merkle candidate pool rejected: authoritative network lookup \
1171                     timed out. Retry once the network lookup completes."
1172                            .into(),
1173                    ));
1174                }
1175            };
1176
1177        let network_peer_ids: Vec<PeerId> = network_peers.iter().map(|n| n.peer_id).collect();
1178        Self::check_closeness_match(&candidate_peer_ids, &network_peer_ids, &pool_address.0)
1179    }
1180
1181    /// Verify a merkle batch payment proof.
1182    ///
1183    /// This verification flow:
1184    /// 1. Deserialize the `MerklePaymentProof`
1185    /// 2. Check pool cache for previously verified pool hash
1186    /// 3. If not cached, query on-chain for payment info
1187    /// 4. Validate the proof against on-chain data
1188    /// 5. Cache the pool hash for subsequent chunk verifications in the same batch
1189    #[allow(clippy::too_many_lines)]
1190    async fn verify_merkle_payment(&self, xorname: &XorName, proof_bytes: &[u8]) -> Result<()> {
1191        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
1192            debug!("Verifying merkle payment for {}", hex::encode(xorname));
1193        }
1194
1195        // Deserialize the merkle proof
1196        let merkle_proof = deserialize_merkle_proof(proof_bytes)
1197            .map_err(|e| Error::Payment(format!("Failed to deserialize merkle proof: {e}")))?;
1198
1199        // Verify the address in the proof matches the xorname being stored
1200        if merkle_proof.address.0 != *xorname {
1201            let proof_hex = hex::encode(merkle_proof.address.0);
1202            let store_hex = hex::encode(xorname);
1203            return Err(Error::Payment(format!(
1204                "Merkle proof address mismatch: proof is for {proof_hex}, but storing {store_hex}"
1205            )));
1206        }
1207
1208        let pool_hash = merkle_proof.winner_pool_hash();
1209
1210        // Run cheap local checks BEFORE expensive on-chain queries.
1211        // This prevents DoS via garbage proofs that trigger RPC lookups.
1212        for candidate in &merkle_proof.winner_pool.candidate_nodes {
1213            if !crate::payment::verify_merkle_candidate_signature(candidate) {
1214                return Err(Error::Payment(format!(
1215                    "Invalid ML-DSA-65 signature on merkle candidate node (reward: {})",
1216                    candidate.reward_address
1217                )));
1218            }
1219        }
1220
1221        // Pay-yourself defence: the candidate pub_keys must map to peers the
1222        // live DHT actually considers closest to the pool midpoint. Without
1223        // this, an attacker can point all 16 reward_address fields at a
1224        // self-owned wallet and drain their own payment. Every storing node
1225        // runs this check against the single `winner_pool` in the proof, so a
1226        // forged pool is rejected everywhere it lands. The pass cache and
1227        // single-flight keyed on pool_hash collapse the Kademlia lookup cost
1228        // within a batch and across concurrent PUTs for the same pool.
1229        self.verify_merkle_candidate_closeness(&merkle_proof.winner_pool, pool_hash)
1230            .await?;
1231
1232        // Check pool cache first
1233        let cached_info = {
1234            let mut pool_cache = self.pool_cache.lock();
1235            pool_cache.get(&pool_hash).cloned()
1236        };
1237
1238        let payment_info = if let Some(info) = cached_info {
1239            debug!("Pool cache hit for hash {}", hex::encode(pool_hash));
1240            info
1241        } else {
1242            // Query on-chain for completed merkle payment
1243            let info =
1244                payment_vault::get_completed_merkle_payment(&self.config.evm.network, pool_hash)
1245                    .await
1246                    .map_err(|e| {
1247                        let pool_hex = hex::encode(pool_hash);
1248                        Error::Payment(format!(
1249                            "Failed to query merkle payment info for pool {pool_hex}: {e}"
1250                        ))
1251                    })?;
1252
1253            let paid_node_addresses: Vec<_> = info
1254                .paidNodeAddresses
1255                .iter()
1256                .map(|pna| (pna.rewardsAddress, usize::from(pna.poolIndex), pna.amount))
1257                .collect();
1258
1259            let on_chain_info = OnChainPaymentInfo {
1260                depth: info.depth,
1261                merkle_payment_timestamp: info.merklePaymentTimestamp,
1262                paid_node_addresses,
1263            };
1264
1265            // Cache the pool info for subsequent chunks in the same batch
1266            {
1267                let mut pool_cache = self.pool_cache.lock();
1268                pool_cache.put(pool_hash, on_chain_info.clone());
1269            }
1270
1271            debug!(
1272                "Queried on-chain merkle payment info for pool {}: depth={}, timestamp={}, paid_nodes={}",
1273                hex::encode(pool_hash),
1274                on_chain_info.depth,
1275                on_chain_info.merkle_payment_timestamp,
1276                on_chain_info.paid_node_addresses.len()
1277            );
1278
1279            on_chain_info
1280        };
1281
1282        // Verify timestamp consistency (signatures already checked above before RPC).
1283        for candidate in &merkle_proof.winner_pool.candidate_nodes {
1284            if candidate.merkle_payment_timestamp != payment_info.merkle_payment_timestamp {
1285                return Err(Error::Payment(format!(
1286                    "Candidate timestamp mismatch: expected {}, got {} (reward: {})",
1287                    payment_info.merkle_payment_timestamp,
1288                    candidate.merkle_payment_timestamp,
1289                    candidate.reward_address
1290                )));
1291            }
1292        }
1293
1294        // Get the root from the winner pool's midpoint proof
1295        let smart_contract_root = merkle_proof.winner_pool.midpoint_proof.root();
1296
1297        // Verify the cryptographic merkle proofs (address belongs to tree,
1298        // midpoint belongs to tree, roots match, timestamps valid).
1299        evmlib::merkle_payments::verify_merkle_proof(
1300            &merkle_proof.address,
1301            &merkle_proof.data_proof,
1302            &merkle_proof.winner_pool.midpoint_proof,
1303            payment_info.depth,
1304            smart_contract_root,
1305            payment_info.merkle_payment_timestamp,
1306        )
1307        .map_err(|e| {
1308            let xorname_hex = hex::encode(xorname);
1309            Error::Payment(format!(
1310                "Merkle proof verification failed for {xorname_hex}: {e}"
1311            ))
1312        })?;
1313
1314        // Verify paid node count matches depth
1315        let expected_depth = payment_info.depth as usize;
1316        let actual_paid = payment_info.paid_node_addresses.len();
1317        if actual_paid != expected_depth {
1318            return Err(Error::Payment(format!(
1319                "Wrong number of paid nodes: expected {expected_depth}, got {actual_paid}"
1320            )));
1321        }
1322
1323        // Compute expected per-node payment using the contract formula:
1324        // totalAmount = median16(candidate_prices) * (1 << depth)
1325        // amountPerNode = totalAmount / depth
1326        let expected_per_node = if payment_info.depth > 0 {
1327            let mut candidate_prices: Vec<Amount> = merkle_proof
1328                .winner_pool
1329                .candidate_nodes
1330                .iter()
1331                .map(|c| c.price)
1332                .collect();
1333            candidate_prices.sort_unstable(); // ascending
1334                                              // Upper median (index 8 of 16) — matches Solidity's median16 (k = 8)
1335            let median_price = *candidate_prices
1336                .get(candidate_prices.len() / 2)
1337                .ok_or_else(|| Error::Payment("empty candidate pool in merkle proof".into()))?;
1338            let shift = u32::from(payment_info.depth);
1339            let multiplier = 1u64
1340                .checked_shl(shift)
1341                .ok_or_else(|| Error::Payment("merkle proof depth too large".into()))?;
1342            let total_amount = median_price * Amount::from(multiplier);
1343            total_amount / Amount::from(u64::from(payment_info.depth))
1344        } else {
1345            Amount::ZERO
1346        };
1347
1348        // Verify paid node indices, addresses, and amounts against the candidate pool.
1349        //
1350        // Each paid node must:
1351        // 1. Have a valid index within the candidate pool
1352        // 2. Match the expected reward address at that index
1353        // 3. Have been paid at least the expected per-node amount from the
1354        //    contract formula: median16(prices) * 2^depth / depth
1355        //
1356        // Note: unlike single-node payments, merkle proofs are NOT bound to a
1357        // specific storing node. The contract pays `depth` random nodes from the
1358        // winner pool; the storing node is whichever close-group peer the client
1359        // routes the chunk to. There is no local-recipient check here because
1360        // any node that can verify the merkle proof is allowed to store the chunk.
1361        // Replay protection comes from the per-address proof binding (each proof
1362        // is for a specific XorName in the paid tree).
1363        for (addr, idx, paid_amount) in &payment_info.paid_node_addresses {
1364            let node = merkle_proof
1365                .winner_pool
1366                .candidate_nodes
1367                .get(*idx)
1368                .ok_or_else(|| {
1369                    Error::Payment(format!(
1370                        "Paid node index {idx} out of bounds for pool size {}",
1371                        merkle_proof.winner_pool.candidate_nodes.len()
1372                    ))
1373                })?;
1374            if node.reward_address != *addr {
1375                return Err(Error::Payment(format!(
1376                    "Paid node address mismatch at index {idx}: expected {addr}, got {}",
1377                    node.reward_address
1378                )));
1379            }
1380            if *paid_amount < expected_per_node {
1381                return Err(Error::Payment(format!(
1382                    "Underpayment for node at index {idx}: paid {paid_amount}, \
1383                     expected at least {expected_per_node} \
1384                     (median16 formula, depth={})",
1385                    payment_info.depth
1386                )));
1387            }
1388        }
1389
1390        if crate::logging::enabled!(crate::logging::Level::INFO) {
1391            info!(
1392                "Merkle payment verified for {} (pool: {})",
1393                hex::encode(xorname),
1394                hex::encode(pool_hash)
1395            );
1396        }
1397
1398        Ok(())
1399    }
1400
1401    /// Verify this node is among the paid recipients.
1402    fn validate_local_recipient(&self, payment: &ProofOfPayment) -> Result<()> {
1403        let local_addr = &self.config.local_rewards_address;
1404        let is_recipient = payment
1405            .peer_quotes
1406            .iter()
1407            .any(|(_, quote)| quote.rewards_address == *local_addr);
1408        if !is_recipient {
1409            return Err(Error::Payment(
1410                "Payment proof does not include this node as a recipient".to_string(),
1411            ));
1412        }
1413        Ok(())
1414    }
1415}
1416
1417#[cfg(test)]
1418#[allow(clippy::expect_used, clippy::panic)]
1419mod tests {
1420    use super::*;
1421    use evmlib::merkle_payments::MerklePaymentCandidatePool;
1422    use std::time::SystemTime;
1423
1424    /// Create a verifier for unit tests. EVM is always on, but tests can
1425    /// pre-populate the cache to bypass on-chain verification.
1426    fn create_test_verifier() -> PaymentVerifier {
1427        let config = PaymentVerifierConfig {
1428            evm: EvmVerifierConfig::default(),
1429            cache_capacity: 100,
1430            local_rewards_address: RewardsAddress::new([1u8; 20]),
1431        };
1432        PaymentVerifier::new(config)
1433    }
1434
1435    #[test]
1436    fn test_payment_required_for_new_data() {
1437        let verifier = create_test_verifier();
1438        let xorname = [1u8; 32];
1439
1440        // All uncached data requires payment
1441        let status = verifier.check_payment_required(&xorname);
1442        assert_eq!(status, PaymentStatus::PaymentRequired);
1443    }
1444
1445    #[test]
1446    fn test_cache_hit() {
1447        let verifier = create_test_verifier();
1448        let xorname = [1u8; 32];
1449
1450        // Manually add to cache
1451        verifier.cache.insert(xorname);
1452
1453        // Should return CachedAsVerified
1454        let status = verifier.check_payment_required(&xorname);
1455        assert_eq!(status, PaymentStatus::CachedAsVerified);
1456    }
1457
1458    #[tokio::test]
1459    async fn test_verify_payment_without_proof_rejected() {
1460        let verifier = create_test_verifier();
1461        let xorname = [1u8; 32];
1462
1463        // No proof provided => should return an error (EVM is always on)
1464        let result = verifier.verify_payment(&xorname, None).await;
1465        assert!(
1466            result.is_err(),
1467            "Expected Err without proof, got: {result:?}"
1468        );
1469    }
1470
1471    #[tokio::test]
1472    async fn test_verify_payment_cached() {
1473        let verifier = create_test_verifier();
1474        let xorname = [1u8; 32];
1475
1476        // Add to cache — simulates previously-paid data
1477        verifier.cache.insert(xorname);
1478
1479        // Should succeed without payment (cached)
1480        let result = verifier.verify_payment(&xorname, None).await;
1481        assert!(result.is_ok());
1482        assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified);
1483    }
1484
1485    #[test]
1486    fn test_payment_status_can_store() {
1487        assert!(PaymentStatus::CachedAsVerified.can_store());
1488        assert!(PaymentStatus::PaymentVerified.can_store());
1489        assert!(!PaymentStatus::PaymentRequired.can_store());
1490    }
1491
1492    #[test]
1493    fn test_payment_status_is_cached() {
1494        assert!(PaymentStatus::CachedAsVerified.is_cached());
1495        assert!(!PaymentStatus::PaymentVerified.is_cached());
1496        assert!(!PaymentStatus::PaymentRequired.is_cached());
1497    }
1498
1499    #[tokio::test]
1500    async fn test_cache_preload_bypasses_evm() {
1501        let verifier = create_test_verifier();
1502        let xorname = [42u8; 32];
1503
1504        // Not yet cached — should require payment
1505        assert_eq!(
1506            verifier.check_payment_required(&xorname),
1507            PaymentStatus::PaymentRequired
1508        );
1509
1510        // Pre-populate cache (simulates a previous successful payment)
1511        verifier.cache.insert(xorname);
1512
1513        // Now the xorname should be cached
1514        assert_eq!(
1515            verifier.check_payment_required(&xorname),
1516            PaymentStatus::CachedAsVerified
1517        );
1518    }
1519
1520    #[tokio::test]
1521    async fn test_proof_too_small() {
1522        let verifier = create_test_verifier();
1523        let xorname = [1u8; 32];
1524
1525        // Proof smaller than MIN_PAYMENT_PROOF_SIZE_BYTES
1526        let small_proof = vec![0u8; MIN_PAYMENT_PROOF_SIZE_BYTES - 1];
1527        let result = verifier.verify_payment(&xorname, Some(&small_proof)).await;
1528        assert!(result.is_err());
1529        let err_msg = format!("{}", result.expect_err("should fail"));
1530        assert!(
1531            err_msg.contains("too small"),
1532            "Error should mention 'too small': {err_msg}"
1533        );
1534    }
1535
1536    #[tokio::test]
1537    async fn test_proof_too_large() {
1538        let verifier = create_test_verifier();
1539        let xorname = [2u8; 32];
1540
1541        // Proof larger than MAX_PAYMENT_PROOF_SIZE_BYTES
1542        let large_proof = vec![0u8; MAX_PAYMENT_PROOF_SIZE_BYTES + 1];
1543        let result = verifier.verify_payment(&xorname, Some(&large_proof)).await;
1544        assert!(result.is_err());
1545        let err_msg = format!("{}", result.expect_err("should fail"));
1546        assert!(
1547            err_msg.contains("too large"),
1548            "Error should mention 'too large': {err_msg}"
1549        );
1550    }
1551
1552    #[tokio::test]
1553    async fn test_proof_at_min_boundary_unknown_tag() {
1554        let verifier = create_test_verifier();
1555        let xorname = [3u8; 32];
1556
1557        // Exactly MIN_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected
1558        let boundary_proof = vec![0xFFu8; MIN_PAYMENT_PROOF_SIZE_BYTES];
1559        let result = verifier
1560            .verify_payment(&xorname, Some(&boundary_proof))
1561            .await;
1562        assert!(result.is_err());
1563        let err_msg = format!("{}", result.expect_err("should fail"));
1564        assert!(
1565            err_msg.contains("Unknown payment proof type tag"),
1566            "Error should mention unknown tag: {err_msg}"
1567        );
1568    }
1569
1570    #[tokio::test]
1571    async fn test_proof_at_max_boundary_unknown_tag() {
1572        let verifier = create_test_verifier();
1573        let xorname = [4u8; 32];
1574
1575        // Exactly MAX_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected
1576        let boundary_proof = vec![0xFFu8; MAX_PAYMENT_PROOF_SIZE_BYTES];
1577        let result = verifier
1578            .verify_payment(&xorname, Some(&boundary_proof))
1579            .await;
1580        assert!(result.is_err());
1581        let err_msg = format!("{}", result.expect_err("should fail"));
1582        assert!(
1583            err_msg.contains("Unknown payment proof type tag"),
1584            "Error should mention unknown tag: {err_msg}"
1585        );
1586    }
1587
1588    #[tokio::test]
1589    async fn test_malformed_single_node_proof() {
1590        let verifier = create_test_verifier();
1591        let xorname = [5u8; 32];
1592
1593        // Valid tag (0x01) but garbage payload — should fail deserialization
1594        let mut garbage = vec![crate::ant_protocol::PROOF_TAG_SINGLE_NODE];
1595        garbage.extend_from_slice(&[0xAB; 63]);
1596        let result = verifier.verify_payment(&xorname, Some(&garbage)).await;
1597        assert!(result.is_err());
1598        let err_msg = format!("{}", result.expect_err("should fail"));
1599        assert!(
1600            err_msg.contains("deserialize") || err_msg.contains("Failed"),
1601            "Error should mention deserialization failure: {err_msg}"
1602        );
1603    }
1604
1605    #[test]
1606    fn test_cache_len_getter() {
1607        let verifier = create_test_verifier();
1608        assert_eq!(verifier.cache_len(), 0);
1609
1610        verifier.cache.insert([10u8; 32]);
1611        assert_eq!(verifier.cache_len(), 1);
1612
1613        verifier.cache.insert([20u8; 32]);
1614        assert_eq!(verifier.cache_len(), 2);
1615    }
1616
1617    #[test]
1618    fn test_cache_stats_after_operations() {
1619        let verifier = create_test_verifier();
1620        let xorname = [7u8; 32];
1621
1622        // Miss
1623        verifier.check_payment_required(&xorname);
1624        let stats = verifier.cache_stats();
1625        assert_eq!(stats.misses, 1);
1626        assert_eq!(stats.hits, 0);
1627
1628        // Insert and hit
1629        verifier.cache.insert(xorname);
1630        verifier.check_payment_required(&xorname);
1631        let stats = verifier.cache_stats();
1632        assert_eq!(stats.hits, 1);
1633        assert_eq!(stats.misses, 1);
1634        assert_eq!(stats.additions, 1);
1635    }
1636
1637    #[tokio::test]
1638    async fn test_concurrent_cache_lookups() {
1639        let verifier = std::sync::Arc::new(create_test_verifier());
1640
1641        // Pre-populate cache for all 10 xornames
1642        for i in 0..10u8 {
1643            verifier.cache.insert([i; 32]);
1644        }
1645
1646        let mut handles = Vec::new();
1647        for i in 0..10u8 {
1648            let v = verifier.clone();
1649            handles.push(tokio::spawn(async move {
1650                let xorname = [i; 32];
1651                v.verify_payment(&xorname, None).await
1652            }));
1653        }
1654
1655        for handle in handles {
1656            let result = handle.await.expect("task panicked");
1657            assert!(result.is_ok());
1658            assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified);
1659        }
1660
1661        assert_eq!(verifier.cache_len(), 10);
1662    }
1663
1664    #[test]
1665    fn test_default_evm_config() {
1666        let _config = EvmVerifierConfig::default();
1667        // EVM is always on — default network is ArbitrumOne
1668    }
1669
1670    #[test]
1671    fn test_real_ml_dsa_proof_size_within_limits() {
1672        use crate::payment::metrics::QuotingMetricsTracker;
1673        use crate::payment::proof::PaymentProof;
1674        use crate::payment::quote::{QuoteGenerator, XorName};
1675        use alloy::primitives::FixedBytes;
1676        use evmlib::{EncodedPeerId, RewardsAddress};
1677        use saorsa_core::MlDsa65;
1678        use saorsa_pqc::pqc::types::MlDsaSecretKey;
1679        use saorsa_pqc::pqc::MlDsaOperations;
1680
1681        let ml_dsa = MlDsa65::new();
1682        let mut peer_quotes = Vec::new();
1683
1684        for i in 0..5u8 {
1685            let (public_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
1686
1687            let rewards_address = RewardsAddress::new([i; 20]);
1688            let metrics_tracker = QuotingMetricsTracker::new(0);
1689            let mut generator = QuoteGenerator::new(rewards_address, metrics_tracker);
1690
1691            let pub_key_bytes = public_key.as_bytes().to_vec();
1692            let sk_bytes = secret_key.as_bytes().to_vec();
1693            generator.set_signer(pub_key_bytes, move |msg| {
1694                let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("sk parse");
1695                let ml_dsa = MlDsa65::new();
1696                ml_dsa.sign(&sk, msg).expect("sign").as_bytes().to_vec()
1697            });
1698
1699            let content: XorName = [i; 32];
1700            let quote = generator.create_quote(content, 4096, 0).expect("quote");
1701
1702            peer_quotes.push((EncodedPeerId::new(rand::random()), quote));
1703        }
1704
1705        let proof = PaymentProof {
1706            proof_of_payment: ProofOfPayment { peer_quotes },
1707            tx_hashes: vec![FixedBytes::from([0xABu8; 32])],
1708        };
1709
1710        let proof_bytes =
1711            crate::payment::proof::serialize_single_node_proof(&proof).expect("serialize");
1712
1713        // 7 ML-DSA-65 quotes with ~1952-byte pub keys and ~3309-byte signatures
1714        // should produce a proof in the 30-80 KB range
1715        assert!(
1716            proof_bytes.len() > 20_000,
1717            "Real 7-quote ML-DSA proof should be > 20 KB, got {} bytes",
1718            proof_bytes.len()
1719        );
1720        assert!(
1721            proof_bytes.len() < MAX_PAYMENT_PROOF_SIZE_BYTES,
1722            "Real 7-quote ML-DSA proof ({} bytes) should fit within {} byte limit",
1723            proof_bytes.len(),
1724            MAX_PAYMENT_PROOF_SIZE_BYTES
1725        );
1726    }
1727
1728    #[tokio::test]
1729    async fn test_content_address_mismatch_rejected() {
1730        use crate::payment::proof::{serialize_single_node_proof, PaymentProof};
1731        use evmlib::{EncodedPeerId, PaymentQuote, RewardsAddress};
1732        use std::time::SystemTime;
1733
1734        let verifier = create_test_verifier();
1735
1736        // The xorname we're trying to store
1737        let target_xorname = [0xAAu8; 32];
1738
1739        // Create a quote for a DIFFERENT xorname
1740        let wrong_xorname = [0xBBu8; 32];
1741        let quote = PaymentQuote {
1742            content: xor_name::XorName(wrong_xorname),
1743            timestamp: SystemTime::now(),
1744            price: Amount::from(1u64),
1745            rewards_address: RewardsAddress::new([1u8; 20]),
1746            pub_key: vec![0u8; 64],
1747            signature: vec![0u8; 64],
1748        };
1749
1750        // Build CLOSE_GROUP_SIZE quotes with distinct peer IDs
1751        let mut peer_quotes = Vec::new();
1752        for _ in 0..CLOSE_GROUP_SIZE {
1753            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1754        }
1755
1756        let proof = PaymentProof {
1757            proof_of_payment: ProofOfPayment { peer_quotes },
1758            tx_hashes: vec![],
1759        };
1760
1761        let proof_bytes = serialize_single_node_proof(&proof).expect("serialize proof");
1762
1763        let result = verifier
1764            .verify_payment(&target_xorname, Some(&proof_bytes))
1765            .await;
1766
1767        assert!(result.is_err(), "Should reject mismatched content address");
1768        let err_msg = format!("{}", result.expect_err("should be error"));
1769        assert!(
1770            err_msg.contains("content address mismatch"),
1771            "Error should mention 'content address mismatch': {err_msg}"
1772        );
1773    }
1774
1775    /// Helper: create a fake quote with the given xorname and timestamp.
1776    fn make_fake_quote(
1777        xorname: [u8; 32],
1778        timestamp: SystemTime,
1779        rewards_address: RewardsAddress,
1780    ) -> evmlib::PaymentQuote {
1781        use evmlib::PaymentQuote;
1782
1783        PaymentQuote {
1784            content: xor_name::XorName(xorname),
1785            timestamp,
1786            price: Amount::from(1u64),
1787            rewards_address,
1788            pub_key: vec![0u8; 64],
1789            signature: vec![0u8; 64],
1790        }
1791    }
1792
1793    /// Helper: create a fake quote whose price encodes the supplied record count.
1794    fn make_fake_quote_at_records(
1795        xorname: [u8; 32],
1796        timestamp: SystemTime,
1797        rewards_address: RewardsAddress,
1798        records: usize,
1799    ) -> evmlib::PaymentQuote {
1800        let mut quote = make_fake_quote(xorname, timestamp, rewards_address);
1801        quote.price = crate::payment::pricing::calculate_price(records);
1802        quote
1803    }
1804
1805    /// A small upward record drift between quoting and verifying — the normal
1806    /// in-flight churn on a busy network — must pass. The old fixed 5-record
1807    /// tolerance rejected a drift of 10 as "stale by 10 records"; the
1808    /// price-based gate sees a negligible price move on the near-flat curve and
1809    /// accepts it.
1810    #[test]
1811    fn test_small_record_drift_accepted() {
1812        use evmlib::{EncodedPeerId, RewardsAddress};
1813
1814        let verifier = create_test_verifier();
1815        // Node gained 10 records since quoting (100 -> 110).
1816        verifier.set_records_stored_for_tests(110);
1817        let quote = make_fake_quote_at_records(
1818            [0xE0u8; 32],
1819            SystemTime::now(),
1820            RewardsAddress::new([1u8; 20]),
1821            100,
1822        );
1823        let payment = ProofOfPayment {
1824            peer_quotes: vec![(EncodedPeerId::new(rand::random()), quote)],
1825        };
1826
1827        verifier
1828            .validate_quote_freshness(&payment)
1829            .expect("benign in-flight drift should pass");
1830    }
1831
1832    /// Over-payment must always be accepted: the node had MORE records when it
1833    /// quoted than it does now (e.g. it pruned), so the client paid for a
1834    /// fuller, pricier node. The old symmetric `abs_diff` gate wrongly rejected
1835    /// this; ~36% of STG-01 rejections were exactly this case.
1836    #[test]
1837    fn test_overpayment_accepted() {
1838        use evmlib::{EncodedPeerId, RewardsAddress};
1839
1840        let verifier = create_test_verifier();
1841        // Quote priced at 6000 records, but node now holds only 100.
1842        verifier.set_records_stored_for_tests(100);
1843        let quote = make_fake_quote_at_records(
1844            [0xE2u8; 32],
1845            SystemTime::now(),
1846            RewardsAddress::new([1u8; 20]),
1847            6000,
1848        );
1849        let payment = ProofOfPayment {
1850            peer_quotes: vec![(EncodedPeerId::new(rand::random()), quote)],
1851        };
1852
1853        verifier
1854            .validate_quote_freshness(&payment)
1855            .expect("over-payment must never be rejected");
1856    }
1857
1858    /// Genuine staleness — a quote that under-prices the node's current fullness
1859    /// by far more than the tolerance — is still rejected. Quote encodes 100
1860    /// records but the node now holds 6000, so the quadratic curve makes the
1861    /// paid price a small fraction of the current price.
1862    #[test]
1863    fn test_underpriced_quote_rejected() {
1864        use evmlib::{EncodedPeerId, RewardsAddress};
1865
1866        let verifier = create_test_verifier();
1867        verifier.set_records_stored_for_tests(6000);
1868        let quote = make_fake_quote_at_records(
1869            [0xE1u8; 32],
1870            SystemTime::now(),
1871            RewardsAddress::new([1u8; 20]),
1872            100,
1873        );
1874        let payment = ProofOfPayment {
1875            peer_quotes: vec![(EncodedPeerId::new(rand::random()), quote)],
1876        };
1877
1878        let err = verifier
1879            .validate_quote_freshness(&payment)
1880            .expect_err("a quote underpricing by >25% should fail");
1881        assert!(format!("{err}").contains("stale"));
1882    }
1883
1884    /// Helper: wrap quotes into a tagged serialized `PaymentProof`.
1885    fn serialize_proof(peer_quotes: Vec<(evmlib::EncodedPeerId, evmlib::PaymentQuote)>) -> Vec<u8> {
1886        use crate::payment::proof::{serialize_single_node_proof, PaymentProof};
1887
1888        let proof = PaymentProof {
1889            proof_of_payment: ProofOfPayment { peer_quotes },
1890            tx_hashes: vec![],
1891        };
1892        serialize_single_node_proof(&proof).expect("serialize proof")
1893    }
1894
1895    #[tokio::test]
1896    async fn test_old_quote_uses_storage_delta_not_timestamp() {
1897        use evmlib::{EncodedPeerId, RewardsAddress};
1898        use std::time::Duration;
1899
1900        let verifier = create_test_verifier();
1901        let xorname = [0xCCu8; 32];
1902        let rewards_addr = RewardsAddress::new([1u8; 20]);
1903
1904        // Create a quote that's 25 hours old (exceeds 24-hour max)
1905        let old_timestamp = SystemTime::now() - Duration::from_secs(25 * 3600);
1906        let quote = make_fake_quote(xorname, old_timestamp, rewards_addr);
1907
1908        let mut peer_quotes = Vec::new();
1909        for _ in 0..CLOSE_GROUP_SIZE {
1910            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1911        }
1912
1913        let proof_bytes = serialize_proof(peer_quotes);
1914        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1915
1916        let err_msg = format!("{}", result.expect_err("should fail at later check"));
1917        assert!(
1918            !err_msg.contains("expired"),
1919            "Should not reject by timestamp age: {err_msg}"
1920        );
1921    }
1922
1923    #[tokio::test]
1924    async fn test_future_quote_uses_storage_delta_not_timestamp() {
1925        use evmlib::{EncodedPeerId, RewardsAddress};
1926        use std::time::Duration;
1927
1928        let verifier = create_test_verifier();
1929        let xorname = [0xDDu8; 32];
1930        let rewards_addr = RewardsAddress::new([1u8; 20]);
1931
1932        // Create a quote with a timestamp 1 hour in the future
1933        let future_timestamp = SystemTime::now() + Duration::from_secs(3600);
1934        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
1935
1936        let mut peer_quotes = Vec::new();
1937        for _ in 0..CLOSE_GROUP_SIZE {
1938            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1939        }
1940
1941        let proof_bytes = serialize_proof(peer_quotes);
1942        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1943
1944        let err_msg = format!("{}", result.expect_err("should fail at later check"));
1945        assert!(
1946            !err_msg.contains("future"),
1947            "Should not reject by future timestamp: {err_msg}"
1948        );
1949    }
1950
1951    #[tokio::test]
1952    async fn test_quote_within_clock_skew_tolerance_accepted() {
1953        use evmlib::{EncodedPeerId, RewardsAddress};
1954        use std::time::Duration;
1955
1956        let verifier = create_test_verifier();
1957        let xorname = [0xD1u8; 32];
1958        let rewards_addr = RewardsAddress::new([1u8; 20]);
1959
1960        // Quote 30 seconds in the future — well within 300s tolerance
1961        let future_timestamp = SystemTime::now() + Duration::from_secs(30);
1962        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
1963
1964        let mut peer_quotes = Vec::new();
1965        for _ in 0..CLOSE_GROUP_SIZE {
1966            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1967        }
1968
1969        let proof_bytes = serialize_proof(peer_quotes);
1970        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1971
1972        // Should NOT fail at timestamp check (will fail later at pub_key binding)
1973        let err_msg = format!("{}", result.expect_err("should fail at later check"));
1974        assert!(
1975            !err_msg.contains("future"),
1976            "Should pass timestamp check (within tolerance), but got: {err_msg}"
1977        );
1978    }
1979
1980    #[tokio::test]
1981    async fn test_quote_beyond_clock_skew_still_uses_storage_delta() {
1982        use evmlib::{EncodedPeerId, RewardsAddress};
1983        use std::time::Duration;
1984
1985        let verifier = create_test_verifier();
1986        let xorname = [0xD2u8; 32];
1987        let rewards_addr = RewardsAddress::new([1u8; 20]);
1988
1989        // Quote 360 seconds in the future — exceeds 300s tolerance
1990        let future_timestamp = SystemTime::now() + Duration::from_secs(360);
1991        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
1992
1993        let mut peer_quotes = Vec::new();
1994        for _ in 0..CLOSE_GROUP_SIZE {
1995            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1996        }
1997
1998        let proof_bytes = serialize_proof(peer_quotes);
1999        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
2000
2001        let err_msg = format!("{}", result.expect_err("should fail at later check"));
2002        assert!(
2003            !err_msg.contains("future"),
2004            "Should not reject by future timestamp: {err_msg}"
2005        );
2006    }
2007
2008    #[tokio::test]
2009    async fn test_quote_23h_old_still_accepted() {
2010        use evmlib::{EncodedPeerId, RewardsAddress};
2011        use std::time::Duration;
2012
2013        let verifier = create_test_verifier();
2014        let xorname = [0xD3u8; 32];
2015        let rewards_addr = RewardsAddress::new([1u8; 20]);
2016
2017        // Quote 23 hours old — within 24h max age
2018        let old_timestamp = SystemTime::now() - Duration::from_secs(23 * 3600);
2019        let quote = make_fake_quote(xorname, old_timestamp, rewards_addr);
2020
2021        let mut peer_quotes = Vec::new();
2022        for _ in 0..CLOSE_GROUP_SIZE {
2023            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2024        }
2025
2026        let proof_bytes = serialize_proof(peer_quotes);
2027        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
2028
2029        // Should NOT fail at timestamp check (will fail later at pub_key binding)
2030        let err_msg = format!("{}", result.expect_err("should fail at later check"));
2031        assert!(
2032            !err_msg.contains("expired"),
2033            "Should pass expiry check (23h < 24h), but got: {err_msg}"
2034        );
2035    }
2036
2037    /// Helper: build an `EncodedPeerId` that matches the BLAKE3 hash of an ML-DSA public key.
2038    fn encoded_peer_id_for_pub_key(pub_key: &[u8]) -> evmlib::EncodedPeerId {
2039        let ant_peer_id = peer_id_from_public_key_bytes(pub_key).expect("valid ML-DSA pub key");
2040        evmlib::EncodedPeerId::new(*ant_peer_id.as_bytes())
2041    }
2042
2043    #[tokio::test]
2044    async fn test_local_not_in_paid_set_rejected() {
2045        use evmlib::RewardsAddress;
2046        use saorsa_core::MlDsa65;
2047        use saorsa_pqc::pqc::MlDsaOperations;
2048
2049        // Verifier with a local rewards address set
2050        let local_addr = RewardsAddress::new([0xAAu8; 20]);
2051        let config = PaymentVerifierConfig {
2052            evm: EvmVerifierConfig {
2053                network: EvmNetwork::ArbitrumOne,
2054            },
2055            cache_capacity: 100,
2056            local_rewards_address: local_addr,
2057        };
2058        let verifier = PaymentVerifier::new(config);
2059
2060        let xorname = [0xEEu8; 32];
2061        // Quotes pay a DIFFERENT rewards address
2062        let other_addr = RewardsAddress::new([0xBBu8; 20]);
2063
2064        // Use real ML-DSA keys so the pub_key→peer_id binding check passes
2065        let ml_dsa = MlDsa65::new();
2066        let mut peer_quotes = Vec::new();
2067        for _ in 0..CLOSE_GROUP_SIZE {
2068            let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen");
2069            let pub_key_bytes = public_key.as_bytes().to_vec();
2070            let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes);
2071
2072            let mut quote = make_fake_quote(xorname, SystemTime::now(), other_addr);
2073            quote.pub_key = pub_key_bytes;
2074
2075            peer_quotes.push((encoded, quote));
2076        }
2077
2078        let proof_bytes = serialize_proof(peer_quotes);
2079        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
2080
2081        assert!(result.is_err(), "Should reject payment not addressed to us");
2082        let err_msg = format!("{}", result.expect_err("should fail"));
2083        assert!(
2084            err_msg.contains("does not include this node as a recipient"),
2085            "Error should mention recipient rejection: {err_msg}"
2086        );
2087    }
2088
2089    #[tokio::test]
2090    async fn test_wrong_peer_binding_rejected() {
2091        use evmlib::{EncodedPeerId, RewardsAddress};
2092        use saorsa_core::MlDsa65;
2093        use saorsa_pqc::pqc::MlDsaOperations;
2094
2095        let verifier = create_test_verifier();
2096        let xorname = [0xFFu8; 32];
2097        let rewards_addr = RewardsAddress::new([1u8; 20]);
2098
2099        // Generate a real ML-DSA keypair so pub_key is valid
2100        let ml_dsa = MlDsa65::new();
2101        let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen");
2102        let pub_key_bytes = public_key.as_bytes().to_vec();
2103
2104        // Create a quote with a real pub_key but attach it to a random peer ID
2105        // whose identity multihash does NOT match BLAKE3(pub_key)
2106        let mut quote = make_fake_quote(xorname, SystemTime::now(), rewards_addr);
2107        quote.pub_key = pub_key_bytes;
2108
2109        // Use random ed25519 peer IDs — they won't match BLAKE3(pub_key)
2110        let mut peer_quotes = Vec::new();
2111        for _ in 0..CLOSE_GROUP_SIZE {
2112            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2113        }
2114
2115        let proof_bytes = serialize_proof(peer_quotes);
2116        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
2117
2118        assert!(result.is_err(), "Should reject wrong peer binding");
2119        let err_msg = format!("{}", result.expect_err("should fail"));
2120        assert!(
2121            err_msg.contains("pub_key does not belong to claimed peer"),
2122            "Error should mention binding mismatch: {err_msg}"
2123        );
2124    }
2125
2126    // =========================================================================
2127    // Merkle-tagged proof tests
2128    // =========================================================================
2129
2130    #[tokio::test]
2131    async fn test_merkle_tagged_proof_invalid_data_rejected() {
2132        use crate::ant_protocol::PROOF_TAG_MERKLE;
2133
2134        let verifier = create_test_verifier();
2135        let xorname = [0xA1u8; 32];
2136
2137        // Build a merkle-tagged proof with garbage body.
2138        // The tag byte is correct but the body is not valid msgpack.
2139        let mut merkle_garbage = Vec::with_capacity(64);
2140        merkle_garbage.push(PROOF_TAG_MERKLE);
2141        merkle_garbage.extend_from_slice(&[0xAB; 63]);
2142
2143        let result = verifier
2144            .verify_payment(&xorname, Some(&merkle_garbage))
2145            .await;
2146
2147        assert!(
2148            result.is_err(),
2149            "Should reject merkle proof with invalid body"
2150        );
2151        let err_msg = format!("{}", result.expect_err("should fail"));
2152        assert!(
2153            err_msg.contains("deserialize") || err_msg.contains("merkle proof"),
2154            "Error should mention deserialization failure: {err_msg}"
2155        );
2156    }
2157
2158    #[tokio::test]
2159    async fn test_single_node_tagged_proof_deserialization() {
2160        use crate::payment::proof::serialize_single_node_proof;
2161        use evmlib::{EncodedPeerId, RewardsAddress};
2162
2163        let verifier = create_test_verifier();
2164        let xorname = [0xA2u8; 32];
2165        let rewards_addr = RewardsAddress::new([1u8; 20]);
2166
2167        // Build a valid tagged single-node proof
2168        let quote = make_fake_quote(xorname, SystemTime::now(), rewards_addr);
2169        let mut peer_quotes = Vec::new();
2170        for _ in 0..CLOSE_GROUP_SIZE {
2171            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2172        }
2173
2174        let proof = crate::payment::proof::PaymentProof {
2175            proof_of_payment: ProofOfPayment {
2176                peer_quotes: peer_quotes.clone(),
2177            },
2178            tx_hashes: vec![],
2179        };
2180
2181        let tagged_bytes = serialize_single_node_proof(&proof).expect("serialize tagged proof");
2182
2183        // detect_proof_type should identify it as SingleNode
2184        assert_eq!(
2185            crate::payment::proof::detect_proof_type(&tagged_bytes),
2186            Some(crate::payment::proof::ProofType::SingleNode)
2187        );
2188
2189        // verify_payment should process it through the single-node path.
2190        // It will fail at quote validation (fake pub_key), but we verify
2191        // it passes the deserialization stage by checking the error type.
2192        let result = verifier.verify_payment(&xorname, Some(&tagged_bytes)).await;
2193
2194        assert!(result.is_err(), "Should fail at quote validation stage");
2195        let err_msg = format!("{}", result.expect_err("should fail"));
2196        // It should NOT be a deserialization error — it should get further
2197        assert!(
2198            !err_msg.contains("deserialize"),
2199            "Should pass deserialization but fail later: {err_msg}"
2200        );
2201    }
2202
2203    #[test]
2204    fn test_pool_cache_insert_and_lookup() {
2205        use evmlib::merkle_batch_payment::PoolHash;
2206
2207        // Verify the pool_cache field exists and works correctly.
2208        // Insert a pool hash, then verify it's present on lookup.
2209        let verifier = create_test_verifier();
2210
2211        let pool_hash: PoolHash = [0xBBu8; 32];
2212        let payment_info = evmlib::merkle_payments::OnChainPaymentInfo {
2213            depth: 4,
2214            merkle_payment_timestamp: 1_700_000_000,
2215            paid_node_addresses: vec![],
2216        };
2217
2218        // Insert into pool cache
2219        {
2220            let mut cache = verifier.pool_cache.lock();
2221            cache.put(pool_hash, payment_info);
2222        }
2223
2224        // First lookup — should find it
2225        {
2226            let found = verifier.pool_cache.lock().get(&pool_hash).cloned();
2227            assert!(found.is_some(), "Pool hash should be in cache after insert");
2228            let info = found.expect("cached info");
2229            assert_eq!(info.depth, 4);
2230            assert_eq!(info.merkle_payment_timestamp, 1_700_000_000);
2231        }
2232
2233        // Second lookup — same result (no double-query needed)
2234        {
2235            let found = verifier.pool_cache.lock().get(&pool_hash).cloned();
2236            assert!(
2237                found.is_some(),
2238                "Pool hash should still be in cache on second lookup"
2239            );
2240        }
2241
2242        // Different pool hash — should NOT be found
2243        let other_hash: PoolHash = [0xCCu8; 32];
2244        {
2245            let found = verifier.pool_cache.lock().get(&other_hash).cloned();
2246            assert!(found.is_none(), "Unknown pool hash should not be in cache");
2247        }
2248    }
2249
2250    #[tokio::test]
2251    async fn closeness_pass_cache_short_circuits_second_call() {
2252        // When a pool_hash is in the closeness_pass_cache, the outer
2253        // verify_merkle_candidate_closeness must return Ok(()) without
2254        // running the inner lookup — even if no P2PNode is attached.
2255        // That second half (no-p2p → would normally fail-closed in release)
2256        // is the proof the cache short-circuit ran first.
2257        let verifier = create_test_verifier();
2258        let pool_hash = [0xAAu8; 32];
2259        verifier.closeness_pass_cache.lock().put(pool_hash, ());
2260
2261        // Construct a dummy pool — contents don't matter because the cache
2262        // hit means we never look at them.
2263        let pool = MerklePaymentCandidatePool {
2264            midpoint_proof: fake_midpoint_proof(),
2265            candidate_nodes: make_candidate_nodes(1_700_000_000),
2266        };
2267
2268        let result = verifier
2269            .verify_merkle_candidate_closeness(&pool, pool_hash)
2270            .await;
2271        assert!(
2272            result.is_ok(),
2273            "cached pool hash must bypass the inner check and return Ok(()), got: {result:?}"
2274        );
2275    }
2276
2277    #[tokio::test]
2278    async fn closeness_single_flight_concurrent_readers_share_one_verification() {
2279        // Two concurrent callers for the same pool_hash should produce the
2280        // same outcome, and the cache should end up populated exactly once.
2281        // We use the test-utils fail-open path to short-circuit the inner
2282        // DHT lookup; the purpose of this test is the single-flight
2283        // plumbing, not the lookup itself.
2284        let verifier = Arc::new(create_test_verifier());
2285        let pool_hash = [0x77u8; 32];
2286        let pool = MerklePaymentCandidatePool {
2287            midpoint_proof: fake_midpoint_proof(),
2288            candidate_nodes: make_candidate_nodes(1_700_000_000),
2289        };
2290
2291        let v1 = Arc::clone(&verifier);
2292        let p1 = pool.clone();
2293        let v2 = Arc::clone(&verifier);
2294        let p2 = pool.clone();
2295
2296        let (r1, r2) = tokio::join!(
2297            async move { v1.verify_merkle_candidate_closeness(&p1, pool_hash).await },
2298            async move { v2.verify_merkle_candidate_closeness(&p2, pool_hash).await },
2299        );
2300
2301        assert_eq!(r1.is_ok(), r2.is_ok(), "concurrent callers must agree");
2302        assert!(
2303            r1.is_ok(),
2304            "both callers must succeed on the test-utils path"
2305        );
2306        assert!(
2307            verifier
2308                .closeness_pass_cache
2309                .lock()
2310                .get(&pool_hash)
2311                .is_some(),
2312            "success path must populate the pass cache"
2313        );
2314        assert!(
2315            verifier.inflight_closeness.lock().get(&pool_hash).is_none(),
2316            "inflight slot must be cleared after the leader finishes"
2317        );
2318    }
2319
2320    #[tokio::test]
2321    async fn closeness_waiter_reads_leaders_published_failure() {
2322        // Prove the waiter path actually surfaces a failure published by a
2323        // concurrent leader, without running its own inner check. Insert a
2324        // slot, spawn a waiter (which will park on notified_owned), then
2325        // publish failure + notify from the outside — simulating what the
2326        // leader's `publish` + drop-guard pair does.
2327        let verifier = Arc::new(create_test_verifier());
2328        let pool_hash = [0x55u8; 32];
2329        let slot = Arc::new(ClosenessSlot::new());
2330        verifier
2331            .inflight_closeness
2332            .lock()
2333            .put(pool_hash, Arc::clone(&slot));
2334
2335        let pool = MerklePaymentCandidatePool {
2336            midpoint_proof: fake_midpoint_proof(),
2337            candidate_nodes: make_candidate_nodes(1_700_000_000),
2338        };
2339
2340        let verifier_c = Arc::clone(&verifier);
2341        let pool_c = pool.clone();
2342        let waiter = tokio::spawn(async move {
2343            verifier_c
2344                .verify_merkle_candidate_closeness(&pool_c, pool_hash)
2345                .await
2346        });
2347
2348        // Yield so the waiter can run up to its `notified_owned().await`.
2349        // A few yields cover both single-threaded and multi-threaded tokio
2350        // runtimes regardless of scheduling.
2351        for _ in 0..5 {
2352            tokio::task::yield_now().await;
2353        }
2354
2355        // Simulate the leader's `publish` + drop-guard: publish the result,
2356        // clear the slot, wake waiters.
2357        slot.result
2358            .set(Err("forged pool: not close enough".to_string()))
2359            .expect("set once");
2360        verifier.inflight_closeness.lock().pop(&pool_hash);
2361        slot.notify.notify_waiters();
2362
2363        let result = waiter.await.expect("task panicked");
2364        let err = result.expect_err("waiter must return the leader's published failure");
2365        assert!(
2366            err.to_string().contains("forged pool"),
2367            "waiter must surface the leader's error message, got: {err}"
2368        );
2369    }
2370
2371    #[tokio::test]
2372    async fn closeness_rejects_pool_with_duplicate_candidate_pub_keys() {
2373        // An attacker who submits 16 copies of the same real peer's pub_key
2374        // would otherwise satisfy the closeness threshold trivially:
2375        // that one peer's membership in the DHT-returned set would count
2376        // 16 times. The dedupe check in verify_merkle_candidate_closeness_inner
2377        // must reject the pool BEFORE the network lookup runs (so this test
2378        // works even with no P2PNode attached).
2379        let verifier = create_test_verifier();
2380        let pool_hash = [0xDDu8; 32];
2381
2382        // Build a normal pool, then overwrite every candidate's pub_key
2383        // with a single shared key so all 16 derive to the same PeerId.
2384        let mut candidates = make_candidate_nodes(1_700_000_000);
2385        let shared_pub_key = candidates
2386            .first()
2387            .expect("make_candidate_nodes returns CANDIDATES_PER_POOL entries")
2388            .pub_key
2389            .clone();
2390        for c in &mut candidates {
2391            c.pub_key = shared_pub_key.clone();
2392        }
2393        let pool = MerklePaymentCandidatePool {
2394            midpoint_proof: fake_midpoint_proof(),
2395            candidate_nodes: candidates,
2396        };
2397
2398        let result = verifier
2399            .verify_merkle_candidate_closeness(&pool, pool_hash)
2400            .await;
2401        let err = result.expect_err("duplicate candidate PeerIds must be rejected");
2402        let msg = err.to_string();
2403        assert!(
2404            msg.contains("duplicate candidate PeerId"),
2405            "rejection must be the duplicate-PeerId branch, got: {msg}"
2406        );
2407    }
2408
2409    /// Build a deterministic but otherwise-unused `MidpointProof` so unit
2410    /// tests can construct a `MerklePaymentCandidatePool` without spinning
2411    /// up a real merkle tree. The closeness path only calls `.address()`
2412    /// on it, which is a pure BLAKE3 of the branch's leaf/root/timestamp —
2413    /// the values don't need to be tree-valid for these tests.
2414    fn fake_midpoint_proof() -> evmlib::merkle_payments::MidpointProof {
2415        // Build a minimal tree of two leaves so we get a real branch.
2416        let leaves = vec![xor_name::XorName([1u8; 32]), xor_name::XorName([2u8; 32])];
2417        let tree = evmlib::merkle_payments::MerkleTree::from_xornames(leaves).expect("tree");
2418        let candidates = tree.reward_candidates(1_700_000_000).expect("candidates");
2419        candidates.first().expect("at least one").clone()
2420    }
2421
2422    // =========================================================================
2423    // Merkle verification unit tests
2424    // =========================================================================
2425
2426    /// Helper: build 16 validly-signed ML-DSA-65 candidate nodes.
2427    fn make_candidate_nodes(
2428        timestamp: u64,
2429    ) -> [evmlib::merkle_payments::MerklePaymentCandidateNode;
2430           evmlib::merkle_payments::CANDIDATES_PER_POOL] {
2431        use evmlib::merkle_payments::{MerklePaymentCandidateNode, CANDIDATES_PER_POOL};
2432        use saorsa_core::MlDsa65;
2433        use saorsa_pqc::pqc::types::MlDsaSecretKey;
2434        use saorsa_pqc::pqc::MlDsaOperations;
2435
2436        std::array::from_fn::<_, CANDIDATES_PER_POOL, _>(|i| {
2437            let ml_dsa = MlDsa65::new();
2438            let (pub_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
2439            let price = evmlib::common::Amount::from(1024u64);
2440            #[allow(clippy::cast_possible_truncation)]
2441            let reward_address = RewardsAddress::new([i as u8; 20]);
2442            let msg = MerklePaymentCandidateNode::bytes_to_sign(&price, &reward_address, timestamp);
2443            let sk = MlDsaSecretKey::from_bytes(secret_key.as_bytes()).expect("sk");
2444            let signature = ml_dsa.sign(&sk, &msg).expect("sign").as_bytes().to_vec();
2445
2446            MerklePaymentCandidateNode {
2447                pub_key: pub_key.as_bytes().to_vec(),
2448                price,
2449                reward_address,
2450                merkle_payment_timestamp: timestamp,
2451                signature,
2452            }
2453        })
2454    }
2455
2456    /// Helper: build a valid `MerklePaymentProof` with real ML-DSA-65
2457    /// signatures. Returns the raw proof, pool hash, xorname, and timestamp.
2458    fn make_valid_merkle_proof() -> (
2459        evmlib::merkle_payments::MerklePaymentProof,
2460        evmlib::merkle_batch_payment::PoolHash,
2461        [u8; 32],
2462        u64,
2463    ) {
2464        use evmlib::merkle_payments::{MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree};
2465
2466        let timestamp = std::time::SystemTime::now()
2467            .duration_since(std::time::UNIX_EPOCH)
2468            .expect("system time")
2469            .as_secs();
2470
2471        let addresses: Vec<xor_name::XorName> = (0..4u8)
2472            .map(|i| xor_name::XorName::from_content(&[i]))
2473            .collect();
2474        let tree = MerkleTree::from_xornames(addresses.clone()).expect("tree");
2475
2476        let candidate_nodes = make_candidate_nodes(timestamp);
2477
2478        let reward_candidates = tree
2479            .reward_candidates(timestamp)
2480            .expect("reward candidates");
2481        let midpoint_proof = reward_candidates
2482            .first()
2483            .expect("at least one candidate")
2484            .clone();
2485
2486        let pool = MerklePaymentCandidatePool {
2487            midpoint_proof,
2488            candidate_nodes,
2489        };
2490
2491        let first_address = *addresses.first().expect("first address");
2492        let address_proof = tree
2493            .generate_address_proof(0, first_address)
2494            .expect("proof");
2495
2496        let merkle_proof = MerklePaymentProof::new(first_address, address_proof, pool);
2497        let pool_hash = merkle_proof.winner_pool_hash();
2498        let xorname = first_address.0;
2499
2500        (merkle_proof, pool_hash, xorname, timestamp)
2501    }
2502
2503    /// Helper: build a minimal valid `MerklePaymentProof` with real ML-DSA-65
2504    /// signatures. Returns `(xorname, serialized_tagged_proof, pool_hash, timestamp)`.
2505    fn make_valid_merkle_proof_bytes() -> (
2506        [u8; 32],
2507        Vec<u8>,
2508        evmlib::merkle_batch_payment::PoolHash,
2509        u64,
2510    ) {
2511        let (merkle_proof, pool_hash, xorname, timestamp) = make_valid_merkle_proof();
2512        let tagged = crate::payment::proof::serialize_merkle_proof(&merkle_proof)
2513            .expect("serialize merkle proof");
2514        (xorname, tagged, pool_hash, timestamp)
2515    }
2516
2517    #[tokio::test]
2518    async fn test_merkle_address_mismatch_rejected() {
2519        let verifier = create_test_verifier();
2520        let (_correct_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
2521
2522        // Use a DIFFERENT xorname than what the proof was built for
2523        let wrong_xorname = [0xFFu8; 32];
2524
2525        let result = verifier
2526            .verify_payment(&wrong_xorname, Some(&tagged_proof))
2527            .await;
2528
2529        assert!(
2530            result.is_err(),
2531            "Should reject merkle proof address mismatch"
2532        );
2533        let err_msg = format!("{}", result.expect_err("should fail"));
2534        assert!(
2535            err_msg.contains("address mismatch") || err_msg.contains("Merkle proof address"),
2536            "Error should mention address mismatch: {err_msg}"
2537        );
2538    }
2539
2540    #[tokio::test]
2541    async fn test_merkle_malformed_body_rejected() {
2542        let verifier = create_test_verifier();
2543        let xorname = [0xA3u8; 32];
2544
2545        // Valid merkle tag but truncated/corrupted msgpack body
2546        let mut bad_proof = vec![crate::ant_protocol::PROOF_TAG_MERKLE];
2547        bad_proof.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
2548        bad_proof.extend_from_slice(&[0x00; 10]);
2549        // pad to minimum size
2550        while bad_proof.len() < MIN_PAYMENT_PROOF_SIZE_BYTES {
2551            bad_proof.push(0x00);
2552        }
2553
2554        let result = verifier.verify_payment(&xorname, Some(&bad_proof)).await;
2555
2556        assert!(result.is_err(), "Should reject malformed merkle body");
2557        let err_msg = format!("{}", result.expect_err("should fail"));
2558        assert!(
2559            err_msg.contains("deserialize") || err_msg.contains("Failed"),
2560            "Error should mention deserialization: {err_msg}"
2561        );
2562    }
2563
2564    #[test]
2565    fn test_merkle_proof_serialized_size_within_limits() {
2566        let (_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
2567
2568        // 16 ML-DSA-65 candidates (~1952 pub key + ~3309 sig each) ≈ 84 KB + tree data
2569        assert!(
2570            tagged_proof.len() >= MIN_PAYMENT_PROOF_SIZE_BYTES,
2571            "Merkle proof ({} bytes) should be >= min {} bytes",
2572            tagged_proof.len(),
2573            MIN_PAYMENT_PROOF_SIZE_BYTES
2574        );
2575        assert!(
2576            tagged_proof.len() <= MAX_PAYMENT_PROOF_SIZE_BYTES,
2577            "Merkle proof ({} bytes) should be <= max {} bytes",
2578            tagged_proof.len(),
2579            MAX_PAYMENT_PROOF_SIZE_BYTES
2580        );
2581    }
2582
2583    #[test]
2584    fn test_merkle_proof_tag_is_correct() {
2585        let (_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
2586
2587        assert_eq!(
2588            tagged_proof.first().copied(),
2589            Some(crate::ant_protocol::PROOF_TAG_MERKLE),
2590            "First byte must be the merkle tag"
2591        );
2592        assert_eq!(
2593            crate::payment::proof::detect_proof_type(&tagged_proof),
2594            Some(crate::payment::proof::ProofType::Merkle)
2595        );
2596    }
2597
2598    #[test]
2599    fn test_pool_cache_eviction() {
2600        use evmlib::merkle_batch_payment::PoolHash;
2601
2602        let config = PaymentVerifierConfig {
2603            evm: EvmVerifierConfig::default(),
2604            cache_capacity: 100,
2605            local_rewards_address: RewardsAddress::new([1u8; 20]),
2606        };
2607        let verifier = PaymentVerifier::new(config);
2608
2609        // Fill the pool cache to capacity (DEFAULT_POOL_CACHE_CAPACITY = 1000)
2610        for i in 0..DEFAULT_POOL_CACHE_CAPACITY {
2611            let mut hash: PoolHash = [0u8; 32];
2612            // Write index bytes into the hash
2613            let idx_bytes = i.to_le_bytes();
2614            for (j, b) in idx_bytes.iter().enumerate() {
2615                if j < 32 {
2616                    hash[j] = *b;
2617                }
2618            }
2619            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2620                depth: 4,
2621                merkle_payment_timestamp: 1_700_000_000,
2622                paid_node_addresses: vec![],
2623            };
2624            verifier.pool_cache.lock().put(hash, info);
2625        }
2626
2627        assert_eq!(
2628            verifier.pool_cache.lock().len(),
2629            DEFAULT_POOL_CACHE_CAPACITY
2630        );
2631
2632        // Insert one more — should evict the oldest
2633        let overflow_hash: PoolHash = [0xFFu8; 32];
2634        let info = evmlib::merkle_payments::OnChainPaymentInfo {
2635            depth: 8,
2636            merkle_payment_timestamp: 1_800_000_000,
2637            paid_node_addresses: vec![],
2638        };
2639        verifier.pool_cache.lock().put(overflow_hash, info);
2640
2641        // Size should still be at capacity (not capacity + 1)
2642        assert_eq!(
2643            verifier.pool_cache.lock().len(),
2644            DEFAULT_POOL_CACHE_CAPACITY
2645        );
2646
2647        // The new entry should be present
2648        let found = verifier.pool_cache.lock().get(&overflow_hash).cloned();
2649        assert!(
2650            found.is_some(),
2651            "Newly inserted pool hash should be present"
2652        );
2653        assert_eq!(found.expect("info").depth, 8);
2654    }
2655
2656    #[test]
2657    fn test_pool_cache_concurrent_access() {
2658        use evmlib::merkle_batch_payment::PoolHash;
2659        use std::sync::Arc;
2660
2661        let verifier = Arc::new(create_test_verifier());
2662
2663        let mut handles = Vec::new();
2664        for i in 0..20u8 {
2665            let v = verifier.clone();
2666            handles.push(std::thread::spawn(move || {
2667                let hash: PoolHash = [i; 32];
2668                let info = evmlib::merkle_payments::OnChainPaymentInfo {
2669                    depth: i,
2670                    merkle_payment_timestamp: u64::from(i) * 1000,
2671                    paid_node_addresses: vec![],
2672                };
2673                v.pool_cache.lock().put(hash, info);
2674
2675                // Read back
2676                let found = v.pool_cache.lock().get(&hash).cloned();
2677                assert!(found.is_some(), "Entry {i} should be readable after insert");
2678            }));
2679        }
2680
2681        for handle in handles {
2682            handle.join().expect("thread panicked");
2683        }
2684
2685        // All 20 entries should be present (well under 1000 capacity)
2686        assert_eq!(verifier.pool_cache.lock().len(), 20);
2687    }
2688
2689    #[tokio::test]
2690    async fn test_merkle_tampered_candidate_signature_rejected() {
2691        let verifier = create_test_verifier();
2692
2693        let (mut merkle_proof, _pool_hash, xorname, timestamp) = make_valid_merkle_proof();
2694
2695        // Tamper the first candidate's signature
2696        if let Some(byte) = merkle_proof
2697            .winner_pool
2698            .candidate_nodes
2699            .first_mut()
2700            .and_then(|c| c.signature.first_mut())
2701        {
2702            *byte ^= 0xFF;
2703        }
2704
2705        // Recompute pool hash after tampering (signature change alters the hash)
2706        let tampered_pool_hash = merkle_proof.winner_pool_hash();
2707
2708        // Pre-populate pool cache so we skip the on-chain query
2709        {
2710            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2711                depth: 4,
2712                merkle_payment_timestamp: timestamp,
2713                paid_node_addresses: vec![],
2714            };
2715            verifier.pool_cache.lock().put(tampered_pool_hash, info);
2716        }
2717
2718        let tagged =
2719            crate::payment::proof::serialize_merkle_proof(&merkle_proof).expect("serialize");
2720
2721        let result = verifier.verify_payment(&xorname, Some(&tagged)).await;
2722
2723        assert!(
2724            result.is_err(),
2725            "Should reject merkle proof with tampered candidate signature"
2726        );
2727        let err_msg = format!("{}", result.expect_err("should fail"));
2728        assert!(
2729            err_msg.contains("Invalid ML-DSA-65 signature"),
2730            "Error should mention invalid signature: {err_msg}"
2731        );
2732    }
2733
2734    #[tokio::test]
2735    async fn test_merkle_timestamp_mismatch_rejected() {
2736        let verifier = create_test_verifier();
2737
2738        let (xorname, tagged, pool_hash, timestamp) = make_valid_merkle_proof_bytes();
2739
2740        // Pre-populate pool cache with a DIFFERENT timestamp than the candidates
2741        {
2742            let mismatched_ts = timestamp + 9999;
2743            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2744                depth: 4,
2745                merkle_payment_timestamp: mismatched_ts,
2746                paid_node_addresses: vec![],
2747            };
2748            verifier.pool_cache.lock().put(pool_hash, info);
2749        }
2750
2751        let result = verifier.verify_payment(&xorname, Some(&tagged)).await;
2752
2753        assert!(
2754            result.is_err(),
2755            "Should reject merkle proof with timestamp mismatch"
2756        );
2757        let err_msg = format!("{}", result.expect_err("should fail"));
2758        assert!(
2759            err_msg.contains("timestamp mismatch"),
2760            "Error should mention timestamp mismatch: {err_msg}"
2761        );
2762    }
2763
2764    #[tokio::test]
2765    async fn test_merkle_paid_node_index_out_of_bounds_rejected() {
2766        let verifier = create_test_verifier();
2767        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2768
2769        // The test tree has 4 addresses → depth 2. We must match the tree depth
2770        // so verify_merkle_proof passes the depth check, then the paid node
2771        // index out-of-bounds check fires.
2772        {
2773            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2774                depth: 2,
2775                merkle_payment_timestamp: ts,
2776                paid_node_addresses: vec![
2777                    // First paid node: valid (matches candidate 0, amount matches formula)
2778                    // Expected per-node: median(1024) * 2^2 / 2 = 2048
2779                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(2048u64)),
2780                    // Second paid node: index 999 is way beyond CANDIDATES_PER_POOL (16)
2781                    (RewardsAddress::new([1u8; 20]), 999, Amount::from(2048u64)),
2782                ],
2783            };
2784            verifier.pool_cache.lock().put(pool_hash, info);
2785        }
2786
2787        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2788
2789        assert!(
2790            result.is_err(),
2791            "Should reject paid node index out of bounds"
2792        );
2793        let err_msg = format!("{}", result.expect_err("should fail"));
2794        assert!(
2795            err_msg.contains("out of bounds"),
2796            "Error should mention out of bounds: {err_msg}"
2797        );
2798    }
2799
2800    #[tokio::test]
2801    async fn test_merkle_paid_node_address_mismatch_rejected() {
2802        let verifier = create_test_verifier();
2803        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2804
2805        // Tree has depth 2, so provide 2 paid node entries.
2806        // Both use valid indices but the second has a wrong reward address.
2807        {
2808            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2809                depth: 2,
2810                merkle_payment_timestamp: ts,
2811                paid_node_addresses: vec![
2812                    // Index 0 with matching address [0x00; 20]
2813                    // Expected per-node: median(1024) * 2^2 / 2 = 2048
2814                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(2048u64)),
2815                    // Index 1 with WRONG address — candidate 1's address is [0x01; 20]
2816                    (RewardsAddress::new([0xFF; 20]), 1, Amount::from(2048u64)),
2817                ],
2818            };
2819            verifier.pool_cache.lock().put(pool_hash, info);
2820        }
2821
2822        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2823
2824        assert!(result.is_err(), "Should reject paid node address mismatch");
2825        let err_msg = format!("{}", result.expect_err("should fail"));
2826        assert!(
2827            err_msg.contains("address mismatch"),
2828            "Error should mention address mismatch: {err_msg}"
2829        );
2830    }
2831
2832    #[tokio::test]
2833    async fn test_merkle_wrong_depth_rejected() {
2834        let verifier = create_test_verifier();
2835        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2836
2837        // Pre-populate pool cache with depth=3 but only 1 paid node address
2838        // (depth must equal paid_node_addresses.len())
2839        {
2840            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2841                depth: 3,
2842                merkle_payment_timestamp: ts,
2843                paid_node_addresses: vec![(
2844                    RewardsAddress::new([0u8; 20]),
2845                    0,
2846                    Amount::from(1024u64),
2847                )],
2848            };
2849            verifier.pool_cache.lock().put(pool_hash, info);
2850        }
2851
2852        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2853
2854        assert!(
2855            result.is_err(),
2856            "Should reject mismatched depth vs paid node count"
2857        );
2858        let err_msg = format!("{}", result.expect_err("should fail"));
2859        assert!(
2860            err_msg.contains("Wrong number of paid nodes")
2861                || err_msg.contains("verification failed"),
2862            "Error should mention depth/count mismatch: {err_msg}"
2863        );
2864    }
2865
2866    #[tokio::test]
2867    async fn test_merkle_underpayment_rejected() {
2868        let verifier = create_test_verifier();
2869        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2870
2871        // Tree depth=2, so 2 paid nodes required. Candidates all quote price=1024.
2872        // Expected per-node: median(1024) * 2^2 / 2 = 2048.
2873        // Pay only 1 wei per node — far below the expected amount.
2874        {
2875            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2876                depth: 2,
2877                merkle_payment_timestamp: ts,
2878                paid_node_addresses: vec![
2879                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(1u64)),
2880                    (RewardsAddress::new([1u8; 20]), 1, Amount::from(1u64)),
2881                ],
2882            };
2883            verifier.pool_cache.lock().put(pool_hash, info);
2884        }
2885
2886        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2887
2888        assert!(
2889            result.is_err(),
2890            "Should reject merkle payment where paid amount < expected per-node amount"
2891        );
2892        let err_msg = format!("{}", result.expect_err("should fail"));
2893        assert!(
2894            err_msg.contains("Underpayment"),
2895            "Error should mention underpayment: {err_msg}"
2896        );
2897    }
2898
2899    // =========================================================================
2900    // Closeness-window constants regression tests
2901    //
2902    // These constants are load-bearing for both correctness (the storer
2903    // must look at the same window the client picks from, otherwise honest
2904    // pools are rejected) and DoS resistance (the timeout caps lookup
2905    // amplification per forged pool_hash). Pinning them with tests gives
2906    // future patches a one-line failure if either is silently changed
2907    // without updating the security argument in the doc comments.
2908    //
2909    // Empirical justification, captured during STG-01 investigation on
2910    // 2026-05-01:
2911    //
2912    //   - 60s timeout cut iterative lookups off after ~7 of 20 iterations
2913    //     (trace from EWR-3 ant-node-1 in CLOSENESS_LOOKUP_TIMEOUT doc).
2914    //   - K=16 storer window vs K=32 client over-query produced 73%
2915    //     false-positive mismatch rejections under realistic load
2916    //     (115 → 31 client mismatches per 5min after K=32 deploy).
2917    // =========================================================================
2918
2919    #[test]
2920    fn closeness_lookup_timeout_is_240s() {
2921        // Pin the timeout. If a future change drops it back to 60s the
2922        // failure mode from the trace in the doc comment will return.
2923        assert_eq!(
2924            PaymentVerifier::CLOSENESS_LOOKUP_TIMEOUT,
2925            std::time::Duration::from_secs(240),
2926            "CLOSENESS_LOOKUP_TIMEOUT must be 240s; if changing this, update \
2927             the iteration trace in the doc comment and re-validate on a \
2928             fresh testnet"
2929        );
2930    }
2931
2932    #[test]
2933    fn closeness_lookup_width_is_32() {
2934        // Pin the storer's lookup width. Must equal the client's
2935        // over-query factor (CANDIDATES_PER_POOL * 2 = 32) so the storer
2936        // sees the same peers the client legitimately picks from.
2937        assert_eq!(
2938            PaymentVerifier::CLOSENESS_LOOKUP_WIDTH,
2939            2 * evmlib::merkle_payments::CANDIDATES_PER_POOL,
2940            "CLOSENESS_LOOKUP_WIDTH must equal 2 * CANDIDATES_PER_POOL to \
2941             match the client's over-query in get_merkle_candidate_pool"
2942        );
2943    }
2944
2945    #[test]
2946    fn closeness_required_threshold_is_majority() {
2947        // Pin the threshold so a future change can't silently move it. This
2948        // is the security knob: a 9/16 majority tolerates closest-set
2949        // divergence between two nodes' views while still requiring most
2950        // candidates to be real peers the live DHT lists as closest.
2951        assert_eq!(
2952            PaymentVerifier::CANDIDATE_CLOSENESS_REQUIRED,
2953            9,
2954            "closeness threshold is a 9/16 majority"
2955        );
2956    }
2957
2958    #[test]
2959    fn closeness_lookup_count_uses_max_of_width_and_pool_len() {
2960        // The honest case: a 16-candidate pool must trigger a 32-peer
2961        // network lookup. This is the K=16-rejects-honest-pool fix from
2962        // the STG-01 investigation — without it, the storer never
2963        // observes the peers at network-true positions 17–32 that the
2964        // client legitimately picks from.
2965        let standard =
2966            PaymentVerifier::closeness_lookup_count(evmlib::merkle_payments::CANDIDATES_PER_POOL);
2967        assert_eq!(
2968            standard, 32,
2969            "honest 16-candidate pool must trigger a 32-peer DHT lookup"
2970        );
2971
2972        // Future-proof: if a protocol bump ever produces a pool larger
2973        // than CLOSENESS_LOOKUP_WIDTH, lookup_count must scale with the
2974        // pool — not truncate to WIDTH. Truncating would let an attacker
2975        // hide candidates by padding the pool past the storer's window.
2976        assert_eq!(
2977            PaymentVerifier::closeness_lookup_count(64),
2978            64,
2979            "lookup_count must scale up if pool exceeds CLOSENESS_LOOKUP_WIDTH"
2980        );
2981
2982        // Lower bound (also covered by the const-assert below; pin the
2983        // runtime path too in case the const-assert is ever removed).
2984        assert_eq!(
2985            PaymentVerifier::closeness_lookup_count(1),
2986            PaymentVerifier::CLOSENESS_LOOKUP_WIDTH,
2987            "lookup_count must never drop below CLOSENESS_LOOKUP_WIDTH"
2988        );
2989    }
2990
2991    // Compile-time invariant: the `closeness_lookup_count` formula relies
2992    // on WIDTH being ≥ CANDIDATES_PER_POOL so we never request fewer peers
2993    // than the pool itself contains.
2994    const _: () = assert!(
2995        PaymentVerifier::CLOSENESS_LOOKUP_WIDTH >= evmlib::merkle_payments::CANDIDATES_PER_POOL,
2996        "CLOSENESS_LOOKUP_WIDTH must be ≥ CANDIDATES_PER_POOL",
2997    );
2998
2999    // =========================================================================
3000    // Closeness-match logic tests
3001    //
3002    // These tests use the extracted `check_closeness_match` helper to
3003    // exercise the matching logic directly with synthetic peer-ID sets,
3004    // without standing up a real DHT. They cover:
3005    //
3006    //   - the 9/16 majority threshold (accept at exactly 9, reject below);
3007    //   - that a candidate counts only via exact membership in the storer's
3008    //     returned closest peers, so off-network fabrications are rejected;
3009    //   - the sparse-network short-circuit.
3010    //
3011    // Synthetic PeerIds put the tag in `bytes[0]`, so a candidate is in or
3012    // out of the network's returned set purely by tag value.
3013    // =========================================================================
3014
3015    /// Build a deterministic `PeerId` from a single byte tag.
3016    fn synthetic_peer_id(tag: u8) -> PeerId {
3017        let mut bytes = [0u8; 32];
3018        bytes[0] = tag;
3019        PeerId::from_bytes(bytes)
3020    }
3021
3022    /// Build a vector of synthetic `PeerId`s tagged with bytes 1..=n.
3023    fn synthetic_peer_ids(n: u8) -> Vec<PeerId> {
3024        (1..=n).map(synthetic_peer_id).collect()
3025    }
3026
3027    #[test]
3028    fn closeness_match_passes_when_all_16_candidates_in_top_16() {
3029        // Trivial case: every candidate is in the network's top-16.
3030        // Asserts the happy path still works after the refactor.
3031        let candidates = synthetic_peer_ids(16);
3032        let network = synthetic_peer_ids(16);
3033        let pool_address = [0u8; 32];
3034        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
3035        assert!(result.is_ok(), "all-in-top-16 pool must pass: {result:?}");
3036    }
3037
3038    #[test]
3039    fn closeness_match_passes_when_candidates_span_positions_1_to_15_and_17() {
3040        // The client's pool contains 16 candidates, 15 at network-true
3041        // positions 1..=15 plus one at position 17 (the position-16 peer was
3042        // unresponsive when the client over-queried). Under K=32 all 16 are
3043        // exact matches, comfortably ≥ the 9/16 majority.
3044        let candidates = synthetic_peer_ids(15)
3045            .into_iter()
3046            .chain(std::iter::once(synthetic_peer_id(17)))
3047            .collect::<Vec<_>>();
3048        // Lookup window = 32, includes position 17.
3049        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
3050        let pool_address = [0u8; 32];
3051        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
3052        assert!(
3053            result.is_ok(),
3054            "pool with one candidate at position 17 must pass: {result:?}"
3055        );
3056    }
3057
3058    #[test]
3059    fn closeness_match_accepts_honest_skew_via_exact_matches() {
3060        // Honest skew: the client's 16 candidates span network-true positions
3061        // {1..=12, 17, 19, 21, 23}. The lookup window of 32 covers all of
3062        // them, so all 16 are exact matches — trivially ≥ the 9/16 majority.
3063        let candidates: Vec<PeerId> = (1..=12u8)
3064            .chain([17u8, 19, 21, 23])
3065            .map(synthetic_peer_id)
3066            .collect();
3067        let pool_address = [0u8; 32];
3068        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
3069
3070        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
3071        assert!(
3072            result.is_ok(),
3073            "honest pool fully inside the lookup window must pass: {result:?}"
3074        );
3075    }
3076
3077    #[test]
3078    fn closeness_match_rejects_forged_pool() {
3079        // Security floor: a fully-forged pool whose candidate PeerIds are
3080        // disjoint from the network's returned closest peers must be
3081        // rejected. The lowered majority threshold must NOT let off-network
3082        // fabrications pass — every counted candidate has to be a peer the
3083        // live DHT actually returned.
3084        let forged_candidates: Vec<PeerId> = (100..=115).map(synthetic_peer_id).collect();
3085        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
3086        let pool_address = [0u8; 32];
3087
3088        let result =
3089            PaymentVerifier::check_closeness_match(&forged_candidates, &network, &pool_address);
3090        match result {
3091            Err(Error::Payment(msg)) => {
3092                assert!(
3093                    msg.contains("candidate pub_keys do not match"),
3094                    "expected forged-pool rejection message, got: {msg}"
3095                );
3096            }
3097            other => {
3098                panic!("forged pool disjoint from the network set must be rejected: {other:?}")
3099            }
3100        }
3101    }
3102
3103    #[test]
3104    fn closeness_match_rejects_pool_below_majority() {
3105        // Threshold sanity: 8 candidates are exact matches (tags 1..=8) and
3106        // the other 8 are off-network fabrications (tags 100..=107). 8 < 9
3107        // → reject.
3108        let mut candidates = synthetic_peer_ids(8);
3109        candidates.extend((100..=107).map(synthetic_peer_id)); // 8 fabrications
3110        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
3111        let pool_address = [0u8; 32];
3112
3113        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
3114        assert!(
3115            result.is_err(),
3116            "8 matches < majority of 9/16 must reject: {result:?}"
3117        );
3118    }
3119
3120    #[test]
3121    fn closeness_match_accepts_at_exactly_majority() {
3122        // Threshold sanity: exactly 9 candidates are exact matches (tags
3123        // 1..=9), the other 7 are off-network fabrications (tags 100..=106).
3124        // 9 ≥ 9 → accept.
3125        let mut candidates = synthetic_peer_ids(9);
3126        candidates.extend((100..=106).map(synthetic_peer_id)); // 7 fabrications
3127        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
3128        let pool_address = [0u8; 32];
3129
3130        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
3131        assert!(
3132            result.is_ok(),
3133            "9/16 ≥ majority threshold must accept: {result:?}"
3134        );
3135    }
3136
3137    #[test]
3138    fn closeness_match_returns_sparse_dht_error_when_lookup_too_small() {
3139        // The sparse-DHT short-circuit fires when the lookup returned
3140        // fewer peers than the threshold itself — even an all-matching
3141        // candidate set can't pass because the storer doesn't have an
3142        // authoritative view to compare against.
3143        let candidates = synthetic_peer_ids(16);
3144        let network = synthetic_peer_ids(8); // < CANDIDATE_CLOSENESS_REQUIRED (9)
3145        let pool_address = [0u8; 32];
3146
3147        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
3148        match result {
3149            Err(Error::Payment(msg)) => {
3150                assert!(
3151                    msg.contains("authoritative DHT lookup returned only 8"),
3152                    "expected sparse-DHT error message, got: {msg}"
3153                );
3154            }
3155            other => panic!("expected sparse-DHT rejection, got: {other:?}"),
3156        }
3157    }
3158}