supabase_realtime_rs/client/
core.rs

1use super::{
2    ClientState, ConnectionManager, ConnectionState, RealtimeClientBuilder, RealtimeClientOptions,
3};
4use crate::RealtimeChannel;
5use crate::infrastructure::{HeartbeatManager, Timer};
6use crate::messaging::MessageRouter;
7use crate::types::{RealtimeError, RealtimeMessage, Result};
8use crate::websocket::WebSocketFactory;
9use futures::stream::StreamExt;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use url::Url;
13
14/// The main entry point for interacting with Supabase Realtime.
15///
16/// `RealtimeClient` manages the WebSocket connection to Supabase Realtime servers,
17/// handles automatic reconnection with exponential backoff, and provides channel
18/// creation for real-time subscriptions.
19///
20/// # Example
21///
22/// ```no_run
23/// use supabase_realtime_rs::{RealtimeClient, RealtimeClientOptions};
24///
25/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
26/// let client = RealtimeClient::new(
27///     "wss://your-project.supabase.co/realtime/v1",
28///     RealtimeClientOptions {
29///         api_key: "your-anon-key".to_string(),
30///         ..Default::default()
31///     }
32/// )?;
33///
34/// client.connect().await?;
35/// // Use the client...
36/// client.disconnect().await?;
37/// # Ok(())
38/// # }
39/// ```
40#[derive(Clone)]
41pub struct RealtimeClient {
42    pub(crate) endpoint: String,
43    pub(crate) options: RealtimeClientOptions,
44
45    // Connection manager
46    pub(crate) connection: Arc<ConnectionManager>,
47
48    // Consolidated mutable state
49    pub(crate) state: Arc<RwLock<ClientState>>,
50}
51
52impl RealtimeClient {
53    /// Creates a new RealtimeClient instance.
54    ///
55    /// This initializes the client but does not establish a connection. You must call
56    /// [`connect()`](Self::connect) to establish the WebSocket connection.
57    ///
58    /// # Arguments
59    ///
60    /// * `endpoint` - The WebSocket endpoint URL (e.g., `wss://your-project.supabase.co/realtime/v1`)
61    /// * `options` - Configuration options including API key and optional settings
62    ///
63    /// # Returns
64    ///
65    /// Returns `Ok(RealtimeClient)` if the endpoint is valid, or an error if the URL is malformed.
66    ///
67    /// # Errors
68    ///
69    /// Returns [`RealtimeError::UrlParse`](crate::types::RealtimeError::UrlParse) if the endpoint URL cannot be parsed.
70    ///
71    /// # Example
72    ///
73    /// ```no_run
74    /// use supabase_realtime_rs::{RealtimeClient, RealtimeClientOptions};
75    ///
76    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
77    /// let client = RealtimeClient::new(
78    ///     "wss://your-project.supabase.co/realtime/v1",
79    ///     RealtimeClientOptions {
80    ///         api_key: "your-anon-key".to_string(),
81    ///         ..Default::default()
82    ///     }
83    /// )?;
84    /// # Ok(())
85    /// # }
86    /// ```
87    pub fn new(endpoint: impl Into<String>, options: RealtimeClientOptions) -> Result<Self> {
88        RealtimeClientBuilder::new(endpoint, options).map(|builder| builder.build())
89    }
90
91    /// Set connection state and notify watchers
92    async fn set_state(&self, new_state: ConnectionState) {
93        self.connection.set_state(new_state).await;
94
95        let state = self.state.read().await;
96        state.notify_state_change(new_state, state.was_manual_disconnect);
97    }
98
99    /// Set manual disconnect flag and notify watchers
100    async fn set_manual_disconnect(&self, manual: bool) {
101        let mut state = self.state.write().await;
102        state.was_manual_disconnect = manual;
103
104        let conn_state = self.connection.state().await;
105        state.notify_state_change(conn_state, manual);
106    }
107
108    pub async fn resubscribe_all_channels(&self) -> Result<()> {
109        let channels = self.state.read().await.channels.clone();
110        for channel in channels.iter() {
111            if channel.was_joined().await {
112                channel.subscribe().await?;
113            }
114        }
115        Ok(())
116    }
117
118    pub async fn try_reconnect(&self) -> Result<()> {
119        if self.state.read().await.was_manual_disconnect {
120            tracing::info!("Manual disconnect detected, will not attempt to reconnect");
121            return Ok(());
122        }
123
124        let mut timer = Timer::default();
125        loop {
126            {
127                let state = self.connection.state().await;
128                if state == ConnectionState::Open || state == ConnectionState::Connecting {
129                    tracing::info!(
130                        "Already connected or connecting, stopping reconnection attempts"
131                    );
132                    break;
133                }
134            }
135
136            tracing::info!("Attempting to reconnect...");
137            match self.connect().await {
138                Ok(_) => {
139                    tracing::info!("Reconnected successfully");
140                    self.resubscribe_all_channels().await?;
141                    break;
142                }
143                Err(e) => {
144                    tracing::error!("Reconnection attempt failed: {}", e);
145                    timer.schedule_timeout().await;
146                }
147            }
148        }
149        Ok(())
150    }
151    /// Establishes a WebSocket connection to the Supabase Realtime server.
152    ///
153    /// This method opens the WebSocket connection, starts the heartbeat mechanism,
154    /// and spawns background tasks for reading messages and maintaining the connection.
155    /// If already connected, this method returns immediately without error.
156    ///
157    /// After connecting successfully, the client will automatically:
158    /// - Send periodic heartbeat messages
159    /// - Attempt reconnection if the connection drops (unless manually disconnected)
160    /// - Route incoming messages to subscribed channels
161    ///
162    /// # Errors
163    ///
164    /// Returns an error if:
165    /// - The WebSocket handshake fails
166    /// - The endpoint URL is invalid
167    /// - TLS/SSL negotiation fails
168    ///
169    /// # Example
170    ///
171    /// ```no_run
172    /// use supabase_realtime_rs::{RealtimeClient, RealtimeClientOptions};
173    ///
174    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
175    /// let client = RealtimeClient::new(
176    ///     "wss://your-project.supabase.co/realtime/v1",
177    ///     RealtimeClientOptions {
178    ///         api_key: "your-anon-key".to_string(),
179    ///         ..Default::default()
180    ///     }
181    /// )?;
182    ///
183    /// // Establish connection
184    /// client.connect().await?;
185    ///
186    /// // Now you can create channels and subscribe
187    /// # Ok(())
188    /// # }
189    /// ```
190    pub async fn connect(&self) -> Result<()> {
191        {
192            let state = self.connection.state().await;
193            if state == ConnectionState::Open || state == ConnectionState::Connecting {
194                return Ok(());
195            }
196        }
197        self.set_state(ConnectionState::Connecting).await;
198
199        // Build WebSocket URL with query parameters
200        let url = self.build_endpoint_url()?;
201        tracing::info!("Connecting to {}", &self.endpoint);
202
203        // Create WebSocket connection
204        let ws_stream = WebSocketFactory::create(url).await?;
205        let (write_half, mut read_half) = ws_stream.split();
206
207        // Give write half to ConnectionManager
208        self.connection.set_writer(write_half).await;
209
210        // Create message router with Arc to state
211        let state_for_router = Arc::clone(&self.state);
212        let router = MessageRouter::new_with_state(state_for_router);
213
214        // Spawn read task with router using TaskManager
215        let self_cloned = self.clone();
216        {
217            let mut state = self.state.write().await;
218            state.task_manager.spawn(async move {
219                tracing::info!("Starting read task");
220                while let Some(msg_result) = read_half.next().await {
221                    match msg_result {
222                        Ok(msg) => {
223                            use tokio_tungstenite::tungstenite::Message;
224
225                            match msg {
226                                Message::Text(text) => {
227                                    tracing::debug!("Received text message: {}", text);
228                                    match serde_json::from_str::<RealtimeMessage>(&text) {
229                                        Ok(realtime_msg) => {
230                                            tracing::debug!(
231                                                "Parsed message: topic={}, event={}, payload={}",
232                                                realtime_msg.topic,
233                                                realtime_msg.event.as_str(),
234                                                serde_json::to_string(&realtime_msg.payload)
235                                                    .unwrap_or_default()
236                                            );
237                                            router.route(realtime_msg).await;
238                                        }
239                                        Err(e) => {
240                                            tracing::error!(
241                                                "Failed to parse message: {} - Raw: {}",
242                                                e,
243                                                text
244                                            );
245                                        }
246                                    }
247                                }
248                                Message::Close(frame) => {
249                                    if let Some(close_frame) = frame {
250                                        tracing::error!(
251                                            "Server closed connection: code={:?}, reason='{}'",
252                                            close_frame.code,
253                                            close_frame.reason
254                                        );
255                                    } else {
256                                        tracing::warn!(
257                                            "Server closed connection without close frame"
258                                        );
259                                    }
260                                    self_cloned.set_state(ConnectionState::Closed).await;
261                                    break;
262                                }
263                                Message::Ping(data) => {
264                                    tracing::debug!("Received ping ({} bytes)", data.len());
265                                }
266                                Message::Pong(data) => {
267                                    tracing::debug!("Received pong ({} bytes)", data.len());
268                                }
269                                Message::Binary(data) => {
270                                    tracing::warn!(
271                                        "Received unexpected binary message ({} bytes)",
272                                        data.len()
273                                    );
274                                }
275                                Message::Frame(_) => {
276                                    tracing::debug!("Received raw frame (internal)");
277                                }
278                            }
279                        }
280                        Err(e) => {
281                            tracing::error!("WebSocket read error: {}", e);
282                            self_cloned.set_state(ConnectionState::Closed).await;
283                            break;
284                        }
285                    }
286                }
287                tracing::info!("Read task finished");
288            });
289        }
290
291        // Spawn heartbeat task using HeartbeatManager
292        let heartbeat_interval = self.options.heartbeat_interval.unwrap_or(25_000);
293
294        let heartbeat_manager =
295            HeartbeatManager::new(Arc::downgrade(&self.connection), Arc::clone(&self.state))
296                .with_interval(std::time::Duration::from_millis(heartbeat_interval));
297
298        heartbeat_manager.spawn_on(&self.state).await;
299
300        self.set_manual_disconnect(false).await;
301        self.set_state(ConnectionState::Open).await;
302
303        tracing::info!("Connected to WebSocket server");
304        Ok(())
305    }
306
307    /// Creates or retrieves a channel for real-time subscriptions.
308    ///
309    /// Channels are the primary way to subscribe to real-time events. Each channel is identified
310    /// by a unique topic string. If a channel with the given topic already exists, this method
311    /// returns the existing channel instead of creating a new one.
312    ///
313    /// # Arguments
314    ///
315    /// * `topic` - The channel topic (e.g., "room:lobby", "public:todos"). The "realtime:" prefix
316    ///   is automatically added.
317    /// * `options` - Configuration options for the channel (broadcast settings, presence key, etc.)
318    ///
319    /// # Returns
320    ///
321    /// Returns an `Arc<RealtimeChannel>` that can be used to subscribe to events, send broadcasts,
322    /// and track presence.
323    ///
324    /// # Example
325    ///
326    /// ```no_run
327    /// use supabase_realtime_rs::{RealtimeClient, RealtimeClientOptions, RealtimeChannelOptions};
328    ///
329    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
330    /// # let client = RealtimeClient::new(
331    /// #     "wss://your-project.supabase.co/realtime/v1",
332    /// #     RealtimeClientOptions {
333    /// #         api_key: "your-anon-key".to_string(),
334    /// #         ..Default::default()
335    /// #     }
336    /// # )?;
337    /// # client.connect().await?;
338    /// // Create a channel
339    /// let channel = client.channel("room:lobby", Default::default()).await;
340    ///
341    /// // Subscribe to receive events
342    /// channel.subscribe().await?;
343    /// # Ok(())
344    /// # }
345    /// ```
346    pub async fn channel(
347        &self,
348        topic: &str,
349        options: crate::channel::RealtimeChannelOptions,
350    ) -> Arc<RealtimeChannel> {
351        let full_topic = format!("realtime:{}", topic);
352
353        let state = self.state.read().await;
354        for existing_channel in state.channels.iter() {
355            if existing_channel.topic() == full_topic {
356                return Arc::clone(existing_channel);
357            }
358        }
359        drop(state);
360
361        let new_channel = Arc::new(RealtimeChannel::new(
362            full_topic,
363            Arc::new(self.clone()),
364            options,
365        ));
366        self.state
367            .write()
368            .await
369            .channels
370            .push(Arc::clone(&new_channel));
371
372        new_channel
373    }
374
375    /// Gracefully disconnects from the WebSocket server.
376    ///
377    /// This method closes the WebSocket connection, aborts all background tasks (heartbeat,
378    /// message reading), and marks the disconnect as manual. When disconnected manually, the
379    /// client will NOT attempt automatic reconnection.
380    ///
381    /// To reconnect after a manual disconnect, call [`connect()`](Self::connect) again.
382    ///
383    /// # Errors
384    ///
385    /// Returns an error if the WebSocket close handshake fails (rare).
386    ///
387    /// # Example
388    ///
389    /// ```no_run
390    /// use supabase_realtime_rs::{RealtimeClient, RealtimeClientOptions};
391    ///
392    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
393    /// # let client = RealtimeClient::new(
394    /// #     "wss://your-project.supabase.co/realtime/v1",
395    /// #     RealtimeClientOptions {
396    /// #         api_key: "your-anon-key".to_string(),
397    /// #         ..Default::default()
398    /// #     }
399    /// # )?;
400    /// # client.connect().await?;
401    /// // When done, disconnect gracefully
402    /// client.disconnect().await?;
403    /// # Ok(())
404    /// # }
405    /// ```
406    pub async fn disconnect(&self) -> Result<()> {
407        {
408            let state = self.connection.state().await;
409            if state == ConnectionState::Closed {
410                return Ok(());
411            }
412        }
413
414        self.set_manual_disconnect(true).await;
415        tracing::info!("Disconnecting from WebSocket server");
416
417        // Abort all tasks via TaskManager
418        {
419            let mut state = self.state.write().await;
420            state.task_manager.abort_all();
421            state.pending_heartbeat_ref = None;
422        }
423
424        // Close connection via ConnectionManager
425        self.connection.close().await?;
426
427        tracing::info!("Disconnected from WebSocket server");
428        Ok(())
429    }
430
431    /// Checks whether the client is currently connected to the server.
432    ///
433    /// # Returns
434    ///
435    /// Returns `true` if the WebSocket connection is open, `false` otherwise.
436    ///
437    /// # Example
438    ///
439    /// ```no_run
440    /// use supabase_realtime_rs::{RealtimeClient, RealtimeClientOptions};
441    ///
442    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
443    /// # let client = RealtimeClient::new(
444    /// #     "wss://your-project.supabase.co/realtime/v1",
445    /// #     RealtimeClientOptions {
446    /// #         api_key: "your-anon-key".to_string(),
447    /// #         ..Default::default()
448    /// #     }
449    /// # )?;
450    /// if !client.is_connected().await {
451    ///     client.connect().await?;
452    /// }
453    /// # Ok(())
454    /// # }
455    /// ```
456    pub async fn is_connected(&self) -> bool {
457        self.connection.is_connected().await
458    }
459
460    /// Build the WebSocket endpoint URL with query parameters
461    fn build_endpoint_url(&self) -> Result<String> {
462        let mut url = Url::parse(&self.endpoint)?;
463
464        // Add required query parameters
465        url.query_pairs_mut()
466            .append_pair("apikey", &self.options.api_key);
467
468        Ok(url.to_string())
469    }
470
471    /// Generate next message reference
472    pub async fn make_ref(&self) -> String {
473        let mut state = self.state.write().await;
474        state.make_ref()
475    }
476
477    /// Push a message to the server
478    pub async fn push(&self, message: RealtimeMessage) -> Result<()> {
479        if !self.is_connected().await {
480            return Err(RealtimeError::NotConnected);
481        }
482
483        self.connection.send_message(message).await?;
484        Ok(())
485    }
486
487    /// Get HTTP endpoint URL (for broadcasts)
488    pub fn http_endpoint(&self) -> String {
489        crate::infrastructure::ws_to_http_endpoint(&self.endpoint)
490    }
491
492    /// Get API key
493    pub fn api_key(&self) -> &str {
494        &self.options.api_key
495    }
496
497    /// Get access token
498    pub fn access_token(&self) -> Option<&str> {
499        self.options.access_token.as_deref()
500    }
501}