1use crate::{
2 db::{
3 CommitDataOp, CommitKind, CommitMarker, Db, begin_commit, ensure_recovered,
4 executor::{ExecutorError, WriteUnit},
5 finish_commit,
6 index::{
7 IndexInsertOutcome, IndexRemoveOutcome,
8 plan::{IndexApplyPlan, plan_index_mutation_for_entity},
9 },
10 query::{SaveMode, SaveQuery},
11 store::{DataKey, RawRow},
12 },
13 error::{ErrorOrigin, InternalError},
14 obs::sink::{self, ExecKind, MetricsEvent, Span},
15 sanitize::sanitize,
16 serialize::{deserialize, serialize},
17 traits::{EntityKind, Path},
18 validate::validate,
19};
20use std::marker::PhantomData;
21
22#[derive(Clone, Copy)]
27pub struct SaveExecutor<E: EntityKind> {
28 db: Db<E::Canister>,
29 debug: bool,
30 _marker: PhantomData<E>,
31}
32
33impl<E: EntityKind> SaveExecutor<E> {
34 #[must_use]
39 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
40 Self {
41 db,
42 debug,
43 _marker: PhantomData,
44 }
45 }
46
47 #[must_use]
48 pub const fn debug(mut self) -> Self {
49 self.debug = true;
50 self
51 }
52
53 pub fn insert(&self, entity: E) -> Result<E, InternalError> {
59 self.save_entity(SaveMode::Insert, entity)
60 }
61
62 pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
64 let entity = E::from_view(view);
65 Ok(self.insert(entity)?.to_view())
66 }
67
68 pub fn update(&self, entity: E) -> Result<E, InternalError> {
70 self.save_entity(SaveMode::Update, entity)
71 }
72
73 pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
75 let entity = E::from_view(view);
76
77 Ok(self.update(entity)?.to_view())
78 }
79
80 pub fn replace(&self, entity: E) -> Result<E, InternalError> {
82 self.save_entity(SaveMode::Replace, entity)
83 }
84
85 pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
87 let entity = E::from_view(view);
88
89 Ok(self.replace(entity)?.to_view())
90 }
91
92 pub fn insert_many(
97 &self,
98 entities: impl IntoIterator<Item = E>,
99 ) -> Result<Vec<E>, InternalError> {
100 let iter = entities.into_iter();
101 let mut out = Vec::with_capacity(iter.size_hint().0);
102
103 for entity in iter {
106 out.push(self.insert(entity)?);
107 }
108
109 Ok(out)
110 }
111
112 pub fn update_many(
113 &self,
114 entities: impl IntoIterator<Item = E>,
115 ) -> Result<Vec<E>, InternalError> {
116 let iter = entities.into_iter();
117 let mut out = Vec::with_capacity(iter.size_hint().0);
118
119 for entity in iter {
122 out.push(self.update(entity)?);
123 }
124
125 Ok(out)
126 }
127
128 pub fn replace_many(
129 &self,
130 entities: impl IntoIterator<Item = E>,
131 ) -> Result<Vec<E>, InternalError> {
132 let iter = entities.into_iter();
133 let mut out = Vec::with_capacity(iter.size_hint().0);
134
135 for entity in iter {
138 out.push(self.replace(entity)?);
139 }
140
141 Ok(out)
142 }
143
144 pub fn execute(&self, query: SaveQuery) -> Result<E, InternalError> {
153 let entity: E = deserialize(&query.bytes)?;
154 self.save_entity(query.mode, entity)
155 }
156
157 fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, InternalError> {
158 let mut span = Span::<E>::new(ExecKind::Save);
159 let ctx = self.db.context::<E>();
160 let _unit = WriteUnit::new("save_entity_stage2_atomic");
161
162 ensure_recovered(&self.db)?;
164
165 sanitize(&mut entity)?;
167 validate(&entity)?;
168
169 let key = entity.key();
170 let data_key = DataKey::new::<E>(key);
171 let raw_key = data_key.to_raw();
172 let old_result = ctx.with_store(|store| store.get(&raw_key))?;
173
174 let old = match (mode, old_result) {
175 (SaveMode::Insert | SaveMode::Replace, None) => None,
176 (SaveMode::Update | SaveMode::Replace, Some(old_row)) => {
177 Some(old_row.try_decode::<E>().map_err(|err| {
178 ExecutorError::corruption(
179 ErrorOrigin::Serialize,
180 format!("failed to deserialize row: {data_key} ({err})"),
181 )
182 })?)
183 }
184 (SaveMode::Insert, Some(_)) => return Err(ExecutorError::KeyExists(data_key).into()),
185 (SaveMode::Update, None) => return Err(ExecutorError::KeyNotFound(data_key).into()),
186 };
187
188 let bytes = serialize(&entity)?;
189 let row = RawRow::try_new(bytes)?;
190
191 ctx.with_store(|_| ())?;
193
194 let index_plan =
198 plan_index_mutation_for_entity::<E>(&self.db, old.as_ref(), Some(&entity))?;
199 let data_op = CommitDataOp {
200 store: E::Store::PATH.to_string(),
201 key: raw_key.as_bytes().to_vec(),
202 value: Some(row.as_bytes().to_vec()),
203 };
204 let marker = CommitMarker::new(CommitKind::Save, index_plan.commit_ops, vec![data_op])?;
205 let commit = begin_commit(marker)?;
206
207 finish_commit(
209 commit,
210 || Self::apply_indexes(&index_plan.apply, old.as_ref(), &entity),
211 || {
212 ctx.with_store_mut(|store| store.insert(raw_key, row))
213 .expect("data store missing after preflight");
214 span.set_rows(1);
215 },
216 );
217
218 Ok(entity)
219 }
220
221 fn apply_indexes(plans: &[IndexApplyPlan], old: Option<&E>, new: &E) {
227 for plan in plans {
229 let mut removed = false;
230 let mut inserted = false;
231
232 plan.store.with_borrow_mut(|s| {
233 if let Some(old) = old {
234 let outcome = s
235 .remove_index_entry(old, plan.index)
236 .expect("index remove failed after prevalidation");
237 if outcome == IndexRemoveOutcome::Removed {
238 removed = true;
239 }
240 }
241
242 let outcome = s
243 .insert_index_entry(new, plan.index)
244 .expect("index insert failed after prevalidation");
245 if outcome == IndexInsertOutcome::Inserted {
246 inserted = true;
247 }
248 });
249
250 if removed {
251 sink::record(MetricsEvent::IndexRemove {
252 entity_path: E::PATH,
253 });
254 }
255
256 if inserted {
257 sink::record(MetricsEvent::IndexInsert {
258 entity_path: E::PATH,
259 });
260 }
261 }
262 }
263}