Skip to main content

icydb_core/db/executor/mutation/
save.rs

1//! Module: executor::mutation::save
2//! Responsibility: save-mode execution (`insert`/`update`/`replace`) and batch lanes.
3//! Does not own: relation-domain validation semantics or commit marker protocol internals.
4//! Boundary: save preflight + commit-window handoff for one entity type.
5
6use crate::{
7    db::{
8        Db,
9        commit::{
10            CommitRowOp, CommitSchemaFingerprint,
11            prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint,
12        },
13        data::{CanonicalRow, DataKey, PersistedRow, RawRow, UpdatePatch},
14        executor::{
15            Context, ExecutorError,
16            mutation::{
17                MutationInput, PreparedRowOpDelta, commit_prepared_single_save_row_op_with_window,
18                commit_save_row_ops_with_window, emit_index_delta_metrics, mutation_write_context,
19            },
20        },
21        relation::model_has_strong_relation_targets,
22        schema::{SchemaInfo, commit_schema_fingerprint_for_entity},
23    },
24    error::InternalError,
25    metrics::sink::{ExecKind, MetricsEvent, Span, record},
26    traits::{EntityValue, FieldValue, Storable},
27};
28use candid::CandidType;
29use derive_more::Display;
30use serde::{Deserialize, Serialize};
31use std::collections::HashSet;
32
33// Debug assertions below are diagnostic sentinels; correctness is enforced by
34// runtime validation earlier in the pipeline.
35
//
// SaveMode
//
// Insert  : writes the row only when no row exists yet for the key
// Replace : writes the row regardless of what was there
// Update  : writes the row only when a row already exists for the key
//

#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Display, Serialize)]
enum SaveMode {
    // Default mode; maps to `SaveRule::RequireAbsent`.
    #[default]
    Insert,
    // Maps to `SaveRule::AllowAny`.
    Replace,
    // Maps to `SaveRule::RequirePresent`.
    Update,
}
51
//
// SaveExecutor
//
// Executes typed and structural saves for one entity type `E`.
// The only state is the database handle; the executor is `Copy`.
//

#[derive(Clone, Copy)]
pub(in crate::db) struct SaveExecutor<E: PersistedRow + EntityValue> {
    // Database handle every save in this executor is issued against.
    pub(in crate::db::executor::mutation) db: Db<E::Canister>,
}
60
//
// SaveRule
//
// Canonical save precondition for resolving the current row baseline.
// Both `SaveMode` (typed saves) and `MutationMode` (structural saves) map
// onto this rule before the store is consulted.
//
#[derive(Clone, Copy)]
enum SaveRule {
    // The key must not be present; an existing row is a `KeyExists` error.
    RequireAbsent,
    // The key must be present; a missing row is a not-found error.
    RequirePresent,
    // Either state is accepted; an existing row becomes the before-image.
    AllowAny,
}
72
73impl SaveRule {
74    const fn from_mode(mode: SaveMode) -> Self {
75        match mode {
76            SaveMode::Insert => Self::RequireAbsent,
77            SaveMode::Update => Self::RequirePresent,
78            SaveMode::Replace => Self::AllowAny,
79        }
80    }
81}
82
//
// MutationMode
//
// MutationMode makes the structural patch path spell out the same
// row-existence contract the typed save surface already owns.
// This keeps future structural callers from smuggling write-mode meaning
// through ad hoc helper choice once the seam moves beyond `icydb-core`.
//

