Skip to main content

ant_node/storage/
handler.rs

1//! ANT protocol handler for autonomi protocol messages.
2//!
3//! This handler processes chunk PUT/GET requests with optional payment verification,
4//! storing chunks to LMDB and using the DHT for network-wide retrieval.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────┐
10//! │                    AntProtocol                        │
11//! ├─────────────────────────────────────────────────────────┤
12//! │  protocol_id() = "autonomi/ant/chunk/v1"                  │
13//! │                                                         │
14//! │  try_handle_request(data) ──▶ decode ChunkMessage  │
15//! │                                   │                     │
16//! │         ┌─────────────────────────┼─────────────────┐  │
17//! │         ▼                         ▼                 ▼  │
18//! │   ChunkQuoteRequest           ChunkPutRequest    ChunkGetRequest
19//! │         │                         │                 │  │
20//! │         ▼                         ▼                 ▼  │
21//! │   QuoteGenerator          PaymentVerifier    LmdbStorage│
22//! │         │                         │                 │  │
23//! │         └─────────────────────────┴─────────────────┘  │
24//! │                           │                             │
25//! │           return Ok(Some(response_bytes))              │
26//! │           return Ok(None) for response messages       │
27//! └─────────────────────────────────────────────────────────┘
28//! ```
29
30use crate::ant_protocol::{
31    ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
32    ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
33    MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK,
34    MAX_CHUNK_SIZE,
35};
36use crate::client::compute_address;
37use crate::error::{Error, Result};
38use crate::payment::{PaymentVerifier, QuoteGenerator};
39use crate::storage::lmdb::LmdbStorage;
40use bytes::Bytes;
41use std::sync::Arc;
42use tracing::{debug, info, warn};
43
44/// ANT protocol handler.
45///
46/// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence
47/// and optional payment verification.
48pub struct AntProtocol {
49    /// LMDB storage for chunk persistence.
50    storage: Arc<LmdbStorage>,
51    /// Payment verifier for checking payments.
52    payment_verifier: Arc<PaymentVerifier>,
53    /// Quote generator for creating storage quotes.
54    /// Also handles merkle candidate quote signing via ML-DSA-65.
55    quote_generator: Arc<QuoteGenerator>,
56}
57
58impl AntProtocol {
59    /// Create a new ANT protocol handler.
60    ///
61    /// # Arguments
62    ///
63    /// * `storage` - LMDB storage for chunk persistence
64    /// * `payment_verifier` - Payment verifier for validating payments
65    /// * `quote_generator` - Quote generator for creating storage quotes
66    #[must_use]
67    pub fn new(
68        storage: Arc<LmdbStorage>,
69        payment_verifier: Arc<PaymentVerifier>,
70        quote_generator: Arc<QuoteGenerator>,
71    ) -> Self {
72        Self {
73            storage,
74            payment_verifier,
75            quote_generator,
76        }
77    }
78
79    /// Get the protocol identifier.
80    #[must_use]
81    pub fn protocol_id(&self) -> &'static str {
82        CHUNK_PROTOCOL_ID
83    }
84
85    /// Handle an incoming request and produce a response.
86    ///
87    /// Decodes the raw message, processes it if it is a request variant,
88    /// and returns the encoded response bytes.  Returns `Ok(None)` for
89    /// response messages (which are meant for client subscribers, not for
90    /// the protocol handler).
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if message decoding, handling, or encoding fails.
95    pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
96        let message = ChunkMessage::decode(data)
97            .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
98
99        let request_id = message.request_id;
100
101        let response_body = match message.body {
102            ChunkMessageBody::PutRequest(req) => {
103                ChunkMessageBody::PutResponse(self.handle_put(req).await)
104            }
105            ChunkMessageBody::GetRequest(req) => {
106                ChunkMessageBody::GetResponse(self.handle_get(req).await)
107            }
108            ChunkMessageBody::QuoteRequest(ref req) => {
109                ChunkMessageBody::QuoteResponse(self.handle_quote(req))
110            }
111            ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
112                ChunkMessageBody::MerkleCandidateQuoteResponse(
113                    self.handle_merkle_candidate_quote(req),
114                )
115            }
116            // Response messages are handled by client subscribers
117            // (e.g. send_and_await_chunk_response), not by the protocol
118            // handler. Returning None prevents the caller from sending a
119            // reply, which would create an infinite ping-pong loop.
120            ChunkMessageBody::PutResponse(_)
121            | ChunkMessageBody::GetResponse(_)
122            | ChunkMessageBody::QuoteResponse(_)
123            | ChunkMessageBody::MerkleCandidateQuoteResponse(_) => return Ok(None),
124        };
125
126        let response = ChunkMessage {
127            request_id,
128            body: response_body,
129        };
130
131        response
132            .encode()
133            .map(|b| Some(Bytes::from(b)))
134            .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
135    }
136
137    /// Handle a PUT request.
138    async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
139        let address = request.address;
140        let addr_hex = hex::encode(address);
141        debug!("Handling PUT request for {addr_hex}");
142
143        // 1. Validate chunk size
144        if request.content.len() > MAX_CHUNK_SIZE {
145            return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
146                size: request.content.len(),
147                max_size: MAX_CHUNK_SIZE,
148            });
149        }
150
151        // 2. Verify content address matches BLAKE3(content)
152        let computed = compute_address(&request.content);
153        if computed != address {
154            return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
155                expected: address,
156                actual: computed,
157            });
158        }
159
160        // 3. Check if already exists (idempotent success)
161        match self.storage.exists(&address) {
162            Ok(true) => {
163                debug!("Chunk {addr_hex} already exists");
164                return ChunkPutResponse::AlreadyExists { address };
165            }
166            Err(e) => {
167                return ChunkPutResponse::Error(ProtocolError::Internal(format!(
168                    "Storage read failed: {e}"
169                )));
170            }
171            Ok(false) => {}
172        }
173
174        // 4. Verify payment
175        let payment_result = self
176            .payment_verifier
177            .verify_payment(&address, request.payment_proof.as_deref())
178            .await;
179
180        match payment_result {
181            Ok(status) if status.can_store() => {
182                // Payment verified or cached
183            }
184            Ok(_) => {
185                return ChunkPutResponse::PaymentRequired {
186                    message: "Payment required for new chunk".to_string(),
187                };
188            }
189            Err(e) => {
190                return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
191            }
192        }
193
194        // 5. Store chunk
195        match self.storage.put(&address, &request.content).await {
196            Ok(_) => {
197                let content_len = request.content.len();
198                info!("Stored chunk {addr_hex} ({content_len} bytes)");
199                // Record the store and payment in metrics
200                self.quote_generator.record_store(DATA_TYPE_CHUNK);
201                self.quote_generator.record_payment();
202                ChunkPutResponse::Success { address }
203            }
204            Err(e) => {
205                warn!("Failed to store chunk {addr_hex}: {e}");
206                ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
207            }
208        }
209    }
210
211    /// Handle a GET request.
212    async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
213        let address = request.address;
214        let addr_hex = hex::encode(address);
215        debug!("Handling GET request for {addr_hex}");
216
217        match self.storage.get(&address).await {
218            Ok(Some(content)) => {
219                let content_len = content.len();
220                debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
221                ChunkGetResponse::Success { address, content }
222            }
223            Ok(None) => {
224                debug!("Chunk {addr_hex} not found");
225                ChunkGetResponse::NotFound { address }
226            }
227            Err(e) => {
228                warn!("Failed to retrieve chunk {addr_hex}: {e}");
229                ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
230            }
231        }
232    }
233
234    /// Handle a quote request.
235    fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
236        let addr_hex = hex::encode(request.address);
237        let data_size = request.data_size;
238        debug!("Handling quote request for {addr_hex} (size: {data_size})");
239
240        // Check if the chunk is already stored so we can tell the client
241        // to skip payment (already_stored = true).
242        let already_stored = match self.storage.exists(&request.address) {
243            Ok(exists) => exists,
244            Err(e) => {
245                warn!("Storage check failed for {addr_hex}: {e}");
246                false // Assume not stored on error — generate a normal quote.
247            }
248        };
249
250        if already_stored {
251            debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
252        }
253
254        // Validate data size - data_size is u64, cast carefully and reject overflow
255        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
256            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
257                size: MAX_CHUNK_SIZE + 1,
258                max_size: MAX_CHUNK_SIZE,
259            });
260        };
261        if data_size_usize > MAX_CHUNK_SIZE {
262            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
263                size: data_size_usize,
264                max_size: MAX_CHUNK_SIZE,
265            });
266        }
267
268        match self
269            .quote_generator
270            .create_quote(request.address, data_size_usize, request.data_type)
271        {
272            Ok(quote) => {
273                // Serialize the quote
274                match rmp_serde::to_vec(&quote) {
275                    Ok(quote_bytes) => ChunkQuoteResponse::Success {
276                        quote: quote_bytes,
277                        already_stored,
278                    },
279                    Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
280                        "Failed to serialize quote: {e}"
281                    ))),
282                }
283            }
284            Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
285        }
286    }
287
288    /// Handle a merkle candidate quote request.
289    fn handle_merkle_candidate_quote(
290        &self,
291        request: &MerkleCandidateQuoteRequest,
292    ) -> MerkleCandidateQuoteResponse {
293        let addr_hex = hex::encode(request.address);
294        let data_size = request.data_size;
295        debug!(
296            "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
297            request.merkle_payment_timestamp
298        );
299
300        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
301            return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
302                "data_size {} overflows usize",
303                request.data_size
304            )));
305        };
306        if data_size_usize > MAX_CHUNK_SIZE {
307            return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
308                size: data_size_usize,
309                max_size: MAX_CHUNK_SIZE,
310            });
311        }
312
313        match self.quote_generator.create_merkle_candidate_quote(
314            data_size_usize,
315            request.data_type,
316            request.merkle_payment_timestamp,
317        ) {
318            Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
319                Ok(bytes) => MerkleCandidateQuoteResponse::Success {
320                    candidate_node: bytes,
321                },
322                Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
323                    "Failed to serialize merkle candidate node: {e}"
324                ))),
325            },
326            Err(e) => {
327                MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
328            }
329        }
330    }
331
332    /// Get storage statistics.
333    #[must_use]
334    pub fn storage_stats(&self) -> crate::storage::StorageStats {
335        self.storage.stats()
336    }
337
338    /// Get payment cache statistics.
339    #[must_use]
340    pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
341        self.payment_verifier.cache_stats()
342    }
343
344    /// Get a reference to the payment verifier.
345    ///
346    /// Exposed for **test harnesses only** — production code should not call
347    /// this directly. Use `cache_insert()` on the returned verifier to
348    /// pre-populate the payment cache in test setups.
349    #[cfg(any(test, feature = "test-utils"))]
350    #[must_use]
351    pub fn payment_verifier(&self) -> &PaymentVerifier {
352        &self.payment_verifier
353    }
354
355    /// Check if a chunk exists locally.
356    ///
357    /// # Errors
358    ///
359    /// Returns an error if the storage read fails.
360    pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
361        self.storage.exists(address)
362    }
363
364    /// Get a chunk directly from local storage.
365    ///
366    /// # Errors
367    ///
368    /// Returns an error if storage access fails.
369    pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
370        self.storage.get(address).await
371    }
372
373    /// Store a chunk directly to local storage (bypasses payment verification).
374    ///
375    /// TEST ONLY - This method bypasses payment verification and should only be used in tests.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if storage fails or content doesn't match address.
380    #[cfg(test)]
381    pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
382        self.storage.put(address, content).await
383    }
384}
385
386#[cfg(test)]
387#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
388mod tests {
389    use super::*;
390    use crate::payment::metrics::QuotingMetricsTracker;
391    use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
392    use crate::storage::LmdbStorageConfig;
393    use evmlib::RewardsAddress;
394    use saorsa_core::identity::NodeIdentity;
395    use saorsa_core::MlDsa65;
396    use saorsa_pqc::pqc::types::MlDsaSecretKey;
397    use tempfile::TempDir;
398
399    async fn create_test_protocol() -> (AntProtocol, TempDir) {
400        let temp_dir = TempDir::new().expect("create temp dir");
401
402        let storage_config = LmdbStorageConfig {
403            root_dir: temp_dir.path().to_path_buf(),
404            verify_on_read: true,
405            max_chunks: 0,
406            max_map_size: 0,
407        };
408        let storage = Arc::new(
409            LmdbStorage::new(storage_config)
410                .await
411                .expect("create storage"),
412        );
413
414        let rewards_address = RewardsAddress::new([1u8; 20]);
415        let payment_config = PaymentVerifierConfig {
416            evm: EvmVerifierConfig::default(),
417            cache_capacity: 100_000,
418            local_rewards_address: rewards_address,
419        };
420        let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
421        let metrics_tracker = QuotingMetricsTracker::new(1000, 100);
422        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
423
424        // Wire ML-DSA-65 signing so quote requests succeed
425        let identity = NodeIdentity::generate().expect("generate identity");
426        let pub_key_bytes = identity.public_key().as_bytes().to_vec();
427        let sk_bytes = identity.secret_key_bytes().to_vec();
428        let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
429        quote_generator.set_signer(pub_key_bytes, move |msg| {
430            use saorsa_pqc::pqc::MlDsaOperations;
431            let ml_dsa = MlDsa65::new();
432            ml_dsa
433                .sign(&sk, msg)
434                .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
435        });
436
437        let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
438        (protocol, temp_dir)
439    }
440
441    #[tokio::test]
442    async fn test_put_and_get_chunk() {
443        let (protocol, _temp) = create_test_protocol().await;
444
445        let content = b"hello world";
446        let address = LmdbStorage::compute_address(content);
447
448        // Pre-populate payment cache so EVM verification is bypassed
449        protocol.payment_verifier().cache_insert(address);
450
451        let put_request = ChunkPutRequest::new(address, content.to_vec());
452        let put_msg = ChunkMessage {
453            request_id: 1,
454            body: ChunkMessageBody::PutRequest(put_request),
455        };
456        let put_bytes = put_msg.encode().expect("encode put");
457
458        // Handle PUT
459        let response_bytes = protocol
460            .try_handle_request(&put_bytes)
461            .await
462            .expect("handle put")
463            .expect("expected response");
464        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
465
466        assert_eq!(response.request_id, 1);
467        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
468            response.body
469        {
470            assert_eq!(addr, address);
471        } else {
472            panic!("expected PutResponse::Success, got: {response:?}");
473        }
474
475        // Create GET request
476        let get_request = ChunkGetRequest::new(address);
477        let get_msg = ChunkMessage {
478            request_id: 2,
479            body: ChunkMessageBody::GetRequest(get_request),
480        };
481        let get_bytes = get_msg.encode().expect("encode get");
482
483        // Handle GET
484        let response_bytes = protocol
485            .try_handle_request(&get_bytes)
486            .await
487            .expect("handle get")
488            .expect("expected response");
489        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
490
491        assert_eq!(response.request_id, 2);
492        if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
493            address: addr,
494            content: data,
495        }) = response.body
496        {
497            assert_eq!(addr, address);
498            assert_eq!(data, content.to_vec());
499        } else {
500            panic!("expected GetResponse::Success");
501        }
502    }
503
504    #[tokio::test]
505    async fn test_get_not_found() {
506        let (protocol, _temp) = create_test_protocol().await;
507
508        let address = [0xAB; 32];
509        let get_request = ChunkGetRequest::new(address);
510        let get_msg = ChunkMessage {
511            request_id: 10,
512            body: ChunkMessageBody::GetRequest(get_request),
513        };
514        let get_bytes = get_msg.encode().expect("encode get");
515
516        let response_bytes = protocol
517            .try_handle_request(&get_bytes)
518            .await
519            .expect("handle get")
520            .expect("expected response");
521        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
522
523        assert_eq!(response.request_id, 10);
524        if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
525            response.body
526        {
527            assert_eq!(addr, address);
528        } else {
529            panic!("expected GetResponse::NotFound");
530        }
531    }
532
533    #[tokio::test]
534    async fn test_put_address_mismatch() {
535        let (protocol, _temp) = create_test_protocol().await;
536
537        let content = b"test content";
538        let wrong_address = [0xFF; 32]; // Wrong address
539
540        // Pre-populate cache for the wrong address so we test address mismatch, not payment
541        protocol.payment_verifier().cache_insert(wrong_address);
542
543        let put_request = ChunkPutRequest::new(wrong_address, content.to_vec());
544        let put_msg = ChunkMessage {
545            request_id: 20,
546            body: ChunkMessageBody::PutRequest(put_request),
547        };
548        let put_bytes = put_msg.encode().expect("encode put");
549
550        let response_bytes = protocol
551            .try_handle_request(&put_bytes)
552            .await
553            .expect("handle put")
554            .expect("expected response");
555        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
556
557        assert_eq!(response.request_id, 20);
558        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
559            ProtocolError::AddressMismatch { .. },
560        )) = response.body
561        {
562            // Expected
563        } else {
564            panic!("expected AddressMismatch error, got: {response:?}");
565        }
566    }
567
568    #[tokio::test]
569    async fn test_put_chunk_too_large() {
570        let (protocol, _temp) = create_test_protocol().await;
571
572        // Create oversized content
573        let content = vec![0u8; MAX_CHUNK_SIZE + 1];
574        let address = LmdbStorage::compute_address(&content);
575
576        let put_request = ChunkPutRequest::new(address, content);
577        let put_msg = ChunkMessage {
578            request_id: 30,
579            body: ChunkMessageBody::PutRequest(put_request),
580        };
581        let put_bytes = put_msg.encode().expect("encode put");
582
583        let response_bytes = protocol
584            .try_handle_request(&put_bytes)
585            .await
586            .expect("handle put")
587            .expect("expected response");
588        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
589
590        assert_eq!(response.request_id, 30);
591        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
592            ProtocolError::ChunkTooLarge { .. },
593        )) = response.body
594        {
595            // Expected
596        } else {
597            panic!("expected ChunkTooLarge error");
598        }
599    }
600
601    #[tokio::test]
602    async fn test_put_already_exists() {
603        let (protocol, _temp) = create_test_protocol().await;
604
605        let content = b"duplicate content";
606        let address = LmdbStorage::compute_address(content);
607
608        // Pre-populate cache so EVM verification is bypassed
609        protocol.payment_verifier().cache_insert(address);
610
611        let put_request = ChunkPutRequest::new(address, content.to_vec());
612        let put_msg = ChunkMessage {
613            request_id: 40,
614            body: ChunkMessageBody::PutRequest(put_request),
615        };
616        let put_bytes = put_msg.encode().expect("encode put");
617
618        let _ = protocol
619            .try_handle_request(&put_bytes)
620            .await
621            .expect("handle put");
622
623        // Store again - should return AlreadyExists
624        let response_bytes = protocol
625            .try_handle_request(&put_bytes)
626            .await
627            .expect("handle put 2")
628            .expect("expected response");
629        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
630
631        assert_eq!(response.request_id, 40);
632        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
633            response.body
634        {
635            assert_eq!(addr, address);
636        } else {
637            panic!("expected AlreadyExists");
638        }
639    }
640
641    #[tokio::test]
642    async fn test_protocol_id() {
643        let (protocol, _temp) = create_test_protocol().await;
644        assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
645    }
646
647    #[tokio::test]
648    async fn test_exists_and_local_access() {
649        let (protocol, _temp) = create_test_protocol().await;
650
651        let content = b"local access test";
652        let address = LmdbStorage::compute_address(content);
653
654        assert!(!protocol.exists(&address).expect("exists check"));
655
656        protocol
657            .put_local(&address, content)
658            .await
659            .expect("put local");
660
661        assert!(protocol.exists(&address).expect("exists check"));
662
663        let retrieved = protocol.get_local(&address).await.expect("get local");
664        assert_eq!(retrieved, Some(content.to_vec()));
665    }
666
667    #[tokio::test]
668    async fn test_cache_insert_is_visible() {
669        let (protocol, _temp) = create_test_protocol().await;
670
671        let content = b"cache test content";
672        let address = LmdbStorage::compute_address(content);
673
674        // Before insert: cache should be empty
675        let stats_before = protocol.payment_cache_stats();
676        assert_eq!(stats_before.additions, 0);
677
678        // Pre-populate cache
679        protocol.payment_verifier().cache_insert(address);
680
681        // After insert: cache should have the xorname
682        let stats_after = protocol.payment_cache_stats();
683        assert_eq!(stats_after.additions, 1);
684
685        // PUT should succeed (cache hit)
686        let put_request = ChunkPutRequest::new(address, content.to_vec());
687        let put_msg = ChunkMessage {
688            request_id: 100,
689            body: ChunkMessageBody::PutRequest(put_request),
690        };
691        let put_bytes = put_msg.encode().expect("encode put");
692        let response_bytes = protocol
693            .try_handle_request(&put_bytes)
694            .await
695            .expect("handle put")
696            .expect("expected response");
697        let response = ChunkMessage::decode(&response_bytes).expect("decode");
698
699        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
700            // expected
701        } else {
702            panic!("expected success, got: {response:?}");
703        }
704    }
705
706    #[tokio::test]
707    async fn test_put_same_chunk_twice_hits_cache() {
708        let (protocol, _temp) = create_test_protocol().await;
709
710        let content = b"duplicate cache test";
711        let address = LmdbStorage::compute_address(content);
712
713        // Pre-populate cache for first PUT
714        protocol.payment_verifier().cache_insert(address);
715
716        // First PUT
717        let put_request = ChunkPutRequest::new(address, content.to_vec());
718        let put_msg = ChunkMessage {
719            request_id: 110,
720            body: ChunkMessageBody::PutRequest(put_request),
721        };
722        let put_bytes = put_msg.encode().expect("encode put");
723        let _ = protocol
724            .try_handle_request(&put_bytes)
725            .await
726            .expect("handle put 1");
727
728        // Second PUT — should return AlreadyExists (checked in storage before payment)
729        let response_bytes = protocol
730            .try_handle_request(&put_bytes)
731            .await
732            .expect("handle put 2")
733            .expect("expected response");
734        let response = ChunkMessage::decode(&response_bytes).expect("decode");
735
736        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
737        {
738            // expected — storage check comes before payment check
739        } else {
740            panic!("expected AlreadyExists, got: {response:?}");
741        }
742    }
743
744    #[tokio::test]
745    async fn test_payment_cache_stats_returns_correct_values() {
746        let (protocol, _temp) = create_test_protocol().await;
747
748        let stats = protocol.payment_cache_stats();
749        assert_eq!(stats.hits, 0);
750        assert_eq!(stats.misses, 0);
751        assert_eq!(stats.additions, 0);
752
753        // Pre-populate cache, then store a chunk to test stats
754        let content = b"stats test";
755        let address = LmdbStorage::compute_address(content);
756        protocol.payment_verifier().cache_insert(address);
757
758        let put_request = ChunkPutRequest::new(address, content.to_vec());
759        let put_msg = ChunkMessage {
760            request_id: 120,
761            body: ChunkMessageBody::PutRequest(put_request),
762        };
763        let put_bytes = put_msg.encode().expect("encode put");
764        let _ = protocol
765            .try_handle_request(&put_bytes)
766            .await
767            .expect("handle put");
768
769        let stats = protocol.payment_cache_stats();
770        // Should have 1 addition (from cache_insert) + 1 hit (payment verification found cache)
771        assert_eq!(stats.additions, 1);
772        assert_eq!(stats.hits, 1);
773    }
774
775    #[tokio::test]
776    async fn test_storage_stats() {
777        let (protocol, _temp) = create_test_protocol().await;
778        let stats = protocol.storage_stats();
779        assert_eq!(stats.chunks_stored, 0);
780    }
781
782    #[tokio::test]
783    async fn test_merkle_candidate_quote_request() {
784        use crate::payment::quote::verify_merkle_candidate_signature;
785        use evmlib::merkle_payments::MerklePaymentCandidateNode;
786
787        // create_test_protocol already wires ML-DSA-65 signing
788        let (protocol, _temp) = create_test_protocol().await;
789
790        let address = [0x77; 32];
791        let timestamp = std::time::SystemTime::now()
792            .duration_since(std::time::UNIX_EPOCH)
793            .expect("system time")
794            .as_secs();
795
796        let request = MerkleCandidateQuoteRequest {
797            address,
798            data_type: DATA_TYPE_CHUNK,
799            data_size: 4096,
800            merkle_payment_timestamp: timestamp,
801        };
802        let msg = ChunkMessage {
803            request_id: 600,
804            body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
805        };
806        let msg_bytes = msg.encode().expect("encode request");
807
808        let response_bytes = protocol
809            .try_handle_request(&msg_bytes)
810            .await
811            .expect("handle merkle candidate quote")
812            .expect("expected response");
813        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
814
815        assert_eq!(response.request_id, 600);
816        match response.body {
817            ChunkMessageBody::MerkleCandidateQuoteResponse(
818                MerkleCandidateQuoteResponse::Success { candidate_node },
819            ) => {
820                let candidate: MerklePaymentCandidateNode =
821                    rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
822
823                // Verify ML-DSA-65 signature
824                assert!(
825                    verify_merkle_candidate_signature(&candidate),
826                    "ML-DSA-65 candidate signature must be valid"
827                );
828
829                assert_eq!(candidate.merkle_payment_timestamp, timestamp);
830                assert_eq!(candidate.quoting_metrics.data_size, 4096);
831                assert_eq!(candidate.quoting_metrics.data_type, DATA_TYPE_CHUNK);
832            }
833            other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
834        }
835    }
836
837    #[tokio::test]
838    async fn test_handle_unexpected_response_message() {
839        let (protocol, _temp) = create_test_protocol().await;
840
841        // Send a PutResponse as if it were a request — should return None
842        let msg = ChunkMessage {
843            request_id: 200,
844            body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
845        };
846        let msg_bytes = msg.encode().expect("encode");
847
848        let result = protocol
849            .try_handle_request(&msg_bytes)
850            .await
851            .expect("handle msg");
852
853        assert!(
854            result.is_none(),
855            "expected None for response message, got: {result:?}"
856        );
857    }
858
859    #[tokio::test]
860    async fn test_quote_already_stored_flag() {
861        let (protocol, _temp) = create_test_protocol().await;
862
863        let content = b"already stored quote test";
864        let address = LmdbStorage::compute_address(content);
865
866        // Store the chunk first
867        protocol.payment_verifier().cache_insert(address);
868        let put_request = ChunkPutRequest::new(address, content.to_vec());
869        let put_msg = ChunkMessage {
870            request_id: 300,
871            body: ChunkMessageBody::PutRequest(put_request),
872        };
873        let put_bytes = put_msg.encode().expect("encode put");
874        let _ = protocol
875            .try_handle_request(&put_bytes)
876            .await
877            .expect("handle put");
878
879        // Now request a quote for the same address — already_stored should be true
880        let quote_request = ChunkQuoteRequest {
881            address,
882            data_size: content.len() as u64,
883            data_type: DATA_TYPE_CHUNK,
884        };
885        let quote_msg = ChunkMessage {
886            request_id: 301,
887            body: ChunkMessageBody::QuoteRequest(quote_request),
888        };
889        let quote_bytes = quote_msg.encode().expect("encode quote");
890        let response_bytes = protocol
891            .try_handle_request(&quote_bytes)
892            .await
893            .expect("handle quote")
894            .expect("expected response");
895        let response = ChunkMessage::decode(&response_bytes).expect("decode");
896
897        match response.body {
898            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
899                already_stored, ..
900            }) => {
901                assert!(
902                    already_stored,
903                    "already_stored should be true for existing chunk"
904                );
905            }
906            other => panic!("expected Success with already_stored, got: {other:?}"),
907        }
908
909        // Request a quote for a chunk that does NOT exist — already_stored should be false
910        let new_address = [0xFFu8; 32];
911        let quote_request2 = ChunkQuoteRequest {
912            address: new_address,
913            data_size: 100,
914            data_type: DATA_TYPE_CHUNK,
915        };
916        let quote_msg2 = ChunkMessage {
917            request_id: 302,
918            body: ChunkMessageBody::QuoteRequest(quote_request2),
919        };
920        let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
921        let response_bytes2 = protocol
922            .try_handle_request(&quote_bytes2)
923            .await
924            .expect("handle quote2")
925            .expect("expected response");
926        let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
927
928        match response2.body {
929            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
930                already_stored, ..
931            }) => {
932                assert!(
933                    !already_stored,
934                    "already_stored should be false for new chunk"
935                );
936            }
937            other => panic!("expected Success with already_stored=false, got: {other:?}"),
938        }
939    }
940}