dsc 0.1.3

dsc is a cli tool for finding and removing duplicate files on one or multiple file systems, while respecting your gitignore rules.
use std::thread;
use std::thread::JoinHandle;

use anyhow::{anyhow, Context, Result};
use crossbeam::channel;
use crossbeam::channel::Sender;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};

use crate::duplicate_detection::coordinator::Coordinator;
use crate::duplicate_detection::messages::{AnalysisMessage, DuplicateInvestigation};
use crate::options::Options;
use crate::types::Duplicate;
use crate::ui;

pub struct DuplicateDetector {
    options: Options,
}

impl DuplicateDetector {
    pub fn new(options: Options) -> Self {
        Self { options }
    }

    pub fn check_duplicates(&self, duplicates: Vec<Duplicate>) -> Result<Vec<Duplicate>> {
        info!("Creating coordinator");
        let (investigation_out, investigation_in) = ::crossbeam::channel::unbounded();

        let max_duplicate_size = duplicates
            .iter()
            .map(|d| (d.locations.len() as u64 - 1) * d.file_size)
            .sum();

        let coordinator = Coordinator::new(
            self.options,
            investigation_out.clone(),
            investigation_in,
            duplicates.len() as u64,
            max_duplicate_size,
        );

        let feed_handle: JoinHandle<Result<()>> = thread::spawn(move || {
            for duplicate in duplicates {
                investigation_out
                    .send(DuplicateInvestigation::new(duplicate))
                    .context(anyhow!("Could not send investigation"))?
            }
            Ok(())
        });

        info!("Creating progress display");

        let (sender, display_handle) = if self.options.debug {
            self.spawn_progress_debug()
        } else if self.options.no_progress {
            self.spawn_progress_noop()
        } else {
            self.spawn_progress_display()
        };

        info!("Awaiting coordination completion");

        let res = coordinator.work(sender);

        display_handle
            .join()
            .map_err(|_| anyhow!("Could not join coordinator thread"))??;

        feed_handle
            .join()
            .map_err(|_| anyhow!("Could not join coordinator thread"))??;

        res
    }

    fn spawn_progress_noop(&self) -> (Sender<AnalysisMessage>, JoinHandle<Result<()>>) {
        let (sender, receiver) = channel::unbounded();

        let handle = std::thread::spawn(move || {
            while receiver.recv().is_ok() {}
            Ok(())
        });

        (sender, handle)
    }

    fn spawn_progress_debug(&self) -> (Sender<AnalysisMessage>, JoinHandle<Result<()>>) {
        let (sender, receiver) = channel::unbounded();

        let handle = std::thread::spawn(move || {
            debug!("Starting debug analysis log");
            while let Ok(msg) = receiver.recv() {
                debug!("{:?}", msg);
            }
            debug!("Finished debug analysis log");
            Ok(())
        });

        (sender, handle)
    }

    fn spawn_progress_display(&self) -> (Sender<AnalysisMessage>, JoinHandle<Result<()>>) {
        let (sender, receiver) = channel::unbounded();

        let multi = MultiProgress::new();

        let file_style = ProgressStyle::default_spinner()
            .template("[{elapsed_precise:.yellow}] {bar:40.cyan/blue} {pos}/{len}");

        let bytes_style = ProgressStyle::default_bar()
            .template("|{spinner:.white.dim}| {bar:40.cyan/blue} {bytes}/{total_bytes}")
            .tick_strings(ui::TURBO_FISH);

        // let bytes_style = ProgressStyle::default_bar()
        //     .template("           {bar:40.cyan/blue}: {bytes}/{total_bytes} {msg} data analyzed");

        let files = multi.add(ProgressBar::new(0));
        files.set_style(file_style);

        let bytes = multi.add(ProgressBar::new(0));

        bytes.set_style(bytes_style);
        bytes.enable_steady_tick(50);

        let progress_updater = std::thread::spawn(move || {
            while let Ok(msg) = receiver.recv() {
                let stats = match msg {
                    AnalysisMessage::Update(stats) => stats,
                    AnalysisMessage::Finished(stats) => stats,
                };
                //
                // let mut speedup = (stats.total_bytes_encountered as f64)
                //     .div(stats.total_bytes_read as f64);
                //
                // if speedup.is_nan() {
                //     speedup = 1.0
                // }

                files.set_length(stats.total_suspected_duplicates);
                files.set_position(stats.total_completed_investigations);

                bytes.set_length(stats.max_duplicate_size);
                bytes.set_position(stats.total_duplicate_size);
            }

            files.finish_and_clear();
            bytes.finish_and_clear();
        });

        let handle = std::thread::spawn(move || {
            multi
                .join()
                .map_err(|_| anyhow!("Failed to properly close display"))?;
            progress_updater
                .join()
                .map_err(|_| anyhow!("Failed to properly close display"))
        });

        (sender, handle)
    }
}