icydb_core/db/executor/
save.rs

1use crate::{
2    db::{
3        CommitDataOp, CommitKind, CommitMarker, Db, begin_commit, ensure_recovered,
4        executor::{ExecutorError, WriteUnit},
5        finish_commit,
6        index::{
7            IndexInsertOutcome, IndexRemoveOutcome,
8            plan::{IndexApplyPlan, plan_index_mutation_for_entity},
9        },
10        query::{SaveMode, SaveQuery},
11        store::{DataKey, RawRow},
12    },
13    error::{ErrorOrigin, InternalError},
14    obs::sink::{self, ExecKind, MetricsEvent, Span},
15    sanitize::sanitize,
16    serialize::{deserialize, serialize},
17    traits::{EntityKind, Path},
18    validate::validate,
19};
20use std::marker::PhantomData;
21
22///
23/// SaveExecutor
24///
25
26#[derive(Clone, Copy)]
27pub struct SaveExecutor<E: EntityKind> {
28    db: Db<E::Canister>,
29    debug: bool,
30    _marker: PhantomData<E>,
31}
32
33impl<E: EntityKind> SaveExecutor<E> {
34    // ======================================================================
35    // Construction & configuration
36    // ======================================================================
37
38    #[must_use]
39    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
40        Self {
41            db,
42            debug,
43            _marker: PhantomData,
44        }
45    }
46
47    #[must_use]
48    pub const fn debug(mut self) -> Self {
49        self.debug = true;
50        self
51    }
52
53    fn debug_log(&self, s: impl Into<String>) {
54        if self.debug {
55            println!("{}", s.into());
56        }
57    }
58
59    // ======================================================================
60    // Single-entity save operations
61    // ======================================================================
62
63    /// Insert a brand-new entity (errors if the key already exists).
64    pub fn insert(&self, entity: E) -> Result<E, InternalError> {
65        self.save_entity(SaveMode::Insert, entity)
66    }
67
68    /// Insert a new view, returning the stored view.
69    pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
70        let entity = E::from_view(view);
71        Ok(self.insert(entity)?.to_view())
72    }
73
74    /// Update an existing entity (errors if it does not exist).
75    pub fn update(&self, entity: E) -> Result<E, InternalError> {
76        self.save_entity(SaveMode::Update, entity)
77    }
78
79    /// Update an existing view (errors if it does not exist).
80    pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
81        let entity = E::from_view(view);
82
83        Ok(self.update(entity)?.to_view())
84    }
85
86    /// Replace an entity, inserting if missing.
87    pub fn replace(&self, entity: E) -> Result<E, InternalError> {
88        self.save_entity(SaveMode::Replace, entity)
89    }
90
91    /// Replace a view, inserting if missing.
92    pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
93        let entity = E::from_view(view);
94
95        Ok(self.replace(entity)?.to_view())
96    }
97
98    // ======================================================================
99    // Batch save operations (fail-fast, non-atomic)
100    // ======================================================================
101
102    pub fn insert_many(
103        &self,
104        entities: impl IntoIterator<Item = E>,
105    ) -> Result<Vec<E>, InternalError> {
106        let iter = entities.into_iter();
107        let mut out = Vec::with_capacity(iter.size_hint().0);
108
109        // Batch semantics: fail-fast and non-atomic; partial successes remain.
110        // Retry-safe only with caller idempotency and conflict handling.
111        for entity in iter {
112            out.push(self.insert(entity)?);
113        }
114
115        Ok(out)
116    }
117
118    pub fn update_many(
119        &self,
120        entities: impl IntoIterator<Item = E>,
121    ) -> Result<Vec<E>, InternalError> {
122        let iter = entities.into_iter();
123        let mut out = Vec::with_capacity(iter.size_hint().0);
124
125        // Batch semantics: fail-fast and non-atomic; partial successes remain.
126        // Retry-safe only if the caller tolerates already-updated rows.
127        for entity in iter {
128            out.push(self.update(entity)?);
129        }
130
131        Ok(out)
132    }
133
134    pub fn replace_many(
135        &self,
136        entities: impl IntoIterator<Item = E>,
137    ) -> Result<Vec<E>, InternalError> {
138        let iter = entities.into_iter();
139        let mut out = Vec::with_capacity(iter.size_hint().0);
140
141        // Batch semantics: fail-fast and non-atomic; partial successes remain.
142        // Retry-safe only with caller idempotency and conflict handling.
143        for entity in iter {
144            out.push(self.replace(entity)?);
145        }
146
147        Ok(out)
148    }
149
150    // ======================================================================
151    // Low-level execution
152    // ======================================================================
153
154    /// Execute a serialized save query.
155    ///
156    /// NOTE: Deserialization here is over user-supplied bytes. Failures are
157    /// considered invalid input rather than storage corruption.
158    pub fn execute(&self, query: SaveQuery) -> Result<E, InternalError> {
159        let entity: E = deserialize(&query.bytes)?;
160        self.save_entity(query.mode, entity)
161    }
162
163    fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, InternalError> {
164        let mut span = Span::<E>::new(ExecKind::Save);
165        let ctx = self.db.context::<E>();
166        let _unit = WriteUnit::new("save_entity_stage2_atomic");
167
168        // Recovery is mutation-only to keep read paths side-effect free.
169        ensure_recovered(&self.db)?;
170
171        // Sanitize & validate before key extraction in case PK fields are normalized
172        sanitize(&mut entity)?;
173        validate(&entity)?;
174
175        let key = entity.key();
176        let data_key = DataKey::new::<E>(key);
177        let raw_key = data_key.to_raw();
178
179        self.debug_log(format!(
180            "[debug] save {:?} on {} (key={})",
181            mode,
182            E::PATH,
183            data_key
184        ));
185        let old = match mode {
186            SaveMode::Insert => {
187                // Inserts must not load or decode existing rows; absence is expected.
188                if ctx.with_store(|store| store.contains_key(&raw_key))? {
189                    return Err(ExecutorError::KeyExists(data_key).into());
190                }
191                None
192            }
193            SaveMode::Update => {
194                let Some(old_row) = ctx.with_store(|store| store.get(&raw_key))? else {
195                    return Err(InternalError::store_not_found(data_key.to_string()));
196                };
197                Some(old_row.try_decode::<E>().map_err(|err| {
198                    ExecutorError::corruption(
199                        ErrorOrigin::Serialize,
200                        format!("failed to deserialize row: {data_key} ({err})"),
201                    )
202                })?)
203            }
204            SaveMode::Replace => {
205                let old_row = ctx.with_store(|store| store.get(&raw_key))?;
206                old_row
207                    .map(|row| {
208                        row.try_decode::<E>().map_err(|err| {
209                            ExecutorError::corruption(
210                                ErrorOrigin::Serialize,
211                                format!("failed to deserialize row: {data_key} ({err})"),
212                            )
213                        })
214                    })
215                    .transpose()?
216            }
217        };
218
219        let bytes = serialize(&entity)?;
220        let row = RawRow::try_new(bytes)?;
221
222        // Preflight data store availability before index mutations.
223        ctx.with_store(|_| ())?;
224
225        // Stage-2 atomicity:
226        // Prevalidate index/data mutations before the commit marker is written.
227        // After the marker is persisted, only infallible operations or traps remain.
228        let index_plan =
229            plan_index_mutation_for_entity::<E>(&self.db, old.as_ref(), Some(&entity))?;
230        let data_op = CommitDataOp {
231            store: E::Store::PATH.to_string(),
232            key: raw_key.as_bytes().to_vec(),
233            value: Some(row.as_bytes().to_vec()),
234        };
235        let marker = CommitMarker::new(CommitKind::Save, index_plan.commit_ops, vec![data_op])?;
236        let commit = begin_commit(marker)?;
237
238        // FIRST STABLE WRITE: commit marker is persisted; apply phase is infallible or traps.
239        finish_commit(
240            commit,
241            || Self::apply_indexes(&index_plan.apply, old.as_ref(), &entity),
242            || {
243                ctx.with_store_mut(|store| store.insert(raw_key, row))
244                    .expect("data store missing after preflight");
245                span.set_rows(1);
246            },
247        );
248
249        Ok(entity)
250    }
251
252    // ======================================================================
253    // Index maintenance
254    // ======================================================================
255
256    /// Apply index mutations using an infallible (prevalidated) plan.
257    fn apply_indexes(plans: &[IndexApplyPlan], old: Option<&E>, new: &E) {
258        // Prevalidation guarantees these mutations cannot fail except by trap.
259        for plan in plans {
260            let mut removed = false;
261            let mut inserted = false;
262
263            plan.store.with_borrow_mut(|s| {
264                if let Some(old) = old {
265                    let outcome = s
266                        .remove_index_entry(old, plan.index)
267                        .expect("index remove failed after prevalidation");
268                    if outcome == IndexRemoveOutcome::Removed {
269                        removed = true;
270                    }
271                }
272
273                let outcome = s
274                    .insert_index_entry(new, plan.index)
275                    .expect("index insert failed after prevalidation");
276                if outcome == IndexInsertOutcome::Inserted {
277                    inserted = true;
278                }
279            });
280
281            if removed {
282                sink::record(MetricsEvent::IndexRemove {
283                    entity_path: E::PATH,
284                });
285            }
286
287            if inserted {
288                sink::record(MetricsEvent::IndexInsert {
289                    entity_path: E::PATH,
290                });
291            }
292        }
293    }
294}