1#[cfg(test)]
31use crate::ant_protocol::DATA_TYPE_CHUNK;
32use crate::ant_protocol::{
33 ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
34 ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
35 MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE,
36};
37use crate::client::compute_address;
38use crate::error::{Error, Result};
39use crate::logging::{debug, info, warn};
40use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext};
41use crate::replication::fresh::FreshWriteEvent;
42use crate::storage::lmdb::LmdbStorage;
43use bytes::Bytes;
44use saorsa_core::P2PNode;
45use std::sync::Arc;
46use tokio::sync::mpsc;
47
48pub struct AntProtocol {
53 storage: Arc<LmdbStorage>,
55 payment_verifier: Arc<PaymentVerifier>,
57 quote_generator: Arc<QuoteGenerator>,
60 fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
62}
63
64impl AntProtocol {
65 #[must_use]
73 pub fn new(
74 storage: Arc<LmdbStorage>,
75 payment_verifier: Arc<PaymentVerifier>,
76 quote_generator: Arc<QuoteGenerator>,
77 ) -> Self {
78 payment_verifier.attach_storage(Arc::clone(&storage));
86 quote_generator.attach_storage(Arc::clone(&storage));
87
88 Self {
89 storage,
90 payment_verifier,
91 quote_generator,
92 fresh_write_tx: None,
93 }
94 }
95
96 pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
102 self.payment_verifier.attach_p2p_node(node);
103 debug!("AntProtocol: P2PNode attached for payment live-DHT checks");
104 }
105
106 pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
111 self.fresh_write_tx = Some(tx);
112 }
113
114 #[must_use]
116 pub fn protocol_id(&self) -> &'static str {
117 CHUNK_PROTOCOL_ID
118 }
119
120 #[must_use]
122 pub fn storage(&self) -> Arc<LmdbStorage> {
123 Arc::clone(&self.storage)
124 }
125
/// Test-only accessor for the quote generator's stored-record counter.
#[cfg(test)]
#[must_use]
pub(crate) fn priced_records_stored(&self) -> usize {
    let generator = &self.quote_generator;
    generator.records_stored()
}
133
134 #[must_use]
136 pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
137 Arc::clone(&self.payment_verifier)
138 }
139
140 pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
151 let message = ChunkMessage::decode(data)
152 .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
153
154 let request_id = message.request_id;
155
156 let response_body = match message.body {
157 ChunkMessageBody::PutRequest(req) => {
158 ChunkMessageBody::PutResponse(self.handle_put(req).await)
159 }
160 ChunkMessageBody::GetRequest(req) => {
161 ChunkMessageBody::GetResponse(self.handle_get(req).await)
162 }
163 ChunkMessageBody::QuoteRequest(ref req) => {
164 ChunkMessageBody::QuoteResponse(self.handle_quote(req))
165 }
166 ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
167 ChunkMessageBody::MerkleCandidateQuoteResponse(
168 self.handle_merkle_candidate_quote(req),
169 )
170 }
171 _ => return Ok(None),
183 };
184
185 let response = ChunkMessage {
186 request_id,
187 body: response_body,
188 };
189
190 response
191 .encode()
192 .map(|b| Some(Bytes::from(b)))
193 .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
194 }
195
196 async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
204 let start = std::time::Instant::now();
205 let addr_hex = hex::encode(request.address);
206 let chunk_size = request.content.len();
207 let response = self.handle_put_inner(request).await;
208 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
209 let outcome: &'static str = match &response {
210 ChunkPutResponse::Success { .. } => "success",
211 ChunkPutResponse::AlreadyExists { .. } => "already_exists",
212 ChunkPutResponse::PaymentRequired { .. } => "payment_required",
213 ChunkPutResponse::Error(_) => "error",
214 _ => "unknown",
215 };
216 info!(
217 target: "ant_node::storage::rpc_latency",
218 duration_ms,
219 chunk_size,
220 outcome,
221 addr = %addr_hex,
222 "put_rpc"
223 );
224 response
225 }
226
227 async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
229 let address = request.address;
230 let addr_hex = hex::encode(address);
231 debug!("Handling PUT request for {addr_hex}");
232
233 if request.content.len() > MAX_CHUNK_SIZE {
235 return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
236 size: request.content.len(),
237 max_size: MAX_CHUNK_SIZE,
238 });
239 }
240
241 let computed = compute_address(&request.content);
243 if computed != address {
244 return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
245 expected: address,
246 actual: computed,
247 });
248 }
249
250 match self.storage.exists(&address) {
252 Ok(true) => {
253 debug!("Chunk {addr_hex} already exists");
254 return ChunkPutResponse::AlreadyExists { address };
255 }
256 Err(e) => {
257 return ChunkPutResponse::Error(ProtocolError::Internal(format!(
258 "Storage read failed: {e}"
259 )));
260 }
261 Ok(false) => {}
262 }
263
264 if let Err(e) = self.storage.check_capacity() {
275 info!(
276 target: "ant_node::storage::disk_precheck",
277 addr = %addr_hex,
278 "Rejecting PUT before payment verification: {e}"
279 );
280 return ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()));
281 }
282
283 let payment_result = self
287 .payment_verifier
288 .verify_payment(
289 &address,
290 request.payment_proof.as_deref(),
291 VerificationContext::ClientPut,
292 )
293 .await;
294
295 match payment_result {
296 Ok(status) if status.can_store() => {
297 }
299 Ok(_) => {
300 return ChunkPutResponse::PaymentRequired {
301 message: "Payment required for new chunk".to_string(),
302 };
303 }
304 Err(e) => {
305 return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
306 }
307 }
308
309 match self.storage.put(&address, &request.content).await {
311 Ok(_) => {
312 let content_len = request.content.len();
313 info!("Stored chunk {addr_hex} ({content_len} bytes)");
314 self.quote_generator.record_store();
320
321 if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
326 let event = FreshWriteEvent {
332 key: address,
333 data: request.content.to_vec(),
334 payment_proof: proof,
335 };
336 if tx.send(event).is_err() {
337 debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
338 }
339 }
340
341 ChunkPutResponse::Success { address }
342 }
343 Err(e) => {
344 warn!("Failed to store chunk {addr_hex}: {e}");
345 ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
346 }
347 }
348 }
349
350 async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
355 let start = std::time::Instant::now();
356 let addr_hex = hex::encode(request.address);
357 let response = self.handle_get_inner(request).await;
358 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
359 let outcome: &'static str = match &response {
360 ChunkGetResponse::Success { .. } => "success",
361 ChunkGetResponse::NotFound { .. } => "not_found",
362 ChunkGetResponse::Error(_) => "error",
363 _ => "unknown",
364 };
365 info!(
366 target: "ant_node::storage::rpc_latency",
367 duration_ms,
368 outcome,
369 addr = %addr_hex,
370 "get_rpc"
371 );
372 response
373 }
374
375 async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
377 let address = request.address;
378 let addr_hex = hex::encode(address);
379 debug!("Handling GET request for {addr_hex}");
380
381 match self.storage.get(&address).await {
382 Ok(Some(content)) => {
383 let content_len = content.len();
384 debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
385 ChunkGetResponse::Success { address, content }
386 }
387 Ok(None) => {
388 debug!("Chunk {addr_hex} not found");
389 ChunkGetResponse::NotFound { address }
390 }
391 Err(e) => {
392 warn!("Failed to retrieve chunk {addr_hex}: {e}");
393 ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
394 }
395 }
396 }
397
398 fn resync_quote_metric(&self) {
411 match self.storage.current_chunks() {
412 Ok(count) => usize::try_from(count).map_or_else(
416 |_| {
417 warn!(
418 "current_chunks() count {count} overflows usize; keeping previous quote \
419 metric"
420 );
421 },
422 |records| self.quote_generator.resync_records(records),
423 ),
424 Err(e) => {
425 warn!("Failed to read current_chunks() for quote metric resync: {e}");
426 }
427 }
428 }
429
430 fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
432 let addr_hex = hex::encode(request.address);
433 let data_size = request.data_size;
434 debug!("Handling quote request for {addr_hex} (size: {data_size})");
435
436 self.resync_quote_metric();
438
439 #[allow(clippy::manual_unwrap_or_default)]
445 let already_stored = match self.storage.exists(&request.address) {
446 Ok(exists) => exists,
447 Err(e) => {
448 warn!("Storage check failed for {addr_hex}: {e}");
449 false }
451 };
452
453 if already_stored {
454 debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
455 }
456
457 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
459 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
460 size: MAX_CHUNK_SIZE + 1,
461 max_size: MAX_CHUNK_SIZE,
462 });
463 };
464 if data_size_usize > MAX_CHUNK_SIZE {
465 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
466 size: data_size_usize,
467 max_size: MAX_CHUNK_SIZE,
468 });
469 }
470
471 match self
472 .quote_generator
473 .create_quote(request.address, data_size_usize, request.data_type)
474 {
475 Ok(quote) => {
476 match rmp_serde::to_vec("e) {
478 Ok(quote_bytes) => ChunkQuoteResponse::Success {
479 quote: quote_bytes,
480 already_stored,
481 },
482 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
483 "Failed to serialize quote: {e}"
484 ))),
485 }
486 }
487 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
488 }
489 }
490
491 fn handle_merkle_candidate_quote(
493 &self,
494 request: &MerkleCandidateQuoteRequest,
495 ) -> MerkleCandidateQuoteResponse {
496 let addr_hex = hex::encode(request.address);
497 let data_size = request.data_size;
498 debug!(
499 "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
500 request.merkle_payment_timestamp
501 );
502
503 self.resync_quote_metric();
505
506 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
507 return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
508 "data_size {} overflows usize",
509 request.data_size
510 )));
511 };
512 if data_size_usize > MAX_CHUNK_SIZE {
513 return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
514 size: data_size_usize,
515 max_size: MAX_CHUNK_SIZE,
516 });
517 }
518
519 match self.quote_generator.create_merkle_candidate_quote(
520 data_size_usize,
521 request.data_type,
522 request.merkle_payment_timestamp,
523 ) {
524 Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
525 Ok(bytes) => MerkleCandidateQuoteResponse::Success {
526 candidate_node: bytes,
527 },
528 Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
529 "Failed to serialize merkle candidate node: {e}"
530 ))),
531 },
532 Err(e) => {
533 MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
534 }
535 }
536 }
537
538 #[must_use]
540 pub fn storage_stats(&self) -> crate::storage::StorageStats {
541 self.storage.stats()
542 }
543
544 #[must_use]
546 pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
547 self.payment_verifier.cache_stats()
548 }
549
/// Borrows the payment verifier.
///
/// Only compiled for tests or when the `test-utils` feature is enabled.
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn payment_verifier(&self) -> &PaymentVerifier {
    let verifier: &PaymentVerifier = &self.payment_verifier;
    verifier
}
560
561 pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
567 self.storage.exists(address)
568 }
569
570 pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
576 self.storage.get(address).await
577 }
578
/// Test-only: writes a chunk straight into local storage, with none of the
/// checks performed by the PUT handler.
///
/// # Errors
///
/// Propagates any error returned by the storage layer.
#[cfg(test)]
pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
    let store = &self.storage;
    store.put(address, content).await
}
590}
591
592#[cfg(test)]
593#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
594mod tests {
595 use super::*;
596 use crate::payment::metrics::QuotingMetricsTracker;
597 use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
598 use crate::storage::LmdbStorageConfig;
599 use evmlib::RewardsAddress;
600 use saorsa_core::identity::NodeIdentity;
601 use saorsa_core::MlDsa65;
602 use saorsa_pqc::pqc::types::MlDsaSecretKey;
603 use tempfile::TempDir;
604
605 async fn create_test_protocol() -> (AntProtocol, TempDir) {
606 create_test_protocol_with_reserve(0).await
609 }
610
611 async fn create_test_protocol_with_reserve(disk_reserve: u64) -> (AntProtocol, TempDir) {
617 let temp_dir = TempDir::new().expect("create temp dir");
618
619 let storage_config = LmdbStorageConfig {
620 root_dir: temp_dir.path().to_path_buf(),
621 disk_reserve,
622 ..LmdbStorageConfig::test_default()
623 };
624 let storage = Arc::new(
625 LmdbStorage::new(storage_config)
626 .await
627 .expect("create storage"),
628 );
629
630 let rewards_address = RewardsAddress::new([1u8; 20]);
631 let payment_config = PaymentVerifierConfig {
632 evm: EvmVerifierConfig::default(),
633 cache_capacity: 100_000,
634 close_group_size: crate::ant_protocol::CLOSE_GROUP_SIZE,
635 local_rewards_address: rewards_address,
636 };
637 let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
638 let metrics_tracker = QuotingMetricsTracker::new(100);
639 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
640
641 let identity = NodeIdentity::generate().expect("generate identity");
643 let pub_key_bytes = identity.public_key().as_bytes().to_vec();
644 let sk_bytes = identity.secret_key_bytes().to_vec();
645 let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
646 quote_generator.set_signer(pub_key_bytes, move |msg| {
647 use saorsa_pqc::pqc::MlDsaOperations;
648 let ml_dsa = MlDsa65::new();
649 ml_dsa
650 .sign(&sk, msg)
651 .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
652 });
653
654 let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
655 (protocol, temp_dir)
656 }
657
658 #[tokio::test]
659 async fn test_put_and_get_chunk() {
660 let (protocol, _temp) = create_test_protocol().await;
661
662 let content = b"hello world";
663 let address = LmdbStorage::compute_address(content);
664
665 protocol.payment_verifier().cache_insert(address);
667
668 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
669 let put_msg = ChunkMessage {
670 request_id: 1,
671 body: ChunkMessageBody::PutRequest(put_request),
672 };
673 let put_bytes = put_msg.encode().expect("encode put");
674
675 let response_bytes = protocol
677 .try_handle_request(&put_bytes)
678 .await
679 .expect("handle put")
680 .expect("expected response");
681 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
682
683 assert_eq!(response.request_id, 1);
684 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
685 response.body
686 {
687 assert_eq!(addr, address);
688 } else {
689 panic!("expected PutResponse::Success, got: {response:?}");
690 }
691
692 let get_request = ChunkGetRequest::new(address);
694 let get_msg = ChunkMessage {
695 request_id: 2,
696 body: ChunkMessageBody::GetRequest(get_request),
697 };
698 let get_bytes = get_msg.encode().expect("encode get");
699
700 let response_bytes = protocol
702 .try_handle_request(&get_bytes)
703 .await
704 .expect("handle get")
705 .expect("expected response");
706 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
707
708 assert_eq!(response.request_id, 2);
709 if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
710 address: addr,
711 content: data,
712 }) = response.body
713 {
714 assert_eq!(addr, address);
715 assert_eq!(data, content.to_vec());
716 } else {
717 panic!("expected GetResponse::Success");
718 }
719 }
720
721 #[tokio::test]
722 async fn test_get_not_found() {
723 let (protocol, _temp) = create_test_protocol().await;
724
725 let address = [0xAB; 32];
726 let get_request = ChunkGetRequest::new(address);
727 let get_msg = ChunkMessage {
728 request_id: 10,
729 body: ChunkMessageBody::GetRequest(get_request),
730 };
731 let get_bytes = get_msg.encode().expect("encode get");
732
733 let response_bytes = protocol
734 .try_handle_request(&get_bytes)
735 .await
736 .expect("handle get")
737 .expect("expected response");
738 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
739
740 assert_eq!(response.request_id, 10);
741 if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
742 response.body
743 {
744 assert_eq!(addr, address);
745 } else {
746 panic!("expected GetResponse::NotFound");
747 }
748 }
749
750 #[tokio::test]
751 async fn test_put_address_mismatch() {
752 let (protocol, _temp) = create_test_protocol().await;
753
754 let content = b"test content";
755 let wrong_address = [0xFF; 32]; protocol.payment_verifier().cache_insert(wrong_address);
759
760 let put_request = ChunkPutRequest::new(wrong_address, Bytes::copy_from_slice(content));
761 let put_msg = ChunkMessage {
762 request_id: 20,
763 body: ChunkMessageBody::PutRequest(put_request),
764 };
765 let put_bytes = put_msg.encode().expect("encode put");
766
767 let response_bytes = protocol
768 .try_handle_request(&put_bytes)
769 .await
770 .expect("handle put")
771 .expect("expected response");
772 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
773
774 assert_eq!(response.request_id, 20);
775 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
776 ProtocolError::AddressMismatch { .. },
777 )) = response.body
778 {
779 } else {
781 panic!("expected AddressMismatch error, got: {response:?}");
782 }
783 }
784
785 #[tokio::test]
786 async fn test_put_chunk_too_large() {
787 let (protocol, _temp) = create_test_protocol().await;
788
789 let content = vec![0u8; MAX_CHUNK_SIZE + 1];
791 let address = LmdbStorage::compute_address(&content);
792
793 let put_request = ChunkPutRequest::new(address, Bytes::from(content));
794 let put_msg = ChunkMessage {
795 request_id: 30,
796 body: ChunkMessageBody::PutRequest(put_request),
797 };
798 let put_bytes = put_msg.encode().expect("encode put");
799
800 let response_bytes = protocol
801 .try_handle_request(&put_bytes)
802 .await
803 .expect("handle put")
804 .expect("expected response");
805 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
806
807 assert_eq!(response.request_id, 30);
808 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
809 ProtocolError::ChunkTooLarge { .. },
810 )) = response.body
811 {
812 } else {
814 panic!("expected ChunkTooLarge error");
815 }
816 }
817
818 #[tokio::test]
827 async fn test_put_rejected_on_insufficient_disk_before_verification() {
828 let (protocol, _temp) = create_test_protocol_with_reserve(u64::MAX).await;
831
832 let content = b"chunk for a disk-full node";
833 let address = LmdbStorage::compute_address(content);
834
835 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
836 let put_msg = ChunkMessage {
837 request_id: 41,
838 body: ChunkMessageBody::PutRequest(put_request),
839 };
840 let put_bytes = put_msg.encode().expect("encode put");
841
842 let response_bytes = protocol
843 .try_handle_request(&put_bytes)
844 .await
845 .expect("handle put")
846 .expect("expected response");
847 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
848
849 assert_eq!(response.request_id, 41);
850 match response.body {
851 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
852 ProtocolError::StorageFailed(msg),
853 )) => {
854 assert!(
855 msg.contains("Insufficient disk space"),
856 "expected disk-space error, got: {msg}"
857 );
858 }
859 other => {
860 panic!("expected StorageFailed disk error before verification, got: {other:?}")
861 }
862 }
863
864 assert!(!protocol.exists(&address).expect("exists check"));
866 }
867
868 #[tokio::test]
869 async fn test_put_already_exists() {
870 let (protocol, _temp) = create_test_protocol().await;
871
872 let content = b"duplicate content";
873 let address = LmdbStorage::compute_address(content);
874
875 protocol.payment_verifier().cache_insert(address);
877
878 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
879 let put_msg = ChunkMessage {
880 request_id: 40,
881 body: ChunkMessageBody::PutRequest(put_request),
882 };
883 let put_bytes = put_msg.encode().expect("encode put");
884
885 let _ = protocol
886 .try_handle_request(&put_bytes)
887 .await
888 .expect("handle put");
889
890 let response_bytes = protocol
892 .try_handle_request(&put_bytes)
893 .await
894 .expect("handle put 2")
895 .expect("expected response");
896 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
897
898 assert_eq!(response.request_id, 40);
899 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
900 response.body
901 {
902 assert_eq!(addr, address);
903 } else {
904 panic!("expected AlreadyExists");
905 }
906 }
907
908 #[tokio::test]
909 async fn test_protocol_id() {
910 let (protocol, _temp) = create_test_protocol().await;
911 assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
912 }
913
914 #[tokio::test]
915 async fn test_exists_and_local_access() {
916 let (protocol, _temp) = create_test_protocol().await;
917
918 let content = b"local access test";
919 let address = LmdbStorage::compute_address(content);
920
921 assert!(!protocol.exists(&address).expect("exists check"));
922
923 protocol
924 .put_local(&address, content)
925 .await
926 .expect("put local");
927
928 assert!(protocol.exists(&address).expect("exists check"));
929
930 let retrieved = protocol.get_local(&address).await.expect("get local");
931 assert_eq!(retrieved, Some(content.to_vec()));
932 }
933
934 #[tokio::test]
935 async fn test_cache_insert_is_visible() {
936 let (protocol, _temp) = create_test_protocol().await;
937
938 let content = b"cache test content";
939 let address = LmdbStorage::compute_address(content);
940
941 let stats_before = protocol.payment_cache_stats();
943 assert_eq!(stats_before.additions, 0);
944
945 protocol.payment_verifier().cache_insert(address);
947
948 let stats_after = protocol.payment_cache_stats();
950 assert_eq!(stats_after.additions, 1);
951
952 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
954 let put_msg = ChunkMessage {
955 request_id: 100,
956 body: ChunkMessageBody::PutRequest(put_request),
957 };
958 let put_bytes = put_msg.encode().expect("encode put");
959 let response_bytes = protocol
960 .try_handle_request(&put_bytes)
961 .await
962 .expect("handle put")
963 .expect("expected response");
964 let response = ChunkMessage::decode(&response_bytes).expect("decode");
965
966 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
967 } else {
969 panic!("expected success, got: {response:?}");
970 }
971 }
972
973 #[tokio::test]
974 async fn test_put_same_chunk_twice_hits_cache() {
975 let (protocol, _temp) = create_test_protocol().await;
976
977 let content = b"duplicate cache test";
978 let address = LmdbStorage::compute_address(content);
979
980 protocol.payment_verifier().cache_insert(address);
982
983 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
985 let put_msg = ChunkMessage {
986 request_id: 110,
987 body: ChunkMessageBody::PutRequest(put_request),
988 };
989 let put_bytes = put_msg.encode().expect("encode put");
990 let _ = protocol
991 .try_handle_request(&put_bytes)
992 .await
993 .expect("handle put 1");
994
995 let response_bytes = protocol
997 .try_handle_request(&put_bytes)
998 .await
999 .expect("handle put 2")
1000 .expect("expected response");
1001 let response = ChunkMessage::decode(&response_bytes).expect("decode");
1002
1003 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
1004 {
1005 } else {
1007 panic!("expected AlreadyExists, got: {response:?}");
1008 }
1009 }
1010
1011 #[tokio::test]
1012 async fn test_payment_cache_stats_returns_correct_values() {
1013 let (protocol, _temp) = create_test_protocol().await;
1014
1015 let stats = protocol.payment_cache_stats();
1016 assert_eq!(stats.hits, 0);
1017 assert_eq!(stats.misses, 0);
1018 assert_eq!(stats.additions, 0);
1019
1020 let content = b"stats test";
1022 let address = LmdbStorage::compute_address(content);
1023 protocol.payment_verifier().cache_insert(address);
1024
1025 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
1026 let put_msg = ChunkMessage {
1027 request_id: 120,
1028 body: ChunkMessageBody::PutRequest(put_request),
1029 };
1030 let put_bytes = put_msg.encode().expect("encode put");
1031 let _ = protocol
1032 .try_handle_request(&put_bytes)
1033 .await
1034 .expect("handle put");
1035
1036 let stats = protocol.payment_cache_stats();
1037 assert_eq!(stats.additions, 1);
1039 assert_eq!(stats.hits, 1);
1040 }
1041
1042 #[tokio::test]
1043 async fn test_storage_stats() {
1044 let (protocol, _temp) = create_test_protocol().await;
1045 let stats = protocol.storage_stats();
1046 assert_eq!(stats.chunks_stored, 0);
1047 }
1048
1049 #[tokio::test]
1050 async fn test_merkle_candidate_quote_request() {
1051 use ant_protocol::payment::verify::verify_merkle_candidate_signature;
1052 use evmlib::merkle_payments::MerklePaymentCandidateNode;
1053
1054 let (protocol, _temp) = create_test_protocol().await;
1056
1057 let address = [0x77; 32];
1058 let timestamp = std::time::SystemTime::now()
1059 .duration_since(std::time::UNIX_EPOCH)
1060 .expect("system time")
1061 .as_secs();
1062
1063 let request = MerkleCandidateQuoteRequest {
1064 address,
1065 data_type: DATA_TYPE_CHUNK,
1066 data_size: 4096,
1067 merkle_payment_timestamp: timestamp,
1068 };
1069 let msg = ChunkMessage {
1070 request_id: 600,
1071 body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
1072 };
1073 let msg_bytes = msg.encode().expect("encode request");
1074
1075 let response_bytes = protocol
1076 .try_handle_request(&msg_bytes)
1077 .await
1078 .expect("handle merkle candidate quote")
1079 .expect("expected response");
1080 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
1081
1082 assert_eq!(response.request_id, 600);
1083 match response.body {
1084 ChunkMessageBody::MerkleCandidateQuoteResponse(
1085 MerkleCandidateQuoteResponse::Success { candidate_node },
1086 ) => {
1087 let candidate: MerklePaymentCandidateNode =
1088 rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
1089
1090 assert!(
1092 verify_merkle_candidate_signature(&candidate),
1093 "ML-DSA-65 candidate signature must be valid"
1094 );
1095
1096 assert_eq!(candidate.merkle_payment_timestamp, timestamp);
1097 assert!(candidate.price >= evmlib::common::Amount::ZERO);
1099 }
1100 other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
1101 }
1102 }
1103
1104 #[tokio::test]
1105 async fn test_handle_unexpected_response_message() {
1106 let (protocol, _temp) = create_test_protocol().await;
1107
1108 let msg = ChunkMessage {
1110 request_id: 200,
1111 body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
1112 };
1113 let msg_bytes = msg.encode().expect("encode");
1114
1115 let result = protocol
1116 .try_handle_request(&msg_bytes)
1117 .await
1118 .expect("handle msg");
1119
1120 assert!(
1121 result.is_none(),
1122 "expected None for response message, got: {result:?}"
1123 );
1124 }
1125
1126 #[tokio::test]
1127 async fn test_quote_already_stored_flag() {
1128 let (protocol, _temp) = create_test_protocol().await;
1129
1130 let content = b"already stored quote test";
1131 let address = LmdbStorage::compute_address(content);
1132
1133 protocol.payment_verifier().cache_insert(address);
1135 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
1136 let put_msg = ChunkMessage {
1137 request_id: 300,
1138 body: ChunkMessageBody::PutRequest(put_request),
1139 };
1140 let put_bytes = put_msg.encode().expect("encode put");
1141 let _ = protocol
1142 .try_handle_request(&put_bytes)
1143 .await
1144 .expect("handle put");
1145
1146 let quote_request = ChunkQuoteRequest {
1148 address,
1149 data_size: content.len() as u64,
1150 data_type: DATA_TYPE_CHUNK,
1151 };
1152 let quote_msg = ChunkMessage {
1153 request_id: 301,
1154 body: ChunkMessageBody::QuoteRequest(quote_request),
1155 };
1156 let quote_bytes = quote_msg.encode().expect("encode quote");
1157 let response_bytes = protocol
1158 .try_handle_request("e_bytes)
1159 .await
1160 .expect("handle quote")
1161 .expect("expected response");
1162 let response = ChunkMessage::decode(&response_bytes).expect("decode");
1163
1164 match response.body {
1165 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1166 already_stored, ..
1167 }) => {
1168 assert!(
1169 already_stored,
1170 "already_stored should be true for existing chunk"
1171 );
1172 }
1173 other => panic!("expected Success with already_stored, got: {other:?}"),
1174 }
1175
1176 let new_address = [0xFFu8; 32];
1178 let quote_request2 = ChunkQuoteRequest {
1179 address: new_address,
1180 data_size: 100,
1181 data_type: DATA_TYPE_CHUNK,
1182 };
1183 let quote_msg2 = ChunkMessage {
1184 request_id: 302,
1185 body: ChunkMessageBody::QuoteRequest(quote_request2),
1186 };
1187 let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
1188 let response_bytes2 = protocol
1189 .try_handle_request("e_bytes2)
1190 .await
1191 .expect("handle quote2")
1192 .expect("expected response");
1193 let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
1194
1195 match response2.body {
1196 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1197 already_stored, ..
1198 }) => {
1199 assert!(
1200 !already_stored,
1201 "already_stored should be false for new chunk"
1202 );
1203 }
1204 other => panic!("expected Success with already_stored=false, got: {other:?}"),
1205 }
1206 }
1207
1208 fn priced_records_after_quote(protocol: &AntProtocol) -> usize {
1212 let quote_request = ChunkQuoteRequest {
1213 address: [0xAAu8; 32], data_size: 100,
1215 data_type: DATA_TYPE_CHUNK,
1216 };
1217 let _ = protocol.handle_quote("e_request);
1218 protocol.priced_records_stored()
1219 }
1220
1221 #[tokio::test]
1225 async fn test_quote_metric_reflects_deletions() {
1226 let (protocol, _temp) = create_test_protocol().await;
1227
1228 let contents: Vec<Vec<u8>> = (0u8..5).map(|i| vec![i; 64]).collect();
1230 let mut addresses = Vec::new();
1231 for content in &contents {
1232 let addr = LmdbStorage::compute_address(content);
1233 protocol.put_local(&addr, content).await.expect("put_local");
1234 addresses.push(addr);
1235 }
1236
1237 assert_eq!(priced_records_after_quote(&protocol), 5);
1239
1240 for addr in addresses.iter().take(2) {
1242 assert!(protocol.storage().delete(addr).await.expect("delete"));
1243 }
1244 assert_eq!(priced_records_after_quote(&protocol), 3);
1245
1246 for addr in addresses.iter().skip(2) {
1248 assert!(protocol.storage().delete(addr).await.expect("delete"));
1249 }
1250 assert_eq!(priced_records_after_quote(&protocol), 0);
1251 }
1252
1253 #[tokio::test]
1260 async fn test_quote_price_drops_after_deletion() {
1261 use crate::payment::pricing::calculate_price;
1262
1263 let (protocol, _temp) = create_test_protocol().await;
1264 let contents: Vec<Vec<u8>> = (0u8..10).map(|i| vec![i; 64]).collect();
1265 let mut addresses = Vec::new();
1266 for content in &contents {
1267 let addr = LmdbStorage::compute_address(content);
1268 protocol.put_local(&addr, content).await.expect("put_local");
1269 addresses.push(addr);
1270 }
1271
1272 assert_eq!(priced_records_after_quote(&protocol), 10);
1276 let price_full = calculate_price(10);
1277
1278 for addr in addresses.iter().take(8) {
1280 assert!(protocol.storage().delete(addr).await.expect("delete"));
1281 }
1282 assert_eq!(priced_records_after_quote(&protocol), 2);
1286 let price_after = calculate_price(2);
1287 assert!(
1288 price_after < price_full,
1289 "deleting data must lower the observable quote price \
1290 (full={price_full:?}, after={price_after:?})"
1291 );
1292 }
1293}