// icydb_core/db/executor/save.rs

use crate::{
    db::{
        CommitDataOp, CommitIndexOp, CommitKind, CommitMarker, Db, WriteUnit, begin_commit,
        ensure_recovered,
        executor::{
            ExecutorError,
            trace::{QueryTraceSink, TraceExecutorKind, start_exec_trace},
        },
        finish_commit,
        index::{
            IndexKey, IndexStore, MAX_INDEX_ENTRY_BYTES, RawIndexEntry, RawIndexKey,
            plan::{IndexApplyPlan, plan_index_mutation_for_entity},
        },
        query::{SaveMode, SaveQuery},
        store::{DataKey, RawDataKey, RawRow},
    },
    error::{ErrorClass, ErrorOrigin, InternalError},
    obs::sink::{self, ExecKind, MetricsEvent, Span},
    sanitize::sanitize,
    serialize::{deserialize, serialize},
    traits::{EntityKind, Path, Storable},
    validate::validate,
};
use std::{
    borrow::Cow, cell::RefCell, collections::BTreeMap, marker::PhantomData, thread::LocalKey,
};

///
/// SaveExecutor
///
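/// A minimal usage sketch, assuming a hypothetical `Item: EntityKind` entity and
/// an already-constructed `db` handle (neither is defined in this module):
///
/// ```ignore
/// let executor = SaveExecutor::<Item>::new(db, false);
/// let created = executor.insert(Item::new(1, "first"))?; // errors if the key already exists
/// let updated = executor.update(created)?;               // errors if the key does not exist
/// ```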

#[derive(Clone, Copy)]
pub struct SaveExecutor<E: EntityKind> {
    db: Db<E::Canister>,
    debug: bool,
    trace: Option<&'static dyn QueryTraceSink>,
    _marker: PhantomData<E>,
}

impl<E: EntityKind> SaveExecutor<E> {
    // ======================================================================
    // Construction & configuration
    // ======================================================================

    #[must_use]
    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            trace: None,
            _marker: PhantomData,
        }
    }

    #[must_use]
    #[allow(dead_code)]
    pub(crate) const fn with_trace_sink(
        mut self,
        sink: Option<&'static dyn QueryTraceSink>,
    ) -> Self {
        self.trace = sink;
        self
    }

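    /// Enable debug logging for saves executed through this executor.
    ///
    /// Builder-style sketch (hypothetical `Item` entity and `db` handle, as above):
    ///
    /// ```ignore
    /// let verbose = SaveExecutor::<Item>::new(db, false).debug();
    /// ```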
    #[must_use]
    pub const fn debug(mut self) -> Self {
        self.debug = true;
        self
    }

    fn debug_log(&self, s: impl Into<String>) {
        if self.debug {
            println!("{}", s.into());
        }
    }

    // ======================================================================
    // Single-entity save operations
    // ======================================================================

    /// Insert a brand-new entity (errors if the key already exists).
    pub fn insert(&self, entity: E) -> Result<E, InternalError> {
        self.save_entity(SaveMode::Insert, entity)
    }

    /// Insert a new view, returning the stored view.
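    ///
    /// View round-trip sketch (hypothetical `ItemView` view type; field names are
    /// illustrative): the view is converted with `E::from_view`, stored via
    /// `insert`, and returned through `to_view`.
    ///
    /// ```ignore
    /// let stored = executor.insert_view(ItemView { id: 1, name: "first".into() })?;
    /// ```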
    pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
        let entity = E::from_view(view)?;
        Ok(self.insert(entity)?.to_view())
    }

    /// Update an existing entity (errors if it does not exist).
    pub fn update(&self, entity: E) -> Result<E, InternalError> {
        self.save_entity(SaveMode::Update, entity)
    }

    /// Update an existing view (errors if it does not exist).
    pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
        let entity = E::from_view(view)?;

        Ok(self.update(entity)?.to_view())
    }

    /// Replace an entity, inserting if missing.
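    ///
    /// Upsert-style sketch (hypothetical `Item` entity): the first call inserts,
    /// the second overwrites the row stored under the same key.
    ///
    /// ```ignore
    /// executor.replace(Item::new(1, "first"))?;
    /// executor.replace(Item::new(1, "second"))?;
    /// ```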
    pub fn replace(&self, entity: E) -> Result<E, InternalError> {
        self.save_entity(SaveMode::Replace, entity)
    }

    /// Replace a view, inserting if missing.
    pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
        let entity = E::from_view(view)?;

        Ok(self.replace(entity)?.to_view())
    }

    // ======================================================================
    // Batch save operations (fail-fast, non-atomic)
    // ======================================================================

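    /// Insert a batch of entities, stopping at the first failure.
    ///
    /// Fail-fast, non-atomic sketch (hypothetical `Item` entities): entities saved
    /// before the failing one remain persisted; nothing is rolled back.
    ///
    /// ```ignore
    /// let result = executor.insert_many(vec![
    ///     Item::new(1, "a"),
    ///     Item::new(2, "b"),
    ///     Item::new(1, "duplicate"), // fails with ExecutorError::KeyExists
    /// ]);
    /// assert!(result.is_err()); // the rows for keys 1 and 2 were already written
    /// ```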
    pub fn insert_many(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        let iter = entities.into_iter();
        let mut out = Vec::with_capacity(iter.size_hint().0);

        // Batch semantics: fail-fast and non-atomic; partial successes remain.
        // Retry-safe only with caller idempotency and conflict handling.
        for entity in iter {
            out.push(self.insert(entity)?);
        }

        Ok(out)
    }

    pub fn update_many(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        let iter = entities.into_iter();
        let mut out = Vec::with_capacity(iter.size_hint().0);

        // Batch semantics: fail-fast and non-atomic; partial successes remain.
        // Retry-safe only if the caller tolerates already-updated rows.
        for entity in iter {
            out.push(self.update(entity)?);
        }

        Ok(out)
    }

    pub fn replace_many(
        &self,
        entities: impl IntoIterator<Item = E>,
    ) -> Result<Vec<E>, InternalError> {
        let iter = entities.into_iter();
        let mut out = Vec::with_capacity(iter.size_hint().0);

        // Batch semantics: fail-fast and non-atomic; partial successes remain.
        // Retry-safe only with caller idempotency and conflict handling.
        for entity in iter {
            out.push(self.replace(entity)?);
        }

        Ok(out)
    }

    // ======================================================================
    // Low-level execution
    // ======================================================================

    /// Execute a serialized save query.
    ///
    /// NOTE: Deserialization here is over user-supplied bytes. Failures are
    /// considered invalid input rather than storage corruption.
    pub fn execute(&self, query: SaveQuery) -> Result<E, InternalError> {
        let entity: E = deserialize(&query.bytes).map_err(|err| {
            InternalError::new(
                ErrorClass::Unsupported,
                ErrorOrigin::Serialize,
                format!("save query decode failed: {err}"),
            )
        })?;
        self.save_entity(query.mode, entity)
    }

    #[expect(clippy::too_many_lines)]
    fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, InternalError> {
        let trace = start_exec_trace(
            self.trace,
            TraceExecutorKind::Save,
            E::PATH,
            None,
            Some(save_mode_tag(mode)),
        );
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Save);
            let ctx = self.db.context::<E>();

            // Recovery is mutation-only to keep read paths side-effect free.
            ensure_recovered(&self.db)?;

            // Sanitize & validate before key extraction in case PK fields are normalized
            sanitize(&mut entity)?;
            validate(&entity)?;

            let key = entity.key();
            let data_key = DataKey::new::<E>(key);
            let raw_key = data_key.to_raw()?;

            self.debug_log(format!(
                "[debug] save {:?} on {} (key={})",
                mode,
                E::PATH,
                data_key
            ));
            let (old, old_raw) = match mode {
                SaveMode::Insert => {
                    // Inserts must not load or decode existing rows; absence is expected.
                    if ctx.with_store(|store| store.contains_key(&raw_key))? {
                        return Err(ExecutorError::KeyExists(data_key).into());
                    }
                    (None, None)
                }
                SaveMode::Update => {
                    let Some(old_row) = ctx.with_store(|store| store.get(&raw_key))? else {
                        return Err(InternalError::store_not_found(data_key.to_string()));
                    };
                    let old = old_row.try_decode::<E>().map_err(|err| {
                        ExecutorError::corruption(
                            ErrorOrigin::Serialize,
                            format!("failed to deserialize row: {data_key} ({err})"),
                        )
                    })?;
                    (Some(old), Some(old_row))
                }
                SaveMode::Replace => {
                    let old_row = ctx.with_store(|store| store.get(&raw_key))?;
                    let old = old_row
                        .as_ref()
                        .map(|row| {
                            row.try_decode::<E>().map_err(|err| {
                                ExecutorError::corruption(
                                    ErrorOrigin::Serialize,
                                    format!("failed to deserialize row: {data_key} ({err})"),
                                )
                            })
                        })
                        .transpose()?;
                    (old, old_row)
                }
            };

            let bytes = serialize(&entity)?;
            let row = RawRow::try_new(bytes)?;

            // Preflight data store availability before index mutations.
            ctx.with_store(|_| ())?;

            // Stage-2 atomicity:
            // Prevalidate index/data mutations before the commit marker is written.
            // After the marker is persisted, mutations run inside a WriteUnit so
            // failures roll back before the marker is cleared.
            let index_plan =
                plan_index_mutation_for_entity::<E>(&self.db, old.as_ref(), Some(&entity))?;
            let data_op = CommitDataOp {
                store: E::Store::PATH.to_string(),
                key: raw_key.as_bytes().to_vec(),
                value: Some(row.as_bytes().to_vec()),
            };
            let marker = CommitMarker::new(CommitKind::Save, index_plan.commit_ops, vec![data_op])?;
            let (index_apply_stores, index_rollback_ops) =
                Self::prepare_index_save_ops(&index_plan.apply, &marker.index_ops)?;
            let (index_removes, index_inserts) = Self::plan_index_metrics(old.as_ref(), &entity)?;
            let data_rollback_ops = Self::prepare_data_save_ops(&marker.data_ops, old_raw)?;
            let commit = begin_commit(marker)?;

            // FIRST STABLE WRITE: commit marker is persisted before any mutations.
            finish_commit(commit, |guard| {
                let mut unit = WriteUnit::new("save_entity_atomic");
                let index_rollback_ops = index_rollback_ops;
                unit.record_rollback(move || Self::apply_index_rollbacks(index_rollback_ops));
                Self::apply_marker_index_ops(&guard.marker.index_ops, index_apply_stores);
                for _ in 0..index_removes {
                    sink::record(MetricsEvent::IndexRemove {
                        entity_path: E::PATH,
                    });
                }
                for _ in 0..index_inserts {
                    sink::record(MetricsEvent::IndexInsert {
                        entity_path: E::PATH,
                    });
                }

                unit.checkpoint("save_entity_after_indexes")?;

                let data_rollback_ops = data_rollback_ops;
                let db = self.db;
                unit.record_rollback(move || Self::apply_data_rollbacks(db, data_rollback_ops));
                unit.run(|| Self::apply_marker_data_ops(&guard.marker.data_ops, &ctx))?;

                span.set_rows(1);
                unit.commit();
                Ok(())
            })?;

            Ok(entity)
        })();

        if let Some(trace) = trace {
            match &result {
                Ok(_) => trace.finish(1),
                Err(err) => trace.error(err),
            }
        }

        result
    }

    // ======================================================================
    // Commit-marker apply (mechanical)
    // ======================================================================

    /// Precompute index mutation metrics before the commit marker is persisted.
    fn plan_index_metrics(old: Option<&E>, new: &E) -> Result<(usize, usize), InternalError> {
        let mut removes = 0usize;
        let mut inserts = 0usize;

        for index in E::INDEXES {
            if let Some(old) = old
                && IndexKey::new(old, index)?.is_some()
            {
                removes = removes.saturating_add(1);
            }
            if IndexKey::new(new, index)?.is_some() {
                inserts = inserts.saturating_add(1);
            }
        }

        Ok((removes, inserts))
    }

    /// Resolve commit index ops into stores and capture rollback entries.
    #[allow(clippy::type_complexity)]
    fn prepare_index_save_ops(
        plans: &[IndexApplyPlan],
        ops: &[CommitIndexOp],
    ) -> Result<
        (
            Vec<&'static LocalKey<RefCell<IndexStore>>>,
            Vec<PreparedIndexRollback>,
        ),
        InternalError,
    > {
        // Phase 1: map index store paths to store handles.
        let mut stores = BTreeMap::new();
        for plan in plans {
            stores.insert(plan.index.store, plan.store);
        }

        let mut apply_stores = Vec::with_capacity(ops.len());
        let mut rollbacks = Vec::with_capacity(ops.len());

        // Phase 2: validate marker ops and snapshot current entries for rollback.
        for op in ops {
            let store = stores.get(op.store.as_str()).ok_or_else(|| {
                InternalError::new(
                    ErrorClass::Internal,
                    ErrorOrigin::Index,
                    format!(
                        "commit marker references unknown index store '{}' ({})",
                        op.store,
                        E::PATH
                    ),
                )
            })?;
            if op.key.len() != IndexKey::STORED_SIZE as usize {
                return Err(InternalError::new(
                    ErrorClass::Internal,
                    ErrorOrigin::Index,
                    format!(
                        "commit marker index key length {} does not match {} ({})",
                        op.key.len(),
                        IndexKey::STORED_SIZE,
                        E::PATH
                    ),
                ));
            }
            if let Some(value) = &op.value
                && value.len() > MAX_INDEX_ENTRY_BYTES as usize
            {
                return Err(InternalError::new(
                    ErrorClass::Internal,
                    ErrorOrigin::Index,
                    format!(
                        "commit marker index entry exceeds max size: {} bytes ({})",
                        value.len(),
                        E::PATH
                    ),
                ));
            }

            let raw_key = RawIndexKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
            let existing = store.with_borrow(|s| s.get(&raw_key));
            if op.value.is_none() && existing.is_none() {
                return Err(InternalError::new(
                    ErrorClass::Internal,
                    ErrorOrigin::Index,
                    format!(
                        "commit marker index op missing entry before save: {} ({})",
                        op.store,
                        E::PATH
                    ),
                ));
            }

            apply_stores.push(*store);
            rollbacks.push(PreparedIndexRollback {
                store,
                key: raw_key,
                value: existing,
            });
        }

        Ok((apply_stores, rollbacks))
    }

    /// Validate commit data ops and prepare rollback rows for the save.
    fn prepare_data_save_ops(
        ops: &[CommitDataOp],
        old_row: Option<RawRow>,
    ) -> Result<Vec<PreparedDataRollback>, InternalError> {
        if ops.len() != 1 {
            return Err(InternalError::new(
                ErrorClass::Internal,
                ErrorOrigin::Store,
                format!(
                    "commit marker save expects 1 data op, found {} ({})",
                    ops.len(),
                    E::PATH
                ),
            ));
        }

        let op = &ops[0];
        if op.store != E::Store::PATH {
            return Err(InternalError::new(
                ErrorClass::Internal,
                ErrorOrigin::Store,
                format!(
                    "commit marker references unexpected data store '{}' ({})",
                    op.store,
                    E::PATH
                ),
            ));
        }
        if op.key.len() != DataKey::STORED_SIZE as usize {
            return Err(InternalError::new(
                ErrorClass::Internal,
                ErrorOrigin::Store,
                format!(
                    "commit marker data key length {} does not match {} ({})",
                    op.key.len(),
                    DataKey::STORED_SIZE,
                    E::PATH
                ),
            ));
        }
        let Some(value) = &op.value else {
            return Err(InternalError::new(
                ErrorClass::Internal,
                ErrorOrigin::Store,
                format!("commit marker save missing data payload ({})", E::PATH),
            ));
        };
        if value.len() > crate::db::store::MAX_ROW_BYTES as usize {
            return Err(InternalError::new(
                ErrorClass::Internal,
                ErrorOrigin::Store,
                format!(
                    "commit marker data payload exceeds max size: {} bytes ({})",
                    value.len(),
                    E::PATH
                ),
            ));
        }

        let raw_key = RawDataKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
        Ok(vec![PreparedDataRollback {
            key: raw_key,
            value: old_row,
        }])
    }

    /// Apply commit marker index ops using pre-resolved stores.
    fn apply_marker_index_ops(
        ops: &[CommitIndexOp],
        stores: Vec<&'static LocalKey<RefCell<IndexStore>>>,
    ) {
        debug_assert_eq!(
            ops.len(),
            stores.len(),
            "commit marker index ops length mismatch"
        );

        for (op, store) in ops.iter().zip(stores.into_iter()) {
            debug_assert_eq!(op.key.len(), IndexKey::STORED_SIZE as usize);
            let raw_key = RawIndexKey::from_bytes(Cow::Borrowed(op.key.as_slice()));

            store.with_borrow_mut(|s| {
                if let Some(value) = &op.value {
                    debug_assert!(value.len() <= MAX_INDEX_ENTRY_BYTES as usize);
                    let raw_entry = RawIndexEntry::from_bytes(Cow::Borrowed(value.as_slice()));
                    s.insert(raw_key, raw_entry);
                } else {
                    s.remove(&raw_key);
                }
            });
        }
    }

    /// Apply rollback mutations for index entries using raw bytes.
    fn apply_index_rollbacks(ops: Vec<PreparedIndexRollback>) {
        for op in ops {
            op.store.with_borrow_mut(|s| {
                if let Some(value) = op.value {
                    s.insert(op.key, value);
                } else {
                    s.remove(&op.key);
                }
            });
        }
    }

    /// Apply commit marker data ops to the data store.
    fn apply_marker_data_ops(
        ops: &[CommitDataOp],
        ctx: &crate::db::executor::Context<'_, E>,
    ) -> Result<(), InternalError> {
        for op in ops {
            debug_assert!(op.value.is_some());
            let Some(value) = op.value.as_ref() else {
                return Err(InternalError::new(
                    ErrorClass::Internal,
                    ErrorOrigin::Store,
                    format!("commit marker save missing data payload ({})", E::PATH),
                ));
            };
            let raw_key = RawDataKey::from_bytes(Cow::Borrowed(op.key.as_slice()));
            let raw_value = RawRow::from_bytes(Cow::Borrowed(value.as_slice()));
            ctx.with_store_mut(|s| s.insert(raw_key, raw_value))?;
        }
        Ok(())
    }

    /// Apply rollback mutations for saved rows.
    fn apply_data_rollbacks(db: Db<E::Canister>, ops: Vec<PreparedDataRollback>) {
        let ctx = db.context::<E>();
        for op in ops {
            let _ = ctx.with_store_mut(|s| {
                if let Some(value) = op.value {
                    s.insert(op.key, value);
                } else {
                    s.remove(&op.key);
                }
            });
        }
    }
}

const fn save_mode_tag(mode: SaveMode) -> &'static str {
    match mode {
        SaveMode::Insert => "insert",
        SaveMode::Update => "update",
        SaveMode::Replace => "replace",
    }
}

/// Rollback descriptor for index mutations recorded in a commit marker.
struct PreparedIndexRollback {
    store: &'static LocalKey<RefCell<IndexStore>>,
    key: RawIndexKey,
    value: Option<RawIndexEntry>,
}

/// Rollback descriptor for data mutations recorded in a commit marker.
struct PreparedDataRollback {
    key: RawDataKey,
    value: Option<RawRow>,
}