use super::hash::hash_row;
use super::types::{SubscriptionId, SubscriptionUpdate};
/// A column-level description of how a single row changed.
///
/// Instead of carrying the full old and new rows, a delta carries only a
/// subset of columns. The three vectors are parallel: entry `i` of
/// `old_values` and `new_values` holds the value of column
/// `column_indices[i]` before and after the change.
#[derive(Debug, Clone)]
pub struct PartialRowDelta {
/// Indices of the columns included in this delta. When built by
/// `PartialRowDelta::from_rows` these are the key columns plus every
/// column whose value changed, in ascending order.
pub column_indices: Vec<usize>,
/// Values of the listed columns taken from the old row.
pub old_values: Vec<vibesql_types::SqlValue>,
/// Values of the listed columns taken from the new row.
pub new_values: Vec<vibesql_types::SqlValue>,
}
impl PartialRowDelta {
    /// Builds a partial delta describing the difference between two versions
    /// of a row.
    ///
    /// The delta contains every column whose value differs between `old_row`
    /// and `new_row`, plus all columns listed in `pk_columns` (so the receiver
    /// can identify the row), sorted by column index with no duplicates.
    ///
    /// # Parameters
    /// * `old_row` - the row before the change.
    /// * `new_row` - the row after the change.
    /// * `pk_columns` - indices of the key columns to always include.
    ///
    /// # Returns
    /// `None` when no partial delta can be produced:
    /// * the rows have a different number of values,
    /// * `pk_columns` refers to a column that does not exist in the rows, or
    /// * no column changed.
    ///
    /// Otherwise `Some` delta whose three vectors are parallel.
    pub fn from_rows(
        old_row: &crate::Row,
        new_row: &crate::Row,
        pk_columns: &[usize],
    ) -> Option<Self> {
        let width = old_row.values.len();
        if width != new_row.values.len() {
            return None;
        }
        // An out-of-range key index would panic when the values are gathered
        // below; treat it as "no partial delta available" instead.
        if pk_columns.iter().any(|&idx| idx >= width) {
            return None;
        }

        // Start with the columns whose value actually changed.
        let mut column_indices: Vec<usize> = old_row
            .values
            .iter()
            .zip(new_row.values.iter())
            .enumerate()
            .filter(|(_, (old_val, new_val))| old_val != new_val)
            .map(|(idx, _)| idx)
            .collect();
        if column_indices.is_empty() {
            return None;
        }

        // Add the key columns, then sort and dedup so that a key column that
        // also changed (or is listed twice) appears exactly once.
        column_indices.extend_from_slice(pk_columns);
        column_indices.sort_unstable();
        column_indices.dedup();

        let old_values: Vec<vibesql_types::SqlValue> =
            column_indices.iter().map(|&idx| old_row.values[idx].clone()).collect();
        let new_values: Vec<vibesql_types::SqlValue> =
            column_indices.iter().map(|&idx| new_row.values[idx].clone()).collect();

        Some(Self { column_indices, old_values, new_values })
    }
}
/// Computes the difference between two result sets without any key-column
/// information.
///
/// This is a convenience wrapper around [`compute_delta_with_pk`] that passes
/// an empty key-column list, which makes that function use its hash-based
/// comparison.
///
/// Returns whatever [`compute_delta_with_pk`] returns for an empty key list.
pub fn compute_delta(
    subscription_id: SubscriptionId,
    old: &[crate::Row],
    new: &[crate::Row],
) -> Option<SubscriptionUpdate> {
    // No key columns are known for this subscription.
    let no_pk_columns: &[usize] = &[];
    compute_delta_with_pk(subscription_id, old, new, no_pk_columns)
}
/// Computes the difference between two result sets, pairing rows by the
/// values of their key columns.
///
/// Rows in `new` whose key matches a row in `old` are reported as an update
/// when any value differs, and are omitted when the rows are equal. Rows in
/// `new` with no remaining key match are inserts; rows in `old` that were
/// never matched are deletes.
///
/// When several rows share the same key values they are paired in order of
/// appearance (first old row with first new row, and so on), so an unchanged
/// result set with a non-unique key does not produce spurious updates.
///
/// Falls back to the hash-based comparison when `pk_columns` is empty or
/// refers to a column that does not exist in some row.
///
/// # Parameters
/// * `subscription_id` - identifier copied into the resulting update.
/// * `old` - the previous result set.
/// * `new` - the current result set.
/// * `pk_columns` - indices of the columns that identify a row.
///
/// # Returns
/// `None` when there are no inserts, updates or deletes; otherwise
/// `Some(SubscriptionUpdate::Delta { .. })`. Inserts and updates follow the
/// order of `new`; deletes follow the order of `old`.
pub fn compute_delta_with_pk(
    subscription_id: SubscriptionId,
    old: &[crate::Row],
    new: &[crate::Row],
    pk_columns: &[usize],
) -> Option<SubscriptionUpdate> {
    use std::collections::{HashMap, VecDeque};

    if pk_columns.is_empty() {
        return compute_delta_hash_based(subscription_id, old, new);
    }

    // Every key index must exist in every row, otherwise the key lookups
    // below would panic.
    let pk_in_bounds = old
        .iter()
        .chain(new.iter())
        .all(|row| pk_columns.iter().all(|&idx| idx < row.values.len()));
    if !pk_in_bounds {
        return compute_delta_hash_based(subscription_id, old, new);
    }

    // Group old rows by key. Each group is a queue in original order so that
    // rows sharing a key are consumed first-in-first-out.
    let mut old_by_pk: HashMap<Vec<&vibesql_types::SqlValue>, VecDeque<(usize, &crate::Row)>> =
        HashMap::with_capacity(old.len());
    for (idx, row) in old.iter().enumerate() {
        let key: Vec<&vibesql_types::SqlValue> =
            pk_columns.iter().map(|&i| &row.values[i]).collect();
        old_by_pk.entry(key).or_default().push_back((idx, row));
    }

    let mut inserts = Vec::new();
    let mut updates: Vec<(crate::Row, crate::Row)> = Vec::new();
    // matched[i] is true once old[i] has been paired with a new row.
    let mut matched = vec![false; old.len()];

    for new_row in new {
        let key: Vec<&vibesql_types::SqlValue> =
            pk_columns.iter().map(|&i| &new_row.values[i]).collect();
        let partner = old_by_pk.get_mut(&key).and_then(|candidates| candidates.pop_front());
        match partner {
            Some((old_idx, old_row)) => {
                matched[old_idx] = true;
                if old_row.values != new_row.values {
                    updates.push((old_row.clone(), new_row.clone()));
                }
            }
            // Unknown key, or every old row with this key is already paired.
            None => inserts.push(new_row.clone()),
        }
    }

    let deletes: Vec<crate::Row> = old
        .iter()
        .zip(matched)
        .filter(|(_, was_matched)| !*was_matched)
        .map(|(row, _)| row.clone())
        .collect();

    if inserts.is_empty() && updates.is_empty() && deletes.is_empty() {
        return None;
    }
    Some(SubscriptionUpdate::Delta { subscription_id, inserts, updates, deletes })
}
/// Computes the difference between two result sets by comparing row hashes.
///
/// Rows are treated as a multiset keyed by `hash_row`: for each distinct hash,
/// if `new` contains more rows than `old` the surplus (the later rows in
/// `new`) are inserts, and if `old` contains more rows than `new` the surplus
/// (the later rows in `old`) are deletes. Updates are never produced.
///
/// Rows are compared by hash only, so two different rows whose hashes collide
/// are considered equal.
///
/// # Returns
/// `None` when there are no inserts and no deletes; otherwise
/// `Some(SubscriptionUpdate::Delta { .. })` with an empty `updates` list.
/// Inserts follow the order of `new` and deletes follow the order of `old`,
/// so the output is deterministic for a given input.
fn compute_delta_hash_based(
    subscription_id: SubscriptionId,
    old: &[crate::Row],
    new: &[crate::Row],
) -> Option<SubscriptionUpdate> {
    use std::collections::HashMap;

    // Hash every row exactly once.
    let old_hashes: Vec<u64> = old.iter().map(|row| hash_row(row)).collect();
    let new_hashes: Vec<u64> = new.iter().map(|row| hash_row(row)).collect();

    // Number of rows per hash on each side; used as a budget that the other
    // side consumes while it is walked in order.
    let mut old_budget: HashMap<u64, usize> = HashMap::with_capacity(old.len());
    for &hash in &old_hashes {
        *old_budget.entry(hash).or_insert(0) += 1;
    }
    let mut new_budget: HashMap<u64, usize> = HashMap::with_capacity(new.len());
    for &hash in &new_hashes {
        *new_budget.entry(hash).or_insert(0) += 1;
    }

    // A new row is an insert once the old rows with the same hash are used up.
    let mut inserts = Vec::new();
    for (row, hash) in new.iter().zip(&new_hashes) {
        match old_budget.get_mut(hash) {
            Some(left) if *left > 0 => *left -= 1,
            _ => inserts.push(row.clone()),
        }
    }

    // Symmetrically, an old row is a delete once the new rows with the same
    // hash are used up.
    let mut deletes = Vec::new();
    for (row, hash) in old.iter().zip(&old_hashes) {
        match new_budget.get_mut(hash) {
            Some(left) if *left > 0 => *left -= 1,
            _ => deletes.push(row.clone()),
        }
    }

    if inserts.is_empty() && deletes.is_empty() {
        return None;
    }
    let updates = Vec::new();
    Some(SubscriptionUpdate::Delta { subscription_id, inserts, updates, deletes })
}