#[derive(Clone, Copy)]
pub enum MutationMode {
    // Row must not exist yet (`SaveRule::RequireAbsent`).
    // NOTE(review): `dead_code` is allowed presumably because the structural
    // insert entrypoint has no caller yet - confirm before removing.
    #[allow(dead_code)]
    Insert,
    // Row may or may not exist (`SaveRule::AllowAny`).
    // NOTE(review): `dead_code` is allowed presumably because the structural
    // replace entrypoint has no caller yet - confirm before removing.
    #[allow(dead_code)]
    Replace,
    // Row must already exist (`SaveRule::RequirePresent`).
    Update,
}
102
103impl MutationMode {
104    const fn save_rule(self) -> SaveRule {
105        match self {
106            Self::Insert => SaveRule::RequireAbsent,
107            Self::Replace => SaveRule::AllowAny,
108            Self::Update => SaveRule::RequirePresent,
109        }
110    }
111}
112
113impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
114    // ======================================================================
115    // Construction & configuration
116    // ======================================================================
117
    /// Construct one save executor bound to a database handle.
    ///
    /// The `_debug` flag is accepted but not used by this constructor; the
    /// executor stores only `db`.
    #[must_use]
    pub(in crate::db) const fn new(db: Db<E::Canister>, _debug: bool) -> Self {
        Self { db }
    }
123
124    // ======================================================================
125    // Single-entity save operations
126    // ======================================================================
127
    /// Insert a brand-new entity (errors if the key already exists).
    ///
    /// Delegates to the single-entity save path with `SaveMode::Insert` and
    /// returns the entity handed back by that path.
    pub(crate) fn insert(&self, entity: E) -> Result<E, InternalError> {
        self.save_entity(SaveMode::Insert, entity)
    }
132
    /// Update an existing entity (errors if it does not exist).
    ///
    /// Delegates to the single-entity save path with `SaveMode::Update` and
    /// returns the entity handed back by that path.
    pub(crate) fn update(&self, entity: E) -> Result<E, InternalError> {
        self.save_entity(SaveMode::Update, entity)
    }
137
    /// Insert one new entity row built from a structural field patch.
    ///
    /// Runs under `MutationMode::Insert`, whose save rule requires that no
    /// row exists yet for `key`.
    ///
    /// This entrypoint is intentionally staged ahead of the higher-level API
    /// layer so the executor boundary can lock its invariants first.
    #[allow(dead_code)]
    pub(in crate::db) fn insert_structural(
        &self,
        key: E::Key,
        patch: UpdatePatch,
    ) -> Result<E, InternalError> {
        self.apply_structural_mutation(MutationMode::Insert, key, patch)
    }
150
    /// Apply one structural full-row replacement, inserting if missing.
    ///
    /// Runs under `MutationMode::Replace`, whose save rule accepts both a
    /// present and an absent row for `key`.
    ///
    /// Replace semantics deliberately rebuild the after-image from an empty row
    /// layout so absent fields do not inherit old-row values implicitly.
    #[allow(dead_code)]
    pub(in crate::db) fn replace_structural(
        &self,
        key: E::Key,
        patch: UpdatePatch,
    ) -> Result<E, InternalError> {
        self.apply_structural_mutation(MutationMode::Replace, key, patch)
    }
163
    /// Apply one structural field patch to an existing entity row.
    ///
    /// Runs under `MutationMode::Update`, whose save rule requires that a row
    /// already exists for `key`.
    ///
    /// This entrypoint is intentionally staged ahead of the higher-level API
    /// layer so the executor boundary can lock its invariants first.
    #[allow(dead_code)]
    pub(in crate::db) fn update_structural(
        &self,
        key: E::Key,
        patch: UpdatePatch,
    ) -> Result<E, InternalError> {
        self.apply_structural_mutation(MutationMode::Update, key, patch)
    }
176
    /// Replace an entity, inserting if missing.
    ///
    /// Delegates to the single-entity save path with `SaveMode::Replace` and
    /// returns the entity handed back by that path.
    pub(crate) fn replace(&self, entity: E) -> Result<E, InternalError> {
        self.save_entity(SaveMode::Replace, entity)
    }
