Skip to main content

icydb_core/db/session/
mod.rs

//! Module: session
//! Responsibility: user-facing query/write execution facade over db executors.
//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18    db::{
19        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20        FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21        QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22        commit::CommitSchemaFingerprint,
23        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
24        query::plan::VisibleIndexes,
25        schema::{
26            AcceptedRowDecodeContract, AcceptedRowLayoutRuntimeDescriptor, AcceptedSchemaSnapshot,
27            SchemaInfo, accepted_commit_schema_fingerprint_for_model, describe_entity_fields,
28            describe_entity_fields_with_persisted_schema, describe_entity_model,
29            describe_entity_model_with_persisted_schema, ensure_accepted_schema_snapshot,
30            show_indexes_for_model, show_indexes_for_model_with_runtime_state,
31        },
32    },
33    error::InternalError,
34    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
35    model::entity::EntityModel,
36    traits::{CanisterKind, EntityKind, EntityValue, Path},
37    value::Value,
38};
39use std::thread::LocalKey;
40
41#[cfg(feature = "diagnostics")]
42pub use query::{
43    DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
44    QueryExecutionAttribution,
45};
46pub(in crate::db) use response::finalize_structural_grouped_projection_result;
47pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
48#[cfg(feature = "sql")]
49pub use sql::SqlStatementResult;
50#[cfg(all(feature = "sql", feature = "diagnostics"))]
51pub use sql::{
52    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
53    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
54};
55#[cfg(all(feature = "sql", feature = "diagnostics"))]
56pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
57
58///
59/// DbSession
60///
61/// Session-scoped database handle with policy (debug, metrics) and execution routing.
62///
63
64pub struct DbSession<C: CanisterKind> {
65    db: Db<C>,
66    debug: bool,
67    metrics: Option<&'static dyn MetricsSink>,
68}
69
70impl<C: CanisterKind> DbSession<C> {
71    /// Construct one session facade for a database handle.
72    #[must_use]
73    pub(crate) const fn new(db: Db<C>) -> Self {
74        Self {
75            db,
76            debug: false,
77            metrics: None,
78        }
79    }
80
81    /// Construct one session facade from store registry and runtime hooks.
82    #[must_use]
83    pub const fn new_with_hooks(
84        store: &'static LocalKey<StoreRegistry>,
85        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
86    ) -> Self {
87        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
88    }
89
90    /// Enable debug execution behavior where supported by executors.
91    #[must_use]
92    pub const fn debug(mut self) -> Self {
93        self.debug = true;
94        self
95    }
96
97    /// Attach one metrics sink for all session-executed operations.
98    #[must_use]
99    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
100        self.metrics = Some(sink);
101        self
102    }
103
104    // Shared fluent load wrapper construction keeps the session boundary in
105    // one place when load entry points differ only by missing-row policy.
106    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
107    where
108        E: EntityKind<Canister = C>,
109    {
110        FluentLoadQuery::new(self, Query::new(consistency))
111    }
112
113    // Shared fluent delete wrapper construction keeps the delete-mode handoff
114    // explicit at the session boundary instead of reassembling the same query
115    // shell in each public entry point.
116    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
117    where
118        E: PersistedRow<Canister = C>,
119    {
120        FluentDeleteQuery::new(self, Query::new(consistency).delete())
121    }
122
123    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
124        if let Some(sink) = self.metrics {
125            with_metrics_sink(sink, f)
126        } else {
127            f()
128        }
129    }
130
131    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
132    fn execute_save_with<E, T, R>(
133        &self,
134        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
135        map: impl FnOnce(T) -> R,
136    ) -> Result<R, InternalError>
137    where
138        E: PersistedRow<Canister = C> + EntityValue,
139    {
140        let (contract, schema_info, schema_fingerprint) = match self
141            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
142        {
143            Ok(authority) => authority,
144            Err(error) => {
145                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
146
147                return Err(error);
148            }
149        };
150        let value = self.with_metrics(|| {
151            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
152        })?;
153
154        Ok(map(value))
155    }
156
157    // Execute save work after the caller has already proven that the accepted
158    // row contract is generated-compatible. SQL and structural writes use this
159    // after their pre-staging schema guard so mutation staging and save
160    // execution do not rerun schema-store reconciliation in the same statement.
161    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
162        &self,
163        accepted_row_decode_contract: AcceptedRowDecodeContract,
164        accepted_schema_info: SchemaInfo,
165        accepted_schema_fingerprint: CommitSchemaFingerprint,
166        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
167        map: impl FnOnce(T) -> R,
168    ) -> Result<R, InternalError>
169    where
170        E: PersistedRow<Canister = C> + EntityValue,
171    {
172        let value = self.with_metrics(|| {
173            op(self.save_executor::<E>(
174                accepted_row_decode_contract,
175                accepted_schema_info,
176                accepted_schema_fingerprint,
177            ))
178        })?;
179
180        Ok(map(value))
181    }
182
183    // Shared save-facade wrappers keep response shape explicit at call sites.
184    fn execute_save_entity<E>(
185        &self,
186        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
187    ) -> Result<E, InternalError>
188    where
189        E: PersistedRow<Canister = C> + EntityValue,
190    {
191        self.execute_save_with(op, std::convert::identity)
192    }
193
194    fn execute_save_batch<E>(
195        &self,
196        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
197    ) -> Result<WriteBatchResponse<E>, InternalError>
198    where
199        E: PersistedRow<Canister = C> + EntityValue,
200    {
201        self.execute_save_with(op, WriteBatchResponse::new)
202    }
203
    // ---------------------------------------------------------------------
    // Query entry points (public, fluent)
    // ---------------------------------------------------------------------
