bubbles 4.2.0

Bubble integration server for powder diffraction
use crate::integrator::{Integration, IntegrationActor, Task};
use crate::notifier::FileCreated;
use crate::results::{ResultsActor, TaskToDo};
use crate::utils::{EntryProcessor, WalkResult};
use crate::{dispatcher::IType, Shutdown};
use actix::{
    Actor, ActorContext, Addr, Handler, Message, Running, SyncArbiter, SyncContext, System,
};
use cryiorust::frame::{self, is_frame};
use itertools::Itertools;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::fmt::Debug;
use std::fs::{self, File, ReadDir};
use std::io::{self, BufRead, Write};
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::Arc;

/// Actor that walks directories and turns the files it finds into integration
/// tasks for a pool of `IntegrationActor`s.
pub struct WalkerActor {
    /// Size of the integration pool started in `new`; also the number of
    /// `Shutdown` messages sent to that pool when this actor is stopping.
    threads: usize,
    /// Integration type; in this file it is only used to tag log messages.
    itype: IType,
    /// Address of the integration pool that receives finished tasks.
    ia: Addr<IntegrationActor>,
    /// Address of the results actor; receives `TaskToDo(1)` for every file seen.
    results: Addr<ResultsActor>,
    /// Paths buffered by the `FileCreated` handler, keyed first by parent
    /// directory and then by file extension.
    accumulator: HashMap<Rc<Path>, HashMap<Rc<OsStr>, Vec<PathBuf>>>,
    /// Group size passed to `PathAccumulator::accumulate`; set by `RunWalker`.
    average: usize,
}

impl WalkerActor {
    pub fn new(
        threads: usize,
        i: Arc<RwLock<Integration>>,
        results: Addr<ResultsActor>,
        itype: IType,
    ) -> WalkerActor {
        let r = results.clone();
        WalkerActor {
            threads,
            itype,
            ia: SyncArbiter::start(threads, move || IntegrationActor::new(i.clone(), r.clone())),
            results,
            accumulator: Default::default(),
            average: 0,
        }
    }

    fn run<T: AsRef<Path> + Debug>(&self, path: T, addr: Addr<WalkerActor>) {
        debug!("Walking in {:?}", path);
        let paths = fs::read_dir(path);
        match paths {
            Ok(paths) => self.visit(paths, addr),
            Err(e) => error!("Could not read dir: {}", e),
        }
    }

    fn visit(&self, paths: ReadDir, addr: Addr<WalkerActor>) {
        let mut acc = HashMap::new();
        paths
            .filter_map(|r| r.ok())
            .sorted_by_key(|e| e.path())
            .map(|path| match path.process() {
                WalkResult::None(e) => warn!("Could not process entry {:?}: {}", path, e),
                WalkResult::Dir(path) => addr.do_send(WalkInto(path)),
                WalkResult::File(path) => {
                    self.results.do_send(TaskToDo(1));
                    let task = acc.accumulate(path, self.average);
                    if task.is_done() {
                        self.ia.do_send(task);
                    }
                }
            })
            .for_each(|_| {});
    }
}

impl Actor for WalkerActor {
    type Context = SyncContext<Self>;

    /// Logs that the walker has started.
    fn started(&mut self, _ctx: &mut Self::Context) {
        debug!("{} Walker has started", self.itype);
    }

    /// Sends one `Shutdown` per pool thread to the integration pool, then
    /// allows this actor to stop.
    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        debug!("{} Walker is stopping", self.itype);
        (0..self.threads).for_each(|_| self.ia.do_send(Shutdown));
        Running::Stop
    }

    /// Logs the stop and stops the current actix `System`.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        debug!("{} Walker has stopped", self.itype);
        System::current().stop();
    }
}

impl Handler<Shutdown> for WalkerActor {
    type Result = ();

    /// Stops this actor's context when a `Shutdown` message arrives.
    fn handle(&mut self, _shutdown: Shutdown, context: &mut Self::Context) -> Self::Result {
        context.stop()
    }
}

/// Message asking a `WalkerActor` to start walking at `path`.
#[derive(Message)]
#[rtype(result = "()")]
pub struct RunWalker<T: AsRef<Path> + Debug> {
    /// Directory the walk starts from.
    pub path: T,
    /// Stored as the walker's `average`, the group size handed to the path
    /// accumulator.
    pub average: usize,
}

impl<T: AsRef<Path> + Debug> Handler<RunWalker<T>> for WalkerActor {
    type Result = ();

    fn handle(&mut self, msg: RunWalker<T>, ctx: &mut Self::Context) -> Self::Result {
        self.accumulator.clear();
        self.average = msg.average;
        self.run(msg.path, ctx.address())
    }
}

impl Handler<FileCreated> for WalkerActor {
    type Result = ();

