1#[cfg(test)]
31use crate::ant_protocol::DATA_TYPE_CHUNK;
32use crate::ant_protocol::{
33 ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
34 ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
35 MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE,
36};
37use crate::client::compute_address;
38use crate::error::{Error, Result};
39use crate::logging::{debug, info, warn};
40use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext};
41use crate::replication::fresh::FreshWriteEvent;
42use crate::storage::lmdb::LmdbStorage;
43use bytes::Bytes;
44use saorsa_core::P2PNode;
45use std::sync::Arc;
46use tokio::sync::mpsc;
47
48pub struct AntProtocol {
53 storage: Arc<LmdbStorage>,
55 payment_verifier: Arc<PaymentVerifier>,
57 quote_generator: Arc<QuoteGenerator>,
60 fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
62}
63
64impl AntProtocol {
65 #[must_use]
73 pub fn new(
74 storage: Arc<LmdbStorage>,
75 payment_verifier: Arc<PaymentVerifier>,
76 quote_generator: Arc<QuoteGenerator>,
77 ) -> Self {
78 payment_verifier.attach_storage(Arc::clone(&storage));
86 quote_generator.attach_storage(Arc::clone(&storage));
87
88 Self {
89 storage,
90 payment_verifier,
91 quote_generator,
92 fresh_write_tx: None,
93 }
94 }
95
96 pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
102 self.payment_verifier.attach_p2p_node(node);
103 debug!("AntProtocol: P2PNode attached for payment live-DHT checks");
104 }
105
106 pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
111 self.fresh_write_tx = Some(tx);
112 }
113
114 #[must_use]
116 pub fn protocol_id(&self) -> &'static str {
117 CHUNK_PROTOCOL_ID
118 }
119
120 #[must_use]
122 pub fn storage(&self) -> Arc<LmdbStorage> {
123 Arc::clone(&self.storage)
124 }
125
126 #[must_use]
128 pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
129 Arc::clone(&self.payment_verifier)
130 }
131
132 pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
143 let message = ChunkMessage::decode(data)
144 .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
145
146 let request_id = message.request_id;
147
148 let response_body = match message.body {
149 ChunkMessageBody::PutRequest(req) => {
150 ChunkMessageBody::PutResponse(self.handle_put(req).await)
151 }
152 ChunkMessageBody::GetRequest(req) => {
153 ChunkMessageBody::GetResponse(self.handle_get(req).await)
154 }
155 ChunkMessageBody::QuoteRequest(ref req) => {
156 ChunkMessageBody::QuoteResponse(self.handle_quote(req))
157 }
158 ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
159 ChunkMessageBody::MerkleCandidateQuoteResponse(
160 self.handle_merkle_candidate_quote(req),
161 )
162 }
163 _ => return Ok(None),
175 };
176
177 let response = ChunkMessage {
178 request_id,
179 body: response_body,
180 };
181
182 response
183 .encode()
184 .map(|b| Some(Bytes::from(b)))
185 .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
186 }
187
188 async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
196 let start = std::time::Instant::now();
197 let addr_hex = hex::encode(request.address);
198 let chunk_size = request.content.len();
199 let response = self.handle_put_inner(request).await;
200 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
201 let outcome: &'static str = match &response {
202 ChunkPutResponse::Success { .. } => "success",
203 ChunkPutResponse::AlreadyExists { .. } => "already_exists",
204 ChunkPutResponse::PaymentRequired { .. } => "payment_required",
205 ChunkPutResponse::Error(_) => "error",
206 _ => "unknown",
207 };
208 info!(
209 target: "ant_node::storage::rpc_latency",
210 duration_ms,
211 chunk_size,
212 outcome,
213 addr = %addr_hex,
214 "put_rpc"
215 );
216 response
217 }
218
219 async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
221 let address = request.address;
222 let addr_hex = hex::encode(address);
223 debug!("Handling PUT request for {addr_hex}");
224
225 if request.content.len() > MAX_CHUNK_SIZE {
227 return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
228 size: request.content.len(),
229 max_size: MAX_CHUNK_SIZE,
230 });
231 }
232
233 let computed = compute_address(&request.content);
235 if computed != address {
236 return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
237 expected: address,
238 actual: computed,
239 });
240 }
241
242 match self.storage.exists(&address) {
244 Ok(true) => {
245 debug!("Chunk {addr_hex} already exists");
246 return ChunkPutResponse::AlreadyExists { address };
247 }
248 Err(e) => {
249 return ChunkPutResponse::Error(ProtocolError::Internal(format!(
250 "Storage read failed: {e}"
251 )));
252 }
253 Ok(false) => {}
254 }
255
256 if let Err(e) = self.storage.check_capacity() {
267 info!(
268 target: "ant_node::storage::disk_precheck",
269 addr = %addr_hex,
270 "Rejecting PUT before payment verification: {e}"
271 );
272 return ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()));
273 }
274
275 let payment_result = self
279 .payment_verifier
280 .verify_payment(
281 &address,
282 request.payment_proof.as_deref(),
283 VerificationContext::ClientPut,
284 )
285 .await;
286
287 match payment_result {
288 Ok(status) if status.can_store() => {
289 }
291 Ok(_) => {
292 return ChunkPutResponse::PaymentRequired {
293 message: "Payment required for new chunk".to_string(),
294 };
295 }
296 Err(e) => {
297 return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
298 }
299 }
300
301 match self.storage.put(&address, &request.content).await {
303 Ok(_) => {
304 let content_len = request.content.len();
305 info!("Stored chunk {addr_hex} ({content_len} bytes)");
306 self.quote_generator.record_store();
312
313 if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
318 let event = FreshWriteEvent {
324 key: address,
325 data: request.content.to_vec(),
326 payment_proof: proof,
327 };
328 if tx.send(event).is_err() {
329 debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
330 }
331 }
332
333 ChunkPutResponse::Success { address }
334 }
335 Err(e) => {
336 warn!("Failed to store chunk {addr_hex}: {e}");
337 ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
338 }
339 }
340 }
341
342 async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
347 let start = std::time::Instant::now();
348 let addr_hex = hex::encode(request.address);
349 let response = self.handle_get_inner(request).await;
350 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
351 let outcome: &'static str = match &response {
352 ChunkGetResponse::Success { .. } => "success",
353 ChunkGetResponse::NotFound { .. } => "not_found",
354 ChunkGetResponse::Error(_) => "error",
355 _ => "unknown",
356 };
357 info!(
358 target: "ant_node::storage::rpc_latency",
359 duration_ms,
360 outcome,
361 addr = %addr_hex,
362 "get_rpc"
363 );
364 response
365 }
366
367 async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
369 let address = request.address;
370 let addr_hex = hex::encode(address);
371 debug!("Handling GET request for {addr_hex}");
372
373 match self.storage.get(&address).await {
374 Ok(Some(content)) => {
375 let content_len = content.len();
376 debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
377 ChunkGetResponse::Success { address, content }
378 }
379 Ok(None) => {
380 debug!("Chunk {addr_hex} not found");
381 ChunkGetResponse::NotFound { address }
382 }
383 Err(e) => {
384 warn!("Failed to retrieve chunk {addr_hex}: {e}");
385 ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
386 }
387 }
388 }
389
390 fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
392 let addr_hex = hex::encode(request.address);
393 let data_size = request.data_size;
394 debug!("Handling quote request for {addr_hex} (size: {data_size})");
395
396 #[allow(clippy::manual_unwrap_or_default)]
402 let already_stored = match self.storage.exists(&request.address) {
403 Ok(exists) => exists,
404 Err(e) => {
405 warn!("Storage check failed for {addr_hex}: {e}");
406 false }
408 };
409
410 if already_stored {
411 debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
412 }
413
414 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
416 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
417 size: MAX_CHUNK_SIZE + 1,
418 max_size: MAX_CHUNK_SIZE,
419 });
420 };
421 if data_size_usize > MAX_CHUNK_SIZE {
422 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
423 size: data_size_usize,
424 max_size: MAX_CHUNK_SIZE,
425 });
426 }
427
428 match self
429 .quote_generator
430 .create_quote(request.address, data_size_usize, request.data_type)
431 {
432 Ok(quote) => {
433 match rmp_serde::to_vec("e) {
435 Ok(quote_bytes) => ChunkQuoteResponse::Success {
436 quote: quote_bytes,
437 already_stored,
438 },
439 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
440 "Failed to serialize quote: {e}"
441 ))),
442 }
443 }
444 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
445 }
446 }
447
448 fn handle_merkle_candidate_quote(
450 &self,
451 request: &MerkleCandidateQuoteRequest,
452 ) -> MerkleCandidateQuoteResponse {
453 let addr_hex = hex::encode(request.address);
454 let data_size = request.data_size;
455 debug!(
456 "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
457 request.merkle_payment_timestamp
458 );
459
460 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
461 return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
462 "data_size {} overflows usize",
463 request.data_size
464 )));
465 };
466 if data_size_usize > MAX_CHUNK_SIZE {
467 return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
468 size: data_size_usize,
469 max_size: MAX_CHUNK_SIZE,
470 });
471 }
472
473 match self.quote_generator.create_merkle_candidate_quote(
474 data_size_usize,
475 request.data_type,
476 request.merkle_payment_timestamp,
477 ) {
478 Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
479 Ok(bytes) => MerkleCandidateQuoteResponse::Success {
480 candidate_node: bytes,
481 },
482 Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
483 "Failed to serialize merkle candidate node: {e}"
484 ))),
485 },
486 Err(e) => {
487 MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
488 }
489 }
490 }
491
492 #[must_use]
494 pub fn storage_stats(&self) -> crate::storage::StorageStats {
495 self.storage.stats()
496 }
497
498 #[must_use]
500 pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
501 self.payment_verifier.cache_stats()
502 }
503
504 #[cfg(any(test, feature = "test-utils"))]
510 #[must_use]
511 pub fn payment_verifier(&self) -> &PaymentVerifier {
512 &self.payment_verifier
513 }
514
515 pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
521 self.storage.exists(address)
522 }
523
524 pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
530 self.storage.get(address).await
531 }
532
533 #[cfg(test)]
541 pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
542 self.storage.put(address, content).await
543 }
544}
545
546#[cfg(test)]
547#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
548mod tests {
549 use super::*;
550 use crate::payment::metrics::QuotingMetricsTracker;
551 use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
552 use crate::storage::LmdbStorageConfig;
553 use evmlib::RewardsAddress;
554 use saorsa_core::identity::NodeIdentity;
555 use saorsa_core::MlDsa65;
556 use saorsa_pqc::pqc::types::MlDsaSecretKey;
557 use tempfile::TempDir;
558
559 async fn create_test_protocol() -> (AntProtocol, TempDir) {
560 create_test_protocol_with_reserve(0).await
563 }
564
565 async fn create_test_protocol_with_reserve(disk_reserve: u64) -> (AntProtocol, TempDir) {
571 let temp_dir = TempDir::new().expect("create temp dir");
572
573 let storage_config = LmdbStorageConfig {
574 root_dir: temp_dir.path().to_path_buf(),
575 disk_reserve,
576 ..LmdbStorageConfig::test_default()
577 };
578 let storage = Arc::new(
579 LmdbStorage::new(storage_config)
580 .await
581 .expect("create storage"),
582 );
583
584 let rewards_address = RewardsAddress::new([1u8; 20]);
585 let payment_config = PaymentVerifierConfig {
586 evm: EvmVerifierConfig::default(),
587 cache_capacity: 100_000,
588 close_group_size: crate::ant_protocol::CLOSE_GROUP_SIZE,
589 local_rewards_address: rewards_address,
590 };
591 let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
592 let metrics_tracker = QuotingMetricsTracker::new(100);
593 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
594
595 let identity = NodeIdentity::generate().expect("generate identity");
597 let pub_key_bytes = identity.public_key().as_bytes().to_vec();
598 let sk_bytes = identity.secret_key_bytes().to_vec();
599 let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
600 quote_generator.set_signer(pub_key_bytes, move |msg| {
601 use saorsa_pqc::pqc::MlDsaOperations;
602 let ml_dsa = MlDsa65::new();
603 ml_dsa
604 .sign(&sk, msg)
605 .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
606 });
607
608 let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
609 (protocol, temp_dir)
610 }
611
612 #[tokio::test]
613 async fn test_put_and_get_chunk() {
614 let (protocol, _temp) = create_test_protocol().await;
615
616 let content = b"hello world";
617 let address = LmdbStorage::compute_address(content);
618
619 protocol.payment_verifier().cache_insert(address);
621
622 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
623 let put_msg = ChunkMessage {
624 request_id: 1,
625 body: ChunkMessageBody::PutRequest(put_request),
626 };
627 let put_bytes = put_msg.encode().expect("encode put");
628
629 let response_bytes = protocol
631 .try_handle_request(&put_bytes)
632 .await
633 .expect("handle put")
634 .expect("expected response");
635 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
636
637 assert_eq!(response.request_id, 1);
638 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
639 response.body
640 {
641 assert_eq!(addr, address);
642 } else {
643 panic!("expected PutResponse::Success, got: {response:?}");
644 }
645
646 let get_request = ChunkGetRequest::new(address);
648 let get_msg = ChunkMessage {
649 request_id: 2,
650 body: ChunkMessageBody::GetRequest(get_request),
651 };
652 let get_bytes = get_msg.encode().expect("encode get");
653
654 let response_bytes = protocol
656 .try_handle_request(&get_bytes)
657 .await
658 .expect("handle get")
659 .expect("expected response");
660 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
661
662 assert_eq!(response.request_id, 2);
663 if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
664 address: addr,
665 content: data,
666 }) = response.body
667 {
668 assert_eq!(addr, address);
669 assert_eq!(data, content.to_vec());
670 } else {
671 panic!("expected GetResponse::Success");
672 }
673 }
674
675 #[tokio::test]
676 async fn test_get_not_found() {
677 let (protocol, _temp) = create_test_protocol().await;
678
679 let address = [0xAB; 32];
680 let get_request = ChunkGetRequest::new(address);
681 let get_msg = ChunkMessage {
682 request_id: 10,
683 body: ChunkMessageBody::GetRequest(get_request),
684 };
685 let get_bytes = get_msg.encode().expect("encode get");
686
687 let response_bytes = protocol
688 .try_handle_request(&get_bytes)
689 .await
690 .expect("handle get")
691 .expect("expected response");
692 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
693
694 assert_eq!(response.request_id, 10);
695 if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
696 response.body
697 {
698 assert_eq!(addr, address);
699 } else {
700 panic!("expected GetResponse::NotFound");
701 }
702 }
703
704 #[tokio::test]
705 async fn test_put_address_mismatch() {
706 let (protocol, _temp) = create_test_protocol().await;
707
708 let content = b"test content";
709 let wrong_address = [0xFF; 32]; protocol.payment_verifier().cache_insert(wrong_address);
713
714 let put_request = ChunkPutRequest::new(wrong_address, Bytes::copy_from_slice(content));
715 let put_msg = ChunkMessage {
716 request_id: 20,
717 body: ChunkMessageBody::PutRequest(put_request),
718 };
719 let put_bytes = put_msg.encode().expect("encode put");
720
721 let response_bytes = protocol
722 .try_handle_request(&put_bytes)
723 .await
724 .expect("handle put")
725 .expect("expected response");
726 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
727
728 assert_eq!(response.request_id, 20);
729 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
730 ProtocolError::AddressMismatch { .. },
731 )) = response.body
732 {
733 } else {
735 panic!("expected AddressMismatch error, got: {response:?}");
736 }
737 }
738
739 #[tokio::test]
740 async fn test_put_chunk_too_large() {
741 let (protocol, _temp) = create_test_protocol().await;
742
743 let content = vec![0u8; MAX_CHUNK_SIZE + 1];
745 let address = LmdbStorage::compute_address(&content);
746
747 let put_request = ChunkPutRequest::new(address, Bytes::from(content));
748 let put_msg = ChunkMessage {
749 request_id: 30,
750 body: ChunkMessageBody::PutRequest(put_request),
751 };
752 let put_bytes = put_msg.encode().expect("encode put");
753
754 let response_bytes = protocol
755 .try_handle_request(&put_bytes)
756 .await
757 .expect("handle put")
758 .expect("expected response");
759 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
760
761 assert_eq!(response.request_id, 30);
762 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
763 ProtocolError::ChunkTooLarge { .. },
764 )) = response.body
765 {
766 } else {
768 panic!("expected ChunkTooLarge error");
769 }
770 }
771
772 #[tokio::test]
781 async fn test_put_rejected_on_insufficient_disk_before_verification() {
782 let (protocol, _temp) = create_test_protocol_with_reserve(u64::MAX).await;
785
786 let content = b"chunk for a disk-full node";
787 let address = LmdbStorage::compute_address(content);
788
789 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
790 let put_msg = ChunkMessage {
791 request_id: 41,
792 body: ChunkMessageBody::PutRequest(put_request),
793 };
794 let put_bytes = put_msg.encode().expect("encode put");
795
796 let response_bytes = protocol
797 .try_handle_request(&put_bytes)
798 .await
799 .expect("handle put")
800 .expect("expected response");
801 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
802
803 assert_eq!(response.request_id, 41);
804 match response.body {
805 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
806 ProtocolError::StorageFailed(msg),
807 )) => {
808 assert!(
809 msg.contains("Insufficient disk space"),
810 "expected disk-space error, got: {msg}"
811 );
812 }
813 other => {
814 panic!("expected StorageFailed disk error before verification, got: {other:?}")
815 }
816 }
817
818 assert!(!protocol.exists(&address).expect("exists check"));
820 }
821
822 #[tokio::test]
823 async fn test_put_already_exists() {
824 let (protocol, _temp) = create_test_protocol().await;
825
826 let content = b"duplicate content";
827 let address = LmdbStorage::compute_address(content);
828
829 protocol.payment_verifier().cache_insert(address);
831
832 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
833 let put_msg = ChunkMessage {
834 request_id: 40,
835 body: ChunkMessageBody::PutRequest(put_request),
836 };
837 let put_bytes = put_msg.encode().expect("encode put");
838
839 let _ = protocol
840 .try_handle_request(&put_bytes)
841 .await
842 .expect("handle put");
843
844 let response_bytes = protocol
846 .try_handle_request(&put_bytes)
847 .await
848 .expect("handle put 2")
849 .expect("expected response");
850 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
851
852 assert_eq!(response.request_id, 40);
853 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
854 response.body
855 {
856 assert_eq!(addr, address);
857 } else {
858 panic!("expected AlreadyExists");
859 }
860 }
861
862 #[tokio::test]
863 async fn test_protocol_id() {
864 let (protocol, _temp) = create_test_protocol().await;
865 assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
866 }
867
868 #[tokio::test]
869 async fn test_exists_and_local_access() {
870 let (protocol, _temp) = create_test_protocol().await;
871
872 let content = b"local access test";
873 let address = LmdbStorage::compute_address(content);
874
875 assert!(!protocol.exists(&address).expect("exists check"));
876
877 protocol
878 .put_local(&address, content)
879 .await
880 .expect("put local");
881
882 assert!(protocol.exists(&address).expect("exists check"));
883
884 let retrieved = protocol.get_local(&address).await.expect("get local");
885 assert_eq!(retrieved, Some(content.to_vec()));
886 }
887
888 #[tokio::test]
889 async fn test_cache_insert_is_visible() {
890 let (protocol, _temp) = create_test_protocol().await;
891
892 let content = b"cache test content";
893 let address = LmdbStorage::compute_address(content);
894
895 let stats_before = protocol.payment_cache_stats();
897 assert_eq!(stats_before.additions, 0);
898
899 protocol.payment_verifier().cache_insert(address);
901
902 let stats_after = protocol.payment_cache_stats();
904 assert_eq!(stats_after.additions, 1);
905
906 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
908 let put_msg = ChunkMessage {
909 request_id: 100,
910 body: ChunkMessageBody::PutRequest(put_request),
911 };
912 let put_bytes = put_msg.encode().expect("encode put");
913 let response_bytes = protocol
914 .try_handle_request(&put_bytes)
915 .await
916 .expect("handle put")
917 .expect("expected response");
918 let response = ChunkMessage::decode(&response_bytes).expect("decode");
919
920 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
921 } else {
923 panic!("expected success, got: {response:?}");
924 }
925 }
926
927 #[tokio::test]
928 async fn test_put_same_chunk_twice_hits_cache() {
929 let (protocol, _temp) = create_test_protocol().await;
930
931 let content = b"duplicate cache test";
932 let address = LmdbStorage::compute_address(content);
933
934 protocol.payment_verifier().cache_insert(address);
936
937 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
939 let put_msg = ChunkMessage {
940 request_id: 110,
941 body: ChunkMessageBody::PutRequest(put_request),
942 };
943 let put_bytes = put_msg.encode().expect("encode put");
944 let _ = protocol
945 .try_handle_request(&put_bytes)
946 .await
947 .expect("handle put 1");
948
949 let response_bytes = protocol
951 .try_handle_request(&put_bytes)
952 .await
953 .expect("handle put 2")
954 .expect("expected response");
955 let response = ChunkMessage::decode(&response_bytes).expect("decode");
956
957 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
958 {
959 } else {
961 panic!("expected AlreadyExists, got: {response:?}");
962 }
963 }
964
965 #[tokio::test]
966 async fn test_payment_cache_stats_returns_correct_values() {
967 let (protocol, _temp) = create_test_protocol().await;
968
969 let stats = protocol.payment_cache_stats();
970 assert_eq!(stats.hits, 0);
971 assert_eq!(stats.misses, 0);
972 assert_eq!(stats.additions, 0);
973
974 let content = b"stats test";
976 let address = LmdbStorage::compute_address(content);
977 protocol.payment_verifier().cache_insert(address);
978
979 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
980 let put_msg = ChunkMessage {
981 request_id: 120,
982 body: ChunkMessageBody::PutRequest(put_request),
983 };
984 let put_bytes = put_msg.encode().expect("encode put");
985 let _ = protocol
986 .try_handle_request(&put_bytes)
987 .await
988 .expect("handle put");
989
990 let stats = protocol.payment_cache_stats();
991 assert_eq!(stats.additions, 1);
993 assert_eq!(stats.hits, 1);
994 }
995
996 #[tokio::test]
997 async fn test_storage_stats() {
998 let (protocol, _temp) = create_test_protocol().await;
999 let stats = protocol.storage_stats();
1000 assert_eq!(stats.chunks_stored, 0);
1001 }
1002
1003 #[tokio::test]
1004 async fn test_merkle_candidate_quote_request() {
1005 use ant_protocol::payment::verify::verify_merkle_candidate_signature;
1006 use evmlib::merkle_payments::MerklePaymentCandidateNode;
1007
1008 let (protocol, _temp) = create_test_protocol().await;
1010
1011 let address = [0x77; 32];
1012 let timestamp = std::time::SystemTime::now()
1013 .duration_since(std::time::UNIX_EPOCH)
1014 .expect("system time")
1015 .as_secs();
1016
1017 let request = MerkleCandidateQuoteRequest {
1018 address,
1019 data_type: DATA_TYPE_CHUNK,
1020 data_size: 4096,
1021 merkle_payment_timestamp: timestamp,
1022 };
1023 let msg = ChunkMessage {
1024 request_id: 600,
1025 body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
1026 };
1027 let msg_bytes = msg.encode().expect("encode request");
1028
1029 let response_bytes = protocol
1030 .try_handle_request(&msg_bytes)
1031 .await
1032 .expect("handle merkle candidate quote")
1033 .expect("expected response");
1034 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
1035
1036 assert_eq!(response.request_id, 600);
1037 match response.body {
1038 ChunkMessageBody::MerkleCandidateQuoteResponse(
1039 MerkleCandidateQuoteResponse::Success { candidate_node },
1040 ) => {
1041 let candidate: MerklePaymentCandidateNode =
1042 rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
1043
1044 assert!(
1046 verify_merkle_candidate_signature(&candidate),
1047 "ML-DSA-65 candidate signature must be valid"
1048 );
1049
1050 assert_eq!(candidate.merkle_payment_timestamp, timestamp);
1051 assert!(candidate.price >= evmlib::common::Amount::ZERO);
1053 }
1054 other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
1055 }
1056 }
1057
1058 #[tokio::test]
1059 async fn test_handle_unexpected_response_message() {
1060 let (protocol, _temp) = create_test_protocol().await;
1061
1062 let msg = ChunkMessage {
1064 request_id: 200,
1065 body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
1066 };
1067 let msg_bytes = msg.encode().expect("encode");
1068
1069 let result = protocol
1070 .try_handle_request(&msg_bytes)
1071 .await
1072 .expect("handle msg");
1073
1074 assert!(
1075 result.is_none(),
1076 "expected None for response message, got: {result:?}"
1077 );
1078 }
1079
1080 #[tokio::test]
1081 async fn test_quote_already_stored_flag() {
1082 let (protocol, _temp) = create_test_protocol().await;
1083
1084 let content = b"already stored quote test";
1085 let address = LmdbStorage::compute_address(content);
1086
1087 protocol.payment_verifier().cache_insert(address);
1089 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
1090 let put_msg = ChunkMessage {
1091 request_id: 300,
1092 body: ChunkMessageBody::PutRequest(put_request),
1093 };
1094 let put_bytes = put_msg.encode().expect("encode put");
1095 let _ = protocol
1096 .try_handle_request(&put_bytes)
1097 .await
1098 .expect("handle put");
1099
1100 let quote_request = ChunkQuoteRequest {
1102 address,
1103 data_size: content.len() as u64,
1104 data_type: DATA_TYPE_CHUNK,
1105 };
1106 let quote_msg = ChunkMessage {
1107 request_id: 301,
1108 body: ChunkMessageBody::QuoteRequest(quote_request),
1109 };
1110 let quote_bytes = quote_msg.encode().expect("encode quote");
1111 let response_bytes = protocol
1112 .try_handle_request("e_bytes)
1113 .await
1114 .expect("handle quote")
1115 .expect("expected response");
1116 let response = ChunkMessage::decode(&response_bytes).expect("decode");
1117
1118 match response.body {
1119 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1120 already_stored, ..
1121 }) => {
1122 assert!(
1123 already_stored,
1124 "already_stored should be true for existing chunk"
1125 );
1126 }
1127 other => panic!("expected Success with already_stored, got: {other:?}"),
1128 }
1129
1130 let new_address = [0xFFu8; 32];
1132 let quote_request2 = ChunkQuoteRequest {
1133 address: new_address,
1134 data_size: 100,
1135 data_type: DATA_TYPE_CHUNK,
1136 };
1137 let quote_msg2 = ChunkMessage {
1138 request_id: 302,
1139 body: ChunkMessageBody::QuoteRequest(quote_request2),
1140 };
1141 let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
1142 let response_bytes2 = protocol
1143 .try_handle_request("e_bytes2)
1144 .await
1145 .expect("handle quote2")
1146 .expect("expected response");
1147 let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
1148
1149 match response2.body {
1150 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1151 already_stored, ..
1152 }) => {
1153 assert!(
1154 !already_stored,
1155 "already_stored should be false for new chunk"
1156 );
1157 }
1158 other => panic!("expected Success with already_stored=false, got: {other:?}"),
1159 }
1160 }
1161}