Skip to main content

icydb_core/db/executor/mutation/
save.rs

1//! Module: executor::mutation::save
2//! Responsibility: save-mode execution (`insert`/`update`/`replace`) and batch lanes.
3//! Does not own: relation-domain validation semantics or commit marker protocol internals.
4//! Boundary: save preflight + commit-window handoff for one entity type.
5
6use crate::{
7    db::{
8        Db,
9        commit::CommitRowOp,
10        data::{DataKey, PersistedRow, RawRow, UpdatePatch, decode_raw_row_for_entity_key},
11        executor::{
12            Context, ExecutorError,
13            mutation::{
14                StructuralMutationInput, commit_save_row_ops_with_window, mutation_write_context,
15            },
16        },
17        schema::commit_schema_fingerprint_for_entity,
18    },
19    error::InternalError,
20    metrics::sink::{ExecKind, MetricsEvent, Span, record},
21    traits::{EntityValue, FieldValue},
22};
23use candid::CandidType;
24use derive_more::Display;
25use serde::{Deserialize, Serialize};
26use std::collections::BTreeSet;
27
28// Debug assertions below are diagnostic sentinels; correctness is enforced by
29// runtime validation earlier in the pipeline.
30
31///
32/// SaveMode
33///
34/// Create  : will only insert a row if it's empty
35/// Replace : will change the row regardless of what was there
36/// Update  : will only change an existing row
37///
38
39#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Display, Serialize)]
40enum SaveMode {
41    #[default]
42    Insert,
43    Replace,
44    Update,
45}
46
47///
48/// SaveExecutor
49///
50
51#[derive(Clone, Copy)]
52pub(in crate::db) struct SaveExecutor<E: PersistedRow + EntityValue> {
53    pub(in crate::db::executor::mutation) db: Db<E::Canister>,
54}
55
56///
57/// SaveRule
58///
59/// Canonical save precondition for resolving the current row baseline.
60///
61#[derive(Clone, Copy)]
62enum SaveRule {
63    RequireAbsent,
64    RequirePresent,
65    AllowAny,
66}
67
68impl SaveRule {
69    const fn from_mode(mode: SaveMode) -> Self {
70        match mode {
71            SaveMode::Insert => Self::RequireAbsent,
72            SaveMode::Update => Self::RequirePresent,
73            SaveMode::Replace => Self::AllowAny,
74        }
75    }
76}
77
78///
79/// StructuralMutationMode
80///
81/// StructuralMutationMode
82///
83/// StructuralMutationMode makes the structural patch path spell out the same
84/// row-existence contract the typed save surface already owns.
85/// This keeps future structural callers from smuggling write-mode meaning
86/// through ad hoc helper choice once the seam moves beyond `icydb-core`.
87///
88
89#[derive(Clone, Copy)]
90pub enum StructuralMutationMode {
91    #[allow(dead_code)]
92    Insert,
93    #[allow(dead_code)]
94    Replace,
95    Update,
96}
97
98impl StructuralMutationMode {
99    const fn save_rule(self) -> SaveRule {
100        match self {
101            Self::Insert => SaveRule::RequireAbsent,
102            Self::Replace => SaveRule::AllowAny,
103            Self::Update => SaveRule::RequirePresent,
104        }
105    }
106}
107
108impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
109    // ======================================================================
110    // Construction & configuration
111    // ======================================================================
112
113    /// Construct one save executor bound to a database handle.
114    #[must_use]
115    pub(in crate::db) const fn new(db: Db<E::Canister>, _debug: bool) -> Self {
116        Self { db }
117    }
118
119    // ======================================================================
120    // Single-entity save operations
121    // ======================================================================
122
123    /// Insert a brand-new entity (errors if the key already exists).
124    pub(crate) fn insert(&self, entity: E) -> Result<E, InternalError> {
125        self.save_entity(SaveMode::Insert, entity)
126    }
127
128    /// Update an existing entity (errors if it does not exist).
129    pub(crate) fn update(&self, entity: E) -> Result<E, InternalError> {
130        self.save_entity(SaveMode::Update, entity)
131    }
132
133    /// Apply one structural field patch to an existing entity row.
134    ///
135    /// This entrypoint is intentionally staged ahead of the higher-level API
136    /// layer so the executor boundary can lock its invariants first.
137    #[allow(dead_code)]
138    pub(in crate::db) fn insert_structural(
139        &self,
140        key: E::Key,
141        patch: UpdatePatch,
142    ) -> Result<E, InternalError> {
143        self.apply_structural_mutation(StructuralMutationMode::Insert, key, patch)
144    }
145
146    /// Apply one structural full-row replacement, inserting if missing.
147    ///
148    /// Replace semantics deliberately rebuild the after-image from an empty row
149    /// layout so absent fields do not inherit old-row values implicitly.
150    #[allow(dead_code)]
151    pub(in crate::db) fn replace_structural(
152        &self,
153        key: E::Key,
154        patch: UpdatePatch,
155    ) -> Result<E, InternalError> {
156        self.apply_structural_mutation(StructuralMutationMode::Replace, key, patch)
157    }
158
159    /// Apply one structural field patch to an existing entity row.
160    ///
161    /// This entrypoint is intentionally staged ahead of the higher-level API
162    /// layer so the executor boundary can lock its invariants first.
163    #[allow(dead_code)]
164    pub(in crate::db) fn update_structural(
165        &self,
166        key: E::Key,
167        patch: UpdatePatch,
168    ) -> Result<E, InternalError> {
169        self.apply_structural_mutation(StructuralMutationMode::Update, key, patch)
170    }
171
172    /// Replace an entity, inserting if missing.
173    pub(crate) fn replace(&self, entity: E) -> Result<E, InternalError> {
174        self.save_entity(SaveMode::Replace, entity)
175    }
176
177    // ======================================================================
178    // Batch save operations (explicit atomic and non-atomic lanes)
179    // ======================================================================
180
181    /// Save a batch with explicitly non-atomic semantics.
182    ///
183    /// WARNING: this helper is fail-fast and non-atomic. If one element fails,
184    /// earlier elements in the batch remain committed.
185    fn save_batch_non_atomic(
186        &self,
187        mode: SaveMode,
188        entities: impl IntoIterator<Item = E>,
189    ) -> Result<Vec<E>, InternalError> {
190        let iter = entities.into_iter();
191        let mut out = Vec::with_capacity(iter.size_hint().0);
192
193        for entity in iter {
194            match self.save_entity(mode, entity) {
195                Ok(saved) => out.push(saved),
196                Err(err) => {
197                    if !out.is_empty() {
198                        record(MetricsEvent::NonAtomicPartialCommit {
199                            entity_path: E::PATH,
200                            committed_rows: u64::try_from(out.len()).unwrap_or(u64::MAX),
201                        });
202                    }
203
204                    return Err(err);
205                }
206            }
207        }
208
209        Ok(out)
210    }
211
212    /// Save a single-entity-type batch atomically in a single commit window.
213    ///
214    /// All entities are prevalidated first; if any entity fails pre-commit validation,
215    /// no row in this batch is persisted.
216    ///
217    /// This is not a multi-entity transaction surface.
218    fn save_batch_atomic(
219        &self,
220        mode: SaveMode,
221        entities: impl IntoIterator<Item = E>,
222    ) -> Result<Vec<E>, InternalError> {
223        let entities: Vec<E> = entities.into_iter().collect();
224
225        self.save_batch_atomic_impl(SaveRule::from_mode(mode), entities)
226    }
227
228    /// Insert a single-entity-type batch atomically in one commit window.
229    ///
230    /// This API is not a multi-entity transaction surface.
231    pub(crate) fn insert_many_atomic(
232        &self,
233        entities: impl IntoIterator<Item = E>,
234    ) -> Result<Vec<E>, InternalError> {
235        self.save_batch_atomic(SaveMode::Insert, entities)
236    }
237
238    /// Update a single-entity-type batch atomically in one commit window.
239    ///
240    /// This API is not a multi-entity transaction surface.
241    pub(crate) fn update_many_atomic(
242        &self,
243        entities: impl IntoIterator<Item = E>,
244    ) -> Result<Vec<E>, InternalError> {
245        self.save_batch_atomic(SaveMode::Update, entities)
246    }
247
248    /// Replace a single-entity-type batch atomically in one commit window.
249    ///
250    /// This API is not a multi-entity transaction surface.
251    pub(crate) fn replace_many_atomic(
252        &self,
253        entities: impl IntoIterator<Item = E>,
254    ) -> Result<Vec<E>, InternalError> {
255        self.save_batch_atomic(SaveMode::Replace, entities)
256    }
257
258    /// Insert a batch with explicitly non-atomic semantics.
259    ///
260    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
261    pub(crate) fn insert_many_non_atomic(
262        &self,
263        entities: impl IntoIterator<Item = E>,
264    ) -> Result<Vec<E>, InternalError> {
265        self.save_batch_non_atomic(SaveMode::Insert, entities)
266    }
267
268    /// Update a batch with explicitly non-atomic semantics.
269    ///
270    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
271    pub(crate) fn update_many_non_atomic(
272        &self,
273        entities: impl IntoIterator<Item = E>,
274    ) -> Result<Vec<E>, InternalError> {
275        self.save_batch_non_atomic(SaveMode::Update, entities)
276    }
277
278    /// Replace a batch with explicitly non-atomic semantics.
279    ///
280    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
281    pub(crate) fn replace_many_non_atomic(
282        &self,
283        entities: impl IntoIterator<Item = E>,
284    ) -> Result<Vec<E>, InternalError> {
285        self.save_batch_non_atomic(SaveMode::Replace, entities)
286    }
287
288    // Keep the atomic batch body out of the iterator-generic wrapper so mode
289    // wrappers do not each own one copy of the full staging pipeline.
290    #[inline(never)]
291    fn save_batch_atomic_impl(
292        &self,
293        save_rule: SaveRule,
294        entities: Vec<E>,
295    ) -> Result<Vec<E>, InternalError> {
296        // Phase 1: validate + stage all row ops before opening the commit window.
297        let mut span = Span::<E>::new(ExecKind::Save);
298        let ctx = mutation_write_context::<E>(&self.db)?;
299        let mut out = Vec::with_capacity(entities.len());
300        let mut marker_row_ops = Vec::with_capacity(entities.len());
301        let mut seen_row_keys = BTreeSet::<Vec<u8>>::new();
302
303        // Validate and stage all row ops before opening the commit window.
304        for mut entity in entities {
305            self.preflight_entity(&mut entity)?;
306            let mutation = StructuralMutationInput::from_entity(&entity)?;
307
308            let (marker_row_op, data_key) =
309                Self::prepare_logical_row_op(&ctx, save_rule, &mutation)?;
310            if !seen_row_keys.insert(marker_row_op.key.clone()) {
311                return Err(InternalError::mutation_atomic_save_duplicate_key(
312                    E::PATH,
313                    data_key,
314                ));
315            }
316            marker_row_ops.push(marker_row_op);
317            out.push(entity);
318        }
319
320        if marker_row_ops.is_empty() {
321            return Ok(out);
322        }
323
324        // Phase 2: enter commit window and apply staged row ops atomically.
325        Self::commit_atomic_batch(&self.db, marker_row_ops, &mut span)?;
326
327        Ok(out)
328    }
329
330    // Build one logical row operation from the save rule and current entity.
331    fn prepare_logical_row_op(
332        ctx: &Context<'_, E>,
333        save_rule: SaveRule,
334        mutation: &StructuralMutationInput,
335    ) -> Result<(CommitRowOp, DataKey), InternalError> {
336        // Phase 1: resolve key + current-store baseline from the canonical save rule.
337        let data_key = mutation.data_key().clone();
338        let raw_key = data_key.to_raw()?;
339        let old_raw = Self::resolve_existing_row_for_rule(ctx, &data_key, save_rule)?;
340        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
341
342        // Phase 2: build the after-image through the structural row boundary.
343        let row = Self::build_after_image_row(mutation, old_raw.as_ref())?;
344        let row_op = CommitRowOp::new(
345            E::PATH,
346            raw_key.as_bytes().to_vec(),
347            old_raw.as_ref().map(|item| item.as_bytes().to_vec()),
348            Some(row.as_bytes().to_vec()),
349            schema_fingerprint,
350        );
351
352        Ok((row_op, data_key))
353    }
354
355    // Build the persisted after-image under one explicit structural mode.
356    fn build_after_image_row(
357        mutation: &StructuralMutationInput,
358        old_row: Option<&RawRow>,
359    ) -> Result<RawRow, InternalError> {
360        let Some(old_row) = old_row else {
361            return RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch());
362        };
363
364        old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
365    }
366
367    // Build the persisted after-image under one explicit structural mode.
368    fn build_structural_after_image_row(
369        mode: StructuralMutationMode,
370        mutation: &StructuralMutationInput,
371        old_row: Option<&RawRow>,
372    ) -> Result<RawRow, InternalError> {
373        match mode {
374            StructuralMutationMode::Update => {
375                let Some(old_row) = old_row else {
376                    return RawRow::from_serialized_update_patch(
377                        E::MODEL,
378                        mutation.serialized_patch(),
379                    );
380                };
381
382                old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
383            }
384            StructuralMutationMode::Insert | StructuralMutationMode::Replace => {
385                RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch())
386            }
387        }
388    }
389
390    // Resolve the "before" row according to one canonical save rule.
391    fn resolve_existing_row_for_rule(
392        ctx: &Context<'_, E>,
393        data_key: &DataKey,
394        save_rule: SaveRule,
395    ) -> Result<Option<RawRow>, InternalError> {
396        let raw_key = data_key.to_raw()?;
397
398        match save_rule {
399            SaveRule::RequireAbsent => {
400                if let Some(existing) = ctx.with_store(|store| store.get(&raw_key))? {
401                    Self::validate_existing_row_identity(data_key, &existing)?;
402                    return Err(ExecutorError::KeyExists(data_key.clone()).into());
403                }
404
405                Ok(None)
406            }
407            SaveRule::RequirePresent => {
408                let old_row = ctx
409                    .with_store(|store| store.get(&raw_key))?
410                    .ok_or_else(|| InternalError::store_not_found(data_key.to_string()))?;
411                Self::validate_existing_row_identity(data_key, &old_row)?;
412
413                Ok(Some(old_row))
414            }
415            SaveRule::AllowAny => {
416                let old_row = ctx.with_store(|store| store.get(&raw_key))?;
417                if let Some(old) = old_row.as_ref() {
418                    Self::validate_existing_row_identity(data_key, old)?;
419                }
420
421                Ok(old_row)
422            }
423        }
424    }
425
426    // Decode an existing row and verify it is consistent with the target data key.
427    fn validate_existing_row_identity(
428        data_key: &DataKey,
429        row: &RawRow,
430    ) -> Result<(), InternalError> {
431        let (_, decoded) = decode_raw_row_for_entity_key::<E>(data_key, row)?;
432        Self::ensure_entity_invariants(&decoded).map_err(|err| {
433            InternalError::from(ExecutorError::persisted_row_invariant_violation(
434                data_key,
435                &err.message,
436            ))
437        })?;
438
439        Ok(())
440    }
441
442    fn save_entity(&self, mode: SaveMode, entity: E) -> Result<E, InternalError> {
443        let mut entity = entity;
444        (|| {
445            let mut span = Span::<E>::new(ExecKind::Save);
446            let ctx = mutation_write_context::<E>(&self.db)?;
447            let save_rule = SaveRule::from_mode(mode);
448
449            // Run the canonical save preflight before key extraction.
450            self.preflight_entity(&mut entity)?;
451            let mutation = StructuralMutationInput::from_entity(&entity)?;
452
453            let (marker_row_op, _data_key) =
454                Self::prepare_logical_row_op(&ctx, save_rule, &mutation)?;
455
456            // Preflight data store availability before index mutations.
457            ctx.with_store(|_| ())?;
458
459            // Stage-2 commit protocol:
460            // - preflight row-op preparation before persisting the marker
461            // - then apply prepared row ops mechanically.
462            // Durable correctness is marker + recovery owned. Apply guard rollback
463            // here is best-effort, in-process cleanup only.
464            Self::commit_single_row(&self.db, marker_row_op, &mut span)?;
465
466            Ok(entity)
467        })()
468    }
469
470    // Run one structural key + patch mutation under one explicit save-mode contract.
471    #[allow(dead_code)]
472    pub(in crate::db) fn apply_structural_mutation(
473        &self,
474        mode: StructuralMutationMode,
475        key: E::Key,
476        patch: UpdatePatch,
477    ) -> Result<E, InternalError> {
478        let mutation = StructuralMutationInput::from_update_patch::<E>(key, &patch)?;
479
480        self.save_structural_mutation(mode, mutation)
481    }
482
483    #[allow(dead_code)]
484    fn save_structural_mutation(
485        &self,
486        mode: StructuralMutationMode,
487        mutation: StructuralMutationInput,
488    ) -> Result<E, InternalError> {
489        let mut span = Span::<E>::new(ExecKind::Save);
490        let ctx = mutation_write_context::<E>(&self.db)?;
491        let data_key = mutation.data_key().clone();
492        let old_raw = Self::resolve_existing_row_for_rule(&ctx, &data_key, mode.save_rule())?;
493        let raw_after_image =
494            Self::build_structural_after_image_row(mode, &mutation, old_raw.as_ref())?;
495        let entity = self.validate_structural_after_image(&data_key, &raw_after_image)?;
496        let normalized_mutation = StructuralMutationInput::from_entity(&entity)?;
497        let row =
498            Self::build_structural_after_image_row(mode, &normalized_mutation, old_raw.as_ref())?;
499        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
500        let marker_row_op = CommitRowOp::new(
501            E::PATH,
502            data_key.to_raw()?.as_bytes().to_vec(),
503            old_raw.as_ref().map(|item| item.as_bytes().to_vec()),
504            Some(row.as_bytes().to_vec()),
505            schema_fingerprint,
506        );
507
508        ctx.with_store(|_| ())?;
509        Self::commit_single_row(&self.db, marker_row_op, &mut span)?;
510
511        Ok(entity)
512    }
513
514    // Validate one structurally patched after-image by decoding it against the
515    // target key and reusing the existing typed save preflight rules.
516    #[allow(dead_code)]
517    fn validate_structural_after_image(
518        &self,
519        data_key: &DataKey,
520        row: &RawRow,
521    ) -> Result<E, InternalError> {
522        let expected_key = data_key.try_key::<E>()?;
523        let mut entity = row.try_decode::<E>().map_err(|err| {
524            InternalError::mutation_structural_after_image_invalid(
525                E::PATH,
526                data_key,
527                err.to_string(),
528            )
529        })?;
530        let identity_key = entity.id().key();
531        if identity_key != expected_key {
532            let field_name = E::MODEL.primary_key().name();
533            let field_value = FieldValue::to_value(&identity_key);
534            let identity_value = FieldValue::to_value(&expected_key);
535
536            return Err(InternalError::mutation_entity_primary_key_mismatch(
537                E::PATH,
538                field_name,
539                &field_value,
540                &identity_value,
541            ));
542        }
543
544        self.preflight_entity(&mut entity)?;
545
546        Ok(entity)
547    }
548
549    // Open + apply commit mechanics for one logical row operation.
550    fn commit_single_row(
551        db: &Db<E::Canister>,
552        marker_row_op: CommitRowOp,
553        span: &mut Span<E>,
554    ) -> Result<(), InternalError> {
555        // FIRST STABLE WRITE: commit marker is persisted before any mutations.
556        commit_save_row_ops_with_window::<E>(db, vec![marker_row_op], "save_row_apply", || {
557            span.set_rows(1);
558        })?;
559
560        Ok(())
561    }
562
563    // Open + apply commit mechanics for an atomic staged row-op batch.
564    fn commit_atomic_batch(
565        db: &Db<E::Canister>,
566        marker_row_ops: Vec<CommitRowOp>,
567        span: &mut Span<E>,
568    ) -> Result<(), InternalError> {
569        let rows_touched = u64::try_from(marker_row_ops.len()).unwrap_or(u64::MAX);
570        commit_save_row_ops_with_window::<E>(
571            db,
572            marker_row_ops,
573            "save_batch_atomic_row_apply",
574            || {
575                span.set_rows(rows_touched);
576            },
577        )?;
578
579        Ok(())
580    }
581}