dsc 0.1.3

dsc is a cli tool for finding and removing duplicate files on one or multiple file systems, while respecting your gitignore rules.
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;

use anyhow::{anyhow, Context, Result};
use crossbeam::channel::{Receiver, Sender};

use crate::concurrency::Semaphore;
use crate::duplicate_detection::messages::{DuplicateInvestigation, WorkRequest, WorkResponse};
use crate::types::{Duplicate, FileDescriptorWithPaths};
use rustc_hash::FxHashMap;
use std::cmp::min;

pub struct Worker {
    work_in: Receiver<WorkRequest>,
    work_out: Sender<WorkResponse>,
    sem: Arc<Semaphore>,
    buffer: Vec<u8>,
}

impl Worker {
    // response message should change into an actual message (split offs needs to be respected)
    pub fn new(
        work_in: Receiver<WorkRequest>,
        work_out: Sender<WorkResponse>,
        sem: Arc<Semaphore>,
    ) -> Self {
        Worker {
            work_in,
            work_out,
            sem,
            buffer: vec![0u8; 4096],
        }
    }

    pub fn work(mut self) -> JoinHandle<Result<Worker>> {
        thread::spawn(move || {
            let res = self.work_internal();
            info!("Worker has finished {:?}", res);
            res?;
            Ok(self)
        })
    }

    // We are allowed to open up to 512 file handles here
    fn work_internal(&mut self) -> Result<()> {
        while let Ok(WorkRequest::Differentiate(investigation)) = self.work_in.recv() {
            self.read_fully(investigation)?;
        }
        Ok(())
    }

    fn read_fully(&mut self, investigation: DuplicateInvestigation) -> Result<()> {
        let DuplicateInvestigation { offset, duplicate } = investigation;

        info!("testing {:?}", duplicate);

        let Duplicate {
            file_size,
            locations,
        } = duplicate;

        let (mut work, mut held_leases) = Self::locations_to_readable_descriptors(locations);

        let mut offset = offset.unwrap_or(0u64);

        let mut result = Vec::new();

        let mut has_more_work = true;

        while has_more_work {
            let (grouping_round, released_leases) = self.do_grouping_round(work, offset);
            held_leases -= released_leases;
            offset += 4096;

            if offset < file_size {
                // Always pick the largest amount of available work
                let (new_work, released_leases) =
                    self.release_future_work(offset, file_size, grouping_round)?;
                work = new_work.unwrap_or_default();
                held_leases -= released_leases;

                // If we don't have any possible duplication left, finish early
                if work.len() < 2 {
                    result = work
                        .into_iter()
                        .map(|descriptor| Duplicate {
                            file_size,
                            locations: vec![descriptor.descriptor],
                        })
                        .collect();
                    work = Vec::new();
                    has_more_work = false;
                }
            } else {
                result = Self::grouping_round_into_duplicates(grouping_round, file_size);
                work = Vec::new();

                has_more_work = false;
            }
        }

        self.work_out
            .send(WorkResponse::PartitionedDuplicates(result))?;

        // Note that even with an empty result work size can still be larger than
        if held_leases > 0 {
            self.sem.release(held_leases);
        }

        Ok(())
    }

    fn grouping_round_into_duplicates(
        grouping_round: FxHashMap<Vec<u8>, Vec<ReadableDescriptor>>,
        file_size: u64,
    ) -> Vec<Duplicate> {
        grouping_round
            .into_iter()
            .map(|v| v.1)
            .map(|v| Duplicate {
                file_size,
                locations: v.into_iter().map(|v| v.descriptor).collect(),
            })
            .collect()
    }

    fn do_grouping_round(
        &mut self,
        work: Vec<ReadableDescriptor>,
        offset: u64,
    ) -> (FxHashMap<Vec<u8>, Vec<ReadableDescriptor>>, usize) {
        let mut grouping_round: FxHashMap<Vec<u8>, Vec<ReadableDescriptor>> = FxHashMap::default();

        let mut lease_count = 0usize;

        for mut item in work {
            // TODO: we may want to consider that we don' fully fill the buffer,
            // however as the same buffer is used for all files and all files have
            // the same size any "random" data at the end of the buffer
            // will not affect the grouping behaviour
            if item.read(offset, &mut self.buffer).is_ok() {
                if let Some(set) = grouping_round.get_mut(&self.buffer) {
                    set.push(item);
                } else {
                    let mut vec = Vec::new();
                    vec.push(item);

                    grouping_round.insert(self.buffer.clone(), vec);
                }
            } else if item.file.is_some() {
                lease_count += 1;
            }
        }

        if lease_count > 0 {
            self.sem.release(lease_count)
        }

        (grouping_round, lease_count)
    }

    fn release_future_work(
        &self,
        offset: u64,
        file_size: u64,
        grouping_round: FxHashMap<Vec<u8>, Vec<ReadableDescriptor>>,
    ) -> Result<(Option<Vec<ReadableDescriptor>>, usize)> {
        // Pick the largest remaining body of work
        let mut partitions: Vec<_> = grouping_round.into_iter().map(|v| v.1).collect();

        // TODO: there is probably a nicer way to extract the maximum
        partitions.sort_by_key(|v| v.len());

        let work = partitions.pop();

        let lease_count = partitions
            .iter()
            .flatten()
            .filter(|d| d.file.is_some())
            .count();

        let future_work: Vec<DuplicateInvestigation> = partitions
            .into_iter()
            .map(|partition| {
                let locations = partition
                    .into_iter()
                    .map(|descriptor| descriptor.descriptor)
                    .collect();

                DuplicateInvestigation {
                    offset: Some(offset),
                    duplicate: Duplicate {
                        file_size,
                        locations,
                    },
                }
            })
            .collect();

        if lease_count > 0 {
            self.sem.release(lease_count);
            self.work_out.send(WorkResponse::FoundWork(future_work))?;
        }

        Ok((work, lease_count))
    }

    fn locations_to_readable_descriptors(
        locations: Vec<FileDescriptorWithPaths>,
    ) -> (Vec<ReadableDescriptor>, usize) {
        let res: Vec<_> = locations
            .into_iter()
            .enumerate()
            .map(|(index, handle)| {
                if index >= 512 {
                    ReadableDescriptor {
                        file: None,
                        descriptor: handle,
                    }
                } else {
                    ReadableDescriptor {
                        file: File::open(handle.paths.iter().next().unwrap()).ok(),
                        descriptor: handle,
                    }
                }
            })
            .collect();
        let lease_count = min(512, res.len());
        (res, lease_count)
    }
}

struct ReadableDescriptor {
    file: Option<File>,
    descriptor: FileDescriptorWithPaths,
}

impl ReadableDescriptor {
    fn read(&mut self, position: u64, buffer: &mut Vec<u8>) -> Result<usize> {
        if let Some(file) = &mut self.file {
            file.read(buffer).context(anyhow!("Failed to read"))
        } else {
            let mut file = File::open(self.descriptor.paths.iter().next().unwrap())?;
            file.seek(SeekFrom::Start(position))?;
            file.read(buffer).context(anyhow!("Failed to read"))
        }
    }
}