Skip to main content

icydb_core/db/executor/mutation/
save.rs

1//! Module: executor::mutation::save
2//! Responsibility: save-mode execution (`insert`/`update`/`replace`) and batch lanes.
3//! Does not own: relation-domain validation semantics or commit marker protocol internals.
4//! Boundary: save preflight + commit-window handoff for one entity type.
5
6use crate::{
7    db::{
8        Db,
9        commit::{
10            CommitRowOp, CommitSchemaFingerprint,
11            prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint,
12        },
13        data::{CanonicalRow, DataKey, PersistedRow, RawRow, UpdatePatch},
14        executor::{
15            Context, ExecutorError,
16            mutation::{
17                MutationInput, PreparedRowOpDelta, commit_prepared_single_save_row_op_with_window,
18                commit_save_row_ops_with_window, emit_index_delta_metrics, mutation_write_context,
19                synchronized_store_handles_for_prepared_row_ops,
20            },
21        },
22        relation::model_has_strong_relation_targets,
23        schema::{SchemaInfo, commit_schema_fingerprint_for_entity},
24    },
25    error::InternalError,
26    metrics::sink::{ExecKind, MetricsEvent, Span, record},
27    traits::{EntityValue, FieldValue, Storable},
28};
29use candid::CandidType;
30use derive_more::Display;
31use serde::{Deserialize, Serialize};
32use std::collections::HashSet;
33
34// Debug assertions below are diagnostic sentinels; correctness is enforced by
35// runtime validation earlier in the pipeline.
36
37//
38// SaveMode
39//
40// Create  : will only insert a row if it's empty
41// Replace : will change the row regardless of what was there
42// Update  : will only change an existing row
43//
44
45#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Display, Serialize)]
46enum SaveMode {
47    #[default]
48    Insert,
49    Replace,
50    Update,
51}
52
53//
54// SaveExecutor
55//
56
57#[derive(Clone, Copy)]
58pub(in crate::db) struct SaveExecutor<E: PersistedRow + EntityValue> {
59    pub(in crate::db::executor::mutation) db: Db<E::Canister>,
60}
61
62//
63// SaveRule
64//
65// Canonical save precondition for resolving the current row baseline.
66//
67#[derive(Clone, Copy)]
68enum SaveRule {
69    RequireAbsent,
70    RequirePresent,
71    AllowAny,
72}
73
74impl SaveRule {
75    const fn from_mode(mode: SaveMode) -> Self {
76        match mode {
77            SaveMode::Insert => Self::RequireAbsent,
78            SaveMode::Update => Self::RequirePresent,
79            SaveMode::Replace => Self::AllowAny,
80        }
81    }
82}
83
84//
85// MutationMode
86//
87// MutationMode
88//
89// MutationMode makes the structural patch path spell out the same
90// row-existence contract the typed save surface already owns.
91// This keeps future structural callers from smuggling write-mode meaning
92// through ad hoc helper choice once the seam moves beyond `icydb-core`.
93//
94
95#[derive(Clone, Copy)]
96pub enum MutationMode {
97    #[allow(dead_code)]
98    Insert,
99    #[allow(dead_code)]
100    Replace,
101    Update,
102}
103
104impl MutationMode {
105    const fn save_rule(self) -> SaveRule {
106        match self {
107            Self::Insert => SaveRule::RequireAbsent,
108            Self::Replace => SaveRule::AllowAny,
109            Self::Update => SaveRule::RequirePresent,
110        }
111    }
112}
113
114impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
115    // ======================================================================
116    // Construction & configuration
117    // ======================================================================
118
119    /// Construct one save executor bound to a database handle.
120    #[must_use]
121    pub(in crate::db) const fn new(db: Db<E::Canister>, _debug: bool) -> Self {
122        Self { db }
123    }
124
125    // ======================================================================
126    // Single-entity save operations
127    // ======================================================================
128
129    /// Insert a brand-new entity (errors if the key already exists).
130    pub(crate) fn insert(&self, entity: E) -> Result<E, InternalError> {
131        self.save_entity(SaveMode::Insert, entity)
132    }
133
134    /// Update an existing entity (errors if it does not exist).
135    pub(crate) fn update(&self, entity: E) -> Result<E, InternalError> {
136        self.save_entity(SaveMode::Update, entity)
137    }
138
139    /// Apply one structural field patch to an existing entity row.
140    ///
141    /// This entrypoint is intentionally staged ahead of the higher-level API
142    /// layer so the executor boundary can lock its invariants first.
143    #[allow(dead_code)]
144    pub(in crate::db) fn insert_structural(
145        &self,
146        key: E::Key,
147        patch: UpdatePatch,
148    ) -> Result<E, InternalError> {
149        self.apply_structural_mutation(MutationMode::Insert, key, patch)
150    }
151
152    /// Apply one structural full-row replacement, inserting if missing.
153    ///
154    /// Replace semantics deliberately rebuild the after-image from an empty row
155    /// layout so absent fields do not inherit old-row values implicitly.
156    #[allow(dead_code)]
157    pub(in crate::db) fn replace_structural(
158        &self,
159        key: E::Key,
160        patch: UpdatePatch,
161    ) -> Result<E, InternalError> {
162        self.apply_structural_mutation(MutationMode::Replace, key, patch)
163    }
164
165    /// Apply one structural field patch to an existing entity row.
166    ///
167    /// This entrypoint is intentionally staged ahead of the higher-level API
168    /// layer so the executor boundary can lock its invariants first.
169    #[allow(dead_code)]
170    pub(in crate::db) fn update_structural(
171        &self,
172        key: E::Key,
173        patch: UpdatePatch,
174    ) -> Result<E, InternalError> {
175        self.apply_structural_mutation(MutationMode::Update, key, patch)
176    }
177
178    /// Replace an entity, inserting if missing.
179    pub(crate) fn replace(&self, entity: E) -> Result<E, InternalError> {
180        self.save_entity(SaveMode::Replace, entity)
181    }
182
183    // ======================================================================
184    // Batch save operations (explicit atomic and non-atomic lanes)
185    // ======================================================================
186
187    /// Save a batch with explicitly non-atomic semantics.
188    ///
189    /// WARNING: this helper is fail-fast and non-atomic. If one element fails,
190    /// earlier elements in the batch remain committed.
191    fn save_batch_non_atomic(
192        &self,
193        mode: SaveMode,
194        entities: impl IntoIterator<Item = E>,
195    ) -> Result<Vec<E>, InternalError> {
196        let iter = entities.into_iter();
197        let mut out = Vec::with_capacity(iter.size_hint().0);
198        let ctx = mutation_write_context::<E>(&self.db)?;
199        let save_rule = SaveRule::from_mode(mode);
200        let schema = Self::schema_info()?;
201        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
202        let validate_relations = model_has_strong_relation_targets(E::MODEL);
203        let mut batch_span = None;
204        let mut batch_delta = PreparedRowOpDelta {
205            index_inserts: 0,
206            index_removes: 0,
207            reverse_index_inserts: 0,
208            reverse_index_removes: 0,
209        };
210
211        // Phase 1: apply each row independently while reusing the same resolved
212        // mutation context and schema metadata across the whole batch.
213        for entity in iter {
214            let span = batch_span.get_or_insert_with(|| Span::<E>::new(ExecKind::Save));
215
216            let result = (|| {
217                let (saved, marker_row_op) = self.prepare_entity_save_row_op(
218                    &ctx,
219                    save_rule,
220                    schema,
221                    schema_fingerprint,
222                    validate_relations,
223                    entity,
224                )?;
225                let prepared_row_op =
226                    prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<
227                        E,
228                    >(&self.db, &marker_row_op, &ctx, &ctx, schema_fingerprint)?;
229                Self::commit_prepared_single_row(
230                    &self.db,
231                    marker_row_op,
232                    prepared_row_op,
233                    |delta| accumulate_prepared_row_op_delta(&mut batch_delta, delta),
234                    || {},
235                )?;
236
237                Ok::<_, InternalError>(saved)
238            })();
239
240            match result {
241                Ok(saved) => {
242                    out.push(saved);
243                    span.set_rows(u64::try_from(out.len()).unwrap_or(u64::MAX));
244                }
245                Err(err) => {
246                    if !out.is_empty() {
247                        emit_index_delta_metrics::<E>(&batch_delta);
248                        record(MetricsEvent::NonAtomicPartialCommit {
249                            entity_path: E::PATH,
250                            committed_rows: u64::try_from(out.len()).unwrap_or(u64::MAX),
251                        });
252                    }
253
254                    return Err(err);
255                }
256            }
257        }
258
259        if !out.is_empty() {
260            emit_index_delta_metrics::<E>(&batch_delta);
261        }
262
263        Ok(out)
264    }
265
266    /// Save a single-entity-type batch atomically in a single commit window.
267    ///
268    /// All entities are prevalidated first; if any entity fails pre-commit validation,
269    /// no row in this batch is persisted.
270    ///
271    /// This is not a multi-entity transaction surface.
272    fn save_batch_atomic(
273        &self,
274        mode: SaveMode,
275        entities: impl IntoIterator<Item = E>,
276    ) -> Result<Vec<E>, InternalError> {
277        let entities: Vec<E> = entities.into_iter().collect();
278
279        self.save_batch_atomic_impl(SaveRule::from_mode(mode), entities)
280    }
281
282    /// Insert a single-entity-type batch atomically in one commit window.
283    ///
284    /// This API is not a multi-entity transaction surface.
285    pub(crate) fn insert_many_atomic(
286        &self,
287        entities: impl IntoIterator<Item = E>,
288    ) -> Result<Vec<E>, InternalError> {
289        self.save_batch_atomic(SaveMode::Insert, entities)
290    }
291
292    /// Update a single-entity-type batch atomically in one commit window.
293    ///
294    /// This API is not a multi-entity transaction surface.
295    pub(crate) fn update_many_atomic(
296        &self,
297        entities: impl IntoIterator<Item = E>,
298    ) -> Result<Vec<E>, InternalError> {
299        self.save_batch_atomic(SaveMode::Update, entities)
300    }
301
302    /// Replace a single-entity-type batch atomically in one commit window.
303    ///
304    /// This API is not a multi-entity transaction surface.
305    pub(crate) fn replace_many_atomic(
306        &self,
307        entities: impl IntoIterator<Item = E>,
308    ) -> Result<Vec<E>, InternalError> {
309        self.save_batch_atomic(SaveMode::Replace, entities)
310    }
311
312    /// Insert a batch with explicitly non-atomic semantics.
313    ///
314    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
315    pub(crate) fn insert_many_non_atomic(
316        &self,
317        entities: impl IntoIterator<Item = E>,
318    ) -> Result<Vec<E>, InternalError> {
319        self.save_batch_non_atomic(SaveMode::Insert, entities)
320    }
321
322    /// Update a batch with explicitly non-atomic semantics.
323    ///
324    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
325    pub(crate) fn update_many_non_atomic(
326        &self,
327        entities: impl IntoIterator<Item = E>,
328    ) -> Result<Vec<E>, InternalError> {
329        self.save_batch_non_atomic(SaveMode::Update, entities)
330    }
331
332    /// Replace a batch with explicitly non-atomic semantics.
333    ///
334    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
335    pub(crate) fn replace_many_non_atomic(
336        &self,
337        entities: impl IntoIterator<Item = E>,
338    ) -> Result<Vec<E>, InternalError> {
339        self.save_batch_non_atomic(SaveMode::Replace, entities)
340    }
341
342    // Keep the atomic batch body out of the iterator-generic wrapper so mode
343    // wrappers do not each own one copy of the full staging pipeline.
344    #[inline(never)]
345    fn save_batch_atomic_impl(
346        &self,
347        save_rule: SaveRule,
348        entities: Vec<E>,
349    ) -> Result<Vec<E>, InternalError> {
350        // Phase 1: validate + stage all row ops before opening the commit window.
351        let mut span = Span::<E>::new(ExecKind::Save);
352        let ctx = mutation_write_context::<E>(&self.db)?;
353        let mut out = Vec::with_capacity(entities.len());
354        let mut marker_row_ops = Vec::with_capacity(entities.len());
355        let mut seen_row_keys = HashSet::with_capacity(entities.len());
356        let schema = Self::schema_info()?;
357        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
358        let validate_relations = model_has_strong_relation_targets(E::MODEL);
359
360        // Validate and stage all row ops before opening the commit window.
361        for mut entity in entities {
362            self.preflight_entity_with_cached_schema(&mut entity, schema, validate_relations)?;
363            let marker_row_op =
364                Self::prepare_typed_entity_row_op(&ctx, save_rule, &entity, schema_fingerprint)?;
365            if !seen_row_keys.insert(marker_row_op.key) {
366                let data_key = DataKey::try_new::<E>(entity.id().key())?;
367                return Err(InternalError::mutation_atomic_save_duplicate_key(
368                    E::PATH,
369                    data_key,
370                ));
371            }
372            marker_row_ops.push(marker_row_op);
373            out.push(entity);
374        }
375
376        if marker_row_ops.is_empty() {
377            return Ok(out);
378        }
379
380        // Phase 2: enter commit window and apply staged row ops atomically.
381        Self::commit_atomic_batch(&self.db, marker_row_ops, &mut span)?;
382
383        Ok(out)
384    }
385
386    // Build one logical row operation from a full typed after-image.
387    fn prepare_typed_entity_row_op(
388        ctx: &Context<'_, E>,
389        save_rule: SaveRule,
390        entity: &E,
391        schema_fingerprint: CommitSchemaFingerprint,
392    ) -> Result<CommitRowOp, InternalError> {
393        // Phase 1: resolve key + current-store baseline from the canonical save rule.
394        let data_key = DataKey::try_new::<E>(entity.id().key())?;
395        let raw_key = data_key.to_raw()?;
396        let old_raw = Self::resolve_existing_row_for_rule(ctx, &data_key, save_rule)?;
397
398        // Phase 2: typed save lanes already own a complete after-image, so
399        // emit the canonical row directly instead of replaying a dense slot
400        // patch back into the same full row image.
401        let row_bytes = CanonicalRow::from_entity(entity)?
402            .into_raw_row()
403            .into_bytes();
404        let before_bytes = old_raw.map(<RawRow as Storable>::into_bytes);
405        let row_op = CommitRowOp::new(
406            E::PATH,
407            raw_key,
408            before_bytes,
409            Some(row_bytes),
410            schema_fingerprint,
411        );
412
413        Ok(row_op)
414    }
415
416    // Build the persisted after-image under one explicit structural mode.
417    fn build_structural_after_image_row(
418        mode: MutationMode,
419        mutation: &MutationInput,
420        old_row: Option<&RawRow>,
421    ) -> Result<CanonicalRow, InternalError> {
422        match mode {
423            MutationMode::Update => {
424                let Some(old_row) = old_row else {
425                    return RawRow::from_serialized_update_patch(
426                        E::MODEL,
427                        mutation.serialized_patch(),
428                    );
429                };
430
431                old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
432            }
433            MutationMode::Insert | MutationMode::Replace => {
434                RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch())
435            }
436        }
437    }
438
439    // Resolve the "before" row according to one canonical save rule.
440    fn resolve_existing_row_for_rule(
441        ctx: &Context<'_, E>,
442        data_key: &DataKey,
443        save_rule: SaveRule,
444    ) -> Result<Option<RawRow>, InternalError> {
445        let raw_key = data_key.to_raw()?;
446
447        match save_rule {
448            SaveRule::RequireAbsent => {
449                if let Some(existing) = ctx.with_store(|store| store.get(&raw_key))? {
450                    Self::validate_existing_row_identity(data_key, &existing)?;
451                    return Err(ExecutorError::KeyExists(data_key.clone()).into());
452                }
453
454                Ok(None)
455            }
456            SaveRule::RequirePresent => {
457                let old_row = ctx
458                    .with_store(|store| store.get(&raw_key))?
459                    .ok_or_else(|| InternalError::store_not_found(data_key.to_string()))?;
460                Self::validate_existing_row_identity(data_key, &old_row)?;
461
462                Ok(Some(old_row))
463            }
464            SaveRule::AllowAny => {
465                let old_row = ctx.with_store(|store| store.get(&raw_key))?;
466                if let Some(old) = old_row.as_ref() {
467                    Self::validate_existing_row_identity(data_key, old)?;
468                }
469
470                Ok(old_row)
471            }
472        }
473    }
474
475    // Decode an existing row and verify it is consistent with the target data key.
476    fn validate_existing_row_identity(
477        data_key: &DataKey,
478        row: &RawRow,
479    ) -> Result<(), InternalError> {
480        Self::ensure_persisted_row_invariants(data_key, row).map_err(|err| {
481            match (err.class(), err.origin()) {
482                (
483                    crate::error::ErrorClass::Corruption,
484                    crate::error::ErrorOrigin::Serialize | crate::error::ErrorOrigin::Store,
485                ) => err,
486                _ => InternalError::from(ExecutorError::persisted_row_invariant_violation(
487                    data_key,
488                    &err.message,
489                )),
490            }
491        })?;
492
493        Ok(())
494    }
495
496    fn save_entity(&self, mode: SaveMode, entity: E) -> Result<E, InternalError> {
497        let ctx = mutation_write_context::<E>(&self.db)?;
498        let save_rule = SaveRule::from_mode(mode);
499
500        self.save_entity_with_context(&ctx, save_rule, entity)
501    }
502
503    // Run one typed save against an already-resolved write context so batch
504    // non-atomic lanes do not rebuild the same store authority for every row.
505    fn save_entity_with_context(
506        &self,
507        ctx: &Context<'_, E>,
508        save_rule: SaveRule,
509        entity: E,
510    ) -> Result<E, InternalError> {
511        let schema = Self::schema_info()?;
512        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
513        let validate_relations = model_has_strong_relation_targets(E::MODEL);
514        self.save_entity_with_context_and_schema(
515            ctx,
516            save_rule,
517            schema,
518            schema_fingerprint,
519            validate_relations,
520            entity,
521        )
522    }
523
524    // Run one typed save against an already-resolved write context and
525    // preflight schema metadata so batch lanes do not repay cache lookups.
526    fn save_entity_with_context_and_schema(
527        &self,
528        ctx: &Context<'_, E>,
529        save_rule: SaveRule,
530        schema: &SchemaInfo,
531        schema_fingerprint: CommitSchemaFingerprint,
532        validate_relations: bool,
533        entity: E,
534    ) -> Result<E, InternalError> {
535        let mut span = Span::<E>::new(ExecKind::Save);
536        let (entity, marker_row_op) = self.prepare_entity_save_row_op(
537            ctx,
538            save_rule,
539            schema,
540            schema_fingerprint,
541            validate_relations,
542            entity,
543        )?;
544        let prepared_row_op =
545            prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
546                &self.db,
547                &marker_row_op,
548                ctx,
549                ctx,
550                schema_fingerprint,
551            )?;
552
553        // Phase 1: persist/apply one single-row commit through the shared
554        // commit-window path under the normal single-save metrics contract.
555        Self::commit_prepared_single_row(
556            &self.db,
557            marker_row_op,
558            prepared_row_op,
559            |delta| emit_index_delta_metrics::<E>(delta),
560            || {
561                span.set_rows(1);
562            },
563        )?;
564
565        Ok(entity)
566    }
567
568    // Prepare one typed save row op after canonical entity preflight so both
569    // single-row and batched non-atomic lanes share the same validation path.
570    fn prepare_entity_save_row_op(
571        &self,
572        ctx: &Context<'_, E>,
573        save_rule: SaveRule,
574        schema: &SchemaInfo,
575        schema_fingerprint: CommitSchemaFingerprint,
576        validate_relations: bool,
577        entity: E,
578    ) -> Result<(E, CommitRowOp), InternalError> {
579        let mut entity = entity;
580
581        // Phase 1: run canonical save preflight before key extraction so
582        // typed validation still owns the write contract.
583        self.preflight_entity_with_cached_schema(&mut entity, schema, validate_relations)?;
584        let marker_row_op =
585            Self::prepare_typed_entity_row_op(ctx, save_rule, &entity, schema_fingerprint)?;
586
587        Ok((entity, marker_row_op))
588    }
589
590    // Run one structural key + patch mutation under one explicit save-mode contract.
591    #[allow(dead_code)]
592    pub(in crate::db) fn apply_structural_mutation(
593        &self,
594        mode: MutationMode,
595        key: E::Key,
596        patch: UpdatePatch,
597    ) -> Result<E, InternalError> {
598        let mutation = MutationInput::from_update_patch::<E>(key, &patch)?;
599
600        self.save_structural_mutation(mode, mutation)
601    }
602
603    #[allow(dead_code)]
604    fn save_structural_mutation(
605        &self,
606        mode: MutationMode,
607        mutation: MutationInput,
608    ) -> Result<E, InternalError> {
609        let mut span = Span::<E>::new(ExecKind::Save);
610        let ctx = mutation_write_context::<E>(&self.db)?;
611        let data_key = mutation.data_key().clone();
612        let old_raw = Self::resolve_existing_row_for_rule(&ctx, &data_key, mode.save_rule())?;
613        let raw_after_image =
614            Self::build_structural_after_image_row(mode, &mutation, old_raw.as_ref())?;
615        let entity = self.validate_structural_after_image(&data_key, &raw_after_image)?;
616        let normalized_mutation = MutationInput::from_entity(&entity)?;
617        let row_bytes =
618            Self::build_structural_after_image_row(mode, &normalized_mutation, old_raw.as_ref())?;
619        let row_bytes = row_bytes.into_raw_row().into_bytes();
620        let before_bytes = old_raw.map(<RawRow as Storable>::into_bytes);
621        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
622        let marker_row_op = CommitRowOp::new(
623            E::PATH,
624            data_key.to_raw()?,
625            before_bytes,
626            Some(row_bytes),
627            schema_fingerprint,
628        );
629        let prepared_row_op =
630            prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
631                &self.db,
632                &marker_row_op,
633                &ctx,
634                &ctx,
635                schema_fingerprint,
636            )?;
637
638        Self::commit_prepared_single_row(
639            &self.db,
640            marker_row_op,
641            prepared_row_op,
642            |delta| emit_index_delta_metrics::<E>(delta),
643            || {
644                span.set_rows(1);
645            },
646        )?;
647
648        Ok(entity)
649    }
650
651    // Validate one structurally patched after-image by decoding it against the
652    // target key and reusing the existing typed save preflight rules.
653    #[allow(dead_code)]
654    fn validate_structural_after_image(
655        &self,
656        data_key: &DataKey,
657        row: &RawRow,
658    ) -> Result<E, InternalError> {
659        let expected_key = data_key.try_key::<E>()?;
660        let mut entity = row.try_decode::<E>().map_err(|err| {
661            InternalError::mutation_structural_after_image_invalid(
662                E::PATH,
663                data_key,
664                err.to_string(),
665            )
666        })?;
667        let identity_key = entity.id().key();
668        if identity_key != expected_key {
669            let field_name = E::MODEL.primary_key().name();
670            let field_value = FieldValue::to_value(&identity_key);
671            let identity_value = FieldValue::to_value(&expected_key);
672
673            return Err(InternalError::mutation_entity_primary_key_mismatch(
674                E::PATH,
675                field_name,
676                &field_value,
677                &identity_value,
678            ));
679        }
680
681        self.preflight_entity(&mut entity)?;
682
683        Ok(entity)
684    }
685
686    // Open + apply commit mechanics for one logical row operation.
687    fn commit_prepared_single_row(
688        db: &Db<E::Canister>,
689        marker_row_op: CommitRowOp,
690        prepared_row_op: crate::db::commit::PreparedRowCommitOp,
691        on_index_applied: impl FnOnce(&PreparedRowOpDelta),
692        on_data_applied: impl FnOnce(),
693    ) -> Result<(), InternalError> {
694        let synchronized_store_handles = synchronized_store_handles_for_prepared_row_ops(
695            db,
696            std::slice::from_ref(&prepared_row_op),
697        );
698
699        // FIRST STABLE WRITE: commit marker is persisted before any mutations.
700        commit_prepared_single_save_row_op_with_window(
701            marker_row_op,
702            prepared_row_op,
703            synchronized_store_handles,
704            "save_row_apply",
705            on_index_applied,
706            || {
707                on_data_applied();
708            },
709        )?;
710
711        Ok(())
712    }
713
714    // Open + apply commit mechanics for an atomic staged row-op batch.
715    fn commit_atomic_batch(
716        db: &Db<E::Canister>,
717        marker_row_ops: Vec<CommitRowOp>,
718        span: &mut Span<E>,
719    ) -> Result<(), InternalError> {
720        let rows_touched = u64::try_from(marker_row_ops.len()).unwrap_or(u64::MAX);
721        commit_save_row_ops_with_window::<E>(
722            db,
723            marker_row_ops,
724            "save_batch_atomic_row_apply",
725            || {
726                span.set_rows(rows_touched);
727            },
728        )?;
729
730        Ok(())
731    }
732}
733
734// Fold one single-row prepared delta into one saturated batch accumulator.
735const fn accumulate_prepared_row_op_delta(
736    total: &mut PreparedRowOpDelta,
737    delta: &PreparedRowOpDelta,
738) {
739    total.index_inserts = total.index_inserts.saturating_add(delta.index_inserts);
740    total.index_removes = total.index_removes.saturating_add(delta.index_removes);
741    total.reverse_index_inserts = total
742        .reverse_index_inserts
743        .saturating_add(delta.reverse_index_inserts);
744    total.reverse_index_removes = total
745        .reverse_index_removes
746        .saturating_add(delta.reverse_index_removes);
747}