Skip to main content

ant_node/payment/
verifier.rs

1//! Payment verifier with LRU cache and EVM verification.
2//!
3//! This is the core payment verification logic for ant-node.
4//! All new data requires EVM payment on Arbitrum (no free tier).
5
6use crate::ant_protocol::CLOSE_GROUP_SIZE;
7use crate::error::{Error, Result};
8use crate::logging::{debug, info, warn};
9use crate::payment::cache::{CacheStats, VerifiedCache, XorName};
10use crate::payment::pricing::{calculate_price, derive_records_stored_from_price};
11use crate::payment::proof::{
12    deserialize_merkle_proof, deserialize_proof, detect_proof_type, ProofType,
13};
14use crate::replication::config::K_BUCKET_SIZE;
15use crate::storage::lmdb::LmdbStorage;
16use ant_protocol::payment::verify::{verify_quote_content, verify_quote_signature};
17use evmlib::common::{Amount, QuoteHash};
18use evmlib::contract::payment_vault;
19use evmlib::merkle_batch_payment::{OnChainPaymentInfo, PoolHash};
20use evmlib::Network as EvmNetwork;
21use evmlib::PaymentQuote;
22use evmlib::ProofOfPayment;
23use evmlib::RewardsAddress;
24use lru::LruCache;
25use parking_lot::{Mutex, RwLock};
26use saorsa_core::identity::node_identity::peer_id_from_public_key_bytes;
27use saorsa_core::identity::PeerId;
28use saorsa_core::P2PNode;
29#[cfg(any(test, feature = "test-utils"))]
30use std::collections::HashMap;
31use std::num::NonZeroUsize;
32use std::sync::Arc;
33use std::time::Instant;
34
/// Minimum allowed size for a payment proof in bytes.
///
/// This minimum ensures the proof contains at least a basic cryptographic hash or identifier.
/// Proofs smaller than this are rejected as they cannot contain sufficient payment information.
pub const MIN_PAYMENT_PROOF_SIZE_BYTES: usize = 32;

/// Maximum allowed size for a payment proof in bytes (256 KB).
///
/// Single-node proofs with 7 ML-DSA-65 quotes reach ~40 KB.
/// Merkle proofs include 16 candidate nodes (each with ~1,952-byte ML-DSA pub key
/// and ~3,309-byte signature) plus merkle branch hashes, totaling ~130 KB.
/// 256 KB provides headroom while still capping memory during verification.
pub const MAX_PAYMENT_PROOF_SIZE_BYTES: usize = 262_144;

/// Maximum percentage by which the median-paid quote may fall below this
/// verifier's current local price before a client PUT is rejected.
///
/// A 20% tolerance means a paid quote must be at least `0.8 * P_v`, so an
/// attacker who controls a real K-close issuer still pays at least
/// `0.8 * P_v * 3` for an honest verifier. Honest median-paid bundles have
/// a structural majority guarantee: the four nodes at or below the median
/// accept unless their own price grows more than `1 / 0.8 = 1.25x` between
/// quote and PUT. Above-median nodes may reject when `P_v > 1.25 * median`;
/// those records are backfilled by replication, which deliberately skips
/// this present-tense floor.
const PAID_QUOTE_PRICE_FLOOR_TOLERANCE_PCT: u64 = 20;

/// Denominator that turns a whole-number percentage into a fraction.
///
/// `price_floor` multiplies by `PERCENT_DENOMINATOR - tolerance_pct` and then
/// divides by this value.
const PERCENT_DENOMINATOR: u64 = 100;
/// Multiple of the quoted price that must have been settled on-chain for a
/// paid quote.
///
/// NOTE(review): the use site is outside this section of the file; the
/// surrounding documentation describes the rule as "paid 3x" — confirm
/// against the on-chain amount check.
const PAID_QUOTE_PAYMENT_MULTIPLIER: u64 = 3;
/// Elapsed-time threshold in milliseconds at or above which `verify_payment`
/// logs its outcome at `info` (success) or `warn` (failure) instead of
/// `debug`.
const PAYMENT_VERIFY_SLOW_LOG_MS: u128 = 500;

/// Number of nearest DHT peers accepted for paid-quote issuer locality.
///
/// This is the Kademlia K width, intentionally wider than `CLOSE_GROUP_SIZE`.
const PAID_QUOTE_ISSUER_CLOSENESS_WIDTH: usize = K_BUCKET_SIZE;
70
/// Borrowed view of one quote from a payment proof that is being considered
/// as the median-priced, paid quote.
///
/// Holds only references into the proof plus an amount, which keeps the type
/// cheap to copy.
#[derive(Clone, Copy)]
struct LegacyMedianCandidate<'a> {
    /// Encoded peer id supplied in the proof alongside the quote.
    encoded_peer_id: &'a evmlib::EncodedPeerId,
    /// The candidate quote itself.
    quote: &'a PaymentQuote,
    /// Amount associated with this candidate.
    ///
    /// NOTE(review): presumably the minimum on-chain payment expected for the
    /// quote — the code that fills and checks it is outside this view.
    expected_amount: Amount,
}
77
78fn price_floor(current_price: Amount, tolerance_pct: u64) -> Amount {
79    current_price.saturating_mul(Amount::from(
80        PERCENT_DENOMINATOR.saturating_sub(tolerance_pct),
81    )) / Amount::from(PERCENT_DENOMINATOR)
82}
83
/// Index of the median element in a sorted list of `quote_count` quotes.
///
/// Uses integer halving, so an even count selects the upper of the two
/// middle positions and an empty list maps to index 0.
fn median_quote_index(quote_count: usize) -> usize {
    const MEDIAN_DIVISOR: usize = 2;
    quote_count / MEDIAN_DIVISOR
}
87
88fn payment_proof_type_label(payment_proof: Option<&[u8]>) -> &'static str {
89    match payment_proof.and_then(detect_proof_type) {
90        Some(ProofType::Merkle) => "merkle",
91        Some(ProofType::SingleNode) => "single_node",
92        Some(_) => "unsupported",
93        None if payment_proof.is_some() => "unknown",
94        None => "none",
95    }
96}
97
98/// Configuration for EVM payment verification.
99///
100/// EVM verification is always on. All new data requires on-chain
101/// payment verification. The network field selects which EVM chain to use.
102#[derive(Debug, Clone)]
103pub struct EvmVerifierConfig {
104    /// EVM network to use (Arbitrum One, Arbitrum Sepolia, etc.)
105    pub network: EvmNetwork,
106}
107
108impl Default for EvmVerifierConfig {
109    fn default() -> Self {
110        Self {
111            network: EvmNetwork::ArbitrumOne,
112        }
113    }
114}
115
/// Configuration for the payment verifier.
///
/// All new data requires EVM payment on Arbitrum. The cache stores
/// previously verified payments to avoid redundant on-chain lookups.
#[derive(Debug, Clone)]
pub struct PaymentVerifierConfig {
    /// EVM verifier configuration.
    pub evm: EvmVerifierConfig,
    /// Cache capacity (number of `XorName` values to cache).
    ///
    /// Passed to `VerifiedCache::with_capacity` when the verifier is built.
    pub cache_capacity: usize,
    /// Close-group width exposed to storage and replication admission callers.
    ///
    /// Returned unchanged by `PaymentVerifier::close_group_size`.
    pub close_group_size: usize,
    /// Local node's rewards address.
    ///
    /// Kept in the verifier config for payment policies that bind receipts to
    /// this node's payout address.
    pub local_rewards_address: RewardsAddress,
}
134
/// The fresh admission path a payment proof is being verified for.
///
/// - **`ClientPut`** — the node is admitting a chunk store. The verifier
///   applies store-strength cache semantics and live payment checks.
/// - **`PaidListAdmission`** — the node is admitting fresh paid-list metadata.
///   It runs the same live payment checks as `ClientPut`, but writes a weaker
///   cache entry that does not authorize future chunk stores.
///
/// The caller must check local receiver/admission membership before invoking
/// the verifier for replication admission: fresh chunk replication requires
/// local close-group responsibility, and fresh paid-list replication requires
/// local paid-list close-group membership. Direct client PUT deliberately does
/// not perform a receiver-responsibility gate. The verifier itself only checks
/// payment proof validity and that the paid quote's issuer is in the K closest
/// peers for the quoted chunk address.
///
/// Immediate fresh chunk replication is different: the receiver is about to
/// store the newly written chunk as if the client PUT it there directly, so
/// that call site deliberately uses `ClientPut`.
///
/// Later neighbour-sync repair does not include proof-of-payment bytes and
/// does not call this verifier. It authorizes repair from network evidence:
/// majority storage among the configured close group, or majority paid-list
/// membership among the closest K.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerificationContext {
    /// The node is admitting a chunk store with store-strength cache semantics.
    ///
    /// Cache lookups use `contains_client_put_verified`; a successful
    /// verification is recorded with the cache's plain `insert`.
    ClientPut,
    /// The node is admitting fresh paid-list metadata with paid-list-strength
    /// cache semantics.
    ///
    /// Cache lookups use `contains_paid_list_verified`; a successful
    /// verification is recorded with `insert_paid_list_verified`.
    PaidListAdmission,
}
167
/// Outcome of a payment check for a piece of data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PaymentStatus {
    /// The data is already in the local verified cache, i.e. it was paid for
    /// earlier.
    CachedAsVerified,
    /// The data is new to this node and a payment proof is required.
    PaymentRequired,
    /// A payment proof was supplied and passed verification.
    PaymentVerified,
}

impl PaymentStatus {
    /// Whether the data may be stored: true for both the cached and the
    /// freshly verified status, false when payment is still required.
    #[must_use]
    pub fn can_store(&self) -> bool {
        match self {
            Self::PaymentRequired => false,
            Self::CachedAsVerified | Self::PaymentVerified => true,
        }
    }

