1use crate::ant_protocol::{
30 ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
31 ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID,
32 DATA_TYPE_CHUNK, MAX_CHUNK_SIZE,
33};
34use crate::client::compute_address;
35use crate::error::{Error, Result};
36use crate::payment::{PaymentVerifier, QuoteGenerator};
37use crate::storage::lmdb::LmdbStorage;
38use bytes::Bytes;
39use std::sync::Arc;
40use tracing::{debug, info, warn};
41
42pub struct AntProtocol {
47 storage: Arc<LmdbStorage>,
49 payment_verifier: Arc<PaymentVerifier>,
51 quote_generator: Arc<QuoteGenerator>,
53}
54
55impl AntProtocol {
56 #[must_use]
64 pub fn new(
65 storage: Arc<LmdbStorage>,
66 payment_verifier: Arc<PaymentVerifier>,
67 quote_generator: Arc<QuoteGenerator>,
68 ) -> Self {
69 Self {
70 storage,
71 payment_verifier,
72 quote_generator,
73 }
74 }
75
76 #[must_use]
78 pub fn protocol_id(&self) -> &'static str {
79 CHUNK_PROTOCOL_ID
80 }
81
82 pub async fn handle_message(&self, data: &[u8]) -> Result<Bytes> {
96 let message = ChunkMessage::decode(data)
97 .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
98
99 let request_id = message.request_id;
100
101 let response_body = match message.body {
102 ChunkMessageBody::PutRequest(req) => {
103 ChunkMessageBody::PutResponse(self.handle_put(req).await)
104 }
105 ChunkMessageBody::GetRequest(req) => {
106 ChunkMessageBody::GetResponse(self.handle_get(req).await)
107 }
108 ChunkMessageBody::QuoteRequest(ref req) => {
109 ChunkMessageBody::QuoteResponse(self.handle_quote(req))
110 }
111 ChunkMessageBody::PutResponse(_)
113 | ChunkMessageBody::GetResponse(_)
114 | ChunkMessageBody::QuoteResponse(_) => {
115 let error = ProtocolError::Internal("Unexpected response message".to_string());
116 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(error))
117 }
118 };
119
120 let response = ChunkMessage {
121 request_id,
122 body: response_body,
123 };
124
125 response
126 .encode()
127 .map(Bytes::from)
128 .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
129 }
130
131 async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
133 let address = request.address;
134 let addr_hex = hex::encode(address);
135 debug!("Handling PUT request for {addr_hex}");
136
137 if request.content.len() > MAX_CHUNK_SIZE {
139 return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
140 size: request.content.len(),
141 max_size: MAX_CHUNK_SIZE,
142 });
143 }
144
145 let computed = compute_address(&request.content);
147 if computed != address {
148 return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
149 expected: address,
150 actual: computed,
151 });
152 }
153
154 match self.storage.exists(&address) {
156 Ok(true) => {
157 debug!("Chunk {addr_hex} already exists");
158 return ChunkPutResponse::AlreadyExists { address };
159 }
160 Err(e) => {
161 return ChunkPutResponse::Error(ProtocolError::Internal(format!(
162 "Storage read failed: {e}"
163 )));
164 }
165 Ok(false) => {}
166 }
167
168 let payment_result = self
170 .payment_verifier
171 .verify_payment(&address, request.payment_proof.as_deref())
172 .await;
173
174 match payment_result {
175 Ok(status) if status.can_store() => {
176 }
178 Ok(_) => {
179 return ChunkPutResponse::PaymentRequired {
180 message: "Payment required for new chunk".to_string(),
181 };
182 }
183 Err(e) => {
184 return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
185 }
186 }
187
188 match self.storage.put(&address, &request.content).await {
190 Ok(_) => {
191 let content_len = request.content.len();
192 info!("Stored chunk {addr_hex} ({content_len} bytes)");
193 self.quote_generator.record_store(DATA_TYPE_CHUNK);
195 self.quote_generator.record_payment();
196 ChunkPutResponse::Success { address }
197 }
198 Err(e) => {
199 warn!("Failed to store chunk {addr_hex}: {e}");
200 ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
201 }
202 }
203 }
204
205 async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
207 let address = request.address;
208 let addr_hex = hex::encode(address);
209 debug!("Handling GET request for {addr_hex}");
210
211 match self.storage.get(&address).await {
212 Ok(Some(content)) => {
213 let content_len = content.len();
214 debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
215 ChunkGetResponse::Success { address, content }
216 }
217 Ok(None) => {
218 debug!("Chunk {addr_hex} not found");
219 ChunkGetResponse::NotFound { address }
220 }
221 Err(e) => {
222 warn!("Failed to retrieve chunk {addr_hex}: {e}");
223 ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
224 }
225 }
226 }
227
228 fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
230 let addr_hex = hex::encode(request.address);
231 let data_size = request.data_size;
232 debug!("Handling quote request for {addr_hex} (size: {data_size})");
233
234 let already_stored = match self.storage.exists(&request.address) {
237 Ok(exists) => exists,
238 Err(e) => {
239 warn!("Storage check failed for {addr_hex}: {e}");
240 false }
242 };
243
244 if already_stored {
245 debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
246 }
247
248 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
250 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
251 size: MAX_CHUNK_SIZE + 1,
252 max_size: MAX_CHUNK_SIZE,
253 });
254 };
255 if data_size_usize > MAX_CHUNK_SIZE {
256 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
257 size: data_size_usize,
258 max_size: MAX_CHUNK_SIZE,
259 });
260 }
261
262 match self
263 .quote_generator
264 .create_quote(request.address, data_size_usize, request.data_type)
265 {
266 Ok(quote) => {
267 match rmp_serde::to_vec("e) {
269 Ok(quote_bytes) => ChunkQuoteResponse::Success {
270 quote: quote_bytes,
271 already_stored,
272 },
273 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
274 "Failed to serialize quote: {e}"
275 ))),
276 }
277 }
278 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
279 }
280 }
281
282 #[must_use]
284 pub fn storage_stats(&self) -> crate::storage::StorageStats {
285 self.storage.stats()
286 }
287
288 #[must_use]
290 pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
291 self.payment_verifier.cache_stats()
292 }
293
294 #[cfg(any(test, feature = "test-utils"))]
300 #[must_use]
301 pub fn payment_verifier(&self) -> &PaymentVerifier {
302 &self.payment_verifier
303 }
304
305 pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
311 self.storage.exists(address)
312 }
313
314 pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
320 self.storage.get(address).await
321 }
322
323 #[cfg(test)]
331 pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
332 self.storage.put(address, content).await
333 }
334}
335
336#[cfg(test)]
337#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
338mod tests {
339 use super::*;
340 use crate::payment::metrics::QuotingMetricsTracker;
341 use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
342 use crate::storage::LmdbStorageConfig;
343 use ant_evm::RewardsAddress;
344 use saorsa_core::identity::NodeIdentity;
345 use saorsa_core::MlDsa65;
346 use saorsa_pqc::pqc::types::MlDsaSecretKey;
347 use tempfile::TempDir;
348
349 async fn create_test_protocol() -> (AntProtocol, TempDir) {
350 let temp_dir = TempDir::new().expect("create temp dir");
351
352 let storage_config = LmdbStorageConfig {
353 root_dir: temp_dir.path().to_path_buf(),
354 verify_on_read: true,
355 max_chunks: 0,
356 max_map_size: 0,
357 };
358 let storage = Arc::new(
359 LmdbStorage::new(storage_config)
360 .await
361 .expect("create storage"),
362 );
363
364 let rewards_address = RewardsAddress::new([1u8; 20]);
365 let payment_config = PaymentVerifierConfig {
366 evm: EvmVerifierConfig::default(),
367 cache_capacity: 100_000,
368 local_rewards_address: rewards_address,
369 };
370 let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
371 let metrics_tracker = QuotingMetricsTracker::new(1000, 100);
372 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
373
374 let identity = NodeIdentity::generate().expect("generate identity");
376 let pub_key_bytes = identity.public_key().as_bytes().to_vec();
377 let sk_bytes = identity.secret_key_bytes().to_vec();
378 let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
379 quote_generator.set_signer(pub_key_bytes, move |msg| {
380 use saorsa_pqc::pqc::MlDsaOperations;
381 let ml_dsa = MlDsa65::new();
382 ml_dsa
383 .sign(&sk, msg)
384 .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
385 });
386
387 let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
388 (protocol, temp_dir)
389 }
390
391 #[tokio::test]
392 async fn test_put_and_get_chunk() {
393 let (protocol, _temp) = create_test_protocol().await;
394
395 let content = b"hello world";
396 let address = LmdbStorage::compute_address(content);
397
398 protocol.payment_verifier().cache_insert(address);
400
401 let put_request = ChunkPutRequest::new(address, content.to_vec());
402 let put_msg = ChunkMessage {
403 request_id: 1,
404 body: ChunkMessageBody::PutRequest(put_request),
405 };
406 let put_bytes = put_msg.encode().expect("encode put");
407
408 let response_bytes = protocol
410 .handle_message(&put_bytes)
411 .await
412 .expect("handle put");
413 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
414
415 assert_eq!(response.request_id, 1);
416 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
417 response.body
418 {
419 assert_eq!(addr, address);
420 } else {
421 panic!("expected PutResponse::Success, got: {response:?}");
422 }
423
424 let get_request = ChunkGetRequest::new(address);
426 let get_msg = ChunkMessage {
427 request_id: 2,
428 body: ChunkMessageBody::GetRequest(get_request),
429 };
430 let get_bytes = get_msg.encode().expect("encode get");
431
432 let response_bytes = protocol
434 .handle_message(&get_bytes)
435 .await
436 .expect("handle get");
437 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
438
439 assert_eq!(response.request_id, 2);
440 if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
441 address: addr,
442 content: data,
443 }) = response.body
444 {
445 assert_eq!(addr, address);
446 assert_eq!(data, content.to_vec());
447 } else {
448 panic!("expected GetResponse::Success");
449 }
450 }
451
452 #[tokio::test]
453 async fn test_get_not_found() {
454 let (protocol, _temp) = create_test_protocol().await;
455
456 let address = [0xAB; 32];
457 let get_request = ChunkGetRequest::new(address);
458 let get_msg = ChunkMessage {
459 request_id: 10,
460 body: ChunkMessageBody::GetRequest(get_request),
461 };
462 let get_bytes = get_msg.encode().expect("encode get");
463
464 let response_bytes = protocol
465 .handle_message(&get_bytes)
466 .await
467 .expect("handle get");
468 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
469
470 assert_eq!(response.request_id, 10);
471 if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
472 response.body
473 {
474 assert_eq!(addr, address);
475 } else {
476 panic!("expected GetResponse::NotFound");
477 }
478 }
479
480 #[tokio::test]
481 async fn test_put_address_mismatch() {
482 let (protocol, _temp) = create_test_protocol().await;
483
484 let content = b"test content";
485 let wrong_address = [0xFF; 32]; protocol.payment_verifier().cache_insert(wrong_address);
489
490 let put_request = ChunkPutRequest::new(wrong_address, content.to_vec());
491 let put_msg = ChunkMessage {
492 request_id: 20,
493 body: ChunkMessageBody::PutRequest(put_request),
494 };
495 let put_bytes = put_msg.encode().expect("encode put");
496
497 let response_bytes = protocol
498 .handle_message(&put_bytes)
499 .await
500 .expect("handle put");
501 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
502
503 assert_eq!(response.request_id, 20);
504 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
505 ProtocolError::AddressMismatch { .. },
506 )) = response.body
507 {
508 } else {
510 panic!("expected AddressMismatch error, got: {response:?}");
511 }
512 }
513
514 #[tokio::test]
515 async fn test_put_chunk_too_large() {
516 let (protocol, _temp) = create_test_protocol().await;
517
518 let content = vec![0u8; MAX_CHUNK_SIZE + 1];
520 let address = LmdbStorage::compute_address(&content);
521
522 let put_request = ChunkPutRequest::new(address, content);
523 let put_msg = ChunkMessage {
524 request_id: 30,
525 body: ChunkMessageBody::PutRequest(put_request),
526 };
527 let put_bytes = put_msg.encode().expect("encode put");
528
529 let response_bytes = protocol
530 .handle_message(&put_bytes)
531 .await
532 .expect("handle put");
533 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
534
535 assert_eq!(response.request_id, 30);
536 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
537 ProtocolError::ChunkTooLarge { .. },
538 )) = response.body
539 {
540 } else {
542 panic!("expected ChunkTooLarge error");
543 }
544 }
545
546 #[tokio::test]
547 async fn test_put_already_exists() {
548 let (protocol, _temp) = create_test_protocol().await;
549
550 let content = b"duplicate content";
551 let address = LmdbStorage::compute_address(content);
552
553 protocol.payment_verifier().cache_insert(address);
555
556 let put_request = ChunkPutRequest::new(address, content.to_vec());
557 let put_msg = ChunkMessage {
558 request_id: 40,
559 body: ChunkMessageBody::PutRequest(put_request),
560 };
561 let put_bytes = put_msg.encode().expect("encode put");
562
563 let _ = protocol
564 .handle_message(&put_bytes)
565 .await
566 .expect("handle put");
567
568 let response_bytes = protocol
570 .handle_message(&put_bytes)
571 .await
572 .expect("handle put 2");
573 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
574
575 assert_eq!(response.request_id, 40);
576 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
577 response.body
578 {
579 assert_eq!(addr, address);
580 } else {
581 panic!("expected AlreadyExists");
582 }
583 }
584
585 #[tokio::test]
586 async fn test_protocol_id() {
587 let (protocol, _temp) = create_test_protocol().await;
588 assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
589 }
590
591 #[tokio::test]
592 async fn test_exists_and_local_access() {
593 let (protocol, _temp) = create_test_protocol().await;
594
595 let content = b"local access test";
596 let address = LmdbStorage::compute_address(content);
597
598 assert!(!protocol.exists(&address).expect("exists check"));
599
600 protocol
601 .put_local(&address, content)
602 .await
603 .expect("put local");
604
605 assert!(protocol.exists(&address).expect("exists check"));
606
607 let retrieved = protocol.get_local(&address).await.expect("get local");
608 assert_eq!(retrieved, Some(content.to_vec()));
609 }
610
611 #[tokio::test]
612 async fn test_cache_insert_is_visible() {
613 let (protocol, _temp) = create_test_protocol().await;
614
615 let content = b"cache test content";
616 let address = LmdbStorage::compute_address(content);
617
618 let stats_before = protocol.payment_cache_stats();
620 assert_eq!(stats_before.additions, 0);
621
622 protocol.payment_verifier().cache_insert(address);
624
625 let stats_after = protocol.payment_cache_stats();
627 assert_eq!(stats_after.additions, 1);
628
629 let put_request = ChunkPutRequest::new(address, content.to_vec());
631 let put_msg = ChunkMessage {
632 request_id: 100,
633 body: ChunkMessageBody::PutRequest(put_request),
634 };
635 let put_bytes = put_msg.encode().expect("encode put");
636 let response_bytes = protocol
637 .handle_message(&put_bytes)
638 .await
639 .expect("handle put");
640 let response = ChunkMessage::decode(&response_bytes).expect("decode");
641
642 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
643 } else {
645 panic!("expected success, got: {response:?}");
646 }
647 }
648
649 #[tokio::test]
650 async fn test_put_same_chunk_twice_hits_cache() {
651 let (protocol, _temp) = create_test_protocol().await;
652
653 let content = b"duplicate cache test";
654 let address = LmdbStorage::compute_address(content);
655
656 protocol.payment_verifier().cache_insert(address);
658
659 let put_request = ChunkPutRequest::new(address, content.to_vec());
661 let put_msg = ChunkMessage {
662 request_id: 110,
663 body: ChunkMessageBody::PutRequest(put_request),
664 };
665 let put_bytes = put_msg.encode().expect("encode put");
666 let _ = protocol
667 .handle_message(&put_bytes)
668 .await
669 .expect("handle put 1");
670
671 let response_bytes = protocol
673 .handle_message(&put_bytes)
674 .await
675 .expect("handle put 2");
676 let response = ChunkMessage::decode(&response_bytes).expect("decode");
677
678 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
679 {
680 } else {
682 panic!("expected AlreadyExists, got: {response:?}");
683 }
684 }
685
686 #[tokio::test]
687 async fn test_payment_cache_stats_returns_correct_values() {
688 let (protocol, _temp) = create_test_protocol().await;
689
690 let stats = protocol.payment_cache_stats();
691 assert_eq!(stats.hits, 0);
692 assert_eq!(stats.misses, 0);
693 assert_eq!(stats.additions, 0);
694
695 let content = b"stats test";
697 let address = LmdbStorage::compute_address(content);
698 protocol.payment_verifier().cache_insert(address);
699
700 let put_request = ChunkPutRequest::new(address, content.to_vec());
701 let put_msg = ChunkMessage {
702 request_id: 120,
703 body: ChunkMessageBody::PutRequest(put_request),
704 };
705 let put_bytes = put_msg.encode().expect("encode put");
706 let _ = protocol
707 .handle_message(&put_bytes)
708 .await
709 .expect("handle put");
710
711 let stats = protocol.payment_cache_stats();
712 assert_eq!(stats.additions, 1);
714 assert_eq!(stats.hits, 1);
715 }
716
717 #[tokio::test]
718 async fn test_storage_stats() {
719 let (protocol, _temp) = create_test_protocol().await;
720 let stats = protocol.storage_stats();
721 assert_eq!(stats.chunks_stored, 0);
722 }
723
724 #[tokio::test]
725 async fn test_handle_unexpected_response_message() {
726 let (protocol, _temp) = create_test_protocol().await;
727
728 let msg = ChunkMessage {
730 request_id: 200,
731 body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
732 };
733 let msg_bytes = msg.encode().expect("encode");
734
735 let response_bytes = protocol
736 .handle_message(&msg_bytes)
737 .await
738 .expect("handle msg");
739 let response = ChunkMessage::decode(&response_bytes).expect("decode");
740
741 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(ProtocolError::Internal(
742 msg,
743 ))) = response.body
744 {
745 assert!(msg.contains("Unexpected"));
746 } else {
747 panic!("expected Internal error, got: {response:?}");
748 }
749 }
750
751 #[tokio::test]
752 async fn test_quote_already_stored_flag() {
753 let (protocol, _temp) = create_test_protocol().await;
754
755 let content = b"already stored quote test";
756 let address = LmdbStorage::compute_address(content);
757
758 protocol.payment_verifier().cache_insert(address);
760 let put_request = ChunkPutRequest::new(address, content.to_vec());
761 let put_msg = ChunkMessage {
762 request_id: 300,
763 body: ChunkMessageBody::PutRequest(put_request),
764 };
765 let put_bytes = put_msg.encode().expect("encode put");
766 let _ = protocol
767 .handle_message(&put_bytes)
768 .await
769 .expect("handle put");
770
771 let quote_request = ChunkQuoteRequest {
773 address,
774 data_size: content.len() as u64,
775 data_type: DATA_TYPE_CHUNK,
776 };
777 let quote_msg = ChunkMessage {
778 request_id: 301,
779 body: ChunkMessageBody::QuoteRequest(quote_request),
780 };
781 let quote_bytes = quote_msg.encode().expect("encode quote");
782 let response_bytes = protocol
783 .handle_message("e_bytes)
784 .await
785 .expect("handle quote");
786 let response = ChunkMessage::decode(&response_bytes).expect("decode");
787
788 match response.body {
789 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
790 already_stored, ..
791 }) => {
792 assert!(
793 already_stored,
794 "already_stored should be true for existing chunk"
795 );
796 }
797 other => panic!("expected Success with already_stored, got: {other:?}"),
798 }
799
800 let new_address = [0xFFu8; 32];
802 let quote_request2 = ChunkQuoteRequest {
803 address: new_address,
804 data_size: 100,
805 data_type: DATA_TYPE_CHUNK,
806 };
807 let quote_msg2 = ChunkMessage {
808 request_id: 302,
809 body: ChunkMessageBody::QuoteRequest(quote_request2),
810 };
811 let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
812 let response_bytes2 = protocol
813 .handle_message("e_bytes2)
814 .await
815 .expect("handle quote2");
816 let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
817
818 match response2.body {
819 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
820 already_stored, ..
821 }) => {
822 assert!(
823 !already_stored,
824 "already_stored should be false for new chunk"
825 );
826 }
827 other => panic!("expected Success with already_stored=false, got: {other:?}"),
828 }
829 }
830}