// ant_node/payment/verifier.rs
1//! Payment verifier with LRU cache and EVM verification.
2//!
3//! This is the core payment verification logic for ant-node.
4//! All new data requires EVM payment on Arbitrum (no free tier).
5
6use crate::ant_protocol::CLOSE_GROUP_SIZE;
7use crate::error::{Error, Result};
8use crate::logging::{debug, info};
9use crate::payment::cache::{CacheStats, VerifiedCache, XorName};
10use crate::payment::proof::{
11    deserialize_merkle_proof, deserialize_proof, detect_proof_type, ProofType,
12};
13use crate::payment::single_node::SingleNodePayment;
14use ant_protocol::payment::verify::{verify_quote_content, verify_quote_signature};
15use evmlib::common::Amount;
16use evmlib::contract::payment_vault;
17use evmlib::merkle_batch_payment::{OnChainPaymentInfo, PoolHash};
18use evmlib::Network as EvmNetwork;
19use evmlib::ProofOfPayment;
20use evmlib::RewardsAddress;
21use lru::LruCache;
22use parking_lot::{Mutex, RwLock};
23use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes;
24use saorsa_core::identity::PeerId;
25use saorsa_core::P2PNode;
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28use std::time::SystemTime;
29
/// Minimum allowed size for a payment proof in bytes.
///
/// This minimum ensures the proof contains at least a basic cryptographic hash or identifier.
/// Proofs smaller than this are rejected as they cannot contain sufficient payment information.
pub const MIN_PAYMENT_PROOF_SIZE_BYTES: usize = 32;

/// Maximum allowed size for a payment proof in bytes (256 KB).
///
/// Single-node proofs with 7 ML-DSA-65 quotes reach ~40 KB.
/// Merkle proofs include 16 candidate nodes (each with ~1,952-byte ML-DSA pub key
/// and ~3,309-byte signature) plus merkle branch hashes, totaling ~130 KB.
/// 256 KB provides headroom while still capping memory during verification.
/// Both bounds are enforced in `verify_payment` before deserialization.
pub const MAX_PAYMENT_PROOF_SIZE_BYTES: usize = 262_144;

/// Maximum age of a payment quote before it's considered expired (24 hours).
/// Prevents replaying old cheap quotes against nearly-full nodes.
/// Enforced per-quote in `validate_quote_timestamps`.
const QUOTE_MAX_AGE_SECS: u64 = 86_400;

/// Maximum allowed clock skew for quote timestamps (60 seconds).
/// Accounts for NTP synchronization differences between P2P nodes.
/// Quotes dated further than this into the future are rejected.
const QUOTE_CLOCK_SKEW_TOLERANCE_SECS: u64 = 60;
51
52/// Configuration for EVM payment verification.
53///
54/// EVM verification is always on. All new data requires on-chain
55/// payment verification. The network field selects which EVM chain to use.
56#[derive(Debug, Clone)]
57pub struct EvmVerifierConfig {
58    /// EVM network to use (Arbitrum One, Arbitrum Sepolia, etc.)
59    pub network: EvmNetwork,
60}
61
62impl Default for EvmVerifierConfig {
63    fn default() -> Self {
64        Self {
65            network: EvmNetwork::ArbitrumOne,
66        }
67    }
68}
69
/// Configuration for the payment verifier.
///
/// All new data requires EVM payment on Arbitrum. The cache stores
/// previously verified payments to avoid redundant on-chain lookups.
#[derive(Debug, Clone)]
pub struct PaymentVerifierConfig {
    /// EVM verifier configuration.
    pub evm: EvmVerifierConfig,
    /// Cache capacity (number of `XorName` values to cache).
    /// Passed straight to `VerifiedCache::with_capacity` in `new`.
    pub cache_capacity: usize,
    /// Local node's rewards address.
    /// The verifier rejects payments that don't include this node as a recipient.
    pub local_rewards_address: RewardsAddress,
}
84
/// Status returned by payment verification.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PaymentStatus {
    /// Data was found in local cache - previously paid.
    CachedAsVerified,
    /// New data - payment required.
    PaymentRequired,
    /// Payment was provided and verified.
    PaymentVerified,
}

impl PaymentStatus {
    /// Returns true if the data can be stored (cached or payment verified).
    #[must_use]
    pub fn can_store(&self) -> bool {
        match self {
            Self::CachedAsVerified | Self::PaymentVerified => true,
            Self::PaymentRequired => false,
        }
    }

    /// Returns true if this status indicates the data was already paid for.
    #[must_use]
    pub fn is_cached(&self) -> bool {
        match self {
            Self::CachedAsVerified => true,
            Self::PaymentRequired | Self::PaymentVerified => false,
        }
    }
}
109
/// Default capacity for the merkle pool cache (number of pool hashes to cache).
/// The same capacity backs `pool_cache`, `closeness_pass_cache`, and
/// `inflight_closeness` (see `PaymentVerifier::new`).
const DEFAULT_POOL_CACHE_CAPACITY: usize = 1_000;
112
/// Main payment verifier for ant-node.
///
/// Uses:
/// 1. LRU cache for fast lookups of previously verified `XorName` values
/// 2. EVM payment verification for new data (always required)
/// 3. Pool-level cache for merkle batch payments (avoids repeated on-chain queries)
pub struct PaymentVerifier {
    /// LRU cache of verified `XorName` values.
    cache: VerifiedCache,
    /// LRU cache of verified merkle pool hashes → on-chain payment info.
    pool_cache: Mutex<LruCache<PoolHash, OnChainPaymentInfo>>,
    /// LRU cache of pool hashes whose candidate closeness has already been
    /// verified by this node. Collapses the per-chunk Kademlia lookup cost
    /// within a batch (256 chunks × 1 pool = 1 lookup instead of 256).
    closeness_pass_cache: Mutex<LruCache<PoolHash, ()>>,
    /// In-flight closeness lookups, keyed by pool hash. Lets concurrent PUTs
    /// for the same pool coalesce onto a single Kademlia lookup AND share
    /// its result — on both success and failure — which bounds `DoS`
    /// amplification to one lookup per unique `pool_hash` regardless of
    /// concurrency.
    inflight_closeness: Mutex<LruCache<PoolHash, Arc<ClosenessSlot>>>,
    /// P2P node handle, attached post-construction so merkle verification can
    /// check that candidate `pub_keys` map to peers actually close to the pool
    /// midpoint in the live DHT. `None` in unit tests that don't exercise
    /// merkle verification; production startup MUST call [`attach_p2p_node`].
    p2p_node: RwLock<Option<Arc<P2PNode>>>,
    /// Configuration (EVM network, cache capacity, local rewards address).
    config: PaymentVerifierConfig,
}
142
/// Shared state for an inflight closeness verification. The leader publishes
/// its result via the `OnceLock`; waiters read that result directly instead
/// of racing on a cache re-check. Wrapped in an `Arc` and held both by the
/// leader's drop guard and by each waiting task.
struct ClosenessSlot {
    /// Wake-up signal only — carries no data; the payload travels in `result`.
    notify: Arc<tokio::sync::Notify>,
    /// `Some(Ok(()))` on success, `Some(Err(msg))` on failure, `None` if the
    /// leader disappeared without publishing (panic, cancellation).
    result: std::sync::OnceLock<std::result::Result<(), String>>,
}
153
impl ClosenessSlot {
    /// Create an empty slot: no result published, no waiters registered yet.
    fn new() -> Self {
        Self {
            notify: Arc::new(tokio::sync::Notify::new()),
            result: std::sync::OnceLock::new(),
        }
    }

    /// Build an owned `Notified` future that snapshots the `notify_waiters`
    /// counter at call time. Awaiting this future after dropping external
    /// locks is race-free: if `notify_waiters` fires between construction
    /// and the first poll, the snapshot mismatch resolves the future
    /// immediately.
    fn notified_owned(&self) -> tokio::sync::futures::OwnedNotified {
        // The Arc clone keeps the Notify alive for as long as the future lives.
        Arc::clone(&self.notify).notified_owned()
    }
}
171
/// Drop guard that publishes the leader's result, clears the inflight slot,
/// and wakes all waiters. Fires on every exit path: success, failure, panic,
/// future-cancellation.
///
/// The guard owns its own `Arc<ClosenessSlot>` so `notify_waiters` still
/// fires even if LRU pressure evicted the slot before the leader finished.
/// Waiters see the published result via `result.get()`; the `Notify` is only
/// the wake-up signal.
struct InflightGuard<'a> {
    /// The verifier's inflight-slot cache; the guard removes its own entry on drop.
    slot_cache: &'a Mutex<LruCache<PoolHash, Arc<ClosenessSlot>>>,
    /// Pool this leadership claim belongs to.
    pool_hash: PoolHash,
    /// Owned handle to the slot, kept alive independently of the LRU entry.
    slot: Arc<ClosenessSlot>,
}
185
186impl InflightGuard<'_> {
187    /// Publish the leader's result. Called exactly once by the leader on
188    /// every successful or explicit-error exit. If dropped without calling
189    /// (panic, cancellation) the guard still wakes waiters but leaves
190    /// `result` empty, which waiters treat as a transient failure and retry.
191    fn publish(&self, result: &Result<()>) {
192        let stored: std::result::Result<(), String> = match result {
193            Ok(()) => Ok(()),
194            Err(e) => Err(e.to_string()),
195        };
196        let _ = self.slot.result.set(stored);
197    }
198}
199
impl Drop for InflightGuard<'_> {
    fn drop(&mut self) {
        // Remove the slot entry if it's still ours. A separate leader may
        // have inserted a new slot for the same pool_hash after LRU
        // eviction — don't pop someone else's entry.
        // (Scoped block so the cache lock is released before notifying.)
        {
            let mut cache = self.slot_cache.lock();
            if let Some(existing) = cache.peek(&self.pool_hash) {
                // Identity comparison: only the exact Arc we inserted is ours.
                if Arc::ptr_eq(existing, &self.slot) {
                    cache.pop(&self.pool_hash);
                }
            }
        }
        // Wake every waiter registered against OUR slot, regardless of
        // whether the cache entry is still ours. Waiters then read
        // `slot.result` directly for the outcome.
        self.slot.notify.notify_waiters();
    }
}
218
219impl PaymentVerifier {
220    /// Create a new payment verifier.
221    #[must_use]
222    pub fn new(config: PaymentVerifierConfig) -> Self {
223        const _: () = assert!(
224            DEFAULT_POOL_CACHE_CAPACITY > 0,
225            "pool cache capacity must be > 0"
226        );
227        let cache = VerifiedCache::with_capacity(config.cache_capacity);
228        let pool_cache_size =
229            NonZeroUsize::new(DEFAULT_POOL_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
230        let pool_cache = Mutex::new(LruCache::new(pool_cache_size));
231        let closeness_pass_cache = Mutex::new(LruCache::new(pool_cache_size));
232        let inflight_closeness = Mutex::new(LruCache::new(pool_cache_size));
233
234        let cache_capacity = config.cache_capacity;
235        info!("Payment verifier initialized (cache_capacity={cache_capacity}, evm=always-on, pool_cache={DEFAULT_POOL_CACHE_CAPACITY})");
236
237        // Loud warning if a production binary was accidentally built with
238        // `test-utils`: that feature flips the closeness-check fail-open
239        // switch, disabling the pay-yourself defence when P2PNode isn't
240        // attached. Safe in tests, never intended for prod.
241        #[cfg(feature = "test-utils")]
242        crate::logging::error!(
243            "PaymentVerifier: built with `test-utils` feature — merkle closeness \
244             defence falls back to fail-open when no P2PNode is attached. This \
245             feature is for test binaries only; production nodes must be built \
246             without it."
247        );
248
249        Self {
250            cache,
251            pool_cache,
252            closeness_pass_cache,
253            inflight_closeness,
254            p2p_node: RwLock::new(None),
255            config,
256        }
257    }
258
259    /// Attach the node's [`P2PNode`] handle so merkle-payment verification can
260    /// check candidate `pub_keys` against the DHT's actual closest peers to the
261    /// pool midpoint.
262    ///
263    /// Production startup MUST call this once the `P2PNode` exists. Without
264    /// it, the closeness check fails CLOSED in release builds (rejects the
265    /// PUT with a visible error) and fails open in test builds. Idempotent:
266    /// calling twice replaces the handle.
267    pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
268        *self.p2p_node.write() = Some(node);
269        debug!("PaymentVerifier: P2PNode attached for merkle closeness checks");
270    }
271
272    /// Check if payment is required for the given `XorName`.
273    ///
274    /// This is the main entry point for payment verification:
275    /// 1. Check LRU cache (fast path)
276    /// 2. If not cached, payment is required
277    ///
278    /// # Arguments
279    ///
280    /// * `xorname` - The content-addressed name of the data
281    ///
282    /// # Returns
283    ///
284    /// * `PaymentStatus::CachedAsVerified` - Found in local cache (previously paid)
285    /// * `PaymentStatus::PaymentRequired` - Not cached (payment required)
286    pub fn check_payment_required(&self, xorname: &XorName) -> PaymentStatus {
287        // Check LRU cache (fast path)
288        if self.cache.contains(xorname) {
289            if crate::logging::enabled!(crate::logging::Level::DEBUG) {
290                debug!("Data {} found in verified cache", hex::encode(xorname));
291            }
292            return PaymentStatus::CachedAsVerified;
293        }
294
295        // Not in cache - payment required
296        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
297            debug!(
298                "Data {} not in cache - payment required",
299                hex::encode(xorname)
300            );
301        }
302        PaymentStatus::PaymentRequired
303    }
304
305    /// Verify that a PUT request has valid payment.
306    ///
307    /// This is the complete payment verification flow:
308    /// 1. Check if data is in cache (previously paid)
309    /// 2. If not, verify the provided payment proof
310    ///
311    /// # Arguments
312    ///
313    /// * `xorname` - The content-addressed name of the data
314    /// * `payment_proof` - Optional payment proof (required if not in cache)
315    ///
316    /// # Returns
317    ///
318    /// * `Ok(PaymentStatus)` - Verification succeeded
319    /// * `Err(Error::Payment)` - No payment and not cached, or payment invalid
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if payment is required but not provided, or if payment is invalid.
324    pub async fn verify_payment(
325        &self,
326        xorname: &XorName,
327        payment_proof: Option<&[u8]>,
328    ) -> Result<PaymentStatus> {
329        // First check if payment is required
330        let status = self.check_payment_required(xorname);
331
332        match status {
333            PaymentStatus::CachedAsVerified => {
334                // No payment needed - already in cache
335                Ok(status)
336            }
337            PaymentStatus::PaymentRequired => {
338                // EVM verification is always on — verify the proof
339                if let Some(proof) = payment_proof {
340                    let proof_len = proof.len();
341                    if proof_len < MIN_PAYMENT_PROOF_SIZE_BYTES {
342                        return Err(Error::Payment(format!(
343                            "Payment proof too small: {proof_len} bytes (min {MIN_PAYMENT_PROOF_SIZE_BYTES})"
344                        )));
345                    }
346                    if proof_len > MAX_PAYMENT_PROOF_SIZE_BYTES {
347                        return Err(Error::Payment(format!(
348                            "Payment proof too large: {proof_len} bytes (max {MAX_PAYMENT_PROOF_SIZE_BYTES} bytes)"
349                        )));
350                    }
351
352                    // Detect proof type from version tag byte
353                    match detect_proof_type(proof) {
354                        Some(ProofType::Merkle) => {
355                            self.verify_merkle_payment(xorname, proof).await?;
356                        }
357                        Some(ProofType::SingleNode) => {
358                            let (payment, tx_hashes) = deserialize_proof(proof).map_err(|e| {
359                                Error::Payment(format!("Failed to deserialize payment proof: {e}"))
360                            })?;
361
362                            if !tx_hashes.is_empty() {
363                                debug!("Proof includes {} transaction hash(es)", tx_hashes.len());
364                            }
365
366                            self.verify_evm_payment(xorname, &payment).await?;
367                        }
368                        None => {
369                            let tag = proof.first().copied().unwrap_or(0);
370                            return Err(Error::Payment(format!(
371                                "Unknown payment proof type tag: 0x{tag:02x}"
372                            )));
373                        }
374                        // ant-protocol marks `ProofType` as `#[non_exhaustive]`.
375                        // A future proof variant that this node does not yet
376                        // understand must be rejected, not silently accepted.
377                        Some(_) => {
378                            let tag = proof.first().copied().unwrap_or(0);
379                            return Err(Error::Payment(format!(
380                                "Unsupported payment proof type tag: 0x{tag:02x} (this node's protocol version does not handle it — upgrade ant-node)"
381                            )));
382                        }
383                    }
384
385                    // Cache the verified xorname
386                    self.cache.insert(*xorname);
387
388                    Ok(PaymentStatus::PaymentVerified)
389                } else {
390                    // No payment provided in production mode
391                    let xorname_hex = hex::encode(xorname);
392                    Err(Error::Payment(format!(
393                        "Payment required for new data {xorname_hex}"
394                    )))
395                }
396            }
397            PaymentStatus::PaymentVerified => Err(Error::Payment(
398                "Unexpected PaymentVerified status from check_payment_required".to_string(),
399            )),
400        }
401    }
402
    /// Get cache statistics.
    ///
    /// Delegates to the underlying `VerifiedCache`.
    #[must_use]
    pub fn cache_stats(&self) -> CacheStats {
        self.cache.stats()
    }
