Skip to main content

ant_core/data/client/
chunk.rs

1//! Chunk storage operations.
2//!
3//! Chunks are immutable, content-addressed data blocks where the address
4//! is the BLAKE3 hash of the content.
5
6use crate::data::client::peer_cache::record_peer_outcome;
7use crate::data::client::Client;
8use crate::data::error::{Error, Result};
9use ant_protocol::transport::{MultiAddr, PeerId};
10use ant_protocol::{
11    compute_address, send_and_await_chunk_response, ChunkGetRequest, ChunkGetResponse,
12    ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk, XorName,
13    CLOSE_GROUP_MAJORITY,
14};
15use bytes::Bytes;
16use futures::stream::{FuturesUnordered, StreamExt};
17use std::future::Future;
18use std::time::{Duration, Instant};
19use tracing::{debug, warn};
20
/// Numeric tag that marks a payload as a chunk when asking peers for
/// storage quotes.
const CHUNK_DATA_TYPE: u32 = 0_u32;
23
24impl Client {
25    /// Store a chunk on the Autonomi network with payment.
26    ///
27    /// Checks if the chunk already exists before paying. If it does,
28    /// returns the address immediately without incurring on-chain costs.
29    /// Otherwise collects quotes, pays on-chain, then stores with proof
30    /// to `CLOSE_GROUP_MAJORITY` peers.
31    ///
32    /// # Errors
33    ///
34    /// Returns an error if payment or the network operation fails.
35    pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
36        let address = compute_address(&content);
37        let data_size = u64::try_from(content.len())
38            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
39
40        match self
41            .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
42            .await
43        {
44            Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
45            Err(Error::AlreadyStored) => {
46                debug!(
47                    "Chunk {} already stored on network, skipping payment",
48                    hex::encode(address)
49                );
50                Ok(address)
51            }
52            Err(e) => Err(e),
53        }
54    }
55
56    /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers from the quoted set.
57    ///
58    /// Initially sends the PUT concurrently to the first
59    /// `CLOSE_GROUP_MAJORITY` peers. If any fail, falls back to the
60    /// remaining peers in the quoted set until majority is reached or
61    /// all peers are exhausted.
62    ///
63    /// # Errors
64    ///
65    /// Returns an error if fewer than `CLOSE_GROUP_MAJORITY` peers accept
66    /// the chunk.
67    pub(crate) async fn chunk_put_to_close_group(
68        &self,
69        content: Bytes,
70        proof: Vec<u8>,
71        peers: &[(PeerId, Vec<MultiAddr>)],
72    ) -> Result<XorName> {
73        let address = compute_address(&content);
74
75        let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
76        let (initial_peers, fallback_peers) = peers.split_at(initial_count);
77
78        let mut put_futures = FuturesUnordered::new();
79        for (peer_id, addrs) in initial_peers {
80            put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
81        }
82
83        let mut success_count = 0usize;
84        let mut failures: Vec<String> = Vec::new();
85        let mut fallback_iter = fallback_peers.iter();
86
87        while let Some((peer_id, result)) = put_futures.next().await {
88            match result {
89                Ok(_) => {
90                    success_count += 1;
91                    if success_count >= CLOSE_GROUP_MAJORITY {
92                        debug!(
93                            "Chunk {} stored on {success_count} peers (majority reached)",
94                            hex::encode(address)
95                        );
96                        return Ok(address);
97                    }
98                }
99                Err(e) => {
100                    warn!("Failed to store chunk on {peer_id}: {e}");
101                    failures.push(format!("{peer_id}: {e}"));
102
103                    if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
104                        debug!(
105                            "Falling back to peer {fb_peer} for chunk {}",
106                            hex::encode(address)
107                        );
108                        put_futures.push(self.spawn_chunk_put(
109                            content.clone(),
110                            proof.clone(),
111                            fb_peer,
112                            fb_addrs,
113                        ));
114                    }
115                }
116            }
117        }
118
119        Err(Error::InsufficientPeers(format!(
120            "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
121            failures.join("; ")
122        )))
123    }
124
125    /// Spawn a chunk PUT future for a single peer.
126    fn spawn_chunk_put<'a>(
127        &'a self,
128        content: Bytes,
129        proof: Vec<u8>,
130        peer_id: &'a PeerId,
131        addrs: &'a [MultiAddr],
132    ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
133        let peer_id_owned = *peer_id;
134        async move {
135            let result = self
136                .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
137                .await;
138            (peer_id_owned, result)
139        }
140    }
141
142    /// Store a chunk on the Autonomi network with a pre-built payment proof.
143    ///
144    /// Sends to a single peer. Callers that need replication across the
145    /// close group should use `chunk_put_to_close_group` instead.
146    ///
147    /// # Errors
148    ///
149    /// Returns an error if the network operation fails.
150    pub async fn chunk_put_with_proof(
151        &self,
152        content: Bytes,
153        proof: Vec<u8>,
154        target_peer: &PeerId,
155        peer_addrs: &[MultiAddr],
156    ) -> Result<XorName> {
157        let address = compute_address(&content);
158        let node = self.network().node();
159
160        let request_id = self.next_request_id();
161        let request = ChunkPutRequest::with_payment(address, content.to_vec(), proof);
162        let message = ChunkMessage {
163            request_id,
164            body: ChunkMessageBody::PutRequest(request),
165        };
166        let message_bytes = message
167            .encode()
168            .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
169
170        let timeout = Duration::from_secs(self.config().store_timeout_secs);
171        let addr_hex = hex::encode(address);
172        let timeout_secs = self.config().store_timeout_secs;
173
174        let result = send_and_await_chunk_response(
175            node,
176            target_peer,
177            message_bytes,
178            request_id,
179            timeout,
180            peer_addrs,
181            |body| match body {
182                ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
183                    debug!("Chunk stored at {}", hex::encode(addr));
184                    Some(Ok(addr))
185                }
186                ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
187                    address: addr,
188                }) => {
189                    debug!("Chunk already exists at {}", hex::encode(addr));
190                    Some(Ok(addr))
191                }
192                ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
193                    Some(Err(Error::Payment(format!("Payment required: {message}"))))
194                }
195                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err(
196                    Error::Protocol(format!("Remote PUT error for {addr_hex}: {e}")),
197                )),
198                _ => None,
199            },
200            |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
201            || {
202                Error::Timeout(format!(
203                    "Timeout waiting for store response after {timeout_secs}s"
204                ))
205            },
206        )
207        .await;
208
209        // No RTT recorded on the PUT path: the wall-clock is dominated by
210        // the ~4 MB payload upload, which reflects the uploader's uplink
211        // rather than the peer's responsiveness. Quote-path and GET-path
212        // RTTs still feed quality scoring.
213        record_peer_outcome(node, *target_peer, peer_addrs, result.is_ok(), None).await;
214
215        result
216    }
217
218    /// Retrieve a chunk from the Autonomi network.
219    ///
220    /// Queries all peers in the close group for the chunk address,
221    /// returning the first successful response. This handles the case
222    /// where the storing peer differs from the first peer returned by
223    /// DHT routing.
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the network operation fails.
228    pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
229        // Check cache first, with integrity verification.
230        if let Some(cached) = self.chunk_cache().get(address) {
231            let computed = compute_address(&cached);
232            if computed == *address {
233                debug!("Cache hit for chunk {}", hex::encode(address));
234                return Ok(Some(DataChunk::new(*address, cached)));
235            }
236            // Cache entry corrupted — evict and fall through to network fetch.
237            debug!(
238                "Cache corruption detected for {}: evicting",
239                hex::encode(address)
240            );
241            self.chunk_cache().remove(address);
242        }
243
244        let peers = self.close_group_peers(address).await?;
245        let addr_hex = hex::encode(address);
246
247        for (peer, addrs) in &peers {
248            match self.chunk_get_from_peer(address, peer, addrs).await {
249                Ok(Some(chunk)) => {
250                    self.chunk_cache().put(chunk.address, chunk.content.clone());
251                    return Ok(Some(chunk));
252                }
253                Ok(None) => {
254                    debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
255                }
256                Err(Error::Timeout(_) | Error::Network(_)) => {
257                    debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
258                }
259                Err(e) => return Err(e),
260            }
261        }
262
263        // None of the close group peers had the chunk
264        Ok(None)
265    }
266
267    /// Fetch a chunk from a specific peer.
268    async fn chunk_get_from_peer(
269        &self,
270        address: &XorName,
271        peer: &PeerId,
272        peer_addrs: &[MultiAddr],
273    ) -> Result<Option<DataChunk>> {
274        let node = self.network().node();
275        let request_id = self.next_request_id();
276        let request = ChunkGetRequest::new(*address);
277        let message = ChunkMessage {
278            request_id,
279            body: ChunkMessageBody::GetRequest(request),
280        };
281        let message_bytes = message
282            .encode()
283            .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
284
285        let timeout = Duration::from_secs(self.config().store_timeout_secs);
286        let addr_hex = hex::encode(address);
287        let timeout_secs = self.config().store_timeout_secs;
288
289        let start = Instant::now();
290        let result = send_and_await_chunk_response(
291            node,
292            peer,
293            message_bytes,
294            request_id,
295            timeout,
296            peer_addrs,
297            |body| match body {
298                ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
299                    address: addr,
300                    content,
301                }) => {
302                    if addr != *address {
303                        return Some(Err(Error::InvalidData(format!(
304                            "Mismatched chunk address: expected {addr_hex}, got {}",
305                            hex::encode(addr)
306                        ))));
307                    }
308
309                    let computed = compute_address(&content);
310                    if computed != addr {
311                        return Some(Err(Error::InvalidData(format!(
312                            "Invalid chunk content: expected hash {addr_hex}, got {}",
313                            hex::encode(computed)
314                        ))));
315                    }
316
317                    debug!(
318                        "Retrieved chunk {} ({} bytes) from peer {peer}",
319                        hex::encode(addr),
320                        content.len()
321                    );
322                    Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
323                }
324                ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
325                ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
326                    Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
327                )),
328                _ => None,
329            },
330            |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
331            || {
332                Error::Timeout(format!(
333                    "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
334                ))
335            },
336        )
337        .await;
338
339        let success = result.is_ok();
340        let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
341        record_peer_outcome(node, *peer, peer_addrs, success, rtt_ms).await;
342
343        result
344    }
345
346    /// Check if a chunk exists on the network.
347    ///
348    /// # Errors
349    ///
350    /// Returns an error if the network operation fails.
351    pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
352        self.chunk_get(address).await.map(|opt| opt.is_some())
353    }
354}