Skip to main content

ant_node/storage/
handler.rs

1//! ANT protocol handler for autonomi protocol messages.
2//!
3//! This handler processes chunk PUT/GET requests with optional payment verification,
4//! storing chunks to LMDB and using the DHT for network-wide retrieval.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────┐
10//! │                    AntProtocol                        │
11//! ├─────────────────────────────────────────────────────────┤
12//! │  protocol_id() = "autonomi.ant.chunk.v1"                  │
13//! │                                                         │
14//! │  try_handle_request(data) ──▶ decode ChunkMessage  │
15//! │                                   │                     │
16//! │         ┌─────────────────────────┼─────────────────┐  │
17//! │         ▼                         ▼                 ▼  │
18//! │   ChunkQuoteRequest           ChunkPutRequest    ChunkGetRequest
19//! │         │                         │                 │  │
20//! │         ▼                         ▼                 ▼  │
21//! │   QuoteGenerator          PaymentVerifier    LmdbStorage│
22//! │         │                         │                 │  │
23//! │         └─────────────────────────┴─────────────────┘  │
24//! │                           │                             │
25//! │           return Ok(Some(response_bytes))              │
26//! │           return Ok(None) for response messages       │
27//! └─────────────────────────────────────────────────────────┘
28//! ```
29
30#[cfg(test)]
31use crate::ant_protocol::DATA_TYPE_CHUNK;
32use crate::ant_protocol::{
33    ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
34    ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
35    MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE,
36};
37use crate::client::compute_address;
38use crate::error::{Error, Result};
39use crate::logging::{debug, info, warn};
40use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext};
41use crate::replication::fresh::FreshWriteEvent;
42use crate::storage::lmdb::LmdbStorage;
43use bytes::Bytes;
44use std::sync::Arc;
45use tokio::sync::mpsc;
46
47/// ANT protocol handler.
48///
49/// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence
50/// and optional payment verification.
51pub struct AntProtocol {
52    /// LMDB storage for chunk persistence.
53    storage: Arc<LmdbStorage>,
54    /// Payment verifier for checking payments.
55    payment_verifier: Arc<PaymentVerifier>,
56    /// Quote generator for creating storage quotes.
57    /// Also handles merkle candidate quote signing via ML-DSA-65.
58    quote_generator: Arc<QuoteGenerator>,
59    /// Channel for notifying the replication engine about newly-stored chunks.
60    fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
61}
62
63impl AntProtocol {
64    /// Create a new ANT protocol handler.
65    ///
66    /// # Arguments
67    ///
68    /// * `storage` - LMDB storage for chunk persistence
69    /// * `payment_verifier` - Payment verifier for validating payments
70    /// * `quote_generator` - Quote generator for creating storage quotes
71    #[must_use]
72    pub fn new(
73        storage: Arc<LmdbStorage>,
74        payment_verifier: Arc<PaymentVerifier>,
75        quote_generator: Arc<QuoteGenerator>,
76    ) -> Self {
77        // Keep the PaymentVerifier's freshness gate AND the QuoteGenerator's
78        // pricing wired to the same authoritative store used by this protocol
79        // handler. Pricing and the freshness gate MUST read the same record
80        // count: the generator prices a quote from current_chunks() and the
81        // verifier later checks the quote against current_chunks(), so the only
82        // difference they see is genuine in-flight growth. Attaching both here
83        // makes the invariant automatic for every AntProtocol construction
84        // path, including tests and future startup variants.
85        payment_verifier.attach_storage(Arc::clone(&storage));
86        quote_generator.attach_storage(Arc::clone(&storage));
87
88        Self {
89            storage,
90            payment_verifier,
91            quote_generator,
92            fresh_write_tx: None,
93        }
94    }
95
96    /// Set the channel sender for fresh-write replication events.
97    ///
98    /// When set, successful chunk PUTs will notify the replication engine
99    /// so it can fan out fresh offers to the close group.
100    pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
101        self.fresh_write_tx = Some(tx);
102    }
103
104    /// Get the protocol identifier.
105    #[must_use]
106    pub fn protocol_id(&self) -> &'static str {
107        CHUNK_PROTOCOL_ID
108    }
109
110    /// Get a reference to the underlying LMDB storage.
111    #[must_use]
112    pub fn storage(&self) -> Arc<LmdbStorage> {
113        Arc::clone(&self.storage)
114    }
115
116    /// Get a shared reference to the payment verifier.
117    #[must_use]
118    pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
119        Arc::clone(&self.payment_verifier)
120    }
121
122    /// Handle an incoming request and produce a response.
123    ///
124    /// Decodes the raw message, processes it if it is a request variant,
125    /// and returns the encoded response bytes.  Returns `Ok(None)` for
126    /// response messages (which are meant for client subscribers, not for
127    /// the protocol handler).
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if message decoding, handling, or encoding fails.
132    pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
133        let message = ChunkMessage::decode(data)
134            .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
135
136        let request_id = message.request_id;
137
138        let response_body = match message.body {
139            ChunkMessageBody::PutRequest(req) => {
140                ChunkMessageBody::PutResponse(self.handle_put(req).await)
141            }
142            ChunkMessageBody::GetRequest(req) => {
143                ChunkMessageBody::GetResponse(self.handle_get(req).await)
144            }
145            ChunkMessageBody::QuoteRequest(ref req) => {
146                ChunkMessageBody::QuoteResponse(self.handle_quote(req))
147            }
148            ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
149                ChunkMessageBody::MerkleCandidateQuoteResponse(
150                    self.handle_merkle_candidate_quote(req),
151                )
152            }
153            // Anything else — response messages are handled by client
154            // subscribers (e.g. send_and_await_chunk_response), not by the
155            // protocol handler. Returning None prevents the caller from
156            // sending a reply, which would create an infinite ping-pong
157            // loop.
158            //
159            // `ChunkMessageBody` is `#[non_exhaustive]` in ant-protocol, so
160            // a future wire variant added on a protocol minor bump also
161            // lands here and is dropped. The CHUNK_PROTOCOL_ID multistream-
162            // select handshake version-gates peers, so this arm should
163            // only be reached by a misconfigured peer.
164            _ => return Ok(None),
165        };
166
167        let response = ChunkMessage {
168            request_id,
169            body: response_body,
170        };
171
172        response
173            .encode()
174            .map(|b| Some(Bytes::from(b)))
175            .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
176    }
177
178    /// Handle a PUT request.
179    ///
180    /// Wraps `handle_put_inner` to emit a single structured tracing event per
181    /// PUT RPC at every exit path, including early-return validation paths.
182    /// The event uses `target: "ant_node::storage::rpc_latency"` so that
183    /// Elasticsearch / Kibana can build p50/p95/p99 store-RPC latency
184    /// histograms from the existing telegraf log forwarding.
185    async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
186        let start = std::time::Instant::now();
187        let addr_hex = hex::encode(request.address);
188        let chunk_size = request.content.len();
189        let response = self.handle_put_inner(request).await;
190        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
191        let outcome: &'static str = match &response {
192            ChunkPutResponse::Success { .. } => "success",
193            ChunkPutResponse::AlreadyExists { .. } => "already_exists",
194            ChunkPutResponse::PaymentRequired { .. } => "payment_required",
195            ChunkPutResponse::Error(_) => "error",
196            _ => "unknown",
197        };
198        info!(
199            target: "ant_node::storage::rpc_latency",
200            duration_ms,
201            chunk_size,
202            outcome,
203            addr = %addr_hex,
204            "put_rpc"
205        );
206        response
207    }
208
209    /// Inner body of `handle_put` — see the wrapper for the per-RPC latency log.
210    async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
211        let address = request.address;
212        let addr_hex = hex::encode(address);
213        debug!("Handling PUT request for {addr_hex}");
214
215        // 1. Validate chunk size
216        if request.content.len() > MAX_CHUNK_SIZE {
217            return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
218                size: request.content.len(),
219                max_size: MAX_CHUNK_SIZE,
220            });
221        }
222
223        // 2. Verify content address matches BLAKE3(content)
224        let computed = compute_address(&request.content);
225        if computed != address {
226            return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
227                expected: address,
228                actual: computed,
229            });
230        }
231
232        // 3. Check if already exists (idempotent success)
233        match self.storage.exists(&address) {
234            Ok(true) => {
235                debug!("Chunk {addr_hex} already exists");
236                return ChunkPutResponse::AlreadyExists { address };
237            }
238            Err(e) => {
239                return ChunkPutResponse::Error(ProtocolError::Internal(format!(
240                    "Storage read failed: {e}"
241                )));
242            }
243            Ok(false) => {}
244        }
245
246        // 4. Verify payment. This node is the storer being paid right now, so
247        // the full ClientPut check set applies (own-quote price freshness,
248        // local recipient, merkle candidate closeness).
249        let payment_result = self
250            .payment_verifier
251            .verify_payment(
252                &address,
253                request.payment_proof.as_deref(),
254                VerificationContext::ClientPut,
255            )
256            .await;
257
258        match payment_result {
259            Ok(status) if status.can_store() => {
260                // Payment verified or cached
261            }
262            Ok(_) => {
263                return ChunkPutResponse::PaymentRequired {
264                    message: "Payment required for new chunk".to_string(),
265                };
266            }
267            Err(e) => {
268                return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
269            }
270        }
271
272        // 5. Store chunk
273        match self.storage.put(&address, &request.content).await {
274            Ok(_) => {
275                let content_len = request.content.len();
276                info!("Stored chunk {addr_hex} ({content_len} bytes)");
277                // Bump the in-memory fallback counter. Both pricing and the
278                // freshness gate now read LmdbStorage::current_chunks() directly,
279                // so this counter only matters when no storage is attached
280                // (unit tests / mis-configured startup). Kept warm so that
281                // fallback path stays roughly accurate.
282                self.quote_generator.record_store();
283
284                // 6. Notify replication engine for fresh fan-out.
285                //    Only emit when a real proof is present — cached-as-verified
286                //    PUTs have no proof to forward, and the chunk would have
287                //    already replicated on the original write that carried one.
288                if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
289                    // `request.content` is now `bytes::Bytes`; FreshWriteEvent
290                    // still carries the chunk as `Vec<u8>` for compatibility
291                    // with the replication wire format, so materialise once
292                    // here. Done only on the success path, where storage has
293                    // already accepted the chunk.
294                    let event = FreshWriteEvent {
295                        key: address,
296                        data: request.content.to_vec(),
297                        payment_proof: proof,
298                    };
299                    if tx.send(event).is_err() {
300                        debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
301                    }
302                }
303
304                ChunkPutResponse::Success { address }
305            }
306            Err(e) => {
307                warn!("Failed to store chunk {addr_hex}: {e}");
308                ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
309            }
310        }
311    }
312
313    /// Handle a GET request.
314    ///
315    /// Wraps `handle_get_inner` to emit a single structured tracing event per
316    /// GET RPC at every exit path. See `handle_put` for the rationale.
317    async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
318        let start = std::time::Instant::now();
319        let addr_hex = hex::encode(request.address);
320        let response = self.handle_get_inner(request).await;
321        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
322        let outcome: &'static str = match &response {
323            ChunkGetResponse::Success { .. } => "success",
324            ChunkGetResponse::NotFound { .. } => "not_found",
325            ChunkGetResponse::Error(_) => "error",
326            _ => "unknown",
327        };
328        info!(
329            target: "ant_node::storage::rpc_latency",
330            duration_ms,
331            outcome,
332            addr = %addr_hex,
333            "get_rpc"
334        );
335        response
336    }
337
338    /// Inner body of `handle_get` — see the wrapper for the per-RPC latency log.
339    async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
340        let address = request.address;
341        let addr_hex = hex::encode(address);
342        debug!("Handling GET request for {addr_hex}");
343
344        match self.storage.get(&address).await {
345            Ok(Some(content)) => {
346                let content_len = content.len();
347                debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
348                ChunkGetResponse::Success { address, content }
349            }
350            Ok(None) => {
351                debug!("Chunk {addr_hex} not found");
352                ChunkGetResponse::NotFound { address }
353            }
354            Err(e) => {
355                warn!("Failed to retrieve chunk {addr_hex}: {e}");
356                ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
357            }
358        }
359    }
360
361    /// Handle a quote request.
362    fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
363        let addr_hex = hex::encode(request.address);
364        let data_size = request.data_size;
365        debug!("Handling quote request for {addr_hex} (size: {data_size})");
366
367        // Check if the chunk is already stored so we can tell the client
368        // to skip payment (already_stored = true).
369        // The match intentionally logs the error when the `logging` feature is
370        // active. Clippy suggests `unwrap_or_default()` when logging is compiled
371        // out, but keeping the explicit match preserves the diagnostic intent.
372        #[allow(clippy::manual_unwrap_or_default)]
373        let already_stored = match self.storage.exists(&request.address) {
374            Ok(exists) => exists,
375            Err(e) => {
376                warn!("Storage check failed for {addr_hex}: {e}");
377                false // Assume not stored on error — generate a normal quote.
378            }
379        };
380
381        if already_stored {
382            debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
383        }
384
385        // Validate data size - data_size is u64, cast carefully and reject overflow
386        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
387            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
388                size: MAX_CHUNK_SIZE + 1,
389                max_size: MAX_CHUNK_SIZE,
390            });
391        };
392        if data_size_usize > MAX_CHUNK_SIZE {
393            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
394                size: data_size_usize,
395                max_size: MAX_CHUNK_SIZE,
396            });
397        }
398
399        match self
400            .quote_generator
401            .create_quote(request.address, data_size_usize, request.data_type)
402        {
403            Ok(quote) => {
404                // Serialize the quote
405                match rmp_serde::to_vec(&quote) {
406                    Ok(quote_bytes) => ChunkQuoteResponse::Success {
407                        quote: quote_bytes,
408                        already_stored,
409                    },
410                    Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
411                        "Failed to serialize quote: {e}"
412                    ))),
413                }
414            }
415            Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
416        }
417    }
418
419    /// Handle a merkle candidate quote request.
420    fn handle_merkle_candidate_quote(
421        &self,
422        request: &MerkleCandidateQuoteRequest,
423    ) -> MerkleCandidateQuoteResponse {
424        let addr_hex = hex::encode(request.address);
425        let data_size = request.data_size;
426        debug!(
427            "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
428            request.merkle_payment_timestamp
429        );
430
431        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
432            return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
433                "data_size {} overflows usize",
434                request.data_size
435            )));
436        };
437        if data_size_usize > MAX_CHUNK_SIZE {
438            return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
439                size: data_size_usize,
440                max_size: MAX_CHUNK_SIZE,
441            });
442        }
443
444        match self.quote_generator.create_merkle_candidate_quote(
445            data_size_usize,
446            request.data_type,
447            request.merkle_payment_timestamp,
448        ) {
449            Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
450                Ok(bytes) => MerkleCandidateQuoteResponse::Success {
451                    candidate_node: bytes,
452                },
453                Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
454                    "Failed to serialize merkle candidate node: {e}"
455                ))),
456            },
457            Err(e) => {
458                MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
459            }
460        }
461    }
462
463    /// Get storage statistics.
464    #[must_use]
465    pub fn storage_stats(&self) -> crate::storage::StorageStats {
466        self.storage.stats()
467    }
468
469    /// Get payment cache statistics.
470    #[must_use]
471    pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
472        self.payment_verifier.cache_stats()
473    }
474
/// Borrow the payment verifier.
///
/// Compiled only for tests or with the `test-utils` feature; production
/// code should not call this. Test setups use `cache_insert()` on the
/// returned verifier to pre-populate the payment cache.
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn payment_verifier(&self) -> &PaymentVerifier {
    self.payment_verifier.as_ref()
}
485
486    /// Check if a chunk exists locally.
487    ///
488    /// # Errors
489    ///
490    /// Returns an error if the storage read fails.
491    pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
492        self.storage.exists(address)
493    }
494
495    /// Get a chunk directly from local storage.
496    ///
497    /// # Errors
498    ///
499    /// Returns an error if storage access fails.
500    pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
501        self.storage.get(address).await
502    }
503
/// Write a chunk straight to local storage, skipping payment verification.
///
/// TEST ONLY — compiled under `#[cfg(test)]`. Because it bypasses the
/// payment check it must never be reachable from production code.
///
/// # Errors
///
/// Propagates the error returned by the storage write (storage failure,
/// or content that does not match the address).
#[cfg(test)]
pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
    let write = self.storage.put(address, content);
    write.await
}
515}
516
517#[cfg(test)]
518#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
519mod tests {
520    use super::*;
521    use crate::payment::metrics::QuotingMetricsTracker;
522    use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
523    use crate::storage::LmdbStorageConfig;
524    use evmlib::RewardsAddress;
525    use saorsa_core::identity::NodeIdentity;
526    use saorsa_core::MlDsa65;
527    use saorsa_pqc::pqc::types::MlDsaSecretKey;
528    use tempfile::TempDir;
529
530    async fn create_test_protocol() -> (AntProtocol, TempDir) {
531        let temp_dir = TempDir::new().expect("create temp dir");
532
533        let storage_config = LmdbStorageConfig {
534            root_dir: temp_dir.path().to_path_buf(),
535            ..LmdbStorageConfig::test_default()
536        };
537        let storage = Arc::new(
538            LmdbStorage::new(storage_config)
539                .await
540                .expect("create storage"),
541        );
542
543        let rewards_address = RewardsAddress::new([1u8; 20]);
544        let payment_config = PaymentVerifierConfig {
545            evm: EvmVerifierConfig::default(),
546            cache_capacity: 100_000,
547            local_rewards_address: rewards_address,
548        };
549        let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
550        let metrics_tracker = QuotingMetricsTracker::new(100);
551        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
552
553        // Wire ML-DSA-65 signing so quote requests succeed
554        let identity = NodeIdentity::generate().expect("generate identity");
555        let pub_key_bytes = identity.public_key().as_bytes().to_vec();
556        let sk_bytes = identity.secret_key_bytes().to_vec();
557        let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
558        quote_generator.set_signer(pub_key_bytes, move |msg| {
559            use saorsa_pqc::pqc::MlDsaOperations;
560            let ml_dsa = MlDsa65::new();
561            ml_dsa
562                .sign(&sk, msg)
563                .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
564        });
565
566        let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
567        (protocol, temp_dir)
568    }
569
570    #[tokio::test]
571    async fn test_put_and_get_chunk() {
572        let (protocol, _temp) = create_test_protocol().await;
573
574        let content = b"hello world";
575        let address = LmdbStorage::compute_address(content);
576
577        // Pre-populate payment cache so EVM verification is bypassed
578        protocol.payment_verifier().cache_insert(address);
579
580        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
581        let put_msg = ChunkMessage {
582            request_id: 1,
583            body: ChunkMessageBody::PutRequest(put_request),
584        };
585        let put_bytes = put_msg.encode().expect("encode put");
586
587        // Handle PUT
588        let response_bytes = protocol
589            .try_handle_request(&put_bytes)
590            .await
591            .expect("handle put")
592            .expect("expected response");
593        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
594
595        assert_eq!(response.request_id, 1);
596        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
597            response.body
598        {
599            assert_eq!(addr, address);
600        } else {
601            panic!("expected PutResponse::Success, got: {response:?}");
602        }
603
604        // Create GET request
605        let get_request = ChunkGetRequest::new(address);
606        let get_msg = ChunkMessage {
607            request_id: 2,
608            body: ChunkMessageBody::GetRequest(get_request),
609        };
610        let get_bytes = get_msg.encode().expect("encode get");
611
612        // Handle GET
613        let response_bytes = protocol
614            .try_handle_request(&get_bytes)
615            .await
616            .expect("handle get")
617            .expect("expected response");
618        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
619
620        assert_eq!(response.request_id, 2);
621        if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
622            address: addr,
623            content: data,
624        }) = response.body
625        {
626            assert_eq!(addr, address);
627            assert_eq!(data, content.to_vec());
628        } else {
629            panic!("expected GetResponse::Success");
630        }
631    }
632
633    #[tokio::test]
634    async fn test_get_not_found() {
635        let (protocol, _temp) = create_test_protocol().await;
636
637        let address = [0xAB; 32];
638        let get_request = ChunkGetRequest::new(address);
639        let get_msg = ChunkMessage {
640            request_id: 10,
641            body: ChunkMessageBody::GetRequest(get_request),
642        };
643        let get_bytes = get_msg.encode().expect("encode get");
644
645        let response_bytes = protocol
646            .try_handle_request(&get_bytes)
647            .await
648            .expect("handle get")
649            .expect("expected response");
650        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
651
652        assert_eq!(response.request_id, 10);
653        if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
654            response.body
655        {
656            assert_eq!(addr, address);
657        } else {
658            panic!("expected GetResponse::NotFound");
659        }
660    }
661
662    #[tokio::test]
663    async fn test_put_address_mismatch() {
664        let (protocol, _temp) = create_test_protocol().await;
665
666        let content = b"test content";
667        let wrong_address = [0xFF; 32]; // Wrong address
668
669        // Pre-populate cache for the wrong address so we test address mismatch, not payment
670        protocol.payment_verifier().cache_insert(wrong_address);
671
672        let put_request = ChunkPutRequest::new(wrong_address, Bytes::copy_from_slice(content));
673        let put_msg = ChunkMessage {
674            request_id: 20,
675            body: ChunkMessageBody::PutRequest(put_request),
676        };
677        let put_bytes = put_msg.encode().expect("encode put");
678
679        let response_bytes = protocol
680            .try_handle_request(&put_bytes)
681            .await
682            .expect("handle put")
683            .expect("expected response");
684        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
685
686        assert_eq!(response.request_id, 20);
687        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
688            ProtocolError::AddressMismatch { .. },
689        )) = response.body
690        {
691            // Expected
692        } else {
693            panic!("expected AddressMismatch error, got: {response:?}");
694        }
695    }
696
697    #[tokio::test]
698    async fn test_put_chunk_too_large() {
699        let (protocol, _temp) = create_test_protocol().await;
700
701        // Create oversized content
702        let content = vec![0u8; MAX_CHUNK_SIZE + 1];
703        let address = LmdbStorage::compute_address(&content);
704
705        let put_request = ChunkPutRequest::new(address, Bytes::from(content));
706        let put_msg = ChunkMessage {
707            request_id: 30,
708            body: ChunkMessageBody::PutRequest(put_request),
709        };
710        let put_bytes = put_msg.encode().expect("encode put");
711
712        let response_bytes = protocol
713            .try_handle_request(&put_bytes)
714            .await
715            .expect("handle put")
716            .expect("expected response");
717        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
718
719        assert_eq!(response.request_id, 30);
720        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
721            ProtocolError::ChunkTooLarge { .. },
722        )) = response.body
723        {
724            // Expected
725        } else {
726            panic!("expected ChunkTooLarge error");
727        }
728    }
729
730    #[tokio::test]
731    async fn test_put_already_exists() {
732        let (protocol, _temp) = create_test_protocol().await;
733
734        let content = b"duplicate content";
735        let address = LmdbStorage::compute_address(content);
736
737        // Pre-populate cache so EVM verification is bypassed
738        protocol.payment_verifier().cache_insert(address);
739
740        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
741        let put_msg = ChunkMessage {
742            request_id: 40,
743            body: ChunkMessageBody::PutRequest(put_request),
744        };
745        let put_bytes = put_msg.encode().expect("encode put");
746
747        let _ = protocol
748            .try_handle_request(&put_bytes)
749            .await
750            .expect("handle put");
751
752        // Store again - should return AlreadyExists
753        let response_bytes = protocol
754            .try_handle_request(&put_bytes)
755            .await
756            .expect("handle put 2")
757            .expect("expected response");
758        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
759
760        assert_eq!(response.request_id, 40);
761        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
762            response.body
763        {
764            assert_eq!(addr, address);
765        } else {
766            panic!("expected AlreadyExists");
767        }
768    }
769
770    #[tokio::test]
771    async fn test_protocol_id() {
772        let (protocol, _temp) = create_test_protocol().await;
773        assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
774    }
775
776    #[tokio::test]
777    async fn test_exists_and_local_access() {
778        let (protocol, _temp) = create_test_protocol().await;
779
780        let content = b"local access test";
781        let address = LmdbStorage::compute_address(content);
782
783        assert!(!protocol.exists(&address).expect("exists check"));
784
785        protocol
786            .put_local(&address, content)
787            .await
788            .expect("put local");
789
790        assert!(protocol.exists(&address).expect("exists check"));
791
792        let retrieved = protocol.get_local(&address).await.expect("get local");
793        assert_eq!(retrieved, Some(content.to_vec()));
794    }
795
796    #[tokio::test]
797    async fn test_cache_insert_is_visible() {
798        let (protocol, _temp) = create_test_protocol().await;
799
800        let content = b"cache test content";
801        let address = LmdbStorage::compute_address(content);
802
803        // Before insert: cache should be empty
804        let stats_before = protocol.payment_cache_stats();
805        assert_eq!(stats_before.additions, 0);
806
807        // Pre-populate cache
808        protocol.payment_verifier().cache_insert(address);
809
810        // After insert: cache should have the xorname
811        let stats_after = protocol.payment_cache_stats();
812        assert_eq!(stats_after.additions, 1);
813
814        // PUT should succeed (cache hit)
815        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
816        let put_msg = ChunkMessage {
817            request_id: 100,
818            body: ChunkMessageBody::PutRequest(put_request),
819        };
820        let put_bytes = put_msg.encode().expect("encode put");
821        let response_bytes = protocol
822            .try_handle_request(&put_bytes)
823            .await
824            .expect("handle put")
825            .expect("expected response");
826        let response = ChunkMessage::decode(&response_bytes).expect("decode");
827
828        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
829            // expected
830        } else {
831            panic!("expected success, got: {response:?}");
832        }
833    }
834
835    #[tokio::test]
836    async fn test_put_same_chunk_twice_hits_cache() {
837        let (protocol, _temp) = create_test_protocol().await;
838
839        let content = b"duplicate cache test";
840        let address = LmdbStorage::compute_address(content);
841
842        // Pre-populate cache for first PUT
843        protocol.payment_verifier().cache_insert(address);
844
845        // First PUT
846        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
847        let put_msg = ChunkMessage {
848            request_id: 110,
849            body: ChunkMessageBody::PutRequest(put_request),
850        };
851        let put_bytes = put_msg.encode().expect("encode put");
852        let _ = protocol
853            .try_handle_request(&put_bytes)
854            .await
855            .expect("handle put 1");
856
857        // Second PUT — should return AlreadyExists (checked in storage before payment)
858        let response_bytes = protocol
859            .try_handle_request(&put_bytes)
860            .await
861            .expect("handle put 2")
862            .expect("expected response");
863        let response = ChunkMessage::decode(&response_bytes).expect("decode");
864
865        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
866        {
867            // expected — storage check comes before payment check
868        } else {
869            panic!("expected AlreadyExists, got: {response:?}");
870        }
871    }
872
873    #[tokio::test]
874    async fn test_payment_cache_stats_returns_correct_values() {
875        let (protocol, _temp) = create_test_protocol().await;
876
877        let stats = protocol.payment_cache_stats();
878        assert_eq!(stats.hits, 0);
879        assert_eq!(stats.misses, 0);
880        assert_eq!(stats.additions, 0);
881
882        // Pre-populate cache, then store a chunk to test stats
883        let content = b"stats test";
884        let address = LmdbStorage::compute_address(content);
885        protocol.payment_verifier().cache_insert(address);
886
887        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
888        let put_msg = ChunkMessage {
889            request_id: 120,
890            body: ChunkMessageBody::PutRequest(put_request),
891        };
892        let put_bytes = put_msg.encode().expect("encode put");
893        let _ = protocol
894            .try_handle_request(&put_bytes)
895            .await
896            .expect("handle put");
897
898        let stats = protocol.payment_cache_stats();
899        // Should have 1 addition (from cache_insert) + 1 hit (payment verification found cache)
900        assert_eq!(stats.additions, 1);
901        assert_eq!(stats.hits, 1);
902    }
903
904    #[tokio::test]
905    async fn test_storage_stats() {
906        let (protocol, _temp) = create_test_protocol().await;
907        let stats = protocol.storage_stats();
908        assert_eq!(stats.chunks_stored, 0);
909    }
910
911    #[tokio::test]
912    async fn test_merkle_candidate_quote_request() {
913        use ant_protocol::payment::verify::verify_merkle_candidate_signature;
914        use evmlib::merkle_payments::MerklePaymentCandidateNode;
915
916        // create_test_protocol already wires ML-DSA-65 signing
917        let (protocol, _temp) = create_test_protocol().await;
918
919        let address = [0x77; 32];
920        let timestamp = std::time::SystemTime::now()
921            .duration_since(std::time::UNIX_EPOCH)
922            .expect("system time")
923            .as_secs();
924
925        let request = MerkleCandidateQuoteRequest {
926            address,
927            data_type: DATA_TYPE_CHUNK,
928            data_size: 4096,
929            merkle_payment_timestamp: timestamp,
930        };
931        let msg = ChunkMessage {
932            request_id: 600,
933            body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
934        };
935        let msg_bytes = msg.encode().expect("encode request");
936
937        let response_bytes = protocol
938            .try_handle_request(&msg_bytes)
939            .await
940            .expect("handle merkle candidate quote")
941            .expect("expected response");
942        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
943
944        assert_eq!(response.request_id, 600);
945        match response.body {
946            ChunkMessageBody::MerkleCandidateQuoteResponse(
947                MerkleCandidateQuoteResponse::Success { candidate_node },
948            ) => {
949                let candidate: MerklePaymentCandidateNode =
950                    rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
951
952                // Verify ML-DSA-65 signature
953                assert!(
954                    verify_merkle_candidate_signature(&candidate),
955                    "ML-DSA-65 candidate signature must be valid"
956                );
957
958                assert_eq!(candidate.merkle_payment_timestamp, timestamp);
959                // Node-calculated price based on records stored
960                assert!(candidate.price >= evmlib::common::Amount::ZERO);
961            }
962            other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
963        }
964    }
965
966    #[tokio::test]
967    async fn test_handle_unexpected_response_message() {
968        let (protocol, _temp) = create_test_protocol().await;
969
970        // Send a PutResponse as if it were a request — should return None
971        let msg = ChunkMessage {
972            request_id: 200,
973            body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
974        };
975        let msg_bytes = msg.encode().expect("encode");
976
977        let result = protocol
978            .try_handle_request(&msg_bytes)
979            .await
980            .expect("handle msg");
981
982        assert!(
983            result.is_none(),
984            "expected None for response message, got: {result:?}"
985        );
986    }
987
988    #[tokio::test]
989    async fn test_quote_already_stored_flag() {
990        let (protocol, _temp) = create_test_protocol().await;
991
992        let content = b"already stored quote test";
993        let address = LmdbStorage::compute_address(content);
994
995        // Store the chunk first
996        protocol.payment_verifier().cache_insert(address);
997        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
998        let put_msg = ChunkMessage {
999            request_id: 300,
1000            body: ChunkMessageBody::PutRequest(put_request),
1001        };
1002        let put_bytes = put_msg.encode().expect("encode put");
1003        let _ = protocol
1004            .try_handle_request(&put_bytes)
1005            .await
1006            .expect("handle put");
1007
1008        // Now request a quote for the same address — already_stored should be true
1009        let quote_request = ChunkQuoteRequest {
1010            address,
1011            data_size: content.len() as u64,
1012            data_type: DATA_TYPE_CHUNK,
1013        };
1014        let quote_msg = ChunkMessage {
1015            request_id: 301,
1016            body: ChunkMessageBody::QuoteRequest(quote_request),
1017        };
1018        let quote_bytes = quote_msg.encode().expect("encode quote");
1019        let response_bytes = protocol
1020            .try_handle_request(&quote_bytes)
1021            .await
1022            .expect("handle quote")
1023            .expect("expected response");
1024        let response = ChunkMessage::decode(&response_bytes).expect("decode");
1025
1026        match response.body {
1027            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1028                already_stored, ..
1029            }) => {
1030                assert!(
1031                    already_stored,
1032                    "already_stored should be true for existing chunk"
1033                );
1034            }
1035            other => panic!("expected Success with already_stored, got: {other:?}"),
1036        }
1037
1038        // Request a quote for a chunk that does NOT exist — already_stored should be false
1039        let new_address = [0xFFu8; 32];
1040        let quote_request2 = ChunkQuoteRequest {
1041            address: new_address,
1042            data_size: 100,
1043            data_type: DATA_TYPE_CHUNK,
1044        };
1045        let quote_msg2 = ChunkMessage {
1046            request_id: 302,
1047            body: ChunkMessageBody::QuoteRequest(quote_request2),
1048        };
1049        let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
1050        let response_bytes2 = protocol
1051            .try_handle_request(&quote_bytes2)
1052            .await
1053            .expect("handle quote2")
1054            .expect("expected response");
1055        let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
1056
1057        match response2.body {
1058            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1059                already_stored, ..
1060            }) => {
1061                assert!(
1062                    !already_stored,
1063                    "already_stored should be false for new chunk"
1064                );
1065            }
1066            other => panic!("expected Success with already_stored=false, got: {other:?}"),
1067        }
1068    }
1069}