408
    /// Get the number of cached entries.
    ///
    /// Counts only the verified-`XorName` cache, not the pool-level caches.
    #[must_use]
    pub fn cache_len(&self) -> usize {
        self.cache.len()
    }
414
    /// Pre-populate the payment cache for a given address.
    ///
    /// This marks the address as already paid, so subsequent `verify_payment`
    /// calls will return `CachedAsVerified` without on-chain verification.
    /// Useful for test setups where real EVM payment is not needed.
    /// Compiled only for tests / the `test-utils` feature.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn cache_insert(&self, xorname: XorName) {
        self.cache.insert(xorname);
    }
424
425    /// Pre-populate the merkle pool cache. Testing helper that lets e2e tests
426    /// bypass the on-chain `completedMerklePayments` lookup when the point of
427    /// the test is to exercise merkle-verification logic BEFORE the on-chain
428    /// call (e.g. the pay-yourself closeness check).
429    #[cfg(any(test, feature = "test-utils"))]
430    pub fn pool_cache_insert(&self, pool_hash: PoolHash, info: OnChainPaymentInfo) {
431        let mut cache = self.pool_cache.lock();
432        cache.put(pool_hash, info);
433    }
434
    /// Verify a single-node EVM payment proof.
    ///
    /// Verification steps:
    /// 1. Exactly `CLOSE_GROUP_SIZE` quotes are present
    /// 2. All quotes target the correct content address (xorname binding)
    /// 3. Quote timestamps are fresh (not expired or future-dated)
    /// 4. Peer ID bindings match the ML-DSA-65 public keys
    /// 5. This node is among the quoted recipients
    /// 6. All ML-DSA-65 signatures are valid (offloaded to `spawn_blocking`)
    /// 7. The median-priced quote was paid at least 3x its price on-chain
    ///    (looked up via `completedPayments(quoteHash)` on the payment vault)
    ///
    /// For unit tests that don't need on-chain verification, pre-populate
    /// the cache so `verify_payment` returns `CachedAsVerified` before
    /// reaching this method.
    async fn verify_evm_payment(&self, xorname: &XorName, payment: &ProofOfPayment) -> Result<()> {
        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
            let xorname_hex = hex::encode(xorname);
            let quote_count = payment.peer_quotes.len();
            debug!("Verifying EVM payment for {xorname_hex} with {quote_count} quotes");
        }

        // Cheap structural/content checks first; signature and on-chain
        // checks (the expensive steps) only run if all of these pass.
        Self::validate_quote_structure(payment)?;
        Self::validate_quote_content(payment, xorname)?;
        Self::validate_quote_timestamps(payment)?;
        Self::validate_peer_bindings(payment)?;
        self.validate_local_recipient(payment)?;

        // Verify quote signatures (CPU-bound, run off async runtime).
        // The quotes are cloned because spawn_blocking needs 'static data.
        let peer_quotes = payment.peer_quotes.clone();
        tokio::task::spawn_blocking(move || {
            for (encoded_peer_id, quote) in &peer_quotes {
                if !verify_quote_signature(quote) {
                    return Err(Error::Payment(
                        format!("Quote ML-DSA-65 signature verification failed for peer {encoded_peer_id:?}"),
                    ));
                }
            }
            Ok(())
        })
        .await
        .map_err(|e| Error::Payment(format!("Signature verification task failed: {e}")))??;

        // Reconstruct the SingleNodePayment to identify the median quote.
        // from_quotes() sorts by price and marks the median for 3x payment.
        let quotes_with_prices: Vec<_> = payment
            .peer_quotes
            .iter()
            .map(|(_, quote)| (quote.clone(), quote.price))
            .collect();
        let single_payment = SingleNodePayment::from_quotes(quotes_with_prices).map_err(|e| {
            Error::Payment(format!(
                "Failed to reconstruct payment for verification: {e}"
            ))
        })?;

        // Verify the median quote was paid at least 3x its price on-chain
        // via completedPayments(quoteHash) on the payment vault contract.
        let verified_amount = single_payment
            .verify(&self.config.evm.network)
            .await
            .map_err(|e| {
                let xorname_hex = hex::encode(xorname);
                Error::Payment(format!(
                    "Median quote payment verification failed for {xorname_hex}: {e}"
                ))
            })?;

        if crate::logging::enabled!(crate::logging::Level::INFO) {
            let xorname_hex = hex::encode(xorname);
            info!("EVM payment verified for {xorname_hex} (median paid {verified_amount} atto)");
        }
        Ok(())
    }
509
510    /// Validate quote count, uniqueness, and basic structure.
511    fn validate_quote_structure(payment: &ProofOfPayment) -> Result<()> {
512        if payment.peer_quotes.is_empty() {
513            return Err(Error::Payment("Payment has no quotes".to_string()));
514        }
515
516        let quote_count = payment.peer_quotes.len();
517        if quote_count != CLOSE_GROUP_SIZE {
518            return Err(Error::Payment(format!(
519                "Payment must have exactly {CLOSE_GROUP_SIZE} quotes, got {quote_count}"
520            )));
521        }
522
523        let mut seen: Vec<&evmlib::EncodedPeerId> = Vec::with_capacity(quote_count);
524        for (encoded_peer_id, _) in &payment.peer_quotes {
525            if seen.contains(&encoded_peer_id) {
526                return Err(Error::Payment(format!(
527                    "Duplicate peer ID in payment quotes: {encoded_peer_id:?}"
528                )));
529            }
530            seen.push(encoded_peer_id);
531        }
532
533        Ok(())
534    }
535
536    /// Verify all quotes target the correct content address.
537    fn validate_quote_content(payment: &ProofOfPayment, xorname: &XorName) -> Result<()> {
538        for (encoded_peer_id, quote) in &payment.peer_quotes {
539            if !verify_quote_content(quote, xorname) {
540                let expected_hex = hex::encode(xorname);
541                let actual_hex = hex::encode(quote.content.0);
542                return Err(Error::Payment(format!(
543                    "Quote content address mismatch for peer {encoded_peer_id:?}: expected {expected_hex}, got {actual_hex}"
544                )));
545            }
546        }
547        Ok(())
548    }
549
550    /// Verify quote freshness — reject stale or excessively future quotes.
551    fn validate_quote_timestamps(payment: &ProofOfPayment) -> Result<()> {
552        let now = SystemTime::now();
553        for (encoded_peer_id, quote) in &payment.peer_quotes {
554            match now.duration_since(quote.timestamp) {
555                Ok(age) => {
556                    if age.as_secs() > QUOTE_MAX_AGE_SECS {
557                        return Err(Error::Payment(format!(
558                            "Quote from peer {encoded_peer_id:?} expired: age {}s exceeds max {QUOTE_MAX_AGE_SECS}s",
559                            age.as_secs()
560                        )));
561                    }
562                }
563                Err(_) => {
564                    if let Ok(skew) = quote.timestamp.duration_since(now) {
565                        if skew.as_secs() > QUOTE_CLOCK_SKEW_TOLERANCE_SECS {
566                            return Err(Error::Payment(format!(
567                                "Quote from peer {encoded_peer_id:?} has timestamp {}s in the future \
568                                 (exceeds {QUOTE_CLOCK_SKEW_TOLERANCE_SECS}s tolerance)",
569                                skew.as_secs()
570                            )));
571                        }
572                    } else {
573                        return Err(Error::Payment(format!(
574                            "Quote from peer {encoded_peer_id:?} has invalid timestamp"
575                        )));
576                    }
577                }
578            }
579        }
580        Ok(())
581    }
582
583    /// Verify each quote's `pub_key` matches the claimed peer ID via BLAKE3.
584    fn validate_peer_bindings(payment: &ProofOfPayment) -> Result<()> {
585        for (encoded_peer_id, quote) in &payment.peer_quotes {
586            let expected_peer_id = peer_id_from_public_key_bytes(&quote.pub_key)
587                .map_err(|e| Error::Payment(format!("Invalid ML-DSA public key in quote: {e}")))?;
588
589            if expected_peer_id.as_bytes() != encoded_peer_id.as_bytes() {
590                let expected_hex = expected_peer_id.to_hex();
591                let actual_hex = hex::encode(encoded_peer_id.as_bytes());
592                return Err(Error::Payment(format!(
593                    "Quote pub_key does not belong to claimed peer {encoded_peer_id:?}: \
594                     BLAKE3(pub_key) = {expected_hex}, peer_id = {actual_hex}"
595                )));
596            }
597        }
598        Ok(())
599    }
600
    /// Minimum number of candidate `pub_keys` (out of 16) whose derived `PeerId`
    /// must match the DHT's actual closest peers to the pool midpoint address.
    ///
    /// Set below 16/16 to absorb normal routing-table skew between the
    /// payer's view and this node's view — on a well-connected network the
    /// divergence between two nodes' closest-set views is typically 1-2
    /// peers, occasionally 3 during churn. 13/16 tolerates 3 divergent
    /// peers while still limiting how many candidates an attacker can
    /// fabricate before the check bites. A lower threshold (e.g. 9/16)
    /// would let an attacker who controls 7 real neighbourhood peers plant
    /// 7 fabricated candidates and still pass.
    ///
    /// This is the pure "fabricated key" defence; it does not stop an
    /// attacker who can grind the pool midpoint address to land near 13
    /// pre-chosen keys AND run those keys as Sybil DHT participants. That
    /// requires an orthogonal Sybil-resistance layer and is out of scope
    /// for this check.
    const CANDIDATE_CLOSENESS_REQUIRED: usize = 13;

    /// Timeout for the authoritative network lookup used by the closeness
    /// check (see `verify_merkle_candidate_closeness`).
    ///
    /// Iterative Kademlia lookups can cascade through up to 20 iterations,
    /// and a single unresponsive peer's dial can take 20-30s before timing
    /// out. 60s leaves room for the lookup to converge even under churn
    /// while still capping `DoS` amplification at roughly one bounded lookup
    /// per forged `pool_hash`.
    const CLOSENESS_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);

    /// Maximum waiter → leader retries when the leader's future was cancelled
    /// or panicked before publishing a result. Beyond this the waiter returns
    /// a visible error rather than spinning indefinitely through a
    /// cancellation cascade.
    const MAX_LEADER_RETRIES: usize = 4;
