1#[cfg(test)]
7use crate::db::{DataStore, IndexStore};
8use crate::{
9 db::{
10 Db, EntityResponse, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
11 MissingRowPolicy, PagedGroupedExecutionWithTrace, PagedLoadExecutionWithTrace, PlanError,
12 Query, QueryError, QueryTracePlan, StorageReport, StoreRegistry, TraceExecutionStrategy,
13 WriteBatchResponse,
14 access::AccessStrategy,
15 commit::EntityRuntimeHooks,
16 cursor::decode_optional_cursor_token,
17 executor::{
18 DeleteExecutor, ExecutablePlan, ExecutionStrategy, ExecutorPlanError, LoadExecutor,
19 SaveExecutor,
20 },
21 query::{
22 builder::aggregate::AggregateExpr, explain::ExplainAggregateTerminalPlan,
23 plan::QueryMode,
24 },
25 schema::{describe_entity_model, show_indexes_for_model},
26 sql::lowering::{SqlCommand, SqlLoweringError, compile_sql_command},
27 },
28 error::{ErrorClass, ErrorOrigin, InternalError},
29 metrics::sink::{MetricsSink, with_metrics_sink},
30 traits::{CanisterKind, EntityKind, EntityValue},
31 value::Value,
32};
33use std::thread::LocalKey;
34
35fn map_executor_plan_error(err: ExecutorPlanError) -> QueryError {
37 match err {
38 ExecutorPlanError::Cursor(err) => QueryError::from(PlanError::from(*err)),
39 }
40}
41
42fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
45 decode_optional_cursor_token(cursor_token).map_err(|err| QueryError::from(PlanError::from(err)))
46}
47
48fn map_sql_lowering_error(err: SqlLoweringError) -> QueryError {
50 QueryError::execute(InternalError::classified(
51 ErrorClass::Unsupported,
52 ErrorOrigin::Query,
53 format!("SQL query is not executable in this release: {err}"),
54 ))
55}
56
57pub struct DbSession<C: CanisterKind> {
64 db: Db<C>,
65 debug: bool,
66 metrics: Option<&'static dyn MetricsSink>,
67}
68
69impl<C: CanisterKind> DbSession<C> {
70 #[must_use]
72 pub(crate) const fn new(db: Db<C>) -> Self {
73 Self {
74 db,
75 debug: false,
76 metrics: None,
77 }
78 }
79
80 #[must_use]
82 pub const fn new_with_hooks(
83 store: &'static LocalKey<StoreRegistry>,
84 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
85 ) -> Self {
86 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
87 }
88
89 #[must_use]
91 pub const fn debug(mut self) -> Self {
92 self.debug = true;
93 self
94 }
95
96 #[must_use]
98 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
99 self.metrics = Some(sink);
100 self
101 }
102
103 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
104 if let Some(sink) = self.metrics {
105 with_metrics_sink(sink, f)
106 } else {
107 f()
108 }
109 }
110
111 fn execute_save_with<E, T, R>(
113 &self,
114 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
115 map: impl FnOnce(T) -> R,
116 ) -> Result<R, InternalError>
117 where
118 E: EntityKind<Canister = C> + EntityValue,
119 {
120 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
121
122 Ok(map(value))
123 }
124
125 fn execute_save_entity<E>(
127 &self,
128 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
129 ) -> Result<E, InternalError>
130 where
131 E: EntityKind<Canister = C> + EntityValue,
132 {
133 self.execute_save_with(op, std::convert::identity)
134 }
135
136 fn execute_save_batch<E>(
137 &self,
138 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
139 ) -> Result<WriteBatchResponse<E>, InternalError>
140 where
141 E: EntityKind<Canister = C> + EntityValue,
142 {
143 self.execute_save_with(op, WriteBatchResponse::new)
144 }
145
146 fn execute_save_view<E>(
147 &self,
148 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
149 ) -> Result<E::ViewType, InternalError>
150 where
151 E: EntityKind<Canister = C> + EntityValue,
152 {
153 self.execute_save_with(op, std::convert::identity)
154 }
155
156 #[must_use]
162 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
163 where
164 E: EntityKind<Canister = C>,
165 {
166 FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
167 }
168
169 #[must_use]
171 pub const fn load_with_consistency<E>(
172 &self,
173 consistency: MissingRowPolicy,
174 ) -> FluentLoadQuery<'_, E>
175 where
176 E: EntityKind<Canister = C>,
177 {
178 FluentLoadQuery::new(self, Query::new(consistency))
179 }
180
181 pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
186 where
187 E: EntityKind<Canister = C>,
188 {
189 let command = compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
190 .map_err(map_sql_lowering_error)?;
191
192 match command {
193 SqlCommand::Query(query) => Ok(query),
194 SqlCommand::Explain { .. } => Err(QueryError::execute(InternalError::classified(
195 ErrorClass::Unsupported,
196 ErrorOrigin::Query,
197 "query_from_sql does not accept EXPLAIN statements; use explain_sql(...)",
198 ))),
199 }
200 }
201
202 pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
204 where
205 E: EntityKind<Canister = C> + EntityValue,
206 {
207 let query = self.query_from_sql::<E>(sql)?;
208 self.execute_query(&query)
209 }
210
211 pub fn explain_sql<E>(&self, sql: &str) -> Result<String, QueryError>
218 where
219 E: EntityKind<Canister = C> + EntityValue,
220 {
221 let command = compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
222 .map_err(map_sql_lowering_error)?;
223
224 match command {
225 SqlCommand::Query(_) => Err(QueryError::execute(InternalError::classified(
226 ErrorClass::Unsupported,
227 ErrorOrigin::Query,
228 "explain_sql requires an EXPLAIN statement",
229 ))),
230 SqlCommand::Explain { mode, query } => match mode {
231 crate::db::sql::parser::SqlExplainMode::Plan => {
232 Ok(format!("{:?}", query.explain()?))
233 }
234 crate::db::sql::parser::SqlExplainMode::Execution => query.explain_execution_text(),
235 crate::db::sql::parser::SqlExplainMode::Json => query.explain_execution_json(),
236 },
237 }
238 }
239
240 #[must_use]
242 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
243 where
244 E: EntityKind<Canister = C>,
245 {
246 FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
247 }
248
249 #[must_use]
251 pub fn delete_with_consistency<E>(
252 &self,
253 consistency: MissingRowPolicy,
254 ) -> FluentDeleteQuery<'_, E>
255 where
256 E: EntityKind<Canister = C>,
257 {
258 FluentDeleteQuery::new(self, Query::new(consistency).delete())
259 }
260
261 #[must_use]
265 pub const fn select_one(&self) -> Value {
266 Value::Int(1)
267 }
268
269 #[must_use]
276 pub fn show_indexes<E>(&self) -> Vec<String>
277 where
278 E: EntityKind<Canister = C>,
279 {
280 show_indexes_for_model(E::MODEL)
281 }
282
283 #[must_use]
288 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
289 where
290 E: EntityKind<Canister = C>,
291 {
292 describe_entity_model(E::MODEL)
293 }
294
295 pub fn storage_report(
297 &self,
298 name_to_path: &[(&'static str, &'static str)],
299 ) -> Result<StorageReport, InternalError> {
300 self.db.storage_report(name_to_path)
301 }
302
303 #[must_use]
308 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
309 where
310 E: EntityKind<Canister = C> + EntityValue,
311 {
312 LoadExecutor::new(self.db, self.debug)
313 }
314
315 #[must_use]
316 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
317 where
318 E: EntityKind<Canister = C> + EntityValue,
319 {
320 DeleteExecutor::new(self.db, self.debug)
321 }
322
323 #[must_use]
324 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
325 where
326 E: EntityKind<Canister = C> + EntityValue,
327 {
328 SaveExecutor::new(self.db, self.debug)
329 }
330
331 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
337 where
338 E: EntityKind<Canister = C> + EntityValue,
339 {
340 let plan = query.plan()?.into_executable();
341
342 let result = match query.mode() {
343 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
344 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
345 };
346
347 result.map_err(QueryError::execute)
348 }
349
350 pub(in crate::db) fn execute_load_query_with<E, T>(
353 &self,
354 query: &Query<E>,
355 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
356 ) -> Result<T, QueryError>
357 where
358 E: EntityKind<Canister = C> + EntityValue,
359 {
360 let plan = query.plan()?.into_executable();
361
362 self.with_metrics(|| op(self.load_executor::<E>(), plan))
363 .map_err(QueryError::execute)
364 }
365
366 pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
371 where
372 E: EntityKind<Canister = C>,
373 {
374 let compiled = query.plan()?;
375 let explain = compiled.explain();
376 let plan_hash = compiled.plan_hash_hex();
377
378 let executable = compiled.into_executable();
379 let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
380 let execution_strategy = match query.mode() {
381 QueryMode::Load(_) => Some(trace_execution_strategy(
382 executable
383 .execution_strategy()
384 .map_err(QueryError::execute)?,
385 )),
386 QueryMode::Delete(_) => None,
387 };
388
389 Ok(QueryTracePlan::new(
390 plan_hash,
391 access_strategy,
392 execution_strategy,
393 explain,
394 ))
395 }
396
397 pub(crate) fn explain_load_query_terminal_with<E>(
399 query: &Query<E>,
400 aggregate: AggregateExpr,
401 ) -> Result<ExplainAggregateTerminalPlan, QueryError>
402 where
403 E: EntityKind<Canister = C> + EntityValue,
404 {
405 let compiled = query.plan()?;
407 let query_explain = compiled.explain();
408 let terminal = aggregate.kind();
409
410 let executable = compiled.into_executable();
412 let execution = executable.explain_aggregate_terminal_execution_descriptor(aggregate);
413
414 Ok(ExplainAggregateTerminalPlan::new(
415 query_explain,
416 terminal,
417 execution,
418 ))
419 }
420
421 pub(crate) fn execute_load_query_paged_with_trace<E>(
423 &self,
424 query: &Query<E>,
425 cursor_token: Option<&str>,
426 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
427 where
428 E: EntityKind<Canister = C> + EntityValue,
429 {
430 let plan = query.plan()?.into_executable();
432 match plan.execution_strategy().map_err(QueryError::execute)? {
433 ExecutionStrategy::PrimaryKey => {
434 return Err(QueryError::execute(
435 crate::db::error::query_executor_invariant(
436 "cursor pagination requires explicit or grouped ordering",
437 ),
438 ));
439 }
440 ExecutionStrategy::Ordered => {}
441 ExecutionStrategy::Grouped => {
442 return Err(QueryError::execute(
443 crate::db::error::query_executor_invariant(
444 "grouped plans require execute_grouped(...)",
445 ),
446 ));
447 }
448 }
449
450 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
452 let cursor = plan
453 .prepare_cursor(cursor_bytes.as_deref())
454 .map_err(map_executor_plan_error)?;
455
456 let (page, trace) = self
458 .with_metrics(|| {
459 self.load_executor::<E>()
460 .execute_paged_with_cursor_traced(plan, cursor)
461 })
462 .map_err(QueryError::execute)?;
463 let next_cursor = page
464 .next_cursor
465 .map(|token| {
466 let Some(token) = token.as_scalar() else {
467 return Err(QueryError::execute(
468 crate::db::error::query_executor_invariant(
469 "scalar load pagination emitted grouped continuation token",
470 ),
471 ));
472 };
473
474 token.encode().map_err(|err| {
475 QueryError::execute(InternalError::serialize_internal(format!(
476 "failed to serialize continuation cursor: {err}"
477 )))
478 })
479 })
480 .transpose()?;
481
482 Ok(PagedLoadExecutionWithTrace::new(
483 page.items,
484 next_cursor,
485 trace,
486 ))
487 }
488
489 pub fn execute_grouped<E>(
494 &self,
495 query: &Query<E>,
496 cursor_token: Option<&str>,
497 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
498 where
499 E: EntityKind<Canister = C> + EntityValue,
500 {
501 let plan = query.plan()?.into_executable();
503 if !matches!(
504 plan.execution_strategy().map_err(QueryError::execute)?,
505 ExecutionStrategy::Grouped
506 ) {
507 return Err(QueryError::execute(
508 crate::db::error::query_executor_invariant(
509 "execute_grouped requires grouped logical plans",
510 ),
511 ));
512 }
513
514 let cursor_bytes = decode_optional_cursor_bytes(cursor_token)?;
516 let cursor = plan
517 .prepare_grouped_cursor(cursor_bytes.as_deref())
518 .map_err(map_executor_plan_error)?;
519
520 let (page, trace) = self
522 .with_metrics(|| {
523 self.load_executor::<E>()
524 .execute_grouped_paged_with_cursor_traced(plan, cursor)
525 })
526 .map_err(QueryError::execute)?;
527 let next_cursor = page
528 .next_cursor
529 .map(|token| {
530 let Some(token) = token.as_grouped() else {
531 return Err(QueryError::execute(
532 crate::db::error::query_executor_invariant(
533 "grouped pagination emitted scalar continuation token",
534 ),
535 ));
536 };
537
538 token.encode().map_err(|err| {
539 QueryError::execute(InternalError::serialize_internal(format!(
540 "failed to serialize grouped continuation cursor: {err}"
541 )))
542 })
543 })
544 .transpose()?;
545
546 Ok(PagedGroupedExecutionWithTrace::new(
547 page.rows,
548 next_cursor,
549 trace,
550 ))
551 }
552
553 pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
559 where
560 E: EntityKind<Canister = C> + EntityValue,
561 {
562 self.execute_save_entity(|save| save.insert(entity))
563 }
564
565 pub fn insert_many_atomic<E>(
571 &self,
572 entities: impl IntoIterator<Item = E>,
573 ) -> Result<WriteBatchResponse<E>, InternalError>
574 where
575 E: EntityKind<Canister = C> + EntityValue,
576 {
577 self.execute_save_batch(|save| save.insert_many_atomic(entities))
578 }
579
580 pub fn insert_many_non_atomic<E>(
584 &self,
585 entities: impl IntoIterator<Item = E>,
586 ) -> Result<WriteBatchResponse<E>, InternalError>
587 where
588 E: EntityKind<Canister = C> + EntityValue,
589 {
590 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
591 }
592
593 pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
595 where
596 E: EntityKind<Canister = C> + EntityValue,
597 {
598 self.execute_save_entity(|save| save.replace(entity))
599 }
600
601 pub fn replace_many_atomic<E>(
607 &self,
608 entities: impl IntoIterator<Item = E>,
609 ) -> Result<WriteBatchResponse<E>, InternalError>
610 where
611 E: EntityKind<Canister = C> + EntityValue,
612 {
613 self.execute_save_batch(|save| save.replace_many_atomic(entities))
614 }
615
616 pub fn replace_many_non_atomic<E>(
620 &self,
621 entities: impl IntoIterator<Item = E>,
622 ) -> Result<WriteBatchResponse<E>, InternalError>
623 where
624 E: EntityKind<Canister = C> + EntityValue,
625 {
626 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
627 }
628
629 pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
631 where
632 E: EntityKind<Canister = C> + EntityValue,
633 {
634 self.execute_save_entity(|save| save.update(entity))
635 }
636
637 pub fn update_many_atomic<E>(
643 &self,
644 entities: impl IntoIterator<Item = E>,
645 ) -> Result<WriteBatchResponse<E>, InternalError>
646 where
647 E: EntityKind<Canister = C> + EntityValue,
648 {
649 self.execute_save_batch(|save| save.update_many_atomic(entities))
650 }
651
652 pub fn update_many_non_atomic<E>(
656 &self,
657 entities: impl IntoIterator<Item = E>,
658 ) -> Result<WriteBatchResponse<E>, InternalError>
659 where
660 E: EntityKind<Canister = C> + EntityValue,
661 {
662 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
663 }
664
665 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
667 where
668 E: EntityKind<Canister = C> + EntityValue,
669 {
670 self.execute_save_view::<E>(|save| save.insert_view(view))
671 }
672
673 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
675 where
676 E: EntityKind<Canister = C> + EntityValue,
677 {
678 self.execute_save_view::<E>(|save| save.replace_view(view))
679 }
680
681 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
683 where
684 E: EntityKind<Canister = C> + EntityValue,
685 {
686 self.execute_save_view::<E>(|save| save.update_view(view))
687 }
688
689 #[cfg(test)]
691 #[doc(hidden)]
692 pub fn clear_stores_for_tests(&self) {
693 self.db.with_store_registry(|reg| {
694 for (_, store) in reg.iter() {
697 store.with_data_mut(DataStore::clear);
698 store.with_index_mut(IndexStore::clear);
699 }
700 });
701 }
702}
703
704const fn trace_execution_strategy(strategy: ExecutionStrategy) -> TraceExecutionStrategy {
705 match strategy {
706 ExecutionStrategy::PrimaryKey => TraceExecutionStrategy::PrimaryKey,
707 ExecutionStrategy::Ordered => TraceExecutionStrategy::Ordered,
708 ExecutionStrategy::Grouped => TraceExecutionStrategy::Grouped,
709 }
710}
711
712#[cfg(test)]
717mod tests {
718 use super::*;
719 use crate::{
720 db::{
721 Db,
722 commit::{ensure_recovered, init_commit_store_for_tests},
723 cursor::CursorPlanError,
724 data::DataStore,
725 index::IndexStore,
726 registry::StoreRegistry,
727 },
728 model::field::FieldKind,
729 testing::test_memory,
730 traits::Path,
731 types::Ulid,
732 };
733 use icydb_derive::FieldProjection;
734 use serde::{Deserialize, Serialize};
735 use std::cell::RefCell;
736
737 crate::test_canister! {
738 ident = SessionSqlCanister,
739 commit_memory_id = crate::testing::test_commit_memory_id(),
740 }
741
742 crate::test_store! {
743 ident = SessionSqlStore,
744 canister = SessionSqlCanister,
745 }
746
747 thread_local! {
748 static SESSION_SQL_DATA_STORE: RefCell<DataStore> =
749 RefCell::new(DataStore::init(test_memory(160)));
750 static SESSION_SQL_INDEX_STORE: RefCell<IndexStore> =
751 RefCell::new(IndexStore::init(test_memory(161)));
752 static SESSION_SQL_STORE_REGISTRY: StoreRegistry = {
753 let mut reg = StoreRegistry::new();
754 reg.register_store(
755 SessionSqlStore::PATH,
756 &SESSION_SQL_DATA_STORE,
757 &SESSION_SQL_INDEX_STORE,
758 )
759 .expect("SQL session test store registration should succeed");
760 reg
761 };
762 }
763
764 static SESSION_SQL_DB: Db<SessionSqlCanister> = Db::new(&SESSION_SQL_STORE_REGISTRY);
765
766 #[derive(Clone, Debug, Default, Deserialize, FieldProjection, PartialEq, Serialize)]
773 struct SessionSqlEntity {
774 id: Ulid,
775 name: String,
776 age: u64,
777 }
778
779 crate::test_entity_schema! {
780 ident = SessionSqlEntity,
781 id = Ulid,
782 id_field = id,
783 entity_name = "SessionSqlEntity",
784 primary_key = "id",
785 pk_index = 0,
786 fields = [
787 ("id", FieldKind::Ulid),
788 ("name", FieldKind::Text),
789 ("age", FieldKind::Uint),
790 ],
791 indexes = [],
792 store = SessionSqlStore,
793 canister = SessionSqlCanister,
794 }
795
796 fn reset_session_sql_store() {
798 init_commit_store_for_tests().expect("commit store init should succeed");
799 ensure_recovered(&SESSION_SQL_DB).expect("write-side recovery should succeed");
800 SESSION_SQL_DATA_STORE.with(|store| store.borrow_mut().clear());
801 SESSION_SQL_INDEX_STORE.with(|store| store.borrow_mut().clear());
802 }
803
804 fn sql_session() -> DbSession<SessionSqlCanister> {
805 DbSession::new(SESSION_SQL_DB)
806 }
807
808 fn assert_query_error_is_cursor_plan(
810 err: QueryError,
811 predicate: impl FnOnce(&CursorPlanError) -> bool,
812 ) {
813 assert!(matches!(
814 err,
815 QueryError::Plan(plan_err)
816 if matches!(
817 plan_err.as_ref(),
818 PlanError::Cursor(inner) if predicate(inner.as_ref())
819 )
820 ));
821 }
822
823 fn assert_cursor_mapping_parity(
825 build: impl Fn() -> CursorPlanError,
826 predicate: impl Fn(&CursorPlanError) -> bool + Copy,
827 ) {
828 let mapped_via_executor = map_executor_plan_error(ExecutorPlanError::from(build()));
829 assert_query_error_is_cursor_plan(mapped_via_executor, predicate);
830
831 let mapped_via_plan = QueryError::from(PlanError::from(build()));
832 assert_query_error_is_cursor_plan(mapped_via_plan, predicate);
833 }
834
835 #[test]
836 fn session_cursor_error_mapping_parity_boundary_arity() {
837 assert_cursor_mapping_parity(
838 || CursorPlanError::continuation_cursor_boundary_arity_mismatch(2, 1),
839 |inner| {
840 matches!(
841 inner,
842 CursorPlanError::ContinuationCursorBoundaryArityMismatch {
843 expected: 2,
844 found: 1
845 }
846 )
847 },
848 );
849 }
850
851 #[test]
852 fn session_cursor_error_mapping_parity_window_mismatch() {
853 assert_cursor_mapping_parity(
854 || CursorPlanError::continuation_cursor_window_mismatch(8, 3),
855 |inner| {
856 matches!(
857 inner,
858 CursorPlanError::ContinuationCursorWindowMismatch {
859 expected_offset: 8,
860 actual_offset: 3
861 }
862 )
863 },
864 );
865 }
866
867 #[test]
868 fn session_cursor_error_mapping_parity_decode_reason() {
869 assert_cursor_mapping_parity(
870 || {
871 CursorPlanError::invalid_continuation_cursor(
872 crate::db::codec::cursor::CursorDecodeError::OddLength,
873 )
874 },
875 |inner| {
876 matches!(
877 inner,
878 CursorPlanError::InvalidContinuationCursor {
879 reason: crate::db::codec::cursor::CursorDecodeError::OddLength
880 }
881 )
882 },
883 );
884 }
885
886 #[test]
887 fn session_cursor_error_mapping_parity_primary_key_type_mismatch() {
888 assert_cursor_mapping_parity(
889 || {
890 CursorPlanError::continuation_cursor_primary_key_type_mismatch(
891 "id",
892 "ulid",
893 Some(crate::value::Value::Text("not-a-ulid".to_string())),
894 )
895 },
896 |inner| {
897 matches!(
898 inner,
899 CursorPlanError::ContinuationCursorPrimaryKeyTypeMismatch {
900 field,
901 expected,
902 value: Some(crate::value::Value::Text(value))
903 } if field == "id" && expected == "ulid" && value == "not-a-ulid"
904 )
905 },
906 );
907 }
908
909 #[test]
910 fn session_cursor_error_mapping_parity_matrix_preserves_cursor_variants() {
911 assert_cursor_mapping_parity(
913 || CursorPlanError::continuation_cursor_boundary_arity_mismatch(2, 1),
914 |inner| {
915 matches!(
916 inner,
917 CursorPlanError::ContinuationCursorBoundaryArityMismatch {
918 expected: 2,
919 found: 1
920 }
921 )
922 },
923 );
924 }
925
926 #[test]
927 fn execute_sql_select_star_honors_order_limit_offset() {
928 reset_session_sql_store();
929 let session = sql_session();
930
931 session
932 .insert(SessionSqlEntity {
933 id: Ulid::generate(),
934 name: "older".to_string(),
935 age: 37,
936 })
937 .expect("seed insert should succeed");
938 session
939 .insert(SessionSqlEntity {
940 id: Ulid::generate(),
941 name: "younger".to_string(),
942 age: 19,
943 })
944 .expect("seed insert should succeed");
945
946 let response = session
947 .execute_sql::<SessionSqlEntity>(
948 "SELECT * FROM SessionSqlEntity ORDER BY age ASC LIMIT 1 OFFSET 1",
949 )
950 .expect("SELECT * should execute");
951
952 assert_eq!(response.count(), 1, "window should return one row");
953 let row = response
954 .iter()
955 .next()
956 .expect("windowed result should include one row");
957 assert_eq!(
958 row.entity_ref().name,
959 "older",
960 "ordered window should return the second age-ordered row",
961 );
962 }
963
964 #[test]
965 fn execute_sql_delete_honors_predicate_order_and_limit() {
966 reset_session_sql_store();
967 let session = sql_session();
968
969 session
970 .insert(SessionSqlEntity {
971 id: Ulid::generate(),
972 name: "first-minor".to_string(),
973 age: 16,
974 })
975 .expect("seed insert should succeed");
976 session
977 .insert(SessionSqlEntity {
978 id: Ulid::generate(),
979 name: "second-minor".to_string(),
980 age: 17,
981 })
982 .expect("seed insert should succeed");
983 session
984 .insert(SessionSqlEntity {
985 id: Ulid::generate(),
986 name: "adult".to_string(),
987 age: 42,
988 })
989 .expect("seed insert should succeed");
990
991 let deleted = session
992 .execute_sql::<SessionSqlEntity>(
993 "DELETE FROM SessionSqlEntity WHERE age < 20 ORDER BY age ASC LIMIT 1",
994 )
995 .expect("DELETE should execute");
996
997 assert_eq!(deleted.count(), 1, "delete limit should remove one row");
998 assert_eq!(
999 deleted
1000 .iter()
1001 .next()
1002 .expect("deleted row should exist")
1003 .entity_ref()
1004 .age,
1005 16,
1006 "ordered delete should remove the youngest matching row first",
1007 );
1008
1009 let remaining = session
1010 .load::<SessionSqlEntity>()
1011 .order_by("age")
1012 .execute()
1013 .expect("post-delete load should succeed");
1014 let remaining_ages = remaining
1015 .iter()
1016 .map(|row| row.entity_ref().age)
1017 .collect::<Vec<_>>();
1018
1019 assert_eq!(
1020 remaining_ages,
1021 vec![17, 42],
1022 "delete window semantics should preserve non-deleted rows",
1023 );
1024 }
1025
1026 #[test]
1027 fn query_from_sql_rejects_explain_statements() {
1028 reset_session_sql_store();
1029 let session = sql_session();
1030
1031 let err = session
1032 .query_from_sql::<SessionSqlEntity>("EXPLAIN SELECT * FROM SessionSqlEntity")
1033 .expect_err("query_from_sql must reject EXPLAIN statements");
1034
1035 assert!(
1036 matches!(
1037 err,
1038 QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1039 _
1040 ))
1041 ),
1042 "query_from_sql EXPLAIN rejection must map to unsupported execution class",
1043 );
1044 }
1045
1046 #[test]
1047 fn explain_sql_execution_returns_descriptor_text() {
1048 reset_session_sql_store();
1049 let session = sql_session();
1050
1051 let explain = session
1052 .explain_sql::<SessionSqlEntity>(
1053 "EXPLAIN EXECUTION SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
1054 )
1055 .expect("EXPLAIN EXECUTION should succeed");
1056
1057 assert!(
1058 explain.contains("node_id=0"),
1059 "execution explain output should include the root descriptor node id",
1060 );
1061 assert!(
1062 explain.contains("layer="),
1063 "execution explain output should include execution layer annotations",
1064 );
1065 }
1066
1067 #[test]
1068 fn explain_sql_plan_returns_logical_plan_text() {
1069 reset_session_sql_store();
1070 let session = sql_session();
1071
1072 let explain = session
1073 .explain_sql::<SessionSqlEntity>(
1074 "EXPLAIN SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
1075 )
1076 .expect("EXPLAIN should succeed");
1077
1078 assert!(
1079 explain.contains("mode: Load"),
1080 "logical explain text should include query mode projection",
1081 );
1082 assert!(
1083 explain.contains("access:"),
1084 "logical explain text should include projected access shape",
1085 );
1086 }
1087
1088 #[test]
1089 fn explain_sql_json_returns_execution_descriptor_json() {
1090 reset_session_sql_store();
1091 let session = sql_session();
1092
1093 let explain = session
1094 .explain_sql::<SessionSqlEntity>(
1095 "EXPLAIN JSON SELECT * FROM SessionSqlEntity ORDER BY age LIMIT 1",
1096 )
1097 .expect("EXPLAIN JSON should succeed");
1098
1099 assert!(
1100 explain.starts_with('{') && explain.ends_with('}'),
1101 "execution explain JSON should render one JSON object payload",
1102 );
1103 assert!(
1104 explain.contains("\"node_id\":0"),
1105 "execution explain JSON should include the root descriptor node id",
1106 );
1107 }
1108
1109 #[test]
1110 fn explain_sql_rejects_non_explain_statements() {
1111 reset_session_sql_store();
1112 let session = sql_session();
1113
1114 let err = session
1115 .explain_sql::<SessionSqlEntity>("SELECT * FROM SessionSqlEntity")
1116 .expect_err("explain_sql must reject non-EXPLAIN statements");
1117
1118 assert!(
1119 matches!(
1120 err,
1121 QueryError::Execute(crate::db::query::intent::QueryExecutionError::Unsupported(
1122 _
1123 ))
1124 ),
1125 "non-EXPLAIN input must fail as unsupported explain usage",
1126 );
1127 }
1128}