use std::time::Duration;
use anyhow::{Context, Result};
use crossbeam::channel::{Receiver, Sender};
use throttle::Throttle;
use crate::candidate_selection::collision_tracker::{
CollisionTracker, FollowUpWork, FullCollisionKey,
};
use crate::candidate_selection::messages::CoordinatorMessage::HashCommand;
use crate::candidate_selection::messages::{
AnalysisMessage, AnalysisStatistics, CoordinatorMessage, FileWalkerMessage, FileWithDescriptor,
WorkerMessage,
};
use crate::concurrency::TaggedSelect;
use crate::options::Options;
use crate::types::{Duplicate, FileDescriptorWithPaths};
/// Coordinator state for candidate selection: receives files from the file walker,
/// sends hash commands out, and folds worker results into a `CollisionTracker`.
pub struct MessageHandler {
/// Configuration; the visible methods read `min_size`, `max_size` and `hashing_strategy`.
options: Options,
/// Number of hash commands sent for which no worker message has been handled yet.
outstanding_computations: u32,
/// Sum of `instructions.file_size` over all files that were newly registered.
total_file_size: u64,
/// Sum of `work_size()` over all hash results received from workers.
total_bytes_read: u64,
/// Number of files that were newly registered with the tracker.
file_count: u64,
/// Tracks registered paths and the duplicate groups found so far.
tracker: CollisionTracker,
/// Outgoing channel on which `HashCommand` messages are sent.
out: Sender<CoordinatorMessage>,
/// Incoming channel of file walker messages (found files, walk completion).
coordinator_files_in: Receiver<FileWalkerMessage>,
/// Incoming channel of worker messages (hash results, unhashable files).
coordinator_work_in: Receiver<WorkerMessage>,
}
/// Tag used with `TaggedSelect` to tell which input channel became ready.
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
enum CoordinateAction {
/// Registered for the file walker channel (`coordinator_files_in`).
FileReceived,
/// Registered for the worker result channel (`coordinator_work_in`).
WorkResultReceived,
}
impl MessageHandler {
pub fn new(
options: Options,
out: Sender<CoordinatorMessage>,
coordinator_files_in: Receiver<FileWalkerMessage>,
coordinator_work_in: Receiver<WorkerMessage>,
) -> Self {
MessageHandler {
options,
outstanding_computations: 0,
total_file_size: 0,
total_bytes_read: 0,
file_count: 0,
tracker: CollisionTracker::new(),
out,
coordinator_files_in,
coordinator_work_in,
}
}
/// Consumes the handler and turns the tracker's duplicate groups into [`Duplicate`]s.
///
/// Every file descriptor of a group is paired with the paths stored for it in the
/// tracker's `handles`. A descriptor without an entry in `handles` is left out of the
/// group's `locations`.
pub fn into_duplicates(self) -> Vec<Duplicate> {
    let CollisionTracker {
        mut handles,
        duplicates,
        ..
    } = self.tracker;
    duplicates
        .into_iter()
        .map(|(FullCollisionKey { file_size, .. }, descriptors)| Duplicate {
            file_size,
            locations: descriptors
                .into_iter()
                .filter_map(|file_descriptor| {
                    // `remove` transfers ownership of the paths out of the map, so no
                    // cloning is needed; missing entries are skipped via `?`.
                    let paths = handles.remove(&file_descriptor)?;
                    Some(FileDescriptorWithPaths {
                        file_descriptor,
                        paths,
                    })
                })
                .collect(),
        })
        .collect()
}
/// Runs the coordinator loop until the file walk has completed and no hash
/// computations are outstanding.
///
/// Each iteration waits for one message from either the file walker or the workers,
/// handles it, and, when the throttle allows, sends an `AnalysisMessage::Update` with
/// the current statistics on `update_sender`. After the loop a final
/// `AnalysisMessage::Finished` is sent.
///
/// # Errors
///
/// Returns an error if receiving from an input channel fails, if handling a message
/// fails, or if a statistics message cannot be sent on `update_sender`.
pub fn handle_messages(&mut self, update_sender: Sender<AnalysisMessage>) -> Result<()> {
    // While this many computations (or more) are outstanding, the file walker channel
    // is not listened to, so no further files are accepted.
    const MAX_OUTSTANDING_COMPUTATIONS: u32 = 64;

    info!("Starting to handle messages");
    let mut files_finished = false;
    // Local clones let `select` hold references to the receivers while `self` is
    // borrowed mutably by the handlers below.
    let coordinator_files_in = self.coordinator_files_in.clone();
    let coordinator_work_in = self.coordinator_work_in.clone();
    let mut select = TaggedSelect::new();
    select.recv(CoordinateAction::WorkResultReceived, &coordinator_work_in);
    // Limits how often progress updates are sent; see the `throttle` crate for the
    // exact meaning of the arguments.
    let mut throttle = Throttle::new(Duration::from_millis(20), 1);
    while self.outstanding_computations > 0 || !files_finished {
        // Only listen for new files while the walk is ongoing and the number of
        // outstanding computations is below the limit.
        if !files_finished && self.outstanding_computations < MAX_OUTSTANDING_COMPUTATIONS {
            select.recv(CoordinateAction::FileReceived, &coordinator_files_in);
        } else {
            select.remove(CoordinateAction::FileReceived);
        }
        trace!("Waiting for next message");
        if files_finished {
            info!("Waiting for more work");
        }
        match select.select() {
            (CoordinateAction::FileReceived, oper) => {
                trace!("Reading file operation");
                let message = oper
                    .recv(&coordinator_files_in)
                    .context("Receiving file failed")?;
                trace!("Handling file operation");
                files_finished = self
                    .handle_file_walker_message(message)
                    .context("Handling file operation failed")?;
                if files_finished {
                    info!(
                        "Received finished message, waiting for {} computations to finish",
                        self.outstanding_computations
                    );
                }
                trace!("Finished handling file operation");
            }
            (CoordinateAction::WorkResultReceived, oper) => {
                if files_finished {
                    info!(
                        "Received worker message, {} computations remaining",
                        self.outstanding_computations
                    );
                }
                trace!("Reading worker operation");
                let message = oper
                    .recv(&coordinator_work_in)
                    .context("Receiving work result failed")?;
                trace!("Handling worker operation");
                self.handle_worker_message(message)
                    .context("Handling work operation failed")?;
                trace!("Finished handling worker operation");
            }
        }
        if throttle.accept().is_ok() {
            trace!("Sending update to analysis feedback");
            update_sender
                .send(AnalysisMessage::Update(self.build_statistics()))
                .context("Sending analysis update failed")?;
            trace!("Finished sending update to analysis feedback");
        }
    }
    info!("Message handler sending last analysis message");
    update_sender
        .send(AnalysisMessage::Finished(self.build_statistics()))
        .context("Sending final analysis message failed")?;
    info!("Message handler finished");
    Ok(())
}
/// Builds a snapshot of the current analysis statistics.
///
/// For every duplicate group in the tracker, each member beyond the first counts as
/// one duplicate and contributes the group's `file_size` to `total_bytes_duplicated`.
/// The remaining fields are copied from the handler's running counters.
fn build_statistics(&self) -> AnalysisStatistics {
    // Both totals are accumulated in a single pass over the duplicate groups.
    let (duplicates_found, total_bytes_duplicated) = self.tracker.duplicates.iter().fold(
        (0u64, 0u64),
        |(found, bytes), group| {
            // `saturating_sub` keeps an empty group from underflowing; such a group
            // simply contributes nothing.
            let redundant = (group.1.len() as u64).saturating_sub(1);
            (found + redundant, bytes + group.0.file_size * redundant)
        },
    );
    AnalysisStatistics {
        files_found: self.file_count,
        duplicates_found,
        total_bytes_encountered: self.total_file_size,
        total_bytes_read: self.total_bytes_read,
        total_bytes_duplicated,
    }
}
/// Processes a single message from the file walker.
///
/// Returns `Ok(true)` when the message signals that the walk has completed and
/// `Ok(false)` for every found file, whether it was skipped or queued for hashing.
///
/// A found file is skipped when its metadata cannot be read, when its size lies
/// outside the configured `min_size`/`max_size` bounds, when it cannot be opened, or
/// when `register_path` reports `false` for it. Otherwise a hash command is sent.
///
/// # Errors
///
/// Returns an error if the hash command cannot be sent.
fn handle_file_walker_message(&mut self, message: FileWalkerMessage) -> Result<bool> {
    let path = match message {
        FileWalkerMessage::WalkCompleted => {
            info!("File walk completed");
            return Ok(true);
        }
        FileWalkerMessage::FileFound(path) => path,
    };

    trace!("Reading file metadata");
    let file_size = match path.metadata() {
        Ok(metadata) => metadata.len(),
        Err(err) => {
            trace!("File could not be inspected {:?}", err);
            return Ok(false);
        }
    };

    // Size filter: below the minimum or above the (optional) maximum means skip.
    let too_small = file_size < self.options.min_size;
    let too_large = matches!(self.options.max_size, Some(limit) if file_size > limit);
    if too_small || too_large {
        return Ok(false);
    }

    trace!("Opening file");
    let file_with_handle = match FileWithDescriptor::from_path(path.as_path()) {
        Ok(opened) => opened,
        Err(err) => {
            trace!("File could not be opened {:?}", err);
            return Ok(false);
        }
    };
    trace!("Opened file");

    // Only files for which the tracker returns `true` are counted and hashed.
    let newly_registered = self
        .tracker
        .register_path(file_with_handle.file_descriptor, path);
    if !newly_registered {
        return Ok(false);
    }

    self.file_count += 1;
    self.outstanding_computations += 1;
    let instructions = self.options.hashing_strategy.instructions(file_size);
    self.total_file_size += instructions.file_size;
    trace!("Sending hash instructions for new file");
    self.out
        .send(HashCommand(file_with_handle, instructions))
        .context("Sending hash command failed")?;
    Ok(false)
}
/// Processes a single message from a hash worker.
///
/// Both message kinds mark the end of one outstanding computation. A file that could
/// not be hashed is removed from the tracker. A computed hash is recorded in the
/// tracker, and for each handle the tracker returns as follow-up work a new hash
/// command with the next instructions is sent.
///
/// # Errors
///
/// Returns an error if a worker message arrives while no computation is outstanding,
/// if the tracker fails to record the hash, or if a follow-up hash command cannot be
/// sent.
fn handle_worker_message(&mut self, message: WorkerMessage) -> Result<()> {
    // A checked decrement turns a counter underflow into an error; a wrapped counter
    // would keep the coordinator loop waiting for results that never arrive.
    self.outstanding_computations = self
        .outstanding_computations
        .checked_sub(1)
        .context("Received a worker message without an outstanding computation")?;
    match message {
        WorkerMessage::NotHashable(file_with_handle) => {
            self.tracker.remove_file(file_with_handle);
        }
        WorkerMessage::HashComputed(file_with_handle, result) => {
            self.total_bytes_read += result.instructions.work_size();
            match self.tracker.update_hash(file_with_handle, &result)? {
                FollowUpWork::Nothing => {}
                FollowUpWork::One(handle) => {
                    let instructions = self
                        .options
                        .hashing_strategy
                        .next_instructions(&result.instructions, result.hash);
                    self.outstanding_computations += 1;
                    trace!("Sending hash instructions for existing file");
                    self.out
                        .send(HashCommand(handle, instructions))
                        .context("Sending hash command failed")?;
                }
                FollowUpWork::Two(first_handle, second_handle) => {
                    // Both files continue from the same completed step, so their
                    // next instructions are derived from the same inputs.
                    let first_instructions = self
                        .options
                        .hashing_strategy
                        .next_instructions(&result.instructions, result.hash);
                    let second_instructions = self
                        .options
                        .hashing_strategy
                        .next_instructions(&result.instructions, result.hash);
                    self.outstanding_computations += 2;
                    trace!("Sending hash instructions for existing files");
                    self.out
                        .send(HashCommand(first_handle, first_instructions))
                        .context("Sending hash command failed")?;
                    self.out
                        .send(HashCommand(second_handle, second_instructions))
                        .context("Sending hash command failed")?;
                }
            }
        }
    }
    Ok(())
}
}