Skip to main content

oxirs_stream/
graphql_bridge.rs

1//! # GraphQL Subscription Bridge
2//!
3//! This module bridges oxirs-stream with GraphQL subscriptions, enabling real-time
4//! GraphQL updates when stream events occur.
5//!
6//! ## Features
7//!
8//! - **Stream-to-GraphQL Event Mapping**: Convert stream events to GraphQL subscription updates
9//! - **Query-based Filtering**: Only trigger subscriptions for relevant data changes
10//! - **WebSocket Integration**: Seamless integration with GraphQL subscription servers
11//! - **Multi-subscriber Support**: Broadcast events to multiple GraphQL clients
12//! - **Performance Optimization**: Debouncing and batching for high-frequency updates
13//!
14//! ## Usage Example
15//!
16//! ```rust,no_run
17//! use oxirs_stream::{StreamConfig, StreamProducer};
18//! use oxirs_stream::graphql_bridge::{BridgeConfig, GraphQLBridge};
19//!
20//! async fn setup_graphql_streaming() -> anyhow::Result<()> {
21//!     let config = StreamConfig::default();
22//!     let _producer = StreamProducer::new(config).await?;
23//!
24//!     // Create GraphQL bridge and inspect initial statistics
25//!     let bridge = GraphQLBridge::new(BridgeConfig::default());
26//!     let stats = bridge.get_stats().await;
27//!     println!("Active subscriptions: {}", stats.active_subscriptions);
28//!
29//!     Ok(())
30//! }
31//! ```
32
33use anyhow::{anyhow, Result};
34use chrono::{DateTime, Utc};
35use serde::{Deserialize, Serialize};
36use std::collections::HashMap;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39use tokio::sync::{broadcast, RwLock};
40use tokio::time::interval;
41use tracing::{debug, info, warn};
42
43use crate::StreamEvent;
44
/// Tunable settings for a [`GraphQLBridge`].
///
/// `BridgeConfig::default()` yields the values listed on each field.
#[derive(Debug, Clone)]
pub struct BridgeConfig {
    /// Capacity handed to the broadcast channel that carries updates
    /// (default: 10 000).
    pub max_queue_size: usize,
    /// Minimum spacing between two updates sharing a debounce key
    /// (default: 100 ms).
    pub debounce_duration: Duration,
    /// When `true`, updates are buffered and flushed in batches instead of
    /// being sent one by one (default: `true`).
    pub enable_batching: bool,
    /// Buffer length at which a batch is flushed immediately (default: 100).
    pub max_batch_size: usize,
    /// Period of the background flush of the batch buffer (default: 500 ms).
    pub batch_interval: Duration,
    /// When `true`, only subscriptions whose filters match an update are
    /// listed on it (default: `true`).
    pub enable_query_filtering: bool,
    /// Upper bound on simultaneously registered subscriptions (default: 1000).
    pub max_subscriptions: usize,
}

