use std::fmt::Debug;
use std::time::Duration;
use actix::prelude::*;
use actix_web::{web, Error as ActixError, HttpRequest, HttpResponse};
use actix_web_actors::ws::{self, CloseCode, CloseReason};
use futures::{
future::FutureResult,
stream::{iter_ok, Stream},
sync::mpsc::{unbounded, UnboundedSender},
IntoFuture,
};
use serde::ser::Serialize;
use super::ResponseError;
const PING_INTERVAL: u64 = 30;
pub struct Request(HttpRequest, web::Payload);
impl From<(HttpRequest, web::Payload)> for Request {
fn from((http_request, payload): (HttpRequest, web::Payload)) -> Self {
Self(http_request, payload)
}
}
impl From<Request> for (HttpRequest, web::Payload) {
fn from(req: Request) -> Self {
(req.0, req.1)
}
}
pub struct Response(HttpResponse);
impl From<HttpResponse> for Response {
fn from(res: HttpResponse) -> Self {
Self(res)
}
}
impl IntoFuture for Response {
type Item = HttpResponse;
type Error = ActixError;
type Future = FutureResult<HttpResponse, ActixError>;
fn into_future(self) -> Self::Future {
self.0.into_future()
}
}
impl std::fmt::Debug for Response {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "{:?}", self.0)
}
}
pub fn new_websocket_event_sender<T: Serialize + Debug>(
req: Request,
initial_events: Box<dyn Iterator<Item = T> + Send>,
) -> Result<(EventSender<T>, Response), ResponseError> {
let (sender, recv) = unbounded();
let (request, payload) = req.into();
let stream = iter_ok::<_, ()>(initial_events.map(MessageWrapper::Message)).chain(recv);
let res = ws::start(
EventSenderWebSocket::new(Box::new(stream)),
&request,
payload,
)
.map_err(ResponseError::from)?;
Ok((EventSender { sender }, Response::from(res)))
}
#[derive(Clone)]
pub struct EventSender<T: Serialize + Debug + 'static> {
sender: UnboundedSender<MessageWrapper<T>>,
}
impl<T: Serialize + Debug + 'static> EventSender<T> {
pub fn send(&self, event: T) -> Result<(), EventSendError<T>> {
trace!("Event sent: {:?}", &event);
self.sender
.unbounded_send(MessageWrapper::Message(event))
.map_err(|err| match err.into_inner() {
MessageWrapper::Message(event) => EventSendError(event),
_ => {
panic!("Sent an Message variant, but didn't receive the same variant on error")
}
})
}
pub fn shutdown(self) {
if self
.sender
.unbounded_send(MessageWrapper::Shutdown)
.is_err()
{
debug!("Attempting to shutdown an already stopped websocket");
}
}
}
impl<T: Serialize + Debug + 'static> Drop for EventSender<T> {
fn drop(&mut self) {
if self
.sender
.unbounded_send(MessageWrapper::Shutdown)
.is_err()
{
debug!("Attempting to shutdown an already stopped websocket");
}
}
}
#[derive(Debug)]
pub struct EventSendError<T: Serialize + Debug + 'static>(pub T);
struct EventSenderWebSocket<T: Serialize + Debug + 'static> {
stream: Option<Box<dyn Stream<Item = MessageWrapper<T>, Error = ()>>>,
}
impl<T: Serialize + Debug + 'static> EventSenderWebSocket<T> {
fn new(stream: Box<dyn Stream<Item = MessageWrapper<T>, Error = ()>>) -> Self {
Self {
stream: Some(stream),
}
}
}
impl<T: Serialize + Debug + 'static> StreamHandler<MessageWrapper<T>, ()>
for EventSenderWebSocket<T>
{
fn handle(&mut self, msg: MessageWrapper<T>, ctx: &mut Self::Context) {
match msg {
MessageWrapper::Message(msg) => {
debug!("Received a message: {:?}", msg);
match serde_json::to_string(&msg) {
Ok(text) => ctx.text(text),
Err(err) => {
debug!("Failed to serialize payload: {:?}", err);
}
}
}
MessageWrapper::Shutdown => {
debug!("Shutting down websocket");
ctx.close(Some(CloseReason {
description: None,
code: CloseCode::Away,
}));
ctx.stop();
}
}
}
fn error(&mut self, _: (), ctx: &mut Self::Context) -> Running {
debug!("Received channel disconnect");
ctx.close(Some(CloseReason {
description: None,
code: CloseCode::Error,
}));
Running::Stop
}
}
impl<T: Serialize + Debug + 'static> Actor for EventSenderWebSocket<T> {
type Context = ws::WebsocketContext<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
if let Some(stream) = self.stream.take() {
debug!("Starting Event Websocket");
ctx.add_stream(stream);
ctx.run_interval(Duration::from_secs(PING_INTERVAL), move |_, ctx| {
trace!("Sending Ping");
ctx.ping("");
});
} else {
warn!("Event dealer websocket was unexpectedly started twice; ignoring");
}
}
}
impl<T: Serialize + Debug + 'static> StreamHandler<ws::Message, ws::ProtocolError>
for EventSenderWebSocket<T>
{
fn handle(&mut self, msg: ws::Message, ctx: &mut Self::Context) {
match msg {
ws::Message::Ping(msg) => ctx.ping(&msg),
ws::Message::Pong(msg) => ctx.pong(&msg),
ws::Message::Text(text) => ctx.text(text),
ws::Message::Binary(bin) => ctx.binary(bin),
ws::Message::Close(_) => {
ctx.close(Some(CloseReason {
description: Some("Received close frame closing normally".into()),
code: CloseCode::Normal,
}));
debug!("Received close message");
ctx.stop()
}
ws::Message::Nop => (),
};
}
}
#[derive(Debug, Message)]
enum MessageWrapper<T: Serialize + Debug + 'static> {
Message(T),
Shutdown,
}