//! icydb_core/db/executor/delete.rs — delete executor for the database layer.

1use crate::{
2    db::{
3        CommitDataOp, CommitIndexOp, CommitKind, CommitMarker, Db, begin_commit, ensure_recovered,
4        executor::{
5            ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
6            plan::{
7                plan_for, record_plan_metrics, scan_missing_ok, scan_strict, set_rows_from_len,
8            },
9            resolve_unique_pk,
10        },
11        finish_commit,
12        index::{
13            IndexEntry, IndexEntryCorruption, IndexKey, IndexRemoveOutcome, IndexStore,
14            RawIndexEntry, RawIndexKey,
15        },
16        primitives::FilterExpr,
17        query::{DeleteQuery, QueryPlan, QueryValidate},
18        response::Response,
19        store::DataKey,
20        traits::FromKey,
21    },
22    error::{ErrorOrigin, InternalError},
23    obs::sink::{self, ExecKind, MetricsEvent, Span},
24    prelude::*,
25    sanitize::sanitize,
26    traits::{EntityKind, FieldValue, Path},
27};
28use canic_cdk::structures::Storable;
29use std::{
30    cell::RefCell, collections::BTreeMap, marker::PhantomData, ops::ControlFlow, thread::LocalKey,
31};
32
33///
34/// DeleteAccumulator
35/// Accumulates delete candidates while respecting filter/offset/limit semantics.
36///
37
38struct DeleteAccumulator<'f, E> {
39    filter: Option<&'f FilterExpr>,
40    offset: usize,
41    skipped: usize,
42    limit: Option<usize>,
43    matches: Vec<(DataKey, E)>,
44}
45
46impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
47    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
48        Self {
49            filter,
50            offset,
51            skipped: 0,
52            limit,
53            matches: Vec::with_capacity(limit.unwrap_or(0)),
54        }
55    }
56
57    fn limit_reached(&self) -> bool {
58        self.limit.is_some_and(|lim| self.matches.len() >= lim)
59    }
60
61    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
62        if let Some(f) = self.filter
63            && !FilterEvaluator::new(&entity).eval(f)
64        {
65            return false;
66        }
67
68        if self.skipped < self.offset {
69            self.skipped += 1;
70            return false;
71        }
72
73        if self.limit_reached() {
74            return true;
75        }
76
77        self.matches.push((dk, entity));
78        false
79    }
80}
81
///
/// IndexPlan
/// Prevalidated handle to an index store used during commit planning.
///

struct IndexPlan {
    // Static index model describing the indexed fields and target store name.
    index: &'static IndexModel,
    // Thread-local handle to the backing index store, resolved up front so
    // the later commit phase cannot fail on store lookup.
    store: &'static LocalKey<RefCell<IndexStore>>,
}
91
///
/// DeleteExecutor
///
/// Stage-1 atomicity invariant:
/// All fallible validation completes before the first stable write.
/// After mutation begins, only infallible operations or traps remain.
/// IC rollback semantics guarantee atomicity within this update call.
///
#[derive(Clone, Copy)]
pub struct DeleteExecutor<E: EntityKind> {
    // Database handle scoped to the entity's canister.
    db: Db<E::Canister>,
    // Debug flag, settable via `new` or the `debug()` builder; not read
    // anywhere in this file — presumably consumed elsewhere (TODO confirm).
    debug: bool,
    // Binds the executor to entity type `E` without storing a value.
    _marker: PhantomData<E>,
}
106
107impl<E: EntityKind> DeleteExecutor<E> {
    /// Creates an executor bound to `db`, with the debug flag preset.
    #[must_use]
    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }

    /// Builder-style variant: returns a copy with the debug flag forced on.
    #[must_use]
    pub const fn debug(mut self) -> Self {
        self.debug = true;
        self
    }
