vibesql_server/subscription/types.rs
1//! Core subscription types
2//!
3//! This module defines the fundamental types for the subscription system:
4//! - `SubscriptionId`: Unique identifier for subscriptions
5//! - `Subscription`: Individual subscription with query and notification channel
6//! - `SubscriptionMetrics`: Metrics for monitoring subscription health
7//! - `SubscriptionUpdate`: Update notifications sent to subscribers
8//! - `SubscriptionError`: Errors that can occur during subscription operations
9
10use std::collections::HashSet;
11
12use tokio::sync::mpsc;
13
14use super::config::{SubscriptionConfig, SubscriptionRetryPolicy};
15use super::delta::PartialRowDelta;
16use super::selective::SelectiveColumnConfig;
17
18// ============================================================================
19// Subscription ID
20// ============================================================================
21
/// Opaque identifier for a single subscription.
///
/// An ID is assigned when a subscription is created and then serves as the
/// handle for that subscription for the rest of its lifetime, including
/// when it is unsubscribed. The wrapped integer is private; the type is
/// cheap to copy and can be compared and hashed.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct SubscriptionId(u64);
28
29impl SubscriptionId {
30 /// Create a new unique subscription ID
31 ///
32 /// Uses an atomic counter to ensure uniqueness across all threads.
33 pub fn new() -> Self {
34 use std::sync::atomic::{AtomicU64, Ordering};
35 static COUNTER: AtomicU64 = AtomicU64::new(1);
36 Self(COUNTER.fetch_add(1, Ordering::Relaxed))
37 }
38
39 /// Get the raw ID value (for debugging/logging)
40 pub fn as_u64(&self) -> u64 {
41 self.0
42 }
43}
44
45impl Default for SubscriptionId {
46 fn default() -> Self {
47 Self::new()
48 }
49}
50
51impl std::fmt::Display for SubscriptionId {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 write!(f, "sub-{}", self.0)
54 }
55}
56
57// ============================================================================
58// Subscription Metrics
59// ============================================================================
60
61/// Metrics for a single subscription
62///
63/// Provides observability into subscription health and backpressure.
64#[derive(Debug, Clone, Default)]
65pub struct SubscriptionMetrics {
66 /// Subscription ID
67 pub subscription_id: Option<SubscriptionId>,
68 /// Total updates successfully sent to this subscription
69 pub updates_sent: u64,
70 /// Total updates dropped due to channel being full
71 pub updates_dropped: u64,
72 /// Configured channel buffer size
73 pub channel_buffer_size: usize,
74 /// Current channel capacity (available slots)
75 pub channel_capacity: usize,
76 /// Slow consumer threshold percentage
77 pub slow_consumer_threshold_percent: u8,
78}
79
80// ============================================================================
81// Subscription
82// ============================================================================
83
84/// A single query subscription
85///
86/// Tracks the query, its table dependencies, and the channel for sending updates.
87#[derive(Debug)]
88pub struct Subscription {
89 /// Unique identifier for this subscription
90 pub id: SubscriptionId,
91 /// The SQL query being monitored
92 pub query: String,
93 /// Tables this query depends on (extracted from AST)
94 pub tables: HashSet<String>,
95 /// Hash of the last result set (for change detection)
96 pub last_result_hash: u64,
97 /// Last result set (for delta computation)
98 /// This stores the previous result to enable computing deltas on change.
99 pub last_result: Option<Vec<crate::Row>>,
100 /// Channel to send updates to the subscriber
101 pub notify_tx: mpsc::Sender<SubscriptionUpdate>,
102 /// Retry policy for handling transient errors
103 pub retry_policy: SubscriptionRetryPolicy,
104 /// Current retry attempt count (resets on successful execution)
105 pub retry_count: u32,
106 /// Total updates sent to this subscription
107 pub updates_sent: u64,
108 /// Total updates dropped due to channel being full
109 pub updates_dropped: u64,
110 /// Buffer size for the subscription channel
111 pub channel_buffer_size: usize,
112 /// Slow consumer threshold percentage
113 pub slow_consumer_threshold_percent: u8,
114 /// Optional connection/session ID that owns this subscription
115 /// Used for connection-level subscription tracking and cleanup
116 pub connection_id: Option<String>,
117 /// Optional wire protocol subscription ID (UUID bytes)
118 /// Used to bridge between wire protocol IDs and internal SubscriptionId
119 pub wire_subscription_id: Option<[u8; 16]>,
120 /// Optional filter expression (SQL WHERE clause) to apply to updates
121 /// Only rows matching the filter will be included in subscription updates
122 pub filter: Option<String>,
123 /// Primary key column indices in the result set
124 /// Used for selective column updates to always include PK columns
125 /// Default: [0] (assumes first column is PK if not detected)
126 pub pk_columns: Vec<usize>,
127 /// Whether this subscription is eligible for selective column updates
128 /// True when PK columns were confidently detected
129 pub selective_eligible: bool,
130 /// Per-subscription override for selective column update configuration
131 /// If set, this overrides the server-level selective_updates config for this subscription
132 pub selective_updates_override: Option<SelectiveColumnConfig>,
133}
134
135impl Subscription {
136 /// Create a new subscription
137 pub fn new(
138 query: String,
139 tables: HashSet<String>,
140 notify_tx: mpsc::Sender<SubscriptionUpdate>,
141 ) -> Self {
142 Self::with_policy(query, tables, notify_tx, SubscriptionRetryPolicy::default())
143 }
144
145 /// Create a new subscription with a custom retry policy
146 pub fn with_policy(
147 query: String,
148 tables: HashSet<String>,
149 notify_tx: mpsc::Sender<SubscriptionUpdate>,
150 retry_policy: SubscriptionRetryPolicy,
151 ) -> Self {
152 Self {
153 id: SubscriptionId::new(),
154 query,
155 tables,
156 last_result_hash: 0,
157 last_result: None,
158 notify_tx,
159 retry_policy,
160 retry_count: 0,
161 updates_sent: 0,
162 updates_dropped: 0,
163 channel_buffer_size: 64, // default buffer size
164 slow_consumer_threshold_percent: 80,
165 connection_id: None,
166 wire_subscription_id: None,
167 filter: None,
168 pk_columns: vec![0], // default: assume first column is PK
169 selective_eligible: false,
170 selective_updates_override: None,
171 }
172 }
173
174 /// Create a new subscription with custom configuration
175 pub fn with_config(
176 query: String,
177 tables: HashSet<String>,
178 notify_tx: mpsc::Sender<SubscriptionUpdate>,
179 config: &SubscriptionConfig,
180 ) -> Self {
181 Self {
182 id: SubscriptionId::new(),
183 query,
184 tables,
185 last_result_hash: 0,
186 last_result: None,
187 notify_tx,
188 retry_policy: SubscriptionRetryPolicy::default(),
189 retry_count: 0,
190 updates_sent: 0,
191 updates_dropped: 0,
192 channel_buffer_size: config.channel_buffer_size,
193 slow_consumer_threshold_percent: config.slow_consumer_threshold_percent,
194 connection_id: None,
195 wire_subscription_id: None,
196 filter: None,
197 pk_columns: vec![0], // default: assume first column is PK
198 selective_eligible: false,
199 selective_updates_override: None,
200 }
201 }
202
203 /// Create a new subscription for a specific connection (wire protocol)
204 ///
205 /// This associates the subscription with a connection ID for tracking
206 /// and cleanup when the connection closes.
207 pub fn for_connection(
208 query: String,
209 tables: HashSet<String>,
210 notify_tx: mpsc::Sender<SubscriptionUpdate>,
211 connection_id: String,
212 wire_subscription_id: [u8; 16],
213 filter: Option<String>,
214 config: &SubscriptionConfig,
215 ) -> Self {
216 Self::for_connection_with_pk(
217 query,
218 tables,
219 notify_tx,
220 connection_id,
221 wire_subscription_id,
222 filter,
223 config,
224 vec![0], // default: assume first column is PK
225 )
226 }
227
228 /// Create a new subscription for a specific connection with custom PK columns
229 ///
230 /// This associates the subscription with a connection ID for tracking
231 /// and cleanup when the connection closes. It also allows specifying
232 /// which columns are primary keys for selective column updates.
233 #[allow(clippy::too_many_arguments)]
234 pub fn for_connection_with_pk(
235 query: String,
236 tables: HashSet<String>,
237 notify_tx: mpsc::Sender<SubscriptionUpdate>,
238 connection_id: String,
239 wire_subscription_id: [u8; 16],
240 filter: Option<String>,
241 config: &SubscriptionConfig,
242 pk_columns: Vec<usize>,
243 ) -> Self {
244 Self {
245 id: SubscriptionId::new(),
246 query,
247 tables,
248 last_result_hash: 0,
249 last_result: None,
250 notify_tx,
251 retry_policy: SubscriptionRetryPolicy::default(),
252 retry_count: 0,
253 updates_sent: 0,
254 updates_dropped: 0,
255 channel_buffer_size: config.channel_buffer_size,
256 slow_consumer_threshold_percent: config.slow_consumer_threshold_percent,
257 connection_id: Some(connection_id),
258 wire_subscription_id: Some(wire_subscription_id),
259 filter,
260 pk_columns,
261 selective_eligible: false,
262 selective_updates_override: None,
263 }
264 }
265
266 /// Set the primary key columns for this subscription
267 ///
268 /// Used after detection to update the subscription with actual PK columns.
269 pub fn set_pk_columns(&mut self, pk_columns: Vec<usize>) {
270 self.pk_columns = pk_columns;
271 }
272
273 /// Set both PK columns and selective eligibility
274 ///
275 /// Used after PK detection to update the subscription.
276 /// Returns true if the subscription is newly marked as selective-eligible.
277 pub fn set_pk_columns_with_eligibility(
278 &mut self,
279 pk_columns: Vec<usize>,
280 confident: bool,
281 ) -> bool {
282 self.pk_columns = pk_columns;
283 let was_eligible = self.selective_eligible;
284 self.selective_eligible = confident;
285 // Return true if newly eligible (wasn't before, is now)
286 !was_eligible && confident
287 }
288
289 /// Set per-subscription selective updates override
290 ///
291 /// Allows clients to override server-level selective update thresholds
292 /// on a per-subscription basis.
293 pub fn set_selective_updates_override(&mut self, config: SelectiveColumnConfig) {
294 self.selective_updates_override = Some(config);
295 }
296
297 /// Clear the selective updates override (use server defaults)
298 pub fn clear_selective_updates_override(&mut self) {
299 self.selective_updates_override = None;
300 }
301
302 /// Get the effective selective column update config for this subscription
303 ///
304 /// Returns the per-subscription override if set, otherwise creates a config
305 /// from the server-level config with this subscription's PK columns.
306 pub fn get_effective_selective_config(
307 &self,
308 server_config: &SelectiveColumnConfig,
309 ) -> SelectiveColumnConfig {
310 match &self.selective_updates_override {
311 Some(override_config) => {
312 // Use override but ensure pk_columns is always from the subscription
313 override_config.with_pk_columns(self.pk_columns.clone())
314 }
315 None => {
316 // Use server config with this subscription's pk_columns
317 server_config.with_pk_columns(self.pk_columns.clone())
318 }
319 }
320 }
321}
322
323// ============================================================================
324// Subscription Update
325// ============================================================================
326
327/// Update notification sent to subscribers
328///
329/// When a subscription's results change, an update is sent through the
330/// subscription's notification channel.
331#[derive(Debug, Clone)]
332pub enum SubscriptionUpdate {
333 /// Full result set (initial subscription or major change)
334 ///
335 /// Contains all rows matching the query. This is sent when:
336 /// - A new subscription is created (initial results)
337 /// - The results have changed and delta calculation isn't available
338 Full {
339 /// The subscription ID this update is for
340 subscription_id: SubscriptionId,
341 /// All rows in the result set
342 rows: Vec<crate::Row>,
343 },
344
345 /// Incremental delta update
346 ///
347 /// Contains only the changes since the last update. More efficient
348 /// for large result sets with small changes. Sent when the change
349 /// can be expressed as a set of inserts, updates, and deletes.
350 Delta {
351 /// The subscription ID this update is for
352 subscription_id: SubscriptionId,
353 /// Newly inserted rows (in new result, not in previous)
354 inserts: Vec<crate::Row>,
355 /// Updated rows (old value, new value) - rows with same identity but different content
356 updates: Vec<(crate::Row, crate::Row)>,
357 /// Deleted rows (in previous result, not in new)
358 deletes: Vec<crate::Row>,
359 },
360
361 /// Query execution error
362 ///
363 /// Sent when the subscription query fails to execute, typically due to
364 /// schema changes that invalidate the query.
365 Error {
366 /// The subscription ID this update is for
367 subscription_id: SubscriptionId,
368 /// Error message describing what went wrong
369 message: String,
370 },
371
372 /// Partial row updates (selective column updates)
373 ///
374 /// Sent when a subscription is eligible for selective column updates and
375 /// only a subset of columns have changed. Contains only the changed columns
376 /// plus the primary key columns, reducing bandwidth for wide tables.
377 ///
378 /// This is more efficient than Delta for tables with many columns where
379 /// only a few columns change at a time.
380 Partial {
381 /// The subscription ID this update is for
382 subscription_id: SubscriptionId,
383 /// Partial row updates, each containing only changed columns + PK columns
384 updates: Vec<PartialRowDelta>,
385 },
386}
387
388impl SubscriptionUpdate {
389 /// Get the subscription ID this update is for
390 pub fn subscription_id(&self) -> SubscriptionId {
391 match self {
392 SubscriptionUpdate::Full { subscription_id, .. } => *subscription_id,
393 SubscriptionUpdate::Delta { subscription_id, .. } => *subscription_id,
394 SubscriptionUpdate::Error { subscription_id, .. } => *subscription_id,
395 SubscriptionUpdate::Partial { subscription_id, .. } => *subscription_id,
396 }
397 }
398}
399
400// ============================================================================
401// Subscription Error
402// ============================================================================
403
404/// Errors that can occur during subscription operations
405#[derive(Debug, thiserror::Error)]
406pub enum SubscriptionError {
407 /// Failed to parse the subscription query
408 #[error("Failed to parse query: {0}")]
409 ParseError(String),
410
411 /// The query references unknown tables
412 #[error("Query references unknown table: {0}")]
413 UnknownTable(String),
414
415 /// The subscription was not found
416 #[error("Subscription not found: {0}")]
417 NotFound(SubscriptionId),
418
419 /// Failed to send notification to subscriber
420 #[error("Failed to send notification: channel closed")]
421 ChannelClosed,
422
423 /// Per-connection subscription limit exceeded
424 #[error("Connection limit exceeded: {current} subscriptions (max: {max})")]
425 ConnectionLimitExceeded {
426 /// Current number of subscriptions for this connection
427 current: usize,
428 /// Maximum allowed subscriptions per connection
429 max: usize,
430 },
431
432 /// Global subscription limit exceeded
433 #[error("Global limit exceeded: {current} subscriptions (max: {max})")]
434 GlobalLimitExceeded {
435 /// Current total subscriptions across all connections
436 current: usize,
437 /// Maximum allowed subscriptions globally
438 max: usize,
439 },
440
441 /// Result set too large for subscription
442 #[error("Result set too large: {rows} rows (max: {max})")]
443 ResultSetTooLarge {
444 /// Number of rows in the result set
445 rows: usize,
446 /// Maximum allowed rows per subscription
447 max: usize,
448 },
449
450 /// Rate limit exceeded for subscription creation
451 #[error("Rate limited: retry after {retry_after_ms}ms")]
452 RateLimited {
453 /// Milliseconds to wait before retrying
454 retry_after_ms: u64,
455 },
456}