    /// Whether the status came from the cache, meaning the data had already
    /// been paid for.
    #[must_use]
    pub fn is_cached(&self) -> bool {
        *self == Self::CachedAsVerified
    }
}
192
/// Default capacity for the merkle pool cache (number of pool hashes to cache).
///
/// `PaymentVerifier::new` uses the same capacity for the closeness-pass cache
/// and the in-flight closeness slot cache.
const DEFAULT_POOL_CACHE_CAPACITY: usize = 1_000;
195
/// Main payment verifier for ant-node.
///
/// Uses:
/// 1. LRU cache for fast lookups of previously verified `XorName` values
/// 2. EVM payment verification for new data (always required)
/// 3. Pool-level cache for merkle batch payments (avoids repeated on-chain queries)
pub struct PaymentVerifier {
    /// LRU cache of verified `XorName` values.
    cache: VerifiedCache,
    /// LRU cache of verified merkle pool hashes → on-chain payment info.
    pool_cache: Mutex<LruCache<PoolHash, OnChainPaymentInfo>>,
    /// LRU cache of pool hashes whose candidate closeness has already been
    /// verified by this node. Collapses the per-chunk Kademlia lookup cost
    /// within a batch (256 chunks × 1 pool = 1 lookup instead of 256).
    closeness_pass_cache: Mutex<LruCache<PoolHash, ()>>,
    /// In-flight closeness lookups, keyed by pool hash. Lets concurrent PUTs
    /// for the same pool coalesce onto a single Kademlia lookup AND share
    /// its result — on both success and failure — which bounds `DoS`
    /// amplification to one lookup per unique `pool_hash` regardless of
    /// concurrency.
    inflight_closeness: Mutex<LruCache<PoolHash, Arc<ClosenessSlot>>>,
    /// P2P node handle, attached post-construction so paid-quote verification
    /// can check paid-quote issuer K-closeness, and merkle verification can
    /// check that candidate `pub_keys` map to peers actually close to the pool
    /// midpoint in the live DHT. `None` in unit tests that don't exercise
    /// live-DHT checks; production startup MUST call
    /// [`Self::attach_p2p_node`].
    p2p_node: RwLock<Option<Arc<P2PNode>>>,
    /// LMDB storage handle, attached post-construction so the paid-quote
    /// price-floor check can read the authoritative on-disk record count without
    /// depending on a side counter that may drift from replication/repair/prune
    /// paths. `None` in unit tests that pre-set [`Self::test_records_override`];
    /// production startup MUST call [`Self::attach_storage`].
    storage: RwLock<Option<Arc<LmdbStorage>>>,
    /// Test-only fallback record count for the paid-quote local price floor.
    ///
    /// `current_records_stored` returns this value only when no
    /// `LmdbStorage` is attached; an attached storage handle always takes
    /// precedence. Set via [`Self::set_records_stored_for_tests`] so unit
    /// tests that don't wire a real `LmdbStorage` can still drive the
    /// price-floor logic. The field itself is not `cfg`-gated: it is
    /// initialised to `None`, and the only setter is gated on test builds.
    test_records_override: RwLock<Option<u64>>,
    /// Test-only override for the paid-quote issuer K-closest check.
    ///
    /// Production code derives closest peers from the attached [`P2PNode`].
    #[cfg(any(test, feature = "test-utils"))]
    test_paid_quote_k_closest_override: RwLock<Option<Vec<[u8; 32]>>>,
    /// Test-only override for `completedPayments(quote_hash)`.
    ///
    /// Production always queries the payment vault; unit tests use this to
    /// exercise the full verifier path without starting an EVM chain.
    #[cfg(any(test, feature = "test-utils"))]
    test_completed_payments_override: RwLock<HashMap<QuoteHash, Amount>>,
    /// Configuration.
    config: PaymentVerifierConfig,
}
250
251/// Shared state for an inflight closeness verification. The leader publishes
252/// its result via the `OnceLock`; waiters read that result directly instead
253/// of racing on a cache re-check. Wrapped in an `Arc` and held both by the
254/// leader's drop guard and by each waiting task.
255struct ClosenessSlot {
256    notify: Arc<tokio::sync::Notify>,
257    /// `Some(Ok(()))` on success, `Some(Err(msg))` on failure, `None` if the
258    /// leader disappeared without publishing (panic, cancellation).
259    result: std::sync::OnceLock<std::result::Result<(), String>>,
260}
261
262impl ClosenessSlot {
263    fn new() -> Self {
264        Self {
265            notify: Arc::new(tokio::sync::Notify::new()),
266            result: std::sync::OnceLock::new(),
267        }
268    }
269
270    /// Build an owned `Notified` future that snapshots the `notify_waiters`
271    /// counter at call time. Awaiting this future after dropping external
272    /// locks is race-free: if `notify_waiters` fires between construction
273    /// and the first poll, the snapshot mismatch resolves the future
274    /// immediately.
275    fn notified_owned(&self) -> tokio::sync::futures::OwnedNotified {
276        Arc::clone(&self.notify).notified_owned()
277    }
278}
279
280/// Drop guard that publishes the leader's result, clears the inflight slot,
281/// and wakes all waiters. Fires on every exit path: success, failure, panic,
282/// future-cancellation.
283///
284/// The guard owns its own `Arc<ClosenessSlot>` so `notify_waiters` still
285/// fires even if LRU pressure evicted the slot before the leader finished.
286/// Waiters see the published result via `result.get()`; the `Notify` is only
287/// the wake-up signal.
288struct InflightGuard<'a> {
289    slot_cache: &'a Mutex<LruCache<PoolHash, Arc<ClosenessSlot>>>,
290    pool_hash: PoolHash,
291    slot: Arc<ClosenessSlot>,
292}
293
294impl InflightGuard<'_> {
295    /// Publish the leader's result. Called exactly once by the leader on
296    /// every successful or explicit-error exit. If dropped without calling
297    /// (panic, cancellation) the guard still wakes waiters but leaves
298    /// `result` empty, which waiters treat as a transient failure and retry.
299    fn publish(&self, result: &Result<()>) {
300        let stored: std::result::Result<(), String> = match result {
301            Ok(()) => Ok(()),
302            Err(e) => Err(e.to_string()),
303        };
304        let _ = self.slot.result.set(stored);
305    }
306}
307
308impl Drop for InflightGuard<'_> {
309    fn drop(&mut self) {
310        // Remove the slot entry if it's still ours. A separate leader may
311        // have inserted a new slot for the same pool_hash after LRU
312        // eviction — don't pop someone else's entry.
313        {
314            let mut cache = self.slot_cache.lock();
315            if let Some(existing) = cache.peek(&self.pool_hash) {
316                if Arc::ptr_eq(existing, &self.slot) {
317                    cache.pop(&self.pool_hash);
318                }
319            }
320        }
321        // Wake every waiter registered against OUR slot, regardless of
322        // whether the cache entry is still ours.
323        self.slot.notify.notify_waiters();
324    }
325}
326
327impl PaymentVerifier {
328    /// Create a new payment verifier.
329    #[must_use]
330    pub fn new(config: PaymentVerifierConfig) -> Self {
331        const _: () = assert!(
332            DEFAULT_POOL_CACHE_CAPACITY > 0,
333            "pool cache capacity must be > 0"
334        );
335        let cache = VerifiedCache::with_capacity(config.cache_capacity);
336        let pool_cache_size =
337            NonZeroUsize::new(DEFAULT_POOL_CACHE_CAPACITY).unwrap_or(NonZeroUsize::MIN);
338        let pool_cache = Mutex::new(LruCache::new(pool_cache_size));
339        let closeness_pass_cache = Mutex::new(LruCache::new(pool_cache_size));
340        let inflight_closeness = Mutex::new(LruCache::new(pool_cache_size));
341
342        let cache_capacity = config.cache_capacity;
343        info!("Payment verifier initialized (cache_capacity={cache_capacity}, evm=always-on, pool_cache={DEFAULT_POOL_CACHE_CAPACITY})");
344
345        // Loud warning if a production binary was accidentally built with
346        // `test-utils`: that feature flips the live-DHT payment-check
347        // fail-open switches when P2PNode isn't attached. Safe in tests, never
348        // intended for prod.
349        #[cfg(feature = "test-utils")]
350        crate::logging::error!(
351            "PaymentVerifier: built with `test-utils` feature — payment live-DHT \
352             checks fall back to fail-open when no P2PNode is attached. This \
353             feature is for test binaries only; production nodes must be built \
354             without it."
355        );
356
357        Self {
358            cache,
359            pool_cache,
360            closeness_pass_cache,
361            inflight_closeness,
362            p2p_node: RwLock::new(None),
363            storage: RwLock::new(None),
364            test_records_override: RwLock::new(None),
365            #[cfg(any(test, feature = "test-utils"))]
366            test_paid_quote_k_closest_override: RwLock::new(None),
367            #[cfg(any(test, feature = "test-utils"))]
368            test_completed_payments_override: RwLock::new(HashMap::new()),
369            config,
370        }
371    }
372
373    /// Attach the node's [`P2PNode`] handle so paid-quote verification can
374    /// check issuer closeness, and merkle-payment verification can check
375    /// candidate `pub_keys` against the DHT's actual closest peers to the pool
376    /// midpoint.
377    ///
378    /// Production startup MUST call this once the `P2PNode` exists. Without
379    /// it, live-DHT payment checks fail CLOSED in release builds with a visible
380    /// error and fail open in test builds. Idempotent: calling twice replaces
381    /// the handle.
382    pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
383        *self.p2p_node.write() = Some(node);
384        debug!("PaymentVerifier: P2PNode attached for payment live-DHT checks");
385    }
386
387    /// Configured close-group width used by storage admission callers.
388    #[must_use]
389    pub fn close_group_size(&self) -> usize {
390        self.config.close_group_size
391    }
392
393    /// Attach the node's [`LmdbStorage`] handle so paid-quote price-floor
394    /// checks can query the authoritative on-disk record count.
395    ///
396    /// Production startup MUST call this once the storage exists; otherwise
397    /// client PUTs using paid-quote verification are rejected because
398    /// the local economic floor cannot be checked. Idempotent: calling twice
399    /// replaces the handle.
400    pub fn attach_storage(&self, storage: Arc<LmdbStorage>) {
401        *self.storage.write() = Some(storage);
402        debug!("PaymentVerifier: LmdbStorage attached for paid-quote price-floor checks");
403    }
404
405    /// Test-only setter for the current record count used by paid-quote
406    /// price-floor checks. Lets unit tests drive the floor logic without
407    /// wiring a real `LmdbStorage`. Has no effect in production code because
408    /// production code is expected to call [`Self::attach_storage`] instead.
409    #[cfg(any(test, feature = "test-utils"))]
410    pub fn set_records_stored_for_tests(&self, count: u64) {
411        *self.test_records_override.write() = Some(count);
412    }
413
414    /// Test-only setter for local closest peers used by the paid-quote
415    /// issuer K-closest check.
416    #[cfg(any(test, feature = "test-utils"))]
417    pub fn set_paid_quote_k_closest_for_tests(&self, peer_ids: Vec<[u8; 32]>) {
418        *self.test_paid_quote_k_closest_override.write() = Some(peer_ids);
419    }
420
421    /// Compatibility alias for older tests that called this the close group.
422    /// The check now accepts the K closest peers for the quoted chunk address.
423    #[cfg(any(test, feature = "test-utils"))]
424    pub fn set_paid_quote_close_group_for_tests(&self, peer_ids: Vec<[u8; 32]>) {
425        self.set_paid_quote_k_closest_for_tests(peer_ids);
426    }
427
428    /// Compatibility alias for older tests that called this the known-peer
429    /// set. The check now accepts the K closest peers for the quoted chunk
430    /// address.
431    #[cfg(any(test, feature = "test-utils"))]
432    pub fn set_paid_quote_known_peers_for_tests(&self, peer_ids: Vec<[u8; 32]>) {
433        self.set_paid_quote_k_closest_for_tests(peer_ids);
434    }
435
436    /// Test-only setter for an on-chain completed payment amount.
437    #[cfg(any(test, feature = "test-utils"))]
438    pub fn set_completed_payment_for_tests(&self, quote_hash: QuoteHash, amount: Amount) {
439        self.test_completed_payments_override
440            .write()
441            .insert(quote_hash, amount);
442    }
443
444    /// Snapshot the current record count for paid-quote price-floor checks.
445    ///
446    /// Prefers the attached `LmdbStorage` (authoritative — covers client PUTs,
447    /// replication stores, repair fetches, and prune deletes by definition).
448    /// Falls back to a test override if one was set. Returns `None` only when
449    /// no source is available (mis-configured production startup). The
450    /// paid-quote floor rejects client PUTs because the local floor is
451    /// the economic security gate for this proof policy.
452    fn current_records_stored(&self) -> Option<u64> {
453        if let Some(storage) = self.storage.read().as_ref() {
454            match storage.current_chunks() {
455                Ok(n) => return Some(n),
456                Err(e) => {
457                    warn!(
458                        "PaymentVerifier: failed to read current_chunks() for price-floor check: {e}"
459                    );
460                    return None;
461                }
462            }
463        }
464        *self.test_records_override.read()
465    }
466
467    /// Check if payment is required for the given `XorName`.
468    ///
469    /// This is the main entry point for payment verification:
470    /// 1. Check LRU cache (fast path)
471    /// 2. If not cached, payment is required
472    ///
473    /// The fast path is context-aware. A `ClientPut` lookup is satisfied only
474    /// by a close-group store verification. A `PaidListAdmission` lookup is
475    /// satisfied by either a paid-list or client-PUT verification.
476    ///
477    /// # Arguments
478    ///
479    /// * `xorname` - The content-addressed name of the data
480    /// * `context` - The verification context of the caller
481    ///
482    /// # Returns
483    ///
484    /// * `PaymentStatus::CachedAsVerified` - Found in local cache (previously paid)
485    /// * `PaymentStatus::PaymentRequired` - Not cached (payment required)
486    pub fn check_payment_required(
487        &self,
488        xorname: &XorName,
489        context: VerificationContext,
490    ) -> PaymentStatus {
491        // Check LRU cache (fast path)
492        let cached = match context {
493            VerificationContext::ClientPut => self.cache.contains_client_put_verified(xorname),
494            VerificationContext::PaidListAdmission => {
495                self.cache.contains_paid_list_verified(xorname)
496            }
497        };
498        if cached {
499            if crate::logging::enabled!(crate::logging::Level::DEBUG) {
500                debug!("Data {} found in verified cache", hex::encode(xorname));
501            }
502            return PaymentStatus::CachedAsVerified;
503        }
504
505        // Not in cache - payment required
506        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
507            debug!(
508                "Data {} not in cache - payment required",
509                hex::encode(xorname)
510            );
511        }
512        PaymentStatus::PaymentRequired
513    }
514
515    /// Verify that a PUT request has valid payment.
516    ///
517    /// This is the complete payment verification flow:
518    /// 1. Check if data is in cache (previously paid)
519    /// 2. If not, verify the provided payment proof
520    ///
521    /// # Arguments
522    ///
523    /// * `xorname` - The content-addressed name of the data
524    /// * `payment_proof` - Optional payment proof (required if not in cache)
525    /// * `context` - Which fresh admission path is verifying the proof — see
526    ///   [`VerificationContext`] for cache-strength semantics
527    ///
528    /// # Returns
529    ///
530    /// * `Ok(PaymentStatus)` - Verification succeeded
531    /// * `Err(Error::Payment)` - No payment and not cached, or payment invalid
532    ///
533    /// # Errors
534    ///
535    /// Returns an error if payment is required but not provided, or if payment is invalid.
536    pub async fn verify_payment(
537        &self,
538        xorname: &XorName,
539        payment_proof: Option<&[u8]>,
540        context: VerificationContext,
541    ) -> Result<PaymentStatus> {
542        let started = Instant::now();
543        let proof_type = payment_proof_type_label(payment_proof);
544        let proof_bytes = payment_proof.map_or(0, <[u8]>::len);
545        let result = self
546            .verify_payment_inner(xorname, payment_proof, context)
547            .await;
548        let elapsed_ms = started.elapsed().as_millis();
549
550        match &result {
551            Ok(status) if elapsed_ms >= PAYMENT_VERIFY_SLOW_LOG_MS => {
552                info!(
553                    target: "ant_node::payment::verify",
554                    "Slow payment verification: context={context:?}, proof_type={proof_type}, proof_bytes={proof_bytes}, status={status:?}, elapsed_ms={elapsed_ms}",
555                );
556            }
557            Ok(status) => {
558                debug!(
559                    target: "ant_node::payment::verify",
560                    "Payment verification: context={context:?}, proof_type={proof_type}, proof_bytes={proof_bytes}, status={status:?}, elapsed_ms={elapsed_ms}",
561                );
562            }
563            Err(e) if elapsed_ms >= PAYMENT_VERIFY_SLOW_LOG_MS => {
564                warn!(
565                    target: "ant_node::payment::verify",
566                    "Slow payment verification failed: context={context:?}, proof_type={proof_type}, proof_bytes={proof_bytes}, elapsed_ms={elapsed_ms}: {e}",
567                );
568            }
569            Err(e) => {
570                debug!(
571                    target: "ant_node::payment::verify",
572                    "Payment verification failed: context={context:?}, proof_type={proof_type}, proof_bytes={proof_bytes}, elapsed_ms={elapsed_ms}: {e}",
573                );
574            }
575        }
576
577        result
578    }
579
580    async fn verify_payment_inner(
581        &self,
582        xorname: &XorName,
583        payment_proof: Option<&[u8]>,
584        context: VerificationContext,
585    ) -> Result<PaymentStatus> {
586        // First check if payment is required
587        let status = self.check_payment_required(xorname, context);
588
589        match status {
590            PaymentStatus::CachedAsVerified => {
591                // No payment needed - already in cache
592                Ok(status)
593            }
594            PaymentStatus::PaymentRequired => {
595                // EVM verification is always on — verify the proof
596                if let Some(proof) = payment_proof {
597                    let proof_len = proof.len();
598                    if proof_len < MIN_PAYMENT_PROOF_SIZE_BYTES {
599                        return Err(Error::Payment(format!(
600                            "Payment proof too small: {proof_len} bytes (min {MIN_PAYMENT_PROOF_SIZE_BYTES})"
601                        )));
602                    }
603                    if proof_len > MAX_PAYMENT_PROOF_SIZE_BYTES {
604                        return Err(Error::Payment(format!(
605                            "Payment proof too large: {proof_len} bytes (max {MAX_PAYMENT_PROOF_SIZE_BYTES} bytes)"
606                        )));
607                    }
608
609                    // Detect proof type from version tag byte
610                    match detect_proof_type(proof) {
611                        Some(ProofType::Merkle) => {
612                            self.verify_merkle_payment(xorname, proof, context).await?;
613                        }
614                        Some(ProofType::SingleNode) => {
615                            let (payment, tx_hashes) = deserialize_proof(proof).map_err(|e| {
616                                Error::Payment(format!("Failed to deserialize payment proof: {e}"))
617                            })?;
618
619                            if !tx_hashes.is_empty() {
620                                debug!("Proof includes {} transaction hash(es)", tx_hashes.len());
621                            }
622
623                            self.verify_evm_payment(xorname, &payment, context).await?;
624                        }
625                        None => {
626                            let tag = proof.first().copied().unwrap_or(0);
627                            return Err(Error::Payment(format!(
628                                "Unknown payment proof type tag: 0x{tag:02x}"
629                            )));
630                        }
631                        // ant-protocol marks `ProofType` as `#[non_exhaustive]`.
632                        // A future proof variant that this node does not yet
633                        // understand must be rejected, not silently accepted.
634                        Some(_) => {
635                            let tag = proof.first().copied().unwrap_or(0);
636                            return Err(Error::Payment(format!(
637                                "Unsupported payment proof type tag: 0x{tag:02x} (this node's protocol version does not handle it — upgrade ant-node)"
638                            )));
639                        }
640                    }
641
642                    // Cache the verified xorname at the context's verification
643                    // strength. Stronger entries satisfy weaker future lookups,
644                    // but not the reverse.
645                    match context {
646                        VerificationContext::ClientPut => self.cache.insert(*xorname),
647                        VerificationContext::PaidListAdmission => {
648                            self.cache.insert_paid_list_verified(*xorname);
649                        }
650                    }
651
652                    Ok(PaymentStatus::PaymentVerified)
653                } else {
654                    // No payment provided in production mode
655                    let xorname_hex = hex::encode(xorname);
656                    Err(Error::Payment(format!(
657                        "Payment required for new data {xorname_hex}"
658                    )))
659                }
660            }
661            PaymentStatus::PaymentVerified => Err(Error::Payment(
662                "Unexpected PaymentVerified status from check_payment_required".to_string(),
663            )),
664        }
665    }
666
667    /// Get cache statistics.
668    #[must_use]
669    pub fn cache_stats(&self) -> CacheStats {
670        self.cache.stats()
671    }
672
673    /// Get the number of cached entries.
674    #[must_use]
675    pub fn cache_len(&self) -> usize {
676        self.cache.len()
677    }
678
679    /// Pre-populate the payment cache for a given address.
680    ///
681    /// This marks the address as already paid, so subsequent `verify_payment`
682    /// calls will return `CachedAsVerified` without on-chain verification.
683    /// Useful for test setups where real EVM payment is not needed.
684    #[cfg(any(test, feature = "test-utils"))]
685    pub fn cache_insert(&self, xorname: XorName) {
686        self.cache.insert(xorname);
687    }
688
689    /// Pre-populate the merkle pool cache. Testing helper that lets e2e tests
690    /// bypass the on-chain `completedMerklePayments` lookup when the point of
691    /// the test is to exercise merkle-verification logic BEFORE the on-chain
692    /// call (e.g. the pay-yourself closeness check).
693    #[cfg(any(test, feature = "test-utils"))]
694    pub fn pool_cache_insert(&self, pool_hash: PoolHash, info: OnChainPaymentInfo) {
695        let mut cache = self.pool_cache.lock();
696        cache.put(pool_hash, info);
697    }
698
699    /// Verify a single-node EVM payment proof.
700    ///
701    /// Verification steps:
702    /// 1. Between 1 and `CLOSE_GROUP_SIZE` quotes are present
703    /// 2. Median-priced candidate quotes are derived from the supplied bundle
704    /// 3. Each candidate is checked for content binding, peer binding, and a
705    ///    valid ML-DSA-65 signature
706    /// 4. Each candidate must also come from a local K-close peer and
707    ///    satisfy the paid-quote price floor
708    /// 5. A candidate is accepted only if `completedPayments(quoteHash)` is at
709    ///    least 3x the median price
710    ///
711    /// Non-median quotes are parsed only to locate the median. Their content,
712    /// peer bindings, and signatures are deliberately ignored: the paid
713    /// quote's content hash, quote hash, signature, local floor, issuer
714    /// K-closeness check, and on-chain settlement are the authority. A
715    /// one-quote proof is valid when that single quote passes these checks and
716    /// was paid 3x.
717    async fn verify_evm_payment(
718        &self,
719        xorname: &XorName,
720        payment: &ProofOfPayment,
721        context: VerificationContext,
722    ) -> Result<()> {
723        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
724            let xorname_hex = hex::encode(xorname);
725            let quote_count = payment.peer_quotes.len();
726            debug!(
727                "Verifying EVM payment for {xorname_hex} with {quote_count} quotes ({context:?})"
728            );
729        }
730
731        Self::validate_quote_structure(payment)?;
732        let candidates = Self::legacy_median_candidates(payment)?;
733        let mut failures = Vec::with_capacity(candidates.len());
734        let mut verified_paid_quote = false;
735
736        for candidate in candidates {
737            match self
738                .verify_legacy_median_candidate(xorname, candidate)
739                .await
740            {
741                Ok(()) => {
742                    verified_paid_quote = true;
743                    break;
744                }
745                Err(err) => failures.push(err.to_string()),
746            }
747        }
748
749        if !verified_paid_quote {
750            let xorname_hex = hex::encode(xorname);
751            let details = if failures.is_empty() {
752                "no median-priced candidates were available".to_string()
753            } else {
754                failures.join("; ")
755            };
756            return Err(Error::Payment(format!(
757                "Median quote payment verification failed for {xorname_hex}: {details}"
758            )));
759        }
760
761        if crate::logging::enabled!(crate::logging::Level::INFO) {
762            let xorname_hex = hex::encode(xorname);
763            info!("EVM payment verified for {xorname_hex}");
764        }
765        Ok(())
766    }
767
768    fn legacy_median_candidates(
769        payment: &ProofOfPayment,
770    ) -> Result<Vec<LegacyMedianCandidate<'_>>> {
771        let mut sorted_quotes: Vec<(&evmlib::EncodedPeerId, &PaymentQuote)> = payment
772            .peer_quotes
773            .iter()
774            .map(|(encoded_peer_id, quote)| (encoded_peer_id, quote))
775            .collect();
776        sorted_quotes.sort_by_key(|(_, quote)| quote.price);
777        let quote_count = sorted_quotes.len();
778        let median_index = median_quote_index(quote_count);
779        let median_price = sorted_quotes
780            .get(median_index)
781            .ok_or_else(|| {
782                Error::Payment(format!("Missing paid quote at median index {median_index}"))
783            })?
784            .1
785            .price;
786        let expected_amount = median_price
787            .checked_mul(Amount::from(PAID_QUOTE_PAYMENT_MULTIPLIER))
788            .ok_or_else(|| {
789                Error::Payment(format!(
790                    "Median quote payment amount overflow for price {median_price}"
791                ))
792            })?;
793
794        if expected_amount == Amount::ZERO || median_price == Amount::ZERO {
795            return Err(Error::Payment(format!(
796                "Median quote has zero price/amount (price={median_price}, amount={expected_amount}); refusing to verify as paid"
797            )));
798        }
799
800        Ok(sorted_quotes
801            .into_iter()
802            .filter(|(_, quote)| quote.price == median_price)
803            .map(|(encoded_peer_id, quote)| LegacyMedianCandidate {
804                encoded_peer_id,
805                quote,
806                expected_amount,
807            })
808            .collect())
809    }
810
811    async fn verify_legacy_median_candidate(
812        &self,
813        xorname: &XorName,
814        candidate: LegacyMedianCandidate<'_>,
815    ) -> Result<()> {
816        Self::validate_paid_quote_content(xorname, candidate)?;
817        let issuer_peer_id =
818            Self::validate_paid_quote_peer_binding(candidate.encoded_peer_id, candidate.quote)?;
819
820        self.validate_paid_quote_issuer_k_closest(xorname, &issuer_peer_id)
821            .await?;
822        self.validate_paid_quote_price_floor(candidate.quote)?;
823
824        Self::validate_paid_quote_signature(candidate).await?;
825
826        let on_chain_amount = self
827            .completed_payment_amount(candidate.quote.hash())
828            .await?;
829        if on_chain_amount >= candidate.expected_amount {
830            return Ok(());
831        }
832
833        Err(Error::Payment(format!(
834            "Median-priced quote for peer {:?} was not paid enough: expected at least {}, got {on_chain_amount}",
835            candidate.encoded_peer_id, candidate.expected_amount
836        )))
837    }
838
839    fn validate_paid_quote_content(
840        xorname: &XorName,
841        candidate: LegacyMedianCandidate<'_>,
842    ) -> Result<()> {
843        if verify_quote_content(candidate.quote, xorname) {
844            return Ok(());
845        }
846
847        let expected_hex = hex::encode(xorname);
848        let actual_hex = hex::encode(candidate.quote.content.0);
849        Err(Error::Payment(format!(
850            "Paid quote content address mismatch for peer {:?}: expected {expected_hex}, got {actual_hex}",
851            candidate.encoded_peer_id
852        )))
853    }
854
855    async fn validate_paid_quote_signature(candidate: LegacyMedianCandidate<'_>) -> Result<()> {
856        let quote_for_signature = candidate.quote.clone();
857        let peer_id_for_error = candidate.encoded_peer_id.clone();
858        tokio::task::spawn_blocking(move || {
859            if !verify_quote_signature(&quote_for_signature) {
860                return Err(Error::Payment(format!(
861                    "Paid quote ML-DSA-65 signature verification failed for peer {peer_id_for_error:?}"
862                )));
863            }
864            Ok(())
865        })
866        .await
867        .map_err(|e| Error::Payment(format!("Signature verification task failed: {e}")))?
868    }
869
870    async fn completed_payment_amount(&self, quote_hash: QuoteHash) -> Result<Amount> {
871        #[cfg(any(test, feature = "test-utils"))]
872        {
873            let completed_payment_override = {
874                self.test_completed_payments_override
875                    .read()
876                    .get(&quote_hash)
877                    .copied()
878            };
879            if let Some(amount) = completed_payment_override {
880                return Ok(amount);
881            }
882        }
883
884        let provider = evmlib::utils::http_provider(self.config.evm.network.rpc_url().clone());
885        let vault_address = *self.config.evm.network.payment_vault_address();
886        let contract = payment_vault::interface::IPaymentVault::new(vault_address, provider);
887
888        let result = contract
889            .completedPayments(quote_hash)
890            .call()
891            .await
892            .map_err(|e| Error::Payment(format!("completedPayments lookup failed: {e}")))?;
893
894        Ok(Amount::from(result.amount))
895    }
896
897    fn validate_paid_quote_peer_binding(
898        encoded_peer_id: &evmlib::EncodedPeerId,
899        quote: &PaymentQuote,
900    ) -> Result<PeerId> {
901        let expected_peer_id = peer_id_from_public_key_bytes(&quote.pub_key)
902            .map_err(|e| Error::Payment(format!("Invalid ML-DSA public key in quote: {e}")))?;
903
904        if expected_peer_id.as_bytes() != encoded_peer_id.as_bytes() {
905            let expected_hex = expected_peer_id.to_hex();
906            let actual_hex = hex::encode(encoded_peer_id.as_bytes());
907            return Err(Error::Payment(format!(
908                "Paid quote pub_key does not belong to claimed peer {encoded_peer_id:?}: \
909                 BLAKE3(pub_key) = {expected_hex}, peer_id = {actual_hex}"
910            )));
911        }
912
913        Ok(expected_peer_id)
914    }
915
916    fn validate_paid_quote_price_floor(&self, quote: &PaymentQuote) -> Result<()> {
917        let Some(current_records) = self.current_records_stored() else {
918            return Err(Error::Payment(
919                "PaymentVerifier: no record-count source attached; cannot verify \
920                 paid-quote local price floor"
921                    .to_string(),
922            ));
923        };
924
925        let current_price = calculate_price(usize::try_from(current_records).unwrap_or(usize::MAX));
926        let min_acceptable_price = price_floor(current_price, PAID_QUOTE_PRICE_FLOOR_TOLERANCE_PCT);
927
928        if quote.price < min_acceptable_price {
929            let quoted_records = derive_records_stored_from_price(quote.price);
930            return Err(Error::Payment(format!(
931                "Paid quote price below local floor: quoted price encodes \
932                 {quoted_records} records but node currently holds {current_records} \
933                 (quoted {}, minimum acceptable {min_acceptable_price} at \
934                 {PAID_QUOTE_PRICE_FLOOR_TOLERANCE_PCT}% under-payment tolerance)",
935                quote.price
936            )));
937        }
938
939        Ok(())
940    }
941
    /// Check that the paid quote's issuer is among this node's locally known
    /// closest peers to `xorname`.
    ///
    /// The production path asks the attached `P2PNode`'s DHT manager for the
    /// `PAID_QUOTE_ISSUER_CLOSENESS_WIDTH` closest nodes (including self,
    /// ranked purely by XOR distance) and succeeds when `issuer_peer_id` is
    /// one of them.
    ///
    /// Build-dependent behaviour:
    /// - Test builds / `test-utils`: if `test_paid_quote_k_closest_override`
    ///   is set, membership in that list alone decides the result.
    /// - Test builds / `test-utils` with no `P2PNode` attached: the check is
    ///   skipped with a warning and `Ok(())` is returned.
    /// - Production builds with no `P2PNode` attached: the payment is
    ///   rejected and an error is logged.
    ///
    /// # Errors
    ///
    /// Returns `Error::Payment` when the issuer is not in the compared set,
    /// or (production builds) when no `P2PNode` is attached.
    async fn validate_paid_quote_issuer_k_closest(
        &self,
        xorname: &XorName,
        issuer_peer_id: &PeerId,
    ) -> Result<()> {
        // Test-only override: when present it fully replaces the routing-table
        // lookup below, for both the accept and the reject outcome.
        #[cfg(any(test, feature = "test-utils"))]
        if let Some(k_closest_peer_ids) = self.test_paid_quote_k_closest_override.read().as_ref() {
            if k_closest_peer_ids
                .iter()
                .any(|peer_id| peer_id == issuer_peer_id.as_bytes())
            {
                return Ok(());
            }
            let issuer_closeness_width = PAID_QUOTE_ISSUER_CLOSENESS_WIDTH;
            return Err(Error::Payment(format!(
                "Paid quote issuer {} is not among this node's local K={issuer_closeness_width} closest peers for {}",
                issuer_peer_id.to_hex(),
                hex::encode(xorname)
            )));
        }

        // Clone the `Arc` out of the lock so the read guard is released at
        // the end of this statement, before the lookup is awaited below.
        let attached = self.p2p_node.read().as_ref().map(Arc::clone);
        let Some(p2p_node) = attached else {
            // Exactly one of the two cfg-gated blocks below is compiled in;
            // each one returns, as the `let ... else` form requires.
            #[cfg(any(test, feature = "test-utils"))]
            {
                crate::logging::warn!(
                    "PaymentVerifier: no P2PNode attached; paid-quote issuer \
                     K-closest check SKIPPED (test build). Production startup MUST call \
                     PaymentVerifier::attach_p2p_node."
                );
                return Ok(());
            }
            #[cfg(not(any(test, feature = "test-utils")))]
            {
                crate::logging::error!(
                    "PaymentVerifier: no P2PNode attached; rejecting paid-quote \
                     payment. This is a node-startup bug — \
                     PaymentVerifier::attach_p2p_node must be called before \
                     any PUT handler runs."
                );
                return Err(Error::Payment(
                    "Paid quote rejected: verifier is not wired to the P2P \
                     layer; cannot verify issuer closeness."
                        .into(),
                ));
            }
        };

        // Closeness *verification* must mirror the uploader's pure XOR-distance
        // peer selection. `find_closest_nodes_local_with_self` reranks the local
        // routing table by reachability (preferring directly-reachable peers,
        // XOR only as a tiebreaker), which demotes an XOR-close relay-only /
        // NAT'd peer out of the compared window and falsely rejects an honest
        // payment that legitimately quoted that peer. Use the XOR-only sibling
        // so this check matches how the client chose the quoted K-closest set.
        let issuer_closeness_width = PAID_QUOTE_ISSUER_CLOSENESS_WIDTH;
        let closest = p2p_node
            .dht_manager()
            .find_closest_nodes_local_by_distance_with_self(xorname, issuer_closeness_width)
            .await;
        if closest.iter().any(|node| node.peer_id == *issuer_peer_id) {
            return Ok(());
        }

        Err(Error::Payment(format!(
            "Paid quote issuer {} is not among this node's local K={issuer_closeness_width} closest peers for {}",
            issuer_peer_id.to_hex(),
            hex::encode(xorname)
        )))
    }
