use crate::codec::BubbleCodec;
use crate::dispatcher::Dispatcher;
use crate::integrator::Settings;
use crate::results::{ClientResponse, SessionResponse};
use crate::{Params, Server, Shutdown};
use actix::{Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, StreamHandler};
use actix::{Running, System};
use serde;
use std::io;
use std::sync::Arc;
use tokio::io::WriteHalf;
use tokio::net::TcpStream;
use uuid::Uuid;
pub type SessionID = Uuid;
#[derive(serde::Deserialize, Message, Debug)]
#[rtype(result = "()")]
pub struct ClientRequest {
#[serde(default)]
state: i32,
#[serde(default)]
kill: i32,
#[serde(default)]
waxs: Option<Settings>,
#[serde(default)]
saxs: Option<Settings>,
}
#[derive(serde::Serialize)]
pub struct ServerState {
#[serde(rename = "startedTimestamp")]
pub started: f64,
pub killed: bool,
}
impl ServerState {
fn current(started: f64, killed: bool) -> ServerState {
ServerState { started, killed }
}
}
#[derive(serde::Serialize)]
pub struct BubbleState {
server: ServerState,
waxs: Option<ClientResponse>,
saxs: Option<ClientResponse>,
errors: Vec<String>,
warnings: Vec<String>,
}
pub struct BubbleSessionActor {
server: Addr<Server>,
params: Arc<Params>,
client: String,
waxs_addr: Addr<Dispatcher>,
saxs_addr: Addr<Dispatcher>,
framed: actix::io::FramedWrite<BubbleState, WriteHalf<TcpStream>, BubbleCodec>,
id: SessionID,
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct SessionDestroyed(pub Addr<BubbleSessionActor>);
#[derive(Message)]
#[rtype(result = "()")]
pub struct SessionRequest {
pub settings: Option<Settings>,
pub addr: Addr<BubbleSessionActor>,
pub session: SessionID,
pub state_requested: bool,
}
impl BubbleSessionActor {
pub(crate) fn new(
server: Addr<Server>,
params: Arc<Params>,
client: String,
waxs_addr: Addr<Dispatcher>,
saxs_addr: Addr<Dispatcher>,
framed: actix::io::FramedWrite<BubbleState, WriteHalf<TcpStream>, BubbleCodec>,
) -> BubbleSessionActor {
info!("A client connected from {}", &client);
BubbleSessionActor {
server,
params,
client,
waxs_addr,
saxs_addr,
framed,
id: Uuid::new_v4(),
}
}
fn make_response(&mut self, req: ClientRequest, addr: Addr<BubbleSessionActor>) {
let killed = if req.kill != 0 {
info!(
"The client {} requested an immediate exit, shutting down",
self.client
);
true
} else {
false
};
self.waxs_addr.do_send(SessionRequest {
settings: req.waxs,
addr: addr.clone(),
session: self.id,
state_requested: req.state != 0,
});
self.saxs_addr.do_send(SessionRequest {
settings: req.saxs,
addr,
session: self.id,
state_requested: req.state != 0,
});
if killed {
self.server.do_send(Shutdown);
}
}
}
impl Actor for BubbleSessionActor {
type Context = Context<Self>;
fn started(&mut self, _: &mut Self::Context) {
debug!("Session for {} ({}) has started", self.client, self.id);
}
fn stopping(&mut self, _: &mut Self::Context) -> Running {
debug!("Session for {} ({}) is stopping", self.client, self.id);
Running::Stop
}
fn stopped(&mut self, _: &mut Self::Context) {
debug!("Session for {} ({}) has stopped", self.client, self.id);
System::current().stop();
}
}
impl actix::io::WriteHandler<io::Error> for BubbleSessionActor {}
impl StreamHandler<Result<ClientRequest, io::Error>> for BubbleSessionActor {
fn handle(&mut self, msg: Result<ClientRequest, io::Error>, ctx: &mut Self::Context) {
match msg {
Ok(req) => self.make_response(req, ctx.address()),
Err(err) => {
error!("Error parsing client {} request: {}", self.client, err);
info!("Client {} disconnected", self.client);
self.framed.close();
self.server.do_send(SessionDestroyed(ctx.address()));
}
}
}
}
impl Handler<SessionResponse> for BubbleSessionActor {
type Result = ();
fn handle(&mut self, msg: SessionResponse, _: &mut Self::Context) {
let state = BubbleState {
server: ServerState::current(self.params.started, false),
waxs: msg.waxs,
saxs: msg.saxs,
errors: msg.errors,
warnings: msg.warnings,
};
self.framed.write(state);
}
}
impl Handler<Shutdown> for BubbleSessionActor {
type Result = ();
fn handle(&mut self, _: Shutdown, ctx: &mut Self::Context) -> Self::Result {
ctx.stop();
}
}