use std::cmp::min;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use anyhow::anyhow;
use crossbeam::channel::{Receiver, Sender};
use throttle::Throttle;
use crate::concurrency::TaggedSelect;
use crate::duplicate_detection::messages::{
AnalysisMessage, AnalysisStatistics, DuplicateInvestigation, WorkRequest, WorkResponse,
};
use crate::duplicate_detection::open_file_guard::OpenFileGuard;
use crate::duplicate_detection::worker::Worker;
use crate::options::Options;
use crate::types::Duplicate;
/// Tag identifying which channel became ready in the coordinator's
/// `TaggedSelect` loop (see `Coordinator::do_work`).
///
/// `Debug` is derived so the tag can appear in diagnostics alongside the
/// `debug!`/`info!` logging already used by the coordinator.
#[derive(Hash, PartialEq, Eq, Copy, Clone, Debug)]
enum SplitWork {
    /// The open-file guard granted a lease for a pending investigation.
    LeaseGranted,
    /// A new investigation arrived on the investigation channel.
    InvestigationBatch,
    /// A worker delivered a `WorkResponse` on the response channel.
    WorkFinished,
}
/// Orchestrates duplicate investigations: feeds them through an open-file
/// guard to a pool of workers and aggregates the confirmed duplicates
/// (see `do_work` for the event loop).
pub struct Coordinator {
    // Runtime configuration; within this file only `thread_count` is read.
    options: Options,
    // Re-queues follow-up investigations that workers discover (`FoundWork`).
    investigation_out: Sender<DuplicateInvestigation>,
    // Supplies pending investigations to be differentiated.
    investigation_in: Receiver<DuplicateInvestigation>,
    // Count of investigations expected on `investigation_in`; seeded by
    // `new` and maintained by `do_work` as items are consumed/re-queued.
    investigation_queue_size: u64,
    // Passed through verbatim into `AnalysisStatistics::max_duplicate_size`.
    max_duplicate_size: u64,
}
impl Coordinator {
    /// Upper bound on open-file lease credit a single investigation may
    /// claim, and the back-pressure threshold for pausing intake.
    ///
    /// NOTE(review): this must match the permit count of the semaphore
    /// created in `do_work` — previously the literal `512` was repeated in
    /// four places with no stated relationship.
    const MAX_OPEN_FILES: u64 = 512;

    /// Builds a coordinator over the given investigation channels.
    ///
    /// * `investigation_count` — number of investigations already queued on
    ///   `investigation_in`; seeds the internal queue-size counter.
    /// * `max_duplicate_size` — reported verbatim in analysis statistics.
    pub fn new(
        options: Options,
        investigation_out: Sender<DuplicateInvestigation>,
        investigation_in: Receiver<DuplicateInvestigation>,
        investigation_count: u64,
        max_duplicate_size: u64,
    ) -> Coordinator {
        Coordinator {
            options,
            investigation_out,
            investigation_in,
            investigation_queue_size: investigation_count,
            max_duplicate_size,
        }
    }

    /// Runs the coordination loop on a dedicated thread and blocks until it
    /// completes, returning the confirmed duplicates.
    ///
    /// # Errors
    /// Returns an error if the coordinator thread panics, or propagates any
    /// error produced by `do_work` (channel send/receive failures).
    pub fn work(self, analysis_out: Sender<AnalysisMessage>) -> ::anyhow::Result<Vec<Duplicate>> {
        // The loop runs on its own thread; the caller blocks on join.
        // Presumably this isolates the select loop from the caller's
        // thread — TODO confirm the original intent.
        let handle = thread::spawn(move || self.do_work(analysis_out));
        handle
            .join()
            .map_err(|_| anyhow!("Could not join split tracker thread"))?
    }