635
636    /// Verify that the candidate pool's `pub_keys` correspond to peers that
637    /// are actually XOR-closest to the pool midpoint address, by querying
638    /// the DHT for its closest peers to that address and requiring that a
639    /// majority of the candidates match.
640    ///
641    /// **What this blocks**: the "pay yourself" attack. Candidate signatures
642    /// only cover `(price, reward_address, timestamp)` and the `pub_key` bytes —
643    /// nothing ties a candidate to a network-registered identity or to the
644    /// pool neighbourhood. Without this check an attacker can generate 16
645    /// ML-DSA keypairs locally, point all 16 `reward_address` fields at a
646    /// single attacker-controlled wallet, submit the merkle payment, and drain
647    /// their own payment back out.
648    ///
649    /// **How it blocks**: each candidate's `PeerId = BLAKE3(pub_key)`; the DHT
650    /// is the authoritative source of "which peers exist at this XOR
651    /// coordinate". If the attacker's 16 fabricated `PeerId`s are not among
652    /// the peers the network actually lists as closest to the pool address,
653    /// the pool is forged.
654    ///
655    /// **Scope**: a `MerklePaymentProof` carries exactly one `winner_pool`
656    /// (the pool the smart contract selected for the batch). Every storing
657    /// node that receives the proof independently re-runs this check against
658    /// that same pool, so a forged pool is rejected at every node it
659    /// reaches.
660    ///
661    /// **Known limitation — Sybil-grinding**: `midpoint_proof.address()` is a
662    /// BLAKE3 hash of attacker-controllable inputs (leaf bytes, tree root,
663    /// timestamp). A determined attacker who *also* runs Sybil DHT nodes can
664    /// grind the midpoint until it lands in a region where 13 of their
665    /// Sybil keys are the true network-closest — at which point this check
666    /// passes for the attacker. Closing that gap requires binding the
667    /// midpoint to an attacker-uncontrolled value (e.g. a block hash at
668    /// payment time or an on-chain VRF) or a Sybil-resistant identity
669    /// layer. This defence raises the attack cost from "free" to "run N
670    /// Sybil nodes AND grind", which is a meaningful but not complete
671    /// improvement.
    async fn verify_merkle_candidate_closeness(
        &self,
        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
        pool_hash: PoolHash,
    ) -> Result<()> {
        // Fast path: this node already verified this pool successfully.
        // A batch of 256 chunks shares one winner_pool, so without this cache
        // we'd pay a Kademlia lookup per chunk.
        if self.closeness_pass_cache.lock().get(&pool_hash).is_some() {
            return Ok(());
        }

        // Single-flight: on each attempt, either claim leadership by
        // inserting a fresh `ClosenessSlot`, or wait on an existing leader
        // and read its published result. The leader holds an `Arc` to the
        // slot independent of the LruCache so waiters are still woken if
        // eviction pressure kicked the cache entry.
        //
        // The `notified_owned()` future snapshots the `notify_waiters`
        // counter at the moment of construction (while we hold the lock),
        // which makes the subsequent `.await` race-free: if the leader
        // calls `notify_waiters` between our construction and our poll, the
        // counter has advanced and the future resolves immediately on first
        // poll.
        //
        // Bounded retry: if we're a waiter and the leader gets cancelled or
        // panics (slot.result.get() == None after wake-up), we loop back to
        // claim leadership. `MAX_LEADER_RETRIES` bounds the attempts so
        // adversarial cancellation cascades cannot spin this indefinitely.
        for attempt in 0..=Self::MAX_LEADER_RETRIES {
            // Release the mutex guard explicitly before any await below.
            // Clippy wants `if let ... else` written as `map_or_else`, but
            // any such rewrite re-borrows the locked `inflight` inside the
            // closure and fails the borrow checker — so the lint is
            // silenced here.
            #[allow(clippy::option_if_let_else)]
            let (waiter_slot, leader_slot) = {
                let mut inflight = self.inflight_closeness.lock();
                let chosen = if let Some(existing) = inflight.get(&pool_hash) {
                    (Some(Arc::clone(existing)), None)
                } else {
                    let slot = Arc::new(ClosenessSlot::new());
                    inflight.put(pool_hash, Arc::clone(&slot));
                    (None, Some(slot))
                };
                drop(inflight);
                chosen
            };

            if let Some(slot) = waiter_slot {
                // Build the owned-notified future BEFORE awaiting, so it
                // snapshots the `notify_waiters` counter now. The slot
                // already existed when we locked, so the leader is either
                // running or finished; in both cases the snapshot + counter
                // check ensures we wake up correctly.
                let notified = slot.notified_owned();
                notified.await;

                // Leader published a result — use it directly.
                if let Some(result) = slot.result.get() {
                    return result.clone().map_err(Error::Payment);
                }
                // Leader disappeared without publishing (panic or
                // cancellation). Slot was cleared by the leader's drop
                // guard; loop to become the new leader — unless we've
                // hit the retry bound (see MAX_LEADER_RETRIES).
                if attempt == Self::MAX_LEADER_RETRIES {
                    return Err(Error::Payment(
                        "Merkle candidate pool rejected: closeness leader \
                         repeatedly failed to publish a result (likely \
                         repeated cancellation or panic)."
                            .into(),
                    ));
                }
                continue;
            }

            // Leader path. Drop guard clears the slot and wakes waiters on
            // every exit (success, failure, panic, cancellation).
            let Some(slot) = leader_slot else {
                // Unreachable by construction.
                return Err(Error::Payment(
                    "internal error: neither leader nor waiter in closeness check".into(),
                ));
            };
            let guard = InflightGuard {
                slot_cache: &self.inflight_closeness,
                pool_hash,
                slot,
            };

            let result = self.verify_merkle_candidate_closeness_inner(pool).await;
            // Publish the outcome to all waiters; the guard also clears the
            // in-flight slot so later callers can claim fresh leadership.
            guard.publish(&result);
            // Only successes enter the pass cache: failures can be transient
            // (lookup timeout, sparse routing table) and must be re-verified
            // rather than negatively cached.
            if result.is_ok() {
                self.closeness_pass_cache.lock().put(pool_hash, ());
            }
            return result;
        }
        // Unreachable: the for-loop body always either `return`s or `continue`s,
        // and the waiter branch's `continue` only runs when `attempt <
        // Self::MAX_LEADER_RETRIES`. The last iteration's waiter branch returns
        // via the retry-bound check; the leader branch always returns.
        Err(Error::Payment(
            "internal error: closeness retry loop exited without returning".into(),
        ))
    }
778
779    /// Inner closeness check: the actual DHT lookup + set-membership test.
780    /// Wrapped by [`verify_merkle_candidate_closeness`] with a pass-cache and
781    /// single-flight guard so a batch of chunks and a storm of forged PUTs
782    /// don't multiply the lookup cost.
783    /// Derive each candidate's `PeerId` from its `pub_key` and reject the
784    /// pool if any `PeerId` appears more than once.
785    ///
786    /// This is a pure-validation pre-check, runnable without a `P2PNode`:
787    /// catches the case where one real peer's `pub_key` is repeated to
788    /// inflate the closeness match count, without paying for a Kademlia
789    /// lookup. An honest pool has [`evmlib::merkle_payments::CANDIDATES_PER_POOL`]
790    /// distinct candidate `pub_keys` by construction.
791    fn derive_distinct_candidate_peer_ids(
792        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
793    ) -> Result<Vec<PeerId>> {
794        let mut candidate_peer_ids = Vec::with_capacity(pool.candidate_nodes.len());
795        let mut seen = std::collections::HashSet::with_capacity(pool.candidate_nodes.len());
796        for candidate in &pool.candidate_nodes {
797            let pid = peer_id_from_public_key_bytes(&candidate.pub_key).map_err(|e| {
798                Error::Payment(format!(
799                    "Invalid ML-DSA public key in merkle candidate: {e}"
800                ))
801            })?;
802            if !seen.insert(pid) {
803                return Err(Error::Payment(
804                    "Merkle candidate pool rejected: duplicate candidate PeerId. An \
805                     honest pool has 16 distinct candidate pub_keys; duplicates would \
806                     let a single real peer satisfy the closeness threshold by being \
807                     counted multiple times."
808                        .into(),
809                ));
810            }
811            candidate_peer_ids.push(pid);
812        }
813        Ok(candidate_peer_ids)
814    }
815
    /// Inner closeness check: the actual DHT lookup + set-membership test.
    ///
    /// Wrapped by [`verify_merkle_candidate_closeness`] with a pass-cache
    /// and single-flight guard so a batch of chunks and a storm of forged
    /// PUTs don't multiply the lookup cost.
    ///
    /// Looks up the network's closest peers to the pool midpoint address
    /// and requires at least `CANDIDATE_CLOSENESS_REQUIRED` of the pool's
    /// (deduplicated) candidate `PeerId`s to appear in that list.
    #[allow(clippy::too_many_lines)]
    async fn verify_merkle_candidate_closeness_inner(
        &self,
        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
    ) -> Result<()> {
        // Pre-check: catch malformed/hostile pools (duplicate candidate
        // PeerIds) before paying for the Kademlia lookup. Runs in unit
        // tests without a P2PNode too.
        let candidate_peer_ids = Self::derive_distinct_candidate_peer_ids(pool)?;

        // Release the RwLock guard before any await to avoid holding it
        // across an iterative Kademlia lookup.
        let attached = self.p2p_node.read().as_ref().map(Arc::clone);
        let Some(p2p_node) = attached else {
            // Production must call attach_p2p_node at startup. Fail CLOSED
            // to avoid silently disabling the defence if a startup path
            // regresses and loses the attach call. Unit-test builds that
            // construct a PaymentVerifier directly without exercising merkle
            // verification are opted-in via `test-utils` to fall back to
            // fail-open.
            #[cfg(any(test, feature = "test-utils"))]
            {
                crate::logging::warn!(
                    "PaymentVerifier: no P2PNode attached; merkle pay-yourself \
                     defence SKIPPED (test build). Production startup MUST call \
                     PaymentVerifier::attach_p2p_node."
                );
                return Ok(());
            }
            #[cfg(not(any(test, feature = "test-utils")))]
            {
                crate::logging::error!(
                    "PaymentVerifier: no P2PNode attached; rejecting merkle \
                     payment. This is a node-startup bug — \
                     PaymentVerifier::attach_p2p_node must be called before \
                     any PUT handler runs."
                );
                return Err(Error::Payment(
                    "Merkle candidate pool rejected: verifier is not wired to \
                     the P2P layer; cannot verify candidate closeness."
                        .into(),
                ));
            }
        };

        // Ask the network for the `lookup_count` peers closest to the pool
        // midpoint; this is the authoritative view candidates are checked
        // against. The lookup is bounded by CLOSENESS_LOOKUP_TIMEOUT.
        let pool_address = pool.midpoint_proof.address();
        let lookup_count = pool.candidate_nodes.len();
        let network_lookup = p2p_node
            .dht_manager()
            .find_closest_nodes_network(&pool_address.0, lookup_count);
        let network_peers =
            match tokio::time::timeout(Self::CLOSENESS_LOOKUP_TIMEOUT, network_lookup).await {
                Ok(Ok(peers)) => peers,
                Ok(Err(e)) => {
                    debug!(
                        "Merkle closeness network-lookup failed for pool midpoint {}: {e}",
                        hex::encode(pool_address.0),
                    );
                    return Err(Error::Payment(
                        "Merkle candidate pool rejected: could not verify candidate \
                     closeness against the authoritative network view."
                            .into(),
                    ));
                }
                Err(_) => {
                    debug!(
                        "Merkle closeness network-lookup timeout ({:?}) for pool midpoint {}",
                        Self::CLOSENESS_LOOKUP_TIMEOUT,
                        hex::encode(pool_address.0),
                    );
                    return Err(Error::Payment(
                        "Merkle candidate pool rejected: authoritative network lookup \
                     timed out. Retry once the network lookup completes."
                            .into(),
                    ));
                }
            };

        // Sparse-network short-circuit: if the DHT itself returned fewer
        // peers than the closeness threshold, the proof can never pass —
        // not because the candidates are forged, but because we don't
        // have an authoritative view to compare against. Surface this
        // distinct cause so operators can tell "retry once the network
        // settles" apart from "this peer sent a forged pool".
        if network_peers.len() < Self::CANDIDATE_CLOSENESS_REQUIRED {
            debug!(
                "Merkle closeness deferred: network lookup returned {} peers \
                 for pool midpoint {} (need at least {} to verify)",
                network_peers.len(),
                hex::encode(pool_address.0),
                Self::CANDIDATE_CLOSENESS_REQUIRED,
            );
            return Err(Error::Payment(format!(
                "Merkle candidate pool rejected: authoritative DHT lookup returned \
                 only {} peers, less than the {} required to verify candidate \
                 closeness. Retry once the routing table populates further.",
                network_peers.len(),
                Self::CANDIDATE_CLOSENESS_REQUIRED,
            )));
        }

        // Set-membership check against the returned closest-peers list.
        // Candidate `PeerId`s are deduplicated upstream, so each match
        // corresponds to a distinct peer.
        let network_set: std::collections::HashSet<PeerId> =
            network_peers.iter().map(|n| n.peer_id).collect();
        let matched = candidate_peer_ids
            .iter()
            .filter(|pid| network_set.contains(pid))
            .count();

        if matched < Self::CANDIDATE_CLOSENESS_REQUIRED {
            debug!(
                "Merkle closeness rejected: {matched}/{} candidates match the DHT's closest peers \
                 for pool midpoint {} (required: {}, network returned {} peers)",
                pool.candidate_nodes.len(),
                hex::encode(pool_address.0),
                Self::CANDIDATE_CLOSENESS_REQUIRED,
                network_peers.len(),
            );
            return Err(Error::Payment(
                "Merkle candidate pool rejected: candidate pub_keys do not match the \
                 network's closest peers to the pool midpoint address. Pools must be \
                 collected from the pool-address close group, not fabricated off-network."
                    .into(),
            ));
        }

        debug!(
            "Merkle closeness passed: {matched}/{} candidates matched the DHT's closest peers \
             for pool midpoint {}",
            pool.candidate_nodes.len(),
            hex::encode(pool_address.0),
        );
        Ok(())
    }