impl Default for BridgeConfig {
    /// Batching and query filtering enabled, with moderate queue/batch sizes.
    fn default() -> Self {
        let debounce_duration = Duration::from_millis(100);
        let batch_interval = Duration::from_millis(500);

        BridgeConfig {
            max_queue_size: 10_000,
            debounce_duration,
            enable_batching: true,
            max_batch_size: 100,
            batch_interval,
            enable_query_filtering: true,
            max_subscriptions: 1_000,
        }
    }
}
77
/// GraphQL subscription bridge
///
/// Converts stream events into [`GraphQLUpdate`]s and publishes them on a
/// Tokio broadcast channel; consumers obtain a receiver via `subscribe()`.
/// Mutable state is held behind `Arc<RwLock<..>>`; the batch buffer and the
/// statistics are also shared with the background batch-flush task.
pub struct GraphQLBridge {
    /// Configuration
    config: BridgeConfig,
    /// Registered GraphQL subscriptions, keyed by subscription ID
    subscriptions: Arc<RwLock<HashMap<String, GraphQLSubscription>>>,
    /// Event broadcaster (sending side of the broadcast channel)
    event_sender: broadcast::Sender<GraphQLUpdate>,
    /// Statistics
    stats: Arc<RwLock<BridgeStats>>,
    /// Debounce tracker: update ID -> instant at which it was last recorded
    debounce_tracker: Arc<RwLock<HashMap<String, Instant>>>,
    /// Batch buffer holding updates that have not been flushed yet
    batch_buffer: Arc<RwLock<Vec<GraphQLUpdate>>>,
}
93
/// GraphQL subscription registration
///
/// The bridge stores these keyed by `id`. Only `filters` is consulted when
/// deciding whether an update is relevant to the subscription.
#[derive(Debug, Clone)]
pub struct GraphQLSubscription {
    /// Subscription ID (used as the registry key)
    pub id: String,
    /// GraphQL query
    pub query: String,
    /// Variables
    pub variables: HashMap<String, serde_json::Value>,
    /// Filter patterns (for optimization); an empty list matches every update
    pub filters: Vec<SubscriptionFilter>,
    /// Created timestamp
    pub created_at: DateTime<Utc>,
    /// Last update timestamp
    // NOTE(review): nothing in this module writes this field after creation — confirm who maintains it.
    pub last_update: Option<DateTime<Utc>>,
    /// Update count
    // NOTE(review): nothing in this module increments this field — confirm who maintains it.
    pub update_count: u64,
}
112
/// Subscription filter for query optimization
///
/// A subscription is considered relevant to an update as soon as ANY of its
/// filters matches. Pattern variants are compared against the string field of
/// the same name in the update payload and may contain `*` wildcards.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum SubscriptionFilter {
    /// Filter by subject pattern (matched against the payload's `"subject"`)
    SubjectPattern(String),
    /// Filter by predicate pattern (matched against the payload's `"predicate"`)
    PredicatePattern(String),
    /// Filter by object pattern (matched against the payload's `"object"`)
    ObjectPattern(String),
    /// Filter by graph (exact string equality with the payload's `"graph"`)
    GraphFilter(String),
    /// Custom filter expression (not evaluated yet: currently always matches)
    CustomFilter(String),
}
127
/// GraphQL update event
///
/// This is the message type carried by the bridge's broadcast channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphQLUpdate {
    /// Update ID (a freshly generated UUID v4 string for every update)
    pub id: String,
    /// Timestamp at which the update was created by the bridge
    pub timestamp: DateTime<Utc>,
    /// Update type
    pub update_type: GraphQLUpdateType,
    /// Data payload (JSON object derived from the originating stream event)
    pub data: serde_json::Value,
    /// Affected subscriptions (IDs)
    pub subscriptions: Vec<String>,
}
142
/// GraphQL update types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum GraphQLUpdateType {
    /// Data added (produced for triple/quad "added" events)
    DataAdded,
    /// Data removed (produced for triple/quad "removed" events)
    DataRemoved,
    /// Data modified
    // NOTE(review): no code in this module produces this variant.
    DataModified,
    /// Bulk update (also the fallback for every event kind without a dedicated mapping)
    BulkUpdate,
    /// Query result changed (produced for query-result added/removed events)
    QueryResultChanged,
}
157
/// Bridge statistics
///
/// A snapshot is returned by `GraphQLBridge::get_stats`; all counters start
/// at zero (`Default`).
#[derive(Debug, Clone, Default)]
pub struct BridgeStats {
    /// Total events processed (debounced events are not counted here)
    pub events_processed: u64,
    /// Total updates sent (updates successfully handed to the broadcast channel)
    pub updates_sent: u64,
    /// Updates batched: incremented once per flushed batch, NOT once per update
    pub updates_batched: u64,
    /// Updates debounced (events dropped by the debounce check)
    pub updates_debounced: u64,
    /// Active subscriptions (refreshed on every register/unregister)
    pub active_subscriptions: usize,
    /// Average processing time (ms), maintained by `process_stream_event`
    pub avg_processing_time_ms: f64,
}
174
175impl GraphQLBridge {
176    /// Create a new GraphQL bridge
177    pub fn new(config: BridgeConfig) -> Self {
178        let (event_sender, _) = broadcast::channel(config.max_queue_size);
179
180        let bridge = Self {
181            config,
182            subscriptions: Arc::new(RwLock::new(HashMap::new())),
183            event_sender,
184            stats: Arc::new(RwLock::new(BridgeStats::default())),
185            debounce_tracker: Arc::new(RwLock::new(HashMap::new())),
186            batch_buffer: Arc::new(RwLock::new(Vec::new())),
187        };
188
189        // Start background tasks
190        if bridge.config.enable_batching {
191            bridge.start_batch_processor();
192        }
193
194        bridge
195    }
196
197    /// Register a GraphQL subscription
198    pub async fn register_subscription(&self, subscription: GraphQLSubscription) -> Result<String> {
199        let mut subscriptions = self.subscriptions.write().await;
200
201        if subscriptions.len() >= self.config.max_subscriptions {
202            return Err(anyhow!("Maximum subscriptions limit reached"));
203        }
204
205        let id = subscription.id.clone();
206        subscriptions.insert(id.clone(), subscription);
207
208        // Update stats
209        self.stats.write().await.active_subscriptions = subscriptions.len();
210
211        info!("Registered GraphQL subscription: {}", id);
212        Ok(id)
213    }
214
215    /// Unregister a GraphQL subscription
216    pub async fn unregister_subscription(&self, subscription_id: &str) -> Result<()> {
217        let mut subscriptions = self.subscriptions.write().await;
218        subscriptions
219            .remove(subscription_id)
220            .ok_or_else(|| anyhow!("Subscription not found"))?;
221
222        // Update stats
223        self.stats.write().await.active_subscriptions = subscriptions.len();
224
225        info!("Unregistered GraphQL subscription: {}", subscription_id);
226        Ok(())
227    }
228
229    /// Process a stream event and trigger GraphQL updates
230    pub async fn process_stream_event(&self, event: &StreamEvent) -> Result<()> {
231        let start_time = Instant::now();
232
233        // Convert stream event to GraphQL update
234        let update = self.convert_stream_event_to_update(event).await?;
235
236        // Check debouncing
237        if self.should_debounce(&update).await {
238            self.stats.write().await.updates_debounced += 1;
239            return Ok(());
240        }
241
242        // Update debounce tracker
243        self.update_debounce_tracker(&update).await;
244
245        // Handle batching if enabled
246        if self.config.enable_batching {
247            self.add_to_batch(update).await?;
248        } else {
249            self.send_update(update).await?;
250        }
251
252        // Update stats
253        let mut stats = self.stats.write().await;
254        stats.events_processed += 1;
255
256        let processing_time = start_time.elapsed().as_millis() as f64;
257        stats.avg_processing_time_ms = (stats.avg_processing_time_ms + processing_time) / 2.0;
258
259        Ok(())
260    }
261
    /// Convert stream event to GraphQL update
    ///
    /// Maps the event to an update type plus a JSON payload, looks up the
    /// subscriptions whose filters match that payload, and wraps everything
    /// in a [`GraphQLUpdate`] carrying a fresh UUID v4 and the current time.
    ///
    /// Payload shapes:
    /// - triple/quad added or removed: `subject`, `predicate`, `object`,
    ///   `graph`, `timestamp`
    /// - query result added or removed: `query_id`, `result`,
    ///   `execution_time` (milliseconds), `timestamp`
    /// - any other event: a generic `message` and the current time
    async fn convert_stream_event_to_update(&self, event: &StreamEvent) -> Result<GraphQLUpdate> {
        let (update_type, data) = match event {
            StreamEvent::TripleAdded {
                subject,
                predicate,
                object,
                graph,
                metadata,
            } => (
                GraphQLUpdateType::DataAdded,
                serde_json::json!({
                    "subject": subject,
                    "predicate": predicate,
                    "object": object,
                    "graph": graph,
                    "timestamp": metadata.timestamp,
                }),
            ),
            StreamEvent::TripleRemoved {
                subject,
                predicate,
                object,
                graph,
                metadata,
            } => (
                GraphQLUpdateType::DataRemoved,
                serde_json::json!({
                    "subject": subject,
                    "predicate": predicate,
                    "object": object,
                    "graph": graph,
                    "timestamp": metadata.timestamp,
                }),
            ),
            StreamEvent::QuadAdded {
                subject,
                predicate,
                object,
                graph,
                metadata,
            } => (
                GraphQLUpdateType::DataAdded,
                serde_json::json!({
                    "subject": subject,
                    "predicate": predicate,
                    "object": object,
                    "graph": graph,
                    "timestamp": metadata.timestamp,
                }),
            ),
            StreamEvent::QuadRemoved {
                subject,
                predicate,
                object,
                graph,
                metadata,
            } => (
                GraphQLUpdateType::DataRemoved,
                serde_json::json!({
                    "subject": subject,
                    "predicate": predicate,
                    "object": object,
                    "graph": graph,
                    "timestamp": metadata.timestamp,
                }),
            ),
            // Both query-result events map to the same update type; the
            // payload does not say whether the result was added or removed.
            StreamEvent::QueryResultAdded {
                query_id,
                result,
                metadata,
            } => (
                GraphQLUpdateType::QueryResultChanged,
                serde_json::json!({
                    "query_id": query_id,
                    "result": result.bindings,
                    "execution_time": result.execution_time.as_millis(),
                    "timestamp": metadata.timestamp,
                }),
            ),
            StreamEvent::QueryResultRemoved {
                query_id,
                result,
                metadata,
            } => (
                GraphQLUpdateType::QueryResultChanged,
                serde_json::json!({
                    "query_id": query_id,
                    "result": result.bindings,
                    "execution_time": result.execution_time.as_millis(),
                    "timestamp": metadata.timestamp,
                }),
            ),
            // Catch-all: every event kind without a dedicated arm is reported
            // as a generic bulk update that carries none of the event's data.
            _ => (
                GraphQLUpdateType::BulkUpdate,
                serde_json::json!({
                    "message": "Bulk update occurred",
                    "timestamp": Utc::now(),
                }),
            ),
        };

        // Find relevant subscriptions
        let relevant_subscriptions = self.find_relevant_subscriptions(&data).await;

        Ok(GraphQLUpdate {
            // A new random ID is generated for every update.
            id: uuid::Uuid::new_v4().to_string(),
            timestamp: Utc::now(),
            update_type,
            data,
            subscriptions: relevant_subscriptions,
        })
    }
375
376    /// Find subscriptions relevant to the update data
377    async fn find_relevant_subscriptions(&self, data: &serde_json::Value) -> Vec<String> {
378        let subscriptions = self.subscriptions.read().await;
379
380        if !self.config.enable_query_filtering {
381            // Return all subscription IDs
382            return subscriptions.keys().cloned().collect();
383        }
384
385        let mut relevant = Vec::new();
386
387        for (id, subscription) in subscriptions.iter() {
388            if self.subscription_matches_data(subscription, data) {
389                relevant.push(id.clone());
390            }
391        }
392
393        relevant
394    }
395
396    /// Check if a subscription matches the update data
397    fn subscription_matches_data(
398        &self,
399        subscription: &GraphQLSubscription,
400        data: &serde_json::Value,
401    ) -> bool {
402        if subscription.filters.is_empty() {
403            // No filters means subscribe to everything
404            return true;
405        }
406
407        // Check if any filter matches
408        for filter in &subscription.filters {
409            match filter {
410                SubscriptionFilter::SubjectPattern(pattern) => {
411                    if let Some(subject) = data.get("subject").and_then(|v| v.as_str()) {
412                        if self.pattern_matches(pattern, subject) {
413                            return true;
414                        }
415                    }
416                }
417                SubscriptionFilter::PredicatePattern(pattern) => {
418                    if let Some(predicate) = data.get("predicate").and_then(|v| v.as_str()) {
419                        if self.pattern_matches(pattern, predicate) {
420                            return true;
421                        }
422                    }
423                }
424                SubscriptionFilter::ObjectPattern(pattern) => {
425                    if let Some(object) = data.get("object").and_then(|v| v.as_str()) {
426                        if self.pattern_matches(pattern, object) {
427                            return true;
428                        }
429                    }
430                }
431                SubscriptionFilter::GraphFilter(graph_uri) => {
432                    if let Some(graph) = data.get("graph").and_then(|v| v.as_str()) {
433                        if graph == graph_uri {
434                            return true;
435                        }
436                    }
437                }
438                SubscriptionFilter::CustomFilter(_expr) => {
439                    // Custom filter evaluation would go here
440                    // For now, be conservative and include it
441                    return true;
442                }
443            }
444        }
445
446        false
447    }
448
449    /// Simple pattern matching (supports wildcards)
450    fn pattern_matches(&self, pattern: &str, value: &str) -> bool {
451        if pattern == "*" {
452            return true;
453        }
454
455        if pattern.contains('*') {
456            // Simple wildcard matching
457            let regex_pattern = pattern.replace('*', ".*");
458            if let Ok(regex) = regex::Regex::new(&regex_pattern) {
459                return regex.is_match(value);
460            }
461        }
462
463        pattern == value
464    }
465
466    /// Check if update should be debounced
467    async fn should_debounce(&self, update: &GraphQLUpdate) -> bool {
468        let tracker = self.debounce_tracker.read().await;
469
470        if let Some(last_update) = tracker.get(&update.id) {
471            let elapsed = Instant::now().duration_since(*last_update);
472            elapsed < self.config.debounce_duration
473        } else {
474            false
475        }
476    }
477
478    /// Update debounce tracker
479    async fn update_debounce_tracker(&self, update: &GraphQLUpdate) {
480        let mut tracker = self.debounce_tracker.write().await;
481        tracker.insert(update.id.clone(), Instant::now());
482    }
483
484    /// Add update to batch buffer
485    async fn add_to_batch(&self, update: GraphQLUpdate) -> Result<()> {
486        let mut buffer = self.batch_buffer.write().await;
487        buffer.push(update);
488
489        if buffer.len() >= self.config.max_batch_size {
490            // Flush immediately if batch is full
491            let updates = std::mem::take(&mut *buffer);
492            drop(buffer);
493            self.send_batch(updates).await?;
494        }
495
496        Ok(())
497    }
498
499    /// Send individual update
500    async fn send_update(&self, update: GraphQLUpdate) -> Result<()> {
501        match self.event_sender.send(update.clone()) {
502            Ok(receiver_count) => {
503                debug!("Sent GraphQL update to {} receivers", receiver_count);
504                self.stats.write().await.updates_sent += 1;
505                Ok(())
506            }
507            Err(e) => {
508                warn!("No active GraphQL subscription receivers: {}", e);
509                Ok(())
510            }
511        }
512    }
513
514    /// Send batch of updates
515    async fn send_batch(&self, updates: Vec<GraphQLUpdate>) -> Result<()> {
516        for update in updates {
517            self.send_update(update).await?;
518        }
519
520        self.stats.write().await.updates_batched += 1;
521        Ok(())
522    }
523
524    /// Start batch processor task
525    fn start_batch_processor(&self) {
526        let batch_buffer = Arc::clone(&self.batch_buffer);
527        let event_sender = self.event_sender.clone();
528        let batch_interval = self.config.batch_interval;
529        let stats = Arc::clone(&self.stats);
530
531        tokio::spawn(async move {
532            let mut interval = interval(batch_interval);
533
534            loop {
535                interval.tick().await;
536
537                let updates = {
538                    let mut buffer = batch_buffer.write().await;
539                    if buffer.is_empty() {
540                        continue;
541                    }
542                    std::mem::take(&mut *buffer)
543                };
544
545                if !updates.is_empty() {
546                    debug!("Processing batch of {} updates", updates.len());
547
548                    for update in updates {
549                        if let Err(e) = event_sender.send(update) {
550                            warn!("Failed to send batched update: {}", e);
551                        } else {
552                            stats.write().await.updates_sent += 1;
553                        }
554                    }
555
556                    stats.write().await.updates_batched += 1;
557                }
558            }
559        });
560    }
561
562    /// Subscribe to GraphQL updates
563    pub fn subscribe(&self) -> broadcast::Receiver<GraphQLUpdate> {
564        self.event_sender.subscribe()
565    }
566
567    /// Get bridge statistics
568    pub async fn get_stats(&self) -> BridgeStats {
569        self.stats.read().await.clone()
570    }
571
572    /// List all registered subscriptions
573    pub async fn list_subscriptions(&self) -> Vec<String> {
574        self.subscriptions.read().await.keys().cloned().collect()
575    }
576
577    /// Get subscription details
578    pub async fn get_subscription(&self, id: &str) -> Option<GraphQLSubscription> {
579        self.subscriptions.read().await.get(id).cloned()
580    }
581}
582
583/// Helper function to create a simple GraphQL subscription
584pub fn create_simple_subscription(
585    query: String,
586    filters: Vec<SubscriptionFilter>,
587) -> GraphQLSubscription {
588    GraphQLSubscription {
589        id: uuid::Uuid::new_v4().to_string(),
590        query,
591        variables: HashMap::new(),
592        filters,
593        created_at: Utc::now(),
594        last_update: None,
595        update_count: 0,
596    }
597}
598
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration enables batching and query filtering.
    #[test]
    fn test_bridge_config_default() {
        let config = BridgeConfig::default();
        assert_eq!(config.max_queue_size, 10000);
        assert!(config.enable_batching);
        assert!(config.enable_query_filtering);
    }

    /// A freshly created bridge reports zeroed statistics.
    #[tokio::test]
    async fn test_graphql_bridge_creation() {
        let bridge = GraphQLBridge::new(BridgeConfig::default());
        let stats = bridge.get_stats().await;

        assert_eq!(stats.active_subscriptions, 0);
        assert_eq!(stats.events_processed, 0);
    }

    /// Registering a subscription returns its ID and bumps the active count.
    #[tokio::test]
    async fn test_subscription_registration() {
        let bridge = GraphQLBridge::new(BridgeConfig::default());

        let subscription = create_simple_subscription(
            "subscription { triples { subject predicate object } }".to_string(),
            vec![],
        );

        let id = bridge.register_subscription(subscription).await.unwrap();

        assert!(!id.is_empty());

        let stats = bridge.get_stats().await;
        assert_eq!(stats.active_subscriptions, 1);
    }

    /// Wildcard and exact patterns match (or reject) as expected.
    #[tokio::test]
    async fn test_pattern_matching() {
        let bridge = GraphQLBridge::new(BridgeConfig::default());

        assert!(bridge.pattern_matches("*", "anything"));
        assert!(bridge.pattern_matches("http://example.org/*", "http://example.org/resource"));
        assert!(!bridge.pattern_matches("http://example.org/*", "http://other.org/resource"));
        assert!(bridge.pattern_matches("exact_match", "exact_match"));
        assert!(!bridge.pattern_matches("exact_match", "different"));
    }

    /// Filter constructors produce the expected variants.
    #[test]
    fn test_subscription_filter_types() {
        // `matches!` only evaluates to a bool; without `assert!` the result
        // is discarded and the test could never fail.
        let filter = SubscriptionFilter::SubjectPattern("http://example.org/*".to_string());
        assert!(matches!(filter, SubscriptionFilter::SubjectPattern(_)));

        let filter2 = SubscriptionFilter::GraphFilter("http://example.org/graph1".to_string());
        assert!(matches!(filter2, SubscriptionFilter::GraphFilter(_)));
    }
}