bubbles 4.1.0

Bubble integration server for powder diffraction
use crate::codec::BubbleCodec;
use crate::dispatcher::Dispatcher;
use crate::integrator::Settings;
use crate::results::{ClientResponse, SessionResponse};
use crate::Params;
use actix::System;
use actix::{Actor, Addr, AsyncContext, Context, Handler, Message, StreamHandler};
use serde;
use std::io;
use std::sync::Arc;
use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use uuid::Uuid;

/// Identifier of a client session; an alias for [`Uuid`].
pub type SessionID = Uuid;

/// A request deserialized from a connected client.
///
/// Every field is marked `#[serde(default)]`, so any of them may be omitted
/// from the incoming payload and will then take its type's default value.
#[derive(serde::Deserialize, Message, Debug)]
#[rtype(result = "()")]
pub struct ClientRequest {
    /// Not read anywhere in this module.
    // NOTE(review): presumably a flag asking the server to report its state — confirm against the client protocol.
    #[serde(default)]
    state: i32,
    /// A non-zero value asks the server process to shut down.
    #[serde(default)]
    kill: i32,
    /// Optional settings forwarded to the WAXS dispatcher.
    #[serde(default)]
    waxs: Option<Settings>,
    /// Optional settings forwarded to the SAXS dispatcher.
    #[serde(default)]
    saxs: Option<Settings>,
}

/// Server-level part of the state reported back to a client.
#[derive(serde::Serialize)]
pub struct ServerState {
    /// Server start time; serialized under the key `startedTimestamp`.
    // NOTE(review): units are not visible here (presumably a Unix timestamp) — confirm where `Params::started` is set.
    #[serde(rename = "startedTimestamp")]
    pub started: f64,
    /// Whether the server has been asked to shut down.
    pub killed: bool,
}

impl ServerState {
    fn current(started: f64, killed: bool) -> ServerState {
        ServerState { started, killed }
    }
}

/// The serializable reply written back to a client through the framed writer.
#[derive(serde::Serialize)]
pub struct BubbleState {
    /// Server-level information (start time and kill flag).
    server: ServerState,
    /// WAXS part of the response, if any.
    waxs: Option<ClientResponse>,
    /// SAXS part of the response, if any.
    saxs: Option<ClientResponse>,
    /// Error messages copied from the session response.
    errors: Vec<String>,
    /// Warning messages copied from the session response.
    warnings: Vec<String>,
}

/// Actor that serves one connected client: it receives [`ClientRequest`]s from
/// the client's stream, forwards them to the WAXS and SAXS dispatchers and
/// writes [`BubbleState`] replies back.
pub struct BubbleSessionActor {
    /// Shared server parameters; `started` is read when building replies.
    params: Arc<Params>,
    /// Client description used in log messages.
    // NOTE(review): presumably the peer's socket address — confirm at the call site of `new`.
    client: String,
    /// Address of the dispatcher that receives the WAXS settings.
    waxs_addr: Addr<Dispatcher>,
    /// Address of the dispatcher that receives the SAXS settings.
    saxs_addr: Addr<Dispatcher>,
    /// Write half of the client's TCP connection, encoding replies with `BubbleCodec`.
    framed: actix::io::FramedWrite<BubbleState, WriteHalf<TcpStream>, BubbleCodec>,
    /// Identifier of this session, generated in `new`.
    id: SessionID,
}

/// Message sent by a session actor to a [`Dispatcher`] for each client request.
#[derive(Message)]
#[rtype(result = "()")]
pub struct SessionRequest {
    /// Settings taken from the client request; `None` when the client sent none.
    pub settings: Option<Settings>,
    /// Address of the session actor that issued the request.
    // NOTE(review): presumably used by the dispatcher to send the `SessionResponse` back — confirm in the dispatcher.
    pub addr: Addr<BubbleSessionActor>,
    /// Identifier of the issuing session.
    pub session: SessionID,
}

impl BubbleSessionActor {
    pub fn new(
        params: Arc<Params>,
        client: String,
        waxs_addr: Addr<Dispatcher>,
        saxs_addr: Addr<Dispatcher>,
        framed: actix::io::FramedWrite<BubbleState, WriteHalf<TcpStream>, BubbleCodec>,
    ) -> BubbleSessionActor {
        info!("A client connected from {}", &client);
        BubbleSessionActor {
            params,
            client,
            waxs_addr,
            saxs_addr,
            framed,
            id: Uuid::new_v4(),
        }
    }

    fn make_response(&mut self, req: ClientRequest, addr: Addr<BubbleSessionActor>) {
        let killed = if req.kill != 0 {
            info!(
                "The client {} requested an immediate exit, shutting down",
                self.client
            );
            true
        } else {
            false
        };
        self.waxs_addr.do_send(SessionRequest {
            settings: req.waxs,
            addr: addr.clone(),
            session: self.id,
        });
        self.saxs_addr.do_send(SessionRequest {
            settings: req.saxs,
            addr,
            session: self.id,
        });
        if killed {
            System::current().stop(); // does not seem to work
            debug!("We're still alive after System::current().stop(), let's commit a suicide");
            std::process::exit(0);
        }
    }
}

// The session runs in a plain actix `Context`; no lifecycle hooks are overridden.
impl Actor for BubbleSessionActor {
    type Context = Context<BubbleSessionActor>;
}

// Needed so the actor can own a `FramedWrite`; the empty body means the trait's
// default methods are used for write errors and writer completion.
impl actix::io::WriteHandler<io::Error> for BubbleSessionActor {}

impl StreamHandler<Result<ClientRequest, io::Error>> for BubbleSessionActor {
    fn handle(&mut self, msg: Result<ClientRequest, io::Error>, ctx: &mut Self::Context) {
        match msg {
            Ok(req) => self.make_response(req, ctx.address()),
            Err(err) => {
                error!("Error parsing client {} request: {}", self.client, err);
                info!("Client {} disconnected", self.client);
                self.framed.close();
            }
        }
    }
}

impl Handler<SessionResponse> for BubbleSessionActor {
    type Result = ();

    fn handle(&mut self, msg: SessionResponse, _: &mut Self::Context) {
        let state = BubbleState {
            server: ServerState::current(self.params.started, false),
            waxs: msg.waxs,
            saxs: msg.saxs,
            errors: msg.errors,
            warnings: msg.warnings,
        };
        self.framed.write(state);
    }
}