Skip to main content

icydb_core/db/
session.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6#[cfg(test)]
7use crate::db::{DataStore, IndexStore};
8use crate::{
9    db::{
10        Db, EntityResponse, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
11        MissingRowPolicy, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace, PlanError,
12        ProjectionResponse, Query, QueryError, QueryTracePlan, StorageReport, StoreRegistry,
13        TraceExecutionStrategy, WriteBatchResponse,
14        access::AccessStrategy,
15        commit::EntityRuntimeHooks,
16        cursor::decode_optional_cursor_token,
17        executor::{
18            DeleteExecutor, ExecutablePlan, ExecutionStrategy, ExecutorPlanError, LoadExecutor,
19            SaveExecutor,
20        },
21        query::{
22            builder::aggregate::{AggregateExpr, avg, count, count_by, max_by, min_by, sum},
23            explain::ExplainAggregateTerminalPlan,
24            intent::IntentError,
25            plan::{FieldSlot, QueryMode},
26        },
27        schema::{describe_entity_model, show_indexes_for_model},
28        sql::lowering::{
29            SqlCommand, SqlGlobalAggregateCommand, SqlGlobalAggregateTerminal, SqlLoweringError,
30            compile_sql_command, compile_sql_global_aggregate_command,
31        },
32        sql::parser::SqlExplainMode,
33    },
34    error::{ErrorClass, ErrorOrigin, InternalError},
35    metrics::sink::{MetricsSink, with_metrics_sink},
36    traits::{CanisterKind, EntityKind, EntityValue},
37    value::Value,
38};
39use std::thread::LocalKey;
40
41// Map executor-owned plan-surface failures into query-owned plan errors.
42fn map_executor_plan_error(err: ExecutorPlanError) -> QueryError {
43    match err {
44        ExecutorPlanError::Cursor(err) => QueryError::from(PlanError::from(*err)),
45    }
46}
47
48// Decode one optional external cursor token and map decode failures into the
49// query-plan cursor error boundary.
50fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
51    decode_optional_cursor_token(cursor_token).map_err(|err| QueryError::from(PlanError::from(err)))
52}
53
54// Map SQL frontend parse/lowering failures into query-facing execution errors.
55fn map_sql_lowering_error(err: SqlLoweringError) -> QueryError {
56    match err {
57        SqlLoweringError::Query(err) => err,
58        SqlLoweringError::Parse(crate::db::sql::parser::SqlParseError::UnsupportedFeature {
59            feature,
60        }) => QueryError::execute(InternalError::query_unsupported_sql_feature(feature)),
61        other => QueryError::execute(InternalError::classified(
62            ErrorClass::Unsupported,
63            ErrorOrigin::Query,
64            format!("SQL query is not executable in this release: {other}"),
65        )),
66    }
67}
68
69// Resolve one aggregate target field through planner slot contracts before
70// aggregate terminal execution.
71fn resolve_sql_aggregate_target_slot<E: EntityKind>(field: &str) -> Result<FieldSlot, QueryError> {
72    FieldSlot::resolve(E::MODEL, field).ok_or_else(|| {
73        QueryError::execute(crate::db::error::executor_unsupported(format!(
74            "unknown aggregate target field: {field}",
75        )))
76    })
77}
78
79// Convert one lowered global SQL aggregate terminal into aggregate expression
80// contracts used by aggregate explain execution descriptors.
81fn sql_global_aggregate_terminal_to_expr<E: EntityKind>(
82    terminal: &SqlGlobalAggregateTerminal,
83) -> Result<AggregateExpr, QueryError> {
84    match terminal {
85        SqlGlobalAggregateTerminal::CountRows => Ok(count()),
86        SqlGlobalAggregateTerminal::CountField(field) => {
87            let _ = resolve_sql_aggregate_target_slot::<E>(field)?;
88
89            Ok(count_by(field.as_str()))
90        }
91        SqlGlobalAggregateTerminal::SumField(field) => {
92            let _ = resolve_sql_aggregate_target_slot::<E>(field)?;
93
94            Ok(sum(field.as_str()))
95        }
96        SqlGlobalAggregateTerminal::AvgField(field) => {
97            let _ = resolve_sql_aggregate_target_slot::<E>(field)?;
98
99            Ok(avg(field.as_str()))
100        }
101        SqlGlobalAggregateTerminal::MinField(field) => {
102            let _ = resolve_sql_aggregate_target_slot::<E>(field)?;
103
104            Ok(min_by(field.as_str()))
105        }
106        SqlGlobalAggregateTerminal::MaxField(field) => {
107            let _ = resolve_sql_aggregate_target_slot::<E>(field)?;
108
109            Ok(max_by(field.as_str()))
110        }
111    }
112}
113
114///
115/// DbSession
116///
117/// Session-scoped database handle with policy (debug, metrics) and execution routing.
118///
119
120pub struct DbSession<C: CanisterKind> {
121    db: Db<C>,
122    debug: bool,
123    metrics: Option<&'static dyn MetricsSink>,
124}
125
126impl<C: CanisterKind> DbSession<C> {
127    /// Construct one session facade for a database handle.
128    #[must_use]
129    pub(crate) const fn new(db: Db<C>) -> Self {
130        Self {
131            db,
132            debug: false,
133            metrics: None,
134        }
135    }
136
137    /// Construct one session facade from store registry and runtime hooks.
138    #[must_use]
139    pub const fn new_with_hooks(
140        store: &'static LocalKey<StoreRegistry>,
141        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
142    ) -> Self {
143        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
144    }
145
146    /// Enable debug execution behavior where supported by executors.
147    #[must_use]
148    pub const fn debug(mut self) -> Self {
149        self.debug = true;
150        self
151    }
152
153    /// Attach one metrics sink for all session-executed operations.
154    #[must_use]
155    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
156        self.metrics = Some(sink);
157        self
158    }
159
160    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
161        if let Some(sink) = self.metrics {
162            with_metrics_sink(sink, f)
163        } else {
164            f()
165        }
166    }
167
168    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
169    fn execute_save_with<E, T, R>(
170        &self,
171        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
172        map: impl FnOnce(T) -> R,
173    ) -> Result<R, InternalError>
174    where
175        E: EntityKind<Canister = C> + EntityValue,
176    {
177        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
178
179        Ok(map(value))
180    }
181
182    // Shared save-facade wrappers keep response shape explicit at call sites.
183    fn execute_save_entity<E>(
184        &self,
185        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
186    ) -> Result<E, InternalError>
187    where
188        E: EntityKind<Canister = C> + EntityValue,
189    {
190        self.execute_save_with(op, std::convert::identity)
191    }
192
193    fn execute_save_batch<E>(
194        &self,
195        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
196    ) -> Result<WriteBatchResponse<E>, InternalError>
197    where
198        E: EntityKind<Canister = C> + EntityValue,
199    {
200        self.execute_save_with(op, WriteBatchResponse::new)
201    }
202
203    fn execute_save_view<E>(
204        &self,
205        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
206    ) -> Result<E::ViewType, InternalError>
207    where
208        E: EntityKind<Canister = C> + EntityValue,
209    {
210        self.execute_save_with(op, std::convert::identity)
211    }
212
213    // ---------------------------------------------------------------------
214    // Query entry points (public, fluent)
215    // ---------------------------------------------------------------------
216
217    /// Start a fluent load query with default missing-row policy (`Ignore`).
218    #[must_use]
219    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
220    where
221        E: EntityKind<Canister = C>,
222    {
223        FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
224    }
225
226    /// Start a fluent load query with explicit missing-row policy.
227    #[must_use]
228    pub const fn load_with_consistency<E>(
229        &self,
230        consistency: MissingRowPolicy,
231    ) -> FluentLoadQuery<'_, E>
232    where
233        E: EntityKind<Canister = C>,
234    {
235        FluentLoadQuery::new(self, Query::new(consistency))
236    }
237
238    /// Build one typed query intent from one reduced SQL statement.
239    ///
240    /// This parser/lowering entrypoint is intentionally constrained to the
241    /// executable subset wired in the current release.
242    pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
243    where
244        E: EntityKind<Canister = C>,
245    {
246        let command = compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
247            .map_err(map_sql_lowering_error)?;
248
249        match command {
250            SqlCommand::Query(query) => Ok(query),
251            SqlCommand::Explain { .. } | SqlCommand::ExplainGlobalAggregate { .. } => {
252                Err(QueryError::execute(InternalError::classified(
253                    ErrorClass::Unsupported,
254                    ErrorOrigin::Query,
255                    "query_from_sql does not accept EXPLAIN statements; use explain_sql(...)",
256                )))
257            }
258        }
259    }
260
261    /// Execute one reduced SQL `SELECT`/`DELETE` statement for entity `E`.
262    pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
263    where
264        E: EntityKind<Canister = C> + EntityValue,
265    {
266        let query = self.query_from_sql::<E>(sql)?;
267        if query.has_grouping() {
268            return Err(QueryError::Intent(
269                IntentError::GroupedRequiresExecuteGrouped,
270            ));
271        }
272
273        self.execute_query(&query)
274    }
275
276    /// Execute one reduced SQL `SELECT` statement and return projection-shaped rows.
277    ///
278    /// This surface keeps `execute_sql(...)` backwards-compatible for callers
279    /// that currently consume full entity rows.
280    pub fn execute_sql_projection<E>(&self, sql: &str) -> Result<ProjectionResponse<E>, QueryError>
281    where
282        E: EntityKind<Canister = C> + EntityValue,
283    {
284        let query = self.query_from_sql::<E>(sql)?;
285        if query.has_grouping() {
286            return Err(QueryError::Intent(
287                IntentError::GroupedRequiresExecuteGrouped,
288            ));
289        }
290
291        match query.mode() {
292            QueryMode::Load(_) => {
293                self.execute_load_query_with(&query, |load, plan| load.execute_projection(plan))
294            }
295            QueryMode::Delete(_) => Err(QueryError::execute(InternalError::classified(
296                ErrorClass::Unsupported,
297                ErrorOrigin::Query,
298                "execute_sql_projection only supports SELECT statements",
299            ))),
300        }
301    }
302
303    /// Execute one reduced SQL global aggregate `SELECT` statement.
304    ///
305    /// This entrypoint is intentionally constrained to one aggregate terminal
306    /// shape per statement and preserves existing terminal semantics.
307    pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
308    where
309        E: EntityKind<Canister = C> + EntityValue,
310    {
311        let command = compile_sql_global_aggregate_command::<E>(sql, MissingRowPolicy::Ignore)
312            .map_err(map_sql_lowering_error)?;
313
314        match command.terminal() {
315            SqlGlobalAggregateTerminal::CountRows => self
316                .execute_load_query_with(command.query(), |load, plan| load.aggregate_count(plan))
317                .map(|count| Value::Uint(u64::from(count))),
318            SqlGlobalAggregateTerminal::CountField(field) => {
319                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
320                self.execute_load_query_with(command.query(), |load, plan| {
321                    load.values_by_slot(plan, target_slot)
322                })
323                .map(|values| {
324                    let count = values
325                        .into_iter()
326                        .filter(|value| !matches!(value, Value::Null))
327                        .count();
328                    Value::Uint(u64::try_from(count).unwrap_or(u64::MAX))
329                })
330            }
331            SqlGlobalAggregateTerminal::SumField(field) => {
332                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
333                self.execute_load_query_with(command.query(), |load, plan| {
334                    load.aggregate_sum_by_slot(plan, target_slot)
335                })
336                .map(|value| value.map_or(Value::Null, Value::Decimal))
337            }
338            SqlGlobalAggregateTerminal::AvgField(field) => {
339                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
340                self.execute_load_query_with(command.query(), |load, plan| {
341                    load.aggregate_avg_by_slot(plan, target_slot)
342                })
343                .map(|value| value.map_or(Value::Null, Value::Decimal))
344            }
345            SqlGlobalAggregateTerminal::MinField(field) => {
346                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
347                let min_id = self.execute_load_query_with(command.query(), |load, plan| {
348                    load.aggregate_min_by_slot(plan, target_slot)
349                })?;
350
351                match min_id {
352                    Some(id) => self
353                        .load::<E>()
354                        .by_id(id)
355                        .first_value_by(field)
356                        .map(|value| value.unwrap_or(Value::Null)),
357                    None => Ok(Value::Null),
358                }
359            }
360            SqlGlobalAggregateTerminal::MaxField(field) => {
361                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
362                let max_id = self.execute_load_query_with(command.query(), |load, plan| {
363                    load.aggregate_max_by_slot(plan, target_slot)
364                })?;
365
366                match max_id {
367                    Some(id) => self
368                        .load::<E>()
369                        .by_id(id)
370                        .first_value_by(field)
371                        .map(|value| value.unwrap_or(Value::Null)),
372                    None => Ok(Value::Null),
373                }
374            }
375        }
376    }
377
378    /// Execute one reduced SQL grouped `SELECT` statement and return grouped rows.
379    pub fn execute_sql_grouped<E>(
380        &self,
381        sql: &str,
382        cursor_token: Option<&str>,
383    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
384    where
385        E: EntityKind<Canister = C> + EntityValue,
386    {
387        let query = self.query_from_sql::<E>(sql)?;
388        if !query.has_grouping() {
389            return Err(QueryError::execute(InternalError::classified(
390                ErrorClass::Unsupported,
391                ErrorOrigin::Query,
392                "execute_sql_grouped requires grouped SQL query intent",
393            )));
394        }
395
396        self.execute_grouped(&query, cursor_token)
397    }
398
399    /// Explain one reduced SQL statement for entity `E`.
400    ///
401    /// Supported modes:
402    /// - `EXPLAIN ...` -> logical plan text
403    /// - `EXPLAIN EXECUTION ...` -> execution descriptor text
404    /// - `EXPLAIN JSON ...` -> logical plan canonical JSON
405    pub fn explain_sql<E>(&self, sql: &str) -> Result<String, QueryError>
406    where
407        E: EntityKind<Canister = C> + EntityValue,
408    {
409        let command = compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
410            .map_err(map_sql_lowering_error)?;
411
412        match command {
413            SqlCommand::Query(_) => Err(QueryError::execute(InternalError::classified(
414                ErrorClass::Unsupported,
415                ErrorOrigin::Query,
416                "explain_sql requires an EXPLAIN statement",
417            ))),
418            SqlCommand::Explain { mode, query } => match mode {
419                SqlExplainMode::Plan => Ok(query.explain()?.render_text_canonical()),
420                SqlExplainMode::Execution => query.explain_execution_text(),
421                SqlExplainMode::Json => Ok(query.explain()?.render_json_canonical()),
422            },
423            SqlCommand::ExplainGlobalAggregate { mode, command } => {
424                Self::explain_sql_global_aggregate::<E>(mode, command)
425            }
426        }
427    }
428
429    // Render one EXPLAIN payload for constrained global aggregate SQL command.
430    fn explain_sql_global_aggregate<E>(
431        mode: SqlExplainMode,
432        command: SqlGlobalAggregateCommand<E>,
433    ) -> Result<String, QueryError>
434    where
435        E: EntityKind<Canister = C> + EntityValue,
436    {
437        match mode {
438            SqlExplainMode::Plan => {
439                // Keep explain validation parity with execution by requiring the
440                // target field to resolve before returning explain output.
441                let _ = sql_global_aggregate_terminal_to_expr::<E>(command.terminal())?;
442
443                Ok(command.query().explain()?.render_text_canonical())
444            }
445            SqlExplainMode::Execution => {
446                let aggregate = sql_global_aggregate_terminal_to_expr::<E>(command.terminal())?;
447                let plan = Self::explain_load_query_terminal_with(command.query(), aggregate)?;
448
449                Ok(plan.execution_node_descriptor().render_text_tree())
450            }
451            SqlExplainMode::Json => {
452                // Keep explain validation parity with execution by requiring the
453                // target field to resolve before returning explain output.
454                let _ = sql_global_aggregate_terminal_to_expr::<E>(command.terminal())?;
455
456                Ok(command.query().explain()?.render_json_canonical())
457            }
458        }
459    }
460
461    /// Start a fluent delete query with default missing-row policy (`Ignore`).
462    #[must_use]
463    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
464    where
465        E: EntityKind<Canister = C>,
466    {
467        FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
468    }
469
470    /// Start a fluent delete query with explicit missing-row policy.
471    #[must_use]
472    pub fn delete_with_consistency<E>(
473        &self,
474        consistency: MissingRowPolicy,
475    ) -> FluentDeleteQuery<'_, E>
476    where
477        E: EntityKind<Canister = C>,
478    {
479        FluentDeleteQuery::new(self, Query::new(consistency).delete())
480    }
481
482    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
483    ///
484    /// This terminal bypasses query planning and access routing entirely.
485    #[must_use]
486    pub const fn select_one(&self) -> Value {
487        Value::Int(1)
488    }
489
490    /// Return one stable, human-readable index listing for the entity schema.
491    ///
492    /// Output format mirrors SQL-style introspection:
493    /// - `PRIMARY KEY (field)`
494    /// - `INDEX name (field_a, field_b)`
495    /// - `UNIQUE INDEX name (field_a, field_b)`
496    #[must_use]
497    pub fn show_indexes<E>(&self) -> Vec<String>
498    where
499        E: EntityKind<Canister = C>,
500    {
501        show_indexes_for_model(E::MODEL)
502    }
503
504    /// Return one structured schema description for the entity.
505    ///
506    /// This is a typed `DESCRIBE`-style introspection surface consumed by
507    /// developer tooling and pre-EXPLAIN debugging.
508    #[must_use]
509    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
510    where
511        E: EntityKind<Canister = C>,
512    {
513        describe_entity_model(E::MODEL)
514    }
515
516    /// Build one point-in-time storage report for observability endpoints.
517    pub fn storage_report(
518        &self,
519        name_to_path: &[(&'static str, &'static str)],
520    ) -> Result<StorageReport, InternalError> {
521        self.db.storage_report(name_to_path)
522    }
523
524    // ---------------------------------------------------------------------
525    // Low-level executors (crate-internal; execution primitives)
526    // ---------------------------------------------------------------------
527
528    #[must_use]
529    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
530    where
531        E: EntityKind<Canister = C> + EntityValue,
532    {
533        LoadExecutor::new(self.db, self.debug)
534    }
535
536    #[must_use]
537    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
538    where
539        E: EntityKind<Canister = C> + EntityValue,
540    {
541        DeleteExecutor::new(self.db, self.debug)
542    }
543
544    #[must_use]
545    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
546    where
547        E: EntityKind<Canister = C> + EntityValue,
548    {
549        SaveExecutor::new(self.db, self.debug)
550    }
551
552    // ---------------------------------------------------------------------
553    // Query diagnostics / execution (internal routing)
554    // ---------------------------------------------------------------------
555
556    /// Execute one scalar load/delete query and return materialized response rows.
557    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
558    where
559        E: EntityKind<Canister = C> + EntityValue,
560    {
561        let plan = query.plan()?.into_executable();
562
563        let result = match query.mode() {
564            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
565            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
566        };
567
568        result.map_err(QueryError::execute)
569    }
570
571    // Shared load-query terminal wrapper: build plan, run under metrics, map
572    // execution errors into query-facing errors.
573    pub(in crate::db) fn execute_load_query_with<E, T>(
574        &self,
575        query: &Query<E>,
576        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
577    ) -> Result<T, QueryError>
578    where
579        E: EntityKind<Canister = C> + EntityValue,
580    {
581        let plan = query.plan()?.into_executable();
582
583        self.with_metrics(|| op(self.load_executor::<E>(), plan))
584            .map_err(QueryError::execute)
585    }
586
587    /// Build one trace payload for a query without executing it.
588    ///
589    /// This lightweight surface is intended for developer diagnostics:
590    /// plan hash, access strategy summary, and planner/executor route shape.
591    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
592    where
593        E: EntityKind<Canister = C>,
594    {
595        let compiled = query.plan()?;
596        let explain = compiled.explain();
597        let plan_hash = compiled.plan_hash_hex();
598
599        let executable = compiled.into_executable();
600        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
601        let execution_strategy = match query.mode() {
602            QueryMode::Load(_) => Some(trace_execution_strategy(
603                executable
604                    .execution_strategy()
605                    .map_err(QueryError::execute)?,
606            )),
607            QueryMode::Delete(_) => None,
608        };
609
610        Ok(QueryTracePlan::new(
611            plan_hash,
612            access_strategy,
613            execution_strategy,
614            explain,
615        ))
616    }
617
618    /// Build one aggregate-terminal explain payload without executing the query.
619    pub(crate) fn explain_load_query_terminal_with<E>(
620        query: &Query<E>,
621        aggregate: AggregateExpr,
622    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
623    where
624        E: EntityKind<Canister = C> + EntityValue,
625    {
626        // Phase 1: build one compiled query once and project logical explain output.
627        let compiled = query.plan()?;
628        let query_explain = compiled.explain();
629        let terminal = aggregate.kind();
630
631        // Phase 2: derive the executor route label for this aggregate terminal.
632        let executable = compiled.into_executable();
633        let execution = executable.explain_aggregate_terminal_execution_descriptor(aggregate);
634
635        Ok(ExplainAggregateTerminalPlan::new(
636            query_explain,
637            terminal,
638            execution,
639        ))
640    }
641
642    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
643    pub(crate) fn execute_load_query_paged_with_trace<E>(
644        &self,
645        query: &Query<E>,
646        cursor_token: Option<&str>,
647    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
648    where
649        E: EntityKind<Canister = C> + EntityValue,
650    {
651        // Phase 1: build/validate executable plan and reject grouped plans.
652        let plan = query.plan()?.into_executable();
653        match plan.execution_strategy().map_err(QueryError::execute)? {
654            ExecutionStrategy::PrimaryKey => {
655                return Err(QueryError::execute(
656                    crate::db::error::query_executor_invariant(
657                        "cursor pagination requires explicit or grouped ordering",
658                    ),
659                ));
660            }
661            ExecutionStrategy::Ordered => {}
662            ExecutionStrategy::Grouped => {
663                return Err(QueryError::execute(
664                    crate::db::error::query_executor_invariant(
665                        "grouped plans require execute_grouped(...)",
666                    ),
667                ));
668            }
669        }
670
671        // Phase 2: decode external cursor token and validate it against plan surface.
672        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
673        let cursor = plan
674            .prepare_cursor(cursor_bytes.as_deref())
675            .map_err(map_executor_plan_error)?;
676
677        // Phase 3: execute one traced page and encode outbound continuation token.
678        let (page, trace) = self
679            .with_metrics(|| {
680                self.load_executor::<E>()
681                    .execute_paged_with_cursor_traced(plan, cursor)
682            })
683            .map_err(QueryError::execute)?;
684        let next_cursor = page
685            .next_cursor
686            .map(|token| {
687                let Some(token) = token.as_scalar() else {
688                    return Err(QueryError::execute(
689                        crate::db::error::query_executor_invariant(
690                            "scalar load pagination emitted grouped continuation token",
691                        ),
692                    ));
693                };
694
695                token.encode().map_err(|err| {
696                    QueryError::execute(InternalError::serialize_internal(format!(
697                        "failed to serialize continuation cursor: {err}"
698                    )))
699                })
700            })
701            .transpose()?;
702
703        Ok(PagedLoadExecutionWithTrace::new(
704            page.items,
705            next_cursor,
706            trace,
707        ))
708    }
709
710    /// Execute one grouped query page with optional grouped continuation cursor.
711    ///
712    /// This is the explicit grouped execution boundary; scalar load APIs reject
713    /// grouped plans to preserve scalar response contracts.
714    pub fn execute_grouped<E>(
715        &self,
716        query: &Query<E>,
717        cursor_token: Option<&str>,
718    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
719    where
720        E: EntityKind<Canister = C> + EntityValue,
721    {
722        // Phase 1: build/validate executable plan and require grouped shape.
723        let plan = query.plan()?.into_executable();
724        if !matches!(
725            plan.execution_strategy().map_err(QueryError::execute)?,
726            ExecutionStrategy::Grouped
727        ) {
728            return Err(QueryError::execute(
729                crate::db::error::query_executor_invariant(
730                    "execute_grouped requires grouped logical plans",
731                ),
732            ));
733        }
734
735        // Phase 2: decode external grouped cursor token and validate against plan.
736        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
737        let cursor = plan
738            .prepare_grouped_cursor(cursor_bytes.as_deref())
739            .map_err(map_executor_plan_error)?;
740
741        // Phase 3: execute grouped page and encode outbound grouped continuation token.
742        let (page, trace) = self
743            .with_metrics(|| {
744                self.load_executor::<E>()
745                    .execute_grouped_paged_with_cursor_traced(plan, cursor)
746            })
747            .map_err(QueryError::execute)?;
748        let next_cursor = page
749            .next_cursor
750            .map(|token| {
751                let Some(token) = token.as_grouped() else {
752                    return Err(QueryError::execute(
753                        crate::db::error::query_executor_invariant(
754                            "grouped pagination emitted scalar continuation token",
755                        ),
756                    ));
757                };
758
759                token.encode().map_err(|err| {
760                    QueryError::execute(InternalError::serialize_internal(format!(
761                        "failed to serialize grouped continuation cursor: {err}"
762                    )))
763                })
764            })
765            .transpose()?;
766
767        Ok(PagedGroupedExecutionWithTrace::new(
768            page.rows,
769            next_cursor,
770            trace,
771        ))
772    }
773
774    // ---------------------------------------------------------------------
775    // High-level write API (public, intent-level)
776    // ---------------------------------------------------------------------
777
778    /// Insert one entity row.
779    pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
780    where
781        E: EntityKind<Canister = C> + EntityValue,
782    {
783        self.execute_save_entity(|save| save.insert(entity))
784    }
785
786    /// Insert a single-entity-type batch atomically in one commit window.
787    ///
788    /// If any item fails pre-commit validation, no row in the batch is persisted.
789    ///
790    /// This API is not a multi-entity transaction surface.
791    pub fn insert_many_atomic<E>(
792        &self,
793        entities: impl IntoIterator<Item = E>,
794    ) -> Result<WriteBatchResponse<E>, InternalError>
795    where
796        E: EntityKind<Canister = C> + EntityValue,
797    {
798        self.execute_save_batch(|save| save.insert_many_atomic(entities))
799    }
800
801    /// Insert a batch with explicitly non-atomic semantics.
802    ///
803    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
804    pub fn insert_many_non_atomic<E>(
805        &self,
806        entities: impl IntoIterator<Item = E>,
807    ) -> Result<WriteBatchResponse<E>, InternalError>
808    where
809        E: EntityKind<Canister = C> + EntityValue,
810    {
811        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
812    }
813
814    /// Replace one existing entity row.
815    pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
816    where
817        E: EntityKind<Canister = C> + EntityValue,
818    {
819        self.execute_save_entity(|save| save.replace(entity))
820    }
821
822    /// Replace a single-entity-type batch atomically in one commit window.
823    ///
824    /// If any item fails pre-commit validation, no row in the batch is persisted.
825    ///
826    /// This API is not a multi-entity transaction surface.
827    pub fn replace_many_atomic<E>(
828        &self,
829        entities: impl IntoIterator<Item = E>,
830    ) -> Result<WriteBatchResponse<E>, InternalError>
831    where
832        E: EntityKind<Canister = C> + EntityValue,
833    {
834        self.execute_save_batch(|save| save.replace_many_atomic(entities))
835    }
836
837    /// Replace a batch with explicitly non-atomic semantics.
838    ///
839    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
840    pub fn replace_many_non_atomic<E>(
841        &self,
842        entities: impl IntoIterator<Item = E>,
843    ) -> Result<WriteBatchResponse<E>, InternalError>
844    where
845        E: EntityKind<Canister = C> + EntityValue,
846    {
847        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
848    }
849
850    /// Update one existing entity row.
851    pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
852    where
853        E: EntityKind<Canister = C> + EntityValue,
854    {
855        self.execute_save_entity(|save| save.update(entity))
856    }
857
858    /// Update a single-entity-type batch atomically in one commit window.
859    ///
860    /// If any item fails pre-commit validation, no row in the batch is persisted.
861    ///
862    /// This API is not a multi-entity transaction surface.
863    pub fn update_many_atomic<E>(
864        &self,
865        entities: impl IntoIterator<Item = E>,
866    ) -> Result<WriteBatchResponse<E>, InternalError>
867    where
868        E: EntityKind<Canister = C> + EntityValue,
869    {
870        self.execute_save_batch(|save| save.update_many_atomic(entities))
871    }
872
873    /// Update a batch with explicitly non-atomic semantics.
874    ///
875    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
876    pub fn update_many_non_atomic<E>(
877        &self,
878        entities: impl IntoIterator<Item = E>,
879    ) -> Result<WriteBatchResponse<E>, InternalError>
880    where
881        E: EntityKind<Canister = C> + EntityValue,
882    {
883        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
884    }
885
886    /// Insert one view value and return the stored view.
887    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
888    where
889        E: EntityKind<Canister = C> + EntityValue,
890    {
891        self.execute_save_view::<E>(|save| save.insert_view(view))
892    }
893
894    /// Replace one view value and return the stored view.
895    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
896    where
897        E: EntityKind<Canister = C> + EntityValue,
898    {
899        self.execute_save_view::<E>(|save| save.replace_view(view))
900    }
901
902    /// Update one view value and return the stored view.
903    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
904    where
905        E: EntityKind<Canister = C> + EntityValue,
906    {
907        self.execute_save_view::<E>(|save| save.update_view(view))
908    }
909
910    /// TEST ONLY: clear all registered data and index stores for this database.
911    #[cfg(test)]
912    #[doc(hidden)]
913    pub fn clear_stores_for_tests(&self) {
914        self.db.with_store_registry(|reg| {
915            // Test cleanup only: clearing all stores is set-like and does not
916            // depend on registry iteration order.
917            for (_, store) in reg.iter() {
918                store.with_data_mut(DataStore::clear);
919                store.with_index_mut(IndexStore::clear);
920            }
921        });
922    }
923}
924
925const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
926    match strategy {
927        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
928        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
929        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
930    }
931}
932
933///
934/// TESTS
935///
936
937#[cfg(test)]
938mod tests {
939    use super::*;
940    use crate::{
941        db::{
942            Db,
943            commit::{ensure_recovered, init_commit_store_for_tests},
944            cursor::CursorPlanError,
945            data::DataStore,
946            index::IndexStore,
947            query::plan::expr::{Expr, ProjectionField},
948            registry::StoreRegistry,
949        },
950        error::{ErrorClass, ErrorDetail, ErrorOrigin, QueryErrorDetail},
951        model::field::FieldKind,
952        testing::test_memory,
953        traits::Path,
954        types::Ulid,
955        value::Value,
956    };
957    use icydb_derive::FieldProjection;
958    use serde::{Deserialize, Serialize};
959    use std::cell::RefCell;
960
    // Test canister fixture; its commit memory id is taken from `test_commit_memory_id()`.
    crate::test_canister! {
        ident = SessionSqlCanister,
        commit_memory_id = crate::testing::test_commit_memory_id(),
    }

    // Test store fixture declared against `SessionSqlCanister`.
    crate::test_store! {
        ident = SessionSqlStore,
        canister = SessionSqlCanister,
    }

    // Thread-local data and index stores, plus the registry that registers both
    // under `SessionSqlStore::PATH`.
    // NOTE(review): memory ids 160 and 161 are presumably reserved for this test
    // module — confirm they do not collide with other test fixtures.
    thread_local! {
        static SESSION_SQL_DATA_STORE: RefCell<DataStore> =
            RefCell::new(DataStore::init(test_memory(160)));
        static SESSION_SQL_INDEX_STORE: RefCell<IndexStore> =
            RefCell::new(IndexStore::init(test_memory(161)));
        static SESSION_SQL_STORE_REGISTRY: StoreRegistry = {
            let mut reg = StoreRegistry::new();
            reg.register_store(
                SessionSqlStore::PATH,
                &SESSION_SQL_DATA_STORE,
                &SESSION_SQL_INDEX_STORE,
            )
            .expect("SQL session test store registration should succeed");
            reg
        };
    }

    // Database handle over the thread-local registry; `sql_session()` wraps it
    // in a `DbSession` for each test.
    static SESSION_SQL_DB: Db<SessionSqlCanister> = Db::new(&SESSION_SQL_STORE_REGISTRY);
989
    ///
    /// SessionSqlEntity
    ///
    /// Test entity used to lock end-to-end reduced SQL session behavior.
    ///

    #[derive(Clone, Debug, Default, Deserialize, FieldProjection, PartialEq, Serialize)]
    struct SessionSqlEntity {
        // Primary key (declared as `primary_key = "id"` in the schema below).
        id: Ulid,
        // Text field the tests use to label and identify rows.
        name: String,
        // Unsigned field the tests filter and order on.
        age: u64,
    }

    // Schema wiring for `SessionSqlEntity`: the field list mirrors the struct
    // fields in declaration order, no secondary indexes are declared, and the
    // entity is bound to the test store and canister fixtures.
    crate::test_entity_schema! {
        ident = SessionSqlEntity,
        id = Ulid,
        id_field = id,
        entity_name = "SessionSqlEntity",
        primary_key = "id",
        pk_index = 0,
        fields = [
            ("id", FieldKind::Ulid),
            ("name", FieldKind::Text),
            ("age", FieldKind::Uint),
        ],
        indexes = [],
        store = SessionSqlStore,
        canister = SessionSqlCanister,
    }
1019
1020    // Reset all session SQL fixture state between tests to preserve deterministic assertions.
1021    fn reset_session_sql_store() {
1022        init_commit_store_for_tests().expect("commit store init should succeed");
1023        ensure_recovered(&SESSION_SQL_DB).expect("write-side recovery should succeed");
1024        SESSION_SQL_DATA_STORE.with(|store| store.borrow_mut().clear());
1025        SESSION_SQL_INDEX_STORE.with(|store| store.borrow_mut().clear());
1026    }
1027
1028    fn sql_session() -> DbSession<SessionSqlCanister> {
1029        DbSession::new(SESSION_SQL_DB)
1030    }
1031
1032    // Seed one deterministic SQL fixture dataset used by matrix tests.
1033    fn seed_session_sql_entities(
1034        session: &DbSession<SessionSqlCanister>,
1035        rows: &[(&'static str, u64)],
1036    ) {
1037        for (name, age) in rows {
1038            session
1039                .insert(SessionSqlEntity {
1040                    id: Ulid::generate(),
1041                    name: (*name).to_string(),
1042                    age: *age,
1043                })
1044                .expect("seed insert should succeed");
1045        }
1046    }
1047
1048    // Execute one scalar SQL query and return `(name, age)` tuples in response order.
1049    fn execute_sql_name_age_rows(
1050        session: &DbSession<SessionSqlCanister>,
1051        sql: &str,
1052    ) -> Vec<(String, u64)> {
1053        session
1054            .execute_sql::<SessionSqlEntity>(sql)
1055            .expect("scalar SQL execution should succeed")
1056            .iter()
1057            .map(|row| (row.entity_ref().name.clone(), row.entity_ref().age))
1058            .collect()
1059    }
1060
1061    // Assert one explain payload contains every required token for one case.
1062    fn assert_explain_contains_tokens(explain: &str, tokens: &[&str], context: &str) {
1063        for token in tokens {
1064            assert!(
1065                explain.contains(token),
1066                "explain matrix case missing token `{token}`: {context}",
1067            );
1068        }
1069    }
1070
1071    // Assert query-surface cursor errors remain wrapped under QueryError::Plan(PlanError::Cursor).
1072    fn assert_query_error_is_cursor_plan(
1073        err: QueryError,
1074        predicate: impl FnOnce(&CursorPlanError) -> bool,
1075    ) {
1076        assert!(matches!(
1077            err,
1078            QueryError::Plan(plan_err)
1079                if matches!(
1080                    plan_err.as_ref(),
1081                    PlanError::Cursor(inner) if predicate(inner.as_ref())
1082                )
1083        ));
1084    }
1085
1086    // Assert both session conversion paths preserve the same cursor-plan variant payload.
1087    fn assert_cursor_mapping_parity(
1088        build: impl Fn() -> CursorPlanError,
1089        predicate: impl Fn(&CursorPlanError) -> bool + Copy,
1090    ) {
1091        let mapped_via_executor = map_executor_plan_error(ExecutorPlanError::from(build()));
1092        assert_query_error_is_cursor_plan(mapped_via_executor, predicate);
1093
1094        let mapped_via_plan = QueryError::from(PlanError::from(build()));
1095        assert_query_error_is_cursor_plan(mapped_via_plan, predicate);
1096    }
1097
1098    // Assert SQL parser unsupported-feature labels remain preserved through
1099    // query-facing execution error detail payloads.
1100    fn assert_sql_unsupported_feature_detail(err: QueryError, expected_feature: &'static str) {
1101        let QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1102            internal,
1103        )) = err
1104        else {
1105            panic!("expected query execution unsupported error variant");
1106        };
1107
1108        assert_eq!(internal.class(), ErrorClass::Unsupported);
1109        assert_eq!(internal.origin(), ErrorOrigin::Query);
1110        assert!(
1111            matches!(
1112                internal.detail(),
1113                Some(ErrorDetail::Query(QueryErrorDetail::UnsupportedSqlFeature { feature }))
1114                    if *feature == expected_feature
1115            ),
1116            "unsupported SQL feature detail label should be preserved",
1117        );
1118    }
1119
1120    fn unsupported_sql_feature_cases() -> [(&'static str, &'static str); 3] {
1121        [
1122            (
1123                "SELECT * FROM SessionSqlEntity JOIN other ON SessionSqlEntity.id = other.id",
1124                "JOIN",
1125            ),
1126            (
1127                "SELECT \"name\" FROM SessionSqlEntity",
1128                "quoted identifiers",
1129            ),
1130            ("SELECT * FROM SessionSqlEntity alias", "table aliases"),
1131        ]
1132    }
1133
1134    #[test]
1135    fn session_cursor_error_mapping_parity_boundary_arity() {
1136        assert_cursor_mapping_parity(
1137            || CursorPlanError::continuation_cursor_boundary_arity_mismatch(2, 1),
1138            |inner| {
1139                matches!(
1140                    inner,
1141                    CursorPlanError::ContinuationCursorBoundaryArityMismatch {
1142                        expected: 2,
1143                        found: 1
1144                    }
1145                )
1146            },
1147        );
1148    }
1149
1150    #[test]
1151    fn session_cursor_error_mapping_parity_window_mismatch() {
1152        assert_cursor_mapping_parity(
1153            || CursorPlanError::continuation_cursor_window_mismatch(8, 3),
1154            |inner| {
1155                matches!(
1156                    inner,
1157                    CursorPlanError::ContinuationCursorWindowMismatch {
1158                        expected_offset: 8,
1159                        actual_offset: 3
1160                    }
1161                )
1162            },
1163        );
1164    }
1165
1166    #[test]
1167    fn session_cursor_error_mapping_parity_decode_reason() {
1168        assert_cursor_mapping_parity(
1169            || {
1170                CursorPlanError::invalid_continuation_cursor(
1171                    crate::db::codec::cursor::CursorDecodeError::OddLength,
1172                )
1173            },
1174            |inner| {
1175                matches!(
1176                    inner,
1177                    CursorPlanError::InvalidContinuationCursor {
1178                        reason: crate::db::codec::cursor::CursorDecodeError::OddLength
1179                    }
1180                )
1181            },
1182        );
1183    }
1184
1185    #[test]
1186    fn session_cursor_error_mapping_parity_primary_key_type_mismatch() {
1187        assert_cursor_mapping_parity(
1188            || {
1189                CursorPlanError::continuation_cursor_primary_key_type_mismatch(
1190                    "id",
1191                    "ulid",
1192                    Some(crate::value::Value::Text("not-a-ulid".to_string())),
1193                )
1194            },
1195            |inner| {
1196                matches!(
1197                    inner,
1198                    CursorPlanError::ContinuationCursorPrimaryKeyTypeMismatch {
1199                        field,
1200                        expected,
1201                        value: Some(crate::value::Value::Text(value))
1202                    } if field == "id" && expected == "ulid" && value == "not-a-ulid"
1203                )
1204            },
1205        );
1206    }
1207
1208    #[test]
1209    fn session_cursor_error_mapping_parity_matrix_preserves_cursor_variants() {
1210        // Keep one matrix-level canary test name so cross-module audit references remain stable.
1211        assert_cursor_mapping_parity(
1212            || CursorPlanError::continuation_cursor_boundary_arity_mismatch(2, 1),
1213            |inner| {
1214                matches!(
1215                    inner,
1216                    CursorPlanError::ContinuationCursorBoundaryArityMismatch {
1217                        expected: 2,
1218                        found: 1
1219                    }
1220                )
1221            },
1222        );
1223    }
1224
1225    #[test]
1226    fn execute_sql_select_star_honors_order_limit_offset() {
1227        reset_session_sql_store();
1228        let session = sql_session();
1229
1230        session
1231            .insert(SessionSqlEntity {
1232                id: Ulid::generate(),
1233                name: "older".to_string(),
1234                age: 37,
1235            })
1236            .expect("seed insert should succeed");
1237        session
1238            .insert(SessionSqlEntity {
1239                id: Ulid::generate(),
1240                name: "younger".to_string(),
1241                age: 19,
1242            })
1243            .expect("seed insert should succeed");
1244
1245        let response = session
1246            .execute_sql::<SessionSqlEntity>(
1247                "SELECT * FROM SessionSqlEntity ORDER BY age ASC LIMIT 1 OFFSET 1",
1248            )
1249            .expect("SELECT * should execute");
1250
1251        assert_eq!(response.count(), 1, "window should return one row");
1252        let row = response
1253            .iter()
1254            .next()
1255            .expect("windowed result should include one row");
1256        assert_eq!(
1257            row.entity_ref().name,
1258            "older",
1259            "ordered window should return the second age-ordered row",
1260        );
1261    }
1262
1263    #[test]
1264    fn execute_sql_scalar_matrix_queries_match_expected_rows() {
1265        reset_session_sql_store();
1266        let session = sql_session();
1267
1268        // Phase 1: seed deterministic rows for scalar matrix cases.
1269        seed_session_sql_entities(
1270            &session,
1271            &[
1272                ("scalar-matrix-a", 10),
1273                ("scalar-matrix-b", 20),
1274                ("scalar-matrix-c", 30),
1275                ("scalar-matrix-d", 40),
1276            ],
1277        );
1278
1279        // Phase 2: execute table-driven scalar SQL cases.
1280        let cases = vec![
1281            (
1282                "SELECT * \
1283                 FROM SessionSqlEntity \
1284                 ORDER BY age DESC LIMIT 2 OFFSET 1",
1285                vec![
1286                    ("scalar-matrix-c".to_string(), 30_u64),
1287                    ("scalar-matrix-b".to_string(), 20_u64),
1288                ],
1289            ),
1290            (
1291                "SELECT * \
1292                 FROM SessionSqlEntity \
1293                 WHERE age >= 20 \
1294                 ORDER BY age ASC LIMIT 2",
1295                vec![
1296                    ("scalar-matrix-b".to_string(), 20_u64),
1297                    ("scalar-matrix-c".to_string(), 30_u64),
1298                ],
1299            ),
1300            (
1301                "SELECT DISTINCT * \
1302                 FROM SessionSqlEntity \
1303                 WHERE age >= 30 \
1304                 ORDER BY age DESC",
1305                vec![
1306                    ("scalar-matrix-d".to_string(), 40_u64),
1307                    ("scalar-matrix-c".to_string(), 30_u64),
1308                ],
1309            ),
1310            (
1311                "SELECT * \
1312                 FROM public.SessionSqlEntity \
1313                 WHERE age < 25 \
1314                 ORDER BY age ASC",
1315                vec![
1316                    ("scalar-matrix-a".to_string(), 10_u64),
1317                    ("scalar-matrix-b".to_string(), 20_u64),
1318                ],
1319            ),
1320        ];
1321
1322        // Phase 3: assert scalar row payload order and values for each query.
1323        for (sql, expected_rows) in cases {
1324            let actual_rows = execute_sql_name_age_rows(&session, sql);
1325            assert_eq!(actual_rows, expected_rows, "scalar matrix case: {sql}");
1326        }
1327    }
1328
1329    #[test]
1330    fn execute_sql_delete_honors_predicate_order_and_limit() {
1331        reset_session_sql_store();
1332        let session = sql_session();
1333
1334        session
1335            .insert(SessionSqlEntity {
1336                id: Ulid::generate(),
1337                name: "first-minor".to_string(),
1338                age: 16,
1339            })
1340            .expect("seed insert should succeed");
1341        session
1342            .insert(SessionSqlEntity {
1343                id: Ulid::generate(),
1344                name: "second-minor".to_string(),
1345                age: 17,
1346            })
1347            .expect("seed insert should succeed");
1348        session
1349            .insert(SessionSqlEntity {
1350                id: Ulid::generate(),
1351                name: "adult".to_string(),
1352                age: 42,
1353            })
1354            .expect("seed insert should succeed");
1355
1356        let deleted = session
1357            .execute_sql::<SessionSqlEntity>(
1358                "DELETE FROM SessionSqlEntity WHERE age < 20 ORDER BY age ASC LIMIT 1",
1359            )
1360            .expect("DELETE should execute");
1361
1362        assert_eq!(deleted.count(), 1, "delete limit should remove one row");
1363        assert_eq!(
1364            deleted
1365                .iter()
1366                .next()
1367                .expect("deleted row should exist")
1368                .entity_ref()
1369                .age,
1370            16,
1371            "ordered delete should remove the youngest matching row first",
1372        );
1373
1374        let remaining = session
1375            .load::<SessionSqlEntity>()
1376            .order_by("age")
1377            .execute()
1378            .expect("post-delete load should succeed");
1379        let remaining_ages = remaining
1380            .iter()
1381            .map(|row| row.entity_ref().age)
1382            .collect::<Vec<_>>();
1383
1384        assert_eq!(
1385            remaining_ages,
1386            vec![17, 42],
1387            "delete window semantics should preserve non-deleted rows",
1388        );
1389    }
1390
1391    #[test]
1392    fn execute_sql_delete_matrix_queries_match_deleted_and_remaining_rows() {
1393        // Phase 1: define one shared seed dataset and table-driven DELETE cases.
1394        let seed_rows = [
1395            ("delete-matrix-a", 10_u64),
1396            ("delete-matrix-b", 20_u64),
1397            ("delete-matrix-c", 30_u64),
1398            ("delete-matrix-d", 40_u64),
1399        ];
1400        let cases = vec![
1401            (
1402                "DELETE FROM SessionSqlEntity \
1403                 WHERE age >= 20 \
1404                 ORDER BY age ASC LIMIT 1",
1405                vec![("delete-matrix-b".to_string(), 20_u64)],
1406                vec![
1407                    ("delete-matrix-a".to_string(), 10_u64),
1408                    ("delete-matrix-c".to_string(), 30_u64),
1409                    ("delete-matrix-d".to_string(), 40_u64),
1410                ],
1411            ),
1412            (
1413                "DELETE FROM SessionSqlEntity \
1414                 WHERE age >= 20 \
1415                 ORDER BY age DESC LIMIT 2",
1416                vec![
1417                    ("delete-matrix-d".to_string(), 40_u64),
1418                    ("delete-matrix-c".to_string(), 30_u64),
1419                ],
1420                vec![
1421                    ("delete-matrix-a".to_string(), 10_u64),
1422                    ("delete-matrix-b".to_string(), 20_u64),
1423                ],
1424            ),
1425            (
1426                "DELETE FROM SessionSqlEntity \
1427                 WHERE age >= 100 \
1428                 ORDER BY age ASC LIMIT 1",
1429                vec![],
1430                vec![
1431                    ("delete-matrix-a".to_string(), 10_u64),
1432                    ("delete-matrix-b".to_string(), 20_u64),
1433                    ("delete-matrix-c".to_string(), 30_u64),
1434                    ("delete-matrix-d".to_string(), 40_u64),
1435                ],
1436            ),
1437        ];
1438
1439        // Phase 2: execute each DELETE case from a fresh seeded store.
1440        for (sql, expected_deleted, expected_remaining) in cases {
1441            reset_session_sql_store();
1442            let session = sql_session();
1443            seed_session_sql_entities(&session, &seed_rows);
1444
1445            let deleted = session
1446                .execute_sql::<SessionSqlEntity>(sql)
1447                .expect("delete matrix SQL execution should succeed");
1448            let deleted_rows = deleted
1449                .iter()
1450                .map(|row| (row.entity_ref().name.clone(), row.entity_ref().age))
1451                .collect::<Vec<_>>();
1452            let remaining_rows = execute_sql_name_age_rows(
1453                &session,
1454                "SELECT * FROM SessionSqlEntity ORDER BY age ASC",
1455            );
1456
1457            assert_eq!(
1458                deleted_rows, expected_deleted,
1459                "delete matrix deleted rows: {sql}"
1460            );
1461            assert_eq!(
1462                remaining_rows, expected_remaining,
1463                "delete matrix remaining rows: {sql}",
1464            );
1465        }
1466    }
1467
1468    #[test]
1469    fn query_from_sql_rejects_explain_statements() {
1470        reset_session_sql_store();
1471        let session = sql_session();
1472
1473        let err = session
1474            .query_from_sql::<SessionSqlEntity>("EXPLAIN SELECT * FROM SessionSqlEntity")
1475            .expect_err("query_from_sql must reject EXPLAIN statements");
1476
1477        assert!(
1478            matches!(
1479                err,
1480                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1481                    _
1482                ))
1483            ),
1484            "query_from_sql EXPLAIN rejection must map to unsupported execution class",
1485        );
1486    }
1487
1488    #[test]
1489    fn query_from_sql_preserves_parser_unsupported_feature_detail_labels() {
1490        reset_session_sql_store();
1491        let session = sql_session();
1492
1493        for (sql, feature) in unsupported_sql_feature_cases() {
1494            let err = session
1495                .query_from_sql::<SessionSqlEntity>(sql)
1496                .expect_err("unsupported SQL feature should fail through query_from_sql");
1497            assert_sql_unsupported_feature_detail(err, feature);
1498        }
1499    }
1500
1501    #[test]
1502    fn execute_sql_preserves_parser_unsupported_feature_detail_labels() {
1503        reset_session_sql_store();
1504        let session = sql_session();
1505
1506        for (sql, feature) in unsupported_sql_feature_cases() {
1507            let err = session
1508                .execute_sql::<SessionSqlEntity>(sql)
1509                .expect_err("unsupported SQL feature should fail through execute_sql");
1510            assert_sql_unsupported_feature_detail(err, feature);
1511        }
1512    }
1513
1514    #[test]
1515    fn execute_sql_projection_preserves_parser_unsupported_feature_detail_labels() {
1516        reset_session_sql_store();
1517        let session = sql_session();
1518
1519        for (sql, feature) in unsupported_sql_feature_cases() {
1520            let err = session
1521                .execute_sql_projection::<SessionSqlEntity>(sql)
1522                .expect_err("unsupported SQL feature should fail through execute_sql_projection");
1523            assert_sql_unsupported_feature_detail(err, feature);
1524        }
1525    }
1526
1527    #[test]
1528    fn execute_sql_grouped_preserves_parser_unsupported_feature_detail_labels() {
1529        reset_session_sql_store();
1530        let session = sql_session();
1531
1532        for (sql, feature) in unsupported_sql_feature_cases() {
1533            let err = session
1534                .execute_sql_grouped::<SessionSqlEntity>(sql, None)
1535                .expect_err("unsupported SQL feature should fail through execute_sql_grouped");
1536            assert_sql_unsupported_feature_detail(err, feature);
1537        }
1538    }
1539
1540    #[test]
1541    fn execute_sql_aggregate_preserves_parser_unsupported_feature_detail_labels() {
1542        reset_session_sql_store();
1543        let session = sql_session();
1544
1545        for (sql, feature) in unsupported_sql_feature_cases() {
1546            let err = session
1547                .execute_sql_aggregate::<SessionSqlEntity>(sql)
1548                .expect_err("unsupported SQL feature should fail through execute_sql_aggregate");
1549            assert_sql_unsupported_feature_detail(err, feature);
1550        }
1551    }
1552
1553    #[test]
1554    fn explain_sql_preserves_parser_unsupported_feature_detail_labels() {
1555        reset_session_sql_store();
1556        let session = sql_session();
1557
1558        for (sql, feature) in unsupported_sql_feature_cases() {
1559            let explain_sql = format!("EXPLAIN {sql}");
1560            let err = session
1561                .explain_sql::<SessionSqlEntity>(explain_sql.as_str())
1562                .expect_err("unsupported SQL feature should fail through explain_sql");
1563            assert_sql_unsupported_feature_detail(err, feature);
1564        }
1565    }
1566
1567    #[test]
1568    fn query_from_sql_select_field_projection_lowers_to_scalar_field_selection() {
1569        reset_session_sql_store();
1570        let session = sql_session();
1571
1572        let query = session
1573            .query_from_sql::<SessionSqlEntity>("SELECT name, age FROM SessionSqlEntity")
1574            .expect("field-list SQL query should lower");
1575        let projection = query
1576            .plan()
1577            .expect("field-list SQL plan should build")
1578            .projection_spec();
1579        let field_names = projection
1580            .fields()
1581            .map(|field| match field {
1582                ProjectionField::Scalar {
1583                    expr: Expr::Field(field),
1584                    alias: None,
1585                } => field.as_str().to_string(),
1586                other @ ProjectionField::Scalar { .. } => {
1587                    panic!("field-list SQL projection should lower to plain field exprs: {other:?}")
1588                }
1589            })
1590            .collect::<Vec<_>>();
1591
1592        assert_eq!(field_names, vec!["name".to_string(), "age".to_string()]);
1593    }
1594
1595    #[test]
1596    fn query_from_sql_select_grouped_aggregate_projection_lowers_to_grouped_intent() {
1597        reset_session_sql_store();
1598        let session = sql_session();
1599
1600        let query = session
1601            .query_from_sql::<SessionSqlEntity>(
1602                "SELECT age, COUNT(*) FROM SessionSqlEntity GROUP BY age",
1603            )
1604            .expect("grouped aggregate projection SQL query should lower");
1605        assert!(
1606            query.has_grouping(),
1607            "grouped aggregate SQL projection lowering should produce grouped query intent",
1608        );
1609    }
1610
1611    #[test]
1612    fn execute_sql_select_field_projection_currently_returns_entity_shaped_rows() {
1613        reset_session_sql_store();
1614        let session = sql_session();
1615
1616        session
1617            .insert(SessionSqlEntity {
1618                id: Ulid::generate(),
1619                name: "projected-row".to_string(),
1620                age: 29,
1621            })
1622            .expect("seed insert should succeed");
1623
1624        let response = session
1625            .execute_sql::<SessionSqlEntity>(
1626                "SELECT name FROM SessionSqlEntity ORDER BY age ASC LIMIT 1",
1627            )
1628            .expect("field-list SQL projection should execute");
1629        let row = response
1630            .iter()
1631            .next()
1632            .expect("field-list SQL projection response should contain one row");
1633
1634        assert_eq!(
1635            row.entity_ref().name,
1636            "projected-row",
1637            "field-list SQL projection should still return entity rows in this baseline",
1638        );
1639        assert_eq!(
1640            row.entity_ref().age,
1641            29,
1642            "field-list SQL projection should preserve full entity payload until projection response shaping is introduced",
1643        );
1644    }
1645
1646    #[test]
1647    fn execute_sql_projection_select_field_list_returns_projection_shaped_rows() {
1648        reset_session_sql_store();
1649        let session = sql_session();
1650
1651        session
1652            .insert(SessionSqlEntity {
1653                id: Ulid::generate(),
1654                name: "projection-surface".to_string(),
1655                age: 33,
1656            })
1657            .expect("seed insert should succeed");
1658
1659        let response = session
1660            .execute_sql_projection::<SessionSqlEntity>(
1661                "SELECT name FROM SessionSqlEntity ORDER BY age ASC LIMIT 1",
1662            )
1663            .expect("projection SQL execution should succeed");
1664        let row = response
1665            .iter()
1666            .next()
1667            .expect("projection SQL response should contain one row");
1668
1669        assert_eq!(response.count(), 1);
1670        assert_eq!(
1671            row.values(),
1672            [Value::Text("projection-surface".to_string())],
1673            "projection SQL response should carry only projected field values in declaration order",
1674        );
1675    }
1676
1677    #[test]
1678    fn execute_sql_projection_select_star_returns_all_fields_in_model_order() {
1679        reset_session_sql_store();
1680        let session = sql_session();
1681
1682        session
1683            .insert(SessionSqlEntity {
1684                id: Ulid::generate(),
1685                name: "projection-star".to_string(),
1686                age: 41,
1687            })
1688            .expect("seed insert should succeed");
1689
1690        let response = session
1691            .execute_sql_projection::<SessionSqlEntity>(
1692                "SELECT * FROM SessionSqlEntity ORDER BY age ASC LIMIT 1",
1693            )
1694            .expect("projection SQL star execution should succeed");
1695        let row = response
1696            .iter()
1697            .next()
1698            .expect("projection SQL star response should contain one row");
1699
1700        assert_eq!(response.count(), 1);
1701        assert_eq!(
1702            row.values().len(),
1703            3,
1704            "SELECT * projection response should include all model fields",
1705        );
1706        assert_eq!(row.values()[0], Value::Ulid(row.id().key()));
1707        assert_eq!(row.values()[1], Value::Text("projection-star".to_string()));
1708        assert_eq!(row.values()[2], Value::Uint(41));
1709    }
1710
1711    #[test]
1712    fn execute_sql_select_schema_qualified_entity_executes() {
1713        reset_session_sql_store();
1714        let session = sql_session();
1715
1716        session
1717            .insert(SessionSqlEntity {
1718                id: Ulid::generate(),
1719                name: "schema-qualified".to_string(),
1720                age: 41,
1721            })
1722            .expect("seed insert should succeed");
1723
1724        let response = session
1725            .execute_sql::<SessionSqlEntity>(
1726                "SELECT * FROM public.SessionSqlEntity ORDER BY age ASC LIMIT 1",
1727            )
1728            .expect("schema-qualified entity SQL should execute");
1729
1730        assert_eq!(response.len(), 1);
1731    }
1732
1733    #[test]
1734    fn execute_sql_projection_select_table_qualified_fields_executes() {
1735        reset_session_sql_store();
1736        let session = sql_session();
1737
1738        session
1739            .insert(SessionSqlEntity {
1740                id: Ulid::generate(),
1741                name: "qualified-projection".to_string(),
1742                age: 42,
1743            })
1744            .expect("seed insert should succeed");
1745
1746        let response = session
1747            .execute_sql_projection::<SessionSqlEntity>(
1748                "SELECT SessionSqlEntity.name \
1749                 FROM SessionSqlEntity \
1750                 WHERE SessionSqlEntity.age >= 40 \
1751                 ORDER BY SessionSqlEntity.age DESC LIMIT 1",
1752            )
1753            .expect("table-qualified projection SQL should execute");
1754        let row = response
1755            .iter()
1756            .next()
1757            .expect("table-qualified projection SQL response should contain one row");
1758
1759        assert_eq!(response.count(), 1);
1760        assert_eq!(
1761            row.values(),
1762            [Value::Text("qualified-projection".to_string())]
1763        );
1764    }
1765
1766    #[test]
1767    fn execute_sql_projection_select_field_list_honors_order_limit_offset_window() {
1768        reset_session_sql_store();
1769        let session = sql_session();
1770
1771        // Phase 1: seed deterministic age-ordered rows.
1772        session
1773            .insert(SessionSqlEntity {
1774                id: Ulid::generate(),
1775                name: "projection-window-a".to_string(),
1776                age: 10,
1777            })
1778            .expect("seed insert should succeed");
1779        session
1780            .insert(SessionSqlEntity {
1781                id: Ulid::generate(),
1782                name: "projection-window-b".to_string(),
1783                age: 20,
1784            })
1785            .expect("seed insert should succeed");
1786        session
1787            .insert(SessionSqlEntity {
1788                id: Ulid::generate(),
1789                name: "projection-window-c".to_string(),
1790                age: 30,
1791            })
1792            .expect("seed insert should succeed");
1793        session
1794            .insert(SessionSqlEntity {
1795                id: Ulid::generate(),
1796                name: "projection-window-d".to_string(),
1797                age: 40,
1798            })
1799            .expect("seed insert should succeed");
1800
1801        // Phase 2: execute one projection query with explicit window controls.
1802        let response = session
1803            .execute_sql_projection::<SessionSqlEntity>(
1804                "SELECT name, age \
1805                 FROM SessionSqlEntity \
1806                 ORDER BY age DESC LIMIT 2 OFFSET 1",
1807            )
1808            .expect("projection SQL window execution should succeed");
1809        let rows = response.iter().collect::<Vec<_>>();
1810
1811        // Phase 3: assert projected row payloads follow ordered window semantics.
1812        assert_eq!(response.count(), 2);
1813        assert_eq!(
1814            rows[0].values(),
1815            [
1816                Value::Text("projection-window-c".to_string()),
1817                Value::Uint(30)
1818            ],
1819        );
1820        assert_eq!(
1821            rows[1].values(),
1822            [
1823                Value::Text("projection-window-b".to_string()),
1824                Value::Uint(20)
1825            ],
1826        );
1827    }
1828
1829    #[test]
1830    fn execute_sql_projection_rejects_delete_statements() {
1831        reset_session_sql_store();
1832        let session = sql_session();
1833
1834        let err = session
1835            .execute_sql_projection::<SessionSqlEntity>(
1836                "DELETE FROM SessionSqlEntity ORDER BY age LIMIT 1",
1837            )
1838            .expect_err("projection SQL execution should reject delete statements");
1839
1840        assert!(
1841            matches!(
1842                err,
1843                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1844                    _
1845                ))
1846            ),
1847            "projection SQL delete usage should fail as unsupported",
1848        );
1849    }
1850
1851    #[test]
1852    fn execute_sql_select_field_projection_unknown_field_fails_with_plan_error() {
1853        reset_session_sql_store();
1854        let session = sql_session();
1855
1856        let err = session
1857            .execute_sql::<SessionSqlEntity>("SELECT missing_field FROM SessionSqlEntity")
1858            .expect_err("unknown projected fields should fail planner validation");
1859
1860        assert!(
1861            matches!(err, QueryError::Plan(_)),
1862            "unknown projected fields should surface planner-domain query errors: {err:?}",
1863        );
1864    }
1865
1866    #[test]
1867    fn execute_sql_rejects_aggregate_projection_in_current_slice() {
1868        reset_session_sql_store();
1869        let session = sql_session();
1870
1871        let err = session
1872            .execute_sql::<SessionSqlEntity>("SELECT COUNT(*) FROM SessionSqlEntity")
1873            .expect_err("global aggregate SQL projection should remain lowering-gated");
1874
1875        assert!(
1876            matches!(
1877                err,
1878                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1879                    _
1880                ))
1881            ),
1882            "global aggregate SQL projection should fail at reduced lowering boundary",
1883        );
1884    }
1885
1886    #[test]
1887    fn execute_sql_rejects_table_alias_forms_in_reduced_parser() {
1888        reset_session_sql_store();
1889        let session = sql_session();
1890
1891        let err = session
1892            .execute_sql::<SessionSqlEntity>("SELECT * FROM SessionSqlEntity alias")
1893            .expect_err("table aliases should be rejected by reduced SQL parser");
1894
1895        assert!(
1896            matches!(
1897                err,
1898                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1899                    _
1900                ))
1901            ),
1902            "table alias usage should fail closed through unsupported SQL boundary",
1903        );
1904    }
1905
1906    #[test]
1907    fn execute_sql_rejects_quoted_identifiers_in_reduced_parser() {
1908        reset_session_sql_store();
1909        let session = sql_session();
1910
1911        let err = session
1912            .execute_sql::<SessionSqlEntity>("SELECT \"name\" FROM SessionSqlEntity")
1913            .expect_err("quoted identifiers should be rejected by reduced SQL parser");
1914
1915        assert!(
1916            matches!(
1917                err,
1918                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1919                    _
1920                ))
1921            ),
1922            "quoted identifiers should fail closed through unsupported SQL boundary",
1923        );
1924    }
1925
1926    #[test]
1927    fn execute_sql_select_distinct_star_executes() {
1928        reset_session_sql_store();
1929        let session = sql_session();
1930
1931        let id_a = Ulid::generate();
1932        let id_b = Ulid::generate();
1933        session
1934            .insert(SessionSqlEntity {
1935                id: id_a,
1936                name: "distinct-a".to_string(),
1937                age: 20,
1938            })
1939            .expect("seed insert should succeed");
1940        session
1941            .insert(SessionSqlEntity {
1942                id: id_b,
1943                name: "distinct-b".to_string(),
1944                age: 20,
1945            })
1946            .expect("seed insert should succeed");
1947
1948        let response = session
1949            .execute_sql::<SessionSqlEntity>(
1950                "SELECT DISTINCT * FROM SessionSqlEntity ORDER BY id ASC",
1951            )
1952            .expect("SELECT DISTINCT * should execute");
1953        assert_eq!(response.len(), 2);
1954    }
1955
1956    #[test]
1957    fn execute_sql_projection_select_distinct_with_pk_field_list_executes() {
1958        reset_session_sql_store();
1959        let session = sql_session();
1960
1961        session
1962            .insert(SessionSqlEntity {
1963                id: Ulid::generate(),
1964                name: "distinct-pk-a".to_string(),
1965                age: 25,
1966            })
1967            .expect("seed insert should succeed");
1968        session
1969            .insert(SessionSqlEntity {
1970                id: Ulid::generate(),
1971                name: "distinct-pk-b".to_string(),
1972                age: 25,
1973            })
1974            .expect("seed insert should succeed");
1975
1976        let response = session
1977            .execute_sql_projection::<SessionSqlEntity>(
1978                "SELECT DISTINCT id, age FROM SessionSqlEntity ORDER BY id ASC",
1979            )
1980            .expect("SELECT DISTINCT field-list with PK should execute");
1981        assert_eq!(response.len(), 2);
1982        assert_eq!(response[0].values().len(), 2);
1983    }
1984
1985    #[test]
1986    fn execute_sql_rejects_distinct_without_pk_projection_in_current_slice() {
1987        reset_session_sql_store();
1988        let session = sql_session();
1989
1990        let err = session
1991            .execute_sql::<SessionSqlEntity>("SELECT DISTINCT age FROM SessionSqlEntity")
1992            .expect_err("SELECT DISTINCT without PK in projection should remain lowering-gated");
1993
1994        assert!(
1995            matches!(
1996                err,
1997                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1998                    _
1999                ))
2000            ),
2001            "distinct SQL gating should map to unsupported execution error boundary",
2002        );
2003    }
2004
2005    #[test]
2006    fn execute_sql_aggregate_count_star_and_count_field_return_uint() {
2007        reset_session_sql_store();
2008        let session = sql_session();
2009
2010        session
2011            .insert(SessionSqlEntity {
2012                id: Ulid::generate(),
2013                name: "aggregate-a".to_string(),
2014                age: 20,
2015            })
2016            .expect("seed insert should succeed");
2017        session
2018            .insert(SessionSqlEntity {
2019                id: Ulid::generate(),
2020                name: "aggregate-b".to_string(),
2021                age: 32,
2022            })
2023            .expect("seed insert should succeed");
2024
2025        let count_rows = session
2026            .execute_sql_aggregate::<SessionSqlEntity>("SELECT COUNT(*) FROM SessionSqlEntity")
2027            .expect("COUNT(*) SQL aggregate should execute");
2028        let count_field = session
2029            .execute_sql_aggregate::<SessionSqlEntity>("SELECT COUNT(age) FROM SessionSqlEntity")
2030            .expect("COUNT(field) SQL aggregate should execute");
2031        assert_eq!(count_rows, Value::Uint(2));
2032        assert_eq!(count_field, Value::Uint(2));
2033    }
2034
2035    #[test]
2036    fn execute_sql_aggregate_sum_with_table_qualified_field_executes() {
2037        reset_session_sql_store();
2038        let session = sql_session();
2039
2040        session
2041            .insert(SessionSqlEntity {
2042                id: Ulid::generate(),
2043                name: "qualified-aggregate-a".to_string(),
2044                age: 20,
2045            })
2046            .expect("seed insert should succeed");
2047        session
2048            .insert(SessionSqlEntity {
2049                id: Ulid::generate(),
2050                name: "qualified-aggregate-b".to_string(),
2051                age: 32,
2052            })
2053            .expect("seed insert should succeed");
2054
2055        let sum = session
2056            .execute_sql_aggregate::<SessionSqlEntity>(
2057                "SELECT SUM(SessionSqlEntity.age) FROM SessionSqlEntity",
2058            )
2059            .expect("table-qualified aggregate SQL should execute");
2060
2061        assert_eq!(sum, Value::Decimal(crate::types::Decimal::from(52u64)));
2062    }
2063
2064    #[test]
2065    fn execute_sql_aggregate_rejects_distinct_aggregate_qualifier() {
2066        reset_session_sql_store();
2067        let session = sql_session();
2068
2069        let err = session
2070            .execute_sql_aggregate::<SessionSqlEntity>(
2071                "SELECT COUNT(DISTINCT age) FROM SessionSqlEntity",
2072            )
2073            .expect_err("aggregate DISTINCT qualifier should remain unsupported");
2074
2075        assert!(
2076            matches!(
2077                err,
2078                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
2079                    _
2080                ))
2081            ),
2082            "aggregate DISTINCT qualifier should fail closed through unsupported SQL boundary",
2083        );
2084    }
2085
2086    #[test]
2087    fn execute_sql_aggregate_sum_avg_min_max_return_expected_values() {
2088        reset_session_sql_store();
2089        let session = sql_session();
2090
2091        session
2092            .insert(SessionSqlEntity {
2093                id: Ulid::generate(),
2094                name: "sumavg-a".to_string(),
2095                age: 20,
2096            })
2097            .expect("seed insert should succeed");
2098        session
2099            .insert(SessionSqlEntity {
2100                id: Ulid::generate(),
2101                name: "sumavg-b".to_string(),
2102                age: 32,
2103            })
2104            .expect("seed insert should succeed");
2105
2106        let sum = session
2107            .execute_sql_aggregate::<SessionSqlEntity>("SELECT SUM(age) FROM SessionSqlEntity")
2108            .expect("SUM(field) SQL aggregate should execute");
2109        let avg = session
2110            .execute_sql_aggregate::<SessionSqlEntity>("SELECT AVG(age) FROM SessionSqlEntity")
2111            .expect("AVG(field) SQL aggregate should execute");
2112        let min = session
2113            .execute_sql_aggregate::<SessionSqlEntity>("SELECT MIN(age) FROM SessionSqlEntity")
2114            .expect("MIN(field) SQL aggregate should execute");
2115        let max = session
2116            .execute_sql_aggregate::<SessionSqlEntity>("SELECT MAX(age) FROM SessionSqlEntity")
2117            .expect("MAX(field) SQL aggregate should execute");
2118        let empty_sum = session
2119            .execute_sql_aggregate::<SessionSqlEntity>(
2120                "SELECT SUM(age) FROM SessionSqlEntity WHERE age < 0",
2121            )
2122            .expect("SUM(field) SQL aggregate empty-window execution should succeed");
2123        let empty_min = session
2124            .execute_sql_aggregate::<SessionSqlEntity>(
2125                "SELECT MIN(age) FROM SessionSqlEntity WHERE age < 0",
2126            )
2127            .expect("MIN(field) SQL aggregate empty-window execution should succeed");
2128        let empty_max = session
2129            .execute_sql_aggregate::<SessionSqlEntity>(
2130                "SELECT MAX(age) FROM SessionSqlEntity WHERE age < 0",
2131            )
2132            .expect("MAX(field) SQL aggregate empty-window execution should succeed");
2133
2134        assert_eq!(sum, Value::Decimal(crate::types::Decimal::from(52u64)));
2135        assert_eq!(avg, Value::Decimal(crate::types::Decimal::from(26u64)));
2136        assert_eq!(min, Value::Uint(20));
2137        assert_eq!(max, Value::Uint(32));
2138        assert_eq!(empty_sum, Value::Null);
2139        assert_eq!(empty_min, Value::Null);
2140        assert_eq!(empty_max, Value::Null);
2141    }
2142
2143    #[test]
2144    fn execute_sql_aggregate_honors_order_limit_offset_window() {
2145        reset_session_sql_store();
2146        let session = sql_session();
2147
2148        session
2149            .insert(SessionSqlEntity {
2150                id: Ulid::generate(),
2151                name: "window-a".to_string(),
2152                age: 10,
2153            })
2154            .expect("seed insert should succeed");
2155        session
2156            .insert(SessionSqlEntity {
2157                id: Ulid::generate(),
2158                name: "window-b".to_string(),
2159                age: 20,
2160            })
2161            .expect("seed insert should succeed");
2162        session
2163            .insert(SessionSqlEntity {
2164                id: Ulid::generate(),
2165                name: "window-c".to_string(),
2166                age: 30,
2167            })
2168            .expect("seed insert should succeed");
2169
2170        let count = session
2171            .execute_sql_aggregate::<SessionSqlEntity>(
2172                "SELECT COUNT(*) FROM SessionSqlEntity ORDER BY age DESC LIMIT 2 OFFSET 1",
2173            )
2174            .expect("COUNT(*) SQL aggregate window execution should succeed");
2175        let sum = session
2176            .execute_sql_aggregate::<SessionSqlEntity>(
2177                "SELECT SUM(age) FROM SessionSqlEntity ORDER BY age DESC LIMIT 1 OFFSET 1",
2178            )
2179            .expect("SUM(field) SQL aggregate window execution should succeed");
2180        let avg = session
2181            .execute_sql_aggregate::<SessionSqlEntity>(
2182                "SELECT AVG(age) FROM SessionSqlEntity ORDER BY age ASC LIMIT 2 OFFSET 1",
2183            )
2184            .expect("AVG(field) SQL aggregate window execution should succeed");
2185
2186        assert_eq!(count, Value::Uint(2));
2187        assert_eq!(sum, Value::Decimal(crate::types::Decimal::from(20u64)));
2188        assert_eq!(avg, Value::Decimal(crate::types::Decimal::from(25u64)));
2189    }
2190
2191    #[test]
2192    fn execute_sql_aggregate_offset_beyond_window_returns_empty_aggregate_semantics() {
2193        reset_session_sql_store();
2194        let session = sql_session();
2195
2196        // Phase 1: seed a small scalar window.
2197        session
2198            .insert(SessionSqlEntity {
2199                id: Ulid::generate(),
2200                name: "beyond-window-a".to_string(),
2201                age: 10,
2202            })
2203            .expect("seed insert should succeed");
2204        session
2205            .insert(SessionSqlEntity {
2206                id: Ulid::generate(),
2207                name: "beyond-window-b".to_string(),
2208                age: 20,
2209            })
2210            .expect("seed insert should succeed");
2211
2212        // Phase 2: execute aggregates where OFFSET removes all visible rows.
2213        let count = session
2214            .execute_sql_aggregate::<SessionSqlEntity>(
2215                "SELECT COUNT(*) FROM SessionSqlEntity ORDER BY age ASC LIMIT 1 OFFSET 10",
2216            )
2217            .expect("COUNT(*) aggregate with offset beyond window should execute");
2218        let sum = session
2219            .execute_sql_aggregate::<SessionSqlEntity>(
2220                "SELECT SUM(age) FROM SessionSqlEntity ORDER BY age ASC LIMIT 1 OFFSET 10",
2221            )
2222            .expect("SUM aggregate with offset beyond window should execute");
2223        let avg = session
2224            .execute_sql_aggregate::<SessionSqlEntity>(
2225                "SELECT AVG(age) FROM SessionSqlEntity ORDER BY age ASC LIMIT 1 OFFSET 10",
2226            )
2227            .expect("AVG aggregate with offset beyond window should execute");
2228        let min = session
2229            .execute_sql_aggregate::<SessionSqlEntity>(
2230                "SELECT MIN(age) FROM SessionSqlEntity ORDER BY age ASC LIMIT 1 OFFSET 10",
2231            )
2232            .expect("MIN aggregate with offset beyond window should execute");
2233        let max = session
2234            .execute_sql_aggregate::<SessionSqlEntity>(
2235                "SELECT MAX(age) FROM SessionSqlEntity ORDER BY age ASC LIMIT 1 OFFSET 10",
2236            )
2237            .expect("MAX aggregate with offset beyond window should execute");
2238
2239        // Phase 3: assert empty-window aggregate semantics.
2240        assert_eq!(count, Value::Uint(0));
2241        assert_eq!(sum, Value::Null);
2242        assert_eq!(avg, Value::Null);
2243        assert_eq!(min, Value::Null);
2244        assert_eq!(max, Value::Null);
2245    }
2246
2247    #[test]
2248    fn execute_sql_projection_matrix_queries_match_expected_projected_rows() {
2249        reset_session_sql_store();
2250        let session = sql_session();
2251
2252        // Phase 1: seed deterministic rows used by matrix projections.
2253        seed_session_sql_entities(
2254            &session,
2255            &[
2256                ("matrix-a", 10),
2257                ("matrix-b", 20),
2258                ("matrix-c", 30),
2259                ("matrix-d", 40),
2260            ],
2261        );
2262
2263        // Phase 2: execute table-driven projection SQL cases.
2264        let cases = vec![
2265            (
2266                "SELECT name, age \
2267                 FROM SessionSqlEntity \
2268                 ORDER BY age DESC LIMIT 2 OFFSET 1",
2269                vec![
2270                    vec![Value::Text("matrix-c".to_string()), Value::Uint(30)],
2271                    vec![Value::Text("matrix-b".to_string()), Value::Uint(20)],
2272                ],
2273            ),
2274            (
2275                "SELECT age \
2276                 FROM SessionSqlEntity \
2277                 WHERE age >= 20 \
2278                 ORDER BY age ASC LIMIT 2",
2279                vec![vec![Value::Uint(20)], vec![Value::Uint(30)]],
2280            ),
2281            (
2282                "SELECT name \
2283                 FROM SessionSqlEntity \
2284                 WHERE age < 25 \
2285                 ORDER BY age ASC",
2286                vec![
2287                    vec![Value::Text("matrix-a".to_string())],
2288                    vec![Value::Text("matrix-b".to_string())],
2289                ],
2290            ),
2291        ];
2292
2293        // Phase 3: assert projected row payloads for each SQL input.
2294        for (sql, expected_rows) in cases {
2295            let response = session
2296                .execute_sql_projection::<SessionSqlEntity>(sql)
2297                .expect("projection matrix SQL execution should succeed");
2298            let actual_rows = response
2299                .iter()
2300                .map(|row| row.values().to_vec())
2301                .collect::<Vec<_>>();
2302
2303            assert_eq!(actual_rows, expected_rows, "projection matrix case: {sql}");
2304        }
2305    }
2306
2307    #[test]
2308    fn execute_sql_grouped_matrix_queries_match_expected_grouped_rows() {
2309        reset_session_sql_store();
2310        let session = sql_session();
2311
2312        // Phase 1: seed deterministic rows used by grouped matrix queries.
2313        seed_session_sql_entities(
2314            &session,
2315            &[
2316                ("group-matrix-a", 10),
2317                ("group-matrix-b", 10),
2318                ("group-matrix-c", 20),
2319                ("group-matrix-d", 30),
2320                ("group-matrix-e", 30),
2321                ("group-matrix-f", 30),
2322            ],
2323        );
2324
2325        // Phase 2: execute table-driven grouped SQL cases.
2326        let cases = vec![
2327            (
2328                "SELECT age, COUNT(*) \
2329                 FROM SessionSqlEntity \
2330                 GROUP BY age \
2331                 ORDER BY age ASC LIMIT 10",
2332                vec![(10_u64, 2_u64), (20_u64, 1_u64), (30_u64, 3_u64)],
2333            ),
2334            (
2335                "SELECT age, COUNT(*) \
2336                 FROM SessionSqlEntity \
2337                 WHERE age >= 20 \
2338                 GROUP BY age \
2339                 ORDER BY age ASC LIMIT 10",
2340                vec![(20_u64, 1_u64), (30_u64, 3_u64)],
2341            ),
2342            (
2343                "SELECT SessionSqlEntity.age, COUNT(*) \
2344                 FROM public.SessionSqlEntity \
2345                 WHERE SessionSqlEntity.age >= 20 \
2346                 GROUP BY SessionSqlEntity.age \
2347                 ORDER BY SessionSqlEntity.age ASC LIMIT 10",
2348                vec![(20_u64, 1_u64), (30_u64, 3_u64)],
2349            ),
2350        ];
2351
2352        // Phase 3: assert grouped row payloads for each SQL input.
2353        for (sql, expected_rows) in cases {
2354            let execution = session
2355                .execute_sql_grouped::<SessionSqlEntity>(sql, None)
2356                .expect("grouped matrix SQL execution should succeed");
2357            let actual_rows = execution
2358                .rows()
2359                .iter()
2360                .map(|row| {
2361                    (
2362                        row.group_key()[0].clone(),
2363                        row.aggregate_values()[0].clone(),
2364                    )
2365                })
2366                .collect::<Vec<_>>();
2367            let expected_values = expected_rows
2368                .iter()
2369                .map(|(group_key, count)| (Value::Uint(*group_key), Value::Uint(*count)))
2370                .collect::<Vec<_>>();
2371
2372            assert!(
2373                execution.continuation_cursor().is_none(),
2374                "grouped matrix cases should fully materialize under LIMIT 10: {sql}",
2375            );
2376            assert_eq!(actual_rows, expected_values, "grouped matrix case: {sql}");
2377        }
2378    }
2379
2380    #[test]
2381    fn execute_sql_aggregate_matrix_queries_match_expected_values() {
2382        reset_session_sql_store();
2383        let session = sql_session();
2384
2385        // Phase 1: seed deterministic rows used by aggregate matrix queries.
2386        seed_session_sql_entities(
2387            &session,
2388            &[
2389                ("agg-matrix-a", 10),
2390                ("agg-matrix-b", 10),
2391                ("agg-matrix-c", 20),
2392                ("agg-matrix-d", 30),
2393                ("agg-matrix-e", 30),
2394                ("agg-matrix-f", 30),
2395            ],
2396        );
2397
2398        // Phase 2: execute table-driven aggregate SQL cases.
2399        let cases = vec![
2400            ("SELECT COUNT(*) FROM SessionSqlEntity", Value::Uint(6)),
2401            (
2402                "SELECT SUM(age) FROM SessionSqlEntity",
2403                Value::Decimal(crate::types::Decimal::from(130_u64)),
2404            ),
2405            (
2406                "SELECT AVG(age) FROM SessionSqlEntity ORDER BY age DESC LIMIT 2",
2407                Value::Decimal(crate::types::Decimal::from(30_u64)),
2408            ),
2409            (
2410                "SELECT MIN(age) FROM SessionSqlEntity WHERE age >= 20",
2411                Value::Uint(20),
2412            ),
2413            (
2414                "SELECT MAX(age) FROM SessionSqlEntity WHERE age <= 20",
2415                Value::Uint(20),
2416            ),
2417            (
2418                "SELECT COUNT(*) FROM SessionSqlEntity WHERE age < 0",
2419                Value::Uint(0),
2420            ),
2421            (
2422                "SELECT SUM(age) FROM SessionSqlEntity WHERE age < 0",
2423                Value::Null,
2424            ),
2425        ];
2426
2427        // Phase 3: assert aggregate outputs for each SQL input.
2428        for (sql, expected_value) in cases {
2429            let actual_value = session
2430                .execute_sql_aggregate::<SessionSqlEntity>(sql)
2431                .expect("aggregate matrix SQL execution should succeed");
2432
2433            assert_eq!(actual_value, expected_value, "aggregate matrix case: {sql}");
2434        }
2435    }
2436
2437    #[test]
2438    fn execute_sql_aggregate_rejects_unsupported_aggregate_shapes() {
2439        reset_session_sql_store();
2440        let session = sql_session();
2441
2442        for sql in [
2443            "SELECT age FROM SessionSqlEntity",
2444            "SELECT age, COUNT(*) FROM SessionSqlEntity GROUP BY age",
2445        ] {
2446            let err = session
2447                .execute_sql_aggregate::<SessionSqlEntity>(sql)
2448                .expect_err("unsupported SQL aggregate shape should fail closed");
2449            assert!(
2450                matches!(
2451                    err,
2452                    QueryError::Execute(
2453                        crate::db::query::intent::QueryExecutionError::Unsupported(_)
2454                    )
2455                ),
2456                "unsupported SQL aggregate shape should map to unsupported execution error boundary: {sql}",
2457            );
2458        }
2459    }
2460
2461    #[test]
2462    fn execute_sql_aggregate_rejects_unknown_target_field() {
2463        reset_session_sql_store();
2464        let session = sql_session();
2465
2466        let err = session
2467            .execute_sql_aggregate::<SessionSqlEntity>(
2468                "SELECT SUM(missing_field) FROM SessionSqlEntity",
2469            )
2470            .expect_err("unknown aggregate target field should fail");
2471
2472        assert!(
2473            matches!(
2474                err,
2475                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
2476                    _
2477                ))
2478            ),
2479            "unknown aggregate target field should map to unsupported execution error boundary",
2480        );
2481    }
2482
2483    #[test]
2484    fn execute_sql_projection_rejects_grouped_aggregate_sql() {
2485        reset_session_sql_store();
2486        let session = sql_session();
2487
2488        let err = session
2489            .execute_sql_projection::<SessionSqlEntity>(
2490                "SELECT age, COUNT(*) FROM SessionSqlEntity GROUP BY age",
2491            )
2492            .expect_err("projection SQL API should reject grouped aggregate SQL intent");
2493
2494        assert!(
2495            matches!(
2496                err,
2497                QueryError::Intent(
2498                    crate::db::query::intent::IntentError::GroupedRequiresExecuteGrouped
2499                )
2500            ),
2501            "projection SQL API must reject grouped aggregate SQL with grouped-intent routing error",
2502        );
2503    }
2504
2505    #[test]
2506    fn execute_sql_grouped_select_count_returns_grouped_aggregate_row() {
2507        reset_session_sql_store();
2508        let session = sql_session();
2509
2510        session
2511            .insert(SessionSqlEntity {
2512                id: Ulid::generate(),
2513                name: "aggregate-a".to_string(),
2514                age: 20,
2515            })
2516            .expect("seed insert should succeed");
2517        session
2518            .insert(SessionSqlEntity {
2519                id: Ulid::generate(),
2520                name: "aggregate-b".to_string(),
2521                age: 20,
2522            })
2523            .expect("seed insert should succeed");
2524        session
2525            .insert(SessionSqlEntity {
2526                id: Ulid::generate(),
2527                name: "aggregate-c".to_string(),
2528                age: 32,
2529            })
2530            .expect("seed insert should succeed");
2531
2532        let execution = session
2533            .execute_sql_grouped::<SessionSqlEntity>(
2534                "SELECT age, COUNT(*) FROM SessionSqlEntity GROUP BY age ORDER BY age ASC LIMIT 10",
2535                None,
2536            )
2537            .expect("grouped SQL aggregate execution should succeed");
2538
2539        assert!(
2540            execution.continuation_cursor().is_none(),
2541            "single-page grouped aggregate execution should not emit continuation cursor",
2542        );
2543        assert_eq!(execution.rows().len(), 2);
2544        assert_eq!(execution.rows()[0].group_key(), [Value::Uint(20)]);
2545        assert_eq!(execution.rows()[0].aggregate_values(), [Value::Uint(2)]);
2546        assert_eq!(execution.rows()[1].group_key(), [Value::Uint(32)]);
2547        assert_eq!(execution.rows()[1].aggregate_values(), [Value::Uint(1)]);
2548    }
2549
2550    #[test]
2551    fn execute_sql_grouped_select_count_with_qualified_identifiers_executes() {
2552        reset_session_sql_store();
2553        let session = sql_session();
2554
2555        session
2556            .insert(SessionSqlEntity {
2557                id: Ulid::generate(),
2558                name: "qualified-group-a".to_string(),
2559                age: 20,
2560            })
2561            .expect("seed insert should succeed");
2562        session
2563            .insert(SessionSqlEntity {
2564                id: Ulid::generate(),
2565                name: "qualified-group-b".to_string(),
2566                age: 20,
2567            })
2568            .expect("seed insert should succeed");
2569        session
2570            .insert(SessionSqlEntity {
2571                id: Ulid::generate(),
2572                name: "qualified-group-c".to_string(),
2573                age: 32,
2574            })
2575            .expect("seed insert should succeed");
2576
2577        let execution = session
2578            .execute_sql_grouped::<SessionSqlEntity>(
2579                "SELECT SessionSqlEntity.age, COUNT(*) \
2580                 FROM public.SessionSqlEntity \
2581                 WHERE SessionSqlEntity.age >= 20 \
2582                 GROUP BY SessionSqlEntity.age \
2583                 ORDER BY SessionSqlEntity.age ASC LIMIT 10",
2584                None,
2585            )
2586            .expect("qualified grouped SQL aggregate execution should succeed");
2587
2588        assert!(execution.continuation_cursor().is_none());
2589        assert_eq!(execution.rows().len(), 2);
2590        assert_eq!(execution.rows()[0].group_key(), [Value::Uint(20)]);
2591        assert_eq!(execution.rows()[0].aggregate_values(), [Value::Uint(2)]);
2592        assert_eq!(execution.rows()[1].group_key(), [Value::Uint(32)]);
2593        assert_eq!(execution.rows()[1].aggregate_values(), [Value::Uint(1)]);
2594    }
2595
2596    #[test]
2597    fn execute_sql_grouped_limit_window_emits_cursor_and_resumes_next_group_page() {
2598        reset_session_sql_store();
2599        let session = sql_session();
2600
2601        // Phase 1: seed three grouped-key buckets with deterministic counts.
2602        session
2603            .insert(SessionSqlEntity {
2604                id: Ulid::generate(),
2605                name: "group-page-a".to_string(),
2606                age: 10,
2607            })
2608            .expect("seed insert should succeed");
2609        session
2610            .insert(SessionSqlEntity {
2611                id: Ulid::generate(),
2612                name: "group-page-b".to_string(),
2613                age: 10,
2614            })
2615            .expect("seed insert should succeed");
2616        session
2617            .insert(SessionSqlEntity {
2618                id: Ulid::generate(),
2619                name: "group-page-c".to_string(),
2620                age: 20,
2621            })
2622            .expect("seed insert should succeed");
2623        session
2624            .insert(SessionSqlEntity {
2625                id: Ulid::generate(),
2626                name: "group-page-d".to_string(),
2627                age: 30,
2628            })
2629            .expect("seed insert should succeed");
2630        session
2631            .insert(SessionSqlEntity {
2632                id: Ulid::generate(),
2633                name: "group-page-e".to_string(),
2634                age: 30,
2635            })
2636            .expect("seed insert should succeed");
2637        session
2638            .insert(SessionSqlEntity {
2639                id: Ulid::generate(),
2640                name: "group-page-f".to_string(),
2641                age: 30,
2642            })
2643            .expect("seed insert should succeed");
2644
2645        // Phase 2: execute the first grouped page and capture continuation cursor.
2646        let sql = "SELECT age, COUNT(*) \
2647                   FROM SessionSqlEntity \
2648                   GROUP BY age \
2649                   ORDER BY age ASC LIMIT 1";
2650        let first_page = session
2651            .execute_sql_grouped::<SessionSqlEntity>(sql, None)
2652            .expect("first grouped SQL page should execute");
2653        assert_eq!(first_page.rows().len(), 1);
2654        assert_eq!(first_page.rows()[0].group_key(), [Value::Uint(10)]);
2655        assert_eq!(first_page.rows()[0].aggregate_values(), [Value::Uint(2)]);
2656        let cursor_one = crate::db::encode_cursor(
2657            first_page
2658                .continuation_cursor()
2659                .expect("first grouped SQL page should emit continuation cursor"),
2660        );
2661
2662        // Phase 3: resume to second grouped page and capture next cursor.
2663        let second_page = session
2664            .execute_sql_grouped::<SessionSqlEntity>(sql, Some(cursor_one.as_str()))
2665            .expect("second grouped SQL page should execute");
2666        assert_eq!(second_page.rows().len(), 1);
2667        assert_eq!(second_page.rows()[0].group_key(), [Value::Uint(20)]);
2668        assert_eq!(second_page.rows()[0].aggregate_values(), [Value::Uint(1)]);
2669        let cursor_two = crate::db::encode_cursor(
2670            second_page
2671                .continuation_cursor()
2672                .expect("second grouped SQL page should emit continuation cursor"),
2673        );
2674
2675        // Phase 4: resume final grouped page and assert no further continuation.
2676        let third_page = session
2677            .execute_sql_grouped::<SessionSqlEntity>(sql, Some(cursor_two.as_str()))
2678            .expect("third grouped SQL page should execute");
2679        assert_eq!(third_page.rows().len(), 1);
2680        assert_eq!(third_page.rows()[0].group_key(), [Value::Uint(30)]);
2681        assert_eq!(third_page.rows()[0].aggregate_values(), [Value::Uint(3)]);
2682        assert!(
2683            third_page.continuation_cursor().is_none(),
2684            "last grouped SQL page should not emit continuation cursor",
2685        );
2686    }
2687
2688    #[test]
2689    fn execute_sql_grouped_rejects_invalid_cursor_token_payload() {
2690        reset_session_sql_store();
2691        let session = sql_session();
2692
2693        // Phase 1: execute one grouped query with an invalid cursor token payload.
2694        let err = session
2695            .execute_sql_grouped::<SessionSqlEntity>(
2696                "SELECT age, COUNT(*) \
2697                 FROM SessionSqlEntity \
2698                 GROUP BY age \
2699                 ORDER BY age ASC LIMIT 1",
2700                Some("zz"),
2701            )
2702            .expect_err("grouped SQL should fail closed on invalid cursor token payload");
2703
2704        // Phase 2: assert decode failures stay in cursor-plan error taxonomy.
2705        assert_query_error_is_cursor_plan(err, |inner| {
2706            matches!(inner, CursorPlanError::InvalidContinuationCursor { .. })
2707        });
2708    }
2709
2710    #[test]
2711    fn execute_sql_grouped_rejects_cursor_token_from_different_query_signature() {
2712        reset_session_sql_store();
2713        let session = sql_session();
2714
2715        // Phase 1: seed grouped buckets and capture one valid continuation cursor.
2716        seed_session_sql_entities(
2717            &session,
2718            &[
2719                ("cursor-signature-a", 10),
2720                ("cursor-signature-b", 20),
2721                ("cursor-signature-c", 30),
2722            ],
2723        );
2724        let first_page = session
2725            .execute_sql_grouped::<SessionSqlEntity>(
2726                "SELECT age, COUNT(*) \
2727                 FROM SessionSqlEntity \
2728                 GROUP BY age \
2729                 ORDER BY age ASC LIMIT 1",
2730                None,
2731            )
2732            .expect("first grouped SQL page should execute");
2733        let cursor = crate::db::encode_cursor(
2734            first_page
2735                .continuation_cursor()
2736                .expect("first grouped SQL page should emit continuation cursor"),
2737        );
2738
2739        // Phase 2: replay cursor against a signature-incompatible grouped SQL shape.
2740        let err = session
2741            .execute_sql_grouped::<SessionSqlEntity>(
2742                "SELECT age, COUNT(*) \
2743                 FROM SessionSqlEntity \
2744                 GROUP BY age \
2745                 ORDER BY age DESC LIMIT 1",
2746                Some(cursor.as_str()),
2747            )
2748            .expect_err(
2749                "grouped SQL should reject cursor tokens from incompatible query signatures",
2750            );
2751
2752        // Phase 3: assert mismatch stays in cursor-plan signature error taxonomy.
2753        assert_query_error_is_cursor_plan(err, |inner| {
2754            matches!(
2755                inner,
2756                CursorPlanError::ContinuationCursorSignatureMismatch { .. }
2757            )
2758        });
2759    }
2760
2761    #[test]
2762    fn execute_sql_grouped_rejects_scalar_sql_intent() {
2763        reset_session_sql_store();
2764        let session = sql_session();
2765
2766        let err = session
2767            .execute_sql_grouped::<SessionSqlEntity>("SELECT name FROM SessionSqlEntity", None)
2768            .expect_err("grouped SQL API should reject non-grouped SQL queries");
2769
2770        assert!(
2771            matches!(
2772                err,
2773                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
2774                    _
2775                ))
2776            ),
2777            "grouped SQL API should fail closed for non-grouped SQL shapes",
2778        );
2779    }
2780
2781    #[test]
2782    fn execute_sql_rejects_grouped_sql_intent_without_grouped_api() {
2783        reset_session_sql_store();
2784        let session = sql_session();
2785
2786        let err = session
2787            .execute_sql::<SessionSqlEntity>(
2788                "SELECT age, COUNT(*) FROM SessionSqlEntity GROUP BY age",
2789            )
2790            .expect_err("scalar SQL API should reject grouped SQL intent");
2791
2792        assert!(
2793            matches!(
2794                err,
2795                QueryError::Intent(
2796                    crate::db::query::intent::IntentError::GroupedRequiresExecuteGrouped
2797                )
2798            ),
2799            "scalar SQL API must preserve grouped explicit-entrypoint contract",
2800        );
2801    }
2802
2803    #[test]
2804    fn execute_sql_rejects_unsupported_group_by_projection_shape() {
2805        reset_session_sql_store();
2806        let session = sql_session();
2807
2808        let err = session
2809            .execute_sql::<SessionSqlEntity>("SELECT COUNT(*) FROM SessionSqlEntity GROUP BY age")
2810            .expect_err("group-by projection mismatch should fail closed");
2811
2812        assert!(
2813            matches!(
2814                err,
2815                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
2816                    _
2817                ))
2818            ),
2819            "unsupported grouped SQL projection shapes should fail at reduced lowering boundary",
2820        );
2821    }
2822
2823    #[test]
2824    fn explain_sql_plan_matrix_queries_include_expected_tokens() {
2825        reset_session_sql_store();
2826        let session = sql_session();
2827
2828        // Phase 1: define table-driven EXPLAIN plan SQL cases.
2829        let cases = vec![
2830            (
2831                "EXPLAIN SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
2832                vec!["mode=Load", "access="],
2833            ),
2834            (
2835                "EXPLAIN SELECT DISTINCT * FROM SessionSqlEntity ORDER BY id ASC",
2836                vec!["mode=Load", "distinct=true"],
2837            ),
2838            (
2839                "EXPLAIN SELECT age, COUNT(*) \
2840                 FROM SessionSqlEntity \
2841                 GROUP BY age \
2842                 ORDER BY age ASC LIMIT 10",
2843                vec!["mode=Load", "grouping=Grouped"],
2844            ),
2845            (
2846                "EXPLAIN DELETE FROM SessionSqlEntity ORDER BY age LIMIT 1",
2847                vec!["mode=Delete", "access="],
2848            ),
2849            (
2850                "EXPLAIN SELECT COUNT(*) FROM SessionSqlEntity",
2851                vec!["mode=Load", "access="],
2852            ),
2853        ];
2854
2855        // Phase 2: execute each EXPLAIN plan query and assert stable output tokens.
2856        for (sql, tokens) in cases {
2857            let explain = session
2858                .explain_sql::<SessionSqlEntity>(sql)
2859                .expect("EXPLAIN plan matrix query should succeed");
2860            assert_explain_contains_tokens(explain.as_str(), tokens.as_slice(), sql);
2861        }
2862    }
2863
2864    #[test]
2865    fn explain_sql_execution_matrix_queries_include_expected_tokens() {
2866        reset_session_sql_store();
2867        let session = sql_session();
2868
2869        // Phase 1: define table-driven EXPLAIN EXECUTION SQL cases.
2870        let cases = vec![
2871            (
2872                "EXPLAIN EXECUTION SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
2873                vec!["node_id=0", "layer="],
2874            ),
2875            (
2876                "EXPLAIN EXECUTION SELECT age, COUNT(*) \
2877                 FROM SessionSqlEntity \
2878                 GROUP BY age \
2879                 ORDER BY age ASC LIMIT 10",
2880                vec!["node_id=0", "execution_mode="],
2881            ),
2882            (
2883                "EXPLAIN EXECUTION SELECT COUNT(*) FROM SessionSqlEntity",
2884                vec!["AggregateCount execution_mode=", "node_id=0"],
2885            ),
2886        ];
2887
2888        // Phase 2: execute each EXPLAIN EXECUTION query and assert stable output tokens.
2889        for (sql, tokens) in cases {
2890            let explain = session
2891                .explain_sql::<SessionSqlEntity>(sql)
2892                .expect("EXPLAIN EXECUTION matrix query should succeed");
2893            assert_explain_contains_tokens(explain.as_str(), tokens.as_slice(), sql);
2894        }
2895    }
2896
2897    #[test]
2898    fn explain_sql_json_matrix_queries_include_expected_tokens() {
2899        reset_session_sql_store();
2900        let session = sql_session();
2901
2902        // Phase 1: define table-driven EXPLAIN JSON SQL cases.
2903        let cases = vec![
2904            (
2905                "EXPLAIN JSON SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
2906                vec!["\"mode\":{\"type\":\"Load\"", "\"access\":"],
2907            ),
2908            (
2909                "EXPLAIN JSON SELECT DISTINCT * FROM SessionSqlEntity ORDER BY id ASC",
2910                vec!["\"mode\":{\"type\":\"Load\"", "\"distinct\":true"],
2911            ),
2912            (
2913                "EXPLAIN JSON SELECT age, COUNT(*) \
2914                 FROM SessionSqlEntity \
2915                 GROUP BY age \
2916                 ORDER BY age ASC LIMIT 10",
2917                vec!["\"mode\":{\"type\":\"Load\"", "\"grouping\""],
2918            ),
2919            (
2920                "EXPLAIN JSON DELETE FROM SessionSqlEntity ORDER BY age LIMIT 1",
2921                vec!["\"mode\":{\"type\":\"Delete\"", "\"access\":"],
2922            ),
2923            (
2924                "EXPLAIN JSON SELECT COUNT(*) FROM SessionSqlEntity",
2925                vec!["\"mode\":{\"type\":\"Load\"", "\"access\":"],
2926            ),
2927        ];
2928
2929        // Phase 2: execute each EXPLAIN JSON query and assert stable output tokens.
2930        for (sql, tokens) in cases {
2931            let explain = session
2932                .explain_sql::<SessionSqlEntity>(sql)
2933                .expect("EXPLAIN JSON matrix query should succeed");
2934            assert!(
2935                explain.starts_with('{') && explain.ends_with('}'),
2936                "explain JSON matrix output should be one JSON object payload: {sql}",
2937            );
2938            assert_explain_contains_tokens(explain.as_str(), tokens.as_slice(), sql);
2939        }
2940    }
2941
2942    #[test]
2943    fn explain_sql_execution_returns_descriptor_text() {
2944        reset_session_sql_store();
2945        let session = sql_session();
2946
2947        let explain = session
2948            .explain_sql::<SessionSqlEntity>(
2949                "EXPLAIN EXECUTION SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
2950            )
2951            .expect("EXPLAIN EXECUTION should succeed");
2952
2953        assert!(
2954            explain.contains("node_id=0"),
2955            "execution explain output should include the root descriptor node id",
2956        );
2957        assert!(
2958            explain.contains("layer="),
2959            "execution explain output should include execution layer annotations",
2960        );
2961    }
2962
2963    #[test]
2964    fn explain_sql_plan_returns_logical_plan_text() {
2965        reset_session_sql_store();
2966        let session = sql_session();
2967
2968        let explain = session
2969            .explain_sql::<SessionSqlEntity>(
2970                "EXPLAIN SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
2971            )
2972            .expect("EXPLAIN should succeed");
2973
2974        assert!(
2975            explain.contains("mode=Load"),
2976            "logical explain text should include query mode projection",
2977        );
2978        assert!(
2979            explain.contains("access="),
2980            "logical explain text should include projected access shape",
2981        );
2982    }
2983
2984    #[test]
2985    fn explain_sql_plan_grouped_qualified_identifiers_match_unqualified_output() {
2986        reset_session_sql_store();
2987        let session = sql_session();
2988
2989        let qualified = session
2990            .explain_sql::<SessionSqlEntity>(
2991                "EXPLAIN SELECT SessionSqlEntity.age, COUNT(*) \
2992                 FROM public.SessionSqlEntity \
2993                 WHERE SessionSqlEntity.age >= 21 \
2994                 GROUP BY SessionSqlEntity.age \
2995                 ORDER BY SessionSqlEntity.age DESC LIMIT 2 OFFSET 1",
2996            )
2997            .expect("qualified grouped EXPLAIN plan SQL should succeed");
2998        let unqualified = session
2999            .explain_sql::<SessionSqlEntity>(
3000                "EXPLAIN SELECT age, COUNT(*) \
3001                 FROM SessionSqlEntity \
3002                 WHERE age >= 21 \
3003                 GROUP BY age \
3004                 ORDER BY age DESC LIMIT 2 OFFSET 1",
3005            )
3006            .expect("unqualified grouped EXPLAIN plan SQL should succeed");
3007
3008        assert_eq!(
3009            qualified, unqualified,
3010            "qualified grouped identifiers should normalize to the same logical EXPLAIN plan output",
3011        );
3012    }
3013
3014    #[test]
3015    fn explain_sql_execution_grouped_qualified_identifiers_match_unqualified_output() {
3016        reset_session_sql_store();
3017        let session = sql_session();
3018
3019        let qualified = session
3020            .explain_sql::<SessionSqlEntity>(
3021                "EXPLAIN EXECUTION SELECT SessionSqlEntity.age, COUNT(*) \
3022                 FROM public.SessionSqlEntity \
3023                 WHERE SessionSqlEntity.age >= 21 \
3024                 GROUP BY SessionSqlEntity.age \
3025                 ORDER BY SessionSqlEntity.age DESC LIMIT 2 OFFSET 1",
3026            )
3027            .expect("qualified grouped EXPLAIN execution SQL should succeed");
3028        let unqualified = session
3029            .explain_sql::<SessionSqlEntity>(
3030                "EXPLAIN EXECUTION SELECT age, COUNT(*) \
3031                 FROM SessionSqlEntity \
3032                 WHERE age >= 21 \
3033                 GROUP BY age \
3034                 ORDER BY age DESC LIMIT 2 OFFSET 1",
3035            )
3036            .expect("unqualified grouped EXPLAIN execution SQL should succeed");
3037
3038        assert_eq!(
3039            qualified, unqualified,
3040            "qualified grouped identifiers should normalize to the same execution EXPLAIN descriptor output",
3041        );
3042    }
3043
3044    #[test]
3045    fn explain_sql_json_grouped_qualified_identifiers_match_unqualified_output() {
3046        reset_session_sql_store();
3047        let session = sql_session();
3048
3049        let qualified = session
3050            .explain_sql::<SessionSqlEntity>(
3051                "EXPLAIN JSON SELECT SessionSqlEntity.age, COUNT(*) \
3052                 FROM public.SessionSqlEntity \
3053                 WHERE SessionSqlEntity.age >= 21 \
3054                 GROUP BY SessionSqlEntity.age \
3055                 ORDER BY SessionSqlEntity.age DESC LIMIT 2 OFFSET 1",
3056            )
3057            .expect("qualified grouped EXPLAIN JSON SQL should succeed");
3058        let unqualified = session
3059            .explain_sql::<SessionSqlEntity>(
3060                "EXPLAIN JSON SELECT age, COUNT(*) \
3061                 FROM SessionSqlEntity \
3062                 WHERE age >= 21 \
3063                 GROUP BY age \
3064                 ORDER BY age DESC LIMIT 2 OFFSET 1",
3065            )
3066            .expect("unqualified grouped EXPLAIN JSON SQL should succeed");
3067
3068        assert_eq!(
3069            qualified, unqualified,
3070            "qualified grouped identifiers should normalize to the same EXPLAIN JSON output",
3071        );
3072    }
3073
3074    #[test]
3075    fn explain_sql_plan_qualified_identifiers_match_unqualified_output() {
3076        reset_session_sql_store();
3077        let session = sql_session();
3078
3079        let qualified = session
3080            .explain_sql::<SessionSqlEntity>(
3081                "EXPLAIN SELECT * \
3082                 FROM public.SessionSqlEntity \
3083                 WHERE SessionSqlEntity.age >= 21 \
3084                 ORDER BY SessionSqlEntity.age DESC LIMIT 1",
3085            )
3086            .expect("qualified EXPLAIN plan SQL should succeed");
3087        let unqualified = session
3088            .explain_sql::<SessionSqlEntity>(
3089                "EXPLAIN SELECT * \
3090                 FROM SessionSqlEntity \
3091                 WHERE age >= 21 \
3092                 ORDER BY age DESC LIMIT 1",
3093            )
3094            .expect("unqualified EXPLAIN plan SQL should succeed");
3095
3096        assert_eq!(
3097            qualified, unqualified,
3098            "qualified identifiers should normalize to the same logical EXPLAIN plan output",
3099        );
3100    }
3101
3102    #[test]
3103    fn explain_sql_execution_qualified_identifiers_match_unqualified_output() {
3104        reset_session_sql_store();
3105        let session = sql_session();
3106
3107        let qualified = session
3108            .explain_sql::<SessionSqlEntity>(
3109                "EXPLAIN EXECUTION SELECT SessionSqlEntity.name \
3110                 FROM SessionSqlEntity \
3111                 WHERE SessionSqlEntity.age >= 21 \
3112                 ORDER BY SessionSqlEntity.age DESC LIMIT 1",
3113            )
3114            .expect("qualified EXPLAIN execution SQL should succeed");
3115        let unqualified = session
3116            .explain_sql::<SessionSqlEntity>(
3117                "EXPLAIN EXECUTION SELECT name \
3118                 FROM SessionSqlEntity \
3119                 WHERE age >= 21 \
3120                 ORDER BY age DESC LIMIT 1",
3121            )
3122            .expect("unqualified EXPLAIN execution SQL should succeed");
3123
3124        assert_eq!(
3125            qualified, unqualified,
3126            "qualified identifiers should normalize to the same execution EXPLAIN descriptor output",
3127        );
3128    }
3129
3130    #[test]
3131    fn explain_sql_json_qualified_aggregate_matches_unqualified_output() {
3132        reset_session_sql_store();
3133        let session = sql_session();
3134
3135        let qualified = session
3136            .explain_sql::<SessionSqlEntity>(
3137                "EXPLAIN JSON SELECT SUM(SessionSqlEntity.age) \
3138                 FROM public.SessionSqlEntity \
3139                 WHERE SessionSqlEntity.age >= 21",
3140            )
3141            .expect("qualified global aggregate EXPLAIN JSON should succeed");
3142        let unqualified = session
3143            .explain_sql::<SessionSqlEntity>(
3144                "EXPLAIN JSON SELECT SUM(age) FROM SessionSqlEntity WHERE age >= 21",
3145            )
3146            .expect("unqualified global aggregate EXPLAIN JSON should succeed");
3147
3148        assert_eq!(
3149            qualified, unqualified,
3150            "qualified identifiers should normalize to the same global aggregate EXPLAIN JSON output",
3151        );
3152    }
3153
3154    #[test]
3155    fn explain_sql_plan_select_distinct_star_marks_distinct_true() {
3156        reset_session_sql_store();
3157        let session = sql_session();
3158
3159        let explain = session
3160            .explain_sql::<SessionSqlEntity>(
3161                "EXPLAIN SELECT DISTINCT * FROM SessionSqlEntity ORDER BY id ASC",
3162            )
3163            .expect("EXPLAIN SELECT DISTINCT * should succeed");
3164
3165        assert!(
3166            explain.contains("distinct=true"),
3167            "logical explain text should preserve scalar distinct intent",
3168        );
3169    }
3170
3171    #[test]
3172    fn explain_sql_execution_select_distinct_star_returns_execution_descriptor_text() {
3173        reset_session_sql_store();
3174        let session = sql_session();
3175
3176        let explain = session
3177            .explain_sql::<SessionSqlEntity>(
3178                "EXPLAIN EXECUTION SELECT DISTINCT * FROM SessionSqlEntity ORDER BY id ASC LIMIT 1",
3179            )
3180            .expect("EXPLAIN EXECUTION SELECT DISTINCT * should succeed");
3181
3182        assert!(
3183            explain.contains("node_id=0"),
3184            "execution explain output should include the root descriptor node id",
3185        );
3186    }
3187
3188    #[test]
3189    fn explain_sql_json_returns_logical_plan_json() {
3190        reset_session_sql_store();
3191        let session = sql_session();
3192
3193        let explain = session
3194            .explain_sql::<SessionSqlEntity>(
3195                "EXPLAIN JSON SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
3196            )
3197            .expect("EXPLAIN JSON should succeed");
3198
3199        assert!(
3200            explain.starts_with('{') && explain.ends_with('}'),
3201            "logical explain JSON should render one JSON object payload",
3202        );
3203        assert!(
3204            explain.contains("\"mode\":{\"type\":\"Load\""),
3205            "logical explain JSON should expose structured query mode metadata",
3206        );
3207        assert!(
3208            explain.contains("\"access\":"),
3209            "logical explain JSON should include projected access metadata",
3210        );
3211    }
3212
// EXPLAIN JSON over `SELECT DISTINCT *` must succeed and the rendered JSON
// must contain `"distinct":true`.
#[test]
fn explain_sql_json_select_distinct_star_marks_distinct_true() {
    reset_session_sql_store();
    let db_session = sql_session();

    let sql = "EXPLAIN JSON SELECT DISTINCT * FROM SessionSqlEntity ORDER BY id ASC";
    let json_text = db_session
        .explain_sql::<SessionSqlEntity>(sql)
        .expect("EXPLAIN JSON SELECT DISTINCT * should succeed");

    let marks_distinct = json_text.contains("\"distinct\":true");
    assert!(
        marks_distinct,
        "logical explain JSON should preserve scalar distinct intent",
    );
}
3229
// EXPLAIN JSON over a DELETE statement must succeed and the rendered JSON
// must carry the delete-mode tag.
#[test]
fn explain_sql_json_delete_returns_logical_delete_mode() {
    reset_session_sql_store();
    let db_session = sql_session();

    let sql = "EXPLAIN JSON DELETE FROM SessionSqlEntity ORDER BY age LIMIT 1";
    let json_text = db_session
        .explain_sql::<SessionSqlEntity>(sql)
        .expect("EXPLAIN JSON DELETE should succeed");

    let has_delete_mode = json_text.contains("\"mode\":{\"type\":\"Delete\"");
    assert!(
        has_delete_mode,
        "logical explain JSON should expose delete query mode metadata",
    );
}
3246
// Plain EXPLAIN over `SELECT COUNT(*)` must succeed and the rendered text
// must contain both `mode=Load` and an `access=` entry.
#[test]
fn explain_sql_plan_global_aggregate_returns_logical_plan_text() {
    reset_session_sql_store();
    let db_session = sql_session();

    let sql = "EXPLAIN SELECT COUNT(*) FROM SessionSqlEntity";
    let plan_text = db_session
        .explain_sql::<SessionSqlEntity>(sql)
        .expect("global aggregate SQL explain plan should succeed");

    let has_load_mode = plan_text.contains("mode=Load");
    assert!(
        has_load_mode,
        "global aggregate SQL explain plan should project logical load mode",
    );

    let has_access_entry = plan_text.contains("access=");
    assert!(
        has_access_entry,
        "global aggregate SQL explain plan should include logical access projection",
    );
}
3265
// EXPLAIN EXECUTION over `SELECT COUNT(*)` must succeed and the rendered text
// must contain the `AggregateCount execution_mode=` heading and `node_id=0`.
#[test]
fn explain_sql_execution_global_aggregate_returns_execution_descriptor_text() {
    reset_session_sql_store();
    let db_session = sql_session();

    let sql = "EXPLAIN EXECUTION SELECT COUNT(*) FROM SessionSqlEntity";
    let descriptor_text = db_session
        .explain_sql::<SessionSqlEntity>(sql)
        .expect("global aggregate SQL explain execution should succeed");

    let has_aggregate_heading = descriptor_text.contains("AggregateCount execution_mode=");
    assert!(
        has_aggregate_heading,
        "global aggregate SQL explain execution should include aggregate terminal node heading",
    );

    let has_root_node_id = descriptor_text.contains("node_id=0");
    assert!(
        has_root_node_id,
        "global aggregate SQL explain execution should include root node id",
    );
}
3286
// EXPLAIN JSON over `SELECT COUNT(*)` must succeed and render a `{...}`
// payload that carries the load-mode tag.
#[test]
fn explain_sql_json_global_aggregate_returns_logical_plan_json() {
    reset_session_sql_store();
    let db_session = sql_session();

    let sql = "EXPLAIN JSON SELECT COUNT(*) FROM SessionSqlEntity";
    let json_text = db_session
        .explain_sql::<SessionSqlEntity>(sql)
        .expect("global aggregate SQL explain json should succeed");

    // Shape check: the payload is delimited by one pair of braces.
    let is_brace_wrapped = json_text.starts_with('{') && json_text.ends_with('}');
    assert!(
        is_brace_wrapped,
        "global aggregate SQL explain json should render one JSON object payload",
    );

    let has_load_mode = json_text.contains("\"mode\":{\"type\":\"Load\"");
    assert!(
        has_load_mode,
        "global aggregate SQL explain json should expose logical query mode metadata",
    );
}
3305
// EXPLAIN EXECUTION over `SUM(missing_field)` must fail, and the error must be
// `QueryError::Execute(QueryExecutionError::Unsupported(_))`.
#[test]
fn explain_sql_global_aggregate_rejects_unknown_target_field() {
    use crate::db::query::intent::QueryExecutionError;

    reset_session_sql_store();
    let db_session = sql_session();

    let sql = "EXPLAIN EXECUTION SELECT SUM(missing_field) FROM SessionSqlEntity";
    let outcome = db_session.explain_sql::<SessionSqlEntity>(sql);
    let failure =
        outcome.expect_err("global aggregate SQL explain should reject unknown target fields");

    let is_unsupported = matches!(
        failure,
        QueryError::Execute(QueryExecutionError::Unsupported(_))
    );
    assert!(
        is_unsupported,
        "global aggregate SQL explain should map unknown target field to unsupported execution error boundary",
    );
}
3327
// EXPLAIN over `SELECT DISTINCT age` must fail, and the error must be
// `QueryError::Execute(QueryExecutionError::Unsupported(_))`.
#[test]
fn explain_sql_rejects_distinct_without_pk_projection_in_current_slice() {
    use crate::db::query::intent::QueryExecutionError;

    reset_session_sql_store();
    let db_session = sql_session();

    let sql = "EXPLAIN SELECT DISTINCT age FROM SessionSqlEntity";
    let outcome = db_session.explain_sql::<SessionSqlEntity>(sql);
    let failure = outcome
        .expect_err("EXPLAIN SELECT DISTINCT without PK projection should remain fail-closed");

    let is_unsupported = matches!(
        failure,
        QueryError::Execute(QueryExecutionError::Unsupported(_))
    );
    assert!(
        is_unsupported,
        "unsupported DISTINCT explain shape should map to unsupported execution error boundary",
    );
}
3347
// Passing a bare SELECT (no EXPLAIN prefix) to `explain_sql` must fail, and the
// error must be `QueryError::Execute(QueryExecutionError::Unsupported(_))`.
#[test]
fn explain_sql_rejects_non_explain_statements() {
    use crate::db::query::intent::QueryExecutionError;

    reset_session_sql_store();
    let db_session = sql_session();

    let sql = "SELECT * FROM SessionSqlEntity";
    let outcome = db_session.explain_sql::<SessionSqlEntity>(sql);
    let failure = outcome.expect_err("explain_sql must reject non-EXPLAIN statements");

    let is_unsupported = matches!(
        failure,
        QueryError::Execute(QueryExecutionError::Unsupported(_))
    );
    assert!(
        is_unsupported,
        "non-EXPLAIN input must fail as unsupported explain usage",
    );
}
3367}