Skip to main content

saorsa_node/storage/
handler.rs

1//! ANT protocol handler for autonomi protocol messages.
2//!
3//! This handler processes chunk PUT/GET requests with optional payment verification,
4//! storing chunks to LMDB and using the DHT for network-wide retrieval.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────┐
10//! │                    AntProtocol                        │
11//! ├─────────────────────────────────────────────────────────┤
12//! │  protocol_id() = "saorsa/ant/chunk/v1"                  │
13//! │                                                         │
14//! │  handle_message(data) ──▶ decode ChunkMessage  │
15//! │                                   │                     │
16//! │         ┌─────────────────────────┼─────────────────┐  │
17//! │         ▼                         ▼                 ▼  │
18//! │   ChunkQuoteRequest           ChunkPutRequest    ChunkGetRequest
19//! │         │                         │                 │  │
20//! │         ▼                         ▼                 ▼  │
21//! │   QuoteGenerator          PaymentVerifier    LmdbStorage│
22//! │         │                         │                 │  │
23//! │         └─────────────────────────┴─────────────────┘  │
24//! │                           │                             │
25//! │                 return Ok(response_bytes)               │
26//! └─────────────────────────────────────────────────────────┘
27//! ```
28
29use crate::ant_protocol::{
30    ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
31    ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID,
32    DATA_TYPE_CHUNK, MAX_CHUNK_SIZE,
33};
34use crate::client::compute_address;
35use crate::error::{Error, Result};
36use crate::payment::{PaymentVerifier, QuoteGenerator};
37use crate::storage::lmdb::LmdbStorage;
38use bytes::Bytes;
39use std::sync::Arc;
40use tracing::{debug, info, warn};
41
42/// ANT protocol handler.
43///
44/// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence
45/// and optional payment verification.
46pub struct AntProtocol {
47    /// LMDB storage for chunk persistence.
48    storage: Arc<LmdbStorage>,
49    /// Payment verifier for checking payments.
50    payment_verifier: Arc<PaymentVerifier>,
51    /// Quote generator for creating storage quotes.
52    quote_generator: Arc<QuoteGenerator>,
53}
54
55impl AntProtocol {
56    /// Create a new ANT protocol handler.
57    ///
58    /// # Arguments
59    ///
60    /// * `storage` - LMDB storage for chunk persistence
61    /// * `payment_verifier` - Payment verifier for validating payments
62    /// * `quote_generator` - Quote generator for creating storage quotes
63    #[must_use]
64    pub fn new(
65        storage: Arc<LmdbStorage>,
66        payment_verifier: Arc<PaymentVerifier>,
67        quote_generator: Arc<QuoteGenerator>,
68    ) -> Self {
69        Self {
70            storage,
71            payment_verifier,
72            quote_generator,
73        }
74    }
75
76    /// Get the protocol identifier.
77    #[must_use]
78    pub fn protocol_id(&self) -> &'static str {
79        CHUNK_PROTOCOL_ID
80    }
81
82    /// Handle an incoming protocol message.
83    ///
84    /// # Arguments
85    ///
86    /// * `data` - Raw message bytes
87    ///
88    /// # Returns
89    ///
90    /// Response bytes, or an error if handling fails.
91    ///
92    /// # Errors
93    ///
94    /// Returns an error if message decoding or handling fails.
95    pub async fn handle_message(&self, data: &[u8]) -> Result<Bytes> {
96        let message = ChunkMessage::decode(data)
97            .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
98
99        let request_id = message.request_id;
100
101        let response_body = match message.body {
102            ChunkMessageBody::PutRequest(req) => {
103                ChunkMessageBody::PutResponse(self.handle_put(req).await)
104            }
105            ChunkMessageBody::GetRequest(req) => {
106                ChunkMessageBody::GetResponse(self.handle_get(req).await)
107            }
108            ChunkMessageBody::QuoteRequest(ref req) => {
109                ChunkMessageBody::QuoteResponse(self.handle_quote(req))
110            }
111            // Response messages shouldn't be received as requests
112            ChunkMessageBody::PutResponse(_)
113            | ChunkMessageBody::GetResponse(_)
114            | ChunkMessageBody::QuoteResponse(_) => {
115                let error = ProtocolError::Internal("Unexpected response message".to_string());
116                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(error))
117            }
118        };
119
120        let response = ChunkMessage {
121            request_id,
122            body: response_body,
123        };
124
125        response
126            .encode()
127            .map(Bytes::from)
128            .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
129    }
130
131    /// Handle a PUT request.
132    async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
133        let address = request.address;
134        let addr_hex = hex::encode(address);
135        debug!("Handling PUT request for {addr_hex}");
136
137        // 1. Validate chunk size
138        if request.content.len() > MAX_CHUNK_SIZE {
139            return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
140                size: request.content.len(),
141                max_size: MAX_CHUNK_SIZE,
142            });
143        }
144
145        // 2. Verify content address matches BLAKE3(content)
146        let computed = compute_address(&request.content);
147        if computed != address {
148            return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
149                expected: address,
150                actual: computed,
151            });
152        }
153
154        // 3. Check if already exists (idempotent success)
155        match self.storage.exists(&address) {
156            Ok(true) => {
157                debug!("Chunk {addr_hex} already exists");
158                return ChunkPutResponse::AlreadyExists { address };
159            }
160            Err(e) => {
161                return ChunkPutResponse::Error(ProtocolError::Internal(format!(
162                    "Storage read failed: {e}"
163                )));
164            }
165            Ok(false) => {}
166        }
167
168        // 4. Verify payment
169        let payment_result = self
170            .payment_verifier
171            .verify_payment(&address, request.payment_proof.as_deref())
172            .await;
173
174        match payment_result {
175            Ok(status) if status.can_store() => {
176                // Payment verified or cached
177            }
178            Ok(_) => {
179                return ChunkPutResponse::PaymentRequired {
180                    message: "Payment required for new chunk".to_string(),
181                };
182            }
183            Err(e) => {
184                return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
185            }
186        }
187
188        // 5. Store chunk
189        match self.storage.put(&address, &request.content).await {
190            Ok(_) => {
191                let content_len = request.content.len();
192                info!("Stored chunk {addr_hex} ({content_len} bytes)");
193                // Record the store and payment in metrics
194                self.quote_generator.record_store(DATA_TYPE_CHUNK);
195                self.quote_generator.record_payment();
196                ChunkPutResponse::Success { address }
197            }
198            Err(e) => {
199                warn!("Failed to store chunk {addr_hex}: {e}");
200                ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
201            }
202        }
203    }
204
205    /// Handle a GET request.
206    async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
207        let address = request.address;
208        let addr_hex = hex::encode(address);
209        debug!("Handling GET request for {addr_hex}");
210
211        match self.storage.get(&address).await {
212            Ok(Some(content)) => {
213                let content_len = content.len();
214                debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
215                ChunkGetResponse::Success { address, content }
216            }
217            Ok(None) => {
218                debug!("Chunk {addr_hex} not found");
219                ChunkGetResponse::NotFound { address }
220            }
221            Err(e) => {
222                warn!("Failed to retrieve chunk {addr_hex}: {e}");
223                ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
224            }
225        }
226    }
227
228    /// Handle a quote request.
229    fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
230        let addr_hex = hex::encode(request.address);
231        let data_size = request.data_size;
232        debug!("Handling quote request for {addr_hex} (size: {data_size})");
233
234        // Check if the chunk is already stored so we can tell the client
235        // to skip payment (already_stored = true).
236        let already_stored = match self.storage.exists(&request.address) {
237            Ok(exists) => exists,
238            Err(e) => {
239                warn!("Storage check failed for {addr_hex}: {e}");
240                false // Assume not stored on error — generate a normal quote.
241            }
242        };
243
244        if already_stored {
245            debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
246        }
247
248        // Validate data size - data_size is u64, cast carefully and reject overflow
249        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
250            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
251                size: MAX_CHUNK_SIZE + 1,
252                max_size: MAX_CHUNK_SIZE,
253            });
254        };
255        if data_size_usize > MAX_CHUNK_SIZE {
256            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
257                size: data_size_usize,
258                max_size: MAX_CHUNK_SIZE,
259            });
260        }
261
262        match self
263            .quote_generator
264            .create_quote(request.address, data_size_usize, request.data_type)
265        {
266            Ok(quote) => {
267                // Serialize the quote
268                match rmp_serde::to_vec(&quote) {
269                    Ok(quote_bytes) => ChunkQuoteResponse::Success {
270                        quote: quote_bytes,
271                        already_stored,
272                    },
273                    Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
274                        "Failed to serialize quote: {e}"
275                    ))),
276                }
277            }
278            Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
279        }
280    }
281
282    /// Get storage statistics.
283    #[must_use]
284    pub fn storage_stats(&self) -> crate::storage::StorageStats {
285        self.storage.stats()
286    }
287
288    /// Get payment cache statistics.
289    #[must_use]
290    pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
291        self.payment_verifier.cache_stats()
292    }
293
294    /// Get a reference to the payment verifier.
295    ///
296    /// Exposed for **test harnesses only** — production code should not call
297    /// this directly. Use `cache_insert()` on the returned verifier to
298    /// pre-populate the payment cache in test setups.
299    #[cfg(any(test, feature = "test-utils"))]
300    #[must_use]
301    pub fn payment_verifier(&self) -> &PaymentVerifier {
302        &self.payment_verifier
303    }
304
305    /// Check if a chunk exists locally.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if the storage read fails.
310    pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
311        self.storage.exists(address)
312    }
313
314    /// Get a chunk directly from local storage.
315    ///
316    /// # Errors
317    ///
318    /// Returns an error if storage access fails.
319    pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
320        self.storage.get(address).await
321    }
322
323    /// Store a chunk directly to local storage (bypasses payment verification).
324    ///
325    /// TEST ONLY - This method bypasses payment verification and should only be used in tests.
326    ///
327    /// # Errors
328    ///
329    /// Returns an error if storage fails or content doesn't match address.
330    #[cfg(test)]
331    pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
332        self.storage.put(address, content).await
333    }
334}
335
#[cfg(test)]
// Tests are allowed to unwrap/expect/panic: a failure should abort the test.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::payment::metrics::QuotingMetricsTracker;
    use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
    use crate::storage::LmdbStorageConfig;
    use ant_evm::RewardsAddress;
    use saorsa_core::identity::NodeIdentity;
    use saorsa_core::MlDsa65;
    use saorsa_pqc::pqc::types::MlDsaSecretKey;
    use tempfile::TempDir;

    /// Build a handler over a fresh LMDB store in a temporary directory.
    ///
    /// The returned `TempDir` must be kept alive for as long as the handler is
    /// used; dropping it removes the directory backing the store.
    async fn create_test_protocol() -> (AntProtocol, TempDir) {
        let temp_dir = TempDir::new().expect("create temp dir");

        let storage_config = LmdbStorageConfig {
            root_dir: temp_dir.path().to_path_buf(),
            verify_on_read: true,
            max_chunks: 0,
            max_map_size: 0,
        };
        let storage = Arc::new(
            LmdbStorage::new(storage_config)
                .await
                .expect("create storage"),
        );

        let rewards_address = RewardsAddress::new([1u8; 20]);
        let payment_config = PaymentVerifierConfig {
            evm: EvmVerifierConfig::default(),
            cache_capacity: 100_000,
            local_rewards_address: rewards_address,
        };
        let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
        let metrics_tracker = QuotingMetricsTracker::new(1000, 100);
        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);

        // Wire ML-DSA-65 signing so quote requests succeed
        let identity = NodeIdentity::generate().expect("generate identity");
        let pub_key_bytes = identity.public_key().as_bytes().to_vec();
        let sk_bytes = identity.secret_key_bytes().to_vec();
        let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
        quote_generator.set_signer(pub_key_bytes, move |msg| {
            use saorsa_pqc::pqc::MlDsaOperations;
            let ml_dsa = MlDsa65::new();
            ml_dsa
                .sign(&sk, msg)
                .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
        });

        let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
        (protocol, temp_dir)
    }

    /// Encode `request`, run it through `handle_message`, and decode the reply.
    async fn round_trip(protocol: &AntProtocol, request: ChunkMessage) -> ChunkMessage {
        let request_bytes = request.encode().expect("encode request");
        let response_bytes = protocol
            .handle_message(&request_bytes)
            .await
            .expect("handle message");
        ChunkMessage::decode(&response_bytes).expect("decode response")
    }

    /// Message body for a PUT of `content` at `address`.
    fn put_body(address: [u8; 32], content: &[u8]) -> ChunkMessageBody {
        ChunkMessageBody::PutRequest(ChunkPutRequest::new(address, content.to_vec()))
    }

    /// Send a quote request and return the `already_stored` flag of the
    /// successful reply; panics on any other reply.
    async fn quoted_already_stored(protocol: &AntProtocol, request: ChunkMessage) -> bool {
        match round_trip(protocol, request).await.body {
            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
                already_stored, ..
            }) => already_stored,
            other => panic!("expected QuoteResponse::Success, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_put_and_get_chunk() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"hello world";
        let address = LmdbStorage::compute_address(content);

        // Pre-populate payment cache so EVM verification is bypassed
        protocol.payment_verifier().cache_insert(address);

        // PUT
        let response = round_trip(
            &protocol,
            ChunkMessage {
                request_id: 1,
                body: put_body(address, content),
            },
        )
        .await;

        assert_eq!(response.request_id, 1);
        match response.body {
            ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
                assert_eq!(addr, address);
            }
            other => panic!("expected PutResponse::Success, got: {other:?}"),
        }

        // GET
        let response = round_trip(
            &protocol,
            ChunkMessage {
                request_id: 2,
                body: ChunkMessageBody::GetRequest(ChunkGetRequest::new(address)),
            },
        )
        .await;

        assert_eq!(response.request_id, 2);
        match response.body {
            ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
                address: addr,
                content: data,
            }) => {
                assert_eq!(addr, address);
                assert_eq!(data, content.to_vec());
            }
            other => panic!("expected GetResponse::Success, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_get_not_found() {
        let (protocol, _temp) = create_test_protocol().await;

        let address = [0xAB; 32];
        let response = round_trip(
            &protocol,
            ChunkMessage {
                request_id: 10,
                body: ChunkMessageBody::GetRequest(ChunkGetRequest::new(address)),
            },
        )
        .await;

        assert_eq!(response.request_id, 10);
        match response.body {
            ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) => {
                assert_eq!(addr, address);
            }
            other => panic!("expected GetResponse::NotFound, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_put_address_mismatch() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"test content";
        let wrong_address = [0xFF; 32]; // Not the content address of `content`

        // Pre-populate cache for the wrong address so we test address mismatch, not payment
        protocol.payment_verifier().cache_insert(wrong_address);

        let response = round_trip(
            &protocol,
            ChunkMessage {
                request_id: 20,
                body: put_body(wrong_address, content),
            },
        )
        .await;

        assert_eq!(response.request_id, 20);
        assert!(
            matches!(
                response.body,
                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
                    ProtocolError::AddressMismatch { .. }
                ))
            ),
            "expected AddressMismatch error, got: {response:?}"
        );
    }

    #[tokio::test]
    async fn test_put_chunk_too_large() {
        let (protocol, _temp) = create_test_protocol().await;

        // One byte over the limit
        let content = vec![0u8; MAX_CHUNK_SIZE + 1];
        let address = LmdbStorage::compute_address(&content);

        let response = round_trip(
            &protocol,
            ChunkMessage {
                request_id: 30,
                body: ChunkMessageBody::PutRequest(ChunkPutRequest::new(address, content)),
            },
        )
        .await;

        assert_eq!(response.request_id, 30);
        assert!(
            matches!(
                response.body,
                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
                    ProtocolError::ChunkTooLarge { .. }
                ))
            ),
            "expected ChunkTooLarge error, got: {response:?}"
        );
    }

    #[tokio::test]
    async fn test_put_already_exists() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"duplicate content";
        let address = LmdbStorage::compute_address(content);

        // Pre-populate cache so EVM verification is bypassed
        protocol.payment_verifier().cache_insert(address);

        let put_message = || ChunkMessage {
            request_id: 40,
            body: put_body(address, content),
        };

        // First PUT stores the chunk; its reply is not under test here.
        let _ = round_trip(&protocol, put_message()).await;

        // Store again - should return AlreadyExists
        let response = round_trip(&protocol, put_message()).await;

        assert_eq!(response.request_id, 40);
        match response.body {
            ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) => {
                assert_eq!(addr, address);
            }
            other => panic!("expected AlreadyExists, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_protocol_id() {
        let (protocol, _temp) = create_test_protocol().await;
        assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
    }

    #[tokio::test]
    async fn test_exists_and_local_access() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"local access test";
        let address = LmdbStorage::compute_address(content);

        assert!(!protocol.exists(&address).expect("exists check"));

        protocol
            .put_local(&address, content)
            .await
            .expect("put local");

        assert!(protocol.exists(&address).expect("exists check"));

        let retrieved = protocol.get_local(&address).await.expect("get local");
        assert_eq!(retrieved, Some(content.to_vec()));
    }

    #[tokio::test]
    async fn test_cache_insert_is_visible() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"cache test content";
        let address = LmdbStorage::compute_address(content);

        // Before insert: no additions recorded
        assert_eq!(protocol.payment_cache_stats().additions, 0);

        // Pre-populate cache
        protocol.payment_verifier().cache_insert(address);

        // After insert: exactly one addition recorded
        assert_eq!(protocol.payment_cache_stats().additions, 1);

        // PUT should succeed (cache hit)
        let response = round_trip(
            &protocol,
            ChunkMessage {
                request_id: 100,
                body: put_body(address, content),
            },
        )
        .await;

        assert!(
            matches!(
                response.body,
                ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. })
            ),
            "expected success, got: {response:?}"
        );
    }

    #[tokio::test]
    async fn test_put_same_chunk_twice_hits_cache() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"duplicate cache test";
        let address = LmdbStorage::compute_address(content);

        // Pre-populate cache for first PUT
        protocol.payment_verifier().cache_insert(address);

        let put_message = || ChunkMessage {
            request_id: 110,
            body: put_body(address, content),
        };

        // First PUT
        let _ = round_trip(&protocol, put_message()).await;

        // Second PUT — should return AlreadyExists (checked in storage before payment)
        let response = round_trip(&protocol, put_message()).await;

        assert!(
            matches!(
                response.body,
                ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. })
            ),
            "expected AlreadyExists, got: {response:?}"
        );
    }

    #[tokio::test]
    async fn test_payment_cache_stats_returns_correct_values() {
        let (protocol, _temp) = create_test_protocol().await;

        let stats = protocol.payment_cache_stats();
        assert_eq!(stats.hits, 0);
        assert_eq!(stats.misses, 0);
        assert_eq!(stats.additions, 0);

        // Pre-populate cache, then store a chunk to test stats
        let content = b"stats test";
        let address = LmdbStorage::compute_address(content);
        protocol.payment_verifier().cache_insert(address);

        let _ = round_trip(
            &protocol,
            ChunkMessage {
                request_id: 120,
                body: put_body(address, content),
            },
        )
        .await;

        let stats = protocol.payment_cache_stats();
        // Should have 1 addition (from cache_insert) + 1 hit (payment verification found cache)
        assert_eq!(stats.additions, 1);
        assert_eq!(stats.hits, 1);
    }

    #[tokio::test]
    async fn test_storage_stats() {
        let (protocol, _temp) = create_test_protocol().await;
        let stats = protocol.storage_stats();
        assert_eq!(stats.chunks_stored, 0);
    }

    #[tokio::test]
    async fn test_handle_unexpected_response_message() {
        let (protocol, _temp) = create_test_protocol().await;

        // Send a PutResponse as if it were a request
        let response = round_trip(
            &protocol,
            ChunkMessage {
                request_id: 200,
                body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success {
                    address: [0u8; 32],
                }),
            },
        )
        .await;

        match response.body {
            ChunkMessageBody::PutResponse(ChunkPutResponse::Error(ProtocolError::Internal(
                msg,
            ))) => {
                assert!(msg.contains("Unexpected"));
            }
            other => panic!("expected Internal error, got: {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_quote_already_stored_flag() {
        let (protocol, _temp) = create_test_protocol().await;

        let content = b"already stored quote test";
        let address = LmdbStorage::compute_address(content);

        // Store the chunk first
        protocol.payment_verifier().cache_insert(address);
        let _ = round_trip(
            &protocol,
            ChunkMessage {
                request_id: 300,
                body: put_body(address, content),
            },
        )
        .await;

        // Now request a quote for the same address — already_stored should be true
        let stored_flag = quoted_already_stored(
            &protocol,
            ChunkMessage {
                request_id: 301,
                body: ChunkMessageBody::QuoteRequest(ChunkQuoteRequest {
                    address,
                    data_size: content.len() as u64,
                    data_type: DATA_TYPE_CHUNK,
                }),
            },
        )
        .await;
        assert!(
            stored_flag,
            "already_stored should be true for existing chunk"
        );

        // Request a quote for a chunk that does NOT exist — already_stored should be false
        let new_flag = quoted_already_stored(
            &protocol,
            ChunkMessage {
                request_id: 302,
                body: ChunkMessageBody::QuoteRequest(ChunkQuoteRequest {
                    address: [0xFFu8; 32],
                    data_size: 100,
                    data_type: DATA_TYPE_CHUNK,
                }),
            },
        )
        .await;
        assert!(!new_flag, "already_stored should be false for new chunk");
    }

    #[tokio::test]
    async fn test_quote_rejects_oversized_data() {
        let (protocol, _temp) = create_test_protocol().await;

        // A declared size one byte over the limit must be refused outright.
        let response = round_trip(
            &protocol,
            ChunkMessage {
                request_id: 400,
                body: ChunkMessageBody::QuoteRequest(ChunkQuoteRequest {
                    address: [0x11u8; 32],
                    data_size: MAX_CHUNK_SIZE as u64 + 1,
                    data_type: DATA_TYPE_CHUNK,
                }),
            },
        )
        .await;

        assert_eq!(response.request_id, 400);
        assert!(
            matches!(
                response.body,
                ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Error(
                    ProtocolError::ChunkTooLarge { .. }
                ))
            ),
            "expected ChunkTooLarge error, got: {response:?}"
        );
    }
}