Skip to main content

icydb_core/db/
session.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6#[cfg(test)]
7use crate::db::{DataStore, IndexStore};
8use crate::{
9    db::{
10        Db, EntityResponse, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
11        MissingRowPolicy, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace, PlanError,
12        ProjectionResponse, Query, QueryError, QueryTracePlan, StorageReport, StoreRegistry,
13        TraceExecutionStrategy, WriteBatchResponse,
14        access::AccessStrategy,
15        commit::EntityRuntimeHooks,
16        cursor::decode_optional_cursor_token,
17        executor::{
18            DeleteExecutor, ExecutablePlan, ExecutionStrategy, ExecutorPlanError, LoadExecutor,
19            SaveExecutor,
20        },
21        query::{
22            builder::aggregate::AggregateExpr, explain::ExplainAggregateTerminalPlan,
23            plan::QueryMode,
24        },
25        schema::{describe_entity_model, show_indexes_for_model},
26        sql::lowering::{SqlCommand, SqlLoweringError, compile_sql_command},
27    },
28    error::{ErrorClass, ErrorOrigin, InternalError},
29    metrics::sink::{MetricsSink, with_metrics_sink},
30    traits::{CanisterKind, EntityKind, EntityValue},
31    value::Value,
32};
33use std::thread::LocalKey;
34
35// Map executor-owned plan-surface failures into query-owned plan errors.
36fn map_executor_plan_error(err: ExecutorPlanError) -> QueryError {
37    match err {
38        ExecutorPlanError::Cursor(err) => QueryError::from(PlanError::from(*err)),
39    }
40}
41
42// Decode one optional external cursor token and map decode failures into the
43// query-plan cursor error boundary.
44fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
45    decode_optional_cursor_token(cursor_token).map_err(|err| QueryError::from(PlanError::from(err)))
46}
47
48// Map SQL frontend parse/lowering failures into query-facing execution errors.
49fn map_sql_lowering_error(err: SqlLoweringError) -> QueryError {
50    QueryError::execute(InternalError::classified(
51        ErrorClass::Unsupported,
52        ErrorOrigin::Query,
53        format!("SQL query is not executable in this release: {err}"),
54    ))
55}
56
///
/// DbSession
///
/// Session-scoped database handle with policy (debug, metrics) and execution routing.
///

