Skip to main content

ant_node/storage/
handler.rs

1//! ANT protocol handler for autonomi protocol messages.
2//!
3//! This handler processes chunk PUT/GET requests with optional payment verification,
4//! storing chunks to LMDB and using the DHT for network-wide retrieval.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────┐
10//! │                    AntProtocol                        │
11//! ├─────────────────────────────────────────────────────────┤
12//! │  protocol_id() = "autonomi.ant.chunk.v1"                  │
13//! │                                                         │
14//! │  try_handle_request(data) ──▶ decode ChunkMessage  │
15//! │                                   │                     │
16//! │         ┌─────────────────────────┼─────────────────┐  │
17//! │         ▼                         ▼                 ▼  │
18//! │   ChunkQuoteRequest           ChunkPutRequest    ChunkGetRequest
19//! │         │                         │                 │  │
20//! │         ▼                         ▼                 ▼  │
21//! │   QuoteGenerator          PaymentVerifier    LmdbStorage│
22//! │         │                         │                 │  │
23//! │         └─────────────────────────┴─────────────────┘  │
24//! │                           │                             │
25//! │           return Ok(Some(response_bytes))              │
26//! │           return Ok(None) for response messages       │
27//! └─────────────────────────────────────────────────────────┘
28//! ```
29
30#[cfg(test)]
31use crate::ant_protocol::DATA_TYPE_CHUNK;
32use crate::ant_protocol::{
33    ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
34    ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
35    MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE,
36};
37use crate::client::compute_address;
38use crate::error::{Error, Result};
39use crate::logging::{debug, info, warn};
40use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext};
41use crate::replication::fresh::FreshWriteEvent;
42use crate::storage::lmdb::LmdbStorage;
43use bytes::Bytes;
44use saorsa_core::P2PNode;
45use std::sync::Arc;
46use tokio::sync::mpsc;
47
/// ANT protocol handler.
///
/// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence
/// and optional payment verification.
///
/// The three `Arc` fields are shared handles supplied by the caller of
/// [`AntProtocol::new`]; the handler does not create them itself.
pub struct AntProtocol {
    /// LMDB storage for chunk persistence.
    storage: Arc<LmdbStorage>,
    /// Payment verifier for checking payments.
    payment_verifier: Arc<PaymentVerifier>,
    /// Quote generator for creating storage quotes.
    /// Also handles merkle candidate quote signing via ML-DSA-65.
    quote_generator: Arc<QuoteGenerator>,
    /// Channel for notifying the replication engine about newly-stored chunks.
    ///
    /// `None` after construction; populated by
    /// [`AntProtocol::set_fresh_write_sender`]. While `None`, successful PUTs
    /// emit no fresh-write events.
    fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
}
63
64impl AntProtocol {
65    /// Create a new ANT protocol handler.
66    ///
67    /// # Arguments
68    ///
69    /// * `storage` - LMDB storage for chunk persistence
70    /// * `payment_verifier` - Payment verifier for validating payments
71    /// * `quote_generator` - Quote generator for creating storage quotes
72    #[must_use]
73    pub fn new(
74        storage: Arc<LmdbStorage>,
75        payment_verifier: Arc<PaymentVerifier>,
76        quote_generator: Arc<QuoteGenerator>,
77    ) -> Self {
78        // Keep the PaymentVerifier's paid-quote price floor and the
79        // QuoteGenerator's pricing wired to the same authoritative store used
80        // by this protocol handler. Both must read the same record count: the
81        // generator prices quotes from current_chunks(), and the verifier later
82        // checks the paid median quote against current_chunks(). Attaching both
83        // here makes the invariant automatic for every AntProtocol
84        // construction path, including tests and future startup variants.
85        payment_verifier.attach_storage(Arc::clone(&storage));
86        quote_generator.attach_storage(Arc::clone(&storage));
87
88        Self {
89            storage,
90            payment_verifier,
91            quote_generator,
92            fresh_write_tx: None,
93        }
94    }
95
    /// Attach the node's P2P handle for payment live-DHT checks.
    ///
    /// Wires the handle into the payment verifier so payment-proof closeness
    /// checks can use the live routing view. Idempotent: calling twice
    /// replaces the verifier handle.
    ///
    /// The handler itself keeps no copy of `node`; it is forwarded to the
    /// payment verifier and a debug line is logged.
    pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
        self.payment_verifier.attach_p2p_node(node);
        debug!("AntProtocol: P2PNode attached for payment live-DHT checks");
    }
105
    /// Set the channel sender for fresh-write replication events.
    ///
    /// When set, successful chunk PUTs will notify the replication engine
    /// so it can fan out fresh offers to the close group.
    ///
    /// Only PUTs that carried a payment proof produce an event; see the
    /// store step of `handle_put_inner`. Calling this again replaces any
    /// previously configured sender.
    pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
        self.fresh_write_tx = Some(tx);
    }
113
    /// Get the protocol identifier.
    ///
    /// Always returns the crate-level `CHUNK_PROTOCOL_ID` constant; the value
    /// does not depend on handler state.
    #[must_use]
    pub fn protocol_id(&self) -> &'static str {
        CHUNK_PROTOCOL_ID
    }
119
    /// Get a shared handle to the underlying LMDB storage.
    ///
    /// Returns a clone of the internal `Arc` (a reference-count bump, not a
    /// copy of the store), so the caller owns its handle independently of
    /// this `AntProtocol`.
    #[must_use]
    pub fn storage(&self) -> Arc<LmdbStorage> {
        Arc::clone(&self.storage)
    }
125
    /// Test-only: the record count the quote generator currently prices on.
    /// Used to assert that quote-time resync tracks records actually held.
    ///
    /// Reads `QuoteGenerator::records_stored()` without triggering a resync.
    #[cfg(test)]
    #[must_use]
    pub(crate) fn priced_records_stored(&self) -> usize {
        self.quote_generator.records_stored()
    }
