sn_cli/files/
files_uploader.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use super::get_progress_bar;
10use crate::utils::duration_to_minute_seconds_string;
11use crate::ChunkManager;
12use bytes::Bytes;
13use color_eyre::{eyre::eyre, Report, Result};
14use futures::StreamExt;
15use rand::prelude::SliceRandom;
16use rand::thread_rng;
17use sn_client::{
18    transfers::{TransferError, WalletError},
19    Client, Error as ClientError, UploadCfg, UploadEvent, UploadSummary, Uploader,
20};
21use sn_protocol::storage::{Chunk, ChunkAddress};
22use std::{
23    ffi::OsString,
24    path::{Path, PathBuf},
25    time::{Duration, Instant},
26};
27use tokio::{sync::mpsc::Receiver, task::JoinHandle};
28use tracing::{debug, error, info, warn};
29use walkdir::{DirEntry, WalkDir};
30use xor_name::XorName;
31
32/// The result of a successful files upload.
33pub struct FilesUploadSummary {
34    /// The cost and count summary of the upload.
35    pub upload_summary: UploadSummary,
36    /// The list of completed files (FilePath, FileName, HeadChunkAddress)
37    pub completed_files: Vec<(PathBuf, OsString, ChunkAddress)>,
38    /// The list of incomplete files (FilePath, FileName, HeadChunkAddress)
39    pub incomplete_files: Vec<(PathBuf, OsString, ChunkAddress)>,
40}
41
42/// A trait designed to customize the standard output behavior for file upload processes.
43pub trait FilesUploadStatusNotifier: Send {
44    fn collect_entries(&mut self, entries_iter: Vec<DirEntry>);
45    fn collect_paths(&mut self, path: &Path);
46    fn on_verifying_uploaded_chunks_init(&self, chunks_len: usize);
47    fn on_verifying_uploaded_chunks_success(
48        &self,
49        completed_files: &[(PathBuf, OsString, ChunkAddress)],
50        make_data_public: bool,
51    );
52    fn on_verifying_uploaded_chunks_failure(&self, failed_chunks_len: usize);
53    fn on_failed_to_upload_all_files(
54        &self,
55        incomplete_files: Vec<(&PathBuf, &OsString, &ChunkAddress)>,
56        completed_files: &[(PathBuf, OsString, ChunkAddress)],
57        make_data_public: bool,
58    );
59    fn on_chunking_complete(
60        &self,
61        upload_cfg: &UploadCfg,
62        make_data_public: bool,
63        chunks_to_upload_len: usize,
64    );
65    fn on_upload_complete(
66        &self,
67        upload_sum: &UploadSummary,
68        elapsed_time: Duration,
69        chunks_to_upload_len: usize,
70    );
71}
72
73/// Combines the `Uploader` along with the `ChunkManager`
74pub struct FilesUploader {
75    client: Client,
76    root_dir: PathBuf,
77    /// entries to upload
78    entries_to_upload: Vec<DirEntry>,
79    /// The status notifier that can be overridden to perform custom actions instead of printing things to stdout.
80    status_notifier: Option<Box<dyn FilesUploadStatusNotifier>>,
81    /// config
82    make_data_public: bool,
83    upload_cfg: UploadCfg,
84}
85
86impl FilesUploader {
87    pub fn new(client: Client, root_dir: PathBuf) -> Self {
88        let status_notifier = Box::new(StdOutPrinter {
89            file_paths_to_print: Default::default(),
90        });
91        Self {
92            client,
93            root_dir,
94            entries_to_upload: Default::default(),
95            status_notifier: Some(status_notifier),
96            make_data_public: false,
97            upload_cfg: Default::default(),
98        }
99    }
100
101    pub fn set_upload_cfg(mut self, cfg: UploadCfg) -> Self {
102        self.upload_cfg = cfg;
103        self
104    }
105
106    pub fn set_make_data_public(mut self, make_data_public: bool) -> Self {
107        self.make_data_public = make_data_public;
108        self
109    }
110
111    /// Override the default status notifier. By default we print things to stdout.
112    pub fn set_status_notifier(
113        mut self,
114        status_notifier: Box<dyn FilesUploadStatusNotifier>,
115    ) -> Self {
116        self.status_notifier = Some(status_notifier);
117        self
118    }
119
120    pub fn insert_entries(mut self, entries_iter: impl IntoIterator<Item = DirEntry>) -> Self {
121        self.entries_to_upload.extend(entries_iter);
122        self
123    }
124
125    pub fn insert_path(mut self, path: &Path) -> Self {
126        if let Some(notifier) = &mut self.status_notifier {
127            notifier.collect_paths(path);
128        }
129        let entries = WalkDir::new(path).into_iter().flatten();
130        self.entries_to_upload.extend(entries);
131        self
132    }
133
134    pub async fn start_upload(mut self) -> Result<FilesUploadSummary> {
135        let mut chunk_manager = ChunkManager::new(&self.root_dir);
136        let chunks_to_upload = self.get_chunks_to_upload(&mut chunk_manager).await?;
137        let chunks_to_upload_len = chunks_to_upload.len();
138
139        // Notify on chunking complete
140        if let Some(notifier) = &self.status_notifier {
141            notifier.on_chunking_complete(
142                &self.upload_cfg,
143                self.make_data_public,
144                chunks_to_upload_len,
145            );
146        }
147
148        let now = Instant::now();
149        let mut uploader = Uploader::new(self.client, self.root_dir);
150        uploader.set_upload_cfg(self.upload_cfg);
151        uploader.insert_chunk_paths(chunks_to_upload);
152
153        let events_handle = Self::spawn_upload_events_handler(
154            chunk_manager,
155            self.make_data_public,
156            chunks_to_upload_len,
157            uploader.get_event_receiver(),
158            self.status_notifier.take(),
159        )?;
160
161        let upload_sum = match uploader.start_upload().await {
162            Ok(summary) => summary,
163            Err(ClientError::Wallet(WalletError::Transfer(TransferError::NotEnoughBalance(
164                available,
165                required,
166            )))) => {
167                return Err(eyre!(
168                    "Not enough balance in wallet to pay for chunk. \
169            We have {available:?} but need {required:?} to pay for the chunk"
170                ))
171            }
172            Err(err) => return Err(eyre!("Failed to upload chunk batch: {err}")),
173        };
174        let (chunk_manager, status_notifier) = events_handle.await??;
175        self.status_notifier = status_notifier;
176
177        // Notify on upload complete
178        if let Some(notifier) = &self.status_notifier {
179            notifier.on_upload_complete(&upload_sum, now.elapsed(), chunks_to_upload_len);
180        }
181
182        let summary = FilesUploadSummary {
183            upload_summary: upload_sum,
184            completed_files: chunk_manager.completed_files().clone(),
185            incomplete_files: chunk_manager
186                .incomplete_files()
187                .into_iter()
188                .map(|(path, file_name, head_address)| {
189                    (path.clone(), file_name.clone(), *head_address)
190                })
191                .collect(),
192        };
193        Ok(summary)
194    }
195
196    // This will read from the cache if possible. We only re-verify with the network if the file has been cached but
197    // there are no pending chunks to upload.
198    async fn get_chunks_to_upload(
199        &self,
200        chunk_manager: &mut ChunkManager,
201    ) -> Result<Vec<(XorName, PathBuf)>> {
202        // Initially try reading from the cache
203        chunk_manager.chunk_with_iter(
204            self.entries_to_upload.iter().cloned(),
205            true,
206            self.make_data_public,
207        )?;
208        // We verify if there are no chunks left to upload.
209        let mut chunks_to_upload = if !chunk_manager.is_chunks_empty() {
210            chunk_manager.get_chunks()
211        } else {
212            // re chunk it again to get back all the chunks
213            let chunks = chunk_manager.already_put_chunks(
214                self.entries_to_upload.iter().cloned(),
215                self.make_data_public,
216            )?;
217
218            // Notify on verification init
219            if let Some(notifier) = &self.status_notifier {
220                notifier.on_verifying_uploaded_chunks_init(chunks.len());
221            }
222
223            let failed_chunks = self.verify_uploaded_chunks(&chunks).await?;
224
225            chunk_manager.mark_completed(
226                chunks
227                    .into_iter()
228                    .filter(|c| !failed_chunks.contains(c))
229                    .map(|(xor, _)| xor),
230            )?;
231
232            if failed_chunks.is_empty() {
233                // Notify on verification success
234                if let Some(notifier) = &self.status_notifier {
235                    notifier.on_verifying_uploaded_chunks_success(
236                        chunk_manager.completed_files(),
237                        self.make_data_public,
238                    );
239                }
240
241                return Ok(vec![]);
242            }
243            // Notify on verification failure
244            if let Some(notifier) = &self.status_notifier {
245                notifier.on_verifying_uploaded_chunks_failure(failed_chunks.len());
246            }
247            failed_chunks
248        };
249        // shuffle the chunks
250        let mut rng = thread_rng();
251        chunks_to_upload.shuffle(&mut rng);
252
253        Ok(chunks_to_upload)
254    }
255
256    async fn verify_uploaded_chunks(
257        &self,
258        chunks_paths: &[(XorName, PathBuf)],
259    ) -> Result<Vec<(XorName, PathBuf)>> {
260        let mut stream = futures::stream::iter(chunks_paths)
261            .map(|(xorname, path)| async move {
262                let chunk = Chunk::new(Bytes::from(std::fs::read(path)?));
263                let res = self.client.verify_chunk_stored(&chunk).await;
264                Ok::<_, Report>((xorname, path.clone(), res.is_err()))
265            })
266            .buffer_unordered(self.upload_cfg.batch_size);
267        let mut failed_chunks = Vec::new();
268
269        while let Some(result) = stream.next().await {
270            let (xorname, path, is_error) = result?;
271            if is_error {
272                warn!("Failed to fetch a chunk {xorname:?}");
273                failed_chunks.push((*xorname, path));
274            }
275        }
276
277        Ok(failed_chunks)
278    }
279
280    #[expect(clippy::type_complexity)]
281    fn spawn_upload_events_handler(
282        mut chunk_manager: ChunkManager,
283        make_data_public: bool,
284        chunks_to_upload_len: usize,
285        mut upload_event_rx: Receiver<UploadEvent>,
286        status_notifier: Option<Box<dyn FilesUploadStatusNotifier>>,
287    ) -> Result<JoinHandle<Result<(ChunkManager, Option<Box<dyn FilesUploadStatusNotifier>>)>>>
288    {
289        let progress_bar = get_progress_bar(chunks_to_upload_len as u64)?;
290        let handle = tokio::spawn(async move {
291            let mut upload_terminated_with_error = false;
292            // The loop is guaranteed to end, as the channel will be
293            // closed when the upload completes or errors out.
294            while let Some(event) = upload_event_rx.recv().await {
295                match event {
296                    UploadEvent::ChunkUploaded(addr)
297                    | UploadEvent::ChunkAlreadyExistsInNetwork(addr) => {
298                        progress_bar.clone().inc(1);
299                        if let Err(err) =
300                            chunk_manager.mark_completed(std::iter::once(*addr.xorname()))
301                        {
302                            error!("Failed to mark chunk {addr:?} as completed: {err:?}");
303                        }
304                    }
305                    UploadEvent::Error => {
306                        upload_terminated_with_error = true;
307                    }
308                    UploadEvent::RegisterUploaded { .. }
309                    | UploadEvent::RegisterUpdated { .. }
310                    | UploadEvent::PaymentMade { .. } => {}
311                }
312            }
313            progress_bar.finish_and_clear();
314
315            // this check is to make sure that we don't partially write to the uploaded_files file if the upload process
316            // terminates with an error. This race condition can happen as we bail on `upload_result` before we await the
317            // handler.
318            if upload_terminated_with_error {
319                error!("Got UploadEvent::Error inside upload event loop");
320            } else {
321                // Notify on upload failure
322                if let Some(notifier) = &status_notifier {
323                    notifier.on_failed_to_upload_all_files(
324                        chunk_manager.incomplete_files(),
325                        chunk_manager.completed_files(),
326                        make_data_public,
327                    );
328                }
329            }
330
331            Ok::<_, Report>((chunk_manager, status_notifier))
332        });
333
334        Ok(handle)
335    }
336}
337
/// The default [`FilesUploadStatusNotifier`]: prints upload progress and results to stdout
/// (and mirrors some of it to the `tracing` log).
struct StdOutPrinter {
    /// Root paths given to `FilesUploader::insert_path`, named when chunking completes.
    file_paths_to_print: Vec<PathBuf>,
}
342
343impl FilesUploadStatusNotifier for StdOutPrinter {
344    fn collect_entries(&mut self, _entries_iter: Vec<DirEntry>) {}
345
346    fn collect_paths(&mut self, path: &Path) {
347        self.file_paths_to_print.push(path.to_path_buf());
348    }
349
350    fn on_verifying_uploaded_chunks_init(&self, chunks_len: usize) {
351        println!("Files upload attempted previously, verifying {chunks_len} chunks",);
352    }
353
354    fn on_verifying_uploaded_chunks_success(
355        &self,
356        completed_files: &[(PathBuf, OsString, ChunkAddress)],
357        make_data_public: bool,
358    ) {
359        println!("All files were already uploaded and verified");
360        Self::print_uploaded_msg(make_data_public);
361
362        if completed_files.is_empty() {
363            println!("chunk_manager doesn't have any verified_files, nor any failed_chunks to re-upload.");
364        }
365        Self::print_completed_file_list(completed_files);
366    }
367
368    fn on_verifying_uploaded_chunks_failure(&self, failed_chunks_len: usize) {
369        println!("{failed_chunks_len} chunks were uploaded in the past but failed to verify. Will attempt to upload them again...");
370    }
371
372    fn on_failed_to_upload_all_files(
373        &self,
374        incomplete_files: Vec<(&PathBuf, &OsString, &ChunkAddress)>,
375        completed_files: &[(PathBuf, OsString, ChunkAddress)],
376        make_data_public: bool,
377    ) {
378        for (_, file_name, _) in incomplete_files {
379            if let Some(file_name) = file_name.to_str() {
380                println!("Unverified file \"{file_name}\", suggest to re-upload again.");
381                info!("Unverified {file_name}");
382            } else {
383                println!("Unverified file \"{file_name:?}\", suggest to re-upload again.");
384                info!("Unverified file {file_name:?}");
385            }
386        }
387
388        // log uploaded file information
389        Self::print_uploaded_msg(make_data_public);
390        Self::print_completed_file_list(completed_files);
391    }
392
393    fn on_chunking_complete(
394        &self,
395        upload_cfg: &UploadCfg,
396        make_data_public: bool,
397        chunks_to_upload_len: usize,
398    ) {
399        for path in self.file_paths_to_print.iter() {
400            debug!(
401                "Uploading file(s) from {path:?} batch size {:?} will verify?: {}",
402                upload_cfg.batch_size, upload_cfg.verify_store
403            );
404            if make_data_public {
405                info!("{path:?} will be made public and linkable");
406                println!("{path:?} will be made public and linkable");
407            }
408        }
409        if self.file_paths_to_print.len() == 1 {
410            println!(
411                "Splitting and uploading {:?} into {chunks_to_upload_len} chunks",
412                self.file_paths_to_print[0]
413            );
414        } else {
415            println!(
416                "Splitting and uploading {:?} into {chunks_to_upload_len} chunks",
417                self.file_paths_to_print
418            );
419        }
420    }
421
422    fn on_upload_complete(
423        &self,
424        upload_sum: &UploadSummary,
425        elapsed_time: Duration,
426        chunks_to_upload_len: usize,
427    ) {
428        let elapsed = duration_to_minute_seconds_string(elapsed_time);
429
430        println!(
431            "Among {chunks_to_upload_len} chunks, found {} already existed in network, uploaded \
432            the leftover {} chunks in {elapsed}",
433            upload_sum.skipped_count, upload_sum.uploaded_count,
434        );
435        info!(
436            "Among {chunks_to_upload_len} chunks, found {} already existed in network, uploaded \
437            the leftover {} chunks in {elapsed}",
438            upload_sum.skipped_count, upload_sum.uploaded_count,
439        );
440        println!("**************************************");
441        println!("*          Payment Details           *");
442        println!("**************************************");
443        println!(
444            "Made payment of {:?} for {} chunks",
445            upload_sum.storage_cost, upload_sum.uploaded_count
446        );
447        println!(
448            "Made payment of {:?} for royalties fees",
449            upload_sum.royalty_fees
450        );
451        println!("New wallet balance: {}", upload_sum.final_balance);
452    }
453}
454
455impl StdOutPrinter {
456    fn print_completed_file_list(completed_files: &[(PathBuf, OsString, ChunkAddress)]) {
457        for (_, file_name, addr) in completed_files {
458            let hex_addr = addr.to_hex();
459            if let Some(file_name) = file_name.to_str() {
460                println!("Uploaded \"{file_name}\" to address {hex_addr}");
461                info!("Uploaded {file_name} to {hex_addr}");
462            } else {
463                println!("Uploaded \"{file_name:?}\" to address {hex_addr}");
464                info!("Uploaded {file_name:?} to {hex_addr}");
465            }
466        }
467    }
468
469    fn print_uploaded_msg(make_data_public: bool) {
470        println!("**************************************");
471        println!("*          Uploaded Files            *");
472        if !make_data_public {
473            println!("*                                    *");
474            println!("*  These are not public by default.  *");
475            println!("*     Reupload with `-p` option      *");
476            println!("*      to publish the datamaps.      *");
477        }
478        println!("**************************************");
479    }
480}