use crate::integrator::{Integration, IntegrationActor, Task};
use crate::notifier::FileCreated;
use crate::results::{ResultsActor, TaskToDo};
use crate::utils::{EntryProcessor, WalkResult};
use crate::{dispatcher::IType, Shutdown};
use actix::{
Actor, ActorContext, Addr, Handler, Message, Running, SyncArbiter, SyncContext, System,
};
use cryiorust::frame::{self, is_frame};
use itertools::Itertools;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::fs::{self, File, ReadDir};
use std::io::{self, BufRead, Write};
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::Arc;
/// Actor that walks directory trees and dispatches the frames it finds to a
/// pool of [`IntegrationActor`]s.
pub struct WalkerActor {
/// Number of integration worker threads; one `Shutdown` is sent per thread when stopping.
threads: usize,
/// Integration type; only used in log messages.
itype: IType,
/// Address of the integration worker pool.
ia: Addr<IntegrationActor>,
/// Receives a `TaskToDo(1)` for every file the walker handles.
results: Addr<ResultsActor>,
/// Frames reported through `FileCreated` that are waiting for their group to
/// fill up, keyed by parent directory and then by file extension.
accumulator: HashMap<Rc<Path>, HashMap<Rc<OsStr>, Vec<PathBuf>>>,
/// Number of frames grouped into one integration task; below 2 every frame
/// is dispatched on its own.
average: usize,
}
impl WalkerActor {
pub fn new(
threads: usize,
i: Arc<RwLock<Integration>>,
results: Addr<ResultsActor>,
itype: IType,
) -> WalkerActor {
let r = results.clone();
WalkerActor {
threads,
itype,
ia: SyncArbiter::start(threads, move || IntegrationActor::new(i.clone(), r.clone())),
results,
accumulator: Default::default(),
average: 0,
}
}
fn run<T: AsRef<Path> + Debug>(&self, path: T, addr: Addr<WalkerActor>) {
debug!("Walking in {:?}", path);
let paths = fs::read_dir(path);
match paths {
Ok(paths) => self.visit(paths, addr),
Err(e) => error!("Could not read dir: {}", e),
}
}
fn visit(&self, paths: ReadDir, addr: Addr<WalkerActor>) {
let mut acc = HashMap::new();
paths
.filter_map(|r| r.ok())
.sorted_by_key(|e| e.path())
.map(|path| match path.process() {
WalkResult::None(e) => warn!("Could not process entry {:?}: {}", path, e),
WalkResult::Dir(path) => addr.do_send(WalkInto(path)),
WalkResult::File(path) => {
self.results.do_send(TaskToDo(1));
let task = acc.accumulate(path, self.average);
if task.is_done() {
self.ia.do_send(task);
}
}
})
.for_each(|_| {});
}
}
impl Actor for WalkerActor {
    type Context = SyncContext<Self>;

    fn started(&mut self, _ctx: &mut Self::Context) {
        debug!("{} Walker has started", self.itype);
    }

    /// Sends one `Shutdown` per worker thread to the integration pool, then
    /// lets this walker stop.
    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        debug!("{} Walker is stopping", self.itype);
        (0..self.threads).for_each(|_| self.ia.do_send(Shutdown));
        Running::Stop
    }

    /// Stops the current actix system once the walker is gone.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        debug!("{} Walker has stopped", self.itype);
        System::current().stop();
    }
}
impl Handler<Shutdown> for WalkerActor {
    type Result = ();

