1use crate::data::client::adaptive::{observe_op, rebucketed_ordered};
10use crate::data::client::batch::{PaymentIntent, PreparedChunk};
11use crate::data::client::classify_error;
12use crate::data::client::file::{ExternalPaymentInfo, PreparedUpload, Visibility};
13use crate::data::client::merkle::PaymentMode;
14use crate::data::client::Client;
15use crate::data::error::{Error, Result};
16use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
17use bytes::Bytes;
18use futures::stream::StreamExt;
19use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk};
20use tracing::{debug, info};
21
/// Outcome of a successful data upload.
///
/// Returned by [`Client::data_upload`] and [`Client::data_upload_with_mode`].
#[derive(Debug, Clone)]
pub struct DataUploadResult {
    /// Self-encryption map required to later fetch and decrypt the data.
    pub data_map: DataMap,
    /// Number of encrypted chunks actually stored on the network.
    pub chunks_stored: usize,
    /// Payment path that was actually used (may differ from the requested
    /// mode when the merkle path falls back to wave-batch).
    pub payment_mode_used: PaymentMode,
}
32
impl Client {
    /// Self-encrypts `content` and uploads all resulting chunks via the
    /// wave-batch payment path.
    ///
    /// Returns the [`DataMap`] needed for later download/decryption, the
    /// number of chunks stored, and the payment mode used (always
    /// `PaymentMode::Single` on this path).
    ///
    /// # Errors
    /// [`Error::Encryption`] if self-encryption fails, plus any error
    /// propagated from `batch_upload_chunks`.
    pub async fn data_upload(&self, content: Bytes) -> Result<DataUploadResult> {
        let content_len = content.len();
        debug!("Encrypting data ({content_len} bytes)");

        let (data_map, encrypted_chunks) = encrypt(content)
            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;

        info!("Data encrypted into {} chunks", encrypted_chunks.len());

        // Only the chunk payloads are needed; addresses are content-derived
        // downstream by the batch uploader.
        let chunk_contents: Vec<Bytes> = encrypted_chunks
            .into_iter()
            .map(|chunk| chunk.content)
            .collect();

        let (addresses, _storage_cost, _gas_cost) =
            self.batch_upload_chunks(chunk_contents).await?;
        let chunks_stored = addresses.len();

        info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)");

