Skip to main content

ant_node/storage/
handler.rs

1//! ANT protocol handler for autonomi protocol messages.
2//!
3//! This handler processes chunk PUT/GET requests with optional payment verification,
4//! storing chunks to LMDB and using the DHT for network-wide retrieval.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────┐
10//! │                    AntProtocol                        │
11//! ├─────────────────────────────────────────────────────────┤
12//! │  protocol_id() = "autonomi.ant.chunk.v1"                  │
13//! │                                                         │
14//! │  try_handle_request(data) ──▶ decode ChunkMessage  │
15//! │                                   │                     │
16//! │         ┌─────────────────────────┼─────────────────┐  │
17//! │         ▼                         ▼                 ▼  │
18//! │   ChunkQuoteRequest           ChunkPutRequest    ChunkGetRequest
19//! │         │                         │                 │  │
20//! │         ▼                         ▼                 ▼  │
21//! │   QuoteGenerator          PaymentVerifier    LmdbStorage│
22//! │         │                         │                 │  │
23//! │         └─────────────────────────┴─────────────────┘  │
24//! │                           │                             │
25//! │           return Ok(Some(response_bytes))              │
26//! │           return Ok(None) for response messages       │
27//! └─────────────────────────────────────────────────────────┘
28//! ```
29
30use crate::ant_protocol::{
31    ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
32    ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
33    MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK,
34    MAX_CHUNK_SIZE,
35};
36use crate::client::compute_address;
37use crate::error::{Error, Result};
38use crate::logging::{debug, info, warn};
39use crate::payment::{PaymentVerifier, QuoteGenerator};
40use crate::replication::fresh::FreshWriteEvent;
41use crate::storage::lmdb::LmdbStorage;
42use bytes::Bytes;
43use std::sync::Arc;
44use tokio::sync::mpsc;
45
46/// ANT protocol handler.
47///
48/// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence
49/// and optional payment verification.
50pub struct AntProtocol {
51    /// LMDB storage for chunk persistence.
52    storage: Arc<LmdbStorage>,
53    /// Payment verifier for checking payments.
54    payment_verifier: Arc<PaymentVerifier>,
55    /// Quote generator for creating storage quotes.
56    /// Also handles merkle candidate quote signing via ML-DSA-65.
57    quote_generator: Arc<QuoteGenerator>,
58    /// Channel for notifying the replication engine about newly-stored chunks.
59    fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
60}
61
62impl AntProtocol {
63    /// Create a new ANT protocol handler.
64    ///
65    /// # Arguments
66    ///
67    /// * `storage` - LMDB storage for chunk persistence
68    /// * `payment_verifier` - Payment verifier for validating payments
69    /// * `quote_generator` - Quote generator for creating storage quotes
70    #[must_use]
71    pub fn new(
72        storage: Arc<LmdbStorage>,
73        payment_verifier: Arc<PaymentVerifier>,
74        quote_generator: Arc<QuoteGenerator>,
75    ) -> Self {
76        Self {
77            storage,
78            payment_verifier,
79            quote_generator,
80            fresh_write_tx: None,
81        }
82    }
83
84    /// Set the channel sender for fresh-write replication events.
85    ///
86    /// When set, successful chunk PUTs will notify the replication engine
87    /// so it can fan out fresh offers to the close group.
88    pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
89        self.fresh_write_tx = Some(tx);
90    }
91
92    /// Get the protocol identifier.
93    #[must_use]
94    pub fn protocol_id(&self) -> &'static str {
95        CHUNK_PROTOCOL_ID
96    }
97
98    /// Get a reference to the underlying LMDB storage.
99    #[must_use]
100    pub fn storage(&self) -> Arc<LmdbStorage> {
101        Arc::clone(&self.storage)
102    }
103
104    /// Get a shared reference to the payment verifier.
105    #[must_use]
106    pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
107        Arc::clone(&self.payment_verifier)
108    }
109
110    /// Handle an incoming request and produce a response.
111    ///
112    /// Decodes the raw message, processes it if it is a request variant,
113    /// and returns the encoded response bytes.  Returns `Ok(None)` for
114    /// response messages (which are meant for client subscribers, not for
115    /// the protocol handler).
116    ///
117    /// # Errors
118    ///
119    /// Returns an error if message decoding, handling, or encoding fails.
120    pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
121        let message = ChunkMessage::decode(data)
122            .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
123
124        let request_id = message.request_id;
125
126        let response_body = match message.body {
127            ChunkMessageBody::PutRequest(req) => {
128                ChunkMessageBody::PutResponse(self.handle_put(req).await)
129            }
130            ChunkMessageBody::GetRequest(req) => {
131                ChunkMessageBody::GetResponse(self.handle_get(req).await)
132            }
133            ChunkMessageBody::QuoteRequest(ref req) => {
134                ChunkMessageBody::QuoteResponse(self.handle_quote(req))
135            }
136            ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
137                ChunkMessageBody::MerkleCandidateQuoteResponse(
138                    self.handle_merkle_candidate_quote(req),
139                )
140            }
141            // Response messages are handled by client subscribers
142            // (e.g. send_and_await_chunk_response), not by the protocol
143            // handler. Returning None prevents the caller from sending a
144            // reply, which would create an infinite ping-pong loop.
145            ChunkMessageBody::PutResponse(_)
146            | ChunkMessageBody::GetResponse(_)
147            | ChunkMessageBody::QuoteResponse(_)
148            | ChunkMessageBody::MerkleCandidateQuoteResponse(_) => return Ok(None),
149        };
150
151        let response = ChunkMessage {
152            request_id,
153            body: response_body,
154        };
155
156        response
157            .encode()
158            .map(|b| Some(Bytes::from(b)))
159            .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
160    }
161
162    /// Handle a PUT request.
163    async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
164        let address = request.address;
165        let addr_hex = hex::encode(address);
166        debug!("Handling PUT request for {addr_hex}");
167
168        // 1. Validate chunk size
169        if request.content.len() > MAX_CHUNK_SIZE {
170            return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
171                size: request.content.len(),
172                max_size: MAX_CHUNK_SIZE,
173            });
174        }
175
176        // 2. Verify content address matches BLAKE3(content)
177        let computed = compute_address(&request.content);
178        if computed != address {
179            return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
180                expected: address,
181                actual: computed,
182            });
183        }
184
185        // 3. Check if already exists (idempotent success)
186        match self.storage.exists(&address) {
187            Ok(true) => {
188                debug!("Chunk {addr_hex} already exists");
189                return ChunkPutResponse::AlreadyExists { address };
190            }
191            Err(e) => {
192                return ChunkPutResponse::Error(ProtocolError::Internal(format!(
193                    "Storage read failed: {e}"
194                )));
195            }
196            Ok(false) => {}
197        }
198
199        // 4. Verify payment
200        let payment_result = self
201            .payment_verifier
202            .verify_payment(&address, request.payment_proof.as_deref())
203            .await;
204
205        match payment_result {
206            Ok(status) if status.can_store() => {
207                // Payment verified or cached
208            }
209            Ok(_) => {
210                return ChunkPutResponse::PaymentRequired {
211                    message: "Payment required for new chunk".to_string(),
212                };
213            }
214            Err(e) => {
215                return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
216            }
217        }
218
219        // 5. Store chunk
220        match self.storage.put(&address, &request.content).await {
221            Ok(_) => {
222                let content_len = request.content.len();
223                info!("Stored chunk {addr_hex} ({content_len} bytes)");
224                // Record the store and payment in metrics
225                self.quote_generator.record_store(DATA_TYPE_CHUNK);
226                self.quote_generator.record_payment();
227
228                // 6. Notify replication engine for fresh fan-out.
229                //    Only emit when a real proof is present — cached-as-verified
230                //    PUTs have no proof to forward, and the chunk would have
231                //    already replicated on the original write that carried one.
232                if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
233                    let event = FreshWriteEvent {
234                        key: address,
235                        data: request.content,
236                        payment_proof: proof,
237                    };
238                    if tx.send(event).is_err() {
239                        debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
240                    }
241                }
242
243                ChunkPutResponse::Success { address }
244            }
245            Err(e) => {
246                warn!("Failed to store chunk {addr_hex}: {e}");
247                ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
248            }
249        }
250    }
251
252    /// Handle a GET request.
253    async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
254        let address = request.address;
255        let addr_hex = hex::encode(address);
256        debug!("Handling GET request for {addr_hex}");
257
258        match self.storage.get(&address).await {
259            Ok(Some(content)) => {
260                let content_len = content.len();
261                debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
262                ChunkGetResponse::Success { address, content }
263            }
264            Ok(None) => {
265                debug!("Chunk {addr_hex} not found");
266                ChunkGetResponse::NotFound { address }
267            }
268            Err(e) => {
269                warn!("Failed to retrieve chunk {addr_hex}: {e}");
270                ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
271            }
272        }
273    }
274
275    /// Handle a quote request.
276    fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
277        let addr_hex = hex::encode(request.address);
278        let data_size = request.data_size;
279        debug!("Handling quote request for {addr_hex} (size: {data_size})");
280
281        // Check if the chunk is already stored so we can tell the client
282        // to skip payment (already_stored = true).
283        // The match intentionally logs the error when the `logging` feature is
284        // active. Clippy suggests `unwrap_or_default()` when logging is compiled
285        // out, but keeping the explicit match preserves the diagnostic intent.
286        #[allow(clippy::manual_unwrap_or_default)]
287        let already_stored = match self.storage.exists(&request.address) {
288            Ok(exists) => exists,
289            Err(e) => {
290                warn!("Storage check failed for {addr_hex}: {e}");
291                false // Assume not stored on error — generate a normal quote.
292            }
293        };
294
295        if already_stored {
296            debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
297        }
298
299        // Validate data size - data_size is u64, cast carefully and reject overflow
300        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
301            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
302                size: MAX_CHUNK_SIZE + 1,
303                max_size: MAX_CHUNK_SIZE,
304            });
305        };
306        if data_size_usize > MAX_CHUNK_SIZE {
307            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
308                size: data_size_usize,
309                max_size: MAX_CHUNK_SIZE,
310            });
311        }
312
313        match self
314            .quote_generator
315            .create_quote(request.address, data_size_usize, request.data_type)
316        {
317            Ok(quote) => {
318                // Serialize the quote
319                match rmp_serde::to_vec(&quote) {
320                    Ok(quote_bytes) => ChunkQuoteResponse::Success {
321                        quote: quote_bytes,
322                        already_stored,
323                    },
324                    Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
325                        "Failed to serialize quote: {e}"
326                    ))),
327                }
328            }
329            Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
330        }
331    }
332
333    /// Handle a merkle candidate quote request.
334    fn handle_merkle_candidate_quote(
335        &self,
336        request: &MerkleCandidateQuoteRequest,
337    ) -> MerkleCandidateQuoteResponse {
338        let addr_hex = hex::encode(request.address);
339        let data_size = request.data_size;
340        debug!(
341            "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
342            request.merkle_payment_timestamp
343        );
344
345        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
346            return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
347                "data_size {} overflows usize",
348                request.data_size
349            )));
350        };
351        if data_size_usize > MAX_CHUNK_SIZE {
352            return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
353                size: data_size_usize,
354                max_size: MAX_CHUNK_SIZE,
355            });
356        }
357
358        match self.quote_generator.create_merkle_candidate_quote(
359            data_size_usize,
360            request.data_type,
361            request.merkle_payment_timestamp,
362        ) {
363            Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
364                Ok(bytes) => MerkleCandidateQuoteResponse::Success {
365                    candidate_node: bytes,
366                },
367                Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
368                    "Failed to serialize merkle candidate node: {e}"
369                ))),
370            },
371            Err(e) => {
372                MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
373            }
374        }
375    }
376
377    /// Get storage statistics.
378    #[must_use]
379    pub fn storage_stats(&self) -> crate::storage::StorageStats {
380        self.storage.stats()
381    }
382
383    /// Get payment cache statistics.
384    #[must_use]
385    pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
386        self.payment_verifier.cache_stats()
387    }
388
389    /// Get a reference to the payment verifier.
390    ///
391    /// Exposed for **test harnesses only** — production code should not call
392    /// this directly. Use `cache_insert()` on the returned verifier to
393    /// pre-populate the payment cache in test setups.
394    #[cfg(any(test, feature = "test-utils"))]
395    #[must_use]
396    pub fn payment_verifier(&self) -> &PaymentVerifier {
397        &self.payment_verifier
398    }
399
400    /// Check if a chunk exists locally.
401    ///
402    /// # Errors
403    ///
404    /// Returns an error if the storage read fails.
405    pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
406        self.storage.exists(address)
407    }
408
409    /// Get a chunk directly from local storage.
410    ///
411    /// # Errors
412    ///
413    /// Returns an error if storage access fails.
414    pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
415        self.storage.get(address).await
416    }
417
418    /// Store a chunk directly to local storage (bypasses payment verification).
419    ///
420    /// TEST ONLY - This method bypasses payment verification and should only be used in tests.
421    ///
422    /// # Errors
423    ///
424    /// Returns an error if storage fails or content doesn't match address.
425    #[cfg(test)]
426    pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
427        self.storage.put(address, content).await
428    }
429}
430
431#[cfg(test)]
432#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
433mod tests {
434    use super::*;
435    use crate::payment::metrics::QuotingMetricsTracker;
436    use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
437    use crate::storage::LmdbStorageConfig;
438    use evmlib::RewardsAddress;
439    use saorsa_core::identity::NodeIdentity;
440    use saorsa_core::MlDsa65;
441    use saorsa_pqc::pqc::types::MlDsaSecretKey;
442    use tempfile::TempDir;
443
444    async fn create_test_protocol() -> (AntProtocol, TempDir) {
445        let temp_dir = TempDir::new().expect("create temp dir");
446
447        let storage_config = LmdbStorageConfig {
448            root_dir: temp_dir.path().to_path_buf(),
449            ..LmdbStorageConfig::test_default()
450        };
451        let storage = Arc::new(
452            LmdbStorage::new(storage_config)
453                .await
454                .expect("create storage"),
455        );
456
457        let rewards_address = RewardsAddress::new([1u8; 20]);
458        let payment_config = PaymentVerifierConfig {
459            evm: EvmVerifierConfig::default(),
460            cache_capacity: 100_000,
461            local_rewards_address: rewards_address,
462        };
463        let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
464        let metrics_tracker = QuotingMetricsTracker::new(100);
465        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
466
467        // Wire ML-DSA-65 signing so quote requests succeed
468        let identity = NodeIdentity::generate().expect("generate identity");
469        let pub_key_bytes = identity.public_key().as_bytes().to_vec();
470        let sk_bytes = identity.secret_key_bytes().to_vec();
471        let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
472        quote_generator.set_signer(pub_key_bytes, move |msg| {
473            use saorsa_pqc::pqc::MlDsaOperations;
474            let ml_dsa = MlDsa65::new();
475            ml_dsa
476                .sign(&sk, msg)
477                .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
478        });
479
480        let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
481        (protocol, temp_dir)
482    }
483
484    #[tokio::test]
485    async fn test_put_and_get_chunk() {
486        let (protocol, _temp) = create_test_protocol().await;
487
488        let content = b"hello world";
489        let address = LmdbStorage::compute_address(content);
490
491        // Pre-populate payment cache so EVM verification is bypassed
492        protocol.payment_verifier().cache_insert(address);
493
494        let put_request = ChunkPutRequest::new(address, content.to_vec());
495        let put_msg = ChunkMessage {
496            request_id: 1,
497            body: ChunkMessageBody::PutRequest(put_request),
498        };
499        let put_bytes = put_msg.encode().expect("encode put");
500
501        // Handle PUT
502        let response_bytes = protocol
503            .try_handle_request(&put_bytes)
504            .await
505            .expect("handle put")
506            .expect("expected response");
507        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
508
509        assert_eq!(response.request_id, 1);
510        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
511            response.body
512        {
513            assert_eq!(addr, address);
514        } else {
515            panic!("expected PutResponse::Success, got: {response:?}");
516        }
517
518        // Create GET request
519        let get_request = ChunkGetRequest::new(address);
520        let get_msg = ChunkMessage {
521            request_id: 2,
522            body: ChunkMessageBody::GetRequest(get_request),
523        };
524        let get_bytes = get_msg.encode().expect("encode get");
525
526        // Handle GET
527        let response_bytes = protocol
528            .try_handle_request(&get_bytes)
529            .await
530            .expect("handle get")
531            .expect("expected response");
532        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
533
534        assert_eq!(response.request_id, 2);
535        if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
536            address: addr,
537            content: data,
538        }) = response.body
539        {
540            assert_eq!(addr, address);
541            assert_eq!(data, content.to_vec());
542        } else {
543            panic!("expected GetResponse::Success");
544        }
545    }
546
547    #[tokio::test]
548    async fn test_get_not_found() {
549        let (protocol, _temp) = create_test_protocol().await;
550
551        let address = [0xAB; 32];
552        let get_request = ChunkGetRequest::new(address);
553        let get_msg = ChunkMessage {
554            request_id: 10,
555            body: ChunkMessageBody::GetRequest(get_request),
556        };
557        let get_bytes = get_msg.encode().expect("encode get");
558
559        let response_bytes = protocol
560            .try_handle_request(&get_bytes)
561            .await
562            .expect("handle get")
563            .expect("expected response");
564        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
565
566        assert_eq!(response.request_id, 10);
567        if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
568            response.body
569        {
570            assert_eq!(addr, address);
571        } else {
572            panic!("expected GetResponse::NotFound");
573        }
574    }
575
576    #[tokio::test]
577    async fn test_put_address_mismatch() {
578        let (protocol, _temp) = create_test_protocol().await;
579
580        let content = b"test content";
581        let wrong_address = [0xFF; 32]; // Wrong address
582
583        // Pre-populate cache for the wrong address so we test address mismatch, not payment
584        protocol.payment_verifier().cache_insert(wrong_address);
585
586        let put_request = ChunkPutRequest::new(wrong_address, content.to_vec());
587        let put_msg = ChunkMessage {
588            request_id: 20,
589            body: ChunkMessageBody::PutRequest(put_request),
590        };
591        let put_bytes = put_msg.encode().expect("encode put");
592
593        let response_bytes = protocol
594            .try_handle_request(&put_bytes)
595            .await
596            .expect("handle put")
597            .expect("expected response");
598        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
599
600        assert_eq!(response.request_id, 20);
601        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
602            ProtocolError::AddressMismatch { .. },
603        )) = response.body
604        {
605            // Expected
606        } else {
607            panic!("expected AddressMismatch error, got: {response:?}");
608        }
609    }
610
611    #[tokio::test]
612    async fn test_put_chunk_too_large() {
613        let (protocol, _temp) = create_test_protocol().await;
614
615        // Create oversized content
616        let content = vec![0u8; MAX_CHUNK_SIZE + 1];
617        let address = LmdbStorage::compute_address(&content);
618
619        let put_request = ChunkPutRequest::new(address, content);
620        let put_msg = ChunkMessage {
621            request_id: 30,
622            body: ChunkMessageBody::PutRequest(put_request),
623        };
624        let put_bytes = put_msg.encode().expect("encode put");
625
626        let response_bytes = protocol
627            .try_handle_request(&put_bytes)
628            .await
629            .expect("handle put")
630            .expect("expected response");
631        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
632
633        assert_eq!(response.request_id, 30);
634        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
635            ProtocolError::ChunkTooLarge { .. },
636        )) = response.body
637        {
638            // Expected
639        } else {
640            panic!("expected ChunkTooLarge error");
641        }
642    }
643
644    #[tokio::test]
645    async fn test_put_already_exists() {
646        let (protocol, _temp) = create_test_protocol().await;
647
648        let content = b"duplicate content";
649        let address = LmdbStorage::compute_address(content);
650
651        // Pre-populate cache so EVM verification is bypassed
652        protocol.payment_verifier().cache_insert(address);
653
654        let put_request = ChunkPutRequest::new(address, content.to_vec());
655        let put_msg = ChunkMessage {
656            request_id: 40,
657            body: ChunkMessageBody::PutRequest(put_request),
658        };
659        let put_bytes = put_msg.encode().expect("encode put");
660
661        let _ = protocol
662            .try_handle_request(&put_bytes)
663            .await
664            .expect("handle put");
665
666        // Store again - should return AlreadyExists
667        let response_bytes = protocol
668            .try_handle_request(&put_bytes)
669            .await
670            .expect("handle put 2")
671            .expect("expected response");
672        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
673
674        assert_eq!(response.request_id, 40);
675        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
676            response.body
677        {
678            assert_eq!(addr, address);
679        } else {
680            panic!("expected AlreadyExists");
681        }
682    }
683
684    #[tokio::test]
685    async fn test_protocol_id() {
686        let (protocol, _temp) = create_test_protocol().await;
687        assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
688    }
689
690    #[tokio::test]
691    async fn test_exists_and_local_access() {
692        let (protocol, _temp) = create_test_protocol().await;
693
694        let content = b"local access test";
695        let address = LmdbStorage::compute_address(content);
696
697        assert!(!protocol.exists(&address).expect("exists check"));
698
699        protocol
700            .put_local(&address, content)
701            .await
702            .expect("put local");
703
704        assert!(protocol.exists(&address).expect("exists check"));
705
706        let retrieved = protocol.get_local(&address).await.expect("get local");
707        assert_eq!(retrieved, Some(content.to_vec()));
708    }
709
710    #[tokio::test]
711    async fn test_cache_insert_is_visible() {
712        let (protocol, _temp) = create_test_protocol().await;
713
714        let content = b"cache test content";
715        let address = LmdbStorage::compute_address(content);
716
717        // Before insert: cache should be empty
718        let stats_before = protocol.payment_cache_stats();
719        assert_eq!(stats_before.additions, 0);
720
721        // Pre-populate cache
722        protocol.payment_verifier().cache_insert(address);
723
724        // After insert: cache should have the xorname
725        let stats_after = protocol.payment_cache_stats();
726        assert_eq!(stats_after.additions, 1);
727
728        // PUT should succeed (cache hit)
729        let put_request = ChunkPutRequest::new(address, content.to_vec());
730        let put_msg = ChunkMessage {
731            request_id: 100,
732            body: ChunkMessageBody::PutRequest(put_request),
733        };
734        let put_bytes = put_msg.encode().expect("encode put");
735        let response_bytes = protocol
736            .try_handle_request(&put_bytes)
737            .await
738            .expect("handle put")
739            .expect("expected response");
740        let response = ChunkMessage::decode(&response_bytes).expect("decode");
741
742        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
743            // expected
744        } else {
745            panic!("expected success, got: {response:?}");
746        }
747    }
748
749    #[tokio::test]
750    async fn test_put_same_chunk_twice_hits_cache() {
751        let (protocol, _temp) = create_test_protocol().await;
752
753        let content = b"duplicate cache test";
754        let address = LmdbStorage::compute_address(content);
755
756        // Pre-populate cache for first PUT
757        protocol.payment_verifier().cache_insert(address);
758
759        // First PUT
760        let put_request = ChunkPutRequest::new(address, content.to_vec());
761        let put_msg = ChunkMessage {
762            request_id: 110,
763            body: ChunkMessageBody::PutRequest(put_request),
764        };
765        let put_bytes = put_msg.encode().expect("encode put");
766        let _ = protocol
767            .try_handle_request(&put_bytes)
768            .await
769            .expect("handle put 1");
770
771        // Second PUT — should return AlreadyExists (checked in storage before payment)
772        let response_bytes = protocol
773            .try_handle_request(&put_bytes)
774            .await
775            .expect("handle put 2")
776            .expect("expected response");
777        let response = ChunkMessage::decode(&response_bytes).expect("decode");
778
779        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
780        {
781            // expected — storage check comes before payment check
782        } else {
783            panic!("expected AlreadyExists, got: {response:?}");
784        }
785    }
786
787    #[tokio::test]
788    async fn test_payment_cache_stats_returns_correct_values() {
789        let (protocol, _temp) = create_test_protocol().await;
790
791        let stats = protocol.payment_cache_stats();
792        assert_eq!(stats.hits, 0);
793        assert_eq!(stats.misses, 0);
794        assert_eq!(stats.additions, 0);
795
796        // Pre-populate cache, then store a chunk to test stats
797        let content = b"stats test";
798        let address = LmdbStorage::compute_address(content);
799        protocol.payment_verifier().cache_insert(address);
800
801        let put_request = ChunkPutRequest::new(address, content.to_vec());
802        let put_msg = ChunkMessage {
803            request_id: 120,
804            body: ChunkMessageBody::PutRequest(put_request),
805        };
806        let put_bytes = put_msg.encode().expect("encode put");
807        let _ = protocol
808            .try_handle_request(&put_bytes)
809            .await
810            .expect("handle put");
811
812        let stats = protocol.payment_cache_stats();
813        // Should have 1 addition (from cache_insert) + 1 hit (payment verification found cache)
814        assert_eq!(stats.additions, 1);
815        assert_eq!(stats.hits, 1);
816    }
817
818    #[tokio::test]
819    async fn test_storage_stats() {
820        let (protocol, _temp) = create_test_protocol().await;
821        let stats = protocol.storage_stats();
822        assert_eq!(stats.chunks_stored, 0);
823    }
824
825    #[tokio::test]
826    async fn test_merkle_candidate_quote_request() {
827        use crate::payment::quote::verify_merkle_candidate_signature;
828        use evmlib::merkle_payments::MerklePaymentCandidateNode;
829
830        // create_test_protocol already wires ML-DSA-65 signing
831        let (protocol, _temp) = create_test_protocol().await;
832
833        let address = [0x77; 32];
834        let timestamp = std::time::SystemTime::now()
835            .duration_since(std::time::UNIX_EPOCH)
836            .expect("system time")
837            .as_secs();
838
839        let request = MerkleCandidateQuoteRequest {
840            address,
841            data_type: DATA_TYPE_CHUNK,
842            data_size: 4096,
843            merkle_payment_timestamp: timestamp,
844        };
845        let msg = ChunkMessage {
846            request_id: 600,
847            body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
848        };
849        let msg_bytes = msg.encode().expect("encode request");
850
851        let response_bytes = protocol
852            .try_handle_request(&msg_bytes)
853            .await
854            .expect("handle merkle candidate quote")
855            .expect("expected response");
856        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
857
858        assert_eq!(response.request_id, 600);
859        match response.body {
860            ChunkMessageBody::MerkleCandidateQuoteResponse(
861                MerkleCandidateQuoteResponse::Success { candidate_node },
862            ) => {
863                let candidate: MerklePaymentCandidateNode =
864                    rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
865
866                // Verify ML-DSA-65 signature
867                assert!(
868                    verify_merkle_candidate_signature(&candidate),
869                    "ML-DSA-65 candidate signature must be valid"
870                );
871
872                assert_eq!(candidate.merkle_payment_timestamp, timestamp);
873                // Node-calculated price based on records stored
874                assert!(candidate.price >= evmlib::common::Amount::ZERO);
875            }
876            other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
877        }
878    }
879
880    #[tokio::test]
881    async fn test_handle_unexpected_response_message() {
882        let (protocol, _temp) = create_test_protocol().await;
883
884        // Send a PutResponse as if it were a request — should return None
885        let msg = ChunkMessage {
886            request_id: 200,
887            body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
888        };
889        let msg_bytes = msg.encode().expect("encode");
890
891        let result = protocol
892            .try_handle_request(&msg_bytes)
893            .await
894            .expect("handle msg");
895
896        assert!(
897            result.is_none(),
898            "expected None for response message, got: {result:?}"
899        );
900    }
901
902    #[tokio::test]
903    async fn test_quote_already_stored_flag() {
904        let (protocol, _temp) = create_test_protocol().await;
905
906        let content = b"already stored quote test";
907        let address = LmdbStorage::compute_address(content);
908
909        // Store the chunk first
910        protocol.payment_verifier().cache_insert(address);
911        let put_request = ChunkPutRequest::new(address, content.to_vec());
912        let put_msg = ChunkMessage {
913            request_id: 300,
914            body: ChunkMessageBody::PutRequest(put_request),
915        };
916        let put_bytes = put_msg.encode().expect("encode put");
917        let _ = protocol
918            .try_handle_request(&put_bytes)
919            .await
920            .expect("handle put");
921
922        // Now request a quote for the same address — already_stored should be true
923        let quote_request = ChunkQuoteRequest {
924            address,
925            data_size: content.len() as u64,
926            data_type: DATA_TYPE_CHUNK,
927        };
928        let quote_msg = ChunkMessage {
929            request_id: 301,
930            body: ChunkMessageBody::QuoteRequest(quote_request),
931        };
932        let quote_bytes = quote_msg.encode().expect("encode quote");
933        let response_bytes = protocol
934            .try_handle_request(&quote_bytes)
935            .await
936            .expect("handle quote")
937            .expect("expected response");
938        let response = ChunkMessage::decode(&response_bytes).expect("decode");
939
940        match response.body {
941            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
942                already_stored, ..
943            }) => {
944                assert!(
945                    already_stored,
946                    "already_stored should be true for existing chunk"
947                );
948            }
949            other => panic!("expected Success with already_stored, got: {other:?}"),
950        }
951
952        // Request a quote for a chunk that does NOT exist — already_stored should be false
953        let new_address = [0xFFu8; 32];
954        let quote_request2 = ChunkQuoteRequest {
955            address: new_address,
956            data_size: 100,
957            data_type: DATA_TYPE_CHUNK,
958        };
959        let quote_msg2 = ChunkMessage {
960            request_id: 302,
961            body: ChunkMessageBody::QuoteRequest(quote_request2),
962        };
963        let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
964        let response_bytes2 = protocol
965            .try_handle_request(&quote_bytes2)
966            .await
967            .expect("handle quote2")
968            .expect("expected response");
969        let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
970
971        match response2.body {
972            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
973                already_stored, ..
974            }) => {
975                assert!(
976                    !already_stored,
977                    "already_stored should be false for new chunk"
978                );
979            }
980            other => panic!("expected Success with already_stored=false, got: {other:?}"),
981        }
982    }
983}