1use crate::data::client::adaptive::{observe_op, rebucketed_ordered};
10use crate::data::client::batch::{PaymentIntent, PreparedChunk};
11use crate::data::client::classify_error;
12use crate::data::client::file::{ExternalPaymentInfo, PreparedUpload, Visibility};
13use crate::data::client::merkle::{chunk_contents_for_upload_addresses, PaymentMode};
14use crate::data::client::Client;
15use crate::data::error::{Error, Result};
16use ant_protocol::{compute_address, DATA_TYPE_CHUNK};
17use bytes::Bytes;
18use futures::stream::StreamExt;
19use self_encryption::{decrypt, encrypt, DataMap, EncryptedChunk};
20use std::num::NonZeroUsize;
21use tracing::{debug, info};
22
23#[derive(Debug, Clone)]
25pub struct DataUploadResult {
26 pub data_map: DataMap,
28 pub chunks_stored: usize,
30 pub payment_mode_used: PaymentMode,
32}
33
34impl Client {
35 pub async fn data_upload(&self, content: Bytes) -> Result<DataUploadResult> {
45 let content_len = content.len();
46 debug!("Encrypting data ({content_len} bytes)");
47
48 let (data_map, encrypted_chunks) = encrypt(content)
49 .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
50
51 info!("Data encrypted into {} chunks", encrypted_chunks.len());
52
53 let chunk_contents: Vec<Bytes> = encrypted_chunks
54 .into_iter()
55 .map(|chunk| chunk.content)
56 .collect();
57
58 let (addresses, _storage_cost, _gas_cost) =
59 self.batch_upload_chunks(chunk_contents).await?;
60 let chunks_stored = addresses.len();
61
62 info!("Data uploaded: {chunks_stored} chunks stored ({content_len} bytes original)");
63
64 Ok(DataUploadResult {
65 data_map,
66 chunks_stored,
67 payment_mode_used: PaymentMode::Single,
68 })
69 }
70
71 pub async fn data_upload_with_mode(
81 &self,
82 content: Bytes,
83 mode: PaymentMode,
84 ) -> Result<DataUploadResult> {
85 let content_len = content.len();
86 debug!("Encrypting data ({content_len} bytes) with mode {mode:?}");
87
88 let (data_map, encrypted_chunks) = encrypt(content)
89 .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
90
91 let chunk_count = encrypted_chunks.len();
92 info!("Data encrypted into {chunk_count} chunks");
93
94 let chunk_contents: Vec<Bytes> = encrypted_chunks
95 .into_iter()
96 .map(|chunk| chunk.content)
97 .collect();
98
99 if self.should_use_merkle(chunk_count, mode) {
100 info!("Using merkle batch payment for {chunk_count} chunks");
102
103 let chunk_entries: Vec<([u8; 32], u64)> = chunk_contents
104 .iter()
105 .map(|chunk| {
106 let size = u64::try_from(chunk.len())
107 .map_err(|e| Error::InvalidData(format!("chunk size too large: {e}")))?;
108 Ok((compute_address(chunk), size))
109 })
110 .collect::<Result<Vec<_>>>()?;
111 let merkle_plan = match self
112 .plan_merkle_upload(chunk_entries, DATA_TYPE_CHUNK, None)
113 .await
114 {
115 Ok(plan) => plan,
116 Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
117 info!("Merkle preflight needs more peers ({msg}), falling back to wave-batch");
118 let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
119 return Ok(DataUploadResult {
120 data_map,
121 chunks_stored: addresses.len(),
122 payment_mode_used: PaymentMode::Single,
123 });
124 }
125 Err(e) => return Err(e),
126 };
127
128 if merkle_plan.to_upload.is_empty() {
129 info!("All {chunk_count} chunks already stored; skipping merkle payment");
130 return Ok(DataUploadResult {
131 data_map,
132 chunks_stored: chunk_count,
133 payment_mode_used: PaymentMode::Merkle,
134 });
135 }
136
137 let chunk_contents =
138 chunk_contents_for_upload_addresses(chunk_contents, &merkle_plan.to_upload)?;
139
140 let remaining_chunks = merkle_plan.to_upload.len();
141 if !self.should_use_merkle(remaining_chunks, mode) {
142 info!(
143 "{remaining_chunks} chunks need upload after merkle preflight; \
144 using single-node payment"
145 );
146 let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
147 return Ok(DataUploadResult {
148 data_map,
149 chunks_stored: merkle_plan.already_stored.len() + addresses.len(),
150 payment_mode_used: PaymentMode::Single,
151 });
152 }
153
154 let batch_result = match self
156 .pay_for_merkle_batch(
157 &merkle_plan.to_upload,
158 DATA_TYPE_CHUNK,
159 merkle_plan.to_upload_avg_size(),
160 )
161 .await
162 {
163 Ok(result) => result,
164 Err(Error::InsufficientPeers(ref msg)) if mode == PaymentMode::Auto => {
165 info!("Merkle needs more peers ({msg}), falling back to wave-batch");
166 let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
167 return Ok(DataUploadResult {
168 data_map,
169 chunks_stored: merkle_plan.already_stored.len() + addresses.len(),
170 payment_mode_used: PaymentMode::Single,
171 });
172 }
173 Err(e) => return Err(e),
174 };
175
176 let outcome = self
177 .merkle_upload_chunks(
178 chunk_contents,
179 merkle_plan.to_upload,
180 &batch_result,
181 None,
182 merkle_plan.already_stored.len(),
183 chunk_count,
184 )
185 .await?;
186 if outcome.failed > 0 {
192 return Err(Error::InsufficientPeers(format!(
193 "Data merkle upload incomplete: {} of {} chunk(s) short of quorum after retries",
194 outcome.failed, chunk_count
195 )));
196 }
197
198 info!(
199 "Data uploaded via merkle: {} chunks stored ({content_len} bytes)",
200 outcome.stored
201 );
202 Ok(DataUploadResult {
203 data_map,
204 chunks_stored: outcome.stored,
205 payment_mode_used: PaymentMode::Merkle,
206 })
207 } else {
208 let (addresses, _sc, _gc) = self.batch_upload_chunks(chunk_contents).await?;
210
211 info!(
212 "Data uploaded: {} chunks stored ({content_len} bytes original)",
213 addresses.len()
214 );
215 Ok(DataUploadResult {
216 data_map,
217 chunks_stored: addresses.len(),
218 payment_mode_used: PaymentMode::Single,
219 })
220 }
221 }
222
223 pub async fn data_prepare_upload(&self, content: Bytes) -> Result<PreparedUpload> {
228 self.data_prepare_upload_with_visibility(content, Visibility::Private)
229 .await
230 }
231
232 pub async fn data_prepare_upload_with_visibility(
259 &self,
260 content: Bytes,
261 visibility: Visibility,
262 ) -> Result<PreparedUpload> {
263 let content_len = content.len();
264 debug!("Preparing data upload for external signing (visibility={visibility:?}, {content_len} bytes)");
265
266 let (data_map, encrypted_chunks) = encrypt(content)
267 .map_err(|e| Error::Encryption(format!("Failed to encrypt data: {e}")))?;
268
269 let mut chunk_contents: Vec<Bytes> = encrypted_chunks
270 .into_iter()
271 .map(|chunk| chunk.content)
272 .collect();
273
274 info!("Data encrypted into {} chunks", chunk_contents.len());
275
276 let data_map_address = match visibility {
282 Visibility::Private => None,
283 Visibility::Public => {
284 let serialized = rmp_serde::to_vec(&data_map).map_err(|e| {
285 Error::Serialization(format!("Failed to serialize DataMap: {e}"))
286 })?;
287 let bytes = Bytes::from(serialized);
288 let address = compute_address(&bytes);
289 info!(
290 "Public upload: bundling DataMap chunk ({} bytes) at address {}",
291 bytes.len(),
292 hex::encode(address)
293 );
294 chunk_contents.push(bytes);
295 Some(address)
296 }
297 };
298
299 let chunk_count = chunk_contents.len();
300 let chunks_with_addr: Vec<(Bytes, [u8; 32])> = chunk_contents
301 .into_iter()
302 .map(|content| {
303 let address = compute_address(&content);
304 (content, address)
305 })
306 .collect();
307
308 let quote_limiter = self.controller().quote.clone();
309 let quote_concurrency = quote_limiter.current().min(chunk_count.max(1));
310 let results: Vec<([u8; 32], Result<Option<PreparedChunk>>)> =
311 futures::stream::iter(chunks_with_addr)
312 .map(|(content, address)| {
313 let limiter = quote_limiter.clone();
314 async move {
315 let result = observe_op(
316 &limiter,
317 || async move { self.prepare_chunk_payment(content).await },
318 classify_error,
319 )
320 .await;
321 (address, result)
322 }
323 })
324 .buffer_unordered(quote_concurrency)
325 .collect()
326 .await;
327
328 let mut prepared_chunks = Vec::with_capacity(results.len());
329 let mut already_stored_addresses = Vec::new();
330 for (address, result) in results {
331 match result? {
332 Some(prepared) => prepared_chunks.push(prepared),
333 None => already_stored_addresses.push(address),
334 }
335 }
336
337 if let Some(addr) = data_map_address {
338 if already_stored_addresses.contains(&addr) {
339 info!(
340 "Public upload: DataMap chunk {} was already stored \
341 on the network — address is retrievable without a \
342 new payment",
343 hex::encode(addr)
344 );
345 }
346 }
347
348 let payment_intent = PaymentIntent::from_prepared_chunks(&prepared_chunks);
349
350 info!(
351 "Data prepared for external signing: {} chunks, {} already stored, total {} atto ({content_len} bytes)",
352 prepared_chunks.len(),
353 already_stored_addresses.len(),
354 payment_intent.total_amount,
355 );
356
357 Ok(PreparedUpload {
358 data_map,
359 payment_info: ExternalPaymentInfo::WaveBatch {
360 prepared_chunks,
361 payment_intent,
362 },
363 data_map_address,
364 already_stored_addresses,
365 total_chunks: chunk_count,
366 })
367 }
368
369 pub async fn data_map_store(&self, data_map: &DataMap) -> Result<[u8; 32]> {
379 let serialized = rmp_serde::to_vec(data_map)
380 .map_err(|e| Error::Serialization(format!("Failed to serialize DataMap: {e}")))?;
381
382 info!(
383 "Storing DataMap as public chunk ({} bytes serialized)",
384 serialized.len()
385 );
386
387 self.chunk_put(Bytes::from(serialized)).await
388 }
389
390 pub async fn data_map_fetch(&self, address: &[u8; 32]) -> Result<DataMap> {
398 let chunk = self.chunk_get(address).await?.ok_or_else(|| {
399 Error::InvalidData(format!(
400 "DataMap chunk not found at {}",
401 hex::encode(address)
402 ))
403 })?;
404
405 decode_data_map_chunk(&chunk.content)
406 }
407
408 pub async fn data_map_fetch_from_closest_peers(
415 &self,
416 address: &[u8; 32],
417 peer_count: NonZeroUsize,
418 ) -> Result<DataMap> {
419 let chunk = self
420 .chunk_get_from_closest_peers(address, peer_count.get())
421 .await?
422 .ok_or_else(|| {
423 Error::InvalidData(format!(
424 "DataMap chunk not found at {}",
425 hex::encode(address)
426 ))
427 })?;
428
429 decode_data_map_chunk(&chunk.content)
430 }
431
432 pub async fn data_download(&self, data_map: &DataMap) -> Result<Bytes> {
443 let chunk_infos = data_map.infos();
444 debug!("Downloading data ({} chunks)", chunk_infos.len());
445
446 let addresses: Vec<[u8; 32]> = chunk_infos.iter().map(|info| info.dst_hash.0).collect();
449
450 let fetch_limiter = self.controller().fetch.clone();
456 let encrypted_chunks: Vec<EncryptedChunk> = rebucketed_ordered(
457 &fetch_limiter,
458 addresses.into_iter().enumerate(),
459 |(idx, address)| {
460 async move {
461 let chunk = self.chunk_get_observed(&address).await?.ok_or_else(|| {
466 Error::InvalidData(format!(
467 "Missing chunk {} required for data reconstruction",
468 hex::encode(address)
469 ))
470 })?;
471 Ok::<_, Error>((
472 idx,
473 EncryptedChunk {
474 content: chunk.content,
475 },
476 ))
477 }
478 },
479 )
480 .await?;
481
482 debug!(
483 "All {} chunks retrieved, decrypting",
484 encrypted_chunks.len()
485 );
486
487 let content = decrypt(data_map, &encrypted_chunks)
488 .map_err(|e| Error::Encryption(format!("Failed to decrypt data: {e}")))?;
489
490 info!("Data downloaded and decrypted ({} bytes)", content.len());
491
492 Ok(content)
493 }
494}
495
496fn decode_data_map_chunk(content: &[u8]) -> Result<DataMap> {
497 rmp_serde::from_slice(content)
498 .map_err(|e| Error::Serialization(format!("Failed to deserialize DataMap: {e}")))
499}
500
501#[cfg(test)]
509mod send_assertions {
510 use super::*;
511
512 fn _assert_send<T: Send>(_: &T) {}
513
514 #[allow(
515 dead_code,
516 unreachable_code,
517 unused_variables,
518 clippy::diverging_sub_expression
519 )]
520 async fn _data_download_is_send(client: &Client) {
521 let dm: DataMap = todo!();
522 let fut = client.data_download(&dm);
523 _assert_send(&fut);
524 }
525
526 #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
527 async fn _data_upload_is_send(client: &Client) {
528 let fut = client.data_upload(Bytes::new());
529 _assert_send(&fut);
530 }
531
532 #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
533 async fn _data_upload_with_mode_is_send(client: &Client) {
534 let fut = client.data_upload_with_mode(Bytes::new(), PaymentMode::Auto);
535 _assert_send(&fut);
536 }
537
538 #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
539 async fn _data_prepare_upload_is_send(client: &Client) {
540 let fut = client.data_prepare_upload(Bytes::new());
541 _assert_send(&fut);
542 }
543
544 #[allow(dead_code, unreachable_code, clippy::diverging_sub_expression)]
545 async fn _data_prepare_upload_with_visibility_is_send(client: &Client) {
546 let fut = client.data_prepare_upload_with_visibility(Bytes::new(), Visibility::Public);
547 _assert_send(&fut);
548 }
549}