pub struct DbSession<C: CanisterKind> {
    // Underlying database handle; handed to each executor the session builds.
    db: Db<C>,
    // Debug flag forwarded to executors; `false` by default, set by `debug()`.
    debug: bool,
    // Optional metrics sink; when set, work routed through `with_metrics` runs under it.
    metrics: Option<&'static dyn MetricsSink>,
}
68
69impl<C: CanisterKind> DbSession<C> {
70    /// Construct one session facade for a database handle.
71    #[must_use]
72    pub(crate) const fn new(db: Db<C>) -> Self {
73        Self {
74            db,
75            debug: false,
76            metrics: None,
77        }
78    }
79
80    /// Construct one session facade from store registry and runtime hooks.
81    #[must_use]
82    pub const fn new_with_hooks(
83        store: &'static LocalKey<StoreRegistry>,
84        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
85    ) -> Self {
86        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
87    }
88
89    /// Enable debug execution behavior where supported by executors.
90    #[must_use]
91    pub const fn debug(mut self) -> Self {
92        self.debug = true;
93        self
94    }
95
96    /// Attach one metrics sink for all session-executed operations.
97    #[must_use]
98    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
99        self.metrics = Some(sink);
100        self
101    }
102
103    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
104        if let Some(sink) = self.metrics {
105            with_metrics_sink(sink, f)
106        } else {
107            f()
108        }
109    }
110
111    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
112    fn execute_save_with<E, T, R>(
113        &self,
114        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
115        map: impl FnOnce(T) -> R,
116    ) -> Result<R, InternalError>
117    where
118        E: EntityKind<Canister = C> + EntityValue,
119    {
120        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
121
122        Ok(map(value))
123    }
124
125    // Shared save-facade wrappers keep response shape explicit at call sites.
126    fn execute_save_entity<E>(
127        &self,
128        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
129    ) -> Result<E, InternalError>
130    where
131        E: EntityKind<Canister = C> + EntityValue,
132    {
133        self.execute_save_with(op, std::convert::identity)
134    }
135
136    fn execute_save_batch<E>(
137        &self,
138        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
139    ) -> Result<WriteBatchResponse<E>, InternalError>
140    where
141        E: EntityKind<Canister = C> + EntityValue,
142    {
143        self.execute_save_with(op, WriteBatchResponse::new)
144    }
145
146    fn execute_save_view<E>(
147        &self,
148        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
149    ) -> Result<E::ViewType, InternalError>
150    where
151        E: EntityKind<Canister = C> + EntityValue,
152    {
153        self.execute_save_with(op, std::convert::identity)
154    }
155
156    // ---------------------------------------------------------------------
157    // Query entry points (public, fluent)
158    // ---------------------------------------------------------------------
159
160    /// Start a fluent load query with default missing-row policy (`Ignore`).
161    #[must_use]
162    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
163    where
164        E: EntityKind<Canister = C>,
165    {
166        FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
167    }
168
169    /// Start a fluent load query with explicit missing-row policy.
170    #[must_use]
171    pub const fn load_with_consistency<E>(
172        &self,
173        consistency: MissingRowPolicy,
174    ) -> FluentLoadQuery<'_, E>
175    where
176        E: EntityKind<Canister = C>,
177    {
178        FluentLoadQuery::new(self, Query::new(consistency))
179    }
180
181    /// Build one typed query intent from one reduced SQL statement.
182    ///
183    /// This parser/lowering entrypoint is intentionally constrained to the
184    /// executable subset wired in the current release.
185    pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
186    where
187        E: EntityKind<Canister = C>,
188    {
189        let command = compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
190            .map_err(map_sql_lowering_error)?;
191
192        match command {
193            SqlCommand::Query(query) => Ok(query),
194            SqlCommand::Explain { .. } => Err(QueryError::execute(InternalError::classified(
195                ErrorClass::Unsupported,
196                ErrorOrigin::Query,
197                "query_from_sql does not accept EXPLAIN statements; use explain_sql(...)",
198            ))),
199        }
200    }
201
202    /// Execute one reduced SQL `SELECT`/`DELETE` statement for entity `E`.
203    pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
204    where
205        E: EntityKind<Canister = C> + EntityValue,
206    {
207        let query = self.query_from_sql::<E>(sql)?;
208        self.execute_query(&query)
209    }
210
211    /// Execute one reduced SQL `SELECT` statement and return projection-shaped rows.
212    ///
213    /// This surface keeps `execute_sql(...)` backwards-compatible for callers
214    /// that currently consume full entity rows.
215    pub fn execute_sql_projection<E>(&self, sql: &str) -> Result<ProjectionResponse<E>, QueryError>
216    where
217        E: EntityKind<Canister = C> + EntityValue,
218    {
219        let query = self.query_from_sql::<E>(sql)?;
220        match query.mode() {
221            QueryMode::Load(_) => {
222                self.execute_load_query_with(&query, |load, plan| load.execute_projection(plan))
223            }
224            QueryMode::Delete(_) => Err(QueryError::execute(InternalError::classified(
225                ErrorClass::Unsupported,
226                ErrorOrigin::Query,
227                "execute_sql_projection only supports SELECT statements",
228            ))),
229        }
230    }
231
232    /// Explain one reduced SQL statement for entity `E`.
233    ///
234    /// Supported modes:
235    /// - `EXPLAIN ...` -> logical plan text
236    /// - `EXPLAIN EXECUTION ...` -> execution descriptor text
237    /// - `EXPLAIN JSON ...` -> logical plan canonical JSON
238    pub fn explain_sql<E>(&self, sql: &str) -> Result<String, QueryError>
239    where
240        E: EntityKind<Canister = C> + EntityValue,
241    {
242        let command = compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
243            .map_err(map_sql_lowering_error)?;
244
245        match command {
246            SqlCommand::Query(_) => Err(QueryError::execute(InternalError::classified(
247                ErrorClass::Unsupported,
248                ErrorOrigin::Query,
249                "explain_sql requires an EXPLAIN statement",
250            ))),
251            SqlCommand::Explain { mode, query } => match mode {
252                crate::db::sql::parser::SqlExplainMode::Plan => {
253                    Ok(query.explain()?.render_text_canonical())
254                }
255                crate::db::sql::parser::SqlExplainMode::Execution => query.explain_execution_text(),
256                crate::db::sql::parser::SqlExplainMode::Json => {
257                    Ok(query.explain()?.render_json_canonical())
258                }
259            },
260        }
261    }
262
263    /// Start a fluent delete query with default missing-row policy (`Ignore`).
264    #[must_use]
265    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
266    where
267        E: EntityKind<Canister = C>,
268    {
269        FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
270    }
271
272    /// Start a fluent delete query with explicit missing-row policy.
273    #[must_use]
274    pub fn delete_with_consistency<E>(
275        &self,
276        consistency: MissingRowPolicy,
277    ) -> FluentDeleteQuery<'_, E>
278    where
279        E: EntityKind<Canister = C>,
280    {
281        FluentDeleteQuery::new(self, Query::new(consistency).delete())
282    }
283
    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
    ///
    /// This terminal bypasses query planning and access routing entirely.
    ///
    /// The result is always `Value::Int(1)`; no session state is read.
    #[must_use]
    pub const fn select_one(&self) -> Value {
        Value::Int(1)
    }