181
182    // ======================================================================
183    // Batch save operations (explicit atomic and non-atomic lanes)
184    // ======================================================================
185
186    /// Save a batch with explicitly non-atomic semantics.
187    ///
188    /// WARNING: this helper is fail-fast and non-atomic. If one element fails,
189    /// earlier elements in the batch remain committed.
190    fn save_batch_non_atomic(
191        &self,
192        mode: SaveMode,
193        entities: impl IntoIterator<Item = E>,
194    ) -> Result<Vec<E>, InternalError> {
195        let iter = entities.into_iter();
196        let mut out = Vec::with_capacity(iter.size_hint().0);
197        let ctx = mutation_write_context::<E>(&self.db)?;
198        let save_rule = SaveRule::from_mode(mode);
199        let schema = Self::schema_info()?;
200        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
201        let validate_relations = model_has_strong_relation_targets(E::MODEL);
202        let mut batch_span = None;
203        let mut batch_delta = PreparedRowOpDelta {
204            index_inserts: 0,
205            index_removes: 0,
206            reverse_index_inserts: 0,
207            reverse_index_removes: 0,
208        };
209
210        // Phase 1: apply each row independently while reusing the same resolved
211        // mutation context and schema metadata across the whole batch.
212        for entity in iter {
213            let span = batch_span.get_or_insert_with(|| Span::<E>::new(ExecKind::Save));
214
215            let result = (|| {
216                let (saved, marker_row_op) = self.prepare_entity_save_row_op(
217                    &ctx,
218                    save_rule,
219                    schema,
220                    schema_fingerprint,
221                    validate_relations,
222                    entity,
223                )?;
224                let prepared_row_op =
225                    prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<
226                        E,
227                    >(&self.db, &marker_row_op, &ctx, &ctx, schema_fingerprint)?;
228                Self::commit_prepared_single_row(
229                    marker_row_op,
230                    prepared_row_op,
231                    |delta| accumulate_prepared_row_op_delta(&mut batch_delta, delta),
232                    || {},
233                )?;
234
235                Ok::<_, InternalError>(saved)
236            })();
237
238            match result {
239                Ok(saved) => {
240                    out.push(saved);
241                    span.set_rows(u64::try_from(out.len()).unwrap_or(u64::MAX));
242                }
243                Err(err) => {
244                    if !out.is_empty() {
245                        emit_index_delta_metrics::<E>(&batch_delta);
246                        record(MetricsEvent::NonAtomicPartialCommit {
247                            entity_path: E::PATH,
248                            committed_rows: u64::try_from(out.len()).unwrap_or(u64::MAX),
249                        });
250                    }
251
252                    return Err(err);
253                }
254            }
255        }
256
257        if !out.is_empty() {
258            emit_index_delta_metrics::<E>(&batch_delta);
259        }
260
261        Ok(out)
262    }
263
264    /// Save a single-entity-type batch atomically in a single commit window.
265    ///
266    /// All entities are prevalidated first; if any entity fails pre-commit validation,
267    /// no row in this batch is persisted.
268    ///
269    /// This is not a multi-entity transaction surface.
270    fn save_batch_atomic(
271        &self,
272        mode: SaveMode,
273        entities: impl IntoIterator<Item = E>,
274    ) -> Result<Vec<E>, InternalError> {
275        let entities: Vec<E> = entities.into_iter().collect();
276
277        self.save_batch_atomic_impl(SaveRule::from_mode(mode), entities)
278    }
279
    /// Insert a single-entity-type batch atomically in one commit window.
    ///
    /// Delegates to the atomic batch path with `SaveMode::Insert`.
    ///
    /// This API is not a multi-entity transaction surface.
    pub(crate) fn insert_many_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_atomic(SaveMode::Insert, entities)
    }
289
    /// Update a single-entity-type batch atomically in one commit window.
    ///
    /// Delegates to the atomic batch path with `SaveMode::Update`.
    ///
    /// This API is not a multi-entity transaction surface.
    pub(crate) fn update_many_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_atomic(SaveMode::Update, entities)
    }