122
123    // ─────────────────────────────────────────────
124    // PK helpers
125    // ─────────────────────────────────────────────
126
127    pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, InternalError> {
128        self.execute(DeleteQuery::new().one::<E>(pk))
129    }
130
131    pub fn only(self) -> Result<Response<E>, InternalError> {
132        self.execute(DeleteQuery::new().one::<E>(()))
133    }
134
135    pub fn many<I, V>(self, values: I) -> Result<Response<E>, InternalError>
136    where
137        I: IntoIterator<Item = V>,
138        V: FieldValue,
139    {
140        self.execute(DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values))
141    }
142
143    // ─────────────────────────────────────────────
144    // Unique-index delete
145    // ─────────────────────────────────────────────
146
    /// Deletes at most one row addressed by a unique-index lookup.
    ///
    /// `entity` supplies the indexed field values used for the lookup; a
    /// missing index entry is treated as a successful no-op and returns an
    /// empty response.
    pub fn by_unique_index(
        self,
        index: UniqueIndexHandle,
        entity: E,
    ) -> Result<Response<E>, InternalError>
    where
        E::PrimaryKey: FromKey,
    {
        let mut span = Span::<E>::new(ExecKind::Delete);
        ensure_recovered(&self.db)?;

        let index = index.index();
        let mut lookup = entity;
        // Sanitize lookup values so the index key matches what writes produced.
        sanitize(&mut lookup)?;

        // Resolve PK via unique index; absence is a no-op.
        let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
            set_rows_from_len(&mut span, 0);
            return Ok(Response(Vec::new()));
        };

        let (dk, stored) = self.load_existing(pk)?;
        let ctx = self.db.context::<E>();
        // Prevalidate index stores and compute removal ops before any mutation.
        let index_plans = self.build_index_plans()?;
        let index_ops = Self::build_index_removal_ops(&index_plans, &[&stored])?;

        // Preflight: ensure stores are accessible before committing.
        ctx.with_store(|_| ())?;

        // Record the intended index/data mutations durably for recovery.
        let marker = CommitMarker::new(
            CommitKind::Delete,
            index_ops,
            vec![CommitDataOp {
                store: E::Store::PATH.to_string(),
                key: dk.to_raw().as_bytes().to_vec(),
                value: None,
            }],
        )?;
        let commit = begin_commit(marker)?;

        // Commit phase: per the Stage-1 invariant, everything below is
        // infallible after prevalidation — unexpected failure must trap.
        finish_commit(
            commit,
            || {
                let _unit = WriteUnit::new("delete_unique_row_stage1_atomic");
                for plan in &index_plans {
                    let outcome = plan
                        .store
                        .with_borrow_mut(|s| s.remove_index_entry(&stored, plan.index))
                        .expect("index remove failed after prevalidation");
                    if outcome == IndexRemoveOutcome::Removed {
                        sink::record(MetricsEvent::IndexRemove {
                            entity_path: E::PATH,
                        });
                    }
                }
            },
            || {
                ctx.with_store_mut(|s| s.remove(&dk.to_raw()))
                    .expect("data store missing after preflight");
            },
        );

        set_rows_from_len(&mut span, 1);
        Ok(Response(vec![(dk.key(), stored)]))
    }
