pub mod broker;
pub use broker::{
create_notify_trigger_sql, drop_notify_trigger_sql, table_channel_name, BrokerError,
NotifyBroker, PgNotification,
};
use crate::schema::GeneratedSchema;
use postrust_core::schema_cache::SchemaCache;
/// Describes a single subscription field that exposes changes on one table.
///
/// Instances are normally built with [`SubscriptionField::for_table`].
#[derive(Debug, Clone)]
pub struct SubscriptionField {
/// Field name; `for_table` sets this to the camelCase form of the table name.
pub name: String,
/// Name of the underlying table, stored as given to `for_table`.
pub table_name: String,
/// Name of the schema that contains the table.
pub schema_name: String,
/// Name of the type this field returns.
pub return_type: String,
/// Optional human-readable description of the field.
pub description: Option<String>,
}
impl SubscriptionField {
    /// Builds the subscription field for `table` in `schema`.
    ///
    /// The field name is the camelCase form of `table`, `type_name` becomes the
    /// return type, and a default description mentioning the table is attached.
    pub fn for_table(schema: &str, table: &str, type_name: &str) -> Self {
        let description = format!("Subscribe to changes on the {} table", table);
        Self {
            name: to_camel_case(table),
            table_name: table.to_owned(),
            schema_name: schema.to_owned(),
            return_type: type_name.to_owned(),
            description: Some(description),
        }
    }

    /// Returns the notification channel name for this field's table, as
    /// computed by `table_channel_name` from the schema and table names.
    pub fn channel_name(&self) -> String {
        let Self {
            schema_name,
            table_name,
            ..
        } = self;
        table_channel_name(schema_name, table_name)
    }
}
/// Produces one [`SubscriptionField`] for every non-view table in `generated`.
///
/// Each entry of `generated.object_types` whose table is not a view is turned
/// into a field via [`SubscriptionField::for_table`], using the entry's type
/// name as the return type. The schema cache argument is currently unused.
pub fn generate_subscription_fields(
    _schema_cache: &SchemaCache,
    generated: &GeneratedSchema,
) -> Vec<SubscriptionField> {
    generated
        .object_types
        .iter()
        // Views are skipped; only real tables get a subscription field.
        .filter(|(_, obj_type)| !obj_type.table.is_view)
        .map(|(type_name, obj_type)| {
            let table = &obj_type.table;
            SubscriptionField::for_table(&table.schema, &table.name, type_name)
        })
        .collect()
}
/// Converts a snake_case identifier to camelCase.
///
/// Underscores are removed and the character following each run of
/// underscores is upper-cased. The first emitted character is always
/// lower-cased, even when the input starts with underscores, so
/// `_prisma_migrations` becomes `prismaMigrations` rather than
/// `PrismaMigrations`. Trailing underscores are dropped. Case conversion is
/// ASCII-only; other characters are copied unchanged.
fn to_camel_case(s: &str) -> String {
    let mut result = String::with_capacity(s.len());
    let mut capitalize_next = false;
    for c in s.chars() {
        if c == '_' {
            capitalize_next = true;
        } else if result.is_empty() {
            // Key on "nothing emitted yet" rather than on the input index, so
            // leading underscores cannot force an upper-case first letter.
            result.push(c.to_ascii_lowercase());
            capitalize_next = false;
        } else if capitalize_next {
            result.push(c.to_ascii_uppercase());
            capitalize_next = false;
        } else {
            result.push(c);
        }
    }
    result
}
/// JSON-serializable description of a single change to a table row.
///
/// Can be parsed from a JSON string with [`TableChangePayload::from_payload`].
// NOTE(review): presumably produced by the notify trigger from the `broker`
// module — confirm against the trigger SQL.
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct TableChangePayload {
/// Operation name; `data` treats the exact string `"DELETE"` specially.
pub operation: String,
/// Name of the table the change applies to.
pub table: String,
/// Name of the schema that contains the table.
pub schema: String,
/// Previous row value, if any; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub old: Option<serde_json::Value>,
/// New row value, if any; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub new: Option<serde_json::Value>,
}
impl TableChangePayload {
    /// Parses a payload from its JSON text.
    ///
    /// # Errors
    ///
    /// Returns the `serde_json` error if `payload` cannot be deserialized
    /// into a `TableChangePayload`.
    pub fn from_payload(payload: &str) -> Result<Self, serde_json::Error> {
        serde_json::from_str::<Self>(payload)
    }

    /// Returns the row value relevant to this change: `old` when the
    /// operation is exactly `"DELETE"`, otherwise `new`.
    pub fn data(&self) -> Option<&serde_json::Value> {
        let row = if self.operation == "DELETE" {
            &self.old
        } else {
            &self.new
        };
        row.as_ref()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// snake_case names are converted to camelCase; single words are unchanged.
    #[test]
    fn test_to_camel_case() {
        let cases = [
            ("users", "users"),
            ("user_orders", "userOrders"),
            ("order_items", "orderItems"),
            ("my_table_name", "myTableName"),
        ];
        for &(input, expected) in &cases {
            assert_eq!(to_camel_case(input), expected);
        }
    }

    /// The channel name is derived from the schema and table names.
    #[test]
    fn test_subscription_field_channel_name() {
        let channel = SubscriptionField::for_table("public", "users", "Users").channel_name();
        assert_eq!(channel, "postrust_public_users");
    }

    /// A payload without an `old` key parses, leaving `old` as `None`.
    #[test]
    fn test_table_change_payload_parsing() {
        let json = r#"{
"operation": "INSERT",
"table": "users",
"schema": "public",
"new": {"id": 1, "name": "Alice"}
}"#;
        let parsed = TableChangePayload::from_payload(json).unwrap();
        assert_eq!(parsed.operation, "INSERT");
        assert_eq!(parsed.table, "users");
        assert!(parsed.new.is_some());
        assert!(parsed.old.is_none());
    }

    /// `data` yields `new` for an insert and `old` for a delete.
    #[test]
    fn test_table_change_payload_data() {
        let row = || Some(serde_json::json!({"id": 1}));

        let insert_payload = TableChangePayload {
            operation: "INSERT".to_string(),
            table: "users".to_string(),
            schema: "public".to_string(),
            old: None,
            new: row(),
        };
        assert!(insert_payload.data().is_some());

        let delete_payload = TableChangePayload {
            operation: "DELETE".to_string(),
            table: "users".to_string(),
            schema: "public".to_string(),
            old: row(),
            new: None,
        };
        assert!(delete_payload.data().is_some());
    }
}