// ant_node/payment/verifier.rs

1//! Payment verifier with LRU cache and EVM verification.
2//!
3//! This is the core payment verification logic for ant-node.
4//! All new data requires EVM payment on Arbitrum (no free tier).
5
6use crate::ant_protocol::CLOSE_GROUP_SIZE;
7use crate::error::{Error, Result};
8use crate::logging::{debug, info};
9use crate::payment::cache::{CacheStats, VerifiedCache, XorName};
10use crate::payment::proof::{
11    deserialize_merkle_proof, deserialize_proof, detect_proof_type, ProofType,
12};
13use crate::payment::quote::{verify_quote_content, verify_quote_signature};
14use crate::payment::single_node::SingleNodePayment;
15use evmlib::common::Amount;
16use evmlib::contract::payment_vault;
17use evmlib::merkle_batch_payment::{OnChainPaymentInfo, PoolHash};
18use evmlib::Network as EvmNetwork;
19use evmlib::ProofOfPayment;
20use evmlib::RewardsAddress;
21use lru::LruCache;
22use parking_lot::{Mutex, RwLock};
23use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes;
24use saorsa_core::identity::PeerId;
25use saorsa_core::P2PNode;
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28use std::time::SystemTime;
29
/// Minimum allowed size for a payment proof in bytes.
///
/// This minimum ensures the proof contains at least a basic cryptographic hash or identifier.
/// Proofs smaller than this are rejected as they cannot contain sufficient payment information.
pub const MIN_PAYMENT_PROOF_SIZE_BYTES: usize = 32;

/// Maximum allowed size for a payment proof in bytes (256 KB).
///
/// Single-node proofs with 7 ML-DSA-65 quotes reach ~40 KB.
/// Merkle proofs include 16 candidate nodes (each with ~1,952-byte ML-DSA pub key
/// and ~3,309-byte signature) plus merkle branch hashes, totaling ~130 KB.
/// 256 KB provides headroom while still capping memory during verification.
/// Both bounds are enforced in `verify_payment` before any deserialization.
pub const MAX_PAYMENT_PROOF_SIZE_BYTES: usize = 262_144;

/// Maximum age of a payment quote before it's considered expired (24 hours).
/// Prevents replaying old cheap quotes against nearly-full nodes.
/// Enforced per-quote in `validate_quote_timestamps`.
const QUOTE_MAX_AGE_SECS: u64 = 86_400;

/// Maximum allowed clock skew for quote timestamps (60 seconds).
/// Accounts for NTP synchronization differences between P2P nodes.
/// Quotes dated further than this into the future are rejected.
const QUOTE_CLOCK_SKEW_TOLERANCE_SECS: u64 = 60;
51
/// Configuration for EVM payment verification.
///
/// EVM verification is always on. All new data requires on-chain
/// payment verification. The network field selects which EVM chain to use.
#[derive(Debug, Clone)]
pub struct EvmVerifierConfig {
    /// EVM network to use (Arbitrum One, Arbitrum Sepolia, etc.)
    pub network: EvmNetwork,
}

impl Default for EvmVerifierConfig {
    /// Defaults to Arbitrum One, the mainnet chain.
    fn default() -> Self {
        Self {
            network: EvmNetwork::ArbitrumOne,
        }
    }
}
69
/// Configuration for the payment verifier.
///
/// All new data requires EVM payment on Arbitrum. The cache stores
/// previously verified payments to avoid redundant on-chain lookups.
#[derive(Debug, Clone)]
pub struct PaymentVerifierConfig {
    /// EVM verifier configuration.
    pub evm: EvmVerifierConfig,
    /// Cache capacity (number of `XorName` values to cache).
    /// Passed to `VerifiedCache::with_capacity` by `PaymentVerifier::new`.
    pub cache_capacity: usize,
    /// Local node's rewards address.
    /// The verifier rejects payments that don't include this node as a recipient.
    pub local_rewards_address: RewardsAddress,
}
84
/// Status returned by payment verification.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PaymentStatus {
    /// Data was found in local cache - previously paid.
    CachedAsVerified,
    /// New data - payment required.
    PaymentRequired,
    /// Payment was provided and verified.
    PaymentVerified,
}

impl PaymentStatus {
    /// Whether data in this state may be stored: either it was already
    /// paid for (cache hit) or a payment proof was just verified.
    #[must_use]
    pub fn can_store(&self) -> bool {
        match self {
            Self::CachedAsVerified | Self::PaymentVerified => true,
            Self::PaymentRequired => false,
        }
    }

    /// Whether this status came from the local verified-payment cache.
    #[must_use]
    pub fn is_cached(&self) -> bool {
        *self == Self::CachedAsVerified
    }
}
109
/// Default capacity for the merkle pool cache (number of pool hashes to cache).
/// Also sizes the closeness-pass and inflight-closeness caches (see `new`).
const DEFAULT_POOL_CACHE_CAPACITY: usize = 1_000;

/// Main payment verifier for ant-node.
///
/// Uses:
/// 1. LRU cache for fast lookups of previously verified `XorName` values
/// 2. EVM payment verification for new data (always required)
/// 3. Pool-level cache for merkle batch payments (avoids repeated on-chain queries)
pub struct PaymentVerifier {
    /// LRU cache of verified `XorName` values.
    cache: VerifiedCache,
    /// LRU cache of verified merkle pool hashes → on-chain payment info.
    pool_cache: Mutex<LruCache<PoolHash, OnChainPaymentInfo>>,
    /// LRU cache of pool hashes whose candidate closeness has already been
    /// verified by this node. Collapses the per-chunk Kademlia lookup cost
    /// within a batch (256 chunks × 1 pool = 1 lookup instead of 256).
    closeness_pass_cache: Mutex<LruCache<PoolHash, ()>>,
    /// In-flight closeness lookups, keyed by pool hash. Lets concurrent PUTs
    /// for the same pool coalesce onto a single Kademlia lookup AND share
    /// its result — on both success and failure — which bounds `DoS`
    /// amplification to one lookup per unique `pool_hash` regardless of
    /// concurrency.
    inflight_closeness: Mutex<LruCache<PoolHash, Arc<ClosenessSlot>>>,
    /// P2P node handle, attached post-construction so merkle verification can
    /// check that candidate `pub_keys` map to peers actually close to the pool
    /// midpoint in the live DHT. `None` in unit tests that don't exercise
    /// merkle verification; production startup MUST call [`attach_p2p_node`].
    p2p_node: RwLock<Option<Arc<P2PNode>>>,
    /// Configuration.
    config: PaymentVerifierConfig,
}
142
/// Shared state for an inflight closeness verification. The leader publishes
/// its result via the `OnceLock`; waiters read that result directly instead
/// of racing on a cache re-check. Wrapped in an `Arc` and held both by the
/// leader's drop guard and by each waiting task.
struct ClosenessSlot {
    /// Wake-up signal for waiters; fired by `InflightGuard`'s `Drop` impl.
    notify: Arc<tokio::sync::Notify>,
    /// `Some(Ok(()))` on success, `Some(Err(msg))` on failure, `None` if the
    /// leader disappeared without publishing (panic, cancellation).
    result: std::sync::OnceLock<std::result::Result<(), String>>,
}

impl ClosenessSlot {
    /// Fresh slot with no waiters and no published result.
    fn new() -> Self {
        Self {
            notify: Arc::new(tokio::sync::Notify::new()),
            result: std::sync::OnceLock::new(),
        }
    }

    /// Build an owned `Notified` future that snapshots the `notify_waiters`
    /// counter at call time. Awaiting this future after dropping external
    /// locks is race-free: if `notify_waiters` fires between construction
    /// and the first poll, the snapshot mismatch resolves the future
    /// immediately.
    ///
    /// NOTE(review): relies on `Notify::notified_owned` returning
    /// `tokio::sync::futures::OwnedNotified` — confirm the pinned tokio
    /// version exports both.
    fn notified_owned(&self) -> tokio::sync::futures::OwnedNotified {
        Arc::clone(&self.notify).notified_owned()
    }
}
171
/// Drop guard that publishes the leader's result, clears the inflight slot,
/// and wakes all waiters. Fires on every exit path: success, failure, panic,
/// future-cancellation.
///
/// The guard owns its own `Arc<ClosenessSlot>` so `notify_waiters` still
/// fires even if LRU pressure evicted the slot before the leader finished.
/// Waiters see the published result via `result.get()`; the `Notify` is only
/// the wake-up signal.
struct InflightGuard<'a> {
    /// The shared inflight-slot cache this guard cleans up on drop.
    slot_cache: &'a Mutex<LruCache<PoolHash, Arc<ClosenessSlot>>>,
    /// Key under which this leader's slot was inserted.
    pool_hash: PoolHash,
    /// Independently owned handle to the slot (survives LRU eviction).
    slot: Arc<ClosenessSlot>,
}
185
186impl InflightGuard<'_> {
187    /// Publish the leader's result. Called exactly once by the leader on
188    /// every successful or explicit-error exit. If dropped without calling
189    /// (panic, cancellation) the guard still wakes waiters but leaves
190    /// `result` empty, which waiters treat as a transient failure and retry.
191    fn publish(&self, result: &Result<()>) {
192        let stored: std::result::Result<(), String> = match result {
193            Ok(()) => Ok(()),
194            Err(e) => Err(e.to_string()),
195        };
196        let _ = self.slot.result.set(stored);
197    }
198}
199
200impl Drop for InflightGuard<'_> {
201    fn drop(&mut self) {
202        // Remove the slot entry if it's still ours. A separate leader may
203        // have inserted a new slot for the same pool_hash after LRU
204        // eviction — don't pop someone else's entry.
205        {
206            let mut cache = self.slot_cache.lock();
207            if let Some(existing) = cache.peek(&self.pool_hash) {
208                if Arc::ptr_eq(existing, &self.slot) {
209                    cache.pop(&self.pool_hash);
210                }
211            }
212        }
213        // Wake every waiter registered against OUR slot, regardless of
214        // whether the cache entry is still ours.
215        self.slot.notify.notify_waiters();
216    }
217}
218
219impl PaymentVerifier {
    /// Create a new payment verifier.
    ///
    /// Builds the `XorName` LRU from `config.cache_capacity` plus three
    /// pool-keyed LRUs (verified pools, closeness passes, inflight slots),
    /// all sized by `DEFAULT_POOL_CACHE_CAPACITY`. The `P2PNode` handle
    /// starts detached; production startup must follow up with
    /// [`attach_p2p_node`].
    #[must_use]
    pub fn new(config: PaymentVerifierConfig) -> Self {
        // Compile-time guarantee that the capacity is non-zero, so the
        // `unwrap_or(NonZeroUsize::MIN)` fallback below can never fire.
        const _: () = assert!(
            DEFAULT_POOL_CACHE_CAPACITY > 0,
            "pool cache capacity must be > 0"
        );
        let cache = VerifiedCache::with_capacity(config.cache_capacity);
        let pool_cache_size =
            NonZeroUsize::new(DEFAULT_POOL_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
        let pool_cache = Mutex::new(LruCache::new(pool_cache_size));
        let closeness_pass_cache = Mutex::new(LruCache::new(pool_cache_size));
        let inflight_closeness = Mutex::new(LruCache::new(pool_cache_size));

        let cache_capacity = config.cache_capacity;
        info!("Payment verifier initialized (cache_capacity={cache_capacity}, evm=always-on, pool_cache={DEFAULT_POOL_CACHE_CAPACITY})");

        // Loud warning if a production binary was accidentally built with
        // `test-utils`: that feature flips the closeness-check fail-open
        // switch, disabling the pay-yourself defence when P2PNode isn't
        // attached. Safe in tests, never intended for prod.
        #[cfg(feature = "test-utils")]
        crate::logging::error!(
            "PaymentVerifier: built with `test-utils` feature — merkle closeness \
             defence falls back to fail-open when no P2PNode is attached. This \
             feature is for test binaries only; production nodes must be built \
             without it."
        );

        Self {
            cache,
            pool_cache,
            closeness_pass_cache,
            inflight_closeness,
            p2p_node: RwLock::new(None),
            config,
        }
    }
258
    /// Attach the node's [`P2PNode`] handle so merkle-payment verification can
    /// check candidate `pub_keys` against the DHT's actual closest peers to the
    /// pool midpoint.
    ///
    /// Production startup MUST call this once the `P2PNode` exists. Without
    /// it, the closeness check fails CLOSED in release builds (rejects the
    /// PUT with a visible error) and fails open in test builds. Idempotent:
    /// calling twice replaces the handle. Takes the `RwLock` write guard
    /// only for the duration of the assignment.
    pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
        *self.p2p_node.write() = Some(node);
        debug!("PaymentVerifier: P2PNode attached for merkle closeness checks");
    }
271
272    /// Check if payment is required for the given `XorName`.
273    ///
274    /// This is the main entry point for payment verification:
275    /// 1. Check LRU cache (fast path)
276    /// 2. If not cached, payment is required
277    ///
278    /// # Arguments
279    ///
280    /// * `xorname` - The content-addressed name of the data
281    ///
282    /// # Returns
283    ///
284    /// * `PaymentStatus::CachedAsVerified` - Found in local cache (previously paid)
285    /// * `PaymentStatus::PaymentRequired` - Not cached (payment required)
286    pub fn check_payment_required(&self, xorname: &XorName) -> PaymentStatus {
287        // Check LRU cache (fast path)
288        if self.cache.contains(xorname) {
289            if crate::logging::enabled!(crate::logging::Level::DEBUG) {
290                debug!("Data {} found in verified cache", hex::encode(xorname));
291            }
292            return PaymentStatus::CachedAsVerified;
293        }
294
295        // Not in cache - payment required
296        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
297            debug!(
298                "Data {} not in cache - payment required",
299                hex::encode(xorname)
300            );
301        }
302        PaymentStatus::PaymentRequired
303    }
304
    /// Verify that a PUT request has valid payment.
    ///
    /// This is the complete payment verification flow:
    /// 1. Check if data is in cache (previously paid)
    /// 2. If not, verify the provided payment proof
    ///
    /// # Arguments
    ///
    /// * `xorname` - The content-addressed name of the data
    /// * `payment_proof` - Optional payment proof (required if not in cache)
    ///
    /// # Returns
    ///
    /// * `Ok(PaymentStatus)` - Verification succeeded
    /// * `Err(Error::Payment)` - No payment and not cached, or payment invalid
    ///
    /// # Errors
    ///
    /// Returns an error if payment is required but not provided, or if payment is invalid.
    pub async fn verify_payment(
        &self,
        xorname: &XorName,
        payment_proof: Option<&[u8]>,
    ) -> Result<PaymentStatus> {
        // First check if payment is required
        let status = self.check_payment_required(xorname);

        match status {
            PaymentStatus::CachedAsVerified => {
                // No payment needed - already in cache
                Ok(status)
            }
            PaymentStatus::PaymentRequired => {
                // EVM verification is always on — verify the proof
                if let Some(proof) = payment_proof {
                    // Size gate before any parsing: bounds memory use and
                    // rejects obviously malformed proofs cheaply.
                    let proof_len = proof.len();
                    if proof_len < MIN_PAYMENT_PROOF_SIZE_BYTES {
                        return Err(Error::Payment(format!(
                            "Payment proof too small: {proof_len} bytes (min {MIN_PAYMENT_PROOF_SIZE_BYTES})"
                        )));
                    }
                    if proof_len > MAX_PAYMENT_PROOF_SIZE_BYTES {
                        return Err(Error::Payment(format!(
                            "Payment proof too large: {proof_len} bytes (max {MAX_PAYMENT_PROOF_SIZE_BYTES} bytes)"
                        )));
                    }

                    // Detect proof type from version tag byte
                    match detect_proof_type(proof) {
                        Some(ProofType::Merkle) => {
                            self.verify_merkle_payment(xorname, proof).await?;
                        }
                        Some(ProofType::SingleNode) => {
                            let (payment, tx_hashes) = deserialize_proof(proof).map_err(|e| {
                                Error::Payment(format!("Failed to deserialize payment proof: {e}"))
                            })?;

                            if !tx_hashes.is_empty() {
                                debug!("Proof includes {} transaction hash(es)", tx_hashes.len());
                            }

                            self.verify_evm_payment(xorname, &payment).await?;
                        }
                        None => {
                            let tag = proof.first().copied().unwrap_or(0);
                            return Err(Error::Payment(format!(
                                "Unknown payment proof type tag: 0x{tag:02x}"
                            )));
                        }
                    }

                    // Cache the verified xorname. Inserted only after full
                    // verification succeeded (any failure above returned
                    // early), so a bad proof never poisons the cache.
                    self.cache.insert(*xorname);

                    Ok(PaymentStatus::PaymentVerified)
                } else {
                    // No payment provided in production mode
                    let xorname_hex = hex::encode(xorname);
                    Err(Error::Payment(format!(
                        "Payment required for new data {xorname_hex}"
                    )))
                }
            }
            // check_payment_required only ever returns the two variants
            // above; this arm exists to keep the match exhaustive without
            // a panic path.
            PaymentStatus::PaymentVerified => Err(Error::Payment(
                "Unexpected PaymentVerified status from check_payment_required".to_string(),
            )),
        }
    }
