// ant_core/data/client/data.rs
1//! In-memory data operations using self-encryption.
2//!
3//! Upload and download raw byte data. Content is encrypted via
4//! convergent encryption and stored as content-addressed chunks.
5//! Use this when you already have data in memory (e.g., `Bytes`).
6//! For file-based streaming uploads that avoid loading the entire
7//! file into memory, see the `file` module.
8
9use crate::data::client::batch::{PaymentIntent, PreparedChunk};
10use crate::data::client::file::{ExternalPaymentInfo, PreparedUpload};
11use crate::data::client::merkle::PaymentMode;
12use crate::data::client::Client;
13use crate::data::error::{Error, Result};
14use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
15use bytes::Bytes;
16use futures::stream::{self, StreamExt, TryStreamExt};
17use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk};
18use tracing::{debug, info};
19
/// Result of an in-memory data upload: the `DataMap` needed to retrieve the data.
///
/// Returned by [`Client::data_upload`] and [`Client::data_upload_with_mode`].
#[derive(Debug, Clone)]
pub struct DataUploadResult {
    /// The data map containing chunk metadata for reconstruction.
    /// Pass this to [`Client::data_download`] to recover the original bytes.
    pub data_map: DataMap,
    /// Number of chunks stored on the network.
    pub chunks_stored: usize,
    /// Which payment mode was actually used (not just requested).
    /// `PaymentMode::Auto` resolves at runtime to either `Single` or `Merkle`.
    pub payment_mode_used: PaymentMode,
}
30
31impl Client {
32    /// Upload in-memory data to the network using self-encryption.
33    ///
34    /// The content is encrypted and split into chunks, each stored
35    /// as a content-addressed chunk on the network. Returns a `DataMap`
36    /// that can be used to retrieve and decrypt the data.
37    ///
38    /// # Errors
39    ///
40    /// Returns an error if encryption fails or any chunk cannot be stored.
41    pub async fn data_upload(&self, content: Bytes) -> Result<DataUploadResult> {
42        let content_len = content.len();
43        debug!("Encrypting data ({content_len} bytes)");
44
45        let (data_map, encrypted_chunks) = encrypt(content)
46            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
47
48        info!("Data encrypted into {} chunks", encrypted_chunks.len());
49
50        let chunk_contents: Vec<Bytes> = encrypted_chunks
51            .into_iter()
52            .map(|chunk| chunk.content)
53            .collect();
54
55        let (addresses, _storage_cost, _gas_cost) =
56            self.batch_upload_chunks(chunk_contents).await?;
57        let chunks_stored = addresses.len();
58
59        info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)");
60
61        Ok(DataUploadResult {
62            data_map,
63            chunks_stored,
64            payment_mode_used: PaymentMode::Single,
65        })
66    }
67
68    /// Upload in-memory data with a specific payment mode.
69    ///
70    /// When `mode` is `Auto` and the chunk count >= threshold, or when `mode`
71    /// is `Merkle`, this buffers all chunks and pays via a single merkle
72    /// batch transaction. Otherwise falls back to per-chunk payment.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if encryption fails or any chunk cannot be stored.
77    pub async fn data_upload_with_mode(
78        &self,
79        content: Bytes,
80        mode: PaymentMode,
81    ) -> Result<DataUploadResult> {
82        let content_len = content.len();
83        debug!("Encrypting data ({content_len} bytes) with mode {mode:?}");
84
85        let (data_map, encrypted_chunks) = encrypt(content)
86            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
87
88        let chunk_count = encrypted_chunks.len();
89        info!("Data encrypted into {chunk_count} chunks");
90
91        let chunk_contents: Vec<Bytes> = encrypted_chunks
92            .into_iter()
93            .map(|chunk| chunk.content)
94            .collect();
95
96        if self.should_use_merkle(chunk_count, mode) {
97            // Merkle batch payment path
98            info!("Using merkle batch payment for {chunk_count} chunks");
99
100            let addresses: Vec<[u8; 32]> =
101                chunk_contents.iter().map(|c| compute_address(c)).collect();
102
103            // Compute average chunk size for quoting
104            let avg_size =
105                chunk_contents.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
106            let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);
107
108            // Try merkle batch; in Auto mode, fall back to per-chunk on network issues
109            let batch_result = match self
110                .pay_for_merkle_batch(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
111                .await
112            {
113                Ok(result) => result,
114                Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
115                    info!("Merkle needs more peers ({msg}), falling back to wave-batch");
116                    let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
117                    return Ok(DataUploadResult {
118                        data_map,
119                        chunks_stored: addresses.len(),
120                        payment_mode_used: PaymentMode::Single,
121                    });
122                }
123                Err(e) => return Err(e),
124            };
125
126            let chunks_stored = self
127                .merkle_upload_chunks(chunk_contents, addresses, &batch_result)
128                .await?;
129
130            info!("Data uploaded via merkle: {chunks_stored} chunks stored ({content_len} bytes)");
131            Ok(DataUploadResult {
132                data_map,
133                chunks_stored,
134                payment_mode_used: PaymentMode::Merkle,
135            })
136        } else {
137            // Wave-based batch payment path (single EVM tx per wave).
138            let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
139
140            info!(
141                "Data uploaded: {} chunks stored ({content_len} bytes original)",
142                addresses.len()
143            );
144            Ok(DataUploadResult {
145                data_map,
146                chunks_stored: addresses.len(),
147                payment_mode_used: PaymentMode::Single,
148            })
149        }
150    }
151
152    /// Phase 1 of external-signer data upload: encrypt and collect quotes.
153    ///
154    /// Encrypts in-memory data via self-encryption, then collects storage
155    /// quotes for each chunk without making any on-chain payment. Returns
156    /// a [`PreparedUpload`] containing the data map and a [`PaymentIntent`]
157    /// with the payment details for external signing.
158    ///
159    /// After the caller signs and submits the payment transaction, call
160    /// [`Client::finalize_upload`] with the tx hashes to complete storage.
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if encryption fails or quote collection fails.
165    pub async fn data_prepare_upload(&self, content: Bytes) -> Result<PreparedUpload> {
166        let content_len = content.len();
167        debug!("Preparing data upload for external signing ({content_len} bytes)");
168
169        let (data_map, encrypted_chunks) = encrypt(content)
170            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
171
172        let chunk_count = encrypted_chunks.len();
173        info!("Data encrypted into {chunk_count} chunks");
174
175        let chunk_contents: Vec<Bytes> = encrypted_chunks
176            .into_iter()
177            .map(|chunk| chunk.content)
178            .collect();
179
180        let quote_concurrency = self.config().quote_concurrency;
181        let results: Vec<Result<Option<PreparedChunk>>> = futures::stream::iter(chunk_contents)
182            .map(|content| async move { self.prepare_chunk_payment(content).await })
183            .buffer_unordered(quote_concurrency)
184            .collect()
185            .await;
186
187        let mut prepared_chunks = Vec::with_capacity(results.len());
188        for result in results {
189            if let Some(prepared) = result? {
190                prepared_chunks.push(prepared);
191            }
192        }
193
194        let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
195
196        info!(
197            "Data prepared for external signing: {} chunks, total {} atto ({content_len} bytes)",
198            prepared_chunks.len(),
199            payment_intent.total_amount,
200        );
201
202        Ok(PreparedUpload {
203            data_map,
204            payment_info: ExternalPaymentInfo::WaveBatch {
205                prepared_chunks,
206                payment_intent,
207            },
208            data_map_address: None,
209        })
210    }
211
212    /// Store a `DataMap` on the network as a public chunk.
213    ///
214    /// The serialized `DataMap` is stored as a regular content-addressed chunk.
215    /// Anyone who knows the returned address can retrieve and use the `DataMap`
216    /// to download the original data.
217    ///
218    /// # Errors
219    ///
220    /// Returns an error if serialization or the chunk store fails.
221    pub async fn data_map_store(&self, data_map: &DataMap) -> Result<[u8; 32]> {
222        let serialized = rmp_serde::to_vec(data_map)
223            .map_err(|e| Error::Serialization(format!("Failed to serialize DataMap: {e}")))?;
224
225        info!(
226            "Storing DataMap as public chunk ({} bytes serialized)",
227            serialized.len()
228        );
229
230        self.chunk_put(Bytes::from(serialized)).await
231    }
232
233    /// Fetch a `DataMap` from the network by its chunk address.
234    ///
235    /// Retrieves the chunk at `address` and deserializes it as a `DataMap`.
236    ///
237    /// # Errors
238    ///
239    /// Returns an error if the chunk is not found or deserialization fails.
240    pub async fn data_map_fetch(&self, address: &[u8; 32]) -> Result<DataMap> {
241        let chunk = self.chunk_get(address).await?.ok_or_else(|| {
242            Error::InvalidData(format!(
243                "DataMap chunk not found at {}",
244                hex::encode(address)
245            ))
246        })?;
247
248        rmp_serde::from_slice(&chunk.content)
249            .map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}")))
250    }
251
    /// Download and decrypt data from the network using its `DataMap`.
    ///
    /// Retrieves all chunks referenced by the data map, then decrypts
    /// and reassembles the original content. Fetches chunks concurrently
    /// (bounded by `quote_concurrency`) while preserving order.
    ///
    /// NOTE: the body's stream shape is Send-sensitive — see the
    /// `send_assertions` test module before restructuring it.
    ///
    /// # Errors
    ///
    /// Returns an error if any chunk cannot be retrieved or decryption fails.
    pub async fn data_download(&self, data_map: &DataMap) -> Result<Bytes> {
        let chunk_infos = data_map.infos();
        debug!("Downloading data ({} chunks)", chunk_infos.len());

        // Extract owned addresses to avoid HRTB lifetime issue with
        // stream::iter over references combined with async closures.
        let addresses: Vec<[u8; 32]> = chunk_infos.iter().map(|info| info.dst_hash.0).collect();

        // `buffered` (not `buffer_unordered`) keeps results in input order,
        // which decryption below requires.
        let encrypted_chunks: Vec<EncryptedChunk> = stream::iter(addresses)
            .map(|address| async move {
                // A chunk the network cannot return makes the whole
                // reconstruction impossible, so fail the download.
                let chunk = self.chunk_get(&address).await?.ok_or_else(|| {
                    Error::InvalidData(format!(
                        "Missing chunk {} required for data reconstruction",
                        hex::encode(address)
                    ))
                })?;
                Ok::<_, Error>(EncryptedChunk {
                    content: chunk.content,
                })
            })
            .buffered(self.config().quote_concurrency)
            .try_collect()
            .await?;

        debug!(
            "All {} chunks retrieved, decrypting",
            encrypted_chunks.len()
        );

        let content = decrypt(data_map, &encrypted_chunks)
            .map_err(|e| Error::Encryption(format!("Failed to decrypt data: {e}")))?;

        info!("Data downloaded and decrypted ({} bytes)", content.len());

        Ok(content)
    }
297}
298
/// Compile-time assertions that Client method futures are Send.
///
/// These methods are called from axum handlers and tokio::spawn contexts
/// that require Send + 'static. The async closures inside stream
/// combinators must not capture references with concrete lifetimes
/// (HRTB issue). If any of these checks fail, the stream closures
/// need restructuring to use owned values instead of references.
#[cfg(test)]
mod send_assertions {
    use super::*;

    // Type-level check only: the bound fails at compile time if `T: !Send`.
    fn _assert_send<T: Send>(_: &T) {}

    // These async fns are never executed — `todo!()` would panic — they
    // exist solely so the compiler must prove each future is `Send`.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _data_download_is_send(client: &Client) {
        let dm: DataMap = todo!();
        let fut = client.data_download(&dm);
        _assert_send(&fut);
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_is_send(client: &Client) {
        let fut = client.data_upload(Bytes::new());
        _assert_send(&fut);
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_with_mode_is_send(client: &Client) {
        let fut = client.data_upload_with_mode(Bytes::new(), PaymentMode::Auto);
        _assert_send(&fut);
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_prepare_upload_is_send(client: &Client) {
        let fut = client.data_prepare_upload(Bytes::new());
        _assert_send(&fut);
    }
}