1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132
use actix::prelude::*;
use actix_web_actors::ws;
use log::warn;
use crate::{
server::models, ClientMessage, Connect, Disconnect, EncodedMessage, Message, MessageType,
Server,
};
/// State held for one WebSocket connection.
///
/// `WsSession` is run as an actix actor (see its `Actor` implementation) and
/// relays traffic between the remote peer and the `Server` actor behind `addr`.
#[derive(Debug)]
pub struct WsSession {
/// Unique session id.
///
/// An empty string is reported to the server as "no id" (`None`) when the
/// actor starts; the id returned by the server then overwrites this field.
pub id: String,
/// Peer name, if one is known.
pub name: Option<String>,
/// Is this WS session a TS transport? Forwarded unchanged to the server in
/// the `Connect` request.
// NOTE(review): "TS" is not expanded anywhere in the visible code - confirm
// what kind of transport this flag denotes.
pub is_transport: bool,
/// Address of the `Server` actor that receives this session's `Connect`,
/// `Disconnect` and `ClientMessage` messages.
pub addr: Addr<Server>,
}
impl Actor for WsSession {
    type Context = ws::WebsocketContext<Self>;

    /// Called when the actor starts; announces this session to the server.
    ///
    /// The session's current id is sent along with the request (an empty id is
    /// sent as `None`). The id contained in the server's reply replaces
    /// `self.id`. If the request itself fails, the actor is stopped.
    fn started(&mut self, ctx: &mut Self::Context) {
        // An empty id means "none yet" as far as the server is concerned.
        let requested_id = match self.id.as_str() {
            "" => None,
            existing => Some(existing.to_owned()),
        };
        let registration = Connect {
            id: requested_id,
            is_transport: self.is_transport,
            addr: ctx.address().recipient(),
        };

        // `wait` (rather than `spawn`) makes the context hold back every other
        // event until the server has answered, so nothing is processed before
        // the session id is settled.
        self.addr
            .send(registration)
            .into_actor(self)
            .then(|reply, session, ctx| {
                if let Ok(assigned_id) = reply {
                    session.id = assigned_id;
                } else {
                    // The request to the server did not complete.
                    ctx.stop();
                }
                fut::ready(())
            })
            .wait(ctx);
    }

    /// Called when the actor is about to stop; tells the server that this
    /// session is going away and lets the shutdown proceed.
    fn stopping(&mut self, _ctx: &mut Self::Context) -> Running {
        let farewell = Disconnect {
            id: self.id.clone(),
        };
        // Fire-and-forget: the reply (if any) is not awaited.
        self.addr.do_send(farewell);
        Running::Stop
    }
}
/// Forwards an already-encoded message from the server to the peer as a
/// binary WebSocket frame.
impl Handler<EncodedMessage> for WsSession {
    type Result = ();

    fn handle(&mut self, encoded: EncodedMessage, ctx: &mut Self::Context) {
        // The wrapped payload is passed on as-is; no re-encoding happens here.
        let payload = encoded.0;
        ctx.binary(payload);
    }
}
/// Handles a `Disconnect` sent to this session by terminating its context.
impl Handler<Disconnect> for WsSession {
    type Result = ();

    fn handle(&mut self, _request: Disconnect, ctx: &mut Self::Context) {
        // The message content is irrelevant here; receiving it is the signal.
        ctx.terminate();
    }
}
/// WebSocket message handler
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsSession {
fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
let msg = match msg {
Err(_) => {
ctx.stop();
return;
}
Ok(msg) => msg,
};
match msg {
ws::Message::Binary(bytes) => {
let message = models::decode_message(&bytes.to_vec()).unwrap();
self.addr
.send(ClientMessage {
id: self.id.to_owned(),
data: message,
})
.into_actor(self)
.then(|res, _, ctx| {
match res {
Ok(res) => {
if let Some(error_msg) = res {
warn!("Error: {}", error_msg);
ctx.binary(models::encode_message(
&Message::new(&MessageType::Error).text(&error_msg).build(),
));
ctx.stop();
}
}
_ => ctx.stop(),
}
fut::ready(())
})
.wait(ctx);
}
ws::Message::Close(reason) => {
ctx.close(reason);
ctx.stop();
}
ws::Message::Continuation(_) => {
ctx.stop();
}
_ => (),
}
}
}