1#[cfg(test)]
31use crate::ant_protocol::DATA_TYPE_CHUNK;
32use crate::ant_protocol::{
33 ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
34 ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
35 MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, MAX_CHUNK_SIZE,
36};
37use crate::client::compute_address;
38use crate::error::{Error, Result};
39use crate::logging::{debug, info, warn};
40use crate::payment::{PaymentVerifier, QuoteGenerator, VerificationContext};
41use crate::replication::admission;
42use crate::replication::config::K_BUCKET_SIZE;
43use crate::replication::fresh::FreshWriteEvent;
44use crate::storage::lmdb::LmdbStorage;
45use bytes::Bytes;
46use parking_lot::RwLock;
47use saorsa_core::P2PNode;
48use std::sync::Arc;
49use tokio::sync::mpsc;
50
/// Width handed to `admission::is_responsible` by the PUT handler's
/// self-closeness gate. It is defined as `K_BUCKET_SIZE` from the replication
/// config.
// NOTE(review): presumably "how many locally-known closest peers this node must
// rank within to accept a chunk" — confirm against `admission::is_responsible`.
const SELF_CLOSENESS_GATE_WIDTH: usize = K_BUCKET_SIZE;
61
/// Handler for the chunk protocol: answers PUT, GET, quote and merkle
/// candidate quote requests using local storage, a payment verifier and a
/// quote generator.
pub struct AntProtocol {
    /// Local chunk store consulted and written by every request handler.
    storage: Arc<LmdbStorage>,
    /// Checks payment for a chunk before it is stored.
    payment_verifier: Arc<PaymentVerifier>,
    /// Creates quotes and is notified of stores / resynced record counts.
    quote_generator: Arc<QuoteGenerator>,
    /// Optional channel on which a `FreshWriteEvent` is sent after a PUT that
    /// carried a payment proof is stored. `None` until a sender is set.
    fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
    /// P2P node handle; `None` until `attach_p2p_node` is called.
    p2p_node: RwLock<Option<Arc<P2PNode>>>,
}
81
82impl AntProtocol {
83 #[must_use]
91 pub fn new(
92 storage: Arc<LmdbStorage>,
93 payment_verifier: Arc<PaymentVerifier>,
94 quote_generator: Arc<QuoteGenerator>,
95 ) -> Self {
96 payment_verifier.attach_storage(Arc::clone(&storage));
104 quote_generator.attach_storage(Arc::clone(&storage));
105
106 Self {
107 storage,
108 payment_verifier,
109 quote_generator,
110 fresh_write_tx: None,
111 p2p_node: RwLock::new(None),
112 }
113 }
114
115 pub fn attach_p2p_node(&self, node: Arc<P2PNode>) {
121 *self.p2p_node.write() = Some(Arc::clone(&node));
122 self.payment_verifier.attach_p2p_node(node);
123 debug!("AntProtocol: P2PNode attached for payment live-DHT checks and self-closeness gate");
124 }
125
126 pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
131 self.fresh_write_tx = Some(tx);
132 }
133
134 #[must_use]
136 pub fn protocol_id(&self) -> &'static str {
137 CHUNK_PROTOCOL_ID
138 }
139
140 #[must_use]
142 pub fn storage(&self) -> Arc<LmdbStorage> {
143 Arc::clone(&self.storage)
144 }
145
/// Test-only: the record count reported by the quote generator's
/// `records_stored()`.
#[cfg(test)]
#[must_use]
pub(crate) fn priced_records_stored(&self) -> usize {
    let records = self.quote_generator.records_stored();
    records
}
153
154 #[must_use]
156 pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
157 Arc::clone(&self.payment_verifier)
158 }
159
160 pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
171 let message = ChunkMessage::decode(data)
172 .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
173
174 let request_id = message.request_id;
175
176 let response_body = match message.body {
177 ChunkMessageBody::PutRequest(req) => {
178 ChunkMessageBody::PutResponse(self.handle_put(req).await)
179 }
180 ChunkMessageBody::GetRequest(req) => {
181 ChunkMessageBody::GetResponse(self.handle_get(req).await)
182 }
183 ChunkMessageBody::QuoteRequest(ref req) => {
184 ChunkMessageBody::QuoteResponse(self.handle_quote(req))
185 }
186 ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
187 ChunkMessageBody::MerkleCandidateQuoteResponse(
188 self.handle_merkle_candidate_quote(req),
189 )
190 }
191 _ => return Ok(None),
203 };
204
205 let response = ChunkMessage {
206 request_id,
207 body: response_body,
208 };
209
210 response
211 .encode()
212 .map(|b| Some(Bytes::from(b)))
213 .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
214 }
215
216 async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
224 let start = std::time::Instant::now();
225 let addr_hex = hex::encode(request.address);
226 let chunk_size = request.content.len();
227 let response = self.handle_put_inner(request).await;
228 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
229 let outcome: &'static str = match &response {
230 ChunkPutResponse::Success { .. } => "success",
231 ChunkPutResponse::AlreadyExists { .. } => "already_exists",
232 ChunkPutResponse::PaymentRequired { .. } => "payment_required",
233 ChunkPutResponse::Error(_) => "error",
234 _ => "unknown",
235 };
236 info!(
237 target: "ant_node::storage::rpc_latency",
238 duration_ms,
239 chunk_size,
240 outcome,
241 addr = %addr_hex,
242 "put_rpc"
243 );
244 response
245 }
246
247 async fn handle_put_inner(&self, request: ChunkPutRequest) -> ChunkPutResponse {
249 let address = request.address;
250 let addr_hex = hex::encode(address);
251 debug!("Handling PUT request for {addr_hex}");
252
253 if request.content.len() > MAX_CHUNK_SIZE {
255 return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
256 size: request.content.len(),
257 max_size: MAX_CHUNK_SIZE,
258 });
259 }
260
261 let computed = compute_address(&request.content);
263 if computed != address {
264 return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
265 expected: address,
266 actual: computed,
267 });
268 }
269
270 match self.storage.exists(&address) {
272 Ok(true) => {
273 debug!("Chunk {addr_hex} already exists");
274 return ChunkPutResponse::AlreadyExists { address };
275 }
276 Err(e) => {
277 return ChunkPutResponse::Error(ProtocolError::Internal(format!(
278 "Storage read failed: {e}"
279 )));
280 }
281 Ok(false) => {}
282 }
283
284 if let Err(e) = self.storage.check_capacity() {
295 info!(
296 target: "ant_node::storage::disk_precheck",
297 addr = %addr_hex,
298 "Rejecting PUT before payment verification: {e}"
299 );
300 return ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()));
301 }
302
303 let attached = self.p2p_node.read().as_ref().map(Arc::clone);
312 if let Some(p2p) = attached {
313 let self_id = *p2p.peer_id();
314 if !admission::is_responsible(&self_id, &address, &p2p, SELF_CLOSENESS_GATE_WIDTH).await
315 {
316 debug!("Rejecting PUT for {addr_hex}: not within local closest peers");
317 return ChunkPutResponse::Error(ProtocolError::StorageFailed(
318 "node is not within its local closest peers for this address".to_string(),
319 ));
320 }
321 }
322
323 let payment_result = self
326 .payment_verifier
327 .verify_payment(
328 &address,
329 request.payment_proof.as_deref(),
330 VerificationContext::ClientPut,
331 )
332 .await;
333
334 match payment_result {
335 Ok(status) if status.can_store() => {
336 }
338 Ok(_) => {
339 return ChunkPutResponse::PaymentRequired {
340 message: "Payment required for new chunk".to_string(),
341 };
342 }
343 Err(e) => {
344 return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
345 }
346 }
347
348 match self.storage.put(&address, &request.content).await {
350 Ok(_) => {
351 let content_len = request.content.len();
352 info!("Stored chunk {addr_hex} ({content_len} bytes)");
353 self.quote_generator.record_store();
359
360 if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
365 let event = FreshWriteEvent {
371 key: address,
372 data: request.content.to_vec(),
373 payment_proof: proof,
374 };
375 if tx.send(event).is_err() {
376 debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
377 }
378 }
379
380 ChunkPutResponse::Success { address }
381 }
382 Err(e) => {
383 warn!("Failed to store chunk {addr_hex}: {e}");
384 ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
385 }
386 }
387 }
388
389 async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
394 let start = std::time::Instant::now();
395 let addr_hex = hex::encode(request.address);
396 let response = self.handle_get_inner(request).await;
397 let duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
398 let outcome: &'static str = match &response {
399 ChunkGetResponse::Success { .. } => "success",
400 ChunkGetResponse::NotFound { .. } => "not_found",
401 ChunkGetResponse::Error(_) => "error",
402 _ => "unknown",
403 };
404 info!(
405 target: "ant_node::storage::rpc_latency",
406 duration_ms,
407 outcome,
408 addr = %addr_hex,
409 "get_rpc"
410 );
411 response
412 }
413
414 async fn handle_get_inner(&self, request: ChunkGetRequest) -> ChunkGetResponse {
416 let address = request.address;
417 let addr_hex = hex::encode(address);
418 debug!("Handling GET request for {addr_hex}");
419
420 match self.storage.get(&address).await {
421 Ok(Some(content)) => {
422 let content_len = content.len();
423 debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
424 ChunkGetResponse::Success { address, content }
425 }
426 Ok(None) => {
427 debug!("Chunk {addr_hex} not found");
428 ChunkGetResponse::NotFound { address }
429 }
430 Err(e) => {
431 warn!("Failed to retrieve chunk {addr_hex}: {e}");
432 ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
433 }
434 }
435 }
436
437 fn resync_quote_metric(&self) {
450 match self.storage.current_chunks() {
451 Ok(count) => usize::try_from(count).map_or_else(
455 |_| {
456 warn!(
457 "current_chunks() count {count} overflows usize; keeping previous quote \
458 metric"
459 );
460 },
461 |records| self.quote_generator.resync_records(records),
462 ),
463 Err(e) => {
464 warn!("Failed to read current_chunks() for quote metric resync: {e}");
465 }
466 }
467 }
468
469 fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
471 let addr_hex = hex::encode(request.address);
472 let data_size = request.data_size;
473 debug!("Handling quote request for {addr_hex} (size: {data_size})");
474
475 self.resync_quote_metric();
477
478 #[allow(clippy::manual_unwrap_or_default)]
484 let already_stored = match self.storage.exists(&request.address) {
485 Ok(exists) => exists,
486 Err(e) => {
487 warn!("Storage check failed for {addr_hex}: {e}");
488 false }
490 };
491
492 if already_stored {
493 debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
494 }
495
496 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
498 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
499 size: MAX_CHUNK_SIZE + 1,
500 max_size: MAX_CHUNK_SIZE,
501 });
502 };
503 if data_size_usize > MAX_CHUNK_SIZE {
504 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
505 size: data_size_usize,
506 max_size: MAX_CHUNK_SIZE,
507 });
508 }
509
510 match self
511 .quote_generator
512 .create_quote(request.address, data_size_usize, request.data_type)
513 {
514 Ok(quote) => {
515 match rmp_serde::to_vec("e) {
517 Ok(quote_bytes) => ChunkQuoteResponse::Success {
518 quote: quote_bytes,
519 already_stored,
520 },
521 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
522 "Failed to serialize quote: {e}"
523 ))),
524 }
525 }
526 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
527 }
528 }
529
530 fn handle_merkle_candidate_quote(
532 &self,
533 request: &MerkleCandidateQuoteRequest,
534 ) -> MerkleCandidateQuoteResponse {
535 let addr_hex = hex::encode(request.address);
536 let data_size = request.data_size;
537 debug!(
538 "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
539 request.merkle_payment_timestamp
540 );
541
542 self.resync_quote_metric();
544
545 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
546 return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
547 "data_size {} overflows usize",
548 request.data_size
549 )));
550 };
551 if data_size_usize > MAX_CHUNK_SIZE {
552 return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
553 size: data_size_usize,
554 max_size: MAX_CHUNK_SIZE,
555 });
556 }
557
558 match self.quote_generator.create_merkle_candidate_quote(
559 data_size_usize,
560 request.data_type,
561 request.merkle_payment_timestamp,
562 ) {
563 Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
564 Ok(bytes) => MerkleCandidateQuoteResponse::Success {
565 candidate_node: bytes,
566 },
567 Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
568 "Failed to serialize merkle candidate node: {e}"
569 ))),
570 },
571 Err(e) => {
572 MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
573 }
574 }
575 }
576
577 #[must_use]
579 pub fn storage_stats(&self) -> crate::storage::StorageStats {
580 self.storage.stats()
581 }
582
583 #[must_use]
585 pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
586 self.payment_verifier.cache_stats()
587 }
588
/// Borrows the payment verifier. Only compiled for tests or with the
/// `test-utils` feature.
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn payment_verifier(&self) -> &PaymentVerifier {
    Arc::as_ref(&self.payment_verifier)
}
599
600 pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
606 self.storage.exists(address)
607 }
608
609 pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
615 self.storage.get(address).await
616 }
617
/// Test-only: writes a chunk directly to storage. None of the PUT handler's
/// validation, gating or payment checks are run.
///
/// # Errors
///
/// Propagates the error returned by the storage write.
#[cfg(test)]
pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
    let store: &LmdbStorage = &self.storage;
    store.put(address, content).await
}
629}
630
631#[cfg(test)]
632#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
633mod tests {
634 use super::*;
635 use crate::payment::metrics::QuotingMetricsTracker;
636 use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
637 use crate::storage::LmdbStorageConfig;
638 use evmlib::RewardsAddress;
639 use saorsa_core::identity::NodeIdentity;
640 use saorsa_core::MlDsa65;
641 use saorsa_pqc::pqc::types::MlDsaSecretKey;
642 use tempfile::TempDir;
643
644 async fn create_test_protocol() -> (AntProtocol, TempDir) {
645 create_test_protocol_with_reserve(0).await
648 }
649
650 async fn create_test_protocol_with_reserve(disk_reserve: u64) -> (AntProtocol, TempDir) {
656 let temp_dir = TempDir::new().expect("create temp dir");
657
658 let storage_config = LmdbStorageConfig {
659 root_dir: temp_dir.path().to_path_buf(),
660 disk_reserve,
661 ..LmdbStorageConfig::test_default()
662 };
663 let storage = Arc::new(
664 LmdbStorage::new(storage_config)
665 .await
666 .expect("create storage"),
667 );
668
669 let rewards_address = RewardsAddress::new([1u8; 20]);
670 let payment_config = PaymentVerifierConfig {
671 evm: EvmVerifierConfig::default(),
672 cache_capacity: 100_000,
673 close_group_size: crate::ant_protocol::CLOSE_GROUP_SIZE,
674 local_rewards_address: rewards_address,
675 };
676 let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
677 let metrics_tracker = QuotingMetricsTracker::new(100);
678 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
679
680 let identity = NodeIdentity::generate().expect("generate identity");
682 let pub_key_bytes = identity.public_key().as_bytes().to_vec();
683 let sk_bytes = identity.secret_key_bytes().to_vec();
684 let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
685 quote_generator.set_signer(pub_key_bytes, move |msg| {
686 use saorsa_pqc::pqc::MlDsaOperations;
687 let ml_dsa = MlDsa65::new();
688 ml_dsa
689 .sign(&sk, msg)
690 .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
691 });
692
693 let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
694 (protocol, temp_dir)
695 }
696
697 #[tokio::test]
698 async fn test_put_and_get_chunk() {
699 let (protocol, _temp) = create_test_protocol().await;
700
701 let content = b"hello world";
702 let address = LmdbStorage::compute_address(content);
703
704 protocol.payment_verifier().cache_insert(address);
706
707 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
708 let put_msg = ChunkMessage {
709 request_id: 1,
710 body: ChunkMessageBody::PutRequest(put_request),
711 };
712 let put_bytes = put_msg.encode().expect("encode put");
713
714 let response_bytes = protocol
716 .try_handle_request(&put_bytes)
717 .await
718 .expect("handle put")
719 .expect("expected response");
720 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
721
722 assert_eq!(response.request_id, 1);
723 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
724 response.body
725 {
726 assert_eq!(addr, address);
727 } else {
728 panic!("expected PutResponse::Success, got: {response:?}");
729 }
730
731 let get_request = ChunkGetRequest::new(address);
733 let get_msg = ChunkMessage {
734 request_id: 2,
735 body: ChunkMessageBody::GetRequest(get_request),
736 };
737 let get_bytes = get_msg.encode().expect("encode get");
738
739 let response_bytes = protocol
741 .try_handle_request(&get_bytes)
742 .await
743 .expect("handle get")
744 .expect("expected response");
745 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
746
747 assert_eq!(response.request_id, 2);
748 if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
749 address: addr,
750 content: data,
751 }) = response.body
752 {
753 assert_eq!(addr, address);
754 assert_eq!(data, content.to_vec());
755 } else {
756 panic!("expected GetResponse::Success");
757 }
758 }
759
760 #[tokio::test]
761 async fn test_get_not_found() {
762 let (protocol, _temp) = create_test_protocol().await;
763
764 let address = [0xAB; 32];
765 let get_request = ChunkGetRequest::new(address);
766 let get_msg = ChunkMessage {
767 request_id: 10,
768 body: ChunkMessageBody::GetRequest(get_request),
769 };
770 let get_bytes = get_msg.encode().expect("encode get");
771
772 let response_bytes = protocol
773 .try_handle_request(&get_bytes)
774 .await
775 .expect("handle get")
776 .expect("expected response");
777 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
778
779 assert_eq!(response.request_id, 10);
780 if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
781 response.body
782 {
783 assert_eq!(addr, address);
784 } else {
785 panic!("expected GetResponse::NotFound");
786 }
787 }
788
789 #[tokio::test]
790 async fn test_put_address_mismatch() {
791 let (protocol, _temp) = create_test_protocol().await;
792
793 let content = b"test content";
794 let wrong_address = [0xFF; 32]; protocol.payment_verifier().cache_insert(wrong_address);
798
799 let put_request = ChunkPutRequest::new(wrong_address, Bytes::copy_from_slice(content));
800 let put_msg = ChunkMessage {
801 request_id: 20,
802 body: ChunkMessageBody::PutRequest(put_request),
803 };
804 let put_bytes = put_msg.encode().expect("encode put");
805
806 let response_bytes = protocol
807 .try_handle_request(&put_bytes)
808 .await
809 .expect("handle put")
810 .expect("expected response");
811 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
812
813 assert_eq!(response.request_id, 20);
814 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
815 ProtocolError::AddressMismatch { .. },
816 )) = response.body
817 {
818 } else {
820 panic!("expected AddressMismatch error, got: {response:?}");
821 }
822 }
823
824 #[tokio::test]
825 async fn test_put_chunk_too_large() {
826 let (protocol, _temp) = create_test_protocol().await;
827
828 let content = vec![0u8; MAX_CHUNK_SIZE + 1];
830 let address = LmdbStorage::compute_address(&content);
831
832 let put_request = ChunkPutRequest::new(address, Bytes::from(content));
833 let put_msg = ChunkMessage {
834 request_id: 30,
835 body: ChunkMessageBody::PutRequest(put_request),
836 };
837 let put_bytes = put_msg.encode().expect("encode put");
838
839 let response_bytes = protocol
840 .try_handle_request(&put_bytes)
841 .await
842 .expect("handle put")
843 .expect("expected response");
844 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
845
846 assert_eq!(response.request_id, 30);
847 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
848 ProtocolError::ChunkTooLarge { .. },
849 )) = response.body
850 {
851 } else {
853 panic!("expected ChunkTooLarge error");
854 }
855 }
856
857 #[tokio::test]
866 async fn test_put_rejected_on_insufficient_disk_before_verification() {
867 let (protocol, _temp) = create_test_protocol_with_reserve(u64::MAX).await;
870
871 let content = b"chunk for a disk-full node";
872 let address = LmdbStorage::compute_address(content);
873
874 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
875 let put_msg = ChunkMessage {
876 request_id: 41,
877 body: ChunkMessageBody::PutRequest(put_request),
878 };
879 let put_bytes = put_msg.encode().expect("encode put");
880
881 let response_bytes = protocol
882 .try_handle_request(&put_bytes)
883 .await
884 .expect("handle put")
885 .expect("expected response");
886 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
887
888 assert_eq!(response.request_id, 41);
889 match response.body {
890 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
891 ProtocolError::StorageFailed(msg),
892 )) => {
893 assert!(
894 msg.contains("Insufficient disk space"),
895 "expected disk-space error, got: {msg}"
896 );
897 }
898 other => {
899 panic!("expected StorageFailed disk error before verification, got: {other:?}")
900 }
901 }
902
903 assert!(!protocol.exists(&address).expect("exists check"));
905 }
906
907 #[tokio::test]
908 async fn test_put_already_exists() {
909 let (protocol, _temp) = create_test_protocol().await;
910
911 let content = b"duplicate content";
912 let address = LmdbStorage::compute_address(content);
913
914 protocol.payment_verifier().cache_insert(address);
916
917 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
918 let put_msg = ChunkMessage {
919 request_id: 40,
920 body: ChunkMessageBody::PutRequest(put_request),
921 };
922 let put_bytes = put_msg.encode().expect("encode put");
923
924 let _ = protocol
925 .try_handle_request(&put_bytes)
926 .await
927 .expect("handle put");
928
929 let response_bytes = protocol
931 .try_handle_request(&put_bytes)
932 .await
933 .expect("handle put 2")
934 .expect("expected response");
935 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
936
937 assert_eq!(response.request_id, 40);
938 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
939 response.body
940 {
941 assert_eq!(addr, address);
942 } else {
943 panic!("expected AlreadyExists");
944 }
945 }
946
947 #[tokio::test]
948 async fn test_protocol_id() {
949 let (protocol, _temp) = create_test_protocol().await;
950 assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
951 }
952
953 #[tokio::test]
954 async fn test_exists_and_local_access() {
955 let (protocol, _temp) = create_test_protocol().await;
956
957 let content = b"local access test";
958 let address = LmdbStorage::compute_address(content);
959
960 assert!(!protocol.exists(&address).expect("exists check"));
961
962 protocol
963 .put_local(&address, content)
964 .await
965 .expect("put local");
966
967 assert!(protocol.exists(&address).expect("exists check"));
968
969 let retrieved = protocol.get_local(&address).await.expect("get local");
970 assert_eq!(retrieved, Some(content.to_vec()));
971 }
972
973 #[tokio::test]
974 async fn test_cache_insert_is_visible() {
975 let (protocol, _temp) = create_test_protocol().await;
976
977 let content = b"cache test content";
978 let address = LmdbStorage::compute_address(content);
979
980 let stats_before = protocol.payment_cache_stats();
982 assert_eq!(stats_before.additions, 0);
983
984 protocol.payment_verifier().cache_insert(address);
986
987 let stats_after = protocol.payment_cache_stats();
989 assert_eq!(stats_after.additions, 1);
990
991 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
993 let put_msg = ChunkMessage {
994 request_id: 100,
995 body: ChunkMessageBody::PutRequest(put_request),
996 };
997 let put_bytes = put_msg.encode().expect("encode put");
998 let response_bytes = protocol
999 .try_handle_request(&put_bytes)
1000 .await
1001 .expect("handle put")
1002 .expect("expected response");
1003 let response = ChunkMessage::decode(&response_bytes).expect("decode");
1004
1005 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
1006 } else {
1008 panic!("expected success, got: {response:?}");
1009 }
1010 }
1011
1012 #[tokio::test]
1013 async fn test_put_same_chunk_twice_hits_cache() {
1014 let (protocol, _temp) = create_test_protocol().await;
1015
1016 let content = b"duplicate cache test";
1017 let address = LmdbStorage::compute_address(content);
1018
1019 protocol.payment_verifier().cache_insert(address);
1021
1022 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
1024 let put_msg = ChunkMessage {
1025 request_id: 110,
1026 body: ChunkMessageBody::PutRequest(put_request),
1027 };
1028 let put_bytes = put_msg.encode().expect("encode put");
1029 let _ = protocol
1030 .try_handle_request(&put_bytes)
1031 .await
1032 .expect("handle put 1");
1033
1034 let response_bytes = protocol
1036 .try_handle_request(&put_bytes)
1037 .await
1038 .expect("handle put 2")
1039 .expect("expected response");
1040 let response = ChunkMessage::decode(&response_bytes).expect("decode");
1041
1042 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
1043 {
1044 } else {
1046 panic!("expected AlreadyExists, got: {response:?}");
1047 }
1048 }
1049
1050 #[tokio::test]
1051 async fn test_payment_cache_stats_returns_correct_values() {
1052 let (protocol, _temp) = create_test_protocol().await;
1053
1054 let stats = protocol.payment_cache_stats();
1055 assert_eq!(stats.hits, 0);
1056 assert_eq!(stats.misses, 0);
1057 assert_eq!(stats.additions, 0);
1058
1059 let content = b"stats test";
1061 let address = LmdbStorage::compute_address(content);
1062 protocol.payment_verifier().cache_insert(address);
1063
1064 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
1065 let put_msg = ChunkMessage {
1066 request_id: 120,
1067 body: ChunkMessageBody::PutRequest(put_request),
1068 };
1069 let put_bytes = put_msg.encode().expect("encode put");
1070 let _ = protocol
1071 .try_handle_request(&put_bytes)
1072 .await
1073 .expect("handle put");
1074
1075 let stats = protocol.payment_cache_stats();
1076 assert_eq!(stats.additions, 1);
1078 assert_eq!(stats.hits, 1);
1079 }
1080
1081 #[tokio::test]
1082 async fn test_storage_stats() {
1083 let (protocol, _temp) = create_test_protocol().await;
1084 let stats = protocol.storage_stats();
1085 assert_eq!(stats.chunks_stored, 0);
1086 }
1087
1088 #[tokio::test]
1089 async fn test_merkle_candidate_quote_request() {
1090 use ant_protocol::payment::verify::verify_merkle_candidate_signature;
1091 use evmlib::merkle_payments::MerklePaymentCandidateNode;
1092
1093 let (protocol, _temp) = create_test_protocol().await;
1095
1096 let address = [0x77; 32];
1097 let timestamp = std::time::SystemTime::now()
1098 .duration_since(std::time::UNIX_EPOCH)
1099 .expect("system time")
1100 .as_secs();
1101
1102 let request = MerkleCandidateQuoteRequest {
1103 address,
1104 data_type: DATA_TYPE_CHUNK,
1105 data_size: 4096,
1106 merkle_payment_timestamp: timestamp,
1107 };
1108 let msg = ChunkMessage {
1109 request_id: 600,
1110 body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
1111 };
1112 let msg_bytes = msg.encode().expect("encode request");
1113
1114 let response_bytes = protocol
1115 .try_handle_request(&msg_bytes)
1116 .await
1117 .expect("handle merkle candidate quote")
1118 .expect("expected response");
1119 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
1120
1121 assert_eq!(response.request_id, 600);
1122 match response.body {
1123 ChunkMessageBody::MerkleCandidateQuoteResponse(
1124 MerkleCandidateQuoteResponse::Success { candidate_node },
1125 ) => {
1126 let candidate: MerklePaymentCandidateNode =
1127 rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
1128
1129 assert!(
1131 verify_merkle_candidate_signature(&candidate),
1132 "ML-DSA-65 candidate signature must be valid"
1133 );
1134
1135 assert_eq!(candidate.merkle_payment_timestamp, timestamp);
1136 assert!(candidate.price >= evmlib::common::Amount::ZERO);
1138 }
1139 other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
1140 }
1141 }
1142
1143 #[tokio::test]
1144 async fn test_handle_unexpected_response_message() {
1145 let (protocol, _temp) = create_test_protocol().await;
1146
1147 let msg = ChunkMessage {
1149 request_id: 200,
1150 body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
1151 };
1152 let msg_bytes = msg.encode().expect("encode");
1153
1154 let result = protocol
1155 .try_handle_request(&msg_bytes)
1156 .await
1157 .expect("handle msg");
1158
1159 assert!(
1160 result.is_none(),
1161 "expected None for response message, got: {result:?}"
1162 );
1163 }
1164
1165 #[tokio::test]
1166 async fn test_quote_already_stored_flag() {
1167 let (protocol, _temp) = create_test_protocol().await;
1168
1169 let content = b"already stored quote test";
1170 let address = LmdbStorage::compute_address(content);
1171
1172 protocol.payment_verifier().cache_insert(address);
1174 let put_request = ChunkPutRequest::new(address, Bytes::copy_from_slice(content));
1175 let put_msg = ChunkMessage {
1176 request_id: 300,
1177 body: ChunkMessageBody::PutRequest(put_request),
1178 };
1179 let put_bytes = put_msg.encode().expect("encode put");
1180 let _ = protocol
1181 .try_handle_request(&put_bytes)
1182 .await
1183 .expect("handle put");
1184
1185 let quote_request = ChunkQuoteRequest {
1187 address,
1188 data_size: content.len() as u64,
1189 data_type: DATA_TYPE_CHUNK,
1190 };
1191 let quote_msg = ChunkMessage {
1192 request_id: 301,
1193 body: ChunkMessageBody::QuoteRequest(quote_request),
1194 };
1195 let quote_bytes = quote_msg.encode().expect("encode quote");
1196 let response_bytes = protocol
1197 .try_handle_request("e_bytes)
1198 .await
1199 .expect("handle quote")
1200 .expect("expected response");
1201 let response = ChunkMessage::decode(&response_bytes).expect("decode");
1202
1203 match response.body {
1204 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1205 already_stored, ..
1206 }) => {
1207 assert!(
1208 already_stored,
1209 "already_stored should be true for existing chunk"
1210 );
1211 }
1212 other => panic!("expected Success with already_stored, got: {other:?}"),
1213 }
1214
1215 let new_address = [0xFFu8; 32];
1217 let quote_request2 = ChunkQuoteRequest {
1218 address: new_address,
1219 data_size: 100,
1220 data_type: DATA_TYPE_CHUNK,
1221 };
1222 let quote_msg2 = ChunkMessage {
1223 request_id: 302,
1224 body: ChunkMessageBody::QuoteRequest(quote_request2),
1225 };
1226 let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
1227 let response_bytes2 = protocol
1228 .try_handle_request("e_bytes2)
1229 .await
1230 .expect("handle quote2")
1231 .expect("expected response");
1232 let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
1233
1234 match response2.body {
1235 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
1236 already_stored, ..
1237 }) => {
1238 assert!(
1239 !already_stored,
1240 "already_stored should be false for new chunk"
1241 );
1242 }
1243 other => panic!("expected Success with already_stored=false, got: {other:?}"),
1244 }
1245 }
1246
1247 fn priced_records_after_quote(protocol: &AntProtocol) -> usize {
1251 let quote_request = ChunkQuoteRequest {
1252 address: [0xAAu8; 32], data_size: 100,
1254 data_type: DATA_TYPE_CHUNK,
1255 };
1256 let _ = protocol.handle_quote("e_request);
1257 protocol.priced_records_stored()
1258 }
1259
1260 #[tokio::test]
1264 async fn test_quote_metric_reflects_deletions() {
1265 let (protocol, _temp) = create_test_protocol().await;
1266
1267 let contents: Vec<Vec<u8>> = (0u8..5).map(|i| vec![i; 64]).collect();
1269 let mut addresses = Vec::new();
1270 for content in &contents {
1271 let addr = LmdbStorage::compute_address(content);
1272 protocol.put_local(&addr, content).await.expect("put_local");
1273 addresses.push(addr);
1274 }
1275
1276 assert_eq!(priced_records_after_quote(&protocol), 5);
1278
1279 for addr in addresses.iter().take(2) {
1281 assert!(protocol.storage().delete(addr).await.expect("delete"));
1282 }
1283 assert_eq!(priced_records_after_quote(&protocol), 3);
1284
1285 for addr in addresses.iter().skip(2) {
1287 assert!(protocol.storage().delete(addr).await.expect("delete"));
1288 }
1289 assert_eq!(priced_records_after_quote(&protocol), 0);
1290 }
1291
1292 #[tokio::test]
1299 async fn test_quote_price_drops_after_deletion() {
1300 use crate::payment::pricing::calculate_price;
1301
1302 let (protocol, _temp) = create_test_protocol().await;
1303 let contents: Vec<Vec<u8>> = (0u8..10).map(|i| vec![i; 64]).collect();
1304 let mut addresses = Vec::new();
1305 for content in &contents {
1306 let addr = LmdbStorage::compute_address(content);
1307 protocol.put_local(&addr, content).await.expect("put_local");
1308 addresses.push(addr);
1309 }
1310
1311 assert_eq!(priced_records_after_quote(&protocol), 10);
1315 let price_full = calculate_price(10);
1316
1317 for addr in addresses.iter().take(8) {
1319 assert!(protocol.storage().delete(addr).await.expect("delete"));
1320 }
1321 assert_eq!(priced_records_after_quote(&protocol), 2);
1325 let price_after = calculate_price(2);
1326 assert!(
1327 price_after < price_full,
1328 "deleting data must lower the observable quote price \
1329 (full={price_full:?}, after={price_after:?})"
1330 );
1331 }
1332}