stoolap 0.4.0

High-performance embedded SQL database with MVCC, time-travel queries, and full ACID compliance
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
// Copyright 2025 Stoolap Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Foreign Key Constraint Enforcement
//!
//! This module provides helpers for checking referential integrity:
//! - On INSERT/UPDATE: verify parent rows exist (index-based O(log n) / O(1))
//! - On DELETE/UPDATE of parent: enforce RESTRICT/CASCADE/SET NULL
//!
//! All operations participate in the caller's transaction (via `txn_id`), ensuring:
//! - CASCADE effects are atomic with the parent operation
//! - FK checks see uncommitted rows from the current transaction
//! - No independent transactions are created (no resource leaks)
//!
//! Performance guarantees:
//! - Zero cost for non-FK tables (all checks short-circuit on empty foreign_keys)
//! - Cached reverse FK mapping (rebuilt only on schema_epoch change)
//! - Index-based parent lookups (no table scans when index exists)

use std::sync::Arc;

use crate::core::{Error, ForeignKeyAction, ForeignKeyConstraint, Result, Schema, Value};
use crate::storage::expression::Expression as StorageExpression;
use crate::storage::mvcc::engine::MVCCEngine;
use crate::storage::traits::Engine;

/// Check that all FK values in a row reference existing parent rows.
/// Called on INSERT and UPDATE (when FK columns change).
///
/// Uses `txn_id` to check within the caller's transaction, so uncommitted
/// parent rows (inserted in the same transaction) are visible.
///
/// Short-circuits immediately if schema has no FKs (zero cost for non-FK tables).
pub(crate) fn check_parent_exists(
    engine: &MVCCEngine,
    txn_id: i64,
    schema: &Schema,
    row: &crate::core::Row,
) -> Result<()> {
    for fk in &schema.foreign_keys {
        let fk_value = match row.get(fk.column_index) {
            Some(v) if !v.is_null() => v,
            _ => continue, // NULL FK is allowed (no reference)
        };

        if !parent_row_exists(
            engine,
            txn_id,
            &fk.referenced_table,
            &fk.referenced_column,
            fk_value,
        )? {
            return Err(Error::foreign_key_violation(
                &schema.table_name,
                &fk.column_name,
                &fk.referenced_table,
                &fk.referenced_column,
                format!(
                    "referenced row with {} = {} does not exist",
                    fk.referenced_column, fk_value
                ),
            ));
        }
    }
    Ok(())
}

/// Pre-validate a single FK value against its parent table.
/// Used for early validation of constant SET values in UPDATE statements
/// to prevent dirty state in explicit transactions.
///
/// NULL values are allowed (no reference) and should be skipped by the caller.
pub(crate) fn validate_fk_value(
    engine: &MVCCEngine,
    txn_id: i64,
    fk: &ForeignKeyConstraint,
    value: &Value,
    child_table: &str,
) -> Result<()> {
    if !parent_row_exists(
        engine,
        txn_id,
        &fk.referenced_table,
        &fk.referenced_column,
        value,
    )? {
        return Err(Error::foreign_key_violation(
            child_table,
            &fk.column_name,
            &fk.referenced_table,
            &fk.referenced_column,
            format!(
                "referenced row with {} = {} does not exist",
                fk.referenced_column, value
            ),
        ));
    }
    Ok(())
}

