supabase_realtime_rs/channel/
postgres_changes.rs

1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3use std::collections::HashMap;
4
5#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
6#[serde(rename_all = "UPPERCASE")]
7pub enum PostgresChangeEvent {
8    #[serde(rename = "*")]
9    All,
10    Insert,
11    Update,
12    Delete,
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct PostgresChangesFilter {
17    pub event: PostgresChangeEvent,
18    pub schema: String,
19
20    #[serde(skip_serializing_if = "Option::is_none")]
21    pub table: Option<String>,
22    #[serde(skip_serializing_if = "Option::is_none")]
23    pub filter: Option<String>,
24}
25
26impl PostgresChangesFilter {
27    pub fn new(event: PostgresChangeEvent, schema: impl Into<String>) -> Self {
28        Self {
29            event,
30            schema: schema.into(),
31            table: None,
32            filter: None,
33        }
34    }
35
36    pub fn table(mut self, table: impl Into<String>) -> Self {
37        self.table = Some(table.into());
38        self
39    }
40    pub fn filter(mut self, filter: impl Into<String>) -> Self {
41        self.filter = Some(filter.into());
42        self
43    }
44
45    pub fn to_hash_map(&self) -> HashMap<String, String> {
46        let mut map = HashMap::new();
47        map.insert(
48            "event".to_string(),
49            serde_json::to_string(&self.event)
50                .expect("Failed to serialize event to string")
51                .replace("\"", ""),
52        );
53        map.insert("schema".to_string(), self.schema.clone());
54        if let Some(table) = &self.table {
55            map.insert("table".to_string(), table.clone());
56        }
57        if let Some(filter) = &self.filter {
58            map.insert("filter".to_string(), filter.clone());
59        }
60        map
61    }
62}
63
64#[derive(Serialize, Deserialize, Debug, Clone)]
65pub struct ColumnInfo {
66    pub name: String,
67    #[serde(rename = "type")]
68    pub column_type: String,
69}
70
71#[derive(Serialize, Deserialize, Debug, Clone)]
72pub struct PostgresChangesPayloadBase {
73    pub schema: String,
74    pub table: String,
75    pub commit_timestamp: String,
76    #[serde(default)]
77    pub errors: Option<Vec<String>>,
78    #[serde(default)]
79    pub columns: Vec<ColumnInfo>,
80}
81
82// Internal struct for deserializing server format
83#[derive(Deserialize, Debug)]
84struct PostgreInsertPayloadRaw {
85    #[serde(flatten)]
86    base: PostgresChangesPayloadBase,
87    #[serde(default)]
88    record: HashMap<String, Value>,
89}
90
91#[derive(Serialize, Debug, Clone)]
92pub struct PostgreInsertPayload {
93    #[serde(flatten)]
94    pub base: PostgresChangesPayloadBase,
95    pub new: HashMap<String, Value>,
96}
97
98impl<'de> Deserialize<'de> for PostgreInsertPayload {
99    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
100    where
101        D: serde::Deserializer<'de>,
102    {
103        let raw = PostgreInsertPayloadRaw::deserialize(deserializer)?;
104        Ok(PostgreInsertPayload {
105            base: raw.base,
106            new: raw.record,
107        })
108    }
109}
110
111// Internal struct for deserializing server format
112#[derive(Deserialize, Debug)]
113struct PostgresUpdatePayloadRaw {
114    #[serde(flatten)]
115    base: PostgresChangesPayloadBase,
116    #[serde(default)]
117    record: HashMap<String, Value>,
118    #[serde(default)]
119    old_record: HashMap<String, Value>,
120}
121
122#[derive(Serialize, Debug, Clone)]
123pub struct PostgresUpdatePayload {
124    #[serde(flatten)]
125    pub base: PostgresChangesPayloadBase,
126    pub new: HashMap<String, Value>,
127    pub old: HashMap<String, Value>,
128}
129
130impl<'de> Deserialize<'de> for PostgresUpdatePayload {
131    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
132    where
133        D: serde::Deserializer<'de>,
134    {
135        let raw = PostgresUpdatePayloadRaw::deserialize(deserializer)?;
136        Ok(PostgresUpdatePayload {
137            base: raw.base,
138            new: raw.record,
139            old: raw.old_record,
140        })
141    }
142}
143
144// Internal struct for deserializing server format
145#[derive(Deserialize, Debug)]
146struct PostgresDeletePayloadRaw {
147    #[serde(flatten)]
148    base: PostgresChangesPayloadBase,
149    #[serde(default)]
150    old_record: HashMap<String, Value>,
151}
152
153#[derive(Serialize, Debug, Clone)]
154pub struct PostgresDeletePayload {
155    #[serde(flatten)]
156    pub base: PostgresChangesPayloadBase,
157    pub old: HashMap<String, Value>,
158}
159
160impl<'de> Deserialize<'de> for PostgresDeletePayload {
161    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
162    where
163        D: serde::Deserializer<'de>,
164    {
165        let raw = PostgresDeletePayloadRaw::deserialize(deserializer)?;
166        Ok(PostgresDeletePayload {
167            base: raw.base,
168            old: raw.old_record,
169        })
170    }
171}
172
173#[derive(Serialize, Deserialize, Debug, Clone)]
174#[serde(tag = "type", rename_all = "UPPERCASE")]
175pub enum PostgresChangesPayload {
176    Insert(PostgreInsertPayload),
177    Update(PostgresUpdatePayload),
178    Delete(PostgresDeletePayload),
179}
180
181impl PostgresChangesPayload {
182    pub fn schema(&self) -> &str {
183        match self {
184            Self::Insert(payload) => &payload.base.schema,
185            Self::Update(payload) => &payload.base.schema,
186            Self::Delete(payload) => &payload.base.schema,
187        }
188    }
189
190    pub fn table(&self) -> &str {
191        match self {
192            Self::Insert(payload) => &payload.base.table,
193            Self::Update(payload) => &payload.base.table,
194            Self::Delete(payload) => &payload.base.table,
195        }
196    }
197
198    pub fn commit_timestamp(&self) -> &str {
199        match self {
200            Self::Insert(payload) => &payload.base.commit_timestamp,
201            Self::Update(payload) => &payload.base.commit_timestamp,
202            Self::Delete(payload) => &payload.base.commit_timestamp,
203        }
204    }
205}
206
207#[cfg(test)]
208mod tests {
209    use super::*;
210
211    #[test]
212    fn test_deserialize_insert_payload_from_server_format() {
213        // This is the actual format sent by Supabase server
214        let json = r#"{
215            "columns": [
216                {"name": "id", "type": "int8"},
217                {"name": "created_at", "type": "timestamptz"},
218                {"name": "name", "type": "text"}
219            ],
220            "commit_timestamp": "2025-11-27T16:16:54.545Z",
221            "errors": null,
222            "record": {
223                "created_at": "2025-11-27T16:16:54.541196+00:00",
224                "id": 47,
225                "name": "jena"
226            },
227            "schema": "public",
228            "table": "users",
229            "type": "INSERT"
230        }"#;
231
232        let payload: PostgresChangesPayload = serde_json::from_str(json).unwrap();
233
234        match payload {
235            PostgresChangesPayload::Insert(insert) => {
236                assert_eq!(insert.base.schema, "public");
237                assert_eq!(insert.base.table, "users");
238                assert_eq!(insert.base.columns.len(), 3);
239                assert_eq!(insert.new.get("name").unwrap().as_str().unwrap(), "jena");
240                assert_eq!(insert.new.get("id").unwrap().as_i64().unwrap(), 47);
241            }
242            _ => panic!("Expected Insert variant"),
243        }
244    }
245
246    #[test]
247    fn test_deserialize_update_payload_from_server_format() {
248        let json = r#"{
249            "columns": [
250                {"name": "id", "type": "int8"},
251                {"name": "name", "type": "text"}
252            ],
253            "commit_timestamp": "2025-11-27T16:20:00.000Z",
254            "errors": null,
255            "record": {
256                "id": 47,
257                "name": "new_name"
258            },
259            "old_record": {
260                "id": 47,
261                "name": "old_name"
262            },
263            "schema": "public",
264            "table": "users",
265            "type": "UPDATE"
266        }"#;
267
268        let payload: PostgresChangesPayload = serde_json::from_str(json).unwrap();
269
270        match payload {
271            PostgresChangesPayload::Update(update) => {
272                assert_eq!(update.base.schema, "public");
273                assert_eq!(update.base.table, "users");
274                assert_eq!(
275                    update.new.get("name").unwrap().as_str().unwrap(),
276                    "new_name"
277                );
278                assert_eq!(
279                    update.old.get("name").unwrap().as_str().unwrap(),
280                    "old_name"
281                );
282            }
283            _ => panic!("Expected Update variant"),
284        }
285    }
286
287    #[test]
288    fn test_deserialize_delete_payload_from_server_format() {
289        let json = r#"{
290            "columns": [
291                {"name": "id", "type": "int8"},
292                {"name": "name", "type": "text"}
293            ],
294            "commit_timestamp": "2025-11-27T16:25:00.000Z",
295            "errors": null,
296            "old_record": {
297                "id": 47,
298                "name": "deleted_name"
299            },
300            "schema": "public",
301            "table": "users",
302            "type": "DELETE"
303        }"#;
304
305        let payload: PostgresChangesPayload = serde_json::from_str(json).unwrap();
306
307        match payload {
308            PostgresChangesPayload::Delete(delete) => {
309                assert_eq!(delete.base.schema, "public");
310                assert_eq!(delete.base.table, "users");
311                assert_eq!(
312                    delete.old.get("name").unwrap().as_str().unwrap(),
313                    "deleted_name"
314                );
315            }
316            _ => panic!("Expected Delete variant"),
317        }
318    }
319}