Skip to main content

ormdb_server/
mutation.rs

1//! Mutation executor for handling write operations.
2
3use ormdb_core::query::{decode_entity, encode_entity};
4use ormdb_core::replication::ChangeLog;
5use ormdb_core::storage::{Record, StorageEngine, VersionedKey};
6use ormdb_proto::replication::ChangeLogEntry;
7use ormdb_proto::{FieldValue, Mutation, MutationBatch, MutationResult, Value};
8
9use crate::database::Database;
10use crate::error::Error;
11
12/// Executes mutation operations against the database.
13pub struct MutationExecutor<'a> {
14    database: &'a Database,
15}
16
17impl<'a> MutationExecutor<'a> {
18    /// Create a new mutation executor.
19    pub fn new(database: &'a Database) -> Self {
20        Self { database }
21    }
22
23    /// Execute a single mutation (without CDC logging).
24    pub fn execute(&self, mutation: &Mutation) -> Result<MutationResult, Error> {
25        match mutation {
26            Mutation::Insert { entity, data } => self.execute_insert(entity, data),
27            Mutation::Update { entity, id, data } => self.execute_update(entity, id, data),
28            Mutation::Delete { entity, id } => self.execute_delete(entity, id),
29            Mutation::Upsert { entity, id, data } => self.execute_upsert(entity, id.as_ref(), data),
30        }
31    }
32
33    /// Execute a single mutation with CDC logging.
34    ///
35    /// This logs the mutation to the changelog after successful execution.
36    pub fn execute_with_cdc(&self, mutation: &Mutation) -> Result<MutationResult, Error> {
37        let changelog = self.database.changelog();
38        let schema_version = self.database.schema_version();
39
40        match mutation {
41            Mutation::Insert { entity, data } => {
42                self.execute_insert_with_cdc(entity, data, changelog, schema_version)
43            }
44            Mutation::Update { entity, id, data } => {
45                self.execute_update_with_cdc(entity, id, data, changelog, schema_version)
46            }
47            Mutation::Delete { entity, id } => {
48                self.execute_delete_with_cdc(entity, id, changelog, schema_version)
49            }
50            Mutation::Upsert { entity, id, data } => {
51                self.execute_upsert_with_cdc(entity, id.as_ref(), data, changelog, schema_version)
52            }
53        }
54    }
55
56    /// Execute a batch of mutations atomically (without CDC logging).
57    pub fn execute_batch(&self, batch: &MutationBatch) -> Result<MutationResult, Error> {
58        if batch.is_empty() {
59            return Ok(MutationResult::affected(0));
60        }
61
62        let mut inserted_ids = Vec::new();
63        let mut affected = 0u64;
64
65        // Execute each mutation in order
66        // Note: For true atomicity, we'd use sled transactions here.
67        // For now, we execute sequentially which is sufficient for most use cases.
68        for mutation in &batch.mutations {
69            let result = self.execute(mutation)?;
70            affected += result.affected;
71            inserted_ids.extend(result.inserted_ids);
72        }
73
74        if inserted_ids.is_empty() {
75            Ok(MutationResult::affected(affected))
76        } else {
77            Ok(MutationResult::bulk_inserted(inserted_ids))
78        }
79    }
80
81    /// Execute a batch of mutations with CDC logging.
82    pub fn execute_batch_with_cdc(&self, batch: &MutationBatch) -> Result<MutationResult, Error> {
83        if batch.is_empty() {
84            return Ok(MutationResult::affected(0));
85        }
86
87        let mut inserted_ids = Vec::new();
88        let mut affected = 0u64;
89
90        for mutation in &batch.mutations {
91            let result = self.execute_with_cdc(mutation)?;
92            affected += result.affected;
93            inserted_ids.extend(result.inserted_ids);
94        }
95
96        if inserted_ids.is_empty() {
97            Ok(MutationResult::affected(affected))
98        } else {
99            Ok(MutationResult::bulk_inserted(inserted_ids))
100        }
101    }
102
103    /// Execute an insert operation.
104    fn execute_insert(&self, entity: &str, data: &[FieldValue]) -> Result<MutationResult, Error> {
105        let (id, _encoded) = self.do_insert(entity, data)?;
106        Ok(MutationResult::inserted(id))
107    }
108
109    /// Execute an insert operation with CDC logging.
110    fn execute_insert_with_cdc(
111        &self,
112        entity: &str,
113        data: &[FieldValue],
114        changelog: &ChangeLog,
115        schema_version: u64,
116    ) -> Result<MutationResult, Error> {
117        let (id, encoded) = self.do_insert(entity, data)?;
118
119        // Create changelog entry
120        let changed_fields: Vec<String> = data.iter().map(|fv| fv.field.clone()).collect();
121        let entry = ChangeLogEntry::insert(entity, id, encoded, changed_fields, schema_version);
122
123        // Log to changelog
124        changelog
125            .append(entry)
126            .map_err(|e| Error::Database(format!("failed to log to changelog: {}", e)))?;
127
128        Ok(MutationResult::inserted(id))
129    }
130
131    /// Internal insert implementation that returns both ID and encoded data.
132    fn do_insert(&self, entity: &str, data: &[FieldValue]) -> Result<([u8; 16], Vec<u8>), Error> {
133        // Generate a new entity ID
134        let id = StorageEngine::generate_id();
135
136        // Build field data including the ID
137        let mut fields: Vec<(String, Value)> = Vec::with_capacity(data.len() + 1);
138        fields.push(("id".to_string(), Value::Uuid(id)));
139
140        for fv in data {
141            // Skip if someone tried to set the ID field
142            if fv.field != "id" {
143                fields.push((fv.field.clone(), fv.value.clone()));
144            }
145        }
146
147        // Encode the entity data
148        let encoded = encode_entity(&fields)
149            .map_err(|e| Error::Database(format!("failed to encode entity: {}", e)))?;
150
151        // Store the entity
152        let key = VersionedKey::now(id);
153        self.database
154            .storage()
155            .put_typed(entity, key, Record::new(encoded.clone()))
156            .map_err(|e| Error::Storage(e))?;
157        self.database.statistics().increment(entity);
158        self.update_secondary_indexes(entity, id, None, Some(&encoded))?;
159
160        Ok((id, encoded))
161    }
162
163    /// Execute an update operation.
164    fn execute_update(
165        &self,
166        entity: &str,
167        id: &[u8; 16],
168        data: &[FieldValue],
169    ) -> Result<MutationResult, Error> {
170        let _ = self.do_update(entity, id, data)?;
171        Ok(MutationResult::affected(1))
172    }
173
174    /// Execute an update operation with CDC logging.
175    fn execute_update_with_cdc(
176        &self,
177        entity: &str,
178        id: &[u8; 16],
179        data: &[FieldValue],
180        changelog: &ChangeLog,
181        schema_version: u64,
182    ) -> Result<MutationResult, Error> {
183        let (before_data, after_data, changed_fields) = self.do_update(entity, id, data)?;
184
185        // Create changelog entry
186        let entry = ChangeLogEntry::update(
187            entity,
188            *id,
189            before_data,
190            after_data,
191            changed_fields,
192            schema_version,
193        );
194
195        // Log to changelog
196        changelog
197            .append(entry)
198            .map_err(|e| Error::Database(format!("failed to log to changelog: {}", e)))?;
199
200        Ok(MutationResult::affected(1))
201    }
202
203    /// Internal update implementation that returns before/after data and changed fields.
204    fn do_update(
205        &self,
206        entity: &str,
207        id: &[u8; 16],
208        data: &[FieldValue],
209    ) -> Result<(Vec<u8>, Vec<u8>, Vec<String>), Error> {
210        // Get existing entity data
211        let (_version, existing) = self
212            .database
213            .storage()
214            .get_latest(id)
215            .map_err(|e| Error::Storage(e))?
216            .ok_or_else(|| {
217                Error::Database(format!("entity {}:{} not found", entity, hex_id(id)))
218            })?;
219
220        let before_data = existing.data.clone();
221
222        // Decode existing fields
223        let before_fields: Vec<(String, Value)> =
224            decode_entity(&existing.data)
225                .map_err(|e| Error::Database(format!("failed to decode entity: {}", e)))?;
226
227        let mut fields = before_fields.clone();
228
229        // Merge updates (replace existing fields, add new ones)
230        let mut changed_fields = Vec::new();
231        for fv in data {
232            // Don't allow updating the ID field
233            if fv.field == "id" {
234                continue;
235            }
236
237            if let Some(pos) = fields.iter().position(|(name, _)| name == &fv.field) {
238                if fields[pos].1 != fv.value {
239                    changed_fields.push(fv.field.clone());
240                }
241                fields[pos].1 = fv.value.clone();
242            } else {
243                changed_fields.push(fv.field.clone());
244                fields.push((fv.field.clone(), fv.value.clone()));
245            }
246        }
247
248        // Encode and store
249        let encoded = encode_entity(&fields)
250            .map_err(|e| Error::Database(format!("failed to encode entity: {}", e)))?;
251
252        let key = VersionedKey::now(*id);
253        self.database
254            .storage()
255            .put_typed(entity, key, Record::new(encoded.clone()))
256            .map_err(|e| Error::Storage(e))?;
257        self.update_secondary_indexes(entity, *id, Some(&before_data), Some(&encoded))?;
258
259        Ok((before_data, encoded, changed_fields))
260    }
261
262    /// Execute a delete operation.
263    fn execute_delete(&self, entity: &str, id: &[u8; 16]) -> Result<MutationResult, Error> {
264        let before_data = self.do_delete(entity, id)?;
265        if before_data.is_some() {
266            Ok(MutationResult::affected(1))
267        } else {
268            Ok(MutationResult::affected(0))
269        }
270    }
271
272    /// Execute a delete operation with CDC logging.
273    fn execute_delete_with_cdc(
274        &self,
275        entity: &str,
276        id: &[u8; 16],
277        changelog: &ChangeLog,
278        schema_version: u64,
279    ) -> Result<MutationResult, Error> {
280        let before_data = self.do_delete(entity, id)?;
281
282        if let Some(data) = before_data {
283            // Create changelog entry
284            let entry = ChangeLogEntry::delete(entity, *id, data, schema_version);
285
286            // Log to changelog
287            changelog
288                .append(entry)
289                .map_err(|e| Error::Database(format!("failed to log to changelog: {}", e)))?;
290
291            Ok(MutationResult::affected(1))
292        } else {
293            Ok(MutationResult::affected(0))
294        }
295    }
296
297    /// Internal delete implementation that returns the before data if entity existed.
298    fn do_delete(&self, entity: &str, id: &[u8; 16]) -> Result<Option<Vec<u8>>, Error> {
299        // Check if entity exists and get its data
300        let existing = self
301            .database
302            .storage()
303            .get_latest(id)
304            .map_err(|e| Error::Storage(e))?;
305
306        if existing.is_none() {
307            return Ok(None);
308        }
309
310        let (_version, record) = existing.unwrap();
311        let before_data = record.data.clone();
312
313        // Soft delete (creates tombstone)
314        self.database
315            .storage()
316            .delete_typed(entity, id)
317            .map_err(|e| Error::Storage(e))?;
318        self.database.statistics().decrement(entity);
319        self.update_secondary_indexes(entity, *id, Some(&before_data), None)?;
320
321        Ok(Some(before_data))
322    }
323
324    /// Execute an upsert operation.
325    fn execute_upsert(
326        &self,
327        entity: &str,
328        id: Option<&[u8; 16]>,
329        data: &[FieldValue],
330    ) -> Result<MutationResult, Error> {
331        match id {
332            Some(existing_id) => {
333                // Check if entity exists
334                let exists = self
335                    .database
336                    .storage()
337                    .get_latest(existing_id)
338                    .map_err(|e| Error::Storage(e))?
339                    .is_some();
340
341                if exists {
342                    // Update existing
343                    self.execute_update(entity, existing_id, data)
344                } else {
345                    // Insert with provided ID
346                    self.execute_insert_with_id(entity, *existing_id, data)
347                }
348            }
349            None => {
350                // No ID provided, always insert
351                self.execute_insert(entity, data)
352            }
353        }
354    }
355
356    /// Execute an upsert operation with CDC logging.
357    fn execute_upsert_with_cdc(
358        &self,
359        entity: &str,
360        id: Option<&[u8; 16]>,
361        data: &[FieldValue],
362        changelog: &ChangeLog,
363        schema_version: u64,
364    ) -> Result<MutationResult, Error> {
365        match id {
366            Some(existing_id) => {
367                // Check if entity exists
368                let exists = self
369                    .database
370                    .storage()
371                    .get_latest(existing_id)
372                    .map_err(|e| Error::Storage(e))?
373                    .is_some();
374
375                if exists {
376                    // Update existing with CDC
377                    self.execute_update_with_cdc(entity, existing_id, data, changelog, schema_version)
378                } else {
379                    // Insert with provided ID and CDC
380                    self.execute_insert_with_id_cdc(entity, *existing_id, data, changelog, schema_version)
381                }
382            }
383            None => {
384                // No ID provided, always insert with CDC
385                self.execute_insert_with_cdc(entity, data, changelog, schema_version)
386            }
387        }
388    }
389
390    /// Execute an insert with a specific ID.
391    fn execute_insert_with_id(
392        &self,
393        entity: &str,
394        id: [u8; 16],
395        data: &[FieldValue],
396    ) -> Result<MutationResult, Error> {
397        let _ = self.do_insert_with_id(entity, id, data)?;
398        Ok(MutationResult::inserted(id))
399    }
400
401    /// Execute an insert with a specific ID and CDC logging.
402    fn execute_insert_with_id_cdc(
403        &self,
404        entity: &str,
405        id: [u8; 16],
406        data: &[FieldValue],
407        changelog: &ChangeLog,
408        schema_version: u64,
409    ) -> Result<MutationResult, Error> {
410        let encoded = self.do_insert_with_id(entity, id, data)?;
411
412        // Create changelog entry
413        let changed_fields: Vec<String> = data.iter().map(|fv| fv.field.clone()).collect();
414        let entry = ChangeLogEntry::insert(entity, id, encoded, changed_fields, schema_version);
415
416        // Log to changelog
417        changelog
418            .append(entry)
419            .map_err(|e| Error::Database(format!("failed to log to changelog: {}", e)))?;
420
421        Ok(MutationResult::inserted(id))
422    }
423
424    /// Internal insert with ID implementation.
425    fn do_insert_with_id(
426        &self,
427        entity: &str,
428        id: [u8; 16],
429        data: &[FieldValue],
430    ) -> Result<Vec<u8>, Error> {
431        // Build field data including the ID
432        let mut fields: Vec<(String, Value)> = Vec::with_capacity(data.len() + 1);
433        fields.push(("id".to_string(), Value::Uuid(id)));
434
435        for fv in data {
436            if fv.field != "id" {
437                fields.push((fv.field.clone(), fv.value.clone()));
438            }
439        }
440
441        // Encode the entity data
442        let encoded = encode_entity(&fields)
443            .map_err(|e| Error::Database(format!("failed to encode entity: {}", e)))?;
444
445        // Store the entity
446        let key = VersionedKey::now(id);
447        self.database
448            .storage()
449            .put_typed(entity, key, Record::new(encoded.clone()))
450            .map_err(|e| Error::Storage(e))?;
451        self.database.statistics().increment(entity);
452        self.update_secondary_indexes(entity, id, None, Some(&encoded))?;
453
454        Ok(encoded)
455    }
456
457    fn update_secondary_indexes(
458        &self,
459        entity: &str,
460        entity_id: [u8; 16],
461        before: Option<&[u8]>,
462        after: Option<&[u8]>,
463    ) -> Result<(), Error> {
464        let before_fields = if let Some(data) = before {
465            Some(decode_entity(data).map_err(|e| {
466                Error::Database(format!("failed to decode entity: {}", e))
467            })?)
468        } else {
469            None
470        };
471
472        let after_fields = if let Some(data) = after {
473            Some(decode_entity(data).map_err(|e| {
474                Error::Database(format!("failed to decode entity: {}", e))
475            })?)
476        } else {
477            None
478        };
479
480        self.update_hash_indexes(entity, entity_id, &before_fields, &after_fields)?;
481        self.update_btree_indexes(entity, entity_id, &before_fields, &after_fields)?;
482
483        if after.is_none() {
484            self.delete_columnar_row(entity, entity_id)?;
485        }
486
487        Ok(())
488    }
489
490    fn update_hash_indexes(
491        &self,
492        entity: &str,
493        entity_id: [u8; 16],
494        before_fields: &Option<Vec<(String, Value)>>,
495        after_fields: &Option<Vec<(String, Value)>>,
496    ) -> Result<(), Error> {
497        use std::collections::{HashMap, HashSet};
498
499        let to_map = |fields: &Option<Vec<(String, Value)>>| -> HashMap<String, Value> {
500            fields
501                .as_ref()
502                .map(|items| items.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
503                .unwrap_or_default()
504        };
505
506        let before_map = to_map(before_fields);
507        let after_map = to_map(after_fields);
508
509        let mut names: HashSet<String> = HashSet::new();
510        names.extend(before_map.keys().cloned());
511        names.extend(after_map.keys().cloned());
512
513        let hash_index = self.database.storage().hash_index();
514
515        for name in names {
516            let before_value = before_map.get(&name);
517            let after_value = after_map.get(&name);
518
519            if before_value == after_value {
520                continue;
521            }
522
523            if let Some(value) = before_value {
524                if !matches!(value, Value::Null) {
525                    hash_index.remove(entity, &name, value, entity_id)?;
526                }
527            }
528
529            if let Some(value) = after_value {
530                if !matches!(value, Value::Null) {
531                    hash_index.insert(entity, &name, value, entity_id)?;
532                }
533            }
534        }
535
536        Ok(())
537    }
538
539    fn update_btree_indexes(
540        &self,
541        entity: &str,
542        entity_id: [u8; 16],
543        before_fields: &Option<Vec<(String, Value)>>,
544        after_fields: &Option<Vec<(String, Value)>>,
545    ) -> Result<(), Error> {
546        use std::collections::{HashMap, HashSet};
547
548        let Some(btree) = self.database.storage().btree_index() else {
549            return Ok(());
550        };
551
552        let to_map = |fields: &Option<Vec<(String, Value)>>| -> HashMap<String, Value> {
553            fields
554                .as_ref()
555                .map(|items| items.iter().map(|(k, v)| (k.clone(), v.clone())).collect())
556                .unwrap_or_default()
557        };
558
559        let before_map = to_map(before_fields);
560        let after_map = to_map(after_fields);
561
562        let mut columns: HashSet<String> = HashSet::new();
563        columns.extend(self.database.storage().btree_indexed_columns_for_entity(entity));
564
565        let relations = self
566            .database
567            .catalog()
568            .relations_to(entity)
569            .map_err(|e| Error::Database(format!("failed to load relations: {}", e)))?;
570        for relation in relations {
571            columns.insert(relation.to_field.clone());
572        }
573
574        for column in columns {
575            let before_value = before_map.get(&column);
576            let after_value = after_map.get(&column);
577
578            if before_value == after_value {
579                continue;
580            }
581
582            if let Some(value) = before_value {
583                if !matches!(value, Value::Null) {
584                    btree.remove(entity, &column, value, entity_id)?;
585                }
586            }
587
588            if let Some(value) = after_value {
589                if !matches!(value, Value::Null) {
590                    btree.insert(entity, &column, value, entity_id)?;
591                }
592            }
593        }
594
595        Ok(())
596    }
597
598    fn delete_columnar_row(
599        &self,
600        entity: &str,
601        entity_id: [u8; 16],
602    ) -> Result<(), Error> {
603        let entity_def = self
604            .database
605            .catalog()
606            .get_entity(entity)
607            .map_err(|e| Error::Database(format!("failed to load entity: {}", e)))?
608            .ok_or_else(|| Error::Database(format!("entity '{}' not found", entity)))?;
609
610        let columns: Vec<&str> = entity_def
611            .fields
612            .iter()
613            .map(|field| field.name.as_str())
614            .collect();
615
616        let projection = self
617            .database
618            .storage()
619            .columnar()
620            .projection(entity)
621            .map_err(|e| Error::Database(format!("failed to load columnar projection: {}", e)))?;
622
623        projection
624            .delete_row(&entity_id, &columns)
625            .map_err(|e| Error::Database(format!("failed to delete columnar row: {}", e)))?;
626
627        Ok(())
628    }
629}
630
/// Format an ID as hex for error messages.
///
/// Produces 32 lowercase hex digits: two per byte, in byte order, with no
/// separators or prefix.
fn hex_id(id: &[u8; 16]) -> String {
    use std::fmt::Write;

    // Size the buffer once instead of allocating a temporary `String` per byte.
    let mut out = String::with_capacity(id.len() * 2);
    for byte in id {
        // Writing into a `String` cannot fail, so the result is safe to ignore.
        let _ = write!(out, "{:02x}", byte);
    }
    out
}
635
#[cfg(test)]
mod tests {
    use super::*;
    use ormdb_core::catalog::{EntityDef, FieldDef, FieldType, ScalarType, SchemaBundle};

    /// Open a fresh database in a temporary directory and register a `User`
    /// entity with `id` (UUID), `name` (string) and `age` (int32) fields.
    ///
    /// The `TempDir` is returned so the directory outlives the test body.
    fn setup_test_db() -> (tempfile::TempDir, Database) {
        let dir = tempfile::tempdir().unwrap();
        let db = Database::open(dir.path()).unwrap();

        let user = EntityDef::new("User", "id")
            .with_field(FieldDef::new("id", FieldType::Scalar(ScalarType::Uuid)))
            .with_field(FieldDef::new("name", FieldType::Scalar(ScalarType::String)))
            .with_field(FieldDef::new("age", FieldType::Scalar(ScalarType::Int32)));
        let schema = SchemaBundle::new(1).with_entity(user);
        db.catalog().apply_schema(schema).unwrap();

        (dir, db)
    }

    /// Load the latest stored version of `id` and decode it into its fields.
    fn stored_fields(db: &Database, id: &[u8; 16]) -> Vec<(String, Value)> {
        let (_, record) = db.storage().get_latest(id).unwrap().unwrap();
        ormdb_core::query::decode_entity(&record.data).unwrap()
    }

    /// True when `fields` contains `name` with exactly the `expected` value.
    fn has_field(fields: &[(String, Value)], name: &str, expected: Value) -> bool {
        fields.iter().any(|(n, v)| n == name && *v == expected)
    }

    #[test]
    fn test_insert() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        let mutation = Mutation::insert(
            "User",
            vec![
                FieldValue::new("name", "Alice"),
                FieldValue::new("age", 30i32),
            ],
        );

        let result = executor.execute(&mutation).unwrap();
        assert_eq!(result.affected, 1);
        assert_eq!(result.inserted_ids.len(), 1);

        // The stored entity must round-trip with the inserted values.
        let fields = stored_fields(&db, &result.inserted_ids[0]);
        assert!(has_field(&fields, "name", Value::String("Alice".into())));
        assert!(has_field(&fields, "age", Value::Int32(30)));
    }

    #[test]
    fn test_update() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        // Seed an entity to update.
        let insert = Mutation::insert(
            "User",
            vec![
                FieldValue::new("name", "Alice"),
                FieldValue::new("age", 30i32),
            ],
        );
        let id = executor.execute(&insert).unwrap().inserted_ids[0];

        let update = Mutation::update(
            "User",
            id,
            vec![FieldValue::new("name", "Alicia"), FieldValue::new("age", 31i32)],
        );
        let update_result = executor.execute(&update).unwrap();
        assert_eq!(update_result.affected, 1);

        // Both fields must carry the new values.
        let fields = stored_fields(&db, &id);
        assert!(has_field(&fields, "name", Value::String("Alicia".into())));
        assert!(has_field(&fields, "age", Value::Int32(31)));
    }

    #[test]
    fn test_delete() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        // Seed an entity to delete.
        let insert = Mutation::insert("User", vec![FieldValue::new("name", "Bob")]);
        let id = executor.execute(&insert).unwrap().inserted_ids[0];

        let delete_result = executor.execute(&Mutation::delete("User", id)).unwrap();
        assert_eq!(delete_result.affected, 1);

        // The entity must no longer be readable.
        assert!(db.storage().get_latest(&id).unwrap().is_none());
    }

    #[test]
    fn test_delete_nonexistent() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        let delete = Mutation::delete("User", [0u8; 16]);
        let result = executor.execute(&delete).unwrap();
        assert_eq!(result.affected, 0);
    }

    #[test]
    fn test_upsert_insert() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        // No ID supplied: the upsert must behave like an insert.
        let upsert = Mutation::upsert(
            "User",
            None,
            vec![FieldValue::new("name", "Charlie")],
        );
        let result = executor.execute(&upsert).unwrap();
        assert_eq!(result.affected, 1);
        assert_eq!(result.inserted_ids.len(), 1);
    }

    #[test]
    fn test_upsert_update() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        // Seed an entity so the upsert finds it.
        let insert = Mutation::insert("User", vec![FieldValue::new("name", "Dave")]);
        let id = executor.execute(&insert).unwrap().inserted_ids[0];

        // ID of an existing entity: the upsert must behave like an update.
        let upsert = Mutation::upsert(
            "User",
            Some(id),
            vec![FieldValue::new("name", "David")],
        );
        let result = executor.execute(&upsert).unwrap();
        assert_eq!(result.affected, 1);
        assert!(result.inserted_ids.is_empty());

        let fields = stored_fields(&db, &id);
        assert!(has_field(&fields, "name", Value::String("David".into())));
    }

    #[test]
    fn test_batch() {
        let (_dir, db) = setup_test_db();
        let executor = MutationExecutor::new(&db);

        let batch = MutationBatch::from_mutations(vec![
            Mutation::insert("User", vec![FieldValue::new("name", "User1")]),
            Mutation::insert("User", vec![FieldValue::new("name", "User2")]),
            Mutation::insert("User", vec![FieldValue::new("name", "User3")]),
        ]);

        let result = executor.execute_batch(&batch).unwrap();
        assert_eq!(result.affected, 3);
        assert_eq!(result.inserted_ids.len(), 3);
    }
}