Skip to main content

ormdb_server/
cascade.rs

1//! Cascade executor for handling referential integrity on deletes.
2//!
3//! This module implements the cascade behavior for delete operations:
4//! - CASCADE: Delete related entities recursively
5//! - RESTRICT: Prevent deletion if related entities exist
6//! - SET NULL: Set foreign key fields to null on related entities
7
8use std::collections::HashSet;
9
10use ormdb_core::catalog::{Catalog, DeleteBehavior};
11use ormdb_core::error::CascadeError;
12use ormdb_core::storage::{Record, StorageEngine, Transaction};
13
14use crate::error::Error;
15
16/// Maximum cascade depth to prevent infinite recursion.
17const MAX_CASCADE_DEPTH: usize = 100;
18
19/// Result of a cascade operation.
20#[derive(Debug, Default)]
21pub struct CascadeResult {
22    /// Entities that were deleted.
23    pub deleted_entities: Vec<(String, [u8; 16])>,
24    /// Fields that were set to null.
25    pub nullified_fields: Vec<(String, [u8; 16], String)>,
26}
27
28impl CascadeResult {
29    /// Create an empty cascade result.
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    /// Get the total number of affected entities.
35    pub fn affected_count(&self) -> usize {
36        self.deleted_entities.len() + self.nullified_fields.len()
37    }
38}
39
40/// Executes cascade operations for delete.
41pub struct CascadeExecutor<'a> {
42    catalog: &'a Catalog,
43    engine: &'a StorageEngine,
44}
45
46impl<'a> CascadeExecutor<'a> {
47    /// Create a new cascade executor.
48    pub fn new(catalog: &'a Catalog, engine: &'a StorageEngine) -> Self {
49        Self { catalog, engine }
50    }
51
52    /// Process cascades for a delete operation.
53    ///
54    /// This method checks all relations that reference the entity being deleted
55    /// and applies the appropriate cascade behavior.
56    pub fn process_delete(
57        &self,
58        entity: &str,
59        entity_id: [u8; 16],
60        tx: &mut Transaction<'_>,
61    ) -> Result<CascadeResult, Error> {
62        let mut result = CascadeResult::new();
63        let mut visited = HashSet::new();
64
65        self.process_delete_recursive(entity, entity_id, tx, &mut result, &mut visited, 0)?;
66
67        Ok(result)
68    }
69
70    /// Recursively process cascades.
71    fn process_delete_recursive(
72        &self,
73        entity: &str,
74        entity_id: [u8; 16],
75        tx: &mut Transaction<'_>,
76        result: &mut CascadeResult,
77        visited: &mut HashSet<[u8; 16]>,
78        depth: usize,
79    ) -> Result<(), Error> {
80        // Check for maximum depth
81        if depth > MAX_CASCADE_DEPTH {
82            return Err(Error::Storage(ormdb_core::Error::CascadeError(
83                CascadeError::MaxDepthExceeded { depth },
84            )));
85        }
86
87        // Prevent cycles
88        if visited.contains(&entity_id) {
89            return Ok(());
90        }
91        visited.insert(entity_id);
92
93        // Get schema to find relations
94        let schema = match self.catalog.current_schema()? {
95            Some(s) => s,
96            None => return Ok(()), // No schema, no relations to check
97        };
98
99        // Find all relations that reference this entity type
100        for relation in schema.relations.values() {
101            if relation.to_entity != entity {
102                continue;
103            }
104
105            // Find all entities of the referencing type that point to this entity
106            let referencing_ids = self.find_referencing_entities(
107                &relation.from_entity,
108                &relation.from_field,
109                entity_id,
110            )?;
111
112            if referencing_ids.is_empty() {
113                continue;
114            }
115
116            match relation.on_delete {
117                DeleteBehavior::Restrict => {
118                    // Cannot delete - there are references
119                    return Err(Error::Storage(ormdb_core::Error::CascadeError(
120                        CascadeError::RestrictViolation {
121                            entity: entity.to_string(),
122                            referencing_entity: relation.from_entity.clone(),
123                            count: referencing_ids.len(),
124                        },
125                    )));
126                }
127                DeleteBehavior::Cascade => {
128                    // Delete all referencing entities
129                    for ref_id in referencing_ids {
130                        // First, recursively cascade from the referencing entity
131                        self.process_delete_recursive(
132                            &relation.from_entity,
133                            ref_id,
134                            tx,
135                            result,
136                            visited,
137                            depth + 1,
138                        )?;
139
140                        // Then delete the referencing entity
141                        tx.delete_typed(&relation.from_entity, ref_id);
142                        result
143                            .deleted_entities
144                            .push((relation.from_entity.clone(), ref_id));
145                    }
146                }
147                DeleteBehavior::SetNull => {
148                    // Set the FK field to null on all referencing entities
149                    for ref_id in referencing_ids {
150                        self.set_field_null(
151                            tx,
152                            &relation.from_entity,
153                            ref_id,
154                            &relation.from_field,
155                        )?;
156                        result.nullified_fields.push((
157                            relation.from_entity.clone(),
158                            ref_id,
159                            relation.from_field.clone(),
160                        ));
161                    }
162                }
163            }
164        }
165
166        Ok(())
167    }
168
169    /// Find all entities of a type that reference a given ID.
170    fn find_referencing_entities(
171        &self,
172        from_entity: &str,
173        from_field: &str,
174        target_id: [u8; 16],
175    ) -> Result<Vec<[u8; 16]>, Error> {
176        let mut referencing = Vec::new();
177
178        // Scan all entities of the from_entity type
179        for scan_result in self.engine.scan_entity_type(from_entity) {
180            let (entity_id, _, record) = scan_result?;
181
182            // Check if this entity references the target
183            if self.entity_references_target(&record.data, from_field, target_id) {
184                referencing.push(entity_id);
185            }
186        }
187
188        Ok(referencing)
189    }
190
191    /// Check if an entity's data references a target ID in a specific field.
192    fn entity_references_target(&self, data: &[u8], field: &str, target_id: [u8; 16]) -> bool {
193        // Decode the entity data
194        if let Ok(fields) = ormdb_core::query::decode_entity(data) {
195            for (name, value) in fields {
196                if name == field {
197                    // Check if the field value matches the target ID
198                    match value {
199                        ormdb_proto::Value::Uuid(id) => return id == target_id,
200                        ormdb_proto::Value::String(s) => {
201                            // Try to parse as UUID hex string
202                            if let Ok(id) = parse_uuid_string(&s) {
203                                return id == target_id;
204                            }
205                        }
206                        _ => {}
207                    }
208                }
209            }
210        }
211        false
212    }
213
214    /// Set a field to null on an entity.
215    fn set_field_null(
216        &self,
217        tx: &mut Transaction<'_>,
218        entity_type: &str,
219        entity_id: [u8; 16],
220        field: &str,
221    ) -> Result<(), Error> {
222        // Get the current entity data
223        let (_version, record) = match self.engine.get_latest(&entity_id)? {
224            Some(r) => r,
225            None => return Ok(()), // Entity doesn't exist
226        };
227
228        // Decode fields
229        let mut fields = ormdb_core::query::decode_entity(&record.data)?;
230
231        // Find and update the field
232        let mut found = false;
233        for (name, value) in &mut fields {
234            if name == field {
235                *value = ormdb_proto::Value::Null;
236                found = true;
237                break;
238            }
239        }
240
241        if !found {
242            // Field doesn't exist, add it as null
243            fields.push((field.to_string(), ormdb_proto::Value::Null));
244        }
245
246        // Re-encode and queue update
247        let encoded = ormdb_core::query::encode_entity(&fields)?;
248        tx.update(entity_type, entity_id, Record::new(encoded));
249
250        Ok(())
251    }
252
253    /// Check if deleting an entity is allowed (without actually performing cascades).
254    ///
255    /// This is useful for validation before committing.
256    pub fn can_delete(&self, entity: &str, entity_id: [u8; 16]) -> Result<(), Error> {
257        let schema = match self.catalog.current_schema()? {
258            Some(s) => s,
259            None => return Ok(()),
260        };
261
262        for relation in schema.relations.values() {
263            if relation.to_entity != entity {
264                continue;
265            }
266
267            if relation.on_delete == DeleteBehavior::Restrict {
268                let count = self.count_references(
269                    &relation.from_entity,
270                    &relation.from_field,
271                    entity_id,
272                )?;
273
274                if count > 0 {
275                    return Err(Error::Storage(ormdb_core::Error::CascadeError(
276                        CascadeError::RestrictViolation {
277                            entity: entity.to_string(),
278                            referencing_entity: relation.from_entity.clone(),
279                            count,
280                        },
281                    )));
282                }
283            }
284        }
285
286        Ok(())
287    }
288
289    /// Count how many entities reference a given ID.
290    fn count_references(
291        &self,
292        from_entity: &str,
293        from_field: &str,
294        target_id: [u8; 16],
295    ) -> Result<usize, Error> {
296        let refs = self.find_referencing_entities(from_entity, from_field, target_id)?;
297        Ok(refs.len())
298    }
299}
300
301/// Parse a UUID string to bytes.
302fn parse_uuid_string(s: &str) -> Result<[u8; 16], ()> {
303    // Remove dashes and parse as hex
304    let hex: String = s.chars().filter(|c| *c != '-').collect();
305    if hex.len() != 32 {
306        return Err(());
307    }
308
309    let mut bytes = [0u8; 16];
310    for (i, chunk) in hex.as_bytes().chunks(2).enumerate() {
311        let byte_str = std::str::from_utf8(chunk).map_err(|_| ())?;
312        bytes[i] = u8::from_str_radix(byte_str, 16).map_err(|_| ())?;
313    }
314
315    Ok(bytes)
316}
317
318#[cfg(test)]
319mod tests {
320    use super::*;
321    use ormdb_core::catalog::{
322        EntityDef, FieldDef, FieldType, RelationDef, ScalarType, SchemaBundle,
323    };
324    use ormdb_core::storage::{StorageConfig, VersionedKey};
325
326    fn setup_test_env() -> (tempfile::TempDir, StorageEngine, Catalog) {
327        let dir = tempfile::tempdir().unwrap();
328        let engine = StorageEngine::open(StorageConfig::new(dir.path())).unwrap();
329        let catalog = Catalog::open(engine.db()).unwrap();
330        (dir, engine, catalog)
331    }
332
333    fn create_test_schema() -> SchemaBundle {
334        let user = EntityDef::new("User", "id")
335            .with_field(FieldDef::new("id", FieldType::Scalar(ScalarType::Uuid)))
336            .with_field(FieldDef::new("name", FieldType::Scalar(ScalarType::String)));
337
338        let post = EntityDef::new("Post", "id")
339            .with_field(FieldDef::new("id", FieldType::Scalar(ScalarType::Uuid)))
340            .with_field(FieldDef::new("title", FieldType::Scalar(ScalarType::String)))
341            .with_field(FieldDef::new("author_id", FieldType::Scalar(ScalarType::Uuid)));
342
343        SchemaBundle::new(1)
344            .with_entity(user)
345            .with_entity(post)
346    }
347
348    #[test]
349    fn test_no_cascades_without_relations() {
350        let (_dir, engine, catalog) = setup_test_env();
351
352        // Schema without relations
353        let schema = create_test_schema();
354        catalog.apply_schema(schema).unwrap();
355
356        let cascade = CascadeExecutor::new(&catalog, &engine);
357
358        // Create a user
359        let user_id = StorageEngine::generate_id();
360        let user_data = ormdb_core::query::encode_entity(&[
361            ("id".to_string(), ormdb_proto::Value::Uuid(user_id)),
362            ("name".to_string(), ormdb_proto::Value::String("Alice".to_string())),
363        ])
364        .unwrap();
365        engine
366            .put_typed("User", VersionedKey::now(user_id), Record::new(user_data))
367            .unwrap();
368
369        // Process delete - should succeed with no cascades
370        let mut tx = engine.transaction();
371        let result = cascade.process_delete("User", user_id, &mut tx).unwrap();
372
373        assert_eq!(result.deleted_entities.len(), 0);
374        assert_eq!(result.nullified_fields.len(), 0);
375    }
376
377    #[test]
378    fn test_restrict_prevents_delete() {
379        let (_dir, engine, catalog) = setup_test_env();
380
381        // Schema with RESTRICT relation
382        let schema = create_test_schema().with_relation(
383            RelationDef::one_to_many("user_posts", "Post", "author_id", "User", "id")
384                .with_on_delete(DeleteBehavior::Restrict),
385        );
386        catalog.apply_schema(schema).unwrap();
387
388        let cascade = CascadeExecutor::new(&catalog, &engine);
389
390        // Create a user
391        let user_id = StorageEngine::generate_id();
392        let user_data = ormdb_core::query::encode_entity(&[
393            ("id".to_string(), ormdb_proto::Value::Uuid(user_id)),
394            ("name".to_string(), ormdb_proto::Value::String("Alice".to_string())),
395        ])
396        .unwrap();
397        engine
398            .put_typed("User", VersionedKey::now(user_id), Record::new(user_data))
399            .unwrap();
400
401        // Create a post referencing the user
402        let post_id = StorageEngine::generate_id();
403        let post_data = ormdb_core::query::encode_entity(&[
404            ("id".to_string(), ormdb_proto::Value::Uuid(post_id)),
405            ("title".to_string(), ormdb_proto::Value::String("Hello".to_string())),
406            ("author_id".to_string(), ormdb_proto::Value::Uuid(user_id)),
407        ])
408        .unwrap();
409        engine
410            .put_typed("Post", VersionedKey::now(post_id), Record::new(post_data))
411            .unwrap();
412
413        // Flush to ensure data is persisted
414        engine.flush().unwrap();
415
416        // Try to delete user - should fail with RestrictViolation
417        let mut tx = engine.transaction();
418        let result = cascade.process_delete("User", user_id, &mut tx);
419
420        assert!(result.is_err());
421        if let Err(Error::Storage(ormdb_core::Error::CascadeError(CascadeError::RestrictViolation {
422            referencing_entity,
423            count,
424            ..
425        }))) = result
426        {
427            assert_eq!(referencing_entity, "Post");
428            assert_eq!(count, 1);
429        } else {
430            panic!("Expected RestrictViolation error");
431        }
432    }
433
434    #[test]
435    fn test_cascade_delete() {
436        let (_dir, engine, catalog) = setup_test_env();
437
438        // Schema with CASCADE relation
439        let schema = create_test_schema().with_relation(
440            RelationDef::one_to_many("user_posts", "Post", "author_id", "User", "id")
441                .with_on_delete(DeleteBehavior::Cascade),
442        );
443        catalog.apply_schema(schema).unwrap();
444
445        let cascade = CascadeExecutor::new(&catalog, &engine);
446
447        // Create a user
448        let user_id = StorageEngine::generate_id();
449        let user_data = ormdb_core::query::encode_entity(&[
450            ("id".to_string(), ormdb_proto::Value::Uuid(user_id)),
451            ("name".to_string(), ormdb_proto::Value::String("Alice".to_string())),
452        ])
453        .unwrap();
454        engine
455            .put_typed("User", VersionedKey::now(user_id), Record::new(user_data))
456            .unwrap();
457
458        // Create two posts referencing the user
459        for i in 0..2 {
460            let post_id = StorageEngine::generate_id();
461            let post_data = ormdb_core::query::encode_entity(&[
462                ("id".to_string(), ormdb_proto::Value::Uuid(post_id)),
463                (
464                    "title".to_string(),
465                    ormdb_proto::Value::String(format!("Post {}", i)),
466                ),
467                ("author_id".to_string(), ormdb_proto::Value::Uuid(user_id)),
468            ])
469            .unwrap();
470            engine
471                .put_typed("Post", VersionedKey::now(post_id), Record::new(post_data))
472                .unwrap();
473        }
474
475        engine.flush().unwrap();
476
477        // Delete user - should cascade to posts
478        let mut tx = engine.transaction();
479        let result = cascade.process_delete("User", user_id, &mut tx).unwrap();
480
481        // Two posts should be marked for deletion
482        assert_eq!(result.deleted_entities.len(), 2);
483        assert!(result.deleted_entities.iter().all(|(e, _)| e == "Post"));
484    }
485
486    #[test]
487    fn test_can_delete() {
488        let (_dir, engine, catalog) = setup_test_env();
489
490        // Schema with RESTRICT relation
491        let schema = create_test_schema().with_relation(
492            RelationDef::one_to_many("user_posts", "Post", "author_id", "User", "id")
493                .with_on_delete(DeleteBehavior::Restrict),
494        );
495        catalog.apply_schema(schema).unwrap();
496
497        let cascade = CascadeExecutor::new(&catalog, &engine);
498
499        // Create a user without posts
500        let user_id = StorageEngine::generate_id();
501        let user_data = ormdb_core::query::encode_entity(&[
502            ("id".to_string(), ormdb_proto::Value::Uuid(user_id)),
503            ("name".to_string(), ormdb_proto::Value::String("Bob".to_string())),
504        ])
505        .unwrap();
506        engine
507            .put_typed("User", VersionedKey::now(user_id), Record::new(user_data))
508            .unwrap();
509
510        // can_delete should succeed (no posts)
511        assert!(cascade.can_delete("User", user_id).is_ok());
512
513        // Create a post
514        let post_id = StorageEngine::generate_id();
515        let post_data = ormdb_core::query::encode_entity(&[
516            ("id".to_string(), ormdb_proto::Value::Uuid(post_id)),
517            ("title".to_string(), ormdb_proto::Value::String("Hello".to_string())),
518            ("author_id".to_string(), ormdb_proto::Value::Uuid(user_id)),
519        ])
520        .unwrap();
521        engine
522            .put_typed("Post", VersionedKey::now(post_id), Record::new(post_data))
523            .unwrap();
524        engine.flush().unwrap();
525
526        // Now can_delete should fail
527        assert!(cascade.can_delete("User", user_id).is_err());
528    }
529}