Skip to main content

ant_node/storage/
handler.rs

1//! ANT protocol handler for autonomi protocol messages.
2//!
3//! This handler processes chunk PUT/GET requests with optional payment verification,
4//! storing chunks to LMDB and using the DHT for network-wide retrieval.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────┐
10//! │                    AntProtocol                        │
11//! ├─────────────────────────────────────────────────────────┤
12//! │  protocol_id() = "autonomi.ant.chunk.v1"                  │
13//! │                                                         │
14//! │  try_handle_request(data) ──▶ decode ChunkMessage  │
15//! │                                   │                     │
16//! │         ┌─────────────────────────┼─────────────────┐  │
17//! │         ▼                         ▼                 ▼  │
18//! │   ChunkQuoteRequest           ChunkPutRequest    ChunkGetRequest
19//! │         │                         │                 │  │
20//! │         ▼                         ▼                 ▼  │
21//! │   QuoteGenerator          PaymentVerifier    LmdbStorage│
22//! │         │                         │                 │  │
23//! │         └─────────────────────────┴─────────────────┘  │
24//! │                           │                             │
25//! │           return Ok(Some(response_bytes))              │
26//! │           return Ok(None) for response messages       │
27//! └─────────────────────────────────────────────────────────┘
28//! ```
29
30#[cfg(test)]
31use crate::ant_protocol::DATA_TYPE_CHUNK;
32use crate::ant_protocol::{
33    ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
34    ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
35    MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE,
36};
37use crate::client::compute_address;
38use crate::error::{Error, Result};
39use crate::logging::{debug, info, warn};
40use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext};
41use crate::replication::fresh::FreshWriteEvent;
42use crate::storage::lmdb::LmdbStorage;
43use bytes::Bytes;
44use saorsa_core::P2PNode;
45use std::sync::Arc;
46use tokio::sync::mpsc;
47
48/// ANT protocol handler.
49///
50/// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence
51/// and optional payment verification.
52pub struct AntProtocol {
53    /// LMDB storage for chunk persistence.
54    storage: Arc<LmdbStorage>,
55    /// Payment verifier for checking payments.
56    payment_verifier: Arc<PaymentVerifier>,
57    /// Quote generator for creating storage quotes.
58    /// Also handles merkle candidate quote signing via ML-DSA-65.
59    quote_generator: Arc<QuoteGenerator>,
60    /// Channel for notifying the replication engine about newly-stored chunks.
61    fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
62}
63
64impl AntProtocol {
65    /// Create a new ANT protocol handler.
66    ///
67    /// # Arguments
68    ///
69    /// * `storage` - LMDB storage for chunk persistence
70    /// * `payment_verifier` - Payment verifier for validating payments
71    /// * `quote_generator` - Quote generator for creating storage quotes
72    #[must_use]
73    pub fn new(
74        storage: Arc<LmdbStorage>,
75        payment_verifier: Arc<PaymentVerifier>,
76        quote_generator: Arc<QuoteGenerator>,
77    ) -> Self {
78        // Keep the PaymentVerifier's paid-quote price floor and the
79        // QuoteGenerator's pricing wired to the same authoritative store used
80        // by this protocol handler. Both must read the same record count: the
81        // generator prices quotes from current_chunks(), and the verifier later
82        // checks the paid median quote against current_chunks(). Attaching both
83        // here makes the invariant automatic for every AntProtocol
84        // construction path, including tests and future startup variants.
85        payment_verifier.attach_storage(Arc::clone(&storage));
86        quote_generator.attach_storage(Arc::clone(&storage));
87
88        Self {
89            storage,
90            payment_verifier,
91            quote_generator,
92            fresh_write_tx: None,
93        }
94    }
95
96    /// Attach the node's P2P handle for payment live-DHT checks.
97    ///
98    /// Wires the handle into the payment verifier so payment-proof closeness
99    /// checks can use the live routing view. Idempotent: calling twice
100    /// replaces the verifier handle.
101    pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
102        self.payment_verifier.attach_p2p_node(node);
103        debug!("AntProtocol: P2PNode attached for payment live-DHT checks");
104    }
105
106    /// Set the channel sender for fresh-write replication events.
107    ///
108    /// When set, successful chunk PUTs will notify the replication engine
109    /// so it can fan out fresh offers to the close group.
110    pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
111        self.fresh_write_tx = Some(tx);
112    }
113
114    /// Get the protocol identifier.
115    #[must_use]
116    pub fn protocol_id(&self) -> &'static str {
117        CHUNK_PROTOCOL_ID
118    }
119
120    /// Get a reference to the underlying LMDB storage.
121    #[must_use]
122    pub fn storage(&self) -> Arc<LmdbStorage> {
123        Arc::clone(&self.storage)
124    }
125
126    /// Get a shared reference to the payment verifier.
127    #[must_use]
128    pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
129        Arc::clone(&self.payment_verifier)
130    }
131
132    /// Handle an incoming request and produce a response.
133    ///
134    /// Decodes the raw message, processes it if it is a request variant,
135    /// and returns the encoded response bytes.  Returns `Ok(None)` for
136    /// response messages (which are meant for client subscribers, not for
137    /// the protocol handler).
138    ///
139    /// # Errors
140    ///
141    /// Returns an error if message decoding, handling, or encoding fails.
142    pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
143        let message = ChunkMessage::decode(data)
144            .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
145
146        let request_id = message.request_id;
147
148        let response_body = match message.body {
149            ChunkMessageBody::PutRequest(req) => {
150                ChunkMessageBody::PutResponse(self.handle_put(req).await)
151            }
152            ChunkMessageBody::GetRequest(req) => {
153                ChunkMessageBody::GetResponse(self.handle_get(req).await)
154            }
155            ChunkMessageBody::QuoteRequest(ref req) => {
156                ChunkMessageBody::QuoteResponse(self.handle_quote(req))
157            }
158            ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
159                ChunkMessageBody::MerkleCandidateQuoteResponse(
160                    self.handle_merkle_candidate_quote(req),
161                )
162            }
163            // Anything else — response messages are handled by client
164            // subscribers (e.g. send_and_await_chunk_response), not by the
165            // protocol handler. Returning None prevents the caller from
166            // sending a reply, which would create an infinite ping-pong
167            // loop.
168            //
169            // `ChunkMessageBody` is `#[non_exhaustive]` in ant-protocol, so
170            // a future wire variant added on a protocol minor bump also
171            // lands here and is dropped. The CHUNK_PROTOCOL_ID multistream-
172            // select handshake version-gates peers, so this arm should
173            // only be reached by a misconfigured peer.
174            _ => return Ok(None),
175        };
176
177        let response = ChunkMessage {
178            request_id,
179            body: response_body,
180        };
181
182        response
183            .encode()
184            .map(|b| Some(Bytes::from(b)))
185            .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
186    }
187
188    /// Handle a PUT request.
189    ///
190    /// Wraps `handle_put_inner` to emit a single structured tracing event per
191    /// PUT RPC at every exit path, including early-return validation paths.
192    /// The event uses `target: "ant_node::storage::rpc_latency"` so that
193    /// Elasticsearch / Kibana can build p50/p95/p99 store-RPC latency
194    /// histograms from the existing telegraf log forwarding.
195    async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
196        let start = std::time::Instant::now();
197        let addr_hex = hex::encode(request.address);
198        let chunk_size = request.content.len();
199        let response = self.handle_put_inner(request).await;
200        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
201        let outcome: &'static str = match &response {
202            ChunkPutResponse::Success { .. } => "success",
203            ChunkPutResponse::AlreadyExists { .. } => "already_exists",
204            ChunkPutResponse::PaymentRequired { .. } => "payment_required",
205            ChunkPutResponse::Error(_) => "error",
206            _ => "unknown",
207        };
208        info!(
209            target: "ant_node::storage::rpc_latency",
210            duration_ms,
211            chunk_size,
212            outcome,
213            addr = %addr_hex,
214            "put_rpc"
215        );
216        response
217    }
218
219    /// Inner body of `handle_put` — see the wrapper for the per-RPC latency log.
220    async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
221        let address = request.address;
222        let addr_hex = hex::encode(address);
223        debug!("Handling PUT request for {addr_hex}");
224
225        // 1. Validate chunk size
226        if request.content.len() > MAX_CHUNK_SIZE {
227            return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
228                size: request.content.len(),
229                max_size: MAX_CHUNK_SIZE,
230            });
231        }
232
233        // 2. Verify content address matches BLAKE3(content)
234        let computed = compute_address(&request.content);
235        if computed != address {
236            return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
237                expected: address,
238                actual: computed,
239            });
240        }
241
242        // 3. Check if already exists (idempotent success)
243        match self.storage.exists(&address) {
244            Ok(true) => {
245                debug!("Chunk {addr_hex} already exists");
246                return ChunkPutResponse::AlreadyExists { address };
247            }
248            Err(e) => {
249                return ChunkPutResponse::Error(ProtocolError::Internal(format!(
250                    "Storage read failed: {e}"
251                )));
252            }
253            Ok(false) => {}
254        }
255
256        // 4. Cheap disk-space pre-check — runs BEFORE the expensive payment
257        //    verification path (ML-DSA pool checks, a Kademlia closeness
258        //    lookup, and an on-chain Arbitrum RPC). A disk-full node can never
259        //    satisfy this PUT, so reject it here rather than burning that work
260        //    only to fail the reserve check inside `storage.put` (V2-411). The
261        //    check caches passing results, so it is free per-PUT on a healthy
262        //    node; a disk-full node re-runs a cheap `available_space` syscall
263        //    each PUT (still negligible next to the verification it avoids) and
264        //    so detects freed space promptly. The store path keeps its own
265        //    check as defence-in-depth.
266        if let Err(e) = self.storage.check_capacity() {
267            info!(
268                target: "ant_node::storage::disk_precheck",
269                addr = %addr_hex,
270                "Rejecting PUT before payment verification: {e}"
271            );
272            return ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()));
273        }
274
275        // 5. Verify payment. The ClientPut context applies the store-strength
276        // payment cache and verifies live proofs. Direct client PUT does not
277        // reject based on this node's local storage-responsibility view.
278        let payment_result = self
279            .payment_verifier
280            .verify_payment(
281                &address,
282                request.payment_proof.as_deref(),
283                VerificationContext::ClientPut,
284            )
285            .await;
286
287        match payment_result {
288            Ok(status) if status.can_store() => {
289                // Payment verified or cached
290            }
291            Ok(_) => {
292                return ChunkPutResponse::PaymentRequired {
293                    message: "Payment required for new chunk".to_string(),
294                };
295            }
296            Err(e) => {
297                return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
298            }
299        }
300
301        // 6. Store chunk
302        match self.storage.put(&address, &request.content).await {
303            Ok(_) => {
304                let content_len = request.content.len();
305                info!("Stored chunk {addr_hex} ({content_len} bytes)");
306                // Bump the in-memory fallback counter. Both pricing and the
307                // paid-quote floor now read LmdbStorage::current_chunks() directly,
308                // so this counter only matters when no storage is attached
309                // (unit tests / mis-configured startup). Kept warm so that
310                // fallback path stays roughly accurate.
311                self.quote_generator.record_store();
312
313                // 7. Notify replication engine for fresh fan-out.
314                //    Only emit when a real proof is present — cached-as-verified
315                //    PUTs have no proof to forward, and the chunk would have
316                //    already replicated on the original write that carried one.
317                if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
318                    // `request.content` is now `bytes::Bytes`; FreshWriteEvent
319                    // still carries the chunk as `Vec<u8>` for compatibility
320                    // with the replication wire format, so materialise once
321                    // here. Done only on the success path, where storage has
322                    // already accepted the chunk.
323                    let event = FreshWriteEvent {
324                        key: address,
325                        data: request.content.to_vec(),
326                        payment_proof: proof,
327                    };
328                    if tx.send(event).is_err() {
329                        debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
330                    }
331                }
332
333                ChunkPutResponse::Success { address }
334            }
335            Err(e) => {
336                warn!("Failed to store chunk {addr_hex}: {e}");
337                ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
338            }
339        }
340    }
341
342    /// Handle a GET request.
343    ///
344    /// Wraps `handle_get_inner` to emit a single structured tracing event per
345    /// GET RPC at every exit path. See `handle_put` for the rationale.
346    async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
347        let start = std::time::Instant::now();
348        let addr_hex = hex::encode(request.address);
349        let response = self.handle_get_inner(request).await;
350        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
351        let outcome: &'static str = match &response {
352            ChunkGetResponse::Success { .. } => "success",
353            ChunkGetResponse::NotFound { .. } => "not_found",
354            ChunkGetResponse::Error(_) => "error",
355            _ => "unknown",
356        };
357        info!(
358            target: "ant_node::storage::rpc_latency",
359            duration_ms,
360            outcome,
361            addr = %addr_hex,
362            "get_rpc"
363        );
364        response
365    }
366
367    /// Inner body of `handle_get` — see the wrapper for the per-RPC latency log.
368    async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
369        let address = request.address;
370        let addr_hex = hex::encode(address);
371        debug!("Handling GET request for {addr_hex}");
372
373        match self.storage.get(&address).await {
374            Ok(Some(content)) => {
375                let content_len = content.len();
376                debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
377                ChunkGetResponse::Success { address, content }
378            }
379            Ok(None) => {
380                debug!("Chunk {addr_hex} not found");
381                ChunkGetResponse::NotFound { address }
382            }
383            Err(e) => {
384                warn!("Failed to retrieve chunk {addr_hex}: {e}");
385                ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
386            }
387        }
388    }
389
390    /// Handle a quote request.
391    fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
392        let addr_hex = hex::encode(request.address);
393        let data_size = request.data_size;
394        debug!("Handling quote request for {addr_hex} (size: {data_size})");
395
396        // Check if the chunk is already stored so we can tell the client
397        // to skip payment (already_stored = true).
398        // The match intentionally logs the error when the `logging` feature is
399        // active. Clippy suggests `unwrap_or_default()` when logging is compiled
400        // out, but keeping the explicit match preserves the diagnostic intent.
401        #[allow(clippy::manual_unwrap_or_default)]
402        let already_stored = match self.storage.exists(&request.address) {
403            Ok(exists) => exists,
404            Err(e) => {
405                warn!("Storage check failed for {addr_hex}: {e}");
406                false // Assume not stored on error — generate a normal quote.
407            }
408        };
409
410        if already_stored {
411            debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
412        }
413
414        // Validate data size - data_size is u64, cast carefully and reject overflow
415        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
416            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
417                size: MAX_CHUNK_SIZE + 1,
418                max_size: MAX_CHUNK_SIZE,
419            });
420        };
421        if data_size_usize > MAX_CHUNK_SIZE {
422            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
423                size: data_size_usize,
424                max_size: MAX_CHUNK_SIZE,
425            });
426        }
427
428        match self
429            .quote_generator
430            .create_quote(request.address, data_size_usize, request.data_type)
431        {
432            Ok(quote) => {
433                // Serialize the quote
434                match rmp_serde::to_vec(&quote) {
435                    Ok(quote_bytes) => ChunkQuoteResponse::Success {
436                        quote: quote_bytes,
437                        already_stored,
438                    },
439                    Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
440                        "Failed to serialize quote: {e}"
441                    ))),
442                }
443            }
444            Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
445        }
446    }
447
448    /// Handle a merkle candidate quote request.
449    fn handle_merkle_candidate_quote(
450        &self,
451        request: &MerkleCandidateQuoteRequest,
452    ) -> MerkleCandidateQuoteResponse {
453        let addr_hex = hex::encode(request.address);
454        let data_size = request.data_size;
455        debug!(
456            "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
457            request.merkle_payment_timestamp
458        );
459
460        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
461            return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
462                "data_size {} overflows usize",
463                request.data_size
464            )));
465        };
466        if data_size_usize > MAX_CHUNK_SIZE {
467            return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
468                size: data_size_usize,
469                max_size: MAX_CHUNK_SIZE,
470            });
471        }
472
473        match self.quote_generator.create_merkle_candidate_quote(
474            data_size_usize,
475            request.data_type,
476            request.merkle_payment_timestamp,
477        ) {
478            Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
479                Ok(bytes) => MerkleCandidateQuoteResponse::Success {
480                    candidate_node: bytes,
481                },
482                Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
483                    "Failed to serialize merkle candidate node: {e}"
484                ))),
485            },
486            Err(e) => {
487                MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
488            }
489        }
490    }
491
492    /// Get storage statistics.
493    #[must_use]
494    pub fn storage_stats(&self) -> crate::storage::StorageStats {
495        self.storage.stats()
496    }
497
498    /// Get payment cache statistics.
499    #[must_use]
500    pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
501        self.payment_verifier.cache_stats()
502    }
503
504    /// Get a reference to the payment verifier.
505    ///
506    /// Exposed for **test harnesses only** — production code should not call
507    /// this directly. Use `cache_insert()` on the returned verifier to
508    /// pre-populate the payment cache in test setups.
509    #[cfg(any(test, feature = "test-utils"))]
510    #[must_use]
511    pub fn payment_verifier(&self) -> &PaymentVerifier {
512        &self.payment_verifier
513    }
514
515    /// Check if a chunk exists locally.
516    ///
517    /// # Errors
518    ///
519    /// Returns an error if the storage read fails.
520    pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
521        self.storage.exists(address)
522    }
523
524    /// Get a chunk directly from local storage.
525    ///
526    /// # Errors
527    ///
528    /// Returns an error if storage access fails.
529    pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
530        self.storage.get(address).await
531    }
532
533    /// Store a chunk directly to local storage (bypasses payment verification).
534    ///
535    /// TEST ONLY - This method bypasses payment verification and should only be used in tests.
536    ///
537    /// # Errors
538    ///
539    /// Returns an error if storage fails or content doesn't match address.
540    #[cfg(test)]
541    pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
542        self.storage.put(address, content).await
543    }
544}
545
546#[cfg(test)]
547#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
548mod tests {
549    use super::*;
550    use crate::payment::metrics::QuotingMetricsTracker;
551    use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
552    use crate::storage::LmdbStorageConfig;
553    use evmlib::RewardsAddress;
554    use saorsa_core::identity::NodeIdentity;
555    use saorsa_core::MlDsa65;
556    use saorsa_pqc::pqc::types::MlDsaSecretKey;
557    use tempfile::TempDir;
558
559    async fn create_test_protocol() -> (AntProtocol, TempDir) {
560        // `test_default()` sets `disk_reserve: 0`, so the disk pre-check always
561        // passes for the regular tests.
562        create_test_protocol_with_reserve(0).await
563    }
564
565    /// Build a test protocol whose storage enforces the given disk reserve.
566    ///
567    /// A very large reserve (e.g. `u64::MAX`) makes `available < reserve`
568    /// always true, so the disk-space pre-check in `handle_put_inner` fails —
569    /// used to exercise the V2-411 early-return path.
570    async fn create_test_protocol_with_reserve(disk_reserve: u64) -> (AntProtocol, TempDir) {
571        let temp_dir = TempDir::new().expect("create temp dir");
572
573        let storage_config = LmdbStorageConfig {
574            root_dir: temp_dir.path().to_path_buf(),
575            disk_reserve,
576            ..LmdbStorageConfig::test_default()
577        };
578        let storage = Arc::new(
579            LmdbStorage::new(storage_config)
580                .await
581                .expect("create storage"),
582        );
583
584        let rewards_address = RewardsAddress::new([1u8; 20]);
585        let payment_config = PaymentVerifierConfig {
586            evm: EvmVerifierConfig::default(),
587            cache_capacity: 100_000,
588            close_group_size: crate::ant_protocol::CLOSE_GROUP_SIZE,
589            local_rewards_address: rewards_address,
590        };
591        let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
592        let metrics_tracker = QuotingMetricsTracker::new(100);
593        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
594
595        // Wire ML-DSA-65 signing so quote requests succeed
596        let identity = NodeIdentity::generate().expect("generate identity");
597        let pub_key_bytes = identity.public_key().as_bytes().to_vec();
598        let sk_bytes = identity.secret_key_bytes().to_vec();
599        let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
600        quote_generator.set_signer(pub_key_bytes, move |msg| {
601            use saorsa_pqc::pqc::MlDsaOperations;
602            let ml_dsa = MlDsa65::new();
603            ml_dsa
604                .sign(&sk, msg)
605                .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
606        });
607
608        let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
609        (protocol, temp_dir)
610    }
611
612    #[tokio::test]
613    async fn test_put_and_get_chunk() {
614        let (protocol, _temp) = create_test_protocol().await;
615
616        let content = b"hello world";
617        let address = LmdbStorage::compute_address(content);
618
619        // Pre-populate payment cache so EVM verification is bypassed
620        protocol.payment_verifier().cache_insert(address);
621
622        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
623        let put_msg = ChunkMessage {
624            request_id: 1,
625            body: ChunkMessageBody::PutRequest(put_request),
626        };
627        let put_bytes = put_msg.encode().expect("encode put");
628
629        // Handle PUT
630        let response_bytes = protocol
631            .try_handle_request(&put_bytes)
632            .await
633            .expect("handle put")
634            .expect("expected response");
635        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
636
637        assert_eq!(response.request_id, 1);
638        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
639            response.body
640        {
641            assert_eq!(addr, address);
642        } else {
643            panic!("expected PutResponse::Success, got: {response:?}");
644        }
645
646        // Create GET request
647        let get_request = ChunkGetRequest::new(address);
648        let get_msg = ChunkMessage {
649            request_id: 2,
650            body: ChunkMessageBody::GetRequest(get_request),
651        };
652        let get_bytes = get_msg.encode().expect("encode get");
653
654        // Handle GET
655        let response_bytes = protocol
656            .try_handle_request(&get_bytes)
657            .await
658            .expect("handle get")
659            .expect("expected response");
660        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
661
662        assert_eq!(response.request_id, 2);
663        if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
664            address: addr,
665            content: data,
666        }) = response.body
667        {
668            assert_eq!(addr, address);
669            assert_eq!(data, content.to_vec());
670        } else {
671            panic!("expected GetResponse::Success");
672        }
673    }
674
675    #[tokio::test]
676    async fn test_get_not_found() {
677        let (protocol, _temp) = create_test_protocol().await;
678
679        let address = [0xAB; 32];
680        let get_request = ChunkGetRequest::new(address);
681        let get_msg = ChunkMessage {
682            request_id: 10,
683            body: ChunkMessageBody::GetRequest(get_request),
684        };
685        let get_bytes = get_msg.encode().expect("encode get");
686
687        let response_bytes = protocol
688            .try_handle_request(&get_bytes)
689            .await
690            .expect("handle get")
691            .expect("expected response");
692        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
693
694        assert_eq!(response.request_id, 10);
695        if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
696            response.body
697        {
698            assert_eq!(addr, address);
699        } else {
700            panic!("expected GetResponse::NotFound");
701        }
702    }
703
704    #[tokio::test]
705    async fn test_put_address_mismatch() {
706        let (protocol, _temp) = create_test_protocol().await;
707
708        let content = b"test content";
709        let wrong_address = [0xFF; 32]; // Wrong address
710
711        // Pre-populate cache for the wrong address so we test address mismatch, not payment
712        protocol.payment_verifier().cache_insert(wrong_address);
713
714        let put_request = ChunkPutRequest::new(wrong_address, Bytes::copy_from_slice(content));
715        let put_msg = ChunkMessage {
716            request_id: 20,
717            body: ChunkMessageBody::PutRequest(put_request),
718        };
719        let put_bytes = put_msg.encode().expect("encode put");
720
721        let response_bytes = protocol
722            .try_handle_request(&put_bytes)
723            .await
724            .expect("handle put")
725            .expect("expected response");
726        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
727
728        assert_eq!(response.request_id, 20);
729        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
730            ProtocolError::AddressMismatch { .. },
731        )) = response.body
732        {
733            // Expected
734        } else {
735            panic!("expected AddressMismatch error, got: {response:?}");
736        }
737    }
738
739    #[tokio::test]
740    async fn test_put_chunk_too_large() {
741        let (protocol, _temp) = create_test_protocol().await;
742
743        // Create oversized content
744        let content = vec![0u8; MAX_CHUNK_SIZE + 1];
745        let address = LmdbStorage::compute_address(&content);
746
747        let put_request = ChunkPutRequest::new(address, Bytes::from(content));
748        let put_msg = ChunkMessage {
749            request_id: 30,
750            body: ChunkMessageBody::PutRequest(put_request),
751        };
752        let put_bytes = put_msg.encode().expect("encode put");
753
754        let response_bytes = protocol
755            .try_handle_request(&put_bytes)
756            .await
757            .expect("handle put")
758            .expect("expected response");
759        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
760
761        assert_eq!(response.request_id, 30);
762        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
763            ProtocolError::ChunkTooLarge { .. },
764        )) = response.body
765        {
766            // Expected
767        } else {
768            panic!("expected ChunkTooLarge error");
769        }
770    }
771
772    /// V2-411: a disk-full node must reject a PUT with the disk-space error
773    /// *before* running payment verification.
774    ///
775    /// The chunk is intentionally **not** cache-inserted, so if the handler
776    /// reached `verify_payment` it would return `PaymentRequired`/`PaymentFailed`
777    /// (an uncached chunk with no proof). Observing the `StorageFailed` disk
778    /// error instead proves the disk pre-check short-circuited ahead of
779    /// verification — there is no on-chain path to reach.
780    #[tokio::test]
781    async fn test_put_rejected_on_insufficient_disk_before_verification() {
782        // u64::MAX reserve guarantees `available < reserve`, so the cached
783        // disk-space check always fails.
784        let (protocol, _temp) = create_test_protocol_with_reserve(u64::MAX).await;
785
786        let content = b"chunk for a disk-full node";
787        let address = LmdbStorage::compute_address(content);
788
789        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
790        let put_msg = ChunkMessage {
791            request_id: 41,
792            body: ChunkMessageBody::PutRequest(put_request),
793        };
794        let put_bytes = put_msg.encode().expect("encode put");
795
796        let response_bytes = protocol
797            .try_handle_request(&put_bytes)
798            .await
799            .expect("handle put")
800            .expect("expected response");
801        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
802
803        assert_eq!(response.request_id, 41);
804        match response.body {
805            ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
806                ProtocolError::StorageFailed(msg),
807            )) => {
808                assert!(
809                    msg.contains("Insufficient disk space"),
810                    "expected disk-space error, got: {msg}"
811                );
812            }
813            other => {
814                panic!("expected StorageFailed disk error before verification, got: {other:?}")
815            }
816        }
817
818        // And nothing was stored.
819        assert!(!protocol.exists(&address).expect("exists check"));
820    }
821
822    #[tokio::test]
823    async fn test_put_already_exists() {
824        let (protocol, _temp) = create_test_protocol().await;
825
826        let content = b"duplicate content";
827        let address = LmdbStorage::compute_address(content);
828
829        // Pre-populate cache so EVM verification is bypassed
830        protocol.payment_verifier().cache_insert(address);
831
832        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
833        let put_msg = ChunkMessage {
834            request_id: 40,
835            body: ChunkMessageBody::PutRequest(put_request),
836        };
837        let put_bytes = put_msg.encode().expect("encode put");
838
839        let _ = protocol
840            .try_handle_request(&put_bytes)
841            .await
842            .expect("handle put");
843
844        // Store again - should return AlreadyExists
845        let response_bytes = protocol
846            .try_handle_request(&put_bytes)
847            .await
848            .expect("handle put 2")
849            .expect("expected response");
850        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
851
852        assert_eq!(response.request_id, 40);
853        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
854            response.body
855        {
856            assert_eq!(addr, address);
857        } else {
858            panic!("expected AlreadyExists");
859        }
860    }
861
862    #[tokio::test]
863    async fn test_protocol_id() {
864        let (protocol, _temp) = create_test_protocol().await;
865        assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
866    }
867
868    #[tokio::test]
869    async fn test_exists_and_local_access() {
870        let (protocol, _temp) = create_test_protocol().await;
871
872        let content = b"local access test";
873        let address = LmdbStorage::compute_address(content);
874
875        assert!(!protocol.exists(&address).expect("exists check"));
876
877        protocol
878            .put_local(&address, content)
879            .await
880            .expect("put local");
881
882        assert!(protocol.exists(&address).expect("exists check"));
883
884        let retrieved = protocol.get_local(&address).await.expect("get local");
885        assert_eq!(retrieved, Some(content.to_vec()));
886    }
887
888    #[tokio::test]
889    async fn test_cache_insert_is_visible() {
890        let (protocol, _temp) = create_test_protocol().await;
891
892        let content = b"cache test content";
893        let address = LmdbStorage::compute_address(content);
894
895        // Before insert: cache should be empty
896        let stats_before = protocol.payment_cache_stats();
897        assert_eq!(stats_before.additions, 0);
898
899        // Pre-populate cache
900        protocol.payment_verifier().cache_insert(address);
901
902        // After insert: cache should have the xorname
903        let stats_after = protocol.payment_cache_stats();
904        assert_eq!(stats_after.additions, 1);
905
906        // PUT should succeed (cache hit)
907        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
908        let put_msg = ChunkMessage {
909            request_id: 100,
910            body: ChunkMessageBody::PutRequest(put_request),
911        };
912        let put_bytes = put_msg.encode().expect("encode put");
913        let response_bytes = protocol
914            .try_handle_request(&put_bytes)
915            .await
916            .expect("handle put")
917            .expect("expected response");
918        let response = ChunkMessage::decode(&response_bytes).expect("decode");
919
920        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
921            // expected
922        } else {
923            panic!("expected success, got: {response:?}");
924        }
925    }
926
927    #[tokio::test]
928    async fn test_put_same_chunk_twice_hits_cache() {
929        let (protocol, _temp) = create_test_protocol().await;
930
931        let content = b"duplicate cache test";
932        let address = LmdbStorage::compute_address(content);
933
934        // Pre-populate cache for first PUT
935        protocol.payment_verifier().cache_insert(address);
936
937        // First PUT
938        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
939        let put_msg = ChunkMessage {
940            request_id: 110,
941            body: ChunkMessageBody::PutRequest(put_request),
942        };
943        let put_bytes = put_msg.encode().expect("encode put");
944        let _ = protocol
945            .try_handle_request(&put_bytes)
946            .await
947            .expect("handle put 1");
948
949        // Second PUT should return AlreadyExists from the storage idempotency check.
950        let response_bytes = protocol
951            .try_handle_request(&put_bytes)
952            .await
953            .expect("handle put 2")
954            .expect("expected response");
955        let response = ChunkMessage::decode(&response_bytes).expect("decode");
956
957        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
958        {
959            // expected
960        } else {
961            panic!("expected AlreadyExists, got: {response:?}");
962        }
963    }
964
965    #[tokio::test]
966    async fn test_payment_cache_stats_returns_correct_values() {
967        let (protocol, _temp) = create_test_protocol().await;
968
969        let stats = protocol.payment_cache_stats();
970        assert_eq!(stats.hits, 0);
971        assert_eq!(stats.misses, 0);
972        assert_eq!(stats.additions, 0);
973
974        // Pre-populate cache, then store a chunk to test stats
975        let content = b"stats test";
976        let address = LmdbStorage::compute_address(content);
977        protocol.payment_verifier().cache_insert(address);
978
979        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
980        let put_msg = ChunkMessage {
981            request_id: 120,
982            body: ChunkMessageBody::PutRequest(put_request),
983        };
984        let put_bytes = put_msg.encode().expect("encode put");
985        let _ = protocol
986            .try_handle_request(&put_bytes)
987            .await
988            .expect("handle put");
989
990        let stats = protocol.payment_cache_stats();
991        // Should have 1 addition (from cache_insert) + 1 hit (payment verification found cache)
992        assert_eq!(stats.additions, 1);
993        assert_eq!(stats.hits, 1);
994    }
995
996    #[tokio::test]
997    async fn test_storage_stats() {
998        let (protocol, _temp) = create_test_protocol().await;
999        let stats = protocol.storage_stats();
1000        assert_eq!(stats.chunks_stored, 0);
1001    }
1002
1003    #[tokio::test]
1004    async fn test_merkle_candidate_quote_request() {
1005        use ant_protocol::payment::verify::verify_merkle_candidate_signature;
1006        use evmlib::merkle_payments::MerklePaymentCandidateNode;
1007
1008        // create_test_protocol already wires ML-DSA-65 signing
1009        let (protocol, _temp) = create_test_protocol().await;
1010
1011        let address = [0x77; 32];
1012        let timestamp = std::time::SystemTime::now()
1013            .duration_since(std::time::UNIX_EPOCH)
1014            .expect("system time")
1015            .as_secs();
1016
1017        let request = MerkleCandidateQuoteRequest {
1018            address,
1019            data_type: DATA_TYPE_CHUNK,
1020            data_size: 4096,
1021            merkle_payment_timestamp: timestamp,
1022        };
1023        let msg = ChunkMessage {
1024            request_id: 600,
1025            body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
1026        };
1027        let msg_bytes = msg.encode().expect("encode request");
1028
1029        let response_bytes = protocol
1030            .try_handle_request(&msg_bytes)
1031            .await
1032            .expect("handle merkle candidate quote")
1033            .expect("expected response");
1034        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
1035
1036        assert_eq!(response.request_id, 600);
1037        match response.body {
1038            ChunkMessageBody::MerkleCandidateQuoteResponse(
1039                MerkleCandidateQuoteResponse::Success { candidate_node },
1040            ) => {
1041                let candidate: MerklePaymentCandidateNode =
1042                    rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
1043
1044                // Verify ML-DSA-65 signature
1045                assert!(
1046                    verify_merkle_candidate_signature(&candidate),
1047                    "ML-DSA-65 candidate signature must be valid"
1048                );
1049
1050                assert_eq!(candidate.merkle_payment_timestamp, timestamp);
1051                // Node-calculated price based on records stored
1052                assert!(candidate.price >= evmlib::common::Amount::ZERO);
1053            }
1054            other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
1055        }
1056    }
1057
1058    #[tokio::test]
1059    async fn test_handle_unexpected_response_message() {
1060        let (protocol, _temp) = create_test_protocol().await;
1061
1062        // Send a PutResponse as if it were a request — should return None
1063        let msg = ChunkMessage {
1064            request_id: 200,
1065            body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
1066        };
1067        let msg_bytes = msg.encode().expect("encode");
1068
1069        let result = protocol
1070            .try_handle_request(&msg_bytes)
1071            .await
1072            .expect("handle msg");
1073
1074        assert!(
1075            result.is_none(),
1076            "expected None for response message, got: {result:?}"
1077        );
1078    }
1079
1080    #[tokio::test]
1081    async fn test_quote_already_stored_flag() {
1082        let (protocol, _temp) = create_test_protocol().await;
1083
1084        let content = b"already stored quote test";
1085        let address = LmdbStorage::compute_address(content);
1086
1087        // Store the chunk first
1088        protocol.payment_verifier().cache_insert(address);
1089        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
1090        let put_msg = ChunkMessage {
1091            request_id: 300,
1092            body: ChunkMessageBody::PutRequest(put_request),
1093        };
1094        let put_bytes = put_msg.encode().expect("encode put");
1095        let _ = protocol
1096            .try_handle_request(&put_bytes)
1097            .await
1098            .expect("handle put");
1099
1100        // Now request a quote for the same address — already_stored should be true
1101        let quote_request = ChunkQuoteRequest {
1102            address,
1103            data_size: content.len() as u64,
1104            data_type: DATA_TYPE_CHUNK,
1105        };
1106        let quote_msg = ChunkMessage {
1107            request_id: 301,
1108            body: ChunkMessageBody::QuoteRequest(quote_request),
1109        };
1110        let quote_bytes = quote_msg.encode().expect("encode quote");
1111        let response_bytes = protocol
1112            .try_handle_request(&quote_bytes)
1113            .await
1114            .expect("handle quote")
1115            .expect("expected response");
1116        let response = ChunkMessage::decode(&response_bytes).expect("decode");
1117
1118        match response.body {
1119            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1120                already_stored, ..
1121            }) => {
1122                assert!(
1123                    already_stored,
1124                    "already_stored should be true for existing chunk"
1125                );
1126            }
1127            other => panic!("expected Success with already_stored, got: {other:?}"),
1128        }
1129
1130        // Request a quote for a chunk that does NOT exist — already_stored should be false
1131        let new_address = [0xFFu8; 32];
1132        let quote_request2 = ChunkQuoteRequest {
1133            address: new_address,
1134            data_size: 100,
1135            data_type: DATA_TYPE_CHUNK,
1136        };
1137        let quote_msg2 = ChunkMessage {
1138            request_id: 302,
1139            body: ChunkMessageBody::QuoteRequest(quote_request2),
1140        };
1141        let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
1142        let response_bytes2 = protocol
1143            .try_handle_request(&quote_bytes2)
1144            .await
1145            .expect("handle quote2")
1146            .expect("expected response");
1147        let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
1148
1149        match response2.body {
1150            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1151                already_stored, ..
1152            }) => {
1153                assert!(
1154                    !already_stored,
1155                    "already_stored should be false for new chunk"
1156                );
1157            }
1158            other => panic!("expected Success with already_stored=false, got: {other:?}"),
1159        }
1160    }
1161}