Skip to main content

fraiseql_core/runtime/subscription/
mod.rs

1//! Subscription runtime for event-driven GraphQL subscriptions.
2//!
3//! FraiseQL subscriptions are **compiled projections of database events**, not
4//! traditional resolver-based subscriptions. Events originate from database
5//! transactions (via LISTEN/NOTIFY or CDC) and are delivered through transport
6//! adapters.
7//!
8//! # Architecture
9//!
10//! ```text
11//! Database Transaction (INSERT/UPDATE/DELETE)
12//!     ↓ (commits)
13//! LISTEN/NOTIFY (PostgreSQL)
14//!     ↓
15//! SubscriptionManager (event routing)
16//!     ↓
17//! SubscriptionMatcher (filter evaluation)
18//!     ↓ (parallel delivery)
19//! ├─ graphql-ws Adapter (WebSocket)
20//! ├─ Webhook Adapter (HTTP POST)
21//! └─ Kafka Adapter (event streaming)
22//! ```
23//!
24//! # Example
25//!
26//! ```text
27//! // Illustrative — subscription infrastructure requires a live schema + transport.
28//! // Use SubscriptionManager::new(Arc::new(schema)) for the full API.
29//!
30//! // Create subscription manager
31//! let manager = SubscriptionManager::new(Arc::new(schema));
32//!
33//! // Subscribe to events (synchronous, not async)
34//! let subscription_id = manager.subscribe(
35//!     "OrderCreated",
36//!     user_context_json,
37//!     variables_json,
38//!     "connection-id",
39//! )?;
40//!
41//! // Receive events via broadcast channel
42//! let mut receiver = manager.receiver();
43//! while let Ok(payload) = receiver.recv().await {
44//!     if payload.subscription_id == subscription_id {
45//!         // Deliver to client
46//!     }
47//! }
48//!
49//! // Unsubscribe (synchronous)
50//! manager.unsubscribe(subscription_id)?;
51//! ```
52
53use thiserror::Error;
54
55mod kafka;
56mod manager;
57pub mod protocol;
58#[cfg(test)]
59mod tests;
60mod transport;
61mod types;
62mod webhook;
63
64pub use kafka::{KafkaAdapter, KafkaConfig, KafkaMessage};
65pub use manager::SubscriptionManager;
66pub use transport::{BoxDynTransportAdapter, DeliveryResult, TransportAdapter, TransportManager};
67pub use types::{
68    ActiveSubscription, SubscriptionEvent, SubscriptionId, SubscriptionOperation,
69    SubscriptionPayload,
70};
71pub use webhook::{WebhookAdapter, WebhookPayload, WebhookTransportConfig};
72/// Backward-compatible type alias — use [`WebhookTransportConfig`] in new code.
73pub type WebhookConfig = WebhookTransportConfig;
74
75/// Extract `(field, value)` equality conditions from an RLS `WhereClause`.
76///
77/// Walks the clause tree and collects all `Field { op: Eq }` nodes.
78/// `And` nodes are recursively flattened. Other clause types (nested `Or`,
79/// non-Eq operators) are ignored — they cannot be represented as simple
80/// field-value pairs and will not filter subscription events.
81///
82/// The caller should evaluate the RLS policy at subscribe time and pass
83/// the result through this function to produce conditions for
84/// [`ActiveSubscription::with_rls_conditions`].
85///
86/// # Errors
87///
88/// This function is infallible — unsupported clause shapes are silently
89/// skipped, which is a safe default (fewer conditions = more events
90/// delivered, never fewer).
91pub fn extract_rls_conditions(clause: &crate::db::WhereClause) -> Vec<(String, serde_json::Value)> {
92    let mut conditions = Vec::new();
93    collect_eq_conditions(clause, &mut conditions);
94    conditions
95}
96
97fn collect_eq_conditions(
98    clause: &crate::db::WhereClause,
99    out: &mut Vec<(String, serde_json::Value)>,
100) {
101    use crate::db::{WhereClause, WhereOperator};
102    match clause {
103        WhereClause::Field {
104            path,
105            operator: WhereOperator::Eq,
106            value,
107        } => {
108            // Use the last path component as the field name (e.g., ["tenant_id"] → "tenant_id")
109            if let Some(field) = path.last() {
110                out.push((field.clone(), value.clone()));
111            }
112        },
113        WhereClause::And(clauses) => {
114            for c in clauses {
115                collect_eq_conditions(c, out);
116            }
117        },
118        _ => {
119            // Or, Not, non-Eq operators — cannot be represented as simple field-value pairs.
120            // Safe default: skip (delivers more events, never fewer).
121        },
122    }
123}
124
125// =============================================================================
126// Error Types
127// =============================================================================
128
129/// Errors that can occur during subscription operations.
130#[non_exhaustive]
131#[derive(Debug, Error)]
132pub enum SubscriptionError {
133    /// Subscription type not found in schema.
134    #[error("Subscription not found: {0}")]
135    SubscriptionNotFound(String),
136
137    /// Authentication required for subscription.
138    #[error("Authentication required for subscription: {0}")]
139    AuthenticationRequired(String),
140
141    /// User not authorized for subscription.
142    #[error("Not authorized for subscription: {0}")]
143    Forbidden(String),
144
145    /// Invalid subscription variables.
146    #[error("Invalid subscription variables: {0}")]
147    InvalidVariables(String),
148
149    /// Subscription already exists.
150    #[error("Subscription already exists: {0}")]
151    AlreadyExists(String),
152
153    /// Subscription not active.
154    #[error("Subscription not active: {0}")]
155    NotActive(String),
156
157    /// Internal subscription error.
158    #[error("Subscription error: {0}")]
159    Internal(String),
160
161    /// Channel send error.
162    #[error("Failed to send event: {0}")]
163    SendError(String),
164
165    /// Database connection error.
166    #[error("Database connection error: {0}")]
167    DatabaseConnection(String),
168
169    /// Listener already running.
170    #[error("Listener already running")]
171    ListenerAlreadyRunning,
172
173    /// Listener not running.
174    #[error("Listener not running")]
175    ListenerNotRunning,
176
177    /// Failed to parse notification payload.
178    #[error("Failed to parse notification: {0}")]
179    InvalidNotification(String),
180
181    /// Failed to deliver event to transport.
182    #[error("Failed to deliver to {transport}: {reason}")]
183    DeliveryFailed {
184        /// Transport that failed.
185        transport: String,
186        /// Reason for failure.
187        reason:    String,
188    },
189}