// vibesql_server/subscription/types.rs

//! Core subscription types
//!
//! This module defines the fundamental types for the subscription system:
//! - `SubscriptionId`: Unique identifier for subscriptions
//! - `Subscription`: Individual subscription with query and notification channel
//! - `SubscriptionMetrics`: Metrics for monitoring subscription health
//! - `SubscriptionUpdate`: Update notifications sent to subscribers
//! - `SubscriptionError`: Errors that can occur during subscription operations

use std::collections::HashSet;
use std::fmt;

use tokio::sync::mpsc;

use super::config::{SubscriptionConfig, SubscriptionRetryPolicy};
use super::delta::PartialRowDelta;
use super::selective::SelectiveColumnConfig;

// ============================================================================
// Subscription ID
// ============================================================================

/// Unique subscription identifier
///
/// Each subscription is assigned a unique ID when created. This ID is used
/// to track the subscription throughout its lifecycle and to unsubscribe.
///
/// The wrapped value is private; use [`SubscriptionId::as_u64`] to read it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(u64);

impl SubscriptionId {
    /// Create a new unique subscription ID
    ///
    /// IDs are drawn from a single process-wide atomic counter that starts
    /// at 1, so every call (from any thread) yields a different value.
    pub fn new() -> Self {
        use std::sync::atomic::{AtomicU64, Ordering};
        static COUNTER: AtomicU64 = AtomicU64::new(1);
        // `Relaxed` is sufficient: the read-modify-write is atomic on its own,
        // and the counter does not guard any other memory.
        Self(COUNTER.fetch_add(1, Ordering::Relaxed))
    }

    /// Get the raw ID value (for debugging/logging)
    pub fn as_u64(&self) -> u64 {
        self.0
    }
}

impl Default for SubscriptionId {
    /// Equivalent to [`SubscriptionId::new`]: allocates a fresh unique ID
    /// rather than returning a fixed value.
    fn default() -> Self {
        Self::new()
    }
}

impl fmt::Display for SubscriptionId {
    /// Formats the ID as `sub-<n>`, where `<n>` is the raw counter value.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "sub-{}", self.0)
    }
}

