bubbles 4.1.0

Bubble integration server for powder diffraction
use crate::integrator::{Integration, Settings};
use crate::notifier::{NotifierActor, StartWatching, StopWatching};
use crate::results::{RestartIntegration, ResultsActor};
use crate::session::{BubbleSessionActor, SessionID, SessionRequest};
use crate::walker::{MultiColumnWalkerActor, RunWalker, StoreMultiColumn, WalkerActor};
use crate::{IType, Params};
use actix::{Actor, Addr, Handler, Message, SyncArbiter, SyncContext};
use parking_lot::RwLock;
use std::sync::Arc;

#[derive(Message)]
#[rtype(result = "()")]
pub struct Session2DispatcherRequest {
    pub session: Addr<BubbleSessionActor>,
    pub state: DispatcherState,
    pub id: SessionID,
}

struct Actors {
    results: Addr<ResultsActor>,
    walker: Addr<WalkerActor>,
    multicolumn_walker: Addr<MultiColumnWalkerActor>,
    notifier: Addr<NotifierActor>,
}

pub struct Dispatcher {
    integration: Arc<RwLock<Integration>>,
    actors: Option<Actors>,
    params: Arc<Params>,
}

impl Dispatcher {
    pub fn new(itype: IType, params: Arc<Params>) -> Dispatcher {
        Dispatcher {
            integration: Arc::new(RwLock::new(Integration::new(itype))),
            actors: None,
            params,
        }
    }

    fn parse_settings(&mut self, s: Settings) -> DispatcherState {
        let was_running = { self.integration.read().is_running() };
        let (errors, warnings, running) = {
            let mut i = self.integration.write();
            let (errors, warnings) = i.parse_settings(s);
            (errors, warnings, i.is_running())
        };
        let actors = self.actors.as_ref().unwrap();
        if running {
            if !was_running {
                let i = self.integration.read();
                actors.results.do_send(RestartIntegration);
                actors.walker.do_send(RunWalker {
                    path: i.path().to_owned(),
                    average: i.average(),
                });
                actors.notifier.do_send(StartWatching(i.path().to_owned()));
            }
        } else {
            if was_running {
                let i = self.integration.read();
                let path = i.path().to_owned();
                actors.notifier.do_send(StopWatching(path.clone()));
                let multi_column = i.multi_column();
                if !multi_column.is_empty() {
                    let name = multi_column.to_owned();
                    let ext = i.extension().to_owned();
                    actors
                        .multicolumn_walker
                        .do_send(StoreMultiColumn { path, name, ext });
                }
            }
        }
        DispatcherState {
            integration: self.integration.clone(),
            errors,
            warnings,
        }
    }

    fn state(&self) -> DispatcherState {
        DispatcherState {
            integration: self.integration.clone(),
            errors: vec![],
            warnings: vec![],
        }
    }

    fn start_actors(&mut self) {
        let threads = self.params.threads;
        let p = self.params.clone();
        let results = SyncArbiter::start(1, move || ResultsActor::new(p.clone()));
        let r = results.clone();
        let i = self.integration.clone();
        let walker = SyncArbiter::start(1, move || WalkerActor::new(threads, i.clone(), r.clone()));
        self.actors = Some(Actors {
            results,
            walker: walker.clone(),
            multicolumn_walker: SyncArbiter::start(1, move || MultiColumnWalkerActor),
            notifier: SyncArbiter::start(1, move || NotifierActor::new(walker.clone())),
        })
    }
}

impl Actor for Dispatcher {
    type Context = SyncContext<Self>;
}

impl Handler<SessionRequest> for Dispatcher {
    type Result = ();

    fn handle(&mut self, msg: SessionRequest, _: &mut Self::Context) -> Self::Result {
        if let None = self.actors {
            self.start_actors();
        }
        let state = match msg.settings {
            None => self.state(),
            Some(s) => self.parse_settings(s),
        };
        let actors = self.actors.as_ref().unwrap();
        actors.results.do_send(Session2DispatcherRequest {
            session: msg.addr,
            state,
            id: msg.session,
        });
    }
}

pub struct DispatcherState {
    pub integration: Arc<RwLock<Integration>>,
    pub errors: Vec<String>,
    pub warnings: Vec<String>,
}