/// Check if a value exists in the parent table's referenced column.
///
/// Uses `collect_rows_with_limit_unordered(limit=1)` with a ComparisonExpr filter
/// for both correctness and performance:
/// - O(1) via PK fast path when referenced column is the primary key (common case)
/// - O(log N) via secondary index when available
/// - Falls back to filtered scan with early termination otherwise
/// - Always txn-aware: sees uncommitted INSERTs, respects uncommitted DELETEs
fn parent_row_exists(
    engine: &MVCCEngine,
    txn_id: i64,
    parent_table: &str,
    parent_column: &str,
    value: &Value,
) -> Result<bool> {
    let parent_schema = engine.get_table_schema(parent_table).map_err(|_| {
        Error::internal(format!(
            "foreign key references non-existent table '{}'",
            parent_table
        ))
    })?;

    let (_, ref_col) = parent_schema.find_column(parent_column).ok_or_else(|| {
        Error::internal(format!(
            "foreign key references non-existent column '{}' in table '{}'",
            parent_column, parent_table
        ))
    })?;

    // Build a ComparisonExpr for `column = value` — the storage layer will use
    // PK fast path (O(1)) or secondary index (O(log N)) when available
    let mut expr = crate::storage::expression::ComparisonExpr::new(
        ref_col.name.as_str(),
        crate::core::Operator::Eq,
        value.clone(),
    );
    expr.prepare_for_schema(&parent_schema);

    let parent = engine.get_table_for_txn(txn_id, parent_table)?;
    let rows = parent.collect_rows_with_limit_unordered(Some(&expr), 1, 0)?;
    Ok(!rows.is_empty())
}

/// Find all foreign key constraints in other tables that reference the given parent table.
/// Delegates to the engine's cached reverse mapping (rebuilt only on schema_epoch change).
/// Returns Arc-wrapped Vec (ref-count bump only, no cloning).
pub(crate) fn find_referencing_fks(
    engine: &MVCCEngine,
    parent_table: &str,
) -> Arc<Vec<(String, ForeignKeyConstraint)>> {
    engine.find_referencing_fks(parent_table)
}

/// Enforce referential actions for DELETE from a parent table.
/// Accepts an iterator of PK values to avoid allocating a separate Vec.
///
/// All CASCADE/SET NULL operations use `txn_id` to participate in the caller's
/// transaction, ensuring atomicity (rollback undoes cascade effects).
///
/// For each deleted PK value, checks all child tables:
/// - RESTRICT/NO ACTION: error if child rows exist
/// - CASCADE: delete matching child rows (batched per child table)
/// - SET NULL: set FK column to NULL in matching child rows (batched per child table)
///
/// Returns the total count of cascaded/affected child rows.
pub(crate) fn enforce_delete_actions_iter<'a>(
    engine: &MVCCEngine,
    txn_id: i64,
    parent_table: &str,
    deleted_pk_values: impl Iterator<Item = &'a Value>,
    referencing_fks: &[(String, ForeignKeyConstraint)],
) -> Result<i32> {
    if referencing_fks.is_empty() {
        return Ok(0);
    }

    let mut total_affected = 0i32;

    for pk_value in deleted_pk_values {
        for (child_table_name, fk) in referencing_fks {
            let action = fk.on_delete;

            match action {
                ForeignKeyAction::Restrict | ForeignKeyAction::NoAction => {
                    // Check if any child rows reference this PK value
                    if child_rows_exist(engine, txn_id, child_table_name, fk, pk_value)? {
                        return Err(Error::foreign_key_violation(
                            child_table_name,
                            &fk.column_name,
                            parent_table,
                            &fk.referenced_column,
                            format!(
                                "cannot delete row with {} = {} — still referenced by table '{}'",
                                fk.referenced_column, pk_value, child_table_name
                            ),
                        ));
                    }
                }
                ForeignKeyAction::Cascade => {
                    // Delete matching child rows within the caller's transaction
                    let affected = cascade_delete(engine, txn_id, child_table_name, fk, pk_value)?;
                    total_affected = total_affected.saturating_add(affected);
                }
                ForeignKeyAction::SetNull => {
                    // Set FK column to NULL in matching child rows
                    let affected =
                        set_null_on_delete(engine, txn_id, child_table_name, fk, pk_value)?;
                    total_affected = total_affected.saturating_add(affected);
                }
            }
        }
    }

    Ok(total_affected)
}