57// ============================================================================
58// Subscription Metrics
59// ============================================================================
60
61/// Metrics for a single subscription
62///
63/// Provides observability into subscription health and backpressure.
64#[derive(Debug, Clone, Default)]
65pub struct SubscriptionMetrics {
66    /// Subscription ID
67    pub subscription_id: Option<SubscriptionId>,
68    /// Total updates successfully sent to this subscription
69    pub updates_sent: u64,
70    /// Total updates dropped due to channel being full
71    pub updates_dropped: u64,
72    /// Configured channel buffer size
73    pub channel_buffer_size: usize,
74    /// Current channel capacity (available slots)
75    pub channel_capacity: usize,
76    /// Slow consumer threshold percentage
77    pub slow_consumer_threshold_percent: u8,
78}
79
80// ============================================================================
81// Subscription
82// ============================================================================
83
84/// A single query subscription
85///
86/// Tracks the query, its table dependencies, and the channel for sending updates.
87#[derive(Debug)]
88pub struct Subscription {
89    /// Unique identifier for this subscription
90    pub id: SubscriptionId,
91    /// The SQL query being monitored
92    pub query: String,
93    /// Tables this query depends on (extracted from AST)
94    pub tables: HashSet<String>,
95    /// Hash of the last result set (for change detection)
96    pub last_result_hash: u64,
97    /// Last result set (for delta computation)
98    /// This stores the previous result to enable computing deltas on change.
99    pub last_result: Option<Vec<crate::Row>>,
100    /// Channel to send updates to the subscriber
101    pub notify_tx: mpsc::Sender<SubscriptionUpdate>,
102    /// Retry policy for handling transient errors
103    pub retry_policy: SubscriptionRetryPolicy,
104    /// Current retry attempt count (resets on successful execution)
105    pub retry_count: u32,
106    /// Total updates sent to this subscription
107    pub updates_sent: u64,
108    /// Total updates dropped due to channel being full
109    pub updates_dropped: u64,
110    /// Buffer size for the subscription channel
111    pub channel_buffer_size: usize,
112    /// Slow consumer threshold percentage
113    pub slow_consumer_threshold_percent: u8,
114    /// Optional connection/session ID that owns this subscription
115    /// Used for connection-level subscription tracking and cleanup
116    pub connection_id: Option<String>,
117    /// Optional wire protocol subscription ID (UUID bytes)
118    /// Used to bridge between wire protocol IDs and internal SubscriptionId
119    pub wire_subscription_id: Option<[u8; 16]>,
120    /// Optional filter expression (SQL WHERE clause) to apply to updates
121    /// Only rows matching the filter will be included in subscription updates
122    pub filter: Option<String>,
123    /// Primary key column indices in the result set
124    /// Used for selective column updates to always include PK columns
125    /// Default: [0] (assumes first column is PK if not detected)
126    pub pk_columns: Vec<usize>,
127    /// Whether this subscription is eligible for selective column updates
128    /// True when PK columns were confidently detected
129    pub selective_eligible: bool,
130    /// Per-subscription override for selective column update configuration
131    /// If set, this overrides the server-level selective_updates config for this subscription
132    pub selective_updates_override: Option<SelectiveColumnConfig>,
133}
134
135impl Subscription {
136    /// Create a new subscription
137    pub fn new(
138        query: String,
139        tables: HashSet<String>,
140        notify_tx: mpsc::Sender<SubscriptionUpdate>,
141    ) -> Self {
142        Self::with_policy(query, tables, notify_tx, SubscriptionRetryPolicy::default())
143    }
144
145    /// Create a new subscription with a custom retry policy
146    pub fn with_policy(
147        query: String,
148        tables: HashSet<String>,
149        notify_tx: mpsc::Sender<SubscriptionUpdate>,
150        retry_policy: SubscriptionRetryPolicy,
151    ) -> Self {
152        Self {
153            id: SubscriptionId::new(),
154            query,
155            tables,
156            last_result_hash: 0,
157            last_result: None,
158            notify_tx,
159            retry_policy,
160            retry_count: 0,
161            updates_sent: 0,
162            updates_dropped: 0,
163            channel_buffer_size: 64, // default buffer size
164            slow_consumer_threshold_percent: 80,
165            connection_id: None,
166            wire_subscription_id: None,
167            filter: None,
168            pk_columns: vec![0], // default: assume first column is PK
169            selective_eligible: false,
170            selective_updates_override: None,
171        }
172    }
173
174    /// Create a new subscription with custom configuration
175    pub fn with_config(
176        query: String,
177        tables: HashSet<String>,
178        notify_tx: mpsc::Sender<SubscriptionUpdate>,
179        config: &SubscriptionConfig,
180    ) -> Self {
181        Self {
182            id: SubscriptionId::new(),
183            query,
184            tables,
185            last_result_hash: 0,
186            last_result: None,
187            notify_tx,
188            retry_policy: SubscriptionRetryPolicy::default(),
189            retry_count: 0,
190            updates_sent: 0,
191            updates_dropped: 0,
192            channel_buffer_size: config.channel_buffer_size,
193            slow_consumer_threshold_percent: config.slow_consumer_threshold_percent,
194            connection_id: None,
195            wire_subscription_id: None,
196            filter: None,
197            pk_columns: vec![0], // default: assume first column is PK
198            selective_eligible: false,
199            selective_updates_override: None,
200        }
201    }
202
203    /// Create a new subscription for a specific connection (wire protocol)
204    ///
205    /// This associates the subscription with a connection ID for tracking
206    /// and cleanup when the connection closes.
207    pub fn for_connection(
208        query: String,
209        tables: HashSet<String>,
210        notify_tx: mpsc::Sender<SubscriptionUpdate>,
211        connection_id: String,
212        wire_subscription_id: [u8; 16],
213        filter: Option<String>,
214        config: &SubscriptionConfig,
215    ) -> Self {
216        Self::for_connection_with_pk(
217            query,
218            tables,
219            notify_tx,
220            connection_id,
221            wire_subscription_id,
222            filter,
223            config,
224            vec![0], // default: assume first column is PK
225        )
226    }
227
228    /// Create a new subscription for a specific connection with custom PK columns
229    ///
230    /// This associates the subscription with a connection ID for tracking
231    /// and cleanup when the connection closes. It also allows specifying
232    /// which columns are primary keys for selective column updates.
233    #[allow(clippy::too_many_arguments)]
234    pub fn for_connection_with_pk(
235        query: String,
236        tables: HashSet<String>,
237        notify_tx: mpsc::Sender<SubscriptionUpdate>,
238        connection_id: String,
239        wire_subscription_id: [u8; 16],
240        filter: Option<String>,
241        config: &SubscriptionConfig,
242        pk_columns: Vec<usize>,
243    ) -> Self {
244        Self {
245            id: SubscriptionId::new(),
246            query,
247            tables,
248            last_result_hash: 0,
249            last_result: None,
250            notify_tx,
251            retry_policy: SubscriptionRetryPolicy::default(),
252            retry_count: 0,
253            updates_sent: 0,
254            updates_dropped: 0,
255            channel_buffer_size: config.channel_buffer_size,
256            slow_consumer_threshold_percent: config.slow_consumer_threshold_percent,
257            connection_id: Some(connection_id),
258            wire_subscription_id: Some(wire_subscription_id),
259            filter,
260            pk_columns,
261            selective_eligible: false,
262            selective_updates_override: None,
263        }
264    }
265
266    /// Set the primary key columns for this subscription
267    ///
268    /// Used after detection to update the subscription with actual PK columns.
269    pub fn set_pk_columns(&mut self, pk_columns: Vec<usize>) {
270        self.pk_columns = pk_columns;
271    }
272
273    /// Set both PK columns and selective eligibility
274    ///
275    /// Used after PK detection to update the subscription.
276    /// Returns true if the subscription is newly marked as selective-eligible.
277    pub fn set_pk_columns_with_eligibility(
278        &mut self,
279        pk_columns: Vec<usize>,
280        confident: bool,
281    ) -> bool {
282        self.pk_columns = pk_columns;
283        let was_eligible = self.selective_eligible;
284        self.selective_eligible = confident;
285        // Return true if newly eligible (wasn't before, is now)
286        !was_eligible && confident
287    }
288
289    /// Set per-subscription selective updates override
290    ///
291    /// Allows clients to override server-level selective update thresholds
292    /// on a per-subscription basis.
293    pub fn set_selective_updates_override(&mut self, config: SelectiveColumnConfig) {
294        self.selective_updates_override = Some(config);
295    }
296
297    /// Clear the selective updates override (use server defaults)
298    pub fn clear_selective_updates_override(&mut self) {
299        self.selective_updates_override = None;
300    }
301
302    /// Get the effective selective column update config for this subscription
303    ///
304    /// Returns the per-subscription override if set, otherwise creates a config
305    /// from the server-level config with this subscription's PK columns.
306    pub fn get_effective_selective_config(
307        &self,
308        server_config: &SelectiveColumnConfig,
309    ) -> SelectiveColumnConfig {
310        match &self.selective_updates_override {
311            Some(override_config) => {
312                // Use override but ensure pk_columns is always from the subscription
313                override_config.with_pk_columns(self.pk_columns.clone())
314            }
315            None => {
316                // Use server config with this subscription's pk_columns
317                server_config.with_pk_columns(self.pk_columns.clone())
318            }
319        }
320    }
321}
322
323// ============================================================================
324// Subscription Update
325// ============================================================================
326
327/// Update notification sent to subscribers
328///
329/// When a subscription's results change, an update is sent through the
330/// subscription's notification channel.
331#[derive(Debug, Clone)]
332pub enum SubscriptionUpdate {
333    /// Full result set (initial subscription or major change)
334    ///
335    /// Contains all rows matching the query. This is sent when:
336    /// - A new subscription is created (initial results)
337    /// - The results have changed and delta calculation isn't available
338    Full {
339        /// The subscription ID this update is for
340        subscription_id: SubscriptionId,
341        /// All rows in the result set
342        rows: Vec<crate::Row>,
343    },
344
345    /// Incremental delta update
346    ///
347    /// Contains only the changes since the last update. More efficient
348    /// for large result sets with small changes. Sent when the change
349    /// can be expressed as a set of inserts, updates, and deletes.
350    Delta {
351        /// The subscription ID this update is for
352        subscription_id: SubscriptionId,
353        /// Newly inserted rows (in new result, not in previous)
354        inserts: Vec<crate::Row>,
355        /// Updated rows (old value, new value) - rows with same identity but different content
356        updates: Vec<(crate::Row, crate::Row)>,
357        /// Deleted rows (in previous result, not in new)
358        deletes: Vec<crate::Row>,
359    },
360
361    /// Query execution error
362    ///
363    /// Sent when the subscription query fails to execute, typically due to
364    /// schema changes that invalidate the query.
365    Error {
366        /// The subscription ID this update is for
367        subscription_id: SubscriptionId,
368        /// Error message describing what went wrong
369        message: String,
370    },
371
372    /// Partial row updates (selective column updates)
373    ///
374    /// Sent when a subscription is eligible for selective column updates and
375    /// only a subset of columns have changed. Contains only the changed columns
376    /// plus the primary key columns, reducing bandwidth for wide tables.
377    ///
378    /// This is more efficient than Delta for tables with many columns where
379    /// only a few columns change at a time.
380    Partial {
381        /// The subscription ID this update is for
382        subscription_id: SubscriptionId,
383        /// Partial row updates, each containing only changed columns + PK columns
384        updates: Vec<PartialRowDelta>,
385    },
386}
387
388impl SubscriptionUpdate {
389    /// Get the subscription ID this update is for
390    pub fn subscription_id(&self) -> SubscriptionId {
391        match self {
392            SubscriptionUpdate::Full { subscription_id, .. } => *subscription_id,
393            SubscriptionUpdate::Delta { subscription_id, .. } => *subscription_id,
394            SubscriptionUpdate::Error { subscription_id, .. } => *subscription_id,
395            SubscriptionUpdate::Partial { subscription_id, .. } => *subscription_id,
396        }
397    }
398}
399
400// ============================================================================
401// Subscription Error
402// ============================================================================
403
404/// Errors that can occur during subscription operations
405#[derive(Debug, thiserror::Error)]
406pub enum SubscriptionError {
407    /// Failed to parse the subscription query
408    #[error("Failed to parse query: {0}")]
409    ParseError(String),
410
411    /// The query references unknown tables
412    #[error("Query references unknown table: {0}")]
413    UnknownTable(String),
414
415    /// The subscription was not found
416    #[error("Subscription not found: {0}")]
417    NotFound(SubscriptionId),
418
419    /// Failed to send notification to subscriber
420    #[error("Failed to send notification: channel closed")]
421    ChannelClosed,
422
423    /// Per-connection subscription limit exceeded
424    #[error("Connection limit exceeded: {current} subscriptions (max: {max})")]
425    ConnectionLimitExceeded {
426        /// Current number of subscriptions for this connection
427        current: usize,
428        /// Maximum allowed subscriptions per connection
429        max: usize,
430    },
431
432    /// Global subscription limit exceeded
433    #[error("Global limit exceeded: {current} subscriptions (max: {max})")]
434    GlobalLimitExceeded {
435        /// Current total subscriptions across all connections
436        current: usize,
437        /// Maximum allowed subscriptions globally
438        max: usize,
439    },
440
441    /// Result set too large for subscription
442    #[error("Result set too large: {rows} rows (max: {max})")]
443    ResultSetTooLarge {
444        /// Number of rows in the result set
445        rows: usize,
446        /// Maximum allowed rows per subscription
447        max: usize,
448    },
449
450    /// Rate limit exceeded for subscription creation
451    #[error("Rate limited: retry after {retry_after_ms}ms")]
452    RateLimited {
453        /// Milliseconds to wait before retrying
454        retry_after_ms: u64,
455    },
456}