use std::sync::Arc;
use fsqlite_types::TypeAffinity;
use fsqlite_types::glossary::{ColumnIdx, IndexId, IntentOpKind, RebaseExpr, RowId, TableId};
use fsqlite_types::record::{parse_record, serialize_record};
use fsqlite_types::value::SqliteValue;
const BEAD_ID: &str = "bd-zj56";
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IndexRegenError {
UniqueConstraintViolation {
index_id: IndexId,
conflicting_rowid: RowId,
},
MalformedRecord,
ExprEvalError { detail: &'static str },
}
impl std::fmt::Display for IndexRegenError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UniqueConstraintViolation {
index_id,
conflicting_rowid,
} => write!(
f,
"UNIQUE constraint failed: index {} has conflicting rowid {}",
index_id.get(),
conflicting_rowid.get()
),
Self::MalformedRecord => write!(f, "malformed record bytes"),
Self::ExprEvalError { detail } => write!(f, "expression eval error: {detail}"),
}
}
}
impl std::error::Error for IndexRegenError {}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub enum Collation {
#[default]
Binary,
Nocase,
Rtrim,
}
#[derive(Debug, Clone)]
pub enum IndexKeyPart {
Column {
col_idx: ColumnIdx,
affinity: TypeAffinity,
collation: Collation,
},
Expression {
expr: RebaseExpr,
affinity: TypeAffinity,
collation: Collation,
},
}
#[derive(Debug, Clone)]
pub struct IndexDef {
pub index_id: IndexId,
pub table_id: TableId,
pub unique: bool,
pub key_parts: Vec<IndexKeyPart>,
pub where_predicate: Option<RebaseExpr>,
pub table_column_affinities: Vec<TypeAffinity>,
}
pub trait UniqueChecker {
fn check_unique(
&self,
index_id: IndexId,
key_bytes: &[u8],
exclude_rowid: RowId,
) -> Option<RowId>;
}
pub struct NoOpUniqueChecker;
impl UniqueChecker for NoOpUniqueChecker {
fn check_unique(
&self,
_index_id: IndexId,
_key_bytes: &[u8],
_exclude_rowid: RowId,
) -> Option<RowId> {
None
}
}
pub fn eval_rebase_expr(
expr: &RebaseExpr,
row: &[SqliteValue],
) -> Result<SqliteValue, IndexRegenError> {
match expr {
RebaseExpr::ColumnRef(idx) => {
let i = idx.get() as usize;
Ok(row.get(i).cloned().unwrap_or(SqliteValue::Null))
}
RebaseExpr::Literal(val) => Ok(val.clone()),
RebaseExpr::UnaryOp { op, operand } => {
let v = eval_rebase_expr(operand, row)?;
Ok(eval_unary_op(*op, v))
}
RebaseExpr::BinaryOp { op, left, right } => {
let l = eval_rebase_expr(left, row)?;
let r = eval_rebase_expr(right, row)?;
Ok(eval_binary_op(*op, l, r))
}
RebaseExpr::FunctionCall { name, args } => {
let evaluated: Result<Vec<_>, _> =
args.iter().map(|a| eval_rebase_expr(a, row)).collect();
eval_function(name, &evaluated?)
}
RebaseExpr::Cast { expr, type_name } => {
let v = eval_rebase_expr(expr, row)?;
let affinity = TypeAffinity::from_type_name(type_name);
Ok(v.apply_affinity(affinity))
}
RebaseExpr::Case {
operand,
when_clauses,
else_clause,
} => eval_case(
operand.as_deref(),
when_clauses,
else_clause.as_deref(),
row,
),
RebaseExpr::Coalesce(exprs) => {
for e in exprs {
let v = eval_rebase_expr(e, row)?;
if !matches!(v, SqliteValue::Null) {
return Ok(v);
}
}
Ok(SqliteValue::Null)
}
RebaseExpr::NullIf { left, right } => {
let l = eval_rebase_expr(left, row)?;
let r = eval_rebase_expr(right, row)?;
if sqlite_values_equal(&l, &r) {
Ok(SqliteValue::Null)
} else {
Ok(l)
}
}
RebaseExpr::Concat { left, right } => {
let l = eval_rebase_expr(left, row)?;
let r = eval_rebase_expr(right, row)?;
Ok(sqlite_concat(&l, &r))
}
}
}
fn eval_case(
operand: Option<&RebaseExpr>,
when_clauses: &[(RebaseExpr, RebaseExpr)],
else_clause: Option<&RebaseExpr>,
row: &[SqliteValue],
) -> Result<SqliteValue, IndexRegenError> {
if let Some(op_expr) = operand {
let op_val = eval_rebase_expr(op_expr, row)?;
for (when_expr, then_expr) in when_clauses {
let when_val = eval_rebase_expr(when_expr, row)?;
if sqlite_values_equal(&op_val, &when_val) {
return eval_rebase_expr(then_expr, row);
}
}
} else {
for (when_expr, then_expr) in when_clauses {
let when_val = eval_rebase_expr(when_expr, row)?;
if sqlite_value_is_truthy(&when_val) {
return eval_rebase_expr(then_expr, row);
}
}
}
if let Some(else_expr) = else_clause {
eval_rebase_expr(else_expr, row)
} else {
Ok(SqliteValue::Null)
}
}
#[allow(clippy::float_cmp)]
fn sqlite_values_equal(a: &SqliteValue, b: &SqliteValue) -> bool {
match (a, b) {
(SqliteValue::Integer(x), SqliteValue::Integer(y)) => x == y,
(SqliteValue::Float(x), SqliteValue::Float(y)) => x == y,
#[allow(clippy::cast_precision_loss)]
(SqliteValue::Integer(x), SqliteValue::Float(y))
| (SqliteValue::Float(y), SqliteValue::Integer(x)) => *x as f64 == *y,
(SqliteValue::Text(x), SqliteValue::Text(y)) => x == y,
(SqliteValue::Blob(x), SqliteValue::Blob(y)) => x == y,
_ => false,
}
}
fn sqlite_value_is_truthy(v: &SqliteValue) -> bool {
match v {
SqliteValue::Null => false,
v => v.to_integer() != 0,
}
}
fn sqlite_concat(a: &SqliteValue, b: &SqliteValue) -> SqliteValue {
if matches!(a, SqliteValue::Null) || matches!(b, SqliteValue::Null) {
return SqliteValue::Null;
}
if let (SqliteValue::Blob(ba), SqliteValue::Blob(bb)) = (a, b) {
let mut res = Vec::with_capacity(ba.len() + bb.len());
res.extend_from_slice(ba);
res.extend_from_slice(bb);
return SqliteValue::Blob(res.into());
}
let sa = a.to_text();
let sb = b.to_text();
SqliteValue::Text(format!("{sa}{sb}").into())
}
use fsqlite_types::glossary::{RebaseBinaryOp, RebaseUnaryOp};
fn eval_unary_op(op: RebaseUnaryOp, v: SqliteValue) -> SqliteValue {
match op {
RebaseUnaryOp::Negate => {
if matches!(v, SqliteValue::Null) {
return SqliteValue::Null;
}
match v.apply_affinity(TypeAffinity::Numeric) {
SqliteValue::Integer(i) => match i.checked_neg() {
Some(val) => SqliteValue::Integer(val),
None => SqliteValue::Float(-(i as f64)),
},
SqliteValue::Float(f) => SqliteValue::Float(-f),
_ => SqliteValue::Integer(0),
}
}
RebaseUnaryOp::BitwiseNot => match v {
SqliteValue::Integer(i) => SqliteValue::Integer(!i),
SqliteValue::Null => SqliteValue::Null,
other => {
let coerced = other.apply_affinity(TypeAffinity::Integer);
if let SqliteValue::Integer(i) = coerced {
SqliteValue::Integer(!i)
} else {
SqliteValue::Integer(!0)
}
}
},
RebaseUnaryOp::Not => {
if matches!(v, SqliteValue::Null) {
SqliteValue::Null
} else {
SqliteValue::Integer(i64::from(!sqlite_value_is_truthy(&v)))
}
}
}
}
#[allow(clippy::cast_possible_truncation, clippy::cast_precision_loss)]
fn eval_binary_op(op: RebaseBinaryOp, left: SqliteValue, right: SqliteValue) -> SqliteValue {
if matches!(left, SqliteValue::Null) || matches!(right, SqliteValue::Null) {
return SqliteValue::Null;
}
let left = left.apply_affinity(TypeAffinity::Numeric);
let right = right.apply_affinity(TypeAffinity::Numeric);
match op {
RebaseBinaryOp::Add => left.sql_add(&right),
RebaseBinaryOp::Subtract => left.sql_sub(&right),
RebaseBinaryOp::Multiply => left.sql_mul(&right),
RebaseBinaryOp::Divide => numeric_div(&left, &right),
RebaseBinaryOp::Remainder => numeric_rem(&left, &right),
RebaseBinaryOp::BitwiseAnd => integer_bitop(&left, &right, |a, b| a & b),
RebaseBinaryOp::BitwiseOr => integer_bitop(&left, &right, |a, b| a | b),
#[allow(clippy::cast_sign_loss)]
RebaseBinaryOp::ShiftLeft => integer_bitop(&left, &right, |a, b| {
let shift = b.unsigned_abs() as u32;
if shift >= 64 {
if b < 0 && a < 0 { -1 } else { 0 }
} else if b < 0 {
a >> shift
} else {
a << shift
}
}),
#[allow(clippy::cast_sign_loss)]
RebaseBinaryOp::ShiftRight => integer_bitop(&left, &right, |a, b| {
let shift = b.unsigned_abs() as u32;
if shift >= 64 {
if b >= 0 && a < 0 { -1 } else { 0 }
} else if b < 0 {
a << shift
} else {
a >> shift
}
}),
}
}
fn numeric_div(l: &SqliteValue, r: &SqliteValue) -> SqliteValue {
if let (SqliteValue::Integer(a), SqliteValue::Integer(b)) = (l, r) {
if *b == 0 {
return SqliteValue::Null;
}
SqliteValue::Integer(a.wrapping_div(*b))
} else {
let fb = r.to_float();
if fb == 0.0 {
return SqliteValue::Null;
}
SqliteValue::from(l.to_float() / fb)
}
}
fn numeric_rem(l: &SqliteValue, r: &SqliteValue) -> SqliteValue {
if let (SqliteValue::Integer(a), SqliteValue::Integer(b)) = (l, r) {
if *b == 0 {
return SqliteValue::Null;
}
SqliteValue::Integer(a.wrapping_rem(*b))
} else {
let ia = l.to_integer();
let ib = r.to_integer();
if ib == 0 {
return SqliteValue::Null;
}
let result = ia.checked_rem(ib).unwrap_or_default();
SqliteValue::Float(result as f64)
}
}
fn integer_bitop(l: &SqliteValue, r: &SqliteValue, f: impl FnOnce(i64, i64) -> i64) -> SqliteValue {
SqliteValue::Integer(f(l.to_integer(), r.to_integer()))
}
#[allow(clippy::too_many_lines)]
fn eval_function(name: &str, args: &[SqliteValue]) -> Result<SqliteValue, IndexRegenError> {
match name.to_ascii_lowercase().as_str() {
"abs" => {
if let Some(v) = args.first() {
if matches!(v, SqliteValue::Null) {
return Ok(SqliteValue::Null);
}
let coerced = v.clone().apply_affinity(TypeAffinity::Numeric);
Ok(match coerced {
SqliteValue::Integer(i) => match i.checked_abs() {
Some(val) => SqliteValue::Integer(val),
None => SqliteValue::Float((i as f64).abs()),
},
SqliteValue::Float(f) => SqliteValue::Float(f.abs()),
_ => SqliteValue::Integer(0),
})
} else {
Ok(SqliteValue::Null)
}
}
"lower" => {
if let Some(SqliteValue::Text(s)) = args.first() {
Ok(SqliteValue::Text(s.to_ascii_lowercase().into()))
} else {
Ok(args.first().cloned().unwrap_or(SqliteValue::Null))
}
}
"upper" => {
if let Some(SqliteValue::Text(s)) = args.first() {
Ok(SqliteValue::Text(s.to_ascii_uppercase().into()))
} else {
Ok(args.first().cloned().unwrap_or(SqliteValue::Null))
}
}
"length" =>
{
#[allow(clippy::cast_possible_wrap)]
if let Some(v) = args.first() {
Ok(match v {
SqliteValue::Null => SqliteValue::Null,
SqliteValue::Text(s) => SqliteValue::Integer(s.chars().count() as i64),
SqliteValue::Blob(b) => SqliteValue::Integer(b.len() as i64),
SqliteValue::Integer(n) => SqliteValue::Integer(n.to_string().len() as i64),
v @ SqliteValue::Float(_) => SqliteValue::Integer(v.to_text().len() as i64),
})
} else {
Ok(SqliteValue::Null)
}
}
"typeof" => {
if let Some(v) = args.first() {
Ok(SqliteValue::Text(Arc::from(match v {
SqliteValue::Null => "null",
SqliteValue::Integer(_) => "integer",
SqliteValue::Float(_) => "real",
SqliteValue::Text(_) => "text",
SqliteValue::Blob(_) => "blob",
})))
} else {
Ok(SqliteValue::Null)
}
}
"ifnull" => {
if args.len() >= 2 {
if matches!(args[0], SqliteValue::Null) {
Ok(args[1].clone())
} else {
Ok(args[0].clone())
}
} else {
Ok(SqliteValue::Null)
}
}
"max" => {
let mut best: Option<&SqliteValue> = None;
for a in args {
if matches!(a, SqliteValue::Null) {
return Ok(SqliteValue::Null);
}
if let Some(cur) = best {
if sqlite_value_compare(a, cur) == std::cmp::Ordering::Greater {
best = Some(a);
}
} else {
best = Some(a);
}
}
Ok(best.cloned().unwrap_or(SqliteValue::Null))
}
"min" => {
let mut best: Option<&SqliteValue> = None;
for a in args {
if matches!(a, SqliteValue::Null) {
return Ok(SqliteValue::Null);
}
if let Some(cur) = best {
if sqlite_value_compare(a, cur) == std::cmp::Ordering::Less {
best = Some(a);
}
} else {
best = Some(a);
}
}
Ok(best.cloned().unwrap_or(SqliteValue::Null))
}
_ => Err(IndexRegenError::ExprEvalError {
detail: "unsupported function in rebase expression",
}),
}
}
fn sqlite_value_compare(a: &SqliteValue, b: &SqliteValue) -> std::cmp::Ordering {
use std::cmp::Ordering;
fn type_order(v: &SqliteValue) -> u8 {
match v {
SqliteValue::Null => 0,
SqliteValue::Integer(_) | SqliteValue::Float(_) => 1,
SqliteValue::Text(_) => 2,
SqliteValue::Blob(_) => 3,
}
}
let ta = type_order(a);
let tb = type_order(b);
if ta != tb {
return ta.cmp(&tb);
}
match (a, b) {
(SqliteValue::Integer(x), SqliteValue::Integer(y)) => x.cmp(y),
(SqliteValue::Float(x), SqliteValue::Float(y)) => x.total_cmp(y),
#[allow(clippy::cast_precision_loss)]
(SqliteValue::Integer(x), SqliteValue::Float(y)) => (*x as f64).total_cmp(y),
#[allow(clippy::cast_precision_loss)]
(SqliteValue::Float(x), SqliteValue::Integer(y)) => x.total_cmp(&(*y as f64)),
(SqliteValue::Text(x), SqliteValue::Text(y)) => x.cmp(y),
(SqliteValue::Blob(x), SqliteValue::Blob(y)) => x.cmp(y),
_ => Ordering::Equal,
}
}
pub fn compute_index_key(
index_def: &IndexDef,
row: &[SqliteValue],
) -> Result<Vec<u8>, IndexRegenError> {
let mut key_values = Vec::with_capacity(index_def.key_parts.len());
for part in &index_def.key_parts {
let (val, affinity, collation) = match part {
IndexKeyPart::Column {
col_idx,
affinity,
collation,
} => {
let i = col_idx.get() as usize;
let v = row.get(i).cloned().unwrap_or(SqliteValue::Null);
(v, *affinity, *collation)
}
IndexKeyPart::Expression {
expr,
affinity,
collation,
} => {
let v = eval_rebase_expr(expr, row)?;
(v, *affinity, *collation)
}
};
let coerced = val.apply_affinity(affinity);
let final_val = apply_collation(coerced, collation);
key_values.push(final_val);
}
Ok(serialize_record(&key_values))
}
fn apply_collation(val: SqliteValue, collation: Collation) -> SqliteValue {
match collation {
Collation::Binary => val,
Collation::Nocase => {
if let SqliteValue::Text(s) = val {
SqliteValue::Text(s.to_ascii_uppercase().into())
} else {
val
}
}
Collation::Rtrim => {
if let SqliteValue::Text(s) = val {
SqliteValue::Text(Arc::from(s.trim_end_matches(' ')))
} else {
val
}
}
}
}
fn row_participates(index_def: &IndexDef, row: &[SqliteValue]) -> Result<bool, IndexRegenError> {
if let Some(ref predicate) = index_def.where_predicate {
let result = eval_rebase_expr(predicate, row)?;
Ok(sqlite_value_is_truthy(&result))
} else {
Ok(true)
}
}
pub fn apply_column_updates(
base_row: &[SqliteValue],
column_updates: &[(ColumnIdx, RebaseExpr)],
table_column_affinities: &[TypeAffinity],
) -> Result<Vec<SqliteValue>, IndexRegenError> {
let mut updated = base_row.to_vec();
for (col_idx, expr) in column_updates {
let i = col_idx.get() as usize;
if i >= updated.len() {
updated.resize(i + 1, SqliteValue::Null);
}
let new_val = eval_rebase_expr(expr, base_row)?;
let affinity = table_column_affinities
.get(i)
.copied()
.unwrap_or(TypeAffinity::Blob);
updated[i] = new_val.apply_affinity(affinity);
}
Ok(updated)
}
#[derive(Debug, Clone, PartialEq)]
pub struct IndexRegenOps {
pub ops: Vec<IntentOpKind>,
}
pub fn discard_stale_index_ops(
ops: &[IntentOpKind],
table_id: TableId,
rowid: RowId,
indexes: &[IndexDef],
) -> Vec<IntentOpKind> {
let table_index_ids: std::collections::HashSet<u32> = indexes
.iter()
.filter(|idx| idx.table_id == table_id)
.map(|idx| idx.index_id.get())
.collect();
ops.iter()
.filter(|op| {
match op {
IntentOpKind::IndexInsert {
index, rowid: r, ..
}
| IntentOpKind::IndexDelete {
index, rowid: r, ..
} => {
!(table_index_ids.contains(&index.get()) && *r == rowid)
}
_ => true,
}
})
.cloned()
.collect()
}
#[allow(clippy::too_many_lines)]
pub fn regenerate_index_ops(
base_record: &[u8],
column_updates: &[(ColumnIdx, RebaseExpr)],
indexes: &[IndexDef],
rowid: RowId,
unique_checker: &dyn UniqueChecker,
) -> Result<IndexRegenOps, IndexRegenError> {
let _ = BEAD_ID;
let base_row = parse_record(base_record).ok_or(IndexRegenError::MalformedRecord)?;
let table_col_affinities = indexes.first().map_or(&[] as &[TypeAffinity], |idx| {
idx.table_column_affinities.as_slice()
});
let updated_row = apply_column_updates(&base_row, column_updates, table_col_affinities)?;
let mut ops = Vec::new();
for index_def in indexes {
let base_participates = row_participates(index_def, &base_row)?;
let updated_participates = row_participates(index_def, &updated_row)?;
match (base_participates, updated_participates) {
(true, false) => {
let old_key = compute_index_key(index_def, &base_row)?;
ops.push(IntentOpKind::IndexDelete {
index: index_def.index_id,
key: old_key,
rowid,
});
}
(false, true) => {
let new_key = compute_index_key(index_def, &updated_row)?;
if index_def.unique {
let has_null = parse_record(&new_key)
.map(|fields| fields.iter().any(|v| matches!(v, SqliteValue::Null)))
.unwrap_or(false);
if !has_null {
if let Some(conflicting) =
unique_checker.check_unique(index_def.index_id, &new_key, rowid)
{
return Err(IndexRegenError::UniqueConstraintViolation {
index_id: index_def.index_id,
conflicting_rowid: conflicting,
});
}
}
}
ops.push(IntentOpKind::IndexInsert {
index: index_def.index_id,
key: new_key,
rowid,
});
}
(true, true) => {
let old_key = compute_index_key(index_def, &base_row)?;
let new_key = compute_index_key(index_def, &updated_row)?;
if old_key != new_key {
ops.push(IntentOpKind::IndexDelete {
index: index_def.index_id,
key: old_key,
rowid,
});
if index_def.unique {
let has_null = parse_record(&new_key)
.map(|fields| fields.iter().any(|v| matches!(v, SqliteValue::Null)))
.unwrap_or(false);
if !has_null {
if let Some(conflicting) =
unique_checker.check_unique(index_def.index_id, &new_key, rowid)
{
return Err(IndexRegenError::UniqueConstraintViolation {
index_id: index_def.index_id,
conflicting_rowid: conflicting,
});
}
}
}
ops.push(IntentOpKind::IndexInsert {
index: index_def.index_id,
key: new_key,
rowid,
});
}
}
(false, false) => {
}
}
}
Ok(IndexRegenOps { ops })
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use super::*;
use fsqlite_types::record::serialize_record;
fn ordinary_index(
index_id: u32,
table_id: u32,
columns: &[(u32, TypeAffinity)],
unique: bool,
table_affinities: Vec<TypeAffinity>,
) -> IndexDef {
IndexDef {
index_id: IndexId::new(index_id),
table_id: TableId::new(table_id),
unique,
key_parts: columns
.iter()
.map(|&(col, aff)| IndexKeyPart::Column {
col_idx: ColumnIdx::new(col),
affinity: aff,
collation: Collation::Binary,
})
.collect(),
where_predicate: None,
table_column_affinities: table_affinities,
}
}
fn record_bytes(values: &[SqliteValue]) -> Vec<u8> {
serialize_record(values)
}
#[test]
fn test_index_regen_ordinary_index_key_change() {
let base = record_bytes(&[
SqliteValue::Integer(1), SqliteValue::Text("old".into()), SqliteValue::Integer(42), ]);
let indexes = vec![ordinary_index(
10,
1,
&[(1, TypeAffinity::Text)],
false,
vec![
TypeAffinity::Integer,
TypeAffinity::Text,
TypeAffinity::Integer,
],
)];
let updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Text("new".into())),
)];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &NoOpUniqueChecker)
.unwrap();
assert_eq!(result.ops.len(), 2, "bead_id={BEAD_ID} key_change");
assert!(
matches!(&result.ops[0], IntentOpKind::IndexDelete { index, .. } if index.get() == 10),
"bead_id={BEAD_ID} first op is delete"
);
assert!(
matches!(&result.ops[1], IntentOpKind::IndexInsert { index, .. } if index.get() == 10),
"bead_id={BEAD_ID} second op is insert"
);
}
#[test]
fn test_index_regen_partial_index_participation_change() {
let base = record_bytes(&[SqliteValue::Integer(1), SqliteValue::Integer(0)]);
let indexes = vec![IndexDef {
index_id: IndexId::new(20),
table_id: TableId::new(1),
unique: false,
key_parts: vec![IndexKeyPart::Column {
col_idx: ColumnIdx::new(1),
affinity: TypeAffinity::Integer,
collation: Collation::Binary,
}],
where_predicate: Some(RebaseExpr::ColumnRef(ColumnIdx::new(1))),
table_column_affinities: vec![TypeAffinity::Integer, TypeAffinity::Integer],
}];
let updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Integer(20)),
)];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &NoOpUniqueChecker)
.unwrap();
assert_eq!(result.ops.len(), 1, "bead_id={BEAD_ID} partial_insert_only");
assert!(
matches!(&result.ops[0], IntentOpKind::IndexInsert { index, .. } if index.get() == 20),
"bead_id={BEAD_ID} op is insert"
);
}
#[test]
fn test_index_regen_partial_index_entry_to_entry() {
let base = record_bytes(&[
SqliteValue::Integer(1),
SqliteValue::Integer(5),
SqliteValue::Text("foo".into()),
]);
let indexes = vec![IndexDef {
index_id: IndexId::new(30),
table_id: TableId::new(1),
unique: false,
key_parts: vec![IndexKeyPart::Column {
col_idx: ColumnIdx::new(1),
affinity: TypeAffinity::Integer,
collation: Collation::Binary,
}],
where_predicate: Some(RebaseExpr::ColumnRef(ColumnIdx::new(1))),
table_column_affinities: vec![
TypeAffinity::Integer,
TypeAffinity::Integer,
TypeAffinity::Text,
],
}];
let updates = vec![(
ColumnIdx::new(2),
RebaseExpr::Literal(SqliteValue::Text("bar".into())),
)];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &NoOpUniqueChecker)
.unwrap();
assert!(result.ops.is_empty(), "bead_id={BEAD_ID} no_op_same_key");
}
#[test]
fn test_index_regen_unique_constraint_violation_aborts() {
struct ConflictChecker;
impl UniqueChecker for ConflictChecker {
fn check_unique(
&self,
_index_id: IndexId,
_key_bytes: &[u8],
_exclude_rowid: RowId,
) -> Option<RowId> {
Some(RowId::new(99))
}
}
let base = record_bytes(&[SqliteValue::Integer(1), SqliteValue::Text("alpha".into())]);
let indexes = vec![ordinary_index(
40,
1,
&[(1, TypeAffinity::Text)],
true, vec![TypeAffinity::Integer, TypeAffinity::Text],
)];
let updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Text("beta".into())),
)];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &ConflictChecker);
assert!(result.is_err(), "bead_id={BEAD_ID} unique_violation");
if let Err(IndexRegenError::UniqueConstraintViolation {
index_id,
conflicting_rowid,
}) = result
{
assert_eq!(index_id.get(), 40, "bead_id={BEAD_ID} correct_index");
assert_eq!(
conflicting_rowid.get(),
99,
"bead_id={BEAD_ID} correct_rowid"
);
} else {
panic!("bead_id={BEAD_ID} wrong error type");
}
}
#[test]
fn test_index_regen_expression_index() {
let base = record_bytes(&[SqliteValue::Integer(1), SqliteValue::Text("Hello".into())]);
let indexes = vec![IndexDef {
index_id: IndexId::new(50),
table_id: TableId::new(1),
unique: false,
key_parts: vec![IndexKeyPart::Expression {
expr: RebaseExpr::FunctionCall {
name: "lower".to_owned(),
args: vec![RebaseExpr::ColumnRef(ColumnIdx::new(1))],
},
affinity: TypeAffinity::Text,
collation: Collation::Binary,
}],
where_predicate: None,
table_column_affinities: vec![TypeAffinity::Integer, TypeAffinity::Text],
}];
let updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Text("World".into())),
)];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &NoOpUniqueChecker)
.unwrap();
assert_eq!(result.ops.len(), 2, "bead_id={BEAD_ID} expr_index");
if let IntentOpKind::IndexDelete { key, .. } = &result.ops[0] {
let parsed = parse_record(key).unwrap();
assert_eq!(
parsed,
vec![SqliteValue::Text("hello".into())],
"bead_id={BEAD_ID} old_key_lower"
);
} else {
panic!("bead_id={BEAD_ID} expected delete");
}
if let IntentOpKind::IndexInsert { key, .. } = &result.ops[1] {
let parsed = parse_record(key).unwrap();
assert_eq!(
parsed,
vec![SqliteValue::Text("world".into())],
"bead_id={BEAD_ID} new_key_lower"
);
} else {
panic!("bead_id={BEAD_ID} expected insert");
}
}
#[test]
fn test_index_regen_no_op_when_key_unchanged() {
let base = record_bytes(&[SqliteValue::Integer(1), SqliteValue::Text("foo".into())]);
let indexes = vec![ordinary_index(
60,
1,
&[(0, TypeAffinity::Integer)], false,
vec![TypeAffinity::Integer, TypeAffinity::Text],
)];
let updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Text("bar".into())),
)];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &NoOpUniqueChecker)
.unwrap();
assert!(result.ops.is_empty(), "bead_id={BEAD_ID} no_op_unchanged");
}
#[test]
fn test_index_regen_multiple_indexes_same_table() {
let base = record_bytes(&[
SqliteValue::Integer(1),
SqliteValue::Text("alice".into()),
SqliteValue::Integer(30),
]);
let affinities = vec![
TypeAffinity::Integer,
TypeAffinity::Text,
TypeAffinity::Integer,
];
let indexes = vec![
ordinary_index(70, 1, &[(1, TypeAffinity::Text)], false, affinities.clone()),
ordinary_index(71, 1, &[(2, TypeAffinity::Integer)], false, affinities),
];
let updates = vec![
(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Text("bob".into())),
),
(
ColumnIdx::new(2),
RebaseExpr::Literal(SqliteValue::Integer(40)),
),
];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &NoOpUniqueChecker)
.unwrap();
assert_eq!(result.ops.len(), 4, "bead_id={BEAD_ID} multi_index");
assert!(
matches!(&result.ops[0], IntentOpKind::IndexDelete { index, .. } if index.get() == 70)
);
assert!(
matches!(&result.ops[1], IntentOpKind::IndexInsert { index, .. } if index.get() == 70)
);
assert!(
matches!(&result.ops[2], IntentOpKind::IndexDelete { index, .. } if index.get() == 71)
);
assert!(
matches!(&result.ops[3], IntentOpKind::IndexInsert { index, .. } if index.get() == 71)
);
}
#[test]
fn test_index_regen_overflow_key_handling() {
let large_text = "x".repeat(4000);
let base = record_bytes(&[
SqliteValue::Integer(1),
SqliteValue::Text(large_text.clone().into()),
]);
let indexes = vec![ordinary_index(
80,
1,
&[(1, TypeAffinity::Text)],
false,
vec![TypeAffinity::Integer, TypeAffinity::Text],
)];
let new_large_text = "y".repeat(4000);
let updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Text(new_large_text.clone().into())),
)];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &NoOpUniqueChecker)
.unwrap();
assert_eq!(result.ops.len(), 2, "bead_id={BEAD_ID} overflow_key");
if let IntentOpKind::IndexDelete { key, .. } = &result.ops[0] {
let parsed = parse_record(key).unwrap();
assert_eq!(parsed[0], SqliteValue::Text(large_text.into()));
}
if let IntentOpKind::IndexInsert { key, .. } = &result.ops[1] {
let parsed = parse_record(key).unwrap();
assert_eq!(parsed[0], SqliteValue::Text(new_large_text.into()));
}
}
#[test]
fn test_discard_stale_index_ops() {
let indexes = vec![ordinary_index(
10,
1,
&[(1, TypeAffinity::Text)],
false,
vec![TypeAffinity::Integer, TypeAffinity::Text],
)];
let ops = vec![
IntentOpKind::IndexDelete {
index: IndexId::new(10),
key: vec![1, 2, 3],
rowid: RowId::new(5),
},
IntentOpKind::IndexInsert {
index: IndexId::new(10),
key: vec![4, 5, 6],
rowid: RowId::new(5),
},
IntentOpKind::IndexInsert {
index: IndexId::new(10),
key: vec![7, 8, 9],
rowid: RowId::new(99),
},
IntentOpKind::IndexDelete {
index: IndexId::new(999),
key: vec![10, 11],
rowid: RowId::new(5),
},
IntentOpKind::Update {
table: TableId::new(1),
key: RowId::new(5),
new_record: vec![],
},
];
let filtered = discard_stale_index_ops(&ops, TableId::new(1), RowId::new(5), &indexes);
assert_eq!(filtered.len(), 3, "bead_id={BEAD_ID} stale_discard");
assert!(
matches!(&filtered[0], IntentOpKind::IndexInsert { rowid, .. } if rowid.get() == 99)
);
assert!(
matches!(&filtered[1], IntentOpKind::IndexDelete { index, .. } if index.get() == 999)
);
assert!(matches!(&filtered[2], IntentOpKind::Update { .. }));
}
#[test]
fn test_column_out_of_bounds_resolves_to_null() {
let base = record_bytes(&[SqliteValue::Integer(1)]);
let indexes = vec![ordinary_index(
90,
1,
&[(5, TypeAffinity::Text)], false,
vec![TypeAffinity::Integer],
)];
let updates = vec![(
ColumnIdx::new(0),
RebaseExpr::Literal(SqliteValue::Integer(2)),
)];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &NoOpUniqueChecker);
assert!(result.is_ok(), "bead_id={BEAD_ID} col_oob_now_ok");
}
#[test]
fn test_nocase_collation_key_normalization() {
let base = record_bytes(&[SqliteValue::Integer(1), SqliteValue::Text("hello".into())]);
let indexes = vec![IndexDef {
index_id: IndexId::new(100),
table_id: TableId::new(1),
unique: false,
key_parts: vec![IndexKeyPart::Column {
col_idx: ColumnIdx::new(1),
affinity: TypeAffinity::Text,
collation: Collation::Nocase,
}],
where_predicate: None,
table_column_affinities: vec![TypeAffinity::Integer, TypeAffinity::Text],
}];
let updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Text("HELLO".into())),
)];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &NoOpUniqueChecker)
.unwrap();
assert!(result.ops.is_empty(), "bead_id={BEAD_ID} nocase_same_key");
}
#[test]
fn test_eval_rebase_expr_arithmetic() {
let row = vec![SqliteValue::Integer(10), SqliteValue::Integer(3)];
let expr = RebaseExpr::BinaryOp {
op: RebaseBinaryOp::Add,
left: Box::new(RebaseExpr::ColumnRef(ColumnIdx::new(0))),
right: Box::new(RebaseExpr::ColumnRef(ColumnIdx::new(1))),
};
let result = eval_rebase_expr(&expr, &row).unwrap();
assert_eq!(result, SqliteValue::Integer(13), "bead_id={BEAD_ID} add");
let expr = RebaseExpr::BinaryOp {
op: RebaseBinaryOp::Multiply,
left: Box::new(RebaseExpr::ColumnRef(ColumnIdx::new(0))),
right: Box::new(RebaseExpr::ColumnRef(ColumnIdx::new(1))),
};
let result = eval_rebase_expr(&expr, &row).unwrap();
assert_eq!(result, SqliteValue::Integer(30), "bead_id={BEAD_ID} mul");
}
#[test]
fn test_eval_division_by_zero() {
let row = vec![SqliteValue::Integer(10), SqliteValue::Integer(0)];
let expr = RebaseExpr::BinaryOp {
op: RebaseBinaryOp::Divide,
left: Box::new(RebaseExpr::ColumnRef(ColumnIdx::new(0))),
right: Box::new(RebaseExpr::ColumnRef(ColumnIdx::new(1))),
};
let result = eval_rebase_expr(&expr, &row).unwrap();
assert_eq!(result, SqliteValue::Null, "bead_id={BEAD_ID} div_zero");
}
#[test]
fn test_index_regen_partial_leaves_participation() {
let base = record_bytes(&[
SqliteValue::Integer(1),
SqliteValue::Integer(10), ]);
let indexes = vec![IndexDef {
index_id: IndexId::new(110),
table_id: TableId::new(1),
unique: false,
key_parts: vec![IndexKeyPart::Column {
col_idx: ColumnIdx::new(1),
affinity: TypeAffinity::Integer,
collation: Collation::Binary,
}],
where_predicate: Some(RebaseExpr::ColumnRef(ColumnIdx::new(1))),
table_column_affinities: vec![TypeAffinity::Integer, TypeAffinity::Integer],
}];
let updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Integer(0)),
)];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &NoOpUniqueChecker)
.unwrap();
assert_eq!(result.ops.len(), 1, "bead_id={BEAD_ID} leaves_partial");
assert!(
matches!(&result.ops[0], IntentOpKind::IndexDelete { index, .. } if index.get() == 110),
"bead_id={BEAD_ID} delete_on_leave"
);
}
#[test]
fn test_index_regen_unique_no_conflict() {
let base = record_bytes(&[SqliteValue::Integer(1), SqliteValue::Text("alpha".into())]);
let indexes = vec![ordinary_index(
120,
1,
&[(1, TypeAffinity::Text)],
true,
vec![TypeAffinity::Integer, TypeAffinity::Text],
)];
let updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Text("beta".into())),
)];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &NoOpUniqueChecker)
.unwrap();
assert_eq!(result.ops.len(), 2, "bead_id={BEAD_ID} unique_ok");
}
#[test]
fn test_apply_column_updates() {
let base = vec![
SqliteValue::Integer(1),
SqliteValue::Text("old".into()),
SqliteValue::Float(3.125),
];
let affinities = vec![
TypeAffinity::Integer,
TypeAffinity::Text,
TypeAffinity::Real,
];
let updates = vec![
(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Text("new".into())),
),
(
ColumnIdx::new(2),
RebaseExpr::Literal(SqliteValue::Float(2.72)),
),
];
let result = apply_column_updates(&base, &updates, &affinities).unwrap();
assert_eq!(result[0], SqliteValue::Integer(1));
assert_eq!(result[1], SqliteValue::Text("new".into()));
assert_eq!(result[2], SqliteValue::Float(2.72));
}
#[test]
fn test_malformed_record_error() {
let bad_record = vec![0xFF, 0xFF, 0xFF]; let indexes = vec![ordinary_index(
130,
1,
&[(0, TypeAffinity::Integer)],
false,
vec![TypeAffinity::Integer],
)];
let result = regenerate_index_ops(
&bad_record,
&[],
&indexes,
RowId::new(1),
&NoOpUniqueChecker,
);
assert!(
matches!(result, Err(IndexRegenError::MalformedRecord)),
"bead_id={BEAD_ID} malformed"
);
}
#[test]
fn test_composite_index_key() {
let base = record_bytes(&[
SqliteValue::Integer(1),
SqliteValue::Text("alice".into()),
SqliteValue::Integer(30),
]);
let indexes = vec![IndexDef {
index_id: IndexId::new(140),
table_id: TableId::new(1),
unique: false,
key_parts: vec![
IndexKeyPart::Column {
col_idx: ColumnIdx::new(1),
affinity: TypeAffinity::Text,
collation: Collation::Binary,
},
IndexKeyPart::Column {
col_idx: ColumnIdx::new(2),
affinity: TypeAffinity::Integer,
collation: Collation::Binary,
},
],
where_predicate: None,
table_column_affinities: vec![
TypeAffinity::Integer,
TypeAffinity::Text,
TypeAffinity::Integer,
],
}];
let updates = vec![(
ColumnIdx::new(2),
RebaseExpr::Literal(SqliteValue::Integer(40)),
)];
let result =
regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &NoOpUniqueChecker)
.unwrap();
assert_eq!(result.ops.len(), 2, "bead_id={BEAD_ID} composite_key");
if let IntentOpKind::IndexInsert { key, .. } = &result.ops[1] {
let parsed = parse_record(key).unwrap();
assert_eq!(parsed.len(), 2, "bead_id={BEAD_ID} composite_2_cols");
assert_eq!(parsed[0], SqliteValue::Text("alice".into()));
assert_eq!(parsed[1], SqliteValue::Integer(40));
} else {
panic!("bead_id={BEAD_ID} expected insert");
}
}
#[test]
fn test_unique_checker_with_btreemap() {
struct BTreeMapChecker {
entries: BTreeMap<(u32, Vec<u8>), RowId>,
}
impl UniqueChecker for BTreeMapChecker {
fn check_unique(
&self,
index_id: IndexId,
key_bytes: &[u8],
exclude_rowid: RowId,
) -> Option<RowId> {
self.entries
.get(&(index_id.get(), key_bytes.to_vec()))
.filter(|&&rid| rid != exclude_rowid)
.copied()
}
}
let base = record_bytes(&[SqliteValue::Integer(1), SqliteValue::Text("alice".into())]);
let indexes = vec![ordinary_index(
150,
1,
&[(1, TypeAffinity::Text)],
true,
vec![TypeAffinity::Integer, TypeAffinity::Text],
)];
let updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Text("bob".into())),
)];
let new_key = compute_index_key(
&indexes[0],
&[SqliteValue::Integer(1), SqliteValue::Text("bob".into())],
)
.unwrap();
let mut entries = BTreeMap::new();
entries.insert((150, new_key), RowId::new(42));
let checker = BTreeMapChecker { entries };
let result = regenerate_index_ops(&base, &updates, &indexes, RowId::new(1), &checker);
assert!(
matches!(
result,
Err(IndexRegenError::UniqueConstraintViolation {
conflicting_rowid,
..
}) if conflicting_rowid.get() == 42
),
"bead_id={BEAD_ID} btreemap_unique"
);
}
}