ant_core/data/client/
data.rs1use crate::data::client::batch::{PaymentIntent, PreparedChunk};
10use crate::data::client::file::{ExternalPaymentInfo, PreparedUpload};
11use crate::data::client::merkle::PaymentMode;
12use crate::data::client::Client;
13use crate::data::error::{Error, Result};
14use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
15use bytes::Bytes;
16use futures::stream::{self, StreamExt, TryStreamExt};
17use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk};
18use tracing::{debug, info};
19
/// Outcome of a data upload: everything the caller needs to retrieve the
/// content later and to see how storage was paid for.
#[derive(Debug, Clone)]
pub struct DataUploadResult {
    /// Self-encryption map required to locate and decrypt the stored chunks.
    pub data_map: DataMap,
    /// Number of chunks actually stored on the network.
    pub chunks_stored: usize,
    /// Payment path that was ultimately taken — may differ from the
    /// requested mode (e.g. Auto falling back from merkle to wave-batch).
    pub payment_mode_used: PaymentMode,
}
30
31impl Client {
32 pub async fn data_upload(&self, content: Bytes) -> Result<DataUploadResult> {
42 let content_len = content.len();
43 debug!("Encrypting data ({content_len} bytes)");
44
45 let (data_map, encrypted_chunks) = encrypt(content)
46 .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
47
48 info!("Data encrypted into {} chunks", encrypted_chunks.len());
49
50 let chunk_contents: Vec<Bytes> = encrypted_chunks
51 .into_iter()
52 .map(|chunk| chunk.content)
53 .collect();
54
55 let (addresses, _storage_cost, _gas_cost) =
56 self.batch_upload_chunks(chunk_contents).await?;
57 let chunks_stored = addresses.len();
58
59 info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)");
60
61 Ok(DataUploadResult {
62 data_map,
63 chunks_stored,
64 payment_mode_used: PaymentMode::Single,
65 })
66 }
67
    /// Self-encrypts `content` and uploads it, choosing between the merkle
    /// batch-payment path and the per-chunk wave-batch path based on `mode`
    /// and the chunk count (decision delegated to `should_use_merkle`).
    ///
    /// In `PaymentMode::Auto`, a merkle payment that fails with
    /// `Error::InsufficientPeers` transparently falls back to wave-batch;
    /// the result then reports `PaymentMode::Single` as the mode used.
    ///
    /// # Errors
    /// Returns `Error::Encryption` when self-encryption fails, or any
    /// payment/upload error (other than the Auto-mode fallback case above).
    pub async fn data_upload_with_mode(
        &self,
        content: Bytes,
        mode: PaymentMode,
    ) -> Result<DataUploadResult> {
        let content_len = content.len();
        debug!("Encrypting data ({content_len} bytes) with mode {mode:?}");

        let (data_map, encrypted_chunks) = encrypt(content)
            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;

        let chunk_count = encrypted_chunks.len();
        info!("Data encrypted into {chunk_count} chunks");

        let chunk_contents: Vec<Bytes> = encrypted_chunks
            .into_iter()
            .map(|chunk| chunk.content)
            .collect();

        if self.should_use_merkle(chunk_count, mode) {
            info!("Using merkle batch payment for {chunk_count} chunks");

            let addresses: Vec<[u8; 32]> =
                chunk_contents.iter().map(|c| compute_address(c)).collect();

            // Quote using the average chunk size; `max(1)` guards the
            // zero-chunk division (not expected in practice, since merkle
            // was selected for a non-empty batch).
            let avg_size =
                chunk_contents.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
            let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);

            let batch_result = match self
                .pay_for_merkle_batch(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
                .await
            {
                Ok(result) => result,
                // Auto mode only: not enough peers for a merkle payment, so
                // degrade to wave-batch instead of failing the whole upload.
                // Note: `addresses` here deliberately shadows the merkle
                // address list with the batch-upload result.
                Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                    info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                    let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
                    return Ok(DataUploadResult {
                        data_map,
                        chunks_stored: addresses.len(),
                        payment_mode_used: PaymentMode::Single,
                    });
                }
                Err(e) => return Err(e),
            };

            let chunks_stored = self
                .merkle_upload_chunks(chunk_contents, addresses, &batch_result)
                .await?;

            info!("Data uploaded via merkle: {chunks_stored} chunks stored ({content_len} bytes)");
            Ok(DataUploadResult {
                data_map,
                chunks_stored,
                payment_mode_used: PaymentMode::Merkle,
            })
        } else {
            // Wave-batch path: pay and store chunks chunk-by-chunk.
            let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;

            info!(
                "Data uploaded: {} chunks stored ({content_len} bytes original)",
                addresses.len()
            );
            Ok(DataUploadResult {
                data_map,
                chunks_stored: addresses.len(),
                payment_mode_used: PaymentMode::Single,
            })
        }
    }
