1use crate::data::client::adaptive::{observe_op, rebucketed_ordered};
10use crate::data::client::batch::{PaymentIntent, PreparedChunk};
11use crate::data::client::classify_error;
12use crate::data::client::file::{ExternalPaymentInfo, PreparedUpload, Visibility};
13use crate::data::client::merkle::{chunk_contents_for_upload_addresses, PaymentMode};
14use crate::data::client::Client;
15use crate::data::error::{Error, Result};
16use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
17use bytes::Bytes;
18use futures::stream::StreamExt;
19use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk};
20use tracing::{debug, info};
21
22#[derive(Debug, Clone)]
24pub struct DataUploadResult {
25 pub data_map: DataMap,
27 pub chunks_stored: usize,
29 pub payment_mode_used: PaymentMode,
31}
32
33impl Client {
34 pub async fn data_upload(&self, content: Bytes) -> Result<DataUploadResult> {
44 let content_len = content.len();
45 debug!("Encrypting data ({content_len} bytes)");
46
47 let (data_map, encrypted_chunks) = encrypt(content)
48 .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
49
50 info!("Data encrypted into {} chunks", encrypted_chunks.len());
51
52 let chunk_contents: Vec<Bytes> = encrypted_chunks
53 .into_iter()
54 .map(|chunk| chunk.content)
55 .collect();
56
57 let (addresses, _storage_cost, _gas_cost) =
58 self.batch_upload_chunks(chunk_contents).await?;
59 let chunks_stored = addresses.len();
60
61 info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)");
62
63 Ok(DataUploadResult {
64 data_map,
65 chunks_stored,
66 payment_mode_used: PaymentMode::Single,
67 })
68 }
69
70 pub async fn data_upload_with_mode(
80 &self,
81 content: Bytes,
82 mode: PaymentMode,
83 ) -> Result<DataUploadResult> {
84 let content_len = content.len();
85 debug!("Encrypting data ({content_len} bytes) with mode {mode:?}");
86
87 let (data_map, encrypted_chunks) = encrypt(content)
88 .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
89
90 let chunk_count = encrypted_chunks.len();
91 info!("Data encrypted into {chunk_count} chunks");
92
93 let chunk_contents: Vec<Bytes> = encrypted_chunks
94 .into_iter()
95 .map(|chunk| chunk.content)
96 .collect();
97
98 if self.should_use_merkle(chunk_count, mode) {
99 info!("Using merkle batch payment for {chunk_count} chunks");
101
102 let chunk_entries: Vec<([u8; 32], u64)> = chunk_contents
103 .iter()
104 .map(|chunk| {
105 let size = u64::try_from(chunk.len())
106 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
107 Ok((compute_address(chunk), size))
108 })
109 .collect::<Result<Vec<_>>>()?;
110 let merkle_plan = match self
111 .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, None)
112 .await
113 {
114 Ok(plan) => plan,
115 Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
116 info!("Merkle preflight needs more peers ({msg}), falling back to wave-batch");
117 let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
118 return Ok(DataUploadResult {
119 data_map,
120 chunks_stored: addresses.len(),
121 payment_mode_used: PaymentMode::Single,
122 });
123 }
124 Err(e) => return Err(e),
125 };
126
127 if merkle_plan.to_upload.is_empty() {
128 info!("All {chunk_count} chunks already stored; skipping merkle payment");
129 return Ok(DataUploadResult {
130 data_map,
131 chunks_stored: chunk_count,
132 payment_mode_used: PaymentMode::Merkle,
133 });
134 }
135
136 let chunk_contents =
137 chunk_contents_for_upload_addresses(chunk_contents, &merkle_plan.to_upload)?;
138
139 let remaining_chunks = merkle_plan.to_upload.len();
140 if !self.should_use_merkle(remaining_chunks, mode) {
141 info!(
142 "{remaining_chunks} chunks need upload after merkle preflight; \
143 using single-node payment"
144 );
145 let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
146 return Ok(DataUploadResult {
147 data_map,
148 chunks_stored: merkle_plan.already_stored.len() + addresses.len(),
149 payment_mode_used: PaymentMode::Single,
150 });
151 }
152
153 let batch_result = match self
155 .pay_for_merkle_batch(
156 &merkle_plan.to_upload,
157 DATA_TYPE_CHUNK,
158 merkle_plan.to_upload_avg_size(),
159 )
160 .await
161 {
162 Ok(result) => result,
163 Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
164 info!("Merkle needs more peers ({msg}), falling back to wave-batch");
165 let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
166 return Ok(DataUploadResult {
167 data_map,
168 chunks_stored: merkle_plan.already_stored.len() + addresses.len(),
169 payment_mode_used: PaymentMode::Single,
170 });
171 }
172 Err(e) => return Err(e),
173 };
174
175 let outcome = self
176 .merkle_upload_chunks(
177 chunk_contents,
178 merkle_plan.to_upload,
179 &batch_result,
180 None,
181 merkle_plan.already_stored.len(),
182 chunk_count,
183 )
184 .await?;
185 if outcome.failed > 0 {
191 return Err(Error::InsufficientPeers(format!(
192 "Data merkle upload incomplete: {} of {} chunk(s) short of quorum after retries",
193 outcome.failed, chunk_count
194 )));
195 }
196
197 info!(
198 "Data uploaded via merkle: {} chunks stored ({content_len} bytes)",
199 outcome.stored
200 );
201 Ok(DataUploadResult {
202 data_map,
203 chunks_stored: outcome.stored,
204 payment_mode_used: PaymentMode::Merkle,
205 })
206 } else {
207 let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
209
210 info!(
211 "Data uploaded: {} chunks stored ({content_len} bytes original)",
212 addresses.len()
213 );
214 Ok(DataUploadResult {
215 data_map,
216 chunks_stored: addresses.len(),
217 payment_mode_used: PaymentMode::Single,
218 })
219 }
220 }
221
222 pub async fn data_prepare_upload(&self, content: Bytes) -> Result<PreparedUpload> {
227 self.data_prepare_upload_with_visibility(content, Visibility::Private)
228 .await
229 }
230
231 pub async fn data_prepare_upload_with_visibility(
258 &self,
259 content: Bytes,
260 visibility: Visibility,
261 ) -> Result<PreparedUpload> {
262 let content_len = content.len();
263 debug!("Preparing data upload for external signing (visibility={visibility:?}, {content_len} bytes)");
264
265 let (data_map, encrypted_chunks) = encrypt(content)
266 .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
267
268 let mut chunk_contents: Vec<Bytes> = encrypted_chunks
269 .into_iter()
270 .map(|chunk| chunk.content)
271 .collect();
272
273 info!("Data encrypted into {} chunks", chunk_contents.len());
274
275 let data_map_address = match visibility {
281 Visibility::Private => None,
282 Visibility::Public => {
283 let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
284 Error::Serialization(format!("Failed to serialize DataMap: {e}"))
285 })?;
286 let bytes = Bytes::from(serialized);
287 let address = compute_address(&bytes);
288 info!(
289 "Public upload: bundling DataMap chunk ({} bytes) at address {}",
290 bytes.len(),
291 hex::encode(address)
292 );
293 chunk_contents.push(bytes);
294 Some(address)
295 }
296 };
297
298 let chunk_count = chunk_contents.len();
299 let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_contents
300 .into_iter()
301 .map(|content| {
302 let address = compute_address(&content);
303 (content, address)
304 })
305 .collect();
306
307 let quote_limiter = self.controller().quote.clone();
308 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
309 let results: Vec<([u8; 32], Result<Option<PreparedChunk>>)> =
310 futures::stream::iter(chunks_with_addr)
311 .map(|(content, address)| {
312 let limiter = quote_limiter.clone();
313 async move {
314 let result = observe_op(
315 &limiter,
316 || async move { self.prepare_chunk_payment(content).await },
317 classify_error,
318 )
319 .await;
320 (address, result)
321 }
322 })
323 .buffer_unordered(quote_concurrency)
324 .collect()
325 .await;
326
327 let mut prepared_chunks = Vec::with_capacity(results.len());
328 let mut already_stored_addresses = Vec::new();
329 for (address, result) in results {
330 match result? {
331 Some(prepared) => prepared_chunks.push(prepared),
332 None => already_stored_addresses.push(address),
333 }
334 }
335
336 if let Some(addr) = data_map_address {
337 if already_stored_addresses.contains(&addr) {
338 info!(
339 "Public upload: DataMap chunk {} was already stored \
340 on the network — address is retrievable without a \
341 new payment",
342 hex::encode(addr)
343 );
344 }
345 }
346
347 let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
348
349 info!(
350 "Data prepared for external signing: {} chunks, {} already stored, total {} atto ({content_len} bytes)",
351 prepared_chunks.len(),
352 already_stored_addresses.len(),
353 payment_intent.total_amount,
354 );
355
356 Ok(PreparedUpload {
357 data_map,
358 payment_info: ExternalPaymentInfo::WaveBatch {
359 prepared_chunks,
360 payment_intent,
361 },
362 data_map_address,
363 already_stored_addresses,
364 total_chunks: chunk_count,
365 })
366 }
367
368 pub async fn data_map_store(&self, data_map: &DataMap) -> Result<[u8; 32]> {
378 let serialized = rmp_serde::to_vec(data_map)
379 .map_err(|e| Error::Serialization(format!("Failed to serialize DataMap: {e}")))?;
380
381 info!(
382 "Storing DataMap as public chunk ({} bytes serialized)",
383 serialized.len()
384 );
385
386 self.chunk_put(Bytes::from(serialized)).await
387 }
388
389 pub async fn data_map_fetch(&self, address: &[u8; 32]) -> Result<DataMap> {
397 let chunk = self.chunk_get(address).await?.ok_or_else(|| {
398 Error::InvalidData(format!(
399 "DataMap chunk not found at {}",
400 hex::encode(address)
401 ))
402 })?;
403
404 rmp_serde::from_slice(&chunk.content)
405 .map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}")))
406 }
407
408 pub async fn data_download(&self, data_map: &DataMap) -> Result<Bytes> {
419 let chunk_infos = data_map.infos();
420 debug!("Downloading data ({} chunks)", chunk_infos.len());
421
422 let addresses: Vec<[u8; 32]> = chunk_infos.iter().map(|info| info.dst_hash.0).collect();
425
426 let fetch_limiter = self.controller().fetch.clone();
432 let encrypted_chunks: Vec<EncryptedChunk> = rebucketed_ordered(
433 &fetch_limiter,
434 addresses.into_iter().enumerate(),
435 |(idx, address)| {
436 async move {
437 let chunk = self.chunk_get_observed(&address).await?.ok_or_else(|| {
442 Error::InvalidData(format!(
443 "Missing chunk {} required for data reconstruction",
444 hex::encode(address)
445 ))
446 })?;
447 Ok::<_, Error>((
448 idx,
449 EncryptedChunk {
450 content: chunk.content,
451 },
452 ))
453 }
454 },
455 )
456 .await?;
457
458 debug!(
459 "All {} chunks retrieved, decrypting",
460 encrypted_chunks.len()
461 );
462
463 let content = decrypt(data_map, &encrypted_chunks)
464 .map_err(|e| Error::Encryption(format!("Failed to decrypt data: {e}")))?;
465
466 info!("Data downloaded and decrypted ({} bytes)", content.len());
467
468 Ok(content)
469 }
470}
471
/// Compile-time checks that the futures returned by the public data methods
/// are `Send`.
///
/// None of these functions is ever called; they only have to type-check.
#[cfg(test)]
mod send_assertions {
    use super::*;

