1use crate::ant_protocol::{
30 ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
31 ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
32 MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK,
33 MAX_CHUNK_SIZE,
34};
35use crate::client::compute_address;
36use crate::error::{Error, Result};
37use crate::payment::{PaymentVerifier, QuoteGenerator};
38use crate::storage::lmdb::LmdbStorage;
39use bytes::Bytes;
40use std::sync::Arc;
41use tracing::{debug, info, warn};
42
43pub struct AntProtocol {
48 storage: Arc<LmdbStorage>,
50 payment_verifier: Arc<PaymentVerifier>,
52 quote_generator: Arc<QuoteGenerator>,
55}
56
57impl AntProtocol {
58 #[must_use]
66 pub fn new(
67 storage: Arc<LmdbStorage>,
68 payment_verifier: Arc<PaymentVerifier>,
69 quote_generator: Arc<QuoteGenerator>,
70 ) -> Self {
71 Self {
72 storage,
73 payment_verifier,
74 quote_generator,
75 }
76 }
77
78 #[must_use]
80 pub fn protocol_id(&self) -> &'static str {
81 CHUNK_PROTOCOL_ID
82 }
83
84 pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
95 let message = ChunkMessage::decode(data)
96 .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
97
98 let request_id = message.request_id;
99
100 let response_body = match message.body {
101 ChunkMessageBody::PutRequest(req) => {
102 ChunkMessageBody::PutResponse(self.handle_put(req).await)
103 }
104 ChunkMessageBody::GetRequest(req) => {
105 ChunkMessageBody::GetResponse(self.handle_get(req).await)
106 }
107 ChunkMessageBody::QuoteRequest(ref req) => {
108 ChunkMessageBody::QuoteResponse(self.handle_quote(req))
109 }
110 ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
111 ChunkMessageBody::MerkleCandidateQuoteResponse(
112 self.handle_merkle_candidate_quote(req),
113 )
114 }
115 ChunkMessageBody::PutResponse(_)
120 | ChunkMessageBody::GetResponse(_)
121 | ChunkMessageBody::QuoteResponse(_)
122 | ChunkMessageBody::MerkleCandidateQuoteResponse(_) => return Ok(None),
123 };
124
125 let response = ChunkMessage {
126 request_id,
127 body: response_body,
128 };
129
130 response
131 .encode()
132 .map(|b| Some(Bytes::from(b)))
133 .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
134 }
135
136 pub async fn handle_message(&self, data: &[u8]) -> Result<Bytes> {
147 let message = ChunkMessage::decode(data)
148 .map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
149
150 let request_id = message.request_id;
151
152 let response_body = match message.body {
153 ChunkMessageBody::PutRequest(req) => {
154 ChunkMessageBody::PutResponse(self.handle_put(req).await)
155 }
156 ChunkMessageBody::GetRequest(req) => {
157 ChunkMessageBody::GetResponse(self.handle_get(req).await)
158 }
159 ChunkMessageBody::QuoteRequest(ref req) => {
160 ChunkMessageBody::QuoteResponse(self.handle_quote(req))
161 }
162 ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
163 ChunkMessageBody::MerkleCandidateQuoteResponse(
164 self.handle_merkle_candidate_quote(req),
165 )
166 }
167 ChunkMessageBody::PutResponse(_)
168 | ChunkMessageBody::GetResponse(_)
169 | ChunkMessageBody::QuoteResponse(_)
170 | ChunkMessageBody::MerkleCandidateQuoteResponse(_) => {
171 let error = ProtocolError::Internal("Unexpected response message".to_string());
172 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(error))
173 }
174 };
175
176 let response = ChunkMessage {
177 request_id,
178 body: response_body,
179 };
180
181 response
182 .encode()
183 .map(Bytes::from)
184 .map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
185 }
186
187 async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
189 let address = request.address;
190 let addr_hex = hex::encode(address);
191 debug!("Handling PUT request for {addr_hex}");
192
193 if request.content.len() > MAX_CHUNK_SIZE {
195 return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
196 size: request.content.len(),
197 max_size: MAX_CHUNK_SIZE,
198 });
199 }
200
201 let computed = compute_address(&request.content);
203 if computed != address {
204 return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
205 expected: address,
206 actual: computed,
207 });
208 }
209
210 match self.storage.exists(&address) {
212 Ok(true) => {
213 debug!("Chunk {addr_hex} already exists");
214 return ChunkPutResponse::AlreadyExists { address };
215 }
216 Err(e) => {
217 return ChunkPutResponse::Error(ProtocolError::Internal(format!(
218 "Storage read failed: {e}"
219 )));
220 }
221 Ok(false) => {}
222 }
223
224 let payment_result = self
226 .payment_verifier
227 .verify_payment(&address, request.payment_proof.as_deref())
228 .await;
229
230 match payment_result {
231 Ok(status) if status.can_store() => {
232 }
234 Ok(_) => {
235 return ChunkPutResponse::PaymentRequired {
236 message: "Payment required for new chunk".to_string(),
237 };
238 }
239 Err(e) => {
240 return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
241 }
242 }
243
244 match self.storage.put(&address, &request.content).await {
246 Ok(_) => {
247 let content_len = request.content.len();
248 info!("Stored chunk {addr_hex} ({content_len} bytes)");
249 self.quote_generator.record_store(DATA_TYPE_CHUNK);
251 self.quote_generator.record_payment();
252 ChunkPutResponse::Success { address }
253 }
254 Err(e) => {
255 warn!("Failed to store chunk {addr_hex}: {e}");
256 ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
257 }
258 }
259 }
260
261 async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
263 let address = request.address;
264 let addr_hex = hex::encode(address);
265 debug!("Handling GET request for {addr_hex}");
266
267 match self.storage.get(&address).await {
268 Ok(Some(content)) => {
269 let content_len = content.len();
270 debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
271 ChunkGetResponse::Success { address, content }
272 }
273 Ok(None) => {
274 debug!("Chunk {addr_hex} not found");
275 ChunkGetResponse::NotFound { address }
276 }
277 Err(e) => {
278 warn!("Failed to retrieve chunk {addr_hex}: {e}");
279 ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
280 }
281 }
282 }
283
284 fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
286 let addr_hex = hex::encode(request.address);
287 let data_size = request.data_size;
288 debug!("Handling quote request for {addr_hex} (size: {data_size})");
289
290 let already_stored = match self.storage.exists(&request.address) {
293 Ok(exists) => exists,
294 Err(e) => {
295 warn!("Storage check failed for {addr_hex}: {e}");
296 false }
298 };
299
300 if already_stored {
301 debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
302 }
303
304 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
306 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
307 size: MAX_CHUNK_SIZE + 1,
308 max_size: MAX_CHUNK_SIZE,
309 });
310 };
311 if data_size_usize > MAX_CHUNK_SIZE {
312 return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
313 size: data_size_usize,
314 max_size: MAX_CHUNK_SIZE,
315 });
316 }
317
318 match self
319 .quote_generator
320 .create_quote(request.address, data_size_usize, request.data_type)
321 {
322 Ok(quote) => {
323 match rmp_serde::to_vec("e) {
325 Ok(quote_bytes) => ChunkQuoteResponse::Success {
326 quote: quote_bytes,
327 already_stored,
328 },
329 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
330 "Failed to serialize quote: {e}"
331 ))),
332 }
333 }
334 Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
335 }
336 }
337
338 fn handle_merkle_candidate_quote(
340 &self,
341 request: &MerkleCandidateQuoteRequest,
342 ) -> MerkleCandidateQuoteResponse {
343 let addr_hex = hex::encode(request.address);
344 let data_size = request.data_size;
345 debug!(
346 "Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
347 request.merkle_payment_timestamp
348 );
349
350 let Ok(data_size_usize) = usize::try_from(request.data_size) else {
351 return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
352 "data_size {} overflows usize",
353 request.data_size
354 )));
355 };
356 if data_size_usize > MAX_CHUNK_SIZE {
357 return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
358 size: data_size_usize,
359 max_size: MAX_CHUNK_SIZE,
360 });
361 }
362
363 match self.quote_generator.create_merkle_candidate_quote(
364 data_size_usize,
365 request.data_type,
366 request.merkle_payment_timestamp,
367 ) {
368 Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
369 Ok(bytes) => MerkleCandidateQuoteResponse::Success {
370 candidate_node: bytes,
371 },
372 Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
373 "Failed to serialize merkle candidate node: {e}"
374 ))),
375 },
376 Err(e) => {
377 MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
378 }
379 }
380 }
381
382 #[must_use]
384 pub fn storage_stats(&self) -> crate::storage::StorageStats {
385 self.storage.stats()
386 }
387
388 #[must_use]
390 pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
391 self.payment_verifier.cache_stats()
392 }
393
/// Borrows the payment verifier held by this handler.
///
/// Only compiled for tests or when the `test-utils` feature is enabled.
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn payment_verifier(&self) -> &PaymentVerifier {
    self.payment_verifier.as_ref()
}
404
405 pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
411 self.storage.exists(address)
412 }
413
414 pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
420 self.storage.get(address).await
421 }
422
/// Writes `content` at `address` directly through storage, without going
/// through any of the checks in the PUT handler. Test builds only.
///
/// The returned flag is whatever the storage `put` call reports.
///
/// # Errors
///
/// Returns whatever error the storage write produces.
#[cfg(test)]
pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
    let write = self.storage.put(address, content);
    write.await
}
434}
435
436#[cfg(test)]
437#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
438mod tests {
439 use super::*;
440 use crate::payment::metrics::QuotingMetricsTracker;
441 use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
442 use crate::storage::LmdbStorageConfig;
443 use ant_evm::RewardsAddress;
444 use saorsa_core::identity::NodeIdentity;
445 use saorsa_core::MlDsa65;
446 use saorsa_pqc::pqc::types::MlDsaSecretKey;
447 use tempfile::TempDir;
448
449 async fn create_test_protocol() -> (AntProtocol, TempDir) {
450 let temp_dir = TempDir::new().expect("create temp dir");
451
452 let storage_config = LmdbStorageConfig {
453 root_dir: temp_dir.path().to_path_buf(),
454 verify_on_read: true,
455 max_chunks: 0,
456 max_map_size: 0,
457 };
458 let storage = Arc::new(
459 LmdbStorage::new(storage_config)
460 .await
461 .expect("create storage"),
462 );
463
464 let rewards_address = RewardsAddress::new([1u8; 20]);
465 let payment_config = PaymentVerifierConfig {
466 evm: EvmVerifierConfig::default(),
467 cache_capacity: 100_000,
468 local_rewards_address: rewards_address,
469 };
470 let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
471 let metrics_tracker = QuotingMetricsTracker::new(1000, 100);
472 let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
473
474 let identity = NodeIdentity::generate().expect("generate identity");
476 let pub_key_bytes = identity.public_key().as_bytes().to_vec();
477 let sk_bytes = identity.secret_key_bytes().to_vec();
478 let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
479 quote_generator.set_signer(pub_key_bytes, move |msg| {
480 use saorsa_pqc::pqc::MlDsaOperations;
481 let ml_dsa = MlDsa65::new();
482 ml_dsa
483 .sign(&sk, msg)
484 .map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
485 });
486
487 let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
488 (protocol, temp_dir)
489 }
490
491 #[tokio::test]
492 async fn test_put_and_get_chunk() {
493 let (protocol, _temp) = create_test_protocol().await;
494
495 let content = b"hello world";
496 let address = LmdbStorage::compute_address(content);
497
498 protocol.payment_verifier().cache_insert(address);
500
501 let put_request = ChunkPutRequest::new(address, content.to_vec());
502 let put_msg = ChunkMessage {
503 request_id: 1,
504 body: ChunkMessageBody::PutRequest(put_request),
505 };
506 let put_bytes = put_msg.encode().expect("encode put");
507
508 let response_bytes = protocol
510 .handle_message(&put_bytes)
511 .await
512 .expect("handle put");
513 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
514
515 assert_eq!(response.request_id, 1);
516 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
517 response.body
518 {
519 assert_eq!(addr, address);
520 } else {
521 panic!("expected PutResponse::Success, got: {response:?}");
522 }
523
524 let get_request = ChunkGetRequest::new(address);
526 let get_msg = ChunkMessage {
527 request_id: 2,
528 body: ChunkMessageBody::GetRequest(get_request),
529 };
530 let get_bytes = get_msg.encode().expect("encode get");
531
532 let response_bytes = protocol
534 .handle_message(&get_bytes)
535 .await
536 .expect("handle get");
537 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
538
539 assert_eq!(response.request_id, 2);
540 if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
541 address: addr,
542 content: data,
543 }) = response.body
544 {
545 assert_eq!(addr, address);
546 assert_eq!(data, content.to_vec());
547 } else {
548 panic!("expected GetResponse::Success");
549 }
550 }
551
552 #[tokio::test]
553 async fn test_get_not_found() {
554 let (protocol, _temp) = create_test_protocol().await;
555
556 let address = [0xAB; 32];
557 let get_request = ChunkGetRequest::new(address);
558 let get_msg = ChunkMessage {
559 request_id: 10,
560 body: ChunkMessageBody::GetRequest(get_request),
561 };
562 let get_bytes = get_msg.encode().expect("encode get");
563
564 let response_bytes = protocol
565 .handle_message(&get_bytes)
566 .await
567 .expect("handle get");
568 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
569
570 assert_eq!(response.request_id, 10);
571 if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
572 response.body
573 {
574 assert_eq!(addr, address);
575 } else {
576 panic!("expected GetResponse::NotFound");
577 }
578 }
579
580 #[tokio::test]
581 async fn test_put_address_mismatch() {
582 let (protocol, _temp) = create_test_protocol().await;
583
584 let content = b"test content";
585 let wrong_address = [0xFF; 32]; protocol.payment_verifier().cache_insert(wrong_address);
589
590 let put_request = ChunkPutRequest::new(wrong_address, content.to_vec());
591 let put_msg = ChunkMessage {
592 request_id: 20,
593 body: ChunkMessageBody::PutRequest(put_request),
594 };
595 let put_bytes = put_msg.encode().expect("encode put");
596
597 let response_bytes = protocol
598 .handle_message(&put_bytes)
599 .await
600 .expect("handle put");
601 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
602
603 assert_eq!(response.request_id, 20);
604 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
605 ProtocolError::AddressMismatch { .. },
606 )) = response.body
607 {
608 } else {
610 panic!("expected AddressMismatch error, got: {response:?}");
611 }
612 }
613
614 #[tokio::test]
615 async fn test_put_chunk_too_large() {
616 let (protocol, _temp) = create_test_protocol().await;
617
618 let content = vec![0u8; MAX_CHUNK_SIZE + 1];
620 let address = LmdbStorage::compute_address(&content);
621
622 let put_request = ChunkPutRequest::new(address, content);
623 let put_msg = ChunkMessage {
624 request_id: 30,
625 body: ChunkMessageBody::PutRequest(put_request),
626 };
627 let put_bytes = put_msg.encode().expect("encode put");
628
629 let response_bytes = protocol
630 .handle_message(&put_bytes)
631 .await
632 .expect("handle put");
633 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
634
635 assert_eq!(response.request_id, 30);
636 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
637 ProtocolError::ChunkTooLarge { .. },
638 )) = response.body
639 {
640 } else {
642 panic!("expected ChunkTooLarge error");
643 }
644 }
645
646 #[tokio::test]
647 async fn test_put_already_exists() {
648 let (protocol, _temp) = create_test_protocol().await;
649
650 let content = b"duplicate content";
651 let address = LmdbStorage::compute_address(content);
652
653 protocol.payment_verifier().cache_insert(address);
655
656 let put_request = ChunkPutRequest::new(address, content.to_vec());
657 let put_msg = ChunkMessage {
658 request_id: 40,
659 body: ChunkMessageBody::PutRequest(put_request),
660 };
661 let put_bytes = put_msg.encode().expect("encode put");
662
663 let _ = protocol
664 .handle_message(&put_bytes)
665 .await
666 .expect("handle put");
667
668 let response_bytes = protocol
670 .handle_message(&put_bytes)
671 .await
672 .expect("handle put 2");
673 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
674
675 assert_eq!(response.request_id, 40);
676 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
677 response.body
678 {
679 assert_eq!(addr, address);
680 } else {
681 panic!("expected AlreadyExists");
682 }
683 }
684
685 #[tokio::test]
686 async fn test_protocol_id() {
687 let (protocol, _temp) = create_test_protocol().await;
688 assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
689 }
690
691 #[tokio::test]
692 async fn test_exists_and_local_access() {
693 let (protocol, _temp) = create_test_protocol().await;
694
695 let content = b"local access test";
696 let address = LmdbStorage::compute_address(content);
697
698 assert!(!protocol.exists(&address).expect("exists check"));
699
700 protocol
701 .put_local(&address, content)
702 .await
703 .expect("put local");
704
705 assert!(protocol.exists(&address).expect("exists check"));
706
707 let retrieved = protocol.get_local(&address).await.expect("get local");
708 assert_eq!(retrieved, Some(content.to_vec()));
709 }
710
711 #[tokio::test]
712 async fn test_cache_insert_is_visible() {
713 let (protocol, _temp) = create_test_protocol().await;
714
715 let content = b"cache test content";
716 let address = LmdbStorage::compute_address(content);
717
718 let stats_before = protocol.payment_cache_stats();
720 assert_eq!(stats_before.additions, 0);
721
722 protocol.payment_verifier().cache_insert(address);
724
725 let stats_after = protocol.payment_cache_stats();
727 assert_eq!(stats_after.additions, 1);
728
729 let put_request = ChunkPutRequest::new(address, content.to_vec());
731 let put_msg = ChunkMessage {
732 request_id: 100,
733 body: ChunkMessageBody::PutRequest(put_request),
734 };
735 let put_bytes = put_msg.encode().expect("encode put");
736 let response_bytes = protocol
737 .handle_message(&put_bytes)
738 .await
739 .expect("handle put");
740 let response = ChunkMessage::decode(&response_bytes).expect("decode");
741
742 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
743 } else {
745 panic!("expected success, got: {response:?}");
746 }
747 }
748
749 #[tokio::test]
750 async fn test_put_same_chunk_twice_hits_cache() {
751 let (protocol, _temp) = create_test_protocol().await;
752
753 let content = b"duplicate cache test";
754 let address = LmdbStorage::compute_address(content);
755
756 protocol.payment_verifier().cache_insert(address);
758
759 let put_request = ChunkPutRequest::new(address, content.to_vec());
761 let put_msg = ChunkMessage {
762 request_id: 110,
763 body: ChunkMessageBody::PutRequest(put_request),
764 };
765 let put_bytes = put_msg.encode().expect("encode put");
766 let _ = protocol
767 .handle_message(&put_bytes)
768 .await
769 .expect("handle put 1");
770
771 let response_bytes = protocol
773 .handle_message(&put_bytes)
774 .await
775 .expect("handle put 2");
776 let response = ChunkMessage::decode(&response_bytes).expect("decode");
777
778 if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
779 {
780 } else {
782 panic!("expected AlreadyExists, got: {response:?}");
783 }
784 }
785
786 #[tokio::test]
787 async fn test_payment_cache_stats_returns_correct_values() {
788 let (protocol, _temp) = create_test_protocol().await;
789
790 let stats = protocol.payment_cache_stats();
791 assert_eq!(stats.hits, 0);
792 assert_eq!(stats.misses, 0);
793 assert_eq!(stats.additions, 0);
794
795 let content = b"stats test";
797 let address = LmdbStorage::compute_address(content);
798 protocol.payment_verifier().cache_insert(address);
799
800 let put_request = ChunkPutRequest::new(address, content.to_vec());
801 let put_msg = ChunkMessage {
802 request_id: 120,
803 body: ChunkMessageBody::PutRequest(put_request),
804 };
805 let put_bytes = put_msg.encode().expect("encode put");
806 let _ = protocol
807 .handle_message(&put_bytes)
808 .await
809 .expect("handle put");
810
811 let stats = protocol.payment_cache_stats();
812 assert_eq!(stats.additions, 1);
814 assert_eq!(stats.hits, 1);
815 }
816
817 #[tokio::test]
818 async fn test_storage_stats() {
819 let (protocol, _temp) = create_test_protocol().await;
820 let stats = protocol.storage_stats();
821 assert_eq!(stats.chunks_stored, 0);
822 }
823
824 #[tokio::test]
825 async fn test_merkle_candidate_quote_request() {
826 use crate::payment::quote::verify_merkle_candidate_signature;
827 use ant_evm::merkle_payments::MerklePaymentCandidateNode;
828
829 let (protocol, _temp) = create_test_protocol().await;
831
832 let address = [0x77; 32];
833 let timestamp = std::time::SystemTime::now()
834 .duration_since(std::time::UNIX_EPOCH)
835 .expect("system time")
836 .as_secs();
837
838 let request = MerkleCandidateQuoteRequest {
839 address,
840 data_type: DATA_TYPE_CHUNK,
841 data_size: 4096,
842 merkle_payment_timestamp: timestamp,
843 };
844 let msg = ChunkMessage {
845 request_id: 600,
846 body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
847 };
848 let msg_bytes = msg.encode().expect("encode request");
849
850 let response_bytes = protocol
851 .handle_message(&msg_bytes)
852 .await
853 .expect("handle merkle candidate quote");
854 let response = ChunkMessage::decode(&response_bytes).expect("decode response");
855
856 assert_eq!(response.request_id, 600);
857 match response.body {
858 ChunkMessageBody::MerkleCandidateQuoteResponse(
859 MerkleCandidateQuoteResponse::Success { candidate_node },
860 ) => {
861 let candidate: MerklePaymentCandidateNode =
862 rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
863
864 assert!(
866 verify_merkle_candidate_signature(&candidate),
867 "ML-DSA-65 candidate signature must be valid"
868 );
869
870 assert_eq!(candidate.merkle_payment_timestamp, timestamp);
871 assert_eq!(candidate.quoting_metrics.data_size, 4096);
872 assert_eq!(candidate.quoting_metrics.data_type, DATA_TYPE_CHUNK);
873 }
874 other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
875 }
876 }
877
878 #[tokio::test]
879 async fn test_handle_unexpected_response_message() {
880 let (protocol, _temp) = create_test_protocol().await;
881
882 let msg = ChunkMessage {
884 request_id: 200,
885 body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
886 };
887 let msg_bytes = msg.encode().expect("encode");
888
889 let response_bytes = protocol
890 .handle_message(&msg_bytes)
891 .await
892 .expect("handle msg");
893 let response = ChunkMessage::decode(&response_bytes).expect("decode");
894
895 if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(ProtocolError::Internal(
896 msg,
897 ))) = response.body
898 {
899 assert!(msg.contains("Unexpected"));
900 } else {
901 panic!("expected Internal error, got: {response:?}");
902 }
903 }
904
905 #[tokio::test]
906 async fn test_quote_already_stored_flag() {
907 let (protocol, _temp) = create_test_protocol().await;
908
909 let content = b"already stored quote test";
910 let address = LmdbStorage::compute_address(content);
911
912 protocol.payment_verifier().cache_insert(address);
914 let put_request = ChunkPutRequest::new(address, content.to_vec());
915 let put_msg = ChunkMessage {
916 request_id: 300,
917 body: ChunkMessageBody::PutRequest(put_request),
918 };
919 let put_bytes = put_msg.encode().expect("encode put");
920 let _ = protocol
921 .handle_message(&put_bytes)
922 .await
923 .expect("handle put");
924
925 let quote_request = ChunkQuoteRequest {
927 address,
928 data_size: content.len() as u64,
929 data_type: DATA_TYPE_CHUNK,
930 };
931 let quote_msg = ChunkMessage {
932 request_id: 301,
933 body: ChunkMessageBody::QuoteRequest(quote_request),
934 };
935 let quote_bytes = quote_msg.encode().expect("encode quote");
936 let response_bytes = protocol
937 .handle_message("e_bytes)
938 .await
939 .expect("handle quote");
940 let response = ChunkMessage::decode(&response_bytes).expect("decode");
941
942 match response.body {
943 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
944 already_stored, ..
945 }) => {
946 assert!(
947 already_stored,
948 "already_stored should be true for existing chunk"
949 );
950 }
951 other => panic!("expected Success with already_stored, got: {other:?}"),
952 }
953
954 let new_address = [0xFFu8; 32];
956 let quote_request2 = ChunkQuoteRequest {
957 address: new_address,
958 data_size: 100,
959 data_type: DATA_TYPE_CHUNK,
960 };
961 let quote_msg2 = ChunkMessage {
962 request_id: 302,
963 body: ChunkMessageBody::QuoteRequest(quote_request2),
964 };
965 let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
966 let response_bytes2 = protocol
967 .handle_message("e_bytes2)
968 .await
969 .expect("handle quote2");
970 let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
971
972 match response2.body {
973 ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
974 already_stored, ..
975 }) => {
976 assert!(
977 !already_stored,
978 "already_stored should be false for new chunk"
979 );
980 }
981 other => panic!("expected Success with already_stored=false, got: {other:?}"),
982 }
983 }
984}