1use crate::ant_protocol::{
31 ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
32 ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
33 MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK,
34 MAX_CHUNK_SIZE,
35};
36use crate::client::compute_address;
37use crate::error::{Error, Result};
38use crate::payment::{PaymentVerifier, QuoteGenerator};
39use crate::storage::lmdb::LmdbStorage;
40use bytes::Bytes;
41use std::sync::Arc;
42use tracing::{debug, info, warn};
43
44pub struct AntProtocol {
49 storage: Arc<LmdbStorage>,
51 payment_verifier: Arc<PaymentVerifier>,
53 quote_generator: Arc<QuoteGenerator>,
56}
57
58impl AntProtocol {
59 #[must_use]
67 pub fn new(
68 storage: Arc<LmdbStorage>,
69 payment_verifier: Arc<PaymentVerifier>,
70 quote_generator: Arc<QuoteGenerator>,
71 ) -> Self {
72 Self {
73 storage,
74 payment_verifier,
75 quote_generator,
76 }
77 }
78
79 #[must_use]
81 pub fn protocol_id(&self) -> &'static str {
82 CHUNK_PROTOCOL_ID
83 }
84
85 pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
96 let message = ChunkMessage::decode(data)
97 .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
98
99 let request_id = message.request_id;
100
101 let response_body = match message.body {
102 ChunkMessageBody::PutRequest(req) => {
103 ChunkMessageBody::PutResponse(self.handle_put(req).await)
104 }
105 ChunkMessageBody::GetRequest(req) => {
106 ChunkMessageBody::GetResponse(self.handle_get(req).await)
107 }
108 ChunkMessageBody::QuoteRequest(ref req) => {
109 ChunkMessageBody::QuoteResponse(self.handle_quote(req))
110 }
111 ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
112 ChunkMessageBody::MerkleCandidateQuoteResponse(
113 self.handle_merkle_candidate_quote(req),
114 )
115 }
116 ChunkMessageBody::PutResponse(_)
121 | ChunkMessageBody::GetResponse(_)
122 | ChunkMessageBody::QuoteResponse(_)
123 | ChunkMessageBody::MerkleCandidateQuoteResponse(_) => return Ok(None),
124 };
125
126 let response = ChunkMessage {
127 request_id,
128 body: response_body,
129 };
130
131 response
132 .encode()
133 .map(|b| Some(Bytes::from(b)))
134 .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
135 }
136
137 async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
139 let address = request.address;
140 let addr_hex = hex::encode(address);
141 debug!("Handling PUT request for {addr_hex}");
142
143 if request.content.len() > MAX_CHUNK_SIZE {
145 return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
146 size: request.content.len(),
147 max_size: MAX_CHUNK_SIZE,
148 });
149 }
150
151 let computed = compute_address(&request.content);
153 if computed != address {
154 return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
155 expected: address,
156 actual: computed,
157 });
158 }
159
160 match self.storage.exists(&address) {
162 Ok(true) => {
163 debug!("Chunk {addr_hex} already exists");
164 return ChunkPutResponse::AlreadyExists { address };
165 }
166 Err(e) => {
167 return ChunkPutResponse::Error(ProtocolError::Internal(format!(
168 "Storage read failed: {e}"
169 )));
170 }
171 Ok(false) => {}
172 }
173
174 let payment_result = self
176 .payment_verifier
177 .verify_payment(&address, request.payment_proof.as_deref())
178 .await;
179
180 match payment_result {
181 Ok(status) if status.can_store() => {
182 }
184 Ok(_) => {
185 return ChunkPutResponse::PaymentRequired {
186 message: "Payment required for new chunk".to_string(),
187 };
188 }
189 Err(e) => {
190 return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
191 }
192 }
193
194 match self.storage.put(&address, &request.content).await {
196 Ok(_) => {
197 let content_len = request.content.len();
198 info!("Stored chunk {addr_hex} ({content_len} bytes)");
199 self.quote_generator.record_store(DATA_TYPE_CHUNK);
201 self.quote_generator.record_payment();
202 ChunkPutResponse::Success { address }
203 }
204 Err(e) => {
205 warn!("Failed to store chunk {addr_hex}: {e}");
206 ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
207 }
208 }
209 }
210
211 async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
213 let address = request.address;
214 let addr_hex = hex::encode(address);
215 debug!("Handling GET request for {addr_hex}");
216
217 match self.storage.get(&address).await {
218 Ok(Some(content)) => {
219 let content_len = content.len();
220 debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
221 ChunkGetResponse::Success { address, content }
222 }
223 Ok(None) => {
224 debug!("Chunk {addr_hex} not found");
225 ChunkGetResponse::NotFound { address }
226 }
227 Err(e) => {
228 warn!("Failed to retrieve chunk {addr_hex}: {e}");
229 ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
230 }
231 }
232 }
233
234 fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
236 let addr_hex = hex::encode(request.address);
237 let data_size = request.data_size;
238 debug!("Handling quote request for {addr_hex} (size: {data_size})");
239
240 let already_stored = match self.storage.exists(&request.address) {
243 Ok(exists) => exists,
244 Err(e) => {
245 warn!("Storage check failed for {addr_hex}: {e}");
246 false }
248 };
249
250 if already_stored {
251 debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
252 }
253
254 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
256 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
257 size: MAX_CHUNK_SIZE + 1,
258 max_size: MAX_CHUNK_SIZE,
259 });
260 };
261 if data_size_usize > MAX_CHUNK_SIZE {
262 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
263 size: data_size_usize,
264 max_size: MAX_CHUNK_SIZE,
265 });
266 }
267
268 match self
269 .quote_generator
270 .create_quote(request.address, data_size_usize, request.data_type)
271 {
272 Ok(quote) => {
273 match rmp_serde::to_vec("e) {
275 Ok(quote_bytes) => ChunkQuoteResponse::Success {
276 quote: quote_bytes,
277 already_stored,
278 },
279 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
280 "Failed to serialize quote: {e}"
281 ))),
282 }
283 }
284 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
285 }
286 }
287
288 fn handle_merkle_candidate_quote(
290 &self,
291 request: &MerkleCandidateQuoteRequest,
292 ) -> MerkleCandidateQuoteResponse {
293 let addr_hex = hex::encode(request.address);
294 let data_size = request.data_size;
295 debug!(
296 "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
297 request.merkle_payment_timestamp
298 );
299
300 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
301 return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
302 "data_size {} overflows usize",
303 request.data_size
304 )));
305 };
306 if data_size_usize > MAX_CHUNK_SIZE {
307 return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
308 size: data_size_usize,
309 max_size: MAX_CHUNK_SIZE,
310 });
311 }
312
313 match self.quote_generator.create_merkle_candidate_quote(
314 data_size_usize,
315 request.data_type,
316 request.merkle_payment_timestamp,
317 ) {
318 Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
319 Ok(bytes) => MerkleCandidateQuoteResponse::Success {
320 candidate_node: bytes,
321 },
322 Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
323 "Failed to serialize merkle candidate node: {e}"
324 ))),
325 },
326 Err(e) => {
327 MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
328 }
329 }
330 }
331
332 #[must_use]
334 pub fn storage_stats(&self) -> crate::storage::StorageStats {
335 self.storage.stats()
336 }
337
338 #[must_use]
340 pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
341 self.payment_verifier.cache_stats()
342 }
343
344 #[cfg(any(test, feature = "test-utils"))]
350 #[must_use]
351 pub fn payment_verifier(&self) -> &PaymentVerifier {
352 &self.payment_verifier
353 }
354
355 pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
361 self.storage.exists(address)
362 }
363
364 pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
370 self.storage.get(address).await
371 }
372
373 #[cfg(test)]
381 pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
382 self.storage.put(address, content).await
383 }
384}
385
386#[cfg(test)]
387#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
388mod tests {
389 use super::*;
390 use crate::payment::metrics::QuotingMetricsTracker;
391 use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
392 use crate::storage::LmdbStorageConfig;
393 use evmlib::RewardsAddress;
394 use saorsa_core::identity::NodeIdentity;
395 use saorsa_core::MlDsa65;
396 use saorsa_pqc::pqc::types::MlDsaSecretKey;
397 use tempfile::TempDir;
398
399 async fn create_test_protocol() -> (AntProtocol, TempDir) {
400 let temp_dir = TempDir::new().expect("create temp dir");
401
402 let storage_config = LmdbStorageConfig {
403 root_dir: temp_dir.path().to_path_buf(),
404 verify_on_read: true,
405 max_chunks: 0,
406 max_map_size: 0,
407 };
408 let storage = Arc::new(
409 LmdbStorage::new(storage_config)
410 .await
411 .expect("create storage"),
412 );
413
414 let rewards_address = RewardsAddress::new([1u8; 20]);
415 let payment_config = PaymentVerifierConfig {
416 evm: EvmVerifierConfig::default(),
417 cache_capacity: 100_000,
418 local_rewards_address: rewards_address,
419 };
420 let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
421 let metrics_tracker = QuotingMetricsTracker::new(1000, 100);
422 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
423
424 let identity = NodeIdentity::generate().expect("generate identity");
426 let pub_key_bytes = identity.public_key().as_bytes().to_vec();
427 let sk_bytes = identity.secret_key_bytes().to_vec();
428 let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
429 quote_generator.set_signer(pub_key_bytes, move |msg| {
430 use saorsa_pqc::pqc::MlDsaOperations;
431 let ml_dsa = MlDsa65::new();
432 ml_dsa
433 .sign(&sk, msg)
434 .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
435 });
436
437 let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
438 (protocol, temp_dir)
439 }
440
441 #[tokio::test]
442 async fn test_put_and_get_chunk() {
443 let (protocol, _temp) = create_test_protocol().await;
444
445 let content = b"hello world";
446 let address = LmdbStorage::compute_address(content);
447
448 protocol.payment_verifier().cache_insert(address);
450
451 let put_request = ChunkPutRequest::new(address, content.to_vec());
452 let put_msg = ChunkMessage {
453 request_id: 1,
454 body: ChunkMessageBody::PutRequest(put_request),
455 };
456 let put_bytes = put_msg.encode().expect("encode put");
457
458 let response_bytes = protocol
460 .try_handle_request(&put_bytes)
461 .await
462 .expect("handle put")
463 .expect("expected response");
464 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
465
466 assert_eq!(response.request_id, 1);
467 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
468 response.body
469 {
470 assert_eq!(addr, address);
471 } else {
472 panic!("expected PutResponse::Success, got: {response:?}");
473 }
474
475 let get_request = ChunkGetRequest::new(address);
477 let get_msg = ChunkMessage {
478 request_id: 2,
479 body: ChunkMessageBody::GetRequest(get_request),
480 };
481 let get_bytes = get_msg.encode().expect("encode get");
482
483 let response_bytes = protocol
485 .try_handle_request(&get_bytes)
486 .await
487 .expect("handle get")
488 .expect("expected response");
489 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
490
491 assert_eq!(response.request_id, 2);
492 if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
493 address: addr,
494 content: data,
495 }) = response.body
496 {
497 assert_eq!(addr, address);
498 assert_eq!(data, content.to_vec());
499 } else {
500 panic!("expected GetResponse::Success");
501 }
502 }
503
504 #[tokio::test]
505 async fn test_get_not_found() {
506 let (protocol, _temp) = create_test_protocol().await;
507
508 let address = [0xAB; 32];
509 let get_request = ChunkGetRequest::new(address);
510 let get_msg = ChunkMessage {
511 request_id: 10,
512 body: ChunkMessageBody::GetRequest(get_request),
513 };
514 let get_bytes = get_msg.encode().expect("encode get");
515
516 let response_bytes = protocol
517 .try_handle_request(&get_bytes)
518 .await
519 .expect("handle get")
520 .expect("expected response");
521 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
522
523 assert_eq!(response.request_id, 10);
524 if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
525 response.body
526 {
527 assert_eq!(addr, address);
528 } else {
529 panic!("expected GetResponse::NotFound");
530 }
531 }
532
533 #[tokio::test]
534 async fn test_put_address_mismatch() {
535 let (protocol, _temp) = create_test_protocol().await;
536
537 let content = b"test content";
538 let wrong_address = [0xFF; 32]; protocol.payment_verifier().cache_insert(wrong_address);
542
543 let put_request = ChunkPutRequest::new(wrong_address, content.to_vec());
544 let put_msg = ChunkMessage {
545 request_id: 20,
546 body: ChunkMessageBody::PutRequest(put_request),
547 };
548 let put_bytes = put_msg.encode().expect("encode put");
549
550 let response_bytes = protocol
551 .try_handle_request(&put_bytes)
552 .await
553 .expect("handle put")
554 .expect("expected response");
555 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
556
557 assert_eq!(response.request_id, 20);
558 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
559 ProtocolError::AddressMismatch { .. },
560 )) = response.body
561 {
562 } else {
564 panic!("expected AddressMismatch error, got: {response:?}");
565 }
566 }
567
568 #[tokio::test]
569 async fn test_put_chunk_too_large() {
570 let (protocol, _temp) = create_test_protocol().await;
571
572 let content = vec![0u8; MAX_CHUNK_SIZE + 1];
574 let address = LmdbStorage::compute_address(&content);
575
576 let put_request = ChunkPutRequest::new(address, content);
577 let put_msg = ChunkMessage {
578 request_id: 30,
579 body: ChunkMessageBody::PutRequest(put_request),
580 };
581 let put_bytes = put_msg.encode().expect("encode put");
582
583 let response_bytes = protocol
584 .try_handle_request(&put_bytes)
585 .await
586 .expect("handle put")
587 .expect("expected response");
588 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
589
590 assert_eq!(response.request_id, 30);
591 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
592 ProtocolError::ChunkTooLarge { .. },
593 )) = response.body
594 {
595 } else {
597 panic!("expected ChunkTooLarge error");
598 }
599 }
600
601 #[tokio::test]
602 async fn test_put_already_exists() {
603 let (protocol, _temp) = create_test_protocol().await;
604
605 let content = b"duplicate content";
606 let address = LmdbStorage::compute_address(content);
607
608 protocol.payment_verifier().cache_insert(address);
610
611 let put_request = ChunkPutRequest::new(address, content.to_vec());
612 let put_msg = ChunkMessage {
613 request_id: 40,
614 body: ChunkMessageBody::PutRequest(put_request),
615 };
616 let put_bytes = put_msg.encode().expect("encode put");
617
618 let _ = protocol
619 .try_handle_request(&put_bytes)
620 .await
621 .expect("handle put");
622
623 let response_bytes = protocol
625 .try_handle_request(&put_bytes)
626 .await
627 .expect("handle put 2")
628 .expect("expected response");
629 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
630
631 assert_eq!(response.request_id, 40);
632 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
633 response.body
634 {
635 assert_eq!(addr, address);
636 } else {
637 panic!("expected AlreadyExists");
638 }
639 }
640
641 #[tokio::test]
642 async fn test_protocol_id() {
643 let (protocol, _temp) = create_test_protocol().await;
644 assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
645 }
646
647 #[tokio::test]
648 async fn test_exists_and_local_access() {
649 let (protocol, _temp) = create_test_protocol().await;
650
651 let content = b"local access test";
652 let address = LmdbStorage::compute_address(content);
653
654 assert!(!protocol.exists(&address).expect("exists check"));
655
656 protocol
657 .put_local(&address, content)
658 .await
659 .expect("put local");
660
661 assert!(protocol.exists(&address).expect("exists check"));
662
663 let retrieved = protocol.get_local(&address).await.expect("get local");
664 assert_eq!(retrieved, Some(content.to_vec()));
665 }
666
667 #[tokio::test]
668 async fn test_cache_insert_is_visible() {
669 let (protocol, _temp) = create_test_protocol().await;
670
671 let content = b"cache test content";
672 let address = LmdbStorage::compute_address(content);
673
674 let stats_before = protocol.payment_cache_stats();
676 assert_eq!(stats_before.additions, 0);
677
678 protocol.payment_verifier().cache_insert(address);
680
681 let stats_after = protocol.payment_cache_stats();
683 assert_eq!(stats_after.additions, 1);
684
685 let put_request = ChunkPutRequest::new(address, content.to_vec());
687 let put_msg = ChunkMessage {
688 request_id: 100,
689 body: ChunkMessageBody::PutRequest(put_request),
690 };
691 let put_bytes = put_msg.encode().expect("encode put");
692 let response_bytes = protocol
693 .try_handle_request(&put_bytes)
694 .await
695 .expect("handle put")
696 .expect("expected response");
697 let response = ChunkMessage::decode(&response_bytes).expect("decode");
698
699 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
700 } else {
702 panic!("expected success, got: {response:?}");
703 }
704 }
705
706 #[tokio::test]
707 async fn test_put_same_chunk_twice_hits_cache() {
708 let (protocol, _temp) = create_test_protocol().await;
709
710 let content = b"duplicate cache test";
711 let address = LmdbStorage::compute_address(content);
712
713 protocol.payment_verifier().cache_insert(address);
715
716 let put_request = ChunkPutRequest::new(address, content.to_vec());
718 let put_msg = ChunkMessage {
719 request_id: 110,
720 body: ChunkMessageBody::PutRequest(put_request),
721 };
722 let put_bytes = put_msg.encode().expect("encode put");
723 let _ = protocol
724 .try_handle_request(&put_bytes)
725 .await
726 .expect("handle put 1");
727
728 let response_bytes = protocol
730 .try_handle_request(&put_bytes)
731 .await
732 .expect("handle put 2")
733 .expect("expected response");
734 let response = ChunkMessage::decode(&response_bytes).expect("decode");
735
736 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
737 {
738 } else {
740 panic!("expected AlreadyExists, got: {response:?}");
741 }
742 }
743
744 #[tokio::test]
745 async fn test_payment_cache_stats_returns_correct_values() {
746 let (protocol, _temp) = create_test_protocol().await;
747
748 let stats = protocol.payment_cache_stats();
749 assert_eq!(stats.hits, 0);
750 assert_eq!(stats.misses, 0);
751 assert_eq!(stats.additions, 0);
752
753 let content = b"stats test";
755 let address = LmdbStorage::compute_address(content);
756 protocol.payment_verifier().cache_insert(address);
757
758 let put_request = ChunkPutRequest::new(address, content.to_vec());
759 let put_msg = ChunkMessage {
760 request_id: 120,
761 body: ChunkMessageBody::PutRequest(put_request),
762 };
763 let put_bytes = put_msg.encode().expect("encode put");
764 let _ = protocol
765 .try_handle_request(&put_bytes)
766 .await
767 .expect("handle put");
768
769 let stats = protocol.payment_cache_stats();
770 assert_eq!(stats.additions, 1);
772 assert_eq!(stats.hits, 1);
773 }
774
775 #[tokio::test]
776 async fn test_storage_stats() {
777 let (protocol, _temp) = create_test_protocol().await;
778 let stats = protocol.storage_stats();
779 assert_eq!(stats.chunks_stored, 0);
780 }
781
782 #[tokio::test]
783 async fn test_merkle_candidate_quote_request() {
784 use crate::payment::quote::verify_merkle_candidate_signature;
785 use evmlib::merkle_payments::MerklePaymentCandidateNode;
786
787 let (protocol, _temp) = create_test_protocol().await;
789
790 let address = [0x77; 32];
791 let timestamp = std::time::SystemTime::now()
792 .duration_since(std::time::UNIX_EPOCH)
793 .expect("system time")
794 .as_secs();
795
796 let request = MerkleCandidateQuoteRequest {
797 address,
798 data_type: DATA_TYPE_CHUNK,
799 data_size: 4096,
800 merkle_payment_timestamp: timestamp,
801 };
802 let msg = ChunkMessage {
803 request_id: 600,
804 body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
805 };
806 let msg_bytes = msg.encode().expect("encode request");
807
808 let response_bytes = protocol
809 .try_handle_request(&msg_bytes)
810 .await
811 .expect("handle merkle candidate quote")
812 .expect("expected response");
813 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
814
815 assert_eq!(response.request_id, 600);
816 match response.body {
817 ChunkMessageBody::MerkleCandidateQuoteResponse(
818 MerkleCandidateQuoteResponse::Success { candidate_node },
819 ) => {
820 let candidate: MerklePaymentCandidateNode =
821 rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
822
823 assert!(
825 verify_merkle_candidate_signature(&candidate),
826 "ML-DSA-65 candidate signature must be valid"
827 );
828
829 assert_eq!(candidate.merkle_payment_timestamp, timestamp);
830 assert_eq!(candidate.quoting_metrics.data_size, 4096);
831 assert_eq!(candidate.quoting_metrics.data_type, DATA_TYPE_CHUNK);
832 }
833 other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
834 }
835 }
836
837 #[tokio::test]
838 async fn test_handle_unexpected_response_message() {
839 let (protocol, _temp) = create_test_protocol().await;
840
841 let msg = ChunkMessage {
843 request_id: 200,
844 body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
845 };
846 let msg_bytes = msg.encode().expect("encode");
847
848 let result = protocol
849 .try_handle_request(&msg_bytes)
850 .await
851 .expect("handle msg");
852
853 assert!(
854 result.is_none(),
855 "expected None for response message, got: {result:?}"
856 );
857 }
858
859 #[tokio::test]
860 async fn test_quote_already_stored_flag() {
861 let (protocol, _temp) = create_test_protocol().await;
862
863 let content = b"already stored quote test";
864 let address = LmdbStorage::compute_address(content);
865
866 protocol.payment_verifier().cache_insert(address);
868 let put_request = ChunkPutRequest::new(address, content.to_vec());
869 let put_msg = ChunkMessage {
870 request_id: 300,
871 body: ChunkMessageBody::PutRequest(put_request),
872 };
873 let put_bytes = put_msg.encode().expect("encode put");
874 let _ = protocol
875 .try_handle_request(&put_bytes)
876 .await
877 .expect("handle put");
878
879 let quote_request = ChunkQuoteRequest {
881 address,
882 data_size: content.len() as u64,
883 data_type: DATA_TYPE_CHUNK,
884 };
885 let quote_msg = ChunkMessage {
886 request_id: 301,
887 body: ChunkMessageBody::QuoteRequest(quote_request),
888 };
889 let quote_bytes = quote_msg.encode().expect("encode quote");
890 let response_bytes = protocol
891 .try_handle_request("e_bytes)
892 .await
893 .expect("handle quote")
894 .expect("expected response");
895 let response = ChunkMessage::decode(&response_bytes).expect("decode");
896
897 match response.body {
898 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
899 already_stored, ..
900 }) => {
901 assert!(
902 already_stored,
903 "already_stored should be true for existing chunk"
904 );
905 }
906 other => panic!("expected Success with already_stored, got: {other:?}"),
907 }
908
909 let new_address = [0xFFu8; 32];
911 let quote_request2 = ChunkQuoteRequest {
912 address: new_address,
913 data_size: 100,
914 data_type: DATA_TYPE_CHUNK,
915 };
916 let quote_msg2 = ChunkMessage {
917 request_id: 302,
918 body: ChunkMessageBody::QuoteRequest(quote_request2),
919 };
920 let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
921 let response_bytes2 = protocol
922 .try_handle_request("e_bytes2)
923 .await
924 .expect("handle quote2")
925 .expect("expected response");
926 let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
927
928 match response2.body {
929 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
930 already_stored, ..
931 }) => {
932 assert!(
933 !already_stored,
934 "already_stored should be false for new chunk"
935 );
936 }
937 other => panic!("expected Success with already_stored=false, got: {other:?}"),
938 }
939 }
940}