Skip to main content

ant_node/storage/
handler.rs

1//! ANT protocol handler for autonomi protocol messages.
2//!
3//! This handler processes chunk PUT/GET requests with optional payment verification,
4//! storing chunks to LMDB and using the DHT for network-wide retrieval.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────┐
10//! │                    AntProtocol                        │
11//! ├─────────────────────────────────────────────────────────┤
12//! │  protocol_id() = "autonomi.ant.chunk.v1"                  │
13//! │                                                         │
14//! │  try_handle_request(data) ──▶ decode ChunkMessage  │
15//! │                                   │                     │
16//! │         ┌─────────────────────────┼─────────────────┐  │
17//! │         ▼                         ▼                 ▼  │
18//! │   ChunkQuoteRequest           ChunkPutRequest    ChunkGetRequest
19//! │         │                         │                 │  │
20//! │         ▼                         ▼                 ▼  │
21//! │   QuoteGenerator          PaymentVerifier    LmdbStorage│
22//! │         │                         │                 │  │
23//! │         └─────────────────────────┴─────────────────┘  │
24//! │                           │                             │
25//! │           return Ok(Some(response_bytes))              │
26//! │           return Ok(None) for response messages       │
27//! └─────────────────────────────────────────────────────────┘
28//! ```
29
30use crate::ant_protocol::{
31    ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
32    ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
33    MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK,
34    MAX_CHUNK_SIZE,
35};
36use crate::client::compute_address;
37use crate::error::{Error, Result};
38use crate::logging::{debug, info, warn};
39use crate::payment::{PaymentVerifier, QuoteGenerator};
40use crate::replication::fresh::FreshWriteEvent;
41use crate::storage::lmdb::LmdbStorage;
42use bytes::Bytes;
43use std::sync::Arc;
44use tokio::sync::mpsc;
45
/// ANT protocol handler.
///
/// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence
/// and optional payment verification. Every collaborator is held behind an
/// `Arc`, so the accessor methods can hand out cheap shared handles.
pub struct AntProtocol {
    /// LMDB storage for chunk persistence.
    storage: Arc<LmdbStorage>,
    /// Payment verifier consulted before a new chunk is stored.
    payment_verifier: Arc<PaymentVerifier>,
    /// Quote generator for creating storage quotes.
    /// Also handles merkle candidate quote signing via ML-DSA-65.
    quote_generator: Arc<QuoteGenerator>,
    /// Channel for notifying the replication engine about newly-stored chunks.
    /// `None` until `set_fresh_write_sender` has been called.
    fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
}
61
62impl AntProtocol {
    /// Create a new ANT protocol handler.
    ///
    /// The handler starts without a fresh-write channel; replication
    /// notifications are only emitted after `set_fresh_write_sender` is called.
    ///
    /// # Arguments
    ///
    /// * `storage` - LMDB storage for chunk persistence
    /// * `payment_verifier` - Payment verifier for validating payments
    /// * `quote_generator` - Quote generator for creating storage quotes
    #[must_use]
    pub fn new(
        storage: Arc<LmdbStorage>,
        payment_verifier: Arc<PaymentVerifier>,
        quote_generator: Arc<QuoteGenerator>,
    ) -> Self {
        Self {
            storage,
            payment_verifier,
            quote_generator,
            // No replication listener is attached by default.
            fresh_write_tx: None,
        }
    }
83
    /// Set the channel sender for fresh-write replication events.
    ///
    /// When set, successful chunk PUTs that carried a payment proof will
    /// notify the replication engine so it can fan out fresh offers to the
    /// close group. Calling this again replaces the previous sender.
    pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
        self.fresh_write_tx = Some(tx);
    }
91
    /// Get the protocol identifier.
    ///
    /// Always returns the `CHUNK_PROTOCOL_ID` constant.
    #[must_use]
    pub fn protocol_id(&self) -> &'static str {
        CHUNK_PROTOCOL_ID
    }
97
    /// Get a shared handle to the underlying LMDB storage.
    ///
    /// Returns a clone of the internal `Arc`, so the caller shares the same
    /// storage instance rather than borrowing from `self`.
    #[must_use]
    pub fn storage(&self) -> Arc<LmdbStorage> {
        Arc::clone(&self.storage)
    }
