Skip to main content

ant_core/data/client/
chunk.rs

1//! Chunk storage operations.
2//!
3//! Chunks are immutable, content-addressed data blocks where the address
4//! is the BLAKE3 hash of the content.
5
6use crate::data::client::batch::{finalize_batch_payment, PreparedChunk};
7use crate::data::client::peer_cache::record_peer_outcome;
8use crate::data::client::Client;
9use crate::data::error::{Error, Result};
10use ant_protocol::evm::{QuoteHash, TxHash};
11use ant_protocol::transport::{MultiAddr, PeerId};
12use ant_protocol::{
13    compute_address, detect_proof_type, send_and_await_chunk_response, ChunkGetRequest,
14    ChunkGetResponse, ChunkMessage, ChunkMessageBody, ChunkPutRequest, ChunkPutResponse, DataChunk,
15    ProofType, XorName, CLOSE_GROUP_MAJORITY,
16};
17use bytes::Bytes;
18use futures::stream::{FuturesUnordered, StreamExt};
19use std::collections::HashMap;
20use std::future::Future;
21use std::time::{Duration, Instant};
22use tracing::{debug, info, warn};
23
24/// Data type identifier for chunks (used in quote requests).
25const CHUNK_DATA_TYPE: u32 = 0;
26
27/// Store-response timeout for non-merkle chunk PUTs.
28const STORE_RESPONSE_TIMEOUT: Duration = Duration::from_secs(10);
29
30fn store_response_timeout_for_proof(proof: &[u8], merkle_timeout_secs: u64) -> Duration {
31    match detect_proof_type(proof) {
32        Some(ProofType::Merkle) => Duration::from_secs(merkle_timeout_secs),
33        _ => STORE_RESPONSE_TIMEOUT,
34    }
35}
36
37impl Client {
38    /// Store a chunk on the Autonomi network with payment.
39    ///
40    /// Checks if the chunk already exists before paying. If it does,
41    /// returns the address immediately without incurring on-chain costs.
42    /// Otherwise collects quotes, pays on-chain, then stores with proof
43    /// to `CLOSE_GROUP_MAJORITY` peers.
44    ///
45    /// # Errors
46    ///
47    /// Returns an error if payment or the network operation fails.
48    pub async fn chunk_put(&self, content: Bytes) -> Result<XorName> {
49        let address = compute_address(&content);
50        let data_size = u64::try_from(content.len())
51            .map_err(|e| Error::InvalidData(format!("content size too large: {e}")))?;
52
53        match self
54            .pay_for_storage(&address, data_size, CHUNK_DATA_TYPE)
55            .await
56        {
57            Ok((proof, peers)) => self.chunk_put_to_close_group(content, proof, &peers).await,
58            Err(Error::AlreadyStored) => {
59                debug!(
60                    "Chunk {} already stored on network, skipping payment",
61                    hex::encode(address)
62                );
63                Ok(address)
64            }
65            Err(e) => Err(e),
66        }
67    }
68
69    /// Store a chunk to `CLOSE_GROUP_MAJORITY` peers from the quoted set.
70    ///
71    /// Initially sends the PUT concurrently to the first
72    /// `CLOSE_GROUP_MAJORITY` peers. If any fail, falls back to the
73    /// remaining peers in the quoted set until majority is reached or
74    /// all peers are exhausted.
75    ///
76    /// # Errors
77    ///
78    /// Returns an error if fewer than `CLOSE_GROUP_MAJORITY` peers accept
79    /// the chunk.
80    pub(crate) async fn chunk_put_to_close_group(
81        &self,
82        content: Bytes,
83        proof: Vec<u8>,
84        peers: &[(PeerId, Vec<MultiAddr>)],
85    ) -> Result<XorName> {
86        let address = compute_address(&content);
87
88        let initial_count = peers.len().min(CLOSE_GROUP_MAJORITY);
89        let (initial_peers, fallback_peers) = peers.split_at(initial_count);
90
91        let mut put_futures = FuturesUnordered::new();
92        for (peer_id, addrs) in initial_peers {
93            put_futures.push(self.spawn_chunk_put(content.clone(), proof.clone(), peer_id, addrs));
94        }
95
96        let mut success_count = 0usize;
97        let mut failures: Vec<String> = Vec::new();
98        let mut fallback_iter = fallback_peers.iter();
99
100        while let Some((peer_id, result)) = put_futures.next().await {
101            match result {
102                Ok(_) => {
103                    success_count += 1;
104                    if success_count >= CLOSE_GROUP_MAJORITY {
105                        debug!(
106                            "Chunk {} stored on {success_count} peers (majority reached)",
107                            hex::encode(address)
108                        );
109                        return Ok(address);
110                    }
111                }
112                Err(e) => {
113                    warn!("Failed to store chunk on {peer_id}: {e}");
114                    failures.push(format!("{peer_id}: {e}"));
115
116                    if let Some((fb_peer, fb_addrs)) = fallback_iter.next() {
117                        debug!(
118                            "Falling back to peer {fb_peer} for chunk {}",
119                            hex::encode(address)
120                        );
121                        put_futures.push(self.spawn_chunk_put(
122                            content.clone(),
123                            proof.clone(),
124                            fb_peer,
125                            fb_addrs,
126                        ));
127                    }
128                }
129            }
130        }
131
132        Err(Error::InsufficientPeers(format!(
133            "Stored on {success_count} peers, need {CLOSE_GROUP_MAJORITY}. Failures: [{}]",
134            failures.join("; ")
135        )))
136    }
137
138    /// Spawn a chunk PUT future for a single peer.
139    fn spawn_chunk_put<'a>(
140        &'a self,
141        content: Bytes,
142        proof: Vec<u8>,
143        peer_id: &'a PeerId,
144        addrs: &'a [MultiAddr],
145    ) -> impl Future<Output = (PeerId, Result<XorName>)> + 'a {
146        let peer_id_owned = *peer_id;
147        async move {
148            let result = self
149                .chunk_put_with_proof(content, proof, &peer_id_owned, addrs)
150                .await;
151            (peer_id_owned, result)
152        }
153    }
154
155    /// Store a chunk on the Autonomi network with a pre-built payment proof.
156    ///
157    /// Sends to a single peer. Callers that need replication across the
158    /// close group should use `chunk_put_to_close_group` instead.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if the network operation fails.
163    pub async fn chunk_put_with_proof(
164        &self,
165        content: Bytes,
166        proof: Vec<u8>,
167        target_peer: &PeerId,
168        peer_addrs: &[MultiAddr],
169    ) -> Result<XorName> {
170        let address = compute_address(&content);
171        let node = self.network().node();
172        let timeout =
173            store_response_timeout_for_proof(&proof, self.config().merkle_store_timeout_secs);
174        let timeout_secs = timeout.as_secs();
175
176        let request_id = self.next_request_id();
177        // `content` is a refcounted `Bytes` shared with the sibling
178        // close-group sends; pass it through directly so each peer shares
179        // the same backing buffer instead of deep-copying the 4 MB payload.
180        let request = ChunkPutRequest::with_payment(address, content, proof);
181        let message = ChunkMessage {
182            request_id,
183            body: ChunkMessageBody::PutRequest(request),
184        };
185        let message_bytes = message
186            .encode()
187            .map_err(|e| Error::Protocol(format!("Failed to encode PUT request: {e}")))?;
188
189        let addr_hex = hex::encode(address);
190
191        let result = send_and_await_chunk_response(
192            node,
193            target_peer,
194            message_bytes,
195            request_id,
196            timeout,
197            peer_addrs,
198            |body| match body {
199                ChunkMessageBody::PutResponse(ChunkPutResponse::Success { address: addr }) => {
200                    debug!("Chunk stored at {}", hex::encode(addr));
201                    Some(Ok(addr))
202                }
203                ChunkMessageBody::PutResponse(ChunkPutResponse::AlreadyExists {
204                    address: addr,
205                }) => {
206                    debug!("Chunk already exists at {}", hex::encode(addr));
207                    Some(Ok(addr))
208                }
209                ChunkMessageBody::PutResponse(ChunkPutResponse::PaymentRequired { message }) => {
210                    Some(Err(Error::Payment(format!("Payment required: {message}"))))
211                }
212                ChunkMessageBody::PutResponse(ChunkPutResponse::Error(e)) => Some(Err(
213                    Error::Protocol(format!("Remote PUT error for {addr_hex}: {e}")),
214                )),
215                _ => None,
216            },
217            |e| Error::Network(format!("Failed to send PUT to peer: {e}")),
218            || {
219                Error::Timeout(format!(
220                    "Timeout waiting for store response after {timeout_secs}s"
221                ))
222            },
223        )
224        .await;
225
226        // No RTT recorded on the PUT path: the wall-clock is dominated by
227        // the ~4 MB payload upload, which reflects the uploader's uplink
228        // rather than the peer's responsiveness. Quote-path and GET-path
229        // RTTs still feed quality scoring.
230        record_peer_outcome(node, *target_peer, peer_addrs, result.is_ok(), None).await;
231
232        result
233    }
234
235    /// Retrieve a chunk from the Autonomi network.
236    ///
237    /// Queries all peers in the close group for the chunk address,
238    /// returning the first successful response. This handles the case
239    /// where the storing peer differs from the first peer returned by
240    /// DHT routing.
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if the network operation fails.
245    pub async fn chunk_get(&self, address: &XorName) -> Result<Option<DataChunk>> {
246        // Check cache first, with integrity verification.
247        if let Some(cached) = self.chunk_cache().get(address) {
248            let computed = compute_address(&cached);
249            if computed == *address {
250                debug!("Cache hit for chunk {}", hex::encode(address));
251                return Ok(Some(DataChunk::new(*address, cached)));
252            }
253            // Cache entry corrupted — evict and fall through to network fetch.
254            debug!(
255                "Cache corruption detected for {}: evicting",
256                hex::encode(address)
257            );
258            self.chunk_cache().remove(address);
259        }
260
261        let peers = self.close_group_peers(address).await?;
262        let addr_hex = hex::encode(address);
263
264        let queried = peers.len();
265        let mut not_found = 0usize;
266        let mut timeout = 0usize;
267        let mut network_err = 0usize;
268
269        for (peer, addrs) in &peers {
270            match self.chunk_get_from_peer(address, peer, addrs).await {
271                Ok(Some(chunk)) => {
272                    self.chunk_cache().put(chunk.address, chunk.content.clone());
273                    return Ok(Some(chunk));
274                }
275                Ok(None) => {
276                    not_found += 1;
277                    debug!("Chunk {addr_hex} not found on peer {peer}, trying next");
278                }
279                Err(Error::Timeout(_)) => {
280                    timeout += 1;
281                    debug!("Peer {peer} timed out for chunk {addr_hex}, trying next");
282                }
283                Err(Error::Network(_)) => {
284                    network_err += 1;
285                    debug!("Peer {peer} unreachable for chunk {addr_hex}, trying next");
286                }
287                Err(e) => return Err(e),
288            }
289        }
290
291        // None of the close group peers had the chunk. Emit a single summary
292        // so operators can distinguish data loss (all peers responded NotFound)
293        // from a reachability problem (most peers timed out / errored).
294        info!(
295            "chunk_get exhausted close group for {addr_hex}: \
296             queried={queried} not_found={not_found} timeout={timeout} network_err={network_err}"
297        );
298        Ok(None)
299    }
300
301    /// Fetch a chunk from a specific peer.
302    async fn chunk_get_from_peer(
303        &self,
304        address: &XorName,
305        peer: &PeerId,
306        peer_addrs: &[MultiAddr],
307    ) -> Result<Option<DataChunk>> {
308        let node = self.network().node();
309        let request_id = self.next_request_id();
310        let request = ChunkGetRequest::new(*address);
311        let message = ChunkMessage {
312            request_id,
313            body: ChunkMessageBody::GetRequest(request),
314        };
315        let message_bytes = message
316            .encode()
317            .map_err(|e| Error::Protocol(format!("Failed to encode GET request: {e}")))?;
318
319        let timeout = Duration::from_secs(self.config().chunk_get_timeout_secs);
320        let addr_hex = hex::encode(address);
321        let timeout_secs = self.config().chunk_get_timeout_secs;
322
323        let start = Instant::now();
324        let result = send_and_await_chunk_response(
325            node,
326            peer,
327            message_bytes,
328            request_id,
329            timeout,
330            peer_addrs,
331            |body| match body {
332                ChunkMessageBody::GetResponse(ChunkGetResponse::Success {
333                    address: addr,
334                    content,
335                }) => {
336                    if addr != *address {
337                        return Some(Err(Error::InvalidData(format!(
338                            "Mismatched chunk address: expected {addr_hex}, got {}",
339                            hex::encode(addr)
340                        ))));
341                    }
342
343                    let computed = compute_address(&content);
344                    if computed != addr {
345                        return Some(Err(Error::InvalidData(format!(
346                            "Invalid chunk content: expected hash {addr_hex}, got {}",
347                            hex::encode(computed)
348                        ))));
349                    }
350
351                    debug!(
352                        "Retrieved chunk {} ({} bytes) from peer {peer}",
353                        hex::encode(addr),
354                        content.len()
355                    );
356                    Some(Ok(Some(DataChunk::new(addr, Bytes::from(content)))))
357                }
358                ChunkMessageBody::GetResponse(ChunkGetResponse::NotFound { .. }) => Some(Ok(None)),
359                ChunkMessageBody::GetResponse(ChunkGetResponse::Error(e)) => Some(Err(
360                    Error::Protocol(format!("Remote GET error for {addr_hex}: {e}")),
361                )),
362                _ => None,
363            },
364            |e| Error::Network(format!("Failed to send GET to peer {peer}: {e}")),
365            || {
366                Error::Timeout(format!(
367                    "Timeout waiting for chunk {addr_hex} from {peer} after {timeout_secs}s"
368                ))
369            },
370        )
371        .await;
372
373        let success = result.is_ok();
374        let rtt_ms = success.then(|| start.elapsed().as_millis() as u64);
375        record_peer_outcome(node, *peer, peer_addrs, success, rtt_ms).await;
376
377        result
378    }
379
380    /// Check if a chunk exists on the network.
381    ///
382    /// # Errors
383    ///
384    /// Returns an error if the network operation fails.
385    pub async fn chunk_exists(&self, address: &XorName) -> Result<bool> {
386        self.chunk_get(address).await.map(|opt| opt.is_some())
387    }
388
389    /// Finalize a single-chunk publish after an external signer has paid.
390    ///
391    /// Single-chunk analogue of [`Client::finalize_upload`]. Takes a
392    /// [`PreparedChunk`] (from [`Client::prepare_chunk_payment`]) and a
393    /// `quote_hash -> tx_hash` map containing receipts for every non-zero
394    /// quote in the chunk's payment. Builds the `PaymentProof` and stores
395    /// the chunk on `CLOSE_GROUP_MAJORITY` peers, returning its address.
396    ///
397    /// Wave-batch payment shape only. Single-chunk publishes don't need
398    /// Merkle batching: one chunk's worth of quotes is well below the
399    /// wave-batch threshold.
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if the proof construction fails (e.g. missing
404    /// `tx_hash` for a non-zero quote) or if fewer than
405    /// `CLOSE_GROUP_MAJORITY` peers accept the chunk.
406    pub async fn finalize_chunk(
407        &self,
408        prepared: PreparedChunk,
409        tx_hash_map: &HashMap<QuoteHash, TxHash>,
410    ) -> Result<XorName> {
411        let mut paid = finalize_batch_payment(vec![prepared], tx_hash_map)?;
412        // finalize_batch_payment returns one PaidChunk per PreparedChunk
413        // input; we passed exactly one. If that invariant is ever violated
414        // it's an upstream bug — fail loudly rather than silently address-0.
415        let chunk = paid.pop().ok_or_else(|| {
416            Error::Payment(
417                "finalize_batch_payment returned no paid chunks for a single \
418                 prepared chunk — internal invariant violated"
419                    .into(),
420            )
421        })?;
422        self.chunk_put_to_close_group(chunk.content, chunk.proof_bytes, &chunk.quoted_peers)
423            .await
424    }
425}
426
#[cfg(test)]
mod tests {
    use super::*;
    use ant_protocol::{PROOF_TAG_MERKLE, PROOF_TAG_SINGLE_NODE};

