1use std::collections::HashSet;
9
10use ormdb_core::catalog::{Catalog, DeleteBehavior};
11use ormdb_core::error::CascadeError;
12use ormdb_core::storage::{Record, StorageEngine, Transaction};
13
14use crate::error::Error;
15
/// Deepest recursion level a cascade may reach before it is aborted.
///
/// `process_delete_recursive` returns `CascadeError::MaxDepthExceeded` as soon
/// as its `depth` argument is greater than this value.
const MAX_CASCADE_DEPTH: usize = 100;
18
/// Summary of everything a cascade touched while processing one delete.
#[derive(Debug, Default)]
pub struct CascadeResult {
    /// `(entity type, entity id)` for every record removed by a cascade rule.
    pub deleted_entities: Vec<(String, [u8; 16])>,
    /// `(entity type, entity id, field name)` for every field set to null.
    pub nullified_fields: Vec<(String, [u8; 16], String)>,
}

impl CascadeResult {
    /// Builds a result with no deletions and no nullified fields recorded.
    pub fn new() -> Self {
        CascadeResult {
            deleted_entities: Vec::new(),
            nullified_fields: Vec::new(),
        }
    }

    /// Number of recorded side effects: deletions plus nullified fields.
    pub fn affected_count(&self) -> usize {
        let CascadeResult {
            deleted_entities,
            nullified_fields,
        } = self;
        deleted_entities.len() + nullified_fields.len()
    }
}
39
/// Applies the `on_delete` behavior (restrict, cascade, set-null) of schema
/// relations when an entity is deleted.
pub struct CascadeExecutor<'a> {
    /// Catalog queried for the current schema and its relations.
    catalog: &'a Catalog,
    /// Storage engine scanned and read to find and load referencing records.
    engine: &'a StorageEngine,
}
45
46impl<'a> CascadeExecutor<'a> {
47 pub fn new(catalog: &'a Catalog, engine: &'a StorageEngine) -> Self {
49 Self { catalog, engine }
50 }
51
52 pub fn process_delete(
57 &self,
58 entity: &str,
59 entity_id: [u8; 16],
60 tx: &mut Transaction<'_>,
61 ) -> Result<CascadeResult, Error> {
62 let mut result = CascadeResult::new();
63 let mut visited = HashSet::new();
64
65 self.process_delete_recursive(entity, entity_id, tx, &mut result, &mut visited, 0)?;
66
67 Ok(result)
68 }
69
70 fn process_delete_recursive(
72 &self,
73 entity: &str,
74 entity_id: [u8; 16],
75 tx: &mut Transaction<'_>,
76 result: &mut CascadeResult,
77 visited: &mut HashSet<[u8; 16]>,
78 depth: usize,
79 ) -> Result<(), Error> {
80 if depth > MAX_CASCADE_DEPTH {
82 return Err(Error::Storage(ormdb_core::Error::CascadeError(
83 CascadeError::MaxDepthExceeded { depth },
84 )));
85 }
86
87 if visited.contains(&entity_id) {
89 return Ok(());
90 }
91 visited.insert(entity_id);
92
93 let schema = match self.catalog.current_schema()? {
95 Some(s) => s,
96 None => return Ok(()), };
98
99 for relation in schema.relations.values() {
101 if relation.to_entity != entity {
102 continue;
103 }
104
105 let referencing_ids = self.find_referencing_entities(
107 &relation.from_entity,
108 &relation.from_field,
109 entity_id,
110 )?;
111
112 if referencing_ids.is_empty() {
113 continue;
114 }
115
116 match relation.on_delete {
117 DeleteBehavior::Restrict => {
118 return Err(Error::Storage(ormdb_core::Error::CascadeError(
120 CascadeError::RestrictViolation {
121 entity: entity.to_string(),
122 referencing_entity: relation.from_entity.clone(),
123 count: referencing_ids.len(),
124 },
125 )));
126 }
127 DeleteBehavior::Cascade => {
128 for ref_id in referencing_ids {
130 self.process_delete_recursive(
132 &relation.from_entity,
133 ref_id,
134 tx,
135 result,
136 visited,
137 depth + 1,
138 )?;
139
140 tx.delete_typed(&relation.from_entity, ref_id);
142 result
143 .deleted_entities
144 .push((relation.from_entity.clone(), ref_id));
145 }
146 }
147 DeleteBehavior::SetNull => {
148 for ref_id in referencing_ids {
150 self.set_field_null(
151 tx,
152 &relation.from_entity,
153 ref_id,
154 &relation.from_field,
155 )?;
156 result.nullified_fields.push((
157 relation.from_entity.clone(),
158 ref_id,
159 relation.from_field.clone(),
160 ));
161 }
162 }
163 }
164 }
165
166 Ok(())
167 }
168
169 fn find_referencing_entities(
171 &self,
172 from_entity: &str,
173 from_field: &str,
174 target_id: [u8; 16],
175 ) -> Result<Vec<[u8; 16]>, Error> {
176 let mut referencing = Vec::new();
177
178 for scan_result in self.engine.scan_entity_type(from_entity) {
180 let (entity_id, _, record) = scan_result?;
181
182 if self.entity_references_target(&record.data, from_field, target_id) {
184 referencing.push(entity_id);
185 }
186 }
187
188 Ok(referencing)
189 }
190
191 fn entity_references_target(&self, data: &[u8], field: &str, target_id: [u8; 16]) -> bool {
193 if let Ok(fields) = ormdb_core::query::decode_entity(data) {
195 for (name, value) in fields {
196 if name == field {
197 match value {
199 ormdb_proto::Value::Uuid(id) => return id == target_id,
200 ormdb_proto::Value::String(s) => {
201 if let Ok(id) = parse_uuid_string(&s) {
203 return id == target_id;
204 }
205 }
206 _ => {}
207 }
208 }
209 }
210 }
211 false
212 }
213
214 fn set_field_null(
216 &self,
217 tx: &mut Transaction<'_>,
218 entity_type: &str,
219 entity_id: [u8; 16],
220 field: &str,
221 ) -> Result<(), Error> {
222 let (_version, record) = match self.engine.get_latest(&entity_id)? {
224 Some(r) => r,
225 None => return Ok(()), };
227
228 let mut fields = ormdb_core::query::decode_entity(&record.data)?;
230
231 let mut found = false;
233 for (name, value) in &mut fields {
234 if name == field {
235 *value = ormdb_proto::Value::Null;
236 found = true;
237 break;
238 }
239 }
240
241 if !found {
242 fields.push((field.to_string(), ormdb_proto::Value::Null));
244 }
245
246 let encoded = ormdb_core::query::encode_entity(&fields)?;
248 tx.update(entity_type, entity_id, Record::new(encoded));
249
250 Ok(())
251 }
252
253 pub fn can_delete(&self, entity: &str, entity_id: [u8; 16]) -> Result<(), Error> {
257 let schema = match self.catalog.current_schema()? {
258 Some(s) => s,
259 None => return Ok(()),
260 };
261
262 for relation in schema.relations.values() {
263 if relation.to_entity != entity {
264 continue;
265 }
266
267 if relation.on_delete == DeleteBehavior::Restrict {
268 let count = self.count_references(
269 &relation.from_entity,
270 &relation.from_field,
271 entity_id,
272 )?;
273
274 if count > 0 {
275 return Err(Error::Storage(ormdb_core::Error::CascadeError(
276 CascadeError::RestrictViolation {
277 entity: entity.to_string(),
278 referencing_entity: relation.from_entity.clone(),
279 count,
280 },
281 )));
282 }
283 }
284 }
285
286 Ok(())
287 }
288
289 fn count_references(
291 &self,
292 from_entity: &str,
293 from_field: &str,
294 target_id: [u8; 16],
295 ) -> Result<usize, Error> {
296 let refs = self.find_referencing_entities(from_entity, from_field, target_id)?;
297 Ok(refs.len())
298 }
299}
300
/// Parses a textual UUID into its 16 raw bytes.
///
/// Hyphens are ignored wherever they appear; what remains must be exactly 32
/// ASCII hexadecimal digits (upper or lower case). Anything else yields
/// `Err(())`.
fn parse_uuid_string(s: &str) -> Result<[u8; 16], ()> {
    // Value of one ASCII hex digit. Validating byte by byte (instead of
    // `u8::from_str_radix` on each pair) rejects inputs such as "+f", which
    // `from_str_radix` would accept because it allows a leading '+' sign.
    fn hex_value(digit: u8) -> Result<u8, ()> {
        match digit {
            b'0'..=b'9' => Ok(digit - b'0'),
            b'a'..=b'f' => Ok(digit - b'a' + 10),
            b'A'..=b'F' => Ok(digit - b'A' + 10),
            _ => Err(()),
        }
    }

    // '-' is ASCII, so filtering bytes never splits a multi-byte character.
    let digits: Vec<u8> = s.bytes().filter(|b| *b != b'-').collect();
    if digits.len() != 32 {
        return Err(());
    }

    let mut bytes = [0u8; 16];
    for (slot, pair) in bytes.iter_mut().zip(digits.chunks_exact(2)) {
        *slot = (hex_value(pair[0])? << 4) | hex_value(pair[1])?;
    }

    Ok(bytes)
}
317
#[cfg(test)]
mod tests {
    use super::*;
    use ormdb_core::catalog::{
        EntityDef, FieldDef, FieldType, RelationDef, ScalarType, SchemaBundle,
    };
    use ormdb_core::storage::{StorageConfig, VersionedKey};