393
    /// Get cache statistics.
    ///
    /// Delegates to the underlying [`VerifiedCache`].
    #[must_use]
    pub fn cache_stats(&self) -> CacheStats {
        self.cache.stats()
    }

    /// Get the number of cached entries.
    #[must_use]
    pub fn cache_len(&self) -> usize {
        self.cache.len()
    }
405
    /// Pre-populate the payment cache for a given address.
    ///
    /// This marks the address as already paid, so subsequent `verify_payment`
    /// calls will return `CachedAsVerified` without on-chain verification.
    /// Useful for test setups where real EVM payment is not needed.
    /// Compiled only for unit tests and `test-utils` builds.
    #[cfg(any(test, feature = "test-utils"))]
    pub fn cache_insert(&self, xorname: XorName) {
        self.cache.insert(xorname);
    }
415
416    /// Pre-populate the merkle pool cache. Testing helper that lets e2e tests
417    /// bypass the on-chain `completedMerklePayments` lookup when the point of
418    /// the test is to exercise merkle-verification logic BEFORE the on-chain
419    /// call (e.g. the pay-yourself closeness check).
420    #[cfg(any(test, feature = "test-utils"))]
421    pub fn pool_cache_insert(&self, pool_hash: PoolHash, info: OnChainPaymentInfo) {
422        let mut cache = self.pool_cache.lock();
423        cache.put(pool_hash, info);
424    }
425
    /// Verify a single-node EVM payment proof.
    ///
    /// Verification steps:
    /// 1. Exactly `CLOSE_GROUP_SIZE` quotes are present
    /// 2. All quotes target the correct content address (xorname binding)
    /// 3. Quote timestamps are fresh (not expired or future-dated)
    /// 4. Peer ID bindings match the ML-DSA-65 public keys
    /// 5. This node is among the quoted recipients
    /// 6. All ML-DSA-65 signatures are valid (offloaded to `spawn_blocking`)
    /// 7. The median-priced quote was paid at least 3x its price on-chain
    ///    (looked up via `completedPayments(quoteHash)` on the payment vault)
    ///
    /// Cheap structural checks (1-5) run first so malformed proofs are
    /// rejected before any signature work or network traffic.
    ///
    /// For unit tests that don't need on-chain verification, pre-populate
    /// the cache so `verify_payment` returns `CachedAsVerified` before
    /// reaching this method.
    async fn verify_evm_payment(&self, xorname: &XorName, payment: &ProofOfPayment) -> Result<()> {
        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
            let xorname_hex = hex::encode(xorname);
            let quote_count = payment.peer_quotes.len();
            debug!("Verifying EVM payment for {xorname_hex} with {quote_count} quotes");
        }

        Self::validate_quote_structure(payment)?;
        Self::validate_quote_content(payment, xorname)?;
        Self::validate_quote_timestamps(payment)?;
        Self::validate_peer_bindings(payment)?;
        self.validate_local_recipient(payment)?;

        // Verify quote signatures (CPU-bound, run off async runtime).
        // The quotes are cloned because spawn_blocking needs owned data.
        let peer_quotes = payment.peer_quotes.clone();
        tokio::task::spawn_blocking(move || {
            for (encoded_peer_id, quote) in &peer_quotes {
                if !verify_quote_signature(quote) {
                    return Err(Error::Payment(
                        format!("Quote ML-DSA-65 signature verification failed for peer {encoded_peer_id:?}"),
                    ));
                }
            }
            Ok(())
        })
        .await
        .map_err(|e| Error::Payment(format!("Signature verification task failed: {e}")))??;

        // Reconstruct the SingleNodePayment to identify the median quote.
        // from_quotes() sorts by price and marks the median for 3x payment.
        let quotes_with_prices: Vec<_> = payment
            .peer_quotes
            .iter()
            .map(|(_, quote)| (quote.clone(), quote.price))
            .collect();
        let single_payment = SingleNodePayment::from_quotes(quotes_with_prices).map_err(|e| {
            Error::Payment(format!(
                "Failed to reconstruct payment for verification: {e}"
            ))
        })?;

        // Verify the median quote was paid at least 3x its price on-chain
        // via completedPayments(quoteHash) on the payment vault contract.
        let verified_amount = single_payment
            .verify(&self.config.evm.network)
            .await
            .map_err(|e| {
                let xorname_hex = hex::encode(xorname);
                Error::Payment(format!(
                    "Median quote payment verification failed for {xorname_hex}: {e}"
                ))
            })?;

        if crate::logging::enabled!(crate::logging::Level::INFO) {
            let xorname_hex = hex::encode(xorname);
            info!("EVM payment verified for {xorname_hex} (median paid {verified_amount} atto)");
        }
        Ok(())
    }
500
501    /// Validate quote count, uniqueness, and basic structure.
502    fn validate_quote_structure(payment: &ProofOfPayment) -> Result<()> {
503        if payment.peer_quotes.is_empty() {
504            return Err(Error::Payment("Payment has no quotes".to_string()));
505        }
506
507        let quote_count = payment.peer_quotes.len();
508        if quote_count != CLOSE_GROUP_SIZE {
509            return Err(Error::Payment(format!(
510                "Payment must have exactly {CLOSE_GROUP_SIZE} quotes, got {quote_count}"
511            )));
512        }
513
514        let mut seen: Vec<&evmlib::EncodedPeerId> = Vec::with_capacity(quote_count);
515        for (encoded_peer_id, _) in &payment.peer_quotes {
516            if seen.contains(&encoded_peer_id) {
517                return Err(Error::Payment(format!(
518                    "Duplicate peer ID in payment quotes: {encoded_peer_id:?}"
519                )));
520            }
521            seen.push(encoded_peer_id);
522        }
523
524        Ok(())
525    }
526
527    /// Verify all quotes target the correct content address.
528    fn validate_quote_content(payment: &ProofOfPayment, xorname: &XorName) -> Result<()> {
529        for (encoded_peer_id, quote) in &payment.peer_quotes {
530            if !verify_quote_content(quote, xorname) {
531                let expected_hex = hex::encode(xorname);
532                let actual_hex = hex::encode(quote.content.0);
533                return Err(Error::Payment(format!(
534                    "Quote content address mismatch for peer {encoded_peer_id:?}: expected {expected_hex}, got {actual_hex}"
535                )));
536            }
537        }
538        Ok(())
539    }
540
    /// Verify quote freshness — reject stale or excessively future quotes.
    ///
    /// Two-sided window: quotes older than `QUOTE_MAX_AGE_SECS` are expired;
    /// quotes dated more than `QUOTE_CLOCK_SKEW_TOLERANCE_SECS` into the
    /// future indicate a bad clock (or tampering) and are also rejected.
    fn validate_quote_timestamps(payment: &ProofOfPayment) -> Result<()> {
        let now = SystemTime::now();
        for (encoded_peer_id, quote) in &payment.peer_quotes {
            match now.duration_since(quote.timestamp) {
                // Timestamp is in the past: enforce the expiry window.
                Ok(age) => {
                    if age.as_secs() > QUOTE_MAX_AGE_SECS {
                        return Err(Error::Payment(format!(
                            "Quote from peer {encoded_peer_id:?} expired: age {}s exceeds max {QUOTE_MAX_AGE_SECS}s",
                            age.as_secs()
                        )));
                    }
                }
                // duration_since failed, so the timestamp is ahead of `now`:
                // measure the skew in the other direction.
                Err(_) => {
                    if let Ok(skew) = quote.timestamp.duration_since(now) {
                        if skew.as_secs() > QUOTE_CLOCK_SKEW_TOLERANCE_SECS {
                            return Err(Error::Payment(format!(
                                "Quote from peer {encoded_peer_id:?} has timestamp {}s in the future \
                                 (exceeds {QUOTE_CLOCK_SKEW_TOLERANCE_SECS}s tolerance)",
                                skew.as_secs()
                            )));
                        }
                    } else {
                        // Neither ordering produced a duration — defensive
                        // branch; reject rather than guess.
                        return Err(Error::Payment(format!(
                            "Quote from peer {encoded_peer_id:?} has invalid timestamp"
                        )));
                    }
                }
            }
        }
        Ok(())
    }