    fn handle(&mut self, msg: FileCreated, _: &mut Self::Context) -> Self::Result {
        self.results.do_send(TaskToDo(1));
        if let Some(parent) = msg.0.parent() {
            let parent: Rc<Path> = Rc::from(parent);
            let accumulator = self.accumulator.entry(parent).or_insert(HashMap::new());
            let task = accumulator.accumulate(msg.0, self.average);
            if task.is_done() {
                self.ia.do_send(task);
            }
        }
    }
}

/// Private message a `WalkerActor` sends to itself for each sub-directory it
/// finds; the wrapped value is the directory to walk next.
#[derive(Message)]
#[rtype(result = "()")]
struct WalkInto<T: AsRef<Path> + Debug>(T);

impl<T: AsRef<Path> + Debug> Handler<WalkInto<T>> for WalkerActor {
    type Result = ();

    fn handle(&mut self, msg: WalkInto<T>, ctx: &mut Self::Context) -> Self::Result {
        self.run(msg.0, ctx.address());
    }
}

/// Turns incoming file paths into integration `Task`s, optionally buffering
/// them into groups.
trait PathAccumulator {
    /// Consumes `path` and returns the task it produces; `average` is the
    /// requested group size.
    fn accumulate(&mut self, path: PathBuf, average: usize) -> Task;
}

impl PathAccumulator for HashMap<Rc<OsStr>, Vec<PathBuf>> {
    /// Classifies `path` and, when grouping is requested, buffers it under its
    /// file extension.
    ///
    /// * `Task::NotFrame` — `is_frame` rejects the path.
    /// * `Task::One` — `average` is below 2 or `is_possible_multi` accepts the path.
    /// * `Task::Many` — the extension's group has just reached `average` paths;
    ///   the group is removed from the map and returned.
    /// * `Task::NotYet` — the path has no extension or its group is still
    ///   incomplete.
    fn accumulate(&mut self, path: PathBuf, average: usize) -> Task {
        if !is_frame(&path) {
            return Task::NotFrame(path);
        }
        if average < 2 || frame::is_possible_multi(&path) {
            return Task::One(path);
        }
        let key: Rc<OsStr> = match path.extension() {
            Some(ext) => Rc::from(ext),
            None => return Task::NotYet,
        };
        let group = self.entry(key.clone()).or_insert_with(Vec::new);
        if group.len() < average {
            group.push(path);
        }
        if group.len() != average {
            return Task::NotYet;
        }
        // The entry was inserted above, so removing it cannot fail.
        Task::Many(self.remove(&key).unwrap())
    }
}

/// Actor that collects files with a given extension in each directory and
/// merges them into one multicolumn text file per directory.
pub struct MultiColumnWalkerActor {
    /// Integration type; in this file it is only used to tag log messages.
    itype: IType,
}

impl Actor for MultiColumnWalkerActor {
    type Context = SyncContext<Self>;

    /// Logs that the multicolumn walker has started.
    fn started(&mut self, _ctx: &mut Self::Context) {
        debug!("{} Multicolumn Walker has started", self.itype);
    }

    /// Logs the request and allows the actor to stop.
    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        debug!("{} Multicolumn Walker is stopping", self.itype);
        Running::Stop
    }

    /// Logs the stop and stops the current actix `System`.
    fn stopped(&mut self, _ctx: &mut Self::Context) {
        debug!("{} Multicolumn Walker has stopped", self.itype);
        System::current().stop();
    }
}

impl Handler<Shutdown> for MultiColumnWalkerActor {
    type Result = ();

    /// Stops this actor's context when a `Shutdown` message arrives.
    fn handle(&mut self, _shutdown: Shutdown, context: &mut Self::Context) -> Self::Result {
        context.stop()
    }
}

impl MultiColumnWalkerActor {
    pub(crate) fn new(itype: IType) -> Self {
        Self { itype }
    }

    /// Gathers the files directly inside `path` whose extension equals `ext`
    /// and passes the sorted list to `save`; every sub-directory is queued on
    /// `addr` as its own `StoreMultiColumn` message.
    ///
    /// If the directory cannot be read, or any entry cannot be read or
    /// processed, a warning is logged and the walk of this directory ends
    /// without saving anything.
    fn walk<T: AsRef<Path> + Debug>(
        &self,
        path: T,
        name: String,
        ext: String,
        addr: Addr<MultiColumnWalkerActor>,
    ) {
        let listing = match fs::read_dir(path.as_ref()) {
            Err(err) => {
                warn!(
                    "Failed to read path {:?} for multi column file: {}",
                    path, err
                );
                return;
            }
            Ok(listing) => listing,
        };
        let mut matching = Vec::new();
        for item in listing {
            let item = match item {
                Err(err) => {
                    warn!(
                        "Failed to process entry in {:?} for multi column file: {}",
                        path, err
                    );
                    return;
                }
                Ok(item) => item,
            };
            match item.process() {
                WalkResult::File(file) => {
                    // Extensions that are not valid UTF-8 never match.
                    let same_ext =
                        file.extension().and_then(|e| e.to_str()) == Some(ext.as_str());
                    if same_ext {
                        matching.push(file);
                    }
                }
                WalkResult::Dir(dir) => addr.do_send(StoreMultiColumn {
                    path: Arc::from(dir),
                    name: name.clone(),
                    ext: ext.clone(),
                }),
                WalkResult::None(err) => {
                    warn!(
                        "Failed to process entry in {:?} for multi column file: {}",
                        path, err
                    );
                    return;
                }
            }
        }
        matching.sort();
        self.save(path, matching, name);
    }

