Skip to main content

ant_node/storage/
handler.rs

1//! ANT protocol handler for autonomi protocol messages.
2//!
3//! This handler processes chunk PUT/GET requests with optional payment verification,
4//! storing chunks to LMDB and using the DHT for network-wide retrieval.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────┐
10//! │                    AntProtocol                        │
11//! ├─────────────────────────────────────────────────────────┤
12//! │  protocol_id() = "autonomi.ant.chunk.v1"                  │
13//! │                                                         │
14//! │  try_handle_request(data) ──▶ decode ChunkMessage  │
15//! │                                   │                     │
16//! │         ┌─────────────────────────┼─────────────────┐  │
17//! │         ▼                         ▼                 ▼  │
18//! │   ChunkQuoteRequest           ChunkPutRequest    ChunkGetRequest
19//! │         │                         │                 │  │
20//! │         ▼                         ▼                 ▼  │
21//! │   QuoteGenerator          PaymentVerifier    LmdbStorage│
22//! │         │                         │                 │  │
23//! │         └─────────────────────────┴─────────────────┘  │
24//! │                           │                             │
25//! │           return Ok(Some(response_bytes))              │
26//! │           return Ok(None) for response messages       │
27//! └─────────────────────────────────────────────────────────┘
28//! ```
29
30#[cfg(test)]
31use crate::ant_protocol::DATA_TYPE_CHUNK;
32use crate::ant_protocol::{
33    ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
34    ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
35    MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE,
36};
37use crate::client::compute_address;
38use crate::error::{Error, Result};
39use crate::logging::{debug, info, warn};
40use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext};
41use crate::replication::admission;
42use crate::replication::config::K_BUCKET_SIZE;
43use crate::replication::fresh::FreshWriteEvent;
44use crate::storage::lmdb::LmdbStorage;
45use bytes::Bytes;
46use parking_lot::RwLock;
47use saorsa_core::P2PNode;
48use std::sync::Arc;
49use tokio::sync::mpsc;
50
51/// Width of the self-closeness gate on client PUTs (ADR-0003): a node accepts
52/// a PUT only when it is within its own local `SELF_CLOSENESS_GATE_WIDTH`
53/// closest peers to the address.
54///
55/// Set to the client's PUT fallback ceiling (`K_BUCKET_SIZE`), wider than the
56/// storage-admission width, so a client routing past full close-group members
57/// onto further peers (ADR-0002) is still accepted here, while a genuinely far
58/// node — which could only mis-attribute fresh-replication failures — is
59/// turned away.
60const SELF_CLOSENESS_GATE_WIDTH: usize = K_BUCKET_SIZE;
61
62/// ANT protocol handler.
63///
64/// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence
65/// and optional payment verification.
66pub struct AntProtocol {
67    /// LMDB storage for chunk persistence.
68    storage: Arc<LmdbStorage>,
69    /// Payment verifier for checking payments.
70    payment_verifier: Arc<PaymentVerifier>,
71    /// Quote generator for creating storage quotes.
72    /// Also handles merkle candidate quote signing via ML-DSA-65.
73    quote_generator: Arc<QuoteGenerator>,
74    /// Channel for notifying the replication engine about newly-stored chunks.
75    fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
76    /// The node's P2P handle, attached post-construction via
77    /// `attach_p2p_node`. Drives the self-closeness gate on client PUTs;
78    /// `None` in unit tests that never attach a node.
79    p2p_node: RwLock<Option<Arc<P2PNode>>>,
80}
81
82impl AntProtocol {
83    /// Create a new ANT protocol handler.
84    ///
85    /// # Arguments
86    ///
87    /// * `storage` - LMDB storage for chunk persistence
88    /// * `payment_verifier` - Payment verifier for validating payments
89    /// * `quote_generator` - Quote generator for creating storage quotes
90    #[must_use]
91    pub fn new(
92        storage: Arc<LmdbStorage>,
93        payment_verifier: Arc<PaymentVerifier>,
94        quote_generator: Arc<QuoteGenerator>,
95    ) -> Self {
96        // Keep the PaymentVerifier's paid-quote price floor and the
97        // QuoteGenerator's pricing wired to the same authoritative store used
98        // by this protocol handler. Both must read the same record count: the
99        // generator prices quotes from current_chunks(), and the verifier later
100        // checks the paid median quote against current_chunks(). Attaching both
101        // here makes the invariant automatic for every AntProtocol
102        // construction path, including tests and future startup variants.
103        payment_verifier.attach_storage(Arc::clone(&storage));
104        quote_generator.attach_storage(Arc::clone(&storage));
105
106        Self {
107            storage,
108            payment_verifier,
109            quote_generator,
110            fresh_write_tx: None,
111            p2p_node: RwLock::new(None),
112        }
113    }
114
115    /// Attach the node's P2P handle for payment live-DHT checks.
116    ///
117    /// Wires the handle into the payment verifier so payment-proof closeness
118    /// checks can use the live routing view. Idempotent: calling twice
119    /// replaces the verifier handle.
120    pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
121        *self.p2p_node.write() = Some(Arc::clone(&node));
122        self.payment_verifier.attach_p2p_node(node);
123        debug!("AntProtocol: P2PNode attached for payment live-DHT checks and self-closeness gate");
124    }
125
126    /// Set the channel sender for fresh-write replication events.
127    ///
128    /// When set, successful chunk PUTs will notify the replication engine
129    /// so it can fan out fresh offers to the close group.
130    pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
131        self.fresh_write_tx = Some(tx);
132    }
133
134    /// Get the protocol identifier.
135    #[must_use]
136    pub fn protocol_id(&self) -> &'static str {
137        CHUNK_PROTOCOL_ID
138    }
139
140    /// Get a reference to the underlying LMDB storage.
141    #[must_use]
142    pub fn storage(&self) -> Arc<LmdbStorage> {
143        Arc::clone(&self.storage)
144    }
145
146    /// Test-only: the record count the quote generator currently prices on.
147    /// Used to assert that quote-time resync tracks records actually held.
148    #[cfg(test)]
149    #[must_use]
150    pub(crate) fn priced_records_stored(&self) -> usize {
151        self.quote_generator.records_stored()
152    }
153
154    /// Get a shared reference to the payment verifier.
155    #[must_use]
156    pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
157        Arc::clone(&self.payment_verifier)
158    }
159
160    /// Handle an incoming request and produce a response.
161    ///
162    /// Decodes the raw message, processes it if it is a request variant,
163    /// and returns the encoded response bytes.  Returns `Ok(None)` for
164    /// response messages (which are meant for client subscribers, not for
165    /// the protocol handler).
166    ///
167    /// # Errors
168    ///
169    /// Returns an error if message decoding, handling, or encoding fails.
170    pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
171        let message = ChunkMessage::decode(data)
172            .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
173
174        let request_id = message.request_id;
175
176        let response_body = match message.body {
177            ChunkMessageBody::PutRequest(req) => {
178                ChunkMessageBody::PutResponse(self.handle_put(req).await)
179            }
180            ChunkMessageBody::GetRequest(req) => {
181                ChunkMessageBody::GetResponse(self.handle_get(req).await)
182            }
183            ChunkMessageBody::QuoteRequest(ref req) => {
184                ChunkMessageBody::QuoteResponse(self.handle_quote(req))
185            }
186            ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
187                ChunkMessageBody::MerkleCandidateQuoteResponse(
188                    self.handle_merkle_candidate_quote(req),
189                )
190            }
191            // Anything else — response messages are handled by client
192            // subscribers (e.g. send_and_await_chunk_response), not by the
193            // protocol handler. Returning None prevents the caller from
194            // sending a reply, which would create an infinite ping-pong
195            // loop.
196            //
197            // `ChunkMessageBody` is `#[non_exhaustive]` in ant-protocol, so
198            // a future wire variant added on a protocol minor bump also
199            // lands here and is dropped. The CHUNK_PROTOCOL_ID multistream-
200            // select handshake version-gates peers, so this arm should
201            // only be reached by a misconfigured peer.
202            _ => return Ok(None),
203        };
204
205        let response = ChunkMessage {
206            request_id,
207            body: response_body,
208        };
209
210        response
211            .encode()
212            .map(|b| Some(Bytes::from(b)))
213            .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
214    }
215
216    /// Handle a PUT request.
217    ///
218    /// Wraps `handle_put_inner` to emit a single structured tracing event per
219    /// PUT RPC at every exit path, including early-return validation paths.
220    /// The event uses `target: "ant_node::storage::rpc_latency"` so that
221    /// Elasticsearch / Kibana can build p50/p95/p99 store-RPC latency
222    /// histograms from the existing telegraf log forwarding.
223    async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
224        let start = std::time::Instant::now();
225        let addr_hex = hex::encode(request.address);
226        let chunk_size = request.content.len();
227        let response = self.handle_put_inner(request).await;
228        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
229        let outcome: &'static str = match &response {
230            ChunkPutResponse::Success { .. } => "success",
231            ChunkPutResponse::AlreadyExists { .. } => "already_exists",
232            ChunkPutResponse::PaymentRequired { .. } => "payment_required",
233            ChunkPutResponse::Error(_) => "error",
234            _ => "unknown",
235        };
236        info!(
237            target: "ant_node::storage::rpc_latency",
238            duration_ms,
239            chunk_size,
240            outcome,
241            addr = %addr_hex,
242            "put_rpc"
243        );
244        response
245    }
246
247    /// Inner body of `handle_put` — see the wrapper for the per-RPC latency log.
248    async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
249        let address = request.address;
250        let addr_hex = hex::encode(address);
251        debug!("Handling PUT request for {addr_hex}");
252
253        // 1. Validate chunk size
254        if request.content.len() > MAX_CHUNK_SIZE {
255            return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
256                size: request.content.len(),
257                max_size: MAX_CHUNK_SIZE,
258            });
259        }
260
261        // 2. Verify content address matches BLAKE3(content)
262        let computed = compute_address(&request.content);
263        if computed != address {
264            return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
265                expected: address,
266                actual: computed,
267            });
268        }
269
270        // 3. Check if already exists (idempotent success)
271        match self.storage.exists(&address) {
272            Ok(true) => {
273                debug!("Chunk {addr_hex} already exists");
274                return ChunkPutResponse::AlreadyExists { address };
275            }
276            Err(e) => {
277                return ChunkPutResponse::Error(ProtocolError::Internal(format!(
278                    "Storage read failed: {e}"
279                )));
280            }
281            Ok(false) => {}
282        }
283
284        // 4. Cheap disk-space pre-check — runs BEFORE the expensive payment
285        //    verification path (ML-DSA pool checks, a Kademlia closeness
286        //    lookup, and an on-chain Arbitrum RPC). A disk-full node can never
287        //    satisfy this PUT, so reject it here rather than burning that work
288        //    only to fail the reserve check inside `storage.put` (V2-411). The
289        //    check caches passing results, so it is free per-PUT on a healthy
290        //    node; a disk-full node re-runs a cheap `available_space` syscall
291        //    each PUT (still negligible next to the verification it avoids) and
292        //    so detects freed space promptly. The store path keeps its own
293        //    check as defence-in-depth.
294        if let Err(e) = self.storage.check_capacity() {
295            info!(
296                target: "ant_node::storage::disk_precheck",
297                addr = %addr_hex,
298                "Rejecting PUT before payment verification: {e}"
299            );
300            return ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()));
301        }
302
303        // Self-closeness gate (ADR-0003): accept a client PUT only when this
304        // node is within its own local closest view of the address, so the
305        // fresh replication it triggers is legitimate and cannot mis-penalise
306        // honest peers. The width is the client's PUT fallback ceiling
307        // (`SELF_CLOSENESS_GATE_WIDTH`), so a client routing past full
308        // close-group members onto further peers is still accepted here.
309        // Skipped when no P2P handle is attached (unit tests). Bind the handle
310        // out of the lock first so no guard is held across the `.await`.
311        let attached = self.p2p_node.read().as_ref().map(Arc::clone);
312        if let Some(p2p) = attached {
313            let self_id = *p2p.peer_id();
314            if !admission::is_responsible(&self_id, &address, &p2p, SELF_CLOSENESS_GATE_WIDTH).await
315            {
316                debug!("Rejecting PUT for {addr_hex}: not within local closest peers");
317                return ChunkPutResponse::Error(ProtocolError::StorageFailed(
318                    "node is not within its local closest peers for this address".to_string(),
319                ));
320            }
321        }
322
323        // 5. Verify payment. The ClientPut context applies the store-strength
324        //    payment cache and verifies live proofs.
325        let payment_result = self
326            .payment_verifier
327            .verify_payment(
328                &address,
329                request.payment_proof.as_deref(),
330                VerificationContext::ClientPut,
331            )
332            .await;
333
334        match payment_result {
335            Ok(status) if status.can_store() => {
336                // Payment verified or cached
337            }
338            Ok(_) => {
339                return ChunkPutResponse::PaymentRequired {
340                    message: "Payment required for new chunk".to_string(),
341                };
342            }
343            Err(e) => {
344                return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
345            }
346        }
347
348        // 6. Store chunk
349        match self.storage.put(&address, &request.content).await {
350            Ok(_) => {
351                let content_len = request.content.len();
352                info!("Stored chunk {addr_hex} ({content_len} bytes)");
353                // Bump the in-memory fallback counter. Both pricing and the
354                // paid-quote floor now read LmdbStorage::current_chunks() directly,
355                // so this counter only matters when no storage is attached
356                // (unit tests / mis-configured startup). Kept warm so that
357                // fallback path stays roughly accurate.
358                self.quote_generator.record_store();
359
360                // 7. Notify replication engine for fresh fan-out.
361                //    Only emit when a real proof is present — cached-as-verified
362                //    PUTs have no proof to forward, and the chunk would have
363                //    already replicated on the original write that carried one.
364                if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
365                    // `request.content` is now `bytes::Bytes`; FreshWriteEvent
366                    // still carries the chunk as `Vec<u8>` for compatibility
367                    // with the replication wire format, so materialise once
368                    // here. Done only on the success path, where storage has
369                    // already accepted the chunk.
370                    let event = FreshWriteEvent {
371                        key: address,
372                        data: request.content.to_vec(),
373                        payment_proof: proof,
374                    };
375                    if tx.send(event).is_err() {
376                        debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
377                    }
378                }
379
380                ChunkPutResponse::Success { address }
381            }
382            Err(e) => {
383                warn!("Failed to store chunk {addr_hex}: {e}");
384                ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
385            }
386        }
387    }
388
389    /// Handle a GET request.
390    ///
391    /// Wraps `handle_get_inner` to emit a single structured tracing event per
392    /// GET RPC at every exit path. See `handle_put` for the rationale.
393    async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
394        let start = std::time::Instant::now();
395        let addr_hex = hex::encode(request.address);
396        let response = self.handle_get_inner(request).await;
397        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
398        let outcome: &'static str = match &response {
399            ChunkGetResponse::Success { .. } => "success",
400            ChunkGetResponse::NotFound { .. } => "not_found",
401            ChunkGetResponse::Error(_) => "error",
402            _ => "unknown",
403        };
404        info!(
405            target: "ant_node::storage::rpc_latency",
406            duration_ms,
407            outcome,
408            addr = %addr_hex,
409            "get_rpc"
410        );
411        response
412    }
413
414    /// Inner body of `handle_get` — see the wrapper for the per-RPC latency log.
415    async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
416        let address = request.address;
417        let addr_hex = hex::encode(address);
418        debug!("Handling GET request for {addr_hex}");
419
420        match self.storage.get(&address).await {
421            Ok(Some(content)) => {
422                let content_len = content.len();
423                debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
424                ChunkGetResponse::Success { address, content }
425            }
426            Ok(None) => {
427                debug!("Chunk {addr_hex} not found");
428                ChunkGetResponse::NotFound { address }
429            }
430            Err(e) => {
431                warn!("Failed to retrieve chunk {addr_hex}: {e}");
432                ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
433            }
434        }
435    }
436
437    /// Resync the quoting metric to the authoritative count of records the node
438    /// actually holds.
439    ///
440    /// The quote price is driven by `QuoteGenerator::records_stored()`. Reading
441    /// the live LMDB entry count (an O(1) B-tree page-header read) right before
442    /// pricing makes the metric deletion-aware: any chunk removed by
443    /// [`LmdbStorage::delete`] or by the replication prune pass is reflected
444    /// immediately, with no risk of missing a delete path.
445    ///
446    /// On a storage read error — or a count that does not fit `usize` — the
447    /// previous metric value is left untouched so a transient LMDB error never
448    /// disrupts quote generation.
449    fn resync_quote_metric(&self) {
450        match self.storage.current_chunks() {
451            // Saturating an overflowing count to usize::MAX would jump the
452            // metric to the maximum possible price driver; keep the previous
453            // value instead, as for a read error.
454            Ok(count) => usize::try_from(count).map_or_else(
455                |_| {
456                    warn!(
457                        "current_chunks() count {count} overflows usize; keeping previous quote \
458                         metric"
459                    );
460                },
461                |records| self.quote_generator.resync_records(records),
462            ),
463            Err(e) => {
464                warn!("Failed to read current_chunks() for quote metric resync: {e}");
465            }
466        }
467    }
468
469    /// Handle a quote request.
470    fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
471        let addr_hex = hex::encode(request.address);
472        let data_size = request.data_size;
473        debug!("Handling quote request for {addr_hex} (size: {data_size})");
474
475        // Price on records ACTUALLY HELD, not a monotonic store counter.
476        self.resync_quote_metric();
477
478        // Check if the chunk is already stored so we can tell the client
479        // to skip payment (already_stored = true).
480        // The match intentionally logs the error when the `logging` feature is
481        // active. Clippy suggests `unwrap_or_default()` when logging is compiled
482        // out, but keeping the explicit match preserves the diagnostic intent.
483        #[allow(clippy::manual_unwrap_or_default)]
484        let already_stored = match self.storage.exists(&request.address) {
485            Ok(exists) => exists,
486            Err(e) => {
487                warn!("Storage check failed for {addr_hex}: {e}");
488                false // Assume not stored on error — generate a normal quote.
489            }
490        };
491
492        if already_stored {
493            debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
494        }
495
496        // Validate data size - data_size is u64, cast carefully and reject overflow
497        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
498            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
499                size: MAX_CHUNK_SIZE + 1,
500                max_size: MAX_CHUNK_SIZE,
501            });
502        };
503        if data_size_usize > MAX_CHUNK_SIZE {
504            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
505                size: data_size_usize,
506                max_size: MAX_CHUNK_SIZE,
507            });
508        }
509
510        match self
511            .quote_generator
512            .create_quote(request.address, data_size_usize, request.data_type)
513        {
514            Ok(quote) => {
515                // Serialize the quote
516                match rmp_serde::to_vec(&quote) {
517                    Ok(quote_bytes) => ChunkQuoteResponse::Success {
518                        quote: quote_bytes,
519                        already_stored,
520                    },
521                    Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
522                        "Failed to serialize quote: {e}"
523                    ))),
524                }
525            }
526            Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
527        }
528    }
529
530    /// Handle a merkle candidate quote request.
531    fn handle_merkle_candidate_quote(
532        &self,
533        request: &MerkleCandidateQuoteRequest,
534    ) -> MerkleCandidateQuoteResponse {
535        let addr_hex = hex::encode(request.address);
536        let data_size = request.data_size;
537        debug!(
538            "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
539            request.merkle_payment_timestamp
540        );
541
542        // Price on records ACTUALLY HELD, not a monotonic store counter.
543        self.resync_quote_metric();
544
545        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
546            return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
547                "data_size {} overflows usize",
548                request.data_size
549            )));
550        };
551        if data_size_usize > MAX_CHUNK_SIZE {
552            return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
553                size: data_size_usize,
554                max_size: MAX_CHUNK_SIZE,
555            });
556        }
557
558        match self.quote_generator.create_merkle_candidate_quote(
559            data_size_usize,
560            request.data_type,
561            request.merkle_payment_timestamp,
562        ) {
563            Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
564                Ok(bytes) => MerkleCandidateQuoteResponse::Success {
565                    candidate_node: bytes,
566                },
567                Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
568                    "Failed to serialize merkle candidate node: {e}"
569                ))),
570            },
571            Err(e) => {
572                MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
573            }
574        }
575    }
576
577    /// Get storage statistics.
578    #[must_use]
579    pub fn storage_stats(&self) -> crate::storage::StorageStats {
580        self.storage.stats()
581    }
582
583    /// Get payment cache statistics.
584    #[must_use]
585    pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
586        self.payment_verifier.cache_stats()
587    }
588
589    /// Get a reference to the payment verifier.
590    ///
591    /// Exposed for **test harnesses only** — production code should not call
592    /// this directly. Use `cache_insert()` on the returned verifier to
593    /// pre-populate the payment cache in test setups.
594    #[cfg(any(test, feature = "test-utils"))]
595    #[must_use]
596    pub fn payment_verifier(&self) -> &PaymentVerifier {
597        &self.payment_verifier
598    }
599
600    /// Check if a chunk exists locally.
601    ///
602    /// # Errors
603    ///
604    /// Returns an error if the storage read fails.
605    pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
606        self.storage.exists(address)
607    }
608
609    /// Get a chunk directly from local storage.
610    ///
611    /// # Errors
612    ///
613    /// Returns an error if storage access fails.
614    pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
615        self.storage.get(address).await
616    }
617
618    /// Store a chunk directly to local storage (bypasses payment verification).
619    ///
620    /// TEST ONLY - This method bypasses payment verification and should only be used in tests.
621    ///
622    /// # Errors
623    ///
624    /// Returns an error if storage fails or content doesn't match address.
625    #[cfg(test)]
626    pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
627        self.storage.put(address, content).await
628    }
629}
630
631#[cfg(test)]
632#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
633mod tests {
634    use super::*;
635    use crate::payment::metrics::QuotingMetricsTracker;
636    use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
637    use crate::storage::LmdbStorageConfig;
638    use evmlib::RewardsAddress;
639    use saorsa_core::identity::NodeIdentity;
640    use saorsa_core::MlDsa65;
641    use saorsa_pqc::pqc::types::MlDsaSecretKey;
642    use tempfile::TempDir;
643
644    async fn create_test_protocol() -> (AntProtocol, TempDir) {
645        // `test_default()` sets `disk_reserve: 0`, so the disk pre-check always
646        // passes for the regular tests.
647        create_test_protocol_with_reserve(0).await
648    }
649
650    /// Build a test protocol whose storage enforces the given disk reserve.
651    ///
652    /// A very large reserve (e.g. `u64::MAX`) makes `available < reserve`
653    /// always true, so the disk-space pre-check in `handle_put_inner` fails —
654    /// used to exercise the V2-411 early-return path.
655    async fn create_test_protocol_with_reserve(disk_reserve: u64) -> (AntProtocol, TempDir) {
656        let temp_dir = TempDir::new().expect("create temp dir");
657
658        let storage_config = LmdbStorageConfig {
659            root_dir: temp_dir.path().to_path_buf(),
660            disk_reserve,
661            ..LmdbStorageConfig::test_default()
662        };
663        let storage = Arc::new(
664            LmdbStorage::new(storage_config)
665                .await
666                .expect("create storage"),
667        );
668
669        let rewards_address = RewardsAddress::new([1u8; 20]);
670        let payment_config = PaymentVerifierConfig {
671            evm: EvmVerifierConfig::default(),
672            cache_capacity: 100_000,
673            close_group_size: crate::ant_protocol::CLOSE_GROUP_SIZE,
674            local_rewards_address: rewards_address,
675        };
676        let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
677        let metrics_tracker = QuotingMetricsTracker::new(100);
678        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
679
680        // Wire ML-DSA-65 signing so quote requests succeed
681        let identity = NodeIdentity::generate().expect("generate identity");
682        let pub_key_bytes = identity.public_key().as_bytes().to_vec();
683        let sk_bytes = identity.secret_key_bytes().to_vec();
684        let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
685        quote_generator.set_signer(pub_key_bytes, move |msg| {
686            use saorsa_pqc::pqc::MlDsaOperations;
687            let ml_dsa = MlDsa65::new();
688            ml_dsa
689                .sign(&sk, msg)
690                .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
691        });
692
693        let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
694        (protocol, temp_dir)
695    }
696
697    #[tokio::test]
698    async fn test_put_and_get_chunk() {
699        let (protocol, _temp) = create_test_protocol().await;
700
701        let content = b"hello world";
702        let address = LmdbStorage::compute_address(content);
703
704        // Pre-populate payment cache so EVM verification is bypassed
705        protocol.payment_verifier().cache_insert(address);
706
707        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
708        let put_msg = ChunkMessage {
709            request_id: 1,
710            body: ChunkMessageBody::PutRequest(put_request),
711        };
712        let put_bytes = put_msg.encode().expect("encode put");
713
714        // Handle PUT
715        let response_bytes = protocol
716            .try_handle_request(&put_bytes)
717            .await
718            .expect("handle put")
719            .expect("expected response");
720        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
721
722        assert_eq!(response.request_id, 1);
723        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
724            response.body
725        {
726            assert_eq!(addr, address);
727        } else {
728            panic!("expected PutResponse::Success, got: {response:?}");
729        }
730
731        // Create GET request
732        let get_request = ChunkGetRequest::new(address);
733        let get_msg = ChunkMessage {
734            request_id: 2,
735            body: ChunkMessageBody::GetRequest(get_request),
736        };
737        let get_bytes = get_msg.encode().expect("encode get");
738
739        // Handle GET
740        let response_bytes = protocol
741            .try_handle_request(&get_bytes)
742            .await
743            .expect("handle get")
744            .expect("expected response");
745        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
746
747        assert_eq!(response.request_id, 2);
748        if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
749            address: addr,
750            content: data,
751        }) = response.body
752        {
753            assert_eq!(addr, address);
754            assert_eq!(data, content.to_vec());
755        } else {
756            panic!("expected GetResponse::Success");
757        }
758    }
759
760    #[tokio::test]
761    async fn test_get_not_found() {
762        let (protocol, _temp) = create_test_protocol().await;
763
764        let address = [0xAB; 32];
765        let get_request = ChunkGetRequest::new(address);
766        let get_msg = ChunkMessage {
767            request_id: 10,
768            body: ChunkMessageBody::GetRequest(get_request),
769        };
770        let get_bytes = get_msg.encode().expect("encode get");
771
772        let response_bytes = protocol
773            .try_handle_request(&get_bytes)
774            .await
775            .expect("handle get")
776            .expect("expected response");
777        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
778
779        assert_eq!(response.request_id, 10);
780        if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
781            response.body
782        {
783            assert_eq!(addr, address);
784        } else {
785            panic!("expected GetResponse::NotFound");
786        }
787    }
788
789    #[tokio::test]
790    async fn test_put_address_mismatch() {
791        let (protocol, _temp) = create_test_protocol().await;
792
793        let content = b"test content";
794        let wrong_address = [0xFF; 32]; // Wrong address
795
796        // Pre-populate cache for the wrong address so we test address mismatch, not payment
797        protocol.payment_verifier().cache_insert(wrong_address);
798
799        let put_request = ChunkPutRequest::new(wrong_address, Bytes::copy_from_slice(content));
800        let put_msg = ChunkMessage {
801            request_id: 20,
802            body: ChunkMessageBody::PutRequest(put_request),
803        };
804        let put_bytes = put_msg.encode().expect("encode put");
805
806        let response_bytes = protocol
807            .try_handle_request(&put_bytes)
808            .await
809            .expect("handle put")
810            .expect("expected response");
811        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
812
813        assert_eq!(response.request_id, 20);
814        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
815            ProtocolError::AddressMismatch { .. },
816        )) = response.body
817        {
818            // Expected
819        } else {
820            panic!("expected AddressMismatch error, got: {response:?}");
821        }
822    }
823
824    #[tokio::test]
825    async fn test_put_chunk_too_large() {
826        let (protocol, _temp) = create_test_protocol().await;
827
828        // Create oversized content
829        let content = vec![0u8; MAX_CHUNK_SIZE + 1];
830        let address = LmdbStorage::compute_address(&content);
831
832        let put_request = ChunkPutRequest::new(address, Bytes::from(content));
833        let put_msg = ChunkMessage {
834            request_id: 30,
835            body: ChunkMessageBody::PutRequest(put_request),
836        };
837        let put_bytes = put_msg.encode().expect("encode put");
838
839        let response_bytes = protocol
840            .try_handle_request(&put_bytes)
841            .await
842            .expect("handle put")
843            .expect("expected response");
844        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
845
846        assert_eq!(response.request_id, 30);
847        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
848            ProtocolError::ChunkTooLarge { .. },
849        )) = response.body
850        {
851            // Expected
852        } else {
853            panic!("expected ChunkTooLarge error");
854        }
855    }
856
857    /// V2-411: a disk-full node must reject a PUT with the disk-space error
858    /// *before* running payment verification.
859    ///
860    /// The chunk is intentionally **not** cache-inserted, so if the handler
861    /// reached `verify_payment` it would return `PaymentRequired`/`PaymentFailed`
862    /// (an uncached chunk with no proof). Observing the `StorageFailed` disk
863    /// error instead proves the disk pre-check short-circuited ahead of
864    /// verification — there is no on-chain path to reach.
865    #[tokio::test]
866    async fn test_put_rejected_on_insufficient_disk_before_verification() {
867        // u64::MAX reserve guarantees `available < reserve`, so the cached
868        // disk-space check always fails.
869        let (protocol, _temp) = create_test_protocol_with_reserve(u64::MAX).await;
870
871        let content = b"chunk for a disk-full node";
872        let address = LmdbStorage::compute_address(content);
873
874        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
875        let put_msg = ChunkMessage {
876            request_id: 41,
877            body: ChunkMessageBody::PutRequest(put_request),
878        };
879        let put_bytes = put_msg.encode().expect("encode put");
880
881        let response_bytes = protocol
882            .try_handle_request(&put_bytes)
883            .await
884            .expect("handle put")
885            .expect("expected response");
886        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
887
888        assert_eq!(response.request_id, 41);
889        match response.body {
890            ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
891                ProtocolError::StorageFailed(msg),
892            )) => {
893                assert!(
894                    msg.contains("Insufficient disk space"),
895                    "expected disk-space error, got: {msg}"
896                );
897            }
898            other => {
899                panic!("expected StorageFailed disk error before verification, got: {other:?}")
900            }
901        }
902
903        // And nothing was stored.
904        assert!(!protocol.exists(&address).expect("exists check"));
905    }
906
907    #[tokio::test]
908    async fn test_put_already_exists() {
909        let (protocol, _temp) = create_test_protocol().await;
910
911        let content = b"duplicate content";
912        let address = LmdbStorage::compute_address(content);
913
914        // Pre-populate cache so EVM verification is bypassed
915        protocol.payment_verifier().cache_insert(address);
916
917        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
918        let put_msg = ChunkMessage {
919            request_id: 40,
920            body: ChunkMessageBody::PutRequest(put_request),
921        };
922        let put_bytes = put_msg.encode().expect("encode put");
923
924        let _ = protocol
925            .try_handle_request(&put_bytes)
926            .await
927            .expect("handle put");
928
929        // Store again - should return AlreadyExists
930        let response_bytes = protocol
931            .try_handle_request(&put_bytes)
932            .await
933            .expect("handle put 2")
934            .expect("expected response");
935        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
936
937        assert_eq!(response.request_id, 40);
938        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
939            response.body
940        {
941            assert_eq!(addr, address);
942        } else {
943            panic!("expected AlreadyExists");
944        }
945    }
946
947    #[tokio::test]
948    async fn test_protocol_id() {
949        let (protocol, _temp) = create_test_protocol().await;
950        assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
951    }
952
953    #[tokio::test]
954    async fn test_exists_and_local_access() {
955        let (protocol, _temp) = create_test_protocol().await;
956
957        let content = b"local access test";
958        let address = LmdbStorage::compute_address(content);
959
960        assert!(!protocol.exists(&address).expect("exists check"));
961
962        protocol
963            .put_local(&address, content)
964            .await
965            .expect("put local");
966
967        assert!(protocol.exists(&address).expect("exists check"));
968
969        let retrieved = protocol.get_local(&address).await.expect("get local");
970        assert_eq!(retrieved, Some(content.to_vec()));
971    }
972
973    #[tokio::test]
974    async fn test_cache_insert_is_visible() {
975        let (protocol, _temp) = create_test_protocol().await;
976
977        let content = b"cache test content";
978        let address = LmdbStorage::compute_address(content);
979
980        // Before insert: cache should be empty
981        let stats_before = protocol.payment_cache_stats();
982        assert_eq!(stats_before.additions, 0);
983
984        // Pre-populate cache
985        protocol.payment_verifier().cache_insert(address);
986
987        // After insert: cache should have the xorname
988        let stats_after = protocol.payment_cache_stats();
989        assert_eq!(stats_after.additions, 1);
990
991        // PUT should succeed (cache hit)
992        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
993        let put_msg = ChunkMessage {
994            request_id: 100,
995            body: ChunkMessageBody::PutRequest(put_request),
996        };
997        let put_bytes = put_msg.encode().expect("encode put");
998        let response_bytes = protocol
999            .try_handle_request(&put_bytes)
1000            .await
1001            .expect("handle put")
1002            .expect("expected response");
1003        let response = ChunkMessage::decode(&response_bytes).expect("decode");
1004
1005        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
1006            // expected
1007        } else {
1008            panic!("expected success, got: {response:?}");
1009        }
1010    }
1011
1012    #[tokio::test]
1013    async fn test_put_same_chunk_twice_hits_cache() {
1014        let (protocol, _temp) = create_test_protocol().await;
1015
1016        let content = b"duplicate cache test";
1017        let address = LmdbStorage::compute_address(content);
1018
1019        // Pre-populate cache for first PUT
1020        protocol.payment_verifier().cache_insert(address);
1021
1022        // First PUT
1023        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
1024        let put_msg = ChunkMessage {
1025            request_id: 110,
1026            body: ChunkMessageBody::PutRequest(put_request),
1027        };
1028        let put_bytes = put_msg.encode().expect("encode put");
1029        let _ = protocol
1030            .try_handle_request(&put_bytes)
1031            .await
1032            .expect("handle put 1");
1033
1034        // Second PUT should return AlreadyExists from the storage idempotency check.
1035        let response_bytes = protocol
1036            .try_handle_request(&put_bytes)
1037            .await
1038            .expect("handle put 2")
1039            .expect("expected response");
1040        let response = ChunkMessage::decode(&response_bytes).expect("decode");
1041
1042        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
1043        {
1044            // expected
1045        } else {
1046            panic!("expected AlreadyExists, got: {response:?}");
1047        }
1048    }
1049
1050    #[tokio::test]
1051    async fn test_payment_cache_stats_returns_correct_values() {
1052        let (protocol, _temp) = create_test_protocol().await;
1053
1054        let stats = protocol.payment_cache_stats();
1055        assert_eq!(stats.hits, 0);
1056        assert_eq!(stats.misses, 0);
1057        assert_eq!(stats.additions, 0);
1058
1059        // Pre-populate cache, then store a chunk to test stats
1060        let content = b"stats test";
1061        let address = LmdbStorage::compute_address(content);
1062        protocol.payment_verifier().cache_insert(address);
1063
1064        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
1065        let put_msg = ChunkMessage {
1066            request_id: 120,
1067            body: ChunkMessageBody::PutRequest(put_request),
1068        };
1069        let put_bytes = put_msg.encode().expect("encode put");
1070        let _ = protocol
1071            .try_handle_request(&put_bytes)
1072            .await
1073            .expect("handle put");
1074
1075        let stats = protocol.payment_cache_stats();
1076        // Should have 1 addition (from cache_insert) + 1 hit (payment verification found cache)
1077        assert_eq!(stats.additions, 1);
1078        assert_eq!(stats.hits, 1);
1079    }
1080
1081    #[tokio::test]
1082    async fn test_storage_stats() {
1083        let (protocol, _temp) = create_test_protocol().await;
1084        let stats = protocol.storage_stats();
1085        assert_eq!(stats.chunks_stored, 0);
1086    }
1087
1088    #[tokio::test]
1089    async fn test_merkle_candidate_quote_request() {
1090        use ant_protocol::payment::verify::verify_merkle_candidate_signature;
1091        use evmlib::merkle_payments::MerklePaymentCandidateNode;
1092
1093        // create_test_protocol already wires ML-DSA-65 signing
1094        let (protocol, _temp) = create_test_protocol().await;
1095
1096        let address = [0x77; 32];
1097        let timestamp = std::time::SystemTime::now()
1098            .duration_since(std::time::UNIX_EPOCH)
1099            .expect("system time")
1100            .as_secs();
1101
1102        let request = MerkleCandidateQuoteRequest {
1103            address,
1104            data_type: DATA_TYPE_CHUNK,
1105            data_size: 4096,
1106            merkle_payment_timestamp: timestamp,
1107        };
1108        let msg = ChunkMessage {
1109            request_id: 600,
1110            body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
1111        };
1112        let msg_bytes = msg.encode().expect("encode request");
1113
1114        let response_bytes = protocol
1115            .try_handle_request(&msg_bytes)
1116            .await
1117            .expect("handle merkle candidate quote")
1118            .expect("expected response");
1119        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
1120
1121        assert_eq!(response.request_id, 600);
1122        match response.body {
1123            ChunkMessageBody::MerkleCandidateQuoteResponse(
1124                MerkleCandidateQuoteResponse::Success { candidate_node },
1125            ) => {
1126                let candidate: MerklePaymentCandidateNode =
1127                    rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
1128
1129                // Verify ML-DSA-65 signature
1130                assert!(
1131                    verify_merkle_candidate_signature(&candidate),
1132                    "ML-DSA-65 candidate signature must be valid"
1133                );
1134
1135                assert_eq!(candidate.merkle_payment_timestamp, timestamp);
1136                // Node-calculated price based on records stored
1137                assert!(candidate.price >= evmlib::common::Amount::ZERO);
1138            }
1139            other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
1140        }
1141    }
1142
1143    #[tokio::test]
1144    async fn test_handle_unexpected_response_message() {
1145        let (protocol, _temp) = create_test_protocol().await;
1146
1147        // Send a PutResponse as if it were a request — should return None
1148        let msg = ChunkMessage {
1149            request_id: 200,
1150            body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
1151        };
1152        let msg_bytes = msg.encode().expect("encode");
1153
1154        let result = protocol
1155            .try_handle_request(&msg_bytes)
1156            .await
1157            .expect("handle msg");
1158
1159        assert!(
1160            result.is_none(),
1161            "expected None for response message, got: {result:?}"
1162        );
1163    }
1164
1165    #[tokio::test]
1166    async fn test_quote_already_stored_flag() {
1167        let (protocol, _temp) = create_test_protocol().await;
1168
1169        let content = b"already stored quote test";
1170        let address = LmdbStorage::compute_address(content);
1171
1172        // Store the chunk first
1173        protocol.payment_verifier().cache_insert(address);
1174        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
1175        let put_msg = ChunkMessage {
1176            request_id: 300,
1177            body: ChunkMessageBody::PutRequest(put_request),
1178        };
1179        let put_bytes = put_msg.encode().expect("encode put");
1180        let _ = protocol
1181            .try_handle_request(&put_bytes)
1182            .await
1183            .expect("handle put");
1184
1185        // Now request a quote for the same address — already_stored should be true
1186        let quote_request = ChunkQuoteRequest {
1187            address,
1188            data_size: content.len() as u64,
1189            data_type: DATA_TYPE_CHUNK,
1190        };
1191        let quote_msg = ChunkMessage {
1192            request_id: 301,
1193            body: ChunkMessageBody::QuoteRequest(quote_request),
1194        };
1195        let quote_bytes = quote_msg.encode().expect("encode quote");
1196        let response_bytes = protocol
1197            .try_handle_request(&quote_bytes)
1198            .await
1199            .expect("handle quote")
1200            .expect("expected response");
1201        let response = ChunkMessage::decode(&response_bytes).expect("decode");
1202
1203        match response.body {
1204            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1205                already_stored, ..
1206            }) => {
1207                assert!(
1208                    already_stored,
1209                    "already_stored should be true for existing chunk"
1210                );
1211            }
1212            other => panic!("expected Success with already_stored, got: {other:?}"),
1213        }
1214
1215        // Request a quote for a chunk that does NOT exist — already_stored should be false
1216        let new_address = [0xFFu8; 32];
1217        let quote_request2 = ChunkQuoteRequest {
1218            address: new_address,
1219            data_size: 100,
1220            data_type: DATA_TYPE_CHUNK,
1221        };
1222        let quote_msg2 = ChunkMessage {
1223            request_id: 302,
1224            body: ChunkMessageBody::QuoteRequest(quote_request2),
1225        };
1226        let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
1227        let response_bytes2 = protocol
1228            .try_handle_request(&quote_bytes2)
1229            .await
1230            .expect("handle quote2")
1231            .expect("expected response");
1232        let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
1233
1234        match response2.body {
1235            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1236                already_stored, ..
1237            }) => {
1238                assert!(
1239                    !already_stored,
1240                    "already_stored should be false for new chunk"
1241                );
1242            }
1243            other => panic!("expected Success with already_stored=false, got: {other:?}"),
1244        }
1245    }
1246
1247    /// Drive the real quote handler, then read the record count it priced on.
1248    /// The handler calls `resync_quote_metric` first, so this reflects records
1249    /// ACTUALLY HELD.
1250    fn priced_records_after_quote(protocol: &AntProtocol) -> usize {
1251        let quote_request = ChunkQuoteRequest {
1252            address: [0xAAu8; 32], // a quote-only probe, not one of the stored chunks
1253            data_size: 100,
1254            data_type: DATA_TYPE_CHUNK,
1255        };
1256        let _ = protocol.handle_quote(&quote_request);
1257        protocol.priced_records_stored()
1258    }
1259
1260    /// The quote price must track records ACTUALLY HELD: deleting stored chunks
1261    /// must lower the priced record count, not keep quoting as if the data were
1262    /// still held. Exercises the storage-driven resync in `resync_quote_metric`.
1263    #[tokio::test]
1264    async fn test_quote_metric_reflects_deletions() {
1265        let (protocol, _temp) = create_test_protocol().await;
1266
1267        // Distinct content -> distinct content-addressed keys.
1268        let contents: Vec<Vec<u8>> = (0u8..5).map(|i| vec![i; 64]).collect();
1269        let mut addresses = Vec::new();
1270        for content in &contents {
1271            let addr = LmdbStorage::compute_address(content);
1272            protocol.put_local(&addr, content).await.expect("put_local");
1273            addresses.push(addr);
1274        }
1275
1276        // 5 records held -> priced count 5.
1277        assert_eq!(priced_records_after_quote(&protocol), 5);
1278
1279        // Delete 2 chunks the node was holding.
1280        for addr in addresses.iter().take(2) {
1281            assert!(protocol.storage().delete(addr).await.expect("delete"));
1282        }
1283        assert_eq!(priced_records_after_quote(&protocol), 3);
1284
1285        // Delete the rest; priced count floors at 0, never underflows.
1286        for addr in addresses.iter().skip(2) {
1287            assert!(protocol.storage().delete(addr).await.expect("delete"));
1288        }
1289        assert_eq!(priced_records_after_quote(&protocol), 0);
1290    }
1291
1292    /// Stronger, externally-observable proof: the actual quote PRICE returned
1293    /// to a client must drop after the node deletes data it held. A monotonic
1294    /// store counter would keep the price elevated; the resync ties price to
1295    /// records actually held.
1296    /// FLIPS IF: `resync_quote_metric` is removed — the price would stay at the
1297    /// 10-record level even after deletions (`record_store` only ever increments).
1298    #[tokio::test]
1299    async fn test_quote_price_drops_after_deletion() {
1300        use crate::payment::pricing::calculate_price;
1301
1302        let (protocol, _temp) = create_test_protocol().await;
1303        let contents: Vec<Vec<u8>> = (0u8..10).map(|i| vec![i; 64]).collect();
1304        let mut addresses = Vec::new();
1305        for content in &contents {
1306            let addr = LmdbStorage::compute_address(content);
1307            protocol.put_local(&addr, content).await.expect("put_local");
1308            addresses.push(addr);
1309        }
1310
1311        // Drive a real quote; the priced count must equal records held (10),
1312        // and the price must equal calculate_price(10) — the externally
1313        // observable contract.
1314        assert_eq!(priced_records_after_quote(&protocol), 10);
1315        let price_full = calculate_price(10);
1316
1317        // Delete 8 of 10 held chunks.
1318        for addr in addresses.iter().take(8) {
1319            assert!(protocol.storage().delete(addr).await.expect("delete"));
1320        }
1321        // The next quote must price on 2 records, and the price must be the
1322        // calculate_price(2) value — strictly different from the 10-record
1323        // price (price is monotonic non-decreasing in records_stored).
1324        assert_eq!(priced_records_after_quote(&protocol), 2);
1325        let price_after = calculate_price(2);
1326        assert!(
1327            price_after < price_full,
1328            "deleting data must lower the observable quote price \
1329             (full={price_full:?}, after={price_after:?})"
1330        );
1331    }
1332}