Skip to main content

sod_actix_web/
ws.rs

1//! # websockets
2//!
3//! An [`actix_web`] [`Handler`] that instantiates [`MutService`] impls to handle individual sessions.
4//!
5//! ## Session Factory
6//!
7//! The [`WsSessionFactory`] is the entry point to this module. It is used to create session services
8//! as connections are established by client, and acts as a [`actix_web`] [`Handler`].
9//!
10//! The [`WsSessionFactory`] encapsulates 2 functions to be defined be the user.
11//! - `F: Fn(&HttpRequest) -> Result<S, Error> + 'static` - The factory that produces session services,
12//!   which can produce either an Ok([`Service`]), Ok([`MutService`]), or Err([`Error`]).
13//! - `E: Fn(&mut S, S::Error)-> Result<(), S::Error>+ Unpin + 'static` - The error handler, which
14//!   is used as a callback to handle errors returned by your service implementation.
15//!
16//! Actix Wiring:
17//! ```rust,compile_fail
18//! web::resource("/echo").route(web::get().to(WsSessionFactory::new(
19//!   |_req| Ok(EchoService),
20//!   |_service, err| println!("ERROR: {err}"),
21//! ))),
22//! ```
23//!
24//! ## Session Services
25//!
26//! The underlying session [`MutService`] impls that are produced by the session service factory
27//! must accept a [`WsSessionEvent`] as input and produce a [`Option<WsMessage>`] as output.
28//!
29//! - [`WsSessionEvent`] input alerts the session of session lifecycle events and received messages.
30//! - [`Option<WsMessage>`] output can optionally send response payloads to a session.
31//!
32//! ## Error Handling
33//!
34//! The [`WsSessionFactory`] requires an `Fn(&mut S, S::Error) -> Result<(), S::Error>` error handler
35//! function to be provided by the user where `S: MutService` or `S: Service`. Since actix uses an
36//! asynchronous thread-pool behind the scenes to handle websocket requests, [`Service`] [`Err`] results
37//! are not able to bubble up outside of the underlying [`StreamHandler`].
38//!
39//! Instead of make assumptions about how a user wants to handle errors returned by a service, that is
40//! left entirely up to the user via the error handler. When the error handler returns [`Ok(())`], no action
41//! will be taken against the underlying session. When the [`Err`] is returned by the error handler, the
42//! session will be closed.
43//!
44//! A common error handler impl is to log the error and close the session:
45//! ```rust,compile_fail
46//! |_, err| {
47//!     log::error!("Session Error: {err}");
48//!     Err(err)
49//! }
50//! ```
51//!
52//! ## WsSendService
53//!
54//! To produce messages to a session outside of input event handling, use the [`WsSendService`]
55//! provided by the initial [`WsSessionEvent::Started`] event. You may take ownership of the
56//! [`WsSendService`] to asynchronously produce messages to the service outside of the session's
57//! input handler service, which will return a [`SendError`] once the session has been closed/shutdown.
58//!
59//! ## Ping/Pong
60//!
61//! Pong replies are automatically sent by this framework, so you may ignore Ping requests for the
62//! purpose of Ping/Pong responses.
63//!
64//! ## Echo Example
65//! ```rust,no_run
66//! use std::convert::Infallible;
67//! use actix_web::{web, App, HttpServer};
68//! use sod::MutService;
69//! use sod_actix_web::ws::{WsMessage, WsSessionEvent, WsSessionFactory};
70//!
71//! #[actix_web::main]
72//! async fn main() -> std::io::Result<()> {
73//!     struct EchoService;
74//!     impl MutService for EchoService {
75//!         type Input = WsSessionEvent;
76//!         type Output = Option<WsMessage>;
77//!         type Error = Infallible;
78//!         fn process(&mut self, event: WsSessionEvent) -> Result<Self::Output, Self::Error> {
79//!             Ok(match event {
80//!                 WsSessionEvent::Started(_) => {
81//!                     Some(WsMessage::Text("Welcome to EchoServer!".to_owned()))
82//!                 }
83//!                 WsSessionEvent::Message(message) => match message {
84//!                     WsMessage::Binary(data) => Some(WsMessage::Binary(data)),
85//!                     WsMessage::Text(text) => Some(WsMessage::Text(text)),
86//!                     _ => None, // note: pongs are sent automatically
87//!                 },
88//!                 _ => None,
89//!             })
90//!         }
91//!     }
92//!
93//!     HttpServer::new(|| {
94//!         App::new().service(
95//!             web::resource("/echo").route(web::get().to(WsSessionFactory::new(
96//!                 |_| Ok(EchoService),
97//!                 |_, err| {
98//!                     println!("ERROR: {err}");
99//!                     Err(err)
100//!                 },
101//!             ))),
102//!         )
103//!     })
104//!     .bind(("127.0.0.1", 8080))?
105//!     .run()
106//!     .await
107//! }
108//! ```
109
110use std::{marker::PhantomData, sync::Arc};
111
112use actix::{prelude::SendError, Actor, AsyncContext, Recipient, StreamHandler};
113use actix_web::{web, Error, Handler, HttpRequest, HttpResponse};
114use actix_web_actors::ws::{self, CloseCode, CloseReason};
115use sod::{MutService, Service};
116
117use crate::sealed::SettableFuture;
118
119/// The entry point to this module. It is used to create session services as connections are
120/// established by client, and acts as a [`actix_web`] [`Handler`].
121///
122/// See the this module's documentation for details and examples.
123pub struct WsSessionFactory<O, S, F, E>
124where
125    O: Into<Option<WsMessage>> + Unpin + 'static,
126    S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
127    F: Fn(&HttpRequest) -> Result<S, Error> + 'static,
128    E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
129{
130    factory: Arc<F>,
131    error_handler: Arc<E>,
132    _phantom: PhantomData<fn(O, S)>,
133}
134impl<O, S, F, E> WsSessionFactory<O, S, F, E>
135where
136    O: Into<Option<WsMessage>> + Unpin + 'static,
137    S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
138    F: Fn(&HttpRequest) -> Result<S, Error> + 'static,
139    E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
140{
141    /// Encapsulate the given [`sod::Service`] or [`sod::MutService`] factory and error handler,
142    /// making the service factory compatible with a native [`actix_web::Handler`] and underlying
143    /// session services compabile with a native [`actix::Handler`] and [`actix::StreamHandler`].
144    pub fn new(factory: F, error_handler: E) -> Self {
145        Self {
146            factory: Arc::new(factory),
147            error_handler: Arc::new(error_handler),
148            _phantom: PhantomData,
149        }
150    }
151}
152impl<O, S, F, E> Clone for WsSessionFactory<O, S, F, E>
153where
154    O: Into<Option<WsMessage>> + Unpin + 'static,
155    S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
156    F: Fn(&HttpRequest) -> Result<S, Error> + 'static,
157    E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
158{
159    fn clone(&self) -> Self {
160        Self {
161            factory: Arc::clone(&self.factory),
162            error_handler: Arc::clone(&self.error_handler),
163            _phantom: PhantomData,
164        }
165    }
166}
167impl<O, S, F, E> Handler<(HttpRequest, web::Payload)> for WsSessionFactory<O, S, F, E>
168where
169    O: Into<Option<WsMessage>> + Unpin + 'static,
170    S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
171    F: Fn(&HttpRequest) -> Result<S, Error> + 'static,
172    E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
173{
174    type Output = Result<HttpResponse, Error>;
175    type Future = SettableFuture<Result<HttpResponse, Error>>;
176    fn call(&self, (req, stream): (HttpRequest, web::Payload)) -> Self::Future {
177        let result = match (self.factory)(&req) {
178            Ok(service) => ws::start(
179                WsActor::new(service, Arc::clone(&self.error_handler)),
180                &req,
181                stream,
182            ),
183            Err(err) => Err(err),
184        };
185        SettableFuture::new().set(result)
186    }
187}
188
189/// Provided by [`WsSessionEvent::Started`], which may be used to asychronously send [`WsMessage`]
190/// to the underlying session outside of the session handler service.
191///
192/// This impls [`Service<WsMessage>`], which can easily be chained to other [`Service`] impls to
193/// dispatch events to the session from other pipelines.
194#[derive(Debug)]
195pub struct WsSendService {
196    recipient: Recipient<WsMessage>,
197}
198impl WsSendService {
199    fn new(recipient: Recipient<WsMessage>) -> Self {
200        Self { recipient }
201    }
202}
203impl Service for WsSendService {
204    type Input = WsMessage;
205    type Output = ();
206    type Error = SendError<WsMessage>;
207    fn process(&self, msg: WsMessage) -> Result<Self::Output, Self::Error> {
208        self.recipient.try_send(msg)
209    }
210}
211
212/// The input to a session [`MutService`] or [`Service`] impl.
213///
214/// - [`WsSessionEvent::Started`] - always called first, providing a [`WsSendService`].
215/// - [`WsSessionEvent::Message`] - called when a message is received from the client.
216/// - [`WsSessionEvent::Error`] - called when the [`actix_web`] [`StreamHandler`] reports a session error.
217/// - [`WsSessionEvent::Stopped`] - called as the final action before the session is dropped.
218#[derive(Debug)]
219pub enum WsSessionEvent {
220    Started(WsSendService),
221    Message(WsMessage),
222    Error(ws::ProtocolError),
223    Stopped,
224}
225impl WsSessionEvent {
226    fn from_actix_result(result: Result<ws::Message, ws::ProtocolError>) -> Option<Self> {
227        match result {
228            Ok(message) => match WsMessage::from_actix_ws_message(message) {
229                None => None,
230                Some(message) => Some(WsSessionEvent::Message(message)),
231            },
232            Err(err) => Some(Self::Error(err)),
233        }
234    }
235}
236
237/// A WebSocket message, which is used to receive or send messages from a session.
238#[derive(Debug)]
239pub enum WsMessage {
240    Ping(Vec<u8>),
241    Pong(Vec<u8>),
242    Binary(Vec<u8>),
243    Text(String),
244    Close(Option<CloseReason>),
245}
246impl WsMessage {
247    fn from_actix_ws_message(src: ws::Message) -> Option<Self> {
248        match src {
249            ws::Message::Binary(data) => Some(Self::Binary(data.into())),
250            ws::Message::Ping(data) => Some(Self::Ping(data.into())),
251            ws::Message::Pong(data) => Some(Self::Pong(data.into())),
252            ws::Message::Close(reason) => Some(Self::Close(reason)),
253            ws::Message::Text(text) => Some(Self::Text(text.into())),
254            ws::Message::Continuation(_) => None,
255            ws::Message::Nop => None,
256        }
257    }
258}
259impl From<WsMessage> for ws::Message {
260    fn from(value: WsMessage) -> Self {
261        match value {
262            WsMessage::Ping(data) => ws::Message::Ping(data.into()),
263            WsMessage::Pong(data) => ws::Message::Pong(data.into()),
264            WsMessage::Binary(data) => ws::Message::Binary(data.into()),
265            WsMessage::Text(text) => ws::Message::Text(text.into()),
266            WsMessage::Close(reason) => ws::Message::Close(reason),
267        }
268    }
269}
270impl actix::Message for WsMessage {
271    type Result = ();
272}
273
274/// Internal [`Actor`] that dispatches to the underlying [`MutService`].
275struct WsActor<O, S, E>
276where
277    O: Unpin + 'static,
278    S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
279    E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
280{
281    service: S,
282    error_handler: Arc<E>,
283    _phantom: PhantomData<fn(O)>,
284}
285impl<O, S, E> WsActor<O, S, E>
286where
287    O: Into<Option<WsMessage>> + Unpin + 'static,
288    S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
289    E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
290{
291    fn new(service: S, error_handler: Arc<E>) -> Self {
292        Self {
293            service,
294            error_handler,
295            _phantom: PhantomData,
296        }
297    }
298}
299impl<O, S, E> Actor for WsActor<O, S, E>
300where
301    O: Into<Option<WsMessage>> + Unpin + 'static,
302    S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
303    E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
304{
305    type Context = ws::WebsocketContext<Self>;
306    fn started(&mut self, ctx: &mut Self::Context) {
307        match self
308            .service
309            .process(WsSessionEvent::Started(WsSendService::new(
310                ctx.address().recipient(),
311            ))) {
312            Ok(send) => {
313                if let Some(send) = send.into() {
314                    ctx.write_raw(send.into());
315                }
316            }
317            Err(err) => {
318                if let Err(_) = (self.error_handler)(&mut self.service, err) {
319                    ctx.close(Some(CloseReason::from(CloseCode::Error)));
320                }
321            }
322        }
323    }
324    fn stopped(&mut self, _ctx: &mut Self::Context) {
325        if let Err(err) = self.service.process(WsSessionEvent::Stopped) {
326            (self.error_handler)(&mut self.service, err).ok();
327        }
328    }
329}
330impl<O, S, E> actix::Handler<WsMessage> for WsActor<O, S, E>
331where
332    O: Into<Option<WsMessage>> + Unpin + 'static,
333    S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
334    E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
335{
336    type Result = ();
337    fn handle(&mut self, msg: WsMessage, ctx: &mut Self::Context) -> Self::Result {
338        match msg {
339            WsMessage::Ping(data) => ctx.ping(&data),
340            WsMessage::Pong(data) => ctx.pong(&data),
341            WsMessage::Binary(data) => ctx.binary(data),
342            WsMessage::Text(text) => ctx.text(text),
343            WsMessage::Close(reason) => ctx.close(reason),
344        }
345    }
346}
347impl<O, S, E> StreamHandler<Result<ws::Message, ws::ProtocolError>> for WsActor<O, S, E>
348where
349    O: Into<Option<WsMessage>> + Unpin + 'static,
350    S: MutService<Input = WsSessionEvent, Output = O> + Unpin + 'static,
351    E: Fn(&mut S, S::Error) -> Result<(), S::Error> + Unpin + 'static,
352{
353    fn handle(&mut self, msg: Result<ws::Message, ws::ProtocolError>, ctx: &mut Self::Context) {
354        if let Some(msg) = WsSessionEvent::from_actix_result(msg) {
355            if let WsSessionEvent::Message(WsMessage::Ping(data)) = &msg {
356                ctx.pong(data);
357            }
358            match self.service.process(msg) {
359                Ok(send) => {
360                    if let Some(send) = send.into() {
361                        ctx.write_raw(send.into());
362                    }
363                }
364                Err(err) => {
365                    if let Err(_) = (self.error_handler)(&mut self.service, err) {
366                        ctx.close(Some(CloseCode::Error.into()));
367                    }
368                }
369            }
370        }
371    }
372}