    /// Event loop: routes investigations through the open-file guard to the
    /// worker pool, collects partitioned duplicates, and emits throttled
    /// progress updates, until all queued, leased, and in-flight work drains.
    fn do_work(
        mut self,
        analysis_out: Sender<AnalysisMessage>,
    ) -> ::anyhow::Result<Vec<Duplicate>> {
        let (work_request_out, work_request_in) = ::crossbeam::channel::unbounded();
        let (work_response_out, work_response_in) = ::crossbeam::channel::unbounded();
        // NOTE(review): permit count must stay in sync with MAX_OPEN_FILES.
        let sem = Arc::new(crate::concurrency::Semaphore::new(512));
        let (guard, lease_out, lease_in) = OpenFileGuard::new(sem.clone());
        guard.work();
        // Spawn the worker pool; each worker shares the request/response
        // channels and the open-file semaphore.
        for _ in 0..self.options.thread_count {
            Worker::new(
                work_request_in.clone(),
                work_response_out.clone(),
                sem.clone(),
            )
            .work();
        }
        // Rate-limits progress updates to at most one per 20 ms.
        let mut throttle = Throttle::new(Duration::from_millis(20), 1);
        let mut total_investigations = self.investigation_queue_size;
        let mut total_completed_investigations = 0u64;
        // Requests sent to workers that have not yet produced a response.
        let mut outstanding_work = 0u64;
        // Open-file lease credit currently requested from the guard.
        let mut lease_request_count = 0u64;
        let mut select: TaggedSelect<SplitWork> = TaggedSelect::new();
        select.recv(SplitWork::InvestigationBatch, &self.investigation_in);
        select.recv(SplitWork::LeaseGranted, &lease_in);
        select.recv(SplitWork::WorkFinished, &work_response_in);
        let mut result = Vec::new();
        while self.investigation_queue_size > 0 || lease_request_count > 0 || outstanding_work > 0 {
            match select.select() {
                (SplitWork::InvestigationBatch, oper) => {
                    if let Ok(investigation) = oper.recv(&self.investigation_in) {
                        self.investigation_queue_size -= 1;
                        let work_size = investigation.duplicate.locations.len();
                        if work_size > 1 {
                            // Needs differentiation: request a lease, claiming
                            // credit capped at the open-file budget.
                            lease_out.send(investigation)?;
                            lease_request_count += min(Self::MAX_OPEN_FILES, work_size as u64)
                        } else {
                            // A single location cannot be differentiated
                            // further; accept the duplicate as-is.
                            result.push(investigation.duplicate);
                        }
                    }
                }
                (SplitWork::LeaseGranted, oper) => {
                    debug!("Lease granted message");
                    let msg = oper.recv(&lease_in);
                    if let Ok(investigation) = msg {
                        debug!("Received lease for a new Duplicate");
                        // Release exactly the credit claimed when the lease
                        // was requested (same min() expression as above).
                        lease_request_count -=
                            min(Self::MAX_OPEN_FILES, investigation.duplicate.locations.len() as u64);
                        work_request_out.send(WorkRequest::Differentiate(investigation))?;
                        outstanding_work += 1;
                    }
                }
                (SplitWork::WorkFinished, oper) => {
                    debug!("Work finished!");
                    let response = oper.recv(&work_response_in)?;
                    match response {
                        WorkResponse::PartitionedDuplicates(duplicates) => {
                            outstanding_work -= 1;
                            // One investigation split into `len` partitions, so
                            // the suspected total grows by `len - 1`.
                            // saturating_sub guards against u64 underflow if a
                            // worker ever returns an empty partition set
                            // (previously `len as u64 - 1` would panic in
                            // debug builds / wrap in release).
                            total_investigations += (duplicates.len() as u64).saturating_sub(1);
                            total_completed_investigations += duplicates.len() as u64;
                            for duplicate in duplicates {
                                // Only multi-location partitions are real
                                // duplicates; singletons are discarded.
                                if duplicate.locations.len() > 1 {
                                    result.push(duplicate)
                                }
                            }
                        }
                        WorkResponse::FoundWork(work) => {
                            // Workers discovered follow-up investigations;
                            // re-queue them through the main channel.
                            for investigation in work {
                                if investigation.duplicate.locations.len() > 1 {
                                    self.investigation_queue_size += 1;
                                    total_investigations += 1;
                                    self.investigation_out.send(investigation)?;
                                }
                            }
                        }
                    }
                }
            }
            // Throttled progress report; silently skipped while throttled.
            if throttle.accept().is_ok() {
                analysis_out.send(AnalysisMessage::Update(self.build_analysis_statistics(
                    &result,
                    total_completed_investigations,
                    total_investigations,
                )))?
            }
            // Back-pressure: stop pulling new investigations while the
            // open-file budget is exhausted (or the queue is empty), and
            // resume intake otherwise.
            if self.investigation_queue_size == 0 || lease_request_count >= Self::MAX_OPEN_FILES {
                debug!("Stopping fetching new duplicates : {}", lease_request_count);
                select.remove(SplitWork::InvestigationBatch)
            } else {
                debug!(
                    "Continuing fetching new duplicates : {}",
                    lease_request_count
                );
                select.recv(SplitWork::InvestigationBatch, &self.investigation_in)
            }
        }
        // Final statistics snapshot after the loop drains completely.
        analysis_out.send(AnalysisMessage::Finished(self.build_analysis_statistics(
            &result,
            total_completed_investigations,
            total_investigations,
        )))?;
        info!("Done");
        Ok(result)
    }

    /// Snapshot of the progress counters for the analysis channel.
    ///
    /// `total_duplicate_size` is the wasted space: every copy beyond the
    /// first costs `file_size` bytes.
    fn build_analysis_statistics(
        &self,
        duplicates: &[Duplicate],
        total_completed_investigations: u64,
        total_suspected_duplicates: u64,
    ) -> AnalysisStatistics {
        let total_duplicates_found = duplicates.len() as u64;
        let total_duplicate_size = duplicates
            .iter()
            // saturating_sub: `do_work` can push a zero-location duplicate
            // straight into `result` (the `work_size <= 1` branch), and
            // `len() - 1` would underflow on it.
            .map(|d| (d.locations.len() as u64).saturating_sub(1) * d.file_size)
            .sum();
        AnalysisStatistics {
            total_completed_investigations,
            total_duplicates_found,
            total_suspected_duplicates,
            max_duplicate_size: self.max_duplicate_size,
            total_duplicate_size,
        }
    }
}