1use crate::{
7 db::{
8 Db,
9 commit::{
10 CommitRowOp, CommitSchemaFingerprint,
11 prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint,
12 },
13 data::{CanonicalRow, DataKey, PersistedRow, RawRow, UpdatePatch},
14 executor::{
15 Context, ExecutorError,
16 mutation::{
17 MutationInput, PreparedRowOpDelta, commit_prepared_single_save_row_op_with_window,
18 commit_save_row_ops_with_window, emit_index_delta_metrics, mutation_write_context,
19 },
20 },
21 relation::model_has_strong_relation_targets,
22 schema::{SchemaInfo, commit_schema_fingerprint_for_entity},
23 },
24 error::InternalError,
25 metrics::sink::{ExecKind, MetricsEvent, Span, record},
26 traits::{EntityValue, FieldValue, Storable},
27};
28use candid::CandidType;
29use derive_more::Display;
30use serde::{Deserialize, Serialize};
31use std::collections::HashSet;
32
/// Save semantics selected by the typed entity write entry points.
///
/// `SaveRule::from_mode` translates each variant into the row-presence rule
/// that is enforced before a row is written. `Insert` is the default variant.
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Display, Serialize)]
enum SaveMode {
    /// The key must not already be stored.
    #[default]
    Insert,
    /// The key may or may not already be stored.
    Replace,
    /// The key must already be stored.
    Update,
}
51
/// Executor for entity save operations (insert / update / replace) on one database handle.
///
/// The only state is the database handle; contexts, spans and schema data are
/// created per call inside the methods.
#[derive(Clone, Copy)]
pub(in crate::db) struct SaveExecutor<E: PersistedRow + EntityValue> {
    /// Database handle handed to the write-context and commit helpers.
    pub(in crate::db::executor::mutation) db: Db<E::Canister>,
}
60
61#[derive(Clone, Copy)]
67enum SaveRule {
68 RequireAbsent,
69 RequirePresent,
70 AllowAny,
71}
72
73impl SaveRule {
74 const fn from_mode(mode: SaveMode) -> Self {
75 match mode {
76 SaveMode::Insert => Self::RequireAbsent,
77 SaveMode::Update => Self::RequirePresent,
78 SaveMode::Replace => Self::AllowAny,
79 }
80 }
81}
82
/// Write mode for structural (patch-based) mutations.
///
/// Each variant is mapped to a row-presence rule by `MutationMode::save_rule`.
/// `Debug`, `PartialEq` and `Eq` are derived so this public type can be logged
/// and compared by callers and tests.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MutationMode {
    /// Structural insert; the key must not already be stored.
    // Only constructed by `insert_structural`, which is itself marked dead code.
    #[allow(dead_code)]
    Insert,
    /// Structural replace; the key may or may not already be stored.
    // Only constructed by `replace_structural`, which is itself marked dead code.
    #[allow(dead_code)]
    Replace,
    /// Structural update; the key must already be stored.
    Update,
}
102
103impl MutationMode {
104 const fn save_rule(self) -> SaveRule {
105 match self {
106 Self::Insert => SaveRule::RequireAbsent,
107 Self::Replace => SaveRule::AllowAny,
108 Self::Update => SaveRule::RequirePresent,
109 }
110 }
111}
112
113impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
    /// Build a save executor over the given database handle.
    ///
    /// `_debug` is accepted but not read by this constructor.
    #[must_use]
    pub(in crate::db) const fn new(db: Db<E::Canister>, _debug: bool) -> Self {
        Self { db }
    }
123
    /// Save `entity` with `SaveMode::Insert` and return the saved entity.
    ///
    /// # Errors
    ///
    /// Propagates any error from `save_entity`; insert mode maps to
    /// `SaveRule::RequireAbsent`, so an already-stored key is rejected.
    pub(crate) fn insert(&self, entity: E) -> Result<E, InternalError> {
        self.save_entity(SaveMode::Insert, entity)
    }
