Skip to main content

cynos_storage/
constraint.rs

1//! Constraint checking for Cynos database.
2//!
3//! This module provides constraint validation including primary key,
4//! unique, not-null, and foreign key constraints.
5
6use crate::cache::TableCache;
7use crate::row_store::RowStore;
8use alloc::format;
9use cynos_core::schema::{ConstraintTiming, Table};
10use cynos_core::{Error, Result, Row, RowId};
11
/// Stateless entry point for validating database constraints.
///
/// The type holds no data: each check is an associated function that receives
/// the schema, rows and (for foreign keys) the table cache as arguments.
///
/// The common traits are derived so the type can be formatted, copied and
/// default-constructed like any other public type.
#[derive(Debug, Clone, Copy, Default)]
pub struct ConstraintChecker;
14
15impl ConstraintChecker {
16    /// Checks the not-null constraint for a row.
17    pub fn check_not_null(schema: &Table, row: &Row) -> Result<()> {
18        let not_nullable = schema.constraints().get_not_nullable();
19
20        for col_name in not_nullable {
21            if schema.get_column(col_name).is_some() {
22                let col_idx = schema.get_column_index(col_name).unwrap();
23                if let Some(value) = row.get(col_idx) {
24                    if value.is_null() {
25                        return Err(Error::NullConstraint {
26                            column: col_name.clone(),
27                        });
28                    }
29                }
30            }
31        }
32
33        Ok(())
34    }
35
36    /// Checks the not-null constraint for multiple rows.
37    pub fn check_not_null_rows(schema: &Table, rows: &[Row]) -> Result<()> {
38        for row in rows {
39            Self::check_not_null(schema, row)?;
40        }
41        Ok(())
42    }
43
44    /// Checks foreign key constraints for insert.
45    pub fn check_foreign_keys_for_insert(
46        cache: &TableCache,
47        schema: &Table,
48        rows: &[Row],
49        timing: ConstraintTiming,
50    ) -> Result<()> {
51        let foreign_keys = schema.constraints().get_foreign_keys();
52
53        for fk in foreign_keys {
54            if fk.timing != timing {
55                continue;
56            }
57
58            let parent_store = cache.get_table(&fk.parent_table).ok_or_else(|| {
59                Error::table_not_found(&fk.parent_table)
60            })?;
61
62            let child_col_idx = schema.get_column_index(&fk.child_column).ok_or_else(|| {
63                Error::column_not_found(schema.name(), &fk.child_column)
64            })?;
65
66            for row in rows {
67                if let Some(value) = row.get(child_col_idx) {
68                    if !value.is_null() && !parent_store.pk_exists(value) {
69                        return Err(Error::ForeignKeyViolation {
70                            constraint: fk.name.clone(),
71                            message: format!(
72                                "Referenced key {:?} does not exist in {}",
73                                value, fk.parent_table
74                            ),
75                        });
76                    }
77                }
78            }
79        }
80
81        Ok(())
82    }
83
84    /// Checks foreign key constraints for delete.
85    pub fn check_foreign_keys_for_delete(
86        cache: &TableCache,
87        schema: &Table,
88        rows: &[Row],
89        timing: ConstraintTiming,
90    ) -> Result<()> {
91        // Find all tables that reference this table
92        for table_name in cache.table_names() {
93            if let Some(child_store) = cache.get_table(table_name) {
94                let child_schema = child_store.schema();
95                let foreign_keys = child_schema.constraints().get_foreign_keys();
96
97                for fk in foreign_keys {
98                    if fk.parent_table != schema.name() || fk.timing != timing {
99                        continue;
100                    }
101
102                    let parent_col_idx = schema.get_column_index(&fk.parent_column).ok_or_else(|| {
103                        Error::column_not_found(schema.name(), &fk.parent_column)
104                    })?;
105
106                    for row in rows {
107                        if let Some(pk_value) = row.get(parent_col_idx) {
108                            // Check if any child rows reference this value
109                            let child_rows = child_store.get_by_pk(pk_value);
110                            if !child_rows.is_empty() {
111                                return Err(Error::ForeignKeyViolation {
112                                    constraint: fk.name.clone(),
113                                    message: format!(
114                                        "Cannot delete: referenced by {} rows in {}",
115                                        child_rows.len(),
116                                        child_schema.name()
117                                    ),
118                                });
119                            }
120                        }
121                    }
122                }
123            }
124        }
125
126        Ok(())
127    }
128
129    /// Checks foreign key constraints for update.
130    pub fn check_foreign_keys_for_update(
131        cache: &TableCache,
132        schema: &Table,
133        modifications: &[(Row, Row)],
134        timing: ConstraintTiming,
135    ) -> Result<()> {
136        // Check if updated values still satisfy FK constraints (as child)
137        let foreign_keys = schema.constraints().get_foreign_keys();
138
139        for fk in foreign_keys {
140            if fk.timing != timing {
141                continue;
142            }
143
144            let parent_store = cache.get_table(&fk.parent_table).ok_or_else(|| {
145                Error::table_not_found(&fk.parent_table)
146            })?;
147
148            let child_col_idx = schema.get_column_index(&fk.child_column).ok_or_else(|| {
149                Error::column_not_found(schema.name(), &fk.child_column)
150            })?;
151
152            for (_, new_row) in modifications {
153                if let Some(value) = new_row.get(child_col_idx) {
154                    if !value.is_null() && !parent_store.pk_exists(value) {
155                        return Err(Error::ForeignKeyViolation {
156                            constraint: fk.name.clone(),
157                            message: format!(
158                                "Referenced key {:?} does not exist in {}",
159                                value, fk.parent_table
160                            ),
161                        });
162                    }
163                }
164            }
165        }
166
167        // Check if updated values break FK constraints (as parent)
168        for table_name in cache.table_names() {
169            if let Some(child_store) = cache.get_table(table_name) {
170                let child_schema = child_store.schema();
171                let child_fks = child_schema.constraints().get_foreign_keys();
172
173                for fk in child_fks {
174                    if fk.parent_table != schema.name() || fk.timing != timing {
175                        continue;
176                    }
177
178                    let parent_col_idx = schema.get_column_index(&fk.parent_column).ok_or_else(|| {
179                        Error::column_not_found(schema.name(), &fk.parent_column)
180                    })?;
181
182                    for (old_row, new_row) in modifications {
183                        let old_value = old_row.get(parent_col_idx);
184                        let new_value = new_row.get(parent_col_idx);
185
186                        // If the referenced column value changed
187                        if old_value != new_value {
188                            if let Some(old_val) = old_value {
189                                let child_rows = child_store.get_by_pk(old_val);
190                                if !child_rows.is_empty() {
191                                    return Err(Error::ForeignKeyViolation {
192                                        constraint: fk.name.clone(),
193                                        message: format!(
194                                            "Cannot update: referenced by {} rows in {}",
195                                            child_rows.len(),
196                                            child_schema.name()
197                                        ),
198                                    });
199                                }
200                            }
201                        }
202                    }
203                }
204            }
205        }
206
207        Ok(())
208    }
209
    /// Finds existing row ID by primary key in the store.
    ///
    /// Delegates to `store.find_row_id_by_pk(row)` and returns its result
    /// unchanged.
    // NOTE(review): presumably only the primary-key value(s) of `row` take
    // part in the lookup and `None` means no stored row shares that key — the
    // test module exercises exactly that; confirm against `RowStore`.
    pub fn find_existing_row_id_in_pk_index(store: &RowStore, row: &Row) -> Option<RowId> {
        store.find_row_id_by_pk(row)
    }
214}
215
#[cfg(test)]
mod tests {
    use super::*;
    use cynos_core::schema::TableBuilder;
    use cynos_core::{DataType, Value};
    use alloc::vec;

