// ant_core/data/client/chunk.rs

//! Chunk storage operations.
//!
//! Chunks are immutable, content-addressed data blocks where the address
//! is the BLAKE3 hash of the content.
5
6use crate::data::client::peer_cache::record_peer_outcome;
7use crate::data::client::Client;
8use crate::data::error::{Error, Result};
9use ant_protocol::transport::{MultiAddr, PeerId};
10use ant_protocol::{
11    compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
12    ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
13    ProofType, XorName, CLOSE_GROUP_MAJORITY,
14};
15use bytes::Bytes;
16use futures::stream::{FuturesUnordered, StreamExt};
17use std::future::Future;
18use std::time::{Duration, Instant};
19use tracing::{debug, warn};
20
/// Data type identifier sent in quote/payment requests to mark the payload as a chunk.
const CHUNK_DATA_TYPE: u32 = 0x0;

/// How long a non-Merkle chunk PUT waits for the storing peer's response (30 s).
const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_millis(30_000);
26
27fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
28    match detect_proof_type(proof) {
29        Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
30        _ => STORE_RESPONSE_TIMEOUT,
31    }
32}
33
34impl Client {
35    /// Store a chunk on the Autonomi network with payment.
36    ///
37    /// Checks if the chunk already exists before paying. If it does,
38    /// returns the address immediately without incurring on-chain costs.
39    /// Otherwise collects quotes, pays on-chain, then stores with proof
40    /// to `CLOSE_GROUP_MAJORITY` peers.
41    ///
42    /// # Errors
43    ///
44    /// Returns an error if payment or the network operation fails.
45    pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
46        let address = compute_address(&content);
47        let data_size = u64::try_from(content.len())
48            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
49
50        match self
51            .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
52            .await
53        {
54            Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
55            Err(Error::AlreadyStored) => {
56                debug!(
57                    "Chunk {} already stored on network, skipping payment",
58                    hex::encode(address)
59                );
60                Ok(address)
61            }
62            Err(e) => Err(e),
63        }
64    }
65
66    /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers from the quoted set.
67    ///
68    /// Initially sends the PUT concurrently to the first
69    /// `CLOSE_GROUP_MAJORITY` peers. If any fail, falls back to the
70    /// remaining peers in the quoted set until majority is reached or
71    /// all peers are exhausted.
72    ///
73    /// # Errors
74    ///
75    /// Returns an error if fewer than `CLOSE_GROUP_MAJORITY` peers accept
76    /// the chunk.
77    pub(crate) async fn chunk_put_to_close_group(
78        &self,
79        content: Bytes,
80        proof: Vec<u8>,
81        peers: &[(PeerId, Vec<MultiAddr>)],
82    ) -> Result<XorName> {
83        let address = compute_address(&content);
84
85        let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
86        let (initial_peers, fallback_peers) = peers.split_at(initial_count);
87
88        let mut put_futures = FuturesUnordered::new();
89        for (peer_id, addrs) in initial_peers {
90            put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
91        }
92
93        let mut success_count = 0usize;
94        let mut failures: Vec<String> = Vec::new();
95        let mut fallback_iter = fallback_peers.iter();
96
97        while let Some((peer_id, result)) = put_futures.next().await {
98            match result {
99                Ok(_) => {
100                    success_count += 1;
101                    if success_count >= CLOSE_GROUP_MAJORITY {
102                        debug!(
103                            "Chunk {} stored on {success_count} peers (majority reached)",
104                            hex::encode(address)
105                        );
106                        return Ok(address);
107                    }
108                }
109                Err(e) => {
110                    warn!("Failed to store chunk on {peer_id}: {e}");
111                    failures.push(format!("{peer_id}: {e}"));
112
113                    if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
114                        debug!(
115                            "Falling back to peer {fb_peer} for chunk {}",
116                            hex::encode(address)
117                        );
118                        put_futures.push(self.spawn_chunk_put(
119                            content.clone(),
120                            proof.clone(),
121                            fb_peer,
122                            fb_addrs,
123                        ));
124                    }
125                }
126            }
127        }
128
129        Err(Error::InsufficientPeers(format!(
130            "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
131            failures.join("; ")
132        )))
133    }
134
135    /// Spawn a chunk PUT future for a single peer.
136    fn spawn_chunk_put<'a>(
137        &'a self,
138        content: Bytes,
139        proof: Vec<u8>,
140        peer_id: &'a PeerId,
141        addrs: &'a [MultiAddr],
142    ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
143        let peer_id_owned = *peer_id;
144        async move {
145            let result = self
146                .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
147                .await;
148            (peer_id_owned, result)
149        }
150    }
151
152    /// Store a chunk on the Autonomi network with a pre-built payment proof.
153    ///
154    /// Sends to a single peer. Callers that need replication across the
155    /// close group should use `chunk_put_to_close_group` instead.
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the network operation fails.
160    pub async fn chunk_put_with_proof(
161        &self,
162        content: Bytes,
163        proof: Vec<u8>,
164        target_peer: &PeerId,
165        peer_addrs: &[MultiAddr],
166    ) -> Result<XorName> {
167        let address = compute_address(&content);
168        let node = self.network().node();
169        let timeout = store_response_timeout_for_proof(&proof, self.config().store_timeout_secs);
170        let timeout_secs = timeout.as_secs();
171
172        let request_id = self.next_request_id();
173        let request = ChunkPutRequest::with_payment(address, content.to_vec(), proof);
174        let message = ChunkMessage {
175            request_id,
176            body: ChunkMessageBody::PutRequest(request),
177        };
178        let message_bytes = message
179            .encode()
180            .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
181
182        let addr_hex = hex::encode(address);
183
184        let result = send_and_await_chunk_response(
185            node,
186            target_peer,
187            message_bytes,
188            request_id,
189            timeout,
190            peer_addrs,
191            |body| match body {
192                ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
193                    debug!("Chunk stored at {}", hex::encode(addr));
194                    Some(Ok(addr))
195                }
196                ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
197                    address: addr,
198                }) => {
199                    debug!("Chunk already exists at {}", hex::encode(addr));
200                    Some(Ok(addr))
201                }
202                ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
203                    Some(Err(Error::Payment(format!("Payment required: {message}"))))
204                }
205                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err(
206                    Error::Protocol(format!("Remote PUT error for {addr_hex}: {e}")),
207                )),
208                _ => None,
209            },
210            |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
211            || {
212                Error::Timeout(format!(
213                    "Timeout waiting for store response after {timeout_secs}s"
214                ))
215            },
216        )
217        .await;
218
219        // No RTT recorded on the PUT path: the wall-clock is dominated by
220        // the ~4 MB payload upload, which reflects the uploader's uplink
221        // rather than the peer's responsiveness. Quote-path and GET-path
222        // RTTs still feed quality scoring.
223        record_peer_outcome(node, *target_peer, peer_addrs, result.is_ok(), None).await;
224
225        result
226    }
227
228    /// Retrieve a chunk from the Autonomi network.
229    ///
230    /// Queries all peers in the close group for the chunk address,
231    /// returning the first successful response. This handles the case
232    /// where the storing peer differs from the first peer returned by
233    /// DHT routing.
234    ///
235    /// # Errors
236    ///
237    /// Returns an error if the network operation fails.
238    pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
239        // Check cache first, with integrity verification.
240        if let Some(cached) = self.chunk_cache().get(address) {
241            let computed = compute_address(&cached);
242            if computed == *address {
243                debug!("Cache hit for chunk {}", hex::encode(address));
244                return Ok(Some(DataChunk::new(*address, cached)));
245            }
246            // Cache entry corrupted — evict and fall through to network fetch.
247            debug!(
248                "Cache corruption detected for {}: evicting",
249                hex::encode(address)
250            );
251            self.chunk_cache().remove(address);
252        }
253
254        let peers = self.close_group_peers(address).await?;
255        let addr_hex = hex::encode(address);
256
257        for (peer, addrs) in &peers {
258            match self.chunk_get_from_peer(address, peer, addrs).await {
259                Ok(Some(chunk)) => {
260                    self.chunk_cache().put(chunk.address, chunk.content.clone());
261                    return Ok(Some(chunk));
262                }
263                Ok(None) => {
264                    debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
265                }
266                Err(Error::Timeout(_) | Error::Network(_)) => {
267                    debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
268                }
269                Err(e) => return Err(e),
270            }
271        }
272
273        // None of the close group peers had the chunk
274        Ok(None)
275    }
276
277    /// Fetch a chunk from a specific peer.
278    async fn chunk_get_from_peer(
279        &self,
280        address: &XorName,
281        peer: &PeerId,
282        peer_addrs: &[MultiAddr],
283    ) -> Result<Option<DataChunk>> {
284        let node = self.network().node();
285        let request_id = self.next_request_id();
286        let request = ChunkGetRequest::new(*address);
287        let message = ChunkMessage {
288            request_id,
289            body: ChunkMessageBody::GetRequest(request),
290        };
291        let message_bytes = message
292            .encode()
293            .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
294
295        let timeout = Duration::from_secs(self.config().store_timeout_secs);
296        let addr_hex = hex::encode(address);
297        let timeout_secs = self.config().store_timeout_secs;
298
299        let start = Instant::now();
300        let result = send_and_await_chunk_response(
301            node,
302            peer,
303            message_bytes,
304            request_id,
305            timeout,
306            peer_addrs,
307            |body| match body {
308                ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
309                    address: addr,
310                    content,
311                }) => {
312                    if addr != *address {
313                        return Some(Err(Error::InvalidData(format!(
314                            "Mismatched chunk address: expected {addr_hex}, got {}",
315                            hex::encode(addr)
316                        ))));
317                    }
318
319                    let computed = compute_address(&content);
320                    if computed != addr {
321                        return Some(Err(Error::InvalidData(format!(
322                            "Invalid chunk content: expected hash {addr_hex}, got {}",
323                            hex::encode(computed)
324                        ))));
325                    }
326
327                    debug!(
328                        "Retrieved chunk {} ({} bytes) from peer {peer}",
329                        hex::encode(addr),
330                        content.len()
331                    );
332                    Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
333                }
334                ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
335                ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
336                    Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
337                )),
338                _ => None,
339            },
340            |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
341            || {
342                Error::Timeout(format!(
343                    "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
344                ))
345            },
346        )
347        .await;
348
349        let success = result.is_ok();
350        let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
351        record_peer_outcome(node, *peer, peer_addrs, success, rtt_ms).await;
352
353        result
354    }
355
356    /// Check if a chunk exists on the network.
357    ///
358    /// # Errors
359    ///
360    /// Returns an error if the network operation fails.
361    pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
362        self.chunk_get(address).await.map(|opt| opt.is_some())
363    }
364}
365
#[cfg(test)]
mod tests {
    use super::*;
    use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};

    /// Merkle store timeout (seconds) handed to the selector in every test below.
    const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
    /// Leading tag byte intended not to match any recognized proof type.
    const UNKNOWN_PROOF_TAG: u8 = 0xff;

    /// Run the timeout selector on a proof consisting of a single tag byte.
    fn timeout_for_tag(tag: u8) -> Duration {
        store_response_timeout_for_proof(&[tag], TEST_MERKLE_TIMEOUT_SECS)
    }

    #[test]
    fn single_node_proof_uses_store_response_timeout() {
        assert_eq!(timeout_for_tag(PROOF_TAG_SINGLE_NODE), STORE_RESPONSE_TIMEOUT);
    }

    #[test]
    fn unknown_proof_uses_store_response_timeout() {
        assert_eq!(timeout_for_tag(UNKNOWN_PROOF_TAG), STORE_RESPONSE_TIMEOUT);
    }

    #[test]
    fn merkle_proof_uses_configured_store_timeout() {
        let expected = Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS);
        assert_eq!(timeout_for_tag(PROOF_TAG_MERKLE), expected);
    }
}