dsc 0.1.3

dsc is a cli tool for finding and removing duplicate files on one or multiple file systems, while respecting your gitignore rules.
use std::path::PathBuf;
use std::thread::JoinHandle;

use anyhow::{anyhow, Result};
use crossbeam::channel;
use crossbeam::channel::{Receiver, Sender};

use crate::candidate_selection::message_handler::MessageHandler;
use crate::candidate_selection::messages::{AnalysisMessage, CoordinatorMessage, WorkerMessage};
use crate::candidate_selection::path_walker;
use crate::candidate_selection::worker::Worker;
use crate::options::Options;
use crate::types::Duplicate;

pub struct Coordinator {
    options: Options,
}

impl Coordinator {
    pub fn new(options: Options) -> Self {
        Coordinator { options }
    }

    pub fn coordinate(
        &self,
        path_vec: &[PathBuf],
        update_sender: Sender<AnalysisMessage>,
    ) -> Result<Vec<Duplicate>> {
        info!("Coordinator starting");

        let (coordinator_out, worker_in) = channel::unbounded();
        let (worker_out, coordinator_work_in) = channel::unbounded();
        let (file_walker_out, coordinator_files_in) = channel::bounded(2048);

        info!("Spawning workers");

        self.spawn_workers(worker_in, worker_out);

        info!("Finished spawning workers");

        let mut message_handler = MessageHandler::new(
            self.options,
            coordinator_out,
            coordinator_files_in,
            coordinator_work_in,
        );

        let handle = std::thread::spawn(move || {
            info!("Asking MessageHandler to start handling messages");

            let res = message_handler.handle_messages(update_sender);

            info!("Finished handling messages {:?}", res);

            let result = message_handler.into_duplicates();

            res?;

            Ok(result)
        });

        info!("Walking paths");

        path_walker::walk_paths(path_vec, file_walker_out)?;

        info!("Walk finished, waiting for message handler to complete");

        handle
            .join()
            .map_err(|_| anyhow!("Analysis failed unexpectedly"))?
    }

    fn spawn_workers(
        &self,
        receiver: Receiver<CoordinatorMessage>,
        sender: Sender<WorkerMessage>,
    ) -> Vec<JoinHandle<Result<Worker>>> {
        (0..self.options.thread_count)
            .into_iter()
            .map(|_| {
                let worker = Worker::new(
                    receiver.clone(),
                    sender.clone(),
                    self.options.block_size as usize,
                );
                worker.work()
            })
            .collect()
    }
}