1use crate::ant_protocol::{
31 ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
32 ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
33 MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK,
34 MAX_CHUNK_SIZE,
35};
36use crate::client::compute_address;
37use crate::error::{Error, Result};
38use crate::logging::{debug, info, warn};
39use crate::payment::{PaymentVerifier, QuoteGenerator};
40use crate::replication::fresh::FreshWriteEvent;
41use crate::storage::lmdb::LmdbStorage;
42use bytes::Bytes;
43use std::sync::Arc;
44use tokio::sync::mpsc;
45
46pub struct AntProtocol {
51 storage: Arc<LmdbStorage>,
53 payment_verifier: Arc<PaymentVerifier>,
55 quote_generator: Arc<QuoteGenerator>,
58 fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
60}
61
62impl AntProtocol {
63 #[must_use]
71 pub fn new(
72 storage: Arc<LmdbStorage>,
73 payment_verifier: Arc<PaymentVerifier>,
74 quote_generator: Arc<QuoteGenerator>,
75 ) -> Self {
76 Self {
77 storage,
78 payment_verifier,
79 quote_generator,
80 fresh_write_tx: None,
81 }
82 }
83
84 pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
89 self.fresh_write_tx = Some(tx);
90 }
91
92 #[must_use]
94 pub fn protocol_id(&self) -> &'static str {
95 CHUNK_PROTOCOL_ID
96 }
97
98 #[must_use]
100 pub fn storage(&self) -> Arc<LmdbStorage> {
101 Arc::clone(&self.storage)
102 }
103
104 #[must_use]
106 pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
107 Arc::clone(&self.payment_verifier)
108 }
109
110 pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
121 let message = ChunkMessage::decode(data)
122 .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
123
124 let request_id = message.request_id;
125
126 let response_body = match message.body {
127 ChunkMessageBody::PutRequest(req) => {
128 ChunkMessageBody::PutResponse(self.handle_put(req).await)
129 }
130 ChunkMessageBody::GetRequest(req) => {
131 ChunkMessageBody::GetResponse(self.handle_get(req).await)
132 }
133 ChunkMessageBody::QuoteRequest(ref req) => {
134 ChunkMessageBody::QuoteResponse(self.handle_quote(req))
135 }
136 ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
137 ChunkMessageBody::MerkleCandidateQuoteResponse(
138 self.handle_merkle_candidate_quote(req),
139 )
140 }
141 _ => return Ok(None),
153 };
154
155 let response = ChunkMessage {
156 request_id,
157 body: response_body,
158 };
159
160 response
161 .encode()
162 .map(|b| Some(Bytes::from(b)))
163 .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
164 }
165
166 async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
168 let address = request.address;
169 let addr_hex = hex::encode(address);
170 debug!("Handling PUT request for {addr_hex}");
171
172 if request.content.len() > MAX_CHUNK_SIZE {
174 return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
175 size: request.content.len(),
176 max_size: MAX_CHUNK_SIZE,
177 });
178 }
179
180 let computed = compute_address(&request.content);
182 if computed != address {
183 return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
184 expected: address,
185 actual: computed,
186 });
187 }
188
189 match self.storage.exists(&address) {
191 Ok(true) => {
192 debug!("Chunk {addr_hex} already exists");
193 return ChunkPutResponse::AlreadyExists { address };
194 }
195 Err(e) => {
196 return ChunkPutResponse::Error(ProtocolError::Internal(format!(
197 "Storage read failed: {e}"
198 )));
199 }
200 Ok(false) => {}
201 }
202
203 let payment_result = self
205 .payment_verifier
206 .verify_payment(&address, request.payment_proof.as_deref())
207 .await;
208
209 match payment_result {
210 Ok(status) if status.can_store() => {
211 }
213 Ok(_) => {
214 return ChunkPutResponse::PaymentRequired {
215 message: "Payment required for new chunk".to_string(),
216 };
217 }
218 Err(e) => {
219 return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
220 }
221 }
222
223 match self.storage.put(&address, &request.content).await {
225 Ok(_) => {
226 let content_len = request.content.len();
227 info!("Stored chunk {addr_hex} ({content_len} bytes)");
228 self.quote_generator.record_store(DATA_TYPE_CHUNK);
230 self.quote_generator.record_payment();
231
232 if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
237 let event = FreshWriteEvent {
238 key: address,
239 data: request.content,
240 payment_proof: proof,
241 };
242 if tx.send(event).is_err() {
243 debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
244 }
245 }
246
247 ChunkPutResponse::Success { address }
248 }
249 Err(e) => {
250 warn!("Failed to store chunk {addr_hex}: {e}");
251 ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
252 }
253 }
254 }
255
256 async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
258 let address = request.address;
259 let addr_hex = hex::encode(address);
260 debug!("Handling GET request for {addr_hex}");
261
262 match self.storage.get(&address).await {
263 Ok(Some(content)) => {
264 let content_len = content.len();
265 debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
266 ChunkGetResponse::Success { address, content }
267 }
268 Ok(None) => {
269 debug!("Chunk {addr_hex} not found");
270 ChunkGetResponse::NotFound { address }
271 }
272 Err(e) => {
273 warn!("Failed to retrieve chunk {addr_hex}: {e}");
274 ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
275 }
276 }
277 }
278
279 fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
281 let addr_hex = hex::encode(request.address);
282 let data_size = request.data_size;
283 debug!("Handling quote request for {addr_hex} (size: {data_size})");
284
285 #[allow(clippy::manual_unwrap_or_default)]
291 let already_stored = match self.storage.exists(&request.address) {
292 Ok(exists) => exists,
293 Err(e) => {
294 warn!("Storage check failed for {addr_hex}: {e}");
295 false }
297 };
298
299 if already_stored {
300 debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
301 }
302
303 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
305 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
306 size: MAX_CHUNK_SIZE + 1,
307 max_size: MAX_CHUNK_SIZE,
308 });
309 };
310 if data_size_usize > MAX_CHUNK_SIZE {
311 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
312 size: data_size_usize,
313 max_size: MAX_CHUNK_SIZE,
314 });
315 }
316
317 match self
318 .quote_generator
319 .create_quote(request.address, data_size_usize, request.data_type)
320 {
321 Ok(quote) => {
322 match rmp_serde::to_vec("e) {
324 Ok(quote_bytes) => ChunkQuoteResponse::Success {
325 quote: quote_bytes,
326 already_stored,
327 },
328 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
329 "Failed to serialize quote: {e}"
330 ))),
331 }
332 }
333 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
334 }
335 }
336
337 fn handle_merkle_candidate_quote(
339 &self,
340 request: &MerkleCandidateQuoteRequest,
341 ) -> MerkleCandidateQuoteResponse {
342 let addr_hex = hex::encode(request.address);
343 let data_size = request.data_size;
344 debug!(
345 "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
346 request.merkle_payment_timestamp
347 );
348
349 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
350 return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
351 "data_size {} overflows usize",
352 request.data_size
353 )));
354 };
355 if data_size_usize > MAX_CHUNK_SIZE {
356 return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
357 size: data_size_usize,
358 max_size: MAX_CHUNK_SIZE,
359 });
360 }
361
362 match self.quote_generator.create_merkle_candidate_quote(
363 data_size_usize,
364 request.data_type,
365 request.merkle_payment_timestamp,
366 ) {
367 Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
368 Ok(bytes) => MerkleCandidateQuoteResponse::Success {
369 candidate_node: bytes,
370 },
371 Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
372 "Failed to serialize merkle candidate node: {e}"
373 ))),
374 },
375 Err(e) => {
376 MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
377 }
378 }
379 }
380
381 #[must_use]
383 pub fn storage_stats(&self) -> crate::storage::StorageStats {
384 self.storage.stats()
385 }
386
387 #[must_use]
389 pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
390 self.payment_verifier.cache_stats()
391 }
392
393 #[cfg(any(test, feature = "test-utils"))]
399 #[must_use]
400 pub fn payment_verifier(&self) -> &PaymentVerifier {
401 &self.payment_verifier
402 }
403
404 pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
410 self.storage.exists(address)
411 }
412
413 pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
419 self.storage.get(address).await
420 }
421
422 #[cfg(test)]
430 pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
431 self.storage.put(address, content).await
432 }
433}
434
435#[cfg(test)]
436#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
437mod tests {
438 use super::*;
439 use crate::payment::metrics::QuotingMetricsTracker;
440 use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
441 use crate::storage::LmdbStorageConfig;
442 use evmlib::RewardsAddress;
443 use saorsa_core::identity::NodeIdentity;
444 use saorsa_core::MlDsa65;
445 use saorsa_pqc::pqc::types::MlDsaSecretKey;
446 use tempfile::TempDir;
447
448 async fn create_test_protocol() -> (AntProtocol, TempDir) {
449 let temp_dir = TempDir::new().expect("create temp dir");
450
451 let storage_config = LmdbStorageConfig {
452 root_dir: temp_dir.path().to_path_buf(),
453 ..LmdbStorageConfig::test_default()
454 };
455 let storage = Arc::new(
456 LmdbStorage::new(storage_config)
457 .await
458 .expect("create storage"),
459 );
460
461 let rewards_address = RewardsAddress::new([1u8; 20]);
462 let payment_config = PaymentVerifierConfig {
463 evm: EvmVerifierConfig::default(),
464 cache_capacity: 100_000,
465 local_rewards_address: rewards_address,
466 };
467 let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
468 let metrics_tracker = QuotingMetricsTracker::new(100);
469 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
470
471 let identity = NodeIdentity::generate().expect("generate identity");
473 let pub_key_bytes = identity.public_key().as_bytes().to_vec();
474 let sk_bytes = identity.secret_key_bytes().to_vec();
475 let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
476 quote_generator.set_signer(pub_key_bytes, move |msg| {
477 use saorsa_pqc::pqc::MlDsaOperations;
478 let ml_dsa = MlDsa65::new();
479 ml_dsa
480 .sign(&sk, msg)
481 .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
482 });
483
484 let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
485 (protocol, temp_dir)
486 }
487
488 #[tokio::test]
489 async fn test_put_and_get_chunk() {
490 let (protocol, _temp) = create_test_protocol().await;
491
492 let content = b"hello world";
493 let address = LmdbStorage::compute_address(content);
494
495 protocol.payment_verifier().cache_insert(address);
497
498 let put_request = ChunkPutRequest::new(address, content.to_vec());
499 let put_msg = ChunkMessage {
500 request_id: 1,
501 body: ChunkMessageBody::PutRequest(put_request),
502 };
503 let put_bytes = put_msg.encode().expect("encode put");
504
505 let response_bytes = protocol
507 .try_handle_request(&put_bytes)
508 .await
509 .expect("handle put")
510 .expect("expected response");
511 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
512
513 assert_eq!(response.request_id, 1);
514 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
515 response.body
516 {
517 assert_eq!(addr, address);
518 } else {
519 panic!("expected PutResponse::Success, got: {response:?}");
520 }
521
522 let get_request = ChunkGetRequest::new(address);
524 let get_msg = ChunkMessage {
525 request_id: 2,
526 body: ChunkMessageBody::GetRequest(get_request),
527 };
528 let get_bytes = get_msg.encode().expect("encode get");
529
530 let response_bytes = protocol
532 .try_handle_request(&get_bytes)
533 .await
534 .expect("handle get")
535 .expect("expected response");
536 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
537
538 assert_eq!(response.request_id, 2);
539 if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
540 address: addr,
541 content: data,
542 }) = response.body
543 {
544 assert_eq!(addr, address);
545 assert_eq!(data, content.to_vec());
546 } else {
547 panic!("expected GetResponse::Success");
548 }
549 }
550
551 #[tokio::test]
552 async fn test_get_not_found() {
553 let (protocol, _temp) = create_test_protocol().await;
554
555 let address = [0xAB; 32];
556 let get_request = ChunkGetRequest::new(address);
557 let get_msg = ChunkMessage {
558 request_id: 10,
559 body: ChunkMessageBody::GetRequest(get_request),
560 };
561 let get_bytes = get_msg.encode().expect("encode get");
562
563 let response_bytes = protocol
564 .try_handle_request(&get_bytes)
565 .await
566 .expect("handle get")
567 .expect("expected response");
568 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
569
570 assert_eq!(response.request_id, 10);
571 if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
572 response.body
573 {
574 assert_eq!(addr, address);
575 } else {
576 panic!("expected GetResponse::NotFound");
577 }
578 }
579
580 #[tokio::test]
581 async fn test_put_address_mismatch() {
582 let (protocol, _temp) = create_test_protocol().await;
583
584 let content = b"test content";
585 let wrong_address = [0xFF; 32]; protocol.payment_verifier().cache_insert(wrong_address);
589
590 let put_request = ChunkPutRequest::new(wrong_address, content.to_vec());
591 let put_msg = ChunkMessage {
592 request_id: 20,
593 body: ChunkMessageBody::PutRequest(put_request),
594 };
595 let put_bytes = put_msg.encode().expect("encode put");
596
597 let response_bytes = protocol
598 .try_handle_request(&put_bytes)
599 .await
600 .expect("handle put")
601 .expect("expected response");
602 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
603
604 assert_eq!(response.request_id, 20);
605 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
606 ProtocolError::AddressMismatch { .. },
607 )) = response.body
608 {
609 } else {
611 panic!("expected AddressMismatch error, got: {response:?}");
612 }
613 }
614
615 #[tokio::test]
616 async fn test_put_chunk_too_large() {
617 let (protocol, _temp) = create_test_protocol().await;
618
619 let content = vec![0u8; MAX_CHUNK_SIZE + 1];
621 let address = LmdbStorage::compute_address(&content);
622
623 let put_request = ChunkPutRequest::new(address, content);
624 let put_msg = ChunkMessage {
625 request_id: 30,
626 body: ChunkMessageBody::PutRequest(put_request),
627 };
628 let put_bytes = put_msg.encode().expect("encode put");
629
630 let response_bytes = protocol
631 .try_handle_request(&put_bytes)
632 .await
633 .expect("handle put")
634 .expect("expected response");
635 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
636
637 assert_eq!(response.request_id, 30);
638 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
639 ProtocolError::ChunkTooLarge { .. },
640 )) = response.body
641 {
642 } else {
644 panic!("expected ChunkTooLarge error");
645 }
646 }
647
648 #[tokio::test]
649 async fn test_put_already_exists() {
650 let (protocol, _temp) = create_test_protocol().await;
651
652 let content = b"duplicate content";
653 let address = LmdbStorage::compute_address(content);
654
655 protocol.payment_verifier().cache_insert(address);
657
658 let put_request = ChunkPutRequest::new(address, content.to_vec());
659 let put_msg = ChunkMessage {
660 request_id: 40,
661 body: ChunkMessageBody::PutRequest(put_request),
662 };
663 let put_bytes = put_msg.encode().expect("encode put");
664
665 let _ = protocol
666 .try_handle_request(&put_bytes)
667 .await
668 .expect("handle put");
669
670 let response_bytes = protocol
672 .try_handle_request(&put_bytes)
673 .await
674 .expect("handle put 2")
675 .expect("expected response");
676 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
677
678 assert_eq!(response.request_id, 40);
679 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
680 response.body
681 {
682 assert_eq!(addr, address);
683 } else {
684 panic!("expected AlreadyExists");
685 }
686 }
687
688 #[tokio::test]
689 async fn test_protocol_id() {
690 let (protocol, _temp) = create_test_protocol().await;
691 assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
692 }
693
694 #[tokio::test]
695 async fn test_exists_and_local_access() {
696 let (protocol, _temp) = create_test_protocol().await;
697
698 let content = b"local access test";
699 let address = LmdbStorage::compute_address(content);
700
701 assert!(!protocol.exists(&address).expect("exists check"));
702
703 protocol
704 .put_local(&address, content)
705 .await
706 .expect("put local");
707
708 assert!(protocol.exists(&address).expect("exists check"));
709
710 let retrieved = protocol.get_local(&address).await.expect("get local");
711 assert_eq!(retrieved, Some(content.to_vec()));
712 }
713
714 #[tokio::test]
715 async fn test_cache_insert_is_visible() {
716 let (protocol, _temp) = create_test_protocol().await;
717
718 let content = b"cache test content";
719 let address = LmdbStorage::compute_address(content);
720
721 let stats_before = protocol.payment_cache_stats();
723 assert_eq!(stats_before.additions, 0);
724
725 protocol.payment_verifier().cache_insert(address);
727
728 let stats_after = protocol.payment_cache_stats();
730 assert_eq!(stats_after.additions, 1);
731
732 let put_request = ChunkPutRequest::new(address, content.to_vec());
734 let put_msg = ChunkMessage {
735 request_id: 100,
736 body: ChunkMessageBody::PutRequest(put_request),
737 };
738 let put_bytes = put_msg.encode().expect("encode put");
739 let response_bytes = protocol
740 .try_handle_request(&put_bytes)
741 .await
742 .expect("handle put")
743 .expect("expected response");
744 let response = ChunkMessage::decode(&response_bytes).expect("decode");
745
746 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
747 } else {
749 panic!("expected success, got: {response:?}");
750 }
751 }
752
753 #[tokio::test]
754 async fn test_put_same_chunk_twice_hits_cache() {
755 let (protocol, _temp) = create_test_protocol().await;
756
757 let content = b"duplicate cache test";
758 let address = LmdbStorage::compute_address(content);
759
760 protocol.payment_verifier().cache_insert(address);
762
763 let put_request = ChunkPutRequest::new(address, content.to_vec());
765 let put_msg = ChunkMessage {
766 request_id: 110,
767 body: ChunkMessageBody::PutRequest(put_request),
768 };
769 let put_bytes = put_msg.encode().expect("encode put");
770 let _ = protocol
771 .try_handle_request(&put_bytes)
772 .await
773 .expect("handle put 1");
774
775 let response_bytes = protocol
777 .try_handle_request(&put_bytes)
778 .await
779 .expect("handle put 2")
780 .expect("expected response");
781 let response = ChunkMessage::decode(&response_bytes).expect("decode");
782
783 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
784 {
785 } else {
787 panic!("expected AlreadyExists, got: {response:?}");
788 }
789 }
790
791 #[tokio::test]
792 async fn test_payment_cache_stats_returns_correct_values() {
793 let (protocol, _temp) = create_test_protocol().await;
794
795 let stats = protocol.payment_cache_stats();
796 assert_eq!(stats.hits, 0);
797 assert_eq!(stats.misses, 0);
798 assert_eq!(stats.additions, 0);
799
800 let content = b"stats test";
802 let address = LmdbStorage::compute_address(content);
803 protocol.payment_verifier().cache_insert(address);
804
805 let put_request = ChunkPutRequest::new(address, content.to_vec());
806 let put_msg = ChunkMessage {
807 request_id: 120,
808 body: ChunkMessageBody::PutRequest(put_request),
809 };
810 let put_bytes = put_msg.encode().expect("encode put");
811 let _ = protocol
812 .try_handle_request(&put_bytes)
813 .await
814 .expect("handle put");
815
816 let stats = protocol.payment_cache_stats();
817 assert_eq!(stats.additions, 1);
819 assert_eq!(stats.hits, 1);
820 }
821
822 #[tokio::test]
823 async fn test_storage_stats() {
824 let (protocol, _temp) = create_test_protocol().await;
825 let stats = protocol.storage_stats();
826 assert_eq!(stats.chunks_stored, 0);
827 }
828
829 #[tokio::test]
830 async fn test_merkle_candidate_quote_request() {
831 use ant_protocol::payment::verify::verify_merkle_candidate_signature;
832 use evmlib::merkle_payments::MerklePaymentCandidateNode;
833
834 let (protocol, _temp) = create_test_protocol().await;
836
837 let address = [0x77; 32];
838 let timestamp = std::time::SystemTime::now()
839 .duration_since(std::time::UNIX_EPOCH)
840 .expect("system time")
841 .as_secs();
842
843 let request = MerkleCandidateQuoteRequest {
844 address,
845 data_type: DATA_TYPE_CHUNK,
846 data_size: 4096,
847 merkle_payment_timestamp: timestamp,
848 };
849 let msg = ChunkMessage {
850 request_id: 600,
851 body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
852 };
853 let msg_bytes = msg.encode().expect("encode request");
854
855 let response_bytes = protocol
856 .try_handle_request(&msg_bytes)
857 .await
858 .expect("handle merkle candidate quote")
859 .expect("expected response");
860 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
861
862 assert_eq!(response.request_id, 600);
863 match response.body {
864 ChunkMessageBody::MerkleCandidateQuoteResponse(
865 MerkleCandidateQuoteResponse::Success { candidate_node },
866 ) => {
867 let candidate: MerklePaymentCandidateNode =
868 rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
869
870 assert!(
872 verify_merkle_candidate_signature(&candidate),
873 "ML-DSA-65 candidate signature must be valid"
874 );
875
876 assert_eq!(candidate.merkle_payment_timestamp, timestamp);
877 assert!(candidate.price >= evmlib::common::Amount::ZERO);
879 }
880 other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
881 }
882 }
883
884 #[tokio::test]
885 async fn test_handle_unexpected_response_message() {
886 let (protocol, _temp) = create_test_protocol().await;
887
888 let msg = ChunkMessage {
890 request_id: 200,
891 body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
892 };
893 let msg_bytes = msg.encode().expect("encode");
894
895 let result = protocol
896 .try_handle_request(&msg_bytes)
897 .await
898 .expect("handle msg");
899
900 assert!(
901 result.is_none(),
902 "expected None for response message, got: {result:?}"
903 );
904 }
905
906 #[tokio::test]
907 async fn test_quote_already_stored_flag() {
908 let (protocol, _temp) = create_test_protocol().await;
909
910 let content = b"already stored quote test";
911 let address = LmdbStorage::compute_address(content);
912
913 protocol.payment_verifier().cache_insert(address);
915 let put_request = ChunkPutRequest::new(address, content.to_vec());
916 let put_msg = ChunkMessage {
917 request_id: 300,
918 body: ChunkMessageBody::PutRequest(put_request),
919 };
920 let put_bytes = put_msg.encode().expect("encode put");
921 let _ = protocol
922 .try_handle_request(&put_bytes)
923 .await
924 .expect("handle put");
925
926 let quote_request = ChunkQuoteRequest {
928 address,
929 data_size: content.len() as u64,
930 data_type: DATA_TYPE_CHUNK,
931 };
932 let quote_msg = ChunkMessage {
933 request_id: 301,
934 body: ChunkMessageBody::QuoteRequest(quote_request),
935 };
936 let quote_bytes = quote_msg.encode().expect("encode quote");
937 let response_bytes = protocol
938 .try_handle_request("e_bytes)
939 .await
940 .expect("handle quote")
941 .expect("expected response");
942 let response = ChunkMessage::decode(&response_bytes).expect("decode");
943
944 match response.body {
945 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
946 already_stored, ..
947 }) => {
948 assert!(
949 already_stored,
950 "already_stored should be true for existing chunk"
951 );
952 }
953 other => panic!("expected Success with already_stored, got: {other:?}"),
954 }
955
956 let new_address = [0xFFu8; 32];
958 let quote_request2 = ChunkQuoteRequest {
959 address: new_address,
960 data_size: 100,
961 data_type: DATA_TYPE_CHUNK,
962 };
963 let quote_msg2 = ChunkMessage {
964 request_id: 302,
965 body: ChunkMessageBody::QuoteRequest(quote_request2),
966 };
967 let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
968 let response_bytes2 = protocol
969 .try_handle_request("e_bytes2)
970 .await
971 .expect("handle quote2")
972 .expect("expected response");
973 let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
974
975 match response2.body {
976 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
977 already_stored, ..
978 }) => {
979 assert!(
980 !already_stored,
981 "already_stored should be false for new chunk"
982 );
983 }
984 other => panic!("expected Success with already_stored=false, got: {other:?}"),
985 }
986 }
987}