103
    /// Get a shared handle to the payment verifier.
    ///
    /// Returns a clone of the internal `Arc`, so the caller shares the same
    /// verifier instance that this handler uses.
    #[must_use]
    pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
        Arc::clone(&self.payment_verifier)
    }
109
110    /// Handle an incoming request and produce a response.
111    ///
112    /// Decodes the raw message, processes it if it is a request variant,
113    /// and returns the encoded response bytes.  Returns `Ok(None)` for
114    /// response messages (which are meant for client subscribers, not for
115    /// the protocol handler).
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if message decoding, handling, or encoding fails.
120    pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
121        let message = ChunkMessage::decode(data)
122            .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
123
124        let request_id = message.request_id;
125
126        let response_body = match message.body {
127            ChunkMessageBody::PutRequest(req) => {
128                ChunkMessageBody::PutResponse(self.handle_put(req).await)
129            }
130            ChunkMessageBody::GetRequest(req) => {
131                ChunkMessageBody::GetResponse(self.handle_get(req).await)
132            }
133            ChunkMessageBody::QuoteRequest(ref req) => {
134                ChunkMessageBody::QuoteResponse(self.handle_quote(req))
135            }
136            ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
137                ChunkMessageBody::MerkleCandidateQuoteResponse(
138                    self.handle_merkle_candidate_quote(req),
139                )
140            }
141            // Anything else — response messages are handled by client
142            // subscribers (e.g. send_and_await_chunk_response), not by the
143            // protocol handler. Returning None prevents the caller from
144            // sending a reply, which would create an infinite ping-pong
145            // loop.
146            //
147            // `ChunkMessageBody` is `#[non_exhaustive]` in ant-protocol, so
148            // a future wire variant added on a protocol minor bump also
149            // lands here and is dropped. The CHUNK_PROTOCOL_ID multistream-
150            // select handshake version-gates peers, so this arm should
151            // only be reached by a misconfigured peer.
152            _ => return Ok(None),
153        };
154
155        let response = ChunkMessage {
156            request_id,
157            body: response_body,
158        };
159
160        response
161            .encode()
162            .map(|b| Some(Bytes::from(b)))
163            .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
164    }
165
166    /// Handle a PUT request.
167    async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
168        let address = request.address;
169        let addr_hex = hex::encode(address);
170        debug!("Handling PUT request for {addr_hex}");
171
172        // 1. Validate chunk size
173        if request.content.len() > MAX_CHUNK_SIZE {
174            return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
175                size: request.content.len(),
176                max_size: MAX_CHUNK_SIZE,
177            });
178        }
179
180        // 2. Verify content address matches BLAKE3(content)
181        let computed = compute_address(&request.content);
182        if computed != address {
183            return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
184                expected: address,
185                actual: computed,
186            });
187        }
188
189        // 3. Check if already exists (idempotent success)
190        match self.storage.exists(&address) {
191            Ok(true) => {
192                debug!("Chunk {addr_hex} already exists");
193                return ChunkPutResponse::AlreadyExists { address };
194            }
195            Err(e) => {
196                return ChunkPutResponse::Error(ProtocolError::Internal(format!(
197                    "Storage read failed: {e}"
198                )));
199            }
200            Ok(false) => {}
201        }
202
203        // 4. Verify payment
204        let payment_result = self
205            .payment_verifier
206            .verify_payment(&address, request.payment_proof.as_deref())
207            .await;
208
209        match payment_result {
210            Ok(status) if status.can_store() => {
211                // Payment verified or cached
212            }
213            Ok(_) => {
214                return ChunkPutResponse::PaymentRequired {
215                    message: "Payment required for new chunk".to_string(),
216                };
217            }
218            Err(e) => {
219                return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
220            }
221        }
222
223        // 5. Store chunk
224        match self.storage.put(&address, &request.content).await {
225            Ok(_) => {
226                let content_len = request.content.len();
227                info!("Stored chunk {addr_hex} ({content_len} bytes)");
228                // Record the store and payment in metrics
229                self.quote_generator.record_store(DATA_TYPE_CHUNK);
230                self.quote_generator.record_payment();
231
232                // 6. Notify replication engine for fresh fan-out.
233                //    Only emit when a real proof is present — cached-as-verified
234                //    PUTs have no proof to forward, and the chunk would have
235                //    already replicated on the original write that carried one.
236                if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
237                    let event = FreshWriteEvent {
238                        key: address,
239                        data: request.content,
240                        payment_proof: proof,
241                    };
242                    if tx.send(event).is_err() {
243                        debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
244                    }
245                }
246
247                ChunkPutResponse::Success { address }
248            }
249            Err(e) => {
250                warn!("Failed to store chunk {addr_hex}: {e}");
251                ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
252            }
253        }
254    }
255
256    /// Handle a GET request.
257    async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
258        let address = request.address;
259        let addr_hex = hex::encode(address);
260        debug!("Handling GET request for {addr_hex}");
261
262        match self.storage.get(&address).await {
263            Ok(Some(content)) => {
264                let content_len = content.len();
265                debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
266                ChunkGetResponse::Success { address, content }
267            }
268            Ok(None) => {
269                debug!("Chunk {addr_hex} not found");
270                ChunkGetResponse::NotFound { address }
271            }
272            Err(e) => {
273                warn!("Failed to retrieve chunk {addr_hex}: {e}");
274                ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
275            }
276        }
277    }
278
279    /// Handle a quote request.
280    fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
281        let addr_hex = hex::encode(request.address);
282        let data_size = request.data_size;
283        debug!("Handling quote request for {addr_hex} (size: {data_size})");
284
285        // Check if the chunk is already stored so we can tell the client
286        // to skip payment (already_stored = true).
287        // The match intentionally logs the error when the `logging` feature is
288        // active. Clippy suggests `unwrap_or_default()` when logging is compiled
289        // out, but keeping the explicit match preserves the diagnostic intent.
290        #[allow(clippy::manual_unwrap_or_default)]
291        let already_stored = match self.storage.exists(&request.address) {
292            Ok(exists) => exists,
293            Err(e) => {
294                warn!("Storage check failed for {addr_hex}: {e}");
295                false // Assume not stored on error — generate a normal quote.
296            }
297        };
298
299        if already_stored {
300            debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
301        }
302
303        // Validate data size - data_size is u64, cast carefully and reject overflow
304        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
305            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
306                size: MAX_CHUNK_SIZE + 1,
307                max_size: MAX_CHUNK_SIZE,
308            });
309        };
310        if data_size_usize > MAX_CHUNK_SIZE {
311            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
312                size: data_size_usize,
313                max_size: MAX_CHUNK_SIZE,
314            });
315        }
316
317        match self
318            .quote_generator
319            .create_quote(request.address, data_size_usize, request.data_type)
320        {
321            Ok(quote) => {
322                // Serialize the quote
323                match rmp_serde::to_vec(&quote) {
324                    Ok(quote_bytes) => ChunkQuoteResponse::Success {
325                        quote: quote_bytes,
326                        already_stored,
327                    },
328                    Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
329                        "Failed to serialize quote: {e}"
330                    ))),
331                }
332            }
333            Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
334        }
335    }
336
337    /// Handle a merkle candidate quote request.
338    fn handle_merkle_candidate_quote(
339        &self,
340        request: &MerkleCandidateQuoteRequest,
341    ) -> MerkleCandidateQuoteResponse {
342        let addr_hex = hex::encode(request.address);
343        let data_size = request.data_size;
344        debug!(
345            "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
346            request.merkle_payment_timestamp
347        );
348
349        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
350            return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
351                "data_size {} overflows usize",
352                request.data_size
353            )));
354        };
355        if data_size_usize > MAX_CHUNK_SIZE {
356            return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
357                size: data_size_usize,
358                max_size: MAX_CHUNK_SIZE,
359            });
360        }
361
362        match self.quote_generator.create_merkle_candidate_quote(
363            data_size_usize,
364            request.data_type,
365            request.merkle_payment_timestamp,
366        ) {
367            Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
368                Ok(bytes) => MerkleCandidateQuoteResponse::Success {
369                    candidate_node: bytes,
370                },
371                Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
372                    "Failed to serialize merkle candidate node: {e}"
373                ))),
374            },
375            Err(e) => {
376                MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
377            }
378        }
379    }
380
    /// Get storage statistics.
    ///
    /// Delegates to the underlying LMDB storage's `stats()`.
    #[must_use]
    pub fn storage_stats(&self) -> crate::storage::StorageStats {
        self.storage.stats()
    }
