use std::path::PathBuf;
use std::thread::JoinHandle;
use anyhow::{anyhow, Result};
use crossbeam::channel;
use crossbeam::channel::{Receiver, Sender};
use crate::candidate_selection::message_handler::MessageHandler;
use crate::candidate_selection::messages::{AnalysisMessage, CoordinatorMessage, WorkerMessage};
use crate::candidate_selection::path_walker;
use crate::candidate_selection::worker::Worker;
use crate::options::Options;
use crate::types::Duplicate;
pub struct Coordinator {
options: Options,
}
impl Coordinator {
pub fn new(options: Options) -> Self {
Coordinator { options }
}
pub fn coordinate(
&self,
path_vec: &[PathBuf],
update_sender: Sender<AnalysisMessage>,
) -> Result<Vec<Duplicate>> {
info!("Coordinator starting");
let (coordinator_out, worker_in) = channel::unbounded();
let (worker_out, coordinator_work_in) = channel::unbounded();
let (file_walker_out, coordinator_files_in) = channel::bounded(2048);
info!("Spawning workers");
self.spawn_workers(worker_in, worker_out);
info!("Finished spawning workers");
let mut message_handler = MessageHandler::new(
self.options,
coordinator_out,
coordinator_files_in,
coordinator_work_in,
);
let handle = std::thread::spawn(move || {
info!("Asking MessageHandler to start handling messages");
let res = message_handler.handle_messages(update_sender);
info!("Finished handling messages {:?}", res);
let result = message_handler.into_duplicates();
res?;
Ok(result)
});
info!("Walking paths");
path_walker::walk_paths(path_vec, file_walker_out)?;
info!("Walk finished, waiting for message handler to complete");
handle
.join()
.map_err(|_| anyhow!("Analysis failed unexpectedly"))?
}
fn spawn_workers(
&self,
receiver: Receiver<CoordinatorMessage>,
sender: Sender<WorkerMessage>,
) -> Vec<JoinHandle<Result<Worker>>> {
(0..self.options.thread_count)
.into_iter()
.map(|_| {
let worker = Worker::new(
receiver.clone(),
sender.clone(),
self.options.block_size as usize,
);
worker.work()
})
.collect()
}
}