/// Pre-check RESTRICT constraints and CASCADE depth before writing parent rows.
/// Walks the full FK tree (including recursive grandchild RESTRICT behind CASCADE/SET NULL)
/// to detect violations before any rows are modified, preserving statement atomicity.
/// Returns true if the tree has constraints that need row-level pre-checking.
pub(crate) fn pre_check_restrict_for_update(
    engine: &MVCCEngine,
    txn_id: i64,
    parent_table: &str,
    old_value: &Value,
    referencing_fks: &[(String, ForeignKeyConstraint)],
) -> Result<()> {
    pre_check_restrict_recursive(engine, txn_id, parent_table, old_value, referencing_fks, 0)
}

/// Check if the FK tree rooted at these referencing FKs needs row-level pre-checking.
/// Returns true if any path contains RESTRICT/NoAction or exceeds CASCADE depth.
/// This is a metadata-only walk (no row scans) used to skip the expensive pre-scan
/// when the tree is pure CASCADE/SET NULL within depth limits.
pub(crate) fn fk_tree_needs_precheck(
    engine: &MVCCEngine,
    referencing_fks: &[(String, ForeignKeyConstraint)],
) -> bool {
    fk_tree_needs_precheck_recursive(engine, referencing_fks, 0)
}

fn fk_tree_needs_precheck_recursive(
    engine: &MVCCEngine,
    referencing_fks: &[(String, ForeignKeyConstraint)],
    depth: usize,
) -> bool {
    if depth >= MAX_CASCADE_DEPTH {
        return true; // depth limit will be hit → needs pre-check
    }
    for (child_table_name, fk) in referencing_fks {
        match fk.on_update {
            ForeignKeyAction::Restrict | ForeignKeyAction::NoAction => {
                return true;
            }
            ForeignKeyAction::Cascade | ForeignKeyAction::SetNull => {
                let grandchild_fks = find_referencing_fks(engine, child_table_name);
                let child_fk_col = &fk.column_name;
                let relevant: Vec<_> = grandchild_fks
                    .iter()
                    .filter(|(_, gfk)| gfk.referenced_column == *child_fk_col)
                    .cloned()
                    .collect();
                if !relevant.is_empty()
                    && fk_tree_needs_precheck_recursive(engine, &relevant, depth + 1)
                {
                    return true;
                }
            }
        }
    }
    false
}

fn pre_check_restrict_recursive(
    engine: &MVCCEngine,
    txn_id: i64,
    parent_table: &str,
    old_value: &Value,
    referencing_fks: &[(String, ForeignKeyConstraint)],
    depth: usize,
) -> Result<()> {
    for (child_table_name, fk) in referencing_fks {
        // Check if child rows actually reference old_value before doing anything.
        // If no matching rows exist, neither RESTRICT nor CASCADE applies.
        if !child_rows_exist(engine, txn_id, child_table_name, fk, old_value)? {
            continue;
        }
        match fk.on_update {
            ForeignKeyAction::Restrict | ForeignKeyAction::NoAction => {
                return Err(Error::foreign_key_violation(
                    child_table_name,
                    &fk.column_name,
                    parent_table,
                    &fk.referenced_column,
                    format!(
                        "cannot update row with {} = {} — still referenced by table '{}'",
                        fk.referenced_column, old_value, child_table_name
                    ),
                ));
            }
            ForeignKeyAction::Cascade | ForeignKeyAction::SetNull => {
                // Child rows exist and will be cascaded. Check depth limit
                // now — if exceeded, the actual cascade would fail too.
                if depth >= MAX_CASCADE_DEPTH {
                    return Err(Error::internal(format!(
                        "foreign key CASCADE depth limit ({}) exceeded — possible circular reference",
                        MAX_CASCADE_DEPTH
                    )));
                }
                let grandchild_fks = find_referencing_fks(engine, child_table_name);
                let child_fk_col = &fk.column_name;
                let relevant: Vec<_> = grandchild_fks
                    .iter()
                    .filter(|(_, gfk)| gfk.referenced_column == *child_fk_col)
                    .cloned()
                    .collect();
                if !relevant.is_empty() {
                    pre_check_restrict_recursive(
                        engine,
                        txn_id,
                        child_table_name,
                        old_value,
                        &relevant,
                        depth + 1,
                    )?;
                }
            }
        }
    }
    Ok(())
}