299
    /// Replace a single-entity-type batch atomically in one commit window.
    ///
    /// Delegates to the atomic batch path with `SaveMode::Replace`.
    ///
    /// This API is not a multi-entity transaction surface.
    pub(crate) fn replace_many_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_atomic(SaveMode::Replace, entities)
    }
309
    /// Insert a batch with explicitly non-atomic semantics.
    ///
    /// Delegates to the non-atomic batch path with `SaveMode::Insert`.
    ///
    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
    pub(crate) fn insert_many_non_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_non_atomic(SaveMode::Insert, entities)
    }
319
    /// Update a batch with explicitly non-atomic semantics.
    ///
    /// Delegates to the non-atomic batch path with `SaveMode::Update`.
    ///
    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
    pub(crate) fn update_many_non_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_non_atomic(SaveMode::Update, entities)
    }
329
    /// Replace a batch with explicitly non-atomic semantics.
    ///
    /// Delegates to the non-atomic batch path with `SaveMode::Replace`.
    ///
    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
    pub(crate) fn replace_many_non_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_non_atomic(SaveMode::Replace, entities)
    }
339
340    // Keep the atomic batch body out of the iterator-generic wrapper so mode
341    // wrappers do not each own one copy of the full staging pipeline.
342    #[inline(never)]
343    fn save_batch_atomic_impl(
344        &self,
345        save_rule: SaveRule,
346        entities: Vec<E>,
347    ) -> Result<Vec<E>, InternalError> {
348        // Phase 1: validate + stage all row ops before opening the commit window.
349        let mut span = Span::<E>::new(ExecKind::Save);
350        let ctx = mutation_write_context::<E>(&self.db)?;
351        let mut out = Vec::with_capacity(entities.len());
352        let mut marker_row_ops = Vec::with_capacity(entities.len());
353        let mut seen_row_keys = HashSet::with_capacity(entities.len());
354        let schema = Self::schema_info()?;
355        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
356        let validate_relations = model_has_strong_relation_targets(E::MODEL);
357
358        // Validate and stage all row ops before opening the commit window.
359        for mut entity in entities {
360            self.preflight_entity_with_cached_schema(&mut entity, schema, validate_relations)?;
361            let marker_row_op =
362                Self::prepare_typed_entity_row_op(&ctx, save_rule, &entity, schema_fingerprint)?;
363            if !seen_row_keys.insert(marker_row_op.key) {
364                let data_key = DataKey::try_new::<E>(entity.id().key())?;
365                return Err(InternalError::mutation_atomic_save_duplicate_key(
366                    E::PATH,
367                    data_key,
368                ));
369            }
370            marker_row_ops.push(marker_row_op);
371            out.push(entity);
372        }
373
374        if marker_row_ops.is_empty() {
375            return Ok(out);
376        }
377
378        // Phase 2: enter commit window and apply staged row ops atomically.
379        Self::commit_atomic_batch(&self.db, marker_row_ops, &mut span)?;
380
381        Ok(out)
382    }
383
384    // Build one logical row operation from a full typed after-image.
385    fn prepare_typed_entity_row_op(
386        ctx: &Context<'_, E>,
387        save_rule: SaveRule,
388        entity: &E,
389        schema_fingerprint: CommitSchemaFingerprint,
390    ) -> Result<CommitRowOp, InternalError> {
391        // Phase 1: resolve key + current-store baseline from the canonical save rule.
392        let data_key = DataKey::try_new::<E>(entity.id().key())?;
393        let raw_key = data_key.to_raw()?;
394        let old_raw = Self::resolve_existing_row_for_rule(ctx, &data_key, save_rule)?;
395
396        // Phase 2: typed save lanes already own a complete after-image, so
397        // emit the canonical row directly instead of replaying a dense slot
398        // patch back into the same full row image.
399        let row_bytes = CanonicalRow::from_entity(entity)?
400            .into_raw_row()
401            .into_bytes();
402        let before_bytes = old_raw.map(<RawRow as Storable>::into_bytes);
403        let row_op = CommitRowOp::new(
404            E::PATH,
405            raw_key,
406            before_bytes,
407            Some(row_bytes),
408            schema_fingerprint,
409        );
410
411        Ok(row_op)
412    }
413
414    // Build the persisted after-image under one explicit structural mode.
415    fn build_structural_after_image_row(
416        mode: MutationMode,
417        mutation: &MutationInput,
418        old_row: Option<&RawRow>,
419    ) -> Result<CanonicalRow, InternalError> {
420        match mode {
421            MutationMode::Update => {
422                let Some(old_row) = old_row else {
423                    return RawRow::from_serialized_update_patch(
424                        E::MODEL,
425                        mutation.serialized_patch(),
426                    );
427                };
428
429                old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
430            }
431            MutationMode::Insert | MutationMode::Replace => {
432                RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch())
433            }
434        }
435    }
436
437    // Resolve the "before" row according to one canonical save rule.
438    fn resolve_existing_row_for_rule(
439        ctx: &Context<'_, E>,
440        data_key: &DataKey,
441        save_rule: SaveRule,
442    ) -> Result<Option<RawRow>, InternalError> {
443        let raw_key = data_key.to_raw()?;
444
445        match save_rule {
446            SaveRule::RequireAbsent => {
447                if let Some(existing) = ctx.with_store(|store| store.get(&raw_key))? {
448                    Self::validate_existing_row_identity(data_key, &existing)?;
449                    return Err(ExecutorError::KeyExists(data_key.clone()).into());
450                }
451
452                Ok(None)
453            }
454            SaveRule::RequirePresent => {
455                let old_row = ctx
456                    .with_store(|store| store.get(&raw_key))?
457                    .ok_or_else(|| InternalError::store_not_found(data_key.to_string()))?;
458                Self::validate_existing_row_identity(data_key, &old_row)?;
459
460                Ok(Some(old_row))
461            }
462            SaveRule::AllowAny => {
463                let old_row = ctx.with_store(|store| store.get(&raw_key))?;
464                if let Some(old) = old_row.as_ref() {
465                    Self::validate_existing_row_identity(data_key, old)?;
466                }
467
468                Ok(old_row)
469            }
470        }
471    }
472
473    // Decode an existing row and verify it is consistent with the target data key.
474    fn validate_existing_row_identity(
475        data_key: &DataKey,
476        row: &RawRow,
477    ) -> Result<(), InternalError> {
478        Self::ensure_persisted_row_invariants(data_key, row).map_err(|err| {
479            match (err.class(), err.origin()) {
480                (
481                    crate::error::ErrorClass::Corruption,
482                    crate::error::ErrorOrigin::Serialize | crate::error::ErrorOrigin::Store,
483                ) => err,
484                _ => InternalError::from(ExecutorError::persisted_row_invariant_violation(
485                    data_key,
486                    &err.message,
487                )),
488            }
489        })?;
490
491        Ok(())
492    }
493
494    fn save_entity(&self, mode: SaveMode, entity: E) -> Result<E, InternalError> {
495        let ctx = mutation_write_context::<E>(&self.db)?;
496        let save_rule = SaveRule::from_mode(mode);
497
498        self.save_entity_with_context(&ctx, save_rule, entity)
499    }
500
    // Run one typed save against an already-resolved write context.
    //
    // Resolves the per-entity schema metadata (schema info, commit schema
    // fingerprint, and whether strong relation targets need validation) and
    // forwards to `save_entity_with_context_and_schema`.
    //
    // NOTE(review): within this file only `save_entity` calls this helper;
    // the batch lanes resolve the same metadata themselves.
    fn save_entity_with_context(
        &self,
        ctx: &Context<'_, E>,
        save_rule: SaveRule,
        entity: E,
    ) -> Result<E, InternalError> {
        let schema = Self::schema_info()?;
        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
        let validate_relations = model_has_strong_relation_targets(E::MODEL);
        self.save_entity_with_context_and_schema(
            ctx,
            save_rule,
            schema,
            schema_fingerprint,
            validate_relations,
            entity,
        )
    }
