icydb_core/db/executor/
save.rs1use crate::{
2 Error,
3 db::{
4 Db,
5 executor::ExecutorError,
6 query::{SaveMode, SaveQuery},
7 store::DataKey,
8 },
9 deserialize,
10 obs::metrics,
11 sanitize, serialize,
12 traits::EntityKind,
13 validate,
14};
15use std::marker::PhantomData;
16
17#[derive(Clone, Copy)]
22pub struct SaveExecutor<E: EntityKind> {
23 db: Db<E::Canister>,
24 debug: bool,
25 _marker: PhantomData<E>,
26}
27
28impl<E: EntityKind> SaveExecutor<E> {
29 #[must_use]
34 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
35 Self {
36 db,
37 debug,
38 _marker: PhantomData,
39 }
40 }
41
42 #[must_use]
43 pub const fn debug(mut self) -> Self {
44 self.debug = true;
45 self
46 }
47
48 pub fn insert(&self, entity: E) -> Result<E, Error> {
54 self.save_entity(SaveMode::Insert, entity)
55 }
56
57 pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, Error> {
59 let entity = E::from_view(view);
60 Ok(self.insert(entity)?.to_view())
61 }
62
63 pub fn update(&self, entity: E) -> Result<E, Error> {
65 self.save_entity(SaveMode::Update, entity)
66 }
67
68 pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, Error> {
70 let entity = E::from_view(view);
71 Ok(self.update(entity)?.to_view())
72 }
73
74 pub fn replace(&self, entity: E) -> Result<E, Error> {
76 self.save_entity(SaveMode::Replace, entity)
77 }
78
79 pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, Error> {
81 let entity = E::from_view(view);
82 Ok(self.replace(entity)?.to_view())
83 }
84
85 pub fn insert_many(&self, entities: impl IntoIterator<Item = E>) -> Result<Vec<E>, Error> {
90 let iter = entities.into_iter();
91 let mut out = Vec::with_capacity(iter.size_hint().0);
92 for entity in iter {
93 out.push(self.insert(entity)?);
94 }
95 Ok(out)
96 }
97
98 pub fn update_many(&self, entities: impl IntoIterator<Item = E>) -> Result<Vec<E>, Error> {
99 let iter = entities.into_iter();
100 let mut out = Vec::with_capacity(iter.size_hint().0);
101 for entity in iter {
102 out.push(self.update(entity)?);
103 }
104 Ok(out)
105 }
106
107 pub fn replace_many(&self, entities: impl IntoIterator<Item = E>) -> Result<Vec<E>, Error> {
108 let iter = entities.into_iter();
109 let mut out = Vec::with_capacity(iter.size_hint().0);
110 for entity in iter {
111 out.push(self.replace(entity)?);
112 }
113 Ok(out)
114 }
115
116 pub fn execute(&self, query: SaveQuery) -> Result<E, Error> {
122 let entity: E = deserialize(&query.bytes)?;
123 self.save_entity(query.mode, entity)
124 }
125
126 fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, Error> {
127 let mut span = metrics::Span::<E>::new(metrics::ExecKind::Save);
128 let ctx = self.db.context::<E>();
129
130 sanitize(&mut entity)?;
132 validate(&entity)?;
133
134 let key = entity.key();
136 let data_key = DataKey::new::<E>(key);
137 let old_result = ctx.with_store(|store| store.get(&data_key))?;
138
139 let old = match (mode, old_result) {
140 (SaveMode::Insert | SaveMode::Replace, None) => None,
141
142 (SaveMode::Update | SaveMode::Replace, Some(old_bytes)) => {
143 Some(deserialize::<E>(&old_bytes)?)
144 }
145
146 (SaveMode::Insert, Some(_)) => return Err(ExecutorError::KeyExists(data_key))?,
147 (SaveMode::Update, None) => return Err(ExecutorError::KeyNotFound(data_key))?,
148 };
149
150 let bytes = serialize(&entity)?;
152
153 self.replace_indexes(old.as_ref(), &entity)?;
155
156 ctx.with_store_mut(|store| store.insert(data_key.clone(), bytes))?;
158 span.set_rows(1);
159
160 Ok(entity)
161 }
162
163 fn replace_indexes(&self, old: Option<&E>, new: &E) -> Result<(), Error> {
170 use crate::db::store::IndexKey;
171
172 for index in E::INDEXES {
174 if index.unique
175 && let Some(new_idx_key) = IndexKey::new(new, index)
176 {
177 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
178 let violates = store.with_borrow(|s| {
179 if let Some(existing) = s.get(&new_idx_key) {
180 let new_entity_key = new.key();
181 !existing.contains(&new_entity_key) && !existing.is_empty()
182 } else {
183 false
184 }
185 });
186
187 if violates {
188 metrics::with_state_mut(|m| {
189 metrics::record_unique_violation_for::<E>(m);
190 });
191
192 return Err(ExecutorError::index_violation(E::PATH, index.fields).into());
193 }
194 }
195 }
196
197 for index in E::INDEXES {
199 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
200 store.with_borrow_mut(|s| {
201 if let Some(old) = old {
202 s.remove_index_entry(old, index);
203 }
204 s.insert_index_entry(new, index)?;
205 Ok::<(), Error>(())
206 })?;
207 }
208
209 Ok(())
210 }
211}