1use crate::{
7 db::{
8 Db,
9 commit::{
10 CommitRowOp, CommitSchemaFingerprint,
11 prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint,
12 },
13 data::{CanonicalRow, DataKey, PersistedRow, RawRow, UpdatePatch},
14 executor::{
15 Context, ExecutorError,
16 mutation::{
17 MutationInput, PreparedRowOpDelta, commit_prepared_single_save_row_op_with_window,
18 commit_save_row_ops_with_window, emit_index_delta_metrics, mutation_write_context,
19 synchronized_store_handles_for_prepared_row_ops,
20 },
21 },
22 relation::model_has_strong_relation_targets,
23 schema::{SchemaInfo, commit_schema_fingerprint_for_entity},
24 },
25 error::InternalError,
26 metrics::sink::{ExecKind, MetricsEvent, Span, record},
27 traits::{EntityValue, FieldValue, Storable},
28};
29use candid::CandidType;
30use derive_more::Display;
31use serde::{Deserialize, Serialize};
32use std::collections::HashSet;
33
/// Save semantics requested by the typed entity save entrypoints.
///
/// Each variant is translated into a row-existence precondition by
/// `SaveRule::from_mode`.
#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Display, Serialize)]
enum SaveMode {
    /// Default mode; translated to `SaveRule::RequireAbsent`.
    #[default]
    Insert,
    /// Translated to `SaveRule::AllowAny`.
    Replace,
    /// Translated to `SaveRule::RequirePresent`.
    Update,
}
52
/// Executor for save-style mutations (insert / update / replace) of entity
/// type `E`.
///
/// The only state it carries is the database handle.
#[derive(Clone, Copy)]
pub(in crate::db) struct SaveExecutor<E: PersistedRow + EntityValue> {
    /// Database handle that every save operation in this executor runs against.
    pub(in crate::db::executor::mutation) db: Db<E::Canister>,
}
61
/// Precondition on the currently stored row, checked by
/// `resolve_existing_row_for_rule` before a save is staged.
#[derive(Clone, Copy)]
enum SaveRule {
    /// No row may be stored under the key; an existing row yields
    /// `ExecutorError::KeyExists`.
    RequireAbsent,
    /// A row must be stored under the key; a missing row yields
    /// `InternalError::store_not_found`.
    RequirePresent,
    /// The row may or may not exist; an existing row is returned as the
    /// before-image.
    AllowAny,
}
73
74impl SaveRule {
75 const fn from_mode(mode: SaveMode) -> Self {
76 match mode {
77 SaveMode::Insert => Self::RequireAbsent,
78 SaveMode::Update => Self::RequirePresent,
79 SaveMode::Replace => Self::AllowAny,
80 }
81 }
82}
83
/// Mode selector for structural (patch-based) mutations, consumed by
/// `SaveExecutor::apply_structural_mutation`.
#[derive(Clone, Copy)]
pub enum MutationMode {
    /// The row must not exist yet; the after-image is built from the patch alone.
    Insert,
    /// The row may or may not exist; the after-image is built from the patch alone.
    Replace,
    /// The row must exist; the patch is applied on top of the stored row.
    Update,
}
99
100impl MutationMode {
101 const fn save_rule(self) -> SaveRule {
102 match self {
103 Self::Insert => SaveRule::RequireAbsent,
104 Self::Replace => SaveRule::AllowAny,
105 Self::Update => SaveRule::RequirePresent,
106 }
107 }
108}
109
110impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
    /// Creates a save executor bound to `db`.
    ///
    /// `_debug` is accepted but ignored by this constructor.
    #[must_use]
    pub(in crate::db) const fn new(db: Db<E::Canister>, _debug: bool) -> Self {
        Self { db }
    }
120
    /// Saves `entity` with `SaveMode::Insert` and returns the saved entity.
    ///
    /// # Errors
    ///
    /// Propagates any error from `save_entity`; insert mode rejects a key that
    /// is already stored.
    pub(crate) fn insert(&self, entity: E) -> Result<E, InternalError> {
        self.save_entity(SaveMode::Insert, entity)
    }
129
    /// Saves `entity` with `SaveMode::Update` and returns the saved entity.
    ///
    /// # Errors
    ///
    /// Propagates any error from `save_entity`; update mode rejects a key that
    /// has no stored row.
    pub(crate) fn update(&self, entity: E) -> Result<E, InternalError> {
        self.save_entity(SaveMode::Update, entity)
    }