    // === Shared fixtures ===

    /// Builds a table with an Int64 `id` column, a String `name` column and a
    /// primary key on `id`.
    fn id_name_table(table_name: &'static str) -> Table {
        TableBuilder::new(table_name)
            .unwrap()
            .add_column("id", DataType::Int64)
            .unwrap()
            .add_column("name", DataType::String)
            .unwrap()
            .add_primary_key(&["id"], false)
            .unwrap()
            .build()
            .unwrap()
    }

    /// Schema used by the not-null and primary-key lookup tests.
    fn test_schema_with_not_null() -> Table {
        id_name_table("test")
    }

    /// Parent table of the foreign-key tests.
    fn create_users_table() -> Table {
        id_name_table("users")
    }

    /// Child table whose `user_id` column references `users.id`.
    fn create_orders_table_with_fk() -> Table {
        TableBuilder::new("orders")
            .unwrap()
            .add_column("id", DataType::Int64)
            .unwrap()
            .add_column("user_id", DataType::Int64)
            .unwrap()
            .add_column("amount", DataType::Int64)
            .unwrap()
            .add_primary_key(&["id"], false)
            .unwrap()
            .add_foreign_key("fk_orders_user", "user_id", "users", "id")
            .unwrap()
            .build()
            .unwrap()
    }

