supabase/realtime/
mod.rs

1//! Realtime module for Supabase WebSocket subscriptions
2//!
3//! This module provides cross-platform WebSocket support using proper abstractions:
4//! - Native: Uses tokio-tungstenite with TLS support
5//! - WASM: Uses web-sys WebSocket API through the browser
6//!
7//! ## Usage
8//!
9//! ```rust,no_run
10//! use supabase::Client;
11//! use supabase::realtime::RealtimeEvent;
12//!
13//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
14//! let client = Client::new("your-url", "your-key")?;
15//! let realtime = client.realtime();
16//!
17//! // Connect to realtime
18//! realtime.connect().await?;
19//!
20//! // Subscribe to table changes
21//! let subscription_id = realtime
22//!     .channel("posts")
23//!     .table("posts")
24//!     .event(RealtimeEvent::All)
25//!     .subscribe(|message| {
26//!         println!("Received update: {:?}", message);
27//!     })
28//!     .await?;
29//!
30//! // Later, unsubscribe
31//! realtime.unsubscribe(&subscription_id).await?;
32//! # Ok(())
33//! # }
34//! ```
35
36#[cfg(feature = "realtime")]
37use crate::{
38    async_runtime::{AsyncLock, RuntimeLock},
39    error::{Error, Result},
40    types::SupabaseConfig,
41    websocket::{create_websocket, WebSocketConnection},
42};
43
44#[cfg(feature = "realtime")]
45use serde::{Deserialize, Serialize};
46
47#[cfg(feature = "realtime")]
48use std::{
49    collections::HashMap,
50    sync::{
51        atomic::{AtomicBool, AtomicU64, Ordering},
52        Arc,
53    },
54    time::Duration,
55};
56
57#[cfg(feature = "realtime")]
58use tracing::{debug, error, info, warn};
59
60#[cfg(feature = "realtime")]
61use uuid::Uuid;
62
63/// Type alias for complex connection storage
64#[cfg(feature = "realtime")]
65pub type ConnectionStorage = Arc<RuntimeLock<Vec<Option<Box<dyn WebSocketConnection>>>>>;
66
67/// Realtime client for WebSocket subscriptions
68///
69/// Provides cross-platform realtime subscriptions to Supabase database changes.
70///
71/// # Examples
72///
73/// ## Basic subscription
74/// ```rust,no_run
75/// use supabase::{Client, realtime::RealtimeEvent};
76///
77/// # async fn example() -> supabase::Result<()> {
78/// let client = Client::new("your-url", "your-key")?;
79/// let realtime = client.realtime();
80///
81/// realtime.connect().await?;
82///
83/// let sub_id = realtime
84///     .channel("public-posts")
85///     .table("posts")
86///     .subscribe(|msg| println!("New post: {:?}", msg))
87///     .await?;
88/// # Ok(())
89/// # }
90/// ```
91#[cfg(feature = "realtime")]
92#[derive(Debug, Clone)]
93pub struct Realtime {
94    connection_manager: Arc<ConnectionManager>,
95    message_loop_handle: Arc<AtomicBool>,
96}
97
98/// Connection manager for WebSocket connections
99#[cfg(feature = "realtime")]
100struct ConnectionManager {
101    url: String,
102    api_key: String,
103    connection: RuntimeLock<Option<Box<dyn WebSocketConnection>>>,
104    ref_counter: AtomicU64,
105    subscriptions: RuntimeLock<HashMap<String, Subscription>>,
106    is_message_loop_running: AtomicBool,
107}
108
109#[cfg(feature = "realtime")]
110impl std::fmt::Debug for ConnectionManager {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        f.debug_struct("ConnectionManager")
113            .field("url", &self.url)
114            .field("api_key", &"[REDACTED]")
115            .field("ref_counter", &self.ref_counter)
116            .field("connection", &"<WebSocket connection>")
117            .field("subscriptions", &"<subscriptions>")
118            .finish()
119    }
120}
121
122/// Subscription information
123#[cfg(feature = "realtime")]
124#[derive(Clone)]
125pub struct Subscription {
126    pub id: String,
127    pub topic: String,
128    pub config: SubscriptionConfig,
129    #[cfg(not(target_arch = "wasm32"))]
130    pub callback: Arc<dyn Fn(RealtimeMessage) + Send + Sync>,
131    #[cfg(target_arch = "wasm32")]
132    pub callback: Arc<dyn Fn(RealtimeMessage)>,
133}
134
135#[cfg(feature = "realtime")]
136impl std::fmt::Debug for Subscription {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        f.debug_struct("Subscription")
139            .field("id", &self.id)
140            .field("topic", &self.topic)
141            .field("config", &self.config)
142            .field("callback", &"<callback fn>")
143            .finish()
144    }
145}
146
147/// Configuration for subscriptions
148///
149/// # Examples
150/// ```rust
151/// use supabase::realtime::{SubscriptionConfig, RealtimeEvent, AdvancedFilter, FilterOperator};
152/// use std::collections::HashMap;
153///
154/// let config = SubscriptionConfig {
155///     table: Some("posts".to_string()),
156///     schema: "public".to_string(),
157///     event: Some(RealtimeEvent::Insert),
158///     filter: Some("author_id=eq.123".to_string()),
159///     advanced_filters: vec![
160///         AdvancedFilter {
161///             column: "status".to_string(),
162///             operator: FilterOperator::Equal,
163///             value: serde_json::Value::String("published".to_string()),
164///         }
165///     ],
166///     enable_presence: false,
167///     enable_broadcast: false,
168///     presence_callback: None,
169///     broadcast_callback: None,
170/// };
171/// ```
172#[cfg(feature = "realtime")]
173#[derive(Clone)]
174pub struct SubscriptionConfig {
175    pub table: Option<String>,
176    pub schema: String,
177    pub event: Option<RealtimeEvent>,
178    pub filter: Option<String>,
179    pub advanced_filters: Vec<AdvancedFilter>,
180    pub enable_presence: bool,
181    pub enable_broadcast: bool,
182    #[cfg(not(target_arch = "wasm32"))]
183    pub presence_callback: Option<PresenceCallback>,
184    #[cfg(target_arch = "wasm32")]
185    pub presence_callback: Option<Arc<dyn Fn(PresenceEvent)>>,
186    #[cfg(not(target_arch = "wasm32"))]
187    pub broadcast_callback: Option<BroadcastCallback>,
188    #[cfg(target_arch = "wasm32")]
189    pub broadcast_callback: Option<Arc<dyn Fn(BroadcastMessage)>>,
190}
191
192#[cfg(feature = "realtime")]
193impl std::fmt::Debug for SubscriptionConfig {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        f.debug_struct("SubscriptionConfig")
196            .field("table", &self.table)
197            .field("schema", &self.schema)
198            .field("event", &self.event)
199            .field("filter", &self.filter)
200            .field("advanced_filters", &self.advanced_filters)
201            .field("enable_presence", &self.enable_presence)
202            .field("enable_broadcast", &self.enable_broadcast)
203            .field("presence_callback", &"<callback fn>")
204            .field("broadcast_callback", &"<callback fn>")
205            .finish()
206    }
207}
208
209#[cfg(feature = "realtime")]
210impl Default for SubscriptionConfig {
211    fn default() -> Self {
212        Self {
213            table: None,
214            schema: "public".to_string(),
215            event: None,
216            filter: None,
217            advanced_filters: Vec::new(),
218            enable_presence: false,
219            enable_broadcast: false,
220            presence_callback: None,
221            broadcast_callback: None,
222        }
223    }
224}
225
226/// Realtime event types for filtering subscriptions
227///
228/// # Examples
229/// ```rust
230/// use supabase::realtime::RealtimeEvent;
231///
232/// // Listen to all events
233/// let all_events = RealtimeEvent::All;
234///
235/// // Listen only to inserts
236/// let inserts_only = RealtimeEvent::Insert;
237/// ```
238#[cfg(feature = "realtime")]
239#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
240pub enum RealtimeEvent {
241    #[serde(rename = "INSERT")]
242    Insert,
243    #[serde(rename = "UPDATE")]
244    Update,
245    #[serde(rename = "DELETE")]
246    Delete,
247    #[serde(rename = "*")]
248    All,
249}
250
251/// Realtime message received from Supabase
252///
253/// Contains the event data and metadata about the database change.
254#[cfg(feature = "realtime")]
255#[derive(Debug, Clone, Serialize, Deserialize)]
256pub struct RealtimeMessage {
257    pub event: String,
258    pub payload: RealtimePayload,
259    pub ref_id: Option<String>,
260    pub topic: String,
261}
262
263/// Payload of a realtime message
264///
265/// Contains the actual data from database changes.
266#[cfg(feature = "realtime")]
267#[derive(Debug, Clone, Serialize, Deserialize)]
268pub struct RealtimePayload {
269    pub record: Option<serde_json::Value>,
270    pub old_record: Option<serde_json::Value>,
271    pub schema: Option<String>,
272    pub table: Option<String>,
273    pub commit_timestamp: Option<String>,
274    pub event_type: Option<String>,
275    pub new: Option<serde_json::Value>,
276    pub old: Option<serde_json::Value>,
277}
278
279/// Supabase realtime protocol message for sending to server
280#[cfg(feature = "realtime")]
281#[derive(Debug, Serialize)]
282struct RealtimeProtocolMessage {
283    topic: String,
284    event: String,
285    payload: serde_json::Value,
286    ref_id: String,
287}
288
289/// Presence state for user tracking
290#[cfg(feature = "realtime")]
291#[derive(Debug, Clone, Serialize, Deserialize)]
292pub struct PresenceState {
293    pub user_id: String,
294    pub online_at: String,
295    pub metadata: Option<HashMap<String, serde_json::Value>>,
296}
297
298/// Presence event for tracking user joins/leaves
299#[cfg(feature = "realtime")]
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct PresenceEvent {
302    pub event_type: PresenceEventType,
303    pub user_id: String,
304    pub presence_state: PresenceState,
305}
306
307/// Types of presence events
308#[cfg(feature = "realtime")]
309#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
310pub enum PresenceEventType {
311    #[serde(rename = "presence_state")]
312    Join,
313    #[serde(rename = "presence_diff")]
314    Leave,
315}
316
317/// Callback for presence events
318#[cfg(feature = "realtime")]
319pub type PresenceCallback = Arc<dyn Fn(PresenceEvent) + Send + Sync>;
320
321/// Broadcast message for cross-client communication
322#[cfg(feature = "realtime")]
323#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct BroadcastMessage {
325    pub event: String,
326    pub payload: serde_json::Value,
327    pub from_user_id: Option<String>,
328    pub timestamp: String,
329}
330
331/// Callback for broadcast messages
332#[cfg(feature = "realtime")]
333pub type BroadcastCallback = Arc<dyn Fn(BroadcastMessage) + Send + Sync>;
334
335/// Advanced filter configuration
336#[cfg(feature = "realtime")]
337#[derive(Debug, Clone)]
338pub struct AdvancedFilter {
339    pub column: String,
340    pub operator: FilterOperator,
341    pub value: serde_json::Value,
342}
343
344/// Filter operators for advanced filtering
345#[cfg(feature = "realtime")]
346#[derive(Debug, Clone, Serialize, Deserialize)]
347pub enum FilterOperator {
348    #[serde(rename = "eq")]
349    Equal,
350    #[serde(rename = "neq")]
351    NotEqual,
352    #[serde(rename = "gt")]
353    GreaterThan,
354    #[serde(rename = "gte")]
355    GreaterThanOrEqual,
356    #[serde(rename = "lt")]
357    LessThan,
358    #[serde(rename = "lte")]
359    LessThanOrEqual,
360    #[serde(rename = "in")]
361    In,
362    #[serde(rename = "is")]
363    Is,
364    #[serde(rename = "like")]
365    Like,
366    #[serde(rename = "ilike")]
367    ILike,
368    #[serde(rename = "match")]
369    Match,
370    #[serde(rename = "imatch")]
371    IMatch,
372}
373
374/// Connection pool configuration
375#[cfg(feature = "realtime")]
376#[derive(Debug, Clone)]
377pub struct ConnectionPoolConfig {
378    /// Maximum number of connections in pool (default: 10)
379    pub max_connections: usize,
380    /// Connection timeout in seconds (default: 30)
381    pub connection_timeout: u64,
382    /// Keep-alive interval in seconds (default: 30)
383    pub keep_alive_interval: u64,
384    /// Reconnect delay in milliseconds (default: 1000)
385    pub reconnect_delay: u64,
386    /// Maximum reconnect attempts (default: 5)
387    pub max_reconnect_attempts: u32,
388}
389
390impl Default for ConnectionPoolConfig {
391    fn default() -> Self {
392        Self {
393            max_connections: 10,
394            connection_timeout: 30,
395            keep_alive_interval: 30,
396            reconnect_delay: 1000,
397            max_reconnect_attempts: 5,
398        }
399    }
400}
401
402/// Connection pool for efficient WebSocket management
403#[cfg(feature = "realtime")]
404pub struct ConnectionPool {
405    config: ConnectionPoolConfig,
406    connections: ConnectionStorage,
407    active_connections: Arc<AtomicU64>,
408}
409
410#[cfg(feature = "realtime")]
411impl std::fmt::Debug for ConnectionPool {
412    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413        f.debug_struct("ConnectionPool")
414            .field("config", &self.config)
415            .field("active_connections", &self.active_connections)
416            .finish()
417    }
418}
419
420#[cfg(feature = "realtime")]
421impl ConnectionPool {
422    /// Create a new connection pool
423    pub fn new(config: ConnectionPoolConfig) -> Self {
424        let mut connections = Vec::new();
425        connections.resize_with(config.max_connections, || None);
426
427        Self {
428            config,
429            connections: Arc::new(RuntimeLock::new(connections)),
430            active_connections: Arc::new(AtomicU64::new(0)),
431        }
432    }
433
434    /// Get an available connection from the pool
435    pub async fn get_connection(&self) -> Result<Option<Box<dyn WebSocketConnection>>> {
436        let mut connections = self.connections.write().await;
437
438        for connection_slot in connections.iter_mut() {
439            if let Some(connection) = connection_slot.take() {
440                if connection.is_connected() {
441                    debug!("Reusing existing connection from pool");
442                    return Ok(Some(connection));
443                }
444            }
445        }
446
447        // No available connections, try to create a new one
448        for connection_slot in connections.iter_mut() {
449            if connection_slot.is_none() {
450                let new_connection = crate::websocket::create_websocket();
451                *connection_slot = Some(new_connection);
452                self.active_connections
453                    .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
454                debug!("Created new connection in pool");
455                return Ok(connection_slot.take());
456            }
457        }
458
459        debug!("Connection pool is full");
460        Ok(None)
461    }
462
463    /// Return a connection to the pool
464    pub async fn return_connection(&self, connection: Box<dyn WebSocketConnection>) {
465        let mut connections = self.connections.write().await;
466
467        for connection_slot in connections.iter_mut() {
468            if connection_slot.is_none() {
469                *connection_slot = Some(connection);
470                debug!("Returned connection to pool");
471                return;
472            }
473        }
474
475        // Pool is full, close the connection
476        warn!("Pool is full, dropping connection");
477    }
478
479    /// Get pool statistics
480    pub async fn get_stats(&self) -> ConnectionPoolStats {
481        let connections = self.connections.read().await;
482        let total = connections.len();
483        let active = connections.iter().filter(|c| c.is_some()).count();
484        let available = connections
485            .iter()
486            .filter(|c| c.as_ref().is_some_and(|conn| conn.is_connected()))
487            .count();
488
489        ConnectionPoolStats {
490            total_connections: total,
491            active_connections: active,
492            available_connections: available,
493            max_connections: self.config.max_connections,
494        }
495    }
496
497    /// Close all connections in the pool
498    pub async fn close_all(&self) -> Result<()> {
499        let mut connections = self.connections.write().await;
500
501        for connection_slot in connections.iter_mut() {
502            if let Some(mut connection) = connection_slot.take() {
503                connection.close().await?;
504            }
505        }
506
507        self.active_connections
508            .store(0, std::sync::atomic::Ordering::SeqCst);
509        info!("Closed all connections in pool");
510        Ok(())
511    }
512}
513
514/// Statistics about the connection pool
515#[cfg(feature = "realtime")]
516#[derive(Debug, Clone)]
517pub struct ConnectionPoolStats {
518    pub total_connections: usize,
519    pub active_connections: usize,
520    pub available_connections: usize,
521    pub max_connections: usize,
522}
523
524#[cfg(feature = "realtime")]
525impl Realtime {
526    /// Create a new realtime client (works on both native and WASM)
527    ///
528    /// # Examples
529    /// ```rust
530    /// use supabase::types::SupabaseConfig;
531    /// use supabase::realtime::Realtime;
532    /// use std::sync::Arc;
533    ///
534    /// let config = Arc::new(SupabaseConfig {
535    ///     url: "https://your-project.supabase.co".to_string(),
536    ///     key: "your-anon-key".to_string(),
537    ///     ..Default::default()
538    /// });
539    ///
540    /// let realtime = Realtime::new(config).unwrap();
541    /// ```
542    pub fn new(config: Arc<SupabaseConfig>) -> Result<Self> {
543        debug!("Creating realtime client");
544
545        let ws_url = config
546            .url
547            .replace("http://", "ws://")
548            .replace("https://", "wss://");
549        let realtime_url = format!("{}/realtime/v1/websocket", ws_url);
550
551        let connection_manager = Arc::new(ConnectionManager {
552            url: realtime_url,
553            api_key: config.key.clone(),
554            connection: RuntimeLock::new(None),
555            ref_counter: AtomicU64::new(0),
556            subscriptions: RuntimeLock::new(HashMap::new()),
557            is_message_loop_running: AtomicBool::new(false),
558        });
559
560        let message_loop_handle = Arc::new(AtomicBool::new(false));
561
562        Ok(Self {
563            connection_manager,
564            message_loop_handle,
565        })
566    }
567
568    /// Connect to the realtime server (cross-platform)
569    ///
570    /// # Examples
571    /// ```rust,no_run
572    /// # use supabase::Client;
573    /// # async fn example() -> supabase::Result<()> {
574    /// let client = Client::new("your-url", "your-key")?;
575    /// let realtime = client.realtime();
576    ///
577    /// realtime.connect().await?;
578    /// println!("Connected to Supabase realtime!");
579    /// # Ok(())
580    /// # }
581    /// ```
582    pub async fn connect(&self) -> Result<()> {
583        debug!("Connecting to realtime server");
584
585        let mut connection_guard = self.connection_manager.connection.write().await;
586
587        if let Some(ref conn) = *connection_guard {
588            if conn.is_connected() {
589                debug!("Already connected to realtime server");
590                return Ok(());
591            }
592        }
593
594        let mut connection = create_websocket();
595        let url = format!(
596            "{}?apikey={}&vsn=1.0.0",
597            self.connection_manager.url, self.connection_manager.api_key
598        );
599
600        connection.connect(&url).await?;
601        *connection_guard = Some(connection);
602
603        // Start message loop
604        self.start_message_loop().await?;
605
606        info!("Connected to realtime server");
607        Ok(())
608    }
609
610    /// Disconnect from the realtime server
611    ///
612    /// # Examples
613    /// ```rust,no_run
614    /// # use supabase::Client;
615    /// # async fn example() -> supabase::Result<()> {
616    /// let client = Client::new("your-url", "your-key")?;
617    /// let realtime = client.realtime();
618    ///
619    /// realtime.connect().await?;
620    /// // ... do work ...
621    /// realtime.disconnect().await?;
622    /// # Ok(())
623    /// # }
624    /// ```
625    pub async fn disconnect(&self) -> Result<()> {
626        debug!("Disconnecting from realtime server");
627
628        // Stop message loop
629        self.message_loop_handle.store(false, Ordering::SeqCst);
630        self.connection_manager
631            .is_message_loop_running
632            .store(false, Ordering::SeqCst);
633
634        let mut connection_guard = self.connection_manager.connection.write().await;
635        if let Some(ref mut connection) = *connection_guard {
636            connection.close().await?;
637        }
638        *connection_guard = None;
639
640        // Clear all subscriptions
641        let mut subscriptions = self.connection_manager.subscriptions.write().await;
642        subscriptions.clear();
643
644        info!("Disconnected from realtime server");
645        Ok(())
646    }
647
648    /// Check if connected to realtime server
649    ///
650    /// # Examples
651    /// ```rust,no_run
652    /// # use supabase::Client;
653    /// # async fn example() -> supabase::Result<()> {
654    /// let client = Client::new("your-url", "your-key")?;
655    /// let realtime = client.realtime();
656    ///
657    /// if !realtime.is_connected().await {
658    ///     realtime.connect().await?;
659    /// }
660    /// # Ok(())
661    /// # }
662    /// ```
663    pub async fn is_connected(&self) -> bool {
664        let connection_guard = self.connection_manager.connection.read().await;
665        if let Some(ref conn) = *connection_guard {
666            conn.is_connected()
667        } else {
668            false
669        }
670    }
671
672    /// Create a channel subscription builder
673    ///
674    /// # Examples
675    /// ```rust,no_run
676    /// # use supabase::Client;
677    /// # async fn example() -> supabase::Result<()> {
678    /// let client = Client::new("your-url", "your-key")?;
679    ///
680    /// let subscription = client.realtime()
681    ///     .channel("public-posts")
682    ///     .table("posts")
683    ///     .subscribe(|msg| println!("Update: {:?}", msg))
684    ///     .await?;
685    /// # Ok(())
686    /// # }
687    /// ```
688    pub fn channel(&self, _topic: &str) -> ChannelBuilder {
689        ChannelBuilder {
690            realtime: self.clone(),
691            config: SubscriptionConfig::default(),
692        }
693    }
694
695    /// Unsubscribe from a channel
696    ///
697    /// # Examples
698    /// ```rust,no_run
699    /// # use supabase::Client;
700    /// # async fn example() -> supabase::Result<()> {
701    /// let client = Client::new("your-url", "your-key")?;
702    /// let realtime = client.realtime();
703    ///
704    /// let subscription_id = realtime
705    ///     .channel("posts")
706    ///     .table("posts")
707    ///     .subscribe(|_| {})
708    ///     .await?;
709    ///
710    /// // Later...
711    /// realtime.unsubscribe(&subscription_id).await?;
712    /// # Ok(())
713    /// # }
714    /// ```
715    pub async fn unsubscribe(&self, subscription_id: &str) -> Result<()> {
716        debug!("Unsubscribing from subscription: {}", subscription_id);
717
718        let mut subscriptions = self.connection_manager.subscriptions.write().await;
719        if let Some(subscription) = subscriptions.remove(subscription_id) {
720            // Send leave message to server
721            self.send_leave_message(&subscription.topic).await?;
722            info!("Unsubscribed from subscription: {}", subscription_id);
723        } else {
724            warn!("Subscription {} not found for unsubscribe", subscription_id);
725        }
726
727        Ok(())
728    }
729
730    /// Subscribe to a channel with custom configuration
731    #[cfg(not(target_arch = "wasm32"))]
732    pub async fn subscribe<F>(
733        &self,
734        subscription_config: SubscriptionConfig,
735        callback: F,
736    ) -> Result<String>
737    where
738        F: Fn(RealtimeMessage) + Send + Sync + 'static,
739    {
740        let subscription_id = Uuid::new_v4().to_string();
741        let topic = self.build_topic(&subscription_config);
742
743        debug!(
744            "Creating subscription {} for topic {}",
745            subscription_id, topic
746        );
747
748        // Ensure we're connected
749        self.connect().await?;
750
751        // Send join message to server
752        self.send_join_message(&topic, &subscription_config).await?;
753
754        // Store subscription
755        let subscription = Subscription {
756            id: subscription_id.clone(),
757            topic: topic.clone(),
758            config: subscription_config,
759            callback: Arc::new(callback),
760        };
761
762        let mut subscriptions = self.connection_manager.subscriptions.write().await;
763        subscriptions.insert(subscription_id.clone(), subscription);
764
765        info!("Subscribed to topic {} with ID {}", topic, subscription_id);
766        Ok(subscription_id)
767    }
768
769    /// Subscribe to a channel with custom configuration (WASM version)
770    #[cfg(target_arch = "wasm32")]
771    pub async fn subscribe<F>(
772        &self,
773        subscription_config: SubscriptionConfig,
774        callback: F,
775    ) -> Result<String>
776    where
777        F: Fn(RealtimeMessage) + 'static,
778    {
779        let subscription_id = Uuid::new_v4().to_string();
780        let topic = self.build_topic(&subscription_config);
781
782        debug!(
783            "Creating subscription {} for topic {}",
784            subscription_id, topic
785        );
786
787        // Ensure we're connected
788        self.connect().await?;
789
790        // Send join message to server
791        self.send_join_message(&topic, &subscription_config).await?;
792
793        // Store subscription
794        let subscription = Subscription {
795            id: subscription_id.clone(),
796            topic: topic.clone(),
797            config: subscription_config,
798            callback: Arc::new(callback),
799        };
800
801        let mut subscriptions = self.connection_manager.subscriptions.write().await;
802        subscriptions.insert(subscription_id.clone(), subscription);
803
804        info!("Subscribed to topic {} with ID {}", topic, subscription_id);
805        Ok(subscription_id)
806    }
807
808    /// Build topic string from subscription config
809    fn build_topic(&self, config: &SubscriptionConfig) -> String {
810        if let Some(ref table) = config.table {
811            format!("realtime:{}:{}", config.schema, table)
812        } else {
813            format!("realtime:{}", config.schema)
814        }
815    }
816
817    /// Send join message to Supabase realtime server
818    async fn send_join_message(&self, topic: &str, config: &SubscriptionConfig) -> Result<()> {
819        let mut payload = serde_json::Map::new();
820
821        if let Some(ref table) = config.table {
822            payload.insert(
823                "table".to_string(),
824                serde_json::Value::String(table.clone()),
825            );
826        }
827
828        if let Some(ref event) = config.event {
829            let event_str = match event {
830                RealtimeEvent::Insert => "INSERT",
831                RealtimeEvent::Update => "UPDATE",
832                RealtimeEvent::Delete => "DELETE",
833                RealtimeEvent::All => "*",
834            };
835            payload.insert(
836                "event".to_string(),
837                serde_json::Value::String(event_str.to_string()),
838            );
839        }
840
841        if let Some(ref filter) = config.filter {
842            payload.insert(
843                "filter".to_string(),
844                serde_json::Value::String(filter.clone()),
845            );
846        }
847
848        let message = RealtimeProtocolMessage {
849            topic: topic.to_string(),
850            event: "phx_join".to_string(),
851            payload: serde_json::Value::Object(payload),
852            ref_id: Uuid::new_v4().to_string(),
853        };
854
855        self.send_message(&message).await
856    }
857
858    /// Send leave message to Supabase realtime server
859    async fn send_leave_message(&self, topic: &str) -> Result<()> {
860        let message = RealtimeProtocolMessage {
861            topic: topic.to_string(),
862            event: "phx_leave".to_string(),
863            payload: serde_json::Value::Object(serde_json::Map::new()),
864            ref_id: Uuid::new_v4().to_string(),
865        };
866
867        self.send_message(&message).await
868    }
869
870    /// Send message through WebSocket
871    async fn send_message(&self, message: &RealtimeProtocolMessage) -> Result<()> {
872        let message_json = serde_json::to_string(message)?;
873
874        let mut connection_guard = self.connection_manager.connection.write().await;
875        if let Some(ref mut connection) = *connection_guard {
876            connection.send(&message_json).await?;
877            debug!("Sent realtime message: {}", message_json);
878        } else {
879            return Err(Error::realtime("Not connected to realtime server"));
880        }
881
882        Ok(())
883    }
884
885    /// Start the message processing loop
886    async fn start_message_loop(&self) -> Result<()> {
887        if self
888            .connection_manager
889            .is_message_loop_running
890            .load(Ordering::SeqCst)
891        {
892            debug!("Message loop already running");
893            return Ok(());
894        }
895
896        self.connection_manager
897            .is_message_loop_running
898            .store(true, Ordering::SeqCst);
899        self.message_loop_handle.store(true, Ordering::SeqCst);
900
901        let connection_manager = Arc::clone(&self.connection_manager);
902        let loop_handle = Arc::clone(&self.message_loop_handle);
903
904        // Spawn the message loop
905        #[cfg(not(target_arch = "wasm32"))]
906        {
907            let connection_manager = Arc::clone(&connection_manager);
908            let loop_handle = Arc::clone(&loop_handle);
909
910            tokio::spawn(async move {
911                Self::message_loop(connection_manager, loop_handle).await;
912            });
913        }
914
915        #[cfg(target_arch = "wasm32")]
916        {
917            // For WASM, we'll use a simple polling approach
918            wasm_bindgen_futures::spawn_local(async move {
919                Self::message_loop(connection_manager, loop_handle).await;
920            });
921        }
922
923        info!("Started realtime message loop");
924        Ok(())
925    }
926
927    /// Main message processing loop
928    async fn message_loop(
929        connection_manager: Arc<ConnectionManager>,
930        loop_handle: Arc<AtomicBool>,
931    ) {
932        debug!("Starting realtime message loop");
933
934        while loop_handle.load(Ordering::SeqCst) {
935            // Try to receive messages
936            let message = {
937                let mut connection_guard = connection_manager.connection.write().await;
938
939                if let Some(ref mut connection) = *connection_guard {
940                    if !connection.is_connected() {
941                        debug!("Connection lost, stopping message loop");
942                        break;
943                    }
944
945                    match connection.receive().await {
946                        Ok(Some(msg)) => Some(msg),
947                        Ok(None) => None,
948                        Err(e) => {
949                            error!("Error receiving message: {}", e);
950                            None
951                        }
952                    }
953                } else {
954                    debug!("No connection available, stopping message loop");
955                    break;
956                }
957            };
958
959            if let Some(message_str) = message {
960                debug!("Received realtime message: {}", message_str);
961
962                // Parse the message
963                match serde_json::from_str::<RealtimeMessage>(&message_str) {
964                    Ok(realtime_message) => {
965                        // Process the message
966                        Self::process_message(&connection_manager, realtime_message).await;
967                    }
968                    Err(e) => {
969                        debug!(
970                            "Failed to parse realtime message: {} - Error: {}",
971                            message_str, e
972                        );
973                        // Try to parse as protocol message (join/leave responses, etc.)
974                        if let Ok(_protocol_msg) =
975                            serde_json::from_str::<serde_json::Value>(&message_str)
976                        {
977                            debug!("Received protocol message, ignoring for now");
978                        }
979                    }
980                }
981            }
982
983            // Small delay to prevent busy waiting
984            #[cfg(not(target_arch = "wasm32"))]
985            tokio::time::sleep(Duration::from_millis(10)).await;
986
987            #[cfg(target_arch = "wasm32")]
988            {
989                // For WASM, use a simple promise-based delay
990                use wasm_bindgen::prelude::*;
991                use wasm_bindgen_futures::JsFuture;
992
993                let promise = js_sys::Promise::new(&mut |resolve, _| {
994                    web_sys::window()
995                        .unwrap()
996                        .set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 10)
997                        .unwrap();
998                });
999                let _ = JsFuture::from(promise).await;
1000            }
1001        }
1002
1003        connection_manager
1004            .is_message_loop_running
1005            .store(false, Ordering::SeqCst);
1006        debug!("Realtime message loop stopped");
1007    }
1008
1009    /// Process incoming realtime message
1010    async fn process_message(
1011        connection_manager: &Arc<ConnectionManager>,
1012        message: RealtimeMessage,
1013    ) {
1014        debug!("Processing message for topic: {}", message.topic);
1015
1016        let subscriptions = connection_manager.subscriptions.read().await;
1017
1018        let mut matched_subscriptions = Vec::new();
1019
1020        // Find matching subscriptions
1021        for subscription in subscriptions.values() {
1022            if Self::topic_matches(&subscription.topic, &message.topic) {
1023                // Check event filter
1024                if let Some(ref event_filter) = subscription.config.event {
1025                    let message_event = match message.event.as_str() {
1026                        "INSERT" => Some(RealtimeEvent::Insert),
1027                        "UPDATE" => Some(RealtimeEvent::Update),
1028                        "DELETE" => Some(RealtimeEvent::Delete),
1029                        _ => None,
1030                    };
1031
1032                    if let Some(msg_event) = message_event {
1033                        if *event_filter != RealtimeEvent::All && *event_filter != msg_event {
1034                            continue; // Skip if event doesn't match
1035                        }
1036                    }
1037                }
1038
1039                matched_subscriptions.push(subscription.clone());
1040            }
1041        }
1042
1043        drop(subscriptions); // Explicitly drop the guard
1044
1045        // Call callbacks for matched subscriptions
1046        for subscription in matched_subscriptions {
1047            debug!("Calling callback for subscription: {}", subscription.id);
1048            (subscription.callback)(message.clone());
1049        }
1050    }
1051
1052    /// Check if topic matches subscription pattern
1053    fn topic_matches(subscription_topic: &str, message_topic: &str) -> bool {
1054        // Simple pattern matching - could be enhanced with wildcards
1055        subscription_topic == message_topic || message_topic.starts_with(subscription_topic)
1056    }
1057
1058    /// Track user presence in a channel
1059    ///
1060    /// # Examples
1061    /// ```rust,no_run
1062    /// use supabase::realtime::PresenceState;
1063    /// use std::collections::HashMap;
1064    ///
1065    /// # async fn example(realtime: &supabase::realtime::Realtime) -> supabase::Result<()> {
1066    /// let mut metadata = HashMap::new();
1067    /// metadata.insert("status".to_string(), serde_json::Value::String("online".to_string()));
1068    /// metadata.insert("location".to_string(), serde_json::Value::String("dashboard".to_string()));
1069    ///
1070    /// let presence_state = PresenceState {
1071    ///     user_id: "user123".to_string(),
1072    ///     online_at: chrono::Utc::now().to_rfc3339(),
1073    ///     metadata: Some(metadata),
1074    /// };
1075    ///
1076    /// realtime.track_presence("lobby", presence_state).await?;
1077    /// # Ok(())
1078    /// # }
1079    /// ```
1080    pub async fn track_presence(&self, channel: &str, presence_state: PresenceState) -> Result<()> {
1081        debug!(
1082            "Tracking presence for user {} in channel {}",
1083            presence_state.user_id, channel
1084        );
1085
1086        let topic = format!("realtime:{}", channel);
1087        let ref_id = Uuid::new_v4().to_string();
1088
1089        let message = RealtimeProtocolMessage {
1090            topic: topic.clone(),
1091            event: "presence".to_string(),
1092            payload: serde_json::json!({
1093                "event": "track",
1094                "payload": presence_state
1095            }),
1096            ref_id,
1097        };
1098
1099        let mut connection_guard = self.connection_manager.connection.write().await;
1100        if let Some(ref mut connection) = *connection_guard {
1101            let message_json = serde_json::to_string(&message).map_err(|e| {
1102                Error::realtime(format!("Failed to serialize presence message: {}", e))
1103            })?;
1104
1105            connection.send(&message_json).await?;
1106            info!(
1107                "Started tracking presence for user {}",
1108                presence_state.user_id
1109            );
1110        } else {
1111            return Err(Error::realtime("Not connected to realtime server"));
1112        }
1113
1114        Ok(())
1115    }
1116
1117    /// Stop tracking user presence in a channel
1118    ///
1119    /// # Examples
1120    /// ```rust,no_run
1121    /// # async fn example(realtime: &supabase::realtime::Realtime) -> supabase::Result<()> {
1122    /// realtime.untrack_presence("lobby", "user123").await?;
1123    /// # Ok(())
1124    /// # }
1125    /// ```
1126    pub async fn untrack_presence(&self, channel: &str, user_id: &str) -> Result<()> {
1127        debug!(
1128            "Untracking presence for user {} in channel {}",
1129            user_id, channel
1130        );
1131
1132        let topic = format!("realtime:{}", channel);
1133        let ref_id = Uuid::new_v4().to_string();
1134
1135        let message = RealtimeProtocolMessage {
1136            topic: topic.clone(),
1137            event: "presence".to_string(),
1138            payload: serde_json::json!({
1139                "event": "untrack",
1140                "payload": {
1141                    "user_id": user_id
1142                }
1143            }),
1144            ref_id,
1145        };
1146
1147        let mut connection_guard = self.connection_manager.connection.write().await;
1148        if let Some(ref mut connection) = *connection_guard {
1149            let message_json = serde_json::to_string(&message).map_err(|e| {
1150                Error::realtime(format!("Failed to serialize presence message: {}", e))
1151            })?;
1152
1153            connection.send(&message_json).await?;
1154            info!("Stopped tracking presence for user {}", user_id);
1155        } else {
1156            return Err(Error::realtime("Not connected to realtime server"));
1157        }
1158
1159        Ok(())
1160    }
1161
1162    /// Get all users currently present in a channel
1163    ///
1164    /// # Examples
1165    /// ```rust,no_run
1166    /// # async fn example(realtime: &supabase::realtime::Realtime) -> supabase::Result<()> {
1167    /// let present_users = realtime.get_presence("lobby").await?;
1168    /// println!("Users online: {}", present_users.len());
1169    /// # Ok(())
1170    /// # }
1171    /// ```
1172    pub async fn get_presence(&self, channel: &str) -> Result<Vec<PresenceState>> {
1173        debug!("Getting presence for channel: {}", channel);
1174
1175        let topic = format!("realtime:{}", channel);
1176        let ref_id = Uuid::new_v4().to_string();
1177
1178        let message = RealtimeProtocolMessage {
1179            topic: topic.clone(),
1180            event: "presence".to_string(),
1181            payload: serde_json::json!({
1182                "event": "state"
1183            }),
1184            ref_id,
1185        };
1186
1187        let mut connection_guard = self.connection_manager.connection.write().await;
1188        if let Some(ref mut connection) = *connection_guard {
1189            let message_json = serde_json::to_string(&message).map_err(|e| {
1190                Error::realtime(format!("Failed to serialize presence message: {}", e))
1191            })?;
1192
1193            connection.send(&message_json).await?;
1194
1195            // Note: In a real implementation, you'd wait for the response
1196            // For now, returning empty vec as this would require more complex message handling
1197            info!("Requested presence state for channel: {}", channel);
1198            Ok(Vec::new())
1199        } else {
1200            Err(Error::realtime("Not connected to realtime server"))
1201        }
1202    }
1203
1204    /// Send a broadcast message to all subscribers in a channel
1205    ///
1206    /// # Examples
1207    /// ```rust,no_run
1208    /// use serde_json::json;
1209    ///
1210    /// # async fn example(realtime: &supabase::realtime::Realtime) -> supabase::Result<()> {
1211    /// let payload = json!({
1212    ///     "message": "Hello, everyone!",
1213    ///     "from": "user123",
1214    ///     "timestamp": chrono::Utc::now().to_rfc3339()
1215    /// });
1216    ///
1217    /// realtime.broadcast("chat", "new_message", payload, Some("user123")).await?;
1218    /// # Ok(())
1219    /// # }
1220    /// ```
1221    pub async fn broadcast(
1222        &self,
1223        channel: &str,
1224        event: &str,
1225        payload: serde_json::Value,
1226        from_user_id: Option<&str>,
1227    ) -> Result<()> {
1228        debug!(
1229            "Broadcasting message to channel: {} event: {}",
1230            channel, event
1231        );
1232
1233        let topic = format!("realtime:{}", channel);
1234        let ref_id = Uuid::new_v4().to_string();
1235
1236        let broadcast_message = BroadcastMessage {
1237            event: event.to_string(),
1238            payload,
1239            from_user_id: from_user_id.map(|s| s.to_string()),
1240            timestamp: chrono::Utc::now().to_rfc3339(),
1241        };
1242
1243        let message = RealtimeProtocolMessage {
1244            topic: topic.clone(),
1245            event: "broadcast".to_string(),
1246            payload: serde_json::to_value(broadcast_message)?,
1247            ref_id,
1248        };
1249
1250        let mut connection_guard = self.connection_manager.connection.write().await;
1251        if let Some(ref mut connection) = *connection_guard {
1252            let message_json = serde_json::to_string(&message).map_err(|e| {
1253                Error::realtime(format!("Failed to serialize broadcast message: {}", e))
1254            })?;
1255
1256            connection.send(&message_json).await?;
1257            info!("Sent broadcast message to channel: {}", channel);
1258        } else {
1259            return Err(Error::realtime("Not connected to realtime server"));
1260        }
1261
1262        Ok(())
1263    }
1264
1265    /// Subscribe to a channel with advanced configuration
1266    ///
1267    /// This method provides more control over subscriptions including presence tracking,
1268    /// broadcast messages, and advanced filtering.
1269    ///
1270    /// # Examples
1271    /// ```rust,no_run
1272    /// use supabase::realtime::{SubscriptionConfig, RealtimeEvent, AdvancedFilter, FilterOperator};
1273    /// use std::sync::Arc;
1274    ///
1275    /// # async fn example(realtime: &supabase::realtime::Realtime) -> supabase::Result<()> {
1276    /// let config = SubscriptionConfig {
1277    ///     table: Some("posts".to_string()),
1278    ///     schema: "public".to_string(),
1279    ///     event: Some(RealtimeEvent::All),
1280    ///     advanced_filters: vec![
1281    ///         AdvancedFilter {
1282    ///             column: "status".to_string(),
1283    ///             operator: FilterOperator::Equal,
1284    ///             value: serde_json::Value::String("published".to_string()),
1285    ///         }
1286    ///     ],
1287    ///     enable_presence: true,
1288    ///     enable_broadcast: true,
1289    ///     presence_callback: Some(Arc::new(|event| {
1290    ///         println!("Presence event: {:?}", event);
1291    ///     })),
1292    ///     broadcast_callback: Some(Arc::new(|message| {
1293    ///         println!("Broadcast message: {:?}", message);
1294    ///     })),
1295    ///     ..Default::default()
1296    /// };
1297    ///
1298    /// let subscription_id = realtime.subscribe_advanced("posts", config, |msg| {
1299    ///     println!("Received message: {:?}", msg);
1300    /// }).await?;
1301    /// println!("Advanced subscription ID: {}", subscription_id);
1302    /// # Ok(())
1303    /// # }
1304    /// ```
1305    #[cfg(not(target_arch = "wasm32"))]
1306    pub async fn subscribe_advanced<F>(
1307        &self,
1308        channel: &str,
1309        config: SubscriptionConfig,
1310        callback: F,
1311    ) -> Result<String>
1312    where
1313        F: Fn(RealtimeMessage) + Send + Sync + 'static,
1314    {
1315        debug!("Creating advanced subscription for channel: {}", channel);
1316
1317        let subscription_id = Uuid::new_v4().to_string();
1318        let topic = if let Some(ref table) = config.table {
1319            format!("realtime:{}:{}:{}", config.schema, table, channel)
1320        } else {
1321            format!("realtime:{}", channel)
1322        };
1323
1324        // Build filter string from advanced filters
1325        let mut filter_parts = Vec::new();
1326
1327        if let Some(ref simple_filter) = config.filter {
1328            filter_parts.push(simple_filter.clone());
1329        }
1330
1331        for advanced_filter in &config.advanced_filters {
1332            let filter_str = match &advanced_filter.value {
1333                serde_json::Value::String(s) => format!(
1334                    "{}={}. {}",
1335                    advanced_filter.column,
1336                    serde_json::to_string(&advanced_filter.operator)?.trim_matches('"'),
1337                    s
1338                ),
1339                serde_json::Value::Array(arr) => {
1340                    let values: Vec<String> = arr
1341                        .iter()
1342                        .map(|v| v.to_string().trim_matches('"').to_string())
1343                        .collect();
1344                    format!(
1345                        "{}={}.({})",
1346                        advanced_filter.column,
1347                        serde_json::to_string(&advanced_filter.operator)?.trim_matches('"'),
1348                        values.join(",")
1349                    )
1350                }
1351                other => format!(
1352                    "{}={}. {}",
1353                    advanced_filter.column,
1354                    serde_json::to_string(&advanced_filter.operator)?.trim_matches('"'),
1355                    other.to_string().trim_matches('"')
1356                ),
1357            };
1358            filter_parts.push(filter_str);
1359        }
1360
1361        let combined_filter = if !filter_parts.is_empty() {
1362            Some(filter_parts.join(" and "))
1363        } else {
1364            None
1365        };
1366
1367        let subscription = Subscription {
1368            id: subscription_id.clone(),
1369            topic: topic.clone(),
1370            config: SubscriptionConfig {
1371                filter: combined_filter,
1372                ..config.clone()
1373            },
1374            callback: Arc::new(callback),
1375        };
1376
1377        // Store subscription
1378        {
1379            let mut subscriptions = self.connection_manager.subscriptions.write().await;
1380            subscriptions.insert(subscription_id.clone(), subscription);
1381        }
1382
1383        // Send join message
1384        let ref_id = self
1385            .connection_manager
1386            .ref_counter
1387            .fetch_add(1, Ordering::SeqCst)
1388            .to_string();
1389
1390        let mut join_payload = serde_json::json!({
1391            "config": {
1392                "postgres_changes": [{
1393                    "event": config.event.unwrap_or(RealtimeEvent::All),
1394                    "schema": config.schema,
1395                }]
1396            }
1397        });
1398
1399        if let Some(ref table) = config.table {
1400            join_payload["config"]["postgres_changes"][0]["table"] =
1401                serde_json::Value::String(table.clone());
1402        }
1403
1404        if let Some(ref filter) = config.filter {
1405            join_payload["config"]["postgres_changes"][0]["filter"] =
1406                serde_json::Value::String(filter.clone());
1407        }
1408
1409        // Add presence configuration
1410        if config.enable_presence {
1411            join_payload["config"]["presence"] = serde_json::json!({ "key": "" });
1412        }
1413
1414        // Add broadcast configuration
1415        if config.enable_broadcast {
1416            join_payload["config"]["broadcast"] = serde_json::json!({ "self": true });
1417        }
1418
1419        let join_message = RealtimeProtocolMessage {
1420            topic: topic.clone(),
1421            event: "phx_join".to_string(),
1422            payload: join_payload,
1423            ref_id,
1424        };
1425
1426        let mut connection_guard = self.connection_manager.connection.write().await;
1427        if let Some(ref mut connection) = *connection_guard {
1428            let message_json = serde_json::to_string(&join_message)
1429                .map_err(|e| Error::realtime(format!("Failed to serialize join message: {}", e)))?;
1430
1431            connection.send(&message_json).await?;
1432            info!("Advanced subscription created: {}", subscription_id);
1433        } else {
1434            return Err(Error::realtime("Not connected to realtime server"));
1435        }
1436
1437        Ok(subscription_id)
1438    }
1439
1440    /// Subscribe to a channel with advanced configuration (WASM version)
1441    #[cfg(target_arch = "wasm32")]
1442    pub async fn subscribe_advanced<F>(
1443        &self,
1444        channel: &str,
1445        config: SubscriptionConfig,
1446        callback: F,
1447    ) -> Result<String>
1448    where
1449        F: Fn(RealtimeMessage) + 'static,
1450    {
1451        debug!("Creating advanced subscription for channel: {}", channel);
1452
1453        let subscription_id = Uuid::new_v4().to_string();
1454        let topic = if let Some(ref table) = config.table {
1455            format!("realtime:{}:{}:{}", config.schema, table, channel)
1456        } else {
1457            format!("realtime:{}", channel)
1458        };
1459
1460        // Build filter string from advanced filters
1461        let mut filter_parts = Vec::new();
1462
1463        if let Some(ref simple_filter) = config.filter {
1464            filter_parts.push(simple_filter.clone());
1465        }
1466
1467        for advanced_filter in &config.advanced_filters {
1468            let filter_str = match &advanced_filter.value {
1469                serde_json::Value::String(s) => format!(
1470                    "{}={}. {}",
1471                    advanced_filter.column,
1472                    serde_json::to_string(&advanced_filter.operator)?.trim_matches('"'),
1473                    s
1474                ),
1475                serde_json::Value::Array(arr) => {
1476                    let values: Vec<String> = arr
1477                        .iter()
1478                        .map(|v| v.to_string().trim_matches('"').to_string())
1479                        .collect();
1480                    format!(
1481                        "{}={}.({})",
1482                        advanced_filter.column,
1483                        serde_json::to_string(&advanced_filter.operator)?.trim_matches('"'),
1484                        values.join(",")
1485                    )
1486                }
1487                other => format!(
1488                    "{}={}. {}",
1489                    advanced_filter.column,
1490                    serde_json::to_string(&advanced_filter.operator)?.trim_matches('"'),
1491                    other.to_string().trim_matches('"')
1492                ),
1493            };
1494            filter_parts.push(filter_str);
1495        }
1496
1497        let combined_filter = if !filter_parts.is_empty() {
1498            Some(filter_parts.join(" and "))
1499        } else {
1500            None
1501        };
1502
1503        let subscription = Subscription {
1504            id: subscription_id.clone(),
1505            topic: topic.clone(),
1506            config: SubscriptionConfig {
1507                filter: combined_filter,
1508                ..config.clone()
1509            },
1510            callback: Arc::new(callback),
1511        };
1512
1513        // Store subscription
1514        {
1515            let mut subscriptions = self.connection_manager.subscriptions.write().await;
1516            subscriptions.insert(subscription_id.clone(), subscription);
1517        }
1518
1519        // Send join message
1520        let ref_id = self
1521            .connection_manager
1522            .ref_counter
1523            .fetch_add(1, Ordering::SeqCst)
1524            .to_string();
1525
1526        let mut join_payload = serde_json::json!({
1527            "config": {
1528                "postgres_changes": [{
1529                    "event": config.event.unwrap_or(RealtimeEvent::All),
1530                    "schema": config.schema,
1531                }]
1532            }
1533        });
1534
1535        if let Some(ref table) = config.table {
1536            join_payload["config"]["postgres_changes"][0]["table"] =
1537                serde_json::Value::String(table.clone());
1538        }
1539
1540        if let Some(ref filter) = config.filter {
1541            join_payload["config"]["postgres_changes"][0]["filter"] =
1542                serde_json::Value::String(filter.clone());
1543        }
1544
1545        // Add presence configuration
1546        if config.enable_presence {
1547            join_payload["config"]["presence"] = serde_json::json!({ "key": "" });
1548        }
1549
1550        // Add broadcast configuration
1551        if config.enable_broadcast {
1552            join_payload["config"]["broadcast"] = serde_json::json!({ "self": true });
1553        }
1554
1555        let join_message = RealtimeProtocolMessage {
1556            topic: topic.clone(),
1557            event: "phx_join".to_string(),
1558            payload: join_payload,
1559            ref_id,
1560        };
1561
1562        let mut connection_guard = self.connection_manager.connection.write().await;
1563        if let Some(ref mut connection) = *connection_guard {
1564            let message_json = serde_json::to_string(&join_message)
1565                .map_err(|e| Error::realtime(format!("Failed to serialize join message: {}", e)))?;
1566
1567            connection.send(&message_json).await?;
1568            info!("Advanced subscription created: {}", subscription_id);
1569        } else {
1570            return Err(Error::realtime("Not connected to realtime server"));
1571        }
1572
1573        Ok(subscription_id)
1574    }
1575}
1576
1577/// Builder for channel subscriptions
1578///
1579/// Provides a fluent API for configuring realtime subscriptions.
1580///
1581/// # Examples
1582/// ```rust,no_run
1583/// # use supabase::Client;
1584/// # use supabase::realtime::RealtimeEvent;
1585/// # async fn example() -> supabase::Result<()> {
1586/// let client = Client::new("your-url", "your-key")?;
1587///
1588/// let subscription = client.realtime()
1589///     .channel("public-posts")
1590///     .table("posts")
1591///     .event(RealtimeEvent::Insert)
1592///     .filter("author_id=eq.123")
1593///     .subscribe(|message| {
1594///         println!("New post by author 123: {:?}", message);
1595///     })
1596///     .await?;
1597/// # Ok(())
1598/// # }
1599/// ```
1600#[cfg(feature = "realtime")]
1601pub struct ChannelBuilder {
1602    realtime: Realtime,
1603    config: SubscriptionConfig,
1604}
1605
1606#[cfg(feature = "realtime")]
1607impl ChannelBuilder {
1608    /// Set the table to subscribe to
1609    ///
1610    /// # Examples
1611    /// ```rust,no_run
1612    /// # use supabase::Client;
1613    /// # async fn example() -> supabase::Result<()> {
1614    /// let client = Client::new("your-url", "your-key")?;
1615    ///
1616    /// let subscription = client.realtime()
1617    ///     .channel("posts")
1618    ///     .table("posts") // Subscribe to the 'posts' table
1619    ///     .subscribe(|_| {})
1620    ///     .await?;
1621    /// # Ok(())
1622    /// # }
1623    /// ```
1624    pub fn table(mut self, table: &str) -> Self {
1625        self.config.table = Some(table.to_string());
1626        self
1627    }
1628
1629    /// Set the schema (default: "public")
1630    ///
1631    /// # Examples
1632    /// ```rust,no_run
1633    /// # use supabase::Client;
1634    /// # async fn example() -> supabase::Result<()> {
1635    /// let client = Client::new("your-url", "your-key")?;
1636    ///
1637    /// let subscription = client.realtime()
1638    ///     .channel("admin-logs")
1639    ///     .schema("admin") // Subscribe to 'admin' schema
1640    ///     .table("logs")
1641    ///     .subscribe(|_| {})
1642    ///     .await?;
1643    /// # Ok(())
1644    /// # }
1645    /// ```
1646    pub fn schema(mut self, schema: &str) -> Self {
1647        self.config.schema = schema.to_string();
1648        self
1649    }
1650
1651    /// Set the event type filter
1652    ///
1653    /// # Examples
1654    /// ```rust,no_run
1655    /// # use supabase::Client;
1656    /// # use supabase::realtime::RealtimeEvent;
1657    /// # async fn example() -> supabase::Result<()> {
1658    /// let client = Client::new("your-url", "your-key")?;
1659    ///
1660    /// // Only listen to INSERT events
1661    /// let subscription = client.realtime()
1662    ///     .channel("new-posts")
1663    ///     .table("posts")
1664    ///     .event(RealtimeEvent::Insert)
1665    ///     .subscribe(|_| {})
1666    ///     .await?;
1667    /// # Ok(())
1668    /// # }
1669    /// ```
1670    pub fn event(mut self, event: RealtimeEvent) -> Self {
1671        self.config.event = Some(event);
1672        self
1673    }
1674
1675    /// Set a filter for the subscription
1676    ///
1677    /// # Examples
1678    /// ```rust,no_run
1679    /// # use supabase::Client;
1680    /// # async fn example() -> supabase::Result<()> {
1681    /// let client = Client::new("your-url", "your-key")?;
1682    ///
1683    /// // Only posts by specific author
1684    /// let subscription = client.realtime()
1685    ///     .channel("my-posts")
1686    ///     .table("posts")
1687    ///     .filter("author_id=eq.123")
1688    ///     .subscribe(|_| {})
1689    ///     .await?;
1690    /// # Ok(())
1691    /// # }
1692    /// ```
1693    pub fn filter(mut self, filter: &str) -> Self {
1694        self.config.filter = Some(filter.to_string());
1695        self
1696    }
1697
1698    /// Subscribe with a callback function
1699    ///
1700    /// # Examples
1701    /// ```rust,no_run
1702    /// # use supabase::Client;
1703    /// # async fn example() -> supabase::Result<()> {
1704    /// let client = Client::new("your-url", "your-key")?;
1705    ///
1706    /// let subscription_id = client.realtime()
1707    ///     .channel("posts")
1708    ///     .table("posts")
1709    ///     .subscribe(|message| {
1710    ///         match message.event.as_str() {
1711    ///             "INSERT" => println!("New post created!"),
1712    ///             "UPDATE" => println!("Post updated!"),
1713    ///             "DELETE" => println!("Post deleted!"),
1714    ///             _ => println!("Other event: {}", message.event),
1715    ///         }
1716    ///     })
1717    ///     .await?;
1718    /// # Ok(())
1719    /// # }
1720    /// ```
1721    #[cfg(not(target_arch = "wasm32"))]
1722    pub async fn subscribe<F>(self, callback: F) -> Result<String>
1723    where
1724        F: Fn(RealtimeMessage) + Send + Sync + 'static,
1725    {
1726        self.realtime.subscribe(self.config, callback).await
1727    }
1728
1729    /// Subscribe with a callback function (WASM version)
1730    #[cfg(target_arch = "wasm32")]
1731    pub async fn subscribe<F>(self, callback: F) -> Result<String>
1732    where
1733        F: Fn(RealtimeMessage) + 'static,
1734    {
1735        self.realtime.subscribe(self.config, callback).await
1736    }
1737}
1738
1739#[cfg(all(test, feature = "realtime"))]
1740mod tests {
1741    use super::*;
1742    use std::sync::atomic::{AtomicBool, Ordering};
1743    use std::sync::Arc;
1744
1745    #[tokio::test]
1746    async fn test_realtime_creation() {
1747        let config = Arc::new(SupabaseConfig {
1748            url: "https://test.supabase.co".to_string(),
1749            key: "test-key".to_string(),
1750            ..Default::default()
1751        });
1752
1753        let realtime = Realtime::new(config).unwrap();
1754        assert!(!realtime.is_connected().await);
1755    }
1756
1757    #[tokio::test]
1758    async fn test_subscription_config_default() {
1759        let config = SubscriptionConfig::default();
1760        assert_eq!(config.schema, "public");
1761        assert!(config.table.is_none());
1762        assert!(config.event.is_none());
1763        assert!(config.filter.is_none());
1764    }
1765
1766    #[tokio::test]
1767    async fn test_realtime_event_serialization() {
1768        use serde_json;
1769
1770        let event = RealtimeEvent::Insert;
1771        let serialized = serde_json::to_string(&event).unwrap();
1772        assert_eq!(serialized, "\"INSERT\"");
1773
1774        let event = RealtimeEvent::All;
1775        let serialized = serde_json::to_string(&event).unwrap();
1776        assert_eq!(serialized, "\"*\"");
1777    }
1778
1779    #[tokio::test]
1780    async fn test_build_topic() {
1781        let config = Arc::new(SupabaseConfig {
1782            url: "https://test.supabase.co".to_string(),
1783            key: "test-key".to_string(),
1784            ..Default::default()
1785        });
1786
1787        let realtime = Realtime::new(config).unwrap();
1788
1789        // Test with table
1790        let subscription_config = SubscriptionConfig {
1791            table: Some("posts".to_string()),
1792            schema: "public".to_string(),
1793            event: None,
1794            filter: None,
1795            ..Default::default()
1796        };
1797        let topic = realtime.build_topic(&subscription_config);
1798        assert_eq!(topic, "realtime:public:posts");
1799
1800        // Test without table
1801        let subscription_config = SubscriptionConfig {
1802            table: None,
1803            schema: "admin".to_string(),
1804            event: None,
1805            filter: None,
1806            ..Default::default()
1807        };
1808        let topic = realtime.build_topic(&subscription_config);
1809        assert_eq!(topic, "realtime:admin");
1810    }
1811
1812    #[tokio::test]
1813    async fn test_topic_matching() {
1814        // Exact match
1815        assert!(Realtime::topic_matches(
1816            "realtime:public:posts",
1817            "realtime:public:posts"
1818        ));
1819
1820        // Prefix match
1821        assert!(Realtime::topic_matches(
1822            "realtime:public",
1823            "realtime:public:posts"
1824        ));
1825
1826        // No match
1827        assert!(!Realtime::topic_matches(
1828            "realtime:public:users",
1829            "realtime:public:posts"
1830        ));
1831    }
1832
1833    #[tokio::test]
1834    async fn test_realtime_message_parsing() {
1835        let json = r#"{
1836            "event": "INSERT",
1837            "payload": {
1838                "record": {"id": 1, "title": "Test"},
1839                "schema": "public",
1840                "table": "posts"
1841            },
1842            "topic": "realtime:public:posts"
1843        }"#;
1844
1845        let message = serde_json::from_str::<RealtimeMessage>(json);
1846        assert!(message.is_ok());
1847
1848        let message = message.unwrap();
1849        assert_eq!(message.event, "INSERT");
1850        assert_eq!(message.topic, "realtime:public:posts");
1851        assert!(message.payload.record.is_some());
1852    }
1853
1854    #[tokio::test]
1855    async fn test_channel_builder() {
1856        let config = Arc::new(SupabaseConfig {
1857            url: "https://test.supabase.co".to_string(),
1858            key: "test-key".to_string(),
1859            ..Default::default()
1860        });
1861
1862        let realtime = Realtime::new(config).unwrap();
1863        let builder = realtime.channel("test");
1864
1865        // Test builder methods
1866        let builder = builder
1867            .table("posts")
1868            .schema("public")
1869            .event(RealtimeEvent::Insert)
1870            .filter("author_id=eq.123");
1871
1872        assert_eq!(builder.config.table, Some("posts".to_string()));
1873        assert_eq!(builder.config.schema, "public");
1874        assert_eq!(builder.config.event, Some(RealtimeEvent::Insert));
1875        assert_eq!(builder.config.filter, Some("author_id=eq.123".to_string()));
1876    }
1877
1878    #[cfg(not(target_arch = "wasm32"))] // This test requires native tokio
1879    #[tokio::test]
1880    async fn test_subscription_callback() {
1881        let config = Arc::new(SupabaseConfig {
1882            url: "https://test.supabase.co".to_string(),
1883            key: "test-key".to_string(),
1884            ..Default::default()
1885        });
1886
1887        let realtime = Realtime::new(config).unwrap();
1888
1889        // Test that subscription creation works without connecting
1890        let called = Arc::new(AtomicBool::new(false));
1891        let called_clone = Arc::clone(&called);
1892
1893        let subscription_config = SubscriptionConfig {
1894            table: Some("test".to_string()),
1895            schema: "public".to_string(),
1896            event: Some(RealtimeEvent::All),
1897            filter: None,
1898            ..Default::default()
1899        };
1900
1901        // This will fail because we're not connected, but that's expected
1902        let result = realtime
1903            .subscribe(subscription_config, move |_msg| {
1904                called_clone.store(true, Ordering::SeqCst);
1905            })
1906            .await;
1907
1908        // Should fail due to no connection
1909        assert!(result.is_err());
1910        assert!(!called.load(Ordering::SeqCst));
1911    }
1912
1913    #[tokio::test]
1914    async fn test_protocol_message_serialization() {
1915        let message = RealtimeProtocolMessage {
1916            topic: "realtime:public:posts".to_string(),
1917            event: "phx_join".to_string(),
1918            payload: serde_json::json!({"table": "posts"}),
1919            ref_id: "123".to_string(),
1920        };
1921
1922        let serialized = serde_json::to_string(&message).unwrap();
1923        assert!(serialized.contains("phx_join"));
1924        assert!(serialized.contains("realtime:public:posts"));
1925        assert!(serialized.contains("posts"));
1926    }
1927
1928    #[tokio::test]
1929    async fn test_event_filter_matching() {
1930        // Test INSERT event matching
1931        let insert_event = Some(RealtimeEvent::Insert);
1932        let update_event = Some(RealtimeEvent::Update);
1933        let all_event = Some(RealtimeEvent::All);
1934
1935        // INSERT should match INSERT
1936        assert_eq!(insert_event, Some(RealtimeEvent::Insert));
1937
1938        // INSERT should not match UPDATE
1939        assert_ne!(insert_event, update_event);
1940
1941        // ALL should match ALL
1942        assert_eq!(all_event, Some(RealtimeEvent::All));
1943    }
1944}