actix_ws/
session.rs

1use std::{
2    fmt,
3    sync::{
4        atomic::{AtomicBool, Ordering},
5        Arc,
6    },
7};
8
9use actix_http::ws::{CloseReason, Item, Message};
10use actix_web::web::Bytes;
11use bytestring::ByteString;
12use tokio::sync::mpsc::Sender;
13
14/// A handle into the websocket session.
15///
16/// This type can be used to send messages into the WebSocket.
17#[derive(Clone)]
18pub struct Session {
19    inner: Option<Sender<Message>>,
20    closed: Arc<AtomicBool>,
21}
22
23/// The error representing a closed websocket session
24#[derive(Debug)]
25pub struct Closed;
26
27impl fmt::Display for Closed {
28    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29        f.write_str("Session is closed")
30    }
31}
32
33impl std::error::Error for Closed {}
34
35impl Session {
36    pub(super) fn new(inner: Sender<Message>) -> Self {
37        Session {
38            inner: Some(inner),
39            closed: Arc::new(AtomicBool::new(false)),
40        }
41    }
42
43    fn pre_check(&mut self) {
44        if self.closed.load(Ordering::Relaxed) {
45            self.inner.take();
46        }
47    }
48
49    /// Sends text into the WebSocket.
50    ///
51    /// ```no_run
52    /// # use actix_ws::Session;
53    /// # async fn test(mut session: Session) {
54    /// if session.text("Some text").await.is_err() {
55    ///     // session closed
56    /// }
57    /// # }
58    /// ```
59    pub async fn text(&mut self, msg: impl Into<ByteString>) -> Result<(), Closed> {
60        self.pre_check();
61        if let Some(inner) = self.inner.as_mut() {
62            inner
63                .send(Message::Text(msg.into()))
64                .await
65                .map_err(|_| Closed)
66        } else {
67            Err(Closed)
68        }
69    }
70
71    /// Sends raw bytes into the WebSocket.
72    ///
73    /// ```no_run
74    /// # use actix_ws::Session;
75    /// # async fn test(mut session: Session) {
76    /// if session.binary(&b"some bytes"[..]).await.is_err() {
77    ///     // session closed
78    /// }
79    /// # }
80    /// ```
81    pub async fn binary(&mut self, msg: impl Into<Bytes>) -> Result<(), Closed> {
82        self.pre_check();
83        if let Some(inner) = self.inner.as_mut() {
84            inner
85                .send(Message::Binary(msg.into()))
86                .await
87                .map_err(|_| Closed)
88        } else {
89            Err(Closed)
90        }
91    }
92
93    /// Pings the client.
94    ///
95    /// For many applications, it will be important to send regular pings to keep track of if the
96    /// client has disconnected
97    ///
98    /// ```no_run
99    /// # use actix_ws::Session;
100    /// # async fn test(mut session: Session) {
101    /// if session.ping(b"").await.is_err() {
102    ///     // session is closed
103    /// }
104    /// # }
105    /// ```
106    pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> {
107        self.pre_check();
108        if let Some(inner) = self.inner.as_mut() {
109            inner
110                .send(Message::Ping(Bytes::copy_from_slice(msg)))
111                .await
112                .map_err(|_| Closed)
113        } else {
114            Err(Closed)
115        }
116    }
117
118    /// Pongs the client.
119    ///
120    /// ```no_run
121    /// # use actix_ws::{Message, Session};
122    /// # async fn test(mut session: Session, msg: Message) {
123    /// match msg {
124    ///     Message::Ping(bytes) => {
125    ///         let _ = session.pong(&bytes).await;
126    ///     }
127    ///     _ => (),
128    /// }
129    /// # }
130    pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> {
131        self.pre_check();
132        if let Some(inner) = self.inner.as_mut() {
133            inner
134                .send(Message::Pong(Bytes::copy_from_slice(msg)))
135                .await
136                .map_err(|_| Closed)
137        } else {
138            Err(Closed)
139        }
140    }
141
142    /// Manually controls sending continuations.
143    ///
144    /// Be wary of this method. Continuations represent multiple frames that, when combined, are
145    /// presented as a single message. They are useful when the entire contents of a message are
146    /// not available all at once. However, continuations MUST NOT be interrupted by other Text or
147    /// Binary messages. Control messages such as Ping, Pong, or Close are allowed to interrupt a
148    /// continuation.
149    ///
150    /// Continuations must be initialized with a First variant, and must be terminated by a Last
151    /// variant, with only Continue variants sent in between.
152    ///
153    /// ```no_run
154    /// # use actix_ws::{Item, Session};
155    /// # async fn test(mut session: Session) -> Result<(), Box<dyn std::error::Error>> {
156    /// session.continuation(Item::FirstText("Hello".into())).await?;
157    /// session.continuation(Item::Continue(b", World"[..].into())).await?;
158    /// session.continuation(Item::Last(b"!"[..].into())).await?;
159    /// # Ok(())
160    /// # }
161    /// ```
162    pub async fn continuation(&mut self, msg: Item) -> Result<(), Closed> {
163        self.pre_check();
164        if let Some(inner) = self.inner.as_mut() {
165            inner
166                .send(Message::Continuation(msg))
167                .await
168                .map_err(|_| Closed)
169        } else {
170            Err(Closed)
171        }
172    }
173
174    /// Sends a close message, and consumes the session.
175    ///
176    /// All clones will return `Err(Closed)` if used after this call.
177    ///
178    /// ```no_run
179    /// # use actix_ws::{Closed, Session};
180    /// # async fn test(mut session: Session) -> Result<(), Closed> {
181    /// session.close(None).await
182    /// # }
183    /// ```
184    pub async fn close(mut self, reason: Option<CloseReason>) -> Result<(), Closed> {
185        self.pre_check();
186
187        if let Some(inner) = self.inner.take() {
188            self.closed.store(true, Ordering::Relaxed);
189            inner.send(Message::Close(reason)).await.map_err(|_| Closed)
190        } else {
191            Err(Closed)
192        }
193    }
194}