Skip to main content

vibesql_server/subscription/
selective.rs

1//! Selective column updates for efficient subscription notifications
2//!
3//! This module provides types and functions for computing partial row updates
4//! that only include changed columns plus primary key columns, reducing bandwidth
5//! for wide tables with few column changes.
6
7use serde::{Deserialize, Serialize};
8
9// ============================================================================
10// Configuration
11// ============================================================================
12
/// Configuration for selective column updates
///
/// This config controls when selective column updates (0xF7 messages) are used
/// instead of full row updates. Selective updates only send changed columns
/// plus primary key columns, reducing bandwidth for wide tables with few changes.
///
/// When deserialized, every field except `pk_columns` that is missing from the
/// input is filled in by the matching `default_*` function of this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SelectiveColumnConfig {
    /// Enable selective column updates (defaults to `true` when absent)
    #[serde(default = "default_selective_enabled")]
    pub enabled: bool,
    /// Column indices that are primary key columns (always included)
    /// This is per-subscription and not configurable via config file; use
    /// `with_pk_columns` to derive a per-subscription copy.
    ///
    /// NOTE(review): because of `#[serde(skip)]`, a deserialized config gets an
    /// empty vector here, while `SelectiveColumnConfig::default()` uses `vec![0]`.
    /// Confirm every caller overrides this before use.
    #[serde(skip)]
    pub pk_columns: Vec<usize>,
    /// Minimum columns that must change to use selective update
    /// If fewer columns change, send full row instead (defaults to 1 when absent)
    #[serde(default = "default_min_changed_columns")]
    pub min_changed_columns: usize,
    /// Maximum ratio of changed columns before falling back to full row
    /// E.g., 0.5 means if >50% of columns changed, send full row instead.
    /// A ratio exactly equal to this value still allows a selective update
    /// (defaults to 0.5 when absent).
    #[serde(default = "default_max_changed_columns_ratio")]
    pub max_changed_columns_ratio: f64,
}
36
/// Selective column updates are switched on unless the config says otherwise.
const DEFAULT_SELECTIVE_ENABLED: bool = true;
/// A single changed column is enough to qualify for a selective update.
const DEFAULT_MIN_CHANGED_COLUMNS: usize = 1;
/// Changed-column ratio above which a full row is sent instead.
const DEFAULT_MAX_CHANGED_COLUMNS_RATIO: f64 = 0.5;

/// Serde default for the `enabled` field of `SelectiveColumnConfig`.
fn default_selective_enabled() -> bool {
    DEFAULT_SELECTIVE_ENABLED
}

/// Serde default for the `min_changed_columns` field of `SelectiveColumnConfig`.
fn default_min_changed_columns() -> usize {
    DEFAULT_MIN_CHANGED_COLUMNS
}

