icydb_core/db/executor/
save.rs1use crate::{
2 db::{
3 Db,
4 executor::{ExecutorError, WriteUnit},
5 query::{SaveMode, SaveQuery},
6 store::{DataKey, IndexInsertError, IndexInsertOutcome, IndexRemoveOutcome},
7 },
8 error::InternalError,
9 obs::sink::{self, ExecKind, MetricsEvent, Span},
10 sanitize::sanitize,
11 serialize::{deserialize, serialize},
12 traits::EntityKind,
13 validate::validate,
14};
15use std::marker::PhantomData;
16
17#[derive(Clone, Copy)]
22pub struct SaveExecutor<E: EntityKind> {
23 db: Db<E::Canister>,
24 debug: bool,
25 _marker: PhantomData<E>,
26}
27
28impl<E: EntityKind> SaveExecutor<E> {
29 #[must_use]
34 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
35 Self {
36 db,
37 debug,
38 _marker: PhantomData,
39 }
40 }
41
42 #[must_use]
43 pub const fn debug(mut self) -> Self {
44 self.debug = true;
45 self
46 }
47
48 pub fn insert(&self, entity: E) -> Result<E, InternalError> {
54 self.save_entity(SaveMode::Insert, entity)
55 }
56
57 pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
59 let entity = E::from_view(view);
60 Ok(self.insert(entity)?.to_view())
61 }
62
63 pub fn update(&self, entity: E) -> Result<E, InternalError> {
65 self.save_entity(SaveMode::Update, entity)
66 }
67
68 pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
70 let entity = E::from_view(view);
71
72 Ok(self.update(entity)?.to_view())
73 }
74
75 pub fn replace(&self, entity: E) -> Result<E, InternalError> {
77 self.save_entity(SaveMode::Replace, entity)
78 }
79
80 pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
82 let entity = E::from_view(view);
83
84 Ok(self.replace(entity)?.to_view())
85 }
86
87 pub fn insert_many(
92 &self,
93 entities: impl IntoIterator<Item = E>,
94 ) -> Result<Vec<E>, InternalError> {
95 let iter = entities.into_iter();
96 let mut out = Vec::with_capacity(iter.size_hint().0);
97
98 for entity in iter {
101 out.push(self.insert(entity)?);
102 }
103
104 Ok(out)
105 }
106
107 pub fn update_many(
108 &self,
109 entities: impl IntoIterator<Item = E>,
110 ) -> Result<Vec<E>, InternalError> {
111 let iter = entities.into_iter();
112 let mut out = Vec::with_capacity(iter.size_hint().0);
113
114 for entity in iter {
117 out.push(self.update(entity)?);
118 }
119
120 Ok(out)
121 }
122
123 pub fn replace_many(
124 &self,
125 entities: impl IntoIterator<Item = E>,
126 ) -> Result<Vec<E>, InternalError> {
127 let iter = entities.into_iter();
128 let mut out = Vec::with_capacity(iter.size_hint().0);
129
130 for entity in iter {
133 out.push(self.replace(entity)?);
134 }
135
136 Ok(out)
137 }
138
139 pub fn execute(&self, query: SaveQuery) -> Result<E, InternalError> {
148 let entity: E = deserialize(&query.bytes)?;
149 self.save_entity(query.mode, entity)
150 }
151
152 fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, InternalError> {
153 let mut span = Span::<E>::new(ExecKind::Save);
154 let ctx = self.db.context::<E>();
155 let _unit = WriteUnit::new("save_entity_non_atomic");
156
157 sanitize(&mut entity)?;
159 validate(&entity)?;
160
161 let key = entity.key();
162 let data_key = DataKey::new::<E>(key);
163 let old_result = ctx.with_store(|store| store.get(&data_key))?;
164
165 let old = match (mode, old_result) {
166 (SaveMode::Insert | SaveMode::Replace, None) => None,
167 (SaveMode::Update | SaveMode::Replace, Some(old_bytes)) => {
168 Some(deserialize::<E>(&old_bytes)?)
169 }
170 (SaveMode::Insert, Some(_)) => return Err(ExecutorError::KeyExists(data_key).into()),
171 (SaveMode::Update, None) => return Err(ExecutorError::KeyNotFound(data_key).into()),
172 };
173
174 let bytes = serialize(&entity)?;
175
176 self.replace_indexes(old.as_ref(), &entity)?;
182
183 ctx.with_store_mut(|store| store.insert(data_key.clone(), bytes))?;
184 span.set_rows(1);
185
186 Ok(entity)
187 }
188
189 fn replace_indexes(&self, old: Option<&E>, new: &E) -> Result<(), InternalError> {
196 use crate::db::store::IndexKey;
197
198 for index in E::INDEXES {
200 if index.unique
201 && let Some(new_idx_key) = IndexKey::new(new, index)
202 {
203 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
204 let violates = store.with_borrow(|s| {
205 if let Some(existing) = s.get(&new_idx_key) {
206 let new_entity_key = new.key();
207 !existing.contains(&new_entity_key) && !existing.is_empty()
208 } else {
209 false
210 }
211 });
212
213 if violates {
214 sink::record(MetricsEvent::UniqueViolation {
215 entity_path: E::PATH,
216 });
217
218 return Err(ExecutorError::index_violation(E::PATH, index.fields).into());
219 }
220 }
221 }
222
223 for index in E::INDEXES {
226 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
227 let mut removed = false;
228 let mut inserted = false;
229 store.with_borrow_mut(|s| {
230 if let Some(old) = old
231 && s.remove_index_entry(old, index) == IndexRemoveOutcome::Removed
232 {
233 removed = true;
234 }
235 match s.insert_index_entry(new, index) {
236 Ok(IndexInsertOutcome::Inserted) => {
237 inserted = true;
238 }
239 Ok(IndexInsertOutcome::Skipped) => {}
240 Err(IndexInsertError::UniqueViolation) => {
241 sink::record(MetricsEvent::UniqueViolation {
242 entity_path: E::PATH,
243 });
244 return Err(ExecutorError::index_violation(E::PATH, index.fields).into());
245 }
246 }
247 Ok::<(), InternalError>(())
248 })?;
249
250 if removed {
251 sink::record(MetricsEvent::IndexRemove {
252 entity_path: E::PATH,
253 });
254 }
255
256 if inserted {
257 sink::record(MetricsEvent::IndexInsert {
258 entity_path: E::PATH,
259 });
260 }
261 }
262
263 Ok(())
264 }
265}