386
    /// Get payment cache statistics.
    ///
    /// Delegates to the payment verifier's `cache_stats()`.
    #[must_use]
    pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
        self.payment_verifier.cache_stats()
    }
392
    /// Get a reference to the payment verifier.
    ///
    /// Exposed for **test harnesses only** — production code should not call
    /// this directly. Use `cache_insert()` on the returned verifier to
    /// pre-populate the payment cache in test setups.
    ///
    /// Only compiled under `cfg(test)` or with the `test-utils` feature.
    #[cfg(any(test, feature = "test-utils"))]
    #[must_use]
    pub fn payment_verifier(&self) -> &PaymentVerifier {
        &self.payment_verifier
    }
403
    /// Check if a chunk exists locally.
    ///
    /// Consults only this node's storage; no network lookup is performed here.
    ///
    /// # Errors
    ///
    /// Returns an error if the storage read fails.
    pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
        self.storage.exists(address)
    }
412
    /// Get a chunk directly from local storage.
    ///
    /// Returns whatever the storage layer yields for `address`; `Ok(None)`
    /// presumably means the chunk is not held locally (the GET handler maps
    /// it to `NotFound`).
    ///
    /// # Errors
    ///
    /// Returns an error if storage access fails.
    pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
        self.storage.get(address).await
    }