133
    /// Get a shared handle to the payment verifier.
    ///
    /// Returns a clone of the internal `Arc`, so the caller can keep the
    /// verifier alive and use it independently of this handler.
    #[must_use]
    pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
        Arc::clone(&self.payment_verifier)
    }
139
140    /// Handle an incoming request and produce a response.
141    ///
142    /// Decodes the raw message, processes it if it is a request variant,
143    /// and returns the encoded response bytes.  Returns `Ok(None)` for
144    /// response messages (which are meant for client subscribers, not for
145    /// the protocol handler).
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if message decoding, handling, or encoding fails.
150    pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
151        let message = ChunkMessage::decode(data)
152            .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
153
154        let request_id = message.request_id;
155
156        let response_body = match message.body {
157            ChunkMessageBody::PutRequest(req) => {
158                ChunkMessageBody::PutResponse(self.handle_put(req).await)
159            }
160            ChunkMessageBody::GetRequest(req) => {
161                ChunkMessageBody::GetResponse(self.handle_get(req).await)
162            }
163            ChunkMessageBody::QuoteRequest(ref req) => {
164                ChunkMessageBody::QuoteResponse(self.handle_quote(req))
165            }
166            ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
167                ChunkMessageBody::MerkleCandidateQuoteResponse(
168                    self.handle_merkle_candidate_quote(req),
169                )
170            }
171            // Anything else — response messages are handled by client
172            // subscribers (e.g. send_and_await_chunk_response), not by the
173            // protocol handler. Returning None prevents the caller from
174            // sending a reply, which would create an infinite ping-pong
175            // loop.
176            //
177            // `ChunkMessageBody` is `#[non_exhaustive]` in ant-protocol, so
178            // a future wire variant added on a protocol minor bump also
179            // lands here and is dropped. The CHUNK_PROTOCOL_ID multistream-
180            // select handshake version-gates peers, so this arm should
181            // only be reached by a misconfigured peer.
182            _ => return Ok(None),
183        };
184
185        let response = ChunkMessage {
186            request_id,
187            body: response_body,
188        };
189
190        response
191            .encode()
192            .map(|b| Some(Bytes::from(b)))
193            .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
194    }
195
196    /// Handle a PUT request.
197    ///
198    /// Wraps `handle_put_inner` to emit a single structured tracing event per
199    /// PUT RPC at every exit path, including early-return validation paths.
200    /// The event uses `target: "ant_node::storage::rpc_latency"` so that
201    /// Elasticsearch / Kibana can build p50/p95/p99 store-RPC latency
202    /// histograms from the existing telegraf log forwarding.
203    async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
204        let start = std::time::Instant::now();
205        let addr_hex = hex::encode(request.address);
206        let chunk_size = request.content.len();
207        let response = self.handle_put_inner(request).await;
208        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
209        let outcome: &'static str = match &response {
210            ChunkPutResponse::Success { .. } => "success",
211            ChunkPutResponse::AlreadyExists { .. } => "already_exists",
212            ChunkPutResponse::PaymentRequired { .. } => "payment_required",
213            ChunkPutResponse::Error(_) => "error",
214            _ => "unknown",
215        };
216        info!(
217            target: "ant_node::storage::rpc_latency",
218            duration_ms,
219            chunk_size,
220            outcome,
221            addr = %addr_hex,
222            "put_rpc"
223        );
224        response
225    }
226
227    /// Inner body of `handle_put` — see the wrapper for the per-RPC latency log.
228    async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
229        let address = request.address;
230        let addr_hex = hex::encode(address);
231        debug!("Handling PUT request for {addr_hex}");
232
233        // 1. Validate chunk size
234        if request.content.len() > MAX_CHUNK_SIZE {
235            return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
236                size: request.content.len(),
237                max_size: MAX_CHUNK_SIZE,
238            });
239        }
240
241        // 2. Verify content address matches BLAKE3(content)
242        let computed = compute_address(&request.content);
243        if computed != address {
244            return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
245                expected: address,
246                actual: computed,
247            });
248        }
249
250        // 3. Check if already exists (idempotent success)
251        match self.storage.exists(&address) {
252            Ok(true) => {
253                debug!("Chunk {addr_hex} already exists");
254                return ChunkPutResponse::AlreadyExists { address };
255            }
256            Err(e) => {
257                return ChunkPutResponse::Error(ProtocolError::Internal(format!(
258                    "Storage read failed: {e}"
259                )));
260            }
261            Ok(false) => {}
262        }
263
264        // 4. Cheap disk-space pre-check — runs BEFORE the expensive payment
265        //    verification path (ML-DSA pool checks, a Kademlia closeness
266        //    lookup, and an on-chain Arbitrum RPC). A disk-full node can never
267        //    satisfy this PUT, so reject it here rather than burning that work
268        //    only to fail the reserve check inside `storage.put` (V2-411). The
269        //    check caches passing results, so it is free per-PUT on a healthy
270        //    node; a disk-full node re-runs a cheap `available_space` syscall
271        //    each PUT (still negligible next to the verification it avoids) and
272        //    so detects freed space promptly. The store path keeps its own
273        //    check as defence-in-depth.
274        if let Err(e) = self.storage.check_capacity() {
275            info!(
276                target: "ant_node::storage::disk_precheck",
277                addr = %addr_hex,
278                "Rejecting PUT before payment verification: {e}"
279            );
280            return ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()));
281        }
282
283        // 5. Verify payment. The ClientPut context applies the store-strength
284        // payment cache and verifies live proofs. Direct client PUT does not
285        // reject based on this node's local storage-responsibility view.
286        let payment_result = self
287            .payment_verifier
288            .verify_payment(
289                &address,
290                request.payment_proof.as_deref(),
291                VerificationContext::ClientPut,
292            )
293            .await;
294
295        match payment_result {
296            Ok(status) if status.can_store() => {
297                // Payment verified or cached
298            }
299            Ok(_) => {
300                return ChunkPutResponse::PaymentRequired {
301                    message: "Payment required for new chunk".to_string(),
302                };
303            }
304            Err(e) => {
305                return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
306            }
307        }
308
309        // 6. Store chunk
310        match self.storage.put(&address, &request.content).await {
311            Ok(_) => {
312                let content_len = request.content.len();
313                info!("Stored chunk {addr_hex} ({content_len} bytes)");
314                // Bump the in-memory fallback counter. Both pricing and the
315                // paid-quote floor now read LmdbStorage::current_chunks() directly,
316                // so this counter only matters when no storage is attached
317                // (unit tests / mis-configured startup). Kept warm so that
318                // fallback path stays roughly accurate.
319                self.quote_generator.record_store();
320
321                // 7. Notify replication engine for fresh fan-out.
322                //    Only emit when a real proof is present — cached-as-verified
323                //    PUTs have no proof to forward, and the chunk would have
324                //    already replicated on the original write that carried one.
325                if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
326                    // `request.content` is now `bytes::Bytes`; FreshWriteEvent
327                    // still carries the chunk as `Vec<u8>` for compatibility
328                    // with the replication wire format, so materialise once
329                    // here. Done only on the success path, where storage has
330                    // already accepted the chunk.
331                    let event = FreshWriteEvent {
332                        key: address,
333                        data: request.content.to_vec(),
334                        payment_proof: proof,
335                    };
336                    if tx.send(event).is_err() {
337                        debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
338                    }
339                }
340
341                ChunkPutResponse::Success { address }
342            }
343            Err(e) => {
344                warn!("Failed to store chunk {addr_hex}: {e}");
345                ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
346            }
347        }
348    }
349
350    /// Handle a GET request.
351    ///
352    /// Wraps `handle_get_inner` to emit a single structured tracing event per
353    /// GET RPC at every exit path. See `handle_put` for the rationale.
354    async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
355        let start = std::time::Instant::now();
356        let addr_hex = hex::encode(request.address);
357        let response = self.handle_get_inner(request).await;
358        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
359        let outcome: &'static str = match &response {
360            ChunkGetResponse::Success { .. } => "success",
361            ChunkGetResponse::NotFound { .. } => "not_found",
362            ChunkGetResponse::Error(_) => "error",
363            _ => "unknown",
364        };
365        info!(
366            target: "ant_node::storage::rpc_latency",
367            duration_ms,
368            outcome,
369            addr = %addr_hex,
370            "get_rpc"
371        );
372        response
373    }
374
375    /// Inner body of `handle_get` — see the wrapper for the per-RPC latency log.
376    async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
377        let address = request.address;
378        let addr_hex = hex::encode(address);
379        debug!("Handling GET request for {addr_hex}");
380
381        match self.storage.get(&address).await {
382            Ok(Some(content)) => {
383                let content_len = content.len();
384                debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
385                ChunkGetResponse::Success { address, content }
386            }
387            Ok(None) => {
388                debug!("Chunk {addr_hex} not found");
389                ChunkGetResponse::NotFound { address }
390            }
391            Err(e) => {
392                warn!("Failed to retrieve chunk {addr_hex}: {e}");
393                ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
394            }
395        }
396    }
397
398    /// Resync the quoting metric to the authoritative count of records the node
399    /// actually holds.
400    ///
401    /// The quote price is driven by `QuoteGenerator::records_stored()`. Reading
402    /// the live LMDB entry count (an O(1) B-tree page-header read) right before
403    /// pricing makes the metric deletion-aware: any chunk removed by
404    /// [`LmdbStorage::delete`] or by the replication prune pass is reflected
405    /// immediately, with no risk of missing a delete path.
406    ///
407    /// On a storage read error — or a count that does not fit `usize` — the
408    /// previous metric value is left untouched so a transient LMDB error never
409    /// disrupts quote generation.
410    fn resync_quote_metric(&self) {
411        match self.storage.current_chunks() {
412            // Saturating an overflowing count to usize::MAX would jump the
413            // metric to the maximum possible price driver; keep the previous
414            // value instead, as for a read error.
415            Ok(count) => usize::try_from(count).map_or_else(
416                |_| {
417                    warn!(
418                        "current_chunks() count {count} overflows usize; keeping previous quote \
419                         metric"
420                    );
421                },
422                |records| self.quote_generator.resync_records(records),
423            ),
424            Err(e) => {
425                warn!("Failed to read current_chunks() for quote metric resync: {e}");
426            }
427        }
428    }
429
430    /// Handle a quote request.
431    fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
432        let addr_hex = hex::encode(request.address);
433        let data_size = request.data_size;
434        debug!("Handling quote request for {addr_hex} (size: {data_size})");
435
436        // Price on records ACTUALLY HELD, not a monotonic store counter.
437        self.resync_quote_metric();
438
439        // Check if the chunk is already stored so we can tell the client
440        // to skip payment (already_stored = true).
441        // The match intentionally logs the error when the `logging` feature is
442        // active. Clippy suggests `unwrap_or_default()` when logging is compiled
443        // out, but keeping the explicit match preserves the diagnostic intent.
444        #[allow(clippy::manual_unwrap_or_default)]
445        let already_stored = match self.storage.exists(&request.address) {
446            Ok(exists) => exists,
447            Err(e) => {
448                warn!("Storage check failed for {addr_hex}: {e}");
449                false // Assume not stored on error — generate a normal quote.
450            }
451        };
452
453        if already_stored {
454            debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
455        }
456
457        // Validate data size - data_size is u64, cast carefully and reject overflow
458        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
459            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
460                size: MAX_CHUNK_SIZE + 1,
461                max_size: MAX_CHUNK_SIZE,
462            });
463        };
464        if data_size_usize > MAX_CHUNK_SIZE {
465            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
466                size: data_size_usize,
467                max_size: MAX_CHUNK_SIZE,
468            });
469        }
470
471        match self
472            .quote_generator
473            .create_quote(request.address, data_size_usize, request.data_type)
474        {
475            Ok(quote) => {
476                // Serialize the quote
477                match rmp_serde::to_vec(&quote) {
478                    Ok(quote_bytes) => ChunkQuoteResponse::Success {
479                        quote: quote_bytes,
480                        already_stored,
481                    },
482                    Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
483                        "Failed to serialize quote: {e}"
484                    ))),
485                }
486            }
487            Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
488        }
489    }
490
491    /// Handle a merkle candidate quote request.
492    fn handle_merkle_candidate_quote(
493        &self,
494        request: &MerkleCandidateQuoteRequest,
495    ) -> MerkleCandidateQuoteResponse {
496        let addr_hex = hex::encode(request.address);
497        let data_size = request.data_size;
498        debug!(
499            "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
500            request.merkle_payment_timestamp
501        );
502
503        // Price on records ACTUALLY HELD, not a monotonic store counter.
504        self.resync_quote_metric();
505
506        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
507            return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
508                "data_size {} overflows usize",
509                request.data_size
510            )));
511        };
512        if data_size_usize > MAX_CHUNK_SIZE {
513            return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
514                size: data_size_usize,
515                max_size: MAX_CHUNK_SIZE,
516            });
517        }
518
519        match self.quote_generator.create_merkle_candidate_quote(
520            data_size_usize,
521            request.data_type,
522            request.merkle_payment_timestamp,
523        ) {
524            Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
525                Ok(bytes) => MerkleCandidateQuoteResponse::Success {
526                    candidate_node: bytes,
527                },
528                Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
529                    "Failed to serialize merkle candidate node: {e}"
530                ))),
531            },
532            Err(e) => {
533                MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
534            }
535        }
536    }
537
    /// Get storage statistics.
    ///
    /// Thin pass-through to `LmdbStorage::stats()` on the handler's store.
    #[must_use]
    pub fn storage_stats(&self) -> crate::storage::StorageStats {
        self.storage.stats()
    }
