1use crate::data::client::peer_cache::record_peer_outcome;
7use crate::data::client::Client;
8use crate::data::error::{Error, Result};
9use ant_protocol::transport::{MultiAddr, PeerId};
10use ant_protocol::{
11 compute_address, send_and_await_chunk_response, ChunkGetRequest, ChunkGetResponse,
12 ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk, XorName,
13 CLOSE_GROUP_MAJORITY,
14};
15use bytes::Bytes;
16use futures::stream::{FuturesUnordered, StreamExt};
17use std::future::Future;
18use std::time::{Duration, Instant};
19use tracing::{debug, warn};
20
/// Data-type tag handed to `pay_for_storage` when paying for a chunk upload.
// NOTE(review): presumably `0` is the protocol's identifier for plain chunks —
// confirm against the payment API before relying on it.
const CHUNK_DATA_TYPE: u32 = 0;
23
24#[derive(Debug, Clone)]
37pub struct PeerPool {
38 pub(crate) peers: Vec<(PeerId, Vec<MultiAddr>)>,
39}
40
41impl PeerPool {
42 #[must_use]
44 pub fn len(&self) -> usize {
45 self.peers.len()
46 }
47
48 #[must_use]
50 pub fn is_empty(&self) -> bool {
51 self.peers.is_empty()
52 }
53}
54
55impl Client {
56 pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
67 let address = compute_address(&content);
68 let data_size = u64::try_from(content.len())
69 .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
70
71 match self
72 .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
73 .await
74 {
75 Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
76 Err(Error::AlreadyStored) => {
77 debug!(
78 "Chunk {} already stored on network, skipping payment",
79 hex::encode(address)
80 );
81 Ok(address)
82 }
83 Err(e) => Err(e),
84 }
85 }
86
87 pub(crate) async fn chunk_put_to_close_group(
99 &self,
100 content: Bytes,
101 proof: Vec<u8>,
102 peers: &[(PeerId, Vec<MultiAddr>)],
103 ) -> Result<XorName> {
104 let address = compute_address(&content);
105
106 let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
107 let (initial_peers, fallback_peers) = peers.split_at(initial_count);
108
109 let mut put_futures = FuturesUnordered::new();
110 for (peer_id, addrs) in initial_peers {
111 put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
112 }
113
114 let mut success_count = 0usize;
115 let mut failures: Vec<String> = Vec::new();
116 let mut fallback_iter = fallback_peers.iter();
117
118 while let Some((peer_id, result)) = put_futures.next().await {
119 match result {
120 Ok(_) => {
121 success_count += 1;
122 if success_count >= CLOSE_GROUP_MAJORITY {
123 debug!(
124 "Chunk {} stored on {success_count} peers (majority reached)",
125 hex::encode(address)
126 );
127 return Ok(address);
128 }
129 }
130 Err(e) => {
131 warn!("Failed to store chunk on {peer_id}: {e}");
132 failures.push(format!("{peer_id}: {e}"));
133
134 if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
135 debug!(
136 "Falling back to peer {fb_peer} for chunk {}",
137 hex::encode(address)
138 );
139 put_futures.push(self.spawn_chunk_put(
140 content.clone(),
141 proof.clone(),
142 fb_peer,
143 fb_addrs,
144 ));
145 }
146 }
147 }
148 }
149
150 Err(Error::InsufficientPeers(format!(
151 "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
152 failures.join("; ")
153 )))
154 }
155
156 fn spawn_chunk_put<'a>(
158 &'a self,
159 content: Bytes,
160 proof: Vec<u8>,
161 peer_id: &'a PeerId,
162 addrs: &'a [MultiAddr],
163 ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
164 let peer_id_owned = *peer_id;
165 async move {
166 let result = self
167 .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
168 .await;
169 (peer_id_owned, result)
170 }
171 }
172
173 pub async fn chunk_put_with_proof(
182 &self,
183 content: Bytes,
184 proof: Vec<u8>,
185 target_peer: &PeerId,
186 peer_addrs: &[MultiAddr],
187 ) -> Result<XorName> {
188 let address = compute_address(&content);
189 let node = self.network().node();
190
191 let request_id = self.next_request_id();
192 let request = ChunkPutRequest::with_payment(address, content.to_vec(), proof);
193 let message = ChunkMessage {
194 request_id,
195 body: ChunkMessageBody::PutRequest(request),
196 };
197 let message_bytes = message
198 .encode()
199 .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
200
201 let timeout = Duration::from_secs(self.config().store_timeout_secs);
202 let addr_hex = hex::encode(address);
203 let timeout_secs = self.config().store_timeout_secs;
204
205 let result = send_and_await_chunk_response(
206 node,
207 target_peer,
208 message_bytes,
209 request_id,
210 timeout,
211 peer_addrs,
212 |body| match body {
213 ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
214 debug!("Chunk stored at {}", hex::encode(addr));
215 Some(Ok(addr))
216 }
217 ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
218 address: addr,
219 }) => {
220 debug!("Chunk already exists at {}", hex::encode(addr));
221 Some(Ok(addr))
222 }
223 ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
224 Some(Err(Error::Payment(format!("Payment required: {message}"))))
225 }
226 ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err(
227 Error::Protocol(format!("Remote PUT error for {addr_hex}: {e}")),
228 )),
229 _ => None,
230 },
231 |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
232 || {
233 Error::Timeout(format!(
234 "Timeout waiting for store response after {timeout_secs}s"
235 ))
236 },
237 )
238 .await;
239
240 record_peer_outcome(node, *target_peer, peer_addrs, result.is_ok(), None).await;
245
246 result
247 }
248
249 pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
260 if let Some(cached) = self.chunk_cache().get(address) {
262 let computed = compute_address(&cached);
263 if computed == *address {
264 debug!("Cache hit for chunk {}", hex::encode(address));
265 return Ok(Some(DataChunk::new(*address, cached)));
266 }
267 debug!(
269 "Cache corruption detected for {}: evicting",
270 hex::encode(address)
271 );
272 self.chunk_cache().remove(address);
273 }
274
275 let peers = self.close_group_peers(address).await?;
276 let addr_hex = hex::encode(address);
277
278 for (peer, addrs) in &peers {
279 match self.chunk_get_from_peer(address, peer, addrs).await {
280 Ok(Some(chunk)) => {
281 self.chunk_cache().put(chunk.address, chunk.content.clone());
282 return Ok(Some(chunk));
283 }
284 Ok(None) => {
285 debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
286 }
287 Err(Error::Timeout(_) | Error::Network(_)) => {
288 debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
289 }
290 Err(e) => return Err(e),
291 }
292 }
293
294 Ok(None)
296 }
297
298 pub async fn build_peer_pool_for(&self, addresses: &[XorName]) -> Result<PeerPool> {
319 if addresses.is_empty() {
320 return Err(Error::InvalidData(
321 "build_peer_pool_for requires at least one address".to_string(),
322 ));
323 }
324 let mut sorted = addresses.to_vec();
325 sorted.sort_unstable();
326 let median_idx = sorted.len() / 2;
327 let median = sorted.get(median_idx).copied().unwrap_or_else(|| sorted[0]);
328 let peers = self.close_group_peers(&median).await?;
329 debug!(
330 "Built peer pool of {} peers from median address {} (batch size {})",
331 peers.len(),
332 hex::encode(median),
333 addresses.len()
334 );
335 Ok(PeerPool { peers })
336 }
337
338 pub async fn chunk_get_with_pool(
353 &self,
354 address: &XorName,
355 pool: &PeerPool,
356 ) -> Result<Option<DataChunk>> {
357 if let Some(cached) = self.chunk_cache().get(address) {
359 let computed = compute_address(&cached);
360 if computed == *address {
361 debug!("Cache hit for chunk {} (pooled GET)", hex::encode(address));
362 return Ok(Some(DataChunk::new(*address, cached)));
363 }
364 self.chunk_cache().remove(address);
365 }
366
367 let addr_hex = hex::encode(address);
368
369 for (peer, addrs) in &pool.peers {
371 match self.chunk_get_from_peer(address, peer, addrs).await {
372 Ok(Some(chunk)) => {
373 self.chunk_cache().put(chunk.address, chunk.content.clone());
374 return Ok(Some(chunk));
375 }
376 Ok(None) => {
377 debug!("Chunk {addr_hex} not found on pool peer {peer}, trying next");
378 }
379 Err(Error::Timeout(_) | Error::Network(_)) => {
380 debug!("Pool peer {peer} unreachable for chunk {addr_hex}");
381 }
382 Err(e) => return Err(e),
383 }
384 }
385
386 debug!("Pool exhausted for {addr_hex}; falling back to per-chunk close group");
390 self.chunk_get(address).await
391 }
392
393 async fn chunk_get_from_peer(
395 &self,
396 address: &XorName,
397 peer: &PeerId,
398 peer_addrs: &[MultiAddr],
399 ) -> Result<Option<DataChunk>> {
400 let node = self.network().node();
401 let request_id = self.next_request_id();
402 let request = ChunkGetRequest::new(*address);
403 let message = ChunkMessage {
404 request_id,
405 body: ChunkMessageBody::GetRequest(request),
406 };
407 let message_bytes = message
408 .encode()
409 .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
410
411 let timeout = Duration::from_secs(self.config().store_timeout_secs);
412 let addr_hex = hex::encode(address);
413 let timeout_secs = self.config().store_timeout_secs;
414
415 let start = Instant::now();
416 let result = send_and_await_chunk_response(
417 node,
418 peer,
419 message_bytes,
420 request_id,
421 timeout,
422 peer_addrs,
423 |body| match body {
424 ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
425 address: addr,
426 content,
427 }) => {
428 if addr != *address {
429 return Some(Err(Error::InvalidData(format!(
430 "Mismatched chunk address: expected {addr_hex}, got {}",
431 hex::encode(addr)
432 ))));
433 }
434
435 let computed = compute_address(&content);
436 if computed != addr {
437 return Some(Err(Error::InvalidData(format!(
438 "Invalid chunk content: expected hash {addr_hex}, got {}",
439 hex::encode(computed)
440 ))));
441 }
442
443 debug!(
444 "Retrieved chunk {} ({} bytes) from peer {peer}",
445 hex::encode(addr),
446 content.len()
447 );
448 Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
449 }
450 ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
451 ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
452 Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
453 )),
454 _ => None,
455 },
456 |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
457 || {
458 Error::Timeout(format!(
459 "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
460 ))
461 },
462 )
463 .await;
464
465 let success = result.is_ok();
466 let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
467 record_peer_outcome(node, *peer, peer_addrs, success, rtt_ms).await;
468
469 result
470 }
471
472 pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
478 self.chunk_get(address).await.map(|opt| opt.is_some())
479 }
480}
481
482#[cfg(test)]
483mod tests {
484 use super::*;
485
486 #[test]
487 fn peer_pool_empty_reports_zero_len() {
488 let pool = PeerPool { peers: Vec::new() };
489 assert_eq!(pool.len(), 0);
490 assert!(pool.is_empty());
491 }
492
493 #[test]
494 fn peer_pool_with_entries_reports_len() {
495 use ant_protocol::transport::PeerId;
496 let peer = PeerId::random();
497 let pool = PeerPool {
498 peers: vec![(peer, vec![])],
499 };
500 assert_eq!(pool.len(), 1);
501 assert!(!pool.is_empty());
502 }
503}