use crate::integrator::{Integration, Settings};
use crate::notifier::{NotifierActor, StartWatching, StopWatching};
use crate::results::{RestartIntegration, ResultsActor};
use crate::session::{BubbleSessionActor, SessionID, SessionRequest};
use crate::walker::{MultiColumnWalkerActor, RunWalker, StoreMultiColumn, WalkerActor};
use crate::{IType, Params};
use actix::{Actor, Addr, Handler, Message, SyncArbiter, SyncContext};
use parking_lot::RwLock;
use std::sync::Arc;
#[derive(Message)]
#[rtype(result = "()")]
pub struct Session2DispatcherRequest {
pub session: Addr<BubbleSessionActor>,
pub state: DispatcherState,
pub id: SessionID,
}
struct Actors {
results: Addr<ResultsActor>,
walker: Addr<WalkerActor>,
multicolumn_walker: Addr<MultiColumnWalkerActor>,
notifier: Addr<NotifierActor>,
}
pub struct Dispatcher {
integration: Arc<RwLock<Integration>>,
actors: Option<Actors>,
params: Arc<Params>,
}
impl Dispatcher {
pub fn new(itype: IType, params: Arc<Params>) -> Dispatcher {
Dispatcher {
integration: Arc::new(RwLock::new(Integration::new(itype))),
actors: None,
params,
}
}
fn parse_settings(&mut self, s: Settings) -> DispatcherState {
let was_running = { self.integration.read().is_running() };
let (errors, warnings, running) = {
let mut i = self.integration.write();
let (errors, warnings) = i.parse_settings(s);
(errors, warnings, i.is_running())
};
let actors = self.actors.as_ref().unwrap();
if running {
if !was_running {
let i = self.integration.read();
actors.results.do_send(RestartIntegration);
actors.walker.do_send(RunWalker {
path: i.path().to_owned(),
average: i.average(),
});
actors.notifier.do_send(StartWatching(i.path().to_owned()));
}
} else {
if was_running {
let i = self.integration.read();
let path = i.path().to_owned();
actors.notifier.do_send(StopWatching(path.clone()));
let multi_column = i.multi_column();
if !multi_column.is_empty() {
let name = multi_column.to_owned();
let ext = i.extension().to_owned();
actors
.multicolumn_walker
.do_send(StoreMultiColumn { path, name, ext });
}
}
}
DispatcherState {
integration: self.integration.clone(),
errors,
warnings,
}
}
fn state(&self) -> DispatcherState {
DispatcherState {
integration: self.integration.clone(),
errors: vec![],
warnings: vec![],
}
}
fn start_actors(&mut self) {
let threads = self.params.threads;
let p = self.params.clone();
let results = SyncArbiter::start(1, move || ResultsActor::new(p.clone()));
let r = results.clone();
let i = self.integration.clone();
let walker = SyncArbiter::start(1, move || WalkerActor::new(threads, i.clone(), r.clone()));
self.actors = Some(Actors {
results,
walker: walker.clone(),
multicolumn_walker: SyncArbiter::start(1, move || MultiColumnWalkerActor),
notifier: SyncArbiter::start(1, move || NotifierActor::new(walker.clone())),
})
}
}
impl Actor for Dispatcher {
type Context = SyncContext<Self>;
}
impl Handler<SessionRequest> for Dispatcher {
type Result = ();
fn handle(&mut self, msg: SessionRequest, _: &mut Self::Context) -> Self::Result {
if let None = self.actors {
self.start_actors();
}
let state = match msg.settings {
None => self.state(),
Some(s) => self.parse_settings(s),
};
let actors = self.actors.as_ref().unwrap();
actors.results.do_send(Session2DispatcherRequest {
session: msg.addr,
state,
id: msg.session,
});
}
}
pub struct DispatcherState {
pub integration: Arc<RwLock<Integration>>,
pub errors: Vec<String>,
pub warnings: Vec<String>,
}