952
    /// Verify a merkle batch payment proof.
    ///
    /// This verification flow:
    /// 1. Deserialize the `MerklePaymentProof` and check it is bound to `xorname`
    /// 2. Run cheap local checks (candidate signatures, candidate closeness)
    ///    BEFORE any on-chain query
    /// 3. Check pool cache for previously verified pool hash
    /// 4. If not cached, query on-chain for payment info
    /// 5. Validate the proof against on-chain data
    /// 6. Cache the pool hash for subsequent chunk verifications in the same batch
    ///
    /// # Errors
    /// Returns [`Error::Payment`] when the proof fails to deserialize, is
    /// bound to a different address, carries an invalid candidate signature
    /// or a forged candidate pool, or does not match the on-chain payment
    /// record (depth, timestamp, recipients, amounts).
    #[allow(clippy::too_many_lines)]
    async fn verify_merkle_payment(&self, xorname: &XorName, proof_bytes: &[u8]) -> Result<()> {
        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
            debug!("Verifying merkle payment for {}", hex::encode(xorname));
        }

        // Deserialize the merkle proof
        let merkle_proof = deserialize_merkle_proof(proof_bytes)
            .map_err(|e| Error::Payment(format!("Failed to deserialize merkle proof: {e}")))?;

        // Verify the address in the proof matches the xorname being stored
        if merkle_proof.address.0 != *xorname {
            let proof_hex = hex::encode(merkle_proof.address.0);
            let store_hex = hex::encode(xorname);
            return Err(Error::Payment(format!(
                "Merkle proof address mismatch: proof is for {proof_hex}, but storing {store_hex}"
            )));
        }

        // Pool hash keys the pass cache, the single-flight slot, and the
        // on-chain lookup below.
        let pool_hash = merkle_proof.winner_pool_hash();

        // Run cheap local checks BEFORE expensive on-chain queries.
        // This prevents DoS via garbage proofs that trigger RPC lookups.
        for candidate in &merkle_proof.winner_pool.candidate_nodes {
            if !crate::payment::verify_merkle_candidate_signature(candidate) {
                return Err(Error::Payment(format!(
                    "Invalid ML-DSA-65 signature on merkle candidate node (reward: {})",
                    candidate.reward_address
                )));
            }
        }

        // Pay-yourself defence: the candidate pub_keys must map to peers the
        // live DHT actually considers closest to the pool midpoint. Without
        // this, an attacker can point all 16 reward_address fields at a
        // self-owned wallet and drain their own payment. Every storing node
        // runs this check against the single `winner_pool` in the proof, so a
        // forged pool is rejected everywhere it lands. The pass cache and
        // single-flight keyed on pool_hash collapse the Kademlia lookup cost
        // within a batch and across concurrent PUTs for the same pool.
        self.verify_merkle_candidate_closeness(&merkle_proof.winner_pool, pool_hash)
            .await?;

        // Check pool cache first
        let cached_info = {
            let mut pool_cache = self.pool_cache.lock();
            pool_cache.get(&pool_hash).cloned()
        };

        let payment_info = if let Some(info) = cached_info {
            debug!("Pool cache hit for hash {}", hex::encode(pool_hash));
            info
        } else {
            // Query on-chain for completed merkle payment
            let info =
                payment_vault::get_completed_merkle_payment(&self.config.evm.network, pool_hash)
                    .await
                    .map_err(|e| {
                        let pool_hex = hex::encode(pool_hash);
                        Error::Payment(format!(
                            "Failed to query merkle payment info for pool {pool_hex}: {e}"
                        ))
                    })?;

            // Normalize the contract's (rewardsAddress, poolIndex, amount)
            // triples into native tuples for the checks below.
            let paid_node_addresses: Vec<_> = info
                .paidNodeAddresses
                .iter()
                .map(|pna| (pna.rewardsAddress, usize::from(pna.poolIndex), pna.amount))
                .collect();

            let on_chain_info = OnChainPaymentInfo {
                depth: info.depth,
                merkle_payment_timestamp: info.merklePaymentTimestamp,
                paid_node_addresses,
            };

            // Cache the pool info for subsequent chunks in the same batch
            {
                let mut pool_cache = self.pool_cache.lock();
                pool_cache.put(pool_hash, on_chain_info.clone());
            }

            debug!(
                "Queried on-chain merkle payment info for pool {}: depth={}, timestamp={}, paid_nodes={}",
                hex::encode(pool_hash),
                on_chain_info.depth,
                on_chain_info.merkle_payment_timestamp,
                on_chain_info.paid_node_addresses.len()
            );

            on_chain_info
        };

        // Verify timestamp consistency (signatures already checked above before RPC).
        for candidate in &merkle_proof.winner_pool.candidate_nodes {
            if candidate.merkle_payment_timestamp != payment_info.merkle_payment_timestamp {
                return Err(Error::Payment(format!(
                    "Candidate timestamp mismatch: expected {}, got {} (reward: {})",
                    payment_info.merkle_payment_timestamp,
                    candidate.merkle_payment_timestamp,
                    candidate.reward_address
                )));
            }
        }

        // Get the root from the winner pool's midpoint proof
        let smart_contract_root = merkle_proof.winner_pool.midpoint_proof.root();

        // Verify the cryptographic merkle proofs (address belongs to tree,
        // midpoint belongs to tree, roots match, timestamps valid).
        evmlib::merkle_payments::verify_merkle_proof(
            &merkle_proof.address,
            &merkle_proof.data_proof,
            &merkle_proof.winner_pool.midpoint_proof,
            payment_info.depth,
            smart_contract_root,
            payment_info.merkle_payment_timestamp,
        )
        .map_err(|e| {
            let xorname_hex = hex::encode(xorname);
            Error::Payment(format!(
                "Merkle proof verification failed for {xorname_hex}: {e}"
            ))
        })?;

        // Verify paid node count matches depth
        // NOTE(review): widening `as` cast — `depth` also feeds `u32::from`
        // below, so it is a narrow unsigned type and cannot truncate here;
        // confirm against OnChainPaymentInfo's field type.
        let expected_depth = payment_info.depth as usize;
        let actual_paid = payment_info.paid_node_addresses.len();
        if actual_paid != expected_depth {
            return Err(Error::Payment(format!(
                "Wrong number of paid nodes: expected {expected_depth}, got {actual_paid}"
            )));
        }

        // Compute expected per-node payment using the contract formula:
        // totalAmount = median16(candidate_prices) * (1 << depth)
        // amountPerNode = totalAmount / depth
        let expected_per_node = if payment_info.depth > 0 {
            let mut candidate_prices: Vec<Amount> = merkle_proof
                .winner_pool
                .candidate_nodes
                .iter()
                .map(|c| c.price)
                .collect();
            // Sort ascending, then take the upper median (index 8 of 16) —
            // matches Solidity's median16 (k = 8).
            candidate_prices.sort_unstable();
            let median_price = *candidate_prices
                .get(candidate_prices.len() / 2)
                .ok_or_else(|| Error::Payment("empty candidate pool in merkle proof".into()))?;
            let shift = u32::from(payment_info.depth);
            let multiplier = 1u64
                .checked_shl(shift)
                .ok_or_else(|| Error::Payment("merkle proof depth too large".into()))?;
            let total_amount = median_price * Amount::from(multiplier);
            total_amount / Amount::from(u64::from(payment_info.depth))
        } else {
            Amount::ZERO
        };

        // Verify paid node indices, addresses, and amounts against the candidate pool.
        //
        // Each paid node must:
        // 1. Have a valid index within the candidate pool
        // 2. Match the expected reward address at that index
        // 3. Have been paid at least the expected per-node amount from the
        //    contract formula: median16(prices) * 2^depth / depth
        //
        // Note: unlike single-node payments, merkle proofs are NOT bound to a
        // specific storing node. The contract pays `depth` random nodes from the
        // winner pool; the storing node is whichever close-group peer the client
        // routes the chunk to. There is no local-recipient check here because
        // any node that can verify the merkle proof is allowed to store the chunk.
        // Replay protection comes from the per-address proof binding (each proof
        // is for a specific XorName in the paid tree).
        for (addr, idx, paid_amount) in &payment_info.paid_node_addresses {
            let node = merkle_proof
                .winner_pool
                .candidate_nodes
                .get(*idx)
                .ok_or_else(|| {
                    Error::Payment(format!(
                        "Paid node index {idx} out of bounds for pool size {}",
                        merkle_proof.winner_pool.candidate_nodes.len()
                    ))
                })?;
            if node.reward_address != *addr {
                return Err(Error::Payment(format!(
                    "Paid node address mismatch at index {idx}: expected {addr}, got {}",
                    node.reward_address
                )));
            }
            if *paid_amount < expected_per_node {
                return Err(Error::Payment(format!(
                    "Underpayment for node at index {idx}: paid {paid_amount}, \
                     expected at least {expected_per_node} \
                     (median16 formula, depth={})",
                    payment_info.depth
                )));
            }
        }

        if crate::logging::enabled!(crate::logging::Level::INFO) {
            info!(
                "Merkle payment verified for {} (pool: {})",
                hex::encode(xorname),
                hex::encode(pool_hash)
            );
        }

        Ok(())
    }