291
292    /// Return one stable, human-readable index listing for the entity schema.
293    ///
294    /// Output format mirrors SQL-style introspection:
295    /// - `PRIMARY KEY (field)`
296    /// - `INDEX name (field_a, field_b)`
297    /// - `UNIQUE INDEX name (field_a, field_b)`
298    #[must_use]
299    pub fn show_indexes<E>(&self) -> Vec<String>
300    where
301        E: EntityKind<Canister = C>,
302    {
303        show_indexes_for_model(E::MODEL)
304    }
305
306    /// Return one structured schema description for the entity.
307    ///
308    /// This is a typed `DESCRIBE`-style introspection surface consumed by
309    /// developer tooling and pre-EXPLAIN debugging.
310    #[must_use]
311    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
312    where
313        E: EntityKind<Canister = C>,
314    {
315        describe_entity_model(E::MODEL)
316    }
317
318    /// Build one point-in-time storage report for observability endpoints.
319    pub fn storage_report(
320        &self,
321        name_to_path: &[(&'static str, &'static str)],
322    ) -> Result<StorageReport, InternalError> {
323        self.db.storage_report(name_to_path)
324    }
325
326    // ---------------------------------------------------------------------
327    // Low-level executors (crate-internal; execution primitives)
328    // ---------------------------------------------------------------------
329
330    #[must_use]
331    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
332    where
333        E: EntityKind<Canister = C> + EntityValue,
334    {
335        LoadExecutor::new(self.db, self.debug)
336    }
337
338    #[must_use]
339    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
340    where
341        E: EntityKind<Canister = C> + EntityValue,
342    {
343        DeleteExecutor::new(self.db, self.debug)
344    }
345
346    #[must_use]
347    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
348    where
349        E: EntityKind<Canister = C> + EntityValue,
350    {
351        SaveExecutor::new(self.db, self.debug)
352    }
353
354    // ---------------------------------------------------------------------
355    // Query diagnostics / execution (internal routing)
356    // ---------------------------------------------------------------------
357
358    /// Execute one scalar load/delete query and return materialized response rows.
359    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
360    where
361        E: EntityKind<Canister = C> + EntityValue,
362    {
363        let plan = query.plan()?.into_executable();
364
365        let result = match query.mode() {
366            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
367            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
368        };
369
370        result.map_err(QueryError::execute)
371    }
372
373    // Shared load-query terminal wrapper: build plan, run under metrics, map
374    // execution errors into query-facing errors.
375    pub(in crate::db) fn execute_load_query_with<E, T>(
376        &self,
377        query: &Query<E>,
378        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
379    ) -> Result<T, QueryError>
380    where
381        E: EntityKind<Canister = C> + EntityValue,
382    {
383        let plan = query.plan()?.into_executable();
384
385        self.with_metrics(|| op(self.load_executor::<E>(), plan))
386            .map_err(QueryError::execute)
387    }
388
389    /// Build one trace payload for a query without executing it.
390    ///
391    /// This lightweight surface is intended for developer diagnostics:
392    /// plan hash, access strategy summary, and planner/executor route shape.
393    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
394    where
395        E: EntityKind<Canister = C>,
396    {
397        let compiled = query.plan()?;
398        let explain = compiled.explain();
399        let plan_hash = compiled.plan_hash_hex();
400
401        let executable = compiled.into_executable();
402        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
403        let execution_strategy = match query.mode() {
404            QueryMode::Load(_) => Some(trace_execution_strategy(
405                executable
406                    .execution_strategy()
407                    .map_err(QueryError::execute)?,
408            )),
409            QueryMode::Delete(_) => None,
410        };
411
412        Ok(QueryTracePlan::new(
413            plan_hash,
414            access_strategy,
415            execution_strategy,
416            explain,
417        ))
418    }
419
420    /// Build one aggregate-terminal explain payload without executing the query.
421    pub(crate) fn explain_load_query_terminal_with<E>(
422        query: &Query<E>,
423        aggregate: AggregateExpr,
424    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
425    where
426        E: EntityKind<Canister = C> + EntityValue,
427    {
428        // Phase 1: build one compiled query once and project logical explain output.
429        let compiled = query.plan()?;
430        let query_explain = compiled.explain();
431        let terminal = aggregate.kind();
432
433        // Phase 2: derive the executor route label for this aggregate terminal.
434        let executable = compiled.into_executable();
435        let execution = executable.explain_aggregate_terminal_execution_descriptor(aggregate);
436
437        Ok(ExplainAggregateTerminalPlan::new(
438            query_explain,
439            terminal,
440            execution,
441        ))
442    }
443
444    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
445    pub(crate) fn execute_load_query_paged_with_trace<E>(
446        &self,
447        query: &Query<E>,
448        cursor_token: Option<&str>,
449    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
450    where
451        E: EntityKind<Canister = C> + EntityValue,
452    {
453        // Phase 1: build/validate executable plan and reject grouped plans.
454        let plan = query.plan()?.into_executable();
455        match plan.execution_strategy().map_err(QueryError::execute)? {
456            ExecutionStrategy::PrimaryKey => {
457                return Err(QueryError::execute(
458                    crate::db::error::query_executor_invariant(
459                        "cursor pagination requires explicit or grouped ordering",
460                    ),
461                ));
462            }
463            ExecutionStrategy::Ordered => {}
464            ExecutionStrategy::Grouped => {
465                return Err(QueryError::execute(
466                    crate::db::error::query_executor_invariant(
467                        "grouped plans require execute_grouped(...)",
468                    ),
469                ));
470            }
471        }
472
473        // Phase 2: decode external cursor token and validate it against plan surface.
474        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
475        let cursor = plan
476            .prepare_cursor(cursor_bytes.as_deref())
477            .map_err(map_executor_plan_error)?;
478
479        // Phase 3: execute one traced page and encode outbound continuation token.
480        let (page, trace) = self
481            .with_metrics(|| {
482                self.load_executor::<E>()
483                    .execute_paged_with_cursor_traced(plan, cursor)
484            })
485            .map_err(QueryError::execute)?;
486        let next_cursor = page
487            .next_cursor
488            .map(|token| {
489                let Some(token) = token.as_scalar() else {
490                    return Err(QueryError::execute(
491                        crate::db::error::query_executor_invariant(
492                            "scalar load pagination emitted grouped continuation token",
493                        ),
494                    ));
495                };
496
497                token.encode().map_err(|err| {
498                    QueryError::execute(InternalError::serialize_internal(format!(
499                        "failed to serialize continuation cursor: {err}"
500                    )))
501                })
502            })
503            .transpose()?;
504
505        Ok(PagedLoadExecutionWithTrace::new(
506            page.items,
507            next_cursor,
508            trace,
509        ))
510    }
511
512    /// Execute one grouped query page with optional grouped continuation cursor.
513    ///
514    /// This is the explicit grouped execution boundary; scalar load APIs reject
515    /// grouped plans to preserve scalar response contracts.
516    pub fn execute_grouped<E>(
517        &self,
518        query: &Query<E>,
519        cursor_token: Option<&str>,
520    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
521    where
522        E: EntityKind<Canister = C> + EntityValue,
523    {
524        // Phase 1: build/validate executable plan and require grouped shape.
525        let plan = query.plan()?.into_executable();
526        if !matches!(
527            plan.execution_strategy().map_err(QueryError::execute)?,
528            ExecutionStrategy::Grouped
529        ) {
530            return Err(QueryError::execute(
531                crate::db::error::query_executor_invariant(
532                    "execute_grouped requires grouped logical plans",
533                ),
534            ));
535        }
536
537        // Phase 2: decode external grouped cursor token and validate against plan.
538        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
539        let cursor = plan
540            .prepare_grouped_cursor(cursor_bytes.as_deref())
541            .map_err(map_executor_plan_error)?;
542
543        // Phase 3: execute grouped page and encode outbound grouped continuation token.
544        let (page, trace) = self
545            .with_metrics(|| {
546                self.load_executor::<E>()
547                    .execute_grouped_paged_with_cursor_traced(plan, cursor)
548            })
549            .map_err(QueryError::execute)?;
550        let next_cursor = page
551            .next_cursor
552            .map(|token| {
553                let Some(token) = token.as_grouped() else {
554                    return Err(QueryError::execute(
555                        crate::db::error::query_executor_invariant(
556                            "grouped pagination emitted scalar continuation token",
557                        ),
558                    ));
559                };
560
561                token.encode().map_err(|err| {
562                    QueryError::execute(InternalError::serialize_internal(format!(
563                        "failed to serialize grouped continuation cursor: {err}"
564                    )))
565                })
566            })
567            .transpose()?;
568
569        Ok(PagedGroupedExecutionWithTrace::new(
570            page.rows,
571            next_cursor,
572            trace,
573        ))
574    }
575
576    // ---------------------------------------------------------------------
577    // High-level write API (public, intent-level)
578    // ---------------------------------------------------------------------
579
580    /// Insert one entity row.
581    pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
582    where
583        E: EntityKind<Canister = C> + EntityValue,
584    {
585        self.execute_save_entity(|save| save.insert(entity))
586    }
587
588    /// Insert a single-entity-type batch atomically in one commit window.
589    ///
590    /// If any item fails pre-commit validation, no row in the batch is persisted.
591    ///
592    /// This API is not a multi-entity transaction surface.
593    pub fn insert_many_atomic<E>(
594        &self,
595        entities: impl IntoIterator<Item = E>,
596    ) -> Result<WriteBatchResponse<E>, InternalError>
597    where
598        E: EntityKind<Canister = C> + EntityValue,
599    {
600        self.execute_save_batch(|save| save.insert_many_atomic(entities))
601    }
602
603    /// Insert a batch with explicitly non-atomic semantics.
604    ///
605    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
606    pub fn insert_many_non_atomic<E>(
607        &self,
608        entities: impl IntoIterator<Item = E>,
609    ) -> Result<WriteBatchResponse<E>, InternalError>
610    where
611        E: EntityKind<Canister = C> + EntityValue,
612    {
613        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
614    }
615
616    /// Replace one existing entity row.
617    pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
618    where
619        E: EntityKind<Canister = C> + EntityValue,
620    {
621        self.execute_save_entity(|save| save.replace(entity))
622    }
623
624    /// Replace a single-entity-type batch atomically in one commit window.
625    ///
626    /// If any item fails pre-commit validation, no row in the batch is persisted.
627    ///
628    /// This API is not a multi-entity transaction surface.
629    pub fn replace_many_atomic<E>(
630        &self,
631        entities: impl IntoIterator<Item = E>,
632    ) -> Result<WriteBatchResponse<E>, InternalError>
633    where
634        E: EntityKind<Canister = C> + EntityValue,
635    {
636        self.execute_save_batch(|save| save.replace_many_atomic(entities))
637    }
638
639    /// Replace a batch with explicitly non-atomic semantics.
640    ///
641    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
642    pub fn replace_many_non_atomic<E>(
643        &self,
644        entities: impl IntoIterator<Item = E>,
645    ) -> Result<WriteBatchResponse<E>, InternalError>
646    where
647        E: EntityKind<Canister = C> + EntityValue,
648    {
649        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
650    }
651
652    /// Update one existing entity row.
653    pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
654    where
655        E: EntityKind<Canister = C> + EntityValue,
656    {
657        self.execute_save_entity(|save| save.update(entity))
658    }
659
660    /// Update a single-entity-type batch atomically in one commit window.
661    ///
662    /// If any item fails pre-commit validation, no row in the batch is persisted.
663    ///
664    /// This API is not a multi-entity transaction surface.
665    pub fn update_many_atomic<E>(
666        &self,
667        entities: impl IntoIterator<Item = E>,
668    ) -> Result<WriteBatchResponse<E>, InternalError>
669    where
670        E: EntityKind<Canister = C> + EntityValue,
671    {
672        self.execute_save_batch(|save| save.update_many_atomic(entities))
673    }
674
675    /// Update a batch with explicitly non-atomic semantics.
676    ///
677    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
678    pub fn update_many_non_atomic<E>(
679        &self,
680        entities: impl IntoIterator<Item = E>,
681    ) -> Result<WriteBatchResponse<E>, InternalError>
682    where
683        E: EntityKind<Canister = C> + EntityValue,
684    {
685        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
686    }
687
688    /// Insert one view value and return the stored view.
689    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
690    where
691        E: EntityKind<Canister = C> + EntityValue,
692    {
693        self.execute_save_view::<E>(|save| save.insert_view(view))
694    }
695
696    /// Replace one view value and return the stored view.
697    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
698    where
699        E: EntityKind<Canister = C> + EntityValue,
700    {
701        self.execute_save_view::<E>(|save| save.replace_view(view))
702    }
703
704    /// Update one view value and return the stored view.
705    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
706    where
707        E: EntityKind<Canister = C> + EntityValue,
708    {
709        self.execute_save_view::<E>(|save| save.update_view(view))
710    }
711
712    /// TEST ONLY: clear all registered data and index stores for this database.
713    #[cfg(test)]
714    #[doc(hidden)]
715    pub fn clear_stores_for_tests(&self) {
716        self.db.with_store_registry(|reg| {
717            // Test cleanup only: clearing all stores is set-like and does not
718            // depend on registry iteration order.
719            for (_, store) in reg.iter() {
720                store.with_data_mut(DataStore::clear);
721                store.with_index_mut(IndexStore::clear);
722            }
723        });
724    }
725}
726
727const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
728    match strategy {
729        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
730        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
731        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
732    }
733}
734
735///
736/// TESTS
737///
738
739#[cfg(test)]
740mod tests {
741    use super::*;
742    use crate::{
743        db::{
744            Db,
745            commit::{ensure_recovered, init_commit_store_for_tests},
746            cursor::CursorPlanError,
747            data::DataStore,
748            index::IndexStore,
749            query::plan::expr::{Expr, ProjectionField},
750            registry::StoreRegistry,
751        },
752        model::field::FieldKind,
753        testing::test_memory,
754        traits::Path,
755        types::Ulid,
756        value::Value,
757    };
758    use icydb_derive::FieldProjection;
759    use serde::{Deserialize, Serialize};
760    use std::cell::RefCell;
761
762    crate::test_canister! {
763        ident = SessionSqlCanister,
764        commit_memory_id = crate::testing::test_commit_memory_id(),
765    }
766
767    crate::test_store! {
768        ident = SessionSqlStore,
769        canister = SessionSqlCanister,
770    }
771
772    thread_local! {
773        static SESSION_SQL_DATA_STORE: RefCell<DataStore> =
774            RefCell::new(DataStore::init(test_memory(160)));
775        static SESSION_SQL_INDEX_STORE: RefCell<IndexStore> =
776            RefCell::new(IndexStore::init(test_memory(161)));
777        static SESSION_SQL_STORE_REGISTRY: StoreRegistry = {
778            let mut reg = StoreRegistry::new();
779            reg.register_store(
780                SessionSqlStore::PATH,
781                &SESSION_SQL_DATA_STORE,
782                &SESSION_SQL_INDEX_STORE,
783            )
784            .expect("SQL session test store registration should succeed");
785            reg
786        };
787    }
788
789    static SESSION_SQL_DB: Db<SessionSqlCanister> = Db::new(&SESSION_SQL_STORE_REGISTRY);
790
791    ///
792    /// SessionSqlEntity
793    ///
794    /// Test entity used to lock end-to-end reduced SQL session behavior.
795    ///
796
797    #[derive(Clone, Debug, Default, Deserialize, FieldProjection, PartialEq, Serialize)]
798    struct SessionSqlEntity {
799        id: Ulid,
800        name: String,
801        age: u64,
802    }
803
804    crate::test_entity_schema! {
805        ident = SessionSqlEntity,
806        id = Ulid,
807        id_field = id,
808        entity_name = "SessionSqlEntity",
809        primary_key = "id",
810        pk_index = 0,
811        fields = [
812            ("id", FieldKind::Ulid),
813            ("name", FieldKind::Text),
814            ("age", FieldKind::Uint),
815        ],
816        indexes = [],
817        store = SessionSqlStore,
818        canister = SessionSqlCanister,
819    }
820
    // Reset all session SQL fixture state between tests to preserve deterministic assertions.
    //
    // Steps, in order: initialize the commit store, run write-side recovery
    // against the fixture database, then clear the fixture data and index
    // stores. Each fallible step panics via `expect` on failure.
    fn reset_session_sql_store() {
        init_commit_store_for_tests().expect("commit store init should succeed");
        ensure_recovered(&SESSION_SQL_DB).expect("write-side recovery should succeed");
        SESSION_SQL_DATA_STORE.with(|store| store.borrow_mut().clear());
        SESSION_SQL_INDEX_STORE.with(|store| store.borrow_mut().clear());
    }
828
    // Build a session over the shared fixture database with default policy
    // (`DbSession::new` leaves debug off and attaches no metrics sink).
    fn sql_session() -> DbSession<SessionSqlCanister> {
        DbSession::new(SESSION_SQL_DB)
    }
832
    // Assert query-surface cursor errors remain wrapped under QueryError::Plan(PlanError::Cursor).
    //
    // `predicate` receives the inner `CursorPlanError` and must return `true`
    // for the assertion to pass; any other error shape fails the assertion.
    fn assert_query_error_is_cursor_plan(
        err: QueryError,
        predicate: impl FnOnce(&CursorPlanError) -> bool,
    ) {
        assert!(matches!(
            err,
            QueryError::Plan(plan_err)
                if matches!(
                    plan_err.as_ref(),
                    PlanError::Cursor(inner) if predicate(inner.as_ref())
                )
        ));
    }
847
848    // Assert both session conversion paths preserve the same cursor-plan variant payload.
849    fn assert_cursor_mapping_parity(
850        build: impl Fn() -> CursorPlanError,
851        predicate: impl Fn(&CursorPlanError) -> bool + Copy,
852    ) {
853        let mapped_via_executor = map_executor_plan_error(ExecutorPlanError::from(build()));
854        assert_query_error_is_cursor_plan(mapped_via_executor, predicate);
855
856        let mapped_via_plan = QueryError::from(PlanError::from(build()));
857        assert_query_error_is_cursor_plan(mapped_via_plan, predicate);
858    }
859
860    #[test]
861    fn session_cursor_error_mapping_parity_boundary_arity() {
862        assert_cursor_mapping_parity(
863            || CursorPlanError::continuation_cursor_boundary_arity_mismatch(2, 1),
864            |inner| {
865                matches!(
866                    inner,
867                    CursorPlanError::ContinuationCursorBoundaryArityMismatch {
868                        expected: 2,
869                        found: 1
870                    }
871                )
872            },
873        );
874    }
875
876    #[test]
877    fn session_cursor_error_mapping_parity_window_mismatch() {
878        assert_cursor_mapping_parity(
879            || CursorPlanError::continuation_cursor_window_mismatch(8, 3),
880            |inner| {
881                matches!(
882                    inner,
883                    CursorPlanError::ContinuationCursorWindowMismatch {
884                        expected_offset: 8,
885                        actual_offset: 3
886                    }
887                )
888            },
889        );
890    }
891
892    #[test]
893    fn session_cursor_error_mapping_parity_decode_reason() {
894        assert_cursor_mapping_parity(
895            || {
896                CursorPlanError::invalid_continuation_cursor(
897                    crate::db::codec::cursor::CursorDecodeError::OddLength,
898                )
899            },
900            |inner| {
901                matches!(
902                    inner,
903                    CursorPlanError::InvalidContinuationCursor {
904                        reason: crate::db::codec::cursor::CursorDecodeError::OddLength
905                    }
906                )
907            },
908        );
909    }
910
911    #[test]
912    fn session_cursor_error_mapping_parity_primary_key_type_mismatch() {
913        assert_cursor_mapping_parity(
914            || {
915                CursorPlanError::continuation_cursor_primary_key_type_mismatch(
916                    "id",
917                    "ulid",
918                    Some(crate::value::Value::Text("not-a-ulid".to_string())),
919                )
920            },
921            |inner| {
922                matches!(
923                    inner,
924                    CursorPlanError::ContinuationCursorPrimaryKeyTypeMismatch {
925                        field,
926                        expected,
927                        value: Some(crate::value::Value::Text(value))
928                    } if field == "id" && expected == "ulid" && value == "not-a-ulid"
929                )
930            },
931        );
932    }
933
934    #[test]
935    fn session_cursor_error_mapping_parity_matrix_preserves_cursor_variants() {
936        // Keep one matrix-level canary test name so cross-module audit references remain stable.
937        assert_cursor_mapping_parity(
938            || CursorPlanError::continuation_cursor_boundary_arity_mismatch(2, 1),
939            |inner| {
940                matches!(
941                    inner,
942                    CursorPlanError::ContinuationCursorBoundaryArityMismatch {
943                        expected: 2,
944                        found: 1
945                    }
946                )
947            },
948        );
949    }
950
951    #[test]
952    fn execute_sql_select_star_honors_order_limit_offset() {
953        reset_session_sql_store();
954        let session = sql_session();
955
956        session
957            .insert(SessionSqlEntity {
958                id: Ulid::generate(),
959                name: "older".to_string(),
960                age: 37,
961            })
962            .expect("seed insert should succeed");
963        session
964            .insert(SessionSqlEntity {
965                id: Ulid::generate(),
966                name: "younger".to_string(),
967                age: 19,
968            })
969            .expect("seed insert should succeed");
970
971        let response = session
972            .execute_sql::<SessionSqlEntity>(
973                "SELECT * FROM SessionSqlEntity ORDER BY age ASC LIMIT 1 OFFSET 1",
974            )
975            .expect("SELECT * should execute");
976
977        assert_eq!(response.count(), 1, "window should return one row");
978        let row = response
979            .iter()
980            .next()
981            .expect("windowed result should include one row");
982        assert_eq!(
983            row.entity_ref().name,
984            "older",
985            "ordered window should return the second age-ordered row",
986        );
987    }
988
989    #[test]
990    fn execute_sql_delete_honors_predicate_order_and_limit() {
991        reset_session_sql_store();
992        let session = sql_session();
993
994        session
995            .insert(SessionSqlEntity {
996                id: Ulid::generate(),
997                name: "first-minor".to_string(),
998                age: 16,
999            })
1000            .expect("seed insert should succeed");
1001        session
1002            .insert(SessionSqlEntity {
1003                id: Ulid::generate(),
1004                name: "second-minor".to_string(),
1005                age: 17,
1006            })
1007            .expect("seed insert should succeed");
1008        session
1009            .insert(SessionSqlEntity {
1010                id: Ulid::generate(),
1011                name: "adult".to_string(),
1012                age: 42,
1013            })
1014            .expect("seed insert should succeed");
1015
1016        let deleted = session
1017            .execute_sql::<SessionSqlEntity>(
1018                "DELETE FROM SessionSqlEntity WHERE age < 20 ORDER BY age ASC LIMIT 1",
1019            )
1020            .expect("DELETE should execute");
1021
1022        assert_eq!(deleted.count(), 1, "delete limit should remove one row");
1023        assert_eq!(
1024            deleted
1025                .iter()
1026                .next()
1027                .expect("deleted row should exist")
1028                .entity_ref()
1029                .age,
1030            16,
1031            "ordered delete should remove the youngest matching row first",
1032        );
1033
1034        let remaining = session
1035            .load::<SessionSqlEntity>()
1036            .order_by("age")
1037            .execute()
1038            .expect("post-delete load should succeed");
1039        let remaining_ages = remaining
1040            .iter()
1041            .map(|row| row.entity_ref().age)
1042            .collect::<Vec<_>>();
1043
1044        assert_eq!(
1045            remaining_ages,
1046            vec![17, 42],
1047            "delete window semantics should preserve non-deleted rows",
1048        );
1049    }
1050
1051    #[test]
1052    fn query_from_sql_rejects_explain_statements() {
1053        reset_session_sql_store();
1054        let session = sql_session();
1055
1056        let err = session
1057            .query_from_sql::<SessionSqlEntity>("EXPLAIN SELECT * FROM SessionSqlEntity")
1058            .expect_err("query_from_sql must reject EXPLAIN statements");
1059
1060        assert!(
1061            matches!(
1062                err,
1063                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1064                    _
1065                ))
1066            ),
1067            "query_from_sql EXPLAIN rejection must map to unsupported execution class",
1068        );
1069    }
1070
1071    #[test]
1072    fn query_from_sql_select_field_projection_lowers_to_scalar_field_selection() {
1073        reset_session_sql_store();
1074        let session = sql_session();
1075
1076        let query = session
1077            .query_from_sql::<SessionSqlEntity>("SELECT name, age FROM SessionSqlEntity")
1078            .expect("field-list SQL query should lower");
1079        let projection = query
1080            .plan()
1081            .expect("field-list SQL plan should build")
1082            .projection_spec();
1083        let field_names = projection
1084            .fields()
1085            .map(|field| match field {
1086                ProjectionField::Scalar {
1087                    expr: Expr::Field(field),
1088                    alias: None,
1089                } => field.as_str().to_string(),
1090                other @ ProjectionField::Scalar { .. } => {
1091                    panic!("field-list SQL projection should lower to plain field exprs: {other:?}")
1092                }
1093            })
1094            .collect::<Vec<_>>();
1095
1096        assert_eq!(field_names, vec!["name".to_string(), "age".to_string()]);
1097    }
1098
1099    #[test]
1100    fn execute_sql_select_field_projection_currently_returns_entity_shaped_rows() {
1101        reset_session_sql_store();
1102        let session = sql_session();
1103
1104        session
1105            .insert(SessionSqlEntity {
1106                id: Ulid::generate(),
1107                name: "projected-row".to_string(),
1108                age: 29,
1109            })
1110            .expect("seed insert should succeed");
1111
1112        let response = session
1113            .execute_sql::<SessionSqlEntity>(
1114                "SELECT name FROM SessionSqlEntity ORDER BY age ASC LIMIT 1",
1115            )
1116            .expect("field-list SQL projection should execute");
1117        let row = response
1118            .iter()
1119            .next()
1120            .expect("field-list SQL projection response should contain one row");
1121
1122        assert_eq!(
1123            row.entity_ref().name,
1124            "projected-row",
1125            "field-list SQL projection should still return entity rows in this baseline",
1126        );
1127        assert_eq!(
1128            row.entity_ref().age,
1129            29,
1130            "field-list SQL projection should preserve full entity payload until projection response shaping is introduced",
1131        );
1132    }
1133
1134    #[test]
1135    fn execute_sql_projection_select_field_list_returns_projection_shaped_rows() {
1136        reset_session_sql_store();
1137        let session = sql_session();
1138
1139        session
1140            .insert(SessionSqlEntity {
1141                id: Ulid::generate(),
1142                name: "projection-surface".to_string(),
1143                age: 33,
1144            })
1145            .expect("seed insert should succeed");
1146
1147        let response = session
1148            .execute_sql_projection::<SessionSqlEntity>(
1149                "SELECT name FROM SessionSqlEntity ORDER BY age ASC LIMIT 1",
1150            )
1151            .expect("projection SQL execution should succeed");
1152        let row = response
1153            .iter()
1154            .next()
1155            .expect("projection SQL response should contain one row");
1156
1157        assert_eq!(response.count(), 1);
1158        assert_eq!(
1159            row.values(),
1160            [Value::Text("projection-surface".to_string())],
1161            "projection SQL response should carry only projected field values in declaration order",
1162        );
1163    }
1164
1165    #[test]
1166    fn execute_sql_projection_select_star_returns_all_fields_in_model_order() {
1167        reset_session_sql_store();
1168        let session = sql_session();
1169
1170        session
1171            .insert(SessionSqlEntity {
1172                id: Ulid::generate(),
1173                name: "projection-star".to_string(),
1174                age: 41,
1175            })
1176            .expect("seed insert should succeed");
1177
1178        let response = session
1179            .execute_sql_projection::<SessionSqlEntity>(
1180                "SELECT * FROM SessionSqlEntity ORDER BY age ASC LIMIT 1",
1181            )
1182            .expect("projection SQL star execution should succeed");
1183        let row = response
1184            .iter()
1185            .next()
1186            .expect("projection SQL star response should contain one row");
1187
1188        assert_eq!(response.count(), 1);
1189        assert_eq!(
1190            row.values().len(),
1191            3,
1192            "SELECT * projection response should include all model fields",
1193        );
1194        assert_eq!(row.values()[0], Value::Ulid(row.id().key()));
1195        assert_eq!(row.values()[1], Value::Text("projection-star".to_string()));
1196        assert_eq!(row.values()[2], Value::Uint(41));
1197    }
1198
1199    #[test]
1200    fn execute_sql_projection_rejects_delete_statements() {
1201        reset_session_sql_store();
1202        let session = sql_session();
1203
1204        let err = session
1205            .execute_sql_projection::<SessionSqlEntity>(
1206                "DELETE FROM SessionSqlEntity ORDER BY age LIMIT 1",
1207            )
1208            .expect_err("projection SQL execution should reject delete statements");
1209
1210        assert!(
1211            matches!(
1212                err,
1213                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1214                    _
1215                ))
1216            ),
1217            "projection SQL delete usage should fail as unsupported",
1218        );
1219    }
1220
1221    #[test]
1222    fn execute_sql_select_field_projection_unknown_field_fails_with_plan_error() {
1223        reset_session_sql_store();
1224        let session = sql_session();
1225
1226        let err = session
1227            .execute_sql::<SessionSqlEntity>("SELECT missing_field FROM SessionSqlEntity")
1228            .expect_err("unknown projected fields should fail planner validation");
1229
1230        assert!(
1231            matches!(err, QueryError::Plan(_)),
1232            "unknown projected fields should surface planner-domain query errors: {err:?}",
1233        );
1234    }
1235
1236    #[test]
1237    fn execute_sql_rejects_aggregate_projection_in_current_slice() {
1238        reset_session_sql_store();
1239        let session = sql_session();
1240
1241        let err = session
1242            .execute_sql::<SessionSqlEntity>("SELECT COUNT(*) FROM SessionSqlEntity")
1243            .expect_err("aggregate SQL projection should remain lowering-gated in this slice");
1244
1245        assert!(
1246            matches!(
1247                err,
1248                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1249                    _
1250                ))
1251            ),
1252            "aggregate projection gate should remain an unsupported execution error boundary",
1253        );
1254    }
1255
1256    #[test]
1257    fn execute_sql_rejects_group_by_in_current_slice() {
1258        reset_session_sql_store();
1259        let session = sql_session();
1260
1261        let err = session
1262            .execute_sql::<SessionSqlEntity>("SELECT * FROM SessionSqlEntity GROUP BY age")
1263            .expect_err("GROUP BY should be rejected in this slice");
1264
1265        assert!(
1266            matches!(
1267                err,
1268                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1269                    _
1270                ))
1271            ),
1272            "group-by gate should remain an unsupported execution error boundary",
1273        );
1274    }
1275
1276    #[test]
1277    fn explain_sql_execution_returns_descriptor_text() {
1278        reset_session_sql_store();
1279        let session = sql_session();
1280
1281        let explain = session
1282            .explain_sql::<SessionSqlEntity>(
1283                "EXPLAIN EXECUTION SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
1284            )
1285            .expect("EXPLAIN EXECUTION should succeed");
1286
1287        assert!(
1288            explain.contains("node_id=0"),
1289            "execution explain output should include the root descriptor node id",
1290        );
1291        assert!(
1292            explain.contains("layer="),
1293            "execution explain output should include execution layer annotations",
1294        );
1295    }
1296
1297    #[test]
1298    fn explain_sql_plan_returns_logical_plan_text() {
1299        reset_session_sql_store();
1300        let session = sql_session();
1301
1302        let explain = session
1303            .explain_sql::<SessionSqlEntity>(
1304                "EXPLAIN SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
1305            )
1306            .expect("EXPLAIN should succeed");
1307
1308        assert!(
1309            explain.contains("mode=Load"),
1310            "logical explain text should include query mode projection",
1311        );
1312        assert!(
1313            explain.contains("access="),
1314            "logical explain text should include projected access shape",
1315        );
1316    }
1317
1318    #[test]
1319    fn explain_sql_json_returns_logical_plan_json() {
1320        reset_session_sql_store();
1321        let session = sql_session();
1322
1323        let explain = session
1324            .explain_sql::<SessionSqlEntity>(
1325                "EXPLAIN JSON SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
1326            )
1327            .expect("EXPLAIN JSON should succeed");
1328
1329        assert!(
1330            explain.starts_with('{') && explain.ends_with('}'),
1331            "logical explain JSON should render one JSON object payload",
1332        );
1333        assert!(
1334            explain.contains("\"mode\":{\"type\":\"Load\""),
1335            "logical explain JSON should expose structured query mode metadata",
1336        );
1337        assert!(
1338            explain.contains("\"access\":"),
1339            "logical explain JSON should include projected access metadata",
1340        );
1341    }
1342
1343    #[test]
1344    fn explain_sql_json_delete_returns_logical_delete_mode() {
1345        reset_session_sql_store();
1346        let session = sql_session();
1347
1348        let explain = session
1349            .explain_sql::<SessionSqlEntity>(
1350                "EXPLAIN JSON DELETE FROM SessionSqlEntity ORDER BY age LIMIT 1",
1351            )
1352            .expect("EXPLAIN JSON DELETE should succeed");
1353
1354        assert!(
1355            explain.contains("\"mode\":{\"type\":\"Delete\""),
1356            "logical explain JSON should expose delete query mode metadata",
1357        );
1358    }
1359
1360    #[test]
1361    fn explain_sql_rejects_non_explain_statements() {
1362        reset_session_sql_store();
1363        let session = sql_session();
1364
1365        let err = session
1366            .explain_sql::<SessionSqlEntity>("SELECT * FROM SessionSqlEntity")
1367            .expect_err("explain_sql must reject non-EXPLAIN statements");
1368
1369        assert!(
1370            matches!(
1371                err,
1372                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1373                    _
1374                ))
1375            ),
1376            "non-EXPLAIN input must fail as unsupported explain usage",
1377        );
1378    }
1379}