/// Serde default for the `max_changed_columns_ratio` field of `SelectiveColumnConfig`.
fn default_max_changed_columns_ratio() -> f64 {
    DEFAULT_MAX_CHANGED_COLUMNS_RATIO
}
48
49impl Default for SelectiveColumnConfig {
50    fn default() -> Self {
51        Self {
52            enabled: default_selective_enabled(),
53            pk_columns: vec![0], // Assume first column is PK by default
54            min_changed_columns: default_min_changed_columns(),
55            max_changed_columns_ratio: default_max_changed_columns_ratio(),
56        }
57    }
58}
59
60impl SelectiveColumnConfig {
61    /// Create a copy of this config with the specified pk_columns
62    ///
63    /// Useful for creating subscription-specific configs from a server-level template.
64    pub fn with_pk_columns(&self, pk_columns: Vec<usize>) -> Self {
65        Self {
66            enabled: self.enabled,
67            pk_columns,
68            min_changed_columns: self.min_changed_columns,
69            max_changed_columns_ratio: self.max_changed_columns_ratio,
70        }
71    }
72}
73
74// ============================================================================
75// Column Diff
76// ============================================================================
77
/// Result of column-level diff computation
///
/// Produced by `compute_column_diff`. Both lists hold column positions within
/// the compared rows.
#[derive(Debug, Clone)]
pub struct ColumnDiff {
    /// Indices of columns that changed, in ascending order
    pub changed_columns: Vec<usize>,
    /// Indices of columns to include (PK + changed), sorted ascending
    pub included_columns: Vec<usize>,
}
86
87/// Compute which columns differ between two rows
88///
89/// # Arguments
90/// * `old_row` - The previous row values
91/// * `new_row` - The current row values
92/// * `pk_columns` - Indices of primary key columns (always included even if unchanged)
93///
94/// # Returns
95/// * `Some(ColumnDiff)` if rows have same column count and some columns differ
96/// * `None` if rows have different column counts or are identical
97pub fn compute_column_diff(
98    old_row: &crate::Row,
99    new_row: &crate::Row,
100    pk_columns: &[usize],
101) -> Option<ColumnDiff> {
102    // Rows must have same number of columns
103    if old_row.values.len() != new_row.values.len() {
104        return None;
105    }
106
107    let mut changed_columns = Vec::new();
108
109    // Compare each column
110    for (idx, (old_val, new_val)) in old_row.values.iter().zip(new_row.values.iter()).enumerate() {
111        if old_val != new_val {
112            changed_columns.push(idx);
113        }
114    }
115
116    // If no columns changed, return None
117    if changed_columns.is_empty() {
118        return None;
119    }
120
121    // Build included columns: PK columns + changed columns
122    let mut included_columns: Vec<usize> = pk_columns.to_vec();
123    for &idx in &changed_columns {
124        if !included_columns.contains(&idx) {
125            included_columns.push(idx);
126        }
127    }
128    included_columns.sort_unstable();
129
130    Some(ColumnDiff { changed_columns, included_columns })
131}
132
133// ============================================================================
134// Selective Update Decision Functions
135// ============================================================================
136
137/// Determine if selective update should be used based on configuration
138///
139/// Returns true if:
140/// - Selective updates are enabled
141/// - Number of changed columns meets minimum threshold
142/// - Changed column ratio doesn't exceed maximum
143pub fn should_use_selective_update(
144    diff: &ColumnDiff,
145    total_columns: usize,
146    config: &SelectiveColumnConfig,
147) -> bool {
148    if !config.enabled {
149        return false;
150    }
151
152    // Check minimum changed columns
153    if diff.changed_columns.len() < config.min_changed_columns {
154        return false;
155    }
156
157    // Check maximum ratio
158    let changed_ratio = diff.changed_columns.len() as f64 / total_columns as f64;
159    if changed_ratio > config.max_changed_columns_ratio {
160        return false;
161    }
162
163    true
164}
165
166/// Determine if selective update should be used, with metrics recording
167pub fn should_use_selective_update_with_metrics(
168    diff: &ColumnDiff,
169    total_columns: usize,
170    config: &SelectiveColumnConfig,
171    metrics: Option<&crate::observability::metrics::ServerMetrics>,
172) -> bool {
173    if !config.enabled {
174        if let Some(m) = metrics {
175            m.record_partial_update_fallback("disabled");
176        }
177        return false;
178    }
179
180    // Check minimum changed columns
181    if diff.changed_columns.len() < config.min_changed_columns {
182        return false;
183    }
184
185    // Check maximum ratio
186    let changed_ratio = diff.changed_columns.len() as f64 / total_columns as f64;
187    if changed_ratio > config.max_changed_columns_ratio {
188        if let Some(m) = metrics {
189            m.record_partial_update_fallback("threshold_exceeded");
190        }
191        return false;
192    }
193
194    true
195}
196
197// ============================================================================
198// Partial Row Update Creation
199// ============================================================================
200
201/// Create a partial row update from old and new rows
202///
203/// # Arguments
204/// * `old_row` - The previous row values (wire format)
205/// * `new_row` - The current row values (wire format)
206/// * `pk_columns` - Primary key column indices
207/// * `config` - Selective column configuration
208///
209/// # Returns
210/// * `Some(PartialRowUpdate)` if selective update should be used
211/// * `None` if full row should be sent instead
212pub fn create_partial_row_update(
213    old_row: &[Option<Vec<u8>>],
214    new_row: &[Option<Vec<u8>>],
215    pk_columns: &[usize],
216    config: &SelectiveColumnConfig,
217) -> Option<crate::protocol::messages::PartialRowUpdate> {
218    // Rows must have same number of columns
219    if old_row.len() != new_row.len() {
220        return None;
221    }
222
223    let total_columns = new_row.len();
224    let mut changed_columns = Vec::new();
225
226    // Compare each column
227    for (idx, (old_val, new_val)) in old_row.iter().zip(new_row.iter()).enumerate() {
228        if old_val != new_val {
229            changed_columns.push(idx);
230        }
231    }
232
233    // If no columns changed, return None
234    if changed_columns.is_empty() {
235        return None;
236    }
237
238    // Check if we should use selective update
239    let changed_ratio = changed_columns.len() as f64 / total_columns as f64;
240    if !config.enabled || changed_ratio > config.max_changed_columns_ratio {
241        return None;
242    }
243
244    // Build included columns: PK columns + changed columns, sorted
245    let mut included_columns: Vec<usize> = pk_columns.to_vec();
246    for &idx in &changed_columns {
247        if !included_columns.contains(&idx) {
248            included_columns.push(idx);
249        }
250    }
251    included_columns.sort_unstable();
252
253    // Extract values for included columns
254    let values: Vec<Option<Vec<u8>>> =
255        included_columns.iter().map(|&idx| new_row[idx].clone()).collect();
256
257    // Convert to u16 for protocol
258    let present_columns: Vec<u16> = included_columns.iter().map(|&idx| idx as u16).collect();
259
260    Some(crate::protocol::messages::PartialRowUpdate::new(
261        total_columns as u16,
262        &present_columns,
263        values,
264    ))
265}
266
267/// Create a partial row update from old and new rows with metrics recording
268///
269/// # Arguments
270/// * `old_row` - The previous row values (wire format)
271/// * `new_row` - The current row values (wire format)
272/// * `pk_columns` - Primary key column indices
273/// * `config` - Selective column configuration
274/// * `metrics` - Optional metrics for recording fallback reasons
275///
276/// # Returns
277/// * `Some(PartialRowUpdate)` if selective update should be used
278/// * `None` if full row should be sent instead
279pub fn create_partial_row_update_with_metrics(
280    old_row: &[Option<Vec<u8>>],
281    new_row: &[Option<Vec<u8>>],
282    pk_columns: &[usize],
283    config: &SelectiveColumnConfig,
284    metrics: Option<&crate::observability::metrics::ServerMetrics>,
285) -> Option<crate::protocol::messages::PartialRowUpdate> {
286    // Rows must have same number of columns
287    if old_row.len() != new_row.len() {
288        if let Some(m) = metrics {
289            m.record_partial_update_fallback("row_count_mismatch");
290        }
291        return None;
292    }
293
294    let total_columns = new_row.len();
295    let mut changed_columns = Vec::new();
296
297    // Compare each column
298    for (idx, (old_val, new_val)) in old_row.iter().zip(new_row.iter()).enumerate() {
299        if old_val != new_val {
300            changed_columns.push(idx);
301        }
302    }
303
304    // If no columns changed, return None
305    if changed_columns.is_empty() {
306        if let Some(m) = metrics {
307            m.record_partial_update_fallback("no_changes");
308        }
309        return None;
310    }
311
312    // Check if we should use selective update
313    let changed_ratio = changed_columns.len() as f64 / total_columns as f64;
314    if !config.enabled || changed_ratio > config.max_changed_columns_ratio {
315        if let Some(m) = metrics {
316            if !config.enabled {
317                m.record_partial_update_fallback("disabled");
318            } else {
319                m.record_partial_update_fallback("threshold_exceeded");
320            }
321        }
322        return None;
323    }
324
325    // Build included columns: PK columns + changed columns, sorted
326    let mut included_columns: Vec<usize> = pk_columns.to_vec();
327    for &idx in &changed_columns {
328        if !included_columns.contains(&idx) {
329            included_columns.push(idx);
330        }
331    }
332    included_columns.sort_unstable();
333
334    // Extract values for included columns
335    let values: Vec<Option<Vec<u8>>> =
336        included_columns.iter().map(|&idx| new_row[idx].clone()).collect();
337
338    // Convert to u16 for protocol
339    let present_columns: Vec<u16> = included_columns.iter().map(|&idx| idx as u16).collect();
340
341    Some(crate::protocol::messages::PartialRowUpdate::new(
342        total_columns as u16,
343        &present_columns,
344        values,
345    ))
346}