132
    /// Save `entity` with `SaveMode::Update` and return the saved entity.
    ///
    /// # Errors
    ///
    /// Propagates any error from `save_entity`; update mode maps to
    /// `SaveRule::RequirePresent`, so a missing key is rejected.
    pub(crate) fn update(&self, entity: E) -> Result<E, InternalError> {
        self.save_entity(SaveMode::Update, entity)
    }
137
    /// Apply `patch` under `key` as a structural insert (`MutationMode::Insert`).
    ///
    /// Thin wrapper over `apply_structural_mutation`; returns the entity
    /// produced by that call.
    #[allow(dead_code)]
    pub(in crate::db) fn insert_structural(
        &self,
        key: E::Key,
        patch: UpdatePatch,
    ) -> Result<E, InternalError> {
        self.apply_structural_mutation(MutationMode::Insert, key, patch)
    }
150
    /// Apply `patch` under `key` as a structural replace (`MutationMode::Replace`).
    ///
    /// Thin wrapper over `apply_structural_mutation`; returns the entity
    /// produced by that call.
    #[allow(dead_code)]
    pub(in crate::db) fn replace_structural(
        &self,
        key: E::Key,
        patch: UpdatePatch,
    ) -> Result<E, InternalError> {
        self.apply_structural_mutation(MutationMode::Replace, key, patch)
    }
163
    /// Apply `patch` under `key` as a structural update (`MutationMode::Update`).
    ///
    /// Thin wrapper over `apply_structural_mutation`; returns the entity
    /// produced by that call.
    #[allow(dead_code)]
    pub(in crate::db) fn update_structural(
        &self,
        key: E::Key,
        patch: UpdatePatch,
    ) -> Result<E, InternalError> {
        self.apply_structural_mutation(MutationMode::Update, key, patch)
    }
176
    /// Save `entity` with `SaveMode::Replace` and return the saved entity.
    ///
    /// Replace mode maps to `SaveRule::AllowAny`, so the key may or may not
    /// already be stored.
    pub(crate) fn replace(&self, entity: E) -> Result<E, InternalError> {
        self.save_entity(SaveMode::Replace, entity)
    }
181
182 fn save_batch_non_atomic(
191 &self,
192 mode: SaveMode,
193 entities: impl IntoIterator<Item = E>,
194 ) -> Result<Vec<E>, InternalError> {
195 let iter = entities.into_iter();
196 let mut out = Vec::with_capacity(iter.size_hint().0);
197 let ctx = mutation_write_context::<E>(&self.db)?;
198 let save_rule = SaveRule::from_mode(mode);
199 let schema = Self::schema_info()?;
200 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
201 let validate_relations = model_has_strong_relation_targets(E::MODEL);
202 let mut batch_span = None;
203 let mut batch_delta = PreparedRowOpDelta {
204 index_inserts: 0,
205 index_removes: 0,
206 reverse_index_inserts: 0,
207 reverse_index_removes: 0,
208 };
209
210 for entity in iter {
213 let span = batch_span.get_or_insert_with(|| Span::<E>::new(ExecKind::Save));
214
215 let result = (|| {
216 let (saved, marker_row_op) = self.prepare_entity_save_row_op(
217 &ctx,
218 save_rule,
219 schema,
220 schema_fingerprint,
221 validate_relations,
222 entity,
223 )?;
224 let prepared_row_op =
225 prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<
226 E,
227 >(&self.db, &marker_row_op, &ctx, &ctx, schema_fingerprint)?;
228 Self::commit_prepared_single_row(
229 marker_row_op,
230 prepared_row_op,
231 |delta| accumulate_prepared_row_op_delta(&mut batch_delta, delta),
232 || {},
233 )?;
234
235 Ok::<_, InternalError>(saved)
236 })();
237
238 match result {
239 Ok(saved) => {
240 out.push(saved);
241 span.set_rows(u64::try_from(out.len()).unwrap_or(u64::MAX));
242 }
243 Err(err) => {
244 if !out.is_empty() {
245 emit_index_delta_metrics::<E>(&batch_delta);
246 record(MetricsEvent::NonAtomicPartialCommit {
247 entity_path: E::PATH,
248 committed_rows: u64::try_from(out.len()).unwrap_or(u64::MAX),
249 });
250 }
251
252 return Err(err);
253 }
254 }
255 }
256
257 if !out.is_empty() {
258 emit_index_delta_metrics::<E>(&batch_delta);
259 }
260
261 Ok(out)
262 }
263
264 fn save_batch_atomic(
271 &self,
272 mode: SaveMode,
273 entities: impl IntoIterator<Item = E>,
274 ) -> Result<Vec<E>, InternalError> {
275 let entities: Vec<E> = entities.into_iter().collect();
276
277 self.save_batch_atomic_impl(SaveRule::from_mode(mode), entities)
278 }
279
    /// Insert all `entities` as one atomic batch (`SaveMode::Insert`).
    ///
    /// Delegates to `save_batch_atomic`, which stages every row and commits
    /// them through a single batch commit call.
    pub(crate) fn insert_many_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_atomic(SaveMode::Insert, entities)
    }