134
    /// Saves `entity` with `SaveMode::Replace` and returns the saved entity.
    ///
    /// Replace mode accepts the key whether or not a row is already stored.
    ///
    /// # Errors
    ///
    /// Propagates any error from `save_entity`.
    pub(crate) fn replace(&self, entity: E) -> Result<E, InternalError> {
        self.save_entity(SaveMode::Replace, entity)
    }
139
140 fn save_batch_non_atomic(
149 &self,
150 mode: SaveMode,
151 entities: impl IntoIterator<Item = E>,
152 ) -> Result<Vec<E>, InternalError> {
153 let iter = entities.into_iter();
154 let mut out = Vec::with_capacity(iter.size_hint().0);
155 let ctx = mutation_write_context::<E>(&self.db)?;
156 let save_rule = SaveRule::from_mode(mode);
157 let schema = Self::schema_info();
158 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
159 let validate_relations = model_has_strong_relation_targets(E::MODEL);
160 let mut batch_span = None;
161 let mut batch_delta = PreparedRowOpDelta {
162 index_inserts: 0,
163 index_removes: 0,
164 reverse_index_inserts: 0,
165 reverse_index_removes: 0,
166 };
167
168 for entity in iter {
171 let span = batch_span.get_or_insert_with(|| Span::<E>::new(ExecKind::Save));
172
173 let result = (|| {
174 let (saved, marker_row_op) = self.prepare_entity_save_row_op(
175 &ctx,
176 save_rule,
177 schema,
178 schema_fingerprint,
179 validate_relations,
180 entity,
181 )?;
182 let prepared_row_op =
183 prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<
184 E,
185 >(&self.db, &marker_row_op, &ctx, &ctx, schema_fingerprint)?;
186 Self::commit_prepared_single_row(
187 &self.db,
188 marker_row_op,
189 prepared_row_op,
190 |delta| accumulate_prepared_row_op_delta(&mut batch_delta, delta),
191 || {},
192 )?;
193
194 Ok::<_, InternalError>(saved)
195 })();
196
197 match result {
198 Ok(saved) => {
199 out.push(saved);
200 span.set_rows(u64::try_from(out.len()).unwrap_or(u64::MAX));
201 }
202 Err(err) => {
203 if !out.is_empty() {
204 emit_index_delta_metrics::<E>(&batch_delta);
205 record(MetricsEvent::NonAtomicPartialCommit {
206 entity_path: E::PATH,
207 committed_rows: u64::try_from(out.len()).unwrap_or(u64::MAX),
208 });
209 }
210
211 return Err(err);
212 }
213 }
214 }
215
216 if !out.is_empty() {
217 emit_index_delta_metrics::<E>(&batch_delta);
218 }
219
220 Ok(out)
221 }
222
223 fn save_batch_atomic(
230 &self,
231 mode: SaveMode,
232 entities: impl IntoIterator<Item = E>,
233 ) -> Result<Vec<E>, InternalError> {
234 let entities: Vec<E> = entities.into_iter().collect();
235
236 self.save_batch_atomic_impl(SaveRule::from_mode(mode), entities)
237 }
238
    /// Inserts all `entities` as one atomic batch (`SaveMode::Insert`).
    ///
    /// # Errors
    ///
    /// Propagates any error from `save_batch_atomic`.
    pub(crate) fn insert_many_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_atomic(SaveMode::Insert, entities)
    }
248
    /// Updates all `entities` as one atomic batch (`SaveMode::Update`).
    ///
    /// # Errors
    ///
    /// Propagates any error from `save_batch_atomic`.
    pub(crate) fn update_many_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_atomic(SaveMode::Update, entities)
    }
258
    /// Replaces all `entities` as one atomic batch (`SaveMode::Replace`).
    ///
    /// # Errors
    ///
    /// Propagates any error from `save_batch_atomic`.
    pub(crate) fn replace_many_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_atomic(SaveMode::Replace, entities)
    }
268
    /// Inserts `entities` one row at a time (`SaveMode::Insert`).
    ///
    /// Each row is committed independently; see `save_batch_non_atomic` for
    /// what happens when a later row fails.
    ///
    /// # Errors
    ///
    /// Propagates the first error from `save_batch_non_atomic`.
    pub(crate) fn insert_many_non_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_non_atomic(SaveMode::Insert, entities)
    }