212
213    // ─────────────────────────────────────────────
214    // Planner-based delete
215    // ─────────────────────────────────────────────
216
217    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, InternalError> {
218        QueryValidate::<E>::validate(&query)?;
219        Ok(plan_for::<E>(query.filter.as_ref()))
220    }
221
    /// Executes a planner-based delete and returns the deleted rows.
    ///
    /// Runs in two phases: a read-only scan that collects candidates under
    /// filter/offset/limit semantics, then a commit phase that removes index
    /// entries and data rows under a durable commit marker.
    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, InternalError> {
        // Validate query shape and ensure no recovery is in progress.
        QueryValidate::<E>::validate(&query)?;
        ensure_recovered(&self.db)?;

        let mut span = Span::<E>::new(ExecKind::Delete);

        // Plan the delete using the same planner as loads (side-effect free).
        let plan = plan_for::<E>(query.filter.as_ref());
        record_plan_metrics(&plan);

        // Extract pagination controls for scan-time filtering.
        let (limit, offset) = match query.limit.as_ref() {
            Some(l) => (l.limit.map(|v| v as usize), l.offset as usize),
            None => (None, 0),
        };

        // Prepare accumulator that enforces filter/offset/limit semantics.
        // The filter is simplified once here so per-row evaluation stays cheap.
        let filter = query.filter.as_ref().map(|f| f.clone().simplify());
        let mut acc = DeleteAccumulator::new(filter.as_ref(), offset, limit);
        let mut scanned = 0u64;

        // Scan phase: identify rows to delete without mutating storage.
        match plan {
            QueryPlan::Keys(keys) => {
                // Key deletes are idempotent: missing rows are skipped.
                scan_missing_ok::<E, _>(&self.db, QueryPlan::Keys(keys), |dk, entity| {
                    scanned += 1;
                    if acc.should_stop(dk, entity) {
                        ControlFlow::Break(())
                    } else {
                        ControlFlow::Continue(())
                    }
                })?;
            }
            plan => {
                // Non-key plans are strict: missing or corrupt rows are errors.
                scan_strict::<E, _>(&self.db, plan, |dk, entity| {
                    scanned += 1;
                    if acc.should_stop(dk, entity) {
                        ControlFlow::Break(())
                    } else {
                        ControlFlow::Continue(())
                    }
                })?;
            }
        }

        // Record how many rows were examined during planning.
        sink::record(MetricsEvent::RowsScanned {
            entity_path: E::PATH,
            rows_scanned: scanned,
        });

        // Fast exit when no rows matched after filtering and pagination.
        if acc.matches.is_empty() {
            set_rows_from_len(&mut span, 0);
            return Ok(Response(Vec::new()));
        }

        // Precompute index and data mutations before beginning the commit.
        let index_plans = self.build_index_plans()?;
        let entities: Vec<&E> = acc.matches.iter().map(|(_, e)| e).collect();
        let index_ops = Self::build_index_removal_ops(&index_plans, &entities)?;

        // Preflight store access to ensure no fallible work remains post-commit.
        let ctx = self.db.context::<E>();
        ctx.with_store(|_| ())?;

        let data_ops = acc
            .matches
            .iter()
            .map(|(dk, _)| CommitDataOp {
                store: E::Store::PATH.to_string(),
                key: dk.to_raw().as_bytes().to_vec(),
                value: None,
            })
            .collect();

        let marker = CommitMarker::new(CommitKind::Delete, index_ops, data_ops)?;
        let commit = begin_commit(marker)?;

        // Commit phase: remove index entries first, then delete data rows.
        // Per the Stage-1 invariant, failures past this point must trap.
        finish_commit(
            commit,
            || {
                for (_, entity) in &acc.matches {
                    let _unit = WriteUnit::new("delete_row_stage1_atomic");
                    for plan in &index_plans {
                        let outcome = plan
                            .store
                            .with_borrow_mut(|s| s.remove_index_entry(entity, plan.index))
                            .expect("index remove failed after prevalidation");
                        if outcome == IndexRemoveOutcome::Removed {
                            sink::record(MetricsEvent::IndexRemove {
                                entity_path: E::PATH,
                            });
                        }
                    }
                }
            },
            || {
                ctx.with_store_mut(|s| {
                    for (dk, _) in &acc.matches {
                        s.remove(&dk.to_raw());
                    }
                })
                .expect("data store missing after preflight");
            },
        );

        // Return deleted entities to the caller.
        let res = acc
            .matches
            .into_iter()
            .map(|(dk, e)| (dk.key(), e))
            .collect::<Vec<_>>();
        set_rows_from_len(&mut span, res.len());

        Ok(Response(res))
    }