521
522    // Run one typed save against an already-resolved write context and
523    // preflight schema metadata so batch lanes do not repay cache lookups.
524    fn save_entity_with_context_and_schema(
525        &self,
526        ctx: &Context<'_, E>,
527        save_rule: SaveRule,
528        schema: &SchemaInfo,
529        schema_fingerprint: CommitSchemaFingerprint,
530        validate_relations: bool,
531        entity: E,
532    ) -> Result<E, InternalError> {
533        let mut span = Span::<E>::new(ExecKind::Save);
534        let (entity, marker_row_op) = self.prepare_entity_save_row_op(
535            ctx,
536            save_rule,
537            schema,
538            schema_fingerprint,
539            validate_relations,
540            entity,
541        )?;
542        let prepared_row_op =
543            prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
544                &self.db,
545                &marker_row_op,
546                ctx,
547                ctx,
548                schema_fingerprint,
549            )?;
550
551        // Phase 1: persist/apply one single-row commit through the shared
552        // commit-window path under the normal single-save metrics contract.
553        Self::commit_prepared_single_row(
554            marker_row_op,
555            prepared_row_op,
556            |delta| emit_index_delta_metrics::<E>(delta),
557            || {
558                span.set_rows(1);
559            },
560        )?;
561
562        Ok(entity)
563    }
564
565    // Prepare one typed save row op after canonical entity preflight so both
566    // single-row and batched non-atomic lanes share the same validation path.
567    fn prepare_entity_save_row_op(
568        &self,
569        ctx: &Context<'_, E>,
570        save_rule: SaveRule,
571        schema: &SchemaInfo,
572        schema_fingerprint: CommitSchemaFingerprint,
573        validate_relations: bool,
574        entity: E,
575    ) -> Result<(E, CommitRowOp), InternalError> {
576        let mut entity = entity;
577
578        // Phase 1: run canonical save preflight before key extraction so
579        // typed validation still owns the write contract.
580        self.preflight_entity_with_cached_schema(&mut entity, schema, validate_relations)?;
581        let marker_row_op =
582            Self::prepare_typed_entity_row_op(ctx, save_rule, &entity, schema_fingerprint)?;
583
584        Ok((entity, marker_row_op))
585    }
586
    // Run one structural key + patch mutation under one explicit save-mode contract.
    //
    // The key and patch are first converted into a `MutationInput` for `E`;
    // a conversion failure is returned before any store access happens here.
    #[allow(dead_code)]
    pub(in crate::db) fn apply_structural_mutation(
        &self,
        mode: MutationMode,
        key: E::Key,
        patch: UpdatePatch,
    ) -> Result<E, InternalError> {
        let mutation = MutationInput::from_update_patch::<E>(key, &patch)?;

        self.save_structural_mutation(mode, mutation)
    }
