actix_ws/
lib.rs

1//! WebSockets for Actix Web, without actors.
2//!
3//! For usage, see documentation on [`handle()`].
4
5#![warn(missing_docs)]
6#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
7#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
8#![cfg_attr(docsrs, feature(doc_cfg))]
9
10pub use actix_http::ws::{CloseCode, CloseReason, Item, Message, ProtocolError};
11use actix_http::{
12    body::{BodyStream, MessageBody},
13    ws::handshake,
14};
15use actix_web::{web, HttpRequest, HttpResponse};
16use tokio::sync::mpsc::channel;
17
18mod aggregated;
19mod session;
20mod stream;
21
22pub use self::{
23    aggregated::{AggregatedMessage, AggregatedMessageStream},
24    session::{Closed, Session},
25    stream::{MessageStream, StreamingBody},
26};
27
28/// Begin handling websocket traffic
29///
30/// ```no_run
31/// use std::io;
32/// use actix_web::{middleware::Logger, web, App, HttpRequest, HttpServer, Responder};
33/// use actix_ws::Message;
34/// use futures_util::StreamExt as _;
35///
36/// async fn ws(req: HttpRequest, body: web::Payload) -> actix_web::Result<impl Responder> {
37///     let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
38///
39///     actix_web::rt::spawn(async move {
40///         while let Some(Ok(msg)) = msg_stream.next().await {
41///             match msg {
42///                 Message::Ping(bytes) => {
43///                     if session.pong(&bytes).await.is_err() {
44///                         return;
45///                     }
46///                 }
47///
48///                 Message::Text(msg) => println!("Got text: {msg}"),
49///                 _ => break,
50///             }
51///         }
52///
53///         let _ = session.close(None).await;
54///     });
55///
56///     Ok(response)
57/// }
58///
59/// #[tokio::main(flavor = "current_thread")]
60/// async fn main() -> io::Result<()> {
61///     HttpServer::new(move || {
62///         App::new()
63///             .route("/ws", web::get().to(ws))
64///             .wrap(Logger::default())
65///     })
66///     .bind(("127.0.0.1", 8080))?
67///     .run()
68///     .await
69/// }
70/// ```
71pub fn handle(
72    req: &HttpRequest,
73    body: web::Payload,
74) -> Result<(HttpResponse, Session, MessageStream), actix_web::Error> {
75    let mut response = handshake(req.head())?;
76    let (tx, rx) = channel(32);
77
78    Ok((
79        response
80            .message_body(BodyStream::new(StreamingBody::new(rx)).boxed())?
81            .into(),
82        Session::new(tx),
83        MessageStream::new(body.into_inner()),
84    ))
85}