/// Enforce referential actions for UPDATE of a referenced column.
/// RESTRICT is already handled by pre_check_restrict_for_update before
/// the parent row is written. This function only dispatches CASCADE/SET NULL.
pub(crate) fn enforce_update_actions(
    engine: &MVCCEngine,
    txn_id: i64,
    old_pk_value: &Value,
    new_pk_value: &Value,
    referencing_fks: &[(String, ForeignKeyConstraint)],
) -> Result<i32> {
    if referencing_fks.is_empty() {
        return Ok(0);
    }

    let mut total_affected = 0i32;

    for (child_table_name, fk) in referencing_fks {
        let action = fk.on_update;

        match action {
            ForeignKeyAction::Restrict | ForeignKeyAction::NoAction => {
                // RESTRICT is already enforced by pre_check_restrict_for_update
                // before the parent row is written. Nothing to do here.
            }
            ForeignKeyAction::Cascade => {
                let affected = cascade_update(
                    engine,
                    txn_id,
                    child_table_name,
                    fk,
                    old_pk_value,
                    new_pk_value,
                )?;
                total_affected = total_affected.saturating_add(affected);
            }
            ForeignKeyAction::SetNull => {
                let affected =
                    set_null_on_delete(engine, txn_id, child_table_name, fk, old_pk_value)?;
                total_affected = total_affected.saturating_add(affected);
            }
        }
    }

    Ok(total_affected)
}

/// Check if any child rows in the child table reference the given parent PK value.
///
/// Uses `collect_rows_with_limit_unordered(limit=1)` with a ComparisonExpr filter
/// for both correctness and performance:
/// - O(log N) via secondary index when an index exists on the FK column
/// - Falls back to filtered scan with early termination otherwise
/// - Always txn-aware: sees uncommitted INSERTs, respects uncommitted DELETEs
fn child_rows_exist(
    engine: &MVCCEngine,
    txn_id: i64,
    child_table: &str,
    fk: &ForeignKeyConstraint,
    parent_pk_value: &Value,
) -> Result<bool> {
    let child = engine.get_table_for_txn(txn_id, child_table)?;
    let child_schema = child.schema();

    // Build a ComparisonExpr for `fk_column = parent_pk_value`
    let col_name = &child_schema.columns[fk.column_index].name;
    let mut expr = crate::storage::expression::ComparisonExpr::new(
        col_name.as_str(),
        crate::core::Operator::Eq,
        parent_pk_value.clone(),
    );
    expr.prepare_for_schema(child_schema);

    let rows = child.collect_rows_with_limit_unordered(Some(&expr), 1, 0)?;
    Ok(!rows.is_empty())
}

/// Maximum CASCADE recursion depth to prevent infinite loops from circular FK references.
const MAX_CASCADE_DEPTH: usize = 16;

/// CASCADE DELETE: delete all child rows referencing the given parent PK value.
/// Operates within the caller's transaction (no independent commit).
/// Recursively cascades to grandchild tables (up to MAX_CASCADE_DEPTH).
fn cascade_delete(
    engine: &MVCCEngine,
    txn_id: i64,
    child_table: &str,
    fk: &ForeignKeyConstraint,
    parent_pk_value: &Value,
) -> Result<i32> {
    cascade_delete_recursive(engine, txn_id, child_table, fk, parent_pk_value, 0)
}