343
344    // ─────────────────────────────────────────────
345    // Helpers
346    // ─────────────────────────────────────────────
347
348    fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, E), InternalError> {
349        let dk = DataKey::new::<E>(pk.into());
350        let row = self.db.context::<E>().read_strict(&dk)?;
351        let entity = row.try_decode::<E>().map_err(|err| {
352            ExecutorError::corruption(
353                ErrorOrigin::Serialize,
354                format!("failed to deserialize row: {dk} ({err})"),
355            )
356        })?;
357        Ok((dk, entity))
358    }
359
360    fn build_index_plans(&self) -> Result<Vec<IndexPlan>, InternalError> {
361        E::INDEXES
362            .iter()
363            .map(|index| {
364                let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
365                Ok(IndexPlan { index, store })
366            })
367            .collect()
368    }
369
    /// Computes the commit-marker index ops implied by deleting `entities`.
    ///
    /// For each index, existing entries are decoded at most once per touched
    /// key, the deleted entities' keys are removed, and the resulting state
    /// (updated entry, or full removal when emptied) is emitted as a
    /// `CommitIndexOp`.
    ///
    /// # Errors
    /// Returns a corruption error when an existing index entry fails to
    /// decode, or when the updated entry cannot be re-encoded.
    fn build_index_removal_ops(
        plans: &[IndexPlan],
        entities: &[&E],
    ) -> Result<Vec<CommitIndexOp>, InternalError> {
        let mut ops = Vec::new();

        // Process each index independently to compute its resulting mutations.
        for plan in plans {
            // Human-readable field list, used only in error messages below.
            let fields = plan.index.fields.join(", ");

            // Map raw index keys → updated entry (or None if fully removed).
            let mut entries: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();

            // Fold entity deletions into per-key index entry updates.
            for entity in entities {
                // Entities producing no key for this index are not indexed here.
                let Some(key) = IndexKey::new(*entity, plan.index) else {
                    continue;
                };
                let raw_key = key.to_raw();

                // Lazily load and decode the existing index entry once per key.
                let entry = match entries.entry(raw_key) {
                    std::collections::btree_map::Entry::Vacant(slot) => {
                        let decoded = plan
                            .store
                            .with_borrow(|s| s.get(&raw_key))
                            .map(|raw| {
                                raw.try_decode().map_err(|err| {
                                    ExecutorError::corruption(
                                        ErrorOrigin::Index,
                                        format!(
                                            "index corrupted: {} ({}) -> {}",
                                            E::PATH,
                                            fields,
                                            err
                                        ),
                                    )
                                })
                            })
                            .transpose()?;
                        // NOTE(review): when the store has no entry for this key,
                        // `None` is cached here and a removal op is still emitted
                        // below — presumably a harmless idempotent delete; confirm
                        // this is intended rather than skipping the op.
                        slot.insert(decoded)
                    }
                    std::collections::btree_map::Entry::Occupied(slot) => slot.into_mut(),
                };

                // Remove this entity's key from the index entry.
                if let Some(e) = entry.as_mut() {
                    e.remove_key(&entity.key());
                    if e.is_empty() {
                        *entry = None;
                    }
                }
            }

            // Emit commit ops for each touched index key.
            for (raw_key, entry) in entries {
                let value = if let Some(entry) = entry {
                    let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| {
                        ExecutorError::corruption(
                            ErrorOrigin::Index,
                            format!(
                                "index corrupted: {} ({}) -> {}",
                                E::PATH,
                                fields,
                                IndexEntryCorruption::TooManyKeys { count: err.keys() }
                            ),
                        )
                    })?;
                    Some(raw.into_bytes())
                } else {
                    // None means the index entry is fully removed.
                    None
                };

                ops.push(CommitIndexOp {
                    store: plan.index.store.to_string(),
                    key: raw_key.as_bytes().to_vec(),
                    value,
                });
            }
        }

        Ok(ops)
    }
454}