use serde::Deserialize;
use serde_json::Value;
/// Kind of row change selected by a Postgres changes filter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostgresChangeKind {
    /// Wildcard: rendered as `"*"` and matches every change type.
    All,
    /// Rendered as `"INSERT"`.
    Insert,
    /// Rendered as `"UPDATE"`.
    Update,
    /// Rendered as `"DELETE"`.
    Delete,
}

impl PostgresChangeKind {
    /// Returns the string used for this kind when a filter is serialized.
    pub fn as_str(self) -> &'static str {
        match self {
            PostgresChangeKind::All => "*",
            PostgresChangeKind::Insert => "INSERT",
            PostgresChangeKind::Update => "UPDATE",
            PostgresChangeKind::Delete => "DELETE",
        }
    }

    /// Reports whether `change_type` is selected by this kind.
    ///
    /// `All` accepts any string (including an empty one). The other kinds
    /// compare `change_type` against their own name, ignoring ASCII case.
    pub fn matches(self, change_type: &str) -> bool {
        // For the non-wildcard kinds the name to compare against is exactly
        // what `as_str` yields, so a single comparison covers all of them.
        matches!(self, PostgresChangeKind::All)
            || change_type.eq_ignore_ascii_case(self.as_str())
    }
}
/// Chainable description of a Postgres changes filter.
///
/// Start from [`PostgresChangesFilter::new`], refine with the consuming
/// setters, and serialize with `to_json`.
#[derive(Debug, Clone)]
pub struct PostgresChangesFilter {
    /// Change kind; serialized through [`PostgresChangeKind::as_str`].
    pub(crate) event: PostgresChangeKind,
    /// Schema name; `"public"` unless overridden with `schema`.
    pub(crate) schema: String,
    /// Optional table name; left out of the JSON when `None`.
    pub(crate) table: Option<String>,
    /// Optional filter expression, stored verbatim; left out of the JSON when `None`.
    pub(crate) filter: Option<String>,
}

impl PostgresChangesFilter {
    /// Creates a filter for `event` on the `"public"` schema with no table
    /// and no filter expression.
    pub fn new(event: PostgresChangeKind) -> Self {
        Self {
            event,
            schema: String::from("public"),
            table: None,
            filter: None,
        }
    }

    /// Replaces the schema name and returns the updated filter.
    pub fn schema(self, schema: impl Into<String>) -> Self {
        Self {
            schema: schema.into(),
            ..self
        }
    }

    /// Sets the table name and returns the updated filter.
    pub fn table(self, table: impl Into<String>) -> Self {
        Self {
            table: Some(table.into()),
            ..self
        }
    }

    /// Sets the filter expression and returns the updated filter.
    pub fn filter(self, filter: impl Into<String>) -> Self {
        Self {
            filter: Some(filter.into()),
            ..self
        }
    }