421
    /// Store a chunk directly to local storage (bypasses payment verification).
    ///
    /// TEST ONLY - This method bypasses payment verification and should only be used in tests.
    /// It also skips the size and address checks done by the PUT handler;
    /// any validation that happens is whatever `LmdbStorage::put` performs.
    ///
    /// The returned `bool` is passed through from the storage layer.
    /// NOTE(review): presumably `true` means "newly written" — confirm
    /// against `LmdbStorage::put`.
    ///
    /// # Errors
    ///
    /// Returns an error if storage fails or content doesn't match address.
    #[cfg(test)]
    pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
        self.storage.put(address, content).await
    }
433}
434
435#[cfg(test)]
436#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
437mod tests {
438    use super::*;
439    use crate::payment::metrics::QuotingMetricsTracker;
440    use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
441    use crate::storage::LmdbStorageConfig;
442    use evmlib::RewardsAddress;
443    use saorsa_core::identity::NodeIdentity;
444    use saorsa_core::MlDsa65;
445    use saorsa_pqc::pqc::types::MlDsaSecretKey;
446    use tempfile::TempDir;
447
    /// Build an `AntProtocol` wired for unit tests.
    ///
    /// The handler is backed by an LMDB store rooted in a fresh temporary
    /// directory, a payment verifier with the default EVM configuration, and
    /// a quote generator with an ML-DSA-65 signer installed. The `TempDir`
    /// is returned alongside the handler so the caller keeps the directory
    /// alive for the duration of the test.
    async fn create_test_protocol() -> (AntProtocol, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");

        // Point storage at the temp dir; everything else uses test defaults.
        let storage_config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            ..LmdbStorageConfig::test_default()
        };
        let storage = Arc::new(
            LmdbStorage::new(storage_config)
                .await
                .expect("create storage"),
        );

        // The same dummy rewards address is shared by verifier and quote generator.
        let rewards_address = RewardsAddress::new([1u8; 20]);
        let payment_config = PaymentVerifierConfig {
            evm: EvmVerifierConfig::default(),
            cache_capacity: 100_000,
            local_rewards_address: rewards_address,
        };
        let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
        let metrics_tracker = QuotingMetricsTracker::new(100);
        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);

        // Wire ML-DSA-65 signing so quote requests succeed
        let identity = NodeIdentity::generate().expect("generate identity");
        let pub_key_bytes = identity.public_key().as_bytes().to_vec();
        let sk_bytes = identity.secret_key_bytes().to_vec();
        let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
        quote_generator.set_signer(pub_key_bytes, move |msg| {
            use saorsa_pqc::pqc::MlDsaOperations;
            let ml_dsa = MlDsa65::new();
            // A signing failure yields an empty signature rather than a panic.
            ml_dsa
                .sign(&sk, msg)
                .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
        });

        let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
        (protocol, temp_dir)
    }
