1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
//! Wire protocol integration for subscription management.
//!
//! This module provides methods for bridging wire protocol UUIDs to internal
//! subscription IDs, and for updating subscription state from the wire protocol.
use super::SubscriptionManager;
use crate::subscription::{SelectiveColumnConfig, SubscriptionId};
impl SubscriptionManager {
/// Resolve a wire protocol subscription ID to the internal subscription ID.
///
/// # Arguments
///
/// * `wire_id` - The 16-byte wire protocol subscription ID (UUID bytes)
///
/// # Returns
///
/// A copy of the internal ID stored under `wire_id` in `wire_id_index`,
/// or `None` when the index has no entry for it.
pub fn find_subscription_by_wire_id(&self, wire_id: &[u8; 16]) -> Option<SubscriptionId> {
    let entry = self.wire_id_index.get(wire_id)?;
    // Copy the ID out so the caller does not hold on to the index entry.
    Some(*entry)
}
/// Report how many subscriptions are currently counted for a connection.
///
/// # Arguments
///
/// * `connection_id` - The connection ID whose counter should be read
///
/// # Returns
///
/// The value of the connection's counter (loaded with `Acquire` ordering),
/// or `0` when no counter exists for `connection_id`.
pub fn connection_subscription_count(&self, connection_id: &str) -> usize {
    match self.connection_subscription_counts.get(connection_id) {
        Some(counter) => counter.load(std::sync::atomic::Ordering::Acquire),
        // No counter recorded for this connection.
        None => 0,
    }
}
/// Collect wire protocol notification details for every subscription on a table.
///
/// The table name is lowercased and looked up in `table_index`. Each
/// subscription ID found there is resolved through `subscriptions`; IDs that
/// no longer resolve, and subscriptions without a wire ID, are skipped.
///
/// # Arguments
///
/// * `table` - The table name to find affected subscriptions for
///
/// # Returns
///
/// One `(wire_subscription_id, query, last_result_hash, last_result)` tuple
/// per affected subscription that has a wire ID. The query and last result
/// are cloned. Empty when the table is not present in the index.
#[allow(clippy::type_complexity)]
pub fn get_affected_subscriptions_for_wire_protocol(
    &self,
    table: &str,
) -> Vec<([u8; 16], String, u64, Option<Vec<crate::Row>>)> {
    let key = table.to_lowercase();

    // Copy the IDs out first so the `table_index` entry is not borrowed
    // while `subscriptions` is queried below.
    let candidate_ids: Vec<SubscriptionId> = match self.table_index.get(&key) {
        Some(ids) => ids.iter().copied().collect(),
        None => Vec::new(),
    };

    candidate_ids
        .into_iter()
        .filter_map(|id| {
            let sub = self.subscriptions.get(&id)?;
            let wire_id = sub.wire_subscription_id?;
            Some((wire_id, sub.query.clone(), sub.last_result_hash, sub.last_result.clone()))
        })
        .collect()
}
/// Collect wire protocol notification details for one connection's subscriptions on a table.
///
/// Works like the unfiltered lookup: the lowercased table name is looked up
/// in `table_index` and each ID is resolved through `subscriptions`. Only
/// subscriptions whose `connection_id` equals the given one and that have a
/// wire ID are kept.
///
/// # Arguments
///
/// * `table` - The table name to find affected subscriptions for
/// * `connection_id` - The connection ID the subscription must belong to
///
/// # Returns
///
/// One `(wire_subscription_id, query, last_result_hash, last_result, filter)`
/// tuple per matching subscription. The query, last result and filter are cloned.
#[allow(clippy::type_complexity)]
pub fn get_affected_subscriptions_for_connection(
    &self,
    table: &str,
    connection_id: &str,
) -> Vec<([u8; 16], String, u64, Option<Vec<crate::Row>>, Option<String>)> {
    let key = table.to_lowercase();

    // Copy the IDs out first so the `table_index` entry is not borrowed
    // while `subscriptions` is queried below.
    let candidate_ids: Vec<SubscriptionId> = match self.table_index.get(&key) {
        Some(ids) => ids.iter().copied().collect(),
        None => Vec::new(),
    };

    let mut affected = Vec::new();
    for id in candidate_ids {
        if let Some(sub) = self.subscriptions.get(&id) {
            // Skip subscriptions owned by another connection (or by none).
            if sub.connection_id.as_deref() != Some(connection_id) {
                continue;
            }
            if let Some(wire_id) = sub.wire_subscription_id {
                affected.push((
                    wire_id,
                    sub.query.clone(),
                    sub.last_result_hash,
                    sub.last_result.clone(),
                    sub.filter.clone(),
                ));
            }
        }
    }
    affected
}
/// Store a new result set and its hash on the subscription with the given wire ID.
///
/// The stored values are what later delta computation compares against.
/// If the wire ID is unknown, or its subscription is no longer registered,
/// the call does nothing.
///
/// # Arguments
///
/// * `wire_id` - The wire protocol subscription ID (UUID bytes)
/// * `result_hash` - Hash of the new result set
/// * `result` - The new result set
pub fn update_result_by_wire_id(
    &self,
    wire_id: &[u8; 16],
    result_hash: u64,
    result: Vec<crate::Row>,
) {
    let id = match self.find_subscription_by_wire_id(wire_id) {
        Some(found) => found,
        // Unknown wire ID: nothing to update.
        None => return,
    };

    if let Some(mut sub) = self.subscriptions.get_mut(&id) {
        sub.last_result_hash = result_hash;
        sub.last_result = Some(result);
    }
}
/// Replace the primary key column indices of the subscription with the given wire ID.
///
/// Intended to run after PK detection, so that selective column updates use
/// the detected key columns. Does nothing when the wire ID is unknown or its
/// subscription is no longer registered.
///
/// # Arguments
///
/// * `wire_id` - The wire protocol subscription ID (UUID bytes)
/// * `pk_columns` - The new primary key column indices
pub fn update_pk_columns_by_wire_id(&self, wire_id: &[u8; 16], pk_columns: Vec<usize>) {
    let id = match self.find_subscription_by_wire_id(wire_id) {
        Some(found) => found,
        // Unknown wire ID: nothing to update.
        None => return,
    };

    if let Some(mut sub) = self.subscriptions.get_mut(&id) {
        sub.pk_columns = pk_columns;
    }
}
/// Set PK columns and selective-update eligibility, addressed by wire ID.
///
/// Resolves `wire_id` to the internal subscription ID and then behaves
/// exactly like [`Self::update_pk_columns_with_eligibility`].
///
/// # Arguments
///
/// * `wire_id` - The wire protocol subscription ID (UUID bytes)
/// * `pk_columns` - The primary key column indices to store
/// * `confident` - Whether PK detection was confident
///
/// # Returns
///
/// `true` if the subscription is newly marked as selective-eligible;
/// `false` otherwise, including when the wire ID or subscription is unknown.
pub fn update_pk_columns_with_eligibility_by_wire_id(
    &self,
    wire_id: &[u8; 16],
    pk_columns: Vec<usize>,
    confident: bool,
) -> bool {
    if let Some(id) = self.find_subscription_by_wire_id(wire_id) {
        self.update_pk_columns_with_eligibility(id, pk_columns, confident)
    } else {
        false
    }
}
/// Set PK columns and selective-update eligibility, addressed by internal ID.
///
/// Forwards to `set_pk_columns_with_eligibility` on the subscription. This
/// is the variant for HTTP SSE subscriptions, which have no wire ID.
///
/// # Arguments
///
/// * `id` - The internal subscription ID
/// * `pk_columns` - The primary key column indices to store
/// * `confident` - Whether PK detection was confident
///
/// # Returns
///
/// `true` if the subscription is newly marked as selective-eligible;
/// `false` otherwise, including when no subscription is registered under `id`.
pub fn update_pk_columns_with_eligibility(
    &self,
    id: SubscriptionId,
    pk_columns: Vec<usize>,
    confident: bool,
) -> bool {
    match self.subscriptions.get_mut(&id) {
        Some(mut sub) => sub.set_pk_columns_with_eligibility(pk_columns, confident),
        None => false,
    }
}
/// Install a per-subscription selective updates override, addressed by internal ID.
///
/// Passes `config` to `set_selective_updates_override` on the subscription,
/// so it can take precedence over the server-level configuration. Does
/// nothing when no subscription is registered under `id`.
///
/// # Arguments
///
/// * `id` - The internal subscription ID
/// * `config` - The selective updates configuration to install as the override
pub fn update_selective_updates(&self, id: SubscriptionId, config: SelectiveColumnConfig) {
    let mut sub = match self.subscriptions.get_mut(&id) {
        Some(found) => found,
        // Unknown subscription: nothing to configure.
        None => return,
    };
    sub.set_selective_updates_override(config);
}
/// Fetch the primary key column indices of the subscription with the given wire ID.
///
/// # Arguments
///
/// * `wire_id` - The wire protocol subscription ID (UUID bytes)
///
/// # Returns
///
/// A clone of the subscription's `pk_columns`, or the default `[0]` when the
/// wire ID is unknown or its subscription is no longer registered.
pub fn get_pk_columns_by_wire_id(&self, wire_id: &[u8; 16]) -> Vec<usize> {
    self.find_subscription_by_wire_id(wire_id)
        .and_then(|id| self.subscriptions.get(&id).map(|sub| sub.pk_columns.clone()))
        // Fall back to treating the first column as the key.
        .unwrap_or_else(|| vec![0])
}
/// Install a per-subscription selective updates override, addressed by wire ID.
///
/// Lets a wire protocol client override the server-level selective update
/// settings for one subscription. Resolves `wire_id` to the internal ID and
/// then behaves exactly like [`Self::update_selective_updates`]. Does nothing
/// when the wire ID or its subscription is unknown.
///
/// # Arguments
///
/// * `wire_id` - The wire protocol subscription ID (UUID bytes)
/// * `config` - The selective updates configuration to install as the override
pub fn set_selective_updates_override_by_wire_id(
    &self,
    wire_id: &[u8; 16],
    config: SelectiveColumnConfig,
) {
    if let Some(id) = self.find_subscription_by_wire_id(wire_id) {
        self.update_selective_updates(id, config);
    }
}
/// Compute the effective selective column config for the subscription with the given wire ID.
///
/// # Arguments
///
/// * `wire_id` - The wire protocol subscription ID (UUID bytes)
/// * `server_config` - The server-level configuration to fall back on
///
/// # Returns
///
/// The result of the subscription's `get_effective_selective_config(server_config)`
/// when the subscription is found. Otherwise `server_config` with its PK
/// columns replaced by the default `[0]`.
pub fn get_effective_selective_config_by_wire_id(
    &self,
    wire_id: &[u8; 16],
    server_config: &SelectiveColumnConfig,
) -> SelectiveColumnConfig {
    let from_subscription = self.find_subscription_by_wire_id(wire_id).and_then(|id| {
        self.subscriptions
            .get(&id)
            .map(|sub| sub.get_effective_selective_config(server_config))
    });

    match from_subscription {
        Some(config) => config,
        // Subscription not found: use the server config with default PK columns.
        None => server_config.with_pk_columns(vec![0]),
    }
}
}