1use crate::{
7 db::{
8 Db,
9 commit::CommitRowOp,
10 data::{DataKey, PersistedRow, RawRow, UpdatePatch, decode_raw_row_for_entity_key},
11 executor::{
12 Context, ExecutorError,
13 mutation::{
14 StructuralMutationInput, commit_save_row_ops_with_window, mutation_write_context,
15 },
16 },
17 schema::commit_schema_fingerprint_for_entity,
18 },
19 error::InternalError,
20 metrics::sink::{ExecKind, MetricsEvent, Span, record},
21 traits::{EntityValue, FieldValue},
22};
23use candid::CandidType;
24use derive_more::Display;
25use serde::{Deserialize, Serialize};
26use std::collections::BTreeSet;
27
28#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Display, Serialize)]
40enum SaveMode {
41 #[default]
42 Insert,
43 Replace,
44 Update,
45}
46
47#[derive(Clone, Copy)]
52pub(in crate::db) struct SaveExecutor<E: PersistedRow + EntityValue> {
53 pub(in crate::db::executor::mutation) db: Db<E::Canister>,
54}
55
/// Precondition on the currently stored row that a save has to satisfy.
#[derive(Clone, Copy)]
enum SaveRule {
    /// The key must not be stored yet.
    RequireAbsent,
    /// The key must already be stored.
    RequirePresent,
    /// The key may or may not be stored.
    AllowAny,
}
67
68impl SaveRule {
69 const fn from_mode(mode: SaveMode) -> Self {
70 match mode {
71 SaveMode::Insert => Self::RequireAbsent,
72 SaveMode::Update => Self::RequirePresent,
73 SaveMode::Replace => Self::AllowAny,
74 }
75 }
76}
77
/// Mode selector for patch-based (structural) mutations.
///
/// The mode decides both the row-existence rule (see `save_rule`) and how the
/// after-image row is built from the patch.
#[derive(Clone, Copy)]
pub enum StructuralMutationMode {
    /// Create a row from the patch; the key must not be stored yet.
    #[allow(dead_code)]
    Insert,
    /// Build the row from the patch alone, whether or not the key is stored.
    #[allow(dead_code)]
    Replace,
    /// Apply the patch on top of the stored row; the key must be stored.
    Update,
}
97
98impl StructuralMutationMode {
99 const fn save_rule(self) -> SaveRule {
100 match self {
101 Self::Insert => SaveRule::RequireAbsent,
102 Self::Replace => SaveRule::AllowAny,
103 Self::Update => SaveRule::RequirePresent,
104 }
105 }
106}
107
108impl<E: PersistedRow + EntityValue> SaveExecutor<E> {
109 #[must_use]
115 pub(in crate::db) const fn new(db: Db<E::Canister>, _debug: bool) -> Self {
116 Self { db }
117 }
118
119 pub(crate) fn insert(&self, entity: E) -> Result<E, InternalError> {
125 self.save_entity(SaveMode::Insert, entity)
126 }
127
128 pub(crate) fn update(&self, entity: E) -> Result<E, InternalError> {
130 self.save_entity(SaveMode::Update, entity)
131 }
132
133 #[allow(dead_code)]
138 pub(in crate::db) fn insert_structural(
139 &self,
140 key: E::Key,
141 patch: UpdatePatch,
142 ) -> Result<E, InternalError> {
143 self.apply_structural_mutation(StructuralMutationMode::Insert, key, patch)
144 }
145
146 #[allow(dead_code)]
151 pub(in crate::db) fn replace_structural(
152 &self,
153 key: E::Key,
154 patch: UpdatePatch,
155 ) -> Result<E, InternalError> {
156 self.apply_structural_mutation(StructuralMutationMode::Replace, key, patch)
157 }
158
159 #[allow(dead_code)]
164 pub(in crate::db) fn update_structural(
165 &self,
166 key: E::Key,
167 patch: UpdatePatch,
168 ) -> Result<E, InternalError> {
169 self.apply_structural_mutation(StructuralMutationMode::Update, key, patch)
170 }
171
172 pub(crate) fn replace(&self, entity: E) -> Result<E, InternalError> {
174 self.save_entity(SaveMode::Replace, entity)
175 }
176
177 fn save_batch_non_atomic(
186 &self,
187 mode: SaveMode,
188 entities: impl IntoIterator<Item = E>,
189 ) -> Result<Vec<E>, InternalError> {
190 let iter = entities.into_iter();
191 let mut out = Vec::with_capacity(iter.size_hint().0);
192
193 for entity in iter {
194 match self.save_entity(mode, entity) {
195 Ok(saved) => out.push(saved),
196 Err(err) => {
197 if !out.is_empty() {
198 record(MetricsEvent::NonAtomicPartialCommit {
199 entity_path: E::PATH,
200 committed_rows: u64::try_from(out.len()).unwrap_or(u64::MAX),
201 });
202 }
203
204 return Err(err);
205 }
206 }
207 }
208
209 Ok(out)
210 }
211
212 fn save_batch_atomic(
219 &self,
220 mode: SaveMode,
221 entities: impl IntoIterator<Item = E>,
222 ) -> Result<Vec<E>, InternalError> {
223 let entities: Vec<E> = entities.into_iter().collect();
224
225 self.save_batch_atomic_impl(SaveRule::from_mode(mode), entities)
226 }
227
228 pub(crate) fn insert_many_atomic(
232 &self,
233 entities: impl IntoIterator<Item = E>,
234 ) -> Result<Vec<E>, InternalError> {
235 self.save_batch_atomic(SaveMode::Insert, entities)
236 }
237
238 pub(crate) fn update_many_atomic(
242 &self,
243 entities: impl IntoIterator<Item = E>,
244 ) -> Result<Vec<E>, InternalError> {
245 self.save_batch_atomic(SaveMode::Update, entities)
246 }
247
248 pub(crate) fn replace_many_atomic(
252 &self,
253 entities: impl IntoIterator<Item = E>,
254 ) -> Result<Vec<E>, InternalError> {
255 self.save_batch_atomic(SaveMode::Replace, entities)
256 }
257
258 pub(crate) fn insert_many_non_atomic(
262 &self,
263 entities: impl IntoIterator<Item = E>,
264 ) -> Result<Vec<E>, InternalError> {
265 self.save_batch_non_atomic(SaveMode::Insert, entities)
266 }
267
268 pub(crate) fn update_many_non_atomic(
272 &self,
273 entities: impl IntoIterator<Item = E>,
274 ) -> Result<Vec<E>, InternalError> {
275 self.save_batch_non_atomic(SaveMode::Update, entities)
276 }
277
278 pub(crate) fn replace_many_non_atomic(
282 &self,
283 entities: impl IntoIterator<Item = E>,
284 ) -> Result<Vec<E>, InternalError> {
285 self.save_batch_non_atomic(SaveMode::Replace, entities)
286 }
287
288 #[inline(never)]
291 fn save_batch_atomic_impl(
292 &self,
293 save_rule: SaveRule,
294 entities: Vec<E>,
295 ) -> Result<Vec<E>, InternalError> {
296 let mut span = Span::<E>::new(ExecKind::Save);
298 let ctx = mutation_write_context::<E>(&self.db)?;
299 let mut out = Vec::with_capacity(entities.len());
300 let mut marker_row_ops = Vec::with_capacity(entities.len());
301 let mut seen_row_keys = BTreeSet::<Vec<u8>>::new();
302
303 for mut entity in entities {
305 self.preflight_entity(&mut entity)?;
306 let mutation = StructuralMutationInput::from_entity(&entity)?;
307
308 let (marker_row_op, data_key) =
309 Self::prepare_logical_row_op(&ctx, save_rule, &mutation)?;
310 if !seen_row_keys.insert(marker_row_op.key.clone()) {
311 return Err(InternalError::mutation_atomic_save_duplicate_key(
312 E::PATH,
313 data_key,
314 ));
315 }
316 marker_row_ops.push(marker_row_op);
317 out.push(entity);
318 }
319
320 if marker_row_ops.is_empty() {
321 return Ok(out);
322 }
323
324 Self::commit_atomic_batch(&self.db, marker_row_ops, &mut span)?;
326
327 Ok(out)
328 }
329
330 fn prepare_logical_row_op(
332 ctx: &Context<'_, E>,
333 save_rule: SaveRule,
334 mutation: &StructuralMutationInput,
335 ) -> Result<(CommitRowOp, DataKey), InternalError> {
336 let data_key = mutation.data_key().clone();
338 let raw_key = data_key.to_raw()?;
339 let old_raw = Self::resolve_existing_row_for_rule(ctx, &data_key, save_rule)?;
340 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
341
342 let row = Self::build_after_image_row(mutation, old_raw.as_ref())?;
344 let row_op = CommitRowOp::new(
345 E::PATH,
346 raw_key.as_bytes().to_vec(),
347 old_raw.as_ref().map(|item| item.as_bytes().to_vec()),
348 Some(row.as_bytes().to_vec()),
349 schema_fingerprint,
350 );
351
352 Ok((row_op, data_key))
353 }
354
355 fn build_after_image_row(
357 mutation: &StructuralMutationInput,
358 old_row: Option<&RawRow>,
359 ) -> Result<RawRow, InternalError> {
360 let Some(old_row) = old_row else {
361 return RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch());
362 };
363
364 old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
365 }
366
367 fn build_structural_after_image_row(
369 mode: StructuralMutationMode,
370 mutation: &StructuralMutationInput,
371 old_row: Option<&RawRow>,
372 ) -> Result<RawRow, InternalError> {
373 match mode {
374 StructuralMutationMode::Update => {
375 let Some(old_row) = old_row else {
376 return RawRow::from_serialized_update_patch(
377 E::MODEL,
378 mutation.serialized_patch(),
379 );
380 };
381
382 old_row.apply_serialized_update_patch(E::MODEL, mutation.serialized_patch())
383 }
384 StructuralMutationMode::Insert | StructuralMutationMode::Replace => {
385 RawRow::from_serialized_update_patch(E::MODEL, mutation.serialized_patch())
386 }
387 }
388 }
389
390 fn resolve_existing_row_for_rule(
392 ctx: &Context<'_, E>,
393 data_key: &DataKey,
394 save_rule: SaveRule,
395 ) -> Result<Option<RawRow>, InternalError> {
396 let raw_key = data_key.to_raw()?;
397
398 match save_rule {
399 SaveRule::RequireAbsent => {
400 if let Some(existing) = ctx.with_store(|store| store.get(&raw_key))? {
401 Self::validate_existing_row_identity(data_key, &existing)?;
402 return Err(ExecutorError::KeyExists(data_key.clone()).into());
403 }
404
405 Ok(None)
406 }
407 SaveRule::RequirePresent => {
408 let old_row = ctx
409 .with_store(|store| store.get(&raw_key))?
410 .ok_or_else(|| InternalError::store_not_found(data_key.to_string()))?;
411 Self::validate_existing_row_identity(data_key, &old_row)?;
412
413 Ok(Some(old_row))
414 }
415 SaveRule::AllowAny => {
416 let old_row = ctx.with_store(|store| store.get(&raw_key))?;
417 if let Some(old) = old_row.as_ref() {
418 Self::validate_existing_row_identity(data_key, old)?;
419 }
420
421 Ok(old_row)
422 }
423 }
424 }
425
426 fn validate_existing_row_identity(
428 data_key: &DataKey,
429 row: &RawRow,
430 ) -> Result<(), InternalError> {
431 let (_, decoded) = decode_raw_row_for_entity_key::<E>(data_key, row)?;
432 Self::ensure_entity_invariants(&decoded).map_err(|err| {
433 InternalError::from(ExecutorError::persisted_row_invariant_violation(
434 data_key,
435 &err.message,
436 ))
437 })?;
438
439 Ok(())
440 }
441
442 fn save_entity(&self, mode: SaveMode, entity: E) -> Result<E, InternalError> {
443 let mut entity = entity;
444 (|| {
445 let mut span = Span::<E>::new(ExecKind::Save);
446 let ctx = mutation_write_context::<E>(&self.db)?;
447 let save_rule = SaveRule::from_mode(mode);
448
449 self.preflight_entity(&mut entity)?;
451 let mutation = StructuralMutationInput::from_entity(&entity)?;
452
453 let (marker_row_op, _data_key) =
454 Self::prepare_logical_row_op(&ctx, save_rule, &mutation)?;
455
456 ctx.with_store(|_| ())?;
458
459 Self::commit_single_row(&self.db, marker_row_op, &mut span)?;
465
466 Ok(entity)
467 })()
468 }
469
470 #[allow(dead_code)]
472 pub(in crate::db) fn apply_structural_mutation(
473 &self,
474 mode: StructuralMutationMode,
475 key: E::Key,
476 patch: UpdatePatch,
477 ) -> Result<E, InternalError> {
478 let mutation = StructuralMutationInput::from_update_patch::<E>(key, &patch)?;
479
480 self.save_structural_mutation(mode, mutation)
481 }
482
483 #[allow(dead_code)]
484 fn save_structural_mutation(
485 &self,
486 mode: StructuralMutationMode,
487 mutation: StructuralMutationInput,
488 ) -> Result<E, InternalError> {
489 let mut span = Span::<E>::new(ExecKind::Save);
490 let ctx = mutation_write_context::<E>(&self.db)?;
491 let data_key = mutation.data_key().clone();
492 let old_raw = Self::resolve_existing_row_for_rule(&ctx, &data_key, mode.save_rule())?;
493 let raw_after_image =
494 Self::build_structural_after_image_row(mode, &mutation, old_raw.as_ref())?;
495 let entity = self.validate_structural_after_image(&data_key, &raw_after_image)?;
496 let normalized_mutation = StructuralMutationInput::from_entity(&entity)?;
497 let row =
498 Self::build_structural_after_image_row(mode, &normalized_mutation, old_raw.as_ref())?;
499 let schema_fingerprint = commit_schema_fingerprint_for_entity::<E>();
500 let marker_row_op = CommitRowOp::new(
501 E::PATH,
502 data_key.to_raw()?.as_bytes().to_vec(),
503 old_raw.as_ref().map(|item| item.as_bytes().to_vec()),
504 Some(row.as_bytes().to_vec()),
505 schema_fingerprint,
506 );
507
508 ctx.with_store(|_| ())?;
509 Self::commit_single_row(&self.db, marker_row_op, &mut span)?;
510
511 Ok(entity)
512 }
513
514 #[allow(dead_code)]
517 fn validate_structural_after_image(
518 &self,
519 data_key: &DataKey,
520 row: &RawRow,
521 ) -> Result<E, InternalError> {
522 let expected_key = data_key.try_key::<E>()?;
523 let mut entity = row.try_decode::<E>().map_err(|err| {
524 InternalError::mutation_structural_after_image_invalid(
525 E::PATH,
526 data_key,
527 err.to_string(),
528 )
529 })?;
530 let identity_key = entity.id().key();
531 if identity_key != expected_key {
532 let field_name = E::MODEL.primary_key().name();
533 let field_value = FieldValue::to_value(&identity_key);
534 let identity_value = FieldValue::to_value(&expected_key);
535
536 return Err(InternalError::mutation_entity_primary_key_mismatch(
537 E::PATH,
538 field_name,
539 &field_value,
540 &identity_value,
541 ));
542 }
543
544 self.preflight_entity(&mut entity)?;
545
546 Ok(entity)
547 }
548
549 fn commit_single_row(
551 db: &Db<E::Canister>,
552 marker_row_op: CommitRowOp,
553 span: &mut Span<E>,
554 ) -> Result<(), InternalError> {
555 commit_save_row_ops_with_window::<E>(db, vec![marker_row_op], "save_row_apply", || {
557 span.set_rows(1);
558 })?;
559
560 Ok(())
561 }
562
563 fn commit_atomic_batch(
565 db: &Db<E::Canister>,
566 marker_row_ops: Vec<CommitRowOp>,
567 span: &mut Span<E>,
568 ) -> Result<(), InternalError> {
569 let rows_touched = u64::try_from(marker_row_ops.len()).unwrap_or(u64::MAX);
570 commit_save_row_ops_with_window::<E>(
571 db,
572 marker_row_ops,
573 "save_batch_atomic_row_apply",
574 || {
575 span.set_rows(rows_touched);
576 },
577 )?;
578
579 Ok(())
580 }
581}