1use ormdb_core::query::{decode_entity, encode_entity};
4use ormdb_core::replication::ChangeLog;
5use ormdb_core::storage::{Record, StorageEngine, VersionedKey};
6use ormdb_proto::replication::ChangeLogEntry;
7use ormdb_proto::{FieldValue, Mutation, MutationBatch, MutationResult, Value};
8
9use crate::database::Database;
10use crate::error::Error;
11
12pub struct MutationExecutor<'a> {
14 database: &'a Database,
15}
16
17impl<'a> MutationExecutor<'a> {
18 pub fn new(database: &'a Database) -> Self {
20 Self { database }
21 }
22
23 pub fn execute(&self, mutation: &Mutation) -> Result<MutationResult, Error> {
25 match mutation {
26 Mutation::Insert { entity, data } => self.execute_insert(entity, data),
27 Mutation::Update { entity, id, data } => self.execute_update(entity, id, data),
28 Mutation::Delete { entity, id } => self.execute_delete(entity, id),
29 Mutation::Upsert { entity, id, data } => self.execute_upsert(entity, id.as_ref(), data),
30 }
31 }
32
33 pub fn execute_with_cdc(&self, mutation: &Mutation) -> Result<MutationResult, Error> {
37 let changelog = self.database.changelog();
38 let schema_version = self.database.schema_version();
39
40 match mutation {
41 Mutation::Insert { entity, data } => {
42 self.execute_insert_with_cdc(entity, data, changelog, schema_version)
43 }
44 Mutation::Update { entity, id, data } => {
45 self.execute_update_with_cdc(entity, id, data, changelog, schema_version)
46 }
47 Mutation::Delete { entity, id } => {
48 self.execute_delete_with_cdc(entity, id, changelog, schema_version)
49 }
50 Mutation::Upsert { entity, id, data } => {
51 self.execute_upsert_with_cdc(entity, id.as_ref(), data, changelog, schema_version)
52 }
53 }
54 }
55
56 pub fn execute_batch(&self, batch: &MutationBatch) -> Result<MutationResult, Error> {
58 if batch.is_empty() {
59 return Ok(MutationResult::affected(0));
60 }
61
62 let mut inserted_ids = Vec::new();
63 let mut affected = 0u64;
64
65 for mutation in &batch.mutations {
69 let result = self.execute(mutation)?;
70 affected += result.affected;
71 inserted_ids.extend(result.inserted_ids);
72 }
73
74 if inserted_ids.is_empty() {
75 Ok(MutationResult::affected(affected))
76 } else {
77 Ok(MutationResult::bulk_inserted(inserted_ids))
78 }
79 }
80
81 pub fn execute_batch_with_cdc(&self, batch: &MutationBatch) -> Result<MutationResult, Error> {
83 if batch.is_empty() {
84 return Ok(MutationResult::affected(0));
85 }
86
87 let mut inserted_ids = Vec::new();
88 let mut affected = 0u64;
89
90 for mutation in &batch.mutations {
91 let result = self.execute_with_cdc(mutation)?;
92 affected += result.affected;
93 inserted_ids.extend(result.inserted_ids);
94 }
95
96 if inserted_ids.is_empty() {
97 Ok(MutationResult::affected(affected))
98 } else {
99 Ok(MutationResult::bulk_inserted(inserted_ids))
100 }
101 }
102
103 fn execute_insert(&self, entity: &str, data: &[FieldValue]) -> Result<MutationResult, Error> {
105 let (id, _encoded) = self.do_insert(entity, data)?;
106 Ok(MutationResult::inserted(id))
107 }
108
109 fn execute_insert_with_cdc(
111 &self,
112 entity: &str,
113 data: &[FieldValue],
114 changelog: &ChangeLog,
115 schema_version: u64,
116 ) -> Result<MutationResult, Error> {
117 let (id, encoded) = self.do_insert(entity, data)?;
118
119 let changed_fields: Vec<String> = data.iter().map(|fv| fv.field.clone()).collect();
121 let entry = ChangeLogEntry::insert(entity, id, encoded, changed_fields, schema_version);
122
123 changelog
125 .append(entry)
126 .map_err(|e| Error::Database(format!("failed to log to changelog: {}", e)))?;
127
128 Ok(MutationResult::inserted(id))
129 }
130
131 fn do_insert(&self, entity: &str, data: &[FieldValue]) -> Result<([u8; 16], Vec<u8>), Error> {
133 let id = StorageEngine::generate_id();
135
136 let mut fields: Vec<(String, Value)> = Vec::with_capacity(data.len() + 1);
138 fields.push(("id".to_string(), Value::Uuid(id)));
139
140 for fv in data {
141 if fv.field != "id" {
143 fields.push((fv.field.clone(), fv.value.clone()));
144 }
145 }
146
147 let encoded = encode_entity(&fields)
149 .map_err(|e| Error::Database(format!("failed to encode entity: {}", e)))?;
150
151 let key = VersionedKey::now(id);
153 self.database
154 .storage()
155 .put_typed(entity, key, Record::new(encoded.clone()))
156 .map_err(|e| Error::Storage(e))?;
157 self.database.statistics().increment(entity);
158 self.update_secondary_indexes(entity, id, None, Some(&encoded))?;
159
160 Ok((id, encoded))
161 }
162
163 fn execute_update(
165 &self,
166 entity: &str,
167 id: &[u8; 16],
168 data: &[FieldValue],
169 ) -> Result<MutationResult, Error> {
170 let _ = self.do_update(entity, id, data)?;
171 Ok(MutationResult::affected(1))
172 }
173
174 fn execute_update_with_cdc(
176 &self,
177 entity: &str,
178 id: &[u8; 16],
179 data: &[FieldValue],
180 changelog: &ChangeLog,
181 schema_version: u64,
182 ) -> Result<MutationResult, Error> {
183 let (before_data, after_data, changed_fields) = self.do_update(entity, id, data)?;
184
185 let entry = ChangeLogEntry::update(
187 entity,
188 *id,
189 before_data,
190 after_data,
191 changed_fields,
192 schema_version,
193 );
194
195 changelog
197 .append(entry)
198 .map_err(|e| Error::Database(format!("failed to log to changelog: {}", e)))?;
199
200 Ok(MutationResult::affected(1))
201 }
202
203 fn do_update(
205 &self,
206 entity: &str,
207 id: &[u8; 16],
208 data: &[FieldValue],
209 ) -> Result<(Vec<u8>, Vec<u8>, Vec<String>), Error> {
210 let (_version, existing) = self
212 .database
213 .storage()
214 .get_latest(id)
215 .map_err(|e| Error::Storage(e))?
216 .ok_or_else(|| {
217 Error::Database(format!("entity {}:{} not found", entity, hex_id(id)))
218 })?;
219
220 let before_data = existing.data.clone();
221
222 let before_fields: Vec<(String, Value)> =
224 decode_entity(&existing.data)
225 .map_err(|e| Error::Database(format!("failed to decode entity: {}", e)))?;
226
227 let mut fields = before_fields.clone();
228
229 let mut changed_fields = Vec::new();
231 for fv in data {
232 if fv.field == "id" {
234 continue;
235 }
236
237 if let Some(pos) = fields.iter().position(|(name, _)| name == &fv.field) {
238 if fields[pos].1 != fv.value {
239 changed_fields.push(fv.field.clone());
240 }
241 fields[pos].1 = fv.value.clone();
242 } else {
243 changed_fields.push(fv.field.clone());
244 fields.push((fv.field.clone(), fv.value.clone()));
245 }
246 }
247
248 let encoded = encode_entity(&fields)
250 .map_err(|e| Error::Database(format!("failed to encode entity: {}", e)))?;
251
252 let key = VersionedKey::now(*id);
253 self.database
254 .storage()
255 .put_typed(entity, key, Record::new(encoded.clone()))
256 .map_err(|e| Error::Storage(e))?;
257 self.update_secondary_indexes(entity, *id, Some(&before_data), Some(&encoded))?;
258
259 Ok((before_data, encoded, changed_fields))
260 }
261
262 fn execute_delete(&self, entity: &str, id: &[u8; 16]) -> Result<MutationResult, Error> {
264 let before_data = self.do_delete(entity, id)?;
265 if before_data.is_some() {
266 Ok(MutationResult::affected(1))
267 } else {
268 Ok(MutationResult::affected(0))
269 }
270 }
271
272 fn execute_delete_with_cdc(
274 &self,
275 entity: &str,
276 id: &[u8; 16],
277 changelog: &ChangeLog,
278 schema_version: u64,
279 ) -> Result<MutationResult, Error> {
280 let before_data = self.do_delete(entity, id)?;
281
282 if let Some(data) = before_data {
283 let entry = ChangeLogEntry::delete(entity, *id, data, schema_version);
285
286 changelog
288 .append(entry)
289 .map_err(|e| Error::Database(format!("failed to log to changelog: {}", e)))?;
290
291 Ok(MutationResult::affected(1))
292 } else {
293 Ok(MutationResult::affected(0))
294 }
295 }
296
297 fn do_delete(&self, entity: &str, id: &[u8; 16]) -> Result<Option<Vec<u8>>, Error> {
299 let existing = self
301 .database
302 .storage()
303 .get_latest(id)
304 .map_err(|e| Error::Storage(e))?;
305
306 if existing.is_none() {
307 return Ok(None);
308 }
309
310 let (_version, record) = existing.unwrap();
311 let before_data = record.data.clone();
312
313 self.database
315 .storage()
316 .delete_typed(entity, id)
317 .map_err(|e| Error::Storage(e))?;
318 self.database.statistics().decrement(entity);
319 self.update_secondary_indexes(entity, *id, Some(&before_data), None)?;
320
321 Ok(Some(before_data))
322 }
323
324 fn execute_upsert(
326 &self,
327 entity: &str,
328 id: Option<&[u8; 16]>,
329 data: &[FieldValue],
330 ) -> Result<MutationResult, Error> {
331 match id {
332 Some(existing_id) => {
333 let exists = self
335 .database
336 .storage()
337 .get_latest(existing_id)
338 .map_err(|e| Error::Storage(e))?
339 .is_some();
340
341 if exists {
342 self.execute_update(entity, existing_id, data)
344 } else {
345 self.execute_insert_with_id(entity, *existing_id, data)
347 }
348 }
349 None => {
350 self.execute_insert(entity, data)
352 }
353 }
354 }
355
356 fn execute_upsert_with_cdc(
358 &self,
359 entity: &str,
360 id: Option<&[u8; 16]>,
361 data: &[FieldValue],
362 changelog: &ChangeLog,
363 schema_version: u64,
364 ) -> Result<MutationResult, Error> {
365 match id {
366 Some(existing_id) => {
367 let exists = self
369 .database
370 .storage()
371 .get_latest(existing_id)
372 .map_err(|e| Error::Storage(e))?
373 .is_some();
374
375 if exists {
376 self.execute_update_with_cdc(entity, existing_id, data, changelog, schema_version)
378 } else {
379 self.execute_insert_with_id_cdc(entity, *existing_id, data, changelog, schema_version)
381 }
382 }
383 None => {
384 self.execute_insert_with_cdc(entity, data, changelog, schema_version)
386 }
387 }
388 }
389
390 fn execute_insert_with_id(
392 &self,
393 entity: &str,
394 id: [u8; 16],
395 data: &[FieldValue],
396 ) -> Result<MutationResult, Error> {
397 let _ = self.do_insert_with_id(entity, id, data)?;
398 Ok(MutationResult::inserted(id))
399 }
400
401 fn execute_insert_with_id_cdc(
403 &self,
404 entity: &str,
405 id: [u8; 16],
406 data: &[FieldValue],
407 changelog: &ChangeLog,
408 schema_version: u64,
409 ) -> Result<MutationResult, Error> {
410 let encoded = self.do_insert_with_id(entity, id, data)?;
411
412 let changed_fields: Vec<String> = data.iter().map(|fv| fv.field.clone()).collect();
414 let entry = ChangeLogEntry::insert(entity, id, encoded, changed_fields, schema_version);
415
416 changelog
418 .append(entry)
419 .map_err(|e| Error::Database(format!("failed to log to changelog: {}", e)))?;
420
421 Ok(MutationResult::inserted(id))
422 }
423
424 fn do_insert_with_id(
426 &self,
427 entity: &str,
428 id: [u8; 16],
429 data: &[FieldValue],
430 ) -> Result<Vec<u8>, Error> {
431 let mut fields: Vec<(String, Value)> = Vec::with_capacity(data.len() + 1);
433 fields.push(("id".to_string(), Value::Uuid(id)));
434
435 for fv in data {
436 if fv.field != "id" {
437 fields.push((fv.field.clone(), fv.value.clone()));
438 }
439 }
440
441 let encoded = encode_entity(&fields)
443 .map_err(|e| Error::Database(format!("failed to encode entity: {}", e)))?;
444
445 let key = VersionedKey::now(id);
447 self.database
448 .storage()
449 .put_typed(entity, key, Record::new(encoded.clone()))
450 .map_err(|e| Error::Storage(e))?;
451 self.database.statistics().increment(entity);
452 self.update_secondary_indexes(entity, id, None, Some(&encoded))?;
453
454 Ok(encoded)
455 }
456
457 fn update_secondary_indexes(
458 &self,
459 entity: &str,
460 entity_id: [u8; 16],
461 before: Option<&[u8]>,
462 after: Option<&[u8]>,
463 ) -> Result<(), Error> {
464 let before_fields = if let Some(data) = before {
465 Some(decode_entity(data).map_err(|e| {
466 Error::Database(format!("failed to decode entity: {}", e))
467 })?)
468 } else {
469 None
470 };
471
472 let after_fields = if let Some(data) = after {
473 Some(decode_entity(data).map_err(|e| {
474 Error::Database(format!("failed to decode entity: {}", e))
475 })?)
476 } else {
477 None
478 };
479
480 self.update_hash_indexes(entity, entity_id, &before_fields, &after_fields)?;
481 self.update_btree_indexes(entity, entity_id, &before_fields, &after_fields)?;
482
483 if after.is_none() {
484 self.delete_columnar_row(entity, entity_id)?;
485 }
486
487 Ok(())
488 }
489
490 fn update_hash_indexes(
491 &self,
492 entity: &str,
493 entity_id: [u8; 16],
494 before_fields: &Option<Vec<(String, Value)>>,
495 after_fields: &Option<Vec<(String, Value)>>,
496 ) -> Result<(), Error> {
497 use std::collections::{HashMap, HashSet};
498
499 let to_map = |fields: &Option<Vec<(String, Value)>>| -> HashMap<String, Value> {
500 fields
501 .as_ref()
502 .map(|items| items.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
503 .unwrap_or_default()
504 };
505
506 let before_map = to_map(before_fields);
507 let after_map = to_map(after_fields);
508
509 let mut names: HashSet<String> = HashSet::new();
510 names.extend(before_map.keys().cloned());
511 names.extend(after_map.keys().cloned());
512
513 let hash_index = self.database.storage().hash_index();
514
515 for name in names {
516 let before_value = before_map.get(&name);
517 let after_value = after_map.get(&name);
518
519 if before_value == after_value {
520 continue;
521 }
522
523 if let Some(value) = before_value {
524 if !matches!(value, Value::Null) {
525 hash_index.remove(entity, &name, value, entity_id)?;
526 }
527 }
528
529 if let Some(value) = after_value {
530 if !matches!(value, Value::Null) {
531 hash_index.insert(entity, &name, value, entity_id)?;
532 }
533 }
534 }
535
536 Ok(())
537 }
538
539 fn update_btree_indexes(
540 &self,
541 entity: &str,
542 entity_id: [u8; 16],
543 before_fields: &Option<Vec<(String, Value)>>,
544 after_fields: &Option<Vec<(String, Value)>>,
545 ) -> Result<(), Error> {
546 use std::collections::{HashMap, HashSet};
547
548 let Some(btree) = self.database.storage().btree_index() else {
549 return Ok(());
550 };
551
552 let to_map = |fields: &Option<Vec<(String, Value)>>| -> HashMap<String, Value> {
553 fields
554 .as_ref()
555 .map(|items| items.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
556 .unwrap_or_default()
557 };
558
559 let before_map = to_map(before_fields);
560 let after_map = to_map(after_fields);
561
562 let mut columns: HashSet<String> = HashSet::new();
563 columns.extend(self.database.storage().btree_indexed_columns_for_entity(entity));
564
565 let relations = self
566 .database
567 .catalog()
568 .relations_to(entity)
569 .map_err(|e| Error::Database(format!("failed to load relations: {}", e)))?;
570 for relation in relations {
571 columns.insert(relation.to_field.clone());
572 }
573
574 for column in columns {
575 let before_value = before_map.get(&column);
576 let after_value = after_map.get(&column);
577
578 if before_value == after_value {
579 continue;
580 }
581
582 if let Some(value) = before_value {
583 if !matches!(value, Value::Null) {
584 btree.remove(entity, &column, value, entity_id)?;
585 }
586 }
587
588 if let Some(value) = after_value {
589 if !matches!(value, Value::Null) {
590 btree.insert(entity, &column, value, entity_id)?;
591 }
592 }
593 }
594
595 Ok(())
596 }
597
598 fn delete_columnar_row(
599 &self,
600 entity: &str,
601 entity_id: [u8; 16],
602 ) -> Result<(), Error> {
603 let entity_def = self
604 .database
605 .catalog()
606 .get_entity(entity)
607 .map_err(|e| Error::Database(format!("failed to load entity: {}", e)))?
608 .ok_or_else(|| Error::Database(format!("entity '{}' not found", entity)))?;
609
610 let columns: Vec<&str> = entity_def
611 .fields
612 .iter()
613 .map(|field| field.name.as_str())
614 .collect();
615
616 let projection = self
617 .database
618 .storage()
619 .columnar()
620 .projection(entity)
621 .map_err(|e| Error::Database(format!("failed to load columnar projection: {}", e)))?;
622
623 projection
624 .delete_row(&entity_id, &columns)
625 .map_err(|e| Error::Database(format!("failed to delete columnar row: {}", e)))?;
626
627 Ok(())
628 }
629}
630
/// Formats a 16-byte id as 32 lowercase hexadecimal characters.
///
/// Each byte becomes two digits, high nibble first, with leading zeros kept.
/// The output is written into one pre-sized buffer instead of building a
/// temporary `String` for each byte.
fn hex_id(id: &[u8; 16]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(id.len() * 2);
    for &byte in id {
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
635
#[cfg(test)]
mod tests {
    use super::*;
    use ormdb_core::catalog::{EntityDef, FieldDef, FieldType, ScalarType, SchemaBundle};

    /// Opens a database in a fresh temporary directory and registers a
    /// `User` entity with `id`, `name` and `age` fields.
    ///
    /// The directory handle is returned so it stays alive for the test.
    fn setup_test_db() -> (tempfile::TempDir, Database) {
        let dir = tempfile::tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        let schema = SchemaBundle::new(1).with_entity(
            EntityDef::new("User", "id")
                .with_field(FieldDef::new("id", FieldType::Scalar(ScalarType::Uuid)))
                .with_field(FieldDef::new("name", FieldType::Scalar(ScalarType::String)))
                .with_field(FieldDef::new("age", FieldType::Scalar(ScalarType::Int32))),
        );
        db.catalog().apply_schema(schema).unwrap();

        (dir, db)
    }

    /// Inserts a `User` with the given fields and returns its new id.
    fn insert_user(executor: &MutationExecutor<'_>, fields: Vec<FieldValue>) -> [u8; 16] {
        let outcome = executor.execute(&Mutation::insert("User", fields)).unwrap();
        outcome.inserted_ids[0]
    }

    /// Loads and decodes the latest stored version of `id`.
    fn stored_fields(db: &Database, id: &[u8; 16]) -> Vec<(String, Value)> {
        let (_, record) = db.storage().get_latest(id).unwrap().unwrap();
        ormdb_core::query::decode_entity(&record.data).unwrap()
    }

    /// Returns whether `fields` contains `name` with exactly `expected`.
    fn has_field(fields: &[(String, Value)], name: &str, expected: &Value) -> bool {
        fields.iter().any(|(n, v)| n == name && v == expected)
    }

    #[test]
    fn test_insert() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        let mutation = Mutation::insert(
            "User",
            vec![
                FieldValue::new("name", "Alice"),
                FieldValue::new("age", 30i32),
            ],
        );

        let outcome = executor.execute(&mutation).unwrap();
        assert_eq!(outcome.affected, 1);
        assert_eq!(outcome.inserted_ids.len(), 1);

        let fields = stored_fields(&db, &outcome.inserted_ids[0]);
        assert!(has_field(&fields, "name", &Value::String("Alice".into())));
        assert!(has_field(&fields, "age", &Value::Int32(30)));
    }

    #[test]
    fn test_update() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        let id = insert_user(
            &executor,
            vec![
                FieldValue::new("name", "Alice"),
                FieldValue::new("age", 30i32),
            ],
        );

        let update = Mutation::update(
            "User",
            id,
            vec![FieldValue::new("name", "Alicia"), FieldValue::new("age", 31i32)],
        );
        let outcome = executor.execute(&update).unwrap();
        assert_eq!(outcome.affected, 1);

        let fields = stored_fields(&db, &id);
        assert!(has_field(&fields, "name", &Value::String("Alicia".into())));
        assert!(has_field(&fields, "age", &Value::Int32(31)));
    }

    #[test]
    fn test_delete() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        let id = insert_user(&executor, vec![FieldValue::new("name", "Bob")]);

        let outcome = executor.execute(&Mutation::delete("User", id)).unwrap();
        assert_eq!(outcome.affected, 1);

        // The record must no longer be retrievable.
        assert!(db.storage().get_latest(&id).unwrap().is_none());
    }

    #[test]
    fn test_delete_nonexistent() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        let outcome = executor
            .execute(&Mutation::delete("User", [0u8; 16]))
            .unwrap();
        assert_eq!(outcome.affected, 0);
    }

    #[test]
    fn test_upsert_insert() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        let upsert = Mutation::upsert(
            "User",
            None,
            vec![FieldValue::new("name", "Charlie")],
        );
        let outcome = executor.execute(&upsert).unwrap();
        assert_eq!(outcome.affected, 1);
        assert_eq!(outcome.inserted_ids.len(), 1);
    }

    #[test]
    fn test_upsert_update() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        let id = insert_user(&executor, vec![FieldValue::new("name", "Dave")]);

        let upsert = Mutation::upsert(
            "User",
            Some(id),
            vec![FieldValue::new("name", "David")],
        );
        let outcome = executor.execute(&upsert).unwrap();
        assert_eq!(outcome.affected, 1);
        assert!(outcome.inserted_ids.is_empty());

        let fields = stored_fields(&db, &id);
        assert!(has_field(&fields, "name", &Value::String("David".into())));
    }

    #[test]
    fn test_batch() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        let batch = MutationBatch::from_mutations(vec![
            Mutation::insert("User", vec![FieldValue::new("name", "User1")]),
            Mutation::insert("User", vec![FieldValue::new("name", "User2")]),
            Mutation::insert("User", vec![FieldValue::new("name", "User3")]),
        ]);

        let outcome = executor.execute_batch(&batch).unwrap();
        assert_eq!(outcome.affected, 3);
        assert_eq!(outcome.inserted_ids.len(), 3);
    }
}