supabase_realtime_rs/channel/
postgres_changes.rs1use serde::{Deserialize, Serialize};
2use serde_json::Value;
3use std::collections::HashMap;
4
5#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
6#[serde(rename_all = "UPPERCASE")]
7pub enum PostgresChangeEvent {
8 #[serde(rename = "*")]
9 All,
10 Insert,
11 Update,
12 Delete,
13}
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct PostgresChangesFilter {
17 pub event: PostgresChangeEvent,
18 pub schema: String,
19
20 #[serde(skip_serializing_if = "Option::is_none")]
21 pub table: Option<String>,
22 #[serde(skip_serializing_if = "Option::is_none")]
23 pub filter: Option<String>,
24}
25
26impl PostgresChangesFilter {
27 pub fn new(event: PostgresChangeEvent, schema: impl Into<String>) -> Self {
28 Self {
29 event,
30 schema: schema.into(),
31 table: None,
32 filter: None,
33 }
34 }
35
36 pub fn table(mut self, table: impl Into<String>) -> Self {
37 self.table = Some(table.into());
38 self
39 }
40 pub fn filter(mut self, filter: impl Into<String>) -> Self {
41 self.filter = Some(filter.into());
42 self
43 }
44
45 pub fn to_hash_map(&self) -> HashMap<String, String> {
46 let mut map = HashMap::new();
47 map.insert(
48 "event".to_string(),
49 serde_json::to_string(&self.event)
50 .expect("Failed to serialize event to string")
51 .replace("\"", ""),
52 );
53 map.insert("schema".to_string(), self.schema.clone());
54 if let Some(table) = &self.table {
55 map.insert("table".to_string(), table.clone());
56 }
57 if let Some(filter) = &self.filter {
58 map.insert("filter".to_string(), filter.clone());
59 }
60 map
61 }
62}
63
64#[derive(Serialize, Deserialize, Debug, Clone)]
65pub struct ColumnInfo {
66 pub name: String,
67 #[serde(rename = "type")]
68 pub column_type: String,
69}
70
71#[derive(Serialize, Deserialize, Debug, Clone)]
72pub struct PostgresChangesPayloadBase {
73 pub schema: String,
74 pub table: String,
75 pub commit_timestamp: String,
76 #[serde(default)]
77 pub errors: Option<Vec<String>>,
78 #[serde(default)]
79 pub columns: Vec<ColumnInfo>,
80}
81
82#[derive(Deserialize, Debug)]
84struct PostgreInsertPayloadRaw {
85 #[serde(flatten)]
86 base: PostgresChangesPayloadBase,
87 #[serde(default)]
88 record: HashMap<String, Value>,
89}
90
91#[derive(Serialize, Debug, Clone)]
92pub struct PostgreInsertPayload {
93 #[serde(flatten)]
94 pub base: PostgresChangesPayloadBase,
95 pub new: HashMap<String, Value>,
96}
97
98impl<'de> Deserialize<'de> for PostgreInsertPayload {
99 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
100 where
101 D: serde::Deserializer<'de>,
102 {
103 let raw = PostgreInsertPayloadRaw::deserialize(deserializer)?;
104 Ok(PostgreInsertPayload {
105 base: raw.base,
106 new: raw.record,
107 })
108 }
109}
110
111#[derive(Deserialize, Debug)]
113struct PostgresUpdatePayloadRaw {
114 #[serde(flatten)]
115 base: PostgresChangesPayloadBase,
116 #[serde(default)]
117 record: HashMap<String, Value>,
118 #[serde(default)]
119 old_record: HashMap<String, Value>,
120}
121
122#[derive(Serialize, Debug, Clone)]
123pub struct PostgresUpdatePayload {
124 #[serde(flatten)]
125 pub base: PostgresChangesPayloadBase,
126 pub new: HashMap<String, Value>,
127 pub old: HashMap<String, Value>,
128}
129
130impl<'de> Deserialize<'de> for PostgresUpdatePayload {
131 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
132 where
133 D: serde::Deserializer<'de>,
134 {
135 let raw = PostgresUpdatePayloadRaw::deserialize(deserializer)?;
136 Ok(PostgresUpdatePayload {
137 base: raw.base,
138 new: raw.record,
139 old: raw.old_record,
140 })
141 }
142}
143
144#[derive(Deserialize, Debug)]
146struct PostgresDeletePayloadRaw {
147 #[serde(flatten)]
148 base: PostgresChangesPayloadBase,
149 #[serde(default)]
150 old_record: HashMap<String, Value>,
151}
152
153#[derive(Serialize, Debug, Clone)]
154pub struct PostgresDeletePayload {
155 #[serde(flatten)]
156 pub base: PostgresChangesPayloadBase,
157 pub old: HashMap<String, Value>,
158}
159
160impl<'de> Deserialize<'de> for PostgresDeletePayload {
161 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
162 where
163 D: serde::Deserializer<'de>,
164 {
165 let raw = PostgresDeletePayloadRaw::deserialize(deserializer)?;
166 Ok(PostgresDeletePayload {
167 base: raw.base,
168 old: raw.old_record,
169 })
170 }
171}
172
173#[derive(Serialize, Deserialize, Debug, Clone)]
174#[serde(tag = "type", rename_all = "UPPERCASE")]
175pub enum PostgresChangesPayload {
176 Insert(PostgreInsertPayload),
177 Update(PostgresUpdatePayload),
178 Delete(PostgresDeletePayload),
179}
180
181impl PostgresChangesPayload {
182 pub fn schema(&self) -> &str {
183 match self {
184 Self::Insert(payload) => &payload.base.schema,
185 Self::Update(payload) => &payload.base.schema,
186 Self::Delete(payload) => &payload.base.schema,
187 }
188 }
189
190 pub fn table(&self) -> &str {
191 match self {
192 Self::Insert(payload) => &payload.base.table,
193 Self::Update(payload) => &payload.base.table,
194 Self::Delete(payload) => &payload.base.table,
195 }
196 }
197
198 pub fn commit_timestamp(&self) -> &str {
199 match self {
200 Self::Insert(payload) => &payload.base.commit_timestamp,
201 Self::Update(payload) => &payload.base.commit_timestamp,
202 Self::Delete(payload) => &payload.base.commit_timestamp,
203 }
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210
211 #[test]
212 fn test_deserialize_insert_payload_from_server_format() {
213 let json = r#"{
215 "columns": [
216 {"name": "id", "type": "int8"},
217 {"name": "created_at", "type": "timestamptz"},
218 {"name": "name", "type": "text"}
219 ],
220 "commit_timestamp": "2025-11-27T16:16:54.545Z",
221 "errors": null,
222 "record": {
223 "created_at": "2025-11-27T16:16:54.541196+00:00",
224 "id": 47,
225 "name": "jena"
226 },
227 "schema": "public",
228 "table": "users",
229 "type": "INSERT"
230 }"#;
231
232 let payload: PostgresChangesPayload = serde_json::from_str(json).unwrap();
233
234 match payload {
235 PostgresChangesPayload::Insert(insert) => {
236 assert_eq!(insert.base.schema, "public");
237 assert_eq!(insert.base.table, "users");
238 assert_eq!(insert.base.columns.len(), 3);
239 assert_eq!(insert.new.get("name").unwrap().as_str().unwrap(), "jena");
240 assert_eq!(insert.new.get("id").unwrap().as_i64().unwrap(), 47);
241 }
242 _ => panic!("Expected Insert variant"),
243 }
244 }
245
246 #[test]
247 fn test_deserialize_update_payload_from_server_format() {
248 let json = r#"{
249 "columns": [
250 {"name": "id", "type": "int8"},
251 {"name": "name", "type": "text"}
252 ],
253 "commit_timestamp": "2025-11-27T16:20:00.000Z",
254 "errors": null,
255 "record": {
256 "id": 47,
257 "name": "new_name"
258 },
259 "old_record": {
260 "id": 47,
261 "name": "old_name"
262 },
263 "schema": "public",
264 "table": "users",
265 "type": "UPDATE"
266 }"#;
267
268 let payload: PostgresChangesPayload = serde_json::from_str(json).unwrap();
269
270 match payload {
271 PostgresChangesPayload::Update(update) => {
272 assert_eq!(update.base.schema, "public");
273 assert_eq!(update.base.table, "users");
274 assert_eq!(
275 update.new.get("name").unwrap().as_str().unwrap(),
276 "new_name"
277 );
278 assert_eq!(
279 update.old.get("name").unwrap().as_str().unwrap(),
280 "old_name"
281 );
282 }
283 _ => panic!("Expected Update variant"),
284 }
285 }
286
287 #[test]
288 fn test_deserialize_delete_payload_from_server_format() {
289 let json = r#"{
290 "columns": [
291 {"name": "id", "type": "int8"},
292 {"name": "name", "type": "text"}
293 ],
294 "commit_timestamp": "2025-11-27T16:25:00.000Z",
295 "errors": null,
296 "old_record": {
297 "id": 47,
298 "name": "deleted_name"
299 },
300 "schema": "public",
301 "table": "users",
302 "type": "DELETE"
303 }"#;
304
305 let payload: PostgresChangesPayload = serde_json::from_str(json).unwrap();
306
307 match payload {
308 PostgresChangesPayload::Delete(delete) => {
309 assert_eq!(delete.base.schema, "public");
310 assert_eq!(delete.base.table, "users");
311 assert_eq!(
312 delete.old.get("name").unwrap().as_str().unwrap(),
313 "deleted_name"
314 );
315 }
316 _ => panic!("Expected Delete variant"),
317 }
318 }
319}