use std::cell::RefCell;
use std::cmp::{max, min, Reverse};
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::env::{args_os, current_dir};
use std::ffi::{OsStr, OsString};
use std::fmt::Debug;
use std::fs::File;
use std::hash::Hash;
use std::io;
use std::io::BufWriter;
use std::iter::FromIterator;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use chrono::{DateTime, Local};
use console::Term;
use crossbeam_utils::thread;
use indexmap::IndexMap;
use itertools::Itertools;
use rayon::prelude::*;
use serde::*;
use smallvec::SmallVec;
use thread_local::ThreadLocal;
use crate::arg::Arg;
use crate::config::*;
use crate::device::{DiskDevice, DiskDevices};
use crate::error::Error;
use crate::file::*;
use crate::hasher::FileHasher;
use crate::log::{Log, LogExt, ProgressBarLength};
use crate::path::Path;
use crate::phase::{Phase, Phases};
use crate::report::{FileStats, ReportHeader, ReportWriter};
use crate::rlimit::RLIMIT_OPEN_FILES;
use crate::selector::PathSelector;
use crate::semaphore::Semaphore;
use crate::walk::Walk;
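/// Groups items by the key computed by `split_fn`.
/// After all items have been added, the structure can be turned into an
/// iterator over the groups, ordered by key.
/// Within each group, items keep their insertion order.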
struct GroupMap<T, K, V, F>
where
    K: Eq + Hash + Ord,
F: Fn(T) -> (K, V),
{
item_type: PhantomData<T>,
groups: BTreeMap<K, SmallVec<[V; 1]>>,
split_fn: F,
}
impl<T, K, V, F> GroupMap<T, K, V, F>
where
K: Eq + Hash + Ord,
F: Fn(T) -> (K, V),
{
pub fn new(split_fn: F) -> GroupMap<T, K, V, F> {
GroupMap {
item_type: PhantomData,
groups: BTreeMap::new(),
split_fn,
}
}
pub fn add(&mut self, item: T) {
let (key, new_item) = (self.split_fn)(item);
self.groups.entry(key).or_default().push(new_item);
}
}
impl<T, K, V, F> IntoIterator for GroupMap<T, K, V, F>
where
    K: Eq + Hash + Ord,
F: Fn(T) -> (K, V),
{
type Item = (K, SmallVec<[V; 1]>);
type IntoIter = <BTreeMap<K, SmallVec<[V; 1]>> as IntoIterator>::IntoIter;
fn into_iter(self) -> Self::IntoIter {
self.groups.into_iter()
}
}
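/// Holds the configuration and shared state used by all file grouping stages.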
struct GroupCtx<'a> {
pub config: &'a GroupConfig,
pub log: &'a dyn Log,
phases: Phases,
group_filter: FileGroupFilter,
devices: DiskDevices,
path_selector: PathSelector,
hasher: FileHasher<'a>,
}
impl<'a> GroupCtx<'a> {
pub fn new(config: &'a GroupConfig, log: &'a dyn Log) -> Result<GroupCtx<'a>, Error> {
let phases = if config.transform.is_some() {
Phases::new(vec![
Phase::Walk,
Phase::FetchExtents,
Phase::TransformAndGroup,
])
} else {
Phases::new(vec![
Phase::Walk,
Phase::GroupBySize,
Phase::FetchExtents,
Phase::GroupByPrefix,
Phase::GroupBySuffix,
Phase::GroupByContents,
])
};
let thread_pool_sizes = config.thread_pool_sizes();
let devices = DiskDevices::new(&thread_pool_sizes);
let transform = match config.transform() {
None => None,
Some(Ok(transform)) => Some(transform),
Some(Err(e)) => return Err(Error::new(format!("Invalid transform: {e}"))),
};
let base_dir = Path::from(current_dir().unwrap_or_default());
let group_filter = config.group_filter();
        let path_selector = config
            .path_selector(&base_dir)
            .map_err(|e| Error::new(format!("Invalid pattern: {e}")))?;
let hasher = if config.cache {
FileHasher::new_cached(config.hash_fn, transform, log)?
} else {
FileHasher::new(config.hash_fn, transform, log)
};
Self::check_pool_config(thread_pool_sizes, &devices)?;
Ok(GroupCtx {
config,
log,
phases,
group_filter,
devices,
path_selector,
hasher,
})
}
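    /// Verifies that every thread pool name in the configuration refers to a
    /// known pool ("main", "default", a device type) or, when given as
    /// `dev:<name>`, to an existing device.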
fn check_pool_config(
thread_pool_sizes: HashMap<OsString, Parallelism>,
devices: &DiskDevices,
) -> Result<(), Error> {
let mut allowed_pool_names = DiskDevices::device_types();
allowed_pool_names.push("main");
allowed_pool_names.push("default");
        for name in thread_pool_sizes.keys() {
let name = name.to_string_lossy();
match name.strip_prefix("dev:") {
Some(name) if devices.get_by_name(OsStr::new(name)).is_none() => {
return Err(Error::new(format!("Unknown device: {name}")));
}
None if !allowed_pool_names.contains(&name.as_ref()) => {
return Err(Error::new(format!(
"Unknown thread pool or device type: {name}"
)));
}
_ => {}
}
}
Ok(())
}
}
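/// A group of files that have something in common, e.g. the same length
/// or the same hash of contents, depending on the grouping stage.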
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)]
pub struct FileGroup<F> {
pub file_len: FileLen,
pub file_hash: FileHash,
pub files: Vec<F>,
}
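/// Controls the type of search.
/// `Underreplicated(n)` selects groups with fewer than `n` replicas
/// (e.g. `Underreplicated(2)` finds unique files);
/// `Overreplicated(n)` selects groups with more than `n` replicas
/// (e.g. `Overreplicated(1)` finds duplicates).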
#[derive(Debug)]
pub enum Replication {
Underreplicated(usize),
Overreplicated(usize),
}
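/// Determines which groups are reported and which files within a group count
/// as a single replica: files under the same root path, or files sharing the
/// same id (hard links) when `group_by_id` is set.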
#[derive(Debug)]
pub struct FileGroupFilter {
pub replication: Replication,
pub root_paths: Vec<Path>,
pub group_by_id: bool,
}
impl<F> FileGroup<F> {
pub fn file_count(&self) -> usize {
self.files.len()
}
pub fn total_size(&self) -> FileLen {
self.file_len * self.file_count() as u64
}
pub fn map<R>(self, f: impl Fn(F) -> R) -> FileGroup<R> {
FileGroup {
file_len: self.file_len,
file_hash: self.file_hash,
files: self.files.into_iter().map(f).collect(),
}
}
pub fn filter_map<R>(self, f: impl Fn(F) -> Option<R>) -> FileGroup<R> {
FileGroup {
file_len: self.file_len,
file_hash: self.file_hash,
files: self.files.into_iter().filter_map(f).collect(),
}
}
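    /// Maps each file with a fallible function.
    /// Succeeds only if all files map successfully; otherwise returns all errors.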
pub fn try_map_all<R: Debug, E: Debug>(
self,
f: impl Fn(F) -> Result<R, E>,
) -> Result<FileGroup<R>, Vec<E>> {
let (ok, err): (Vec<_>, Vec<_>) = self.files.into_iter().map(f).partition(Result::is_ok);
if err.is_empty() {
Ok(FileGroup {
file_len: self.file_len,
file_hash: self.file_hash,
files: ok.into_iter().map(Result::unwrap).collect(),
})
} else {
Err(err.into_iter().map(Result::unwrap_err).collect())
}
}
pub fn flat_map<R, I>(self, f: impl Fn(F) -> I) -> FileGroup<R>
where
I: IntoIterator<Item = R>,
{
FileGroup {
file_len: self.file_len,
file_hash: self.file_hash,
files: self.files.into_iter().flat_map(f).collect(),
}
}
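    /// Splits the group into one group per distinct key returned by `key_fn`.
    /// File length and hash are copied into each partition.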
pub fn partition_by_key<K: Eq + Hash>(self, key_fn: impl Fn(&F) -> K) -> Vec<FileGroup<F>> {
let mut groups = HashMap::new();
for f in self.files {
let key = key_fn(&f);
groups.entry(key).or_insert_with(Vec::new).push(f);
}
groups
.into_values()
.map(|files| FileGroup {
file_len: self.file_len,
file_hash: self.file_hash.clone(),
files,
})
.collect()
}
}
impl<F: AsRef<Path> + core::fmt::Debug> FileGroup<F> {
#[cfg(test)]
fn paths(&self) -> Vec<Path> {
self.files.iter().map(|f| f.as_ref().clone()).collect_vec()
}
}
impl<F: AsRef<FileId>> FileGroup<F> {
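    /// Returns the number of files with distinct ids, so that hard links
    /// to the same file are counted once.
    /// Only adjacent duplicates are collapsed, so call `sort_by_id` first.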
pub fn unique_count(&self) -> usize {
self.files
.iter()
.dedup_by(|f1, f2| FileId::of(f1) == FileId::of(f2))
.count()
}
pub fn unique_size(&self) -> FileLen {
self.file_len * self.unique_count() as u64
}
pub fn sort_by_id(&mut self) {
self.files.sort_by_key(|f| FileId::of(f));
}
}
impl<F: AsRef<Path> + AsRef<FileId>> FileGroup<F> {
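    /// Returns true if the group should be forwarded to the next grouping stage.
    /// Over-replicated groups are kept only while they have more subgroups than
    /// the allowed number of replicas; under-replicated groups are always kept,
    /// because even a single file can be missing replicas.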
pub fn matches(&self, filter: &FileGroupFilter) -> bool {
match filter.replication {
Replication::Overreplicated(rf) => self.subgroup_count(filter) > rf,
Replication::Underreplicated(_) => true,
}
}
pub fn matches_strictly(&self, filter: &FileGroupFilter) -> bool {
let count = self.subgroup_count(filter);
match filter.replication {
Replication::Overreplicated(rf) => count > rf,
Replication::Underreplicated(rf) => count < rf,
}
}
pub fn missing_count(&self, filter: &FileGroupFilter) -> usize {
match filter.replication {
Replication::Overreplicated(_) => 0,
Replication::Underreplicated(rf) => rf.saturating_sub(self.subgroup_count(filter)),
}
}
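    /// Returns the number of files exceeding the allowed replication factor.
    /// Files under the same root path (or hard-linked, when grouping by id)
    /// count as a single replica.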
pub fn redundant_count(&self, filter: &FileGroupFilter) -> usize {
match filter.replication {
Replication::Underreplicated(_) => 0,
Replication::Overreplicated(rf) => {
let rf = max(rf, 1);
if filter.root_paths.is_empty() {
self.file_count().saturating_sub(rf)
} else {
let sub_groups =
FileSubGroup::group(&self.files, &filter.root_paths, filter.group_by_id);
let sub_group_lengths = sub_groups
.into_iter()
.map(|sg| sg.files.len())
.collect_vec();
let cutoff_index = min(rf, sub_group_lengths.len());
sub_group_lengths[cutoff_index..].iter().sum()
}
}
}
}
pub fn reported_count(&self, filter: &FileGroupFilter) -> usize {
match filter.replication {
Replication::Overreplicated(_) => self.redundant_count(filter),
Replication::Underreplicated(_) => self.missing_count(filter),
}
}
fn subgroup_count(&self, filter: &FileGroupFilter) -> usize {
FileSubGroup::group(&self.files, &filter.root_paths, filter.group_by_id).len()
}
pub fn sort_by_path(&mut self, root_paths: &[Path]) {
self.files.sort_by(|f1, f2| {
let p1: &Path = f1.as_ref();
let p2: &Path = f2.as_ref();
p1.cmp(p2)
});
if !root_paths.is_empty() {
self.files = FileSubGroup::group(self.files.drain(..), root_paths, true)
.into_iter()
.flat_map(|g| g.files)
.collect()
}
}
}
impl<T> AsRef<FileGroup<T>> for FileGroup<T> {
fn as_ref(&self) -> &FileGroup<T> {
self
}
}
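/// A subgroup of files that should not be treated as duplicates of each other,
/// e.g. hard links to the same file or files under the same root path.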
#[derive(Debug, Eq, PartialEq)]
pub struct FileSubGroup<F> {
pub files: Vec<F>,
}
impl<F> FileSubGroup<F> {
pub fn empty() -> FileSubGroup<F> {
FileSubGroup { files: vec![] }
}
pub fn single(f: F) -> FileSubGroup<F> {
FileSubGroup { files: vec![f] }
}
pub fn push(&mut self, file: F) {
self.files.push(file)
}
}
impl<F: AsRef<Path> + AsRef<FileId>> FileSubGroup<F> {
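    /// Splits files into subgroups.
    /// Files under the same root end up in the same subgroup; files not under
    /// any root are grouped by id when `group_by_id` is set, otherwise each
    /// forms its own subgroup. Empty subgroups are dropped from the result.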
pub fn group(
files: impl IntoIterator<Item = F>,
roots: &[Path],
group_by_id: bool,
) -> Vec<FileSubGroup<F>> {
let mut prefix_groups = Vec::from_iter(roots.iter().map(|_| FileSubGroup::empty()));
        let mut id_groups = IndexMap::new();
        for f in files {
            let path: &Path = f.as_ref();
            let id: FileId = *f.as_ref();
            let root_idx = roots.iter().position(|r| r.is_prefix_of(path));
            match root_idx {
                Some(idx) => prefix_groups[idx].files.push(f),
                None if group_by_id => id_groups
                    .entry(id)
                    .or_insert_with(FileSubGroup::empty)
                    .push(f),
                None => prefix_groups.push(FileSubGroup::single(f)),
            }
        }
}
prefix_groups.extend(id_groups.into_values());
prefix_groups.retain(|sg| !sg.files.is_empty());
prefix_groups
}
}
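/// Pairs file information with the most recently computed hash of the file.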
struct HashedFileInfo {
file_hash: FileHash,
file_info: FileInfo,
}
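/// Partitions hashed files into vectors, one vector per disk device,
/// so that each device can be processed by its own thread pool.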
fn partition_by_devices(
files: Vec<FileGroup<FileInfo>>,
devices: &DiskDevices,
) -> Vec<Vec<HashedFileInfo>> {
let mut result: Vec<Vec<HashedFileInfo>> = Vec::with_capacity(devices.len());
for _ in 0..devices.len() {
result.push(Vec::new());
}
for g in files {
for f in g.files {
let device = &devices[f.get_device_index()];
result[device.index].push(HashedFileInfo {
file_hash: g.file_hash.clone(),
file_info: f,
});
}
}
result
}
fn flat_iter(files: &[FileGroup<FileInfo>]) -> impl ParallelIterator<Item = &FileInfo> {
files.par_iter().flat_map(|g| &g.files)
}
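/// Recomputes the hashes of files in the given groups and regroups the files
/// by (length, new hash).
///
/// Only groups accepted by `group_pre_filter` are rehashed; the remaining
/// groups are passed through unchanged. Files are partitioned by device and
/// sorted by physical location, then hashed on the device's thread pool
/// selected by `access_type`. Files sharing the same id (hard links) are
/// hashed once and the result is applied to all of them. Finally,
/// `group_post_filter` decides which groups are returned.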
fn rehash<'a, F1, F2, H>(
groups: Vec<FileGroup<FileInfo>>,
group_pre_filter: F1,
group_post_filter: F2,
devices: &DiskDevices,
access_type: FileAccess,
hash_fn: H,
) -> Vec<FileGroup<FileInfo>>
where
F1: Fn(&FileGroup<FileInfo>) -> bool,
F2: Fn(&FileGroup<FileInfo>) -> bool,
H: Fn((&mut FileInfo, FileHash)) -> Option<FileHash> + Sync + Send + 'a,
{
type HashFn<'a> = dyn Fn((&mut FileInfo, FileHash)) -> Option<FileHash> + Sync + Send + 'a;
let hash_fn: &HashFn<'a> = &hash_fn;
let (tx, rx): (Sender<HashedFileInfo>, Receiver<HashedFileInfo>) = channel();
    let (groups_to_process, groups_to_pass): (Vec<_>, Vec<_>) =
        groups.into_iter().partition(group_pre_filter);
    let files = partition_by_devices(groups_to_process, devices);
let mut hash_map =
GroupMap::new(|f: HashedFileInfo| ((f.file_info.len, f.file_hash), f.file_info));
let hash_map_ref = &mut hash_map;
thread::scope(move |s| {
for (mut files, device) in files.into_iter().zip(devices.iter()) {
if files.is_empty() {
continue;
}
let tx = tx.clone();
s.spawn(move |_| {
files.par_sort_unstable_by_key(|f| f.file_info.location);
let thread_pool = match access_type {
FileAccess::Sequential => device.seq_thread_pool(),
FileAccess::Random => device.rand_thread_pool(),
};
let thread_count = thread_pool.current_num_threads() as isize;
let semaphore = Arc::new(Semaphore::new(8 * thread_count));
for (_, fg) in &files.into_iter().group_by(|f| f.file_info.id) {
let mut fg = fg.collect_vec();
let tx = tx.clone();
let guard = semaphore.clone().access_owned();
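                    // Safety: extending the lifetime to 'static is sound because
                    // the receive loop below keeps this scope alive until every
                    // spawned task has dropped its Sender clone, i.e. until all
                    // tasks referencing hash_fn have finished.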
let hash_fn: &HashFn<'static> = unsafe { std::mem::transmute(hash_fn) };
thread_pool.spawn_fifo(move || {
let _open_files_guard = RLIMIT_OPEN_FILES.clone().access_owned();
let old_hash = fg[0].file_hash.clone();
if let Some(hash) = hash_fn((&mut fg[0].file_info, old_hash)) {
for mut f in fg {
f.file_hash = hash.clone();
tx.send(f).unwrap();
}
}
drop(guard);
});
}
});
}
drop(tx);
while let Ok(hashed_file) = rx.recv() {
hash_map_ref.add(hashed_file);
}
})
.unwrap();
hash_map
.into_iter()
.map(|((len, hash), files)| FileGroup {
file_len: len,
file_hash: hash,
            files: files.into_vec(),
})
.chain(groups_to_pass)
.filter(group_post_filter)
.collect()
}
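/// Walks the input directory trees in parallel and collects information on
/// files matching the configured size range, one vector per walking thread.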
fn scan_files(ctx: &GroupCtx<'_>) -> Vec<Vec<FileInfo>> {
let file_collector = ThreadLocal::new();
let file_count = AtomicUsize::new(0);
let spinner = ctx
.log
.progress_bar(&ctx.phases.format(Phase::Walk), ProgressBarLength::Unknown);
let spinner_tick = &|_: &Path| {
file_count.fetch_add(1, Ordering::Relaxed);
spinner.inc(1);
};
let config = &ctx.config;
let min_size = config.min_size;
let max_size = config.max_size.unwrap_or(FileLen::MAX);
let mut walk = Walk::new();
walk.depth = config.depth.unwrap_or(usize::MAX);
walk.hidden = config.hidden;
walk.follow_links = config.follow_links;
walk.report_links = config.symbolic_links;
walk.no_ignore = config.no_ignore;
walk.one_fs = config.one_fs;
walk.path_selector = ctx.path_selector.clone();
walk.log = Some(ctx.log);
walk.on_visit = spinner_tick;
walk.run(ctx.config.input_paths(), |path| {
file_info_or_log_err(path, &ctx.devices, ctx.log)
.into_iter()
.filter(|info| {
let l = info.len;
l >= min_size && l <= max_size
})
.for_each(|info| {
let vec = file_collector.get_or(|| RefCell::new(Vec::new()));
vec.borrow_mut().push(info);
});
});
ctx.log.info(format!(
"Scanned {} file entries",
file_count.load(Ordering::Relaxed)
));
let files: Vec<_> = file_collector.into_iter().map(|r| r.into_inner()).collect();
let file_count: usize = files.iter().map(|v| v.len()).sum();
let total_size: u64 = files.iter().flat_map(|v| v.iter().map(|i| i.len.0)).sum();
ctx.log.info(format!(
"Found {} ({}) files matching selection criteria",
file_count,
FileLen(total_size)
));
files
}
fn file_count<'a, T: 'a>(groups: impl IntoIterator<Item = &'a FileGroup<T>>) -> usize {
groups.into_iter().map(|g| g.file_count()).sum()
}
fn total_size<'a, T: 'a>(groups: impl IntoIterator<Item = &'a FileGroup<T>>) -> FileLen {
groups.into_iter().map(|g| g.total_size()).sum()
}
fn unique_file_count<'a, T>(groups: impl IntoIterator<Item = &'a FileGroup<T>>) -> usize
where
T: AsRef<FileId> + 'a,
{
groups.into_iter().map(|g| g.unique_count()).sum()
}
fn unique_file_size<'a, T: 'a>(groups: impl IntoIterator<Item = &'a FileGroup<T>>) -> FileLen
where
T: AsRef<FileId> + 'a,
{
groups.into_iter().map(|g| g.unique_size()).sum()
}
fn sort_files_by_id<'a, T: 'a>(groups: impl IntoIterator<Item = &'a mut FileGroup<T>>)
where
T: AsRef<FileId> + 'a,
{
for g in groups.into_iter() {
g.sort_by_id()
}
}
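/// Returns the total count and total size of the files that would be
/// reported under the given filter.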
fn stage_stats(groups: &[FileGroup<FileInfo>], filter: &FileGroupFilter) -> (usize, FileLen) {
let mut total_count = 0;
let mut total_size = FileLen(0);
for g in groups {
let count = g.reported_count(filter);
let size = g.file_len * count as u64;
total_count += count;
total_size += size;
}
(total_count, total_size)
}
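/// Groups files by length and keeps only groups matching the group filter,
/// e.g. groups with at least two distinct files when searching for duplicates.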
fn group_by_size(ctx: &GroupCtx<'_>, files: Vec<Vec<FileInfo>>) -> Vec<FileGroup<FileInfo>> {
let file_count: usize = files.iter().map(|v| v.len()).sum();
let progress = ctx.log.progress_bar(
&ctx.phases.format(Phase::GroupBySize),
ProgressBarLength::Items(file_count as u64),
);
let mut groups = GroupMap::new(|info: FileInfo| (info.len, info));
for files in files.into_iter() {
for file in files.into_iter() {
progress.inc(1);
groups.add(file);
}
}
let groups: Vec<_> = groups
.into_iter()
.map(|(l, files)| FileGroup {
file_len: l,
file_hash: FileHash::from(0),
files: files.into_vec(),
})
.filter(|g| g.matches(&ctx.group_filter))
.collect();
let stats = stage_stats(&groups, &ctx.group_filter);
ctx.log.info(format!(
"Found {} ({}) candidates after grouping by size",
stats.0, stats.1
));
groups
}
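/// Removes duplicated file entries, i.e. entries that share both the physical
/// file location and the path. Such entries appear when the same file is
/// given in the input more than once, possibly under non-canonical paths.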
fn deduplicate<F>(files: &mut Vec<FileInfo>, progress: F)
where
F: Fn(&Path) + Sync + Send,
{
let mut groups = GroupMap::new(|fi: FileInfo| (fi.location, fi));
for f in files.drain(..) {
groups.add(f)
}
for (_, file_group) in groups.into_iter() {
if file_group.len() == 1 {
files.extend(file_group.into_iter().inspect(|p| progress(&p.path)));
} else {
files.extend(
file_group
.into_iter()
.inspect(|p| progress(&p.path))
.unique_by(|p| p.path.hash128()),
)
}
}
}
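/// Deduplicates paths within each group and drops groups that no longer
/// match the group filter.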
fn remove_same_files(
ctx: &GroupCtx<'_>,
groups: Vec<FileGroup<FileInfo>>,
) -> Vec<FileGroup<FileInfo>> {
let groups: Vec<_> = groups
.into_par_iter()
.update(|g| deduplicate(&mut g.files, |_| {}))
.filter(|g| g.matches(&ctx.group_filter))
.collect();
let stats = stage_stats(&groups, &ctx.group_filter);
ctx.log.info(format!(
"Found {} ({}) candidates after grouping by paths",
stats.0, stats.1,
));
groups
}
#[cfg(target_os = "linux")]
fn atomic_counter_vec(len: usize) -> Vec<std::sync::atomic::AtomicU32> {
let mut v = Vec::with_capacity(len);
for _ in 0..len {
v.push(std::sync::atomic::AtomicU32::new(0));
}
v
}
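/// On Linux, fetches the physical on-disk location of file data for files
/// stored on drives that are not SSDs, so they can later be read in disk
/// order. Missing files are skipped silently; other errors are logged with
/// a per-device limit.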
#[cfg(target_os = "linux")]
fn update_file_locations(ctx: &GroupCtx<'_>, groups: &mut (impl FileCollection + ?Sized)) {
let count = groups.count();
let progress = ctx
.log
.progress_bar("Fetching extents", ProgressBarLength::Items(count as u64));
let err_counters = atomic_counter_vec(ctx.devices.len());
groups.for_each_mut(|fi| {
let device: &DiskDevice = &ctx.devices[fi.get_device_index()];
if device.disk_kind != sysinfo::DiskKind::SSD {
if let Err(e) = fi.fetch_physical_location() {
const ENOENT_NO_SUCH_FILE: i32 = 2;
if e.raw_os_error()
.map_or(true, |err| err != ENOENT_NO_SUCH_FILE)
{
handle_fetch_physical_location_err(ctx, &err_counters, fi, e)
}
}
}
progress.inc(1)
});
}
#[cfg(not(target_os = "linux"))]
fn update_file_locations(_ctx: &GroupCtx<'_>, _groups: &mut (impl FileCollection + ?Sized)) {}
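/// Logs a warning about a failed attempt to fetch physical file extents.
/// At most `MAX_ERR_COUNT_TO_LOG` warnings are logged per device, and file
/// systems without FIEMAP support are reported only once.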
#[cfg(target_os = "linux")]
fn handle_fetch_physical_location_err(
ctx: &GroupCtx<'_>,
err_counters: &[std::sync::atomic::AtomicU32],
file_info: &FileInfo,
error: io::Error,
) {
const MAX_ERR_COUNT_TO_LOG: u32 = 10;
let device = &ctx.devices[file_info.get_device_index()];
let counter = &err_counters[device.index];
if crate::error::error_kind(&error) == io::ErrorKind::Unsupported {
if counter.swap(MAX_ERR_COUNT_TO_LOG, Ordering::Release) < MAX_ERR_COUNT_TO_LOG {
ctx.log.warn(format!(
"File system {} on device {} doesn't support FIEMAP ioctl API. \
This is generally harmless, but random access performance might be decreased \
because fclones can't determine physical on-disk location of file data needed \
for reading files in the optimal order.",
device.file_system,
device.name.to_string_lossy()
));
}
} else if counter.load(Ordering::Acquire) < MAX_ERR_COUNT_TO_LOG {
ctx.log.warn(format!(
"Failed to fetch file extents mapping for file {}: {}. \
This is generally harmless, but it might decrease random access performance.",
file_info.path.display(),
error
));
let err_count = counter.fetch_add(1, Ordering::AcqRel);
if err_count == MAX_ERR_COUNT_TO_LOG {
ctx.log.warn(format!(
"Too many errors trying to fetch file extent mappings on device {}. \
Subsequent errors for this device will be ignored.",
device.name.to_string_lossy()
))
}
}
}
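/// Runs the configured transform over each file and groups files by the hash
/// of the transformed contents. Used instead of the multi-stage
/// size/prefix/suffix/contents pipeline when a transform is configured.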
fn group_transformed(ctx: &GroupCtx<'_>, files: Vec<FileInfo>) -> Vec<FileGroup<FileInfo>> {
let mut files = files;
    files.par_sort_unstable_by_key(|f| FileId::of(f));
    let groups = vec![FileGroup {
        file_len: FileLen(0),
        file_hash: FileHash::from(0),
        files,
    }];
let progress = ctx.log.progress_bar(
&ctx.phases.format(Phase::TransformAndGroup),
ProgressBarLength::Items(unique_file_count(&groups) as u64),
);
let groups = rehash(
groups,
|_| true,
|g| g.matches(&ctx.group_filter),
&ctx.devices,
FileAccess::Sequential,
|(fi, _)| {
let chunk = FileChunk::new(&fi.path, FilePos(0), fi.len);
let result =
ctx.hasher
.hash_transformed_or_log_err(&chunk, |_| {})
.map(|(len, hash)| {
fi.len = len;
hash
});
progress.inc(1);
result
},
);
let stats = stage_stats(&groups, &ctx.group_filter);
ctx.log.info(format!(
"Found {} ({}) {} files",
stats.0,
stats.1,
ctx.config.search_type()
));
groups
}
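/// Returns the maximum value of a device property over the devices hosting
/// the given files, falling back to the default device when no files are given.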
fn max_device_property<'a>(
devices: &DiskDevices,
files: impl ParallelIterator<Item = &'a FileInfo>,
property_fn: impl Fn(&DiskDevice) -> FileLen + Sync,
) -> FileLen {
files
.into_par_iter()
.map(|f| property_fn(&devices[f.get_device_index()]))
.max()
.unwrap_or_else(|| property_fn(devices.get_default()))
}
fn prefix_len<'a>(
partitions: &DiskDevices,
files: impl ParallelIterator<Item = &'a FileInfo>,
) -> FileLen {
max_device_property(partitions, files, |dd| dd.max_prefix_len())
}
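/// Groups files by the hash of an initial chunk and removes groups that don't
/// match the filter. Files not longer than `prefix_len` are hashed in full;
/// longer files are hashed by their device's minimum prefix length.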
fn group_by_prefix(
ctx: &GroupCtx<'_>,
prefix_len: FileLen,
groups: Vec<FileGroup<FileInfo>>,
) -> Vec<FileGroup<FileInfo>> {
let mut groups = groups;
sort_files_by_id(&mut groups);
let pre_filter = |g: &FileGroup<FileInfo>| g.unique_count() > 1;
let file_count = unique_file_count(groups.iter().filter(|g| pre_filter(g)));
let progress = ctx.log.progress_bar(
&ctx.phases.format(Phase::GroupByPrefix),
ProgressBarLength::Items(file_count as u64),
);
let groups = rehash(
groups,
pre_filter,
|g| g.matches(&ctx.group_filter),
&ctx.devices,
FileAccess::Random,
|(fi, _)| {
progress.inc(1);
let prefix_len = if fi.len <= prefix_len {
prefix_len
} else {
ctx.devices[fi.get_device_index()].min_prefix_len()
};
let chunk = FileChunk::new(&fi.path, FilePos(0), prefix_len);
ctx.hasher.hash_file_or_log_err(&chunk, |_| {})
},
);
let stats = stage_stats(&groups, &ctx.group_filter);
ctx.log.info(format!(
"Found {} ({}) candidates after grouping by prefix",
stats.0, stats.1
));
groups
}
fn suffix_len<'a>(
partitions: &DiskDevices,
files: impl ParallelIterator<Item = &'a FileInfo>,
) -> FileLen {
max_device_property(partitions, files, |dd| dd.suffix_len())
}
fn suffix_threshold<'a>(
partitions: &DiskDevices,
files: impl ParallelIterator<Item = &'a FileInfo>,
) -> FileLen {
max_device_property(partitions, files, |dd| dd.suffix_threshold())
}
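/// Groups files by the combined prefix and suffix hash: the previous hash is
/// XOR-ed with the hash of a fixed-length suffix. Only groups of files at
/// least as long as the suffix threshold are rehashed.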
fn group_by_suffix(
ctx: &GroupCtx<'_>,
groups: Vec<FileGroup<FileInfo>>,
) -> Vec<FileGroup<FileInfo>> {
let mut groups = groups;
sort_files_by_id(&mut groups);
let suffix_len = ctx
.config
.max_suffix_size
.unwrap_or_else(|| suffix_len(&ctx.devices, flat_iter(&groups)));
let suffix_threshold = suffix_threshold(&ctx.devices, flat_iter(&groups));
let pre_filter =
|g: &FileGroup<FileInfo>| g.file_len >= suffix_threshold && g.unique_count() > 1;
let file_count = unique_file_count(groups.iter().filter(|g| pre_filter(g)));
let progress = ctx.log.progress_bar(
&ctx.phases.format(Phase::GroupBySuffix),
ProgressBarLength::Items(file_count as u64),
);
let groups = rehash(
groups,
pre_filter,
|g| g.matches(&ctx.group_filter),
&ctx.devices,
FileAccess::Random,
|(fi, old_hash)| {
progress.inc(1);
let chunk = FileChunk::new(&fi.path, fi.len.as_pos() - suffix_len, suffix_len);
ctx.hasher
.hash_file_or_log_err(&chunk, |_| {})
.map(|new_hash| old_hash ^ new_hash)
},
);
let stats = stage_stats(&groups, &ctx.group_filter);
ctx.log.info(format!(
"Found {} ({}) candidates after grouping by suffix",
stats.0, stats.1
));
groups
}
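/// Groups files by the hash of their full contents.
/// Only groups with more than one distinct file and length of at least
/// `min_file_len` are rehashed; this final stage applies the strict group filter.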
fn group_by_contents(
ctx: &GroupCtx<'_>,
min_file_len: FileLen,
groups: Vec<FileGroup<FileInfo>>,
) -> Vec<FileGroup<FileInfo>> {
let mut groups = groups;
sort_files_by_id(&mut groups);
let pre_filter = |g: &FileGroup<FileInfo>| g.unique_count() > 1 && g.file_len >= min_file_len;
let bytes_to_scan = unique_file_size(groups.iter().filter(|g| pre_filter(g)));
let progress = &ctx.log.progress_bar(
&ctx.phases.format(Phase::GroupByContents),
ProgressBarLength::Bytes(bytes_to_scan.0),
);
let groups = rehash(
groups,
pre_filter,
|g| g.matches_strictly(&ctx.group_filter),
&ctx.devices,
FileAccess::Sequential,
|(fi, _)| {
let chunk = FileChunk::new(&fi.path, FilePos(0), fi.len);
ctx.hasher
.hash_file_or_log_err(&chunk, |bytes_read| progress.inc(bytes_read as u64))
},
);
let stats = stage_stats(&groups, &ctx.group_filter);
ctx.log.info(format!(
"Found {} ({}) {} files",
stats.0,
stats.1,
ctx.config.search_type()
));
groups
}
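/// Scans the input paths and groups identical files together.
/// Depending on the configuration, finds duplicate, unique, under- or
/// over-replicated files.
///
/// When no transform is configured, candidates are progressively narrowed by
/// grouping on size, prefix hash, suffix hash and finally full-contents hash;
/// with a transform, files are grouped by the hash of the transformed
/// contents. Returned groups are sorted by descending file length and hash,
/// and files within each group are sorted by path.
///
/// A minimal usage sketch (not compiled as a doc-test; error handling elided):
/// ```ignore
/// let log = StdLog::new();
/// let config = GroupConfig {
///     paths: vec![Path::from("dir1"), Path::from("dir2")],
///     ..GroupConfig::default()
/// };
/// let groups = group_files(&config, &log)?;
/// ```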
pub fn group_files(config: &GroupConfig, log: &dyn Log) -> Result<Vec<FileGroup<FileInfo>>, Error> {
let spinner = log.progress_bar("Initializing", ProgressBarLength::Unknown);
let ctx = GroupCtx::new(config, log)?;
drop(spinner);
let matching_files = scan_files(&ctx);
let mut groups = match &ctx.hasher.transform {
Some(_transform) => {
let mut files = matching_files.into_iter().flatten().collect_vec();
deduplicate(&mut files, |_| {});
update_file_locations(&ctx, &mut files);
group_transformed(&ctx, files)
}
_ => {
let size_groups = group_by_size(&ctx, matching_files);
let mut size_groups_pruned = remove_same_files(&ctx, size_groups);
update_file_locations(&ctx, &mut size_groups_pruned);
let prefix_len = ctx
.config
.max_prefix_size
.unwrap_or_else(|| prefix_len(&ctx.devices, flat_iter(&size_groups_pruned)));
let prefix_groups = group_by_prefix(&ctx, prefix_len, size_groups_pruned);
let suffix_groups = group_by_suffix(&ctx, prefix_groups);
if !ctx.config.skip_content_hash {
group_by_contents(&ctx, prefix_len, suffix_groups)
} else {
suffix_groups
}
}
};
groups.par_sort_by_key(|g| Reverse((g.file_len, g.file_hash.u128_prefix())));
groups
.par_iter_mut()
.for_each(|g| g.sort_by_path(&ctx.group_filter.root_paths));
Ok(groups)
}
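/// Writes the report of grouped files, together with summary statistics,
/// to the configured output file, or to standard output when no output is set.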
pub fn write_report(
config: &GroupConfig,
log: &dyn Log,
groups: &[FileGroup<FileInfo>],
) -> io::Result<()> {
let now = Local::now();
let total_count = file_count(groups.iter());
let total_size = total_size(groups.iter());
let (redundant_count, redundant_size) = groups.iter().fold((0, FileLen(0)), |res, g| {
let count = g.redundant_count(&config.group_filter());
(res.0 + count, res.1 + g.file_len * count as u64)
});
let (missing_count, missing_size) = groups.iter().fold((0, FileLen(0)), |res, g| {
let count = g.missing_count(&config.group_filter());
(res.0 + count, res.1 + g.file_len * count as u64)
});
let header = ReportHeader {
timestamp: DateTime::from_naive_utc_and_offset(now.naive_utc(), *now.offset()),
version: env!("CARGO_PKG_VERSION").to_owned(),
command: args_os().map(Arg::from).collect(),
base_dir: config.base_dir.clone(),
stats: Some(FileStats {
group_count: groups.len(),
total_file_count: total_count,
total_file_size: total_size,
redundant_file_count: redundant_count,
redundant_file_size: redundant_size,
missing_file_count: missing_count,
missing_file_size: missing_size,
}),
};
match &config.output {
Some(path) => {
let progress = log.progress_bar(
"Writing report",
ProgressBarLength::Items(groups.len() as u64),
);
let iter = groups.iter().inspect(|_g| progress.inc(1));
let file = BufWriter::new(File::create(path)?);
let mut reporter = ReportWriter::new(file, false);
reporter.write(config.format, &header, iter)
}
None => {
let term = Term::stdout();
let color = term.is_term();
let mut reporter = ReportWriter::new(BufWriter::new(term), color);
reporter.write(config.format, &header, groups.iter())
}
}
}
#[cfg(test)]
mod test {
use std::fs::{create_dir, hard_link, File, OpenOptions};
use std::io::{Read, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;
use crate::log::StdLog;
use rand::seq::SliceRandom;
use sysinfo::DiskKind;
use crate::path::Path;
use crate::util::test::*;
use super::*;
const MAX_PREFIX_LEN: usize = 256 * 1024;
const MAX_SUFFIX_LEN: usize = 256 * 1024;
#[test]
fn items_should_be_split_into_groups() {
use super::GroupMap;
use smallvec::SmallVec;
let mut map = GroupMap::new(|item: (u32, u32)| (item.0, item.1));
map.add((1, 10));
map.add((2, 20));
map.add((1, 11));
map.add((2, 21));
let mut groups: Vec<_> = map.into_iter().collect();
groups.sort_by_key(|item| item.0);
assert_eq!(groups[0], (1, SmallVec::from_vec(vec![10, 11])));
assert_eq!(groups[1], (2, SmallVec::from_vec(vec![20, 21])));
}
#[test]
fn test_rehash_puts_files_with_different_hashes_to_different_groups() {
let devices = DiskDevices::default();
let input = vec![FileGroup {
file_len: FileLen(200),
file_hash: FileHash::from(0),
files: vec![
FileInfo {
id: FileId {
device: 1,
inode: 1,
},
len: FileLen(200),
location: 0,
path: Path::from("file1"),
},
FileInfo {
id: FileId {
device: 1,
inode: 2,
},
len: FileLen(200),
location: 35847587,
path: Path::from("file2"),
},
],
}];
let result = rehash(
input,
|_| true,
|_| true,
&devices,
FileAccess::Random,
|(fi, _)| Some(FileHash::from(fi.location as u128)),
);
assert_eq!(result.len(), 2);
assert_eq!(result[0].files.len(), 1);
assert_eq!(result[1].files.len(), 1);
assert_ne!(result[0].files[0].path, result[1].files[0].path);
}
#[test]
fn test_rehash_doesnt_hash_files_with_same_id_more_than_once() {
let devices = DiskDevices::default();
let input = vec![FileGroup {
file_len: FileLen(200),
file_hash: FileHash::from(0),
files: vec![
FileInfo {
id: FileId {
device: 1,
inode: 1,
},
len: FileLen(200),
location: 0,
path: Path::from("file1"),
},
FileInfo {
id: FileId {
device: 1,
inode: 1,
},
len: FileLen(200),
location: 0,
path: Path::from("file2"),
},
],
}];
let hash_call_count = AtomicUsize::new(0);
let result = rehash(
input,
|_| true,
|_| true,
&devices,
FileAccess::Random,
|(fi, _)| {
hash_call_count.fetch_add(1, Ordering::Relaxed);
Some(FileHash::from(fi.location as u128))
},
);
assert_eq!(result.len(), 1);
assert_eq!(result[0].files.len(), 2);
assert_ne!(result[0].files[0].path, result[0].files[1].path);
assert_eq!(hash_call_count.load(Ordering::Relaxed), 1);
}
#[test]
fn test_rehash_puts_files_with_same_hashes_to_same_groups() {
let devices = DiskDevices::default();
let input = vec![
FileGroup {
file_len: FileLen(200),
file_hash: FileHash::from(0),
files: vec![FileInfo {
id: FileId {
device: 1,
inode: 1,
},
len: FileLen(200),
location: 0,
path: Path::from("file1"),
}],
},
FileGroup {
file_len: FileLen(500),
file_hash: FileHash::from(0),
files: vec![FileInfo {
id: FileId {
device: 1,
inode: 2,
},
len: FileLen(200),
location: 35847587,
path: Path::from("file2"),
}],
},
];
let result = rehash(
input,
|_| true,
|_| true,
&devices,
FileAccess::Random,
|(_, _)| Some(FileHash::from(123456)),
);
assert_eq!(result.len(), 1);
assert_eq!(result[0].files.len(), 2);
}
#[test]
fn test_rehash_can_skip_processing_files() {
let devices = DiskDevices::default();
let input = vec![FileGroup {
file_len: FileLen(200),
file_hash: FileHash::from(0),
files: vec![FileInfo {
id: FileId {
device: 1,
inode: 1,
},
len: FileLen(200),
location: 0,
path: Path::from("file1"),
}],
}];
let called = AtomicBool::new(false);
let result = rehash(
input,
|_| false,
|_| true,
&devices,
FileAccess::Random,
|(fi, _)| {
called.store(true, Ordering::Release);
Some(FileHash::from(fi.location as u128))
},
);
assert_eq!(result.len(), 1);
assert!(!called.load(Ordering::Acquire));
}
#[test]
fn test_rehash_post_filter_removes_groups() {
let devices = DiskDevices::default();
let input = vec![FileGroup {
file_len: FileLen(200),
file_hash: FileHash::from(0),
files: vec![
FileInfo {
id: FileId {
device: 1,
inode: 1,
},
len: FileLen(200),
location: 0,
path: Path::from("file1"),
},
FileInfo {
id: FileId {
device: 1,
inode: 2,
},
len: FileLen(200),
location: 35847587,
path: Path::from("file2"),
},
],
}];
let result = rehash(
input,
|_| true,
|g| g.files.len() >= 2,
&devices,
FileAccess::Random,
|(fi, _)| Some(FileHash::from(fi.location as u128)),
);
assert!(result.is_empty())
}
#[test]
fn test_rehash_processes_files_in_location_order_on_hdd() {
let thread_count = 2;
let devices = DiskDevices::single(DiskKind::HDD, thread_count);
let count = 1000;
let mut input = Vec::with_capacity(count);
for i in 0..count {
input.push(FileGroup {
file_len: FileLen(0),
file_hash: FileHash::from(0),
files: vec![FileInfo {
id: FileId {
device: 1,
inode: i as InodeId,
},
len: FileLen(0),
location: i as u64,
path: Path::from(format!("file{i}")),
}],
})
}
input.shuffle(&mut rand::thread_rng());
let processing_order = Mutex::new(Vec::new());
rehash(
input,
|_| true,
|_| true,
&devices,
FileAccess::Random,
|(fi, _)| {
processing_order.lock().unwrap().push(fi.location as i32);
Some(FileHash::from(fi.location as u128))
},
);
let processing_order = processing_order.into_inner().unwrap();
let mut distance = 0;
for i in 0..processing_order.len() - 1 {
distance += i32::abs(processing_order[i] - processing_order[i + 1])
}
assert!(distance < (thread_count * count) as i32)
}
#[test]
fn identical_small_files() {
with_dir("main/identical_small_files", |root| {
let file1 = root.join("file1");
let file2 = root.join("file2");
write_test_file(&file1, b"aaa", b"", b"");
write_test_file(&file2, b"aaa", b"", b"");
let log = test_log();
let config = GroupConfig {
paths: vec![file1.into(), file2.into()],
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].file_len, FileLen(3));
assert_eq!(results[0].files.len(), 2);
});
}
#[test]
fn identical_large_files() {
with_dir("main/identical_large_files", |root| {
let file1 = root.join("file1");
let file2 = root.join("file2");
write_test_file(&file1, &[0; MAX_PREFIX_LEN], &[1; 4096], &[2; 4096]);
write_test_file(&file2, &[0; MAX_PREFIX_LEN], &[1; 4096], &[2; 4096]);
let log = test_log();
let config = GroupConfig {
paths: vec![file1.into(), file2.into()],
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].files.len(), 2);
});
}
#[test]
fn files_differing_by_size() {
with_dir("main/files_differing_by_size", |root| {
let file1 = root.join("file1");
let file2 = root.join("file2");
write_test_file(&file1, b"aaaa", b"", b"");
write_test_file(&file2, b"aaa", b"", b"");
let file1 = Path::from(file1);
let file2 = Path::from(file2);
let log = test_log();
let config = GroupConfig {
paths: vec![file1.clone(), file2.clone()],
rf_over: Some(0),
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0].paths(), vec![file1.canonicalize()]);
assert_eq!(results[1].paths(), vec![file2.canonicalize()]);
});
}
#[test]
fn files_differing_by_prefix() {
with_dir("main/files_differing_by_prefix", |root| {
let file1 = root.join("file1");
let file2 = root.join("file2");
write_test_file(&file1, b"aaa", b"", b"");
write_test_file(&file2, b"bbb", b"", b"");
let log = test_log();
let config = GroupConfig {
paths: vec![file1.into(), file2.into()],
unique: true,
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0].files.len(), 1);
assert_eq!(results[1].files.len(), 1);
});
}
#[test]
fn files_differing_by_suffix() {
with_dir("main/files_differing_by_suffix", |root| {
let file1 = root.join("file1");
let file2 = root.join("file2");
let prefix = [0; MAX_PREFIX_LEN];
let mid = [1; MAX_PREFIX_LEN + MAX_SUFFIX_LEN];
write_test_file(&file1, &prefix, &mid, b"suffix1");
write_test_file(&file2, &prefix, &mid, b"suffix2");
let log = test_log();
let config = GroupConfig {
paths: vec![file1.into(), file2.into()],
unique: true,
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0].files.len(), 1);
assert_eq!(results[1].files.len(), 1);
});
}
#[test]
fn files_differing_by_middle() {
with_dir("main/files_differing_by_middle", |root| {
let file1 = root.join("file1");
let file2 = root.join("file2");
let prefix = [0; MAX_PREFIX_LEN];
let suffix = [1; MAX_SUFFIX_LEN];
write_test_file(&file1, &prefix, b"middle1", &suffix);
write_test_file(&file2, &prefix, b"middle2", &suffix);
let log = test_log();
let config = GroupConfig {
paths: vec![file1.into(), file2.into()],
unique: true,
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 2);
assert_eq!(results[0].files.len(), 1);
assert_eq!(results[1].files.len(), 1);
});
}
#[test]
fn hard_links() {
with_dir("main/hard_links", |root| {
let file1 = root.join("file1");
let file2 = root.join("file2");
write_test_file(&file1, b"aaa", b"", b"");
hard_link(&file1, &file2).unwrap();
let log = test_log();
let config = GroupConfig {
paths: vec![file1.into(), file2.into()],
                unique: true,
                ..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].files.len(), 2);
});
}
#[test]
#[cfg(unix)]
fn report_symbolic_links_to_files() {
with_dir("main/soft_links", |root| {
let file1 = root.join("file1");
let file2 = root.join("file2");
write_test_file(&file1, b"aaa", b"", b"");
std::os::unix::fs::symlink(&file1, &file2).unwrap();
let log = test_log();
let mut config = GroupConfig {
paths: vec![file1.into(), file2.into()],
match_links: true,
symbolic_links: true,
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].files.len(), 2);
config.symbolic_links = false;
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 0);
config.unique = true;
config.symbolic_links = true;
config.match_links = false;
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].files.len(), 2);
});
}
#[test]
fn duplicate_input_files() {
with_dir("main/duplicate_input_files", |root| {
let file1 = root.join("file1");
write_test_file(&file1, b"foo", b"", b"");
let log = test_log();
let file1 = Path::from(file1);
let config = GroupConfig {
paths: vec![file1.clone(), file1.clone(), file1],
match_links: true,
unique: true,
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].files.len(), 1);
});
}
#[test]
#[cfg(unix)]
fn duplicate_input_files_non_canonical() {
use std::os::unix::fs::symlink;
with_dir("main/duplicate_input_files_non_canonical", |root| {
let dir = root.join("dir");
symlink(root, dir).unwrap();
let file1 = root.join("file1");
let file2 = root.join("dir/file1");
write_test_file(&file1, b"foo", b"", b"");
let log = test_log();
let config = GroupConfig {
paths: vec![file1.into(), file2.into()],
match_links: true,
unique: true,
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].files.len(), 1);
});
}
#[test]
fn duplicate_files_different_roots() {
with_dir("main/duplicate_files_different_roots", |root| {
let root1 = root.join("root1");
let root2 = root.join("root2");
create_dir(&root1).unwrap();
create_dir(&root2).unwrap();
let file1 = root1.join("file1");
let file2 = root1.join("file2");
write_test_file(&file1, b"foo", b"", b"");
write_test_file(&file2, b"foo", b"", b"");
let log = test_log();
let mut config = GroupConfig {
paths: vec![root1.into(), root2.into()],
isolate: true,
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 0);
config.isolate = false;
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].files.len(), 2);
});
}
#[test]
#[cfg(unix)]
fn transformed_truncated() {
with_dir("target/test/group/transform/truncate/", |root| {
let input_path_1 = root.join("input1.txt");
let input_path_2 = root.join("input2.txt");
write_file(&input_path_1, "aa|1");
write_file(&input_path_2, "aa|23456");
let log = test_log();
let config = GroupConfig {
paths: vec![input_path_1.into(), input_path_2.into()],
transform: Some("dd count=2 bs=1".to_string()),
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].files.len(), 2);
})
}
#[test]
fn unique_files() {
with_dir("main/unique_files", |root| {
let file1 = root.join("file1");
let file2 = root.join("file2");
let file3 = root.join("file3");
write_test_file(&file1, b"duplicate", b"", b"");
write_test_file(&file2, b"duplicate", b"", b"");
write_test_file(&file3, b"unique", b"", b"");
let file3 = Path::from(file3);
let log = test_log();
let config = GroupConfig {
unique: true,
paths: vec![file1.into(), file2.into(), file3.clone()],
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
assert_eq!(results.len(), 1);
assert_eq!(results[0].files[0].path, file3);
});
}
#[test]
fn report() {
with_dir("main/report", |root| {
let file = root.join("file1");
write_test_file(&file, b"foo", b"", b"");
let report_file = root.join("report.txt");
let log = test_log();
let config = GroupConfig {
paths: vec![file.into()],
unique: true,
output: Some(report_file.clone()),
..GroupConfig::default()
};
let results = group_files(&config, &log).unwrap();
write_report(&config, &log, &results).unwrap();
assert!(report_file.exists());
let mut report = String::new();
File::open(report_file)
.unwrap()
.read_to_string(&mut report)
.unwrap();
assert!(report.contains("file1"))
});
}
#[test]
fn split_to_subgroups() {
fn file(path: &str, id: InodeId) -> FileInfo {
FileInfo {
path: Path::from(path),
id: FileId {
inode: id,
device: 0,
},
len: FileLen(1024),
location: id * 1024,
}
}
let roots = vec![Path::from("/r0"), Path::from("/r1"), Path::from("/r2")];
let files = vec![
file("/r1/f1a", 0),
file("/r2/f2a", 1),
file("/r2/f2b", 2),
file("/r1/f1b", 3),
file("/r1/f1c", 4),
file("/r3/f3a", 5),
file("/r2/f2c", 6),
];
let groups = FileSubGroup::group(files, &roots, true);
assert_eq!(
groups,
vec![
FileSubGroup {
files: vec![file("/r1/f1a", 0), file("/r1/f1b", 3), file("/r1/f1c", 4),]
},
FileSubGroup {
files: vec![file("/r2/f2a", 1), file("/r2/f2b", 2), file("/r2/f2c", 6)]
},
FileSubGroup {
files: vec![file("/r3/f3a", 5)]
}
]
)
}
#[test]
fn partition() {
let fg = FileGroup {
file_len: FileLen::from(1u64),
file_hash: FileHash::from(1u128),
files: vec!["a1", "b1", "a2", "b2", "b3"],
};
let mut partitions = fg.partition_by_key(|f| f.chars().next().unwrap());
assert_eq!(partitions.len(), 2);
partitions.sort_by_key(|p| p.files.len());
assert_eq!(partitions[0].files, vec!["a1", "a2"]);
assert_eq!(partitions[1].files, vec!["b1", "b2", "b3"]);
}
#[test]
fn map() {
let fg = FileGroup {
file_len: FileLen::from(1u64),
file_hash: FileHash::from(1u128),
files: vec!["a", "b"],
};
let fg = fg.map(|f| format!("{f}.txt"));
assert_eq!(fg.files, vec![String::from("a.txt"), String::from("b.txt")]);
}
#[test]
fn try_map_all_happy_path() {
let fg = FileGroup {
file_len: FileLen::from(1u64),
file_hash: FileHash::from(1u128),
files: vec!["a", "b"],
};
let fg = fg.try_map_all(|f| Result::<_, ()>::Ok(format!("{f}.txt")));
assert!(fg.is_ok());
assert_eq!(
fg.unwrap().files,
vec![String::from("a.txt"), String::from("b.txt")]
);
}
#[test]
fn try_map_all_errors() {
let fg = FileGroup {
file_len: FileLen::from(1u64),
file_hash: FileHash::from(1u128),
files: vec!["a", "b"],
};
let fg = fg.try_map_all(|f| Result::<(), _>::Err(format!("error {f}")));
assert!(fg.is_err());
assert_eq!(
fg.unwrap_err(),
vec![String::from("error a"), String::from("error b")]
);
}
#[test]
fn flat_map() {
let fg = FileGroup {
file_len: FileLen::from(1u64),
file_hash: FileHash::from(1u128),
files: vec!["a1", "b1", "a2", "b2", "b3"],
};
let fg = fg.flat_map(|f| if f.starts_with('a') { Some(f) } else { None });
assert_eq!(fg.files, vec!["a1", "a2"]);
let fg = fg.flat_map(|f| vec![f, f]);
assert_eq!(fg.files, vec!["a1", "a1", "a2", "a2"]);
}
fn write_test_file(path: &PathBuf, prefix: &[u8], mid: &[u8], suffix: &[u8]) {
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            // Truncate any pre-existing file so no stale bytes remain after it.
            .truncate(true)
            .open(path)
            .unwrap();
file.write_all(prefix).unwrap();
file.write_all(mid).unwrap();
file.write_all(suffix).unwrap();
}
fn test_log() -> StdLog {
let mut log = StdLog::new();
log.no_progress = true;
log
}
}