    /// Serializes the filter to a JSON object.
    ///
    /// `event` and `schema` are always present; `table` and `filter` appear
    /// only when they have been set.
    pub(crate) fn to_json(&self) -> serde_json::Value {
        let mut body = serde_json::json!({
            "event": self.event.as_str(),
            "schema": self.schema,
        });

        // Optional keys are written only when a value is present, so an unset
        // field yields an absent key rather than a JSON null.
        let optional_keys = [("table", &self.table), ("filter", &self.filter)];
        for (key, value) in optional_keys {
            if let Some(text) = value {
                body[key] = serde_json::Value::String(text.clone());
            }
        }

        body
    }
}
/// Payload describing a single Postgres change, deserialized via serde.
///
/// Every field carries `#[serde(default)]`, so a key that is missing from the
/// input falls back to the field type's `Default` value instead of failing
/// deserialization.
#[derive(Debug, Clone, Deserialize)]
pub struct PostgresChangePayload {
/// Change type, read from the input key `type` (e.g. `"INSERT"`); an empty
/// string when the key is missing.
#[serde(default, rename = "type")]
pub change_type: String,
/// Schema name; an empty string when the key is missing.
#[serde(default)]
pub schema: String,
/// Table name; an empty string when the key is missing.
#[serde(default)]
pub table: String,
/// Commit timestamp kept as the raw string (it is not parsed here); `None`
/// when missing or null.
// NOTE(review): the tests use an RFC 3339-looking value — confirm the format.
#[serde(default)]
pub commit_timestamp: Option<String>,
/// Arbitrary JSON value; `Value::Null` when the key is missing.
// NOTE(review): presumably the row after the change — confirm against the producer.
#[serde(default)]
pub record: Value,
/// Arbitrary JSON value; `Value::Null` when the key is missing.
// NOTE(review): presumably the row before the change — confirm against the producer.
#[serde(default)]
pub old_record: Value,
/// Arbitrary JSON value carried under `errors`; `None` when missing or null.
#[serde(default)]
pub errors: Option<Value>,
}
/// Payload of a broadcast message, deserialized via serde.
#[derive(Debug, Clone, Deserialize)]
pub struct BroadcastPayload {
/// Event name. This field has no serde default, so deserialization fails
/// when the `event` key is missing.
pub event: String,
/// Arbitrary JSON value attached to the event; `Value::Null` when the
/// `payload` key is missing.
#[serde(default)]
pub payload: Value,
}
/// Newtype around an arbitrary JSON value; this type enforces no schema on
/// its contents.
// NOTE(review): presumably carries presence state — the shape is not visible
// in this file, confirm against the code that produces it.
#[derive(Debug, Clone, Deserialize)]
pub struct PresencePayload(pub Value);
/// Presence event names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PresenceEvent {
    /// Rendered as `"sync"`.
    Sync,
    /// Rendered as `"join"`.
    Join,
    /// Rendered as `"leave"`.
    Leave,
}

impl PresenceEvent {
    /// Returns the lowercase name of this event.
    pub fn as_str(self) -> &'static str {
        match self {
            PresenceEvent::Sync => "sync",
            PresenceEvent::Join => "join",
            PresenceEvent::Leave => "leave",
        }
    }
}
/// Event values for the realtime types declared in this module.
// NOTE(review): the code that produces and consumes these events is not
// visible in this file; the per-variant notes below describe only the payload
// types and should be confirmed against it.
#[derive(Debug, Clone)]
pub enum RealtimeEvent {
/// Carries a deserialized [`PostgresChangePayload`].
PostgresChange(PostgresChangePayload),
/// Carries a deserialized [`BroadcastPayload`].
Broadcast(BroadcastPayload),
/// Carries a raw JSON value (presumably the presence state — confirm).
PresenceSync(Value),
/// Carries a raw JSON value (presumably a presence diff — confirm).
PresenceDiff(Value),
/// Carries a raw JSON value for a system message.
System(Value),
/// Carries an error message as plain text.
Error(String),
/// Carries no data.
Closed,
}
/// Status values for a subscription.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SubscriptionStatus {
    /// Rendered as `"SUBSCRIBING"`.
    Subscribing,
    /// Rendered as `"SUBSCRIBED"`.
    Subscribed,
    /// Rendered as `"TIMED_OUT"`.
    TimedOut,
    /// Rendered as `"CLOSED"`.
    Closed,
    /// Rendered as `"CHANNEL_ERROR"`.
    ChannelError,
}

