actix_ws_ng/
session.rs

1use std::sync::{
2    atomic::{AtomicBool, Ordering},
3    Arc,
4};
5
6use actix_http::ws::{CloseReason, Message};
7use actix_web::web::Bytes;
8use bytestring::ByteString;
9use tokio::sync::mpsc::Sender;
10
11/// A handle into the websocket session.
12///
13/// This type can be used to send messages into the websocket.
14#[derive(Clone)]
15pub struct Session {
16    inner: Option<Sender<Message>>,
17    closed: Arc<AtomicBool>,
18}
19
/// The error returned when a message is sent through a closed websocket session.
///
/// This is a zero-sized marker type, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Closed;

impl std::fmt::Display for Closed {
    /// Writes the fixed, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Session is closed")
    }
}

impl std::error::Error for Closed {}
31
32impl Session {
33    pub(super) fn new(inner: Sender<Message>) -> Self {
34        Session {
35            inner: Some(inner),
36            closed: Arc::new(AtomicBool::new(false)),
37        }
38    }
39
40    fn pre_check(&mut self) {
41        if self.closed.load(Ordering::Relaxed) {
42            self.inner.take();
43        }
44    }
45
46    /// Send text into the websocket
47    ///
48    /// ```rust,ignore
49    /// if session.text("Some text").await.is_err() {
50    ///     // session closed
51    /// }
52    /// ```
53    pub async fn text(&mut self, msg: impl Into<ByteString>) -> Result<(), Closed> {
54        self.pre_check();
55        if let Some(inner) = self.inner.as_mut() {
56            inner
57                .send(Message::Text(msg.into()))
58                .await
59                .map_err(|_| Closed)
60        } else {
61            Err(Closed)
62        }
63    }
64
65    /// Send raw bytes into the websocket
66    ///
67    /// ```rust,ignore
68    /// if session.binary(b"some bytes").await.is_err() {
69    ///     // session closed
70    /// }
71    /// ```
72    pub async fn binary(&mut self, msg: impl Into<Bytes>) -> Result<(), Closed> {
73        self.pre_check();
74        if let Some(inner) = self.inner.as_mut() {
75            inner
76                .send(Message::Binary(msg.into()))
77                .await
78                .map_err(|_| Closed)
79        } else {
80            Err(Closed)
81        }
82    }
83
84    /// Ping the client
85    ///
86    /// For many applications, it will be important to send regular pings to keep track of if the
87    /// client has disconnected
88    ///
89    /// ```rust,ignore
90    /// if session.ping(b"").await.is_err() {
91    ///     // session is closed
92    /// }
93    /// ```
94    pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> {
95        self.pre_check();
96        if let Some(inner) = self.inner.as_mut() {
97            inner
98                .send(Message::Ping(Bytes::copy_from_slice(msg)))
99                .await
100                .map_err(|_| Closed)
101        } else {
102            Err(Closed)
103        }
104    }
105
106    /// Pong the client
107    ///
108    /// ```rust,ignore
109    /// match msg {
110    ///     Message::Ping(bytes) => {
111    ///         let _ = session.pong(&bytes).await;
112    ///     }
113    ///     _ => (),
114    /// }
115    pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> {
116        self.pre_check();
117        if let Some(inner) = self.inner.as_mut() {
118            inner
119                .send(Message::Pong(Bytes::copy_from_slice(msg)))
120                .await
121                .map_err(|_| Closed)
122        } else {
123            Err(Closed)
124        }
125    }
126
127    /// Send a close message, and consume the session
128    ///
129    /// All clones will return `Err(Closed)` if used after this call
130    ///
131    /// ```rust,ignore
132    /// session.close(None).await
133    /// ```
134    pub async fn close(mut self, reason: Option<CloseReason>) -> Result<(), Closed> {
135        self.pre_check();
136        if let Some(inner) = self.inner.take() {
137            self.closed.store(true, Ordering::Relaxed);
138            inner.send(Message::Close(reason)).await.map_err(|_| Closed)
139        } else {
140            Err(Closed)
141        }
142    }
143}