supabase_realtime_rs/client/
connection.rs1use crate::types::{error::Result, message::RealtimeMessage};
2use futures::SinkExt;
3use futures::stream::SplitSink;
4use serde_json;
5use std::sync::Arc;
6use tokio::net::TcpStream;
7use tokio::sync::RwLock;
8use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, tungstenite::Message};
9
/// Lifecycle state recorded for the realtime WebSocket connection.
///
/// The value is plain data: it is `Copy`, comparable, and hashable so it can
/// be used as a map/set key or matched on freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectionState {
    /// No connection. A freshly constructed `ConnectionManager` starts here,
    /// and `ConnectionManager::close` ends here.
    Closed,
    /// Presumably set by the caller while the socket is being established;
    /// nothing in this file assigns it (NOTE(review): confirm against caller).
    Connecting,
    /// The only state in which `ConnectionManager::is_connected` returns `true`.
    Open,
    /// Set by `ConnectionManager::close` while the writer is being closed.
    Closing,
}
17
/// Sink half of a split client `WebSocketStream` over a `MaybeTlsStream<TcpStream>`,
/// accepting tungstenite `Message` values.
type WebSocketWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
20
/// Owns the WebSocket writer and the recorded [`ConnectionState`], each behind
/// its own shared async `RwLock`.
pub struct ConnectionManager {
    /// The installed writer, or `None` when no writer is set.
    ws_write: Arc<RwLock<Option<WebSocketWriter>>>,
    /// The most recently recorded connection state.
    state: Arc<RwLock<ConnectionState>>,
}
25
26impl ConnectionManager {
27 pub fn new() -> Self {
28 Self {
29 ws_write: Arc::new(RwLock::new(None)),
30 state: Arc::new(RwLock::new(ConnectionState::Closed)),
31 }
32 }
33
34 pub async fn set_writer(&self, writer: WebSocketWriter) {
36 let mut ws = self.ws_write.write().await;
37 *ws = Some(writer);
38 }
39
40 pub async fn state(&self) -> ConnectionState {
42 *self.state.read().await
43 }
44
45 pub async fn set_state(&self, new_state: ConnectionState) {
47 let mut state = self.state.write().await;
48 *state = new_state;
49 }
50
51 pub async fn is_connected(&self) -> bool {
53 *self.state.read().await == ConnectionState::Open
54 }
55
56 pub async fn send_message(&self, msg: RealtimeMessage) -> Result<()> {
58 let json = serde_json::to_string(&msg)?;
59 let message = Message::Text(json.into());
60
61 let mut ws_guard = self.ws_write.write().await;
62 if let Some(ws) = ws_guard.as_mut() {
63 ws.send(message).await?;
64 }
65
66 Ok(())
67 }
68
69 pub async fn close(&self) -> Result<()> {
71 self.set_state(ConnectionState::Closing).await;
72
73 let mut ws_guard = self.ws_write.write().await;
74 if let Some(ws) = ws_guard.as_mut() {
75 ws.close().await?;
76 }
77 *ws_guard = None;
78
79 self.set_state(ConnectionState::Closed).await;
80
81 Ok(())
82 }
83
84 pub async fn clear_writer(&self) {
86 let mut ws = self.ws_write.write().await;
87 *ws = None;
88 }
89}
90
91impl Default for ConnectionManager {
92 fn default() -> Self {
93 Self::new()
94 }
95}