Skip to main content

supabase_client_realtime/
types.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::time::Duration;
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7
8// ── Phoenix Protocol ──────────────────────────────────────────────────────────
9
10/// A Phoenix Channels protocol message (v1.0.0).
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct PhoenixMessage {
13    pub event: String,
14    pub topic: String,
15    pub payload: Value,
16    #[serde(rename = "ref")]
17    pub msg_ref: Option<String>,
18    pub join_ref: Option<String>,
19}
20
21// ── Channel State ─────────────────────────────────────────────────────────────
22
/// Lifecycle phase a channel is currently in.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ChannelState {
    /// Rendered as `closed`.
    Closed,
    /// Rendered as `joining`.
    Joining,
    /// Rendered as `joined`.
    Joined,
    /// Rendered as `leaving`.
    Leaving,
    /// Rendered as `errored`.
    Errored,
}

/// Renders the state as its lowercase name.
impl fmt::Display for ChannelState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the label first, then emit it with a single write.
        let label = match *self {
            ChannelState::Closed => "closed",
            ChannelState::Joining => "joining",
            ChannelState::Joined => "joined",
            ChannelState::Leaving => "leaving",
            ChannelState::Errored => "errored",
        };
        f.write_str(label)
    }
}
44
45// ── Subscription Status ───────────────────────────────────────────────────────
46
/// Outcome handed to the user's subscribe callback.
///
/// Mirrors the JS client's status strings: SUBSCRIBED, TIMED_OUT, CLOSED,
/// CHANNEL_ERROR.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SubscriptionStatus {
    /// Rendered as `SUBSCRIBED`.
    Subscribed,
    /// Rendered as `TIMED_OUT`.
    TimedOut,
    /// Rendered as `CLOSED`.
    Closed,
    /// Rendered as `CHANNEL_ERROR`.
    ChannelError,
}

