Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18    db::{
19        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20        FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21        QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22        commit::CommitSchemaFingerprint,
23        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
24        query::plan::VisibleIndexes,
25        schema::{
26            AcceptedRowDecodeContract, AcceptedRowLayoutRuntimeDescriptor, AcceptedSchemaSnapshot,
27            SchemaInfo, accepted_commit_schema_fingerprint, describe_entity_fields,
28            describe_entity_fields_with_persisted_schema, describe_entity_model,
29            describe_entity_model_with_persisted_schema, ensure_accepted_schema_snapshot,
30            show_indexes_for_model, show_indexes_for_model_with_runtime_state,
31            show_indexes_for_schema_info_with_runtime_state,
32        },
33    },
34    error::InternalError,
35    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
36    model::entity::EntityModel,
37    traits::{CanisterKind, EntityKind, EntityValue, Path},
38    value::Value,
39};
40use std::thread::LocalKey;
41
42#[cfg(feature = "diagnostics")]
43pub use query::{
44    DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
45    QueryExecutionAttribution,
46};
47pub(in crate::db) use response::finalize_structural_grouped_projection_result;
48pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
49#[cfg(all(feature = "sql", feature = "diagnostics"))]
50pub use sql::{
51    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
52    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
53};
54#[cfg(feature = "sql")]
55pub use sql::{
56    SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport, SqlStatementResult,
57    sql_statement_entity_name,
58};
59#[cfg(all(feature = "sql", feature = "diagnostics"))]
60pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
61
62///
63/// DbSession
64///
65/// Session-scoped database handle with policy (debug, metrics) and execution routing.
66///
67
68pub struct DbSession<C: CanisterKind> {
69    db: Db<C>,
70    debug: bool,
71    metrics: Option<&'static dyn MetricsSink>,
72}
73
74impl<C: CanisterKind> DbSession<C> {
75    /// Construct one session facade for a database handle.
76    #[must_use]
77    pub(crate) const fn new(db: Db<C>) -> Self {
78        Self {
79            db,
80            debug: false,
81            metrics: None,
82        }
83    }
84
85    /// Construct one session facade from store registry and runtime hooks.
86    #[must_use]
87    pub const fn new_with_hooks(
88        store: &'static LocalKey<StoreRegistry>,
89        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
90    ) -> Self {
91        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
92    }
93
94    /// Enable debug execution behavior where supported by executors.
95    #[must_use]
96    pub const fn debug(mut self) -> Self {
97        self.debug = true;
98        self
99    }
100
101    /// Attach one metrics sink for all session-executed operations.
102    #[must_use]
103    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
104        self.metrics = Some(sink);
105        self
106    }
107
108    // Shared fluent load wrapper construction keeps the session boundary in
109    // one place when load entry points differ only by missing-row policy.
110    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
111    where
112        E: EntityKind<Canister = C>,
113    {
114        FluentLoadQuery::new(self, Query::new(consistency))
115    }
116
117    // Shared fluent delete wrapper construction keeps the delete-mode handoff
118    // explicit at the session boundary instead of reassembling the same query
119    // shell in each public entry point.
120    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
121    where
122        E: PersistedRow<Canister = C>,
123    {
124        FluentDeleteQuery::new(self, Query::new(consistency).delete())
125    }
126
127    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
128        if let Some(sink) = self.metrics {
129            with_metrics_sink(sink, f)
130        } else {
131            f()
132        }
133    }
134
135    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
136    fn execute_save_with<E, T, R>(
137        &self,
138        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
139        map: impl FnOnce(T) -> R,
140    ) -> Result<R, InternalError>
141    where
142        E: PersistedRow<Canister = C> + EntityValue,
143    {
144        let (contract, schema_info, schema_fingerprint) = match self
145            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
146        {
147            Ok(authority) => authority,
148            Err(error) => {
149                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
150
151                return Err(error);
152            }
153        };
154        let value = self.with_metrics(|| {
155            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
156        })?;
157
158        Ok(map(value))
159    }
160
161    // Execute save work after the caller has already proven that the accepted
162    // row contract is generated-compatible. SQL and structural writes use this
163    // after their pre-staging schema guard so mutation staging and save
164    // execution do not rerun schema-store reconciliation in the same statement.
165    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
166        &self,
167        accepted_row_decode_contract: AcceptedRowDecodeContract,
168        accepted_schema_info: SchemaInfo,
169        accepted_schema_fingerprint: CommitSchemaFingerprint,
170        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
171        map: impl FnOnce(T) -> R,
172    ) -> Result<R, InternalError>
173    where
174        E: PersistedRow<Canister = C> + EntityValue,
175    {
176        let value = self.with_metrics(|| {
177            op(self.save_executor::<E>(
178                accepted_row_decode_contract,
179                accepted_schema_info,
180                accepted_schema_fingerprint,
181            ))
182        })?;
183
184        Ok(map(value))
185    }
186
187    // Shared save-facade wrappers keep response shape explicit at call sites.
188    fn execute_save_entity<E>(
189        &self,
190        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
191    ) -> Result<E, InternalError>
192    where
193        E: PersistedRow<Canister = C> + EntityValue,
194    {
195        self.execute_save_with(op, std::convert::identity)
196    }
197
198    fn execute_save_batch<E>(
199        &self,
200        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
201    ) -> Result<WriteBatchResponse<E>, InternalError>
202    where
203        E: PersistedRow<Canister = C> + EntityValue,
204    {
205        self.execute_save_with(op, WriteBatchResponse::new)
206    }
207
208    // ---------------------------------------------------------------------
209    // Query entry points (public, fluent)
210    // ---------------------------------------------------------------------
211
212    /// Start a fluent load query with default missing-row policy (`Ignore`).
213    #[must_use]
214    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
215    where
216        E: EntityKind<Canister = C>,
217    {
218        self.fluent_load_query(MissingRowPolicy::Ignore)
219    }
220
221    /// Start a fluent load query with explicit missing-row policy.
222    #[must_use]
223    pub const fn load_with_consistency<E>(
224        &self,
225        consistency: MissingRowPolicy,
226    ) -> FluentLoadQuery<'_, E>
227    where
228        E: EntityKind<Canister = C>,
229    {
230        self.fluent_load_query(consistency)
231    }
232
233    /// Start a fluent delete query with default missing-row policy (`Ignore`).
234    #[must_use]
235    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
236    where
237        E: PersistedRow<Canister = C>,
238    {
239        self.fluent_delete_query(MissingRowPolicy::Ignore)
240    }
241
242    /// Start a fluent delete query with explicit missing-row policy.
243    #[must_use]
244    pub fn delete_with_consistency<E>(
245        &self,
246        consistency: MissingRowPolicy,
247    ) -> FluentDeleteQuery<'_, E>
248    where
249        E: PersistedRow<Canister = C>,
250    {
251        self.fluent_delete_query(consistency)
252    }
253
254    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
255    ///
256    /// This terminal bypasses query planning and access routing entirely.
257    #[must_use]
258    pub const fn select_one(&self) -> Value {
259        Value::Int(1)
260    }
261
262    /// Return one stable, human-readable index listing for the entity schema.
263    ///
264    /// Output format mirrors SQL-style introspection:
265    /// - `PRIMARY KEY (field) [state=ready] [origin=generated]`
266    /// - `INDEX name (field_a, field_b) [state=ready] [origin=generated]`
267    /// - `UNIQUE INDEX name (field_a, field_b) [state=ready] [origin=generated]`
268    #[must_use]
269    pub fn show_indexes<E>(&self) -> Vec<String>
270    where
271        E: EntityKind<Canister = C>,
272    {
273        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
274    }
275
276    /// Return one stable, human-readable index listing for one schema model.
277    ///
278    /// This model-only helper is schema-owned and intentionally does not
279    /// attach runtime lifecycle state because it does not carry store
280    /// placement context.
281    #[must_use]
282    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
283        show_indexes_for_model(model)
284    }
285
286    /// Return one stable, human-readable index listing for the accepted schema.
287    ///
288    /// Unlike `show_indexes`, this fallible live-schema helper reflects
289    /// accepted DDL-created indexes as well as compiled schema indexes.
290    pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
291    where
292        E: EntityKind<Canister = C>,
293    {
294        let schema = self.accepted_schema_info_for_entity::<E>()?;
295
296        Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
297    }
298
299    // Return one stable, human-readable index listing for one resolved
300    // store/model pair, attaching the current runtime lifecycle state when the
301    // registry can resolve the backing store handle.
302    pub(in crate::db) fn show_indexes_for_store_model(
303        &self,
304        store_path: &str,
305        model: &'static EntityModel,
306    ) -> Vec<String> {
307        let runtime_state = self
308            .db
309            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
310            .map(|store| store.index_state());
311
312        show_indexes_for_model_with_runtime_state(model, runtime_state)
313    }
314
315    // Return one stable, human-readable index listing for one resolved
316    // store/accepted-schema pair, attaching the current runtime lifecycle state
317    // when the registry can resolve the backing store handle.
318    pub(in crate::db) fn show_indexes_for_store_schema_info(
319        &self,
320        store_path: &str,
321        schema: &SchemaInfo,
322    ) -> Vec<String> {
323        let runtime_state = self
324            .db
325            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
326            .map(|store| store.index_state());
327
328        show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
329    }
330
331    /// Return one stable generated-model list of field descriptors.
332    ///
333    /// This infallible Rust metadata helper intentionally reports the compiled
334    /// schema model. Use `try_show_columns` for the accepted persisted-schema
335    /// view used by SQL and diagnostics surfaces.
336    #[must_use]
337    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
338    where
339        E: EntityKind<Canister = C>,
340    {
341        self.show_columns_for_model(E::MODEL)
342    }
343
344    /// Return one stable generated-model list of field descriptors.
345    #[must_use]
346    pub fn show_columns_for_model(
347        &self,
348        model: &'static EntityModel,
349    ) -> Vec<EntityFieldDescription> {
350        describe_entity_fields(model)
351    }
352
353    /// Return field descriptors using the accepted persisted schema snapshot.
354    ///
355    /// This fallible variant is intended for SQL and diagnostics surfaces that
356    /// can report schema reconciliation failures. The infallible
357    /// `show_columns` helper remains generated-model based.
358    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
359    where
360        E: EntityKind<Canister = C>,
361    {
362        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
363
364        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
365    }
366
367    /// Return one stable list of runtime-registered entity names.
368    #[must_use]
369    pub fn show_entities(&self) -> Vec<String> {
370        self.db.runtime_entity_names()
371    }
372
373    /// Return one stable list of runtime-registered entity names.
374    ///
375    /// `SHOW TABLES` is only an alias for `SHOW ENTITIES`, so the typed
376    /// metadata surface keeps the same alias relationship.
377    #[must_use]
378    pub fn show_tables(&self) -> Vec<String> {
379        self.show_entities()
380    }
381
382    // Resolve the exact secondary-index set that is visible to planner-owned
383    // query planning for one recovered store and accepted schema pair.
384    fn visible_indexes_for_store_accepted_schema(
385        &self,
386        store_path: &str,
387        schema_info: &SchemaInfo,
388    ) -> Result<VisibleIndexes<'static>, QueryError> {
389        // Phase 1: resolve the recovered store state once at the session
390        // boundary so query/executor planning does not reopen lifecycle checks.
391        let store = self
392            .db
393            .recovered_store(store_path)
394            .map_err(QueryError::execute)?;
395        let state = store.index_state();
396        if state != IndexState::Ready {
397            return Ok(VisibleIndexes::none());
398        }
399        debug_assert_eq!(state, IndexState::Ready);
400
401        // Phase 2: planner-visible indexes are accepted schema contracts once
402        // the recovered store is query-visible.
403        let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
404        debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
405        debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
406        debug_assert_eq!(
407            visible_indexes.accepted_expression_index_count(),
408            Some(visible_indexes.accepted_expression_indexes().len()),
409        );
410
411        Ok(visible_indexes)
412    }
413
414    /// Return one generated-model schema description for the entity.
415    ///
416    /// This is a typed `DESCRIBE`-style introspection surface consumed by
417    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
418    /// schema view is required.
419    #[must_use]
420    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
421    where
422        E: EntityKind<Canister = C>,
423    {
424        self.describe_entity_model(E::MODEL)
425    }
426
427    /// Return one generated-model schema description for one schema model.
428    #[must_use]
429    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
430        describe_entity_model(model)
431    }
432
433    /// Return a schema description using the accepted persisted schema snapshot.
434    ///
435    /// This is the live-schema counterpart to `describe_entity`. It is fallible
436    /// because loading accepted schema authority can fail if startup
437    /// reconciliation rejects the stored metadata.
438    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
439    where
440        E: EntityKind<Canister = C>,
441    {
442        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
443
444        Ok(describe_entity_model_with_persisted_schema(
445            E::MODEL,
446            &snapshot,
447        ))
448    }
449
450    // Ensure and return the accepted schema snapshot for one generated entity.
451    // This may write the first snapshot for an empty store; otherwise it loads
452    // the latest stored snapshot and applies the current exact-match policy.
453    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
454    where
455        E: EntityKind<Canister = C>,
456    {
457        let store = self.db.recovered_store(E::Store::PATH)?;
458
459        store.with_schema_mut(|schema_store| {
460            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
461        })
462    }
463
464    // Build the accepted schema-info projection for one typed entity. Fluent
465    // terminal adapters use this before constructing slot-bound descriptors so
466    // field slot authority comes from the accepted schema snapshot.
467    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
468        &self,
469    ) -> Result<SchemaInfo, InternalError>
470    where
471        E: EntityKind<Canister = C>,
472    {
473        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
474
475        Ok(
476            SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
477                E::MODEL,
478                &accepted_schema,
479                true,
480            ),
481        )
482    }
483
484    // Derive typed executor authority from an accepted snapshot the caller
485    // already loaded, avoiding a second schema-store pass in SQL write/select
486    // adapters that need both write descriptors and read selector authority.
487    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
488        accepted_schema: &AcceptedSchemaSnapshot,
489    ) -> Result<EntityAuthority, InternalError>
490    where
491        E: EntityKind<Canister = C>,
492    {
493        EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
494    }
495
496    // Load the accepted schema snapshot and derive the matching typed executor
497    // authority as one pair so typed session entrypoints cannot accidentally
498    // mix accepted schema fingerprints with generated row-layout authority.
499    pub(in crate::db) fn accepted_entity_authority<E>(
500        &self,
501    ) -> Result<(AcceptedSchemaSnapshot, EntityAuthority), InternalError>
502    where
503        E: EntityKind<Canister = C>,
504    {
505        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
506        let authority = Self::accepted_entity_authority_for_schema::<E>(&accepted_schema)?;
507
508        Ok((accepted_schema, authority))
509    }
510
511    // Ensure accepted schema metadata is safe for write paths that still encode
512    // rows through generated field contracts. Returning only the snapshot keeps
513    // SQL write type checks unchanged while the schema-runtime descriptor guard
514    // rejects unsupported layout or payload drift before mutation staging.
515    fn ensure_generated_compatible_accepted_save_schema<E>(
516        &self,
517    ) -> Result<
518        (
519            AcceptedRowDecodeContract,
520            SchemaInfo,
521            CommitSchemaFingerprint,
522        ),
523        InternalError,
524    >
525    where
526        E: EntityKind<Canister = C>,
527    {
528        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
529        let (accepted_row_layout, _) =
530            AcceptedRowLayoutRuntimeDescriptor::from_generated_compatible_schema(
531                &accepted_schema,
532                E::MODEL,
533            )?;
534        let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, &accepted_schema);
535        let schema_fingerprint = accepted_commit_schema_fingerprint(&accepted_schema)?;
536
537        Ok((
538            accepted_row_layout.row_decode_contract(),
539            schema_info,
540            schema_fingerprint,
541        ))
542    }
543
544    /// Build one point-in-time storage report for observability endpoints.
545    pub fn storage_report(
546        &self,
547        name_to_path: &[(&'static str, &'static str)],
548    ) -> Result<StorageReport, InternalError> {
549        self.db.storage_report(name_to_path)
550    }
551
552    /// Build one point-in-time storage report using default entity-path labels.
553    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
554        self.db.storage_report_default()
555    }
556
557    /// Build one point-in-time integrity scan report for observability endpoints.
558    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
559        self.db.integrity_report()
560    }
561
562    // ---------------------------------------------------------------------
563    // Low-level executors (crate-internal; execution primitives)
564    // ---------------------------------------------------------------------
565
566    #[must_use]
567    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
568    where
569        E: EntityKind<Canister = C> + EntityValue,
570    {
571        LoadExecutor::new(self.db, self.debug)
572    }
573
574    #[must_use]
575    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
576    where
577        E: PersistedRow<Canister = C> + EntityValue,
578    {
579        DeleteExecutor::new(self.db)
580    }
581
582    #[must_use]
583    pub(in crate::db) const fn save_executor<E>(
584        &self,
585        accepted_row_decode_contract: AcceptedRowDecodeContract,
586        accepted_schema_info: SchemaInfo,
587        accepted_schema_fingerprint: CommitSchemaFingerprint,
588    ) -> SaveExecutor<E>
589    where
590        E: PersistedRow<Canister = C> + EntityValue,
591    {
592        SaveExecutor::new_with_accepted_contract(
593            self.db,
594            self.debug,
595            accepted_row_decode_contract,
596            accepted_schema_info,
597            accepted_schema_fingerprint,
598        )
599    }
600}