dsc 0.1.3

dsc is a CLI tool for finding and removing duplicate files across one or more file systems while respecting your gitignore rules.
use std::time::Duration;

use anyhow::{Context, Result};
use crossbeam::channel::{Receiver, Sender};
use throttle::Throttle;

use crate::candidate_selection::collision_tracker::{
    CollisionTracker, FollowUpWork, FullCollisionKey,
};
use crate::candidate_selection::messages::CoordinatorMessage::HashCommand;
use crate::candidate_selection::messages::{
    AnalysisMessage, AnalysisStatistics, CoordinatorMessage, FileWalkerMessage, FileWithDescriptor,
    WorkerMessage,
};
use crate::concurrency::TaggedSelect;
use crate::options::Options;
use crate::types::{Duplicate, FileDescriptorWithPaths};

/// Coordinator state: receives files from the walker, sends hash commands to workers,
/// and records worker results in a [`CollisionTracker`].
pub struct MessageHandler {
    /// Size filters (`min_size` / `max_size`) and the hashing strategy.
    options: Options,
    /// Number of hash commands sent whose worker result has not yet been received.
    outstanding_computations: u32,
    /// Sum of `instructions.file_size` over every file submitted for hashing.
    total_file_size: u64,
    /// Sum of `instructions.work_size()` over every completed hash computation.
    total_bytes_read: u64,
    /// Number of files for which the tracker's `register_path` returned true.
    file_count: u64,
    /// Holds registered paths per descriptor (`handles`) and collision groups (`duplicates`).
    tracker: CollisionTracker,
    /// Outgoing hash commands for the workers.
    out: Sender<CoordinatorMessage>,
    /// Incoming messages from the file walker.
    coordinator_files_in: Receiver<FileWalkerMessage>,
    /// Incoming results from the hashing workers.
    coordinator_work_in: Receiver<WorkerMessage>,
}

/// Tag identifying which input channel became ready in the coordinator's `TaggedSelect`.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum CoordinateAction {
    /// A message is available from the file walker.
    FileReceived,
    /// A result is available from a hashing worker.
    WorkResultReceived,
}

impl MessageHandler {
    /// Maximum number of hash computations allowed in flight before the handler stops
    /// listening for new files from the walker (back-pressure on file discovery).
    const MAX_OUTSTANDING_COMPUTATIONS: u32 = 64;

    /// Creates a handler with zeroed counters and an empty [`CollisionTracker`].
    ///
    /// * `options` - size filters and hashing strategy applied to every file.
    /// * `out` - channel on which hash commands are sent to the workers.
    /// * `coordinator_files_in` - messages produced by the file walker.
    /// * `coordinator_work_in` - results produced by the hashing workers.
    pub fn new(
        options: Options,
        out: Sender<CoordinatorMessage>,
        coordinator_files_in: Receiver<FileWalkerMessage>,
        coordinator_work_in: Receiver<WorkerMessage>,
    ) -> Self {
        MessageHandler {
            options,
            outstanding_computations: 0,
            total_file_size: 0,
            total_bytes_read: 0,
            file_count: 0,
            tracker: CollisionTracker::new(),
            out,
            coordinator_files_in,
            coordinator_work_in,
        }
    }

    /// Consumes the handler and converts the tracker's collision groups into [`Duplicate`]s.
    ///
    /// Each descriptor in a group is paired with the paths registered for it; descriptors
    /// without registered paths are left out of the resulting `locations`.
    pub fn into_duplicates(self) -> Vec<Duplicate> {
        let CollisionTracker {
            mut handles,
            duplicates,
            ..
        } = self.tracker;

        duplicates
            .into_iter()
            .map(|(FullCollisionKey { file_size, .. }, descriptors)| Duplicate {
                file_size,
                locations: descriptors
                    .into_iter()
                    .filter_map(|file_descriptor| {
                        // `remove` moves the paths out of the map instead of cloning them.
                        handles
                            .remove(&file_descriptor)
                            .map(|paths| FileDescriptorWithPaths {
                                file_descriptor,
                                paths,
                            })
                    })
                    .collect(),
            })
            .collect()
    }

