1use crate::data::client::peer_cache::record_peer_outcome;
7use crate::data::client::Client;
8use crate::data::error::{Error, Result};
9use ant_protocol::transport::{MultiAddr, PeerId};
10use ant_protocol::{
11 compute_address, send_and_await_chunk_response, ChunkGetRequest, ChunkGetResponse,
12 ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk, XorName,
13 CLOSE_GROUP_MAJORITY,
14};
15use bytes::Bytes;
16use futures::stream::{FuturesUnordered, StreamExt};
17use std::future::Future;
18use std::time::{Duration, Instant};
19use tracing::{debug, warn};
20
21const CHUNK_DATA_TYPE: u32 = 0;
23
24impl Client {
25 pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
36 let address = compute_address(&content);
37 let data_size = u64::try_from(content.len())
38 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
39
40 match self
41 .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
42 .await
43 {
44 Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
45 Err(Error::AlreadyStored) => {
46 debug!(
47 "Chunk {} already stored on network, skipping payment",
48 hex::encode(address)
49 );
50 Ok(address)
51 }
52 Err(e) => Err(e),
53 }
54 }
55
56 pub(crate) async fn chunk_put_to_close_group(
68 &self,
69 content: Bytes,
70 proof: Vec<u8>,
71 peers: &[(PeerId, Vec<MultiAddr>)],
72 ) -> Result<XorName> {
73 let address = compute_address(&content);
74
75 let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
76 let (initial_peers, fallback_peers) = peers.split_at(initial_count);
77
78 let mut put_futures = FuturesUnordered::new();
79 for (peer_id, addrs) in initial_peers {
80 put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
81 }
82
83 let mut success_count = 0usize;
84 let mut failures: Vec<String> = Vec::new();
85 let mut fallback_iter = fallback_peers.iter();
86
87 while let Some((peer_id, result)) = put_futures.next().await {
88 match result {
89 Ok(_) => {
90 success_count += 1;
91 if success_count >= CLOSE_GROUP_MAJORITY {
92 debug!(
93 "Chunk {} stored on {success_count} peers (majority reached)",
94 hex::encode(address)
95 );
96 return Ok(address);
97 }
98 }
99 Err(e) => {
100 warn!("Failed to store chunk on {peer_id}: {e}");
101 failures.push(format!("{peer_id}: {e}"));
102
103 if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
104 debug!(
105 "Falling back to peer {fb_peer} for chunk {}",
106 hex::encode(address)
107 );
108 put_futures.push(self.spawn_chunk_put(
109 content.clone(),
110 proof.clone(),
111 fb_peer,
112 fb_addrs,
113 ));
114 }
115 }
116 }
117 }
118
119 Err(Error::InsufficientPeers(format!(
120 "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
121 failures.join("; ")
122 )))
123 }
124
125 fn spawn_chunk_put<'a>(
127 &'a self,
128 content: Bytes,
129 proof: Vec<u8>,
130 peer_id: &'a PeerId,
131 addrs: &'a [MultiAddr],
132 ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
133 let peer_id_owned = *peer_id;
134 async move {
135 let result = self
136 .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
137 .await;
138 (peer_id_owned, result)
139 }
140 }
141
142 pub async fn chunk_put_with_proof(
151 &self,
152 content: Bytes,
153 proof: Vec<u8>,
154 target_peer: &PeerId,
155 peer_addrs: &[MultiAddr],
156 ) -> Result<XorName> {
157 let address = compute_address(&content);
158 let node = self.network().node();
159
160 let request_id = self.next_request_id();
161 let request = ChunkPutRequest::with_payment(address, content.to_vec(), proof);
162 let message = ChunkMessage {
163 request_id,
164 body: ChunkMessageBody::PutRequest(request),
165 };
166 let message_bytes = message
167 .encode()
168 .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
169
170 let timeout = Duration::from_secs(self.config().store_timeout_secs);
171 let addr_hex = hex::encode(address);
172 let timeout_secs = self.config().store_timeout_secs;
173
174 let result = send_and_await_chunk_response(
175 node,
176 target_peer,
177 message_bytes,
178 request_id,
179 timeout,
180 peer_addrs,
181 |body| match body {
182 ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
183 debug!("Chunk stored at {}", hex::encode(addr));
184 Some(Ok(addr))
185 }
186 ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
187 address: addr,
188 }) => {
189 debug!("Chunk already exists at {}", hex::encode(addr));
190 Some(Ok(addr))
191 }
192 ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
193 Some(Err(Error::Payment(format!("Payment required: {message}"))))
194 }
195 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err(
196 Error::Protocol(format!("Remote PUT error for {addr_hex}: {e}")),
197 )),
198 _ => None,
199 },
200 |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
201 || {
202 Error::Timeout(format!(
203 "Timeout waiting for store response after {timeout_secs}s"
204 ))
205 },
206 )
207 .await;
208
209 record_peer_outcome(node, *target_peer, peer_addrs, result.is_ok(), None).await;
214
215 result
216 }
217
218 pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
229 if let Some(cached) = self.chunk_cache().get(address) {
231 let computed = compute_address(&cached);
232 if computed == *address {
233 debug!("Cache hit for chunk {}", hex::encode(address));
234 return Ok(Some(DataChunk::new(*address, cached)));
235 }
236 debug!(
238 "Cache corruption detected for {}: evicting",
239 hex::encode(address)
240 );
241 self.chunk_cache().remove(address);
242 }
243
244 let peers = self.close_group_peers(address).await?;
245 let addr_hex = hex::encode(address);
246
247 for (peer, addrs) in &peers {
248 match self.chunk_get_from_peer(address, peer, addrs).await {
249 Ok(Some(chunk)) => {
250 self.chunk_cache().put(chunk.address, chunk.content.clone());
251 return Ok(Some(chunk));
252 }
253 Ok(None) => {
254 debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
255 }
256 Err(Error::Timeout(_) | Error::Network(_)) => {
257 debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
258 }
259 Err(e) => return Err(e),
260 }
261 }
262
263 Ok(None)
265 }
266
267 async fn chunk_get_from_peer(
269 &self,
270 address: &XorName,
271 peer: &PeerId,
272 peer_addrs: &[MultiAddr],
273 ) -> Result<Option<DataChunk>> {
274 let node = self.network().node();
275 let request_id = self.next_request_id();
276 let request = ChunkGetRequest::new(*address);
277 let message = ChunkMessage {
278 request_id,
279 body: ChunkMessageBody::GetRequest(request),
280 };
281 let message_bytes = message
282 .encode()
283 .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
284
285 let timeout = Duration::from_secs(self.config().store_timeout_secs);
286 let addr_hex = hex::encode(address);
287 let timeout_secs = self.config().store_timeout_secs;
288
289 let start = Instant::now();
290 let result = send_and_await_chunk_response(
291 node,
292 peer,
293 message_bytes,
294 request_id,
295 timeout,
296 peer_addrs,
297 |body| match body {
298 ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
299 address: addr,
300 content,
301 }) => {
302 if addr != *address {
303 return Some(Err(Error::InvalidData(format!(
304 "Mismatched chunk address: expected {addr_hex}, got {}",
305 hex::encode(addr)
306 ))));
307 }
308
309 let computed = compute_address(&content);
310 if computed != addr {
311 return Some(Err(Error::InvalidData(format!(
312 "Invalid chunk content: expected hash {addr_hex}, got {}",
313 hex::encode(computed)
314 ))));
315 }
316
317 debug!(
318 "Retrieved chunk {} ({} bytes) from peer {peer}",
319 hex::encode(addr),
320 content.len()
321 );
322 Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
323 }
324 ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
325 ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
326 Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
327 )),
328 _ => None,
329 },
330 |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
331 || {
332 Error::Timeout(format!(
333 "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
334 ))
335 },
336 )
337 .await;
338
339 let success = result.is_ok();
340 let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
341 record_peer_outcome(node, *peer, peer_addrs, success, rtt_ms).await;
342
343 result
344 }
345
346 pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
352 self.chunk_get(address).await.map(|opt| opt.is_some())
353 }
354}