use std::sync::Arc;
use crate::core::{Error, ForeignKeyAction, ForeignKeyConstraint, Result, Schema, Value};
use crate::storage::expression::Expression as StorageExpression;
use crate::storage::mvcc::engine::MVCCEngine;
use crate::storage::traits::Engine;
pub(crate) fn check_parent_exists(
engine: &MVCCEngine,
txn_id: i64,
schema: &Schema,
row: &crate::core::Row,
) -> Result<()> {
for fk in &schema.foreign_keys {
let fk_value = match row.get(fk.column_index) {
Some(v) if !v.is_null() => v,
_ => continue, };
if !parent_row_exists(
engine,
txn_id,
&fk.referenced_table,
&fk.referenced_column,
fk_value,
)? {
return Err(Error::foreign_key_violation(
&schema.table_name,
&fk.column_name,
&fk.referenced_table,
&fk.referenced_column,
format!(
"referenced row with {} = {} does not exist",
fk.referenced_column, fk_value
),
));
}
}
Ok(())
}
pub(crate) fn validate_fk_value(
engine: &MVCCEngine,
txn_id: i64,
fk: &ForeignKeyConstraint,
value: &Value,
child_table: &str,
) -> Result<()> {
if !parent_row_exists(
engine,
txn_id,
&fk.referenced_table,
&fk.referenced_column,
value,
)? {
return Err(Error::foreign_key_violation(
child_table,
&fk.column_name,
&fk.referenced_table,
&fk.referenced_column,
format!(
"referenced row with {} = {} does not exist",
fk.referenced_column, value
),
));
}
Ok(())
}
fn parent_row_exists(
engine: &MVCCEngine,
txn_id: i64,
parent_table: &str,
parent_column: &str,
value: &Value,
) -> Result<bool> {
let parent_schema = engine.get_table_schema(parent_table).map_err(|_| {
Error::internal(format!(
"foreign key references non-existent table '{}'",
parent_table
))
})?;
let (_, ref_col) = parent_schema.find_column(parent_column).ok_or_else(|| {
Error::internal(format!(
"foreign key references non-existent column '{}' in table '{}'",
parent_column, parent_table
))
})?;
let mut expr = crate::storage::expression::ComparisonExpr::new(
ref_col.name.as_str(),
crate::core::Operator::Eq,
value.clone(),
);
expr.prepare_for_schema(&parent_schema);
let parent = engine.get_table_for_txn(txn_id, parent_table)?;
let rows = parent.collect_rows_with_limit_unordered(Some(&expr), 1, 0)?;
Ok(!rows.is_empty())
}
pub(crate) fn find_referencing_fks(
engine: &MVCCEngine,
parent_table: &str,
) -> Arc<Vec<(String, ForeignKeyConstraint)>> {
engine.find_referencing_fks(parent_table)
}
pub(crate) fn enforce_delete_actions_iter<'a>(
engine: &MVCCEngine,
txn_id: i64,
parent_table: &str,
deleted_pk_values: impl Iterator<Item = &'a Value>,
referencing_fks: &[(String, ForeignKeyConstraint)],
) -> Result<i32> {
if referencing_fks.is_empty() {
return Ok(0);
}
let mut total_affected = 0i32;
for pk_value in deleted_pk_values {
for (child_table_name, fk) in referencing_fks {
let action = fk.on_delete;
match action {
ForeignKeyAction::Restrict | ForeignKeyAction::NoAction => {
if child_rows_exist(engine, txn_id, child_table_name, fk, pk_value)? {
return Err(Error::foreign_key_violation(
child_table_name,
&fk.column_name,
parent_table,
&fk.referenced_column,
format!(
"cannot delete row with {} = {} — still referenced by table '{}'",
fk.referenced_column, pk_value, child_table_name
),
));
}
}
ForeignKeyAction::Cascade => {
let affected = cascade_delete(engine, txn_id, child_table_name, fk, pk_value)?;
total_affected = total_affected.saturating_add(affected);
}
ForeignKeyAction::SetNull => {
let affected =
set_null_on_delete(engine, txn_id, child_table_name, fk, pk_value)?;
total_affected = total_affected.saturating_add(affected);
}
}
}
}
Ok(total_affected)
}
pub(crate) fn pre_check_restrict_for_update(
engine: &MVCCEngine,
txn_id: i64,
parent_table: &str,
old_value: &Value,
referencing_fks: &[(String, ForeignKeyConstraint)],
) -> Result<()> {
pre_check_restrict_recursive(engine, txn_id, parent_table, old_value, referencing_fks, 0)
}
pub(crate) fn fk_tree_needs_precheck(
engine: &MVCCEngine,
referencing_fks: &[(String, ForeignKeyConstraint)],
) -> bool {
fk_tree_needs_precheck_recursive(engine, referencing_fks, 0)
}
fn fk_tree_needs_precheck_recursive(
engine: &MVCCEngine,
referencing_fks: &[(String, ForeignKeyConstraint)],
depth: usize,
) -> bool {
if depth >= MAX_CASCADE_DEPTH {
return true; }
for (child_table_name, fk) in referencing_fks {
match fk.on_update {
ForeignKeyAction::Restrict | ForeignKeyAction::NoAction => {
return true;
}
ForeignKeyAction::Cascade | ForeignKeyAction::SetNull => {
let grandchild_fks = find_referencing_fks(engine, child_table_name);
let child_fk_col = &fk.column_name;
let relevant: Vec<_> = grandchild_fks
.iter()
.filter(|(_, gfk)| gfk.referenced_column == *child_fk_col)
.cloned()
.collect();
if !relevant.is_empty()
&& fk_tree_needs_precheck_recursive(engine, &relevant, depth + 1)
{
return true;
}
}
}
}
false
}
fn pre_check_restrict_recursive(
engine: &MVCCEngine,
txn_id: i64,
parent_table: &str,
old_value: &Value,
referencing_fks: &[(String, ForeignKeyConstraint)],
depth: usize,
) -> Result<()> {
for (child_table_name, fk) in referencing_fks {
if !child_rows_exist(engine, txn_id, child_table_name, fk, old_value)? {
continue;
}
match fk.on_update {
ForeignKeyAction::Restrict | ForeignKeyAction::NoAction => {
return Err(Error::foreign_key_violation(
child_table_name,
&fk.column_name,
parent_table,
&fk.referenced_column,
format!(
"cannot update row with {} = {} — still referenced by table '{}'",
fk.referenced_column, old_value, child_table_name
),
));
}
ForeignKeyAction::Cascade | ForeignKeyAction::SetNull => {
if depth >= MAX_CASCADE_DEPTH {
return Err(Error::internal(format!(
"foreign key CASCADE depth limit ({}) exceeded — possible circular reference",
MAX_CASCADE_DEPTH
)));
}
let grandchild_fks = find_referencing_fks(engine, child_table_name);
let child_fk_col = &fk.column_name;
let relevant: Vec<_> = grandchild_fks
.iter()
.filter(|(_, gfk)| gfk.referenced_column == *child_fk_col)
.cloned()
.collect();
if !relevant.is_empty() {
pre_check_restrict_recursive(
engine,
txn_id,
child_table_name,
old_value,
&relevant,
depth + 1,
)?;
}
}
}
}
Ok(())
}
pub(crate) fn enforce_update_actions(
engine: &MVCCEngine,
txn_id: i64,
old_pk_value: &Value,
new_pk_value: &Value,
referencing_fks: &[(String, ForeignKeyConstraint)],
) -> Result<i32> {
if referencing_fks.is_empty() {
return Ok(0);
}
let mut total_affected = 0i32;
for (child_table_name, fk) in referencing_fks {
let action = fk.on_update;
match action {
ForeignKeyAction::Restrict | ForeignKeyAction::NoAction => {
}
ForeignKeyAction::Cascade => {
let affected = cascade_update(
engine,
txn_id,
child_table_name,
fk,
old_pk_value,
new_pk_value,
)?;
total_affected = total_affected.saturating_add(affected);
}
ForeignKeyAction::SetNull => {
let affected =
set_null_on_delete(engine, txn_id, child_table_name, fk, old_pk_value)?;
total_affected = total_affected.saturating_add(affected);
}
}
}
Ok(total_affected)
}
fn child_rows_exist(
engine: &MVCCEngine,
txn_id: i64,
child_table: &str,
fk: &ForeignKeyConstraint,
parent_pk_value: &Value,
) -> Result<bool> {
let child = engine.get_table_for_txn(txn_id, child_table)?;
let child_schema = child.schema();
let col_name = &child_schema.columns[fk.column_index].name;
let mut expr = crate::storage::expression::ComparisonExpr::new(
col_name.as_str(),
crate::core::Operator::Eq,
parent_pk_value.clone(),
);
expr.prepare_for_schema(child_schema);
let rows = child.collect_rows_with_limit_unordered(Some(&expr), 1, 0)?;
Ok(!rows.is_empty())
}
const MAX_CASCADE_DEPTH: usize = 16;
fn cascade_delete(
engine: &MVCCEngine,
txn_id: i64,
child_table: &str,
fk: &ForeignKeyConstraint,
parent_pk_value: &Value,
) -> Result<i32> {
cascade_delete_recursive(engine, txn_id, child_table, fk, parent_pk_value, 0)
}
fn cascade_delete_recursive(
engine: &MVCCEngine,
txn_id: i64,
child_table: &str,
fk: &ForeignKeyConstraint,
parent_pk_value: &Value,
depth: usize,
) -> Result<i32> {
if depth >= MAX_CASCADE_DEPTH {
return Err(Error::internal(format!(
"foreign key CASCADE depth limit ({}) exceeded — possible circular reference",
MAX_CASCADE_DEPTH
)));
}
let grandchild_fks = find_referencing_fks(engine, child_table);
let mut deleted_child_pks: Vec<Value> = Vec::new();
if !grandchild_fks.is_empty() {
let child_schema = engine.get_table_schema(child_table)?;
if let Some(pk_idx) = child_schema.pk_column_index() {
let child_handle = engine.get_table_for_txn(txn_id, child_table)?;
let col_name = &child_schema.columns[fk.column_index].name;
let mut filter = crate::storage::expression::ComparisonExpr::new(
col_name.as_str(),
crate::core::Operator::Eq,
parent_pk_value.clone(),
);
filter.prepare_for_schema(&child_schema);
let rows = child_handle.collect_all_rows(Some(&filter))?;
for (_, row) in rows.iter() {
if let Some(pk_val) = row.get(pk_idx) {
deleted_child_pks.push(pk_val.clone());
}
}
}
}
if !grandchild_fks.is_empty() && !deleted_child_pks.is_empty() {
for child_pk in &deleted_child_pks {
for (grandchild_table, grandchild_fk) in grandchild_fks.iter() {
if matches!(
grandchild_fk.on_delete,
ForeignKeyAction::Restrict | ForeignKeyAction::NoAction
) && child_rows_exist(engine, txn_id, grandchild_table, grandchild_fk, child_pk)?
{
return Err(Error::foreign_key_violation(
grandchild_table,
&grandchild_fk.column_name,
child_table,
&grandchild_fk.referenced_column,
format!(
"cannot cascade-delete row with {} = {} — still referenced by table '{}'",
grandchild_fk.referenced_column, child_pk, grandchild_table
),
));
}
}
}
}
let mut child = engine.get_table_for_txn(txn_id, child_table)?;
let child_schema = child.schema();
let col_name = &child_schema.columns[fk.column_index].name;
let mut expr = crate::storage::expression::ComparisonExpr::new(
col_name.as_str(),
crate::core::Operator::Eq,
parent_pk_value.clone(),
);
expr.prepare_for_schema(child_schema);
let count = child.delete(Some(&expr))?;
let mut total = count;
if !grandchild_fks.is_empty() && !deleted_child_pks.is_empty() {
for child_pk in &deleted_child_pks {
for (grandchild_table, grandchild_fk) in grandchild_fks.iter() {
match grandchild_fk.on_delete {
ForeignKeyAction::Restrict | ForeignKeyAction::NoAction => {
}
ForeignKeyAction::Cascade => {
let affected = cascade_delete_recursive(
engine,
txn_id,
grandchild_table,
grandchild_fk,
child_pk,
depth + 1,
)?;
total = total.saturating_add(affected);
}
ForeignKeyAction::SetNull => {
let affected = set_null_on_delete(
engine,
txn_id,
grandchild_table,
grandchild_fk,
child_pk,
)?;
total = total.saturating_add(affected);
}
}
}
}
}
Ok(total)
}
fn cascade_update(
engine: &MVCCEngine,
txn_id: i64,
child_table: &str,
fk: &ForeignKeyConstraint,
old_value: &Value,
new_value: &Value,
) -> Result<i32> {
cascade_update_recursive(engine, txn_id, child_table, fk, old_value, new_value, 0)
}
fn cascade_update_recursive(
engine: &MVCCEngine,
txn_id: i64,
child_table: &str,
fk: &ForeignKeyConstraint,
old_value: &Value,
new_value: &Value,
depth: usize,
) -> Result<i32> {
if !child_rows_exist(engine, txn_id, child_table, fk, old_value)? {
return Ok(0);
}
if depth >= MAX_CASCADE_DEPTH {
return Err(Error::internal(format!(
"foreign key CASCADE depth limit ({}) exceeded — possible circular reference",
MAX_CASCADE_DEPTH
)));
}
let grandchild_fks = find_referencing_fks(engine, child_table);
let child_fk_col = &fk.column_name;
let relevant_grandchild_fks: Vec<_> = grandchild_fks
.iter()
.filter(|(_, gfk)| gfk.referenced_column == *child_fk_col)
.collect();
if !relevant_grandchild_fks.is_empty() {
for (grandchild_table, grandchild_fk) in &relevant_grandchild_fks {
if matches!(
grandchild_fk.on_update,
ForeignKeyAction::Restrict | ForeignKeyAction::NoAction
) && child_rows_exist(engine, txn_id, grandchild_table, grandchild_fk, old_value)?
{
return Err(Error::foreign_key_violation(
grandchild_table,
&grandchild_fk.column_name,
child_table,
&grandchild_fk.referenced_column,
format!(
"cannot cascade-update row with {} = {} — still referenced by table '{}'",
grandchild_fk.referenced_column, old_value, grandchild_table
),
));
}
}
}
let mut child = engine.get_table_for_txn(txn_id, child_table)?;
let col_idx = fk.column_index;
let new_val = new_value.clone();
let child_schema = child.schema();
let col_name = &child_schema.columns[col_idx].name;
let mut expr = crate::storage::expression::ComparisonExpr::new(
col_name.as_str(),
crate::core::Operator::Eq,
old_value.clone(),
);
expr.prepare_for_schema(child_schema);
let count = child.update(Some(&expr), &mut |mut row| {
let _ = row.set(col_idx, new_val.clone());
Ok((row, true))
})?;
let mut total = count;
if !relevant_grandchild_fks.is_empty() && count > 0 {
for (grandchild_table, grandchild_fk) in &relevant_grandchild_fks {
match grandchild_fk.on_update {
ForeignKeyAction::Restrict | ForeignKeyAction::NoAction => {
}
ForeignKeyAction::Cascade => {
let affected = cascade_update_recursive(
engine,
txn_id,
grandchild_table,
grandchild_fk,
old_value,
new_value,
depth + 1,
)?;
total = total.saturating_add(affected);
}
ForeignKeyAction::SetNull => {
let null_val = Value::null(
engine.get_table_schema(grandchild_table)?.columns
[grandchild_fk.column_index]
.data_type,
);
let affected = cascade_update_recursive(
engine,
txn_id,
grandchild_table,
grandchild_fk,
old_value,
&null_val,
depth + 1,
)?;
total = total.saturating_add(affected);
}
}
}
}
Ok(total)
}
fn set_null_on_delete(
engine: &MVCCEngine,
txn_id: i64,
child_table: &str,
fk: &ForeignKeyConstraint,
parent_pk_value: &Value,
) -> Result<i32> {
let mut child = engine.get_table_for_txn(txn_id, child_table)?;
let col_idx = fk.column_index;
let child_schema = child.schema();
if !child_schema.columns[col_idx].nullable {
return Err(Error::foreign_key_violation(
child_table,
&fk.column_name,
&fk.referenced_table,
&fk.referenced_column,
format!(
"cannot SET NULL on non-nullable column '{}'",
fk.column_name
),
));
}
let null_val = Value::null(child_schema.columns[col_idx].data_type);
let col_name = &child_schema.columns[col_idx].name;
let mut expr = crate::storage::expression::ComparisonExpr::new(
col_name.as_str(),
crate::core::Operator::Eq,
parent_pk_value.clone(),
);
expr.prepare_for_schema(child_schema);
let count = child.update(Some(&expr), &mut |mut row| {
let _ = row.set(col_idx, null_val.clone());
Ok((row, true))
})?;
Ok(count)
}
pub(crate) fn check_no_referencing_rows(
engine: &MVCCEngine,
parent_table: &str,
txn_id: Option<i64>,
) -> Result<()> {
let referencing = find_referencing_fks(engine, parent_table);
if referencing.is_empty() {
return Ok(());
}
for (child_table, fk) in referencing.iter() {
let child_schema = engine.get_table_schema(child_table)?;
let col_name = &child_schema.columns[fk.column_index].name;
let mut not_null_expr =
crate::storage::expression::NullCheckExpr::is_not_null(col_name.as_str());
not_null_expr.prepare_for_schema(&child_schema);
let has_ref = if let Some(tid) = txn_id {
let child = engine.get_table_for_txn(tid, child_table)?;
!child
.collect_rows_with_limit_unordered(Some(¬_null_expr), 1, 0)?
.is_empty()
} else {
let tx = engine.begin_transaction()?;
let child = tx.get_table(child_table)?;
!child
.collect_rows_with_limit_unordered(Some(¬_null_expr), 1, 0)?
.is_empty()
};
if has_ref {
return Err(Error::foreign_key_violation(
child_table,
&fk.column_name,
parent_table,
&fk.referenced_column,
format!(
"cannot drop/truncate table '{}' — rows in '{}' still reference it",
parent_table, child_table
),
));
}
}
Ok(())
}