1use super::get_progress_bar;
10use super::upload::UploadedFile;
11use bytes::Bytes;
12use color_eyre::{
13 eyre::{bail, eyre},
14 Result,
15};
16use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
17use sn_client::{
18 protocol::storage::{Chunk, ChunkAddress},
19 FilesApi,
20};
21use std::{
22 collections::{BTreeMap, BTreeSet},
23 ffi::OsString,
24 fs::{self, File},
25 io::Write,
26 path::{Path, PathBuf},
27 time::Instant,
28};
29use tracing::{debug, error, info, trace};
30use walkdir::{DirEntry, WalkDir};
31use xor_name::XorName;
32
/// Sub-folder of the root dir where per-file chunk artifacts are cached.
const CHUNK_ARTIFACTS_DIR: &str = "chunk_artifacts";
/// Name of the metadata file written alongside each file's chunks.
const METADATA_FILE: &str = "metadata";
35
/// Hex-encoded XorName derived from a file's full path. Used as the name of
/// the per-file folder under the chunk artifacts directory.
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
struct PathXorName(String);
41
42impl PathXorName {
43 fn new(path: &Path) -> PathXorName {
44 let path_as_lossy_str = path.as_os_str().to_string_lossy();
47 let path_xor = XorName::from_content(path_as_lossy_str.as_bytes());
48 PathXorName(hex::encode(path_xor))
49 }
50}
51
/// A file that has been chunked via self-encryption, together with the
/// on-disk locations of its not-yet-uploaded chunks.
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub struct ChunkedFile {
    // Original path of the source file.
    pub file_path: PathBuf,
    // Original file name of the source file.
    pub file_name: OsString,
    // Address of the file's head chunk, identifying the file on the network.
    pub head_chunk_address: ChunkAddress,
    // Pending chunks: (chunk xorname, path of the chunk file on disk).
    pub chunks: BTreeSet<(XorName, PathBuf)>,
    // The file's data map chunk. NOTE(review): may be an empty chunk when no
    // data map was stored (see `read_file_chunks_dir`'s default) — confirm.
    pub data_map: Chunk,
}
61
/// Chunks files and caches the resulting chunk artifacts on disk so that an
/// interrupted run can resume without re-chunking already-processed files.
#[derive(Debug, Clone, PartialEq, PartialOrd, Eq, Ord)]
pub struct ChunkManager {
    // Base directory; uploaded-file records are written here.
    root_dir: PathBuf,
    // `<root_dir>/chunk_artifacts`: one sub-folder per file, named by PathXorName.
    artifacts_dir: PathBuf,
    // Files queued for chunking: (file name, xor of its path, full path).
    files_to_chunk: Vec<(OsString, PathXorName, PathBuf)>,
    // Files that still have pending chunks, keyed by their path's xorname.
    chunks: BTreeMap<PathXorName, ChunkedFile>,
    // Files whose chunks are all done: (path, file name, head chunk address).
    completed_files: Vec<(PathBuf, OsString, ChunkAddress)>,
    // How many chunks were restored from the artifacts cache in the last run.
    resumed_chunk_count: usize,
    // How many files were restored from the artifacts cache in the last run.
    resumed_files_count: usize,
}
76
77impl ChunkManager {
78 pub fn new(root_dir: &Path) -> Self {
80 let artifacts_dir = root_dir.join(CHUNK_ARTIFACTS_DIR);
81 Self {
82 root_dir: root_dir.to_path_buf(),
83 artifacts_dir,
84 files_to_chunk: Default::default(),
85 chunks: Default::default(),
86 completed_files: Default::default(),
87 resumed_files_count: 0,
88 resumed_chunk_count: 0,
89 }
90 }
91
92 pub fn chunk_path(
101 &mut self,
102 files_path: &Path,
103 read_cache: bool,
104 include_data_maps: bool,
105 ) -> Result<()> {
106 self.chunk_with_iter(
107 WalkDir::new(files_path).into_iter().flatten(),
108 read_cache,
109 include_data_maps,
110 )
111 }
112
    /// Chunks the given entries without reading the artifacts cache, then
    /// returns every (xorname, chunk path) pair produced.
    ///
    /// `make_files_public` is forwarded as `include_data_maps`, so public
    /// files also get their data map emitted as a chunk.
    pub(crate) fn already_put_chunks(
        &mut self,
        entries_iter: impl Iterator<Item = DirEntry>,
        make_files_public: bool,
    ) -> Result<Vec<(XorName, PathBuf)>> {
        // read_cache = false: always perform a fresh chunking pass here.
        self.chunk_with_iter(entries_iter, false, make_files_public)?;
        Ok(self.get_chunks())
    }
123
124 pub fn chunk_with_iter(
128 &mut self,
129 entries_iter: impl Iterator<Item = DirEntry>,
130 read_cache: bool,
131 include_data_maps: bool,
132 ) -> Result<()> {
133 let now = Instant::now();
134 self.files_to_chunk = Default::default();
136 self.chunks = Default::default();
137 self.completed_files = Default::default();
138 self.resumed_chunk_count = 0;
139 self.resumed_files_count = 0;
140
141 entries_iter.for_each(|entry| {
143 if entry.file_type().is_file() {
144 let path_xor = PathXorName::new(entry.path());
145 info!(
146 "Added file {:?} with path_xor: {path_xor:?} to be chunked/resumed",
147 entry.path()
148 );
149 self.files_to_chunk.push((
150 entry.file_name().to_owned(),
151 path_xor,
152 entry.into_path(),
153 ));
154 }
155 });
156 let total_files = self.files_to_chunk.len();
157
158 if total_files == 0 {
159 return Ok(());
160 };
161
162 if read_cache {
164 self.resume_path();
165 }
166
167 self.resumed_chunk_count = self
169 .chunks
170 .values()
171 .flat_map(|chunked_file| &chunked_file.chunks)
172 .count();
173 self.resumed_files_count = self.chunks.keys().collect::<BTreeSet<_>>().len();
175
176 {
178 let path_xors = self.chunks.keys().collect::<BTreeSet<_>>();
179 self.files_to_chunk
180 .retain(|(_, path_xor, _)| !path_xors.contains(path_xor));
181 }
182
183 {
185 let completed_files = self.chunks.iter().filter_map(|(_, chunked_file)| {
186 if chunked_file.chunks.is_empty() {
187 Some((
188 chunked_file.file_path.clone(),
189 chunked_file.file_name.clone(),
190 chunked_file.head_chunk_address,
191 ))
192 } else {
193 None
194 }
195 });
196
197 self.completed_files.extend(completed_files);
198 }
199
200 if self.files_to_chunk.is_empty() {
202 debug!(
203 "All files_to_chunk ({total_files:?}) were resumed. Returning the resumed chunks.",
204 );
205 debug!("It took {:?} to resume all the files", now.elapsed());
206 return Ok(());
207 }
208
209 let progress_bar = get_progress_bar(total_files as u64)?;
210 progress_bar.println(format!("Chunking {total_files} files..."));
211
212 let artifacts_dir = &self.artifacts_dir.clone();
213 let chunked_files = self.files_to_chunk
214 .par_iter()
215 .map(|(original_file_name, path_xor, path)| {
216 let file_chunks_dir = {
217 let file_chunks_dir = artifacts_dir.join(&path_xor.0);
218 fs::create_dir_all(&file_chunks_dir).map_err(|err| {
219 error!("Failed to create folder {file_chunks_dir:?} for SE chunks with error {err:?}!");
220 eyre!("Failed to create dir {file_chunks_dir:?} for SE chunks with error {err:?}")
221 })?;
222 file_chunks_dir
223 };
224
225 match FilesApi::chunk_file(path, &file_chunks_dir, include_data_maps) {
226 Ok((head_chunk_address, data_map, size, chunks)) => {
227 progress_bar.clone().inc(1);
228 debug!("Chunked {original_file_name:?} with {path_xor:?} into file's XorName: {head_chunk_address:?} of size {size}, and chunks len: {}", chunks.len());
229
230 let chunked_file = ChunkedFile {
231 head_chunk_address,
232 file_path: path.to_owned(),
233 file_name: original_file_name.clone(),
234 chunks: chunks.into_iter().collect(),
235 data_map
236 };
237 Ok((path_xor.clone(), chunked_file))
238 }
239 Err(err) => {
240 println!("Failed to chunk file {path:?}/{path_xor:?} with err: {err:?}");
241 error!("Failed to chunk file {path:?}/{path_xor:?} with err: {err:?}");
242 Err(eyre!("Failed to chunk file {path:?}/{path_xor:?} with err: {err:?}"))
243 }
244 }
245 })
246 .collect::<Result<BTreeMap<_, _>>>()?;
247 debug!(
248 "Out of total files_to_chunk {total_files}, we have resumed {} files and chunked {} files",
249 self.resumed_files_count,
250 chunked_files.len()
251 );
252
253 if chunked_files.is_empty() && self.chunks.is_empty() {
257 bail!(
258 "The provided path does not contain any file. Please check your path!\nExiting..."
259 );
260 }
261
262 chunked_files
264 .par_iter()
265 .map(|(path_xor, chunked_file)| {
266 let metadata_path = artifacts_dir.join(&path_xor.0).join(METADATA_FILE);
267
268 info!("Metadata path is: {metadata_path:?}");
269 let metadata = rmp_serde::to_vec(&(
270 chunked_file.head_chunk_address,
271 chunked_file.data_map.clone(),
272 ))
273 .map_err(|_| {
274 error!("Failed to serialize file_xor_addr for writing metadata");
275 eyre!("Failed to serialize file_xor_addr for writing metadata")
276 })?;
277
278 let mut metadata_file = File::create(&metadata_path).map_err(|_| {
279 error!("Failed to create metadata_path {metadata_path:?} for {path_xor:?}");
280 eyre!("Failed to create metadata_path {metadata_path:?} for {path_xor:?}")
281 })?;
282
283 metadata_file.write_all(&metadata).map_err(|_| {
284 error!("Failed to write metadata to {metadata_path:?} for {path_xor:?}");
285 eyre!("Failed to write metadata to {metadata_path:?} for {path_xor:?}")
286 })?;
287
288 debug!("Wrote metadata for {path_xor:?}");
289 Ok(())
290 })
291 .collect::<Result<()>>()?;
292
293 progress_bar.finish_and_clear();
294 debug!("It took {:?} to chunk {} files", now.elapsed(), total_files);
295 self.chunks.extend(chunked_files);
296
297 Ok(())
298 }
299
300 fn resume_path(&mut self) {
302 let artifacts_dir = self.artifacts_dir.clone();
303 let resumed = self
304 .files_to_chunk
305 .par_iter()
306 .filter_map(|(original_file_name, path_xor, original_file_path)| {
307 let file_chunks_dir = artifacts_dir.join(&path_xor.0);
309 if !file_chunks_dir.exists() {
310 return None;
311 }
312 Self::read_file_chunks_dir(
313 file_chunks_dir,
314 path_xor,
315 original_file_path.clone(),
316 original_file_name.clone(),
317 )
318 })
319 .collect::<BTreeMap<_, _>>();
320
321 self.chunks.extend(resumed);
322 }
323
324 pub fn get_chunks(&self) -> Vec<(XorName, PathBuf)> {
327 self.chunks
328 .values()
329 .flat_map(|chunked_file| &chunked_file.chunks)
330 .cloned()
331 .collect::<Vec<(XorName, PathBuf)>>()
332 }
333
334 pub fn is_chunks_empty(&self) -> bool {
335 self.chunks
336 .values()
337 .flat_map(|chunked_file| &chunked_file.chunks)
338 .next()
339 .is_none()
340 }
341
342 pub fn mark_completed_all(&mut self) -> Result<()> {
345 let all_chunks = self
346 .chunks
347 .values()
348 .flat_map(|chunked_file| &chunked_file.chunks)
349 .map(|(chunk, _)| *chunk)
350 .collect::<Vec<_>>();
351 self.mark_completed(all_chunks.into_iter())
352 }
353
    /// Marks the given chunks as uploaded: deletes their cached chunk files,
    /// drops them from each file's pending set, and for every file that
    /// becomes fully complete, records it in `completed_files` and writes an
    /// `UploadedFile` record under the root dir.
    ///
    /// # Errors
    /// Fails if a cached chunk file cannot be removed from disk.
    pub fn mark_completed(&mut self, chunks: impl Iterator<Item = XorName>) -> Result<()> {
        let set_of_completed_chunks = chunks.collect::<BTreeSet<_>>();
        trace!("marking as completed: {set_of_completed_chunks:?}");

        // First pass: delete the on-disk chunk files, in parallel.
        self.chunks
            .par_iter()
            .flat_map(|(_, chunked_file)| &chunked_file.chunks)
            .map(|(chunk_xor, chunk_path)| {
                if set_of_completed_chunks.contains(chunk_xor) {
                    debug!("removing {chunk_xor:?} at {chunk_path:?} as it is marked as completed");
                    fs::remove_file(chunk_path).map_err(|_err| {
                        error!("Failed to remove SE chunk {chunk_xor} from {chunk_path:?}");
                        eyre!("Failed to remove SE chunk {chunk_xor} from {chunk_path:?}")
                    })?;
                }
                Ok(())
            })
            .collect::<Result<()>>()?;

        // Second pass: drop completed chunks from the in-memory sets and note
        // which files have no pending chunks left.
        let mut entire_file_is_done = BTreeSet::new();
        self.chunks.iter_mut().for_each(|(path_xor, chunked_file)| {
            chunked_file
                .chunks
                .retain(|(chunk_xor, _)| !set_of_completed_chunks.contains(chunk_xor));
            if chunked_file.chunks.is_empty() {
                entire_file_is_done.insert(path_xor.clone());
            }
        });

        // Third pass: promote fully-done files to `completed_files` and
        // persist their UploadedFile record (best-effort: the write result is
        // deliberately ignored).
        for path_xor in &entire_file_is_done {
            if let Some(chunked_file) = self.chunks.remove(path_xor) {
                trace!("removed {path_xor:?} from chunks list");

                self.completed_files.push((
                    chunked_file.file_path.clone(),
                    chunked_file.file_name.clone(),
                    chunked_file.head_chunk_address,
                ));

                let uploaded_file_metadata = UploadedFile {
                    filename: chunked_file.file_name,
                    data_map: Some(chunked_file.data_map.value),
                };
                let _result =
                    uploaded_file_metadata.write(&self.root_dir, &chunked_file.head_chunk_address);
            }
        }
        Ok(())

    }
431
    /// Returns the fully uploaded files as (path, file name, head chunk
    /// address) tuples.
    pub(crate) fn completed_files(&self) -> &Vec<(PathBuf, OsString, ChunkAddress)> {
        &self.completed_files
    }
437
438 pub(crate) fn incomplete_files(&self) -> Vec<(&PathBuf, &OsString, &ChunkAddress)> {
440 self.chunks
441 .values()
442 .map(|chunked_file| {
443 (
444 &chunked_file.file_path,
445 &chunked_file.file_name,
446 &chunked_file.head_chunk_address,
447 )
448 })
449 .collect()
450 }
451
452 pub(crate) fn iter_chunked_files(&mut self) -> impl Iterator<Item = &ChunkedFile> {
454 self.chunks.values()
455 }
456
    /// Rebuilds a `ChunkedFile` from a previously written artifacts folder.
    ///
    /// Scans `file_chunks_dir` for chunk files (named by their hex-encoded
    /// xorname) and the metadata file. Returns `None` when the metadata file
    /// is missing or unreadable, which makes the caller re-chunk the file.
    fn read_file_chunks_dir(
        file_chunks_dir: PathBuf,
        path_xor: &PathXorName,
        original_file_path: PathBuf,
        original_file_name: OsString,
    ) -> Option<(PathXorName, ChunkedFile)> {
        let mut file_chunk_address: Option<ChunkAddress> = None;
        // Default to an empty data map; overwritten if the metadata holds one.
        let mut data_map = Chunk::new(Bytes::new());
        debug!("Trying to resume {path_xor:?} as the file_chunks_dir exists");

        let chunks = WalkDir::new(file_chunks_dir.clone())
            .into_iter()
            .flatten()
            .filter_map(|entry| {
                if !entry.file_type().is_file() {
                    return None;
                }
                if entry.file_name() == METADATA_FILE {
                    // The metadata file is not a chunk: capture its contents
                    // (side effect on the two locals above) and skip it.
                    if let Some((address, optional_data_map)) =
                        Self::try_read_metadata(entry.path())
                    {
                        file_chunk_address = Some(address);
                        data_map = optional_data_map;
                        debug!("Obtained metadata for {path_xor:?}");
                    } else {
                        error!("Could not read metadata for {path_xor:?}");
                    }
                    return None;
                }

                // Any other file should be a chunk named by its hex xorname;
                // files that don't decode are silently skipped.
                if let Some(file_name) = entry.file_name().to_str() {
                    Self::hex_decode_xorname(file_name)
                        .map(|chunk_xorname| (chunk_xorname, entry.into_path()))
                } else {
                    error!(
                        "Failed to convert OsString to str for {:?}",
                        entry.file_name()
                    );
                    None
                }
            })
            .collect::<BTreeSet<_>>();

        match file_chunk_address {
            Some(head_chunk_address) => {
                debug!("Resuming {} chunks for file {original_file_name:?} and with file_xor_addr {head_chunk_address:?}/{path_xor:?}", chunks.len());

                Some((
                    path_xor.clone(),
                    ChunkedFile {
                        file_path: original_file_path,
                        file_name: original_file_name,
                        head_chunk_address,
                        chunks,
                        data_map,
                    },
                ))
            }
            _ => {
                // No usable metadata => treat the whole folder as unusable.
                error!("Metadata file or data map was not present for {path_xor:?}");
                None
            }
        }
    }
529
530 fn try_read_metadata(path: &Path) -> Option<(ChunkAddress, Chunk)> {
533 let metadata = fs::read(path)
534 .map_err(|err| error!("Failed to read metadata with err {err:?}"))
535 .ok()?;
536 let metadata: (ChunkAddress, Chunk) = rmp_serde::from_slice(&metadata)
538 .map_err(|err| error!("Failed to deserialize metadata with err {err:?}"))
539 .ok()?;
540
541 Some(metadata)
542 }
543
544 fn hex_decode_xorname(string: &str) -> Option<XorName> {
546 let hex_decoded = hex::decode(string)
547 .map_err(|err| error!("Failed to decode {string} into bytes with err {err:?}"))
548 .ok()?;
549 let decoded_xorname: [u8; xor_name::XOR_NAME_LEN] = hex_decoded
550 .try_into()
551 .map_err(|_| error!("Failed to convert hex_decoded xorname into an [u8; 32]"))
552 .ok()?;
553 Some(XorName(decoded_xorname))
554 }
555}
556
557#[cfg(test)]
558mod tests {
559 use super::*;
560 use color_eyre::{eyre::eyre, Result};
561 use rand::{thread_rng, Rng};
562 use rayon::prelude::IntoParallelIterator;
563 use sn_logging::LogBuilder;
564 use tempfile::TempDir;
565
566 pub fn assert_list_eq<I, J, K>(a: I, b: J)
568 where
569 K: Eq + Clone,
570 I: IntoIterator<Item = K>,
571 J: IntoIterator<Item = K>,
572 {
573 let vec1: Vec<_> = a.into_iter().collect::<Vec<_>>();
574 let mut vec2: Vec<_> = b.into_iter().collect();
575
576 assert_eq!(vec1.len(), vec2.len());
577
578 for item1 in &vec1 {
579 let idx2 = vec2
580 .iter()
581 .position(|item2| item1 == item2)
582 .expect("Item not found in second list");
583
584 vec2.swap_remove(idx2);
585 }
586
587 assert_eq!(vec2.len(), 0);
588 }
589
    /// Chunks one 1 MiB file with data maps included and verifies the
    /// artifacts dir layout: one per-file folder, one metadata file, and one
    /// on-disk chunk per entry in the manager's chunk set.
    #[test]
    fn chunked_files_should_be_written_to_artifacts_dir() -> Result<()> {
        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
        let (_tmp_dir, mut manager, _, random_files_dir) = init_manager()?;
        let artifacts_dir = manager.artifacts_dir.clone();
        let _ = create_random_files(&random_files_dir, 1, 1)?;
        manager.chunk_path(&random_files_dir, true, true)?;

        let chunks = manager.get_chunks();
        // 3 content chunks + 1 data map chunk for a 1 MiB file.
        assert_eq!(chunks.len(), 4);

        // Exactly one per-file artifacts folder should exist.
        let n_folders = WalkDir::new(&artifacts_dir)
            .into_iter()
            .flatten()
            .filter(|entry| entry.file_type().is_dir() && entry.path() != artifacts_dir)
            .count();
        assert_eq!(n_folders, 1);

        // Chunk files + the metadata file.
        let n_files = WalkDir::new(&artifacts_dir)
            .into_iter()
            .flatten()
            .filter(|entry| {
                info!("direntry {entry:?}");
                entry.file_type().is_file()
            })
            .count();
        assert_eq!(n_files, chunks.len() + 1);

        // The metadata file's head chunk address must match the in-memory one.
        let mut file_xor_addr_from_metadata = None;
        for entry in WalkDir::new(&artifacts_dir).into_iter().flatten() {
            if entry.file_type().is_file() && entry.file_name() == METADATA_FILE {
                let metadata = ChunkManager::try_read_metadata(entry.path());

                if let Some((head_chunk_addr, _datamap)) = metadata {
                    file_xor_addr_from_metadata = Some(head_chunk_addr);
                }
            }
        }
        let file_xor_addr_from_metadata =
            file_xor_addr_from_metadata.expect("The metadata file should be present");
        let file_xor_addr = manager
            .chunks
            .values()
            .next()
            .expect("1 file should be present")
            .head_chunk_address;
        assert_eq!(file_xor_addr_from_metadata, file_xor_addr);

        // Every on-disk chunk file name must decode to a tracked xorname.
        let chunk_xornames = manager
            .chunks
            .values()
            .next()
            .expect("We must have 1 file here")
            .chunks
            .iter()
            .map(|(xor_name, _)| *xor_name)
            .collect::<BTreeSet<_>>();
        for entry in WalkDir::new(&artifacts_dir).into_iter().flatten() {
            let file_name = entry.file_name();
            if entry.file_type().is_file() && file_name != METADATA_FILE {
                let chunk_xorname_from_filename =
                    ChunkManager::hex_decode_xorname(file_name.to_str().unwrap())
                        .expect("Failed to get xorname from hex encoded file_name");
                assert!(chunk_xornames.contains(&chunk_xorname_from_filename));
            }
        }

        Ok(())
    }
664
    /// Same layout checks as above, but with `include_data_maps = false`:
    /// only 3 content chunks are produced (no data map chunk).
    #[test]
    fn no_datamap_chunked_files_should_be_written_to_artifacts_dir_when_not_public() -> Result<()> {
        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
        let (_tmp_dir, mut manager, _, random_files_dir) = init_manager()?;
        let artifacts_dir = manager.artifacts_dir.clone();
        let _ = create_random_files(&random_files_dir, 1, 1)?;

        manager.chunk_path(&random_files_dir, true, false)?;

        let chunks = manager.get_chunks();
        // No data map chunk when the file is not public.
        assert_eq!(chunks.len(), 3);

        let n_folders = WalkDir::new(&artifacts_dir)
            .into_iter()
            .flatten()
            .filter(|entry| entry.file_type().is_dir() && entry.path() != artifacts_dir)
            .count();
        assert_eq!(n_folders, 1);

        let n_files = WalkDir::new(&artifacts_dir)
            .into_iter()
            .flatten()
            .filter(|entry| {
                info!("direntry {entry:?}");
                entry.file_type().is_file()
            })
            .count();
        assert_eq!(n_files, chunks.len() + 1);

        let mut file_xor_addr_from_metadata = None;
        for entry in WalkDir::new(&artifacts_dir).into_iter().flatten() {
            if entry.file_type().is_file() && entry.file_name() == METADATA_FILE {
                let metadata = ChunkManager::try_read_metadata(entry.path());

                if let Some((head_chunk_addr, _datamap)) = metadata {
                    file_xor_addr_from_metadata = Some(head_chunk_addr);
                }
            }
        }
        let file_xor_addr_from_metadata =
            file_xor_addr_from_metadata.expect("The metadata file should be present");
        let file_xor_addr = manager
            .chunks
            .values()
            .next()
            .expect("1 file should be present")
            .head_chunk_address;
        assert_eq!(file_xor_addr_from_metadata, file_xor_addr);

        let chunk_xornames = manager
            .chunks
            .values()
            .next()
            .expect("We must have 1 file here")
            .chunks
            .iter()
            .map(|(xor_name, _)| *xor_name)
            .collect::<BTreeSet<_>>();
        for entry in WalkDir::new(&artifacts_dir).into_iter().flatten() {
            let file_name = entry.file_name();
            if entry.file_type().is_file() && file_name != METADATA_FILE {
                let chunk_xorname_from_filename =
                    ChunkManager::hex_decode_xorname(file_name.to_str().unwrap())
                        .expect("Failed to get xorname from hex encoded file_name");
                assert!(chunk_xornames.contains(&chunk_xorname_from_filename));
            }
        }

        Ok(())
    }
741
    /// Marks a single chunk as completed and verifies it is removed both from
    /// memory and from disk, while the file itself stays incomplete.
    #[test]
    fn chunks_should_be_removed_from_artifacts_dir_if_marked_as_completed() -> Result<()> {
        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
        let (_tmp_dir, mut manager, _, random_files_dir) = init_manager()?;

        let _ = create_random_files(&random_files_dir, 1, 1)?;
        manager.chunk_path(&random_files_dir, true, true)?;

        let path_xor = manager.chunks.keys().next().unwrap().clone();
        let chunked_file = manager.chunks.values().next().unwrap().clone();
        let file_xor_addr = chunked_file.head_chunk_address;
        let (chunk, _) = chunked_file
            .chunks
            .first()
            .expect("Must contain 1 chunk")
            .clone();
        let total_chunks = manager.chunks.values().next().unwrap().chunks.len();
        manager.mark_completed(vec![chunk].into_iter())?;

        // One chunk gone from the in-memory pending set.
        assert_eq!(
            manager
                .chunks
                .values()
                .next()
                .expect("Since the file was not fully completed, it should be present")
                .chunks
                .len(),
            total_chunks - 1,
        );

        // Re-reading the artifacts folder must agree with the in-memory view.
        let file_chunks_dir = manager.artifacts_dir.join(&path_xor.0);
        let (path_xor_from_dir, chunked_file_from_dir) = ChunkManager::read_file_chunks_dir(
            file_chunks_dir,
            &path_xor,
            chunked_file.file_path,
            chunked_file.file_name,
        )
        .expect("Folder and metadata should be present");
        assert_eq!(chunked_file_from_dir.chunks.len(), total_chunks - 1);
        assert_eq!(chunked_file_from_dir.head_chunk_address, file_xor_addr);
        assert_eq!(path_xor_from_dir, path_xor);

        // The file still has pending chunks, so it is not completed yet.
        assert!(manager.completed_files.is_empty());

        Ok(())
    }
791
    /// Marks every chunk of 5 files completed and verifies the artifact
    /// folders (with their metadata files) survive, resuming to zero chunks.
    #[test]
    fn marking_all_chunks_as_completed_should_not_remove_the_dir() -> Result<()> {
        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
        let (_tmp_dir, mut manager, _, random_files_dir) = init_manager()?;

        let _ = create_random_files(&random_files_dir, 5, 5)?;
        manager.chunk_path(&random_files_dir, true, true)?;
        // Keep a pre-completion snapshot to compare the dirs against.
        let manager_clone = manager.clone();

        let n_folders = WalkDir::new(&manager.artifacts_dir)
            .into_iter()
            .flatten()
            .filter(|entry| entry.file_type().is_dir() && entry.path() != manager.artifacts_dir)
            .count();
        assert_eq!(n_folders, 5);

        manager.mark_completed_all()?;

        assert_eq!(manager.completed_files.len(), 5);

        // Each folder still resumes: 0 chunks left, metadata intact.
        for (path_xor, chunked_file) in manager_clone.chunks.iter() {
            let file_chunks_dir = manager_clone.artifacts_dir.join(path_xor.0.clone());
            let (path_xor_from_dir, chunked_file_from_dir) = ChunkManager::read_file_chunks_dir(
                file_chunks_dir,
                path_xor,
                chunked_file.file_path.clone(),
                chunked_file.file_name.to_owned(),
            )
            .expect("Folder and metadata should be present");
            assert_eq!(chunked_file_from_dir.chunks.len(), 0);
            assert_eq!(
                chunked_file_from_dir.head_chunk_address,
                chunked_file.head_chunk_address
            );
            assert_eq!(&path_xor_from_dir, path_xor);
        }

        Ok(())
    }
834
    /// With nothing marked completed, a fresh manager over the same root must
    /// resume every chunk and end up with identical state.
    #[test]
    fn mark_none_and_resume() -> Result<()> {
        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
        let (_tmp_dir, mut manager, root_dir, random_files_dir) = init_manager()?;

        let _ = create_random_files(&random_files_dir, 5, 5)?;
        manager.chunk_path(&random_files_dir, true, true)?;

        let mut new_manager = ChunkManager::new(&root_dir);
        new_manager.chunk_path(&random_files_dir, true, true)?;

        // The original run chunked from scratch; the second run resumed all.
        let total_chunk_count = manager
            .chunks
            .values()
            .flat_map(|chunked_file| &chunked_file.chunks)
            .count();
        assert_eq!(manager.resumed_chunk_count, 0);
        assert_eq!(new_manager.resumed_chunk_count, total_chunk_count);

        assert_eq!(manager.chunks, new_manager.chunks);
        assert_eq!(manager.completed_files, new_manager.completed_files);

        Ok(())
    }
861
    /// Marks one chunk completed, then verifies a resuming manager picks up
    /// exactly one fewer chunk and no completed files.
    #[test]
    fn mark_one_chunk_and_resume() -> Result<()> {
        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
        let (_tmp_dir, mut manager, root_dir, random_files_dir) = init_manager()?;

        let _ = create_random_files(&random_files_dir, 5, 5)?;
        manager.chunk_path(&random_files_dir, true, true)?;

        let total_chunks_count = manager
            .chunks
            .values()
            .flat_map(|chunked_file| &chunked_file.chunks)
            .count();

        // Complete a single arbitrary chunk.
        let removed_chunk = manager
            .chunks
            .values()
            .next()
            .expect("Atleast 1 file should be present")
            .chunks
            .iter()
            .next()
            .expect("Chunk should be present")
            .0;
        manager.mark_completed([removed_chunk].into_iter())?;
        let mut new_manager = ChunkManager::new(&root_dir);
        new_manager.chunk_path(&random_files_dir, true, true)?;

        assert_eq!(manager.resumed_chunk_count, 0);
        assert_eq!(new_manager.resumed_chunk_count, total_chunks_count - 1);
        assert_eq!(
            new_manager
                .chunks
                .values()
                .flat_map(|chunked_file| &chunked_file.chunks)
                .count(),
            total_chunks_count - 1
        );

        // No file is fully done after a single chunk completion.
        assert_eq!(new_manager.completed_files.len(), 0);

        Ok(())
    }
909
    /// Completes everything, then verifies a resuming manager sees all 5
    /// files as completed with zero pending chunks.
    #[test]
    fn mark_all_and_resume() -> Result<()> {
        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
        let (_tmp_dir, mut manager, root_dir, random_files_dir) = init_manager()?;

        let _ = create_random_files(&random_files_dir, 5, 5)?;
        manager.chunk_path(&random_files_dir, true, true)?;
        manager.mark_completed_all()?;

        let mut new_manager = ChunkManager::new(&root_dir);
        new_manager.chunk_path(&random_files_dir, true, true)?;

        // The 5 files resume with empty chunk sets…
        assert_eq!(new_manager.chunks.len(), 5);
        assert_eq!(
            new_manager
                .chunks
                .values()
                .flat_map(|chunked_file| &chunked_file.chunks)
                .count(),
            0
        );
        // …so no individual chunk counts as "resumed"…
        assert_eq!(new_manager.resumed_chunk_count, 0);

        // …and all 5 files land directly in completed_files.
        assert_eq!(new_manager.completed_files.len(), 5);

        Ok(())
    }
940
    /// Deleting the metadata file must invalidate the cache: the next run
    /// re-chunks the file from scratch, producing the same chunk set and
    /// re-creating the metadata file.
    #[test]
    fn absence_of_metadata_file_should_re_chunk_the_entire_file() -> Result<()> {
        let _log_guards = LogBuilder::init_single_threaded_tokio_test("chunk_manager", true);
        let (_tmp_dir, mut manager, _root_dir, random_files_dir) = init_manager()?;

        let mut random_files = create_random_files(&random_files_dir, 1, 1)?;
        let random_file = random_files.remove(0);
        manager.chunk_path(&random_files_dir, true, true)?;

        // Snapshot the chunk xornames produced by the first run.
        let mut old_chunks_list = BTreeSet::new();
        for entry in WalkDir::new(&manager.artifacts_dir).into_iter().flatten() {
            let file_name = entry.file_name();
            if entry.file_type().is_file() && file_name != METADATA_FILE {
                let chunk_xorname_from_filename =
                    ChunkManager::hex_decode_xorname(file_name.to_str().unwrap())
                        .expect("Failed to get xorname from hex encoded file_name");
                old_chunks_list.insert(chunk_xorname_from_filename);
            }
        }

        // Sabotage the cache by removing the metadata file.
        let path_xor = PathXorName::new(&random_file);
        let metadata_path = manager.artifacts_dir.join(path_xor.0).join(METADATA_FILE);
        fs::remove_file(&metadata_path)?;

        manager.chunk_path(&random_files_dir, true, true)?;
        // Nothing could be resumed without metadata.
        assert_eq!(manager.resumed_chunk_count, 0);
        assert_eq!(
            manager.get_chunks().len(),
            4,
            "we have correct chunk len including data_map"
        );
        // The metadata file was written again by the re-chunk.
        assert!(metadata_path.exists());

        // Chunking is deterministic, so the chunk set is unchanged.
        let mut new_chunks_list = BTreeSet::new();
        for entry in WalkDir::new(&manager.artifacts_dir).into_iter().flatten() {
            let file_name = entry.file_name();
            if entry.file_type().is_file() && file_name != METADATA_FILE {
                let chunk_xorname_from_filename =
                    ChunkManager::hex_decode_xorname(file_name.to_str().unwrap())
                        .expect("Failed to get xorname from hex encoded file_name");
                new_chunks_list.insert(chunk_xorname_from_filename);
            }
        }
        assert_list_eq(new_chunks_list, old_chunks_list);

        Ok(())
    }
993
994 fn init_manager() -> Result<(TempDir, ChunkManager, PathBuf, PathBuf)> {
995 let tmp_dir = tempfile::tempdir()?;
996 let random_files_dir = tmp_dir.path().join("random_files");
997 let root_dir = tmp_dir.path().join("root_dir");
998 fs::create_dir_all(&random_files_dir)?;
999 fs::create_dir_all(&root_dir)?;
1000 let manager = ChunkManager::new(&root_dir);
1001
1002 Ok((tmp_dir, manager, root_dir, random_files_dir))
1003 }
1004
1005 fn create_random_files(
1006 at: &Path,
1007 num_files: usize,
1008 mb_per_file: usize,
1009 ) -> Result<Vec<PathBuf>> {
1010 let files = (0..num_files)
1011 .into_par_iter()
1012 .filter_map(|i| {
1013 let mut path = at.to_path_buf();
1014 path.push(format!("random_file_{i}"));
1015 match generate_file(&path, mb_per_file) {
1016 Ok(_) => Some(path),
1017 Err(err) => {
1018 error!("Failed to generate random file with {err:?}");
1019 None
1020 }
1021 }
1022 })
1023 .collect::<Vec<_>>();
1024 if files.len() < num_files {
1025 return Err(eyre!("Failed to create a Failedkk"));
1026 }
1027 Ok(files)
1028 }
1029
1030 fn generate_file(path: &PathBuf, file_size_mb: usize) -> Result<()> {
1031 let mut file = File::create(path)?;
1032 let mut rng = thread_rng();
1033
1034 let n_small_chunks = file_size_mb * 1024 * 32;
1036 for _ in 0..n_small_chunks {
1037 let random_data: [u8; 32] = rng.gen();
1038 file.write_all(&random_data)?;
1039 }
1040 let size = file.metadata()?.len() as f64 / (1024 * 1024) as f64;
1041 assert_eq!(file_size_mb as f64, size);
1042
1043 Ok(())
1044 }
1045}