1012
1013    /// Validate quote count, uniqueness, and basic structure.
1014    fn validate_quote_structure(payment: &ProofOfPayment) -> Result<()> {
1015        if payment.peer_quotes.is_empty() {
1016            return Err(Error::Payment("Payment has no quotes".to_string()));
1017        }
1018
1019        let quote_count = payment.peer_quotes.len();
1020        if quote_count > CLOSE_GROUP_SIZE {
1021            return Err(Error::Payment(format!(
1022                "Payment must have at most {CLOSE_GROUP_SIZE} quotes, got {quote_count}"
1023            )));
1024        }
1025
1026        let mut seen: Vec<&evmlib::EncodedPeerId> = Vec::with_capacity(quote_count);
1027        for (encoded_peer_id, _) in &payment.peer_quotes {
1028            if seen.contains(&encoded_peer_id) {
1029                return Err(Error::Payment(format!(
1030                    "Duplicate peer ID in payment quotes: {encoded_peer_id:?}"
1031                )));
1032            }
1033            seen.push(encoded_peer_id);
1034        }
1035
1036        Ok(())
1037    }
1038
    /// Minimum number of candidate `pub_keys` (out of 16) whose derived
    /// `PeerId` must be among the DHT's actual closest peers to the pool
    /// midpoint address for the pool to be accepted.
    ///
    /// Set to a simple majority (9 of 16). Two nodes' views of the closest
    /// set to a midpoint diverge on a young, high-churn, NAT-heavy network —
    /// by more than a near-unanimous threshold tolerates — so a stricter bar
    /// rejected honest pools whose candidates are genuinely drawn from the
    /// midpoint's close group but don't all reappear in this storer's own
    /// lookup. A majority absorbs that divergence while still requiring most
    /// candidates to be real peers the live DHT lists as closest.
    ///
    /// Security cost: a lower threshold widens the room for the "pay-yourself"
    /// attack — an attacker running real neighbourhood peers needs fewer of
    /// them to clear a majority than to clear a near-unanimous bar. No theft
    /// of funds is possible regardless (payment binds on-chain to the rewards
    /// address); the cost is that grinding storage payments back to your own
    /// nodes gets cheaper. Each counted candidate must still be a peer the
    /// live DHT actually returns as closest — a fabricated off-network key
    /// cannot satisfy this — so the floor is "run N real top-K Sybil nodes
    /// AND grind the midpoint", just with a smaller N. Pairs with the planned
    /// pool-midpoint consensus-anchor work, which removes the midpoint
    /// grinding freedom that makes a low threshold dangerous.
    const CANDIDATE_CLOSENESS_REQUIRED: usize = 9;
1063
    /// Timeout for the authoritative network lookup used by the closeness
    /// check.
    ///
    /// Iterative Kademlia lookups can cascade through `MAX_ITERATIONS = 20`
    /// rounds in saorsa-core's `find_closest_nodes_network`, and a single
    /// unresponsive peer's dial can take 20–30s before timing out. On a
    /// young network (e.g. fresh testnet, NAT-simulated peers in 30% of
    /// the swarm) iterations average ~10s each — captured trace from
    /// STG-01 EWR-3 ant-node-1 just before a pre-fix timeout:
    ///
    /// ```text
    /// Iter 0: +0.0s | Iter 1: +0.2s | Iter 2: +6.6s | Iter 3: +13.1s
    /// Iter 4: +20.9s | Iter 5: +39.8s | Iter 6: +50.8s | [60s wall]
    /// ```
    ///
    /// A 60s timeout caps the lookup at ~7 iterations and rejects honest
    /// pools whose candidates only emerge after iteration 7. 240s gives
    /// ~1.2× headroom over the ~200s natural worst-case runtime on a 1k-node
    /// testnet.
    ///
    /// `DoS` amplification stays bounded at roughly one in-flight lookup
    /// per unique `pool_hash` under typical load, via
    /// [`closeness_pass_cache`] + [`inflight_closeness`]. The bound is
    /// "typical" because `inflight_closeness` is an LRU and a sustained
    /// flood of unique `pool_hash` entries can evict an in-flight slot,
    /// at which point a second leader can race for the same pool (see
    /// [`InflightGuard::drop`]). At steady state the pool cache and pool
    /// signature verification gate keep this rare in practice.
    const CLOSENESS_LOOKUP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(240);
1092
    /// Width of the storer's authoritative network lookup, in peers.
    ///
    /// The client over-queries `2 * CANDIDATES_PER_POOL = 32` peers via
    /// `find_closest_peers(addr, 32)` (see
    /// `ant-client/ant-core/src/data/client/merkle.rs::get_merkle_candidate_pool`)
    /// and selects 16 valid responders by XOR distance — so truly-close
    /// peers that are slow, NAT'd, or briefly unreachable get filtered
    /// out and replaced by peers from positions 17–32 of the network's
    /// actual ranking. The storer must therefore verify against the same
    /// wider window: a pool containing peers from positions 17–32 is
    /// honest (those peers really exist in the network's closest-32 set),
    /// it's just that the client's quote-collection step couldn't reach
    /// the peers at positions <17 in time.
    ///
    /// Empirical effect on STG-01 (1k-node testnet, 30% NAT-simulated):
    /// widening from K=16 to K=32 dropped client-side closeness
    /// mismatches from ~115 to ~31 per 5 min, a 73% reduction.
    ///
    /// Performance note: `count` does not just truncate the lookup —
    /// `find_closest_nodes_network` keeps iterating until either
    /// `MAX_ITERATIONS` is reached or `best_nodes.len() >= count`. K=32
    /// can therefore extend lookups by a few iterations on sparse
    /// networks vs K=16, which reinforces (rather than undermines) the
    /// larger `CLOSENESS_LOOKUP_TIMEOUT` value.
    ///
    /// Security: the pay-yourself attack still requires the attacker's
    /// fabricated `PeerId`s to land in the storer's authoritative top-K, so
    /// the dominant cost is Sybil-grinding midpoint addresses or running real
    /// nodes near the target. The leniency for honest divergence comes from
    /// the `CANDIDATE_CLOSENESS_REQUIRED` majority threshold, not from this
    /// window; widening the window further was measured as too heavy on the
    /// lookup path.
    const CLOSENESS_LOOKUP_WIDTH: usize = 2 * evmlib::merkle_payments::CANDIDATES_PER_POOL;
1126
    /// Maximum waiter → leader retries when the leader's future was cancelled
    /// or panicked before publishing a result. Beyond this the waiter returns
    /// a visible error rather than spinning indefinitely through a
    /// cancellation cascade.
    ///
    /// Worst-case waiter wall-clock is `(MAX_LEADER_RETRIES + 1) *
    /// CLOSENESS_LOOKUP_TIMEOUT` (one wait per attempt). Kept low (1)
    /// because the only realistic trigger is leader future-cancellation,
    /// which should be extraordinarily rare; under sustained adversarial
    /// cancellation a higher cap doesn't add resilience, it just hides
    /// the symptom. With `CLOSENESS_LOOKUP_TIMEOUT = 240s` this caps a
    /// single user-visible verification at (1 + 1) × 240s = 480s, i.e.
    /// ~8 min worst case (vs (4 + 1) × 240s = 1200s, ~20 min, at the
    /// previous value of 4).
    const MAX_LEADER_RETRIES: usize = 1;