    /// Writes the multicolumn file `name` inside `path` from `files` and logs
    /// the outcome: info on success, a warning on failure.
    fn save<T: AsRef<Path> + Debug>(&self, path: T, files: Vec<PathBuf>, name: String) {
        let out = path.as_ref().join(name);
        if let Err(e) = Self::write(&out, files) {
            warn!("Failed to write multicolumn file {:?}: {}", out, e);
        } else {
            info!("Multicolumn file {:?} is written", out);
        }
    }

    /// Merges the files in `files` into one multicolumn text file at `name`.
    ///
    /// Each input is read line by line; `name` itself is skipped when it
    /// appears in `files`. The output starts with a `#q <stem> <stem> ...`
    /// header naming the inputs. Every following row holds the first
    /// whitespace-separated token of that row (taken from the first input
    /// that has one) followed by the second token of that row from every
    /// input. Further tokens are ignored.
    ///
    /// Nothing is written when there are no inputs or the inputs are empty.
    ///
    /// # Errors
    ///
    /// Returns any I/O error raised while reading the inputs or writing the
    /// output, and `io::ErrorKind::InvalidData` when the inputs do not all
    /// have the same number of lines.
    fn write(name: &Path, files: Vec<PathBuf>) -> io::Result<()> {
        let mut columns: Vec<Vec<String>> = Vec::with_capacity(files.len());
        let mut names = Vec::with_capacity(files.len());
        for path in &files {
            if path.as_os_str() == name.as_os_str() {
                continue;
            }
            let reader = io::BufReader::new(File::open(path)?);
            let lines = reader.lines().collect::<io::Result<Vec<String>>>()?;
            // Compare against the first file actually read rather than a
            // "0 means unset" counter: an empty first file followed by a
            // non-empty one must be rejected too, otherwise the rows below
            // would be indexed out of bounds.
            let mismatch = columns
                .first()
                .map_or(false, |first| first.len() != lines.len());
            if mismatch {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!(
                        "Integrated files in {:?} have different number of bins",
                        path
                    ),
                ));
            }
            columns.push(lines);
            names.push(path);
        }
        let line_counter = columns.first().map_or(0, Vec::len);
        if line_counter == 0 {
            return Ok(());
        }
        let mut writer = io::BufWriter::new(File::create(name)?);
        write!(writer, "#q")?;
        for path in &names {
            // Stems that are not valid UTF-8 are left out of the header.
            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
                write!(writer, " {}", stem)?;
            }
        }
        writeln!(writer)?;
        for i in 0..line_counter {
            let mut write_q = true;
            for column in &columns {
                // Every column has exactly `line_counter` lines (checked above).
                for (k, item) in column[i].split_ascii_whitespace().enumerate() {
                    match k {
                        0 => {
                            if write_q {
                                write!(writer, "{}", item)?;
                                write_q = false;
                            }
                        }
                        1 => write!(writer, " {}", item)?,
                        _ => break,
                    }
                }
            }
            writeln!(writer)?;
        }
        // Flush explicitly: dropping a `BufWriter` discards flush errors.
        writer.flush()
    }
}

/// Message asking a `MultiColumnWalkerActor` to build a multicolumn file for
/// the directory `path`.
#[derive(Message)]
#[rtype(result = "()")]
pub struct StoreMultiColumn<T: AsRef<Path> + Debug> {
    /// Directory to scan; the output file is created inside it.
    pub path: T,
    /// File name of the multicolumn output, appended to `path`.
    pub name: String,
    /// Extension a file must have to be merged.
    pub ext: String,
}

impl<T: AsRef<Path> + Debug> Handler<StoreMultiColumn<T>> for MultiColumnWalkerActor {
    type Result = ();

    fn handle(&mut self, msg: StoreMultiColumn<T>, ctx: &mut Self::Context) -> Self::Result {
        self.walk(msg.path, msg.name, msg.ext, ctx.address())
    }
}