    /// Stops this walker; its `stopping` hook then shuts down the integration pool.
    fn handle(&mut self, _msg: Shutdown, ctx: &mut Self::Context) -> Self::Result {
        ctx.stop()
    }
}
/// Starts a new walk rooted at `path`, discarding frames still pending from
/// earlier `FileCreated` notifications.
#[derive(Message)]
#[rtype(result = "()")]
pub struct RunWalker<T: AsRef<Path> + Debug> {
/// Root directory to walk.
pub path: T,
/// Number of frames grouped into one integration task (below 2: no grouping).
pub average: usize,
}
impl<T: AsRef<Path> + Debug> Handler<RunWalker<T>> for WalkerActor {
type Result = ();
fn handle(&mut self, msg: RunWalker<T>, ctx: &mut Self::Context) -> Self::Result {
self.accumulator.clear();
self.average = msg.average;
self.run(msg.path, ctx.address())
}
}
impl Handler<FileCreated> for WalkerActor {
type Result = ();
fn handle(&mut self, msg: FileCreated, _: &mut Self::Context) -> Self::Result {
self.results.do_send(TaskToDo(1));
if let Some(parent) = msg.0.parent() {
let parent: Rc<Path> = Rc::from(parent);
let accumulator = self.accumulator.entry(parent).or_insert(HashMap::new());
let task = accumulator.accumulate(msg.0, self.average);
if task.is_done() {
self.ia.do_send(task);
}
}
}
}
/// Message a walker sends to its own address to walk a subdirectory it found.
#[derive(Message)]
#[rtype(result = "()")]
struct WalkInto<T: AsRef<Path> + Debug>(T);
impl<T: AsRef<Path> + Debug> Handler<WalkInto<T>> for WalkerActor {
type Result = ();
fn handle(&mut self, msg: WalkInto<T>, ctx: &mut Self::Context) -> Self::Result {
self.run(msg.0, ctx.address());
}
}
/// Collects frame paths until enough of them can be integrated together.
trait PathAccumulator {
/// Adds `path` and returns what to do next: a task ready for integration, or
/// `Task::NotYet` while the group of `average` frames is still incomplete.
fn accumulate(&mut self, path: PathBuf, average: usize) -> Task;
}
impl PathAccumulator for HashMap<Rc<OsStr>, Vec<PathBuf>> {
    /// Classifies `path` and groups frames by file extension.
    ///
    /// * A non-frame becomes `Task::NotFrame`.
    /// * If `average < 2`, or the file may contain several frames, the frame
    ///   becomes `Task::One` on its own.
    /// * Otherwise the frame joins the group for its extension:
    ///   - once that group holds `average` frames, it is taken out of the map
    ///     as a `Task::Many`;
    ///   - until then, `Task::NotYet` is returned.
    ///
    /// Frames without an extension share one group under an empty key instead
    /// of being held forever.
    fn accumulate(&mut self, path: PathBuf, average: usize) -> Task {
        if !is_frame(&path) {
            return Task::NotFrame(path);
        }
        if average < 2 || frame::is_possible_multi(&path) {
            return Task::One(path);
        }
        // `&OsStr` defaults to "", so an extension-less frame still lands in a group.
        let key: Rc<OsStr> = Rc::from(path.extension().unwrap_or_default());
        let group = self.entry(Rc::clone(&key)).or_default();
        group.push(path);
        // `<` rather than `!=`: a group can never get stuck above `average`.
        if group.len() < average {
            return Task::NotYet;
        }
        self.remove(&key).map_or(Task::NotYet, Task::Many)
    }
}
/// Actor that merges the integrated text files of each directory into a
/// single multicolumn file.
pub struct MultiColumnWalkerActor {
/// Integration type; only used in log messages.
itype: IType,
}
impl Actor for MultiColumnWalkerActor {
    type Context = SyncContext<Self>;

    fn started(&mut self, _ctx: &mut Self::Context) {
        debug!("{} Multicolumn Walker has started", self.itype);
    }

    /// Nothing to release; just agree to stop.
    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        debug!("{} Multicolumn Walker is stopping", self.itype);
        Running::Stop
    }

    /// Stops the current actix system once this walker is gone.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        debug!("{} Multicolumn Walker has stopped", self.itype);
        System::current().stop();
    }
}
impl Handler<Shutdown> for MultiColumnWalkerActor {
    type Result = ();

