Skip to main content

icydb_core/db/
session.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6#[cfg(test)]
7use crate::db::{DataStore, IndexStore};
8use crate::{
9    db::{
10        Db, EntityResponse, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
11        MissingRowPolicy, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace, PlanError,
12        ProjectionResponse, Query, QueryError, QueryTracePlan, StorageReport, StoreRegistry,
13        TraceExecutionStrategy, WriteBatchResponse,
14        access::AccessStrategy,
15        commit::EntityRuntimeHooks,
16        cursor::decode_optional_cursor_token,
17        executor::{
18            DeleteExecutor, ExecutablePlan, ExecutionStrategy, ExecutorPlanError, LoadExecutor,
19            SaveExecutor,
20        },
21        query::{
22            builder::aggregate::{AggregateExpr, avg, count, count_by, max_by, min_by, sum},
23            explain::ExplainAggregateTerminalPlan,
24            intent::IntentError,
25            plan::{FieldSlot, QueryMode},
26        },
27        schema::{describe_entity_model, show_indexes_for_model},
28        sql::lowering::{
29            SqlCommand, SqlGlobalAggregateCommand, SqlGlobalAggregateTerminal, SqlLoweringError,
30            compile_sql_command, compile_sql_global_aggregate_command,
31        },
32        sql::parser::SqlExplainMode,
33    },
34    error::{ErrorClass, ErrorOrigin, InternalError},
35    metrics::sink::{MetricsSink, with_metrics_sink},
36    traits::{CanisterKind, EntityKind, EntityValue},
37    value::Value,
38};
39use std::thread::LocalKey;
40
41// Map executor-owned plan-surface failures into query-owned plan errors.
42fn map_executor_plan_error(err: ExecutorPlanError) -> QueryError {
43    match err {
44        ExecutorPlanError::Cursor(err) => QueryError::from(PlanError::from(*err)),
45    }
46}
47
48// Decode one optional external cursor token and map decode failures into the
49// query-plan cursor error boundary.
50fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
51    decode_optional_cursor_token(cursor_token).map_err(|err| QueryError::from(PlanError::from(err)))
52}
53
54// Map SQL frontend parse/lowering failures into query-facing execution errors.
55fn map_sql_lowering_error(err: SqlLoweringError) -> QueryError {
56    match err {
57        SqlLoweringError::Query(err) => err,
58        SqlLoweringError::Parse(crate::db::sql::parser::SqlParseError::UnsupportedFeature {
59            feature,
60        }) => QueryError::execute(InternalError::query_unsupported_sql_feature(feature)),
61        other => QueryError::execute(InternalError::classified(
62            ErrorClass::Unsupported,
63            ErrorOrigin::Query,
64            format!("SQL query is not executable in this release: {other}"),
65        )),
66    }
67}
68
69// Resolve one aggregate target field through planner slot contracts before
70// aggregate terminal execution.
71fn resolve_sql_aggregate_target_slot<E: EntityKind>(field: &str) -> Result<FieldSlot, QueryError> {
72    FieldSlot::resolve(E::MODEL, field).ok_or_else(|| {
73        QueryError::execute(crate::db::error::executor_unsupported(format!(
74            "unknown aggregate target field: {field}",
75        )))
76    })
77}
78
79// Convert one lowered global SQL aggregate terminal into aggregate expression
80// contracts used by aggregate explain execution descriptors.
81fn sql_global_aggregate_terminal_to_expr<E: EntityKind>(
82    terminal: &SqlGlobalAggregateTerminal,
83) -> Result<AggregateExpr, QueryError> {
84    match terminal {
85        SqlGlobalAggregateTerminal::CountRows => Ok(count()),
86        SqlGlobalAggregateTerminal::CountField(field) => {
87            let _ = resolve_sql_aggregate_target_slot::<E>(field)?;
88
89            Ok(count_by(field.as_str()))
90        }
91        SqlGlobalAggregateTerminal::SumField(field) => {
92            let _ = resolve_sql_aggregate_target_slot::<E>(field)?;
93
94            Ok(sum(field.as_str()))
95        }
96        SqlGlobalAggregateTerminal::AvgField(field) => {
97            let _ = resolve_sql_aggregate_target_slot::<E>(field)?;
98
99            Ok(avg(field.as_str()))
100        }
101        SqlGlobalAggregateTerminal::MinField(field) => {
102            let _ = resolve_sql_aggregate_target_slot::<E>(field)?;
103
104            Ok(min_by(field.as_str()))
105        }
106        SqlGlobalAggregateTerminal::MaxField(field) => {
107            let _ = resolve_sql_aggregate_target_slot::<E>(field)?;
108
109            Ok(max_by(field.as_str()))
110        }
111    }
112}
113
114///
115/// DbSession
116///
117/// Session-scoped database handle with policy (debug, metrics) and execution routing.
118///
119
120pub struct DbSession<C: CanisterKind> {
121    db: Db<C>,
122    debug: bool,
123    metrics: Option<&'static dyn MetricsSink>,
124}
125
126impl<C: CanisterKind> DbSession<C> {
127    /// Construct one session facade for a database handle.
128    #[must_use]
129    pub(crate) const fn new(db: Db<C>) -> Self {
130        Self {
131            db,
132            debug: false,
133            metrics: None,
134        }
135    }
136
137    /// Construct one session facade from store registry and runtime hooks.
138    #[must_use]
139    pub const fn new_with_hooks(
140        store: &'static LocalKey<StoreRegistry>,
141        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
142    ) -> Self {
143        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
144    }
145
146    /// Enable debug execution behavior where supported by executors.
147    #[must_use]
148    pub const fn debug(mut self) -> Self {
149        self.debug = true;
150        self
151    }
152
153    /// Attach one metrics sink for all session-executed operations.
154    #[must_use]
155    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
156        self.metrics = Some(sink);
157        self
158    }
159
160    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
161        if let Some(sink) = self.metrics {
162            with_metrics_sink(sink, f)
163        } else {
164            f()
165        }
166    }
167
168    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
169    fn execute_save_with<E, T, R>(
170        &self,
171        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
172        map: impl FnOnce(T) -> R,
173    ) -> Result<R, InternalError>
174    where
175        E: EntityKind<Canister = C> + EntityValue,
176    {
177        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
178
179        Ok(map(value))
180    }
181
182    // Shared save-facade wrappers keep response shape explicit at call sites.
183    fn execute_save_entity<E>(
184        &self,
185        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
186    ) -> Result<E, InternalError>
187    where
188        E: EntityKind<Canister = C> + EntityValue,
189    {
190        self.execute_save_with(op, std::convert::identity)
191    }
192
193    fn execute_save_batch<E>(
194        &self,
195        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
196    ) -> Result<WriteBatchResponse<E>, InternalError>
197    where
198        E: EntityKind<Canister = C> + EntityValue,
199    {
200        self.execute_save_with(op, WriteBatchResponse::new)
201    }
202
203    fn execute_save_view<E>(
204        &self,
205        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
206    ) -> Result<E::ViewType, InternalError>
207    where
208        E: EntityKind<Canister = C> + EntityValue,
209    {
210        self.execute_save_with(op, std::convert::identity)
211    }
212
213    // ---------------------------------------------------------------------
214    // Query entry points (public, fluent)
215    // ---------------------------------------------------------------------
216
217    /// Start a fluent load query with default missing-row policy (`Ignore`).
218    #[must_use]
219    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
220    where
221        E: EntityKind<Canister = C>,
222    {
223        FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
224    }
225
226    /// Start a fluent load query with explicit missing-row policy.
227    #[must_use]
228    pub const fn load_with_consistency<E>(
229        &self,
230        consistency: MissingRowPolicy,
231    ) -> FluentLoadQuery<'_, E>
232    where
233        E: EntityKind<Canister = C>,
234    {
235        FluentLoadQuery::new(self, Query::new(consistency))
236    }
237
238    /// Build one typed query intent from one reduced SQL statement.
239    ///
240    /// This parser/lowering entrypoint is intentionally constrained to the
241    /// executable subset wired in the current release.
242    pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
243    where
244        E: EntityKind<Canister = C>,
245    {
246        let command = compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
247            .map_err(map_sql_lowering_error)?;
248
249        match command {
250            SqlCommand::Query(query) => Ok(query),
251            SqlCommand::Explain { .. } | SqlCommand::ExplainGlobalAggregate { .. } => {
252                Err(QueryError::execute(InternalError::classified(
253                    ErrorClass::Unsupported,
254                    ErrorOrigin::Query,
255                    "query_from_sql does not accept EXPLAIN statements; use explain_sql(...)",
256                )))
257            }
258        }
259    }
260
261    /// Execute one reduced SQL `SELECT`/`DELETE` statement for entity `E`.
262    pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
263    where
264        E: EntityKind<Canister = C> + EntityValue,
265    {
266        let query = self.query_from_sql::<E>(sql)?;
267        if query.has_grouping() {
268            return Err(QueryError::Intent(
269                IntentError::GroupedRequiresExecuteGrouped,
270            ));
271        }
272
273        self.execute_query(&query)
274    }
275
276    /// Execute one reduced SQL `SELECT` statement and return projection-shaped rows.
277    ///
278    /// This surface keeps `execute_sql(...)` backwards-compatible for callers
279    /// that currently consume full entity rows.
280    pub fn execute_sql_projection<E>(&self, sql: &str) -> Result<ProjectionResponse<E>, QueryError>
281    where
282        E: EntityKind<Canister = C> + EntityValue,
283    {
284        let query = self.query_from_sql::<E>(sql)?;
285        if query.has_grouping() {
286            return Err(QueryError::Intent(
287                IntentError::GroupedRequiresExecuteGrouped,
288            ));
289        }
290
291        match query.mode() {
292            QueryMode::Load(_) => {
293                self.execute_load_query_with(&query, |load, plan| load.execute_projection(plan))
294            }
295            QueryMode::Delete(_) => Err(QueryError::execute(InternalError::classified(
296                ErrorClass::Unsupported,
297                ErrorOrigin::Query,
298                "execute_sql_projection only supports SELECT statements",
299            ))),
300        }
301    }
302
303    /// Execute one reduced SQL global aggregate `SELECT` statement.
304    ///
305    /// This entrypoint is intentionally constrained to one aggregate terminal
306    /// shape per statement and preserves existing terminal semantics.
307    pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
308    where
309        E: EntityKind<Canister = C> + EntityValue,
310    {
311        let command = compile_sql_global_aggregate_command::<E>(sql, MissingRowPolicy::Ignore)
312            .map_err(map_sql_lowering_error)?;
313
314        match command.terminal() {
315            SqlGlobalAggregateTerminal::CountRows => self
316                .execute_load_query_with(command.query(), |load, plan| load.aggregate_count(plan))
317                .map(|count| Value::Uint(u64::from(count))),
318            SqlGlobalAggregateTerminal::CountField(field) => {
319                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
320                self.execute_load_query_with(command.query(), |load, plan| {
321                    load.values_by_slot(plan, target_slot)
322                })
323                .map(|values| {
324                    let count = values
325                        .into_iter()
326                        .filter(|value| !matches!(value, Value::Null))
327                        .count();
328                    Value::Uint(u64::try_from(count).unwrap_or(u64::MAX))
329                })
330            }
331            SqlGlobalAggregateTerminal::SumField(field) => {
332                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
333                self.execute_load_query_with(command.query(), |load, plan| {
334                    load.aggregate_sum_by_slot(plan, target_slot)
335                })
336                .map(|value| value.map_or(Value::Null, Value::Decimal))
337            }
338            SqlGlobalAggregateTerminal::AvgField(field) => {
339                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
340                self.execute_load_query_with(command.query(), |load, plan| {
341                    load.aggregate_avg_by_slot(plan, target_slot)
342                })
343                .map(|value| value.map_or(Value::Null, Value::Decimal))
344            }
345            SqlGlobalAggregateTerminal::MinField(field) => {
346                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
347                let min_id = self.execute_load_query_with(command.query(), |load, plan| {
348                    load.aggregate_min_by_slot(plan, target_slot)
349                })?;
350
351                match min_id {
352                    Some(id) => self
353                        .load::<E>()
354                        .by_id(id)
355                        .first_value_by(field)
356                        .map(|value| value.unwrap_or(Value::Null)),
357                    None => Ok(Value::Null),
358                }
359            }
360            SqlGlobalAggregateTerminal::MaxField(field) => {
361                let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
362                let max_id = self.execute_load_query_with(command.query(), |load, plan| {
363                    load.aggregate_max_by_slot(plan, target_slot)
364                })?;
365
366                match max_id {
367                    Some(id) => self
368                        .load::<E>()
369                        .by_id(id)
370                        .first_value_by(field)
371                        .map(|value| value.unwrap_or(Value::Null)),
372                    None => Ok(Value::Null),
373                }
374            }
375        }
376    }
377
378    /// Execute one reduced SQL grouped `SELECT` statement and return grouped rows.
379    pub fn execute_sql_grouped<E>(
380        &self,
381        sql: &str,
382        cursor_token: Option<&str>,
383    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
384    where
385        E: EntityKind<Canister = C> + EntityValue,
386    {
387        let query = self.query_from_sql::<E>(sql)?;
388        if !query.has_grouping() {
389            return Err(QueryError::execute(InternalError::classified(
390                ErrorClass::Unsupported,
391                ErrorOrigin::Query,
392                "execute_sql_grouped requires grouped SQL query intent",
393            )));
394        }
395
396        self.execute_grouped(&query, cursor_token)
397    }
398
399    /// Explain one reduced SQL statement for entity `E`.
400    ///
401    /// Supported modes:
402    /// - `EXPLAIN ...` -> logical plan text
403    /// - `EXPLAIN EXECUTION ...` -> execution descriptor text
404    /// - `EXPLAIN JSON ...` -> logical plan canonical JSON
405    pub fn explain_sql<E>(&self, sql: &str) -> Result<String, QueryError>
406    where
407        E: EntityKind<Canister = C> + EntityValue,
408    {
409        let command = compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
410            .map_err(map_sql_lowering_error)?;
411
412        match command {
413            SqlCommand::Query(_) => Err(QueryError::execute(InternalError::classified(
414                ErrorClass::Unsupported,
415                ErrorOrigin::Query,
416                "explain_sql requires an EXPLAIN statement",
417            ))),
418            SqlCommand::Explain { mode, query } => match mode {
419                SqlExplainMode::Plan => Ok(query.explain()?.render_text_canonical()),
420                SqlExplainMode::Execution => query.explain_execution_text(),
421                SqlExplainMode::Json => Ok(query.explain()?.render_json_canonical()),
422            },
423            SqlCommand::ExplainGlobalAggregate { mode, command } => {
424                Self::explain_sql_global_aggregate::<E>(mode, command)
425            }
426        }
427    }
428
429    // Render one EXPLAIN payload for constrained global aggregate SQL command.
430    fn explain_sql_global_aggregate<E>(
431        mode: SqlExplainMode,
432        command: SqlGlobalAggregateCommand<E>,
433    ) -> Result<String, QueryError>
434    where
435        E: EntityKind<Canister = C> + EntityValue,
436    {
437        match mode {
438            SqlExplainMode::Plan => {
439                // Keep explain validation parity with execution by requiring the
440                // target field to resolve before returning explain output.
441                let _ = sql_global_aggregate_terminal_to_expr::<E>(command.terminal())?;
442
443                Ok(command.query().explain()?.render_text_canonical())
444            }
445            SqlExplainMode::Execution => {
446                let aggregate = sql_global_aggregate_terminal_to_expr::<E>(command.terminal())?;
447                let plan = Self::explain_load_query_terminal_with(command.query(), aggregate)?;
448
449                Ok(plan.execution_node_descriptor().render_text_tree())
450            }
451            SqlExplainMode::Json => {
452                // Keep explain validation parity with execution by requiring the
453                // target field to resolve before returning explain output.
454                let _ = sql_global_aggregate_terminal_to_expr::<E>(command.terminal())?;
455
456                Ok(command.query().explain()?.render_json_canonical())
457            }
458        }
459    }
460
461    /// Start a fluent delete query with default missing-row policy (`Ignore`).
462    #[must_use]
463    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
464    where
465        E: EntityKind<Canister = C>,
466    {
467        FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
468    }
469
470    /// Start a fluent delete query with explicit missing-row policy.
471    #[must_use]
472    pub fn delete_with_consistency<E>(
473        &self,
474        consistency: MissingRowPolicy,
475    ) -> FluentDeleteQuery<'_, E>
476    where
477        E: EntityKind<Canister = C>,
478    {
479        FluentDeleteQuery::new(self, Query::new(consistency).delete())
480    }
481
482    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
483    ///
484    /// This terminal bypasses query planning and access routing entirely.
485    #[must_use]
486    pub const fn select_one(&self) -> Value {
487        Value::Int(1)
488    }
489
490    /// Return one stable, human-readable index listing for the entity schema.
491    ///
492    /// Output format mirrors SQL-style introspection:
493    /// - `PRIMARY KEY (field)`
494    /// - `INDEX name (field_a, field_b)`
495    /// - `UNIQUE INDEX name (field_a, field_b)`
496    #[must_use]
497    pub fn show_indexes<E>(&self) -> Vec<String>
498    where
499        E: EntityKind<Canister = C>,
500    {
501        show_indexes_for_model(E::MODEL)
502    }
503
504    /// Return one structured schema description for the entity.
505    ///
506    /// This is a typed `DESCRIBE`-style introspection surface consumed by
507    /// developer tooling and pre-EXPLAIN debugging.
508    #[must_use]
509    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
510    where
511        E: EntityKind<Canister = C>,
512    {
513        describe_entity_model(E::MODEL)
514    }
515
516    /// Build one point-in-time storage report for observability endpoints.
517    pub fn storage_report(
518        &self,
519        name_to_path: &[(&'static str, &'static str)],
520    ) -> Result<StorageReport, InternalError> {
521        self.db.storage_report(name_to_path)
522    }
523
524    // ---------------------------------------------------------------------
525    // Low-level executors (crate-internal; execution primitives)
526    // ---------------------------------------------------------------------
527
528    #[must_use]
529    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
530    where
531        E: EntityKind<Canister = C> + EntityValue,
532    {
533        LoadExecutor::new(self.db, self.debug)
534    }
535
536    #[must_use]
537    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
538    where
539        E: EntityKind<Canister = C> + EntityValue,
540    {
541        DeleteExecutor::new(self.db, self.debug)
542    }
543
544    #[must_use]
545    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
546    where
547        E: EntityKind<Canister = C> + EntityValue,
548    {
549        SaveExecutor::new(self.db, self.debug)
550    }
551
552    // ---------------------------------------------------------------------
553    // Query diagnostics / execution (internal routing)
554    // ---------------------------------------------------------------------
555
556    /// Execute one scalar load/delete query and return materialized response rows.
557    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
558    where
559        E: EntityKind<Canister = C> + EntityValue,
560    {
561        let plan = query.plan()?.into_executable();
562
563        let result = match query.mode() {
564            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
565            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
566        };
567
568        result.map_err(QueryError::execute)
569    }
570
571    // Shared load-query terminal wrapper: build plan, run under metrics, map
572    // execution errors into query-facing errors.
573    pub(in crate::db) fn execute_load_query_with<E, T>(
574        &self,
575        query: &Query<E>,
576        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
577    ) -> Result<T, QueryError>
578    where
579        E: EntityKind<Canister = C> + EntityValue,
580    {
581        let plan = query.plan()?.into_executable();
582
583        self.with_metrics(|| op(self.load_executor::<E>(), plan))
584            .map_err(QueryError::execute)
585    }
586
587    /// Build one trace payload for a query without executing it.
588    ///
589    /// This lightweight surface is intended for developer diagnostics:
590    /// plan hash, access strategy summary, and planner/executor route shape.
591    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
592    where
593        E: EntityKind<Canister = C>,
594    {
595        let compiled = query.plan()?;
596        let explain = compiled.explain();
597        let plan_hash = compiled.plan_hash_hex();
598
599        let executable = compiled.into_executable();
600        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
601        let execution_strategy = match query.mode() {
602            QueryMode::Load(_) => Some(trace_execution_strategy(
603                executable
604                    .execution_strategy()
605                    .map_err(QueryError::execute)?,
606            )),
607            QueryMode::Delete(_) => None,
608        };
609
610        Ok(QueryTracePlan::new(
611            plan_hash,
612            access_strategy,
613            execution_strategy,
614            explain,
615        ))
616    }
617
618    /// Build one aggregate-terminal explain payload without executing the query.
619    pub(crate) fn explain_load_query_terminal_with<E>(
620        query: &Query<E>,
621        aggregate: AggregateExpr,
622    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
623    where
624        E: EntityKind<Canister = C> + EntityValue,
625    {
626        // Phase 1: build one compiled query once and project logical explain output.
627        let compiled = query.plan()?;
628        let query_explain = compiled.explain();
629        let terminal = aggregate.kind();
630
631        // Phase 2: derive the executor route label for this aggregate terminal.
632        let executable = compiled.into_executable();
633        let execution = executable.explain_aggregate_terminal_execution_descriptor(aggregate);
634
635        Ok(ExplainAggregateTerminalPlan::new(
636            query_explain,
637            terminal,
638            execution,
639        ))
640    }
641
642    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
643    pub(crate) fn execute_load_query_paged_with_trace<E>(
644        &self,
645        query: &Query<E>,
646        cursor_token: Option<&str>,
647    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
648    where
649        E: EntityKind<Canister = C> + EntityValue,
650    {
651        // Phase 1: build/validate executable plan and reject grouped plans.
652        let plan = query.plan()?.into_executable();
653        match plan.execution_strategy().map_err(QueryError::execute)? {
654            ExecutionStrategy::PrimaryKey => {
655                return Err(QueryError::execute(
656                    crate::db::error::query_executor_invariant(
657                        "cursor pagination requires explicit or grouped ordering",
658                    ),
659                ));
660            }
661            ExecutionStrategy::Ordered => {}
662            ExecutionStrategy::Grouped => {
663                return Err(QueryError::execute(
664                    crate::db::error::query_executor_invariant(
665                        "grouped plans require execute_grouped(...)",
666                    ),
667                ));
668            }
669        }
670
671        // Phase 2: decode external cursor token and validate it against plan surface.
672        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
673        let cursor = plan
674            .prepare_cursor(cursor_bytes.as_deref())
675            .map_err(map_executor_plan_error)?;
676
677        // Phase 3: execute one traced page and encode outbound continuation token.
678        let (page, trace) = self
679            .with_metrics(|| {
680                self.load_executor::<E>()
681                    .execute_paged_with_cursor_traced(plan, cursor)
682            })
683            .map_err(QueryError::execute)?;
684        let next_cursor = page
685            .next_cursor
686            .map(|token| {
687                let Some(token) = token.as_scalar() else {
688                    return Err(QueryError::execute(
689                        crate::db::error::query_executor_invariant(
690                            "scalar load pagination emitted grouped continuation token",
691                        ),
692                    ));
693                };
694
695                token.encode().map_err(|err| {
696                    QueryError::execute(InternalError::serialize_internal(format!(
697                        "failed to serialize continuation cursor: {err}"
698                    )))
699                })
700            })
701            .transpose()?;
702
703        Ok(PagedLoadExecutionWithTrace::new(
704            page.items,
705            next_cursor,
706            trace,
707        ))
708    }
709
710    /// Execute one grouped query page with optional grouped continuation cursor.
711    ///
712    /// This is the explicit grouped execution boundary; scalar load APIs reject
713    /// grouped plans to preserve scalar response contracts.
714    pub fn execute_grouped<E>(
715        &self,
716        query: &Query<E>,
717        cursor_token: Option<&str>,
718    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
719    where
720        E: EntityKind<Canister = C> + EntityValue,
721    {
722        // Phase 1: build/validate executable plan and require grouped shape.
723        let plan = query.plan()?.into_executable();
724        if !matches!(
725            plan.execution_strategy().map_err(QueryError::execute)?,
726            ExecutionStrategy::Grouped
727        ) {
728            return Err(QueryError::execute(
729                crate::db::error::query_executor_invariant(
730                    "execute_grouped requires grouped logical plans",
731                ),
732            ));
733        }
734
735        // Phase 2: decode external grouped cursor token and validate against plan.
736        let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
737        let cursor = plan
738            .prepare_grouped_cursor(cursor_bytes.as_deref())
739            .map_err(map_executor_plan_error)?;
740
741        // Phase 3: execute grouped page and encode outbound grouped continuation token.
742        let (page, trace) = self
743            .with_metrics(|| {
744                self.load_executor::<E>()
745                    .execute_grouped_paged_with_cursor_traced(plan, cursor)
746            })
747            .map_err(QueryError::execute)?;
748        let next_cursor = page
749            .next_cursor
750            .map(|token| {
751                let Some(token) = token.as_grouped() else {
752                    return Err(QueryError::execute(
753                        crate::db::error::query_executor_invariant(
754                            "grouped pagination emitted scalar continuation token",
755                        ),
756                    ));
757                };
758
759                token.encode().map_err(|err| {
760                    QueryError::execute(InternalError::serialize_internal(format!(
761                        "failed to serialize grouped continuation cursor: {err}"
762                    )))
763                })
764            })
765            .transpose()?;
766
767        Ok(PagedGroupedExecutionWithTrace::new(
768            page.rows,
769            next_cursor,
770            trace,
771        ))
772    }
773
774    // ---------------------------------------------------------------------
775    // High-level write API (public, intent-level)
776    // ---------------------------------------------------------------------
777
778    /// Insert one entity row.
779    pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
780    where
781        E: EntityKind<Canister = C> + EntityValue,
782    {
783        self.execute_save_entity(|save| save.insert(entity))
784    }
785
786    /// Insert a single-entity-type batch atomically in one commit window.
787    ///
788    /// If any item fails pre-commit validation, no row in the batch is persisted.
789    ///
790    /// This API is not a multi-entity transaction surface.
791    pub fn insert_many_atomic<E>(
792        &self,
793        entities: impl IntoIterator<Item = E>,
794    ) -> Result<WriteBatchResponse<E>, InternalError>
795    where
796        E: EntityKind<Canister = C> + EntityValue,
797    {
798        self.execute_save_batch(|save| save.insert_many_atomic(entities))
799    }
800
801    /// Insert a batch with explicitly non-atomic semantics.
802    ///
803    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
804    pub fn insert_many_non_atomic<E>(
805        &self,
806        entities: impl IntoIterator<Item = E>,
807    ) -> Result<WriteBatchResponse<E>, InternalError>
808    where
809        E: EntityKind<Canister = C> + EntityValue,
810    {
811        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
812    }
813
814    /// Replace one existing entity row.
815    pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
816    where
817        E: EntityKind<Canister = C> + EntityValue,
818    {
819        self.execute_save_entity(|save| save.replace(entity))
820    }
821
822    /// Replace a single-entity-type batch atomically in one commit window.
823    ///
824    /// If any item fails pre-commit validation, no row in the batch is persisted.
825    ///
826    /// This API is not a multi-entity transaction surface.
827    pub fn replace_many_atomic<E>(
828        &self,
829        entities: impl IntoIterator<Item = E>,
830    ) -> Result<WriteBatchResponse<E>, InternalError>
831    where
832        E: EntityKind<Canister = C> + EntityValue,
833    {
834        self.execute_save_batch(|save| save.replace_many_atomic(entities))
835    }
836
837    /// Replace a batch with explicitly non-atomic semantics.
838    ///
839    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
840    pub fn replace_many_non_atomic<E>(
841        &self,
842        entities: impl IntoIterator<Item = E>,
843    ) -> Result<WriteBatchResponse<E>, InternalError>
844    where
845        E: EntityKind<Canister = C> + EntityValue,
846    {
847        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
848    }
849
850    /// Update one existing entity row.
851    pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
852    where
853        E: EntityKind<Canister = C> + EntityValue,
854    {
855        self.execute_save_entity(|save| save.update(entity))
856    }
857
858    /// Update a single-entity-type batch atomically in one commit window.
859    ///
860    /// If any item fails pre-commit validation, no row in the batch is persisted.
861    ///
862    /// This API is not a multi-entity transaction surface.
863    pub fn update_many_atomic<E>(
864        &self,
865        entities: impl IntoIterator<Item = E>,
866    ) -> Result<WriteBatchResponse<E>, InternalError>
867    where
868        E: EntityKind<Canister = C> + EntityValue,
869    {
870        self.execute_save_batch(|save| save.update_many_atomic(entities))
871    }
872
873    /// Update a batch with explicitly non-atomic semantics.
874    ///
875    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
876    pub fn update_many_non_atomic<E>(
877        &self,
878        entities: impl IntoIterator<Item = E>,
879    ) -> Result<WriteBatchResponse<E>, InternalError>
880    where
881        E: EntityKind<Canister = C> + EntityValue,
882    {
883        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
884    }
885
886    /// Insert one view value and return the stored view.
887    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
888    where
889        E: EntityKind<Canister = C> + EntityValue,
890    {
891        self.execute_save_view::<E>(|save| save.insert_view(view))
892    }
893
894    /// Replace one view value and return the stored view.
895    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
896    where
897        E: EntityKind<Canister = C> + EntityValue,
898    {
899        self.execute_save_view::<E>(|save| save.replace_view(view))
900    }
901
902    /// Update one view value and return the stored view.
903    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
904    where
905        E: EntityKind<Canister = C> + EntityValue,
906    {
907        self.execute_save_view::<E>(|save| save.update_view(view))
908    }
909
910    /// TEST ONLY: clear all registered data and index stores for this database.
911    #[cfg(test)]
912    #[doc(hidden)]
913    pub fn clear_stores_for_tests(&self) {
914        self.db.with_store_registry(|reg| {
915            // Test cleanup only: clearing all stores is set-like and does not
916            // depend on registry iteration order.
917            for (_, store) in reg.iter() {
918                store.with_data_mut(DataStore::clear);
919                store.with_index_mut(IndexStore::clear);
920            }
921        });
922    }
923}
924
925const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
926    match strategy {
927        ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
928        ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
929        ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
930    }
931}
932
933///
934/// TESTS
935///
936
937#[cfg(test)]
938mod tests {
939    use super::*;
940    use crate::{
941        db::{
942            Db,
943            commit::{ensure_recovered, init_commit_store_for_tests},
944            cursor::CursorPlanError,
945            data::DataStore,
946            index::IndexStore,
947            query::plan::expr::{Expr, ProjectionField},
948            registry::StoreRegistry,
949        },
950        error::{ErrorClass, ErrorDetail, ErrorOrigin, QueryErrorDetail},
951        model::field::FieldKind,
952        testing::test_memory,
953        traits::Path,
954        types::Ulid,
955        value::Value,
956    };
957    use icydb_derive::FieldProjection;
958    use serde::{Deserialize, Serialize};
959    use std::cell::RefCell;
960
961    crate::test_canister! {
962        ident = SessionSqlCanister,
963        commit_memory_id = crate::testing::test_commit_memory_id(),
964    }
965
966    crate::test_store! {
967        ident = SessionSqlStore,
968        canister = SessionSqlCanister,
969    }
970
971    thread_local! {
972        static SESSION_SQL_DATA_STORE: RefCell<DataStore> =
973            RefCell::new(DataStore::init(test_memory(160)));
974        static SESSION_SQL_INDEX_STORE: RefCell<IndexStore> =
975            RefCell::new(IndexStore::init(test_memory(161)));
976        static SESSION_SQL_STORE_REGISTRY: StoreRegistry = {
977            let mut reg = StoreRegistry::new();
978            reg.register_store(
979                SessionSqlStore::PATH,
980                &SESSION_SQL_DATA_STORE,
981                &SESSION_SQL_INDEX_STORE,
982            )
983            .expect("SQL session test store registration should succeed");
984            reg
985        };
986    }
987
988    static SESSION_SQL_DB: Db<SessionSqlCanister> = Db::new(&SESSION_SQL_STORE_REGISTRY);
989
990    ///
991    /// SessionSqlEntity
992    ///
993    /// Test entity used to lock end-to-end reduced SQL session behavior.
994    ///
995
996    #[derive(Clone, Debug, Default, Deserialize, FieldProjection, PartialEq, Serialize)]
997    struct SessionSqlEntity {
998        id: Ulid,
999        name: String,
1000        age: u64,
1001    }
1002
1003    crate::test_entity_schema! {
1004        ident = SessionSqlEntity,
1005        id = Ulid,
1006        id_field = id,
1007        entity_name = "SessionSqlEntity",
1008        primary_key = "id",
1009        pk_index = 0,
1010        fields = [
1011            ("id", FieldKind::Ulid),
1012            ("name", FieldKind::Text),
1013            ("age", FieldKind::Uint),
1014        ],
1015        indexes = [],
1016        store = SessionSqlStore,
1017        canister = SessionSqlCanister,
1018    }
1019
1020    // Reset all session SQL fixture state between tests to preserve deterministic assertions.
1021    fn reset_session_sql_store() {
1022        init_commit_store_for_tests().expect("commit store init should succeed");
1023        ensure_recovered(&SESSION_SQL_DB).expect("write-side recovery should succeed");
1024        SESSION_SQL_DATA_STORE.with(|store| store.borrow_mut().clear());
1025        SESSION_SQL_INDEX_STORE.with(|store| store.borrow_mut().clear());
1026    }
1027
1028    fn sql_session() -> DbSession<SessionSqlCanister> {
1029        DbSession::new(SESSION_SQL_DB)
1030    }
1031
1032    // Assert query-surface cursor errors remain wrapped under QueryError::Plan(PlanError::Cursor).
1033    fn assert_query_error_is_cursor_plan(
1034        err: QueryError,
1035        predicate: impl FnOnce(&CursorPlanError) -> bool,
1036    ) {
1037        assert!(matches!(
1038            err,
1039            QueryError::Plan(plan_err)
1040                if matches!(
1041                    plan_err.as_ref(),
1042                    PlanError::Cursor(inner) if predicate(inner.as_ref())
1043                )
1044        ));
1045    }
1046
1047    // Assert both session conversion paths preserve the same cursor-plan variant payload.
1048    fn assert_cursor_mapping_parity(
1049        build: impl Fn() -> CursorPlanError,
1050        predicate: impl Fn(&CursorPlanError) -> bool + Copy,
1051    ) {
1052        let mapped_via_executor = map_executor_plan_error(ExecutorPlanError::from(build()));
1053        assert_query_error_is_cursor_plan(mapped_via_executor, predicate);
1054
1055        let mapped_via_plan = QueryError::from(PlanError::from(build()));
1056        assert_query_error_is_cursor_plan(mapped_via_plan, predicate);
1057    }
1058
1059    // Assert SQL parser unsupported-feature labels remain preserved through
1060    // query-facing execution error detail payloads.
1061    fn assert_sql_unsupported_feature_detail(err: QueryError, expected_feature: &'static str) {
1062        let QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1063            internal,
1064        )) = err
1065        else {
1066            panic!("expected query execution unsupported error variant");
1067        };
1068
1069        assert_eq!(internal.class(), ErrorClass::Unsupported);
1070        assert_eq!(internal.origin(), ErrorOrigin::Query);
1071        assert!(
1072            matches!(
1073                internal.detail(),
1074                Some(ErrorDetail::Query(QueryErrorDetail::UnsupportedSqlFeature { feature }))
1075                    if *feature == expected_feature
1076            ),
1077            "unsupported SQL feature detail label should be preserved",
1078        );
1079    }
1080
1081    fn unsupported_sql_feature_cases() -> [(&'static str, &'static str); 3] {
1082        [
1083            (
1084                "SELECT * FROM SessionSqlEntity JOIN other ON SessionSqlEntity.id = other.id",
1085                "JOIN",
1086            ),
1087            (
1088                "SELECT \"name\" FROM SessionSqlEntity",
1089                "quoted identifiers",
1090            ),
1091            ("SELECT * FROM SessionSqlEntity alias", "table aliases"),
1092        ]
1093    }
1094
1095    #[test]
1096    fn session_cursor_error_mapping_parity_boundary_arity() {
1097        assert_cursor_mapping_parity(
1098            || CursorPlanError::continuation_cursor_boundary_arity_mismatch(2, 1),
1099            |inner| {
1100                matches!(
1101                    inner,
1102                    CursorPlanError::ContinuationCursorBoundaryArityMismatch {
1103                        expected: 2,
1104                        found: 1
1105                    }
1106                )
1107            },
1108        );
1109    }
1110
1111    #[test]
1112    fn session_cursor_error_mapping_parity_window_mismatch() {
1113        assert_cursor_mapping_parity(
1114            || CursorPlanError::continuation_cursor_window_mismatch(8, 3),
1115            |inner| {
1116                matches!(
1117                    inner,
1118                    CursorPlanError::ContinuationCursorWindowMismatch {
1119                        expected_offset: 8,
1120                        actual_offset: 3
1121                    }
1122                )
1123            },
1124        );
1125    }
1126
1127    #[test]
1128    fn session_cursor_error_mapping_parity_decode_reason() {
1129        assert_cursor_mapping_parity(
1130            || {
1131                CursorPlanError::invalid_continuation_cursor(
1132                    crate::db::codec::cursor::CursorDecodeError::OddLength,
1133                )
1134            },
1135            |inner| {
1136                matches!(
1137                    inner,
1138                    CursorPlanError::InvalidContinuationCursor {
1139                        reason: crate::db::codec::cursor::CursorDecodeError::OddLength
1140                    }
1141                )
1142            },
1143        );
1144    }
1145
1146    #[test]
1147    fn session_cursor_error_mapping_parity_primary_key_type_mismatch() {
1148        assert_cursor_mapping_parity(
1149            || {
1150                CursorPlanError::continuation_cursor_primary_key_type_mismatch(
1151                    "id",
1152                    "ulid",
1153                    Some(crate::value::Value::Text("not-a-ulid".to_string())),
1154                )
1155            },
1156            |inner| {
1157                matches!(
1158                    inner,
1159                    CursorPlanError::ContinuationCursorPrimaryKeyTypeMismatch {
1160                        field,
1161                        expected,
1162                        value: Some(crate::value::Value::Text(value))
1163                    } if field == "id" && expected == "ulid" && value == "not-a-ulid"
1164                )
1165            },
1166        );
1167    }
1168
1169    #[test]
1170    fn session_cursor_error_mapping_parity_matrix_preserves_cursor_variants() {
1171        // Keep one matrix-level canary test name so cross-module audit references remain stable.
1172        assert_cursor_mapping_parity(
1173            || CursorPlanError::continuation_cursor_boundary_arity_mismatch(2, 1),
1174            |inner| {
1175                matches!(
1176                    inner,
1177                    CursorPlanError::ContinuationCursorBoundaryArityMismatch {
1178                        expected: 2,
1179                        found: 1
1180                    }
1181                )
1182            },
1183        );
1184    }
1185
1186    #[test]
1187    fn execute_sql_select_star_honors_order_limit_offset() {
1188        reset_session_sql_store();
1189        let session = sql_session();
1190
1191        session
1192            .insert(SessionSqlEntity {
1193                id: Ulid::generate(),
1194                name: "older".to_string(),
1195                age: 37,
1196            })
1197            .expect("seed insert should succeed");
1198        session
1199            .insert(SessionSqlEntity {
1200                id: Ulid::generate(),
1201                name: "younger".to_string(),
1202                age: 19,
1203            })
1204            .expect("seed insert should succeed");
1205
1206        let response = session
1207            .execute_sql::<SessionSqlEntity>(
1208                "SELECT * FROM SessionSqlEntity ORDER BY age ASC LIMIT 1 OFFSET 1",
1209            )
1210            .expect("SELECT * should execute");
1211
1212        assert_eq!(response.count(), 1, "window should return one row");
1213        let row = response
1214            .iter()
1215            .next()
1216            .expect("windowed result should include one row");
1217        assert_eq!(
1218            row.entity_ref().name,
1219            "older",
1220            "ordered window should return the second age-ordered row",
1221        );
1222    }
1223
1224    #[test]
1225    fn execute_sql_delete_honors_predicate_order_and_limit() {
1226        reset_session_sql_store();
1227        let session = sql_session();
1228
1229        session
1230            .insert(SessionSqlEntity {
1231                id: Ulid::generate(),
1232                name: "first-minor".to_string(),
1233                age: 16,
1234            })
1235            .expect("seed insert should succeed");
1236        session
1237            .insert(SessionSqlEntity {
1238                id: Ulid::generate(),
1239                name: "second-minor".to_string(),
1240                age: 17,
1241            })
1242            .expect("seed insert should succeed");
1243        session
1244            .insert(SessionSqlEntity {
1245                id: Ulid::generate(),
1246                name: "adult".to_string(),
1247                age: 42,
1248            })
1249            .expect("seed insert should succeed");
1250
1251        let deleted = session
1252            .execute_sql::<SessionSqlEntity>(
1253                "DELETE FROM SessionSqlEntity WHERE age < 20 ORDER BY age ASC LIMIT 1",
1254            )
1255            .expect("DELETE should execute");
1256
1257        assert_eq!(deleted.count(), 1, "delete limit should remove one row");
1258        assert_eq!(
1259            deleted
1260                .iter()
1261                .next()
1262                .expect("deleted row should exist")
1263                .entity_ref()
1264                .age,
1265            16,
1266            "ordered delete should remove the youngest matching row first",
1267        );
1268
1269        let remaining = session
1270            .load::<SessionSqlEntity>()
1271            .order_by("age")
1272            .execute()
1273            .expect("post-delete load should succeed");
1274        let remaining_ages = remaining
1275            .iter()
1276            .map(|row| row.entity_ref().age)
1277            .collect::<Vec<_>>();
1278
1279        assert_eq!(
1280            remaining_ages,
1281            vec![17, 42],
1282            "delete window semantics should preserve non-deleted rows",
1283        );
1284    }
1285
1286    #[test]
1287    fn query_from_sql_rejects_explain_statements() {
1288        reset_session_sql_store();
1289        let session = sql_session();
1290
1291        let err = session
1292            .query_from_sql::<SessionSqlEntity>("EXPLAIN SELECT * FROM SessionSqlEntity")
1293            .expect_err("query_from_sql must reject EXPLAIN statements");
1294
1295        assert!(
1296            matches!(
1297                err,
1298                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1299                    _
1300                ))
1301            ),
1302            "query_from_sql EXPLAIN rejection must map to unsupported execution class",
1303        );
1304    }
1305
1306    #[test]
1307    fn query_from_sql_preserves_parser_unsupported_feature_detail_labels() {
1308        reset_session_sql_store();
1309        let session = sql_session();
1310
1311        for (sql, feature) in unsupported_sql_feature_cases() {
1312            let err = session
1313                .query_from_sql::<SessionSqlEntity>(sql)
1314                .expect_err("unsupported SQL feature should fail through query_from_sql");
1315            assert_sql_unsupported_feature_detail(err, feature);
1316        }
1317    }
1318
1319    #[test]
1320    fn execute_sql_preserves_parser_unsupported_feature_detail_labels() {
1321        reset_session_sql_store();
1322        let session = sql_session();
1323
1324        for (sql, feature) in unsupported_sql_feature_cases() {
1325            let err = session
1326                .execute_sql::<SessionSqlEntity>(sql)
1327                .expect_err("unsupported SQL feature should fail through execute_sql");
1328            assert_sql_unsupported_feature_detail(err, feature);
1329        }
1330    }
1331
1332    #[test]
1333    fn execute_sql_projection_preserves_parser_unsupported_feature_detail_labels() {
1334        reset_session_sql_store();
1335        let session = sql_session();
1336
1337        for (sql, feature) in unsupported_sql_feature_cases() {
1338            let err = session
1339                .execute_sql_projection::<SessionSqlEntity>(sql)
1340                .expect_err("unsupported SQL feature should fail through execute_sql_projection");
1341            assert_sql_unsupported_feature_detail(err, feature);
1342        }
1343    }
1344
1345    #[test]
1346    fn execute_sql_grouped_preserves_parser_unsupported_feature_detail_labels() {
1347        reset_session_sql_store();
1348        let session = sql_session();
1349
1350        for (sql, feature) in unsupported_sql_feature_cases() {
1351            let err = session
1352                .execute_sql_grouped::<SessionSqlEntity>(sql, None)
1353                .expect_err("unsupported SQL feature should fail through execute_sql_grouped");
1354            assert_sql_unsupported_feature_detail(err, feature);
1355        }
1356    }
1357
1358    #[test]
1359    fn execute_sql_aggregate_preserves_parser_unsupported_feature_detail_labels() {
1360        reset_session_sql_store();
1361        let session = sql_session();
1362
1363        for (sql, feature) in unsupported_sql_feature_cases() {
1364            let err = session
1365                .execute_sql_aggregate::<SessionSqlEntity>(sql)
1366                .expect_err("unsupported SQL feature should fail through execute_sql_aggregate");
1367            assert_sql_unsupported_feature_detail(err, feature);
1368        }
1369    }
1370
1371    #[test]
1372    fn explain_sql_preserves_parser_unsupported_feature_detail_labels() {
1373        reset_session_sql_store();
1374        let session = sql_session();
1375
1376        for (sql, feature) in unsupported_sql_feature_cases() {
1377            let explain_sql = format!("EXPLAIN {sql}");
1378            let err = session
1379                .explain_sql::<SessionSqlEntity>(explain_sql.as_str())
1380                .expect_err("unsupported SQL feature should fail through explain_sql");
1381            assert_sql_unsupported_feature_detail(err, feature);
1382        }
1383    }
1384
1385    #[test]
1386    fn query_from_sql_select_field_projection_lowers_to_scalar_field_selection() {
1387        reset_session_sql_store();
1388        let session = sql_session();
1389
1390        let query = session
1391            .query_from_sql::<SessionSqlEntity>("SELECT name, age FROM SessionSqlEntity")
1392            .expect("field-list SQL query should lower");
1393        let projection = query
1394            .plan()
1395            .expect("field-list SQL plan should build")
1396            .projection_spec();
1397        let field_names = projection
1398            .fields()
1399            .map(|field| match field {
1400                ProjectionField::Scalar {
1401                    expr: Expr::Field(field),
1402                    alias: None,
1403                } => field.as_str().to_string(),
1404                other @ ProjectionField::Scalar { .. } => {
1405                    panic!("field-list SQL projection should lower to plain field exprs: {other:?}")
1406                }
1407            })
1408            .collect::<Vec<_>>();
1409
1410        assert_eq!(field_names, vec!["name".to_string(), "age".to_string()]);
1411    }
1412
1413    #[test]
1414    fn query_from_sql_select_grouped_aggregate_projection_lowers_to_grouped_intent() {
1415        reset_session_sql_store();
1416        let session = sql_session();
1417
1418        let query = session
1419            .query_from_sql::<SessionSqlEntity>(
1420                "SELECT age, COUNT(*) FROM SessionSqlEntity GROUP BY age",
1421            )
1422            .expect("grouped aggregate projection SQL query should lower");
1423        assert!(
1424            query.has_grouping(),
1425            "grouped aggregate SQL projection lowering should produce grouped query intent",
1426        );
1427    }
1428
1429    #[test]
1430    fn execute_sql_select_field_projection_currently_returns_entity_shaped_rows() {
1431        reset_session_sql_store();
1432        let session = sql_session();
1433
1434        session
1435            .insert(SessionSqlEntity {
1436                id: Ulid::generate(),
1437                name: "projected-row".to_string(),
1438                age: 29,
1439            })
1440            .expect("seed insert should succeed");
1441
1442        let response = session
1443            .execute_sql::<SessionSqlEntity>(
1444                "SELECT name FROM SessionSqlEntity ORDER BY age ASC LIMIT 1",
1445            )
1446            .expect("field-list SQL projection should execute");
1447        let row = response
1448            .iter()
1449            .next()
1450            .expect("field-list SQL projection response should contain one row");
1451
1452        assert_eq!(
1453            row.entity_ref().name,
1454            "projected-row",
1455            "field-list SQL projection should still return entity rows in this baseline",
1456        );
1457        assert_eq!(
1458            row.entity_ref().age,
1459            29,
1460            "field-list SQL projection should preserve full entity payload until projection response shaping is introduced",
1461        );
1462    }
1463
1464    #[test]
1465    fn execute_sql_projection_select_field_list_returns_projection_shaped_rows() {
1466        reset_session_sql_store();
1467        let session = sql_session();
1468
1469        session
1470            .insert(SessionSqlEntity {
1471                id: Ulid::generate(),
1472                name: "projection-surface".to_string(),
1473                age: 33,
1474            })
1475            .expect("seed insert should succeed");
1476
1477        let response = session
1478            .execute_sql_projection::<SessionSqlEntity>(
1479                "SELECT name FROM SessionSqlEntity ORDER BY age ASC LIMIT 1",
1480            )
1481            .expect("projection SQL execution should succeed");
1482        let row = response
1483            .iter()
1484            .next()
1485            .expect("projection SQL response should contain one row");
1486
1487        assert_eq!(response.count(), 1);
1488        assert_eq!(
1489            row.values(),
1490            [Value::Text("projection-surface".to_string())],
1491            "projection SQL response should carry only projected field values in declaration order",
1492        );
1493    }
1494
1495    #[test]
1496    fn execute_sql_projection_select_star_returns_all_fields_in_model_order() {
1497        reset_session_sql_store();
1498        let session = sql_session();
1499
1500        session
1501            .insert(SessionSqlEntity {
1502                id: Ulid::generate(),
1503                name: "projection-star".to_string(),
1504                age: 41,
1505            })
1506            .expect("seed insert should succeed");
1507
1508        let response = session
1509            .execute_sql_projection::<SessionSqlEntity>(
1510                "SELECT * FROM SessionSqlEntity ORDER BY age ASC LIMIT 1",
1511            )
1512            .expect("projection SQL star execution should succeed");
1513        let row = response
1514            .iter()
1515            .next()
1516            .expect("projection SQL star response should contain one row");
1517
1518        assert_eq!(response.count(), 1);
1519        assert_eq!(
1520            row.values().len(),
1521            3,
1522            "SELECT * projection response should include all model fields",
1523        );
1524        assert_eq!(row.values()[0], Value::Ulid(row.id().key()));
1525        assert_eq!(row.values()[1], Value::Text("projection-star".to_string()));
1526        assert_eq!(row.values()[2], Value::Uint(41));
1527    }
1528
1529    #[test]
1530    fn execute_sql_select_schema_qualified_entity_executes() {
1531        reset_session_sql_store();
1532        let session = sql_session();
1533
1534        session
1535            .insert(SessionSqlEntity {
1536                id: Ulid::generate(),
1537                name: "schema-qualified".to_string(),
1538                age: 41,
1539            })
1540            .expect("seed insert should succeed");
1541
1542        let response = session
1543            .execute_sql::<SessionSqlEntity>(
1544                "SELECT * FROM public.SessionSqlEntity ORDER BY age ASC LIMIT 1",
1545            )
1546            .expect("schema-qualified entity SQL should execute");
1547
1548        assert_eq!(response.len(), 1);
1549    }
1550
1551    #[test]
1552    fn execute_sql_projection_select_table_qualified_fields_executes() {
1553        reset_session_sql_store();
1554        let session = sql_session();
1555
1556        session
1557            .insert(SessionSqlEntity {
1558                id: Ulid::generate(),
1559                name: "qualified-projection".to_string(),
1560                age: 42,
1561            })
1562            .expect("seed insert should succeed");
1563
1564        let response = session
1565            .execute_sql_projection::<SessionSqlEntity>(
1566                "SELECT SessionSqlEntity.name \
1567                 FROM SessionSqlEntity \
1568                 WHERE SessionSqlEntity.age >= 40 \
1569                 ORDER BY SessionSqlEntity.age DESC LIMIT 1",
1570            )
1571            .expect("table-qualified projection SQL should execute");
1572        let row = response
1573            .iter()
1574            .next()
1575            .expect("table-qualified projection SQL response should contain one row");
1576
1577        assert_eq!(response.count(), 1);
1578        assert_eq!(
1579            row.values(),
1580            [Value::Text("qualified-projection".to_string())]
1581        );
1582    }
1583
1584    #[test]
1585    fn execute_sql_projection_rejects_delete_statements() {
1586        reset_session_sql_store();
1587        let session = sql_session();
1588
1589        let err = session
1590            .execute_sql_projection::<SessionSqlEntity>(
1591                "DELETE FROM SessionSqlEntity ORDER BY age LIMIT 1",
1592            )
1593            .expect_err("projection SQL execution should reject delete statements");
1594
1595        assert!(
1596            matches!(
1597                err,
1598                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1599                    _
1600                ))
1601            ),
1602            "projection SQL delete usage should fail as unsupported",
1603        );
1604    }
1605
1606    #[test]
1607    fn execute_sql_select_field_projection_unknown_field_fails_with_plan_error() {
1608        reset_session_sql_store();
1609        let session = sql_session();
1610
1611        let err = session
1612            .execute_sql::<SessionSqlEntity>("SELECT missing_field FROM SessionSqlEntity")
1613            .expect_err("unknown projected fields should fail planner validation");
1614
1615        assert!(
1616            matches!(err, QueryError::Plan(_)),
1617            "unknown projected fields should surface planner-domain query errors: {err:?}",
1618        );
1619    }
1620
1621    #[test]
1622    fn execute_sql_rejects_aggregate_projection_in_current_slice() {
1623        reset_session_sql_store();
1624        let session = sql_session();
1625
1626        let err = session
1627            .execute_sql::<SessionSqlEntity>("SELECT COUNT(*) FROM SessionSqlEntity")
1628            .expect_err("global aggregate SQL projection should remain lowering-gated");
1629
1630        assert!(
1631            matches!(
1632                err,
1633                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1634                    _
1635                ))
1636            ),
1637            "global aggregate SQL projection should fail at reduced lowering boundary",
1638        );
1639    }
1640
1641    #[test]
1642    fn execute_sql_rejects_table_alias_forms_in_reduced_parser() {
1643        reset_session_sql_store();
1644        let session = sql_session();
1645
1646        let err = session
1647            .execute_sql::<SessionSqlEntity>("SELECT * FROM SessionSqlEntity alias")
1648            .expect_err("table aliases should be rejected by reduced SQL parser");
1649
1650        assert!(
1651            matches!(
1652                err,
1653                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1654                    _
1655                ))
1656            ),
1657            "table alias usage should fail closed through unsupported SQL boundary",
1658        );
1659    }
1660
1661    #[test]
1662    fn execute_sql_rejects_quoted_identifiers_in_reduced_parser() {
1663        reset_session_sql_store();
1664        let session = sql_session();
1665
1666        let err = session
1667            .execute_sql::<SessionSqlEntity>("SELECT \"name\" FROM SessionSqlEntity")
1668            .expect_err("quoted identifiers should be rejected by reduced SQL parser");
1669
1670        assert!(
1671            matches!(
1672                err,
1673                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1674                    _
1675                ))
1676            ),
1677            "quoted identifiers should fail closed through unsupported SQL boundary",
1678        );
1679    }
1680
1681    #[test]
1682    fn execute_sql_select_distinct_star_executes() {
1683        reset_session_sql_store();
1684        let session = sql_session();
1685
1686        let id_a = Ulid::generate();
1687        let id_b = Ulid::generate();
1688        session
1689            .insert(SessionSqlEntity {
1690                id: id_a,
1691                name: "distinct-a".to_string(),
1692                age: 20,
1693            })
1694            .expect("seed insert should succeed");
1695        session
1696            .insert(SessionSqlEntity {
1697                id: id_b,
1698                name: "distinct-b".to_string(),
1699                age: 20,
1700            })
1701            .expect("seed insert should succeed");
1702
1703        let response = session
1704            .execute_sql::<SessionSqlEntity>(
1705                "SELECT DISTINCT * FROM SessionSqlEntity ORDER BY id ASC",
1706            )
1707            .expect("SELECT DISTINCT * should execute");
1708        assert_eq!(response.len(), 2);
1709    }
1710
1711    #[test]
1712    fn execute_sql_projection_select_distinct_with_pk_field_list_executes() {
1713        reset_session_sql_store();
1714        let session = sql_session();
1715
1716        session
1717            .insert(SessionSqlEntity {
1718                id: Ulid::generate(),
1719                name: "distinct-pk-a".to_string(),
1720                age: 25,
1721            })
1722            .expect("seed insert should succeed");
1723        session
1724            .insert(SessionSqlEntity {
1725                id: Ulid::generate(),
1726                name: "distinct-pk-b".to_string(),
1727                age: 25,
1728            })
1729            .expect("seed insert should succeed");
1730
1731        let response = session
1732            .execute_sql_projection::<SessionSqlEntity>(
1733                "SELECT DISTINCT id, age FROM SessionSqlEntity ORDER BY id ASC",
1734            )
1735            .expect("SELECT DISTINCT field-list with PK should execute");
1736        assert_eq!(response.len(), 2);
1737        assert_eq!(response[0].values().len(), 2);
1738    }
1739
1740    #[test]
1741    fn execute_sql_rejects_distinct_without_pk_projection_in_current_slice() {
1742        reset_session_sql_store();
1743        let session = sql_session();
1744
1745        let err = session
1746            .execute_sql::<SessionSqlEntity>("SELECT DISTINCT age FROM SessionSqlEntity")
1747            .expect_err("SELECT DISTINCT without PK in projection should remain lowering-gated");
1748
1749        assert!(
1750            matches!(
1751                err,
1752                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1753                    _
1754                ))
1755            ),
1756            "distinct SQL gating should map to unsupported execution error boundary",
1757        );
1758    }
1759
1760    #[test]
1761    fn execute_sql_aggregate_count_star_and_count_field_return_uint() {
1762        reset_session_sql_store();
1763        let session = sql_session();
1764
1765        session
1766            .insert(SessionSqlEntity {
1767                id: Ulid::generate(),
1768                name: "aggregate-a".to_string(),
1769                age: 20,
1770            })
1771            .expect("seed insert should succeed");
1772        session
1773            .insert(SessionSqlEntity {
1774                id: Ulid::generate(),
1775                name: "aggregate-b".to_string(),
1776                age: 32,
1777            })
1778            .expect("seed insert should succeed");
1779
1780        let count_rows = session
1781            .execute_sql_aggregate::<SessionSqlEntity>("SELECT COUNT(*) FROM SessionSqlEntity")
1782            .expect("COUNT(*) SQL aggregate should execute");
1783        let count_field = session
1784            .execute_sql_aggregate::<SessionSqlEntity>("SELECT COUNT(age) FROM SessionSqlEntity")
1785            .expect("COUNT(field) SQL aggregate should execute");
1786        assert_eq!(count_rows, Value::Uint(2));
1787        assert_eq!(count_field, Value::Uint(2));
1788    }
1789
1790    #[test]
1791    fn execute_sql_aggregate_sum_with_table_qualified_field_executes() {
1792        reset_session_sql_store();
1793        let session = sql_session();
1794
1795        session
1796            .insert(SessionSqlEntity {
1797                id: Ulid::generate(),
1798                name: "qualified-aggregate-a".to_string(),
1799                age: 20,
1800            })
1801            .expect("seed insert should succeed");
1802        session
1803            .insert(SessionSqlEntity {
1804                id: Ulid::generate(),
1805                name: "qualified-aggregate-b".to_string(),
1806                age: 32,
1807            })
1808            .expect("seed insert should succeed");
1809
1810        let sum = session
1811            .execute_sql_aggregate::<SessionSqlEntity>(
1812                "SELECT SUM(SessionSqlEntity.age) FROM SessionSqlEntity",
1813            )
1814            .expect("table-qualified aggregate SQL should execute");
1815
1816        assert_eq!(sum, Value::Decimal(crate::types::Decimal::from(52u64)));
1817    }
1818
1819    #[test]
1820    fn execute_sql_aggregate_rejects_distinct_aggregate_qualifier() {
1821        reset_session_sql_store();
1822        let session = sql_session();
1823
1824        let err = session
1825            .execute_sql_aggregate::<SessionSqlEntity>(
1826                "SELECT COUNT(DISTINCT age) FROM SessionSqlEntity",
1827            )
1828            .expect_err("aggregate DISTINCT qualifier should remain unsupported");
1829
1830        assert!(
1831            matches!(
1832                err,
1833                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1834                    _
1835                ))
1836            ),
1837            "aggregate DISTINCT qualifier should fail closed through unsupported SQL boundary",
1838        );
1839    }
1840
1841    #[test]
1842    fn execute_sql_aggregate_sum_avg_min_max_return_expected_values() {
1843        reset_session_sql_store();
1844        let session = sql_session();
1845
1846        session
1847            .insert(SessionSqlEntity {
1848                id: Ulid::generate(),
1849                name: "sumavg-a".to_string(),
1850                age: 20,
1851            })
1852            .expect("seed insert should succeed");
1853        session
1854            .insert(SessionSqlEntity {
1855                id: Ulid::generate(),
1856                name: "sumavg-b".to_string(),
1857                age: 32,
1858            })
1859            .expect("seed insert should succeed");
1860
1861        let sum = session
1862            .execute_sql_aggregate::<SessionSqlEntity>("SELECT SUM(age) FROM SessionSqlEntity")
1863            .expect("SUM(field) SQL aggregate should execute");
1864        let avg = session
1865            .execute_sql_aggregate::<SessionSqlEntity>("SELECT AVG(age) FROM SessionSqlEntity")
1866            .expect("AVG(field) SQL aggregate should execute");
1867        let min = session
1868            .execute_sql_aggregate::<SessionSqlEntity>("SELECT MIN(age) FROM SessionSqlEntity")
1869            .expect("MIN(field) SQL aggregate should execute");
1870        let max = session
1871            .execute_sql_aggregate::<SessionSqlEntity>("SELECT MAX(age) FROM SessionSqlEntity")
1872            .expect("MAX(field) SQL aggregate should execute");
1873        let empty_sum = session
1874            .execute_sql_aggregate::<SessionSqlEntity>(
1875                "SELECT SUM(age) FROM SessionSqlEntity WHERE age < 0",
1876            )
1877            .expect("SUM(field) SQL aggregate empty-window execution should succeed");
1878        let empty_min = session
1879            .execute_sql_aggregate::<SessionSqlEntity>(
1880                "SELECT MIN(age) FROM SessionSqlEntity WHERE age < 0",
1881            )
1882            .expect("MIN(field) SQL aggregate empty-window execution should succeed");
1883        let empty_max = session
1884            .execute_sql_aggregate::<SessionSqlEntity>(
1885                "SELECT MAX(age) FROM SessionSqlEntity WHERE age < 0",
1886            )
1887            .expect("MAX(field) SQL aggregate empty-window execution should succeed");
1888
1889        assert_eq!(sum, Value::Decimal(crate::types::Decimal::from(52u64)));
1890        assert_eq!(avg, Value::Decimal(crate::types::Decimal::from(26u64)));
1891        assert_eq!(min, Value::Uint(20));
1892        assert_eq!(max, Value::Uint(32));
1893        assert_eq!(empty_sum, Value::Null);
1894        assert_eq!(empty_min, Value::Null);
1895        assert_eq!(empty_max, Value::Null);
1896    }
1897
1898    #[test]
1899    fn execute_sql_aggregate_honors_order_limit_offset_window() {
1900        reset_session_sql_store();
1901        let session = sql_session();
1902
1903        session
1904            .insert(SessionSqlEntity {
1905                id: Ulid::generate(),
1906                name: "window-a".to_string(),
1907                age: 10,
1908            })
1909            .expect("seed insert should succeed");
1910        session
1911            .insert(SessionSqlEntity {
1912                id: Ulid::generate(),
1913                name: "window-b".to_string(),
1914                age: 20,
1915            })
1916            .expect("seed insert should succeed");
1917        session
1918            .insert(SessionSqlEntity {
1919                id: Ulid::generate(),
1920                name: "window-c".to_string(),
1921                age: 30,
1922            })
1923            .expect("seed insert should succeed");
1924
1925        let count = session
1926            .execute_sql_aggregate::<SessionSqlEntity>(
1927                "SELECT COUNT(*) FROM SessionSqlEntity ORDER BY age DESC LIMIT 2 OFFSET 1",
1928            )
1929            .expect("COUNT(*) SQL aggregate window execution should succeed");
1930        let sum = session
1931            .execute_sql_aggregate::<SessionSqlEntity>(
1932                "SELECT SUM(age) FROM SessionSqlEntity ORDER BY age DESC LIMIT 1 OFFSET 1",
1933            )
1934            .expect("SUM(field) SQL aggregate window execution should succeed");
1935        let avg = session
1936            .execute_sql_aggregate::<SessionSqlEntity>(
1937                "SELECT AVG(age) FROM SessionSqlEntity ORDER BY age ASC LIMIT 2 OFFSET 1",
1938            )
1939            .expect("AVG(field) SQL aggregate window execution should succeed");
1940
1941        assert_eq!(count, Value::Uint(2));
1942        assert_eq!(sum, Value::Decimal(crate::types::Decimal::from(20u64)));
1943        assert_eq!(avg, Value::Decimal(crate::types::Decimal::from(25u64)));
1944    }
1945
1946    #[test]
1947    fn execute_sql_aggregate_rejects_unsupported_aggregate_shapes() {
1948        reset_session_sql_store();
1949        let session = sql_session();
1950
1951        for sql in [
1952            "SELECT age FROM SessionSqlEntity",
1953            "SELECT age, COUNT(*) FROM SessionSqlEntity GROUP BY age",
1954        ] {
1955            let err = session
1956                .execute_sql_aggregate::<SessionSqlEntity>(sql)
1957                .expect_err("unsupported SQL aggregate shape should fail closed");
1958            assert!(
1959                matches!(
1960                    err,
1961                    QueryError::Execute(
1962                        crate::db::query::intent::QueryExecutionError::Unsupported(_)
1963                    )
1964                ),
1965                "unsupported SQL aggregate shape should map to unsupported execution error boundary: {sql}",
1966            );
1967        }
1968    }
1969
1970    #[test]
1971    fn execute_sql_aggregate_rejects_unknown_target_field() {
1972        reset_session_sql_store();
1973        let session = sql_session();
1974
1975        let err = session
1976            .execute_sql_aggregate::<SessionSqlEntity>(
1977                "SELECT SUM(missing_field) FROM SessionSqlEntity",
1978            )
1979            .expect_err("unknown aggregate target field should fail");
1980
1981        assert!(
1982            matches!(
1983                err,
1984                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1985                    _
1986                ))
1987            ),
1988            "unknown aggregate target field should map to unsupported execution error boundary",
1989        );
1990    }
1991
1992    #[test]
1993    fn execute_sql_projection_rejects_grouped_aggregate_sql() {
1994        reset_session_sql_store();
1995        let session = sql_session();
1996
1997        let err = session
1998            .execute_sql_projection::<SessionSqlEntity>(
1999                "SELECT age, COUNT(*) FROM SessionSqlEntity GROUP BY age",
2000            )
2001            .expect_err("projection SQL API should reject grouped aggregate SQL intent");
2002
2003        assert!(
2004            matches!(
2005                err,
2006                QueryError::Intent(
2007                    crate::db::query::intent::IntentError::GroupedRequiresExecuteGrouped
2008                )
2009            ),
2010            "projection SQL API must reject grouped aggregate SQL with grouped-intent routing error",
2011        );
2012    }
2013
2014    #[test]
2015    fn execute_sql_grouped_select_count_returns_grouped_aggregate_row() {
2016        reset_session_sql_store();
2017        let session = sql_session();
2018
2019        session
2020            .insert(SessionSqlEntity {
2021                id: Ulid::generate(),
2022                name: "aggregate-a".to_string(),
2023                age: 20,
2024            })
2025            .expect("seed insert should succeed");
2026        session
2027            .insert(SessionSqlEntity {
2028                id: Ulid::generate(),
2029                name: "aggregate-b".to_string(),
2030                age: 20,
2031            })
2032            .expect("seed insert should succeed");
2033        session
2034            .insert(SessionSqlEntity {
2035                id: Ulid::generate(),
2036                name: "aggregate-c".to_string(),
2037                age: 32,
2038            })
2039            .expect("seed insert should succeed");
2040
2041        let execution = session
2042            .execute_sql_grouped::<SessionSqlEntity>(
2043                "SELECT age, COUNT(*) FROM SessionSqlEntity GROUP BY age ORDER BY age ASC LIMIT 10",
2044                None,
2045            )
2046            .expect("grouped SQL aggregate execution should succeed");
2047
2048        assert!(
2049            execution.continuation_cursor().is_none(),
2050            "single-page grouped aggregate execution should not emit continuation cursor",
2051        );
2052        assert_eq!(execution.rows().len(), 2);
2053        assert_eq!(execution.rows()[0].group_key(), [Value::Uint(20)]);
2054        assert_eq!(execution.rows()[0].aggregate_values(), [Value::Uint(2)]);
2055        assert_eq!(execution.rows()[1].group_key(), [Value::Uint(32)]);
2056        assert_eq!(execution.rows()[1].aggregate_values(), [Value::Uint(1)]);
2057    }
2058
2059    #[test]
2060    fn execute_sql_grouped_select_count_with_qualified_identifiers_executes() {
2061        reset_session_sql_store();
2062        let session = sql_session();
2063
2064        session
2065            .insert(SessionSqlEntity {
2066                id: Ulid::generate(),
2067                name: "qualified-group-a".to_string(),
2068                age: 20,
2069            })
2070            .expect("seed insert should succeed");
2071        session
2072            .insert(SessionSqlEntity {
2073                id: Ulid::generate(),
2074                name: "qualified-group-b".to_string(),
2075                age: 20,
2076            })
2077            .expect("seed insert should succeed");
2078        session
2079            .insert(SessionSqlEntity {
2080                id: Ulid::generate(),
2081                name: "qualified-group-c".to_string(),
2082                age: 32,
2083            })
2084            .expect("seed insert should succeed");
2085
2086        let execution = session
2087            .execute_sql_grouped::<SessionSqlEntity>(
2088                "SELECT SessionSqlEntity.age, COUNT(*) \
2089                 FROM public.SessionSqlEntity \
2090                 WHERE SessionSqlEntity.age >= 20 \
2091                 GROUP BY SessionSqlEntity.age \
2092                 ORDER BY SessionSqlEntity.age ASC LIMIT 10",
2093                None,
2094            )
2095            .expect("qualified grouped SQL aggregate execution should succeed");
2096
2097        assert!(execution.continuation_cursor().is_none());
2098        assert_eq!(execution.rows().len(), 2);
2099        assert_eq!(execution.rows()[0].group_key(), [Value::Uint(20)]);
2100        assert_eq!(execution.rows()[0].aggregate_values(), [Value::Uint(2)]);
2101        assert_eq!(execution.rows()[1].group_key(), [Value::Uint(32)]);
2102        assert_eq!(execution.rows()[1].aggregate_values(), [Value::Uint(1)]);
2103    }
2104
2105    #[test]
2106    fn execute_sql_grouped_rejects_scalar_sql_intent() {
2107        reset_session_sql_store();
2108        let session = sql_session();
2109
2110        let err = session
2111            .execute_sql_grouped::<SessionSqlEntity>("SELECT name FROM SessionSqlEntity", None)
2112            .expect_err("grouped SQL API should reject non-grouped SQL queries");
2113
2114        assert!(
2115            matches!(
2116                err,
2117                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
2118                    _
2119                ))
2120            ),
2121            "grouped SQL API should fail closed for non-grouped SQL shapes",
2122        );
2123    }
2124
2125    #[test]
2126    fn execute_sql_rejects_grouped_sql_intent_without_grouped_api() {
2127        reset_session_sql_store();
2128        let session = sql_session();
2129
2130        let err = session
2131            .execute_sql::<SessionSqlEntity>(
2132                "SELECT age, COUNT(*) FROM SessionSqlEntity GROUP BY age",
2133            )
2134            .expect_err("scalar SQL API should reject grouped SQL intent");
2135
2136        assert!(
2137            matches!(
2138                err,
2139                QueryError::Intent(
2140                    crate::db::query::intent::IntentError::GroupedRequiresExecuteGrouped
2141                )
2142            ),
2143            "scalar SQL API must preserve grouped explicit-entrypoint contract",
2144        );
2145    }
2146
2147    #[test]
2148    fn execute_sql_rejects_unsupported_group_by_projection_shape() {
2149        reset_session_sql_store();
2150        let session = sql_session();
2151
2152        let err = session
2153            .execute_sql::<SessionSqlEntity>("SELECT COUNT(*) FROM SessionSqlEntity GROUP BY age")
2154            .expect_err("group-by projection mismatch should fail closed");
2155
2156        assert!(
2157            matches!(
2158                err,
2159                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
2160                    _
2161                ))
2162            ),
2163            "unsupported grouped SQL projection shapes should fail at reduced lowering boundary",
2164        );
2165    }
2166
2167    #[test]
2168    fn explain_sql_execution_returns_descriptor_text() {
2169        reset_session_sql_store();
2170        let session = sql_session();
2171
2172        let explain = session
2173            .explain_sql::<SessionSqlEntity>(
2174                "EXPLAIN EXECUTION SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
2175            )
2176            .expect("EXPLAIN EXECUTION should succeed");
2177
2178        assert!(
2179            explain.contains("node_id=0"),
2180            "execution explain output should include the root descriptor node id",
2181        );
2182        assert!(
2183            explain.contains("layer="),
2184            "execution explain output should include execution layer annotations",
2185        );
2186    }
2187
2188    #[test]
2189    fn explain_sql_plan_returns_logical_plan_text() {
2190        reset_session_sql_store();
2191        let session = sql_session();
2192
2193        let explain = session
2194            .explain_sql::<SessionSqlEntity>(
2195                "EXPLAIN SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
2196            )
2197            .expect("EXPLAIN should succeed");
2198
2199        assert!(
2200            explain.contains("mode=Load"),
2201            "logical explain text should include query mode projection",
2202        );
2203        assert!(
2204            explain.contains("access="),
2205            "logical explain text should include projected access shape",
2206        );
2207    }
2208
2209    #[test]
2210    fn explain_sql_plan_grouped_qualified_identifiers_match_unqualified_output() {
2211        reset_session_sql_store();
2212        let session = sql_session();
2213
2214        let qualified = session
2215            .explain_sql::<SessionSqlEntity>(
2216                "EXPLAIN SELECT SessionSqlEntity.age, COUNT(*) \
2217                 FROM public.SessionSqlEntity \
2218                 WHERE SessionSqlEntity.age >= 21 \
2219                 GROUP BY SessionSqlEntity.age \
2220                 ORDER BY SessionSqlEntity.age DESC LIMIT 2 OFFSET 1",
2221            )
2222            .expect("qualified grouped EXPLAIN plan SQL should succeed");
2223        let unqualified = session
2224            .explain_sql::<SessionSqlEntity>(
2225                "EXPLAIN SELECT age, COUNT(*) \
2226                 FROM SessionSqlEntity \
2227                 WHERE age >= 21 \
2228                 GROUP BY age \
2229                 ORDER BY age DESC LIMIT 2 OFFSET 1",
2230            )
2231            .expect("unqualified grouped EXPLAIN plan SQL should succeed");
2232
2233        assert_eq!(
2234            qualified, unqualified,
2235            "qualified grouped identifiers should normalize to the same logical EXPLAIN plan output",
2236        );
2237    }
2238
2239    #[test]
2240    fn explain_sql_execution_grouped_qualified_identifiers_match_unqualified_output() {
2241        reset_session_sql_store();
2242        let session = sql_session();
2243
2244        let qualified = session
2245            .explain_sql::<SessionSqlEntity>(
2246                "EXPLAIN EXECUTION SELECT SessionSqlEntity.age, COUNT(*) \
2247                 FROM public.SessionSqlEntity \
2248                 WHERE SessionSqlEntity.age >= 21 \
2249                 GROUP BY SessionSqlEntity.age \
2250                 ORDER BY SessionSqlEntity.age DESC LIMIT 2 OFFSET 1",
2251            )
2252            .expect("qualified grouped EXPLAIN execution SQL should succeed");
2253        let unqualified = session
2254            .explain_sql::<SessionSqlEntity>(
2255                "EXPLAIN EXECUTION SELECT age, COUNT(*) \
2256                 FROM SessionSqlEntity \
2257                 WHERE age >= 21 \
2258                 GROUP BY age \
2259                 ORDER BY age DESC LIMIT 2 OFFSET 1",
2260            )
2261            .expect("unqualified grouped EXPLAIN execution SQL should succeed");
2262
2263        assert_eq!(
2264            qualified, unqualified,
2265            "qualified grouped identifiers should normalize to the same execution EXPLAIN descriptor output",
2266        );
2267    }
2268
2269    #[test]
2270    fn explain_sql_json_grouped_qualified_identifiers_match_unqualified_output() {
2271        reset_session_sql_store();
2272        let session = sql_session();
2273
2274        let qualified = session
2275            .explain_sql::<SessionSqlEntity>(
2276                "EXPLAIN JSON SELECT SessionSqlEntity.age, COUNT(*) \
2277                 FROM public.SessionSqlEntity \
2278                 WHERE SessionSqlEntity.age >= 21 \
2279                 GROUP BY SessionSqlEntity.age \
2280                 ORDER BY SessionSqlEntity.age DESC LIMIT 2 OFFSET 1",
2281            )
2282            .expect("qualified grouped EXPLAIN JSON SQL should succeed");
2283        let unqualified = session
2284            .explain_sql::<SessionSqlEntity>(
2285                "EXPLAIN JSON SELECT age, COUNT(*) \
2286                 FROM SessionSqlEntity \
2287                 WHERE age >= 21 \
2288                 GROUP BY age \
2289                 ORDER BY age DESC LIMIT 2 OFFSET 1",
2290            )
2291            .expect("unqualified grouped EXPLAIN JSON SQL should succeed");
2292
2293        assert_eq!(
2294            qualified, unqualified,
2295            "qualified grouped identifiers should normalize to the same EXPLAIN JSON output",
2296        );
2297    }
2298
2299    #[test]
2300    fn explain_sql_plan_qualified_identifiers_match_unqualified_output() {
2301        reset_session_sql_store();
2302        let session = sql_session();
2303
2304        let qualified = session
2305            .explain_sql::<SessionSqlEntity>(
2306                "EXPLAIN SELECT * \
2307                 FROM public.SessionSqlEntity \
2308                 WHERE SessionSqlEntity.age >= 21 \
2309                 ORDER BY SessionSqlEntity.age DESC LIMIT 1",
2310            )
2311            .expect("qualified EXPLAIN plan SQL should succeed");
2312        let unqualified = session
2313            .explain_sql::<SessionSqlEntity>(
2314                "EXPLAIN SELECT * \
2315                 FROM SessionSqlEntity \
2316                 WHERE age >= 21 \
2317                 ORDER BY age DESC LIMIT 1",
2318            )
2319            .expect("unqualified EXPLAIN plan SQL should succeed");
2320
2321        assert_eq!(
2322            qualified, unqualified,
2323            "qualified identifiers should normalize to the same logical EXPLAIN plan output",
2324        );
2325    }
2326
2327    #[test]
2328    fn explain_sql_execution_qualified_identifiers_match_unqualified_output() {
2329        reset_session_sql_store();
2330        let session = sql_session();
2331
2332        let qualified = session
2333            .explain_sql::<SessionSqlEntity>(
2334                "EXPLAIN EXECUTION SELECT SessionSqlEntity.name \
2335                 FROM SessionSqlEntity \
2336                 WHERE SessionSqlEntity.age >= 21 \
2337                 ORDER BY SessionSqlEntity.age DESC LIMIT 1",
2338            )
2339            .expect("qualified EXPLAIN execution SQL should succeed");
2340        let unqualified = session
2341            .explain_sql::<SessionSqlEntity>(
2342                "EXPLAIN EXECUTION SELECT name \
2343                 FROM SessionSqlEntity \
2344                 WHERE age >= 21 \
2345                 ORDER BY age DESC LIMIT 1",
2346            )
2347            .expect("unqualified EXPLAIN execution SQL should succeed");
2348
2349        assert_eq!(
2350            qualified, unqualified,
2351            "qualified identifiers should normalize to the same execution EXPLAIN descriptor output",
2352        );
2353    }
2354
2355    #[test]
2356    fn explain_sql_json_qualified_aggregate_matches_unqualified_output() {
2357        reset_session_sql_store();
2358        let session = sql_session();
2359
2360        let qualified = session
2361            .explain_sql::<SessionSqlEntity>(
2362                "EXPLAIN JSON SELECT SUM(SessionSqlEntity.age) \
2363                 FROM public.SessionSqlEntity \
2364                 WHERE SessionSqlEntity.age >= 21",
2365            )
2366            .expect("qualified global aggregate EXPLAIN JSON should succeed");
2367        let unqualified = session
2368            .explain_sql::<SessionSqlEntity>(
2369                "EXPLAIN JSON SELECT SUM(age) FROM SessionSqlEntity WHERE age >= 21",
2370            )
2371            .expect("unqualified global aggregate EXPLAIN JSON should succeed");
2372
2373        assert_eq!(
2374            qualified, unqualified,
2375            "qualified identifiers should normalize to the same global aggregate EXPLAIN JSON output",
2376        );
2377    }
2378
2379    #[test]
2380    fn explain_sql_plan_select_distinct_star_marks_distinct_true() {
2381        reset_session_sql_store();
2382        let session = sql_session();
2383
2384        let explain = session
2385            .explain_sql::<SessionSqlEntity>(
2386                "EXPLAIN SELECT DISTINCT * FROM SessionSqlEntity ORDER BY id ASC",
2387            )
2388            .expect("EXPLAIN SELECT DISTINCT * should succeed");
2389
2390        assert!(
2391            explain.contains("distinct=true"),
2392            "logical explain text should preserve scalar distinct intent",
2393        );
2394    }
2395
2396    #[test]
2397    fn explain_sql_execution_select_distinct_star_returns_execution_descriptor_text() {
2398        reset_session_sql_store();
2399        let session = sql_session();
2400
2401        let explain = session
2402            .explain_sql::<SessionSqlEntity>(
2403                "EXPLAIN EXECUTION SELECT DISTINCT * FROM SessionSqlEntity ORDER BY id ASC LIMIT 1",
2404            )
2405            .expect("EXPLAIN EXECUTION SELECT DISTINCT * should succeed");
2406
2407        assert!(
2408            explain.contains("node_id=0"),
2409            "execution explain output should include the root descriptor node id",
2410        );
2411    }
2412
2413    #[test]
2414    fn explain_sql_json_returns_logical_plan_json() {
2415        reset_session_sql_store();
2416        let session = sql_session();
2417
2418        let explain = session
2419            .explain_sql::<SessionSqlEntity>(
2420                "EXPLAIN JSON SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
2421            )
2422            .expect("EXPLAIN JSON should succeed");
2423
2424        assert!(
2425            explain.starts_with('{') && explain.ends_with('}'),
2426            "logical explain JSON should render one JSON object payload",
2427        );
2428        assert!(
2429            explain.contains("\"mode\":{\"type\":\"Load\""),
2430            "logical explain JSON should expose structured query mode metadata",
2431        );
2432        assert!(
2433            explain.contains("\"access\":"),
2434            "logical explain JSON should include projected access metadata",
2435        );
2436    }
2437
2438    #[test]
2439    fn explain_sql_json_select_distinct_star_marks_distinct_true() {
2440        reset_session_sql_store();
2441        let session = sql_session();
2442
2443        let explain = session
2444            .explain_sql::<SessionSqlEntity>(
2445                "EXPLAIN JSON SELECT DISTINCT * FROM SessionSqlEntity ORDER BY id ASC",
2446            )
2447            .expect("EXPLAIN JSON SELECT DISTINCT * should succeed");
2448
2449        assert!(
2450            explain.contains("\"distinct\":true"),
2451            "logical explain JSON should preserve scalar distinct intent",
2452        );
2453    }
2454
2455    #[test]
2456    fn explain_sql_json_delete_returns_logical_delete_mode() {
2457        reset_session_sql_store();
2458        let session = sql_session();
2459
2460        let explain = session
2461            .explain_sql::<SessionSqlEntity>(
2462                "EXPLAIN JSON DELETE FROM SessionSqlEntity ORDER BY age LIMIT 1",
2463            )
2464            .expect("EXPLAIN JSON DELETE should succeed");
2465
2466        assert!(
2467            explain.contains("\"mode\":{\"type\":\"Delete\""),
2468            "logical explain JSON should expose delete query mode metadata",
2469        );
2470    }
2471
2472    #[test]
2473    fn explain_sql_plan_global_aggregate_returns_logical_plan_text() {
2474        reset_session_sql_store();
2475        let session = sql_session();
2476
2477        let explain = session
2478            .explain_sql::<SessionSqlEntity>("EXPLAIN SELECT COUNT(*) FROM SessionSqlEntity")
2479            .expect("global aggregate SQL explain plan should succeed");
2480
2481        assert!(
2482            explain.contains("mode=Load"),
2483            "global aggregate SQL explain plan should project logical load mode",
2484        );
2485        assert!(
2486            explain.contains("access="),
2487            "global aggregate SQL explain plan should include logical access projection",
2488        );
2489    }
2490
2491    #[test]
2492    fn explain_sql_execution_global_aggregate_returns_execution_descriptor_text() {
2493        reset_session_sql_store();
2494        let session = sql_session();
2495
2496        let explain = session
2497            .explain_sql::<SessionSqlEntity>(
2498                "EXPLAIN EXECUTION SELECT COUNT(*) FROM SessionSqlEntity",
2499            )
2500            .expect("global aggregate SQL explain execution should succeed");
2501
2502        assert!(
2503            explain.contains("AggregateCount execution_mode="),
2504            "global aggregate SQL explain execution should include aggregate terminal node heading",
2505        );
2506        assert!(
2507            explain.contains("node_id=0"),
2508            "global aggregate SQL explain execution should include root node id",
2509        );
2510    }
2511
2512    #[test]
2513    fn explain_sql_json_global_aggregate_returns_logical_plan_json() {
2514        reset_session_sql_store();
2515        let session = sql_session();
2516
2517        let explain = session
2518            .explain_sql::<SessionSqlEntity>("EXPLAIN JSON SELECT COUNT(*) FROM SessionSqlEntity")
2519            .expect("global aggregate SQL explain json should succeed");
2520
2521        assert!(
2522            explain.starts_with('{') && explain.ends_with('}'),
2523            "global aggregate SQL explain json should render one JSON object payload",
2524        );
2525        assert!(
2526            explain.contains("\"mode\":{\"type\":\"Load\""),
2527            "global aggregate SQL explain json should expose logical query mode metadata",
2528        );
2529    }
2530
2531    #[test]
2532    fn explain_sql_global_aggregate_rejects_unknown_target_field() {
2533        reset_session_sql_store();
2534        let session = sql_session();
2535
2536        let err = session
2537            .explain_sql::<SessionSqlEntity>(
2538                "EXPLAIN EXECUTION SELECT SUM(missing_field) FROM SessionSqlEntity",
2539            )
2540            .expect_err("global aggregate SQL explain should reject unknown target fields");
2541
2542        assert!(
2543            matches!(
2544                err,
2545                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
2546                    _
2547                ))
2548            ),
2549            "global aggregate SQL explain should map unknown target field to unsupported execution error boundary",
2550        );
2551    }
2552
2553    #[test]
2554    fn explain_sql_rejects_distinct_without_pk_projection_in_current_slice() {
2555        reset_session_sql_store();
2556        let session = sql_session();
2557
2558        let err = session
2559            .explain_sql::<SessionSqlEntity>("EXPLAIN SELECT DISTINCT age FROM SessionSqlEntity")
2560            .expect_err("EXPLAIN SELECT DISTINCT without PK projection should remain fail-closed");
2561
2562        assert!(
2563            matches!(
2564                err,
2565                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
2566                    _
2567                ))
2568            ),
2569            "unsupported DISTINCT explain shape should map to unsupported execution error boundary",
2570        );
2571    }
2572
2573    #[test]
2574    fn explain_sql_rejects_non_explain_statements() {
2575        reset_session_sql_store();
2576        let session = sql_session();
2577
2578        let err = session
2579            .explain_sql::<SessionSqlEntity>("SELECT * FROM SessionSqlEntity")
2580            .expect_err("explain_sql must reject non-EXPLAIN statements");
2581
2582        assert!(
2583            matches!(
2584                err,
2585                QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
2586                    _
2587                ))
2588            ),
2589            "non-EXPLAIN input must fail as unsupported explain usage",
2590        );
2591    }
2592}