Skip to main content

oxirs_stream/
graphql_subscriptions.rs

1//! # Enhanced GraphQL Subscription System
2//!
3//! Advanced GraphQL subscription features for real-time RDF stream updates:
4//! - Window-based subscriptions
5//! - Advanced filtering and pattern matching
6//! - Subscription lifecycle management
7//! - Subscription groups and namespaces
8//! - Connection pooling and resilience
9
10use anyhow::{anyhow, Result};
11use chrono::{DateTime, Duration as ChronoDuration, Utc};
12use serde::{Deserialize, Serialize};
13use std::collections::{HashMap, HashSet, VecDeque};
14use std::sync::Arc;
15use std::time::Duration;
16use tokio::sync::{broadcast, RwLock};
17use tokio::time::interval;
18use tracing::{debug, info};
19
20use crate::StreamEvent;
21
22/// Enhanced GraphQL subscription manager
23pub struct GraphQLSubscriptionManager {
24    /// Active subscriptions
25    subscriptions: Arc<RwLock<HashMap<String, EnhancedSubscription>>>,
26    /// Subscription groups
27    groups: Arc<RwLock<HashMap<String, SubscriptionGroup>>>,
28    /// Window buffers
29    windows: Arc<RwLock<HashMap<String, SubscriptionWindow>>>,
30    /// Event broadcaster
31    event_tx: broadcast::Sender<SubscriptionEvent>,
32    /// Configuration
33    config: SubscriptionConfig,
34    /// Statistics
35    stats: Arc<RwLock<SubscriptionStats>>,
36}
37
/// Tunable limits and feature switches for the subscription manager.
#[derive(Clone, Debug)]
pub struct SubscriptionConfig {
    /// Upper bound on the number of subscriptions registered at once.
    pub max_subscriptions: usize,
    /// Upper bound on the number of subscriptions a single client may hold.
    pub max_subscriptions_per_client: usize,
    /// Window size used by default.
    pub default_window_size: Duration,
    /// Whether window buffers are created and fed for subscriptions that
    /// carry a window specification.
    pub enable_windowing: bool,
    /// Whether advanced filters are evaluated; when `false` every event passes.
    pub enable_advanced_filtering: bool,
    /// Period between heartbeat events.
    pub heartbeat_interval: Duration,
    /// Inactivity period after which a subscription is considered timed out.
    pub subscription_timeout: Duration,
}
56
57impl Default for SubscriptionConfig {
58    fn default() -> Self {
59        Self {
60            max_subscriptions: 10000,
61            max_subscriptions_per_client: 100,
62            default_window_size: Duration::from_secs(60),
63            enable_windowing: true,
64            enable_advanced_filtering: true,
65            heartbeat_interval: Duration::from_secs(30),
66            subscription_timeout: Duration::from_secs(300),
67        }
68    }
69}
70
71/// Enhanced subscription with lifecycle management
72#[derive(Debug, Clone)]
73pub struct EnhancedSubscription {
74    /// Subscription identifier
75    pub id: String,
76    /// Client identifier
77    pub client_id: String,
78    /// GraphQL query
79    pub query: String,
80    /// Variables
81    pub variables: HashMap<String, serde_json::Value>,
82    /// Advanced filters
83    pub filters: Vec<AdvancedFilter>,
84    /// Window specification
85    pub window: Option<WindowSpec>,
86    /// Lifecycle state
87    pub state: SubscriptionState,
88    /// Metadata
89    pub metadata: SubscriptionMetadata,
90    /// Statistics
91    pub stats: SubscriptionStatistics,
92}
93
94/// Subscription lifecycle state
95#[derive(Debug, Clone, PartialEq)]
96pub enum SubscriptionState {
97    /// Subscription is active
98    Active,
99    /// Subscription is paused (buffering updates)
100    Paused,
101    /// Subscription is in reconnection mode
102    Reconnecting {
103        attempts: u32,
104        next_retry: DateTime<Utc>,
105    },
106    /// Subscription is throttled
107    Throttled { until: DateTime<Utc> },
108    /// Subscription is terminated
109    Terminated {
110        reason: String,
111        timestamp: DateTime<Utc>,
112    },
113}
114
115/// Advanced filtering options
116#[derive(Debug, Clone, Serialize, Deserialize)]
117pub enum AdvancedFilter {
118    /// Time-based filter
119    TimeRange {
120        start: Option<DateTime<Utc>>,
121        end: Option<DateTime<Utc>>,
122    },
123    /// Value-based filter
124    ValueFilter {
125        field: String,
126        operator: FilterOperator,
127        value: serde_json::Value,
128    },
129    /// Pattern matching filter
130    PatternMatch {
131        field: String,
132        pattern: String,
133        case_sensitive: bool,
134    },
135    /// Geospatial filter
136    GeoFilter {
137        latitude: f64,
138        longitude: f64,
139        radius_km: f64,
140    },
141    /// Semantic filter (RDF-specific)
142    SemanticFilter {
143        subject_pattern: Option<String>,
144        predicate_pattern: Option<String>,
145        object_pattern: Option<String>,
146    },
147    /// Aggregation filter
148    AggregationFilter {
149        function: AggregationFunction,
150        threshold: f64,
151    },
152    /// Composite filter
153    CompositeFilter {
154        operator: LogicalOperator,
155        filters: Vec<Box<AdvancedFilter>>,
156    },
157}
158
159/// Filter operators
160#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
161pub enum FilterOperator {
162    Equal,
163    NotEqual,
164    LessThan,
165    LessThanOrEqual,
166    GreaterThan,
167    GreaterThanOrEqual,
168    Contains,
169    StartsWith,
170    EndsWith,
171    In,
172    NotIn,
173}
174
175/// Aggregation functions for filtering
176#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
177pub enum AggregationFunction {
178    Count,
179    Sum,
180    Average,
181    Min,
182    Max,
183    StdDev,
184}
185
186/// Logical operators for composite filters
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
188pub enum LogicalOperator {
189    And,
190    Or,
191    Not,
192    Xor,
193}
194
195/// Window specification for subscriptions
196#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct WindowSpec {
198    /// Window type
199    pub window_type: WindowType,
200    /// Size (time or count)
201    pub size: WindowSize,
202    /// Slide interval (for sliding windows)
203    pub slide: Option<WindowSize>,
204    /// Trigger conditions
205    pub triggers: Vec<WindowTrigger>,
206}
207
208/// Window types
209#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
210pub enum WindowType {
211    Tumbling,
212    Sliding,
213    Session,
214    Global,
215}
216
217/// Window size specification
218#[derive(Debug, Clone, Serialize, Deserialize)]
219pub enum WindowSize {
220    Time(Duration),
221    Count(usize),
222    Bytes(usize),
223}
224
225/// Window trigger conditions
226#[derive(Debug, Clone, Serialize, Deserialize)]
227pub enum WindowTrigger {
228    /// Trigger on time interval
229    TimeInterval(Duration),
230    /// Trigger on event count
231    EventCount(usize),
232    /// Trigger on watermark
233    Watermark,
234    /// Trigger on specific event type
235    EventType(String),
236    /// Custom trigger condition
237    Custom(String),
238}
239
240/// Subscription metadata
241#[derive(Debug, Clone)]
242pub struct SubscriptionMetadata {
243    /// Creation timestamp
244    pub created_at: DateTime<Utc>,
245    /// Last activity timestamp
246    pub last_activity: DateTime<Utc>,
247    /// Tags for organization
248    pub tags: Vec<String>,
249    /// Priority level
250    pub priority: SubscriptionPriority,
251    /// Namespace
252    pub namespace: Option<String>,
253    /// Group membership
254    pub groups: Vec<String>,
255}
256
/// Priority levels, ordered from lowest (`Low`) to highest (`Critical`).
///
/// The derived ordering follows the declaration order, which matches the
/// explicit discriminants.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum SubscriptionPriority {
    Low = 1,
    Normal = 2,
    High = 3,
    Critical = 4,
}
265
/// Counters kept for an individual subscription.
#[derive(Clone, Debug, Default)]
pub struct SubscriptionStatistics {
    /// Number of events received.
    pub events_received: u64,
    /// Number of updates sent.
    pub updates_sent: u64,
    /// Number of bytes sent.
    pub bytes_sent: u64,
    /// Average latency in milliseconds.
    pub avg_latency_ms: f64,
    /// Maximum latency in milliseconds.
    pub max_latency_ms: f64,
    /// Number of errors.
    pub error_count: u64,
    /// Most recent error message, if any.
    pub last_error: Option<String>,
}
284
285/// Subscription group for managing related subscriptions
286#[derive(Debug, Clone)]
287pub struct SubscriptionGroup {
288    /// Group identifier
289    pub id: String,
290    /// Group name
291    pub name: String,
292    /// Member subscription IDs
293    pub members: HashSet<String>,
294    /// Group-level filters
295    pub filters: Vec<AdvancedFilter>,
296    /// Group configuration
297    pub config: GroupConfig,
298}
299
/// Settings that apply to a whole subscription group.
#[derive(Clone, Debug)]
pub struct GroupConfig {
    /// Whether members share windowing.
    pub shared_windowing: bool,
    /// Whether load balancing is enabled.
    pub load_balancing: bool,
    /// Maximum number of members.
    pub max_members: usize,
}
310
311/// Subscription window buffer
312pub struct SubscriptionWindow {
313    /// Window identifier
314    pub id: String,
315    /// Associated subscription ID
316    pub subscription_id: String,
317    /// Window specification
318    pub spec: WindowSpec,
319    /// Event buffer
320    pub buffer: VecDeque<WindowedEvent>,
321    /// Window state
322    pub state: WindowState,
323}
324
325/// Windowed event
326#[derive(Debug, Clone)]
327pub struct WindowedEvent {
328    pub event: StreamEvent,
329    pub timestamp: DateTime<Utc>,
330    pub sequence_id: u64,
331}
332
333/// Window state
334#[derive(Debug, Clone)]
335pub struct WindowState {
336    /// Window start time
337    pub start_time: DateTime<Utc>,
338    /// Window end time
339    pub end_time: Option<DateTime<Utc>>,
340    /// Event count
341    pub event_count: usize,
342    /// Total bytes
343    pub total_bytes: usize,
344    /// Is window closed
345    pub is_closed: bool,
346}
347
348/// Subscription event
349#[derive(Debug, Clone)]
350pub enum SubscriptionEvent {
351    /// New update available
352    Update {
353        subscription_id: String,
354        data: serde_json::Value,
355        timestamp: DateTime<Utc>,
356    },
357    /// Subscription state changed
358    StateChanged {
359        subscription_id: String,
360        old_state: SubscriptionState,
361        new_state: SubscriptionState,
362    },
363    /// Heartbeat
364    Heartbeat {
365        subscription_id: String,
366        timestamp: DateTime<Utc>,
367    },
368    /// Error occurred
369    Error {
370        subscription_id: String,
371        error: String,
372        timestamp: DateTime<Utc>,
373    },
374}
375
/// Manager-wide counters, as returned by `GraphQLSubscriptionManager::get_stats`.
#[derive(Clone, Debug, Default)]
pub struct SubscriptionStats {
    /// Number of registered subscriptions.
    pub total_subscriptions: usize,
    /// Number of subscriptions in the active state.
    pub active_subscriptions: usize,
    /// Number of subscriptions in the paused state.
    pub paused_subscriptions: usize,
    /// Number of subscriptions in the reconnecting state.
    pub reconnecting_subscriptions: usize,
    /// Number of stream events processed.
    pub total_events_processed: u64,
    /// Number of updates sent.
    pub total_updates_sent: u64,
    /// Average processing time in milliseconds.
    pub avg_processing_time_ms: f64,
}
387
388impl GraphQLSubscriptionManager {
389    /// Create a new subscription manager
390    pub fn new(config: SubscriptionConfig) -> Self {
391        let (event_tx, _) = broadcast::channel(10000);
392
393        let manager = Self {
394            subscriptions: Arc::new(RwLock::new(HashMap::new())),
395            groups: Arc::new(RwLock::new(HashMap::new())),
396            windows: Arc::new(RwLock::new(HashMap::new())),
397            event_tx,
398            config,
399            stats: Arc::new(RwLock::new(SubscriptionStats::default())),
400        };
401
402        // Start background tasks
403        manager.start_heartbeat_task();
404        manager.start_cleanup_task();
405
406        manager
407    }
408
409    /// Register a new subscription
410    pub async fn register_subscription(
411        &self,
412        subscription: EnhancedSubscription,
413    ) -> Result<String> {
414        let mut subscriptions = self.subscriptions.write().await;
415
416        // Check limits
417        if subscriptions.len() >= self.config.max_subscriptions {
418            return Err(anyhow!("Maximum subscriptions limit reached"));
419        }
420
421        // Check per-client limit
422        let client_count = subscriptions
423            .values()
424            .filter(|s| s.client_id == subscription.client_id)
425            .count();
426
427        if client_count >= self.config.max_subscriptions_per_client {
428            return Err(anyhow!("Client subscription limit reached"));
429        }
430
431        let id = subscription.id.clone();
432
433        // Create window if needed
434        if self.config.enable_windowing {
435            if let Some(window_spec) = &subscription.window {
436                self.create_window(&id, window_spec.clone()).await?;
437            }
438        }
439
440        subscriptions.insert(id.clone(), subscription);
441
442        // Update stats
443        let mut stats = self.stats.write().await;
444        stats.total_subscriptions = subscriptions.len();
445        stats.active_subscriptions = subscriptions
446            .values()
447            .filter(|s| s.state == SubscriptionState::Active)
448            .count();
449
450        info!("Registered GraphQL subscription: {}", id);
451        Ok(id)
452    }
453
454    /// Unregister a subscription
455    pub async fn unregister_subscription(&self, subscription_id: &str) -> Result<()> {
456        let mut subscriptions = self.subscriptions.write().await;
457        subscriptions
458            .remove(subscription_id)
459            .ok_or_else(|| anyhow!("Subscription not found"))?;
460
461        // Remove window
462        self.windows.write().await.remove(subscription_id);
463
464        // Update stats
465        let mut stats = self.stats.write().await;
466        stats.total_subscriptions = subscriptions.len();
467        stats.active_subscriptions = subscriptions
468            .values()
469            .filter(|s| s.state == SubscriptionState::Active)
470            .count();
471
472        info!("Unregistered GraphQL subscription: {}", subscription_id);
473        Ok(())
474    }
475
476    /// Pause a subscription
477    pub async fn pause_subscription(&self, subscription_id: &str) -> Result<()> {
478        let mut subscriptions = self.subscriptions.write().await;
479        let subscription = subscriptions
480            .get_mut(subscription_id)
481            .ok_or_else(|| anyhow!("Subscription not found"))?;
482
483        let old_state = subscription.state.clone();
484        subscription.state = SubscriptionState::Paused;
485
486        // Emit state change event
487        let _ = self.event_tx.send(SubscriptionEvent::StateChanged {
488            subscription_id: subscription_id.to_string(),
489            old_state,
490            new_state: SubscriptionState::Paused,
491        });
492
493        info!("Paused subscription: {}", subscription_id);
494        Ok(())
495    }
496
497    /// Resume a paused subscription
498    pub async fn resume_subscription(&self, subscription_id: &str) -> Result<()> {
499        let mut subscriptions = self.subscriptions.write().await;
500        let subscription = subscriptions
501            .get_mut(subscription_id)
502            .ok_or_else(|| anyhow!("Subscription not found"))?;
503
504        let old_state = subscription.state.clone();
505        subscription.state = SubscriptionState::Active;
506        subscription.metadata.last_activity = Utc::now();
507
508        // Emit state change event
509        let _ = self.event_tx.send(SubscriptionEvent::StateChanged {
510            subscription_id: subscription_id.to_string(),
511            old_state,
512            new_state: SubscriptionState::Active,
513        });
514
515        info!("Resumed subscription: {}", subscription_id);
516        Ok(())
517    }
518
519    /// Process stream event
520    pub async fn process_event(&self, event: &StreamEvent) -> Result<()> {
521        let subscriptions = self.subscriptions.read().await;
522
523        for (sub_id, subscription) in subscriptions.iter() {
524            // Only process for active subscriptions
525            if subscription.state != SubscriptionState::Active {
526                continue;
527            }
528
529            // Apply filters
530            if !self.apply_filters(event, &subscription.filters).await? {
531                continue;
532            }
533
534            // Handle windowing
535            if self.config.enable_windowing && subscription.window.is_some() {
536                self.add_to_window(sub_id, event).await?;
537            } else {
538                // Send immediate update
539                self.send_update(sub_id, event).await?;
540            }
541        }
542
543        let mut stats = self.stats.write().await;
544        stats.total_events_processed += 1;
545
546        Ok(())
547    }
548
549    /// Apply filters to event
550    async fn apply_filters(
551        &self,
552        _event: &StreamEvent,
553        filters: &[AdvancedFilter],
554    ) -> Result<bool> {
555        if !self.config.enable_advanced_filtering || filters.is_empty() {
556            return Ok(true);
557        }
558
559        // Simplified filter logic - in production, implement full filter evaluation
560        for filter in filters {
561            match filter {
562                AdvancedFilter::TimeRange { start, end } => {
563                    let now = Utc::now();
564                    if let Some(start) = start {
565                        if &now < start {
566                            return Ok(false);
567                        }
568                    }
569                    if let Some(end) = end {
570                        if &now > end {
571                            return Ok(false);
572                        }
573                    }
574                }
575                _ => {
576                    // Other filter types would be evaluated here
577                }
578            }
579        }
580
581        Ok(true)
582    }
583
584    /// Create window for subscription
585    async fn create_window(&self, subscription_id: &str, spec: WindowSpec) -> Result<()> {
586        let window = SubscriptionWindow {
587            id: uuid::Uuid::new_v4().to_string(),
588            subscription_id: subscription_id.to_string(),
589            spec,
590            buffer: VecDeque::new(),
591            state: WindowState {
592                start_time: Utc::now(),
593                end_time: None,
594                event_count: 0,
595                total_bytes: 0,
596                is_closed: false,
597            },
598        };
599
600        self.windows
601            .write()
602            .await
603            .insert(subscription_id.to_string(), window);
604
605        Ok(())
606    }
607
608    /// Add event to window
609    async fn add_to_window(&self, subscription_id: &str, event: &StreamEvent) -> Result<()> {
610        let mut windows = self.windows.write().await;
611        if let Some(window) = windows.get_mut(subscription_id) {
612            let windowed_event = WindowedEvent {
613                event: event.clone(),
614                timestamp: Utc::now(),
615                sequence_id: window.state.event_count as u64,
616            };
617
618            window.buffer.push_back(windowed_event);
619            window.state.event_count += 1;
620
621            // Check triggers
622            self.check_window_triggers(window).await?;
623        }
624
625        Ok(())
626    }
627
628    /// Check if window triggers should fire
629    async fn check_window_triggers(&self, window: &mut SubscriptionWindow) -> Result<()> {
630        for trigger in &window.spec.triggers {
631            match trigger {
632                WindowTrigger::EventCount(count) if window.state.event_count >= *count => {
633                    // Trigger window emission
634                    debug!("Window trigger fired: event count {}", count);
635                }
636                WindowTrigger::TimeInterval(duration) => {
637                    let elapsed = Utc::now() - window.state.start_time;
638                    if elapsed > ChronoDuration::from_std(*duration)? {
639                        debug!("Window trigger fired: time interval {:?}", duration);
640                    }
641                }
642                _ => {}
643            }
644        }
645
646        Ok(())
647    }
648
649    /// Send update to subscription
650    async fn send_update(&self, subscription_id: &str, event: &StreamEvent) -> Result<()> {
651        // Convert event to GraphQL data
652        let data = self.convert_event_to_graphql(event)?;
653
654        // Emit update event
655        let _ = self.event_tx.send(SubscriptionEvent::Update {
656            subscription_id: subscription_id.to_string(),
657            data,
658            timestamp: Utc::now(),
659        });
660
661        // Update subscription statistics
662        let mut subscriptions = self.subscriptions.write().await;
663        if let Some(subscription) = subscriptions.get_mut(subscription_id) {
664            subscription.stats.updates_sent += 1;
665            subscription.metadata.last_activity = Utc::now();
666        }
667
668        Ok(())
669    }
670
671    /// Convert stream event to GraphQL data
672    fn convert_event_to_graphql(&self, event: &StreamEvent) -> Result<serde_json::Value> {
673        // Simplified conversion - in production, implement full event mapping
674        Ok(serde_json::json!({
675            "type": format!("{:?}", event),
676            "timestamp": Utc::now().to_rfc3339(),
677        }))
678    }
679
680    /// Start heartbeat task
681    fn start_heartbeat_task(&self) {
682        let subscriptions = self.subscriptions.clone();
683        let event_tx = self.event_tx.clone();
684        let interval_duration = self.config.heartbeat_interval;
685
686        tokio::spawn(async move {
687            let mut interval_timer = interval(interval_duration);
688
689            loop {
690                interval_timer.tick().await;
691
692                let subs = subscriptions.read().await;
693                for (sub_id, subscription) in subs.iter() {
694                    if subscription.state == SubscriptionState::Active {
695                        let _ = event_tx.send(SubscriptionEvent::Heartbeat {
696                            subscription_id: sub_id.clone(),
697                            timestamp: Utc::now(),
698                        });
699                    }
700                }
701            }
702        });
703    }
704
705    /// Start cleanup task
706    fn start_cleanup_task(&self) {
707        let subscriptions = self.subscriptions.clone();
708        let timeout = self.config.subscription_timeout;
709
710        tokio::spawn(async move {
711            let mut interval_timer = interval(Duration::from_secs(60));
712
713            loop {
714                interval_timer.tick().await;
715
716                let mut subs = subscriptions.write().await;
717                let now = Utc::now();
718
719                // Remove timed-out subscriptions
720                subs.retain(|_, subscription| {
721                    let inactive_duration = now - subscription.metadata.last_activity;
722                    inactive_duration
723                        < ChronoDuration::from_std(timeout)
724                            .expect("timeout should be valid chrono Duration")
725                });
726            }
727        });
728    }
729
730    /// Get statistics
731    pub async fn get_stats(&self) -> SubscriptionStats {
732        self.stats.read().await.clone()
733    }
734
735    /// Subscribe to events
736    pub fn subscribe(&self) -> broadcast::Receiver<SubscriptionEvent> {
737        self.event_tx.subscribe()
738    }
739}
740
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration allows 10000 subscriptions and enables windowing.
    #[tokio::test]
    async fn test_subscription_config_defaults() {
        let SubscriptionConfig {
            max_subscriptions,
            enable_windowing,
            ..
        } = SubscriptionConfig::default();

        assert_eq!(max_subscriptions, 10000);
        assert!(enable_windowing);
    }

    /// Unit-like subscription states compare equal to themselves.
    #[tokio::test]
    async fn test_subscription_states() {
        let active = SubscriptionState::Active;
        assert_eq!(active, SubscriptionState::Active);

        let paused = SubscriptionState::Paused;
        assert_eq!(paused, SubscriptionState::Paused);
    }

    /// Filter operators support equality and inequality comparison.
    #[tokio::test]
    async fn test_filter_operators() {
        let equal = FilterOperator::Equal;

        assert_eq!(equal, FilterOperator::Equal);
        assert_ne!(equal, FilterOperator::NotEqual);
    }

    /// A window specification keeps the window type it was built with.
    #[tokio::test]
    async fn test_window_types() {
        let spec = WindowSpec {
            window_type: WindowType::Tumbling,
            size: WindowSize::Time(Duration::from_secs(60)),
            slide: None,
            triggers: vec![WindowTrigger::EventCount(100)],
        };

        assert_eq!(spec.window_type, WindowType::Tumbling);
    }

    /// Priorities order as Low < Normal < High < Critical.
    #[tokio::test]
    async fn test_subscription_priority() {
        assert!(SubscriptionPriority::Critical > SubscriptionPriority::High);
        assert!(SubscriptionPriority::High > SubscriptionPriority::Normal);
        assert!(SubscriptionPriority::Normal > SubscriptionPriority::Low);
    }
}