use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use std::thread;
use std::thread::JoinHandle;
use anyhow::{anyhow, Context, Result};
use crossbeam::channel::{Receiver, Sender};
use crate::concurrency::Semaphore;
use crate::duplicate_detection::messages::{DuplicateInvestigation, WorkRequest, WorkResponse};
use crate::types::{Duplicate, FileDescriptorWithPaths};
use rustc_hash::FxHashMap;
use std::cmp::min;
/// A duplicate-detection worker: receives `WorkRequest`s on `work_in`,
/// compares candidate files chunk-by-chunk, and reports partitioned
/// results and redistributed follow-up work on `work_out`.
pub struct Worker {
// Incoming investigations to process.
work_in: Receiver<WorkRequest>,
// Outgoing results (`PartitionedDuplicates`) and redistributed work (`FoundWork`).
work_out: Sender<WorkResponse>,
// Bounds concurrently open file handles; leases are released here but
// acquired elsewhere — presumably by the dispatcher (confirm at caller).
sem: Arc<Semaphore>,
// Reusable 4096-byte read buffer shared by every chunk comparison.
buffer: Vec<u8>,
}
impl Worker {
pub fn new(
work_in: Receiver<WorkRequest>,
work_out: Sender<WorkResponse>,
sem: Arc<Semaphore>,
) -> Self {
Worker {
work_in,
work_out,
sem,
buffer: vec![0u8; 4096],
}
}
/// Run this worker on a freshly spawned thread and return the handle.
/// On clean shutdown the worker itself is handed back so the caller can
/// reuse or inspect it; an error from the work loop is propagated instead.
pub fn work(mut self) -> JoinHandle<Result<Worker>> {
    thread::spawn(move || {
        let outcome = self.work_internal();
        info!("Worker has finished {:?}", outcome);
        // Success carries the worker back out of the thread.
        outcome.map(|_| self)
    })
}
/// Main loop: pull requests until the channel disconnects or a message
/// other than `Differentiate` arrives, investigating each one in turn.
/// Errors from an individual investigation abort the loop and propagate.
fn work_internal(&mut self) -> Result<()> {
    loop {
        match self.work_in.recv() {
            Ok(WorkRequest::Differentiate(investigation)) => {
                self.read_fully(investigation)?;
            }
            // Channel closed (or non-matching request): clean shutdown.
            _ => break,
        }
    }
    Ok(())
}
/// Drive one duplicate investigation to completion.
///
/// Repeatedly reads the next 4096-byte chunk from every candidate file,
/// partitions candidates by chunk contents (`do_grouping_round`), and
/// either finishes — end of file reached, or fewer than two candidates
/// remain — or hands all but the largest partition back as fresh work
/// (`release_future_work`).
///
/// Sends exactly one `WorkResponse::PartitionedDuplicates` and releases
/// any semaphore leases still held before returning.
fn read_fully(&mut self, investigation: DuplicateInvestigation) -> Result<()> {
let DuplicateInvestigation { offset, duplicate } = investigation;
info!("testing {:?}", duplicate);
let Duplicate {
file_size,
locations,
} = duplicate;
// Opens up to 512 files eagerly; `held_leases` is the number of semaphore
// leases assumed to back those handles — presumably acquired by the
// dispatcher before sending this work; confirm on the coordinator side.
let (mut work, mut held_leases) = Self::locations_to_readable_descriptors(locations);
// `offset` is None for a fresh investigation, Some(n) when this group was
// re-dispatched partway through the file by a previous round.
let mut offset = offset.unwrap_or(0u64);
let mut result = Vec::new();
let mut has_more_work = true;
while has_more_work {
let (grouping_round, released_leases) = self.do_grouping_round(work, offset);
// NOTE(review): unchecked usize subtraction — panics (debug) / wraps
// (release) if the helpers ever release more leases than are held.
held_leases -= released_leases;
// Advance by the chunk size; must stay in sync with the 4096-byte buffer.
offset += 4096;
if offset < file_size {
// More file left to compare: keep the largest partition locally,
// redistribute the rest as new investigations at this offset.
let (new_work, released_leases) =
self.release_future_work(offset, file_size, grouping_round)?;
work = new_work.unwrap_or_default();
held_leases -= released_leases;
if work.len() < 2 {
// Zero or one candidate left — nothing to compare against;
// report each survivor as a singleton group.
result = work
.into_iter()
.map(|descriptor| Duplicate {
file_size,
locations: vec![descriptor.descriptor],
})
.collect();
work = Vec::new();
has_more_work = false;
}
} else {
// Reached end of file: groups still together matched on every chunk.
result = Self::grouping_round_into_duplicates(grouping_round, file_size);
work = Vec::new();
has_more_work = false;
}
}
self.work_out
.send(WorkResponse::PartitionedDuplicates(result))?;
// Leases for descriptors that survived to the end are released here.
if held_leases > 0 {
self.sem.release(held_leases);
}
Ok(())
}
/// Convert the final grouping into one `Duplicate` per group: every
/// descriptor set that matched on the last chunk becomes a reported group
/// sharing `file_size`.
fn grouping_round_into_duplicates(
    grouping_round: FxHashMap<Vec<u8>, Vec<ReadableDescriptor>>,
    file_size: u64,
) -> Vec<Duplicate> {
    let mut duplicates = Vec::with_capacity(grouping_round.len());
    for (_chunk, group) in grouping_round {
        let locations: Vec<_> = group.into_iter().map(|readable| readable.descriptor).collect();
        duplicates.push(Duplicate {
            file_size,
            locations,
        });
    }
    duplicates
}
/// Read one chunk at `offset` from every descriptor in `work` and group
/// descriptors by the bytes read. Descriptors whose read fails are dropped
/// from the round; leases backing dropped cached handles are released.
///
/// Returns the grouping plus the number of leases released, so the caller
/// can keep its `held_leases` accounting in sync.
///
/// BUG FIX: the grouping key previously ignored how many bytes `read`
/// returned and always used the full 4096-byte buffer, so after a short
/// read (final partial chunk, or any spurious short read — allowed by the
/// `Read::read` contract) the key included stale bytes left over from an
/// earlier read, which could mis-group files. The key is now only the
/// bytes actually read.
fn do_grouping_round(
    &mut self,
    work: Vec<ReadableDescriptor>,
    offset: u64,
) -> (FxHashMap<Vec<u8>, Vec<ReadableDescriptor>>, usize) {
    let mut grouping_round: FxHashMap<Vec<u8>, Vec<ReadableDescriptor>> = FxHashMap::default();
    let mut lease_count = 0usize;
    for mut item in work {
        match item.read(offset, &mut self.buffer) {
            Ok(bytes_read) => {
                // Only the bytes this read produced are significant.
                let chunk = &self.buffer[..bytes_read];
                // `Vec<u8>: Borrow<[u8]>` lets us probe with the slice and
                // only allocate the owned key for a brand-new group.
                if let Some(group) = grouping_round.get_mut(chunk) {
                    group.push(item);
                } else {
                    grouping_round.insert(chunk.to_vec(), vec![item]);
                }
            }
            Err(_) => {
                // Unreadable file: drop it from the investigation. Only
                // descriptors with a cached handle hold a lease to return.
                if item.file.is_some() {
                    lease_count += 1;
                }
            }
        }
    }
    if lease_count > 0 {
        self.sem.release(lease_count);
    }
    (grouping_round, lease_count)
}
/// Keep the largest partition for this worker and repackage every other
/// partition as a fresh `DuplicateInvestigation` at `offset` for other
/// workers to pick up.
///
/// Returns the kept partition (None if the grouping was empty) and the
/// number of semaphore leases released for redistributed descriptors that
/// held cached file handles.
///
/// BUG FIX: the `FoundWork` send was previously gated on `lease_count > 0`,
/// so redistributed partitions made up entirely of descriptors without
/// cached handles (beyond the 512 open-file cap) were silently dropped —
/// losing duplicates. The send is now gated on the work itself.
fn release_future_work(
    &self,
    offset: u64,
    file_size: u64,
    grouping_round: FxHashMap<Vec<u8>, Vec<ReadableDescriptor>>,
) -> Result<(Option<Vec<ReadableDescriptor>>, usize)> {
    let mut partitions: Vec<_> = grouping_round.into_iter().map(|v| v.1).collect();
    // Sorting ascending by size means `pop` keeps the largest partition
    // local, minimizing the work sent back over the channel.
    partitions.sort_by_key(|v| v.len());
    let work = partitions.pop();
    // Only descriptors with a cached open handle hold a lease.
    let lease_count = partitions
        .iter()
        .flatten()
        .filter(|d| d.file.is_some())
        .count();
    let future_work: Vec<DuplicateInvestigation> = partitions
        .into_iter()
        .map(|partition| {
            let locations = partition
                .into_iter()
                .map(|descriptor| descriptor.descriptor)
                .collect();
            DuplicateInvestigation {
                // Resume at the current offset; the receiving worker
                // reopens the files itself.
                offset: Some(offset),
                duplicate: Duplicate {
                    file_size,
                    locations,
                },
            }
        })
        .collect();
    if lease_count > 0 {
        self.sem.release(lease_count);
    }
    if !future_work.is_empty() {
        self.work_out.send(WorkResponse::FoundWork(future_work))?;
    }
    Ok((work, lease_count))
}
/// Turn raw locations into `ReadableDescriptor`s, eagerly opening at most
/// `MAX_OPEN_FILES` handles; descriptors beyond the cap (or whose open
/// fails) get `file: None` and are opened on demand by `read`.
///
/// Returns the descriptors and the lease count: one per slot within the
/// cap, whether or not the open actually succeeded — this mirrors how the
/// rest of the worker accounts for leases.
///
/// Robustness fix: a descriptor with an empty `paths` set previously
/// panicked the worker thread (`unwrap`); it now simply gets no cached
/// handle and will surface an error from `read` instead.
fn locations_to_readable_descriptors(
    locations: Vec<FileDescriptorWithPaths>,
) -> (Vec<ReadableDescriptor>, usize) {
    // Cap on eagerly opened handles per investigation; previously a magic
    // `512` duplicated in two places.
    const MAX_OPEN_FILES: usize = 512;
    let res: Vec<_> = locations
        .into_iter()
        .enumerate()
        .map(|(index, handle)| {
            let file = if index < MAX_OPEN_FILES {
                // Best-effort open of the first known path; failure is
                // tolerated and deferred to `read`.
                handle.paths.iter().next().and_then(|path| File::open(path).ok())
            } else {
                None
            };
            ReadableDescriptor {
                file,
                descriptor: handle,
            }
        })
        .collect();
    let lease_count = min(MAX_OPEN_FILES, res.len());
    (res, lease_count)
}
}
/// A candidate file within a duplicate investigation, optionally carrying
/// a cached open handle.
struct ReadableDescriptor {
// Cached handle: None when the descriptor was beyond the open-file cap or
// the eager open failed — `read` then opens a path on demand.
file: Option<File>,
// The underlying descriptor with its known path(s).
descriptor: FileDescriptorWithPaths,
}
impl ReadableDescriptor {
    /// Read up to `buffer.len()` bytes starting at absolute `position` in
    /// the file, returning the number of bytes read. A short read
    /// (including 0 at end of file) is not an error, per the `Read::read`
    /// contract — callers must use the returned count.
    ///
    /// Uses the cached handle when present; otherwise opens the first
    /// known path on demand.
    ///
    /// BUG FIX: the cached-handle branch previously never seeked, so it
    /// read from wherever the cursor happened to be — position 0 on a
    /// re-dispatched investigation with `offset > 0`, and a permanently
    /// drifted position after any short read — instead of `position`.
    /// Both branches now seek explicitly. Also replaces the `unwrap` on an
    /// empty `paths` set (a worker-thread panic) with a proper error.
    fn read(&mut self, position: u64, buffer: &mut Vec<u8>) -> Result<usize> {
        if let Some(file) = &mut self.file {
            file.seek(SeekFrom::Start(position))
                .context("Failed to seek")?;
            file.read(buffer).context("Failed to read")
        } else {
            let path = self
                .descriptor
                .paths
                .iter()
                .next()
                .ok_or_else(|| anyhow!("descriptor has no paths"))?;
            let mut file = File::open(path)?;
            file.seek(SeekFrom::Start(position))?;
            file.read(buffer).context("Failed to read")
        }
    }
}