vibesql_server/subscription/manager/mod.rs
1//! Subscription manager for tracking and notifying query subscriptions
2//!
3//! The SubscriptionManager is the central component of the subscription system.
4//! It maintains the registry of active subscriptions, indexes them by table
5//! dependencies, and handles change event notifications.
6//!
7//! # Module Organization
8//!
9//! This module is split into focused submodules:
10//!
11//! - [`lifecycle`]: Subscribe/unsubscribe operations
12//! - [`wire`]: Wire protocol integration
13//! - [`query`]: Query execution and retry logic
14//! - [`events`]: Change event handling and notification
15//! - [`metrics`]: Metrics and configuration accessors
16
17mod events;
18mod lifecycle;
19mod metrics;
20mod query;
21mod wire;
22
23#[cfg(test)]
24mod tests;
25
26use std::collections::HashSet;
27use std::sync::atomic::AtomicUsize;
28
29use dashmap::DashMap;
30
31use super::{SubscriptionConfig, SubscriptionId};
32use crate::subscription::Subscription;
33
34// ============================================================================
35// Subscription Manager
36// ============================================================================
37
38/// Manager for query subscriptions
39///
40/// Tracks all active subscriptions, indexes them by table dependencies,
41/// and handles notifications when data changes.
42///
43/// # Thread Safety
44///
45/// The manager uses `DashMap` for lock-free concurrent access to subscriptions.
46/// Multiple threads can subscribe, unsubscribe, and process change events
47/// concurrently without explicit locking.
48///
49/// # Performance
50///
51/// The manager uses a table-based index to quickly find subscriptions affected
52/// by a change event. This allows O(1) lookup of subscriptions by table name,
53/// rather than scanning all subscriptions.
54///
55/// # Connection Tracking
56///
57/// The manager supports tracking subscriptions by connection/session ID.
58/// This enables efficient cleanup when a connection closes, and allows
59/// both HTTP SSE and wire protocol clients to use the same subscription
60/// infrastructure.
61pub struct SubscriptionManager {
62 /// All active subscriptions, indexed by ID
63 pub(crate) subscriptions: DashMap<SubscriptionId, Subscription>,
64
65 /// Index: table_name -> subscription IDs that depend on it
66 /// This enables fast lookup of affected subscriptions when a table changes
67 pub(crate) table_index: DashMap<String, HashSet<SubscriptionId>>,
68
69 /// Index: connection_id -> subscription IDs belonging to that connection
70 /// Used for connection-level subscription tracking and cleanup
71 pub(crate) connection_index: DashMap<String, HashSet<SubscriptionId>>,
72
73 /// Index: wire_subscription_id -> internal SubscriptionId
74 /// Used to bridge wire protocol UUIDs to internal u64 IDs
75 pub(crate) wire_id_index: DashMap<[u8; 16], SubscriptionId>,
76
77 /// Configuration for limits and quotas
78 pub(crate) config: SubscriptionConfig,
79
80 /// Counter for global limit exceeded events (for metrics)
81 pub(crate) limit_exceeded_count: AtomicUsize,
82
83 /// Counter for result set too large events (for metrics)
84 pub(crate) result_set_exceeded_count: AtomicUsize,
85
86 /// Atomic counter for current subscription count (for lock-free limit checking)
87 pub(crate) subscription_count_atomic: AtomicUsize,
88
89 /// Per-connection subscription counts (for per-connection limit enforcement)
90 pub(crate) connection_subscription_counts: DashMap<String, AtomicUsize>,
91}
92
93impl SubscriptionManager {
94 /// Create a new subscription manager with default configuration
95 pub fn new() -> Self {
96 Self::with_config(SubscriptionConfig::default())
97 }
98
99 /// Create a new subscription manager with custom configuration
100 pub fn with_config(config: SubscriptionConfig) -> Self {
101 Self {
102 subscriptions: DashMap::new(),
103 table_index: DashMap::new(),
104 connection_index: DashMap::new(),
105 wire_id_index: DashMap::new(),
106 config,
107 limit_exceeded_count: AtomicUsize::new(0),
108 result_set_exceeded_count: AtomicUsize::new(0),
109 subscription_count_atomic: AtomicUsize::new(0),
110 connection_subscription_counts: DashMap::new(),
111 }
112 }
113}
114
115impl Default for SubscriptionManager {
116 fn default() -> Self {
117 Self::new()
118 }
119}