543
    /// Get payment cache statistics.
    ///
    /// Thin pass-through to `PaymentVerifier::cache_stats()` on the handler's
    /// verifier.
    #[must_use]
    pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
        self.payment_verifier.cache_stats()
    }
549
    /// Get a reference to the payment verifier.
    ///
    /// Exposed for **test harnesses only** — production code should not call
    /// this directly. Use `cache_insert()` on the returned verifier to
    /// pre-populate the payment cache in test setups.
    ///
    /// Compiled only under `cfg(test)` or the `test-utils` feature. The
    /// returned borrow is tied to `&self`; use `payment_verifier_arc` when an
    /// owned handle is needed.
    #[cfg(any(test, feature = "test-utils"))]
    #[must_use]
    pub fn payment_verifier(&self) -> &PaymentVerifier {
        &self.payment_verifier
    }
560
    /// Check if a chunk exists locally.
    ///
    /// Queries only this node's LMDB storage; no payment or network lookup is
    /// involved.
    ///
    /// # Errors
    ///
    /// Returns an error if the storage read fails.
    pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
        self.storage.exists(address)
    }
569
    /// Get a chunk directly from local storage.
    ///
    /// Bypasses the wire protocol and the per-RPC latency logging done by
    /// `handle_get`. `Ok(None)` means the address is not present in local
    /// storage (the GET handler maps that case to `NotFound`).
    ///
    /// # Errors
    ///
    /// Returns an error if storage access fails.
    pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        self.storage.get(address).await
    }
