use anyhow::{anyhow, Result};
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, RwLock};
use tokio::time::interval;
use tracing::{debug, info, warn};

use crate::StreamEvent;
21
22pub struct GraphQLSubscriptionManager {
24 subscriptions: Arc<RwLock<HashMap<String, EnhancedSubscription>>>,
26 groups: Arc<RwLock<HashMap<String, SubscriptionGroup>>>,
28 windows: Arc<RwLock<HashMap<String, SubscriptionWindow>>>,
30 event_tx: broadcast::Sender<SubscriptionEvent>,
32 config: SubscriptionConfig,
34 stats: Arc<RwLock<SubscriptionStats>>,
36}
37
/// Limits and feature switches for the subscription manager.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SubscriptionConfig {
    /// Maximum number of subscriptions registered at once, across all clients.
    pub max_subscriptions: usize,
    /// Maximum number of subscriptions a single client may hold at once.
    pub max_subscriptions_per_client: usize,
    /// Window size to use by default.
    pub default_window_size: Duration,
    /// Whether events are routed through windows for subscriptions that
    /// declare a window specification.
    pub enable_windowing: bool,
    /// Whether subscription filters are evaluated; when `false`, every event
    /// passes.
    pub enable_advanced_filtering: bool,
    /// Period between heartbeat events for active subscriptions.
    pub heartbeat_interval: Duration,
    /// Inactivity period after which a subscription is dropped by cleanup.
    pub subscription_timeout: Duration,
}

