1use crate::{
7 db::{
8 Db,
9 commit::CommitRowOp,
10 data::{CanonicalRow, DataKey, PersistedRow, RawRow, UpdatePatch},
11 executor::{
12 Context, ExecutorError,
13 mutation::{
14 MutationInput, commit_save_row_ops_with_window,
15 commit_single_save_row_op_with_window, mutation_write_context,
16 },
17 },
18 schema::commit_schema_fingerprint_for_entity,
19 },
20 error::InternalError,
21 metrics::sink::{ExecKind, MetricsEvent, Span, record},
22 traits::{EntityValue, FieldValue},
23};
24use candid::CandidType;
25use derive_more::Display;
26use serde::{Deserialize, Serialize};
27use std::collections::BTreeSet;
28
29#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Display, Serialize)]
41enum SaveMode {
42 #[default]
43 Insert,
44 Replace,
45 Update,
46}
47
48#[derive(Clone, Copy)]
53pub(in crate::db) struct SaveExecutor<E: PersistedRow + EntityValue> {
54 pub(in crate::db::executor::mutation) db: Db<E::Canister>,
55}
56
57#[derive(Clone, Copy)]
63enum SaveRule {
64 RequireAbsent,
65 RequirePresent,
66 AllowAny,
67}
68
69impl SaveRule {
70 const fn from_mode(mode: SaveMode) -> Self {
71 match mode {
72 SaveMode::Insert => Self::RequireAbsent,
73 SaveMode::Update => Self::RequirePresent,
74 SaveMode::Replace => Self::AllowAny,
75 }
76 }
77}
78
79#[derive(Clone, Copy)]
91pub enum MutationMode {
92 #[allow(dead_code)]
93 Insert,
94 #[allow(dead_code)]
95 Replace,
96 Update,
97}
98
99impl MutationMode {
100 const fn save_rule(self) -> SaveRule {
101 match self {
102 Self::Insert => SaveRule::RequireAbsent,
103 Self::Replace => SaveRule::AllowAny,
104 Self::Update => SaveRule::RequirePresent,
105 }
106 }
107}
108
109impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
110 #[must_use]
116 pub(in crate::db) const fn new(db: Db<E::Canister>, _debug: bool) -> Self {
117 Self { db }
118 }
119
120 pub(crate) fn insert(&self, entity: E) -> Result<E, InternalError> {
126 self.save_entity(SaveMode::Insert, entity)
127 }
128
129 pub(crate) fn update(&self, entity: E) -> Result<E, InternalError> {
131 self.save_entity(SaveMode::Update, entity)
132 }
133
134 #[allow(dead_code)]
139 pub(in crate::db) fn insert_structural(
140 &self,
141 key: E::Key,
142 patch: UpdatePatch,
143 ) -> Result<E, InternalError> {
144 self.apply_structural_mutation(MutationMode::Insert, key, patch)
145 }
146
147 #[allow(dead_code)]
152 pub(in crate::db) fn replace_structural(
153 &self,
154 key: E::Key,
155 patch: UpdatePatch,
156 ) -> Result<E, InternalError> {
157 self.apply_structural_mutation(MutationMode::Replace, key, patch)
158 }
159
160 #[allow(dead_code)]
165 pub(in crate::db) fn update_structural(
166 &self,
167 key: E::Key,
168 patch: UpdatePatch,
169 ) -> Result<E, InternalError> {
170 self.apply_structural_mutation(MutationMode::Update, key, patch)
171 }
172
173 pub(crate) fn replace(&self, entity: E) -> Result<E, InternalError> {
175 self.save_entity(SaveMode::Replace, entity)
176 }
177
178 fn save_batch_non_atomic(
187 &self,
188 mode: SaveMode,
189 entities: impl IntoIterator<Item = E>,
190 ) -> Result<Vec<E>, InternalError> {
191 let iter = entities.into_iter();
192 let mut out = Vec::with_capacity(iter.size_hint().0);
193
194 for entity in iter {
195 match self.save_entity(mode, entity) {
196 Ok(saved) => out.push(saved),
197 Err(err) => {
198 if !out.is_empty() {
199 record(MetricsEvent::NonAtomicPartialCommit {
200 entity_path: E::PATH,
201 committed_rows: u64::try_from(out.len()).unwrap_or(u64::MAX),
202 });
203 }
204
205 return Err(err);
206 }
207 }
208 }
209
210 Ok(out)
211 }
212
213 fn save_batch_atomic(
220 &self,
221 mode: SaveMode,
222 entities: impl IntoIterator<Item = E>,
223 ) -> Result<Vec<E>, InternalError> {
224 let entities: Vec<E> = entities.into_iter().collect();
225
226 self.save_batch_atomic_impl(SaveRule::from_mode(mode), entities)
227 }
228
229 pub(crate) fn insert_many_atomic(
233 &self,
234 entities: impl IntoIterator<Item = E>,
235 ) -> Result<Vec<E>, InternalError> {
236 self.save_batch_atomic(SaveMode::Insert, entities)
237 }
238
239 pub(crate) fn update_many_atomic(
243 &self,
244 entities: impl IntoIterator<Item = E>,
245 ) -> Result<Vec<E>, InternalError> {
246 self.save_batch_atomic(SaveMode::Update, entities)
247 }
248
249 pub(crate) fn replace_many_atomic(
253 &self,
254 entities: impl IntoIterator<Item = E>,
255 ) -> Result<Vec<E>, InternalError> {
256 self.save_batch_atomic(SaveMode::Replace, entities)
257 }
258
259 pub(crate) fn insert_many_non_atomic(
263 &self,
264 entities: impl IntoIterator<Item = E>,
265 ) -> Result<Vec<E>, InternalError> {
266 self.save_batch_non_atomic(SaveMode::Insert, entities)
267 }
268
269 pub(crate) fn update_many_non_atomic(
273 &self,
274 entities: impl IntoIterator<Item = E>,
275 ) -> Result<Vec<E>, InternalError> {
276 self.save_batch_non_atomic(SaveMode::Update, entities)
277 }
278
279 pub(crate) fn replace_many_non_atomic(
283 &self,
284 entities: impl IntoIterator<Item = E>,
285 ) -> Result<Vec<E>, InternalError> {
286 self.save_batch_non_atomic(SaveMode::Replace, entities)
287 }
288
289 #[inline(never)]
292 fn save_batch_atomic_impl(
293 &self,
294 save_rule: SaveRule,
295 entities: Vec<E>,
296 ) -> Result<Vec<E>, InternalError> {
297 let mut span = Span::<E>::new(ExecKind::Save);
299 let ctx = mutation_write_context::<E>(&self.db)?;
300 let mut out = Vec::with_capacity(entities.len());
301 let mut marker_row_ops = Vec::with_capacity(entities.len());
302 let mut seen_row_keys = BTreeSet::<Vec<u8>>::new();
303
304 for mut entity in entities {
306 self.preflight_entity(&mut entity)?;
307 let mutation = MutationInput::from_entity(&entity)?;
308
309 let (marker_row_op, data_key) =
310 Self::prepare_logical_row_op(&ctx, save_rule, &mutation)?;
311 if !seen_row_keys.insert(marker_row_op.key.clone()) {
312 return Err(InternalError::mutation_atomic_save_duplicate_key(
313 E::PATH,
314 data_key,
315 ));
316 }
317 marker_row_ops.push(marker_row_op);
318 out.push(entity);
319 }
320
321 if marker_row_ops.is_empty() {
322 return Ok(out);
323 }
324
325 Self::commit_atomic_batch(&self.db, marker_row_ops, &mut span)?;
327
328 Ok(out)
329 }
330
331 fn prepare_logical_row_op(
333 ctx: &Context<'_, E>,
334 save_rule: SaveRule,
335 mutation: &MutationInput,
336 ) -> Result<(CommitRowOp, DataKey), InternalError> {
337 let data_key = mutation.data_key().clone();
339 let raw_key = data_key.to_raw()?;
340 let old_raw = Self::resolve_existing_row_for_rule(ctx, &data_key, save_rule)?;
341 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
342
343 let row = Self::build_after_image_row(mutation, old_raw.as_ref())?;
345 let row_op = CommitRowOp::new(
346 E::PATH,
347 raw_key.as_bytes().to_vec(),
348 old_raw.as_ref().map(|item| item.as_bytes().to_vec()),
349 Some(row.as_bytes().to_vec()),
350 schema_fingerprint,
351 );
352
353 Ok((row_op, data_key))
354 }
355
356 fn build_after_image_row(
358 mutation: &MutationInput,
359 old_row: Option<&RawRow>,
360 ) -> Result<CanonicalRow, InternalError> {
361 let Some(old_row) = old_row else {
362 return RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch());
363 };
364
365 old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
366 }
367
368 fn build_structural_after_image_row(
370 mode: MutationMode,
371 mutation: &MutationInput,
372 old_row: Option<&RawRow>,
373 ) -> Result<CanonicalRow, InternalError> {
374 match mode {
375 MutationMode::Update => {
376 let Some(old_row) = old_row else {
377 return RawRow::from_serialized_update_patch(
378 E::MODEL,
379 mutation.serialized_patch(),
380 );
381 };
382
383 old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
384 }
385 MutationMode::Insert | MutationMode::Replace => {
386 RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch())
387 }
388 }
389 }
390
391 fn resolve_existing_row_for_rule(
393 ctx: &Context<'_, E>,
394 data_key: &DataKey,
395 save_rule: SaveRule,
396 ) -> Result<Option<RawRow>, InternalError> {
397 let raw_key = data_key.to_raw()?;
398
399 match save_rule {
400 SaveRule::RequireAbsent => {
401 if let Some(existing) = ctx.with_store(|store| store.get(&raw_key))? {
402 Self::validate_existing_row_identity(data_key, &existing)?;
403 return Err(ExecutorError::KeyExists(data_key.clone()).into());
404 }
405
406 Ok(None)
407 }
408 SaveRule::RequirePresent => {
409 let old_row = ctx
410 .with_store(|store| store.get(&raw_key))?
411 .ok_or_else(|| InternalError::store_not_found(data_key.to_string()))?;
412 Self::validate_existing_row_identity(data_key, &old_row)?;
413
414 Ok(Some(old_row))
415 }
416 SaveRule::AllowAny => {
417 let old_row = ctx.with_store(|store| store.get(&raw_key))?;
418 if let Some(old) = old_row.as_ref() {
419 Self::validate_existing_row_identity(data_key, old)?;
420 }
421
422 Ok(old_row)
423 }
424 }
425 }
426
427 fn validate_existing_row_identity(
429 data_key: &DataKey,
430 row: &RawRow,
431 ) -> Result<(), InternalError> {
432 Self::ensure_persisted_row_invariants(data_key, row).map_err(|err| {
433 match (err.class(), err.origin()) {
434 (
435 crate::error::ErrorClass::Corruption,
436 crate::error::ErrorOrigin::Serialize | crate::error::ErrorOrigin::Store,
437 ) => err,
438 _ => InternalError::from(ExecutorError::persisted_row_invariant_violation(
439 data_key,
440 &err.message,
441 )),
442 }
443 })?;
444
445 Ok(())
446 }
447
448 fn save_entity(&self, mode: SaveMode, entity: E) -> Result<E, InternalError> {
449 let mut entity = entity;
450 (|| {
451 let mut span = Span::<E>::new(ExecKind::Save);
452 let ctx = mutation_write_context::<E>(&self.db)?;
453 let save_rule = SaveRule::from_mode(mode);
454
455 self.preflight_entity(&mut entity)?;
457 let mutation = MutationInput::from_entity(&entity)?;
458
459 let (marker_row_op, _data_key) =
460 Self::prepare_logical_row_op(&ctx, save_rule, &mutation)?;
461
462 Self::commit_single_row(&self.db, marker_row_op, &mut span)?;
468
469 Ok(entity)
470 })()
471 }
472
473 #[allow(dead_code)]
475 pub(in crate::db) fn apply_structural_mutation(
476 &self,
477 mode: MutationMode,
478 key: E::Key,
479 patch: UpdatePatch,
480 ) -> Result<E, InternalError> {
481 let mutation = MutationInput::from_update_patch::<E>(key, &patch)?;
482
483 self.save_structural_mutation(mode, mutation)
484 }
485
486 #[allow(dead_code)]
487 fn save_structural_mutation(
488 &self,
489 mode: MutationMode,
490 mutation: MutationInput,
491 ) -> Result<E, InternalError> {
492 let mut span = Span::<E>::new(ExecKind::Save);
493 let ctx = mutation_write_context::<E>(&self.db)?;
494 let data_key = mutation.data_key().clone();
495 let old_raw = Self::resolve_existing_row_for_rule(&ctx, &data_key, mode.save_rule())?;
496 let raw_after_image =
497 Self::build_structural_after_image_row(mode, &mutation, old_raw.as_ref())?;
498 let entity = self.validate_structural_after_image(&data_key, &raw_after_image)?;
499 let normalized_mutation = MutationInput::from_entity(&entity)?;
500 let row =
501 Self::build_structural_after_image_row(mode, &normalized_mutation, old_raw.as_ref())?;
502 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
503 let marker_row_op = CommitRowOp::new(
504 E::PATH,
505 data_key.to_raw()?.as_bytes().to_vec(),
506 old_raw.as_ref().map(|item| item.as_bytes().to_vec()),
507 Some(row.as_bytes().to_vec()),
508 schema_fingerprint,
509 );
510
511 Self::commit_single_row(&self.db, marker_row_op, &mut span)?;
512
513 Ok(entity)
514 }
515
516 #[allow(dead_code)]
519 fn validate_structural_after_image(
520 &self,
521 data_key: &DataKey,
522 row: &RawRow,
523 ) -> Result<E, InternalError> {
524 let expected_key = data_key.try_key::<E>()?;
525 let mut entity = row.try_decode::<E>().map_err(|err| {
526 InternalError::mutation_structural_after_image_invalid(
527 E::PATH,
528 data_key,
529 err.to_string(),
530 )
531 })?;
532 let identity_key = entity.id().key();
533 if identity_key != expected_key {
534 let field_name = E::MODEL.primary_key().name();
535 let field_value = FieldValue::to_value(&identity_key);
536 let identity_value = FieldValue::to_value(&expected_key);
537
538 return Err(InternalError::mutation_entity_primary_key_mismatch(
539 E::PATH,
540 field_name,
541 &field_value,
542 &identity_value,
543 ));
544 }
545
546 self.preflight_entity(&mut entity)?;
547
548 Ok(entity)
549 }
550
551 fn commit_single_row(
553 db: &Db<E::Canister>,
554 marker_row_op: CommitRowOp,
555 span: &mut Span<E>,
556 ) -> Result<(), InternalError> {
557 commit_single_save_row_op_with_window::<E>(db, marker_row_op, "save_row_apply", || {
559 span.set_rows(1);
560 })?;
561
562 Ok(())
563 }
564
565 fn commit_atomic_batch(
567 db: &Db<E::Canister>,
568 marker_row_ops: Vec<CommitRowOp>,
569 span: &mut Span<E>,
570 ) -> Result<(), InternalError> {
571 let rows_touched = u64::try_from(marker_row_ops.len()).unwrap_or(u64::MAX);
572 commit_save_row_ops_with_window::<E>(
573 db,
574 marker_row_ops,
575 "save_batch_atomic_row_apply",
576 || {
577 span.set_rows(rows_touched);
578 },
579 )?;
580
581 Ok(())
582 }
583}