icydb_core/db/executor/
delete.rs

1use crate::{
2    db::{
3        CommitDataOp, CommitIndexOp, CommitKind, CommitMarker, Db, begin_commit, ensure_recovered,
4        executor::{
5            ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
6            plan::{
7                plan_for, record_plan_metrics, scan_missing_ok, scan_strict, set_rows_from_len,
8            },
9            resolve_unique_pk,
10        },
11        finish_commit,
12        index::{
13            IndexEntry, IndexEntryCorruption, IndexKey, IndexRemoveOutcome, IndexStore,
14            RawIndexEntry, RawIndexKey,
15        },
16        primitives::FilterExpr,
17        query::{DeleteQuery, QueryPlan, QueryValidate},
18        response::Response,
19        store::DataKey,
20        traits::FromKey,
21    },
22    error::{ErrorOrigin, InternalError},
23    obs::sink::{self, ExecKind, MetricsEvent, Span},
24    prelude::*,
25    sanitize::sanitize,
26    traits::{EntityKind, FieldValue, Path},
27};
28use canic_cdk::structures::Storable;
29use std::{
30    cell::RefCell, collections::BTreeMap, marker::PhantomData, ops::ControlFlow, thread::LocalKey,
31};
32
33///
34/// DeleteAccumulator
35/// Accumulates delete candidates while respecting filter/offset/limit semantics.
36///
37
38struct DeleteAccumulator<'f, E> {
39    filter: Option<&'f FilterExpr>,
40    offset: usize,
41    skipped: usize,
42    limit: Option<usize>,
43    matches: Vec<(DataKey, E)>,
44}
45
46impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
47    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
48        Self {
49            filter,
50            offset,
51            skipped: 0,
52            limit,
53            matches: Vec::with_capacity(limit.unwrap_or(0)),
54        }
55    }
56
57    fn limit_reached(&self) -> bool {
58        self.limit.is_some_and(|lim| self.matches.len() >= lim)
59    }
60
61    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
62        if let Some(f) = self.filter
63            && !FilterEvaluator::new(&entity).eval(f)
64        {
65            return false;
66        }
67
68        if self.skipped < self.offset {
69            self.skipped += 1;
70            return false;
71        }
72
73        if self.limit_reached() {
74            return true;
75        }
76
77        self.matches.push((dk, entity));
78        false
79    }
80}
81
82///
83/// IndexPlan
84/// Prevalidated handle to an index store used during commit planning.
85///
86
87struct IndexPlan {
88    index: &'static IndexModel,
89    store: &'static LocalKey<RefCell<IndexStore>>,
90}
91
92///
93/// DeleteExecutor
94///
95/// Stage-1 atomicity invariant:
96/// All fallible validation completes before the first stable write.
97/// After mutation begins, only infallible operations or traps remain.
98/// IC rollback semantics guarantee atomicity within this update call.
99///
100#[derive(Clone, Copy)]
101pub struct DeleteExecutor<E: EntityKind> {
102    db: Db<E::Canister>,
103    debug: bool,
104    _marker: PhantomData<E>,
105}
106
107impl<E: EntityKind> DeleteExecutor<E> {
108    #[must_use]
109    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
110        Self {
111            db,
112            debug,
113            _marker: PhantomData,
114        }
115    }
116
117    #[must_use]
118    pub const fn debug(mut self) -> Self {
119        self.debug = true;
120        self
121    }
122
123    fn debug_log(&self, s: impl Into<String>) {
124        if self.debug {
125            println!("{}", s.into());
126        }
127    }
128
129    // ─────────────────────────────────────────────
130    // PK helpers
131    // ─────────────────────────────────────────────
132
133    pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, InternalError> {
134        self.execute(DeleteQuery::new().one::<E>(pk))
135    }
136
137    pub fn only(self) -> Result<Response<E>, InternalError> {
138        self.execute(DeleteQuery::new().one::<E>(()))
139    }
140
141    pub fn many<I, V>(self, values: I) -> Result<Response<E>, InternalError>
142    where
143        I: IntoIterator<Item = V>,
144        V: FieldValue,
145    {
146        self.execute(DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values))
147    }
148
149    // ─────────────────────────────────────────────
150    // Unique-index delete
151    // ─────────────────────────────────────────────
152
153    pub fn by_unique_index(
154        self,
155        index: UniqueIndexHandle,
156        entity: E,
157    ) -> Result<Response<E>, InternalError>
158    where
159        E::PrimaryKey: FromKey,
160    {
161        self.debug_log(format!(
162            "[debug] delete by unique index on {} ({})",
163            E::PATH,
164            index.index().fields.join(", ")
165        ));
166        let mut span = Span::<E>::new(ExecKind::Delete);
167        ensure_recovered(&self.db)?;
168
169        let index = index.index();
170        let mut lookup = entity;
171        sanitize(&mut lookup)?;
172
173        // Resolve PK via unique index; absence is a no-op.
174        let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
175            set_rows_from_len(&mut span, 0);
176            return Ok(Response(Vec::new()));
177        };
178
179        let (dk, stored) = self.load_existing(pk)?;
180        let ctx = self.db.context::<E>();
181        let index_plans = self.build_index_plans()?;
182        let index_ops = Self::build_index_removal_ops(&index_plans, &[&stored])?;
183
184        // Preflight: ensure stores are accessible before committing.
185        ctx.with_store(|_| ())?;
186
187        let marker = CommitMarker::new(
188            CommitKind::Delete,
189            index_ops,
190            vec![CommitDataOp {
191                store: E::Store::PATH.to_string(),
192                key: dk.to_raw().as_bytes().to_vec(),
193                value: None,
194            }],
195        )?;
196        let commit = begin_commit(marker)?;
197
198        finish_commit(
199            commit,
200            || {
201                let _unit = WriteUnit::new("delete_unique_row_stage1_atomic");
202                for plan in &index_plans {
203                    let outcome = plan
204                        .store
205                        .with_borrow_mut(|s| s.remove_index_entry(&stored, plan.index))
206                        .expect("index remove failed after prevalidation");
207                    if outcome == IndexRemoveOutcome::Removed {
208                        sink::record(MetricsEvent::IndexRemove {
209                            entity_path: E::PATH,
210                        });
211                    }
212                }
213            },
214            || {
215                ctx.with_store_mut(|s| s.remove(&dk.to_raw()))
216                    .expect("data store missing after preflight");
217            },
218        );
219
220        set_rows_from_len(&mut span, 1);
221        Ok(Response(vec![(dk.key(), stored)]))
222    }
223
224    // ─────────────────────────────────────────────
225    // Planner-based delete
226    // ─────────────────────────────────────────────
227
228    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, InternalError> {
229        QueryValidate::<E>::validate(&query)?;
230        Ok(plan_for::<E>(query.filter.as_ref()))
231    }
232
233    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, InternalError> {
234        // Validate query shape and ensure no recovery is in progress.
235        QueryValidate::<E>::validate(&query)?;
236        ensure_recovered(&self.db)?;
237
238        self.debug_log(format!("[debug] delete query {:?} on {}", query, E::PATH));
239
240        let mut span = Span::<E>::new(ExecKind::Delete);
241
242        // Plan the delete using the same planner as loads (side-effect free).
243        let plan = plan_for::<E>(query.filter.as_ref());
244        record_plan_metrics(&plan);
245
246        self.debug_log(format!("[debug] delete plan: {plan:?}"));
247
248        // Extract pagination controls for scan-time filtering.
249        let (limit, offset) = match query.limit.as_ref() {
250            Some(l) => (l.limit.map(|v| v as usize), l.offset as usize),
251            None => (None, 0),
252        };
253
254        // Prepare accumulator that enforces filter/offset/limit semantics.
255        let filter = query.filter.as_ref().map(|f| f.clone().simplify());
256        let mut acc = DeleteAccumulator::new(filter.as_ref(), offset, limit);
257        let mut scanned = 0u64;
258
259        // Scan phase: identify rows to delete without mutating storage.
260        match plan {
261            QueryPlan::Keys(keys) => {
262                // Key deletes are idempotent: missing rows are skipped.
263                scan_missing_ok::<E, _>(&self.db, QueryPlan::Keys(keys), |dk, entity| {
264                    scanned += 1;
265                    if acc.should_stop(dk, entity) {
266                        ControlFlow::Break(())
267                    } else {
268                        ControlFlow::Continue(())
269                    }
270                })?;
271            }
272            plan => {
273                // Non-key plans are strict: missing or corrupt rows are errors.
274                scan_strict::<E, _>(&self.db, plan, |dk, entity| {
275                    scanned += 1;
276                    if acc.should_stop(dk, entity) {
277                        ControlFlow::Break(())
278                    } else {
279                        ControlFlow::Continue(())
280                    }
281                })?;
282            }
283        }
284
285        // Record how many rows were examined during planning.
286        sink::record(MetricsEvent::RowsScanned {
287            entity_path: E::PATH,
288            rows_scanned: scanned,
289        });
290
291        // Fast exit when no rows matched after filtering and pagination.
292        if acc.matches.is_empty() {
293            set_rows_from_len(&mut span, 0);
294            return Ok(Response(Vec::new()));
295        }
296
297        // Precompute index and data mutations before beginning the commit.
298        let index_plans = self.build_index_plans()?;
299        let entities: Vec<&E> = acc.matches.iter().map(|(_, e)| e).collect();
300        let index_ops = Self::build_index_removal_ops(&index_plans, &entities)?;
301
302        // Preflight store access to ensure no fallible work remains post-commit.
303        let ctx = self.db.context::<E>();
304        ctx.with_store(|_| ())?;
305
306        let data_ops = acc
307            .matches
308            .iter()
309            .map(|(dk, _)| CommitDataOp {
310                store: E::Store::PATH.to_string(),
311                key: dk.to_raw().as_bytes().to_vec(),
312                value: None,
313            })
314            .collect();
315
316        let marker = CommitMarker::new(CommitKind::Delete, index_ops, data_ops)?;
317        let commit = begin_commit(marker)?;
318
319        // Commit phase: remove index entries first, then delete data rows.
320        finish_commit(
321            commit,
322            || {
323                for (_, entity) in &acc.matches {
324                    let _unit = WriteUnit::new("delete_row_stage1_atomic");
325                    for plan in &index_plans {
326                        let outcome = plan
327                            .store
328                            .with_borrow_mut(|s| s.remove_index_entry(entity, plan.index))
329                            .expect("index remove failed after prevalidation");
330                        if outcome == IndexRemoveOutcome::Removed {
331                            sink::record(MetricsEvent::IndexRemove {
332                                entity_path: E::PATH,
333                            });
334                        }
335                    }
336                }
337            },
338            || {
339                ctx.with_store_mut(|s| {
340                    for (dk, _) in &acc.matches {
341                        s.remove(&dk.to_raw());
342                    }
343                })
344                .expect("data store missing after preflight");
345            },
346        );
347
348        // Return deleted entities to the caller.
349        let res = acc
350            .matches
351            .into_iter()
352            .map(|(dk, e)| (dk.key(), e))
353            .collect::<Vec<_>>();
354        set_rows_from_len(&mut span, res.len());
355
356        Ok(Response(res))
357    }
358
359    // ─────────────────────────────────────────────
360    // Helpers
361    // ─────────────────────────────────────────────
362
363    fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, E), InternalError> {
364        let dk = DataKey::new::<E>(pk.into());
365        let row = self.db.context::<E>().read_strict(&dk)?;
366        let entity = row.try_decode::<E>().map_err(|err| {
367            ExecutorError::corruption(
368                ErrorOrigin::Serialize,
369                format!("failed to deserialize row: {dk} ({err})"),
370            )
371        })?;
372        Ok((dk, entity))
373    }
374
375    fn build_index_plans(&self) -> Result<Vec<IndexPlan>, InternalError> {
376        E::INDEXES
377            .iter()
378            .map(|index| {
379                let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
380                Ok(IndexPlan { index, store })
381            })
382            .collect()
383    }
384
385    fn build_index_removal_ops(
386        plans: &[IndexPlan],
387        entities: &[&E],
388    ) -> Result<Vec<CommitIndexOp>, InternalError> {
389        let mut ops = Vec::new();
390
391        // Process each index independently to compute its resulting mutations.
392        for plan in plans {
393            let fields = plan.index.fields.join(", ");
394
395            // Map raw index keys → updated entry (or None if fully removed).
396            let mut entries: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
397
398            // Fold entity deletions into per-key index entry updates.
399            for entity in entities {
400                let Some(key) = IndexKey::new(*entity, plan.index) else {
401                    continue;
402                };
403                let raw_key = key.to_raw();
404
405                // Lazily load and decode the existing index entry once per key.
406                let entry = match entries.entry(raw_key) {
407                    std::collections::btree_map::Entry::Vacant(slot) => {
408                        let decoded = plan
409                            .store
410                            .with_borrow(|s| s.get(&raw_key))
411                            .map(|raw| {
412                                raw.try_decode().map_err(|err| {
413                                    ExecutorError::corruption(
414                                        ErrorOrigin::Index,
415                                        format!(
416                                            "index corrupted: {} ({}) -> {}",
417                                            E::PATH,
418                                            fields,
419                                            err
420                                        ),
421                                    )
422                                })
423                            })
424                            .transpose()?;
425                        slot.insert(decoded)
426                    }
427                    std::collections::btree_map::Entry::Occupied(slot) => slot.into_mut(),
428                };
429
430                // Remove this entity’s key from the index entry.
431                if let Some(e) = entry.as_mut() {
432                    e.remove_key(&entity.key());
433                    if e.is_empty() {
434                        *entry = None;
435                    }
436                }
437            }
438
439            // Emit commit ops for each touched index key.
440            for (raw_key, entry) in entries {
441                let value = if let Some(entry) = entry {
442                    let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| {
443                        ExecutorError::corruption(
444                            ErrorOrigin::Index,
445                            format!(
446                                "index corrupted: {} ({}) -> {}",
447                                E::PATH,
448                                fields,
449                                IndexEntryCorruption::TooManyKeys { count: err.keys() }
450                            ),
451                        )
452                    })?;
453                    Some(raw.into_bytes())
454                } else {
455                    // None means the index entry is fully removed.
456                    None
457                };
458
459                ops.push(CommitIndexOp {
460                    store: plan.index.store.to_string(),
461                    key: raw_key.as_bytes().to_vec(),
462                    value,
463                });
464            }
465        }
466
467        Ok(ops)
468    }
469}