1use crate::{
2 db::{
3 CommitDataOp, CommitIndexOp, CommitKind, CommitMarker, Db, begin_commit, ensure_recovered,
4 executor::{
5 ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
6 plan::{
7 plan_for, record_plan_metrics, scan_missing_ok, scan_strict, set_rows_from_len,
8 },
9 resolve_unique_pk,
10 },
11 finish_commit,
12 index::{
13 IndexEntry, IndexEntryCorruption, IndexKey, IndexRemoveOutcome, IndexStore,
14 RawIndexEntry, RawIndexKey,
15 },
16 primitives::FilterExpr,
17 query::{DeleteQuery, QueryPlan, QueryValidate},
18 response::Response,
19 store::DataKey,
20 traits::FromKey,
21 },
22 error::{ErrorOrigin, InternalError},
23 obs::sink::{self, ExecKind, MetricsEvent, Span},
24 prelude::*,
25 sanitize::sanitize,
26 traits::{EntityKind, FieldValue, Path},
27};
28use canic_cdk::structures::Storable;
29use std::{
30 cell::RefCell, collections::BTreeMap, marker::PhantomData, ops::ControlFlow, thread::LocalKey,
31};
32
33struct DeleteAccumulator<'f, E> {
39 filter: Option<&'f FilterExpr>,
40 offset: usize,
41 skipped: usize,
42 limit: Option<usize>,
43 matches: Vec<(DataKey, E)>,
44}
45
46impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
47 fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
48 Self {
49 filter,
50 offset,
51 skipped: 0,
52 limit,
53 matches: Vec::with_capacity(limit.unwrap_or(0)),
54 }
55 }
56
57 fn limit_reached(&self) -> bool {
58 self.limit.is_some_and(|lim| self.matches.len() >= lim)
59 }
60
61 fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
62 if let Some(f) = self.filter
63 && !FilterEvaluator::new(&entity).eval(f)
64 {
65 return false;
66 }
67
68 if self.skipped < self.offset {
69 self.skipped += 1;
70 return false;
71 }
72
73 if self.limit_reached() {
74 return true;
75 }
76
77 self.matches.push((dk, entity));
78 false
79 }
80}
81
82struct IndexPlan {
88 index: &'static IndexModel,
89 store: &'static LocalKey<RefCell<IndexStore>>,
90}
91
92#[derive(Clone, Copy)]
101pub struct DeleteExecutor<E: EntityKind> {
102 db: Db<E::Canister>,
103 debug: bool,
104 _marker: PhantomData<E>,
105}
106
107impl<E: EntityKind> DeleteExecutor<E> {
108 #[must_use]
109 pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
110 Self {
111 db,
112 debug,
113 _marker: PhantomData,
114 }
115 }
116
117 #[must_use]
118 pub const fn debug(mut self) -> Self {
119 self.debug = true;
120 self
121 }
122
123 pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, InternalError> {
128 self.execute(DeleteQuery::new().one::<E>(pk))
129 }
130
131 pub fn only(self) -> Result<Response<E>, InternalError> {
132 self.execute(DeleteQuery::new().one::<E>(()))
133 }
134
135 pub fn many<I, V>(self, values: I) -> Result<Response<E>, InternalError>
136 where
137 I: IntoIterator<Item = V>,
138 V: FieldValue,
139 {
140 self.execute(DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values))
141 }
142
143 pub fn by_unique_index(
148 self,
149 index: UniqueIndexHandle,
150 entity: E,
151 ) -> Result<Response<E>, InternalError>
152 where
153 E::PrimaryKey: FromKey,
154 {
155 let mut span = Span::<E>::new(ExecKind::Delete);
156 ensure_recovered(&self.db)?;
157
158 let index = index.index();
159 let mut lookup = entity;
160 sanitize(&mut lookup)?;
161
162 let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
164 set_rows_from_len(&mut span, 0);
165 return Ok(Response(Vec::new()));
166 };
167
168 let (dk, stored) = self.load_existing(pk)?;
169 let ctx = self.db.context::<E>();
170 let index_plans = self.build_index_plans()?;
171 let index_ops = Self::build_index_removal_ops(&index_plans, &[&stored])?;
172
173 ctx.with_store(|_| ())?;
175
176 let marker = CommitMarker::new(
177 CommitKind::Delete,
178 index_ops,
179 vec![CommitDataOp {
180 store: E::Store::PATH.to_string(),
181 key: dk.to_raw().as_bytes().to_vec(),
182 value: None,
183 }],
184 )?;
185 let commit = begin_commit(marker)?;
186
187 finish_commit(
188 commit,
189 || {
190 let _unit = WriteUnit::new("delete_unique_row_stage1_atomic");
191 for plan in &index_plans {
192 let outcome = plan
193 .store
194 .with_borrow_mut(|s| s.remove_index_entry(&stored, plan.index))
195 .expect("index remove failed after prevalidation");
196 if outcome == IndexRemoveOutcome::Removed {
197 sink::record(MetricsEvent::IndexRemove {
198 entity_path: E::PATH,
199 });
200 }
201 }
202 },
203 || {
204 ctx.with_store_mut(|s| s.remove(&dk.to_raw()))
205 .expect("data store missing after preflight");
206 },
207 );
208
209 set_rows_from_len(&mut span, 1);
210 Ok(Response(vec![(dk.key(), stored)]))
211 }
212
213 pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, InternalError> {
218 QueryValidate::<E>::validate(&query)?;
219 Ok(plan_for::<E>(query.filter.as_ref()))
220 }
221
222 pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, InternalError> {
223 QueryValidate::<E>::validate(&query)?;
225 ensure_recovered(&self.db)?;
226
227 let mut span = Span::<E>::new(ExecKind::Delete);
228
229 let plan = plan_for::<E>(query.filter.as_ref());
231 record_plan_metrics(&plan);
232
233 let (limit, offset) = match query.limit.as_ref() {
235 Some(l) => (l.limit.map(|v| v as usize), l.offset as usize),
236 None => (None, 0),
237 };
238
239 let filter = query.filter.as_ref().map(|f| f.clone().simplify());
241 let mut acc = DeleteAccumulator::new(filter.as_ref(), offset, limit);
242 let mut scanned = 0u64;
243
244 match plan {
246 QueryPlan::Keys(keys) => {
247 scan_missing_ok::<E, _>(&self.db, QueryPlan::Keys(keys), |dk, entity| {
249 scanned += 1;
250 if acc.should_stop(dk, entity) {
251 ControlFlow::Break(())
252 } else {
253 ControlFlow::Continue(())
254 }
255 })?;
256 }
257 plan => {
258 scan_strict::<E, _>(&self.db, plan, |dk, entity| {
260 scanned += 1;
261 if acc.should_stop(dk, entity) {
262 ControlFlow::Break(())
263 } else {
264 ControlFlow::Continue(())
265 }
266 })?;
267 }
268 }
269
270 sink::record(MetricsEvent::RowsScanned {
272 entity_path: E::PATH,
273 rows_scanned: scanned,
274 });
275
276 if acc.matches.is_empty() {
278 set_rows_from_len(&mut span, 0);
279 return Ok(Response(Vec::new()));
280 }
281
282 let index_plans = self.build_index_plans()?;
284 let entities: Vec<&E> = acc.matches.iter().map(|(_, e)| e).collect();
285 let index_ops = Self::build_index_removal_ops(&index_plans, &entities)?;
286
287 let ctx = self.db.context::<E>();
289 ctx.with_store(|_| ())?;
290
291 let data_ops = acc
292 .matches
293 .iter()
294 .map(|(dk, _)| CommitDataOp {
295 store: E::Store::PATH.to_string(),
296 key: dk.to_raw().as_bytes().to_vec(),
297 value: None,
298 })
299 .collect();
300
301 let marker = CommitMarker::new(CommitKind::Delete, index_ops, data_ops)?;
302 let commit = begin_commit(marker)?;
303
304 finish_commit(
306 commit,
307 || {
308 for (_, entity) in &acc.matches {
309 let _unit = WriteUnit::new("delete_row_stage1_atomic");
310 for plan in &index_plans {
311 let outcome = plan
312 .store
313 .with_borrow_mut(|s| s.remove_index_entry(entity, plan.index))
314 .expect("index remove failed after prevalidation");
315 if outcome == IndexRemoveOutcome::Removed {
316 sink::record(MetricsEvent::IndexRemove {
317 entity_path: E::PATH,
318 });
319 }
320 }
321 }
322 },
323 || {
324 ctx.with_store_mut(|s| {
325 for (dk, _) in &acc.matches {
326 s.remove(&dk.to_raw());
327 }
328 })
329 .expect("data store missing after preflight");
330 },
331 );
332
333 let res = acc
335 .matches
336 .into_iter()
337 .map(|(dk, e)| (dk.key(), e))
338 .collect::<Vec<_>>();
339 set_rows_from_len(&mut span, res.len());
340
341 Ok(Response(res))
342 }
343
344 fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, E), InternalError> {
349 let dk = DataKey::new::<E>(pk.into());
350 let row = self.db.context::<E>().read_strict(&dk)?;
351 let entity = row.try_decode::<E>().map_err(|err| {
352 ExecutorError::corruption(
353 ErrorOrigin::Serialize,
354 format!("failed to deserialize row: {dk} ({err})"),
355 )
356 })?;
357 Ok((dk, entity))
358 }
359
360 fn build_index_plans(&self) -> Result<Vec<IndexPlan>, InternalError> {
361 E::INDEXES
362 .iter()
363 .map(|index| {
364 let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
365 Ok(IndexPlan { index, store })
366 })
367 .collect()
368 }
369
370 fn build_index_removal_ops(
371 plans: &[IndexPlan],
372 entities: &[&E],
373 ) -> Result<Vec<CommitIndexOp>, InternalError> {
374 let mut ops = Vec::new();
375
376 for plan in plans {
378 let fields = plan.index.fields.join(", ");
379
380 let mut entries: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
382
383 for entity in entities {
385 let Some(key) = IndexKey::new(*entity, plan.index) else {
386 continue;
387 };
388 let raw_key = key.to_raw();
389
390 let entry = match entries.entry(raw_key) {
392 std::collections::btree_map::Entry::Vacant(slot) => {
393 let decoded = plan
394 .store
395 .with_borrow(|s| s.get(&raw_key))
396 .map(|raw| {
397 raw.try_decode().map_err(|err| {
398 ExecutorError::corruption(
399 ErrorOrigin::Index,
400 format!(
401 "index corrupted: {} ({}) -> {}",
402 E::PATH,
403 fields,
404 err
405 ),
406 )
407 })
408 })
409 .transpose()?;
410 slot.insert(decoded)
411 }
412 std::collections::btree_map::Entry::Occupied(slot) => slot.into_mut(),
413 };
414
415 if let Some(e) = entry.as_mut() {
417 e.remove_key(&entity.key());
418 if e.is_empty() {
419 *entry = None;
420 }
421 }
422 }
423
424 for (raw_key, entry) in entries {
426 let value = if let Some(entry) = entry {
427 let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| {
428 ExecutorError::corruption(
429 ErrorOrigin::Index,
430 format!(
431 "index corrupted: {} ({}) -> {}",
432 E::PATH,
433 fields,
434 IndexEntryCorruption::TooManyKeys { count: err.keys() }
435 ),
436 )
437 })?;
438 Some(raw.into_bytes())
439 } else {
440 None
442 };
443
444 ops.push(CommitIndexOp {
445 store: plan.index.store.to_string(),
446 key: raw_key.as_bytes().to_vec(),
447 value,
448 });
449 }
450 }
451
452 Ok(ops)
453 }
454}