1use crate::{
2 db::{
3 Db,
4 executor::{ExecutorError, WriteUnit},
5 query::{SaveMode, SaveQuery},
6 store::{DataKey, IndexInsertError, IndexInsertOutcome, IndexRemoveOutcome, RawRow},
7 },
8 error::{ErrorClass, ErrorOrigin, InternalError},
9 obs::sink::{self, ExecKind, MetricsEvent, Span},
10 sanitize::sanitize,
11 serialize::{deserialize, serialize},
12 traits::EntityKind,
13 validate::validate,
14};
15use std::marker::PhantomData;
16
17#[derive(Clone, Copy)]
22pub struct SaveExecutor<E: EntityKind> {
23 db: Db<E::Canister>,
24 debug: bool,
25 _marker: PhantomData<E>,
26}
27
28impl<E: EntityKind> SaveExecutor<E> {
29 #[must_use]
34 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
35 Self {
36 db,
37 debug,
38 _marker: PhantomData,
39 }
40 }
41
42 #[must_use]
43 pub const fn debug(mut self) -> Self {
44 self.debug = true;
45 self
46 }
47
48 pub fn insert(&self, entity: E) -> Result<E, InternalError> {
54 self.save_entity(SaveMode::Insert, entity)
55 }
56
57 pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
59 let entity = E::from_view(view);
60 Ok(self.insert(entity)?.to_view())
61 }
62
63 pub fn update(&self, entity: E) -> Result<E, InternalError> {
65 self.save_entity(SaveMode::Update, entity)
66 }
67
68 pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
70 let entity = E::from_view(view);
71
72 Ok(self.update(entity)?.to_view())
73 }
74
75 pub fn replace(&self, entity: E) -> Result<E, InternalError> {
77 self.save_entity(SaveMode::Replace, entity)
78 }
79
80 pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
82 let entity = E::from_view(view);
83
84 Ok(self.replace(entity)?.to_view())
85 }
86
87 pub fn insert_many(
92 &self,
93 entities: impl IntoIterator<Item = E>,
94 ) -> Result<Vec<E>, InternalError> {
95 let iter = entities.into_iter();
96 let mut out = Vec::with_capacity(iter.size_hint().0);
97
98 for entity in iter {
101 out.push(self.insert(entity)?);
102 }
103
104 Ok(out)
105 }
106
107 pub fn update_many(
108 &self,
109 entities: impl IntoIterator<Item = E>,
110 ) -> Result<Vec<E>, InternalError> {
111 let iter = entities.into_iter();
112 let mut out = Vec::with_capacity(iter.size_hint().0);
113
114 for entity in iter {
117 out.push(self.update(entity)?);
118 }
119
120 Ok(out)
121 }
122
123 pub fn replace_many(
124 &self,
125 entities: impl IntoIterator<Item = E>,
126 ) -> Result<Vec<E>, InternalError> {
127 let iter = entities.into_iter();
128 let mut out = Vec::with_capacity(iter.size_hint().0);
129
130 for entity in iter {
133 out.push(self.replace(entity)?);
134 }
135
136 Ok(out)
137 }
138
139 pub fn execute(&self, query: SaveQuery) -> Result<E, InternalError> {
148 let entity: E = deserialize(&query.bytes)?;
149 self.save_entity(query.mode, entity)
150 }
151
152 fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, InternalError> {
153 let mut span = Span::<E>::new(ExecKind::Save);
154 let ctx = self.db.context::<E>();
155 let _unit = WriteUnit::new("save_entity_non_atomic");
156
157 sanitize(&mut entity)?;
159 validate(&entity)?;
160
161 let key = entity.key();
162 let data_key = DataKey::new::<E>(key);
163 let raw_key = data_key.to_raw();
164 let old_result = ctx.with_store(|store| store.get(&raw_key))?;
165
166 let old = match (mode, old_result) {
167 (SaveMode::Insert | SaveMode::Replace, None) => None,
168 (SaveMode::Update | SaveMode::Replace, Some(old_row)) => {
169 Some(old_row.try_decode::<E>().map_err(|err| {
170 ExecutorError::corruption(
171 ErrorOrigin::Serialize,
172 format!("failed to deserialize row: {data_key} ({err})"),
173 )
174 })?)
175 }
176 (SaveMode::Insert, Some(_)) => return Err(ExecutorError::KeyExists(data_key).into()),
177 (SaveMode::Update, None) => return Err(ExecutorError::KeyNotFound(data_key).into()),
178 };
179
180 let bytes = serialize(&entity)?;
181 let row = RawRow::try_new(bytes)?;
182
183 self.replace_indexes(old.as_ref(), &entity)?;
189
190 ctx.with_store_mut(|store| store.insert(raw_key, row))?;
191 span.set_rows(1);
192
193 Ok(entity)
194 }
195
196 #[allow(clippy::too_many_lines)]
203 fn replace_indexes(&self, old: Option<&E>, new: &E) -> Result<(), InternalError> {
204 use crate::db::store::IndexKey;
205
206 for index in E::INDEXES {
208 if index.unique
209 && let Some(new_idx_key) = IndexKey::new(new, index)
210 {
211 let raw_key = new_idx_key.to_raw();
212 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
213 let violates = store.with_borrow(|s| -> Result<bool, InternalError> {
214 if let Some(existing) = s.get(&raw_key) {
215 let entry = existing.try_decode().map_err(|err| {
216 ExecutorError::corruption(
217 ErrorOrigin::Index,
218 format!(
219 "index corrupted: {} ({}) -> {}",
220 E::PATH,
221 index.fields.join(", "),
222 err
223 ),
224 )
225 })?;
226 if entry.len() > 1 {
227 return Err(ExecutorError::corruption(
228 ErrorOrigin::Index,
229 format!(
230 "index corrupted: {} ({}) -> {} keys",
231 E::PATH,
232 index.fields.join(", "),
233 entry.len()
234 ),
235 )
236 .into());
237 }
238 let new_entity_key = new.key();
239 Ok(!entry.contains(&new_entity_key) && !entry.is_empty())
240 } else {
241 Ok(false)
242 }
243 })?;
244
245 if violates {
246 sink::record(MetricsEvent::UniqueViolation {
247 entity_path: E::PATH,
248 });
249
250 return Err(ExecutorError::index_violation(E::PATH, index.fields).into());
251 }
252 }
253 }
254
255 for index in E::INDEXES {
258 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
259 let mut removed = false;
260 let mut inserted = false;
261 store.with_borrow_mut(|s| {
262 if let Some(old) = old
263 && s.remove_index_entry(old, index).map_err(|err| {
264 ExecutorError::corruption(
265 ErrorOrigin::Index,
266 format!(
267 "index corrupted: {} ({}) -> {}",
268 E::PATH,
269 index.fields.join(", "),
270 err
271 ),
272 )
273 })? == IndexRemoveOutcome::Removed
274 {
275 removed = true;
276 }
277 match s.insert_index_entry(new, index) {
278 Ok(IndexInsertOutcome::Inserted) => {
279 inserted = true;
280 }
281 Ok(IndexInsertOutcome::Skipped) => {}
282 Err(IndexInsertError::UniqueViolation) => {
283 sink::record(MetricsEvent::UniqueViolation {
284 entity_path: E::PATH,
285 });
286 return Err(ExecutorError::index_violation(E::PATH, index.fields).into());
287 }
288 Err(IndexInsertError::CorruptedEntry(err)) => {
289 return Err(ExecutorError::corruption(
290 ErrorOrigin::Index,
291 format!(
292 "index corrupted: {} ({}) -> {}",
293 E::PATH,
294 index.fields.join(", "),
295 err
296 ),
297 )
298 .into());
299 }
300 Err(IndexInsertError::EntryTooLarge { keys }) => {
301 return Err(InternalError::new(
302 ErrorClass::Unsupported,
303 ErrorOrigin::Index,
304 format!(
305 "index entry exceeds max keys: {} ({}) -> {keys} keys",
306 E::PATH,
307 index.fields.join(", ")
308 ),
309 ));
310 }
311 }
312 Ok::<(), InternalError>(())
313 })?;
314
315 if removed {
316 sink::record(MetricsEvent::IndexRemove {
317 entity_path: E::PATH,
318 });
319 }
320
321 if inserted {
322 sink::record(MetricsEvent::IndexInsert {
323 entity_path: E::PATH,
324 });
325 }
326 }
327
328 Ok(())
329 }
330}