use crate::schema::ColumnLayout;
use crate::translate::emitter::{emit_index_column_value_old_image, gencol};
use crate::turso_debug_assert;
use crate::{
error::{SQLITE_CONSTRAINT_NOTNULL, SQLITE_CONSTRAINT_PRIMARYKEY, SQLITE_CONSTRAINT_UNIQUE},
schema::{
self, BTreeTable, ColDef, Column, Index, IndexColumn, ResolvedFkRef, Table,
EXPR_INDEX_SENTINEL, SQLITE_SEQUENCE_TABLE_NAME,
},
sync::Arc,
translate::{
emitter::{
delete::emit_fk_child_decrement_on_delete, emit_cdc_autocommit_commit,
emit_cdc_full_record, emit_cdc_insns, emit_cdc_patch_record, emit_check_constraints,
emit_make_record, prepare_cdc_if_necessary, OperationMode, Resolver,
},
expr::{
bind_and_rewrite_expr, emit_returning_results, emit_returning_scan_back,
process_returning_clause, restore_returning_row_image_in_cache,
seed_returning_row_image_in_cache, translate_expr, translate_expr_no_constant_opt,
walk_expr, BindingBehavior, NoConstantOptReason, ReturningBufferCtx, WalkControl,
},
fkeys::{
build_index_affinity_string, emit_fk_restrict_halt, emit_fk_violation,
emit_guarded_fk_decrement, index_probe, open_read_index, open_read_table,
ForeignKeyActions,
},
plan::{
ColumnUsedMask, EvalAt, JoinedTable, Operation, QueryDestination, ResultSetColumn,
TableReferences,
},
planner::{plan_ctes_as_outer_refs, ROWID_STRS},
select::translate_select,
stmt_journal::{any_index_or_ipk_has_replace, set_insert_stmt_journal_flags},
subquery::{
emit_non_from_clause_subqueries_for_eval_at, emit_non_from_clause_subquery,
plan_subqueries_from_returning,
},
trigger_exec::{
fire_trigger, get_triggers_including_temp, has_triggers_including_temp, TriggerContext,
},
upsert::{
collect_set_clauses_for_upsert, emit_upsert, resolve_upsert_target,
ResolvedUpsertTarget,
},
},
util::normalize_ident,
vdbe::{
affinity::Affinity,
builder::{CursorKey, CursorType, DmlColumnContext, ProgramBuilder, ProgramBuilderOpts},
insn::{to_u16, CmpInsFlags, IdxInsertFlags, InsertFlags, Insn, RegisterOrLiteral},
BranchOffset,
},
CaptureDataChangesExt, Connection, LimboError, Result, VirtualTable,
};
use gencol::compute_virtual_columns;
use std::num::NonZeroUsize;
use turso_macros::turso_assert;
use turso_parser::ast::{
self, Expr, InsertBody, OneSelect, QualifiedName, ResolveType, ResultColumn, TriggerEvent,
TriggerTime, Upsert, UpsertDo, With,
};
fn validate(
table_name: &str,
resolver: &Resolver,
table: &Table,
database_id: usize,
conn: &Arc<Connection>,
) -> Result<()> {
if !conn.is_nested_stmt()
&& !conn.is_mvcc_bootstrap_connection()
&& !crate::schema::allow_user_dml(table_name)
{
crate::bail_parse_error!("table {} may not be modified", table_name);
}
if resolver.schema().is_materialized_view(table_name) {
crate::bail_parse_error!("cannot modify materialized view {}", table_name);
}
if table.btree().is_some_and(|t| t.has_autoincrement)
&& conn.mv_store_for_db(database_id).is_some()
{
crate::bail_parse_error!(
"AUTOINCREMENT is not supported in MVCC mode (journal_mode=experimental_mvcc)"
);
}
resolver.schema().with_incompatible_dependent_views(table_name, |views| {
if !views.is_empty() {
use crate::incremental::compiler::DBSP_CIRCUIT_VERSION;
crate::bail_parse_error!(
"Cannot DELETE from table '{table_name}' because it has incompatible dependent materialized view(s): {}. \n\
These views were created with a different DBSP version than the current version ({DBSP_CIRCUIT_VERSION}). \n\
Please DROP and recreate the view(s) before modifying this table.",
views.iter().fold(String::new(), |_, s| s.to_string() + ", "),
);
}
Ok(())
})
}
pub struct TempTableCtx {
cursor_id: usize,
loop_start_label: BranchOffset,
loop_end_label: BranchOffset,
}
pub struct InsertLoopLabels {
pub loop_start: BranchOffset,
pub row_done: BranchOffset,
pub stmt_epilogue: BranchOffset,
pub select_exhausted: Option<BranchOffset>,
}
pub struct InsertKeyLabels {
pub key_ready_for_check: BranchOffset,
pub key_generation: BranchOffset,
}
#[allow(dead_code)]
pub struct InsertEmitCtx<'a> {
pub table: &'a Arc<BTreeTable>,
pub idx_cursors: Vec<(String, i64, usize)>,
pub temp_table_ctx: Option<TempTableCtx>,
pub on_conflict: ResolveType,
pub statement_on_conflict: Option<ResolveType>,
pub num_values: usize,
pub yield_reg_opt: Option<usize>,
pub conflict_rowid_reg: usize,
pub cursor_id: usize,
pub halt_label: BranchOffset,
pub loop_labels: InsertLoopLabels,
pub key_labels: InsertKeyLabels,
pub cdc_table: Option<(usize, Arc<BTreeTable>)>,
pub autoincrement_meta: Option<AutoincMeta>,
pub database_id: usize,
pub returning_buffer: Option<ReturningBufferCtx>,
}
impl<'a> InsertEmitCtx<'a> {
#[allow(clippy::too_many_arguments)]
fn new(
program: &mut ProgramBuilder,
resolver: &Resolver,
table: &'a Arc<BTreeTable>,
on_conflict: Option<ResolveType>,
cdc_table: Option<(usize, Arc<BTreeTable>)>,
num_values: usize,
temp_table_ctx: Option<TempTableCtx>,
database_id: usize,
) -> Result<Self> {
let indices: Vec<_> = resolver.with_schema(database_id, |s| {
s.get_indices(table.name.as_str()).cloned().collect()
});
let mut idx_cursors = Vec::new();
for idx in &indices {
idx_cursors.push((
idx.name.clone(),
idx.root_page,
program.alloc_cursor_index(None, idx)?,
));
}
let loop_labels = InsertLoopLabels {
loop_start: program.allocate_label(),
row_done: program.allocate_label(),
stmt_epilogue: program.allocate_label(),
select_exhausted: None,
};
let key_labels = InsertKeyLabels {
key_ready_for_check: program.allocate_label(),
key_generation: program.allocate_label(),
};
Ok(Self {
table,
idx_cursors,
temp_table_ctx,
on_conflict: on_conflict.unwrap_or(ResolveType::Abort),
statement_on_conflict: on_conflict,
yield_reg_opt: None,
conflict_rowid_reg: program.alloc_register(),
cursor_id: 0, halt_label: program.allocate_label(),
loop_labels,
key_labels,
cdc_table,
num_values,
autoincrement_meta: None,
database_id,
returning_buffer: None,
})
}
}
#[allow(clippy::too_many_arguments)]
#[turso_macros::trace_stack]
pub fn translate_insert(
resolver: &mut Resolver,
on_conflict: Option<ResolveType>,
tbl_name: QualifiedName,
columns: Vec<ast::Name>,
mut body: InsertBody,
mut returning: Vec<ResultColumn>,
with: Option<With>,
program: &mut ProgramBuilder,
connection: &Arc<crate::Connection>,
) -> Result<()> {
let opts = ProgramBuilderOpts::new(1, 30, 5);
program.extend(&opts);
let with_for_returning = with.clone();
if let Some(insert_with) = with {
if let InsertBody::Select(select, _) = &mut body {
match &mut select.with {
Some(select_with) => {
let mut merged = insert_with.ctes;
merged.append(&mut select_with.ctes);
select_with.ctes = merged;
select_with.recursive |= insert_with.recursive;
}
None => select.with = Some(insert_with),
}
} else {
}
}
let database_id = resolver.resolve_existing_table_database_id_qualified(&tbl_name)?;
let table_name = &tbl_name.name;
let table = match resolver.with_schema(database_id, |s| s.get_table(table_name.as_str())) {
Some(table) => table,
None => crate::bail_parse_error!("no such table: {}", table_name),
};
if program.trigger.is_some() && table.virtual_table().is_some() {
crate::bail_parse_error!("unsafe use of virtual table \"{}\"", tbl_name.name.as_str());
}
validate(
table_name.as_str(),
resolver,
&table,
database_id,
connection,
)?;
let fk_enabled = connection.foreign_keys_enabled();
if let Some(virtual_table) = &table.virtual_table() {
translate_virtual_table_insert(
program,
virtual_table.clone(),
columns,
body,
on_conflict,
resolver,
connection,
)?;
return Ok(());
}
let Some(btree_table) = table.btree() else {
crate::bail_parse_error!("no such table: {}", table_name);
};
let BoundInsertResult {
mut values,
mut upsert_actions,
inserting_multiple_rows,
} = bind_insert(
program,
resolver,
&table,
&columns,
&mut body,
on_conflict.unwrap_or(ResolveType::Abort),
database_id,
)?;
if inserting_multiple_rows && btree_table.has_autoincrement {
ensure_sequence_initialized(program, resolver, &btree_table, database_id)?;
}
let cdc_table = prepare_cdc_if_necessary(program, resolver.schema(), table.get_name())?;
let schema_cookie = resolver.with_schema(database_id, |s| s.schema_version);
program.begin_write_on_database(database_id, schema_cookie);
let mut table_references = TableReferences::new(
vec![JoinedTable {
table: Table::BTree(
table
.btree()
.expect("we shouldn't have got here without a BTree table"),
),
identifier: normalize_ident(table_name.as_str()),
internal_id: program.table_reference_counter.next(),
op: Operation::default_scan_for(&table),
join_info: None,
col_used_mask: ColumnUsedMask::default(),
column_use_counts: Vec::new(),
expression_index_usages: Vec::new(),
database_id,
indexed: None,
}],
vec![],
);
plan_ctes_as_outer_refs(
with_for_returning,
resolver,
program,
&mut table_references,
connection,
)?;
let mut returning_subqueries = vec![];
plan_subqueries_from_returning(
program,
&mut returning_subqueries,
&mut table_references,
&mut returning,
resolver,
connection,
)?;
let mut result_columns =
process_returning_clause(&mut returning, &mut table_references, resolver)?;
let has_fks = fk_enabled
&& (resolver.with_schema(database_id, |s| s.has_child_fks(table_name.as_str()))
|| resolver.with_schema(database_id, |s| {
s.any_resolved_fks_referencing(table_name.as_str())
}));
if !btree_table.has_rowid {
if has_fks {
crate::bail_parse_error!("foreign keys on WITHOUT ROWID tables are not supported");
}
if on_conflict == Some(ResolveType::Replace) || !upsert_actions.is_empty() {
crate::bail_parse_error!(
"UPSERT and REPLACE on WITHOUT ROWID tables are not supported"
);
}
if cdc_table.is_some() {
crate::bail_parse_error!("CDC on WITHOUT ROWID tables is not supported");
}
if !resolver
.schema()
.get_dependent_materialized_views(table_name.as_str())
.is_empty()
{
crate::bail_parse_error!(
"materialized views on WITHOUT ROWID tables are not supported"
);
}
if resolver.with_schema(database_id, |s| {
s.get_indices(table_name.as_str()).next().is_some()
}) {
crate::bail_parse_error!("secondary indexes on WITHOUT ROWID tables are not supported");
}
}
let mut ctx = InsertEmitCtx::new(
program,
resolver,
&btree_table,
on_conflict,
cdc_table,
values.len(),
None,
database_id,
)?;
program
.flags
.set_has_statement_conflict(on_conflict.is_some());
if !result_columns.is_empty() {
let ret_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(btree_table.clone()));
program.emit_insn(Insn::OpenEphemeral {
cursor_id: ret_cursor_id,
is_table: true,
});
ctx.returning_buffer = Some(ReturningBufferCtx {
cursor_id: ret_cursor_id,
num_columns: result_columns.len(),
});
}
init_source_emission(
program,
&table,
connection,
&mut ctx,
resolver,
&mut values,
body,
&columns,
&table_references,
database_id,
)?;
let has_upsert = !upsert_actions.is_empty();
if !result_columns.is_empty() {
program.result_columns.clone_from(&result_columns);
}
let insertion = build_insertion(program, &table, &columns, ctx.num_values)?;
translate_rows_and_open_tables(
program,
resolver,
&insertion,
&ctx,
&values,
inserting_multiple_rows,
)?;
emit_non_from_clause_subqueries_for_eval_at(
program,
resolver,
&mut returning_subqueries,
&[],
Some(&table_references),
EvalAt::BeforeLoop,
|_| true,
)?;
let has_user_provided_rowid = ctx.table.has_rowid && insertion.key.is_provided_by_user();
if ctx.table.has_autoincrement {
init_autoincrement(program, &mut ctx, resolver)?;
}
if !ctx.table.is_strict {
let affinity = insertion
.col_mappings
.iter()
.filter(|cm| !cm.column.is_virtual_generated())
.map(|col_mapping| col_mapping.column.affinity());
if affinity.clone().any(|a| a != Affinity::Blob) {
if let Ok(count) = NonZeroUsize::try_from(insertion.num_non_virtual_cols) {
program.emit_insn(Insn::Affinity {
start_reg: insertion.first_col_register(),
count,
affinities: affinity.map(|a| a.aff_mask()).collect(),
});
}
}
}
let relevant_before_triggers = get_triggers_including_temp(
resolver,
database_id,
TriggerEvent::Insert,
TriggerTime::Before,
None,
&btree_table,
);
let dml_ctx =
DmlColumnContext::from_column_reg_mapping(insertion.col_mappings.iter().map(|cm| {
(
cm.column,
if cm.column.is_rowid_alias() {
insertion.key_register()
} else {
cm.register
},
)
}));
let has_before_triggers = !relevant_before_triggers.is_empty();
if has_before_triggers {
compute_virtual_columns(
program,
&ctx.table.columns_topo_sort()?,
&dml_ctx,
resolver,
&btree_table,
)?;
let saved_key_reg = if has_user_provided_rowid {
let save_reg = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: save_reg,
extra_amount: 0,
});
let skip_label = program.allocate_label();
program.emit_insn(Insn::NotNull {
reg: insertion.key_register(),
target_pc: skip_label,
});
program.emit_insn(Insn::Integer {
value: -1,
dest: insertion.key_register(),
});
program.preassign_label_to_next_insn(skip_label);
Some(save_reg)
} else {
program.emit_insn(Insn::Integer {
value: -1,
dest: insertion.key_register(),
});
None
};
let new_registers: Vec<usize> = insertion
.col_mappings
.iter()
.map(|col_mapping| {
if col_mapping.column.is_rowid_alias() {
insertion.key_register()
} else {
col_mapping.register
}
})
.chain(std::iter::once(insertion.key_register()))
.collect();
let trigger_ctx = if let Some(override_conflict) = program.trigger_conflict_override {
TriggerContext::new_with_override_conflict(
btree_table.clone(),
Some(new_registers),
None, override_conflict,
)
} else if !matches!(ctx.on_conflict, ResolveType::Abort) {
TriggerContext::new_with_override_conflict(
btree_table.clone(),
Some(new_registers),
None, ctx.on_conflict,
)
} else {
TriggerContext::new(
btree_table.clone(),
Some(new_registers),
None, )
};
for trigger in relevant_before_triggers {
fire_trigger(
program,
resolver,
trigger,
&trigger_ctx,
connection,
database_id,
ctx.loop_labels.row_done,
)?;
}
if let Some(save_reg) = saved_key_reg {
program.emit_insn(Insn::Copy {
src_reg: save_reg,
dst_reg: insertion.key_register(),
extra_amount: 0,
});
}
}
if has_user_provided_rowid {
emit_check_for_user_provided_rowid(program, &insertion, &ctx);
}
program.preassign_label_to_next_insn(ctx.key_labels.key_generation);
if ctx.table.has_rowid {
emit_rowid_generation(program, &ctx, &insertion, resolver)?;
}
program.preassign_label_to_next_insn(ctx.key_labels.key_ready_for_check);
if ctx.table.is_strict {
program.emit_insn(Insn::TypeCheck {
start_reg: insertion.first_col_register(),
count: insertion.num_non_virtual_cols,
check_generated: true,
table_reference: BTreeTable::input_type_check_table_ref(
ctx.table,
resolver.schema(),
None,
),
});
emit_custom_type_encode(program, resolver, &insertion, &ctx.table.name)?;
program.emit_insn(Insn::TypeCheck {
start_reg: insertion.first_col_register(),
count: insertion.num_non_virtual_cols,
check_generated: true,
table_reference: BTreeTable::type_check_table_ref(ctx.table, resolver.schema()),
});
}
if has_user_provided_rowid {
if let Some(AutoincMeta {
seq_cursor_id,
r_seq,
r_seq_rowid,
table_name_reg,
}) = ctx.autoincrement_meta
{
turso_assert!(ctx.table.has_autoincrement);
reload_autoincrement_state(
program,
AutoincMeta {
seq_cursor_id,
r_seq,
r_seq_rowid,
table_name_reg,
},
);
let missing_row_label = program.allocate_label();
let explicit_done_label = program.allocate_label();
program.emit_insn(Insn::IsNull {
reg: r_seq_rowid,
target_pc: missing_row_label,
});
let skip_seq_update_label = program.allocate_label();
program.emit_insn(Insn::Le {
lhs: insertion.key_register(),
rhs: r_seq,
target_pc: skip_seq_update_label,
flags: Default::default(),
collation: None,
});
emit_update_sqlite_sequence(
program,
resolver,
ctx.database_id,
seq_cursor_id,
r_seq_rowid,
table_name_reg,
insertion.key_register(),
)?;
program.emit_insn(Insn::Goto {
target_pc: explicit_done_label,
});
program.preassign_label_to_next_insn(skip_seq_update_label);
program.preassign_label_to_next_insn(missing_row_label);
let seq_to_write_reg = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: r_seq,
dst_reg: seq_to_write_reg,
extra_amount: 0,
});
program.emit_insn(Insn::MemMax {
dest_reg: seq_to_write_reg,
src_reg: insertion.key_register(),
});
emit_update_sqlite_sequence(
program,
resolver,
ctx.database_id,
seq_cursor_id,
r_seq_rowid,
table_name_reg,
seq_to_write_reg,
)?;
program.preassign_label_to_next_insn(explicit_done_label);
}
}
if insertion.has_virtual_columns() {
compute_virtual_columns(
program,
&ctx.table.columns_topo_sort()?,
&dml_ctx,
resolver,
&btree_table,
)?;
}
emit_check_constraints(
program,
&ctx.table.check_constraints,
resolver,
&ctx.table.name,
insertion.key_register(),
insertion.col_mappings.iter().filter_map(|m| {
m.column.name.as_deref().map(|n| {
let reg = if m.column.is_rowid_alias() {
insertion.key_register()
} else {
m.register
};
(n, reg)
})
}),
connection,
ctx.on_conflict,
ctx.loop_labels.row_done,
Some(&table_references),
)?;
let constraints = build_constraints_to_check(
table_name.as_str(),
&upsert_actions,
ctx.table.has_rowid,
has_user_provided_rowid,
resolver,
ctx.database_id,
ctx.table.rowid_alias_conflict_clause,
ctx.statement_on_conflict.is_some(),
);
let has_ddl_replace = ctx.statement_on_conflict.is_none()
&& upsert_actions.is_empty()
&& resolver.with_schema(ctx.database_id, |schema| {
any_index_or_ipk_has_replace(
ctx.table.rowid_alias_conflict_clause,
schema
.get_indices(ctx.table.name.as_str())
.map(|idx| idx.on_conflict),
)
});
let on_replace = (matches!(ctx.on_conflict, ResolveType::Replace) && upsert_actions.is_empty())
|| has_ddl_replace;
let mut preflight_ctx = PreflightCtx {
upsert_actions: &upsert_actions,
on_replace,
effective_on_conflict: ctx.on_conflict,
connection,
table_references: &mut table_references,
};
emit_notnulls(program, &ctx, &insertion, resolver)?;
for cm in &insertion.col_mappings {
resolver
.register_affinities
.insert(cm.register, cm.column.affinity());
}
resolver
.register_affinities
.insert(insertion.key_register(), Affinity::Integer);
emit_preflight_constraint_checks(
program,
&mut ctx,
resolver,
&insertion,
&constraints,
&mut preflight_ctx,
)?;
emit_make_record(
program,
insertion.col_mappings.iter().map(|m| m.column),
insertion.base_reg,
insertion.record_register(),
ctx.table.is_strict,
);
if has_fks {
let fk_layout = btree_table.column_layout();
emit_fk_child_insert_checks(
program,
&btree_table,
insertion.first_col_register(),
insertion.key_register(),
resolver,
database_id,
&fk_layout,
)?;
}
let statement_replace = matches!(ctx.on_conflict, ResolveType::Replace);
let skip_replace_indexes = has_ddl_replace && !statement_replace;
if has_upsert || !statement_replace {
emit_commit_phase(program, resolver, &insertion, &ctx, skip_replace_indexes)?;
}
resolver.register_affinities.clear();
let mut insert_flags = InsertFlags::new();
if matches!(ctx.on_conflict, ResolveType::Replace) || has_ddl_replace {
insert_flags = insert_flags.require_seek();
}
program.emit_insn(Insn::Insert {
cursor: ctx.cursor_id,
key_reg: insertion.key_register(),
record_reg: insertion.record_register(),
flag: insert_flags,
table_name: table_name.to_string(),
});
let relevant_after_triggers = get_triggers_including_temp(
resolver,
database_id,
TriggerEvent::Insert,
TriggerTime::After,
None,
&btree_table,
);
let has_after_triggers = !relevant_after_triggers.is_empty();
if has_after_triggers {
compute_virtual_columns(
program,
&ctx.table.columns_topo_sort()?,
&dml_ctx,
resolver,
&btree_table,
)?;
let key_reg = insertion.key_register();
let new_registers_after: Vec<usize> = insertion
.col_mappings
.iter()
.map(|cm| {
if cm.column.is_rowid_alias() {
key_reg
} else {
cm.register
}
})
.chain(std::iter::once(key_reg))
.collect();
let trigger_ctx_after = if let Some(override_conflict) = program.trigger_conflict_override {
TriggerContext::new_after_with_override_conflict(
btree_table.clone(),
Some(new_registers_after),
None,
override_conflict,
)
} else if !matches!(ctx.on_conflict, ResolveType::Abort) {
TriggerContext::new_after_with_override_conflict(
btree_table.clone(),
Some(new_registers_after),
None,
ctx.on_conflict,
)
} else {
TriggerContext::new_after(btree_table.clone(), Some(new_registers_after), None)
};
let after_trigger_done = program.allocate_label();
for trigger in relevant_after_triggers {
fire_trigger(
program,
resolver,
trigger,
&trigger_ctx_after,
connection,
database_id,
after_trigger_done,
)?;
}
program.preassign_label_to_next_insn(after_trigger_done);
}
if has_fks {
emit_parent_side_fk_decrement_on_insert(
program,
&btree_table,
&insertion,
on_replace,
resolver,
database_id,
)?;
}
if let Some(AutoincMeta {
seq_cursor_id,
r_seq,
r_seq_rowid,
table_name_reg,
}) = ctx.autoincrement_meta
{
reload_autoincrement_state(
program,
AutoincMeta {
seq_cursor_id,
r_seq,
r_seq_rowid,
table_name_reg,
},
);
let no_update_needed_label = program.allocate_label();
program.emit_insn(Insn::Le {
lhs: insertion.key_register(),
rhs: r_seq,
target_pc: no_update_needed_label,
flags: Default::default(),
collation: None,
});
emit_update_sqlite_sequence(
program,
resolver,
ctx.database_id,
seq_cursor_id,
r_seq_rowid,
table_name_reg,
insertion.key_register(),
)?;
program.preassign_label_to_next_insn(no_update_needed_label);
program.emit_insn(Insn::Close {
cursor_id: seq_cursor_id,
});
}
if let Some((cdc_cursor_id, _)) = &ctx.cdc_table {
let cdc_has_after = program.capture_data_changes_info().has_after();
let after_record_reg = if cdc_has_after {
Some(emit_cdc_patch_record(
program,
&table,
insertion.first_col_register(),
insertion.record_register(),
insertion.key_register(),
&ColumnLayout::from_table(&table),
))
} else {
None
};
emit_cdc_insns(
program,
resolver,
OperationMode::INSERT,
*cdc_cursor_id,
insertion.key_register(),
None,
after_record_reg,
None,
table_name.as_str(),
)?;
}
if !returning_subqueries.is_empty() {
let target_table = table_references
.joined_tables()
.first()
.expect("INSERT RETURNING target table must exist");
let cache_state = seed_returning_row_image_in_cache(
program,
&table_references,
insertion.first_col_register(),
insertion.key_register(),
resolver,
&btree_table.column_layout(),
)?;
let result: Result<()> = (|| {
for subquery in returning_subqueries
.iter_mut()
.filter(|s| !s.has_been_evaluated())
{
let rerun_for_target_scan =
subquery.reads_table(target_table.database_id, target_table.table.get_name());
let subquery_plan = subquery.consume_plan(EvalAt::Loop(0));
emit_non_from_clause_subquery(
program,
resolver,
*subquery_plan,
&subquery.query_type,
subquery.correlated || rerun_for_target_scan,
true,
)?;
}
Ok(())
})();
restore_returning_row_image_in_cache(resolver, cache_state);
result?;
}
if !result_columns.is_empty() {
emit_returning_results(
program,
&table_references,
&result_columns,
insertion.first_col_register(),
insertion.key_register(),
resolver,
ctx.returning_buffer.as_ref(),
&btree_table.column_layout(),
)?;
}
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.row_done,
});
if !upsert_actions.is_empty() {
resolve_upserts(
program,
resolver,
&mut upsert_actions,
&ctx,
&insertion,
&table,
&mut result_columns,
connection,
&mut table_references,
)?;
}
emit_epilogue(program, resolver, &ctx, inserting_multiple_rows)?;
{
let has_statement_conflict = ctx.statement_on_conflict.is_some();
let notnull_col_exists = insertion
.col_mappings
.iter()
.any(|m| m.column.notnull() && !m.column.is_rowid_alias());
let has_unique = !constraints.constraints_to_check.is_empty();
let has_triggers = has_before_triggers || has_after_triggers;
set_insert_stmt_journal_flags(
program,
resolver,
database_id,
ctx.table,
has_statement_conflict,
ctx.on_conflict,
inserting_multiple_rows,
has_triggers,
has_fks,
has_upsert,
btree_table.has_autoincrement,
notnull_col_exists,
has_unique,
);
}
program.result_columns = result_columns;
program.table_references.extend(table_references);
Ok(())
}
fn emit_check_for_user_provided_rowid(
program: &mut ProgramBuilder,
insertion: &Insertion,
ctx: &InsertEmitCtx,
) {
let must_be_int_label = program.allocate_label();
program.emit_insn(Insn::NotNull {
reg: insertion.key_register(),
target_pc: must_be_int_label,
});
program.emit_insn(Insn::Goto {
target_pc: ctx.key_labels.key_generation,
});
program.preassign_label_to_next_insn(must_be_int_label);
program.emit_insn(Insn::MustBeInt {
reg: insertion.key_register(),
});
program.emit_insn(Insn::Goto {
target_pc: ctx.key_labels.key_ready_for_check,
});
}
fn emit_epilogue(
program: &mut ProgramBuilder,
resolver: &Resolver,
ctx: &InsertEmitCtx,
inserting_multiple_rows: bool,
) -> Result<()> {
if inserting_multiple_rows {
if let Some(temp_table_ctx) = &ctx.temp_table_ctx {
program.preassign_label_to_next_insn(ctx.loop_labels.row_done);
program.emit_insn(Insn::Next {
cursor_id: temp_table_ctx.cursor_id,
pc_if_next: temp_table_ctx.loop_start_label,
});
program.preassign_label_to_next_insn(temp_table_ctx.loop_end_label);
program.emit_insn(Insn::Close {
cursor_id: temp_table_ctx.cursor_id,
});
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.stmt_epilogue,
});
} else {
program.preassign_label_to_next_insn(ctx.loop_labels.row_done);
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.loop_start,
});
if let Some(sel_eof) = ctx.loop_labels.select_exhausted {
program.preassign_label_to_next_insn(sel_eof);
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.stmt_epilogue,
});
}
}
} else {
program.preassign_label_to_next_insn(ctx.loop_labels.row_done);
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.stmt_epilogue,
});
}
program.preassign_label_to_next_insn(ctx.loop_labels.stmt_epilogue);
if let Some((cdc_cursor_id, _)) = &ctx.cdc_table {
emit_cdc_autocommit_commit(program, resolver, *cdc_cursor_id)?;
}
if let Some(ref buf) = ctx.returning_buffer {
program.emit_insn(Insn::FkCheck { deferred: false });
emit_returning_scan_back(program, buf);
}
program.preassign_label_to_next_insn(ctx.halt_label);
Ok(())
}
fn emit_partial_index_check(
program: &mut ProgramBuilder,
resolver: &Resolver,
index: &Index,
insertion: &Insertion,
table: &Arc<BTreeTable>,
) -> Result<Option<BranchOffset>> {
let Some(where_clause) = &index.where_clause else {
return Ok(None);
};
let expr = where_clause.as_ref().clone();
let columns: Vec<Column> = insertion
.col_mappings
.iter()
.map(|cm| cm.column.clone())
.collect();
let mut column_regs: Vec<usize> = insertion
.col_mappings
.iter()
.map(|cm| {
if cm.column.is_rowid_alias() {
insertion.key_register()
} else {
cm.register
}
})
.collect();
let reg = program.alloc_register();
crate::translate::expr::emit_dml_expr_index_value(
program,
resolver,
expr,
&columns,
&mut column_regs,
table,
reg,
)?;
let skip_label = program.allocate_label();
program.emit_insn(Insn::IfNot {
reg,
target_pc: skip_label,
jump_if_null: true,
});
Ok(Some(skip_label))
}
fn emit_commit_phase(
program: &mut ProgramBuilder,
resolver: &Resolver,
insertion: &Insertion,
ctx: &InsertEmitCtx,
skip_replace_indexes: bool,
) -> Result<()> {
let indices: Vec<_> = resolver.with_schema(ctx.database_id, |s| {
s.get_indices(ctx.table.name.as_str()).cloned().collect()
});
for index in &indices {
if skip_replace_indexes && index.on_conflict == Some(ResolveType::Replace) {
continue;
}
let idx_cursor_id = ctx
.idx_cursors
.iter()
.find(|(name, _, _)| name == &index.name)
.map(|(_, _, c_id)| *c_id)
.expect("no cursor found for index");
let commit_skip_label =
emit_partial_index_check(program, resolver, index, insertion, ctx.table)?;
let num_cols = index.columns.len();
let idx_start_reg = program.alloc_registers(num_cols + 1);
for (i, idx_col) in index.columns.iter().enumerate() {
emit_index_column_value_for_insert(
program,
resolver,
insertion,
ctx.table,
idx_col,
idx_start_reg + i,
)?;
}
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: idx_start_reg + num_cols,
extra_amount: 0,
});
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(idx_start_reg),
count: to_u16(num_cols + 1),
dest_reg: to_u16(record_reg),
index_name: Some(index.name.clone()),
affinity_str: None,
});
program.emit_insn(Insn::IdxInsert {
cursor_id: idx_cursor_id,
record_reg,
unpacked_start: Some(idx_start_reg),
unpacked_count: Some((num_cols + 1) as u16),
flags: IdxInsertFlags::new().nchange(true),
});
if let Some(lbl) = commit_skip_label {
program.preassign_label_to_next_insn(lbl);
}
}
Ok(())
}
#[turso_macros::trace_stack]
fn translate_rows_and_open_tables(
program: &mut ProgramBuilder,
resolver: &Resolver,
insertion: &Insertion,
ctx: &InsertEmitCtx,
values: &[Box<Expr>],
inserting_multiple_rows: bool,
) -> Result<()> {
if inserting_multiple_rows {
let select_result_start_reg = program
.reg_result_cols_start
.unwrap_or_else(|| ctx.yield_reg_opt.unwrap() + 1);
translate_rows_multiple(
program,
insertion,
select_result_start_reg,
resolver,
&ctx.temp_table_ctx,
ctx.table.is_strict,
)?;
} else {
program.emit_insn(Insn::OpenWrite {
cursor_id: ctx.cursor_id,
root_page: RegisterOrLiteral::Literal(ctx.table.root_page),
db: ctx.database_id,
});
translate_rows_single(program, values, insertion, resolver, ctx.table.is_strict)?;
}
for idx_cursor in ctx.idx_cursors.iter() {
program.emit_insn(Insn::OpenWrite {
cursor_id: idx_cursor.2,
root_page: idx_cursor.1.into(),
db: ctx.database_id,
});
}
Ok(())
}
fn emit_rowid_generation(
program: &mut ProgramBuilder,
ctx: &InsertEmitCtx,
insertion: &Insertion,
resolver: &Resolver,
) -> Result<()> {
if let Some(AutoincMeta {
r_seq,
seq_cursor_id,
r_seq_rowid,
table_name_reg,
..
}) = ctx.autoincrement_meta
{
reload_autoincrement_state(
program,
AutoincMeta {
seq_cursor_id,
r_seq,
r_seq_rowid,
table_name_reg,
},
);
let r_max = program.alloc_register();
let dummy_reg = program.alloc_register();
program.emit_insn(Insn::NewRowid {
cursor: ctx.cursor_id,
rowid_reg: dummy_reg,
prev_largest_reg: r_max,
});
program.emit_insn(Insn::Copy {
src_reg: r_seq,
dst_reg: insertion.key_register(),
extra_amount: 0,
});
program.emit_insn(Insn::MemMax {
dest_reg: insertion.key_register(),
src_reg: r_max,
});
let no_overflow_label = program.allocate_label();
let max_i64_reg = program.alloc_register();
program.emit_insn(Insn::Integer {
dest: max_i64_reg,
value: i64::MAX,
});
program.emit_insn(Insn::Ne {
lhs: insertion.key_register(),
rhs: max_i64_reg,
target_pc: no_overflow_label,
flags: Default::default(),
collation: None,
});
program.emit_insn(Insn::Halt {
err_code: crate::error::SQLITE_FULL,
description: "database or disk is full".to_string(),
on_error: None,
description_reg: None,
});
program.preassign_label_to_next_insn(no_overflow_label);
program.emit_insn(Insn::AddImm {
register: insertion.key_register(),
value: 1,
});
emit_update_sqlite_sequence(
program,
resolver,
ctx.database_id,
seq_cursor_id,
r_seq_rowid,
table_name_reg,
insertion.key_register(),
)?;
} else {
program.emit_insn(Insn::NewRowid {
cursor: ctx.cursor_id,
rowid_reg: insertion.key_register(),
prev_largest_reg: 0,
});
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn resolve_upserts(
program: &mut ProgramBuilder,
resolver: &mut Resolver,
upsert_actions: &mut [(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)],
ctx: &InsertEmitCtx,
insertion: &Insertion,
table: &Table,
result_columns: &mut [ResultSetColumn],
connection: &Arc<crate::Connection>,
table_references: &mut TableReferences,
) -> Result<()> {
for (_, label, upsert) in upsert_actions {
program.preassign_label_to_next_insn(*label);
if let UpsertDo::Set {
ref mut sets,
ref mut where_clause,
} = upsert.do_clause
{
let mut rewritten_sets = collect_set_clauses_for_upsert(table, sets)?;
emit_upsert(
program,
table,
ctx,
insertion,
&mut rewritten_sets,
where_clause,
resolver,
result_columns,
connection,
table_references,
)?;
} else {
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.row_done,
});
}
}
Ok(())
}
fn get_valid_sqlite_sequence_table(
resolver: &Resolver,
database_id: usize,
) -> Result<Arc<BTreeTable>> {
let Some(seq_table) = resolver.with_schema(database_id, |s| {
s.get_btree_table(SQLITE_SEQUENCE_TABLE_NAME)
}) else {
crate::bail_corrupt_error!("missing sqlite_sequence table");
};
if !seq_table.has_rowid {
crate::bail_corrupt_error!("malformed sqlite_sequence: table must have rowid");
}
if seq_table.columns().len() != 2 {
crate::bail_corrupt_error!(
"malformed sqlite_sequence: expected 2 columns, got {}",
seq_table.columns().len()
);
}
let col0_name = seq_table.columns()[0].name.as_deref();
let col1_name = seq_table.columns()[1].name.as_deref();
if !matches!(col0_name, Some(name) if name.eq_ignore_ascii_case("name"))
|| !matches!(col1_name, Some(name) if name.eq_ignore_ascii_case("seq"))
{
crate::bail_corrupt_error!("malformed sqlite_sequence: expected columns (name, seq)");
}
Ok(seq_table)
}
fn init_autoincrement(
program: &mut ProgramBuilder,
ctx: &mut InsertEmitCtx,
resolver: &Resolver,
) -> Result<()> {
open_autoincrement_state(program, ctx, resolver)?;
reload_autoincrement_state(
program,
ctx.autoincrement_meta
.expect("AUTOINCREMENT metadata should be initialized"),
);
Ok(())
}
fn open_autoincrement_state(
program: &mut ProgramBuilder,
ctx: &mut InsertEmitCtx,
resolver: &Resolver,
) -> Result<()> {
let seq_table = get_valid_sqlite_sequence_table(resolver, ctx.database_id)?;
let seq_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(seq_table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: seq_cursor_id,
root_page: seq_table.root_page.into(),
db: ctx.database_id,
});
let table_name_reg = program.emit_string8_new_reg(ctx.table.name.clone());
let r_seq = program.alloc_register();
let r_seq_rowid = program.alloc_register();
ctx.autoincrement_meta = Some(AutoincMeta {
seq_cursor_id,
r_seq,
r_seq_rowid,
table_name_reg,
});
program.emit_insn(Insn::Integer {
dest: r_seq,
value: 0,
});
program.emit_insn(Insn::Null {
dest: r_seq_rowid,
dest_end: None,
});
Ok(())
}
fn reload_autoincrement_state(program: &mut ProgramBuilder, meta: AutoincMeta) {
let AutoincMeta {
seq_cursor_id,
r_seq,
r_seq_rowid,
table_name_reg,
} = meta;
program.emit_insn(Insn::Integer {
dest: r_seq,
value: 0,
});
program.emit_insn(Insn::Null {
dest: r_seq_rowid,
dest_end: None,
});
let loop_start_label = program.allocate_label();
let loop_end_label = program.allocate_label();
let found_label = program.allocate_label();
program.emit_insn(Insn::Rewind {
cursor_id: seq_cursor_id,
pc_if_empty: loop_end_label,
});
program.preassign_label_to_next_insn(loop_start_label);
let name_col_reg = program.alloc_register();
program.emit_column_or_rowid(seq_cursor_id, 0, name_col_reg);
program.emit_insn(Insn::Ne {
lhs: table_name_reg,
rhs: name_col_reg,
target_pc: found_label,
flags: Default::default(),
collation: None,
});
program.emit_column_or_rowid(seq_cursor_id, 1, r_seq);
program.emit_insn(Insn::RowId {
cursor_id: seq_cursor_id,
dest: r_seq_rowid,
});
program.emit_insn(Insn::Goto {
target_pc: loop_end_label,
});
program.preassign_label_to_next_insn(found_label);
program.emit_insn(Insn::Next {
cursor_id: seq_cursor_id,
pc_if_next: loop_start_label,
});
program.preassign_label_to_next_insn(loop_end_label);
}
fn emit_notnulls(
program: &mut ProgramBuilder,
ctx: &InsertEmitCtx,
insertion: &Insertion,
resolver: &Resolver,
) -> Result<()> {
for column_mapping in insertion
.col_mappings
.iter()
.filter(|column_mapping| column_mapping.column.notnull())
{
if column_mapping.column.is_rowid_alias() {
continue;
}
let effective = if ctx.statement_on_conflict.is_some() {
ctx.on_conflict
} else {
column_mapping
.column
.notnull_conflict_clause
.unwrap_or(ResolveType::Abort)
};
let on_replace = matches!(effective, ResolveType::Replace);
let on_ignore = matches!(effective, ResolveType::Ignore);
if on_replace {
if let Some(default_expr) = column_mapping.column.default.as_ref() {
let skip_label = program.allocate_label();
program.emit_insn(Insn::NotNull {
reg: column_mapping.register,
target_pc: skip_label,
});
translate_expr_no_constant_opt(
program,
None,
default_expr,
column_mapping.register,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
program.preassign_label_to_next_insn(skip_label);
}
}
let check_reg = if let Some(type_def) = resolver
.schema()
.get_type_def(&column_mapping.column.ty_str, ctx.table.is_strict)
{
if type_def.decode().is_some() {
let decoded_reg = program.alloc_register();
crate::translate::expr::emit_user_facing_column_value(
program,
column_mapping.register,
decoded_reg,
column_mapping.column,
ctx.table.is_strict,
resolver,
)?;
decoded_reg
} else {
column_mapping.register
}
} else {
column_mapping.register
};
if on_ignore {
program.emit_insn(Insn::IsNull {
reg: check_reg,
target_pc: ctx.loop_labels.row_done,
});
} else {
program.emit_insn(Insn::HaltIfNull {
target_reg: check_reg,
err_code: SQLITE_CONSTRAINT_NOTNULL,
description: {
let mut description = String::with_capacity(
ctx.table.name.as_str().len()
+ column_mapping
.column
.name
.as_ref()
.expect("Column name must be present")
.len()
+ 2,
);
description.push_str(ctx.table.name.as_str());
description.push('.');
description.push_str(
column_mapping
.column
.name
.as_ref()
.expect("Column name must be present"),
);
description
},
});
}
}
Ok(())
}
struct BoundInsertResult {
#[allow(clippy::vec_box)]
values: Vec<Box<Expr>>,
upsert_actions: Vec<(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)>,
inserting_multiple_rows: bool,
}
fn expr_contains_subquery(expr: &Expr) -> bool {
let mut found_subquery = false;
let _ = walk_expr(expr, &mut |e| {
if matches!(
e,
Expr::Subquery(_) | Expr::InSelect { .. } | Expr::Exists(_)
) {
found_subquery = true;
return Ok(WalkControl::SkipChildren);
}
Ok(WalkControl::Continue)
});
found_subquery
}
fn resolve_defaults_in_row(
row: &mut [Box<Expr>],
table: &Table,
columns: &[ast::Name],
resolver: &Resolver,
) {
let is_strict = table.is_strict();
for (i, expr) in row.iter_mut().enumerate() {
if !matches!(expr.as_ref(), Expr::Default) {
continue;
}
let col = if columns.is_empty() {
table.columns().iter().filter(|c| !c.hidden()).nth(i)
} else {
columns.get(i).and_then(|name| {
let name = crate::util::normalize_ident(name.as_str());
table.get_column_by_name(&name).map(|(_, col)| col)
})
};
*expr = match col {
Some(col) => col.default.clone().unwrap_or_else(|| {
if let Ok(Some(resolved)) = resolver.schema().resolve_type(&col.ty_str, is_strict) {
if let Some(default_expr) = resolved.default_expr() {
return Box::new(default_expr.clone());
}
}
Box::new(ast::Expr::Literal(ast::Literal::Null))
}),
None => Box::new(ast::Expr::Literal(ast::Literal::Null)),
};
}
}
#[turso_macros::trace_stack]
fn bind_insert(
program: &mut ProgramBuilder,
resolver: &Resolver,
table: &Table,
columns: &[ast::Name],
body: &mut InsertBody,
on_conflict: ResolveType,
database_id: usize,
) -> Result<BoundInsertResult> {
let mut values: Vec<Box<Expr>> = vec![];
let mut upsert: Option<Box<Upsert>> = None;
let mut upsert_actions: Vec<(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)> = Vec::new();
let mut inserting_multiple_rows = false;
match body {
InsertBody::DefaultValues => {
let is_strict = table.is_strict();
values = table
.columns()
.iter()
.filter(|c| !c.hidden() && !c.is_generated())
.map(|c| {
c.default.clone().unwrap_or_else(|| {
if let Ok(Some(resolved)) =
resolver.schema().resolve_type(&c.ty_str, is_strict)
{
if let Some(default_expr) = resolved.default_expr() {
return Box::new(default_expr.clone());
}
}
Box::new(ast::Expr::Literal(ast::Literal::Null))
})
})
.collect();
}
InsertBody::Select(select, upsert_opt) => {
if let OneSelect::Values(values_expr) = &mut select.body.select {
for row in values_expr.iter_mut() {
resolve_defaults_in_row(row, table, columns, resolver);
}
}
for compound in select.body.compounds.iter_mut() {
if let OneSelect::Values(values_expr) = &mut compound.select {
for row in values_expr.iter_mut() {
resolve_defaults_in_row(row, table, columns, resolver);
}
}
}
if select.body.compounds.is_empty() {
match &mut select.body.select {
OneSelect::Values(values_expr) if values_expr.len() <= 1 => {
if values_expr.is_empty() {
crate::bail_parse_error!("no values to insert");
}
let has_subquery = values_expr
.iter()
.any(|row| row.iter().any(|expr| expr_contains_subquery(expr)));
if has_subquery {
inserting_multiple_rows = true;
} else {
for expr in values_expr.iter_mut().flat_map(|v| v.iter_mut()) {
match expr.as_mut() {
Expr::Id(name) => {
if name.quoted_with('"') && resolver.dqs_dml.is_enabled() {
*expr = Expr::Literal(ast::Literal::String(
name.as_literal(),
))
.into();
} else {
crate::bail_parse_error!("no such column: {name}");
}
}
Expr::Qualified(first_name, second_name) => {
crate::bail_parse_error!(
"no such column: {first_name}.{second_name}"
);
}
_ => {}
}
bind_and_rewrite_expr(
expr,
None,
None,
resolver,
BindingBehavior::ResultColumnsNotAllowed,
)?;
}
values = values_expr.pop().unwrap_or_else(Vec::new);
}
}
_ => inserting_multiple_rows = true,
}
} else {
inserting_multiple_rows = true;
}
upsert = upsert_opt.take();
}
}
if let ResolveType::Ignore = on_conflict {
program.set_resolve_type(ResolveType::Ignore);
upsert.replace(Box::new(ast::Upsert {
do_clause: UpsertDo::Nothing,
index: None,
next: None,
}));
} else {
program.set_resolve_type(on_conflict);
}
while let Some(mut upsert_opt) = upsert.take() {
if let UpsertDo::Set {
ref mut sets,
ref mut where_clause,
} = &mut upsert_opt.do_clause
{
for set in sets.iter_mut() {
bind_and_rewrite_expr(
&mut set.expr,
None,
None,
resolver,
BindingBehavior::AllowUnboundIdentifiers,
)?;
}
if let Some(ref mut where_expr) = where_clause {
bind_and_rewrite_expr(
where_expr,
None,
None,
resolver,
BindingBehavior::AllowUnboundIdentifiers,
)?;
}
}
let next = upsert_opt.next.take();
upsert_actions.push((
resolver.with_schema(database_id, |s| {
resolve_upsert_target(s, table, &upsert_opt)
})?,
program.allocate_label(),
upsert_opt,
));
upsert = next;
}
Ok(BoundInsertResult {
values,
upsert_actions,
inserting_multiple_rows,
})
}
#[allow(clippy::too_many_arguments, clippy::vec_box)]
#[turso_macros::trace_stack]
fn init_source_emission<'a>(
program: &mut ProgramBuilder,
table: &Table,
connection: &Arc<Connection>,
ctx: &mut InsertEmitCtx<'a>,
resolver: &Resolver,
values: &mut Vec<Box<Expr>>,
body: InsertBody,
columns: &'a [ast::Name],
table_references: &TableReferences,
database_id: usize,
) -> Result<()> {
let required_column_count = if columns.is_empty() {
table.columns().iter().filter(|c| !c.is_generated()).count()
} else {
columns.len()
};
if !values.is_empty() {
if values.len() != required_column_count {
crate::bail_parse_error!(
"table {} has {required_column_count} columns but {} values were supplied",
table.get_name(),
values.len()
);
}
}
let has_insert_triggers = has_triggers_including_temp(
resolver,
database_id,
TriggerEvent::Insert,
None,
ctx.table.as_ref(),
);
let (num_values, cursor_id) = match body {
InsertBody::Select(select, _) => {
if !values.is_empty()
&& select.body.compounds.is_empty()
&& matches!(&select.body.select, OneSelect::Values(values) if values.len() <= 1)
{
(
values.len(),
program.alloc_cursor_id_keyed(
CursorKey::table(table_references.joined_tables()[0].internal_id),
CursorType::BTreeTable(ctx.table.clone()),
),
)
} else {
let yield_reg = program.alloc_register();
let jump_on_definition_label = program.allocate_label();
let start_offset_label = program.allocate_label();
program.emit_insn(Insn::InitCoroutine {
yield_reg,
jump_on_definition: jump_on_definition_label,
start_offset: start_offset_label,
});
program.preassign_label_to_next_insn(start_offset_label);
let query_destination = QueryDestination::CoroutineYield {
yield_reg,
coroutine_implementation_start: ctx.halt_label,
};
let num_result_cols = program.nested(|program| {
translate_select(select, resolver, program, query_destination, connection)
})?;
if num_result_cols != required_column_count {
crate::bail_parse_error!(
"table {} has {required_column_count} columns but {} values were supplied",
table.get_name(),
num_result_cols,
);
}
program.emit_insn(Insn::EndCoroutine { yield_reg });
program.preassign_label_to_next_insn(jump_on_definition_label);
let cursor_id = program.alloc_cursor_id_keyed(
CursorKey::table(table_references.joined_tables()[0].internal_id),
CursorType::BTreeTable(ctx.table.clone()),
);
if program.is_table_open(table) || has_insert_triggers {
let temp_cursor_id =
program.alloc_cursor_id(CursorType::BTreeTable(ctx.table.clone()));
ctx.temp_table_ctx = Some(TempTableCtx {
cursor_id: temp_cursor_id,
loop_start_label: program.allocate_label(),
loop_end_label: program.allocate_label(),
});
program.emit_insn(Insn::OpenEphemeral {
cursor_id: temp_cursor_id,
is_table: true,
});
program.preassign_label_to_next_insn(ctx.loop_labels.loop_start);
let yield_label = program.allocate_label();
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: yield_label, subtype_clear_start_reg: 0,
subtype_clear_count: 0,
});
let record_reg = program.alloc_register();
let affinity_str = if columns.is_empty() {
ctx.table
.columns()
.iter()
.filter(|col| !col.hidden() && !col.is_generated())
.map(|col| col.affinity_with_strict(ctx.table.is_strict).aff_mask())
.collect::<String>()
} else {
columns
.iter()
.map(|col_name| {
let column_name = normalize_ident(col_name.as_str());
if ROWID_STRS
.iter()
.any(|s| s.eq_ignore_ascii_case(&column_name))
{
return Ok(Affinity::Integer.aff_mask());
}
table
.get_column_by_name(&column_name)
.map(|(_, col)| {
col.affinity_with_strict(ctx.table.is_strict).aff_mask()
})
.ok_or_else(|| {
crate::error::LimboError::ParseError(format!(
"table {} has no column named {}",
table.get_name(),
column_name
))
})
})
.collect::<Result<String>>()?
};
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(program.reg_result_cols_start.unwrap_or(yield_reg + 1)),
count: to_u16(num_result_cols),
dest_reg: to_u16(record_reg),
index_name: None,
affinity_str: Some(affinity_str),
});
let rowid_reg = program.alloc_register();
program.emit_insn(Insn::NewRowid {
cursor: temp_cursor_id,
rowid_reg,
prev_largest_reg: 0,
});
program.emit_insn(Insn::Insert {
cursor: temp_cursor_id,
key_reg: rowid_reg,
record_reg,
flag: InsertFlags::new()
.require_seek()
.is_ephemeral_table_insert(),
table_name: "".to_string(),
});
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.loop_start,
});
program.preassign_label_to_next_insn(yield_label);
program.emit_insn(Insn::OpenWrite {
cursor_id,
root_page: RegisterOrLiteral::Literal(ctx.table.root_page),
db: ctx.database_id,
});
} else {
program.emit_insn(Insn::OpenWrite {
cursor_id,
root_page: RegisterOrLiteral::Literal(ctx.table.root_page),
db: ctx.database_id,
});
program.preassign_label_to_next_insn(ctx.loop_labels.loop_start);
let select_exhausted = program.allocate_label();
ctx.loop_labels.select_exhausted = Some(select_exhausted);
program.emit_insn(Insn::Yield {
yield_reg,
end_offset: select_exhausted,
subtype_clear_start_reg: 0,
subtype_clear_count: 0,
});
}
ctx.yield_reg_opt = Some(yield_reg);
(num_result_cols, cursor_id)
}
}
InsertBody::DefaultValues => {
let storable_columns: Vec<_> = table
.columns()
.iter()
.filter(|c| !c.is_generated())
.collect();
let num_values = storable_columns.len();
let is_strict = table.is_strict();
values.extend(storable_columns.iter().map(|c| {
c.default.clone().unwrap_or_else(|| {
if let Ok(Some(resolved)) = resolver.schema().resolve_type(&c.ty_str, is_strict)
{
if let Some(default_expr) = resolved.default_expr() {
return Box::new(default_expr.clone());
}
}
Box::new(ast::Expr::Literal(ast::Literal::Null))
})
}));
(
num_values,
program.alloc_cursor_id_keyed(
CursorKey::table(table_references.joined_tables()[0].internal_id),
CursorType::BTreeTable(ctx.table.clone()),
),
)
}
};
ctx.num_values = num_values;
ctx.cursor_id = cursor_id;
Ok(())
}
#[derive(Clone, Copy)]
pub struct AutoincMeta {
seq_cursor_id: usize,
r_seq: usize,
r_seq_rowid: usize,
table_name_reg: usize,
}
pub static ROWID_COLUMN: std::sync::LazyLock<Column> = std::sync::LazyLock::new(|| {
Column::new(
None, String::new(), None, None, schema::Type::Integer,
None,
ColDef {
primary_key: true,
rowid_alias: true,
notnull: true,
explicit_notnull: false,
hidden: false,
unique: false,
notnull_conflict_clause: None,
},
)
});
#[derive(Debug)]
pub struct Insertion<'a> {
key: InsertionKey<'a>,
col_mappings: Vec<ColMapping<'a>>,
record_reg: usize,
base_reg: usize,
num_non_virtual_cols: usize,
}
impl<'a> Insertion<'a> {
pub fn key_register(&self) -> usize {
self.key.register()
}
pub fn first_col_register(&self) -> usize {
self.base_reg
}
pub fn record_register(&self) -> usize {
self.record_reg
}
fn has_virtual_columns(&self) -> bool {
self.col_mappings.len() - self.num_non_virtual_cols > 0
}
pub fn get_col_mapping_by_name(&self, name: &str) -> Option<&ColMapping<'a>> {
if let InsertionKey::RowidAlias(mapping) = &self.key {
if mapping
.column
.name
.as_ref()
.is_some_and(|n| n.eq_ignore_ascii_case(name))
{
return Some(mapping);
}
}
self.col_mappings.iter().find(|col| {
col.column
.name
.as_ref()
.is_some_and(|n| n.eq_ignore_ascii_case(name))
})
}
}
#[derive(Debug)]
enum InsertionKey<'a> {
Autogenerated { register: usize },
LiteralRowid {
value_index: Option<usize>,
register: usize,
},
RowidAlias(ColMapping<'a>),
}
impl InsertionKey<'_> {
fn register(&self) -> usize {
match self {
InsertionKey::Autogenerated { register } => *register,
InsertionKey::LiteralRowid { register, .. } => *register,
InsertionKey::RowidAlias(x) => x.register,
}
}
fn is_provided_by_user(&self) -> bool {
!matches!(self, InsertionKey::Autogenerated { .. })
}
fn column_name(&self) -> &str {
match self {
InsertionKey::RowidAlias(x) => x
.column
.name
.as_ref()
.expect("rowid alias column must be present")
.as_str(),
InsertionKey::LiteralRowid { .. } => ROWID_STRS[0],
InsertionKey::Autogenerated { .. } => ROWID_STRS[0],
}
}
}
#[derive(Debug)]
pub struct ColMapping<'a> {
pub column: &'a Column,
pub value_index: Option<usize>,
pub register: usize,
}
fn build_insertion<'a>(
program: &mut ProgramBuilder,
table: &'a Table,
columns: &'a [ast::Name],
num_values: usize,
) -> Result<Insertion<'a>> {
let table_columns = table.columns();
let num_cols = table_columns.len();
let rowid_register = program.alloc_register();
let mut insertion_key = InsertionKey::Autogenerated {
register: rowid_register,
};
let layout = table
.btree()
.map(|bt| bt.column_layout())
.unwrap_or(ColumnLayout::Identity {
column_count: num_cols,
});
let base_reg = program.alloc_registers(num_cols);
let mut column_mappings = table_columns
.iter()
.enumerate()
.map(|(i, c)| ColMapping {
column: c,
value_index: None,
register: layout.to_register(base_reg, i),
})
.collect::<Vec<_>>();
if columns.is_empty() {
let num_storable_columns = table_columns
.iter()
.filter(|c| !c.hidden() && !c.is_generated())
.count();
if num_values != num_storable_columns {
crate::bail_parse_error!(
"table {} has {} columns but {} values were supplied",
&table.get_name(),
num_storable_columns,
num_values
);
}
let mut value_idx = 0;
for (i, col) in table_columns.iter().enumerate() {
if col.hidden() || col.is_generated() {
continue;
}
if col.is_rowid_alias() {
insertion_key = InsertionKey::RowidAlias(ColMapping {
column: col,
value_index: Some(value_idx),
register: rowid_register,
});
} else {
column_mappings[i].value_index = Some(value_idx);
}
value_idx += 1;
}
} else {
for (value_index, column_name) in columns.iter().enumerate() {
let column_name = normalize_ident(column_name.as_str());
if let Some((idx_in_table, col_in_table)) = table.get_column_by_name(&column_name) {
col_in_table.ensure_not_generated("INSERT into", &column_name)?;
if col_in_table.is_rowid_alias() {
insertion_key = InsertionKey::RowidAlias(ColMapping {
column: col_in_table,
value_index: Some(value_index),
register: rowid_register,
});
} else if column_mappings[idx_in_table].value_index.is_none() {
column_mappings[idx_in_table].value_index = Some(value_index);
}
} else if ROWID_STRS
.iter()
.any(|s| s.eq_ignore_ascii_case(&column_name))
{
if let Some(col_in_table) = table.columns().iter().find(|c| c.is_rowid_alias()) {
insertion_key = InsertionKey::RowidAlias(ColMapping {
column: col_in_table,
value_index: Some(value_index),
register: rowid_register,
});
} else {
insertion_key = InsertionKey::LiteralRowid {
value_index: Some(value_index),
register: rowid_register,
};
}
} else {
crate::bail_parse_error!(
"table {} has no column named {}",
&table.get_name(),
column_name
);
}
}
}
Ok(Insertion {
key: insertion_key,
col_mappings: column_mappings,
record_reg: program.alloc_register(),
base_reg,
num_non_virtual_cols: layout.num_non_virtual_cols(),
})
}
fn translate_rows_multiple<'short, 'long: 'short>(
program: &mut ProgramBuilder,
insertion: &'short Insertion<'long>,
yield_reg: usize,
resolver: &Resolver,
temp_table_ctx: &Option<TempTableCtx>,
is_strict: bool,
) -> Result<()> {
if let Some(ref temp_table_ctx) = temp_table_ctx {
program.emit_insn(Insn::Rewind {
cursor_id: temp_table_ctx.cursor_id,
pc_if_empty: temp_table_ctx.loop_end_label,
});
program.preassign_label_to_next_insn(temp_table_ctx.loop_start_label);
}
let translate_value_fn =
|prg: &mut ProgramBuilder, value_index: usize, column_register: usize| {
if let Some(temp_table_ctx) = temp_table_ctx {
prg.emit_insn(Insn::Column {
cursor_id: temp_table_ctx.cursor_id,
column: value_index,
dest: column_register,
default: None,
});
} else {
prg.emit_insn(Insn::Copy {
src_reg: yield_reg + value_index,
dst_reg: column_register,
extra_amount: 0,
});
}
Ok(())
};
translate_rows_base(program, insertion, translate_value_fn, resolver, is_strict)
}
fn translate_rows_single(
program: &mut ProgramBuilder,
value: &[Box<Expr>],
insertion: &Insertion,
resolver: &Resolver,
is_strict: bool,
) -> Result<()> {
let translate_value_fn =
|prg: &mut ProgramBuilder, value_index: usize, column_register: usize| -> Result<()> {
translate_expr_no_constant_opt(
prg,
None,
value.get(value_index).unwrap_or_else(|| {
panic!("value index out of bounds: {value_index} for value: {value:?}")
}),
column_register,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
Ok(())
};
translate_rows_base(program, insertion, translate_value_fn, resolver, is_strict)
}
fn translate_rows_base<'short, 'long: 'short>(
program: &mut ProgramBuilder,
insertion: &'short Insertion<'long>,
mut translate_value_fn: impl FnMut(&mut ProgramBuilder, usize, usize) -> Result<()>,
resolver: &Resolver,
is_strict: bool,
) -> Result<()> {
translate_key(
program,
insertion,
&mut translate_value_fn,
resolver,
is_strict,
)?;
for col in insertion.col_mappings.iter() {
translate_column(
program,
col.column,
col.register,
col.value_index,
&mut translate_value_fn,
resolver,
is_strict,
)?;
}
Ok(())
}
fn translate_key(
program: &mut ProgramBuilder,
insertion: &Insertion,
mut translate_value_fn: impl FnMut(&mut ProgramBuilder, usize, usize) -> Result<()>,
resolver: &Resolver,
is_strict: bool,
) -> Result<()> {
match &insertion.key {
InsertionKey::RowidAlias(rowid_alias_column) => translate_column(
program,
rowid_alias_column.column,
rowid_alias_column.register,
rowid_alias_column.value_index,
&mut translate_value_fn,
resolver,
is_strict,
),
InsertionKey::LiteralRowid {
value_index,
register,
} => translate_column(
program,
&ROWID_COLUMN,
*register,
*value_index,
&mut translate_value_fn,
resolver,
is_strict,
),
InsertionKey::Autogenerated { .. } => Ok(()), }
}
fn translate_column(
program: &mut ProgramBuilder,
column: &Column,
column_register: usize,
value_index: Option<usize>,
translate_value_fn: &mut impl FnMut(&mut ProgramBuilder, usize, usize) -> Result<()>,
resolver: &Resolver,
is_strict: bool,
) -> Result<()> {
if let Some(value_index) = value_index {
let union_td = resolver
.schema()
.get_type_def_unchecked(&column.ty_str)
.filter(|td| td.is_union())
.cloned();
let prev = program.target_union_type.take();
program.target_union_type = union_td;
let result = translate_value_fn(program, value_index, column_register);
program.target_union_type = prev;
result?;
} else if column.is_rowid_alias() {
program.emit_insn(Insn::SoftNull {
reg: column_register,
});
} else if column.is_virtual_generated() {
} else if column.hidden() {
program.emit_insn(Insn::Null {
dest: column_register,
dest_end: None,
});
} else if let Some(default_expr) = column.default.as_ref() {
translate_expr(program, None, default_expr, column_register, resolver)?;
} else if let Ok(Some(resolved)) = resolver.schema().resolve_type(&column.ty_str, is_strict) {
if let Some(default_expr) = resolved.default_expr() {
translate_expr(program, None, default_expr, column_register, resolver)?;
} else {
program.emit_insn(Insn::Null {
dest: column_register,
dest_end: None,
});
}
} else {
program.emit_insn(Insn::Null {
dest: column_register,
dest_end: None,
});
}
Ok(())
}
fn emit_pk_uniqueness_check(
program: &mut ProgramBuilder,
ctx: &mut InsertEmitCtx,
resolver: &mut Resolver,
insertion: &Insertion,
position: Option<usize>,
upsert_catch_all: Option<usize>,
preflight: &mut PreflightCtx,
) -> Result<()> {
if !ctx.table.has_rowid {
let pk_regs = program.alloc_registers(ctx.table.primary_key_columns.len());
let pk_affinities = ctx
.table
.primary_key_columns
.iter()
.map(|(name, _)| {
let col = insertion
.get_col_mapping_by_name(name)
.unwrap_or_else(|| panic!("primary key column missing from insertion: {name}"));
col.column
.affinity_with_strict(ctx.table.is_strict)
.aff_mask()
})
.collect::<String>();
for (i, (name, _)) in ctx.table.primary_key_columns.iter().enumerate() {
let src_reg = insertion
.get_col_mapping_by_name(name)
.unwrap_or_else(|| panic!("primary key column missing from insertion: {name}"))
.register;
program.emit_insn(Insn::Copy {
src_reg,
dst_reg: pk_regs + i,
extra_amount: 0,
});
}
program.emit_insn(Insn::Affinity {
start_reg: pk_regs,
count: NonZeroUsize::new(ctx.table.primary_key_columns.len())
.expect("WITHOUT ROWID tables must have a primary key"),
affinities: pk_affinities,
});
let no_conflict = program.allocate_label();
program.emit_insn(Insn::NoConflict {
cursor_id: ctx.cursor_id,
target_pc: no_conflict,
record_reg: pk_regs,
num_regs: ctx.table.primary_key_columns.len(),
});
if let Some(position) = position.or(upsert_catch_all) {
program.emit_insn(Insn::Goto {
target_pc: preflight.upsert_actions[position].1,
});
} else if matches!(preflight.effective_on_conflict, ResolveType::Ignore) {
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.row_done,
});
} else {
let raw_desc = ctx
.table
.primary_key_columns
.iter()
.map(|(name, _)| format!("{}.{}", ctx.table.name, name))
.collect::<Vec<_>>()
.join(", ");
let (description, on_error) = halt_desc_and_on_error(
&raw_desc,
preflight.effective_on_conflict,
program.flags.has_statement_conflict(),
);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description,
on_error,
description_reg: None,
});
}
program.preassign_label_to_next_insn(no_conflict);
return Ok(());
}
let make_record_label = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: ctx.cursor_id,
rowid_reg: insertion.key_register(),
target_pc: make_record_label,
});
let rowid_column_name = insertion.key.column_name();
'emit_halt: {
if preflight.on_replace {
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: ctx.conflict_rowid_reg,
extra_amount: 0,
});
emit_replace_delete_conflicting_row(
program,
resolver,
preflight.connection,
ctx,
preflight.table_references,
)?;
program.emit_insn(Insn::Goto {
target_pc: make_record_label,
});
break 'emit_halt;
}
if let Some(position) = position.or(upsert_catch_all) {
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: ctx.conflict_rowid_reg,
extra_amount: 0,
});
program.emit_insn(Insn::Goto {
target_pc: preflight.upsert_actions[position].1,
});
break 'emit_halt;
}
if matches!(preflight.effective_on_conflict, ResolveType::Ignore) {
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.row_done,
});
break 'emit_halt;
}
let raw_desc = format!("{}.{}", ctx.table.name, rowid_column_name);
let (description, on_error) = halt_desc_and_on_error(
&raw_desc,
preflight.effective_on_conflict,
program.flags.has_statement_conflict(),
);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_PRIMARYKEY,
description,
on_error,
description_reg: None,
});
}
program.preassign_label_to_next_insn(make_record_label);
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn emit_index_uniqueness_check(
program: &mut ProgramBuilder,
ctx: &mut InsertEmitCtx,
resolver: &mut Resolver,
insertion: &Insertion,
index: &Index,
position: Option<usize>,
upsert_catch_all: Option<usize>,
preflight: &mut PreflightCtx,
) -> Result<()> {
let idx_cursor_id = ctx
.idx_cursors
.iter()
.find(|(name, _, _)| name == &index.name)
.map(|(_, _, c_id)| *c_id)
.expect("no cursor found for index");
let maybe_skip_probe_label =
emit_partial_index_check(program, resolver, index, insertion, ctx.table)?;
let num_cols = index.columns.len();
let idx_start_reg = program.alloc_registers(num_cols + 1);
for (i, idx_col) in index.columns.iter().enumerate() {
emit_index_column_value_for_insert(
program,
resolver,
insertion,
ctx.table,
idx_col,
idx_start_reg + i,
)?;
}
program.emit_insn(Insn::Copy {
src_reg: insertion.key_register(),
dst_reg: idx_start_reg + num_cols,
extra_amount: 0,
});
if index.unique {
emit_unique_index_check(
program,
ctx,
resolver,
index,
idx_cursor_id,
idx_start_reg,
num_cols,
position,
upsert_catch_all,
preflight,
)?;
} else {
if preflight.on_replace {
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(idx_start_reg),
count: to_u16(num_cols + 1),
dest_reg: to_u16(record_reg),
index_name: Some(index.name.clone()),
affinity_str: None,
});
program.emit_insn(Insn::IdxInsert {
cursor_id: idx_cursor_id,
record_reg,
unpacked_start: Some(idx_start_reg),
unpacked_count: Some((num_cols + 1) as u16),
flags: IdxInsertFlags::new().nchange(true),
});
}
}
if let Some(lbl) = maybe_skip_probe_label {
program.preassign_label_to_next_insn(lbl);
}
Ok(())
}
#[allow(clippy::too_many_arguments)]
fn emit_unique_index_check(
program: &mut ProgramBuilder,
ctx: &mut InsertEmitCtx,
resolver: &mut Resolver,
index: &Index,
idx_cursor_id: usize,
idx_start_reg: usize,
num_cols: usize,
position: Option<usize>,
upsert_catch_all: Option<usize>,
preflight: &mut PreflightCtx,
) -> Result<()> {
let aff = index
.columns
.iter()
.map(|ic| {
if ic.expr.is_some() {
Affinity::Blob.aff_mask()
} else {
ctx.table.columns()[ic.pos_in_table]
.affinity_with_strict(ctx.table.is_strict)
.aff_mask()
}
})
.collect::<String>();
program.emit_insn(Insn::Affinity {
start_reg: idx_start_reg,
count: NonZeroUsize::new(num_cols).expect("nonzero col count"),
affinities: aff,
});
if !preflight.upsert_actions.is_empty() {
let next_check = program.allocate_label();
program.emit_insn(Insn::NoConflict {
cursor_id: idx_cursor_id,
target_pc: next_check,
record_reg: idx_start_reg,
num_regs: num_cols,
});
if let Some(position) = position.or(upsert_catch_all) {
match &preflight.upsert_actions[position].2.do_clause {
UpsertDo::Nothing => {
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.row_done,
});
}
UpsertDo::Set { .. } => {
program.emit_insn(Insn::IdxRowId {
cursor_id: idx_cursor_id,
dest: ctx.conflict_rowid_reg,
});
program.emit_insn(Insn::Goto {
target_pc: preflight.upsert_actions[position].1,
});
}
}
}
let raw_desc = format_unique_violation_desc(ctx.table.name.as_str(), index);
let (description, on_error) = halt_desc_and_on_error(
&raw_desc,
preflight.effective_on_conflict,
program.flags.has_statement_conflict(),
);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_UNIQUE,
description,
on_error,
description_reg: None,
});
program.preassign_label_to_next_insn(next_check);
} else {
let ok = program.allocate_label();
program.emit_insn(Insn::NoConflict {
cursor_id: idx_cursor_id,
target_pc: ok,
record_reg: idx_start_reg,
num_regs: num_cols,
});
if preflight.on_replace {
program.emit_insn(Insn::IdxRowId {
cursor_id: idx_cursor_id,
dest: ctx.conflict_rowid_reg,
});
emit_replace_delete_conflicting_row(
program,
resolver,
preflight.connection,
ctx,
preflight.table_references,
)?;
program.emit_insn(Insn::Goto { target_pc: ok });
} else if matches!(preflight.effective_on_conflict, ResolveType::Ignore) {
program.emit_insn(Insn::Goto {
target_pc: ctx.loop_labels.row_done,
});
} else {
let raw_desc = format_unique_violation_desc(ctx.table.name.as_str(), index);
let (description, on_error) = halt_desc_and_on_error(
&raw_desc,
preflight.effective_on_conflict,
program.flags.has_statement_conflict(),
);
program.emit_insn(Insn::Halt {
err_code: SQLITE_CONSTRAINT_UNIQUE,
description,
on_error,
description_reg: None,
});
}
program.preassign_label_to_next_insn(ok);
if preflight.on_replace {
let record_reg = program.alloc_register();
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(idx_start_reg),
count: to_u16(num_cols + 1),
dest_reg: to_u16(record_reg),
index_name: Some(index.name.clone()),
affinity_str: None,
});
program.emit_insn(Insn::IdxInsert {
cursor_id: idx_cursor_id,
record_reg,
unpacked_start: Some(idx_start_reg),
unpacked_count: Some((num_cols + 1) as u16),
flags: IdxInsertFlags::new().nchange(true),
});
}
}
Ok(())
}
fn emit_preflight_constraint_checks(
program: &mut ProgramBuilder,
ctx: &mut InsertEmitCtx,
resolver: &mut Resolver,
insertion: &Insertion,
constraints: &ConstraintsToCheck,
preflight: &mut PreflightCtx,
) -> Result<()> {
let mut seen_replace = false;
for (constraint, position) in &constraints.constraints_to_check {
let effective = if ctx.statement_on_conflict.is_some() {
ctx.on_conflict
} else {
match constraint {
ResolvedUpsertTarget::PrimaryKey => {
ctx.table
.rowid_alias_conflict_clause
.unwrap_or(ResolveType::Abort)
}
ResolvedUpsertTarget::Index(index) => {
index.on_conflict.unwrap_or(ResolveType::Abort)
}
ResolvedUpsertTarget::CatchAll => unreachable!(),
}
};
if effective == ResolveType::Replace {
seen_replace = true;
} else {
turso_assert!(
!seen_replace,
"non-REPLACE constraint after REPLACE constraint — sort order invariant violated"
);
}
let effective_on_replace =
matches!(effective, ResolveType::Replace) && preflight.upsert_actions.is_empty();
preflight.on_replace = effective_on_replace;
preflight.effective_on_conflict = effective;
match constraint {
ResolvedUpsertTarget::PrimaryKey => {
emit_pk_uniqueness_check(
program,
ctx,
resolver,
insertion,
*position,
constraints.upsert_catch_all_position,
preflight,
)?;
}
ResolvedUpsertTarget::Index(index) => {
emit_index_uniqueness_check(
program,
ctx,
resolver,
insertion,
index,
*position,
constraints.upsert_catch_all_position,
preflight,
)?;
}
ResolvedUpsertTarget::CatchAll => unreachable!(),
}
}
Ok(())
}
fn translate_virtual_table_insert(
program: &mut ProgramBuilder,
virtual_table: Arc<VirtualTable>,
columns: Vec<ast::Name>,
mut body: InsertBody,
on_conflict: Option<ResolveType>,
resolver: &Resolver,
connection: &Arc<crate::Connection>,
) -> Result<()> {
#[cfg(not(feature = "cli_only"))]
let _ = connection;
let allow_dbpage_write = {
#[cfg(feature = "cli_only")]
{
virtual_table.name == crate::dbpage::DBPAGE_TABLE_NAME
&& connection.db.opts.unsafe_testing
}
#[cfg(not(feature = "cli_only"))]
{
false
}
};
if virtual_table.readonly() && !allow_dbpage_write {
crate::bail_constraint_error!("Table is read-only: {}", virtual_table.name);
}
let (num_values, value) = match &mut body {
InsertBody::Select(select, None) => match &mut select.body.select {
OneSelect::Values(values) => (values[0].len(), values.pop().unwrap()),
_ => crate::bail_parse_error!("Virtual tables only support VALUES clause in INSERT"),
},
InsertBody::DefaultValues => (0, vec![]),
_ => crate::bail_parse_error!("Unsupported INSERT body for virtual tables"),
};
let table = Table::Virtual(virtual_table.clone());
let cursor_id = program.alloc_cursor_id(CursorType::VirtualTable(virtual_table));
program.emit_insn(Insn::VOpen { cursor_id });
if !allow_dbpage_write {
program.emit_insn(Insn::VBegin { cursor_id });
}
let registers_start = program.alloc_register();
program.emit_insn(Insn::Null {
dest: registers_start,
dest_end: None,
});
let insertion = build_insertion(program, &table, &columns, num_values)?;
translate_rows_single(program, &value, &insertion, resolver, false)?;
let conflict_action = on_conflict.as_ref().map(|c| c.bit_value()).unwrap_or(0) as u16;
program.emit_insn(Insn::VUpdate {
cursor_id,
arg_count: insertion.col_mappings.len() + 2, start_reg: registers_start,
conflict_action,
});
program.emit_insn(Insn::Close { cursor_id });
let halt_label = program.allocate_label();
program.preassign_label_to_next_insn(halt_label);
Ok(())
}
fn ensure_sequence_initialized(
program: &mut ProgramBuilder,
resolver: &Resolver,
table: &schema::BTreeTable,
database_id: usize,
) -> Result<()> {
let seq_table = get_valid_sqlite_sequence_table(resolver, database_id)?;
let seq_cursor_id = program.alloc_cursor_id(CursorType::BTreeTable(seq_table.clone()));
program.emit_insn(Insn::OpenWrite {
cursor_id: seq_cursor_id,
root_page: seq_table.root_page.into(),
db: database_id,
});
let table_name_reg = program.emit_string8_new_reg(table.name.clone());
let loop_start_label = program.allocate_label();
let entry_exists_label = program.allocate_label();
let insert_new_label = program.allocate_label();
program.emit_insn(Insn::Rewind {
cursor_id: seq_cursor_id,
pc_if_empty: insert_new_label,
});
program.preassign_label_to_next_insn(loop_start_label);
let name_col_reg = program.alloc_register();
program.emit_column_or_rowid(seq_cursor_id, 0, name_col_reg);
program.emit_insn(Insn::Eq {
lhs: table_name_reg,
rhs: name_col_reg,
target_pc: entry_exists_label,
flags: Default::default(),
collation: None,
});
program.emit_insn(Insn::Next {
cursor_id: seq_cursor_id,
pc_if_next: loop_start_label,
});
program.preassign_label_to_next_insn(insert_new_label);
let record_reg = program.alloc_register();
let record_start_reg = program.alloc_registers(2);
let zero_reg = program.alloc_register();
program.emit_insn(Insn::Integer {
dest: zero_reg,
value: 0,
});
program.emit_insn(Insn::Copy {
src_reg: table_name_reg,
dst_reg: record_start_reg,
extra_amount: 0,
});
program.emit_insn(Insn::Copy {
src_reg: zero_reg,
dst_reg: record_start_reg + 1,
extra_amount: 0,
});
let affinity_str = seq_table
.columns()
.iter()
.map(|c| c.affinity().aff_mask())
.collect();
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(record_start_reg),
count: to_u16(2),
dest_reg: to_u16(record_reg),
index_name: None,
affinity_str: Some(affinity_str),
});
let new_rowid_reg = program.alloc_register();
program.emit_insn(Insn::NewRowid {
cursor: seq_cursor_id,
rowid_reg: new_rowid_reg,
prev_largest_reg: 0,
});
program.emit_insn(Insn::Insert {
cursor: seq_cursor_id,
key_reg: new_rowid_reg,
record_reg,
flag: InsertFlags::new(),
table_name: SQLITE_SEQUENCE_TABLE_NAME.to_string(),
});
program.preassign_label_to_next_insn(entry_exists_label);
program.emit_insn(Insn::Close {
cursor_id: seq_cursor_id,
});
Ok(())
}
#[inline]
pub(crate) fn halt_desc_and_on_error(
raw_desc: &str,
effective: ResolveType,
has_statement_conflict: bool,
) -> (String, Option<ResolveType>) {
if has_statement_conflict {
return (raw_desc.to_string(), None);
}
match effective {
ResolveType::Fail | ResolveType::Rollback => (
format!("UNIQUE constraint failed: {raw_desc} (19)"),
Some(effective),
),
_ => (raw_desc.to_string(), None),
}
}
pub fn format_unique_violation_desc(table_name: &str, index: &Index) -> String {
if index.columns.len() == 1 {
let mut s = String::with_capacity(table_name.len() + 1 + index.columns[0].name.len());
s.push_str(table_name);
s.push('.');
s.push_str(&index.columns[0].name);
s
} else {
let mut s = String::with_capacity(table_name.len() + 3 + 4 * index.columns.len());
s.push_str(table_name);
s.push_str(".(");
s.push_str(
&index
.columns
.iter()
.map(|c| c.name.as_str())
.collect::<Vec<_>>()
.join(", "),
);
s.push(')');
s
}
}
fn emit_index_column_value_for_insert(
program: &mut ProgramBuilder,
resolver: &Resolver,
insertion: &Insertion,
table: &Arc<BTreeTable>,
idx_col: &IndexColumn,
dest_reg: usize,
) -> Result<()> {
if let Some(expr) = &idx_col.expr {
let expr = expr.as_ref().clone();
let columns: Vec<Column> = insertion
.col_mappings
.iter()
.map(|cm| cm.column.clone())
.collect();
let mut column_regs: Vec<usize> = insertion
.col_mappings
.iter()
.map(|cm| {
if cm.column.is_rowid_alias() {
insertion.key_register()
} else {
cm.register
}
})
.collect();
crate::translate::expr::emit_dml_expr_index_value(
program,
resolver,
expr,
&columns,
&mut column_regs,
table,
dest_reg,
)?;
if idx_col.pos_in_table != EXPR_INDEX_SENTINEL {
let column = &table.columns()[idx_col.pos_in_table];
if column.is_virtual_generated() {
program.emit_column_affinity(dest_reg, column.affinity());
}
}
} else {
let Some(cm) = insertion.get_col_mapping_by_name(&idx_col.name) else {
return Err(LimboError::PlanningError(
"Column not found in INSERT".to_string(),
));
};
let src_reg = if cm.column.is_rowid_alias() {
insertion.key_register()
} else {
cm.register
};
program.emit_insn(Insn::Copy {
src_reg,
dst_reg: dest_reg,
extra_amount: 0,
});
}
Ok(())
}
struct ConstraintsToCheck {
constraints_to_check: Vec<(ResolvedUpsertTarget, Option<usize>)>,
upsert_catch_all_position: Option<usize>,
}
struct PreflightCtx<'a, 'b> {
upsert_actions: &'a [(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)],
on_replace: bool,
effective_on_conflict: ResolveType,
connection: &'a Arc<Connection>,
table_references: &'b mut TableReferences,
}
#[allow(clippy::too_many_arguments)]
fn build_constraints_to_check(
table_name: &str,
upsert_actions: &[(ResolvedUpsertTarget, BranchOffset, Box<Upsert>)],
has_rowid: bool,
has_user_provided_rowid: bool,
resolver: &Resolver,
database_id: usize,
rowid_alias_conflict_clause: Option<ResolveType>,
has_statement_conflict: bool,
) -> ConstraintsToCheck {
let mut constraints_to_check = Vec::new();
if !has_rowid || has_user_provided_rowid {
let position = upsert_actions
.iter()
.position(|(target, ..)| matches!(target, ResolvedUpsertTarget::PrimaryKey));
constraints_to_check.push((ResolvedUpsertTarget::PrimaryKey, position));
}
let indices: Vec<_> = resolver.with_schema(database_id, |s| {
s.get_indices(table_name).cloned().collect()
});
for index in &indices {
let position = upsert_actions
.iter()
.position(|(target, ..)| matches!(target, ResolvedUpsertTarget::Index(x) if Arc::ptr_eq(x, index)));
constraints_to_check.push((ResolvedUpsertTarget::Index(index.clone()), position));
}
constraints_to_check.sort_by(|(_, p1), (_, p2)| match (p1, p2) {
(Some(p1), Some(p2)) => p1.cmp(p2),
(Some(_), None) => std::cmp::Ordering::Less,
(None, Some(_)) => std::cmp::Ordering::Greater,
(None, None) => std::cmp::Ordering::Equal,
});
let defer_ipk_replace_after_other_checks = !has_statement_conflict
&& rowid_alias_conflict_clause == Some(ResolveType::Replace)
&& constraints_to_check.len() > 1
&& !upsert_actions
.iter()
.any(|(t, ..)| matches!(t, ResolvedUpsertTarget::PrimaryKey));
if defer_ipk_replace_after_other_checks {
if let Some(pos) = constraints_to_check
.iter()
.position(|(c, _)| matches!(c, ResolvedUpsertTarget::PrimaryKey))
{
let pk = constraints_to_check.remove(pos);
constraints_to_check.push(pk);
}
}
turso_debug_assert!(
has_statement_conflict || {
let mut saw_replace = false;
constraints_to_check.iter().all(|(c, _)| {
let mode = match c {
ResolvedUpsertTarget::PrimaryKey => {
rowid_alias_conflict_clause.unwrap_or(ResolveType::Abort)
}
ResolvedUpsertTarget::Index(idx) => {
idx.on_conflict.unwrap_or(ResolveType::Abort)
}
ResolvedUpsertTarget::CatchAll => return true,
};
if mode == ResolveType::Replace {
saw_replace = true;
true
} else {
!saw_replace
}
})
},
"constraints must have all REPLACE entries at the end"
);
let upsert_catch_all_position =
if let Some((ResolvedUpsertTarget::CatchAll, ..)) = upsert_actions.last() {
Some(upsert_actions.len() - 1)
} else {
None
};
ConstraintsToCheck {
constraints_to_check,
upsert_catch_all_position,
}
}
fn emit_update_sqlite_sequence(
program: &mut ProgramBuilder,
resolver: &Resolver,
database_id: usize,
seq_cursor_id: usize,
r_seq_rowid: usize,
table_name_reg: usize,
new_key_reg: usize,
) -> Result<()> {
let record_reg = program.alloc_register();
let record_start_reg = program.alloc_registers(2);
program.emit_insn(Insn::Copy {
src_reg: table_name_reg,
dst_reg: record_start_reg,
extra_amount: 0,
});
program.emit_insn(Insn::Copy {
src_reg: new_key_reg,
dst_reg: record_start_reg + 1,
extra_amount: 0,
});
let seq_table = get_valid_sqlite_sequence_table(resolver, database_id)?;
let affinity_str = seq_table
.columns()
.iter()
.map(|col| col.affinity().aff_mask())
.collect::<String>();
program.emit_insn(Insn::MakeRecord {
start_reg: to_u16(record_start_reg),
count: to_u16(2),
dest_reg: to_u16(record_reg),
index_name: None,
affinity_str: Some(affinity_str),
});
let update_existing_label = program.allocate_label();
let end_update_label = program.allocate_label();
program.emit_insn(Insn::NotNull {
reg: r_seq_rowid,
target_pc: update_existing_label,
});
program.emit_insn(Insn::NewRowid {
cursor: seq_cursor_id,
rowid_reg: r_seq_rowid,
prev_largest_reg: 0,
});
program.emit_insn(Insn::Insert {
cursor: seq_cursor_id,
key_reg: r_seq_rowid,
record_reg,
flag: InsertFlags::new(),
table_name: SQLITE_SEQUENCE_TABLE_NAME.to_string(),
});
program.emit_insn(Insn::Goto {
target_pc: end_update_label,
});
program.preassign_label_to_next_insn(update_existing_label);
program.emit_insn(Insn::Insert {
cursor: seq_cursor_id,
key_reg: r_seq_rowid,
record_reg,
flag: InsertFlags(turso_parser::ast::ResolveType::Replace.bit_value() as u8),
table_name: SQLITE_SEQUENCE_TABLE_NAME.to_string(),
});
program.preassign_label_to_next_insn(end_update_label);
Ok(())
}
fn emit_replace_delete_conflicting_row(
program: &mut ProgramBuilder,
resolver: &mut Resolver,
connection: &Arc<Connection>,
ctx: &mut InsertEmitCtx,
table_references: &mut TableReferences,
) -> Result<()> {
program.emit_insn(Insn::SeekRowid {
cursor_id: ctx.cursor_id,
src_reg: ctx.conflict_rowid_reg,
target_pc: ctx.halt_label,
});
let prepared_fk_actions = if connection.foreign_keys_enabled() {
let prepared = ForeignKeyActions::prepare_fk_delete_actions(
program,
resolver,
ctx.table.name.as_str(),
ctx.cursor_id,
ctx.conflict_rowid_reg,
None,
ctx.database_id,
)?;
if resolver.schema().has_child_fks(ctx.table.name.as_str()) {
emit_fk_child_decrement_on_delete(
program,
ctx.table.as_ref(),
ctx.table.name.as_str(),
ctx.cursor_id,
ctx.conflict_rowid_reg,
ctx.database_id,
resolver,
)?;
}
prepared
} else {
ForeignKeyActions::default()
};
let table = &ctx.table;
let table_name = table.name.as_str();
let main_cursor_id = ctx.cursor_id;
for (name, _, index_cursor_id) in ctx.idx_cursors.iter() {
let index = resolver
.with_schema(ctx.database_id, |s| s.get_index(table_name, name).cloned())
.expect("index to exist");
let skip_delete_label = if index.where_clause.is_some() {
let where_copy = index
.bind_where_expr(Some(table_references), resolver)
.expect("where clause to exist");
let skip_label = program.allocate_label();
let reg = program.alloc_register();
translate_expr_no_constant_opt(
program,
Some(table_references),
&where_copy,
reg,
resolver,
NoConstantOptReason::RegisterReuse,
)?;
program.emit_insn(Insn::IfNot {
reg,
jump_if_null: true,
target_pc: skip_label,
});
Some(skip_label)
} else {
None
};
let num_regs = index.columns.len() + 1;
let start_reg = program.alloc_registers(num_regs);
let table_internal_id = table_references.joined_tables()[0].internal_id;
for (reg_offset, column_index) in index.columns.iter().enumerate() {
emit_index_column_value_old_image(
program,
resolver,
table_references,
main_cursor_id,
table_internal_id,
column_index,
start_reg + reg_offset,
)?;
}
program.emit_insn(Insn::Copy {
src_reg: ctx.conflict_rowid_reg,
dst_reg: start_reg + num_regs - 1,
extra_amount: 0,
});
program.emit_insn(Insn::IdxDelete {
start_reg,
num_regs,
cursor_id: *index_cursor_id,
raise_error_if_no_matching_entry: index.where_clause.is_none(),
});
if let Some(label) = skip_delete_label {
program.preassign_label_to_next_insn(label);
}
}
if let Some(cdc_cursor_id) = ctx.cdc_table.as_ref().map(|(id, _tbl)| *id) {
let cdc_has_before = program.capture_data_changes_info().has_before();
let before_record_reg = if cdc_has_before {
Some(emit_cdc_full_record(
program,
table.columns(),
main_cursor_id,
ctx.conflict_rowid_reg,
table.is_strict,
))
} else {
None
};
emit_cdc_insns(
program,
resolver,
OperationMode::DELETE,
cdc_cursor_id,
ctx.conflict_rowid_reg,
before_record_reg,
None,
None,
table_name,
)?;
}
program.emit_insn(Insn::Delete {
cursor_id: main_cursor_id,
table_name: table_name.to_string(),
is_part_of_update: true,
});
prepared_fk_actions.fire_prepared_fk_delete_actions(
program,
resolver,
connection,
ctx.database_id,
)?;
Ok(())
}
pub fn emit_fk_child_insert_checks(
program: &mut ProgramBuilder,
child_tbl: &BTreeTable,
new_start_reg: usize,
new_rowid_reg: usize,
resolver: &Resolver,
database_id: usize,
layout: &ColumnLayout,
) -> crate::Result<()> {
for fk_ref in
resolver.with_schema(database_id, |s| s.resolved_fks_for_child(&child_tbl.name))?
{
let is_self_ref = fk_ref.fk.parent_table.eq_ignore_ascii_case(&child_tbl.name);
let fk_ok = program.allocate_label();
for cname in &fk_ref.fk.child_columns {
let (i, col) = child_tbl.get_column(cname).unwrap();
let src = if col.is_rowid_alias() {
new_rowid_reg
} else {
layout.to_register(new_start_reg, i)
};
program.emit_insn(Insn::IsNull {
reg: src,
target_pc: fk_ok,
});
}
let parent_tbl = resolver
.with_schema(database_id, |s| s.get_btree_table(&fk_ref.fk.parent_table))
.expect("parent btree");
if fk_ref.parent_uses_rowid {
let pcur = open_read_table(program, &parent_tbl, database_id);
let (i_child, col_child) = child_tbl.get_column(&fk_ref.fk.child_columns[0]).unwrap();
let val_reg = if col_child.is_rowid_alias() {
new_rowid_reg
} else {
layout.to_register(new_start_reg, i_child)
};
let tmp = program.alloc_register();
program.emit_insn(Insn::Copy {
src_reg: val_reg,
dst_reg: tmp,
extra_amount: 0,
});
program.emit_insn(Insn::MustBeInt { reg: tmp });
if is_self_ref {
program.emit_insn(Insn::Eq {
lhs: tmp,
rhs: new_rowid_reg,
target_pc: fk_ok,
flags: CmpInsFlags::default(),
collation: None,
});
}
let violation = program.allocate_label();
program.emit_insn(Insn::NotExists {
cursor: pcur,
rowid_reg: tmp,
target_pc: violation,
});
program.emit_insn(Insn::Close { cursor_id: pcur });
program.emit_insn(Insn::Goto { target_pc: fk_ok });
program.preassign_label_to_next_insn(violation);
program.emit_insn(Insn::Close { cursor_id: pcur });
if fk_ref.fk.deferred {
emit_fk_violation(program, &fk_ref.fk)?;
} else {
emit_fk_restrict_halt(program)?;
}
program.preassign_label_to_next_insn(fk_ok);
} else {
let idx = fk_ref
.parent_unique_index
.as_ref()
.expect("parent unique index required");
let icur = open_read_index(program, idx, database_id);
let ncols = fk_ref.fk.child_columns.len();
let probe = {
let start = program.alloc_registers(ncols);
for (k, cname) in fk_ref.fk.child_columns.iter().enumerate() {
let (i, col) = child_tbl.get_column(cname).unwrap();
program.emit_insn(Insn::Copy {
src_reg: if col.is_rowid_alias() {
new_rowid_reg
} else {
layout.to_register(new_start_reg, i)
},
dst_reg: start + k,
extra_amount: 0,
});
}
if let Some(cnt) = NonZeroUsize::new(ncols) {
program.emit_insn(Insn::Affinity {
start_reg: start,
count: cnt,
affinities: build_index_affinity_string(idx, &parent_tbl),
});
}
start
};
if is_self_ref {
let parent_cols: Vec<&str> =
idx.columns.iter().map(|ic| ic.name.as_str()).collect();
let parent_new = program.alloc_registers(ncols);
for (i, pname) in parent_cols.iter().enumerate() {
let (pos, col) = child_tbl.get_column(pname).unwrap();
program.emit_insn(Insn::Copy {
src_reg: if col.is_rowid_alias() {
new_rowid_reg
} else {
new_start_reg + pos
},
dst_reg: parent_new + i,
extra_amount: 0,
});
}
if let Some(cnt) = NonZeroUsize::new(ncols) {
program.emit_insn(Insn::Affinity {
start_reg: parent_new,
count: cnt,
affinities: build_index_affinity_string(idx, &parent_tbl),
});
}
let mismatch = program.allocate_label();
for i in 0..ncols {
let cont = program.allocate_label();
program.emit_insn(Insn::Eq {
lhs: probe + i,
rhs: parent_new + i,
target_pc: cont,
flags: CmpInsFlags::default().jump_if_null(),
collation: Some(super::collate::CollationSeq::Binary),
});
program.emit_insn(Insn::Goto {
target_pc: mismatch,
});
program.preassign_label_to_next_insn(cont);
}
program.emit_insn(Insn::Goto { target_pc: fk_ok });
program.preassign_label_to_next_insn(mismatch);
}
index_probe(
program,
icur,
probe,
ncols,
|_p| Ok(()),
|p| {
if fk_ref.fk.deferred {
emit_fk_violation(p, &fk_ref.fk)?;
} else {
emit_fk_restrict_halt(p)?;
}
Ok(())
},
)?;
program.emit_insn(Insn::Goto { target_pc: fk_ok });
program.preassign_label_to_next_insn(fk_ok);
}
}
Ok(())
}
fn build_parent_key_image_for_insert(
program: &mut ProgramBuilder,
parent_table: &BTreeTable,
pref: &ResolvedFkRef,
insertion: &Insertion,
) -> crate::Result<(usize, usize)> {
let rowid_slot: [String; 1];
let parent_cols: &[String] = if pref.parent_uses_rowid {
rowid_slot = ["rowid".to_string()];
&rowid_slot
} else {
&pref.parent_cols
};
let ncols = parent_cols.len();
let start = program.alloc_registers(ncols);
for (i, pname) in parent_cols.iter().enumerate() {
let src = if pname.eq_ignore_ascii_case("rowid") {
insertion.key_register()
} else {
insertion
.get_col_mapping_by_name(pname)
.ok_or_else(|| {
crate::LimboError::PlanningError(format!(
"Column '{}' not present in INSERT image for parent {}",
pname, parent_table.name
))
})?
.register
};
program.emit_insn(Insn::Copy {
src_reg: src,
dst_reg: start + i,
extra_amount: 0,
});
}
let aff: String = if pref.parent_uses_rowid {
"i".to_string()
} else {
parent_cols
.iter()
.map(|name| {
let (_, col) = parent_table.get_column(name).ok_or_else(|| {
crate::LimboError::InternalError(format!("parent col {name} missing"))
})?;
Ok::<_, crate::LimboError>(
col.affinity_with_strict(parent_table.is_strict).aff_mask(),
)
})
.collect::<Result<String, _>>()?
};
if let Some(count) = NonZeroUsize::new(ncols) {
program.emit_insn(Insn::Affinity {
start_reg: start,
count,
affinities: aff,
});
}
Ok((start, ncols))
}
pub fn emit_parent_side_fk_decrement_on_insert(
program: &mut ProgramBuilder,
parent_table: &BTreeTable,
insertion: &Insertion,
force_immediate: bool,
resolver: &Resolver,
database_id: usize,
) -> crate::Result<()> {
for pref in resolver.with_schema(database_id, |s| {
s.resolved_fks_referencing(&parent_table.name)
})? {
let is_self_ref = pref
.child_table
.name
.eq_ignore_ascii_case(&parent_table.name);
if !force_immediate && !pref.fk.deferred && !is_self_ref {
continue;
}
let (new_pk_start, n_cols) =
build_parent_key_image_for_insert(program, parent_table, &pref, insertion)?;
let child_tbl = &pref.child_table;
let child_cols = &pref.fk.child_columns;
let indices: Vec<_> = resolver.with_schema(database_id, |s| {
s.get_indices(&child_tbl.name).cloned().collect()
});
let idx = indices.iter().find(|ix| {
ix.columns.len() == child_cols.len()
&& ix
.columns
.iter()
.zip(child_cols.iter())
.all(|(ic, cc)| ic.name.eq_ignore_ascii_case(cc))
});
if let Some(ix) = idx {
let icur = open_read_index(program, ix, database_id);
let probe_start = program.alloc_registers(n_cols);
for i in 0..n_cols {
program.emit_insn(Insn::Copy {
src_reg: new_pk_start + i,
dst_reg: probe_start + i,
extra_amount: 0,
});
}
if let Some(count) = NonZeroUsize::new(n_cols) {
program.emit_insn(Insn::Affinity {
start_reg: probe_start,
count,
affinities: build_index_affinity_string(ix, child_tbl),
});
}
let found = program.allocate_label();
program.emit_insn(Insn::Found {
cursor_id: icur,
target_pc: found,
record_reg: probe_start,
num_regs: n_cols,
});
program.emit_insn(Insn::Close { cursor_id: icur });
let skip = program.allocate_label();
program.emit_insn(Insn::Goto { target_pc: skip });
program.preassign_label_to_next_insn(found);
program.emit_insn(Insn::Close { cursor_id: icur });
emit_guarded_fk_decrement(program, skip, pref.fk.deferred);
program.preassign_label_to_next_insn(skip);
} else {
let ccur = open_read_table(program, child_tbl, database_id);
let done = program.allocate_label();
program.emit_insn(Insn::Rewind {
cursor_id: ccur,
pc_if_empty: done,
});
let loop_top = program.allocate_label();
let next_row = program.allocate_label();
program.preassign_label_to_next_insn(loop_top);
for (i, child_name) in child_cols.iter().enumerate() {
let (pos, _) = child_tbl.get_column(child_name).ok_or_else(|| {
crate::LimboError::InternalError(format!("child col {child_name} missing"))
})?;
let tmp = program.alloc_register();
program.emit_insn(Insn::Column {
cursor_id: ccur,
column: pos,
dest: tmp,
default: None,
});
program.emit_insn(Insn::IsNull {
reg: tmp,
target_pc: next_row,
});
let cont = program.allocate_label();
program.emit_insn(Insn::Eq {
lhs: tmp,
rhs: new_pk_start + i,
target_pc: cont,
flags: CmpInsFlags::default().jump_if_null(),
collation: Some(super::collate::CollationSeq::Binary),
});
program.emit_insn(Insn::Goto {
target_pc: next_row,
});
program.preassign_label_to_next_insn(cont);
}
emit_guarded_fk_decrement(program, next_row, pref.fk.deferred);
program.preassign_label_to_next_insn(next_row);
program.emit_insn(Insn::Next {
cursor_id: ccur,
pc_if_next: loop_top,
});
program.preassign_label_to_next_insn(done);
program.emit_insn(Insn::Close { cursor_id: ccur });
}
}
Ok(())
}
fn emit_custom_type_encode(
program: &mut ProgramBuilder,
resolver: &Resolver,
insertion: &Insertion,
table_name: &str,
) -> Result<()> {
let columns: Vec<_> = insertion
.col_mappings
.iter()
.map(|m| m.column.clone())
.collect();
let layout = ColumnLayout::from_columns(&columns);
crate::translate::expr::emit_custom_type_encode_columns(
program,
resolver,
&columns,
insertion.first_col_register(),
None, table_name,
&layout,
)
}