1#[cfg(test)]
31use crate::ant_protocol::DATA_TYPE_CHUNK;
32use crate::ant_protocol::{
33 ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
34 ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
35 MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE,
36};
37use crate::client::compute_address;
38use crate::error::{Error, Result};
39use crate::logging::{debug, info, warn};
40use crate::payment::{PaymentVerifier, QuoteGenerator};
41use crate::replication::fresh::FreshWriteEvent;
42use crate::storage::lmdb::LmdbStorage;
43use bytes::Bytes;
44use std::sync::Arc;
45use tokio::sync::mpsc;
46
47pub struct AntProtocol {
52 storage: Arc<LmdbStorage>,
54 payment_verifier: Arc<PaymentVerifier>,
56 quote_generator: Arc<QuoteGenerator>,
59 fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
61}
62
63impl AntProtocol {
64 #[must_use]
72 pub fn new(
73 storage: Arc<LmdbStorage>,
74 payment_verifier: Arc<PaymentVerifier>,
75 quote_generator: Arc<QuoteGenerator>,
76 ) -> Self {
77 Self {
78 storage,
79 payment_verifier,
80 quote_generator,
81 fresh_write_tx: None,
82 }
83 }
84
85 pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
90 self.fresh_write_tx = Some(tx);
91 }
92
93 #[must_use]
95 pub fn protocol_id(&self) -> &'static str {
96 CHUNK_PROTOCOL_ID
97 }
98
99 #[must_use]
101 pub fn storage(&self) -> Arc<LmdbStorage> {
102 Arc::clone(&self.storage)
103 }
104
105 #[must_use]
107 pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
108 Arc::clone(&self.payment_verifier)
109 }
110
111 pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
122 let message = ChunkMessage::decode(data)
123 .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
124
125 let request_id = message.request_id;
126
127 let response_body = match message.body {
128 ChunkMessageBody::PutRequest(req) => {
129 ChunkMessageBody::PutResponse(self.handle_put(req).await)
130 }
131 ChunkMessageBody::GetRequest(req) => {
132 ChunkMessageBody::GetResponse(self.handle_get(req).await)
133 }
134 ChunkMessageBody::QuoteRequest(ref req) => {
135 ChunkMessageBody::QuoteResponse(self.handle_quote(req))
136 }
137 ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
138 ChunkMessageBody::MerkleCandidateQuoteResponse(
139 self.handle_merkle_candidate_quote(req),
140 )
141 }
142 _ => return Ok(None),
154 };
155
156 let response = ChunkMessage {
157 request_id,
158 body: response_body,
159 };
160
161 response
162 .encode()
163 .map(|b| Some(Bytes::from(b)))
164 .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
165 }
166
167 async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
175 let start = std::time::Instant::now();
176 let addr_hex = hex::encode(request.address);
177 let chunk_size = request.content.len();
178 let response = self.handle_put_inner(request).await;
179 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
180 let outcome: &'static str = match &response {
181 ChunkPutResponse::Success { .. } => "success",
182 ChunkPutResponse::AlreadyExists { .. } => "already_exists",
183 ChunkPutResponse::PaymentRequired { .. } => "payment_required",
184 ChunkPutResponse::Error(_) => "error",
185 _ => "unknown",
186 };
187 info!(
188 target: "ant_node::storage::rpc_latency",
189 duration_ms,
190 chunk_size,
191 outcome,
192 addr = %addr_hex,
193 "put_rpc"
194 );
195 response
196 }
197
198 async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
200 let address = request.address;
201 let addr_hex = hex::encode(address);
202 debug!("Handling PUT request for {addr_hex}");
203
204 if request.content.len() > MAX_CHUNK_SIZE {
206 return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
207 size: request.content.len(),
208 max_size: MAX_CHUNK_SIZE,
209 });
210 }
211
212 let computed = compute_address(&request.content);
214 if computed != address {
215 return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
216 expected: address,
217 actual: computed,
218 });
219 }
220
221 match self.storage.exists(&address) {
223 Ok(true) => {
224 debug!("Chunk {addr_hex} already exists");
225 return ChunkPutResponse::AlreadyExists { address };
226 }
227 Err(e) => {
228 return ChunkPutResponse::Error(ProtocolError::Internal(format!(
229 "Storage read failed: {e}"
230 )));
231 }
232 Ok(false) => {}
233 }
234
235 let payment_result = self
237 .payment_verifier
238 .verify_payment(&address, request.payment_proof.as_deref())
239 .await;
240
241 match payment_result {
242 Ok(status) if status.can_store() => {
243 }
245 Ok(_) => {
246 return ChunkPutResponse::PaymentRequired {
247 message: "Payment required for new chunk".to_string(),
248 };
249 }
250 Err(e) => {
251 return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
252 }
253 }
254
255 match self.storage.put(&address, &request.content).await {
257 Ok(_) => {
258 let content_len = request.content.len();
259 info!("Stored chunk {addr_hex} ({content_len} bytes)");
260 self.quote_generator.record_store();
262
263 if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
268 let event = FreshWriteEvent {
274 key: address,
275 data: request.content.to_vec(),
276 payment_proof: proof,
277 };
278 if tx.send(event).is_err() {
279 debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
280 }
281 }
282
283 ChunkPutResponse::Success { address }
284 }
285 Err(e) => {
286 warn!("Failed to store chunk {addr_hex}: {e}");
287 ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
288 }
289 }
290 }
291
292 async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
297 let start = std::time::Instant::now();
298 let addr_hex = hex::encode(request.address);
299 let response = self.handle_get_inner(request).await;
300 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
301 let outcome: &'static str = match &response {
302 ChunkGetResponse::Success { .. } => "success",
303 ChunkGetResponse::NotFound { .. } => "not_found",
304 ChunkGetResponse::Error(_) => "error",
305 _ => "unknown",
306 };
307 info!(
308 target: "ant_node::storage::rpc_latency",
309 duration_ms,
310 outcome,
311 addr = %addr_hex,
312 "get_rpc"
313 );
314 response
315 }
316
317 async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
319 let address = request.address;
320 let addr_hex = hex::encode(address);
321 debug!("Handling GET request for {addr_hex}");
322
323 match self.storage.get(&address).await {
324 Ok(Some(content)) => {
325 let content_len = content.len();
326 debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
327 ChunkGetResponse::Success { address, content }
328 }
329 Ok(None) => {
330 debug!("Chunk {addr_hex} not found");
331 ChunkGetResponse::NotFound { address }
332 }
333 Err(e) => {
334 warn!("Failed to retrieve chunk {addr_hex}: {e}");
335 ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
336 }
337 }
338 }
339
340 fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
342 let addr_hex = hex::encode(request.address);
343 let data_size = request.data_size;
344 debug!("Handling quote request for {addr_hex} (size: {data_size})");
345
346 #[allow(clippy::manual_unwrap_or_default)]
352 let already_stored = match self.storage.exists(&request.address) {
353 Ok(exists) => exists,
354 Err(e) => {
355 warn!("Storage check failed for {addr_hex}: {e}");
356 false }
358 };
359
360 if already_stored {
361 debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
362 }
363
364 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
366 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
367 size: MAX_CHUNK_SIZE + 1,
368 max_size: MAX_CHUNK_SIZE,
369 });
370 };
371 if data_size_usize > MAX_CHUNK_SIZE {
372 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
373 size: data_size_usize,
374 max_size: MAX_CHUNK_SIZE,
375 });
376 }
377
378 match self
379 .quote_generator
380 .create_quote(request.address, data_size_usize, request.data_type)
381 {
382 Ok(quote) => {
383 match rmp_serde::to_vec("e) {
385 Ok(quote_bytes) => ChunkQuoteResponse::Success {
386 quote: quote_bytes,
387 already_stored,
388 },
389 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
390 "Failed to serialize quote: {e}"
391 ))),
392 }
393 }
394 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
395 }
396 }
397
398 fn handle_merkle_candidate_quote(
400 &self,
401 request: &MerkleCandidateQuoteRequest,
402 ) -> MerkleCandidateQuoteResponse {
403 let addr_hex = hex::encode(request.address);
404 let data_size = request.data_size;
405 debug!(
406 "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
407 request.merkle_payment_timestamp
408 );
409
410 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
411 return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
412 "data_size {} overflows usize",
413 request.data_size
414 )));
415 };
416 if data_size_usize > MAX_CHUNK_SIZE {
417 return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
418 size: data_size_usize,
419 max_size: MAX_CHUNK_SIZE,
420 });
421 }
422
423 match self.quote_generator.create_merkle_candidate_quote(
424 data_size_usize,
425 request.data_type,
426 request.merkle_payment_timestamp,
427 ) {
428 Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
429 Ok(bytes) => MerkleCandidateQuoteResponse::Success {
430 candidate_node: bytes,
431 },
432 Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
433 "Failed to serialize merkle candidate node: {e}"
434 ))),
435 },
436 Err(e) => {
437 MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
438 }
439 }
440 }
441
442 #[must_use]
444 pub fn storage_stats(&self) -> crate::storage::StorageStats {
445 self.storage.stats()
446 }
447
448 #[must_use]
450 pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
451 self.payment_verifier.cache_stats()
452 }
453
454 #[cfg(any(test, feature = "test-utils"))]
460 #[must_use]
461 pub fn payment_verifier(&self) -> &PaymentVerifier {
462 &self.payment_verifier
463 }
464
465 pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
471 self.storage.exists(address)
472 }
473
474 pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
480 self.storage.get(address).await
481 }
482
483 #[cfg(test)]
491 pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
492 self.storage.put(address, content).await
493 }
494}
495
496#[cfg(test)]
497#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
498mod tests {
499 use super::*;
500 use crate::payment::metrics::QuotingMetricsTracker;
501 use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
502 use crate::storage::LmdbStorageConfig;
503 use evmlib::RewardsAddress;
504 use saorsa_core::identity::NodeIdentity;
505 use saorsa_core::MlDsa65;
506 use saorsa_pqc::pqc::types::MlDsaSecretKey;
507 use tempfile::TempDir;
508
509 async fn create_test_protocol() -> (AntProtocol, TempDir) {
510 let temp_dir = TempDir::new().expect("create temp dir");
511
512 let storage_config = LmdbStorageConfig {
513 root_dir: temp_dir.path().to_path_buf(),
514 ..LmdbStorageConfig::test_default()
515 };
516 let storage = Arc::new(
517 LmdbStorage::new(storage_config)
518 .await
519 .expect("create storage"),
520 );
521
522 let rewards_address = RewardsAddress::new([1u8; 20]);
523 let payment_config = PaymentVerifierConfig {
524 evm: EvmVerifierConfig::default(),
525 cache_capacity: 100_000,
526 local_rewards_address: rewards_address,
527 };
528 let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
529 let metrics_tracker = QuotingMetricsTracker::new(100);
530 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
531
532 let identity = NodeIdentity::generate().expect("generate identity");
534 let pub_key_bytes = identity.public_key().as_bytes().to_vec();
535 let sk_bytes = identity.secret_key_bytes().to_vec();
536 let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
537 quote_generator.set_signer(pub_key_bytes, move |msg| {
538 use saorsa_pqc::pqc::MlDsaOperations;
539 let ml_dsa = MlDsa65::new();
540 ml_dsa
541 .sign(&sk, msg)
542 .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
543 });
544
545 let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
546 (protocol, temp_dir)
547 }
548
549 #[tokio::test]
550 async fn test_put_and_get_chunk() {
551 let (protocol, _temp) = create_test_protocol().await;
552
553 let content = b"hello world";
554 let address = LmdbStorage::compute_address(content);
555
556 protocol.payment_verifier().cache_insert(address);
558
559 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
560 let put_msg = ChunkMessage {
561 request_id: 1,
562 body: ChunkMessageBody::PutRequest(put_request),
563 };
564 let put_bytes = put_msg.encode().expect("encode put");
565
566 let response_bytes = protocol
568 .try_handle_request(&put_bytes)
569 .await
570 .expect("handle put")
571 .expect("expected response");
572 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
573
574 assert_eq!(response.request_id, 1);
575 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
576 response.body
577 {
578 assert_eq!(addr, address);
579 } else {
580 panic!("expected PutResponse::Success, got: {response:?}");
581 }
582
583 let get_request = ChunkGetRequest::new(address);
585 let get_msg = ChunkMessage {
586 request_id: 2,
587 body: ChunkMessageBody::GetRequest(get_request),
588 };
589 let get_bytes = get_msg.encode().expect("encode get");
590
591 let response_bytes = protocol
593 .try_handle_request(&get_bytes)
594 .await
595 .expect("handle get")
596 .expect("expected response");
597 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
598
599 assert_eq!(response.request_id, 2);
600 if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
601 address: addr,
602 content: data,
603 }) = response.body
604 {
605 assert_eq!(addr, address);
606 assert_eq!(data, content.to_vec());
607 } else {
608 panic!("expected GetResponse::Success");
609 }
610 }
611
612 #[tokio::test]
613 async fn test_get_not_found() {
614 let (protocol, _temp) = create_test_protocol().await;
615
616 let address = [0xAB; 32];
617 let get_request = ChunkGetRequest::new(address);
618 let get_msg = ChunkMessage {
619 request_id: 10,
620 body: ChunkMessageBody::GetRequest(get_request),
621 };
622 let get_bytes = get_msg.encode().expect("encode get");
623
624 let response_bytes = protocol
625 .try_handle_request(&get_bytes)
626 .await
627 .expect("handle get")
628 .expect("expected response");
629 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
630
631 assert_eq!(response.request_id, 10);
632 if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
633 response.body
634 {
635 assert_eq!(addr, address);
636 } else {
637 panic!("expected GetResponse::NotFound");
638 }
639 }
640
641 #[tokio::test]
642 async fn test_put_address_mismatch() {
643 let (protocol, _temp) = create_test_protocol().await;
644
645 let content = b"test content";
646 let wrong_address = [0xFF; 32]; protocol.payment_verifier().cache_insert(wrong_address);
650
651 let put_request = ChunkPutRequest::new(wrong_address, Bytes::copy_from_slice(content));
652 let put_msg = ChunkMessage {
653 request_id: 20,
654 body: ChunkMessageBody::PutRequest(put_request),
655 };
656 let put_bytes = put_msg.encode().expect("encode put");
657
658 let response_bytes = protocol
659 .try_handle_request(&put_bytes)
660 .await
661 .expect("handle put")
662 .expect("expected response");
663 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
664
665 assert_eq!(response.request_id, 20);
666 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
667 ProtocolError::AddressMismatch { .. },
668 )) = response.body
669 {
670 } else {
672 panic!("expected AddressMismatch error, got: {response:?}");
673 }
674 }
675
676 #[tokio::test]
677 async fn test_put_chunk_too_large() {
678 let (protocol, _temp) = create_test_protocol().await;
679
680 let content = vec![0u8; MAX_CHUNK_SIZE + 1];
682 let address = LmdbStorage::compute_address(&content);
683
684 let put_request = ChunkPutRequest::new(address, Bytes::from(content));
685 let put_msg = ChunkMessage {
686 request_id: 30,
687 body: ChunkMessageBody::PutRequest(put_request),
688 };
689 let put_bytes = put_msg.encode().expect("encode put");
690
691 let response_bytes = protocol
692 .try_handle_request(&put_bytes)
693 .await
694 .expect("handle put")
695 .expect("expected response");
696 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
697
698 assert_eq!(response.request_id, 30);
699 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
700 ProtocolError::ChunkTooLarge { .. },
701 )) = response.body
702 {
703 } else {
705 panic!("expected ChunkTooLarge error");
706 }
707 }
708
709 #[tokio::test]
710 async fn test_put_already_exists() {
711 let (protocol, _temp) = create_test_protocol().await;
712
713 let content = b"duplicate content";
714 let address = LmdbStorage::compute_address(content);
715
716 protocol.payment_verifier().cache_insert(address);
718
719 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
720 let put_msg = ChunkMessage {
721 request_id: 40,
722 body: ChunkMessageBody::PutRequest(put_request),
723 };
724 let put_bytes = put_msg.encode().expect("encode put");
725
726 let _ = protocol
727 .try_handle_request(&put_bytes)
728 .await
729 .expect("handle put");
730
731 let response_bytes = protocol
733 .try_handle_request(&put_bytes)
734 .await
735 .expect("handle put 2")
736 .expect("expected response");
737 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
738
739 assert_eq!(response.request_id, 40);
740 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
741 response.body
742 {
743 assert_eq!(addr, address);
744 } else {
745 panic!("expected AlreadyExists");
746 }
747 }
748
749 #[tokio::test]
750 async fn test_protocol_id() {
751 let (protocol, _temp) = create_test_protocol().await;
752 assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
753 }
754
755 #[tokio::test]
756 async fn test_exists_and_local_access() {
757 let (protocol, _temp) = create_test_protocol().await;
758
759 let content = b"local access test";
760 let address = LmdbStorage::compute_address(content);
761
762 assert!(!protocol.exists(&address).expect("exists check"));
763
764 protocol
765 .put_local(&address, content)
766 .await
767 .expect("put local");
768
769 assert!(protocol.exists(&address).expect("exists check"));
770
771 let retrieved = protocol.get_local(&address).await.expect("get local");
772 assert_eq!(retrieved, Some(content.to_vec()));
773 }
774
775 #[tokio::test]
776 async fn test_cache_insert_is_visible() {
777 let (protocol, _temp) = create_test_protocol().await;
778
779 let content = b"cache test content";
780 let address = LmdbStorage::compute_address(content);
781
782 let stats_before = protocol.payment_cache_stats();
784 assert_eq!(stats_before.additions, 0);
785
786 protocol.payment_verifier().cache_insert(address);
788
789 let stats_after = protocol.payment_cache_stats();
791 assert_eq!(stats_after.additions, 1);
792
793 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
795 let put_msg = ChunkMessage {
796 request_id: 100,
797 body: ChunkMessageBody::PutRequest(put_request),
798 };
799 let put_bytes = put_msg.encode().expect("encode put");
800 let response_bytes = protocol
801 .try_handle_request(&put_bytes)
802 .await
803 .expect("handle put")
804 .expect("expected response");
805 let response = ChunkMessage::decode(&response_bytes).expect("decode");
806
807 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
808 } else {
810 panic!("expected success, got: {response:?}");
811 }
812 }
813
814 #[tokio::test]
815 async fn test_put_same_chunk_twice_hits_cache() {
816 let (protocol, _temp) = create_test_protocol().await;
817
818 let content = b"duplicate cache test";
819 let address = LmdbStorage::compute_address(content);
820
821 protocol.payment_verifier().cache_insert(address);
823
824 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
826 let put_msg = ChunkMessage {
827 request_id: 110,
828 body: ChunkMessageBody::PutRequest(put_request),
829 };
830 let put_bytes = put_msg.encode().expect("encode put");
831 let _ = protocol
832 .try_handle_request(&put_bytes)
833 .await
834 .expect("handle put 1");
835
836 let response_bytes = protocol
838 .try_handle_request(&put_bytes)
839 .await
840 .expect("handle put 2")
841 .expect("expected response");
842 let response = ChunkMessage::decode(&response_bytes).expect("decode");
843
844 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
845 {
846 } else {
848 panic!("expected AlreadyExists, got: {response:?}");
849 }
850 }
851
852 #[tokio::test]
853 async fn test_payment_cache_stats_returns_correct_values() {
854 let (protocol, _temp) = create_test_protocol().await;
855
856 let stats = protocol.payment_cache_stats();
857 assert_eq!(stats.hits, 0);
858 assert_eq!(stats.misses, 0);
859 assert_eq!(stats.additions, 0);
860
861 let content = b"stats test";
863 let address = LmdbStorage::compute_address(content);
864 protocol.payment_verifier().cache_insert(address);
865
866 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
867 let put_msg = ChunkMessage {
868 request_id: 120,
869 body: ChunkMessageBody::PutRequest(put_request),
870 };
871 let put_bytes = put_msg.encode().expect("encode put");
872 let _ = protocol
873 .try_handle_request(&put_bytes)
874 .await
875 .expect("handle put");
876
877 let stats = protocol.payment_cache_stats();
878 assert_eq!(stats.additions, 1);
880 assert_eq!(stats.hits, 1);
881 }
882
883 #[tokio::test]
884 async fn test_storage_stats() {
885 let (protocol, _temp) = create_test_protocol().await;
886 let stats = protocol.storage_stats();
887 assert_eq!(stats.chunks_stored, 0);
888 }
889
890 #[tokio::test]
891 async fn test_merkle_candidate_quote_request() {
892 use ant_protocol::payment::verify::verify_merkle_candidate_signature;
893 use evmlib::merkle_payments::MerklePaymentCandidateNode;
894
895 let (protocol, _temp) = create_test_protocol().await;
897
898 let address = [0x77; 32];
899 let timestamp = std::time::SystemTime::now()
900 .duration_since(std::time::UNIX_EPOCH)
901 .expect("system time")
902 .as_secs();
903
904 let request = MerkleCandidateQuoteRequest {
905 address,
906 data_type: DATA_TYPE_CHUNK,
907 data_size: 4096,
908 merkle_payment_timestamp: timestamp,
909 };
910 let msg = ChunkMessage {
911 request_id: 600,
912 body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
913 };
914 let msg_bytes = msg.encode().expect("encode request");
915
916 let response_bytes = protocol
917 .try_handle_request(&msg_bytes)
918 .await
919 .expect("handle merkle candidate quote")
920 .expect("expected response");
921 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
922
923 assert_eq!(response.request_id, 600);
924 match response.body {
925 ChunkMessageBody::MerkleCandidateQuoteResponse(
926 MerkleCandidateQuoteResponse::Success { candidate_node },
927 ) => {
928 let candidate: MerklePaymentCandidateNode =
929 rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
930
931 assert!(
933 verify_merkle_candidate_signature(&candidate),
934 "ML-DSA-65 candidate signature must be valid"
935 );
936
937 assert_eq!(candidate.merkle_payment_timestamp, timestamp);
938 assert!(candidate.price >= evmlib::common::Amount::ZERO);
940 }
941 other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
942 }
943 }
944
945 #[tokio::test]
946 async fn test_handle_unexpected_response_message() {
947 let (protocol, _temp) = create_test_protocol().await;
948
949 let msg = ChunkMessage {
951 request_id: 200,
952 body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
953 };
954 let msg_bytes = msg.encode().expect("encode");
955
956 let result = protocol
957 .try_handle_request(&msg_bytes)
958 .await
959 .expect("handle msg");
960
961 assert!(
962 result.is_none(),
963 "expected None for response message, got: {result:?}"
964 );
965 }
966
967 #[tokio::test]
968 async fn test_quote_already_stored_flag() {
969 let (protocol, _temp) = create_test_protocol().await;
970
971 let content = b"already stored quote test";
972 let address = LmdbStorage::compute_address(content);
973
974 protocol.payment_verifier().cache_insert(address);
976 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
977 let put_msg = ChunkMessage {
978 request_id: 300,
979 body: ChunkMessageBody::PutRequest(put_request),
980 };
981 let put_bytes = put_msg.encode().expect("encode put");
982 let _ = protocol
983 .try_handle_request(&put_bytes)
984 .await
985 .expect("handle put");
986
987 let quote_request = ChunkQuoteRequest {
989 address,
990 data_size: content.len() as u64,
991 data_type: DATA_TYPE_CHUNK,
992 };
993 let quote_msg = ChunkMessage {
994 request_id: 301,
995 body: ChunkMessageBody::QuoteRequest(quote_request),
996 };
997 let quote_bytes = quote_msg.encode().expect("encode quote");
998 let response_bytes = protocol
999 .try_handle_request("e_bytes)
1000 .await
1001 .expect("handle quote")
1002 .expect("expected response");
1003 let response = ChunkMessage::decode(&response_bytes).expect("decode");
1004
1005 match response.body {
1006 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1007 already_stored, ..
1008 }) => {
1009 assert!(
1010 already_stored,
1011 "already_stored should be true for existing chunk"
1012 );
1013 }
1014 other => panic!("expected Success with already_stored, got: {other:?}"),
1015 }
1016
1017 let new_address = [0xFFu8; 32];
1019 let quote_request2 = ChunkQuoteRequest {
1020 address: new_address,
1021 data_size: 100,
1022 data_type: DATA_TYPE_CHUNK,
1023 };
1024 let quote_msg2 = ChunkMessage {
1025 request_id: 302,
1026 body: ChunkMessageBody::QuoteRequest(quote_request2),
1027 };
1028 let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
1029 let response_bytes2 = protocol
1030 .try_handle_request("e_bytes2)
1031 .await
1032 .expect("handle quote2")
1033 .expect("expected response");
1034 let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
1035
1036 match response2.body {
1037 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1038 already_stored, ..
1039 }) => {
1040 assert!(
1041 !already_stored,
1042 "already_stored should be false for new chunk"
1043 );
1044 }
1045 other => panic!("expected Success with already_stored=false, got: {other:?}"),
1046 }
1047 }
1048}