Skip to main content

vibesql_server/subscription/
delta.rs

1//! Delta computation for subscription updates
2//!
3//! This module provides types and functions for computing the differences
4//! between result sets, enabling efficient incremental updates to subscribers.
5
6use super::hash::hash_row;
7use super::types::{SubscriptionId, SubscriptionUpdate};
8
9// ============================================================================
10// Partial Row Delta
11// ============================================================================
12
/// A partial row update containing only changed columns plus primary key columns
///
/// Used for efficient updates when only a subset of columns have changed.
/// The `column_indices` field indicates which columns of the full row are
/// present in `old_values` and `new_values`; the three vectors are parallel,
/// so entry `i` of each value vector belongs to column `column_indices[i]`.
#[derive(Debug, Clone)]
pub struct PartialRowDelta {
    /// Indices of columns that are included in this partial update
    /// (primary key columns + changed columns, sorted ascending)
    pub column_indices: Vec<usize>,
    /// Old values for the included columns, in `column_indices` order
    pub old_values: Vec<vibesql_types::SqlValue>,
    /// New values for the included columns, in `column_indices` order
    pub new_values: Vec<vibesql_types::SqlValue>,
}
27
28impl PartialRowDelta {
29    /// Create a new partial row delta from old and new rows
30    ///
31    /// # Arguments
32    /// * `old_row` - The previous row values
33    /// * `new_row` - The current row values
34    /// * `pk_columns` - Primary key column indices (always included)
35    ///
36    /// # Returns
37    /// * `Some(PartialRowDelta)` if the rows differ
38    /// * `None` if the rows are identical
39    pub fn from_rows(
40        old_row: &crate::Row,
41        new_row: &crate::Row,
42        pk_columns: &[usize],
43    ) -> Option<Self> {
44        if old_row.values.len() != new_row.values.len() {
45            return None;
46        }
47
48        // Find changed columns
49        let mut changed_columns = Vec::new();
50        for (idx, (old_val, new_val)) in
51            old_row.values.iter().zip(new_row.values.iter()).enumerate()
52        {
53            if old_val != new_val {
54                changed_columns.push(idx);
55            }
56        }
57
58        // If no columns changed, return None
59        if changed_columns.is_empty() {
60            return None;
61        }
62
63        // Build included columns: PK columns + changed columns, sorted
64        let mut column_indices: Vec<usize> = pk_columns.to_vec();
65        for &idx in &changed_columns {
66            if !column_indices.contains(&idx) {
67                column_indices.push(idx);
68            }
69        }
70        column_indices.sort_unstable();
71
72        // Extract values for included columns
73        let old_values: Vec<vibesql_types::SqlValue> =
74            column_indices.iter().map(|&idx| old_row.values[idx].clone()).collect();
75        let new_values: Vec<vibesql_types::SqlValue> =
76            column_indices.iter().map(|&idx| new_row.values[idx].clone()).collect();
77
78        Some(Self { column_indices, old_values, new_values })
79    }
80}
81
82// ============================================================================
83// Delta Computation
84// ============================================================================
85
/// Compute delta between old and new result sets
///
/// This function compares two result sets and produces a delta update
/// containing the inserts and deletes needed to transform the old result
/// into the new result. It is a convenience wrapper that calls
/// `compute_delta_with_pk()` with no primary key columns.
///
/// # Algorithm
///
/// Uses row hashing to efficiently detect changes:
/// - Rows in new but not in old are inserts
/// - Rows in old but not in new are deletes
/// - Duplicate rows are handled by count: only the surplus occurrences on one side are reported
/// - Updates are not detected in this implementation (would appear as delete + insert)
///
/// For proper update detection, use `compute_delta_with_pk()` with primary key information.
///
/// # Arguments
///
/// * `subscription_id` - The subscription ID for the delta update
/// * `old` - Previous result set rows
/// * `new` - Current result set rows
///
/// # Returns
///
/// Returns `Some(SubscriptionUpdate::Delta)` if there are changes,
/// or `None` if the result sets are identical.
pub fn compute_delta(
    subscription_id: SubscriptionId,
    old: &[crate::Row],
    new: &[crate::Row],
) -> Option<SubscriptionUpdate> {
    // Delegate to PK-based implementation with empty pk_columns for backward compatibility;
    // an empty slice makes it take the hash-based path.
    compute_delta_with_pk(subscription_id, old, new, &[])
}
113
114/// Compute delta between old and new result sets using primary key columns
115///
116/// This function compares two result sets and produces a delta update
117/// containing the inserts, updates, and deletes needed to transform
118/// the old result into the new result.
119///
120/// # Algorithm
121///
122/// When `pk_columns` is provided and non-empty:
123/// - Builds a lookup map of old rows indexed by their PK values
124/// - For each new row, looks up by PK to determine if it's an INSERT or UPDATE
125/// - Rows in old but not in new (by PK) are DELETEs
126/// - Rows with same PK but different content are UPDATEs
127///
128/// When `pk_columns` is empty, falls back to hash-based matching:
129/// - Rows in new but not in old are inserts
130/// - Rows in old but not in new are deletes
131/// - Updates appear as delete + insert pairs
132///
133/// # Arguments
134///
135/// * `subscription_id` - The subscription ID for the delta update
136/// * `old` - Previous result set rows
137/// * `new` - Current result set rows
138/// * `pk_columns` - Indices of primary key columns in the result set
139///
140/// # Returns
141///
142/// Returns `Some(SubscriptionUpdate::Delta)` if there are changes,
143/// or `None` if the result sets are identical.
144pub fn compute_delta_with_pk(
145    subscription_id: SubscriptionId,
146    old: &[crate::Row],
147    new: &[crate::Row],
148    pk_columns: &[usize],
149) -> Option<SubscriptionUpdate> {
150    use std::collections::HashMap;
151
152    // If no PK columns provided, use hash-based matching
153    if pk_columns.is_empty() {
154        return compute_delta_hash_based(subscription_id, old, new);
155    }
156
157    // Validate PK columns are within bounds for both old and new rows
158    let valid_pk = old.iter().chain(new.iter()).all(|row| {
159        pk_columns.iter().all(|&idx| idx < row.values.len())
160    });
161
162    if !valid_pk {
163        // Fall back to hash-based if PK columns are out of bounds
164        return compute_delta_hash_based(subscription_id, old, new);
165    }
166
167    // Build a lookup map of old rows indexed by PK values
168    // Key: PK values as a vector, Value: list of (index, row) for handling duplicates
169    let mut old_by_pk: HashMap<Vec<&vibesql_types::SqlValue>, Vec<(usize, &crate::Row)>> =
170        HashMap::new();
171    for (idx, row) in old.iter().enumerate() {
172        let pk_values: Vec<&vibesql_types::SqlValue> =
173            pk_columns.iter().map(|&i| &row.values[i]).collect();
174        old_by_pk.entry(pk_values).or_default().push((idx, row));
175    }
176
177    let mut inserts = Vec::new();
178    let mut updates: Vec<(crate::Row, crate::Row)> = Vec::new();
179    let mut matched_old_indices = std::collections::HashSet::new();
180
181    // Process each new row
182    for new_row in new {
183        let pk_values: Vec<&vibesql_types::SqlValue> =
184            pk_columns.iter().map(|&i| &new_row.values[i]).collect();
185
186        if let Some(old_rows) = old_by_pk.get_mut(&pk_values) {
187            // Found matching PK in old - check if it's an update or unchanged
188            if let Some((old_idx, old_row)) = old_rows.pop() {
189                matched_old_indices.insert(old_idx);
190
191                // Compare full row content to detect changes
192                if old_row.values != new_row.values {
193                    // Content differs - this is an UPDATE
194                    updates.push((old_row.clone(), new_row.clone()));
195                }
196                // If content is identical, row is unchanged - no action needed
197            } else {
198                // No more old rows with this PK - treat as insert
199                // (handles case where new has more duplicates than old)
200                inserts.push(new_row.clone());
201            }
202        } else {
203            // No matching PK in old - this is an INSERT
204            inserts.push(new_row.clone());
205        }
206    }
207
208    // Find deletes: old rows that weren't matched
209    let deletes: Vec<crate::Row> = old
210        .iter()
211        .enumerate()
212        .filter(|(idx, _)| !matched_old_indices.contains(idx))
213        .map(|(_, row)| row.clone())
214        .collect();
215
216    // If no changes, return None
217    if inserts.is_empty() && updates.is_empty() && deletes.is_empty() {
218        return None;
219    }
220
221    Some(SubscriptionUpdate::Delta { subscription_id, inserts, updates, deletes })
222}
223
224/// Hash-based delta computation (original algorithm)
225///
226/// This is the fallback when PK columns are not available.
227fn compute_delta_hash_based(
228    subscription_id: SubscriptionId,
229    old: &[crate::Row],
230    new: &[crate::Row],
231) -> Option<SubscriptionUpdate> {
232    use std::collections::HashMap;
233
234    // Build hash maps for efficient lookup
235    // Map from row hash -> (count, row reference)
236    // We use count to handle duplicate rows correctly
237    let mut old_map: HashMap<u64, Vec<&crate::Row>> = HashMap::new();
238    for row in old {
239        let hash = hash_row(row);
240        old_map.entry(hash).or_default().push(row);
241    }
242
243    let mut new_map: HashMap<u64, Vec<&crate::Row>> = HashMap::new();
244    for row in new {
245        let hash = hash_row(row);
246        new_map.entry(hash).or_default().push(row);
247    }
248
249    let mut inserts = Vec::new();
250    let mut deletes = Vec::new();
251
252    // Find inserts: rows in new but not in old (or with higher count in new)
253    for (hash, new_rows) in &new_map {
254        let old_rows = old_map.get(hash).map(|v| v.as_slice()).unwrap_or(&[]);
255
256        // For each row in new that exceeds the count in old, it's an insert
257        if new_rows.len() > old_rows.len() {
258            for row in new_rows.iter().skip(old_rows.len()) {
259                inserts.push((*row).clone());
260            }
261        }
262    }
263
264    // Find deletes: rows in old but not in new (or with higher count in old)
265    for (hash, old_rows) in &old_map {
266        let new_rows = new_map.get(hash).map(|v| v.as_slice()).unwrap_or(&[]);
267
268        // For each row in old that exceeds the count in new, it's a delete
269        if old_rows.len() > new_rows.len() {
270            for row in old_rows.iter().skip(new_rows.len()) {
271                deletes.push((*row).clone());
272            }
273        }
274    }
275
276    // If no changes, return None
277    if inserts.is_empty() && deletes.is_empty() {
278        return None;
279    }
280
281    // Updates are not detected in hash-based mode
282    // A row update would appear as a delete of the old row + insert of the new row
283    let updates = Vec::new();
284
285    Some(SubscriptionUpdate::Delta { subscription_id, inserts, updates, deletes })
286}