vibesql_server/subscription/
selective.rs1use serde::{Deserialize, Serialize};
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct SelectiveColumnConfig {
20 #[serde(default = "default_selective_enabled")]
22 pub enabled: bool,
23 #[serde(skip)]
26 pub pk_columns: Vec<usize>,
27 #[serde(default = "default_min_changed_columns")]
30 pub min_changed_columns: usize,
31 #[serde(default = "default_max_changed_columns_ratio")]
34 pub max_changed_columns_ratio: f64,
35}
36
/// Serde default for `SelectiveColumnConfig::enabled`: selective updates are
/// on unless the config explicitly turns them off.
fn default_selective_enabled() -> bool {
    true
}
40
/// Serde default for `SelectiveColumnConfig::min_changed_columns`: a single
/// changed column is enough.
fn default_min_changed_columns() -> usize {
    1
}
44
/// Serde default for `SelectiveColumnConfig::max_changed_columns_ratio`:
/// at most half of the row's columns may have changed.
fn default_max_changed_columns_ratio() -> f64 {
    0.5
}
48
49impl Default for SelectiveColumnConfig {
50 fn default() -> Self {
51 Self {
52 enabled: default_selective_enabled(),
53 pk_columns: vec![0], min_changed_columns: default_min_changed_columns(),
55 max_changed_columns_ratio: default_max_changed_columns_ratio(),
56 }
57 }
58}
59
60impl SelectiveColumnConfig {
61 pub fn with_pk_columns(&self, pk_columns: Vec<usize>) -> Self {
65 Self {
66 enabled: self.enabled,
67 pk_columns,
68 min_changed_columns: self.min_changed_columns,
69 max_changed_columns_ratio: self.max_changed_columns_ratio,
70 }
71 }
72}
73
/// Result of comparing two versions of a row column by column.
#[derive(Clone, Debug)]
pub struct ColumnDiff {
    /// Indices of the columns whose values differ between the old and new row.
    pub changed_columns: Vec<usize>,

