1#[cfg(test)]
31use crate::ant_protocol::DATA_TYPE_CHUNK;
32use crate::ant_protocol::{
33 ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
34 ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
35 MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE,
36};
37use crate::client::compute_address;
38use crate::error::{Error, Result};
39use crate::logging::{debug, info, warn};
40use crate::payment::{PaymentVerifier, QuoteGenerator};
41use crate::replication::fresh::FreshWriteEvent;
42use crate::storage::lmdb::LmdbStorage;
43use bytes::Bytes;
44use std::sync::Arc;
45use tokio::sync::mpsc;
46
47pub struct AntProtocol {
52 storage: Arc<LmdbStorage>,
54 payment_verifier: Arc<PaymentVerifier>,
56 quote_generator: Arc<QuoteGenerator>,
59 fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
61}
62
63impl AntProtocol {
64 #[must_use]
72 pub fn new(
73 storage: Arc<LmdbStorage>,
74 payment_verifier: Arc<PaymentVerifier>,
75 quote_generator: Arc<QuoteGenerator>,
76 ) -> Self {
77 payment_verifier.attach_storage(Arc::clone(&storage));
86 quote_generator.attach_storage(Arc::clone(&storage));
87
88 Self {
89 storage,
90 payment_verifier,
91 quote_generator,
92 fresh_write_tx: None,
93 }
94 }
95
96 pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
101 self.fresh_write_tx = Some(tx);
102 }
103
104 #[must_use]
106 pub fn protocol_id(&self) -> &'static str {
107 CHUNK_PROTOCOL_ID
108 }
109
110 #[must_use]
112 pub fn storage(&self) -> Arc<LmdbStorage> {
113 Arc::clone(&self.storage)
114 }
115
116 #[must_use]
118 pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
119 Arc::clone(&self.payment_verifier)
120 }
121
122 pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
133 let message = ChunkMessage::decode(data)
134 .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
135
136 let request_id = message.request_id;
137
138 let response_body = match message.body {
139 ChunkMessageBody::PutRequest(req) => {
140 ChunkMessageBody::PutResponse(self.handle_put(req).await)
141 }
142 ChunkMessageBody::GetRequest(req) => {
143 ChunkMessageBody::GetResponse(self.handle_get(req).await)
144 }
145 ChunkMessageBody::QuoteRequest(ref req) => {
146 ChunkMessageBody::QuoteResponse(self.handle_quote(req))
147 }
148 ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
149 ChunkMessageBody::MerkleCandidateQuoteResponse(
150 self.handle_merkle_candidate_quote(req),
151 )
152 }
153 _ => return Ok(None),
165 };
166
167 let response = ChunkMessage {
168 request_id,
169 body: response_body,
170 };
171
172 response
173 .encode()
174 .map(|b| Some(Bytes::from(b)))
175 .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
176 }
177
178 async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
186 let start = std::time::Instant::now();
187 let addr_hex = hex::encode(request.address);
188 let chunk_size = request.content.len();
189 let response = self.handle_put_inner(request).await;
190 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
191 let outcome: &'static str = match &response {
192 ChunkPutResponse::Success { .. } => "success",
193 ChunkPutResponse::AlreadyExists { .. } => "already_exists",
194 ChunkPutResponse::PaymentRequired { .. } => "payment_required",
195 ChunkPutResponse::Error(_) => "error",
196 _ => "unknown",
197 };
198 info!(
199 target: "ant_node::storage::rpc_latency",
200 duration_ms,
201 chunk_size,
202 outcome,
203 addr = %addr_hex,
204 "put_rpc"
205 );
206 response
207 }
208
209 async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
211 let address = request.address;
212 let addr_hex = hex::encode(address);
213 debug!("Handling PUT request for {addr_hex}");
214
215 if request.content.len() > MAX_CHUNK_SIZE {
217 return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
218 size: request.content.len(),
219 max_size: MAX_CHUNK_SIZE,
220 });
221 }
222
223 let computed = compute_address(&request.content);
225 if computed != address {
226 return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
227 expected: address,
228 actual: computed,
229 });
230 }
231
232 match self.storage.exists(&address) {
234 Ok(true) => {
235 debug!("Chunk {addr_hex} already exists");
236 return ChunkPutResponse::AlreadyExists { address };
237 }
238 Err(e) => {
239 return ChunkPutResponse::Error(ProtocolError::Internal(format!(
240 "Storage read failed: {e}"
241 )));
242 }
243 Ok(false) => {}
244 }
245
246 let payment_result = self
248 .payment_verifier
249 .verify_payment(&address, request.payment_proof.as_deref())
250 .await;
251
252 match payment_result {
253 Ok(status) if status.can_store() => {
254 }
256 Ok(_) => {
257 return ChunkPutResponse::PaymentRequired {
258 message: "Payment required for new chunk".to_string(),
259 };
260 }
261 Err(e) => {
262 return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
263 }
264 }
265
266 match self.storage.put(&address, &request.content).await {
268 Ok(_) => {
269 let content_len = request.content.len();
270 info!("Stored chunk {addr_hex} ({content_len} bytes)");
271 self.quote_generator.record_store();
277
278 if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
283 let event = FreshWriteEvent {
289 key: address,
290 data: request.content.to_vec(),
291 payment_proof: proof,
292 };
293 if tx.send(event).is_err() {
294 debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
295 }
296 }
297
298 ChunkPutResponse::Success { address }
299 }
300 Err(e) => {
301 warn!("Failed to store chunk {addr_hex}: {e}");
302 ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
303 }
304 }
305 }
306
307 async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
312 let start = std::time::Instant::now();
313 let addr_hex = hex::encode(request.address);
314 let response = self.handle_get_inner(request).await;
315 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
316 let outcome: &'static str = match &response {
317 ChunkGetResponse::Success { .. } => "success",
318 ChunkGetResponse::NotFound { .. } => "not_found",
319 ChunkGetResponse::Error(_) => "error",
320 _ => "unknown",
321 };
322 info!(
323 target: "ant_node::storage::rpc_latency",
324 duration_ms,
325 outcome,
326 addr = %addr_hex,
327 "get_rpc"
328 );
329 response
330 }
331
332 async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
334 let address = request.address;
335 let addr_hex = hex::encode(address);
336 debug!("Handling GET request for {addr_hex}");
337
338 match self.storage.get(&address).await {
339 Ok(Some(content)) => {
340 let content_len = content.len();
341 debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
342 ChunkGetResponse::Success { address, content }
343 }
344 Ok(None) => {
345 debug!("Chunk {addr_hex} not found");
346 ChunkGetResponse::NotFound { address }
347 }
348 Err(e) => {
349 warn!("Failed to retrieve chunk {addr_hex}: {e}");
350 ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
351 }
352 }
353 }
354
355 fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
357 let addr_hex = hex::encode(request.address);
358 let data_size = request.data_size;
359 debug!("Handling quote request for {addr_hex} (size: {data_size})");
360
361 #[allow(clippy::manual_unwrap_or_default)]
367 let already_stored = match self.storage.exists(&request.address) {
368 Ok(exists) => exists,
369 Err(e) => {
370 warn!("Storage check failed for {addr_hex}: {e}");
371 false }
373 };
374
375 if already_stored {
376 debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
377 }
378
379 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
381 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
382 size: MAX_CHUNK_SIZE + 1,
383 max_size: MAX_CHUNK_SIZE,
384 });
385 };
386 if data_size_usize > MAX_CHUNK_SIZE {
387 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
388 size: data_size_usize,
389 max_size: MAX_CHUNK_SIZE,
390 });
391 }
392
393 match self
394 .quote_generator
395 .create_quote(request.address, data_size_usize, request.data_type)
396 {
397 Ok(quote) => {
398 match rmp_serde::to_vec("e) {
400 Ok(quote_bytes) => ChunkQuoteResponse::Success {
401 quote: quote_bytes,
402 already_stored,
403 },
404 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
405 "Failed to serialize quote: {e}"
406 ))),
407 }
408 }
409 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
410 }
411 }
412
413 fn handle_merkle_candidate_quote(
415 &self,
416 request: &MerkleCandidateQuoteRequest,
417 ) -> MerkleCandidateQuoteResponse {
418 let addr_hex = hex::encode(request.address);
419 let data_size = request.data_size;
420 debug!(
421 "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
422 request.merkle_payment_timestamp
423 );
424
425 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
426 return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
427 "data_size {} overflows usize",
428 request.data_size
429 )));
430 };
431 if data_size_usize > MAX_CHUNK_SIZE {
432 return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
433 size: data_size_usize,
434 max_size: MAX_CHUNK_SIZE,
435 });
436 }
437
438 match self.quote_generator.create_merkle_candidate_quote(
439 data_size_usize,
440 request.data_type,
441 request.merkle_payment_timestamp,
442 ) {
443 Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
444 Ok(bytes) => MerkleCandidateQuoteResponse::Success {
445 candidate_node: bytes,
446 },
447 Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
448 "Failed to serialize merkle candidate node: {e}"
449 ))),
450 },
451 Err(e) => {
452 MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
453 }
454 }
455 }
456
457 #[must_use]
459 pub fn storage_stats(&self) -> crate::storage::StorageStats {
460 self.storage.stats()
461 }
462
463 #[must_use]
465 pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
466 self.payment_verifier.cache_stats()
467 }
468
469 #[cfg(any(test, feature = "test-utils"))]
475 #[must_use]
476 pub fn payment_verifier(&self) -> &PaymentVerifier {
477 &self.payment_verifier
478 }
479
480 pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
486 self.storage.exists(address)
487 }
488
489 pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
495 self.storage.get(address).await
496 }
497
498 #[cfg(test)]
506 pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
507 self.storage.put(address, content).await
508 }
509}
510
511#[cfg(test)]
512#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
513mod tests {
514 use super::*;
515 use crate::payment::metrics::QuotingMetricsTracker;
516 use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
517 use crate::storage::LmdbStorageConfig;
518 use evmlib::RewardsAddress;
519 use saorsa_core::identity::NodeIdentity;
520 use saorsa_core::MlDsa65;
521 use saorsa_pqc::pqc::types::MlDsaSecretKey;
522 use tempfile::TempDir;
523
524 async fn create_test_protocol() -> (AntProtocol, TempDir) {
525 let temp_dir = TempDir::new().expect("create temp dir");
526
527 let storage_config = LmdbStorageConfig {
528 root_dir: temp_dir.path().to_path_buf(),
529 ..LmdbStorageConfig::test_default()
530 };
531 let storage = Arc::new(
532 LmdbStorage::new(storage_config)
533 .await
534 .expect("create storage"),
535 );
536
537 let rewards_address = RewardsAddress::new([1u8; 20]);
538 let payment_config = PaymentVerifierConfig {
539 evm: EvmVerifierConfig::default(),
540 cache_capacity: 100_000,
541 local_rewards_address: rewards_address,
542 };
543 let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
544 let metrics_tracker = QuotingMetricsTracker::new(100);
545 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
546
547 let identity = NodeIdentity::generate().expect("generate identity");
549 let pub_key_bytes = identity.public_key().as_bytes().to_vec();
550 let sk_bytes = identity.secret_key_bytes().to_vec();
551 let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
552 quote_generator.set_signer(pub_key_bytes, move |msg| {
553 use saorsa_pqc::pqc::MlDsaOperations;
554 let ml_dsa = MlDsa65::new();
555 ml_dsa
556 .sign(&sk, msg)
557 .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
558 });
559
560 let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
561 (protocol, temp_dir)
562 }
563
564 #[tokio::test]
565 async fn test_put_and_get_chunk() {
566 let (protocol, _temp) = create_test_protocol().await;
567
568 let content = b"hello world";
569 let address = LmdbStorage::compute_address(content);
570
571 protocol.payment_verifier().cache_insert(address);
573
574 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
575 let put_msg = ChunkMessage {
576 request_id: 1,
577 body: ChunkMessageBody::PutRequest(put_request),
578 };
579 let put_bytes = put_msg.encode().expect("encode put");
580
581 let response_bytes = protocol
583 .try_handle_request(&put_bytes)
584 .await
585 .expect("handle put")
586 .expect("expected response");
587 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
588
589 assert_eq!(response.request_id, 1);
590 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
591 response.body
592 {
593 assert_eq!(addr, address);
594 } else {
595 panic!("expected PutResponse::Success, got: {response:?}");
596 }
597
598 let get_request = ChunkGetRequest::new(address);
600 let get_msg = ChunkMessage {
601 request_id: 2,
602 body: ChunkMessageBody::GetRequest(get_request),
603 };
604 let get_bytes = get_msg.encode().expect("encode get");
605
606 let response_bytes = protocol
608 .try_handle_request(&get_bytes)
609 .await
610 .expect("handle get")
611 .expect("expected response");
612 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
613
614 assert_eq!(response.request_id, 2);
615 if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
616 address: addr,
617 content: data,
618 }) = response.body
619 {
620 assert_eq!(addr, address);
621 assert_eq!(data, content.to_vec());
622 } else {
623 panic!("expected GetResponse::Success");
624 }
625 }
626
627 #[tokio::test]
628 async fn test_get_not_found() {
629 let (protocol, _temp) = create_test_protocol().await;
630
631 let address = [0xAB; 32];
632 let get_request = ChunkGetRequest::new(address);
633 let get_msg = ChunkMessage {
634 request_id: 10,
635 body: ChunkMessageBody::GetRequest(get_request),
636 };
637 let get_bytes = get_msg.encode().expect("encode get");
638
639 let response_bytes = protocol
640 .try_handle_request(&get_bytes)
641 .await
642 .expect("handle get")
643 .expect("expected response");
644 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
645
646 assert_eq!(response.request_id, 10);
647 if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
648 response.body
649 {
650 assert_eq!(addr, address);
651 } else {
652 panic!("expected GetResponse::NotFound");
653 }
654 }
655
656 #[tokio::test]
657 async fn test_put_address_mismatch() {
658 let (protocol, _temp) = create_test_protocol().await;
659
660 let content = b"test content";
661 let wrong_address = [0xFF; 32]; protocol.payment_verifier().cache_insert(wrong_address);
665
666 let put_request = ChunkPutRequest::new(wrong_address, Bytes::copy_from_slice(content));
667 let put_msg = ChunkMessage {
668 request_id: 20,
669 body: ChunkMessageBody::PutRequest(put_request),
670 };
671 let put_bytes = put_msg.encode().expect("encode put");
672
673 let response_bytes = protocol
674 .try_handle_request(&put_bytes)
675 .await
676 .expect("handle put")
677 .expect("expected response");
678 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
679
680 assert_eq!(response.request_id, 20);
681 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
682 ProtocolError::AddressMismatch { .. },
683 )) = response.body
684 {
685 } else {
687 panic!("expected AddressMismatch error, got: {response:?}");
688 }
689 }
690
691 #[tokio::test]
692 async fn test_put_chunk_too_large() {
693 let (protocol, _temp) = create_test_protocol().await;
694
695 let content = vec![0u8; MAX_CHUNK_SIZE + 1];
697 let address = LmdbStorage::compute_address(&content);
698
699 let put_request = ChunkPutRequest::new(address, Bytes::from(content));
700 let put_msg = ChunkMessage {
701 request_id: 30,
702 body: ChunkMessageBody::PutRequest(put_request),
703 };
704 let put_bytes = put_msg.encode().expect("encode put");
705
706 let response_bytes = protocol
707 .try_handle_request(&put_bytes)
708 .await
709 .expect("handle put")
710 .expect("expected response");
711 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
712
713 assert_eq!(response.request_id, 30);
714 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
715 ProtocolError::ChunkTooLarge { .. },
716 )) = response.body
717 {
718 } else {
720 panic!("expected ChunkTooLarge error");
721 }
722 }
723
724 #[tokio::test]
725 async fn test_put_already_exists() {
726 let (protocol, _temp) = create_test_protocol().await;
727
728 let content = b"duplicate content";
729 let address = LmdbStorage::compute_address(content);
730
731 protocol.payment_verifier().cache_insert(address);
733
734 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
735 let put_msg = ChunkMessage {
736 request_id: 40,
737 body: ChunkMessageBody::PutRequest(put_request),
738 };
739 let put_bytes = put_msg.encode().expect("encode put");
740
741 let _ = protocol
742 .try_handle_request(&put_bytes)
743 .await
744 .expect("handle put");
745
746 let response_bytes = protocol
748 .try_handle_request(&put_bytes)
749 .await
750 .expect("handle put 2")
751 .expect("expected response");
752 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
753
754 assert_eq!(response.request_id, 40);
755 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
756 response.body
757 {
758 assert_eq!(addr, address);
759 } else {
760 panic!("expected AlreadyExists");
761 }
762 }
763
764 #[tokio::test]
765 async fn test_protocol_id() {
766 let (protocol, _temp) = create_test_protocol().await;
767 assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
768 }
769
770 #[tokio::test]
771 async fn test_exists_and_local_access() {
772 let (protocol, _temp) = create_test_protocol().await;
773
774 let content = b"local access test";
775 let address = LmdbStorage::compute_address(content);
776
777 assert!(!protocol.exists(&address).expect("exists check"));
778
779 protocol
780 .put_local(&address, content)
781 .await
782 .expect("put local");
783
784 assert!(protocol.exists(&address).expect("exists check"));
785
786 let retrieved = protocol.get_local(&address).await.expect("get local");
787 assert_eq!(retrieved, Some(content.to_vec()));
788 }
789
790 #[tokio::test]
791 async fn test_cache_insert_is_visible() {
792 let (protocol, _temp) = create_test_protocol().await;
793
794 let content = b"cache test content";
795 let address = LmdbStorage::compute_address(content);
796
797 let stats_before = protocol.payment_cache_stats();
799 assert_eq!(stats_before.additions, 0);
800
801 protocol.payment_verifier().cache_insert(address);
803
804 let stats_after = protocol.payment_cache_stats();
806 assert_eq!(stats_after.additions, 1);
807
808 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
810 let put_msg = ChunkMessage {
811 request_id: 100,
812 body: ChunkMessageBody::PutRequest(put_request),
813 };
814 let put_bytes = put_msg.encode().expect("encode put");
815 let response_bytes = protocol
816 .try_handle_request(&put_bytes)
817 .await
818 .expect("handle put")
819 .expect("expected response");
820 let response = ChunkMessage::decode(&response_bytes).expect("decode");
821
822 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
823 } else {
825 panic!("expected success, got: {response:?}");
826 }
827 }
828
829 #[tokio::test]
830 async fn test_put_same_chunk_twice_hits_cache() {
831 let (protocol, _temp) = create_test_protocol().await;
832
833 let content = b"duplicate cache test";
834 let address = LmdbStorage::compute_address(content);
835
836 protocol.payment_verifier().cache_insert(address);
838
839 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
841 let put_msg = ChunkMessage {
842 request_id: 110,
843 body: ChunkMessageBody::PutRequest(put_request),
844 };
845 let put_bytes = put_msg.encode().expect("encode put");
846 let _ = protocol
847 .try_handle_request(&put_bytes)
848 .await
849 .expect("handle put 1");
850
851 let response_bytes = protocol
853 .try_handle_request(&put_bytes)
854 .await
855 .expect("handle put 2")
856 .expect("expected response");
857 let response = ChunkMessage::decode(&response_bytes).expect("decode");
858
859 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
860 {
861 } else {
863 panic!("expected AlreadyExists, got: {response:?}");
864 }
865 }
866
867 #[tokio::test]
868 async fn test_payment_cache_stats_returns_correct_values() {
869 let (protocol, _temp) = create_test_protocol().await;
870
871 let stats = protocol.payment_cache_stats();
872 assert_eq!(stats.hits, 0);
873 assert_eq!(stats.misses, 0);
874 assert_eq!(stats.additions, 0);
875
876 let content = b"stats test";
878 let address = LmdbStorage::compute_address(content);
879 protocol.payment_verifier().cache_insert(address);
880
881 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
882 let put_msg = ChunkMessage {
883 request_id: 120,
884 body: ChunkMessageBody::PutRequest(put_request),
885 };
886 let put_bytes = put_msg.encode().expect("encode put");
887 let _ = protocol
888 .try_handle_request(&put_bytes)
889 .await
890 .expect("handle put");
891
892 let stats = protocol.payment_cache_stats();
893 assert_eq!(stats.additions, 1);
895 assert_eq!(stats.hits, 1);
896 }
897
898 #[tokio::test]
899 async fn test_storage_stats() {
900 let (protocol, _temp) = create_test_protocol().await;
901 let stats = protocol.storage_stats();
902 assert_eq!(stats.chunks_stored, 0);
903 }
904
905 #[tokio::test]
906 async fn test_merkle_candidate_quote_request() {
907 use ant_protocol::payment::verify::verify_merkle_candidate_signature;
908 use evmlib::merkle_payments::MerklePaymentCandidateNode;
909
910 let (protocol, _temp) = create_test_protocol().await;
912
913 let address = [0x77; 32];
914 let timestamp = std::time::SystemTime::now()
915 .duration_since(std::time::UNIX_EPOCH)
916 .expect("system time")
917 .as_secs();
918
919 let request = MerkleCandidateQuoteRequest {
920 address,
921 data_type: DATA_TYPE_CHUNK,
922 data_size: 4096,
923 merkle_payment_timestamp: timestamp,
924 };
925 let msg = ChunkMessage {
926 request_id: 600,
927 body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
928 };
929 let msg_bytes = msg.encode().expect("encode request");
930
931 let response_bytes = protocol
932 .try_handle_request(&msg_bytes)
933 .await
934 .expect("handle merkle candidate quote")
935 .expect("expected response");
936 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
937
938 assert_eq!(response.request_id, 600);
939 match response.body {
940 ChunkMessageBody::MerkleCandidateQuoteResponse(
941 MerkleCandidateQuoteResponse::Success { candidate_node },
942 ) => {
943 let candidate: MerklePaymentCandidateNode =
944 rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
945
946 assert!(
948 verify_merkle_candidate_signature(&candidate),
949 "ML-DSA-65 candidate signature must be valid"
950 );
951
952 assert_eq!(candidate.merkle_payment_timestamp, timestamp);
953 assert!(candidate.price >= evmlib::common::Amount::ZERO);
955 }
956 other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
957 }
958 }
959
960 #[tokio::test]
961 async fn test_handle_unexpected_response_message() {
962 let (protocol, _temp) = create_test_protocol().await;
963
964 let msg = ChunkMessage {
966 request_id: 200,
967 body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
968 };
969 let msg_bytes = msg.encode().expect("encode");
970
971 let result = protocol
972 .try_handle_request(&msg_bytes)
973 .await
974 .expect("handle msg");
975
976 assert!(
977 result.is_none(),
978 "expected None for response message, got: {result:?}"
979 );
980 }
981
982 #[tokio::test]
983 async fn test_quote_already_stored_flag() {
984 let (protocol, _temp) = create_test_protocol().await;
985
986 let content = b"already stored quote test";
987 let address = LmdbStorage::compute_address(content);
988
989 protocol.payment_verifier().cache_insert(address);
991 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
992 let put_msg = ChunkMessage {
993 request_id: 300,
994 body: ChunkMessageBody::PutRequest(put_request),
995 };
996 let put_bytes = put_msg.encode().expect("encode put");
997 let _ = protocol
998 .try_handle_request(&put_bytes)
999 .await
1000 .expect("handle put");
1001
1002 let quote_request = ChunkQuoteRequest {
1004 address,
1005 data_size: content.len() as u64,
1006 data_type: DATA_TYPE_CHUNK,
1007 };
1008 let quote_msg = ChunkMessage {
1009 request_id: 301,
1010 body: ChunkMessageBody::QuoteRequest(quote_request),
1011 };
1012 let quote_bytes = quote_msg.encode().expect("encode quote");
1013 let response_bytes = protocol
1014 .try_handle_request("e_bytes)
1015 .await
1016 .expect("handle quote")
1017 .expect("expected response");
1018 let response = ChunkMessage::decode(&response_bytes).expect("decode");
1019
1020 match response.body {
1021 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1022 already_stored, ..
1023 }) => {
1024 assert!(
1025 already_stored,
1026 "already_stored should be true for existing chunk"
1027 );
1028 }
1029 other => panic!("expected Success with already_stored, got: {other:?}"),
1030 }
1031
1032 let new_address = [0xFFu8; 32];
1034 let quote_request2 = ChunkQuoteRequest {
1035 address: new_address,
1036 data_size: 100,
1037 data_type: DATA_TYPE_CHUNK,
1038 };
1039 let quote_msg2 = ChunkMessage {
1040 request_id: 302,
1041 body: ChunkMessageBody::QuoteRequest(quote_request2),
1042 };
1043 let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
1044 let response_bytes2 = protocol
1045 .try_handle_request("e_bytes2)
1046 .await
1047 .expect("handle quote2")
1048 .expect("expected response");
1049 let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
1050
1051 match response2.body {
1052 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1053 already_stored, ..
1054 }) => {
1055 assert!(
1056 !already_stored,
1057 "already_stored should be false for new chunk"
1058 );
1059 }
1060 other => panic!("expected Success with already_stored=false, got: {other:?}"),
1061 }
1062 }
1063}