Skip to main content

icydb_core/db/
session.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6#[cfg(test)]
7use crate::db::{DataStore, IndexStore};
8use crate::{
9    db::{
10        Db, EntityResponse, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
11        MissingRowPolicy, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace, PlanError,
12        Query, QueryError, QueryTracePlan, StorageReport, StoreRegistry, TraceExecutionStrategy,
13        WriteBatchResponse,
14        access::AccessStrategy,
15        commit::EntityRuntimeHooks,
16        cursor::decode_optional_cursor_token,
17        executor::{
18            DeleteExecutor, ExecutablePlan, ExecutionStrategy, ExecutorPlanError, LoadExecutor,
19            SaveExecutor,
20        },
21        query::{
22            builder::aggregate::AggregateExpr, explain::ExplainAggregateTerminalPlan,
23            plan::QueryMode,
24        },
25        schema::{describe_entity_model, show_indexes_for_model},
26        sql::lowering::{SqlCommand, SqlLoweringError, compile_sql_command},
27    },
28    error::{ErrorClass, ErrorOrigin, InternalError},
29    metrics::sink::{MetricsSink, with_metrics_sink},
30    traits::{CanisterKind, EntityKind, EntityValue},
31    value::Value,
32};
33use std::thread::LocalKey;
34
35// Map executor-owned plan-surface failures into query-owned plan errors.
36fn map_executor_plan_error(err: ExecutorPlanError) -> QueryError {
37    match err {
38        ExecutorPlanError::Cursor(err) => QueryError::from(PlanError::from(*err)),
39    }
40}
41
42// Decode one optional external cursor token and map decode failures into the
43// query-plan cursor error boundary.
44fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
45    decode_optional_cursor_token(cursor_token).map_err(|err| QueryError::from(PlanError::from(err)))
46}
47
48// Map SQL frontend parse/lowering failures into query-facing execution errors.
49fn map_sql_lowering_error(err: SqlLoweringError) -> QueryError {
50    QueryError::execute(InternalError::classified(
51        ErrorClass::Unsupported,
52        ErrorOrigin::Query,
53        format!("SQL query is not executable in this release: {err}"),
54    ))
55}
56
///
/// DbSession
///
/// Session-scoped database handle with policy (debug, metrics) and execution routing.
///
/// A session starts with debug disabled and no metrics sink; the `debug` and
/// `metrics_sink` builder methods opt in to either.
///

