// sn_cli/files/chunk_manager.rs

1// Copyright 2024 MaidSafe.net limited.
2//
3// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
4// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
5// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
6// KIND, either express or implied. Please review the Licences for the specific language governing
7// permissions and limitations relating to use of the SAFE Network Software.
8
9use super::get_progress_bar;
10use super::upload::UploadedFile;
11use bytes::Bytes;
12use color_eyre::{
13    eyre::{bail, eyre},
14    Result,
15};
16use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
17use sn_client::{
18    protocol::storage::{Chunk, ChunkAddress},
19    FilesApi,
20};
21use std::{
22    collections::{BTreeMap, BTreeSet},
23    ffi::OsString,
24    fs::{self, File},
25    io::Write,
26    path::{Path, PathBuf},
27    time::Instant,
28};
29use tracing::{debug, error, info, trace};
30use walkdir::{DirEntry, WalkDir};
31use xor_name::XorName;
32
/// Sub-directory (under the client root dir) where the self-encrypted chunk
/// artifacts are written while an upload is in progress.
const CHUNK_ARTIFACTS_DIR: &str = "chunk_artifacts";
/// Name of the per-file metadata file stored alongside that file's chunks.
const METADATA_FILE: &str = "metadata";
35
36// The unique hex encoded hash(path)
37// This allows us to uniquely identify if a file has been chunked or not.
38// An alternative to use instead of filename as it might not be unique
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
// Newtype over the hex-encoded hash of a file path; see `PathXorName::new`.
struct PathXorName(String);
41
42impl PathXorName {
43    fn new(path: &Path) -> PathXorName {
44        // we just need an unique value per path, thus we don't have to mind between the
45        // [u8]/[u16] differences
46        let path_as_lossy_str = path.as_os_str().to_string_lossy();
47        let path_xor = XorName::from_content(path_as_lossy_str.as_bytes());
48        PathXorName(hex::encode(path_xor))
49    }
50}
51
52/// Info about a file that has been chunked
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub struct ChunkedFile {
    /// Path of the original file on disk.
    pub file_path: PathBuf,
    /// File name component of the original file.
    pub file_name: OsString,
    /// Address of the head chunk, used to identify/fetch the file.
    pub head_chunk_address: ChunkAddress,
    /// (chunk XorName, path of the chunk artifact on disk) pairs that are not
    /// yet marked as completed.
    pub chunks: BTreeSet<(XorName, PathBuf)>,
    /// The file's data map chunk. May be an empty chunk when no data map was
    /// included/read (see `read_file_chunks_dir`, which defaults it to empty).
    pub data_map: Chunk,
}
61
62/// Manages the chunking process by resuming pre-chunked files and chunking any
63/// file that has not been chunked yet.
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub struct ChunkManager {
    /// Whole client root dir
    root_dir: PathBuf,
    /// Dir for chunk artifacts
    artifacts_dir: PathBuf,
    /// Files pending chunking: (file name, hex(hash(path)), full path).
    files_to_chunk: Vec<(OsString, PathXorName, PathBuf)>,
    /// Chunked files with still-pending chunks, keyed by their path hash.
    chunks: BTreeMap<PathXorName, ChunkedFile>,
    /// Files whose chunks have all been marked as completed.
    completed_files: Vec<(PathBuf, OsString, ChunkAddress)>,
    /// Number of chunks resumed from the artifacts dir on the last run.
    resumed_chunk_count: usize,
    /// Number of files resumed from the artifacts dir on the last run.
    resumed_files_count: usize,
}
76
77impl ChunkManager {
78    // Provide the root_dir. The function creates a sub-directory to store the SE chunks
79    pub fn new(root_dir: &Path) -> Self {
80        let artifacts_dir = root_dir.join(CHUNK_ARTIFACTS_DIR);
81        Self {
82            root_dir: root_dir.to_path_buf(),
83            artifacts_dir,
84            files_to_chunk: Default::default(),
85            chunks: Default::default(),
86            completed_files: Default::default(),
87            resumed_files_count: 0,
88            resumed_chunk_count: 0,
89        }
90    }
91
92    /// Chunk all the files in the provided `files_path`
93    /// These are stored to the CHUNK_ARTIFACTS_DIR
94    /// if read_cache is true, will take cache from previous runs into account
95    ///
96    /// # Arguments
97    /// * files_path - &[Path]
98    /// * read_cache - Boolean. Set to true to resume the chunks from the artifacts dir.
99    /// * include_data_maps - Boolean. If set to true, will append all the ChunkedFile.data_map chunks
100    pub fn chunk_path(
101        &mut self,
102        files_path: &Path,
103        read_cache: bool,
104        include_data_maps: bool,
105    ) -> Result<()> {
106        self.chunk_with_iter(
107            WalkDir::new(files_path).into_iter().flatten(),
108            read_cache,
109            include_data_maps,
110        )
111    }
112
113    /// Return the filename and the file's Xor address if all their chunks has been marked as
114    /// verified
115    pub(crate) fn already_put_chunks(
116        &mut self,
117        entries_iter: impl Iterator<Item = DirEntry>,
118        make_files_public: bool,
119    ) -> Result<Vec<(XorName, PathBuf)>> {
120        self.chunk_with_iter(entries_iter, false, make_files_public)?;
121        Ok(self.get_chunks())
122    }
123
124    /// Chunk all the files in the provided iterator
125    /// These are stored to the CHUNK_ARTIFACTS_DIR
126    /// if read_cache is true, will take cache from previous runs into account
127    pub fn chunk_with_iter(
128        &mut self,
129        entries_iter: impl Iterator<Item = DirEntry>,
130        read_cache: bool,
131        include_data_maps: bool,
132    ) -> Result<()> {
133        let now = Instant::now();
134        // clean up
135        self.files_to_chunk = Default::default();
136        self.chunks = Default::default();
137        self.completed_files = Default::default();
138        self.resumed_chunk_count = 0;
139        self.resumed_files_count = 0;
140
141        // collect the files to chunk
142        entries_iter.for_each(|entry| {
143            if entry.file_type().is_file() {
144                let path_xor = PathXorName::new(entry.path());
145                info!(
146                    "Added file {:?} with path_xor: {path_xor:?} to be chunked/resumed",
147                    entry.path()
148                );
149                self.files_to_chunk.push((
150                    entry.file_name().to_owned(),
151                    path_xor,
152                    entry.into_path(),
153                ));
154            }
155        });
156        let total_files = self.files_to_chunk.len();
157
158        if total_files == 0 {
159            return Ok(());
160        };
161
162        // resume the chunks from the artifacts dir
163        if read_cache {
164            self.resume_path();
165        }
166
167        // note the number of chunks that we've resumed
168        self.resumed_chunk_count = self
169            .chunks
170            .values()
171            .flat_map(|chunked_file| &chunked_file.chunks)
172            .count();
173        // note the number of files that we've resumed
174        self.resumed_files_count = self.chunks.keys().collect::<BTreeSet<_>>().len();
175
176        // Filter out files_to_chunk; Any PathXorName in chunks_to_upload is considered to be resumed.
177        {
178            let path_xors = self.chunks.keys().collect::<BTreeSet<_>>();
179            self.files_to_chunk
180                .retain(|(_, path_xor, _)| !path_xors.contains(path_xor));
181        }
182
183        // Get the list of completed files
184        {
185            let completed_files = self.chunks.iter().filter_map(|(_, chunked_file)| {
186                if chunked_file.chunks.is_empty() {
187                    Some((
188                        chunked_file.file_path.clone(),
189                        chunked_file.file_name.clone(),
190                        chunked_file.head_chunk_address,
191                    ))
192                } else {
193                    None
194                }
195            });
196
197            self.completed_files.extend(completed_files);
198        }
199
200        // Return early if no more files to chunk
201        if self.files_to_chunk.is_empty() {
202            debug!(
203                "All files_to_chunk ({total_files:?}) were resumed. Returning the resumed chunks.",
204            );
205            debug!("It took {:?} to resume all the files", now.elapsed());
206            return Ok(());
207        }
208
209        let progress_bar = get_progress_bar(total_files as u64)?;
210        progress_bar.println(format!("Chunking {total_files} files..."));
211
212        let artifacts_dir = &self.artifacts_dir.clone();
213        let chunked_files = self.files_to_chunk
214            .par_iter()
215            .map(|(original_file_name, path_xor, path)| {
216                let file_chunks_dir = {
217                    let file_chunks_dir = artifacts_dir.join(&path_xor.0);
218                    fs::create_dir_all(&file_chunks_dir).map_err(|err| {
219                        error!("Failed to create folder {file_chunks_dir:?} for SE chunks with error {err:?}!");
220                        eyre!("Failed to create dir {file_chunks_dir:?} for SE chunks with error {err:?}")
221                    })?;
222                    file_chunks_dir
223                };
224
225                match FilesApi::chunk_file(path, &file_chunks_dir, include_data_maps) {
226                    Ok((head_chunk_address, data_map, size, chunks)) => {
227                        progress_bar.clone().inc(1);
228                        debug!("Chunked {original_file_name:?} with {path_xor:?} into file's XorName: {head_chunk_address:?} of size {size}, and chunks len: {}", chunks.len());
229
230                        let chunked_file = ChunkedFile {
231                            head_chunk_address,
232                            file_path: path.to_owned(),
233                            file_name: original_file_name.clone(),
234                            chunks: chunks.into_iter().collect(),
235                            data_map
236                        };
237                        Ok((path_xor.clone(), chunked_file))
238                    }
239                    Err(err) => {
240                        println!("Failed to chunk file {path:?}/{path_xor:?} with err: {err:?}");
241                        error!("Failed to chunk file {path:?}/{path_xor:?} with err: {err:?}");
242                        Err(eyre!("Failed to chunk file {path:?}/{path_xor:?} with err: {err:?}"))
243                    }
244                }
245            })
246            .collect::<Result<BTreeMap<_, _>>>()?;
247        debug!(
248            "Out of total files_to_chunk {total_files}, we have resumed {} files and chunked {} files",
249            self.resumed_files_count,
250            chunked_files.len()
251        );
252
253        // Self::resume_path would create an empty self.chunks entry if a file that was fully
254        // completed was resumed. Thus if it is empty, the user did not provide any valid file
255        // path.
256        if chunked_files.is_empty() && self.chunks.is_empty() {
257            bail!(
258                "The provided path does not contain any file. Please check your path!\nExiting..."
259            );
260        }
261
262        // write metadata and data_map
263        chunked_files
264            .par_iter()
265            .map(|(path_xor, chunked_file)| {
266                let metadata_path = artifacts_dir.join(&path_xor.0).join(METADATA_FILE);
267
268                info!("Metadata path is: {metadata_path:?}");
269                let metadata = rmp_serde::to_vec(&(
270                    chunked_file.head_chunk_address,
271                    chunked_file.data_map.clone(),
272                ))
273                .map_err(|_| {
274                    error!("Failed to serialize file_xor_addr for writing metadata");
275                    eyre!("Failed to serialize file_xor_addr for writing metadata")
276                })?;
277
278                let mut metadata_file = File::create(&metadata_path).map_err(|_| {
279                    error!("Failed to create metadata_path {metadata_path:?} for {path_xor:?}");
280                    eyre!("Failed to create metadata_path {metadata_path:?} for {path_xor:?}")
281                })?;
282
283                metadata_file.write_all(&metadata).map_err(|_| {
284                    error!("Failed to write metadata to {metadata_path:?} for {path_xor:?}");
285                    eyre!("Failed to write metadata to {metadata_path:?} for {path_xor:?}")
286                })?;
287
288                debug!("Wrote metadata for {path_xor:?}");
289                Ok(())
290            })
291            .collect::<Result<()>>()?;
292
293        progress_bar.finish_and_clear();
294        debug!("It took {:?} to chunk {} files", now.elapsed(), total_files);
295        self.chunks.extend(chunked_files);
296
297        Ok(())
298    }
299
300    // Try to resume the chunks
301    fn resume_path(&mut self) {
302        let artifacts_dir = self.artifacts_dir.clone();
303        let resumed = self
304            .files_to_chunk
305            .par_iter()
306            .filter_map(|(original_file_name, path_xor, original_file_path)| {
307                // if this folder exists, and if we find chunks under this, we upload them.
308                let file_chunks_dir = artifacts_dir.join(&path_xor.0);
309                if !file_chunks_dir.exists() {
310                    return None;
311                }
312                Self::read_file_chunks_dir(
313                    file_chunks_dir,
314                    path_xor,
315                    original_file_path.clone(),
316                    original_file_name.clone(),
317                )
318            })
319            .collect::<BTreeMap<_, _>>();
320
321        self.chunks.extend(resumed);
322    }
323
324    /// Get all the chunk name and their path.
325    /// If include_data_maps is true, append all the ChunkedFile.data_map chunks to the vec
326    pub fn get_chunks(&self) -> Vec<(XorName, PathBuf)> {
327        self.chunks
328            .values()
329            .flat_map(|chunked_file| &chunked_file.chunks)
330            .cloned()
331            .collect::<Vec<(XorName, PathBuf)>>()
332    }
333
334    pub fn is_chunks_empty(&self) -> bool {
335        self.chunks
336            .values()
337            .flat_map(|chunked_file| &chunked_file.chunks)
338            .next()
339            .is_none()
340    }
341
342    /// Mark all the chunks as completed. This removes the chunks from the CHUNK_ARTIFACTS_DIR.
343    /// But keeps the folder and metadata file that denotes that the file has been already completed.
344    pub fn mark_completed_all(&mut self) -> Result<()> {
345        let all_chunks = self
346            .chunks
347            .values()
348            .flat_map(|chunked_file| &chunked_file.chunks)
349            .map(|(chunk, _)| *chunk)
350            .collect::<Vec<_>>();
351        self.mark_completed(all_chunks.into_iter())
352    }
353
354    /// Mark a set of chunks as completed and remove them from CHUNK_ARTIFACTS_DIR
355    /// If the entire file is completed, keep the folder and metadata file
356    pub fn mark_completed(&mut self, chunks: impl Iterator<Item = XorName>) -> Result<()> {
357        let set_of_completed_chunks = chunks.collect::<BTreeSet<_>>();
358        trace!("marking as completed: {set_of_completed_chunks:?}");
359
360        // remove those files
361        self.chunks
362            .par_iter()
363            .flat_map(|(_, chunked_file)| &chunked_file.chunks)
364            .map(|(chunk_xor, chunk_path)| {
365                if set_of_completed_chunks.contains(chunk_xor) {
366                    debug!("removing {chunk_xor:?} at {chunk_path:?} as it is marked as completed");
367                    fs::remove_file(chunk_path).map_err(|_err| {
368                        error!("Failed to remove SE chunk {chunk_xor} from {chunk_path:?}");
369                        eyre!("Failed to remove SE chunk {chunk_xor} from {chunk_path:?}")
370                    })?;
371                }
372                Ok(())
373            })
374            .collect::<Result<()>>()?;
375
376        let mut entire_file_is_done = BTreeSet::new();
377        // remove the entries from the struct
378        self.chunks.iter_mut().for_each(|(path_xor, chunked_file)| {
379            chunked_file
380                .chunks
381                // if chunk is part of completed_chunks, return false to remove it
382                .retain(|(chunk_xor, _)| !set_of_completed_chunks.contains(chunk_xor));
383            if chunked_file.chunks.is_empty() {
384                entire_file_is_done.insert(path_xor.clone());
385            }
386        });
387
388        for path_xor in &entire_file_is_done {
389            // todo: should we remove the entry? ig so
390            if let Some(chunked_file) = self.chunks.remove(path_xor) {
391                trace!("removed {path_xor:?} from chunks list");
392
393                self.completed_files.push((
394                    chunked_file.file_path.clone(),
395                    chunked_file.file_name.clone(),
396                    chunked_file.head_chunk_address,
397                ));
398
399                let uploaded_file_metadata = UploadedFile {
400                    filename: chunked_file.file_name,
401                    data_map: Some(chunked_file.data_map.value),
402                };
403                // errors are logged by write()
404                let _result =
405                    uploaded_file_metadata.write(&self.root_dir, &chunked_file.head_chunk_address);
406            }
407        }
408        Ok(())
409
410        // let mut entire_file_is_done = BTreeSet::new();
411        // // remove the entries from the struct
412        // self.chunks.iter_mut().for_each(|(path_xor, chunked_file)| {
413        //     chunked_file
414        //         .chunks
415        //         // if chunk is part of completed_chunks, return false to remove it
416        //         .retain(|(chunk_xor, _)| !set_of_completed_chunks.contains(chunk_xor));
417        //     if chunked_file.chunks.is_empty() {
418        //         entire_file_is_done.insert(path_xor.clone());
419        //     }
420        // });
421
422        // for path_xor in &entire_file_is_done {
423        //     // todo: should we remove the entry? ig so
424        //     if let Some(chunked_file) = self.chunks.remove(path_xor) {
425        //         trace!("removed {path_xor:?} from chunks list");
426        //         self.verified_files
427        //             .push((chunked_file.file_name, chunked_file.head_chunk_address));
428        //     }
429        // }
430    }
431
432    /// Return the filename and the file's Xor address if all their chunks has been marked as
433    /// completed
434    pub(crate) fn completed_files(&self) -> &Vec<(PathBuf, OsString, ChunkAddress)> {
435        &self.completed_files
436    }
437
438    /// Return the list of Filenames that have some chunks that are yet to be marked as completed.
439    pub(crate) fn incomplete_files(&self) -> Vec<(&PathBuf, &OsString, &ChunkAddress)> {
440        self.chunks
441            .values()
442            .map(|chunked_file| {
443                (
444                    &chunked_file.file_path,
445                    &chunked_file.file_name,
446                    &chunked_file.head_chunk_address,
447                )
448            })
449            .collect()
450    }
451
452    /// Returns an iterator over the list of chunked files
453    pub(crate) fn iter_chunked_files(&mut self) -> impl Iterator<Item = &ChunkedFile> {
454        self.chunks.values()
455    }
456
457    // Try to read the chunks from `file_chunks_dir`
458    // Returns the ChunkedFile if the metadata file exists
459    // file_chunks_dir: artifacts_dir/path_xor
460    // path_xor: Used during logging and is returned
461    // original_file_name: Used to create ChunkedFile
462    fn read_file_chunks_dir(
463        file_chunks_dir: PathBuf,
464        path_xor: &PathXorName,
465        original_file_path: PathBuf,
466        original_file_name: OsString,
467    ) -> Option<(PathXorName, ChunkedFile)> {
468        let mut file_chunk_address: Option<ChunkAddress> = None;
469        let mut data_map = Chunk::new(Bytes::new());
470        debug!("Trying to resume {path_xor:?} as the file_chunks_dir exists");
471
472        let chunks = WalkDir::new(file_chunks_dir.clone())
473            .into_iter()
474            .flatten()
475            .filter_map(|entry| {
476                if !entry.file_type().is_file() {
477                    return None;
478                }
479                if entry.file_name() == METADATA_FILE {
480                    if let Some((address, optional_data_map)) =
481                        Self::try_read_metadata(entry.path())
482                    {
483                        file_chunk_address = Some(address);
484                        data_map = optional_data_map;
485                        debug!("Obtained metadata for {path_xor:?}");
486                    } else {
487                        error!("Could not read metadata for {path_xor:?}");
488                    }
489                    // not a chunk, so don't return
490                    return None;
491                }
492
493                // try to get the chunk's xorname from its filename
494                if let Some(file_name) = entry.file_name().to_str() {
495                    Self::hex_decode_xorname(file_name)
496                        .map(|chunk_xorname| (chunk_xorname, entry.into_path()))
497                } else {
498                    error!(
499                        "Failed to convert OsString to str for {:?}",
500                        entry.file_name()
501                    );
502                    None
503                }
504            })
505            .collect::<BTreeSet<_>>();
506
507        match file_chunk_address {
508            Some(head_chunk_address) => {
509                debug!("Resuming {} chunks for file {original_file_name:?} and with file_xor_addr {head_chunk_address:?}/{path_xor:?}", chunks.len());
510
511                Some((
512                    path_xor.clone(),
513                    ChunkedFile {
514                        file_path: original_file_path,
515                        file_name: original_file_name,
516                        head_chunk_address,
517                        chunks,
518                        data_map,
519                    },
520                ))
521            }
522            _ => {
523                error!("Metadata file or data map was not present for {path_xor:?}");
524                // metadata file or data map was not present/was not read
525                None
526            }
527        }
528    }
529
530    /// Try to read the metadata file
531    /// Returning (head_chunk_address, datamap Chunk)
532    fn try_read_metadata(path: &Path) -> Option<(ChunkAddress, Chunk)> {
533        let metadata = fs::read(path)
534            .map_err(|err| error!("Failed to read metadata with err {err:?}"))
535            .ok()?;
536        // head chunk address and the final datamap contents if a datamap exists for this file
537        let metadata: (ChunkAddress, Chunk) = rmp_serde::from_slice(&metadata)
538            .map_err(|err| error!("Failed to deserialize metadata with err {err:?}"))
539            .ok()?;
540
541        Some(metadata)
542    }
543
544    // Decode the hex encoded xorname
545    fn hex_decode_xorname(string: &str) -> Option<XorName> {
546        let hex_decoded = hex::decode(string)
547            .map_err(|err| error!("Failed to decode {string} into bytes with err {err:?}"))
548            .ok()?;
549        let decoded_xorname: [u8; xor_name::XOR_NAME_LEN] = hex_decoded
550            .try_into()
551            .map_err(|_| error!("Failed to convert hex_decoded xorname into an [u8; 32]"))
552            .ok()?;
553        Some(XorName(decoded_xorname))
554    }
555}
556
557#[cfg(test)]
558mod tests {
559    use super::*;
560    use color_eyre::{eyre::eyre, Result};
561    use rand::{thread_rng, Rng};
562    use rayon::prelude::IntoParallelIterator;
563    use sn_logging::LogBuilder;
564    use tempfile::TempDir;
565
566    /// Assert any collection/iterator even if their orders do not match.
567    pub fn assert_list_eq<I, J, K>(a: I, b: J)
568    where
569        K: Eq + Clone,
570        I: IntoIterator<Item = K>,
571        J: IntoIterator<Item = K>,
572    {
573        let vec1: Vec<_> = a.into_iter().collect::<Vec<_>>();
574        let mut vec2: Vec<_> = b.into_iter().collect();
575
576        assert_eq!(vec1.len(), vec2.len());
577
578        for item1 in &vec1 {
579            let idx2 = vec2
580                .iter()
581                .position(|item2| item1 == item2)
582                .expect("Item not found in second list");
583
584            vec2.swap_remove(idx2);
585        }
586
587        assert_eq!(vec2.len(), 0);
588    }
589
    #[test]
    fn chunked_files_should_be_written_to_artifacts_dir() -> Result<()> {
        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
        // init_manager / create_random_files are test helpers not shown in this
        // view; presumably create_random_files(dir, file_count, size_mb) — confirm.
        let (_tmp_dir, mut manager, _, random_files_dir) = init_manager()?;
        let artifacts_dir = manager.artifacts_dir.clone();
        let _ = create_random_files(&random_files_dir, 1, 1)?;
        // include_data_maps = true, so the data map chunk is written too.
        manager.chunk_path(&random_files_dir, true, true)?;

        let chunks = manager.get_chunks();
        // 1. 1mb file produces 4 chunks
        assert_eq!(chunks.len(), 4);

        // 2. make sure we have 1 folder == 1 file
        let n_folders = WalkDir::new(&artifacts_dir)
            .into_iter()
            .flatten()
            .filter(|entry| entry.file_type().is_dir() && entry.path() != artifacts_dir)
            .count();
        assert_eq!(n_folders, 1);

        // 3. make sure we have the 1 files per chunk, + 1 datamap + 1 metadata file
        let n_files = WalkDir::new(&artifacts_dir)
            .into_iter()
            .flatten()
            .filter(|entry| {
                info!("direntry {entry:?}");
                entry.file_type().is_file()
            })
            .count();
        assert_eq!(n_files, chunks.len() + 1);

        // 4. make sure metadata file holds the correct file_xor_addr
        let mut file_xor_addr_from_metadata = None;
        for entry in WalkDir::new(&artifacts_dir).into_iter().flatten() {
            if entry.file_type().is_file() && entry.file_name() == METADATA_FILE {
                let metadata = ChunkManager::try_read_metadata(entry.path());

                if let Some((head_chunk_addr, _datamap)) = metadata {
                    file_xor_addr_from_metadata = Some(head_chunk_addr);
                }
            }
        }
        let file_xor_addr_from_metadata =
            file_xor_addr_from_metadata.expect("The metadata file should be present");
        let file_xor_addr = manager
            .chunks
            .values()
            .next()
            .expect("1 file should be present")
            .head_chunk_address;
        assert_eq!(file_xor_addr_from_metadata, file_xor_addr);

        // 5. make sure the chunked file's name is the XorName of that chunk
        let chunk_xornames = manager
            .chunks
            .values()
            .next()
            .expect("We must have 1 file here")
            .chunks
            .iter()
            .map(|(xor_name, _)| *xor_name)
            .collect::<BTreeSet<_>>();
        for entry in WalkDir::new(&artifacts_dir).into_iter().flatten() {
            let file_name = entry.file_name();
            if entry.file_type().is_file() && file_name != METADATA_FILE {
                let chunk_xorname_from_filename =
                    ChunkManager::hex_decode_xorname(file_name.to_str().unwrap())
                        .expect("Failed to get xorname from hex encoded file_name");
                assert!(chunk_xornames.contains(&chunk_xorname_from_filename));
            }
        }

        Ok(())
    }
664
    #[test]
    fn no_datamap_chunked_files_should_be_written_to_artifacts_dir_when_not_public() -> Result<()> {
        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
        let (_tmp_dir, mut manager, _, random_files_dir) = init_manager()?;
        let artifacts_dir = manager.artifacts_dir.clone();
        let _ = create_random_files(&random_files_dir, 1, 1)?;

        // we do NOT want to include or write the data_map chunk here
        manager.chunk_path(&random_files_dir, true, false)?;

        let chunks = manager.get_chunks();
        // 1. 1mb file produces 3 chunks without the datamap
        assert_eq!(chunks.len(), 3);

        // 2. make sure we have 1 folder == 1 file
        let n_folders = WalkDir::new(&artifacts_dir)
            .into_iter()
            .flatten()
            .filter(|entry| entry.file_type().is_dir() && entry.path() != artifacts_dir)
            .count();
        assert_eq!(n_folders, 1);

        // 3. make sure we have the 1 files per chunk, + 1 metadata file
        // (no datamap chunk on disk in this mode, unlike the public variant)
        let n_files = WalkDir::new(&artifacts_dir)
            .into_iter()
            .flatten()
            .filter(|entry| {
                info!("direntry {entry:?}");
                entry.file_type().is_file()
            })
            .count();
        assert_eq!(n_files, chunks.len() + 1);

        // 4. make sure metadata file holds the correct file_xor_addr
        let mut file_xor_addr_from_metadata = None;
        for entry in WalkDir::new(&artifacts_dir).into_iter().flatten() {
            if entry.file_type().is_file() && entry.file_name() == METADATA_FILE {
                let metadata = ChunkManager::try_read_metadata(entry.path());

                if let Some((head_chunk_addr, _datamap)) = metadata {
                    file_xor_addr_from_metadata = Some(head_chunk_addr);
                }
            }
        }
        let file_xor_addr_from_metadata =
            file_xor_addr_from_metadata.expect("The metadata file should be present");
        let file_xor_addr = manager
            .chunks
            .values()
            .next()
            .expect("1 file should be present")
            .head_chunk_address;
        assert_eq!(file_xor_addr_from_metadata, file_xor_addr);

        // 5. make sure the chunked file's name is the XorName of that chunk
        let chunk_xornames = manager
            .chunks
            .values()
            .next()
            .expect("We must have 1 file here")
            .chunks
            .iter()
            .map(|(xor_name, _)| *xor_name)
            .collect::<BTreeSet<_>>();
        for entry in WalkDir::new(&artifacts_dir).into_iter().flatten() {
            let file_name = entry.file_name();
            if entry.file_type().is_file() && file_name != METADATA_FILE {
                let chunk_xorname_from_filename =
                    ChunkManager::hex_decode_xorname(file_name.to_str().unwrap())
                        .expect("Failed to get xorname from hex encoded file_name");
                assert!(chunk_xornames.contains(&chunk_xorname_from_filename));
            }
        }

        Ok(())
    }
741
    #[test]
    fn chunks_should_be_removed_from_artifacts_dir_if_marked_as_completed() -> Result<()> {
        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
        let (_tmp_dir, mut manager, _, random_files_dir) = init_manager()?;

        let _ = create_random_files(&random_files_dir, 1, 1)?;
        manager.chunk_path(&random_files_dir, true, true)?;

        // Pick one chunk of the single chunked file and mark only it completed.
        let path_xor = manager.chunks.keys().next().unwrap().clone();
        let chunked_file = manager.chunks.values().next().unwrap().clone();
        let file_xor_addr = chunked_file.head_chunk_address;
        let (chunk, _) = chunked_file
            .chunks
            .first()
            .expect("Must contain 1 chunk")
            .clone();
        let total_chunks = manager.chunks.values().next().unwrap().chunks.len();
        manager.mark_completed(vec![chunk].into_iter())?;

        // 1. chunk should be removed from the struct
        assert_eq!(
            manager
                .chunks
                .values()
                .next()
                .expect("Since the file was not fully completed, it should be present")
                .chunks
                .len(),
            total_chunks - 1,
        );

        // 2. the folder should exists, but chunk removed
        let file_chunks_dir = manager.artifacts_dir.join(&path_xor.0);
        let (path_xor_from_dir, chunked_file_from_dir) = ChunkManager::read_file_chunks_dir(
            file_chunks_dir,
            &path_xor,
            chunked_file.file_path,
            chunked_file.file_name,
        )
        .expect("Folder and metadata should be present");
        assert_eq!(chunked_file_from_dir.chunks.len(), total_chunks - 1);
        assert_eq!(chunked_file_from_dir.head_chunk_address, file_xor_addr);
        assert_eq!(path_xor_from_dir, path_xor);

        // 3. file should not be marked as completed
        assert!(manager.completed_files.is_empty());

        Ok(())
    }
791
792    #[test]
793    fn marking_all_chunks_as_completed_should_not_remove_the_dir() -> Result<()> {
794        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
795        let (_tmp_dir, mut manager, _, random_files_dir) = init_manager()?;
796
797        let _ = create_random_files(&random_files_dir, 5, 5)?;
798        manager.chunk_path(&random_files_dir, true, true)?;
799        // cloned after chunking
800        let manager_clone = manager.clone();
801
802        let n_folders = WalkDir::new(&manager.artifacts_dir)
803            .into_iter()
804            .flatten()
805            .filter(|entry| entry.file_type().is_dir() && entry.path() != manager.artifacts_dir)
806            .count();
807        assert_eq!(n_folders, 5);
808
809        manager.mark_completed_all()?;
810
811        // all 5 files should be marked as completed
812        assert_eq!(manager.completed_files.len(), 5);
813
814        // all 5 folders should exist
815        for (path_xor, chunked_file) in manager_clone.chunks.iter() {
816            let file_chunks_dir = manager_clone.artifacts_dir.join(path_xor.0.clone());
817            let (path_xor_from_dir, chunked_file_from_dir) = ChunkManager::read_file_chunks_dir(
818                file_chunks_dir,
819                path_xor,
820                chunked_file.file_path.clone(),
821                chunked_file.file_name.to_owned(),
822            )
823            .expect("Folder and metadata should be present");
824            assert_eq!(chunked_file_from_dir.chunks.len(), 0);
825            assert_eq!(
826                chunked_file_from_dir.head_chunk_address,
827                chunked_file.head_chunk_address
828            );
829            assert_eq!(&path_xor_from_dir, path_xor);
830        }
831
832        Ok(())
833    }
834
835    #[test]
836    fn mark_none_and_resume() -> Result<()> {
837        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
838        let (_tmp_dir, mut manager, root_dir, random_files_dir) = init_manager()?;
839
840        let _ = create_random_files(&random_files_dir, 5, 5)?;
841        manager.chunk_path(&random_files_dir, true, true)?;
842
843        let mut new_manager = ChunkManager::new(&root_dir);
844        new_manager.chunk_path(&random_files_dir, true, true)?;
845
846        // 1. make sure the chunk counts match
847        let total_chunk_count = manager
848            .chunks
849            .values()
850            .flat_map(|chunked_file| &chunked_file.chunks)
851            .count();
852        assert_eq!(manager.resumed_chunk_count, 0);
853        assert_eq!(new_manager.resumed_chunk_count, total_chunk_count);
854
855        // 2. assert the two managers
856        assert_eq!(manager.chunks, new_manager.chunks);
857        assert_eq!(manager.completed_files, new_manager.completed_files);
858
859        Ok(())
860    }
861
862    #[test]
863    fn mark_one_chunk_and_resume() -> Result<()> {
864        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
865        let (_tmp_dir, mut manager, root_dir, random_files_dir) = init_manager()?;
866
867        let _ = create_random_files(&random_files_dir, 5, 5)?;
868        manager.chunk_path(&random_files_dir, true, true)?;
869
870        let total_chunks_count = manager
871            .chunks
872            .values()
873            .flat_map(|chunked_file| &chunked_file.chunks)
874            .count();
875
876        // mark a chunk as completed
877        let removed_chunk = manager
878            .chunks
879            .values()
880            .next()
881            .expect("Atleast 1 file should be present")
882            .chunks
883            .iter()
884            .next()
885            .expect("Chunk should be present")
886            .0;
887        manager.mark_completed([removed_chunk].into_iter())?;
888        let mut new_manager = ChunkManager::new(&root_dir);
889        new_manager.chunk_path(&random_files_dir, true, true)?;
890
891        // 1. we should have 1 completed chunk and (total_chunks_count-1) incomplete chunks
892        assert_eq!(manager.resumed_chunk_count, 0);
893        assert_eq!(new_manager.resumed_chunk_count, total_chunks_count - 1);
894        // also check the structs
895        assert_eq!(
896            new_manager
897                .chunks
898                .values()
899                .flat_map(|chunked_file| &chunked_file.chunks)
900                .count(),
901            total_chunks_count - 1
902        );
903
904        // 2. files should not be added to completed files
905        assert_eq!(new_manager.completed_files.len(), 0);
906
907        Ok(())
908    }
909
910    #[test]
911    fn mark_all_and_resume() -> Result<()> {
912        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
913        let (_tmp_dir, mut manager, root_dir, random_files_dir) = init_manager()?;
914
915        let _ = create_random_files(&random_files_dir, 5, 5)?;
916        manager.chunk_path(&random_files_dir, true, true)?;
917        manager.mark_completed_all()?;
918
919        let mut new_manager = ChunkManager::new(&root_dir);
920        new_manager.chunk_path(&random_files_dir, true, true)?;
921
922        // 1. we should have chunk entries, but 0 chunks inside them
923        assert_eq!(new_manager.chunks.len(), 5);
924        assert_eq!(
925            new_manager
926                .chunks
927                .values()
928                .flat_map(|chunked_file| &chunked_file.chunks)
929                .count(),
930            0
931        );
932        // 2. the resumed stats should be 0
933        assert_eq!(new_manager.resumed_chunk_count, 0);
934
935        // 3. make sure the files are added to completed list
936        assert_eq!(new_manager.completed_files.len(), 5);
937
938        Ok(())
939    }
940
941    #[test]
942    fn absence_of_metadata_file_should_re_chunk_the_entire_file() -> Result<()> {
943        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
944        let (_tmp_dir, mut manager, _root_dir, random_files_dir) = init_manager()?;
945
946        let mut random_files = create_random_files(&random_files_dir, 1, 1)?;
947        let random_file = random_files.remove(0);
948        manager.chunk_path(&random_files_dir, true, true)?;
949
950        let mut old_chunks_list = BTreeSet::new();
951        for entry in WalkDir::new(&manager.artifacts_dir).into_iter().flatten() {
952            let file_name = entry.file_name();
953            if entry.file_type().is_file() && file_name != METADATA_FILE {
954                let chunk_xorname_from_filename =
955                    ChunkManager::hex_decode_xorname(file_name.to_str().unwrap())
956                        .expect("Failed to get xorname from hex encoded file_name");
957                old_chunks_list.insert(chunk_xorname_from_filename);
958            }
959        }
960
961        // remove metadata file from artifacts_dir
962        let path_xor = PathXorName::new(&random_file);
963        let metadata_path = manager.artifacts_dir.join(path_xor.0).join(METADATA_FILE);
964        fs::remove_file(&metadata_path)?;
965
966        // use the same manager to chunk the path
967        manager.chunk_path(&random_files_dir, true, true)?;
968        // nothing should be resumed
969        assert_eq!(manager.resumed_chunk_count, 0);
970        // but it should be re-chunked
971        assert_eq!(
972            manager.get_chunks().len(),
973            4,
974            "we have correct chunk len including data_map"
975        );
976        // metadata file should be created
977        assert!(metadata_path.exists());
978
979        let mut new_chunks_list = BTreeSet::new();
980        for entry in WalkDir::new(&manager.artifacts_dir).into_iter().flatten() {
981            let file_name = entry.file_name();
982            if entry.file_type().is_file() && file_name != METADATA_FILE {
983                let chunk_xorname_from_filename =
984                    ChunkManager::hex_decode_xorname(file_name.to_str().unwrap())
985                        .expect("Failed to get xorname from hex encoded file_name");
986                new_chunks_list.insert(chunk_xorname_from_filename);
987            }
988        }
989        assert_list_eq(new_chunks_list, old_chunks_list);
990
991        Ok(())
992    }
993
994    fn init_manager() -> Result<(TempDir, ChunkManager, PathBuf, PathBuf)> {
995        let tmp_dir = tempfile::tempdir()?;
996        let random_files_dir = tmp_dir.path().join("random_files");
997        let root_dir = tmp_dir.path().join("root_dir");
998        fs::create_dir_all(&random_files_dir)?;
999        fs::create_dir_all(&root_dir)?;
1000        let manager = ChunkManager::new(&root_dir);
1001
1002        Ok((tmp_dir, manager, root_dir, random_files_dir))
1003    }
1004
1005    fn create_random_files(
1006        at: &Path,
1007        num_files: usize,
1008        mb_per_file: usize,
1009    ) -> Result<Vec<PathBuf>> {
1010        let files = (0..num_files)
1011            .into_par_iter()
1012            .filter_map(|i| {
1013                let mut path = at.to_path_buf();
1014                path.push(format!("random_file_{i}"));
1015                match generate_file(&path, mb_per_file) {
1016                    Ok(_) => Some(path),
1017                    Err(err) => {
1018                        error!("Failed to generate random file with {err:?}");
1019                        None
1020                    }
1021                }
1022            })
1023            .collect::<Vec<_>>();
1024        if files.len() < num_files {
1025            return Err(eyre!("Failed to create a Failedkk"));
1026        }
1027        Ok(files)
1028    }
1029
1030    fn generate_file(path: &PathBuf, file_size_mb: usize) -> Result<()> {
1031        let mut file = File::create(path)?;
1032        let mut rng = thread_rng();
1033
1034        // can create [u8; 32] max at time. Thus each mb has 1024*32 such small chunks
1035        let n_small_chunks = file_size_mb * 1024 * 32;
1036        for _ in 0..n_small_chunks {
1037            let random_data: [u8; 32] = rng.gen();
1038            file.write_all(&random_data)?;
1039        }
1040        let size = file.metadata()?.len() as f64 / (1024 * 1024) as f64;
1041        assert_eq!(file_size_mb as f64, size);
1042
1043        Ok(())
1044    }
1045}