    /// Runs the coordinator loop until the file walk has completed and every outstanding
    /// hash computation has reported back.
    ///
    /// While running, rate-limited (20 ms throttle) `AnalysisMessage::Update`s are sent on
    /// `update_sender`; a final `AnalysisMessage::Finished` is sent once the loop ends.
    ///
    /// # Errors
    ///
    /// Fails if receiving from either input channel fails, if handling a message fails, or
    /// if sending on `update_sender` fails.
    pub fn handle_messages(&mut self, update_sender: Sender<AnalysisMessage>) -> Result<()> {
        info!("Starting to handle messages");
        let mut files_finished = false;

        // Local clones so `select` presumably does not hold a borrow of `self` while the
        // handlers below take `&mut self`.
        let coordinator_files_in = self.coordinator_files_in.clone();
        let coordinator_work_in = self.coordinator_work_in.clone();

        let mut select = TaggedSelect::new();

        select.recv(CoordinateAction::WorkResultReceived, &coordinator_work_in);

        let mut throttle = Throttle::new(Duration::from_millis(20), 1);

        while self.outstanding_computations > 0 || !files_finished {
            // Back-pressure: only listen for new files while the walk is still running and
            // the number of in-flight computations is below the limit.
            let accept_files = !files_finished
                && self.outstanding_computations < Self::MAX_OUTSTANDING_COMPUTATIONS;
            if accept_files {
                select.recv(CoordinateAction::FileReceived, &coordinator_files_in)
            } else {
                select.remove(CoordinateAction::FileReceived);
            }

            trace!("Waiting for next message");

            if files_finished {
                info!("Waiting for more work")
            }

            match select.select() {
                (CoordinateAction::FileReceived, oper) => {
                    trace!("Reading file operation");
                    let message = oper
                        .recv(&coordinator_files_in)
                        .context("Receiving file failed")?;
                    trace!("Handling file operation");
                    files_finished = self
                        .handle_file_walker_message(message)
                        .context("Handling file operation failed")?;

                    if files_finished {
                        info!(
                            "Received finished message, waiting for {} computations to finish",
                            self.outstanding_computations
                        )
                    }
                    trace!("Finished handling file operation");
                }
                (CoordinateAction::WorkResultReceived, oper) => {
                    if files_finished {
                        info!(
                            "Received worker message, {} computations remaining",
                            self.outstanding_computations
                        )
                    }
                    trace!("Reading worker operation");
                    let message = oper
                        .recv(&coordinator_work_in)
                        .context("Receiving work result failed")?;
                    trace!("Handling worker operation");
                    self.handle_worker_message(message)
                        .context("Handling work operation failed")?;
                    trace!("Finished handling worker operation");
                }
            }

            if throttle.accept().is_ok() {
                trace!("Sending update to analysis feedback");
                update_sender
                    .send(AnalysisMessage::Update(self.build_statistics()))
                    .context("Sending analysis update failed")?;
                trace!("Finished sending update to analysis feedback");
            }
        }

        info!("Message handler sending last analysis message");

        update_sender
            .send(AnalysisMessage::Finished(self.build_statistics()))
            .context("Sending final analysis message failed")?;

        info!("Message handler finished");

        Ok(())
    }

    /// Snapshot of the progress counters plus duplicate totals derived from the tracker.
    ///
    /// A collision group of `n` files contributes `n - 1` duplicates and
    /// `file_size * (n - 1)` duplicated bytes.
    fn build_statistics(&self) -> AnalysisStatistics {
        // Single pass over the groups; `saturating_sub` keeps a degenerate (empty) group
        // from underflowing instead of panicking or wrapping.
        let (duplicates_found, total_bytes_duplicated) = self.tracker.duplicates.iter().fold(
            (0u64, 0u64),
            |(count, bytes), (key, descriptors)| {
                let redundant = (descriptors.len() as u64).saturating_sub(1);
                (count + redundant, bytes + key.file_size * redundant)
            },
        );

        AnalysisStatistics {
            files_found: self.file_count,
            duplicates_found,
            total_bytes_encountered: self.total_file_size,
            total_bytes_read: self.total_bytes_read,
            total_bytes_duplicated,
        }
    }

