1use crate::ant_protocol::{
31 ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
32 ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
33 MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK,
34 MAX_CHUNK_SIZE,
35};
36use crate::client::compute_address;
37use crate::error::{Error, Result};
38use crate::logging::{debug, info, warn};
39use crate::payment::{PaymentVerifier, QuoteGenerator};
40use crate::replication::fresh::FreshWriteEvent;
41use crate::storage::lmdb::LmdbStorage;
42use bytes::Bytes;
43use std::sync::Arc;
44use tokio::sync::mpsc;
45
46pub struct AntProtocol {
51 storage: Arc<LmdbStorage>,
53 payment_verifier: Arc<PaymentVerifier>,
55 quote_generator: Arc<QuoteGenerator>,
58 fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
60}
61
62impl AntProtocol {
63 #[must_use]
71 pub fn new(
72 storage: Arc<LmdbStorage>,
73 payment_verifier: Arc<PaymentVerifier>,
74 quote_generator: Arc<QuoteGenerator>,
75 ) -> Self {
76 Self {
77 storage,
78 payment_verifier,
79 quote_generator,
80 fresh_write_tx: None,
81 }
82 }
83
84 pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
89 self.fresh_write_tx = Some(tx);
90 }
91
92 #[must_use]
94 pub fn protocol_id(&self) -> &'static str {
95 CHUNK_PROTOCOL_ID
96 }
97
98 #[must_use]
100 pub fn storage(&self) -> Arc<LmdbStorage> {
101 Arc::clone(&self.storage)
102 }
103
104 #[must_use]
106 pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
107 Arc::clone(&self.payment_verifier)
108 }
109
110 pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
121 let message = ChunkMessage::decode(data)
122 .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
123
124 let request_id = message.request_id;
125
126 let response_body = match message.body {
127 ChunkMessageBody::PutRequest(req) => {
128 ChunkMessageBody::PutResponse(self.handle_put(req).await)
129 }
130 ChunkMessageBody::GetRequest(req) => {
131 ChunkMessageBody::GetResponse(self.handle_get(req).await)
132 }
133 ChunkMessageBody::QuoteRequest(ref req) => {
134 ChunkMessageBody::QuoteResponse(self.handle_quote(req))
135 }
136 ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
137 ChunkMessageBody::MerkleCandidateQuoteResponse(
138 self.handle_merkle_candidate_quote(req),
139 )
140 }
141 ChunkMessageBody::PutResponse(_)
146 | ChunkMessageBody::GetResponse(_)
147 | ChunkMessageBody::QuoteResponse(_)
148 | ChunkMessageBody::MerkleCandidateQuoteResponse(_) => return Ok(None),
149 };
150
151 let response = ChunkMessage {
152 request_id,
153 body: response_body,
154 };
155
156 response
157 .encode()
158 .map(|b| Some(Bytes::from(b)))
159 .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
160 }
161
162 async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
164 let address = request.address;
165 let addr_hex = hex::encode(address);
166 debug!("Handling PUT request for {addr_hex}");
167
168 if request.content.len() > MAX_CHUNK_SIZE {
170 return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
171 size: request.content.len(),
172 max_size: MAX_CHUNK_SIZE,
173 });
174 }
175
176 let computed = compute_address(&request.content);
178 if computed != address {
179 return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
180 expected: address,
181 actual: computed,
182 });
183 }
184
185 match self.storage.exists(&address) {
187 Ok(true) => {
188 debug!("Chunk {addr_hex} already exists");
189 return ChunkPutResponse::AlreadyExists { address };
190 }
191 Err(e) => {
192 return ChunkPutResponse::Error(ProtocolError::Internal(format!(
193 "Storage read failed: {e}"
194 )));
195 }
196 Ok(false) => {}
197 }
198
199 let payment_result = self
201 .payment_verifier
202 .verify_payment(&address, request.payment_proof.as_deref())
203 .await;
204
205 match payment_result {
206 Ok(status) if status.can_store() => {
207 }
209 Ok(_) => {
210 return ChunkPutResponse::PaymentRequired {
211 message: "Payment required for new chunk".to_string(),
212 };
213 }
214 Err(e) => {
215 return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
216 }
217 }
218
219 match self.storage.put(&address, &request.content).await {
221 Ok(_) => {
222 let content_len = request.content.len();
223 info!("Stored chunk {addr_hex} ({content_len} bytes)");
224 self.quote_generator.record_store(DATA_TYPE_CHUNK);
226 self.quote_generator.record_payment();
227
228 if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
233 let event = FreshWriteEvent {
234 key: address,
235 data: request.content,
236 payment_proof: proof,
237 };
238 if tx.send(event).is_err() {
239 debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
240 }
241 }
242
243 ChunkPutResponse::Success { address }
244 }
245 Err(e) => {
246 warn!("Failed to store chunk {addr_hex}: {e}");
247 ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
248 }
249 }
250 }
251
252 async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
254 let address = request.address;
255 let addr_hex = hex::encode(address);
256 debug!("Handling GET request for {addr_hex}");
257
258 match self.storage.get(&address).await {
259 Ok(Some(content)) => {
260 let content_len = content.len();
261 debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
262 ChunkGetResponse::Success { address, content }
263 }
264 Ok(None) => {
265 debug!("Chunk {addr_hex} not found");
266 ChunkGetResponse::NotFound { address }
267 }
268 Err(e) => {
269 warn!("Failed to retrieve chunk {addr_hex}: {e}");
270 ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
271 }
272 }
273 }
274
275 fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
277 let addr_hex = hex::encode(request.address);
278 let data_size = request.data_size;
279 debug!("Handling quote request for {addr_hex} (size: {data_size})");
280
281 #[allow(clippy::manual_unwrap_or_default)]
287 let already_stored = match self.storage.exists(&request.address) {
288 Ok(exists) => exists,
289 Err(e) => {
290 warn!("Storage check failed for {addr_hex}: {e}");
291 false }
293 };
294
295 if already_stored {
296 debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
297 }
298
299 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
301 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
302 size: MAX_CHUNK_SIZE + 1,
303 max_size: MAX_CHUNK_SIZE,
304 });
305 };
306 if data_size_usize > MAX_CHUNK_SIZE {
307 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
308 size: data_size_usize,
309 max_size: MAX_CHUNK_SIZE,
310 });
311 }
312
313 match self
314 .quote_generator
315 .create_quote(request.address, data_size_usize, request.data_type)
316 {
317 Ok(quote) => {
318 match rmp_serde::to_vec("e) {
320 Ok(quote_bytes) => ChunkQuoteResponse::Success {
321 quote: quote_bytes,
322 already_stored,
323 },
324 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
325 "Failed to serialize quote: {e}"
326 ))),
327 }
328 }
329 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
330 }
331 }
332
333 fn handle_merkle_candidate_quote(
335 &self,
336 request: &MerkleCandidateQuoteRequest,
337 ) -> MerkleCandidateQuoteResponse {
338 let addr_hex = hex::encode(request.address);
339 let data_size = request.data_size;
340 debug!(
341 "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
342 request.merkle_payment_timestamp
343 );
344
345 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
346 return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
347 "data_size {} overflows usize",
348 request.data_size
349 )));
350 };
351 if data_size_usize > MAX_CHUNK_SIZE {
352 return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
353 size: data_size_usize,
354 max_size: MAX_CHUNK_SIZE,
355 });
356 }
357
358 match self.quote_generator.create_merkle_candidate_quote(
359 data_size_usize,
360 request.data_type,
361 request.merkle_payment_timestamp,
362 ) {
363 Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
364 Ok(bytes) => MerkleCandidateQuoteResponse::Success {
365 candidate_node: bytes,
366 },
367 Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
368 "Failed to serialize merkle candidate node: {e}"
369 ))),
370 },
371 Err(e) => {
372 MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
373 }
374 }
375 }
376
377 #[must_use]
379 pub fn storage_stats(&self) -> crate::storage::StorageStats {
380 self.storage.stats()
381 }
382
383 #[must_use]
385 pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
386 self.payment_verifier.cache_stats()
387 }
388
/// Borrow the payment verifier directly.
///
/// Only compiled for tests or when the `test-utils` feature is enabled.
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn payment_verifier(&self) -> &PaymentVerifier {
    self.payment_verifier.as_ref()
}
399
400 pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
406 self.storage.exists(address)
407 }
408
409 pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
415 self.storage.get(address).await
416 }
417
/// Write a chunk straight into the local store (test builds only).
///
/// None of the checks performed by `handle_put` (size, address, payment) run
/// here; the call is forwarded to storage as is.
///
/// # Errors
///
/// Propagates whatever error the storage write returns.
#[cfg(test)]
pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
    let store = &self.storage;
    store.put(address, content).await
}
429}
430
431#[cfg(test)]
432#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
433mod tests {
434 use super::*;
435 use crate::payment::metrics::QuotingMetricsTracker;
436 use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
437 use crate::storage::LmdbStorageConfig;
438 use evmlib::RewardsAddress;
439 use saorsa_core::identity::NodeIdentity;
440 use saorsa_core::MlDsa65;
441 use saorsa_pqc::pqc::types::MlDsaSecretKey;
442 use tempfile::TempDir;
443
444 async fn create_test_protocol() -> (AntProtocol, TempDir) {
445 let temp_dir = TempDir::new().expect("create temp dir");
446
447 let storage_config = LmdbStorageConfig {
448 root_dir: temp_dir.path().to_path_buf(),
449 ..LmdbStorageConfig::test_default()
450 };
451 let storage = Arc::new(
452 LmdbStorage::new(storage_config)
453 .await
454 .expect("create storage"),
455 );
456
457 let rewards_address = RewardsAddress::new([1u8; 20]);
458 let payment_config = PaymentVerifierConfig {
459 evm: EvmVerifierConfig::default(),
460 cache_capacity: 100_000,
461 local_rewards_address: rewards_address,
462 };
463 let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
464 let metrics_tracker = QuotingMetricsTracker::new(100);
465 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
466
467 let identity = NodeIdentity::generate().expect("generate identity");
469 let pub_key_bytes = identity.public_key().as_bytes().to_vec();
470 let sk_bytes = identity.secret_key_bytes().to_vec();
471 let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
472 quote_generator.set_signer(pub_key_bytes, move |msg| {
473 use saorsa_pqc::pqc::MlDsaOperations;
474 let ml_dsa = MlDsa65::new();
475 ml_dsa
476 .sign(&sk, msg)
477 .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
478 });
479
480 let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
481 (protocol, temp_dir)
482 }
483
484 #[tokio::test]
485 async fn test_put_and_get_chunk() {
486 let (protocol, _temp) = create_test_protocol().await;
487
488 let content = b"hello world";
489 let address = LmdbStorage::compute_address(content);
490
491 protocol.payment_verifier().cache_insert(address);
493
494 let put_request = ChunkPutRequest::new(address, content.to_vec());
495 let put_msg = ChunkMessage {
496 request_id: 1,
497 body: ChunkMessageBody::PutRequest(put_request),
498 };
499 let put_bytes = put_msg.encode().expect("encode put");
500
501 let response_bytes = protocol
503 .try_handle_request(&put_bytes)
504 .await
505 .expect("handle put")
506 .expect("expected response");
507 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
508
509 assert_eq!(response.request_id, 1);
510 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
511 response.body
512 {
513 assert_eq!(addr, address);
514 } else {
515 panic!("expected PutResponse::Success, got: {response:?}");
516 }
517
518 let get_request = ChunkGetRequest::new(address);
520 let get_msg = ChunkMessage {
521 request_id: 2,
522 body: ChunkMessageBody::GetRequest(get_request),
523 };
524 let get_bytes = get_msg.encode().expect("encode get");
525
526 let response_bytes = protocol
528 .try_handle_request(&get_bytes)
529 .await
530 .expect("handle get")
531 .expect("expected response");
532 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
533
534 assert_eq!(response.request_id, 2);
535 if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
536 address: addr,
537 content: data,
538 }) = response.body
539 {
540 assert_eq!(addr, address);
541 assert_eq!(data, content.to_vec());
542 } else {
543 panic!("expected GetResponse::Success");
544 }
545 }
546
547 #[tokio::test]
548 async fn test_get_not_found() {
549 let (protocol, _temp) = create_test_protocol().await;
550
551 let address = [0xAB; 32];
552 let get_request = ChunkGetRequest::new(address);
553 let get_msg = ChunkMessage {
554 request_id: 10,
555 body: ChunkMessageBody::GetRequest(get_request),
556 };
557 let get_bytes = get_msg.encode().expect("encode get");
558
559 let response_bytes = protocol
560 .try_handle_request(&get_bytes)
561 .await
562 .expect("handle get")
563 .expect("expected response");
564 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
565
566 assert_eq!(response.request_id, 10);
567 if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
568 response.body
569 {
570 assert_eq!(addr, address);
571 } else {
572 panic!("expected GetResponse::NotFound");
573 }
574 }
575
576 #[tokio::test]
577 async fn test_put_address_mismatch() {
578 let (protocol, _temp) = create_test_protocol().await;
579
580 let content = b"test content";
581 let wrong_address = [0xFF; 32]; protocol.payment_verifier().cache_insert(wrong_address);
585
586 let put_request = ChunkPutRequest::new(wrong_address, content.to_vec());
587 let put_msg = ChunkMessage {
588 request_id: 20,
589 body: ChunkMessageBody::PutRequest(put_request),
590 };
591 let put_bytes = put_msg.encode().expect("encode put");
592
593 let response_bytes = protocol
594 .try_handle_request(&put_bytes)
595 .await
596 .expect("handle put")
597 .expect("expected response");
598 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
599
600 assert_eq!(response.request_id, 20);
601 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
602 ProtocolError::AddressMismatch { .. },
603 )) = response.body
604 {
605 } else {
607 panic!("expected AddressMismatch error, got: {response:?}");
608 }
609 }
610
611 #[tokio::test]
612 async fn test_put_chunk_too_large() {
613 let (protocol, _temp) = create_test_protocol().await;
614
615 let content = vec![0u8; MAX_CHUNK_SIZE + 1];
617 let address = LmdbStorage::compute_address(&content);
618
619 let put_request = ChunkPutRequest::new(address, content);
620 let put_msg = ChunkMessage {
621 request_id: 30,
622 body: ChunkMessageBody::PutRequest(put_request),
623 };
624 let put_bytes = put_msg.encode().expect("encode put");
625
626 let response_bytes = protocol
627 .try_handle_request(&put_bytes)
628 .await
629 .expect("handle put")
630 .expect("expected response");
631 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
632
633 assert_eq!(response.request_id, 30);
634 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
635 ProtocolError::ChunkTooLarge { .. },
636 )) = response.body
637 {
638 } else {
640 panic!("expected ChunkTooLarge error");
641 }
642 }
643
644 #[tokio::test]
645 async fn test_put_already_exists() {
646 let (protocol, _temp) = create_test_protocol().await;
647
648 let content = b"duplicate content";
649 let address = LmdbStorage::compute_address(content);
650
651 protocol.payment_verifier().cache_insert(address);
653
654 let put_request = ChunkPutRequest::new(address, content.to_vec());
655 let put_msg = ChunkMessage {
656 request_id: 40,
657 body: ChunkMessageBody::PutRequest(put_request),
658 };
659 let put_bytes = put_msg.encode().expect("encode put");
660
661 let _ = protocol
662 .try_handle_request(&put_bytes)
663 .await
664 .expect("handle put");
665
666 let response_bytes = protocol
668 .try_handle_request(&put_bytes)
669 .await
670 .expect("handle put 2")
671 .expect("expected response");
672 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
673
674 assert_eq!(response.request_id, 40);
675 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
676 response.body
677 {
678 assert_eq!(addr, address);
679 } else {
680 panic!("expected AlreadyExists");
681 }
682 }
683
684 #[tokio::test]
685 async fn test_protocol_id() {
686 let (protocol, _temp) = create_test_protocol().await;
687 assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
688 }
689
690 #[tokio::test]
691 async fn test_exists_and_local_access() {
692 let (protocol, _temp) = create_test_protocol().await;
693
694 let content = b"local access test";
695 let address = LmdbStorage::compute_address(content);
696
697 assert!(!protocol.exists(&address).expect("exists check"));
698
699 protocol
700 .put_local(&address, content)
701 .await
702 .expect("put local");
703
704 assert!(protocol.exists(&address).expect("exists check"));
705
706 let retrieved = protocol.get_local(&address).await.expect("get local");
707 assert_eq!(retrieved, Some(content.to_vec()));
708 }
709
710 #[tokio::test]
711 async fn test_cache_insert_is_visible() {
712 let (protocol, _temp) = create_test_protocol().await;
713
714 let content = b"cache test content";
715 let address = LmdbStorage::compute_address(content);
716
717 let stats_before = protocol.payment_cache_stats();
719 assert_eq!(stats_before.additions, 0);
720
721 protocol.payment_verifier().cache_insert(address);
723
724 let stats_after = protocol.payment_cache_stats();
726 assert_eq!(stats_after.additions, 1);
727
728 let put_request = ChunkPutRequest::new(address, content.to_vec());
730 let put_msg = ChunkMessage {
731 request_id: 100,
732 body: ChunkMessageBody::PutRequest(put_request),
733 };
734 let put_bytes = put_msg.encode().expect("encode put");
735 let response_bytes = protocol
736 .try_handle_request(&put_bytes)
737 .await
738 .expect("handle put")
739 .expect("expected response");
740 let response = ChunkMessage::decode(&response_bytes).expect("decode");
741
742 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
743 } else {
745 panic!("expected success, got: {response:?}");
746 }
747 }
748
749 #[tokio::test]
750 async fn test_put_same_chunk_twice_hits_cache() {
751 let (protocol, _temp) = create_test_protocol().await;
752
753 let content = b"duplicate cache test";
754 let address = LmdbStorage::compute_address(content);
755
756 protocol.payment_verifier().cache_insert(address);
758
759 let put_request = ChunkPutRequest::new(address, content.to_vec());
761 let put_msg = ChunkMessage {
762 request_id: 110,
763 body: ChunkMessageBody::PutRequest(put_request),
764 };
765 let put_bytes = put_msg.encode().expect("encode put");
766 let _ = protocol
767 .try_handle_request(&put_bytes)
768 .await
769 .expect("handle put 1");
770
771 let response_bytes = protocol
773 .try_handle_request(&put_bytes)
774 .await
775 .expect("handle put 2")
776 .expect("expected response");
777 let response = ChunkMessage::decode(&response_bytes).expect("decode");
778
779 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
780 {
781 } else {
783 panic!("expected AlreadyExists, got: {response:?}");
784 }
785 }
786
787 #[tokio::test]
788 async fn test_payment_cache_stats_returns_correct_values() {
789 let (protocol, _temp) = create_test_protocol().await;
790
791 let stats = protocol.payment_cache_stats();
792 assert_eq!(stats.hits, 0);
793 assert_eq!(stats.misses, 0);
794 assert_eq!(stats.additions, 0);
795
796 let content = b"stats test";
798 let address = LmdbStorage::compute_address(content);
799 protocol.payment_verifier().cache_insert(address);
800
801 let put_request = ChunkPutRequest::new(address, content.to_vec());
802 let put_msg = ChunkMessage {
803 request_id: 120,
804 body: ChunkMessageBody::PutRequest(put_request),
805 };
806 let put_bytes = put_msg.encode().expect("encode put");
807 let _ = protocol
808 .try_handle_request(&put_bytes)
809 .await
810 .expect("handle put");
811
812 let stats = protocol.payment_cache_stats();
813 assert_eq!(stats.additions, 1);
815 assert_eq!(stats.hits, 1);
816 }
817
818 #[tokio::test]
819 async fn test_storage_stats() {
820 let (protocol, _temp) = create_test_protocol().await;
821 let stats = protocol.storage_stats();
822 assert_eq!(stats.chunks_stored, 0);
823 }
824
825 #[tokio::test]
826 async fn test_merkle_candidate_quote_request() {
827 use crate::payment::quote::verify_merkle_candidate_signature;
828 use evmlib::merkle_payments::MerklePaymentCandidateNode;
829
830 let (protocol, _temp) = create_test_protocol().await;
832
833 let address = [0x77; 32];
834 let timestamp = std::time::SystemTime::now()
835 .duration_since(std::time::UNIX_EPOCH)
836 .expect("system time")
837 .as_secs();
838
839 let request = MerkleCandidateQuoteRequest {
840 address,
841 data_type: DATA_TYPE_CHUNK,
842 data_size: 4096,
843 merkle_payment_timestamp: timestamp,
844 };
845 let msg = ChunkMessage {
846 request_id: 600,
847 body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
848 };
849 let msg_bytes = msg.encode().expect("encode request");
850
851 let response_bytes = protocol
852 .try_handle_request(&msg_bytes)
853 .await
854 .expect("handle merkle candidate quote")
855 .expect("expected response");
856 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
857
858 assert_eq!(response.request_id, 600);
859 match response.body {
860 ChunkMessageBody::MerkleCandidateQuoteResponse(
861 MerkleCandidateQuoteResponse::Success { candidate_node },
862 ) => {
863 let candidate: MerklePaymentCandidateNode =
864 rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
865
866 assert!(
868 verify_merkle_candidate_signature(&candidate),
869 "ML-DSA-65 candidate signature must be valid"
870 );
871
872 assert_eq!(candidate.merkle_payment_timestamp, timestamp);
873 assert!(candidate.price >= evmlib::common::Amount::ZERO);
875 }
876 other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
877 }
878 }
879
880 #[tokio::test]
881 async fn test_handle_unexpected_response_message() {
882 let (protocol, _temp) = create_test_protocol().await;
883
884 let msg = ChunkMessage {
886 request_id: 200,
887 body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
888 };
889 let msg_bytes = msg.encode().expect("encode");
890
891 let result = protocol
892 .try_handle_request(&msg_bytes)
893 .await
894 .expect("handle msg");
895
896 assert!(
897 result.is_none(),
898 "expected None for response message, got: {result:?}"
899 );
900 }
901
902 #[tokio::test]
903 async fn test_quote_already_stored_flag() {
904 let (protocol, _temp) = create_test_protocol().await;
905
906 let content = b"already stored quote test";
907 let address = LmdbStorage::compute_address(content);
908
909 protocol.payment_verifier().cache_insert(address);
911 let put_request = ChunkPutRequest::new(address, content.to_vec());
912 let put_msg = ChunkMessage {
913 request_id: 300,
914 body: ChunkMessageBody::PutRequest(put_request),
915 };
916 let put_bytes = put_msg.encode().expect("encode put");
917 let _ = protocol
918 .try_handle_request(&put_bytes)
919 .await
920 .expect("handle put");
921
922 let quote_request = ChunkQuoteRequest {
924 address,
925 data_size: content.len() as u64,
926 data_type: DATA_TYPE_CHUNK,
927 };
928 let quote_msg = ChunkMessage {
929 request_id: 301,
930 body: ChunkMessageBody::QuoteRequest(quote_request),
931 };
932 let quote_bytes = quote_msg.encode().expect("encode quote");
933 let response_bytes = protocol
934 .try_handle_request("e_bytes)
935 .await
936 .expect("handle quote")
937 .expect("expected response");
938 let response = ChunkMessage::decode(&response_bytes).expect("decode");
939
940 match response.body {
941 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
942 already_stored, ..
943 }) => {
944 assert!(
945 already_stored,
946 "already_stored should be true for existing chunk"
947 );
948 }
949 other => panic!("expected Success with already_stored, got: {other:?}"),
950 }
951
952 let new_address = [0xFFu8; 32];
954 let quote_request2 = ChunkQuoteRequest {
955 address: new_address,
956 data_size: 100,
957 data_type: DATA_TYPE_CHUNK,
958 };
959 let quote_msg2 = ChunkMessage {
960 request_id: 302,
961 body: ChunkMessageBody::QuoteRequest(quote_request2),
962 };
963 let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
964 let response_bytes2 = protocol
965 .try_handle_request("e_bytes2)
966 .await
967 .expect("handle quote2")
968 .expect("expected response");
969 let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
970
971 match response2.body {
972 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
973 already_stored, ..
974 }) => {
975 assert!(
976 !already_stored,
977 "already_stored should be false for new chunk"
978 );
979 }
980 other => panic!("expected Success with already_stored=false, got: {other:?}"),
981 }
982 }
983}