// ant_core/data/client/data.rs
//! In-memory data operations using self-encryption.
//!
//! Upload and download raw byte data. Content is encrypted via
//! convergent encryption and stored as content-addressed chunks.
//! Use this when you already have data in memory (e.g., `Bytes`).
//! For file-based streaming uploads that avoid loading the entire
//! file into memory, see the `file` module.

9use crate::data::client::adaptive::{observe_op, rebucketed_ordered};
10use crate::data::client::batch::{PaymentIntent, PreparedChunk};
11use crate::data::client::classify_error;
12use crate::data::client::file::{ExternalPaymentInfo, PreparedUpload, Visibility};
13use crate::data::client::merkle::PaymentMode;
14use crate::data::client::Client;
15use crate::data::error::{Error, Result};
16use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
17use bytes::Bytes;
18use futures::stream::StreamExt;
19use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk};
20use tracing::{debug, info};
21
/// Result of an in-memory data upload: the `DataMap` needed to retrieve the data.
#[derive(Debug, Clone)]
pub struct DataUploadResult {
    /// The data map containing chunk metadata for reconstruction.
    pub data_map: DataMap,
    /// Number of chunks stored on the network.
    pub chunks_stored: usize,
    /// Which payment mode was actually used (not just requested).
    /// An `Auto` request resolves to `Single` or `Merkle` depending on
    /// chunk count, and a merkle attempt may fall back to `Single`.
    pub payment_mode_used: PaymentMode,
}
32
33impl Client {
34    /// Upload in-memory data to the network using self-encryption.
35    ///
36    /// The content is encrypted and split into chunks, each stored
37    /// as a content-addressed chunk on the network. Returns a `DataMap`
38    /// that can be used to retrieve and decrypt the data.
39    ///
40    /// # Errors
41    ///
42    /// Returns an error if encryption fails or any chunk cannot be stored.
43    pub async fn data_upload(&self, content: Bytes) -> Result<DataUploadResult> {
44        let content_len = content.len();
45        debug!("Encrypting data ({content_len} bytes)");
46
47        let (data_map, encrypted_chunks) = encrypt(content)
48            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
49
50        info!("Data encrypted into {} chunks", encrypted_chunks.len());
51
52        let chunk_contents: Vec<Bytes> = encrypted_chunks
53            .into_iter()
54            .map(|chunk| chunk.content)
55            .collect();
56
57        let (addresses, _storage_cost, _gas_cost) =
58            self.batch_upload_chunks(chunk_contents).await?;
59        let chunks_stored = addresses.len();
60
61        info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)");
62
63        Ok(DataUploadResult {
64            data_map,
65            chunks_stored,
66            payment_mode_used: PaymentMode::Single,
67        })
68    }
69
    /// Upload in-memory data with a specific payment mode.
    ///
    /// When `mode` is `Auto` and the chunk count >= threshold, or when `mode`
    /// is `Merkle`, this buffers all chunks and pays via a single merkle
    /// batch transaction. Otherwise falls back to per-chunk payment.
    ///
    /// The returned [`DataUploadResult::payment_mode_used`] reports the path
    /// actually taken (`Merkle` or `Single`), which can differ from the
    /// requested `mode` when `Auto` resolves or the fallback below fires.
    ///
    /// # Errors
    ///
    /// Returns an error if encryption fails or any chunk cannot be stored.
    pub async fn data_upload_with_mode(
        &self,
        content: Bytes,
        mode: PaymentMode,
    ) -> Result<DataUploadResult> {
        let content_len = content.len();
        debug!("Encrypting data ({content_len} bytes) with mode {mode:?}");

        // Convergent self-encryption: content -> DataMap + encrypted chunks.
        let (data_map, encrypted_chunks) = encrypt(content)
            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;

        let chunk_count = encrypted_chunks.len();
        info!("Data encrypted into {chunk_count} chunks");

        // Keep only the raw chunk bytes; addresses are derived from content.
        let chunk_contents: Vec<Bytes> = encrypted_chunks
            .into_iter()
            .map(|chunk| chunk.content)
            .collect();

        if self.should_use_merkle(chunk_count, mode) {
            // Merkle batch payment path
            info!("Using merkle batch payment for {chunk_count} chunks");

            let addresses: Vec<[u8; 32]> =
                chunk_contents.iter().map(|c| compute_address(c)).collect();

            // Compute average chunk size for quoting.
            // `.max(1)` guards the division, although chunk_count should be
            // non-zero after a successful encrypt.
            let avg_size =
                chunk_contents.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
            let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);

            // Try merkle batch; in Auto mode, fall back to per-chunk on network issues
            let batch_result = match self
                .pay_for_merkle_batch(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
                .await
            {
                Ok(result) => result,
                Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                    // Only `Auto` may silently downgrade to wave-batch; an
                    // explicit `Merkle` request surfaces the error instead.
                    info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                    let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
                    return Ok(DataUploadResult {
                        data_map,
                        chunks_stored: addresses.len(),
                        payment_mode_used: PaymentMode::Single,
                    });
                }
                Err(e) => return Err(e),
            };

            let chunks_stored = self
                .merkle_upload_chunks(chunk_contents, addresses, &batch_result, None)
                .await?;

            info!("Data uploaded via merkle: {chunks_stored} chunks stored ({content_len} bytes)");
            Ok(DataUploadResult {
                data_map,
                chunks_stored,
                payment_mode_used: PaymentMode::Merkle,
            })
        } else {
            // Wave-based batch payment path (single EVM tx per wave).
            let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;

            info!(
                "Data uploaded: {} chunks stored ({content_len} bytes original)",
                addresses.len()
            );
            Ok(DataUploadResult {
                data_map,
                chunks_stored: addresses.len(),
                payment_mode_used: PaymentMode::Single,
            })
        }
    }