fn cascade_delete_recursive(
    engine: &MVCCEngine,
    txn_id: i64,
    child_table: &str,
    fk: &ForeignKeyConstraint,
    parent_pk_value: &Value,
    depth: usize,
) -> Result<i32> {
    if depth >= MAX_CASCADE_DEPTH {
        return Err(Error::internal(format!(
            "foreign key CASCADE depth limit ({}) exceeded — possible circular reference",
            MAX_CASCADE_DEPTH
        )));
    }

    // Before deleting child rows, collect their PK values for recursive CASCADE.
    // This is needed because the child table may itself be a parent with CASCADE children.
    let grandchild_fks = find_referencing_fks(engine, child_table);
    let mut deleted_child_pks: Vec<Value> = Vec::new();

    if !grandchild_fks.is_empty() {
        // Need to collect PK values of rows about to be deleted
        let child_schema = engine.get_table_schema(child_table)?;
        if let Some(pk_idx) = child_schema.pk_column_index() {
            let child_handle = engine.get_table_for_txn(txn_id, child_table)?;
            // Use filtered scan instead of full table scan
            let col_name = &child_schema.columns[fk.column_index].name;
            let mut filter = crate::storage::expression::ComparisonExpr::new(
                col_name.as_str(),
                crate::core::Operator::Eq,
                parent_pk_value.clone(),
            );
            filter.prepare_for_schema(&child_schema);
            let rows = child_handle.collect_all_rows(Some(&filter))?;
            for (_, row) in rows.iter() {
                if let Some(pk_val) = row.get(pk_idx) {
                    deleted_child_pks.push(pk_val.clone());
                }
            }
        }
    }

    // Pre-check: verify grandchild RESTRICT constraints BEFORE deleting child rows.
    // If we deleted children first and a grandchild RESTRICT check fails, the child
    // deletions would remain in the transaction state (orphaning data in explicit txns).
    if !grandchild_fks.is_empty() && !deleted_child_pks.is_empty() {
        for child_pk in &deleted_child_pks {
            for (grandchild_table, grandchild_fk) in grandchild_fks.iter() {
                if matches!(
                    grandchild_fk.on_delete,
                    ForeignKeyAction::Restrict | ForeignKeyAction::NoAction
                ) && child_rows_exist(engine, txn_id, grandchild_table, grandchild_fk, child_pk)?
                {
                    return Err(Error::foreign_key_violation(
                        grandchild_table,
                        &grandchild_fk.column_name,
                        child_table,
                        &grandchild_fk.referenced_column,
                        format!(
                            "cannot cascade-delete row with {} = {} — still referenced by table '{}'",
                            grandchild_fk.referenced_column, child_pk, grandchild_table
                        ),
                    ));
                }
            }
        }
    }

    // Now delete the matching child rows (safe — RESTRICT checks passed above)
    let mut child = engine.get_table_for_txn(txn_id, child_table)?;
    let child_schema = child.schema();
    let col_name = &child_schema.columns[fk.column_index].name;
    let mut expr = crate::storage::expression::ComparisonExpr::new(
        col_name.as_str(),
        crate::core::Operator::Eq,
        parent_pk_value.clone(),
    );
    expr.prepare_for_schema(child_schema);

    let count = child.delete(Some(&expr))?;
    // Do NOT commit here — changes stay in TransactionVersionStore and are committed
    // atomically when the parent transaction's commit_all_tables() runs.

    let mut total = count;

    // Recursively enforce CASCADE/SET NULL on grandchild tables (RESTRICT already checked above)
    if !grandchild_fks.is_empty() && !deleted_child_pks.is_empty() {
        for child_pk in &deleted_child_pks {
            for (grandchild_table, grandchild_fk) in grandchild_fks.iter() {
                match grandchild_fk.on_delete {
                    ForeignKeyAction::Restrict | ForeignKeyAction::NoAction => {
                        // Already checked above — skip
                    }
                    ForeignKeyAction::Cascade => {
                        let affected = cascade_delete_recursive(
                            engine,
                            txn_id,
                            grandchild_table,
                            grandchild_fk,
                            child_pk,
                            depth + 1,
                        )?;
                        total = total.saturating_add(affected);
                    }
                    ForeignKeyAction::SetNull => {
                        let affected = set_null_on_delete(
                            engine,
                            txn_id,
                            grandchild_table,
                            grandchild_fk,
                            child_pk,
                        )?;
                        total = total.saturating_add(affected);
                    }
                }
            }
        }
    }

    Ok(total)
}