289
    /// Update all `entities` as one atomic batch (`SaveMode::Update`).
    ///
    /// Delegates to `save_batch_atomic`, which stages every row and commits
    /// them through a single batch commit call.
    pub(crate) fn update_many_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_atomic(SaveMode::Update, entities)
    }
299
    /// Replace all `entities` as one atomic batch (`SaveMode::Replace`).
    ///
    /// Delegates to `save_batch_atomic`, which stages every row and commits
    /// them through a single batch commit call.
    pub(crate) fn replace_many_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_atomic(SaveMode::Replace, entities)
    }
309
    /// Insert `entities` one by one (`SaveMode::Insert`), committing each row separately.
    ///
    /// Delegates to `save_batch_non_atomic`; the first failure is returned and
    /// rows committed before it are reported through metrics.
    pub(crate) fn insert_many_non_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_non_atomic(SaveMode::Insert, entities)
    }
319
    /// Update `entities` one by one (`SaveMode::Update`), committing each row separately.
    ///
    /// Delegates to `save_batch_non_atomic`; the first failure is returned and
    /// rows committed before it are reported through metrics.
    pub(crate) fn update_many_non_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_non_atomic(SaveMode::Update, entities)
    }
329
    /// Replace `entities` one by one (`SaveMode::Replace`), committing each row separately.
    ///
    /// Delegates to `save_batch_non_atomic`; the first failure is returned and
    /// rows committed before it are reported through metrics.
    pub(crate) fn replace_many_non_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_non_atomic(SaveMode::Replace, entities)
    }
