bubbles 4.2.0

Bubble integration server for powder diffraction
use crate::integrator::{Integration, Settings};
use crate::notifier::{NotifierActor, StartWatching, StopWatching};
use crate::results::{RestartIntegration, ResultsActor};
use crate::session::{BubbleSessionActor, SessionID, SessionRequest};
use crate::walker::{MultiColumnWalkerActor, RunWalker, StoreMultiColumn, WalkerActor};
use crate::{Params, Shutdown};
use actix::{
    Actor, ActorContext, Addr, Handler, Message, Running, SyncArbiter, SyncContext, System,
};
use parking_lot::RwLock;
use std::fmt;
use std::sync::Arc;

#[derive(PartialEq, Copy, Clone)]
pub enum IType {
    SAXS,
    WAXS,
}

impl fmt::Display for IType {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            IType::SAXS => write!(f, "SAXS"),
            IType::WAXS => write!(f, "WAXS"),
        }
    }
}

#[derive(Message)]
#[rtype(result = "()")]
pub struct Session2DispatcherRequest {
    pub session: Addr<BubbleSessionActor>,
    pub state: DispatcherState,
    pub id: SessionID,
    pub state_requested: bool,
}

struct Actors {
    results: Addr<ResultsActor>,
    walker: Addr<WalkerActor>,
    multicolumn_walker: Addr<MultiColumnWalkerActor>,
    notifier: Addr<NotifierActor>,
}

pub struct Dispatcher {
    integration: Arc<RwLock<Integration>>,
    actors: Option<Actors>,
    params: Arc<Params>,
    itype: IType,
}

impl Dispatcher {
    pub fn new(itype: IType, params: Arc<Params>) -> Dispatcher {
        Dispatcher {
            integration: Arc::new(RwLock::new(Integration::new(itype))),
            actors: None,
            params,
            itype,
        }
    }

    fn parse_settings(&mut self, s: Settings) -> DispatcherState {
        let was_running = { self.integration.read().is_running() };
        let (errors, warnings, running) = {
            let mut i = self.integration.write();
            let (errors, warnings) = i.parse_settings(s);
            (errors, warnings, i.is_running())
        };
        let actors = self.actors.as_ref().unwrap();
        if running {
            if !was_running {
                let i = self.integration.read();
                actors.results.do_send(RestartIntegration);
                actors.walker.do_send(RunWalker {
                    path: i.path().to_owned(),
                    average: i.average(),
                });
                actors.notifier.do_send(StartWatching(i.path().to_owned()));
            }
        } else {
            if was_running {
                let i = self.integration.read();
                let path = i.path().to_owned();
                actors.notifier.do_send(StopWatching(path.clone()));
                let multi_column = i.multi_column();
                if !multi_column.is_empty() {
                    let name = multi_column.to_owned();
                    let ext = i.extension().to_owned();
                    actors
                        .multicolumn_walker
                        .do_send(StoreMultiColumn { path, name, ext });
                }
            }
        }
        DispatcherState {
            integration: self.integration.clone(),
            errors,
            warnings,
        }
    }

    fn state(&self) -> DispatcherState {
        DispatcherState {
            integration: self.integration.clone(),
            errors: vec![],
            warnings: vec![],
        }
    }

    fn start_actors(&mut self) {
        let threads = self.params.threads;
        let p = self.params.clone();
        let itype = self.itype;
        let results = SyncArbiter::start(1, move || ResultsActor::new(p.clone(), itype));
        let r = results.clone();
        let i = self.integration.clone();
        let walker = SyncArbiter::start(1, move || {
            WalkerActor::new(threads, i.clone(), r.clone(), itype)
        });
        self.actors = Some(Actors {
            results,
            walker: walker.clone(),
            multicolumn_walker: SyncArbiter::start(1, move || MultiColumnWalkerActor::new(itype)),
            notifier: SyncArbiter::start(1, move || NotifierActor::new(walker.clone(), itype)),
        })
    }
}

impl Actor for Dispatcher {
    type Context = SyncContext<Self>;

    fn started(&mut self, _: &mut Self::Context) {
        debug!("{} Dispatcher has started", self.itype);
    }

    fn stopping(&mut self, _: &mut Self::Context) -> Running {
        debug!("{} Dispatcher is stopping", self.itype);
        self.integration.write().stop();
        if let Some(actors) = self.actors.as_ref() {
            actors.results.do_send(Shutdown);
            actors.walker.do_send(Shutdown);
            actors.multicolumn_walker.do_send(Shutdown);
            actors.notifier.do_send(Shutdown);
        }
        self.actors = None;
        Running::Stop
    }

    fn stopped(&mut self, _: &mut Self::Context) {
        debug!("{} Dispatcher has stopped", self.itype);
        System::current().stop();
    }
}

impl Handler<Shutdown> for Dispatcher {
    type Result = ();

    fn handle(&mut self, _: Shutdown, ctx: &mut Self::Context) -> Self::Result {
        ctx.stop();
    }
}

impl Handler<SessionRequest> for Dispatcher {
    type Result = ();

    fn handle(&mut self, msg: SessionRequest, _: &mut Self::Context) -> Self::Result {
        if let None = self.actors {
            self.start_actors();
        }
        let state = match msg.settings {
            None => self.state(),
            Some(s) => self.parse_settings(s),
        };
        let actors = self.actors.as_ref().unwrap();
        actors.results.do_send(Session2DispatcherRequest {
            session: msg.addr,
            state,
            id: msg.session,
            state_requested: msg.state_requested,
        });
    }
}

pub struct DispatcherState {
    pub integration: Arc<RwLock<Integration>>,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
}