487
    /// Round trip: a PUT with a pre-cached payment succeeds, and a following
    /// GET returns the same bytes under the same address. Also checks that
    /// each response echoes the request's `request_id`.
    #[tokio::test]
    async fn test_put_and_get_chunk() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"hello world";
        let address = LmdbStorage::compute_address(content);

        // Pre-populate payment cache so EVM verification is bypassed
        protocol.payment_verifier().cache_insert(address);

        let put_request = ChunkPutRequest::new(address, content.to_vec());
        let put_msg = ChunkMessage {
            request_id: 1,
            body: ChunkMessageBody::PutRequest(put_request),
        };
        let put_bytes = put_msg.encode().expect("encode put");

        // Handle PUT
        let response_bytes = protocol
            .try_handle_request(&put_bytes)
            .await
            .expect("handle put")
            .expect("expected response");
        let response = ChunkMessage::decode(&response_bytes).expect("decode response");

        assert_eq!(response.request_id, 1);
        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
            response.body
        {
            assert_eq!(addr, address);
        } else {
            panic!("expected PutResponse::Success, got: {response:?}");
        }

        // Create GET request
        let get_request = ChunkGetRequest::new(address);
        let get_msg = ChunkMessage {
            request_id: 2,
            body: ChunkMessageBody::GetRequest(get_request),
        };
        let get_bytes = get_msg.encode().expect("encode get");

        // Handle GET
        let response_bytes = protocol
            .try_handle_request(&get_bytes)
            .await
            .expect("handle get")
            .expect("expected response");
        let response = ChunkMessage::decode(&response_bytes).expect("decode response");

        assert_eq!(response.request_id, 2);
        if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
            address: addr,
            content: data,
        }) = response.body
        {
            assert_eq!(addr, address);
            assert_eq!(data, content.to_vec());
        } else {
            panic!("expected GetResponse::Success");
        }
    }
550
    /// A GET for an address that was never stored yields `NotFound` carrying
    /// the requested address.
    #[tokio::test]
    async fn test_get_not_found() {
        let (protocol, _temp) = create_test_protocol().await;

        // Arbitrary address; nothing has been stored in the fresh database.
        let address = [0xAB; 32];
        let get_request = ChunkGetRequest::new(address);
        let get_msg = ChunkMessage {
            request_id: 10,
            body: ChunkMessageBody::GetRequest(get_request),
        };
        let get_bytes = get_msg.encode().expect("encode get");

        let response_bytes = protocol
            .try_handle_request(&get_bytes)
            .await
            .expect("handle get")
            .expect("expected response");
        let response = ChunkMessage::decode(&response_bytes).expect("decode response");

        assert_eq!(response.request_id, 10);
        if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
            response.body
        {
            assert_eq!(addr, address);
        } else {
            panic!("expected GetResponse::NotFound");
        }
    }
579
    /// A PUT whose claimed address does not match the content is rejected
    /// with `AddressMismatch`.
    #[tokio::test]
    async fn test_put_address_mismatch() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"test content";
        let wrong_address = [0xFF; 32]; // Wrong address

        // Pre-populate cache for the wrong address so we test address mismatch, not payment
        // (the handler checks the address before payment, so this is belt-and-braces).
        protocol.payment_verifier().cache_insert(wrong_address);

        let put_request = ChunkPutRequest::new(wrong_address, content.to_vec());
        let put_msg = ChunkMessage {
            request_id: 20,
            body: ChunkMessageBody::PutRequest(put_request),
        };
        let put_bytes = put_msg.encode().expect("encode put");

        let response_bytes = protocol
            .try_handle_request(&put_bytes)
            .await
            .expect("handle put")
            .expect("expected response");
        let response = ChunkMessage::decode(&response_bytes).expect("decode response");

        assert_eq!(response.request_id, 20);
        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
            ProtocolError::AddressMismatch { .. },
        )) = response.body
        {
            // Expected
        } else {
            panic!("expected AddressMismatch error, got: {response:?}");
        }
    }
