Skip to main content

icydb_core/db/executor/mutation/
save.rs

1//! Module: executor::mutation::save
2//! Responsibility: save-mode execution (`insert`/`update`/`replace`) and batch lanes.
3//! Does not own: relation-domain validation semantics or commit marker protocol internals.
4//! Boundary: save preflight + commit-window handoff for one entity type.
5
6use crate::{
7    db::{
8        Db,
9        commit::CommitRowOp,
10        data::{CanonicalRow, DataKey, PersistedRow, RawRow, UpdatePatch},
11        executor::{
12            Context, ExecutorError,
13            mutation::{
14                MutationInput, commit_save_row_ops_with_window,
15                commit_single_save_row_op_with_window, mutation_write_context,
16            },
17        },
18        schema::commit_schema_fingerprint_for_entity,
19    },
20    error::InternalError,
21    metrics::sink::{ExecKind, MetricsEvent, Span, record},
22    traits::{EntityValue, FieldValue},
23};
24use candid::CandidType;
25use derive_more::Display;
26use serde::{Deserialize, Serialize};
27use std::collections::BTreeSet;
28
29// Debug assertions below are diagnostic sentinels; correctness is enforced by
30// runtime validation earlier in the pipeline.
31
32///
33/// SaveMode
34///
35/// Create  : will only insert a row if it's empty
36/// Replace : will change the row regardless of what was there
37/// Update  : will only change an existing row
38///
39
40#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Display, Serialize)]
41enum SaveMode {
42    #[default]
43    Insert,
44    Replace,
45    Update,
46}
47
48///
49/// SaveExecutor
50///
51
52#[derive(Clone, Copy)]
53pub(in crate::db) struct SaveExecutor<E: PersistedRow + EntityValue> {
54    pub(in crate::db::executor::mutation) db: Db<E::Canister>,
55}
56
57///
58/// SaveRule
59///
60/// Canonical save precondition for resolving the current row baseline.
61///
62#[derive(Clone, Copy)]
63enum SaveRule {
64    RequireAbsent,
65    RequirePresent,
66    AllowAny,
67}
68
69impl SaveRule {
70    const fn from_mode(mode: SaveMode) -> Self {
71        match mode {
72            SaveMode::Insert => Self::RequireAbsent,
73            SaveMode::Update => Self::RequirePresent,
74            SaveMode::Replace => Self::AllowAny,
75        }
76    }
77}
78
79///
80/// MutationMode
81///
82/// MutationMode
83///
84/// MutationMode makes the structural patch path spell out the same
85/// row-existence contract the typed save surface already owns.
86/// This keeps future structural callers from smuggling write-mode meaning
87/// through ad hoc helper choice once the seam moves beyond `icydb-core`.
88///
89
90#[derive(Clone, Copy)]
91pub enum MutationMode {
92    #[allow(dead_code)]
93    Insert,
94    #[allow(dead_code)]
95    Replace,
96    Update,
97}
98
99impl MutationMode {
100    const fn save_rule(self) -> SaveRule {
101        match self {
102            Self::Insert => SaveRule::RequireAbsent,
103            Self::Replace => SaveRule::AllowAny,
104            Self::Update => SaveRule::RequirePresent,
105        }
106    }
107}
108
109impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
110    // ======================================================================
111    // Construction & configuration
112    // ======================================================================
113
114    /// Construct one save executor bound to a database handle.
115    #[must_use]
116    pub(in crate::db) const fn new(db: Db<E::Canister>, _debug: bool) -> Self {
117        Self { db }
118    }
119
120    // ======================================================================
121    // Single-entity save operations
122    // ======================================================================
123
124    /// Insert a brand-new entity (errors if the key already exists).
125    pub(crate) fn insert(&self, entity: E) -> Result<E, InternalError> {
126        self.save_entity(SaveMode::Insert, entity)
127    }
128
129    /// Update an existing entity (errors if it does not exist).
130    pub(crate) fn update(&self, entity: E) -> Result<E, InternalError> {
131        self.save_entity(SaveMode::Update, entity)
132    }
133
134    /// Apply one structural field patch to an existing entity row.
135    ///
136    /// This entrypoint is intentionally staged ahead of the higher-level API
137    /// layer so the executor boundary can lock its invariants first.
138    #[allow(dead_code)]
139    pub(in crate::db) fn insert_structural(
140        &self,
141        key: E::Key,
142        patch: UpdatePatch,
143    ) -> Result<E, InternalError> {
144        self.apply_structural_mutation(MutationMode::Insert, key, patch)
145    }
146
147    /// Apply one structural full-row replacement, inserting if missing.
148    ///
149    /// Replace semantics deliberately rebuild the after-image from an empty row
150    /// layout so absent fields do not inherit old-row values implicitly.
151    #[allow(dead_code)]
152    pub(in crate::db) fn replace_structural(
153        &self,
154        key: E::Key,
155        patch: UpdatePatch,
156    ) -> Result<E, InternalError> {
157        self.apply_structural_mutation(MutationMode::Replace, key, patch)
158    }
159
160    /// Apply one structural field patch to an existing entity row.
161    ///
162    /// This entrypoint is intentionally staged ahead of the higher-level API
163    /// layer so the executor boundary can lock its invariants first.
164    #[allow(dead_code)]
165    pub(in crate::db) fn update_structural(
166        &self,
167        key: E::Key,
168        patch: UpdatePatch,
169    ) -> Result<E, InternalError> {
170        self.apply_structural_mutation(MutationMode::Update, key, patch)
171    }
172
173    /// Replace an entity, inserting if missing.
174    pub(crate) fn replace(&self, entity: E) -> Result<E, InternalError> {
175        self.save_entity(SaveMode::Replace, entity)
176    }
177
178    // ======================================================================
179    // Batch save operations (explicit atomic and non-atomic lanes)
180    // ======================================================================
181
182    /// Save a batch with explicitly non-atomic semantics.
183    ///
184    /// WARNING: this helper is fail-fast and non-atomic. If one element fails,
185    /// earlier elements in the batch remain committed.
186    fn save_batch_non_atomic(
187        &self,
188        mode: SaveMode,
189        entities: impl IntoIterator<Item = E>,
190    ) -> Result<Vec<E>, InternalError> {
191        let iter = entities.into_iter();
192        let mut out = Vec::with_capacity(iter.size_hint().0);
193
194        for entity in iter {
195            match self.save_entity(mode, entity) {
196                Ok(saved) => out.push(saved),
197                Err(err) => {
198                    if !out.is_empty() {
199                        record(MetricsEvent::NonAtomicPartialCommit {
200                            entity_path: E::PATH,
201                            committed_rows: u64::try_from(out.len()).unwrap_or(u64::MAX),
202                        });
203                    }
204
205                    return Err(err);
206                }
207            }
208        }
209
210        Ok(out)
211    }
212
213    /// Save a single-entity-type batch atomically in a single commit window.
214    ///
215    /// All entities are prevalidated first; if any entity fails pre-commit validation,
216    /// no row in this batch is persisted.
217    ///
218    /// This is not a multi-entity transaction surface.
219    fn save_batch_atomic(
220        &self,
221        mode: SaveMode,
222        entities: impl IntoIterator<Item = E>,
223    ) -> Result<Vec<E>, InternalError> {
224        let entities: Vec<E> = entities.into_iter().collect();
225
226        self.save_batch_atomic_impl(SaveRule::from_mode(mode), entities)
227    }
228
229    /// Insert a single-entity-type batch atomically in one commit window.
230    ///
231    /// This API is not a multi-entity transaction surface.
232    pub(crate) fn insert_many_atomic(
233        &self,
234        entities: impl IntoIterator<Item = E>,
235    ) -> Result<Vec<E>, InternalError> {
236        self.save_batch_atomic(SaveMode::Insert, entities)
237    }
238
239    /// Update a single-entity-type batch atomically in one commit window.
240    ///
241    /// This API is not a multi-entity transaction surface.
242    pub(crate) fn update_many_atomic(
243        &self,
244        entities: impl IntoIterator<Item = E>,
245    ) -> Result<Vec<E>, InternalError> {
246        self.save_batch_atomic(SaveMode::Update, entities)
247    }
248
249    /// Replace a single-entity-type batch atomically in one commit window.
250    ///
251    /// This API is not a multi-entity transaction surface.
252    pub(crate) fn replace_many_atomic(
253        &self,
254        entities: impl IntoIterator<Item = E>,
255    ) -> Result<Vec<E>, InternalError> {
256        self.save_batch_atomic(SaveMode::Replace, entities)
257    }
258
259    /// Insert a batch with explicitly non-atomic semantics.
260    ///
261    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
262    pub(crate) fn insert_many_non_atomic(
263        &self,
264        entities: impl IntoIterator<Item = E>,
265    ) -> Result<Vec<E>, InternalError> {
266        self.save_batch_non_atomic(SaveMode::Insert, entities)
267    }
268
269    /// Update a batch with explicitly non-atomic semantics.
270    ///
271    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
272    pub(crate) fn update_many_non_atomic(
273        &self,
274        entities: impl IntoIterator<Item = E>,
275    ) -> Result<Vec<E>, InternalError> {
276        self.save_batch_non_atomic(SaveMode::Update, entities)
277    }
278
279    /// Replace a batch with explicitly non-atomic semantics.
280    ///
281    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
282    pub(crate) fn replace_many_non_atomic(
283        &self,
284        entities: impl IntoIterator<Item = E>,
285    ) -> Result<Vec<E>, InternalError> {
286        self.save_batch_non_atomic(SaveMode::Replace, entities)
287    }
288
289    // Keep the atomic batch body out of the iterator-generic wrapper so mode
290    // wrappers do not each own one copy of the full staging pipeline.
291    #[inline(never)]
292    fn save_batch_atomic_impl(
293        &self,
294        save_rule: SaveRule,
295        entities: Vec<E>,
296    ) -> Result<Vec<E>, InternalError> {
297        // Phase 1: validate + stage all row ops before opening the commit window.
298        let mut span = Span::<E>::new(ExecKind::Save);
299        let ctx = mutation_write_context::<E>(&self.db)?;
300        let mut out = Vec::with_capacity(entities.len());
301        let mut marker_row_ops = Vec::with_capacity(entities.len());
302        let mut seen_row_keys = BTreeSet::<Vec<u8>>::new();
303
304        // Validate and stage all row ops before opening the commit window.
305        for mut entity in entities {
306            self.preflight_entity(&mut entity)?;
307            let mutation = MutationInput::from_entity(&entity)?;
308
309            let (marker_row_op, data_key) =
310                Self::prepare_logical_row_op(&ctx, save_rule, &mutation)?;
311            if !seen_row_keys.insert(marker_row_op.key.clone()) {
312                return Err(InternalError::mutation_atomic_save_duplicate_key(
313                    E::PATH,
314                    data_key,
315                ));
316            }
317            marker_row_ops.push(marker_row_op);
318            out.push(entity);
319        }
320
321        if marker_row_ops.is_empty() {
322            return Ok(out);
323        }
324
325        // Phase 2: enter commit window and apply staged row ops atomically.
326        Self::commit_atomic_batch(&self.db, marker_row_ops, &mut span)?;
327
328        Ok(out)
329    }
330
331    // Build one logical row operation from the save rule and current entity.
332    fn prepare_logical_row_op(
333        ctx: &Context<'_, E>,
334        save_rule: SaveRule,
335        mutation: &MutationInput,
336    ) -> Result<(CommitRowOp, DataKey), InternalError> {
337        // Phase 1: resolve key + current-store baseline from the canonical save rule.
338        let data_key = mutation.data_key().clone();
339        let raw_key = data_key.to_raw()?;
340        let old_raw = Self::resolve_existing_row_for_rule(ctx, &data_key, save_rule)?;
341        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
342
343        // Phase 2: build the after-image through the structural row boundary.
344        let row = Self::build_after_image_row(mutation, old_raw.as_ref())?;
345        let row_op = CommitRowOp::new(
346            E::PATH,
347            raw_key.as_bytes().to_vec(),
348            old_raw.as_ref().map(|item| item.as_bytes().to_vec()),
349            Some(row.as_bytes().to_vec()),
350            schema_fingerprint,
351        );
352
353        Ok((row_op, data_key))
354    }
355
356    // Build the persisted after-image under one explicit structural mode.
357    fn build_after_image_row(
358        mutation: &MutationInput,
359        old_row: Option<&RawRow>,
360    ) -> Result<CanonicalRow, InternalError> {
361        let Some(old_row) = old_row else {
362            return RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch());
363        };
364
365        old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
366    }
367
368    // Build the persisted after-image under one explicit structural mode.
369    fn build_structural_after_image_row(
370        mode: MutationMode,
371        mutation: &MutationInput,
372        old_row: Option<&RawRow>,
373    ) -> Result<CanonicalRow, InternalError> {
374        match mode {
375            MutationMode::Update => {
376                let Some(old_row) = old_row else {
377                    return RawRow::from_serialized_update_patch(
378                        E::MODEL,
379                        mutation.serialized_patch(),
380                    );
381                };
382
383                old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
384            }
385            MutationMode::Insert | MutationMode::Replace => {
386                RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch())
387            }
388        }
389    }
390
391    // Resolve the "before" row according to one canonical save rule.
392    fn resolve_existing_row_for_rule(
393        ctx: &Context<'_, E>,
394        data_key: &DataKey,
395        save_rule: SaveRule,
396    ) -> Result<Option<RawRow>, InternalError> {
397        let raw_key = data_key.to_raw()?;
398
399        match save_rule {
400            SaveRule::RequireAbsent => {
401                if let Some(existing) = ctx.with_store(|store| store.get(&raw_key))? {
402                    Self::validate_existing_row_identity(data_key, &existing)?;
403                    return Err(ExecutorError::KeyExists(data_key.clone()).into());
404                }
405
406                Ok(None)
407            }
408            SaveRule::RequirePresent => {
409                let old_row = ctx
410                    .with_store(|store| store.get(&raw_key))?
411                    .ok_or_else(|| InternalError::store_not_found(data_key.to_string()))?;
412                Self::validate_existing_row_identity(data_key, &old_row)?;
413
414                Ok(Some(old_row))
415            }
416            SaveRule::AllowAny => {
417                let old_row = ctx.with_store(|store| store.get(&raw_key))?;
418                if let Some(old) = old_row.as_ref() {
419                    Self::validate_existing_row_identity(data_key, old)?;
420                }
421
422                Ok(old_row)
423            }
424        }
425    }
426
427    // Decode an existing row and verify it is consistent with the target data key.
428    fn validate_existing_row_identity(
429        data_key: &DataKey,
430        row: &RawRow,
431    ) -> Result<(), InternalError> {
432        Self::ensure_persisted_row_invariants(data_key, row).map_err(|err| {
433            match (err.class(), err.origin()) {
434                (
435                    crate::error::ErrorClass::Corruption,
436                    crate::error::ErrorOrigin::Serialize | crate::error::ErrorOrigin::Store,
437                ) => err,
438                _ => InternalError::from(ExecutorError::persisted_row_invariant_violation(
439                    data_key,
440                    &err.message,
441                )),
442            }
443        })?;
444
445        Ok(())
446    }
447
448    fn save_entity(&self, mode: SaveMode, entity: E) -> Result<E, InternalError> {
449        let mut entity = entity;
450        (|| {
451            let mut span = Span::<E>::new(ExecKind::Save);
452            let ctx = mutation_write_context::<E>(&self.db)?;
453            let save_rule = SaveRule::from_mode(mode);
454
455            // Run the canonical save preflight before key extraction.
456            self.preflight_entity(&mut entity)?;
457            let mutation = MutationInput::from_entity(&entity)?;
458
459            let (marker_row_op, _data_key) =
460                Self::prepare_logical_row_op(&ctx, save_rule, &mutation)?;
461
462            // Stage-2 commit protocol:
463            // - preflight row-op preparation before persisting the marker
464            // - then apply prepared row ops mechanically.
465            // Durable correctness is marker + recovery owned. Apply guard rollback
466            // here is best-effort, in-process cleanup only.
467            Self::commit_single_row(&self.db, marker_row_op, &mut span)?;
468
469            Ok(entity)
470        })()
471    }
472
473    // Run one structural key + patch mutation under one explicit save-mode contract.
474    #[allow(dead_code)]
475    pub(in crate::db) fn apply_structural_mutation(
476        &self,
477        mode: MutationMode,
478        key: E::Key,
479        patch: UpdatePatch,
480    ) -> Result<E, InternalError> {
481        let mutation = MutationInput::from_update_patch::<E>(key, &patch)?;
482
483        self.save_structural_mutation(mode, mutation)
484    }
485
486    #[allow(dead_code)]
487    fn save_structural_mutation(
488        &self,
489        mode: MutationMode,
490        mutation: MutationInput,
491    ) -> Result<E, InternalError> {
492        let mut span = Span::<E>::new(ExecKind::Save);
493        let ctx = mutation_write_context::<E>(&self.db)?;
494        let data_key = mutation.data_key().clone();
495        let old_raw = Self::resolve_existing_row_for_rule(&ctx, &data_key, mode.save_rule())?;
496        let raw_after_image =
497            Self::build_structural_after_image_row(mode, &mutation, old_raw.as_ref())?;
498        let entity = self.validate_structural_after_image(&data_key, &raw_after_image)?;
499        let normalized_mutation = MutationInput::from_entity(&entity)?;
500        let row =
501            Self::build_structural_after_image_row(mode, &normalized_mutation, old_raw.as_ref())?;
502        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
503        let marker_row_op = CommitRowOp::new(
504            E::PATH,
505            data_key.to_raw()?.as_bytes().to_vec(),
506            old_raw.as_ref().map(|item| item.as_bytes().to_vec()),
507            Some(row.as_bytes().to_vec()),
508            schema_fingerprint,
509        );
510
511        Self::commit_single_row(&self.db, marker_row_op, &mut span)?;
512
513        Ok(entity)
514    }
515
516    // Validate one structurally patched after-image by decoding it against the
517    // target key and reusing the existing typed save preflight rules.
518    #[allow(dead_code)]
519    fn validate_structural_after_image(
520        &self,
521        data_key: &DataKey,
522        row: &RawRow,
523    ) -> Result<E, InternalError> {
524        let expected_key = data_key.try_key::<E>()?;
525        let mut entity = row.try_decode::<E>().map_err(|err| {
526            InternalError::mutation_structural_after_image_invalid(
527                E::PATH,
528                data_key,
529                err.to_string(),
530            )
531        })?;
532        let identity_key = entity.id().key();
533        if identity_key != expected_key {
534            let field_name = E::MODEL.primary_key().name();
535            let field_value = FieldValue::to_value(&identity_key);
536            let identity_value = FieldValue::to_value(&expected_key);
537
538            return Err(InternalError::mutation_entity_primary_key_mismatch(
539                E::PATH,
540                field_name,
541                &field_value,
542                &identity_value,
543            ));
544        }
545
546        self.preflight_entity(&mut entity)?;
547
548        Ok(entity)
549    }
550
551    // Open + apply commit mechanics for one logical row operation.
552    fn commit_single_row(
553        db: &Db<E::Canister>,
554        marker_row_op: CommitRowOp,
555        span: &mut Span<E>,
556    ) -> Result<(), InternalError> {
557        // FIRST STABLE WRITE: commit marker is persisted before any mutations.
558        commit_single_save_row_op_with_window::<E>(db, marker_row_op, "save_row_apply", || {
559            span.set_rows(1);
560        })?;
561
562        Ok(())
563    }
564
565    // Open + apply commit mechanics for an atomic staged row-op batch.
566    fn commit_atomic_batch(
567        db: &Db<E::Canister>,
568        marker_row_ops: Vec<CommitRowOp>,
569        span: &mut Span<E>,
570    ) -> Result<(), InternalError> {
571        let rows_touched = u64::try_from(marker_row_ops.len()).unwrap_or(u64::MAX);
572        commit_save_row_ops_with_window::<E>(
573            db,
574            marker_row_ops,
575            "save_batch_atomic_row_apply",
576            || {
577                span.set_rows(rows_touched);
578            },
579        )?;
580
581        Ok(())
582    }
583}