Skip to main content

icydb_core/db/executor/
save.rs

1use crate::{
2    db::{
3        CommitDataOp, CommitKind, CommitMarker, Db, begin_commit, ensure_recovered,
4        executor::{ExecutorError, WriteUnit},
5        finish_commit,
6        index::{
7            IndexInsertOutcome, IndexRemoveOutcome,
8            plan::{IndexApplyPlan, plan_index_mutation_for_entity},
9        },
10        query::{SaveMode, SaveQuery},
11        store::{DataKey, RawRow},
12    },
13    error::{ErrorOrigin, InternalError},
14    obs::sink::{self, ExecKind, MetricsEvent, Span},
15    sanitize::sanitize,
16    serialize::{deserialize, serialize},
17    traits::{EntityKind, Path},
18    validate::validate,
19};
20use std::marker::PhantomData;
21
22///
23/// SaveExecutor
24///
25
26#[derive(Clone, Copy)]
27pub struct SaveExecutor<E: EntityKind> {
28    db: Db<E::Canister>,
29    debug: bool,
30    _marker: PhantomData<E>,
31}
32
33impl<E: EntityKind> SaveExecutor<E> {
34    // ======================================================================
35    // Construction & configuration
36    // ======================================================================
37
38    #[must_use]
39    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
40        Self {
41            db,
42            debug,
43            _marker: PhantomData,
44        }
45    }
46
47    #[must_use]
48    pub const fn debug(mut self) -> Self {
49        self.debug = true;
50        self
51    }
52
53    // ======================================================================
54    // Single-entity save operations
55    // ======================================================================
56
57    /// Insert a brand-new entity (errors if the key already exists).
58    pub fn insert(&self, entity: E) -> Result<E, InternalError> {
59        self.save_entity(SaveMode::Insert, entity)
60    }
61
62    /// Insert a new view, returning the stored view.
63    pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
64        let entity = E::from_view(view);
65        Ok(self.insert(entity)?.to_view())
66    }
67
68    /// Update an existing entity (errors if it does not exist).
69    pub fn update(&self, entity: E) -> Result<E, InternalError> {
70        self.save_entity(SaveMode::Update, entity)
71    }
72
73    /// Update an existing view (errors if it does not exist).
74    pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
75        let entity = E::from_view(view);
76
77        Ok(self.update(entity)?.to_view())
78    }
79
80    /// Replace an entity, inserting if missing.
81    pub fn replace(&self, entity: E) -> Result<E, InternalError> {
82        self.save_entity(SaveMode::Replace, entity)
83    }
84
85    /// Replace a view, inserting if missing.
86    pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
87        let entity = E::from_view(view);
88
89        Ok(self.replace(entity)?.to_view())
90    }
91
92    // ======================================================================
93    // Batch save operations (fail-fast, non-atomic)
94    // ======================================================================
95
96    pub fn insert_many(
97        &self,
98        entities: impl IntoIterator<Item = E>,
99    ) -> Result<Vec<E>, InternalError> {
100        let iter = entities.into_iter();
101        let mut out = Vec::with_capacity(iter.size_hint().0);
102
103        // Batch semantics: fail-fast and non-atomic; partial successes remain.
104        // Retry-safe only with caller idempotency and conflict handling.
105        for entity in iter {
106            out.push(self.insert(entity)?);
107        }
108
109        Ok(out)
110    }
111
112    pub fn update_many(
113        &self,
114        entities: impl IntoIterator<Item = E>,
115    ) -> Result<Vec<E>, InternalError> {
116        let iter = entities.into_iter();
117        let mut out = Vec::with_capacity(iter.size_hint().0);
118
119        // Batch semantics: fail-fast and non-atomic; partial successes remain.
120        // Retry-safe only if the caller tolerates already-updated rows.
121        for entity in iter {
122            out.push(self.update(entity)?);
123        }
124
125        Ok(out)
126    }
127
128    pub fn replace_many(
129        &self,
130        entities: impl IntoIterator<Item = E>,
131    ) -> Result<Vec<E>, InternalError> {
132        let iter = entities.into_iter();
133        let mut out = Vec::with_capacity(iter.size_hint().0);
134
135        // Batch semantics: fail-fast and non-atomic; partial successes remain.
136        // Retry-safe only with caller idempotency and conflict handling.
137        for entity in iter {
138            out.push(self.replace(entity)?);
139        }
140
141        Ok(out)
142    }
143
144    // ======================================================================
145    // Low-level execution
146    // ======================================================================
147
148    /// Execute a serialized save query.
149    ///
150    /// NOTE: Deserialization here is over user-supplied bytes. Failures are
151    /// considered invalid input rather than storage corruption.
152    pub fn execute(&self, query: SaveQuery) -> Result<E, InternalError> {
153        let entity: E = deserialize(&query.bytes)?;
154        self.save_entity(query.mode, entity)
155    }
156
157    fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, InternalError> {
158        let mut span = Span::<E>::new(ExecKind::Save);
159        let ctx = self.db.context::<E>();
160        let _unit = WriteUnit::new("save_entity_stage2_atomic");
161
162        // Recovery is mutation-only to keep read paths side-effect free.
163        ensure_recovered(&self.db)?;
164
165        // Sanitize & validate before key extraction in case PK fields are normalized
166        sanitize(&mut entity)?;
167        validate(&entity)?;
168
169        let key = entity.key();
170        let data_key = DataKey::new::<E>(key);
171        let raw_key = data_key.to_raw();
172        let old_result = ctx.with_store(|store| store.get(&raw_key))?;
173
174        let old = match (mode, old_result) {
175            (SaveMode::Insert | SaveMode::Replace, None) => None,
176            (SaveMode::Update | SaveMode::Replace, Some(old_row)) => {
177                Some(old_row.try_decode::<E>().map_err(|err| {
178                    ExecutorError::corruption(
179                        ErrorOrigin::Serialize,
180                        format!("failed to deserialize row: {data_key} ({err})"),
181                    )
182                })?)
183            }
184            (SaveMode::Insert, Some(_)) => return Err(ExecutorError::KeyExists(data_key).into()),
185            (SaveMode::Update, None) => return Err(ExecutorError::KeyNotFound(data_key).into()),
186        };
187
188        let bytes = serialize(&entity)?;
189        let row = RawRow::try_new(bytes)?;
190
191        // Preflight data store availability before index mutations.
192        ctx.with_store(|_| ())?;
193
194        // Stage-2 atomicity:
195        // Prevalidate index/data mutations before the commit marker is written.
196        // After the marker is persisted, only infallible operations or traps remain.
197        let index_plan =
198            plan_index_mutation_for_entity::<E>(&self.db, old.as_ref(), Some(&entity))?;
199        let data_op = CommitDataOp {
200            store: E::Store::PATH.to_string(),
201            key: raw_key.as_bytes().to_vec(),
202            value: Some(row.as_bytes().to_vec()),
203        };
204        let marker = CommitMarker::new(CommitKind::Save, index_plan.commit_ops, vec![data_op])?;
205        let commit = begin_commit(marker)?;
206
207        // FIRST STABLE WRITE: commit marker is persisted; apply phase is infallible or traps.
208        finish_commit(
209            commit,
210            || Self::apply_indexes(&index_plan.apply, old.as_ref(), &entity),
211            || {
212                ctx.with_store_mut(|store| store.insert(raw_key, row))
213                    .expect("data store missing after preflight");
214                span.set_rows(1);
215            },
216        );
217
218        Ok(entity)
219    }
220
221    // ======================================================================
222    // Index maintenance
223    // ======================================================================
224
225    /// Apply index mutations using an infallible (prevalidated) plan.
226    fn apply_indexes(plans: &[IndexApplyPlan], old: Option<&E>, new: &E) {
227        // Prevalidation guarantees these mutations cannot fail except by trap.
228        for plan in plans {
229            let mut removed = false;
230            let mut inserted = false;
231
232            plan.store.with_borrow_mut(|s| {
233                if let Some(old) = old {
234                    let outcome = s
235                        .remove_index_entry(old, plan.index)
236                        .expect("index remove failed after prevalidation");
237                    if outcome == IndexRemoveOutcome::Removed {
238                        removed = true;
239                    }
240                }
241
242                let outcome = s
243                    .insert_index_entry(new, plan.index)
244                    .expect("index insert failed after prevalidation");
245                if outcome == IndexInsertOutcome::Inserted {
246                    inserted = true;
247                }
248            });
249
250            if removed {
251                sink::record(MetricsEvent::IndexRemove {
252                    entity_path: E::PATH,
253                });
254            }
255
256            if inserted {
257                sink::record(MetricsEvent::IndexInsert {
258                    entity_path: E::PATH,
259                });
260            }
261        }
262    }
263}