supabase_realtime_rs/client/core.rs
1use super::{
2 ClientState, ConnectionManager, ConnectionState, RealtimeClientBuilder, RealtimeClientOptions,
3};
4use crate::RealtimeChannel;
5use crate::infrastructure::{HeartbeatManager, Timer};
6use crate::messaging::MessageRouter;
7use crate::types::{RealtimeError, RealtimeMessage, Result};
8use crate::websocket::WebSocketFactory;
9use futures::stream::StreamExt;
10use std::sync::Arc;
11use tokio::sync::RwLock;
12use url::Url;
13
14/// The main entry point for interacting with Supabase Realtime.
15///
16/// `RealtimeClient` manages the WebSocket connection to Supabase Realtime servers,
17/// handles automatic reconnection with exponential backoff, and provides channel
18/// creation for real-time subscriptions.
19///
20/// # Example
21///
22/// ```no_run
23/// use supabase_realtime_rs::{RealtimeClient, RealtimeClientOptions};
24///
25/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
26/// let client = RealtimeClient::new(
27/// "wss://your-project.supabase.co/realtime/v1",
28/// RealtimeClientOptions {
29/// api_key: "your-anon-key".to_string(),
30/// ..Default::default()
31/// }
32/// )?;
33///
34/// client.connect().await?;
35/// // Use the client...
36/// client.disconnect().await?;
37/// # Ok(())
38/// # }
39/// ```
40#[derive(Clone)]
41pub struct RealtimeClient {
42 pub(crate) endpoint: String,
43 pub(crate) options: RealtimeClientOptions,
44
45 // Connection manager
46 pub(crate) connection: Arc<ConnectionManager>,
47
48 // Consolidated mutable state
49 pub(crate) state: Arc<RwLock<ClientState>>,
50}
51
52impl RealtimeClient {
53 /// Creates a new RealtimeClient instance.
54 ///
55 /// This initializes the client but does not establish a connection. You must call
56 /// [`connect()`](Self::connect) to establish the WebSocket connection.
57 ///
58 /// # Arguments
59 ///
60 /// * `endpoint` - The WebSocket endpoint URL (e.g., `wss://your-project.supabase.co/realtime/v1`)
61 /// * `options` - Configuration options including API key and optional settings
62 ///
63 /// # Returns
64 ///
65 /// Returns `Ok(RealtimeClient)` if the endpoint is valid, or an error if the URL is malformed.
66 ///
67 /// # Errors
68 ///
69 /// Returns [`RealtimeError::UrlParse`](crate::types::RealtimeError::UrlParse) if the endpoint URL cannot be parsed.
70 ///
71 /// # Example
72 ///
73 /// ```no_run
74 /// use supabase_realtime_rs::{RealtimeClient, RealtimeClientOptions};
75 ///
76 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
77 /// let client = RealtimeClient::new(
78 /// "wss://your-project.supabase.co/realtime/v1",
79 /// RealtimeClientOptions {
80 /// api_key: "your-anon-key".to_string(),
81 /// ..Default::default()
82 /// }
83 /// )?;
84 /// # Ok(())
85 /// # }
86 /// ```
87 pub fn new(endpoint: impl Into<String>, options: RealtimeClientOptions) -> Result<Self> {
88 RealtimeClientBuilder::new(endpoint, options).map(|builder| builder.build())
89 }
90
91 /// Set connection state and notify watchers
92 async fn set_state(&self, new_state: ConnectionState) {
93 self.connection.set_state(new_state).await;
94
95 let state = self.state.read().await;
96 state.notify_state_change(new_state, state.was_manual_disconnect);
97 }
98
99 /// Set manual disconnect flag and notify watchers
100 async fn set_manual_disconnect(&self, manual: bool) {
101 let mut state = self.state.write().await;
102 state.was_manual_disconnect = manual;
103
104 let conn_state = self.connection.state().await;
105 state.notify_state_change(conn_state, manual);
106 }
107
108 pub async fn resubscribe_all_channels(&self) -> Result<()> {
109 let channels = self.state.read().await.channels.clone();
110 for channel in channels.iter() {
111 if channel.was_joined().await {
112 channel.subscribe().await?;
113 }
114 }
115 Ok(())
116 }
117
118 pub async fn try_reconnect(&self) -> Result<()> {
119 if self.state.read().await.was_manual_disconnect {
120 tracing::info!("Manual disconnect detected, will not attempt to reconnect");
121 return Ok(());
122 }
123
124 let mut timer = Timer::default();
125 loop {
126 {
127 let state = self.connection.state().await;
128 if state == ConnectionState::Open || state == ConnectionState::Connecting {
129 tracing::info!(
130 "Already connected or connecting, stopping reconnection attempts"
131 );
132 break;
133 }
134 }
135
136 tracing::info!("Attempting to reconnect...");
137 match self.connect().await {
138 Ok(_) => {
139 tracing::info!("Reconnected successfully");
140 self.resubscribe_all_channels().await?;
141 break;
142 }
143 Err(e) => {
144 tracing::error!("Reconnection attempt failed: {}", e);
145 timer.schedule_timeout().await;
146 }
147 }
148 }
149 Ok(())
150 }
151 /// Establishes a WebSocket connection to the Supabase Realtime server.
152 ///
153 /// This method opens the WebSocket connection, starts the heartbeat mechanism,
154 /// and spawns background tasks for reading messages and maintaining the connection.
155 /// If already connected, this method returns immediately without error.
156 ///
157 /// After connecting successfully, the client will automatically:
158 /// - Send periodic heartbeat messages
159 /// - Attempt reconnection if the connection drops (unless manually disconnected)
160 /// - Route incoming messages to subscribed channels
161 ///
162 /// # Errors
163 ///
164 /// Returns an error if:
165 /// - The WebSocket handshake fails
166 /// - The endpoint URL is invalid
167 /// - TLS/SSL negotiation fails
168 ///
169 /// # Example
170 ///
171 /// ```no_run
172 /// use supabase_realtime_rs::{RealtimeClient, RealtimeClientOptions};
173 ///
174 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
175 /// let client = RealtimeClient::new(
176 /// "wss://your-project.supabase.co/realtime/v1",
177 /// RealtimeClientOptions {
178 /// api_key: "your-anon-key".to_string(),
179 /// ..Default::default()
180 /// }
181 /// )?;
182 ///
183 /// // Establish connection
184 /// client.connect().await?;
185 ///
186 /// // Now you can create channels and subscribe
187 /// # Ok(())
188 /// # }
189 /// ```
190 pub async fn connect(&self) -> Result<()> {
191 {
192 let state = self.connection.state().await;
193 if state == ConnectionState::Open || state == ConnectionState::Connecting {
194 return Ok(());
195 }
196 }
197 self.set_state(ConnectionState::Connecting).await;
198
199 // Build WebSocket URL with query parameters
200 let url = self.build_endpoint_url()?;
201 tracing::info!("Connecting to {}", &self.endpoint);
202
203 // Create WebSocket connection
204 let ws_stream = WebSocketFactory::create(url).await?;
205 let (write_half, mut read_half) = ws_stream.split();
206
207 // Give write half to ConnectionManager
208 self.connection.set_writer(write_half).await;
209
210 // Create message router with Arc to state
211 let state_for_router = Arc::clone(&self.state);
212 let router = MessageRouter::new_with_state(state_for_router);
213
214 // Spawn read task with router using TaskManager
215 let self_cloned = self.clone();
216 {
217 let mut state = self.state.write().await;
218 state.task_manager.spawn(async move {
219 tracing::info!("Starting read task");
220 while let Some(msg_result) = read_half.next().await {
221 match msg_result {
222 Ok(msg) => {
223 use tokio_tungstenite::tungstenite::Message;
224
225 match msg {
226 Message::Text(text) => {
227 tracing::debug!("Received text message: {}", text);
228 match serde_json::from_str::<RealtimeMessage>(&text) {
229 Ok(realtime_msg) => {
230 tracing::debug!(
231 "Parsed message: topic={}, event={}, payload={}",
232 realtime_msg.topic,
233 realtime_msg.event.as_str(),
234 serde_json::to_string(&realtime_msg.payload)
235 .unwrap_or_default()
236 );
237 router.route(realtime_msg).await;
238 }
239 Err(e) => {
240 tracing::error!(
241 "Failed to parse message: {} - Raw: {}",
242 e,
243 text
244 );
245 }
246 }
247 }
248 Message::Close(frame) => {
249 if let Some(close_frame) = frame {
250 tracing::error!(
251 "Server closed connection: code={:?}, reason='{}'",
252 close_frame.code,
253 close_frame.reason
254 );
255 } else {
256 tracing::warn!(
257 "Server closed connection without close frame"
258 );
259 }
260 self_cloned.set_state(ConnectionState::Closed).await;
261 break;
262 }
263 Message::Ping(data) => {
264 tracing::debug!("Received ping ({} bytes)", data.len());
265 }
266 Message::Pong(data) => {
267 tracing::debug!("Received pong ({} bytes)", data.len());
268 }
269 Message::Binary(data) => {
270 tracing::warn!(
271 "Received unexpected binary message ({} bytes)",
272 data.len()
273 );
274 }
275 Message::Frame(_) => {
276 tracing::debug!("Received raw frame (internal)");
277 }
278 }
279 }
280 Err(e) => {
281 tracing::error!("WebSocket read error: {}", e);
282 self_cloned.set_state(ConnectionState::Closed).await;
283 break;
284 }
285 }
286 }
287 tracing::info!("Read task finished");
288 });
289 }
290
291 // Spawn heartbeat task using HeartbeatManager
292 let heartbeat_interval = self.options.heartbeat_interval.unwrap_or(25_000);
293
294 let heartbeat_manager =
295 HeartbeatManager::new(Arc::downgrade(&self.connection), Arc::clone(&self.state))
296 .with_interval(std::time::Duration::from_millis(heartbeat_interval));
297
298 heartbeat_manager.spawn_on(&self.state).await;
299
300 self.set_manual_disconnect(false).await;
301 self.set_state(ConnectionState::Open).await;
302
303 tracing::info!("Connected to WebSocket server");
304 Ok(())
305 }
306
307 /// Creates or retrieves a channel for real-time subscriptions.
308 ///
309 /// Channels are the primary way to subscribe to real-time events. Each channel is identified
310 /// by a unique topic string. If a channel with the given topic already exists, this method
311 /// returns the existing channel instead of creating a new one.
312 ///
313 /// # Arguments
314 ///
315 /// * `topic` - The channel topic (e.g., "room:lobby", "public:todos"). The "realtime:" prefix
316 /// is automatically added.
317 /// * `options` - Configuration options for the channel (broadcast settings, presence key, etc.)
318 ///
319 /// # Returns
320 ///
321 /// Returns an `Arc<RealtimeChannel>` that can be used to subscribe to events, send broadcasts,
322 /// and track presence.
323 ///
324 /// # Example
325 ///
326 /// ```no_run
327 /// use supabase_realtime_rs::{RealtimeClient, RealtimeClientOptions, RealtimeChannelOptions};
328 ///
329 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
330 /// # let client = RealtimeClient::new(
331 /// # "wss://your-project.supabase.co/realtime/v1",
332 /// # RealtimeClientOptions {
333 /// # api_key: "your-anon-key".to_string(),
334 /// # ..Default::default()
335 /// # }
336 /// # )?;
337 /// # client.connect().await?;
338 /// // Create a channel
339 /// let channel = client.channel("room:lobby", Default::default()).await;
340 ///
341 /// // Subscribe to receive events
342 /// channel.subscribe().await?;
343 /// # Ok(())
344 /// # }
345 /// ```
346 pub async fn channel(
347 &self,
348 topic: &str,
349 options: crate::channel::RealtimeChannelOptions,
350 ) -> Arc<RealtimeChannel> {
351 let full_topic = format!("realtime:{}", topic);
352
353 let state = self.state.read().await;
354 for existing_channel in state.channels.iter() {
355 if existing_channel.topic() == full_topic {
356 return Arc::clone(existing_channel);
357 }
358 }
359 drop(state);
360
361 let new_channel = Arc::new(RealtimeChannel::new(
362 full_topic,
363 Arc::new(self.clone()),
364 options,
365 ));
366 self.state
367 .write()
368 .await
369 .channels
370 .push(Arc::clone(&new_channel));
371
372 new_channel
373 }
374
375 /// Gracefully disconnects from the WebSocket server.
376 ///
377 /// This method closes the WebSocket connection, aborts all background tasks (heartbeat,
378 /// message reading), and marks the disconnect as manual. When disconnected manually, the
379 /// client will NOT attempt automatic reconnection.
380 ///
381 /// To reconnect after a manual disconnect, call [`connect()`](Self::connect) again.
382 ///
383 /// # Errors
384 ///
385 /// Returns an error if the WebSocket close handshake fails (rare).
386 ///
387 /// # Example
388 ///
389 /// ```no_run
390 /// use supabase_realtime_rs::{RealtimeClient, RealtimeClientOptions};
391 ///
392 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
393 /// # let client = RealtimeClient::new(
394 /// # "wss://your-project.supabase.co/realtime/v1",
395 /// # RealtimeClientOptions {
396 /// # api_key: "your-anon-key".to_string(),
397 /// # ..Default::default()
398 /// # }
399 /// # )?;
400 /// # client.connect().await?;
401 /// // When done, disconnect gracefully
402 /// client.disconnect().await?;
403 /// # Ok(())
404 /// # }
405 /// ```
406 pub async fn disconnect(&self) -> Result<()> {
407 {
408 let state = self.connection.state().await;
409 if state == ConnectionState::Closed {
410 return Ok(());
411 }
412 }
413
414 self.set_manual_disconnect(true).await;
415 tracing::info!("Disconnecting from WebSocket server");
416
417 // Abort all tasks via TaskManager
418 {
419 let mut state = self.state.write().await;
420 state.task_manager.abort_all();
421 state.pending_heartbeat_ref = None;
422 }
423
424 // Close connection via ConnectionManager
425 self.connection.close().await?;
426
427 tracing::info!("Disconnected from WebSocket server");
428 Ok(())
429 }
430
431 /// Checks whether the client is currently connected to the server.
432 ///
433 /// # Returns
434 ///
435 /// Returns `true` if the WebSocket connection is open, `false` otherwise.
436 ///
437 /// # Example
438 ///
439 /// ```no_run
440 /// use supabase_realtime_rs::{RealtimeClient, RealtimeClientOptions};
441 ///
442 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
443 /// # let client = RealtimeClient::new(
444 /// # "wss://your-project.supabase.co/realtime/v1",
445 /// # RealtimeClientOptions {
446 /// # api_key: "your-anon-key".to_string(),
447 /// # ..Default::default()
448 /// # }
449 /// # )?;
450 /// if !client.is_connected().await {
451 /// client.connect().await?;
452 /// }
453 /// # Ok(())
454 /// # }
455 /// ```
456 pub async fn is_connected(&self) -> bool {
457 self.connection.is_connected().await
458 }
459
460 /// Build the WebSocket endpoint URL with query parameters
461 fn build_endpoint_url(&self) -> Result<String> {
462 let mut url = Url::parse(&self.endpoint)?;
463
464 // Add required query parameters
465 url.query_pairs_mut()
466 .append_pair("apikey", &self.options.api_key);
467
468 Ok(url.to_string())
469 }
470
471 /// Generate next message reference
472 pub async fn make_ref(&self) -> String {
473 let mut state = self.state.write().await;
474 state.make_ref()
475 }
476
477 /// Push a message to the server
478 pub async fn push(&self, message: RealtimeMessage) -> Result<()> {
479 if !self.is_connected().await {
480 return Err(RealtimeError::NotConnected);
481 }
482
483 self.connection.send_message(message).await?;
484 Ok(())
485 }
486
487 /// Get HTTP endpoint URL (for broadcasts)
488 pub fn http_endpoint(&self) -> String {
489 crate::infrastructure::ws_to_http_endpoint(&self.endpoint)
490 }
491
492 /// Get API key
493 pub fn api_key(&self) -> &str {
494 &self.options.api_key
495 }
496
497 /// Get access token
498 pub fn access_token(&self) -> Option<&str> {
499 self.options.access_token.as_deref()
500 }
501}