1#[cfg(test)]
31use crate::ant_protocol::DATA_TYPE_CHUNK;
32use crate::ant_protocol::{
33 ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
34 ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
35 MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE,
36};
37use crate::client::compute_address;
38use crate::error::{Error, Result};
39use crate::logging::{debug, info, warn};
40use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext};
41use crate::replication::fresh::FreshWriteEvent;
42use crate::storage::lmdb::LmdbStorage;
43use bytes::Bytes;
44use std::sync::Arc;
45use tokio::sync::mpsc;
46
47pub struct AntProtocol {
52 storage: Arc<LmdbStorage>,
54 payment_verifier: Arc<PaymentVerifier>,
56 quote_generator: Arc<QuoteGenerator>,
59 fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
61}
62
63impl AntProtocol {
64 #[must_use]
72 pub fn new(
73 storage: Arc<LmdbStorage>,
74 payment_verifier: Arc<PaymentVerifier>,
75 quote_generator: Arc<QuoteGenerator>,
76 ) -> Self {
77 payment_verifier.attach_storage(Arc::clone(&storage));
86 quote_generator.attach_storage(Arc::clone(&storage));
87
88 Self {
89 storage,
90 payment_verifier,
91 quote_generator,
92 fresh_write_tx: None,
93 }
94 }
95
96 pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
101 self.fresh_write_tx = Some(tx);
102 }
103
104 #[must_use]
106 pub fn protocol_id(&self) -> &'static str {
107 CHUNK_PROTOCOL_ID
108 }
109
110 #[must_use]
112 pub fn storage(&self) -> Arc<LmdbStorage> {
113 Arc::clone(&self.storage)
114 }
115
116 #[must_use]
118 pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
119 Arc::clone(&self.payment_verifier)
120 }
121
122 pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
133 let message = ChunkMessage::decode(data)
134 .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
135
136 let request_id = message.request_id;
137
138 let response_body = match message.body {
139 ChunkMessageBody::PutRequest(req) => {
140 ChunkMessageBody::PutResponse(self.handle_put(req).await)
141 }
142 ChunkMessageBody::GetRequest(req) => {
143 ChunkMessageBody::GetResponse(self.handle_get(req).await)
144 }
145 ChunkMessageBody::QuoteRequest(ref req) => {
146 ChunkMessageBody::QuoteResponse(self.handle_quote(req))
147 }
148 ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
149 ChunkMessageBody::MerkleCandidateQuoteResponse(
150 self.handle_merkle_candidate_quote(req),
151 )
152 }
153 _ => return Ok(None),
165 };
166
167 let response = ChunkMessage {
168 request_id,
169 body: response_body,
170 };
171
172 response
173 .encode()
174 .map(|b| Some(Bytes::from(b)))
175 .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
176 }
177
178 async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
186 let start = std::time::Instant::now();
187 let addr_hex = hex::encode(request.address);
188 let chunk_size = request.content.len();
189 let response = self.handle_put_inner(request).await;
190 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
191 let outcome: &'static str = match &response {
192 ChunkPutResponse::Success { .. } => "success",
193 ChunkPutResponse::AlreadyExists { .. } => "already_exists",
194 ChunkPutResponse::PaymentRequired { .. } => "payment_required",
195 ChunkPutResponse::Error(_) => "error",
196 _ => "unknown",
197 };
198 info!(
199 target: "ant_node::storage::rpc_latency",
200 duration_ms,
201 chunk_size,
202 outcome,
203 addr = %addr_hex,
204 "put_rpc"
205 );
206 response
207 }
208
209 async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
211 let address = request.address;
212 let addr_hex = hex::encode(address);
213 debug!("Handling PUT request for {addr_hex}");
214
215 if request.content.len() > MAX_CHUNK_SIZE {
217 return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
218 size: request.content.len(),
219 max_size: MAX_CHUNK_SIZE,
220 });
221 }
222
223 let computed = compute_address(&request.content);
225 if computed != address {
226 return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
227 expected: address,
228 actual: computed,
229 });
230 }
231
232 match self.storage.exists(&address) {
234 Ok(true) => {
235 debug!("Chunk {addr_hex} already exists");
236 return ChunkPutResponse::AlreadyExists { address };
237 }
238 Err(e) => {
239 return ChunkPutResponse::Error(ProtocolError::Internal(format!(
240 "Storage read failed: {e}"
241 )));
242 }
243 Ok(false) => {}
244 }
245
246 let payment_result = self
250 .payment_verifier
251 .verify_payment(
252 &address,
253 request.payment_proof.as_deref(),
254 VerificationContext::ClientPut,
255 )
256 .await;
257
258 match payment_result {
259 Ok(status) if status.can_store() => {
260 }
262 Ok(_) => {
263 return ChunkPutResponse::PaymentRequired {
264 message: "Payment required for new chunk".to_string(),
265 };
266 }
267 Err(e) => {
268 return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
269 }
270 }
271
272 match self.storage.put(&address, &request.content).await {
274 Ok(_) => {
275 let content_len = request.content.len();
276 info!("Stored chunk {addr_hex} ({content_len} bytes)");
277 self.quote_generator.record_store();
283
284 if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
289 let event = FreshWriteEvent {
295 key: address,
296 data: request.content.to_vec(),
297 payment_proof: proof,
298 };
299 if tx.send(event).is_err() {
300 debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
301 }
302 }
303
304 ChunkPutResponse::Success { address }
305 }
306 Err(e) => {
307 warn!("Failed to store chunk {addr_hex}: {e}");
308 ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
309 }
310 }
311 }
312
313 async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
318 let start = std::time::Instant::now();
319 let addr_hex = hex::encode(request.address);
320 let response = self.handle_get_inner(request).await;
321 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
322 let outcome: &'static str = match &response {
323 ChunkGetResponse::Success { .. } => "success",
324 ChunkGetResponse::NotFound { .. } => "not_found",
325 ChunkGetResponse::Error(_) => "error",
326 _ => "unknown",
327 };
328 info!(
329 target: "ant_node::storage::rpc_latency",
330 duration_ms,
331 outcome,
332 addr = %addr_hex,
333 "get_rpc"
334 );
335 response
336 }
337
338 async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
340 let address = request.address;
341 let addr_hex = hex::encode(address);
342 debug!("Handling GET request for {addr_hex}");
343
344 match self.storage.get(&address).await {
345 Ok(Some(content)) => {
346 let content_len = content.len();
347 debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
348 ChunkGetResponse::Success { address, content }
349 }
350 Ok(None) => {
351 debug!("Chunk {addr_hex} not found");
352 ChunkGetResponse::NotFound { address }
353 }
354 Err(e) => {
355 warn!("Failed to retrieve chunk {addr_hex}: {e}");
356 ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
357 }
358 }
359 }
360
361 fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
363 let addr_hex = hex::encode(request.address);
364 let data_size = request.data_size;
365 debug!("Handling quote request for {addr_hex} (size: {data_size})");
366
367 #[allow(clippy::manual_unwrap_or_default)]
373 let already_stored = match self.storage.exists(&request.address) {
374 Ok(exists) => exists,
375 Err(e) => {
376 warn!("Storage check failed for {addr_hex}: {e}");
377 false }
379 };
380
381 if already_stored {
382 debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
383 }
384
385 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
387 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
388 size: MAX_CHUNK_SIZE + 1,
389 max_size: MAX_CHUNK_SIZE,
390 });
391 };
392 if data_size_usize > MAX_CHUNK_SIZE {
393 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
394 size: data_size_usize,
395 max_size: MAX_CHUNK_SIZE,
396 });
397 }
398
399 match self
400 .quote_generator
401 .create_quote(request.address, data_size_usize, request.data_type)
402 {
403 Ok(quote) => {
404 match rmp_serde::to_vec("e) {
406 Ok(quote_bytes) => ChunkQuoteResponse::Success {
407 quote: quote_bytes,
408 already_stored,
409 },
410 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
411 "Failed to serialize quote: {e}"
412 ))),
413 }
414 }
415 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
416 }
417 }
418
419 fn handle_merkle_candidate_quote(
421 &self,
422 request: &MerkleCandidateQuoteRequest,
423 ) -> MerkleCandidateQuoteResponse {
424 let addr_hex = hex::encode(request.address);
425 let data_size = request.data_size;
426 debug!(
427 "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
428 request.merkle_payment_timestamp
429 );
430
431 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
432 return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
433 "data_size {} overflows usize",
434 request.data_size
435 )));
436 };
437 if data_size_usize > MAX_CHUNK_SIZE {
438 return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
439 size: data_size_usize,
440 max_size: MAX_CHUNK_SIZE,
441 });
442 }
443
444 match self.quote_generator.create_merkle_candidate_quote(
445 data_size_usize,
446 request.data_type,
447 request.merkle_payment_timestamp,
448 ) {
449 Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
450 Ok(bytes) => MerkleCandidateQuoteResponse::Success {
451 candidate_node: bytes,
452 },
453 Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
454 "Failed to serialize merkle candidate node: {e}"
455 ))),
456 },
457 Err(e) => {
458 MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
459 }
460 }
461 }
462
463 #[must_use]
465 pub fn storage_stats(&self) -> crate::storage::StorageStats {
466 self.storage.stats()
467 }
468
469 #[must_use]
471 pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
472 self.payment_verifier.cache_stats()
473 }
474
475 #[cfg(any(test, feature = "test-utils"))]
481 #[must_use]
482 pub fn payment_verifier(&self) -> &PaymentVerifier {
483 &self.payment_verifier
484 }
485
486 pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
492 self.storage.exists(address)
493 }
494
495 pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
501 self.storage.get(address).await
502 }
503
504 #[cfg(test)]
512 pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
513 self.storage.put(address, content).await
514 }
515}
516
517#[cfg(test)]
518#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
519mod tests {
520 use super::*;
521 use crate::payment::metrics::QuotingMetricsTracker;
522 use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
523 use crate::storage::LmdbStorageConfig;
524 use evmlib::RewardsAddress;
525 use saorsa_core::identity::NodeIdentity;
526 use saorsa_core::MlDsa65;
527 use saorsa_pqc::pqc::types::MlDsaSecretKey;
528 use tempfile::TempDir;
529
530 async fn create_test_protocol() -> (AntProtocol, TempDir) {
531 let temp_dir = TempDir::new().expect("create temp dir");
532
533 let storage_config = LmdbStorageConfig {
534 root_dir: temp_dir.path().to_path_buf(),
535 ..LmdbStorageConfig::test_default()
536 };
537 let storage = Arc::new(
538 LmdbStorage::new(storage_config)
539 .await
540 .expect("create storage"),
541 );
542
543 let rewards_address = RewardsAddress::new([1u8; 20]);
544 let payment_config = PaymentVerifierConfig {
545 evm: EvmVerifierConfig::default(),
546 cache_capacity: 100_000,
547 local_rewards_address: rewards_address,
548 };
549 let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
550 let metrics_tracker = QuotingMetricsTracker::new(100);
551 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
552
553 let identity = NodeIdentity::generate().expect("generate identity");
555 let pub_key_bytes = identity.public_key().as_bytes().to_vec();
556 let sk_bytes = identity.secret_key_bytes().to_vec();
557 let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
558 quote_generator.set_signer(pub_key_bytes, move |msg| {
559 use saorsa_pqc::pqc::MlDsaOperations;
560 let ml_dsa = MlDsa65::new();
561 ml_dsa
562 .sign(&sk, msg)
563 .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
564 });
565
566 let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
567 (protocol, temp_dir)
568 }
569
570 #[tokio::test]
571 async fn test_put_and_get_chunk() {
572 let (protocol, _temp) = create_test_protocol().await;
573
574 let content = b"hello world";
575 let address = LmdbStorage::compute_address(content);
576
577 protocol.payment_verifier().cache_insert(address);
579
580 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
581 let put_msg = ChunkMessage {
582 request_id: 1,
583 body: ChunkMessageBody::PutRequest(put_request),
584 };
585 let put_bytes = put_msg.encode().expect("encode put");
586
587 let response_bytes = protocol
589 .try_handle_request(&put_bytes)
590 .await
591 .expect("handle put")
592 .expect("expected response");
593 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
594
595 assert_eq!(response.request_id, 1);
596 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
597 response.body
598 {
599 assert_eq!(addr, address);
600 } else {
601 panic!("expected PutResponse::Success, got: {response:?}");
602 }
603
604 let get_request = ChunkGetRequest::new(address);
606 let get_msg = ChunkMessage {
607 request_id: 2,
608 body: ChunkMessageBody::GetRequest(get_request),
609 };
610 let get_bytes = get_msg.encode().expect("encode get");
611
612 let response_bytes = protocol
614 .try_handle_request(&get_bytes)
615 .await
616 .expect("handle get")
617 .expect("expected response");
618 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
619
620 assert_eq!(response.request_id, 2);
621 if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
622 address: addr,
623 content: data,
624 }) = response.body
625 {
626 assert_eq!(addr, address);
627 assert_eq!(data, content.to_vec());
628 } else {
629 panic!("expected GetResponse::Success");
630 }
631 }
632
633 #[tokio::test]
634 async fn test_get_not_found() {
635 let (protocol, _temp) = create_test_protocol().await;
636
637 let address = [0xAB; 32];
638 let get_request = ChunkGetRequest::new(address);
639 let get_msg = ChunkMessage {
640 request_id: 10,
641 body: ChunkMessageBody::GetRequest(get_request),
642 };
643 let get_bytes = get_msg.encode().expect("encode get");
644
645 let response_bytes = protocol
646 .try_handle_request(&get_bytes)
647 .await
648 .expect("handle get")
649 .expect("expected response");
650 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
651
652 assert_eq!(response.request_id, 10);
653 if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
654 response.body
655 {
656 assert_eq!(addr, address);
657 } else {
658 panic!("expected GetResponse::NotFound");
659 }
660 }
661
662 #[tokio::test]
663 async fn test_put_address_mismatch() {
664 let (protocol, _temp) = create_test_protocol().await;
665
666 let content = b"test content";
667 let wrong_address = [0xFF; 32]; protocol.payment_verifier().cache_insert(wrong_address);
671
672 let put_request = ChunkPutRequest::new(wrong_address, Bytes::copy_from_slice(content));
673 let put_msg = ChunkMessage {
674 request_id: 20,
675 body: ChunkMessageBody::PutRequest(put_request),
676 };
677 let put_bytes = put_msg.encode().expect("encode put");
678
679 let response_bytes = protocol
680 .try_handle_request(&put_bytes)
681 .await
682 .expect("handle put")
683 .expect("expected response");
684 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
685
686 assert_eq!(response.request_id, 20);
687 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
688 ProtocolError::AddressMismatch { .. },
689 )) = response.body
690 {
691 } else {
693 panic!("expected AddressMismatch error, got: {response:?}");
694 }
695 }
696
697 #[tokio::test]
698 async fn test_put_chunk_too_large() {
699 let (protocol, _temp) = create_test_protocol().await;
700
701 let content = vec![0u8; MAX_CHUNK_SIZE + 1];
703 let address = LmdbStorage::compute_address(&content);
704
705 let put_request = ChunkPutRequest::new(address, Bytes::from(content));
706 let put_msg = ChunkMessage {
707 request_id: 30,
708 body: ChunkMessageBody::PutRequest(put_request),
709 };
710 let put_bytes = put_msg.encode().expect("encode put");
711
712 let response_bytes = protocol
713 .try_handle_request(&put_bytes)
714 .await
715 .expect("handle put")
716 .expect("expected response");
717 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
718
719 assert_eq!(response.request_id, 30);
720 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
721 ProtocolError::ChunkTooLarge { .. },
722 )) = response.body
723 {
724 } else {
726 panic!("expected ChunkTooLarge error");
727 }
728 }
729
730 #[tokio::test]
731 async fn test_put_already_exists() {
732 let (protocol, _temp) = create_test_protocol().await;
733
734 let content = b"duplicate content";
735 let address = LmdbStorage::compute_address(content);
736
737 protocol.payment_verifier().cache_insert(address);
739
740 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
741 let put_msg = ChunkMessage {
742 request_id: 40,
743 body: ChunkMessageBody::PutRequest(put_request),
744 };
745 let put_bytes = put_msg.encode().expect("encode put");
746
747 let _ = protocol
748 .try_handle_request(&put_bytes)
749 .await
750 .expect("handle put");
751
752 let response_bytes = protocol
754 .try_handle_request(&put_bytes)
755 .await
756 .expect("handle put 2")
757 .expect("expected response");
758 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
759
760 assert_eq!(response.request_id, 40);
761 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
762 response.body
763 {
764 assert_eq!(addr, address);
765 } else {
766 panic!("expected AlreadyExists");
767 }
768 }
769
770 #[tokio::test]
771 async fn test_protocol_id() {
772 let (protocol, _temp) = create_test_protocol().await;
773 assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
774 }
775
776 #[tokio::test]
777 async fn test_exists_and_local_access() {
778 let (protocol, _temp) = create_test_protocol().await;
779
780 let content = b"local access test";
781 let address = LmdbStorage::compute_address(content);
782
783 assert!(!protocol.exists(&address).expect("exists check"));
784
785 protocol
786 .put_local(&address, content)
787 .await
788 .expect("put local");
789
790 assert!(protocol.exists(&address).expect("exists check"));
791
792 let retrieved = protocol.get_local(&address).await.expect("get local");
793 assert_eq!(retrieved, Some(content.to_vec()));
794 }
795
796 #[tokio::test]
797 async fn test_cache_insert_is_visible() {
798 let (protocol, _temp) = create_test_protocol().await;
799
800 let content = b"cache test content";
801 let address = LmdbStorage::compute_address(content);
802
803 let stats_before = protocol.payment_cache_stats();
805 assert_eq!(stats_before.additions, 0);
806
807 protocol.payment_verifier().cache_insert(address);
809
810 let stats_after = protocol.payment_cache_stats();
812 assert_eq!(stats_after.additions, 1);
813
814 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
816 let put_msg = ChunkMessage {
817 request_id: 100,
818 body: ChunkMessageBody::PutRequest(put_request),
819 };
820 let put_bytes = put_msg.encode().expect("encode put");
821 let response_bytes = protocol
822 .try_handle_request(&put_bytes)
823 .await
824 .expect("handle put")
825 .expect("expected response");
826 let response = ChunkMessage::decode(&response_bytes).expect("decode");
827
828 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
829 } else {
831 panic!("expected success, got: {response:?}");
832 }
833 }
834
835 #[tokio::test]
836 async fn test_put_same_chunk_twice_hits_cache() {
837 let (protocol, _temp) = create_test_protocol().await;
838
839 let content = b"duplicate cache test";
840 let address = LmdbStorage::compute_address(content);
841
842 protocol.payment_verifier().cache_insert(address);
844
845 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
847 let put_msg = ChunkMessage {
848 request_id: 110,
849 body: ChunkMessageBody::PutRequest(put_request),
850 };
851 let put_bytes = put_msg.encode().expect("encode put");
852 let _ = protocol
853 .try_handle_request(&put_bytes)
854 .await
855 .expect("handle put 1");
856
857 let response_bytes = protocol
859 .try_handle_request(&put_bytes)
860 .await
861 .expect("handle put 2")
862 .expect("expected response");
863 let response = ChunkMessage::decode(&response_bytes).expect("decode");
864
865 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
866 {
867 } else {
869 panic!("expected AlreadyExists, got: {response:?}");
870 }
871 }
872
873 #[tokio::test]
874 async fn test_payment_cache_stats_returns_correct_values() {
875 let (protocol, _temp) = create_test_protocol().await;
876
877 let stats = protocol.payment_cache_stats();
878 assert_eq!(stats.hits, 0);
879 assert_eq!(stats.misses, 0);
880 assert_eq!(stats.additions, 0);
881
882 let content = b"stats test";
884 let address = LmdbStorage::compute_address(content);
885 protocol.payment_verifier().cache_insert(address);
886
887 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
888 let put_msg = ChunkMessage {
889 request_id: 120,
890 body: ChunkMessageBody::PutRequest(put_request),
891 };
892 let put_bytes = put_msg.encode().expect("encode put");
893 let _ = protocol
894 .try_handle_request(&put_bytes)
895 .await
896 .expect("handle put");
897
898 let stats = protocol.payment_cache_stats();
899 assert_eq!(stats.additions, 1);
901 assert_eq!(stats.hits, 1);
902 }
903
904 #[tokio::test]
905 async fn test_storage_stats() {
906 let (protocol, _temp) = create_test_protocol().await;
907 let stats = protocol.storage_stats();
908 assert_eq!(stats.chunks_stored, 0);
909 }
910
911 #[tokio::test]
912 async fn test_merkle_candidate_quote_request() {
913 use ant_protocol::payment::verify::verify_merkle_candidate_signature;
914 use evmlib::merkle_payments::MerklePaymentCandidateNode;
915
916 let (protocol, _temp) = create_test_protocol().await;
918
919 let address = [0x77; 32];
920 let timestamp = std::time::SystemTime::now()
921 .duration_since(std::time::UNIX_EPOCH)
922 .expect("system time")
923 .as_secs();
924
925 let request = MerkleCandidateQuoteRequest {
926 address,
927 data_type: DATA_TYPE_CHUNK,
928 data_size: 4096,
929 merkle_payment_timestamp: timestamp,
930 };
931 let msg = ChunkMessage {
932 request_id: 600,
933 body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
934 };
935 let msg_bytes = msg.encode().expect("encode request");
936
937 let response_bytes = protocol
938 .try_handle_request(&msg_bytes)
939 .await
940 .expect("handle merkle candidate quote")
941 .expect("expected response");
942 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
943
944 assert_eq!(response.request_id, 600);
945 match response.body {
946 ChunkMessageBody::MerkleCandidateQuoteResponse(
947 MerkleCandidateQuoteResponse::Success { candidate_node },
948 ) => {
949 let candidate: MerklePaymentCandidateNode =
950 rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
951
952 assert!(
954 verify_merkle_candidate_signature(&candidate),
955 "ML-DSA-65 candidate signature must be valid"
956 );
957
958 assert_eq!(candidate.merkle_payment_timestamp, timestamp);
959 assert!(candidate.price >= evmlib::common::Amount::ZERO);
961 }
962 other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
963 }
964 }
965
966 #[tokio::test]
967 async fn test_handle_unexpected_response_message() {
968 let (protocol, _temp) = create_test_protocol().await;
969
970 let msg = ChunkMessage {
972 request_id: 200,
973 body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
974 };
975 let msg_bytes = msg.encode().expect("encode");
976
977 let result = protocol
978 .try_handle_request(&msg_bytes)
979 .await
980 .expect("handle msg");
981
982 assert!(
983 result.is_none(),
984 "expected None for response message, got: {result:?}"
985 );
986 }
987
988 #[tokio::test]
989 async fn test_quote_already_stored_flag() {
990 let (protocol, _temp) = create_test_protocol().await;
991
992 let content = b"already stored quote test";
993 let address = LmdbStorage::compute_address(content);
994
995 protocol.payment_verifier().cache_insert(address);
997 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
998 let put_msg = ChunkMessage {
999 request_id: 300,
1000 body: ChunkMessageBody::PutRequest(put_request),
1001 };
1002 let put_bytes = put_msg.encode().expect("encode put");
1003 let _ = protocol
1004 .try_handle_request(&put_bytes)
1005 .await
1006 .expect("handle put");
1007
1008 let quote_request = ChunkQuoteRequest {
1010 address,
1011 data_size: content.len() as u64,
1012 data_type: DATA_TYPE_CHUNK,
1013 };
1014 let quote_msg = ChunkMessage {
1015 request_id: 301,
1016 body: ChunkMessageBody::QuoteRequest(quote_request),
1017 };
1018 let quote_bytes = quote_msg.encode().expect("encode quote");
1019 let response_bytes = protocol
1020 .try_handle_request("e_bytes)
1021 .await
1022 .expect("handle quote")
1023 .expect("expected response");
1024 let response = ChunkMessage::decode(&response_bytes).expect("decode");
1025
1026 match response.body {
1027 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1028 already_stored, ..
1029 }) => {
1030 assert!(
1031 already_stored,
1032 "already_stored should be true for existing chunk"
1033 );
1034 }
1035 other => panic!("expected Success with already_stored, got: {other:?}"),
1036 }
1037
1038 let new_address = [0xFFu8; 32];
1040 let quote_request2 = ChunkQuoteRequest {
1041 address: new_address,
1042 data_size: 100,
1043 data_type: DATA_TYPE_CHUNK,
1044 };
1045 let quote_msg2 = ChunkMessage {
1046 request_id: 302,
1047 body: ChunkMessageBody::QuoteRequest(quote_request2),
1048 };
1049 let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
1050 let response_bytes2 = protocol
1051 .try_handle_request("e_bytes2)
1052 .await
1053 .expect("handle quote2")
1054 .expect("expected response");
1055 let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
1056
1057 match response2.body {
1058 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1059 already_stored, ..
1060 }) => {
1061 assert!(
1062 !already_stored,
1063 "already_stored should be false for new chunk"
1064 );
1065 }
1066 other => panic!("expected Success with already_stored=false, got: {other:?}"),
1067 }
1068 }
1069}