use std::sync::atomic::{AtomicU64, Ordering};
use fsqlite_types::TypeAffinity;
use fsqlite_types::glossary::{
ColumnIdx, IndexId, IntentOp, IntentOpKind, RebaseExpr, RowId, Snapshot, StructuralEffects,
TableId,
};
use fsqlite_types::record::{parse_record, serialize_record};
use fsqlite_types::value::SqliteValue;
use tracing::{debug, warn};
use crate::index_regen::{
IndexDef, IndexRegenError, IndexRegenOps, UniqueChecker, apply_column_updates,
eval_rebase_expr, regenerate_index_ops,
};
const BEAD_ID: &str = "bd-1h3b";
static FSQLITE_REBASE_ATTEMPTS_TOTAL: AtomicU64 = AtomicU64::new(0);
static FSQLITE_REBASE_CONFLICTS_TOTAL: AtomicU64 = AtomicU64::new(0);
static FSQLITE_REBASE_SUCCESSES_TOTAL: AtomicU64 = AtomicU64::new(0);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RebaseMetricsSnapshot {
pub attempts_total: u64,
pub conflicts_total: u64,
pub successes_total: u64,
}
#[must_use]
pub fn rebase_metrics_snapshot() -> RebaseMetricsSnapshot {
RebaseMetricsSnapshot {
attempts_total: FSQLITE_REBASE_ATTEMPTS_TOTAL.load(Ordering::Relaxed),
conflicts_total: FSQLITE_REBASE_CONFLICTS_TOTAL.load(Ordering::Relaxed),
successes_total: FSQLITE_REBASE_SUCCESSES_TOTAL.load(Ordering::Relaxed),
}
}
pub fn reset_rebase_metrics() {
FSQLITE_REBASE_ATTEMPTS_TOTAL.store(0, Ordering::Relaxed);
FSQLITE_REBASE_CONFLICTS_TOTAL.store(0, Ordering::Relaxed);
FSQLITE_REBASE_SUCCESSES_TOTAL.store(0, Ordering::Relaxed);
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RebaseError {
SchemaEpochMismatch { expected: u64, actual: u64 },
BlockingReads { op_index: usize },
StructuralEffects { op_index: usize, effects: u32 },
TargetRowNotFound { table: TableId, key: RowId },
NotNullViolation { table: TableId, column_idx: u32 },
CheckViolation { table: TableId },
UniqueViolation {
index_id: IndexId,
conflicting_rowid: RowId,
},
IndexRegenError(IndexRegenError),
MalformedRecord { table: TableId, key: RowId },
}
impl std::fmt::Display for RebaseError {
#[allow(clippy::too_many_lines)]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SchemaEpochMismatch { expected, actual } => {
write!(
f,
"SQLITE_SCHEMA: schema epoch mismatch (expected {expected}, got {actual})"
)
}
Self::BlockingReads { op_index } => {
write!(
f,
"SQLITE_BUSY_SNAPSHOT: blocking reads at intent op {op_index}"
)
}
Self::StructuralEffects { op_index, effects } => {
write!(
f,
"SQLITE_BUSY_SNAPSHOT: structural effects 0x{effects:x} at intent op {op_index}"
)
}
Self::TargetRowNotFound { table, key } => {
write!(
f,
"rebase target not found: table {} rowid {}",
table.get(),
key.get()
)
}
Self::NotNullViolation { table, column_idx } => {
write!(
f,
"NOT NULL constraint failed: table {} column {column_idx}",
table.get()
)
}
Self::CheckViolation { table } => {
write!(f, "CHECK constraint failed: table {}", table.get())
}
Self::UniqueViolation {
index_id,
conflicting_rowid,
} => {
write!(
f,
"UNIQUE constraint failed: index {} conflicting rowid {}",
index_id.get(),
conflicting_rowid.get()
)
}
Self::IndexRegenError(e) => write!(f, "index regen: {e}"),
Self::MalformedRecord { table, key } => {
write!(
f,
"malformed record: table {} rowid {}",
table.get(),
key.get()
)
}
}
}
}
impl std::error::Error for RebaseError {}
impl From<IndexRegenError> for RebaseError {
fn from(e: IndexRegenError) -> Self {
match e {
IndexRegenError::UniqueConstraintViolation {
index_id,
conflicting_rowid,
} => Self::UniqueViolation {
index_id,
conflicting_rowid,
},
other => Self::IndexRegenError(other),
}
}
}
#[derive(Debug, Clone)]
pub struct TableConstraints {
pub table_id: TableId,
pub not_null: Vec<bool>,
pub affinities: Vec<TypeAffinity>,
pub check_exprs: Vec<RebaseExpr>,
}
pub trait BaseRowReader {
fn read_base_row(&self, table: TableId, key: RowId) -> Option<Vec<u8>>;
}
pub trait RebaseSchemaLookup {
fn table_constraints(&self, table: TableId) -> Option<TableConstraints>;
fn table_indexes(&self, table: TableId) -> Vec<IndexDef>;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RebaseEligibility {
Eligible,
BlockingReads { op_index: usize },
StructuralEffects { op_index: usize, effects: u32 },
}
#[must_use]
pub fn check_rebase_eligibility(intent_log: &[IntentOp]) -> RebaseEligibility {
for (i, op) in intent_log.iter().enumerate() {
if !op.footprint.reads.is_empty() {
debug!(
bead_id = BEAD_ID,
op_index = i,
reads = op.footprint.reads.len(),
"rebase blocked: non-empty reads"
);
return RebaseEligibility::BlockingReads { op_index: i };
}
if op.footprint.structural != StructuralEffects::NONE {
debug!(
bead_id = BEAD_ID,
op_index = i,
effects = op.footprint.structural.bits(),
"rebase blocked: structural effects"
);
return RebaseEligibility::StructuralEffects {
op_index: i,
effects: op.footprint.structural.bits(),
};
}
}
RebaseEligibility::Eligible
}
pub fn check_schema_epoch(
intent_log: &[IntentOp],
current_snapshot: Snapshot,
) -> Result<(), RebaseError> {
let current_epoch = current_snapshot.schema_epoch.get();
for op in intent_log {
if op.schema_epoch != current_epoch {
warn!(
bead_id = BEAD_ID,
expected = current_epoch,
actual = op.schema_epoch,
"SQLITE_SCHEMA: schema epoch mismatch"
);
return Err(RebaseError::SchemaEpochMismatch {
expected: current_epoch,
actual: op.schema_epoch,
});
}
}
Ok(())
}
fn enforce_constraints(
updated_row: &[SqliteValue],
constraints: &TableConstraints,
) -> Result<(), RebaseError> {
for (i, &is_not_null) in constraints.not_null.iter().enumerate() {
let val = updated_row.get(i).unwrap_or(&SqliteValue::Null);
if is_not_null && matches!(val, SqliteValue::Null) {
return Err(RebaseError::NotNullViolation {
table: constraints.table_id,
#[allow(clippy::cast_possible_truncation)]
column_idx: i as u32,
});
}
}
for check_expr in &constraints.check_exprs {
let result =
eval_rebase_expr(check_expr, updated_row).map_err(|_| RebaseError::CheckViolation {
table: constraints.table_id,
})?;
let is_false = match result {
SqliteValue::Null => false,
v => v.to_integer() == 0,
};
if is_false {
return Err(RebaseError::CheckViolation {
table: constraints.table_id,
});
}
}
Ok(())
}
#[derive(Debug, Clone, PartialEq)]
pub struct ReplayResult {
pub new_record: Vec<u8>,
pub index_ops: Vec<IntentOpKind>,
}
pub fn replay_update_expression(
table: TableId,
key: RowId,
column_updates: &[(ColumnIdx, RebaseExpr)],
base_reader: &dyn BaseRowReader,
schema: &dyn RebaseSchemaLookup,
unique_checker: &dyn UniqueChecker,
) -> Result<ReplayResult, RebaseError> {
let _ = BEAD_ID;
let base_record = base_reader
.read_base_row(table, key)
.ok_or(RebaseError::TargetRowNotFound { table, key })?;
let base_row = parse_record(&base_record).ok_or(RebaseError::MalformedRecord { table, key })?;
let constraints = schema.table_constraints(table);
let indexes = schema.table_indexes(table);
let affinities = constraints
.as_ref()
.map(|c| c.affinities.as_slice())
.or_else(|| {
indexes
.first()
.map(|idx| idx.table_column_affinities.as_slice())
})
.unwrap_or(&[]);
let updated_row = apply_column_updates(&base_row, column_updates, affinities)?;
let new_record = serialize_record(&updated_row);
if let Some(ref c) = constraints {
enforce_constraints(&updated_row, c)?;
}
let regen_result = if indexes.is_empty() {
IndexRegenOps { ops: vec![] }
} else {
regenerate_index_ops(&base_record, column_updates, &indexes, key, unique_checker)?
};
Ok(ReplayResult {
new_record,
index_ops: regen_result.ops,
})
}
#[derive(Debug, Clone, PartialEq)]
pub struct RebaseResult {
pub rebased_ops: Vec<IntentOpKind>,
pub replayed_count: usize,
}
#[allow(clippy::too_many_lines)]
pub fn deterministic_rebase(
intent_log: &[IntentOp],
current_snapshot: Snapshot,
base_reader: &dyn BaseRowReader,
schema: &dyn RebaseSchemaLookup,
unique_checker: &dyn UniqueChecker,
) -> Result<RebaseResult, RebaseError> {
FSQLITE_REBASE_ATTEMPTS_TOTAL.fetch_add(1, Ordering::Relaxed);
if let Err(e) = check_schema_epoch(intent_log, current_snapshot) {
FSQLITE_REBASE_CONFLICTS_TOTAL.fetch_add(1, Ordering::Relaxed);
warn!(
target: "fsqlite_mvcc::rebase",
bead_id = BEAD_ID,
conflict = %e,
intents = intent_log.len(),
"rebase conflict: schema epoch mismatch",
);
return Err(e);
}
match check_rebase_eligibility(intent_log) {
RebaseEligibility::Eligible => {}
RebaseEligibility::BlockingReads { op_index } => {
FSQLITE_REBASE_CONFLICTS_TOTAL.fetch_add(1, Ordering::Relaxed);
warn!(
target: "fsqlite_mvcc::rebase",
bead_id = BEAD_ID,
op_index,
intents = intent_log.len(),
"rebase conflict: blocking reads",
);
return Err(RebaseError::BlockingReads { op_index });
}
RebaseEligibility::StructuralEffects { op_index, effects } => {
FSQLITE_REBASE_CONFLICTS_TOTAL.fetch_add(1, Ordering::Relaxed);
warn!(
target: "fsqlite_mvcc::rebase",
bead_id = BEAD_ID,
op_index,
effects,
intents = intent_log.len(),
"rebase conflict: structural effects",
);
return Err(RebaseError::StructuralEffects { op_index, effects });
}
}
let mut stale_indexes = std::collections::HashSet::new();
for op in intent_log {
if let IntentOpKind::UpdateExpression { table, key, .. } = &op.op {
for index_def in schema.table_indexes(*table) {
stale_indexes.insert((index_def.index_id, *key));
}
}
}
let mut rebased_ops = Vec::with_capacity(intent_log.len());
let mut replayed_count = 0;
let mut updated_rows: std::collections::HashMap<(TableId, RowId), Vec<u8>> =
std::collections::HashMap::new();
struct ChainedReader<'a> {
base: &'a dyn BaseRowReader,
updates: &'a std::collections::HashMap<(TableId, RowId), Vec<u8>>,
}
impl BaseRowReader for ChainedReader<'_> {
fn read_base_row(&self, table: TableId, key: RowId) -> Option<Vec<u8>> {
if let Some(record) = self.updates.get(&(table, key)) {
Some(record.clone())
} else {
self.base.read_base_row(table, key)
}
}
}
for op in intent_log {
match &op.op {
IntentOpKind::UpdateExpression {
table,
key,
column_updates,
} => {
let _indexes = schema.table_indexes(*table);
let chained_reader = ChainedReader {
base: base_reader,
updates: &updated_rows,
};
let result = match replay_update_expression(
*table,
*key,
column_updates,
&chained_reader,
schema,
unique_checker,
) {
Ok(r) => r,
Err(e) => {
FSQLITE_REBASE_CONFLICTS_TOTAL.fetch_add(1, Ordering::Relaxed);
warn!(
target: "fsqlite_mvcc::rebase",
bead_id = BEAD_ID,
intents_replayed = replayed_count,
conflict = %e,
"rebase conflict during intent replay",
);
return Err(e);
}
};
updated_rows.insert((*table, *key), result.new_record.clone());
rebased_ops.push(IntentOpKind::Update {
table: *table,
key: *key,
new_record: result.new_record,
});
rebased_ops.extend(result.index_ops);
replayed_count += 1;
debug!(
target: "fsqlite_mvcc::rebase",
bead_id = BEAD_ID,
table = table.get(),
key = key.get(),
"replayed UpdateExpression"
);
}
IntentOpKind::IndexInsert { index, rowid, .. }
| IntentOpKind::IndexDelete { index, rowid, .. } => {
if stale_indexes.contains(&(*index, *rowid)) {
continue; }
rebased_ops.push(op.op.clone());
}
other => {
rebased_ops.push(other.clone());
}
}
}
FSQLITE_REBASE_SUCCESSES_TOTAL.fetch_add(1, Ordering::Relaxed);
tracing::info!(
target: "fsqlite_mvcc::rebase",
bead_id = BEAD_ID,
intents_replayed = replayed_count,
total_ops = intent_log.len(),
rebased_ops = rebased_ops.len(),
"rebase succeeded",
);
Ok(RebaseResult {
rebased_ops,
replayed_count,
})
}
#[derive(Debug, Clone)]
#[allow(clippy::struct_excessive_bools)]
pub struct UpdateExpressionCandidate {
pub has_triggers: bool,
pub has_foreign_keys: bool,
pub all_checks_rebase_safe: bool,
pub is_rowid_point_lookup: bool,
pub sets_rowid: bool,
pub all_sets_rebase_safe: bool,
pub has_prior_read_of_same_row: bool,
}
#[must_use]
pub fn can_emit_update_expression(candidate: &UpdateExpressionCandidate) -> bool {
!candidate.has_triggers
&& !candidate.has_foreign_keys
&& candidate.all_checks_rebase_safe
&& candidate.is_rowid_point_lookup
&& !candidate.sets_rowid
&& candidate.all_sets_rebase_safe
&& !candidate.has_prior_read_of_same_row
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use fsqlite_types::glossary::{
CommitSeq, IntentFootprint, SchemaEpoch, SemanticKeyRef, StructuralEffects,
};
use fsqlite_types::record::serialize_record;
use crate::index_regen::{Collation, IndexKeyPart, NoOpUniqueChecker, compute_index_key};
struct MemBaseReader {
rows: HashMap<(u32, i64), Vec<u8>>,
}
impl BaseRowReader for MemBaseReader {
fn read_base_row(&self, table: TableId, key: RowId) -> Option<Vec<u8>> {
self.rows.get(&(table.get(), key.get())).cloned()
}
}
struct MemSchema {
constraints: HashMap<u32, TableConstraints>,
indexes: HashMap<u32, Vec<IndexDef>>,
}
impl RebaseSchemaLookup for MemSchema {
fn table_constraints(&self, table: TableId) -> Option<TableConstraints> {
self.constraints.get(&table.get()).cloned()
}
fn table_indexes(&self, table: TableId) -> Vec<IndexDef> {
self.indexes.get(&table.get()).cloned().unwrap_or_default()
}
}
fn empty_footprint() -> IntentFootprint {
IntentFootprint {
reads: vec![],
writes: vec![],
structural: StructuralEffects::NONE,
}
}
fn make_intent_op(epoch: u64, footprint: IntentFootprint, op: IntentOpKind) -> IntentOp {
IntentOp {
schema_epoch: epoch,
footprint,
op,
}
}
fn test_snapshot(epoch: u64) -> Snapshot {
Snapshot::new(CommitSeq::new(10), SchemaEpoch::new(epoch))
}
fn record_bytes(values: &[SqliteValue]) -> Vec<u8> {
serialize_record(values)
}
fn empty_schema() -> MemSchema {
MemSchema {
constraints: HashMap::new(),
indexes: HashMap::new(),
}
}
#[test]
fn test_rebase_schema_epoch_guard_aborts_on_mismatch() {
let intent_log = vec![make_intent_op(
5, empty_footprint(),
IntentOpKind::UpdateExpression {
table: TableId::new(1),
key: RowId::new(1),
column_updates: vec![],
},
)];
let snapshot = test_snapshot(6);
let result = check_schema_epoch(&intent_log, snapshot);
assert!(
matches!(
result,
Err(RebaseError::SchemaEpochMismatch {
expected: 6,
actual: 5
})
),
"bead_id={BEAD_ID} schema_epoch_mismatch"
);
}
#[test]
fn test_rebase_rejects_blocking_reads() {
use fsqlite_types::glossary::{BtreeRef, SemanticKeyKind};
let blocking_read = SemanticKeyRef::new(
BtreeRef::Table(TableId::new(1)),
SemanticKeyKind::TableRow,
&[1, 2, 3],
);
let intent_log = vec![make_intent_op(
1,
IntentFootprint {
reads: vec![blocking_read],
writes: vec![],
structural: StructuralEffects::NONE,
},
IntentOpKind::Update {
table: TableId::new(1),
key: RowId::new(1),
new_record: vec![],
},
)];
let result = check_rebase_eligibility(&intent_log);
assert_eq!(
result,
RebaseEligibility::BlockingReads { op_index: 0 },
"bead_id={BEAD_ID} blocking_reads"
);
}
#[test]
fn test_rebase_rejects_structural_effects() {
let intent_log = vec![make_intent_op(
1,
IntentFootprint {
reads: vec![],
writes: vec![],
structural: StructuralEffects::PAGE_SPLIT | StructuralEffects::OVERFLOW_ALLOC,
},
IntentOpKind::Insert {
table: TableId::new(1),
key: RowId::new(1),
record: vec![],
},
)];
let result = check_rebase_eligibility(&intent_log);
assert!(
matches!(
result,
RebaseEligibility::StructuralEffects {
op_index: 0,
effects,
} if effects != 0
),
"bead_id={BEAD_ID} structural_effects"
);
}
#[test]
fn test_rebase_update_expression_column_ref_uses_new_base() {
let mut rows = HashMap::new();
rows.insert(
(1, 1),
record_bytes(&[SqliteValue::Integer(1), SqliteValue::Integer(100)]),
);
let reader = MemBaseReader { rows };
let column_updates = vec![(
ColumnIdx::new(1),
RebaseExpr::BinaryOp {
op: fsqlite_types::glossary::RebaseBinaryOp::Add,
left: Box::new(RebaseExpr::ColumnRef(ColumnIdx::new(1))),
right: Box::new(RebaseExpr::Literal(SqliteValue::Integer(1))),
},
)];
let schema = MemSchema {
constraints: HashMap::from([(
1,
TableConstraints {
table_id: TableId::new(1),
not_null: vec![false, false],
affinities: vec![TypeAffinity::Integer, TypeAffinity::Integer],
check_exprs: vec![],
},
)]),
indexes: HashMap::new(),
};
let result = replay_update_expression(
TableId::new(1),
RowId::new(1),
&column_updates,
&reader,
&schema,
&NoOpUniqueChecker,
)
.unwrap();
let parsed = parse_record(&result.new_record).unwrap();
assert_eq!(
parsed[1],
SqliteValue::Integer(101),
"bead_id={BEAD_ID} uses_new_base"
);
}
#[test]
fn test_rebase_constraint_failure_aborts() {
let mut rows = HashMap::new();
rows.insert(
(1, 1),
record_bytes(&[SqliteValue::Integer(1), SqliteValue::Text("hello".into())]),
);
let reader = MemBaseReader { rows };
let column_updates = vec![(ColumnIdx::new(1), RebaseExpr::Literal(SqliteValue::Null))];
let schema = MemSchema {
constraints: HashMap::from([(
1,
TableConstraints {
table_id: TableId::new(1),
not_null: vec![false, true], affinities: vec![TypeAffinity::Integer, TypeAffinity::Text],
check_exprs: vec![],
},
)]),
indexes: HashMap::new(),
};
let result = replay_update_expression(
TableId::new(1),
RowId::new(1),
&column_updates,
&reader,
&schema,
&NoOpUniqueChecker,
);
assert!(
matches!(
result,
Err(RebaseError::NotNullViolation { column_idx: 1, .. })
),
"bead_id={BEAD_ID} not_null_violation"
);
}
#[test]
fn test_rebase_index_regeneration_discards_stale_ops() {
let mut rows = HashMap::new();
rows.insert(
(1, 1),
record_bytes(&[
SqliteValue::Integer(1),
SqliteValue::Text("new_base".into()),
]),
);
let reader = MemBaseReader { rows };
let index_def = IndexDef {
index_id: IndexId::new(10),
table_id: TableId::new(1),
unique: false,
key_parts: vec![IndexKeyPart::Column {
col_idx: ColumnIdx::new(1),
affinity: TypeAffinity::Text,
collation: Collation::Binary,
}],
where_predicate: None,
table_column_affinities: vec![TypeAffinity::Integer, TypeAffinity::Text],
};
let schema = MemSchema {
constraints: HashMap::from([(
1,
TableConstraints {
table_id: TableId::new(1),
not_null: vec![false, false],
affinities: vec![TypeAffinity::Integer, TypeAffinity::Text],
check_exprs: vec![],
},
)]),
indexes: HashMap::from([(1, vec![index_def])]),
};
let column_updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Text("updated".into())),
)];
let result = replay_update_expression(
TableId::new(1),
RowId::new(1),
&column_updates,
&reader,
&schema,
&NoOpUniqueChecker,
)
.unwrap();
assert_eq!(
result.index_ops.len(),
2,
"bead_id={BEAD_ID} index_regen_ops"
);
assert!(
matches!(&result.index_ops[0], IntentOpKind::IndexDelete { .. }),
"bead_id={BEAD_ID} first is delete"
);
assert!(
matches!(&result.index_ops[1], IntentOpKind::IndexInsert { .. }),
"bead_id={BEAD_ID} second is insert"
);
if let IntentOpKind::IndexInsert { key, .. } = &result.index_ops[1] {
let parsed = parse_record(key).unwrap();
assert_eq!(
parsed[0],
SqliteValue::Text("updated".into()),
"bead_id={BEAD_ID} key_from_new_base"
);
}
}
#[test]
fn test_rebase_unique_index_enforcement_on_new_base() {
struct ConflictChecker {
expected_key: Vec<u8>,
}
impl UniqueChecker for ConflictChecker {
fn check_unique(
&self,
_index_id: IndexId,
key_bytes: &[u8],
_exclude_rowid: RowId,
) -> Option<RowId> {
if key_bytes == self.expected_key {
Some(RowId::new(99))
} else {
None
}
}
}
let mut rows = HashMap::new();
rows.insert(
(1, 1),
record_bytes(&[
SqliteValue::Integer(1),
SqliteValue::Text("original".into()),
]),
);
let reader = MemBaseReader { rows };
let index_def = IndexDef {
index_id: IndexId::new(20),
table_id: TableId::new(1),
unique: true,
key_parts: vec![IndexKeyPart::Column {
col_idx: ColumnIdx::new(1),
affinity: TypeAffinity::Text,
collation: Collation::Binary,
}],
where_predicate: None,
table_column_affinities: vec![TypeAffinity::Integer, TypeAffinity::Text],
};
let schema = MemSchema {
constraints: HashMap::from([(
1,
TableConstraints {
table_id: TableId::new(1),
not_null: vec![false, false],
affinities: vec![TypeAffinity::Integer, TypeAffinity::Text],
check_exprs: vec![],
},
)]),
indexes: HashMap::from([(1, vec![index_def.clone()])]),
};
let conflict_key = compute_index_key(
&index_def,
&[
SqliteValue::Integer(1),
SqliteValue::Text("conflict_value".into()),
],
)
.unwrap();
let checker = ConflictChecker {
expected_key: conflict_key,
};
let column_updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Text("conflict_value".into())),
)];
let result = replay_update_expression(
TableId::new(1),
RowId::new(1),
&column_updates,
&reader,
&schema,
&checker,
);
assert!(
matches!(
result,
Err(RebaseError::UniqueViolation {
conflicting_rowid,
..
}) if conflicting_rowid.get() == 99
),
"bead_id={BEAD_ID} unique_violation_on_rebase"
);
}
#[test]
fn test_vdbe_codegen_updateexpression_emission_rules() {
let good = UpdateExpressionCandidate {
has_triggers: false,
has_foreign_keys: false,
all_checks_rebase_safe: true,
is_rowid_point_lookup: true,
sets_rowid: false,
all_sets_rebase_safe: true,
has_prior_read_of_same_row: false,
};
assert!(
can_emit_update_expression(&good),
"bead_id={BEAD_ID} all_conditions_met"
);
assert!(
!can_emit_update_expression(&UpdateExpressionCandidate {
has_triggers: true,
..good
}),
"bead_id={BEAD_ID} has_triggers"
);
assert!(
!can_emit_update_expression(&UpdateExpressionCandidate {
has_foreign_keys: true,
..good
}),
"bead_id={BEAD_ID} has_fk"
);
assert!(
!can_emit_update_expression(&UpdateExpressionCandidate {
all_checks_rebase_safe: false,
..good
}),
"bead_id={BEAD_ID} check_not_safe"
);
assert!(
!can_emit_update_expression(&UpdateExpressionCandidate {
is_rowid_point_lookup: false,
..good
}),
"bead_id={BEAD_ID} not_point_lookup"
);
assert!(
!can_emit_update_expression(&UpdateExpressionCandidate {
sets_rowid: true,
..good
}),
"bead_id={BEAD_ID} sets_rowid"
);
assert!(
!can_emit_update_expression(&UpdateExpressionCandidate {
all_sets_rebase_safe: false,
..good
}),
"bead_id={BEAD_ID} set_not_safe"
);
assert!(
!can_emit_update_expression(&UpdateExpressionCandidate {
has_prior_read_of_same_row: true,
..good
}),
"bead_id={BEAD_ID} prior_read"
);
}
#[test]
fn test_full_deterministic_rebase_pipeline() {
let mut rows = HashMap::new();
rows.insert(
(1, 1),
record_bytes(&[SqliteValue::Integer(1), SqliteValue::Integer(50)]),
);
rows.insert(
(1, 2),
record_bytes(&[SqliteValue::Integer(2), SqliteValue::Integer(75)]),
);
let reader = MemBaseReader { rows };
let schema = MemSchema {
constraints: HashMap::from([(
1,
TableConstraints {
table_id: TableId::new(1),
not_null: vec![false, false],
affinities: vec![TypeAffinity::Integer, TypeAffinity::Integer],
check_exprs: vec![],
},
)]),
indexes: HashMap::new(),
};
let intent_log = vec![
make_intent_op(
1,
empty_footprint(),
IntentOpKind::UpdateExpression {
table: TableId::new(1),
key: RowId::new(1),
column_updates: vec![(
ColumnIdx::new(1),
RebaseExpr::BinaryOp {
op: fsqlite_types::glossary::RebaseBinaryOp::Add,
left: Box::new(RebaseExpr::ColumnRef(ColumnIdx::new(1))),
right: Box::new(RebaseExpr::Literal(SqliteValue::Integer(10))),
},
)],
},
),
make_intent_op(
1,
empty_footprint(),
IntentOpKind::Insert {
table: TableId::new(1),
key: RowId::new(100),
record: vec![1, 2, 3],
},
),
make_intent_op(
1,
empty_footprint(),
IntentOpKind::UpdateExpression {
table: TableId::new(1),
key: RowId::new(2),
column_updates: vec![(
ColumnIdx::new(1),
RebaseExpr::BinaryOp {
op: fsqlite_types::glossary::RebaseBinaryOp::Multiply,
left: Box::new(RebaseExpr::ColumnRef(ColumnIdx::new(1))),
right: Box::new(RebaseExpr::Literal(SqliteValue::Integer(2))),
},
)],
},
),
];
let snapshot = test_snapshot(1);
let result =
deterministic_rebase(&intent_log, snapshot, &reader, &schema, &NoOpUniqueChecker)
.unwrap();
assert_eq!(result.replayed_count, 2, "bead_id={BEAD_ID} replayed_count");
assert_eq!(result.rebased_ops.len(), 3, "bead_id={BEAD_ID} total_ops");
if let IntentOpKind::Update { new_record, .. } = &result.rebased_ops[0] {
let parsed = parse_record(new_record).unwrap();
assert_eq!(parsed[1], SqliteValue::Integer(60));
} else {
panic!("bead_id={BEAD_ID} expected Update for rowid 1");
}
assert!(matches!(
&result.rebased_ops[1],
IntentOpKind::Insert { .. }
));
if let IntentOpKind::Update { new_record, .. } = &result.rebased_ops[2] {
let parsed = parse_record(new_record).unwrap();
assert_eq!(parsed[1], SqliteValue::Integer(150));
} else {
panic!("bead_id={BEAD_ID} expected Update for rowid 2");
}
}
#[test]
fn test_rebase_target_row_not_found() {
let reader = MemBaseReader {
rows: HashMap::new(),
};
let schema = empty_schema();
let result = replay_update_expression(
TableId::new(1),
RowId::new(999),
&[],
&reader,
&schema,
&NoOpUniqueChecker,
);
assert!(
matches!(
result,
Err(RebaseError::TargetRowNotFound { key, .. }) if key.get() == 999
),
"bead_id={BEAD_ID} target_not_found"
);
}
#[test]
fn test_rebase_check_constraint_violation() {
let mut rows = HashMap::new();
rows.insert(
(1, 1),
record_bytes(&[SqliteValue::Integer(1), SqliteValue::Integer(10)]),
);
let reader = MemBaseReader { rows };
let schema = MemSchema {
constraints: HashMap::from([(
1,
TableConstraints {
table_id: TableId::new(1),
not_null: vec![false, false],
affinities: vec![TypeAffinity::Integer, TypeAffinity::Integer],
check_exprs: vec![
RebaseExpr::ColumnRef(ColumnIdx::new(1)),
],
},
)]),
indexes: HashMap::new(),
};
let column_updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Integer(0)),
)];
let result = replay_update_expression(
TableId::new(1),
RowId::new(1),
&column_updates,
&reader,
&schema,
&NoOpUniqueChecker,
);
assert!(
matches!(result, Err(RebaseError::CheckViolation { .. })),
"bead_id={BEAD_ID} check_violation"
);
}
#[test]
fn test_rebase_check_null_passes() {
let mut rows = HashMap::new();
rows.insert(
(1, 1),
record_bytes(&[SqliteValue::Integer(1), SqliteValue::Integer(10)]),
);
let reader = MemBaseReader { rows };
let schema = MemSchema {
constraints: HashMap::from([(
1,
TableConstraints {
table_id: TableId::new(1),
not_null: vec![false, false],
affinities: vec![TypeAffinity::Integer, TypeAffinity::Integer],
check_exprs: vec![
RebaseExpr::Literal(SqliteValue::Null),
],
},
)]),
indexes: HashMap::new(),
};
let column_updates = vec![(
ColumnIdx::new(1),
RebaseExpr::Literal(SqliteValue::Integer(42)),
)];
let result = replay_update_expression(
TableId::new(1),
RowId::new(1),
&column_updates,
&reader,
&schema,
&NoOpUniqueChecker,
);
assert!(result.is_ok(), "bead_id={BEAD_ID} check_null_passes");
}
#[test]
fn test_eligible_intent_log_rebase_eligibility() {
let intent_log = vec![
make_intent_op(
1,
empty_footprint(),
IntentOpKind::Insert {
table: TableId::new(1),
key: RowId::new(1),
record: vec![],
},
),
make_intent_op(
1,
empty_footprint(),
IntentOpKind::UpdateExpression {
table: TableId::new(1),
key: RowId::new(2),
column_updates: vec![],
},
),
];
assert_eq!(
check_rebase_eligibility(&intent_log),
RebaseEligibility::Eligible,
"bead_id={BEAD_ID} eligible"
);
}
}