1141
1142    /// Compute the storer's authoritative-lookup width for a candidate pool.
1143    ///
1144    /// Returns `max(CLOSENESS_LOOKUP_WIDTH, pool_len)`: matches the client's
1145    /// over-query width today, and scales with the pool if a future protocol
1146    /// bump grows pool size beyond `CLOSENESS_LOOKUP_WIDTH`. Truncating to
1147    /// `CLOSENESS_LOOKUP_WIDTH` in that future case would re-open the
1148    /// K-too-small failure mode (the storer would reject honest pools whose
1149    /// candidates legitimately span a wider XOR range than the storer
1150    /// fetched). Pinned by `closeness_lookup_count_uses_max_of_width_and_pool_len`.
1151    const fn closeness_lookup_count(pool_len: usize) -> usize {
1152        if Self::CLOSENESS_LOOKUP_WIDTH > pool_len {
1153            Self::CLOSENESS_LOOKUP_WIDTH
1154        } else {
1155            pool_len
1156        }
1157    }
1158
1159    /// Verify that the candidate pool's `pub_keys` correspond to peers that
1160    /// are actually XOR-closest to the pool midpoint address, by querying
1161    /// the DHT for its closest peers to that address and requiring that a
1162    /// majority of the candidates match.
1163    ///
1164    /// **What this blocks**: the "pay yourself" attack. Candidate signatures
1165    /// only cover `(price, reward_address, timestamp)` and the `pub_key` bytes —
1166    /// nothing ties a candidate to a network-registered identity or to the
1167    /// pool neighbourhood. Without this check an attacker can generate 16
1168    /// ML-DSA keypairs locally, point all 16 `reward_address` fields at a
1169    /// single attacker-controlled wallet, submit the merkle payment, and drain
1170    /// their own payment back out.
1171    ///
1172    /// **How it blocks**: each candidate's `PeerId = BLAKE3(pub_key)`; the DHT
1173    /// is the authoritative source of "which peers exist at this XOR
1174    /// coordinate". If the attacker's 16 fabricated `PeerId`s are not among
1175    /// the peers the network actually lists as closest to the pool address,
1176    /// the pool is forged.
1177    ///
1178    /// **Scope**: a `MerklePaymentProof` carries exactly one `winner_pool`
1179    /// (the pool the smart contract selected for the batch). Every storing
1180    /// node that receives the proof independently re-runs this check against
1181    /// that same pool, so a forged pool is rejected at every node it
1182    /// reaches.
1183    ///
1184    /// **Known limitation — Sybil-grinding**: `midpoint_proof.address()` is a
1185    /// BLAKE3 hash of attacker-controllable inputs (leaf bytes, tree root,
1186    /// timestamp). A determined attacker who *also* runs Sybil DHT nodes can
1187    /// grind the midpoint until it lands in a region where a majority of
1188    /// their Sybil keys are the true network-closest — at which point this check
1189    /// passes for the attacker. Closing that gap requires binding the
1190    /// midpoint to an attacker-uncontrolled value (e.g. a block hash at
1191    /// payment time or an on-chain VRF) or a Sybil-resistant identity
1192    /// layer. This defence raises the attack cost from "free" to "run N
1193    /// Sybil nodes AND grind", which is a meaningful but not complete
1194    /// improvement.
1195    async fn verify_merkle_candidate_closeness(
1196        &self,
1197        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
1198        pool_hash: PoolHash,
1199    ) -> Result<()> {
1200        // Fast path: this node already verified this pool successfully.
1201        // A batch of 256 chunks shares one winner_pool, so without this cache
1202        // we'd pay a Kademlia lookup per chunk.
1203        if self.closeness_pass_cache.lock().get(&pool_hash).is_some() {
1204            return Ok(());
1205        }
1206
1207        // Single-flight: on each attempt, either claim leadership by
1208        // inserting a fresh `ClosenessSlot`, or wait on an existing leader
1209        // and read its published result. The leader holds an `Arc` to the
1210        // slot independent of the LruCache so waiters are still woken if
1211        // eviction pressure kicked the cache entry.
1212        //
1213        // The `notified_owned()` future snapshots the `notify_waiters`
1214        // counter at the moment of construction (while we hold the lock),
1215        // which makes the subsequent `.await` race-free: if the leader
1216        // calls `notify_waiters` between our construction and our poll, the
1217        // counter has advanced and the future resolves immediately on first
1218        // poll.
1219        //
1220        // Bounded retry: if we're a waiter and the leader gets cancelled or
1221        // panics (slot.result.get() == None after wake-up), we loop back to
1222        // claim leadership. `MAX_LEADER_RETRIES` bounds the attempts so
1223        // adversarial cancellation cascades cannot spin this indefinitely.
1224        for attempt in 0..=Self::MAX_LEADER_RETRIES {
1225            // Release the mutex guard explicitly before any await below.
1226            // Clippy wants `if let ... else` written as `map_or_else`, but
1227            // any such rewrite re-borrows the locked `inflight` inside the
1228            // closure and fails the borrow checker — so the lint is
1229            // silenced here.
1230            #[allow(clippy::option_if_let_else)]
1231            let (waiter_slot, leader_slot) = {
1232                let mut inflight = self.inflight_closeness.lock();
1233                let chosen = if let Some(existing) = inflight.get(&pool_hash) {
1234                    (Some(Arc::clone(existing)), None)
1235                } else {
1236                    let slot = Arc::new(ClosenessSlot::new());
1237                    inflight.put(pool_hash, Arc::clone(&slot));
1238                    (None, Some(slot))
1239                };
1240                drop(inflight);
1241                chosen
1242            };
1243
1244            if let Some(slot) = waiter_slot {
1245                // Build the owned-notified future BEFORE awaiting, so it
1246                // snapshots the `notify_waiters` counter now. The slot
1247                // already existed when we locked, so the leader is either
1248                // running or finished; in both cases the snapshot + counter
1249                // check ensures we wake up correctly.
1250                let notified = slot.notified_owned();
1251                notified.await;
1252
1253                // Leader published a result — use it directly.
1254                if let Some(result) = slot.result.get() {
1255                    return result.clone().map_err(Error::Payment);
1256                }
1257                // Leader disappeared without publishing (panic or
1258                // cancellation). Slot was cleared by the leader's drop
1259                // guard; loop to become the new leader — unless we've
1260                // hit the retry bound (see MAX_LEADER_RETRIES).
1261                if attempt == Self::MAX_LEADER_RETRIES {
1262                    return Err(Error::Payment(
1263                        "Merkle candidate pool rejected: closeness leader \
1264                         repeatedly failed to publish a result (likely \
1265                         repeated cancellation or panic)."
1266                            .into(),
1267                    ));
1268                }
1269                continue;
1270            }
1271
1272            // Leader path. Drop guard clears the slot and wakes waiters on
1273            // every exit (success, failure, panic, cancellation).
1274            let Some(slot) = leader_slot else {
1275                // Unreachable by construction.
1276                return Err(Error::Payment(
1277                    "internal error: neither leader nor waiter in closeness check".into(),
1278                ));
1279            };
1280            let guard = InflightGuard {
1281                slot_cache: &self.inflight_closeness,
1282                pool_hash,
1283                slot,
1284            };
1285
1286            let result = self.verify_merkle_candidate_closeness_inner(pool).await;
1287            guard.publish(&result);
1288            if result.is_ok() {
1289                self.closeness_pass_cache.lock().put(pool_hash, ());
1290            }
1291            return result;
1292        }
1293        // Unreachable: the for-loop body always either `return`s or `continue`s,
1294        // and the waiter branch's `continue` only runs when `attempt <
1295        // Self::MAX_LEADER_RETRIES`. The last iteration's waiter branch returns
1296        // via the retry-bound check; the leader branch always returns.
1297        Err(Error::Payment(
1298            "internal error: closeness retry loop exited without returning".into(),
1299        ))
1300    }
1301
1302    /// Inner closeness check: the actual DHT lookup + set-membership test.
1303    /// Wrapped by [`verify_merkle_candidate_closeness`] with a pass-cache and
1304    /// single-flight guard so a batch of chunks and a storm of forged PUTs
1305    /// don't multiply the lookup cost.
1306    /// Derive each candidate's `PeerId` from its `pub_key` and reject the
1307    /// pool if any `PeerId` appears more than once.
1308    ///
1309    /// This is a pure-validation pre-check, runnable without a `P2PNode`:
1310    /// catches the case where one real peer's `pub_key` is repeated to
1311    /// inflate the closeness match count, without paying for a Kademlia
1312    /// lookup. An honest pool has [`evmlib::merkle_payments::CANDIDATES_PER_POOL`]
1313    /// distinct candidate `pub_keys` by construction.
1314    fn derive_distinct_candidate_peer_ids(
1315        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
1316    ) -> Result<Vec<PeerId>> {
1317        let mut candidate_peer_ids = Vec::with_capacity(pool.candidate_nodes.len());
1318        let mut seen = std::collections::HashSet::with_capacity(pool.candidate_nodes.len());
1319        for candidate in &pool.candidate_nodes {
1320            let pid = peer_id_from_public_key_bytes(&candidate.pub_key).map_err(|e| {
1321                Error::Payment(format!(
1322                    "Invalid ML-DSA public key in merkle candidate: {e}"
1323                ))
1324            })?;
1325            if !seen.insert(pid) {
1326                return Err(Error::Payment(
1327                    "Merkle candidate pool rejected: duplicate candidate PeerId. An \
1328                     honest pool has 16 distinct candidate pub_keys; duplicates would \
1329                     let a single real peer satisfy the closeness threshold by being \
1330                     counted multiple times."
1331                        .into(),
1332                ));
1333            }
1334            candidate_peer_ids.push(pid);
1335        }
1336        Ok(candidate_peer_ids)
1337    }
1338
1339    /// Pure-logic closeness check: given the pool's candidate peer IDs and
1340    /// the storer's authoritative network view (closest peers to the pool
1341    /// midpoint), decide whether the pool passes the
1342    /// `CANDIDATE_CLOSENESS_REQUIRED`-of-N threshold.
1343    ///
1344    /// A candidate counts only if its `PeerId` is one of the peers the
1345    /// storer's own network lookup returned (exact set membership). This is
1346    /// the property that makes the gate meaningful: a passing candidate must
1347    /// be a real, reachable peer the live DHT actually routes to and lists
1348    /// among the closest — it cannot be a key fabricated off-network. The
1349    /// leniency in this check is purely the lowered threshold (a majority
1350    /// rather than near-unanimity), which tolerates the closest-set
1351    /// divergence between two nodes' views without admitting fabricated keys.
1352    ///
1353    /// Extracted from `verify_merkle_candidate_closeness_inner` so tests
1354    /// can exercise the matching logic without standing up a real DHT.
1355    /// Mirrors the runtime path exactly: same sparse-network short-circuit,
1356    /// same set-membership check, same error strings.
1357    fn check_closeness_match(
1358        candidate_peer_ids: &[PeerId],
1359        network_peer_ids: &[PeerId],
1360        pool_address: &[u8; 32],
1361    ) -> Result<()> {
1362        // Sparse-network short-circuit: if the DHT itself returned fewer
1363        // peers than the closeness threshold, the proof can never pass —
1364        // not because the candidates are forged, but because we don't
1365        // have an authoritative view to compare against. Surface this
1366        // distinct cause so operators can tell "retry once the network
1367        // settles" apart from "this peer sent a forged pool".
1368        if network_peer_ids.len() < Self::CANDIDATE_CLOSENESS_REQUIRED {
1369            debug!(
1370                "Merkle closeness deferred: network lookup returned {} peers \
1371                 for pool midpoint {} (need at least {} to verify)",
1372                network_peer_ids.len(),
1373                hex::encode(pool_address),
1374                Self::CANDIDATE_CLOSENESS_REQUIRED,
1375            );
1376            return Err(Error::Payment(format!(
1377                "Merkle candidate pool rejected: authoritative DHT lookup returned \
1378                 only {} peers, less than the {} required to verify candidate \
1379                 closeness. Retry once the routing table populates further.",
1380                network_peer_ids.len(),
1381                Self::CANDIDATE_CLOSENESS_REQUIRED,
1382            )));
1383        }
1384
1385        // Exact-match membership against the returned closest peers.
1386        // Candidate `PeerId`s are deduplicated upstream, so each match
1387        // corresponds to a distinct peer.
1388        let network_set: std::collections::HashSet<PeerId> =
1389            network_peer_ids.iter().copied().collect();
1390        let matched = candidate_peer_ids
1391            .iter()
1392            .filter(|pid| network_set.contains(pid))
1393            .count();
1394
1395        if matched < Self::CANDIDATE_CLOSENESS_REQUIRED {
1396            debug!(
1397                "Merkle closeness rejected: {matched}/{} candidates match the DHT's closest peers \
1398                 for pool midpoint {} (required: {}, network returned {} peers)",
1399                candidate_peer_ids.len(),
1400                hex::encode(pool_address),
1401                Self::CANDIDATE_CLOSENESS_REQUIRED,
1402                network_peer_ids.len(),
1403            );
1404            return Err(Error::Payment(
1405                "Merkle candidate pool rejected: candidate pub_keys do not match the \
1406                 network's closest peers to the pool midpoint address. Pools must be \
1407                 collected from the pool-address close group, not fabricated off-network."
1408                    .into(),
1409            ));
1410        }
1411
1412        debug!(
1413            "Merkle closeness passed: {matched}/{} candidates matched the DHT's closest peers \
1414             for pool midpoint {}",
1415            candidate_peer_ids.len(),
1416            hex::encode(pool_address),
1417        );
1418        Ok(())
1419    }
1420
1421    #[allow(clippy::too_many_lines)]
1422    async fn verify_merkle_candidate_closeness_inner(
1423        &self,
1424        pool: &evmlib::merkle_payments::MerklePaymentCandidatePool,
1425    ) -> Result<()> {
1426        // Pre-check: catch malformed/hostile pools (duplicate candidate
1427        // PeerIds) before paying for the Kademlia lookup. Runs in unit
1428        // tests without a P2PNode too.
1429        let candidate_peer_ids = Self::derive_distinct_candidate_peer_ids(pool)?;
1430
1431        // Release the RwLock guard before any await to avoid holding it
1432        // across an iterative Kademlia lookup.
1433        let attached = self.p2p_node.read().as_ref().map(Arc::clone);
1434        let Some(p2p_node) = attached else {
1435            // Production must call attach_p2p_node at startup. Fail CLOSED
1436            // to avoid silently disabling the defence if a startup path
1437            // regresses and loses the attach call. Unit-test builds that
1438            // construct a PaymentVerifier directly without exercising merkle
1439            // verification are opted-in via `test-utils` to fall back to
1440            // fail-open.
1441            #[cfg(any(test, feature = "test-utils"))]
1442            {
1443                crate::logging::warn!(
1444                    "PaymentVerifier: no P2PNode attached; merkle pay-yourself \
1445                     defence SKIPPED (test build). Production startup MUST call \
1446                     PaymentVerifier::attach_p2p_node."
1447                );
1448                return Ok(());
1449            }
1450            #[cfg(not(any(test, feature = "test-utils")))]
1451            {
1452                crate::logging::error!(
1453                    "PaymentVerifier: no P2PNode attached; rejecting merkle \
1454                     payment. This is a node-startup bug — \
1455                     PaymentVerifier::attach_p2p_node must be called before \
1456                     any PUT handler runs."
1457                );
1458                return Err(Error::Payment(
1459                    "Merkle candidate pool rejected: verifier is not wired to \
1460                     the P2P layer; cannot verify candidate closeness."
1461                        .into(),
1462                ));
1463            }
1464        };
1465
1466        let pool_address = pool.midpoint_proof.address();
1467        // Match the client's over-query width. The client's
1468        // `get_merkle_candidate_pool` queries 2 × `CANDIDATES_PER_POOL` peers
1469        // and picks the 16 closest *valid responders* — so legitimate pools
1470        // routinely include peers from positions 17–32 of the network's true
1471        // ranking when the closer peers are slow or NAT-stuck. The storer
1472        // must look at the same window or it will reject honest pools with
1473        // no security benefit.
1474        //
1475        // `pool.candidate_nodes` is currently a fixed-size array of length
1476        // `CANDIDATES_PER_POOL` (= 16), so `.max(...)` always evaluates to
1477        // `CLOSENESS_LOOKUP_WIDTH` today. The compile-time
1478        // `const _: () = assert!(WIDTH >= CANDIDATES_PER_POOL)` in the test
1479        // module pins that invariant. The `.max(...)` form is belt-and-braces
1480        // for a hypothetical future protocol that grows pool size to a
1481        // `Vec`-typed candidate set: the storer would scale its lookup with
1482        // the pool rather than truncating, which would otherwise re-open the
1483        // K-too-small failure mode.
1484        let lookup_count = Self::closeness_lookup_count(pool.candidate_nodes.len());
1485        let network_lookup = p2p_node
1486            .dht_manager()
1487            .find_closest_nodes_network(&pool_address.0, lookup_count);
1488        let network_peers =
1489            match tokio::time::timeout(Self::CLOSENESS_LOOKUP_TIMEOUT, network_lookup).await {
1490                Ok(Ok(peers)) => peers,
1491                Ok(Err(e)) => {
1492                    debug!(
1493                        "Merkle closeness network-lookup failed for pool midpoint {}: {e}",
1494                        hex::encode(pool_address.0),
1495                    );
1496                    return Err(Error::Payment(
1497                        "Merkle candidate pool rejected: could not verify candidate \
1498                     closeness against the authoritative network view."
1499                            .into(),
1500                    ));
1501                }
1502                Err(_) => {
1503                    debug!(
1504                        "Merkle closeness network-lookup timeout ({:?}) for pool midpoint {}",
1505                        Self::CLOSENESS_LOOKUP_TIMEOUT,
1506                        hex::encode(pool_address.0),
1507                    );
1508                    return Err(Error::Payment(
1509                        "Merkle candidate pool rejected: authoritative network lookup \
1510                     timed out. Retry once the network lookup completes."
1511                            .into(),
1512                    ));
1513                }
1514            };
1515
1516        let network_peer_ids: Vec<PeerId> = network_peers.iter().map(|n| n.peer_id).collect();
1517        Self::check_closeness_match(&candidate_peer_ids, &network_peer_ids, &pool_address.0)
1518    }
1519
1520    /// Verify a merkle batch payment proof.
1521    ///
1522    /// This verification flow:
1523    /// 1. Deserialize the `MerklePaymentProof`
1524    /// 2. Check pool cache for previously verified pool hash
1525    /// 3. If not cached, query on-chain for payment info
1526    /// 4. Validate the proof against on-chain data
1527    /// 5. Cache the pool hash for subsequent chunk verifications in the same batch
1528    #[allow(clippy::too_many_lines)]
1529    async fn verify_merkle_payment(
1530        &self,
1531        xorname: &XorName,
1532        proof_bytes: &[u8],
1533        context: VerificationContext,
1534    ) -> Result<()> {
1535        if crate::logging::enabled!(crate::logging::Level::DEBUG) {
1536            debug!(
1537                "Verifying merkle payment for {} ({context:?})",
1538                hex::encode(xorname)
1539            );
1540        }
1541
1542        // Deserialize the merkle proof
1543        let merkle_proof = deserialize_merkle_proof(proof_bytes)
1544            .map_err(|e| Error::Payment(format!("Failed to deserialize merkle proof: {e}")))?;
1545
1546        // Verify the address in the proof matches the xorname being stored
1547        if merkle_proof.address.0 != *xorname {
1548            let proof_hex = hex::encode(merkle_proof.address.0);
1549            let store_hex = hex::encode(xorname);
1550            return Err(Error::Payment(format!(
1551                "Merkle proof address mismatch: proof is for {proof_hex}, but storing {store_hex}"
1552            )));
1553        }
1554
1555        let pool_hash = merkle_proof.winner_pool_hash();
1556
1557        // Run cheap local checks BEFORE expensive on-chain queries.
1558        // This prevents DoS via garbage proofs that trigger RPC lookups.
1559        for candidate in &merkle_proof.winner_pool.candidate_nodes {
1560            if !crate::payment::verify_merkle_candidate_signature(candidate) {
1561                return Err(Error::Payment(format!(
1562                    "Invalid ML-DSA-65 signature on merkle candidate node (reward: {})",
1563                    candidate.reward_address
1564                )));
1565            }
1566        }
1567
1568        // Pay-yourself defence: the candidate pub_keys must map to peers the
1569        // live DHT actually considers closest to the pool midpoint. Without
1570        // this, an attacker can point all 16 reward_address fields at a
1571        // self-owned wallet and drain their own payment. Every storing node
1572        // runs this check against the single `winner_pool` in the proof, so a
1573        // forged pool is rejected everywhere it lands. The pass cache and
1574        // single-flight keyed on pool_hash collapse the Kademlia lookup cost
1575        // within a batch and across concurrent PUTs for the same pool.
1576        //
1577        self.verify_merkle_candidate_closeness(&merkle_proof.winner_pool, pool_hash)
1578            .await?;
1579
1580        // Check pool cache first
1581        let cached_info = {
1582            let mut pool_cache = self.pool_cache.lock();
1583            pool_cache.get(&pool_hash).cloned()
1584        };
1585
1586        let payment_info = if let Some(info) = cached_info {
1587            debug!("Pool cache hit for hash {}", hex::encode(pool_hash));
1588            info
1589        } else {
1590            // Query on-chain for completed merkle payment
1591            let info =
1592                payment_vault::get_completed_merkle_payment(&self.config.evm.network, pool_hash)
1593                    .await
1594                    .map_err(|e| {
1595                        let pool_hex = hex::encode(pool_hash);
1596                        Error::Payment(format!(
1597                            "Failed to query merkle payment info for pool {pool_hex}: {e}"
1598                        ))
1599                    })?;
1600
1601            let paid_node_addresses: Vec<_> = info
1602                .paidNodeAddresses
1603                .iter()
1604                .map(|pna| (pna.rewardsAddress, usize::from(pna.poolIndex), pna.amount))
1605                .collect();
1606
1607            let on_chain_info = OnChainPaymentInfo {
1608                depth: info.depth,
1609                merkle_payment_timestamp: info.merklePaymentTimestamp,
1610                paid_node_addresses,
1611            };
1612
1613            // Cache the pool info for subsequent chunks in the same batch
1614            {
1615                let mut pool_cache = self.pool_cache.lock();
1616                pool_cache.put(pool_hash, on_chain_info.clone());
1617            }
1618
1619            debug!(
1620                "Queried on-chain merkle payment info for pool {}: depth={}, timestamp={}, paid_nodes={}",
1621                hex::encode(pool_hash),
1622                on_chain_info.depth,
1623                on_chain_info.merkle_payment_timestamp,
1624                on_chain_info.paid_node_addresses.len()
1625            );
1626
1627            on_chain_info
1628        };
1629
1630        // Verify timestamp consistency (signatures already checked above before RPC).
1631        for candidate in &merkle_proof.winner_pool.candidate_nodes {
1632            if candidate.merkle_payment_timestamp != payment_info.merkle_payment_timestamp {
1633                return Err(Error::Payment(format!(
1634                    "Candidate timestamp mismatch: expected {}, got {} (reward: {})",
1635                    payment_info.merkle_payment_timestamp,
1636                    candidate.merkle_payment_timestamp,
1637                    candidate.reward_address
1638                )));
1639            }
1640        }
1641
1642        // Get the root from the winner pool's midpoint proof
1643        let smart_contract_root = merkle_proof.winner_pool.midpoint_proof.root();
1644
1645        // Verify the cryptographic merkle proofs (address belongs to tree,
1646        // midpoint belongs to tree, roots match, timestamps valid).
1647        evmlib::merkle_payments::verify_merkle_proof(
1648            &merkle_proof.address,
1649            &merkle_proof.data_proof,
1650            &merkle_proof.winner_pool.midpoint_proof,
1651            payment_info.depth,
1652            smart_contract_root,
1653            payment_info.merkle_payment_timestamp,
1654        )
1655        .map_err(|e| {
1656            let xorname_hex = hex::encode(xorname);
1657            Error::Payment(format!(
1658                "Merkle proof verification failed for {xorname_hex}: {e}"
1659            ))
1660        })?;
1661
1662        // Verify paid node count matches depth
1663        let expected_depth = payment_info.depth as usize;
1664        let actual_paid = payment_info.paid_node_addresses.len();
1665        if actual_paid != expected_depth {
1666            return Err(Error::Payment(format!(
1667                "Wrong number of paid nodes: expected {expected_depth}, got {actual_paid}"
1668            )));
1669        }
1670
1671        // Compute expected per-node payment using the contract formula:
1672        // totalAmount = median16(candidate_prices) * (1 << depth)
1673        // amountPerNode = totalAmount / depth
1674        let expected_per_node = if payment_info.depth > 0 {
1675            let mut candidate_prices: Vec<Amount> = merkle_proof
1676                .winner_pool
1677                .candidate_nodes
1678                .iter()
1679                .map(|c| c.price)
1680                .collect();
1681            candidate_prices.sort_unstable(); // ascending
1682                                              // Upper median (index 8 of 16) — matches Solidity's median16 (k = 8)
1683            let median_price = *candidate_prices
1684                .get(candidate_prices.len() / 2)
1685                .ok_or_else(|| Error::Payment("empty candidate pool in merkle proof".into()))?;
1686            let shift = u32::from(payment_info.depth);
1687            let multiplier = 1u64
1688                .checked_shl(shift)
1689                .ok_or_else(|| Error::Payment("merkle proof depth too large".into()))?;
1690            let total_amount = median_price * Amount::from(multiplier);
1691            total_amount / Amount::from(u64::from(payment_info.depth))
1692        } else {
1693            Amount::ZERO
1694        };
1695
1696        // Verify paid node indices, addresses, and amounts against the candidate pool.
1697        //
1698        // Each paid node must:
1699        // 1. Have a valid index within the candidate pool
1700        // 2. Match the expected reward address at that index
1701        // 3. Have been paid at least the expected per-node amount from the
1702        //    contract formula: median16(prices) * 2^depth / depth
1703        //
1704        // Note: unlike single-node payments, merkle proofs are NOT bound to a
1705        // specific storing node. The contract pays `depth` random nodes from the
1706        // winner pool; the storing node is whichever close-group peer the client
1707        // routes the chunk to. There is no local-recipient check here because
1708        // any node that can verify the merkle proof is allowed to store the chunk.
1709        // Replay protection comes from the per-address proof binding (each proof
1710        // is for a specific XorName in the paid tree).
1711        for (addr, idx, paid_amount) in &payment_info.paid_node_addresses {
1712            let node = merkle_proof
1713                .winner_pool
1714                .candidate_nodes
1715                .get(*idx)
1716                .ok_or_else(|| {
1717                    Error::Payment(format!(
1718                        "Paid node index {idx} out of bounds for pool size {}",
1719                        merkle_proof.winner_pool.candidate_nodes.len()
1720                    ))
1721                })?;
1722            if node.reward_address != *addr {
1723                return Err(Error::Payment(format!(
1724                    "Paid node address mismatch at index {idx}: expected {addr}, got {}",
1725                    node.reward_address
1726                )));
1727            }
1728            if *paid_amount < expected_per_node {
1729                return Err(Error::Payment(format!(
1730                    "Underpayment for node at index {idx}: paid {paid_amount}, \
1731                     expected at least {expected_per_node} \
1732                     (median16 formula, depth={})",
1733                    payment_info.depth
1734                )));
1735            }
1736        }
1737
1738        if crate::logging::enabled!(crate::logging::Level::INFO) {
1739            info!(
1740                "Merkle payment verified for {} (pool: {})",
1741                hex::encode(xorname),
1742                hex::encode(pool_hash)
1743            );
1744        }
1745
1746        Ok(())
1747    }
1748}
1749
1750#[cfg(test)]
1751#[allow(clippy::expect_used, clippy::panic)]
1752mod tests {
1753    use super::*;
1754    use evmlib::merkle_payments::MerklePaymentCandidatePool;
1755    use evmlib::PaymentQuote;
1756    use saorsa_core::MlDsa65;
1757    use saorsa_pqc::pqc::types::MlDsaSecretKey;
1758    use saorsa_pqc::pqc::MlDsaOperations;
1759    use std::time::SystemTime;
1760
1761    /// Create a verifier for unit tests. EVM is always on, but tests can
1762    /// pre-populate the cache to bypass on-chain verification.
1763    fn create_test_verifier() -> PaymentVerifier {
1764        let config = PaymentVerifierConfig {
1765            evm: EvmVerifierConfig::default(),
1766            cache_capacity: 100,
1767            close_group_size: CLOSE_GROUP_SIZE,
1768            local_rewards_address: RewardsAddress::new([1u8; 20]),
1769        };
1770        PaymentVerifier::new(config)
1771    }
1772
1773    #[test]
1774    fn paid_quote_issuer_closeness_width_uses_k() {
1775        let issuer_closeness_width = PAID_QUOTE_ISSUER_CLOSENESS_WIDTH;
1776        let k_bucket_size = K_BUCKET_SIZE;
1777        let close_group_size = CLOSE_GROUP_SIZE;
1778
1779        assert_eq!(issuer_closeness_width, k_bucket_size);
1780        assert!(issuer_closeness_width > close_group_size);
1781    }
1782
1783    fn make_signed_quote(
1784        xorname: XorName,
1785        price: Amount,
1786        rewards_seed: u8,
1787    ) -> (evmlib::EncodedPeerId, PaymentQuote) {
1788        let ml_dsa = MlDsa65::new();
1789        let (public_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
1790        let pub_key_bytes = public_key.as_bytes().to_vec();
1791        let peer_id = encoded_peer_id_for_pub_key(&pub_key_bytes);
1792        let mut quote = PaymentQuote {
1793            content: xor_name::XorName(xorname),
1794            timestamp: SystemTime::now(),
1795            price,
1796            rewards_address: RewardsAddress::new([rewards_seed; 20]),
1797            pub_key: pub_key_bytes,
1798            signature: Vec::new(),
1799        };
1800        let secret_key = MlDsaSecretKey::from_bytes(secret_key.as_bytes()).expect("secret key");
1801        quote.signature = ml_dsa
1802            .sign(&secret_key, &quote.bytes_for_sig())
1803            .expect("sign quote")
1804            .as_bytes()
1805            .to_vec();
1806        (peer_id, quote)
1807    }
1808
1809    fn make_signed_legacy_bundle(
1810        xorname: XorName,
1811        prices: [Amount; CLOSE_GROUP_SIZE],
1812    ) -> Vec<(evmlib::EncodedPeerId, PaymentQuote)> {
1813        prices
1814            .into_iter()
1815            .enumerate()
1816            .map(|(index, price)| {
1817                let rewards_seed = u8::try_from(index + 1).expect("small test index");
1818                make_signed_quote(xorname, price, rewards_seed)
1819            })
1820            .collect()
1821    }
1822
1823    fn price_at_records(records: usize) -> Amount {
1824        crate::payment::pricing::calculate_price(records)
1825    }
1826
1827    fn unique_test_prices() -> [Amount; CLOSE_GROUP_SIZE] {
1828        [
1829            price_at_records(0),
1830            price_at_records(1),
1831            price_at_records(2),
1832            price_at_records(3),
1833            price_at_records(4),
1834            price_at_records(5),
1835            price_at_records(6),
1836        ]
1837    }
1838
1839    fn tied_median_test_prices() -> [Amount; CLOSE_GROUP_SIZE] {
1840        [
1841            price_at_records(0),
1842            price_at_records(1),
1843            price_at_records(2),
1844            price_at_records(3),
1845            price_at_records(3),
1846            price_at_records(4),
1847            price_at_records(5),
1848        ]
1849    }
1850
1851    fn median_test_candidates(
1852        peer_quotes: &[(evmlib::EncodedPeerId, PaymentQuote)],
1853    ) -> Vec<(evmlib::EncodedPeerId, PaymentQuote)> {
1854        let mut sorted_quotes: Vec<_> = peer_quotes.iter().collect();
1855        sorted_quotes.sort_by_key(|(_, quote)| quote.price);
1856        let median_index = median_quote_index(sorted_quotes.len());
1857        let median_price = sorted_quotes
1858            .get(median_index)
1859            .expect("median quote")
1860            .1
1861            .price;
1862
1863        sorted_quotes
1864            .into_iter()
1865            .filter(|(_, quote)| quote.price == median_price)
1866            .map(|(peer_id, quote)| (peer_id.clone(), quote.clone()))
1867            .collect()
1868    }
1869
1870    fn expected_median_payment(peer_quotes: &[(evmlib::EncodedPeerId, PaymentQuote)]) -> Amount {
1871        let median_price = median_test_candidates(peer_quotes)
1872            .first()
1873            .expect("median candidate")
1874            .1
1875            .price;
1876        median_price * Amount::from(PAID_QUOTE_PAYMENT_MULTIPLIER)
1877    }
1878
1879    fn mark_k_closest_paid_candidates(
1880        verifier: &PaymentVerifier,
1881        peer_quotes: &[(evmlib::EncodedPeerId, PaymentQuote)],
1882    ) {
1883        let k_closest_peers = median_test_candidates(peer_quotes)
1884            .iter()
1885            .map(|(peer_id, _)| *peer_id.as_bytes())
1886            .collect();
1887        verifier.set_paid_quote_k_closest_for_tests(k_closest_peers);
1888    }
1889
1890    fn mark_candidate_paid(verifier: &PaymentVerifier, quote: &PaymentQuote, amount: Amount) {
1891        verifier.set_completed_payment_for_tests(quote.hash(), amount);
1892    }
1893
1894    fn mark_all_median_candidates_unpaid(
1895        verifier: &PaymentVerifier,
1896        peer_quotes: &[(evmlib::EncodedPeerId, PaymentQuote)],
1897    ) {
1898        for (_, quote) in median_test_candidates(peer_quotes) {
1899            mark_candidate_paid(verifier, &quote, Amount::ZERO);
1900        }
1901    }
1902
1903    #[test]
1904    fn test_payment_required_for_new_data() {
1905        let verifier = create_test_verifier();
1906        let xorname = [1u8; 32];
1907
1908        // All uncached data requires payment
1909        let status = verifier.check_payment_required(&xorname, VerificationContext::ClientPut);
1910        assert_eq!(status, PaymentStatus::PaymentRequired);
1911    }
1912
1913    #[test]
1914    fn test_cache_hit() {
1915        let verifier = create_test_verifier();
1916        let xorname = [1u8; 32];
1917
1918        // Manually add to cache
1919        verifier.cache.insert(xorname);
1920
1921        // Should return CachedAsVerified
1922        let status = verifier.check_payment_required(&xorname, VerificationContext::ClientPut);
1923        assert_eq!(status, PaymentStatus::CachedAsVerified);
1924    }
1925
1926    #[tokio::test]
1927    async fn test_verify_payment_without_proof_rejected() {
1928        let verifier = create_test_verifier();
1929        let xorname = [1u8; 32];
1930
1931        // No proof provided => should return an error (EVM is always on)
1932        let result = verifier
1933            .verify_payment(&xorname, None, VerificationContext::ClientPut)
1934            .await;
1935        assert!(
1936            result.is_err(),
1937            "Expected Err without proof, got: {result:?}"
1938        );
1939    }
1940
1941    #[tokio::test]
1942    async fn test_verify_payment_cached() {
1943        let verifier = create_test_verifier();
1944        let xorname = [1u8; 32];
1945
1946        // Add to cache — simulates previously-paid data
1947        verifier.cache.insert(xorname);
1948
1949        // Should succeed without payment (cached)
1950        let result = verifier
1951            .verify_payment(&xorname, None, VerificationContext::ClientPut)
1952            .await;
1953        assert!(result.is_ok());
1954        assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified);
1955    }
1956
1957    #[tokio::test]
1958    async fn test_paid_list_cache_entry_does_not_satisfy_client_put() {
1959        let verifier = create_test_verifier();
1960        let xorname = [0xB8u8; 32];
1961        verifier.cache.insert_paid_list_verified(xorname);
1962
1963        assert_eq!(
1964            verifier.check_payment_required(&xorname, VerificationContext::PaidListAdmission),
1965            PaymentStatus::CachedAsVerified,
1966            "paid-list lookups must hit a paid-list-verified entry"
1967        );
1968        assert_eq!(
1969            verifier.check_payment_required(&xorname, VerificationContext::ClientPut),
1970            PaymentStatus::PaymentRequired,
1971            "client PUT must not fast-path on a paid-list-verified entry"
1972        );
1973
1974        let err = verifier
1975            .verify_payment(&xorname, None, VerificationContext::ClientPut)
1976            .await
1977            .expect_err("proof-less client PUT must not ride the paid-list entry");
1978        assert!(
1979            format!("{err}").contains("Payment required"),
1980            "client PUT must still demand payment: {err}"
1981        );
1982    }
1983
1984    #[test]
1985    fn test_payment_status_can_store() {
1986        assert!(PaymentStatus::CachedAsVerified.can_store());
1987        assert!(PaymentStatus::PaymentVerified.can_store());
1988        assert!(!PaymentStatus::PaymentRequired.can_store());
1989    }
1990
1991    #[test]
1992    fn test_payment_status_is_cached() {
1993        assert!(PaymentStatus::CachedAsVerified.is_cached());
1994        assert!(!PaymentStatus::PaymentVerified.is_cached());
1995        assert!(!PaymentStatus::PaymentRequired.is_cached());
1996    }
1997
1998    #[tokio::test]
1999    async fn test_cache_preload_bypasses_evm() {
2000        let verifier = create_test_verifier();
2001        let xorname = [42u8; 32];
2002
2003        // Not yet cached — should require payment
2004        assert_eq!(
2005            verifier.check_payment_required(&xorname, VerificationContext::ClientPut),
2006            PaymentStatus::PaymentRequired
2007        );
2008
2009        // Pre-populate cache (simulates a previous successful payment)
2010        verifier.cache.insert(xorname);
2011
2012        // Now the xorname should be cached
2013        assert_eq!(
2014            verifier.check_payment_required(&xorname, VerificationContext::ClientPut),
2015            PaymentStatus::CachedAsVerified
2016        );
2017    }
2018
2019    #[tokio::test]
2020    async fn test_proof_too_small() {
2021        let verifier = create_test_verifier();
2022        let xorname = [1u8; 32];
2023
2024        // Proof smaller than MIN_PAYMENT_PROOF_SIZE_BYTES
2025        let small_proof = vec![0u8; MIN_PAYMENT_PROOF_SIZE_BYTES - 1];
2026        let result = verifier
2027            .verify_payment(&xorname, Some(&small_proof), VerificationContext::ClientPut)
2028            .await;
2029        assert!(result.is_err());
2030        let err_msg = format!("{}", result.expect_err("should fail"));
2031        assert!(
2032            err_msg.contains("too small"),
2033            "Error should mention 'too small': {err_msg}"
2034        );
2035    }
2036
2037    #[tokio::test]
2038    async fn test_proof_too_large() {
2039        let verifier = create_test_verifier();
2040        let xorname = [2u8; 32];
2041
2042        // Proof larger than MAX_PAYMENT_PROOF_SIZE_BYTES
2043        let large_proof = vec![0u8; MAX_PAYMENT_PROOF_SIZE_BYTES + 1];
2044        let result = verifier
2045            .verify_payment(&xorname, Some(&large_proof), VerificationContext::ClientPut)
2046            .await;
2047        assert!(result.is_err());
2048        let err_msg = format!("{}", result.expect_err("should fail"));
2049        assert!(
2050            err_msg.contains("too large"),
2051            "Error should mention 'too large': {err_msg}"
2052        );
2053    }
2054
2055    #[tokio::test]
2056    async fn test_proof_at_min_boundary_unknown_tag() {
2057        let verifier = create_test_verifier();
2058        let xorname = [3u8; 32];
2059
2060        // Exactly MIN_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected
2061        let boundary_proof = vec![0xFFu8; MIN_PAYMENT_PROOF_SIZE_BYTES];
2062        let result = verifier
2063            .verify_payment(
2064                &xorname,
2065                Some(&boundary_proof),
2066                VerificationContext::ClientPut,
2067            )
2068            .await;
2069        assert!(result.is_err());
2070        let err_msg = format!("{}", result.expect_err("should fail"));
2071        assert!(
2072            err_msg.contains("Unknown payment proof type tag"),
2073            "Error should mention unknown tag: {err_msg}"
2074        );
2075    }
2076
2077    #[tokio::test]
2078    async fn test_proof_at_max_boundary_unknown_tag() {
2079        let verifier = create_test_verifier();
2080        let xorname = [4u8; 32];
2081
2082        // Exactly MAX_PAYMENT_PROOF_SIZE_BYTES with unknown tag — rejected
2083        let boundary_proof = vec![0xFFu8; MAX_PAYMENT_PROOF_SIZE_BYTES];
2084        let result = verifier
2085            .verify_payment(
2086                &xorname,
2087                Some(&boundary_proof),
2088                VerificationContext::ClientPut,
2089            )
2090            .await;
2091        assert!(result.is_err());
2092        let err_msg = format!("{}", result.expect_err("should fail"));
2093        assert!(
2094            err_msg.contains("Unknown payment proof type tag"),
2095            "Error should mention unknown tag: {err_msg}"
2096        );
2097    }
2098
2099    #[tokio::test]
2100    async fn test_malformed_single_node_proof() {
2101        let verifier = create_test_verifier();
2102        let xorname = [5u8; 32];
2103
2104        // Valid tag (0x01) but garbage payload — should fail deserialization
2105        let mut garbage = vec![crate::ant_protocol::PROOF_TAG_SINGLE_NODE];
2106        garbage.extend_from_slice(&[0xAB; 63]);
2107        let result = verifier
2108            .verify_payment(&xorname, Some(&garbage), VerificationContext::ClientPut)
2109            .await;
2110        assert!(result.is_err());
2111        let err_msg = format!("{}", result.expect_err("should fail"));
2112        assert!(
2113            err_msg.contains("deserialize") || err_msg.contains("Failed"),
2114            "Error should mention deserialization failure: {err_msg}"
2115        );
2116    }
2117
2118    #[tokio::test]
2119    async fn test_legacy_paid_median_full_path_accepted() {
2120        let verifier = create_test_verifier();
2121        verifier.set_records_stored_for_tests(0);
2122        let xorname = [0xA1u8; 32];
2123        let peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices());
2124        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2125        let expected_amount = expected_median_payment(&peer_quotes);
2126        let paid_quote = median_test_candidates(&peer_quotes)
2127            .first()
2128            .expect("median candidate")
2129            .1
2130            .clone();
2131        mark_candidate_paid(&verifier, &paid_quote, expected_amount);
2132
2133        let proof_bytes = serialize_proof(peer_quotes);
2134        let result = verifier
2135            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2136            .await;
2137
2138        assert_eq!(
2139            result.expect("paid median should verify"),
2140            PaymentStatus::PaymentVerified
2141        );
2142    }
2143
2144    #[tokio::test]
2145    async fn test_legacy_single_quote_proof_accepted() {
2146        let verifier = create_test_verifier();
2147        verifier.set_records_stored_for_tests(0);
2148        let xorname = [0xB1u8; 32];
2149        let (peer_id, quote) = make_signed_quote(xorname, price_at_records(0), 1);
2150        let peer_quotes = vec![(peer_id, quote.clone())];
2151        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2152        mark_candidate_paid(&verifier, &quote, expected_median_payment(&peer_quotes));
2153
2154        let proof_bytes = serialize_proof(peer_quotes);
2155        let result = verifier
2156            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2157            .await;
2158
2159        assert_eq!(
2160            result.expect("single paid quote should verify"),
2161            PaymentStatus::PaymentVerified
2162        );
2163    }
2164
2165    #[tokio::test]
2166    async fn test_legacy_single_quote_proof_requires_three_x_payment() {
2167        let verifier = create_test_verifier();
2168        verifier.set_records_stored_for_tests(0);
2169        let xorname = [0xB2u8; 32];
2170        let (peer_id, quote) = make_signed_quote(xorname, price_at_records(0), 1);
2171        let peer_quotes = vec![(peer_id, quote.clone())];
2172        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2173        mark_candidate_paid(&verifier, &quote, quote.price);
2174
2175        let proof_bytes = serialize_proof(peer_quotes);
2176        let err = verifier
2177            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2178            .await
2179            .expect_err("single quote paid less than 3x should be rejected");
2180
2181        assert!(
2182            format!("{err}").contains("not paid enough"),
2183            "Error should mention underpayment: {err}"
2184        );
2185    }
2186
2187    #[tokio::test]
2188    async fn test_legacy_too_many_quotes_rejected() {
2189        let verifier = create_test_verifier();
2190        verifier.set_records_stored_for_tests(0);
2191        let xorname = [0xB3u8; 32];
2192        let mut peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices());
2193        peer_quotes.push(make_signed_quote(xorname, price_at_records(7), 8));
2194
2195        let proof_bytes = serialize_proof(peer_quotes);
2196        let err = verifier
2197            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2198            .await
2199            .expect_err("proof with more than close-group quotes should be rejected");
2200
2201        assert!(
2202            format!("{err}").contains("at most"),
2203            "Error should mention max quote count: {err}"
2204        );
2205    }
2206
2207    #[tokio::test]
2208    async fn test_legacy_structural_majority_price_at_median_accepted() {
2209        let verifier = create_test_verifier();
2210        verifier.set_records_stored_for_tests(1000);
2211        let xorname = [0xA2u8; 32];
2212        let peer_quotes = make_signed_legacy_bundle(
2213            xorname,
2214            [
2215                crate::payment::pricing::calculate_price(0),
2216                crate::payment::pricing::calculate_price(100),
2217                crate::payment::pricing::calculate_price(500),
2218                crate::payment::pricing::calculate_price(1000),
2219                crate::payment::pricing::calculate_price(2000),
2220                crate::payment::pricing::calculate_price(4000),
2221                crate::payment::pricing::calculate_price(6000),
2222            ],
2223        );
2224        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2225        let expected_amount = expected_median_payment(&peer_quotes);
2226        let paid_quote = median_test_candidates(&peer_quotes)
2227            .first()
2228            .expect("median candidate")
2229            .1
2230            .clone();
2231        mark_candidate_paid(&verifier, &paid_quote, expected_amount);
2232
2233        let proof_bytes = serialize_proof(peer_quotes);
2234        let result = verifier
2235            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2236            .await;
2237
2238        assert_eq!(
2239            result.expect("median-priced verifier should accept"),
2240            PaymentStatus::PaymentVerified
2241        );
2242    }
2243
2244    #[tokio::test]
2245    async fn test_legacy_above_median_verifier_rejected_by_floor() {
2246        let verifier = create_test_verifier();
2247        verifier.set_records_stored_for_tests(2000);
2248        let xorname = [0xA3u8; 32];
2249        let peer_quotes = make_signed_legacy_bundle(
2250            xorname,
2251            [
2252                crate::payment::pricing::calculate_price(0),
2253                crate::payment::pricing::calculate_price(100),
2254                crate::payment::pricing::calculate_price(500),
2255                crate::payment::pricing::calculate_price(1000),
2256                crate::payment::pricing::calculate_price(2000),
2257                crate::payment::pricing::calculate_price(4000),
2258                crate::payment::pricing::calculate_price(6000),
2259            ],
2260        );
2261        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2262        let expected_amount = expected_median_payment(&peer_quotes);
2263        let paid_quote = median_test_candidates(&peer_quotes)
2264            .first()
2265            .expect("median candidate")
2266            .1
2267            .clone();
2268        mark_candidate_paid(&verifier, &paid_quote, expected_amount);
2269
2270        let proof_bytes = serialize_proof(peer_quotes);
2271        let err = verifier
2272            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2273            .await
2274            .expect_err("above-median verifier should reject the client PUT");
2275
2276        assert!(
2277            format!("{err}").contains("below local floor"),
2278            "Error should mention paid-quote floor: {err}"
2279        );
2280    }
2281
2282    #[tokio::test]
2283    async fn test_legacy_paid_median_issuer_k_closest_rejection() {
2284        let verifier = create_test_verifier();
2285        verifier.set_records_stored_for_tests(0);
2286        verifier.set_paid_quote_k_closest_for_tests(vec![rand::random()]);
2287        let xorname = [0xA4u8; 32];
2288        let peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices());
2289        let expected_amount = expected_median_payment(&peer_quotes);
2290        let paid_quote = median_test_candidates(&peer_quotes)
2291            .first()
2292            .expect("median candidate")
2293            .1
2294            .clone();
2295        mark_candidate_paid(&verifier, &paid_quote, expected_amount);
2296
2297        let proof_bytes = serialize_proof(peer_quotes);
2298        let err = verifier
2299            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2300            .await
2301            .expect_err("out-of-K paid issuer should be rejected");
2302
2303        assert!(
2304            format!("{err}").contains("not among this node's local"),
2305            "Error should mention local K-closest peers: {err}"
2306        );
2307    }
2308
2309    #[tokio::test]
2310    async fn test_legacy_paid_median_floor_rejection() {
2311        let verifier = create_test_verifier();
2312        verifier.set_records_stored_for_tests(6000);
2313        let xorname = [0xA5u8; 32];
2314        let peer_quotes = make_signed_legacy_bundle(
2315            xorname,
2316            [
2317                crate::payment::pricing::calculate_price(0),
2318                crate::payment::pricing::calculate_price(0),
2319                crate::payment::pricing::calculate_price(0),
2320                crate::payment::pricing::calculate_price(0),
2321                crate::payment::pricing::calculate_price(0),
2322                crate::payment::pricing::calculate_price(0),
2323                crate::payment::pricing::calculate_price(0),
2324            ],
2325        );
2326        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2327        let expected_amount = expected_median_payment(&peer_quotes);
2328        let paid_quote = median_test_candidates(&peer_quotes)
2329            .first()
2330            .expect("median candidate")
2331            .1
2332            .clone();
2333        mark_candidate_paid(&verifier, &paid_quote, expected_amount);
2334
2335        let proof_bytes = serialize_proof(peer_quotes);
2336        let err = verifier
2337            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2338            .await
2339            .expect_err("cheap paid median should be rejected");
2340
2341        assert!(
2342            format!("{err}").contains("below local floor"),
2343            "Error should mention local floor: {err}"
2344        );
2345    }
2346
2347    #[tokio::test]
2348    async fn test_legacy_zero_price_median_rejected() {
2349        let verifier = create_test_verifier();
2350        verifier.set_records_stored_for_tests(0);
2351        let xorname = [0xA6u8; 32];
2352        let peer_quotes = make_signed_legacy_bundle(
2353            xorname,
2354            [
2355                Amount::ZERO,
2356                Amount::ZERO,
2357                Amount::ZERO,
2358                Amount::ZERO,
2359                Amount::from(1u64),
2360                Amount::from(2u64),
2361                Amount::from(3u64),
2362            ],
2363        );
2364
2365        let proof_bytes = serialize_proof(peer_quotes);
2366        let err = verifier
2367            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2368            .await
2369            .expect_err("zero median must be rejected");
2370
2371        assert!(
2372            format!("{err}").contains("zero price"),
2373            "Error should mention zero price: {err}"
2374        );
2375    }
2376
2377    #[tokio::test]
2378    async fn test_legacy_paid_quote_content_mismatch_rejected() {
2379        let verifier = create_test_verifier();
2380        verifier.set_records_stored_for_tests(0);
2381        let xorname = [0xA7u8; 32];
2382        let mut peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices());
2383        let median_index = median_quote_index(peer_quotes.len());
2384        peer_quotes[median_index].1.content = xor_name::XorName([0xE7u8; 32]);
2385        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2386
2387        let proof_bytes = serialize_proof(peer_quotes);
2388        let err = verifier
2389            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2390            .await
2391            .expect_err("paid quote content mismatch should be rejected");
2392
2393        assert!(
2394            format!("{err}").contains("content address mismatch"),
2395            "Error should mention content mismatch: {err}"
2396        );
2397    }
2398
2399    #[tokio::test]
2400    async fn test_legacy_unpaid_quote_content_mismatch_accepted() {
2401        let verifier = create_test_verifier();
2402        verifier.set_records_stored_for_tests(0);
2403        let xorname = [0xA8u8; 32];
2404        let mut peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices());
2405        peer_quotes[0].1.content = xor_name::XorName([0xE8u8; 32]);
2406        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2407        let expected_amount = expected_median_payment(&peer_quotes);
2408        let paid_quote = median_test_candidates(&peer_quotes)
2409            .first()
2410            .expect("median candidate")
2411            .1
2412            .clone();
2413        mark_candidate_paid(&verifier, &paid_quote, expected_amount);
2414
2415        let proof_bytes = serialize_proof(peer_quotes);
2416        let result = verifier
2417            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2418            .await;
2419
2420        assert_eq!(
2421            result.expect("unpaid content mismatch should be ignored"),
2422            PaymentStatus::PaymentVerified
2423        );
2424    }
2425
2426    #[tokio::test]
2427    async fn test_legacy_paid_quote_bad_signature_rejected() {
2428        let verifier = create_test_verifier();
2429        verifier.set_records_stored_for_tests(0);
2430        let xorname = [0xA9u8; 32];
2431        let mut peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices());
2432        let median_index = median_quote_index(peer_quotes.len());
2433        peer_quotes[median_index].1.signature.push(0xFF);
2434        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2435        let expected_amount = expected_median_payment(&peer_quotes);
2436        let paid_quote = median_test_candidates(&peer_quotes)
2437            .first()
2438            .expect("median candidate")
2439            .1
2440            .clone();
2441        mark_candidate_paid(&verifier, &paid_quote, expected_amount);
2442
2443        let proof_bytes = serialize_proof(peer_quotes);
2444        let err = verifier
2445            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2446            .await
2447            .expect_err("paid bad signature should be rejected");
2448
2449        assert!(
2450            format!("{err}").contains("signature verification failed"),
2451            "Error should mention signature failure: {err}"
2452        );
2453    }
2454
2455    #[tokio::test]
2456    async fn test_legacy_unpaid_quote_bad_signature_accepted() {
2457        let verifier = create_test_verifier();
2458        verifier.set_records_stored_for_tests(0);
2459        let xorname = [0xAAu8; 32];
2460        let mut peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices());
2461        peer_quotes[0].1.signature.push(0xFF);
2462        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2463        let expected_amount = expected_median_payment(&peer_quotes);
2464        let paid_quote = median_test_candidates(&peer_quotes)
2465            .first()
2466            .expect("median candidate")
2467            .1
2468            .clone();
2469        mark_candidate_paid(&verifier, &paid_quote, expected_amount);
2470
2471        let proof_bytes = serialize_proof(peer_quotes);
2472        let result = verifier
2473            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2474            .await;
2475
2476        assert_eq!(
2477            result.expect("unpaid bad signature should be ignored"),
2478            PaymentStatus::PaymentVerified
2479        );
2480    }
2481
2482    #[tokio::test]
2483    async fn test_legacy_unpaid_peer_binding_mismatch_accepted() {
2484        let verifier = create_test_verifier();
2485        verifier.set_records_stored_for_tests(0);
2486        let xorname = [0xABu8; 32];
2487        let mut peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices());
2488        peer_quotes[0].0 = evmlib::EncodedPeerId::new(rand::random());
2489        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2490        let expected_amount = expected_median_payment(&peer_quotes);
2491        let paid_quote = median_test_candidates(&peer_quotes)
2492            .first()
2493            .expect("median candidate")
2494            .1
2495            .clone();
2496        mark_candidate_paid(&verifier, &paid_quote, expected_amount);
2497
2498        let proof_bytes = serialize_proof(peer_quotes);
2499        let result = verifier
2500            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2501            .await;
2502
2503        assert_eq!(
2504            result.expect("unpaid peer binding mismatch should be ignored"),
2505            PaymentStatus::PaymentVerified
2506        );
2507    }
2508
2509    #[tokio::test]
2510    async fn test_legacy_median_tie_accepts_paid_candidate() {
2511        let verifier = create_test_verifier();
2512        verifier.set_records_stored_for_tests(0);
2513        let xorname = [0xACu8; 32];
2514        let peer_quotes = make_signed_legacy_bundle(xorname, tied_median_test_prices());
2515        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2516        mark_all_median_candidates_unpaid(&verifier, &peer_quotes);
2517        let expected_amount = expected_median_payment(&peer_quotes);
2518        let paid_quote = median_test_candidates(&peer_quotes)
2519            .get(1)
2520            .expect("second tied median candidate")
2521            .1
2522            .clone();
2523        mark_candidate_paid(&verifier, &paid_quote, expected_amount);
2524
2525        let proof_bytes = serialize_proof(peer_quotes);
2526        let result = verifier
2527            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2528            .await;
2529
2530        assert_eq!(
2531            result.expect("one paid tied median candidate should verify"),
2532            PaymentStatus::PaymentVerified
2533        );
2534    }
2535
2536    #[tokio::test]
2537    async fn test_legacy_paid_list_admission_enforces_issuer_k_closest() {
2538        let verifier = create_test_verifier();
2539        verifier.set_records_stored_for_tests(0);
2540        verifier.set_paid_quote_k_closest_for_tests(Vec::new());
2541        let xorname = [0xB5u8; 32];
2542        let peer_quotes = make_signed_legacy_bundle(xorname, unique_test_prices());
2543        let expected_amount = expected_median_payment(&peer_quotes);
2544        let paid_quote = median_test_candidates(&peer_quotes)
2545            .first()
2546            .expect("median candidate")
2547            .1
2548            .clone();
2549        mark_candidate_paid(&verifier, &paid_quote, expected_amount);
2550
2551        let proof_bytes = serialize_proof(peer_quotes);
2552        let err = verifier
2553            .verify_payment(
2554                &xorname,
2555                Some(&proof_bytes),
2556                VerificationContext::PaidListAdmission,
2557            )
2558            .await
2559            .expect_err("paid-list admission must enforce the paid issuer K-closest check");
2560
2561        assert!(
2562            format!("{err}").contains("not among this node's local"),
2563            "Error should mention local K-closest peers: {err}"
2564        );
2565    }
2566
2567    #[tokio::test]
2568    async fn test_legacy_paid_list_admission_enforces_full_bundle_floor() {
2569        let verifier = create_test_verifier();
2570        verifier.set_records_stored_for_tests(6000);
2571        let xorname = [0xB6u8; 32];
2572        let peer_quotes = make_signed_legacy_bundle(
2573            xorname,
2574            [
2575                crate::payment::pricing::calculate_price(0),
2576                crate::payment::pricing::calculate_price(0),
2577                crate::payment::pricing::calculate_price(0),
2578                crate::payment::pricing::calculate_price(0),
2579                crate::payment::pricing::calculate_price(0),
2580                crate::payment::pricing::calculate_price(0),
2581                crate::payment::pricing::calculate_price(0),
2582            ],
2583        );
2584        mark_k_closest_paid_candidates(&verifier, &peer_quotes);
2585        let expected_amount = expected_median_payment(&peer_quotes);
2586        let paid_quote = median_test_candidates(&peer_quotes)
2587            .first()
2588            .expect("median candidate")
2589            .1
2590            .clone();
2591        mark_candidate_paid(&verifier, &paid_quote, expected_amount);
2592
2593        let proof_bytes = serialize_proof(peer_quotes);
2594        let err = verifier
2595            .verify_payment(
2596                &xorname,
2597                Some(&proof_bytes),
2598                VerificationContext::PaidListAdmission,
2599            )
2600            .await
2601            .expect_err("paid-list admission must enforce the floor for full bundles");
2602
2603        assert!(
2604            format!("{err}").contains("below local floor"),
2605            "Error should mention the local price floor: {err}"
2606        );
2607    }
2608
2609    #[test]
2610    fn test_cache_len_getter() {
2611        let verifier = create_test_verifier();
2612        assert_eq!(verifier.cache_len(), 0);
2613
2614        verifier.cache.insert([10u8; 32]);
2615        assert_eq!(verifier.cache_len(), 1);
2616
2617        verifier.cache.insert([20u8; 32]);
2618        assert_eq!(verifier.cache_len(), 2);
2619    }
2620
2621    #[test]
2622    fn test_cache_stats_after_operations() {
2623        let verifier = create_test_verifier();
2624        let xorname = [7u8; 32];
2625
2626        // Miss
2627        verifier.check_payment_required(&xorname, VerificationContext::ClientPut);
2628        let stats = verifier.cache_stats();
2629        assert_eq!(stats.misses, 1);
2630        assert_eq!(stats.hits, 0);
2631
2632        // Insert and hit
2633        verifier.cache.insert(xorname);
2634        verifier.check_payment_required(&xorname, VerificationContext::ClientPut);
2635        let stats = verifier.cache_stats();
2636        assert_eq!(stats.hits, 1);
2637        assert_eq!(stats.misses, 1);
2638        assert_eq!(stats.additions, 1);
2639    }
2640
2641    #[tokio::test]
2642    async fn test_concurrent_cache_lookups() {
2643        let verifier = std::sync::Arc::new(create_test_verifier());
2644
2645        // Pre-populate cache for all 10 xornames
2646        for i in 0..10u8 {
2647            verifier.cache.insert([i; 32]);
2648        }
2649
2650        let mut handles = Vec::new();
2651        for i in 0..10u8 {
2652            let v = verifier.clone();
2653            handles.push(tokio::spawn(async move {
2654                let xorname = [i; 32];
2655                v.verify_payment(&xorname, None, VerificationContext::ClientPut)
2656                    .await
2657            }));
2658        }
2659
2660        for handle in handles {
2661            let result = handle.await.expect("task panicked");
2662            assert!(result.is_ok());
2663            assert_eq!(result.expect("cached"), PaymentStatus::CachedAsVerified);
2664        }
2665
2666        assert_eq!(verifier.cache_len(), 10);
2667    }
2668
2669    #[test]
2670    fn test_default_evm_config() {
2671        let _config = EvmVerifierConfig::default();
2672        // EVM is always on — default network is ArbitrumOne
2673    }
2674
2675    #[test]
2676    fn test_real_ml_dsa_proof_size_within_limits() {
2677        use crate::payment::metrics::QuotingMetricsTracker;
2678        use crate::payment::proof::PaymentProof;
2679        use crate::payment::quote::{QuoteGenerator, XorName};
2680        use alloy::primitives::FixedBytes;
2681        use evmlib::{EncodedPeerId, RewardsAddress};
2682        use saorsa_core::MlDsa65;
2683        use saorsa_pqc::pqc::types::MlDsaSecretKey;
2684        use saorsa_pqc::pqc::MlDsaOperations;
2685
2686        let ml_dsa = MlDsa65::new();
2687        let mut peer_quotes = Vec::new();
2688
2689        for i in 0..5u8 {
2690            let (public_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
2691
2692            let rewards_address = RewardsAddress::new([i; 20]);
2693            let metrics_tracker = QuotingMetricsTracker::new(0);
2694            let mut generator = QuoteGenerator::new(rewards_address, metrics_tracker);
2695
2696            let pub_key_bytes = public_key.as_bytes().to_vec();
2697            let sk_bytes = secret_key.as_bytes().to_vec();
2698            generator.set_signer(pub_key_bytes, move |msg| {
2699                let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("sk parse");
2700                let ml_dsa = MlDsa65::new();
2701                ml_dsa.sign(&sk, msg).expect("sign").as_bytes().to_vec()
2702            });
2703
2704            let content: XorName = [i; 32];
2705            let quote = generator.create_quote(content, 4096, 0).expect("quote");
2706
2707            peer_quotes.push((EncodedPeerId::new(rand::random()), quote));
2708        }
2709
2710        let proof = PaymentProof {
2711            proof_of_payment: ProofOfPayment { peer_quotes },
2712            tx_hashes: vec![FixedBytes::from([0xABu8; 32])],
2713        };
2714
2715        let proof_bytes =
2716            crate::payment::proof::serialize_single_node_proof(&proof).expect("serialize");
2717
2718        // 7 ML-DSA-65 quotes with ~1952-byte pub keys and ~3309-byte signatures
2719        // should produce a proof in the 30-80 KB range
2720        assert!(
2721            proof_bytes.len() > 20_000,
2722            "Real 7-quote ML-DSA proof should be > 20 KB, got {} bytes",
2723            proof_bytes.len()
2724        );
2725        assert!(
2726            proof_bytes.len() < MAX_PAYMENT_PROOF_SIZE_BYTES,
2727            "Real 7-quote ML-DSA proof ({} bytes) should fit within {} byte limit",
2728            proof_bytes.len(),
2729            MAX_PAYMENT_PROOF_SIZE_BYTES
2730        );
2731    }
2732
2733    #[tokio::test]
2734    async fn test_content_address_mismatch_rejected() {
2735        use crate::payment::proof::{serialize_single_node_proof, PaymentProof};
2736        use evmlib::{EncodedPeerId, PaymentQuote, RewardsAddress};
2737        use std::time::SystemTime;
2738
2739        let verifier = create_test_verifier();
2740
2741        // The xorname we're trying to store
2742        let target_xorname = [0xAAu8; 32];
2743
2744        // Create a quote for a DIFFERENT xorname
2745        let wrong_xorname = [0xBBu8; 32];
2746        let quote = PaymentQuote {
2747            content: xor_name::XorName(wrong_xorname),
2748            timestamp: SystemTime::now(),
2749            price: Amount::from(1u64),
2750            rewards_address: RewardsAddress::new([1u8; 20]),
2751            pub_key: vec![0u8; 64],
2752            signature: vec![0u8; 64],
2753        };
2754
2755        // Build CLOSE_GROUP_SIZE quotes with distinct peer IDs
2756        let mut peer_quotes = Vec::new();
2757        for _ in 0..CLOSE_GROUP_SIZE {
2758            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2759        }
2760
2761        let proof = PaymentProof {
2762            proof_of_payment: ProofOfPayment { peer_quotes },
2763            tx_hashes: vec![],
2764        };
2765
2766        let proof_bytes = serialize_single_node_proof(&proof).expect("serialize proof");
2767
2768        let result = verifier
2769            .verify_payment(
2770                &target_xorname,
2771                Some(&proof_bytes),
2772                VerificationContext::ClientPut,
2773            )
2774            .await;
2775
2776        assert!(result.is_err(), "Should reject mismatched content address");
2777        let err_msg = format!("{}", result.expect_err("should be error"));
2778        assert!(
2779            err_msg.contains("content address mismatch"),
2780            "Error should mention 'content address mismatch': {err_msg}"
2781        );
2782    }
2783
2784    /// Helper: create a fake quote with the given xorname and timestamp.
2785    fn make_fake_quote(
2786        xorname: [u8; 32],
2787        timestamp: SystemTime,
2788        rewards_address: RewardsAddress,
2789    ) -> evmlib::PaymentQuote {
2790        use evmlib::PaymentQuote;
2791
2792        PaymentQuote {
2793            content: xor_name::XorName(xorname),
2794            timestamp,
2795            price: Amount::from(1u64),
2796            rewards_address,
2797            pub_key: vec![0u8; 64],
2798            signature: vec![0u8; 64],
2799        }
2800    }
2801
2802    /// Helper: wrap quotes into a tagged serialized `PaymentProof`.
2803    fn serialize_proof(peer_quotes: Vec<(evmlib::EncodedPeerId, evmlib::PaymentQuote)>) -> Vec<u8> {
2804        use crate::payment::proof::{serialize_single_node_proof, PaymentProof};
2805
2806        let proof = PaymentProof {
2807            proof_of_payment: ProofOfPayment { peer_quotes },
2808            tx_hashes: vec![],
2809        };
2810        serialize_single_node_proof(&proof).expect("serialize proof")
2811    }
2812
2813    #[tokio::test]
2814    async fn test_old_quote_uses_storage_delta_not_timestamp() {
2815        use evmlib::{EncodedPeerId, RewardsAddress};
2816        use std::time::Duration;
2817
2818        let verifier = create_test_verifier();
2819        let xorname = [0xCCu8; 32];
2820        let rewards_addr = RewardsAddress::new([1u8; 20]);
2821
2822        // Create a quote that's 25 hours old (exceeds 24-hour max)
2823        let old_timestamp = SystemTime::now() - Duration::from_secs(25 * 3600);
2824        let quote = make_fake_quote(xorname, old_timestamp, rewards_addr);
2825
2826        let mut peer_quotes = Vec::new();
2827        for _ in 0..CLOSE_GROUP_SIZE {
2828            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2829        }
2830
2831        let proof_bytes = serialize_proof(peer_quotes);
2832        let result = verifier
2833            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2834            .await;
2835
2836        let err_msg = format!("{}", result.expect_err("should fail at later check"));
2837        assert!(
2838            !err_msg.contains("expired"),
2839            "Should not reject by timestamp age: {err_msg}"
2840        );
2841    }
2842
2843    #[tokio::test]
2844    async fn test_future_quote_uses_storage_delta_not_timestamp() {
2845        use evmlib::{EncodedPeerId, RewardsAddress};
2846        use std::time::Duration;
2847
2848        let verifier = create_test_verifier();
2849        let xorname = [0xDDu8; 32];
2850        let rewards_addr = RewardsAddress::new([1u8; 20]);
2851
2852        // Create a quote with a timestamp 1 hour in the future
2853        let future_timestamp = SystemTime::now() + Duration::from_secs(3600);
2854        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
2855
2856        let mut peer_quotes = Vec::new();
2857        for _ in 0..CLOSE_GROUP_SIZE {
2858            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2859        }
2860
2861        let proof_bytes = serialize_proof(peer_quotes);
2862        let result = verifier
2863            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2864            .await;
2865
2866        let err_msg = format!("{}", result.expect_err("should fail at later check"));
2867        assert!(
2868            !err_msg.contains("future"),
2869            "Should not reject by future timestamp: {err_msg}"
2870        );
2871    }
2872
2873    #[tokio::test]
2874    async fn test_quote_within_clock_skew_tolerance_accepted() {
2875        use evmlib::{EncodedPeerId, RewardsAddress};
2876        use std::time::Duration;
2877
2878        let verifier = create_test_verifier();
2879        let xorname = [0xD1u8; 32];
2880        let rewards_addr = RewardsAddress::new([1u8; 20]);
2881
2882        // Quote 30 seconds in the future — well within 300s tolerance
2883        let future_timestamp = SystemTime::now() + Duration::from_secs(30);
2884        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
2885
2886        let mut peer_quotes = Vec::new();
2887        for _ in 0..CLOSE_GROUP_SIZE {
2888            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2889        }
2890
2891        let proof_bytes = serialize_proof(peer_quotes);
2892        let result = verifier
2893            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2894            .await;
2895
2896        // Should NOT fail at timestamp check (will fail later at pub_key binding)
2897        let err_msg = format!("{}", result.expect_err("should fail at later check"));
2898        assert!(
2899            !err_msg.contains("future"),
2900            "Should pass timestamp check (within tolerance), but got: {err_msg}"
2901        );
2902    }
2903
2904    #[tokio::test]
2905    async fn test_quote_beyond_clock_skew_still_uses_storage_delta() {
2906        use evmlib::{EncodedPeerId, RewardsAddress};
2907        use std::time::Duration;
2908
2909        let verifier = create_test_verifier();
2910        let xorname = [0xD2u8; 32];
2911        let rewards_addr = RewardsAddress::new([1u8; 20]);
2912
2913        // Quote 360 seconds in the future — exceeds 300s tolerance
2914        let future_timestamp = SystemTime::now() + Duration::from_secs(360);
2915        let quote = make_fake_quote(xorname, future_timestamp, rewards_addr);
2916
2917        let mut peer_quotes = Vec::new();
2918        for _ in 0..CLOSE_GROUP_SIZE {
2919            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2920        }
2921
2922        let proof_bytes = serialize_proof(peer_quotes);
2923        let result = verifier
2924            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2925            .await;
2926
2927        let err_msg = format!("{}", result.expect_err("should fail at later check"));
2928        assert!(
2929            !err_msg.contains("future"),
2930            "Should not reject by future timestamp: {err_msg}"
2931        );
2932    }
2933
2934    #[tokio::test]
2935    async fn test_quote_23h_old_still_accepted() {
2936        use evmlib::{EncodedPeerId, RewardsAddress};
2937        use std::time::Duration;
2938
2939        let verifier = create_test_verifier();
2940        let xorname = [0xD3u8; 32];
2941        let rewards_addr = RewardsAddress::new([1u8; 20]);
2942
2943        // Quote 23 hours old — within 24h max age
2944        let old_timestamp = SystemTime::now() - Duration::from_secs(23 * 3600);
2945        let quote = make_fake_quote(xorname, old_timestamp, rewards_addr);
2946
2947        let mut peer_quotes = Vec::new();
2948        for _ in 0..CLOSE_GROUP_SIZE {
2949            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2950        }
2951
2952        let proof_bytes = serialize_proof(peer_quotes);
2953        let result = verifier
2954            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
2955            .await;
2956
2957        // Should NOT fail at timestamp check (will fail later at pub_key binding)
2958        let err_msg = format!("{}", result.expect_err("should fail at later check"));
2959        assert!(
2960            !err_msg.contains("expired"),
2961            "Should pass expiry check (23h < 24h), but got: {err_msg}"
2962        );
2963    }
2964
2965    /// Helper: build an `EncodedPeerId` that matches the BLAKE3 hash of an ML-DSA public key.
2966    fn encoded_peer_id_for_pub_key(pub_key: &[u8]) -> evmlib::EncodedPeerId {
2967        let ant_peer_id = peer_id_from_public_key_bytes(pub_key).expect("valid ML-DSA pub key");
2968        evmlib::EncodedPeerId::new(*ant_peer_id.as_bytes())
2969    }
2970
2971    #[tokio::test]
2972    async fn test_wrong_peer_binding_rejected() {
2973        use evmlib::{EncodedPeerId, RewardsAddress};
2974        use saorsa_core::MlDsa65;
2975        use saorsa_pqc::pqc::MlDsaOperations;
2976
2977        let verifier = create_test_verifier();
2978        let xorname = [0xFFu8; 32];
2979        let rewards_addr = RewardsAddress::new([1u8; 20]);
2980
2981        // Generate a real ML-DSA keypair so pub_key is valid
2982        let ml_dsa = MlDsa65::new();
2983        let (public_key, _secret_key) = ml_dsa.generate_keypair().expect("keygen");
2984        let pub_key_bytes = public_key.as_bytes().to_vec();
2985
2986        // Create a quote with a real pub_key but attach it to a random peer ID
2987        // whose identity multihash does NOT match BLAKE3(pub_key)
2988        let mut quote = make_fake_quote(xorname, SystemTime::now(), rewards_addr);
2989        quote.pub_key = pub_key_bytes;
2990
2991        // Use random ed25519 peer IDs — they won't match BLAKE3(pub_key)
2992        let mut peer_quotes = Vec::new();
2993        for _ in 0..CLOSE_GROUP_SIZE {
2994            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
2995        }
2996
2997        let proof_bytes = serialize_proof(peer_quotes);
2998        let result = verifier
2999            .verify_payment(&xorname, Some(&proof_bytes), VerificationContext::ClientPut)
3000            .await;
3001
3002        assert!(result.is_err(), "Should reject wrong peer binding");
3003        let err_msg = format!("{}", result.expect_err("should fail"));
3004        assert!(
3005            err_msg.contains("pub_key does not belong to claimed peer"),
3006            "Error should mention binding mismatch: {err_msg}"
3007        );
3008    }
3009
3010    // =========================================================================
3011    // VerificationContext tests — both contexts verify fresh proof admissions.
3012    // Later neighbour-sync repair has no proof-of-payment and is authorized by
3013    // closest-7 storage quorum or closest-K paid-list quorum instead.
3014    // =========================================================================
3015
3016    /// Content binding is required for every fresh proof context. A receipt for
3017    /// chunk A cannot admit chunk B as either a direct/fresh store or a fresh
3018    /// paid-list update.
3019    #[tokio::test]
3020    async fn test_fresh_contexts_reject_content_mismatch() {
3021        let verifier = create_test_verifier();
3022        let stored_xorname = [0xD2u8; 32];
3023        let quoted_xorname = [0xD3u8; 32];
3024        let rewards = RewardsAddress::new([1u8; 20]);
3025
3026        let mut peer_quotes = Vec::new();
3027        for _ in 0..CLOSE_GROUP_SIZE {
3028            let quote = make_fake_quote(quoted_xorname, SystemTime::now(), rewards);
3029            peer_quotes.push((evmlib::EncodedPeerId::new(rand::random()), quote));
3030        }
3031        let proof_bytes = serialize_proof(peer_quotes);
3032
3033        for context in [
3034            VerificationContext::ClientPut,
3035            VerificationContext::PaidListAdmission,
3036        ] {
3037            let err = verifier
3038                .verify_payment(&stored_xorname, Some(&proof_bytes), context)
3039                .await
3040                .expect_err("content binding must hold in every context");
3041            assert!(
3042                format!("{err}").contains("content address mismatch"),
3043                "{context:?} must reject a receipt for a different address: {err}"
3044            );
3045        }
3046    }
3047
3048    /// The merkle pay-yourself closeness defence (including its duplicate-
3049    /// candidate pre-check, which runs without a `P2PNode`) applies to every
3050    /// proof verification context because every context is a fresh admission.
3051    #[tokio::test]
3052    async fn test_fresh_contexts_enforce_merkle_closeness() {
3053        let verifier = create_test_verifier();
3054
3055        let (mut merkle_proof, _pool_hash, xorname, _timestamp) = make_valid_merkle_proof();
3056
3057        // 16 copies of one real candidate: every self-signature is valid, but
3058        // the candidate PeerIds are duplicates — the closeness pre-check
3059        // rejects this pool on a client PUT.
3060        let shared = merkle_proof
3061            .winner_pool
3062            .candidate_nodes
3063            .first()
3064            .expect("candidates")
3065            .clone();
3066        for c in &mut merkle_proof.winner_pool.candidate_nodes {
3067            *c = shared.clone();
3068        }
3069        let tagged =
3070            crate::payment::proof::serialize_merkle_proof(&merkle_proof).expect("serialize");
3071
3072        for context in [
3073            VerificationContext::ClientPut,
3074            VerificationContext::PaidListAdmission,
3075        ] {
3076            let err = verifier
3077                .verify_payment(&xorname, Some(&tagged), context)
3078                .await
3079                .expect_err("duplicate candidate PeerIds must fail fresh admission closeness");
3080            assert!(
3081                format!("{err}").contains("duplicate candidate PeerId"),
3082                "{context:?} must fail at the closeness pre-check: {err}"
3083            );
3084        }
3085    }
3086
3087    // =========================================================================
3088    // Merkle-tagged proof tests
3089    // =========================================================================
3090
3091    #[tokio::test]
3092    async fn test_merkle_tagged_proof_invalid_data_rejected() {
3093        use crate::ant_protocol::PROOF_TAG_MERKLE;
3094
3095        let verifier = create_test_verifier();
3096        let xorname = [0xA1u8; 32];
3097
3098        // Build a merkle-tagged proof with garbage body.
3099        // The tag byte is correct but the body is not valid msgpack.
3100        let mut merkle_garbage = Vec::with_capacity(64);
3101        merkle_garbage.push(PROOF_TAG_MERKLE);
3102        merkle_garbage.extend_from_slice(&[0xAB; 63]);
3103
3104        let result = verifier
3105            .verify_payment(
3106                &xorname,
3107                Some(&merkle_garbage),
3108                VerificationContext::ClientPut,
3109            )
3110            .await;
3111
3112        assert!(
3113            result.is_err(),
3114            "Should reject merkle proof with invalid body"
3115        );
3116        let err_msg = format!("{}", result.expect_err("should fail"));
3117        assert!(
3118            err_msg.contains("deserialize") || err_msg.contains("merkle proof"),
3119            "Error should mention deserialization failure: {err_msg}"
3120        );
3121    }
3122
3123    #[tokio::test]
3124    async fn test_single_node_tagged_proof_deserialization() {
3125        use crate::payment::proof::serialize_single_node_proof;
3126        use evmlib::{EncodedPeerId, RewardsAddress};
3127
3128        let verifier = create_test_verifier();
3129        let xorname = [0xA2u8; 32];
3130        let rewards_addr = RewardsAddress::new([1u8; 20]);
3131
3132        // Build a valid tagged single-node proof
3133        let quote = make_fake_quote(xorname, SystemTime::now(), rewards_addr);
3134        let mut peer_quotes = Vec::new();
3135        for _ in 0..CLOSE_GROUP_SIZE {
3136            peer_quotes.push((EncodedPeerId::new(rand::random()), quote.clone()));
3137        }
3138
3139        let proof = crate::payment::proof::PaymentProof {
3140            proof_of_payment: ProofOfPayment {
3141                peer_quotes: peer_quotes.clone(),
3142            },
3143            tx_hashes: vec![],
3144        };
3145
3146        let tagged_bytes = serialize_single_node_proof(&proof).expect("serialize tagged proof");
3147
3148        // detect_proof_type should identify it as SingleNode
3149        assert_eq!(
3150            crate::payment::proof::detect_proof_type(&tagged_bytes),
3151            Some(crate::payment::proof::ProofType::SingleNode)
3152        );
3153
3154        // verify_payment should process it through the single-node path.
3155        // It will fail at quote validation (fake pub_key), but we verify
3156        // it passes the deserialization stage by checking the error type.
3157        let result = verifier
3158            .verify_payment(
3159                &xorname,
3160                Some(&tagged_bytes),
3161                VerificationContext::ClientPut,
3162            )
3163            .await;
3164
3165        assert!(result.is_err(), "Should fail at quote validation stage");
3166        let err_msg = format!("{}", result.expect_err("should fail"));
3167        // It should NOT be a deserialization error — it should get further
3168        assert!(
3169            !err_msg.contains("deserialize"),
3170            "Should pass deserialization but fail later: {err_msg}"
3171        );
3172    }
3173
3174    #[test]
3175    fn test_pool_cache_insert_and_lookup() {
3176        use evmlib::merkle_batch_payment::PoolHash;
3177
3178        // Verify the pool_cache field exists and works correctly.
3179        // Insert a pool hash, then verify it's present on lookup.
3180        let verifier = create_test_verifier();
3181
3182        let pool_hash: PoolHash = [0xBBu8; 32];
3183        let payment_info = evmlib::merkle_payments::OnChainPaymentInfo {
3184            depth: 4,
3185            merkle_payment_timestamp: 1_700_000_000,
3186            paid_node_addresses: vec![],
3187        };
3188
3189        // Insert into pool cache
3190        {
3191            let mut cache = verifier.pool_cache.lock();
3192            cache.put(pool_hash, payment_info);
3193        }
3194
3195        // First lookup — should find it
3196        {
3197            let found = verifier.pool_cache.lock().get(&pool_hash).cloned();
3198            assert!(found.is_some(), "Pool hash should be in cache after insert");
3199            let info = found.expect("cached info");
3200            assert_eq!(info.depth, 4);
3201            assert_eq!(info.merkle_payment_timestamp, 1_700_000_000);
3202        }
3203
3204        // Second lookup — same result (no double-query needed)
3205        {
3206            let found = verifier.pool_cache.lock().get(&pool_hash).cloned();
3207            assert!(
3208                found.is_some(),
3209                "Pool hash should still be in cache on second lookup"
3210            );
3211        }
3212
3213        // Different pool hash — should NOT be found
3214        let other_hash: PoolHash = [0xCCu8; 32];
3215        {
3216            let found = verifier.pool_cache.lock().get(&other_hash).cloned();
3217            assert!(found.is_none(), "Unknown pool hash should not be in cache");
3218        }
3219    }
3220
3221    #[tokio::test]
3222    async fn closeness_pass_cache_short_circuits_second_call() {
3223        // When a pool_hash is in the closeness_pass_cache, the outer
3224        // verify_merkle_candidate_closeness must return Ok(()) without
3225        // running the inner lookup — even if no P2PNode is attached.
3226        // That second half (no-p2p → would normally fail-closed in release)
3227        // is the proof the cache short-circuit ran first.
3228        let verifier = create_test_verifier();
3229        let pool_hash = [0xAAu8; 32];
3230        verifier.closeness_pass_cache.lock().put(pool_hash, ());
3231
3232        // Construct a dummy pool — contents don't matter because the cache
3233        // hit means we never look at them.
3234        let pool = MerklePaymentCandidatePool {
3235            midpoint_proof: fake_midpoint_proof(),
3236            candidate_nodes: make_candidate_nodes(1_700_000_000),
3237        };
3238
3239        let result = verifier
3240            .verify_merkle_candidate_closeness(&pool, pool_hash)
3241            .await;
3242        assert!(
3243            result.is_ok(),
3244            "cached pool hash must bypass the inner check and return Ok(()), got: {result:?}"
3245        );
3246    }
3247
3248    #[tokio::test]
3249    async fn closeness_single_flight_concurrent_readers_share_one_verification() {
3250        // Two concurrent callers for the same pool_hash should produce the
3251        // same outcome, and the cache should end up populated exactly once.
3252        // We use the test-utils fail-open path to short-circuit the inner
3253        // DHT lookup; the purpose of this test is the single-flight
3254        // plumbing, not the lookup itself.
3255        let verifier = Arc::new(create_test_verifier());
3256        let pool_hash = [0x77u8; 32];
3257        let pool = MerklePaymentCandidatePool {
3258            midpoint_proof: fake_midpoint_proof(),
3259            candidate_nodes: make_candidate_nodes(1_700_000_000),
3260        };
3261
3262        let v1 = Arc::clone(&verifier);
3263        let p1 = pool.clone();
3264        let v2 = Arc::clone(&verifier);
3265        let p2 = pool.clone();
3266
3267        let (r1, r2) = tokio::join!(
3268            async move { v1.verify_merkle_candidate_closeness(&p1, pool_hash).await },
3269            async move { v2.verify_merkle_candidate_closeness(&p2, pool_hash).await },
3270        );
3271
3272        assert_eq!(r1.is_ok(), r2.is_ok(), "concurrent callers must agree");
3273        assert!(
3274            r1.is_ok(),
3275            "both callers must succeed on the test-utils path"
3276        );
3277        assert!(
3278            verifier
3279                .closeness_pass_cache
3280                .lock()
3281                .get(&pool_hash)
3282                .is_some(),
3283            "success path must populate the pass cache"
3284        );
3285        assert!(
3286            verifier.inflight_closeness.lock().get(&pool_hash).is_none(),
3287            "inflight slot must be cleared after the leader finishes"
3288        );
3289    }
3290
3291    #[tokio::test]
3292    async fn closeness_waiter_reads_leaders_published_failure() {
3293        // Prove the waiter path actually surfaces a failure published by a
3294        // concurrent leader, without running its own inner check. Insert a
3295        // slot, spawn a waiter (which will park on notified_owned), then
3296        // publish failure + notify from the outside — simulating what the
3297        // leader's `publish` + drop-guard pair does.
3298        let verifier = Arc::new(create_test_verifier());
3299        let pool_hash = [0x55u8; 32];
3300        let slot = Arc::new(ClosenessSlot::new());
3301        verifier
3302            .inflight_closeness
3303            .lock()
3304            .put(pool_hash, Arc::clone(&slot));
3305
3306        let pool = MerklePaymentCandidatePool {
3307            midpoint_proof: fake_midpoint_proof(),
3308            candidate_nodes: make_candidate_nodes(1_700_000_000),
3309        };
3310
3311        let verifier_c = Arc::clone(&verifier);
3312        let pool_c = pool.clone();
3313        let waiter = tokio::spawn(async move {
3314            verifier_c
3315                .verify_merkle_candidate_closeness(&pool_c, pool_hash)
3316                .await
3317        });
3318
3319        // Yield so the waiter can run up to its `notified_owned().await`.
3320        // A few yields cover both single-threaded and multi-threaded tokio
3321        // runtimes regardless of scheduling.
3322        for _ in 0..5 {
3323            tokio::task::yield_now().await;
3324        }
3325
3326        // Simulate the leader's `publish` + drop-guard: publish the result,
3327        // clear the slot, wake waiters.
3328        slot.result
3329            .set(Err("forged pool: not close enough".to_string()))
3330            .expect("set once");
3331        verifier.inflight_closeness.lock().pop(&pool_hash);
3332        slot.notify.notify_waiters();
3333
3334        let result = waiter.await.expect("task panicked");
3335        let err = result.expect_err("waiter must return the leader's published failure");
3336        assert!(
3337            err.to_string().contains("forged pool"),
3338            "waiter must surface the leader's error message, got: {err}"
3339        );
3340    }
3341
3342    #[tokio::test]
3343    async fn closeness_rejects_pool_with_duplicate_candidate_pub_keys() {
3344        // An attacker who submits 16 copies of the same real peer's pub_key
3345        // would otherwise satisfy the closeness threshold trivially:
3346        // that one peer's membership in the DHT-returned set would count
3347        // 16 times. The dedupe check in verify_merkle_candidate_closeness_inner
3348        // must reject the pool BEFORE the network lookup runs (so this test
3349        // works even with no P2PNode attached).
3350        let verifier = create_test_verifier();
3351        let pool_hash = [0xDDu8; 32];
3352
3353        // Build a normal pool, then overwrite every candidate's pub_key
3354        // with a single shared key so all 16 derive to the same PeerId.
3355        let mut candidates = make_candidate_nodes(1_700_000_000);
3356        let shared_pub_key = candidates
3357            .first()
3358            .expect("make_candidate_nodes returns CANDIDATES_PER_POOL entries")
3359            .pub_key
3360            .clone();
3361        for c in &mut candidates {
3362            c.pub_key = shared_pub_key.clone();
3363        }
3364        let pool = MerklePaymentCandidatePool {
3365            midpoint_proof: fake_midpoint_proof(),
3366            candidate_nodes: candidates,
3367        };
3368
3369        let result = verifier
3370            .verify_merkle_candidate_closeness(&pool, pool_hash)
3371            .await;
3372        let err = result.expect_err("duplicate candidate PeerIds must be rejected");
3373        let msg = err.to_string();
3374        assert!(
3375            msg.contains("duplicate candidate PeerId"),
3376            "rejection must be the duplicate-PeerId branch, got: {msg}"
3377        );
3378    }
3379
3380    /// Build a deterministic but otherwise-unused `MidpointProof` so unit
3381    /// tests can construct a `MerklePaymentCandidatePool` without spinning
3382    /// up a real merkle tree. The closeness path only calls `.address()`
3383    /// on it, which is a pure BLAKE3 of the branch's leaf/root/timestamp —
3384    /// the values don't need to be tree-valid for these tests.
3385    fn fake_midpoint_proof() -> evmlib::merkle_payments::MidpointProof {
3386        // Build a minimal tree of two leaves so we get a real branch.
3387        let leaves = vec![xor_name::XorName([1u8; 32]), xor_name::XorName([2u8; 32])];
3388        let tree = evmlib::merkle_payments::MerkleTree::from_xornames(leaves).expect("tree");
3389        let candidates = tree.reward_candidates(1_700_000_000).expect("candidates");
3390        candidates.first().expect("at least one").clone()
3391    }
3392
3393    // =========================================================================
3394    // Merkle verification unit tests
3395    // =========================================================================
3396
3397    /// Helper: build 16 validly-signed ML-DSA-65 candidate nodes.
3398    fn make_candidate_nodes(
3399        timestamp: u64,
3400    ) -> [evmlib::merkle_payments::MerklePaymentCandidateNode;
3401           evmlib::merkle_payments::CANDIDATES_PER_POOL] {
3402        use evmlib::merkle_payments::{MerklePaymentCandidateNode, CANDIDATES_PER_POOL};
3403        use saorsa_core::MlDsa65;
3404        use saorsa_pqc::pqc::types::MlDsaSecretKey;
3405        use saorsa_pqc::pqc::MlDsaOperations;
3406
3407        std::array::from_fn::<_, CANDIDATES_PER_POOL, _>(|i| {
3408            let ml_dsa = MlDsa65::new();
3409            let (pub_key, secret_key) = ml_dsa.generate_keypair().expect("keygen");
3410            let price = evmlib::common::Amount::from(1024u64);
3411            #[allow(clippy::cast_possible_truncation)]
3412            let reward_address = RewardsAddress::new([i as u8; 20]);
3413            let msg = MerklePaymentCandidateNode::bytes_to_sign(&price, &reward_address, timestamp);
3414            let sk = MlDsaSecretKey::from_bytes(secret_key.as_bytes()).expect("sk");
3415            let signature = ml_dsa.sign(&sk, &msg).expect("sign").as_bytes().to_vec();
3416
3417            MerklePaymentCandidateNode {
3418                pub_key: pub_key.as_bytes().to_vec(),
3419                price,
3420                reward_address,
3421                merkle_payment_timestamp: timestamp,
3422                signature,
3423            }
3424        })
3425    }
3426
3427    /// Helper: build a valid `MerklePaymentProof` with real ML-DSA-65
3428    /// signatures. Returns the raw proof, pool hash, xorname, and timestamp.
3429    fn make_valid_merkle_proof() -> (
3430        evmlib::merkle_payments::MerklePaymentProof,
3431        evmlib::merkle_batch_payment::PoolHash,
3432        [u8; 32],
3433        u64,
3434    ) {
3435        use evmlib::merkle_payments::{MerklePaymentCandidatePool, MerklePaymentProof, MerkleTree};
3436
3437        let timestamp = std::time::SystemTime::now()
3438            .duration_since(std::time::UNIX_EPOCH)
3439            .expect("system time")
3440            .as_secs();
3441
3442        let addresses: Vec<xor_name::XorName> = (0..4u8)
3443            .map(|i| xor_name::XorName::from_content(&[i]))
3444            .collect();
3445        let tree = MerkleTree::from_xornames(addresses.clone()).expect("tree");
3446
3447        let candidate_nodes = make_candidate_nodes(timestamp);
3448
3449        let reward_candidates = tree
3450            .reward_candidates(timestamp)
3451            .expect("reward candidates");
3452        let midpoint_proof = reward_candidates
3453            .first()
3454            .expect("at least one candidate")
3455            .clone();
3456
3457        let pool = MerklePaymentCandidatePool {
3458            midpoint_proof,
3459            candidate_nodes,
3460        };
3461
3462        let first_address = *addresses.first().expect("first address");
3463        let address_proof = tree
3464            .generate_address_proof(0, first_address)
3465            .expect("proof");
3466
3467        let merkle_proof = MerklePaymentProof::new(first_address, address_proof, pool);
3468        let pool_hash = merkle_proof.winner_pool_hash();
3469        let xorname = first_address.0;
3470
3471        (merkle_proof, pool_hash, xorname, timestamp)
3472    }
3473
3474    /// Helper: build a minimal valid `MerklePaymentProof` with real ML-DSA-65
3475    /// signatures. Returns `(xorname, serialized_tagged_proof, pool_hash, timestamp)`.
3476    fn make_valid_merkle_proof_bytes() -> (
3477        [u8; 32],
3478        Vec<u8>,
3479        evmlib::merkle_batch_payment::PoolHash,
3480        u64,
3481    ) {
3482        let (merkle_proof, pool_hash, xorname, timestamp) = make_valid_merkle_proof();
3483        let tagged = crate::payment::proof::serialize_merkle_proof(&merkle_proof)
3484            .expect("serialize merkle proof");
3485        (xorname, tagged, pool_hash, timestamp)
3486    }
3487
3488    #[tokio::test]
3489    async fn test_merkle_address_mismatch_rejected() {
3490        let verifier = create_test_verifier();
3491        let (_correct_xorname, tagged_proof, _pool_hash, _ts) = make_valid_merkle_proof_bytes();
3492
3493        // Use a DIFFERENT xorname than what the proof was built for
3494        let wrong_xorname = [0xFFu8; 32];
3495
3496        let result = verifier
3497            .verify_payment(
3498                &wrong_xorname,
3499                Some(&tagged_proof),
3500                VerificationContext::ClientPut,
3501            )
3502            .await;
3503
3504        assert!(
3505            result.is_err(),
3506            "Should reject merkle proof address mismatch"
3507        );
3508        let err_msg = format!("{}", result.expect_err("should fail"));
3509        assert!(
3510            err_msg.contains("address mismatch") || err_msg.contains("Merkle proof address"),
3511            "Error should mention address mismatch: {err_msg}"
3512        );
3513    }
3514
3515    #[tokio::test]
3516    async fn test_merkle_malformed_body_rejected() {
3517        let verifier = create_test_verifier();
3518        let xorname = [0xA3u8; 32];
3519
3520        // Valid merkle tag but truncated/corrupted msgpack body
3521        let mut bad_proof = vec![crate::ant_protocol::PROOF_TAG_MERKLE];
3522        bad_proof.extend_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
3523        bad_proof.extend_from_slice(&[0x00; 10]);
3524        // pad to minimum size
3525        while bad_proof.len() < MIN_PAYMENT_PROOF_SIZE_BYTES {
3526            bad_proof.push(0x00);
3527        }
3528
3529        let result = verifier
3530            .verify_payment(&xorname, Some(&bad_proof), VerificationContext::ClientPut)
3531            .await;
3532
3533        assert!(result.is_err(), "Should reject malformed merkle body");
3534        let err_msg = format!("{}", result.expect_err("should fail"));
3535        assert!(
3536            err_msg.contains("deserialize") || err_msg.contains("Failed"),
3537            "Error should mention deserialization: {err_msg}"
3538        );
3539    }
3540
/// The serialized test merkle proof must fall inside the accepted
/// `[MIN_PAYMENT_PROOF_SIZE_BYTES, MAX_PAYMENT_PROOF_SIZE_BYTES]` window.
#[test]
fn test_merkle_proof_serialized_size_within_limits() {
    let (_, proof_bytes, _, _) = make_valid_merkle_proof_bytes();
    let size = proof_bytes.len();

    // 16 ML-DSA-65 candidates (~1952 pub key + ~3309 sig each) ≈ 84 KB + tree data
    assert!(
        size >= MIN_PAYMENT_PROOF_SIZE_BYTES,
        "Merkle proof ({} bytes) should be >= min {} bytes",
        size,
        MIN_PAYMENT_PROOF_SIZE_BYTES
    );
    assert!(
        size <= MAX_PAYMENT_PROOF_SIZE_BYTES,
        "Merkle proof ({} bytes) should be <= max {} bytes",
        size,
        MAX_PAYMENT_PROOF_SIZE_BYTES
    );
}
3559
/// The serialized proof must start with the merkle tag byte and be classified
/// as a merkle proof by `detect_proof_type`.
#[test]
fn test_merkle_proof_tag_is_correct() {
    let (_, proof_bytes, _, _) = make_valid_merkle_proof_bytes();

    let leading_byte = proof_bytes.first().copied();
    let expected_tag = Some(crate::ant_protocol::PROOF_TAG_MERKLE);
    assert_eq!(
        leading_byte, expected_tag,
        "First byte must be the merkle tag"
    );

    let detected = crate::payment::proof::detect_proof_type(&proof_bytes);
    assert_eq!(detected, Some(crate::payment::proof::ProofType::Merkle));
}
3574
/// Inserting one entry beyond `DEFAULT_POOL_CACHE_CAPACITY` must keep the pool
/// cache at capacity, evict the least-recently-used entry, and leave the new
/// entry readable.
#[test]
fn test_pool_cache_eviction() {
    use evmlib::merkle_batch_payment::PoolHash;

    let config = PaymentVerifierConfig {
        evm: EvmVerifierConfig::default(),
        cache_capacity: 100,
        close_group_size: CLOSE_GROUP_SIZE,
        local_rewards_address: RewardsAddress::new([1u8; 20]),
    };
    let verifier = PaymentVerifier::new(config);

    // Distinct key per slot: the index's little-endian bytes occupy the
    // leading bytes of an otherwise zeroed hash.
    let hash_for = |i: usize| -> PoolHash {
        let mut hash: PoolHash = [0u8; 32];
        let idx_bytes = i.to_le_bytes();
        hash[..idx_bytes.len()].copy_from_slice(&idx_bytes);
        hash
    };

    // Fill the pool cache exactly to DEFAULT_POOL_CACHE_CAPACITY.
    for i in 0..DEFAULT_POOL_CACHE_CAPACITY {
        let info = evmlib::merkle_payments::OnChainPaymentInfo {
            depth: 4,
            merkle_payment_timestamp: 1_700_000_000,
            paid_node_addresses: vec![],
        };
        verifier.pool_cache.lock().put(hash_for(i), info);
    }

    assert_eq!(
        verifier.pool_cache.lock().len(),
        DEFAULT_POOL_CACHE_CAPACITY
    );

    // Insert one more — should evict the oldest
    let overflow_hash: PoolHash = [0xFFu8; 32];
    let info = evmlib::merkle_payments::OnChainPaymentInfo {
        depth: 8,
        merkle_payment_timestamp: 1_800_000_000,
        paid_node_addresses: vec![],
    };
    verifier.pool_cache.lock().put(overflow_hash, info);

    // Size should still be at capacity (not capacity + 1)
    assert_eq!(
        verifier.pool_cache.lock().len(),
        DEFAULT_POOL_CACHE_CAPACITY
    );

    // Entry 0 was inserted first and never touched afterwards, so it is the
    // least-recently-used entry and must be the one that was displaced.
    // `contains` does not refresh recency, so the check is side-effect free.
    assert!(
        !verifier.pool_cache.lock().contains(&hash_for(0)),
        "Oldest pool hash should have been evicted"
    );

    // The new entry should be present
    let found = verifier.pool_cache.lock().get(&overflow_hash).cloned();
    assert!(
        found.is_some(),
        "Newly inserted pool hash should be present"
    );
    assert_eq!(found.expect("info").depth, 8);
}
3633
/// Twenty threads each insert one distinct pool entry and immediately read it
/// back; afterwards the cache must hold all twenty entries.
#[test]
fn test_pool_cache_concurrent_access() {
    use evmlib::merkle_batch_payment::PoolHash;
    use std::sync::Arc;

    let verifier = Arc::new(create_test_verifier());

    // Spawn every worker up front; they are joined only after all have started.
    let workers: Vec<_> = (0..20u8)
        .map(|i| {
            let shared = Arc::clone(&verifier);
            std::thread::spawn(move || {
                let hash: PoolHash = [i; 32];
                let info = evmlib::merkle_payments::OnChainPaymentInfo {
                    depth: i,
                    merkle_payment_timestamp: u64::from(i) * 1000,
                    paid_node_addresses: vec![],
                };
                shared.pool_cache.lock().put(hash, info);

                // Read back under a fresh lock acquisition.
                let found = shared.pool_cache.lock().get(&hash).cloned();
                assert!(found.is_some(), "Entry {i} should be readable after insert");
            })
        })
        .collect();

    for worker in workers {
        worker.join().expect("thread panicked");
    }

    // All 20 entries should be present (well under 1000 capacity)
    let total = verifier.pool_cache.lock().len();
    assert_eq!(total, 20);
}
3666
3667    #[tokio::test]
3668    async fn test_merkle_tampered_candidate_signature_rejected() {
3669        let verifier = create_test_verifier();
3670
3671        let (mut merkle_proof, _pool_hash, xorname, timestamp) = make_valid_merkle_proof();
3672
3673        // Tamper the first candidate's signature
3674        if let Some(byte) = merkle_proof
3675            .winner_pool
3676            .candidate_nodes
3677            .first_mut()
3678            .and_then(|c| c.signature.first_mut())
3679        {
3680            *byte ^= 0xFF;
3681        }
3682
3683        // Recompute pool hash after tampering (signature change alters the hash)
3684        let tampered_pool_hash = merkle_proof.winner_pool_hash();
3685
3686        // Pre-populate pool cache so we skip the on-chain query
3687        {
3688            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3689                depth: 4,
3690                merkle_payment_timestamp: timestamp,
3691                paid_node_addresses: vec![],
3692            };
3693            verifier.pool_cache.lock().put(tampered_pool_hash, info);
3694        }
3695
3696        let tagged =
3697            crate::payment::proof::serialize_merkle_proof(&merkle_proof).expect("serialize");
3698
3699        let result = verifier
3700            .verify_payment(&xorname, Some(&tagged), VerificationContext::ClientPut)
3701            .await;
3702
3703        assert!(
3704            result.is_err(),
3705            "Should reject merkle proof with tampered candidate signature"
3706        );
3707        let err_msg = format!("{}", result.expect_err("should fail"));
3708        assert!(
3709            err_msg.contains("Invalid ML-DSA-65 signature"),
3710            "Error should mention invalid signature: {err_msg}"
3711        );
3712    }
3713
3714    #[tokio::test]
3715    async fn test_merkle_timestamp_mismatch_rejected() {
3716        let verifier = create_test_verifier();
3717
3718        let (xorname, tagged, pool_hash, timestamp) = make_valid_merkle_proof_bytes();
3719
3720        // Pre-populate pool cache with a DIFFERENT timestamp than the candidates
3721        {
3722            let mismatched_ts = timestamp + 9999;
3723            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3724                depth: 4,
3725                merkle_payment_timestamp: mismatched_ts,
3726                paid_node_addresses: vec![],
3727            };
3728            verifier.pool_cache.lock().put(pool_hash, info);
3729        }
3730
3731        let result = verifier
3732            .verify_payment(&xorname, Some(&tagged), VerificationContext::ClientPut)
3733            .await;
3734
3735        assert!(
3736            result.is_err(),
3737            "Should reject merkle proof with timestamp mismatch"
3738        );
3739        let err_msg = format!("{}", result.expect_err("should fail"));
3740        assert!(
3741            err_msg.contains("timestamp mismatch"),
3742            "Error should mention timestamp mismatch: {err_msg}"
3743        );
3744    }
3745
3746    #[tokio::test]
3747    async fn test_merkle_paid_node_index_out_of_bounds_rejected() {
3748        let verifier = create_test_verifier();
3749        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
3750
3751        // The test tree has 4 addresses → depth 2. We must match the tree depth
3752        // so verify_merkle_proof passes the depth check, then the paid node
3753        // index out-of-bounds check fires.
3754        {
3755            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3756                depth: 2,
3757                merkle_payment_timestamp: ts,
3758                paid_node_addresses: vec![
3759                    // First paid node: valid (matches candidate 0, amount matches formula)
3760                    // Expected per-node: median(1024) * 2^2 / 2 = 2048
3761                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(2048u64)),
3762                    // Second paid node: index 999 is way beyond CANDIDATES_PER_POOL (16)
3763                    (RewardsAddress::new([1u8; 20]), 999, Amount::from(2048u64)),
3764                ],
3765            };
3766            verifier.pool_cache.lock().put(pool_hash, info);
3767        }
3768
3769        let result = verifier
3770            .verify_payment(
3771                &xorname,
3772                Some(&tagged_proof),
3773                VerificationContext::ClientPut,
3774            )
3775            .await;
3776
3777        assert!(
3778            result.is_err(),
3779            "Should reject paid node index out of bounds"
3780        );
3781        let err_msg = format!("{}", result.expect_err("should fail"));
3782        assert!(
3783            err_msg.contains("out of bounds"),
3784            "Error should mention out of bounds: {err_msg}"
3785        );
3786    }
3787
3788    #[tokio::test]
3789    async fn test_merkle_paid_node_address_mismatch_rejected() {
3790        let verifier = create_test_verifier();
3791        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
3792
3793        // Tree has depth 2, so provide 2 paid node entries.
3794        // Both use valid indices but the second has a wrong reward address.
3795        {
3796            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3797                depth: 2,
3798                merkle_payment_timestamp: ts,
3799                paid_node_addresses: vec![
3800                    // Index 0 with matching address [0x00; 20]
3801                    // Expected per-node: median(1024) * 2^2 / 2 = 2048
3802                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(2048u64)),
3803                    // Index 1 with WRONG address — candidate 1's address is [0x01; 20]
3804                    (RewardsAddress::new([0xFF; 20]), 1, Amount::from(2048u64)),
3805                ],
3806            };
3807            verifier.pool_cache.lock().put(pool_hash, info);
3808        }
3809
3810        let result = verifier
3811            .verify_payment(
3812                &xorname,
3813                Some(&tagged_proof),
3814                VerificationContext::ClientPut,
3815            )
3816            .await;
3817
3818        assert!(result.is_err(), "Should reject paid node address mismatch");
3819        let err_msg = format!("{}", result.expect_err("should fail"));
3820        assert!(
3821            err_msg.contains("address mismatch"),
3822            "Error should mention address mismatch: {err_msg}"
3823        );
3824    }
3825
3826    #[tokio::test]
3827    async fn test_merkle_wrong_depth_rejected() {
3828        let verifier = create_test_verifier();
3829        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
3830
3831        // Pre-populate pool cache with depth=3 but only 1 paid node address
3832        // (depth must equal paid_node_addresses.len())
3833        {
3834            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3835                depth: 3,
3836                merkle_payment_timestamp: ts,
3837                paid_node_addresses: vec![(
3838                    RewardsAddress::new([0u8; 20]),
3839                    0,
3840                    Amount::from(1024u64),
3841                )],
3842            };
3843            verifier.pool_cache.lock().put(pool_hash, info);
3844        }
3845
3846        let result = verifier
3847            .verify_payment(
3848                &xorname,
3849                Some(&tagged_proof),
3850                VerificationContext::ClientPut,
3851            )
3852            .await;
3853
3854        assert!(
3855            result.is_err(),
3856            "Should reject mismatched depth vs paid node count"
3857        );
3858        let err_msg = format!("{}", result.expect_err("should fail"));
3859        assert!(
3860            err_msg.contains("Wrong number of paid nodes")
3861                || err_msg.contains("verification failed"),
3862            "Error should mention depth/count mismatch: {err_msg}"
3863        );
3864    }
3865
3866    #[tokio::test]
3867    async fn test_merkle_underpayment_rejected() {
3868        let verifier = create_test_verifier();
3869        let (xorname, tagged_proof, pool_hash, ts) = make_valid_merkle_proof_bytes();
3870
3871        // Tree depth=2, so 2 paid nodes required. Candidates all quote price=1024.
3872        // Expected per-node: median(1024) * 2^2 / 2 = 2048.
3873        // Pay only 1 wei per node — far below the expected amount.
3874        {
3875            let info = evmlib::merkle_payments::OnChainPaymentInfo {
3876                depth: 2,
3877                merkle_payment_timestamp: ts,
3878                paid_node_addresses: vec![
3879                    (RewardsAddress::new([0u8; 20]), 0, Amount::from(1u64)),
3880                    (RewardsAddress::new([1u8; 20]), 1, Amount::from(1u64)),
3881                ],
3882            };
3883            verifier.pool_cache.lock().put(pool_hash, info);
3884        }
3885
3886        let result = verifier
3887            .verify_payment(
3888                &xorname,
3889                Some(&tagged_proof),
3890                VerificationContext::ClientPut,
3891            )
3892            .await;
3893
3894        assert!(
3895            result.is_err(),
3896            "Should reject merkle payment where paid amount < expected per-node amount"
3897        );
3898        let err_msg = format!("{}", result.expect_err("should fail"));
3899        assert!(
3900            err_msg.contains("Underpayment"),
3901            "Error should mention underpayment: {err_msg}"
3902        );
3903    }
3904
3905    // =========================================================================
3906    // Closeness-window constants regression tests
3907    //
3908    // These constants are load-bearing for both correctness (the storer
3909    // must look at the same window the client picks from, otherwise honest
3910    // pools are rejected) and DoS resistance (the timeout caps lookup
3911    // amplification per forged pool_hash). Pinning them with tests gives
3912    // future patches a one-line failure if either is silently changed
3913    // without updating the security argument in the doc comments.
3914    //
3915    // Empirical justification, captured during STG-01 investigation on
3916    // 2026-05-01:
3917    //
3918    //   - 60s timeout cut iterative lookups off after ~7 of 20 iterations
3919    //     (trace from EWR-3 ant-node-1 in CLOSENESS_LOOKUP_TIMEOUT doc).
3920    //   - K=16 storer window vs K=32 client over-query produced 73%
3921    //     false-positive mismatch rejections under realistic load
3922    //     (115 → 31 client mismatches per 5min after K=32 deploy).
3923    // =========================================================================
3924
/// Pins `CLOSENESS_LOOKUP_TIMEOUT` at 240 seconds.
#[test]
fn closeness_lookup_timeout_is_240s() {
    // If a future change drops the timeout back to 60s, the failure mode
    // from the trace in the doc comment will return.
    let pinned = std::time::Duration::from_secs(240);
    let actual = PaymentVerifier::CLOSENESS_LOOKUP_TIMEOUT;
    assert_eq!(
        actual, pinned,
        "CLOSENESS_LOOKUP_TIMEOUT must be 240s; if changing this, update \
         the iteration trace in the doc comment and re-validate on a \
         fresh testnet"
    );
}
3937
/// Pins the storer's lookup width to twice `CANDIDATES_PER_POOL`.
#[test]
fn closeness_lookup_width_is_32() {
    // The width must equal the client's over-query factor
    // (CANDIDATES_PER_POOL * 2 = 32) so the storer sees the same peers the
    // client legitimately picks from.
    let client_over_query = 2 * evmlib::merkle_payments::CANDIDATES_PER_POOL;
    let storer_width = PaymentVerifier::CLOSENESS_LOOKUP_WIDTH;
    assert_eq!(
        storer_width, client_over_query,
        "CLOSENESS_LOOKUP_WIDTH must equal 2 * CANDIDATES_PER_POOL to \
         match the client's over-query in get_merkle_candidate_pool"
    );
}
3950
/// Pins `CANDIDATE_CLOSENESS_REQUIRED` at 9 so it cannot move silently.
#[test]
fn closeness_required_threshold_is_majority() {
    // This is the security knob: a 9/16 majority tolerates closest-set
    // divergence between two nodes' views while still requiring most
    // candidates to be real peers the live DHT lists as closest.
    let required = PaymentVerifier::CANDIDATE_CLOSENESS_REQUIRED;
    assert_eq!(required, 9, "closeness threshold is a 9/16 majority");
}
3963
/// Checks `closeness_lookup_count` for a standard pool, an oversized pool, and
/// a tiny pool.
#[test]
fn closeness_lookup_count_uses_max_of_width_and_pool_len() {
    let standard_pool = evmlib::merkle_payments::CANDIDATES_PER_POOL;
    let width = PaymentVerifier::CLOSENESS_LOOKUP_WIDTH;

    // Each row: (pool length, expected lookup count, failure message).
    let cases = [
        // The honest case: a 16-candidate pool must trigger a 32-peer network
        // lookup. This is the K=16-rejects-honest-pool fix from the STG-01
        // investigation — without it, the storer never observes the peers at
        // network-true positions 17–32 that the client legitimately picks
        // from.
        (
            standard_pool,
            32,
            "honest 16-candidate pool must trigger a 32-peer DHT lookup",
        ),
        // Future-proof: if a protocol bump ever produces a pool larger than
        // CLOSENESS_LOOKUP_WIDTH, the count must scale with the pool — not
        // truncate to WIDTH. Truncating would let an attacker hide candidates
        // by padding the pool past the storer's window.
        (
            64,
            64,
            "lookup_count must scale up if pool exceeds CLOSENESS_LOOKUP_WIDTH",
        ),
        // Lower bound, pinned on the runtime path as well so it stays checked
        // even if the module's compile-time assertion is ever removed.
        (
            1,
            width,
            "lookup_count must never drop below CLOSENESS_LOOKUP_WIDTH",
        ),
    ];

    for (pool_len, expected, msg) in cases {
        let actual = PaymentVerifier::closeness_lookup_count(pool_len);
        assert_eq!(actual, expected, "{msg}");
    }
}
3996
3997    // Compile-time invariant: the `closeness_lookup_count` formula relies
3998    // on WIDTH being ≥ CANDIDATES_PER_POOL so we never request fewer peers
3999    // than the pool itself contains.
4000    const _: () = assert!(
4001        PaymentVerifier::CLOSENESS_LOOKUP_WIDTH >= evmlib::merkle_payments::CANDIDATES_PER_POOL,
4002        "CLOSENESS_LOOKUP_WIDTH must be ≥ CANDIDATES_PER_POOL",
4003    );
4004
4005    // =========================================================================
4006    // Closeness-match logic tests
4007    //
4008    // These tests use the extracted `check_closeness_match` helper to
4009    // exercise the matching logic directly with synthetic peer-ID sets,
4010    // without standing up a real DHT. They cover:
4011    //
4012    //   - the 9/16 majority threshold (accept at exactly 9, reject below);
4013    //   - that a candidate counts only via exact membership in the storer's
4014    //     returned closest peers, so off-network fabrications are rejected;
4015    //   - the sparse-network short-circuit.
4016    //
4017    // Synthetic PeerIds put the tag in `bytes[0]`, so a candidate is in or
4018    // out of the network's returned set purely by tag value.
4019    // =========================================================================
4020
4021    /// Build a deterministic `PeerId` from a single byte tag.
4022    fn synthetic_peer_id(tag: u8) -> PeerId {
4023        let mut bytes = [0u8; 32];
4024        bytes[0] = tag;
4025        PeerId::from_bytes(bytes)
4026    }
4027
4028    /// Build a vector of synthetic `PeerId`s tagged with bytes 1..=n.
4029    fn synthetic_peer_ids(n: u8) -> Vec<PeerId> {
4030        (1..=n).map(synthetic_peer_id).collect()
4031    }
4032
/// Happy path: the candidate set and the network's returned set are the same
/// 16 peers, so the match must succeed.
#[test]
fn closeness_match_passes_when_all_16_candidates_in_top_16() {
    let pool_address = [0u8; 32];
    // Both sides are tags 1..=16 — every candidate is in the network's top-16.
    let (candidates, network) = (synthetic_peer_ids(16), synthetic_peer_ids(16));

    let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);

    assert!(result.is_ok(), "all-in-top-16 pool must pass: {result:?}");
}
4043
/// A pool holding positions 1..=15 plus position 17 must pass when the lookup
/// window is 32 peers wide.
#[test]
fn closeness_match_passes_when_candidates_span_positions_1_to_15_and_17() {
    // The client's pool contains 16 candidates: 15 at network-true positions
    // 1..=15 plus one at position 17 (the position-16 peer was unresponsive
    // when the client over-queried). Under K=32 all 16 are exact matches,
    // comfortably ≥ the 9/16 majority.
    let mut candidates = synthetic_peer_ids(15);
    candidates.push(synthetic_peer_id(17));

    // Lookup window = 32, includes position 17.
    let network = synthetic_peer_ids(32);
    let pool_address = [0u8; 32];

    let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);

    assert!(
        result.is_ok(),
        "pool with one candidate at position 17 must pass: {result:?}"
    );
}
4063
/// A skewed but honest pool that lies entirely inside the 32-peer lookup
/// window must pass.
#[test]
fn closeness_match_accepts_honest_skew_via_exact_matches() {
    // Honest skew: the client's 16 candidates span network-true positions
    // {1..=12, 17, 19, 21, 23}. The lookup window of 32 covers all of them,
    // so all 16 are exact matches — trivially ≥ the 9/16 majority.
    let mut candidates = synthetic_peer_ids(12);
    candidates.extend([17u8, 19, 21, 23].map(synthetic_peer_id));

    let network = synthetic_peer_ids(32);
    let pool_address = [0u8; 32];

    let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);

    assert!(
        result.is_ok(),
        "honest pool fully inside the lookup window must pass: {result:?}"
    );
}
4082
/// A pool whose candidates are entirely absent from the network's returned
/// peers must be rejected with the pub-key mismatch payment error.
#[test]
fn closeness_match_rejects_forged_pool() {
    // Security floor: a fully-forged pool whose candidate PeerIds are disjoint
    // from the network's returned closest peers must be rejected. The lowered
    // majority threshold must NOT let off-network fabrications pass — every
    // counted candidate has to be a peer the live DHT actually returned.
    let pool_address = [0u8; 32];
    let network = synthetic_peer_ids(32);
    let forged_candidates: Vec<PeerId> = (100..=115).map(synthetic_peer_id).collect();

    let result =
        PaymentVerifier::check_closeness_match(&forged_candidates, &network, &pool_address);

    // Anything other than a payment error is a test failure.
    let msg = match result {
        Err(Error::Payment(msg)) => msg,
        other => {
            panic!("forged pool disjoint from the network set must be rejected: {other:?}")
        }
    };
    assert!(
        msg.contains("candidate pub_keys do not match"),
        "expected forged-pool rejection message, got: {msg}"
    );
}
4108
/// Eight matching candidates out of sixteen is one short of the threshold and
/// must be rejected.
#[test]
fn closeness_match_rejects_pool_below_majority() {
    // Threshold sanity: tags 1..=8 are exact matches, tags 100..=107 are
    // off-network fabrications. 8 < 9 → reject.
    let candidates: Vec<PeerId> = (1..=8u8)
        .chain(100..=107) // 8 fabrications
        .map(synthetic_peer_id)
        .collect();
    let network = synthetic_peer_ids(32);
    let pool_address = [0u8; 32];

    let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);

    assert!(
        result.is_err(),
        "8 matches < majority of 9/16 must reject: {result:?}"
    );
}
4125
/// Exactly nine matching candidates out of sixteen meets the threshold and
/// must be accepted.
#[test]
fn closeness_match_accepts_at_exactly_majority() {
    // Threshold sanity: tags 1..=9 are exact matches, tags 100..=106 are
    // off-network fabrications. 9 ≥ 9 → accept.
    let candidates: Vec<PeerId> = (1..=9u8)
        .chain(100..=106) // 7 fabrications
        .map(synthetic_peer_id)
        .collect();
    let network = synthetic_peer_ids(32);
    let pool_address = [0u8; 32];

    let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);

    assert!(
        result.is_ok(),
        "9/16 ≥ majority threshold must accept: {result:?}"
    );
}
4142
/// When the lookup returns fewer peers than the threshold, the check must fail
/// with the sparse-DHT payment error, even if every candidate would match.
#[test]
fn closeness_match_returns_sparse_dht_error_when_lookup_too_small() {
    // The sparse-DHT short-circuit fires when the lookup returned fewer peers
    // than the threshold itself — even an all-matching candidate set can't
    // pass because the storer doesn't have an authoritative view to compare
    // against.
    let pool_address = [0u8; 32];
    let network = synthetic_peer_ids(8); // < CANDIDATE_CLOSENESS_REQUIRED (9)
    let candidates = synthetic_peer_ids(16);

    let result = PaymentVerifier::check_closeness_match(&candidates, &network, &pool_address);

    // Anything other than a payment error is a test failure.
    let msg = match result {
        Err(Error::Payment(msg)) => msg,
        other => panic!("expected sparse-DHT rejection, got: {other:?}"),
    };
    assert!(
        msg.contains("authoritative DHT lookup returned only 8"),
        "expected sparse-DHT error message, got: {msg}"
    );
}
4164}