    /// Indices to transmit: the caller's key columns plus every changed
    /// column.
    pub included_columns: Vec<usize>,
}
86
87pub fn compute_column_diff(
98 old_row: &crate::Row,
99 new_row: &crate::Row,
100 pk_columns: &[usize],
101) -> Option<ColumnDiff> {
102 if old_row.values.len() != new_row.values.len() {
104 return None;
105 }
106
107 let mut changed_columns = Vec::new();
108
109 for (idx, (old_val, new_val)) in old_row.values.iter().zip(new_row.values.iter()).enumerate() {
111 if old_val != new_val {
112 changed_columns.push(idx);
113 }
114 }
115
116 if changed_columns.is_empty() {
118 return None;
119 }
120
121 let mut included_columns: Vec<usize> = pk_columns.to_vec();
123 for &idx in &changed_columns {
124 if !included_columns.contains(&idx) {
125 included_columns.push(idx);
126 }
127 }
128 included_columns.sort_unstable();
129
130 Some(ColumnDiff { changed_columns, included_columns })
131}
132
133pub fn should_use_selective_update(
144 diff: &ColumnDiff,
145 total_columns: usize,
146 config: &SelectiveColumnConfig,
147) -> bool {
148 if !config.enabled {
149 return false;
150 }
151
152 if diff.changed_columns.len() < config.min_changed_columns {
154 return false;
155 }
156
157 let changed_ratio = diff.changed_columns.len() as f64 / total_columns as f64;
159 if changed_ratio > config.max_changed_columns_ratio {
160 return false;
161 }
162
163 true
164}
165
166pub fn should_use_selective_update_with_metrics(
168 diff: &ColumnDiff,
169 total_columns: usize,
170 config: &SelectiveColumnConfig,
171 metrics: Option<&crate::observability::metrics::ServerMetrics>,
172) -> bool {
173 if !config.enabled {
174 if let Some(m) = metrics {
175 m.record_partial_update_fallback("disabled");
176 }
177 return false;
178 }
179
180 if diff.changed_columns.len() < config.min_changed_columns {
182 return false;
183 }
184
185 let changed_ratio = diff.changed_columns.len() as f64 / total_columns as f64;
187 if changed_ratio > config.max_changed_columns_ratio {
188 if let Some(m) = metrics {
189 m.record_partial_update_fallback("threshold_exceeded");
190 }
191 return false;
192 }
193
194 true
195}
196
197pub fn create_partial_row_update(
213 old_row: &[Option<Vec<u8>>],
214 new_row: &[Option<Vec<u8>>],
215 pk_columns: &[usize],
216 config: &SelectiveColumnConfig,
217) -> Option<crate::protocol::messages::PartialRowUpdate> {
218 if old_row.len() != new_row.len() {
220 return None;
221 }
222
223 let total_columns = new_row.len();
224 let mut changed_columns = Vec::new();
225
226 for (idx, (old_val, new_val)) in old_row.iter().zip(new_row.iter()).enumerate() {
228 if old_val != new_val {
229 changed_columns.push(idx);
230 }
231 }
232
233 if changed_columns.is_empty() {
235 return None;
236 }
237
238 let changed_ratio = changed_columns.len() as f64 / total_columns as f64;
240 if !config.enabled || changed_ratio > config.max_changed_columns_ratio {
241 return None;
242 }
243
244 let mut included_columns: Vec<usize> = pk_columns.to_vec();
246 for &idx in &changed_columns {
247 if !included_columns.contains(&idx) {
248 included_columns.push(idx);
249 }
250 }
251 included_columns.sort_unstable();
252
253 let values: Vec<Option<Vec<u8>>> =
255 included_columns.iter().map(|&idx| new_row[idx].clone()).collect();
256
257 let present_columns: Vec<u16> = included_columns.iter().map(|&idx| idx as u16).collect();
259
260 Some(crate::protocol::messages::PartialRowUpdate::new(
261 total_columns as u16,
262 &present_columns,
263 values,
264 ))
265}
266
267pub fn create_partial_row_update_with_metrics(
280 old_row: &[Option<Vec<u8>>],
281 new_row: &[Option<Vec<u8>>],
282 pk_columns: &[usize],
283 config: &SelectiveColumnConfig,
284 metrics: Option<&crate::observability::metrics::ServerMetrics>,
285) -> Option<crate::protocol::messages::PartialRowUpdate> {
286 if old_row.len() != new_row.len() {
288 if let Some(m) = metrics {
289 m.record_partial_update_fallback("row_count_mismatch");
290 }
291 return None;
292 }
293
294 let total_columns = new_row.len();
295 let mut changed_columns = Vec::new();
296
297 for (idx, (old_val, new_val)) in old_row.iter().zip(new_row.iter()).enumerate() {
299 if old_val != new_val {
300 changed_columns.push(idx);
301 }
302 }
303
304 if changed_columns.is_empty() {
306 if let Some(m) = metrics {
307 m.record_partial_update_fallback("no_changes");
308 }
309 return None;
310 }
311
312 let changed_ratio = changed_columns.len() as f64 / total_columns as f64;
314 if !config.enabled || changed_ratio > config.max_changed_columns_ratio {
315 if let Some(m) = metrics {
316 if !config.enabled {
317 m.record_partial_update_fallback("disabled");
318 } else {
319 m.record_partial_update_fallback("threshold_exceeded");
320 }
321 }
322 return None;
323 }
324
325 let mut included_columns: Vec<usize> = pk_columns.to_vec();
327 for &idx in &changed_columns {
328 if !included_columns.contains(&idx) {
329 included_columns.push(idx);
330 }
331 }
332 included_columns.sort_unstable();
333
334 let values: Vec<Option<Vec<u8>>> =
336 included_columns.iter().map(|&idx| new_row[idx].clone()).collect();
337
338 let present_columns: Vec<u16> = included_columns.iter().map(|&idx| idx as u16).collect();
340
341 Some(crate::protocol::messages::PartialRowUpdate::new(
342 total_columns as u16,
343 &present_columns,
344 values,
345 ))
346}