actix_ws_ng/
lib.rs

1#![deny(rust_2018_idioms, nonstandard_style, future_incompatible)]
2#![warn(missing_docs)]
3#![cfg_attr(docsrs, feature(doc_auto_cfg))]
4
5//! WebSockets for Actix Web, without actors.
6//!
7//! See documentation for the [`handle`] method for usage.
8
9pub use actix_http::ws::{CloseCode, CloseReason, Message, ProtocolError};
10use actix_http::{
11    body::{BodyStream, MessageBody},
12    ws::handshake,
13};
14use actix_web::{web, HttpRequest, HttpResponse};
15use tokio::sync::mpsc::channel;
16
17mod fut;
18mod session;
19
20pub use self::{
21    fut::{MessageStream, StreamingBody},
22    session::{Closed, Session},
23};
24
25/// Begin handling websocket traffic
26///
27/// ```rust,ignore
28/// use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer};
29/// use actix_ws::Message;
30///
31/// async fn ws(req: HttpRequest, body: web::Payload) -> Result<HttpResponse, Error> {
32///     let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?;
33///
34///     actix_rt::spawn(async move {
35///         while let Some(Ok(msg)) = msg_stream.next().await {
36///             match msg {
37///                 Message::Ping(bytes) => {
38///                     if session.pong(&bytes).await.is_err() {
39///                         return;
40///                     }
41///                 }
42///                 Message::Text(s) => println!("Got text, {}", s),
43///                 _ => break,
44///             }
45///         }
46///
47///         let _ = session.close(None).await;
48///     });
49///
50///     Ok(response)
51/// }
52///
53/// #[actix_rt::main]
54/// async fn main() -> Result<(), anyhow::Error> {
55///     HttpServer::new(move || {
56///         App::new()
57///             .wrap(Logger::default())
58///             .route("/ws", web::get().to(ws))
59///     })
60///     .bind("127.0.0.1:8080")?
61///     .run()
62///     .await?;
63///
64///     Ok(())
65/// }
66/// ```
67pub fn handle(
68    req: &HttpRequest,
69    body: web::Payload,
70) -> Result<(HttpResponse, Session, MessageStream), actix_web::Error> {
71    let mut response = handshake(req.head())?;
72    let (tx, rx) = channel(32);
73
74    Ok((
75        response
76            .message_body(BodyStream::new(StreamingBody::new(rx)).boxed())?
77            .into(),
78        Session::new(tx),
79        MessageStream::new(body.into_inner()),
80    ))
81}