578
    /// Store a chunk directly to local storage (bypasses payment verification).
    ///
    /// TEST ONLY - This method bypasses payment verification and should only be used in tests.
    ///
    /// Unlike `handle_put_inner`, this performs none of the handler-level
    /// checks (size limit, address check, capacity pre-check), does not bump
    /// the quote generator's counter, and emits no fresh-write event; it
    /// calls `LmdbStorage::put` and nothing else.
    ///
    /// NOTE(review): the meaning of the returned `bool` is defined by
    /// `LmdbStorage::put` and is not visible here — confirm against the
    /// storage module.
    ///
    /// # Errors
    ///
    /// Returns an error if storage fails or content doesn't match address.
    #[cfg(test)]
    pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
        self.storage.put(address, content).await
    }
590}
591
592#[cfg(test)]
593#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
594mod tests {
595    use super::*;
596    use crate::payment::metrics::QuotingMetricsTracker;
597    use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
598    use crate::storage::LmdbStorageConfig;
599    use evmlib::RewardsAddress;
600    use saorsa_core::identity::NodeIdentity;
601    use saorsa_core::MlDsa65;
602    use saorsa_pqc::pqc::types::MlDsaSecretKey;
603    use tempfile::TempDir;
604
    /// Build a test protocol with no disk reserve.
    ///
    /// Returns the handler together with the `TempDir` backing its storage;
    /// callers keep the `TempDir` bound so the directory outlives the test.
    async fn create_test_protocol() -> (AntProtocol, TempDir) {
        // A reserve of 0 is passed explicitly here (the same value
        // `test_default()` is documented to use), so the disk pre-check is
        // expected to pass for the regular tests.
        create_test_protocol_with_reserve(0).await
    }
610
    /// Build a test protocol whose storage enforces the given disk reserve.
    ///
    /// A very large reserve (e.g. `u64::MAX`) makes `available < reserve`
    /// always true, so the disk-space pre-check in `handle_put_inner` fails —
    /// used to exercise the V2-411 early-return path.
    ///
    /// Returns the handler together with the `TempDir` that holds its LMDB
    /// files; the caller must keep the `TempDir` alive for the test's
    /// duration.
    async fn create_test_protocol_with_reserve(disk_reserve: u64) -> (AntProtocol, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");

        // Start from the test defaults and override only the storage location
        // and the reserve under test.
        let storage_config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            disk_reserve,
            ..LmdbStorageConfig::test_default()
        };
        let storage = Arc::new(
            LmdbStorage::new(storage_config)
                .await
                .expect("create storage"),
        );

        // Fixed dummy rewards address, shared by the verifier config and the
        // quote generator.
        let rewards_address = RewardsAddress::new([1u8; 20]);
        let payment_config = PaymentVerifierConfig {
            evm: EvmVerifierConfig::default(),
            cache_capacity: 100_000,
            close_group_size: crate::ant_protocol::CLOSE_GROUP_SIZE,
            local_rewards_address: rewards_address,
        };
        let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
        let metrics_tracker = QuotingMetricsTracker::new(100);
        // Mutable so the signer can be installed before the generator is
        // wrapped in an `Arc`.
        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);

        // Wire ML-DSA-65 signing so quote requests succeed
        let identity = NodeIdentity::generate().expect("generate identity");
        let pub_key_bytes = identity.public_key().as_bytes().to_vec();
        let sk_bytes = identity.secret_key_bytes().to_vec();
        let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
        quote_generator.set_signer(pub_key_bytes, move |msg| {
            use saorsa_pqc::pqc::MlDsaOperations;
            let ml_dsa = MlDsa65::new();
            // A signing failure yields an empty signature rather than a panic.
            ml_dsa
                .sign(&sk, msg)
                .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
        });

        // `AntProtocol::new` attaches `storage` to both the verifier and the
        // generator.
        let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
        (protocol, temp_dir)
    }