339
340 #[inline(never)]
343 fn save_batch_atomic_impl(
344 &self,
345 save_rule: SaveRule,
346 entities: Vec<E>,
347 ) -> Result<Vec<E>, InternalError> {
348 let mut span = Span::<E>::new(ExecKind::Save);
350 let ctx = mutation_write_context::<E>(&self.db)?;
351 let mut out = Vec::with_capacity(entities.len());
352 let mut marker_row_ops = Vec::with_capacity(entities.len());
353 let mut seen_row_keys = HashSet::with_capacity(entities.len());
354 let schema = Self::schema_info()?;
355 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
356 let validate_relations = model_has_strong_relation_targets(E::MODEL);
357
358 for mut entity in entities {
360 self.preflight_entity_with_cached_schema(&mut entity, schema, validate_relations)?;
361 let marker_row_op =
362 Self::prepare_typed_entity_row_op(&ctx, save_rule, &entity, schema_fingerprint)?;
363 if !seen_row_keys.insert(marker_row_op.key) {
364 let data_key = DataKey::try_new::<E>(entity.id().key())?;
365 return Err(InternalError::mutation_atomic_save_duplicate_key(
366 E::PATH,
367 data_key,
368 ));
369 }
370 marker_row_ops.push(marker_row_op);
371 out.push(entity);
372 }
373
374 if marker_row_ops.is_empty() {
375 return Ok(out);
376 }
377
378 Self::commit_atomic_batch(&self.db, marker_row_ops, &mut span)?;
380
381 Ok(out)
382 }
383
384 fn prepare_typed_entity_row_op(
386 ctx: &Context<'_, E>,
387 save_rule: SaveRule,
388 entity: &E,
389 schema_fingerprint: CommitSchemaFingerprint,
390 ) -> Result<CommitRowOp, InternalError> {
391 let data_key = DataKey::try_new::<E>(entity.id().key())?;
393 let raw_key = data_key.to_raw()?;
394 let old_raw = Self::resolve_existing_row_for_rule(ctx, &data_key, save_rule)?;
395
396 let row_bytes = CanonicalRow::from_entity(entity)?
400 .into_raw_row()
401 .into_bytes();
402 let before_bytes = old_raw.map(<RawRow as Storable>::into_bytes);
403 let row_op = CommitRowOp::new(
404 E::PATH,
405 raw_key,
406 before_bytes,
407 Some(row_bytes),
408 schema_fingerprint,
409 );
410
411 Ok(row_op)
412 }
413
414 fn build_structural_after_image_row(
416 mode: MutationMode,
417 mutation: &MutationInput,
418 old_row: Option<&RawRow>,
419 ) -> Result<CanonicalRow, InternalError> {
420 match mode {
421 MutationMode::Update => {
422 let Some(old_row) = old_row else {
423 return RawRow::from_serialized_update_patch(
424 E::MODEL,
425 mutation.serialized_patch(),
426 );
427 };
428
429 old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
430 }
431 MutationMode::Insert | MutationMode::Replace => {
432 RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch())
433 }
434 }
435 }
436
437 fn resolve_existing_row_for_rule(
439 ctx: &Context<'_, E>,
440 data_key: &DataKey,
441 save_rule: SaveRule,
442 ) -> Result<Option<RawRow>, InternalError> {
443 let raw_key = data_key.to_raw()?;
444
445 match save_rule {
446 SaveRule::RequireAbsent => {
447 if let Some(existing) = ctx.with_store(|store| store.get(&raw_key))? {
448 Self::validate_existing_row_identity(data_key, &existing)?;
449 return Err(ExecutorError::KeyExists(data_key.clone()).into());
450 }
451
452 Ok(None)
453 }
454 SaveRule::RequirePresent => {
455 let old_row = ctx
456 .with_store(|store| store.get(&raw_key))?
457 .ok_or_else(|| InternalError::store_not_found(data_key.to_string()))?;
458 Self::validate_existing_row_identity(data_key, &old_row)?;
459
460 Ok(Some(old_row))
461 }
462 SaveRule::AllowAny => {
463 let old_row = ctx.with_store(|store| store.get(&raw_key))?;
464 if let Some(old) = old_row.as_ref() {
465 Self::validate_existing_row_identity(data_key, old)?;
466 }
467
468 Ok(old_row)
469 }
470 }
471 }
472
473 fn validate_existing_row_identity(
475 data_key: &DataKey,
476 row: &RawRow,
477 ) -> Result<(), InternalError> {
478 Self::ensure_persisted_row_invariants(data_key, row).map_err(|err| {
479 match (err.class(), err.origin()) {
480 (
481 crate::error::ErrorClass::Corruption,
482 crate::error::ErrorOrigin::Serialize | crate::error::ErrorOrigin::Store,
483 ) => err,
484 _ => InternalError::from(ExecutorError::persisted_row_invariant_violation(
485 data_key,
486 &err.message,
487 )),
488 }
489 })?;
490
491 Ok(())
492 }
493
494 fn save_entity(&self, mode: SaveMode, entity: E) -> Result<E, InternalError> {
495 let ctx = mutation_write_context::<E>(&self.db)?;
496 let save_rule = SaveRule::from_mode(mode);
497
498 self.save_entity_with_context(&ctx, save_rule, entity)
499 }
500
501 fn save_entity_with_context(
504 &self,
505 ctx: &Context<'_, E>,
506 save_rule: SaveRule,
507 entity: E,
508 ) -> Result<E, InternalError> {
509 let schema = Self::schema_info()?;
510 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
511 let validate_relations = model_has_strong_relation_targets(E::MODEL);
512 self.save_entity_with_context_and_schema(
513 ctx,
514 save_rule,
515 schema,
516 schema_fingerprint,
517 validate_relations,
518 entity,
519 )
520 }
521
    /// Save one entity with already-resolved schema data and commit its row.
    ///
    /// Steps, in order: open a save span, preflight the entity and stage its
    /// row operation, prepare the row commit, then commit the single row.
    /// Returns the entity as produced by the preflight step.
    ///
    /// # Errors
    ///
    /// Propagates any error from staging, commit preparation or the commit itself.
    fn save_entity_with_context_and_schema(
        &self,
        ctx: &Context<'_, E>,
        save_rule: SaveRule,
        schema: &SchemaInfo,
        schema_fingerprint: CommitSchemaFingerprint,
        validate_relations: bool,
        entity: E,
    ) -> Result<E, InternalError> {
        // Created before any work so the span covers the whole save.
        let mut span = Span::<E>::new(ExecKind::Save);
        let (entity, marker_row_op) = self.prepare_entity_save_row_op(
            ctx,
            save_rule,
            schema,
            schema_fingerprint,
            validate_relations,
            entity,
        )?;
        // The same context is passed for both reader arguments.
        let prepared_row_op =
            prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
                &self.db,
                &marker_row_op,
                ctx,
                ctx,
                schema_fingerprint,
            )?;

        Self::commit_prepared_single_row(
            marker_row_op,
            prepared_row_op,
            // Index delta metrics are emitted from the index-applied callback.
            |delta| emit_index_delta_metrics::<E>(delta),
            // The row count is only recorded from the data-applied callback.
            || {
                span.set_rows(1);
            },
        )?;

        Ok(entity)
    }
