icydb_core/db/executor/
save.rs

1use crate::{
2    db::{
3        Db,
4        executor::{ExecutorError, WriteUnit},
5        query::{SaveMode, SaveQuery},
6        store::DataKey,
7    },
8    deserialize,
9    obs::metrics,
10    runtime_error::RuntimeError,
11    sanitize, serialize,
12    traits::EntityKind,
13    validate,
14};
15use std::marker::PhantomData;
16
17///
18/// SaveExecutor
19///
20
21#[derive(Clone, Copy)]
22pub struct SaveExecutor<E: EntityKind> {
23    db: Db<E::Canister>,
24    debug: bool,
25    _marker: PhantomData<E>,
26}
27
28impl<E: EntityKind> SaveExecutor<E> {
29    // ======================================================================
30    // Construction & configuration
31    // ======================================================================
32
33    #[must_use]
34    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
35        Self {
36            db,
37            debug,
38            _marker: PhantomData,
39        }
40    }
41
42    #[must_use]
43    pub const fn debug(mut self) -> Self {
44        self.debug = true;
45        self
46    }
47
48    // ======================================================================
49    // Single-entity save operations
50    // ======================================================================
51
52    /// Insert a brand-new entity (errors if the key already exists).
53    pub fn insert(&self, entity: E) -> Result<E, RuntimeError> {
54        self.save_entity(SaveMode::Insert, entity)
55    }
56
57    /// Insert a new view, returning the stored view.
58    pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, RuntimeError> {
59        let entity = E::from_view(view);
60        Ok(self.insert(entity)?.to_view())
61    }
62
63    /// Update an existing entity (errors if it does not exist).
64    pub fn update(&self, entity: E) -> Result<E, RuntimeError> {
65        self.save_entity(SaveMode::Update, entity)
66    }
67
68    /// Update an existing view (errors if it does not exist).
69    pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, RuntimeError> {
70        let entity = E::from_view(view);
71        Ok(self.update(entity)?.to_view())
72    }
73
74    /// Replace an entity, inserting if missing.
75    pub fn replace(&self, entity: E) -> Result<E, RuntimeError> {
76        self.save_entity(SaveMode::Replace, entity)
77    }
78
79    /// Replace a view, inserting if missing.
80    pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, RuntimeError> {
81        let entity = E::from_view(view);
82        Ok(self.replace(entity)?.to_view())
83    }
84
85    // ======================================================================
86    // Batch save operations (fail-fast, non-atomic)
87    // ======================================================================
88
89    pub fn insert_many(
90        &self,
91        entities: impl IntoIterator<Item = E>,
92    ) -> Result<Vec<E>, RuntimeError> {
93        let iter = entities.into_iter();
94        let mut out = Vec::with_capacity(iter.size_hint().0);
95        // Batch semantics: fail-fast and non-atomic; partial successes remain.
96        // Retry-safe only with caller idempotency and conflict handling.
97        for entity in iter {
98            out.push(self.insert(entity)?);
99        }
100        Ok(out)
101    }
102
103    pub fn update_many(
104        &self,
105        entities: impl IntoIterator<Item = E>,
106    ) -> Result<Vec<E>, RuntimeError> {
107        let iter = entities.into_iter();
108        let mut out = Vec::with_capacity(iter.size_hint().0);
109        // Batch semantics: fail-fast and non-atomic; partial successes remain.
110        // Retry-safe only if the caller tolerates already-updated rows.
111        for entity in iter {
112            out.push(self.update(entity)?);
113        }
114        Ok(out)
115    }
116
117    pub fn replace_many(
118        &self,
119        entities: impl IntoIterator<Item = E>,
120    ) -> Result<Vec<E>, RuntimeError> {
121        let iter = entities.into_iter();
122        let mut out = Vec::with_capacity(iter.size_hint().0);
123        // Batch semantics: fail-fast and non-atomic; partial successes remain.
124        // Retry-safe only with caller idempotency and conflict handling.
125        for entity in iter {
126            out.push(self.replace(entity)?);
127        }
128        Ok(out)
129    }
130
131    // ======================================================================
132    // Low-level execution
133    // ======================================================================
134
135    /// Execute a serialized save query.
136    pub fn execute(&self, query: SaveQuery) -> Result<E, RuntimeError> {
137        let entity: E = deserialize(&query.bytes)?;
138        self.save_entity(query.mode, entity)
139    }
140
141    fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, RuntimeError> {
142        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Save);
143        let ctx = self.db.context::<E>();
144        let _unit = WriteUnit::new("save_entity");
145
146        // sanitize & validate before key extraction in case PK fields are normalized
147        sanitize(&mut entity)?;
148        validate(&entity)?;
149
150        // match save mode
151        let key = entity.key();
152        let data_key = DataKey::new::<E>(key);
153        let old_result = ctx.with_store(|store| store.get(&data_key))?;
154
155        let old = match (mode, old_result) {
156            (SaveMode::Insert | SaveMode::Replace, None) => None,
157
158            (SaveMode::Update | SaveMode::Replace, Some(old_bytes)) => {
159                Some(deserialize::<E>(&old_bytes)?)
160            }
161
162            (SaveMode::Insert, Some(_)) => return Err(ExecutorError::KeyExists(data_key).into()),
163            (SaveMode::Update, None) => return Err(ExecutorError::KeyNotFound(data_key).into()),
164        };
165
166        // serialize new entity
167        let bytes = serialize(&entity)?;
168
169        // Partial-write window:
170        // - Phase 1 uniqueness checks are safe (no mutation, retry-safe).
171        // - Phase 2 mutates indexes; failures here can leave index divergence.
172        // - Data write happens after index updates; failures can orphan indexes.
173        // Acceptable failure: uniqueness conflict (no mutation).
174        // update indexes (two-phase)
175        self.replace_indexes(old.as_ref(), &entity)?;
176
177        // write data row
178        ctx.with_store_mut(|store| store.insert(data_key.clone(), bytes))?;
179        span.set_rows(1);
180
181        Ok(entity)
182    }
183
184    // ======================================================================
185    // Index maintenance
186    // ======================================================================
187
188    /// Replace index entries using a two-phase (validate, then mutate) approach
189    /// to avoid partial updates on uniqueness violations.
190    fn replace_indexes(&self, old: Option<&E>, new: &E) -> Result<(), RuntimeError> {
191        use crate::db::store::IndexKey;
192
193        // Phase 1: validate uniqueness constraints without mutating.
194        // Acceptable failure: uniqueness violation is safe and retryable.
195        for index in E::INDEXES {
196            if index.unique
197                && let Some(new_idx_key) = IndexKey::new(new, index)
198            {
199                let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
200                let violates = store.with_borrow(|s| {
201                    if let Some(existing) = s.get(&new_idx_key) {
202                        let new_entity_key = new.key();
203                        !existing.contains(&new_entity_key) && !existing.is_empty()
204                    } else {
205                        false
206                    }
207                });
208
209                if violates {
210                    metrics::with_state_mut(|m| {
211                        metrics::record_unique_violation_for::<E>(m);
212                    });
213
214                    return Err(ExecutorError::index_violation(E::PATH, index.fields).into());
215                }
216            }
217        }
218
219        // Phase 2: apply mutations.
220        // Failure here can leave partial index updates (corruption risk).
221        for index in E::INDEXES {
222            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
223            store.with_borrow_mut(|s| {
224                if let Some(old) = old {
225                    s.remove_index_entry(old, index);
226                }
227                s.insert_index_entry(new, index)?;
228                Ok::<(), RuntimeError>(())
229            })?;
230        }
231
232        Ok(())
233    }
234}