        Ok(DataUploadResult {
            data_map,
            chunks_stored,
            payment_mode_used: PaymentMode::Single,
        })
    }

    /// Self-encrypts `content` and uploads it using the requested payment
    /// `mode`.
    ///
    /// When `should_use_merkle` approves the (chunk_count, mode) pair, a
    /// single merkle batch payment covers all chunks; otherwise the
    /// wave-batch path is used. In `PaymentMode::Auto`, an
    /// [`Error::InsufficientPeers`] from the merkle payment triggers a
    /// silent fallback to wave-batch (reported as `PaymentMode::Single`
    /// in the result).
    ///
    /// # Errors
    /// [`Error::Encryption`] on encryption failure; payment/upload errors
    /// are propagated (merkle payment errors other than the Auto-mode
    /// fallback case are returned as-is).
    pub async fn data_upload_with_mode(
        &self,
        content: Bytes,
        mode: PaymentMode,
    ) -> Result<DataUploadResult> {
        let content_len = content.len();
        debug!("Encrypting data ({content_len} bytes) with mode {mode:?}");

        let (data_map, encrypted_chunks) = encrypt(content)
            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;

        let chunk_count = encrypted_chunks.len();
        info!("Data encrypted into {chunk_count} chunks");

        let chunk_contents: Vec<Bytes> = encrypted_chunks
            .into_iter()
            .map(|chunk| chunk.content)
            .collect();

        if self.should_use_merkle(chunk_count, mode) {
            info!("Using merkle batch payment for {chunk_count} chunks");

            // Content-derived addresses, in the same order as the chunks.
            let addresses: Vec<[u8; 32]> =
                chunk_contents.iter().map(|c| compute_address(c)).collect();

            // Average chunk size feeds the batch quote; .max(1) guards the
            // division if chunk_count were ever 0.
            let avg_size =
                chunk_contents.iter().map(bytes::Bytes::len).sum::<usize>() / chunk_count.max(1);
            let avg_size_u64 = u64::try_from(avg_size).unwrap_or(0);

            let batch_result = match self
                .pay_for_merkle_batch(&addresses, DATA_TYPE_CHUNK, avg_size_u64)
                .await
            {
                Ok(result) => result,
                // Auto mode only: degrade gracefully to the wave-batch path
                // when the merkle payment cannot find enough peers. Note
                // `chunk_contents` is moved here, which is fine because this
                // arm returns early.
                Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
                    info!("Merkle needs more peers ({msg}), falling back to wave-batch");
                    let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
                    return Ok(DataUploadResult {
                        data_map,
                        chunks_stored: addresses.len(),
                        payment_mode_used: PaymentMode::Single,
                    });
                }
                Err(e) => return Err(e),
            };

            let chunks_stored = self
                .merkle_upload_chunks(chunk_contents, addresses, &batch_result, None)
                .await?;

            info!("Data uploaded via merkle: {chunks_stored} chunks stored ({content_len} bytes)");
            Ok(DataUploadResult {
                data_map,
                chunks_stored,
                payment_mode_used: PaymentMode::Merkle,
            })
        } else {
            let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;

            info!(
                "Data uploaded: {} chunks stored ({content_len} bytes original)",
                addresses.len()
            );
            Ok(DataUploadResult {
                data_map,
                chunks_stored: addresses.len(),
                payment_mode_used: PaymentMode::Single,
            })
        }
    }

    /// Prepares a private upload for external signing.
    ///
    /// Convenience wrapper over
    /// [`Self::data_prepare_upload_with_visibility`] with
    /// `Visibility::Private`.
    pub async fn data_prepare_upload(&self, content: Bytes) -> Result<PreparedUpload> {
        self.data_prepare_upload_with_visibility(content, Visibility::Private)
            .await
    }

    /// Self-encrypts `content` and gathers payment quotes for every chunk so
    /// that an external wallet can sign the payment.
    ///
    /// For `Visibility::Public`, the serialized [`DataMap`] is appended as
    /// one extra chunk so the data is fetchable by address alone, and that
    /// chunk's address is returned in `PreparedUpload::data_map_address`
    /// (`None` for private uploads).
    ///
    /// Quote requests run concurrently, bounded by the adaptive quote
    /// limiter.
    ///
    /// # Errors
    /// [`Error::Encryption`] on encryption failure, [`Error::Serialization`]
    /// if the public DataMap cannot be serialized, plus any quote error.
    pub async fn data_prepare_upload_with_visibility(
        &self,
        content: Bytes,
        visibility: Visibility,
    ) -> Result<PreparedUpload> {
        let content_len = content.len();
        debug!("Preparing data upload for external signing (visibility={visibility:?}, {content_len} bytes)");

        let (data_map, encrypted_chunks) = encrypt(content)
            .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;

        let mut chunk_contents: Vec<Bytes> = encrypted_chunks
            .into_iter()
            .map(|chunk| chunk.content)
            .collect();

        info!("Data encrypted into {} chunks", chunk_contents.len());

        let data_map_address = match visibility {
            Visibility::Private => None,
            Visibility::Public => {
                // Bundle the DataMap itself as an ordinary chunk so public
                // data can be fetched knowing only this address.
                let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
                    Error::Serialization(format!("Failed to serialize DataMap: {e}"))
                })?;
                let bytes = Bytes::from(serialized);
                let address = compute_address(&bytes);
                info!(
                    "Public upload: bundling DataMap chunk ({} bytes) at address {}",
                    bytes.len(),
                    hex::encode(address)
                );
                chunk_contents.push(bytes);
                Some(address)
            }
        };

        let chunk_count = chunk_contents.len();

        // Bound concurrency by the adaptive quote limiter, never exceeding
        // one in-flight request per chunk. NOTE(review): assumes
        // `current()` is always >= 1 — buffer_unordered(0) would stall;
        // confirm against the limiter implementation.
        let quote_limiter = self.controller().quote.clone();
        let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
        let results: Vec<Result<Option<PreparedChunk>>> = futures::stream::iter(chunk_contents)
            .map(|content| {
                let limiter = quote_limiter.clone();
                async move {
                    observe_op(
                        &limiter,
                        || async move { self.prepare_chunk_payment(content).await },
                        classify_error,
                    )
                    .await
                }
            })
            .buffer_unordered(quote_concurrency)
            .collect()
            .await;

        // `None` entries are skipped — presumably chunks that need no
        // payment (e.g. already stored); TODO confirm against
        // `prepare_chunk_payment`. Any Err aborts the whole preparation.
        let mut prepared_chunks = Vec::with_capacity(results.len());
        for result in results {
            if let Some(prepared) = result? {
                prepared_chunks.push(prepared);
            }
        }

        let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);

        info!(
            "Data prepared for external signing: {} chunks, total {} atto ({content_len} bytes)",
            prepared_chunks.len(),
            payment_intent.total_amount,
        );

        Ok(PreparedUpload {
            data_map,
            payment_info: ExternalPaymentInfo::WaveBatch {
                prepared_chunks,
                payment_intent,
            },
            data_map_address,
        })
    }

    /// Serializes `data_map` with MessagePack and stores it as a public
    /// chunk, returning the chunk's content address.
    ///
    /// # Errors
    /// [`Error::Serialization`] if serialization fails, plus any error from
    /// `chunk_put`.
    pub async fn data_map_store(&self, data_map: &DataMap) -> Result<[u8; 32]> {
        let serialized = rmp_serde::to_vec(data_map)
            .map_err(|e| Error::Serialization(format!("Failed to serialize DataMap: {e}")))?;

        info!(
            "Storing DataMap as public chunk ({} bytes serialized)",
            serialized.len()
        );

        self.chunk_put(Bytes::from(serialized)).await
    }

    /// Fetches and deserializes a [`DataMap`] previously stored at
    /// `address` (e.g. by [`Self::data_map_store`]).
    ///
    /// # Errors
    /// [`Error::InvalidData`] if no chunk exists at `address`;
    /// [`Error::Serialization`] if the chunk is not a valid MessagePack
    /// DataMap; plus any error from `chunk_get`.
    pub async fn data_map_fetch(&self, address: &[u8; 32]) -> Result<DataMap> {
        let chunk = self.chunk_get(address).await?.ok_or_else(|| {
            Error::InvalidData(format!(
                "DataMap chunk not found at {}",
                hex::encode(address)
            ))
        })?;

        rmp_serde::from_slice(&chunk.content)
            .map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}")))
    }

    /// Fetches every chunk referenced by `data_map` and decrypts them back
    /// into the original content.
    ///
    /// Chunks are fetched concurrently under the adaptive fetch limiter;
    /// `rebucketed_ordered` tags each fetch with its index so the chunks
    /// are reassembled in DataMap order before decryption.
    ///
    /// # Errors
    /// [`Error::InvalidData`] if any required chunk is missing;
    /// [`Error::Encryption`] if decryption fails; plus any fetch error.
    pub async fn data_download(&self, data_map: &DataMap) -> Result<Bytes> {
        let chunk_infos = data_map.infos();
        debug!("Downloading data ({} chunks)", chunk_infos.len());

        let addresses: Vec<[u8; 32]> = chunk_infos.iter().map(|info| info.dst_hash.0).collect();

        let fetch_limiter = self.controller().fetch.clone();
        let encrypted_chunks: Vec<EncryptedChunk> = rebucketed_ordered(
            &fetch_limiter,
            addresses.into_iter().enumerate(),
            |(idx, address)| {
                let limiter = fetch_limiter.clone();
                async move {
                    let chunk = observe_op(
                        &limiter,
                        || async move { self.chunk_get(&address).await },
                        classify_error,
                    )
                    .await?
                    // Every chunk is mandatory: decryption cannot proceed
                    // with a partial set.
                    .ok_or_else(|| {
                        Error::InvalidData(format!(
                            "Missing chunk {} required for data reconstruction",
                            hex::encode(address)
                        ))
                    })?;
                    Ok::<_, Error>((
                        idx,
                        EncryptedChunk {
                            content: chunk.content,
                        },
                    ))
                }
            },
        )
        .await?;

        debug!(
            "All {} chunks retrieved, decrypting",
            encrypted_chunks.len()
        );

        let content = decrypt(data_map, &encrypted_chunks)
            .map_err(|e| Error::Encryption(format!("Failed to decrypt data: {e}")))?;

        info!("Data downloaded and decrypted ({} bytes)", content.len());

        Ok(content)
    }
}
381
#[cfg(test)]
mod send_assertions {
    //! Compile-time checks that the public async data APIs produce `Send`
    //! futures. These functions are never executed — if any future captures
    //! a non-`Send` value across an await point, this module fails to build.

    use super::*;

    /// Accepts a reference to any `Send` value; a compile error here points
    /// at the offending non-`Send` capture.
    fn require_send<T: Send>(_: &T) {}

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_is_send(client: &Client) {
        require_send(&client.data_upload(Bytes::new()));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_with_mode_is_send(client: &Client) {
        require_send(&client.data_upload_with_mode(Bytes::new(), PaymentMode::Auto));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_prepare_upload_is_send(client: &Client) {
        require_send(&client.data_prepare_upload(Bytes::new()));
    }

    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_prepare_upload_with_visibility_is_send(client: &Client) {
        require_send(&client.data_prepare_upload_with_visibility(Bytes::new(), Visibility::Public));
    }

    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _data_download_is_send(client: &Client) {
        let map: DataMap = todo!();
        require_send(&client.data_download(&map));
    }
}