use crate::codec::BubbleCodec;
use crate::dispatcher::Dispatcher;
use crate::params::Params;
use crate::session::{BubbleSessionActor, SessionDestroyed};
use crate::{dispatcher::IType, Shutdown};
use actix::{
Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, Running, StreamHandler,
SyncArbiter, System,
};
use std::collections::HashSet;
use std::net;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_util::codec::FramedRead;
/// TCP server state: the shared parameters, the two dispatcher addresses and
/// the set of currently registered client sessions.
///
/// Sessions are added when a `TcpConnect` message is handled and removed when
/// a `SessionDestroyed` message arrives.
pub(crate) struct Server {
/// Shared parameters; a clone of this `Arc` is handed to every dispatcher and session.
params: Arc<Params>,
/// Address of the dispatcher started with `IType::WAXS`.
waxs_dispatcher: Addr<Dispatcher>,
/// Address of the dispatcher started with `IType::SAXS`.
saxs_dispatcher: Addr<Dispatcher>,
/// Addresses of the session actors created by this server that have not yet
/// reported `SessionDestroyed`.
sessions: HashSet<Addr<BubbleSessionActor>>,
}
/// Starts a `Dispatcher` of the given `itype` on a `SyncArbiter` with a single
/// thread and returns its address.
///
/// The factory closure owns `params` and hands a fresh `Arc` clone to every
/// `Dispatcher` it constructs.
fn start_dispatcher(params: Arc<Params>, itype: IType) -> Addr<Dispatcher> {
    let make_dispatcher = move || Dispatcher::new(itype, Arc::clone(&params));
    SyncArbiter::start(1, make_dispatcher)
}
impl Server {
pub(crate) fn new(params: Arc<Params>) -> Self {
Self {
waxs_dispatcher: start_dispatcher(params.clone(), IType::WAXS),
saxs_dispatcher: start_dispatcher(params.clone(), IType::SAXS),
params,
sessions: HashSet::new(),
}
}
}
impl Actor for Server {
type Context = Context<Self>;
fn started(&mut self, _: &mut Self::Context) {
debug!("TCP server has started");
}
fn stopping(&mut self, _: &mut Self::Context) -> Running {
debug!("TCP server is stopping");
self.waxs_dispatcher.do_send(Shutdown);
self.saxs_dispatcher.do_send(Shutdown);
for session in &self.sessions {
session.do_send(Shutdown);
}
self.sessions = HashSet::new();
Running::Stop
}
fn stopped(&mut self, _: &mut Self::Context) {
debug!("TCP server has stopped");
System::current().stop();
std::process::exit(0);
}
}
impl Handler<Shutdown> for Server {
type Result = ();
fn handle(&mut self, _: Shutdown, ctx: &mut Self::Context) -> Self::Result {
ctx.stop();
}
}
/// Message handing a TCP connection to the `Server`.
///
/// Field `0` is the stream, which the server splits into read and write
/// halves for a new session. Field `1` is a socket address whose string form
/// is passed to the session (presumably the remote peer's address; confirm
/// against the code that sends this message).
#[derive(Message)]
#[rtype(result = "()")]
pub struct TcpConnect(pub TcpStream, pub net::SocketAddr);
impl Handler<TcpConnect> for Server {
type Result = ();
fn handle(&mut self, msg: TcpConnect, ctx: &mut Context<Self>) {
let server = ctx.address();
let params = self.params.clone();
let waxs_addr = self.waxs_dispatcher.clone();
let saxs_addr = self.saxs_dispatcher.clone();
let session = BubbleSessionActor::create(move |ctx| {
let (r, w) = tokio::io::split(msg.0);
BubbleSessionActor::add_stream(FramedRead::new(r, BubbleCodec), ctx);
BubbleSessionActor::new(
server,
params,
msg.1.to_string(),
waxs_addr,
saxs_addr,
actix::io::FramedWrite::new(w, BubbleCodec, ctx),
)
});
self.sessions.insert(session);
}
}
impl Handler<SessionDestroyed> for Server {
type Result = ();
fn handle(&mut self, msg: SessionDestroyed, _: &mut Self::Context) -> Self::Result {
self.sessions.remove(&msg.0);
}
}