supabase_realtime_rs/client/
connection.rs

1use crate::types::{error::Result, message::RealtimeMessage};
2use futures::SinkExt;
3use futures::stream::SplitSink;
4use serde_json;
5use std::sync::Arc;
6use tokio::net::TcpStream;
7use tokio::sync::RwLock;
8use tokio_tungstenite::{MaybeTlsStream, WebSocketStream, tungstenite::Message};
9
10#[derive(Debug, Clone, Copy, PartialEq)]
11pub enum ConnectionState {
12    Closed,
13    Connecting,
14    Open,
15    Closing,
16}
17
18/// Type alias for WebSocket writer to reduce complexity
19type WebSocketWriter = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
20
21pub struct ConnectionManager {
22    ws_write: Arc<RwLock<Option<WebSocketWriter>>>,
23    state: Arc<RwLock<ConnectionState>>,
24}
25
26impl ConnectionManager {
27    pub fn new() -> Self {
28        Self {
29            ws_write: Arc::new(RwLock::new(None)),
30            state: Arc::new(RwLock::new(ConnectionState::Closed)),
31        }
32    }
33
34    /// Sets the WebSocket write sink (called after successful connection)
35    pub async fn set_writer(&self, writer: WebSocketWriter) {
36        let mut ws = self.ws_write.write().await;
37        *ws = Some(writer);
38    }
39
40    /// Gets the current connection state
41    pub async fn state(&self) -> ConnectionState {
42        *self.state.read().await
43    }
44
45    /// Sets the connection state
46    pub async fn set_state(&self, new_state: ConnectionState) {
47        let mut state = self.state.write().await;
48        *state = new_state;
49    }
50
51    /// Checks if currently connected
52    pub async fn is_connected(&self) -> bool {
53        *self.state.read().await == ConnectionState::Open
54    }
55
56    /// Sends a message through the WebSocket connection
57    pub async fn send_message(&self, msg: RealtimeMessage) -> Result<()> {
58        let json = serde_json::to_string(&msg)?;
59        let message = Message::Text(json.into());
60
61        let mut ws_guard = self.ws_write.write().await;
62        if let Some(ws) = ws_guard.as_mut() {
63            ws.send(message).await?;
64        }
65
66        Ok(())
67    }
68
69    /// Closes the WebSocket connection gracefully
70    pub async fn close(&self) -> Result<()> {
71        self.set_state(ConnectionState::Closing).await;
72
73        let mut ws_guard = self.ws_write.write().await;
74        if let Some(ws) = ws_guard.as_mut() {
75            ws.close().await?;
76        }
77        *ws_guard = None;
78
79        self.set_state(ConnectionState::Closed).await;
80
81        Ok(())
82    }
83
84    /// Clears the writer (used during disconnect)
85    pub async fn clear_writer(&self) {
86        let mut ws = self.ws_write.write().await;
87        *ws = None;
88    }
89}
90
91impl Default for ConnectionManager {
92    fn default() -> Self {
93        Self::new()
94    }
95}