564
565 fn prepare_entity_save_row_op(
568 &self,
569 ctx: &Context<'_, E>,
570 save_rule: SaveRule,
571 schema: &SchemaInfo,
572 schema_fingerprint: CommitSchemaFingerprint,
573 validate_relations: bool,
574 entity: E,
575 ) -> Result<(E, CommitRowOp), InternalError> {
576 let mut entity = entity;
577
578 self.preflight_entity_with_cached_schema(&mut entity, schema, validate_relations)?;
581 let marker_row_op =
582 Self::prepare_typed_entity_row_op(ctx, save_rule, &entity, schema_fingerprint)?;
583
584 Ok((entity, marker_row_op))
585 }
586
587 #[allow(dead_code)]
589 pub(in crate::db) fn apply_structural_mutation(
590 &self,
591 mode: MutationMode,
592 key: E::Key,
593 patch: UpdatePatch,
594 ) -> Result<E, InternalError> {
595 let mutation = MutationInput::from_update_patch::<E>(key, &patch)?;
596
597 self.save_structural_mutation(mode, mutation)
598 }
599
    /// Save one structural (patch-based) mutation and commit its row.
    ///
    /// The after-image is built twice: first from the caller's mutation so it
    /// can be decoded and validated as an entity, then again from a mutation
    /// derived from that validated entity. The second image is the one written.
    ///
    /// # Errors
    ///
    /// Propagates presence-rule violations, after-image build or validation
    /// failures, key encoding errors, and commit preparation or commit errors.
    #[allow(dead_code)]
    fn save_structural_mutation(
        &self,
        mode: MutationMode,
        mutation: MutationInput,
    ) -> Result<E, InternalError> {
        // Created before any work so the span covers the whole save.
        let mut span = Span::<E>::new(ExecKind::Save);
        let ctx = mutation_write_context::<E>(&self.db)?;
        let data_key = mutation.data_key().clone();
        let old_raw = Self::resolve_existing_row_for_rule(&ctx, &data_key, mode.save_rule())?;
        // First pass: after-image from the caller-supplied patch, used for validation only.
        let raw_after_image =
            Self::build_structural_after_image_row(mode, &mutation, old_raw.as_ref())?;
        // NOTE(review): `raw_after_image` is a `CanonicalRow` while the callee takes
        // `&RawRow`; presumably this relies on deref coercion — confirm.
        let entity = self.validate_structural_after_image(&data_key, &raw_after_image)?;
        // Second pass: rebuild the after-image from the validated entity.
        let normalized_mutation = MutationInput::from_entity(&entity)?;
        let row_bytes =
            Self::build_structural_after_image_row(mode, &normalized_mutation, old_raw.as_ref())?;
        let row_bytes = row_bytes.into_raw_row().into_bytes();
        // `old_raw` is consumed here; it must not be needed after this line.
        let before_bytes = old_raw.map(<RawRow as Storable>::into_bytes);
        let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
        let marker_row_op = CommitRowOp::new(
            E::PATH,
            data_key.to_raw()?,
            before_bytes,
            Some(row_bytes),
            schema_fingerprint,
        );
        // The same context is passed for both reader arguments.
        let prepared_row_op =
            prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
                &self.db,
                &marker_row_op,
                &ctx,
                &ctx,
                schema_fingerprint,
            )?;

        Self::commit_prepared_single_row(
            marker_row_op,
            prepared_row_op,
            |delta| emit_index_delta_metrics::<E>(delta),
            // The row count is only recorded from the data-applied callback.
            || {
                span.set_rows(1);
            },
        )?;

        Ok(entity)
    }
