1use crate::{
7 db::{
8 Db,
9 commit::{
10 CommitRowOp, CommitSchemaFingerprint,
11 prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint,
12 },
13 data::{CanonicalRow, DataKey, PersistedRow, RawRow, UpdatePatch},
14 executor::{
15 Context, ExecutorError,
16 mutation::{
17 MutationInput, PreparedRowOpDelta, commit_prepared_single_save_row_op_with_window,
18 commit_save_row_ops_with_window, emit_index_delta_metrics, mutation_write_context,
19 synchronized_store_handles_for_prepared_row_ops,
20 },
21 },
22 relation::model_has_strong_relation_targets,
23 schema::{SchemaInfo, commit_schema_fingerprint_for_entity},
24 },
25 error::InternalError,
26 metrics::sink::{ExecKind, MetricsEvent, Span, record},
27 traits::{EntityValue, FieldValue, Storable},
28};
29use candid::CandidType;
30use derive_more::Display;
31use serde::{Deserialize, Serialize};
32use std::collections::HashSet;
33
/// Save behaviour requested by callers of the save executor.
///
/// Each variant is translated into a `SaveRule` by `SaveRule::from_mode`.
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Display, Serialize)]
enum SaveMode {
    /// Default mode; translated to `SaveRule::RequireAbsent`.
    #[default]
    Insert,
    /// Translated to `SaveRule::AllowAny`.
    Replace,
    /// Translated to `SaveRule::RequirePresent`.
    Update,
}
52
/// Executor for save-style mutations of entity type `E`.
///
/// Holds the database handle that the save paths pass to the write-context
/// and commit helpers.
#[derive(Clone, Copy)]
pub(in crate::db) struct SaveExecutor<E: PersistedRow + EntityValue> {
    /// Database handle for the canister that owns `E`.
    pub(in crate::db::executor::mutation) db: Db<E::Canister>,
}
61
/// Existence requirement checked against the stored row before a save.
///
/// Enforced by `resolve_existing_row_for_rule`.
#[derive(Clone, Copy)]
enum SaveRule {
    /// The key must not be stored yet; an existing row yields `ExecutorError::KeyExists`.
    RequireAbsent,
    /// The key must already be stored; a missing row yields a store-not-found error.
    RequirePresent,
    /// The key may or may not be stored; an existing row is returned as the before-image.
    AllowAny,
}
73
74impl SaveRule {
75 const fn from_mode(mode: SaveMode) -> Self {
76 match mode {
77 SaveMode::Insert => Self::RequireAbsent,
78 SaveMode::Update => Self::RequirePresent,
79 SaveMode::Replace => Self::AllowAny,
80 }
81 }
82}
83
/// Mode selector for structural (patch-based) mutations.
///
/// `save_rule` maps each variant onto the matching `SaveRule`, and
/// `build_structural_after_image_row` uses it to decide whether the patch is
/// applied on top of an existing row or used on its own.
#[derive(Clone, Copy)]
pub enum MutationMode {
    /// Builds the after-image from the patch alone; requires the key to be absent.
    #[allow(dead_code)]
    Insert,
    /// Builds the after-image from the patch alone; the key may or may not exist.
    #[allow(dead_code)]
    Replace,
    /// Applies the patch to the existing row; requires the key to be present.
    Update,
}
103
104impl MutationMode {
105 const fn save_rule(self) -> SaveRule {
106 match self {
107 Self::Insert => SaveRule::RequireAbsent,
108 Self::Replace => SaveRule::AllowAny,
109 Self::Update => SaveRule::RequirePresent,
110 }
111 }
112}
113
114impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
    /// Creates an executor bound to `db`.
    ///
    /// `_debug` is accepted but not used by this constructor.
    #[must_use]
    pub(in crate::db) const fn new(db: Db<E::Canister>, _debug: bool) -> Self {
        Self { db }
    }
