actix_ws/session.rs
1use std::{
2 fmt,
3 sync::{
4 atomic::{AtomicBool, Ordering},
5 Arc,
6 },
7};
8
9use actix_http::ws::{CloseReason, Item, Message};
10use actix_web::web::Bytes;
11use bytestring::ByteString;
12use tokio::sync::mpsc::Sender;
13
14/// A handle into the websocket session.
15///
16/// This type can be used to send messages into the WebSocket.
17#[derive(Clone)]
18pub struct Session {
19 inner: Option<Sender<Message>>,
20 closed: Arc<AtomicBool>,
21}
22
23/// The error representing a closed websocket session
24#[derive(Debug)]
25pub struct Closed;
26
27impl fmt::Display for Closed {
28 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
29 f.write_str("Session is closed")
30 }
31}
32
33impl std::error::Error for Closed {}
34
35impl Session {
36 pub(super) fn new(inner: Sender<Message>) -> Self {
37 Session {
38 inner: Some(inner),
39 closed: Arc::new(AtomicBool::new(false)),
40 }
41 }
42
43 fn pre_check(&mut self) {
44 if self.closed.load(Ordering::Relaxed) {
45 self.inner.take();
46 }
47 }
48
49 /// Sends text into the WebSocket.
50 ///
51 /// ```no_run
52 /// # use actix_ws::Session;
53 /// # async fn test(mut session: Session) {
54 /// if session.text("Some text").await.is_err() {
55 /// // session closed
56 /// }
57 /// # }
58 /// ```
59 pub async fn text(&mut self, msg: impl Into<ByteString>) -> Result<(), Closed> {
60 self.pre_check();
61 if let Some(inner) = self.inner.as_mut() {
62 inner
63 .send(Message::Text(msg.into()))
64 .await
65 .map_err(|_| Closed)
66 } else {
67 Err(Closed)
68 }
69 }
70
71 /// Sends raw bytes into the WebSocket.
72 ///
73 /// ```no_run
74 /// # use actix_ws::Session;
75 /// # async fn test(mut session: Session) {
76 /// if session.binary(&b"some bytes"[..]).await.is_err() {
77 /// // session closed
78 /// }
79 /// # }
80 /// ```
81 pub async fn binary(&mut self, msg: impl Into<Bytes>) -> Result<(), Closed> {
82 self.pre_check();
83 if let Some(inner) = self.inner.as_mut() {
84 inner
85 .send(Message::Binary(msg.into()))
86 .await
87 .map_err(|_| Closed)
88 } else {
89 Err(Closed)
90 }
91 }
92
93 /// Pings the client.
94 ///
95 /// For many applications, it will be important to send regular pings to keep track of if the
96 /// client has disconnected
97 ///
98 /// ```no_run
99 /// # use actix_ws::Session;
100 /// # async fn test(mut session: Session) {
101 /// if session.ping(b"").await.is_err() {
102 /// // session is closed
103 /// }
104 /// # }
105 /// ```
106 pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> {
107 self.pre_check();
108 if let Some(inner) = self.inner.as_mut() {
109 inner
110 .send(Message::Ping(Bytes::copy_from_slice(msg)))
111 .await
112 .map_err(|_| Closed)
113 } else {
114 Err(Closed)
115 }
116 }
117
118 /// Pongs the client.
119 ///
120 /// ```no_run
121 /// # use actix_ws::{Message, Session};
122 /// # async fn test(mut session: Session, msg: Message) {
123 /// match msg {
124 /// Message::Ping(bytes) => {
125 /// let _ = session.pong(&bytes).await;
126 /// }
127 /// _ => (),
128 /// }
129 /// # }
130 pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> {
131 self.pre_check();
132 if let Some(inner) = self.inner.as_mut() {
133 inner
134 .send(Message::Pong(Bytes::copy_from_slice(msg)))
135 .await
136 .map_err(|_| Closed)
137 } else {
138 Err(Closed)
139 }
140 }
141
142 /// Manually controls sending continuations.
143 ///
144 /// Be wary of this method. Continuations represent multiple frames that, when combined, are
145 /// presented as a single message. They are useful when the entire contents of a message are
146 /// not available all at once. However, continuations MUST NOT be interrupted by other Text or
147 /// Binary messages. Control messages such as Ping, Pong, or Close are allowed to interrupt a
148 /// continuation.
149 ///
150 /// Continuations must be initialized with a First variant, and must be terminated by a Last
151 /// variant, with only Continue variants sent in between.
152 ///
153 /// ```no_run
154 /// # use actix_ws::{Item, Session};
155 /// # async fn test(mut session: Session) -> Result<(), Box<dyn std::error::Error>> {
156 /// session.continuation(Item::FirstText("Hello".into())).await?;
157 /// session.continuation(Item::Continue(b", World"[..].into())).await?;
158 /// session.continuation(Item::Last(b"!"[..].into())).await?;
159 /// # Ok(())
160 /// # }
161 /// ```
162 pub async fn continuation(&mut self, msg: Item) -> Result<(), Closed> {
163 self.pre_check();
164 if let Some(inner) = self.inner.as_mut() {
165 inner
166 .send(Message::Continuation(msg))
167 .await
168 .map_err(|_| Closed)
169 } else {
170 Err(Closed)
171 }
172 }
173
174 /// Sends a close message, and consumes the session.
175 ///
176 /// All clones will return `Err(Closed)` if used after this call.
177 ///
178 /// ```no_run
179 /// # use actix_ws::{Closed, Session};
180 /// # async fn test(mut session: Session) -> Result<(), Closed> {
181 /// session.close(None).await
182 /// # }
183 /// ```
184 pub async fn close(mut self, reason: Option<CloseReason>) -> Result<(), Closed> {
185 self.pre_check();
186
187 if let Some(inner) = self.inner.take() {
188 self.closed.store(true, Ordering::Relaxed);
189 inner.send(Message::Close(reason)).await.map_err(|_| Closed)
190 } else {
191 Err(Closed)
192 }
193 }
194}