postrust_graphql/subscription/
mod.rs1pub mod broker;
34
35pub use broker::{
36 create_notify_trigger_sql, drop_notify_trigger_sql, table_channel_name, BrokerError,
37 NotifyBroker, PgNotification,
38};
39
40use crate::schema::GeneratedSchema;
41use postrust_core::schema_cache::SchemaCache;
42
/// Describes a single subscription field and the table it is tied to.
#[derive(Clone, Debug)]
pub struct SubscriptionField {
    /// Name of the subscription field.
    pub name: String,
    /// Name of the table the field refers to.
    pub table_name: String,
    /// Name of the schema containing the table.
    pub schema_name: String,
    /// Name of the type the field returns.
    pub return_type: String,
    /// Optional human-readable description of the field.
    pub description: Option<String>,
}
57
58impl SubscriptionField {
59 pub fn for_table(schema: &str, table: &str, type_name: &str) -> Self {
61 Self {
62 name: to_camel_case(table),
63 table_name: table.to_string(),
64 schema_name: schema.to_string(),
65 return_type: type_name.to_string(),
66 description: Some(format!("Subscribe to changes on the {} table", table)),
67 }
68 }
69
70 pub fn channel_name(&self) -> String {
72 table_channel_name(&self.schema_name, &self.table_name)
73 }
74}
75
76pub fn generate_subscription_fields(
78 _schema_cache: &SchemaCache,
79 generated: &GeneratedSchema,
80) -> Vec<SubscriptionField> {
81 let mut fields = Vec::new();
82
83 for (type_name, obj_type) in &generated.object_types {
84 let table = &obj_type.table;
85
86 if !table.is_view {
88 fields.push(SubscriptionField::for_table(
89 &table.schema,
90 &table.name,
91 type_name,
92 ));
93 }
94 }
95
96 fields
97}
98
/// Converts a `snake_case` identifier to lower `camelCase`.
///
/// Underscores are dropped and the character following a run of underscores
/// is ASCII-uppercased. The first character that is actually emitted is always
/// ASCII-lowercased, so inputs with leading underscores (`"_users"`) still
/// yield a lower-camel name (`"users"`) rather than `"Users"`.
///
/// Only ASCII case conversion is applied; other characters pass through
/// unchanged. An input made up solely of underscores yields an empty string.
fn to_camel_case(s: &str) -> String {
    // ASCII case conversion never changes byte length and underscores are
    // removed, so the output never needs more than `s.len()` bytes.
    let mut result = String::with_capacity(s.len());
    let mut capitalize_next = false;

    for c in s.chars() {
        if c == '_' {
            capitalize_next = true;
        } else if result.is_empty() {
            // First emitted character: lowercase it even when underscores
            // preceded it, and discard the pending capitalization.
            result.push(c.to_ascii_lowercase());
            capitalize_next = false;
        } else if capitalize_next {
            result.push(c.to_ascii_uppercase());
            capitalize_next = false;
        } else {
            result.push(c);
        }
    }

    result
}
119
120#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
122pub struct TableChangePayload {
123 pub operation: String,
125 pub table: String,
127 pub schema: String,
129 #[serde(skip_serializing_if = "Option::is_none")]
131 pub old: Option<serde_json::Value>,
132 #[serde(skip_serializing_if = "Option::is_none")]
134 pub new: Option<serde_json::Value>,
135}
136
137impl TableChangePayload {
138 pub fn from_payload(payload: &str) -> Result<Self, serde_json::Error> {
140 serde_json::from_str(payload)
141 }
142
143 pub fn data(&self) -> Option<&serde_json::Value> {
148 match self.operation.as_str() {
149 "DELETE" => self.old.as_ref(),
150 _ => self.new.as_ref(),
151 }
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::*;

    /// Snake-case names map to lower camel case.
    #[test]
    fn test_to_camel_case() {
        let cases = [
            ("users", "users"),
            ("user_orders", "userOrders"),
            ("order_items", "orderItems"),
            ("my_table_name", "myTableName"),
        ];

        for &(input, expected) in &cases {
            assert_eq!(to_camel_case(input), expected);
        }
    }

    /// The channel name is derived from the field's schema and table names.
    #[test]
    fn test_subscription_field_channel_name() {
        let channel = SubscriptionField::for_table("public", "users", "Users").channel_name();
        assert_eq!(channel, "postrust_public_users");
    }

    /// An INSERT payload without an `old` key parses with `old` left as `None`.
    #[test]
    fn test_table_change_payload_parsing() {
        let json = r#"{
            "operation": "INSERT",
            "table": "users",
            "schema": "public",
            "new": {"id": 1, "name": "Alice"}
        }"#;

        let payload = TableChangePayload::from_payload(json).unwrap();
        assert_eq!(payload.operation, "INSERT");
        assert_eq!(payload.table, "users");
        assert!(payload.new.is_some());
        assert!(payload.old.is_none());
    }

    /// `data()` yields `new` for inserts and `old` for deletes.
    #[test]
    fn test_table_change_payload_data() {
        // Shared fixture for the single populated side of each payload.
        let row = || Some(serde_json::json!({"id": 1}));

        let insert_payload = TableChangePayload {
            operation: String::from("INSERT"),
            table: String::from("users"),
            schema: String::from("public"),
            old: None,
            new: row(),
        };
        assert!(insert_payload.data().is_some());

        let delete_payload = TableChangePayload {
            operation: String::from("DELETE"),
            table: String::from("users"),
            schema: String::from("public"),
            old: row(),
            new: None,
        };
        assert!(delete_payload.data().is_some());
    }
}