Skip to main content

icydb_core/db/
session.rs

1// 3️⃣ Internal imports (implementation wiring)
2#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5    db::{
6        Db, FluentDeleteQuery, FluentLoadQuery, PagedGroupedExecutionWithTrace,
7        PagedLoadExecutionWithTrace, PlanError, Query, QueryError, ReadConsistency, Response,
8        WriteBatchResponse, WriteResponse,
9        cursor::CursorPlanError,
10        decode_cursor,
11        executor::{DeleteExecutor, ExecutablePlan, ExecutorPlanError, LoadExecutor, SaveExecutor},
12        query::intent::QueryMode,
13    },
14    error::InternalError,
15    obs::sink::{MetricsSink, with_metrics_sink},
16    traits::{CanisterKind, EntityKind, EntityValue},
17    types::{Decimal, Id},
18    value::Value,
19};
20
21// Map executor-owned plan-surface failures into query-owned plan errors.
22fn map_executor_plan_error(err: ExecutorPlanError) -> QueryError {
23    QueryError::from(err.into_plan_error())
24}
25
26///
27/// DbSession
28///
29/// Session-scoped database handle with policy (debug, metrics) and execution routing.
30///
31
32pub struct DbSession<C: CanisterKind> {
33    db: Db<C>,
34    debug: bool,
35    metrics: Option<&'static dyn MetricsSink>,
36}
37
38impl<C: CanisterKind> DbSession<C> {
39    #[must_use]
40    pub const fn new(db: Db<C>) -> Self {
41        Self {
42            db,
43            debug: false,
44            metrics: None,
45        }
46    }
47
48    #[must_use]
49    pub const fn debug(mut self) -> Self {
50        self.debug = true;
51        self
52    }
53
54    #[must_use]
55    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
56        self.metrics = Some(sink);
57        self
58    }
59
60    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
61        if let Some(sink) = self.metrics {
62            with_metrics_sink(sink, f)
63        } else {
64            f()
65        }
66    }
67
68    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
69    fn execute_save_with<E, T, R>(
70        &self,
71        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
72        map: impl FnOnce(T) -> R,
73    ) -> Result<R, InternalError>
74    where
75        E: EntityKind<Canister = C> + EntityValue,
76    {
77        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
78
79        Ok(map(value))
80    }
81
82    // Shared save-facade wrappers keep response shape explicit at call sites.
83    fn execute_save_entity<E>(
84        &self,
85        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
86    ) -> Result<WriteResponse<E>, InternalError>
87    where
88        E: EntityKind<Canister = C> + EntityValue,
89    {
90        self.execute_save_with(op, WriteResponse::new)
91    }
92
93    fn execute_save_batch<E>(
94        &self,
95        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
96    ) -> Result<WriteBatchResponse<E>, InternalError>
97    where
98        E: EntityKind<Canister = C> + EntityValue,
99    {
100        self.execute_save_with(op, WriteBatchResponse::new)
101    }
102
103    fn execute_save_view<E>(
104        &self,
105        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
106    ) -> Result<E::ViewType, InternalError>
107    where
108        E: EntityKind<Canister = C> + EntityValue,
109    {
110        self.execute_save_with(op, std::convert::identity)
111    }
112
113    // ---------------------------------------------------------------------
114    // Query entry points (public, fluent)
115    // ---------------------------------------------------------------------
116
117    #[must_use]
118    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
119    where
120        E: EntityKind<Canister = C>,
121    {
122        FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
123    }
124
125    #[must_use]
126    pub const fn load_with_consistency<E>(
127        &self,
128        consistency: ReadConsistency,
129    ) -> FluentLoadQuery<'_, E>
130    where
131        E: EntityKind<Canister = C>,
132    {
133        FluentLoadQuery::new(self, Query::new(consistency))
134    }
135
136    #[must_use]
137    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
138    where
139        E: EntityKind<Canister = C>,
140    {
141        FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
142    }
143
144    #[must_use]
145    pub fn delete_with_consistency<E>(
146        &self,
147        consistency: ReadConsistency,
148    ) -> FluentDeleteQuery<'_, E>
149    where
150        E: EntityKind<Canister = C>,
151    {
152        FluentDeleteQuery::new(self, Query::new(consistency).delete())
153    }
154
155    // ---------------------------------------------------------------------
156    // Low-level executors (crate-internal; execution primitives)
157    // ---------------------------------------------------------------------
158
159    #[must_use]
160    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
161    where
162        E: EntityKind<Canister = C> + EntityValue,
163    {
164        LoadExecutor::new(self.db, self.debug)
165    }
166
167    #[must_use]
168    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
169    where
170        E: EntityKind<Canister = C> + EntityValue,
171    {
172        DeleteExecutor::new(self.db, self.debug)
173    }
174
175    #[must_use]
176    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
177    where
178        E: EntityKind<Canister = C> + EntityValue,
179    {
180        SaveExecutor::new(self.db, self.debug)
181    }
182
183    // ---------------------------------------------------------------------
184    // Query diagnostics / execution (internal routing)
185    // ---------------------------------------------------------------------
186
187    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
188    where
189        E: EntityKind<Canister = C> + EntityValue,
190    {
191        let plan = query.plan()?;
192
193        let result = match query.mode() {
194            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
195            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
196        };
197
198        result.map_err(QueryError::Execute)
199    }
200
201    // Shared load-query terminal wrapper: build plan, run under metrics, map
202    // execution errors into query-facing errors.
203    fn execute_load_query_with<E, T>(
204        &self,
205        query: &Query<E>,
206        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
207    ) -> Result<T, QueryError>
208    where
209        E: EntityKind<Canister = C> + EntityValue,
210    {
211        let plan = query.plan()?;
212
213        self.with_metrics(|| op(self.load_executor::<E>(), plan))
214            .map_err(QueryError::Execute)
215    }
216
217    pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
218    where
219        E: EntityKind<Canister = C> + EntityValue,
220    {
221        self.execute_load_query_with(query, |load, plan| load.aggregate_count(plan))
222    }
223
224    pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
225    where
226        E: EntityKind<Canister = C> + EntityValue,
227    {
228        self.execute_load_query_with(query, |load, plan| load.aggregate_exists(plan))
229    }
230
231    pub(crate) fn execute_load_query_min<E>(
232        &self,
233        query: &Query<E>,
234    ) -> Result<Option<Id<E>>, QueryError>
235    where
236        E: EntityKind<Canister = C> + EntityValue,
237    {
238        self.execute_load_query_with(query, |load, plan| load.aggregate_min(plan))
239    }
240
241    pub(crate) fn execute_load_query_max<E>(
242        &self,
243        query: &Query<E>,
244    ) -> Result<Option<Id<E>>, QueryError>
245    where
246        E: EntityKind<Canister = C> + EntityValue,
247    {
248        self.execute_load_query_with(query, |load, plan| load.aggregate_max(plan))
249    }
250
251    pub(crate) fn execute_load_query_min_by<E>(
252        &self,
253        query: &Query<E>,
254        target_field: &str,
255    ) -> Result<Option<Id<E>>, QueryError>
256    where
257        E: EntityKind<Canister = C> + EntityValue,
258    {
259        self.execute_load_query_with(query, |load, plan| {
260            load.aggregate_min_by(plan, target_field)
261        })
262    }
263
264    pub(crate) fn execute_load_query_max_by<E>(
265        &self,
266        query: &Query<E>,
267        target_field: &str,
268    ) -> Result<Option<Id<E>>, QueryError>
269    where
270        E: EntityKind<Canister = C> + EntityValue,
271    {
272        self.execute_load_query_with(query, |load, plan| {
273            load.aggregate_max_by(plan, target_field)
274        })
275    }
276
277    pub(crate) fn execute_load_query_nth_by<E>(
278        &self,
279        query: &Query<E>,
280        target_field: &str,
281        nth: usize,
282    ) -> Result<Option<Id<E>>, QueryError>
283    where
284        E: EntityKind<Canister = C> + EntityValue,
285    {
286        self.execute_load_query_with(query, |load, plan| {
287            load.aggregate_nth_by(plan, target_field, nth)
288        })
289    }
290
291    pub(crate) fn execute_load_query_sum_by<E>(
292        &self,
293        query: &Query<E>,
294        target_field: &str,
295    ) -> Result<Option<Decimal>, QueryError>
296    where
297        E: EntityKind<Canister = C> + EntityValue,
298    {
299        self.execute_load_query_with(query, |load, plan| {
300            load.aggregate_sum_by(plan, target_field)
301        })
302    }
303
304    pub(crate) fn execute_load_query_avg_by<E>(
305        &self,
306        query: &Query<E>,
307        target_field: &str,
308    ) -> Result<Option<Decimal>, QueryError>
309    where
310        E: EntityKind<Canister = C> + EntityValue,
311    {
312        self.execute_load_query_with(query, |load, plan| {
313            load.aggregate_avg_by(plan, target_field)
314        })
315    }
316
317    pub(crate) fn execute_load_query_median_by<E>(
318        &self,
319        query: &Query<E>,
320        target_field: &str,
321    ) -> Result<Option<Id<E>>, QueryError>
322    where
323        E: EntityKind<Canister = C> + EntityValue,
324    {
325        self.execute_load_query_with(query, |load, plan| {
326            load.aggregate_median_by(plan, target_field)
327        })
328    }
329
330    pub(crate) fn execute_load_query_count_distinct_by<E>(
331        &self,
332        query: &Query<E>,
333        target_field: &str,
334    ) -> Result<u32, QueryError>
335    where
336        E: EntityKind<Canister = C> + EntityValue,
337    {
338        self.execute_load_query_with(query, |load, plan| {
339            load.aggregate_count_distinct_by(plan, target_field)
340        })
341    }
342
343    #[expect(clippy::type_complexity)]
344    pub(crate) fn execute_load_query_min_max_by<E>(
345        &self,
346        query: &Query<E>,
347        target_field: &str,
348    ) -> Result<Option<(Id<E>, Id<E>)>, QueryError>
349    where
350        E: EntityKind<Canister = C> + EntityValue,
351    {
352        self.execute_load_query_with(query, |load, plan| {
353            load.aggregate_min_max_by(plan, target_field)
354        })
355    }
356
357    pub(crate) fn execute_load_query_values_by<E>(
358        &self,
359        query: &Query<E>,
360        target_field: &str,
361    ) -> Result<Vec<Value>, QueryError>
362    where
363        E: EntityKind<Canister = C> + EntityValue,
364    {
365        self.execute_load_query_with(query, |load, plan| load.values_by(plan, target_field))
366    }
367
368    pub(crate) fn execute_load_query_take<E>(
369        &self,
370        query: &Query<E>,
371        take_count: u32,
372    ) -> Result<Response<E>, QueryError>
373    where
374        E: EntityKind<Canister = C> + EntityValue,
375    {
376        self.execute_load_query_with(query, |load, plan| load.take(plan, take_count))
377    }
378
379    pub(crate) fn execute_load_query_top_k_by<E>(
380        &self,
381        query: &Query<E>,
382        target_field: &str,
383        take_count: u32,
384    ) -> Result<Response<E>, QueryError>
385    where
386        E: EntityKind<Canister = C> + EntityValue,
387    {
388        self.execute_load_query_with(query, |load, plan| {
389            load.top_k_by(plan, target_field, take_count)
390        })
391    }
392
393    pub(crate) fn execute_load_query_bottom_k_by<E>(
394        &self,
395        query: &Query<E>,
396        target_field: &str,
397        take_count: u32,
398    ) -> Result<Response<E>, QueryError>
399    where
400        E: EntityKind<Canister = C> + EntityValue,
401    {
402        self.execute_load_query_with(query, |load, plan| {
403            load.bottom_k_by(plan, target_field, take_count)
404        })
405    }
406
407    pub(crate) fn execute_load_query_top_k_by_values<E>(
408        &self,
409        query: &Query<E>,
410        target_field: &str,
411        take_count: u32,
412    ) -> Result<Vec<Value>, QueryError>
413    where
414        E: EntityKind<Canister = C> + EntityValue,
415    {
416        self.execute_load_query_with(query, |load, plan| {
417            load.top_k_by_values(plan, target_field, take_count)
418        })
419    }
420
421    pub(crate) fn execute_load_query_bottom_k_by_values<E>(
422        &self,
423        query: &Query<E>,
424        target_field: &str,
425        take_count: u32,
426    ) -> Result<Vec<Value>, QueryError>
427    where
428        E: EntityKind<Canister = C> + EntityValue,
429    {
430        self.execute_load_query_with(query, |load, plan| {
431            load.bottom_k_by_values(plan, target_field, take_count)
432        })
433    }
434
435    pub(crate) fn execute_load_query_top_k_by_with_ids<E>(
436        &self,
437        query: &Query<E>,
438        target_field: &str,
439        take_count: u32,
440    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
441    where
442        E: EntityKind<Canister = C> + EntityValue,
443    {
444        self.execute_load_query_with(query, |load, plan| {
445            load.top_k_by_with_ids(plan, target_field, take_count)
446        })
447    }
448
449    pub(crate) fn execute_load_query_bottom_k_by_with_ids<E>(
450        &self,
451        query: &Query<E>,
452        target_field: &str,
453        take_count: u32,
454    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
455    where
456        E: EntityKind<Canister = C> + EntityValue,
457    {
458        self.execute_load_query_with(query, |load, plan| {
459            load.bottom_k_by_with_ids(plan, target_field, take_count)
460        })
461    }
462
463    pub(crate) fn execute_load_query_distinct_values_by<E>(
464        &self,
465        query: &Query<E>,
466        target_field: &str,
467    ) -> Result<Vec<Value>, QueryError>
468    where
469        E: EntityKind<Canister = C> + EntityValue,
470    {
471        self.execute_load_query_with(query, |load, plan| {
472            load.distinct_values_by(plan, target_field)
473        })
474    }
475
476    pub(crate) fn execute_load_query_values_by_with_ids<E>(
477        &self,
478        query: &Query<E>,
479        target_field: &str,
480    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
481    where
482        E: EntityKind<Canister = C> + EntityValue,
483    {
484        self.execute_load_query_with(query, |load, plan| {
485            load.values_by_with_ids(plan, target_field)
486        })
487    }
488
489    pub(crate) fn execute_load_query_first_value_by<E>(
490        &self,
491        query: &Query<E>,
492        target_field: &str,
493    ) -> Result<Option<Value>, QueryError>
494    where
495        E: EntityKind<Canister = C> + EntityValue,
496    {
497        self.execute_load_query_with(query, |load, plan| load.first_value_by(plan, target_field))
498    }
499
500    pub(crate) fn execute_load_query_last_value_by<E>(
501        &self,
502        query: &Query<E>,
503        target_field: &str,
504    ) -> Result<Option<Value>, QueryError>
505    where
506        E: EntityKind<Canister = C> + EntityValue,
507    {
508        self.execute_load_query_with(query, |load, plan| load.last_value_by(plan, target_field))
509    }
510
511    pub(crate) fn execute_load_query_first<E>(
512        &self,
513        query: &Query<E>,
514    ) -> Result<Option<Id<E>>, QueryError>
515    where
516        E: EntityKind<Canister = C> + EntityValue,
517    {
518        self.execute_load_query_with(query, |load, plan| load.aggregate_first(plan))
519    }
520
521    pub(crate) fn execute_load_query_last<E>(
522        &self,
523        query: &Query<E>,
524    ) -> Result<Option<Id<E>>, QueryError>
525    where
526        E: EntityKind<Canister = C> + EntityValue,
527    {
528        self.execute_load_query_with(query, |load, plan| load.aggregate_last(plan))
529    }
530
531    pub(crate) fn execute_load_query_paged_with_trace<E>(
532        &self,
533        query: &Query<E>,
534        cursor_token: Option<&str>,
535    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
536    where
537        E: EntityKind<Canister = C> + EntityValue,
538    {
539        let plan = query.plan()?;
540        if plan.as_inner().grouped_plan().is_some() {
541            return Err(QueryError::Execute(
542                InternalError::query_executor_invariant(
543                    "grouped plans require execute_grouped(...)",
544                ),
545            ));
546        }
547        let cursor_bytes = match cursor_token {
548            Some(token) => Some(decode_cursor(token).map_err(|reason| {
549                QueryError::from(PlanError::from(
550                    CursorPlanError::InvalidContinuationCursor { reason },
551                ))
552            })?),
553            None => None,
554        };
555        let cursor = plan
556            .prepare_cursor(cursor_bytes.as_deref())
557            .map_err(map_executor_plan_error)?;
558
559        let (page, trace) = self
560            .with_metrics(|| {
561                self.load_executor::<E>()
562                    .execute_paged_with_cursor_traced(plan, cursor)
563            })
564            .map_err(QueryError::Execute)?;
565        let next_cursor = page
566            .next_cursor
567            .map(|token| {
568                let Some(token) = token.as_scalar() else {
569                    return Err(QueryError::Execute(
570                        InternalError::query_executor_invariant(
571                            "scalar load pagination emitted grouped continuation token",
572                        ),
573                    ));
574                };
575
576                token.encode().map_err(|err| {
577                    QueryError::Execute(InternalError::serialize_internal(format!(
578                        "failed to serialize continuation cursor: {err}"
579                    )))
580                })
581            })
582            .transpose()?;
583
584        Ok(PagedLoadExecutionWithTrace::new(
585            page.items,
586            next_cursor,
587            trace,
588        ))
589    }
590
591    /// Execute one grouped query page with optional grouped continuation cursor.
592    ///
593    /// This is the explicit grouped execution boundary; scalar load APIs reject
594    /// grouped plans to preserve scalar response contracts.
595    pub fn execute_grouped<E>(
596        &self,
597        query: &Query<E>,
598        cursor_token: Option<&str>,
599    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
600    where
601        E: EntityKind<Canister = C> + EntityValue,
602    {
603        let plan = query.plan()?;
604        if plan.as_inner().grouped_plan().is_none() {
605            return Err(QueryError::Execute(
606                InternalError::query_executor_invariant(
607                    "execute_grouped requires grouped logical plans",
608                ),
609            ));
610        }
611        let cursor_bytes = match cursor_token {
612            Some(token) => Some(decode_cursor(token).map_err(|reason| {
613                QueryError::from(PlanError::from(
614                    CursorPlanError::InvalidContinuationCursor { reason },
615                ))
616            })?),
617            None => None,
618        };
619        let cursor = plan
620            .prepare_grouped_cursor(cursor_bytes.as_deref())
621            .map_err(map_executor_plan_error)?;
622
623        let (page, trace) = self
624            .with_metrics(|| {
625                self.load_executor::<E>()
626                    .execute_grouped_paged_with_cursor_traced(plan, cursor)
627            })
628            .map_err(QueryError::Execute)?;
629        let next_cursor = page
630            .next_cursor
631            .map(|token| {
632                let Some(token) = token.as_grouped() else {
633                    return Err(QueryError::Execute(
634                        InternalError::query_executor_invariant(
635                            "grouped pagination emitted scalar continuation token",
636                        ),
637                    ));
638                };
639
640                token.encode().map_err(|err| {
641                    QueryError::Execute(InternalError::serialize_internal(format!(
642                        "failed to serialize grouped continuation cursor: {err}"
643                    )))
644                })
645            })
646            .transpose()?;
647
648        Ok(PagedGroupedExecutionWithTrace::new(
649            page.rows,
650            next_cursor,
651            trace,
652        ))
653    }
654
655    // ---------------------------------------------------------------------
656    // High-level write API (public, intent-level)
657    // ---------------------------------------------------------------------
658
659    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
660    where
661        E: EntityKind<Canister = C> + EntityValue,
662    {
663        self.execute_save_entity(|save| save.insert(entity))
664    }
665
666    /// Insert a single-entity-type batch atomically in one commit window.
667    ///
668    /// If any item fails pre-commit validation, no row in the batch is persisted.
669    ///
670    /// This API is not a multi-entity transaction surface.
671    pub fn insert_many_atomic<E>(
672        &self,
673        entities: impl IntoIterator<Item = E>,
674    ) -> Result<WriteBatchResponse<E>, InternalError>
675    where
676        E: EntityKind<Canister = C> + EntityValue,
677    {
678        self.execute_save_batch(|save| save.insert_many_atomic(entities))
679    }
680
681    /// Insert a batch with explicitly non-atomic semantics.
682    ///
683    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
684    pub fn insert_many_non_atomic<E>(
685        &self,
686        entities: impl IntoIterator<Item = E>,
687    ) -> Result<WriteBatchResponse<E>, InternalError>
688    where
689        E: EntityKind<Canister = C> + EntityValue,
690    {
691        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
692    }
693
694    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
695    where
696        E: EntityKind<Canister = C> + EntityValue,
697    {
698        self.execute_save_entity(|save| save.replace(entity))
699    }
700
701    /// Replace a single-entity-type batch atomically in one commit window.
702    ///
703    /// If any item fails pre-commit validation, no row in the batch is persisted.
704    ///
705    /// This API is not a multi-entity transaction surface.
706    pub fn replace_many_atomic<E>(
707        &self,
708        entities: impl IntoIterator<Item = E>,
709    ) -> Result<WriteBatchResponse<E>, InternalError>
710    where
711        E: EntityKind<Canister = C> + EntityValue,
712    {
713        self.execute_save_batch(|save| save.replace_many_atomic(entities))
714    }
715
716    /// Replace a batch with explicitly non-atomic semantics.
717    ///
718    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
719    pub fn replace_many_non_atomic<E>(
720        &self,
721        entities: impl IntoIterator<Item = E>,
722    ) -> Result<WriteBatchResponse<E>, InternalError>
723    where
724        E: EntityKind<Canister = C> + EntityValue,
725    {
726        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
727    }
728
729    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
730    where
731        E: EntityKind<Canister = C> + EntityValue,
732    {
733        self.execute_save_entity(|save| save.update(entity))
734    }
735
736    /// Update a single-entity-type batch atomically in one commit window.
737    ///
738    /// If any item fails pre-commit validation, no row in the batch is persisted.
739    ///
740    /// This API is not a multi-entity transaction surface.
741    pub fn update_many_atomic<E>(
742        &self,
743        entities: impl IntoIterator<Item = E>,
744    ) -> Result<WriteBatchResponse<E>, InternalError>
745    where
746        E: EntityKind<Canister = C> + EntityValue,
747    {
748        self.execute_save_batch(|save| save.update_many_atomic(entities))
749    }
750
751    /// Update a batch with explicitly non-atomic semantics.
752    ///
753    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
754    pub fn update_many_non_atomic<E>(
755        &self,
756        entities: impl IntoIterator<Item = E>,
757    ) -> Result<WriteBatchResponse<E>, InternalError>
758    where
759        E: EntityKind<Canister = C> + EntityValue,
760    {
761        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
762    }
763
764    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
765    where
766        E: EntityKind<Canister = C> + EntityValue,
767    {
768        self.execute_save_view::<E>(|save| save.insert_view(view))
769    }
770
771    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
772    where
773        E: EntityKind<Canister = C> + EntityValue,
774    {
775        self.execute_save_view::<E>(|save| save.replace_view(view))
776    }
777
778    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
779    where
780        E: EntityKind<Canister = C> + EntityValue,
781    {
782        self.execute_save_view::<E>(|save| save.update_view(view))
783    }
784
785    /// TEST ONLY: clear all registered data and index stores for this database.
786    #[cfg(test)]
787    #[doc(hidden)]
788    pub fn clear_stores_for_tests(&self) {
789        self.db.with_store_registry(|reg| {
790            // Test cleanup only: clearing all stores is set-like and does not
791            // depend on registry iteration order.
792            for (_, store) in reg.iter() {
793                store.with_data_mut(DataStore::clear);
794                store.with_index_mut(IndexStore::clear);
795            }
796        });
797    }
798}