Skip to main content

ant_node/storage/
handler.rs

1//! ANT protocol handler for autonomi protocol messages.
2//!
3//! This handler processes chunk PUT/GET requests with optional payment verification,
4//! storing chunks to LMDB and using the DHT for network-wide retrieval.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────┐
10//! │                    AntProtocol                        │
11//! ├─────────────────────────────────────────────────────────┤
12//! │  protocol_id() = "autonomi/ant/chunk/v1"                  │
13//! │                                                         │
14//! │  handle_message(data) ──▶ decode ChunkMessage  │
15//! │                                   │                     │
16//! │         ┌─────────────────────────┼─────────────────┐  │
17//! │         ▼                         ▼                 ▼  │
18//! │   ChunkQuoteRequest           ChunkPutRequest    ChunkGetRequest
19//! │         │                         │                 │  │
20//! │         ▼                         ▼                 ▼  │
21//! │   QuoteGenerator          PaymentVerifier    LmdbStorage│
22//! │         │                         │                 │  │
23//! │         └─────────────────────────┴─────────────────┘  │
24//! │                           │                             │
25//! │                 return Ok(response_bytes)               │
26//! └─────────────────────────────────────────────────────────┘
27//! ```
28
29use crate::ant_protocol::{
30    ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
31    ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
32    MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK,
33    MAX_CHUNK_SIZE,
34};
35use crate::client::compute_address;
36use crate::error::{Error, Result};
37use crate::payment::{PaymentVerifier, QuoteGenerator};
38use crate::storage::lmdb::LmdbStorage;
39use bytes::Bytes;
40use std::sync::Arc;
41use tracing::{debug, info, warn};
42
43/// ANT protocol handler.
44///
45/// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence
46/// and optional payment verification.
47pub struct AntProtocol {
48    /// LMDB storage for chunk persistence.
49    storage: Arc<LmdbStorage>,
50    /// Payment verifier for checking payments.
51    payment_verifier: Arc<PaymentVerifier>,
52    /// Quote generator for creating storage quotes.
53    /// Also handles merkle candidate quote signing via ML-DSA-65.
54    quote_generator: Arc<QuoteGenerator>,
55}
56
57impl AntProtocol {
58    /// Create a new ANT protocol handler.
59    ///
60    /// # Arguments
61    ///
62    /// * `storage` - LMDB storage for chunk persistence
63    /// * `payment_verifier` - Payment verifier for validating payments
64    /// * `quote_generator` - Quote generator for creating storage quotes
65    #[must_use]
66    pub fn new(
67        storage: Arc<LmdbStorage>,
68        payment_verifier: Arc<PaymentVerifier>,
69        quote_generator: Arc<QuoteGenerator>,
70    ) -> Self {
71        Self {
72            storage,
73            payment_verifier,
74            quote_generator,
75        }
76    }
77
78    /// Get the protocol identifier.
79    #[must_use]
80    pub fn protocol_id(&self) -> &'static str {
81        CHUNK_PROTOCOL_ID
82    }
83
84    /// Handle an incoming request and produce a response.
85    ///
86    /// Decodes the raw message, processes it if it is a request variant,
87    /// and returns the encoded response bytes.  Returns `Ok(None)` for
88    /// response messages (which are meant for client subscribers, not for
89    /// the protocol handler).
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if message decoding, handling, or encoding fails.
94    pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
95        let message = ChunkMessage::decode(data)
96            .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
97
98        let request_id = message.request_id;
99
100        let response_body = match message.body {
101            ChunkMessageBody::PutRequest(req) => {
102                ChunkMessageBody::PutResponse(self.handle_put(req).await)
103            }
104            ChunkMessageBody::GetRequest(req) => {
105                ChunkMessageBody::GetResponse(self.handle_get(req).await)
106            }
107            ChunkMessageBody::QuoteRequest(ref req) => {
108                ChunkMessageBody::QuoteResponse(self.handle_quote(req))
109            }
110            ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
111                ChunkMessageBody::MerkleCandidateQuoteResponse(
112                    self.handle_merkle_candidate_quote(req),
113                )
114            }
115            // Response messages are handled by client subscribers
116            // (e.g. send_and_await_chunk_response), not by the protocol
117            // handler. Returning None prevents the caller from sending a
118            // reply, which would create an infinite ping-pong loop.
119            ChunkMessageBody::PutResponse(_)
120            | ChunkMessageBody::GetResponse(_)
121            | ChunkMessageBody::QuoteResponse(_)
122            | ChunkMessageBody::MerkleCandidateQuoteResponse(_) => return Ok(None),
123        };
124
125        let response = ChunkMessage {
126            request_id,
127            body: response_body,
128        };
129
130        response
131            .encode()
132            .map(|b| Some(Bytes::from(b)))
133            .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
134    }
135
136    /// Handle an incoming protocol message.
137    ///
138    /// **Deprecated**: Use [`try_handle_request`](Self::try_handle_request)
139    /// instead. This method always returns response bytes — even for
140    /// response messages that should be ignored — which can create an
141    /// infinite ping-pong loop if the caller blindly sends the result back.
142    ///
143    /// # Errors
144    ///
145    /// Returns an error if message decoding or handling fails.
146    pub async fn handle_message(&self, data: &[u8]) -> Result<Bytes> {
147        let message = ChunkMessage::decode(data)
148            .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
149
150        let request_id = message.request_id;
151
152        let response_body = match message.body {
153            ChunkMessageBody::PutRequest(req) => {
154                ChunkMessageBody::PutResponse(self.handle_put(req).await)
155            }
156            ChunkMessageBody::GetRequest(req) => {
157                ChunkMessageBody::GetResponse(self.handle_get(req).await)
158            }
159            ChunkMessageBody::QuoteRequest(ref req) => {
160                ChunkMessageBody::QuoteResponse(self.handle_quote(req))
161            }
162            ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
163                ChunkMessageBody::MerkleCandidateQuoteResponse(
164                    self.handle_merkle_candidate_quote(req),
165                )
166            }
167            ChunkMessageBody::PutResponse(_)
168            | ChunkMessageBody::GetResponse(_)
169            | ChunkMessageBody::QuoteResponse(_)
170            | ChunkMessageBody::MerkleCandidateQuoteResponse(_) => {
171                let error = ProtocolError::Internal("Unexpected response message".to_string());
172                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(error))
173            }
174        };
175
176        let response = ChunkMessage {
177            request_id,
178            body: response_body,
179        };
180
181        response
182            .encode()
183            .map(Bytes::from)
184            .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
185    }
186
187    /// Handle a PUT request.
188    async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
189        let address = request.address;
190        let addr_hex = hex::encode(address);
191        debug!("Handling PUT request for {addr_hex}");
192
193        // 1. Validate chunk size
194        if request.content.len() > MAX_CHUNK_SIZE {
195            return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
196                size: request.content.len(),
197                max_size: MAX_CHUNK_SIZE,
198            });
199        }
200
201        // 2. Verify content address matches BLAKE3(content)
202        let computed = compute_address(&request.content);
203        if computed != address {
204            return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
205                expected: address,
206                actual: computed,
207            });
208        }
209
210        // 3. Check if already exists (idempotent success)
211        match self.storage.exists(&address) {
212            Ok(true) => {
213                debug!("Chunk {addr_hex} already exists");
214                return ChunkPutResponse::AlreadyExists { address };
215            }
216            Err(e) => {
217                return ChunkPutResponse::Error(ProtocolError::Internal(format!(
218                    "Storage read failed: {e}"
219                )));
220            }
221            Ok(false) => {}
222        }
223
224        // 4. Verify payment
225        let payment_result = self
226            .payment_verifier
227            .verify_payment(&address, request.payment_proof.as_deref())
228            .await;
229
230        match payment_result {
231            Ok(status) if status.can_store() => {
232                // Payment verified or cached
233            }
234            Ok(_) => {
235                return ChunkPutResponse::PaymentRequired {
236                    message: "Payment required for new chunk".to_string(),
237                };
238            }
239            Err(e) => {
240                return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
241            }
242        }
243
244        // 5. Store chunk
245        match self.storage.put(&address, &request.content).await {
246            Ok(_) => {
247                let content_len = request.content.len();
248                info!("Stored chunk {addr_hex} ({content_len} bytes)");
249                // Record the store and payment in metrics
250                self.quote_generator.record_store(DATA_TYPE_CHUNK);
251                self.quote_generator.record_payment();
252                ChunkPutResponse::Success { address }
253            }
254            Err(e) => {
255                warn!("Failed to store chunk {addr_hex}: {e}");
256                ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
257            }
258        }
259    }
260
261    /// Handle a GET request.
262    async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
263        let address = request.address;
264        let addr_hex = hex::encode(address);
265        debug!("Handling GET request for {addr_hex}");
266
267        match self.storage.get(&address).await {
268            Ok(Some(content)) => {
269                let content_len = content.len();
270                debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
271                ChunkGetResponse::Success { address, content }
272            }
273            Ok(None) => {
274                debug!("Chunk {addr_hex} not found");
275                ChunkGetResponse::NotFound { address }
276            }
277            Err(e) => {
278                warn!("Failed to retrieve chunk {addr_hex}: {e}");
279                ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
280            }
281        }
282    }
283
284    /// Handle a quote request.
285    fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
286        let addr_hex = hex::encode(request.address);
287        let data_size = request.data_size;
288        debug!("Handling quote request for {addr_hex} (size: {data_size})");
289
290        // Check if the chunk is already stored so we can tell the client
291        // to skip payment (already_stored = true).
292        let already_stored = match self.storage.exists(&request.address) {
293            Ok(exists) => exists,
294            Err(e) => {
295                warn!("Storage check failed for {addr_hex}: {e}");
296                false // Assume not stored on error — generate a normal quote.
297            }
298        };
299
300        if already_stored {
301            debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
302        }
303
304        // Validate data size - data_size is u64, cast carefully and reject overflow
305        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
306            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
307                size: MAX_CHUNK_SIZE + 1,
308                max_size: MAX_CHUNK_SIZE,
309            });
310        };
311        if data_size_usize > MAX_CHUNK_SIZE {
312            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
313                size: data_size_usize,
314                max_size: MAX_CHUNK_SIZE,
315            });
316        }
317
318        match self
319            .quote_generator
320            .create_quote(request.address, data_size_usize, request.data_type)
321        {
322            Ok(quote) => {
323                // Serialize the quote
324                match rmp_serde::to_vec(&quote) {
325                    Ok(quote_bytes) => ChunkQuoteResponse::Success {
326                        quote: quote_bytes,
327                        already_stored,
328                    },
329                    Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
330                        "Failed to serialize quote: {e}"
331                    ))),
332                }
333            }
334            Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
335        }
336    }
337
338    /// Handle a merkle candidate quote request.
339    fn handle_merkle_candidate_quote(
340        &self,
341        request: &MerkleCandidateQuoteRequest,
342    ) -> MerkleCandidateQuoteResponse {
343        let addr_hex = hex::encode(request.address);
344        let data_size = request.data_size;
345        debug!(
346            "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
347            request.merkle_payment_timestamp
348        );
349
350        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
351            return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
352                "data_size {} overflows usize",
353                request.data_size
354            )));
355        };
356        if data_size_usize > MAX_CHUNK_SIZE {
357            return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
358                size: data_size_usize,
359                max_size: MAX_CHUNK_SIZE,
360            });
361        }
362
363        match self.quote_generator.create_merkle_candidate_quote(
364            data_size_usize,
365            request.data_type,
366            request.merkle_payment_timestamp,
367        ) {
368            Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
369                Ok(bytes) => MerkleCandidateQuoteResponse::Success {
370                    candidate_node: bytes,
371                },
372                Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
373                    "Failed to serialize merkle candidate node: {e}"
374                ))),
375            },
376            Err(e) => {
377                MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
378            }
379        }
380    }
381
382    /// Get storage statistics.
383    #[must_use]
384    pub fn storage_stats(&self) -> crate::storage::StorageStats {
385        self.storage.stats()
386    }
387
388    /// Get payment cache statistics.
389    #[must_use]
390    pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
391        self.payment_verifier.cache_stats()
392    }
393
394    /// Get a reference to the payment verifier.
395    ///
396    /// Exposed for **test harnesses only** — production code should not call
397    /// this directly. Use `cache_insert()` on the returned verifier to
398    /// pre-populate the payment cache in test setups.
399    #[cfg(any(test, feature = "test-utils"))]
400    #[must_use]
401    pub fn payment_verifier(&self) -> &PaymentVerifier {
402        &self.payment_verifier
403    }
404
405    /// Check if a chunk exists locally.
406    ///
407    /// # Errors
408    ///
409    /// Returns an error if the storage read fails.
410    pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
411        self.storage.exists(address)
412    }
413
414    /// Get a chunk directly from local storage.
415    ///
416    /// # Errors
417    ///
418    /// Returns an error if storage access fails.
419    pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
420        self.storage.get(address).await
421    }
422
423    /// Store a chunk directly to local storage (bypasses payment verification).
424    ///
425    /// TEST ONLY - This method bypasses payment verification and should only be used in tests.
426    ///
427    /// # Errors
428    ///
429    /// Returns an error if storage fails or content doesn't match address.
430    #[cfg(test)]
431    pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
432        self.storage.put(address, content).await
433    }
434}
435
436#[cfg(test)]
437#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
438mod tests {
439    use super::*;
440    use crate::payment::metrics::QuotingMetricsTracker;
441    use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
442    use crate::storage::LmdbStorageConfig;
443    use ant_evm::RewardsAddress;
444    use saorsa_core::identity::NodeIdentity;
445    use saorsa_core::MlDsa65;
446    use saorsa_pqc::pqc::types::MlDsaSecretKey;
447    use tempfile::TempDir;
448
449    async fn create_test_protocol() -> (AntProtocol, TempDir) {
450        let temp_dir = TempDir::new().expect("create temp dir");
451
452        let storage_config = LmdbStorageConfig {
453            root_dir: temp_dir.path().to_path_buf(),
454            verify_on_read: true,
455            max_chunks: 0,
456            max_map_size: 0,
457        };
458        let storage = Arc::new(
459            LmdbStorage::new(storage_config)
460                .await
461                .expect("create storage"),
462        );
463
464        let rewards_address = RewardsAddress::new([1u8; 20]);
465        let payment_config = PaymentVerifierConfig {
466            evm: EvmVerifierConfig::default(),
467            cache_capacity: 100_000,
468            local_rewards_address: rewards_address,
469        };
470        let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
471        let metrics_tracker = QuotingMetricsTracker::new(1000, 100);
472        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
473
474        // Wire ML-DSA-65 signing so quote requests succeed
475        let identity = NodeIdentity::generate().expect("generate identity");
476        let pub_key_bytes = identity.public_key().as_bytes().to_vec();
477        let sk_bytes = identity.secret_key_bytes().to_vec();
478        let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
479        quote_generator.set_signer(pub_key_bytes, move |msg| {
480            use saorsa_pqc::pqc::MlDsaOperations;
481            let ml_dsa = MlDsa65::new();
482            ml_dsa
483                .sign(&sk, msg)
484                .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
485        });
486
487        let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
488        (protocol, temp_dir)
489    }
490
491    #[tokio::test]
492    async fn test_put_and_get_chunk() {
493        let (protocol, _temp) = create_test_protocol().await;
494
495        let content = b"hello world";
496        let address = LmdbStorage::compute_address(content);
497
498        // Pre-populate payment cache so EVM verification is bypassed
499        protocol.payment_verifier().cache_insert(address);
500
501        let put_request = ChunkPutRequest::new(address, content.to_vec());
502        let put_msg = ChunkMessage {
503            request_id: 1,
504            body: ChunkMessageBody::PutRequest(put_request),
505        };
506        let put_bytes = put_msg.encode().expect("encode put");
507
508        // Handle PUT
509        let response_bytes = protocol
510            .handle_message(&put_bytes)
511            .await
512            .expect("handle put");
513        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
514
515        assert_eq!(response.request_id, 1);
516        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
517            response.body
518        {
519            assert_eq!(addr, address);
520        } else {
521            panic!("expected PutResponse::Success, got: {response:?}");
522        }
523
524        // Create GET request
525        let get_request = ChunkGetRequest::new(address);
526        let get_msg = ChunkMessage {
527            request_id: 2,
528            body: ChunkMessageBody::GetRequest(get_request),
529        };
530        let get_bytes = get_msg.encode().expect("encode get");
531
532        // Handle GET
533        let response_bytes = protocol
534            .handle_message(&get_bytes)
535            .await
536            .expect("handle get");
537        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
538
539        assert_eq!(response.request_id, 2);
540        if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
541            address: addr,
542            content: data,
543        }) = response.body
544        {
545            assert_eq!(addr, address);
546            assert_eq!(data, content.to_vec());
547        } else {
548            panic!("expected GetResponse::Success");
549        }
550    }
551
552    #[tokio::test]
553    async fn test_get_not_found() {
554        let (protocol, _temp) = create_test_protocol().await;
555
556        let address = [0xAB; 32];
557        let get_request = ChunkGetRequest::new(address);
558        let get_msg = ChunkMessage {
559            request_id: 10,
560            body: ChunkMessageBody::GetRequest(get_request),
561        };
562        let get_bytes = get_msg.encode().expect("encode get");
563
564        let response_bytes = protocol
565            .handle_message(&get_bytes)
566            .await
567            .expect("handle get");
568        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
569
570        assert_eq!(response.request_id, 10);
571        if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
572            response.body
573        {
574            assert_eq!(addr, address);
575        } else {
576            panic!("expected GetResponse::NotFound");
577        }
578    }
579
580    #[tokio::test]
581    async fn test_put_address_mismatch() {
582        let (protocol, _temp) = create_test_protocol().await;
583
584        let content = b"test content";
585        let wrong_address = [0xFF; 32]; // Wrong address
586
587        // Pre-populate cache for the wrong address so we test address mismatch, not payment
588        protocol.payment_verifier().cache_insert(wrong_address);
589
590        let put_request = ChunkPutRequest::new(wrong_address, content.to_vec());
591        let put_msg = ChunkMessage {
592            request_id: 20,
593            body: ChunkMessageBody::PutRequest(put_request),
594        };
595        let put_bytes = put_msg.encode().expect("encode put");
596
597        let response_bytes = protocol
598            .handle_message(&put_bytes)
599            .await
600            .expect("handle put");
601        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
602
603        assert_eq!(response.request_id, 20);
604        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
605            ProtocolError::AddressMismatch { .. },
606        )) = response.body
607        {
608            // Expected
609        } else {
610            panic!("expected AddressMismatch error, got: {response:?}");
611        }
612    }
613
614    #[tokio::test]
615    async fn test_put_chunk_too_large() {
616        let (protocol, _temp) = create_test_protocol().await;
617
618        // Create oversized content
619        let content = vec![0u8; MAX_CHUNK_SIZE + 1];
620        let address = LmdbStorage::compute_address(&content);
621
622        let put_request = ChunkPutRequest::new(address, content);
623        let put_msg = ChunkMessage {
624            request_id: 30,
625            body: ChunkMessageBody::PutRequest(put_request),
626        };
627        let put_bytes = put_msg.encode().expect("encode put");
628
629        let response_bytes = protocol
630            .handle_message(&put_bytes)
631            .await
632            .expect("handle put");
633        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
634
635        assert_eq!(response.request_id, 30);
636        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
637            ProtocolError::ChunkTooLarge { .. },
638        )) = response.body
639        {
640            // Expected
641        } else {
642            panic!("expected ChunkTooLarge error");
643        }
644    }
645
646    #[tokio::test]
647    async fn test_put_already_exists() {
648        let (protocol, _temp) = create_test_protocol().await;
649
650        let content = b"duplicate content";
651        let address = LmdbStorage::compute_address(content);
652
653        // Pre-populate cache so EVM verification is bypassed
654        protocol.payment_verifier().cache_insert(address);
655
656        let put_request = ChunkPutRequest::new(address, content.to_vec());
657        let put_msg = ChunkMessage {
658            request_id: 40,
659            body: ChunkMessageBody::PutRequest(put_request),
660        };
661        let put_bytes = put_msg.encode().expect("encode put");
662
663        let _ = protocol
664            .handle_message(&put_bytes)
665            .await
666            .expect("handle put");
667
668        // Store again - should return AlreadyExists
669        let response_bytes = protocol
670            .handle_message(&put_bytes)
671            .await
672            .expect("handle put 2");
673        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
674
675        assert_eq!(response.request_id, 40);
676        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
677            response.body
678        {
679            assert_eq!(addr, address);
680        } else {
681            panic!("expected AlreadyExists");
682        }
683    }
684
685    #[tokio::test]
686    async fn test_protocol_id() {
687        let (protocol, _temp) = create_test_protocol().await;
688        assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
689    }
690
691    #[tokio::test]
692    async fn test_exists_and_local_access() {
693        let (protocol, _temp) = create_test_protocol().await;
694
695        let content = b"local access test";
696        let address = LmdbStorage::compute_address(content);
697
698        assert!(!protocol.exists(&address).expect("exists check"));
699
700        protocol
701            .put_local(&address, content)
702            .await
703            .expect("put local");
704
705        assert!(protocol.exists(&address).expect("exists check"));
706
707        let retrieved = protocol.get_local(&address).await.expect("get local");
708        assert_eq!(retrieved, Some(content.to_vec()));
709    }
710
711    #[tokio::test]
712    async fn test_cache_insert_is_visible() {
713        let (protocol, _temp) = create_test_protocol().await;
714
715        let content = b"cache test content";
716        let address = LmdbStorage::compute_address(content);
717
718        // Before insert: cache should be empty
719        let stats_before = protocol.payment_cache_stats();
720        assert_eq!(stats_before.additions, 0);
721
722        // Pre-populate cache
723        protocol.payment_verifier().cache_insert(address);
724
725        // After insert: cache should have the xorname
726        let stats_after = protocol.payment_cache_stats();
727        assert_eq!(stats_after.additions, 1);
728
729        // PUT should succeed (cache hit)
730        let put_request = ChunkPutRequest::new(address, content.to_vec());
731        let put_msg = ChunkMessage {
732            request_id: 100,
733            body: ChunkMessageBody::PutRequest(put_request),
734        };
735        let put_bytes = put_msg.encode().expect("encode put");
736        let response_bytes = protocol
737            .handle_message(&put_bytes)
738            .await
739            .expect("handle put");
740        let response = ChunkMessage::decode(&response_bytes).expect("decode");
741
742        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
743            // expected
744        } else {
745            panic!("expected success, got: {response:?}");
746        }
747    }
748
749    #[tokio::test]
750    async fn test_put_same_chunk_twice_hits_cache() {
751        let (protocol, _temp) = create_test_protocol().await;
752
753        let content = b"duplicate cache test";
754        let address = LmdbStorage::compute_address(content);
755
756        // Pre-populate cache for first PUT
757        protocol.payment_verifier().cache_insert(address);
758
759        // First PUT
760        let put_request = ChunkPutRequest::new(address, content.to_vec());
761        let put_msg = ChunkMessage {
762            request_id: 110,
763            body: ChunkMessageBody::PutRequest(put_request),
764        };
765        let put_bytes = put_msg.encode().expect("encode put");
766        let _ = protocol
767            .handle_message(&put_bytes)
768            .await
769            .expect("handle put 1");
770
771        // Second PUT — should return AlreadyExists (checked in storage before payment)
772        let response_bytes = protocol
773            .handle_message(&put_bytes)
774            .await
775            .expect("handle put 2");
776        let response = ChunkMessage::decode(&response_bytes).expect("decode");
777
778        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
779        {
780            // expected — storage check comes before payment check
781        } else {
782            panic!("expected AlreadyExists, got: {response:?}");
783        }
784    }
785
786    #[tokio::test]
787    async fn test_payment_cache_stats_returns_correct_values() {
788        let (protocol, _temp) = create_test_protocol().await;
789
790        let stats = protocol.payment_cache_stats();
791        assert_eq!(stats.hits, 0);
792        assert_eq!(stats.misses, 0);
793        assert_eq!(stats.additions, 0);
794
795        // Pre-populate cache, then store a chunk to test stats
796        let content = b"stats test";
797        let address = LmdbStorage::compute_address(content);
798        protocol.payment_verifier().cache_insert(address);
799
800        let put_request = ChunkPutRequest::new(address, content.to_vec());
801        let put_msg = ChunkMessage {
802            request_id: 120,
803            body: ChunkMessageBody::PutRequest(put_request),
804        };
805        let put_bytes = put_msg.encode().expect("encode put");
806        let _ = protocol
807            .handle_message(&put_bytes)
808            .await
809            .expect("handle put");
810
811        let stats = protocol.payment_cache_stats();
812        // Should have 1 addition (from cache_insert) + 1 hit (payment verification found cache)
813        assert_eq!(stats.additions, 1);
814        assert_eq!(stats.hits, 1);
815    }
816
817    #[tokio::test]
818    async fn test_storage_stats() {
819        let (protocol, _temp) = create_test_protocol().await;
820        let stats = protocol.storage_stats();
821        assert_eq!(stats.chunks_stored, 0);
822    }
823
824    #[tokio::test]
825    async fn test_merkle_candidate_quote_request() {
826        use crate::payment::quote::verify_merkle_candidate_signature;
827        use ant_evm::merkle_payments::MerklePaymentCandidateNode;
828
829        // create_test_protocol already wires ML-DSA-65 signing
830        let (protocol, _temp) = create_test_protocol().await;
831
832        let address = [0x77; 32];
833        let timestamp = std::time::SystemTime::now()
834            .duration_since(std::time::UNIX_EPOCH)
835            .expect("system time")
836            .as_secs();
837
838        let request = MerkleCandidateQuoteRequest {
839            address,
840            data_type: DATA_TYPE_CHUNK,
841            data_size: 4096,
842            merkle_payment_timestamp: timestamp,
843        };
844        let msg = ChunkMessage {
845            request_id: 600,
846            body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
847        };
848        let msg_bytes = msg.encode().expect("encode request");
849
850        let response_bytes = protocol
851            .handle_message(&msg_bytes)
852            .await
853            .expect("handle merkle candidate quote");
854        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
855
856        assert_eq!(response.request_id, 600);
857        match response.body {
858            ChunkMessageBody::MerkleCandidateQuoteResponse(
859                MerkleCandidateQuoteResponse::Success { candidate_node },
860            ) => {
861                let candidate: MerklePaymentCandidateNode =
862                    rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
863
864                // Verify ML-DSA-65 signature
865                assert!(
866                    verify_merkle_candidate_signature(&candidate),
867                    "ML-DSA-65 candidate signature must be valid"
868                );
869
870                assert_eq!(candidate.merkle_payment_timestamp, timestamp);
871                assert_eq!(candidate.quoting_metrics.data_size, 4096);
872                assert_eq!(candidate.quoting_metrics.data_type, DATA_TYPE_CHUNK);
873            }
874            other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
875        }
876    }
877
878    #[tokio::test]
879    async fn test_handle_unexpected_response_message() {
880        let (protocol, _temp) = create_test_protocol().await;
881
882        // Send a PutResponse as if it were a request
883        let msg = ChunkMessage {
884            request_id: 200,
885            body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
886        };
887        let msg_bytes = msg.encode().expect("encode");
888
889        let response_bytes = protocol
890            .handle_message(&msg_bytes)
891            .await
892            .expect("handle msg");
893        let response = ChunkMessage::decode(&response_bytes).expect("decode");
894
895        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(ProtocolError::Internal(
896            msg,
897        ))) = response.body
898        {
899            assert!(msg.contains("Unexpected"));
900        } else {
901            panic!("expected Internal error, got: {response:?}");
902        }
903    }
904
905    #[tokio::test]
906    async fn test_quote_already_stored_flag() {
907        let (protocol, _temp) = create_test_protocol().await;
908
909        let content = b"already stored quote test";
910        let address = LmdbStorage::compute_address(content);
911
912        // Store the chunk first
913        protocol.payment_verifier().cache_insert(address);
914        let put_request = ChunkPutRequest::new(address, content.to_vec());
915        let put_msg = ChunkMessage {
916            request_id: 300,
917            body: ChunkMessageBody::PutRequest(put_request),
918        };
919        let put_bytes = put_msg.encode().expect("encode put");
920        let _ = protocol
921            .handle_message(&put_bytes)
922            .await
923            .expect("handle put");
924
925        // Now request a quote for the same address — already_stored should be true
926        let quote_request = ChunkQuoteRequest {
927            address,
928            data_size: content.len() as u64,
929            data_type: DATA_TYPE_CHUNK,
930        };
931        let quote_msg = ChunkMessage {
932            request_id: 301,
933            body: ChunkMessageBody::QuoteRequest(quote_request),
934        };
935        let quote_bytes = quote_msg.encode().expect("encode quote");
936        let response_bytes = protocol
937            .handle_message(&quote_bytes)
938            .await
939            .expect("handle quote");
940        let response = ChunkMessage::decode(&response_bytes).expect("decode");
941
942        match response.body {
943            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
944                already_stored, ..
945            }) => {
946                assert!(
947                    already_stored,
948                    "already_stored should be true for existing chunk"
949                );
950            }
951            other => panic!("expected Success with already_stored, got: {other:?}"),
952        }
953
954        // Request a quote for a chunk that does NOT exist — already_stored should be false
955        let new_address = [0xFFu8; 32];
956        let quote_request2 = ChunkQuoteRequest {
957            address: new_address,
958            data_size: 100,
959            data_type: DATA_TYPE_CHUNK,
960        };
961        let quote_msg2 = ChunkMessage {
962            request_id: 302,
963            body: ChunkMessageBody::QuoteRequest(quote_request2),
964        };
965        let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
966        let response_bytes2 = protocol
967            .handle_message(&quote_bytes2)
968            .await
969            .expect("handle quote2");
970        let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
971
972        match response2.body {
973            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
974                already_stored, ..
975            }) => {
976                assert!(
977                    !already_stored,
978                    "already_stored should be false for new chunk"
979                );
980            }
981            other => panic!("expected Success with already_stored=false, got: {other:?}"),
982        }
983    }
984}