1use crate::data::client::batch::{finalize_batch_payment, PreparedChunk};
7use crate::data::client::peer_cache::record_peer_outcome;
8use crate::data::client::Client;
9use crate::data::error::{Error, Result};
10use ant_protocol::evm::{QuoteHash, TxHash};
11use ant_protocol::transport::{MultiAddr, PeerId};
12use ant_protocol::{
13 compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
14 ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
15 ProofType, XorName, CLOSE_GROUP_MAJORITY,
16};
17use bytes::Bytes;
18use futures::stream::{FuturesUnordered, StreamExt};
19use std::collections::HashMap;
20use std::future::Future;
21use std::time::{Duration, Instant};
22use tracing::{debug, info, warn};
23
24const CHUNK_DATA_TYPE: u32 = 0;
26
27const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
29
30fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
31 match detect_proof_type(proof) {
32 Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
33 _ => STORE_RESPONSE_TIMEOUT,
34 }
35}
36
37impl Client {
38 pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
49 let address = compute_address(&content);
50 let data_size = u64::try_from(content.len())
51 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
52
53 match self
54 .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
55 .await
56 {
57 Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
58 Err(Error::AlreadyStored) => {
59 debug!(
60 "Chunk {} already stored on network, skipping payment",
61 hex::encode(address)
62 );
63 Ok(address)
64 }
65 Err(e) => Err(e),
66 }
67 }
68
69 pub(crate) async fn chunk_put_to_close_group(
81 &self,
82 content: Bytes,
83 proof: Vec<u8>,
84 peers: &[(PeerId, Vec<MultiAddr>)],
85 ) -> Result<XorName> {
86 let address = compute_address(&content);
87
88 let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
89 let (initial_peers, fallback_peers) = peers.split_at(initial_count);
90
91 let mut put_futures = FuturesUnordered::new();
92 for (peer_id, addrs) in initial_peers {
93 put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
94 }
95
96 let mut success_count = 0usize;
97 let mut failures: Vec<String> = Vec::new();
98 let mut fallback_iter = fallback_peers.iter();
99
100 while let Some((peer_id, result)) = put_futures.next().await {
101 match result {
102 Ok(_) => {
103 success_count += 1;
104 if success_count >= CLOSE_GROUP_MAJORITY {
105 debug!(
106 "Chunk {} stored on {success_count} peers (majority reached)",
107 hex::encode(address)
108 );
109 return Ok(address);
110 }
111 }
112 Err(e) => {
113 warn!("Failed to store chunk on {peer_id}: {e}");
114 failures.push(format!("{peer_id}: {e}"));
115
116 if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
117 debug!(
118 "Falling back to peer {fb_peer} for chunk {}",
119 hex::encode(address)
120 );
121 put_futures.push(self.spawn_chunk_put(
122 content.clone(),
123 proof.clone(),
124 fb_peer,
125 fb_addrs,
126 ));
127 }
128 }
129 }
130 }
131
132 Err(Error::InsufficientPeers(format!(
133 "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
134 failures.join("; ")
135 )))
136 }
137
138 fn spawn_chunk_put<'a>(
140 &'a self,
141 content: Bytes,
142 proof: Vec<u8>,
143 peer_id: &'a PeerId,
144 addrs: &'a [MultiAddr],
145 ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
146 let peer_id_owned = *peer_id;
147 async move {
148 let result = self
149 .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
150 .await;
151 (peer_id_owned, result)
152 }
153 }
154
155 pub async fn chunk_put_with_proof(
164 &self,
165 content: Bytes,
166 proof: Vec<u8>,
167 target_peer: &PeerId,
168 peer_addrs: &[MultiAddr],
169 ) -> Result<XorName> {
170 let address = compute_address(&content);
171 let node = self.network().node();
172 let timeout =
173 store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs);
174 let timeout_secs = timeout.as_secs();
175
176 let request_id = self.next_request_id();
177 let request = ChunkPutRequest::with_payment(address, content, proof);
181 let message = ChunkMessage {
182 request_id,
183 body: ChunkMessageBody::PutRequest(request),
184 };
185 let message_bytes = message
186 .encode()
187 .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
188
189 let addr_hex = hex::encode(address);
190
191 let result = send_and_await_chunk_response(
192 node,
193 target_peer,
194 message_bytes,
195 request_id,
196 timeout,
197 peer_addrs,
198 |body| match body {
199 ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
200 debug!("Chunk stored at {}", hex::encode(addr));
201 Some(Ok(addr))
202 }
203 ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
204 address: addr,
205 }) => {
206 debug!("Chunk already exists at {}", hex::encode(addr));
207 Some(Ok(addr))
208 }
209 ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
210 Some(Err(Error::Payment(format!("Payment required: {message}"))))
211 }
212 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err(
213 Error::Protocol(format!("Remote PUT error for {addr_hex}: {e}")),
214 )),
215 _ => None,
216 },
217 |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
218 || {
219 Error::Timeout(format!(
220 "Timeout waiting for store response after {timeout_secs}s"
221 ))
222 },
223 )
224 .await;
225
226 record_peer_outcome(node, *target_peer, peer_addrs, result.is_ok(), None).await;
231
232 result
233 }
234
235 pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
246 if let Some(cached) = self.chunk_cache().get(address) {
248 let computed = compute_address(&cached);
249 if computed == *address {
250 debug!("Cache hit for chunk {}", hex::encode(address));
251 return Ok(Some(DataChunk::new(*address, cached)));
252 }
253 debug!(
255 "Cache corruption detected for {}: evicting",
256 hex::encode(address)
257 );
258 self.chunk_cache().remove(address);
259 }
260
261 let peers = self.close_group_peers(address).await?;
262 let addr_hex = hex::encode(address);
263
264 let queried = peers.len();
265 let mut not_found = 0usize;
266 let mut timeout = 0usize;
267 let mut network_err = 0usize;
268
269 for (peer, addrs) in &peers {
270 match self.chunk_get_from_peer(address, peer, addrs).await {
271 Ok(Some(chunk)) => {
272 self.chunk_cache().put(chunk.address, chunk.content.clone());
273 return Ok(Some(chunk));
274 }
275 Ok(None) => {
276 not_found += 1;
277 debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
278 }
279 Err(Error::Timeout(_)) => {
280 timeout += 1;
281 debug!("Peer {peer} timed out for chunk {addr_hex}, trying next");
282 }
283 Err(Error::Network(_)) => {
284 network_err += 1;
285 debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
286 }
287 Err(e) => return Err(e),
288 }
289 }
290
291 info!(
295 "chunk_get exhausted close group for {addr_hex}: \
296 queried={queried} not_found={not_found} timeout={timeout} network_err={network_err}"
297 );
298 Ok(None)
299 }
300
301 async fn chunk_get_from_peer(
303 &self,
304 address: &XorName,
305 peer: &PeerId,
306 peer_addrs: &[MultiAddr],
307 ) -> Result<Option<DataChunk>> {
308 let node = self.network().node();
309 let request_id = self.next_request_id();
310 let request = ChunkGetRequest::new(*address);
311 let message = ChunkMessage {
312 request_id,
313 body: ChunkMessageBody::GetRequest(request),
314 };
315 let message_bytes = message
316 .encode()
317 .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
318
319 let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
320 let addr_hex = hex::encode(address);
321 let timeout_secs = self.config().chunk_get_timeout_secs;
322
323 let start = Instant::now();
324 let result = send_and_await_chunk_response(
325 node,
326 peer,
327 message_bytes,
328 request_id,
329 timeout,
330 peer_addrs,
331 |body| match body {
332 ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
333 address: addr,
334 content,
335 }) => {
336 if addr != *address {
337 return Some(Err(Error::InvalidData(format!(
338 "Mismatched chunk address: expected {addr_hex}, got {}",
339 hex::encode(addr)
340 ))));
341 }
342
343 let computed = compute_address(&content);
344 if computed != addr {
345 return Some(Err(Error::InvalidData(format!(
346 "Invalid chunk content: expected hash {addr_hex}, got {}",
347 hex::encode(computed)
348 ))));
349 }
350
351 debug!(
352 "Retrieved chunk {} ({} bytes) from peer {peer}",
353 hex::encode(addr),
354 content.len()
355 );
356 Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
357 }
358 ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
359 ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
360 Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
361 )),
362 _ => None,
363 },
364 |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
365 || {
366 Error::Timeout(format!(
367 "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
368 ))
369 },
370 )
371 .await;
372
373 let success = result.is_ok();
374 let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
375 record_peer_outcome(node, *peer, peer_addrs, success, rtt_ms).await;
376
377 result
378 }
379
380 pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
386 self.chunk_get(address).await.map(|opt| opt.is_some())
387 }
388
389 pub async fn finalize_chunk(
407 &self,
408 prepared: PreparedChunk,
409 tx_hash_map: &HashMap<QuoteHash, TxHash>,
410 ) -> Result<XorName> {
411 let mut paid = finalize_batch_payment(vec![prepared], tx_hash_map)?;
412 let chunk = paid.pop().ok_or_else(|| {
416 Error::Payment(
417 "finalize_batch_payment returned no paid chunks for a single \
418 prepared chunk — internal invariant violated"
419 .into(),
420 )
421 })?;
422 self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
423 .await
424 }
425}
426
#[cfg(test)]
mod tests {
    use super::*;
    use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};

    /// Merkle timeout value passed to the helper by these tests.
    const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
    /// Tag byte used by these tests to stand in for an unrecognised proof.
    const UNKNOWN_PROOF_TAG: u8 = 0xff;

    /// A single-node proof falls back to the fixed store-response timeout.
    #[test]
    fn single_node_proof_uses_store_response_timeout() {
        assert_eq!(
            store_response_timeout_for_proof(&[PROOF_TAG_SINGLE_NODE], TEST_MERKLE_TIMEOUT_SECS),
            STORE_RESPONSE_TIMEOUT
        );
    }

    /// An unknown proof tag falls back to the fixed store-response timeout.
    #[test]
    fn unknown_proof_uses_store_response_timeout() {
        assert_eq!(
            store_response_timeout_for_proof(&[UNKNOWN_PROOF_TAG], TEST_MERKLE_TIMEOUT_SECS),
            STORE_RESPONSE_TIMEOUT
        );
    }

    /// A Merkle proof uses the timeout supplied by the caller.
    #[test]
    fn merkle_proof_uses_configured_store_timeout() {
        let expected = Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS);
        assert_eq!(
            store_response_timeout_for_proof(&[PROOF_TAG_MERKLE], TEST_MERKLE_TIMEOUT_SECS),
            expected
        );
    }

    /// The default Merkle store timeout must cover the storer-side lookup
    /// timeout assumed here, plus a safety margin.
    #[test]
    fn default_merkle_store_timeout_satisfies_storer_invariant() {
        use crate::data::client::ClientConfig;
        const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240;
        const MIN_PADDING_SECS: u64 = 30;

        let configured = ClientConfig::default().merkle_store_timeout_secs;
        let required = STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS;
        assert!(
            configured >= required,
            "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})",
            configured,
            STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS,
            MIN_PADDING_SECS,
        );
    }

    /// Non-Merkle proofs ignore the Merkle timeout argument, however large.
    #[test]
    fn non_merkle_put_ignores_merkle_timeout_value() {
        let absurd_merkle_timeout = 9_999;
        for &tag in &[PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG] {
            assert_eq!(
                store_response_timeout_for_proof(&[tag], absurd_merkle_timeout),
                STORE_RESPONSE_TIMEOUT,
                "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}",
            );
        }
    }
}