1172
1173    /// Verify this node is among the paid recipients.
1174    fn validate_local_recipient(&self, payment: &ProofOfPayment) -> Result<()> {
1175        let local_addr = &self.config.local_rewards_address;
1176        let is_recipient = payment
1177            .peer_quotes
1178            .iter()
1179            .any(|(_, quote)| quote.rewards_address == *local_addr);
1180        if !is_recipient {
1181            return Err(Error::Payment(
1182                "Payment proof does not include this node as a recipient".to_string(),
1183            ));
1184        }
1185        Ok(())
1186    }
1187}
1188
1189#[cfg(test)]
1190#[allow(clippy::expect_used)]
1191mod tests {
1192    use super::*;
1193    use evmlib::merkle_payments::MerklePaymentCandidatePool;
1194
1195    /// Create a verifier for unit tests. EVM is always on, but tests can
1196    /// pre-populate the cache to bypass on-chain verification.
1197    fn create_test_verifier() -> PaymentVerifier {
1198        let config = PaymentVerifierConfig {
1199            evm: EvmVerifierConfig::default(),
1200            cache_capacity: 100,
1201            local_rewards_address: RewardsAddress::new([1u8; 20]),
1202        };
1203        PaymentVerifier::new(config)
1204    }
1205
1206    #[test]
1207    fn test_payment_required_for_new_data() {
1208        let verifier = create_test_verifier();
1209        let xorname = [1u8; 32];
1210
1211        // All uncached data requires payment
1212        let status = verifier.check_payment_required(&xorname);
1213        assert_eq!(status, PaymentStatus::PaymentRequired);
1214    }
1215
1216    #[test]
1217    fn test_cache_hit() {
1218        let verifier = create_test_verifier();
1219        let xorname = [1u8; 32];
1220
1221        // Manually add to cache
1222        verifier.cache.insert(xorname);
1223
1224        // Should return CachedAsVerified
1225        let status = verifier.check_payment_required(&xorname);
1226        assert_eq!(status, PaymentStatus::CachedAsVerified);
1227    }
1228
1229    #[tokio::test]
1230    async fn test_verify_payment_without_proof_rejected() {
1231        let verifier = create_test_verifier();
1232        let xorname = [1u8; 32];
1233
1234        // No proof provided => should return an error (EVM is always on)
1235        let result = verifier.verify_payment(&xorname, None).await;
1236        assert!(
1237            result.is_err(),
1238            "Expected Err without proof, got: {result:?}"
1239        );
1240    }
1241
1242    #[tokio::test]
1243    async fn test_verify_payment_cached() {
1244        let verifier = create_test_verifier();
1245        let xorname = [1u8; 32];
1246
1247        // Add to cache — simulates previously-paid data
1248        verifier.cache.insert(xorname);
1249
1250        // Should succeed without payment (cached)
1251        let result = verifier.verify_payment(&xorname, None).await;
1252        assert!(result.is_ok());
1253        assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified);
1254    }
1255
1256    #[test]
1257    fn test_payment_status_can_store() {
1258        assert!(PaymentStatus::CachedAsVerified.can_store());
1259        assert!(PaymentStatus::PaymentVerified.can_store());
1260        assert!(!PaymentStatus::PaymentRequired.can_store());
1261    }
1262
1263    #[test]
1264    fn test_payment_status_is_cached() {
1265        assert!(PaymentStatus::CachedAsVerified.is_cached());
1266        assert!(!PaymentStatus::PaymentVerified.is_cached());
1267        assert!(!PaymentStatus::PaymentRequired.is_cached());
1268    }
1269
1270    #[tokio::test]
1271    async fn test_cache_preload_bypasses_evm() {
1272        let verifier = create_test_verifier();
1273        let xorname = [42u8; 32];
1274
1275        // Not yet cached — should require payment
1276        assert_eq!(
1277            verifier.check_payment_required(&xorname),
1278            PaymentStatus::PaymentRequired
1279        );
1280
1281        // Pre-populate cache (simulates a previous successful payment)
1282        verifier.cache.insert(xorname);
1283
1284        // Now the xorname should be cached
1285        assert_eq!(
1286            verifier.check_payment_required(&xorname),
1287            PaymentStatus::CachedAsVerified
1288        );
1289    }
1290
1291    #[tokio::test]
1292    async fn test_proof_too_small() {
1293        let verifier = create_test_verifier();
1294        let xorname = [1u8; 32];
1295
1296        // Proof smaller than MIN_PAYMENT_PROOF_SIZE_BYTES
1297        let small_proof = vec![0u8; MIN_PAYMENT_PROOF_SIZE_BYTES - 1];
1298        let result = verifier.verify_payment(&xorname, Some(&small_proof)).await;
1299        assert!(result.is_err());
1300        let err_msg = format!("{}", result.expect_err("should fail"));
1301        assert!(
1302            err_msg.contains("too small"),
1303            "Error should mention 'too small': {err_msg}"
1304        );
1305    }
1306
1307    #[tokio::test]
1308    async fn test_proof_too_large() {
1309        let verifier = create_test_verifier();
1310        let xorname = [2u8; 32];
1311
1312        // Proof larger than MAX_PAYMENT_PROOF_SIZE_BYTES
1313        let large_proof = vec![0u8; MAX_PAYMENT_PROOF_SIZE_BYTES + 1];
1314        let result = verifier.verify_payment(&xorname, Some(&large_proof)).await;
1315        assert!(result.is_err());
1316        let err_msg = format!("{}", result.expect_err("should fail"));
1317        assert!(
1318            err_msg.contains("too large"),
1319            "Error should mention 'too large': {err_msg}"
1320        );
1321    }
1322
1323    #[tokio::test]
1324    async fn test_proof_at_min_boundary_unknown_tag() {
1325        let verifier = create_test_verifier();
1326        let xorname = [3u8; 32];
1327
1328        // Exactly MIN_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected
1329        let boundary_proof = vec![0xFFu8; MIN_PAYMENT_PROOF_SIZE_BYTES];
1330        let result = verifier
1331            .verify_payment(&xorname, Some(&boundary_proof))
1332            .await;
1333        assert!(result.is_err());
1334        let err_msg = format!("{}", result.expect_err("should fail"));
1335        assert!(
1336            err_msg.contains("Unknown payment proof type tag"),
1337            "Error should mention unknown tag: {err_msg}"
1338        );
1339    }
1340
1341    #[tokio::test]
1342    async fn test_proof_at_max_boundary_unknown_tag() {
1343        let verifier = create_test_verifier();
1344        let xorname = [4u8; 32];
1345
1346        // Exactly MAX_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected
1347        let boundary_proof = vec![0xFFu8; MAX_PAYMENT_PROOF_SIZE_BYTES];
1348        let result = verifier
1349            .verify_payment(&xorname, Some(&boundary_proof))
1350            .await;
1351        assert!(result.is_err());
1352        let err_msg = format!("{}", result.expect_err("should fail"));
1353        assert!(
1354            err_msg.contains("Unknown payment proof type tag"),
1355            "Error should mention unknown tag: {err_msg}"
1356        );
1357    }
1358
1359    #[tokio::test]
1360    async fn test_malformed_single_node_proof() {
1361        let verifier = create_test_verifier();
1362        let xorname = [5u8; 32];
1363
1364        // Valid tag (0x01) but garbage payload — should fail deserialization
1365        let mut garbage = vec![crate::ant_protocol::PROOF_TAG_SINGLE_NODE];
1366        garbage.extend_from_slice(&[0xAB; 63]);
1367        let result = verifier.verify_payment(&xorname, Some(&garbage)).await;
1368        assert!(result.is_err());
1369        let err_msg = format!("{}", result.expect_err("should fail"));
1370        assert!(
1371            err_msg.contains("deserialize") || err_msg.contains("Failed"),
1372            "Error should mention deserialization failure: {err_msg}"
1373        );
1374    }
1375
1376    #[test]
1377    fn test_cache_len_getter() {
1378        let verifier = create_test_verifier();
1379        assert_eq!(verifier.cache_len(), 0);
1380
1381        verifier.cache.insert([10u8; 32]);
1382        assert_eq!(verifier.cache_len(), 1);
1383
1384        verifier.cache.insert([20u8; 32]);
1385        assert_eq!(verifier.cache_len(), 2);
1386    }
1387
1388    #[test]
1389    fn test_cache_stats_after_operations() {
1390        let verifier = create_test_verifier();
1391        let xorname = [7u8; 32];
1392
1393        // Miss
1394        verifier.check_payment_required(&xorname);
1395        let stats = verifier.cache_stats();
1396        assert_eq!(stats.misses, 1);
1397        assert_eq!(stats.hits, 0);
1398
1399        // Insert and hit
1400        verifier.cache.insert(xorname);
1401        verifier.check_payment_required(&xorname);
1402        let stats = verifier.cache_stats();
1403        assert_eq!(stats.hits, 1);
1404        assert_eq!(stats.misses, 1);
1405        assert_eq!(stats.additions, 1);
1406    }
1407
1408    #[tokio::test]
1409    async fn test_concurrent_cache_lookups() {
1410        let verifier = std::sync::Arc::new(create_test_verifier());
1411
1412        // Pre-populate cache for all 10 xornames
1413        for i in 0..10u8 {
1414            verifier.cache.insert([i; 32]);
1415        }
1416
1417        let mut handles = Vec::new();
1418        for i in 0..10u8 {
1419            let v = verifier.clone();
1420            handles.push(tokio::spawn(async move {
1421                let xorname = [i; 32];
1422                v.verify_payment(&xorname, None).await
1423            }));
1424        }
1425
1426        for handle in handles {
1427            let result = handle.await.expect("task panicked");
1428            assert!(result.is_ok());
1429            assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified);
1430        }
1431
1432        assert_eq!(verifier.cache_len(), 10);
1433    }
1434
1435    #[test]
1436    fn test_default_evm_config() {
1437        let _config = EvmVerifierConfig::default();
1438        // EVM is always on — default network is ArbitrumOne
1439    }
1440
1441    #[test]
1442    fn test_real_ml_dsa_proof_size_within_limits() {
1443        use crate::payment::metrics::QuotingMetricsTracker;
1444        use crate::payment::proof::PaymentProof;
1445        use crate::payment::quote::{QuoteGenerator, XorName};
1446        use alloy::primitives::FixedBytes;
1447        use evmlib::{EncodedPeerId, RewardsAddress};
1448        use saorsa_core::MlDsa65;
1449        use saorsa_pqc::pqc::types::MlDsaSecretKey;
1450        use saorsa_pqc::pqc::MlDsaOperations;
1451
1452        let ml_dsa = MlDsa65::new();
1453        let mut peer_quotes = Vec::new();
1454
1455        for i in 0..5u8 {
1456            let (public_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
1457
1458            let rewards_address = RewardsAddress::new([i; 20]);
1459            let metrics_tracker = QuotingMetricsTracker::new(0);
1460            let mut generator = QuoteGenerator::new(rewards_address, metrics_tracker);
1461
1462            let pub_key_bytes = public_key.as_bytes().to_vec();
1463            let sk_bytes = secret_key.as_bytes().to_vec();
1464            generator.set_signer(pub_key_bytes, move |msg| {
1465                let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("sk parse");
1466                let ml_dsa = MlDsa65::new();
1467                ml_dsa.sign(&sk, msg).expect("sign").as_bytes().to_vec()
1468            });
1469
1470            let content: XorName = [i; 32];
1471            let quote = generator.create_quote(content, 4096, 0).expect("quote");
1472
1473            peer_quotes.push((EncodedPeerId::new(rand::random()), quote));
1474        }
1475
1476        let proof = PaymentProof {
1477            proof_of_payment: ProofOfPayment { peer_quotes },
1478            tx_hashes: vec![FixedBytes::from([0xABu8; 32])],
1479        };
1480
1481        let proof_bytes =
1482            crate::payment::proof::serialize_single_node_proof(&proof).expect("serialize");
1483
1484        // 7 ML-DSA-65 quotes with ~1952-byte pub keys and ~3309-byte signatures
1485        // should produce a proof in the 30-80 KB range
1486        assert!(
1487            proof_bytes.len() > 20_000,
1488            "Real 7-quote ML-DSA proof should be > 20 KB, got {} bytes",
1489            proof_bytes.len()
1490        );
1491        assert!(
1492            proof_bytes.len() < MAX_PAYMENT_PROOF_SIZE_BYTES,
1493            "Real 7-quote ML-DSA proof ({} bytes) should fit within {} byte limit",
1494            proof_bytes.len(),
1495            MAX_PAYMENT_PROOF_SIZE_BYTES
1496        );
1497    }
1498
1499    #[tokio::test]
1500    async fn test_content_address_mismatch_rejected() {
1501        use crate::payment::proof::{serialize_single_node_proof, PaymentProof};
1502        use evmlib::{EncodedPeerId, PaymentQuote, RewardsAddress};
1503        use std::time::SystemTime;
1504
1505        let verifier = create_test_verifier();
1506
1507        // The xorname we're trying to store
1508        let target_xorname = [0xAAu8; 32];
1509
1510        // Create a quote for a DIFFERENT xorname
1511        let wrong_xorname = [0xBBu8; 32];
1512        let quote = PaymentQuote {
1513            content: xor_name::XorName(wrong_xorname),
1514            timestamp: SystemTime::now(),
1515            price: Amount::from(1u64),
1516            rewards_address: RewardsAddress::new([1u8; 20]),
1517            pub_key: vec![0u8; 64],
1518            signature: vec![0u8; 64],
1519        };
1520
1521        // Build CLOSE_GROUP_SIZE quotes with distinct peer IDs
1522        let mut peer_quotes = Vec::new();
1523        for _ in 0..CLOSE_GROUP_SIZE {
1524            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1525        }
1526
1527        let proof = PaymentProof {
1528            proof_of_payment: ProofOfPayment { peer_quotes },
1529            tx_hashes: vec![],
1530        };
1531
1532        let proof_bytes = serialize_single_node_proof(&proof).expect("serialize proof");
1533
1534        let result = verifier
1535            .verify_payment(&target_xorname, Some(&proof_bytes))
1536            .await;
1537
1538        assert!(result.is_err(), "Should reject mismatched content address");
1539        let err_msg = format!("{}", result.expect_err("should be error"));
1540        assert!(
1541            err_msg.contains("content address mismatch"),
1542            "Error should mention 'content address mismatch': {err_msg}"
1543        );
1544    }
1545
1546    /// Helper: create a fake quote with the given xorname and timestamp.
1547    fn make_fake_quote(
1548        xorname: [u8; 32],
1549        timestamp: SystemTime,
1550        rewards_address: RewardsAddress,
1551    ) -> evmlib::PaymentQuote {
1552        use evmlib::PaymentQuote;
1553
1554        PaymentQuote {
1555            content: xor_name::XorName(xorname),
1556            timestamp,
1557            price: Amount::from(1u64),
1558            rewards_address,
1559            pub_key: vec![0u8; 64],
1560            signature: vec![0u8; 64],
1561        }
1562    }
1563
1564    /// Helper: wrap quotes into a tagged serialized `PaymentProof`.
1565    fn serialize_proof(peer_quotes: Vec<(evmlib::EncodedPeerId, evmlib::PaymentQuote)>) -> Vec<u8> {
1566        use crate::payment::proof::{serialize_single_node_proof, PaymentProof};
1567
1568        let proof = PaymentProof {
1569            proof_of_payment: ProofOfPayment { peer_quotes },
1570            tx_hashes: vec![],
1571        };
1572        serialize_single_node_proof(&proof).expect("serialize proof")
1573    }
1574
1575    #[tokio::test]
1576    async fn test_expired_quote_rejected() {
1577        use evmlib::{EncodedPeerId, RewardsAddress};
1578        use std::time::Duration;
1579
1580        let verifier = create_test_verifier();
1581        let xorname = [0xCCu8; 32];
1582        let rewards_addr = RewardsAddress::new([1u8; 20]);
1583
1584        // Create a quote that's 25 hours old (exceeds 24-hour max)
1585        let old_timestamp = SystemTime::now() - Duration::from_secs(25 * 3600);
1586        let quote = make_fake_quote(xorname, old_timestamp, rewards_addr);
1587
1588        let mut peer_quotes = Vec::new();
1589        for _ in 0..CLOSE_GROUP_SIZE {
1590            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1591        }
1592
1593        let proof_bytes = serialize_proof(peer_quotes);
1594        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1595
1596        assert!(result.is_err(), "Should reject expired quote");
1597        let err_msg = format!("{}", result.expect_err("should fail"));
1598        assert!(
1599            err_msg.contains("expired"),
1600            "Error should mention 'expired': {err_msg}"
1601        );
1602    }
1603
1604    #[tokio::test]
1605    async fn test_future_timestamp_rejected() {
1606        use evmlib::{EncodedPeerId, RewardsAddress};
1607        use std::time::Duration;
1608
1609        let verifier = create_test_verifier();
1610        let xorname = [0xDDu8; 32];
1611        let rewards_addr = RewardsAddress::new([1u8; 20]);
1612
1613        // Create a quote with a timestamp 1 hour in the future
1614        let future_timestamp = SystemTime::now() + Duration::from_secs(3600);
1615        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
1616
1617        let mut peer_quotes = Vec::new();
1618        for _ in 0..CLOSE_GROUP_SIZE {
1619            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1620        }
1621
1622        let proof_bytes = serialize_proof(peer_quotes);
1623        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1624
1625        assert!(result.is_err(), "Should reject future-timestamped quote");
1626        let err_msg = format!("{}", result.expect_err("should fail"));
1627        assert!(
1628            err_msg.contains("future"),
1629            "Error should mention 'future': {err_msg}"
1630        );
1631    }
1632
1633    #[tokio::test]
1634    async fn test_quote_within_clock_skew_tolerance_accepted() {
1635        use evmlib::{EncodedPeerId, RewardsAddress};
1636        use std::time::Duration;
1637
1638        let verifier = create_test_verifier();
1639        let xorname = [0xD1u8; 32];
1640        let rewards_addr = RewardsAddress::new([1u8; 20]);
1641
1642        // Quote 30 seconds in the future — within 60s tolerance
1643        let future_timestamp = SystemTime::now() + Duration::from_secs(30);
1644        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
1645
1646        let mut peer_quotes = Vec::new();
1647        for _ in 0..CLOSE_GROUP_SIZE {
1648            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1649        }
1650
1651        let proof_bytes = serialize_proof(peer_quotes);
1652        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1653
1654        // Should NOT fail at timestamp check (will fail later at pub_key binding)
1655        let err_msg = format!("{}", result.expect_err("should fail at later check"));
1656        assert!(
1657            !err_msg.contains("future"),
1658            "Should pass timestamp check (within tolerance), but got: {err_msg}"
1659        );
1660    }
1661
1662    #[tokio::test]
1663    async fn test_quote_just_beyond_clock_skew_tolerance_rejected() {
1664        use evmlib::{EncodedPeerId, RewardsAddress};
1665        use std::time::Duration;
1666
1667        let verifier = create_test_verifier();
1668        let xorname = [0xD2u8; 32];
1669        let rewards_addr = RewardsAddress::new([1u8; 20]);
1670
1671        // Quote 120 seconds in the future — exceeds 60s tolerance
1672        let future_timestamp = SystemTime::now() + Duration::from_secs(120);
1673        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
1674
1675        let mut peer_quotes = Vec::new();
1676        for _ in 0..CLOSE_GROUP_SIZE {
1677            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1678        }
1679
1680        let proof_bytes = serialize_proof(peer_quotes);
1681        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1682
1683        assert!(
1684            result.is_err(),
1685            "Should reject quote beyond clock skew tolerance"
1686        );
1687        let err_msg = format!("{}", result.expect_err("should fail"));
1688        assert!(
1689            err_msg.contains("future"),
1690            "Error should mention 'future': {err_msg}"
1691        );
1692    }
1693
1694    #[tokio::test]
1695    async fn test_quote_23h_old_still_accepted() {
1696        use evmlib::{EncodedPeerId, RewardsAddress};
1697        use std::time::Duration;
1698
1699        let verifier = create_test_verifier();
1700        let xorname = [0xD3u8; 32];
1701        let rewards_addr = RewardsAddress::new([1u8; 20]);
1702
1703        // Quote 23 hours old — within 24h max age
1704        let old_timestamp = SystemTime::now() - Duration::from_secs(23 * 3600);
1705        let quote = make_fake_quote(xorname, old_timestamp, rewards_addr);
1706
1707        let mut peer_quotes = Vec::new();
1708        for _ in 0..CLOSE_GROUP_SIZE {
1709            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1710        }
1711
1712        let proof_bytes = serialize_proof(peer_quotes);
1713        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1714
1715        // Should NOT fail at timestamp check (will fail later at pub_key binding)
1716        let err_msg = format!("{}", result.expect_err("should fail at later check"));
1717        assert!(
1718            !err_msg.contains("expired"),
1719            "Should pass expiry check (23h < 24h), but got: {err_msg}"
1720        );
1721    }
1722
1723    /// Helper: build an `EncodedPeerId` that matches the BLAKE3 hash of an ML-DSA public key.
1724    fn encoded_peer_id_for_pub_key(pub_key: &[u8]) -> evmlib::EncodedPeerId {
1725        let ant_peer_id = peer_id_from_public_key_bytes(pub_key).expect("valid ML-DSA pub key");
1726        evmlib::EncodedPeerId::new(*ant_peer_id.as_bytes())
1727    }
1728
1729    #[tokio::test]
1730    async fn test_local_not_in_paid_set_rejected() {
1731        use evmlib::RewardsAddress;
1732        use saorsa_core::MlDsa65;
1733        use saorsa_pqc::pqc::MlDsaOperations;
1734
1735        // Verifier with a local rewards address set
1736        let local_addr = RewardsAddress::new([0xAAu8; 20]);
1737        let config = PaymentVerifierConfig {
1738            evm: EvmVerifierConfig {
1739                network: EvmNetwork::ArbitrumOne,
1740            },
1741            cache_capacity: 100,
1742            local_rewards_address: local_addr,
1743        };
1744        let verifier = PaymentVerifier::new(config);
1745
1746        let xorname = [0xEEu8; 32];
1747        // Quotes pay a DIFFERENT rewards address
1748        let other_addr = RewardsAddress::new([0xBBu8; 20]);
1749
1750        // Use real ML-DSA keys so the pub_key→peer_id binding check passes
1751        let ml_dsa = MlDsa65::new();
1752        let mut peer_quotes = Vec::new();
1753        for _ in 0..CLOSE_GROUP_SIZE {
1754            let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen");
1755            let pub_key_bytes = public_key.as_bytes().to_vec();
1756            let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes);
1757
1758            let mut quote = make_fake_quote(xorname, SystemTime::now(), other_addr);
1759            quote.pub_key = pub_key_bytes;
1760
1761            peer_quotes.push((encoded, quote));
1762        }
1763
1764        let proof_bytes = serialize_proof(peer_quotes);
1765        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1766
1767        assert!(result.is_err(), "Should reject payment not addressed to us");
1768        let err_msg = format!("{}", result.expect_err("should fail"));
1769        assert!(
1770            err_msg.contains("does not include this node as a recipient"),
1771            "Error should mention recipient rejection: {err_msg}"
1772        );
1773    }
1774
1775    #[tokio::test]
1776    async fn test_wrong_peer_binding_rejected() {
1777        use evmlib::{EncodedPeerId, RewardsAddress};
1778        use saorsa_core::MlDsa65;
1779        use saorsa_pqc::pqc::MlDsaOperations;
1780
1781        let verifier = create_test_verifier();
1782        let xorname = [0xFFu8; 32];
1783        let rewards_addr = RewardsAddress::new([1u8; 20]);
1784
1785        // Generate a real ML-DSA keypair so pub_key is valid
1786        let ml_dsa = MlDsa65::new();
1787        let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen");
1788        let pub_key_bytes = public_key.as_bytes().to_vec();
1789
1790        // Create a quote with a real pub_key but attach it to a random peer ID
1791        // whose identity multihash does NOT match BLAKE3(pub_key)
1792        let mut quote = make_fake_quote(xorname, SystemTime::now(), rewards_addr);
1793        quote.pub_key = pub_key_bytes;
1794
1795        // Use random ed25519 peer IDs — they won't match BLAKE3(pub_key)
1796        let mut peer_quotes = Vec::new();
1797        for _ in 0..CLOSE_GROUP_SIZE {
1798            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1799        }
1800
1801        let proof_bytes = serialize_proof(peer_quotes);
1802        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1803
1804        assert!(result.is_err(), "Should reject wrong peer binding");
1805        let err_msg = format!("{}", result.expect_err("should fail"));
1806        assert!(
1807            err_msg.contains("pub_key does not belong to claimed peer"),
1808            "Error should mention binding mismatch: {err_msg}"
1809        );
1810    }
1811
1812    // =========================================================================
1813    // Merkle-tagged proof tests
1814    // =========================================================================
1815
1816    #[tokio::test]
1817    async fn test_merkle_tagged_proof_invalid_data_rejected() {
1818        use crate::ant_protocol::PROOF_TAG_MERKLE;
1819
1820        let verifier = create_test_verifier();
1821        let xorname = [0xA1u8; 32];
1822
1823        // Build a merkle-tagged proof with garbage body.
1824        // The tag byte is correct but the body is not valid msgpack.
1825        let mut merkle_garbage = Vec::with_capacity(64);
1826        merkle_garbage.push(PROOF_TAG_MERKLE);
1827        merkle_garbage.extend_from_slice(&[0xAB; 63]);
1828
1829        let result = verifier
1830            .verify_payment(&xorname, Some(&merkle_garbage))
1831            .await;
1832
1833        assert!(
1834            result.is_err(),
1835            "Should reject merkle proof with invalid body"
1836        );
1837        let err_msg = format!("{}", result.expect_err("should fail"));
1838        assert!(
1839            err_msg.contains("deserialize") || err_msg.contains("merkle proof"),
1840            "Error should mention deserialization failure: {err_msg}"
1841        );
1842    }
1843
1844    #[tokio::test]
1845    async fn test_single_node_tagged_proof_deserialization() {
1846        use crate::payment::proof::serialize_single_node_proof;
1847        use evmlib::{EncodedPeerId, RewardsAddress};
1848
1849        let verifier = create_test_verifier();
1850        let xorname = [0xA2u8; 32];
1851        let rewards_addr = RewardsAddress::new([1u8; 20]);
1852
1853        // Build a valid tagged single-node proof
1854        let quote = make_fake_quote(xorname, SystemTime::now(), rewards_addr);
1855        let mut peer_quotes = Vec::new();
1856        for _ in 0..CLOSE_GROUP_SIZE {
1857            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1858        }
1859
1860        let proof = crate::payment::proof::PaymentProof {
1861            proof_of_payment: ProofOfPayment {
1862                peer_quotes: peer_quotes.clone(),
1863            },
1864            tx_hashes: vec![],
1865        };
1866
1867        let tagged_bytes = serialize_single_node_proof(&proof).expect("serialize tagged proof");
1868
1869        // detect_proof_type should identify it as SingleNode
1870        assert_eq!(
1871            crate::payment::proof::detect_proof_type(&tagged_bytes),
1872            Some(crate::payment::proof::ProofType::SingleNode)
1873        );
1874
1875        // verify_payment should process it through the single-node path.
1876        // It will fail at quote validation (fake pub_key), but we verify
1877        // it passes the deserialization stage by checking the error type.
1878        let result = verifier.verify_payment(&xorname, Some(&tagged_bytes)).await;
1879
1880        assert!(result.is_err(), "Should fail at quote validation stage");
1881        let err_msg = format!("{}", result.expect_err("should fail"));
1882        // It should NOT be a deserialization error — it should get further
1883        assert!(
1884            !err_msg.contains("deserialize"),
1885            "Should pass deserialization but fail later: {err_msg}"
1886        );
1887    }
1888
1889    #[test]
1890    fn test_pool_cache_insert_and_lookup() {
1891        use evmlib::merkle_batch_payment::PoolHash;
1892
1893        // Verify the pool_cache field exists and works correctly.
1894        // Insert a pool hash, then verify it's present on lookup.
1895        let verifier = create_test_verifier();
1896
1897        let pool_hash: PoolHash = [0xBBu8; 32];
1898        let payment_info = evmlib::merkle_payments::OnChainPaymentInfo {
1899            depth: 4,
1900            merkle_payment_timestamp: 1_700_000_000,
1901            paid_node_addresses: vec![],
1902        };
1903
1904        // Insert into pool cache
1905        {
1906            let mut cache = verifier.pool_cache.lock();
1907            cache.put(pool_hash, payment_info);
1908        }
1909
1910        // First lookup — should find it
1911        {
1912            let found = verifier.pool_cache.lock().get(&pool_hash).cloned();
1913            assert!(found.is_some(), "Pool hash should be in cache after insert");
1914            let info = found.expect("cached info");
1915            assert_eq!(info.depth, 4);
1916            assert_eq!(info.merkle_payment_timestamp, 1_700_000_000);
1917        }
1918
1919        // Second lookup — same result (no double-query needed)
1920        {
1921            let found = verifier.pool_cache.lock().get(&pool_hash).cloned();
1922            assert!(
1923                found.is_some(),
1924                "Pool hash should still be in cache on second lookup"
1925            );
1926        }
1927
1928        // Different pool hash — should NOT be found
1929        let other_hash: PoolHash = [0xCCu8; 32];
1930        {
1931            let found = verifier.pool_cache.lock().get(&other_hash).cloned();
1932            assert!(found.is_none(), "Unknown pool hash should not be in cache");
1933        }
1934    }
1935
1936    #[tokio::test]
1937    async fn closeness_pass_cache_short_circuits_second_call() {
1938        // When a pool_hash is in the closeness_pass_cache, the outer
1939        // verify_merkle_candidate_closeness must return Ok(()) without
1940        // running the inner lookup — even if no P2PNode is attached.
1941        // That second half (no-p2p → would normally fail-closed in release)
1942        // is the proof the cache short-circuit ran first.
1943        let verifier = create_test_verifier();
1944        let pool_hash = [0xAAu8; 32];
1945        verifier.closeness_pass_cache.lock().put(pool_hash, ());
1946
1947        // Construct a dummy pool — contents don't matter because the cache
1948        // hit means we never look at them.
1949        let pool = MerklePaymentCandidatePool {
1950            midpoint_proof: fake_midpoint_proof(),
1951            candidate_nodes: make_candidate_nodes(1_700_000_000),
1952        };
1953
1954        let result = verifier
1955            .verify_merkle_candidate_closeness(&pool, pool_hash)
1956            .await;
1957        assert!(
1958            result.is_ok(),
1959            "cached pool hash must bypass the inner check and return Ok(()), got: {result:?}"
1960        );
1961    }
1962
    #[tokio::test]
    async fn closeness_single_flight_concurrent_readers_share_one_verification() {
        // Two concurrent callers for the same pool_hash should produce the
        // same outcome, and the cache should end up populated exactly once.
        // We use the test-utils fail-open path to short-circuit the inner
        // DHT lookup; the purpose of this test is the single-flight
        // plumbing, not the lookup itself.
        let verifier = Arc::new(create_test_verifier());
        let pool_hash = [0x77u8; 32];
        let pool = MerklePaymentCandidatePool {
            midpoint_proof: fake_midpoint_proof(),
            candidate_nodes: make_candidate_nodes(1_700_000_000),
        };

        // Each concurrent caller needs its own Arc handle and pool copy
        // because the async blocks below take ownership via `move`.
        let v1 = Arc::clone(&verifier);
        let p1 = pool.clone();
        let v2 = Arc::clone(&verifier);
        let p2 = pool.clone();

        // tokio::join! polls both futures concurrently on this task, so the
        // two calls genuinely race for the in-flight slot: one becomes the
        // leader, the other the waiter.
        let (r1, r2) = tokio::join!(
            async move { v1.verify_merkle_candidate_closeness(&p1, pool_hash).await },
            async move { v2.verify_merkle_candidate_closeness(&p2, pool_hash).await },
        );

        assert_eq!(r1.is_ok(), r2.is_ok(), "concurrent callers must agree");
        assert!(
            r1.is_ok(),
            "both callers must succeed on the test-utils path"
        );
        // The single verification that ran must have recorded its pass...
        assert!(
            verifier
                .closeness_pass_cache
                .lock()
                .get(&pool_hash)
                .is_some(),
            "success path must populate the pass cache"
        );
        // ...and must not leak its in-flight slot once finished.
        assert!(
            verifier.inflight_closeness.lock().get(&pool_hash).is_none(),
            "inflight slot must be cleared after the leader finishes"
        );
    }