614
    /// A PUT one byte over `MAX_CHUNK_SIZE` is rejected with `ChunkTooLarge`.
    /// No payment cache entry is needed because the size check runs first.
    #[tokio::test]
    async fn test_put_chunk_too_large() {
        let (protocol, _temp) = create_test_protocol().await;

        // Create oversized content
        let content = vec![0u8; MAX_CHUNK_SIZE + 1];
        let address = LmdbStorage::compute_address(&content);

        let put_request = ChunkPutRequest::new(address, content);
        let put_msg = ChunkMessage {
            request_id: 30,
            body: ChunkMessageBody::PutRequest(put_request),
        };
        let put_bytes = put_msg.encode().expect("encode put");

        let response_bytes = protocol
            .try_handle_request(&put_bytes)
            .await
            .expect("handle put")
            .expect("expected response");
        let response = ChunkMessage::decode(&response_bytes).expect("decode response");

        assert_eq!(response.request_id, 30);
        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
            ProtocolError::ChunkTooLarge { .. },
        )) = response.body
        {
            // Expected
        } else {
            panic!("expected ChunkTooLarge error");
        }
    }
647
    /// Sending the same PUT twice: the second attempt reports `AlreadyExists`
    /// with the chunk's address.
    #[tokio::test]
    async fn test_put_already_exists() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"duplicate content";
        let address = LmdbStorage::compute_address(content);

        // Pre-populate cache so EVM verification is bypassed
        protocol.payment_verifier().cache_insert(address);

        let put_request = ChunkPutRequest::new(address, content.to_vec());
        let put_msg = ChunkMessage {
            request_id: 40,
            body: ChunkMessageBody::PutRequest(put_request),
        };
        let put_bytes = put_msg.encode().expect("encode put");

        // First store; the response body is not inspected here.
        let _ = protocol
            .try_handle_request(&put_bytes)
            .await
            .expect("handle put");

        // Store again - should return AlreadyExists
        let response_bytes = protocol
            .try_handle_request(&put_bytes)
            .await
            .expect("handle put 2")
            .expect("expected response");
        let response = ChunkMessage::decode(&response_bytes).expect("decode response");

        assert_eq!(response.request_id, 40);
        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
            response.body
        {
            assert_eq!(addr, address);
        } else {
            panic!("expected AlreadyExists");
        }
    }
687
    /// `protocol_id()` returns the `CHUNK_PROTOCOL_ID` constant.
    #[tokio::test]
    async fn test_protocol_id() {
        let (protocol, _temp) = create_test_protocol().await;
        assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
    }
693
    /// Exercises the direct local accessors: `exists` is false before
    /// `put_local`, true afterwards, and `get_local` returns the stored bytes.
    #[tokio::test]
    async fn test_exists_and_local_access() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"local access test";
        let address = LmdbStorage::compute_address(content);

        assert!(!protocol.exists(&address).expect("exists check"));

        // Write straight to storage, bypassing the protocol handler.
        protocol
            .put_local(&address, content)
            .await
            .expect("put local");

        assert!(protocol.exists(&address).expect("exists check"));

        let retrieved = protocol.get_local(&address).await.expect("get local");
        assert_eq!(retrieved, Some(content.to_vec()));
    }
713
    /// `cache_insert` is reflected in the cache statistics, and a PUT for the
    /// cached address then succeeds without a payment proof.
    #[tokio::test]
    async fn test_cache_insert_is_visible() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"cache test content";
        let address = LmdbStorage::compute_address(content);

        // Before insert: cache should be empty
        let stats_before = protocol.payment_cache_stats();
        assert_eq!(stats_before.additions, 0);

        // Pre-populate cache
        protocol.payment_verifier().cache_insert(address);

        // After insert: cache should have the xorname
        let stats_after = protocol.payment_cache_stats();
        assert_eq!(stats_after.additions, 1);

        // PUT should succeed (cache hit)
        let put_request = ChunkPutRequest::new(address, content.to_vec());
        let put_msg = ChunkMessage {
            request_id: 100,
            body: ChunkMessageBody::PutRequest(put_request),
        };
        let put_bytes = put_msg.encode().expect("encode put");
        let response_bytes = protocol
            .try_handle_request(&put_bytes)
            .await
            .expect("handle put")
            .expect("expected response");
        let response = ChunkMessage::decode(&response_bytes).expect("decode");

        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
            // expected
        } else {
            panic!("expected success, got: {response:?}");
        }
    }