    /// Merkle store timeout handed to the selector in these tests; the
    /// value itself is arbitrary.
    const TEST_MERKLE_TIMEOUT_SECS: u64 = 60;
    /// Stand-in for a proof tag the selector is expected not to recognise.
    const UNKNOWN_PROOF_TAG: u8 = 0xff;

    /// A single-node proof gets the fixed store-response timeout.
    #[test]
    fn single_node_proof_uses_store_response_timeout() {
        assert_eq!(
            store_response_timeout_for_proof(&[PROOF_TAG_SINGLE_NODE], TEST_MERKLE_TIMEOUT_SECS),
            STORE_RESPONSE_TIMEOUT
        );
    }

    /// An unrecognised proof tag also gets the fixed store-response timeout.
    #[test]
    fn unknown_proof_uses_store_response_timeout() {
        assert_eq!(
            store_response_timeout_for_proof(&[UNKNOWN_PROOF_TAG], TEST_MERKLE_TIMEOUT_SECS),
            STORE_RESPONSE_TIMEOUT
        );
    }

    /// A Merkle proof gets the caller-supplied timeout.
    #[test]
    fn merkle_proof_uses_configured_store_timeout() {
        assert_eq!(
            store_response_timeout_for_proof(&[PROOF_TAG_MERKLE], TEST_MERKLE_TIMEOUT_SECS),
            Duration::from_secs(TEST_MERKLE_TIMEOUT_SECS)
        );
    }