    /// Handles one message from the file walker.
    ///
    /// For `FileFound`, files that cannot be inspected or opened, or whose size falls
    /// outside `options.min_size..=options.max_size`, are skipped. Otherwise the path is
    /// registered with the tracker, and if registration reports a new file, a hash command
    /// is sent to the workers.
    ///
    /// Returns `Ok(true)` only for `WalkCompleted`, signalling that no more files will arrive.
    fn handle_file_walker_message(&mut self, message: FileWalkerMessage) -> Result<bool> {
        match message {
            FileWalkerMessage::FileFound(path) => {
                trace!("Reading file metadata");

                let metadata = match path.metadata() {
                    Ok(metadata) => metadata,
                    Err(err) => {
                        // Best effort: unreadable entries are skipped, not fatal.
                        trace!("File could not be inspected {:?}", err);
                        return Ok(false);
                    }
                };

                let file_size = metadata.len();

                if file_size < self.options.min_size {
                    return Ok(false);
                }

                if let Some(max_file_size) = self.options.max_size {
                    if file_size > max_file_size {
                        return Ok(false);
                    }
                }

                trace!("Opening file");

                let file_with_handle = match FileWithDescriptor::from_path(path.as_path()) {
                    Ok(result) => result,
                    Err(err) => {
                        trace!("File could not be opened {:?}", err);
                        return Ok(false);
                    }
                };

                trace!("Opened file");

                // NOTE(review): `register_path` presumably returns false when this descriptor
                // was already registered via another path — confirm in CollisionTracker.
                if self
                    .tracker
                    .register_path(file_with_handle.file_descriptor, path)
                {
                    self.file_count += 1;
                    self.outstanding_computations += 1;

                    let instructions = self.options.hashing_strategy.instructions(file_size);

                    self.total_file_size += instructions.file_size;
                    trace!("Sending hash instructions for new file");
                    self.out
                        .send(HashCommand(file_with_handle, instructions))
                        .context("Sending hash command failed")?;
                }
                Ok(false)
            }

            FileWalkerMessage::WalkCompleted => {
                info!("File walk completed");
                Ok(true)
            }
        }
    }

    /// Handles one result from a hashing worker.
    ///
    /// Every result retires one outstanding computation. Unhashable files are removed from
    /// the tracker. Computed hashes are fed to the tracker, and any follow-up work it
    /// requests is dispatched as new hash commands using the strategy's next instructions.
    fn handle_worker_message(&mut self, message: WorkerMessage) -> Result<()> {
        match message {
            WorkerMessage::NotHashable(file_with_handle) => {
                self.finish_computation()?;
                self.tracker.remove_file(file_with_handle);
            }

            WorkerMessage::HashComputed(file_with_handle, result) => {
                self.finish_computation()?;
                self.total_bytes_read += result.instructions.work_size();

                match self.tracker.update_hash(file_with_handle, &result)? {
                    FollowUpWork::Nothing => {}

                    FollowUpWork::One(handle) => {
                        let instructions = self
                            .options
                            .hashing_strategy
                            .next_instructions(&result.instructions, result.hash);
                        self.outstanding_computations += 1;

                        trace!("Sending hash instructions for existing file");

                        self.out
                            .send(HashCommand(handle, instructions))
                            .context("Sending hash command failed")?;
                    }

                    FollowUpWork::Two(first_handle, second_handle) => {
                        let first_instructions = self
                            .options
                            .hashing_strategy
                            .next_instructions(&result.instructions, result.hash);
                        let second_instructions = self
                            .options
                            .hashing_strategy
                            .next_instructions(&result.instructions, result.hash);

                        self.outstanding_computations += 2;

                        trace!("Sending hash instructions for existing files");

                        self.out
                            .send(HashCommand(first_handle, first_instructions))
                            .context("Sending hash command failed")?;
                        self.out
                            .send(HashCommand(second_handle, second_instructions))
                            .context("Sending hash command failed")?;
                    }
                }
            }
        }
        Ok(())
    }

    /// Retires one in-flight hash computation.
    ///
    /// Returns an error instead of underflowing if a worker reports a result while no
    /// computation is outstanding. An underflow would panic in debug builds. In release
    /// builds it would wrap to `u32::MAX` and the coordinator loop would never terminate.
    fn finish_computation(&mut self) -> Result<()> {
        self.outstanding_computations = self
            .outstanding_computations
            .checked_sub(1)
            .context("Received a worker result with no outstanding computation")?;
        Ok(())
    }
}