Skip to main content

icydb_core/db/executor/
delete.rs

1use crate::{
2    db::{
3        CommitDataOp, CommitIndexOp, CommitKind, CommitMarker, Db, begin_commit, ensure_recovered,
4        executor::{
5            ExecutorError, FilterEvaluator, UniqueIndexHandle, WriteUnit,
6            plan::{plan_for, record_plan_metrics, scan_strict, set_rows_from_len},
7            resolve_unique_pk,
8        },
9        finish_commit,
10        index::{
11            IndexEntry, IndexEntryCorruption, IndexKey, IndexRemoveOutcome, IndexStore,
12            RawIndexEntry, RawIndexKey,
13        },
14        primitives::FilterExpr,
15        query::{DeleteQuery, QueryPlan, QueryValidate},
16        response::Response,
17        store::DataKey,
18        traits::FromKey,
19    },
20    error::{ErrorOrigin, InternalError},
21    obs::sink::{self, ExecKind, MetricsEvent, Span},
22    prelude::*,
23    sanitize::sanitize,
24    traits::{EntityKind, FieldValue, Path},
25};
26use canic_cdk::structures::Storable;
27use std::{
28    cell::RefCell, collections::BTreeMap, marker::PhantomData, ops::ControlFlow, thread::LocalKey,
29};
30
31///
32/// DeleteAccumulator
33/// Collects rows to delete during planner execution.
34///
35
36struct DeleteAccumulator<'f, E> {
37    filter: Option<&'f FilterExpr>,
38    offset: usize,
39    skipped: usize,
40    limit: Option<usize>,
41    matches: Vec<(DataKey, E)>,
42}
43
44impl<'f, E: EntityKind> DeleteAccumulator<'f, E> {
45    fn new(filter: Option<&'f FilterExpr>, offset: usize, limit: Option<usize>) -> Self {
46        Self {
47            filter,
48            offset,
49            skipped: 0,
50            limit,
51            matches: Vec::with_capacity(limit.unwrap_or(0)),
52        }
53    }
54
55    fn limit_reached(&self) -> bool {
56        self.limit.is_some_and(|lim| self.matches.len() >= lim)
57    }
58
59    fn should_stop(&mut self, dk: DataKey, entity: E) -> bool {
60        if let Some(f) = self.filter
61            && !FilterEvaluator::new(&entity).eval(f)
62        {
63            return false;
64        }
65
66        if self.skipped < self.offset {
67            self.skipped += 1;
68            return false;
69        }
70
71        if self.limit_reached() {
72            return true;
73        }
74
75        self.matches.push((dk, entity));
76        false
77    }
78}
79
80///
81/// IndexPlan
82/// Prevalidated index store handle.
83///
84
85struct IndexPlan {
86    index: &'static IndexModel,
87    store: &'static LocalKey<RefCell<IndexStore>>,
88}
89
90///
91/// DeleteExecutor
92///
93/// Stage-1 atomicity invariant:
94/// All fallible validation completes before the first stable write.
95/// After mutation begins, only infallible operations or traps remain.
96/// IC rollback semantics guarantee atomicity within this update call.
97///
98#[derive(Clone, Copy)]
99pub struct DeleteExecutor<E: EntityKind> {
100    db: Db<E::Canister>,
101    debug: bool,
102    _marker: PhantomData<E>,
103}
104
105impl<E: EntityKind> DeleteExecutor<E> {
106    #[must_use]
107    pub const fn new(db: Db<E::Canister>, debug: bool) -> Self {
108        Self {
109            db,
110            debug,
111            _marker: PhantomData,
112        }
113    }
114
115    #[must_use]
116    pub const fn debug(mut self) -> Self {
117        self.debug = true;
118        self
119    }
120
121    // ─────────────────────────────────────────────
122    // PK helpers
123    // ─────────────────────────────────────────────
124
125    pub fn one(self, pk: impl FieldValue) -> Result<Response<E>, InternalError> {
126        self.execute(DeleteQuery::new().one::<E>(pk))
127    }
128
129    pub fn only(self) -> Result<Response<E>, InternalError> {
130        self.execute(DeleteQuery::new().one::<E>(()))
131    }
132
133    pub fn many<I, V>(self, values: I) -> Result<Response<E>, InternalError>
134    where
135        I: IntoIterator<Item = V>,
136        V: FieldValue,
137    {
138        self.execute(DeleteQuery::new().many_by_field(E::PRIMARY_KEY, values))
139    }
140
141    // ─────────────────────────────────────────────
142    // Unique-index delete
143    // ─────────────────────────────────────────────
144
145    pub fn by_unique_index(
146        self,
147        index: UniqueIndexHandle,
148        entity: E,
149    ) -> Result<Response<E>, InternalError>
150    where
151        E::PrimaryKey: FromKey,
152    {
153        let mut span = Span::<E>::new(ExecKind::Delete);
154        ensure_recovered(&self.db)?;
155
156        let index = index.index();
157        let mut lookup = entity;
158        sanitize(&mut lookup)?;
159
160        let Some(pk) = resolve_unique_pk::<E>(&self.db, index, &lookup)? else {
161            set_rows_from_len(&mut span, 0);
162            return Ok(Response(Vec::new()));
163        };
164
165        let (dk, stored) = self.load_existing(pk)?;
166        let ctx = self.db.context::<E>();
167        let index_plans = self.build_index_plans()?;
168        let index_ops = Self::build_index_removal_ops(&index_plans, &[&stored])?;
169
170        ctx.with_store(|_| ())?;
171
172        let marker = CommitMarker::new(
173            CommitKind::Delete,
174            index_ops,
175            vec![CommitDataOp {
176                store: E::Store::PATH.to_string(),
177                key: dk.to_raw().as_bytes().to_vec(),
178                value: None,
179            }],
180        )?;
181        let commit = begin_commit(marker)?;
182
183        finish_commit(
184            commit,
185            || {
186                let _unit = WriteUnit::new("delete_unique_row_stage1_atomic");
187                for plan in &index_plans {
188                    let outcome = plan
189                        .store
190                        .with_borrow_mut(|s| s.remove_index_entry(&stored, plan.index))
191                        .expect("index remove failed after prevalidation");
192                    if outcome == IndexRemoveOutcome::Removed {
193                        sink::record(MetricsEvent::IndexRemove {
194                            entity_path: E::PATH,
195                        });
196                    }
197                }
198            },
199            || {
200                ctx.with_store_mut(|s| s.remove(&dk.to_raw()))
201                    .expect("data store missing after preflight");
202            },
203        );
204
205        set_rows_from_len(&mut span, 1);
206        Ok(Response(vec![(dk.key(), stored)]))
207    }
208
209    // ─────────────────────────────────────────────
210    // Planner-based delete
211    // ─────────────────────────────────────────────
212
213    pub fn explain(self, query: DeleteQuery) -> Result<QueryPlan, InternalError> {
214        QueryValidate::<E>::validate(&query)?;
215        Ok(plan_for::<E>(query.filter.as_ref()))
216    }
217
218    pub fn execute(self, query: DeleteQuery) -> Result<Response<E>, InternalError> {
219        QueryValidate::<E>::validate(&query)?;
220        ensure_recovered(&self.db)?;
221
222        let mut span = Span::<E>::new(ExecKind::Delete);
223        let plan = plan_for::<E>(query.filter.as_ref());
224        record_plan_metrics(&plan);
225
226        let (limit, offset) = match query.limit.as_ref() {
227            Some(l) => (l.limit.map(|v| v as usize), l.offset as usize),
228            None => (None, 0),
229        };
230
231        let filter = query.filter.as_ref().map(|f| f.clone().simplify());
232        let mut acc = DeleteAccumulator::new(filter.as_ref(), offset, limit);
233        let mut scanned = 0u64;
234
235        scan_strict::<E, _>(&self.db, plan, |dk, entity| {
236            scanned += 1;
237            if acc.should_stop(dk, entity) {
238                ControlFlow::Break(())
239            } else {
240                ControlFlow::Continue(())
241            }
242        })?;
243
244        sink::record(MetricsEvent::RowsScanned {
245            entity_path: E::PATH,
246            rows_scanned: scanned,
247        });
248
249        if acc.matches.is_empty() {
250            set_rows_from_len(&mut span, 0);
251            return Ok(Response(Vec::new()));
252        }
253
254        let index_plans = self.build_index_plans()?;
255        let entities: Vec<&E> = acc.matches.iter().map(|(_, e)| e).collect();
256        let index_ops = Self::build_index_removal_ops(&index_plans, &entities)?;
257
258        let ctx = self.db.context::<E>();
259        ctx.with_store(|_| ())?;
260
261        let data_ops = acc
262            .matches
263            .iter()
264            .map(|(dk, _)| CommitDataOp {
265                store: E::Store::PATH.to_string(),
266                key: dk.to_raw().as_bytes().to_vec(),
267                value: None,
268            })
269            .collect();
270
271        let marker = CommitMarker::new(CommitKind::Delete, index_ops, data_ops)?;
272        let commit = begin_commit(marker)?;
273
274        finish_commit(
275            commit,
276            || {
277                for (_, entity) in &acc.matches {
278                    let _unit = WriteUnit::new("delete_row_stage1_atomic");
279                    for plan in &index_plans {
280                        let outcome = plan
281                            .store
282                            .with_borrow_mut(|s| s.remove_index_entry(entity, plan.index))
283                            .expect("index remove failed after prevalidation");
284                        if outcome == IndexRemoveOutcome::Removed {
285                            sink::record(MetricsEvent::IndexRemove {
286                                entity_path: E::PATH,
287                            });
288                        }
289                    }
290                }
291            },
292            || {
293                ctx.with_store_mut(|s| {
294                    for (dk, _) in &acc.matches {
295                        s.remove(&dk.to_raw());
296                    }
297                })
298                .expect("data store missing after preflight");
299            },
300        );
301
302        let res = acc
303            .matches
304            .into_iter()
305            .map(|(dk, e)| (dk.key(), e))
306            .collect::<Vec<_>>();
307        set_rows_from_len(&mut span, res.len());
308
309        Ok(Response(res))
310    }
311
312    // ─────────────────────────────────────────────
313    // Helpers
314    // ─────────────────────────────────────────────
315
316    fn load_existing(&self, pk: E::PrimaryKey) -> Result<(DataKey, E), InternalError> {
317        let dk = DataKey::new::<E>(pk.into());
318        let row = self.db.context::<E>().read_strict(&dk)?;
319        let entity = row.try_decode::<E>().map_err(|err| {
320            ExecutorError::corruption(
321                ErrorOrigin::Serialize,
322                format!("failed to deserialize row: {dk} ({err})"),
323            )
324        })?;
325        Ok((dk, entity))
326    }
327
328    fn build_index_plans(&self) -> Result<Vec<IndexPlan>, InternalError> {
329        E::INDEXES
330            .iter()
331            .map(|index| {
332                let store = self.db.with_index(|reg| reg.try_get_store(index.store))?;
333                Ok(IndexPlan { index, store })
334            })
335            .collect()
336    }
337
338    fn build_index_removal_ops(
339        plans: &[IndexPlan],
340        entities: &[&E],
341    ) -> Result<Vec<CommitIndexOp>, InternalError> {
342        let mut ops = Vec::new();
343
344        for plan in plans {
345            let fields = plan.index.fields.join(", ");
346            let mut entries: BTreeMap<RawIndexKey, Option<IndexEntry>> = BTreeMap::new();
347
348            for entity in entities {
349                let Some(key) = IndexKey::new(*entity, plan.index) else {
350                    continue;
351                };
352                let raw_key = key.to_raw();
353
354                let entry = match entries.entry(raw_key) {
355                    std::collections::btree_map::Entry::Vacant(slot) => {
356                        let decoded = plan
357                            .store
358                            .with_borrow(|s| s.get(&raw_key))
359                            .map(|raw| {
360                                raw.try_decode().map_err(|err| {
361                                    ExecutorError::corruption(
362                                        ErrorOrigin::Index,
363                                        format!(
364                                            "index corrupted: {} ({}) -> {}",
365                                            E::PATH,
366                                            fields,
367                                            err
368                                        ),
369                                    )
370                                })
371                            })
372                            .transpose()?;
373                        slot.insert(decoded)
374                    }
375                    std::collections::btree_map::Entry::Occupied(slot) => slot.into_mut(),
376                };
377
378                if let Some(e) = entry.as_mut() {
379                    e.remove_key(&entity.key());
380                    if e.is_empty() {
381                        *entry = None;
382                    }
383                }
384            }
385
386            for (raw_key, entry) in entries {
387                let value = if let Some(entry) = entry {
388                    let raw = RawIndexEntry::try_from_entry(&entry).map_err(|err| {
389                        ExecutorError::corruption(
390                            ErrorOrigin::Index,
391                            format!(
392                                "index corrupted: {} ({}) -> {}",
393                                E::PATH,
394                                fields,
395                                IndexEntryCorruption::TooManyKeys { count: err.keys() }
396                            ),
397                        )
398                    })?;
399                    Some(raw.into_bytes())
400                } else {
401                    None
402                };
403
404                ops.push(CommitIndexOp {
405                    store: plan.index.store.to_string(),
406                    key: raw_key.as_bytes().to_vec(),
407                    value,
408                });
409            }
410        }
411
412        Ok(ops)
413    }
414}