599
600    #[allow(dead_code)]
601    fn save_structural_mutation(
602        &self,
603        mode: MutationMode,
604        mutation: MutationInput,
605    ) -> Result<E, InternalError> {
606        let mut span = Span::<E>::new(ExecKind::Save);
607        let ctx = mutation_write_context::<E>(&self.db)?;
608        let data_key = mutation.data_key().clone();
609        let old_raw = Self::resolve_existing_row_for_rule(&ctx, &data_key, mode.save_rule())?;
610        let raw_after_image =
611            Self::build_structural_after_image_row(mode, &mutation, old_raw.as_ref())?;
612        let entity = self.validate_structural_after_image(&data_key, &raw_after_image)?;
613        let normalized_mutation = MutationInput::from_entity(&entity)?;
614        let row_bytes =
615            Self::build_structural_after_image_row(mode, &normalized_mutation, old_raw.as_ref())?;
616        let row_bytes = row_bytes.into_raw_row().into_bytes();
617        let before_bytes = old_raw.map(<RawRow as Storable>::into_bytes);
618        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
619        let marker_row_op = CommitRowOp::new(
620            E::PATH,
621            data_key.to_raw()?,
622            before_bytes,
623            Some(row_bytes),
624            schema_fingerprint,
625        );
626        let prepared_row_op =
627            prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
628                &self.db,
629                &marker_row_op,
630                &ctx,
631                &ctx,
632                schema_fingerprint,
633            )?;
634
635        Self::commit_prepared_single_row(
636            marker_row_op,
637            prepared_row_op,
638            |delta| emit_index_delta_metrics::<E>(delta),
639            || {
640                span.set_rows(1);
641            },
642        )?;
643
644        Ok(entity)
645    }
646
647    // Validate one structurally patched after-image by decoding it against the
648    // target key and reusing the existing typed save preflight rules.
649    #[allow(dead_code)]
650    fn validate_structural_after_image(
651        &self,
652        data_key: &DataKey,
653        row: &RawRow,
654    ) -> Result<E, InternalError> {
655        let expected_key = data_key.try_key::<E>()?;
656        let mut entity = row.try_decode::<E>().map_err(|err| {
657            InternalError::mutation_structural_after_image_invalid(
658                E::PATH,
659                data_key,
660                err.to_string(),
661            )
662        })?;
663        let identity_key = entity.id().key();
664        if identity_key != expected_key {
665            let field_name = E::MODEL.primary_key().name();
666            let field_value = FieldValue::to_value(&identity_key);
667            let identity_value = FieldValue::to_value(&expected_key);
668
669            return Err(InternalError::mutation_entity_primary_key_mismatch(
670                E::PATH,
671                field_name,
672                &field_value,
673                &identity_value,
674            ));
675        }
676
677        self.preflight_entity(&mut entity)?;
678
679        Ok(entity)
680    }
681
682    // Open + apply commit mechanics for one logical row operation.
683    fn commit_prepared_single_row(
684        marker_row_op: CommitRowOp,
685        prepared_row_op: crate::db::commit::PreparedRowCommitOp,
686        on_index_applied: impl FnOnce(&PreparedRowOpDelta),
687        on_data_applied: impl FnOnce(),
688    ) -> Result<(), InternalError> {
689        // FIRST STABLE WRITE: commit marker is persisted before any mutations.
690        commit_prepared_single_save_row_op_with_window(
691            marker_row_op,
692            prepared_row_op,
693            "save_row_apply",
694            on_index_applied,
695            || {
696                on_data_applied();
697            },
698        )?;
699
700        Ok(())
701    }
702
703    // Open + apply commit mechanics for an atomic staged row-op batch.
704    fn commit_atomic_batch(
705        db: &Db<E::Canister>,
706        marker_row_ops: Vec<CommitRowOp>,
707        span: &mut Span<E>,
708    ) -> Result<(), InternalError> {
709        let rows_touched = u64::try_from(marker_row_ops.len()).unwrap_or(u64::MAX);
710        commit_save_row_ops_with_window::<E>(
711            db,
712            marker_row_ops,
713            "save_batch_atomic_row_apply",
714            || {
715                span.set_rows(rows_touched);
716            },
717        )?;
718
719        Ok(())
720    }
721}
722
723// Fold one single-row prepared delta into one saturated batch accumulator.
724const fn accumulate_prepared_row_op_delta(
725    total: &mut PreparedRowOpDelta,
726    delta: &PreparedRowOpDelta,
727) {
728    total.index_inserts = total.index_inserts.saturating_add(delta.index_inserts);
729    total.index_removes = total.index_removes.saturating_add(delta.index_removes);
730    total.reverse_index_inserts = total
731        .reverse_index_inserts
732        .saturating_add(delta.reverse_index_inserts);
733    total.reverse_index_removes = total
734        .reverse_index_removes
735        .saturating_add(delta.reverse_index_removes);
736}