use crate::integrator::{Integration, Settings};
use crate::notifier::{NotifierActor, StartWatching, StopWatching};
use crate::results::{RestartIntegration, ResultsActor};
use crate::session::{BubbleSessionActor, SessionID, SessionRequest};
use crate::walker::{MultiColumnWalkerActor, RunWalker, StoreMultiColumn, WalkerActor};
use crate::{Params, Shutdown};
use actix::{
Actor, ActorContext, Addr, Handler, Message, Running, SyncArbiter, SyncContext, System,
};
use parking_lot::RwLock;
use std::fmt;
use std::sync::Arc;
#[derive(PartialEq, Copy, Clone)]
pub enum IType {
SAXS,
WAXS,
}
impl fmt::Display for IType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
IType::SAXS => write!(f, "SAXS"),
IType::WAXS => write!(f, "WAXS"),
}
}
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct Session2DispatcherRequest {
pub session: Addr<BubbleSessionActor>,
pub state: DispatcherState,
pub id: SessionID,
pub state_requested: bool,
}
struct Actors {
results: Addr<ResultsActor>,
walker: Addr<WalkerActor>,
multicolumn_walker: Addr<MultiColumnWalkerActor>,
notifier: Addr<NotifierActor>,
}
pub struct Dispatcher {
integration: Arc<RwLock<Integration>>,
actors: Option<Actors>,
params: Arc<Params>,
itype: IType,
}
impl Dispatcher {
pub fn new(itype: IType, params: Arc<Params>) -> Dispatcher {
Dispatcher {
integration: Arc::new(RwLock::new(Integration::new(itype))),
actors: None,
params,
itype,
}
}
fn parse_settings(&mut self, s: Settings) -> DispatcherState {
let was_running = { self.integration.read().is_running() };
let (errors, warnings, running) = {
let mut i = self.integration.write();
let (errors, warnings) = i.parse_settings(s);
(errors, warnings, i.is_running())
};
let actors = self.actors.as_ref().unwrap();
if running {
if !was_running {
let i = self.integration.read();
actors.results.do_send(RestartIntegration);
actors.walker.do_send(RunWalker {
path: i.path().to_owned(),
average: i.average(),
});
actors.notifier.do_send(StartWatching(i.path().to_owned()));
}
} else {
if was_running {
let i = self.integration.read();
let path = i.path().to_owned();
actors.notifier.do_send(StopWatching(path.clone()));
let multi_column = i.multi_column();
if !multi_column.is_empty() {
let name = multi_column.to_owned();
let ext = i.extension().to_owned();
actors
.multicolumn_walker
.do_send(StoreMultiColumn { path, name, ext });
}
}
}
DispatcherState {
integration: self.integration.clone(),
errors,
warnings,
}
}
fn state(&self) -> DispatcherState {
DispatcherState {
integration: self.integration.clone(),
errors: vec![],
warnings: vec![],
}
}
fn start_actors(&mut self) {
let threads = self.params.threads;
let p = self.params.clone();
let itype = self.itype;
let results = SyncArbiter::start(1, move || ResultsActor::new(p.clone(), itype));
let r = results.clone();
let i = self.integration.clone();
let walker = SyncArbiter::start(1, move || {
WalkerActor::new(threads, i.clone(), r.clone(), itype)
});
self.actors = Some(Actors {
results,
walker: walker.clone(),
multicolumn_walker: SyncArbiter::start(1, move || MultiColumnWalkerActor::new(itype)),
notifier: SyncArbiter::start(1, move || NotifierActor::new(walker.clone(), itype)),
})
}
}
impl Actor for Dispatcher {
type Context = SyncContext<Self>;
fn started(&mut self, _: &mut Self::Context) {
debug!("{} Dispatcher has started", self.itype);
}
fn stopping(&mut self, _: &mut Self::Context) -> Running {
debug!("{} Dispatcher is stopping", self.itype);
self.integration.write().stop();
if let Some(actors) = self.actors.as_ref() {
actors.results.do_send(Shutdown);
actors.walker.do_send(Shutdown);
actors.multicolumn_walker.do_send(Shutdown);
actors.notifier.do_send(Shutdown);
}
self.actors = None;
Running::Stop
}
fn stopped(&mut self, _: &mut Self::Context) {
debug!("{} Dispatcher has stopped", self.itype);
System::current().stop();
}
}
impl Handler<Shutdown> for Dispatcher {
type Result = ();
fn handle(&mut self, _: Shutdown, ctx: &mut Self::Context) -> Self::Result {
ctx.stop();
}
}
impl Handler<SessionRequest> for Dispatcher {
type Result = ();
fn handle(&mut self, msg: SessionRequest, _: &mut Self::Context) -> Self::Result {
if let None = self.actors {
self.start_actors();
}
let state = match msg.settings {
None => self.state(),
Some(s) => self.parse_settings(s),
};
let actors = self.actors.as_ref().unwrap();
actors.results.do_send(Session2DispatcherRequest {
session: msg.addr,
state,
id: msg.session,
state_requested: msg.state_requested,
});
}
}
pub struct DispatcherState {
pub integration: Arc<RwLock<Integration>>,
pub errors: Vec<String>,
pub warnings: Vec<String>,
}