icydb_core/db/executor/
save.rs1use crate::{
2 db::{
3 Db,
4 executor::{ExecutorError, WriteUnit},
5 query::{SaveMode, SaveQuery},
6 store::DataKey,
7 },
8 deserialize,
9 obs::metrics,
10 runtime_error::RuntimeError,
11 sanitize, serialize,
12 traits::EntityKind,
13 validate,
14};
15use std::marker::PhantomData;
16
17#[derive(Clone, Copy)]
22pub struct SaveExecutor<E: EntityKind> {
23 db: Db<E::Canister>,
24 debug: bool,
25 _marker: PhantomData<E>,
26}
27
28impl<E: EntityKind> SaveExecutor<E> {
29 #[must_use]
34 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
35 Self {
36 db,
37 debug,
38 _marker: PhantomData,
39 }
40 }
41
42 #[must_use]
43 pub const fn debug(mut self) -> Self {
44 self.debug = true;
45 self
46 }
47
48 pub fn insert(&self, entity: E) -> Result<E, RuntimeError> {
54 self.save_entity(SaveMode::Insert, entity)
55 }
56
57 pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, RuntimeError> {
59 let entity = E::from_view(view);
60 Ok(self.insert(entity)?.to_view())
61 }
62
63 pub fn update(&self, entity: E) -> Result<E, RuntimeError> {
65 self.save_entity(SaveMode::Update, entity)
66 }
67
68 pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, RuntimeError> {
70 let entity = E::from_view(view);
71 Ok(self.update(entity)?.to_view())
72 }
73
74 pub fn replace(&self, entity: E) -> Result<E, RuntimeError> {
76 self.save_entity(SaveMode::Replace, entity)
77 }
78
79 pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, RuntimeError> {
81 let entity = E::from_view(view);
82 Ok(self.replace(entity)?.to_view())
83 }
84
85 pub fn insert_many(
90 &self,
91 entities: impl IntoIterator<Item = E>,
92 ) -> Result<Vec<E>, RuntimeError> {
93 let iter = entities.into_iter();
94 let mut out = Vec::with_capacity(iter.size_hint().0);
95 for entity in iter {
98 out.push(self.insert(entity)?);
99 }
100 Ok(out)
101 }
102
103 pub fn update_many(
104 &self,
105 entities: impl IntoIterator<Item = E>,
106 ) -> Result<Vec<E>, RuntimeError> {
107 let iter = entities.into_iter();
108 let mut out = Vec::with_capacity(iter.size_hint().0);
109 for entity in iter {
112 out.push(self.update(entity)?);
113 }
114 Ok(out)
115 }
116
117 pub fn replace_many(
118 &self,
119 entities: impl IntoIterator<Item = E>,
120 ) -> Result<Vec<E>, RuntimeError> {
121 let iter = entities.into_iter();
122 let mut out = Vec::with_capacity(iter.size_hint().0);
123 for entity in iter {
126 out.push(self.replace(entity)?);
127 }
128 Ok(out)
129 }
130
131 pub fn execute(&self, query: SaveQuery) -> Result<E, RuntimeError> {
137 let entity: E = deserialize(&query.bytes)?;
138 self.save_entity(query.mode, entity)
139 }
140
141 fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, RuntimeError> {
142 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Save);
143 let ctx = self.db.context::<E>();
144 let _unit = WriteUnit::new("save_entity");
145
146 sanitize(&mut entity)?;
148 validate(&entity)?;
149
150 let key = entity.key();
152 let data_key = DataKey::new::<E>(key);
153 let old_result = ctx.with_store(|store| store.get(&data_key))?;
154
155 let old = match (mode, old_result) {
156 (SaveMode::Insert | SaveMode::Replace, None) => None,
157
158 (SaveMode::Update | SaveMode::Replace, Some(old_bytes)) => {
159 Some(deserialize::<E>(&old_bytes)?)
160 }
161
162 (SaveMode::Insert, Some(_)) => return Err(ExecutorError::KeyExists(data_key).into()),
163 (SaveMode::Update, None) => return Err(ExecutorError::KeyNotFound(data_key).into()),
164 };
165
166 let bytes = serialize(&entity)?;
168
169 self.replace_indexes(old.as_ref(), &entity)?;
176
177 ctx.with_store_mut(|store| store.insert(data_key.clone(), bytes))?;
179 span.set_rows(1);
180
181 Ok(entity)
182 }
183
184 fn replace_indexes(&self, old: Option<&E>, new: &E) -> Result<(), RuntimeError> {
191 use crate::db::store::IndexKey;
192
193 for index in E::INDEXES {
196 if index.unique
197 && let Some(new_idx_key) = IndexKey::new(new, index)
198 {
199 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
200 let violates = store.with_borrow(|s| {
201 if let Some(existing) = s.get(&new_idx_key) {
202 let new_entity_key = new.key();
203 !existing.contains(&new_entity_key) && !existing.is_empty()
204 } else {
205 false
206 }
207 });
208
209 if violates {
210 metrics::with_state_mut(|m| {
211 metrics::record_unique_violation_for::<E>(m);
212 });
213
214 return Err(ExecutorError::index_violation(E::PATH, index.fields).into());
215 }
216 }
217 }
218
219 for index in E::INDEXES {
222 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
223 store.with_borrow_mut(|s| {
224 if let Some(old) = old {
225 s.remove_index_entry(old, index);
226 }
227 s.insert_index_entry(new, index)?;
228 Ok::<(), RuntimeError>(())
229 })?;
230 }
231
232 Ok(())
233 }
234}