use std::thread;
use std::thread::JoinHandle;
use anyhow::{anyhow, Context, Result};
use crossbeam::channel;
use crossbeam::channel::Sender;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use crate::duplicate_detection::coordinator::Coordinator;
use crate::duplicate_detection::messages::{AnalysisMessage, DuplicateInvestigation};
use crate::options::Options;
use crate::types::Duplicate;
use crate::ui;
/// Entry point for checking suspected duplicates.
///
/// Holds the [`Options`] that `check_duplicates` hands to the [`Coordinator`]
/// and consults (`debug`, `no_progress`) when choosing how progress is reported.
pub struct DuplicateDetector {
    /// Run options supplied at construction time.
    options: Options,
}
impl DuplicateDetector {
    /// Creates a detector that runs with the given `options`.
    pub fn new(options: Options) -> Self {
        Self { options }
    }

    /// Runs the [`Coordinator`] over `duplicates` and returns its result.
    ///
    /// Each entry is wrapped in a [`DuplicateInvestigation`] and pushed onto an
    /// unbounded channel by a feeder thread. A second thread consumes the
    /// [`AnalysisMessage`]s the coordinator emits: it logs them when
    /// `options.debug` is set, silently drains them when `options.no_progress`
    /// is set, and otherwise renders progress bars.
    ///
    /// # Errors
    ///
    /// Both helper threads are always joined before anything is returned.
    /// The coordinator's own error is reported first; after that, an error (or
    /// panic) from the progress thread, then from the feeder thread.
    pub fn check_duplicates(&self, duplicates: Vec<Duplicate>) -> Result<Vec<Duplicate>> {
        info!("Creating coordinator");
        let (investigation_out, investigation_in) = channel::unbounded();

        // Sum of `(locations - 1) * file_size` over all entries, i.e. every
        // location beyond the first is counted once. `saturating_sub` makes an
        // entry without any location contribute zero instead of underflowing.
        let max_duplicate_size: u64 = duplicates
            .iter()
            .map(|d| (d.locations.len() as u64).saturating_sub(1) * d.file_size)
            .sum();

        let coordinator = Coordinator::new(
            self.options,
            investigation_out.clone(),
            investigation_in,
            duplicates.len() as u64,
            max_duplicate_size,
        );

        // Feed the initial investigations from a separate thread so the
        // coordinator can start working while the queue is still being filled.
        let feed_handle: JoinHandle<Result<()>> = thread::spawn(move || {
            for duplicate in duplicates {
                investigation_out
                    .send(DuplicateInvestigation::new(duplicate))
                    .context("Could not send investigation")?;
            }
            Ok(())
        });

        info!("Creating progress display");
        let (sender, display_handle) = if self.options.debug {
            self.spawn_progress_debug()
        } else if self.options.no_progress {
            self.spawn_progress_noop()
        } else {
            self.spawn_progress_display()
        };

        info!("Awaiting coordination completion");
        let res = coordinator.work(sender);

        // Join both helper threads before propagating any error so that neither
        // is left detached when the other one (or the coordinator) failed.
        let display_joined = display_handle.join();
        let feed_joined = feed_handle.join();

        // Report the coordinator's failure first: helper-thread errors may be
        // mere consequences of it (e.g. a send that failed once the coordinator
        // went away) and would otherwise hide the root cause.
        let confirmed = res?;
        display_joined.map_err(|_| anyhow!("Could not join progress display thread"))??;
        feed_joined.map_err(|_| anyhow!("Could not join investigation feeder thread"))??;
        Ok(confirmed)
    }

    /// Spawns a thread that discards every [`AnalysisMessage`] it receives.
    ///
    /// Returns the sending half of the channel and the handle of the draining
    /// thread, which finishes with `Ok(())` once `recv` reports an error.
    fn spawn_progress_noop(&self) -> (Sender<AnalysisMessage>, JoinHandle<Result<()>>) {
        let (sender, receiver) = channel::unbounded();
        let handle = thread::spawn(move || {
            while receiver.recv().is_ok() {}
            Ok(())
        });
        (sender, handle)
    }

    /// Spawns a thread that writes every received [`AnalysisMessage`] to the
    /// debug log.
    ///
    /// Returns the sending half of the channel and the handle of the logging
    /// thread, which finishes with `Ok(())` once `recv` reports an error.
    fn spawn_progress_debug(&self) -> (Sender<AnalysisMessage>, JoinHandle<Result<()>>) {
        let (sender, receiver) = channel::unbounded();
        let handle = thread::spawn(move || {
            debug!("Starting debug analysis log");
            while let Ok(msg) = receiver.recv() {
                debug!("{:?}", msg);
            }
            debug!("Finished debug analysis log");
            Ok(())
        });
        (sender, handle)
    }

    /// Spawns the interactive progress display: one bar counting completed
    /// investigations and one bar counting bytes.
    ///
    /// Two threads are started. The updater applies the statistics carried by
    /// each [`AnalysisMessage`] to the bars and clears them once `recv` reports
    /// an error. The returned handle belongs to a second thread that joins the
    /// [`MultiProgress`] and then the updater.
    fn spawn_progress_display(&self) -> (Sender<AnalysisMessage>, JoinHandle<Result<()>>) {
        let (sender, receiver) = channel::unbounded();
        let multi = MultiProgress::new();

        let file_style = ProgressStyle::default_spinner()
            .template("[{elapsed_precise:.yellow}] {bar:40.cyan/blue} {pos}/{len}");
        let bytes_style = ProgressStyle::default_bar()
            .template("|{spinner:.white.dim}| {bar:40.cyan/blue} {bytes}/{total_bytes}")
            .tick_strings(ui::TURBO_FISH);

        let files = multi.add(ProgressBar::new(0));
        files.set_style(file_style);
        let bytes = multi.add(ProgressBar::new(0));
        bytes.set_style(bytes_style);
        bytes.enable_steady_tick(50);

        let progress_updater = thread::spawn(move || {
            while let Ok(msg) = receiver.recv() {
                // Both variants carry the same statistics payload.
                let stats = match msg {
                    AnalysisMessage::Update(stats) | AnalysisMessage::Finished(stats) => stats,
                };
                // Lengths are refreshed on every message, not only set once.
                files.set_length(stats.total_suspected_duplicates);
                files.set_position(stats.total_completed_investigations);
                bytes.set_length(stats.max_duplicate_size);
                bytes.set_position(stats.total_duplicate_size);
            }
            files.finish_and_clear();
            bytes.finish_and_clear();
        });

        let handle = thread::spawn(move || {
            // Keep the underlying error as the cause instead of discarding it.
            multi
                .join()
                .context("Failed to properly close display")?;
            // A panic payload is not an error value, so only the message is kept.
            progress_updater
                .join()
                .map_err(|_| anyhow!("Failed to properly close display"))
        });
        (sender, handle)
    }
}