2005
    #[tokio::test]
    async fn closeness_waiter_reads_leaders_published_failure() {
        // Prove the waiter path actually surfaces a failure published by a
        // concurrent leader, without running its own inner check. Insert a
        // slot, spawn a waiter (which will park on notified_owned), then
        // publish failure + notify from the outside — simulating what the
        // leader's `publish` + drop-guard pair does.
        let verifier = Arc::new(create_test_verifier());
        let pool_hash = [0x55u8; 32];
        // Pre-insert the in-flight slot BEFORE spawning, so the spawned call
        // is guaranteed to take the waiter branch rather than become leader.
        let slot = Arc::new(ClosenessSlot::new());
        verifier
            .inflight_closeness
            .lock()
            .put(pool_hash, Arc::clone(&slot));

        let pool = MerklePaymentCandidatePool {
            midpoint_proof: fake_midpoint_proof(),
            candidate_nodes: make_candidate_nodes(1_700_000_000),
        };

        let verifier_c = Arc::clone(&verifier);
        let pool_c = pool.clone();
        let waiter = tokio::spawn(async move {
            verifier_c
                .verify_merkle_candidate_closeness(&pool_c, pool_hash)
                .await
        });

        // Yield so the waiter can run up to its `notified_owned().await`.
        // A few yields cover both single-threaded and multi-threaded tokio
        // runtimes regardless of scheduling.
        for _ in 0..5 {
            tokio::task::yield_now().await;
        }

        // Simulate the leader's `publish` + drop-guard: publish the result,
        // clear the slot, wake waiters. Order matters — the result must be
        // set and the slot removed before notify_waiters, or a woken waiter
        // could observe an empty slot.
        slot.result
            .set(Err("forged pool: not close enough".to_string()))
            .expect("set once");
        verifier.inflight_closeness.lock().pop(&pool_hash);
        slot.notify.notify_waiters();

        let result = waiter.await.expect("task panicked");
        let err = result.expect_err("waiter must return the leader's published failure");
        assert!(
            err.to_string().contains("forged pool"),
            "waiter must surface the leader's error message, got: {err}"
        );
    }
