Skip to main content

vibesql_server/subscription/manager/
wire.rs

1//! Wire protocol integration for subscription management.
2//!
3//! This module provides methods for bridging wire protocol UUIDs to internal
4//! subscription IDs, and for updating subscription state from wire protocol.
5
6use super::SubscriptionManager;
7use crate::subscription::{SelectiveColumnConfig, SubscriptionId};
8
9impl SubscriptionManager {
10    /// Find a subscription by its wire protocol ID
11    ///
12    /// # Arguments
13    ///
14    /// * `wire_id` - The wire protocol subscription ID (UUID bytes)
15    ///
16    /// # Returns
17    ///
18    /// The internal subscription ID if found
19    pub fn find_subscription_by_wire_id(&self, wire_id: &[u8; 16]) -> Option<SubscriptionId> {
20        self.wire_id_index.get(wire_id).map(|r| *r)
21    }
22
23    /// Get the subscription count for a specific connection
24    ///
25    /// # Arguments
26    ///
27    /// * `connection_id` - The connection ID to check
28    ///
29    /// # Returns
30    ///
31    /// Number of subscriptions for this connection
32    pub fn connection_subscription_count(&self, connection_id: &str) -> usize {
33        self.connection_subscription_counts
34            .get(connection_id)
35            .map(|c| c.load(std::sync::atomic::Ordering::Acquire))
36            .unwrap_or(0)
37    }
38
39    /// Get affected subscription details for wire protocol
40    ///
41    /// This method finds all subscriptions that depend on the given table and returns
42    /// their details needed for wire protocol notifications.
43    ///
44    /// # Arguments
45    ///
46    /// * `table` - The table name to find affected subscriptions for
47    ///
48    /// # Returns
49    ///
50    /// Vector of (wire_subscription_id, query, last_result_hash, last_result) for
51    /// each affected subscription that has a wire ID.
52    #[allow(clippy::type_complexity)]
53    pub fn get_affected_subscriptions_for_wire_protocol(
54        &self,
55        table: &str,
56    ) -> Vec<([u8; 16], String, u64, Option<Vec<crate::Row>>)> {
57        let table_lower = table.to_lowercase();
58        let subscription_ids: Vec<SubscriptionId> = self
59            .table_index
60            .get(&table_lower)
61            .map(|ids| ids.iter().copied().collect())
62            .unwrap_or_default();
63
64        subscription_ids
65            .into_iter()
66            .filter_map(|id| {
67                self.subscriptions.get(&id).and_then(|sub| {
68                    sub.wire_subscription_id.map(|wire_id| {
69                        (wire_id, sub.query.clone(), sub.last_result_hash, sub.last_result.clone())
70                    })
71                })
72            })
73            .collect()
74    }
75
76    /// Get affected subscription details for wire protocol filtered by connection
77    ///
78    /// This method finds subscriptions for a specific connection that depend on the
79    /// given table and returns their details needed for wire protocol notifications.
80    ///
81    /// # Arguments
82    ///
83    /// * `table` - The table name to find affected subscriptions for
84    /// * `connection_id` - The connection ID to filter by
85    ///
86    /// # Returns
87    ///
88    /// Vector of (wire_subscription_id, query, last_result_hash, last_result, filter) for
89    /// each affected subscription that belongs to the specified connection.
90    #[allow(clippy::type_complexity)]
91    pub fn get_affected_subscriptions_for_connection(
92        &self,
93        table: &str,
94        connection_id: &str,
95    ) -> Vec<([u8; 16], String, u64, Option<Vec<crate::Row>>, Option<String>)> {
96        let table_lower = table.to_lowercase();
97        let subscription_ids: Vec<SubscriptionId> = self
98            .table_index
99            .get(&table_lower)
100            .map(|ids| ids.iter().copied().collect())
101            .unwrap_or_default();
102
103        subscription_ids
104            .into_iter()
105            .filter_map(|id| {
106                self.subscriptions.get(&id).and_then(|sub| {
107                    // Only include subscriptions that belong to this connection
108                    if sub.connection_id.as_deref() == Some(connection_id) {
109                        sub.wire_subscription_id.map(|wire_id| {
110                            (
111                                wire_id,
112                                sub.query.clone(),
113                                sub.last_result_hash,
114                                sub.last_result.clone(),
115                                sub.filter.clone(),
116                            )
117                        })
118                    } else {
119                        None
120                    }
121                })
122            })
123            .collect()
124    }
125
126    /// Update the stored result for a subscription by wire ID
127    ///
128    /// This is used by wire protocol to store results for delta computation.
129    ///
130    /// # Arguments
131    ///
132    /// * `wire_id` - The wire protocol subscription ID (UUID bytes)
133    /// * `result_hash` - Hash of the new result set
134    /// * `result` - The new result set
135    pub fn update_result_by_wire_id(
136        &self,
137        wire_id: &[u8; 16],
138        result_hash: u64,
139        result: Vec<crate::Row>,
140    ) {
141        if let Some(id) = self.wire_id_index.get(wire_id).map(|r| *r) {
142            if let Some(mut sub) = self.subscriptions.get_mut(&id) {
143                sub.last_result_hash = result_hash;
144                sub.last_result = Some(result);
145            }
146        }
147    }
148
149    /// Update the primary key columns for a subscription by wire ID
150    ///
151    /// This is called after PK detection to set the actual PK column indices
152    /// for selective column updates.
153    pub fn update_pk_columns_by_wire_id(&self, wire_id: &[u8; 16], pk_columns: Vec<usize>) {
154        if let Some(id) = self.wire_id_index.get(wire_id).map(|r| *r) {
155            if let Some(mut sub) = self.subscriptions.get_mut(&id) {
156                sub.pk_columns = pk_columns;
157            }
158        }
159    }
160
161    /// Update PK columns with eligibility tracking
162    ///
163    /// Sets the PK columns and marks whether the subscription is eligible
164    /// for selective column updates (based on confident PK detection).
165    ///
166    /// Returns `true` if the subscription is newly marked as selective-eligible.
167    pub fn update_pk_columns_with_eligibility_by_wire_id(
168        &self,
169        wire_id: &[u8; 16],
170        pk_columns: Vec<usize>,
171        confident: bool,
172    ) -> bool {
173        if let Some(id) = self.wire_id_index.get(wire_id).map(|r| *r) {
174            if let Some(mut sub) = self.subscriptions.get_mut(&id) {
175                return sub.set_pk_columns_with_eligibility(pk_columns, confident);
176            }
177        }
178        false
179    }
180
181    /// Update PK columns with eligibility tracking by internal subscription ID
182    ///
183    /// Sets the PK columns and marks whether the subscription is eligible
184    /// for selective column updates (based on confident PK detection).
185    /// This variant is used for HTTP SSE subscriptions that don't have wire IDs.
186    ///
187    /// Returns `true` if the subscription is newly marked as selective-eligible.
188    pub fn update_pk_columns_with_eligibility(
189        &self,
190        id: SubscriptionId,
191        pk_columns: Vec<usize>,
192        confident: bool,
193    ) -> bool {
194        if let Some(mut sub) = self.subscriptions.get_mut(&id) {
195            return sub.set_pk_columns_with_eligibility(pk_columns, confident);
196        }
197        false
198    }
199
200    /// Update selective updates configuration for a subscription
201    ///
202    /// Allows per-subscription overrides of the server-level selective updates config.
203    ///
204    /// # Arguments
205    ///
206    /// * `id` - The subscription ID
207    /// * `config` - The new selective updates configuration (Some to set, None to clear)
208    pub fn update_selective_updates(&self, id: SubscriptionId, config: SelectiveColumnConfig) {
209        if let Some(mut sub) = self.subscriptions.get_mut(&id) {
210            sub.set_selective_updates_override(config);
211        }
212    }
213
214    /// Get the primary key columns for a subscription by wire ID
215    ///
216    /// Returns the PK column indices, or default [0] if not found.
217    pub fn get_pk_columns_by_wire_id(&self, wire_id: &[u8; 16]) -> Vec<usize> {
218        if let Some(id) = self.wire_id_index.get(wire_id).map(|r| *r) {
219            if let Some(sub) = self.subscriptions.get(&id) {
220                return sub.pk_columns.clone();
221            }
222        }
223        vec![0] // default
224    }
225
226    /// Set per-subscription selective updates configuration override by wire ID
227    ///
228    /// Allows clients to override server-level selective update thresholds
229    /// on a per-subscription basis via the wire protocol.
230    pub fn set_selective_updates_override_by_wire_id(
231        &self,
232        wire_id: &[u8; 16],
233        config: SelectiveColumnConfig,
234    ) {
235        if let Some(id) = self.wire_id_index.get(wire_id).map(|r| *r) {
236            if let Some(mut sub) = self.subscriptions.get_mut(&id) {
237                sub.set_selective_updates_override(config);
238            }
239        }
240    }
241
242    /// Get the effective selective column config for a subscription by wire ID
243    ///
244    /// Returns the per-subscription override config if set, otherwise creates
245    /// a config using the server-level settings with subscription-specific pk_columns.
246    pub fn get_effective_selective_config_by_wire_id(
247        &self,
248        wire_id: &[u8; 16],
249        server_config: &SelectiveColumnConfig,
250    ) -> SelectiveColumnConfig {
251        if let Some(id) = self.wire_id_index.get(wire_id).map(|r| *r) {
252            if let Some(sub) = self.subscriptions.get(&id) {
253                return sub.get_effective_selective_config(server_config);
254            }
255        }
256        // Subscription not found, return server config with default pk_columns
257        server_config.with_pk_columns(vec![0])
258    }
259}