Skip to main content

ant_core/data/client/
data.rs

//! In-memory data operations using self-encryption.
//!
//! Upload and download raw byte data. Content is encrypted via
//! convergent encryption and stored as content-addressed chunks.
//! Use this when you already have data in memory (e.g., `Bytes`).
//! For file-based streaming uploads that avoid loading the entire
//! file into memory, see the `file` module.
8
9use crate::data::client::adaptive::{observe_op, rebucketed_ordered};
10use crate::data::client::batch::{PaymentIntent, PreparedChunk};
11use crate::data::client::classify_error;
12use crate::data::client::file::{ExternalPaymentInfo, PreparedUpload, Visibility};
13use crate::data::client::merkle::{chunk_contents_for_upload_addresses, PaymentMode};
14use crate::data::client::Client;
15use crate::data::error::{Error, Result};
16use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
17use bytes::Bytes;
18use futures::stream::StreamExt;
19use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk};
20use tracing::{debug, info};
21
22/// Result of an in-memory data upload: the `DataMap` needed to retrieve the data.
23#[derive(Debug, Clone)]
24pub struct DataUploadResult {
25    /// The data map containing chunk metadata for reconstruction.
26    pub data_map: DataMap,
27    /// Number of chunks stored on the network.
28    pub chunks_stored: usize,
29    /// Which payment mode was actually used (not just requested).
30    pub payment_mode_used: PaymentMode,
31}
32
33impl Client {
34    /// Upload in-memory data to the network using self-encryption.
35    ///
36    /// The content is encrypted and split into chunks, each stored
37    /// as a content-addressed chunk on the network. Returns a `DataMap`
38    /// that can be used to retrieve and decrypt the data.
39    ///
40    /// # Errors
41    ///
42    /// Returns an error if encryption fails or any chunk cannot be stored.
43    pub async fn data_upload(&self, content: Bytes) -> Result<DataUploadResult> {
44        let content_len = content.len();
45        debug!("Encrypting data ({content_len} bytes)");
46
47        let (data_map, encrypted_chunks) = encrypt(content)
48            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
49
50        info!("Data encrypted into {} chunks", encrypted_chunks.len());
51
52        let chunk_contents: Vec<Bytes> = encrypted_chunks
53            .into_iter()
54            .map(|chunk| chunk.content)
55            .collect();
56
57        let (addresses, _storage_cost, _gas_cost) =
58            self.batch_upload_chunks(chunk_contents).await?;
59        let chunks_stored = addresses.len();
60
61        info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)");
62
63        Ok(DataUploadResult {
64            data_map,
65            chunks_stored,
66            payment_mode_used: PaymentMode::Single,
67        })
68    }
69
70    /// Upload in-memory data with a specific payment mode.
71    ///
72    /// When `mode` is `Auto` and the chunk count >= threshold, or when `mode`
73    /// is `Merkle`, this buffers all chunks and pays via a single merkle
74    /// batch transaction. Otherwise falls back to per-chunk payment.
75    ///
76    /// # Errors
77    ///
78    /// Returns an error if encryption fails or any chunk cannot be stored.
79    pub async fn data_upload_with_mode(
80        &self,
81        content: Bytes,
82        mode: PaymentMode,
83    ) -> Result<DataUploadResult> {
84        let content_len = content.len();
85        debug!("Encrypting data ({content_len} bytes) with mode {mode:?}");
86
87        let (data_map, encrypted_chunks) = encrypt(content)
88            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
89
90        let chunk_count = encrypted_chunks.len();
91        info!("Data encrypted into {chunk_count} chunks");
92
93        let chunk_contents: Vec<Bytes> = encrypted_chunks
94            .into_iter()
95            .map(|chunk| chunk.content)
96            .collect();
97
98        if self.should_use_merkle(chunk_count, mode) {
99            // Merkle batch payment path
100            info!("Using merkle batch payment for {chunk_count} chunks");
101
102            let chunk_entries: Vec<([u8; 32], u64)> = chunk_contents
103                .iter()
104                .map(|chunk| {
105                    let size = u64::try_from(chunk.len())
106                        .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
107                    Ok((compute_address(chunk), size))
108                })
109                .collect::<Result<Vec<_>>>()?;
110            let merkle_plan = match self
111                .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, None)
112                .await
113            {
114                Ok(plan) => plan,
115                Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
116                    info!("Merkle preflight needs more peers ({msg}), falling back to wave-batch");
117                    let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
118                    return Ok(DataUploadResult {
119                        data_map,
120                        chunks_stored: addresses.len(),
121                        payment_mode_used: PaymentMode::Single,
122                    });
123                }
124                Err(e) => return Err(e),
125            };
126
127            if merkle_plan.to_upload.is_empty() {
128                info!("All {chunk_count} chunks already stored; skipping merkle payment");
129                return Ok(DataUploadResult {
130                    data_map,
131                    chunks_stored: chunk_count,
132                    payment_mode_used: PaymentMode::Merkle,
133                });
134            }
135
136            let chunk_contents =
137                chunk_contents_for_upload_addresses(chunk_contents, &merkle_plan.to_upload)?;
138
139            let remaining_chunks = merkle_plan.to_upload.len();
140            if !self.should_use_merkle(remaining_chunks, mode) {
141                info!(
142                    "{remaining_chunks} chunks need upload after merkle preflight; \
143                     using single-node payment"
144                );
145                let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
146                return Ok(DataUploadResult {
147                    data_map,
148                    chunks_stored: merkle_plan.already_stored.len() + addresses.len(),
149                    payment_mode_used: PaymentMode::Single,
150                });
151            }
152
153            // Try merkle batch; in Auto mode, fall back to per-chunk on network issues
154            let batch_result = match self
155                .pay_for_merkle_batch(
156                    &merkle_plan.to_upload,
157                    DATA_TYPE_CHUNK,
158                    merkle_plan.to_upload_avg_size(),
159                )
160                .await
161            {
162                Ok(result) => result,
163                Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
164                    info!("Merkle needs more peers ({msg}), falling back to wave-batch");
165                    let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
166                    return Ok(DataUploadResult {
167                        data_map,
168                        chunks_stored: merkle_plan.already_stored.len() + addresses.len(),
169                        payment_mode_used: PaymentMode::Single,
170                    });
171                }
172                Err(e) => return Err(e),
173            };
174
175            let outcome = self
176                .merkle_upload_chunks(
177                    chunk_contents,
178                    merkle_plan.to_upload,
179                    &batch_result,
180                    None,
181                    merkle_plan.already_stored.len(),
182                    chunk_count,
183                )
184                .await?;
185            // Unlike `FileUploadResult`, `DataUploadResult` cannot express a
186            // partial store, and the returned `data_map` is unusable unless
187            // every chunk landed (download fails on any missing chunk). So a
188            // residual shortfall after retries is a hard failure here, not a
189            // success with a quietly broken data map.
190            if outcome.failed > 0 {
191                return Err(Error::InsufficientPeers(format!(
192                    "Data merkle upload incomplete: {} of {} chunk(s) short of quorum after retries",
193                    outcome.failed, chunk_count
194                )));
195            }
196
197            info!(
198                "Data uploaded via merkle: {} chunks stored ({content_len} bytes)",
199                outcome.stored
200            );
201            Ok(DataUploadResult {
202                data_map,
203                chunks_stored: outcome.stored,
204                payment_mode_used: PaymentMode::Merkle,
205            })
206        } else {
207            // Wave-based batch payment path (single EVM tx per wave).
208            let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
209
210            info!(
211                "Data uploaded: {} chunks stored ({content_len} bytes original)",
212                addresses.len()
213            );
214            Ok(DataUploadResult {
215                data_map,
216                chunks_stored: addresses.len(),
217                payment_mode_used: PaymentMode::Single,
218            })
219        }
220    }
221
222    /// Phase 1 of external-signer data upload: encrypt and collect quotes.
223    ///
224    /// Equivalent to [`Client::data_prepare_upload_with_visibility`] with
225    /// [`Visibility::Private`] — see that method for details.
226    pub async fn data_prepare_upload(&self, content: Bytes) -> Result<PreparedUpload> {
227        self.data_prepare_upload_with_visibility(content, Visibility::Private)
228            .await
229    }
230
231    /// Phase 1 of external-signer data upload with explicit [`Visibility`] control.
232    ///
233    /// Encrypts in-memory data via self-encryption, then collects storage
234    /// quotes for each chunk without making any on-chain payment. Returns
235    /// a [`PreparedUpload`] containing the data map and a [`PaymentIntent`]
236    /// with the payment details for external signing.
237    ///
238    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
239    /// is bundled into the payment batch as an additional chunk and its
240    /// address is recorded on the returned [`PreparedUpload`]. After
241    /// [`Client::finalize_upload`] succeeds, that address is surfaced via
242    /// [`crate::data::client::file::FileUploadResult::data_map_address`] so
243    /// the uploader can share a single address from which anyone can retrieve
244    /// the data.
245    ///
246    /// Wave-batch payment only — the in-memory data path does not currently
247    /// support merkle batching. Use [`Client::file_prepare_upload_with_visibility`]
248    /// for merkle-eligible public uploads.
249    ///
250    /// After the caller signs and submits the payment transaction, call
251    /// [`Client::finalize_upload`] with the tx hashes to complete storage.
252    ///
253    /// # Errors
254    ///
255    /// Returns an error if encryption fails, DataMap serialization fails
256    /// (public only), or quote collection fails.
257    pub async fn data_prepare_upload_with_visibility(
258        &self,
259        content: Bytes,
260        visibility: Visibility,
261    ) -> Result<PreparedUpload> {
262        let content_len = content.len();
263        debug!("Preparing data upload for external signing (visibility={visibility:?}, {content_len} bytes)");
264
265        let (data_map, encrypted_chunks) = encrypt(content)
266            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
267
268        let mut chunk_contents: Vec<Bytes> = encrypted_chunks
269            .into_iter()
270            .map(|chunk| chunk.content)
271            .collect();
272
273        info!("Data encrypted into {} chunks", chunk_contents.len());
274
275        // For public uploads, bundle the serialized DataMap as an extra chunk
276        // in the same payment batch. This lets the external signer pay for
277        // the data chunks and the DataMap chunk in one flow, and lets the
278        // finalize step return the DataMap's chunk address as the shareable
279        // retrieval address.
280        let data_map_address = match visibility {
281            Visibility::Private => None,
282            Visibility::Public => {
283                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
284                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
285                })?;
286                let bytes = Bytes::from(serialized);
287                let address = compute_address(&bytes);
288                info!(
289                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
290                    bytes.len(),
291                    hex::encode(address)
292                );
293                chunk_contents.push(bytes);
294                Some(address)
295            }
296        };
297
298        let chunk_count = chunk_contents.len();
299        let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_contents
300            .into_iter()
301            .map(|content| {
302                let address = compute_address(&content);
303                (content, address)
304            })
305            .collect();
306
307        let quote_limiter = self.controller().quote.clone();
308        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
309        let results: Vec<([u8; 32], Result<Option<PreparedChunk>>)> =
310            futures::stream::iter(chunks_with_addr)
311                .map(|(content, address)| {
312                    let limiter = quote_limiter.clone();
313                    async move {
314                        let result = observe_op(
315                            &limiter,
316                            || async move { self.prepare_chunk_payment(content).await },
317                            classify_error,
318                        )
319                        .await;
320                        (address, result)
321                    }
322                })
323                .buffer_unordered(quote_concurrency)
324                .collect()
325                .await;
326
327        let mut prepared_chunks = Vec::with_capacity(results.len());
328        let mut already_stored_addresses = Vec::new();
329        for (address, result) in results {
330            match result? {
331                Some(prepared) => prepared_chunks.push(prepared),
332                None => already_stored_addresses.push(address),
333            }
334        }
335
336        if let Some(addr) = data_map_address {
337            if already_stored_addresses.contains(&addr) {
338                info!(
339                    "Public upload: DataMap chunk {} was already stored \
340                     on the network — address is retrievable without a \
341                     new payment",
342                    hex::encode(addr)
343                );
344            }
345        }
346
347        let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
348
349        info!(
350            "Data prepared for external signing: {} chunks, {} already stored, total {} atto ({content_len} bytes)",
351            prepared_chunks.len(),
352            already_stored_addresses.len(),
353            payment_intent.total_amount,
354        );
355
356        Ok(PreparedUpload {
357            data_map,
358            payment_info: ExternalPaymentInfo::WaveBatch {
359                prepared_chunks,
360                payment_intent,
361            },
362            data_map_address,
363            already_stored_addresses,
364            total_chunks: chunk_count,
365        })
366    }
367
368    /// Store a `DataMap` on the network as a public chunk.
369    ///
370    /// The serialized `DataMap` is stored as a regular content-addressed chunk.
371    /// Anyone who knows the returned address can retrieve and use the `DataMap`
372    /// to download the original data.
373    ///
374    /// # Errors
375    ///
376    /// Returns an error if serialization or the chunk store fails.
377    pub async fn data_map_store(&self, data_map: &DataMap) -> Result<[u8; 32]> {
378        let serialized = rmp_serde::to_vec(data_map)
379            .map_err(|e| Error::Serialization(format!("Failed to serialize DataMap: {e}")))?;
380
381        info!(
382            "Storing DataMap as public chunk ({} bytes serialized)",
383            serialized.len()
384        );
385
386        self.chunk_put(Bytes::from(serialized)).await
387    }
388
389    /// Fetch a `DataMap` from the network by its chunk address.
390    ///
391    /// Retrieves the chunk at `address` and deserializes it as a `DataMap`.
392    ///
393    /// # Errors
394    ///
395    /// Returns an error if the chunk is not found or deserialization fails.
396    pub async fn data_map_fetch(&self, address: &[u8; 32]) -> Result<DataMap> {
397        let chunk = self.chunk_get(address).await?.ok_or_else(|| {
398            Error::InvalidData(format!(
399                "DataMap chunk not found at {}",
400                hex::encode(address)
401            ))
402        })?;
403
404        rmp_serde::from_slice(&chunk.content)
405            .map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}")))
406    }
407
408    /// Download and decrypt data from the network using its `DataMap`.
409    ///
410    /// Retrieves all chunks referenced by the data map, then decrypts
411    /// and reassembles the original content. Fetches chunks concurrently;
412    /// the fan-out is sized by the adaptive controller's `fetch` channel
413    /// and ramps up under healthy conditions.
414    ///
415    /// # Errors
416    ///
417    /// Returns an error if any chunk cannot be retrieved or decryption fails.
418    pub async fn data_download(&self, data_map: &DataMap) -> Result<Bytes> {
419        let chunk_infos = data_map.infos();
420        debug!("Downloading data ({} chunks)", chunk_infos.len());
421
422        // Extract owned addresses to avoid HRTB lifetime issue with
423        // stream::iter over references combined with async closures.
424        let addresses: Vec<[u8; 32]> = chunk_infos.iter().map(|info| info.dst_hash.0).collect();
425
426        // Rolling rebucketing: re-reads the controller's fetch cap as
427        // each slot frees, so a long download (e.g. 10 GB = ~2500
428        // chunks) sees adaptive growth/decay mid-flight without batch
429        // fences. Output is index-sorted so self_encryption decrypt
430        // sees DataMap-ordered chunks.
431        let fetch_limiter = self.controller().fetch.clone();
432        let encrypted_chunks: Vec<EncryptedChunk> = rebucketed_ordered(
433            &fetch_limiter,
434            addresses.into_iter().enumerate(),
435            |(idx, address)| {
436                async move {
437                    // chunk_get_observed feeds the adaptive fetch
438                    // limiter once per call via chunk_get_outcome
439                    // (Ok(None) -> Timeout is the load-shedding
440                    // signal for sustained close-group exhaustion).
441                    let chunk = self.chunk_get_observed(&address).await?.ok_or_else(|| {
442                        Error::InvalidData(format!(
443                            "Missing chunk {} required for data reconstruction",
444                            hex::encode(address)
445                        ))
446                    })?;
447                    Ok::<_, Error>((
448                        idx,
449                        EncryptedChunk {
450                            content: chunk.content,
451                        },
452                    ))
453                }
454            },
455        )
456        .await?;
457
458        debug!(
459            "All {} chunks retrieved, decrypting",
460            encrypted_chunks.len()
461        );
462
463        let content = decrypt(data_map, &encrypted_chunks)
464            .map_err(|e| Error::Encryption(format!("Failed to decrypt data: {e}")))?;
465
466        info!("Data downloaded and decrypted ({} bytes)", content.len());
467
468        Ok(content)
469    }
470}
471
/// Compile-time assertions that Client method futures are Send.
///
/// These methods are called from axum handlers and tokio::spawn contexts
/// that require Send + 'static. The async closures inside stream
/// combinators must not capture references with concrete lifetimes
/// (HRTB issue). If any of these checks fail, the stream closures
/// need restructuring to use owned values instead of references.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Type-level probe: compiles only if `T: Send`. Never called at runtime.
    fn _assert_send<T: Send>(_: &T) {}

    // A `DataMap` cannot be built here, so `todo!()` stands in for it. That
    // makes the following statements unreachable, so the unreachable /
    // diverging / unused-variable lints are silenced for this probe only.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _data_download_is_send(client: &Client) {
        let dm: DataMap = todo!();
        let fut = client.data_download(&dm);
        _assert_send(&fut);
    }

    // The probes below need no placeholder values. They are never called,
    // so only `dead_code` has to be allowed. Keeping the allow list narrow
    // means real unreachable code introduced here later still warns.
    #[allow(dead_code)]
    async fn _data_upload_is_send(client: &Client) {
        let fut = client.data_upload(Bytes::new());
        _assert_send(&fut);
    }

    #[allow(dead_code)]
    async fn _data_upload_with_mode_is_send(client: &Client) {
        let fut = client.data_upload_with_mode(Bytes::new(), PaymentMode::Auto);
        _assert_send(&fut);
    }

    #[allow(dead_code)]
    async fn _data_prepare_upload_is_send(client: &Client) {
        let fut = client.data_prepare_upload(Bytes::new());
        _assert_send(&fut);
    }

    #[allow(dead_code)]
    async fn _data_prepare_upload_with_visibility_is_send(client: &Client) {
        let fut = client.data_prepare_upload_with_visibility(Bytes::new(), Visibility::Public);
        _assert_send(&fut);
    }
}