use std::collections::HashMap;
use std::fs;
use std::io::Write;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use anyhow::{anyhow, Result};
use log::{error, info, trace};

use crate::analyze::analysis::{AnalysisFile, ResultEntryRef};
use crate::analyze::worker::{AnalysisJob, AnalysisResult, MarkedIntermediaryFile, WorkerArgument};
use crate::data::{GeneralHash, SaveFile, SaveFileEntry, SaveFileEntryType};
use crate::threadpool::ThreadPool;

mod worker;
pub mod analysis;
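/// Settings for an analysis run.
///
/// `input` is the save file produced by an earlier hashing run, `output`
/// receives the duplicate report, and `threads` overrides the worker count
/// (defaulting to the number of logical CPUs).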
pub struct AnalysisSettings {
    pub input: PathBuf,
    pub output: PathBuf,
    pub threads: Option<usize>,
}
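/// Analyzes a save file for duplicated files and writes the result as
/// newline-delimited JSON to the output file.
///
/// A minimal usage sketch (the paths are illustrative):
///
/// ```ignore
/// let settings = AnalysisSettings {
///     input: PathBuf::from("./hashes.db"),
///     output: PathBuf::from("./duplicates.json"),
///     threads: None, // use one worker per logical CPU
/// };
/// run(settings)?;
/// ```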
pub fn run(analysis_settings: AnalysisSettings) -> Result<()> {
    // The input save file is opened read-only; the output file is created
    // (or truncated) for writing.
    let mut input_file_options = fs::File::options();
    input_file_options.read(true);
    input_file_options.write(false);

    let mut output_file_options = fs::File::options();
    output_file_options.create(true);
    output_file_options.write(true);
    output_file_options.truncate(true);

    let input_file = match input_file_options.open(analysis_settings.input) {
        Ok(file) => file,
        Err(err) => {
            return Err(anyhow!("Failed to open input file: {}", err));
        }
    };
    let output_file = match output_file_options.open(analysis_settings.output) {
        Ok(file) => file,
        Err(err) => {
            return Err(anyhow!("Failed to open output file: {}", err));
        }
    };

    let mut input_buf_reader = std::io::BufReader::new(&input_file);
    let mut output_buf_writer = std::io::BufWriter::new(&output_file);

    let mut save_file = SaveFile::new(&mut output_buf_writer, &mut input_buf_reader, true, true, true);
    save_file.load_header()?;
    save_file.load_all_entries_no_filter()?;
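
    // Take the loaded indexes out of the save file: entries by path, entries
    // by hash, and the flat list of all entries.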
    let file_by_path = save_file.file_by_path;
    let mut file_by_path_marked = HashMap::with_capacity(file_by_path.len());
    let mut file_by_hash = save_file.file_by_hash;
    let mut all_files = save_file.all_entries;

    // Wrap every entry so a worker can later attach its analysis result.
    for (path, entry) in file_by_path.iter() {
        file_by_path_marked.insert(path.clone(), MarkedIntermediaryFile {
            saved_file_entry: Arc::clone(entry),
            file: Arc::new(Mutex::new(None)),
        });
    }
    drop(file_by_path);

    // Only hashes shared by at least two entries can belong to duplicates.
    file_by_hash.retain(|_, entries| entries.len() >= 2);
    file_by_hash.shrink_to_fit();

    // An entry still referenced three times (the flat list, the marked map,
    // and a surviving hash bucket) remains a duplicate candidate.
    all_files.retain(|entry| Arc::strong_count(entry) >= 3);

    let file_by_path = Arc::new(file_by_path_marked);
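
    // Hand every worker a clone of the shared path map, then queue one
    // analysis job per remaining entry.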
    let mut args = Vec::with_capacity(analysis_settings.threads.unwrap_or_else(num_cpus::get));
    for _ in 0..args.capacity() {
        args.push(WorkerArgument {
            file_by_path: Arc::clone(&file_by_path),
        });
    }

    let pool: ThreadPool<AnalysisJob, AnalysisResult> = ThreadPool::new(args, worker::worker_run);

    for entry in &all_files {
        pool.publish(AnalysisJob::new(Arc::clone(entry)));
    }

    // Drain results until the pool has been idle for ten seconds, then shut
    // it down.
    while let Ok(result) = pool.receive_timeout(Duration::from_secs(10)) {
        info!("Result: {:?}", result);
    }
    drop(pool);
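
    // Second pass: report every duplicate candidate unless its parent
    // directory is itself part of a duplicate set (the parent's entry
    // already accounts for this file).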
    let mut duplicated_bytes: u64 = 0;
    for entry in &all_files {
        trace!("File: {}", entry.path);
        let file = file_by_path.get(&entry.path).unwrap();
        let file = file.file.lock().unwrap();
        if let Some(file) = file.deref() {
            let parent = file.parent().lock().unwrap();
            match parent.deref() {
                Some(parent) => {
                    let parent = parent.upgrade().unwrap();
                    // Other-type parents carry no content hash and can never conflict.
                    let parent_hash = match parent.deref() {
                        AnalysisFile::File(info) => Some(&info.content_hash),
                        AnalysisFile::Directory(info) => Some(&info.content_hash),
                        AnalysisFile::Symlink(info) => Some(&info.content_hash),
                        AnalysisFile::Other(_) => None,
                    };
                    // The parent conflicts if its hash is shared by at least
                    // two entries; in that case its own result entry covers
                    // this file, so skip it here.
                    let parent_conflicting = parent_hash
                        .and_then(|hash| file_by_hash.get(hash))
                        .map_or(false, |entries| entries.len() >= 2);
                    if !parent_conflicting {
                        duplicated_bytes += write_result_entry(file, &file_by_hash, &mut output_buf_writer);
                    }
                }
                None => {
                    // Root entries have no parent; always report them.
                    duplicated_bytes += write_result_entry(file, &file_by_hash, &mut output_buf_writer);
                }
            }
        } else {
            error!("File not analyzed yet: {:?}", entry.path);
        }
    }
    output_buf_writer.flush().expect("Unable to flush output file");
    println!("There are {} GiB of duplicated files", duplicated_bytes / 1024 / 1024 / 1024);

    Ok(())
}
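/// Groups entries that share a hash: entries must also agree in size and
/// file type to be counted as the same duplicate set.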
#[derive(Debug, PartialEq, Hash, Eq)]
struct SetKey<'a> {
    size: u64,
    ftype: &'a SaveFileEntryType,
}
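/// Writes one JSON line for every duplicate set whose first member is `file`
/// and returns the number of redundant bytes in those sets (size times the
/// number of copies beyond the first).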
fn write_result_entry(file: &AnalysisFile, file_by_hash: &HashMap<GeneralHash, Vec<Arc<SaveFileEntry>>>, output_buf_writer: &mut std::io::BufWriter<&fs::File>) -> u64 {
    let hash = match file {
        AnalysisFile::File(info) => &info.content_hash,
        AnalysisFile::Directory(info) => &info.content_hash,
        AnalysisFile::Symlink(info) => &info.content_hash,
        AnalysisFile::Other(_) => return 0,
    };

    // Split the same-hash entries into sets that also agree on size and type.
    let mut sets: HashMap<SetKey, Vec<&SaveFileEntry>> = HashMap::new();
    for entry in file_by_hash.get(hash).unwrap() {
        sets.entry(SetKey {
            size: entry.size,
            ftype: &entry.file_type,
        }).or_default().push(entry);
    }

    let mut result_size: u64 = 0;
    for (key, entries) in &sets {
        // A set of one is not a duplicate.
        if entries.len() <= 1 {
            continue;
        }
        // Emit each set only once: when the current file is its first member.
        if &entries[0].path != file.path() {
            continue;
        }
        let conflicting: Vec<_> = entries.iter().map(|entry| &entry.path).collect();
        let result = ResultEntryRef {
            ftype: key.ftype,
            size: key.size,
            hash,
            conflicting,
        };
        output_buf_writer.write_all(serde_json::to_string(&result).unwrap().as_bytes()).expect("Unable to write to file");
        output_buf_writer.write_all(b"\n").expect("Unable to write to file");
        // Every copy beyond the first is redundant.
        result_size += result.size * (result.conflicting.len() as u64 - 1);
    }
    result_size
}