153
154    /// Phase 1 of external-signer data upload: encrypt and collect quotes.
155    ///
156    /// Equivalent to [`Client::data_prepare_upload_with_visibility`] with
157    /// [`Visibility::Private`] — see that method for details.
158    pub async fn data_prepare_upload(&self, content: Bytes) -> Result<PreparedUpload> {
159        self.data_prepare_upload_with_visibility(content, Visibility::Private)
160            .await
161    }
162
    /// Phase 1 of external-signer data upload with explicit [`Visibility`] control.
    ///
    /// Encrypts in-memory data via self-encryption, then collects storage
    /// quotes for each chunk without making any on-chain payment. Returns
    /// a [`PreparedUpload`] containing the data map and a [`PaymentIntent`]
    /// with the payment details for external signing.
    ///
    /// When `visibility` is [`Visibility::Public`], the serialized `DataMap`
    /// is bundled into the payment batch as an additional chunk and its
    /// address is recorded on the returned [`PreparedUpload`]. After
    /// [`Client::finalize_upload`] succeeds, that address is surfaced via
    /// [`crate::data::client::file::FileUploadResult::data_map_address`] so
    /// the uploader can share a single address from which anyone can retrieve
    /// the data.
    ///
    /// Wave-batch payment only — the in-memory data path does not currently
    /// support merkle batching. Use [`Client::file_prepare_upload_with_visibility`]
    /// for merkle-eligible public uploads.
    ///
    /// After the caller signs and submits the payment transaction, call
    /// [`Client::finalize_upload`] with the tx hashes to complete storage.
    ///
    /// # Errors
    ///
    /// Returns an error if encryption fails, DataMap serialization fails
    /// (public only), or quote collection fails.
    pub async fn data_prepare_upload_with_visibility(
        &self,
        content: Bytes,
        visibility: Visibility,
    ) -> Result<PreparedUpload> {
        let content_len = content.len();
        debug!("Preparing data upload for external signing (visibility={visibility:?}, {content_len} bytes)");

        // Convergent self-encryption: content -> DataMap + encrypted chunks.
        let (data_map, encrypted_chunks) = encrypt(content)
            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;

        // `mut` because the Public branch appends the DataMap chunk below.
        let mut chunk_contents: Vec<Bytes> = encrypted_chunks
            .into_iter()
            .map(|chunk| chunk.content)
            .collect();

        info!("Data encrypted into {} chunks", chunk_contents.len());

        // For public uploads, bundle the serialized DataMap as an extra chunk
        // in the same payment batch. This lets the external signer pay for
        // the data chunks and the DataMap chunk in one flow, and lets the
        // finalize step return the DataMap's chunk address as the shareable
        // retrieval address.
        let data_map_address = match visibility {
            Visibility::Private => None,
            Visibility::Public => {
                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
                })?;
                let bytes = Bytes::from(serialized);
                let address = compute_address(&bytes);
                info!(
                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
                    bytes.len(),
                    hex::encode(address)
                );
                chunk_contents.push(bytes);
                Some(address)
            }
        };

        let chunk_count = chunk_contents.len();

        // Quote fan-out is capped by the adaptive controller's quote channel
        // and never exceeds the number of chunks.
        // NOTE(review): if `quote_limiter.current()` can ever be 0, this
        // would make `buffer_unordered(0)` stall — confirm the controller
        // guarantees a floor of at least 1.
        let quote_limiter = self.controller().quote.clone();
        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
        let results: Vec<Result<Option<PreparedChunk>>> = futures::stream::iter(chunk_contents)
            .map(|content| {
                let limiter = quote_limiter.clone();
                async move {
                    observe_op(
                        &limiter,
                        || async move { self.prepare_chunk_payment(content).await },
                        classify_error,
                    )
                    .await
                }
            })
            .buffer_unordered(quote_concurrency)
            .collect()
            .await;

        // Propagate the first error; `None` results are skipped.
        // NOTE(review): a `None` from `prepare_chunk_payment` is treated as
        // "no payment needed" (presumably the chunk already exists on the
        // network) — confirm against that method's contract.
        let mut prepared_chunks = Vec::with_capacity(results.len());
        for result in results {
            if let Some(prepared) = result? {
                prepared_chunks.push(prepared);
            }
        }

        let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);

        info!(
            "Data prepared for external signing: {} chunks, total {} atto ({content_len} bytes)",
            prepared_chunks.len(),
            payment_intent.total_amount,
        );

        Ok(PreparedUpload {
            data_map,
            payment_info: ExternalPaymentInfo::WaveBatch {
                prepared_chunks,
                payment_intent,
            },
            data_map_address,
        })
    }