/// CASCADE UPDATE: update FK column in all child rows from old to new value.
/// Recursively cascades to grandchild tables (up to MAX_CASCADE_DEPTH).
/// Operates within the caller's transaction (no independent commit).
fn cascade_update(
    engine: &MVCCEngine,
    txn_id: i64,
    child_table: &str,
    fk: &ForeignKeyConstraint,
    old_value: &Value,
    new_value: &Value,
) -> Result<i32> {
    cascade_update_recursive(engine, txn_id, child_table, fk, old_value, new_value, 0)
}

fn cascade_update_recursive(
    engine: &MVCCEngine,
    txn_id: i64,
    child_table: &str,
    fk: &ForeignKeyConstraint,
    old_value: &Value,
    new_value: &Value,
    depth: usize,
) -> Result<i32> {
    // Early exit: if no child rows reference old_value, cascade stops here.
    // No depth check, RESTRICT check, or writes needed.
    if !child_rows_exist(engine, txn_id, child_table, fk, old_value)? {
        return Ok(0);
    }

    // Child rows exist. Check depth limit before doing any work.
    if depth >= MAX_CASCADE_DEPTH {
        return Err(Error::internal(format!(
            "foreign key CASCADE depth limit ({}) exceeded — possible circular reference",
            MAX_CASCADE_DEPTH
        )));
    }

    // Find grandchild FKs that reference the column being updated
    let grandchild_fks = find_referencing_fks(engine, child_table);
    let child_fk_col = &fk.column_name;

    let relevant_grandchild_fks: Vec<_> = grandchild_fks
        .iter()
        .filter(|(_, gfk)| gfk.referenced_column == *child_fk_col)
        .collect();

    // Pre-check: verify grandchild RESTRICT constraints BEFORE updating child rows
    if !relevant_grandchild_fks.is_empty() {
        for (grandchild_table, grandchild_fk) in &relevant_grandchild_fks {
            if matches!(
                grandchild_fk.on_update,
                ForeignKeyAction::Restrict | ForeignKeyAction::NoAction
            ) && child_rows_exist(engine, txn_id, grandchild_table, grandchild_fk, old_value)?
            {
                return Err(Error::foreign_key_violation(
                    grandchild_table,
                    &grandchild_fk.column_name,
                    child_table,
                    &grandchild_fk.referenced_column,
                    format!(
                        "cannot cascade-update row with {} = {} — still referenced by table '{}'",
                        grandchild_fk.referenced_column, old_value, grandchild_table
                    ),
                ));
            }
        }
    }

    // Now update the matching child rows (safe — RESTRICT checks passed above)
    let mut child = engine.get_table_for_txn(txn_id, child_table)?;
    let col_idx = fk.column_index;
    let new_val = new_value.clone();

    let child_schema = child.schema();
    let col_name = &child_schema.columns[col_idx].name;
    let mut expr = crate::storage::expression::ComparisonExpr::new(
        col_name.as_str(),
        crate::core::Operator::Eq,
        old_value.clone(),
    );
    expr.prepare_for_schema(child_schema);

    let count = child.update(Some(&expr), &mut |mut row| {
        let _ = row.set(col_idx, new_val.clone());
        Ok((row, true))
    })?;

    let mut total = count;

    if !relevant_grandchild_fks.is_empty() && count > 0 {
        for (grandchild_table, grandchild_fk) in &relevant_grandchild_fks {
            match grandchild_fk.on_update {
                ForeignKeyAction::Restrict | ForeignKeyAction::NoAction => {
                    // Already checked above
                }
                ForeignKeyAction::Cascade => {
                    let affected = cascade_update_recursive(
                        engine,
                        txn_id,
                        grandchild_table,
                        grandchild_fk,
                        old_value,
                        new_value,
                        depth + 1,
                    )?;
                    total = total.saturating_add(affected);
                }
                ForeignKeyAction::SetNull => {
                    // SET NULL changes the grandchild's FK column from old_value to NULL.
                    // Deeper descendants that reference the grandchild's column must see
                    // this as an update from old_value → NULL and be cascaded accordingly.
                    let null_val = Value::null(
                        engine.get_table_schema(grandchild_table)?.columns
                            [grandchild_fk.column_index]
                            .data_type,
                    );
                    let affected = cascade_update_recursive(
                        engine,
                        txn_id,
                        grandchild_table,
                        grandchild_fk,
                        old_value,
                        &null_val,
                        depth + 1,
                    )?;
                    total = total.saturating_add(affected);
                }
            }
        }
    }

    Ok(total)
}

