use crate::ant_protocol::{
ChunkGetRequest, ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest,
ChunkPutResponse, ChunkQuoteRequest, ChunkQuoteResponse, MerkleCandidateQuoteRequest,
MerkleCandidateQuoteResponse, ProtocolError, CHUNK_PROTOCOL_ID, DATA_TYPE_CHUNK,
MAX_CHUNK_SIZE,
};
use crate::client::compute_address;
use crate::error::{Error, Result};
use crate::logging::{debug, info, warn};
use crate::payment::{PaymentVerifier, QuoteGenerator};
use crate::replication::fresh::FreshWriteEvent;
use crate::storage::lmdb::LmdbStorage;
use bytes::Bytes;
use std::sync::Arc;
use tokio::sync::mpsc;
pub struct AntProtocol {
storage: Arc<LmdbStorage>,
payment_verifier: Arc<PaymentVerifier>,
quote_generator: Arc<QuoteGenerator>,
fresh_write_tx: Option<mpsc::UnboundedSender<FreshWriteEvent>>,
}
impl AntProtocol {
#[must_use]
pub fn new(
storage: Arc<LmdbStorage>,
payment_verifier: Arc<PaymentVerifier>,
quote_generator: Arc<QuoteGenerator>,
) -> Self {
Self {
storage,
payment_verifier,
quote_generator,
fresh_write_tx: None,
}
}
pub fn set_fresh_write_sender(&mut self, tx: mpsc::UnboundedSender<FreshWriteEvent>) {
self.fresh_write_tx = Some(tx);
}
#[must_use]
pub fn protocol_id(&self) -> &'static str {
CHUNK_PROTOCOL_ID
}
#[must_use]
pub fn storage(&self) -> Arc<LmdbStorage> {
Arc::clone(&self.storage)
}
#[must_use]
pub fn payment_verifier_arc(&self) -> Arc<PaymentVerifier> {
Arc::clone(&self.payment_verifier)
}
pub async fn try_handle_request(&self, data: &[u8]) -> Result<Option<Bytes>> {
let message = ChunkMessage::decode(data)
.map_err(|e| Error::Protocol(format!("Failed to decode message: {e}")))?;
let request_id = message.request_id;
let response_body = match message.body {
ChunkMessageBody::PutRequest(req) => {
ChunkMessageBody::PutResponse(self.handle_put(req).await)
}
ChunkMessageBody::GetRequest(req) => {
ChunkMessageBody::GetResponse(self.handle_get(req).await)
}
ChunkMessageBody::QuoteRequest(ref req) => {
ChunkMessageBody::QuoteResponse(self.handle_quote(req))
}
ChunkMessageBody::MerkleCandidateQuoteRequest(ref req) => {
ChunkMessageBody::MerkleCandidateQuoteResponse(
self.handle_merkle_candidate_quote(req),
)
}
ChunkMessageBody::PutResponse(_)
| ChunkMessageBody::GetResponse(_)
| ChunkMessageBody::QuoteResponse(_)
| ChunkMessageBody::MerkleCandidateQuoteResponse(_) => return Ok(None),
};
let response = ChunkMessage {
request_id,
body: response_body,
};
response
.encode()
.map(|b| Some(Bytes::from(b)))
.map_err(|e| Error::Protocol(format!("Failed to encode response: {e}")))
}
async fn handle_put(&self, request: ChunkPutRequest) -> ChunkPutResponse {
let address = request.address;
let addr_hex = hex::encode(address);
debug!("Handling PUT request for {addr_hex}");
if request.content.len() > MAX_CHUNK_SIZE {
return ChunkPutResponse::Error(ProtocolError::ChunkTooLarge {
size: request.content.len(),
max_size: MAX_CHUNK_SIZE,
});
}
let computed = compute_address(&request.content);
if computed != address {
return ChunkPutResponse::Error(ProtocolError::AddressMismatch {
expected: address,
actual: computed,
});
}
match self.storage.exists(&address) {
Ok(true) => {
debug!("Chunk {addr_hex} already exists");
return ChunkPutResponse::AlreadyExists { address };
}
Err(e) => {
return ChunkPutResponse::Error(ProtocolError::Internal(format!(
"Storage read failed: {e}"
)));
}
Ok(false) => {}
}
let payment_result = self
.payment_verifier
.verify_payment(&address, request.payment_proof.as_deref())
.await;
match payment_result {
Ok(status) if status.can_store() => {
}
Ok(_) => {
return ChunkPutResponse::PaymentRequired {
message: "Payment required for new chunk".to_string(),
};
}
Err(e) => {
return ChunkPutResponse::Error(ProtocolError::PaymentFailed(e.to_string()));
}
}
match self.storage.put(&address, &request.content).await {
Ok(_) => {
let content_len = request.content.len();
info!("Stored chunk {addr_hex} ({content_len} bytes)");
self.quote_generator.record_store(DATA_TYPE_CHUNK);
self.quote_generator.record_payment();
if let (Some(ref tx), Some(proof)) = (&self.fresh_write_tx, request.payment_proof) {
let event = FreshWriteEvent {
key: address,
data: request.content,
payment_proof: proof,
};
if tx.send(event).is_err() {
debug!("Fresh-write channel closed, skipping replication for {addr_hex}");
}
}
ChunkPutResponse::Success { address }
}
Err(e) => {
warn!("Failed to store chunk {addr_hex}: {e}");
ChunkPutResponse::Error(ProtocolError::StorageFailed(e.to_string()))
}
}
}
async fn handle_get(&self, request: ChunkGetRequest) -> ChunkGetResponse {
let address = request.address;
let addr_hex = hex::encode(address);
debug!("Handling GET request for {addr_hex}");
match self.storage.get(&address).await {
Ok(Some(content)) => {
let content_len = content.len();
debug!("Retrieved chunk {addr_hex} ({content_len} bytes)");
ChunkGetResponse::Success { address, content }
}
Ok(None) => {
debug!("Chunk {addr_hex} not found");
ChunkGetResponse::NotFound { address }
}
Err(e) => {
warn!("Failed to retrieve chunk {addr_hex}: {e}");
ChunkGetResponse::Error(ProtocolError::StorageFailed(e.to_string()))
}
}
}
fn handle_quote(&self, request: &ChunkQuoteRequest) -> ChunkQuoteResponse {
let addr_hex = hex::encode(request.address);
let data_size = request.data_size;
debug!("Handling quote request for {addr_hex} (size: {data_size})");
#[allow(clippy::manual_unwrap_or_default)]
let already_stored = match self.storage.exists(&request.address) {
Ok(exists) => exists,
Err(e) => {
warn!("Storage check failed for {addr_hex}: {e}");
false }
};
if already_stored {
debug!("Chunk {addr_hex} already stored — returning quote with already_stored=true");
}
let Ok(data_size_usize) = usize::try_from(request.data_size) else {
return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
size: MAX_CHUNK_SIZE + 1,
max_size: MAX_CHUNK_SIZE,
});
};
if data_size_usize > MAX_CHUNK_SIZE {
return ChunkQuoteResponse::Error(ProtocolError::ChunkTooLarge {
size: data_size_usize,
max_size: MAX_CHUNK_SIZE,
});
}
match self
.quote_generator
.create_quote(request.address, data_size_usize, request.data_type)
{
Ok(quote) => {
match rmp_serde::to_vec("e) {
Ok(quote_bytes) => ChunkQuoteResponse::Success {
quote: quote_bytes,
already_stored,
},
Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
"Failed to serialize quote: {e}"
))),
}
}
Err(e) => ChunkQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string())),
}
}
fn handle_merkle_candidate_quote(
&self,
request: &MerkleCandidateQuoteRequest,
) -> MerkleCandidateQuoteResponse {
let addr_hex = hex::encode(request.address);
let data_size = request.data_size;
debug!(
"Handling merkle candidate quote request for {addr_hex} (size: {data_size}, ts: {})",
request.merkle_payment_timestamp
);
let Ok(data_size_usize) = usize::try_from(request.data_size) else {
return MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
"data_size {} overflows usize",
request.data_size
)));
};
if data_size_usize > MAX_CHUNK_SIZE {
return MerkleCandidateQuoteResponse::Error(ProtocolError::ChunkTooLarge {
size: data_size_usize,
max_size: MAX_CHUNK_SIZE,
});
}
match self.quote_generator.create_merkle_candidate_quote(
data_size_usize,
request.data_type,
request.merkle_payment_timestamp,
) {
Ok(candidate_node) => match rmp_serde::to_vec(&candidate_node) {
Ok(bytes) => MerkleCandidateQuoteResponse::Success {
candidate_node: bytes,
},
Err(e) => MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(format!(
"Failed to serialize merkle candidate node: {e}"
))),
},
Err(e) => {
MerkleCandidateQuoteResponse::Error(ProtocolError::QuoteFailed(e.to_string()))
}
}
}
#[must_use]
pub fn storage_stats(&self) -> crate::storage::StorageStats {
self.storage.stats()
}
#[must_use]
pub fn payment_cache_stats(&self) -> crate::payment::CacheStats {
self.payment_verifier.cache_stats()
}
#[cfg(any(test, feature = "test-utils"))]
#[must_use]
pub fn payment_verifier(&self) -> &PaymentVerifier {
&self.payment_verifier
}
pub fn exists(&self, address: &[u8; 32]) -> Result<bool> {
self.storage.exists(address)
}
pub async fn get_local(&self, address: &[u8; 32]) -> Result<Option<Vec<u8>>> {
self.storage.get(address).await
}
#[cfg(test)]
pub async fn put_local(&self, address: &[u8; 32], content: &[u8]) -> Result<bool> {
self.storage.put(address, content).await
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::payment::metrics::QuotingMetricsTracker;
use crate::payment::{EvmVerifierConfig, PaymentVerifierConfig};
use crate::storage::LmdbStorageConfig;
use evmlib::RewardsAddress;
use saorsa_core::identity::NodeIdentity;
use saorsa_core::MlDsa65;
use saorsa_pqc::pqc::types::MlDsaSecretKey;
use tempfile::TempDir;
async fn create_test_protocol() -> (AntProtocol, TempDir) {
let temp_dir = TempDir::new().expect("create temp dir");
let storage_config = LmdbStorageConfig {
root_dir: temp_dir.path().to_path_buf(),
..LmdbStorageConfig::test_default()
};
let storage = Arc::new(
LmdbStorage::new(storage_config)
.await
.expect("create storage"),
);
let rewards_address = RewardsAddress::new([1u8; 20]);
let payment_config = PaymentVerifierConfig {
evm: EvmVerifierConfig::default(),
cache_capacity: 100_000,
local_rewards_address: rewards_address,
};
let payment_verifier = Arc::new(PaymentVerifier::new(payment_config));
let metrics_tracker = QuotingMetricsTracker::new(100);
let mut quote_generator = QuoteGenerator::new(rewards_address, metrics_tracker);
let identity = NodeIdentity::generate().expect("generate identity");
let pub_key_bytes = identity.public_key().as_bytes().to_vec();
let sk_bytes = identity.secret_key_bytes().to_vec();
let sk = MlDsaSecretKey::from_bytes(&sk_bytes).expect("deserialize secret key");
quote_generator.set_signer(pub_key_bytes, move |msg| {
use saorsa_pqc::pqc::MlDsaOperations;
let ml_dsa = MlDsa65::new();
ml_dsa
.sign(&sk, msg)
.map_or_else(|_| vec![], |sig| sig.as_bytes().to_vec())
});
let protocol = AntProtocol::new(storage, payment_verifier, Arc::new(quote_generator));
(protocol, temp_dir)
}
#[tokio::test]
async fn test_put_and_get_chunk() {
let (protocol, _temp) = create_test_protocol().await;
let content = b"hello world";
let address = LmdbStorage::compute_address(content);
protocol.payment_verifier().cache_insert(address);
let put_request = ChunkPutRequest::new(address, content.to_vec());
let put_msg = ChunkMessage {
request_id: 1,
body: ChunkMessageBody::PutRequest(put_request),
};
let put_bytes = put_msg.encode().expect("encode put");
let response_bytes = protocol
.try_handle_request(&put_bytes)
.await
.expect("handle put")
.expect("expected response");
let response = ChunkMessage::decode(&response_bytes).expect("decode response");
assert_eq!(response.request_id, 1);
if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) =
response.body
{
assert_eq!(addr, address);
} else {
panic!("expected PutResponse::Success, got: {response:?}");
}
let get_request = ChunkGetRequest::new(address);
let get_msg = ChunkMessage {
request_id: 2,
body: ChunkMessageBody::GetRequest(get_request),
};
let get_bytes = get_msg.encode().expect("encode get");
let response_bytes = protocol
.try_handle_request(&get_bytes)
.await
.expect("handle get")
.expect("expected response");
let response = ChunkMessage::decode(&response_bytes).expect("decode response");
assert_eq!(response.request_id, 2);
if let ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
address: addr,
content: data,
}) = response.body
{
assert_eq!(addr, address);
assert_eq!(data, content.to_vec());
} else {
panic!("expected GetResponse::Success");
}
}
#[tokio::test]
async fn test_get_not_found() {
let (protocol, _temp) = create_test_protocol().await;
let address = [0xAB; 32];
let get_request = ChunkGetRequest::new(address);
let get_msg = ChunkMessage {
request_id: 10,
body: ChunkMessageBody::GetRequest(get_request),
};
let get_bytes = get_msg.encode().expect("encode get");
let response_bytes = protocol
.try_handle_request(&get_bytes)
.await
.expect("handle get")
.expect("expected response");
let response = ChunkMessage::decode(&response_bytes).expect("decode response");
assert_eq!(response.request_id, 10);
if let ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { address: addr }) =
response.body
{
assert_eq!(addr, address);
} else {
panic!("expected GetResponse::NotFound");
}
}
#[tokio::test]
async fn test_put_address_mismatch() {
let (protocol, _temp) = create_test_protocol().await;
let content = b"test content";
let wrong_address = [0xFF; 32];
protocol.payment_verifier().cache_insert(wrong_address);
let put_request = ChunkPutRequest::new(wrong_address, content.to_vec());
let put_msg = ChunkMessage {
request_id: 20,
body: ChunkMessageBody::PutRequest(put_request),
};
let put_bytes = put_msg.encode().expect("encode put");
let response_bytes = protocol
.try_handle_request(&put_bytes)
.await
.expect("handle put")
.expect("expected response");
let response = ChunkMessage::decode(&response_bytes).expect("decode response");
assert_eq!(response.request_id, 20);
if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
ProtocolError::AddressMismatch { .. },
)) = response.body
{
} else {
panic!("expected AddressMismatch error, got: {response:?}");
}
}
#[tokio::test]
async fn test_put_chunk_too_large() {
let (protocol, _temp) = create_test_protocol().await;
let content = vec![0u8; MAX_CHUNK_SIZE + 1];
let address = LmdbStorage::compute_address(&content);
let put_request = ChunkPutRequest::new(address, content);
let put_msg = ChunkMessage {
request_id: 30,
body: ChunkMessageBody::PutRequest(put_request),
};
let put_bytes = put_msg.encode().expect("encode put");
let response_bytes = protocol
.try_handle_request(&put_bytes)
.await
.expect("handle put")
.expect("expected response");
let response = ChunkMessage::decode(&response_bytes).expect("decode response");
assert_eq!(response.request_id, 30);
if let ChunkMessageBody::PutResponse(ChunkPutResponse::Error(
ProtocolError::ChunkTooLarge { .. },
)) = response.body
{
} else {
panic!("expected ChunkTooLarge error");
}
}
#[tokio::test]
async fn test_put_already_exists() {
let (protocol, _temp) = create_test_protocol().await;
let content = b"duplicate content";
let address = LmdbStorage::compute_address(content);
protocol.payment_verifier().cache_insert(address);
let put_request = ChunkPutRequest::new(address, content.to_vec());
let put_msg = ChunkMessage {
request_id: 40,
body: ChunkMessageBody::PutRequest(put_request),
};
let put_bytes = put_msg.encode().expect("encode put");
let _ = protocol
.try_handle_request(&put_bytes)
.await
.expect("handle put");
let response_bytes = protocol
.try_handle_request(&put_bytes)
.await
.expect("handle put 2")
.expect("expected response");
let response = ChunkMessage::decode(&response_bytes).expect("decode response");
assert_eq!(response.request_id, 40);
if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { address: addr }) =
response.body
{
assert_eq!(addr, address);
} else {
panic!("expected AlreadyExists");
}
}
#[tokio::test]
async fn test_protocol_id() {
let (protocol, _temp) = create_test_protocol().await;
assert_eq!(protocol.protocol_id(), CHUNK_PROTOCOL_ID);
}
#[tokio::test]
async fn test_exists_and_local_access() {
let (protocol, _temp) = create_test_protocol().await;
let content = b"local access test";
let address = LmdbStorage::compute_address(content);
assert!(!protocol.exists(&address).expect("exists check"));
protocol
.put_local(&address, content)
.await
.expect("put local");
assert!(protocol.exists(&address).expect("exists check"));
let retrieved = protocol.get_local(&address).await.expect("get local");
assert_eq!(retrieved, Some(content.to_vec()));
}
#[tokio::test]
async fn test_cache_insert_is_visible() {
let (protocol, _temp) = create_test_protocol().await;
let content = b"cache test content";
let address = LmdbStorage::compute_address(content);
let stats_before = protocol.payment_cache_stats();
assert_eq!(stats_before.additions, 0);
protocol.payment_verifier().cache_insert(address);
let stats_after = protocol.payment_cache_stats();
assert_eq!(stats_after.additions, 1);
let put_request = ChunkPutRequest::new(address, content.to_vec());
let put_msg = ChunkMessage {
request_id: 100,
body: ChunkMessageBody::PutRequest(put_request),
};
let put_bytes = put_msg.encode().expect("encode put");
let response_bytes = protocol
.try_handle_request(&put_bytes)
.await
.expect("handle put")
.expect("expected response");
let response = ChunkMessage::decode(&response_bytes).expect("decode");
if let ChunkMessageBody::PutResponse(ChunkPutResponse::Success { .. }) = response.body {
} else {
panic!("expected success, got: {response:?}");
}
}
#[tokio::test]
async fn test_put_same_chunk_twice_hits_cache() {
let (protocol, _temp) = create_test_protocol().await;
let content = b"duplicate cache test";
let address = LmdbStorage::compute_address(content);
protocol.payment_verifier().cache_insert(address);
let put_request = ChunkPutRequest::new(address, content.to_vec());
let put_msg = ChunkMessage {
request_id: 110,
body: ChunkMessageBody::PutRequest(put_request),
};
let put_bytes = put_msg.encode().expect("encode put");
let _ = protocol
.try_handle_request(&put_bytes)
.await
.expect("handle put 1");
let response_bytes = protocol
.try_handle_request(&put_bytes)
.await
.expect("handle put 2")
.expect("expected response");
let response = ChunkMessage::decode(&response_bytes).expect("decode");
if let ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists { .. }) = response.body
{
} else {
panic!("expected AlreadyExists, got: {response:?}");
}
}
#[tokio::test]
async fn test_payment_cache_stats_returns_correct_values() {
let (protocol, _temp) = create_test_protocol().await;
let stats = protocol.payment_cache_stats();
assert_eq!(stats.hits, 0);
assert_eq!(stats.misses, 0);
assert_eq!(stats.additions, 0);
let content = b"stats test";
let address = LmdbStorage::compute_address(content);
protocol.payment_verifier().cache_insert(address);
let put_request = ChunkPutRequest::new(address, content.to_vec());
let put_msg = ChunkMessage {
request_id: 120,
body: ChunkMessageBody::PutRequest(put_request),
};
let put_bytes = put_msg.encode().expect("encode put");
let _ = protocol
.try_handle_request(&put_bytes)
.await
.expect("handle put");
let stats = protocol.payment_cache_stats();
assert_eq!(stats.additions, 1);
assert_eq!(stats.hits, 1);
}
#[tokio::test]
async fn test_storage_stats() {
let (protocol, _temp) = create_test_protocol().await;
let stats = protocol.storage_stats();
assert_eq!(stats.chunks_stored, 0);
}
#[tokio::test]
async fn test_merkle_candidate_quote_request() {
use crate::payment::quote::verify_merkle_candidate_signature;
use evmlib::merkle_payments::MerklePaymentCandidateNode;
let (protocol, _temp) = create_test_protocol().await;
let address = [0x77; 32];
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.expect("system time")
.as_secs();
let request = MerkleCandidateQuoteRequest {
address,
data_type: DATA_TYPE_CHUNK,
data_size: 4096,
merkle_payment_timestamp: timestamp,
};
let msg = ChunkMessage {
request_id: 600,
body: ChunkMessageBody::MerkleCandidateQuoteRequest(request),
};
let msg_bytes = msg.encode().expect("encode request");
let response_bytes = protocol
.try_handle_request(&msg_bytes)
.await
.expect("handle merkle candidate quote")
.expect("expected response");
let response = ChunkMessage::decode(&response_bytes).expect("decode response");
assert_eq!(response.request_id, 600);
match response.body {
ChunkMessageBody::MerkleCandidateQuoteResponse(
MerkleCandidateQuoteResponse::Success { candidate_node },
) => {
let candidate: MerklePaymentCandidateNode =
rmp_serde::from_slice(&candidate_node).expect("deserialize candidate node");
assert!(
verify_merkle_candidate_signature(&candidate),
"ML-DSA-65 candidate signature must be valid"
);
assert_eq!(candidate.merkle_payment_timestamp, timestamp);
assert!(candidate.price >= evmlib::common::Amount::ZERO);
}
other => panic!("expected MerkleCandidateQuoteResponse::Success, got: {other:?}"),
}
}
#[tokio::test]
async fn test_handle_unexpected_response_message() {
let (protocol, _temp) = create_test_protocol().await;
let msg = ChunkMessage {
request_id: 200,
body: ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: [0u8; 32] }),
};
let msg_bytes = msg.encode().expect("encode");
let result = protocol
.try_handle_request(&msg_bytes)
.await
.expect("handle msg");
assert!(
result.is_none(),
"expected None for response message, got: {result:?}"
);
}
#[tokio::test]
async fn test_quote_already_stored_flag() {
let (protocol, _temp) = create_test_protocol().await;
let content = b"already stored quote test";
let address = LmdbStorage::compute_address(content);
protocol.payment_verifier().cache_insert(address);
let put_request = ChunkPutRequest::new(address, content.to_vec());
let put_msg = ChunkMessage {
request_id: 300,
body: ChunkMessageBody::PutRequest(put_request),
};
let put_bytes = put_msg.encode().expect("encode put");
let _ = protocol
.try_handle_request(&put_bytes)
.await
.expect("handle put");
let quote_request = ChunkQuoteRequest {
address,
data_size: content.len() as u64,
data_type: DATA_TYPE_CHUNK,
};
let quote_msg = ChunkMessage {
request_id: 301,
body: ChunkMessageBody::QuoteRequest(quote_request),
};
let quote_bytes = quote_msg.encode().expect("encode quote");
let response_bytes = protocol
.try_handle_request("e_bytes)
.await
.expect("handle quote")
.expect("expected response");
let response = ChunkMessage::decode(&response_bytes).expect("decode");
match response.body {
ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
already_stored, ..
}) => {
assert!(
already_stored,
"already_stored should be true for existing chunk"
);
}
other => panic!("expected Success with already_stored, got: {other:?}"),
}
let new_address = [0xFFu8; 32];
let quote_request2 = ChunkQuoteRequest {
address: new_address,
data_size: 100,
data_type: DATA_TYPE_CHUNK,
};
let quote_msg2 = ChunkMessage {
request_id: 302,
body: ChunkMessageBody::QuoteRequest(quote_request2),
};
let quote_bytes2 = quote_msg2.encode().expect("encode quote2");
let response_bytes2 = protocol
.try_handle_request("e_bytes2)
.await
.expect("handle quote2")
.expect("expected response");
let response2 = ChunkMessage::decode(&response_bytes2).expect("decode2");
match response2.body {
ChunkMessageBody::QuoteResponse(ChunkQuoteResponse::Success {
already_stored, ..
}) => {
assert!(
!already_stored,
"already_stored should be false for new chunk"
);
}
other => panic!("expected Success with already_stored=false, got: {other:?}"),
}
}
}