/// Renders the status using the upper-case JS-style identifier.
impl fmt::Display for SubscriptionStatus {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            SubscriptionStatus::Subscribed => "SUBSCRIBED",
            SubscriptionStatus::TimedOut => "TIMED_OUT",
            SubscriptionStatus::Closed => "CLOSED",
            SubscriptionStatus::ChannelError => "CHANNEL_ERROR",
        };
        f.write_str(label)
    }
}
67
68// ── Postgres Changes ──────────────────────────────────────────────────────────
69
70/// Which Postgres change events to listen for.
71#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
72pub enum PostgresChangesEvent {
73    #[serde(rename = "*")]
74    All,
75    #[serde(rename = "INSERT")]
76    Insert,
77    #[serde(rename = "UPDATE")]
78    Update,
79    #[serde(rename = "DELETE")]
80    Delete,
81}
82
83impl fmt::Display for PostgresChangesEvent {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        match self {
86            Self::All => write!(f, "*"),
87            Self::Insert => write!(f, "INSERT"),
88            Self::Update => write!(f, "UPDATE"),
89            Self::Delete => write!(f, "DELETE"),
90        }
91    }
92}
93
94/// Filter for postgres_changes subscriptions.
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct PostgresChangesFilter {
97    pub event: String,
98    pub schema: String,
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub table: Option<String>,
101    #[serde(skip_serializing_if = "Option::is_none")]
102    pub filter: Option<String>,
103}
104
105impl PostgresChangesFilter {
106    /// Create a new filter for the given schema and table.
107    pub fn new(schema: impl Into<String>, table: impl Into<String>) -> Self {
108        Self {
109            event: "*".to_string(),
110            schema: schema.into(),
111            table: Some(table.into()),
112            filter: None,
113        }
114    }
115
116    /// Create a schema-level filter (no specific table).
117    pub fn schema_only(schema: impl Into<String>) -> Self {
118        Self {
119            event: "*".to_string(),
120            schema: schema.into(),
121            table: None,
122            filter: None,
123        }
124    }
125
126    /// Set the event type for this filter.
127    pub fn event(mut self, event: PostgresChangesEvent) -> Self {
128        self.event = event.to_string();
129        self
130    }
131
132    /// Add a row-level filter (e.g., "id=eq.1").
133    pub fn with_filter(mut self, filter: impl Into<String>) -> Self {
134        self.filter = Some(filter.into());
135        self
136    }
137}
138
139/// Payload delivered for a postgres_changes event.
140#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct PostgresChangePayload {
142    pub schema: String,
143    pub table: String,
144    #[serde(rename = "type")]
145    pub change_type: String,
146    #[serde(default)]
147    pub commit_timestamp: Option<String>,
148    #[serde(default)]
149    pub columns: Vec<ColumnInfo>,
150    #[serde(default)]
151    pub record: Option<Value>,
152    #[serde(default)]
153    pub old_record: Option<Value>,
154    #[serde(default)]
155    pub errors: Option<Value>,
156}
157
158/// Column metadata from a postgres_changes payload.
159#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct ColumnInfo {
161    pub name: String,
162    #[serde(rename = "type")]
163    pub column_type: String,
164}
165
166// ── Presence ──────────────────────────────────────────────────────────────────
167
168/// Full presence state: key → list of presence metas.
169pub type PresenceState = HashMap<String, Vec<PresenceMeta>>;
170
171/// Metadata associated with a single presence entry.
172#[derive(Debug, Clone, Serialize, Deserialize)]
173pub struct PresenceMeta {
174    #[serde(default)]
175    pub phx_ref: Option<String>,
176    #[serde(default)]
177    pub phx_ref_prev: Option<String>,
178    #[serde(flatten)]
179    pub data: Value,
180}
181
182/// A presence diff message from the server.
183#[derive(Debug, Clone, Serialize, Deserialize)]
184pub struct PresenceDiff {
185    pub joins: HashMap<String, PresenceEntry>,
186    pub leaves: HashMap<String, PresenceEntry>,
187}
188
189/// A single presence entry containing its metas.
190#[derive(Debug, Clone, Serialize, Deserialize)]
191pub struct PresenceEntry {
192    pub metas: Vec<PresenceMeta>,
193}
194
195// ── Join Payload ──────────────────────────────────────────────────────────────
196
197/// The payload sent with `phx_join` to configure channel features.
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct JoinPayload {
200    pub config: JoinConfig,
201    #[serde(skip_serializing_if = "Option::is_none")]
202    pub access_token: Option<String>,
203}
204
205/// Channel configuration sent during join.
206#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct JoinConfig {
208    pub broadcast: BroadcastConfig,
209    pub presence: PresenceConfig,
210    pub postgres_changes: Vec<PostgresChangesFilter>,
211}
212
213/// Broadcast feature configuration.
214#[derive(Debug, Clone, Serialize, Deserialize)]
215pub struct BroadcastConfig {
216    pub ack: bool,
217    #[serde(rename = "self")]
218    pub self_send: bool,
219}
220
221impl Default for BroadcastConfig {
222    fn default() -> Self {
223        Self {
224            ack: false,
225            self_send: false,
226        }
227    }
228}
229
230/// Presence feature configuration.
231#[derive(Debug, Clone, Serialize, Deserialize)]
232pub struct PresenceConfig {
233    pub key: String,
234}
235
236impl Default for PresenceConfig {
237    fn default() -> Self {
238        Self {
239            key: String::new(),
240        }
241    }
242}
243
244// ── Realtime Client Config ────────────────────────────────────────────────────
245
246/// Configuration for the RealtimeClient.
247#[derive(Debug, Clone)]
248pub struct RealtimeConfig {
249    /// The Supabase project URL (http/https).
250    pub url: String,
251    /// The Supabase API key.
252    pub api_key: String,
253    /// Heartbeat interval (default: 25s).
254    pub heartbeat_interval: Duration,
255    /// Timeout for subscribe operations (default: 10s).
256    pub subscribe_timeout: Duration,
257    /// Reconnection backoff intervals.
258    pub reconnect: ReconnectConfig,
259    /// Custom headers to include in the WebSocket handshake request.
260    pub headers: HashMap<String, String>,
261}
262
263impl RealtimeConfig {
264    pub fn new(url: impl Into<String>, api_key: impl Into<String>) -> Self {
265        Self {
266            url: url.into(),
267            api_key: api_key.into(),
268            heartbeat_interval: Duration::from_secs(25),
269            subscribe_timeout: Duration::from_secs(10),
270            reconnect: ReconnectConfig::default(),
271            headers: HashMap::new(),
272        }
273    }
274
275    /// Set custom headers for the WebSocket handshake.
276    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
277        self.headers = headers;
278        self
279    }
280}
281
/// Backoff schedule used when reconnecting.
#[derive(Clone, Debug)]
pub struct ReconnectConfig {
    /// Delays to try, in order.
    pub intervals: Vec<Duration>,
    /// Delay used once every entry of `intervals` has been exhausted.
    pub fallback: Duration,
}

/// Default schedule: 1s, 2s, 5s, 10s, then a 10s fallback.
impl Default for ReconnectConfig {
    fn default() -> Self {
        // Schedule expressed in whole seconds; converted to `Duration` below.
        const STEP_SECS: [u64; 4] = [1, 2, 5, 10];
        const FALLBACK_SECS: u64 = 10;

        let intervals = STEP_SECS
            .iter()
            .copied()
            .map(Duration::from_secs)
            .collect();

        Self {
            intervals,
            fallback: Duration::from_secs(FALLBACK_SECS),
        }
    }
}