    /// Cache containing empty `users` and `orders` tables.
    fn users_and_orders_cache() -> TableCache {
        let mut cache = TableCache::new();
        cache.create_table(create_users_table()).unwrap();
        cache.create_table(create_orders_table_with_fk()).unwrap();
        cache
    }

    /// Stores `row` in the named table, panicking on any failure.
    fn insert_row(cache: &mut TableCache, table: &str, row: Row) {
        cache.get_table_mut(table).unwrap().insert(row).unwrap();
    }

    /// The user row with id 1 named "Alice".
    fn alice() -> Row {
        Row::new(1, vec![Value::Int64(1), Value::String("Alice".into())])
    }

    /// The order row with id 1 and amount 100, carrying `user_id` as its
    /// foreign-key value.
    fn order_with_user(user_id: Value) -> Row {
        Row::new(1, vec![Value::Int64(1), user_id, Value::Int64(100)])
    }

    // === Not-null and primary-key lookup tests ===

    #[test]
    fn test_check_not_null_valid() {
        let schema = test_schema_with_not_null();
        let complete = Row::new(1, vec![Value::Int64(1), Value::String("test".into())]);

        assert!(ConstraintChecker::check_not_null(&schema, &complete).is_ok());
    }

    #[test]
    fn test_check_not_null_violation() {
        let schema = test_schema_with_not_null();
        // `id` belongs to the primary key, so it is not nullable.
        let missing_id = Row::new(1, vec![Value::Null, Value::String("test".into())]);

        assert!(ConstraintChecker::check_not_null(&schema, &missing_id).is_err());
    }

    #[test]
    fn test_find_existing_row_id() {
        let schema = test_schema_with_not_null();
        let mut store = RowStore::new(schema.clone());
        store
            .insert(Row::new(1, vec![Value::Int64(100), Value::String("test".into())]))
            .unwrap();

        // Same primary key as the stored row, different row id and payload.
        let same_key = Row::new(2, vec![Value::Int64(100), Value::String("other".into())]);
        assert_eq!(
            ConstraintChecker::find_existing_row_id_in_pk_index(&store, &same_key),
            Some(1)
        );

        // Primary key that was never stored.
        let unknown_key = Row::new(3, vec![Value::Int64(999), Value::String("other".into())]);
        assert!(
            ConstraintChecker::find_existing_row_id_in_pk_index(&store, &unknown_key).is_none()
        );
    }

    // === FK Constraint Tests ===

