vibesql_server/subscription/delta.rs
1//! Delta computation for subscription updates
2//!
3//! This module provides types and functions for computing the differences
4//! between result sets, enabling efficient incremental updates to subscribers.
5
6use super::hash::hash_row;
7use super::types::{SubscriptionId, SubscriptionUpdate};
8
9// ============================================================================
10// Partial Row Delta
11// ============================================================================
12
/// A partial row update containing only changed columns plus primary key columns
///
/// Used for efficient updates when only a subset of columns have changed.
/// The `column_indices` field indicates which columns are present in the
/// value vectors.
///
/// The three vectors are parallel: when the delta is built by
/// [`PartialRowDelta::from_rows`], `old_values[i]` and `new_values[i]` are the
/// previous and current values of the column whose index is
/// `column_indices[i]`, and `column_indices` is in ascending order.
#[derive(Debug, Clone)]
pub struct PartialRowDelta {
    /// Indices of columns that are included in this partial update
    /// (primary key columns + changed columns, sorted)
    pub column_indices: Vec<usize>,
    /// Old values for the included columns, in `column_indices` order
    pub old_values: Vec<vibesql_types::SqlValue>,
    /// New values for the included columns, in `column_indices` order
    pub new_values: Vec<vibesql_types::SqlValue>,
}
27
28impl PartialRowDelta {
29 /// Create a new partial row delta from old and new rows
30 ///
31 /// # Arguments
32 /// * `old_row` - The previous row values
33 /// * `new_row` - The current row values
34 /// * `pk_columns` - Primary key column indices (always included)
35 ///
36 /// # Returns
37 /// * `Some(PartialRowDelta)` if the rows differ
38 /// * `None` if the rows are identical
39 pub fn from_rows(
40 old_row: &crate::Row,
41 new_row: &crate::Row,
42 pk_columns: &[usize],
43 ) -> Option<Self> {
44 if old_row.values.len() != new_row.values.len() {
45 return None;
46 }
47
48 // Find changed columns
49 let mut changed_columns = Vec::new();
50 for (idx, (old_val, new_val)) in
51 old_row.values.iter().zip(new_row.values.iter()).enumerate()
52 {
53 if old_val != new_val {
54 changed_columns.push(idx);
55 }
56 }
57
58 // If no columns changed, return None
59 if changed_columns.is_empty() {
60 return None;
61 }
62
63 // Build included columns: PK columns + changed columns, sorted
64 let mut column_indices: Vec<usize> = pk_columns.to_vec();
65 for &idx in &changed_columns {
66 if !column_indices.contains(&idx) {
67 column_indices.push(idx);
68 }
69 }
70 column_indices.sort_unstable();
71
72 // Extract values for included columns
73 let old_values: Vec<vibesql_types::SqlValue> =
74 column_indices.iter().map(|&idx| old_row.values[idx].clone()).collect();
75 let new_values: Vec<vibesql_types::SqlValue> =
76 column_indices.iter().map(|&idx| new_row.values[idx].clone()).collect();
77
78 Some(Self { column_indices, old_values, new_values })
79 }
80}
81
82// ============================================================================
83// Delta Computation
84// ============================================================================
85
86/// Compute delta between old and new result sets
87///
88/// This function compares two result sets and produces a delta update
89/// containing the inserts, updates, and deletes needed to transform
90/// the old result into the new result.
91///
92/// # Algorithm
93///
94/// Uses row hashing to efficiently detect changes:
95/// - Rows in new but not in old are inserts
96/// - Rows in old but not in new are deletes
97/// - Updates are not detected in this implementation (would appear as delete + insert)
98///
99/// For proper update detection, use `compute_delta_with_pk()` with primary key information.
100///
101/// # Returns
102///
103/// Returns `Some(SubscriptionUpdate::Delta)` if there are changes,
104/// or `None` if the result sets are identical.
105pub fn compute_delta(
106 subscription_id: SubscriptionId,
107 old: &[crate::Row],
108 new: &[crate::Row],
109) -> Option<SubscriptionUpdate> {
110 // Delegate to PK-based implementation with empty pk_columns for backward compatibility
111 compute_delta_with_pk(subscription_id, old, new, &[])
112}
113
114/// Compute delta between old and new result sets using primary key columns
115///
116/// This function compares two result sets and produces a delta update
117/// containing the inserts, updates, and deletes needed to transform
118/// the old result into the new result.
119///
120/// # Algorithm
121///
122/// When `pk_columns` is provided and non-empty:
123/// - Builds a lookup map of old rows indexed by their PK values
124/// - For each new row, looks up by PK to determine if it's an INSERT or UPDATE
125/// - Rows in old but not in new (by PK) are DELETEs
126/// - Rows with same PK but different content are UPDATEs
127///
128/// When `pk_columns` is empty, falls back to hash-based matching:
129/// - Rows in new but not in old are inserts
130/// - Rows in old but not in new are deletes
131/// - Updates appear as delete + insert pairs
132///
133/// # Arguments
134///
135/// * `subscription_id` - The subscription ID for the delta update
136/// * `old` - Previous result set rows
137/// * `new` - Current result set rows
138/// * `pk_columns` - Indices of primary key columns in the result set
139///
140/// # Returns
141///
142/// Returns `Some(SubscriptionUpdate::Delta)` if there are changes,
143/// or `None` if the result sets are identical.
144pub fn compute_delta_with_pk(
145 subscription_id: SubscriptionId,
146 old: &[crate::Row],
147 new: &[crate::Row],
148 pk_columns: &[usize],
149) -> Option<SubscriptionUpdate> {
150 use std::collections::HashMap;
151
152 // If no PK columns provided, use hash-based matching
153 if pk_columns.is_empty() {
154 return compute_delta_hash_based(subscription_id, old, new);
155 }
156
157 // Validate PK columns are within bounds for both old and new rows
158 let valid_pk = old.iter().chain(new.iter()).all(|row| {
159 pk_columns.iter().all(|&idx| idx < row.values.len())
160 });
161
162 if !valid_pk {
163 // Fall back to hash-based if PK columns are out of bounds
164 return compute_delta_hash_based(subscription_id, old, new);
165 }
166
167 // Build a lookup map of old rows indexed by PK values
168 // Key: PK values as a vector, Value: list of (index, row) for handling duplicates
169 let mut old_by_pk: HashMap<Vec<&vibesql_types::SqlValue>, Vec<(usize, &crate::Row)>> =
170 HashMap::new();
171 for (idx, row) in old.iter().enumerate() {
172 let pk_values: Vec<&vibesql_types::SqlValue> =
173 pk_columns.iter().map(|&i| &row.values[i]).collect();
174 old_by_pk.entry(pk_values).or_default().push((idx, row));
175 }
176
177 let mut inserts = Vec::new();
178 let mut updates: Vec<(crate::Row, crate::Row)> = Vec::new();
179 let mut matched_old_indices = std::collections::HashSet::new();
180
181 // Process each new row
182 for new_row in new {
183 let pk_values: Vec<&vibesql_types::SqlValue> =
184 pk_columns.iter().map(|&i| &new_row.values[i]).collect();
185
186 if let Some(old_rows) = old_by_pk.get_mut(&pk_values) {
187 // Found matching PK in old - check if it's an update or unchanged
188 if let Some((old_idx, old_row)) = old_rows.pop() {
189 matched_old_indices.insert(old_idx);
190
191 // Compare full row content to detect changes
192 if old_row.values != new_row.values {
193 // Content differs - this is an UPDATE
194 updates.push((old_row.clone(), new_row.clone()));
195 }
196 // If content is identical, row is unchanged - no action needed
197 } else {
198 // No more old rows with this PK - treat as insert
199 // (handles case where new has more duplicates than old)
200 inserts.push(new_row.clone());
201 }
202 } else {
203 // No matching PK in old - this is an INSERT
204 inserts.push(new_row.clone());
205 }
206 }
207
208 // Find deletes: old rows that weren't matched
209 let deletes: Vec<crate::Row> = old
210 .iter()
211 .enumerate()
212 .filter(|(idx, _)| !matched_old_indices.contains(idx))
213 .map(|(_, row)| row.clone())
214 .collect();
215
216 // If no changes, return None
217 if inserts.is_empty() && updates.is_empty() && deletes.is_empty() {
218 return None;
219 }
220
221 Some(SubscriptionUpdate::Delta { subscription_id, inserts, updates, deletes })
222}
223
224/// Hash-based delta computation (original algorithm)
225///
226/// This is the fallback when PK columns are not available.
227fn compute_delta_hash_based(
228 subscription_id: SubscriptionId,
229 old: &[crate::Row],
230 new: &[crate::Row],
231) -> Option<SubscriptionUpdate> {
232 use std::collections::HashMap;
233
234 // Build hash maps for efficient lookup
235 // Map from row hash -> (count, row reference)
236 // We use count to handle duplicate rows correctly
237 let mut old_map: HashMap<u64, Vec<&crate::Row>> = HashMap::new();
238 for row in old {
239 let hash = hash_row(row);
240 old_map.entry(hash).or_default().push(row);
241 }
242
243 let mut new_map: HashMap<u64, Vec<&crate::Row>> = HashMap::new();
244 for row in new {
245 let hash = hash_row(row);
246 new_map.entry(hash).or_default().push(row);
247 }
248
249 let mut inserts = Vec::new();
250 let mut deletes = Vec::new();
251
252 // Find inserts: rows in new but not in old (or with higher count in new)
253 for (hash, new_rows) in &new_map {
254 let old_rows = old_map.get(hash).map(|v| v.as_slice()).unwrap_or(&[]);
255
256 // For each row in new that exceeds the count in old, it's an insert
257 if new_rows.len() > old_rows.len() {
258 for row in new_rows.iter().skip(old_rows.len()) {
259 inserts.push((*row).clone());
260 }
261 }
262 }
263
264 // Find deletes: rows in old but not in new (or with higher count in old)
265 for (hash, old_rows) in &old_map {
266 let new_rows = new_map.get(hash).map(|v| v.as_slice()).unwrap_or(&[]);
267
268 // For each row in old that exceeds the count in new, it's a delete
269 if old_rows.len() > new_rows.len() {
270 for row in old_rows.iter().skip(new_rows.len()) {
271 deletes.push((*row).clone());
272 }
273 }
274 }
275
276 // If no changes, return None
277 if inserts.is_empty() && deletes.is_empty() {
278 return None;
279 }
280
281 // Updates are not detected in hash-based mode
282 // A row update would appear as a delete of the old row + insert of the new row
283 let updates = Vec::new();
284
285 Some(SubscriptionUpdate::Delta { subscription_id, inserts, updates, deletes })
286}