fraiseql_core/runtime/subscription/mod.rs
1//! Subscription runtime for event-driven GraphQL subscriptions.
2//!
3//! FraiseQL subscriptions are **compiled projections of database events**, not
4//! traditional resolver-based subscriptions. Events originate from database
5//! transactions (via LISTEN/NOTIFY or CDC) and are delivered through transport
6//! adapters.
7//!
8//! # Architecture
9//!
10//! ```text
11//! Database Transaction (INSERT/UPDATE/DELETE)
12//! ↓ (commits)
13//! LISTEN/NOTIFY (PostgreSQL)
14//! ↓
15//! SubscriptionManager (event routing)
16//! ↓
17//! SubscriptionMatcher (filter evaluation)
18//! ↓ (parallel delivery)
19//! ├─ graphql-ws Adapter (WebSocket)
20//! ├─ Webhook Adapter (HTTP POST)
21//! └─ Kafka Adapter (event streaming)
22//! ```
23//!
24//! # Example
25//!
26//! ```text
27//! // Illustrative — subscription infrastructure requires a live schema + transport.
28//! // Use SubscriptionManager::new(Arc::new(schema)) for the full API.
29//!
30//! // Create subscription manager
31//! let manager = SubscriptionManager::new(Arc::new(schema));
32//!
33//! // Subscribe to events (synchronous, not async)
34//! let subscription_id = manager.subscribe(
35//! "OrderCreated",
36//! user_context_json,
37//! variables_json,
38//! "connection-id",
39//! )?;
40//!
41//! // Receive events via broadcast channel
42//! let mut receiver = manager.receiver();
43//! while let Ok(payload) = receiver.recv().await {
44//! if payload.subscription_id == subscription_id {
45//! // Deliver to client
46//! }
47//! }
48//!
49//! // Unsubscribe (synchronous)
50//! manager.unsubscribe(subscription_id)?;
51//! ```
52
53use thiserror::Error;
54
55mod kafka;
56mod manager;
57pub mod protocol;
58#[cfg(test)]
59mod tests;
60mod transport;
61mod types;
62mod webhook;
63
64pub use kafka::{KafkaAdapter, KafkaConfig, KafkaMessage};
65pub use manager::SubscriptionManager;
66pub use transport::{BoxDynTransportAdapter, DeliveryResult, TransportAdapter, TransportManager};
67pub use types::{
68 ActiveSubscription, SubscriptionEvent, SubscriptionId, SubscriptionOperation,
69 SubscriptionPayload,
70};
71pub use webhook::{WebhookAdapter, WebhookPayload, WebhookTransportConfig};
72/// Backward-compatible type alias — use [`WebhookTransportConfig`] in new code.
73pub type WebhookConfig = WebhookTransportConfig;
74
75/// Extract `(field, value)` equality conditions from an RLS `WhereClause`.
76///
77/// Walks the clause tree and collects all `Field { op: Eq }` nodes.
78/// `And` nodes are recursively flattened. Other clause types (nested `Or`,
79/// non-Eq operators) are ignored — they cannot be represented as simple
80/// field-value pairs and will not filter subscription events.
81///
82/// The caller should evaluate the RLS policy at subscribe time and pass
83/// the result through this function to produce conditions for
84/// [`ActiveSubscription::with_rls_conditions`].
85///
86/// # Errors
87///
88/// This function is infallible — unsupported clause shapes are silently
89/// skipped, which is a safe default (fewer conditions = more events
90/// delivered, never fewer).
91pub fn extract_rls_conditions(clause: &crate::db::WhereClause) -> Vec<(String, serde_json::Value)> {
92 let mut conditions = Vec::new();
93 collect_eq_conditions(clause, &mut conditions);
94 conditions
95}
96
97fn collect_eq_conditions(
98 clause: &crate::db::WhereClause,
99 out: &mut Vec<(String, serde_json::Value)>,
100) {
101 use crate::db::{WhereClause, WhereOperator};
102 match clause {
103 WhereClause::Field {
104 path,
105 operator: WhereOperator::Eq,
106 value,
107 } => {
108 // Use the last path component as the field name (e.g., ["tenant_id"] → "tenant_id")
109 if let Some(field) = path.last() {
110 out.push((field.clone(), value.clone()));
111 }
112 },
113 WhereClause::And(clauses) => {
114 for c in clauses {
115 collect_eq_conditions(c, out);
116 }
117 },
118 _ => {
119 // Or, Not, non-Eq operators — cannot be represented as simple field-value pairs.
120 // Safe default: skip (delivers more events, never fewer).
121 },
122 }
123}
124
125// =============================================================================
126// Error Types
127// =============================================================================
128
129/// Errors that can occur during subscription operations.
130#[non_exhaustive]
131#[derive(Debug, Error)]
132pub enum SubscriptionError {
133 /// Subscription type not found in schema.
134 #[error("Subscription not found: {0}")]
135 SubscriptionNotFound(String),
136
137 /// Authentication required for subscription.
138 #[error("Authentication required for subscription: {0}")]
139 AuthenticationRequired(String),
140
141 /// User not authorized for subscription.
142 #[error("Not authorized for subscription: {0}")]
143 Forbidden(String),
144
145 /// Invalid subscription variables.
146 #[error("Invalid subscription variables: {0}")]
147 InvalidVariables(String),
148
149 /// Subscription already exists.
150 #[error("Subscription already exists: {0}")]
151 AlreadyExists(String),
152
153 /// Subscription not active.
154 #[error("Subscription not active: {0}")]
155 NotActive(String),
156
157 /// Internal subscription error.
158 #[error("Subscription error: {0}")]
159 Internal(String),
160
161 /// Channel send error.
162 #[error("Failed to send event: {0}")]
163 SendError(String),
164
165 /// Database connection error.
166 #[error("Database connection error: {0}")]
167 DatabaseConnection(String),
168
169 /// Listener already running.
170 #[error("Listener already running")]
171 ListenerAlreadyRunning,
172
173 /// Listener not running.
174 #[error("Listener not running")]
175 ListenerNotRunning,
176
177 /// Failed to parse notification payload.
178 #[error("Failed to parse notification: {0}")]
179 InvalidNotification(String),
180
181 /// Failed to deliver event to transport.
182 #[error("Failed to deliver to {transport}: {reason}")]
183 DeliveryFailed {
184 /// Transport that failed.
185 transport: String,
186 /// Reason for failure.
187 reason: String,
188 },
189}