supabase_realtime_rs/client/
builder.rs

1use super::{ClientState, ConnectionManager, ConnectionState, RealtimeClient};
2use crate::types::{RealtimeError, Result};
3use std::sync::Arc;
4use tokio::sync::{RwLock, watch};
5
/// Caller-supplied settings for a realtime client.
///
/// The derived `Default` leaves `api_key` empty and every optional field
/// `None`; note that `RealtimeClientBuilder::new` rejects an empty `api_key`,
/// so a default value must have its key filled in before it is usable there.
#[derive(Clone, Debug, Default)]
pub struct RealtimeClientOptions {
    /// API key for the service. Must be non-empty to pass builder validation.
    pub api_key: String,
    /// Optional timeout value.
    // NOTE(review): unit (seconds vs. milliseconds) is not visible here — confirm at the use site.
    pub timeout: Option<u64>,
    /// Optional heartbeat interval.
    // NOTE(review): unit is not visible here — confirm at the use site.
    pub heartbeat_interval: Option<u64>,
    /// Optional access token supplied alongside the API key.
    pub access_token: Option<String>,
}
13
14/// Builder for RealtimeClient that handles initialization
15pub struct RealtimeClientBuilder {
16    endpoint: String,
17    options: RealtimeClientOptions,
18}
19
20impl RealtimeClientBuilder {
21    /// Create a new builder
22    pub fn new(endpoint: impl Into<String>, options: RealtimeClientOptions) -> Result<Self> {
23        let endpoint = endpoint.into();
24
25        // Validate API key is provided
26        if options.api_key.is_empty() {
27            return Err(RealtimeError::Auth("API key is required".to_string()));
28        }
29
30        Ok(Self { endpoint, options })
31    }
32
33    /// Build the client and spawn background tasks
34    pub fn build(self) -> RealtimeClient {
35        let mut client_state = ClientState::new();
36
37        // Initialize state watcher channel
38        let (state_tx, state_rx) = watch::channel((ConnectionState::Closed, false));
39        client_state.state_change_tx = Some(state_tx);
40
41        let client = RealtimeClient {
42            endpoint: self.endpoint,
43            options: self.options,
44            connection: Arc::new(ConnectionManager::new()),
45            state: Arc::new(RwLock::new(client_state)),
46        };
47
48        // Spawn reconnection watcher task
49        let client_for_watcher = client.clone();
50        tokio::spawn(async move {
51            let mut rx = state_rx;
52
53            while rx.changed().await.is_ok() {
54                let (state, was_manual) = *rx.borrow_and_update();
55
56                // Reconnect if closed/disconnected AND not manual
57                if matches!(state, ConnectionState::Closed) && !was_manual {
58                    tracing::info!("State watcher detected disconnect, attempting reconnection...");
59
60                    if let Err(e) = client_for_watcher.try_reconnect().await {
61                        tracing::error!("Reconnection watcher failed: {}", e);
62                    }
63                }
64            }
65            tracing::info!("Reconnection watcher task finished");
66        });
67
68        client
69    }
70}