1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
#![deny(missing_docs)] //! Websockets without Actors for actix runtimes //! //! See documentation for the [`handle`] method for usage use actix_http::ws::handshake; use actix_web::{web, HttpRequest, HttpResponse}; use tokio::sync::mpsc::channel; pub use actix_http::ws::{CloseCode, CloseReason, Message, ProtocolError}; mod fut; mod session; pub use self::{ fut::{MessageStream, StreamingBody}, session::{Closed, Session}, }; /// Begin handling websocket traffic /// /// ```rust,ignore /// use actix_web::{middleware::Logger, web, App, Error, HttpRequest, HttpResponse, HttpServer}; /// use actix_ws::Message; /// /// async fn ws(req: HttpRequest, body: web::Payload) -> Result<HttpResponse, Error> { /// let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; /// /// actix_rt::spawn(async move { /// while let Some(Ok(msg)) = msg_stream.next().await { /// match msg { /// Message::Ping(bytes) => { /// if session.pong(&bytes).await.is_err() { /// return; /// } /// } /// Message::Text(s) => println!("Got text, {}", s), /// _ => break, /// } /// } /// /// let _ = session.close(None).await; /// }); /// /// Ok(response) /// } /// /// #[actix_rt::main] /// async fn main() -> Result<(), anyhow::Error> { /// HttpServer::new(move || { /// App::new() /// .wrap(Logger::default()) /// .route("/ws", web::get().to(ws)) /// }) /// .bind("127.0.0.1:8080")? /// .run() /// .await?; /// /// Ok(()) /// } /// ``` pub fn handle( req: &HttpRequest, body: web::Payload, ) -> Result<(HttpResponse, Session, MessageStream), actix_web::Error> { let mut response = handshake(req.head())?; let (tx, rx) = channel(32); Ok(( response.streaming(StreamingBody::new(rx)), Session::new(tx), MessageStream::new(body.into_inner()), )) }