use actix::prelude::{Actor, Context, Handler, Recipient};
use crate::websocket::devstreams::Broadcast;
use crate::websocket::devstreams::Connect;
use crate::websocket::devstreams::Disconnect;
use crate::websocket::devstreams::Subscribe;
use crate::websocket::devstreams::SubscribeChannel;
use crate::websocket::devstreams::ConnectId;
use crate::websocket::devstreams::OutputMessage;
use crate::websocket::devstreams::OutputMessageType;
use crate::websocket::devstreams::TypedMessage;
#[derive(Default)]
pub struct Server {
connections: std::collections::HashMap<ConnectId, Recipient<OutputMessage>>,
subscribes: std::collections::HashMap<ConnectId, Vec<String>>,
}
impl Actor for Server {
type Context = Context<Self>;
}
impl Server {
pub fn send_message(&self, ty: &OutputMessageType, connect_id: &str, msg: &str) {
if let Some(socket_recipient) = self.connections.get(connect_id) {
socket_recipient.do_send(OutputMessage {
ty: *ty,
msg: TypedMessage::TextMessage(msg.to_string()),
id: connect_id.to_string(),
});
} else {
log::warn!("attempting to send message but couldn't find user id.");
}
}
pub fn broadcase_message(&self, ty: &OutputMessageType, msg: &str) {
self.connections
.iter()
.for_each(|conn| self.send_message(ty, conn.0, msg));
}
}
impl Handler<Connect> for Server {
type Result = ();
fn handle(&mut self, connect: Connect, _: &mut Context<Self>) -> Self::Result {
self.connections.insert(connect.id.to_owned(), connect.addr);
self.send_message(
&OutputMessageType::Info,
&connect.id,
"Ok, Websocket connection established ~",
);
}
}
impl Handler<Disconnect> for Server {
type Result = ();
fn handle(&mut self, msg: Disconnect, _: &mut Context<Self>) {
if self.connections.contains_key(&msg.id) {
self.send_message(&OutputMessageType::Info, &msg.id, "Bye bye.");
self.connections.remove(&msg.id);
}
if self.subscribes.contains_key(&msg.id) {
self.subscribes.remove(&msg.id);
}
}
}
impl Handler<Broadcast> for Server {
type Result = ();
fn handle(&mut self, msg: Broadcast, _: &mut Context<Self>) {
match msg.ty {
OutputMessageType::Info | OutputMessageType::Msg | OutputMessageType::Err => {
self.broadcase_message(&msg.ty, &msg.msg);
}
OutputMessageType::ServerTime => {
self.subscribes.iter().for_each(|(id, channels)| {
if channels.contains(&SubscribeChannel::ServerTime.to_string()) {
self.send_message(&msg.ty, id, &msg.msg);
}
});
}
OutputMessageType::Startup => {
self.subscribes.iter().for_each(|(id, channels)| {
if channels.contains(&SubscribeChannel::Startup.to_string()) {
self.send_message(&msg.ty, id, &msg.msg);
}
});
}
}
}
}
impl Handler<Subscribe> for Server {
type Result = ();
fn handle(&mut self, msg: Subscribe, _: &mut Context<Self>) {
let subscribes = &mut self.subscribes;
let channel = msg.channel.to_string();
if let Some(channels) = subscribes.get_mut(&msg.id) {
if !channels.contains(&channel) {
channels.push(channel);
}
} else {
subscribes.insert(msg.id.to_owned(), vec![channel]);
}
log::info!(
"Subscribe: connect_id={}, subscribes={:?}",
msg.id,
self.subscribes.get(&msg.id)
)
}
}