1#[cfg(feature = "realtime")]
37use crate::{
38 async_runtime::{AsyncLock, RuntimeLock},
39 error::{Error, Result},
40 types::SupabaseConfig,
41 websocket::{create_websocket, WebSocketConnection},
42};
43
44#[cfg(feature = "realtime")]
45use serde::{Deserialize, Serialize};
46
47#[cfg(feature = "realtime")]
48use std::{
49 collections::HashMap,
50 sync::{
51 atomic::{AtomicBool, AtomicU64, Ordering},
52 Arc,
53 },
54 time::Duration,
55};
56
57#[cfg(feature = "realtime")]
58use tracing::{debug, error, info, warn};
59
60#[cfg(feature = "realtime")]
61use uuid::Uuid;
62
63#[cfg(feature = "realtime")]
65pub type ConnectionStorage = Arc<RuntimeLock<Vec<Option<Box<dyn WebSocketConnection>>>>>;
66
67#[cfg(feature = "realtime")]
92#[derive(Debug, Clone)]
93pub struct Realtime {
94 connection_manager: Arc<ConnectionManager>,
95 message_loop_handle: Arc<AtomicBool>,
96}
97
98#[cfg(feature = "realtime")]
100struct ConnectionManager {
101 url: String,
102 api_key: String,
103 connection: RuntimeLock<Option<Box<dyn WebSocketConnection>>>,
104 ref_counter: AtomicU64,
105 subscriptions: RuntimeLock<HashMap<String, Subscription>>,
106 is_message_loop_running: AtomicBool,
107}
108
109#[cfg(feature = "realtime")]
110impl std::fmt::Debug for ConnectionManager {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.debug_struct("ConnectionManager")
113 .field("url", &self.url)
114 .field("api_key", &"[REDACTED]")
115 .field("ref_counter", &self.ref_counter)
116 .field("connection", &"<WebSocket connection>")
117 .field("subscriptions", &"<subscriptions>")
118 .finish()
119 }
120}
121
122#[cfg(feature = "realtime")]
124#[derive(Clone)]
125pub struct Subscription {
126 pub id: String,
127 pub topic: String,
128 pub config: SubscriptionConfig,
129 #[cfg(not(target_arch = "wasm32"))]
130 pub callback: Arc<dyn Fn(RealtimeMessage) + Send + Sync>,
131 #[cfg(target_arch = "wasm32")]
132 pub callback: Arc<dyn Fn(RealtimeMessage)>,
133}
134
135#[cfg(feature = "realtime")]
136impl std::fmt::Debug for Subscription {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 f.debug_struct("Subscription")
139 .field("id", &self.id)
140 .field("topic", &self.topic)
141 .field("config", &self.config)
142 .field("callback", &"<callback fn>")
143 .finish()
144 }
145}
146
147#[cfg(feature = "realtime")]
173#[derive(Clone)]
174pub struct SubscriptionConfig {
175 pub table: Option<String>,
176 pub schema: String,
177 pub event: Option<RealtimeEvent>,
178 pub filter: Option<String>,
179 pub advanced_filters: Vec<AdvancedFilter>,
180 pub enable_presence: bool,
181 pub enable_broadcast: bool,
182 #[cfg(not(target_arch = "wasm32"))]
183 pub presence_callback: Option<PresenceCallback>,
184 #[cfg(target_arch = "wasm32")]
185 pub presence_callback: Option<Arc<dyn Fn(PresenceEvent)>>,
186 #[cfg(not(target_arch = "wasm32"))]
187 pub broadcast_callback: Option<BroadcastCallback>,
188 #[cfg(target_arch = "wasm32")]
189 pub broadcast_callback: Option<Arc<dyn Fn(BroadcastMessage)>>,
190}
191
192#[cfg(feature = "realtime")]
193impl std::fmt::Debug for SubscriptionConfig {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 f.debug_struct("SubscriptionConfig")
196 .field("table", &self.table)
197 .field("schema", &self.schema)
198 .field("event", &self.event)
199 .field("filter", &self.filter)
200 .field("advanced_filters", &self.advanced_filters)
201 .field("enable_presence", &self.enable_presence)
202 .field("enable_broadcast", &self.enable_broadcast)
203 .field("presence_callback", &"<callback fn>")
204 .field("broadcast_callback", &"<callback fn>")
205 .finish()
206 }
207}
208
209#[cfg(feature = "realtime")]
210impl Default for SubscriptionConfig {
211 fn default() -> Self {
212 Self {
213 table: None,
214 schema: "public".to_string(),
215 event: None,
216 filter: None,
217 advanced_filters: Vec::new(),
218 enable_presence: false,
219 enable_broadcast: false,
220 presence_callback: None,
221 broadcast_callback: None,
222 }
223 }
224}
225
226#[cfg(feature = "realtime")]
239#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
240pub enum RealtimeEvent {
241 #[serde(rename = "INSERT")]
242 Insert,
243 #[serde(rename = "UPDATE")]
244 Update,
245 #[serde(rename = "DELETE")]
246 Delete,
247 #[serde(rename = "*")]
248 All,
249}
250
251#[cfg(feature = "realtime")]
255#[derive(Debug, Clone, Serialize, Deserialize)]
256pub struct RealtimeMessage {
257 pub event: String,
258 pub payload: RealtimePayload,
259 pub ref_id: Option<String>,
260 pub topic: String,
261}
262
263#[cfg(feature = "realtime")]
267#[derive(Debug, Clone, Serialize, Deserialize)]
268pub struct RealtimePayload {
269 pub record: Option<serde_json::Value>,
270 pub old_record: Option<serde_json::Value>,
271 pub schema: Option<String>,
272 pub table: Option<String>,
273 pub commit_timestamp: Option<String>,
274 pub event_type: Option<String>,
275 pub new: Option<serde_json::Value>,
276 pub old: Option<serde_json::Value>,
277}
278
279#[cfg(feature = "realtime")]
281#[derive(Debug, Serialize)]
282struct RealtimeProtocolMessage {
283 topic: String,
284 event: String,
285 payload: serde_json::Value,
286 ref_id: String,
287}
288
289#[cfg(feature = "realtime")]
291#[derive(Debug, Clone, Serialize, Deserialize)]
292pub struct PresenceState {
293 pub user_id: String,
294 pub online_at: String,
295 pub metadata: Option<HashMap<String, serde_json::Value>>,
296}
297
298#[cfg(feature = "realtime")]
300#[derive(Debug, Clone, Serialize, Deserialize)]
301pub struct PresenceEvent {
302 pub event_type: PresenceEventType,
303 pub user_id: String,
304 pub presence_state: PresenceState,
305}
306
307#[cfg(feature = "realtime")]
309#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
310pub enum PresenceEventType {
311 #[serde(rename = "presence_state")]
312 Join,
313 #[serde(rename = "presence_diff")]
314 Leave,
315}
316
317#[cfg(feature = "realtime")]
319pub type PresenceCallback = Arc<dyn Fn(PresenceEvent) + Send + Sync>;
320
321#[cfg(feature = "realtime")]
323#[derive(Debug, Clone, Serialize, Deserialize)]
324pub struct BroadcastMessage {
325 pub event: String,
326 pub payload: serde_json::Value,
327 pub from_user_id: Option<String>,
328 pub timestamp: String,
329}
330
331#[cfg(feature = "realtime")]
333pub type BroadcastCallback = Arc<dyn Fn(BroadcastMessage) + Send + Sync>;
334
335#[cfg(feature = "realtime")]
337#[derive(Debug, Clone)]
338pub struct AdvancedFilter {
339 pub column: String,
340 pub operator: FilterOperator,
341 pub value: serde_json::Value,
342}
343
344#[cfg(feature = "realtime")]
346#[derive(Debug, Clone, Serialize, Deserialize)]
347pub enum FilterOperator {
348 #[serde(rename = "eq")]
349 Equal,
350 #[serde(rename = "neq")]
351 NotEqual,
352 #[serde(rename = "gt")]
353 GreaterThan,
354 #[serde(rename = "gte")]
355 GreaterThanOrEqual,
356 #[serde(rename = "lt")]
357 LessThan,
358 #[serde(rename = "lte")]
359 LessThanOrEqual,
360 #[serde(rename = "in")]
361 In,
362 #[serde(rename = "is")]
363 Is,
364 #[serde(rename = "like")]
365 Like,
366 #[serde(rename = "ilike")]
367 ILike,
368 #[serde(rename = "match")]
369 Match,
370 #[serde(rename = "imatch")]
371 IMatch,
372}
373
374#[cfg(feature = "realtime")]
376#[derive(Debug, Clone)]
377pub struct ConnectionPoolConfig {
378 pub max_connections: usize,
380 pub connection_timeout: u64,
382 pub keep_alive_interval: u64,
384 pub reconnect_delay: u64,
386 pub max_reconnect_attempts: u32,
388}
389
390impl Default for ConnectionPoolConfig {
391 fn default() -> Self {
392 Self {
393 max_connections: 10,
394 connection_timeout: 30,
395 keep_alive_interval: 30,
396 reconnect_delay: 1000,
397 max_reconnect_attempts: 5,
398 }
399 }
400}
401
402#[cfg(feature = "realtime")]
404pub struct ConnectionPool {
405 config: ConnectionPoolConfig,
406 connections: ConnectionStorage,
407 active_connections: Arc<AtomicU64>,
408}
409
410#[cfg(feature = "realtime")]
411impl std::fmt::Debug for ConnectionPool {
412 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
413 f.debug_struct("ConnectionPool")
414 .field("config", &self.config)
415 .field("active_connections", &self.active_connections)
416 .finish()
417 }
418}
419
420#[cfg(feature = "realtime")]
421impl ConnectionPool {
422 pub fn new(config: ConnectionPoolConfig) -> Self {
424 let mut connections = Vec::new();
425 connections.resize_with(config.max_connections, || None);
426
427 Self {
428 config,
429 connections: Arc::new(RuntimeLock::new(connections)),
430 active_connections: Arc::new(AtomicU64::new(0)),
431 }
432 }
433
434 pub async fn get_connection(&self) -> Result<Option<Box<dyn WebSocketConnection>>> {
436 let mut connections = self.connections.write().await;
437
438 for connection_slot in connections.iter_mut() {
439 if let Some(connection) = connection_slot.take() {
440 if connection.is_connected() {
441 debug!("Reusing existing connection from pool");
442 return Ok(Some(connection));
443 }
444 }
445 }
446
447 for connection_slot in connections.iter_mut() {
449 if connection_slot.is_none() {
450 let new_connection = crate::websocket::create_websocket();
451 *connection_slot = Some(new_connection);
452 self.active_connections
453 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
454 debug!("Created new connection in pool");
455 return Ok(connection_slot.take());
456 }
457 }
458
459 debug!("Connection pool is full");
460 Ok(None)
461 }
462
463 pub async fn return_connection(&self, connection: Box<dyn WebSocketConnection>) {
465 let mut connections = self.connections.write().await;
466
467 for connection_slot in connections.iter_mut() {
468 if connection_slot.is_none() {
469 *connection_slot = Some(connection);
470 debug!("Returned connection to pool");
471 return;
472 }
473 }
474
475 warn!("Pool is full, dropping connection");
477 }
478
479 pub async fn get_stats(&self) -> ConnectionPoolStats {
481 let connections = self.connections.read().await;
482 let total = connections.len();
483 let active = connections.iter().filter(|c| c.is_some()).count();
484 let available = connections
485 .iter()
486 .filter(|c| c.as_ref().is_some_and(|conn| conn.is_connected()))
487 .count();
488
489 ConnectionPoolStats {
490 total_connections: total,
491 active_connections: active,
492 available_connections: available,
493 max_connections: self.config.max_connections,
494 }
495 }
496
497 pub async fn close_all(&self) -> Result<()> {
499 let mut connections = self.connections.write().await;
500
501 for connection_slot in connections.iter_mut() {
502 if let Some(mut connection) = connection_slot.take() {
503 connection.close().await?;
504 }
505 }
506
507 self.active_connections
508 .store(0, std::sync::atomic::Ordering::SeqCst);
509 info!("Closed all connections in pool");
510 Ok(())
511 }
512}
513
514#[cfg(feature = "realtime")]
516#[derive(Debug, Clone)]
517pub struct ConnectionPoolStats {
518 pub total_connections: usize,
519 pub active_connections: usize,
520 pub available_connections: usize,
521 pub max_connections: usize,
522}
523
524#[cfg(feature = "realtime")]
525impl Realtime {
526 pub fn new(config: Arc<SupabaseConfig>) -> Result<Self> {
543 debug!("Creating realtime client");
544
545 let ws_url = config
546 .url
547 .replace("http://", "ws://")
548 .replace("https://", "wss://");
549 let realtime_url = format!("{}/realtime/v1/websocket", ws_url);
550
551 let connection_manager = Arc::new(ConnectionManager {
552 url: realtime_url,
553 api_key: config.key.clone(),
554 connection: RuntimeLock::new(None),
555 ref_counter: AtomicU64::new(0),
556 subscriptions: RuntimeLock::new(HashMap::new()),
557 is_message_loop_running: AtomicBool::new(false),
558 });
559
560 let message_loop_handle = Arc::new(AtomicBool::new(false));
561
562 Ok(Self {
563 connection_manager,
564 message_loop_handle,
565 })
566 }
567
568 pub async fn connect(&self) -> Result<()> {
583 debug!("Connecting to realtime server");
584
585 let mut connection_guard = self.connection_manager.connection.write().await;
586
587 if let Some(ref conn) = *connection_guard {
588 if conn.is_connected() {
589 debug!("Already connected to realtime server");
590 return Ok(());
591 }
592 }
593
594 let mut connection = create_websocket();
595 let url = format!(
596 "{}?apikey={}&vsn=1.0.0",
597 self.connection_manager.url, self.connection_manager.api_key
598 );
599
600 connection.connect(&url).await?;
601 *connection_guard = Some(connection);
602
603 self.start_message_loop().await?;
605
606 info!("Connected to realtime server");
607 Ok(())
608 }
609
610 pub async fn disconnect(&self) -> Result<()> {
626 debug!("Disconnecting from realtime server");
627
628 self.message_loop_handle.store(false, Ordering::SeqCst);
630 self.connection_manager
631 .is_message_loop_running
632 .store(false, Ordering::SeqCst);
633
634 let mut connection_guard = self.connection_manager.connection.write().await;
635 if let Some(ref mut connection) = *connection_guard {
636 connection.close().await?;
637 }
638 *connection_guard = None;
639
640 let mut subscriptions = self.connection_manager.subscriptions.write().await;
642 subscriptions.clear();
643
644 info!("Disconnected from realtime server");
645 Ok(())
646 }
647
648 pub async fn is_connected(&self) -> bool {
664 let connection_guard = self.connection_manager.connection.read().await;
665 if let Some(ref conn) = *connection_guard {
666 conn.is_connected()
667 } else {
668 false
669 }
670 }
671
672 pub fn channel(&self, _topic: &str) -> ChannelBuilder {
689 ChannelBuilder {
690 realtime: self.clone(),
691 config: SubscriptionConfig::default(),
692 }
693 }
694
695 pub async fn unsubscribe(&self, subscription_id: &str) -> Result<()> {
716 debug!("Unsubscribing from subscription: {}", subscription_id);
717
718 let mut subscriptions = self.connection_manager.subscriptions.write().await;
719 if let Some(subscription) = subscriptions.remove(subscription_id) {
720 self.send_leave_message(&subscription.topic).await?;
722 info!("Unsubscribed from subscription: {}", subscription_id);
723 } else {
724 warn!("Subscription {} not found for unsubscribe", subscription_id);
725 }
726
727 Ok(())
728 }
729
730 #[cfg(not(target_arch = "wasm32"))]
732 pub async fn subscribe<F>(
733 &self,
734 subscription_config: SubscriptionConfig,
735 callback: F,
736 ) -> Result<String>
737 where
738 F: Fn(RealtimeMessage) + Send + Sync + 'static,
739 {
740 let subscription_id = Uuid::new_v4().to_string();
741 let topic = self.build_topic(&subscription_config);
742
743 debug!(
744 "Creating subscription {} for topic {}",
745 subscription_id, topic
746 );
747
748 self.connect().await?;
750
751 self.send_join_message(&topic, &subscription_config).await?;
753
754 let subscription = Subscription {
756 id: subscription_id.clone(),
757 topic: topic.clone(),
758 config: subscription_config,
759 callback: Arc::new(callback),
760 };
761
762 let mut subscriptions = self.connection_manager.subscriptions.write().await;
763 subscriptions.insert(subscription_id.clone(), subscription);
764
765 info!("Subscribed to topic {} with ID {}", topic, subscription_id);
766 Ok(subscription_id)
767 }
768
769 #[cfg(target_arch = "wasm32")]
771 pub async fn subscribe<F>(
772 &self,
773 subscription_config: SubscriptionConfig,
774 callback: F,
775 ) -> Result<String>
776 where
777 F: Fn(RealtimeMessage) + 'static,
778 {
779 let subscription_id = Uuid::new_v4().to_string();
780 let topic = self.build_topic(&subscription_config);
781
782 debug!(
783 "Creating subscription {} for topic {}",
784 subscription_id, topic
785 );
786
787 self.connect().await?;
789
790 self.send_join_message(&topic, &subscription_config).await?;
792
793 let subscription = Subscription {
795 id: subscription_id.clone(),
796 topic: topic.clone(),
797 config: subscription_config,
798 callback: Arc::new(callback),
799 };
800
801 let mut subscriptions = self.connection_manager.subscriptions.write().await;
802 subscriptions.insert(subscription_id.clone(), subscription);
803
804 info!("Subscribed to topic {} with ID {}", topic, subscription_id);
805 Ok(subscription_id)
806 }
807
808 fn build_topic(&self, config: &SubscriptionConfig) -> String {
810 if let Some(ref table) = config.table {
811 format!("realtime:{}:{}", config.schema, table)
812 } else {
813 format!("realtime:{}", config.schema)
814 }
815 }
816
817 async fn send_join_message(&self, topic: &str, config: &SubscriptionConfig) -> Result<()> {
819 let mut payload = serde_json::Map::new();
820
821 if let Some(ref table) = config.table {
822 payload.insert(
823 "table".to_string(),
824 serde_json::Value::String(table.clone()),
825 );
826 }
827
828 if let Some(ref event) = config.event {
829 let event_str = match event {
830 RealtimeEvent::Insert => "INSERT",
831 RealtimeEvent::Update => "UPDATE",
832 RealtimeEvent::Delete => "DELETE",
833 RealtimeEvent::All => "*",
834 };
835 payload.insert(
836 "event".to_string(),
837 serde_json::Value::String(event_str.to_string()),
838 );
839 }
840
841 if let Some(ref filter) = config.filter {
842 payload.insert(
843 "filter".to_string(),
844 serde_json::Value::String(filter.clone()),
845 );
846 }
847
848 let message = RealtimeProtocolMessage {
849 topic: topic.to_string(),
850 event: "phx_join".to_string(),
851 payload: serde_json::Value::Object(payload),
852 ref_id: Uuid::new_v4().to_string(),
853 };
854
855 self.send_message(&message).await
856 }
857
858 async fn send_leave_message(&self, topic: &str) -> Result<()> {
860 let message = RealtimeProtocolMessage {
861 topic: topic.to_string(),
862 event: "phx_leave".to_string(),
863 payload: serde_json::Value::Object(serde_json::Map::new()),
864 ref_id: Uuid::new_v4().to_string(),
865 };
866
867 self.send_message(&message).await
868 }
869
870 async fn send_message(&self, message: &RealtimeProtocolMessage) -> Result<()> {
872 let message_json = serde_json::to_string(message)?;
873
874 let mut connection_guard = self.connection_manager.connection.write().await;
875 if let Some(ref mut connection) = *connection_guard {
876 connection.send(&message_json).await?;
877 debug!("Sent realtime message: {}", message_json);
878 } else {
879 return Err(Error::realtime("Not connected to realtime server"));
880 }
881
882 Ok(())
883 }
884
885 async fn start_message_loop(&self) -> Result<()> {
887 if self
888 .connection_manager
889 .is_message_loop_running
890 .load(Ordering::SeqCst)
891 {
892 debug!("Message loop already running");
893 return Ok(());
894 }
895
896 self.connection_manager
897 .is_message_loop_running
898 .store(true, Ordering::SeqCst);
899 self.message_loop_handle.store(true, Ordering::SeqCst);
900
901 let connection_manager = Arc::clone(&self.connection_manager);
902 let loop_handle = Arc::clone(&self.message_loop_handle);
903
904 #[cfg(not(target_arch = "wasm32"))]
906 {
907 let connection_manager = Arc::clone(&connection_manager);
908 let loop_handle = Arc::clone(&loop_handle);
909
910 tokio::spawn(async move {
911 Self::message_loop(connection_manager, loop_handle).await;
912 });
913 }
914
915 #[cfg(target_arch = "wasm32")]
916 {
917 wasm_bindgen_futures::spawn_local(async move {
919 Self::message_loop(connection_manager, loop_handle).await;
920 });
921 }
922
923 info!("Started realtime message loop");
924 Ok(())
925 }
926
927 async fn message_loop(
929 connection_manager: Arc<ConnectionManager>,
930 loop_handle: Arc<AtomicBool>,
931 ) {
932 debug!("Starting realtime message loop");
933
934 while loop_handle.load(Ordering::SeqCst) {
935 let message = {
937 let mut connection_guard = connection_manager.connection.write().await;
938
939 if let Some(ref mut connection) = *connection_guard {
940 if !connection.is_connected() {
941 debug!("Connection lost, stopping message loop");
942 break;
943 }
944
945 match connection.receive().await {
946 Ok(Some(msg)) => Some(msg),
947 Ok(None) => None,
948 Err(e) => {
949 error!("Error receiving message: {}", e);
950 None
951 }
952 }
953 } else {
954 debug!("No connection available, stopping message loop");
955 break;
956 }
957 };
958
959 if let Some(message_str) = message {
960 debug!("Received realtime message: {}", message_str);
961
962 match serde_json::from_str::<RealtimeMessage>(&message_str) {
964 Ok(realtime_message) => {
965 Self::process_message(&connection_manager, realtime_message).await;
967 }
968 Err(e) => {
969 debug!(
970 "Failed to parse realtime message: {} - Error: {}",
971 message_str, e
972 );
973 if let Ok(_protocol_msg) =
975 serde_json::from_str::<serde_json::Value>(&message_str)
976 {
977 debug!("Received protocol message, ignoring for now");
978 }
979 }
980 }
981 }
982
983 #[cfg(not(target_arch = "wasm32"))]
985 tokio::time::sleep(Duration::from_millis(10)).await;
986
987 #[cfg(target_arch = "wasm32")]
988 {
989 use wasm_bindgen::prelude::*;
991 use wasm_bindgen_futures::JsFuture;
992
993 let promise = js_sys::Promise::new(&mut |resolve, _| {
994 web_sys::window()
995 .unwrap()
996 .set_timeout_with_callback_and_timeout_and_arguments_0(&resolve, 10)
997 .unwrap();
998 });
999 let _ = JsFuture::from(promise).await;
1000 }
1001 }
1002
1003 connection_manager
1004 .is_message_loop_running
1005 .store(false, Ordering::SeqCst);
1006 debug!("Realtime message loop stopped");
1007 }
1008
1009 async fn process_message(
1011 connection_manager: &Arc<ConnectionManager>,
1012 message: RealtimeMessage,
1013 ) {
1014 debug!("Processing message for topic: {}", message.topic);
1015
1016 let subscriptions = connection_manager.subscriptions.read().await;
1017
1018 let mut matched_subscriptions = Vec::new();
1019
1020 for subscription in subscriptions.values() {
1022 if Self::topic_matches(&subscription.topic, &message.topic) {
1023 if let Some(ref event_filter) = subscription.config.event {
1025 let message_event = match message.event.as_str() {
1026 "INSERT" => Some(RealtimeEvent::Insert),
1027 "UPDATE" => Some(RealtimeEvent::Update),
1028 "DELETE" => Some(RealtimeEvent::Delete),
1029 _ => None,
1030 };
1031
1032 if let Some(msg_event) = message_event {
1033 if *event_filter != RealtimeEvent::All && *event_filter != msg_event {
1034 continue; }
1036 }
1037 }
1038
1039 matched_subscriptions.push(subscription.clone());
1040 }
1041 }
1042
1043 drop(subscriptions); for subscription in matched_subscriptions {
1047 debug!("Calling callback for subscription: {}", subscription.id);
1048 (subscription.callback)(message.clone());
1049 }
1050 }
1051
1052 fn topic_matches(subscription_topic: &str, message_topic: &str) -> bool {
1054 subscription_topic == message_topic || message_topic.starts_with(subscription_topic)
1056 }
1057
1058 pub async fn track_presence(&self, channel: &str, presence_state: PresenceState) -> Result<()> {
1081 debug!(
1082 "Tracking presence for user {} in channel {}",
1083 presence_state.user_id, channel
1084 );
1085
1086 let topic = format!("realtime:{}", channel);
1087 let ref_id = Uuid::new_v4().to_string();
1088
1089 let message = RealtimeProtocolMessage {
1090 topic: topic.clone(),
1091 event: "presence".to_string(),
1092 payload: serde_json::json!({
1093 "event": "track",
1094 "payload": presence_state
1095 }),
1096 ref_id,
1097 };
1098
1099 let mut connection_guard = self.connection_manager.connection.write().await;
1100 if let Some(ref mut connection) = *connection_guard {
1101 let message_json = serde_json::to_string(&message).map_err(|e| {
1102 Error::realtime(format!("Failed to serialize presence message: {}", e))
1103 })?;
1104
1105 connection.send(&message_json).await?;
1106 info!(
1107 "Started tracking presence for user {}",
1108 presence_state.user_id
1109 );
1110 } else {
1111 return Err(Error::realtime("Not connected to realtime server"));
1112 }
1113
1114 Ok(())
1115 }
1116
1117 pub async fn untrack_presence(&self, channel: &str, user_id: &str) -> Result<()> {
1127 debug!(
1128 "Untracking presence for user {} in channel {}",
1129 user_id, channel
1130 );
1131
1132 let topic = format!("realtime:{}", channel);
1133 let ref_id = Uuid::new_v4().to_string();
1134
1135 let message = RealtimeProtocolMessage {
1136 topic: topic.clone(),
1137 event: "presence".to_string(),
1138 payload: serde_json::json!({
1139 "event": "untrack",
1140 "payload": {
1141 "user_id": user_id
1142 }
1143 }),
1144 ref_id,
1145 };
1146
1147 let mut connection_guard = self.connection_manager.connection.write().await;
1148 if let Some(ref mut connection) = *connection_guard {
1149 let message_json = serde_json::to_string(&message).map_err(|e| {
1150 Error::realtime(format!("Failed to serialize presence message: {}", e))
1151 })?;
1152
1153 connection.send(&message_json).await?;
1154 info!("Stopped tracking presence for user {}", user_id);
1155 } else {
1156 return Err(Error::realtime("Not connected to realtime server"));
1157 }
1158
1159 Ok(())
1160 }
1161
1162 pub async fn get_presence(&self, channel: &str) -> Result<Vec<PresenceState>> {
1173 debug!("Getting presence for channel: {}", channel);
1174
1175 let topic = format!("realtime:{}", channel);
1176 let ref_id = Uuid::new_v4().to_string();
1177
1178 let message = RealtimeProtocolMessage {
1179 topic: topic.clone(),
1180 event: "presence".to_string(),
1181 payload: serde_json::json!({
1182 "event": "state"
1183 }),
1184 ref_id,
1185 };
1186
1187 let mut connection_guard = self.connection_manager.connection.write().await;
1188 if let Some(ref mut connection) = *connection_guard {
1189 let message_json = serde_json::to_string(&message).map_err(|e| {
1190 Error::realtime(format!("Failed to serialize presence message: {}", e))
1191 })?;
1192
1193 connection.send(&message_json).await?;
1194
1195 info!("Requested presence state for channel: {}", channel);
1198 Ok(Vec::new())
1199 } else {
1200 Err(Error::realtime("Not connected to realtime server"))
1201 }
1202 }
1203
1204 pub async fn broadcast(
1222 &self,
1223 channel: &str,
1224 event: &str,
1225 payload: serde_json::Value,
1226 from_user_id: Option<&str>,
1227 ) -> Result<()> {
1228 debug!(
1229 "Broadcasting message to channel: {} event: {}",
1230 channel, event
1231 );
1232
1233 let topic = format!("realtime:{}", channel);
1234 let ref_id = Uuid::new_v4().to_string();
1235
1236 let broadcast_message = BroadcastMessage {
1237 event: event.to_string(),
1238 payload,
1239 from_user_id: from_user_id.map(|s| s.to_string()),
1240 timestamp: chrono::Utc::now().to_rfc3339(),
1241 };
1242
1243 let message = RealtimeProtocolMessage {
1244 topic: topic.clone(),
1245 event: "broadcast".to_string(),
1246 payload: serde_json::to_value(broadcast_message)?,
1247 ref_id,
1248 };
1249
1250 let mut connection_guard = self.connection_manager.connection.write().await;
1251 if let Some(ref mut connection) = *connection_guard {
1252 let message_json = serde_json::to_string(&message).map_err(|e| {
1253 Error::realtime(format!("Failed to serialize broadcast message: {}", e))
1254 })?;
1255
1256 connection.send(&message_json).await?;
1257 info!("Sent broadcast message to channel: {}", channel);
1258 } else {
1259 return Err(Error::realtime("Not connected to realtime server"));
1260 }
1261
1262 Ok(())
1263 }
1264
1265 #[cfg(not(target_arch = "wasm32"))]
1306 pub async fn subscribe_advanced<F>(
1307 &self,
1308 channel: &str,
1309 config: SubscriptionConfig,
1310 callback: F,
1311 ) -> Result<String>
1312 where
1313 F: Fn(RealtimeMessage) + Send + Sync + 'static,
1314 {
1315 debug!("Creating advanced subscription for channel: {}", channel);
1316
1317 let subscription_id = Uuid::new_v4().to_string();
1318 let topic = if let Some(ref table) = config.table {
1319 format!("realtime:{}:{}:{}", config.schema, table, channel)
1320 } else {
1321 format!("realtime:{}", channel)
1322 };
1323
1324 let mut filter_parts = Vec::new();
1326
1327 if let Some(ref simple_filter) = config.filter {
1328 filter_parts.push(simple_filter.clone());
1329 }
1330
1331 for advanced_filter in &config.advanced_filters {
1332 let filter_str = match &advanced_filter.value {
1333 serde_json::Value::String(s) => format!(
1334 "{}={}. {}",
1335 advanced_filter.column,
1336 serde_json::to_string(&advanced_filter.operator)?.trim_matches('"'),
1337 s
1338 ),
1339 serde_json::Value::Array(arr) => {
1340 let values: Vec<String> = arr
1341 .iter()
1342 .map(|v| v.to_string().trim_matches('"').to_string())
1343 .collect();
1344 format!(
1345 "{}={}.({})",
1346 advanced_filter.column,
1347 serde_json::to_string(&advanced_filter.operator)?.trim_matches('"'),
1348 values.join(",")
1349 )
1350 }
1351 other => format!(
1352 "{}={}. {}",
1353 advanced_filter.column,
1354 serde_json::to_string(&advanced_filter.operator)?.trim_matches('"'),
1355 other.to_string().trim_matches('"')
1356 ),
1357 };
1358 filter_parts.push(filter_str);
1359 }
1360
1361 let combined_filter = if !filter_parts.is_empty() {
1362 Some(filter_parts.join(" and "))
1363 } else {
1364 None
1365 };
1366
1367 let subscription = Subscription {
1368 id: subscription_id.clone(),
1369 topic: topic.clone(),
1370 config: SubscriptionConfig {
1371 filter: combined_filter,
1372 ..config.clone()
1373 },
1374 callback: Arc::new(callback),
1375 };
1376
1377 {
1379 let mut subscriptions = self.connection_manager.subscriptions.write().await;
1380 subscriptions.insert(subscription_id.clone(), subscription);
1381 }
1382
1383 let ref_id = self
1385 .connection_manager
1386 .ref_counter
1387 .fetch_add(1, Ordering::SeqCst)
1388 .to_string();
1389
1390 let mut join_payload = serde_json::json!({
1391 "config": {
1392 "postgres_changes": [{
1393 "event": config.event.unwrap_or(RealtimeEvent::All),
1394 "schema": config.schema,
1395 }]
1396 }
1397 });
1398
1399 if let Some(ref table) = config.table {
1400 join_payload["config"]["postgres_changes"][0]["table"] =
1401 serde_json::Value::String(table.clone());
1402 }
1403
1404 if let Some(ref filter) = config.filter {
1405 join_payload["config"]["postgres_changes"][0]["filter"] =
1406 serde_json::Value::String(filter.clone());
1407 }
1408
1409 if config.enable_presence {
1411 join_payload["config"]["presence"] = serde_json::json!({ "key": "" });
1412 }
1413
1414 if config.enable_broadcast {
1416 join_payload["config"]["broadcast"] = serde_json::json!({ "self": true });
1417 }
1418
1419 let join_message = RealtimeProtocolMessage {
1420 topic: topic.clone(),
1421 event: "phx_join".to_string(),
1422 payload: join_payload,
1423 ref_id,
1424 };
1425
1426 let mut connection_guard = self.connection_manager.connection.write().await;
1427 if let Some(ref mut connection) = *connection_guard {
1428 let message_json = serde_json::to_string(&join_message)
1429 .map_err(|e| Error::realtime(format!("Failed to serialize join message: {}", e)))?;
1430
1431 connection.send(&message_json).await?;
1432 info!("Advanced subscription created: {}", subscription_id);
1433 } else {
1434 return Err(Error::realtime("Not connected to realtime server"));
1435 }
1436
1437 Ok(subscription_id)
1438 }
1439
1440 #[cfg(target_arch = "wasm32")]
1442 pub async fn subscribe_advanced<F>(
1443 &self,
1444 channel: &str,
1445 config: SubscriptionConfig,
1446 callback: F,
1447 ) -> Result<String>
1448 where
1449 F: Fn(RealtimeMessage) + 'static,
1450 {
1451 debug!("Creating advanced subscription for channel: {}", channel);
1452
1453 let subscription_id = Uuid::new_v4().to_string();
1454 let topic = if let Some(ref table) = config.table {
1455 format!("realtime:{}:{}:{}", config.schema, table, channel)
1456 } else {
1457 format!("realtime:{}", channel)
1458 };
1459
1460 let mut filter_parts = Vec::new();
1462
1463 if let Some(ref simple_filter) = config.filter {
1464 filter_parts.push(simple_filter.clone());
1465 }
1466
1467 for advanced_filter in &config.advanced_filters {
1468 let filter_str = match &advanced_filter.value {
1469 serde_json::Value::String(s) => format!(
1470 "{}={}. {}",
1471 advanced_filter.column,
1472 serde_json::to_string(&advanced_filter.operator)?.trim_matches('"'),
1473 s
1474 ),
1475 serde_json::Value::Array(arr) => {
1476 let values: Vec<String> = arr
1477 .iter()
1478 .map(|v| v.to_string().trim_matches('"').to_string())
1479 .collect();
1480 format!(
1481 "{}={}.({})",
1482 advanced_filter.column,
1483 serde_json::to_string(&advanced_filter.operator)?.trim_matches('"'),
1484 values.join(",")
1485 )
1486 }
1487 other => format!(
1488 "{}={}. {}",
1489 advanced_filter.column,
1490 serde_json::to_string(&advanced_filter.operator)?.trim_matches('"'),
1491 other.to_string().trim_matches('"')
1492 ),
1493 };
1494 filter_parts.push(filter_str);
1495 }
1496
1497 let combined_filter = if !filter_parts.is_empty() {
1498 Some(filter_parts.join(" and "))
1499 } else {
1500 None
1501 };
1502
1503 let subscription = Subscription {
1504 id: subscription_id.clone(),
1505 topic: topic.clone(),
1506 config: SubscriptionConfig {
1507 filter: combined_filter,
1508 ..config.clone()
1509 },
1510 callback: Arc::new(callback),
1511 };
1512
1513 {
1515 let mut subscriptions = self.connection_manager.subscriptions.write().await;
1516 subscriptions.insert(subscription_id.clone(), subscription);
1517 }
1518
1519 let ref_id = self
1521 .connection_manager
1522 .ref_counter
1523 .fetch_add(1, Ordering::SeqCst)
1524 .to_string();
1525
1526 let mut join_payload = serde_json::json!({
1527 "config": {
1528 "postgres_changes": [{
1529 "event": config.event.unwrap_or(RealtimeEvent::All),
1530 "schema": config.schema,
1531 }]
1532 }
1533 });
1534
1535 if let Some(ref table) = config.table {
1536 join_payload["config"]["postgres_changes"][0]["table"] =
1537 serde_json::Value::String(table.clone());
1538 }
1539
1540 if let Some(ref filter) = config.filter {
1541 join_payload["config"]["postgres_changes"][0]["filter"] =
1542 serde_json::Value::String(filter.clone());
1543 }
1544
1545 if config.enable_presence {
1547 join_payload["config"]["presence"] = serde_json::json!({ "key": "" });
1548 }
1549
1550 if config.enable_broadcast {
1552 join_payload["config"]["broadcast"] = serde_json::json!({ "self": true });
1553 }
1554
1555 let join_message = RealtimeProtocolMessage {
1556 topic: topic.clone(),
1557 event: "phx_join".to_string(),
1558 payload: join_payload,
1559 ref_id,
1560 };
1561
1562 let mut connection_guard = self.connection_manager.connection.write().await;
1563 if let Some(ref mut connection) = *connection_guard {
1564 let message_json = serde_json::to_string(&join_message)
1565 .map_err(|e| Error::realtime(format!("Failed to serialize join message: {}", e)))?;
1566
1567 connection.send(&message_json).await?;
1568 info!("Advanced subscription created: {}", subscription_id);
1569 } else {
1570 return Err(Error::realtime("Not connected to realtime server"));
1571 }
1572
1573 Ok(subscription_id)
1574 }
1575}
1576
1577#[cfg(feature = "realtime")]
1601pub struct ChannelBuilder {
1602 realtime: Realtime,
1603 config: SubscriptionConfig,
1604}
1605
1606#[cfg(feature = "realtime")]
1607impl ChannelBuilder {
1608 pub fn table(mut self, table: &str) -> Self {
1625 self.config.table = Some(table.to_string());
1626 self
1627 }
1628
1629 pub fn schema(mut self, schema: &str) -> Self {
1647 self.config.schema = schema.to_string();
1648 self
1649 }
1650
1651 pub fn event(mut self, event: RealtimeEvent) -> Self {
1671 self.config.event = Some(event);
1672 self
1673 }
1674
1675 pub fn filter(mut self, filter: &str) -> Self {
1694 self.config.filter = Some(filter.to_string());
1695 self
1696 }
1697
1698 #[cfg(not(target_arch = "wasm32"))]
1722 pub async fn subscribe<F>(self, callback: F) -> Result<String>
1723 where
1724 F: Fn(RealtimeMessage) + Send + Sync + 'static,
1725 {
1726 self.realtime.subscribe(self.config, callback).await
1727 }
1728
1729 #[cfg(target_arch = "wasm32")]
1731 pub async fn subscribe<F>(self, callback: F) -> Result<String>
1732 where
1733 F: Fn(RealtimeMessage) + 'static,
1734 {
1735 self.realtime.subscribe(self.config, callback).await
1736 }
1737}
1738
1739#[cfg(all(test, feature = "realtime"))]
1740mod tests {
1741 use super::*;
1742 use std::sync::atomic::{AtomicBool, Ordering};
1743 use std::sync::Arc;
1744
1745 #[tokio::test]
1746 async fn test_realtime_creation() {
1747 let config = Arc::new(SupabaseConfig {
1748 url: "https://test.supabase.co".to_string(),
1749 key: "test-key".to_string(),
1750 ..Default::default()
1751 });
1752
1753 let realtime = Realtime::new(config).unwrap();
1754 assert!(!realtime.is_connected().await);
1755 }
1756
1757 #[tokio::test]
1758 async fn test_subscription_config_default() {
1759 let config = SubscriptionConfig::default();
1760 assert_eq!(config.schema, "public");
1761 assert!(config.table.is_none());
1762 assert!(config.event.is_none());
1763 assert!(config.filter.is_none());
1764 }
1765
1766 #[tokio::test]
1767 async fn test_realtime_event_serialization() {
1768 use serde_json;
1769
1770 let event = RealtimeEvent::Insert;
1771 let serialized = serde_json::to_string(&event).unwrap();
1772 assert_eq!(serialized, "\"INSERT\"");
1773
1774 let event = RealtimeEvent::All;
1775 let serialized = serde_json::to_string(&event).unwrap();
1776 assert_eq!(serialized, "\"*\"");
1777 }
1778
1779 #[tokio::test]
1780 async fn test_build_topic() {
1781 let config = Arc::new(SupabaseConfig {
1782 url: "https://test.supabase.co".to_string(),
1783 key: "test-key".to_string(),
1784 ..Default::default()
1785 });
1786
1787 let realtime = Realtime::new(config).unwrap();
1788
1789 let subscription_config = SubscriptionConfig {
1791 table: Some("posts".to_string()),
1792 schema: "public".to_string(),
1793 event: None,
1794 filter: None,
1795 ..Default::default()
1796 };
1797 let topic = realtime.build_topic(&subscription_config);
1798 assert_eq!(topic, "realtime:public:posts");
1799
1800 let subscription_config = SubscriptionConfig {
1802 table: None,
1803 schema: "admin".to_string(),
1804 event: None,
1805 filter: None,
1806 ..Default::default()
1807 };
1808 let topic = realtime.build_topic(&subscription_config);
1809 assert_eq!(topic, "realtime:admin");
1810 }
1811
1812 #[tokio::test]
1813 async fn test_topic_matching() {
1814 assert!(Realtime::topic_matches(
1816 "realtime:public:posts",
1817 "realtime:public:posts"
1818 ));
1819
1820 assert!(Realtime::topic_matches(
1822 "realtime:public",
1823 "realtime:public:posts"
1824 ));
1825
1826 assert!(!Realtime::topic_matches(
1828 "realtime:public:users",
1829 "realtime:public:posts"
1830 ));
1831 }
1832
1833 #[tokio::test]
1834 async fn test_realtime_message_parsing() {
1835 let json = r#"{
1836 "event": "INSERT",
1837 "payload": {
1838 "record": {"id": 1, "title": "Test"},
1839 "schema": "public",
1840 "table": "posts"
1841 },
1842 "topic": "realtime:public:posts"
1843 }"#;
1844
1845 let message = serde_json::from_str::<RealtimeMessage>(json);
1846 assert!(message.is_ok());
1847
1848 let message = message.unwrap();
1849 assert_eq!(message.event, "INSERT");
1850 assert_eq!(message.topic, "realtime:public:posts");
1851 assert!(message.payload.record.is_some());
1852 }
1853
1854 #[tokio::test]
1855 async fn test_channel_builder() {
1856 let config = Arc::new(SupabaseConfig {
1857 url: "https://test.supabase.co".to_string(),
1858 key: "test-key".to_string(),
1859 ..Default::default()
1860 });
1861
1862 let realtime = Realtime::new(config).unwrap();
1863 let builder = realtime.channel("test");
1864
1865 let builder = builder
1867 .table("posts")
1868 .schema("public")
1869 .event(RealtimeEvent::Insert)
1870 .filter("author_id=eq.123");
1871
1872 assert_eq!(builder.config.table, Some("posts".to_string()));
1873 assert_eq!(builder.config.schema, "public");
1874 assert_eq!(builder.config.event, Some(RealtimeEvent::Insert));
1875 assert_eq!(builder.config.filter, Some("author_id=eq.123".to_string()));
1876 }
1877
1878 #[cfg(not(target_arch = "wasm32"))] #[tokio::test]
1880 async fn test_subscription_callback() {
1881 let config = Arc::new(SupabaseConfig {
1882 url: "https://test.supabase.co".to_string(),
1883 key: "test-key".to_string(),
1884 ..Default::default()
1885 });
1886
1887 let realtime = Realtime::new(config).unwrap();
1888
1889 let called = Arc::new(AtomicBool::new(false));
1891 let called_clone = Arc::clone(&called);
1892
1893 let subscription_config = SubscriptionConfig {
1894 table: Some("test".to_string()),
1895 schema: "public".to_string(),
1896 event: Some(RealtimeEvent::All),
1897 filter: None,
1898 ..Default::default()
1899 };
1900
1901 let result = realtime
1903 .subscribe(subscription_config, move |_msg| {
1904 called_clone.store(true, Ordering::SeqCst);
1905 })
1906 .await;
1907
1908 assert!(result.is_err());
1910 assert!(!called.load(Ordering::SeqCst));
1911 }
1912
1913 #[tokio::test]
1914 async fn test_protocol_message_serialization() {
1915 let message = RealtimeProtocolMessage {
1916 topic: "realtime:public:posts".to_string(),
1917 event: "phx_join".to_string(),
1918 payload: serde_json::json!({"table": "posts"}),
1919 ref_id: "123".to_string(),
1920 };
1921
1922 let serialized = serde_json::to_string(&message).unwrap();
1923 assert!(serialized.contains("phx_join"));
1924 assert!(serialized.contains("realtime:public:posts"));
1925 assert!(serialized.contains("posts"));
1926 }
1927
1928 #[tokio::test]
1929 async fn test_event_filter_matching() {
1930 let insert_event = Some(RealtimeEvent::Insert);
1932 let update_event = Some(RealtimeEvent::Update);
1933 let all_event = Some(RealtimeEvent::All);
1934
1935 assert_eq!(insert_event, Some(RealtimeEvent::Insert));
1937
1938 assert_ne!(insert_event, update_event);
1940
1941 assert_eq!(all_event, Some(RealtimeEvent::All));
1943 }
1944}