/// SET NULL: set FK column to NULL in all child rows referencing the given parent PK value.
/// Operates within the caller's transaction (no independent commit).
fn set_null_on_delete(
    engine: &MVCCEngine,
    txn_id: i64,
    child_table: &str,
    fk: &ForeignKeyConstraint,
    parent_pk_value: &Value,
) -> Result<i32> {
    let mut child = engine.get_table_for_txn(txn_id, child_table)?;

    let col_idx = fk.column_index;

    // Check that the FK column is nullable
    let child_schema = child.schema();
    if !child_schema.columns[col_idx].nullable {
        return Err(Error::foreign_key_violation(
            child_table,
            &fk.column_name,
            &fk.referenced_table,
            &fk.referenced_column,
            format!(
                "cannot SET NULL on non-nullable column '{}'",
                fk.column_name
            ),
        ));
    }

    let null_val = Value::null(child_schema.columns[col_idx].data_type);
    let col_name = &child_schema.columns[col_idx].name;
    let mut expr = crate::storage::expression::ComparisonExpr::new(
        col_name.as_str(),
        crate::core::Operator::Eq,
        parent_pk_value.clone(),
    );
    expr.prepare_for_schema(child_schema);

    let count = child.update(Some(&expr), &mut |mut row| {
        let _ = row.set(col_idx, null_val.clone());
        Ok((row, true))
    })?;
    // Do NOT commit here — changes committed atomically with parent transaction.

    Ok(count)
}

/// Check if any child tables have rows that actually reference the given parent table.
/// Used by DROP TABLE and TRUNCATE to ensure no referencing rows exist.
/// Only counts rows where the FK column is non-NULL (NULL means "no reference").
///
/// Blocks for ALL FK action types (RESTRICT, CASCADE, SET NULL, NO ACTION) because
/// DROP TABLE/TRUNCATE are DDL operations that don't cascade to child rows — they
/// would leave orphaned references. The user must delete child rows first.
///
/// When `txn_id` is provided, uses the caller's transaction for visibility (sees
/// uncommitted deletes within an explicit transaction). Otherwise creates a fresh
/// read-only transaction.
pub(crate) fn check_no_referencing_rows(
    engine: &MVCCEngine,
    parent_table: &str,
    txn_id: Option<i64>,
) -> Result<()> {
    let referencing = find_referencing_fks(engine, parent_table);
    if referencing.is_empty() {
        return Ok(());
    }

    for (child_table, fk) in referencing.iter() {
        // Build IS NOT NULL filter on the FK column — pushed down to storage layer
        // so indexes can be used and we stop after the first match (limit=1)
        let child_schema = engine.get_table_schema(child_table)?;
        let col_name = &child_schema.columns[fk.column_index].name;
        let mut not_null_expr =
            crate::storage::expression::NullCheckExpr::is_not_null(col_name.as_str());
        not_null_expr.prepare_for_schema(&child_schema);

        let has_ref = if let Some(tid) = txn_id {
            let child = engine.get_table_for_txn(tid, child_table)?;
            !child
                .collect_rows_with_limit_unordered(Some(&not_null_expr), 1, 0)?
                .is_empty()
        } else {
            let tx = engine.begin_transaction()?;
            let child = tx.get_table(child_table)?;
            !child
                .collect_rows_with_limit_unordered(Some(&not_null_expr), 1, 0)?
                .is_empty()
        };

        if has_ref {
            return Err(Error::foreign_key_violation(
                child_table,
                &fk.column_name,
                parent_table,
                &fk.referenced_column,
                format!(
                    "cannot drop/truncate table '{}' — rows in '{}' still reference it",
                    parent_table, child_table
                ),
            ));
        }
    }

    Ok(())
}