1#[cfg(test)]
7use crate::db::{DataStore, IndexStore};
8use crate::{
9 db::{
10 Db, EntityResponse, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
11 MissingRowPolicy, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace, PlanError,
12 ProjectionResponse, Query, QueryError, QueryTracePlan, StorageReport, StoreRegistry,
13 TraceExecutionStrategy, WriteBatchResponse,
14 access::AccessStrategy,
15 commit::EntityRuntimeHooks,
16 cursor::decode_optional_cursor_token,
17 executor::{
18 DeleteExecutor, ExecutablePlan, ExecutionStrategy, ExecutorPlanError, LoadExecutor,
19 SaveExecutor,
20 },
21 query::{
22 builder::aggregate::AggregateExpr, explain::ExplainAggregateTerminalPlan,
23 plan::QueryMode,
24 },
25 schema::{describe_entity_model, show_indexes_for_model},
26 sql::lowering::{SqlCommand, SqlLoweringError, compile_sql_command},
27 },
28 error::{ErrorClass, ErrorOrigin, InternalError},
29 metrics::sink::{MetricsSink, with_metrics_sink},
30 traits::{CanisterKind, EntityKind, EntityValue},
31 value::Value,
32};
33use std::thread::LocalKey;
34
35fn map_executor_plan_error(err: ExecutorPlanError) -> QueryError {
37 match err {
38 ExecutorPlanError::Cursor(err) => QueryError::from(PlanError::from(*err)),
39 }
40}
41
42fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
45 decode_optional_cursor_token(cursor_token).map_err(|err| QueryError::from(PlanError::from(err)))
46}
47
48fn map_sql_lowering_error(err: SqlLoweringError) -> QueryError {
50 QueryError::execute(InternalError::classified(
51 ErrorClass::Unsupported,
52 ErrorOrigin::Query,
53 format!("SQL query is not executable in this release: {err}"),
54 ))
55}
56
57pub struct DbSession<C: CanisterKind> {
64 db: Db<C>,
65 debug: bool,
66 metrics: Option<&'static dyn MetricsSink>,
67}
68
69impl<C: CanisterKind> DbSession<C> {
70 #[must_use]
72 pub(crate) const fn new(db: Db<C>) -> Self {
73 Self {
74 db,
75 debug: false,
76 metrics: None,
77 }
78 }
79
80 #[must_use]
82 pub const fn new_with_hooks(
83 store: &'static LocalKey<StoreRegistry>,
84 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
85 ) -> Self {
86 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
87 }
88
89 #[must_use]
91 pub const fn debug(mut self) -> Self {
92 self.debug = true;
93 self
94 }
95
96 #[must_use]
98 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
99 self.metrics = Some(sink);
100 self
101 }
102
103 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
104 if let Some(sink) = self.metrics {
105 with_metrics_sink(sink, f)
106 } else {
107 f()
108 }
109 }
110
111 fn execute_save_with<E, T, R>(
113 &self,
114 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
115 map: impl FnOnce(T) -> R,
116 ) -> Result<R, InternalError>
117 where
118 E: EntityKind<Canister = C> + EntityValue,
119 {
120 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
121
122 Ok(map(value))
123 }
124
125 fn execute_save_entity<E>(
127 &self,
128 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
129 ) -> Result<E, InternalError>
130 where
131 E: EntityKind<Canister = C> + EntityValue,
132 {
133 self.execute_save_with(op, std::convert::identity)
134 }
135
136 fn execute_save_batch<E>(
137 &self,
138 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
139 ) -> Result<WriteBatchResponse<E>, InternalError>
140 where
141 E: EntityKind<Canister = C> + EntityValue,
142 {
143 self.execute_save_with(op, WriteBatchResponse::new)
144 }
145
146 fn execute_save_view<E>(
147 &self,
148 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
149 ) -> Result<E::ViewType, InternalError>
150 where
151 E: EntityKind<Canister = C> + EntityValue,
152 {
153 self.execute_save_with(op, std::convert::identity)
154 }
155
156 #[must_use]
162 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
163 where
164 E: EntityKind<Canister = C>,
165 {
166 FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
167 }
168
169 #[must_use]
171 pub const fn load_with_consistency<E>(
172 &self,
173 consistency: MissingRowPolicy,
174 ) -> FluentLoadQuery<'_, E>
175 where
176 E: EntityKind<Canister = C>,
177 {
178 FluentLoadQuery::new(self, Query::new(consistency))
179 }
180
181 pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
186 where
187 E: EntityKind<Canister = C>,
188 {
189 let command = compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
190 .map_err(map_sql_lowering_error)?;
191
192 match command {
193 SqlCommand::Query(query) => Ok(query),
194 SqlCommand::Explain { .. } => Err(QueryError::execute(InternalError::classified(
195 ErrorClass::Unsupported,
196 ErrorOrigin::Query,
197 "query_from_sql does not accept EXPLAIN statements; use explain_sql(...)",
198 ))),
199 }
200 }
201
202 pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
204 where
205 E: EntityKind<Canister = C> + EntityValue,
206 {
207 let query = self.query_from_sql::<E>(sql)?;
208 self.execute_query(&query)
209 }
210
211 pub fn execute_sql_projection<E>(&self, sql: &str) -> Result<ProjectionResponse<E>, QueryError>
216 where
217 E: EntityKind<Canister = C> + EntityValue,
218 {
219 let query = self.query_from_sql::<E>(sql)?;
220 match query.mode() {
221 QueryMode::Load(_) => {
222 self.execute_load_query_with(&query, |load, plan| load.execute_projection(plan))
223 }
224 QueryMode::Delete(_) => Err(QueryError::execute(InternalError::classified(
225 ErrorClass::Unsupported,
226 ErrorOrigin::Query,
227 "execute_sql_projection only supports SELECT statements",
228 ))),
229 }
230 }
231
232 pub fn explain_sql<E>(&self, sql: &str) -> Result<String, QueryError>
239 where
240 E: EntityKind<Canister = C> + EntityValue,
241 {
242 let command = compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
243 .map_err(map_sql_lowering_error)?;
244
245 match command {
246 SqlCommand::Query(_) => Err(QueryError::execute(InternalError::classified(
247 ErrorClass::Unsupported,
248 ErrorOrigin::Query,
249 "explain_sql requires an EXPLAIN statement",
250 ))),
251 SqlCommand::Explain { mode, query } => match mode {
252 crate::db::sql::parser::SqlExplainMode::Plan => {
253 Ok(query.explain()?.render_text_canonical())
254 }
255 crate::db::sql::parser::SqlExplainMode::Execution => query.explain_execution_text(),
256 crate::db::sql::parser::SqlExplainMode::Json => {
257 Ok(query.explain()?.render_json_canonical())
258 }
259 },
260 }
261 }
262
263 #[must_use]
265 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
266 where
267 E: EntityKind<Canister = C>,
268 {
269 FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
270 }
271
272 #[must_use]
274 pub fn delete_with_consistency<E>(
275 &self,
276 consistency: MissingRowPolicy,
277 ) -> FluentDeleteQuery<'_, E>
278 where
279 E: EntityKind<Canister = C>,
280 {
281 FluentDeleteQuery::new(self, Query::new(consistency).delete())
282 }
283
284 #[must_use]
288 pub const fn select_one(&self) -> Value {
289 Value::Int(1)
290 }
291
292 #[must_use]
299 pub fn show_indexes<E>(&self) -> Vec<String>
300 where
301 E: EntityKind<Canister = C>,
302 {
303 show_indexes_for_model(E::MODEL)
304 }
305
306 #[must_use]
311 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
312 where
313 E: EntityKind<Canister = C>,
314 {
315 describe_entity_model(E::MODEL)
316 }
317
318 pub fn storage_report(
320 &self,
321 name_to_path: &[(&'static str, &'static str)],
322 ) -> Result<StorageReport, InternalError> {
323 self.db.storage_report(name_to_path)
324 }
325
326 #[must_use]
331 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
332 where
333 E: EntityKind<Canister = C> + EntityValue,
334 {
335 LoadExecutor::new(self.db, self.debug)
336 }
337
338 #[must_use]
339 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
340 where
341 E: EntityKind<Canister = C> + EntityValue,
342 {
343 DeleteExecutor::new(self.db, self.debug)
344 }
345
346 #[must_use]
347 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
348 where
349 E: EntityKind<Canister = C> + EntityValue,
350 {
351 SaveExecutor::new(self.db, self.debug)
352 }
353
354 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
360 where
361 E: EntityKind<Canister = C> + EntityValue,
362 {
363 let plan = query.plan()?.into_executable();
364
365 let result = match query.mode() {
366 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
367 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
368 };
369
370 result.map_err(QueryError::execute)
371 }
372
373 pub(in crate::db) fn execute_load_query_with<E, T>(
376 &self,
377 query: &Query<E>,
378 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
379 ) -> Result<T, QueryError>
380 where
381 E: EntityKind<Canister = C> + EntityValue,
382 {
383 let plan = query.plan()?.into_executable();
384
385 self.with_metrics(|| op(self.load_executor::<E>(), plan))
386 .map_err(QueryError::execute)
387 }
388
389 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
394 where
395 E: EntityKind<Canister = C>,
396 {
397 let compiled = query.plan()?;
398 let explain = compiled.explain();
399 let plan_hash = compiled.plan_hash_hex();
400
401 let executable = compiled.into_executable();
402 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
403 let execution_strategy = match query.mode() {
404 QueryMode::Load(_) => Some(trace_execution_strategy(
405 executable
406 .execution_strategy()
407 .map_err(QueryError::execute)?,
408 )),
409 QueryMode::Delete(_) => None,
410 };
411
412 Ok(QueryTracePlan::new(
413 plan_hash,
414 access_strategy,
415 execution_strategy,
416 explain,
417 ))
418 }
419
420 pub(crate) fn explain_load_query_terminal_with<E>(
422 query: &Query<E>,
423 aggregate: AggregateExpr,
424 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
425 where
426 E: EntityKind<Canister = C> + EntityValue,
427 {
428 let compiled = query.plan()?;
430 let query_explain = compiled.explain();
431 let terminal = aggregate.kind();
432
433 let executable = compiled.into_executable();
435 let execution = executable.explain_aggregate_terminal_execution_descriptor(aggregate);
436
437 Ok(ExplainAggregateTerminalPlan::new(
438 query_explain,
439 terminal,
440 execution,
441 ))
442 }
443
444 pub(crate) fn execute_load_query_paged_with_trace<E>(
446 &self,
447 query: &Query<E>,
448 cursor_token: Option<&str>,
449 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
450 where
451 E: EntityKind<Canister = C> + EntityValue,
452 {
453 let plan = query.plan()?.into_executable();
455 match plan.execution_strategy().map_err(QueryError::execute)? {
456 ExecutionStrategy::PrimaryKey => {
457 return Err(QueryError::execute(
458 crate::db::error::query_executor_invariant(
459 "cursor pagination requires explicit or grouped ordering",
460 ),
461 ));
462 }
463 ExecutionStrategy::Ordered => {}
464 ExecutionStrategy::Grouped => {
465 return Err(QueryError::execute(
466 crate::db::error::query_executor_invariant(
467 "grouped plans require execute_grouped(...)",
468 ),
469 ));
470 }
471 }
472
473 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
475 let cursor = plan
476 .prepare_cursor(cursor_bytes.as_deref())
477 .map_err(map_executor_plan_error)?;
478
479 let (page, trace) = self
481 .with_metrics(|| {
482 self.load_executor::<E>()
483 .execute_paged_with_cursor_traced(plan, cursor)
484 })
485 .map_err(QueryError::execute)?;
486 let next_cursor = page
487 .next_cursor
488 .map(|token| {
489 let Some(token) = token.as_scalar() else {
490 return Err(QueryError::execute(
491 crate::db::error::query_executor_invariant(
492 "scalar load pagination emitted grouped continuation token",
493 ),
494 ));
495 };
496
497 token.encode().map_err(|err| {
498 QueryError::execute(InternalError::serialize_internal(format!(
499 "failed to serialize continuation cursor: {err}"
500 )))
501 })
502 })
503 .transpose()?;
504
505 Ok(PagedLoadExecutionWithTrace::new(
506 page.items,
507 next_cursor,
508 trace,
509 ))
510 }
511
512 pub fn execute_grouped<E>(
517 &self,
518 query: &Query<E>,
519 cursor_token: Option<&str>,
520 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
521 where
522 E: EntityKind<Canister = C> + EntityValue,
523 {
524 let plan = query.plan()?.into_executable();
526 if !matches!(
527 plan.execution_strategy().map_err(QueryError::execute)?,
528 ExecutionStrategy::Grouped
529 ) {
530 return Err(QueryError::execute(
531 crate::db::error::query_executor_invariant(
532 "execute_grouped requires grouped logical plans",
533 ),
534 ));
535 }
536
537 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
539 let cursor = plan
540 .prepare_grouped_cursor(cursor_bytes.as_deref())
541 .map_err(map_executor_plan_error)?;
542
543 let (page, trace) = self
545 .with_metrics(|| {
546 self.load_executor::<E>()
547 .execute_grouped_paged_with_cursor_traced(plan, cursor)
548 })
549 .map_err(QueryError::execute)?;
550 let next_cursor = page
551 .next_cursor
552 .map(|token| {
553 let Some(token) = token.as_grouped() else {
554 return Err(QueryError::execute(
555 crate::db::error::query_executor_invariant(
556 "grouped pagination emitted scalar continuation token",
557 ),
558 ));
559 };
560
561 token.encode().map_err(|err| {
562 QueryError::execute(InternalError::serialize_internal(format!(
563 "failed to serialize grouped continuation cursor: {err}"
564 )))
565 })
566 })
567 .transpose()?;
568
569 Ok(PagedGroupedExecutionWithTrace::new(
570 page.rows,
571 next_cursor,
572 trace,
573 ))
574 }
575
576 pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
582 where
583 E: EntityKind<Canister = C> + EntityValue,
584 {
585 self.execute_save_entity(|save| save.insert(entity))
586 }
587
588 pub fn insert_many_atomic<E>(
594 &self,
595 entities: impl IntoIterator<Item = E>,
596 ) -> Result<WriteBatchResponse<E>, InternalError>
597 where
598 E: EntityKind<Canister = C> + EntityValue,
599 {
600 self.execute_save_batch(|save| save.insert_many_atomic(entities))
601 }
602
603 pub fn insert_many_non_atomic<E>(
607 &self,
608 entities: impl IntoIterator<Item = E>,
609 ) -> Result<WriteBatchResponse<E>, InternalError>
610 where
611 E: EntityKind<Canister = C> + EntityValue,
612 {
613 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
614 }
615
616 pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
618 where
619 E: EntityKind<Canister = C> + EntityValue,
620 {
621 self.execute_save_entity(|save| save.replace(entity))
622 }
623
624 pub fn replace_many_atomic<E>(
630 &self,
631 entities: impl IntoIterator<Item = E>,
632 ) -> Result<WriteBatchResponse<E>, InternalError>
633 where
634 E: EntityKind<Canister = C> + EntityValue,
635 {
636 self.execute_save_batch(|save| save.replace_many_atomic(entities))
637 }
638
639 pub fn replace_many_non_atomic<E>(
643 &self,
644 entities: impl IntoIterator<Item = E>,
645 ) -> Result<WriteBatchResponse<E>, InternalError>
646 where
647 E: EntityKind<Canister = C> + EntityValue,
648 {
649 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
650 }
651
652 pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
654 where
655 E: EntityKind<Canister = C> + EntityValue,
656 {
657 self.execute_save_entity(|save| save.update(entity))
658 }
659
660 pub fn update_many_atomic<E>(
666 &self,
667 entities: impl IntoIterator<Item = E>,
668 ) -> Result<WriteBatchResponse<E>, InternalError>
669 where
670 E: EntityKind<Canister = C> + EntityValue,
671 {
672 self.execute_save_batch(|save| save.update_many_atomic(entities))
673 }
674
675 pub fn update_many_non_atomic<E>(
679 &self,
680 entities: impl IntoIterator<Item = E>,
681 ) -> Result<WriteBatchResponse<E>, InternalError>
682 where
683 E: EntityKind<Canister = C> + EntityValue,
684 {
685 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
686 }
687
688 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
690 where
691 E: EntityKind<Canister = C> + EntityValue,
692 {
693 self.execute_save_view::<E>(|save| save.insert_view(view))
694 }
695
696 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
698 where
699 E: EntityKind<Canister = C> + EntityValue,
700 {
701 self.execute_save_view::<E>(|save| save.replace_view(view))
702 }
703
704 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
706 where
707 E: EntityKind<Canister = C> + EntityValue,
708 {
709 self.execute_save_view::<E>(|save| save.update_view(view))
710 }
711
712 #[cfg(test)]
714 #[doc(hidden)]
715 pub fn clear_stores_for_tests(&self) {
716 self.db.with_store_registry(|reg| {
717 for (_, store) in reg.iter() {
720 store.with_data_mut(DataStore::clear);
721 store.with_index_mut(IndexStore::clear);
722 }
723 });
724 }
725}
726
727const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
728 match strategy {
729 ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
730 ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
731 ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
732 }
733}
734
735#[cfg(test)]
740mod tests {
741 use super::*;
742 use crate::{
743 db::{
744 Db,
745 commit::{ensure_recovered, init_commit_store_for_tests},
746 cursor::CursorPlanError,
747 data::DataStore,
748 index::IndexStore,
749 query::plan::expr::{Expr, ProjectionField},
750 registry::StoreRegistry,
751 },
752 model::field::FieldKind,
753 testing::test_memory,
754 traits::Path,
755 types::Ulid,
756 value::Value,
757 };
758 use icydb_derive::FieldProjection;
759 use serde::{Deserialize, Serialize};
760 use std::cell::RefCell;
761
762 crate::test_canister! {
763 ident = SessionSqlCanister,
764 commit_memory_id = crate::testing::test_commit_memory_id(),
765 }
766
767 crate::test_store! {
768 ident = SessionSqlStore,
769 canister = SessionSqlCanister,
770 }
771
772 thread_local! {
773 static SESSION_SQL_DATA_STORE: RefCell<DataStore> =
774 RefCell::new(DataStore::init(test_memory(160)));
775 static SESSION_SQL_INDEX_STORE: RefCell<IndexStore> =
776 RefCell::new(IndexStore::init(test_memory(161)));
777 static SESSION_SQL_STORE_REGISTRY: StoreRegistry = {
778 let mut reg = StoreRegistry::new();
779 reg.register_store(
780 SessionSqlStore::PATH,
781 &SESSION_SQL_DATA_STORE,
782 &SESSION_SQL_INDEX_STORE,
783 )
784 .expect("SQL session test store registration should succeed");
785 reg
786 };
787 }
788
789 static SESSION_SQL_DB: Db<SessionSqlCanister> = Db::new(&SESSION_SQL_STORE_REGISTRY);
790
791 #[derive(Clone, Debug, Default, Deserialize, FieldProjection, PartialEq, Serialize)]
798 struct SessionSqlEntity {
799 id: Ulid,
800 name: String,
801 age: u64,
802 }
803
804 crate::test_entity_schema! {
805 ident = SessionSqlEntity,
806 id = Ulid,
807 id_field = id,
808 entity_name = "SessionSqlEntity",
809 primary_key = "id",
810 pk_index = 0,
811 fields = [
812 ("id", FieldKind::Ulid),
813 ("name", FieldKind::Text),
814 ("age", FieldKind::Uint),
815 ],
816 indexes = [],
817 store = SessionSqlStore,
818 canister = SessionSqlCanister,
819 }
820
821 fn reset_session_sql_store() {
823 init_commit_store_for_tests().expect("commit store init should succeed");
824 ensure_recovered(&SESSION_SQL_DB).expect("write-side recovery should succeed");
825 SESSION_SQL_DATA_STORE.with(|store| store.borrow_mut().clear());
826 SESSION_SQL_INDEX_STORE.with(|store| store.borrow_mut().clear());
827 }
828
829 fn sql_session() -> DbSession<SessionSqlCanister> {
830 DbSession::new(SESSION_SQL_DB)
831 }
832
833 fn assert_query_error_is_cursor_plan(
835 err: QueryError,
836 predicate: impl FnOnce(&CursorPlanError) -> bool,
837 ) {
838 assert!(matches!(
839 err,
840 QueryError::Plan(plan_err)
841 if matches!(
842 plan_err.as_ref(),
843 PlanError::Cursor(inner) if predicate(inner.as_ref())
844 )
845 ));
846 }
847
848 fn assert_cursor_mapping_parity(
850 build: impl Fn() -> CursorPlanError,
851 predicate: impl Fn(&CursorPlanError) -> bool + Copy,
852 ) {
853 let mapped_via_executor = map_executor_plan_error(ExecutorPlanError::from(build()));
854 assert_query_error_is_cursor_plan(mapped_via_executor, predicate);
855
856 let mapped_via_plan = QueryError::from(PlanError::from(build()));
857 assert_query_error_is_cursor_plan(mapped_via_plan, predicate);
858 }
859
860 #[test]
861 fn session_cursor_error_mapping_parity_boundary_arity() {
862 assert_cursor_mapping_parity(
863 || CursorPlanError::continuation_cursor_boundary_arity_mismatch(2, 1),
864 |inner| {
865 matches!(
866 inner,
867 CursorPlanError::ContinuationCursorBoundaryArityMismatch {
868 expected: 2,
869 found: 1
870 }
871 )
872 },
873 );
874 }
875
876 #[test]
877 fn session_cursor_error_mapping_parity_window_mismatch() {
878 assert_cursor_mapping_parity(
879 || CursorPlanError::continuation_cursor_window_mismatch(8, 3),
880 |inner| {
881 matches!(
882 inner,
883 CursorPlanError::ContinuationCursorWindowMismatch {
884 expected_offset: 8,
885 actual_offset: 3
886 }
887 )
888 },
889 );
890 }
891
892 #[test]
893 fn session_cursor_error_mapping_parity_decode_reason() {
894 assert_cursor_mapping_parity(
895 || {
896 CursorPlanError::invalid_continuation_cursor(
897 crate::db::codec::cursor::CursorDecodeError::OddLength,
898 )
899 },
900 |inner| {
901 matches!(
902 inner,
903 CursorPlanError::InvalidContinuationCursor {
904 reason: crate::db::codec::cursor::CursorDecodeError::OddLength
905 }
906 )
907 },
908 );
909 }
910
911 #[test]
912 fn session_cursor_error_mapping_parity_primary_key_type_mismatch() {
913 assert_cursor_mapping_parity(
914 || {
915 CursorPlanError::continuation_cursor_primary_key_type_mismatch(
916 "id",
917 "ulid",
918 Some(crate::value::Value::Text("not-a-ulid".to_string())),
919 )
920 },
921 |inner| {
922 matches!(
923 inner,
924 CursorPlanError::ContinuationCursorPrimaryKeyTypeMismatch {
925 field,
926 expected,
927 value: Some(crate::value::Value::Text(value))
928 } if field == "id" && expected == "ulid" && value == "not-a-ulid"
929 )
930 },
931 );
932 }
933
934 #[test]
935 fn session_cursor_error_mapping_parity_matrix_preserves_cursor_variants() {
936 assert_cursor_mapping_parity(
938 || CursorPlanError::continuation_cursor_boundary_arity_mismatch(2, 1),
939 |inner| {
940 matches!(
941 inner,
942 CursorPlanError::ContinuationCursorBoundaryArityMismatch {
943 expected: 2,
944 found: 1
945 }
946 )
947 },
948 );
949 }
950
951 #[test]
952 fn execute_sql_select_star_honors_order_limit_offset() {
953 reset_session_sql_store();
954 let session = sql_session();
955
956 session
957 .insert(SessionSqlEntity {
958 id: Ulid::generate(),
959 name: "older".to_string(),
960 age: 37,
961 })
962 .expect("seed insert should succeed");
963 session
964 .insert(SessionSqlEntity {
965 id: Ulid::generate(),
966 name: "younger".to_string(),
967 age: 19,
968 })
969 .expect("seed insert should succeed");
970
971 let response = session
972 .execute_sql::<SessionSqlEntity>(
973 "SELECT * FROM SessionSqlEntity ORDER BY age ASC LIMIT 1 OFFSET 1",
974 )
975 .expect("SELECT * should execute");
976
977 assert_eq!(response.count(), 1, "window should return one row");
978 let row = response
979 .iter()
980 .next()
981 .expect("windowed result should include one row");
982 assert_eq!(
983 row.entity_ref().name,
984 "older",
985 "ordered window should return the second age-ordered row",
986 );
987 }
988
989 #[test]
990 fn execute_sql_delete_honors_predicate_order_and_limit() {
991 reset_session_sql_store();
992 let session = sql_session();
993
994 session
995 .insert(SessionSqlEntity {
996 id: Ulid::generate(),
997 name: "first-minor".to_string(),
998 age: 16,
999 })
1000 .expect("seed insert should succeed");
1001 session
1002 .insert(SessionSqlEntity {
1003 id: Ulid::generate(),
1004 name: "second-minor".to_string(),
1005 age: 17,
1006 })
1007 .expect("seed insert should succeed");
1008 session
1009 .insert(SessionSqlEntity {
1010 id: Ulid::generate(),
1011 name: "adult".to_string(),
1012 age: 42,
1013 })
1014 .expect("seed insert should succeed");
1015
1016 let deleted = session
1017 .execute_sql::<SessionSqlEntity>(
1018 "DELETE FROM SessionSqlEntity WHERE age < 20 ORDER BY age ASC LIMIT 1",
1019 )
1020 .expect("DELETE should execute");
1021
1022 assert_eq!(deleted.count(), 1, "delete limit should remove one row");
1023 assert_eq!(
1024 deleted
1025 .iter()
1026 .next()
1027 .expect("deleted row should exist")
1028 .entity_ref()
1029 .age,
1030 16,
1031 "ordered delete should remove the youngest matching row first",
1032 );
1033
1034 let remaining = session
1035 .load::<SessionSqlEntity>()
1036 .order_by("age")
1037 .execute()
1038 .expect("post-delete load should succeed");
1039 let remaining_ages = remaining
1040 .iter()
1041 .map(|row| row.entity_ref().age)
1042 .collect::<Vec<_>>();
1043
1044 assert_eq!(
1045 remaining_ages,
1046 vec![17, 42],
1047 "delete window semantics should preserve non-deleted rows",
1048 );
1049 }
1050
1051 #[test]
1052 fn query_from_sql_rejects_explain_statements() {
1053 reset_session_sql_store();
1054 let session = sql_session();
1055
1056 let err = session
1057 .query_from_sql::<SessionSqlEntity>("EXPLAIN SELECT * FROM SessionSqlEntity")
1058 .expect_err("query_from_sql must reject EXPLAIN statements");
1059
1060 assert!(
1061 matches!(
1062 err,
1063 QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1064 _
1065 ))
1066 ),
1067 "query_from_sql EXPLAIN rejection must map to unsupported execution class",
1068 );
1069 }
1070
1071 #[test]
1072 fn query_from_sql_select_field_projection_lowers_to_scalar_field_selection() {
1073 reset_session_sql_store();
1074 let session = sql_session();
1075
1076 let query = session
1077 .query_from_sql::<SessionSqlEntity>("SELECT name, age FROM SessionSqlEntity")
1078 .expect("field-list SQL query should lower");
1079 let projection = query
1080 .plan()
1081 .expect("field-list SQL plan should build")
1082 .projection_spec();
1083 let field_names = projection
1084 .fields()
1085 .map(|field| match field {
1086 ProjectionField::Scalar {
1087 expr: Expr::Field(field),
1088 alias: None,
1089 } => field.as_str().to_string(),
1090 other @ ProjectionField::Scalar { .. } => {
1091 panic!("field-list SQL projection should lower to plain field exprs: {other:?}")
1092 }
1093 })
1094 .collect::<Vec<_>>();
1095
1096 assert_eq!(field_names, vec!["name".to_string(), "age".to_string()]);
1097 }
1098
1099 #[test]
1100 fn execute_sql_select_field_projection_currently_returns_entity_shaped_rows() {
1101 reset_session_sql_store();
1102 let session = sql_session();
1103
1104 session
1105 .insert(SessionSqlEntity {
1106 id: Ulid::generate(),
1107 name: "projected-row".to_string(),
1108 age: 29,
1109 })
1110 .expect("seed insert should succeed");
1111
1112 let response = session
1113 .execute_sql::<SessionSqlEntity>(
1114 "SELECT name FROM SessionSqlEntity ORDER BY age ASC LIMIT 1",
1115 )
1116 .expect("field-list SQL projection should execute");
1117 let row = response
1118 .iter()
1119 .next()
1120 .expect("field-list SQL projection response should contain one row");
1121
1122 assert_eq!(
1123 row.entity_ref().name,
1124 "projected-row",
1125 "field-list SQL projection should still return entity rows in this baseline",
1126 );
1127 assert_eq!(
1128 row.entity_ref().age,
1129 29,
1130 "field-list SQL projection should preserve full entity payload until projection response shaping is introduced",
1131 );
1132 }
1133
1134 #[test]
1135 fn execute_sql_projection_select_field_list_returns_projection_shaped_rows() {
1136 reset_session_sql_store();
1137 let session = sql_session();
1138
1139 session
1140 .insert(SessionSqlEntity {
1141 id: Ulid::generate(),
1142 name: "projection-surface".to_string(),
1143 age: 33,
1144 })
1145 .expect("seed insert should succeed");
1146
1147 let response = session
1148 .execute_sql_projection::<SessionSqlEntity>(
1149 "SELECT name FROM SessionSqlEntity ORDER BY age ASC LIMIT 1",
1150 )
1151 .expect("projection SQL execution should succeed");
1152 let row = response
1153 .iter()
1154 .next()
1155 .expect("projection SQL response should contain one row");
1156
1157 assert_eq!(response.count(), 1);
1158 assert_eq!(
1159 row.values(),
1160 [Value::Text("projection-surface".to_string())],
1161 "projection SQL response should carry only projected field values in declaration order",
1162 );
1163 }
1164
1165 #[test]
1166 fn execute_sql_projection_select_star_returns_all_fields_in_model_order() {
1167 reset_session_sql_store();
1168 let session = sql_session();
1169
1170 session
1171 .insert(SessionSqlEntity {
1172 id: Ulid::generate(),
1173 name: "projection-star".to_string(),
1174 age: 41,
1175 })
1176 .expect("seed insert should succeed");
1177
1178 let response = session
1179 .execute_sql_projection::<SessionSqlEntity>(
1180 "SELECT * FROM SessionSqlEntity ORDER BY age ASC LIMIT 1",
1181 )
1182 .expect("projection SQL star execution should succeed");
1183 let row = response
1184 .iter()
1185 .next()
1186 .expect("projection SQL star response should contain one row");
1187
1188 assert_eq!(response.count(), 1);
1189 assert_eq!(
1190 row.values().len(),
1191 3,
1192 "SELECT * projection response should include all model fields",
1193 );
1194 assert_eq!(row.values()[0], Value::Ulid(row.id().key()));
1195 assert_eq!(row.values()[1], Value::Text("projection-star".to_string()));
1196 assert_eq!(row.values()[2], Value::Uint(41));
1197 }
1198
1199 #[test]
1200 fn execute_sql_projection_rejects_delete_statements() {
1201 reset_session_sql_store();
1202 let session = sql_session();
1203
1204 let err = session
1205 .execute_sql_projection::<SessionSqlEntity>(
1206 "DELETE FROM SessionSqlEntity ORDER BY age LIMIT 1",
1207 )
1208 .expect_err("projection SQL execution should reject delete statements");
1209
1210 assert!(
1211 matches!(
1212 err,
1213 QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1214 _
1215 ))
1216 ),
1217 "projection SQL delete usage should fail as unsupported",
1218 );
1219 }
1220
1221 #[test]
1222 fn execute_sql_select_field_projection_unknown_field_fails_with_plan_error() {
1223 reset_session_sql_store();
1224 let session = sql_session();
1225
1226 let err = session
1227 .execute_sql::<SessionSqlEntity>("SELECT missing_field FROM SessionSqlEntity")
1228 .expect_err("unknown projected fields should fail planner validation");
1229
1230 assert!(
1231 matches!(err, QueryError::Plan(_)),
1232 "unknown projected fields should surface planner-domain query errors: {err:?}",
1233 );
1234 }
1235
1236 #[test]
1237 fn execute_sql_rejects_aggregate_projection_in_current_slice() {
1238 reset_session_sql_store();
1239 let session = sql_session();
1240
1241 let err = session
1242 .execute_sql::<SessionSqlEntity>("SELECT COUNT(*) FROM SessionSqlEntity")
1243 .expect_err("aggregate SQL projection should remain lowering-gated in this slice");
1244
1245 assert!(
1246 matches!(
1247 err,
1248 QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1249 _
1250 ))
1251 ),
1252 "aggregate projection gate should remain an unsupported execution error boundary",
1253 );
1254 }
1255
1256 #[test]
1257 fn execute_sql_rejects_group_by_in_current_slice() {
1258 reset_session_sql_store();
1259 let session = sql_session();
1260
1261 let err = session
1262 .execute_sql::<SessionSqlEntity>("SELECT * FROM SessionSqlEntity GROUP BY age")
1263 .expect_err("GROUP BY should be rejected in this slice");
1264
1265 assert!(
1266 matches!(
1267 err,
1268 QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1269 _
1270 ))
1271 ),
1272 "group-by gate should remain an unsupported execution error boundary",
1273 );
1274 }
1275
1276 #[test]
1277 fn explain_sql_execution_returns_descriptor_text() {
1278 reset_session_sql_store();
1279 let session = sql_session();
1280
1281 let explain = session
1282 .explain_sql::<SessionSqlEntity>(
1283 "EXPLAIN EXECUTION SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
1284 )
1285 .expect("EXPLAIN EXECUTION should succeed");
1286
1287 assert!(
1288 explain.contains("node_id=0"),
1289 "execution explain output should include the root descriptor node id",
1290 );
1291 assert!(
1292 explain.contains("layer="),
1293 "execution explain output should include execution layer annotations",
1294 );
1295 }
1296
1297 #[test]
1298 fn explain_sql_plan_returns_logical_plan_text() {
1299 reset_session_sql_store();
1300 let session = sql_session();
1301
1302 let explain = session
1303 .explain_sql::<SessionSqlEntity>(
1304 "EXPLAIN SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
1305 )
1306 .expect("EXPLAIN should succeed");
1307
1308 assert!(
1309 explain.contains("mode=Load"),
1310 "logical explain text should include query mode projection",
1311 );
1312 assert!(
1313 explain.contains("access="),
1314 "logical explain text should include projected access shape",
1315 );
1316 }
1317
1318 #[test]
1319 fn explain_sql_json_returns_logical_plan_json() {
1320 reset_session_sql_store();
1321 let session = sql_session();
1322
1323 let explain = session
1324 .explain_sql::<SessionSqlEntity>(
1325 "EXPLAIN JSON SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
1326 )
1327 .expect("EXPLAIN JSON should succeed");
1328
1329 assert!(
1330 explain.starts_with('{') && explain.ends_with('}'),
1331 "logical explain JSON should render one JSON object payload",
1332 );
1333 assert!(
1334 explain.contains("\"mode\":{\"type\":\"Load\""),
1335 "logical explain JSON should expose structured query mode metadata",
1336 );
1337 assert!(
1338 explain.contains("\"access\":"),
1339 "logical explain JSON should include projected access metadata",
1340 );
1341 }
1342
1343 #[test]
1344 fn explain_sql_json_delete_returns_logical_delete_mode() {
1345 reset_session_sql_store();
1346 let session = sql_session();
1347
1348 let explain = session
1349 .explain_sql::<SessionSqlEntity>(
1350 "EXPLAIN JSON DELETE FROM SessionSqlEntity ORDER BY age LIMIT 1",
1351 )
1352 .expect("EXPLAIN JSON DELETE should succeed");
1353
1354 assert!(
1355 explain.contains("\"mode\":{\"type\":\"Delete\""),
1356 "logical explain JSON should expose delete query mode metadata",
1357 );
1358 }
1359
1360 #[test]
1361 fn explain_sql_rejects_non_explain_statements() {
1362 reset_session_sql_store();
1363 let session = sql_session();
1364
1365 let err = session
1366 .explain_sql::<SessionSqlEntity>("SELECT * FROM SessionSqlEntity")
1367 .expect_err("explain_sql must reject non-EXPLAIN statements");
1368
1369 assert!(
1370 matches!(
1371 err,
1372 QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1373 _
1374 ))
1375 ),
1376 "non-EXPLAIN input must fail as unsupported explain usage",
1377 );
1378 }
1379}