use std::str::FromStr;
use std::time::{Duration, Instant};
use actix::prelude::*;
use actix_web_actors::ws;
use actix_web_actors::ws::Message::Text;
use crate::websocket::devstreams::Connect;
use crate::websocket::devstreams::Disconnect;
use crate::websocket::devstreams::Subscribe;
use crate::websocket::devstreams::SubscribeChannel;
use crate::websocket::devstreams::InputMessage;
use crate::websocket::devstreams::InputMessageType;
use crate::websocket::devstreams::OutputMessage;
use crate::websocket::devstreams::OutputMessageType;
use crate::websocket::devstreams::TypedMessage;
use crate::websocket::devstreams::Server;
use crate::websocket::devstreams::ConnectId;
/// Period of the heartbeat timer: on every tick the connection either pings
/// the client or, if the client has been silent too long, stops itself.
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(5);
/// Maximum time allowed since the last ping/pong received from the client
/// before the heartbeat timer stops the connection.
const CLIENT_TIMEOUT: Duration = Duration::from_secs(10);
/// WebSocket session actor for a single client.
///
/// It registers itself with the [`Server`] actor when started, forwards
/// subscription requests to it, and deregisters when stopping.
pub struct Connection {
/// Identifier of this connection; included in every message sent to the
/// server actor and in every `OutputMessage` built here.
pub id: ConnectId,
/// Instant of the last ping or pong received from the client; compared
/// against `CLIENT_TIMEOUT` by the heartbeat timer.
pub hb: Instant,
/// Address of the server actor that receives `Connect`, `Disconnect` and
/// `Subscribe` messages from this connection.
pub addr: actix::Addr<Server>,
}
impl Connection {
    /// Creates a connection bound to the given [`Server`] address.
    ///
    /// The id is obtained from `crate::commons::base64_uuid()` and the
    /// heartbeat timestamp is initialised to the moment of construction.
    pub fn new(addr: actix::Addr<Server>) -> Self {
        let id = crate::commons::base64_uuid();
        let hb = Instant::now();
        Self { id, hb, addr }
    }

    /// Installs the heartbeat timer on the actor context.
    ///
    /// Every `HEARTBEAT_INTERVAL` the timer checks how long ago the last
    /// ping/pong was recorded in `self.hb`. If that exceeds `CLIENT_TIMEOUT`
    /// the context is stopped; otherwise a ping frame is sent to the client.
    fn hb(&self, ctx: &mut <Self as Actor>::Context) {
        ctx.run_interval(HEARTBEAT_INTERVAL, |conn, ws_ctx| {
            let silent_for = conn.hb.elapsed();
            if silent_for <= CLIENT_TIMEOUT {
                ws_ctx.ping(b"Are you there?");
            } else {
                // The client missed its deadline: tear the session down.
                ws_ctx.stop();
            }
        });
    }
}
impl Actor for Connection {
    type Context = ws::WebsocketContext<Self>;

    /// Starts the heartbeat timer and registers this connection with the
    /// server actor.
    ///
    /// The registration future is run with `wait`, and the context is stopped
    /// if the `Connect` message could not be delivered.
    fn started(&mut self, ctx: &mut Self::Context) {
        self.hb(ctx);

        let registration = Connect {
            addr: ctx.address().recipient(),
            id: self.id.to_owned(),
        };
        self.addr
            .send(registration)
            .into_actor(self)
            .then(|outcome, _conn, ws_ctx| {
                // A mailbox error means the server never saw us; give up.
                if outcome.is_err() {
                    ws_ctx.stop();
                }
                fut::ready(())
            })
            .wait(ctx);
    }

    /// Tells the server actor that this connection is going away, then lets
    /// the actor stop.
    fn stopping(&mut self, _: &mut Self::Context) -> Running {
        let farewell = Disconnect {
            id: self.id.to_owned(),
        };
        self.addr.do_send(farewell);
        Running::Stop
    }
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Connection {
    /// Handles one incoming WebSocket frame (or protocol error).
    ///
    /// * `Ping` / `Pong` refresh the heartbeat timestamp (a ping is also
    ///   answered with a pong carrying the same payload).
    /// * `Binary` frames are echoed back unchanged.
    /// * `Close` is acknowledged and the actor is stopped.
    /// * `Continuation` frames are not supported and stop the actor.
    /// * `Text` frames are parsed as a JSON `InputMessage`; on success the
    ///   message is dispatched to this actor, otherwise an error text built
    ///   by `OutputMessage::err` is sent back to the client.
    /// * A protocol error is logged and the actor is stopped.
    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
        let frame = match msg {
            Ok(frame) => frame,
            Err(e) => {
                log::error!("error={:?}", e);
                // After a protocol error the frame stream cannot be trusted
                // any more, so close the session instead of lingering until
                // the heartbeat timeout fires.
                ctx.stop();
                return;
            }
        };

        match frame {
            ws::Message::Ping(payload) => {
                self.hb = Instant::now();
                ctx.pong(&payload);
            }
            ws::Message::Pong(_) => {
                self.hb = Instant::now();
            }
            ws::Message::Binary(bin) => ctx.binary(bin),
            ws::Message::Close(reason) => {
                ctx.close(reason);
                ctx.stop();
            }
            ws::Message::Continuation(_) => {
                ctx.stop();
            }
            ws::Message::Nop => (),
            Text(s) => match serde_json::from_str::<InputMessage>(&s) {
                Ok(content) => ctx.notify(content),
                Err(err) => ctx.text(OutputMessage::err(err.to_string(), &self.id)),
            },
        }
    }
}
impl Handler<InputMessage> for Connection {
type Result = ();
fn handle(&mut self, msg: InputMessage, ctx: &mut Self::Context) -> Self::Result {
let val = msg.data;
let out = match msg.ty {
InputMessageType::Msg => {
let out = OutputMessage {
ty: OutputMessageType::Msg,
msg: TypedMessage::TextMessage(val.as_str().unwrap().to_string()),
id: self.id.to_owned(),
};
out
}
InputMessageType::Subscribe => {
match crate::commons::get_value_as_string(&val, "channel") {
Ok(Some(c)) => match SubscribeChannel::from_str(&c) {
Ok(channel) => {
self.addr.do_send(Subscribe {
id: self.id.to_owned(),
channel,
});
OutputMessage {
ty: OutputMessageType::Msg,
msg: TypedMessage::TextMessage(format!(
"subscribe success: channel={}",
c
)),
id: self.id.to_owned(),
}
}
Err(_) => OutputMessage {
ty: OutputMessageType::Err,
msg: TypedMessage::TextMessage("invalid channel".to_string()),
id: self.id.to_owned(),
},
},
Ok(None) => OutputMessage {
ty: OutputMessageType::Err,
msg: TypedMessage::TextMessage("channel is required".to_string()),
id: self.id.to_owned(),
},
Err(e) => OutputMessage {
ty: OutputMessageType::Err,
msg: TypedMessage::TextMessage(format!("invalid channel: error={:?}", e)),
id: self.id.to_owned(),
},
}
}
};
ctx.notify(out)
}
}
impl Handler<OutputMessage> for Connection {
    type Result = ();

    /// Sends the message to the client as a text frame, using whatever
    /// `OutputMessage::output` produces as the frame content.
    fn handle(&mut self, msg: OutputMessage, ctx: &mut Self::Context) {
        let payload = msg.output();
        ctx.text(payload);
    }
}