151
152 pub async fn data_prepare_upload(&self, content: Bytes) -> Result<PreparedUpload> {
166 let content_len = content.len();
167 debug!("Preparing data upload for external signing ({content_len} bytes)");
168
169 let (data_map, encrypted_chunks) = encrypt(content)
170 .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
171
172 let chunk_count = encrypted_chunks.len();
173 info!("Data encrypted into {chunk_count} chunks");
174
175 let chunk_contents: Vec<Bytes> = encrypted_chunks
176 .into_iter()
177 .map(|chunk| chunk.content)
178 .collect();
179
180 let quote_concurrency = self.config().quote_concurrency;
181 let results: Vec<Result<Option<PreparedChunk>>> = futures::stream::iter(chunk_contents)
182 .map(|content| async move { self.prepare_chunk_payment(content).await })
183 .buffer_unordered(quote_concurrency)
184 .collect()
185 .await;
186
187 let mut prepared_chunks = Vec::with_capacity(results.len());
188 for result in results {
189 if let Some(prepared) = result? {
190 prepared_chunks.push(prepared);
191 }
192 }
193
194 let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
195
196 info!(
197 "Data prepared for external signing: {} chunks, total {} atto ({content_len} bytes)",
198 prepared_chunks.len(),
199 payment_intent.total_amount,
200 );
201
202 Ok(PreparedUpload {
203 data_map,
204 payment_info: ExternalPaymentInfo::WaveBatch {
205 prepared_chunks,
206 payment_intent,
207 },
208 data_map_address: None,
209 })
210 }
211
212 pub async fn data_map_store(&self, data_map: &DataMap) -> Result<[u8; 32]> {
222 let serialized = rmp_serde::to_vec(data_map)
223 .map_err(|e| Error::Serialization(format!("Failed to serialize DataMap: {e}")))?;
224
225 info!(
226 "Storing DataMap as public chunk ({} bytes serialized)",
227 serialized.len()
228 );
229
230 self.chunk_put(Bytes::from(serialized)).await
231 }
232
233 pub async fn data_map_fetch(&self, address: &[u8; 32]) -> Result<DataMap> {
241 let chunk = self.chunk_get(address).await?.ok_or_else(|| {
242 Error::InvalidData(format!(
243 "DataMap chunk not found at {}",
244 hex::encode(address)
245 ))
246 })?;
247
248 rmp_serde::from_slice(&chunk.content)
249 .map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}")))
250 }
251
252 pub async fn data_download(&self, data_map: &DataMap) -> Result<Bytes> {
262 let chunk_infos = data_map.infos();
263 debug!("Downloading data ({} chunks)", chunk_infos.len());
264
265 let addresses: Vec<[u8; 32]> = chunk_infos.iter().map(|info| info.dst_hash.0).collect();
268
269 let encrypted_chunks: Vec<EncryptedChunk> = stream::iter(addresses)
270 .map(|address| async move {
271 let chunk = self.chunk_get(&address).await?.ok_or_else(|| {
272 Error::InvalidData(format!(
273 "Missing chunk {} required for data reconstruction",
274 hex::encode(address)
275 ))
276 })?;
277 Ok::<_, Error>(EncryptedChunk {
278 content: chunk.content,
279 })
280 })
281 .buffered(self.config().quote_concurrency)
282 .try_collect()
283 .await?;
284
285 debug!(
286 "All {} chunks retrieved, decrypting",
287 encrypted_chunks.len()
288 );
289
290 let content = decrypt(data_map, &encrypted_chunks)
291 .map_err(|e| Error::Encryption(format!("Failed to decrypt data: {e}")))?;
292
293 info!("Data downloaded and decrypted ({} bytes)", content.len());
294
295 Ok(content)
296 }
297}
298
#[cfg(test)]
mod send_assertions {
    //! Compile-time checks that the public data futures are `Send`, so they
    //! can be driven by a multi-threaded executor. These functions are never
    //! executed — failure shows up as a compile error.

    use super::*;

    /// Compiles only when `T: Send`.
    fn require_send<T: Send>(_: &T) {}

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _data_download_is_send(client: &Client) {
        let dm: DataMap = todo!();
        require_send(&client.data_download(&dm));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_is_send(client: &Client) {
        require_send(&client.data_upload(Bytes::new()));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_with_mode_is_send(client: &Client) {
        require_send(&client.data_upload_with_mode(Bytes::new(), PaymentMode::Auto));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_prepare_upload_is_send(client: &Client) {
        require_send(&client.data_prepare_upload(Bytes::new()));
    }
}