// ant_core/data/client/chunk.rs
//! Chunk storage operations.
//!
//! Chunks are immutable, content-addressed data blocks where the address
//! is the BLAKE3 hash of the content.

use std::future::Future;
use std::time::{Duration, Instant};

use ant_protocol::transport::{MultiAddr, PeerId};
use ant_protocol::{
    compute_address, send_and_await_chunk_response, ChunkGetRequest, ChunkGetResponse,
    ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk, XorName,
    CLOSE_GROUP_MAJORITY,
};
use bytes::Bytes;
use futures::stream::{FuturesUnordered, StreamExt};
use tracing::{debug, warn};

use crate::data::client::peer_cache::record_peer_outcome;
use crate::data::client::Client;
use crate::data::error::{Error, Result};
20
/// Data type identifier for chunks, sent with quote requests so nodes
/// can price chunk storage distinctly from other record types.
const CHUNK_DATA_TYPE: u32 = 0;
23
/// A reusable set of peers built from a single DHT lookup, suitable
/// for fetching many chunks from overlapping close groups without
/// paying the lookup cost per chunk.
///
/// Build via [`Client::build_peer_pool_for`]. Use via
/// [`Client::chunk_get_with_pool`].
///
/// On live mainnet today, where `find_closest_peers` dominates
/// per-chunk download wall-clock (~70 s/lookup), a pool built once
/// and reused across a 32-chunk batch reduces total wall-clock by
/// ~34% vs the per-chunk lookup baseline. The advantage grows with
/// batch size.
#[derive(Debug, Clone)]
pub struct PeerPool {
    // Crate-internal so `chunk_get_with_pool` can iterate directly;
    // each entry pairs a peer with its known dial addresses.
    pub(crate) peers: Vec<(PeerId, Vec<MultiAddr>)>,
}
40
41impl PeerPool {
42    /// Number of peers in the pool.
43    #[must_use]
44    pub fn len(&self) -> usize {
45        self.peers.len()
46    }
47
48    /// Whether the pool is empty.
49    #[must_use]
50    pub fn is_empty(&self) -> bool {
51        self.peers.is_empty()
52    }
53}
54
55impl Client {
56    /// Store a chunk on the Autonomi network with payment.
57    ///
58    /// Checks if the chunk already exists before paying. If it does,
59    /// returns the address immediately without incurring on-chain costs.
60    /// Otherwise collects quotes, pays on-chain, then stores with proof
61    /// to `CLOSE_GROUP_MAJORITY` peers.
62    ///
63    /// # Errors
64    ///
65    /// Returns an error if payment or the network operation fails.
66    pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
67        let address = compute_address(&content);
68        let data_size = u64::try_from(content.len())
69            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
70
71        match self
72            .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
73            .await
74        {
75            Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
76            Err(Error::AlreadyStored) => {
77                debug!(
78                    "Chunk {} already stored on network, skipping payment",
79                    hex::encode(address)
80                );
81                Ok(address)
82            }
83            Err(e) => Err(e),
84        }
85    }
86
    /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers from the quoted set.
    ///
    /// Initially sends the PUT concurrently to the first
    /// `CLOSE_GROUP_MAJORITY` peers. If any fail, falls back to the
    /// remaining peers in the quoted set until majority is reached or
    /// all peers are exhausted.
    ///
    /// # Errors
    ///
    /// Returns an error if fewer than `CLOSE_GROUP_MAJORITY` peers accept
    /// the chunk.
    pub(crate) async fn chunk_put_to_close_group(
        &self,
        content: Bytes,
        proof: Vec<u8>,
        peers: &[(PeerId, Vec<MultiAddr>)],
    ) -> Result<XorName> {
        let address = compute_address(&content);

        // Fire the first `CLOSE_GROUP_MAJORITY` PUTs concurrently; the
        // remainder of the quoted set is held in reserve as fallbacks.
        let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
        let (initial_peers, fallback_peers) = peers.split_at(initial_count);

        let mut put_futures = FuturesUnordered::new();
        for (peer_id, addrs) in initial_peers {
            put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
        }

        let mut success_count = 0usize;
        let mut failures: Vec<String> = Vec::new();
        let mut fallback_iter = fallback_peers.iter();

        // Drain completions in arrival order. Each failure immediately
        // enqueues one fallback peer (while any remain), keeping the
        // number of in-flight PUTs steady until the set is exhausted.
        while let Some((peer_id, result)) = put_futures.next().await {
            match result {
                Ok(_) => {
                    success_count += 1;
                    if success_count >= CLOSE_GROUP_MAJORITY {
                        // Majority reached: return early. Dropping
                        // `put_futures` cancels any still-pending PUTs.
                        debug!(
                            "Chunk {} stored on {success_count} peers (majority reached)",
                            hex::encode(address)
                        );
                        return Ok(address);
                    }
                }
                Err(e) => {
                    warn!("Failed to store chunk on {peer_id}: {e}");
                    failures.push(format!("{peer_id}: {e}"));

                    if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
                        debug!(
                            "Falling back to peer {fb_peer} for chunk {}",
                            hex::encode(address)
                        );
                        put_futures.push(self.spawn_chunk_put(
                            content.clone(),
                            proof.clone(),
                            fb_peer,
                            fb_addrs,
                        ));
                    }
                }
            }
        }

        // All futures completed without reaching majority: report how
        // far we got and every per-peer failure for diagnosis.
        Err(Error::InsufficientPeers(format!(
            "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
            failures.join("; ")
        )))
    }