646
647 #[allow(dead_code)]
650 fn validate_structural_after_image(
651 &self,
652 data_key: &DataKey,
653 row: &RawRow,
654 ) -> Result<E, InternalError> {
655 let expected_key = data_key.try_key::<E>()?;
656 let mut entity = row.try_decode::<E>().map_err(|err| {
657 InternalError::mutation_structural_after_image_invalid(
658 E::PATH,
659 data_key,
660 err.to_string(),
661 )
662 })?;
663 let identity_key = entity.id().key();
664 if identity_key != expected_key {
665 let field_name = E::MODEL.primary_key().name();
666 let field_value = FieldValue::to_value(&identity_key);
667 let identity_value = FieldValue::to_value(&expected_key);
668
669 return Err(InternalError::mutation_entity_primary_key_mismatch(
670 E::PATH,
671 field_name,
672 &field_value,
673 &identity_value,
674 ));
675 }
676
677 self.preflight_entity(&mut entity)?;
678
679 Ok(entity)
680 }
681
682 fn commit_prepared_single_row(
684 marker_row_op: CommitRowOp,
685 prepared_row_op: crate::db::commit::PreparedRowCommitOp,
686 on_index_applied: impl FnOnce(&PreparedRowOpDelta),
687 on_data_applied: impl FnOnce(),
688 ) -> Result<(), InternalError> {
689 commit_prepared_single_save_row_op_with_window(
691 marker_row_op,
692 prepared_row_op,
693 "save_row_apply",
694 on_index_applied,
695 || {
696 on_data_applied();
697 },
698 )?;
699
700 Ok(())
701 }
702
703 fn commit_atomic_batch(
705 db: &Db<E::Canister>,
706 marker_row_ops: Vec<CommitRowOp>,
707 span: &mut Span<E>,
708 ) -> Result<(), InternalError> {
709 let rows_touched = u64::try_from(marker_row_ops.len()).unwrap_or(u64::MAX);
710 commit_save_row_ops_with_window::<E>(
711 db,
712 marker_row_ops,
713 "save_batch_atomic_row_apply",
714 || {
715 span.set_rows(rows_touched);
716 },
717 )?;
718
719 Ok(())
720 }
721}
722
723const fn accumulate_prepared_row_op_delta(
725 total: &mut PreparedRowOpDelta,
726 delta: &PreparedRowOpDelta,
727) {
728 total.index_inserts = total.index_inserts.saturating_add(delta.index_inserts);
729 total.index_removes = total.index_removes.saturating_add(delta.index_removes);
730 total.reverse_index_inserts = total
731 .reverse_index_inserts
732 .saturating_add(delta.reverse_index_inserts);
733 total.reverse_index_removes = total
734 .reverse_index_removes
735 .saturating_add(delta.reverse_index_removes);
736}