Skip to main content

ant_node/payment/
verifier.rs

1//! Payment verifier with LRU cache and EVM verification.
2//!
3//! This is the core payment verification logic for ant-node.
4//! All new data requires EVM payment on Arbitrum (no free tier).
5
6use crate::ant_protocol::CLOSE_GROUP_SIZE;
7use crate::error::{Error, Result};
8use crate::logging::{debug, info, warn};
9use crate::payment::cache::{CacheStats, VerifiedCache, XorName};
10use crate::payment::pricing::{calculate_price, derive_records_stored_from_price};
11use crate::payment::proof::{
12    deserialize_merkle_proof, deserialize_proof, detect_proof_type, ProofType,
13};
14use crate::payment::single_node::SingleNodePayment;
15use crate::storage::lmdb::LmdbStorage;
16use ant_protocol::payment::verify::{verify_quote_content, verify_quote_signature};
17use evmlib::common::Amount;
18use evmlib::contract::payment_vault;
19use evmlib::merkle_batch_payment::{OnChainPaymentInfo, PoolHash};
20use evmlib::Network as EvmNetwork;
21use evmlib::ProofOfPayment;
22use evmlib::RewardsAddress;
23use lru::LruCache;
24use parking_lot::{Mutex, RwLock};
25use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes;
26use saorsa_core::identity::PeerId;
27use saorsa_core::P2PNode;
28use std::num::NonZeroUsize;
29use std::sync::Arc;
30
31/// Minimum allowed size for a payment proof in bytes.
32///
33/// This minimum ensures the proof contains at least a basic cryptographic hash or identifier.
34/// Proofs smaller than this are rejected as they cannot contain sufficient payment information.
35pub const MIN_PAYMENT_PROOF_SIZE_BYTES: usize = 32;
36
37/// Maximum allowed size for a payment proof in bytes (256 KB).
38///
39/// Single-node proofs with 7 ML-DSA-65 quotes reach ~40 KB.
40/// Merkle proofs include 16 candidate nodes (each with ~1,952-byte ML-DSA pub key
41/// and ~3,309-byte signature) plus merkle branch hashes, totaling ~130 KB.
42/// 256 KB provides headroom while still capping memory during verification.
43pub const MAX_PAYMENT_PROOF_SIZE_BYTES: usize = 262_144;
44
45/// Maximum percentage by which a quote's paid price may fall *below* the node's
46/// current price before the quote is rejected as stale.
47///
48/// The freshness gate is one-directional and price-based, not a symmetric
49/// record-count delta:
50///
51/// - **Over-payment is always accepted.** If the client paid at least the
52///   node's current price (e.g. the node pruned records and is now cheaper),
53///   the quote is fine — a node has no reason to reject money.
54/// - **Only meaningful under-payment is rejected.** A quote priced below the
55///   current price by more than this percentage is rejected as stale.
56///
57/// Comparing prices instead of raw record counts makes the tolerance
58/// self-scaling against the quadratic pricing curve: at low/moderate fill the
59/// curve is nearly flat, so normal in-flight churn (the node storing a handful
60/// of replicated records between quoting and verifying) is a negligible price
61/// change and passes; at high fill the curve is steep, so the same percentage
62/// still catches genuinely stale, underpriced quotes.
63const QUOTE_PRICE_STALENESS_PCT_TOLERANCE: u64 = 25;
64
65/// Configuration for EVM payment verification.
66///
67/// EVM verification is always on. All new data requires on-chain
68/// payment verification. The network field selects which EVM chain to use.
69#[derive(Debug, Clone)]
70pub struct EvmVerifierConfig {
71    /// EVM network to use (Arbitrum One, Arbitrum Sepolia, etc.)
72    pub network: EvmNetwork,
73}
74
75impl Default for EvmVerifierConfig {
76    fn default() -> Self {
77        Self {
78            network: EvmNetwork::ArbitrumOne,
79        }
80    }
81}
82
83/// Configuration for the payment verifier.
84///
85/// All new data requires EVM payment on Arbitrum. The cache stores
86/// previously verified payments to avoid redundant on-chain lookups.
87#[derive(Debug, Clone)]
88pub struct PaymentVerifierConfig {
89    /// EVM verifier configuration.
90    pub evm: EvmVerifierConfig,
91    /// Cache capacity (number of `XorName` values to cache).
92    pub cache_capacity: usize,
93    /// Local node's rewards address.
94    /// The verifier rejects payments that don't include this node as a recipient.
95    pub local_rewards_address: RewardsAddress,
96}
97
98/// The situation a payment proof is being verified in.
99///
100/// A proof-of-payment is a *receipt*: it records a sale that closed at some
101/// earlier moment, at that moment's prices, between the client and the close
102/// group of that moment. Two very different callers present receipts:
103///
104/// - **`ClientPut`** — the node is the storer being paid *right now*. Every
105///   check applies, including the ones that interrogate the present: "is the
106///   price on this receipt still fair for my current fullness?" (own-quote
107///   freshness) and "am I actually one of the paid recipients?" (local
108///   recipient / merkle candidate closeness).
109/// - **`Replication`** — a neighbour is handing over an already-paid record
110///   (fresh-write fan-out, paid-notify, repair). The sale closed long ago; the
111///   network's job now is to keep the record at target redundancy for the rest
112///   of its life. Re-asking the present-tense questions of a receipt is a
113///   category error with a guaranteed failure mode: record counts only grow,
114///   so every receipt's quoted price eventually drops below the verifier's
115///   live floor, and close groups churn, so the receiving node eventually
116///   isn't a quoted recipient at all. On DEV-01 (2026-06-05) this rejected
117///   nearly 100% of replication proof-of-payment transfers within an hour of
118///   launch (4M+
119///   rejections at ~300k/hour), pinned records below target redundancy, and
120///   drove a permanent ~500 MB/s fleet-wide re-offer storm.
121///
122/// Under `Replication` the verifier therefore skips only the
123/// storer-being-paid-now checks. Everything that makes the receipt a receipt
124/// still runs: quote structure, content binding to this exact address,
125/// peer-ID/pub-key bindings, ML-DSA signatures, and the on-chain settlement
126/// lookup. A record cannot be admitted via replication without an authentic,
127/// settled payment for that record.
128///
129/// The verified-`XorName` cache is context-aware to match: an entry inserted
130/// by a `Replication` verification satisfies later replication lookups but
131/// NOT a later `ClientPut` fast-path, so a replication receipt can never let
132/// a client PUT bypass the checks this enum gates.
133///
134/// Trade-off (deliberate, documented): skipping the recipient/closeness
135/// checks for replication means a payer who self-deals — minting a quote pool
136/// from peers they control and settling the median payment to their own
137/// wallet on-chain — can present that receipt to honest nodes via the
138/// replication protocol, paying only gas plus a recycled self-payment instead
139/// of paying real storers. The client-PUT path still rejects such pools, and
140/// replication admission still requires the receiving node to be responsible
141/// for the key, so the abuse costs a settled on-chain payment per chunk and
142/// buys only what storage already costs; closing it properly belongs in quote
143/// issuance / payment policy, not in the replication hot path, where the
144/// equivalent defence provably destroys the network's ability to heal.
145#[derive(Debug, Clone, Copy, PartialEq, Eq)]
146pub enum VerificationContext {
147    /// The node is the storer being paid right now: all checks apply.
148    ClientPut,
149    /// An already-settled receipt presented during replication/repair: skip
150    /// the storer-being-paid-now checks (own-quote price freshness, local
151    /// recipient, merkle candidate closeness); keep all receipt-authenticity
152    /// checks.
153    Replication,
154}
155
156/// Status returned by payment verification.
157#[derive(Debug, Clone, Copy, PartialEq, Eq)]
158pub enum PaymentStatus {
159    /// Data was found in local cache - previously paid.
160    CachedAsVerified,
161    /// New data - payment required.
162    PaymentRequired,
163    /// Payment was provided and verified.
164    PaymentVerified,
165}
166
167impl PaymentStatus {
168    /// Returns true if the data can be stored (cached or payment verified).
169    #[must_use]
170    pub fn can_store(&self) -> bool {
171        matches!(self, Self::CachedAsVerified | Self::PaymentVerified)
172    }
173
174    /// Returns true if this status indicates the data was already paid for.
175    #[must_use]
176    pub fn is_cached(&self) -> bool {
177        matches!(self, Self::CachedAsVerified)
178    }
179}
180
181/// Default capacity for the merkle pool cache (number of pool hashes to cache).
182const DEFAULT_POOL_CACHE_CAPACITY: usize = 1_000;
183
184/// Main payment verifier for ant-node.
185///
186/// Uses:
187/// 1. LRU cache for fast lookups of previously verified `XorName` values
188/// 2. EVM payment verification for new data (always required)
189/// 3. Pool-level cache for merkle batch payments (avoids repeated on-chain queries)
190pub struct PaymentVerifier {
191    /// LRU cache of verified `XorName` values.
192    cache: VerifiedCache,
193    /// LRU cache of verified merkle pool hashes → on-chain payment info.
194    pool_cache: Mutex<LruCache<PoolHash, OnChainPaymentInfo>>,
195    /// LRU cache of pool hashes whose candidate closeness has already been
196    /// verified by this node. Collapses the per-chunk Kademlia lookup cost
197    /// within a batch (256 chunks × 1 pool = 1 lookup instead of 256).
198    closeness_pass_cache: Mutex<LruCache<PoolHash, ()>>,
199    /// In-flight closeness lookups, keyed by pool hash. Lets concurrent PUTs
200    /// for the same pool coalesce onto a single Kademlia lookup AND share
201    /// its result — on both success and failure — which bounds `DoS`
202    /// amplification to one lookup per unique `pool_hash` regardless of
203    /// concurrency.
204    inflight_closeness: Mutex<LruCache<PoolHash, Arc<ClosenessSlot>>>,
205    /// P2P node handle, attached post-construction so merkle verification can
206    /// check that candidate `pub_keys` map to peers actually close to the pool
207    /// midpoint in the live DHT. `None` in unit tests that don't exercise
208    /// merkle verification; production startup MUST call [`attach_p2p_node`].
209    p2p_node: RwLock<Option<Arc<P2PNode>>>,
210    /// LMDB storage handle, attached post-construction so the storage-delta
211    /// freshness check can read the authoritative on-disk record count without
212    /// depending on a side counter that may drift from replication/repair/prune
213    /// paths. `None` in unit tests that pre-set [`Self::test_records_override`];
214    /// production startup MUST call [`attach_storage`].
215    storage: RwLock<Option<Arc<LmdbStorage>>>,
216    /// Test-only override for the storage-delta freshness check.
217    ///
218    /// When `Some(n)`, `validate_quote_freshness` uses `n` as the current
219    /// record count instead of querying `storage.current_chunks()`. Set via
220    /// [`Self::set_records_stored_for_tests`] so unit tests that don't wire a
221    /// real `LmdbStorage` can still drive the freshness logic.
222    test_records_override: RwLock<Option<u64>>,
223    /// Test-only override for this node's own peer ID, used by
224    /// `validate_quote_freshness` to pick out the node's own quote from the
225    /// payment bundle. Production code derives it from the attached
226    /// [`P2PNode`]; set via [`Self::set_peer_id_for_tests`] so unit tests can
227    /// drive the freshness logic without wiring a real `P2PNode`.
228    test_peer_id_override: RwLock<Option<[u8; 32]>>,
229    /// Configuration.
230    config: PaymentVerifierConfig,
231}
232
233/// Shared state for an inflight closeness verification. The leader publishes
234/// its result via the `OnceLock`; waiters read that result directly instead
235/// of racing on a cache re-check. Wrapped in an `Arc` and held both by the
236/// leader's drop guard and by each waiting task.
237struct ClosenessSlot {
238    notify: Arc<tokio::sync::Notify>,
239    /// `Some(Ok(()))` on success, `Some(Err(msg))` on failure, `None` if the
240    /// leader disappeared without publishing (panic, cancellation).
241    result: std::sync::OnceLock<std::result::Result<(), String>>,
242}
243
244impl ClosenessSlot {
245    fn new() -> Self {
246        Self {
247            notify: Arc::new(tokio::sync::Notify::new()),
248            result: std::sync::OnceLock::new(),
249        }
250    }
251
252    /// Build an owned `Notified` future that snapshots the `notify_waiters`
253    /// counter at call time. Awaiting this future after dropping external
254    /// locks is race-free: if `notify_waiters` fires between construction
255    /// and the first poll, the snapshot mismatch resolves the future
256    /// immediately.
257    fn notified_owned(&self) -> tokio::sync::futures::OwnedNotified {
258        Arc::clone(&self.notify).notified_owned()
259    }
260}
261
262/// Drop guard that publishes the leader's result, clears the inflight slot,
263/// and wakes all waiters. Fires on every exit path: success, failure, panic,
264/// future-cancellation.
265///
266/// The guard owns its own `Arc<ClosenessSlot>` so `notify_waiters` still
267/// fires even if LRU pressure evicted the slot before the leader finished.
268/// Waiters see the published result via `result.get()`; the `Notify` is only
269/// the wake-up signal.
270struct InflightGuard<'a> {
271    slot_cache: &'a Mutex<LruCache<PoolHash, Arc<ClosenessSlot>>>,
272    pool_hash: PoolHash,
273    slot: Arc<ClosenessSlot>,
274}
275
276impl InflightGuard<'_> {
277    /// Publish the leader's result. Called exactly once by the leader on
278    /// every successful or explicit-error exit. If dropped without calling
279    /// (panic, cancellation) the guard still wakes waiters but leaves
280    /// `result` empty, which waiters treat as a transient failure and retry.
281    fn publish(&self, result: &Result<()>) {
282        let stored: std::result::Result<(), String> = match result {
283            Ok(()) => Ok(()),
284            Err(e) => Err(e.to_string()),
285        };
286        let _ = self.slot.result.set(stored);
287    }
288}
289
290impl Drop for InflightGuard<'_> {
291    fn drop(&mut self) {
292        // Remove the slot entry if it's still ours. A separate leader may
293        // have inserted a new slot for the same pool_hash after LRU
294        // eviction — don't pop someone else's entry.
295        {
296            let mut cache = self.slot_cache.lock();
297            if let Some(existing) = cache.peek(&self.pool_hash) {
298                if Arc::ptr_eq(existing, &self.slot) {
299                    cache.pop(&self.pool_hash);
300                }
301            }
302        }
303        // Wake every waiter registered against OUR slot, regardless of
304        // whether the cache entry is still ours.
305        self.slot.notify.notify_waiters();
306    }
307}
308
309impl PaymentVerifier {
310    /// Create a new payment verifier.
311    #[must_use]
312    pub fn new(config: PaymentVerifierConfig) -> Self {
313        const _: () = assert!(
314            DEFAULT_POOL_CACHE_CAPACITY > 0,
315            "pool cache capacity must be > 0"
316        );
317        let cache = VerifiedCache::with_capacity(config.cache_capacity);
318        let pool_cache_size =
319            NonZeroUsize::new(DEFAULT_POOL_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
320        let pool_cache = Mutex::new(LruCache::new(pool_cache_size));
321        let closeness_pass_cache = Mutex::new(LruCache::new(pool_cache_size));
322        let inflight_closeness = Mutex::new(LruCache::new(pool_cache_size));
323
324        let cache_capacity = config.cache_capacity;
325        info!("Payment verifier initialized (cache_capacity={cache_capacity}, evm=always-on, pool_cache={DEFAULT_POOL_CACHE_CAPACITY})");
326
327        // Loud warning if a production binary was accidentally built with
328        // `test-utils`: that feature flips the closeness-check fail-open
329        // switch, disabling the pay-yourself defence when P2PNode isn't
330        // attached. Safe in tests, never intended for prod.
331        #[cfg(feature = "test-utils")]
332        crate::logging::error!(
333            "PaymentVerifier: built with `test-utils` feature — merkle closeness \
334             defence falls back to fail-open when no P2PNode is attached. This \
335             feature is for test binaries only; production nodes must be built \
336             without it."
337        );
338
339        Self {
340            cache,
341            pool_cache,
342            closeness_pass_cache,
343            inflight_closeness,
344            p2p_node: RwLock::new(None),
345            storage: RwLock::new(None),
346            test_records_override: RwLock::new(None),
347            test_peer_id_override: RwLock::new(None),
348            config,
349        }
350    }
351
352    /// Attach the node's [`P2PNode`] handle so merkle-payment verification can
353    /// check candidate `pub_keys` against the DHT's actual closest peers to the
354    /// pool midpoint.
355    ///
356    /// Production startup MUST call this once the `P2PNode` exists. Without
357    /// it, the closeness check fails CLOSED in release builds (rejects the
358    /// PUT with a visible error) and fails open in test builds. Idempotent:
359    /// calling twice replaces the handle.
360    pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
361        *self.p2p_node.write() = Some(node);
362        debug!("PaymentVerifier: P2PNode attached for merkle closeness checks");
363    }
364
365    /// Attach the node's [`LmdbStorage`] handle so storage-delta freshness
366    /// checks can query the authoritative on-disk record count.
367    ///
368    /// Production startup MUST call this once the storage exists; otherwise
369    /// `validate_quote_freshness` falls back to treating the current count as
370    /// zero, which will reject all non-trivial quotes. Idempotent: calling
371    /// twice replaces the handle.
372    pub fn attach_storage(&self, storage: Arc<LmdbStorage>) {
373        *self.storage.write() = Some(storage);
374        debug!("PaymentVerifier: LmdbStorage attached for storage-delta freshness checks");
375    }
376
377    /// Test-only setter for the current record count used by storage-delta
378    /// freshness checks. Lets unit tests drive the freshness logic without
379    /// wiring a real `LmdbStorage`. Has no effect in production code because
380    /// production code is expected to call [`Self::attach_storage`] instead.
381    #[cfg(any(test, feature = "test-utils"))]
382    pub fn set_records_stored_for_tests(&self, count: u64) {
383        *self.test_records_override.write() = Some(count);
384    }
385
386    /// Test-only setter for the node's own peer ID used by the quote
387    /// freshness check. Lets unit tests mark which quote in a payment bundle
388    /// is "ours" without wiring a real `P2PNode`. Has no effect in production
389    /// code because production code is expected to call
390    /// [`Self::attach_p2p_node`] instead.
391    #[cfg(any(test, feature = "test-utils"))]
392    pub fn set_peer_id_for_tests(&self, peer_id_bytes: [u8; 32]) {
393        *self.test_peer_id_override.write() = Some(peer_id_bytes);
394    }
395
396    /// Snapshot this node's own peer ID for the quote freshness check.
397    ///
398    /// Prefers the attached [`P2PNode`] (authoritative). Falls back to a test
399    /// override if one was set. Returns `None` only when no source is
400    /// available (mis-configured production startup); the caller treats that
401    /// as "unknown" and skips the freshness gate rather than rejecting — the
402    /// same fail-open posture as a missing record-count source.
403    fn self_peer_id_bytes(&self) -> Option<[u8; 32]> {
404        if let Some(node) = self.p2p_node.read().as_ref() {
405            return Some(*node.peer_id().as_bytes());
406        }
407        *self.test_peer_id_override.read()
408    }
409
410    /// Snapshot the current record count for freshness comparisons.
411    ///
412    /// Prefers the attached `LmdbStorage` (authoritative — covers client PUTs,
413    /// replication stores, repair fetches, and prune deletes by definition).
414    /// Falls back to a test override if one was set. Returns `None` only when
415    /// no source is available (mis-configured production startup); the caller
416    /// treats that as "unknown" and skips storage-delta gating rather than
417    /// rejecting all quotes outright.
418    fn current_records_stored(&self) -> Option<u64> {
419        if let Some(storage) = self.storage.read().as_ref() {
420            match storage.current_chunks() {
421                Ok(n) => return Some(n),
422                Err(e) => {
423                    warn!(
424                        "PaymentVerifier: failed to read current_chunks() for freshness check: {e}"
425                    );
426                    return None;
427                }
428            }
429        }
430        *self.test_records_override.read()
431    }
432
433    /// Check if payment is required for the given `XorName`.
434    ///
435    /// This is the main entry point for payment verification:
436    /// 1. Check LRU cache (fast path)
437    /// 2. If not cached, payment is required
438    ///
439    /// The fast path is context-aware: a `ClientPut` lookup is satisfied only
440    /// by an entry whose verification ran the full client-PUT check set. An
441    /// entry inserted by a `Replication` verification (which skips the
442    /// storer-being-paid-now checks) must not let a later client PUT bypass
443    /// those checks. A `Replication` lookup accepts either kind of entry.
444    ///
445    /// # Arguments
446    ///
447    /// * `xorname` - The content-addressed name of the data
448    /// * `context` - The verification context of the caller
449    ///
450    /// # Returns
451    ///
452    /// * `PaymentStatus::CachedAsVerified` - Found in local cache (previously paid)
453    /// * `PaymentStatus::PaymentRequired` - Not cached (payment required)
454    pub fn check_payment_required(
455        &self,
456        xorname: &XorName,
457        context: VerificationContext,
458    ) -> PaymentStatus {
459        // Check LRU cache (fast path)
460        let cached = match context {
461            VerificationContext::ClientPut => self.cache.contains_client_put_verified(xorname),
462            VerificationContext::Replication => self.cache.contains(xorname),
463        };
464        if cached {
465            if crate::logging::enabled!(crate::logging::Level::DEBUG) {
466                debug!("Data {} found in verified cache", hex::encode(xorname));
467            }
468            return PaymentStatus::CachedAsVerified;
469        }
470
471        // Not in cache - payment required
472        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
473            debug!(
474                "Data {} not in cache - payment required",
475                hex::encode(xorname)
476            );
477        }
478        PaymentStatus::PaymentRequired
479    }
480
481    /// Verify that a PUT request has valid payment.
482    ///
483    /// This is the complete payment verification flow:
484    /// 1. Check if data is in cache (previously paid)
485    /// 2. If not, verify the provided payment proof
486    ///
487    /// # Arguments
488    ///
489    /// * `xorname` - The content-addressed name of the data
490    /// * `payment_proof` - Optional payment proof (required if not in cache)
491    /// * `context` - Whether the proof backs a live client PUT or an
492    ///   already-settled receipt presented during replication — see
493    ///   [`VerificationContext`] for which checks each context runs
494    ///
495    /// # Returns
496    ///
497    /// * `Ok(PaymentStatus)` - Verification succeeded
498    /// * `Err(Error::Payment)` - No payment and not cached, or payment invalid
499    ///
500    /// # Errors
501    ///
502    /// Returns an error if payment is required but not provided, or if payment is invalid.
503    pub async fn verify_payment(
504        &self,
505        xorname: &XorName,
506        payment_proof: Option<&[u8]>,
507        context: VerificationContext,
508    ) -> Result<PaymentStatus> {
509        // First check if payment is required
510        let status = self.check_payment_required(xorname, context);
511
512        match status {
513            PaymentStatus::CachedAsVerified => {
514                // No payment needed - already in cache
515                Ok(status)
516            }
517            PaymentStatus::PaymentRequired => {
518                // EVM verification is always on — verify the proof
519                if let Some(proof) = payment_proof {
520                    let proof_len = proof.len();
521                    if proof_len < MIN_PAYMENT_PROOF_SIZE_BYTES {
522                        return Err(Error::Payment(format!(
523                            "Payment proof too small: {proof_len} bytes (min {MIN_PAYMENT_PROOF_SIZE_BYTES})"
524                        )));
525                    }
526                    if proof_len > MAX_PAYMENT_PROOF_SIZE_BYTES {
527                        return Err(Error::Payment(format!(
528                            "Payment proof too large: {proof_len} bytes (max {MAX_PAYMENT_PROOF_SIZE_BYTES} bytes)"
529                        )));
530                    }
531
532                    // Detect proof type from version tag byte
533                    match detect_proof_type(proof) {
534                        Some(ProofType::Merkle) => {
535                            self.verify_merkle_payment(xorname, proof, context).await?;
536                        }
537                        Some(ProofType::SingleNode) => {
538                            let (payment, tx_hashes) = deserialize_proof(proof).map_err(|e| {
539                                Error::Payment(format!("Failed to deserialize payment proof: {e}"))
540                            })?;
541
542                            if !tx_hashes.is_empty() {
543                                debug!("Proof includes {} transaction hash(es)", tx_hashes.len());
544                            }
545
546                            self.verify_evm_payment(xorname, &payment, context).await?;
547                        }
548                        None => {
549                            let tag = proof.first().copied().unwrap_or(0);
550                            return Err(Error::Payment(format!(
551                                "Unknown payment proof type tag: 0x{tag:02x}"
552                            )));
553                        }
554                        // ant-protocol marks `ProofType` as `#[non_exhaustive]`.
555                        // A future proof variant that this node does not yet
556                        // understand must be rejected, not silently accepted.
557                        Some(_) => {
558                            let tag = proof.first().copied().unwrap_or(0);
559                            return Err(Error::Payment(format!(
560                                "Unsupported payment proof type tag: 0x{tag:02x} (this node's protocol version does not handle it — upgrade ant-node)"
561                            )));
562                        }
563                    }
564
565                    // Cache the verified xorname, recording which check set
566                    // ran. A Replication-verified entry satisfies later
567                    // replication lookups (re-offers of the same key are
568                    // routine) but not a later ClientPut fast-path — the
569                    // context-gated checks were never run for it.
570                    match context {
571                        VerificationContext::ClientPut => self.cache.insert(*xorname),
572                        VerificationContext::Replication => {
573                            self.cache.insert_replication_verified(*xorname);
574                        }
575                    }
576
577                    Ok(PaymentStatus::PaymentVerified)
578                } else {
579                    // No payment provided in production mode
580                    let xorname_hex = hex::encode(xorname);
581                    Err(Error::Payment(format!(
582                        "Payment required for new data {xorname_hex}"
583                    )))
584                }
585            }
586            PaymentStatus::PaymentVerified => Err(Error::Payment(
587                "Unexpected PaymentVerified status from check_payment_required".to_string(),
588            )),
589        }
590    }
591
592    /// Get cache statistics.
593    #[must_use]
594    pub fn cache_stats(&self) -> CacheStats {
595        self.cache.stats()
596    }
597
598    /// Get the number of cached entries.
599    #[must_use]
600    pub fn cache_len(&self) -> usize {
601        self.cache.len()
602    }
603
604    /// Pre-populate the payment cache for a given address.
605    ///
606    /// This marks the address as already paid, so subsequent `verify_payment`
607    /// calls will return `CachedAsVerified` without on-chain verification.
608    /// Useful for test setups where real EVM payment is not needed.
609    #[cfg(any(test, feature = "test-utils"))]
610    pub fn cache_insert(&self, xorname: XorName) {
611        self.cache.insert(xorname);
612    }
613
614    /// Pre-populate the merkle pool cache. Testing helper that lets e2e tests
615    /// bypass the on-chain `completedMerklePayments` lookup when the point of
616    /// the test is to exercise merkle-verification logic BEFORE the on-chain
617    /// call (e.g. the pay-yourself closeness check).
618    #[cfg(any(test, feature = "test-utils"))]
619    pub fn pool_cache_insert(&self, pool_hash: PoolHash, info: OnChainPaymentInfo) {
620        let mut cache = self.pool_cache.lock();
621        cache.put(pool_hash, info);
622    }
623
624    /// Verify a single-node EVM payment proof.
625    ///
626    /// Verification steps:
627    /// 1. Exactly `CLOSE_GROUP_SIZE` quotes are present
628    /// 2. All quotes target the correct content address (xorname binding)
629    /// 3. This node's own quote price is fresh (`ClientPut` only — a
630    ///    replication receipt's price was fixed at the original sale and the
631    ///    node's record count has legitimately grown since)
632    /// 4. Peer ID bindings match the ML-DSA-65 public keys
633    /// 5. This node is among the quoted recipients (`ClientPut` only — a
634    ///    post-churn close-group member receiving a record via replication
635    ///    was never a payee on the original receipt)
636    /// 6. All ML-DSA-65 signatures are valid (offloaded to `spawn_blocking`)
637    /// 7. The median-priced quote was paid at least 3x its price on-chain
638    ///    (looked up via `completedPayments(quoteHash)` on the payment vault)
639    ///
640    /// See [`VerificationContext`] for why steps 3 and 5 are context-gated.
641    ///
642    /// For unit tests that don't need on-chain verification, pre-populate
643    /// the cache so `verify_payment` returns `CachedAsVerified` before
644    /// reaching this method.
645    async fn verify_evm_payment(
646        &self,
647        xorname: &XorName,
648        payment: &ProofOfPayment,
649        context: VerificationContext,
650    ) -> Result<()> {
651        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
652            let xorname_hex = hex::encode(xorname);
653            let quote_count = payment.peer_quotes.len();
654            debug!(
655                "Verifying EVM payment for {xorname_hex} with {quote_count} quotes ({context:?})"
656            );
657        }
658
659        Self::validate_quote_structure(payment)?;
660        Self::validate_quote_content(payment, xorname)?;
661        if context == VerificationContext::ClientPut {
662            self.validate_quote_freshness(payment)?;
663        }
664        Self::validate_peer_bindings(payment)?;
665        if context == VerificationContext::ClientPut {
666            self.validate_local_recipient(payment)?;
667        }
668
669        // Verify quote signatures (CPU-bound, run off async runtime)
670        let peer_quotes = payment.peer_quotes.clone();
671        tokio::task::spawn_blocking(move || {
672            for (encoded_peer_id, quote) in &peer_quotes {
673                if !verify_quote_signature(quote) {
674                    return Err(Error::Payment(
675                        format!("Quote ML-DSA-65 signature verification failed for peer {encoded_peer_id:?}"),
676                    ));
677                }
678            }
679            Ok(())
680        })
681        .await
682        .map_err(|e| Error::Payment(format!("Signature verification task failed: {e}")))??;
683
684        // Reconstruct the SingleNodePayment to identify the median quote.
685        // from_quotes() sorts by price and marks the median for 3x payment.
686        let quotes_with_prices: Vec<_> = payment
687            .peer_quotes
688            .iter()
689            .map(|(_, quote)| (quote.clone(), quote.price))
690            .collect();
691        let single_payment = SingleNodePayment::from_quotes(quotes_with_prices).map_err(|e| {
692            Error::Payment(format!(
693                "Failed to reconstruct payment for verification: {e}"
694            ))
695        })?;
696
697        // Verify the median quote was paid at least 3x its price on-chain
698        // via completedPayments(quoteHash) on the payment vault contract.
699        let verified_amount = single_payment
700            .verify(&self.config.evm.network)
701            .await
702            .map_err(|e| {
703                let xorname_hex = hex::encode(xorname);
704                Error::Payment(format!(
705                    "Median quote payment verification failed for {xorname_hex}: {e}"
706                ))
707            })?;
708
709        if crate::logging::enabled!(crate::logging::Level::INFO) {
710            let xorname_hex = hex::encode(xorname);
711            info!("EVM payment verified for {xorname_hex} (median paid {verified_amount} atto)");
712        }
713        Ok(())
714    }
715
716    /// Validate quote count, uniqueness, and basic structure.
717    fn validate_quote_structure(payment: &ProofOfPayment) -> Result<()> {
718        if payment.peer_quotes.is_empty() {
719            return Err(Error::Payment("Payment has no quotes".to_string()));
720        }
721
722        let quote_count = payment.peer_quotes.len();
723        if quote_count != CLOSE_GROUP_SIZE {
724            return Err(Error::Payment(format!(
725                "Payment must have exactly {CLOSE_GROUP_SIZE} quotes, got {quote_count}"
726            )));
727        }
728
729        let mut seen: Vec<&evmlib::EncodedPeerId> = Vec::with_capacity(quote_count);
730        for (encoded_peer_id, _) in &payment.peer_quotes {
731            if seen.contains(&encoded_peer_id) {
732                return Err(Error::Payment(format!(
733                    "Duplicate peer ID in payment quotes: {encoded_peer_id:?}"
734                )));
735            }
736            seen.push(encoded_peer_id);
737        }
738
739        Ok(())
740    }
741
742    /// Verify all quotes target the correct content address.
743    fn validate_quote_content(payment: &ProofOfPayment, xorname: &XorName) -> Result<()> {
744        for (encoded_peer_id, quote) in &payment.peer_quotes {
745            if !verify_quote_content(quote, xorname) {
746                let expected_hex = hex::encode(xorname);
747                let actual_hex = hex::encode(quote.content.0);
748                return Err(Error::Payment(format!(
749                    "Quote content address mismatch for peer {encoded_peer_id:?}: expected {expected_hex}, got {actual_hex}"
750                )));
751            }
752        }
753        Ok(())
754    }
755
756    /// Verify quote freshness by price staleness, not wall-clock time and not a
757    /// symmetric record-count delta.
758    ///
759    /// The quote price encodes the quoting node's record count via the quadratic
760    /// pricing formula. We compute the price the node would charge *now* for its
761    /// current fullness and reject the quote only if the client under-paid that
762    /// current price by more than [`QUOTE_PRICE_STALENESS_PCT_TOLERANCE`]. This:
763    ///
764    /// - removes the platform clock dependency that caused Windows/UTC false
765    ///   rejections (timestamps are deliberately unused);
766    /// - never rejects an over-payment (the previous symmetric `abs_diff` check
767    ///   rejected quotes where the node had *fewer* records than when it quoted,
768    ///   i.e. the client paid for a fuller, pricier node — nonsensical to
769    ///   reject); and
770    /// - self-scales with the pricing curve, so benign in-flight churn (a node
771    ///   storing a few replicated records between quoting and verifying) — a
772    ///   negligible price move where the curve is flat — no longer rejects an
773    ///   otherwise-valid payment. On a fresh, rapidly-filling testnet that churn
774    ///   routinely exceeded the old fixed 5-record tolerance and rejected ~100%
775    ///   of uploads via the multiplicative per-chunk effect.
776    ///
777    /// The current record count comes from the attached [`LmdbStorage`] via
778    /// `current_chunks()` — an O(1) B-tree page-header read, authoritative
779    /// regardless of which path stored the record (client PUT, replication
780    /// store, repair fetch) or removed it (prune delete). If no storage source
781    /// is available (mis-configured production startup, or a unit test that
782    /// didn't set a test override), the gate is skipped entirely rather than
783    /// rejecting every quote — see [`Self::current_records_stored`].
784    ///
785    /// **Only this node's own quote is gated.** A bundle contains one quote
786    /// per close-group peer, and fullness across a close group is wildly
787    /// heterogeneous on a real network (a freshly joined node holds tens of
788    /// records while an established neighbour holds thousands). Comparing a
789    /// *neighbour's* quote price against *this node's* record count therefore
790    /// rejects honest payments whenever the group spans more than the
791    /// tolerance — on ant-prod-01 a close group spanning 47..=1788 records
792    /// made the three fullest nodes reject every bundle containing the
793    /// emptiest node's (perfectly fresh, 10-second-old) quote, failing the
794    /// PUT after the client had already paid on-chain. The node can only
795    /// re-derive *its own* price from its own record count, so its own quote
796    /// is the only one it can legitimately call stale. Replay of another
797    /// node's old cheap quote is that node's gate to enforce when the PUT
798    /// reaches it; the on-chain median payment binding is unaffected either
799    /// way.
800    ///
801    /// A bundle holds at most one quote per peer — [`Self::validate_quote_structure`]
802    /// rejects duplicate peer IDs and runs before this gate on every path —
803    /// so the loop below matches at most one own quote.
804    fn validate_quote_freshness(&self, payment: &ProofOfPayment) -> Result<()> {
805        let Some(current_records) = self.current_records_stored() else {
806            debug!(
807                "PaymentVerifier: no record-count source attached; skipping \
808                 quote price-staleness check"
809            );
810            return Ok(());
811        };
812
813        let Some(self_peer_id) = self.self_peer_id_bytes() else {
814            debug!(
815                "PaymentVerifier: no self peer-id source attached; skipping \
816                 quote price-staleness check"
817            );
818            return Ok(());
819        };
820
821        // The price the node would charge right now for its current fullness,
822        // and the floor a quote may not drop below (one-directional: paying at
823        // or above `current_price` is always accepted).
824        let current_price = calculate_price(usize::try_from(current_records).unwrap_or(usize::MAX));
825        let min_acceptable_price = current_price.saturating_mul(Amount::from(
826            100u64.saturating_sub(QUOTE_PRICE_STALENESS_PCT_TOLERANCE),
827        )) / Amount::from(100u64);
828
829        let mut own_quote_seen = false;
830        for (encoded_peer_id, quote) in &payment.peer_quotes {
831            if encoded_peer_id.as_bytes() != &self_peer_id {
832                // A neighbour's quote prices the *neighbour's* fullness; this
833                // node has no basis to judge it against its own record count.
834                continue;
835            }
836            own_quote_seen = true;
837            if quote.price < min_acceptable_price {
838                let quoted_records = derive_records_stored_from_price(quote.price);
839                return Err(Error::Payment(format!(
840                    "Own quote {encoded_peer_id:?} stale: quoted price encodes \
841                     {quoted_records} records but node currently holds {current_records} \
842                     (quoted {}, minimum acceptable {min_acceptable_price} at \
843                     {QUOTE_PRICE_STALENESS_PCT_TOLERANCE}% under-payment tolerance)",
844                    quote.price
845                )));
846            }
847        }
848
849        // Two self-identity notions coexist in this verifier and are expected
850        // to refer to the same node: `validate_local_recipient` matches "us"
851        // by rewards address, this gate by peer ID. They legitimately diverge
852        // when a PUT reaches a node whose own quote isn't in the bundle but
853        // whose rewards address is shared with a quoted sibling (common in
854        // fleet deployments). The gate fail-opens in that case — leave a
855        // breadcrumb, because a silent no-op is exactly what makes a
856        // production incident hard to reconstruct from node logs.
857        if !own_quote_seen {
858            let our_rewards_address_quoted = payment
859                .peer_quotes
860                .iter()
861                .any(|(_, quote)| quote.rewards_address == self.config.local_rewards_address);
862            if our_rewards_address_quoted {
863                debug!(
864                    "PaymentVerifier: bundle contains our rewards address but no quote \
865                     under our peer ID; skipping quote price-staleness check"
866                );
867            }
868        }
869        Ok(())
870    }
871
872    /// Verify each quote's `pub_key` matches the claimed peer ID via BLAKE3.
873    fn validate_peer_bindings(payment: &ProofOfPayment) -> Result<()> {
874        for (encoded_peer_id, quote) in &payment.peer_quotes {
875            let expected_peer_id = peer_id_from_public_key_bytes(&quote.pub_key)
876                .map_err(|e| Error::Payment(format!("Invalid ML-DSA public key in quote: {e}")))?;
877
878            if expected_peer_id.as_bytes() != encoded_peer_id.as_bytes() {
879                let expected_hex = expected_peer_id.to_hex();
880                let actual_hex = hex::encode(encoded_peer_id.as_bytes());
881                return Err(Error::Payment(format!(
882                    "Quote pub_key does not belong to claimed peer {encoded_peer_id:?}: \
883                     BLAKE3(pub_key) = {expected_hex}, peer_id = {actual_hex}"
884                )));
885            }
886        }
887        Ok(())
888    }
889
890    /// Minimum number of candidate `pub_keys` (out of 16) whose derived
891    /// `PeerId` must be among the DHT's actual closest peers to the pool
892    /// midpoint address for the pool to be accepted.
893    ///
894    /// Set to a simple majority (9/16). Two nodes' views of the closest set
895    /// to a midpoint diverge on a young, high-churn, NAT-heavy network — by
896    /// more than a near-unanimous threshold tolerates — so a stricter bar
897    /// rejected honest pools whose candidates are genuinely drawn from the
898    /// midpoint's close group but don't all reappear in this storer's own
899    /// lookup. A majority absorbs that divergence while still requiring most
900    /// candidates to be real peers the live DHT lists as closest.
901    ///
902    /// Security cost: a lower threshold widens the room for the "pay-yourself"
903    /// attack — an attacker running real neighbourhood peers needs fewer of
904    /// them to clear a majority than to clear a near-unanimous bar. No theft
905    /// of funds is possible regardless (payment binds on-chain to the rewards
906    /// address); the cost is that grinding storage payments back to your own
907    /// nodes gets cheaper. Each counted candidate must still be a peer the
908    /// live DHT actually returns as closest — a fabricated off-network key
909    /// cannot satisfy this — so the floor is "run N real top-K Sybil nodes
910    /// AND grind the midpoint", just with a smaller N. Pairs with the planned
911    /// pool-midpoint consensus-anchor work, which removes the midpoint
912    /// grinding freedom that makes a low threshold dangerous.
913    const CANDIDATE_CLOSENESS_REQUIRED: usize = 9;
914
915    /// Timeout for the authoritative network lookup used by the closeness
916    /// check.
917    ///
918    /// Iterative Kademlia lookups can cascade through `MAX_ITERATIONS = 20`
919    /// rounds in saorsa-core's `find_closest_nodes_network`, and a single
920    /// unresponsive peer's dial can take 20–30s before timing out. On a
921    /// young network (e.g. fresh testnet, NAT-simulated peers in 30% of
922    /// the swarm) iterations average ~10s each — captured trace from
923    /// STG-01 EWR-3 ant-node-1 just before a pre-fix timeout:
924    ///
925    /// ```text
926    /// Iter 0: +0.0s | Iter 1: +0.2s | Iter 2: +6.6s | Iter 3: +13.1s
927    /// Iter 4: +20.9s | Iter 5: +39.8s | Iter 6: +50.8s | [60s wall]
928    /// ```
929    ///
930    /// 60s caps the lookup at ~7 iterations and rejects honest pools whose
931    /// candidates only emerge after iteration 7. 240s gives ~1.2× headroom
932    /// over the ~200s natural worst-case runtime on a 1k-node testnet.
933    ///
934    /// `DoS` amplification stays bounded at roughly one in-flight lookup
935    /// per unique `pool_hash` under typical load, via
936    /// [`closeness_pass_cache`] + [`inflight_closeness`]. The bound is
937    /// "typical" because `inflight_closeness` is an LRU and a sustained
938    /// flood of unique `pool_hash` entries can evict an in-flight slot,
939    /// at which point a second leader can race for the same pool (see
940    /// [`InflightGuard::drop`]). At steady state the pool cache and pool
941    /// signature verification gate keep this rare in practice.
942    const CLOSENESS_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(240);
943
944    /// Width of the storer's authoritative network lookup, in peers.
945    ///
946    /// The client over-queries `2 * CANDIDATES_PER_POOL = 32` peers via
947    /// `find_closest_peers(addr, 32)` (see
948    /// `ant-client/ant-core/src/data/client/merkle.rs::get_merkle_candidate_pool`)
949    /// and selects 16 valid responders by XOR distance — so truly-close
950    /// peers that are slow, NAT'd, or briefly unreachable get filtered
951    /// out and replaced by peers from positions 17–32 of the network's
952    /// actual ranking. The storer must therefore verify against the same
953    /// wider window: a pool containing peers from positions 17–32 is
954    /// honest (those peers really exist in the network's closest-32 set),
955    /// it's just that the client's quote-collection step couldn't reach
956    /// the peers at positions <17 in time.
957    ///
958    /// Empirical effect on STG-01 (1k-node testnet, 30% NAT-simulated):
959    /// widening from K=16 to K=32 dropped client-side closeness
960    /// mismatches from ~115 to ~31 per 5 min, a 73% reduction.
961    ///
962    /// Performance note: `count` does not just truncate the lookup —
963    /// `find_closest_nodes_network` keeps iterating until either
964    /// `MAX_ITERATIONS` is reached or `best_nodes.len() >= count`. K=32
965    /// can therefore extend lookups by a few iterations on sparse
966    /// networks vs K=16, which reinforces (rather than undermines) the
967    /// timeout bump above.
968    ///
969    /// Security: the pay-yourself attack still requires the attacker's
970    /// fabricated `PeerId`s to land in the storer's authoritative top-K, so
971    /// the dominant cost is Sybil-grinding midpoint addresses or running real
972    /// nodes near the target. The leniency for honest divergence comes from
973    /// the `CANDIDATE_CLOSENESS_REQUIRED` majority threshold, not from this
974    /// window; widening the window further was measured as too heavy on the
975    /// lookup path.
976    const CLOSENESS_LOOKUP_WIDTH: usize = 2 * evmlib::merkle_payments::CANDIDATES_PER_POOL;
977
978    /// Maximum waiter → leader retries when the leader's future was cancelled
979    /// or panicked before publishing a result. Beyond this the waiter returns
980    /// a visible error rather than spinning indefinitely through a
981    /// cancellation cascade.
982    ///
983    /// Worst-case waiter wall-clock is `(MAX_LEADER_RETRIES + 1) *
984    /// CLOSENESS_LOOKUP_TIMEOUT` (one wait per attempt). Kept low (1)
985    /// because the only realistic trigger is leader future-cancellation,
986    /// which should be extraordinarily rare; under sustained adversarial
987    /// cancellation a higher cap doesn't add resilience, it just hides
988    /// the symptom. With `CLOSENESS_LOOKUP_TIMEOUT = 240s` this caps a
989    /// single user-visible verification at ~8 min worst case (vs ~20 min
990    /// at the previous value of 4).
991    const MAX_LEADER_RETRIES: usize = 1;
992
993    /// Compute the storer's authoritative-lookup width for a candidate pool.
994    ///
995    /// Returns `max(CLOSENESS_LOOKUP_WIDTH, pool_len)`: matches the client's
996    /// over-query width today, and scales with the pool if a future protocol
997    /// bump grows pool size beyond `CLOSENESS_LOOKUP_WIDTH`. Truncating to
998    /// `CLOSENESS_LOOKUP_WIDTH` in that future case would re-open the
999    /// K-too-small failure mode (the storer would reject honest pools whose
1000    /// candidates legitimately span a wider XOR range than the storer
1001    /// fetched). Pinned by `closeness_lookup_count_uses_max_of_width_and_pool_len`.
1002    const fn closeness_lookup_count(pool_len: usize) -> usize {
1003        if Self::CLOSENESS_LOOKUP_WIDTH > pool_len {
1004            Self::CLOSENESS_LOOKUP_WIDTH
1005        } else {
1006            pool_len
1007        }
1008    }
1009
1010    /// Verify that the candidate pool's `pub_keys` correspond to peers that
1011    /// are actually XOR-closest to the pool midpoint address, by querying
1012    /// the DHT for its closest peers to that address and requiring that a
1013    /// majority of the candidates match.
1014    ///
1015    /// **What this blocks**: the "pay yourself" attack. Candidate signatures
1016    /// only cover `(price, reward_address, timestamp)` and the `pub_key` bytes —
1017    /// nothing ties a candidate to a network-registered identity or to the
1018    /// pool neighbourhood. Without this check an attacker can generate 16
1019    /// ML-DSA keypairs locally, point all 16 `reward_address` fields at a
1020    /// single attacker-controlled wallet, submit the merkle payment, and drain
1021    /// their own payment back out.
1022    ///
1023    /// **How it blocks**: each candidate's `PeerId = BLAKE3(pub_key)`; the DHT
1024    /// is the authoritative source of "which peers exist at this XOR
1025    /// coordinate". If the attacker's 16 fabricated `PeerId`s are not among
1026    /// the peers the network actually lists as closest to the pool address,
1027    /// the pool is forged.
1028    ///
1029    /// **Scope**: a `MerklePaymentProof` carries exactly one `winner_pool`
1030    /// (the pool the smart contract selected for the batch). Every storing
1031    /// node that receives the proof independently re-runs this check against
1032    /// that same pool, so a forged pool is rejected at every node it
1033    /// reaches.
1034    ///
1035    /// **Known limitation — Sybil-grinding**: `midpoint_proof.address()` is a
1036    /// BLAKE3 hash of attacker-controllable inputs (leaf bytes, tree root,
1037    /// timestamp). A determined attacker who *also* runs Sybil DHT nodes can
1038    /// grind the midpoint until it lands in a region where a majority of
1039    /// their Sybil keys are the true network-closest — at which point this check
1040    /// passes for the attacker. Closing that gap requires binding the
1041    /// midpoint to an attacker-uncontrolled value (e.g. a block hash at
1042    /// payment time or an on-chain VRF) or a Sybil-resistant identity
1043    /// layer. This defence raises the attack cost from "free" to "run N
1044    /// Sybil nodes AND grind", which is a meaningful but not complete
1045    /// improvement.
1046    async fn verify_merkle_candidate_closeness(
1047        &self,
1048        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
1049        pool_hash: PoolHash,
1050    ) -> Result<()> {
1051        // Fast path: this node already verified this pool successfully.
1052        // A batch of 256 chunks shares one winner_pool, so without this cache
1053        // we'd pay a Kademlia lookup per chunk.
1054        if self.closeness_pass_cache.lock().get(&pool_hash).is_some() {
1055            return Ok(());
1056        }
1057
1058        // Single-flight: on each attempt, either claim leadership by
1059        // inserting a fresh `ClosenessSlot`, or wait on an existing leader
1060        // and read its published result. The leader holds an `Arc` to the
1061        // slot independent of the LruCache so waiters are still woken if
1062        // eviction pressure kicked the cache entry.
1063        //
1064        // The `notified_owned()` future snapshots the `notify_waiters`
1065        // counter at the moment of construction (while we hold the lock),
1066        // which makes the subsequent `.await` race-free: if the leader
1067        // calls `notify_waiters` between our construction and our poll, the
1068        // counter has advanced and the future resolves immediately on first
1069        // poll.
1070        //
1071        // Bounded retry: if we're a waiter and the leader gets cancelled or
1072        // panics (slot.result.get() == None after wake-up), we loop back to
1073        // claim leadership. `MAX_LEADER_RETRIES` bounds the attempts so
1074        // adversarial cancellation cascades cannot spin this indefinitely.
1075        for attempt in 0..=Self::MAX_LEADER_RETRIES {
1076            // Release the mutex guard explicitly before any await below.
1077            // Clippy wants `if let ... else` written as `map_or_else`, but
1078            // any such rewrite re-borrows the locked `inflight` inside the
1079            // closure and fails the borrow checker — so the lint is
1080            // silenced here.
1081            #[allow(clippy::option_if_let_else)]
1082            let (waiter_slot, leader_slot) = {
1083                let mut inflight = self.inflight_closeness.lock();
1084                let chosen = if let Some(existing) = inflight.get(&pool_hash) {
1085                    (Some(Arc::clone(existing)), None)
1086                } else {
1087                    let slot = Arc::new(ClosenessSlot::new());
1088                    inflight.put(pool_hash, Arc::clone(&slot));
1089                    (None, Some(slot))
1090                };
1091                drop(inflight);
1092                chosen
1093            };
1094
1095            if let Some(slot) = waiter_slot {
1096                // Build the owned-notified future BEFORE awaiting, so it
1097                // snapshots the `notify_waiters` counter now. The slot
1098                // already existed when we locked, so the leader is either
1099                // running or finished; in both cases the snapshot + counter
1100                // check ensures we wake up correctly.
1101                let notified = slot.notified_owned();
1102                notified.await;
1103
1104                // Leader published a result — use it directly.
1105                if let Some(result) = slot.result.get() {
1106                    return result.clone().map_err(Error::Payment);
1107                }
1108                // Leader disappeared without publishing (panic or
1109                // cancellation). Slot was cleared by the leader's drop
1110                // guard; loop to become the new leader — unless we've
1111                // hit the retry bound (see MAX_LEADER_RETRIES).
1112                if attempt == Self::MAX_LEADER_RETRIES {
1113                    return Err(Error::Payment(
1114                        "Merkle candidate pool rejected: closeness leader \
1115                         repeatedly failed to publish a result (likely \
1116                         repeated cancellation or panic)."
1117                            .into(),
1118                    ));
1119                }
1120                continue;
1121            }
1122
1123            // Leader path. Drop guard clears the slot and wakes waiters on
1124            // every exit (success, failure, panic, cancellation).
1125            let Some(slot) = leader_slot else {
1126                // Unreachable by construction.
1127                return Err(Error::Payment(
1128                    "internal error: neither leader nor waiter in closeness check".into(),
1129                ));
1130            };
1131            let guard = InflightGuard {
1132                slot_cache: &self.inflight_closeness,
1133                pool_hash,
1134                slot,
1135            };
1136
1137            let result = self.verify_merkle_candidate_closeness_inner(pool).await;
1138            guard.publish(&result);
1139            if result.is_ok() {
1140                self.closeness_pass_cache.lock().put(pool_hash, ());
1141            }
1142            return result;
1143        }
1144        // Unreachable: the for-loop body always either `return`s or `continue`s,
1145        // and the waiter branch's `continue` only runs when `attempt <
1146        // Self::MAX_LEADER_RETRIES`. The last iteration's waiter branch returns
1147        // via the retry-bound check; the leader branch always returns.
1148        Err(Error::Payment(
1149            "internal error: closeness retry loop exited without returning".into(),
1150        ))
1151    }
1152
1153    /// Inner closeness check: the actual DHT lookup + set-membership test.
1154    /// Wrapped by [`verify_merkle_candidate_closeness`] with a pass-cache and
1155    /// single-flight guard so a batch of chunks and a storm of forged PUTs
1156    /// don't multiply the lookup cost.
1157    /// Derive each candidate's `PeerId` from its `pub_key` and reject the
1158    /// pool if any `PeerId` appears more than once.
1159    ///
1160    /// This is a pure-validation pre-check, runnable without a `P2PNode`:
1161    /// catches the case where one real peer's `pub_key` is repeated to
1162    /// inflate the closeness match count, without paying for a Kademlia
1163    /// lookup. An honest pool has [`evmlib::merkle_payments::CANDIDATES_PER_POOL`]
1164    /// distinct candidate `pub_keys` by construction.
1165    fn derive_distinct_candidate_peer_ids(
1166        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
1167    ) -> Result<Vec<PeerId>> {
1168        let mut candidate_peer_ids = Vec::with_capacity(pool.candidate_nodes.len());
1169        let mut seen = std::collections::HashSet::with_capacity(pool.candidate_nodes.len());
1170        for candidate in &pool.candidate_nodes {
1171            let pid = peer_id_from_public_key_bytes(&candidate.pub_key).map_err(|e| {
1172                Error::Payment(format!(
1173                    "Invalid ML-DSA public key in merkle candidate: {e}"
1174                ))
1175            })?;
1176            if !seen.insert(pid) {
1177                return Err(Error::Payment(
1178                    "Merkle candidate pool rejected: duplicate candidate PeerId. An \
1179                     honest pool has 16 distinct candidate pub_keys; duplicates would \
1180                     let a single real peer satisfy the closeness threshold by being \
1181                     counted multiple times."
1182                        .into(),
1183                ));
1184            }
1185            candidate_peer_ids.push(pid);
1186        }
1187        Ok(candidate_peer_ids)
1188    }
1189
1190    /// Pure-logic closeness check: given the pool's candidate peer IDs and
1191    /// the storer's authoritative network view (closest peers to the pool
1192    /// midpoint), decide whether the pool passes the
1193    /// `CANDIDATE_CLOSENESS_REQUIRED`-of-N threshold.
1194    ///
1195    /// A candidate counts only if its `PeerId` is one of the peers the
1196    /// storer's own network lookup returned (exact set membership). This is
1197    /// the property that makes the gate meaningful: a passing candidate must
1198    /// be a real, reachable peer the live DHT actually routes to and lists
1199    /// among the closest — it cannot be a key fabricated off-network. The
1200    /// leniency in this check is purely the lowered threshold (a majority
1201    /// rather than near-unanimity), which tolerates the closest-set
1202    /// divergence between two nodes' views without admitting fabricated keys.
1203    ///
1204    /// Extracted from `verify_merkle_candidate_closeness_inner` so tests
1205    /// can exercise the matching logic without standing up a real DHT.
1206    /// Mirrors the runtime path exactly: same sparse-network short-circuit,
1207    /// same set-membership check, same error strings.
1208    fn check_closeness_match(
1209        candidate_peer_ids: &[PeerId],
1210        network_peer_ids: &[PeerId],
1211        pool_address: &[u8; 32],
1212    ) -> Result<()> {
1213        // Sparse-network short-circuit: if the DHT itself returned fewer
1214        // peers than the closeness threshold, the proof can never pass —
1215        // not because the candidates are forged, but because we don't
1216        // have an authoritative view to compare against. Surface this
1217        // distinct cause so operators can tell "retry once the network
1218        // settles" apart from "this peer sent a forged pool".
1219        if network_peer_ids.len() < Self::CANDIDATE_CLOSENESS_REQUIRED {
1220            debug!(
1221                "Merkle closeness deferred: network lookup returned {} peers \
1222                 for pool midpoint {} (need at least {} to verify)",
1223                network_peer_ids.len(),
1224                hex::encode(pool_address),
1225                Self::CANDIDATE_CLOSENESS_REQUIRED,
1226            );
1227            return Err(Error::Payment(format!(
1228                "Merkle candidate pool rejected: authoritative DHT lookup returned \
1229                 only {} peers, less than the {} required to verify candidate \
1230                 closeness. Retry once the routing table populates further.",
1231                network_peer_ids.len(),
1232                Self::CANDIDATE_CLOSENESS_REQUIRED,
1233            )));
1234        }
1235
1236        // Exact-match membership against the returned closest peers.
1237        // Candidate `PeerId`s are deduplicated upstream, so each match
1238        // corresponds to a distinct peer.
1239        let network_set: std::collections::HashSet<PeerId> =
1240            network_peer_ids.iter().copied().collect();
1241        let matched = candidate_peer_ids
1242            .iter()
1243            .filter(|pid| network_set.contains(pid))
1244            .count();
1245
1246        if matched < Self::CANDIDATE_CLOSENESS_REQUIRED {
1247            debug!(
1248                "Merkle closeness rejected: {matched}/{} candidates match the DHT's closest peers \
1249                 for pool midpoint {} (required: {}, network returned {} peers)",
1250                candidate_peer_ids.len(),
1251                hex::encode(pool_address),
1252                Self::CANDIDATE_CLOSENESS_REQUIRED,
1253                network_peer_ids.len(),
1254            );
1255            return Err(Error::Payment(
1256                "Merkle candidate pool rejected: candidate pub_keys do not match the \
1257                 network's closest peers to the pool midpoint address. Pools must be \
1258                 collected from the pool-address close group, not fabricated off-network."
1259                    .into(),
1260            ));
1261        }
1262
1263        debug!(
1264            "Merkle closeness passed: {matched}/{} candidates matched the DHT's closest peers \
1265             for pool midpoint {}",
1266            candidate_peer_ids.len(),
1267            hex::encode(pool_address),
1268        );
1269        Ok(())
1270    }
1271
1272    #[allow(clippy::too_many_lines)]
1273    async fn verify_merkle_candidate_closeness_inner(
1274        &self,
1275        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
1276    ) -> Result<()> {
1277        // Pre-check: catch malformed/hostile pools (duplicate candidate
1278        // PeerIds) before paying for the Kademlia lookup. Runs in unit
1279        // tests without a P2PNode too.
1280        let candidate_peer_ids = Self::derive_distinct_candidate_peer_ids(pool)?;
1281
1282        // Release the RwLock guard before any await to avoid holding it
1283        // across an iterative Kademlia lookup.
1284        let attached = self.p2p_node.read().as_ref().map(Arc::clone);
1285        let Some(p2p_node) = attached else {
1286            // Production must call attach_p2p_node at startup. Fail CLOSED
1287            // to avoid silently disabling the defence if a startup path
1288            // regresses and loses the attach call. Unit-test builds that
1289            // construct a PaymentVerifier directly without exercising merkle
1290            // verification are opted-in via `test-utils` to fall back to
1291            // fail-open.
1292            #[cfg(any(test, feature = "test-utils"))]
1293            {
1294                crate::logging::warn!(
1295                    "PaymentVerifier: no P2PNode attached; merkle pay-yourself \
1296                     defence SKIPPED (test build). Production startup MUST call \
1297                     PaymentVerifier::attach_p2p_node."
1298                );
1299                return Ok(());
1300            }
1301            #[cfg(not(any(test, feature = "test-utils")))]
1302            {
1303                crate::logging::error!(
1304                    "PaymentVerifier: no P2PNode attached; rejecting merkle \
1305                     payment. This is a node-startup bug — \
1306                     PaymentVerifier::attach_p2p_node must be called before \
1307                     any PUT handler runs."
1308                );
1309                return Err(Error::Payment(
1310                    "Merkle candidate pool rejected: verifier is not wired to \
1311                     the P2P layer; cannot verify candidate closeness."
1312                        .into(),
1313                ));
1314            }
1315        };
1316
1317        let pool_address = pool.midpoint_proof.address();
1318        // Match the client's over-query width. The client's
1319        // `get_merkle_candidate_pool` queries 2 × `CANDIDATES_PER_POOL` peers
1320        // and picks the 16 closest *valid responders* — so legitimate pools
1321        // routinely include peers from positions 17–32 of the network's true
1322        // ranking when the closer peers are slow or NAT-stuck. The storer
1323        // must look at the same window or it will reject honest pools with
1324        // no security benefit.
1325        //
1326        // `pool.candidate_nodes` is currently a fixed-size array of length
1327        // `CANDIDATES_PER_POOL` (= 16), so `.max(...)` always evaluates to
1328        // `CLOSENESS_LOOKUP_WIDTH` today. The compile-time
1329        // `const _: () = assert!(WIDTH >= CANDIDATES_PER_POOL)` in the test
1330        // module pins that invariant. The `.max(...)` form is belt-and-braces
1331        // for a hypothetical future protocol that grows pool size to a
1332        // `Vec`-typed candidate set: the storer would scale its lookup with
1333        // the pool rather than truncating, which would otherwise re-open the
1334        // K-too-small failure mode.
1335        let lookup_count = Self::closeness_lookup_count(pool.candidate_nodes.len());
1336        let network_lookup = p2p_node
1337            .dht_manager()
1338            .find_closest_nodes_network(&pool_address.0, lookup_count);
1339        let network_peers =
1340            match tokio::time::timeout(Self::CLOSENESS_LOOKUP_TIMEOUT, network_lookup).await {
1341                Ok(Ok(peers)) => peers,
1342                Ok(Err(e)) => {
1343                    debug!(
1344                        "Merkle closeness network-lookup failed for pool midpoint {}: {e}",
1345                        hex::encode(pool_address.0),
1346                    );
1347                    return Err(Error::Payment(
1348                        "Merkle candidate pool rejected: could not verify candidate \
1349                     closeness against the authoritative network view."
1350                            .into(),
1351                    ));
1352                }
1353                Err(_) => {
1354                    debug!(
1355                        "Merkle closeness network-lookup timeout ({:?}) for pool midpoint {}",
1356                        Self::CLOSENESS_LOOKUP_TIMEOUT,
1357                        hex::encode(pool_address.0),
1358                    );
1359                    return Err(Error::Payment(
1360                        "Merkle candidate pool rejected: authoritative network lookup \
1361                     timed out. Retry once the network lookup completes."
1362                            .into(),
1363                    ));
1364                }
1365            };
1366
1367        let network_peer_ids: Vec<PeerId> = network_peers.iter().map(|n| n.peer_id).collect();
1368        Self::check_closeness_match(&candidate_peer_ids, &network_peer_ids, &pool_address.0)
1369    }
1370
1371    /// Verify a merkle batch payment proof.
1372    ///
1373    /// This verification flow:
1374    /// 1. Deserialize the `MerklePaymentProof`
1375    /// 2. Check pool cache for previously verified pool hash
1376    /// 3. If not cached, query on-chain for payment info
1377    /// 4. Validate the proof against on-chain data
1378    /// 5. Cache the pool hash for subsequent chunk verifications in the same batch
1379    #[allow(clippy::too_many_lines)]
1380    async fn verify_merkle_payment(
1381        &self,
1382        xorname: &XorName,
1383        proof_bytes: &[u8],
1384        context: VerificationContext,
1385    ) -> Result<()> {
1386        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
1387            debug!(
1388                "Verifying merkle payment for {} ({context:?})",
1389                hex::encode(xorname)
1390            );
1391        }
1392
1393        // Deserialize the merkle proof
1394        let merkle_proof = deserialize_merkle_proof(proof_bytes)
1395            .map_err(|e| Error::Payment(format!("Failed to deserialize merkle proof: {e}")))?;
1396
1397        // Verify the address in the proof matches the xorname being stored
1398        if merkle_proof.address.0 != *xorname {
1399            let proof_hex = hex::encode(merkle_proof.address.0);
1400            let store_hex = hex::encode(xorname);
1401            return Err(Error::Payment(format!(
1402                "Merkle proof address mismatch: proof is for {proof_hex}, but storing {store_hex}"
1403            )));
1404        }
1405
1406        let pool_hash = merkle_proof.winner_pool_hash();
1407
1408        // Run cheap local checks BEFORE expensive on-chain queries.
1409        // This prevents DoS via garbage proofs that trigger RPC lookups.
1410        for candidate in &merkle_proof.winner_pool.candidate_nodes {
1411            if !crate::payment::verify_merkle_candidate_signature(candidate) {
1412                return Err(Error::Payment(format!(
1413                    "Invalid ML-DSA-65 signature on merkle candidate node (reward: {})",
1414                    candidate.reward_address
1415                )));
1416            }
1417        }
1418
1419        // Pay-yourself defence: the candidate pub_keys must map to peers the
1420        // live DHT actually considers closest to the pool midpoint. Without
1421        // this, an attacker can point all 16 reward_address fields at a
1422        // self-owned wallet and drain their own payment. Every storing node
1423        // runs this check against the single `winner_pool` in the proof, so a
1424        // forged pool is rejected everywhere it lands. The pass cache and
1425        // single-flight keyed on pool_hash collapse the Kademlia lookup cost
1426        // within a batch and across concurrent PUTs for the same pool.
1427        //
1428        // ClientPut only: the check interrogates the *live* DHT, but a
1429        // replication receipt's winner pool was sampled from the DHT of the
1430        // original sale. Churn guarantees old pools eventually stop matching
1431        // the current top-K, which would make old records unreplicatable —
1432        // the same failure mode the single-node freshness gate caused on
1433        // DEV-01. See `VerificationContext` for the trade-off discussion.
1434        if context == VerificationContext::ClientPut {
1435            self.verify_merkle_candidate_closeness(&merkle_proof.winner_pool, pool_hash)
1436                .await?;
1437        }
1438
1439        // Check pool cache first
1440        let cached_info = {
1441            let mut pool_cache = self.pool_cache.lock();
1442            pool_cache.get(&pool_hash).cloned()
1443        };
1444
1445        let payment_info = if let Some(info) = cached_info {
1446            debug!("Pool cache hit for hash {}", hex::encode(pool_hash));
1447            info
1448        } else {
1449            // Query on-chain for completed merkle payment
1450            let info =
1451                payment_vault::get_completed_merkle_payment(&self.config.evm.network, pool_hash)
1452                    .await
1453                    .map_err(|e| {
1454                        let pool_hex = hex::encode(pool_hash);
1455                        Error::Payment(format!(
1456                            "Failed to query merkle payment info for pool {pool_hex}: {e}"
1457                        ))
1458                    })?;
1459
1460            let paid_node_addresses: Vec<_> = info
1461                .paidNodeAddresses
1462                .iter()
1463                .map(|pna| (pna.rewardsAddress, usize::from(pna.poolIndex), pna.amount))
1464                .collect();
1465
1466            let on_chain_info = OnChainPaymentInfo {
1467                depth: info.depth,
1468                merkle_payment_timestamp: info.merklePaymentTimestamp,
1469                paid_node_addresses,
1470            };
1471
1472            // Cache the pool info for subsequent chunks in the same batch
1473            {
1474                let mut pool_cache = self.pool_cache.lock();
1475                pool_cache.put(pool_hash, on_chain_info.clone());
1476            }
1477
1478            debug!(
1479                "Queried on-chain merkle payment info for pool {}: depth={}, timestamp={}, paid_nodes={}",
1480                hex::encode(pool_hash),
1481                on_chain_info.depth,
1482                on_chain_info.merkle_payment_timestamp,
1483                on_chain_info.paid_node_addresses.len()
1484            );
1485
1486            on_chain_info
1487        };
1488
1489        // Verify timestamp consistency (signatures already checked above before RPC).
1490        for candidate in &merkle_proof.winner_pool.candidate_nodes {
1491            if candidate.merkle_payment_timestamp != payment_info.merkle_payment_timestamp {
1492                return Err(Error::Payment(format!(
1493                    "Candidate timestamp mismatch: expected {}, got {} (reward: {})",
1494                    payment_info.merkle_payment_timestamp,
1495                    candidate.merkle_payment_timestamp,
1496                    candidate.reward_address
1497                )));
1498            }
1499        }
1500
1501        // Get the root from the winner pool's midpoint proof
1502        let smart_contract_root = merkle_proof.winner_pool.midpoint_proof.root();
1503
1504        // Verify the cryptographic merkle proofs (address belongs to tree,
1505        // midpoint belongs to tree, roots match, timestamps valid).
1506        evmlib::merkle_payments::verify_merkle_proof(
1507            &merkle_proof.address,
1508            &merkle_proof.data_proof,
1509            &merkle_proof.winner_pool.midpoint_proof,
1510            payment_info.depth,
1511            smart_contract_root,
1512            payment_info.merkle_payment_timestamp,
1513        )
1514        .map_err(|e| {
1515            let xorname_hex = hex::encode(xorname);
1516            Error::Payment(format!(
1517                "Merkle proof verification failed for {xorname_hex}: {e}"
1518            ))
1519        })?;
1520
1521        // Verify paid node count matches depth
1522        let expected_depth = payment_info.depth as usize;
1523        let actual_paid = payment_info.paid_node_addresses.len();
1524        if actual_paid != expected_depth {
1525            return Err(Error::Payment(format!(
1526                "Wrong number of paid nodes: expected {expected_depth}, got {actual_paid}"
1527            )));
1528        }
1529
1530        // Compute expected per-node payment using the contract formula:
1531        // totalAmount = median16(candidate_prices) * (1 << depth)
1532        // amountPerNode = totalAmount / depth
1533        let expected_per_node = if payment_info.depth > 0 {
1534            let mut candidate_prices: Vec<Amount> = merkle_proof
1535                .winner_pool
1536                .candidate_nodes
1537                .iter()
1538                .map(|c| c.price)
1539                .collect();
1540            candidate_prices.sort_unstable(); // ascending
1541                                              // Upper median (index 8 of 16) — matches Solidity's median16 (k = 8)
1542            let median_price = *candidate_prices
1543                .get(candidate_prices.len() / 2)
1544                .ok_or_else(|| Error::Payment("empty candidate pool in merkle proof".into()))?;
1545            let shift = u32::from(payment_info.depth);
1546            let multiplier = 1u64
1547                .checked_shl(shift)
1548                .ok_or_else(|| Error::Payment("merkle proof depth too large".into()))?;
1549            let total_amount = median_price * Amount::from(multiplier);
1550            total_amount / Amount::from(u64::from(payment_info.depth))
1551        } else {
1552            Amount::ZERO
1553        };
1554
1555        // Verify paid node indices, addresses, and amounts against the candidate pool.
1556        //
1557        // Each paid node must:
1558        // 1. Have a valid index within the candidate pool
1559        // 2. Match the expected reward address at that index
1560        // 3. Have been paid at least the expected per-node amount from the
1561        //    contract formula: median16(prices) * 2^depth / depth
1562        //
1563        // Note: unlike single-node payments, merkle proofs are NOT bound to a
1564        // specific storing node. The contract pays `depth` random nodes from the
1565        // winner pool; the storing node is whichever close-group peer the client
1566        // routes the chunk to. There is no local-recipient check here because
1567        // any node that can verify the merkle proof is allowed to store the chunk.
1568        // Replay protection comes from the per-address proof binding (each proof
1569        // is for a specific XorName in the paid tree).
1570        for (addr, idx, paid_amount) in &payment_info.paid_node_addresses {
1571            let node = merkle_proof
1572                .winner_pool
1573                .candidate_nodes
1574                .get(*idx)
1575                .ok_or_else(|| {
1576                    Error::Payment(format!(
1577                        "Paid node index {idx} out of bounds for pool size {}",
1578                        merkle_proof.winner_pool.candidate_nodes.len()
1579                    ))
1580                })?;
1581            if node.reward_address != *addr {
1582                return Err(Error::Payment(format!(
1583                    "Paid node address mismatch at index {idx}: expected {addr}, got {}",
1584                    node.reward_address
1585                )));
1586            }
1587            if *paid_amount < expected_per_node {
1588                return Err(Error::Payment(format!(
1589                    "Underpayment for node at index {idx}: paid {paid_amount}, \
1590                     expected at least {expected_per_node} \
1591                     (median16 formula, depth={})",
1592                    payment_info.depth
1593                )));
1594            }
1595        }
1596
1597        if crate::logging::enabled!(crate::logging::Level::INFO) {
1598            info!(
1599                "Merkle payment verified for {} (pool: {})",
1600                hex::encode(xorname),
1601                hex::encode(pool_hash)
1602            );
1603        }
1604
1605        Ok(())
1606    }
1607
1608    /// Verify this node is among the paid recipients.
1609    fn validate_local_recipient(&self, payment: &ProofOfPayment) -> Result<()> {
1610        let local_addr = &self.config.local_rewards_address;
1611        let is_recipient = payment
1612            .peer_quotes
1613            .iter()
1614            .any(|(_, quote)| quote.rewards_address == *local_addr);
1615        if !is_recipient {
1616            return Err(Error::Payment(
1617                "Payment proof does not include this node as a recipient".to_string(),
1618            ));
1619        }
1620        Ok(())
1621    }
1622}
1623
1624#[cfg(test)]
1625#[allow(clippy::expect_used, clippy::panic)]
1626mod tests {
1627    use super::*;
1628    use evmlib::merkle_payments::MerklePaymentCandidatePool;
1629    use std::time::SystemTime;
1630
1631    /// Create a verifier for unit tests. EVM is always on, but tests can
1632    /// pre-populate the cache to bypass on-chain verification.
1633    fn create_test_verifier() -> PaymentVerifier {
1634        let config = PaymentVerifierConfig {
1635            evm: EvmVerifierConfig::default(),
1636            cache_capacity: 100,
1637            local_rewards_address: RewardsAddress::new([1u8; 20]),
1638        };
1639        PaymentVerifier::new(config)
1640    }
1641
1642    #[test]
1643    fn test_payment_required_for_new_data() {
1644        let verifier = create_test_verifier();
1645        let xorname = [1u8; 32];
1646
1647        // All uncached data requires payment
1648        let status = verifier.check_payment_required(&xorname, VerificationContext::ClientPut);
1649        assert_eq!(status, PaymentStatus::PaymentRequired);
1650    }
1651
1652    #[test]
1653    fn test_cache_hit() {
1654        let verifier = create_test_verifier();
1655        let xorname = [1u8; 32];
1656
1657        // Manually add to cache
1658        verifier.cache.insert(xorname);
1659
1660        // Should return CachedAsVerified
1661        let status = verifier.check_payment_required(&xorname, VerificationContext::ClientPut);
1662        assert_eq!(status, PaymentStatus::CachedAsVerified);
1663    }
1664
1665    #[tokio::test]
1666    async fn test_verify_payment_without_proof_rejected() {
1667        let verifier = create_test_verifier();
1668        let xorname = [1u8; 32];
1669
1670        // No proof provided => should return an error (EVM is always on)
1671        let result = verifier
1672            .verify_payment(&xorname, None, VerificationContext::ClientPut)
1673            .await;
1674        assert!(
1675            result.is_err(),
1676            "Expected Err without proof, got: {result:?}"
1677        );
1678    }
1679
1680    #[tokio::test]
1681    async fn test_verify_payment_cached() {
1682        let verifier = create_test_verifier();
1683        let xorname = [1u8; 32];
1684
1685        // Add to cache — simulates previously-paid data
1686        verifier.cache.insert(xorname);
1687
1688        // Should succeed without payment (cached)
1689        let result = verifier
1690            .verify_payment(&xorname, None, VerificationContext::ClientPut)
1691            .await;
1692        assert!(result.is_ok());
1693        assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified);
1694    }
1695
1696    #[test]
1697    fn test_payment_status_can_store() {
1698        assert!(PaymentStatus::CachedAsVerified.can_store());
1699        assert!(PaymentStatus::PaymentVerified.can_store());
1700        assert!(!PaymentStatus::PaymentRequired.can_store());
1701    }
1702
1703    #[test]
1704    fn test_payment_status_is_cached() {
1705        assert!(PaymentStatus::CachedAsVerified.is_cached());
1706        assert!(!PaymentStatus::PaymentVerified.is_cached());
1707        assert!(!PaymentStatus::PaymentRequired.is_cached());
1708    }
1709
1710    #[tokio::test]
1711    async fn test_cache_preload_bypasses_evm() {
1712        let verifier = create_test_verifier();
1713        let xorname = [42u8; 32];
1714
1715        // Not yet cached — should require payment
1716        assert_eq!(
1717            verifier.check_payment_required(&xorname, VerificationContext::ClientPut),
1718            PaymentStatus::PaymentRequired
1719        );
1720
1721        // Pre-populate cache (simulates a previous successful payment)
1722        verifier.cache.insert(xorname);
1723
1724        // Now the xorname should be cached
1725        assert_eq!(
1726            verifier.check_payment_required(&xorname, VerificationContext::ClientPut),
1727            PaymentStatus::CachedAsVerified
1728        );
1729    }
1730
1731    #[tokio::test]
1732    async fn test_proof_too_small() {
1733        let verifier = create_test_verifier();
1734        let xorname = [1u8; 32];
1735
1736        // Proof smaller than MIN_PAYMENT_PROOF_SIZE_BYTES
1737        let small_proof = vec![0u8; MIN_PAYMENT_PROOF_SIZE_BYTES - 1];
1738        let result = verifier
1739            .verify_payment(&xorname, Some(&small_proof), VerificationContext::ClientPut)
1740            .await;
1741        assert!(result.is_err());
1742        let err_msg = format!("{}", result.expect_err("should fail"));
1743        assert!(
1744            err_msg.contains("too small"),
1745            "Error should mention 'too small': {err_msg}"
1746        );
1747    }
1748
1749    #[tokio::test]
1750    async fn test_proof_too_large() {
1751        let verifier = create_test_verifier();
1752        let xorname = [2u8; 32];
1753
1754        // Proof larger than MAX_PAYMENT_PROOF_SIZE_BYTES
1755        let large_proof = vec![0u8; MAX_PAYMENT_PROOF_SIZE_BYTES + 1];
1756        let result = verifier
1757            .verify_payment(&xorname, Some(&large_proof), VerificationContext::ClientPut)
1758            .await;
1759        assert!(result.is_err());
1760        let err_msg = format!("{}", result.expect_err("should fail"));
1761        assert!(
1762            err_msg.contains("too large"),
1763            "Error should mention 'too large': {err_msg}"
1764        );
1765    }
1766
1767    #[tokio::test]
1768    async fn test_proof_at_min_boundary_unknown_tag() {
1769        let verifier = create_test_verifier();
1770        let xorname = [3u8; 32];
1771
1772        // Exactly MIN_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected
1773        let boundary_proof = vec![0xFFu8; MIN_PAYMENT_PROOF_SIZE_BYTES];
1774        let result = verifier
1775            .verify_payment(
1776                &xorname,
1777                Some(&boundary_proof),
1778                VerificationContext::ClientPut,
1779            )
1780            .await;
1781        assert!(result.is_err());
1782        let err_msg = format!("{}", result.expect_err("should fail"));
1783        assert!(
1784            err_msg.contains("Unknown payment proof type tag"),
1785            "Error should mention unknown tag: {err_msg}"
1786        );
1787    }
1788
1789    #[tokio::test]
1790    async fn test_proof_at_max_boundary_unknown_tag() {
1791        let verifier = create_test_verifier();
1792        let xorname = [4u8; 32];
1793
1794        // Exactly MAX_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected
1795        let boundary_proof = vec![0xFFu8; MAX_PAYMENT_PROOF_SIZE_BYTES];
1796        let result = verifier
1797            .verify_payment(
1798                &xorname,
1799                Some(&boundary_proof),
1800                VerificationContext::ClientPut,
1801            )
1802            .await;
1803        assert!(result.is_err());
1804        let err_msg = format!("{}", result.expect_err("should fail"));
1805        assert!(
1806            err_msg.contains("Unknown payment proof type tag"),
1807            "Error should mention unknown tag: {err_msg}"
1808        );
1809    }
1810
1811    #[tokio::test]
1812    async fn test_malformed_single_node_proof() {
1813        let verifier = create_test_verifier();
1814        let xorname = [5u8; 32];
1815
1816        // Valid tag (0x01) but garbage payload — should fail deserialization
1817        let mut garbage = vec![crate::ant_protocol::PROOF_TAG_SINGLE_NODE];
1818        garbage.extend_from_slice(&[0xAB; 63]);
1819        let result = verifier
1820            .verify_payment(&xorname, Some(&garbage), VerificationContext::ClientPut)
1821            .await;
1822        assert!(result.is_err());
1823        let err_msg = format!("{}", result.expect_err("should fail"));
1824        assert!(
1825            err_msg.contains("deserialize") || err_msg.contains("Failed"),
1826            "Error should mention deserialization failure: {err_msg}"
1827        );
1828    }
1829
1830    #[test]
1831    fn test_cache_len_getter() {
1832        let verifier = create_test_verifier();
1833        assert_eq!(verifier.cache_len(), 0);
1834
1835        verifier.cache.insert([10u8; 32]);
1836        assert_eq!(verifier.cache_len(), 1);
1837
1838        verifier.cache.insert([20u8; 32]);
1839        assert_eq!(verifier.cache_len(), 2);
1840    }
1841
1842    #[test]
1843    fn test_cache_stats_after_operations() {
1844        let verifier = create_test_verifier();
1845        let xorname = [7u8; 32];
1846
1847        // Miss
1848        verifier.check_payment_required(&xorname, VerificationContext::ClientPut);
1849        let stats = verifier.cache_stats();
1850        assert_eq!(stats.misses, 1);
1851        assert_eq!(stats.hits, 0);
1852
1853        // Insert and hit
1854        verifier.cache.insert(xorname);
1855        verifier.check_payment_required(&xorname, VerificationContext::ClientPut);
1856        let stats = verifier.cache_stats();
1857        assert_eq!(stats.hits, 1);
1858        assert_eq!(stats.misses, 1);
1859        assert_eq!(stats.additions, 1);
1860    }
1861
1862    #[tokio::test]
1863    async fn test_concurrent_cache_lookups() {
1864        let verifier = std::sync::Arc::new(create_test_verifier());
1865
1866        // Pre-populate cache for all 10 xornames
1867        for i in 0..10u8 {
1868            verifier.cache.insert([i; 32]);
1869        }
1870
1871        let mut handles = Vec::new();
1872        for i in 0..10u8 {
1873            let v = verifier.clone();
1874            handles.push(tokio::spawn(async move {
1875                let xorname = [i; 32];
1876                v.verify_payment(&xorname, None, VerificationContext::ClientPut)
1877                    .await
1878            }));
1879        }
1880
1881        for handle in handles {
1882            let result = handle.await.expect("task panicked");
1883            assert!(result.is_ok());
1884            assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified);
1885        }
1886
1887        assert_eq!(verifier.cache_len(), 10);
1888    }
1889
1890    #[test]
1891    fn test_default_evm_config() {
1892        let _config = EvmVerifierConfig::default();
1893        // EVM is always on — default network is ArbitrumOne
1894    }
1895
1896    #[test]
1897    fn test_real_ml_dsa_proof_size_within_limits() {
1898        use crate::payment::metrics::QuotingMetricsTracker;
1899        use crate::payment::proof::PaymentProof;
1900        use crate::payment::quote::{QuoteGenerator, XorName};
1901        use alloy::primitives::FixedBytes;
1902        use evmlib::{EncodedPeerId, RewardsAddress};
1903        use saorsa_core::MlDsa65;
1904        use saorsa_pqc::pqc::types::MlDsaSecretKey;
1905        use saorsa_pqc::pqc::MlDsaOperations;
1906
1907        let ml_dsa = MlDsa65::new();
1908        let mut peer_quotes = Vec::new();
1909
1910        for i in 0..5u8 {
1911            let (public_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
1912
1913            let rewards_address = RewardsAddress::new([i; 20]);
1914            let metrics_tracker = QuotingMetricsTracker::new(0);
1915            let mut generator = QuoteGenerator::new(rewards_address, metrics_tracker);
1916
1917            let pub_key_bytes = public_key.as_bytes().to_vec();
1918            let sk_bytes = secret_key.as_bytes().to_vec();
1919            generator.set_signer(pub_key_bytes, move |msg| {
1920                let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("sk parse");
1921                let ml_dsa = MlDsa65::new();
1922                ml_dsa.sign(&sk, msg).expect("sign").as_bytes().to_vec()
1923            });
1924
1925            let content: XorName = [i; 32];
1926            let quote = generator.create_quote(content, 4096, 0).expect("quote");
1927
1928            peer_quotes.push((EncodedPeerId::new(rand::random()), quote));
1929        }
1930
1931        let proof = PaymentProof {
1932            proof_of_payment: ProofOfPayment { peer_quotes },
1933            tx_hashes: vec![FixedBytes::from([0xABu8; 32])],
1934        };
1935
1936        let proof_bytes =
1937            crate::payment::proof::serialize_single_node_proof(&proof).expect("serialize");
1938
1939        // 7 ML-DSA-65 quotes with ~1952-byte pub keys and ~3309-byte signatures
1940        // should produce a proof in the 30-80 KB range
1941        assert!(
1942            proof_bytes.len() > 20_000,
1943            "Real 7-quote ML-DSA proof should be > 20 KB, got {} bytes",
1944            proof_bytes.len()
1945        );
1946        assert!(
1947            proof_bytes.len() < MAX_PAYMENT_PROOF_SIZE_BYTES,
1948            "Real 7-quote ML-DSA proof ({} bytes) should fit within {} byte limit",
1949            proof_bytes.len(),
1950            MAX_PAYMENT_PROOF_SIZE_BYTES
1951        );
1952    }
1953
1954    #[tokio::test]
1955    async fn test_content_address_mismatch_rejected() {
1956        use crate::payment::proof::{serialize_single_node_proof, PaymentProof};
1957        use evmlib::{EncodedPeerId, PaymentQuote, RewardsAddress};
1958        use std::time::SystemTime;
1959
1960        let verifier = create_test_verifier();
1961
1962        // The xorname we're trying to store
1963        let target_xorname = [0xAAu8; 32];
1964
1965        // Create a quote for a DIFFERENT xorname
1966        let wrong_xorname = [0xBBu8; 32];
1967        let quote = PaymentQuote {
1968            content: xor_name::XorName(wrong_xorname),
1969            timestamp: SystemTime::now(),
1970            price: Amount::from(1u64),
1971            rewards_address: RewardsAddress::new([1u8; 20]),
1972            pub_key: vec![0u8; 64],
1973            signature: vec![0u8; 64],
1974        };
1975
1976        // Build CLOSE_GROUP_SIZE quotes with distinct peer IDs
1977        let mut peer_quotes = Vec::new();
1978        for _ in 0..CLOSE_GROUP_SIZE {
1979            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1980        }
1981
1982        let proof = PaymentProof {
1983            proof_of_payment: ProofOfPayment { peer_quotes },
1984            tx_hashes: vec![],
1985        };
1986
1987        let proof_bytes = serialize_single_node_proof(&proof).expect("serialize proof");
1988
1989        let result = verifier
1990            .verify_payment(
1991                &target_xorname,
1992                Some(&proof_bytes),
1993                VerificationContext::ClientPut,
1994            )
1995            .await;
1996
1997        assert!(result.is_err(), "Should reject mismatched content address");
1998        let err_msg = format!("{}", result.expect_err("should be error"));
1999        assert!(
2000            err_msg.contains("content address mismatch"),
2001            "Error should mention 'content address mismatch': {err_msg}"
2002        );
2003    }
2004
2005    /// Helper: create a fake quote with the given xorname and timestamp.
2006    fn make_fake_quote(
2007        xorname: [u8; 32],
2008        timestamp: SystemTime,
2009        rewards_address: RewardsAddress,
2010    ) -> evmlib::PaymentQuote {
2011        use evmlib::PaymentQuote;
2012
2013        PaymentQuote {
2014            content: xor_name::XorName(xorname),
2015            timestamp,
2016            price: Amount::from(1u64),
2017            rewards_address,
2018            pub_key: vec![0u8; 64],
2019            signature: vec![0u8; 64],
2020        }
2021    }
2022
2023    /// Helper: create a fake quote whose price encodes the supplied record count.
2024    fn make_fake_quote_at_records(
2025        xorname: [u8; 32],
2026        timestamp: SystemTime,
2027        rewards_address: RewardsAddress,
2028        records: usize,
2029    ) -> evmlib::PaymentQuote {
2030        let mut quote = make_fake_quote(xorname, timestamp, rewards_address);
2031        quote.price = crate::payment::pricing::calculate_price(records);
2032        quote
2033    }
2034
2035    /// A small upward record drift between quoting and verifying — the normal
2036    /// in-flight churn on a busy network — must pass. The old fixed 5-record
2037    /// tolerance rejected a drift of 10 as "stale by 10 records"; the
2038    /// price-based gate sees a negligible price move on the near-flat curve and
2039    /// accepts it.
2040    #[test]
2041    fn test_small_record_drift_accepted() {
2042        use evmlib::{EncodedPeerId, RewardsAddress};
2043
2044        let verifier = create_test_verifier();
2045        // Node gained 10 records since quoting (100 -> 110).
2046        verifier.set_records_stored_for_tests(110);
2047        let self_id: [u8; 32] = rand::random();
2048        verifier.set_peer_id_for_tests(self_id);
2049        let quote = make_fake_quote_at_records(
2050            [0xE0u8; 32],
2051            SystemTime::now(),
2052            RewardsAddress::new([1u8; 20]),
2053            100,
2054        );
2055        let payment = ProofOfPayment {
2056            peer_quotes: vec![(EncodedPeerId::new(self_id), quote)],
2057        };
2058
2059        verifier
2060            .validate_quote_freshness(&payment)
2061            .expect("benign in-flight drift should pass");
2062    }
2063
2064    /// Over-payment must always be accepted: the node had MORE records when it
2065    /// quoted than it does now (e.g. it pruned), so the client paid for a
2066    /// fuller, pricier node. The old symmetric `abs_diff` gate wrongly rejected
2067    /// this; ~36% of STG-01 rejections were exactly this case.
2068    #[test]
2069    fn test_overpayment_accepted() {
2070        use evmlib::{EncodedPeerId, RewardsAddress};
2071
2072        let verifier = create_test_verifier();
2073        // Quote priced at 6000 records, but node now holds only 100.
2074        verifier.set_records_stored_for_tests(100);
2075        let self_id: [u8; 32] = rand::random();
2076        verifier.set_peer_id_for_tests(self_id);
2077        let quote = make_fake_quote_at_records(
2078            [0xE2u8; 32],
2079            SystemTime::now(),
2080            RewardsAddress::new([1u8; 20]),
2081            6000,
2082        );
2083        let payment = ProofOfPayment {
2084            peer_quotes: vec![(EncodedPeerId::new(self_id), quote)],
2085        };
2086
2087        verifier
2088            .validate_quote_freshness(&payment)
2089            .expect("over-payment must never be rejected");
2090    }
2091
2092    /// Genuine staleness — a quote that under-prices the node's current fullness
2093    /// by far more than the tolerance — is still rejected. Quote encodes 100
2094    /// records but the node now holds 6000, so the quadratic curve makes the
2095    /// paid price a small fraction of the current price.
2096    #[test]
2097    fn test_underpriced_quote_rejected() {
2098        use evmlib::{EncodedPeerId, RewardsAddress};
2099
2100        let verifier = create_test_verifier();
2101        verifier.set_records_stored_for_tests(6000);
2102        let self_id: [u8; 32] = rand::random();
2103        verifier.set_peer_id_for_tests(self_id);
2104        let quote = make_fake_quote_at_records(
2105            [0xE1u8; 32],
2106            SystemTime::now(),
2107            RewardsAddress::new([1u8; 20]),
2108            100,
2109        );
2110        let payment = ProofOfPayment {
2111            peer_quotes: vec![(EncodedPeerId::new(self_id), quote)],
2112        };
2113
2114        let err = verifier
2115            .validate_quote_freshness(&payment)
2116            .expect_err("a quote underpricing by >25% should fail");
2117        assert!(format!("{err}").contains("stale"));
2118    }
2119
2120    /// Regression test for the PROD-UL-01 `DataMap` failure (2026-06-04): a
2121    /// close group whose fullness spans 47..=1788 records produces a bundle
2122    /// where the emptiest node's honest quote prices far below a full node's
2123    /// 75% floor. The verifying node must gate only its OWN quote — a
2124    /// neighbour's cheap-but-honest quote is not evidence of staleness.
2125    #[test]
2126    fn test_neighbour_cheap_quote_not_rejected() {
2127        use evmlib::{EncodedPeerId, RewardsAddress};
2128
2129        let verifier = create_test_verifier();
2130        // This node holds 1788 records (the fullest rejector in the incident).
2131        verifier.set_records_stored_for_tests(1788);
2132        let self_id: [u8; 32] = rand::random();
2133        verifier.set_peer_id_for_tests(self_id);
2134
2135        let xorname = [0xE3u8; 32];
2136        let rewards = RewardsAddress::new([1u8; 20]);
2137        // Own quote is fresh: priced at our own current fullness.
2138        let own_quote = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 1788);
2139        // Neighbour quotes from a heterogeneous close group, including a
2140        // nearly-empty node at 47 records (price far below our 75% floor).
2141        let neighbour_47 = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 47);
2142        let neighbour_978 = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 978);
2143
2144        let payment = ProofOfPayment {
2145            peer_quotes: vec![
2146                (EncodedPeerId::new(rand::random()), neighbour_47),
2147                (EncodedPeerId::new(self_id), own_quote),
2148                (EncodedPeerId::new(rand::random()), neighbour_978),
2149            ],
2150        };
2151
2152        verifier
2153            .validate_quote_freshness(&payment)
2154            .expect("neighbours' cheaper quotes must not trip this node's own staleness gate");
2155    }
2156
2157    /// The own-quote gate still bites: if THIS node's own quote in the bundle
2158    /// underprices its current fullness beyond tolerance, the payment is
2159    /// rejected even when every neighbour quote looks expensive.
2160    #[test]
2161    fn test_own_stale_quote_still_rejected_among_neighbours() {
2162        use evmlib::{EncodedPeerId, RewardsAddress};
2163
2164        let verifier = create_test_verifier();
2165        verifier.set_records_stored_for_tests(6000);
2166        let self_id: [u8; 32] = rand::random();
2167        verifier.set_peer_id_for_tests(self_id);
2168
2169        let xorname = [0xE4u8; 32];
2170        let rewards = RewardsAddress::new([1u8; 20]);
2171        let own_stale = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 100);
2172        let neighbour = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 7000);
2173
2174        let payment = ProofOfPayment {
2175            peer_quotes: vec![
2176                (EncodedPeerId::new(rand::random()), neighbour),
2177                (EncodedPeerId::new(self_id), own_stale),
2178            ],
2179        };
2180
2181        let err = verifier
2182            .validate_quote_freshness(&payment)
2183            .expect_err("own underpriced quote must still be rejected");
2184        assert!(format!("{err}").contains("stale"));
2185    }
2186
2187    /// Without a self peer-id source (no `P2PNode` attached, no test override)
2188    /// the gate skips rather than rejecting — mirroring the missing
2189    /// record-count-source behaviour.
2190    #[test]
2191    fn test_freshness_skipped_without_self_peer_id() {
2192        use evmlib::{EncodedPeerId, RewardsAddress};
2193
2194        let verifier = create_test_verifier();
2195        verifier.set_records_stored_for_tests(6000);
2196        // NOTE: no set_peer_id_for_tests call.
2197        let quote = make_fake_quote_at_records(
2198            [0xE5u8; 32],
2199            SystemTime::now(),
2200            RewardsAddress::new([1u8; 20]),
2201            100,
2202        );
2203        let payment = ProofOfPayment {
2204            peer_quotes: vec![(EncodedPeerId::new(rand::random()), quote)],
2205        };
2206
2207        verifier
2208            .validate_quote_freshness(&payment)
2209            .expect("gate must fail open when self identity is unknown");
2210    }
2211
2212    /// Helper: wrap quotes into a tagged serialized `PaymentProof`.
2213    fn serialize_proof(peer_quotes: Vec<(evmlib::EncodedPeerId, evmlib::PaymentQuote)>) -> Vec<u8> {
2214        use crate::payment::proof::{serialize_single_node_proof, PaymentProof};
2215
2216        let proof = PaymentProof {
2217            proof_of_payment: ProofOfPayment { peer_quotes },
2218            tx_hashes: vec![],
2219        };
2220        serialize_single_node_proof(&proof).expect("serialize proof")
2221    }
2222
2223    #[tokio::test]
2224    async fn test_old_quote_uses_storage_delta_not_timestamp() {
2225        use evmlib::{EncodedPeerId, RewardsAddress};
2226        use std::time::Duration;
2227
2228        let verifier = create_test_verifier();
2229        let xorname = [0xCCu8; 32];
2230        let rewards_addr = RewardsAddress::new([1u8; 20]);
2231
2232        // Create a quote that's 25 hours old (exceeds 24-hour max)
2233        let old_timestamp = SystemTime::now() - Duration::from_secs(25 * 3600);
2234        let quote = make_fake_quote(xorname, old_timestamp, rewards_addr);
2235
2236        let mut peer_quotes = Vec::new();
2237        for _ in 0..CLOSE_GROUP_SIZE {
2238            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2239        }
2240
2241        let proof_bytes = serialize_proof(peer_quotes);
2242        let result = verifier
2243            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2244            .await;
2245
2246        let err_msg = format!("{}", result.expect_err("should fail at later check"));
2247        assert!(
2248            !err_msg.contains("expired"),
2249            "Should not reject by timestamp age: {err_msg}"
2250        );
2251    }
2252
2253    #[tokio::test]
2254    async fn test_future_quote_uses_storage_delta_not_timestamp() {
2255        use evmlib::{EncodedPeerId, RewardsAddress};
2256        use std::time::Duration;
2257
2258        let verifier = create_test_verifier();
2259        let xorname = [0xDDu8; 32];
2260        let rewards_addr = RewardsAddress::new([1u8; 20]);
2261
2262        // Create a quote with a timestamp 1 hour in the future
2263        let future_timestamp = SystemTime::now() + Duration::from_secs(3600);
2264        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
2265
2266        let mut peer_quotes = Vec::new();
2267        for _ in 0..CLOSE_GROUP_SIZE {
2268            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2269        }
2270
2271        let proof_bytes = serialize_proof(peer_quotes);
2272        let result = verifier
2273            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2274            .await;
2275
2276        let err_msg = format!("{}", result.expect_err("should fail at later check"));
2277        assert!(
2278            !err_msg.contains("future"),
2279            "Should not reject by future timestamp: {err_msg}"
2280        );
2281    }
2282
2283    #[tokio::test]
2284    async fn test_quote_within_clock_skew_tolerance_accepted() {
2285        use evmlib::{EncodedPeerId, RewardsAddress};
2286        use std::time::Duration;
2287
2288        let verifier = create_test_verifier();
2289        let xorname = [0xD1u8; 32];
2290        let rewards_addr = RewardsAddress::new([1u8; 20]);
2291
2292        // Quote 30 seconds in the future — well within 300s tolerance
2293        let future_timestamp = SystemTime::now() + Duration::from_secs(30);
2294        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
2295
2296        let mut peer_quotes = Vec::new();
2297        for _ in 0..CLOSE_GROUP_SIZE {
2298            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2299        }
2300
2301        let proof_bytes = serialize_proof(peer_quotes);
2302        let result = verifier
2303            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2304            .await;
2305
2306        // Should NOT fail at timestamp check (will fail later at pub_key binding)
2307        let err_msg = format!("{}", result.expect_err("should fail at later check"));
2308        assert!(
2309            !err_msg.contains("future"),
2310            "Should pass timestamp check (within tolerance), but got: {err_msg}"
2311        );
2312    }
2313
2314    #[tokio::test]
2315    async fn test_quote_beyond_clock_skew_still_uses_storage_delta() {
2316        use evmlib::{EncodedPeerId, RewardsAddress};
2317        use std::time::Duration;
2318
2319        let verifier = create_test_verifier();
2320        let xorname = [0xD2u8; 32];
2321        let rewards_addr = RewardsAddress::new([1u8; 20]);
2322
2323        // Quote 360 seconds in the future — exceeds 300s tolerance
2324        let future_timestamp = SystemTime::now() + Duration::from_secs(360);
2325        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
2326
2327        let mut peer_quotes = Vec::new();
2328        for _ in 0..CLOSE_GROUP_SIZE {
2329            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2330        }
2331
2332        let proof_bytes = serialize_proof(peer_quotes);
2333        let result = verifier
2334            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2335            .await;
2336
2337        let err_msg = format!("{}", result.expect_err("should fail at later check"));
2338        assert!(
2339            !err_msg.contains("future"),
2340            "Should not reject by future timestamp: {err_msg}"
2341        );
2342    }
2343
2344    #[tokio::test]
2345    async fn test_quote_23h_old_still_accepted() {
2346        use evmlib::{EncodedPeerId, RewardsAddress};
2347        use std::time::Duration;
2348
2349        let verifier = create_test_verifier();
2350        let xorname = [0xD3u8; 32];
2351        let rewards_addr = RewardsAddress::new([1u8; 20]);
2352
2353        // Quote 23 hours old — within 24h max age
2354        let old_timestamp = SystemTime::now() - Duration::from_secs(23 * 3600);
2355        let quote = make_fake_quote(xorname, old_timestamp, rewards_addr);
2356
2357        let mut peer_quotes = Vec::new();
2358        for _ in 0..CLOSE_GROUP_SIZE {
2359            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2360        }
2361
2362        let proof_bytes = serialize_proof(peer_quotes);
2363        let result = verifier
2364            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2365            .await;
2366
2367        // Should NOT fail at timestamp check (will fail later at pub_key binding)
2368        let err_msg = format!("{}", result.expect_err("should fail at later check"));
2369        assert!(
2370            !err_msg.contains("expired"),
2371            "Should pass expiry check (23h < 24h), but got: {err_msg}"
2372        );
2373    }
2374
2375    /// Helper: build an `EncodedPeerId` that matches the BLAKE3 hash of an ML-DSA public key.
2376    fn encoded_peer_id_for_pub_key(pub_key: &[u8]) -> evmlib::EncodedPeerId {
2377        let ant_peer_id = peer_id_from_public_key_bytes(pub_key).expect("valid ML-DSA pub key");
2378        evmlib::EncodedPeerId::new(*ant_peer_id.as_bytes())
2379    }
2380
2381    #[tokio::test]
2382    async fn test_local_not_in_paid_set_rejected() {
2383        use evmlib::RewardsAddress;
2384        use saorsa_core::MlDsa65;
2385        use saorsa_pqc::pqc::MlDsaOperations;
2386
2387        // Verifier with a local rewards address set
2388        let local_addr = RewardsAddress::new([0xAAu8; 20]);
2389        let config = PaymentVerifierConfig {
2390            evm: EvmVerifierConfig {
2391                network: EvmNetwork::ArbitrumOne,
2392            },
2393            cache_capacity: 100,
2394            local_rewards_address: local_addr,
2395        };
2396        let verifier = PaymentVerifier::new(config);
2397
2398        let xorname = [0xEEu8; 32];
2399        // Quotes pay a DIFFERENT rewards address
2400        let other_addr = RewardsAddress::new([0xBBu8; 20]);
2401
2402        // Use real ML-DSA keys so the pub_key→peer_id binding check passes
2403        let ml_dsa = MlDsa65::new();
2404        let mut peer_quotes = Vec::new();
2405        for _ in 0..CLOSE_GROUP_SIZE {
2406            let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen");
2407            let pub_key_bytes = public_key.as_bytes().to_vec();
2408            let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes);
2409
2410            let mut quote = make_fake_quote(xorname, SystemTime::now(), other_addr);
2411            quote.pub_key = pub_key_bytes;
2412
2413            peer_quotes.push((encoded, quote));
2414        }
2415
2416        let proof_bytes = serialize_proof(peer_quotes);
2417        let result = verifier
2418            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2419            .await;
2420
2421        assert!(result.is_err(), "Should reject payment not addressed to us");
2422        let err_msg = format!("{}", result.expect_err("should fail"));
2423        assert!(
2424            err_msg.contains("does not include this node as a recipient"),
2425            "Error should mention recipient rejection: {err_msg}"
2426        );
2427    }
2428
2429    #[tokio::test]
2430    async fn test_wrong_peer_binding_rejected() {
2431        use evmlib::{EncodedPeerId, RewardsAddress};
2432        use saorsa_core::MlDsa65;
2433        use saorsa_pqc::pqc::MlDsaOperations;
2434
2435        let verifier = create_test_verifier();
2436        let xorname = [0xFFu8; 32];
2437        let rewards_addr = RewardsAddress::new([1u8; 20]);
2438
2439        // Generate a real ML-DSA keypair so pub_key is valid
2440        let ml_dsa = MlDsa65::new();
2441        let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen");
2442        let pub_key_bytes = public_key.as_bytes().to_vec();
2443
2444        // Create a quote with a real pub_key but attach it to a random peer ID
2445        // whose identity multihash does NOT match BLAKE3(pub_key)
2446        let mut quote = make_fake_quote(xorname, SystemTime::now(), rewards_addr);
2447        quote.pub_key = pub_key_bytes;
2448
2449        // Use random ed25519 peer IDs — they won't match BLAKE3(pub_key)
2450        let mut peer_quotes = Vec::new();
2451        for _ in 0..CLOSE_GROUP_SIZE {
2452            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2453        }
2454
2455        let proof_bytes = serialize_proof(peer_quotes);
2456        let result = verifier
2457            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2458            .await;
2459
2460        assert!(result.is_err(), "Should reject wrong peer binding");
2461        let err_msg = format!("{}", result.expect_err("should fail"));
2462        assert!(
2463            err_msg.contains("pub_key does not belong to claimed peer"),
2464            "Error should mention binding mismatch: {err_msg}"
2465        );
2466    }
2467
2468    // =========================================================================
2469    // VerificationContext tests — Replication must skip the
2470    // storer-being-paid-now checks (own-quote freshness, local recipient,
2471    // merkle candidate closeness) while keeping every receipt-authenticity
2472    // check. Each test runs the same proof under both contexts and asserts
2473    // the context-gated check fires only under ClientPut. Where a proof
2474    // can't reach Ok(()) without on-chain access, "skipped" is proven by the
2475    // error moving PAST the gated check to a later stage.
2476    // =========================================================================
2477
2478    /// A bundle whose own quote is stale (quoted 100 records, node now holds
2479    /// 6000) is rejected by the freshness gate under `ClientPut`, but under
2480    /// `Replication` the gate is skipped: verification proceeds to the next
2481    /// stage (peer bindings, which fail on the fake `pub_keys`).
2482    #[tokio::test]
2483    async fn test_replication_context_skips_own_quote_freshness() {
2484        use evmlib::{EncodedPeerId, RewardsAddress};
2485
2486        let verifier = create_test_verifier();
2487        verifier.set_records_stored_for_tests(6000);
2488        let self_id: [u8; 32] = rand::random();
2489        verifier.set_peer_id_for_tests(self_id);
2490
2491        let xorname = [0xD0u8; 32];
2492        let rewards = RewardsAddress::new([1u8; 20]);
2493        let own_stale = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 100);
2494        let mut peer_quotes = vec![(EncodedPeerId::new(self_id), own_stale)];
2495        for _ in 1..CLOSE_GROUP_SIZE {
2496            let neighbour = make_fake_quote_at_records(xorname, SystemTime::now(), rewards, 6000);
2497            peer_quotes.push((EncodedPeerId::new(rand::random()), neighbour));
2498        }
2499        let proof_bytes = serialize_proof(peer_quotes);
2500
2501        let err = verifier
2502            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2503            .await
2504            .expect_err("own stale quote must be rejected on a client PUT");
2505        assert!(
2506            format!("{err}").contains("stale"),
2507            "ClientPut must fail at the freshness gate: {err}"
2508        );
2509
2510        let err = verifier
2511            .verify_payment(
2512                &xorname,
2513                Some(&proof_bytes),
2514                VerificationContext::Replication,
2515            )
2516            .await
2517            .expect_err("fake pub_keys still fail peer bindings");
2518        let msg = format!("{err}");
2519        assert!(
2520            !msg.contains("stale"),
2521            "Replication must skip the freshness gate: {msg}"
2522        );
2523        assert!(
2524            msg.contains("Invalid ML-DSA public key"),
2525            "Replication should fail at the LATER peer-binding stage: {msg}"
2526        );
2527    }
2528
2529    /// A receipt that pays a different node's rewards address is rejected by
2530    /// the local-recipient check under `ClientPut`, but under `Replication`
2531    /// (a post-churn close-group member was never a payee) the check is
2532    /// skipped: verification proceeds to quote-signature verification.
2533    #[tokio::test]
2534    async fn test_replication_context_skips_local_recipient() {
2535        use evmlib::RewardsAddress;
2536        use saorsa_core::MlDsa65;
2537        use saorsa_pqc::pqc::MlDsaOperations;
2538
2539        let local_addr = RewardsAddress::new([0xAAu8; 20]);
2540        let config = PaymentVerifierConfig {
2541            evm: EvmVerifierConfig {
2542                network: EvmNetwork::ArbitrumOne,
2543            },
2544            cache_capacity: 100,
2545            local_rewards_address: local_addr,
2546        };
2547        let verifier = PaymentVerifier::new(config);
2548
2549        let xorname = [0xD1u8; 32];
2550        // Quotes pay a DIFFERENT rewards address.
2551        let other_addr = RewardsAddress::new([0xBBu8; 20]);
2552
2553        // Real ML-DSA keys so the pub_key→peer_id binding check passes and
2554        // the first divergence between contexts is the recipient check.
2555        let ml_dsa = MlDsa65::new();
2556        let mut peer_quotes = Vec::new();
2557        for _ in 0..CLOSE_GROUP_SIZE {
2558            let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen");
2559            let pub_key_bytes = public_key.as_bytes().to_vec();
2560            let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes);
2561            let mut quote = make_fake_quote(xorname, SystemTime::now(), other_addr);
2562            quote.pub_key = pub_key_bytes;
2563            peer_quotes.push((encoded, quote));
2564        }
2565        let proof_bytes = serialize_proof(peer_quotes);
2566
2567        let err = verifier
2568            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2569            .await
2570            .expect_err("payment not addressed to us must fail on a client PUT");
2571        assert!(
2572            format!("{err}").contains("does not include this node as a recipient"),
2573            "ClientPut must fail at the recipient check: {err}"
2574        );
2575
2576        let err = verifier
2577            .verify_payment(
2578                &xorname,
2579                Some(&proof_bytes),
2580                VerificationContext::Replication,
2581            )
2582            .await
2583            .expect_err("fake quote signatures still fail signature verification");
2584        let msg = format!("{err}");
2585        assert!(
2586            !msg.contains("recipient"),
2587            "Replication must skip the recipient check: {msg}"
2588        );
2589        assert!(
2590            msg.contains("signature verification failed"),
2591            "Replication should fail at the LATER signature stage: {msg}"
2592        );
2593    }
2594
2595    /// A `Replication`-verified cache entry must not satisfy a later
2596    /// `ClientPut` fast-path: the context-gated checks were never run for it,
2597    /// so letting it short-circuit a client PUT would bypass them via the
2598    /// cache. It must still satisfy later `Replication` lookups (re-offers of
2599    /// the same key are routine), and a subsequent full `ClientPut`
2600    /// verification upgrades the entry without ever being downgraded back.
2601    #[tokio::test]
2602    async fn test_replication_verified_cache_entry_does_not_satisfy_client_put() {
2603        let verifier = create_test_verifier();
2604        let xorname = [0xD4u8; 32];
2605
2606        // Simulate a successful Replication-context verification.
2607        verifier.cache.insert_replication_verified(xorname);
2608
2609        assert_eq!(
2610            verifier.check_payment_required(&xorname, VerificationContext::Replication),
2611            PaymentStatus::CachedAsVerified,
2612            "replication lookups must hit a replication-verified entry"
2613        );
2614        assert_eq!(
2615            verifier.check_payment_required(&xorname, VerificationContext::ClientPut),
2616            PaymentStatus::PaymentRequired,
2617            "a client PUT must not fast-path on a replication-verified entry"
2618        );
2619
2620        // End-to-end: a proof-less client PUT is still rejected, while a
2621        // proof-less replication re-check passes via the cache.
2622        let result = verifier
2623            .verify_payment(&xorname, None, VerificationContext::Replication)
2624            .await;
2625        assert_eq!(
2626            result.expect("replication re-check should hit the cache"),
2627            PaymentStatus::CachedAsVerified
2628        );
2629        let err = verifier
2630            .verify_payment(&xorname, None, VerificationContext::ClientPut)
2631            .await
2632            .expect_err("proof-less client PUT must not ride the replication entry");
2633        assert!(
2634            format!("{err}").contains("Payment required"),
2635            "client PUT must still demand payment: {err}"
2636        );
2637
2638        // A full ClientPut verification upgrades the entry...
2639        verifier.cache.insert(xorname);
2640        assert_eq!(
2641            verifier.check_payment_required(&xorname, VerificationContext::ClientPut),
2642            PaymentStatus::CachedAsVerified,
2643            "a full client-PUT verification must upgrade the entry"
2644        );
2645
2646        // ...and a later replication re-verification never downgrades it.
2647        verifier.cache.insert_replication_verified(xorname);
2648        assert_eq!(
2649            verifier.check_payment_required(&xorname, VerificationContext::ClientPut),
2650            PaymentStatus::CachedAsVerified,
2651            "replication re-verification must not downgrade a client-PUT entry"
2652        );
2653    }
2654
2655    /// Receipt authenticity is NOT relaxed under `Replication`: a bundle whose
2656    /// quotes are bound to a different content address is rejected in both
2657    /// contexts. A neighbour cannot replay a receipt for chunk A to get
2658    /// chunk B admitted.
2659    #[tokio::test]
2660    async fn test_replication_context_still_rejects_content_mismatch() {
2661        use evmlib::{EncodedPeerId, RewardsAddress};
2662
2663        let verifier = create_test_verifier();
2664        let stored_xorname = [0xD2u8; 32];
2665        let quoted_xorname = [0xD3u8; 32];
2666        let rewards = RewardsAddress::new([1u8; 20]);
2667
2668        let mut peer_quotes = Vec::new();
2669        for _ in 0..CLOSE_GROUP_SIZE {
2670            let quote = make_fake_quote(quoted_xorname, SystemTime::now(), rewards);
2671            peer_quotes.push((EncodedPeerId::new(rand::random()), quote));
2672        }
2673        let proof_bytes = serialize_proof(peer_quotes);
2674
2675        for context in [
2676            VerificationContext::ClientPut,
2677            VerificationContext::Replication,
2678        ] {
2679            let err = verifier
2680                .verify_payment(&stored_xorname, Some(&proof_bytes), context)
2681                .await
2682                .expect_err("content binding must hold in every context");
2683            assert!(
2684                format!("{err}").contains("content address mismatch"),
2685                "{context:?} must reject a receipt for a different address: {err}"
2686            );
2687        }
2688    }
2689
2690    /// The merkle pay-yourself closeness defence (including its duplicate-
2691    /// candidate pre-check, which runs without a `P2PNode`) applies to client
2692    /// PUTs only. Under `Replication` the pool was sampled from the DHT of
2693    /// the original sale, so the live-DHT check is skipped and verification
2694    /// proceeds to the on-chain stages.
2695    #[tokio::test]
2696    async fn test_replication_context_skips_merkle_closeness() {
2697        let verifier = create_test_verifier();
2698
2699        let (mut merkle_proof, _pool_hash, xorname, timestamp) = make_valid_merkle_proof();
2700
2701        // 16 copies of one real candidate: every self-signature is valid, but
2702        // the candidate PeerIds are duplicates — the closeness pre-check
2703        // rejects this pool on a client PUT.
2704        let shared = merkle_proof
2705            .winner_pool
2706            .candidate_nodes
2707            .first()
2708            .expect("candidates")
2709            .clone();
2710        for c in &mut merkle_proof.winner_pool.candidate_nodes {
2711            *c = shared.clone();
2712        }
2713        let pool_hash = merkle_proof.winner_pool_hash();
2714
2715        // Seed the pool cache with a deliberately mismatched timestamp so the
2716        // Replication path fails deterministically AFTER the (skipped)
2717        // closeness check, without needing on-chain access.
2718        {
2719            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2720                depth: 4,
2721                merkle_payment_timestamp: timestamp + 1,
2722                paid_node_addresses: vec![],
2723            };
2724            verifier.pool_cache.lock().put(pool_hash, info);
2725        }
2726
2727        let tagged =
2728            crate::payment::proof::serialize_merkle_proof(&merkle_proof).expect("serialize");
2729
2730        let err = verifier
2731            .verify_payment(&xorname, Some(&tagged), VerificationContext::ClientPut)
2732            .await
2733            .expect_err("duplicate candidate PeerIds must fail the client-PUT closeness check");
2734        assert!(
2735            format!("{err}").contains("duplicate candidate PeerId"),
2736            "ClientPut must fail at the closeness pre-check: {err}"
2737        );
2738
2739        let err = verifier
2740            .verify_payment(&xorname, Some(&tagged), VerificationContext::Replication)
2741            .await
2742            .expect_err("seeded timestamp mismatch still fails after the skipped check");
2743        let msg = format!("{err}");
2744        assert!(
2745            !msg.contains("duplicate candidate PeerId"),
2746            "Replication must skip the closeness check: {msg}"
2747        );
2748        assert!(
2749            msg.contains("timestamp mismatch"),
2750            "Replication should fail at the LATER timestamp stage: {msg}"
2751        );
2752    }
2753
2754    // =========================================================================
2755    // Merkle-tagged proof tests
2756    // =========================================================================
2757
2758    #[tokio::test]
2759    async fn test_merkle_tagged_proof_invalid_data_rejected() {
2760        use crate::ant_protocol::PROOF_TAG_MERKLE;
2761
2762        let verifier = create_test_verifier();
2763        let xorname = [0xA1u8; 32];
2764
2765        // Build a merkle-tagged proof with garbage body.
2766        // The tag byte is correct but the body is not valid msgpack.
2767        let mut merkle_garbage = Vec::with_capacity(64);
2768        merkle_garbage.push(PROOF_TAG_MERKLE);
2769        merkle_garbage.extend_from_slice(&[0xAB; 63]);
2770
2771        let result = verifier
2772            .verify_payment(
2773                &xorname,
2774                Some(&merkle_garbage),
2775                VerificationContext::ClientPut,
2776            )
2777            .await;
2778
2779        assert!(
2780            result.is_err(),
2781            "Should reject merkle proof with invalid body"
2782        );
2783        let err_msg = format!("{}", result.expect_err("should fail"));
2784        assert!(
2785            err_msg.contains("deserialize") || err_msg.contains("merkle proof"),
2786            "Error should mention deserialization failure: {err_msg}"
2787        );
2788    }
2789
2790    #[tokio::test]
2791    async fn test_single_node_tagged_proof_deserialization() {
2792        use crate::payment::proof::serialize_single_node_proof;
2793        use evmlib::{EncodedPeerId, RewardsAddress};
2794
2795        let verifier = create_test_verifier();
2796        let xorname = [0xA2u8; 32];
2797        let rewards_addr = RewardsAddress::new([1u8; 20]);
2798
2799        // Build a valid tagged single-node proof
2800        let quote = make_fake_quote(xorname, SystemTime::now(), rewards_addr);
2801        let mut peer_quotes = Vec::new();
2802        for _ in 0..CLOSE_GROUP_SIZE {
2803            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2804        }
2805
2806        let proof = crate::payment::proof::PaymentProof {
2807            proof_of_payment: ProofOfPayment {
2808                peer_quotes: peer_quotes.clone(),
2809            },
2810            tx_hashes: vec![],
2811        };
2812
2813        let tagged_bytes = serialize_single_node_proof(&proof).expect("serialize tagged proof");
2814
2815        // detect_proof_type should identify it as SingleNode
2816        assert_eq!(
2817            crate::payment::proof::detect_proof_type(&tagged_bytes),
2818            Some(crate::payment::proof::ProofType::SingleNode)
2819        );
2820
2821        // verify_payment should process it through the single-node path.
2822        // It will fail at quote validation (fake pub_key), but we verify
2823        // it passes the deserialization stage by checking the error type.
2824        let result = verifier
2825            .verify_payment(
2826                &xorname,
2827                Some(&tagged_bytes),
2828                VerificationContext::ClientPut,
2829            )
2830            .await;
2831
2832        assert!(result.is_err(), "Should fail at quote validation stage");
2833        let err_msg = format!("{}", result.expect_err("should fail"));
2834        // It should NOT be a deserialization error — it should get further
2835        assert!(
2836            !err_msg.contains("deserialize"),
2837            "Should pass deserialization but fail later: {err_msg}"
2838        );
2839    }
2840
2841    #[test]
2842    fn test_pool_cache_insert_and_lookup() {
2843        use evmlib::merkle_batch_payment::PoolHash;
2844
2845        // Verify the pool_cache field exists and works correctly.
2846        // Insert a pool hash, then verify it's present on lookup.
2847        let verifier = create_test_verifier();
2848
2849        let pool_hash: PoolHash = [0xBBu8; 32];
2850        let payment_info = evmlib::merkle_payments::OnChainPaymentInfo {
2851            depth: 4,
2852            merkle_payment_timestamp: 1_700_000_000,
2853            paid_node_addresses: vec![],
2854        };
2855
2856        // Insert into pool cache
2857        {
2858            let mut cache = verifier.pool_cache.lock();
2859            cache.put(pool_hash, payment_info);
2860        }
2861
2862        // First lookup — should find it
2863        {
2864            let found = verifier.pool_cache.lock().get(&pool_hash).cloned();
2865            assert!(found.is_some(), "Pool hash should be in cache after insert");
2866            let info = found.expect("cached info");
2867            assert_eq!(info.depth, 4);
2868            assert_eq!(info.merkle_payment_timestamp, 1_700_000_000);
2869        }
2870
2871        // Second lookup — same result (no double-query needed)
2872        {
2873            let found = verifier.pool_cache.lock().get(&pool_hash).cloned();
2874            assert!(
2875                found.is_some(),
2876                "Pool hash should still be in cache on second lookup"
2877            );
2878        }
2879
2880        // Different pool hash — should NOT be found
2881        let other_hash: PoolHash = [0xCCu8; 32];
2882        {
2883            let found = verifier.pool_cache.lock().get(&other_hash).cloned();
2884            assert!(found.is_none(), "Unknown pool hash should not be in cache");
2885        }
2886    }
2887
2888    #[tokio::test]
2889    async fn closeness_pass_cache_short_circuits_second_call() {
2890        // When a pool_hash is in the closeness_pass_cache, the outer
2891        // verify_merkle_candidate_closeness must return Ok(()) without
2892        // running the inner lookup — even if no P2PNode is attached.
2893        // That second half (no-p2p → would normally fail-closed in release)
2894        // is the proof the cache short-circuit ran first.
2895        let verifier = create_test_verifier();
2896        let pool_hash = [0xAAu8; 32];
2897        verifier.closeness_pass_cache.lock().put(pool_hash, ());
2898
2899        // Construct a dummy pool — contents don't matter because the cache
2900        // hit means we never look at them.
2901        let pool = MerklePaymentCandidatePool {
2902            midpoint_proof: fake_midpoint_proof(),
2903            candidate_nodes: make_candidate_nodes(1_700_000_000),
2904        };
2905
2906        let result = verifier
2907            .verify_merkle_candidate_closeness(&pool, pool_hash)
2908            .await;
2909        assert!(
2910            result.is_ok(),
2911            "cached pool hash must bypass the inner check and return Ok(()), got: {result:?}"
2912        );
2913    }
2914
2915    #[tokio::test]
2916    async fn closeness_single_flight_concurrent_readers_share_one_verification() {
2917        // Two concurrent callers for the same pool_hash should produce the
2918        // same outcome, and the cache should end up populated exactly once.
2919        // We use the test-utils fail-open path to short-circuit the inner
2920        // DHT lookup; the purpose of this test is the single-flight
2921        // plumbing, not the lookup itself.
2922        let verifier = Arc::new(create_test_verifier());
2923        let pool_hash = [0x77u8; 32];
2924        let pool = MerklePaymentCandidatePool {
2925            midpoint_proof: fake_midpoint_proof(),
2926            candidate_nodes: make_candidate_nodes(1_700_000_000),
2927        };
2928
2929        let v1 = Arc::clone(&verifier);
2930        let p1 = pool.clone();
2931        let v2 = Arc::clone(&verifier);
2932        let p2 = pool.clone();
2933
2934        let (r1, r2) = tokio::join!(
2935            async move { v1.verify_merkle_candidate_closeness(&p1, pool_hash).await },
2936            async move { v2.verify_merkle_candidate_closeness(&p2, pool_hash).await },
2937        );
2938
2939        assert_eq!(r1.is_ok(), r2.is_ok(), "concurrent callers must agree");
2940        assert!(
2941            r1.is_ok(),
2942            "both callers must succeed on the test-utils path"
2943        );
2944        assert!(
2945            verifier
2946                .closeness_pass_cache
2947                .lock()
2948                .get(&pool_hash)
2949                .is_some(),
2950            "success path must populate the pass cache"
2951        );
2952        assert!(
2953            verifier.inflight_closeness.lock().get(&pool_hash).is_none(),
2954            "inflight slot must be cleared after the leader finishes"
2955        );
2956    }
2957
2958    #[tokio::test]
2959    async fn closeness_waiter_reads_leaders_published_failure() {
2960        // Prove the waiter path actually surfaces a failure published by a
2961        // concurrent leader, without running its own inner check. Insert a
2962        // slot, spawn a waiter (which will park on notified_owned), then
2963        // publish failure + notify from the outside — simulating what the
2964        // leader's `publish` + drop-guard pair does.
2965        let verifier = Arc::new(create_test_verifier());
2966        let pool_hash = [0x55u8; 32];
2967        let slot = Arc::new(ClosenessSlot::new());
2968        verifier
2969            .inflight_closeness
2970            .lock()
2971            .put(pool_hash, Arc::clone(&slot));
2972
2973        let pool = MerklePaymentCandidatePool {
2974            midpoint_proof: fake_midpoint_proof(),
2975            candidate_nodes: make_candidate_nodes(1_700_000_000),
2976        };
2977
2978        let verifier_c = Arc::clone(&verifier);
2979        let pool_c = pool.clone();
2980        let waiter = tokio::spawn(async move {
2981            verifier_c
2982                .verify_merkle_candidate_closeness(&pool_c, pool_hash)
2983                .await
2984        });
2985
2986        // Yield so the waiter can run up to its `notified_owned().await`.
2987        // A few yields cover both single-threaded and multi-threaded tokio
2988        // runtimes regardless of scheduling.
2989        for _ in 0..5 {
2990            tokio::task::yield_now().await;
2991        }
2992
2993        // Simulate the leader's `publish` + drop-guard: publish the result,
2994        // clear the slot, wake waiters.
2995        slot.result
2996            .set(Err("forged pool: not close enough".to_string()))
2997            .expect("set once");
2998        verifier.inflight_closeness.lock().pop(&pool_hash);
2999        slot.notify.notify_waiters();
3000
3001        let result = waiter.await.expect("task panicked");
3002        let err = result.expect_err("waiter must return the leader's published failure");
3003        assert!(
3004            err.to_string().contains("forged pool"),
3005            "waiter must surface the leader's error message, got: {err}"
3006        );
3007    }
3008
3009    #[tokio::test]
3010    async fn closeness_rejects_pool_with_duplicate_candidate_pub_keys() {
3011        // An attacker who submits 16 copies of the same real peer's pub_key
3012        // would otherwise satisfy the closeness threshold trivially:
3013        // that one peer's membership in the DHT-returned set would count
3014        // 16 times. The dedupe check in verify_merkle_candidate_closeness_inner
3015        // must reject the pool BEFORE the network lookup runs (so this test
3016        // works even with no P2PNode attached).
3017        let verifier = create_test_verifier();
3018        let pool_hash = [0xDDu8; 32];
3019
3020        // Build a normal pool, then overwrite every candidate's pub_key
3021        // with a single shared key so all 16 derive to the same PeerId.
3022        let mut candidates = make_candidate_nodes(1_700_000_000);
3023        let shared_pub_key = candidates
3024            .first()
3025            .expect("make_candidate_nodes returns CANDIDATES_PER_POOL entries")
3026            .pub_key
3027            .clone();
3028        for c in &mut candidates {
3029            c.pub_key = shared_pub_key.clone();
3030        }
3031        let pool = MerklePaymentCandidatePool {
3032            midpoint_proof: fake_midpoint_proof(),
3033            candidate_nodes: candidates,
3034        };
3035
3036        let result = verifier
3037            .verify_merkle_candidate_closeness(&pool, pool_hash)
3038            .await;
3039        let err = result.expect_err("duplicate candidate PeerIds must be rejected");
3040        let msg = err.to_string();
3041        assert!(
3042            msg.contains("duplicate candidate PeerId"),
3043            "rejection must be the duplicate-PeerId branch, got: {msg}"
3044        );
3045    }
3046
3047    /// Build a deterministic but otherwise-unused `MidpointProof` so unit
3048    /// tests can construct a `MerklePaymentCandidatePool` without spinning
3049    /// up a real merkle tree. The closeness path only calls `.address()`
3050    /// on it, which is a pure BLAKE3 of the branch's leaf/root/timestamp —
3051    /// the values don't need to be tree-valid for these tests.
3052    fn fake_midpoint_proof() -> evmlib::merkle_payments::MidpointProof {
3053        // Build a minimal tree of two leaves so we get a real branch.
3054        let leaves = vec![xor_name::XorName([1u8; 32]), xor_name::XorName([2u8; 32])];
3055        let tree = evmlib::merkle_payments::MerkleTree::from_xornames(leaves).expect("tree");
3056        let candidates = tree.reward_candidates(1_700_000_000).expect("candidates");
3057        candidates.first().expect("at least one").clone()
3058    }
3059
3060    // =========================================================================
3061    // Merkle verification unit tests
3062    // =========================================================================
3063
3064    /// Helper: build 16 validly-signed ML-DSA-65 candidate nodes.
3065    fn make_candidate_nodes(
3066        timestamp: u64,
3067    ) -> [evmlib::merkle_payments::MerklePaymentCandidateNode;
3068           evmlib::merkle_payments::CANDIDATES_PER_POOL] {
3069        use evmlib::merkle_payments::{MerklePaymentCandidateNode, CANDIDATES_PER_POOL};
3070        use saorsa_core::MlDsa65;
3071        use saorsa_pqc::pqc::types::MlDsaSecretKey;
3072        use saorsa_pqc::pqc::MlDsaOperations;
3073
3074        std::array::from_fn::<_, CANDIDATES_PER_POOL, _>(|i| {
3075            let ml_dsa = MlDsa65::new();
3076            let (pub_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
3077            let price = evmlib::common::Amount::from(1024u64);
3078            #[allow(clippy::cast_possible_truncation)]
3079            let reward_address = RewardsAddress::new([i as u8; 20]);
3080            let msg = MerklePaymentCandidateNode::bytes_to_sign(&price, &reward_address, timestamp);
3081            let sk = MlDsaSecretKey::from_bytes(secret_key.as_bytes()).expect("sk");
3082            let signature = ml_dsa.sign(&sk, &msg).expect("sign").as_bytes().to_vec();
3083
3084            MerklePaymentCandidateNode {
3085                pub_key: pub_key.as_bytes().to_vec(),
3086                price,
3087                reward_address,
3088                merkle_payment_timestamp: timestamp,
3089                signature,
3090            }
3091        })
3092    }
3093
3094    /// Helper: build a valid `MerklePaymentProof` with real ML-DSA-65
3095    /// signatures. Returns the raw proof, pool hash, xorname, and timestamp.
3096    fn make_valid_merkle_proof() -> (
3097        evmlib::merkle_payments::MerklePaymentProof,
3098        evmlib::merkle_batch_payment::PoolHash,
3099        [u8; 32],
3100        u64,
3101    ) {
3102        use evmlib::merkle_payments::{MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree};
3103
3104        let timestamp = std::time::SystemTime::now()
3105            .duration_since(std::time::UNIX_EPOCH)
3106            .expect("system time")
3107            .as_secs();
3108
3109        let addresses: Vec<xor_name::XorName> = (0..4u8)
3110            .map(|i| xor_name::XorName::from_content(&[i]))
3111            .collect();
3112        let tree = MerkleTree::from_xornames(addresses.clone()).expect("tree");
3113
3114        let candidate_nodes = make_candidate_nodes(timestamp);
3115
3116        let reward_candidates = tree
3117            .reward_candidates(timestamp)
3118            .expect("reward candidates");
3119        let midpoint_proof = reward_candidates
3120            .first()
3121            .expect("at least one candidate")
3122            .clone();
3123
3124        let pool = MerklePaymentCandidatePool {
3125            midpoint_proof,
3126            candidate_nodes,
3127        };
3128
3129        let first_address = *addresses.first().expect("first address");
3130        let address_proof = tree
3131            .generate_address_proof(0, first_address)
3132            .expect("proof");
3133
3134        let merkle_proof = MerklePaymentProof::new(first_address, address_proof, pool);
3135        let pool_hash = merkle_proof.winner_pool_hash();
3136        let xorname = first_address.0;
3137
3138        (merkle_proof, pool_hash, xorname, timestamp)
3139    }
3140
3141    /// Helper: build a minimal valid `MerklePaymentProof` with real ML-DSA-65
3142    /// signatures. Returns `(xorname, serialized_tagged_proof, pool_hash, timestamp)`.
3143    fn make_valid_merkle_proof_bytes() -> (
3144        [u8; 32],
3145        Vec<u8>,
3146        evmlib::merkle_batch_payment::PoolHash,
3147        u64,
3148    ) {
3149        let (merkle_proof, pool_hash, xorname, timestamp) = make_valid_merkle_proof();
3150        let tagged = crate::payment::proof::serialize_merkle_proof(&merkle_proof)
3151            .expect("serialize merkle proof");
3152        (xorname, tagged, pool_hash, timestamp)
3153    }
3154
3155    #[tokio::test]
3156    async fn test_merkle_address_mismatch_rejected() {
3157        let verifier = create_test_verifier();
3158        let (_correct_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
3159
3160        // Use a DIFFERENT xorname than what the proof was built for
3161        let wrong_xorname = [0xFFu8; 32];
3162
3163        let result = verifier
3164            .verify_payment(
3165                &wrong_xorname,
3166                Some(&tagged_proof),
3167                VerificationContext::ClientPut,
3168            )
3169            .await;
3170
3171        assert!(
3172            result.is_err(),
3173            "Should reject merkle proof address mismatch"
3174        );
3175        let err_msg = format!("{}", result.expect_err("should fail"));
3176        assert!(
3177            err_msg.contains("address mismatch") || err_msg.contains("Merkle proof address"),
3178            "Error should mention address mismatch: {err_msg}"
3179        );
3180    }
3181
3182    #[tokio::test]
3183    async fn test_merkle_malformed_body_rejected() {
3184        let verifier = create_test_verifier();
3185        let xorname = [0xA3u8; 32];
3186
3187        // Valid merkle tag but truncated/corrupted msgpack body
3188        let mut bad_proof = vec![crate::ant_protocol::PROOF_TAG_MERKLE];
3189        bad_proof.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
3190        bad_proof.extend_from_slice(&[0x00; 10]);
3191        // pad to minimum size
3192        while bad_proof.len() < MIN_PAYMENT_PROOF_SIZE_BYTES {
3193            bad_proof.push(0x00);
3194        }
3195
3196        let result = verifier
3197            .verify_payment(&xorname, Some(&bad_proof), VerificationContext::ClientPut)
3198            .await;
3199
3200        assert!(result.is_err(), "Should reject malformed merkle body");
3201        let err_msg = format!("{}", result.expect_err("should fail"));
3202        assert!(
3203            err_msg.contains("deserialize") || err_msg.contains("Failed"),
3204            "Error should mention deserialization: {err_msg}"
3205        );
3206    }
3207
3208    #[test]
3209    fn test_merkle_proof_serialized_size_within_limits() {
3210        let (_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
3211
3212        // 16 ML-DSA-65 candidates (~1952 pub key + ~3309 sig each) ≈ 84 KB + tree data
3213        assert!(
3214            tagged_proof.len() >= MIN_PAYMENT_PROOF_SIZE_BYTES,
3215            "Merkle proof ({} bytes) should be >= min {} bytes",
3216            tagged_proof.len(),
3217            MIN_PAYMENT_PROOF_SIZE_BYTES
3218        );
3219        assert!(
3220            tagged_proof.len() <= MAX_PAYMENT_PROOF_SIZE_BYTES,
3221            "Merkle proof ({} bytes) should be <= max {} bytes",
3222            tagged_proof.len(),
3223            MAX_PAYMENT_PROOF_SIZE_BYTES
3224        );
3225    }
3226
3227    #[test]
3228    fn test_merkle_proof_tag_is_correct() {
3229        let (_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
3230
3231        assert_eq!(
3232            tagged_proof.first().copied(),
3233            Some(crate::ant_protocol::PROOF_TAG_MERKLE),
3234            "First byte must be the merkle tag"
3235        );
3236        assert_eq!(
3237            crate::payment::proof::detect_proof_type(&tagged_proof),
3238            Some(crate::payment::proof::ProofType::Merkle)
3239        );
3240    }
3241
3242    #[test]
3243    fn test_pool_cache_eviction() {
3244        use evmlib::merkle_batch_payment::PoolHash;
3245
3246        let config = PaymentVerifierConfig {
3247            evm: EvmVerifierConfig::default(),
3248            cache_capacity: 100,
3249            local_rewards_address: RewardsAddress::new([1u8; 20]),
3250        };
3251        let verifier = PaymentVerifier::new(config);
3252
3253        // Fill the pool cache to capacity (DEFAULT_POOL_CACHE_CAPACITY = 1000)
3254        for i in 0..DEFAULT_POOL_CACHE_CAPACITY {
3255            let mut hash: PoolHash = [0u8; 32];
3256            // Write index bytes into the hash
3257            let idx_bytes = i.to_le_bytes();
3258            for (j, b) in idx_bytes.iter().enumerate() {
3259                if j < 32 {
3260                    hash[j] = *b;
3261                }
3262            }
3263            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3264                depth: 4,
3265                merkle_payment_timestamp: 1_700_000_000,
3266                paid_node_addresses: vec![],
3267            };
3268            verifier.pool_cache.lock().put(hash, info);
3269        }
3270
3271        assert_eq!(
3272            verifier.pool_cache.lock().len(),
3273            DEFAULT_POOL_CACHE_CAPACITY
3274        );
3275
3276        // Insert one more — should evict the oldest
3277        let overflow_hash: PoolHash = [0xFFu8; 32];
3278        let info = evmlib::merkle_payments::OnChainPaymentInfo {
3279            depth: 8,
3280            merkle_payment_timestamp: 1_800_000_000,
3281            paid_node_addresses: vec![],
3282        };
3283        verifier.pool_cache.lock().put(overflow_hash, info);
3284
3285        // Size should still be at capacity (not capacity + 1)
3286        assert_eq!(
3287            verifier.pool_cache.lock().len(),
3288            DEFAULT_POOL_CACHE_CAPACITY
3289        );
3290
3291        // The new entry should be present
3292        let found = verifier.pool_cache.lock().get(&overflow_hash).cloned();
3293        assert!(
3294            found.is_some(),
3295            "Newly inserted pool hash should be present"
3296        );
3297        assert_eq!(found.expect("info").depth, 8);
3298    }
3299
3300    #[test]
3301    fn test_pool_cache_concurrent_access() {
3302        use evmlib::merkle_batch_payment::PoolHash;
3303        use std::sync::Arc;
3304
3305        let verifier = Arc::new(create_test_verifier());
3306
3307        let mut handles = Vec::new();
3308        for i in 0..20u8 {
3309            let v = verifier.clone();
3310            handles.push(std::thread::spawn(move || {
3311                let hash: PoolHash = [i; 32];
3312                let info = evmlib::merkle_payments::OnChainPaymentInfo {
3313                    depth: i,
3314                    merkle_payment_timestamp: u64::from(i) * 1000,
3315                    paid_node_addresses: vec![],
3316                };
3317                v.pool_cache.lock().put(hash, info);
3318
3319                // Read back
3320                let found = v.pool_cache.lock().get(&hash).cloned();
3321                assert!(found.is_some(), "Entry {i} should be readable after insert");
3322            }));
3323        }
3324
3325        for handle in handles {
3326            handle.join().expect("thread panicked");
3327        }
3328
3329        // All 20 entries should be present (well under 1000 capacity)
3330        assert_eq!(verifier.pool_cache.lock().len(), 20);
3331    }
3332
3333    #[tokio::test]
3334    async fn test_merkle_tampered_candidate_signature_rejected() {
3335        let verifier = create_test_verifier();
3336
3337        let (mut merkle_proof, _pool_hash, xorname, timestamp) = make_valid_merkle_proof();
3338
3339        // Tamper the first candidate's signature
3340        if let Some(byte) = merkle_proof
3341            .winner_pool
3342            .candidate_nodes
3343            .first_mut()
3344            .and_then(|c| c.signature.first_mut())
3345        {
3346            *byte ^= 0xFF;
3347        }
3348
3349        // Recompute pool hash after tampering (signature change alters the hash)
3350        let tampered_pool_hash = merkle_proof.winner_pool_hash();
3351
3352        // Pre-populate pool cache so we skip the on-chain query
3353        {
3354            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3355                depth: 4,
3356                merkle_payment_timestamp: timestamp,
3357                paid_node_addresses: vec![],
3358            };
3359            verifier.pool_cache.lock().put(tampered_pool_hash, info);
3360        }
3361
3362        let tagged =
3363            crate::payment::proof::serialize_merkle_proof(&merkle_proof).expect("serialize");
3364
3365        let result = verifier
3366            .verify_payment(&xorname, Some(&tagged), VerificationContext::ClientPut)
3367            .await;
3368
3369        assert!(
3370            result.is_err(),
3371            "Should reject merkle proof with tampered candidate signature"
3372        );
3373        let err_msg = format!("{}", result.expect_err("should fail"));
3374        assert!(
3375            err_msg.contains("Invalid ML-DSA-65 signature"),
3376            "Error should mention invalid signature: {err_msg}"
3377        );
3378    }
3379
3380    #[tokio::test]
3381    async fn test_merkle_timestamp_mismatch_rejected() {
3382        let verifier = create_test_verifier();
3383
3384        let (xorname, tagged, pool_hash, timestamp) = make_valid_merkle_proof_bytes();
3385
3386        // Pre-populate pool cache with a DIFFERENT timestamp than the candidates
3387        {
3388            let mismatched_ts = timestamp + 9999;
3389            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3390                depth: 4,
3391                merkle_payment_timestamp: mismatched_ts,
3392                paid_node_addresses: vec![],
3393            };
3394            verifier.pool_cache.lock().put(pool_hash, info);
3395        }
3396
3397        let result = verifier
3398            .verify_payment(&xorname, Some(&tagged), VerificationContext::ClientPut)
3399            .await;
3400
3401        assert!(
3402            result.is_err(),
3403            "Should reject merkle proof with timestamp mismatch"
3404        );
3405        let err_msg = format!("{}", result.expect_err("should fail"));
3406        assert!(
3407            err_msg.contains("timestamp mismatch"),
3408            "Error should mention timestamp mismatch: {err_msg}"
3409        );
3410    }
3411
3412    #[tokio::test]
3413    async fn test_merkle_paid_node_index_out_of_bounds_rejected() {
3414        let verifier = create_test_verifier();
3415        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
3416
3417        // The test tree has 4 addresses → depth 2. We must match the tree depth
3418        // so verify_merkle_proof passes the depth check, then the paid node
3419        // index out-of-bounds check fires.
3420        {
3421            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3422                depth: 2,
3423                merkle_payment_timestamp: ts,
3424                paid_node_addresses: vec![
3425                    // First paid node: valid (matches candidate 0, amount matches formula)
3426                    // Expected per-node: median(1024) * 2^2 / 2 = 2048
3427                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(2048u64)),
3428                    // Second paid node: index 999 is way beyond CANDIDATES_PER_POOL (16)
3429                    (RewardsAddress::new([1u8; 20]), 999, Amount::from(2048u64)),
3430                ],
3431            };
3432            verifier.pool_cache.lock().put(pool_hash, info);
3433        }
3434
3435        let result = verifier
3436            .verify_payment(
3437                &xorname,
3438                Some(&tagged_proof),
3439                VerificationContext::ClientPut,
3440            )
3441            .await;
3442
3443        assert!(
3444            result.is_err(),
3445            "Should reject paid node index out of bounds"
3446        );
3447        let err_msg = format!("{}", result.expect_err("should fail"));
3448        assert!(
3449            err_msg.contains("out of bounds"),
3450            "Error should mention out of bounds: {err_msg}"
3451        );
3452    }
3453
3454    #[tokio::test]
3455    async fn test_merkle_paid_node_address_mismatch_rejected() {
3456        let verifier = create_test_verifier();
3457        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
3458
3459        // Tree has depth 2, so provide 2 paid node entries.
3460        // Both use valid indices but the second has a wrong reward address.
3461        {
3462            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3463                depth: 2,
3464                merkle_payment_timestamp: ts,
3465                paid_node_addresses: vec![
3466                    // Index 0 with matching address [0x00; 20]
3467                    // Expected per-node: median(1024) * 2^2 / 2 = 2048
3468                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(2048u64)),
3469                    // Index 1 with WRONG address — candidate 1's address is [0x01; 20]
3470                    (RewardsAddress::new([0xFF; 20]), 1, Amount::from(2048u64)),
3471                ],
3472            };
3473            verifier.pool_cache.lock().put(pool_hash, info);
3474        }
3475
3476        let result = verifier
3477            .verify_payment(
3478                &xorname,
3479                Some(&tagged_proof),
3480                VerificationContext::ClientPut,
3481            )
3482            .await;
3483
3484        assert!(result.is_err(), "Should reject paid node address mismatch");
3485        let err_msg = format!("{}", result.expect_err("should fail"));
3486        assert!(
3487            err_msg.contains("address mismatch"),
3488            "Error should mention address mismatch: {err_msg}"
3489        );
3490    }
3491
3492    #[tokio::test]
3493    async fn test_merkle_wrong_depth_rejected() {
3494        let verifier = create_test_verifier();
3495        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
3496
3497        // Pre-populate pool cache with depth=3 but only 1 paid node address
3498        // (depth must equal paid_node_addresses.len())
3499        {
3500            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3501                depth: 3,
3502                merkle_payment_timestamp: ts,
3503                paid_node_addresses: vec![(
3504                    RewardsAddress::new([0u8; 20]),
3505                    0,
3506                    Amount::from(1024u64),
3507                )],
3508            };
3509            verifier.pool_cache.lock().put(pool_hash, info);
3510        }
3511
3512        let result = verifier
3513            .verify_payment(
3514                &xorname,
3515                Some(&tagged_proof),
3516                VerificationContext::ClientPut,
3517            )
3518            .await;
3519
3520        assert!(
3521            result.is_err(),
3522            "Should reject mismatched depth vs paid node count"
3523        );
3524        let err_msg = format!("{}", result.expect_err("should fail"));
3525        assert!(
3526            err_msg.contains("Wrong number of paid nodes")
3527                || err_msg.contains("verification failed"),
3528            "Error should mention depth/count mismatch: {err_msg}"
3529        );
3530    }
3531
3532    #[tokio::test]
3533    async fn test_merkle_underpayment_rejected() {
3534        let verifier = create_test_verifier();
3535        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
3536
3537        // Tree depth=2, so 2 paid nodes required. Candidates all quote price=1024.
3538        // Expected per-node: median(1024) * 2^2 / 2 = 2048.
3539        // Pay only 1 wei per node — far below the expected amount.
3540        {
3541            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3542                depth: 2,
3543                merkle_payment_timestamp: ts,
3544                paid_node_addresses: vec![
3545                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(1u64)),
3546                    (RewardsAddress::new([1u8; 20]), 1, Amount::from(1u64)),
3547                ],
3548            };
3549            verifier.pool_cache.lock().put(pool_hash, info);
3550        }
3551
3552        let result = verifier
3553            .verify_payment(
3554                &xorname,
3555                Some(&tagged_proof),
3556                VerificationContext::ClientPut,
3557            )
3558            .await;
3559
3560        assert!(
3561            result.is_err(),
3562            "Should reject merkle payment where paid amount < expected per-node amount"
3563        );
3564        let err_msg = format!("{}", result.expect_err("should fail"));
3565        assert!(
3566            err_msg.contains("Underpayment"),
3567            "Error should mention underpayment: {err_msg}"
3568        );
3569    }
3570
3571    // =========================================================================
3572    // Closeness-window constants regression tests
3573    //
3574    // These constants are load-bearing for both correctness (the storer
3575    // must look at the same window the client picks from, otherwise honest
3576    // pools are rejected) and DoS resistance (the timeout caps lookup
3577    // amplification per forged pool_hash). Pinning them with tests gives
3578    // future patches a one-line failure if either is silently changed
3579    // without updating the security argument in the doc comments.
3580    //
3581    // Empirical justification, captured during STG-01 investigation on
3582    // 2026-05-01:
3583    //
3584    //   - 60s timeout cut iterative lookups off after ~7 of 20 iterations
3585    //     (trace from EWR-3 ant-node-1 in CLOSENESS_LOOKUP_TIMEOUT doc).
3586    //   - K=16 storer window vs K=32 client over-query produced 73%
3587    //     false-positive mismatch rejections under realistic load
3588    //     (115 → 31 client mismatches per 5min after K=32 deploy).
3589    // =========================================================================
3590
3591    #[test]
3592    fn closeness_lookup_timeout_is_240s() {
3593        // Pin the timeout. If a future change drops it back to 60s the
3594        // failure mode from the trace in the doc comment will return.
3595        assert_eq!(
3596            PaymentVerifier::CLOSENESS_LOOKUP_TIMEOUT,
3597            std::time::Duration::from_secs(240),
3598            "CLOSENESS_LOOKUP_TIMEOUT must be 240s; if changing this, update \
3599             the iteration trace in the doc comment and re-validate on a \
3600             fresh testnet"
3601        );
3602    }
3603
3604    #[test]
3605    fn closeness_lookup_width_is_32() {
3606        // Pin the storer's lookup width. Must equal the client's
3607        // over-query factor (CANDIDATES_PER_POOL * 2 = 32) so the storer
3608        // sees the same peers the client legitimately picks from.
3609        assert_eq!(
3610            PaymentVerifier::CLOSENESS_LOOKUP_WIDTH,
3611            2 * evmlib::merkle_payments::CANDIDATES_PER_POOL,
3612            "CLOSENESS_LOOKUP_WIDTH must equal 2 * CANDIDATES_PER_POOL to \
3613             match the client's over-query in get_merkle_candidate_pool"
3614        );
3615    }
3616
3617    #[test]
3618    fn closeness_required_threshold_is_majority() {
3619        // Pin the threshold so a future change can't silently move it. This
3620        // is the security knob: a 9/16 majority tolerates closest-set
3621        // divergence between two nodes' views while still requiring most
3622        // candidates to be real peers the live DHT lists as closest.
3623        assert_eq!(
3624            PaymentVerifier::CANDIDATE_CLOSENESS_REQUIRED,
3625            9,
3626            "closeness threshold is a 9/16 majority"
3627        );
3628    }
3629
3630    #[test]
3631    fn closeness_lookup_count_uses_max_of_width_and_pool_len() {
3632        // The honest case: a 16-candidate pool must trigger a 32-peer
3633        // network lookup. This is the K=16-rejects-honest-pool fix from
3634        // the STG-01 investigation — without it, the storer never
3635        // observes the peers at network-true positions 17–32 that the
3636        // client legitimately picks from.
3637        let standard =
3638            PaymentVerifier::closeness_lookup_count(evmlib::merkle_payments::CANDIDATES_PER_POOL);
3639        assert_eq!(
3640            standard, 32,
3641            "honest 16-candidate pool must trigger a 32-peer DHT lookup"
3642        );
3643
3644        // Future-proof: if a protocol bump ever produces a pool larger
3645        // than CLOSENESS_LOOKUP_WIDTH, lookup_count must scale with the
3646        // pool — not truncate to WIDTH. Truncating would let an attacker
3647        // hide candidates by padding the pool past the storer's window.
3648        assert_eq!(
3649            PaymentVerifier::closeness_lookup_count(64),
3650            64,
3651            "lookup_count must scale up if pool exceeds CLOSENESS_LOOKUP_WIDTH"
3652        );
3653
3654        // Lower bound (also covered by the const-assert below; pin the
3655        // runtime path too in case the const-assert is ever removed).
3656        assert_eq!(
3657            PaymentVerifier::closeness_lookup_count(1),
3658            PaymentVerifier::CLOSENESS_LOOKUP_WIDTH,
3659            "lookup_count must never drop below CLOSENESS_LOOKUP_WIDTH"
3660        );
3661    }
3662
3663    // Compile-time invariant: the `closeness_lookup_count` formula relies
3664    // on WIDTH being ≥ CANDIDATES_PER_POOL so we never request fewer peers
3665    // than the pool itself contains.
3666    const _: () = assert!(
3667        PaymentVerifier::CLOSENESS_LOOKUP_WIDTH >= evmlib::merkle_payments::CANDIDATES_PER_POOL,
3668        "CLOSENESS_LOOKUP_WIDTH must be ≥ CANDIDATES_PER_POOL",
3669    );
3670
3671    // =========================================================================
3672    // Closeness-match logic tests
3673    //
3674    // These tests use the extracted `check_closeness_match` helper to
3675    // exercise the matching logic directly with synthetic peer-ID sets,
3676    // without standing up a real DHT. They cover:
3677    //
3678    //   - the 9/16 majority threshold (accept at exactly 9, reject below);
3679    //   - that a candidate counts only via exact membership in the storer's
3680    //     returned closest peers, so off-network fabrications are rejected;
3681    //   - the sparse-network short-circuit.
3682    //
3683    // Synthetic PeerIds put the tag in `bytes[0]`, so a candidate is in or
3684    // out of the network's returned set purely by tag value.
3685    // =========================================================================
3686
3687    /// Build a deterministic `PeerId` from a single byte tag.
3688    fn synthetic_peer_id(tag: u8) -> PeerId {
3689        let mut bytes = [0u8; 32];
3690        bytes[0] = tag;
3691        PeerId::from_bytes(bytes)
3692    }
3693
3694    /// Build a vector of synthetic `PeerId`s tagged with bytes 1..=n.
3695    fn synthetic_peer_ids(n: u8) -> Vec<PeerId> {
3696        (1..=n).map(synthetic_peer_id).collect()
3697    }
3698
3699    #[test]
3700    fn closeness_match_passes_when_all_16_candidates_in_top_16() {
3701        // Trivial case: every candidate is in the network's top-16.
3702        // Asserts the happy path still works after the refactor.
3703        let candidates = synthetic_peer_ids(16);
3704        let network = synthetic_peer_ids(16);
3705        let pool_address = [0u8; 32];
3706        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
3707        assert!(result.is_ok(), "all-in-top-16 pool must pass: {result:?}");
3708    }
3709
3710    #[test]
3711    fn closeness_match_passes_when_candidates_span_positions_1_to_15_and_17() {
3712        // The client's pool contains 16 candidates, 15 at network-true
3713        // positions 1..=15 plus one at position 17 (the position-16 peer was
3714        // unresponsive when the client over-queried). Under K=32 all 16 are
3715        // exact matches, comfortably ≥ the 9/16 majority.
3716        let candidates = synthetic_peer_ids(15)
3717            .into_iter()
3718            .chain(std::iter::once(synthetic_peer_id(17)))
3719            .collect::<Vec<_>>();
3720        // Lookup window = 32, includes position 17.
3721        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
3722        let pool_address = [0u8; 32];
3723        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
3724        assert!(
3725            result.is_ok(),
3726            "pool with one candidate at position 17 must pass: {result:?}"
3727        );
3728    }
3729
3730    #[test]
3731    fn closeness_match_accepts_honest_skew_via_exact_matches() {
3732        // Honest skew: the client's 16 candidates span network-true positions
3733        // {1..=12, 17, 19, 21, 23}. The lookup window of 32 covers all of
3734        // them, so all 16 are exact matches — trivially ≥ the 9/16 majority.
3735        let candidates: Vec<PeerId> = (1..=12u8)
3736            .chain([17u8, 19, 21, 23])
3737            .map(synthetic_peer_id)
3738            .collect();
3739        let pool_address = [0u8; 32];
3740        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
3741
3742        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
3743        assert!(
3744            result.is_ok(),
3745            "honest pool fully inside the lookup window must pass: {result:?}"
3746        );
3747    }
3748
3749    #[test]
3750    fn closeness_match_rejects_forged_pool() {
3751        // Security floor: a fully-forged pool whose candidate PeerIds are
3752        // disjoint from the network's returned closest peers must be
3753        // rejected. The lowered majority threshold must NOT let off-network
3754        // fabrications pass — every counted candidate has to be a peer the
3755        // live DHT actually returned.
3756        let forged_candidates: Vec<PeerId> = (100..=115).map(synthetic_peer_id).collect();
3757        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
3758        let pool_address = [0u8; 32];
3759
3760        let result =
3761            PaymentVerifier::check_closeness_match(&forged_candidates, &network, &pool_address);
3762        match result {
3763            Err(Error::Payment(msg)) => {
3764                assert!(
3765                    msg.contains("candidate pub_keys do not match"),
3766                    "expected forged-pool rejection message, got: {msg}"
3767                );
3768            }
3769            other => {
3770                panic!("forged pool disjoint from the network set must be rejected: {other:?}")
3771            }
3772        }
3773    }
3774
3775    #[test]
3776    fn closeness_match_rejects_pool_below_majority() {
3777        // Threshold sanity: 8 candidates are exact matches (tags 1..=8) and
3778        // the other 8 are off-network fabrications (tags 100..=107). 8 < 9
3779        // → reject.
3780        let mut candidates = synthetic_peer_ids(8);
3781        candidates.extend((100..=107).map(synthetic_peer_id)); // 8 fabrications
3782        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
3783        let pool_address = [0u8; 32];
3784
3785        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
3786        assert!(
3787            result.is_err(),
3788            "8 matches < majority of 9/16 must reject: {result:?}"
3789        );
3790    }
3791
3792    #[test]
3793    fn closeness_match_accepts_at_exactly_majority() {
3794        // Threshold sanity: exactly 9 candidates are exact matches (tags
3795        // 1..=9), the other 7 are off-network fabrications (tags 100..=106).
3796        // 9 ≥ 9 → accept.
3797        let mut candidates = synthetic_peer_ids(9);
3798        candidates.extend((100..=106).map(synthetic_peer_id)); // 7 fabrications
3799        let network: Vec<PeerId> = (1..=32).map(synthetic_peer_id).collect();
3800        let pool_address = [0u8; 32];
3801
3802        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
3803        assert!(
3804            result.is_ok(),
3805            "9/16 ≥ majority threshold must accept: {result:?}"
3806        );
3807    }
3808
3809    #[test]
3810    fn closeness_match_returns_sparse_dht_error_when_lookup_too_small() {
3811        // The sparse-DHT short-circuit fires when the lookup returned
3812        // fewer peers than the threshold itself — even an all-matching
3813        // candidate set can't pass because the storer doesn't have an
3814        // authoritative view to compare against.
3815        let candidates = synthetic_peer_ids(16);
3816        let network = synthetic_peer_ids(8); // < CANDIDATE_CLOSENESS_REQUIRED (9)
3817        let pool_address = [0u8; 32];
3818
3819        let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);
3820        match result {
3821            Err(Error::Payment(msg)) => {
3822                assert!(
3823                    msg.contains("authoritative DHT lookup returned only 8"),
3824                    "expected sparse-DHT error message, got: {msg}"
3825                );
3826            }
3827            other => panic!("expected sparse-DHT rejection, got: {other:?}"),
3828        }
3829    }
3830}