//! bubbles 4.2.0
//!
//! Bubble integration server for powder diffraction.
use crate::codec::BubbleCodec;
use crate::dispatcher::Dispatcher;
use crate::params::Params;
use crate::session::{BubbleSessionActor, SessionDestroyed};
use crate::{dispatcher::IType, Shutdown};
use actix::{
    Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, Running, StreamHandler,
    SyncArbiter, System,
};
use std::collections::HashSet;
use std::net;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;

/// Actor state for the TCP server: the shared parameters, one dispatcher
/// address per integration type, and the set of currently tracked sessions.
pub(crate) struct Server {
    /// Shared parameters; clones of this `Arc` are handed to the dispatchers
    /// and to every session created by the `TcpConnect` handler.
    params: Arc<Params>,
    /// Address of the dispatcher started with `IType::WAXS`.
    waxs_dispatcher: Addr<Dispatcher>,
    /// Address of the dispatcher started with `IType::SAXS`.
    saxs_dispatcher: Addr<Dispatcher>,
    /// Addresses of sessions created on `TcpConnect`; an entry is removed
    /// when a `SessionDestroyed` message carrying that address arrives.
    sessions: HashSet<Addr<BubbleSessionActor>>,
}

/// Starts a `Dispatcher` for the given integration type on a `SyncArbiter`
/// and returns its address.
///
/// The arbiter is started with a thread count of 1; the factory closure
/// builds the dispatcher from `itype` and a fresh clone of `params`.
fn start_dispatcher(params: Arc<Params>, itype: IType) -> Addr<Dispatcher> {
    let threads = 1;
    let factory = move || Dispatcher::new(itype, Arc::clone(&params));
    SyncArbiter::start(threads, factory)
}

impl Server {
    /// Builds the server state.
    ///
    /// Starts the WAXS dispatcher first, then the SAXS dispatcher, each with
    /// its own clone of `params`, and begins with an empty session set.
    pub(crate) fn new(params: Arc<Params>) -> Self {
        let waxs_dispatcher = start_dispatcher(Arc::clone(&params), IType::WAXS);
        let saxs_dispatcher = start_dispatcher(Arc::clone(&params), IType::SAXS);
        let sessions = HashSet::new();
        Self {
            params,
            waxs_dispatcher,
            saxs_dispatcher,
            sessions,
        }
    }
}

impl Actor for Server {
    type Context = Context<Self>;

    fn started(&mut self, _: &mut Self::Context) {
        debug!("TCP server has started");
    }

    fn stopping(&mut self, _: &mut Self::Context) -> Running {
        debug!("TCP server is stopping");
        self.waxs_dispatcher.do_send(Shutdown);
        self.saxs_dispatcher.do_send(Shutdown);
        for session in &self.sessions {
            session.do_send(Shutdown);
        }
        self.sessions = HashSet::new();
        Running::Stop
    }

    fn stopped(&mut self, _: &mut Self::Context) {
        debug!("TCP server has stopped");
        System::current().stop();
        std::process::exit(0);
    }
}

impl Handler<Shutdown> for Server {
    type Result = ();

    fn handle(&mut self, _: Shutdown, ctx: &mut Self::Context) -> Self::Result {
        ctx.stop();
    }
}

/// Message carrying a TCP stream (field 0) and a socket address (field 1).
///
/// The `Server` handler for this message creates one `BubbleSessionActor`
/// per message; the address is passed to the session in string form.
/// Presumably this is the remote peer's address — verify against the code
/// that accepts connections.
#[derive(Message)]
#[rtype(result = "()")]
pub struct TcpConnect(pub TcpStream, pub net::SocketAddr);

impl Handler<TcpConnect> for Server {
    type Result = ();

    fn handle(&mut self, msg: TcpConnect, ctx: &mut Context<Self>) {
        let server = ctx.address();
        let params = self.params.clone();
        let waxs_addr = self.waxs_dispatcher.clone();
        let saxs_addr = self.saxs_dispatcher.clone();
        let session = BubbleSessionActor::create(move |ctx| {
            let (r, w) = tokio::io::split(msg.0);
            BubbleSessionActor::add_stream(FramedRead::new(r, BubbleCodec), ctx);
            BubbleSessionActor::new(
                server,
                params,
                msg.1.to_string(),
                waxs_addr,
                saxs_addr,
                actix::io::FramedWrite::new(w, BubbleCodec, ctx),
            )
        });
        self.sessions.insert(session);
    }
}

impl Handler<SessionDestroyed> for Server {
    type Result = ();

    fn handle(&mut self, msg: SessionDestroyed, _: &mut Self::Context) -> Self::Result {
        self.sessions.remove(&msg.0);
    }
}