impl SubscriptionStatus {
    /// Returns the upper-snake-case label of this status.
    pub fn as_str(self) -> &'static str {
        match self {
            SubscriptionStatus::Subscribing => "SUBSCRIBING",
            SubscriptionStatus::Subscribed => "SUBSCRIBED",
            SubscriptionStatus::TimedOut => "TIMED_OUT",
            SubscriptionStatus::Closed => "CLOSED",
            SubscriptionStatus::ChannelError => "CHANNEL_ERROR",
        }
    }
}
#[cfg(test)]
// Unwrapping is acceptable here: a failed unwrap simply fails the test.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Each change kind renders to its expected string.
    #[test]
    fn kind_as_str_all_variants() {
        let expected = [
            (PostgresChangeKind::All, "*"),
            (PostgresChangeKind::Insert, "INSERT"),
            (PostgresChangeKind::Update, "UPDATE"),
            (PostgresChangeKind::Delete, "DELETE"),
        ];
        for (kind, wire) in expected {
            assert_eq!(kind.as_str(), wire);
        }
    }

    /// `Insert` accepts any ASCII casing of its own name and nothing else.
    #[test]
    fn kind_matches_case_insensitive() {
        let kind = PostgresChangeKind::Insert;
        for spelling in ["INSERT", "insert", "Insert"] {
            assert!(kind.matches(spelling));
        }
        for other in ["UPDATE", "DELETE"] {
            assert!(!kind.matches(other));
        }
    }

    /// `All` accepts every change type, including unknown ones.
    #[test]
    fn kind_all_matches_everything() {
        let kind = PostgresChangeKind::All;
        for change_type in ["INSERT", "UPDATE", "DELETE", "anything"] {
            assert!(kind.matches(change_type));
        }
    }

    /// `Update` accepts its own name only.
    #[test]
    fn kind_update_matches_only_update() {
        let kind = PostgresChangeKind::Update;
        assert!(kind.matches("UPDATE"));
        assert!(kind.matches("update"));
        assert!(!kind.matches("INSERT"));
    }

    /// `Delete` accepts its own name only.
    #[test]
    fn kind_delete_matches_only_delete() {
        let kind = PostgresChangeKind::Delete;
        assert!(kind.matches("DELETE"));
        assert!(!kind.matches("UPDATE"));
    }

    /// A fresh filter targets `public` and has no table or filter expression.
    #[test]
    fn filter_defaults_to_public_schema() {
        let PostgresChangesFilter {
            schema,
            table,
            filter,
            ..
        } = PostgresChangesFilter::new(PostgresChangeKind::Insert);
        assert_eq!(schema, "public");
        assert!(table.is_none());
        assert!(filter.is_none());
    }

    /// Only `event` and `schema` are emitted when nothing else is set.
    #[test]
    fn filter_to_json_minimal() {
        let actual = PostgresChangesFilter::new(PostgresChangeKind::Update)
            .schema("auth")
            .to_json();
        let expected = json!({"event": "UPDATE", "schema": "auth"});
        assert_eq!(actual, expected);
    }

    /// All four keys are emitted once table and filter are set.
    #[test]
    fn filter_to_json_with_table_and_filter() {
        let actual = PostgresChangesFilter::new(PostgresChangeKind::Delete)
            .schema("public")
            .table("messages")
            .filter("user_id=eq.42")
            .to_json();
        let expected = json!({
            "event": "DELETE",
            "schema": "public",
            "table": "messages",
            "filter": "user_id=eq.42"
        });
        assert_eq!(actual, expected);
    }

    /// The wildcard kind serializes as `*`.
    #[test]
    fn filter_all_event_to_json() {
        let serialized = PostgresChangesFilter::new(PostgresChangeKind::All).to_json();
        assert_eq!(serialized["event"], "*");
    }

    /// An unset table produces no `table` key at all.
    #[test]
    fn filter_table_not_set_omits_key() {
        let serialized = PostgresChangesFilter::new(PostgresChangeKind::Insert).to_json();
        assert!(
            serialized.get("table").is_none(),
            "table should be absent when not set"
        );
    }

    /// An unset filter expression produces no `filter` key at all.
    #[test]
    fn filter_filter_not_set_omits_key() {
        let serialized = PostgresChangesFilter::new(PostgresChangeKind::Insert)
            .table("t")
            .to_json();
        assert!(
            serialized.get("filter").is_none(),
            "filter key should be absent when not set"
        );
    }

    /// Each presence event renders to its lowercase name.
    #[test]
    fn presence_event_strings() {
        let expected = [
            (PresenceEvent::Sync, "sync"),
            (PresenceEvent::Join, "join"),
            (PresenceEvent::Leave, "leave"),
        ];
        for (event, name) in expected {
            assert_eq!(event.as_str(), name);
        }
    }

    /// Presence events compare by variant.
    #[test]
    fn presence_event_eq() {
        assert_eq!(PresenceEvent::Sync, PresenceEvent::Sync);
        assert_ne!(PresenceEvent::Sync, PresenceEvent::Join);
    }

    /// Each subscription status renders to its upper-snake-case label.
    #[test]
    fn subscription_status_strings() {
        let expected = [
            (SubscriptionStatus::Subscribing, "SUBSCRIBING"),
            (SubscriptionStatus::Subscribed, "SUBSCRIBED"),
            (SubscriptionStatus::TimedOut, "TIMED_OUT"),
            (SubscriptionStatus::Closed, "CLOSED"),
            (SubscriptionStatus::ChannelError, "CHANNEL_ERROR"),
        ];
        for (status, label) in expected {
            assert_eq!(status.as_str(), label);
        }
    }

    /// A fully populated change payload deserializes field by field.
    #[test]
    fn postgres_change_payload_deserializes_insert() {
        let raw = json!({
            "type": "INSERT",
            "schema": "public",
            "table": "messages",
            "commit_timestamp": "2024-01-01T00:00:00Z",
            "record": {"id": 1, "text": "hello"},
            "old_record": {},
            "errors": null
        });
        let parsed = serde_json::from_value::<PostgresChangePayload>(raw).unwrap();
        assert_eq!(parsed.change_type, "INSERT");
        assert_eq!(parsed.schema, "public");
        assert_eq!(parsed.table, "messages");
        assert_eq!(parsed.record["id"], 1);
    }

    /// Missing optional keys fall back to their defaults.
    #[test]
    fn postgres_change_payload_deserializes_missing_optional_fields() {
        let raw = json!({ "type": "DELETE", "schema": "public", "table": "t" });
        let parsed = serde_json::from_value::<PostgresChangePayload>(raw).unwrap();
        assert_eq!(parsed.change_type, "DELETE");
        assert!(parsed.commit_timestamp.is_none());
        assert!(parsed.errors.is_none());
    }

    /// A broadcast with both keys deserializes as-is.
    #[test]
    fn broadcast_payload_deserializes() {
        let raw = json!({"event": "cursor-pos", "payload": {"x": 10, "y": 20}});
        let parsed = serde_json::from_value::<BroadcastPayload>(raw).unwrap();
        assert_eq!(parsed.event, "cursor-pos");
        assert_eq!(parsed.payload["x"], 10);
    }

    /// A broadcast without `payload` gets a JSON null payload.
    #[test]
    fn broadcast_payload_missing_payload_defaults_to_null() {
        let raw = json!({"event": "ping"});
        let parsed = serde_json::from_value::<BroadcastPayload>(raw).unwrap();
        assert_eq!(parsed.event, "ping");
        assert_eq!(parsed.payload, json!(null));
    }

    /// `RealtimeEvent` supports both `Clone` and `Debug`.
    #[test]
    fn realtime_event_is_debug_and_clone() {
        let original = RealtimeEvent::Closed;
        let duplicate = original.clone();
        let _rendered = format!("{duplicate:?}");
    }

    /// The `Error` variant hands back the message it was built with.
    #[test]
    fn realtime_event_error_holds_string() {
        let event = RealtimeEvent::Error("something bad".into());
        if let RealtimeEvent::Error(msg) = event {
            assert_eq!(msg, "something bad");
        } else {
            panic!("wrong variant");
        }
    }
}