124
125 pub(crate) fn insert(&self, entity: E) -> Result<E, InternalError> {
131 self.save_entity(SaveMode::Insert, entity)
132 }
133
134 pub(crate) fn update(&self, entity: E) -> Result<E, InternalError> {
136 self.save_entity(SaveMode::Update, entity)
137 }
138
139 #[allow(dead_code)]
144 pub(in crate::db) fn insert_structural(
145 &self,
146 key: E::Key,
147 patch: UpdatePatch,
148 ) -> Result<E, InternalError> {
149 self.apply_structural_mutation(MutationMode::Insert, key, patch)
150 }
151
152 #[allow(dead_code)]
157 pub(in crate::db) fn replace_structural(
158 &self,
159 key: E::Key,
160 patch: UpdatePatch,
161 ) -> Result<E, InternalError> {
162 self.apply_structural_mutation(MutationMode::Replace, key, patch)
163 }
164
165 #[allow(dead_code)]
170 pub(in crate::db) fn update_structural(
171 &self,
172 key: E::Key,
173 patch: UpdatePatch,
174 ) -> Result<E, InternalError> {
175 self.apply_structural_mutation(MutationMode::Update, key, patch)
176 }
177
178 pub(crate) fn replace(&self, entity: E) -> Result<E, InternalError> {
180 self.save_entity(SaveMode::Replace, entity)
181 }
182
183 fn save_batch_non_atomic(
192 &self,
193 mode: SaveMode,
194 entities: impl IntoIterator<Item = E>,
195 ) -> Result<Vec<E>, InternalError> {
196 let iter = entities.into_iter();
197 let mut out = Vec::with_capacity(iter.size_hint().0);
198 let ctx = mutation_write_context::<E>(&self.db)?;
199 let save_rule = SaveRule::from_mode(mode);
200 let schema = Self::schema_info()?;
201 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
202 let validate_relations = model_has_strong_relation_targets(E::MODEL);
203 let mut batch_span = None;
204 let mut batch_delta = PreparedRowOpDelta {
205 index_inserts: 0,
206 index_removes: 0,
207 reverse_index_inserts: 0,
208 reverse_index_removes: 0,
209 };
210
211 for entity in iter {
214 let span = batch_span.get_or_insert_with(|| Span::<E>::new(ExecKind::Save));
215
216 let result = (|| {
217 let (saved, marker_row_op) = self.prepare_entity_save_row_op(
218 &ctx,
219 save_rule,
220 schema,
221 schema_fingerprint,
222 validate_relations,
223 entity,
224 )?;
225 let prepared_row_op =
226 prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<
227 E,
228 >(&self.db, &marker_row_op, &ctx, &ctx, schema_fingerprint)?;
229 Self::commit_prepared_single_row(
230 &self.db,
231 marker_row_op,
232 prepared_row_op,
233 |delta| accumulate_prepared_row_op_delta(&mut batch_delta, delta),
234 || {},
235 )?;
236
237 Ok::<_, InternalError>(saved)
238 })();
239
240 match result {
241 Ok(saved) => {
242 out.push(saved);
243 span.set_rows(u64::try_from(out.len()).unwrap_or(u64::MAX));
244 }
245 Err(err) => {
246 if !out.is_empty() {
247 emit_index_delta_metrics::<E>(&batch_delta);
248 record(MetricsEvent::NonAtomicPartialCommit {
249 entity_path: E::PATH,
250 committed_rows: u64::try_from(out.len()).unwrap_or(u64::MAX),
251 });
252 }
253
254 return Err(err);
255 }
256 }
257 }
258
259 if !out.is_empty() {
260 emit_index_delta_metrics::<E>(&batch_delta);
261 }
262
263 Ok(out)
264 }
265
266 fn save_batch_atomic(
273 &self,
274 mode: SaveMode,
275 entities: impl IntoIterator<Item = E>,
276 ) -> Result<Vec<E>, InternalError> {
277 let entities: Vec<E> = entities.into_iter().collect();
278
279 self.save_batch_atomic_impl(SaveRule::from_mode(mode), entities)
280 }
281
282 pub(crate) fn insert_many_atomic(
286 &self,
287 entities: impl IntoIterator<Item = E>,
288 ) -> Result<Vec<E>, InternalError> {
289 self.save_batch_atomic(SaveMode::Insert, entities)
290 }
291
292 pub(crate) fn update_many_atomic(
296 &self,
297 entities: impl IntoIterator<Item = E>,
298 ) -> Result<Vec<E>, InternalError> {
299 self.save_batch_atomic(SaveMode::Update, entities)
300 }
301
302 pub(crate) fn replace_many_atomic(
306 &self,
307 entities: impl IntoIterator<Item = E>,
308 ) -> Result<Vec<E>, InternalError> {
309 self.save_batch_atomic(SaveMode::Replace, entities)
310 }
311
312 pub(crate) fn insert_many_non_atomic(
316 &self,
317 entities: impl IntoIterator<Item = E>,
318 ) -> Result<Vec<E>, InternalError> {
319 self.save_batch_non_atomic(SaveMode::Insert, entities)
320 }
321
322 pub(crate) fn update_many_non_atomic(
326 &self,
327 entities: impl IntoIterator<Item = E>,
328 ) -> Result<Vec<E>, InternalError> {
329 self.save_batch_non_atomic(SaveMode::Update, entities)
330 }
331
332 pub(crate) fn replace_many_non_atomic(
336 &self,
337 entities: impl IntoIterator<Item = E>,
338 ) -> Result<Vec<E>, InternalError> {
339 self.save_batch_non_atomic(SaveMode::Replace, entities)
340 }
341
342 #[inline(never)]
345 fn save_batch_atomic_impl(
346 &self,
347 save_rule: SaveRule,
348 entities: Vec<E>,
349 ) -> Result<Vec<E>, InternalError> {
350 let mut span = Span::<E>::new(ExecKind::Save);
352 let ctx = mutation_write_context::<E>(&self.db)?;
353 let mut out = Vec::with_capacity(entities.len());
354 let mut marker_row_ops = Vec::with_capacity(entities.len());
355 let mut seen_row_keys = HashSet::with_capacity(entities.len());
356 let schema = Self::schema_info()?;
357 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
358 let validate_relations = model_has_strong_relation_targets(E::MODEL);
359
360 for mut entity in entities {
362 self.preflight_entity_with_cached_schema(&mut entity, schema, validate_relations)?;
363 let marker_row_op =
364 Self::prepare_typed_entity_row_op(&ctx, save_rule, &entity, schema_fingerprint)?;
365 if !seen_row_keys.insert(marker_row_op.key) {
366 let data_key = DataKey::try_new::<E>(entity.id().key())?;
367 return Err(InternalError::mutation_atomic_save_duplicate_key(
368 E::PATH,
369 data_key,
370 ));
371 }
372 marker_row_ops.push(marker_row_op);
373 out.push(entity);
374 }
375
376 if marker_row_ops.is_empty() {
377 return Ok(out);
378 }
379
380 Self::commit_atomic_batch(&self.db, marker_row_ops, &mut span)?;
382
383 Ok(out)
384 }
385
386 fn prepare_typed_entity_row_op(
388 ctx: &Context<'_, E>,
389 save_rule: SaveRule,
390 entity: &E,
391 schema_fingerprint: CommitSchemaFingerprint,
392 ) -> Result<CommitRowOp, InternalError> {
393 let data_key = DataKey::try_new::<E>(entity.id().key())?;
395 let raw_key = data_key.to_raw()?;
396 let old_raw = Self::resolve_existing_row_for_rule(ctx, &data_key, save_rule)?;
397
398 let row_bytes = CanonicalRow::from_entity(entity)?
402 .into_raw_row()
403 .into_bytes();
404 let before_bytes = old_raw.map(<RawRow as Storable>::into_bytes);
405 let row_op = CommitRowOp::new(
406 E::PATH,
407 raw_key,
408 before_bytes,
409 Some(row_bytes),
410 schema_fingerprint,
411 );
412
413 Ok(row_op)
414 }
415
416 fn build_structural_after_image_row(
418 mode: MutationMode,
419 mutation: &MutationInput,
420 old_row: Option<&RawRow>,
421 ) -> Result<CanonicalRow, InternalError> {
422 match mode {
423 MutationMode::Update => {
424 let Some(old_row) = old_row else {
425 return RawRow::from_serialized_update_patch(
426 E::MODEL,
427 mutation.serialized_patch(),
428 );
429 };
430
431 old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
432 }
433 MutationMode::Insert | MutationMode::Replace => {
434 RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch())
435 }
436 }
437 }
438
439 fn resolve_existing_row_for_rule(
441 ctx: &Context<'_, E>,
442 data_key: &DataKey,
443 save_rule: SaveRule,
444 ) -> Result<Option<RawRow>, InternalError> {
445 let raw_key = data_key.to_raw()?;
446
447 match save_rule {
448 SaveRule::RequireAbsent => {
449 if let Some(existing) = ctx.with_store(|store| store.get(&raw_key))? {
450 Self::validate_existing_row_identity(data_key, &existing)?;
451 return Err(ExecutorError::KeyExists(data_key.clone()).into());
452 }
453
454 Ok(None)
455 }
456 SaveRule::RequirePresent => {
457 let old_row = ctx
458 .with_store(|store| store.get(&raw_key))?
459 .ok_or_else(|| InternalError::store_not_found(data_key.to_string()))?;
460 Self::validate_existing_row_identity(data_key, &old_row)?;
461
462 Ok(Some(old_row))
463 }
464 SaveRule::AllowAny => {
465 let old_row = ctx.with_store(|store| store.get(&raw_key))?;
466 if let Some(old) = old_row.as_ref() {
467 Self::validate_existing_row_identity(data_key, old)?;
468 }
469
470 Ok(old_row)
471 }
472 }
473 }
474
475 fn validate_existing_row_identity(
477 data_key: &DataKey,
478 row: &RawRow,
479 ) -> Result<(), InternalError> {
480 Self::ensure_persisted_row_invariants(data_key, row).map_err(|err| {
481 match (err.class(), err.origin()) {
482 (
483 crate::error::ErrorClass::Corruption,
484 crate::error::ErrorOrigin::Serialize | crate::error::ErrorOrigin::Store,
485 ) => err,
486 _ => InternalError::from(ExecutorError::persisted_row_invariant_violation(
487 data_key,
488 &err.message,
489 )),
490 }
491 })?;
492
493 Ok(())
494 }
495
496 fn save_entity(&self, mode: SaveMode, entity: E) -> Result<E, InternalError> {
497 let ctx = mutation_write_context::<E>(&self.db)?;
498 let save_rule = SaveRule::from_mode(mode);
499
500 self.save_entity_with_context(&ctx, save_rule, entity)
501 }
502
503 fn save_entity_with_context(
506 &self,
507 ctx: &Context<'_, E>,
508 save_rule: SaveRule,
509 entity: E,
510 ) -> Result<E, InternalError> {
511 let schema = Self::schema_info()?;
512 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
513 let validate_relations = model_has_strong_relation_targets(E::MODEL);
514 self.save_entity_with_context_and_schema(
515 ctx,
516 save_rule,
517 schema,
518 schema_fingerprint,
519 validate_relations,
520 entity,
521 )
522 }
523
524 fn save_entity_with_context_and_schema(
527 &self,
528 ctx: &Context<'_, E>,
529 save_rule: SaveRule,
530 schema: &SchemaInfo,
531 schema_fingerprint: CommitSchemaFingerprint,
532 validate_relations: bool,
533 entity: E,
534 ) -> Result<E, InternalError> {
535 let mut span = Span::<E>::new(ExecKind::Save);
536 let (entity, marker_row_op) = self.prepare_entity_save_row_op(
537 ctx,
538 save_rule,
539 schema,
540 schema_fingerprint,
541 validate_relations,
542 entity,
543 )?;
544 let prepared_row_op =
545 prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
546 &self.db,
547 &marker_row_op,
548 ctx,
549 ctx,
550 schema_fingerprint,
551 )?;
552
553 Self::commit_prepared_single_row(
556 &self.db,
557 marker_row_op,
558 prepared_row_op,
559 |delta| emit_index_delta_metrics::<E>(delta),
560 || {
561 span.set_rows(1);
562 },
563 )?;
564
565 Ok(entity)
566 }
567
568 fn prepare_entity_save_row_op(
571 &self,
572 ctx: &Context<'_, E>,
573 save_rule: SaveRule,
574 schema: &SchemaInfo,
575 schema_fingerprint: CommitSchemaFingerprint,
576 validate_relations: bool,
577 entity: E,
578 ) -> Result<(E, CommitRowOp), InternalError> {
579 let mut entity = entity;
580
581 self.preflight_entity_with_cached_schema(&mut entity, schema, validate_relations)?;
584 let marker_row_op =
585 Self::prepare_typed_entity_row_op(ctx, save_rule, &entity, schema_fingerprint)?;
586
587 Ok((entity, marker_row_op))
588 }
589
590 #[allow(dead_code)]
592 pub(in crate::db) fn apply_structural_mutation(
593 &self,
594 mode: MutationMode,
595 key: E::Key,
596 patch: UpdatePatch,
597 ) -> Result<E, InternalError> {
598 let mutation = MutationInput::from_update_patch::<E>(key, &patch)?;
599
600 self.save_structural_mutation(mode, mutation)
601 }
602
603 #[allow(dead_code)]
604 fn save_structural_mutation(
605 &self,
606 mode: MutationMode,
607 mutation: MutationInput,
608 ) -> Result<E, InternalError> {
609 let mut span = Span::<E>::new(ExecKind::Save);
610 let ctx = mutation_write_context::<E>(&self.db)?;
611 let data_key = mutation.data_key().clone();
612 let old_raw = Self::resolve_existing_row_for_rule(&ctx, &data_key, mode.save_rule())?;
613 let raw_after_image =
614 Self::build_structural_after_image_row(mode, &mutation, old_raw.as_ref())?;
615 let entity = self.validate_structural_after_image(&data_key, &raw_after_image)?;
616 let normalized_mutation = MutationInput::from_entity(&entity)?;
617 let row_bytes =
618 Self::build_structural_after_image_row(mode, &normalized_mutation, old_raw.as_ref())?;
619 let row_bytes = row_bytes.into_raw_row().into_bytes();
620 let before_bytes = old_raw.map(<RawRow as Storable>::into_bytes);
621 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
622 let marker_row_op = CommitRowOp::new(
623 E::PATH,
624 data_key.to_raw()?,
625 before_bytes,
626 Some(row_bytes),
627 schema_fingerprint,
628 );
629 let prepared_row_op =
630 prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
631 &self.db,
632 &marker_row_op,
633 &ctx,
634 &ctx,
635 schema_fingerprint,
636 )?;
637
638 Self::commit_prepared_single_row(
639 &self.db,
640 marker_row_op,
641 prepared_row_op,
642 |delta| emit_index_delta_metrics::<E>(delta),
643 || {
644 span.set_rows(1);
645 },
646 )?;
647
648 Ok(entity)
649 }
650
651 #[allow(dead_code)]
654 fn validate_structural_after_image(
655 &self,
656 data_key: &DataKey,
657 row: &RawRow,
658 ) -> Result<E, InternalError> {
659 let expected_key = data_key.try_key::<E>()?;
660 let mut entity = row.try_decode::<E>().map_err(|err| {
661 InternalError::mutation_structural_after_image_invalid(
662 E::PATH,
663 data_key,
664 err.to_string(),
665 )
666 })?;
667 let identity_key = entity.id().key();
668 if identity_key != expected_key {
669 let field_name = E::MODEL.primary_key().name();
670 let field_value = FieldValue::to_value(&identity_key);
671 let identity_value = FieldValue::to_value(&expected_key);
672
673 return Err(InternalError::mutation_entity_primary_key_mismatch(
674 E::PATH,
675 field_name,
676 &field_value,
677 &identity_value,
678 ));
679 }
680
681 self.preflight_entity(&mut entity)?;
682
683 Ok(entity)
684 }
685
686 fn commit_prepared_single_row(
688 db: &Db<E::Canister>,
689 marker_row_op: CommitRowOp,
690 prepared_row_op: crate::db::commit::PreparedRowCommitOp,
691 on_index_applied: impl FnOnce(&PreparedRowOpDelta),
692 on_data_applied: impl FnOnce(),
693 ) -> Result<(), InternalError> {
694 let synchronized_store_handles = synchronized_store_handles_for_prepared_row_ops(
695 db,
696 std::slice::from_ref(&prepared_row_op),
697 );
698
699 commit_prepared_single_save_row_op_with_window(
701 marker_row_op,
702 prepared_row_op,
703 synchronized_store_handles,
704 "save_row_apply",
705 on_index_applied,
706 || {
707 on_data_applied();
708 },
709 )?;
710
711 Ok(())
712 }
713
714 fn commit_atomic_batch(
716 db: &Db<E::Canister>,
717 marker_row_ops: Vec<CommitRowOp>,
718 span: &mut Span<E>,
719 ) -> Result<(), InternalError> {
720 let rows_touched = u64::try_from(marker_row_ops.len()).unwrap_or(u64::MAX);
721 commit_save_row_ops_with_window::<E>(
722 db,
723 marker_row_ops,
724 "save_batch_atomic_row_apply",
725 || {
726 span.set_rows(rows_touched);
727 },
728 )?;
729
730 Ok(())
731 }
732}
733
734const fn accumulate_prepared_row_op_delta(
736 total: &mut PreparedRowOpDelta,
737 delta: &PreparedRowOpDelta,
738) {
739 total.index_inserts = total.index_inserts.saturating_add(delta.index_inserts);
740 total.index_removes = total.index_removes.saturating_add(delta.index_removes);
741 total.reverse_index_inserts = total
742 .reverse_index_inserts
743 .saturating_add(delta.reverse_index_inserts);
744 total.reverse_index_removes = total
745 .reverse_index_removes
746 .saturating_add(delta.reverse_index_removes);
747}