use std::sync::Arc;
use fsqlite_types::SqliteValue;
/// A single-column update recorded on a [`HotRowDeltaChain`].
///
/// Nodes form a singly linked list running from the newest update to the
/// oldest through `prev`. Each link is an [`Arc`], so a tail of the list can
/// be referenced from more than one place (for example by cloned chains).
#[derive(Debug)]
pub struct DeltaNode {
    /// Index of the column in the base row that this delta overwrites.
    pub column_idx: u16,
    /// Value the column holds once this delta is applied.
    pub new_value: SqliteValue,
    /// The delta recorded immediately before this one, or `None` for the
    /// oldest delta in the chain.
    pub prev: Option<Arc<Self>>,
}

impl Drop for DeltaNode {
    /// Releases the tail of the list with a loop instead of recursion.
    ///
    /// The compiler-generated drop glue would drop `prev`, which drops the
    /// node behind it, and so on: one stack frame per node. For a long chain
    /// that this node owns exclusively, that recursion can exhaust the stack.
    /// Here each uniquely-owned predecessor is detached from its own `prev`
    /// before it is dropped, so every individual drop is shallow.
    fn drop(&mut self) {
        let mut next = self.prev.take();
        while let Some(shared) = next {
            next = match Arc::try_unwrap(shared) {
                // Sole owner: steal the predecessor link so that `node` is
                // dropped with `prev == None` at the end of this arm.
                Ok(mut node) => node.prev.take(),
                // Another handle still references the rest of the list; that
                // handle is now responsible for releasing it.
                Err(_) => None,
            };
        }
    }
}
/// A row represented as a shared base image plus a list of column deltas.
///
/// [`apply_update`] pushes a new [`DeltaNode`] onto `head`, and
/// [`materialize`] overlays the deltas on `base` to produce the current row.
/// Cloning a chain clones the `Arc` handles, not the row values or the nodes.
#[derive(Debug, Clone)]
pub struct HotRowDeltaChain {
/// Base row image that the deltas are layered on top of.
pub base: Arc<Vec<SqliteValue>>,
/// Most recently recorded delta; `None` for a chain built by `new` that has
/// not received an update yet.
pub head: Option<Arc<DeltaNode>>,
/// Count of deltas pushed by [`apply_update`] (it saturates rather than
/// wrapping). The field is public, so nothing here stops a caller from
/// setting it to a value that disagrees with the actual list length.
pub depth: u32,
}
impl HotRowDeltaChain {
    /// Creates a chain over `base` with no deltas recorded and a depth of zero.
    #[must_use]
    pub fn new(base: Arc<Vec<SqliteValue>>) -> Self {
        let head = None;
        let depth = 0;
        Self { base, head, depth }
    }

