1use crate::{
2 db::{
3 CommitDataOp, CommitIndexOp, CommitKind, CommitMarker, Db, WriteUnit, begin_commit,
4 ensure_recovered,
5 executor::{
6 ExecutorError,
7 trace::{QueryTraceSink, TraceExecutorKind, start_exec_trace},
8 },
9 finish_commit,
10 index::{
11 IndexKey, IndexStore, MAX_INDEX_ENTRY_BYTES, RawIndexEntry, RawIndexKey,
12 plan::{IndexApplyPlan, plan_index_mutation_for_entity},
13 },
14 query::{SaveMode, SaveQuery},
15 store::{DataKey, RawDataKey, RawRow},
16 },
17 error::{ErrorClass, ErrorOrigin, InternalError},
18 obs::sink::{self, ExecKind, MetricsEvent, Span},
19 sanitize::sanitize,
20 serialize::{deserialize, serialize},
21 traits::{EntityKind, Path, Storable},
22 validate::validate,
23};
24use std::{
25 borrow::Cow, cell::RefCell, collections::BTreeMap, marker::PhantomData, thread::LocalKey,
26};
27
28#[derive(Clone, Copy)]
33pub struct SaveExecutor<E: EntityKind> {
34 db: Db<E::Canister>,
35 debug: bool,
36 trace: Option<&'static dyn QueryTraceSink>,
37 _marker: PhantomData<E>,
38}
39
40impl<E: EntityKind> SaveExecutor<E> {
41 #[must_use]
46 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
47 Self {
48 db,
49 debug,
50 trace: None,
51 _marker: PhantomData,
52 }
53 }
54
/// Returns this executor with its trace sink replaced by `sink`.
///
/// Passing `None` detaches any previously configured sink.
#[must_use]
#[allow(dead_code)]
pub(crate) const fn with_trace_sink(
    mut self,
    sink: Option<&'static dyn QueryTraceSink>,
) -> Self {
    self.trace = sink;
    self
}
64
/// Returns this executor with debug logging switched on.
#[must_use]
pub const fn debug(mut self) -> Self {
    self.debug = true;
    self
}
70
71 fn debug_log(&self, s: impl Into<String>) {
72 if self.debug {
73 println!("{}", s.into());
74 }
75 }
76
77 pub fn insert(&self, entity: E) -> Result<E, InternalError> {
83 self.save_entity(SaveMode::Insert, entity)
84 }
85
86 pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
88 let entity = E::from_view(view)?;
89 Ok(self.insert(entity)?.to_view())
90 }
91
92 pub fn update(&self, entity: E) -> Result<E, InternalError> {
94 self.save_entity(SaveMode::Update, entity)
95 }
96
97 pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
99 let entity = E::from_view(view)?;
100
101 Ok(self.update(entity)?.to_view())
102 }
103
104 pub fn replace(&self, entity: E) -> Result<E, InternalError> {
106 self.save_entity(SaveMode::Replace, entity)
107 }
108
109 pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
111 let entity = E::from_view(view)?;
112
113 Ok(self.replace(entity)?.to_view())
114 }
115
116 pub fn insert_many(
121 &self,
122 entities: impl IntoIterator<Item = E>,
123 ) -> Result<Vec<E>, InternalError> {
124 let iter = entities.into_iter();
125 let mut out = Vec::with_capacity(iter.size_hint().0);
126
127 for entity in iter {
130 out.push(self.insert(entity)?);
131 }
132
133 Ok(out)
134 }
135
136 pub fn update_many(
137 &self,
138 entities: impl IntoIterator<Item = E>,
139 ) -> Result<Vec<E>, InternalError> {
140 let iter = entities.into_iter();
141 let mut out = Vec::with_capacity(iter.size_hint().0);
142
143 for entity in iter {
146 out.push(self.update(entity)?);
147 }
148
149 Ok(out)
150 }
151
152 pub fn replace_many(
153 &self,
154 entities: impl IntoIterator<Item = E>,
155 ) -> Result<Vec<E>, InternalError> {
156 let iter = entities.into_iter();
157 let mut out = Vec::with_capacity(iter.size_hint().0);
158
159 for entity in iter {
162 out.push(self.replace(entity)?);
163 }
164
165 Ok(out)
166 }
167
168 pub fn execute(&self, query: SaveQuery) -> Result<E, InternalError> {
177 let entity: E = deserialize(&query.bytes).map_err(|err| {
178 InternalError::new(
179 ErrorClass::Unsupported,
180 ErrorOrigin::Serialize,
181 format!("save query decode failed: {err}"),
182 )
183 })?;
184 self.save_entity(query.mode, entity)
185 }
186
187 #[expect(clippy::too_many_lines)]
188 fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, InternalError> {
189 let trace = start_exec_trace(
190 self.trace,
191 TraceExecutorKind::Save,
192 E::PATH,
193 None,
194 Some(save_mode_tag(mode)),
195 );
196 let result = (|| {
197 let mut span = Span::<E>::new(ExecKind::Save);
198 let ctx = self.db.context::<E>();
199
200 ensure_recovered(&self.db)?;
202
203 sanitize(&mut entity)?;
205 validate(&entity)?;
206
207 let key = entity.key();
208 let data_key = DataKey::new::<E>(key);
209 let raw_key = data_key.to_raw()?;
210
211 self.debug_log(format!(
212 "[debug] save {:?} on {} (key={})",
213 mode,
214 E::PATH,
215 data_key
216 ));
217 let (old, old_raw) = match mode {
218 SaveMode::Insert => {
219 if ctx.with_store(|store| store.contains_key(&raw_key))? {
221 return Err(ExecutorError::KeyExists(data_key).into());
222 }
223 (None, None)
224 }
225 SaveMode::Update => {
226 let Some(old_row) = ctx.with_store(|store| store.get(&raw_key))? else {
227 return Err(InternalError::store_not_found(data_key.to_string()));
228 };
229 let old = old_row.try_decode::<E>().map_err(|err| {
230 ExecutorError::corruption(
231 ErrorOrigin::Serialize,
232 format!("failed to deserialize row: {data_key} ({err})"),
233 )
234 })?;
235 (Some(old), Some(old_row))
236 }
237 SaveMode::Replace => {
238 let old_row = ctx.with_store(|store| store.get(&raw_key))?;
239 let old = old_row
240 .as_ref()
241 .map(|row| {
242 row.try_decode::<E>().map_err(|err| {
243 ExecutorError::corruption(
244 ErrorOrigin::Serialize,
245 format!("failed to deserialize row: {data_key} ({err})"),
246 )
247 })
248 })
249 .transpose()?;
250 (old, old_row)
251 }
252 };
253
254 let bytes = serialize(&entity)?;
255 let row = RawRow::try_new(bytes)?;
256
257 ctx.with_store(|_| ())?;
259
260 let index_plan =
265 plan_index_mutation_for_entity::<E>(&self.db, old.as_ref(), Some(&entity))?;
266 let data_op = CommitDataOp {
267 store: E::Store::PATH.to_string(),
268 key: raw_key.as_bytes().to_vec(),
269 value: Some(row.as_bytes().to_vec()),
270 };
271 let marker = CommitMarker::new(CommitKind::Save, index_plan.commit_ops, vec![data_op])?;
272 let (index_apply_stores, index_rollback_ops) =
273 Self::prepare_index_save_ops(&index_plan.apply, &marker.index_ops)?;
274 let (index_removes, index_inserts) = Self::plan_index_metrics(old.as_ref(), &entity)?;
275 let data_rollback_ops = Self::prepare_data_save_ops(&marker.data_ops, old_raw)?;
276 let commit = begin_commit(marker)?;
277
278 finish_commit(commit, |guard| {
280 let mut unit = WriteUnit::new("save_entity_atomic");
281 let index_rollback_ops = index_rollback_ops;
282 unit.record_rollback(move || Self::apply_index_rollbacks(index_rollback_ops));
283 Self::apply_marker_index_ops(&guard.marker.index_ops, index_apply_stores);
284 for _ in 0..index_removes {
285 sink::record(MetricsEvent::IndexRemove {
286 entity_path: E::PATH,
287 });
288 }
289 for _ in 0..index_inserts {
290 sink::record(MetricsEvent::IndexInsert {
291 entity_path: E::PATH,
292 });
293 }
294
295 unit.checkpoint("save_entity_after_indexes")?;
296
297 let data_rollback_ops = data_rollback_ops;
298 let db = self.db;
299 unit.record_rollback(move || Self::apply_data_rollbacks(db, data_rollback_ops));
300 unit.run(|| Self::apply_marker_data_ops(&guard.marker.data_ops, &ctx))?;
301
302 span.set_rows(1);
303 unit.commit();
304 Ok(())
305 })?;
306
307 Ok(entity)
308 })();
309
310 if let Some(trace) = trace {
311 match &result {
312 Ok(_) => trace.finish(1),
313 Err(err) => trace.error(err),
314 }
315 }
316
317 result
318 }
319
320 fn plan_index_metrics(old: Option<&E>, new: &E) -> Result<(usize, usize), InternalError> {
326 let mut removes = 0usize;
327 let mut inserts = 0usize;
328
329 for index in E::INDEXES {
330 if let Some(old) = old
331 && IndexKey::new(old, index)?.is_some()
332 {
333 removes = removes.saturating_add(1);
334 }
335 if IndexKey::new(new, index)?.is_some() {
336 inserts = inserts.saturating_add(1);
337 }
338 }
339
340 Ok((removes, inserts))
341 }
342
343 #[allow(clippy::type_complexity)]
345 fn prepare_index_save_ops(
346 plans: &[IndexApplyPlan],
347 ops: &[CommitIndexOp],
348 ) -> Result<
349 (
350 Vec<&'static LocalKey<RefCell<IndexStore>>>,
351 Vec<PreparedIndexRollback>,
352 ),
353 InternalError,
354 > {
355 let mut stores = BTreeMap::new();
357 for plan in plans {
358 stores.insert(plan.index.store, plan.store);
359 }
360
361 let mut apply_stores = Vec::with_capacity(ops.len());
362 let mut rollbacks = Vec::with_capacity(ops.len());
363
364 for op in ops {
366 let store = stores.get(op.store.as_str()).ok_or_else(|| {
367 InternalError::new(
368 ErrorClass::Internal,
369 ErrorOrigin::Index,
370 format!(
371 "commit marker references unknown index store '{}' ({})",
372 op.store,
373 E::PATH
374 ),
375 )
376 })?;
377 if op.key.len() != IndexKey::STORED_SIZE as usize {
378 return Err(InternalError::new(
379 ErrorClass::Internal,
380 ErrorOrigin::Index,
381 format!(
382 "commit marker index key length {} does not match {} ({})",
383 op.key.len(),
384 IndexKey::STORED_SIZE,
385 E::PATH
386 ),
387 ));
388 }
389 if let Some(value) = &op.value
390 && value.len() > MAX_INDEX_ENTRY_BYTES as usize
391 {
392 return Err(InternalError::new(
393 ErrorClass::Internal,
394 ErrorOrigin::Index,
395 format!(
396 "commit marker index entry exceeds max size: {} bytes ({})",
397 value.len(),
398 E::PATH
399 ),
400 ));
401 }
402
403 let raw_key = RawIndexKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
404 let existing = store.with_borrow(|s| s.get(&raw_key));
405 if op.value.is_none() && existing.is_none() {
406 return Err(InternalError::new(
407 ErrorClass::Internal,
408 ErrorOrigin::Index,
409 format!(
410 "commit marker index op missing entry before save: {} ({})",
411 op.store,
412 E::PATH
413 ),
414 ));
415 }
416
417 apply_stores.push(*store);
418 rollbacks.push(PreparedIndexRollback {
419 store,
420 key: raw_key,
421 value: existing,
422 });
423 }
424
425 Ok((apply_stores, rollbacks))
426 }
427
428 fn prepare_data_save_ops(
430 ops: &[CommitDataOp],
431 old_row: Option<RawRow>,
432 ) -> Result<Vec<PreparedDataRollback>, InternalError> {
433 if ops.len() != 1 {
434 return Err(InternalError::new(
435 ErrorClass::Internal,
436 ErrorOrigin::Store,
437 format!(
438 "commit marker save expects 1 data op, found {} ({})",
439 ops.len(),
440 E::PATH
441 ),
442 ));
443 }
444
445 let op = &ops[0];
446 if op.store != E::Store::PATH {
447 return Err(InternalError::new(
448 ErrorClass::Internal,
449 ErrorOrigin::Store,
450 format!(
451 "commit marker references unexpected data store '{}' ({})",
452 op.store,
453 E::PATH
454 ),
455 ));
456 }
457 if op.key.len() != DataKey::STORED_SIZE as usize {
458 return Err(InternalError::new(
459 ErrorClass::Internal,
460 ErrorOrigin::Store,
461 format!(
462 "commit marker data key length {} does not match {} ({})",
463 op.key.len(),
464 DataKey::STORED_SIZE,
465 E::PATH
466 ),
467 ));
468 }
469 let Some(value) = &op.value else {
470 return Err(InternalError::new(
471 ErrorClass::Internal,
472 ErrorOrigin::Store,
473 format!("commit marker save missing data payload ({})", E::PATH),
474 ));
475 };
476 if value.len() > crate::db::store::MAX_ROW_BYTES as usize {
477 return Err(InternalError::new(
478 ErrorClass::Internal,
479 ErrorOrigin::Store,
480 format!(
481 "commit marker data payload exceeds max size: {} bytes ({})",
482 value.len(),
483 E::PATH
484 ),
485 ));
486 }
487
488 let raw_key = RawDataKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
489 Ok(vec![PreparedDataRollback {
490 key: raw_key,
491 value: old_row,
492 }])
493 }
494
495 fn apply_marker_index_ops(
497 ops: &[CommitIndexOp],
498 stores: Vec<&'static LocalKey<RefCell<IndexStore>>>,
499 ) {
500 debug_assert_eq!(
501 ops.len(),
502 stores.len(),
503 "commit marker index ops length mismatch"
504 );
505
506 for (op, store) in ops.iter().zip(stores.into_iter()) {
507 debug_assert_eq!(op.key.len(), IndexKey::STORED_SIZE as usize);
508 let raw_key = RawIndexKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
509
510 store.with_borrow_mut(|s| {
511 if let Some(value) = &op.value {
512 debug_assert!(value.len() <= MAX_INDEX_ENTRY_BYTES as usize);
513 let raw_entry = RawIndexEntry::from_bytes(Cow::Borrowed(value.as_slice()));
514 s.insert(raw_key, raw_entry);
515 } else {
516 s.remove(&raw_key);
517 }
518 });
519 }
520 }
521
522 fn apply_index_rollbacks(ops: Vec<PreparedIndexRollback>) {
524 for op in ops {
525 op.store.with_borrow_mut(|s| {
526 if let Some(value) = op.value {
527 s.insert(op.key, value);
528 } else {
529 s.remove(&op.key);
530 }
531 });
532 }
533 }
534
535 fn apply_marker_data_ops(
537 ops: &[CommitDataOp],
538 ctx: &crate::db::executor::Context<'_, E>,
539 ) -> Result<(), InternalError> {
540 for op in ops {
541 debug_assert!(op.value.is_some());
542 let Some(value) = op.value.as_ref() else {
543 return Err(InternalError::new(
544 ErrorClass::Internal,
545 ErrorOrigin::Store,
546 format!("commit marker save missing data payload ({})", E::PATH),
547 ));
548 };
549 let raw_key = RawDataKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
550 let raw_value = RawRow::from_bytes(Cow::Borrowed(value.as_slice()));
551 ctx.with_store_mut(|s| s.insert(raw_key, raw_value))?;
552 }
553 Ok(())
554 }
555
556 fn apply_data_rollbacks(db: Db<E::Canister>, ops: Vec<PreparedDataRollback>) {
558 let ctx = db.context::<E>();
559 for op in ops {
560 let _ = ctx.with_store_mut(|s| {
561 if let Some(value) = op.value {
562 s.insert(op.key, value);
563 } else {
564 s.remove(&op.key);
565 }
566 });
567 }
568 }
569}
570
571const fn save_mode_tag(mode: SaveMode) -> &'static str {
572 match mode {
573 SaveMode::Insert => "insert",
574 SaveMode::Update => "update",
575 SaveMode::Replace => "replace",
576 }
577}
578
/// Snapshot of one index key taken before a save, used to undo the save.
struct PreparedIndexRollback {
    /// Index store the key belongs to.
    store: &'static LocalKey<RefCell<IndexStore>>,
    /// Key whose entry is restored on rollback.
    key: RawIndexKey,
    /// Entry present before the save; `None` means the key was absent and
    /// is removed on rollback.
    value: Option<RawIndexEntry>,
}
585
/// Snapshot of one data row taken before a save, used to undo the save.
struct PreparedDataRollback {
    /// Key whose row is restored on rollback.
    key: RawDataKey,
    /// Row present before the save; `None` means the key was absent and is
    /// removed on rollback.
    value: Option<RawRow>,
}