1use crate::{
2 db::{
3 CommitDataOp, CommitKind, CommitMarker, Db, begin_commit, ensure_recovered,
4 executor::{ExecutorError, WriteUnit},
5 finish_commit,
6 index::{
7 IndexInsertOutcome, IndexRemoveOutcome,
8 plan::{IndexApplyPlan, plan_index_mutation_for_entity},
9 },
10 query::{SaveMode, SaveQuery},
11 store::{DataKey, RawRow},
12 },
13 error::{ErrorOrigin, InternalError},
14 obs::sink::{self, ExecKind, MetricsEvent, Span},
15 sanitize::sanitize,
16 serialize::{deserialize, serialize},
17 traits::{EntityKind, Path},
18 validate::validate,
19};
20use std::marker::PhantomData;
21
22#[derive(Clone, Copy)]
27pub struct SaveExecutor<E: EntityKind> {
28 db: Db<E::Canister>,
29 debug: bool,
30 _marker: PhantomData<E>,
31}
32
33impl<E: EntityKind> SaveExecutor<E> {
34 #[must_use]
39 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
40 Self {
41 db,
42 debug,
43 _marker: PhantomData,
44 }
45 }
46
47 #[must_use]
48 pub const fn debug(mut self) -> Self {
49 self.debug = true;
50 self
51 }
52
53 fn debug_log(&self, s: impl Into<String>) {
54 if self.debug {
55 println!("{}", s.into());
56 }
57 }
58
59 pub fn insert(&self, entity: E) -> Result<E, InternalError> {
65 self.save_entity(SaveMode::Insert, entity)
66 }
67
68 pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
70 let entity = E::from_view(view);
71 Ok(self.insert(entity)?.to_view())
72 }
73
74 pub fn update(&self, entity: E) -> Result<E, InternalError> {
76 self.save_entity(SaveMode::Update, entity)
77 }
78
79 pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
81 let entity = E::from_view(view);
82
83 Ok(self.update(entity)?.to_view())
84 }
85
86 pub fn replace(&self, entity: E) -> Result<E, InternalError> {
88 self.save_entity(SaveMode::Replace, entity)
89 }
90
91 pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
93 let entity = E::from_view(view);
94
95 Ok(self.replace(entity)?.to_view())
96 }
97
98 pub fn insert_many(
103 &self,
104 entities: impl IntoIterator<Item = E>,
105 ) -> Result<Vec<E>, InternalError> {
106 let iter = entities.into_iter();
107 let mut out = Vec::with_capacity(iter.size_hint().0);
108
109 for entity in iter {
112 out.push(self.insert(entity)?);
113 }
114
115 Ok(out)
116 }
117
118 pub fn update_many(
119 &self,
120 entities: impl IntoIterator<Item = E>,
121 ) -> Result<Vec<E>, InternalError> {
122 let iter = entities.into_iter();
123 let mut out = Vec::with_capacity(iter.size_hint().0);
124
125 for entity in iter {
128 out.push(self.update(entity)?);
129 }
130
131 Ok(out)
132 }
133
134 pub fn replace_many(
135 &self,
136 entities: impl IntoIterator<Item = E>,
137 ) -> Result<Vec<E>, InternalError> {
138 let iter = entities.into_iter();
139 let mut out = Vec::with_capacity(iter.size_hint().0);
140
141 for entity in iter {
144 out.push(self.replace(entity)?);
145 }
146
147 Ok(out)
148 }
149
150 pub fn execute(&self, query: SaveQuery) -> Result<E, InternalError> {
159 let entity: E = deserialize(&query.bytes)?;
160 self.save_entity(query.mode, entity)
161 }
162
163 fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, InternalError> {
164 let mut span = Span::<E>::new(ExecKind::Save);
165 let ctx = self.db.context::<E>();
166 let _unit = WriteUnit::new("save_entity_stage2_atomic");
167
168 ensure_recovered(&self.db)?;
170
171 sanitize(&mut entity)?;
173 validate(&entity)?;
174
175 let key = entity.key();
176 let data_key = DataKey::new::<E>(key);
177 let raw_key = data_key.to_raw();
178
179 self.debug_log(format!(
180 "[debug] save {:?} on {} (key={})",
181 mode,
182 E::PATH,
183 data_key
184 ));
185 let old = match mode {
186 SaveMode::Insert => {
187 if ctx.with_store(|store| store.contains_key(&raw_key))? {
189 return Err(ExecutorError::KeyExists(data_key).into());
190 }
191 None
192 }
193 SaveMode::Update => {
194 let Some(old_row) = ctx.with_store(|store| store.get(&raw_key))? else {
195 return Err(InternalError::store_not_found(data_key.to_string()));
196 };
197 Some(old_row.try_decode::<E>().map_err(|err| {
198 ExecutorError::corruption(
199 ErrorOrigin::Serialize,
200 format!("failed to deserialize row: {data_key} ({err})"),
201 )
202 })?)
203 }
204 SaveMode::Replace => {
205 let old_row = ctx.with_store(|store| store.get(&raw_key))?;
206 old_row
207 .map(|row| {
208 row.try_decode::<E>().map_err(|err| {
209 ExecutorError::corruption(
210 ErrorOrigin::Serialize,
211 format!("failed to deserialize row: {data_key} ({err})"),
212 )
213 })
214 })
215 .transpose()?
216 }
217 };
218
219 let bytes = serialize(&entity)?;
220 let row = RawRow::try_new(bytes)?;
221
222 ctx.with_store(|_| ())?;
224
225 let index_plan =
229 plan_index_mutation_for_entity::<E>(&self.db, old.as_ref(), Some(&entity))?;
230 let data_op = CommitDataOp {
231 store: E::Store::PATH.to_string(),
232 key: raw_key.as_bytes().to_vec(),
233 value: Some(row.as_bytes().to_vec()),
234 };
235 let marker = CommitMarker::new(CommitKind::Save, index_plan.commit_ops, vec![data_op])?;
236 let commit = begin_commit(marker)?;
237
238 finish_commit(
240 commit,
241 || Self::apply_indexes(&index_plan.apply, old.as_ref(), &entity),
242 || {
243 ctx.with_store_mut(|store| store.insert(raw_key, row))
244 .expect("data store missing after preflight");
245 span.set_rows(1);
246 },
247 );
248
249 Ok(entity)
250 }
251
252 fn apply_indexes(plans: &[IndexApplyPlan], old: Option<&E>, new: &E) {
258 for plan in plans {
260 let mut removed = false;
261 let mut inserted = false;
262
263 plan.store.with_borrow_mut(|s| {
264 if let Some(old) = old {
265 let outcome = s
266 .remove_index_entry(old, plan.index)
267 .expect("index remove failed after prevalidation");
268 if outcome == IndexRemoveOutcome::Removed {
269 removed = true;
270 }
271 }
272
273 let outcome = s
274 .insert_index_entry(new, plan.index)
275 .expect("index insert failed after prevalidation");
276 if outcome == IndexInsertOutcome::Inserted {
277 inserted = true;
278 }
279 });
280
281 if removed {
282 sink::record(MetricsEvent::IndexRemove {
283 entity_path: E::PATH,
284 });
285 }
286
287 if inserted {
288 sink::record(MetricsEvent::IndexInsert {
289 entity_path: E::PATH,
290 });
291 }
292 }
293 }
294}