573
574    /// Verify each quote's `pub_key` matches the claimed peer ID via BLAKE3.
575    fn validate_peer_bindings(payment: &ProofOfPayment) -> Result<()> {
576        for (encoded_peer_id, quote) in &payment.peer_quotes {
577            let expected_peer_id = peer_id_from_public_key_bytes(&quote.pub_key)
578                .map_err(|e| Error::Payment(format!("Invalid ML-DSA public key in quote: {e}")))?;
579
580            if expected_peer_id.as_bytes() != encoded_peer_id.as_bytes() {
581                let expected_hex = expected_peer_id.to_hex();
582                let actual_hex = hex::encode(encoded_peer_id.as_bytes());
583                return Err(Error::Payment(format!(
584                    "Quote pub_key does not belong to claimed peer {encoded_peer_id:?}: \
585                     BLAKE3(pub_key) = {expected_hex}, peer_id = {actual_hex}"
586                )));
587            }
588        }
589        Ok(())
590    }
591
    /// Minimum number of candidate `pub_keys` (out of 16) whose derived `PeerId`
    /// must match the DHT's actual closest peers to the pool midpoint address.
    ///
    /// Set below 16/16 to absorb normal routing-table skew between the
    /// payer's view and this node's view — on a well-connected network the
    /// divergence between two nodes' closest-set views is typically 1-2
    /// peers, occasionally 3 during churn. 13/16 tolerates 3 divergent
    /// peers while still limiting how many candidates an attacker can
    /// fabricate before the check bites. A lower threshold (e.g. 9/16)
    /// would let an attacker who controls 7 real neighbourhood peers plant
    /// 7 fabricated candidates and still pass.
    ///
    /// This is the pure "fabricated key" defence; it does not stop an
    /// attacker who can grind the pool midpoint address to land near 13
    /// pre-chosen keys AND run those keys as Sybil DHT participants. That
    /// requires an orthogonal Sybil-resistance layer and is out of scope
    /// for this check.
    const CANDIDATE_CLOSENESS_REQUIRED: usize = 13;

    /// Timeout for the authoritative network lookup used by the closeness
    /// check.
    ///
    /// Iterative Kademlia lookups can cascade through up to 20 iterations,
    /// and a single unresponsive peer's dial can take 20-30s before timing
    /// out. 60s leaves room for the lookup to converge even under churn
    /// while still capping `DoS` amplification at roughly one bounded lookup
    /// per forged `pool_hash`.
    const CLOSENESS_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);

    /// Maximum waiter → leader retries when the leader's future was cancelled
    /// or panicked before publishing a result. Beyond this the waiter returns
    /// a visible error rather than spinning indefinitely through a
    /// cancellation cascade. Used by `verify_merkle_candidate_closeness`.
    const MAX_LEADER_RETRIES: usize = 4;
626
627    /// Verify that the candidate pool's `pub_keys` correspond to peers that
628    /// are actually XOR-closest to the pool midpoint address, by querying
629    /// the DHT for its closest peers to that address and requiring that a
630    /// majority of the candidates match.
631    ///
632    /// **What this blocks**: the "pay yourself" attack. Candidate signatures
633    /// only cover `(price, reward_address, timestamp)` and the `pub_key` bytes —
634    /// nothing ties a candidate to a network-registered identity or to the
635    /// pool neighbourhood. Without this check an attacker can generate 16
636    /// ML-DSA keypairs locally, point all 16 `reward_address` fields at a
637    /// single attacker-controlled wallet, submit the merkle payment, and drain
638    /// their own payment back out.
639    ///
640    /// **How it blocks**: each candidate's `PeerId = BLAKE3(pub_key)`; the DHT
641    /// is the authoritative source of "which peers exist at this XOR
642    /// coordinate". If the attacker's 16 fabricated `PeerId`s are not among
643    /// the peers the network actually lists as closest to the pool address,
644    /// the pool is forged.
645    ///
646    /// **Scope**: a `MerklePaymentProof` carries exactly one `winner_pool`
647    /// (the pool the smart contract selected for the batch). Every storing
648    /// node that receives the proof independently re-runs this check against
649    /// that same pool, so a forged pool is rejected at every node it
650    /// reaches.
651    ///
652    /// **Known limitation — Sybil-grinding**: `midpoint_proof.address()` is a
653    /// BLAKE3 hash of attacker-controllable inputs (leaf bytes, tree root,
654    /// timestamp). A determined attacker who *also* runs Sybil DHT nodes can
655    /// grind the midpoint until it lands in a region where 13 of their
656    /// Sybil keys are the true network-closest — at which point this check
657    /// passes for the attacker. Closing that gap requires binding the
658    /// midpoint to an attacker-uncontrolled value (e.g. a block hash at
659    /// payment time or an on-chain VRF) or a Sybil-resistant identity
660    /// layer. This defence raises the attack cost from "free" to "run N
661    /// Sybil nodes AND grind", which is a meaningful but not complete
662    /// improvement.
    ///
    /// # Errors
    /// Returns [`Error::Payment`] when the inner closeness check rejects the
    /// pool, when the authoritative network lookup fails or times out, or
    /// when successive leaders disappear without publishing a result
    /// (retry bound exhausted).
    async fn verify_merkle_candidate_closeness(
        &self,
        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
        pool_hash: PoolHash,
    ) -> Result<()> {
        // Fast path: this node already verified this pool successfully.
        // A batch of 256 chunks shares one winner_pool, so without this cache
        // we'd pay a Kademlia lookup per chunk.
        if self.closeness_pass_cache.lock().get(&pool_hash).is_some() {
            return Ok(());
        }

        // Single-flight: on each attempt, either claim leadership by
        // inserting a fresh `ClosenessSlot`, or wait on an existing leader
        // and read its published result. The leader holds an `Arc` to the
        // slot independent of the LruCache so waiters are still woken if
        // eviction pressure kicked the cache entry.
        //
        // The `notified_owned()` future snapshots the `notify_waiters`
        // counter at the moment of construction (while we hold the lock),
        // which makes the subsequent `.await` race-free: if the leader
        // calls `notify_waiters` between our construction and our poll, the
        // counter has advanced and the future resolves immediately on first
        // poll.
        //
        // Bounded retry: if we're a waiter and the leader gets cancelled or
        // panics (slot.result.get() == None after wake-up), we loop back to
        // claim leadership. `MAX_LEADER_RETRIES` bounds the attempts so
        // adversarial cancellation cascades cannot spin this indefinitely.
        for attempt in 0..=Self::MAX_LEADER_RETRIES {
            // Release the mutex guard explicitly before any await below.
            // Clippy wants `if let ... else` written as `map_or_else`, but
            // any such rewrite re-borrows the locked `inflight` inside the
            // closure and fails the borrow checker — so the lint is
            // silenced here.
            #[allow(clippy::option_if_let_else)]
            let (waiter_slot, leader_slot) = {
                let mut inflight = self.inflight_closeness.lock();
                // Exactly one of the two options is Some: waiter if a slot
                // already exists for this pool_hash, leader otherwise.
                let chosen = if let Some(existing) = inflight.get(&pool_hash) {
                    (Some(Arc::clone(existing)), None)
                } else {
                    let slot = Arc::new(ClosenessSlot::new());
                    inflight.put(pool_hash, Arc::clone(&slot));
                    (None, Some(slot))
                };
                drop(inflight);
                chosen
            };

            if let Some(slot) = waiter_slot {
                // Build the owned-notified future BEFORE awaiting, so it
                // snapshots the `notify_waiters` counter now. The slot
                // already existed when we locked, so the leader is either
                // running or finished; in both cases the snapshot + counter
                // check ensures we wake up correctly.
                let notified = slot.notified_owned();
                notified.await;

                // Leader published a result — use it directly.
                if let Some(result) = slot.result.get() {
                    return result.clone().map_err(Error::Payment);
                }
                // Leader disappeared without publishing (panic or
                // cancellation). Slot was cleared by the leader's drop
                // guard; loop to become the new leader — unless we've
                // hit the retry bound (see MAX_LEADER_RETRIES).
                if attempt == Self::MAX_LEADER_RETRIES {
                    return Err(Error::Payment(
                        "Merkle candidate pool rejected: closeness leader \
                         repeatedly failed to publish a result (likely \
                         repeated cancellation or panic)."
                            .into(),
                    ));
                }
                continue;
            }

            // Leader path. Drop guard clears the slot and wakes waiters on
            // every exit (success, failure, panic, cancellation).
            let Some(slot) = leader_slot else {
                // Unreachable by construction.
                return Err(Error::Payment(
                    "internal error: neither leader nor waiter in closeness check".into(),
                ));
            };
            let guard = InflightGuard {
                slot_cache: &self.inflight_closeness,
                pool_hash,
                slot,
            };

            // Run the authoritative DHT-backed check.
            let result = self.verify_merkle_candidate_closeness_inner(pool).await;
            // Record the outcome in the slot for waiters to read.
            guard.publish(&result);
            if result.is_ok() {
                // Only successes enter the pass cache — failures may be
                // transient (e.g. lookup timeout) and deserve a fresh attempt.
                self.closeness_pass_cache.lock().put(pool_hash, ());
            }
            return result;
        }
        // Unreachable: the for-loop body always either `return`s or `continue`s,
        // and the waiter branch's `continue` only runs when `attempt <
        // Self::MAX_LEADER_RETRIES`. The last iteration's waiter branch returns
        // via the retry-bound check; the leader branch always returns.
        Err(Error::Payment(
            "internal error: closeness retry loop exited without returning".into(),
        ))
    }
769
    /// Inner closeness check: the actual DHT lookup + set-membership test.
    /// Wrapped by [`verify_merkle_candidate_closeness`] with a pass-cache and
    /// single-flight guard so a batch of chunks and a storm of forged PUTs
    /// don't multiply the lookup cost.
    ///
    /// # Errors
    /// Returns [`Error::Payment`] when no `P2PNode` is attached (non-test
    /// builds fail closed), when a candidate public key fails to parse, when
    /// the network lookup fails or times out, or when fewer than
    /// `CANDIDATE_CLOSENESS_REQUIRED` candidates appear among the DHT's
    /// closest peers to the pool midpoint.
    async fn verify_merkle_candidate_closeness_inner(
        &self,
        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
    ) -> Result<()> {
        // Release the RwLock guard before any await to avoid holding it
        // across an iterative Kademlia lookup.
        let attached = self.p2p_node.read().as_ref().map(Arc::clone);
        let Some(p2p_node) = attached else {
            // Production must call attach_p2p_node at startup. Fail CLOSED
            // to avoid silently disabling the defence if a startup path
            // regresses and loses the attach call. Unit-test builds that
            // construct a PaymentVerifier directly without exercising merkle
            // verification are opted-in via `test-utils` to fall back to
            // fail-open.
            #[cfg(any(test, feature = "test-utils"))]
            {
                crate::logging::warn!(
                    "PaymentVerifier: no P2PNode attached; merkle pay-yourself \
                     defence SKIPPED (test build). Production startup MUST call \
                     PaymentVerifier::attach_p2p_node."
                );
                return Ok(());
            }
            #[cfg(not(any(test, feature = "test-utils")))]
            {
                crate::logging::error!(
                    "PaymentVerifier: no P2PNode attached; rejecting merkle \
                     payment. This is a node-startup bug — \
                     PaymentVerifier::attach_p2p_node must be called before \
                     any PUT handler runs."
                );
                return Err(Error::Payment(
                    "Merkle candidate pool rejected: verifier is not wired to \
                     the P2P layer; cannot verify candidate closeness."
                        .into(),
                ));
            }
        };

        // Closeness target: the winner pool's midpoint address.
        let pool_address = pool.midpoint_proof.address();

        // Derive each candidate's would-be PeerId from its pub_key. Fail
        // closed on malformed keys — the candidate signature check ran
        // upstream so a valid-looking pool ought to parse cleanly here.
        let mut candidate_peer_ids = Vec::with_capacity(pool.candidate_nodes.len());
        for candidate in &pool.candidate_nodes {
            let pid = peer_id_from_public_key_bytes(&candidate.pub_key).map_err(|e| {
                Error::Payment(format!(
                    "Invalid ML-DSA public key in merkle candidate: {e}"
                ))
            })?;
            candidate_peer_ids.push(pid);
        }

        // Request as many network-closest peers as there are candidates, so
        // every legitimate candidate has a chance to appear in the result.
        let lookup_count = pool.candidate_nodes.len();
        let network_lookup = p2p_node
            .dht_manager()
            .find_closest_nodes_network(&pool_address.0, lookup_count);
        let network_peers =
            match tokio::time::timeout(Self::CLOSENESS_LOOKUP_TIMEOUT, network_lookup).await {
                Ok(Ok(peers)) => peers,
                Ok(Err(e)) => {
                    debug!(
                        "Merkle closeness network-lookup failed for pool midpoint {}: {e}",
                        hex::encode(pool_address.0),
                    );
                    return Err(Error::Payment(
                        "Merkle candidate pool rejected: could not verify candidate \
                     closeness against the authoritative network view."
                            .into(),
                    ));
                }
                Err(_) => {
                    debug!(
                        "Merkle closeness network-lookup timeout ({:?}) for pool midpoint {}",
                        Self::CLOSENESS_LOOKUP_TIMEOUT,
                        hex::encode(pool_address.0),
                    );
                    return Err(Error::Payment(
                        "Merkle candidate pool rejected: authoritative network lookup \
                     timed out. Retry once the network lookup completes."
                            .into(),
                    ));
                }
            };

        // Set-membership check against the returned closest-peers list. The
        // lookup may return fewer than `lookup_count` on a sparse network,
        // which only tightens the bar — any candidate not in the returned
        // list counts as unmatched.
        let network_set: std::collections::HashSet<PeerId> =
            network_peers.iter().map(|n| n.peer_id).collect();
        let matched = candidate_peer_ids
            .iter()
            .filter(|pid| network_set.contains(pid))
            .count();

        if matched < Self::CANDIDATE_CLOSENESS_REQUIRED {
            debug!(
                "Merkle closeness rejected: {matched}/{} candidates match the DHT's closest peers \
                 for pool midpoint {} (required: {}, network returned {} peers)",
                pool.candidate_nodes.len(),
                hex::encode(pool_address.0),
                Self::CANDIDATE_CLOSENESS_REQUIRED,
                network_peers.len(),
            );
            return Err(Error::Payment(
                "Merkle candidate pool rejected: candidate pub_keys do not match the \
                 network's closest peers to the pool midpoint address. Pools must be \
                 collected from the pool-address close group, not fabricated off-network."
                    .into(),
            ));
        }

        debug!(
            "Merkle closeness passed: {matched}/{} candidates matched the DHT's closest peers \
             for pool midpoint {}",
            pool.candidate_nodes.len(),
            hex::encode(pool_address.0),
        );
        Ok(())
    }