    /// Stops this walker.
    fn handle(&mut self, _msg: Shutdown, ctx: &mut Self::Context) -> Self::Result {
        ctx.stop()
    }
}
impl MultiColumnWalkerActor {
pub(crate) fn new(itype: IType) -> Self {
Self { itype }
}
fn walk<T: AsRef<Path> + Debug>(
&self,
path: T,
name: String,
ext: String,
addr: Addr<MultiColumnWalkerActor>,
) {
let entries = match fs::read_dir(path.as_ref()) {
Ok(entries) => entries,
Err(err) => {
warn!(
"Failed to read path {:?} for multi column file: {}",
path, err
);
return;
}
};
let mut files = vec![];
for entry in entries {
let entry = match entry {
Ok(entry) => entry,
Err(err) => {
warn!(
"Failed to process entry in {:?} for multi column file: {}",
path, err
);
return;
}
};
match entry.process() {
WalkResult::None(err) => {
warn!(
"Failed to process entry in {:?} for multi column file: {}",
path, err
);
return;
}
WalkResult::Dir(path) => addr.do_send(StoreMultiColumn {
path: Arc::from(path),
name: name.clone(),
ext: ext.clone(),
}),
WalkResult::File(path) => {
if let Some(path_ext) = path.extension() {
if let Some(path_ext) = path_ext.to_str() {
if path_ext == ext {
files.push(path);
}
}
}
}
}
}
files.sort();
self.save(path, files, name);
}
fn save<T: AsRef<Path> + Debug>(&self, path: T, files: Vec<PathBuf>, name: String) {
let name = name.as_str();
let mut out = path.as_ref().to_path_buf();
out.push(name);
match Self::write(&out, files) {
Ok(_) => info!("Multicolumn file {:?} is written", out),
Err(e) => warn!("Failed to write multicolumn file {:?}: {}", out, e),
}
}
fn write(name: &PathBuf, files: Vec<PathBuf>) -> io::Result<()> {
let mut data = vec![];
let mut names = vec![];
let mut line_counter = 0;
for path in &files {
if path.as_os_str() == name {
continue;
}
let reader = io::BufReader::new(File::open(path)?);
let mut i = 0;
for line in reader.lines() {
let line = line?;
data.push(line);
i += 1;
}
names.push(path);
if line_counter == 0 {
line_counter = i
} else if line_counter != i {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"Integrated files in {:?} have different number of bins",
path
),
));
}
}
if line_counter == 0 || names.len() == 0 {
return Ok(());
}
let mut writer = io::BufWriter::new(File::create(&name)?);
write!(writer, "#q")?;
for name in &names {
if let Some(stem) = name.file_stem() {
if let Some(name) = stem.to_str() {
write!(writer, " {}", name)?;
}
}
}
write!(writer, "\n")?;
let mut write_q = true;
for i in 0..line_counter {
for j in 0..names.len() {
let sl = data[j * line_counter + i].split_ascii_whitespace();
for (k, item) in sl.enumerate() {
match k {
0 => {
if write_q {
write!(writer, "{}", item)?;
write_q = false;
}
}
1 => write!(writer, " {}", item)?,
_ => break,
}
}
}
write!(writer, "\n")?;
write_q = true;
}
Ok(())
}
}
/// Requests a multicolumn file named `name` in directory `path`, built from the
/// files there whose extension is `ext`; subdirectories get their own requests.
#[derive(Message)]
#[rtype(result = "()")]
pub struct StoreMultiColumn<T: AsRef<Path> + Debug> {
/// Directory to scan.
pub path: T,
/// File name of the multicolumn output, created inside `path`.
pub name: String,
/// Extension, without the leading dot, of the files to merge.
pub ext: String,
}
impl<T: AsRef<Path> + Debug> Handler<StoreMultiColumn<T>> for MultiColumnWalkerActor {
type Result = ();
fn handle(&mut self, msg: StoreMultiColumn<T>, ctx: &mut Self::Context) -> Self::Result {
self.walk(msg.path, msg.name, msg.ext, ctx.address())
}
}