use lazy_static::lazy_static;
use serde_json::json;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
use actix::prelude::*;
use actix_web::web::Bytes;
use actix_web_actors::ws;
use crate::core::auth0::Requestor;
use crate::websocket::webchat::actions::ConnectionWarnning;
use crate::websocket::webchat::actions::Disconnect;
use crate::websocket::webchat::input::subscribes::SubscribeAction;
use crate::websocket::webchat::input::subscribes::UnsubscribeAction;
use crate::websocket::webchat::input::Action;
use crate::websocket::webchat::input::Input;
use crate::websocket::webchat::output::Level;
use crate::websocket::webchat::output::OutputMessage;
use crate::websocket::webchat::output::OutputMessageType;
use crate::websocket::webchat::output::TypedMessage;
use crate::websocket::webchat::server::Server;
use crate::websocket::webchat::ConnectId;
use crate::websocket::webchat::ctx::TIMER_INTERVAL;
use super::input::chats::SendMessageAction;
lazy_static! {
#[rustfmt::skip]
pub static ref ALL_CONNECTIONS_LINE: Mutex<HashMap<String, i64>> = Mutex::new(HashMap::new());
pub static ref CONNECTION_TO_REMOVE: Mutex<HashMap<String, i64>> = Mutex::new(HashMap::new());
}
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(2);
const CLIENT_TIMEOUT_WARNNING: Duration = Duration::from_secs(10);
const CLIENT_TIMEOUT_REMOVE: Duration = Duration::from_secs(30);
pub struct Connection {
pub id: ConnectId,
pub hb: Instant,
pub addr: actix::Addr<Server>,
pub requestor: Requestor,
}
impl Handler<OutputMessage> for Connection {
type Result = ();
fn handle(&mut self, msg: OutputMessage, ctx: &mut Self::Context) {
if let Some(output) = msg.output() {
ctx.text(output);
}
}
}
impl Connection {
pub fn established(addr: actix::Addr<Server>, requestor: Requestor) -> Self {
let connection_id = crate::commons::base64_uuid();
let mut map = ALL_CONNECTIONS_LINE.lock().unwrap();
map.insert(
connection_id.to_owned(),
chrono::Utc::now().timestamp_millis(),
);
Self {
id: connection_id,
hb: Instant::now(),
addr,
requestor,
}
}
pub fn ping(&self, ctx: &mut <Self as Actor>::Context) {
ctx.run_interval(HEARTBEAT_INTERVAL, |act, ctx| {
if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT_REMOVE {
act.addr.do_send(Disconnect {
id: act.id.to_owned(),
});
act.clear_connection();
ctx.stop();
} else if Instant::now().duration_since(act.hb) > CLIENT_TIMEOUT_WARNNING {
#[rustfmt::skip]
act.addr.send(ConnectionWarnning {
id: act.id.to_owned(),
at: format!(
"{}ms",
CLIENT_TIMEOUT_REMOVE.as_millis() - Instant::now().duration_since(act.hb).as_millis()
),
})
.into_actor(act)
.then(|_, _, _| {
fut::ready(())
})
.wait(ctx);
}
ctx.ping(b"Are you there?");
});
}
pub fn server_time(&self, ctx: &mut <Self as Actor>::Context) {
ctx.run_interval(*TIMER_INTERVAL, |_, ctx| {
let s = chrono::Utc::now()
.format("%Y-%m-%dT%H:%M:%S.%3f")
.to_string()
+ "Z";
ctx.binary(Bytes::from(s));
});
}
pub fn has_connection(connection_id: &str) -> bool {
let map = ALL_CONNECTIONS_LINE.lock().unwrap();
map.contains_key(connection_id)
}
pub fn pending_remove(connection_id: &str) {
if !Self::has_connection(connection_id) {
return;
}
let mut map = CONNECTION_TO_REMOVE.lock().unwrap();
if !map.contains_key(connection_id) {
map.insert(
connection_id.to_owned(),
chrono::Utc::now().timestamp_millis(),
);
}
}
pub fn should_be_remove(&self) -> bool {
let map = CONNECTION_TO_REMOVE.lock().unwrap();
map.contains_key(&self.id)
}
pub fn clear_connection(&self) {
let mut map1 = CONNECTION_TO_REMOVE.lock().unwrap();
map1.remove(&self.id);
let mut map2 = ALL_CONNECTIONS_LINE.lock().unwrap();
map2.remove(&self.id);
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Connection {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
match msg {
Ok(ws::Message::Ping(msg)) => {
if !self.should_be_remove() {
self.hb = Instant::now();
ctx.pong(&msg);
}
}
Ok(ws::Message::Pong(_)) => {
if !self.should_be_remove() {
self.hb = Instant::now();
}
}
Ok(ws::Message::Binary(bin)) => ctx.binary(bin),
Ok(ws::Message::Close(reason)) => {
self.clear_connection();
ctx.close(reason);
ctx.stop();
}
Ok(ws::Message::Continuation(_)) => {
ctx.stop();
}
Ok(ws::Message::Nop) => (),
Ok(ws::Message::Text(msg)) => match serde_json::from_str::<Input>(&msg) {
Ok(content) => ctx.notify(content),
Err(err) => {
if msg.starts_with('{') && msg.ends_with('}') {
ctx.text(OutputMessage::err(
OutputMessageType::Failed,
&err.to_string(),
None,
))
} else {
let s = format!("{}", msg);
ctx.binary(Bytes::from(s));
}
}
},
Err(e) => {
log::error!("connection_id={}, error={:?}", self.id, e);
} }
}
}
impl Handler<Input> for Connection {
type Result = ();
fn handle(&mut self, msg: Input, ctx: &mut Self::Context) -> Self::Result {
let val = msg.data;
let out = match msg.action {
Action::Subscribe => match SubscribeAction::parse(&self.id, &val) {
Ok(subscribe) => {
let out = OutputMessage {
id: msg.id,
level: Level::Info,
ty: OutputMessageType::Success,
msg: TypedMessage::JsonMessage(
json! ({"action": Action::Subscribe, "channel": subscribe.channel}),
),
};
self.addr.do_send(subscribe);
out
}
Err(err) => OutputMessage {
id: msg.id,
level: Level::Err,
ty: OutputMessageType::Failed,
msg: TypedMessage::JsonMessage(
json! ({"action": Action::Subscribe, "description": err}),
),
},
},
Action::Unsubscribe => match UnsubscribeAction::parse(&self.id, &val) {
Ok(unsubscribe) => {
let out = OutputMessage {
id: msg.id,
level: Level::Info,
ty: OutputMessageType::Success,
msg: TypedMessage::JsonMessage(
json! ({"action": Action::Unsubscribe, "channel": unsubscribe.channel}),
),
};
self.addr.do_send(unsubscribe);
out
}
Err(err) => OutputMessage {
id: msg.id,
level: Level::Err,
ty: OutputMessageType::Failed,
msg: TypedMessage::JsonMessage(
json! ({"action": Action::Unsubscribe, "description": err}),
),
},
},
Action::SendMessage => match SendMessageAction::parse(&self.id, &val) {
Ok(message) => {
self.addr.do_send(message);
OutputMessage {
id: msg.id,
level: Level::Info,
ty: OutputMessageType::Accept,
msg: TypedMessage::JsonMessage(json!(true)),
}
}
Err(err) => OutputMessage {
id: msg.id,
level: Level::Err,
ty: OutputMessageType::Failed,
msg: TypedMessage::JsonMessage(
json! ({"action": Action::SendMessage, "description": err}),
),
},
},
};
ctx.notify(out)
}
}