// supabase_realtime_rs/client/builder.rs
use super::{ClientState, ConnectionManager, ConnectionState, RealtimeClient};
use crate::types::{RealtimeError, Result};
use std::sync::Arc;
use tokio::sync::{RwLock, watch};
5
/// Configuration supplied when constructing a realtime client.
///
/// `Debug` is implemented by hand so that the credentials held here
/// (`api_key`, `access_token`) are never written verbatim to logs.
#[derive(Clone, Default)]
pub struct RealtimeClientOptions {
    /// API key for the realtime endpoint. `RealtimeClientBuilder::new`
    /// rejects an empty value.
    pub api_key: String,
    /// Optional timeout.
    /// NOTE(review): the unit is not visible in this file — confirm.
    pub timeout: Option<u64>,
    /// Optional heartbeat interval.
    /// NOTE(review): the unit is not visible in this file — confirm.
    pub heartbeat_interval: Option<u64>,
    /// Optional access token.
    pub access_token: Option<String>,
}

impl std::fmt::Debug for RealtimeClientOptions {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep "is it set?" visible for diagnostics without revealing the secret itself.
        let api_key = if self.api_key.is_empty() {
            ""
        } else {
            "<redacted>"
        };
        let access_token = self.access_token.as_ref().map(|_| "<redacted>");

        f.debug_struct("RealtimeClientOptions")
            .field("api_key", &api_key)
            .field("timeout", &self.timeout)
            .field("heartbeat_interval", &self.heartbeat_interval)
            .field("access_token", &access_token)
            .finish()
    }
}
13
14pub struct RealtimeClientBuilder {
16 endpoint: String,
17 options: RealtimeClientOptions,
18}
19
20impl RealtimeClientBuilder {
21 pub fn new(endpoint: impl Into<String>, options: RealtimeClientOptions) -> Result<Self> {
23 let endpoint = endpoint.into();
24
25 if options.api_key.is_empty() {
27 return Err(RealtimeError::Auth("API key is required".to_string()));
28 }
29
30 Ok(Self { endpoint, options })
31 }
32
33 pub fn build(self) -> RealtimeClient {
35 let mut client_state = ClientState::new();
36
37 let (state_tx, state_rx) = watch::channel((ConnectionState::Closed, false));
39 client_state.state_change_tx = Some(state_tx);
40
41 let client = RealtimeClient {
42 endpoint: self.endpoint,
43 options: self.options,
44 connection: Arc::new(ConnectionManager::new()),
45 state: Arc::new(RwLock::new(client_state)),
46 };
47
48 let client_for_watcher = client.clone();
50 tokio::spawn(async move {
51 let mut rx = state_rx;
52
53 while rx.changed().await.is_ok() {
54 let (state, was_manual) = *rx.borrow_and_update();
55
56 if matches!(state, ConnectionState::Closed) && !was_manual {
58 tracing::info!("State watcher detected disconnect, attempting reconnection...");
59
60 if let Err(e) = client_for_watcher.try_reconnect().await {
61 tracing::error!("Reconnection watcher failed: {}", e);
62 }
63 }
64 }
65 tracing::info!("Reconnection watcher task finished");
66 });
67
68 client
69 }
70}