    /// Returns the number of columns in the base row.
    ///
    /// Only `base` is consulted, so the result does not change as deltas are
    /// added to the chain.
    #[must_use]
    pub fn column_count(&self) -> usize {
        let columns: &[SqliteValue] = self.base.as_slice();
        columns.len()
    }
}
/// Records that column `column_idx` of the row now holds `new_value`.
///
/// The update becomes the new head of the delta list, with the previous head
/// linked behind it, and `chain.depth` is incremented (saturating at
/// `u32::MAX`). The base row is not touched.
///
/// # Panics
///
/// In builds with debug assertions enabled, panics if `column_idx` is not a
/// valid index into the base row. Without debug assertions the index is not
/// checked here and the delta is recorded regardless.
pub fn apply_update(chain: &mut HotRowDeltaChain, column_idx: u16, new_value: SqliteValue) {
    debug_assert!(
        usize::from(column_idx) < chain.base.len(),
        "delta_chain::apply_update column_idx={column_idx} out of range for base len={}",
        chain.base.len()
    );

    // Detach the current head so the new node can take ownership of it.
    let older = chain.head.take();
    let newest = DeltaNode {
        column_idx,
        new_value,
        prev: older,
    };
    chain.head = Some(Arc::new(newest));
    chain.depth = chain.depth.saturating_add(1);
}
/// Builds the current row by overlaying the chain's deltas on its base row.
///
/// The delta list is walked from the newest node towards the oldest. The
/// first delta seen for a column is the most recent one, so it wins and any
/// older delta for that column is skipped. Deltas whose `column_idx` does not
/// fall inside the base row are ignored.
///
/// The walk stops as soon as every column has received its newest value, so
/// older nodes that could no longer affect the result are never visited.
///
/// Returns a freshly allocated row with the same length as the base row; the
/// chain itself is not modified.
#[must_use]
pub fn materialize(chain: &HotRowDeltaChain) -> Vec<SqliteValue> {
    let mut row: Vec<SqliteValue> = (*chain.base).clone();

    // Number of columns that have not yet been overwritten by a delta.
    let mut unresolved = row.len();
    let mut resolved = vec![false; unresolved];

    let mut cursor = chain.head.as_deref();
    while unresolved > 0 {
        let node = match cursor {
            Some(node) => node,
            None => break,
        };

        let idx = usize::from(node.column_idx);
        // `get_mut` doubles as the bounds check for out-of-range deltas.
        if let Some(seen) = resolved.get_mut(idx) {
            if !*seen {
                row[idx] = node.new_value.clone();
                *seen = true;
                unresolved -= 1;
            }
        }

        cursor = node.prev.as_deref();
    }

    row
}
/// Reports whether the chain has accumulated at least `threshold` deltas.
///
/// The decision is based solely on `chain.depth`; a `threshold` of zero is
/// therefore satisfied by every chain, including one with no deltas.
#[must_use]
pub fn should_consolidate(chain: &HotRowDeltaChain, threshold: u32) -> bool {
    let recorded = chain.depth;
    threshold <= recorded
}
/// Flattens the chain into a new shared base row.
///
/// The returned row is the result of [`materialize`] wrapped in a fresh
/// `Arc`. `chain` is only read; starting a new chain from the returned base
/// (for example with `HotRowDeltaChain::new`) is left to the caller.
#[must_use]
pub fn consolidate(chain: &HotRowDeltaChain) -> Arc<Vec<SqliteValue>> {
    let flattened = materialize(chain);
    Arc::new(flattened)
}
// Unit and property tests for the delta chain: materialization, depth
// accounting, the consolidation threshold, and equivalence with a plain
// in-place row rewrite.
#[cfg(test)]
mod tests {
use super::*;
use proptest::prelude::*;
// Builds a shared base row of `n` columns holding Integer(0), Integer(1), ...
fn base_row(n: usize) -> Arc<Vec<SqliteValue>> {
Arc::new(
(0..n)
.map(|i| SqliteValue::Integer(i as i64))
.collect::<Vec<_>>(),
)
}
// Compares two rows element by element. Values are equal only when they are
// the same variant with the same payload; floats are compared by bit pattern,
// so identical NaN bit patterns match and 0.0 differs from -0.0.
fn values_eq(a: &[SqliteValue], b: &[SqliteValue]) -> bool {
if a.len() != b.len() {
return false;
}
a.iter().zip(b.iter()).all(|(x, y)| match (x, y) {
(SqliteValue::Null, SqliteValue::Null) => true,
(SqliteValue::Integer(xi), SqliteValue::Integer(yi)) => xi == yi,
(SqliteValue::Float(xf), SqliteValue::Float(yf)) => xf.to_bits() == yf.to_bits(),
(SqliteValue::Text(xt), SqliteValue::Text(yt)) => xt.as_str() == yt.as_str(),
(SqliteValue::Blob(xb), SqliteValue::Blob(yb)) => xb.as_ref() == yb.as_ref(),
// Any pairing of different variants is unequal.
_ => false,
})
}
// A chain with no deltas materializes to its base row, has depth 0, and does
// not ask for consolidation at a threshold of 1.
#[test]
fn empty_chain_materializes_to_base() {
let base = base_row(4);
let chain = HotRowDeltaChain::new(Arc::clone(&base));
let out = materialize(&chain);
assert!(values_eq(&out, base.as_slice()));
assert_eq!(chain.depth, 0);
assert!(!should_consolidate(&chain, 1));
}
// column_count reports the base width and is unaffected by updates, while
// depth counts every update, including repeated writes to one column.
#[test]
fn column_count_reflects_base_width_and_is_stable_under_updates() {
let base = base_row(3);
let mut chain = HotRowDeltaChain::new(base);
assert_eq!(chain.column_count(), 3);
apply_update(&mut chain, 1, SqliteValue::Integer(42));
apply_update(&mut chain, 1, SqliteValue::Integer(43));
apply_update(&mut chain, 2, SqliteValue::Integer(7));
assert_eq!(chain.column_count(), 3);
assert_eq!(chain.depth, 3);
}
// One update changes only the targeted column in the materialized row.
#[test]
fn single_update_visible_in_materialize() {
let base = base_row(3);
let mut chain = HotRowDeltaChain::new(Arc::clone(&base));
apply_update(&mut chain, 1, SqliteValue::Integer(999));
let out = materialize(&chain);
assert!(matches!(out[0], SqliteValue::Integer(0)));
assert!(matches!(out[1], SqliteValue::Integer(999)));
assert!(matches!(out[2], SqliteValue::Integer(2)));
assert_eq!(chain.depth, 1);
}
// When one column is written several times, the last write is the one that
// shows up in the materialized row; depth still counts all three writes.
#[test]
fn repeated_updates_same_column_latest_wins() {
let base = base_row(3);
let mut chain = HotRowDeltaChain::new(Arc::clone(&base));
apply_update(&mut chain, 0, SqliteValue::Integer(10));
apply_update(&mut chain, 0, SqliteValue::Integer(20));
apply_update(&mut chain, 0, SqliteValue::Integer(30));
let out = materialize(&chain);
assert!(matches!(out[0], SqliteValue::Integer(30)));
assert!(matches!(out[1], SqliteValue::Integer(1)));
assert!(matches!(out[2], SqliteValue::Integer(2)));
assert_eq!(chain.depth, 3);
}
// Updates to distinct columns are all applied (including a write of Null),
// and the untouched column keeps its base value.
#[test]
fn updates_to_different_columns_all_applied() {
let base = base_row(4);
let mut chain = HotRowDeltaChain::new(Arc::clone(&base));
apply_update(&mut chain, 0, SqliteValue::Integer(100));
apply_update(&mut chain, 2, SqliteValue::Integer(200));
apply_update(&mut chain, 3, SqliteValue::Null);
let out = materialize(&chain);
assert!(matches!(out[0], SqliteValue::Integer(100)));
assert!(matches!(out[1], SqliteValue::Integer(1)));
assert!(matches!(out[2], SqliteValue::Integer(200)));
assert!(matches!(out[3], SqliteValue::Null));
assert_eq!(chain.depth, 3);
}
// A chain rebuilt from the consolidated base materializes to the same row as
// the original chain and starts with no deltas.
#[test]
fn consolidate_preserves_logical_row() {
let base = base_row(5);
let mut chain = HotRowDeltaChain::new(Arc::clone(&base));
apply_update(&mut chain, 1, SqliteValue::Integer(11));
apply_update(&mut chain, 4, SqliteValue::Integer(44));
apply_update(&mut chain, 1, SqliteValue::Integer(111));
let before = materialize(&chain);
let new_base = consolidate(&chain);
let rebuilt = HotRowDeltaChain::new(Arc::clone(&new_base));
let after = materialize(&rebuilt);
assert!(values_eq(&before, &after));
assert_eq!(rebuilt.depth, 0);
assert!(rebuilt.head.is_none());
}
// should_consolidate is false while depth is below the threshold and becomes
// true once depth reaches it (the comparison is inclusive).
#[test]
fn should_consolidate_threshold_boundary() {
let base = base_row(2);
let mut chain = HotRowDeltaChain::new(Arc::clone(&base));
assert!(!should_consolidate(&chain, 3));
apply_update(&mut chain, 0, SqliteValue::Integer(1));
apply_update(&mut chain, 0, SqliteValue::Integer(2));
// depth == 2, still below the threshold of 3.
assert!(!should_consolidate(&chain, 3));
apply_update(&mut chain, 0, SqliteValue::Integer(3));
// depth == 3 satisfies thresholds of both 3 and 2.
assert!(should_consolidate(&chain, 3));
assert!(should_consolidate(&chain, 2));
}
// 500 successive updates to the only column: the final value is visible and
// depth equals the number of updates.
#[test]
fn deep_chain_materializes_correctly() {
let base = base_row(1);
let mut chain = HotRowDeltaChain::new(Arc::clone(&base));
for v in 0..500_i64 {
apply_update(&mut chain, 0, SqliteValue::Integer(v));
}
let out = materialize(&chain);
assert!(matches!(out[0], SqliteValue::Integer(499)));
assert_eq!(chain.depth, 500);
}
proptest! {
// Property: for a random sequence of 1 to 99 integer updates over 10 columns,
// the materialized chain always equals a plain vector that is overwritten in
// place, both after every update and after every consolidation.
#[test]
fn property_random_updates_match_naive_rewrite(
updates in proptest::collection::vec(
(0u16..10, any::<i64>()),
1..100,
),
) {
const COLUMNS: usize = 10;
const THRESHOLD: u32 = 5;
let base = base_row(COLUMNS);
let mut chain = HotRowDeltaChain::new(Arc::clone(&base));
// Reference model: the row as it would look if updated in place.
let mut ground: Vec<SqliteValue> = (*base).clone();
for (col, val) in updates {
apply_update(&mut chain, col, SqliteValue::Integer(val));
ground[col as usize] = SqliteValue::Integer(val);
let materialized = materialize(&chain);
prop_assert!(values_eq(&materialized, &ground));
// Once the chain reaches THRESHOLD deltas, fold it into a fresh base and
// confirm the logical row is unchanged.
if should_consolidate(&chain, THRESHOLD) {
let new_base = consolidate(&chain);
chain = HotRowDeltaChain::new(new_base);
let after = materialize(&chain);
prop_assert!(values_eq(&after, &ground));
}
}
let final_row = materialize(&chain);
prop_assert!(values_eq(&final_row, &ground));
}
}
}