274
275    /// Store a `DataMap` on the network as a public chunk.
276    ///
277    /// The serialized `DataMap` is stored as a regular content-addressed chunk.
278    /// Anyone who knows the returned address can retrieve and use the `DataMap`
279    /// to download the original data.
280    ///
281    /// # Errors
282    ///
283    /// Returns an error if serialization or the chunk store fails.
284    pub async fn data_map_store(&self, data_map: &DataMap) -> Result<[u8; 32]> {
285        let serialized = rmp_serde::to_vec(data_map)
286            .map_err(|e| Error::Serialization(format!("Failed to serialize DataMap: {e}")))?;
287
288        info!(
289            "Storing DataMap as public chunk ({} bytes serialized)",
290            serialized.len()
291        );
292
293        self.chunk_put(Bytes::from(serialized)).await
294    }
295
296    /// Fetch a `DataMap` from the network by its chunk address.
297    ///
298    /// Retrieves the chunk at `address` and deserializes it as a `DataMap`.
299    ///
300    /// # Errors
301    ///
302    /// Returns an error if the chunk is not found or deserialization fails.
303    pub async fn data_map_fetch(&self, address: &[u8; 32]) -> Result<DataMap> {
304        let chunk = self.chunk_get(address).await?.ok_or_else(|| {
305            Error::InvalidData(format!(
306                "DataMap chunk not found at {}",
307                hex::encode(address)
308            ))
309        })?;
310
311        rmp_serde::from_slice(&chunk.content)
312            .map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}")))
313    }
314
    /// Download and decrypt data from the network using its `DataMap`.
    ///
    /// Retrieves all chunks referenced by the data map, then decrypts
    /// and reassembles the original content. Fetches chunks concurrently;
    /// the fan-out is sized by the adaptive controller's `fetch` channel
    /// and ramps up under healthy conditions.
    ///
    /// # Errors
    ///
    /// Returns an error if any chunk cannot be retrieved or decryption fails.
    pub async fn data_download(&self, data_map: &DataMap) -> Result<Bytes> {
        let chunk_infos = data_map.infos();
        debug!("Downloading data ({} chunks)", chunk_infos.len());

        // Extract owned addresses to avoid HRTB lifetime issue with
        // stream::iter over references combined with async closures.
        let addresses: Vec<[u8; 32]> = chunk_infos.iter().map(|info| info.dst_hash.0).collect();

        // Rolling rebucketing: re-reads the controller's fetch cap as
        // each slot frees, so a long download (e.g. 10 GB = ~2500
        // chunks) sees adaptive growth/decay mid-flight without batch
        // fences. Output is index-sorted so self_encryption decrypt
        // sees DataMap-ordered chunks.
        let fetch_limiter = self.controller().fetch.clone();
        let encrypted_chunks: Vec<EncryptedChunk> = rebucketed_ordered(
            &fetch_limiter,
            // `enumerate` tags each address with its DataMap index so the
            // results can be re-sorted after unordered completion.
            addresses.into_iter().enumerate(),
            |(idx, address)| {
                let limiter = fetch_limiter.clone();
                async move {
                    // `observe_op` reports success/failure back to the
                    // adaptive limiter via `classify_error`.
                    let chunk = observe_op(
                        &limiter,
                        || async move { self.chunk_get(&address).await },
                        classify_error,
                    )
                    .await?
                    // A missing chunk is fatal: every chunk is required
                    // to reconstruct the plaintext.
                    .ok_or_else(|| {
                        Error::InvalidData(format!(
                            "Missing chunk {} required for data reconstruction",
                            hex::encode(address)
                        ))
                    })?;
                    Ok::<_, Error>((
                        idx,
                        EncryptedChunk {
                            content: chunk.content,
                        },
                    ))
                }
            },
        )
        .await?;

        debug!(
            "All {} chunks retrieved, decrypting",
            encrypted_chunks.len()
        );

        let content = decrypt(data_map, &encrypted_chunks)
            .map_err(|e| Error::Encryption(format!("Failed to decrypt data: {e}")))?;

        info!("Data downloaded and decrypted ({} bytes)", content.len());

        Ok(content)
    }
380}
381
/// Compile-time assertions that Client method futures are Send.
///
/// These methods are called from axum handlers and tokio::spawn contexts
/// that require Send + 'static. The async closures inside stream
/// combinators must not capture references with concrete lifetimes
/// (HRTB issue). If any of these checks fail, the stream closures
/// need restructuring to use owned values instead of references.
#[cfg(test)]
mod send_assertions {
    use super::*;

    // Compile-time probe: accepts a reference only when the pointee is `Send`.
    fn _require_send(_: &impl Send) {}

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _data_download_is_send(client: &Client) {
        let dm: DataMap = todo!();
        _require_send(&client.data_download(&dm));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_is_send(client: &Client) {
        _require_send(&client.data_upload(Bytes::new()));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_with_mode_is_send(client: &Client) {
        _require_send(&client.data_upload_with_mode(Bytes::new(), PaymentMode::Auto));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_prepare_upload_is_send(client: &Client) {
        _require_send(&client.data_prepare_upload(Bytes::new()));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_prepare_upload_with_visibility_is_send(client: &Client) {
        _require_send(
            &client.data_prepare_upload_with_visibility(Bytes::new(), Visibility::Public),
        );
    }
}