impl Default for SubscriptionConfig {
    /// Returns the stock configuration: 10 000 subscriptions overall, 100 per
    /// client, 60 s windows, windowing and filtering enabled, a 30 s
    /// heartbeat and a 300 s inactivity timeout.
    fn default() -> Self {
        Self {
            max_subscriptions: 10000,
            max_subscriptions_per_client: 100,
            default_window_size: Duration::from_secs(60),
            enable_windowing: true,
            enable_advanced_filtering: true,
            heartbeat_interval: Duration::from_secs(30),
            subscription_timeout: Duration::from_secs(300),
        }
    }
}
70
71#[derive(Debug, Clone)]
73pub struct EnhancedSubscription {
74 pub id: String,
76 pub client_id: String,
78 pub query: String,
80 pub variables: HashMap<String, serde_json::Value>,
82 pub filters: Vec<AdvancedFilter>,
84 pub window: Option<WindowSpec>,
86 pub state: SubscriptionState,
88 pub metadata: SubscriptionMetadata,
90 pub stats: SubscriptionStatistics,
92}
93
94#[derive(Debug, Clone, PartialEq)]
96pub enum SubscriptionState {
97 Active,
99 Paused,
101 Reconnecting {
103 attempts: u32,
104 next_retry: DateTime<Utc>,
105 },
106 Throttled { until: DateTime<Utc> },
108 Terminated {
110 reason: String,
111 timestamp: DateTime<Utc>,
112 },
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub enum AdvancedFilter {
118 TimeRange {
120 start: Option<DateTime<Utc>>,
121 end: Option<DateTime<Utc>>,
122 },
123 ValueFilter {
125 field: String,
126 operator: FilterOperator,
127 value: serde_json::Value,
128 },
129 PatternMatch {
131 field: String,
132 pattern: String,
133 case_sensitive: bool,
134 },
135 GeoFilter {
137 latitude: f64,
138 longitude: f64,
139 radius_km: f64,
140 },
141 SemanticFilter {
143 subject_pattern: Option<String>,
144 predicate_pattern: Option<String>,
145 object_pattern: Option<String>,
146 },
147 AggregationFilter {
149 function: AggregationFunction,
150 threshold: f64,
151 },
152 CompositeFilter {
154 operator: LogicalOperator,
155 filters: Vec<Box<AdvancedFilter>>,
156 },
157}
158
159#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
161pub enum FilterOperator {
162 Equal,
163 NotEqual,
164 LessThan,
165 LessThanOrEqual,
166 GreaterThan,
167 GreaterThanOrEqual,
168 Contains,
169 StartsWith,
170 EndsWith,
171 In,
172 NotIn,
173}
174
175#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
177pub enum AggregationFunction {
178 Count,
179 Sum,
180 Average,
181 Min,
182 Max,
183 StdDev,
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
188pub enum LogicalOperator {
189 And,
190 Or,
191 Not,
192 Xor,
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct WindowSpec {
198 pub window_type: WindowType,
200 pub size: WindowSize,
202 pub slide: Option<WindowSize>,
204 pub triggers: Vec<WindowTrigger>,
206}
207
208#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
210pub enum WindowType {
211 Tumbling,
212 Sliding,
213 Session,
214 Global,
215}
216
217#[derive(Debug, Clone, Serialize, Deserialize)]
219pub enum WindowSize {
220 Time(Duration),
221 Count(usize),
222 Bytes(usize),
223}
224
225#[derive(Debug, Clone, Serialize, Deserialize)]
227pub enum WindowTrigger {
228 TimeInterval(Duration),
230 EventCount(usize),
232 Watermark,
234 EventType(String),
236 Custom(String),
238}
239
240#[derive(Debug, Clone)]
242pub struct SubscriptionMetadata {
243 pub created_at: DateTime<Utc>,
245 pub last_activity: DateTime<Utc>,
247 pub tags: Vec<String>,
249 pub priority: SubscriptionPriority,
251 pub namespace: Option<String>,
253 pub groups: Vec<String>,
255}
256
/// Priority of a subscription; variants order from `Low` to `Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum SubscriptionPriority {
    Low = 1,
    Normal = 2,
    High = 3,
    Critical = 4,
}
265
/// Per-subscription counters.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct SubscriptionStatistics {
    /// Number of events received for the subscription.
    pub events_received: u64,
    /// Number of updates delivered to the subscription.
    pub updates_sent: u64,
    /// Number of bytes delivered to the subscription.
    pub bytes_sent: u64,
    /// Average delivery latency in milliseconds.
    pub avg_latency_ms: f64,
    /// Maximum delivery latency in milliseconds.
    pub max_latency_ms: f64,
    /// Number of errors encountered.
    pub error_count: u64,
    /// Message of the most recent error, if any.
    pub last_error: Option<String>,
}
284
285#[derive(Debug, Clone)]
287pub struct SubscriptionGroup {
288 pub id: String,
290 pub name: String,
292 pub members: HashSet<String>,
294 pub filters: Vec<AdvancedFilter>,
296 pub config: GroupConfig,
298}
299
/// Settings of a [`SubscriptionGroup`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GroupConfig {
    /// Whether members share a window.
    pub shared_windowing: bool,
    /// Whether load balancing across members is enabled.
    pub load_balancing: bool,
    /// Maximum number of members.
    pub max_members: usize,
}
310
311pub struct SubscriptionWindow {
313 pub id: String,
315 pub subscription_id: String,
317 pub spec: WindowSpec,
319 pub buffer: VecDeque<WindowedEvent>,
321 pub state: WindowState,
323}
324
325#[derive(Debug, Clone)]
327pub struct WindowedEvent {
328 pub event: StreamEvent,
329 pub timestamp: DateTime<Utc>,
330 pub sequence_id: u64,
331}
332
333#[derive(Debug, Clone)]
335pub struct WindowState {
336 pub start_time: DateTime<Utc>,
338 pub end_time: Option<DateTime<Utc>>,
340 pub event_count: usize,
342 pub total_bytes: usize,
344 pub is_closed: bool,
346}
347
348#[derive(Debug, Clone)]
350pub enum SubscriptionEvent {
351 Update {
353 subscription_id: String,
354 data: serde_json::Value,
355 timestamp: DateTime<Utc>,
356 },
357 StateChanged {
359 subscription_id: String,
360 old_state: SubscriptionState,
361 new_state: SubscriptionState,
362 },
363 Heartbeat {
365 subscription_id: String,
366 timestamp: DateTime<Utc>,
367 },
368 Error {
370 subscription_id: String,
371 error: String,
372 timestamp: DateTime<Utc>,
373 },
374}
375
/// Aggregate counters across all subscriptions of a manager.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct SubscriptionStats {
    /// Number of registered subscriptions.
    pub total_subscriptions: usize,
    /// Number of subscriptions in the `Active` state.
    pub active_subscriptions: usize,
    /// Number of subscriptions in the `Paused` state.
    pub paused_subscriptions: usize,
    /// Number of subscriptions in the `Reconnecting` state.
    pub reconnecting_subscriptions: usize,
    /// Number of events handed to the manager for processing.
    pub total_events_processed: u64,
    /// Number of updates sent to subscriptions.
    pub total_updates_sent: u64,
    /// Average per-event processing time in milliseconds.
    pub avg_processing_time_ms: f64,
}
387
388impl GraphQLSubscriptionManager {
389 pub fn new(config: SubscriptionConfig) -> Self {
391 let (event_tx, _) = broadcast::channel(10000);
392
393 let manager = Self {
394 subscriptions: Arc::new(RwLock::new(HashMap::new())),
395 groups: Arc::new(RwLock::new(HashMap::new())),
396 windows: Arc::new(RwLock::new(HashMap::new())),
397 event_tx,
398 config,
399 stats: Arc::new(RwLock::new(SubscriptionStats::default())),
400 };
401
402 manager.start_heartbeat_task();
404 manager.start_cleanup_task();
405
406 manager
407 }
408
409 pub async fn register_subscription(
411 &self,
412 subscription: EnhancedSubscription,
413 ) -> Result<String> {
414 let mut subscriptions = self.subscriptions.write().await;
415
416 if subscriptions.len() >= self.config.max_subscriptions {
418 return Err(anyhow!("Maximum subscriptions limit reached"));
419 }
420
421 let client_count = subscriptions
423 .values()
424 .filter(|s| s.client_id == subscription.client_id)
425 .count();
426
427 if client_count >= self.config.max_subscriptions_per_client {
428 return Err(anyhow!("Client subscription limit reached"));
429 }
430
431 let id = subscription.id.clone();
432
433 if self.config.enable_windowing {
435 if let Some(window_spec) = &subscription.window {
436 self.create_window(&id, window_spec.clone()).await?;
437 }
438 }
439
440 subscriptions.insert(id.clone(), subscription);
441
442 let mut stats = self.stats.write().await;
444 stats.total_subscriptions = subscriptions.len();
445 stats.active_subscriptions = subscriptions
446 .values()
447 .filter(|s| s.state == SubscriptionState::Active)
448 .count();
449
450 info!("Registered GraphQL subscription: {}", id);
451 Ok(id)
452 }
453
454 pub async fn unregister_subscription(&self, subscription_id: &str) -> Result<()> {
456 let mut subscriptions = self.subscriptions.write().await;
457 subscriptions
458 .remove(subscription_id)
459 .ok_or_else(|| anyhow!("Subscription not found"))?;
460
461 self.windows.write().await.remove(subscription_id);
463
464 let mut stats = self.stats.write().await;
466 stats.total_subscriptions = subscriptions.len();
467 stats.active_subscriptions = subscriptions
468 .values()
469 .filter(|s| s.state == SubscriptionState::Active)
470 .count();
471
472 info!("Unregistered GraphQL subscription: {}", subscription_id);
473 Ok(())
474 }
475
476 pub async fn pause_subscription(&self, subscription_id: &str) -> Result<()> {
478 let mut subscriptions = self.subscriptions.write().await;
479 let subscription = subscriptions
480 .get_mut(subscription_id)
481 .ok_or_else(|| anyhow!("Subscription not found"))?;
482
483 let old_state = subscription.state.clone();
484 subscription.state = SubscriptionState::Paused;
485
486 let _ = self.event_tx.send(SubscriptionEvent::StateChanged {
488 subscription_id: subscription_id.to_string(),
489 old_state,
490 new_state: SubscriptionState::Paused,
491 });
492
493 info!("Paused subscription: {}", subscription_id);
494 Ok(())
495 }
496
497 pub async fn resume_subscription(&self, subscription_id: &str) -> Result<()> {
499 let mut subscriptions = self.subscriptions.write().await;
500 let subscription = subscriptions
501 .get_mut(subscription_id)
502 .ok_or_else(|| anyhow!("Subscription not found"))?;
503
504 let old_state = subscription.state.clone();
505 subscription.state = SubscriptionState::Active;
506 subscription.metadata.last_activity = Utc::now();
507
508 let _ = self.event_tx.send(SubscriptionEvent::StateChanged {
510 subscription_id: subscription_id.to_string(),
511 old_state,
512 new_state: SubscriptionState::Active,
513 });
514
515 info!("Resumed subscription: {}", subscription_id);
516 Ok(())
517 }
518
519 pub async fn process_event(&self, event: &StreamEvent) -> Result<()> {
521 let subscriptions = self.subscriptions.read().await;
522
523 for (sub_id, subscription) in subscriptions.iter() {
524 if subscription.state != SubscriptionState::Active {
526 continue;
527 }
528
529 if !self.apply_filters(event, &subscription.filters).await? {
531 continue;
532 }
533
534 if self.config.enable_windowing && subscription.window.is_some() {
536 self.add_to_window(sub_id, event).await?;
537 } else {
538 self.send_update(sub_id, event).await?;
540 }
541 }
542
543 let mut stats = self.stats.write().await;
544 stats.total_events_processed += 1;
545
546 Ok(())
547 }
548
549 async fn apply_filters(
551 &self,
552 _event: &StreamEvent,
553 filters: &[AdvancedFilter],
554 ) -> Result<bool> {
555 if !self.config.enable_advanced_filtering || filters.is_empty() {
556 return Ok(true);
557 }
558
559 for filter in filters {
561 match filter {
562 AdvancedFilter::TimeRange { start, end } => {
563 let now = Utc::now();
564 if let Some(start) = start {
565 if &now < start {
566 return Ok(false);
567 }
568 }
569 if let Some(end) = end {
570 if &now > end {
571 return Ok(false);
572 }
573 }
574 }
575 _ => {
576 }
578 }
579 }
580
581 Ok(true)
582 }
583
584 async fn create_window(&self, subscription_id: &str, spec: WindowSpec) -> Result<()> {
586 let window = SubscriptionWindow {
587 id: uuid::Uuid::new_v4().to_string(),
588 subscription_id: subscription_id.to_string(),
589 spec,
590 buffer: VecDeque::new(),
591 state: WindowState {
592 start_time: Utc::now(),
593 end_time: None,
594 event_count: 0,
595 total_bytes: 0,
596 is_closed: false,
597 },
598 };
599
600 self.windows
601 .write()
602 .await
603 .insert(subscription_id.to_string(), window);
604
605 Ok(())
606 }
607
608 async fn add_to_window(&self, subscription_id: &str, event: &StreamEvent) -> Result<()> {
610 let mut windows = self.windows.write().await;
611 if let Some(window) = windows.get_mut(subscription_id) {
612 let windowed_event = WindowedEvent {
613 event: event.clone(),
614 timestamp: Utc::now(),
615 sequence_id: window.state.event_count as u64,
616 };
617
618 window.buffer.push_back(windowed_event);
619 window.state.event_count += 1;
620
621 self.check_window_triggers(window).await?;
623 }
624
625 Ok(())
626 }
627
628 async fn check_window_triggers(&self, window: &mut SubscriptionWindow) -> Result<()> {
630 for trigger in &window.spec.triggers {
631 match trigger {
632 WindowTrigger::EventCount(count) if window.state.event_count >= *count => {
633 debug!("Window trigger fired: event count {}", count);
635 }
636 WindowTrigger::TimeInterval(duration) => {
637 let elapsed = Utc::now() - window.state.start_time;
638 if elapsed > ChronoDuration::from_std(*duration)? {
639 debug!("Window trigger fired: time interval {:?}", duration);
640 }
641 }
642 _ => {}
643 }
644 }
645
646 Ok(())
647 }
648
649 async fn send_update(&self, subscription_id: &str, event: &StreamEvent) -> Result<()> {
651 let data = self.convert_event_to_graphql(event)?;
653
654 let _ = self.event_tx.send(SubscriptionEvent::Update {
656 subscription_id: subscription_id.to_string(),
657 data,
658 timestamp: Utc::now(),
659 });
660
661 let mut subscriptions = self.subscriptions.write().await;
663 if let Some(subscription) = subscriptions.get_mut(subscription_id) {
664 subscription.stats.updates_sent += 1;
665 subscription.metadata.last_activity = Utc::now();
666 }
667
668 Ok(())
669 }
670
671 fn convert_event_to_graphql(&self, event: &StreamEvent) -> Result<serde_json::Value> {
673 Ok(serde_json::json!({
675 "type": format!("{:?}", event),
676 "timestamp": Utc::now().to_rfc3339(),
677 }))
678 }
679
680 fn start_heartbeat_task(&self) {
682 let subscriptions = self.subscriptions.clone();
683 let event_tx = self.event_tx.clone();
684 let interval_duration = self.config.heartbeat_interval;
685
686 tokio::spawn(async move {
687 let mut interval_timer = interval(interval_duration);
688
689 loop {
690 interval_timer.tick().await;
691
692 let subs = subscriptions.read().await;
693 for (sub_id, subscription) in subs.iter() {
694 if subscription.state == SubscriptionState::Active {
695 let _ = event_tx.send(SubscriptionEvent::Heartbeat {
696 subscription_id: sub_id.clone(),
697 timestamp: Utc::now(),
698 });
699 }
700 }
701 }
702 });
703 }
704
705 fn start_cleanup_task(&self) {
707 let subscriptions = self.subscriptions.clone();
708 let timeout = self.config.subscription_timeout;
709
710 tokio::spawn(async move {
711 let mut interval_timer = interval(Duration::from_secs(60));
712
713 loop {
714 interval_timer.tick().await;
715
716 let mut subs = subscriptions.write().await;
717 let now = Utc::now();
718
719 subs.retain(|_, subscription| {
721 let inactive_duration = now - subscription.metadata.last_activity;
722 inactive_duration
723 < ChronoDuration::from_std(timeout)
724 .expect("timeout should be valid chrono Duration")
725 });
726 }
727 });
728 }
729
730 pub async fn get_stats(&self) -> SubscriptionStats {
732 self.stats.read().await.clone()
733 }
734
735 pub fn subscribe(&self) -> broadcast::Receiver<SubscriptionEvent> {
737 self.event_tx.subscribe()
738 }
739}
740
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an active, unfiltered, unwindowed subscription for tests.
    fn sample_subscription(id: &str, client_id: &str) -> EnhancedSubscription {
        let now = Utc::now();
        EnhancedSubscription {
            id: id.to_string(),
            client_id: client_id.to_string(),
            query: "subscription { events }".to_string(),
            variables: HashMap::new(),
            filters: Vec::new(),
            window: None,
            state: SubscriptionState::Active,
            metadata: SubscriptionMetadata {
                created_at: now,
                last_activity: now,
                tags: Vec::new(),
                priority: SubscriptionPriority::Normal,
                namespace: None,
                groups: Vec::new(),
            },
            stats: SubscriptionStatistics::default(),
        }
    }

    // The checks below are purely synchronous, so they use plain `#[test]`
    // instead of spinning up a runtime.

    #[test]
    fn test_subscription_config_defaults() {
        let config = SubscriptionConfig::default();
        assert_eq!(config.max_subscriptions, 10000);
        assert!(config.enable_windowing);
    }

    #[test]
    fn test_subscription_states() {
        let state = SubscriptionState::Active;
        assert_eq!(state, SubscriptionState::Active);

        let state = SubscriptionState::Paused;
        assert_eq!(state, SubscriptionState::Paused);
    }

    #[test]
    fn test_filter_operators() {
        assert_eq!(FilterOperator::Equal, FilterOperator::Equal);
        assert_ne!(FilterOperator::Equal, FilterOperator::NotEqual);
    }

    #[test]
    fn test_window_types() {
        let window = WindowSpec {
            window_type: WindowType::Tumbling,
            size: WindowSize::Time(Duration::from_secs(60)),
            slide: None,
            triggers: vec![WindowTrigger::EventCount(100)],
        };

        assert_eq!(window.window_type, WindowType::Tumbling);
    }

    #[test]
    fn test_subscription_priority() {
        assert!(SubscriptionPriority::Critical > SubscriptionPriority::High);
        assert!(SubscriptionPriority::High > SubscriptionPriority::Normal);
        assert!(SubscriptionPriority::Normal > SubscriptionPriority::Low);
    }

    #[tokio::test]
    async fn test_register_and_unregister_track_stats() {
        let manager = GraphQLSubscriptionManager::new(SubscriptionConfig::default());

        let id = manager
            .register_subscription(sample_subscription("s1", "c1"))
            .await
            .expect("registration succeeds");
        assert_eq!(id, "s1");

        let stats = manager.get_stats().await;
        assert_eq!(stats.total_subscriptions, 1);
        assert_eq!(stats.active_subscriptions, 1);

        manager
            .unregister_subscription("s1")
            .await
            .expect("unregistration succeeds");

        let stats = manager.get_stats().await;
        assert_eq!(stats.total_subscriptions, 0);
        assert_eq!(stats.active_subscriptions, 0);
    }

    #[tokio::test]
    async fn test_unregister_unknown_subscription_fails() {
        let manager = GraphQLSubscriptionManager::new(SubscriptionConfig::default());
        assert!(manager.unregister_subscription("missing").await.is_err());
    }

    #[tokio::test]
    async fn test_per_client_limit_is_enforced() {
        let config = SubscriptionConfig {
            max_subscriptions_per_client: 1,
            ..SubscriptionConfig::default()
        };
        let manager = GraphQLSubscriptionManager::new(config);

        assert!(manager
            .register_subscription(sample_subscription("s1", "c1"))
            .await
            .is_ok());
        assert!(manager
            .register_subscription(sample_subscription("s2", "c1"))
            .await
            .is_err());
        // A different client is unaffected by the first client's limit.
        assert!(manager
            .register_subscription(sample_subscription("s3", "c2"))
            .await
            .is_ok());
    }

    #[tokio::test]
    async fn test_pause_and_resume_emit_state_changes() {
        let manager = GraphQLSubscriptionManager::new(SubscriptionConfig::default());
        manager
            .register_subscription(sample_subscription("s1", "c1"))
            .await
            .expect("registration succeeds");

        let mut rx = manager.subscribe();

        manager.pause_subscription("s1").await.expect("pause succeeds");
        manager
            .resume_subscription("s1")
            .await
            .expect("resume succeeds");

        // Heartbeats may be interleaved; keep only the state changes.
        let mut changes = Vec::new();
        while changes.len() < 2 {
            match rx.recv().await.expect("event channel stays open") {
                SubscriptionEvent::StateChanged {
                    old_state,
                    new_state,
                    ..
                } => changes.push((old_state, new_state)),
                _ => continue,
            }
        }

        assert_eq!(
            changes[0],
            (SubscriptionState::Active, SubscriptionState::Paused)
        );
        assert_eq!(
            changes[1],
            (SubscriptionState::Paused, SubscriptionState::Active)
        );

        assert!(manager.pause_subscription("missing").await.is_err());
    }
}