use crate::data::client::adaptive::Outcome;
use crate::data::client::batch::{finalize_batch_payment, PreparedChunk};
use crate::data::client::peer_xor_distance;
use crate::data::client::Client;
use crate::data::error::{Error, Result};
use ant_protocol::evm::{QuoteHash, TxHash};
use ant_protocol::transport::{MultiAddr, PeerId};
use ant_protocol::{
compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
ProofType, XorName, CLOSE_GROUP_MAJORITY,
};
use bytes::Bytes;
use futures::stream::{self, FuturesUnordered, StreamExt};
use std::collections::HashMap;
use std::future::Future;
use std::time::{Duration, Instant};
use tracing::{debug, info, warn};
/// Data-type tag handed to `pay_for_storage` when paying for a chunk upload.
const CHUNK_DATA_TYPE: u32 = 0;
/// Tally of a single pass over the peers closest to a chunk address.
///
/// Built by `chunk_get_try_closest_peers` and consumed by
/// `chunk_get_from_closest_peers` for its retry decision and its log lines.
struct CloseGroupOutcome {
/// The chunk returned by the first peer that had it, if any.
chunk: Option<DataChunk>,
/// Number of peers the closest-peers lookup returned. The pass stops at the
/// first hit, so not every counted peer was necessarily contacted.
queried: usize,
/// Peers whose GET resolved to `Ok(None)`.
not_found: usize,
/// Peers whose GET failed with `Error::Timeout`.
timeout: usize,
/// Peers whose GET failed with `Error::Network`.
network_err: usize,
/// Peers whose GET failed with `Error::Protocol`.
protocol_err: usize,
}
/// Decides whether a pass's `NotFound` answers are conclusive.
///
/// The verdict is `true` only when at least `CLOSE_GROUP_MAJORITY` peers were
/// queried and every one of them reported the chunk as missing.
fn is_authoritative_not_found(not_found: usize, queried: usize) -> bool {
    let well_sampled = queried >= CLOSE_GROUP_MAJORITY;
    let unanimous = not_found == queried;
    well_sampled && unanimous
}
/// How long a PUT waits for the store response when the proof is not detected
/// as a merkle proof (see `store_response_timeout_for_proof`).
const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
/// Extra per-peer-timeout "waves" added on top of the computed wave count when
/// sizing the diagnostic sweep's overall deadline.
const DIAGNOSTIC_TIMEOUT_PADDING_WAVES: usize = 1;
/// Result of asking one specific peer for a chunk during a diagnostic sweep.
pub struct ChunkPeerGetResult {
/// The peer that was queried.
pub peer_id: PeerId,
/// Addresses that were supplied for reaching the peer.
pub peer_addrs: Vec<MultiAddr>,
/// Value of `peer_xor_distance` between the peer and the chunk address; this
/// is the key the sweep results are sorted by.
pub xor_distance: [u8; 32],
/// `Ok(Some(_))` when the peer returned the chunk, `Ok(None)` when it
/// reported not having it, `Err(_)` on failure — including the synthesized
/// `Error::Timeout` used when the sweep deadline expired before this peer
/// completed.
pub chunk_result: Result<Option<DataChunk>>,
}
/// One peer scheduled for a diagnostic GET, carrying the metadata needed to
/// report on it even if its request never completes.
#[derive(Clone)]
struct ChunkPeerGetTarget {
/// Position in the closest-peers list; used to index the sweep's
/// per-target completion flags.
index: usize,
/// The peer to query.
peer_id: PeerId,
/// Addresses supplied for reaching the peer.
peer_addrs: Vec<MultiAddr>,
/// Value of `peer_xor_distance` between the peer and the chunk address.
xor_distance: [u8; 32],
}
/// Turns a closest-peers list into diagnostic GET targets.
///
/// Each target records the peer's position in `peers` and its
/// `peer_xor_distance` to `address`; input order is preserved.
fn chunk_peer_get_targets(
    peers: Vec<(PeerId, Vec<MultiAddr>)>,
    address: &XorName,
) -> Vec<ChunkPeerGetTarget> {
    let mut targets = Vec::with_capacity(peers.len());
    for (index, (peer_id, peer_addrs)) in peers.into_iter().enumerate() {
        let xor_distance = peer_xor_distance(&peer_id, address);
        targets.push(ChunkPeerGetTarget {
            index,
            peer_id,
            peer_addrs,
            xor_distance,
        });
    }
    targets
}
/// Orders sweep results by ascending `xor_distance`.
///
/// The sort is stable, so results with equal distance keep their relative order.
fn sort_chunk_peer_get_results(results: &mut [ChunkPeerGetResult]) {
    results.sort_by(|left, right| left.xor_distance.cmp(&right.xor_distance));
}
/// Picks how many per-peer GETs the diagnostic sweep keeps in flight at once.
///
/// The result is `peer_count` capped at `close_group_size`, and never less
/// than one. The lower bound matters because the value is handed to
/// `buffer_unordered`: with a limit of zero that combinator never pulls from
/// its source stream, so a sweep requested with `peer_count == 0` would sit
/// idle until the overall timeout fired instead of finishing immediately.
fn diagnostic_peer_get_concurrency(peer_count: usize, close_group_size: usize) -> usize {
    peer_count.min(close_group_size).max(1)
}
/// Computes the deadline for a whole diagnostic sweep.
///
/// The sweep is modelled as `ceil(target_count / concurrency_limit)` waves of
/// requests, plus `DIAGNOSTIC_TIMEOUT_PADDING_WAVES` extra waves, each lasting
/// up to `per_peer_timeout`. All arithmetic saturates instead of overflowing.
fn diagnostic_peer_get_overall_timeout(
    per_peer_timeout: Duration,
    target_count: usize,
    concurrency_limit: usize,
) -> Duration {
    // A zero limit is treated as one so the division below is well defined.
    let in_flight = concurrency_limit.max(1);
    let waves = target_count
        .div_ceil(in_flight)
        .saturating_add(DIAGNOSTIC_TIMEOUT_PADDING_WAVES);
    // Clamp rather than fail when the wave count does not fit the u32 multiplier.
    let multiplier = match u32::try_from(waves) {
        Ok(value) => value,
        Err(_) => u32::MAX,
    };
    per_peer_timeout.saturating_mul(multiplier)
}
fn timed_out_chunk_peer_get_result(
target: &ChunkPeerGetTarget,
address: &XorName,
timeout: Duration,
) -> ChunkPeerGetResult {
let addr_hex = hex::encode(address);
let timeout_secs = timeout.as_secs();
ChunkPeerGetResult {
peer_id: target.peer_id,
peer_addrs: target.peer_addrs.clone(),
xor_distance: target.xor_distance,
chunk_result: Err(Error::Timeout(format!(
"Diagnostic chunk GET sweep timed out before peer {} completed for chunk {addr_hex} after {timeout_secs}s",
target.peer_id
))),
}
}
/// Chooses how long a PUT waits for the store response.
///
/// Proofs that `detect_proof_type` classifies as merkle get
/// `merkle_timeout_secs`; every other proof — including ones whose type cannot
/// be detected — gets `STORE_RESPONSE_TIMEOUT`.
fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
    if matches!(detect_proof_type(proof), Some(ProofType::Merkle)) {
        Duration::from_secs(merkle_timeout_secs)
    } else {
        STORE_RESPONSE_TIMEOUT
    }
}
impl Client {
pub(crate) async fn chunk_get_observed(&self, address: &XorName) -> Result<Option<DataChunk>> {
self.chunk_get_observed_from_closest_peers(address, self.config().close_group_size)
.await
}
pub(crate) async fn chunk_get_observed_from_closest_peers(
&self,
address: &XorName,
peer_count: usize,
) -> Result<Option<DataChunk>> {
let started = Instant::now();
let result = self.chunk_get_from_closest_peers(address, peer_count).await;
let latency = started.elapsed();
let bytes = result
.as_ref()
.ok()
.and_then(Option::as_ref)
.map_or(0, |chunk| chunk.content.len() as u64);
self.controller()
.fetch
.observe_with_bytes(chunk_get_outcome(&result), latency, bytes);
result
}
}
pub(crate) fn chunk_get_outcome(result: &Result<Option<DataChunk>>) -> Outcome {
match result {
Ok(Some(_)) => Outcome::Success,
Ok(None) => Outcome::Timeout,
Err(Error::Timeout(_)) => Outcome::Timeout,
Err(Error::Network(_)) => Outcome::NetworkError,
Err(_) => Outcome::ApplicationError,
}
}
impl Client {
pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
let address = compute_address(&content);
let data_size = u64::try_from(content.len())
.map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
match self
.pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
.await
{
Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
Err(Error::AlreadyStored) => {
debug!(
"Chunk {} already stored on network, skipping payment",
hex::encode(address)
);
Ok(address)
}
Err(e) => Err(e),
}
}
/// Stores a paid-for chunk on `peers` until `CLOSE_GROUP_MAJORITY` of them
/// have accepted it.
///
/// PUTs start with the first `min(peers.len(), CLOSE_GROUP_MAJORITY)` peers;
/// the remaining peers are held back as fallbacks, and each failure brings one
/// fallback into play. Returns the chunk address as soon as the majority is
/// reached.
///
/// # Errors
///
/// When every attempt has finished without a majority: if all failures were
/// `Error::RemotePut`, the first such rejection is returned; otherwise
/// `Error::InsufficientPeers` listing every failure.
pub(crate) async fn chunk_put_to_close_group(
&self,
content: Bytes,
proof: Vec<u8>,
peers: &[(PeerId, Vec<MultiAddr>)],
) -> Result<XorName> {
let address = compute_address(&content);
// Only a majority-sized first wave is contacted up front; the rest of the
// list is kept in reserve.
let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
let (initial_peers, fallback_peers) = peers.split_at(initial_count);
let mut put_futures = FuturesUnordered::new();
for (peer_id, addrs) in initial_peers {
put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
}
let mut success_count = 0usize;
let mut failures: Vec<String> = Vec::new();
// Tracks whether any failure was something other than a remote rejection;
// this decides which error is surfaced at the end.
let mut had_non_rejection_failure = false;
let mut first_remote_rejection: Option<Error> = None;
let mut fallback_iter = fallback_peers.iter();
while let Some((peer_id, result)) = put_futures.next().await {
match result {
Ok(_) => {
success_count += 1;
if success_count >= CLOSE_GROUP_MAJORITY {
debug!(
"Chunk {} stored on {success_count} peers (majority reached)",
hex::encode(address)
);
// Returning here drops any PUTs that are still in flight.
return Ok(address);
}
}
Err(e) => {
warn!("Failed to store chunk on {peer_id}: {e}");
failures.push(format!("{peer_id}: {e}"));
if matches!(e, Error::RemotePut { .. }) {
// Keep only the first rejection; later ones are still listed
// in `failures`.
if first_remote_rejection.is_none() {
first_remote_rejection = Some(e);
}
} else {
had_non_rejection_failure = true;
}
// Replace the failed attempt with the next untried peer, if any remain.
if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
debug!(
"Falling back to peer {fb_peer} for chunk {}",
hex::encode(address)
);
put_futures.push(self.spawn_chunk_put(
content.clone(),
proof.clone(),
fb_peer,
fb_addrs,
));
}
}
}
}
// Every attempt has finished without reaching the majority. If remote
// rejections were the only kind of failure, surface the first one
// directly instead of the generic InsufficientPeers summary.
if !had_non_rejection_failure {
if let Some(remote_rejection) = first_remote_rejection {
return Err(remote_rejection);
}
}
Err(Error::InsufficientPeers(format!(
"Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
failures.join("; ")
)))
}
/// Builds (without starting) a PUT future for one peer.
///
/// The future resolves to the peer's id paired with the result of
/// `chunk_put_with_proof`, so the caller can attribute each completion to the
/// peer it came from.
fn spawn_chunk_put<'a>(
    &'a self,
    content: Bytes,
    proof: Vec<u8>,
    peer_id: &'a PeerId,
    addrs: &'a [MultiAddr],
) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
    // Copy the id out of the borrow so it can be returned by value.
    let peer = *peer_id;
    async move {
        (
            peer,
            self.chunk_put_with_proof(content, proof, &peer, addrs)
                .await,
        )
    }
}
pub async fn chunk_put_with_proof(
&self,
content: Bytes,
proof: Vec<u8>,
target_peer: &PeerId,
peer_addrs: &[MultiAddr],
) -> Result<XorName> {
let address = compute_address(&content);
let node = self.network().node();
let timeout =
store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs);
let timeout_secs = timeout.as_secs();
let request_id = self.next_request_id();
let request = ChunkPutRequest::with_payment(address, content, proof);
let message = ChunkMessage {
request_id,
body: ChunkMessageBody::PutRequest(request),
};
let message_bytes = message
.encode()
.map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
let addr_hex = hex::encode(address);
let result = send_and_await_chunk_response(
node,
target_peer,
message_bytes,
request_id,
timeout,
peer_addrs,
|body| match body {
ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
debug!("Chunk stored at {}", hex::encode(addr));
Some(Ok(addr))
}
ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
address: addr,
}) => {
debug!("Chunk already exists at {}", hex::encode(addr));
Some(Ok(addr))
}
ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
Some(Err(Error::Payment(format!("Payment required: {message}"))))
}
ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => {
Some(Err(Error::RemotePut {
address: addr_hex.clone(),
source: e,
}))
}
_ => None,
},
|e| Error::Network(format!("Failed to send PUT to peer: {e}")),
|| {
Error::Timeout(format!(
"Timeout waiting for store response after {timeout_secs}s"
))
},
)
.await;
result
}
/// Fetches a chunk by address, asking as many closest peers as the configured
/// close-group size.
///
/// See `chunk_get_from_closest_peers` for the lookup and retry behaviour.
pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
    let close_group_size = self.config().close_group_size;
    self.chunk_get_from_closest_peers(address, close_group_size)
        .await
}
/// Fetches a chunk from the `peer_count` peers closest to `address`, with a
/// local cache in front and one retry behind.
///
/// Flow:
/// 1. A cached copy is returned if its recomputed address matches; a
///    mismatching entry is evicted.
/// 2. One pass over the closest peers (`chunk_get_try_closest_peers`). A hit
///    is cached and returned.
/// 3. If that pass was an authoritative miss (see
///    `is_authoritative_not_found`), `Ok(None)` is returned without retrying.
/// 4. Otherwise the pass is repeated once after a one-second pause.
///
/// NOTE(review): as written, every path returns `Ok(..)` — errors from either
/// pass are logged and folded into `Ok(None)`. Confirm callers do not rely on
/// an `Err` from this method.
pub async fn chunk_get_from_closest_peers(
&self,
address: &XorName,
peer_count: usize,
) -> Result<Option<DataChunk>> {
if let Some(cached) = self.chunk_cache().get(address) {
// Re-hash the cached bytes so a corrupted entry is never served.
let computed = compute_address(&cached);
if computed == *address {
debug!("Cache hit for chunk {}", hex::encode(address));
return Ok(Some(DataChunk::new(*address, cached)));
}
debug!(
"Cache corruption detected for {}: evicting",
hex::encode(address)
);
self.chunk_cache().remove(address);
}
let addr_hex = hex::encode(address);
let first = match self.chunk_get_try_closest_peers(address, peer_count).await {
Ok(outcome) => outcome,
Err(e) => {
info!("chunk_get first close-group lookup failed for {addr_hex}: {e}; will retry");
// A failed pass is represented as "zero peers queried", which can never
// count as an authoritative miss and therefore always leads to the retry.
CloseGroupOutcome {
chunk: None,
queried: 0,
not_found: 0,
timeout: 0,
network_err: 0,
protocol_err: 0,
}
}
};
if let Some(chunk) = first.chunk {
self.chunk_cache().put(chunk.address, chunk.content.clone());
return Ok(Some(chunk));
}
// A unanimous, well-sampled NotFound is taken at face value: no retry.
if is_authoritative_not_found(first.not_found, first.queried) {
info!(
"chunk_get giving up on {addr_hex} (unanimous NotFound): \
queried={} not_found={} timeout={} network_err={} protocol_err={}",
first.queried,
first.not_found,
first.timeout,
first.network_err,
first.protocol_err,
);
return Ok(None);
}
info!(
"chunk_get retrying {addr_hex} after reachability failure: \
queried={} not_found={} timeout={} network_err={} protocol_err={}",
first.queried, first.not_found, first.timeout, first.network_err, first.protocol_err,
);
// Fixed one-second pause before the single retry.
tokio::time::sleep(Duration::from_secs(1)).await;
let retry = match self.chunk_get_try_closest_peers(address, peer_count).await {
Ok(o) => o,
Err(e) => {
info!(
"chunk_get retry close-group lookup failed for {addr_hex}: {e}; \
first(queried={} not_found={} timeout={} network_err={} protocol_err={})",
first.queried,
first.not_found,
first.timeout,
first.network_err,
first.protocol_err,
);
// The retry's error is logged above and then reported as "not found".
return Ok(None);
}
};
if let Some(chunk) = retry.chunk {
info!("chunk_get retry succeeded for {addr_hex}");
self.chunk_cache().put(chunk.address, chunk.content.clone());
return Ok(Some(chunk));
}
info!(
"chunk_get exhausted close group after retry for {addr_hex}: \
first(queried={} not_found={} timeout={} network_err={} protocol_err={}) \
retry(queried={} not_found={} timeout={} network_err={} protocol_err={})",
first.queried,
first.not_found,
first.timeout,
first.network_err,
first.protocol_err,
retry.queried,
retry.not_found,
retry.timeout,
retry.network_err,
retry.protocol_err,
);
Ok(None)
}
/// Makes one sequential pass over the peers closest to `address`, stopping at
/// the first peer that returns the chunk.
///
/// `NotFound`, timeout, network and protocol failures are counted per peer and
/// the pass moves on to the next peer. The returned outcome's `queried` is the
/// number of peers the lookup returned, whether or not all were contacted.
///
/// # Errors
///
/// Returns the error from `closest_peers`, or the first per-peer error that is
/// not `Timeout`, `Network` or `Protocol`; such an error ends the pass
/// immediately.
async fn chunk_get_try_closest_peers(
    &self,
    address: &XorName,
    peer_count: usize,
) -> Result<CloseGroupOutcome> {
    let peers = self.closest_peers(address, peer_count).await?;
    let addr_hex = hex::encode(address);
    let mut outcome = CloseGroupOutcome {
        chunk: None,
        queried: peers.len(),
        not_found: 0,
        timeout: 0,
        network_err: 0,
        protocol_err: 0,
    };

    for (peer, addrs) in &peers {
        match self.chunk_get_from_peer(address, peer, addrs).await {
            Ok(Some(chunk)) => {
                outcome.chunk = Some(chunk);
                return Ok(outcome);
            }
            Ok(None) => {
                outcome.not_found += 1;
                debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
            }
            Err(Error::Timeout(_)) => {
                outcome.timeout += 1;
                debug!("Peer {peer} timed out for chunk {addr_hex}, trying next");
            }
            Err(Error::Network(_)) => {
                outcome.network_err += 1;
                debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
            }
            Err(Error::Protocol(ref e)) => {
                outcome.protocol_err += 1;
                debug!(
                    "Peer {peer} returned protocol error for chunk {addr_hex} ({e}), trying next"
                );
            }
            // Anything else is not a per-peer reachability problem: abort the pass.
            Err(fatal) => return Err(fatal),
        }
    }

    Ok(outcome)
}
/// Runs the diagnostic per-peer GET sweep against as many closest peers as the
/// configured close-group size.
///
/// See `chunk_get_from_closest_peer_group` for what is returned.
pub async fn chunk_get_from_close_group(
    &self,
    address: &XorName,
) -> Result<Vec<ChunkPeerGetResult>> {
    let close_group_size = self.config().close_group_size;
    self.chunk_get_from_closest_peer_group(address, close_group_size)
        .await
}
/// Diagnostic sweep: asks each of the `peer_count` closest peers for the chunk
/// and reports every peer's individual result.
///
/// Unlike `chunk_get_from_closest_peers`, this does not stop at the first hit.
/// Requests run through `buffer_unordered` with a bounded number in flight, and
/// the whole sweep is bounded by an overall deadline; peers that have not
/// completed by then are reported with a synthesized `Error::Timeout`. Any
/// chunk that is retrieved is also written to the chunk cache. Results are
/// sorted by XOR distance to `address`.
///
/// # Errors
///
/// Only the `closest_peers` lookup can fail the call; per-peer failures are
/// carried inside the returned results.
pub async fn chunk_get_from_closest_peer_group(
&self,
address: &XorName,
peer_count: usize,
) -> Result<Vec<ChunkPeerGetResult>> {
let peers = self.closest_peers(address, peer_count).await?;
let targets = chunk_peer_get_targets(peers, address);
// The in-flight limit is derived from the requested `peer_count`, not from
// the number of peers the lookup actually returned.
let concurrency_limit =
diagnostic_peer_get_concurrency(peer_count, self.config().close_group_size);
let per_peer_timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
let overall_timeout =
diagnostic_peer_get_overall_timeout(per_peer_timeout, targets.len(), concurrency_limit);
// `completed[i]` flips to true once target `i` has produced a result, so the
// timeout path below can tell which peers still need a placeholder.
let mut completed = vec![false; targets.len()];
let mut results = Vec::with_capacity(targets.len());
let mut get_results = stream::iter(targets.iter().cloned())
.map(|target| async move {
let chunk_result = self
.chunk_get_from_peer(address, &target.peer_id, &target.peer_addrs)
.await;
if let Ok(Some(chunk)) = &chunk_result {
self.chunk_cache().put(chunk.address, chunk.content.clone());
}
(
target.index,
ChunkPeerGetResult {
peer_id: target.peer_id,
peer_addrs: target.peer_addrs,
xor_distance: target.xor_distance,
chunk_result,
},
)
})
.buffer_unordered(concurrency_limit);
let collect_results = async {
while let Some((index, result)) = get_results.next().await {
completed[index] = true;
results.push(result);
}
};
// If the deadline expires first, the collector future is dropped; results
// gathered so far are kept and the remaining targets get timeout entries.
if tokio::time::timeout(overall_timeout, collect_results)
.await
.is_err()
{
for target in &targets {
if !completed[target.index] {
results.push(timed_out_chunk_peer_get_result(
target,
address,
overall_timeout,
));
}
}
}
sort_chunk_peer_get_results(&mut results);
Ok(results)
}
async fn chunk_get_from_peer(
&self,
address: &XorName,
peer: &PeerId,
peer_addrs: &[MultiAddr],
) -> Result<Option<DataChunk>> {
let node = self.network().node();
let request_id = self.next_request_id();
let request = ChunkGetRequest::new(*address);
let message = ChunkMessage {
request_id,
body: ChunkMessageBody::GetRequest(request),
};
let message_bytes = message
.encode()
.map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
let addr_hex = hex::encode(address);
let timeout_secs = self.config().chunk_get_timeout_secs;
let result = send_and_await_chunk_response(
node,
peer,
message_bytes,
request_id,
timeout,
peer_addrs,
|body| match body {
ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
address: addr,
content,
}) => {
if addr != *address {
return Some(Err(Error::InvalidData(format!(
"Mismatched chunk address: expected {addr_hex}, got {}",
hex::encode(addr)
))));
}
let computed = compute_address(&content);
if computed != addr {
return Some(Err(Error::InvalidData(format!(
"Invalid chunk content: expected hash {addr_hex}, got {}",
hex::encode(computed)
))));
}
debug!(
"Retrieved chunk {} ({} bytes) from peer {peer}",
hex::encode(addr),
content.len()
);
Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
}
ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
)),
_ => None,
},
|e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
|| {
Error::Timeout(format!(
"Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
))
},
)
.await;
result
}
/// Reports whether `chunk_get` finds a chunk at `address`.
///
/// Any error from `chunk_get` is returned unchanged.
pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
    let found = self.chunk_get(address).await?;
    Ok(found.is_some())
}
pub async fn finalize_chunk(
&self,
prepared: PreparedChunk,
tx_hash_map: &HashMap<QuoteHash, TxHash>,
) -> Result<XorName> {
let mut paid = finalize_batch_payment(vec![prepared], tx_hash_map)?;
let chunk = paid.pop().ok_or_else(|| {
Error::Payment(
"finalize_batch_payment returned no paid chunks for a single \
prepared chunk — internal invariant violated"
.into(),
)
})?;
self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
.await
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};

    /// Merkle timeout used by the proof-timeout tests.
    const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
    /// A proof tag byte used to exercise the "unrecognised proof" path.
    const UNKNOWN_PROOF_TAG: u8 = 0xff;
    /// Byte length of the XOR-distance arrays built by the test helper.
    const TEST_XORNAME_BYTE_LEN: usize = 32;
    /// Index of the last byte of a distance array.
    const TEST_DISTANCE_TAIL_INDEX: usize = TEST_XORNAME_BYTE_LEN - 1;

    /// Builds a not-found result whose distance is all zeroes except for the
    /// final byte, which is set to `distance_tail`.
    fn chunk_peer_get_result(peer_seed: u8, distance_tail: u8) -> ChunkPeerGetResult {
        let mut xor_distance = [0; TEST_XORNAME_BYTE_LEN];
        xor_distance[TEST_DISTANCE_TAIL_INDEX] = distance_tail;
        let peer_id = PeerId::from_bytes([peer_seed; TEST_XORNAME_BYTE_LEN]);
        ChunkPeerGetResult {
            peer_id,
            peer_addrs: Vec::new(),
            xor_distance,
            chunk_result: Ok(None),
        }
    }

    #[test]
    fn authoritative_not_found_requires_unanimous_well_sampled_response() {
        // (not_found, queried) pairs that must count as authoritative.
        let accepted = [(7, 7), (CLOSE_GROUP_MAJORITY, CLOSE_GROUP_MAJORITY)];
        for (not_found, queried) in accepted {
            assert!(is_authoritative_not_found(not_found, queried));
        }

        // Under-sampled or non-unanimous pairs that must not.
        let rejected = [
            (1, 1),
            (3, 3),
            (CLOSE_GROUP_MAJORITY - 1, CLOSE_GROUP_MAJORITY - 1),
            (4, 7),
            (6, 7),
            (0, 7),
            (0, 0),
        ];
        for (not_found, queried) in rejected {
            assert!(!is_authoritative_not_found(not_found, queried));
        }
    }

    #[test]
    fn chunk_get_outcome_classifies_each_result_kind() {
        let found: Result<Option<DataChunk>> =
            Ok(Some(DataChunk::new([0u8; 32], Bytes::from_static(b"x"))));
        assert_eq!(
            chunk_get_outcome(&found),
            Outcome::Success,
            "found-chunk must be Success",
        );

        let missing: Result<Option<DataChunk>> = Ok(None);
        assert_eq!(
            chunk_get_outcome(&missing),
            Outcome::Timeout,
            "Ok(None) must be Timeout — that's the controller's load-shedding signal",
        );

        let timed_out: Result<Option<DataChunk>> = Err(Error::Timeout("t".into()));
        assert_eq!(chunk_get_outcome(&timed_out), Outcome::Timeout);

        let unreachable: Result<Option<DataChunk>> = Err(Error::Network("n".into()));
        assert_eq!(chunk_get_outcome(&unreachable), Outcome::NetworkError);

        let rejected: Result<Option<DataChunk>> = Err(Error::Protocol("p".into()));
        assert_eq!(chunk_get_outcome(&rejected), Outcome::ApplicationError);
    }

    #[test]
    fn single_node_proof_uses_store_response_timeout() {
        let proof = [PROOF_TAG_SINGLE_NODE];
        assert_eq!(
            store_response_timeout_for_proof(&proof, TEST_MERKLE_TIMEOUT_SECS),
            STORE_RESPONSE_TIMEOUT
        );
    }

    #[test]
    fn unknown_proof_uses_store_response_timeout() {
        let proof = [UNKNOWN_PROOF_TAG];
        assert_eq!(
            store_response_timeout_for_proof(&proof, TEST_MERKLE_TIMEOUT_SECS),
            STORE_RESPONSE_TIMEOUT
        );
    }

    #[test]
    fn merkle_proof_uses_configured_store_timeout() {
        let proof = [PROOF_TAG_MERKLE];
        assert_eq!(
            store_response_timeout_for_proof(&proof, TEST_MERKLE_TIMEOUT_SECS),
            Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS)
        );
    }

    #[test]
    fn chunk_peer_get_results_sort_by_xor_distance() {
        let mut results: Vec<ChunkPeerGetResult> = [(3, 3), (1, 1), (2, 2)]
            .into_iter()
            .map(|(peer_seed, distance_tail)| chunk_peer_get_result(peer_seed, distance_tail))
            .collect();

        sort_chunk_peer_get_results(&mut results);

        let tails: Vec<u8> = results
            .iter()
            .map(|entry| entry.xor_distance[TEST_DISTANCE_TAIL_INDEX])
            .collect();
        assert_eq!(tails, vec![1, 2, 3]);
    }

    #[test]
    fn diagnostic_peer_get_overall_timeout_allows_one_wave_plus_padding() {
        const PER_PEER_TIMEOUT_SECS: u64 = 10;
        const EXPECTED_WAVES_WITH_PADDING: u64 = 2;
        const TARGET_COUNT: usize = 7;
        const CONCURRENCY_LIMIT: usize = 7;

        let per_peer = Duration::from_secs(PER_PEER_TIMEOUT_SECS);
        let expected = Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING);

        let actual = diagnostic_peer_get_overall_timeout(per_peer, TARGET_COUNT, CONCURRENCY_LIMIT);
        assert_eq!(actual, expected);
    }

    #[test]
    fn diagnostic_peer_get_overall_timeout_scales_with_peer_count() {
        const PER_PEER_TIMEOUT_SECS: u64 = 10;
        const TARGET_COUNT: usize = 20;
        const CLOSE_GROUP_SIZE: usize = 7;
        const EXPECTED_WAVES_WITH_PADDING: u64 = 4;

        let per_peer = Duration::from_secs(PER_PEER_TIMEOUT_SECS);
        let expected = Duration::from_secs(PER_PEER_TIMEOUT_SECS * EXPECTED_WAVES_WITH_PADDING);

        let concurrency_limit = diagnostic_peer_get_concurrency(TARGET_COUNT, CLOSE_GROUP_SIZE);
        let actual = diagnostic_peer_get_overall_timeout(per_peer, TARGET_COUNT, concurrency_limit);
        assert_eq!(actual, expected);
    }

    #[test]
    fn default_merkle_store_timeout_satisfies_storer_invariant() {
        use crate::data::client::ClientConfig;

        const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240;
        const MIN_PADDING_SECS: u64 = 30;

        let config = ClientConfig::default();
        let required_minimum = STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS;
        assert!(
            config.merkle_store_timeout_secs >= required_minimum,
            "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})",
            config.merkle_store_timeout_secs,
            STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS,
            MIN_PADDING_SECS,
        );
    }

    #[test]
    fn non_merkle_put_ignores_merkle_timeout_value() {
        let absurd_merkle_timeout = 9_999;
        let non_merkle_tags = [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG];
        for tag in non_merkle_tags {
            assert_eq!(
                store_response_timeout_for_proof(&[tag], absurd_merkle_timeout),
                STORE_RESPONSE_TIMEOUT,
                "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}",
            );
        }
    }
}