752
    /// A repeated PUT of the same chunk is answered with `AlreadyExists`.
    /// Despite the test name, the second request is resolved by the storage
    /// existence check, which the handler performs before payment
    /// verification.
    #[tokio::test]
    async fn test_put_same_chunk_twice_hits_cache() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"duplicate cache test";
        let address = LmdbStorage::compute_address(content);

        // Pre-populate cache for first PUT
        protocol.payment_verifier().cache_insert(address);

        // First PUT
        let put_request = ChunkPutRequest::new(address, content.to_vec());
        let put_msg = ChunkMessage {
            request_id: 110,
            body: ChunkMessageBody::PutRequest(put_request),
        };
        let put_bytes = put_msg.encode().expect("encode put");
        let _ = protocol
            .try_handle_request(&put_bytes)
            .await
            .expect("handle put 1");

        // Second PUT — should return AlreadyExists (checked in storage before payment)
        let response_bytes = protocol
            .try_handle_request(&put_bytes)
            .await
            .expect("handle put 2")
            .expect("expected response");
        let response = ChunkMessage::decode(&response_bytes).expect("decode");

        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
        {
            // expected — storage check comes before payment check
        } else {
            panic!("expected AlreadyExists, got: {response:?}");
        }
    }
790
    /// Cache statistics start at zero, then show one addition after
    /// `cache_insert` and one hit after a PUT for the cached address.
    #[tokio::test]
    async fn test_payment_cache_stats_returns_correct_values() {
        let (protocol, _temp) = create_test_protocol().await;

        // A fresh verifier has recorded no cache activity.
        let stats = protocol.payment_cache_stats();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);
        assert_eq!(stats.additions, 0);

        // Pre-populate cache, then store a chunk to test stats
        let content = b"stats test";
        let address = LmdbStorage::compute_address(content);
        protocol.payment_verifier().cache_insert(address);

        let put_request = ChunkPutRequest::new(address, content.to_vec());
        let put_msg = ChunkMessage {
            request_id: 120,
            body: ChunkMessageBody::PutRequest(put_request),
        };
        let put_bytes = put_msg.encode().expect("encode put");
        let _ = protocol
            .try_handle_request(&put_bytes)
            .await
            .expect("handle put");

        let stats = protocol.payment_cache_stats();
        // Should have 1 addition (from cache_insert) + 1 hit (payment verification found cache)
        assert_eq!(stats.additions, 1);
        assert_eq!(stats.hits, 1);
    }