    /// Regression: the default `merkle_store_timeout_secs` has to cover the
    /// storer-side `CLOSENESS_LOOKUP_TIMEOUT` (240 s) plus some padding.
    /// Should either side change and break that relationship, the client
    /// would abandon chunks the storer is still verifying. The derivation
    /// is described in the `DEFAULT_MERKLE_STORE_TIMEOUT_SECS` doc comment.
    #[test]
    fn default_merkle_store_timeout_satisfies_storer_invariant() {
        use crate::data::client::ClientConfig;
        const STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS: u64 = 240;
        const MIN_PADDING_SECS: u64 = 30;

        let configured = ClientConfig::default().merkle_store_timeout_secs;
        let required = STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS + MIN_PADDING_SECS;

        assert!(
            configured >= required,
            "merkle_store_timeout_secs ({}) must be >= storer CLOSENESS_LOOKUP_TIMEOUT ({}) + padding ({})",
            configured,
            STORER_CLOSENESS_LOOKUP_TIMEOUT_SECS,
            MIN_PADDING_SECS,
        );
    }

    /// Regression: non-merkle PUTs are governed by the hardcoded
    /// `STORE_RESPONSE_TIMEOUT`, never by the per-config
    /// `merkle_store_timeout_secs`. A refactor that routed them through the
    /// merkle field would silently lengthen non-merkle waits, so the
    /// selector must return the constant for every non-merkle tag, whatever
    /// merkle timeout it is given.
    #[test]
    fn non_merkle_put_ignores_merkle_timeout_value() {
        let absurd_merkle_timeout = 9_999;
        let non_merkle_tags = [PROOF_TAG_SINGLE_NODE, UNKNOWN_PROOF_TAG];

        for tag in non_merkle_tags {
            assert_eq!(
                store_response_timeout_for_proof(&[tag], absurd_merkle_timeout),
                STORE_RESPONSE_TIMEOUT,
                "non-merkle proof tag {tag:#x} should ignore merkle timeout {absurd_merkle_timeout}",
            );
        }
    }
}
502}