1use super::get_progress_bar;
10use crate::utils::duration_to_minute_seconds_string;
11use crate::ChunkManager;
12use bytes::Bytes;
13use color_eyre::{eyre::eyre, Report, Result};
14use futures::StreamExt;
15use rand::prelude::SliceRandom;
16use rand::thread_rng;
17use sn_client::{
18 transfers::{TransferError, WalletError},
19 Client, Error as ClientError, UploadCfg, UploadEvent, UploadSummary, Uploader,
20};
21use sn_protocol::storage::{Chunk, ChunkAddress};
22use std::{
23 ffi::OsString,
24 path::{Path, PathBuf},
25 time::{Duration, Instant},
26};
27use tokio::{sync::mpsc::Receiver, task::JoinHandle};
28use tracing::{debug, error, info, warn};
29use walkdir::{DirEntry, WalkDir};
30use xor_name::XorName;
31
32pub struct FilesUploadSummary {
34 pub upload_summary: UploadSummary,
36 pub completed_files: Vec<(PathBuf, OsString, ChunkAddress)>,
38 pub incomplete_files: Vec<(PathBuf, OsString, ChunkAddress)>,
40}
41
42pub trait FilesUploadStatusNotifier: Send {
44 fn collect_entries(&mut self, entries_iter: Vec<DirEntry>);
45 fn collect_paths(&mut self, path: &Path);
46 fn on_verifying_uploaded_chunks_init(&self, chunks_len: usize);
47 fn on_verifying_uploaded_chunks_success(
48 &self,
49 completed_files: &[(PathBuf, OsString, ChunkAddress)],
50 make_data_public: bool,
51 );
52 fn on_verifying_uploaded_chunks_failure(&self, failed_chunks_len: usize);
53 fn on_failed_to_upload_all_files(
54 &self,
55 incomplete_files: Vec<(&PathBuf, &OsString, &ChunkAddress)>,
56 completed_files: &[(PathBuf, OsString, ChunkAddress)],
57 make_data_public: bool,
58 );
59 fn on_chunking_complete(
60 &self,
61 upload_cfg: &UploadCfg,
62 make_data_public: bool,
63 chunks_to_upload_len: usize,
64 );
65 fn on_upload_complete(
66 &self,
67 upload_sum: &UploadSummary,
68 elapsed_time: Duration,
69 chunks_to_upload_len: usize,
70 );
71}
72
73pub struct FilesUploader {
75 client: Client,
76 root_dir: PathBuf,
77 entries_to_upload: Vec<DirEntry>,
79 status_notifier: Option<Box<dyn FilesUploadStatusNotifier>>,
81 make_data_public: bool,
83 upload_cfg: UploadCfg,
84}
85
86impl FilesUploader {
87 pub fn new(client: Client, root_dir: PathBuf) -> Self {
88 let status_notifier = Box::new(StdOutPrinter {
89 file_paths_to_print: Default::default(),
90 });
91 Self {
92 client,
93 root_dir,
94 entries_to_upload: Default::default(),
95 status_notifier: Some(status_notifier),
96 make_data_public: false,
97 upload_cfg: Default::default(),
98 }
99 }
100
101 pub fn set_upload_cfg(mut self, cfg: UploadCfg) -> Self {
102 self.upload_cfg = cfg;
103 self
104 }
105
106 pub fn set_make_data_public(mut self, make_data_public: bool) -> Self {
107 self.make_data_public = make_data_public;
108 self
109 }
110
111 pub fn set_status_notifier(
113 mut self,
114 status_notifier: Box<dyn FilesUploadStatusNotifier>,
115 ) -> Self {
116 self.status_notifier = Some(status_notifier);
117 self
118 }
119
120 pub fn insert_entries(mut self, entries_iter: impl IntoIterator<Item = DirEntry>) -> Self {
121 self.entries_to_upload.extend(entries_iter);
122 self
123 }
124
125 pub fn insert_path(mut self, path: &Path) -> Self {
126 if let Some(notifier) = &mut self.status_notifier {
127 notifier.collect_paths(path);
128 }
129 let entries = WalkDir::new(path).into_iter().flatten();
130 self.entries_to_upload.extend(entries);
131 self
132 }
133
134 pub async fn start_upload(mut self) -> Result<FilesUploadSummary> {
135 let mut chunk_manager = ChunkManager::new(&self.root_dir);
136 let chunks_to_upload = self.get_chunks_to_upload(&mut chunk_manager).await?;
137 let chunks_to_upload_len = chunks_to_upload.len();
138
139 if let Some(notifier) = &self.status_notifier {
141 notifier.on_chunking_complete(
142 &self.upload_cfg,
143 self.make_data_public,
144 chunks_to_upload_len,
145 );
146 }
147
148 let now = Instant::now();
149 let mut uploader = Uploader::new(self.client, self.root_dir);
150 uploader.set_upload_cfg(self.upload_cfg);
151 uploader.insert_chunk_paths(chunks_to_upload);
152
153 let events_handle = Self::spawn_upload_events_handler(
154 chunk_manager,
155 self.make_data_public,
156 chunks_to_upload_len,
157 uploader.get_event_receiver(),
158 self.status_notifier.take(),
159 )?;
160
161 let upload_sum = match uploader.start_upload().await {
162 Ok(summary) => summary,
163 Err(ClientError::Wallet(WalletError::Transfer(TransferError::NotEnoughBalance(
164 available,
165 required,
166 )))) => {
167 return Err(eyre!(
168 "Not enough balance in wallet to pay for chunk. \
169 We have {available:?} but need {required:?} to pay for the chunk"
170 ))
171 }
172 Err(err) => return Err(eyre!("Failed to upload chunk batch: {err}")),
173 };
174 let (chunk_manager, status_notifier) = events_handle.await??;
175 self.status_notifier = status_notifier;
176
177 if let Some(notifier) = &self.status_notifier {
179 notifier.on_upload_complete(&upload_sum, now.elapsed(), chunks_to_upload_len);
180 }
181
182 let summary = FilesUploadSummary {
183 upload_summary: upload_sum,
184 completed_files: chunk_manager.completed_files().clone(),
185 incomplete_files: chunk_manager
186 .incomplete_files()
187 .into_iter()
188 .map(|(path, file_name, head_address)| {
189 (path.clone(), file_name.clone(), *head_address)
190 })
191 .collect(),
192 };
193 Ok(summary)
194 }
195
196 async fn get_chunks_to_upload(
199 &self,
200 chunk_manager: &mut ChunkManager,
201 ) -> Result<Vec<(XorName, PathBuf)>> {
202 chunk_manager.chunk_with_iter(
204 self.entries_to_upload.iter().cloned(),
205 true,
206 self.make_data_public,
207 )?;
208 let mut chunks_to_upload = if !chunk_manager.is_chunks_empty() {
210 chunk_manager.get_chunks()
211 } else {
212 let chunks = chunk_manager.already_put_chunks(
214 self.entries_to_upload.iter().cloned(),
215 self.make_data_public,
216 )?;
217
218 if let Some(notifier) = &self.status_notifier {
220 notifier.on_verifying_uploaded_chunks_init(chunks.len());
221 }
222
223 let failed_chunks = self.verify_uploaded_chunks(&chunks).await?;
224
225 chunk_manager.mark_completed(
226 chunks
227 .into_iter()
228 .filter(|c| !failed_chunks.contains(c))
229 .map(|(xor, _)| xor),
230 )?;
231
232 if failed_chunks.is_empty() {
233 if let Some(notifier) = &self.status_notifier {
235 notifier.on_verifying_uploaded_chunks_success(
236 chunk_manager.completed_files(),
237 self.make_data_public,
238 );
239 }
240
241 return Ok(vec![]);
242 }
243 if let Some(notifier) = &self.status_notifier {
245 notifier.on_verifying_uploaded_chunks_failure(failed_chunks.len());
246 }
247 failed_chunks
248 };
249 let mut rng = thread_rng();
251 chunks_to_upload.shuffle(&mut rng);
252
253 Ok(chunks_to_upload)
254 }
255
256 async fn verify_uploaded_chunks(
257 &self,
258 chunks_paths: &[(XorName, PathBuf)],
259 ) -> Result<Vec<(XorName, PathBuf)>> {
260 let mut stream = futures::stream::iter(chunks_paths)
261 .map(|(xorname, path)| async move {
262 let chunk = Chunk::new(Bytes::from(std::fs::read(path)?));
263 let res = self.client.verify_chunk_stored(&chunk).await;
264 Ok::<_, Report>((xorname, path.clone(), res.is_err()))
265 })
266 .buffer_unordered(self.upload_cfg.batch_size);
267 let mut failed_chunks = Vec::new();
268
269 while let Some(result) = stream.next().await {
270 let (xorname, path, is_error) = result?;
271 if is_error {
272 warn!("Failed to fetch a chunk {xorname:?}");
273 failed_chunks.push((*xorname, path));
274 }
275 }
276
277 Ok(failed_chunks)
278 }
279
280 #[expect(clippy::type_complexity)]
281 fn spawn_upload_events_handler(
282 mut chunk_manager: ChunkManager,
283 make_data_public: bool,
284 chunks_to_upload_len: usize,
285 mut upload_event_rx: Receiver<UploadEvent>,
286 status_notifier: Option<Box<dyn FilesUploadStatusNotifier>>,
287 ) -> Result<JoinHandle<Result<(ChunkManager, Option<Box<dyn FilesUploadStatusNotifier>>)>>>
288 {
289 let progress_bar = get_progress_bar(chunks_to_upload_len as u64)?;
290 let handle = tokio::spawn(async move {
291 let mut upload_terminated_with_error = false;
292 while let Some(event) = upload_event_rx.recv().await {
295 match event {
296 UploadEvent::ChunkUploaded(addr)
297 | UploadEvent::ChunkAlreadyExistsInNetwork(addr) => {
298 progress_bar.clone().inc(1);
299 if let Err(err) =
300 chunk_manager.mark_completed(std::iter::once(*addr.xorname()))
301 {
302 error!("Failed to mark chunk {addr:?} as completed: {err:?}");
303 }
304 }
305 UploadEvent::Error => {
306 upload_terminated_with_error = true;
307 }
308 UploadEvent::RegisterUploaded { .. }
309 | UploadEvent::RegisterUpdated { .. }
310 | UploadEvent::PaymentMade { .. } => {}
311 }
312 }
313 progress_bar.finish_and_clear();
314
315 if upload_terminated_with_error {
319 error!("Got UploadEvent::Error inside upload event loop");
320 } else {
321 if let Some(notifier) = &status_notifier {
323 notifier.on_failed_to_upload_all_files(
324 chunk_manager.incomplete_files(),
325 chunk_manager.completed_files(),
326 make_data_public,
327 );
328 }
329 }
330
331 Ok::<_, Report>((chunk_manager, status_notifier))
332 });
333
334 Ok(handle)
335 }
336}
337
/// Default `FilesUploadStatusNotifier` that reports upload progress on stdout.
struct StdOutPrinter {
    /// Paths recorded through `collect_paths`, echoed when chunking completes.
    file_paths_to_print: Vec<PathBuf>,
}
342
343impl FilesUploadStatusNotifier for StdOutPrinter {
344 fn collect_entries(&mut self, _entries_iter: Vec<DirEntry>) {}
345
346 fn collect_paths(&mut self, path: &Path) {
347 self.file_paths_to_print.push(path.to_path_buf());
348 }
349
350 fn on_verifying_uploaded_chunks_init(&self, chunks_len: usize) {
351 println!("Files upload attempted previously, verifying {chunks_len} chunks",);
352 }
353
354 fn on_verifying_uploaded_chunks_success(
355 &self,
356 completed_files: &[(PathBuf, OsString, ChunkAddress)],
357 make_data_public: bool,
358 ) {
359 println!("All files were already uploaded and verified");
360 Self::print_uploaded_msg(make_data_public);
361
362 if completed_files.is_empty() {
363 println!("chunk_manager doesn't have any verified_files, nor any failed_chunks to re-upload.");
364 }
365 Self::print_completed_file_list(completed_files);
366 }
367
368 fn on_verifying_uploaded_chunks_failure(&self, failed_chunks_len: usize) {
369 println!("{failed_chunks_len} chunks were uploaded in the past but failed to verify. Will attempt to upload them again...");
370 }
371
372 fn on_failed_to_upload_all_files(
373 &self,
374 incomplete_files: Vec<(&PathBuf, &OsString, &ChunkAddress)>,
375 completed_files: &[(PathBuf, OsString, ChunkAddress)],
376 make_data_public: bool,
377 ) {
378 for (_, file_name, _) in incomplete_files {
379 if let Some(file_name) = file_name.to_str() {
380 println!("Unverified file \"{file_name}\", suggest to re-upload again.");
381 info!("Unverified {file_name}");
382 } else {
383 println!("Unverified file \"{file_name:?}\", suggest to re-upload again.");
384 info!("Unverified file {file_name:?}");
385 }
386 }
387
388 Self::print_uploaded_msg(make_data_public);
390 Self::print_completed_file_list(completed_files);
391 }
392
393 fn on_chunking_complete(
394 &self,
395 upload_cfg: &UploadCfg,
396 make_data_public: bool,
397 chunks_to_upload_len: usize,
398 ) {
399 for path in self.file_paths_to_print.iter() {
400 debug!(
401 "Uploading file(s) from {path:?} batch size {:?} will verify?: {}",
402 upload_cfg.batch_size, upload_cfg.verify_store
403 );
404 if make_data_public {
405 info!("{path:?} will be made public and linkable");
406 println!("{path:?} will be made public and linkable");
407 }
408 }
409 if self.file_paths_to_print.len() == 1 {
410 println!(
411 "Splitting and uploading {:?} into {chunks_to_upload_len} chunks",
412 self.file_paths_to_print[0]
413 );
414 } else {
415 println!(
416 "Splitting and uploading {:?} into {chunks_to_upload_len} chunks",
417 self.file_paths_to_print
418 );
419 }
420 }
421
422 fn on_upload_complete(
423 &self,
424 upload_sum: &UploadSummary,
425 elapsed_time: Duration,
426 chunks_to_upload_len: usize,
427 ) {
428 let elapsed = duration_to_minute_seconds_string(elapsed_time);
429
430 println!(
431 "Among {chunks_to_upload_len} chunks, found {} already existed in network, uploaded \
432 the leftover {} chunks in {elapsed}",
433 upload_sum.skipped_count, upload_sum.uploaded_count,
434 );
435 info!(
436 "Among {chunks_to_upload_len} chunks, found {} already existed in network, uploaded \
437 the leftover {} chunks in {elapsed}",
438 upload_sum.skipped_count, upload_sum.uploaded_count,
439 );
440 println!("**************************************");
441 println!("* Payment Details *");
442 println!("**************************************");
443 println!(
444 "Made payment of {:?} for {} chunks",
445 upload_sum.storage_cost, upload_sum.uploaded_count
446 );
447 println!(
448 "Made payment of {:?} for royalties fees",
449 upload_sum.royalty_fees
450 );
451 println!("New wallet balance: {}", upload_sum.final_balance);
452 }
453}
454
455impl StdOutPrinter {
456 fn print_completed_file_list(completed_files: &[(PathBuf, OsString, ChunkAddress)]) {
457 for (_, file_name, addr) in completed_files {
458 let hex_addr = addr.to_hex();
459 if let Some(file_name) = file_name.to_str() {
460 println!("Uploaded \"{file_name}\" to address {hex_addr}");
461 info!("Uploaded {file_name} to {hex_addr}");
462 } else {
463 println!("Uploaded \"{file_name:?}\" to address {hex_addr}");
464 info!("Uploaded {file_name:?} to {hex_addr}");
465 }
466 }
467 }
468
469 fn print_uploaded_msg(make_data_public: bool) {
470 println!("**************************************");
471 println!("* Uploaded Files *");
472 if !make_data_public {
473 println!("* *");
474 println!("* These are not public by default. *");
475 println!("* Reupload with `-p` option *");
476 println!("* to publish the datamaps. *");
477 }
478 println!("**************************************");
479 }
480}