vibesql_server/subscription/manager/wire.rs
1//! Wire protocol integration for subscription management.
2//!
3//! This module provides methods for bridging wire protocol UUIDs to internal
4//! subscription IDs, and for updating subscription state from wire protocol.
5
6use super::SubscriptionManager;
7use crate::subscription::{SelectiveColumnConfig, SubscriptionId};
8
9impl SubscriptionManager {
10 /// Find a subscription by its wire protocol ID
11 ///
12 /// # Arguments
13 ///
14 /// * `wire_id` - The wire protocol subscription ID (UUID bytes)
15 ///
16 /// # Returns
17 ///
18 /// The internal subscription ID if found
19 pub fn find_subscription_by_wire_id(&self, wire_id: &[u8; 16]) -> Option<SubscriptionId> {
20 self.wire_id_index.get(wire_id).map(|r| *r)
21 }
22
23 /// Get the subscription count for a specific connection
24 ///
25 /// # Arguments
26 ///
27 /// * `connection_id` - The connection ID to check
28 ///
29 /// # Returns
30 ///
31 /// Number of subscriptions for this connection
32 pub fn connection_subscription_count(&self, connection_id: &str) -> usize {
33 self.connection_subscription_counts
34 .get(connection_id)
35 .map(|c| c.load(std::sync::atomic::Ordering::Acquire))
36 .unwrap_or(0)
37 }
38
39 /// Get affected subscription details for wire protocol
40 ///
41 /// This method finds all subscriptions that depend on the given table and returns
42 /// their details needed for wire protocol notifications.
43 ///
44 /// # Arguments
45 ///
46 /// * `table` - The table name to find affected subscriptions for
47 ///
48 /// # Returns
49 ///
50 /// Vector of (wire_subscription_id, query, last_result_hash, last_result) for
51 /// each affected subscription that has a wire ID.
52 #[allow(clippy::type_complexity)]
53 pub fn get_affected_subscriptions_for_wire_protocol(
54 &self,
55 table: &str,
56 ) -> Vec<([u8; 16], String, u64, Option<Vec<crate::Row>>)> {
57 let table_lower = table.to_lowercase();
58 let subscription_ids: Vec<SubscriptionId> = self
59 .table_index
60 .get(&table_lower)
61 .map(|ids| ids.iter().copied().collect())
62 .unwrap_or_default();
63
64 subscription_ids
65 .into_iter()
66 .filter_map(|id| {
67 self.subscriptions.get(&id).and_then(|sub| {
68 sub.wire_subscription_id.map(|wire_id| {
69 (wire_id, sub.query.clone(), sub.last_result_hash, sub.last_result.clone())
70 })
71 })
72 })
73 .collect()
74 }
75
76 /// Get affected subscription details for wire protocol filtered by connection
77 ///
78 /// This method finds subscriptions for a specific connection that depend on the
79 /// given table and returns their details needed for wire protocol notifications.
80 ///
81 /// # Arguments
82 ///
83 /// * `table` - The table name to find affected subscriptions for
84 /// * `connection_id` - The connection ID to filter by
85 ///
86 /// # Returns
87 ///
88 /// Vector of (wire_subscription_id, query, last_result_hash, last_result, filter) for
89 /// each affected subscription that belongs to the specified connection.
90 #[allow(clippy::type_complexity)]
91 pub fn get_affected_subscriptions_for_connection(
92 &self,
93 table: &str,
94 connection_id: &str,
95 ) -> Vec<([u8; 16], String, u64, Option<Vec<crate::Row>>, Option<String>)> {
96 let table_lower = table.to_lowercase();
97 let subscription_ids: Vec<SubscriptionId> = self
98 .table_index
99 .get(&table_lower)
100 .map(|ids| ids.iter().copied().collect())
101 .unwrap_or_default();
102
103 subscription_ids
104 .into_iter()
105 .filter_map(|id| {
106 self.subscriptions.get(&id).and_then(|sub| {
107 // Only include subscriptions that belong to this connection
108 if sub.connection_id.as_deref() == Some(connection_id) {
109 sub.wire_subscription_id.map(|wire_id| {
110 (
111 wire_id,
112 sub.query.clone(),
113 sub.last_result_hash,
114 sub.last_result.clone(),
115 sub.filter.clone(),
116 )
117 })
118 } else {
119 None
120 }
121 })
122 })
123 .collect()
124 }
125
126 /// Update the stored result for a subscription by wire ID
127 ///
128 /// This is used by wire protocol to store results for delta computation.
129 ///
130 /// # Arguments
131 ///
132 /// * `wire_id` - The wire protocol subscription ID (UUID bytes)
133 /// * `result_hash` - Hash of the new result set
134 /// * `result` - The new result set
135 pub fn update_result_by_wire_id(
136 &self,
137 wire_id: &[u8; 16],
138 result_hash: u64,
139 result: Vec<crate::Row>,
140 ) {
141 if let Some(id) = self.wire_id_index.get(wire_id).map(|r| *r) {
142 if let Some(mut sub) = self.subscriptions.get_mut(&id) {
143 sub.last_result_hash = result_hash;
144 sub.last_result = Some(result);
145 }
146 }
147 }
148
149 /// Update the primary key columns for a subscription by wire ID
150 ///
151 /// This is called after PK detection to set the actual PK column indices
152 /// for selective column updates.
153 pub fn update_pk_columns_by_wire_id(&self, wire_id: &[u8; 16], pk_columns: Vec<usize>) {
154 if let Some(id) = self.wire_id_index.get(wire_id).map(|r| *r) {
155 if let Some(mut sub) = self.subscriptions.get_mut(&id) {
156 sub.pk_columns = pk_columns;
157 }
158 }
159 }
160
161 /// Update PK columns with eligibility tracking
162 ///
163 /// Sets the PK columns and marks whether the subscription is eligible
164 /// for selective column updates (based on confident PK detection).
165 ///
166 /// Returns `true` if the subscription is newly marked as selective-eligible.
167 pub fn update_pk_columns_with_eligibility_by_wire_id(
168 &self,
169 wire_id: &[u8; 16],
170 pk_columns: Vec<usize>,
171 confident: bool,
172 ) -> bool {
173 if let Some(id) = self.wire_id_index.get(wire_id).map(|r| *r) {
174 if let Some(mut sub) = self.subscriptions.get_mut(&id) {
175 return sub.set_pk_columns_with_eligibility(pk_columns, confident);
176 }
177 }
178 false
179 }
180
181 /// Update PK columns with eligibility tracking by internal subscription ID
182 ///
183 /// Sets the PK columns and marks whether the subscription is eligible
184 /// for selective column updates (based on confident PK detection).
185 /// This variant is used for HTTP SSE subscriptions that don't have wire IDs.
186 ///
187 /// Returns `true` if the subscription is newly marked as selective-eligible.
188 pub fn update_pk_columns_with_eligibility(
189 &self,
190 id: SubscriptionId,
191 pk_columns: Vec<usize>,
192 confident: bool,
193 ) -> bool {
194 if let Some(mut sub) = self.subscriptions.get_mut(&id) {
195 return sub.set_pk_columns_with_eligibility(pk_columns, confident);
196 }
197 false
198 }
199
200 /// Update selective updates configuration for a subscription
201 ///
202 /// Allows per-subscription overrides of the server-level selective updates config.
203 ///
204 /// # Arguments
205 ///
206 /// * `id` - The subscription ID
207 /// * `config` - The new selective updates configuration (Some to set, None to clear)
208 pub fn update_selective_updates(&self, id: SubscriptionId, config: SelectiveColumnConfig) {
209 if let Some(mut sub) = self.subscriptions.get_mut(&id) {
210 sub.set_selective_updates_override(config);
211 }
212 }
213
214 /// Get the primary key columns for a subscription by wire ID
215 ///
216 /// Returns the PK column indices, or default [0] if not found.
217 pub fn get_pk_columns_by_wire_id(&self, wire_id: &[u8; 16]) -> Vec<usize> {
218 if let Some(id) = self.wire_id_index.get(wire_id).map(|r| *r) {
219 if let Some(sub) = self.subscriptions.get(&id) {
220 return sub.pk_columns.clone();
221 }
222 }
223 vec![0] // default
224 }
225
226 /// Set per-subscription selective updates configuration override by wire ID
227 ///
228 /// Allows clients to override server-level selective update thresholds
229 /// on a per-subscription basis via the wire protocol.
230 pub fn set_selective_updates_override_by_wire_id(
231 &self,
232 wire_id: &[u8; 16],
233 config: SelectiveColumnConfig,
234 ) {
235 if let Some(id) = self.wire_id_index.get(wire_id).map(|r| *r) {
236 if let Some(mut sub) = self.subscriptions.get_mut(&id) {
237 sub.set_selective_updates_override(config);
238 }
239 }
240 }
241
242 /// Get the effective selective column config for a subscription by wire ID
243 ///
244 /// Returns the per-subscription override config if set, otherwise creates
245 /// a config using the server-level settings with subscription-specific pk_columns.
246 pub fn get_effective_selective_config_by_wire_id(
247 &self,
248 wire_id: &[u8; 16],
249 server_config: &SelectiveColumnConfig,
250 ) -> SelectiveColumnConfig {
251 if let Some(id) = self.wire_id_index.get(wire_id).map(|r| *r) {
252 if let Some(sub) = self.subscriptions.get(&id) {
253 return sub.get_effective_selective_config(server_config);
254 }
255 }
256 // Subscription not found, return server config with default pk_columns
257 server_config.with_pk_columns(vec![0])
258 }
259}