postrust_graphql/subscription/
mod.rs

1//! GraphQL subscriptions using PostgreSQL LISTEN/NOTIFY.
2//!
3//! This module provides realtime data synchronization through GraphQL subscriptions,
4//! powered by PostgreSQL's native LISTEN/NOTIFY mechanism.
5//!
6//! ## Architecture
7//!
8//! ```text
9//! ┌──────────────┐   WebSocket   ┌──────────────┐   LISTEN/NOTIFY   ┌────────────┐
10//! │   Client     │◀────────────▶│   Postrust   │◀────────────────▶│  PostgreSQL│
11//! │  (Browser)   │              │   Server     │                   │  Database  │
12//! └──────────────┘              └──────────────┘                   └────────────┘
13//! ```
14//!
15//! ## Usage
16//!
17//! 1. Create notification triggers on your tables using [`broker::create_notify_trigger_sql`]
18//! 2. Start the [`NotifyBroker`] to listen for database notifications
19//! 3. Use GraphQL subscriptions to receive realtime updates
20//!
21//! ## Example
22//!
23//! ```graphql
24//! subscription {
25//!   users {
26//!     id
27//!     name
28//!     email
29//!   }
30//! }
31//! ```
32
33pub mod broker;
34
35pub use broker::{
36    create_notify_trigger_sql, drop_notify_trigger_sql, table_channel_name, BrokerError,
37    NotifyBroker, PgNotification,
38};
39
40use crate::schema::GeneratedSchema;
41use postrust_core::schema_cache::SchemaCache;
42
43/// A subscription field in the GraphQL schema.
44#[derive(Debug, Clone)]
45pub struct SubscriptionField {
46    /// The GraphQL field name (e.g., "users", "orders")
47    pub name: String,
48    /// The source table name
49    pub table_name: String,
50    /// The source schema name
51    pub schema_name: String,
52    /// The return type (e.g., "Users", "Orders")
53    pub return_type: String,
54    /// Description for documentation
55    pub description: Option<String>,
56}
57
58impl SubscriptionField {
59    /// Create a new subscription field for a table.
60    pub fn for_table(schema: &str, table: &str, type_name: &str) -> Self {
61        Self {
62            name: to_camel_case(table),
63            table_name: table.to_string(),
64            schema_name: schema.to_string(),
65            return_type: type_name.to_string(),
66            description: Some(format!("Subscribe to changes on the {} table", table)),
67        }
68    }
69
70    /// Get the PostgreSQL channel name for this subscription.
71    pub fn channel_name(&self) -> String {
72        table_channel_name(&self.schema_name, &self.table_name)
73    }
74}
75
76/// Generate subscription fields for all tables in the schema.
77pub fn generate_subscription_fields(
78    _schema_cache: &SchemaCache,
79    generated: &GeneratedSchema,
80) -> Vec<SubscriptionField> {
81    let mut fields = Vec::new();
82
83    for (type_name, obj_type) in &generated.object_types {
84        let table = &obj_type.table;
85
86        // Only create subscriptions for tables, not views (views can be added later)
87        if !table.is_view {
88            fields.push(SubscriptionField::for_table(
89                &table.schema,
90                &table.name,
91                type_name,
92            ));
93        }
94    }
95
96    fields
97}
98
99/// Convert a snake_case string to camelCase.
100fn to_camel_case(s: &str) -> String {
101    let mut result = String::new();
102    let mut capitalize_next = false;
103
104    for (i, c) in s.chars().enumerate() {
105        if c == '_' {
106            capitalize_next = true;
107        } else if capitalize_next {
108            result.push(c.to_ascii_uppercase());
109            capitalize_next = false;
110        } else if i == 0 {
111            result.push(c.to_ascii_lowercase());
112        } else {
113            result.push(c);
114        }
115    }
116
117    result
118}
119
120/// Payload structure for table change notifications.
121#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
122pub struct TableChangePayload {
123    /// The operation type: INSERT, UPDATE, or DELETE
124    pub operation: String,
125    /// The table name
126    pub table: String,
127    /// The schema name
128    pub schema: String,
129    /// The old row data (for UPDATE and DELETE)
130    #[serde(skip_serializing_if = "Option::is_none")]
131    pub old: Option<serde_json::Value>,
132    /// The new row data (for INSERT and UPDATE)
133    #[serde(skip_serializing_if = "Option::is_none")]
134    pub new: Option<serde_json::Value>,
135}
136
137impl TableChangePayload {
138    /// Parse a notification payload.
139    pub fn from_payload(payload: &str) -> Result<Self, serde_json::Error> {
140        serde_json::from_str(payload)
141    }
142
143    /// Get the data to return to the client.
144    ///
145    /// For INSERT and UPDATE, returns the new row.
146    /// For DELETE, returns the old row.
147    pub fn data(&self) -> Option<&serde_json::Value> {
148        match self.operation.as_str() {
149            "DELETE" => self.old.as_ref(),
150            _ => self.new.as_ref(),
151        }
152    }
153}
154
155#[cfg(test)]
156mod tests {
157    use super::*;
158
159    #[test]
160    fn test_to_camel_case() {
161        assert_eq!(to_camel_case("users"), "users");
162        assert_eq!(to_camel_case("user_orders"), "userOrders");
163        assert_eq!(to_camel_case("order_items"), "orderItems");
164        // PostgreSQL identifiers are typically lowercase, but function preserves case
165        assert_eq!(to_camel_case("my_table_name"), "myTableName");
166    }
167
168    #[test]
169    fn test_subscription_field_channel_name() {
170        let field = SubscriptionField::for_table("public", "users", "Users");
171        assert_eq!(field.channel_name(), "postrust_public_users");
172    }
173
174    #[test]
175    fn test_table_change_payload_parsing() {
176        let json = r#"{
177            "operation": "INSERT",
178            "table": "users",
179            "schema": "public",
180            "new": {"id": 1, "name": "Alice"}
181        }"#;
182
183        let payload = TableChangePayload::from_payload(json).unwrap();
184        assert_eq!(payload.operation, "INSERT");
185        assert_eq!(payload.table, "users");
186        assert!(payload.new.is_some());
187        assert!(payload.old.is_none());
188    }
189
190    #[test]
191    fn test_table_change_payload_data() {
192        let insert_payload = TableChangePayload {
193            operation: "INSERT".to_string(),
194            table: "users".to_string(),
195            schema: "public".to_string(),
196            old: None,
197            new: Some(serde_json::json!({"id": 1})),
198        };
199        assert!(insert_payload.data().is_some());
200
201        let delete_payload = TableChangePayload {
202            operation: "DELETE".to_string(),
203            table: "users".to_string(),
204            schema: "public".to_string(),
205            old: Some(serde_json::json!({"id": 1})),
206            new: None,
207        };
208        assert!(delete_payload.data().is_some());
209    }
210}