actix_ws/lib.rs
1//! WebSockets for Actix Web, without actors.
2//!
3//! For usage, see documentation on [`handle()`].
4
5#![warn(missing_docs)]
6#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
7#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]
8#![cfg_attr(docsrs, feature(doc_cfg))]
9
10pub use actix_http::ws::{CloseCode, CloseReason, Item, Message, ProtocolError};
11use actix_http::{
12 body::{BodyStream, MessageBody},
13 ws::handshake,
14};
15use actix_web::{web, HttpRequest, HttpResponse};
16use tokio::sync::mpsc::channel;
17
18mod aggregated;
19mod session;
20mod stream;
21
22pub use self::{
23 aggregated::{AggregatedMessage, AggregatedMessageStream},
24 session::{Closed, Session},
25 stream::{MessageStream, StreamingBody},
26};
27
28/// Begin handling websocket traffic
29///
30/// ```no_run
31/// use std::io;
32/// use actix_web::{middleware::Logger, web, App, HttpRequest, HttpServer, Responder};
33/// use actix_ws::Message;
34/// use futures_util::StreamExt as _;
35///
36/// async fn ws(req: HttpRequest, body: web::Payload) -> actix_web::Result<impl Responder> {
37/// let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
38///
39/// actix_web::rt::spawn(async move {
40/// while let Some(Ok(msg)) = msg_stream.next().await {
41/// match msg {
42/// Message::Ping(bytes) => {
43/// if session.pong(&bytes).await.is_err() {
44/// return;
45/// }
46/// }
47///
48/// Message::Text(msg) => println!("Got text: {msg}"),
49/// _ => break,
50/// }
51/// }
52///
53/// let _ = session.close(None).await;
54/// });
55///
56/// Ok(response)
57/// }
58///
59/// #[tokio::main(flavor = "current_thread")]
60/// async fn main() -> io::Result<()> {
61/// HttpServer::new(move || {
62/// App::new()
63/// .route("/ws", web::get().to(ws))
64/// .wrap(Logger::default())
65/// })
66/// .bind(("127.0.0.1", 8080))?
67/// .run()
68/// .await
69/// }
70/// ```
71pub fn handle(
72 req: &HttpRequest,
73 body: web::Payload,
74) -> Result<(HttpResponse, Session, MessageStream), actix_web::Error> {
75 let mut response = handshake(req.head())?;
76 let (tx, rx) = channel(32);
77
78 Ok((
79 response
80 .message_body(BodyStream::new(StreamingBody::new(rx)).boxed())?
81 .into(),
82 Session::new(tx),
83 MessageStream::new(body.into_inner()),
84 ))
85}