//! dsc 0.1.3
//!
//! dsc is a CLI tool for finding and removing duplicate files on one or
//! multiple file systems, while respecting your gitignore rules.
use std::cmp::min;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use anyhow::anyhow;
use crossbeam::channel::{Receiver, Sender};
use throttle::Throttle;

use crate::concurrency::TaggedSelect;
use crate::duplicate_detection::messages::{
    AnalysisMessage, AnalysisStatistics, DuplicateInvestigation, WorkRequest, WorkResponse,
};
use crate::duplicate_detection::open_file_guard::OpenFileGuard;
use crate::duplicate_detection::worker::Worker;
use crate::options::Options;
use crate::types::Duplicate;

/// Tags for the channel-select loop in `Coordinator::do_work`, identifying
/// which registered channel a ready operation belongs to.
#[derive(Debug, Hash, PartialEq, Eq, Copy, Clone)]
enum SplitWork {
    /// An open-file lease was granted (the lease channel is ready).
    LeaseGranted,
    /// A new investigation arrived (the investigation channel is ready).
    InvestigationBatch,
    /// A worker finished and sent a response (the response channel is ready).
    WorkFinished,
}

/// Orchestrates duplicate-file analysis: feeds queued investigations to
/// worker threads, balances open-file leases, and collects confirmed
/// duplicates while streaming progress statistics.
pub struct Coordinator {
    /// User-supplied settings; `thread_count` drives how many Workers spawn.
    options: Options,
    /// Re-queues follow-up investigations discovered by workers.
    investigation_out: Sender<DuplicateInvestigation>,
    /// Source of pending investigations to process.
    investigation_in: Receiver<DuplicateInvestigation>,
    /// Number of investigations still expected on `investigation_in`.
    investigation_queue_size: u64,
    /// Largest suspected duplicate size; forwarded verbatim into every
    /// statistics report.
    max_duplicate_size: u64,
}

impl Coordinator {
    pub fn new(
        options: Options,
        investigation_out: Sender<DuplicateInvestigation>,
        investigation_in: Receiver<DuplicateInvestigation>,
        investigation_count: u64,
        max_duplicate_size: u64,
    ) -> Coordinator {
        Coordinator {
            options,
            investigation_out,
            investigation_in,
            investigation_queue_size: investigation_count,
            max_duplicate_size,
        }
    }

    pub fn work(self, analysis_out: Sender<AnalysisMessage>) -> ::anyhow::Result<Vec<Duplicate>> {
        let handle = thread::spawn(move || self.do_work(analysis_out));

        handle
            .join()
            .map_err(|_| anyhow!("Could not join split tracker thread"))?
    }

    fn do_work(
        mut self,
        analysis_out: Sender<AnalysisMessage>,
    ) -> ::anyhow::Result<Vec<Duplicate>> {
        let (work_request_out, work_request_in) = ::crossbeam::channel::unbounded();
        let (work_response_out, work_response_in) = ::crossbeam::channel::unbounded();

        let sem = Arc::new(crate::concurrency::Semaphore::new(512));

        let (guard, lease_out, lease_in) = OpenFileGuard::new(sem.clone());
        guard.work();

        for _ in 0..self.options.thread_count {
            Worker::new(
                work_request_in.clone(),
                work_response_out.clone(),
                sem.clone(),
            )
            .work();
        }

        let mut throttle = Throttle::new(Duration::from_millis(20), 1);

        let mut total_investigations = self.investigation_queue_size;
        let mut total_completed_investigations = 0u64;
        let mut outstanding_work = 0u64;
        let mut lease_request_count = 0u64;
        let mut select: TaggedSelect<SplitWork> = TaggedSelect::new();

        select.recv(SplitWork::InvestigationBatch, &self.investigation_in);
        select.recv(SplitWork::LeaseGranted, &lease_in);
        select.recv(SplitWork::WorkFinished, &work_response_in);

        let mut result = Vec::new();

        while self.investigation_queue_size > 0 || lease_request_count > 0 || outstanding_work > 0 {
            match select.select() {
                (SplitWork::InvestigationBatch, oper) => {
                    if let Ok(investigation) = oper.recv(&self.investigation_in) {
                        self.investigation_queue_size -= 1;

                        let work_size = investigation.duplicate.locations.len();

                        if work_size > 1 {
                            lease_out.send(investigation)?;
                            lease_request_count += min(512, work_size as u64)
                        } else {
                            result.push(investigation.duplicate);
                        }
                    }
                }
                (SplitWork::LeaseGranted, oper) => {
                    debug!("Lease granted message");
                    let msg = oper.recv(&lease_in);

                    if let Ok(investigation) = msg {
                        debug!("Received lease for a new Duplicate");

                        lease_request_count -=
                            min(512, investigation.duplicate.locations.len() as u64);
                        work_request_out.send(WorkRequest::Differentiate(investigation))?;
                        outstanding_work += 1;
                    }
                }
                (SplitWork::WorkFinished, oper) => {
                    debug!("Work finished!");
                    let response = oper.recv(&work_response_in)?;

                    match response {
                        WorkResponse::PartitionedDuplicates(duplicates) => {
                            outstanding_work -= 1;
                            total_investigations += duplicates.len() as u64 - 1;
                            total_completed_investigations += duplicates.len() as u64;

                            for duplicate in duplicates {
                                if duplicate.locations.len() > 1 {
                                    result.push(duplicate)
                                }
                            }
                        }
                        WorkResponse::FoundWork(work) => {
                            for investigation in work {
                                if investigation.duplicate.locations.len() > 1 {
                                    self.investigation_queue_size += 1;
                                    total_investigations += 1;
                                    self.investigation_out.send(investigation)?;
                                }
                            }
                        }
                    }
                }
            }

            if throttle.accept().is_ok() {
                analysis_out.send(AnalysisMessage::Update(self.build_analysis_statistics(
                    &result,
                    total_completed_investigations,
                    total_investigations,
                )))?
            }

            // Don't take more batches if we are currently not expecting any more message
            // or are waiting for a lease
            if self.investigation_queue_size == 0 || lease_request_count >= 512 {
                debug!("Stopping fetching new duplicates : {}", lease_request_count);
                select.remove(SplitWork::InvestigationBatch)
            } else {
                debug!(
                    "Continuing fetching new duplicates : {}",
                    lease_request_count
                );
                select.recv(SplitWork::InvestigationBatch, &self.investigation_in)
            }
        }

        analysis_out.send(AnalysisMessage::Finished(self.build_analysis_statistics(
            &result,
            total_completed_investigations,
            total_investigations,
        )))?;

        info!("Done");

        Ok(result)
    }

    fn build_analysis_statistics(
        &self,
        duplicates: &[Duplicate],
        total_completed_investigations: u64,
        total_suspected_duplicates: u64,
    ) -> AnalysisStatistics {
        let total_duplicates_found = duplicates.len() as u64;
        let total_duplicate_size = duplicates
            .iter()
            .map(|d| (d.locations.len() as u64 - 1) * d.file_size)
            .sum();

        AnalysisStatistics {
            total_completed_investigations,
            total_duplicates_found,
            total_suspected_duplicates,
            max_duplicate_size: self.max_duplicate_size,
            total_duplicate_size,
        }
    }
}