1use std::sync::{
2 atomic::{AtomicBool, Ordering},
3 Arc,
4};
5
6use actix_http::ws::{CloseReason, Message};
7use actix_web::web::Bytes;
8use bytestring::ByteString;
9use tokio::sync::mpsc::Sender;
10
11#[derive(Clone)]
15pub struct Session {
16 inner: Option<Sender<Message>>,
17 closed: Arc<AtomicBool>,
18}
19
/// Error returned when an operation is attempted on a session that is closed.
#[derive(Debug)]
pub struct Closed;
23
24impl std::fmt::Display for Closed {
25 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
26 write!(f, "Session is closed")
27 }
28}
29
30impl std::error::Error for Closed {}
31
32impl Session {
33 pub(super) fn new(inner: Sender<Message>) -> Self {
34 Session {
35 inner: Some(inner),
36 closed: Arc::new(AtomicBool::new(false)),
37 }
38 }
39
40 fn pre_check(&mut self) {
41 if self.closed.load(Ordering::Relaxed) {
42 self.inner.take();
43 }
44 }
45
46 pub async fn text(&mut self, msg: impl Into<ByteString>) -> Result<(), Closed> {
54 self.pre_check();
55 if let Some(inner) = self.inner.as_mut() {
56 inner
57 .send(Message::Text(msg.into()))
58 .await
59 .map_err(|_| Closed)
60 } else {
61 Err(Closed)
62 }
63 }
64
65 pub async fn binary(&mut self, msg: impl Into<Bytes>) -> Result<(), Closed> {
73 self.pre_check();
74 if let Some(inner) = self.inner.as_mut() {
75 inner
76 .send(Message::Binary(msg.into()))
77 .await
78 .map_err(|_| Closed)
79 } else {
80 Err(Closed)
81 }
82 }
83
84 pub async fn ping(&mut self, msg: &[u8]) -> Result<(), Closed> {
95 self.pre_check();
96 if let Some(inner) = self.inner.as_mut() {
97 inner
98 .send(Message::Ping(Bytes::copy_from_slice(msg)))
99 .await
100 .map_err(|_| Closed)
101 } else {
102 Err(Closed)
103 }
104 }
105
106 pub async fn pong(&mut self, msg: &[u8]) -> Result<(), Closed> {
116 self.pre_check();
117 if let Some(inner) = self.inner.as_mut() {
118 inner
119 .send(Message::Pong(Bytes::copy_from_slice(msg)))
120 .await
121 .map_err(|_| Closed)
122 } else {
123 Err(Closed)
124 }
125 }
126
127 pub async fn close(mut self, reason: Option<CloseReason>) -> Result<(), Closed> {
135 self.pre_check();
136 if let Some(inner) = self.inner.take() {
137 self.closed.store(true, Ordering::Relaxed);
138 inner.send(Message::Close(reason)).await.map_err(|_| Closed)
139 } else {
140 Err(Closed)
141 }
142 }
143}