1use anyhow::{anyhow, Result};
34use chrono::{DateTime, Utc};
35use serde::{Deserialize, Serialize};
36use std::collections::HashMap;
37use std::sync::Arc;
38use std::time::{Duration, Instant};
39use tokio::sync::{broadcast, RwLock};
40use tokio::time::interval;
41use tracing::{debug, info, warn};
42
43use crate::StreamEvent;
44
45#[derive(Debug, Clone)]
47pub struct BridgeConfig {
48 pub max_queue_size: usize,
50 pub debounce_duration: Duration,
52 pub enable_batching: bool,
54 pub max_batch_size: usize,
56 pub batch_interval: Duration,
58 pub enable_query_filtering: bool,
60 pub max_subscriptions: usize,
62}
63
64impl Default for BridgeConfig {
65 fn default() -> Self {
66 Self {
67 max_queue_size: 10000,
68 debounce_duration: Duration::from_millis(100),
69 enable_batching: true,
70 max_batch_size: 100,
71 batch_interval: Duration::from_millis(500),
72 enable_query_filtering: true,
73 max_subscriptions: 1000,
74 }
75 }
76}
77
78pub struct GraphQLBridge {
80 config: BridgeConfig,
82 subscriptions: Arc<RwLock<HashMap<String, GraphQLSubscription>>>,
84 event_sender: broadcast::Sender<GraphQLUpdate>,
86 stats: Arc<RwLock<BridgeStats>>,
88 debounce_tracker: Arc<RwLock<HashMap<String, Instant>>>,
90 batch_buffer: Arc<RwLock<Vec<GraphQLUpdate>>>,
92}
93
94#[derive(Debug, Clone)]
96pub struct GraphQLSubscription {
97 pub id: String,
99 pub query: String,
101 pub variables: HashMap<String, serde_json::Value>,
103 pub filters: Vec<SubscriptionFilter>,
105 pub created_at: DateTime<Utc>,
107 pub last_update: Option<DateTime<Utc>>,
109 pub update_count: u64,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub enum SubscriptionFilter {
116 SubjectPattern(String),
118 PredicatePattern(String),
120 ObjectPattern(String),
122 GraphFilter(String),
124 CustomFilter(String),
126}
127
128#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct GraphQLUpdate {
131 pub id: String,
133 pub timestamp: DateTime<Utc>,
135 pub update_type: GraphQLUpdateType,
137 pub data: serde_json::Value,
139 pub subscriptions: Vec<String>,
141}
142
143#[derive(Debug, Clone, Serialize, Deserialize)]
145pub enum GraphQLUpdateType {
146 DataAdded,
148 DataRemoved,
150 DataModified,
152 BulkUpdate,
154 QueryResultChanged,
156}
157
158#[derive(Debug, Clone, Default)]
160pub struct BridgeStats {
161 pub events_processed: u64,
163 pub updates_sent: u64,
165 pub updates_batched: u64,
167 pub updates_debounced: u64,
169 pub active_subscriptions: usize,
171 pub avg_processing_time_ms: f64,
173}
174
175impl GraphQLBridge {
176 pub fn new(config: BridgeConfig) -> Self {
178 let (event_sender, _) = broadcast::channel(config.max_queue_size);
179
180 let bridge = Self {
181 config,
182 subscriptions: Arc::new(RwLock::new(HashMap::new())),
183 event_sender,
184 stats: Arc::new(RwLock::new(BridgeStats::default())),
185 debounce_tracker: Arc::new(RwLock::new(HashMap::new())),
186 batch_buffer: Arc::new(RwLock::new(Vec::new())),
187 };
188
189 if bridge.config.enable_batching {
191 bridge.start_batch_processor();
192 }
193
194 bridge
195 }
196
197 pub async fn register_subscription(&self, subscription: GraphQLSubscription) -> Result<String> {
199 let mut subscriptions = self.subscriptions.write().await;
200
201 if subscriptions.len() >= self.config.max_subscriptions {
202 return Err(anyhow!("Maximum subscriptions limit reached"));
203 }
204
205 let id = subscription.id.clone();
206 subscriptions.insert(id.clone(), subscription);
207
208 self.stats.write().await.active_subscriptions = subscriptions.len();
210
211 info!("Registered GraphQL subscription: {}", id);
212 Ok(id)
213 }
214
215 pub async fn unregister_subscription(&self, subscription_id: &str) -> Result<()> {
217 let mut subscriptions = self.subscriptions.write().await;
218 subscriptions
219 .remove(subscription_id)
220 .ok_or_else(|| anyhow!("Subscription not found"))?;
221
222 self.stats.write().await.active_subscriptions = subscriptions.len();
224
225 info!("Unregistered GraphQL subscription: {}", subscription_id);
226 Ok(())
227 }
228
229 pub async fn process_stream_event(&self, event: &StreamEvent) -> Result<()> {
231 let start_time = Instant::now();
232
233 let update = self.convert_stream_event_to_update(event).await?;
235
236 if self.should_debounce(&update).await {
238 self.stats.write().await.updates_debounced += 1;
239 return Ok(());
240 }
241
242 self.update_debounce_tracker(&update).await;
244
245 if self.config.enable_batching {
247 self.add_to_batch(update).await?;
248 } else {
249 self.send_update(update).await?;
250 }
251
252 let mut stats = self.stats.write().await;
254 stats.events_processed += 1;
255
256 let processing_time = start_time.elapsed().as_millis() as f64;
257 stats.avg_processing_time_ms = (stats.avg_processing_time_ms + processing_time) / 2.0;
258
259 Ok(())
260 }
261
262 async fn convert_stream_event_to_update(&self, event: &StreamEvent) -> Result<GraphQLUpdate> {
264 let (update_type, data) = match event {
265 StreamEvent::TripleAdded {
266 subject,
267 predicate,
268 object,
269 graph,
270 metadata,
271 } => (
272 GraphQLUpdateType::DataAdded,
273 serde_json::json!({
274 "subject": subject,
275 "predicate": predicate,
276 "object": object,
277 "graph": graph,
278 "timestamp": metadata.timestamp,
279 }),
280 ),
281 StreamEvent::TripleRemoved {
282 subject,
283 predicate,
284 object,
285 graph,
286 metadata,
287 } => (
288 GraphQLUpdateType::DataRemoved,
289 serde_json::json!({
290 "subject": subject,
291 "predicate": predicate,
292 "object": object,
293 "graph": graph,
294 "timestamp": metadata.timestamp,
295 }),
296 ),
297 StreamEvent::QuadAdded {
298 subject,
299 predicate,
300 object,
301 graph,
302 metadata,
303 } => (
304 GraphQLUpdateType::DataAdded,
305 serde_json::json!({
306 "subject": subject,
307 "predicate": predicate,
308 "object": object,
309 "graph": graph,
310 "timestamp": metadata.timestamp,
311 }),
312 ),
313 StreamEvent::QuadRemoved {
314 subject,
315 predicate,
316 object,
317 graph,
318 metadata,
319 } => (
320 GraphQLUpdateType::DataRemoved,
321 serde_json::json!({
322 "subject": subject,
323 "predicate": predicate,
324 "object": object,
325 "graph": graph,
326 "timestamp": metadata.timestamp,
327 }),
328 ),
329 StreamEvent::QueryResultAdded {
330 query_id,
331 result,
332 metadata,
333 } => (
334 GraphQLUpdateType::QueryResultChanged,
335 serde_json::json!({
336 "query_id": query_id,
337 "result": result.bindings,
338 "execution_time": result.execution_time.as_millis(),
339 "timestamp": metadata.timestamp,
340 }),
341 ),
342 StreamEvent::QueryResultRemoved {
343 query_id,
344 result,
345 metadata,
346 } => (
347 GraphQLUpdateType::QueryResultChanged,
348 serde_json::json!({
349 "query_id": query_id,
350 "result": result.bindings,
351 "execution_time": result.execution_time.as_millis(),
352 "timestamp": metadata.timestamp,
353 }),
354 ),
355 _ => (
356 GraphQLUpdateType::BulkUpdate,
357 serde_json::json!({
358 "message": "Bulk update occurred",
359 "timestamp": Utc::now(),
360 }),
361 ),
362 };
363
364 let relevant_subscriptions = self.find_relevant_subscriptions(&data).await;
366
367 Ok(GraphQLUpdate {
368 id: uuid::Uuid::new_v4().to_string(),
369 timestamp: Utc::now(),
370 update_type,
371 data,
372 subscriptions: relevant_subscriptions,
373 })
374 }
375
376 async fn find_relevant_subscriptions(&self, data: &serde_json::Value) -> Vec<String> {
378 let subscriptions = self.subscriptions.read().await;
379
380 if !self.config.enable_query_filtering {
381 return subscriptions.keys().cloned().collect();
383 }
384
385 let mut relevant = Vec::new();
386
387 for (id, subscription) in subscriptions.iter() {
388 if self.subscription_matches_data(subscription, data) {
389 relevant.push(id.clone());
390 }
391 }
392
393 relevant
394 }
395
396 fn subscription_matches_data(
398 &self,
399 subscription: &GraphQLSubscription,
400 data: &serde_json::Value,
401 ) -> bool {
402 if subscription.filters.is_empty() {
403 return true;
405 }
406
407 for filter in &subscription.filters {
409 match filter {
410 SubscriptionFilter::SubjectPattern(pattern) => {
411 if let Some(subject) = data.get("subject").and_then(|v| v.as_str()) {
412 if self.pattern_matches(pattern, subject) {
413 return true;
414 }
415 }
416 }
417 SubscriptionFilter::PredicatePattern(pattern) => {
418 if let Some(predicate) = data.get("predicate").and_then(|v| v.as_str()) {
419 if self.pattern_matches(pattern, predicate) {
420 return true;
421 }
422 }
423 }
424 SubscriptionFilter::ObjectPattern(pattern) => {
425 if let Some(object) = data.get("object").and_then(|v| v.as_str()) {
426 if self.pattern_matches(pattern, object) {
427 return true;
428 }
429 }
430 }
431 SubscriptionFilter::GraphFilter(graph_uri) => {
432 if let Some(graph) = data.get("graph").and_then(|v| v.as_str()) {
433 if graph == graph_uri {
434 return true;
435 }
436 }
437 }
438 SubscriptionFilter::CustomFilter(_expr) => {
439 return true;
442 }
443 }
444 }
445
446 false
447 }
448
449 fn pattern_matches(&self, pattern: &str, value: &str) -> bool {
451 if pattern == "*" {
452 return true;
453 }
454
455 if pattern.contains('*') {
456 let regex_pattern = pattern.replace('*', ".*");
458 if let Ok(regex) = regex::Regex::new(®ex_pattern) {
459 return regex.is_match(value);
460 }
461 }
462
463 pattern == value
464 }
465
466 async fn should_debounce(&self, update: &GraphQLUpdate) -> bool {
468 let tracker = self.debounce_tracker.read().await;
469
470 if let Some(last_update) = tracker.get(&update.id) {
471 let elapsed = Instant::now().duration_since(*last_update);
472 elapsed < self.config.debounce_duration
473 } else {
474 false
475 }
476 }
477
478 async fn update_debounce_tracker(&self, update: &GraphQLUpdate) {
480 let mut tracker = self.debounce_tracker.write().await;
481 tracker.insert(update.id.clone(), Instant::now());
482 }
483
484 async fn add_to_batch(&self, update: GraphQLUpdate) -> Result<()> {
486 let mut buffer = self.batch_buffer.write().await;
487 buffer.push(update);
488
489 if buffer.len() >= self.config.max_batch_size {
490 let updates = std::mem::take(&mut *buffer);
492 drop(buffer);
493 self.send_batch(updates).await?;
494 }
495
496 Ok(())
497 }
498
499 async fn send_update(&self, update: GraphQLUpdate) -> Result<()> {
501 match self.event_sender.send(update.clone()) {
502 Ok(receiver_count) => {
503 debug!("Sent GraphQL update to {} receivers", receiver_count);
504 self.stats.write().await.updates_sent += 1;
505 Ok(())
506 }
507 Err(e) => {
508 warn!("No active GraphQL subscription receivers: {}", e);
509 Ok(())
510 }
511 }
512 }
513
514 async fn send_batch(&self, updates: Vec<GraphQLUpdate>) -> Result<()> {
516 for update in updates {
517 self.send_update(update).await?;
518 }
519
520 self.stats.write().await.updates_batched += 1;
521 Ok(())
522 }
523
524 fn start_batch_processor(&self) {
526 let batch_buffer = Arc::clone(&self.batch_buffer);
527 let event_sender = self.event_sender.clone();
528 let batch_interval = self.config.batch_interval;
529 let stats = Arc::clone(&self.stats);
530
531 tokio::spawn(async move {
532 let mut interval = interval(batch_interval);
533
534 loop {
535 interval.tick().await;
536
537 let updates = {
538 let mut buffer = batch_buffer.write().await;
539 if buffer.is_empty() {
540 continue;
541 }
542 std::mem::take(&mut *buffer)
543 };
544
545 if !updates.is_empty() {
546 debug!("Processing batch of {} updates", updates.len());
547
548 for update in updates {
549 if let Err(e) = event_sender.send(update) {
550 warn!("Failed to send batched update: {}", e);
551 } else {
552 stats.write().await.updates_sent += 1;
553 }
554 }
555
556 stats.write().await.updates_batched += 1;
557 }
558 }
559 });
560 }
561
562 pub fn subscribe(&self) -> broadcast::Receiver<GraphQLUpdate> {
564 self.event_sender.subscribe()
565 }
566
567 pub async fn get_stats(&self) -> BridgeStats {
569 self.stats.read().await.clone()
570 }
571
572 pub async fn list_subscriptions(&self) -> Vec<String> {
574 self.subscriptions.read().await.keys().cloned().collect()
575 }
576
577 pub async fn get_subscription(&self, id: &str) -> Option<GraphQLSubscription> {
579 self.subscriptions.read().await.get(id).cloned()
580 }
581}
582
583pub fn create_simple_subscription(
585 query: String,
586 filters: Vec<SubscriptionFilter>,
587) -> GraphQLSubscription {
588 GraphQLSubscription {
589 id: uuid::Uuid::new_v4().to_string(),
590 query,
591 variables: HashMap::new(),
592 filters,
593 created_at: Utc::now(),
594 last_update: None,
595 update_count: 0,
596 }
597}
598
599#[cfg(test)]
600mod tests {
601 use super::*;
602
603 #[test]
604 fn test_bridge_config_default() {
605 let config = BridgeConfig::default();
606 assert_eq!(config.max_queue_size, 10000);
607 assert!(config.enable_batching);
608 assert!(config.enable_query_filtering);
609 }
610
611 #[tokio::test]
612 async fn test_graphql_bridge_creation() {
613 let bridge = GraphQLBridge::new(BridgeConfig::default());
614 let stats = bridge.get_stats().await;
615
616 assert_eq!(stats.active_subscriptions, 0);
617 assert_eq!(stats.events_processed, 0);
618 }
619
620 #[tokio::test]
621 async fn test_subscription_registration() {
622 let bridge = GraphQLBridge::new(BridgeConfig::default());
623
624 let subscription = create_simple_subscription(
625 "subscription { triples { subject predicate object } }".to_string(),
626 vec![],
627 );
628
629 let id = bridge.register_subscription(subscription).await.unwrap();
630
631 assert!(!id.is_empty());
632
633 let stats = bridge.get_stats().await;
634 assert_eq!(stats.active_subscriptions, 1);
635 }
636
637 #[tokio::test]
638 async fn test_pattern_matching() {
639 let bridge = GraphQLBridge::new(BridgeConfig::default());
640
641 assert!(bridge.pattern_matches("*", "anything"));
642 assert!(bridge.pattern_matches("http://example.org/*", "http://example.org/resource"));
643 assert!(!bridge.pattern_matches("http://example.org/*", "http://other.org/resource"));
644 assert!(bridge.pattern_matches("exact_match", "exact_match"));
645 assert!(!bridge.pattern_matches("exact_match", "different"));
646 }
647
648 #[test]
649 fn test_subscription_filter_types() {
650 let filter = SubscriptionFilter::SubjectPattern("http://example.org/*".to_string());
651 matches!(filter, SubscriptionFilter::SubjectPattern(_));
652
653 let filter2 = SubscriptionFilter::GraphFilter("http://example.org/graph1".to_string());
654 matches!(filter2, SubscriptionFilter::GraphFilter(_));
655 }
656}