2056
2057    #[tokio::test]
2058    async fn closeness_rejects_pool_with_duplicate_candidate_pub_keys() {
2059        // An attacker who submits 16 copies of the same real peer's pub_key
2060        // would otherwise satisfy the 13/16 closeness threshold trivially:
2061        // that one peer's membership in the DHT-returned set would count
2062        // 16 times. The dedupe check in verify_merkle_candidate_closeness_inner
2063        // must reject the pool BEFORE the network lookup runs (so this test
2064        // works even with no P2PNode attached).
2065        let verifier = create_test_verifier();
2066        let pool_hash = [0xDDu8; 32];
2067
2068        // Build a normal pool, then overwrite every candidate's pub_key
2069        // with a single shared key so all 16 derive to the same PeerId.
2070        let mut candidates = make_candidate_nodes(1_700_000_000);
2071        let shared_pub_key = candidates
2072            .first()
2073            .expect("make_candidate_nodes returns CANDIDATES_PER_POOL entries")
2074            .pub_key
2075            .clone();
2076        for c in &mut candidates {
2077            c.pub_key = shared_pub_key.clone();
2078        }
2079        let pool = MerklePaymentCandidatePool {
2080            midpoint_proof: fake_midpoint_proof(),
2081            candidate_nodes: candidates,
2082        };
2083
2084        let result = verifier
2085            .verify_merkle_candidate_closeness(&pool, pool_hash)
2086            .await;
2087        let err = result.expect_err("duplicate candidate PeerIds must be rejected");
2088        let msg = err.to_string();
2089        assert!(
2090            msg.contains("duplicate candidate PeerId"),
2091            "rejection must be the duplicate-PeerId branch, got: {msg}"
2092        );
2093    }
2094
2095    /// Build a deterministic but otherwise-unused `MidpointProof` so unit
2096    /// tests can construct a `MerklePaymentCandidatePool` without spinning
2097    /// up a real merkle tree. The closeness path only calls `.address()`
2098    /// on it, which is a pure BLAKE3 of the branch's leaf/root/timestamp —
2099    /// the values don't need to be tree-valid for these tests.
2100    fn fake_midpoint_proof() -> evmlib::merkle_payments::MidpointProof {
2101        // Build a minimal tree of two leaves so we get a real branch.
2102        let leaves = vec![xor_name::XorName([1u8; 32]), xor_name::XorName([2u8; 32])];
2103        let tree = evmlib::merkle_payments::MerkleTree::from_xornames(leaves).expect("tree");
2104        let candidates = tree.reward_candidates(1_700_000_000).expect("candidates");
2105        candidates.first().expect("at least one").clone()
2106    }
2107
2108    // =========================================================================
2109    // Merkle verification unit tests
2110    // =========================================================================
2111
2112    /// Helper: build 16 validly-signed ML-DSA-65 candidate nodes.
2113    fn make_candidate_nodes(
2114        timestamp: u64,
2115    ) -> [evmlib::merkle_payments::MerklePaymentCandidateNode;
2116           evmlib::merkle_payments::CANDIDATES_PER_POOL] {
2117        use evmlib::merkle_payments::{MerklePaymentCandidateNode, CANDIDATES_PER_POOL};
2118        use saorsa_core::MlDsa65;
2119        use saorsa_pqc::pqc::types::MlDsaSecretKey;
2120        use saorsa_pqc::pqc::MlDsaOperations;
2121
2122        std::array::from_fn::<_, CANDIDATES_PER_POOL, _>(|i| {
2123            let ml_dsa = MlDsa65::new();
2124            let (pub_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
2125            let price = evmlib::common::Amount::from(1024u64);
2126            #[allow(clippy::cast_possible_truncation)]
2127            let reward_address = RewardsAddress::new([i as u8; 20]);
2128            let msg = MerklePaymentCandidateNode::bytes_to_sign(&price, &reward_address, timestamp);
2129            let sk = MlDsaSecretKey::from_bytes(secret_key.as_bytes()).expect("sk");
2130            let signature = ml_dsa.sign(&sk, &msg).expect("sign").as_bytes().to_vec();
2131
2132            MerklePaymentCandidateNode {
2133                pub_key: pub_key.as_bytes().to_vec(),
2134                price,
2135                reward_address,
2136                merkle_payment_timestamp: timestamp,
2137                signature,
2138            }
2139        })
2140    }
2141
2142    /// Helper: build a valid `MerklePaymentProof` with real ML-DSA-65
2143    /// signatures. Returns the raw proof, pool hash, xorname, and timestamp.
2144    fn make_valid_merkle_proof() -> (
2145        evmlib::merkle_payments::MerklePaymentProof,
2146        evmlib::merkle_batch_payment::PoolHash,
2147        [u8; 32],
2148        u64,
2149    ) {
2150        use evmlib::merkle_payments::{MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree};
2151
2152        let timestamp = std::time::SystemTime::now()
2153            .duration_since(std::time::UNIX_EPOCH)
2154            .expect("system time")
2155            .as_secs();
2156
2157        let addresses: Vec<xor_name::XorName> = (0..4u8)
2158            .map(|i| xor_name::XorName::from_content(&[i]))
2159            .collect();
2160        let tree = MerkleTree::from_xornames(addresses.clone()).expect("tree");
2161
2162        let candidate_nodes = make_candidate_nodes(timestamp);
2163
2164        let reward_candidates = tree
2165            .reward_candidates(timestamp)
2166            .expect("reward candidates");
2167        let midpoint_proof = reward_candidates
2168            .first()
2169            .expect("at least one candidate")
2170            .clone();
2171
2172        let pool = MerklePaymentCandidatePool {
2173            midpoint_proof,
2174            candidate_nodes,
2175        };
2176
2177        let first_address = *addresses.first().expect("first address");
2178        let address_proof = tree
2179            .generate_address_proof(0, first_address)
2180            .expect("proof");
2181
2182        let merkle_proof = MerklePaymentProof::new(first_address, address_proof, pool);
2183        let pool_hash = merkle_proof.winner_pool_hash();
2184        let xorname = first_address.0;
2185
2186        (merkle_proof, pool_hash, xorname, timestamp)
2187    }
2188
2189    /// Helper: build a minimal valid `MerklePaymentProof` with real ML-DSA-65
2190    /// signatures. Returns `(xorname, serialized_tagged_proof, pool_hash, timestamp)`.
2191    fn make_valid_merkle_proof_bytes() -> (
2192        [u8; 32],
2193        Vec<u8>,
2194        evmlib::merkle_batch_payment::PoolHash,
2195        u64,
2196    ) {
2197        let (merkle_proof, pool_hash, xorname, timestamp) = make_valid_merkle_proof();
2198        let tagged = crate::payment::proof::serialize_merkle_proof(&merkle_proof)
2199            .expect("serialize merkle proof");
2200        (xorname, tagged, pool_hash, timestamp)
2201    }
2202
2203    #[tokio::test]
2204    async fn test_merkle_address_mismatch_rejected() {
2205        let verifier = create_test_verifier();
2206        let (_correct_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
2207
2208        // Use a DIFFERENT xorname than what the proof was built for
2209        let wrong_xorname = [0xFFu8; 32];
2210
2211        let result = verifier
2212            .verify_payment(&wrong_xorname, Some(&tagged_proof))
2213            .await;
2214
2215        assert!(
2216            result.is_err(),
2217            "Should reject merkle proof address mismatch"
2218        );
2219        let err_msg = format!("{}", result.expect_err("should fail"));
2220        assert!(
2221            err_msg.contains("address mismatch") || err_msg.contains("Merkle proof address"),
2222            "Error should mention address mismatch: {err_msg}"
2223        );
2224    }
2225
2226    #[tokio::test]
2227    async fn test_merkle_malformed_body_rejected() {
2228        let verifier = create_test_verifier();
2229        let xorname = [0xA3u8; 32];
2230
2231        // Valid merkle tag but truncated/corrupted msgpack body
2232        let mut bad_proof = vec![crate::ant_protocol::PROOF_TAG_MERKLE];
2233        bad_proof.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
2234        bad_proof.extend_from_slice(&[0x00; 10]);
2235        // pad to minimum size
2236        while bad_proof.len() < MIN_PAYMENT_PROOF_SIZE_BYTES {
2237            bad_proof.push(0x00);
2238        }
2239
2240        let result = verifier.verify_payment(&xorname, Some(&bad_proof)).await;
2241
2242        assert!(result.is_err(), "Should reject malformed merkle body");
2243        let err_msg = format!("{}", result.expect_err("should fail"));
2244        assert!(
2245            err_msg.contains("deserialize") || err_msg.contains("Failed"),
2246            "Error should mention deserialization: {err_msg}"
2247        );
2248    }
2249
2250    #[test]
2251    fn test_merkle_proof_serialized_size_within_limits() {
2252        let (_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
2253
2254        // 16 ML-DSA-65 candidates (~1952 pub key + ~3309 sig each) ≈ 84 KB + tree data
2255        assert!(
2256            tagged_proof.len() >= MIN_PAYMENT_PROOF_SIZE_BYTES,
2257            "Merkle proof ({} bytes) should be >= min {} bytes",
2258            tagged_proof.len(),
2259            MIN_PAYMENT_PROOF_SIZE_BYTES
2260        );
2261        assert!(
2262            tagged_proof.len() <= MAX_PAYMENT_PROOF_SIZE_BYTES,
2263            "Merkle proof ({} bytes) should be <= max {} bytes",
2264            tagged_proof.len(),
2265            MAX_PAYMENT_PROOF_SIZE_BYTES
2266        );
2267    }
2268
2269    #[test]
2270    fn test_merkle_proof_tag_is_correct() {
2271        let (_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
2272
2273        assert_eq!(
2274            tagged_proof.first().copied(),
2275            Some(crate::ant_protocol::PROOF_TAG_MERKLE),
2276            "First byte must be the merkle tag"
2277        );
2278        assert_eq!(
2279            crate::payment::proof::detect_proof_type(&tagged_proof),
2280            Some(crate::payment::proof::ProofType::Merkle)
2281        );
2282    }
2283
2284    #[test]
2285    fn test_pool_cache_eviction() {
2286        use evmlib::merkle_batch_payment::PoolHash;
2287
2288        let config = PaymentVerifierConfig {
2289            evm: EvmVerifierConfig::default(),
2290            cache_capacity: 100,
2291            local_rewards_address: RewardsAddress::new([1u8; 20]),
2292        };
2293        let verifier = PaymentVerifier::new(config);
2294
2295        // Fill the pool cache to capacity (DEFAULT_POOL_CACHE_CAPACITY = 1000)
2296        for i in 0..DEFAULT_POOL_CACHE_CAPACITY {
2297            let mut hash: PoolHash = [0u8; 32];
2298            // Write index bytes into the hash
2299            let idx_bytes = i.to_le_bytes();
2300            for (j, b) in idx_bytes.iter().enumerate() {
2301                if j < 32 {
2302                    hash[j] = *b;
2303                }
2304            }
2305            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2306                depth: 4,
2307                merkle_payment_timestamp: 1_700_000_000,
2308                paid_node_addresses: vec![],
2309            };
2310            verifier.pool_cache.lock().put(hash, info);
2311        }
2312
2313        assert_eq!(
2314            verifier.pool_cache.lock().len(),
2315            DEFAULT_POOL_CACHE_CAPACITY
2316        );
2317
2318        // Insert one more — should evict the oldest
2319        let overflow_hash: PoolHash = [0xFFu8; 32];
2320        let info = evmlib::merkle_payments::OnChainPaymentInfo {
2321            depth: 8,
2322            merkle_payment_timestamp: 1_800_000_000,
2323            paid_node_addresses: vec![],
2324        };
2325        verifier.pool_cache.lock().put(overflow_hash, info);
2326
2327        // Size should still be at capacity (not capacity + 1)
2328        assert_eq!(
2329            verifier.pool_cache.lock().len(),
2330            DEFAULT_POOL_CACHE_CAPACITY
2331        );
2332
2333        // The new entry should be present
2334        let found = verifier.pool_cache.lock().get(&overflow_hash).cloned();
2335        assert!(
2336            found.is_some(),
2337            "Newly inserted pool hash should be present"
2338        );
2339        assert_eq!(found.expect("info").depth, 8);
2340    }
2341
2342    #[test]
2343    fn test_pool_cache_concurrent_access() {
2344        use evmlib::merkle_batch_payment::PoolHash;
2345        use std::sync::Arc;
2346
2347        let verifier = Arc::new(create_test_verifier());
2348
2349        let mut handles = Vec::new();
2350        for i in 0..20u8 {
2351            let v = verifier.clone();
2352            handles.push(std::thread::spawn(move || {
2353                let hash: PoolHash = [i; 32];
2354                let info = evmlib::merkle_payments::OnChainPaymentInfo {
2355                    depth: i,
2356                    merkle_payment_timestamp: u64::from(i) * 1000,
2357                    paid_node_addresses: vec![],
2358                };
2359                v.pool_cache.lock().put(hash, info);
2360
2361                // Read back
2362                let found = v.pool_cache.lock().get(&hash).cloned();
2363                assert!(found.is_some(), "Entry {i} should be readable after insert");
2364            }));
2365        }
2366
2367        for handle in handles {
2368            handle.join().expect("thread panicked");
2369        }
2370
2371        // All 20 entries should be present (well under 1000 capacity)
2372        assert_eq!(verifier.pool_cache.lock().len(), 20);
2373    }
2374
2375    #[tokio::test]
2376    async fn test_merkle_tampered_candidate_signature_rejected() {
2377        let verifier = create_test_verifier();
2378
2379        let (mut merkle_proof, _pool_hash, xorname, timestamp) = make_valid_merkle_proof();
2380
2381        // Tamper the first candidate's signature
2382        if let Some(byte) = merkle_proof
2383            .winner_pool
2384            .candidate_nodes
2385            .first_mut()
2386            .and_then(|c| c.signature.first_mut())
2387        {
2388            *byte ^= 0xFF;
2389        }
2390
2391        // Recompute pool hash after tampering (signature change alters the hash)
2392        let tampered_pool_hash = merkle_proof.winner_pool_hash();
2393
2394        // Pre-populate pool cache so we skip the on-chain query
2395        {
2396            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2397                depth: 4,
2398                merkle_payment_timestamp: timestamp,
2399                paid_node_addresses: vec![],
2400            };
2401            verifier.pool_cache.lock().put(tampered_pool_hash, info);
2402        }
2403
2404        let tagged =
2405            crate::payment::proof::serialize_merkle_proof(&merkle_proof).expect("serialize");
2406
2407        let result = verifier.verify_payment(&xorname, Some(&tagged)).await;
2408
2409        assert!(
2410            result.is_err(),
2411            "Should reject merkle proof with tampered candidate signature"
2412        );
2413        let err_msg = format!("{}", result.expect_err("should fail"));
2414        assert!(
2415            err_msg.contains("Invalid ML-DSA-65 signature"),
2416            "Error should mention invalid signature: {err_msg}"
2417        );
2418    }
2419
2420    #[tokio::test]
2421    async fn test_merkle_timestamp_mismatch_rejected() {
2422        let verifier = create_test_verifier();
2423
2424        let (xorname, tagged, pool_hash, timestamp) = make_valid_merkle_proof_bytes();
2425
2426        // Pre-populate pool cache with a DIFFERENT timestamp than the candidates
2427        {
2428            let mismatched_ts = timestamp + 9999;
2429            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2430                depth: 4,
2431                merkle_payment_timestamp: mismatched_ts,
2432                paid_node_addresses: vec![],
2433            };
2434            verifier.pool_cache.lock().put(pool_hash, info);
2435        }
2436
2437        let result = verifier.verify_payment(&xorname, Some(&tagged)).await;
2438
2439        assert!(
2440            result.is_err(),
2441            "Should reject merkle proof with timestamp mismatch"
2442        );
2443        let err_msg = format!("{}", result.expect_err("should fail"));
2444        assert!(
2445            err_msg.contains("timestamp mismatch"),
2446            "Error should mention timestamp mismatch: {err_msg}"
2447        );
2448    }
2449
2450    #[tokio::test]
2451    async fn test_merkle_paid_node_index_out_of_bounds_rejected() {
2452        let verifier = create_test_verifier();
2453        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2454
2455        // The test tree has 4 addresses → depth 2. We must match the tree depth
2456        // so verify_merkle_proof passes the depth check, then the paid node
2457        // index out-of-bounds check fires.
2458        {
2459            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2460                depth: 2,
2461                merkle_payment_timestamp: ts,
2462                paid_node_addresses: vec![
2463                    // First paid node: valid (matches candidate 0, amount matches formula)
2464                    // Expected per-node: median(1024) * 2^2 / 2 = 2048
2465                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(2048u64)),
2466                    // Second paid node: index 999 is way beyond CANDIDATES_PER_POOL (16)
2467                    (RewardsAddress::new([1u8; 20]), 999, Amount::from(2048u64)),
2468                ],
2469            };
2470            verifier.pool_cache.lock().put(pool_hash, info);
2471        }
2472
2473        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2474
2475        assert!(
2476            result.is_err(),
2477            "Should reject paid node index out of bounds"
2478        );
2479        let err_msg = format!("{}", result.expect_err("should fail"));
2480        assert!(
2481            err_msg.contains("out of bounds"),
2482            "Error should mention out of bounds: {err_msg}"
2483        );
2484    }
2485
2486    #[tokio::test]
2487    async fn test_merkle_paid_node_address_mismatch_rejected() {
2488        let verifier = create_test_verifier();
2489        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2490
2491        // Tree has depth 2, so provide 2 paid node entries.
2492        // Both use valid indices but the second has a wrong reward address.
2493        {
2494            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2495                depth: 2,
2496                merkle_payment_timestamp: ts,
2497                paid_node_addresses: vec![
2498                    // Index 0 with matching address [0x00; 20]
2499                    // Expected per-node: median(1024) * 2^2 / 2 = 2048
2500                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(2048u64)),
2501                    // Index 1 with WRONG address — candidate 1's address is [0x01; 20]
2502                    (RewardsAddress::new([0xFF; 20]), 1, Amount::from(2048u64)),
2503                ],
2504            };
2505            verifier.pool_cache.lock().put(pool_hash, info);
2506        }
2507
2508        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2509
2510        assert!(result.is_err(), "Should reject paid node address mismatch");
2511        let err_msg = format!("{}", result.expect_err("should fail"));
2512        assert!(
2513            err_msg.contains("address mismatch"),
2514            "Error should mention address mismatch: {err_msg}"
2515        );
2516    }
2517
2518    #[tokio::test]
2519    async fn test_merkle_wrong_depth_rejected() {
2520        let verifier = create_test_verifier();
2521        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2522
2523        // Pre-populate pool cache with depth=3 but only 1 paid node address
2524        // (depth must equal paid_node_addresses.len())
2525        {
2526            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2527                depth: 3,
2528                merkle_payment_timestamp: ts,
2529                paid_node_addresses: vec![(
2530                    RewardsAddress::new([0u8; 20]),
2531                    0,
2532                    Amount::from(1024u64),
2533                )],
2534            };
2535            verifier.pool_cache.lock().put(pool_hash, info);
2536        }
2537
2538        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2539
2540        assert!(
2541            result.is_err(),
2542            "Should reject mismatched depth vs paid node count"
2543        );
2544        let err_msg = format!("{}", result.expect_err("should fail"));
2545        assert!(
2546            err_msg.contains("Wrong number of paid nodes")
2547                || err_msg.contains("verification failed"),
2548            "Error should mention depth/count mismatch: {err_msg}"
2549        );
2550    }
2551
2552    #[tokio::test]
2553    async fn test_merkle_underpayment_rejected() {
2554        let verifier = create_test_verifier();
2555        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2556
2557        // Tree depth=2, so 2 paid nodes required. Candidates all quote price=1024.
2558        // Expected per-node: median(1024) * 2^2 / 2 = 2048.
2559        // Pay only 1 wei per node — far below the expected amount.
2560        {
2561            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2562                depth: 2,
2563                merkle_payment_timestamp: ts,
2564                paid_node_addresses: vec![
2565                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(1u64)),
2566                    (RewardsAddress::new([1u8; 20]), 1, Amount::from(1u64)),
2567                ],
2568            };
2569            verifier.pool_cache.lock().put(pool_hash, info);
2570        }
2571
2572        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2573
2574        assert!(
2575            result.is_err(),
2576            "Should reject merkle payment where paid amount < expected per-node amount"
2577        );
2578        let err_msg = format!("{}", result.expect_err("should fail"));
2579        assert!(
2580            err_msg.contains("Underpayment"),
2581            "Error should mention underpayment: {err_msg}"
2582        );
2583    }
2584}