896
    /// Verify a merkle batch payment proof.
    ///
    /// This verification flow:
    /// 1. Deserialize the `MerklePaymentProof`
    /// 2. Check pool cache for previously verified pool hash
    /// 3. If not cached, query on-chain for payment info
    /// 4. Validate the proof against on-chain data
    /// 5. Cache the pool hash for subsequent chunk verifications in the same batch
    ///
    /// # Errors
    /// Returns [`Error::Payment`] when any step fails: deserialization, the
    /// address match, a candidate signature, the closeness defence, the
    /// on-chain query, timestamp or merkle-proof consistency, or the
    /// paid-node index/address/amount checks.
    #[allow(clippy::too_many_lines)]
    async fn verify_merkle_payment(&self, xorname: &XorName, proof_bytes: &[u8]) -> Result<()> {
        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
            debug!("Verifying merkle payment for {}", hex::encode(xorname));
        }

        // Deserialize the merkle proof
        let merkle_proof = deserialize_merkle_proof(proof_bytes)
            .map_err(|e| Error::Payment(format!("Failed to deserialize merkle proof: {e}")))?;

        // Verify the address in the proof matches the xorname being stored
        if merkle_proof.address.0 != *xorname {
            let proof_hex = hex::encode(merkle_proof.address.0);
            let store_hex = hex::encode(xorname);
            return Err(Error::Payment(format!(
                "Merkle proof address mismatch: proof is for {proof_hex}, but storing {store_hex}"
            )));
        }

        let pool_hash = merkle_proof.winner_pool_hash();

        // Run cheap local checks BEFORE expensive on-chain queries.
        // This prevents DoS via garbage proofs that trigger RPC lookups.
        for candidate in &merkle_proof.winner_pool.candidate_nodes {
            if !crate::payment::verify_merkle_candidate_signature(candidate) {
                return Err(Error::Payment(format!(
                    "Invalid ML-DSA-65 signature on merkle candidate node (reward: {})",
                    candidate.reward_address
                )));
            }
        }

        // Pay-yourself defence: the candidate pub_keys must map to peers the
        // live DHT actually considers closest to the pool midpoint. Without
        // this, an attacker can point all 16 reward_address fields at a
        // self-owned wallet and drain their own payment. Every storing node
        // runs this check against the single `winner_pool` in the proof, so a
        // forged pool is rejected everywhere it lands. The pass cache and
        // single-flight keyed on pool_hash collapse the Kademlia lookup cost
        // within a batch and across concurrent PUTs for the same pool.
        self.verify_merkle_candidate_closeness(&merkle_proof.winner_pool, pool_hash)
            .await?;

        // Check pool cache first
        let cached_info = {
            let mut pool_cache = self.pool_cache.lock();
            pool_cache.get(&pool_hash).cloned()
        };

        let payment_info = if let Some(info) = cached_info {
            debug!("Pool cache hit for hash {}", hex::encode(pool_hash));
            info
        } else {
            // Query on-chain for completed merkle payment
            let info =
                payment_vault::get_completed_merkle_payment(&self.config.evm.network, pool_hash)
                    .await
                    .map_err(|e| {
                        let pool_hex = hex::encode(pool_hash);
                        Error::Payment(format!(
                            "Failed to query merkle payment info for pool {pool_hex}: {e}"
                        ))
                    })?;

            // Re-shape the contract's paid-node records into
            // (address, pool index, amount) tuples for local validation.
            let paid_node_addresses: Vec<_> = info
                .paidNodeAddresses
                .iter()
                .map(|pna| (pna.rewardsAddress, usize::from(pna.poolIndex), pna.amount))
                .collect();

            let on_chain_info = OnChainPaymentInfo {
                depth: info.depth,
                merkle_payment_timestamp: info.merklePaymentTimestamp,
                paid_node_addresses,
            };

            // Cache the pool info for subsequent chunks in the same batch
            {
                let mut pool_cache = self.pool_cache.lock();
                pool_cache.put(pool_hash, on_chain_info.clone());
            }

            debug!(
                "Queried on-chain merkle payment info for pool {}: depth={}, timestamp={}, paid_nodes={}",
                hex::encode(pool_hash),
                on_chain_info.depth,
                on_chain_info.merkle_payment_timestamp,
                on_chain_info.paid_node_addresses.len()
            );

            on_chain_info
        };

        // Verify timestamp consistency (signatures already checked above before RPC).
        for candidate in &merkle_proof.winner_pool.candidate_nodes {
            if candidate.merkle_payment_timestamp != payment_info.merkle_payment_timestamp {
                return Err(Error::Payment(format!(
                    "Candidate timestamp mismatch: expected {}, got {} (reward: {})",
                    payment_info.merkle_payment_timestamp,
                    candidate.merkle_payment_timestamp,
                    candidate.reward_address
                )));
            }
        }

        // Get the root from the winner pool's midpoint proof
        let smart_contract_root = merkle_proof.winner_pool.midpoint_proof.root();

        // Verify the cryptographic merkle proofs (address belongs to tree,
        // midpoint belongs to tree, roots match, timestamps valid).
        evmlib::merkle_payments::verify_merkle_proof(
            &merkle_proof.address,
            &merkle_proof.data_proof,
            &merkle_proof.winner_pool.midpoint_proof,
            payment_info.depth,
            smart_contract_root,
            payment_info.merkle_payment_timestamp,
        )
        .map_err(|e| {
            let xorname_hex = hex::encode(xorname);
            Error::Payment(format!(
                "Merkle proof verification failed for {xorname_hex}: {e}"
            ))
        })?;

        // Verify paid node count matches depth
        // (the contract pays exactly `depth` nodes — see note below).
        let expected_depth = payment_info.depth as usize;
        let actual_paid = payment_info.paid_node_addresses.len();
        if actual_paid != expected_depth {
            return Err(Error::Payment(format!(
                "Wrong number of paid nodes: expected {expected_depth}, got {actual_paid}"
            )));
        }

        // Compute expected per-node payment using the contract formula:
        // totalAmount = median16(candidate_prices) * (1 << depth)
        // amountPerNode = totalAmount / depth
        let expected_per_node = if payment_info.depth > 0 {
            let mut candidate_prices: Vec<Amount> = merkle_proof
                .winner_pool
                .candidate_nodes
                .iter()
                .map(|c| c.price)
                .collect();
            candidate_prices.sort_unstable(); // ascending
                                              // Upper median (index 8 of 16) — matches Solidity's median16 (k = 8)
            let median_price = *candidate_prices
                .get(candidate_prices.len() / 2)
                .ok_or_else(|| Error::Payment("empty candidate pool in merkle proof".into()))?;
            let shift = u32::from(payment_info.depth);
            let multiplier = 1u64
                .checked_shl(shift)
                .ok_or_else(|| Error::Payment("merkle proof depth too large".into()))?;
            let total_amount = median_price * Amount::from(multiplier);
            total_amount / Amount::from(u64::from(payment_info.depth))
        } else {
            Amount::ZERO
        };

        // Verify paid node indices, addresses, and amounts against the candidate pool.
        //
        // Each paid node must:
        // 1. Have a valid index within the candidate pool
        // 2. Match the expected reward address at that index
        // 3. Have been paid at least the expected per-node amount from the
        //    contract formula: median16(prices) * 2^depth / depth
        //
        // Note: unlike single-node payments, merkle proofs are NOT bound to a
        // specific storing node. The contract pays `depth` random nodes from the
        // winner pool; the storing node is whichever close-group peer the client
        // routes the chunk to. There is no local-recipient check here because
        // any node that can verify the merkle proof is allowed to store the chunk.
        // Replay protection comes from the per-address proof binding (each proof
        // is for a specific XorName in the paid tree).
        for (addr, idx, paid_amount) in &payment_info.paid_node_addresses {
            let node = merkle_proof
                .winner_pool
                .candidate_nodes
                .get(*idx)
                .ok_or_else(|| {
                    Error::Payment(format!(
                        "Paid node index {idx} out of bounds for pool size {}",
                        merkle_proof.winner_pool.candidate_nodes.len()
                    ))
                })?;
            if node.reward_address != *addr {
                return Err(Error::Payment(format!(
                    "Paid node address mismatch at index {idx}: expected {addr}, got {}",
                    node.reward_address
                )));
            }
            if *paid_amount < expected_per_node {
                return Err(Error::Payment(format!(
                    "Underpayment for node at index {idx}: paid {paid_amount}, \
                     expected at least {expected_per_node} \
                     (median16 formula, depth={})",
                    payment_info.depth
                )));
            }
        }

        if crate::logging::enabled!(crate::logging::Level::INFO) {
            info!(
                "Merkle payment verified for {} (pool: {})",
                hex::encode(xorname),
                hex::encode(pool_hash)
            );
        }

        Ok(())
    }