207
208    /// Start a fluent load query with default missing-row policy (`Ignore`).
209    #[must_use]
210    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
211    where
212        E: EntityKind<Canister = C>,
213    {
214        self.fluent_load_query(MissingRowPolicy::Ignore)
215    }
216
217    /// Start a fluent load query with explicit missing-row policy.
218    #[must_use]
219    pub const fn load_with_consistency<E>(
220        &self,
221        consistency: MissingRowPolicy,
222    ) -> FluentLoadQuery<'_, E>
223    where
224        E: EntityKind<Canister = C>,
225    {
226        self.fluent_load_query(consistency)
227    }
228
229    /// Start a fluent delete query with default missing-row policy (`Ignore`).
230    #[must_use]
231    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
232    where
233        E: PersistedRow<Canister = C>,
234    {
235        self.fluent_delete_query(MissingRowPolicy::Ignore)
236    }
237
238    /// Start a fluent delete query with explicit missing-row policy.
239    #[must_use]
240    pub fn delete_with_consistency<E>(
241        &self,
242        consistency: MissingRowPolicy,
243    ) -> FluentDeleteQuery<'_, E>
244    where
245        E: PersistedRow<Canister = C>,
246    {
247        self.fluent_delete_query(consistency)
248    }
249
250    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
251    ///
252    /// This terminal bypasses query planning and access routing entirely.
253    #[must_use]
254    pub const fn select_one(&self) -> Value {
255        Value::Int(1)
256    }
257
258    /// Return one stable, human-readable index listing for the entity schema.
259    ///
260    /// Output format mirrors SQL-style introspection:
261    /// - `PRIMARY KEY (field)`
262    /// - `INDEX name (field_a, field_b)`
263    /// - `UNIQUE INDEX name (field_a, field_b)`
264    #[must_use]
265    pub fn show_indexes<E>(&self) -> Vec<String>
266    where
267        E: EntityKind<Canister = C>,
268    {
269        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
270    }
271
272    /// Return one stable, human-readable index listing for one schema model.
273    ///
274    /// This model-only helper is schema-owned and intentionally does not
275    /// attach runtime lifecycle state because it does not carry store
276    /// placement context.
277    #[must_use]
278    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
279        show_indexes_for_model(model)
280    }
281
282    // Return one stable, human-readable index listing for one resolved
283    // store/model pair, attaching the current runtime lifecycle state when the
284    // registry can resolve the backing store handle.
285    pub(in crate::db) fn show_indexes_for_store_model(
286        &self,
287        store_path: &str,
288        model: &'static EntityModel,
289    ) -> Vec<String> {
290        let runtime_state = self
291            .db
292            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
293            .map(|store| store.index_state());
294
295        show_indexes_for_model_with_runtime_state(model, runtime_state)
296    }
297
298    /// Return one stable generated-model list of field descriptors.
299    ///
300    /// This infallible Rust metadata helper intentionally reports the compiled
301    /// schema model. Use `try_show_columns` for the accepted persisted-schema
302    /// view used by SQL and diagnostics surfaces.
303    #[must_use]
304    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
305    where
306        E: EntityKind<Canister = C>,
307    {
308        self.show_columns_for_model(E::MODEL)
309    }
310
311    /// Return one stable generated-model list of field descriptors.
312    #[must_use]
313    pub fn show_columns_for_model(
314        &self,
315        model: &'static EntityModel,
316    ) -> Vec<EntityFieldDescription> {
317        describe_entity_fields(model)
318    }
319
320    /// Return field descriptors using the accepted persisted schema snapshot.
321    ///
322    /// This fallible variant is intended for SQL and diagnostics surfaces that
323    /// can report schema reconciliation failures. The infallible
324    /// `show_columns` helper remains generated-model based.
325    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
326    where
327        E: EntityKind<Canister = C>,
328    {
329        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
330
331        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
332    }
333
334    /// Return one stable list of runtime-registered entity names.
335    #[must_use]
336    pub fn show_entities(&self) -> Vec<String> {
337        self.db.runtime_entity_names()
338    }
339
340    /// Return one stable list of runtime-registered entity names.
341    ///
342    /// `SHOW TABLES` is only an alias for `SHOW ENTITIES`, so the typed
343    /// metadata surface keeps the same alias relationship.
344    #[must_use]
345    pub fn show_tables(&self) -> Vec<String> {
346        self.show_entities()
347    }
348
349    // Resolve the exact secondary-index set that is visible to planner-owned
350    // query planning for one recovered store/model pair.
351    fn visible_indexes_for_store_model(
352        &self,
353        store_path: &str,
354        model: &'static EntityModel,
355    ) -> Result<VisibleIndexes<'static>, QueryError> {
356        // Phase 1: resolve the recovered store state once at the session
357        // boundary so query/executor planning does not reopen lifecycle checks.
358        let store = self
359            .db
360            .recovered_store(store_path)
361            .map_err(QueryError::execute)?;
362        let state = store.index_state();
363        if state != IndexState::Ready {
364            return Ok(VisibleIndexes::none());
365        }
366        debug_assert_eq!(state, IndexState::Ready);
367
368        // Phase 2: planner-visible indexes are exactly the model-owned index
369        // declarations once the recovered store is query-visible.
370        Ok(VisibleIndexes::planner_visible(model.indexes()))
371    }
372
373    /// Return one generated-model schema description for the entity.
374    ///
375    /// This is a typed `DESCRIBE`-style introspection surface consumed by
376    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
377    /// schema view is required.
378    #[must_use]
379    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
380    where
381        E: EntityKind<Canister = C>,
382    {
383        self.describe_entity_model(E::MODEL)
384    }
385
386    /// Return one generated-model schema description for one schema model.
387    #[must_use]
388    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
389        describe_entity_model(model)
390    }
391
392    /// Return a schema description using the accepted persisted schema snapshot.
393    ///
394    /// This is the live-schema counterpart to `describe_entity`. It is fallible
395    /// because loading accepted schema authority can fail if startup
396    /// reconciliation rejects the stored metadata.
397    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
398    where
399        E: EntityKind<Canister = C>,
400    {
401        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
402
403        Ok(describe_entity_model_with_persisted_schema(
404            E::MODEL,
405            &snapshot,
406        ))
407    }
408
409    // Ensure and return the accepted schema snapshot for one generated entity.
410    // This may write the first snapshot for an empty store; otherwise it loads
411    // the latest stored snapshot and applies the current exact-match policy.
412    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
413    where
414        E: EntityKind<Canister = C>,
415    {
416        let store = self.db.recovered_store(E::Store::PATH)?;
417
418        store.with_schema_mut(|schema_store| {
419            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
420        })
421    }
422
423    // Build the accepted schema-info projection for one typed entity. Fluent
424    // terminal adapters use this before constructing slot-bound descriptors so
425    // field slot authority comes from the accepted schema snapshot.
426    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
427        &self,
428    ) -> Result<SchemaInfo, InternalError>
429    where
430        E: EntityKind<Canister = C>,
431    {
432        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
433
434        Ok(SchemaInfo::from_accepted_snapshot_for_model(
435            E::MODEL,
436            &accepted_schema,
437        ))
438    }
439
440    // Derive executor authority from one accepted schema snapshot. This keeps
441    // row layout, cursor schema info, and schema compatibility proof selection
442    // in the same session-owned bootstrap path.
443    fn accepted_authority_for_schema(
444        authority: EntityAuthority,
445        accepted_schema: &AcceptedSchemaSnapshot,
446    ) -> Result<EntityAuthority, InternalError> {
447        let (accepted_row_layout, row_shape) =
448            AcceptedRowLayoutRuntimeDescriptor::from_generated_compatible_schema(
449                accepted_schema,
450                authority.model(),
451            )?;
452        let row_decode_contract = accepted_row_layout.row_decode_contract();
453        let schema_info =
454            SchemaInfo::from_accepted_snapshot_for_model(authority.model(), accepted_schema);
455
456        Ok(
457            authority.with_accepted_row_decode_contract(
458                row_shape,
459                row_decode_contract,
460                schema_info,
461            ),
462        )
463    }
464
465    // Derive typed executor authority from an accepted snapshot the caller
466    // already loaded, avoiding a second schema-store pass in SQL write/select
467    // adapters that need both write descriptors and read selector authority.
468    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
469        accepted_schema: &AcceptedSchemaSnapshot,
470    ) -> Result<EntityAuthority, InternalError>
471    where
472        E: EntityKind<Canister = C>,
473    {
474        Self::accepted_authority_for_schema(EntityAuthority::for_type::<E>(), accepted_schema)
475    }
476
477    // Load the accepted schema snapshot and derive the matching typed executor
478    // authority as one pair so typed session entrypoints cannot accidentally
479    // mix accepted schema fingerprints with generated row-layout authority.
480    pub(in crate::db) fn accepted_entity_authority<E>(
481        &self,
482    ) -> Result<(AcceptedSchemaSnapshot, EntityAuthority), InternalError>
483    where
484        E: EntityKind<Canister = C>,
485    {
486        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
487        let authority = Self::accepted_entity_authority_for_schema::<E>(&accepted_schema)?;
488
489        Ok((accepted_schema, authority))
490    }
491
492    // Ensure accepted schema metadata is safe for write paths that still encode
493    // rows through generated field contracts. Returning only the snapshot keeps
494    // SQL write type checks unchanged while the schema-runtime descriptor guard
495    // rejects unsupported layout or payload drift before mutation staging.
496    fn ensure_generated_compatible_accepted_save_schema<E>(
497        &self,
498    ) -> Result<
499        (
500            AcceptedRowDecodeContract,
501            SchemaInfo,
502            CommitSchemaFingerprint,
503        ),
504        InternalError,
505    >
506    where
507        E: EntityKind<Canister = C>,
508    {
509        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
510        let (accepted_row_layout, _) =
511            AcceptedRowLayoutRuntimeDescriptor::from_generated_compatible_schema(
512                &accepted_schema,
513                E::MODEL,
514            )?;
515        let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, &accepted_schema);
516        let schema_fingerprint =
517            accepted_commit_schema_fingerprint_for_model(E::MODEL, &accepted_schema)?;
518
519        Ok((
520            accepted_row_layout.row_decode_contract(),
521            schema_info,
522            schema_fingerprint,
523        ))
524    }
525
526    /// Build one point-in-time storage report for observability endpoints.
527    pub fn storage_report(
528        &self,
529        name_to_path: &[(&'static str, &'static str)],
530    ) -> Result<StorageReport, InternalError> {
531        self.db.storage_report(name_to_path)
532    }
533
534    /// Build one point-in-time storage report using default entity-path labels.
535    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
536        self.db.storage_report_default()
537    }
538
539    /// Build one point-in-time integrity scan report for observability endpoints.
540    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
541        self.db.integrity_report()
542    }
543
    // ---------------------------------------------------------------------
    // Low-level executors (crate-internal; execution primitives)
    // ---------------------------------------------------------------------
547
548    #[must_use]
549    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
550    where
551        E: EntityKind<Canister = C> + EntityValue,
552    {
553        LoadExecutor::new(self.db, self.debug)
554    }
555
556    #[must_use]
557    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
558    where
559        E: PersistedRow<Canister = C> + EntityValue,
560    {
561        DeleteExecutor::new(self.db)
562    }
563
564    #[must_use]
565    pub(in crate::db) const fn save_executor<E>(
566        &self,
567        accepted_row_decode_contract: AcceptedRowDecodeContract,
568        accepted_schema_info: SchemaInfo,
569        accepted_schema_fingerprint: CommitSchemaFingerprint,
570    ) -> SaveExecutor<E>
571    where
572        E: PersistedRow<Canister = C> + EntityValue,
573    {
574        SaveExecutor::new_with_accepted_contract(
575            self.db,
576            self.debug,
577            accepted_row_decode_contract,
578            accepted_schema_info,
579            accepted_schema_fingerprint,
580        )
581    }
582}