    /// Accepts only references to `Send` values; does nothing at runtime.
    fn _require_send<T>(_: &T)
    where
        T: Send,
    {
    }

    // The `todo!()` placeholder makes the rest of the body unreachable and the
    // binding diverging, hence the extra lints silenced here.
    #[allow(
        dead_code,
        unreachable_code,
        unused_variables,
        clippy::diverging_sub_expression
    )]
    async fn _data_download_is_send(client: &Client) {
        let dm: DataMap = todo!();
        let pending = client.data_download(&dm);
        _require_send(&pending);
    }

    // Never called: exists only so the compiler checks the `Send` bound.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_is_send(client: &Client) {
        let pending = client.data_upload(Bytes::new());
        _require_send(&pending);
    }

    // Never called: exists only so the compiler checks the `Send` bound.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_upload_with_mode_is_send(client: &Client) {
        let pending = client.data_upload_with_mode(Bytes::new(), PaymentMode::Auto);
        _require_send(&pending);
    }

    // Never called: exists only so the compiler checks the `Send` bound.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_prepare_upload_is_send(client: &Client) {
        let pending = client.data_prepare_upload(Bytes::new());
        _require_send(&pending);
    }

    // Never called: exists only so the compiler checks the `Send` bound.
    #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
    async fn _data_prepare_upload_with_visibility_is_send(client: &Client) {
        let pending =
            client.data_prepare_upload_with_visibility(Bytes::new(), Visibility::Public);
        _require_send(&pending);
    }
}