1use crate::data::client::peer_cache::record_peer_outcome;
7use crate::data::client::Client;
8use crate::data::error::{Error, Result};
9use ant_protocol::transport::{MultiAddr, PeerId};
10use ant_protocol::{
11 compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
12 ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
13 ProofType, XorName, CLOSE_GROUP_MAJORITY,
14};
15use bytes::Bytes;
16use futures::stream::{FuturesUnordered, StreamExt};
17use std::future::Future;
18use std::time::{Duration, Instant};
19use tracing::{debug, warn};
20
21const CHUNK_DATA_TYPE: u32 = 0;
23
24const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(30);
26
27fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
28 match detect_proof_type(proof) {
29 Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
30 _ => STORE_RESPONSE_TIMEOUT,
31 }
32}
33
34impl Client {
35 pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
46 let address = compute_address(&content);
47 let data_size = u64::try_from(content.len())
48 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
49
50 match self
51 .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
52 .await
53 {
54 Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
55 Err(Error::AlreadyStored) => {
56 debug!(
57 "Chunk {} already stored on network, skipping payment",
58 hex::encode(address)
59 );
60 Ok(address)
61 }
62 Err(e) => Err(e),
63 }
64 }
65
66 pub(crate) async fn chunk_put_to_close_group(
78 &self,
79 content: Bytes,
80 proof: Vec<u8>,
81 peers: &[(PeerId, Vec<MultiAddr>)],
82 ) -> Result<XorName> {
83 let address = compute_address(&content);
84
85 let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
86 let (initial_peers, fallback_peers) = peers.split_at(initial_count);
87
88 let mut put_futures = FuturesUnordered::new();
89 for (peer_id, addrs) in initial_peers {
90 put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
91 }
92
93 let mut success_count = 0usize;
94 let mut failures: Vec<String> = Vec::new();
95 let mut fallback_iter = fallback_peers.iter();
96
97 while let Some((peer_id, result)) = put_futures.next().await {
98 match result {
99 Ok(_) => {
100 success_count += 1;
101 if success_count >= CLOSE_GROUP_MAJORITY {
102 debug!(
103 "Chunk {} stored on {success_count} peers (majority reached)",
104 hex::encode(address)
105 );
106 return Ok(address);
107 }
108 }
109 Err(e) => {
110 warn!("Failed to store chunk on {peer_id}: {e}");
111 failures.push(format!("{peer_id}: {e}"));
112
113 if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
114 debug!(
115 "Falling back to peer {fb_peer} for chunk {}",
116 hex::encode(address)
117 );
118 put_futures.push(self.spawn_chunk_put(
119 content.clone(),
120 proof.clone(),
121 fb_peer,
122 fb_addrs,
123 ));
124 }
125 }
126 }
127 }
128
129 Err(Error::InsufficientPeers(format!(
130 "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
131 failures.join("; ")
132 )))
133 }
134
135 fn spawn_chunk_put<'a>(
137 &'a self,
138 content: Bytes,
139 proof: Vec<u8>,
140 peer_id: &'a PeerId,
141 addrs: &'a [MultiAddr],
142 ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
143 let peer_id_owned = *peer_id;
144 async move {
145 let result = self
146 .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
147 .await;
148 (peer_id_owned, result)
149 }
150 }
151
152 pub async fn chunk_put_with_proof(
161 &self,
162 content: Bytes,
163 proof: Vec<u8>,
164 target_peer: &PeerId,
165 peer_addrs: &[MultiAddr],
166 ) -> Result<XorName> {
167 let address = compute_address(&content);
168 let node = self.network().node();
169 let timeout = store_response_timeout_for_proof(&proof, self.config().store_timeout_secs);
170 let timeout_secs = timeout.as_secs();
171
172 let request_id = self.next_request_id();
173 let request = ChunkPutRequest::with_payment(address, content.to_vec(), proof);
174 let message = ChunkMessage {
175 request_id,
176 body: ChunkMessageBody::PutRequest(request),
177 };
178 let message_bytes = message
179 .encode()
180 .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
181
182 let addr_hex = hex::encode(address);
183
184 let result = send_and_await_chunk_response(
185 node,
186 target_peer,
187 message_bytes,
188 request_id,
189 timeout,
190 peer_addrs,
191 |body| match body {
192 ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
193 debug!("Chunk stored at {}", hex::encode(addr));
194 Some(Ok(addr))
195 }
196 ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
197 address: addr,
198 }) => {
199 debug!("Chunk already exists at {}", hex::encode(addr));
200 Some(Ok(addr))
201 }
202 ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
203 Some(Err(Error::Payment(format!("Payment required: {message}"))))
204 }
205 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err(
206 Error::Protocol(format!("Remote PUT error for {addr_hex}: {e}")),
207 )),
208 _ => None,
209 },
210 |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
211 || {
212 Error::Timeout(format!(
213 "Timeout waiting for store response after {timeout_secs}s"
214 ))
215 },
216 )
217 .await;
218
219 record_peer_outcome(node, *target_peer, peer_addrs, result.is_ok(), None).await;
224
225 result
226 }
227
228 pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
239 if let Some(cached) = self.chunk_cache().get(address) {
241 let computed = compute_address(&cached);
242 if computed == *address {
243 debug!("Cache hit for chunk {}", hex::encode(address));
244 return Ok(Some(DataChunk::new(*address, cached)));
245 }
246 debug!(
248 "Cache corruption detected for {}: evicting",
249 hex::encode(address)
250 );
251 self.chunk_cache().remove(address);
252 }
253
254 let peers = self.close_group_peers(address).await?;
255 let addr_hex = hex::encode(address);
256
257 for (peer, addrs) in &peers {
258 match self.chunk_get_from_peer(address, peer, addrs).await {
259 Ok(Some(chunk)) => {
260 self.chunk_cache().put(chunk.address, chunk.content.clone());
261 return Ok(Some(chunk));
262 }
263 Ok(None) => {
264 debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
265 }
266 Err(Error::Timeout(_) | Error::Network(_)) => {
267 debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
268 }
269 Err(e) => return Err(e),
270 }
271 }
272
273 Ok(None)
275 }
276
277 async fn chunk_get_from_peer(
279 &self,
280 address: &XorName,
281 peer: &PeerId,
282 peer_addrs: &[MultiAddr],
283 ) -> Result<Option<DataChunk>> {
284 let node = self.network().node();
285 let request_id = self.next_request_id();
286 let request = ChunkGetRequest::new(*address);
287 let message = ChunkMessage {
288 request_id,
289 body: ChunkMessageBody::GetRequest(request),
290 };
291 let message_bytes = message
292 .encode()
293 .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
294
295 let timeout = Duration::from_secs(self.config().store_timeout_secs);
296 let addr_hex = hex::encode(address);
297 let timeout_secs = self.config().store_timeout_secs;
298
299 let start = Instant::now();
300 let result = send_and_await_chunk_response(
301 node,
302 peer,
303 message_bytes,
304 request_id,
305 timeout,
306 peer_addrs,
307 |body| match body {
308 ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
309 address: addr,
310 content,
311 }) => {
312 if addr != *address {
313 return Some(Err(Error::InvalidData(format!(
314 "Mismatched chunk address: expected {addr_hex}, got {}",
315 hex::encode(addr)
316 ))));
317 }
318
319 let computed = compute_address(&content);
320 if computed != addr {
321 return Some(Err(Error::InvalidData(format!(
322 "Invalid chunk content: expected hash {addr_hex}, got {}",
323 hex::encode(computed)
324 ))));
325 }
326
327 debug!(
328 "Retrieved chunk {} ({} bytes) from peer {peer}",
329 hex::encode(addr),
330 content.len()
331 );
332 Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
333 }
334 ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
335 ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
336 Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
337 )),
338 _ => None,
339 },
340 |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
341 || {
342 Error::Timeout(format!(
343 "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
344 ))
345 },
346 )
347 .await;
348
349 let success = result.is_ok();
350 let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
351 record_peer_outcome(node, *peer, peer_addrs, success, rtt_ms).await;
352
353 result
354 }
355
356 pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
362 self.chunk_get(address).await.map(|opt| opt.is_some())
363 }
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};

    /// Merkle timeout passed to the function under test. It differs from the 30 s
    /// default so the two branches can be told apart.
    const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;

    /// Tag byte used to exercise the "unrecognised proof" path.
    const UNKNOWN_PROOF_TAG: u8 = 0xff;

    /// Resolves the store timeout for a proof made of the single byte `tag`.
    fn timeout_for_tag(tag: u8) -> Duration {
        store_response_timeout_for_proof(&[tag], TEST_MERKLE_TIMEOUT_SECS)
    }

    #[test]
    fn single_node_proof_uses_store_response_timeout() {
        assert_eq!(
            timeout_for_tag(PROOF_TAG_SINGLE_NODE),
            STORE_RESPONSE_TIMEOUT
        );
    }

    #[test]
    fn unknown_proof_uses_store_response_timeout() {
        assert_eq!(timeout_for_tag(UNKNOWN_PROOF_TAG), STORE_RESPONSE_TIMEOUT);
    }

    #[test]
    fn merkle_proof_uses_configured_store_timeout() {
        let expected = Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS);

        assert_eq!(timeout_for_tag(PROOF_TAG_MERKLE), expected);
    }
}