    /// Opens a storage engine and catalog inside a fresh temporary directory.
    /// The directory handle is returned so it stays alive for the whole test.
    fn setup_test_env() -> (tempfile::TempDir, StorageEngine, Catalog) {
        let dir = tempfile::tempdir().unwrap();
        let config = StorageConfig::new(dir.path());
        let engine = StorageEngine::open(config).unwrap();
        let catalog = Catalog::open(engine.db()).unwrap();
        (dir, engine, catalog)
    }

    /// Schema with `User` and `Post` entities and no relations.
    fn create_test_schema() -> SchemaBundle {
        let scalar = |name: &str, ty: ScalarType| FieldDef::new(name, FieldType::Scalar(ty));

        let user = EntityDef::new("User", "id")
            .with_field(scalar("id", ScalarType::Uuid))
            .with_field(scalar("name", ScalarType::String));

        let post = EntityDef::new("Post", "id")
            .with_field(scalar("id", ScalarType::Uuid))
            .with_field(scalar("title", ScalarType::String))
            .with_field(scalar("author_id", ScalarType::Uuid));

        SchemaBundle::new(1).with_entity(user).with_entity(post)
    }

    /// Test schema plus a `Post.author_id -> User.id` relation using the
    /// given delete behavior.
    fn schema_with_user_posts(on_delete: DeleteBehavior) -> SchemaBundle {
        create_test_schema().with_relation(
            RelationDef::one_to_many("user_posts", "Post", "author_id", "User", "id")
                .with_on_delete(on_delete),
        )
    }

