icydb_core/db/executor/
save.rs

1use crate::{
2    db::{
3        Db,
4        executor::{ExecutorError, WriteUnit},
5        query::{SaveMode, SaveQuery},
6        store::{DataKey, IndexInsertError, IndexInsertOutcome, IndexRemoveOutcome},
7    },
8    error::InternalError,
9    obs::sink::{self, ExecKind, MetricsEvent, Span},
10    sanitize::sanitize,
11    serialize::{deserialize, serialize},
12    traits::EntityKind,
13    validate::validate,
14};
15use std::marker::PhantomData;
16
17///
18/// SaveExecutor
19///
20
21#[derive(Clone, Copy)]
22pub struct SaveExecutor<E: EntityKind> {
23    db: Db<E::Canister>,
24    debug: bool,
25    _marker: PhantomData<E>,
26}
27
28impl<E: EntityKind> SaveExecutor<E> {
29    // ======================================================================
30    // Construction & configuration
31    // ======================================================================
32
33    #[must_use]
34    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
35        Self {
36            db,
37            debug,
38            _marker: PhantomData,
39        }
40    }
41
42    #[must_use]
43    pub const fn debug(mut self) -> Self {
44        self.debug = true;
45        self
46    }
47
48    // ======================================================================
49    // Single-entity save operations
50    // ======================================================================
51
52    /// Insert a brand-new entity (errors if the key already exists).
53    pub fn insert(&self, entity: E) -> Result<E, InternalError> {
54        self.save_entity(SaveMode::Insert, entity)
55    }
56
57    /// Insert a new view, returning the stored view.
58    pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
59        let entity = E::from_view(view);
60        Ok(self.insert(entity)?.to_view())
61    }
62
63    /// Update an existing entity (errors if it does not exist).
64    pub fn update(&self, entity: E) -> Result<E, InternalError> {
65        self.save_entity(SaveMode::Update, entity)
66    }
67
68    /// Update an existing view (errors if it does not exist).
69    pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
70        let entity = E::from_view(view);
71
72        Ok(self.update(entity)?.to_view())
73    }
74
75    /// Replace an entity, inserting if missing.
76    pub fn replace(&self, entity: E) -> Result<E, InternalError> {
77        self.save_entity(SaveMode::Replace, entity)
78    }
79
80    /// Replace a view, inserting if missing.
81    pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
82        let entity = E::from_view(view);
83
84        Ok(self.replace(entity)?.to_view())
85    }
86
87    // ======================================================================
88    // Batch save operations (fail-fast, non-atomic)
89    // ======================================================================
90
91    pub fn insert_many(
92        &self,
93        entities: impl IntoIterator<Item = E>,
94    ) -> Result<Vec<E>, InternalError> {
95        let iter = entities.into_iter();
96        let mut out = Vec::with_capacity(iter.size_hint().0);
97
98        // Batch semantics: fail-fast and non-atomic; partial successes remain.
99        // Retry-safe only with caller idempotency and conflict handling.
100        for entity in iter {
101            out.push(self.insert(entity)?);
102        }
103
104        Ok(out)
105    }
106
107    pub fn update_many(
108        &self,
109        entities: impl IntoIterator<Item = E>,
110    ) -> Result<Vec<E>, InternalError> {
111        let iter = entities.into_iter();
112        let mut out = Vec::with_capacity(iter.size_hint().0);
113
114        // Batch semantics: fail-fast and non-atomic; partial successes remain.
115        // Retry-safe only if the caller tolerates already-updated rows.
116        for entity in iter {
117            out.push(self.update(entity)?);
118        }
119
120        Ok(out)
121    }
122
123    pub fn replace_many(
124        &self,
125        entities: impl IntoIterator<Item = E>,
126    ) -> Result<Vec<E>, InternalError> {
127        let iter = entities.into_iter();
128        let mut out = Vec::with_capacity(iter.size_hint().0);
129
130        // Batch semantics: fail-fast and non-atomic; partial successes remain.
131        // Retry-safe only with caller idempotency and conflict handling.
132        for entity in iter {
133            out.push(self.replace(entity)?);
134        }
135
136        Ok(out)
137    }
138
139    // ======================================================================
140    // Low-level execution
141    // ======================================================================
142
143    /// Execute a serialized save query.
144    ///
145    /// NOTE: Deserialization here is over user-supplied bytes. Failures are
146    /// considered invalid input rather than storage corruption.
147    pub fn execute(&self, query: SaveQuery) -> Result<E, InternalError> {
148        let entity: E = deserialize(&query.bytes)?;
149        self.save_entity(query.mode, entity)
150    }
151
152    fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, InternalError> {
153        let mut span = Span::<E>::new(ExecKind::Save);
154        let ctx = self.db.context::<E>();
155        let _unit = WriteUnit::new("save_entity_non_atomic");
156
157        // Sanitize & validate before key extraction in case PK fields are normalized
158        sanitize(&mut entity)?;
159        validate(&entity)?;
160
161        let key = entity.key();
162        let data_key = DataKey::new::<E>(key);
163        let old_result = ctx.with_store(|store| store.get(&data_key))?;
164
165        let old = match (mode, old_result) {
166            (SaveMode::Insert | SaveMode::Replace, None) => None,
167            (SaveMode::Update | SaveMode::Replace, Some(old_bytes)) => {
168                Some(deserialize::<E>(&old_bytes)?)
169            }
170            (SaveMode::Insert, Some(_)) => return Err(ExecutorError::KeyExists(data_key).into()),
171            (SaveMode::Update, None) => return Err(ExecutorError::KeyNotFound(data_key).into()),
172        };
173
174        let bytes = serialize(&entity)?;
175
176        // Partial-write window:
177        // - Phase 1 uniqueness checks are safe (no mutation, retry-safe).
178        // - Phase 2 mutates indexes; failures here can leave index divergence.
179        // - Data write happens after index updates; failures can orphan indexes.
180        // Corruption risk exists if failures occur after index mutation.
181        self.replace_indexes(old.as_ref(), &entity)?;
182
183        ctx.with_store_mut(|store| store.insert(data_key.clone(), bytes))?;
184        span.set_rows(1);
185
186        Ok(entity)
187    }
188
189    // ======================================================================
190    // Index maintenance
191    // ======================================================================
192
193    /// Replace index entries using a two-phase (validate, then mutate) approach
194    /// to avoid partial updates on uniqueness violations.
195    fn replace_indexes(&self, old: Option<&E>, new: &E) -> Result<(), InternalError> {
196        use crate::db::store::IndexKey;
197
198        // Phase 1: validate uniqueness constraints without mutating.
199        for index in E::INDEXES {
200            if index.unique
201                && let Some(new_idx_key) = IndexKey::new(new, index)
202            {
203                let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
204                let violates = store.with_borrow(|s| {
205                    if let Some(existing) = s.get(&new_idx_key) {
206                        let new_entity_key = new.key();
207                        !existing.contains(&new_entity_key) && !existing.is_empty()
208                    } else {
209                        false
210                    }
211                });
212
213                if violates {
214                    sink::record(MetricsEvent::UniqueViolation {
215                        entity_path: E::PATH,
216                    });
217
218                    return Err(ExecutorError::index_violation(E::PATH, index.fields).into());
219                }
220            }
221        }
222
223        // Phase 2: apply mutations.
224        // Failure here can leave partial index updates (corruption risk).
225        for index in E::INDEXES {
226            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
227            let mut removed = false;
228            let mut inserted = false;
229            store.with_borrow_mut(|s| {
230                if let Some(old) = old
231                    && s.remove_index_entry(old, index) == IndexRemoveOutcome::Removed
232                {
233                    removed = true;
234                }
235                match s.insert_index_entry(new, index) {
236                    Ok(IndexInsertOutcome::Inserted) => {
237                        inserted = true;
238                    }
239                    Ok(IndexInsertOutcome::Skipped) => {}
240                    Err(IndexInsertError::UniqueViolation) => {
241                        sink::record(MetricsEvent::UniqueViolation {
242                            entity_path: E::PATH,
243                        });
244                        return Err(ExecutorError::index_violation(E::PATH, index.fields).into());
245                    }
246                }
247                Ok::<(), InternalError>(())
248            })?;
249
250            if removed {
251                sink::record(MetricsEvent::IndexRemove {
252                    entity_path: E::PATH,
253                });
254            }
255
256            if inserted {
257                sink::record(MetricsEvent::IndexInsert {
258                    entity_path: E::PATH,
259                });
260            }
261        }
262
263        Ok(())
264    }
265}