821
822    #[tokio::test]
823    async fn test_storage_stats() {
824        let (protocol, _temp) = create_test_protocol().await;
825        let stats = protocol.storage_stats();
826        assert_eq!(stats.chunks_stored, 0);
827    }
828
829    #[tokio::test]
830    async fn test_merkle_candidate_quote_request() {
831        use ant_protocol::payment::verify::verify_merkle_candidate_signature;
832        use evmlib::merkle_payments::MerklePaymentCandidateNode;
833
834        // create_test_protocol already wires ML-DSA-65 signing
835        let (protocol, _temp) = create_test_protocol().await;
836
837        let address = [0x77; 32];
838        let timestamp = std::time::SystemTime::now()
839            .duration_since(std::time::UNIX_EPOCH)
840            .expect("system time")
841            .as_secs();
842
843        let request = MerkleCandidateQuoteRequest {
844            address,
845            data_type: DATA_TYPE_CHUNK,
846            data_size: 4096,
847            merkle_payment_timestamp: timestamp,
848        };
849        let msg = ChunkMessage {
850            request_id: 600,
851            body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
852        };
853        let msg_bytes = msg.encode().expect("encode request");
854
855        let response_bytes = protocol
856            .try_handle_request(&msg_bytes)
857            .await
858            .expect("handle merkle candidate quote")
859            .expect("expected response");
860        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
861
862        assert_eq!(response.request_id, 600);
863        match response.body {
864            ChunkMessageBody::MerkleCandidateQuoteResponse(
865                MerkleCandidateQuoteResponse::Success { candidate_node },
866            ) => {
867                let candidate: MerklePaymentCandidateNode =
868                    rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
869
870                // Verify ML-DSA-65 signature
871                assert!(
872                    verify_merkle_candidate_signature(&candidate),
873                    "ML-DSA-65 candidate signature must be valid"
874                );
875
876                assert_eq!(candidate.merkle_payment_timestamp, timestamp);
877                // Node-calculated price based on records stored
878                assert!(candidate.price >= evmlib::common::Amount::ZERO);
879            }
880            other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
881        }
882    }
883
884    #[tokio::test]
885    async fn test_handle_unexpected_response_message() {
886        let (protocol, _temp) = create_test_protocol().await;
887
888        // Send a PutResponse as if it were a request — should return None
889        let msg = ChunkMessage {
890            request_id: 200,
891            body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
892        };
893        let msg_bytes = msg.encode().expect("encode");
894
895        let result = protocol
896            .try_handle_request(&msg_bytes)
897            .await
898            .expect("handle msg");
899
900        assert!(
901            result.is_none(),
902            "expected None for response message, got: {result:?}"
903        );
904    }
905
906    #[tokio::test]
907    async fn test_quote_already_stored_flag() {
908        let (protocol, _temp) = create_test_protocol().await;
909
910        let content = b"already stored quote test";
911        let address = LmdbStorage::compute_address(content);
912
913        // Store the chunk first
914        protocol.payment_verifier().cache_insert(address);
915        let put_request = ChunkPutRequest::new(address, content.to_vec());
916        let put_msg = ChunkMessage {
917            request_id: 300,
918            body: ChunkMessageBody::PutRequest(put_request),
919        };
920        let put_bytes = put_msg.encode().expect("encode put");
921        let _ = protocol
922            .try_handle_request(&put_bytes)
923            .await
924            .expect("handle put");
925
926        // Now request a quote for the same address — already_stored should be true
927        let quote_request = ChunkQuoteRequest {
928            address,
929            data_size: content.len() as u64,
930            data_type: DATA_TYPE_CHUNK,
931        };
932        let quote_msg = ChunkMessage {
933            request_id: 301,
934            body: ChunkMessageBody::QuoteRequest(quote_request),
935        };
936        let quote_bytes = quote_msg.encode().expect("encode quote");
937        let response_bytes = protocol
938            .try_handle_request(&quote_bytes)
939            .await
940            .expect("handle quote")
941            .expect("expected response");
942        let response = ChunkMessage::decode(&response_bytes).expect("decode");
943
944        match response.body {
945            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
946                already_stored, ..
947            }) => {
948                assert!(
949                    already_stored,
950                    "already_stored should be true for existing chunk"
951                );
952            }
953            other => panic!("expected Success with already_stored, got: {other:?}"),
954        }
955
956        // Request a quote for a chunk that does NOT exist — already_stored should be false
957        let new_address = [0xFFu8; 32];
958        let quote_request2 = ChunkQuoteRequest {
959            address: new_address,
960            data_size: 100,
961            data_type: DATA_TYPE_CHUNK,
962        };
963        let quote_msg2 = ChunkMessage {
964            request_id: 302,
965            body: ChunkMessageBody::QuoteRequest(quote_request2),
966        };
967        let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
968        let response_bytes2 = protocol
969            .try_handle_request(&quote_bytes2)
970            .await
971            .expect("handle quote2")
972            .expect("expected response");
973        let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
974
975        match response2.body {
976            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
977                already_stored, ..
978            }) => {
979                assert!(
980                    !already_stored,
981                    "already_stored should be false for new chunk"
982                );
983            }
984            other => panic!("expected Success with already_stored=false, got: {other:?}"),
985        }
986    }
987}