    /// Stores a `User` record named `name` and returns its generated id.
    fn put_user(engine: &StorageEngine, name: &str) -> [u8; 16] {
        let id = StorageEngine::generate_id();
        let data = ormdb_core::query::encode_entity(&[
            ("id".to_string(), ormdb_proto::Value::Uuid(id)),
            (
                "name".to_string(),
                ormdb_proto::Value::String(name.to_string()),
            ),
        ])
        .unwrap();
        engine
            .put_typed("User", VersionedKey::now(id), Record::new(data))
            .unwrap();
        id
    }

    /// Stores a `Post` record authored by `author_id` and returns its id.
    fn put_post(engine: &StorageEngine, title: &str, author_id: [u8; 16]) -> [u8; 16] {
        let id = StorageEngine::generate_id();
        let data = ormdb_core::query::encode_entity(&[
            ("id".to_string(), ormdb_proto::Value::Uuid(id)),
            (
                "title".to_string(),
                ormdb_proto::Value::String(title.to_string()),
            ),
            ("author_id".to_string(), ormdb_proto::Value::Uuid(author_id)),
        ])
        .unwrap();
        engine
            .put_typed("Post", VersionedKey::now(id), Record::new(data))
            .unwrap();
        id
    }

    #[test]
    fn test_no_cascades_without_relations() {
        let (_dir, engine, catalog) = setup_test_env();
        catalog.apply_schema(create_test_schema()).unwrap();

        let cascade = CascadeExecutor::new(&catalog, &engine);
        let user_id = put_user(&engine, "Alice");

        let mut tx = engine.transaction();
        let result = cascade.process_delete("User", user_id, &mut tx).unwrap();

        // No relation targets User, so nothing may be touched.
        assert_eq!(result.deleted_entities.len(), 0);
        assert_eq!(result.nullified_fields.len(), 0);
    }

    #[test]
    fn test_restrict_prevents_delete() {
        let (_dir, engine, catalog) = setup_test_env();
        catalog
            .apply_schema(schema_with_user_posts(DeleteBehavior::Restrict))
            .unwrap();

        let cascade = CascadeExecutor::new(&catalog, &engine);

        let user_id = put_user(&engine, "Alice");
        put_post(&engine, "Hello", user_id);
        engine.flush().unwrap();

        let mut tx = engine.transaction();
        let result = cascade.process_delete("User", user_id, &mut tx);

        assert!(result.is_err());
        match result {
            Err(Error::Storage(ormdb_core::Error::CascadeError(
                CascadeError::RestrictViolation {
                    referencing_entity,
                    count,
                    ..
                },
            ))) => {
                assert_eq!(referencing_entity, "Post");
                assert_eq!(count, 1);
            }
            _ => panic!("Expected RestrictViolation error"),
        }
    }

    #[test]
    fn test_cascade_delete() {
        let (_dir, engine, catalog) = setup_test_env();
        catalog
            .apply_schema(schema_with_user_posts(DeleteBehavior::Cascade))
            .unwrap();

        let cascade = CascadeExecutor::new(&catalog, &engine);

        let user_id = put_user(&engine, "Alice");
        for i in 0..2 {
            put_post(&engine, &format!("Post {}", i), user_id);
        }
        engine.flush().unwrap();

        let mut tx = engine.transaction();
        let result = cascade.process_delete("User", user_id, &mut tx).unwrap();

        // Both posts, and only posts, are reported as cascade-deleted.
        assert_eq!(result.deleted_entities.len(), 2);
        assert!(result.deleted_entities.iter().all(|(e, _)| e == "Post"));
    }

    #[test]
    fn test_can_delete() {
        let (_dir, engine, catalog) = setup_test_env();
        catalog
            .apply_schema(schema_with_user_posts(DeleteBehavior::Restrict))
            .unwrap();

        let cascade = CascadeExecutor::new(&catalog, &engine);

        let user_id = put_user(&engine, "Bob");

        // Nothing references the user yet.
        assert!(cascade.can_delete("User", user_id).is_ok());

        put_post(&engine, "Hello", user_id);
        engine.flush().unwrap();

        // A referencing post now blocks the delete.
        assert!(cascade.can_delete("User", user_id).is_err());
    }
}