155
156    /// Spawn a chunk PUT future for a single peer.
157    fn spawn_chunk_put<'a>(
158        &'a self,
159        content: Bytes,
160        proof: Vec<u8>,
161        peer_id: &'a PeerId,
162        addrs: &'a [MultiAddr],
163    ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
164        let peer_id_owned = *peer_id;
165        async move {
166            let result = self
167                .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
168                .await;
169            (peer_id_owned, result)
170        }
171    }
172
    /// Store a chunk on the Autonomi network with a pre-built payment proof.
    ///
    /// Sends to a single peer. Callers that need replication across the
    /// close group should use `chunk_put_to_close_group` instead.
    ///
    /// # Errors
    ///
    /// Returns an error if the network operation fails.
    pub async fn chunk_put_with_proof(
        &self,
        content: Bytes,
        proof: Vec<u8>,
        target_peer: &PeerId,
        peer_addrs: &[MultiAddr],
    ) -> Result<XorName> {
        let address = compute_address(&content);
        let node = self.network().node();

        // Build and encode the PUT message carrying content plus proof.
        let request_id = self.next_request_id();
        let request = ChunkPutRequest::with_payment(address, content.to_vec(), proof);
        let message = ChunkMessage {
            request_id,
            body: ChunkMessageBody::PutRequest(request),
        };
        let message_bytes = message
            .encode()
            .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;

        let timeout = Duration::from_secs(self.config().store_timeout_secs);
        // Pre-render values that are moved into the closures below.
        let addr_hex = hex::encode(address);
        let timeout_secs = self.config().store_timeout_secs;

        let result = send_and_await_chunk_response(
            node,
            target_peer,
            message_bytes,
            request_id,
            timeout,
            peer_addrs,
            // Response matcher: `Some(..)` completes the wait; `None`
            // ignores unrelated message bodies and keeps waiting.
            |body| match body {
                ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
                    debug!("Chunk stored at {}", hex::encode(addr));
                    Some(Ok(addr))
                }
                // AlreadyExists counts as success from the caller's view:
                // the data is on the network at the expected address.
                ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
                    address: addr,
                }) => {
                    debug!("Chunk already exists at {}", hex::encode(addr));
                    Some(Ok(addr))
                }
                ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
                    Some(Err(Error::Payment(format!("Payment required: {message}"))))
                }
                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err(
                    Error::Protocol(format!("Remote PUT error for {addr_hex}: {e}")),
                )),
                _ => None,
            },
            |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
            || {
                Error::Timeout(format!(
                    "Timeout waiting for store response after {timeout_secs}s"
                ))
            },
        )
        .await;

        // No RTT recorded on the PUT path: the wall-clock is dominated by
        // the payload upload, which reflects the uploader's uplink rather
        // than the peer's responsiveness. Quote-path and GET-path RTTs
        // still feed quality scoring.
        record_peer_outcome(node, *target_peer, peer_addrs, result.is_ok(), None).await;

        result
    }