pub struct DbSession<C: CanisterKind> {
    // Underlying database handle; handed to every executor this session builds.
    db: Db<C>,
    // Debug flag forwarded to executors at construction time.
    debug: bool,
    // Optional sink installed around operations run through `with_metrics`.
    metrics: Option<&'static dyn MetricsSink>,
}
68
69impl<C: CanisterKind> DbSession<C> {
70    /// Construct one session facade for a database handle.
71    #[must_use]
72    pub(crate) const fn new(db: Db<C>) -> Self {
73        Self {
74            db,
75            debug: false,
76            metrics: None,
77        }
78    }
79
80    /// Construct one session facade from store registry and runtime hooks.
81    #[must_use]
82    pub const fn new_with_hooks(
83        store: &'static LocalKey<StoreRegistry>,
84        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
85    ) -> Self {
86        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
87    }
88
89    /// Enable debug execution behavior where supported by executors.
90    #[must_use]
91    pub const fn debug(mut self) -> Self {
92        self.debug = true;
93        self
94    }
95
96    /// Attach one metrics sink for all session-executed operations.
97    #[must_use]
98    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
99        self.metrics = Some(sink);
100        self
101    }
102
103    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
104        if let Some(sink) = self.metrics {
105            with_metrics_sink(sink, f)
106        } else {
107            f()
108        }
109    }
110
111    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
112    fn execute_save_with<E, T, R>(
113        &self,
114        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
115        map: impl FnOnce(T) -> R,
116    ) -> Result<R, InternalError>
117    where
118        E: EntityKind<Canister = C> + EntityValue,
119    {
120        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
121
122        Ok(map(value))
123    }
124
125    // Shared save-facade wrappers keep response shape explicit at call sites.
126    fn execute_save_entity<E>(
127        &self,
128        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
129    ) -> Result<E, InternalError>
130    where
131        E: EntityKind<Canister = C> + EntityValue,
132    {
133        self.execute_save_with(op, std::convert::identity)
134    }
135
136    fn execute_save_batch<E>(
137        &self,
138        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
139    ) -> Result<WriteBatchResponse<E>, InternalError>
140    where
141        E: EntityKind<Canister = C> + EntityValue,
142    {
143        self.execute_save_with(op, WriteBatchResponse::new)
144    }
145
146    fn execute_save_view<E>(
147        &self,
148        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
149    ) -> Result<E::ViewType, InternalError>
150    where
151        E: EntityKind<Canister = C> + EntityValue,
152    {
153        self.execute_save_with(op, std::convert::identity)
154    }
155
156    // ---------------------------------------------------------------------
157    // Query entry points (public, fluent)
158    // ---------------------------------------------------------------------
159
160    /// Start a fluent load query with default missing-row policy (`Ignore`).
161    #[must_use]
162    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
163    where
164        E: EntityKind<Canister = C>,
165    {
166        FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
167    }
168
169    /// Start a fluent load query with explicit missing-row policy.
170    #[must_use]
171    pub const fn load_with_consistency<E>(
172        &self,
173        consistency: MissingRowPolicy,
174    ) -> FluentLoadQuery<'_, E>
175    where
176        E: EntityKind<Canister = C>,
177    {
178        FluentLoadQuery::new(self, Query::new(consistency))
179    }
180
181    /// Build one typed query intent from one reduced SQL statement.
182    ///
183    /// This parser/lowering entrypoint is intentionally constrained to the
184    /// executable subset wired in the current release.
185    pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
186    where
187        E: EntityKind<Canister = C>,
188    {
189        let command = compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
190            .map_err(map_sql_lowering_error)?;
191
192        match command {
193            SqlCommand::Query(query) => Ok(query),
194            SqlCommand::Explain { .. } => Err(QueryError::execute(InternalError::classified(
195                ErrorClass::Unsupported,
196                ErrorOrigin::Query,
197                "query_from_sql does not accept EXPLAIN statements; use explain_sql(...)",
198            ))),
199        }
200    }
201
202    /// Execute one reduced SQL `SELECT *`/`DELETE` statement for entity `E`.
203    pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
204    where
205        E: EntityKind<Canister = C> + EntityValue,
206    {
207        let query = self.query_from_sql::<E>(sql)?;
208        self.execute_query(&query)
209    }
210
211    /// Explain one reduced SQL statement for entity `E`.
212    ///
213    /// Supported modes:
214    /// - `EXPLAIN ...` -> logical plan text
215    /// - `EXPLAIN EXECUTION ...` -> execution descriptor text
216    /// - `EXPLAIN JSON ...` -> execution descriptor canonical JSON
217    pub fn explain_sql<E>(&self, sql: &str) -> Result<String, QueryError>
218    where
219        E: EntityKind<Canister = C> + EntityValue,
220    {
221        let command = compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
222            .map_err(map_sql_lowering_error)?;
223
224        match command {
225            SqlCommand::Query(_) => Err(QueryError::execute(InternalError::classified(
226                ErrorClass::Unsupported,
227                ErrorOrigin::Query,
228                "explain_sql requires an EXPLAIN statement",
229            ))),
230            SqlCommand::Explain { mode, query } => match mode {
231                crate::db::sql::parser::SqlExplainMode::Plan => {
232                    Ok(format!("{:?}", query.explain()?))
233                }
234                crate::db::sql::parser::SqlExplainMode::Execution => query.explain_execution_text(),
235                crate::db::sql::parser::SqlExplainMode::Json => query.explain_execution_json(),
236            },
237        }
238    }
239
240    /// Start a fluent delete query with default missing-row policy (`Ignore`).
241    #[must_use]
242    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
243    where
244        E: EntityKind<Canister = C>,
245    {
246        FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
247    }
248
249    /// Start a fluent delete query with explicit missing-row policy.
250    #[must_use]
251    pub fn delete_with_consistency<E>(
252        &self,
253        consistency: MissingRowPolicy,
254    ) -> FluentDeleteQuery<'_, E>
255    where
256        E: EntityKind<Canister = C>,
257    {
258        FluentDeleteQuery::new(self, Query::new(consistency).delete())
259    }
260
261    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
262    ///
263    /// This terminal bypasses query planning and access routing entirely.
264    #[must_use]
265    pub const fn select_one(&self) -> Value {
266        Value::Int(1)
267    }
268
269    /// Return one stable, human-readable index listing for the entity schema.
270    ///
271    /// Output format mirrors SQL-style introspection:
272    /// - `PRIMARY KEY (field)`
273    /// - `INDEX name (field_a, field_b)`
274    /// - `UNIQUE INDEX name (field_a, field_b)`
275    #[must_use]
276    pub fn show_indexes<E>(&self) -> Vec<String>
277    where
278        E: EntityKind<Canister = C>,
279    {
280        show_indexes_for_model(E::MODEL)
281    }
282
283    /// Return one structured schema description for the entity.
284    ///
285    /// This is a typed `DESCRIBE`-style introspection surface consumed by
286    /// developer tooling and pre-EXPLAIN debugging.
287    #[must_use]
288    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
289    where
290        E: EntityKind<Canister = C>,
291    {
292        describe_entity_model(E::MODEL)
293    }
294
295    /// Build one point-in-time storage report for observability endpoints.
296    pub fn storage_report(
297        &self,
298        name_to_path: &[(&'static str, &'static str)],
299    ) -> Result<StorageReport, InternalError> {
300        self.db.storage_report(name_to_path)
301    }
302
303    // ---------------------------------------------------------------------
304    // Low-level executors (crate-internal; execution primitives)
305    // ---------------------------------------------------------------------
306
307    #[must_use]
308    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
309    where
310        E: EntityKind<Canister = C> + EntityValue,
311    {
312        LoadExecutor::new(self.db, self.debug)
313    }
314
315    #[must_use]
316    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
317    where
318        E: EntityKind<Canister = C> + EntityValue,
319    {
320        DeleteExecutor::new(self.db, self.debug)
321    }
322
323    #[must_use]
324    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
325    where
326        E: EntityKind<Canister = C> + EntityValue,
327    {
328        SaveExecutor::new(self.db, self.debug)
329    }
330
331    // ---------------------------------------------------------------------
332    // Query diagnostics / execution (internal routing)
333    // ---------------------------------------------------------------------
334
335    /// Execute one scalar load/delete query and return materialized response rows.
336    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
337    where
338        E: EntityKind<Canister = C> + EntityValue,
339    {
340        let plan = query.plan()?.into_executable();
341
342        let result = match query.mode() {
343            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
344            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
345        };
346
347        result.map_err(QueryError::execute)
348    }
349
350    // Shared load-query terminal wrapper: build plan, run under metrics, map
351    // execution errors into query-facing errors.
352    pub(in crate::db) fn execute_load_query_with<E, T>(
353        &self,
354        query: &Query<E>,
355        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
356    ) -> Result<T, QueryError>
357    where
358        E: EntityKind<Canister = C> + EntityValue,
359    {
360        let plan = query.plan()?.into_executable();
361
362        self.with_metrics(|| op(self.load_executor::<E>(), plan))
363            .map_err(QueryError::execute)
364    }
365
366    /// Build one trace payload for a query without executing it.
367    ///
368    /// This lightweight surface is intended for developer diagnostics:
369    /// plan hash, access strategy summary, and planner/executor route shape.
370    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
371    where
372        E: EntityKind<Canister = C>,
373    {
374        let compiled = query.plan()?;
375        let explain = compiled.explain();
376        let plan_hash = compiled.plan_hash_hex();
377
378        let executable = compiled.into_executable();
379        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
380        let execution_strategy = match query.mode() {
381            QueryMode::Load(_) => Some(trace_execution_strategy(
382                executable
383                    .execution_strategy()
384                    .map_err(QueryError::execute)?,
385            )),
386            QueryMode::Delete(_) => None,
387        };
388
389        Ok(QueryTracePlan::new(
390            plan_hash,
391            access_strategy,
392            execution_strategy,
393            explain,
394        ))
395    }
396
397    /// Build one aggregate-terminal explain payload without executing the query.
398    pub(crate) fn explain_load_query_terminal_with<E>(
399        query: &Query<E>,
400        aggregate: AggregateExpr,
401    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
402    where
403        E: EntityKind<Canister = C> + EntityValue,
404    {
405        // Phase 1: build one compiled query once and project logical explain output.
406        let compiled = query.plan()?;
407        let query_explain = compiled.explain();
408        let terminal = aggregate.kind();
409
410        // Phase 2: derive the executor route label for this aggregate terminal.
411        let executable = compiled.into_executable();
412        let execution = executable.explain_aggregate_terminal_execution_descriptor(aggregate);
413
414        Ok(ExplainAggregateTerminalPlan::new(
415            query_explain,
416            terminal,
417            execution,
418        ))
419    }
420
421    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
422    pub(crate) fn execute_load_query_paged_with_trace<E>(
423        &self,
424        query: &Query<E>,
425        cursor_token: Option<&str>,
426    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
427    where
428        E: EntityKind<Canister = C> + EntityValue,
429    {
430        // Phase 1: build/validate executable plan and reject grouped plans.
431        let plan = query.plan()?.into_executable();
432        match plan.execution_strategy().map_err(QueryError::execute)? {
433            ExecutionStrategy::PrimaryKey => {
434                return Err(QueryError::execute(
435                    crate::db::error::query_executor_invariant(
436                        "cursor pagination requires explicit or grouped ordering",
437                    ),
438                ));
439            }
440            ExecutionStrategy::Ordered => {}
441            ExecutionStrategy::Grouped => {
442                return Err(QueryError::execute(
443                    crate::db::error::query_executor_invariant(
444                        "grouped plans require execute_grouped(...)",
445                    ),
446                ));
447            }
448        }
449
450        // Phase 2: decode external cursor token and validate it against plan surface.
451        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
452        let cursor = plan
453            .prepare_cursor(cursor_bytes.as_deref())
454            .map_err(map_executor_plan_error)?;
455
456        // Phase 3: execute one traced page and encode outbound continuation token.
457        let (page, trace) = self
458            .with_metrics(|| {
459                self.load_executor::<E>()
460                    .execute_paged_with_cursor_traced(plan, cursor)
461            })
462            .map_err(QueryError::execute)?;
463        let next_cursor = page
464            .next_cursor
465            .map(|token| {
466                let Some(token) = token.as_scalar() else {
467                    return Err(QueryError::execute(
468                        crate::db::error::query_executor_invariant(
469                            "scalar load pagination emitted grouped continuation token",
470                        ),
471                    ));
472                };
473
474                token.encode().map_err(|err| {
475                    QueryError::execute(InternalError::serialize_internal(format!(
476                        "failed to serialize continuation cursor: {err}"
477                    )))
478                })
479            })
480            .transpose()?;
481
482        Ok(PagedLoadExecutionWithTrace::new(
483            page.items,
484            next_cursor,
485            trace,
486        ))
487    }
488
489    /// Execute one grouped query page with optional grouped continuation cursor.
490    ///
491    /// This is the explicit grouped execution boundary; scalar load APIs reject
492    /// grouped plans to preserve scalar response contracts.
493    pub fn execute_grouped<E>(
494        &self,
495        query: &Query<E>,
496        cursor_token: Option<&str>,
497    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
498    where
499        E: EntityKind<Canister = C> + EntityValue,
500    {
501        // Phase 1: build/validate executable plan and require grouped shape.
502        let plan = query.plan()?.into_executable();
503        if !matches!(
504            plan.execution_strategy().map_err(QueryError::execute)?,
505            ExecutionStrategy::Grouped
506        ) {
507            return Err(QueryError::execute(
508                crate::db::error::query_executor_invariant(
509                    "execute_grouped requires grouped logical plans",
510                ),
511            ));
512        }
513
514        // Phase 2: decode external grouped cursor token and validate against plan.
515        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
516        let cursor = plan
517            .prepare_grouped_cursor(cursor_bytes.as_deref())
518            .map_err(map_executor_plan_error)?;
519
520        // Phase 3: execute grouped page and encode outbound grouped continuation token.
521        let (page, trace) = self
522            .with_metrics(|| {
523                self.load_executor::<E>()
524                    .execute_grouped_paged_with_cursor_traced(plan, cursor)
525            })
526            .map_err(QueryError::execute)?;
527        let next_cursor = page
528            .next_cursor
529            .map(|token| {
530                let Some(token) = token.as_grouped() else {
531                    return Err(QueryError::execute(
532                        crate::db::error::query_executor_invariant(
533                            "grouped pagination emitted scalar continuation token",
534                        ),
535                    ));
536                };
537
538                token.encode().map_err(|err| {
539                    QueryError::execute(InternalError::serialize_internal(format!(
540                        "failed to serialize grouped continuation cursor: {err}"
541                    )))
542                })
543            })
544            .transpose()?;
545
546        Ok(PagedGroupedExecutionWithTrace::new(
547            page.rows,
548            next_cursor,
549            trace,
550        ))
551    }
552
553    // ---------------------------------------------------------------------
554    // High-level write API (public, intent-level)
555    // ---------------------------------------------------------------------
556
557    /// Insert one entity row.
558    pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
559    where
560        E: EntityKind<Canister = C> + EntityValue,
561    {
562        self.execute_save_entity(|save| save.insert(entity))
563    }
564
565    /// Insert a single-entity-type batch atomically in one commit window.
566    ///
567    /// If any item fails pre-commit validation, no row in the batch is persisted.
568    ///
569    /// This API is not a multi-entity transaction surface.
570    pub fn insert_many_atomic<E>(
571        &self,
572        entities: impl IntoIterator<Item = E>,
573    ) -> Result<WriteBatchResponse<E>, InternalError>
574    where
575        E: EntityKind<Canister = C> + EntityValue,
576    {
577        self.execute_save_batch(|save| save.insert_many_atomic(entities))
578    }
579
580    /// Insert a batch with explicitly non-atomic semantics.
581    ///
582    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
583    pub fn insert_many_non_atomic<E>(
584        &self,
585        entities: impl IntoIterator<Item = E>,
586    ) -> Result<WriteBatchResponse<E>, InternalError>
587    where
588        E: EntityKind<Canister = C> + EntityValue,
589    {
590        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
591    }
592
593    /// Replace one existing entity row.
594    pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
595    where
596        E: EntityKind<Canister = C> + EntityValue,
597    {
598        self.execute_save_entity(|save| save.replace(entity))
599    }
600
601    /// Replace a single-entity-type batch atomically in one commit window.
602    ///
603    /// If any item fails pre-commit validation, no row in the batch is persisted.
604    ///
605    /// This API is not a multi-entity transaction surface.
606    pub fn replace_many_atomic<E>(
607        &self,
608        entities: impl IntoIterator<Item = E>,
609    ) -> Result<WriteBatchResponse<E>, InternalError>
610    where
611        E: EntityKind<Canister = C> + EntityValue,
612    {
613        self.execute_save_batch(|save| save.replace_many_atomic(entities))
614    }
615
616    /// Replace a batch with explicitly non-atomic semantics.
617    ///
618    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
619    pub fn replace_many_non_atomic<E>(
620        &self,
621        entities: impl IntoIterator<Item = E>,
622    ) -> Result<WriteBatchResponse<E>, InternalError>
623    where
624        E: EntityKind<Canister = C> + EntityValue,
625    {
626        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
627    }
628
629    /// Update one existing entity row.
630    pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
631    where
632        E: EntityKind<Canister = C> + EntityValue,
633    {
634        self.execute_save_entity(|save| save.update(entity))
635    }
636
637    /// Update a single-entity-type batch atomically in one commit window.
638    ///
639    /// If any item fails pre-commit validation, no row in the batch is persisted.
640    ///
641    /// This API is not a multi-entity transaction surface.
642    pub fn update_many_atomic<E>(
643        &self,
644        entities: impl IntoIterator<Item = E>,
645    ) -> Result<WriteBatchResponse<E>, InternalError>
646    where
647        E: EntityKind<Canister = C> + EntityValue,
648    {
649        self.execute_save_batch(|save| save.update_many_atomic(entities))
650    }
651
652    /// Update a batch with explicitly non-atomic semantics.
653    ///
654    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
655    pub fn update_many_non_atomic<E>(
656        &self,
657        entities: impl IntoIterator<Item = E>,
658    ) -> Result<WriteBatchResponse<E>, InternalError>
659    where
660        E: EntityKind<Canister = C> + EntityValue,
661    {
662        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
663    }
664
665    /// Insert one view value and return the stored view.
666    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
667    where
668        E: EntityKind<Canister = C> + EntityValue,
669    {
670        self.execute_save_view::<E>(|save| save.insert_view(view))
671    }
672
673    /// Replace one view value and return the stored view.
674    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
675    where
676        E: EntityKind<Canister = C> + EntityValue,
677    {
678        self.execute_save_view::<E>(|save| save.replace_view(view))
679    }
680
681    /// Update one view value and return the stored view.
682    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
683    where
684        E: EntityKind<Canister = C> + EntityValue,
685    {
686        self.execute_save_view::<E>(|save| save.update_view(view))
687    }
688
689    /// TEST ONLY: clear all registered data and index stores for this database.
690    #[cfg(test)]
691    #[doc(hidden)]
692    pub fn clear_stores_for_tests(&self) {
693        self.db.with_store_registry(|reg| {
694            // Test cleanup only: clearing all stores is set-like and does not
695            // depend on registry iteration order.
696            for (_, store) in reg.iter() {
697                store.with_data_mut(DataStore::clear);
698                store.with_index_mut(IndexStore::clear);
699            }
700        });
701    }
702}
703
704const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
705    match strategy {
706        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
707        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
708        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
709    }
710}
711
712///
713/// TESTS
714///
715
716#[cfg(test)]
717mod tests {
718    use super::*;
719    use crate::{
720        db::{
721            Db,
722            commit::{ensure_recovered, init_commit_store_for_tests},
723            cursor::CursorPlanError,
724            data::DataStore,
725            index::IndexStore,
726            registry::StoreRegistry,
727        },
728        model::field::FieldKind,
729        testing::test_memory,
730        traits::Path,
731        types::Ulid,
732    };
733    use icydb_derive::FieldProjection;
734    use serde::{Deserialize, Serialize};
735    use std::cell::RefCell;
736
    // Test canister fixture; `SessionSqlCanister` is the canister type used by
    // the database handle, store, and entity schema in this test module.
    // NOTE(review): the exact items generated by `test_canister!` are not
    // visible from this file.
    crate::test_canister! {
        ident = SessionSqlCanister,
        commit_memory_id = crate::testing::test_commit_memory_id(),
    }

    // Test store fixture tied to `SessionSqlCanister`; its `PATH` is the key
    // under which the fixture stores are registered in the test registry.
    crate::test_store! {
        ident = SessionSqlStore,
        canister = SessionSqlCanister,
    }
746
747    thread_local! {
748        static SESSION_SQL_DATA_STORE: RefCell<DataStore> =
749            RefCell::new(DataStore::init(test_memory(160)));
750        static SESSION_SQL_INDEX_STORE: RefCell<IndexStore> =
751            RefCell::new(IndexStore::init(test_memory(161)));
752        static SESSION_SQL_STORE_REGISTRY: StoreRegistry = {
753            let mut reg = StoreRegistry::new();
754            reg.register_store(
755                SessionSqlStore::PATH,
756                &SESSION_SQL_DATA_STORE,
757                &SESSION_SQL_INDEX_STORE,
758            )
759            .expect("SQL session test store registration should succeed");
760            reg
761        };
762    }
763
764    static SESSION_SQL_DB: Db<SessionSqlCanister> = Db::new(&SESSION_SQL_STORE_REGISTRY);
765
    ///
    /// SessionSqlEntity
    ///
    /// Test entity used to lock end-to-end reduced SQL session behavior.
    /// Its three fields match the schema declared for it in this test module.
    ///

    #[derive(Clone, Debug, Default, Deserialize, FieldProjection, PartialEq, Serialize)]
    struct SessionSqlEntity {
        // Primary key (declared as the `id` field in the entity schema).
        id: Ulid,
        // Text field.
        name: String,
        // Unsigned integer field.
        age: u64,
    }
778
    // Schema registration for `SessionSqlEntity`: `id` is the primary key,
    // no secondary indexes are declared, and the entity is bound to
    // `SessionSqlStore` / `SessionSqlCanister`.
    // NOTE(review): the trait impls generated by `test_entity_schema!` are not
    // visible from this file.
    crate::test_entity_schema! {
        ident = SessionSqlEntity,
        id = Ulid,
        id_field = id,
        entity_name = "SessionSqlEntity",
        primary_key = "id",
        pk_index = 0,
        fields = [
            ("id", FieldKind::Ulid),
            ("name", FieldKind::Text),
            ("age", FieldKind::Uint),
        ],
        indexes = [],
        store = SessionSqlStore,
        canister = SessionSqlCanister,
    }
795
796    // Reset all session SQL fixture state between tests to preserve deterministic assertions.
797    fn reset_session_sql_store() {
798        init_commit_store_for_tests().expect("commit store init should succeed");
799        ensure_recovered(&SESSION_SQL_DB).expect("write-side recovery should succeed");
800        SESSION_SQL_DATA_STORE.with(|store| store.borrow_mut().clear());
801        SESSION_SQL_INDEX_STORE.with(|store| store.borrow_mut().clear());
802    }
803
804    fn sql_session() -> DbSession<SessionSqlCanister> {
805        DbSession::new(SESSION_SQL_DB)
806    }
807
808    // Assert query-surface cursor errors remain wrapped under QueryError::Plan(PlanError::Cursor).
809    fn assert_query_error_is_cursor_plan(
810        err: QueryError,
811        predicate: impl FnOnce(&CursorPlanError) -> bool,
812    ) {
813        assert!(matches!(
814            err,
815            QueryError::Plan(plan_err)
816                if matches!(
817                    plan_err.as_ref(),
818                    PlanError::Cursor(inner) if predicate(inner.as_ref())
819                )
820        ));
821    }
822
823    // Assert both session conversion paths preserve the same cursor-plan variant payload.
824    fn assert_cursor_mapping_parity(
825        build: impl Fn() -> CursorPlanError,
826        predicate: impl Fn(&CursorPlanError) -> bool + Copy,
827    ) {
828        let mapped_via_executor = map_executor_plan_error(ExecutorPlanError::from(build()));
829        assert_query_error_is_cursor_plan(mapped_via_executor, predicate);
830
831        let mapped_via_plan = QueryError::from(PlanError::from(build()));
832        assert_query_error_is_cursor_plan(mapped_via_plan, predicate);
833    }
834
835    #[test]
836    fn session_cursor_error_mapping_parity_boundary_arity() {
837        assert_cursor_mapping_parity(
838            || CursorPlanError::continuation_cursor_boundary_arity_mismatch(2, 1),
839            |inner| {
840                matches!(
841                    inner,
842                    CursorPlanError::ContinuationCursorBoundaryArityMismatch {
843                        expected: 2,
844                        found: 1
845                    }
846                )
847            },
848        );
849    }
850
851    #[test]
852    fn session_cursor_error_mapping_parity_window_mismatch() {
853        assert_cursor_mapping_parity(
854            || CursorPlanError::continuation_cursor_window_mismatch(8, 3),
855            |inner| {
856                matches!(
857                    inner,
858                    CursorPlanError::ContinuationCursorWindowMismatch {
859                        expected_offset: 8,
860                        actual_offset: 3
861                    }
862                )
863            },
864        );
865    }
866
867    #[test]
868    fn session_cursor_error_mapping_parity_decode_reason() {
869        assert_cursor_mapping_parity(
870            || {
871                CursorPlanError::invalid_continuation_cursor(
872                    crate::db::codec::cursor::CursorDecodeError::OddLength,
873                )
874            },
875            |inner| {
876                matches!(
877                    inner,
878                    CursorPlanError::InvalidContinuationCursor {
879                        reason: crate::db::codec::cursor::CursorDecodeError::OddLength
880                    }
881                )
882            },
883        );
884    }
885
886    #[test]
887    fn session_cursor_error_mapping_parity_primary_key_type_mismatch() {
888        assert_cursor_mapping_parity(
889            || {
890                CursorPlanError::continuation_cursor_primary_key_type_mismatch(
891                    "id",
892                    "ulid",
893                    Some(crate::value::Value::Text("not-a-ulid".to_string())),
894                )
895            },
896            |inner| {
897                matches!(
898                    inner,
899                    CursorPlanError::ContinuationCursorPrimaryKeyTypeMismatch {
900                        field,
901                        expected,
902                        value: Some(crate::value::Value::Text(value))
903                    } if field == "id" && expected == "ulid" && value == "not-a-ulid"
904                )
905            },
906        );
907    }
908
909    #[test]
910    fn session_cursor_error_mapping_parity_matrix_preserves_cursor_variants() {
911        // Keep one matrix-level canary test name so cross-module audit references remain stable.
912        assert_cursor_mapping_parity(
913            || CursorPlanError::continuation_cursor_boundary_arity_mismatch(2, 1),
914            |inner| {
915                matches!(
916                    inner,
917                    CursorPlanError::ContinuationCursorBoundaryArityMismatch {
918                        expected: 2,
919                        found: 1
920                    }
921                )
922            },
923        );
924    }
925
926    #[test]
927    fn execute_sql_select_star_honors_order_limit_offset() {
928        reset_session_sql_store();
929        let session = sql_session();
930
931        session
932            .insert(SessionSqlEntity {
933                id: Ulid::generate(),
934                name: "older".to_string(),
935                age: 37,
936            })
937            .expect("seed insert should succeed");
938        session
939            .insert(SessionSqlEntity {
940                id: Ulid::generate(),
941                name: "younger".to_string(),
942                age: 19,
943            })
944            .expect("seed insert should succeed");
945
946        let response = session
947            .execute_sql::<SessionSqlEntity>(
948                "SELECT * FROM SessionSqlEntity ORDER BY age ASC LIMIT 1 OFFSET 1",
949            )
950            .expect("SELECT * should execute");
951
952        assert_eq!(response.count(), 1, "window should return one row");
953        let row = response
954            .iter()
955            .next()
956            .expect("windowed result should include one row");
957        assert_eq!(
958            row.entity_ref().name,
959            "older",
960            "ordered window should return the second age-ordered row",
961        );
962    }
963
964    #[test]
965    fn execute_sql_delete_honors_predicate_order_and_limit() {
966        reset_session_sql_store();
967        let session = sql_session();
968
969        session
970            .insert(SessionSqlEntity {
971                id: Ulid::generate(),
972                name: "first-minor".to_string(),
973                age: 16,
974            })
975            .expect("seed insert should succeed");
976        session
977            .insert(SessionSqlEntity {
978                id: Ulid::generate(),
979                name: "second-minor".to_string(),
980                age: 17,
981            })
982            .expect("seed insert should succeed");
983        session
984            .insert(SessionSqlEntity {
985                id: Ulid::generate(),
986                name: "adult".to_string(),
987                age: 42,
988            })
989            .expect("seed insert should succeed");
990
991        let deleted = session
992            .execute_sql::<SessionSqlEntity>(
993                "DELETE FROM SessionSqlEntity WHERE age < 20 ORDER BY age ASC LIMIT 1",
994            )
995            .expect("DELETE should execute");
996
997        assert_eq!(deleted.count(), 1, "delete limit should remove one row");
998        assert_eq!(
999            deleted
1000                .iter()
1001                .next()
1002                .expect("deleted row should exist")
1003                .entity_ref()
1004                .age,
1005            16,
1006            "ordered delete should remove the youngest matching row first",
1007        );
1008
1009        let remaining = session
1010            .load::<SessionSqlEntity>()
1011            .order_by("age")
1012            .execute()
1013            .expect("post-delete load should succeed");
1014        let remaining_ages = remaining
1015            .iter()
1016            .map(|row| row.entity_ref().age)
1017            .collect::<Vec<_>>();
1018
1019        assert_eq!(
1020            remaining_ages,
1021            vec![17, 42],
1022            "delete window semantics should preserve non-deleted rows",
1023        );
1024    }
1025
1026    #[test]
1027    fn query_from_sql_rejects_explain_statements() {
1028        reset_session_sql_store();
1029        let session = sql_session();
1030
1031        let err = session
1032            .query_from_sql::<SessionSqlEntity>("EXPLAIN SELECT * FROM SessionSqlEntity")
1033            .expect_err("query_from_sql must reject EXPLAIN statements");
1034
1035        assert!(
1036            matches!(
1037                err,
1038                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1039                    _
1040                ))
1041            ),
1042            "query_from_sql EXPLAIN rejection must map to unsupported execution class",
1043        );
1044    }
1045
1046    #[test]
1047    fn explain_sql_execution_returns_descriptor_text() {
1048        reset_session_sql_store();
1049        let session = sql_session();
1050
1051        let explain = session
1052            .explain_sql::<SessionSqlEntity>(
1053                "EXPLAIN EXECUTION SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
1054            )
1055            .expect("EXPLAIN EXECUTION should succeed");
1056
1057        assert!(
1058            explain.contains("node_id=0"),
1059            "execution explain output should include the root descriptor node id",
1060        );
1061        assert!(
1062            explain.contains("layer="),
1063            "execution explain output should include execution layer annotations",
1064        );
1065    }
1066
1067    #[test]
1068    fn explain_sql_plan_returns_logical_plan_text() {
1069        reset_session_sql_store();
1070        let session = sql_session();
1071
1072        let explain = session
1073            .explain_sql::<SessionSqlEntity>(
1074                "EXPLAIN SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
1075            )
1076            .expect("EXPLAIN should succeed");
1077
1078        assert!(
1079            explain.contains("mode: Load"),
1080            "logical explain text should include query mode projection",
1081        );
1082        assert!(
1083            explain.contains("access:"),
1084            "logical explain text should include projected access shape",
1085        );
1086    }
1087
1088    #[test]
1089    fn explain_sql_json_returns_execution_descriptor_json() {
1090        reset_session_sql_store();
1091        let session = sql_session();
1092
1093        let explain = session
1094            .explain_sql::<SessionSqlEntity>(
1095                "EXPLAIN JSON SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
1096            )
1097            .expect("EXPLAIN JSON should succeed");
1098
1099        assert!(
1100            explain.starts_with('{') && explain.ends_with('}'),
1101            "execution explain JSON should render one JSON object payload",
1102        );
1103        assert!(
1104            explain.contains("\"node_id\":0"),
1105            "execution explain JSON should include the root descriptor node id",
1106        );
1107    }
1108
1109    #[test]
1110    fn explain_sql_rejects_non_explain_statements() {
1111        reset_session_sql_store();
1112        let session = sql_session();
1113
1114        let err = session
1115            .explain_sql::<SessionSqlEntity>("SELECT * FROM SessionSqlEntity")
1116            .expect_err("explain_sql must reject non-EXPLAIN statements");
1117
1118        assert!(
1119            matches!(
1120                err,
1121                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1122                    _
1123                ))
1124            ),
1125            "non-EXPLAIN input must fail as unsupported explain usage",
1126        );
1127    }
1128}