1116
1117    /// Verify this node is among the paid recipients.
1118    fn validate_local_recipient(&self, payment: &ProofOfPayment) -> Result<()> {
1119        let local_addr = &self.config.local_rewards_address;
1120        let is_recipient = payment
1121            .peer_quotes
1122            .iter()
1123            .any(|(_, quote)| quote.rewards_address == *local_addr);
1124        if !is_recipient {
1125            return Err(Error::Payment(
1126                "Payment proof does not include this node as a recipient".to_string(),
1127            ));
1128        }
1129        Ok(())
1130    }
1131}
1132
1133#[cfg(test)]
1134#[allow(clippy::expect_used)]
1135mod tests {
1136    use super::*;
1137    use evmlib::merkle_payments::MerklePaymentCandidatePool;
1138
1139    /// Create a verifier for unit tests. EVM is always on, but tests can
1140    /// pre-populate the cache to bypass on-chain verification.
1141    fn create_test_verifier() -> PaymentVerifier {
1142        let config = PaymentVerifierConfig {
1143            evm: EvmVerifierConfig::default(),
1144            cache_capacity: 100,
1145            local_rewards_address: RewardsAddress::new([1u8; 20]),
1146        };
1147        PaymentVerifier::new(config)
1148    }
1149
1150    #[test]
1151    fn test_payment_required_for_new_data() {
1152        let verifier = create_test_verifier();
1153        let xorname = [1u8; 32];
1154
1155        // All uncached data requires payment
1156        let status = verifier.check_payment_required(&xorname);
1157        assert_eq!(status, PaymentStatus::PaymentRequired);
1158    }
1159
1160    #[test]
1161    fn test_cache_hit() {
1162        let verifier = create_test_verifier();
1163        let xorname = [1u8; 32];
1164
1165        // Manually add to cache
1166        verifier.cache.insert(xorname);
1167
1168        // Should return CachedAsVerified
1169        let status = verifier.check_payment_required(&xorname);
1170        assert_eq!(status, PaymentStatus::CachedAsVerified);
1171    }
1172
1173    #[tokio::test]
1174    async fn test_verify_payment_without_proof_rejected() {
1175        let verifier = create_test_verifier();
1176        let xorname = [1u8; 32];
1177
1178        // No proof provided => should return an error (EVM is always on)
1179        let result = verifier.verify_payment(&xorname, None).await;
1180        assert!(
1181            result.is_err(),
1182            "Expected Err without proof, got: {result:?}"
1183        );
1184    }
1185
1186    #[tokio::test]
1187    async fn test_verify_payment_cached() {
1188        let verifier = create_test_verifier();
1189        let xorname = [1u8; 32];
1190
1191        // Add to cache — simulates previously-paid data
1192        verifier.cache.insert(xorname);
1193
1194        // Should succeed without payment (cached)
1195        let result = verifier.verify_payment(&xorname, None).await;
1196        assert!(result.is_ok());
1197        assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified);
1198    }
1199
1200    #[test]
1201    fn test_payment_status_can_store() {
1202        assert!(PaymentStatus::CachedAsVerified.can_store());
1203        assert!(PaymentStatus::PaymentVerified.can_store());
1204        assert!(!PaymentStatus::PaymentRequired.can_store());
1205    }
1206
1207    #[test]
1208    fn test_payment_status_is_cached() {
1209        assert!(PaymentStatus::CachedAsVerified.is_cached());
1210        assert!(!PaymentStatus::PaymentVerified.is_cached());
1211        assert!(!PaymentStatus::PaymentRequired.is_cached());
1212    }
1213
1214    #[tokio::test]
1215    async fn test_cache_preload_bypasses_evm() {
1216        let verifier = create_test_verifier();
1217        let xorname = [42u8; 32];
1218
1219        // Not yet cached — should require payment
1220        assert_eq!(
1221            verifier.check_payment_required(&xorname),
1222            PaymentStatus::PaymentRequired
1223        );
1224
1225        // Pre-populate cache (simulates a previous successful payment)
1226        verifier.cache.insert(xorname);
1227
1228        // Now the xorname should be cached
1229        assert_eq!(
1230            verifier.check_payment_required(&xorname),
1231            PaymentStatus::CachedAsVerified
1232        );
1233    }
1234
1235    #[tokio::test]
1236    async fn test_proof_too_small() {
1237        let verifier = create_test_verifier();
1238        let xorname = [1u8; 32];
1239
1240        // Proof smaller than MIN_PAYMENT_PROOF_SIZE_BYTES
1241        let small_proof = vec![0u8; MIN_PAYMENT_PROOF_SIZE_BYTES - 1];
1242        let result = verifier.verify_payment(&xorname, Some(&small_proof)).await;
1243        assert!(result.is_err());
1244        let err_msg = format!("{}", result.expect_err("should fail"));
1245        assert!(
1246            err_msg.contains("too small"),
1247            "Error should mention 'too small': {err_msg}"
1248        );
1249    }
1250
1251    #[tokio::test]
1252    async fn test_proof_too_large() {
1253        let verifier = create_test_verifier();
1254        let xorname = [2u8; 32];
1255
1256        // Proof larger than MAX_PAYMENT_PROOF_SIZE_BYTES
1257        let large_proof = vec![0u8; MAX_PAYMENT_PROOF_SIZE_BYTES + 1];
1258        let result = verifier.verify_payment(&xorname, Some(&large_proof)).await;
1259        assert!(result.is_err());
1260        let err_msg = format!("{}", result.expect_err("should fail"));
1261        assert!(
1262            err_msg.contains("too large"),
1263            "Error should mention 'too large': {err_msg}"
1264        );
1265    }
1266
1267    #[tokio::test]
1268    async fn test_proof_at_min_boundary_unknown_tag() {
1269        let verifier = create_test_verifier();
1270        let xorname = [3u8; 32];
1271
1272        // Exactly MIN_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected
1273        let boundary_proof = vec![0xFFu8; MIN_PAYMENT_PROOF_SIZE_BYTES];
1274        let result = verifier
1275            .verify_payment(&xorname, Some(&boundary_proof))
1276            .await;
1277        assert!(result.is_err());
1278        let err_msg = format!("{}", result.expect_err("should fail"));
1279        assert!(
1280            err_msg.contains("Unknown payment proof type tag"),
1281            "Error should mention unknown tag: {err_msg}"
1282        );
1283    }
1284
1285    #[tokio::test]
1286    async fn test_proof_at_max_boundary_unknown_tag() {
1287        let verifier = create_test_verifier();
1288        let xorname = [4u8; 32];
1289
1290        // Exactly MAX_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected
1291        let boundary_proof = vec![0xFFu8; MAX_PAYMENT_PROOF_SIZE_BYTES];
1292        let result = verifier
1293            .verify_payment(&xorname, Some(&boundary_proof))
1294            .await;
1295        assert!(result.is_err());
1296        let err_msg = format!("{}", result.expect_err("should fail"));
1297        assert!(
1298            err_msg.contains("Unknown payment proof type tag"),
1299            "Error should mention unknown tag: {err_msg}"
1300        );
1301    }
1302
1303    #[tokio::test]
1304    async fn test_malformed_single_node_proof() {
1305        let verifier = create_test_verifier();
1306        let xorname = [5u8; 32];
1307
1308        // Valid tag (0x01) but garbage payload — should fail deserialization
1309        let mut garbage = vec![crate::ant_protocol::PROOF_TAG_SINGLE_NODE];
1310        garbage.extend_from_slice(&[0xAB; 63]);
1311        let result = verifier.verify_payment(&xorname, Some(&garbage)).await;
1312        assert!(result.is_err());
1313        let err_msg = format!("{}", result.expect_err("should fail"));
1314        assert!(
1315            err_msg.contains("deserialize") || err_msg.contains("Failed"),
1316            "Error should mention deserialization failure: {err_msg}"
1317        );
1318    }
1319
1320    #[test]
1321    fn test_cache_len_getter() {
1322        let verifier = create_test_verifier();
1323        assert_eq!(verifier.cache_len(), 0);
1324
1325        verifier.cache.insert([10u8; 32]);
1326        assert_eq!(verifier.cache_len(), 1);
1327
1328        verifier.cache.insert([20u8; 32]);
1329        assert_eq!(verifier.cache_len(), 2);
1330    }
1331
1332    #[test]
1333    fn test_cache_stats_after_operations() {
1334        let verifier = create_test_verifier();
1335        let xorname = [7u8; 32];
1336
1337        // Miss
1338        verifier.check_payment_required(&xorname);
1339        let stats = verifier.cache_stats();
1340        assert_eq!(stats.misses, 1);
1341        assert_eq!(stats.hits, 0);
1342
1343        // Insert and hit
1344        verifier.cache.insert(xorname);
1345        verifier.check_payment_required(&xorname);
1346        let stats = verifier.cache_stats();
1347        assert_eq!(stats.hits, 1);
1348        assert_eq!(stats.misses, 1);
1349        assert_eq!(stats.additions, 1);
1350    }
1351
1352    #[tokio::test]
1353    async fn test_concurrent_cache_lookups() {
1354        let verifier = std::sync::Arc::new(create_test_verifier());
1355
1356        // Pre-populate cache for all 10 xornames
1357        for i in 0..10u8 {
1358            verifier.cache.insert([i; 32]);
1359        }
1360
1361        let mut handles = Vec::new();
1362        for i in 0..10u8 {
1363            let v = verifier.clone();
1364            handles.push(tokio::spawn(async move {
1365                let xorname = [i; 32];
1366                v.verify_payment(&xorname, None).await
1367            }));
1368        }
1369
1370        for handle in handles {
1371            let result = handle.await.expect("task panicked");
1372            assert!(result.is_ok());
1373            assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified);
1374        }
1375
1376        assert_eq!(verifier.cache_len(), 10);
1377    }
1378
1379    #[test]
1380    fn test_default_evm_config() {
1381        let _config = EvmVerifierConfig::default();
1382        // EVM is always on — default network is ArbitrumOne
1383    }
1384
1385    #[test]
1386    fn test_real_ml_dsa_proof_size_within_limits() {
1387        use crate::payment::metrics::QuotingMetricsTracker;
1388        use crate::payment::proof::PaymentProof;
1389        use crate::payment::quote::{QuoteGenerator, XorName};
1390        use alloy::primitives::FixedBytes;
1391        use evmlib::{EncodedPeerId, RewardsAddress};
1392        use saorsa_core::MlDsa65;
1393        use saorsa_pqc::pqc::types::MlDsaSecretKey;
1394        use saorsa_pqc::pqc::MlDsaOperations;
1395
1396        let ml_dsa = MlDsa65::new();
1397        let mut peer_quotes = Vec::new();
1398
1399        for i in 0..5u8 {
1400            let (public_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
1401
1402            let rewards_address = RewardsAddress::new([i; 20]);
1403            let metrics_tracker = QuotingMetricsTracker::new(0);
1404            let mut generator = QuoteGenerator::new(rewards_address, metrics_tracker);
1405
1406            let pub_key_bytes = public_key.as_bytes().to_vec();
1407            let sk_bytes = secret_key.as_bytes().to_vec();
1408            generator.set_signer(pub_key_bytes, move |msg| {
1409                let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("sk parse");
1410                let ml_dsa = MlDsa65::new();
1411                ml_dsa.sign(&sk, msg).expect("sign").as_bytes().to_vec()
1412            });
1413
1414            let content: XorName = [i; 32];
1415            let quote = generator.create_quote(content, 4096, 0).expect("quote");
1416
1417            peer_quotes.push((EncodedPeerId::new(rand::random()), quote));
1418        }
1419
1420        let proof = PaymentProof {
1421            proof_of_payment: ProofOfPayment { peer_quotes },
1422            tx_hashes: vec![FixedBytes::from([0xABu8; 32])],
1423        };
1424
1425        let proof_bytes =
1426            crate::payment::proof::serialize_single_node_proof(&proof).expect("serialize");
1427
1428        // 7 ML-DSA-65 quotes with ~1952-byte pub keys and ~3309-byte signatures
1429        // should produce a proof in the 30-80 KB range
1430        assert!(
1431            proof_bytes.len() > 20_000,
1432            "Real 7-quote ML-DSA proof should be > 20 KB, got {} bytes",
1433            proof_bytes.len()
1434        );
1435        assert!(
1436            proof_bytes.len() < MAX_PAYMENT_PROOF_SIZE_BYTES,
1437            "Real 7-quote ML-DSA proof ({} bytes) should fit within {} byte limit",
1438            proof_bytes.len(),
1439            MAX_PAYMENT_PROOF_SIZE_BYTES
1440        );
1441    }
1442
1443    #[tokio::test]
1444    async fn test_content_address_mismatch_rejected() {
1445        use crate::payment::proof::{serialize_single_node_proof, PaymentProof};
1446        use evmlib::{EncodedPeerId, PaymentQuote, RewardsAddress};
1447        use std::time::SystemTime;
1448
1449        let verifier = create_test_verifier();
1450
1451        // The xorname we're trying to store
1452        let target_xorname = [0xAAu8; 32];
1453
1454        // Create a quote for a DIFFERENT xorname
1455        let wrong_xorname = [0xBBu8; 32];
1456        let quote = PaymentQuote {
1457            content: xor_name::XorName(wrong_xorname),
1458            timestamp: SystemTime::now(),
1459            price: Amount::from(1u64),
1460            rewards_address: RewardsAddress::new([1u8; 20]),
1461            pub_key: vec![0u8; 64],
1462            signature: vec![0u8; 64],
1463        };
1464
1465        // Build CLOSE_GROUP_SIZE quotes with distinct peer IDs
1466        let mut peer_quotes = Vec::new();
1467        for _ in 0..CLOSE_GROUP_SIZE {
1468            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1469        }
1470
1471        let proof = PaymentProof {
1472            proof_of_payment: ProofOfPayment { peer_quotes },
1473            tx_hashes: vec![],
1474        };
1475
1476        let proof_bytes = serialize_single_node_proof(&proof).expect("serialize proof");
1477
1478        let result = verifier
1479            .verify_payment(&target_xorname, Some(&proof_bytes))
1480            .await;
1481
1482        assert!(result.is_err(), "Should reject mismatched content address");
1483        let err_msg = format!("{}", result.expect_err("should be error"));
1484        assert!(
1485            err_msg.contains("content address mismatch"),
1486            "Error should mention 'content address mismatch': {err_msg}"
1487        );
1488    }
1489
1490    /// Helper: create a fake quote with the given xorname and timestamp.
1491    fn make_fake_quote(
1492        xorname: [u8; 32],
1493        timestamp: SystemTime,
1494        rewards_address: RewardsAddress,
1495    ) -> evmlib::PaymentQuote {
1496        use evmlib::PaymentQuote;
1497
1498        PaymentQuote {
1499            content: xor_name::XorName(xorname),
1500            timestamp,
1501            price: Amount::from(1u64),
1502            rewards_address,
1503            pub_key: vec![0u8; 64],
1504            signature: vec![0u8; 64],
1505        }
1506    }
1507
1508    /// Helper: wrap quotes into a tagged serialized `PaymentProof`.
1509    fn serialize_proof(peer_quotes: Vec<(evmlib::EncodedPeerId, evmlib::PaymentQuote)>) -> Vec<u8> {
1510        use crate::payment::proof::{serialize_single_node_proof, PaymentProof};
1511
1512        let proof = PaymentProof {
1513            proof_of_payment: ProofOfPayment { peer_quotes },
1514            tx_hashes: vec![],
1515        };
1516        serialize_single_node_proof(&proof).expect("serialize proof")
1517    }
1518
1519    #[tokio::test]
1520    async fn test_expired_quote_rejected() {
1521        use evmlib::{EncodedPeerId, RewardsAddress};
1522        use std::time::Duration;
1523
1524        let verifier = create_test_verifier();
1525        let xorname = [0xCCu8; 32];
1526        let rewards_addr = RewardsAddress::new([1u8; 20]);
1527
1528        // Create a quote that's 25 hours old (exceeds 24-hour max)
1529        let old_timestamp = SystemTime::now() - Duration::from_secs(25 * 3600);
1530        let quote = make_fake_quote(xorname, old_timestamp, rewards_addr);
1531
1532        let mut peer_quotes = Vec::new();
1533        for _ in 0..CLOSE_GROUP_SIZE {
1534            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1535        }
1536
1537        let proof_bytes = serialize_proof(peer_quotes);
1538        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1539
1540        assert!(result.is_err(), "Should reject expired quote");
1541        let err_msg = format!("{}", result.expect_err("should fail"));
1542        assert!(
1543            err_msg.contains("expired"),
1544            "Error should mention 'expired': {err_msg}"
1545        );
1546    }
1547
1548    #[tokio::test]
1549    async fn test_future_timestamp_rejected() {
1550        use evmlib::{EncodedPeerId, RewardsAddress};
1551        use std::time::Duration;
1552
1553        let verifier = create_test_verifier();
1554        let xorname = [0xDDu8; 32];
1555        let rewards_addr = RewardsAddress::new([1u8; 20]);
1556
1557        // Create a quote with a timestamp 1 hour in the future
1558        let future_timestamp = SystemTime::now() + Duration::from_secs(3600);
1559        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
1560
1561        let mut peer_quotes = Vec::new();
1562        for _ in 0..CLOSE_GROUP_SIZE {
1563            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1564        }
1565
1566        let proof_bytes = serialize_proof(peer_quotes);
1567        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1568
1569        assert!(result.is_err(), "Should reject future-timestamped quote");
1570        let err_msg = format!("{}", result.expect_err("should fail"));
1571        assert!(
1572            err_msg.contains("future"),
1573            "Error should mention 'future': {err_msg}"
1574        );
1575    }
1576
1577    #[tokio::test]
1578    async fn test_quote_within_clock_skew_tolerance_accepted() {
1579        use evmlib::{EncodedPeerId, RewardsAddress};
1580        use std::time::Duration;
1581
1582        let verifier = create_test_verifier();
1583        let xorname = [0xD1u8; 32];
1584        let rewards_addr = RewardsAddress::new([1u8; 20]);
1585
1586        // Quote 30 seconds in the future — within 60s tolerance
1587        let future_timestamp = SystemTime::now() + Duration::from_secs(30);
1588        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
1589
1590        let mut peer_quotes = Vec::new();
1591        for _ in 0..CLOSE_GROUP_SIZE {
1592            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1593        }
1594
1595        let proof_bytes = serialize_proof(peer_quotes);
1596        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1597
1598        // Should NOT fail at timestamp check (will fail later at pub_key binding)
1599        let err_msg = format!("{}", result.expect_err("should fail at later check"));
1600        assert!(
1601            !err_msg.contains("future"),
1602            "Should pass timestamp check (within tolerance), but got: {err_msg}"
1603        );
1604    }
1605
1606    #[tokio::test]
1607    async fn test_quote_just_beyond_clock_skew_tolerance_rejected() {
1608        use evmlib::{EncodedPeerId, RewardsAddress};
1609        use std::time::Duration;
1610
1611        let verifier = create_test_verifier();
1612        let xorname = [0xD2u8; 32];
1613        let rewards_addr = RewardsAddress::new([1u8; 20]);
1614
1615        // Quote 120 seconds in the future — exceeds 60s tolerance
1616        let future_timestamp = SystemTime::now() + Duration::from_secs(120);
1617        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
1618
1619        let mut peer_quotes = Vec::new();
1620        for _ in 0..CLOSE_GROUP_SIZE {
1621            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1622        }
1623
1624        let proof_bytes = serialize_proof(peer_quotes);
1625        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1626
1627        assert!(
1628            result.is_err(),
1629            "Should reject quote beyond clock skew tolerance"
1630        );
1631        let err_msg = format!("{}", result.expect_err("should fail"));
1632        assert!(
1633            err_msg.contains("future"),
1634            "Error should mention 'future': {err_msg}"
1635        );
1636    }
1637
1638    #[tokio::test]
1639    async fn test_quote_23h_old_still_accepted() {
1640        use evmlib::{EncodedPeerId, RewardsAddress};
1641        use std::time::Duration;
1642
1643        let verifier = create_test_verifier();
1644        let xorname = [0xD3u8; 32];
1645        let rewards_addr = RewardsAddress::new([1u8; 20]);
1646
1647        // Quote 23 hours old — within 24h max age
1648        let old_timestamp = SystemTime::now() - Duration::from_secs(23 * 3600);
1649        let quote = make_fake_quote(xorname, old_timestamp, rewards_addr);
1650
1651        let mut peer_quotes = Vec::new();
1652        for _ in 0..CLOSE_GROUP_SIZE {
1653            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1654        }
1655
1656        let proof_bytes = serialize_proof(peer_quotes);
1657        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1658
1659        // Should NOT fail at timestamp check (will fail later at pub_key binding)
1660        let err_msg = format!("{}", result.expect_err("should fail at later check"));
1661        assert!(
1662            !err_msg.contains("expired"),
1663            "Should pass expiry check (23h < 24h), but got: {err_msg}"
1664        );
1665    }
1666
1667    /// Helper: build an `EncodedPeerId` that matches the BLAKE3 hash of an ML-DSA public key.
1668    fn encoded_peer_id_for_pub_key(pub_key: &[u8]) -> evmlib::EncodedPeerId {
1669        let ant_peer_id = peer_id_from_public_key_bytes(pub_key).expect("valid ML-DSA pub key");
1670        evmlib::EncodedPeerId::new(*ant_peer_id.as_bytes())
1671    }
1672
1673    #[tokio::test]
1674    async fn test_local_not_in_paid_set_rejected() {
1675        use evmlib::RewardsAddress;
1676        use saorsa_core::MlDsa65;
1677        use saorsa_pqc::pqc::MlDsaOperations;
1678
1679        // Verifier with a local rewards address set
1680        let local_addr = RewardsAddress::new([0xAAu8; 20]);
1681        let config = PaymentVerifierConfig {
1682            evm: EvmVerifierConfig {
1683                network: EvmNetwork::ArbitrumOne,
1684            },
1685            cache_capacity: 100,
1686            local_rewards_address: local_addr,
1687        };
1688        let verifier = PaymentVerifier::new(config);
1689
1690        let xorname = [0xEEu8; 32];
1691        // Quotes pay a DIFFERENT rewards address
1692        let other_addr = RewardsAddress::new([0xBBu8; 20]);
1693
1694        // Use real ML-DSA keys so the pub_key→peer_id binding check passes
1695        let ml_dsa = MlDsa65::new();
1696        let mut peer_quotes = Vec::new();
1697        for _ in 0..CLOSE_GROUP_SIZE {
1698            let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen");
1699            let pub_key_bytes = public_key.as_bytes().to_vec();
1700            let encoded = encoded_peer_id_for_pub_key(&pub_key_bytes);
1701
1702            let mut quote = make_fake_quote(xorname, SystemTime::now(), other_addr);
1703            quote.pub_key = pub_key_bytes;
1704
1705            peer_quotes.push((encoded, quote));
1706        }
1707
1708        let proof_bytes = serialize_proof(peer_quotes);
1709        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1710
1711        assert!(result.is_err(), "Should reject payment not addressed to us");
1712        let err_msg = format!("{}", result.expect_err("should fail"));
1713        assert!(
1714            err_msg.contains("does not include this node as a recipient"),
1715            "Error should mention recipient rejection: {err_msg}"
1716        );
1717    }
1718
1719    #[tokio::test]
1720    async fn test_wrong_peer_binding_rejected() {
1721        use evmlib::{EncodedPeerId, RewardsAddress};
1722        use saorsa_core::MlDsa65;
1723        use saorsa_pqc::pqc::MlDsaOperations;
1724
1725        let verifier = create_test_verifier();
1726        let xorname = [0xFFu8; 32];
1727        let rewards_addr = RewardsAddress::new([1u8; 20]);
1728
1729        // Generate a real ML-DSA keypair so pub_key is valid
1730        let ml_dsa = MlDsa65::new();
1731        let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen");
1732        let pub_key_bytes = public_key.as_bytes().to_vec();
1733
1734        // Create a quote with a real pub_key but attach it to a random peer ID
1735        // whose identity multihash does NOT match BLAKE3(pub_key)
1736        let mut quote = make_fake_quote(xorname, SystemTime::now(), rewards_addr);
1737        quote.pub_key = pub_key_bytes;
1738
1739        // Use random ed25519 peer IDs — they won't match BLAKE3(pub_key)
1740        let mut peer_quotes = Vec::new();
1741        for _ in 0..CLOSE_GROUP_SIZE {
1742            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1743        }
1744
1745        let proof_bytes = serialize_proof(peer_quotes);
1746        let result = verifier.verify_payment(&xorname, Some(&proof_bytes)).await;
1747
1748        assert!(result.is_err(), "Should reject wrong peer binding");
1749        let err_msg = format!("{}", result.expect_err("should fail"));
1750        assert!(
1751            err_msg.contains("pub_key does not belong to claimed peer"),
1752            "Error should mention binding mismatch: {err_msg}"
1753        );
1754    }
1755
1756    // =========================================================================
1757    // Merkle-tagged proof tests
1758    // =========================================================================
1759
1760    #[tokio::test]
1761    async fn test_merkle_tagged_proof_invalid_data_rejected() {
1762        use crate::ant_protocol::PROOF_TAG_MERKLE;
1763
1764        let verifier = create_test_verifier();
1765        let xorname = [0xA1u8; 32];
1766
1767        // Build a merkle-tagged proof with garbage body.
1768        // The tag byte is correct but the body is not valid msgpack.
1769        let mut merkle_garbage = Vec::with_capacity(64);
1770        merkle_garbage.push(PROOF_TAG_MERKLE);
1771        merkle_garbage.extend_from_slice(&[0xAB; 63]);
1772
1773        let result = verifier
1774            .verify_payment(&xorname, Some(&merkle_garbage))
1775            .await;
1776
1777        assert!(
1778            result.is_err(),
1779            "Should reject merkle proof with invalid body"
1780        );
1781        let err_msg = format!("{}", result.expect_err("should fail"));
1782        assert!(
1783            err_msg.contains("deserialize") || err_msg.contains("merkle proof"),
1784            "Error should mention deserialization failure: {err_msg}"
1785        );
1786    }
1787
1788    #[tokio::test]
1789    async fn test_single_node_tagged_proof_deserialization() {
1790        use crate::payment::proof::serialize_single_node_proof;
1791        use evmlib::{EncodedPeerId, RewardsAddress};
1792
1793        let verifier = create_test_verifier();
1794        let xorname = [0xA2u8; 32];
1795        let rewards_addr = RewardsAddress::new([1u8; 20]);
1796
1797        // Build a valid tagged single-node proof
1798        let quote = make_fake_quote(xorname, SystemTime::now(), rewards_addr);
1799        let mut peer_quotes = Vec::new();
1800        for _ in 0..CLOSE_GROUP_SIZE {
1801            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
1802        }
1803
1804        let proof = crate::payment::proof::PaymentProof {
1805            proof_of_payment: ProofOfPayment {
1806                peer_quotes: peer_quotes.clone(),
1807            },
1808            tx_hashes: vec![],
1809        };
1810
1811        let tagged_bytes = serialize_single_node_proof(&proof).expect("serialize tagged proof");
1812
1813        // detect_proof_type should identify it as SingleNode
1814        assert_eq!(
1815            crate::payment::proof::detect_proof_type(&tagged_bytes),
1816            Some(crate::payment::proof::ProofType::SingleNode)
1817        );
1818
1819        // verify_payment should process it through the single-node path.
1820        // It will fail at quote validation (fake pub_key), but we verify
1821        // it passes the deserialization stage by checking the error type.
1822        let result = verifier.verify_payment(&xorname, Some(&tagged_bytes)).await;
1823
1824        assert!(result.is_err(), "Should fail at quote validation stage");
1825        let err_msg = format!("{}", result.expect_err("should fail"));
1826        // It should NOT be a deserialization error — it should get further
1827        assert!(
1828            !err_msg.contains("deserialize"),
1829            "Should pass deserialization but fail later: {err_msg}"
1830        );
1831    }
1832
1833    #[test]
1834    fn test_pool_cache_insert_and_lookup() {
1835        use evmlib::merkle_batch_payment::PoolHash;
1836
1837        // Verify the pool_cache field exists and works correctly.
1838        // Insert a pool hash, then verify it's present on lookup.
1839        let verifier = create_test_verifier();
1840
1841        let pool_hash: PoolHash = [0xBBu8; 32];
1842        let payment_info = evmlib::merkle_payments::OnChainPaymentInfo {
1843            depth: 4,
1844            merkle_payment_timestamp: 1_700_000_000,
1845            paid_node_addresses: vec![],
1846        };
1847
1848        // Insert into pool cache
1849        {
1850            let mut cache = verifier.pool_cache.lock();
1851            cache.put(pool_hash, payment_info);
1852        }
1853
1854        // First lookup — should find it
1855        {
1856            let found = verifier.pool_cache.lock().get(&pool_hash).cloned();
1857            assert!(found.is_some(), "Pool hash should be in cache after insert");
1858            let info = found.expect("cached info");
1859            assert_eq!(info.depth, 4);
1860            assert_eq!(info.merkle_payment_timestamp, 1_700_000_000);
1861        }
1862
1863        // Second lookup — same result (no double-query needed)
1864        {
1865            let found = verifier.pool_cache.lock().get(&pool_hash).cloned();
1866            assert!(
1867                found.is_some(),
1868                "Pool hash should still be in cache on second lookup"
1869            );
1870        }
1871
1872        // Different pool hash — should NOT be found
1873        let other_hash: PoolHash = [0xCCu8; 32];
1874        {
1875            let found = verifier.pool_cache.lock().get(&other_hash).cloned();
1876            assert!(found.is_none(), "Unknown pool hash should not be in cache");
1877        }
1878    }
1879
1880    #[tokio::test]
1881    async fn closeness_pass_cache_short_circuits_second_call() {
1882        // When a pool_hash is in the closeness_pass_cache, the outer
1883        // verify_merkle_candidate_closeness must return Ok(()) without
1884        // running the inner lookup — even if no P2PNode is attached.
1885        // That second half (no-p2p → would normally fail-closed in release)
1886        // is the proof the cache short-circuit ran first.
1887        let verifier = create_test_verifier();
1888        let pool_hash = [0xAAu8; 32];
1889        verifier.closeness_pass_cache.lock().put(pool_hash, ());
1890
1891        // Construct a dummy pool — contents don't matter because the cache
1892        // hit means we never look at them.
1893        let pool = MerklePaymentCandidatePool {
1894            midpoint_proof: fake_midpoint_proof(),
1895            candidate_nodes: make_candidate_nodes(1_700_000_000),
1896        };
1897
1898        let result = verifier
1899            .verify_merkle_candidate_closeness(&pool, pool_hash)
1900            .await;
1901        assert!(
1902            result.is_ok(),
1903            "cached pool hash must bypass the inner check and return Ok(()), got: {result:?}"
1904        );
1905    }
1906
1907    #[tokio::test]
1908    async fn closeness_single_flight_concurrent_readers_share_one_verification() {
1909        // Two concurrent callers for the same pool_hash should produce the
1910        // same outcome, and the cache should end up populated exactly once.
1911        // We use the test-utils fail-open path to short-circuit the inner
1912        // DHT lookup; the purpose of this test is the single-flight
1913        // plumbing, not the lookup itself.
1914        let verifier = Arc::new(create_test_verifier());
1915        let pool_hash = [0x77u8; 32];
1916        let pool = MerklePaymentCandidatePool {
1917            midpoint_proof: fake_midpoint_proof(),
1918            candidate_nodes: make_candidate_nodes(1_700_000_000),
1919        };
1920
1921        let v1 = Arc::clone(&verifier);
1922        let p1 = pool.clone();
1923        let v2 = Arc::clone(&verifier);
1924        let p2 = pool.clone();
1925
1926        let (r1, r2) = tokio::join!(
1927            async move { v1.verify_merkle_candidate_closeness(&p1, pool_hash).await },
1928            async move { v2.verify_merkle_candidate_closeness(&p2, pool_hash).await },
1929        );
1930
1931        assert_eq!(r1.is_ok(), r2.is_ok(), "concurrent callers must agree");
1932        assert!(
1933            r1.is_ok(),
1934            "both callers must succeed on the test-utils path"
1935        );
1936        assert!(
1937            verifier
1938                .closeness_pass_cache
1939                .lock()
1940                .get(&pool_hash)
1941                .is_some(),
1942            "success path must populate the pass cache"
1943        );
1944        assert!(
1945            verifier.inflight_closeness.lock().get(&pool_hash).is_none(),
1946            "inflight slot must be cleared after the leader finishes"
1947        );
1948    }
1949
    #[tokio::test]
    async fn closeness_waiter_reads_leaders_published_failure() {
        // Prove the waiter path actually surfaces a failure published by a
        // concurrent leader, without running its own inner check. Insert a
        // slot, spawn a waiter (which will park on notified_owned), then
        // publish failure + notify from the outside — simulating what the
        // leader's `publish` + drop-guard pair does.
        let verifier = Arc::new(create_test_verifier());
        let pool_hash = [0x55u8; 32];
        // Pre-inserting the slot forces the spawned task onto the waiter
        // path: it finds an in-flight entry and parks instead of leading.
        let slot = Arc::new(ClosenessSlot::new());
        verifier
            .inflight_closeness
            .lock()
            .put(pool_hash, Arc::clone(&slot));

        // Pool contents are irrelevant: the waiter never inspects them.
        let pool = MerklePaymentCandidatePool {
            midpoint_proof: fake_midpoint_proof(),
            candidate_nodes: make_candidate_nodes(1_700_000_000),
        };

        let verifier_c = Arc::clone(&verifier);
        let pool_c = pool.clone();
        let waiter = tokio::spawn(async move {
            verifier_c
                .verify_merkle_candidate_closeness(&pool_c, pool_hash)
                .await
        });

        // Yield so the waiter can run up to its `notified_owned().await`.
        // A few yields cover both single-threaded and multi-threaded tokio
        // runtimes regardless of scheduling.
        for _ in 0..5 {
            tokio::task::yield_now().await;
        }

        // Simulate the leader's `publish` + drop-guard: publish the result,
        // clear the slot, wake waiters.
        // NOTE(review): the order here is load-bearing — the result is set
        // before notify_waiters() so the woken waiter always finds it.
        slot.result
            .set(Err("forged pool: not close enough".to_string()))
            .expect("set once");
        verifier.inflight_closeness.lock().pop(&pool_hash);
        slot.notify.notify_waiters();

        // The waiter must return the error the "leader" published, not run
        // its own verification.
        let result = waiter.await.expect("task panicked");
        let err = result.expect_err("waiter must return the leader's published failure");
        assert!(
            err.to_string().contains("forged pool"),
            "waiter must surface the leader's error message, got: {err}"
        );
    }
2000
2001    /// Build a deterministic but otherwise-unused `MidpointProof` so unit
2002    /// tests can construct a `MerklePaymentCandidatePool` without spinning
2003    /// up a real merkle tree. The closeness path only calls `.address()`
2004    /// on it, which is a pure BLAKE3 of the branch's leaf/root/timestamp —
2005    /// the values don't need to be tree-valid for these tests.
2006    fn fake_midpoint_proof() -> evmlib::merkle_payments::MidpointProof {
2007        // Build a minimal tree of two leaves so we get a real branch.
2008        let leaves = vec![xor_name::XorName([1u8; 32]), xor_name::XorName([2u8; 32])];
2009        let tree = evmlib::merkle_payments::MerkleTree::from_xornames(leaves).expect("tree");
2010        let candidates = tree.reward_candidates(1_700_000_000).expect("candidates");
2011        candidates.first().expect("at least one").clone()
2012    }
2013
2014    // =========================================================================
2015    // Merkle verification unit tests
2016    // =========================================================================
2017
2018    /// Helper: build 16 validly-signed ML-DSA-65 candidate nodes.
2019    fn make_candidate_nodes(
2020        timestamp: u64,
2021    ) -> [evmlib::merkle_payments::MerklePaymentCandidateNode;
2022           evmlib::merkle_payments::CANDIDATES_PER_POOL] {
2023        use evmlib::merkle_payments::{MerklePaymentCandidateNode, CANDIDATES_PER_POOL};
2024        use saorsa_core::MlDsa65;
2025        use saorsa_pqc::pqc::types::MlDsaSecretKey;
2026        use saorsa_pqc::pqc::MlDsaOperations;
2027
2028        std::array::from_fn::<_, CANDIDATES_PER_POOL, _>(|i| {
2029            let ml_dsa = MlDsa65::new();
2030            let (pub_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
2031            let price = evmlib::common::Amount::from(1024u64);
2032            #[allow(clippy::cast_possible_truncation)]
2033            let reward_address = RewardsAddress::new([i as u8; 20]);
2034            let msg = MerklePaymentCandidateNode::bytes_to_sign(&price, &reward_address, timestamp);
2035            let sk = MlDsaSecretKey::from_bytes(secret_key.as_bytes()).expect("sk");
2036            let signature = ml_dsa.sign(&sk, &msg).expect("sign").as_bytes().to_vec();
2037
2038            MerklePaymentCandidateNode {
2039                pub_key: pub_key.as_bytes().to_vec(),
2040                price,
2041                reward_address,
2042                merkle_payment_timestamp: timestamp,
2043                signature,
2044            }
2045        })
2046    }
2047
2048    /// Helper: build a valid `MerklePaymentProof` with real ML-DSA-65
2049    /// signatures. Returns the raw proof, pool hash, xorname, and timestamp.
2050    fn make_valid_merkle_proof() -> (
2051        evmlib::merkle_payments::MerklePaymentProof,
2052        evmlib::merkle_batch_payment::PoolHash,
2053        [u8; 32],
2054        u64,
2055    ) {
2056        use evmlib::merkle_payments::{MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree};
2057
2058        let timestamp = std::time::SystemTime::now()
2059            .duration_since(std::time::UNIX_EPOCH)
2060            .expect("system time")
2061            .as_secs();
2062
2063        let addresses: Vec<xor_name::XorName> = (0..4u8)
2064            .map(|i| xor_name::XorName::from_content(&[i]))
2065            .collect();
2066        let tree = MerkleTree::from_xornames(addresses.clone()).expect("tree");
2067
2068        let candidate_nodes = make_candidate_nodes(timestamp);
2069
2070        let reward_candidates = tree
2071            .reward_candidates(timestamp)
2072            .expect("reward candidates");
2073        let midpoint_proof = reward_candidates
2074            .first()
2075            .expect("at least one candidate")
2076            .clone();
2077
2078        let pool = MerklePaymentCandidatePool {
2079            midpoint_proof,
2080            candidate_nodes,
2081        };
2082
2083        let first_address = *addresses.first().expect("first address");
2084        let address_proof = tree
2085            .generate_address_proof(0, first_address)
2086            .expect("proof");
2087
2088        let merkle_proof = MerklePaymentProof::new(first_address, address_proof, pool);
2089        let pool_hash = merkle_proof.winner_pool_hash();
2090        let xorname = first_address.0;
2091
2092        (merkle_proof, pool_hash, xorname, timestamp)
2093    }
2094
2095    /// Helper: build a minimal valid `MerklePaymentProof` with real ML-DSA-65
2096    /// signatures. Returns `(xorname, serialized_tagged_proof, pool_hash, timestamp)`.
2097    fn make_valid_merkle_proof_bytes() -> (
2098        [u8; 32],
2099        Vec<u8>,
2100        evmlib::merkle_batch_payment::PoolHash,
2101        u64,
2102    ) {
2103        let (merkle_proof, pool_hash, xorname, timestamp) = make_valid_merkle_proof();
2104        let tagged = crate::payment::proof::serialize_merkle_proof(&merkle_proof)
2105            .expect("serialize merkle proof");
2106        (xorname, tagged, pool_hash, timestamp)
2107    }
2108
2109    #[tokio::test]
2110    async fn test_merkle_address_mismatch_rejected() {
2111        let verifier = create_test_verifier();
2112        let (_correct_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
2113
2114        // Use a DIFFERENT xorname than what the proof was built for
2115        let wrong_xorname = [0xFFu8; 32];
2116
2117        let result = verifier
2118            .verify_payment(&wrong_xorname, Some(&tagged_proof))
2119            .await;
2120
2121        assert!(
2122            result.is_err(),
2123            "Should reject merkle proof address mismatch"
2124        );
2125        let err_msg = format!("{}", result.expect_err("should fail"));
2126        assert!(
2127            err_msg.contains("address mismatch") || err_msg.contains("Merkle proof address"),
2128            "Error should mention address mismatch: {err_msg}"
2129        );
2130    }
2131
2132    #[tokio::test]
2133    async fn test_merkle_malformed_body_rejected() {
2134        let verifier = create_test_verifier();
2135        let xorname = [0xA3u8; 32];
2136
2137        // Valid merkle tag but truncated/corrupted msgpack body
2138        let mut bad_proof = vec![crate::ant_protocol::PROOF_TAG_MERKLE];
2139        bad_proof.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
2140        bad_proof.extend_from_slice(&[0x00; 10]);
2141        // pad to minimum size
2142        while bad_proof.len() < MIN_PAYMENT_PROOF_SIZE_BYTES {
2143            bad_proof.push(0x00);
2144        }
2145
2146        let result = verifier.verify_payment(&xorname, Some(&bad_proof)).await;
2147
2148        assert!(result.is_err(), "Should reject malformed merkle body");
2149        let err_msg = format!("{}", result.expect_err("should fail"));
2150        assert!(
2151            err_msg.contains("deserialize") || err_msg.contains("Failed"),
2152            "Error should mention deserialization: {err_msg}"
2153        );
2154    }
2155
2156    #[test]
2157    fn test_merkle_proof_serialized_size_within_limits() {
2158        let (_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
2159
2160        // 16 ML-DSA-65 candidates (~1952 pub key + ~3309 sig each) ≈ 84 KB + tree data
2161        assert!(
2162            tagged_proof.len() >= MIN_PAYMENT_PROOF_SIZE_BYTES,
2163            "Merkle proof ({} bytes) should be >= min {} bytes",
2164            tagged_proof.len(),
2165            MIN_PAYMENT_PROOF_SIZE_BYTES
2166        );
2167        assert!(
2168            tagged_proof.len() <= MAX_PAYMENT_PROOF_SIZE_BYTES,
2169            "Merkle proof ({} bytes) should be <= max {} bytes",
2170            tagged_proof.len(),
2171            MAX_PAYMENT_PROOF_SIZE_BYTES
2172        );
2173    }
2174
2175    #[test]
2176    fn test_merkle_proof_tag_is_correct() {
2177        let (_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
2178
2179        assert_eq!(
2180            tagged_proof.first().copied(),
2181            Some(crate::ant_protocol::PROOF_TAG_MERKLE),
2182            "First byte must be the merkle tag"
2183        );
2184        assert_eq!(
2185            crate::payment::proof::detect_proof_type(&tagged_proof),
2186            Some(crate::payment::proof::ProofType::Merkle)
2187        );
2188    }
2189
2190    #[test]
2191    fn test_pool_cache_eviction() {
2192        use evmlib::merkle_batch_payment::PoolHash;
2193
2194        let config = PaymentVerifierConfig {
2195            evm: EvmVerifierConfig::default(),
2196            cache_capacity: 100,
2197            local_rewards_address: RewardsAddress::new([1u8; 20]),
2198        };
2199        let verifier = PaymentVerifier::new(config);
2200
2201        // Fill the pool cache to capacity (DEFAULT_POOL_CACHE_CAPACITY = 1000)
2202        for i in 0..DEFAULT_POOL_CACHE_CAPACITY {
2203            let mut hash: PoolHash = [0u8; 32];
2204            // Write index bytes into the hash
2205            let idx_bytes = i.to_le_bytes();
2206            for (j, b) in idx_bytes.iter().enumerate() {
2207                if j < 32 {
2208                    hash[j] = *b;
2209                }
2210            }
2211            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2212                depth: 4,
2213                merkle_payment_timestamp: 1_700_000_000,
2214                paid_node_addresses: vec![],
2215            };
2216            verifier.pool_cache.lock().put(hash, info);
2217        }
2218
2219        assert_eq!(
2220            verifier.pool_cache.lock().len(),
2221            DEFAULT_POOL_CACHE_CAPACITY
2222        );
2223
2224        // Insert one more — should evict the oldest
2225        let overflow_hash: PoolHash = [0xFFu8; 32];
2226        let info = evmlib::merkle_payments::OnChainPaymentInfo {
2227            depth: 8,
2228            merkle_payment_timestamp: 1_800_000_000,
2229            paid_node_addresses: vec![],
2230        };
2231        verifier.pool_cache.lock().put(overflow_hash, info);
2232
2233        // Size should still be at capacity (not capacity + 1)
2234        assert_eq!(
2235            verifier.pool_cache.lock().len(),
2236            DEFAULT_POOL_CACHE_CAPACITY
2237        );
2238
2239        // The new entry should be present
2240        let found = verifier.pool_cache.lock().get(&overflow_hash).cloned();
2241        assert!(
2242            found.is_some(),
2243            "Newly inserted pool hash should be present"
2244        );
2245        assert_eq!(found.expect("info").depth, 8);
2246    }
2247
2248    #[test]
2249    fn test_pool_cache_concurrent_access() {
2250        use evmlib::merkle_batch_payment::PoolHash;
2251        use std::sync::Arc;
2252
2253        let verifier = Arc::new(create_test_verifier());
2254
2255        let mut handles = Vec::new();
2256        for i in 0..20u8 {
2257            let v = verifier.clone();
2258            handles.push(std::thread::spawn(move || {
2259                let hash: PoolHash = [i; 32];
2260                let info = evmlib::merkle_payments::OnChainPaymentInfo {
2261                    depth: i,
2262                    merkle_payment_timestamp: u64::from(i) * 1000,
2263                    paid_node_addresses: vec![],
2264                };
2265                v.pool_cache.lock().put(hash, info);
2266
2267                // Read back
2268                let found = v.pool_cache.lock().get(&hash).cloned();
2269                assert!(found.is_some(), "Entry {i} should be readable after insert");
2270            }));
2271        }
2272
2273        for handle in handles {
2274            handle.join().expect("thread panicked");
2275        }
2276
2277        // All 20 entries should be present (well under 1000 capacity)
2278        assert_eq!(verifier.pool_cache.lock().len(), 20);
2279    }
2280
2281    #[tokio::test]
2282    async fn test_merkle_tampered_candidate_signature_rejected() {
2283        let verifier = create_test_verifier();
2284
2285        let (mut merkle_proof, _pool_hash, xorname, timestamp) = make_valid_merkle_proof();
2286
2287        // Tamper the first candidate's signature
2288        if let Some(byte) = merkle_proof
2289            .winner_pool
2290            .candidate_nodes
2291            .first_mut()
2292            .and_then(|c| c.signature.first_mut())
2293        {
2294            *byte ^= 0xFF;
2295        }
2296
2297        // Recompute pool hash after tampering (signature change alters the hash)
2298        let tampered_pool_hash = merkle_proof.winner_pool_hash();
2299
2300        // Pre-populate pool cache so we skip the on-chain query
2301        {
2302            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2303                depth: 4,
2304                merkle_payment_timestamp: timestamp,
2305                paid_node_addresses: vec![],
2306            };
2307            verifier.pool_cache.lock().put(tampered_pool_hash, info);
2308        }
2309
2310        let tagged =
2311            crate::payment::proof::serialize_merkle_proof(&merkle_proof).expect("serialize");
2312
2313        let result = verifier.verify_payment(&xorname, Some(&tagged)).await;
2314
2315        assert!(
2316            result.is_err(),
2317            "Should reject merkle proof with tampered candidate signature"
2318        );
2319        let err_msg = format!("{}", result.expect_err("should fail"));
2320        assert!(
2321            err_msg.contains("Invalid ML-DSA-65 signature"),
2322            "Error should mention invalid signature: {err_msg}"
2323        );
2324    }
2325
2326    #[tokio::test]
2327    async fn test_merkle_timestamp_mismatch_rejected() {
2328        let verifier = create_test_verifier();
2329
2330        let (xorname, tagged, pool_hash, timestamp) = make_valid_merkle_proof_bytes();
2331
2332        // Pre-populate pool cache with a DIFFERENT timestamp than the candidates
2333        {
2334            let mismatched_ts = timestamp + 9999;
2335            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2336                depth: 4,
2337                merkle_payment_timestamp: mismatched_ts,
2338                paid_node_addresses: vec![],
2339            };
2340            verifier.pool_cache.lock().put(pool_hash, info);
2341        }
2342
2343        let result = verifier.verify_payment(&xorname, Some(&tagged)).await;
2344
2345        assert!(
2346            result.is_err(),
2347            "Should reject merkle proof with timestamp mismatch"
2348        );
2349        let err_msg = format!("{}", result.expect_err("should fail"));
2350        assert!(
2351            err_msg.contains("timestamp mismatch"),
2352            "Error should mention timestamp mismatch: {err_msg}"
2353        );
2354    }
2355
2356    #[tokio::test]
2357    async fn test_merkle_paid_node_index_out_of_bounds_rejected() {
2358        let verifier = create_test_verifier();
2359        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2360
2361        // The test tree has 4 addresses → depth 2. We must match the tree depth
2362        // so verify_merkle_proof passes the depth check, then the paid node
2363        // index out-of-bounds check fires.
2364        {
2365            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2366                depth: 2,
2367                merkle_payment_timestamp: ts,
2368                paid_node_addresses: vec![
2369                    // First paid node: valid (matches candidate 0, amount matches formula)
2370                    // Expected per-node: median(1024) * 2^2 / 2 = 2048
2371                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(2048u64)),
2372                    // Second paid node: index 999 is way beyond CANDIDATES_PER_POOL (16)
2373                    (RewardsAddress::new([1u8; 20]), 999, Amount::from(2048u64)),
2374                ],
2375            };
2376            verifier.pool_cache.lock().put(pool_hash, info);
2377        }
2378
2379        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2380
2381        assert!(
2382            result.is_err(),
2383            "Should reject paid node index out of bounds"
2384        );
2385        let err_msg = format!("{}", result.expect_err("should fail"));
2386        assert!(
2387            err_msg.contains("out of bounds"),
2388            "Error should mention out of bounds: {err_msg}"
2389        );
2390    }
2391
2392    #[tokio::test]
2393    async fn test_merkle_paid_node_address_mismatch_rejected() {
2394        let verifier = create_test_verifier();
2395        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2396
2397        // Tree has depth 2, so provide 2 paid node entries.
2398        // Both use valid indices but the second has a wrong reward address.
2399        {
2400            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2401                depth: 2,
2402                merkle_payment_timestamp: ts,
2403                paid_node_addresses: vec![
2404                    // Index 0 with matching address [0x00; 20]
2405                    // Expected per-node: median(1024) * 2^2 / 2 = 2048
2406                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(2048u64)),
2407                    // Index 1 with WRONG address — candidate 1's address is [0x01; 20]
2408                    (RewardsAddress::new([0xFF; 20]), 1, Amount::from(2048u64)),
2409                ],
2410            };
2411            verifier.pool_cache.lock().put(pool_hash, info);
2412        }
2413
2414        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2415
2416        assert!(result.is_err(), "Should reject paid node address mismatch");
2417        let err_msg = format!("{}", result.expect_err("should fail"));
2418        assert!(
2419            err_msg.contains("address mismatch"),
2420            "Error should mention address mismatch: {err_msg}"
2421        );
2422    }
2423
2424    #[tokio::test]
2425    async fn test_merkle_wrong_depth_rejected() {
2426        let verifier = create_test_verifier();
2427        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2428
2429        // Pre-populate pool cache with depth=3 but only 1 paid node address
2430        // (depth must equal paid_node_addresses.len())
2431        {
2432            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2433                depth: 3,
2434                merkle_payment_timestamp: ts,
2435                paid_node_addresses: vec![(
2436                    RewardsAddress::new([0u8; 20]),
2437                    0,
2438                    Amount::from(1024u64),
2439                )],
2440            };
2441            verifier.pool_cache.lock().put(pool_hash, info);
2442        }
2443
2444        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2445
2446        assert!(
2447            result.is_err(),
2448            "Should reject mismatched depth vs paid node count"
2449        );
2450        let err_msg = format!("{}", result.expect_err("should fail"));
2451        assert!(
2452            err_msg.contains("Wrong number of paid nodes")
2453                || err_msg.contains("verification failed"),
2454            "Error should mention depth/count mismatch: {err_msg}"
2455        );
2456    }
2457
2458    #[tokio::test]
2459    async fn test_merkle_underpayment_rejected() {
2460        let verifier = create_test_verifier();
2461        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
2462
2463        // Tree depth=2, so 2 paid nodes required. Candidates all quote price=1024.
2464        // Expected per-node: median(1024) * 2^2 / 2 = 2048.
2465        // Pay only 1 wei per node — far below the expected amount.
2466        {
2467            let info = evmlib::merkle_payments::OnChainPaymentInfo {
2468                depth: 2,
2469                merkle_payment_timestamp: ts,
2470                paid_node_addresses: vec![
2471                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(1u64)),
2472                    (RewardsAddress::new([1u8; 20]), 1, Amount::from(1u64)),
2473                ],
2474            };
2475            verifier.pool_cache.lock().put(pool_hash, info);
2476        }
2477
2478        let result = verifier.verify_payment(&xorname, Some(&tagged_proof)).await;
2479
2480        assert!(
2481            result.is_err(),
2482            "Should reject merkle payment where paid amount < expected per-node amount"
2483        );
2484        let err_msg = format!("{}", result.expect_err("should fail"));
2485        assert!(
2486            err_msg.contains("Underpayment"),
2487            "Error should mention underpayment: {err_msg}"
2488        );
2489    }
2490}