use crate::{
error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY, SQLITE_CONSTRAINT_UNIQUE},
schema::{self, BTreeTable, ColDef, Column, Index, IndexColumn, ResolvedFkRef, Schema, Table},
translate::{
emitter::{
emit_cdc_full_record, emit_cdc_insns, emit_cdc_patch_record, prepare_cdc_if_necessary,
OperationMode, Resolver,
},
expr::{
bind_and_rewrite_expr, emit_returning_results, process_returning_clause,
translate_expr, translate_expr_no_constant_opt, walk_expr_mut, BindingBehavior,
NoConstantOptReason, WalkControl,
},
fkeys::{
build_index_affinity_string, emit_fk_violation, emit_guarded_fk_decrement,
fire_fk_delete_actions, index_probe, open_read_index, open_read_table,
},
plan::{
ColumnUsedMask, JoinedTable, Operation, QueryDestination, ResultSetColumn,
TableReferences,
},
planner::ROWID_STRS,
select::translate_select,
trigger_exec::{
fire_trigger, get_relevant_triggers_type_and_time, has_relevant_triggers_type_only,
TriggerContext,
},
upsert::{
collect_set_clauses_for_upsert, emit_upsert, resolve_upsert_target,
ResolvedUpsertTarget,
},
},
util::normalize_ident,
vdbe::{
affinity::Affinity,
builder::{CursorKey, ProgramBuilderOpts},
insn::{
to_u16, {CmpInsFlags, IdxInsertFlags, InsertFlags, RegisterOrLiteral},
},
BranchOffset,
},
vdbe::{
builder::{CursorType, ProgramBuilder},
insn::Insn,
},
Connection, LimboError, Result, VirtualTable,
};
use std::num::NonZeroUsize;
use std::sync::Arc;
use turso_parser::ast::{
self, Expr, InsertBody, OneSelect, QualifiedName, ResolveType, ResultColumn, TriggerEvent,
TriggerTime, Upsert, UpsertDo,
};
fn validate(table_name: &str, resolver: &Resolver, table: &Table) -> Result<()> {
if !crate::schema::can_write_to_table(table_name) {
crate::bail_parse_error!("table {} may not be modified", table_name);
}
let incompatible_views = resolver.schema.has_incompatible_dependent_views(table_name);
if !incompatible_views.is_empty() {
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
crate::bail_parse_error!(
"Cannot INSERT into table '{}' because it has incompatible dependent materialized view(s): {}. \n\
These views were created with a different DBSP version than the current version ({}). \n\
Please DROP and recreate the view(s) before modifying this table.",
table_name,
incompatible_views.join(", "),
DBSP_CIRCUIT_VERSION
);
}
if resolver.schema.is_materialized_view(table_name) {
crate::bail_parse_error!("cannot modify materialized view {}", table_name);
}
if table.btree().is_some_and(|t| !t.has_rowid) {
crate::bail_parse_error!("INSERT into WITHOUT ROWID table is not supported");
}
Ok(())
}
pub struct TempTableCtx {
cursor_id: usize,
loop_start_label: BranchOffset,
loop_end_label: BranchOffset,
}
pub struct InsertLoopLabels {
pub loop_start: BranchOffset,
pub row_done: BranchOffset,
pub stmt_epilogue: BranchOffset,
pub select_exhausted: Option<BranchOffset>,
}
pub struct InsertKeyLabels {
pub key_ready_for_check: BranchOffset,
pub key_generation: BranchOffset,
}
#[allow(dead_code)]
pub struct InsertEmitCtx<'a> {
pub table: &'a Arc<BTreeTable>,
pub idx_cursors: Vec<(String, i64, usize)>,
pub temp_table_ctx: Option<TempTableCtx>,
pub on_conflict: ResolveType,
pub num_values: usize,
pub yield_reg_opt: Option<usize>,
pub conflict_rowid_reg: usize,
pub cursor_id: usize,
pub halt_label: BranchOffset,
pub loop_labels: InsertLoopLabels,
pub key_labels: InsertKeyLabels,
pub cdc_table: Option<(usize, Arc<BTreeTable>)>,
pub autoincrement_meta: Option<AutoincMeta>,
}
impl<'a> InsertEmitCtx<'a> {
fn new(
program: &mut ProgramBuilder,
resolver: &Resolver,
table: &'a Arc<BTreeTable>,
on_conflict: Option<ResolveType>,
cdc_table: Option<(usize, Arc<BTreeTable>)>,
num_values: usize,
temp_table_ctx: Option<TempTableCtx>,
) -> Result<Self> {
let indices = resolver.schema.get_indices(table.name.as_str());
let mut idx_cursors = Vec::new();
for idx in indices {
idx_cursors.push((
idx.name.clone(),
idx.root_page,
program.alloc_cursor_index(None, idx)?,
));
}
let loop_labels = InsertLoopLabels {
loop_start: program.allocate_label(),
row_done: program.allocate_label(),
stmt_epilogue: program.allocate_label(),
select_exhausted: None,
};
let key_labels = InsertKeyLabels {
key_ready_for_check: program.allocate_label(),
key_generation: program.allocate_label(),
};
Ok(Self {
table,
idx_cursors,
temp_table_ctx,
on_conflict: on_conflict.unwrap_or(ResolveType::Abort),
yield_reg_opt: None,
conflict_rowid_reg: program.alloc_register(),
cursor_id: 0, halt_label: program.allocate_label(),
loop_labels,
key_labels,
cdc_table,
num_values,
autoincrement_meta: None,
})
}
}
#[allow(clippy::too_many_arguments)]
pub fn translate_insert(
resolver: &mut Resolver,
on_conflict: Option<ResolveType>,
tbl_name: QualifiedName,
columns: Vec<ast::Name>,
mut body: InsertBody,
mut returning: Vec<ResultColumn>,
mut program: ProgramBuilder,
connection: &Arc<crate::Connection>,
) -> Result<ProgramBuilder> {
let opts = ProgramBuilderOpts {
num_cursors: 1,
approx_num_insns: 30,
approx_num_labels: 5,
};
program.extend(&opts);
let table_name = &tbl_name.name;
let table = match resolver.schema.get_table(table_name.as_str()) {
Some(table) => table,
None => crate::bail_parse_error!("no such table: {}", table_name),
};
validate(table_name.as_str(), resolver, &table)?;
let fk_enabled = connection.foreign_keys_enabled();
if let Some(virtual_table) = &table.virtual_table() {
program = translate_virtual_table_insert(
program,
virtual_table.clone(),
columns,
body,
on_conflict,
resolver,
)?;
return Ok(program);
}
let Some(btree_table) = table.btree() else {
crate::bail_parse_error!("no such table: {}", table_name);
};
let BoundInsertResult {
mut values,
mut upsert_actions,
inserting_multiple_rows,
} = bind_insert(
&mut program,
resolver,
&table,
&mut body,
connection,
on_conflict.unwrap_or(ResolveType::Abort),
)?;
if inserting_multiple_rows && btree_table.has_autoincrement {
ensure_sequence_initialized(&mut program, resolver.schema, &btree_table)?;
}
let cdc_table = prepare_cdc_if_necessary(&mut program, resolver.schema, table.get_name())?;
let mut table_references = TableReferences::new(
vec![JoinedTable {
table: Table::BTree(
table
.btree()
.expect("we shouldn't have got here without a BTree table"),
),
identifier: table_name.to_string(),
internal_id: program.table_reference_counter.next(),
op: Operation::default_scan_for(&table),
join_info: None,
col_used_mask: ColumnUsedMask::default(),
column_use_counts: Vec::new(),
expression_index_usages: Vec::new(),
database_id: 0,
}],
vec![],
);
let mut result_columns =
process_returning_clause(&mut returning, &mut table_references, connection)?;
let has_fks = fk_enabled
&& (resolver.schema.has_child_fks(table_name.as_str())
|| resolver
.schema
.any_resolved_fks_referencing(table_name.as_str()));
let mut ctx = InsertEmitCtx::new(
&mut program,
resolver,
&btree_table,
on_conflict,
cdc_table,
values.len(),
None,
)?;
program = init_source_emission(
program,
&table,
connection,
&mut ctx,
resolver,
&mut values,
body,
&columns,
&table_references,
)?;
let has_upsert = !upsert_actions.is_empty();
if !result_columns.is_empty() {
program.result_columns = result_columns.clone();
}
let insertion = build_insertion(&mut program, &table, &columns, ctx.num_values)?;
translate_rows_and_open_tables(
&mut program,
resolver,
&insertion,
&ctx,
&values,
inserting_multiple_rows,
)?;
let has_user_provided_rowid = insertion.key.is_provided_by_user();
if ctx.table.has_autoincrement {
init_autoincrement(&mut program, &mut ctx, resolver)?;
}
let relevant_before_triggers = get_relevant_triggers_type_and_time(
resolver.schema,
TriggerEvent::Insert,
TriggerTime::Before,
None,
&btree_table,
);
let has_relevant_before_triggers = relevant_before_triggers.clone().count() > 0;
if has_relevant_before_triggers {
let new_registers: Vec<usize> = insertion
.col_mappings
.iter()
.map(|col_mapping| {
if col_mapping.column.is_rowid_alias() {
insertion.key_register()
} else {
col_mapping.register
}
})
.chain(std::iter::once(insertion.key_register()))
.collect();
let trigger_ctx = TriggerContext::new(
btree_table.clone(),
Some(new_registers),
None, );
for trigger in relevant_before_triggers {
fire_trigger(&mut program, resolver, trigger, &trigger_ctx, connection)?;
}
}
if has_user_provided_rowid {
let must_be_int_label = program.allocate_label();
program.emit_insn(Insn::NotNull {
reg: insertion.key_register(),
target_pc: must_be_int_label,
});
program.emit_insn(Insn::Goto {
target_pc: ctx.key_labels.key_generation,
});
program.preassign_label_to_next_insn(must_be_int_label);
program.emit_insn(Insn::MustBeInt {
reg: insertion.key_register(),
});
program.emit_insn(Insn::Goto {
target_pc: ctx.key_labels.key_ready_for_check,
});
}
program.preassign_label_to_next_insn(ctx.key_labels.key_generation);
emit_rowid_generation(&mut program, resolver, &ctx, &insertion)?;
program.preassign_label_to_next_insn(ctx.key_labels.key_ready_for_check);
if ctx.table.is_strict {
program.emit_insn(Insn::TypeCheck {
start_reg: insertion.first_col_register(),
count: insertion.col_mappings.len(),
check_generated: true,
table_reference: Arc::clone(ctx.table),
});
} else {
let affinity = insertion
.col_mappings
.iter()
.map(|col_mapping| col_mapping.column.affinity());
if affinity.clone().any(|a| a != Affinity::Blob) {
if let Ok(count) = std::num::NonZeroUsize::try_from(insertion.col_mappings.len()) {
program.emit_insn(Insn::Affinity {
start_reg: insertion.first_col_register(),
count,
affinities: affinity.map(|a| a.aff_mask()).collect(),
});
}
}
}
let constraints = build_constraints_to_check(
resolver,
table_name.as_str(),
&upsert_actions,
has_user_provided_rowid,
);
let on_replace = matches!(ctx.on_conflict, ResolveType::Replace) && upsert_actions.is_empty();
let mut preflight_ctx = PreflightCtx {
upsert_actions: &upsert_actions,
on_replace,
connection,
table_references: &mut table_references,
};
emit_preflight_constraint_checks(
&mut program,
&mut ctx,
resolver,
&insertion,
&constraints,
&mut preflight_ctx,
)?;
let notnull_resume_label = emit_notnulls(&mut program, &ctx, &insertion, resolver)?;
let affinity_str = insertion
.col_mappings
.iter()
.map(|col_mapping| col_mapping.column.affinity().aff_mask())
.collect::<String>();
if let Some(lbl) = notnull_resume_label {
program.preassign_label_to_next_insn(lbl);
}
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(insertion.first_col_register()),
count: to_u16(insertion.col_mappings.len()),
dest_reg: to_u16(insertion.record_register()),
index_name: None,
affinity_str: Some(affinity_str),
});
if has_upsert {
emit_commit_phase(&mut program, resolver, &insertion, &ctx)?;
}
if has_fks {
emit_fk_child_insert_checks(
&mut program,
resolver,
&btree_table,
insertion.first_col_register(),
insertion.key_register(),
)?;
}
let mut insert_flags = InsertFlags::new();
let on_replace = matches!(ctx.on_conflict, ResolveType::Replace);
if on_replace {
insert_flags = insert_flags.require_seek();
}
program.emit_insn(Insn::Insert {
cursor: ctx.cursor_id,
key_reg: insertion.key_register(),
record_reg: insertion.record_register(),
flag: insert_flags,
table_name: table_name.to_string(),
});
let relevant_after_triggers = get_relevant_triggers_type_and_time(
resolver.schema,
TriggerEvent::Insert,
TriggerTime::After,
None,
&btree_table,
);
let has_relevant_after_triggers = relevant_after_triggers.clone().count() > 0;
if has_relevant_after_triggers {
let new_registers_after: Vec<usize> = insertion
.col_mappings
.iter()
.map(|col_mapping| {
if col_mapping.column.is_rowid_alias() {
insertion.key_register()
} else {
col_mapping.register
}
})
.chain(std::iter::once(insertion.key_register()))
.collect();
let trigger_ctx_after =
TriggerContext::new(btree_table.clone(), Some(new_registers_after), None);
for trigger in relevant_after_triggers {
fire_trigger(
&mut program,
resolver,
trigger,
&trigger_ctx_after,
connection,
)?;
}
}
if has_fks {
emit_parent_side_fk_decrement_on_insert(
&mut program,
resolver,
&btree_table,
&insertion,
on_replace,
)?;
}
if let Some(AutoincMeta {
seq_cursor_id,
r_seq,
r_seq_rowid,
table_name_reg,
}) = ctx.autoincrement_meta
{
let no_update_needed_label = program.allocate_label();
program.emit_insn(Insn::Le {
lhs: insertion.key_register(),
rhs: r_seq,
target_pc: no_update_needed_label,
flags: Default::default(),
collation: None,
});
emit_update_sqlite_sequence(
&mut program,
resolver.schema,
seq_cursor_id,
r_seq_rowid,
table_name_reg,
insertion.key_register(),
)?;
program.preassign_label_to_next_insn(no_update_needed_label);
program.emit_insn(Insn::Close {
cursor_id: seq_cursor_id,
});
}
if let Some((cdc_cursor_id, _)) = &ctx.cdc_table {
let cdc_has_after = program.capture_data_changes_mode().has_after();
let after_record_reg = if cdc_has_after {
Some(emit_cdc_patch_record(
&mut program,
&table,
insertion.first_col_register(),
insertion.record_register(),
insertion.key_register(),
))
} else {
None
};
emit_cdc_insns(
&mut program,
resolver,
OperationMode::INSERT,
*cdc_cursor_id,
insertion.key_register(),
None,
after_record_reg,
None,
table_name.as_str(),
)?;
}
if !result_columns.is_empty() {
emit_returning_results(
&mut program,
&table_references,
&result_columns,
insertion.first_col_register(),
insertion.key_register(),
resolver,
)?;
}
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.row_done,
});
if !upsert_actions.is_empty() {
resolve_upserts(
&mut program,
resolver,
&mut upsert_actions,
&ctx,
&insertion,
&table,
&mut result_columns,
connection,
&mut table_references,
)?;
}
emit_epilogue(&mut program, &ctx, inserting_multiple_rows);
program.set_needs_stmt_subtransactions(true);
program.result_columns = result_columns;
program.table_references.extend(table_references);
Ok(program)
}
fn emit_epilogue(program: &mut ProgramBuilder, ctx: &InsertEmitCtx, inserting_multiple_rows: bool) {
if inserting_multiple_rows {
if let Some(temp_table_ctx) = &ctx.temp_table_ctx {
program.resolve_label(ctx.loop_labels.row_done, program.offset());
program.emit_insn(Insn::Next {
cursor_id: temp_table_ctx.cursor_id,
pc_if_next: temp_table_ctx.loop_start_label,
});
program.preassign_label_to_next_insn(temp_table_ctx.loop_end_label);
program.emit_insn(Insn::Close {
cursor_id: temp_table_ctx.cursor_id,
});
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.stmt_epilogue,
});
} else {
program.resolve_label(ctx.loop_labels.row_done, program.offset());
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.loop_start,
});
if let Some(sel_eof) = ctx.loop_labels.select_exhausted {
program.preassign_label_to_next_insn(sel_eof);
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.stmt_epilogue,
});
}
}
} else {
program.resolve_label(ctx.loop_labels.row_done, program.offset());
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.stmt_epilogue,
});
}
program.preassign_label_to_next_insn(ctx.loop_labels.stmt_epilogue);
program.resolve_label(ctx.halt_label, program.offset());
}
fn emit_partial_index_check(
program: &mut ProgramBuilder,
resolver: &Resolver,
index: &Index,
insertion: &Insertion,
) -> Result<Option<BranchOffset>> {
let Some(where_clause) = &index.where_clause else {
return Ok(None);
};
let mut where_for_eval = where_clause.as_ref().clone();
rewrite_partial_index_where(&mut where_for_eval, insertion)?;
let reg = program.alloc_register();
translate_expr_no_constant_opt(
program,
Some(&TableReferences::new_empty()),
&where_for_eval,
reg,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
let skip_label = program.allocate_label();
program.emit_insn(Insn::IfNot {
reg,
target_pc: skip_label,
jump_if_null: true,
});
Ok(Some(skip_label))
}
fn emit_commit_phase(
program: &mut ProgramBuilder,
resolver: &Resolver,
insertion: &Insertion,
ctx: &InsertEmitCtx,
) -> Result<()> {
for index in resolver.schema.get_indices(ctx.table.name.as_str()) {
let idx_cursor_id = ctx
.idx_cursors
.iter()
.find(|(name, _, _)| name == &index.name)
.map(|(_, _, c_id)| *c_id)
.expect("no cursor found for index");
let commit_skip_label = emit_partial_index_check(program, resolver, index, insertion)?;
let num_cols = index.columns.len();
let idx_start_reg = program.alloc_registers(num_cols + 1);
for (i, idx_col) in index.columns.iter().enumerate() {
emit_index_column_value_for_insert(
program,
resolver,
insertion,
idx_col,
idx_start_reg + i,
)?;
}
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: idx_start_reg + num_cols,
extra_amount: 0,
});
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(idx_start_reg),
count: to_u16(num_cols + 1),
dest_reg: to_u16(record_reg),
index_name: Some(index.name.clone()),
affinity_str: None,
});
program.emit_insn(Insn::IdxInsert {
cursor_id: idx_cursor_id,
record_reg,
unpacked_start: Some(idx_start_reg),
unpacked_count: Some((num_cols + 1) as u16),
flags: IdxInsertFlags::new().nchange(true),
});
if let Some(lbl) = commit_skip_label {
program.resolve_label(lbl, program.offset());
}
}
Ok(())
}
fn translate_rows_and_open_tables(
program: &mut ProgramBuilder,
resolver: &Resolver,
insertion: &Insertion,
ctx: &InsertEmitCtx,
values: &[Box<Expr>],
inserting_multiple_rows: bool,
) -> Result<()> {
if inserting_multiple_rows {
let select_result_start_reg = program
.reg_result_cols_start
.unwrap_or_else(|| ctx.yield_reg_opt.unwrap() + 1);
translate_rows_multiple(
program,
insertion,
select_result_start_reg,
resolver,
&ctx.temp_table_ctx,
)?;
} else {
program.emit_insn(Insn::OpenWrite {
cursor_id: ctx.cursor_id,
root_page: RegisterOrLiteral::Literal(ctx.table.root_page),
db: 0,
});
translate_rows_single(program, values, insertion, resolver)?;
}
for idx_cursor in ctx.idx_cursors.iter() {
program.emit_insn(Insn::OpenWrite {
cursor_id: idx_cursor.2,
root_page: idx_cursor.1.into(),
db: 0,
});
}
Ok(())
}
fn emit_rowid_generation(
program: &mut ProgramBuilder,
resolver: &Resolver,
ctx: &InsertEmitCtx,
insertion: &Insertion,
) -> Result<()> {
if let Some(AutoincMeta {
r_seq,
seq_cursor_id,
r_seq_rowid,
table_name_reg,
..
}) = ctx.autoincrement_meta
{
let r_max = program.alloc_register();
let dummy_reg = program.alloc_register();
program.emit_insn(Insn::NewRowid {
cursor: ctx.cursor_id,
rowid_reg: dummy_reg,
prev_largest_reg: r_max,
});
program.emit_insn(Insn::Copy {
src_reg: r_seq,
dst_reg: insertion.key_register(),
extra_amount: 0,
});
program.emit_insn(Insn::MemMax {
dest_reg: insertion.key_register(),
src_reg: r_max,
});
let no_overflow_label = program.allocate_label();
let max_i64_reg = program.alloc_register();
program.emit_insn(Insn::Integer {
dest: max_i64_reg,
value: i64::MAX,
});
program.emit_insn(Insn::Ne {
lhs: insertion.key_register(),
rhs: max_i64_reg,
target_pc: no_overflow_label,
flags: Default::default(),
collation: None,
});
program.emit_insn(Insn::Halt {
err_code: crate::error::SQLITE_FULL,
description: "database or disk is full".to_string(),
});
program.preassign_label_to_next_insn(no_overflow_label);
program.emit_insn(Insn::AddImm {
register: insertion.key_register(),
value: 1,
});
emit_update_sqlite_sequence(
program,
resolver.schema,
seq_cursor_id,
r_seq_rowid,
table_name_reg,
insertion.key_register(),
)?;
} else {
program.emit_insn(Insn::NewRowid {
cursor: ctx.cursor_id,
rowid_reg: insertion.key_register(),
prev_largest_reg: 0,
});
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn resolve_upserts(
program: &mut ProgramBuilder,
resolver: &mut Resolver,
upsert_actions: &mut [(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)],
ctx: &InsertEmitCtx,
insertion: &Insertion,
table: &Table,
result_columns: &mut [ResultSetColumn],
connection: &Arc<crate::Connection>,
table_references: &mut TableReferences,
) -> Result<()> {
for (_, label, upsert) in upsert_actions {
program.preassign_label_to_next_insn(*label);
if let UpsertDo::Set {
ref mut sets,
ref mut where_clause,
} = upsert.do_clause
{
let mut rewritten_sets = collect_set_clauses_for_upsert(table, sets)?;
emit_upsert(
program,
table,
ctx,
insertion,
&mut rewritten_sets,
where_clause,
resolver,
result_columns,
connection,
table_references,
)?;
} else {
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.row_done,
});
}
}
Ok(())
}
fn init_autoincrement(
program: &mut ProgramBuilder,
ctx: &mut InsertEmitCtx,
resolver: &Resolver,
) -> Result<()> {
let seq_table = resolver
.schema
.get_btree_table("sqlite_sequence")
.ok_or_else(|| {
crate::error::LimboError::InternalError("sqlite_sequence table not found".to_string())
})?;
let seq_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(seq_table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: seq_cursor_id,
root_page: seq_table.root_page.into(),
db: 0,
});
let table_name_reg = program.emit_string8_new_reg(ctx.table.name.clone());
let r_seq = program.alloc_register();
let r_seq_rowid = program.alloc_register();
ctx.autoincrement_meta = Some(AutoincMeta {
seq_cursor_id,
r_seq,
r_seq_rowid,
table_name_reg,
});
program.emit_insn(Insn::Integer {
dest: r_seq,
value: 0,
});
program.emit_insn(Insn::Null {
dest: r_seq_rowid,
dest_end: None,
});
let loop_start_label = program.allocate_label();
let loop_end_label = program.allocate_label();
let found_label = program.allocate_label();
program.emit_insn(Insn::Rewind {
cursor_id: seq_cursor_id,
pc_if_empty: loop_end_label,
});
program.preassign_label_to_next_insn(loop_start_label);
let name_col_reg = program.alloc_register();
program.emit_column_or_rowid(seq_cursor_id, 0, name_col_reg);
program.emit_insn(Insn::Ne {
lhs: table_name_reg,
rhs: name_col_reg,
target_pc: found_label,
flags: Default::default(),
collation: None,
});
program.emit_column_or_rowid(seq_cursor_id, 1, r_seq);
program.emit_insn(Insn::RowId {
cursor_id: seq_cursor_id,
dest: r_seq_rowid,
});
program.emit_insn(Insn::Goto {
target_pc: loop_end_label,
});
program.preassign_label_to_next_insn(found_label);
program.emit_insn(Insn::Next {
cursor_id: seq_cursor_id,
pc_if_next: loop_start_label,
});
program.preassign_label_to_next_insn(loop_end_label);
Ok(())
}
fn emit_notnulls(
program: &mut ProgramBuilder,
ctx: &InsertEmitCtx,
insertion: &Insertion,
resolver: &Resolver,
) -> Result<Option<BranchOffset>> {
let on_replace = matches!(ctx.on_conflict, ResolveType::Replace);
let on_ignore = matches!(ctx.on_conflict, ResolveType::Ignore);
let mut pending_resume_label = None;
for column_mapping in insertion
.col_mappings
.iter()
.filter(|column_mapping| column_mapping.column.notnull())
{
if column_mapping.column.is_rowid_alias() {
continue;
}
if on_replace {
if let Some(default_expr) = column_mapping.column.default.as_ref() {
let default_label = {
if let Some(lbl) = pending_resume_label {
lbl
} else {
program.allocate_label()
}
};
let resume_label = program.allocate_label();
program.emit_insn(Insn::IsNull {
reg: column_mapping.register,
target_pc: default_label,
});
program.emit_insn(Insn::Goto {
target_pc: resume_label,
});
program.resolve_label(default_label, program.offset());
translate_expr_no_constant_opt(
program,
None,
default_expr,
column_mapping.register,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
program.emit_insn(Insn::Goto {
target_pc: resume_label,
});
pending_resume_label = Some(resume_label);
}
}
if on_ignore {
program.emit_insn(Insn::IsNull {
reg: column_mapping.register,
target_pc: ctx.loop_labels.row_done,
});
} else {
program.emit_insn(Insn::HaltIfNull {
target_reg: column_mapping.register,
err_code: SQLITE_CONSTRAINT_NOTNULL,
description: {
let mut description = String::with_capacity(
ctx.table.name.as_str().len()
+ column_mapping
.column
.name
.as_ref()
.expect("Column name must be present")
.len()
+ 2,
);
description.push_str(ctx.table.name.as_str());
description.push('.');
description.push_str(
column_mapping
.column
.name
.as_ref()
.expect("Column name must be present"),
);
description
},
});
}
}
Ok(pending_resume_label)
}
struct BoundInsertResult {
#[allow(clippy::vec_box)]
values: Vec<Box<Expr>>,
upsert_actions: Vec<(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)>,
inserting_multiple_rows: bool,
}
fn bind_insert(
program: &mut ProgramBuilder,
resolver: &Resolver,
table: &Table,
body: &mut InsertBody,
connection: &Arc<Connection>,
on_conflict: ResolveType,
) -> Result<BoundInsertResult> {
let mut values: Vec<Box<Expr>> = vec![];
let mut upsert: Option<Box<Upsert>> = None;
let mut upsert_actions: Vec<(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)> = Vec::new();
let mut inserting_multiple_rows = false;
match body {
InsertBody::DefaultValues => {
values = table
.columns()
.iter()
.filter(|c| !c.hidden())
.map(|c| {
c.default
.clone()
.unwrap_or_else(|| Box::new(ast::Expr::Literal(ast::Literal::Null)))
})
.collect();
}
InsertBody::Select(select, upsert_opt) => {
if select.body.compounds.is_empty() {
match &mut select.body.select {
OneSelect::Values(values_expr) if values_expr.len() <= 1 => {
if values_expr.is_empty() {
crate::bail_parse_error!("no values to insert");
}
for expr in values_expr.iter_mut().flat_map(|v| v.iter_mut()) {
match expr.as_mut() {
Expr::Id(name) => {
if name.quoted_with('"') {
*expr =
Expr::Literal(ast::Literal::String(name.as_literal()))
.into();
} else {
crate::bail_parse_error!("no such column: {name}");
}
}
Expr::Qualified(first_name, second_name) => {
crate::bail_parse_error!(
"no such column: {first_name}.{second_name}"
);
}
_ => {}
}
bind_and_rewrite_expr(
expr,
None,
None,
connection,
BindingBehavior::ResultColumnsNotAllowed,
)?;
}
values = values_expr.pop().unwrap_or_else(Vec::new);
}
_ => inserting_multiple_rows = true,
}
} else {
inserting_multiple_rows = true;
}
upsert = upsert_opt.take();
}
}
match on_conflict {
ResolveType::Ignore => {
program.set_resolve_type(ResolveType::Ignore);
upsert.replace(Box::new(ast::Upsert {
do_clause: UpsertDo::Nothing,
index: None,
next: None,
}));
}
ResolveType::Abort | ResolveType::Replace => {
}
_ => {
crate::bail_parse_error!("INSERT OR {} is not yet supported", on_conflict.to_string());
}
}
while let Some(mut upsert_opt) = upsert.take() {
if let UpsertDo::Set {
ref mut sets,
ref mut where_clause,
} = &mut upsert_opt.do_clause
{
for set in sets.iter_mut() {
bind_and_rewrite_expr(
&mut set.expr,
None,
None,
connection,
BindingBehavior::AllowUnboundIdentifiers,
)?;
}
if let Some(ref mut where_expr) = where_clause {
bind_and_rewrite_expr(
where_expr,
None,
None,
connection,
BindingBehavior::AllowUnboundIdentifiers,
)?;
}
}
let next = upsert_opt.next.take();
upsert_actions.push((
resolve_upsert_target(resolver.schema, table, &upsert_opt)?,
program.allocate_label(),
upsert_opt,
));
upsert = next;
}
Ok(BoundInsertResult {
values,
upsert_actions,
inserting_multiple_rows,
})
}
#[allow(clippy::too_many_arguments, clippy::vec_box)]
fn init_source_emission<'a>(
mut program: ProgramBuilder,
table: &Table,
connection: &Arc<Connection>,
ctx: &mut InsertEmitCtx<'a>,
resolver: &Resolver,
values: &mut Vec<Box<Expr>>,
body: InsertBody,
columns: &'a [ast::Name],
table_references: &TableReferences,
) -> Result<ProgramBuilder> {
let required_column_count = if columns.is_empty() {
table.columns().len()
} else {
columns.len()
};
if !values.is_empty() {
if values.len() != required_column_count {
crate::bail_parse_error!(
"{} values for {required_column_count} columns",
values.len()
);
}
}
let has_insert_triggers = has_relevant_triggers_type_only(
resolver.schema,
TriggerEvent::Insert,
None,
ctx.table.as_ref(),
);
let (num_values, cursor_id) = match body {
InsertBody::Select(select, _) => {
if select.body.compounds.is_empty()
&& matches!(&select.body.select, OneSelect::Values(values) if values.len() <= 1)
{
(
values.len(),
program.alloc_cursor_id_keyed(
CursorKey::table(table_references.joined_tables()[0].internal_id),
CursorType::BTreeTable(ctx.table.clone()),
),
)
} else {
let yield_reg = program.alloc_register();
let jump_on_definition_label = program.allocate_label();
let start_offset_label = program.allocate_label();
program.emit_insn(Insn::InitCoroutine {
yield_reg,
jump_on_definition: jump_on_definition_label,
start_offset: start_offset_label,
});
program.preassign_label_to_next_insn(start_offset_label);
let query_destination = QueryDestination::CoroutineYield {
yield_reg,
coroutine_implementation_start: ctx.halt_label,
};
program.incr_nesting();
let result =
translate_select(select, resolver, program, query_destination, connection)?;
if result.num_result_cols != required_column_count {
crate::bail_parse_error!(
"{} values for {required_column_count} columns",
result.num_result_cols,
);
}
program = result.program;
program.decr_nesting();
program.emit_insn(Insn::EndCoroutine { yield_reg });
program.preassign_label_to_next_insn(jump_on_definition_label);
let cursor_id = program.alloc_cursor_id_keyed(
CursorKey::table(table_references.joined_tables()[0].internal_id),
CursorType::BTreeTable(ctx.table.clone()),
);
if program.is_table_open(table) || has_insert_triggers {
let temp_cursor_id =
program.alloc_cursor_id(CursorType::BTreeTable(ctx.table.clone()));
ctx.temp_table_ctx = Some(TempTableCtx {
cursor_id: temp_cursor_id,
loop_start_label: program.allocate_label(),
loop_end_label: program.allocate_label(),
});
program.emit_insn(Insn::OpenEphemeral {
cursor_id: temp_cursor_id,
is_table: true,
});
program.preassign_label_to_next_insn(ctx.loop_labels.loop_start);
let yield_label = program.allocate_label();
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: yield_label, });
let record_reg = program.alloc_register();
let affinity_str = if columns.is_empty() {
ctx.table
.columns
.iter()
.filter(|col| !col.hidden())
.map(|col| col.affinity().aff_mask())
.collect::<String>()
} else {
columns
.iter()
.map(|col_name| {
let column_name = normalize_ident(col_name.as_str());
if ROWID_STRS
.iter()
.any(|s| s.eq_ignore_ascii_case(&column_name))
{
return Affinity::Integer.aff_mask();
}
table
.get_column_by_name(&column_name)
.unwrap()
.1
.affinity()
.aff_mask()
})
.collect::<String>()
};
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(program.reg_result_cols_start.unwrap_or(yield_reg + 1)),
count: to_u16(result.num_result_cols),
dest_reg: to_u16(record_reg),
index_name: None,
affinity_str: Some(affinity_str),
});
let rowid_reg = program.alloc_register();
program.emit_insn(Insn::NewRowid {
cursor: temp_cursor_id,
rowid_reg,
prev_largest_reg: 0,
});
program.emit_insn(Insn::Insert {
cursor: temp_cursor_id,
key_reg: rowid_reg,
record_reg,
flag: InsertFlags::new().require_seek(),
table_name: "".to_string(),
});
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.loop_start,
});
program.preassign_label_to_next_insn(yield_label);
program.emit_insn(Insn::OpenWrite {
cursor_id,
root_page: RegisterOrLiteral::Literal(ctx.table.root_page),
db: 0,
});
} else {
program.emit_insn(Insn::OpenWrite {
cursor_id,
root_page: RegisterOrLiteral::Literal(ctx.table.root_page),
db: 0,
});
program.preassign_label_to_next_insn(ctx.loop_labels.loop_start);
let select_exhausted = program.allocate_label();
ctx.loop_labels.select_exhausted = Some(select_exhausted);
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: select_exhausted,
});
}
ctx.yield_reg_opt = Some(yield_reg);
(result.num_result_cols, cursor_id)
}
}
InsertBody::DefaultValues => {
let num_values = table.columns().len();
values.extend(table.columns().iter().map(|c| {
c.default
.clone()
.unwrap_or_else(|| Box::new(ast::Expr::Literal(ast::Literal::Null)))
}));
(
num_values,
program.alloc_cursor_id_keyed(
CursorKey::table(table_references.joined_tables()[0].internal_id),
CursorType::BTreeTable(ctx.table.clone()),
),
)
}
};
ctx.num_values = num_values;
ctx.cursor_id = cursor_id;
Ok(program)
}
pub struct AutoincMeta {
seq_cursor_id: usize,
r_seq: usize,
r_seq_rowid: usize,
table_name_reg: usize,
}
pub const ROWID_COLUMN: Column = Column::new(
None, String::new(), None, None, schema::Type::Integer,
None,
ColDef {
primary_key: true,
rowid_alias: true,
notnull: true,
hidden: false,
unique: false,
},
);
#[derive(Debug)]
pub struct Insertion<'a> {
key: InsertionKey<'a>,
col_mappings: Vec<ColMapping<'a>>,
record_reg: usize,
}
impl<'a> Insertion<'a> {
pub fn key_register(&self) -> usize {
self.key.register()
}
pub fn first_col_register(&self) -> usize {
self.col_mappings
.first()
.expect("columns must be present")
.register
}
pub fn record_register(&self) -> usize {
self.record_reg
}
pub fn get_col_mapping_by_name(&self, name: &str) -> Option<&ColMapping<'a>> {
if let InsertionKey::RowidAlias(mapping) = &self.key {
if mapping
.column
.name
.as_ref()
.is_some_and(|n| n.eq_ignore_ascii_case(name))
{
return Some(mapping);
}
}
self.col_mappings.iter().find(|col| {
col.column
.name
.as_ref()
.is_some_and(|n| n.eq_ignore_ascii_case(name))
})
}
}
#[derive(Debug)]
enum InsertionKey<'a> {
Autogenerated { register: usize },
LiteralRowid {
value_index: Option<usize>,
register: usize,
},
RowidAlias(ColMapping<'a>),
}
impl InsertionKey<'_> {
fn register(&self) -> usize {
match self {
InsertionKey::Autogenerated { register } => *register,
InsertionKey::LiteralRowid { register, .. } => *register,
InsertionKey::RowidAlias(x) => x.register,
}
}
fn is_provided_by_user(&self) -> bool {
!matches!(self, InsertionKey::Autogenerated { .. })
}
fn column_name(&self) -> &str {
match self {
InsertionKey::RowidAlias(x) => x
.column
.name
.as_ref()
.expect("rowid alias column must be present")
.as_str(),
InsertionKey::LiteralRowid { .. } => ROWID_STRS[0],
InsertionKey::Autogenerated { .. } => ROWID_STRS[0],
}
}
}
#[derive(Debug)]
pub struct ColMapping<'a> {
pub column: &'a Column,
pub value_index: Option<usize>,
pub register: usize,
}
fn build_insertion<'a>(
program: &mut ProgramBuilder,
table: &'a Table,
columns: &'a [ast::Name],
num_values: usize,
) -> Result<Insertion<'a>> {
let table_columns = table.columns();
let rowid_register = program.alloc_register();
let mut insertion_key = InsertionKey::Autogenerated {
register: rowid_register,
};
let mut column_mappings = table
.columns()
.iter()
.map(|c| ColMapping {
column: c,
value_index: None,
register: program.alloc_register(),
})
.collect::<Vec<_>>();
if columns.is_empty() {
if num_values != table_columns.iter().filter(|c| !c.hidden()).count() {
crate::bail_parse_error!(
"table {} has {} columns but {} values were supplied",
&table.get_name(),
table_columns.len(),
num_values
);
}
let mut value_idx = 0;
for (i, col) in table_columns.iter().enumerate() {
if col.hidden() {
continue;
}
if col.is_rowid_alias() {
insertion_key = InsertionKey::RowidAlias(ColMapping {
column: col,
value_index: Some(value_idx),
register: rowid_register,
});
} else {
column_mappings[i].value_index = Some(value_idx);
}
value_idx += 1;
}
} else {
for (value_index, column_name) in columns.iter().enumerate() {
let column_name = normalize_ident(column_name.as_str());
if let Some((idx_in_table, col_in_table)) = table.get_column_by_name(&column_name) {
if col_in_table.is_rowid_alias() {
insertion_key = InsertionKey::RowidAlias(ColMapping {
column: col_in_table,
value_index: Some(value_index),
register: rowid_register,
});
} else if column_mappings[idx_in_table].value_index.is_none() {
column_mappings[idx_in_table].value_index = Some(value_index);
}
} else if ROWID_STRS
.iter()
.any(|s| s.eq_ignore_ascii_case(&column_name))
{
if let Some(col_in_table) = table.columns().iter().find(|c| c.is_rowid_alias()) {
insertion_key = InsertionKey::RowidAlias(ColMapping {
column: col_in_table,
value_index: Some(value_index),
register: rowid_register,
});
} else {
insertion_key = InsertionKey::LiteralRowid {
value_index: Some(value_index),
register: rowid_register,
};
}
} else {
crate::bail_parse_error!(
"table {} has no column named {}",
&table.get_name(),
column_name
);
}
}
}
Ok(Insertion {
key: insertion_key,
col_mappings: column_mappings,
record_reg: program.alloc_register(),
})
}
fn translate_rows_multiple<'short, 'long: 'short>(
program: &mut ProgramBuilder,
insertion: &'short Insertion<'long>,
yield_reg: usize,
resolver: &Resolver,
temp_table_ctx: &Option<TempTableCtx>,
) -> Result<()> {
if let Some(ref temp_table_ctx) = temp_table_ctx {
program.emit_insn(Insn::Rewind {
cursor_id: temp_table_ctx.cursor_id,
pc_if_empty: temp_table_ctx.loop_end_label,
});
program.preassign_label_to_next_insn(temp_table_ctx.loop_start_label);
}
let translate_value_fn =
|prg: &mut ProgramBuilder, value_index: usize, column_register: usize| {
if let Some(temp_table_ctx) = temp_table_ctx {
prg.emit_insn(Insn::Column {
cursor_id: temp_table_ctx.cursor_id,
column: value_index,
dest: column_register,
default: None,
});
} else {
prg.emit_insn(Insn::Copy {
src_reg: yield_reg + value_index,
dst_reg: column_register,
extra_amount: 0,
});
}
Ok(())
};
translate_rows_base(program, insertion, translate_value_fn, resolver)
}
fn translate_rows_single(
program: &mut ProgramBuilder,
value: &[Box<Expr>],
insertion: &Insertion,
resolver: &Resolver,
) -> Result<()> {
let translate_value_fn =
|prg: &mut ProgramBuilder, value_index: usize, column_register: usize| -> Result<()> {
translate_expr_no_constant_opt(
prg,
None,
value.get(value_index).unwrap_or_else(|| {
panic!("value index out of bounds: {value_index} for value: {value:?}")
}),
column_register,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
Ok(())
};
translate_rows_base(program, insertion, translate_value_fn, resolver)
}
fn translate_rows_base<'short, 'long: 'short>(
program: &mut ProgramBuilder,
insertion: &'short Insertion<'long>,
mut translate_value_fn: impl FnMut(&mut ProgramBuilder, usize, usize) -> Result<()>,
resolver: &Resolver,
) -> Result<()> {
translate_key(program, insertion, &mut translate_value_fn, resolver)?;
for col in insertion.col_mappings.iter() {
translate_column(
program,
col.column,
col.register,
col.value_index,
&mut translate_value_fn,
resolver,
)?;
}
Ok(())
}
fn translate_key(
program: &mut ProgramBuilder,
insertion: &Insertion,
mut translate_value_fn: impl FnMut(&mut ProgramBuilder, usize, usize) -> Result<()>,
resolver: &Resolver,
) -> Result<()> {
match &insertion.key {
InsertionKey::RowidAlias(rowid_alias_column) => translate_column(
program,
rowid_alias_column.column,
rowid_alias_column.register,
rowid_alias_column.value_index,
&mut translate_value_fn,
resolver,
),
InsertionKey::LiteralRowid {
value_index,
register,
} => translate_column(
program,
&ROWID_COLUMN,
*register,
*value_index,
&mut translate_value_fn,
resolver,
),
InsertionKey::Autogenerated { .. } => Ok(()), }
}
fn translate_column(
program: &mut ProgramBuilder,
column: &Column,
column_register: usize,
value_index: Option<usize>,
translate_value_fn: &mut impl FnMut(&mut ProgramBuilder, usize, usize) -> Result<()>,
resolver: &Resolver,
) -> Result<()> {
if let Some(value_index) = value_index {
translate_value_fn(program, value_index, column_register)?;
} else if column.is_rowid_alias() {
program.emit_insn(Insn::SoftNull {
reg: column_register,
});
} else if column.hidden() {
program.emit_insn(Insn::Null {
dest: column_register,
dest_end: None,
});
} else if let Some(default_expr) = column.default.as_ref() {
translate_expr(program, None, default_expr, column_register, resolver)?;
} else {
let nullable = !column.notnull() && !column.primary_key();
if !nullable {
crate::bail_parse_error!(
"column {} is not nullable",
column
.name
.as_ref()
.expect("column name must be present")
.as_str()
);
}
program.emit_insn(Insn::Null {
dest: column_register,
dest_end: None,
});
}
Ok(())
}
fn emit_pk_uniqueness_check(
program: &mut ProgramBuilder,
ctx: &mut InsertEmitCtx,
resolver: &mut Resolver,
insertion: &Insertion,
position: Option<usize>,
upsert_catch_all: Option<usize>,
preflight: &mut PreflightCtx,
) -> Result<()> {
let make_record_label = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: ctx.cursor_id,
rowid_reg: insertion.key_register(),
target_pc: make_record_label,
});
let rowid_column_name = insertion.key.column_name();
'emit_halt: {
if preflight.on_replace {
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: ctx.conflict_rowid_reg,
extra_amount: 0,
});
emit_replace_delete_conflicting_row(
program,
resolver,
preflight.connection,
ctx,
preflight.table_references,
)?;
program.emit_insn(Insn::Goto {
target_pc: make_record_label,
});
break 'emit_halt;
}
if let Some(position) = position.or(upsert_catch_all) {
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: ctx.conflict_rowid_reg,
extra_amount: 0,
});
program.emit_insn(Insn::Goto {
target_pc: preflight.upsert_actions[position].1,
});
break 'emit_halt;
}
let mut description =
String::with_capacity(ctx.table.name.len() + rowid_column_name.len() + 2);
description.push_str(ctx.table.name.as_str());
description.push('.');
description.push_str(rowid_column_name);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description,
});
}
program.preassign_label_to_next_insn(make_record_label);
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn emit_index_uniqueness_check(
program: &mut ProgramBuilder,
ctx: &mut InsertEmitCtx,
resolver: &mut Resolver,
insertion: &Insertion,
index: &Index,
position: Option<usize>,
upsert_catch_all: Option<usize>,
preflight: &mut PreflightCtx,
) -> Result<()> {
let idx_cursor_id = ctx
.idx_cursors
.iter()
.find(|(name, _, _)| name == &index.name)
.map(|(_, _, c_id)| *c_id)
.expect("no cursor found for index");
let maybe_skip_probe_label = emit_partial_index_check(program, resolver, index, insertion)?;
let num_cols = index.columns.len();
let idx_start_reg = program.alloc_registers(num_cols + 1);
for (i, idx_col) in index.columns.iter().enumerate() {
emit_index_column_value_for_insert(
program,
resolver,
insertion,
idx_col,
idx_start_reg + i,
)?;
}
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: idx_start_reg + num_cols,
extra_amount: 0,
});
if index.unique {
emit_unique_index_check(
program,
ctx,
resolver,
index,
idx_cursor_id,
idx_start_reg,
num_cols,
position,
upsert_catch_all,
preflight,
)?;
} else {
if preflight.upsert_actions.is_empty() {
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(idx_start_reg),
count: to_u16(num_cols + 1),
dest_reg: to_u16(record_reg),
index_name: Some(index.name.clone()),
affinity_str: None,
});
program.emit_insn(Insn::IdxInsert {
cursor_id: idx_cursor_id,
record_reg,
unpacked_start: Some(idx_start_reg),
unpacked_count: Some((num_cols + 1) as u16),
flags: IdxInsertFlags::new().nchange(true),
});
}
}
if let Some(lbl) = maybe_skip_probe_label {
program.resolve_label(lbl, program.offset());
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn emit_unique_index_check(
program: &mut ProgramBuilder,
ctx: &mut InsertEmitCtx,
resolver: &mut Resolver,
index: &Index,
idx_cursor_id: usize,
idx_start_reg: usize,
num_cols: usize,
position: Option<usize>,
upsert_catch_all: Option<usize>,
preflight: &mut PreflightCtx,
) -> Result<()> {
let aff = index
.columns
.iter()
.map(|ic| {
if ic.expr.is_some() {
Affinity::Blob.aff_mask()
} else {
ctx.table.columns[ic.pos_in_table].affinity().aff_mask()
}
})
.collect::<String>();
program.emit_insn(Insn::Affinity {
start_reg: idx_start_reg,
count: NonZeroUsize::new(num_cols).expect("nonzero col count"),
affinities: aff,
});
if !preflight.upsert_actions.is_empty() {
let next_check = program.allocate_label();
program.emit_insn(Insn::NoConflict {
cursor_id: idx_cursor_id,
target_pc: next_check,
record_reg: idx_start_reg,
num_regs: num_cols,
});
if let Some(position) = position.or(upsert_catch_all) {
match &preflight.upsert_actions[position].2.do_clause {
UpsertDo::Nothing => {
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.row_done,
});
}
UpsertDo::Set { .. } => {
program.emit_insn(Insn::IdxRowId {
cursor_id: idx_cursor_id,
dest: ctx.conflict_rowid_reg,
});
program.emit_insn(Insn::Goto {
target_pc: preflight.upsert_actions[position].1,
});
}
}
}
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_UNIQUE,
description: format_unique_violation_desc(ctx.table.name.as_str(), index),
});
program.preassign_label_to_next_insn(next_check);
} else {
let ok = program.allocate_label();
program.emit_insn(Insn::NoConflict {
cursor_id: idx_cursor_id,
target_pc: ok,
record_reg: idx_start_reg,
num_regs: num_cols,
});
if preflight.on_replace {
program.emit_insn(Insn::IdxRowId {
cursor_id: idx_cursor_id,
dest: ctx.conflict_rowid_reg,
});
emit_replace_delete_conflicting_row(
program,
resolver,
preflight.connection,
ctx,
preflight.table_references,
)?;
program.emit_insn(Insn::Goto { target_pc: ok });
} else {
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_UNIQUE,
description: format_unique_violation_desc(ctx.table.name.as_str(), index),
});
}
program.preassign_label_to_next_insn(ok);
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(idx_start_reg),
count: to_u16(num_cols + 1),
dest_reg: to_u16(record_reg),
index_name: Some(index.name.clone()),
affinity_str: None,
});
program.emit_insn(Insn::IdxInsert {
cursor_id: idx_cursor_id,
record_reg,
unpacked_start: Some(idx_start_reg),
unpacked_count: Some((num_cols + 1) as u16),
flags: IdxInsertFlags::new().nchange(true),
});
}
Ok(())
}
fn emit_preflight_constraint_checks(
program: &mut ProgramBuilder,
ctx: &mut InsertEmitCtx,
resolver: &mut Resolver,
insertion: &Insertion,
constraints: &ConstraintsToCheck,
preflight: &mut PreflightCtx,
) -> Result<()> {
for (constraint, position) in &constraints.constraints_to_check {
match constraint {
ResolvedUpsertTarget::PrimaryKey => {
emit_pk_uniqueness_check(
program,
ctx,
resolver,
insertion,
*position,
constraints.upsert_catch_all_position,
preflight,
)?;
}
ResolvedUpsertTarget::Index(index) => {
emit_index_uniqueness_check(
program,
ctx,
resolver,
insertion,
index,
*position,
constraints.upsert_catch_all_position,
preflight,
)?;
}
ResolvedUpsertTarget::CatchAll => unreachable!(),
}
}
Ok(())
}
fn translate_virtual_table_insert(
mut program: ProgramBuilder,
virtual_table: Arc<VirtualTable>,
columns: Vec<ast::Name>,
mut body: InsertBody,
on_conflict: Option<ResolveType>,
resolver: &Resolver,
) -> Result<ProgramBuilder> {
if virtual_table.readonly() {
crate::bail_constraint_error!("Table is read-only: {}", virtual_table.name);
}
let (num_values, value) = match &mut body {
InsertBody::Select(select, None) => match &mut select.body.select {
OneSelect::Values(values) => (values[0].len(), values.pop().unwrap()),
_ => crate::bail_parse_error!("Virtual tables only support VALUES clause in INSERT"),
},
InsertBody::DefaultValues => (0, vec![]),
_ => crate::bail_parse_error!("Unsupported INSERT body for virtual tables"),
};
let table = Table::Virtual(virtual_table.clone());
let cursor_id = program.alloc_cursor_id(CursorType::VirtualTable(virtual_table.clone()));
program.emit_insn(Insn::VOpen { cursor_id });
program.emit_insn(Insn::VBegin { cursor_id });
let registers_start = program.alloc_register();
program.emit_insn(Insn::Null {
dest: registers_start,
dest_end: None,
});
let insertion = build_insertion(&mut program, &table, &columns, num_values)?;
translate_rows_single(&mut program, &value, &insertion, resolver)?;
let conflict_action = on_conflict.as_ref().map(|c| c.bit_value()).unwrap_or(0) as u16;
program.emit_insn(Insn::VUpdate {
cursor_id,
arg_count: insertion.col_mappings.len() + 2, start_reg: registers_start,
conflict_action,
});
program.emit_insn(Insn::Close { cursor_id });
let halt_label = program.allocate_label();
program.resolve_label(halt_label, program.offset());
Ok(program)
}
fn ensure_sequence_initialized(
program: &mut ProgramBuilder,
schema: &Schema,
table: &schema::BTreeTable,
) -> Result<()> {
let seq_table = schema.get_btree_table("sqlite_sequence").ok_or_else(|| {
crate::error::LimboError::InternalError("sqlite_sequence table not found".to_string())
})?;
let seq_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(seq_table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: seq_cursor_id,
root_page: seq_table.root_page.into(),
db: 0,
});
let table_name_reg = program.emit_string8_new_reg(table.name.clone());
let loop_start_label = program.allocate_label();
let entry_exists_label = program.allocate_label();
let insert_new_label = program.allocate_label();
program.emit_insn(Insn::Rewind {
cursor_id: seq_cursor_id,
pc_if_empty: insert_new_label,
});
program.preassign_label_to_next_insn(loop_start_label);
let name_col_reg = program.alloc_register();
program.emit_column_or_rowid(seq_cursor_id, 0, name_col_reg);
program.emit_insn(Insn::Eq {
lhs: table_name_reg,
rhs: name_col_reg,
target_pc: entry_exists_label,
flags: Default::default(),
collation: None,
});
program.emit_insn(Insn::Next {
cursor_id: seq_cursor_id,
pc_if_next: loop_start_label,
});
program.preassign_label_to_next_insn(insert_new_label);
let record_reg = program.alloc_register();
let record_start_reg = program.alloc_registers(2);
let zero_reg = program.alloc_register();
program.emit_insn(Insn::Integer {
dest: zero_reg,
value: 0,
});
program.emit_insn(Insn::Copy {
src_reg: table_name_reg,
dst_reg: record_start_reg,
extra_amount: 0,
});
program.emit_insn(Insn::Copy {
src_reg: zero_reg,
dst_reg: record_start_reg + 1,
extra_amount: 0,
});
let affinity_str = seq_table
.columns
.iter()
.map(|c| c.affinity().aff_mask())
.collect();
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(record_start_reg),
count: to_u16(2),
dest_reg: to_u16(record_reg),
index_name: None,
affinity_str: Some(affinity_str),
});
let new_rowid_reg = program.alloc_register();
program.emit_insn(Insn::NewRowid {
cursor: seq_cursor_id,
rowid_reg: new_rowid_reg,
prev_largest_reg: 0,
});
program.emit_insn(Insn::Insert {
cursor: seq_cursor_id,
key_reg: new_rowid_reg,
record_reg,
flag: InsertFlags::new(),
table_name: "sqlite_sequence".to_string(),
});
program.preassign_label_to_next_insn(entry_exists_label);
program.emit_insn(Insn::Close {
cursor_id: seq_cursor_id,
});
Ok(())
}
#[inline]
pub fn format_unique_violation_desc(table_name: &str, index: &Index) -> String {
if index.columns.len() == 1 {
let mut s = String::with_capacity(table_name.len() + 1 + index.columns[0].name.len());
s.push_str(table_name);
s.push('.');
s.push_str(&index.columns[0].name);
s
} else {
let mut s = String::with_capacity(table_name.len() + 3 + 4 * index.columns.len());
s.push_str(table_name);
s.push_str(".(");
s.push_str(
&index
.columns
.iter()
.map(|c| c.name.as_str())
.collect::<Vec<_>>()
.join(", "),
);
s.push(')');
s
}
}
pub fn rewrite_partial_index_where(
expr: &mut ast::Expr,
insertion: &Insertion,
) -> crate::Result<WalkControl> {
let col_reg = |name: &str| -> Option<usize> {
if ROWID_STRS.iter().any(|s| s.eq_ignore_ascii_case(name)) {
Some(insertion.key_register())
} else if let Some(c) = insertion.get_col_mapping_by_name(name) {
if c.column.is_rowid_alias() {
Some(insertion.key_register())
} else {
Some(c.register)
}
} else {
None
}
};
walk_expr_mut(
expr,
&mut |e: &mut ast::Expr| -> crate::Result<WalkControl> {
match e {
Expr::Id(name) => {
let normalized = normalize_ident(name.as_str());
if let Some(reg) = col_reg(&normalized) {
*e = Expr::Register(reg);
}
}
Expr::Qualified(_, col) | Expr::DoublyQualified(_, _, col) => {
let normalized = normalize_ident(col.as_str());
if let Some(reg) = col_reg(&normalized) {
*e = Expr::Register(reg);
}
}
_ => {}
}
Ok(WalkControl::Continue)
},
)
}
fn rewrite_index_expr_for_insertion(expr: &mut ast::Expr, insertion: &Insertion) -> Result<()> {
let mut missing_column = None;
let col_reg = |name: &str| -> Option<usize> {
if ROWID_STRS.iter().any(|s| s.eq_ignore_ascii_case(name)) {
Some(insertion.key_register())
} else if let Some(c) = insertion.get_col_mapping_by_name(name) {
if c.column.is_rowid_alias() {
Some(insertion.key_register())
} else {
Some(c.register)
}
} else {
None
}
};
walk_expr_mut(
expr,
&mut |e: &mut ast::Expr| -> crate::Result<WalkControl> {
match e {
Expr::Id(name) | Expr::Name(name) => {
let normalized = normalize_ident(name.as_str());
if let Some(reg) = col_reg(&normalized) {
*e = Expr::Register(reg);
} else {
missing_column = Some(normalized);
}
}
Expr::Qualified(_, col) | Expr::DoublyQualified(_, _, col) => {
let normalized = normalize_ident(col.as_str());
if let Some(reg) = col_reg(&normalized) {
*e = Expr::Register(reg);
} else {
missing_column = Some(normalized);
}
}
_ => {}
}
Ok(if missing_column.is_some() {
WalkControl::SkipChildren
} else {
WalkControl::Continue
})
},
)?;
if let Some(col) = missing_column {
return Err(LimboError::PlanningError(format!(
"Column not found in INSERT: {col}"
)));
}
Ok(())
}
fn emit_index_column_value_for_insert(
program: &mut ProgramBuilder,
resolver: &Resolver,
insertion: &Insertion,
idx_col: &IndexColumn,
dest_reg: usize,
) -> Result<()> {
if let Some(expr) = &idx_col.expr {
let mut expr = expr.as_ref().clone();
rewrite_index_expr_for_insertion(&mut expr, insertion)?;
translate_expr_no_constant_opt(
program,
Some(&TableReferences::new_empty()),
&expr,
dest_reg,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
} else {
let Some(cm) = insertion.get_col_mapping_by_name(&idx_col.name) else {
return Err(LimboError::PlanningError(
"Column not found in INSERT".to_string(),
));
};
program.emit_insn(Insn::Copy {
src_reg: cm.register,
dst_reg: dest_reg,
extra_amount: 0,
});
}
Ok(())
}
struct ConstraintsToCheck {
constraints_to_check: Vec<(ResolvedUpsertTarget, Option<usize>)>,
upsert_catch_all_position: Option<usize>,
}
struct PreflightCtx<'a, 'b> {
upsert_actions: &'a [(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)],
on_replace: bool,
connection: &'a Arc<Connection>,
table_references: &'b mut TableReferences,
}
fn build_constraints_to_check(
resolver: &Resolver,
table_name: &str,
upsert_actions: &[(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)],
has_user_provided_rowid: bool,
) -> ConstraintsToCheck {
let mut constraints_to_check = Vec::new();
if has_user_provided_rowid {
let position = upsert_actions
.iter()
.position(|(target, ..)| matches!(target, ResolvedUpsertTarget::PrimaryKey));
constraints_to_check.push((ResolvedUpsertTarget::PrimaryKey, position));
}
for index in resolver.schema.get_indices(table_name) {
let position = upsert_actions
.iter()
.position(|(target, ..)| matches!(target, ResolvedUpsertTarget::Index(x) if Arc::ptr_eq(x, index)));
constraints_to_check.push((ResolvedUpsertTarget::Index(index.clone()), position));
}
constraints_to_check.sort_by(|(_, p1), (_, p2)| match (p1, p2) {
(Some(p1), Some(p2)) => p1.cmp(p2),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => std::cmp::Ordering::Equal,
});
let upsert_catch_all_position =
if let Some((ResolvedUpsertTarget::CatchAll, ..)) = upsert_actions.last() {
Some(upsert_actions.len() - 1)
} else {
None
};
ConstraintsToCheck {
constraints_to_check,
upsert_catch_all_position,
}
}
fn emit_update_sqlite_sequence(
program: &mut ProgramBuilder,
schema: &Schema,
seq_cursor_id: usize,
r_seq_rowid: usize,
table_name_reg: usize,
new_key_reg: usize,
) -> Result<()> {
let record_reg = program.alloc_register();
let record_start_reg = program.alloc_registers(2);
program.emit_insn(Insn::Copy {
src_reg: table_name_reg,
dst_reg: record_start_reg,
extra_amount: 0,
});
program.emit_insn(Insn::Copy {
src_reg: new_key_reg,
dst_reg: record_start_reg + 1,
extra_amount: 0,
});
let seq_table = schema.get_btree_table("sqlite_sequence").unwrap();
let affinity_str = seq_table
.columns
.iter()
.map(|col| col.affinity().aff_mask())
.collect::<String>();
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(record_start_reg),
count: to_u16(2),
dest_reg: to_u16(record_reg),
index_name: None,
affinity_str: Some(affinity_str),
});
let update_existing_label = program.allocate_label();
let end_update_label = program.allocate_label();
program.emit_insn(Insn::NotNull {
reg: r_seq_rowid,
target_pc: update_existing_label,
});
program.emit_insn(Insn::NewRowid {
cursor: seq_cursor_id,
rowid_reg: r_seq_rowid,
prev_largest_reg: 0,
});
program.emit_insn(Insn::Insert {
cursor: seq_cursor_id,
key_reg: r_seq_rowid,
record_reg,
flag: InsertFlags::new(),
table_name: "sqlite_sequence".to_string(),
});
program.emit_insn(Insn::Goto {
target_pc: end_update_label,
});
program.preassign_label_to_next_insn(update_existing_label);
program.emit_insn(Insn::Insert {
cursor: seq_cursor_id,
key_reg: r_seq_rowid,
record_reg,
flag: InsertFlags(turso_parser::ast::ResolveType::Replace.bit_value() as u8),
table_name: "sqlite_sequence".to_string(),
});
program.preassign_label_to_next_insn(end_update_label);
Ok(())
}
fn emit_replace_delete_conflicting_row(
program: &mut ProgramBuilder,
resolver: &mut Resolver,
connection: &Arc<Connection>,
ctx: &mut InsertEmitCtx,
table_references: &mut TableReferences,
) -> Result<()> {
program.emit_insn(Insn::SeekRowid {
cursor_id: ctx.cursor_id,
src_reg: ctx.conflict_rowid_reg,
target_pc: ctx.halt_label,
});
if connection.foreign_keys_enabled() {
fire_fk_delete_actions(
program,
resolver,
ctx.table.name.as_str(),
ctx.cursor_id,
ctx.conflict_rowid_reg,
connection,
)?;
}
let table = &ctx.table;
let table_name = table.name.as_str();
let main_cursor_id = ctx.cursor_id;
for (name, _, index_cursor_id) in ctx.idx_cursors.iter() {
let index = resolver
.schema
.get_index(table_name, name)
.expect("index to exist");
let skip_delete_label = if index.where_clause.is_some() {
let where_copy = index
.bind_where_expr(Some(table_references), connection)
.expect("where clause to exist");
let skip_label = program.allocate_label();
let reg = program.alloc_register();
translate_expr_no_constant_opt(
program,
Some(table_references),
&where_copy,
reg,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
program.emit_insn(Insn::IfNot {
reg,
jump_if_null: true,
target_pc: skip_label,
});
Some(skip_label)
} else {
None
};
let num_regs = index.columns.len() + 1;
let start_reg = program.alloc_registers(num_regs);
for (reg_offset, column_index) in index.columns.iter().enumerate() {
if let Some(expr) = &column_index.expr {
let mut expr = expr.as_ref().clone();
bind_and_rewrite_expr(
&mut expr,
Some(table_references),
None,
connection,
BindingBehavior::ResultColumnsNotAllowed,
)?;
translate_expr_no_constant_opt(
program,
Some(table_references),
&expr,
start_reg + reg_offset,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
} else {
program.emit_column_or_rowid(
main_cursor_id,
column_index.pos_in_table,
start_reg + reg_offset,
);
}
}
program.emit_insn(Insn::Copy {
src_reg: ctx.conflict_rowid_reg,
dst_reg: start_reg + num_regs - 1,
extra_amount: 0,
});
program.emit_insn(Insn::IdxDelete {
start_reg,
num_regs,
cursor_id: *index_cursor_id,
raise_error_if_no_matching_entry: index.where_clause.is_none(),
});
if let Some(label) = skip_delete_label {
program.resolve_label(label, program.offset());
}
}
if let Some(cdc_cursor_id) = ctx.cdc_table.as_ref().map(|(id, _tbl)| *id) {
let cdc_has_before = program.capture_data_changes_mode().has_before();
let before_record_reg = if cdc_has_before {
Some(emit_cdc_full_record(
program,
&table.columns,
main_cursor_id,
ctx.conflict_rowid_reg,
))
} else {
None
};
emit_cdc_insns(
program,
resolver,
OperationMode::DELETE,
cdc_cursor_id,
ctx.conflict_rowid_reg,
before_record_reg,
None,
None,
table_name,
)?;
}
program.emit_insn(Insn::Delete {
cursor_id: main_cursor_id,
table_name: table_name.to_string(),
is_part_of_update: true,
});
Ok(())
}
pub fn emit_fk_child_insert_checks(
program: &mut ProgramBuilder,
resolver: &Resolver,
child_tbl: &BTreeTable,
new_start_reg: usize,
new_rowid_reg: usize,
) -> crate::Result<()> {
for fk_ref in resolver.schema.resolved_fks_for_child(&child_tbl.name)? {
let is_self_ref = fk_ref.fk.parent_table.eq_ignore_ascii_case(&child_tbl.name);
let fk_ok = program.allocate_label();
for cname in &fk_ref.child_cols {
let (i, col) = child_tbl.get_column(cname).unwrap();
let src = if col.is_rowid_alias() {
new_rowid_reg
} else {
new_start_reg + i
};
program.emit_insn(Insn::IsNull {
reg: src,
target_pc: fk_ok,
});
}
let parent_tbl = resolver
.schema
.get_btree_table(&fk_ref.fk.parent_table)
.expect("parent btree");
if fk_ref.parent_uses_rowid {
let pcur = open_read_table(program, &parent_tbl);
let (i_child, col_child) = child_tbl.get_column(&fk_ref.child_cols[0]).unwrap();
let val_reg = if col_child.is_rowid_alias() {
new_rowid_reg
} else {
new_start_reg + i_child
};
let tmp = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: val_reg,
dst_reg: tmp,
extra_amount: 0,
});
program.emit_insn(Insn::MustBeInt { reg: tmp });
if is_self_ref {
program.emit_insn(Insn::Eq {
lhs: tmp,
rhs: new_rowid_reg,
target_pc: fk_ok,
flags: CmpInsFlags::default(),
collation: None,
});
}
let violation = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: pcur,
rowid_reg: tmp,
target_pc: violation,
});
program.emit_insn(Insn::Close { cursor_id: pcur });
program.emit_insn(Insn::Goto { target_pc: fk_ok });
program.preassign_label_to_next_insn(violation);
program.emit_insn(Insn::Close { cursor_id: pcur });
emit_fk_violation(program, &fk_ref.fk)?;
program.preassign_label_to_next_insn(fk_ok);
} else {
let idx = fk_ref
.parent_unique_index
.as_ref()
.expect("parent unique index required");
let icur = open_read_index(program, idx);
let ncols = fk_ref.child_cols.len();
let probe = {
let start = program.alloc_registers(ncols);
for (k, cname) in fk_ref.child_cols.iter().enumerate() {
let (i, col) = child_tbl.get_column(cname).unwrap();
program.emit_insn(Insn::Copy {
src_reg: if col.is_rowid_alias() {
new_rowid_reg
} else {
new_start_reg + i
},
dst_reg: start + k,
extra_amount: 0,
});
}
if let Some(cnt) = NonZeroUsize::new(ncols) {
program.emit_insn(Insn::Affinity {
start_reg: start,
count: cnt,
affinities: build_index_affinity_string(idx, &parent_tbl),
});
}
start
};
if is_self_ref {
let parent_cols: Vec<&str> =
idx.columns.iter().map(|ic| ic.name.as_str()).collect();
let parent_new = program.alloc_registers(ncols);
for (i, pname) in parent_cols.iter().enumerate() {
let (pos, col) = child_tbl.get_column(pname).unwrap();
program.emit_insn(Insn::Copy {
src_reg: if col.is_rowid_alias() {
new_rowid_reg
} else {
new_start_reg + pos
},
dst_reg: parent_new + i,
extra_amount: 0,
});
}
if let Some(cnt) = NonZeroUsize::new(ncols) {
program.emit_insn(Insn::Affinity {
start_reg: parent_new,
count: cnt,
affinities: build_index_affinity_string(idx, &parent_tbl),
});
}
let mismatch = program.allocate_label();
for i in 0..ncols {
let cont = program.allocate_label();
program.emit_insn(Insn::Eq {
lhs: probe + i,
rhs: parent_new + i,
target_pc: cont,
flags: CmpInsFlags::default().jump_if_null(),
collation: Some(super::collate::CollationSeq::Binary),
});
program.emit_insn(Insn::Goto {
target_pc: mismatch,
});
program.preassign_label_to_next_insn(cont);
}
program.emit_insn(Insn::Goto { target_pc: fk_ok });
program.preassign_label_to_next_insn(mismatch);
}
index_probe(
program,
icur,
probe,
ncols,
|_p| Ok(()),
|p| {
emit_fk_violation(p, &fk_ref.fk)?;
Ok(())
},
)?;
program.emit_insn(Insn::Goto { target_pc: fk_ok });
program.preassign_label_to_next_insn(fk_ok);
}
}
Ok(())
}
fn build_parent_key_image_for_insert(
program: &mut ProgramBuilder,
parent_table: &BTreeTable,
pref: &ResolvedFkRef,
insertion: &Insertion,
) -> crate::Result<(usize, usize)> {
let parent_cols: Vec<String> = if pref.parent_uses_rowid {
vec!["rowid".to_string()]
} else if !pref.fk.parent_columns.is_empty() {
pref.fk.parent_columns.clone()
} else {
parent_table
.primary_key_columns
.iter()
.map(|(n, _)| n.clone())
.collect()
};
let ncols = parent_cols.len();
let start = program.alloc_registers(ncols);
for (i, pname) in parent_cols.iter().enumerate() {
let src = if pname.eq_ignore_ascii_case("rowid") {
insertion.key_register()
} else {
insertion
.get_col_mapping_by_name(pname)
.ok_or_else(|| {
crate::LimboError::PlanningError(format!(
"Column '{}' not present in INSERT image for parent {}",
pname, parent_table.name
))
})?
.register
};
program.emit_insn(Insn::Copy {
src_reg: src,
dst_reg: start + i,
extra_amount: 0,
});
}
let aff: String = if pref.parent_uses_rowid {
"i".to_string()
} else {
parent_cols
.iter()
.map(|name| {
let (_, col) = parent_table.get_column(name).ok_or_else(|| {
crate::LimboError::InternalError(format!("parent col {name} missing"))
})?;
Ok::<_, crate::LimboError>(col.affinity().aff_mask())
})
.collect::<Result<String, _>>()?
};
if let Some(count) = NonZeroUsize::new(ncols) {
program.emit_insn(Insn::Affinity {
start_reg: start,
count,
affinities: aff,
});
}
Ok((start, ncols))
}
pub fn emit_parent_side_fk_decrement_on_insert(
program: &mut ProgramBuilder,
resolver: &Resolver,
parent_table: &BTreeTable,
insertion: &Insertion,
force_immediate: bool,
) -> crate::Result<()> {
for pref in resolver
.schema
.resolved_fks_referencing(&parent_table.name)?
{
let is_self_ref = pref
.child_table
.name
.eq_ignore_ascii_case(&parent_table.name);
if !force_immediate && !pref.fk.deferred && !is_self_ref {
continue;
}
let (new_pk_start, n_cols) =
build_parent_key_image_for_insert(program, parent_table, &pref, insertion)?;
let child_tbl = &pref.child_table;
let child_cols = &pref.fk.child_columns;
let idx = resolver.schema.get_indices(&child_tbl.name).find(|ix| {
ix.columns.len() == child_cols.len()
&& ix
.columns
.iter()
.zip(child_cols.iter())
.all(|(ic, cc)| ic.name.eq_ignore_ascii_case(cc))
});
if let Some(ix) = idx {
let icur = open_read_index(program, ix);
let probe_start = program.alloc_registers(n_cols);
for i in 0..n_cols {
program.emit_insn(Insn::Copy {
src_reg: new_pk_start + i,
dst_reg: probe_start + i,
extra_amount: 0,
});
}
if let Some(count) = NonZeroUsize::new(n_cols) {
program.emit_insn(Insn::Affinity {
start_reg: probe_start,
count,
affinities: build_index_affinity_string(ix, child_tbl),
});
}
let found = program.allocate_label();
program.emit_insn(Insn::Found {
cursor_id: icur,
target_pc: found,
record_reg: probe_start,
num_regs: n_cols,
});
program.emit_insn(Insn::Close { cursor_id: icur });
let skip = program.allocate_label();
program.emit_insn(Insn::Goto { target_pc: skip });
program.resolve_label(found, program.offset());
program.emit_insn(Insn::Close { cursor_id: icur });
emit_guarded_fk_decrement(program, skip, pref.fk.deferred);
program.resolve_label(skip, program.offset());
} else {
let ccur = open_read_table(program, child_tbl);
let done = program.allocate_label();
program.emit_insn(Insn::Rewind {
cursor_id: ccur,
pc_if_empty: done,
});
let loop_top = program.allocate_label();
let next_row = program.allocate_label();
program.resolve_label(loop_top, program.offset());
for (i, child_name) in child_cols.iter().enumerate() {
let (pos, _) = child_tbl.get_column(child_name).ok_or_else(|| {
crate::LimboError::InternalError(format!("child col {child_name} missing"))
})?;
let tmp = program.alloc_register();
program.emit_insn(Insn::Column {
cursor_id: ccur,
column: pos,
dest: tmp,
default: None,
});
program.emit_insn(Insn::IsNull {
reg: tmp,
target_pc: next_row,
});
let cont = program.allocate_label();
program.emit_insn(Insn::Eq {
lhs: tmp,
rhs: new_pk_start + i,
target_pc: cont,
flags: CmpInsFlags::default().jump_if_null(),
collation: Some(super::collate::CollationSeq::Binary),
});
program.emit_insn(Insn::Goto {
target_pc: next_row,
});
program.resolve_label(cont, program.offset());
}
emit_guarded_fk_decrement(program, next_row, pref.fk.deferred);
program.resolve_label(next_row, program.offset());
program.emit_insn(Insn::Next {
cursor_id: ccur,
pc_if_next: loop_top,
});
program.resolve_label(done, program.offset());
program.emit_insn(Insn::Close { cursor_id: ccur });
}
}
Ok(())
}