// Submodule declarations. `error`, `filter`, `pk_detector` and `session` are
// public; the remaining submodules are private and reachable from outside
// this module only through the `pub use` re-exports further down.
mod config;
mod delta;
pub mod error;
pub mod filter;
mod hash;
mod manager;
pub mod pk_detector;
mod router;
mod selective;
pub mod session;
mod table_dependencies;
mod table_extract;
mod types;
// Public re-exports of selected items from the submodules declared above.
pub use config::{SubscriptionConfig, SubscriptionRetryPolicy};
pub use delta::{compute_delta, compute_delta_with_pk, PartialRowDelta};
pub use error::{classify_error, classify_error_str, SubscriptionErrorKind};
pub use hash::hash_rows;
pub use manager::SubscriptionManager;
pub use pk_detector::{
detect_pk_columns, detect_pk_columns_from_stmt, PkDetectionFailureReason, PkDetectionResult,
};
// `router::SubscriptionUpdate` is re-exported under the alias `RouterUpdate`;
// the unaliased name `SubscriptionUpdate` is re-exported from `types` below.
pub use router::{ChangeRouter, SubscriptionUpdate as RouterUpdate};
pub use selective::{
compute_column_diff, create_partial_row_update, create_partial_row_update_with_metrics,
should_use_selective_update, should_use_selective_update_with_metrics, ColumnDiff,
SelectiveColumnConfig,
};
pub use table_dependencies::extract_table_dependencies;
pub use table_extract::extract_table_refs;
pub use types::{
Subscription, SubscriptionError, SubscriptionId, SubscriptionMetrics, SubscriptionUpdate,
};
#[cfg(test)]
mod tests {
use super::*;
/// Three freshly created subscription ids are expected to be pairwise distinct.
#[test]
fn test_subscription_id_uniqueness() {
    let (first, second, third) =
        (SubscriptionId::new(), SubscriptionId::new(), SubscriptionId::new());
    assert_ne!(first, second);
    assert_ne!(second, third);
    assert_ne!(first, third);
}
/// The `Display` rendering of a subscription id is expected to start with `sub-`.
#[test]
fn test_subscription_id_display() {
    let rendered = format!("{}", SubscriptionId::new());
    assert!(rendered.starts_with("sub-"));
}
/// Hashing an empty `Vec` of rows must agree with hashing an empty slice.
#[test]
fn test_hash_rows_empty() {
    let no_rows: Vec<crate::Row> = Vec::new();
    let digest = hash_rows(&no_rows);
    assert_eq!(digest, hash_rows(&[]));
}
/// Row sets that differ only in their integer column are expected to hash differently.
#[test]
fn test_hash_rows_different_content() {
    use vibesql_types::SqlValue;
    // Builds a single-row set; only the integer column varies between calls.
    let rows_with_id = |id| {
        vec![crate::Row {
            values: vec![SqlValue::Integer(id), SqlValue::Varchar(arcstr::ArcStr::from("hello"))],
        }]
    };
    let left = rows_with_id(1);
    let right = rows_with_id(2);
    assert_ne!(hash_rows(&left), hash_rows(&right));
}
/// Two independently built but equal row sets are expected to produce the same hash.
#[test]
fn test_hash_rows_same_content() {
    use vibesql_types::SqlValue;
    let build_rows = || {
        vec![crate::Row {
            values: vec![SqlValue::Integer(42), SqlValue::Varchar(arcstr::ArcStr::from("test"))],
        }]
    };
    let (left, right) = (build_rows(), build_rows());
    assert_eq!(hash_rows(&left), hash_rows(&right));
}
/// Diffing a result set against itself is expected to yield no delta.
#[test]
fn test_compute_delta_no_changes() {
    use vibesql_types::SqlValue;
    let row = |id, name: &str| crate::Row {
        values: vec![SqlValue::Integer(id), SqlValue::Varchar(arcstr::ArcStr::from(name))],
    };
    let snapshot = vec![row(1, "Alice"), row(2, "Bob")];
    let delta = compute_delta(SubscriptionId::new(), &snapshot, &snapshot);
    assert!(delta.is_none());
}
/// Adding one row to the new result set is expected to show up as a single insert.
#[test]
fn test_compute_delta_single_insert() {
    use vibesql_types::SqlValue;
    let text = |s: &str| SqlValue::Varchar(arcstr::ArcStr::from(s));
    let row = |id, name: &str| crate::Row { values: vec![SqlValue::Integer(id), text(name)] };
    let before = vec![row(1, "Alice")];
    let after = vec![row(1, "Alice"), row(2, "Bob")];
    let delta = compute_delta(SubscriptionId::new(), &before, &after);
    assert!(delta.is_some());
    let (inserts, updates, deletes) = match delta.unwrap() {
        SubscriptionUpdate::Delta { inserts, updates, deletes, .. } => (inserts, updates, deletes),
        _ => panic!("Expected Delta update"),
    };
    assert_eq!(inserts.len(), 1);
    assert_eq!(inserts[0].values[0], SqlValue::Integer(2));
    assert_eq!(inserts[0].values[1], text("Bob"));
    assert!(updates.is_empty());
    assert!(deletes.is_empty());
}
/// Dropping one row from the new result set is expected to show up as a single delete.
#[test]
fn test_compute_delta_single_delete() {
    use vibesql_types::SqlValue;
    let row = |id, name: &str| crate::Row {
        values: vec![SqlValue::Integer(id), SqlValue::Varchar(arcstr::ArcStr::from(name))],
    };
    let before = vec![row(1, "Alice"), row(2, "Bob")];
    let after = vec![row(1, "Alice")];
    let delta = compute_delta(SubscriptionId::new(), &before, &after);
    assert!(delta.is_some());
    if let SubscriptionUpdate::Delta { inserts, updates, deletes, .. } = delta.unwrap() {
        assert!(inserts.is_empty());
        assert!(updates.is_empty());
        assert_eq!(deletes.len(), 1);
        assert_eq!(deletes[0].values[0], SqlValue::Integer(2));
    } else {
        panic!("Expected Delta update");
    }
}
/// Without primary-key information, replacing a row is expected to be reported
/// as one delete plus one insert rather than an update.
#[test]
fn test_compute_delta_insert_and_delete() {
    use vibesql_types::SqlValue;
    let row = |id, name: &str| crate::Row {
        values: vec![SqlValue::Integer(id), SqlValue::Varchar(arcstr::ArcStr::from(name))],
    };
    let before = vec![row(1, "Alice")];
    let after = vec![row(2, "Bob")];
    let delta = compute_delta(SubscriptionId::new(), &before, &after);
    assert!(delta.is_some());
    let (inserts, updates, deletes) = match delta.unwrap() {
        SubscriptionUpdate::Delta { inserts, updates, deletes, .. } => (inserts, updates, deletes),
        _ => panic!("Expected Delta update"),
    };
    assert_eq!(inserts.len(), 1);
    assert_eq!(deletes.len(), 1);
    assert!(updates.is_empty());
    assert_eq!(inserts[0].values[0], SqlValue::Integer(2));
    assert_eq!(deletes[0].values[0], SqlValue::Integer(1));
}
/// Going from an empty result set to two rows is expected to produce two inserts.
#[test]
fn test_compute_delta_empty_to_rows() {
    use vibesql_types::SqlValue;
    let row = |id, name: &str| crate::Row {
        values: vec![SqlValue::Integer(id), SqlValue::Varchar(arcstr::ArcStr::from(name))],
    };
    let before: Vec<crate::Row> = Vec::new();
    let after = vec![row(1, "Alice"), row(2, "Bob")];
    let delta = compute_delta(SubscriptionId::new(), &before, &after);
    assert!(delta.is_some());
    if let SubscriptionUpdate::Delta { inserts, updates, deletes, .. } = delta.unwrap() {
        assert_eq!(inserts.len(), 2);
        assert!(updates.is_empty());
        assert!(deletes.is_empty());
    } else {
        panic!("Expected Delta update");
    }
}
/// Going from two rows to an empty result set is expected to produce two deletes.
#[test]
fn test_compute_delta_rows_to_empty() {
    use vibesql_types::SqlValue;
    let row = |id, name: &str| crate::Row {
        values: vec![SqlValue::Integer(id), SqlValue::Varchar(arcstr::ArcStr::from(name))],
    };
    let before = vec![row(1, "Alice"), row(2, "Bob")];
    let after: Vec<crate::Row> = Vec::new();
    let delta = compute_delta(SubscriptionId::new(), &before, &after);
    assert!(delta.is_some());
    let (inserts, updates, deletes) = match delta.unwrap() {
        SubscriptionUpdate::Delta { inserts, updates, deletes, .. } => (inserts, updates, deletes),
        _ => panic!("Expected Delta update"),
    };
    assert!(inserts.is_empty());
    assert!(updates.is_empty());
    assert_eq!(deletes.len(), 2);
}
/// Identical rows are counted: going from two copies to three copies of the
/// same row is expected to yield exactly one insert.
#[test]
fn test_compute_delta_duplicate_rows() {
    use vibesql_types::SqlValue;
    // Produces `count` identical single-column rows holding the integer 1.
    let copies = |count: usize| -> Vec<crate::Row> {
        (0..count).map(|_| crate::Row { values: vec![SqlValue::Integer(1)] }).collect()
    };
    let before = copies(2);
    let after = copies(3);
    let delta = compute_delta(SubscriptionId::new(), &before, &after);
    assert!(delta.is_some());
    if let SubscriptionUpdate::Delta { inserts, updates, deletes, .. } = delta.unwrap() {
        assert_eq!(inserts.len(), 1);
        assert!(updates.is_empty());
        assert!(deletes.is_empty());
    } else {
        panic!("Expected Delta update");
    }
}
/// With column 0 as the primary key, a change to a non-key column is expected
/// to surface as one update pairing the old row with the new row.
#[test]
fn test_compute_delta_with_pk_detects_update() {
    use vibesql_types::SqlValue;
    let text = |s: &str| SqlValue::Varchar(arcstr::ArcStr::from(s));
    let person = |name: &str| crate::Row { values: vec![SqlValue::Integer(1), text(name)] };
    let before = vec![person("Alice")];
    let after = vec![person("Bob")];
    let delta = compute_delta_with_pk(SubscriptionId::new(), &before, &after, &[0]);
    assert!(delta.is_some());
    let (inserts, updates, deletes) = match delta.unwrap() {
        SubscriptionUpdate::Delta { inserts, updates, deletes, .. } => (inserts, updates, deletes),
        _ => panic!("Expected Delta update"),
    };
    assert!(inserts.is_empty());
    assert_eq!(updates.len(), 1);
    assert!(deletes.is_empty());
    let (previous, current) = &updates[0];
    assert_eq!(previous.values[0], SqlValue::Integer(1));
    assert_eq!(previous.values[1], text("Alice"));
    assert_eq!(current.values[0], SqlValue::Integer(1));
    assert_eq!(current.values[1], text("Bob"));
}
/// When the primary-key value itself differs, the change is expected to be
/// reported as a delete of the old key and an insert of the new key.
#[test]
fn test_compute_delta_with_pk_insert_and_delete() {
    use vibesql_types::SqlValue;
    let row = |id, name: &str| crate::Row {
        values: vec![SqlValue::Integer(id), SqlValue::Varchar(arcstr::ArcStr::from(name))],
    };
    let before = vec![row(1, "Alice")];
    let after = vec![row(2, "Bob")];
    let delta = compute_delta_with_pk(SubscriptionId::new(), &before, &after, &[0]);
    assert!(delta.is_some());
    if let SubscriptionUpdate::Delta { inserts, updates, deletes, .. } = delta.unwrap() {
        assert_eq!(inserts.len(), 1);
        assert!(updates.is_empty());
        assert_eq!(deletes.len(), 1);
        assert_eq!(inserts[0].values[0], SqlValue::Integer(2));
        assert_eq!(deletes[0].values[0], SqlValue::Integer(1));
    } else {
        panic!("Expected Delta update");
    }
}
/// PK-aware diffing of a result set against itself is expected to yield no delta.
#[test]
fn test_compute_delta_with_pk_no_changes() {
    use vibesql_types::SqlValue;
    let row = |id, name: &str| crate::Row {
        values: vec![SqlValue::Integer(id), SqlValue::Varchar(arcstr::ArcStr::from(name))],
    };
    let snapshot = vec![row(1, "Alice"), row(2, "Bob")];
    let delta = compute_delta_with_pk(SubscriptionId::new(), &snapshot, &snapshot, &[0]);
    assert!(delta.is_none());
}
/// Two rows whose non-key column changes are expected to produce two updates.
#[test]
fn test_compute_delta_with_pk_multiple_updates() {
    use vibesql_types::SqlValue;
    let row = |id, name: &str| crate::Row {
        values: vec![SqlValue::Integer(id), SqlValue::Varchar(arcstr::ArcStr::from(name))],
    };
    let before = vec![row(1, "Alice"), row(2, "Bob")];
    let after = vec![row(1, "ALICE"), row(2, "BOB")];
    let delta = compute_delta_with_pk(SubscriptionId::new(), &before, &after, &[0]);
    assert!(delta.is_some());
    let (inserts, updates, deletes) = match delta.unwrap() {
        SubscriptionUpdate::Delta { inserts, updates, deletes, .. } => (inserts, updates, deletes),
        _ => panic!("Expected Delta update"),
    };
    assert!(inserts.is_empty());
    assert_eq!(updates.len(), 2);
    assert!(deletes.is_empty());
}
/// With a two-column primary key (columns 0 and 1), a change in the third
/// column is expected to be reported as a single update.
#[test]
fn test_compute_delta_with_pk_composite_pk() {
    use vibesql_types::SqlValue;
    let text = |s: &str| SqlValue::Varchar(arcstr::ArcStr::from(s));
    let order_line = |status: &str| crate::Row {
        values: vec![SqlValue::Integer(1), SqlValue::Integer(100), text(status)],
    };
    let before = vec![order_line("pending")];
    let after = vec![order_line("shipped")];
    let delta = compute_delta_with_pk(SubscriptionId::new(), &before, &after, &[0, 1]);
    assert!(delta.is_some());
    if let SubscriptionUpdate::Delta { inserts, updates, deletes, .. } = delta.unwrap() {
        assert!(inserts.is_empty());
        assert_eq!(updates.len(), 1);
        assert!(deletes.is_empty());
        let (_, current) = &updates[0];
        assert_eq!(current.values[2], text("shipped"));
    } else {
        panic!("Expected Delta update");
    }
}
/// With an empty primary-key column list, a changed row is expected to be
/// reported as delete + insert instead of an update.
#[test]
fn test_compute_delta_with_pk_empty_fallback() {
    use vibesql_types::SqlValue;
    let person = |name: &str| crate::Row {
        values: vec![SqlValue::Integer(1), SqlValue::Varchar(arcstr::ArcStr::from(name))],
    };
    let before = vec![person("Alice")];
    let after = vec![person("Bob")];
    let delta = compute_delta_with_pk(SubscriptionId::new(), &before, &after, &[]);
    assert!(delta.is_some());
    let (inserts, updates, deletes) = match delta.unwrap() {
        SubscriptionUpdate::Delta { inserts, updates, deletes, .. } => (inserts, updates, deletes),
        _ => panic!("Expected Delta update"),
    };
    assert_eq!(inserts.len(), 1);
    assert!(updates.is_empty());
    assert_eq!(deletes.len(), 1);
}
/// One changed row (id 1), one removed row (id 2), one untouched row (id 3)
/// and one new row (id 4) are expected to yield exactly one update, one
/// delete and one insert.
#[test]
fn test_compute_delta_with_pk_mixed_operations() {
    use vibesql_types::SqlValue;
    let text = |s: &str| SqlValue::Varchar(arcstr::ArcStr::from(s));
    let row = |id, name: &str| crate::Row { values: vec![SqlValue::Integer(id), text(name)] };
    let before = vec![row(1, "Alice"), row(2, "Bob"), row(3, "Charlie")];
    let after = vec![row(1, "ALICE"), row(3, "Charlie"), row(4, "Diana")];
    let delta = compute_delta_with_pk(SubscriptionId::new(), &before, &after, &[0]);
    assert!(delta.is_some());
    if let SubscriptionUpdate::Delta { inserts, updates, deletes, .. } = delta.unwrap() {
        assert_eq!(inserts.len(), 1);
        assert_eq!(updates.len(), 1);
        assert_eq!(deletes.len(), 1);
        assert_eq!(inserts[0].values[0], SqlValue::Integer(4));
        let (previous, current) = &updates[0];
        assert_eq!(previous.values[1], text("Alice"));
        assert_eq!(current.values[1], text("ALICE"));
        assert_eq!(deletes[0].values[0], SqlValue::Integer(2));
    } else {
        panic!("Expected Delta update");
    }
}
/// A primary-key index beyond the row width (5 for two-column rows) is
/// expected to make the diff report delete + insert rather than an update.
#[test]
fn test_compute_delta_with_pk_out_of_bounds_fallback() {
    use vibesql_types::SqlValue;
    let person = |name: &str| crate::Row {
        values: vec![SqlValue::Integer(1), SqlValue::Varchar(arcstr::ArcStr::from(name))],
    };
    let before = vec![person("Alice")];
    let after = vec![person("Bob")];
    let delta = compute_delta_with_pk(SubscriptionId::new(), &before, &after, &[5]);
    assert!(delta.is_some());
    let (inserts, updates, deletes) = match delta.unwrap() {
        SubscriptionUpdate::Delta { inserts, updates, deletes, .. } => (inserts, updates, deletes),
        _ => panic!("Expected Delta update"),
    };
    assert_eq!(inserts.len(), 1);
    assert!(updates.is_empty());
    assert_eq!(deletes.len(), 1);
}
/// Two equal rows are expected to produce no column diff.
#[test]
fn test_compute_column_diff_no_changes() {
    use vibesql_types::SqlValue;
    let alice = || crate::Row {
        values: vec![SqlValue::Integer(1), SqlValue::Varchar(arcstr::ArcStr::from("Alice"))],
    };
    let (before, after) = (alice(), alice());
    assert!(compute_column_diff(&before, &after, &[0]).is_none());
}
/// A change in column 1 is expected to be listed as changed, with the
/// primary-key column 0 additionally listed among the included columns.
#[test]
fn test_compute_column_diff_single_column_change() {
    use vibesql_types::SqlValue;
    let person = |name: &str| crate::Row {
        values: vec![SqlValue::Integer(1), SqlValue::Varchar(arcstr::ArcStr::from(name))],
    };
    let before = person("Alice");
    let after = person("Bob");
    let diff = compute_column_diff(&before, &after, &[0]).unwrap();
    assert_eq!(diff.changed_columns, vec![1]);
    assert_eq!(diff.included_columns, vec![0, 1]);
}
/// Changes in columns 1 and 3 of a four-column row are expected to be
/// reported together, with the primary-key column 0 included as well.
#[test]
fn test_compute_column_diff_multiple_columns_change() {
    use vibesql_types::SqlValue;
    let text = |s: &str| SqlValue::Varchar(arcstr::ArcStr::from(s));
    let profile = |name: &str, status: &str| crate::Row {
        values: vec![SqlValue::Integer(1), text(name), SqlValue::Integer(100), text(status)],
    };
    let before = profile("Alice", "active");
    let after = profile("Bob", "inactive");
    let diff = compute_column_diff(&before, &after, &[0]).unwrap();
    assert_eq!(diff.changed_columns, vec![1, 3]);
    assert_eq!(diff.included_columns, vec![0, 1, 3]);
}
/// When only the primary-key column changes, it is expected to be the sole
/// entry in both the changed and the included column lists.
#[test]
fn test_compute_column_diff_pk_column_changed() {
    use vibesql_types::SqlValue;
    let alice_with_id = |id| crate::Row {
        values: vec![SqlValue::Integer(id), SqlValue::Varchar(arcstr::ArcStr::from("Alice"))],
    };
    let before = alice_with_id(1);
    let after = alice_with_id(2);
    let diff = compute_column_diff(&before, &after, &[0]).unwrap();
    assert_eq!(diff.changed_columns, vec![0]);
    assert_eq!(diff.included_columns, vec![0]);
}
/// A column going from a string value to `Null` is expected to count as changed.
#[test]
fn test_compute_column_diff_null_handling() {
    use vibesql_types::SqlValue;
    let before = crate::Row {
        values: vec![SqlValue::Integer(1), SqlValue::Varchar(arcstr::ArcStr::from("Alice"))],
    };
    let after = crate::Row { values: vec![SqlValue::Integer(1), SqlValue::Null] };
    let column_diff = compute_column_diff(&before, &after, &[0]).unwrap();
    assert_eq!(column_diff.changed_columns, vec![1]);
    assert_eq!(column_diff.included_columns, vec![0, 1]);
}
/// With the feature enabled and one changed column out of ten, a selective
/// update is expected to be chosen.
#[test]
fn test_should_use_selective_update_enabled() {
    let settings =
        SelectiveColumnConfig { enabled: true, pk_columns: vec![0], ..Default::default() };
    let column_diff = ColumnDiff { changed_columns: vec![1], included_columns: vec![0, 1] };
    let selective = should_use_selective_update(&column_diff, 10, &settings);
    assert!(selective);
}
/// With `enabled: false`, a selective update is expected to be rejected.
#[test]
fn test_should_use_selective_update_disabled() {
    let settings =
        SelectiveColumnConfig { enabled: false, pk_columns: vec![0], ..Default::default() };
    let column_diff = ColumnDiff { changed_columns: vec![1], included_columns: vec![0, 1] };
    let selective = should_use_selective_update(&column_diff, 10, &settings);
    assert!(!selective);
}
/// Six changed columns out of ten with a maximum ratio of 0.5 is expected to
/// rule out a selective update.
#[test]
fn test_should_use_selective_update_too_many_changes() {
    let settings = SelectiveColumnConfig {
        enabled: true,
        pk_columns: vec![0],
        max_changed_columns_ratio: 0.5,
        ..Default::default()
    };
    let column_diff = ColumnDiff {
        changed_columns: vec![1, 2, 3, 4, 5, 6],
        included_columns: vec![0, 1, 2, 3, 4, 5, 6],
    };
    let selective = should_use_selective_update(&column_diff, 10, &settings);
    assert!(!selective);
}
/// One changed column out of three: the partial update is expected to carry
/// the primary-key column plus the changed column, and to omit the rest.
#[test]
fn test_create_partial_row_update() {
    // Wraps raw bytes as a present (non-NULL) wire value.
    let cell = |bytes: &[u8]| Some(bytes.to_vec());
    let before = vec![cell(b"1"), cell(b"Alice"), cell(b"100")];
    let after = vec![cell(b"1"), cell(b"Bob"), cell(b"100")];
    let settings = SelectiveColumnConfig {
        enabled: true,
        pk_columns: vec![0],
        max_changed_columns_ratio: 0.5,
        ..Default::default()
    };
    let partial = create_partial_row_update(&before, &after, &[0], &settings).unwrap();
    assert_eq!(partial.total_columns, 3);
    assert!(partial.is_column_present(0));
    assert!(partial.is_column_present(1));
    assert!(!partial.is_column_present(2));
    assert_eq!(partial.present_column_count(), 2);
    assert_eq!(partial.values.len(), 2);
    assert_eq!(partial.values[0], cell(b"1"));
    assert_eq!(partial.values[1], cell(b"Bob"));
}
/// A column changing from a value to `None` is expected to be present in the
/// partial update with a `None` value.
#[test]
fn test_create_partial_row_update_null_change() {
    let cell = |bytes: &[u8]| Some(bytes.to_vec());
    let before = vec![cell(b"1"), cell(b"Alice")];
    let after = vec![cell(b"1"), None];
    let settings =
        SelectiveColumnConfig { enabled: true, pk_columns: vec![0], ..Default::default() };
    let partial = create_partial_row_update(&before, &after, &[0], &settings).unwrap();
    assert_eq!(partial.total_columns, 2);
    assert!(partial.is_column_present(0));
    assert!(partial.is_column_present(1));
    assert_eq!(partial.values.len(), 2);
    assert_eq!(partial.values[0], cell(b"1"));
    assert_eq!(partial.values[1], None);
}
/// Identical old and new wire rows are expected to produce no partial update.
#[test]
fn test_create_partial_row_update_no_changes() {
    let wire_row = || vec![Some(b"1".to_vec()), Some(b"Alice".to_vec())];
    let (before, after) = (wire_row(), wire_row());
    let settings =
        SelectiveColumnConfig { enabled: true, pk_columns: vec![0], ..Default::default() };
    assert!(create_partial_row_update(&before, &after, &[0], &settings).is_none());
}
/// For ten columns with columns 0, 3 and 7 present, the mask is expected to
/// occupy two bytes and to report exactly those three columns as present;
/// the out-of-range index 10 must read as absent.
#[test]
fn test_partial_row_update_column_mask() {
    use crate::protocol::messages::PartialRowUpdate;
    let values = vec![Some(b"a".to_vec()), Some(b"b".to_vec()), Some(b"c".to_vec())];
    let partial = PartialRowUpdate::new(10, &[0, 3, 7], values);
    assert_eq!(partial.total_columns, 10);
    assert_eq!(partial.column_mask.len(), 2);
    // Walk indices 0..=10 in order; 10 lies one past the last real column.
    for column in 0..=10 {
        let should_be_present = matches!(column, 0 | 3 | 7);
        assert_eq!(partial.is_column_present(column), should_be_present);
    }
    assert_eq!(partial.present_column_count(), 3);
}
/// End-to-end check: an update found by the PK-aware delta is converted to
/// textual wire rows and is then expected to yield a partial row update that
/// carries only the primary key and the changed column.
#[test]
fn test_delta_updates_produce_partial_row_updates() {
    use vibesql_types::SqlValue;
    let subscription = SubscriptionId::new();
    let account = |balance| crate::Row {
        values: vec![
            SqlValue::Integer(1),
            SqlValue::Varchar(arcstr::ArcStr::from("Alice")),
            SqlValue::Integer(balance),
        ],
    };
    let before = vec![account(100)];
    let after = vec![account(150)];
    let delta = compute_delta_with_pk(subscription, &before, &after, &[0]);
    let (inserts, updates, deletes) = match delta.unwrap() {
        SubscriptionUpdate::Delta { inserts, updates, deletes, .. } => (inserts, updates, deletes),
        _ => panic!("Expected Delta, got something else"),
    };
    assert!(inserts.is_empty(), "Should not have inserts");
    assert!(deletes.is_empty(), "Should not have deletes");
    assert_eq!(updates.len(), 1, "Should have one update");
    let (previous, current) = &updates[0];
    // Every value is rendered through `to_string()` and shipped as its bytes.
    let previous_wire: Vec<Option<Vec<u8>>> =
        previous.values.iter().map(|value| Some(value.to_string().into_bytes())).collect();
    let current_wire: Vec<Option<Vec<u8>>> =
        current.values.iter().map(|value| Some(value.to_string().into_bytes())).collect();
    let settings =
        SelectiveColumnConfig { enabled: true, pk_columns: vec![0], ..Default::default() };
    let partial = create_partial_row_update(&previous_wire, &current_wire, &[0], &settings);
    assert!(partial.is_some(), "Should produce partial row update");
    let partial = partial.unwrap();
    assert_eq!(partial.total_columns, 3);
    assert!(partial.is_column_present(0), "PK column should be present");
    assert!(!partial.is_column_present(1), "Unchanged column should not be present");
    assert!(partial.is_column_present(2), "Changed column should be present");
    assert_eq!(partial.values.len(), 2);
    assert_eq!(partial.values[0], Some(b"1".to_vec()));
    assert_eq!(partial.values[1], Some(b"150".to_vec()));
}
/// With two of three columns changed and a maximum ratio of 0.3, no partial
/// row update is expected to be produced for the detected update.
#[test]
fn test_delta_updates_fallback_to_full_row_when_too_many_changes() {
    use vibesql_types::SqlValue;
    let subscription = SubscriptionId::new();
    let text = |s: &str| SqlValue::Varchar(arcstr::ArcStr::from(s));
    let contact = |name: &str, email: &str| crate::Row {
        values: vec![SqlValue::Integer(1), text(name), text(email)],
    };
    let before = vec![contact("Alice", "alice@old.com")];
    let after = vec![contact("Bob", "bob@new.com")];
    let delta = compute_delta_with_pk(subscription, &before, &after, &[0]);
    let updates = match delta.unwrap() {
        SubscriptionUpdate::Delta { updates, .. } => updates,
        _ => panic!("Expected Delta"),
    };
    assert_eq!(updates.len(), 1);
    let (previous, current) = &updates[0];
    // Every value is rendered through `to_string()` and shipped as its bytes.
    let previous_wire: Vec<Option<Vec<u8>>> =
        previous.values.iter().map(|value| Some(value.to_string().into_bytes())).collect();
    let current_wire: Vec<Option<Vec<u8>>> =
        current.values.iter().map(|value| Some(value.to_string().into_bytes())).collect();
    let settings = SelectiveColumnConfig {
        enabled: true,
        pk_columns: vec![0],
        max_changed_columns_ratio: 0.3,
        ..Default::default()
    };
    let partial = create_partial_row_update(&previous_wire, &current_wire, &[0], &settings);
    assert!(partial.is_none(), "Should fall back to full row when too many columns change");
}
/// One changed column with `min_changed_columns: 2` is expected to be
/// rejected for selective updating.
#[test]
fn test_should_use_selective_update_below_min_changed_columns() {
    let settings = SelectiveColumnConfig {
        enabled: true,
        pk_columns: vec![0],
        min_changed_columns: 2,
        max_changed_columns_ratio: 0.5,
    };
    let column_diff = ColumnDiff { changed_columns: vec![1], included_columns: vec![0, 1] };
    let selective = should_use_selective_update(&column_diff, 10, &settings);
    assert!(!selective);
}
/// Exactly `min_changed_columns` (two) changed columns is expected to be accepted.
#[test]
fn test_should_use_selective_update_at_min_changed_columns() {
    let settings = SelectiveColumnConfig {
        enabled: true,
        pk_columns: vec![0],
        min_changed_columns: 2,
        max_changed_columns_ratio: 0.5,
    };
    let column_diff =
        ColumnDiff { changed_columns: vec![1, 2], included_columns: vec![0, 1, 2] };
    let selective = should_use_selective_update(&column_diff, 10, &settings);
    assert!(selective);
}
/// Five changed columns out of ten sits exactly on the 0.5 ratio limit and is
/// expected to still be accepted.
#[test]
fn test_should_use_selective_update_at_max_ratio() {
    let settings = SelectiveColumnConfig {
        enabled: true,
        pk_columns: vec![0],
        min_changed_columns: 1,
        max_changed_columns_ratio: 0.5,
    };
    let column_diff = ColumnDiff {
        changed_columns: vec![1, 2, 3, 4, 5],
        included_columns: vec![0, 1, 2, 3, 4, 5],
    };
    let selective = should_use_selective_update(&column_diff, 10, &settings);
    assert!(selective);
}
/// When every column differs (ratio limit 0.5), no partial update is expected.
#[test]
fn test_create_partial_row_update_all_columns_changed() {
    let cell = |bytes: &[u8]| Some(bytes.to_vec());
    let before = vec![cell(b"1"), cell(b"Alice"), cell(b"100")];
    let after = vec![cell(b"2"), cell(b"Bob"), cell(b"200")];
    let settings = SelectiveColumnConfig {
        enabled: true,
        pk_columns: vec![0],
        min_changed_columns: 1,
        max_changed_columns_ratio: 0.5,
    };
    assert!(create_partial_row_update(&before, &after, &[0], &settings).is_none());
}
/// Without any primary-key columns, only the changed column is expected to be
/// present in the partial update.
#[test]
fn test_create_partial_row_update_empty_pk_columns() {
    let cell = |bytes: &[u8]| Some(bytes.to_vec());
    let before = vec![cell(b"1"), cell(b"Alice"), cell(b"100")];
    let after = vec![cell(b"1"), cell(b"Bob"), cell(b"100")];
    let settings = SelectiveColumnConfig {
        enabled: true,
        pk_columns: vec![],
        min_changed_columns: 1,
        max_changed_columns_ratio: 0.5,
    };
    let partial = create_partial_row_update(&before, &after, &[], &settings).unwrap();
    assert_eq!(partial.total_columns, 3);
    assert!(!partial.is_column_present(0));
    assert!(partial.is_column_present(1));
    assert!(!partial.is_column_present(2));
    assert_eq!(partial.present_column_count(), 1);
    assert_eq!(partial.values.len(), 1);
    assert_eq!(partial.values[0], cell(b"Bob"));
}
/// With `enabled: false`, no partial update is expected even though a column changed.
#[test]
fn test_create_partial_row_update_disabled_returns_none() {
    let cell = |bytes: &[u8]| Some(bytes.to_vec());
    let before = vec![cell(b"1"), cell(b"Alice")];
    let after = vec![cell(b"1"), cell(b"Bob")];
    let settings = SelectiveColumnConfig {
        enabled: false,
        pk_columns: vec![0],
        min_changed_columns: 1,
        max_changed_columns_ratio: 0.5,
    };
    assert!(create_partial_row_update(&before, &after, &[0], &settings).is_none());
}
/// Wire rows of different lengths (2 vs 3 columns) are expected to yield no
/// partial update.
#[test]
fn test_create_partial_row_update_different_row_lengths() {
    let cell = |bytes: &[u8]| Some(bytes.to_vec());
    let before = vec![cell(b"1"), cell(b"Alice")];
    let after = vec![cell(b"1"), cell(b"Bob"), cell(b"extra")];
    let settings =
        SelectiveColumnConfig { enabled: true, pk_columns: vec![0], ..Default::default() };
    assert!(create_partial_row_update(&before, &after, &[0], &settings).is_none());
}
/// Rows of different widths (2 vs 3 columns) are expected to yield no column diff.
#[test]
fn test_compute_column_diff_different_row_lengths() {
    use vibesql_types::SqlValue;
    let text = |s: &str| SqlValue::Varchar(arcstr::ArcStr::from(s));
    let narrow = crate::Row { values: vec![SqlValue::Integer(1), text("Alice")] };
    let wide = crate::Row {
        values: vec![SqlValue::Integer(1), text("Alice"), SqlValue::Integer(100)],
    };
    assert!(compute_column_diff(&narrow, &wide, &[0]).is_none());
}
/// With primary-key columns 0 and 1, a change in column 2 is expected to be
/// the only changed column, while both key columns are also included.
#[test]
fn test_compute_column_diff_composite_pk() {
    use vibesql_types::SqlValue;
    let keyed_row = |name: &str| crate::Row {
        values: vec![
            SqlValue::Integer(1),
            SqlValue::Integer(100),
            SqlValue::Varchar(arcstr::ArcStr::from(name)),
            SqlValue::Integer(50),
        ],
    };
    let before = keyed_row("Alice");
    let after = keyed_row("Bob");
    let column_diff = compute_column_diff(&before, &after, &[0, 1]).unwrap();
    assert_eq!(column_diff.changed_columns, vec![2]);
    assert_eq!(column_diff.included_columns, vec![0, 1, 2]);
}
/// With primary-key columns 0 and 1 and a change in column 2, the partial
/// update is expected to contain columns 0, 1 and 2 but not column 3.
#[test]
fn test_create_partial_row_update_composite_pk() {
    let cell = |bytes: &[u8]| Some(bytes.to_vec());
    let before = vec![cell(b"1"), cell(b"100"), cell(b"Alice"), cell(b"50")];
    let after = vec![cell(b"1"), cell(b"100"), cell(b"Bob"), cell(b"50")];
    let settings = SelectiveColumnConfig {
        enabled: true,
        pk_columns: vec![0, 1],
        min_changed_columns: 1,
        max_changed_columns_ratio: 0.5,
    };
    let partial = create_partial_row_update(&before, &after, &[0, 1], &settings).unwrap();
    assert_eq!(partial.total_columns, 4);
    assert!(partial.is_column_present(0));
    assert!(partial.is_column_present(1));
    assert!(partial.is_column_present(2));
    assert!(!partial.is_column_present(3));
    assert_eq!(partial.present_column_count(), 3);
}
/// For twenty columns the mask is expected to span three bytes; the present
/// columns 0, 7, 8, 15 and 16 sit on either side of the 8-column boundaries.
#[test]
fn test_partial_row_update_large_column_count() {
    use crate::protocol::messages::PartialRowUpdate;
    // Five single-byte values: "a" through "e".
    let values: Vec<Option<Vec<u8>>> = (b'a'..=b'e').map(|byte| Some(vec![byte])).collect();
    let partial = PartialRowUpdate::new(20, &[0, 7, 8, 15, 16], values);
    assert_eq!(partial.total_columns, 20);
    assert_eq!(partial.column_mask.len(), 3);
    for column in [0, 7, 8, 15, 16] {
        assert!(partial.is_column_present(column));
    }
    assert!(!partial.is_column_present(19));
    assert_eq!(partial.present_column_count(), 5);
}
/// A column going from `Null` to a string value is expected to count as changed.
#[test]
fn test_compute_column_diff_null_to_value() {
    use vibesql_types::SqlValue;
    let before = crate::Row { values: vec![SqlValue::Integer(1), SqlValue::Null] };
    let after = crate::Row {
        values: vec![SqlValue::Integer(1), SqlValue::Varchar(arcstr::ArcStr::from("Alice"))],
    };
    let column_diff = compute_column_diff(&before, &after, &[0]).unwrap();
    assert_eq!(column_diff.changed_columns, vec![1]);
    assert_eq!(column_diff.included_columns, vec![0, 1]);
}
/// A column going from a string value to `Null` is expected to count as changed.
#[test]
fn test_compute_column_diff_value_to_null() {
    use vibesql_types::SqlValue;
    let before = crate::Row {
        values: vec![SqlValue::Integer(1), SqlValue::Varchar(arcstr::ArcStr::from("Alice"))],
    };
    let after = crate::Row { values: vec![SqlValue::Integer(1), SqlValue::Null] };
    let column_diff = compute_column_diff(&before, &after, &[0]).unwrap();
    assert_eq!(column_diff.changed_columns, vec![1]);
    assert_eq!(column_diff.included_columns, vec![0, 1]);
}
/// A column changing from `None` to a value is expected to be present in the
/// partial update together with the primary-key column.
#[test]
fn test_create_partial_row_update_null_to_value() {
    let cell = |bytes: &[u8]| Some(bytes.to_vec());
    let before = vec![cell(b"1"), None];
    let after = vec![cell(b"1"), cell(b"Alice")];
    let settings =
        SelectiveColumnConfig { enabled: true, pk_columns: vec![0], ..Default::default() };
    let partial = create_partial_row_update(&before, &after, &[0], &settings).unwrap();
    assert_eq!(partial.total_columns, 2);
    assert!(partial.is_column_present(0));
    assert!(partial.is_column_present(1));
    assert_eq!(partial.values.len(), 2);
    assert_eq!(partial.values[0], cell(b"1"));
    assert_eq!(partial.values[1], cell(b"Alice"));
}
/// A change in column 2 is expected to yield a partial delta over columns 0
/// (primary key) and 2, with matching old and new values.
#[test]
fn test_partial_row_delta_from_rows_single_column_change() {
    use vibesql_types::SqlValue;
    let account = |balance| crate::Row {
        values: vec![
            SqlValue::Integer(1),
            SqlValue::Varchar(arcstr::ArcStr::from("Alice")),
            SqlValue::Integer(balance),
        ],
    };
    let before = account(100);
    let after = account(150);
    let key_columns = vec![0];
    let maybe_delta = PartialRowDelta::from_rows(&before, &after, &key_columns);
    assert!(maybe_delta.is_some());
    let partial = maybe_delta.unwrap();
    assert_eq!(partial.column_indices, vec![0, 2]);
    assert_eq!(partial.old_values, vec![SqlValue::Integer(1), SqlValue::Integer(100)]);
    assert_eq!(partial.new_values, vec![SqlValue::Integer(1), SqlValue::Integer(150)]);
}
/// Changes in columns 1 and 3 are expected to yield a partial delta over
/// columns 0 (primary key), 1 and 3; the unchanged column 2 is left out.
#[test]
fn test_partial_row_delta_from_rows_multiple_column_changes() {
    use vibesql_types::SqlValue;
    let text = |s: &str| SqlValue::Varchar(arcstr::ArcStr::from(s));
    let profile = |name: &str, status: &str| crate::Row {
        values: vec![SqlValue::Integer(1), text(name), SqlValue::Integer(100), text(status)],
    };
    let before = profile("Alice", "active");
    let after = profile("Bob", "inactive");
    let key_columns = vec![0];
    let maybe_delta = PartialRowDelta::from_rows(&before, &after, &key_columns);
    assert!(maybe_delta.is_some());
    let partial = maybe_delta.unwrap();
    assert_eq!(partial.column_indices, vec![0, 1, 3]);
    assert_eq!(
        partial.old_values,
        vec![SqlValue::Integer(1), text("Alice"), text("active")]
    );
    assert_eq!(
        partial.new_values,
        vec![SqlValue::Integer(1), text("Bob"), text("inactive")]
    );
}
/// Comparing a row with itself is expected to produce no partial delta.
#[test]
fn test_partial_row_delta_from_rows_no_changes() {
    use vibesql_types::SqlValue;
    let name = SqlValue::Varchar(arcstr::ArcStr::from("Alice"));
    let unchanged = crate::Row { values: vec![SqlValue::Integer(1), name] };
    let key_columns = vec![0];
    let maybe_delta = PartialRowDelta::from_rows(&unchanged, &unchanged, &key_columns);
    assert!(maybe_delta.is_none(), "Should return None when rows are identical");
}
/// When only the primary-key column changes, the partial delta is expected to
/// list that single column with its old and new value.
#[test]
fn test_partial_row_delta_from_rows_pk_column_changed() {
    use vibesql_types::SqlValue;
    let alice_with_id = |id| crate::Row {
        values: vec![SqlValue::Integer(id), SqlValue::Varchar(arcstr::ArcStr::from("Alice"))],
    };
    let before = alice_with_id(1);
    let after = alice_with_id(2);
    let key_columns = vec![0];
    let maybe_delta = PartialRowDelta::from_rows(&before, &after, &key_columns);
    assert!(maybe_delta.is_some());
    let partial = maybe_delta.unwrap();
    assert_eq!(partial.column_indices, vec![0]);
    assert_eq!(partial.old_values, vec![SqlValue::Integer(1)]);
    assert_eq!(partial.new_values, vec![SqlValue::Integer(2)]);
}
/// A column going from a string value to `Null` is expected to appear in the
/// partial delta with `Null` as its new value.
#[test]
fn test_partial_row_delta_from_rows_null_handling() {
    use vibesql_types::SqlValue;
    let before = crate::Row {
        values: vec![SqlValue::Integer(1), SqlValue::Varchar(arcstr::ArcStr::from("Alice"))],
    };
    let after = crate::Row { values: vec![SqlValue::Integer(1), SqlValue::Null] };
    let key_columns = vec![0];
    let maybe_delta = PartialRowDelta::from_rows(&before, &after, &key_columns);
    assert!(maybe_delta.is_some());
    let partial = maybe_delta.unwrap();
    assert_eq!(partial.column_indices, vec![0, 1]);
    assert_eq!(partial.new_values, vec![SqlValue::Integer(1), SqlValue::Null]);
}
/// With primary-key columns 0 and 1 and a change in column 2, the partial
/// delta is expected to cover all three columns.
#[test]
fn test_partial_row_delta_from_rows_composite_pk() {
    use vibesql_types::SqlValue;
    let text = |s: &str| SqlValue::Varchar(arcstr::ArcStr::from(s));
    let keyed_values =
        |label: &str| vec![SqlValue::Integer(1), SqlValue::Integer(100), text(label)];
    let before = crate::Row { values: keyed_values("old") };
    let after = crate::Row { values: keyed_values("new") };
    let key_columns = vec![0, 1];
    let maybe_delta = PartialRowDelta::from_rows(&before, &after, &key_columns);
    assert!(maybe_delta.is_some());
    let partial = maybe_delta.unwrap();
    assert_eq!(partial.column_indices, vec![0, 1, 2]);
    assert_eq!(partial.old_values, keyed_values("old"));
    assert_eq!(partial.new_values, keyed_values("new"));
}
/// Rows with different column counts (2 vs 3) are expected to produce no
/// partial delta.
#[test]
fn test_partial_row_delta_from_rows_different_column_count() {
    use vibesql_types::SqlValue;
    let text = |s: &str| SqlValue::Varchar(arcstr::ArcStr::from(s));
    let narrow = crate::Row { values: vec![SqlValue::Integer(1), text("Alice")] };
    let wide = crate::Row {
        values: vec![SqlValue::Integer(1), text("Alice"), SqlValue::Integer(100)],
    };
    let key_columns = vec![0];
    let maybe_delta = PartialRowDelta::from_rows(&narrow, &wide, &key_columns);
    assert!(maybe_delta.is_none(), "Should return None when column counts differ");
}
/// `subscription_id()` on a `Partial` update is expected to return the id the
/// update was built with.
#[test]
fn test_subscription_update_partial_subscription_id() {
    let expected_id = SubscriptionId::new();
    let partial =
        SubscriptionUpdate::Partial { subscription_id: expected_id, updates: Vec::new() };
    assert_eq!(partial.subscription_id(), expected_id);
}
}