Skip to main content

ant_core/data/client/
data.rs

1//! In-memory data operations using self-encryption.
2//!
3//! Upload and download raw byte data. Content is encrypted via
4//! convergent encryption and stored as content-addressed chunks.
5//! Use this when you already have data in memory (e.g., `Bytes`).
6//! For file-based streaming uploads that avoid loading the entire
7//! file into memory, see the `file` module.
8
9use crate::data::client::adaptive::{observe_op, rebucketed_ordered};
10use crate::data::client::batch::{PaymentIntent, PreparedChunk};
11use crate::data::client::classify_error;
12use crate::data::client::file::{ExternalPaymentInfo, PreparedUpload, Visibility};
13use crate::data::client::merkle::{chunk_contents_for_upload_addresses, PaymentMode};
14use crate::data::client::Client;
15use crate::data::error::{Error, Result};
16use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
17use bytes::Bytes;
18use futures::stream::StreamExt;
19use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk};
20use std::num::NonZeroUsize;
21use tracing::{debug, info};
22
23/// Result of an in-memory data upload: the `DataMap` needed to retrieve the data.
24#[derive(Debug, Clone)]
25pub struct DataUploadResult {
26    /// The data map containing chunk metadata for reconstruction.
27    pub data_map: DataMap,
28    /// Number of chunks stored on the network.
29    pub chunks_stored: usize,
30    /// Which payment mode was actually used (not just requested).
31    pub payment_mode_used: PaymentMode,
32}
33
34impl Client {
35    /// Upload in-memory data to the network using self-encryption.
36    ///
37    /// The content is encrypted and split into chunks, each stored
38    /// as a content-addressed chunk on the network. Returns a `DataMap`
39    /// that can be used to retrieve and decrypt the data.
40    ///
41    /// # Errors
42    ///
43    /// Returns an error if encryption fails or any chunk cannot be stored.
44    pub async fn data_upload(&self, content: Bytes) -> Result<DataUploadResult> {
45        let content_len = content.len();
46        debug!("Encrypting data ({content_len} bytes)");
47
48        let (data_map, encrypted_chunks) = encrypt(content)
49            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
50
51        info!("Data encrypted into {} chunks", encrypted_chunks.len());
52
53        let chunk_contents: Vec<Bytes> = encrypted_chunks
54            .into_iter()
55            .map(|chunk| chunk.content)
56            .collect();
57
58        let (addresses, _storage_cost, _gas_cost) =
59            self.batch_upload_chunks(chunk_contents).await?;
60        let chunks_stored = addresses.len();
61
62        info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)");
63
64        Ok(DataUploadResult {
65            data_map,
66            chunks_stored,
67            payment_mode_used: PaymentMode::Single,
68        })
69    }
70
71    /// Upload in-memory data with a specific payment mode.
72    ///
73    /// When `mode` is `Auto` and the chunk count >= threshold, or when `mode`
74    /// is `Merkle`, this buffers all chunks and pays via a single merkle
75    /// batch transaction. Otherwise falls back to per-chunk payment.
76    ///
77    /// # Errors
78    ///
79    /// Returns an error if encryption fails or any chunk cannot be stored.
80    pub async fn data_upload_with_mode(
81        &self,
82        content: Bytes,
83        mode: PaymentMode,
84    ) -> Result<DataUploadResult> {
85        let content_len = content.len();
86        debug!("Encrypting data ({content_len} bytes) with mode {mode:?}");
87
88        let (data_map, encrypted_chunks) = encrypt(content)
89            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
90
91        let chunk_count = encrypted_chunks.len();
92        info!("Data encrypted into {chunk_count} chunks");
93
94        let chunk_contents: Vec<Bytes> = encrypted_chunks
95            .into_iter()
96            .map(|chunk| chunk.content)
97            .collect();
98
99        if self.should_use_merkle(chunk_count, mode) {
100            // Merkle batch payment path
101            info!("Using merkle batch payment for {chunk_count} chunks");
102
103            let chunk_entries: Vec<([u8; 32], u64)> = chunk_contents
104                .iter()
105                .map(|chunk| {
106                    let size = u64::try_from(chunk.len())
107                        .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
108                    Ok((compute_address(chunk), size))
109                })
110                .collect::<Result<Vec<_>>>()?;
111            let merkle_plan = match self
112                .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, None)
113                .await
114            {
115                Ok(plan) => plan,
116                Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
117                    info!("Merkle preflight needs more peers ({msg}), falling back to wave-batch");
118                    let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
119                    return Ok(DataUploadResult {
120                        data_map,
121                        chunks_stored: addresses.len(),
122                        payment_mode_used: PaymentMode::Single,
123                    });
124                }
125                Err(e) => return Err(e),
126            };
127
128            if merkle_plan.to_upload.is_empty() {
129                info!("All {chunk_count} chunks already stored; skipping merkle payment");
130                return Ok(DataUploadResult {
131                    data_map,
132                    chunks_stored: chunk_count,
133                    payment_mode_used: PaymentMode::Merkle,
134                });
135            }
136
137            let chunk_contents =
138                chunk_contents_for_upload_addresses(chunk_contents, &merkle_plan.to_upload)?;
139
140            let remaining_chunks = merkle_plan.to_upload.len();
141            if !self.should_use_merkle(remaining_chunks, mode) {
142                info!(
143                    "{remaining_chunks} chunks need upload after merkle preflight; \
144                     using single-node payment"
145                );
146                let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
147                return Ok(DataUploadResult {
148                    data_map,
149                    chunks_stored: merkle_plan.already_stored.len() + addresses.len(),
150                    payment_mode_used: PaymentMode::Single,
151                });
152            }
153
154            // Try merkle batch; in Auto mode, fall back to per-chunk on network issues
155            let batch_result = match self
156                .pay_for_merkle_batch(
157                    &merkle_plan.to_upload,
158                    DATA_TYPE_CHUNK,
159                    merkle_plan.to_upload_avg_size(),
160                )
161                .await
162            {
163                Ok(result) => result,
164                Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
165                    info!("Merkle needs more peers ({msg}), falling back to wave-batch");
166                    let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
167                    return Ok(DataUploadResult {
168                        data_map,
169                        chunks_stored: merkle_plan.already_stored.len() + addresses.len(),
170                        payment_mode_used: PaymentMode::Single,
171                    });
172                }
173                Err(e) => return Err(e),
174            };
175
176            let outcome = self
177                .merkle_upload_chunks(
178                    chunk_contents,
179                    merkle_plan.to_upload,
180                    &batch_result,
181                    None,
182                    merkle_plan.already_stored.len(),
183                    chunk_count,
184                )
185                .await?;
186            // Unlike `FileUploadResult`, `DataUploadResult` cannot express a
187            // partial store, and the returned `data_map` is unusable unless
188            // every chunk landed (download fails on any missing chunk). So a
189            // residual shortfall after retries is a hard failure here, not a
190            // success with a quietly broken data map.
191            if outcome.failed > 0 {
192                return Err(Error::InsufficientPeers(format!(
193                    "Data merkle upload incomplete: {} of {} chunk(s) short of quorum after retries",
194                    outcome.failed, chunk_count
195                )));
196            }
197
198            info!(
199                "Data uploaded via merkle: {} chunks stored ({content_len} bytes)",
200                outcome.stored
201            );
202            Ok(DataUploadResult {
203                data_map,
204                chunks_stored: outcome.stored,
205                payment_mode_used: PaymentMode::Merkle,
206            })
207        } else {
208            // Wave-based batch payment path (single EVM tx per wave).
209            let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
210
211            info!(
212                "Data uploaded: {} chunks stored ({content_len} bytes original)",
213                addresses.len()
214            );
215            Ok(DataUploadResult {
216                data_map,
217                chunks_stored: addresses.len(),
218                payment_mode_used: PaymentMode::Single,
219            })
220        }
221    }
222
223    /// Phase 1 of external-signer data upload: encrypt and collect quotes.
224    ///
225    /// Equivalent to [`Client::data_prepare_upload_with_visibility`] with
226    /// [`Visibility::Private`] — see that method for details.
227    pub async fn data_prepare_upload(&self, content: Bytes) -> Result<PreparedUpload> {
228        self.data_prepare_upload_with_visibility(content, Visibility::Private)
229            .await
230    }
231
232    /// Phase 1 of external-signer data upload with explicit [`Visibility`] control.
233    ///
234    /// Encrypts in-memory data via self-encryption, then collects storage
235    /// quotes for each chunk without making any on-chain payment. Returns
236    /// a [`PreparedUpload`] containing the data map and a [`PaymentIntent`]
237    /// with the payment details for external signing.
238    ///
239    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
240    /// is bundled into the payment batch as an additional chunk and its
241    /// address is recorded on the returned [`PreparedUpload`]. After
242    /// [`Client::finalize_upload`] succeeds, that address is surfaced via
243    /// [`crate::data::client::file::FileUploadResult::data_map_address`] so
244    /// the uploader can share a single address from which anyone can retrieve
245    /// the data.
246    ///
247    /// Wave-batch payment only — the in-memory data path does not currently
248    /// support merkle batching. Use [`Client::file_prepare_upload_with_visibility`]
249    /// for merkle-eligible public uploads.
250    ///
251    /// After the caller signs and submits the payment transaction, call
252    /// [`Client::finalize_upload`] with the tx hashes to complete storage.
253    ///
254    /// # Errors
255    ///
256    /// Returns an error if encryption fails, DataMap serialization fails
257    /// (public only), or quote collection fails.
258    pub async fn data_prepare_upload_with_visibility(
259        &self,
260        content: Bytes,
261        visibility: Visibility,
262    ) -> Result<PreparedUpload> {
263        let content_len = content.len();
264        debug!("Preparing data upload for external signing (visibility={visibility:?}, {content_len} bytes)");
265
266        let (data_map, encrypted_chunks) = encrypt(content)
267            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
268
269        let mut chunk_contents: Vec<Bytes> = encrypted_chunks
270            .into_iter()
271            .map(|chunk| chunk.content)
272            .collect();
273
274        info!("Data encrypted into {} chunks", chunk_contents.len());
275
276        // For public uploads, bundle the serialized DataMap as an extra chunk
277        // in the same payment batch. This lets the external signer pay for
278        // the data chunks and the DataMap chunk in one flow, and lets the
279        // finalize step return the DataMap's chunk address as the shareable
280        // retrieval address.
281        let data_map_address = match visibility {
282            Visibility::Private => None,
283            Visibility::Public => {
284                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
285                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
286                })?;
287                let bytes = Bytes::from(serialized);
288                let address = compute_address(&bytes);
289                info!(
290                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
291                    bytes.len(),
292                    hex::encode(address)
293                );
294                chunk_contents.push(bytes);
295                Some(address)
296            }
297        };
298
299        let chunk_count = chunk_contents.len();
300        let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_contents
301            .into_iter()
302            .map(|content| {
303                let address = compute_address(&content);
304                (content, address)
305            })
306            .collect();
307
308        let quote_limiter = self.controller().quote.clone();
309        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
310        let results: Vec<([u8; 32], Result<Option<PreparedChunk>>)> =
311            futures::stream::iter(chunks_with_addr)
312                .map(|(content, address)| {
313                    let limiter = quote_limiter.clone();
314                    async move {
315                        let result = observe_op(
316                            &limiter,
317                            || async move { self.prepare_chunk_payment(content).await },
318                            classify_error,
319                        )
320                        .await;
321                        (address, result)
322                    }
323                })
324                .buffer_unordered(quote_concurrency)
325                .collect()
326                .await;
327
328        let mut prepared_chunks = Vec::with_capacity(results.len());
329        let mut already_stored_addresses = Vec::new();
330        for (address, result) in results {
331            match result? {
332                Some(prepared) => prepared_chunks.push(prepared),
333                None => already_stored_addresses.push(address),
334            }
335        }
336
337        if let Some(addr) = data_map_address {
338            if already_stored_addresses.contains(&addr) {
339                info!(
340                    "Public upload: DataMap chunk {} was already stored \
341                     on the network — address is retrievable without a \
342                     new payment",
343                    hex::encode(addr)
344                );
345            }
346        }
347
348        let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
349
350        info!(
351            "Data prepared for external signing: {} chunks, {} already stored, total {} atto ({content_len} bytes)",
352            prepared_chunks.len(),
353            already_stored_addresses.len(),
354            payment_intent.total_amount,
355        );
356
357        Ok(PreparedUpload {
358            data_map,
359            payment_info: ExternalPaymentInfo::WaveBatch {
360                prepared_chunks,
361                payment_intent,
362            },
363            data_map_address,
364            already_stored_addresses,
365            total_chunks: chunk_count,
366        })
367    }
368
369    /// Store a `DataMap` on the network as a public chunk.
370    ///
371    /// The serialized `DataMap` is stored as a regular content-addressed chunk.
372    /// Anyone who knows the returned address can retrieve and use the `DataMap`
373    /// to download the original data.
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if serialization or the chunk store fails.
378    pub async fn data_map_store(&self, data_map: &DataMap) -> Result<[u8; 32]> {
379        let serialized = rmp_serde::to_vec(data_map)
380            .map_err(|e| Error::Serialization(format!("Failed to serialize DataMap: {e}")))?;
381
382        info!(
383            "Storing DataMap as public chunk ({} bytes serialized)",
384            serialized.len()
385        );
386
387        self.chunk_put(Bytes::from(serialized)).await
388    }
389
390    /// Fetch a `DataMap` from the network by its chunk address.
391    ///
392    /// Retrieves the chunk at `address` and deserializes it as a `DataMap`.
393    ///
394    /// # Errors
395    ///
396    /// Returns an error if the chunk is not found or deserialization fails.
397    pub async fn data_map_fetch(&self, address: &[u8; 32]) -> Result<DataMap> {
398        let chunk = self.chunk_get(address).await?.ok_or_else(|| {
399            Error::InvalidData(format!(
400                "DataMap chunk not found at {}",
401                hex::encode(address)
402            ))
403        })?;
404
405        decode_data_map_chunk(&chunk.content)
406    }
407
408    /// Fetch a `DataMap` from the network by trying the requested number
409    /// of closest peers for the DataMap chunk.
410    ///
411    /// # Errors
412    ///
413    /// Returns an error if the chunk is not found or deserialization fails.
414    pub async fn data_map_fetch_from_closest_peers(
415        &self,
416        address: &[u8; 32],
417        peer_count: NonZeroUsize,
418    ) -> Result<DataMap> {
419        let chunk = self
420            .chunk_get_from_closest_peers(address, peer_count.get())
421            .await?
422            .ok_or_else(|| {
423                Error::InvalidData(format!(
424                    "DataMap chunk not found at {}",
425                    hex::encode(address)
426                ))
427            })?;
428
429        decode_data_map_chunk(&chunk.content)
430    }
431
432    /// Download and decrypt data from the network using its `DataMap`.
433    ///
434    /// Retrieves all chunks referenced by the data map, then decrypts
435    /// and reassembles the original content. Fetches chunks concurrently;
436    /// the fan-out is sized by the adaptive controller's `fetch` channel
437    /// and ramps up under healthy conditions.
438    ///
439    /// # Errors
440    ///
441    /// Returns an error if any chunk cannot be retrieved or decryption fails.
442    pub async fn data_download(&self, data_map: &DataMap) -> Result<Bytes> {
443        let chunk_infos = data_map.infos();
444        debug!("Downloading data ({} chunks)", chunk_infos.len());
445
446        // Extract owned addresses to avoid HRTB lifetime issue with
447        // stream::iter over references combined with async closures.
448        let addresses: Vec<[u8; 32]> = chunk_infos.iter().map(|info| info.dst_hash.0).collect();
449
450        // Rolling rebucketing: re-reads the controller's fetch cap as
451        // each slot frees, so a long download (e.g. 10 GB = ~2500
452        // chunks) sees adaptive growth/decay mid-flight without batch
453        // fences. Output is index-sorted so self_encryption decrypt
454        // sees DataMap-ordered chunks.
455        let fetch_limiter = self.controller().fetch.clone();
456        let encrypted_chunks: Vec<EncryptedChunk> = rebucketed_ordered(
457            &fetch_limiter,
458            addresses.into_iter().enumerate(),
459            |(idx, address)| {
460                async move {
461                    // chunk_get_observed feeds the adaptive fetch
462                    // limiter once per call via chunk_get_outcome
463                    // (Ok(None) -> Timeout is the load-shedding
464                    // signal for sustained close-group exhaustion).
465                    let chunk = self.chunk_get_observed(&address).await?.ok_or_else(|| {
466                        Error::InvalidData(format!(
467                            "Missing chunk {} required for data reconstruction",
468                            hex::encode(address)
469                        ))
470                    })?;
471                    Ok::<_, Error>((
472                        idx,
473                        EncryptedChunk {
474                            content: chunk.content,
475                        },
476                    ))
477                }
478            },
479        )
480        .await?;
481
482        debug!(
483            "All {} chunks retrieved, decrypting",
484            encrypted_chunks.len()
485        );
486
487        let content = decrypt(data_map, &encrypted_chunks)
488            .map_err(|e| Error::Encryption(format!("Failed to decrypt data: {e}")))?;
489
490        info!("Data downloaded and decrypted ({} bytes)", content.len());
491
492        Ok(content)
493    }
494}
495
496fn decode_data_map_chunk(content: &[u8]) -> Result<DataMap> {
497    rmp_serde::from_slice(content)
498        .map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}")))
499}
500
/// Compile-time checks that the futures returned by `Client`'s data
/// methods are `Send`.
///
/// Callers drive these methods from axum handlers and `tokio::spawn`,
/// both of which require `Send + 'static` futures. Async closures inside
/// stream combinators must not capture references with concrete lifetimes
/// (the HRTB issue). If one of these functions stops compiling, rewrite the
/// offending closure to capture owned values instead. The functions are
/// never called; they only need to type-check.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Type-checks only when `F` is `Send`; the argument is never used.
    fn _require_send<F: Send>(_future: &F) {}

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _data_download_is_send(client: &Client) {
        // `todo!()` stands in for a real DataMap; this body never runs.
        let data_map: DataMap = todo!();
        _require_send(&client.data_download(&data_map));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_is_send(client: &Client) {
        _require_send(&client.data_upload(Bytes::new()));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_with_mode_is_send(client: &Client) {
        _require_send(&client.data_upload_with_mode(Bytes::new(), PaymentMode::Auto));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_prepare_upload_is_send(client: &Client) {
        _require_send(&client.data_prepare_upload(Bytes::new()));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_prepare_upload_with_visibility_is_send(client: &Client) {
        _require_send(&client.data_prepare_upload_with_visibility(Bytes::new(), Visibility::Public));
    }
}