1use crate::{
2 db::{
3 CommitDataOp, CommitKind, CommitMarker, Db, begin_commit, ensure_recovered,
4 executor::{ExecutorError, WriteUnit},
5 finish_commit,
6 index::{
7 IndexInsertOutcome, IndexRemoveOutcome,
8 plan::{IndexApplyPlan, plan_index_mutation_for_entity},
9 },
10 query::{SaveMode, SaveQuery},
11 store::{DataKey, RawRow},
12 },
13 error::{ErrorOrigin, InternalError},
14 obs::sink::{self, ExecKind, MetricsEvent, Span},
15 sanitize::sanitize,
16 serialize::{deserialize, serialize},
17 traits::{EntityKind, Path},
18 validate::validate,
19};
20use std::marker::PhantomData;
21
22#[derive(Clone, Copy)]
27pub struct SaveExecutor<E: EntityKind> {
28 db: Db<E::Canister>,
29 debug: bool,
30 _marker: PhantomData<E>,
31}
32
33impl<E: EntityKind> SaveExecutor<E> {
34 #[must_use]
39 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
40 Self {
41 db,
42 debug,
43 _marker: PhantomData,
44 }
45 }
46
47 #[must_use]
48 pub const fn debug(mut self) -> Self {
49 self.debug = true;
50 self
51 }
52
53 pub fn insert(&self, entity: E) -> Result<E, InternalError> {
59 self.save_entity(SaveMode::Insert, entity)
60 }
61
62 pub fn insert_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
64 let entity = E::from_view(view);
65 Ok(self.insert(entity)?.to_view())
66 }
67
68 pub fn update(&self, entity: E) -> Result<E, InternalError> {
70 self.save_entity(SaveMode::Update, entity)
71 }
72
73 pub fn update_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
75 let entity = E::from_view(view);
76
77 Ok(self.update(entity)?.to_view())
78 }
79
80 pub fn replace(&self, entity: E) -> Result<E, InternalError> {
82 self.save_entity(SaveMode::Replace, entity)
83 }
84
85 pub fn replace_view(&self, view: E::ViewType) -> Result<E::ViewType, InternalError> {
87 let entity = E::from_view(view);
88
89 Ok(self.replace(entity)?.to_view())
90 }
91
92 pub fn insert_many(
97 &self,
98 entities: impl IntoIterator<Item = E>,
99 ) -> Result<Vec<E>, InternalError> {
100 let iter = entities.into_iter();
101 let mut out = Vec::with_capacity(iter.size_hint().0);
102
103 for entity in iter {
106 out.push(self.insert(entity)?);
107 }
108
109 Ok(out)
110 }
111
112 pub fn update_many(
113 &self,
114 entities: impl IntoIterator<Item = E>,
115 ) -> Result<Vec<E>, InternalError> {
116 let iter = entities.into_iter();
117 let mut out = Vec::with_capacity(iter.size_hint().0);
118
119 for entity in iter {
122 out.push(self.update(entity)?);
123 }
124
125 Ok(out)
126 }
127
128 pub fn replace_many(
129 &self,
130 entities: impl IntoIterator<Item = E>,
131 ) -> Result<Vec<E>, InternalError> {
132 let iter = entities.into_iter();
133 let mut out = Vec::with_capacity(iter.size_hint().0);
134
135 for entity in iter {
138 out.push(self.replace(entity)?);
139 }
140
141 Ok(out)
142 }
143
144 pub fn execute(&self, query: SaveQuery) -> Result<E, InternalError> {
153 let entity: E = deserialize(&query.bytes)?;
154 self.save_entity(query.mode, entity)
155 }
156
157 fn save_entity(&self, mode: SaveMode, mut entity: E) -> Result<E, InternalError> {
158 let mut span = Span::<E>::new(ExecKind::Save);
159 let ctx = self.db.context::<E>();
160 let _unit = WriteUnit::new("save_entity_stage2_atomic");
161
162 ensure_recovered(&self.db)?;
164
165 sanitize(&mut entity)?;
167 validate(&entity)?;
168
169 let key = entity.key();
170 let data_key = DataKey::new::<E>(key);
171 let raw_key = data_key.to_raw();
172 let old_result = ctx.with_store(|store| store.get(&raw_key))?;
173
174 let old = match (mode, old_result) {
175 (SaveMode::Insert | SaveMode::Replace, None) => None,
176 (SaveMode::Update | SaveMode::Replace, Some(old_row)) => {
177 Some(old_row.try_decode::<E>().map_err(|err| {
178 ExecutorError::corruption(
179 ErrorOrigin::Serialize,
180 format!("failed to deserialize row: {data_key} ({err})"),
181 )
182 })?)
183 }
184 (SaveMode::Insert, Some(_)) => return Err(ExecutorError::KeyExists(data_key).into()),
185 (SaveMode::Update, None) => {
186 return Err(InternalError::store_not_found(data_key.to_string()));
187 }
188 };
189
190 let bytes = serialize(&entity)?;
191 let row = RawRow::try_new(bytes)?;
192
193 ctx.with_store(|_| ())?;
195
196 let index_plan =
200 plan_index_mutation_for_entity::<E>(&self.db, old.as_ref(), Some(&entity))?;
201 let data_op = CommitDataOp {
202 store: E::Store::PATH.to_string(),
203 key: raw_key.as_bytes().to_vec(),
204 value: Some(row.as_bytes().to_vec()),
205 };
206 let marker = CommitMarker::new(CommitKind::Save, index_plan.commit_ops, vec![data_op])?;
207 let commit = begin_commit(marker)?;
208
209 finish_commit(
211 commit,
212 || Self::apply_indexes(&index_plan.apply, old.as_ref(), &entity),
213 || {
214 ctx.with_store_mut(|store| store.insert(raw_key, row))
215 .expect("data store missing after preflight");
216 span.set_rows(1);
217 },
218 );
219
220 Ok(entity)
221 }
222
223 fn apply_indexes(plans: &[IndexApplyPlan], old: Option<&E>, new: &E) {
229 for plan in plans {
231 let mut removed = false;
232 let mut inserted = false;
233
234 plan.store.with_borrow_mut(|s| {
235 if let Some(old) = old {
236 let outcome = s
237 .remove_index_entry(old, plan.index)
238 .expect("index remove failed after prevalidation");
239 if outcome == IndexRemoveOutcome::Removed {
240 removed = true;
241 }
242 }
243
244 let outcome = s
245 .insert_index_entry(new, plan.index)
246 .expect("index insert failed after prevalidation");
247 if outcome == IndexInsertOutcome::Inserted {
248 inserted = true;
249 }
250 });
251
252 if removed {
253 sink::record(MetricsEvent::IndexRemove {
254 entity_path: E::PATH,
255 });
256 }
257
258 if inserted {
259 sink::record(MetricsEvent::IndexInsert {
260 entity_path: E::PATH,
261 });
262 }
263 }
264 }
265}