    #[test]
    fn test_fk_insert_valid() {
        let mut cache = users_and_orders_cache();
        // The parent row has to exist before the child is checked.
        insert_row(&mut cache, "users", alice());

        let order = order_with_user(Value::Int64(1));
        let orders_schema = cache.get_table("orders").unwrap().schema().clone();

        let outcome = ConstraintChecker::check_foreign_keys_for_insert(
            &cache,
            &orders_schema,
            &[order],
            ConstraintTiming::Immediate,
        );
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_fk_insert_violation() {
        let cache = users_and_orders_cache();

        // No user 999 exists, so the reference dangles.
        let order = order_with_user(Value::Int64(999));
        let orders_schema = cache.get_table("orders").unwrap().schema().clone();

        let outcome = ConstraintChecker::check_foreign_keys_for_insert(
            &cache,
            &orders_schema,
            &[order],
            ConstraintTiming::Immediate,
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_fk_insert_null_allowed() {
        let cache = users_and_orders_cache();

        // A NULL foreign key references nothing and must be accepted.
        let order = order_with_user(Value::Null);
        let orders_schema = cache.get_table("orders").unwrap().schema().clone();

        let outcome = ConstraintChecker::check_foreign_keys_for_insert(
            &cache,
            &orders_schema,
            &[order],
            ConstraintTiming::Immediate,
        );
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_fk_delete_with_children() {
        let mut cache = users_and_orders_cache();
        let user = alice();
        insert_row(&mut cache, "users", user.clone());
        // Child row referencing the parent.
        insert_row(&mut cache, "orders", order_with_user(Value::Int64(1)));

        // Deleting the referenced parent must be rejected.
        let users_schema = cache.get_table("users").unwrap().schema().clone();
        let outcome = ConstraintChecker::check_foreign_keys_for_delete(
            &cache,
            &users_schema,
            &[user],
            ConstraintTiming::Immediate,
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_fk_delete_no_children() {
        let mut cache = users_and_orders_cache();
        let user = alice();
        insert_row(&mut cache, "users", user.clone());

        // Nothing references the parent, so the delete is allowed.
        let users_schema = cache.get_table("users").unwrap().schema().clone();
        let outcome = ConstraintChecker::check_foreign_keys_for_delete(
            &cache,
            &users_schema,
            &[user],
            ConstraintTiming::Immediate,
        );
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_fk_update_child_valid() {
        let mut cache = users_and_orders_cache();
        insert_row(&mut cache, "users", alice());
        insert_row(
            &mut cache,
            "users",
            Row::new(2, vec![Value::Int64(2), Value::String("Bob".into())]),
        );

        // The order starts out pointing at user 1 ...
        let original = order_with_user(Value::Int64(1));
        insert_row(&mut cache, "orders", original.clone());

        // ... and is re-pointed at user 2, who exists.
        let repointed = order_with_user(Value::Int64(2));
        let orders_schema = cache.get_table("orders").unwrap().schema().clone();

        let outcome = ConstraintChecker::check_foreign_keys_for_update(
            &cache,
            &orders_schema,
            &[(original, repointed)],
            ConstraintTiming::Immediate,
        );
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_fk_update_child_violation() {
        let mut cache = users_and_orders_cache();
        insert_row(&mut cache, "users", alice());

        let original = order_with_user(Value::Int64(1));
        insert_row(&mut cache, "orders", original.clone());

        // User 999 does not exist, so the new reference dangles.
        let repointed = order_with_user(Value::Int64(999));
        let orders_schema = cache.get_table("orders").unwrap().schema().clone();

        let outcome = ConstraintChecker::check_foreign_keys_for_update(
            &cache,
            &orders_schema,
            &[(original, repointed)],
            ConstraintTiming::Immediate,
        );
        assert!(outcome.is_err());
    }

    #[test]
    fn test_fk_update_parent_with_children() {
        let mut cache = users_and_orders_cache();
        let user = alice();
        insert_row(&mut cache, "users", user.clone());
        // Child row referencing the parent.
        insert_row(&mut cache, "orders", order_with_user(Value::Int64(1)));

        // Changing the referenced key of a parent that has children must fail.
        let rekeyed_user = Row::new(1, vec![Value::Int64(999), Value::String("Alice".into())]);
        let users_schema = cache.get_table("users").unwrap().schema().clone();

        let outcome = ConstraintChecker::check_foreign_keys_for_update(
            &cache,
            &users_schema,
            &[(user, rekeyed_user)],
            ConstraintTiming::Immediate,
        );
        assert!(outcome.is_err());
    }
}