Skip to main content

icydb_core/db/executor/
save.rs

1use crate::{
2    db::{
3        Db,
4        executor::{ExecutorError, WriteUnit},
5        query::{SaveMode, SaveQuery},
6        store::{DataKey, IndexInsertError, IndexInsertOutcome, IndexRemoveOutcome, RawRow},
7    },
8    error::{ErrorClass, ErrorOrigin, InternalError},
9    obs::sink::{self, ExecKind, MetricsEvent, Span},
10    sanitize::sanitize,
11    serialize::{deserialize, serialize},
12    traits::EntityKind,
13    validate::validate,
14};
15use std::marker::PhantomData;
16
17///
18/// SaveExecutor
19///
20
21#[derive(Clone, Copy)]
22pub struct SaveExecutor<E: EntityKind> {
23    db: Db<E::Canister>,
24    debug: bool,
25    _marker: PhantomData<E>,
26}
27
28impl<E: EntityKind> SaveExecutor<E> {
29    // ======================================================================
30    // Construction & configuration
31    // ======================================================================
32
33    #[must_use]
34    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
35        Self {
36            db,
37            debug,
38            _marker: PhantomData,
39        }
40    }
41
42    #[must_use]
43    pub const fn debug(mut self) -> Self {
44        self.debug = true;
45        self
46    }
47
48    // ======================================================================
49    // Single-entity save operations
50    // ======================================================================
51
52    /// Insert a brand-new entity (errors if the key already exists).
53    pub fn insert(&self, entity: E) -> Result<E, InternalError> {
54        self.save_entity(SaveMode::Insert, entity)
55    }
56
57    /// Insert a new view, returning the stored view.
58    pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
59        let entity = E::from_view(view);
60        Ok(self.insert(entity)?.to_view())
61    }
62
63    /// Update an existing entity (errors if it does not exist).
64    pub fn update(&self, entity: E) -> Result<E, InternalError> {
65        self.save_entity(SaveMode::Update, entity)
66    }
67
68    /// Update an existing view (errors if it does not exist).
69    pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
70        let entity = E::from_view(view);
71
72        Ok(self.update(entity)?.to_view())
73    }
74
75    /// Replace an entity, inserting if missing.
76    pub fn replace(&self, entity: E) -> Result<E, InternalError> {
77        self.save_entity(SaveMode::Replace, entity)
78    }
79
80    /// Replace a view, inserting if missing.
81    pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
82        let entity = E::from_view(view);
83
84        Ok(self.replace(entity)?.to_view())
85    }
86
87    // ======================================================================
88    // Batch save operations (fail-fast, non-atomic)
89    // ======================================================================
90
91    pub fn insert_many(
92        &self,
93        entities: impl IntoIterator<Item = E>,
94    ) -> Result<Vec<E>, InternalError> {
95        let iter = entities.into_iter();
96        let mut out = Vec::with_capacity(iter.size_hint().0);
97
98        // Batch semantics: fail-fast and non-atomic; partial successes remain.
99        // Retry-safe only with caller idempotency and conflict handling.
100        for entity in iter {
101            out.push(self.insert(entity)?);
102        }
103
104        Ok(out)
105    }
106
107    pub fn update_many(
108        &self,
109        entities: impl IntoIterator<Item = E>,
110    ) -> Result<Vec<E>, InternalError> {
111        let iter = entities.into_iter();
112        let mut out = Vec::with_capacity(iter.size_hint().0);
113
114        // Batch semantics: fail-fast and non-atomic; partial successes remain.
115        // Retry-safe only if the caller tolerates already-updated rows.
116        for entity in iter {
117            out.push(self.update(entity)?);
118        }
119
120        Ok(out)
121    }
122
123    pub fn replace_many(
124        &self,
125        entities: impl IntoIterator<Item = E>,
126    ) -> Result<Vec<E>, InternalError> {
127        let iter = entities.into_iter();
128        let mut out = Vec::with_capacity(iter.size_hint().0);
129
130        // Batch semantics: fail-fast and non-atomic; partial successes remain.
131        // Retry-safe only with caller idempotency and conflict handling.
132        for entity in iter {
133            out.push(self.replace(entity)?);
134        }
135
136        Ok(out)
137    }
138
139    // ======================================================================
140    // Low-level execution
141    // ======================================================================
142
143    /// Execute a serialized save query.
144    ///
145    /// NOTE: Deserialization here is over user-supplied bytes. Failures are
146    /// considered invalid input rather than storage corruption.
147    pub fn execute(&self, query: SaveQuery) -> Result<E, InternalError> {
148        let entity: E = deserialize(&query.bytes)?;
149        self.save_entity(query.mode, entity)
150    }
151
152    fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, InternalError> {
153        let mut span = Span::<E>::new(ExecKind::Save);
154        let ctx = self.db.context::<E>();
155        let _unit = WriteUnit::new("save_entity_non_atomic");
156
157        // Sanitize & validate before key extraction in case PK fields are normalized
158        sanitize(&mut entity)?;
159        validate(&entity)?;
160
161        let key = entity.key();
162        let data_key = DataKey::new::<E>(key);
163        let raw_key = data_key.to_raw();
164        let old_result = ctx.with_store(|store| store.get(&raw_key))?;
165
166        let old = match (mode, old_result) {
167            (SaveMode::Insert | SaveMode::Replace, None) => None,
168            (SaveMode::Update | SaveMode::Replace, Some(old_row)) => {
169                Some(old_row.try_decode::<E>().map_err(|err| {
170                    ExecutorError::corruption(
171                        ErrorOrigin::Serialize,
172                        format!("failed to deserialize row: {data_key} ({err})"),
173                    )
174                })?)
175            }
176            (SaveMode::Insert, Some(_)) => return Err(ExecutorError::KeyExists(data_key).into()),
177            (SaveMode::Update, None) => return Err(ExecutorError::KeyNotFound(data_key).into()),
178        };
179
180        let bytes = serialize(&entity)?;
181        let row = RawRow::try_new(bytes)?;
182
183        // Partial-write window:
184        // - Phase 1 uniqueness checks are safe (no mutation, retry-safe).
185        // - Phase 2 mutates indexes; failures here can leave index divergence.
186        // - Data write happens after index updates; failures can orphan indexes.
187        // Corruption risk exists if failures occur after index mutation.
188        self.replace_indexes(old.as_ref(), &entity)?;
189
190        ctx.with_store_mut(|store| store.insert(raw_key, row))?;
191        span.set_rows(1);
192
193        Ok(entity)
194    }
195
196    // ======================================================================
197    // Index maintenance
198    // ======================================================================
199
200    /// Replace index entries using a two-phase (validate, then mutate) approach
201    /// to avoid partial updates on uniqueness violations.
202    #[allow(clippy::too_many_lines)]
203    fn replace_indexes(&self, old: Option<&E>, new: &E) -> Result<(), InternalError> {
204        use crate::db::store::IndexKey;
205
206        // Phase 1: validate uniqueness constraints without mutating.
207        for index in E::INDEXES {
208            if index.unique
209                && let Some(new_idx_key) = IndexKey::new(new, index)
210            {
211                let raw_key = new_idx_key.to_raw();
212                let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
213                let violates = store.with_borrow(|s| -> Result<bool, InternalError> {
214                    if let Some(existing) = s.get(&raw_key) {
215                        let entry = existing.try_decode().map_err(|err| {
216                            ExecutorError::corruption(
217                                ErrorOrigin::Index,
218                                format!(
219                                    "index corrupted: {} ({}) -> {}",
220                                    E::PATH,
221                                    index.fields.join(", "),
222                                    err
223                                ),
224                            )
225                        })?;
226                        if entry.len() > 1 {
227                            return Err(ExecutorError::corruption(
228                                ErrorOrigin::Index,
229                                format!(
230                                    "index corrupted: {} ({}) -> {} keys",
231                                    E::PATH,
232                                    index.fields.join(", "),
233                                    entry.len()
234                                ),
235                            )
236                            .into());
237                        }
238                        let new_entity_key = new.key();
239                        Ok(!entry.contains(&new_entity_key) && !entry.is_empty())
240                    } else {
241                        Ok(false)
242                    }
243                })?;
244
245                if violates {
246                    sink::record(MetricsEvent::UniqueViolation {
247                        entity_path: E::PATH,
248                    });
249
250                    return Err(ExecutorError::index_violation(E::PATH, index.fields).into());
251                }
252            }
253        }
254
255        // Phase 2: apply mutations.
256        // Failure here can leave partial index updates (corruption risk).
257        for index in E::INDEXES {
258            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
259            let mut removed = false;
260            let mut inserted = false;
261            store.with_borrow_mut(|s| {
262                if let Some(old) = old
263                    && s.remove_index_entry(old, index).map_err(|err| {
264                        ExecutorError::corruption(
265                            ErrorOrigin::Index,
266                            format!(
267                                "index corrupted: {} ({}) -> {}",
268                                E::PATH,
269                                index.fields.join(", "),
270                                err
271                            ),
272                        )
273                    })? == IndexRemoveOutcome::Removed
274                {
275                    removed = true;
276                }
277                match s.insert_index_entry(new, index) {
278                    Ok(IndexInsertOutcome::Inserted) => {
279                        inserted = true;
280                    }
281                    Ok(IndexInsertOutcome::Skipped) => {}
282                    Err(IndexInsertError::UniqueViolation) => {
283                        sink::record(MetricsEvent::UniqueViolation {
284                            entity_path: E::PATH,
285                        });
286                        return Err(ExecutorError::index_violation(E::PATH, index.fields).into());
287                    }
288                    Err(IndexInsertError::CorruptedEntry(err)) => {
289                        return Err(ExecutorError::corruption(
290                            ErrorOrigin::Index,
291                            format!(
292                                "index corrupted: {} ({}) -> {}",
293                                E::PATH,
294                                index.fields.join(", "),
295                                err
296                            ),
297                        )
298                        .into());
299                    }
300                    Err(IndexInsertError::EntryTooLarge { keys }) => {
301                        return Err(InternalError::new(
302                            ErrorClass::Unsupported,
303                            ErrorOrigin::Index,
304                            format!(
305                                "index entry exceeds max keys: {} ({}) -> {keys} keys",
306                                E::PATH,
307                                index.fields.join(", ")
308                            ),
309                        ));
310                    }
311                }
312                Ok::<(), InternalError>(())
313            })?;
314
315            if removed {
316                sink::record(MetricsEvent::IndexRemove {
317                    entity_path: E::PATH,
318                });
319            }
320
321            if inserted {
322                sink::record(MetricsEvent::IndexInsert {
323                    entity_path: E::PATH,
324                });
325            }
326        }
327
328        Ok(())
329    }
330}