use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::ffi::OsString;
use std::io;
use std::rc::Rc;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread;
use std::time;
use threadpool::ThreadPool;
use crate::checksum::get_checksum_fn;
use crate::dir_tree::TableData;
use crate::{HashAlgorithm, NoProgressIndicator, ProgressIndicator};
/// Checksum string under which items are first bucketed in the duplicate table
/// (the `part_checksum` arguments).
/// NOTE(review): presumably computed over only part of a file - confirm against the caller.
type PartialChecksum = String;
/// Checksum string as returned by the configured checksum function; items with the same
/// partial checksum are grouped by this value.
type Checksum = String;
/// 100 milliseconds; `finalise` sleeps for twice this duration between progress polls.
const HUNDRED_MILIS: time::Duration = time::Duration::from_millis(100);
/// Table that buckets registered items by partial checksum in order to find duplicates.
///
/// An item whose partial checksum has been seen only once is stored as-is. As soon as a
/// second item with the same partial checksum is registered, a checksum is computed for
/// each of them via `checksum_fn` (on the thread pool if there is one) and the items are
/// grouped by that checksum.
#[derive(Debug)]
pub(crate) struct DuplicateTable {
    /// Entries keyed by partial checksum.
    table: HashMap<String, DTEntry>,
    /// Worker pool that computes checksums; `None` when constructed with zero threads.
    threadpool: Option<ThreadPool>,
    /// Receiving end for the `(partial checksum, checksum, item)` results sent by jobs.
    checksum_rx: Receiver<(PartialChecksum, Checksum, TableData)>,
    /// Sending end; a clone is handed to every job.
    checksum_tx: Sender<(PartialChecksum, Checksum, TableData)>,
    /// Number of submitted jobs whose results have not been merged into `table` yet.
    job_counter: u32,
    /// Number of items passed to `register_item` so far.
    file_count: u64,
    /// `true` when `threadpool` is in use.
    multithreaded: bool,
    /// Receiver of progress updates.
    progress_indicator: Rc<RefCell<dyn ProgressIndicator>>,
    /// Function that computes the checksum for the path of an item.
    checksum_fn: Arc<fn(&OsString) -> io::Result<String>>,
}
impl DuplicateTable {
pub(crate) fn new(num_threads: usize, hash_algorithm: HashAlgorithm) -> Self {
let mut threadpool = None;
let mut multithreaded = false;
if num_threads != 0 {
threadpool = Some(ThreadPool::new(num_threads));
multithreaded = true;
}
let (checksum_tx, checksum_rx) = channel::<(PartialChecksum, Checksum, TableData)>();
let progress_indicator = Rc::new(RefCell::new(NoProgressIndicator {}));
let checksum_fn = get_checksum_fn(&hash_algorithm);
DuplicateTable {
table: HashMap::new(),
threadpool,
multithreaded,
checksum_rx,
checksum_tx,
job_counter: 0,
file_count: 0,
progress_indicator,
checksum_fn: Arc::new(checksum_fn),
}
}
/// Installs `progress_indicator` as the receiver of all subsequent progress updates.
pub(crate) fn set_progress_indicator(
    &mut self,
    progress_indicator: Rc<RefCell<dyn ProgressIndicator>>,
) {
    // Swap in the new handle; the previously installed one is released right here.
    drop(std::mem::replace(&mut self.progress_indicator, progress_indicator));
}
/// Registers `data` under its partial checksum.
///
/// The first item seen for a partial checksum is stored as a single entry and no further
/// checksum is computed for it. When a second item with the same partial checksum arrives,
/// the slot is converted into a multi entry and both the stored item and the new one are
/// handed to `add_item`. Any later item goes straight to `add_item`.
///
/// # Panics
///
/// Panics if a worker thread of the pool has panicked, or if the slot does not hold the
/// expected single entry while it is being converted.
pub(crate) fn register_item(&mut self, part_checksum: String, data: TableData) {
    let worker_panicked =
        self.multithreaded && self.threadpool.as_ref().unwrap().panic_count() > 0;
    if worker_panicked {
        panic!("There is at least one panicked checksum thread.");
    }
    self.file_count += 1;
    match self.table.get(&part_checksum) {
        None => {
            // First occurrence: nothing to compare against yet, so just remember the item.
            self.table.insert(part_checksum, DTEntry::Single(data));
            let processed = self.file_count - u64::from(self.job_counter);
            self.progress_indicator.borrow().update(processed);
        }
        Some(DTEntry::Multiple(_)) => self.add_item(part_checksum, data),
        Some(DTEntry::Single(_)) => {
            // Second occurrence: upgrade the slot and (re-)process the item stored earlier.
            let previous = self.table.insert(part_checksum.clone(), DTEntry::new_multi_entry());
            match previous {
                Some(DTEntry::Single(first)) => self.add_item(part_checksum.clone(), first),
                _ => panic!("Duplicate table should contain single entry at {part_checksum}"),
            }
            self.add_item(part_checksum, data);
        }
    }
}
/// Waits for all outstanding checksum jobs and merges their results into the table.
///
/// In single-threaded mode this returns immediately, because every checksum has already
/// been merged by the time `register_item` returned.
/// NOTE(review): the progress indicator is therefore only finalised in multithreaded
/// mode - confirm that this is intended.
///
/// While jobs are active or queued, the progress indicator is updated roughly every
/// 200 ms. Afterwards all results waiting in the channel are added to the table and the
/// progress indicator is finalised.
///
/// # Panics
///
/// Panics if a worker thread panicked, or if fewer results were received than jobs were
/// submitted.
pub(crate) fn finalise(&mut self) {
    if !self.multithreaded {
        return;
    }
    log::debug!("Waiting for jobs in duplicate table.");
    let threadpool = self.threadpool.as_ref().unwrap();
    loop {
        // Re-read the counters right before deciding, so that we neither report a stale
        // value nor sleep one extra round after the pool has already drained.
        let num_not_done = threadpool.active_count() + threadpool.queued_count();
        if num_not_done == 0 {
            break;
        }
        self.progress_indicator.borrow().update(self.file_count - num_not_done as u64);
        log::info!("Tracking progress.");
        thread::sleep(2 * HUNDRED_MILIS);
    }
    log::debug!("All jobs in duplicate table finished");
    if threadpool.panic_count() > 0 {
        panic!("There is at least one panicked checksum thread.");
    }
    // `try_recv` releases its borrow of `self` after every call, so results can be merged
    // one by one without first collecting them into a temporary vector.
    while let Ok((part_checksum, checksum, entry)) = self.checksum_rx.try_recv() {
        log::trace!("Adding {:?} to mult entries", entry.path());
        self.add_to_mult_entries(part_checksum, checksum, entry);
    }
    log::trace!("Done adding checksums to duplicate table.");
    self.progress_indicator.borrow().finalise();
    if self.job_counter > 0 {
        panic!("There were more jobs created than results received.")
    }
}
/// Computes the checksum of `entry` and files it under `part_checksum`.
///
/// In multithreaded mode the work is queued as a job via `add_job`; otherwise the
/// checksum is computed right away on the calling thread and merged immediately.
///
/// # Panics
///
/// In single-threaded mode, panics if the checksum function returns an error.
fn add_item(&mut self, part_checksum: String, entry: TableData) {
    if self.multithreaded {
        return self.add_job(part_checksum, entry);
    }
    let full_checksum =
        (self.checksum_fn)(entry.path()).expect("Could not calculate checksum");
    self.add_to_mult_entries(part_checksum, full_checksum, entry);
}
fn add_job(&mut self, part_checksum: String, entry: TableData) {
log::debug!("Adding job for {:?}", entry.path());
self.job_counter += 1;
let checksum_tx = self.checksum_tx.clone();
let checksum_fn = self.checksum_fn.clone();
self.threadpool.as_ref().unwrap().execute(move || {
let checksum = checksum_fn(entry.path()).expect("Could not calculate checksum");
checksum_tx.send((part_checksum, checksum, entry)).expect("Could not send data.");
})
}
/// Adds `entry` to the group for `checksum` inside the multi entry stored at
/// `part_checksum`, then reports progress.
///
/// In multithreaded mode each call accounts for one finished job, so the counter of
/// outstanding jobs is decremented first.
///
/// # Panics
///
/// Panics if the table does not hold a multi entry at `part_checksum`.
fn add_to_mult_entries(&mut self, part_checksum: String, checksum: String, entry: TableData) {
    if self.multithreaded {
        self.job_counter -= 1;
    }
    match self.table.get_mut(&part_checksum) {
        // The entry API finds or creates the group with a single hash lookup.
        Some(DTEntry::Multiple(me)) => me.hashes.entry(checksum).or_default().push(entry),
        _ => panic!("Duplicate Table should contain Multiple entries with key:\n{part_checksum}"),
    }
    self.progress_indicator.borrow().update(self.file_count - self.job_counter as u64);
}
/// Returns every other item that ended up in the same checksum group as `entry`.
///
/// An item that is the only one registered under `part_checksum` yields an empty set.
/// Otherwise the group containing `entry` is looked up among the checksum groups stored
/// at `part_checksum`, and all of its members except `entry` itself are returned.
///
/// # Errors
///
/// Returns a static description when
/// - nothing is stored at `part_checksum`,
/// - the single item stored there is not `entry`, or
/// - `entry` is in none of the checksum groups stored there.
///
/// The messages deliberately contain no `{...}` placeholders: the error type is a plain
/// `&str`, so nothing could be interpolated into them and any placeholder would be
/// shown verbatim.
pub(crate) fn get_duplicates(
    &self,
    part_checksum: &str,
    entry: &TableData,
) -> Result<HashSet<TableData>, &str> {
    match self.table.get(part_checksum) {
        None => Err("There is no entry with the specified partial checksum."),
        Some(DTEntry::Single(data)) if data == entry => Ok(HashSet::new()),
        Some(DTEntry::Single(_)) => {
            Err("There is unexpected data at the specified partial checksum.")
        }
        Some(DTEntry::Multiple(MultipleEntries { hashes })) => hashes
            .values()
            .find(|group| group.contains(entry))
            .map(|group| {
                // Leave out `entry` itself so that only its duplicates remain.
                group
                    .iter()
                    .filter(|other| *other != entry)
                    .cloned()
                    .collect::<HashSet<TableData>>()
            })
            .ok_or(
                "Could not find the specified entry in MultipleEntries at the specified \
                 partial checksum.",
            ),
    }
}
}
/// Value stored in the duplicate table for one partial checksum.
#[derive(Debug)]
enum DTEntry {
/// The only item registered under this partial checksum so far.
Single(TableData),
/// Two or more items were registered under this partial checksum; they are grouped by
/// checksum as the checksum results are merged in.
Multiple(MultipleEntries),
}
impl DTEntry {
    /// Builds a `Multiple` entry that does not contain any checksum group yet.
    fn new_multi_entry() -> Self {
        let hashes = HashMap::new();
        Self::Multiple(MultipleEntries { hashes })
    }
}
/// Items that share one partial checksum, grouped by their checksum.
#[derive(Debug)]
struct MultipleEntries {
/// Maps a checksum to every item for which that checksum was computed.
hashes: HashMap<String, Vec<TableData>>,
}