icydb_core/db/executor/
save.rs

1use crate::{
2    Error,
3    db::{
4        Db,
5        executor::ExecutorError,
6        query::{SaveMode, SaveQuery},
7        store::DataKey,
8    },
9    deserialize,
10    obs::metrics,
11    sanitize, serialize,
12    traits::EntityKind,
13    validate,
14};
15use std::marker::PhantomData;
16
17///
18/// SaveExecutor
19///
20
21#[derive(Clone, Copy)]
22pub struct SaveExecutor<E: EntityKind> {
23    db: Db<E::Canister>,
24    debug: bool,
25    _marker: PhantomData<E>,
26}
27
28impl<E: EntityKind> SaveExecutor<E> {
29    // ======================================================================
30    // Construction & configuration
31    // ======================================================================
32
33    #[must_use]
34    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
35        Self {
36            db,
37            debug,
38            _marker: PhantomData,
39        }
40    }
41
42    #[must_use]
43    pub const fn debug(mut self) -> Self {
44        self.debug = true;
45        self
46    }
47
48    // ======================================================================
49    // Single-entity save operations
50    // ======================================================================
51
52    /// Insert a brand-new entity (errors if the key already exists).
53    pub fn insert(&self, entity: E) -> Result<E, Error> {
54        self.save_entity(SaveMode::Insert, entity)
55    }
56
57    /// Insert a new view, returning the stored view.
58    pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, Error> {
59        let entity = E::from_view(view);
60        Ok(self.insert(entity)?.to_view())
61    }
62
63    /// Update an existing entity (errors if it does not exist).
64    pub fn update(&self, entity: E) -> Result<E, Error> {
65        self.save_entity(SaveMode::Update, entity)
66    }
67
68    /// Update an existing view (errors if it does not exist).
69    pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, Error> {
70        let entity = E::from_view(view);
71        Ok(self.update(entity)?.to_view())
72    }
73
74    /// Replace an entity, inserting if missing.
75    pub fn replace(&self, entity: E) -> Result<E, Error> {
76        self.save_entity(SaveMode::Replace, entity)
77    }
78
79    /// Replace a view, inserting if missing.
80    pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, Error> {
81        let entity = E::from_view(view);
82        Ok(self.replace(entity)?.to_view())
83    }
84
85    // ======================================================================
86    // Batch save operations (fail-fast, non-atomic)
87    // ======================================================================
88
89    pub fn insert_many(&self, entities: impl IntoIterator<Item = E>) -> Result<Vec<E>, Error> {
90        let iter = entities.into_iter();
91        let mut out = Vec::with_capacity(iter.size_hint().0);
92        for entity in iter {
93            out.push(self.insert(entity)?);
94        }
95        Ok(out)
96    }
97
98    pub fn update_many(&self, entities: impl IntoIterator<Item = E>) -> Result<Vec<E>, Error> {
99        let iter = entities.into_iter();
100        let mut out = Vec::with_capacity(iter.size_hint().0);
101        for entity in iter {
102            out.push(self.update(entity)?);
103        }
104        Ok(out)
105    }
106
107    pub fn replace_many(&self, entities: impl IntoIterator<Item = E>) -> Result<Vec<E>, Error> {
108        let iter = entities.into_iter();
109        let mut out = Vec::with_capacity(iter.size_hint().0);
110        for entity in iter {
111            out.push(self.replace(entity)?);
112        }
113        Ok(out)
114    }
115
116    // ======================================================================
117    // Low-level execution
118    // ======================================================================
119
120    /// Execute a serialized save query.
121    pub fn execute(&self, query: SaveQuery) -> Result<E, Error> {
122        let entity: E = deserialize(&query.bytes)?;
123        self.save_entity(query.mode, entity)
124    }
125
126    fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, Error> {
127        let mut span = metrics::Span::<E>::new(metrics::ExecKind::Save);
128        let ctx = self.db.context::<E>();
129
130        // sanitize & validate before key extraction in case PK fields are normalized
131        sanitize(&mut entity)?;
132        validate(&entity)?;
133
134        // match save mode
135        let key = entity.key();
136        let data_key = DataKey::new::<E>(key);
137        let old_result = ctx.with_store(|store| store.get(&data_key))?;
138
139        let old = match (mode, old_result) {
140            (SaveMode::Insert | SaveMode::Replace, None) => None,
141
142            (SaveMode::Update | SaveMode::Replace, Some(old_bytes)) => {
143                Some(deserialize::<E>(&old_bytes)?)
144            }
145
146            (SaveMode::Insert, Some(_)) => return Err(ExecutorError::KeyExists(data_key))?,
147            (SaveMode::Update, None) => return Err(ExecutorError::KeyNotFound(data_key))?,
148        };
149
150        // serialize new entity
151        let bytes = serialize(&entity)?;
152
153        // update indexes (two-phase)
154        self.replace_indexes(old.as_ref(), &entity)?;
155
156        // write data row
157        ctx.with_store_mut(|store| store.insert(data_key.clone(), bytes))?;
158        span.set_rows(1);
159
160        Ok(entity)
161    }
162
163    // ======================================================================
164    // Index maintenance
165    // ======================================================================
166
167    /// Replace index entries using a two-phase (validate, then mutate) approach
168    /// to avoid partial updates on uniqueness violations.
169    fn replace_indexes(&self, old: Option<&E>, new: &E) -> Result<(), Error> {
170        use crate::db::store::IndexKey;
171
172        // Phase 1: validate uniqueness constraints without mutating
173        for index in E::INDEXES {
174            if index.unique
175                && let Some(new_idx_key) = IndexKey::new(new, index)
176            {
177                let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
178                let violates = store.with_borrow(|s| {
179                    if let Some(existing) = s.get(&new_idx_key) {
180                        let new_entity_key = new.key();
181                        !existing.contains(&new_entity_key) && !existing.is_empty()
182                    } else {
183                        false
184                    }
185                });
186
187                if violates {
188                    metrics::with_state_mut(|m| {
189                        metrics::record_unique_violation_for::<E>(m);
190                    });
191
192                    return Err(ExecutorError::index_violation(E::PATH, index.fields).into());
193                }
194            }
195        }
196
197        // Phase 2: apply mutations
198        for index in E::INDEXES {
199            let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
200            store.with_borrow_mut(|s| {
201                if let Some(old) = old {
202                    s.remove_index_entry(old, index);
203                }
204                s.insert_index_entry(new, index)?;
205                Ok::<(), Error>(())
206            })?;
207        }
208
209        Ok(())
210    }
211}