248
249    /// Retrieve a chunk from the Autonomi network.
250    ///
251    /// Queries all peers in the close group for the chunk address,
252    /// returning the first successful response. This handles the case
253    /// where the storing peer differs from the first peer returned by
254    /// DHT routing.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if the network operation fails.
259    pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
260        // Check cache first, with integrity verification.
261        if let Some(cached) = self.chunk_cache().get(address) {
262            let computed = compute_address(&cached);
263            if computed == *address {
264                debug!("Cache hit for chunk {}", hex::encode(address));
265                return Ok(Some(DataChunk::new(*address, cached)));
266            }
267            // Cache entry corrupted — evict and fall through to network fetch.
268            debug!(
269                "Cache corruption detected for {}: evicting",
270                hex::encode(address)
271            );
272            self.chunk_cache().remove(address);
273        }
274
275        let peers = self.close_group_peers(address).await?;
276        let addr_hex = hex::encode(address);
277
278        for (peer, addrs) in &peers {
279            match self.chunk_get_from_peer(address, peer, addrs).await {
280                Ok(Some(chunk)) => {
281                    self.chunk_cache().put(chunk.address, chunk.content.clone());
282                    return Ok(Some(chunk));
283                }
284                Ok(None) => {
285                    debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
286                }
287                Err(Error::Timeout(_) | Error::Network(_)) => {
288                    debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
289                }
290                Err(e) => return Err(e),
291            }
292        }
293
294        // None of the close group peers had the chunk
295        Ok(None)
296    }
297
298    /// Build a reusable pool of peers suitable for fetching any chunk
299    /// in `addresses`.
300    ///
301    /// Performs ONE close-group lookup at the median address of the
302    /// batch and returns the resulting peer set. On a network where
303    /// `find_closest_peers` dominates per-chunk wall-clock (e.g. live
304    /// Autonomi mainnet today), reusing this pool across many GETs
305    /// via `chunk_get_with_pool` gives a measured ~1.5x speedup on
306    /// 32-chunk downloads and grows with batch size.
307    ///
308    /// The pool is "best-effort": peers are the close group of the
309    /// median XorName, which closely overlaps the close group of any
310    /// nearby address. Chunks whose actual close group has rotated
311    /// can still be fetched via the per-chunk fallback in
312    /// `chunk_get_with_pool`.
313    ///
314    /// # Errors
315    ///
316    /// Returns an error if `addresses` is empty or if the DHT lookup
317    /// fails.
318    pub async fn build_peer_pool_for(&self, addresses: &[XorName]) -> Result<PeerPool> {
319        if addresses.is_empty() {
320            return Err(Error::InvalidData(
321                "build_peer_pool_for requires at least one address".to_string(),
322            ));
323        }
324        let mut sorted = addresses.to_vec();
325        sorted.sort_unstable();
326        let median_idx = sorted.len() / 2;
327        let median = sorted.get(median_idx).copied().unwrap_or_else(|| sorted[0]);
328        let peers = self.close_group_peers(&median).await?;
329        debug!(
330            "Built peer pool of {} peers from median address {} (batch size {})",
331            peers.len(),
332            hex::encode(median),
333            addresses.len()
334        );
335        Ok(PeerPool { peers })
336    }
337
338    /// Fetch a chunk, trying the supplied pool's peers first.
339    ///
340    /// Falls back to a per-chunk close-group lookup if none of the
341    /// pool peers have the chunk (or all pool peers are unreachable).
342    /// Pair with `build_peer_pool_for` to amortize a single DHT walk
343    /// across many GETs.
344    ///
345    /// Cache, integrity verification, and per-peer outcome reporting
346    /// behave identically to `chunk_get`.
347    ///
348    /// # Errors
349    ///
350    /// Returns an error if all peers (pool + fallback close group)
351    /// fail with non-recoverable errors.
352    pub async fn chunk_get_with_pool(
353        &self,
354        address: &XorName,
355        pool: &PeerPool,
356    ) -> Result<Option<DataChunk>> {
357        // Cache lookup with integrity verification — same as chunk_get.
358        if let Some(cached) = self.chunk_cache().get(address) {
359            let computed = compute_address(&cached);
360            if computed == *address {
361                debug!("Cache hit for chunk {} (pooled GET)", hex::encode(address));
362                return Ok(Some(DataChunk::new(*address, cached)));
363            }
364            self.chunk_cache().remove(address);
365        }
366
367        let addr_hex = hex::encode(address);
368
369        // Try the pool first.
370        for (peer, addrs) in &pool.peers {
371            match self.chunk_get_from_peer(address, peer, addrs).await {
372                Ok(Some(chunk)) => {
373                    self.chunk_cache().put(chunk.address, chunk.content.clone());
374                    return Ok(Some(chunk));
375                }
376                Ok(None) => {
377                    debug!("Chunk {addr_hex} not found on pool peer {peer}, trying next");
378                }
379                Err(Error::Timeout(_) | Error::Network(_)) => {
380                    debug!("Pool peer {peer} unreachable for chunk {addr_hex}");
381                }
382                Err(e) => return Err(e),
383            }
384        }
385
386        // Pool miss: fall back to a per-chunk close-group lookup so
387        // we don't return a false NotFound when the pool's peers
388        // simply weren't the right close group for this address.
389        debug!("Pool exhausted for {addr_hex}; falling back to per-chunk close group");
390        self.chunk_get(address).await
391    }
392
    /// Fetch a chunk from a specific peer.
    ///
    /// Verifies both the returned address and the content hash
    /// (`compute_address`) before accepting the chunk, and reports the
    /// outcome (plus RTT on success) to the peer-quality tracker.
    async fn chunk_get_from_peer(
        &self,
        address: &XorName,
        peer: &PeerId,
        peer_addrs: &[MultiAddr],
    ) -> Result<Option<DataChunk>> {
        let node = self.network().node();
        let request_id = self.next_request_id();
        let request = ChunkGetRequest::new(*address);
        let message = ChunkMessage {
            request_id,
            body: ChunkMessageBody::GetRequest(request),
        };
        let message_bytes = message
            .encode()
            .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;

        // NOTE(review): GETs reuse `store_timeout_secs`; no dedicated
        // fetch timeout is visible in the config from here.
        let timeout = Duration::from_secs(self.config().store_timeout_secs);
        let addr_hex = hex::encode(address);
        let timeout_secs = self.config().store_timeout_secs;

        // Wall-clock for RTT reporting (recorded on success only).
        let start = Instant::now();
        let result = send_and_await_chunk_response(
            node,
            peer,
            message_bytes,
            request_id,
            timeout,
            peer_addrs,
            // Response matcher: `Some(..)` completes the wait; `None`
            // ignores unrelated bodies and keeps waiting.
            |body| match body {
                ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
                    address: addr,
                    content,
                }) => {
                    // The peer must echo the address we asked for.
                    if addr != *address {
                        return Some(Err(Error::InvalidData(format!(
                            "Mismatched chunk address: expected {addr_hex}, got {}",
                            hex::encode(addr)
                        ))));
                    }

                    // Content must hash back to the address (integrity).
                    let computed = compute_address(&content);
                    if computed != addr {
                        return Some(Err(Error::InvalidData(format!(
                            "Invalid chunk content: expected hash {addr_hex}, got {}",
                            hex::encode(computed)
                        ))));
                    }

                    debug!(
                        "Retrieved chunk {} ({} bytes) from peer {peer}",
                        hex::encode(addr),
                        content.len()
                    );
                    Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
                }
                ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
                ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
                    Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
                )),
                _ => None,
            },
            |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
            || {
                Error::Timeout(format!(
                    "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
                ))
            },
        )
        .await;

        let success = result.is_ok();
        // Record RTT only for successful fetches so failures feed the
        // outcome flag without skewing latency stats. (Millis-as-u64
        // cannot truncate for any realistic elapsed time.)
        let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
        record_peer_outcome(node, *peer, peer_addrs, success, rtt_ms).await;

        result
    }
471
472    /// Check if a chunk exists on the network.
473    ///
474    /// # Errors
475    ///
476    /// Returns an error if the network operation fails.
477    pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
478        self.chunk_get(address).await.map(|opt| opt.is_some())
479    }
480}
481
#[cfg(test)]
mod tests {
    use super::*;

    /// An empty pool reports zero length and `is_empty() == true`.
    #[test]
    fn peer_pool_empty_reports_zero_len() {
        let pool = PeerPool { peers: Vec::new() };
        assert_eq!(pool.len(), 0);
        assert!(pool.is_empty());
    }

    /// A single-entry pool reports length 1 and is non-empty.
    #[test]
    fn peer_pool_with_entries_reports_len() {
        // `PeerId` is already in scope via `use super::*`; the previous
        // function-local `use ant_protocol::transport::PeerId;` was
        // redundant.
        let pool = PeerPool {
            peers: vec![(PeerId::random(), vec![])],
        };
        assert_eq!(pool.len(), 1);
        assert!(!pool.is_empty());
    }
}