657
658    #[tokio::test]
659    async fn test_put_and_get_chunk() {
660        let (protocol, _temp) = create_test_protocol().await;
661
662        let content = b"hello world";
663        let address = LmdbStorage::compute_address(content);
664
665        // Pre-populate payment cache so EVM verification is bypassed
666        protocol.payment_verifier().cache_insert(address);
667
668        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
669        let put_msg = ChunkMessage {
670            request_id: 1,
671            body: ChunkMessageBody::PutRequest(put_request),
672        };
673        let put_bytes = put_msg.encode().expect("encode put");
674
675        // Handle PUT
676        let response_bytes = protocol
677            .try_handle_request(&put_bytes)
678            .await
679            .expect("handle put")
680            .expect("expected response");
681        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
682
683        assert_eq!(response.request_id, 1);
684        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
685            response.body
686        {
687            assert_eq!(addr, address);
688        } else {
689            panic!("expected PutResponse::Success, got: {response:?}");
690        }
691
692        // Create GET request
693        let get_request = ChunkGetRequest::new(address);
694        let get_msg = ChunkMessage {
695            request_id: 2,
696            body: ChunkMessageBody::GetRequest(get_request),
697        };
698        let get_bytes = get_msg.encode().expect("encode get");
699
700        // Handle GET
701        let response_bytes = protocol
702            .try_handle_request(&get_bytes)
703            .await
704            .expect("handle get")
705            .expect("expected response");
706        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
707
708        assert_eq!(response.request_id, 2);
709        if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
710            address: addr,
711            content: data,
712        }) = response.body
713        {
714            assert_eq!(addr, address);
715            assert_eq!(data, content.to_vec());
716        } else {
717            panic!("expected GetResponse::Success");
718        }
719    }
720
721    #[tokio::test]
722    async fn test_get_not_found() {
723        let (protocol, _temp) = create_test_protocol().await;
724
725        let address = [0xAB; 32];
726        let get_request = ChunkGetRequest::new(address);
727        let get_msg = ChunkMessage {
728            request_id: 10,
729            body: ChunkMessageBody::GetRequest(get_request),
730        };
731        let get_bytes = get_msg.encode().expect("encode get");
732
733        let response_bytes = protocol
734            .try_handle_request(&get_bytes)
735            .await
736            .expect("handle get")
737            .expect("expected response");
738        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
739
740        assert_eq!(response.request_id, 10);
741        if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
742            response.body
743        {
744            assert_eq!(addr, address);
745        } else {
746            panic!("expected GetResponse::NotFound");
747        }
748    }
749
750    #[tokio::test]
751    async fn test_put_address_mismatch() {
752        let (protocol, _temp) = create_test_protocol().await;
753
754        let content = b"test content";
755        let wrong_address = [0xFF; 32]; // Wrong address
756
757        // Pre-populate cache for the wrong address so we test address mismatch, not payment
758        protocol.payment_verifier().cache_insert(wrong_address);
759
760        let put_request = ChunkPutRequest::new(wrong_address, Bytes::copy_from_slice(content));
761        let put_msg = ChunkMessage {
762            request_id: 20,
763            body: ChunkMessageBody::PutRequest(put_request),
764        };
765        let put_bytes = put_msg.encode().expect("encode put");
766
767        let response_bytes = protocol
768            .try_handle_request(&put_bytes)
769            .await
770            .expect("handle put")
771            .expect("expected response");
772        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
773
774        assert_eq!(response.request_id, 20);
775        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
776            ProtocolError::AddressMismatch { .. },
777        )) = response.body
778        {
779            // Expected
780        } else {
781            panic!("expected AddressMismatch error, got: {response:?}");
782        }
783    }
784
785    #[tokio::test]
786    async fn test_put_chunk_too_large() {
787        let (protocol, _temp) = create_test_protocol().await;
788
789        // Create oversized content
790        let content = vec![0u8; MAX_CHUNK_SIZE + 1];
791        let address = LmdbStorage::compute_address(&content);
792
793        let put_request = ChunkPutRequest::new(address, Bytes::from(content));
794        let put_msg = ChunkMessage {
795            request_id: 30,
796            body: ChunkMessageBody::PutRequest(put_request),
797        };
798        let put_bytes = put_msg.encode().expect("encode put");
799
800        let response_bytes = protocol
801            .try_handle_request(&put_bytes)
802            .await
803            .expect("handle put")
804            .expect("expected response");
805        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
806
807        assert_eq!(response.request_id, 30);
808        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
809            ProtocolError::ChunkTooLarge { .. },
810        )) = response.body
811        {
812            // Expected
813        } else {
814            panic!("expected ChunkTooLarge error");
815        }
816    }
817
818    /// V2-411: a disk-full node must reject a PUT with the disk-space error
819    /// *before* running payment verification.
820    ///
821    /// The chunk is intentionally **not** cache-inserted, so if the handler
822    /// reached `verify_payment` it would return `PaymentRequired`/`PaymentFailed`
823    /// (an uncached chunk with no proof). Observing the `StorageFailed` disk
824    /// error instead proves the disk pre-check short-circuited ahead of
825    /// verification — there is no on-chain path to reach.
826    #[tokio::test]
827    async fn test_put_rejected_on_insufficient_disk_before_verification() {
828        // u64::MAX reserve guarantees `available < reserve`, so the cached
829        // disk-space check always fails.
830        let (protocol, _temp) = create_test_protocol_with_reserve(u64::MAX).await;
831
832        let content = b"chunk for a disk-full node";
833        let address = LmdbStorage::compute_address(content);
834
835        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
836        let put_msg = ChunkMessage {
837            request_id: 41,
838            body: ChunkMessageBody::PutRequest(put_request),
839        };
840        let put_bytes = put_msg.encode().expect("encode put");
841
842        let response_bytes = protocol
843            .try_handle_request(&put_bytes)
844            .await
845            .expect("handle put")
846            .expect("expected response");
847        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
848
849        assert_eq!(response.request_id, 41);
850        match response.body {
851            ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
852                ProtocolError::StorageFailed(msg),
853            )) => {
854                assert!(
855                    msg.contains("Insufficient disk space"),
856                    "expected disk-space error, got: {msg}"
857                );
858            }
859            other => {
860                panic!("expected StorageFailed disk error before verification, got: {other:?}")
861            }
862        }
863
864        // And nothing was stored.
865        assert!(!protocol.exists(&address).expect("exists check"));
866    }
867
868    #[tokio::test]
869    async fn test_put_already_exists() {
870        let (protocol, _temp) = create_test_protocol().await;
871
872        let content = b"duplicate content";
873        let address = LmdbStorage::compute_address(content);
874
875        // Pre-populate cache so EVM verification is bypassed
876        protocol.payment_verifier().cache_insert(address);
877
878        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
879        let put_msg = ChunkMessage {
880            request_id: 40,
881            body: ChunkMessageBody::PutRequest(put_request),
882        };
883        let put_bytes = put_msg.encode().expect("encode put");
884
885        let _ = protocol
886            .try_handle_request(&put_bytes)
887            .await
888            .expect("handle put");
889
890        // Store again - should return AlreadyExists
891        let response_bytes = protocol
892            .try_handle_request(&put_bytes)
893            .await
894            .expect("handle put 2")
895            .expect("expected response");
896        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
897
898        assert_eq!(response.request_id, 40);
899        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
900            response.body
901        {
902            assert_eq!(addr, address);
903        } else {
904            panic!("expected AlreadyExists");
905        }
906    }
907
908    #[tokio::test]
909    async fn test_protocol_id() {
910        let (protocol, _temp) = create_test_protocol().await;
911        assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
912    }
913
914    #[tokio::test]
915    async fn test_exists_and_local_access() {
916        let (protocol, _temp) = create_test_protocol().await;
917
918        let content = b"local access test";
919        let address = LmdbStorage::compute_address(content);
920
921        assert!(!protocol.exists(&address).expect("exists check"));
922
923        protocol
924            .put_local(&address, content)
925            .await
926            .expect("put local");
927
928        assert!(protocol.exists(&address).expect("exists check"));
929
930        let retrieved = protocol.get_local(&address).await.expect("get local");
931        assert_eq!(retrieved, Some(content.to_vec()));
932    }
933
934    #[tokio::test]
935    async fn test_cache_insert_is_visible() {
936        let (protocol, _temp) = create_test_protocol().await;
937
938        let content = b"cache test content";
939        let address = LmdbStorage::compute_address(content);
940
941        // Before insert: cache should be empty
942        let stats_before = protocol.payment_cache_stats();
943        assert_eq!(stats_before.additions, 0);
944
945        // Pre-populate cache
946        protocol.payment_verifier().cache_insert(address);
947
948        // After insert: cache should have the xorname
949        let stats_after = protocol.payment_cache_stats();
950        assert_eq!(stats_after.additions, 1);
951
952        // PUT should succeed (cache hit)
953        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
954        let put_msg = ChunkMessage {
955            request_id: 100,
956            body: ChunkMessageBody::PutRequest(put_request),
957        };
958        let put_bytes = put_msg.encode().expect("encode put");
959        let response_bytes = protocol
960            .try_handle_request(&put_bytes)
961            .await
962            .expect("handle put")
963            .expect("expected response");
964        let response = ChunkMessage::decode(&response_bytes).expect("decode");
965
966        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
967            // expected
968        } else {
969            panic!("expected success, got: {response:?}");
970        }
971    }
972
973    #[tokio::test]
974    async fn test_put_same_chunk_twice_hits_cache() {
975        let (protocol, _temp) = create_test_protocol().await;
976
977        let content = b"duplicate cache test";
978        let address = LmdbStorage::compute_address(content);
979
980        // Pre-populate cache for first PUT
981        protocol.payment_verifier().cache_insert(address);
982
983        // First PUT
984        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
985        let put_msg = ChunkMessage {
986            request_id: 110,
987            body: ChunkMessageBody::PutRequest(put_request),
988        };
989        let put_bytes = put_msg.encode().expect("encode put");
990        let _ = protocol
991            .try_handle_request(&put_bytes)
992            .await
993            .expect("handle put 1");
994
995        // Second PUT should return AlreadyExists from the storage idempotency check.
996        let response_bytes = protocol
997            .try_handle_request(&put_bytes)
998            .await
999            .expect("handle put 2")
1000            .expect("expected response");
1001        let response = ChunkMessage::decode(&response_bytes).expect("decode");
1002
1003        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
1004        {
1005            // expected
1006        } else {
1007            panic!("expected AlreadyExists, got: {response:?}");
1008        }
1009    }
1010
1011    #[tokio::test]
1012    async fn test_payment_cache_stats_returns_correct_values() {
1013        let (protocol, _temp) = create_test_protocol().await;
1014
1015        let stats = protocol.payment_cache_stats();
1016        assert_eq!(stats.hits, 0);
1017        assert_eq!(stats.misses, 0);
1018        assert_eq!(stats.additions, 0);
1019
1020        // Pre-populate cache, then store a chunk to test stats
1021        let content = b"stats test";
1022        let address = LmdbStorage::compute_address(content);
1023        protocol.payment_verifier().cache_insert(address);
1024
1025        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
1026        let put_msg = ChunkMessage {
1027            request_id: 120,
1028            body: ChunkMessageBody::PutRequest(put_request),
1029        };
1030        let put_bytes = put_msg.encode().expect("encode put");
1031        let _ = protocol
1032            .try_handle_request(&put_bytes)
1033            .await
1034            .expect("handle put");
1035
1036        let stats = protocol.payment_cache_stats();
1037        // Should have 1 addition (from cache_insert) + 1 hit (payment verification found cache)
1038        assert_eq!(stats.additions, 1);
1039        assert_eq!(stats.hits, 1);
1040    }
1041
1042    #[tokio::test]
1043    async fn test_storage_stats() {
1044        let (protocol, _temp) = create_test_protocol().await;
1045        let stats = protocol.storage_stats();
1046        assert_eq!(stats.chunks_stored, 0);
1047    }
1048
1049    #[tokio::test]
1050    async fn test_merkle_candidate_quote_request() {
1051        use ant_protocol::payment::verify::verify_merkle_candidate_signature;
1052        use evmlib::merkle_payments::MerklePaymentCandidateNode;
1053
1054        // create_test_protocol already wires ML-DSA-65 signing
1055        let (protocol, _temp) = create_test_protocol().await;
1056
1057        let address = [0x77; 32];
1058        let timestamp = std::time::SystemTime::now()
1059            .duration_since(std::time::UNIX_EPOCH)
1060            .expect("system time")
1061            .as_secs();
1062
1063        let request = MerkleCandidateQuoteRequest {
1064            address,
1065            data_type: DATA_TYPE_CHUNK,
1066            data_size: 4096,
1067            merkle_payment_timestamp: timestamp,
1068        };
1069        let msg = ChunkMessage {
1070            request_id: 600,
1071            body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
1072        };
1073        let msg_bytes = msg.encode().expect("encode request");
1074
1075        let response_bytes = protocol
1076            .try_handle_request(&msg_bytes)
1077            .await
1078            .expect("handle merkle candidate quote")
1079            .expect("expected response");
1080        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
1081
1082        assert_eq!(response.request_id, 600);
1083        match response.body {
1084            ChunkMessageBody::MerkleCandidateQuoteResponse(
1085                MerkleCandidateQuoteResponse::Success { candidate_node },
1086            ) => {
1087                let candidate: MerklePaymentCandidateNode =
1088                    rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
1089
1090                // Verify ML-DSA-65 signature
1091                assert!(
1092                    verify_merkle_candidate_signature(&candidate),
1093                    "ML-DSA-65 candidate signature must be valid"
1094                );
1095
1096                assert_eq!(candidate.merkle_payment_timestamp, timestamp);
1097                // Node-calculated price based on records stored
1098                assert!(candidate.price >= evmlib::common::Amount::ZERO);
1099            }
1100            other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
1101        }
1102    }
1103
1104    #[tokio::test]
1105    async fn test_handle_unexpected_response_message() {
1106        let (protocol, _temp) = create_test_protocol().await;
1107
1108        // Send a PutResponse as if it were a request — should return None
1109        let msg = ChunkMessage {
1110            request_id: 200,
1111            body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
1112        };
1113        let msg_bytes = msg.encode().expect("encode");
1114
1115        let result = protocol
1116            .try_handle_request(&msg_bytes)
1117            .await
1118            .expect("handle msg");
1119
1120        assert!(
1121            result.is_none(),
1122            "expected None for response message, got: {result:?}"
1123        );
1124    }
1125
1126    #[tokio::test]
1127    async fn test_quote_already_stored_flag() {
1128        let (protocol, _temp) = create_test_protocol().await;
1129
1130        let content = b"already stored quote test";
1131        let address = LmdbStorage::compute_address(content);
1132
1133        // Store the chunk first
1134        protocol.payment_verifier().cache_insert(address);
1135        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
1136        let put_msg = ChunkMessage {
1137            request_id: 300,
1138            body: ChunkMessageBody::PutRequest(put_request),
1139        };
1140        let put_bytes = put_msg.encode().expect("encode put");
1141        let _ = protocol
1142            .try_handle_request(&put_bytes)
1143            .await
1144            .expect("handle put");
1145
1146        // Now request a quote for the same address — already_stored should be true
1147        let quote_request = ChunkQuoteRequest {
1148            address,
1149            data_size: content.len() as u64,
1150            data_type: DATA_TYPE_CHUNK,
1151        };
1152        let quote_msg = ChunkMessage {
1153            request_id: 301,
1154            body: ChunkMessageBody::QuoteRequest(quote_request),
1155        };
1156        let quote_bytes = quote_msg.encode().expect("encode quote");
1157        let response_bytes = protocol
1158            .try_handle_request(&quote_bytes)
1159            .await
1160            .expect("handle quote")
1161            .expect("expected response");
1162        let response = ChunkMessage::decode(&response_bytes).expect("decode");
1163
1164        match response.body {
1165            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1166                already_stored, ..
1167            }) => {
1168                assert!(
1169                    already_stored,
1170                    "already_stored should be true for existing chunk"
1171                );
1172            }
1173            other => panic!("expected Success with already_stored, got: {other:?}"),
1174        }
1175
1176        // Request a quote for a chunk that does NOT exist — already_stored should be false
1177        let new_address = [0xFFu8; 32];
1178        let quote_request2 = ChunkQuoteRequest {
1179            address: new_address,
1180            data_size: 100,
1181            data_type: DATA_TYPE_CHUNK,
1182        };
1183        let quote_msg2 = ChunkMessage {
1184            request_id: 302,
1185            body: ChunkMessageBody::QuoteRequest(quote_request2),
1186        };
1187        let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
1188        let response_bytes2 = protocol
1189            .try_handle_request(&quote_bytes2)
1190            .await
1191            .expect("handle quote2")
1192            .expect("expected response");
1193        let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
1194
1195        match response2.body {
1196            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1197                already_stored, ..
1198            }) => {
1199                assert!(
1200                    !already_stored,
1201                    "already_stored should be false for new chunk"
1202                );
1203            }
1204            other => panic!("expected Success with already_stored=false, got: {other:?}"),
1205        }
1206    }
1207
1208    /// Drive the real quote handler, then read the record count it priced on.
1209    /// The handler calls `resync_quote_metric` first, so this reflects records
1210    /// ACTUALLY HELD.
1211    fn priced_records_after_quote(protocol: &AntProtocol) -> usize {
1212        let quote_request = ChunkQuoteRequest {
1213            address: [0xAAu8; 32], // a quote-only probe, not one of the stored chunks
1214            data_size: 100,
1215            data_type: DATA_TYPE_CHUNK,
1216        };
1217        let _ = protocol.handle_quote(&quote_request);
1218        protocol.priced_records_stored()
1219    }
1220
1221    /// The quote price must track records ACTUALLY HELD: deleting stored chunks
1222    /// must lower the priced record count, not keep quoting as if the data were
1223    /// still held. Exercises the storage-driven resync in `resync_quote_metric`.
1224    #[tokio::test]
1225    async fn test_quote_metric_reflects_deletions() {
1226        let (protocol, _temp) = create_test_protocol().await;
1227
1228        // Distinct content -> distinct content-addressed keys.
1229        let contents: Vec<Vec<u8>> = (0u8..5).map(|i| vec![i; 64]).collect();
1230        let mut addresses = Vec::new();
1231        for content in &contents {
1232            let addr = LmdbStorage::compute_address(content);
1233            protocol.put_local(&addr, content).await.expect("put_local");
1234            addresses.push(addr);
1235        }
1236
1237        // 5 records held -> priced count 5.
1238        assert_eq!(priced_records_after_quote(&protocol), 5);
1239
1240        // Delete 2 chunks the node was holding.
1241        for addr in addresses.iter().take(2) {
1242            assert!(protocol.storage().delete(addr).await.expect("delete"));
1243        }
1244        assert_eq!(priced_records_after_quote(&protocol), 3);
1245
1246        // Delete the rest; priced count floors at 0, never underflows.
1247        for addr in addresses.iter().skip(2) {
1248            assert!(protocol.storage().delete(addr).await.expect("delete"));
1249        }
1250        assert_eq!(priced_records_after_quote(&protocol), 0);
1251    }
1252
1253    /// Stronger, externally-observable proof: the actual quote PRICE returned
1254    /// to a client must drop after the node deletes data it held. A monotonic
1255    /// store counter would keep the price elevated; the resync ties price to
1256    /// records actually held.
1257    /// FLIPS IF: `resync_quote_metric` is removed — the price would stay at the
1258    /// 10-record level even after deletions (`record_store` only ever increments).
1259    #[tokio::test]
1260    async fn test_quote_price_drops_after_deletion() {
1261        use crate::payment::pricing::calculate_price;
1262
1263        let (protocol, _temp) = create_test_protocol().await;
1264        let contents: Vec<Vec<u8>> = (0u8..10).map(|i| vec![i; 64]).collect();
1265        let mut addresses = Vec::new();
1266        for content in &contents {
1267            let addr = LmdbStorage::compute_address(content);
1268            protocol.put_local(&addr, content).await.expect("put_local");
1269            addresses.push(addr);
1270        }
1271
1272        // Drive a real quote; the priced count must equal records held (10),
1273        // and the price must equal calculate_price(10) — the externally
1274        // observable contract.
1275        assert_eq!(priced_records_after_quote(&protocol), 10);
1276        let price_full = calculate_price(10);
1277
1278        // Delete 8 of 10 held chunks.
1279        for addr in addresses.iter().take(8) {
1280            assert!(protocol.storage().delete(addr).await.expect("delete"));
1281        }
1282        // The next quote must price on 2 records, and the price must be the
1283        // calculate_price(2) value — strictly different from the 10-record
1284        // price (price is monotonic non-decreasing in records_stored).
1285        assert_eq!(priced_records_after_quote(&protocol), 2);
1286        let price_after = calculate_price(2);
1287        assert!(
1288            price_after < price_full,
1289            "deleting data must lower the observable quote price \
1290             (full={price_full:?}, after={price_after:?})"
1291        );
1292    }
1293}