Skip to main content

ant_node/storage/
handler.rs

1//! ANT protocol handler for autonomi protocol messages.
2//!
3//! This handler processes chunk PUT/GET requests with optional payment verification,
4//! storing chunks to LMDB and using the DHT for network-wide retrieval.
5//!
6//! # Architecture
7//!
8//! ```text
//! ┌───────────────────────────────────────────────────────────────┐
//! │                          AntProtocol                          │
//! ├───────────────────────────────────────────────────────────────┤
//! │  protocol_id() = "autonomi.ant.chunk.v1"                      │
//! │                                                               │
//! │  try_handle_request(data) ──▶ decode ChunkMessage             │
//! │                               │                               │
//! │           ┌───────────────────┼───────────────────┐           │
//! │           ▼                   ▼                   ▼           │
//! │   ChunkQuoteRequest    ChunkPutRequest     ChunkGetRequest    │
//! │           │                   │                   │           │
//! │           ▼                   ▼                   ▼           │
//! │    QuoteGenerator      PaymentVerifier       LmdbStorage      │
//! │           │                   │                   │           │
//! │           └───────────────────┼───────────────────┘           │
//! │                               │                               │
//! │                return Ok(Some(response_bytes))                │
//! │             return Ok(None) for response messages             │
//! └───────────────────────────────────────────────────────────────┘
28//! ```
29
30#[cfg(test)]
31use crate::ant_protocol::DATA_TYPE_CHUNK;
32use crate::ant_protocol::{
33    ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
34    ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
35    MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE,
36};
37use crate::client::compute_address;
38use crate::error::{Error, Result};
39use crate::logging::{debug, info, warn};
40use crate::payment::{PaymentVerifier, QuoteGenerator};
41use crate::replication::fresh::FreshWriteEvent;
42use crate::storage::lmdb::LmdbStorage;
43use bytes::Bytes;
44use std::sync::Arc;
45use tokio::sync::mpsc;
46
47/// ANT protocol handler.
48///
49/// Handles chunk PUT/GET/Quote requests using LMDB storage for persistence
50/// and optional payment verification.
51pub struct AntProtocol {
52    /// LMDB storage for chunk persistence.
53    storage: Arc<LmdbStorage>,
54    /// Payment verifier for checking payments.
55    payment_verifier: Arc<PaymentVerifier>,
56    /// Quote generator for creating storage quotes.
57    /// Also handles merkle candidate quote signing via ML-DSA-65.
58    quote_generator: Arc<QuoteGenerator>,
59    /// Channel for notifying the replication engine about newly-stored chunks.
60    fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
61}
62
63impl AntProtocol {
64    /// Create a new ANT protocol handler.
65    ///
66    /// # Arguments
67    ///
68    /// * `storage` - LMDB storage for chunk persistence
69    /// * `payment_verifier` - Payment verifier for validating payments
70    /// * `quote_generator` - Quote generator for creating storage quotes
71    #[must_use]
72    pub fn new(
73        storage: Arc<LmdbStorage>,
74        payment_verifier: Arc<PaymentVerifier>,
75        quote_generator: Arc<QuoteGenerator>,
76    ) -> Self {
77        Self {
78            storage,
79            payment_verifier,
80            quote_generator,
81            fresh_write_tx: None,
82        }
83    }
84
85    /// Set the channel sender for fresh-write replication events.
86    ///
87    /// When set, successful chunk PUTs will notify the replication engine
88    /// so it can fan out fresh offers to the close group.
89    pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
90        self.fresh_write_tx = Some(tx);
91    }
92
93    /// Get the protocol identifier.
94    #[must_use]
95    pub fn protocol_id(&self) -> &'static str {
96        CHUNK_PROTOCOL_ID
97    }
98
99    /// Get a reference to the underlying LMDB storage.
100    #[must_use]
101    pub fn storage(&self) -> Arc<LmdbStorage> {
102        Arc::clone(&self.storage)
103    }
104
105    /// Get a shared reference to the payment verifier.
106    #[must_use]
107    pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
108        Arc::clone(&self.payment_verifier)
109    }
110
111    /// Handle an incoming request and produce a response.
112    ///
113    /// Decodes the raw message, processes it if it is a request variant,
114    /// and returns the encoded response bytes.  Returns `Ok(None)` for
115    /// response messages (which are meant for client subscribers, not for
116    /// the protocol handler).
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if message decoding, handling, or encoding fails.
121    pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
122        let message = ChunkMessage::decode(data)
123            .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
124
125        let request_id = message.request_id;
126
127        let response_body = match message.body {
128            ChunkMessageBody::PutRequest(req) => {
129                ChunkMessageBody::PutResponse(self.handle_put(req).await)
130            }
131            ChunkMessageBody::GetRequest(req) => {
132                ChunkMessageBody::GetResponse(self.handle_get(req).await)
133            }
134            ChunkMessageBody::QuoteRequest(ref req) => {
135                ChunkMessageBody::QuoteResponse(self.handle_quote(req))
136            }
137            ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
138                ChunkMessageBody::MerkleCandidateQuoteResponse(
139                    self.handle_merkle_candidate_quote(req),
140                )
141            }
142            // Anything else — response messages are handled by client
143            // subscribers (e.g. send_and_await_chunk_response), not by the
144            // protocol handler. Returning None prevents the caller from
145            // sending a reply, which would create an infinite ping-pong
146            // loop.
147            //
148            // `ChunkMessageBody` is `#[non_exhaustive]` in ant-protocol, so
149            // a future wire variant added on a protocol minor bump also
150            // lands here and is dropped. The CHUNK_PROTOCOL_ID multistream-
151            // select handshake version-gates peers, so this arm should
152            // only be reached by a misconfigured peer.
153            _ => return Ok(None),
154        };
155
156        let response = ChunkMessage {
157            request_id,
158            body: response_body,
159        };
160
161        response
162            .encode()
163            .map(|b| Some(Bytes::from(b)))
164            .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
165    }
166
167    /// Handle a PUT request.
168    ///
169    /// Wraps `handle_put_inner` to emit a single structured tracing event per
170    /// PUT RPC at every exit path, including early-return validation paths.
171    /// The event uses `target: "ant_node::storage::rpc_latency"` so that
172    /// Elasticsearch / Kibana can build p50/p95/p99 store-RPC latency
173    /// histograms from the existing telegraf log forwarding.
174    async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
175        let start = std::time::Instant::now();
176        let addr_hex = hex::encode(request.address);
177        let chunk_size = request.content.len();
178        let response = self.handle_put_inner(request).await;
179        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
180        let outcome: &'static str = match &response {
181            ChunkPutResponse::Success { .. } => "success",
182            ChunkPutResponse::AlreadyExists { .. } => "already_exists",
183            ChunkPutResponse::PaymentRequired { .. } => "payment_required",
184            ChunkPutResponse::Error(_) => "error",
185            _ => "unknown",
186        };
187        info!(
188            target: "ant_node::storage::rpc_latency",
189            duration_ms,
190            chunk_size,
191            outcome,
192            addr = %addr_hex,
193            "put_rpc"
194        );
195        response
196    }
197
198    /// Inner body of `handle_put` — see the wrapper for the per-RPC latency log.
199    async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
200        let address = request.address;
201        let addr_hex = hex::encode(address);
202        debug!("Handling PUT request for {addr_hex}");
203
204        // 1. Validate chunk size
205        if request.content.len() > MAX_CHUNK_SIZE {
206            return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
207                size: request.content.len(),
208                max_size: MAX_CHUNK_SIZE,
209            });
210        }
211
212        // 2. Verify content address matches BLAKE3(content)
213        let computed = compute_address(&request.content);
214        if computed != address {
215            return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
216                expected: address,
217                actual: computed,
218            });
219        }
220
221        // 3. Check if already exists (idempotent success)
222        match self.storage.exists(&address) {
223            Ok(true) => {
224                debug!("Chunk {addr_hex} already exists");
225                return ChunkPutResponse::AlreadyExists { address };
226            }
227            Err(e) => {
228                return ChunkPutResponse::Error(ProtocolError::Internal(format!(
229                    "Storage read failed: {e}"
230                )));
231            }
232            Ok(false) => {}
233        }
234
235        // 4. Verify payment
236        let payment_result = self
237            .payment_verifier
238            .verify_payment(&address, request.payment_proof.as_deref())
239            .await;
240
241        match payment_result {
242            Ok(status) if status.can_store() => {
243                // Payment verified or cached
244            }
245            Ok(_) => {
246                return ChunkPutResponse::PaymentRequired {
247                    message: "Payment required for new chunk".to_string(),
248                };
249            }
250            Err(e) => {
251                return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
252            }
253        }
254
255        // 5. Store chunk
256        match self.storage.put(&address, &request.content).await {
257            Ok(_) => {
258                let content_len = request.content.len();
259                info!("Stored chunk {addr_hex} ({content_len} bytes)");
260                // Increment the close-records counter consumed by calculate_price.
261                self.quote_generator.record_store();
262
263                // 6. Notify replication engine for fresh fan-out.
264                //    Only emit when a real proof is present — cached-as-verified
265                //    PUTs have no proof to forward, and the chunk would have
266                //    already replicated on the original write that carried one.
267                if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
268                    // `request.content` is now `bytes::Bytes`; FreshWriteEvent
269                    // still carries the chunk as `Vec<u8>` for compatibility
270                    // with the replication wire format, so materialise once
271                    // here. Done only on the success path, where storage has
272                    // already accepted the chunk.
273                    let event = FreshWriteEvent {
274                        key: address,
275                        data: request.content.to_vec(),
276                        payment_proof: proof,
277                    };
278                    if tx.send(event).is_err() {
279                        debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
280                    }
281                }
282
283                ChunkPutResponse::Success { address }
284            }
285            Err(e) => {
286                warn!("Failed to store chunk {addr_hex}: {e}");
287                ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
288            }
289        }
290    }
291
292    /// Handle a GET request.
293    ///
294    /// Wraps `handle_get_inner` to emit a single structured tracing event per
295    /// GET RPC at every exit path. See `handle_put` for the rationale.
296    async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
297        let start = std::time::Instant::now();
298        let addr_hex = hex::encode(request.address);
299        let response = self.handle_get_inner(request).await;
300        let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
301        let outcome: &'static str = match &response {
302            ChunkGetResponse::Success { .. } => "success",
303            ChunkGetResponse::NotFound { .. } => "not_found",
304            ChunkGetResponse::Error(_) => "error",
305            _ => "unknown",
306        };
307        info!(
308            target: "ant_node::storage::rpc_latency",
309            duration_ms,
310            outcome,
311            addr = %addr_hex,
312            "get_rpc"
313        );
314        response
315    }
316
317    /// Inner body of `handle_get` — see the wrapper for the per-RPC latency log.
318    async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
319        let address = request.address;
320        let addr_hex = hex::encode(address);
321        debug!("Handling GET request for {addr_hex}");
322
323        match self.storage.get(&address).await {
324            Ok(Some(content)) => {
325                let content_len = content.len();
326                debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
327                ChunkGetResponse::Success { address, content }
328            }
329            Ok(None) => {
330                debug!("Chunk {addr_hex} not found");
331                ChunkGetResponse::NotFound { address }
332            }
333            Err(e) => {
334                warn!("Failed to retrieve chunk {addr_hex}: {e}");
335                ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
336            }
337        }
338    }
339
340    /// Handle a quote request.
341    fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
342        let addr_hex = hex::encode(request.address);
343        let data_size = request.data_size;
344        debug!("Handling quote request for {addr_hex} (size: {data_size})");
345
346        // Check if the chunk is already stored so we can tell the client
347        // to skip payment (already_stored = true).
348        // The match intentionally logs the error when the `logging` feature is
349        // active. Clippy suggests `unwrap_or_default()` when logging is compiled
350        // out, but keeping the explicit match preserves the diagnostic intent.
351        #[allow(clippy::manual_unwrap_or_default)]
352        let already_stored = match self.storage.exists(&request.address) {
353            Ok(exists) => exists,
354            Err(e) => {
355                warn!("Storage check failed for {addr_hex}: {e}");
356                false // Assume not stored on error — generate a normal quote.
357            }
358        };
359
360        if already_stored {
361            debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
362        }
363
364        // Validate data size - data_size is u64, cast carefully and reject overflow
365        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
366            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
367                size: MAX_CHUNK_SIZE + 1,
368                max_size: MAX_CHUNK_SIZE,
369            });
370        };
371        if data_size_usize > MAX_CHUNK_SIZE {
372            return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
373                size: data_size_usize,
374                max_size: MAX_CHUNK_SIZE,
375            });
376        }
377
378        match self
379            .quote_generator
380            .create_quote(request.address, data_size_usize, request.data_type)
381        {
382            Ok(quote) => {
383                // Serialize the quote
384                match rmp_serde::to_vec(&quote) {
385                    Ok(quote_bytes) => ChunkQuoteResponse::Success {
386                        quote: quote_bytes,
387                        already_stored,
388                    },
389                    Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
390                        "Failed to serialize quote: {e}"
391                    ))),
392                }
393            }
394            Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
395        }
396    }
397
398    /// Handle a merkle candidate quote request.
399    fn handle_merkle_candidate_quote(
400        &self,
401        request: &MerkleCandidateQuoteRequest,
402    ) -> MerkleCandidateQuoteResponse {
403        let addr_hex = hex::encode(request.address);
404        let data_size = request.data_size;
405        debug!(
406            "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
407            request.merkle_payment_timestamp
408        );
409
410        let Ok(data_size_usize) = usize::try_from(request.data_size) else {
411            return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
412                "data_size {} overflows usize",
413                request.data_size
414            )));
415        };
416        if data_size_usize > MAX_CHUNK_SIZE {
417            return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
418                size: data_size_usize,
419                max_size: MAX_CHUNK_SIZE,
420            });
421        }
422
423        match self.quote_generator.create_merkle_candidate_quote(
424            data_size_usize,
425            request.data_type,
426            request.merkle_payment_timestamp,
427        ) {
428            Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
429                Ok(bytes) => MerkleCandidateQuoteResponse::Success {
430                    candidate_node: bytes,
431                },
432                Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
433                    "Failed to serialize merkle candidate node: {e}"
434                ))),
435            },
436            Err(e) => {
437                MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
438            }
439        }
440    }
441
442    /// Get storage statistics.
443    #[must_use]
444    pub fn storage_stats(&self) -> crate::storage::StorageStats {
445        self.storage.stats()
446    }
447
448    /// Get payment cache statistics.
449    #[must_use]
450    pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
451        self.payment_verifier.cache_stats()
452    }
453
454    /// Get a reference to the payment verifier.
455    ///
456    /// Exposed for **test harnesses only** — production code should not call
457    /// this directly. Use `cache_insert()` on the returned verifier to
458    /// pre-populate the payment cache in test setups.
459    #[cfg(any(test, feature = "test-utils"))]
460    #[must_use]
461    pub fn payment_verifier(&self) -> &PaymentVerifier {
462        &self.payment_verifier
463    }
464
465    /// Check if a chunk exists locally.
466    ///
467    /// # Errors
468    ///
469    /// Returns an error if the storage read fails.
470    pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
471        self.storage.exists(address)
472    }
473
474    /// Get a chunk directly from local storage.
475    ///
476    /// # Errors
477    ///
478    /// Returns an error if storage access fails.
479    pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
480        self.storage.get(address).await
481    }
482
483    /// Store a chunk directly to local storage (bypasses payment verification).
484    ///
485    /// TEST ONLY - This method bypasses payment verification and should only be used in tests.
486    ///
487    /// # Errors
488    ///
489    /// Returns an error if storage fails or content doesn't match address.
490    #[cfg(test)]
491    pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
492        self.storage.put(address, content).await
493    }
494}
495
496#[cfg(test)]
497#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
498mod tests {
499    use super::*;
500    use crate::payment::metrics::QuotingMetricsTracker;
501    use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
502    use crate::storage::LmdbStorageConfig;
503    use evmlib::RewardsAddress;
504    use saorsa_core::identity::NodeIdentity;
505    use saorsa_core::MlDsa65;
506    use saorsa_pqc::pqc::types::MlDsaSecretKey;
507    use tempfile::TempDir;
508
509    async fn create_test_protocol() -> (AntProtocol, TempDir) {
510        let temp_dir = TempDir::new().expect("create temp dir");
511
512        let storage_config = LmdbStorageConfig {
513            root_dir: temp_dir.path().to_path_buf(),
514            ..LmdbStorageConfig::test_default()
515        };
516        let storage = Arc::new(
517            LmdbStorage::new(storage_config)
518                .await
519                .expect("create storage"),
520        );
521
522        let rewards_address = RewardsAddress::new([1u8; 20]);
523        let payment_config = PaymentVerifierConfig {
524            evm: EvmVerifierConfig::default(),
525            cache_capacity: 100_000,
526            local_rewards_address: rewards_address,
527        };
528        let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
529        let metrics_tracker = QuotingMetricsTracker::new(100);
530        let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
531
532        // Wire ML-DSA-65 signing so quote requests succeed
533        let identity = NodeIdentity::generate().expect("generate identity");
534        let pub_key_bytes = identity.public_key().as_bytes().to_vec();
535        let sk_bytes = identity.secret_key_bytes().to_vec();
536        let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
537        quote_generator.set_signer(pub_key_bytes, move |msg| {
538            use saorsa_pqc::pqc::MlDsaOperations;
539            let ml_dsa = MlDsa65::new();
540            ml_dsa
541                .sign(&sk, msg)
542                .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
543        });
544
545        let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
546        (protocol, temp_dir)
547    }
548
549    #[tokio::test]
550    async fn test_put_and_get_chunk() {
551        let (protocol, _temp) = create_test_protocol().await;
552
553        let content = b"hello world";
554        let address = LmdbStorage::compute_address(content);
555
556        // Pre-populate payment cache so EVM verification is bypassed
557        protocol.payment_verifier().cache_insert(address);
558
559        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
560        let put_msg = ChunkMessage {
561            request_id: 1,
562            body: ChunkMessageBody::PutRequest(put_request),
563        };
564        let put_bytes = put_msg.encode().expect("encode put");
565
566        // Handle PUT
567        let response_bytes = protocol
568            .try_handle_request(&put_bytes)
569            .await
570            .expect("handle put")
571            .expect("expected response");
572        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
573
574        assert_eq!(response.request_id, 1);
575        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
576            response.body
577        {
578            assert_eq!(addr, address);
579        } else {
580            panic!("expected PutResponse::Success, got: {response:?}");
581        }
582
583        // Create GET request
584        let get_request = ChunkGetRequest::new(address);
585        let get_msg = ChunkMessage {
586            request_id: 2,
587            body: ChunkMessageBody::GetRequest(get_request),
588        };
589        let get_bytes = get_msg.encode().expect("encode get");
590
591        // Handle GET
592        let response_bytes = protocol
593            .try_handle_request(&get_bytes)
594            .await
595            .expect("handle get")
596            .expect("expected response");
597        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
598
599        assert_eq!(response.request_id, 2);
600        if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
601            address: addr,
602            content: data,
603        }) = response.body
604        {
605            assert_eq!(addr, address);
606            assert_eq!(data, content.to_vec());
607        } else {
608            panic!("expected GetResponse::Success");
609        }
610    }
611
612    #[tokio::test]
613    async fn test_get_not_found() {
614        let (protocol, _temp) = create_test_protocol().await;
615
616        let address = [0xAB; 32];
617        let get_request = ChunkGetRequest::new(address);
618        let get_msg = ChunkMessage {
619            request_id: 10,
620            body: ChunkMessageBody::GetRequest(get_request),
621        };
622        let get_bytes = get_msg.encode().expect("encode get");
623
624        let response_bytes = protocol
625            .try_handle_request(&get_bytes)
626            .await
627            .expect("handle get")
628            .expect("expected response");
629        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
630
631        assert_eq!(response.request_id, 10);
632        if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
633            response.body
634        {
635            assert_eq!(addr, address);
636        } else {
637            panic!("expected GetResponse::NotFound");
638        }
639    }
640
641    #[tokio::test]
642    async fn test_put_address_mismatch() {
643        let (protocol, _temp) = create_test_protocol().await;
644
645        let content = b"test content";
646        let wrong_address = [0xFF; 32]; // Wrong address
647
648        // Pre-populate cache for the wrong address so we test address mismatch, not payment
649        protocol.payment_verifier().cache_insert(wrong_address);
650
651        let put_request = ChunkPutRequest::new(wrong_address, Bytes::copy_from_slice(content));
652        let put_msg = ChunkMessage {
653            request_id: 20,
654            body: ChunkMessageBody::PutRequest(put_request),
655        };
656        let put_bytes = put_msg.encode().expect("encode put");
657
658        let response_bytes = protocol
659            .try_handle_request(&put_bytes)
660            .await
661            .expect("handle put")
662            .expect("expected response");
663        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
664
665        assert_eq!(response.request_id, 20);
666        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
667            ProtocolError::AddressMismatch { .. },
668        )) = response.body
669        {
670            // Expected
671        } else {
672            panic!("expected AddressMismatch error, got: {response:?}");
673        }
674    }
675
676    #[tokio::test]
677    async fn test_put_chunk_too_large() {
678        let (protocol, _temp) = create_test_protocol().await;
679
680        // Create oversized content
681        let content = vec![0u8; MAX_CHUNK_SIZE + 1];
682        let address = LmdbStorage::compute_address(&content);
683
684        let put_request = ChunkPutRequest::new(address, Bytes::from(content));
685        let put_msg = ChunkMessage {
686            request_id: 30,
687            body: ChunkMessageBody::PutRequest(put_request),
688        };
689        let put_bytes = put_msg.encode().expect("encode put");
690
691        let response_bytes = protocol
692            .try_handle_request(&put_bytes)
693            .await
694            .expect("handle put")
695            .expect("expected response");
696        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
697
698        assert_eq!(response.request_id, 30);
699        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
700            ProtocolError::ChunkTooLarge { .. },
701        )) = response.body
702        {
703            // Expected
704        } else {
705            panic!("expected ChunkTooLarge error");
706        }
707    }
708
709    #[tokio::test]
710    async fn test_put_already_exists() {
711        let (protocol, _temp) = create_test_protocol().await;
712
713        let content = b"duplicate content";
714        let address = LmdbStorage::compute_address(content);
715
716        // Pre-populate cache so EVM verification is bypassed
717        protocol.payment_verifier().cache_insert(address);
718
719        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
720        let put_msg = ChunkMessage {
721            request_id: 40,
722            body: ChunkMessageBody::PutRequest(put_request),
723        };
724        let put_bytes = put_msg.encode().expect("encode put");
725
726        let _ = protocol
727            .try_handle_request(&put_bytes)
728            .await
729            .expect("handle put");
730
731        // Store again - should return AlreadyExists
732        let response_bytes = protocol
733            .try_handle_request(&put_bytes)
734            .await
735            .expect("handle put 2")
736            .expect("expected response");
737        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
738
739        assert_eq!(response.request_id, 40);
740        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
741            response.body
742        {
743            assert_eq!(addr, address);
744        } else {
745            panic!("expected AlreadyExists");
746        }
747    }
748
749    #[tokio::test]
750    async fn test_protocol_id() {
751        let (protocol, _temp) = create_test_protocol().await;
752        assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
753    }
754
755    #[tokio::test]
756    async fn test_exists_and_local_access() {
757        let (protocol, _temp) = create_test_protocol().await;
758
759        let content = b"local access test";
760        let address = LmdbStorage::compute_address(content);
761
762        assert!(!protocol.exists(&address).expect("exists check"));
763
764        protocol
765            .put_local(&address, content)
766            .await
767            .expect("put local");
768
769        assert!(protocol.exists(&address).expect("exists check"));
770
771        let retrieved = protocol.get_local(&address).await.expect("get local");
772        assert_eq!(retrieved, Some(content.to_vec()));
773    }
774
775    #[tokio::test]
776    async fn test_cache_insert_is_visible() {
777        let (protocol, _temp) = create_test_protocol().await;
778
779        let content = b"cache test content";
780        let address = LmdbStorage::compute_address(content);
781
782        // Before insert: cache should be empty
783        let stats_before = protocol.payment_cache_stats();
784        assert_eq!(stats_before.additions, 0);
785
786        // Pre-populate cache
787        protocol.payment_verifier().cache_insert(address);
788
789        // After insert: cache should have the xorname
790        let stats_after = protocol.payment_cache_stats();
791        assert_eq!(stats_after.additions, 1);
792
793        // PUT should succeed (cache hit)
794        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
795        let put_msg = ChunkMessage {
796            request_id: 100,
797            body: ChunkMessageBody::PutRequest(put_request),
798        };
799        let put_bytes = put_msg.encode().expect("encode put");
800        let response_bytes = protocol
801            .try_handle_request(&put_bytes)
802            .await
803            .expect("handle put")
804            .expect("expected response");
805        let response = ChunkMessage::decode(&response_bytes).expect("decode");
806
807        if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
808            // expected
809        } else {
810            panic!("expected success, got: {response:?}");
811        }
812    }
813
814    #[tokio::test]
815    async fn test_put_same_chunk_twice_hits_cache() {
816        let (protocol, _temp) = create_test_protocol().await;
817
818        let content = b"duplicate cache test";
819        let address = LmdbStorage::compute_address(content);
820
821        // Pre-populate cache for first PUT
822        protocol.payment_verifier().cache_insert(address);
823
824        // First PUT
825        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
826        let put_msg = ChunkMessage {
827            request_id: 110,
828            body: ChunkMessageBody::PutRequest(put_request),
829        };
830        let put_bytes = put_msg.encode().expect("encode put");
831        let _ = protocol
832            .try_handle_request(&put_bytes)
833            .await
834            .expect("handle put 1");
835
836        // Second PUT — should return AlreadyExists (checked in storage before payment)
837        let response_bytes = protocol
838            .try_handle_request(&put_bytes)
839            .await
840            .expect("handle put 2")
841            .expect("expected response");
842        let response = ChunkMessage::decode(&response_bytes).expect("decode");
843
844        if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
845        {
846            // expected — storage check comes before payment check
847        } else {
848            panic!("expected AlreadyExists, got: {response:?}");
849        }
850    }
851
852    #[tokio::test]
853    async fn test_payment_cache_stats_returns_correct_values() {
854        let (protocol, _temp) = create_test_protocol().await;
855
856        let stats = protocol.payment_cache_stats();
857        assert_eq!(stats.hits, 0);
858        assert_eq!(stats.misses, 0);
859        assert_eq!(stats.additions, 0);
860
861        // Pre-populate cache, then store a chunk to test stats
862        let content = b"stats test";
863        let address = LmdbStorage::compute_address(content);
864        protocol.payment_verifier().cache_insert(address);
865
866        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
867        let put_msg = ChunkMessage {
868            request_id: 120,
869            body: ChunkMessageBody::PutRequest(put_request),
870        };
871        let put_bytes = put_msg.encode().expect("encode put");
872        let _ = protocol
873            .try_handle_request(&put_bytes)
874            .await
875            .expect("handle put");
876
877        let stats = protocol.payment_cache_stats();
878        // Should have 1 addition (from cache_insert) + 1 hit (payment verification found cache)
879        assert_eq!(stats.additions, 1);
880        assert_eq!(stats.hits, 1);
881    }
882
883    #[tokio::test]
884    async fn test_storage_stats() {
885        let (protocol, _temp) = create_test_protocol().await;
886        let stats = protocol.storage_stats();
887        assert_eq!(stats.chunks_stored, 0);
888    }
889
890    #[tokio::test]
891    async fn test_merkle_candidate_quote_request() {
892        use ant_protocol::payment::verify::verify_merkle_candidate_signature;
893        use evmlib::merkle_payments::MerklePaymentCandidateNode;
894
895        // create_test_protocol already wires ML-DSA-65 signing
896        let (protocol, _temp) = create_test_protocol().await;
897
898        let address = [0x77; 32];
899        let timestamp = std::time::SystemTime::now()
900            .duration_since(std::time::UNIX_EPOCH)
901            .expect("system time")
902            .as_secs();
903
904        let request = MerkleCandidateQuoteRequest {
905            address,
906            data_type: DATA_TYPE_CHUNK,
907            data_size: 4096,
908            merkle_payment_timestamp: timestamp,
909        };
910        let msg = ChunkMessage {
911            request_id: 600,
912            body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
913        };
914        let msg_bytes = msg.encode().expect("encode request");
915
916        let response_bytes = protocol
917            .try_handle_request(&msg_bytes)
918            .await
919            .expect("handle merkle candidate quote")
920            .expect("expected response");
921        let response = ChunkMessage::decode(&response_bytes).expect("decode response");
922
923        assert_eq!(response.request_id, 600);
924        match response.body {
925            ChunkMessageBody::MerkleCandidateQuoteResponse(
926                MerkleCandidateQuoteResponse::Success { candidate_node },
927            ) => {
928                let candidate: MerklePaymentCandidateNode =
929                    rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
930
931                // Verify ML-DSA-65 signature
932                assert!(
933                    verify_merkle_candidate_signature(&candidate),
934                    "ML-DSA-65 candidate signature must be valid"
935                );
936
937                assert_eq!(candidate.merkle_payment_timestamp, timestamp);
938                // Node-calculated price based on records stored
939                assert!(candidate.price >= evmlib::common::Amount::ZERO);
940            }
941            other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
942        }
943    }
944
945    #[tokio::test]
946    async fn test_handle_unexpected_response_message() {
947        let (protocol, _temp) = create_test_protocol().await;
948
949        // Send a PutResponse as if it were a request — should return None
950        let msg = ChunkMessage {
951            request_id: 200,
952            body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
953        };
954        let msg_bytes = msg.encode().expect("encode");
955
956        let result = protocol
957            .try_handle_request(&msg_bytes)
958            .await
959            .expect("handle msg");
960
961        assert!(
962            result.is_none(),
963            "expected None for response message, got: {result:?}"
964        );
965    }
966
967    #[tokio::test]
968    async fn test_quote_already_stored_flag() {
969        let (protocol, _temp) = create_test_protocol().await;
970
971        let content = b"already stored quote test";
972        let address = LmdbStorage::compute_address(content);
973
974        // Store the chunk first
975        protocol.payment_verifier().cache_insert(address);
976        let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
977        let put_msg = ChunkMessage {
978            request_id: 300,
979            body: ChunkMessageBody::PutRequest(put_request),
980        };
981        let put_bytes = put_msg.encode().expect("encode put");
982        let _ = protocol
983            .try_handle_request(&put_bytes)
984            .await
985            .expect("handle put");
986
987        // Now request a quote for the same address — already_stored should be true
988        let quote_request = ChunkQuoteRequest {
989            address,
990            data_size: content.len() as u64,
991            data_type: DATA_TYPE_CHUNK,
992        };
993        let quote_msg = ChunkMessage {
994            request_id: 301,
995            body: ChunkMessageBody::QuoteRequest(quote_request),
996        };
997        let quote_bytes = quote_msg.encode().expect("encode quote");
998        let response_bytes = protocol
999            .try_handle_request(&quote_bytes)
1000            .await
1001            .expect("handle quote")
1002            .expect("expected response");
1003        let response = ChunkMessage::decode(&response_bytes).expect("decode");
1004
1005        match response.body {
1006            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1007                already_stored, ..
1008            }) => {
1009                assert!(
1010                    already_stored,
1011                    "already_stored should be true for existing chunk"
1012                );
1013            }
1014            other => panic!("expected Success with already_stored, got: {other:?}"),
1015        }
1016
1017        // Request a quote for a chunk that does NOT exist — already_stored should be false
1018        let new_address = [0xFFu8; 32];
1019        let quote_request2 = ChunkQuoteRequest {
1020            address: new_address,
1021            data_size: 100,
1022            data_type: DATA_TYPE_CHUNK,
1023        };
1024        let quote_msg2 = ChunkMessage {
1025            request_id: 302,
1026            body: ChunkMessageBody::QuoteRequest(quote_request2),
1027        };
1028        let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
1029        let response_bytes2 = protocol
1030            .try_handle_request(&quote_bytes2)
1031            .await
1032            .expect("handle quote2")
1033            .expect("expected response");
1034        let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
1035
1036        match response2.body {
1037            ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1038                already_stored, ..
1039            }) => {
1040                assert!(
1041                    !already_stored,
1042                    "already_stored should be false for new chunk"
1043                );
1044            }
1045            other => panic!("expected Success with already_stored=false, got: {other:?}"),
1046        }
1047    }
1048}