278
    /// Updates `entities` one row at a time (`SaveMode::Update`).
    ///
    /// Each row is committed independently; see `save_batch_non_atomic` for
    /// what happens when a later row fails.
    ///
    /// # Errors
    ///
    /// Propagates the first error from `save_batch_non_atomic`.
    pub(crate) fn update_many_non_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_non_atomic(SaveMode::Update, entities)
    }
288
    /// Replaces `entities` one row at a time (`SaveMode::Replace`).
    ///
    /// Each row is committed independently; see `save_batch_non_atomic` for
    /// what happens when a later row fails.
    ///
    /// # Errors
    ///
    /// Propagates the first error from `save_batch_non_atomic`.
    pub(crate) fn replace_many_non_atomic(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        self.save_batch_non_atomic(SaveMode::Replace, entities)
    }
298
299 #[inline(never)]
302 fn save_batch_atomic_impl(
303 &self,
304 save_rule: SaveRule,
305 entities: Vec<E>,
306 ) -> Result<Vec<E>, InternalError> {
307 let mut span = Span::<E>::new(ExecKind::Save);
309 let ctx = mutation_write_context::<E>(&self.db)?;
310 let mut out = Vec::with_capacity(entities.len());
311 let mut marker_row_ops = Vec::with_capacity(entities.len());
312 let mut seen_row_keys = HashSet::with_capacity(entities.len());
313 let schema = Self::schema_info();
314 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
315 let validate_relations = model_has_strong_relation_targets(E::MODEL);
316
317 for mut entity in entities {
319 self.preflight_entity_with_cached_schema(&mut entity, schema, validate_relations)?;
320 let marker_row_op =
321 Self::prepare_typed_entity_row_op(&ctx, save_rule, &entity, schema_fingerprint)?;
322 if !seen_row_keys.insert(marker_row_op.key) {
323 let data_key = DataKey::try_new::<E>(entity.id().key())?;
324 return Err(InternalError::mutation_atomic_save_duplicate_key(
325 E::PATH,
326 data_key,
327 ));
328 }
329 marker_row_ops.push(marker_row_op);
330 out.push(entity);
331 }
332
333 if marker_row_ops.is_empty() {
334 return Ok(out);
335 }
336
337 Self::commit_atomic_batch(&self.db, marker_row_ops, &mut span)?;
339
340 Ok(out)
341 }
342
343 fn prepare_typed_entity_row_op(
345 ctx: &Context<'_, E>,
346 save_rule: SaveRule,
347 entity: &E,
348 schema_fingerprint: CommitSchemaFingerprint,
349 ) -> Result<CommitRowOp, InternalError> {
350 let data_key = DataKey::try_new::<E>(entity.id().key())?;
352 let raw_key = data_key.to_raw()?;
353 let old_raw = Self::resolve_existing_row_for_rule(ctx, &data_key, save_rule)?;
354
355 let row_bytes = CanonicalRow::from_entity(entity)?
359 .into_raw_row()
360 .into_bytes();
361 let before_bytes = old_raw.map(<RawRow as Storable>::into_bytes);
362 let row_op = CommitRowOp::new(
363 E::PATH,
364 raw_key,
365 before_bytes,
366 Some(row_bytes),
367 schema_fingerprint,
368 );
369
370 Ok(row_op)
371 }
372
373 fn build_structural_after_image_row(
375 mode: MutationMode,
376 mutation: &MutationInput,
377 old_row: Option<&RawRow>,
378 ) -> Result<CanonicalRow, InternalError> {
379 match mode {
380 MutationMode::Update => {
381 let Some(old_row) = old_row else {
382 return RawRow::from_serialized_update_patch(
383 E::MODEL,
384 mutation.serialized_patch(),
385 );
386 };
387
388 old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
389 }
390 MutationMode::Insert | MutationMode::Replace => {
391 RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch())
392 }
393 }
394 }
395
396 fn resolve_existing_row_for_rule(
398 ctx: &Context<'_, E>,
399 data_key: &DataKey,
400 save_rule: SaveRule,
401 ) -> Result<Option<RawRow>, InternalError> {
402 let raw_key = data_key.to_raw()?;
403
404 match save_rule {
405 SaveRule::RequireAbsent => {
406 if let Some(existing) = ctx.with_store(|store| store.get(&raw_key))? {
407 Self::validate_existing_row_identity(data_key, &existing)?;
408 return Err(ExecutorError::KeyExists(data_key.clone()).into());
409 }
410
411 Ok(None)
412 }
413 SaveRule::RequirePresent => {
414 let old_row = ctx
415 .with_store(|store| store.get(&raw_key))?
416 .ok_or_else(|| InternalError::store_not_found(data_key.to_string()))?;
417 Self::validate_existing_row_identity(data_key, &old_row)?;
418
419 Ok(Some(old_row))
420 }
421 SaveRule::AllowAny => {
422 let old_row = ctx.with_store(|store| store.get(&raw_key))?;
423 if let Some(old) = old_row.as_ref() {
424 Self::validate_existing_row_identity(data_key, old)?;
425 }
426
427 Ok(old_row)
428 }
429 }
430 }
431
432 fn validate_existing_row_identity(
434 data_key: &DataKey,
435 row: &RawRow,
436 ) -> Result<(), InternalError> {
437 Self::ensure_persisted_row_invariants(data_key, row).map_err(|err| {
438 match (err.class(), err.origin()) {
439 (
440 crate::error::ErrorClass::Corruption,
441 crate::error::ErrorOrigin::Serialize | crate::error::ErrorOrigin::Store,
442 ) => err,
443 _ => InternalError::from(ExecutorError::persisted_row_invariant_violation(
444 data_key,
445 &err.message,
446 )),
447 }
448 })?;
449
450 Ok(())
451 }
452
453 fn save_entity(&self, mode: SaveMode, entity: E) -> Result<E, InternalError> {
454 let ctx = mutation_write_context::<E>(&self.db)?;
455 let save_rule = SaveRule::from_mode(mode);
456
457 self.save_entity_with_context(&ctx, save_rule, entity)
458 }
459
460 fn save_entity_with_context(
463 &self,
464 ctx: &Context<'_, E>,
465 save_rule: SaveRule,
466 entity: E,
467 ) -> Result<E, InternalError> {
468 let schema = Self::schema_info();
469 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
470 let validate_relations = model_has_strong_relation_targets(E::MODEL);
471 self.save_entity_with_context_and_schema(
472 ctx,
473 save_rule,
474 schema,
475 schema_fingerprint,
476 validate_relations,
477 entity,
478 )
479 }
480
481 fn save_entity_with_context_and_schema(
484 &self,
485 ctx: &Context<'_, E>,
486 save_rule: SaveRule,
487 schema: &SchemaInfo,
488 schema_fingerprint: CommitSchemaFingerprint,
489 validate_relations: bool,
490 entity: E,
491 ) -> Result<E, InternalError> {
492 let mut span = Span::<E>::new(ExecKind::Save);
493 let (entity, marker_row_op) = self.prepare_entity_save_row_op(
494 ctx,
495 save_rule,
496 schema,
497 schema_fingerprint,
498 validate_relations,
499 entity,
500 )?;
501 let prepared_row_op =
502 prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
503 &self.db,
504 &marker_row_op,
505 ctx,
506 ctx,
507 schema_fingerprint,
508 )?;
509
510 Self::commit_prepared_single_row(
513 &self.db,
514 marker_row_op,
515 prepared_row_op,
516 |delta| emit_index_delta_metrics::<E>(delta),
517 || {
518 span.set_rows(1);
519 },
520 )?;
521
522 Ok(entity)
523 }
524
525 fn prepare_entity_save_row_op(
528 &self,
529 ctx: &Context<'_, E>,
530 save_rule: SaveRule,
531 schema: &SchemaInfo,
532 schema_fingerprint: CommitSchemaFingerprint,
533 validate_relations: bool,
534 entity: E,
535 ) -> Result<(E, CommitRowOp), InternalError> {
536 let mut entity = entity;
537
538 self.preflight_entity_with_cached_schema(&mut entity, schema, validate_relations)?;
541 let marker_row_op =
542 Self::prepare_typed_entity_row_op(ctx, save_rule, &entity, schema_fingerprint)?;
543
544 Ok((entity, marker_row_op))
545 }
546
547 pub(in crate::db) fn apply_structural_mutation(
549 &self,
550 mode: MutationMode,
551 key: E::Key,
552 patch: UpdatePatch,
553 ) -> Result<E, InternalError> {
554 let mutation = MutationInput::from_update_patch::<E>(key, &patch)?;
555
556 self.save_structural_mutation(mode, mutation)
557 }
558
559 fn save_structural_mutation(
560 &self,
561 mode: MutationMode,
562 mutation: MutationInput,
563 ) -> Result<E, InternalError> {
564 let mut span = Span::<E>::new(ExecKind::Save);
565 let ctx = mutation_write_context::<E>(&self.db)?;
566 let data_key = mutation.data_key().clone();
567 let old_raw = Self::resolve_existing_row_for_rule(&ctx, &data_key, mode.save_rule())?;
568 let raw_after_image =
569 Self::build_structural_after_image_row(mode, &mutation, old_raw.as_ref())?;
570 let entity = self.validate_structural_after_image(&data_key, &raw_after_image)?;
571 let normalized_mutation = MutationInput::from_entity(&entity)?;
572 let row_bytes =
573 Self::build_structural_after_image_row(mode, &normalized_mutation, old_raw.as_ref())?;
574 let row_bytes = row_bytes.into_raw_row().into_bytes();
575 let before_bytes = old_raw.map(<RawRow as Storable>::into_bytes);
576 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
577 let marker_row_op = CommitRowOp::new(
578 E::PATH,
579 data_key.to_raw()?,
580 before_bytes,
581 Some(row_bytes),
582 schema_fingerprint,
583 );
584 let prepared_row_op =
585 prepare_row_commit_for_entity_with_structural_readers_and_schema_fingerprint::<E>(
586 &self.db,
587 &marker_row_op,
588 &ctx,
589 &ctx,
590 schema_fingerprint,
591 )?;
592
593 Self::commit_prepared_single_row(
594 &self.db,
595 marker_row_op,
596 prepared_row_op,
597 |delta| emit_index_delta_metrics::<E>(delta),
598 || {
599 span.set_rows(1);
600 },
601 )?;
602
603 Ok(entity)
604 }
605
606 fn validate_structural_after_image(
609 &self,
610 data_key: &DataKey,
611 row: &RawRow,
612 ) -> Result<E, InternalError> {
613 let expected_key = data_key.try_key::<E>()?;
614 let mut entity = row.try_decode::<E>().map_err(|err| {
615 InternalError::mutation_structural_after_image_invalid(
616 E::PATH,
617 data_key,
618 err.to_string(),
619 )
620 })?;
621 let identity_key = entity.id().key();
622 if identity_key != expected_key {
623 let field_name = E::MODEL.primary_key().name();
624 let field_value = FieldValue::to_value(&identity_key);
625 let identity_value = FieldValue::to_value(&expected_key);
626
627 return Err(InternalError::mutation_entity_primary_key_mismatch(
628 E::PATH,
629 field_name,
630 &field_value,
631 &identity_value,
632 ));
633 }
634
635 self.preflight_entity(&mut entity)?;
636
637 Ok(entity)
638 }
639
640 fn commit_prepared_single_row(
642 db: &Db<E::Canister>,
643 marker_row_op: CommitRowOp,
644 prepared_row_op: crate::db::commit::PreparedRowCommitOp,
645 on_index_applied: impl FnOnce(&PreparedRowOpDelta),
646 on_data_applied: impl FnOnce(),
647 ) -> Result<(), InternalError> {
648 let synchronized_store_handles = synchronized_store_handles_for_prepared_row_ops(
649 db,
650 std::slice::from_ref(&prepared_row_op),
651 );
652
653 commit_prepared_single_save_row_op_with_window(
655 marker_row_op,
656 prepared_row_op,
657 synchronized_store_handles,
658 "save_row_apply",
659 on_index_applied,
660 || {
661 on_data_applied();
662 },
663 )?;
664
665 Ok(())
666 }
667
668 fn commit_atomic_batch(
670 db: &Db<E::Canister>,
671 marker_row_ops: Vec<CommitRowOp>,
672 span: &mut Span<E>,
673 ) -> Result<(), InternalError> {
674 let rows_touched = u64::try_from(marker_row_ops.len()).unwrap_or(u64::MAX);
675 commit_save_row_ops_with_window::<E>(
676 db,
677 marker_row_ops,
678 "save_batch_atomic_row_apply",
679 || {
680 span.set_rows(rows_touched);
681 },
682 )?;
683
684 Ok(())
685 }
686}
687
688const fn accumulate_prepared_row_op_delta(
690 total: &mut PreparedRowOpDelta,
691 delta: &PreparedRowOpDelta,
692) {
693 total.index_inserts = total.index_inserts.saturating_add(delta.index_inserts);
694 total.index_removes = total.index_removes.saturating_add(delta.index_removes);
695 total.reverse_index_inserts = total
696 .reverse_index_inserts
697 .saturating_add(delta.reverse_index_inserts);
698 total.reverse_index_removes = total
699 .reverse_index_removes
700 .saturating_add(delta.reverse_index_removes);
701}