Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18    db::{
19        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20        FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21        QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
23        query::plan::VisibleIndexes,
24        schema::{
25            AcceptedRowDecodeContract, AcceptedRowLayoutRuntimeDescriptor, AcceptedSchemaSnapshot,
26            describe_entity_fields, describe_entity_fields_with_persisted_schema,
27            describe_entity_model, describe_entity_model_with_persisted_schema,
28            ensure_accepted_schema_snapshot, show_indexes_for_model,
29            show_indexes_for_model_with_runtime_state,
30        },
31    },
32    error::InternalError,
33    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
34    model::entity::EntityModel,
35    traits::{CanisterKind, EntityKind, EntityValue, Path},
36    value::Value,
37};
38use std::thread::LocalKey;
39
40#[cfg(feature = "diagnostics")]
41pub use query::{
42    DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
43    QueryExecutionAttribution,
44};
45pub(in crate::db) use response::finalize_structural_grouped_projection_result;
46pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
47#[cfg(feature = "sql")]
48pub use sql::SqlStatementResult;
49#[cfg(all(feature = "sql", feature = "diagnostics"))]
50pub use sql::{
51    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
52    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
53};
54#[cfg(all(feature = "sql", feature = "diagnostics"))]
55pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
56
57///
58/// DbSession
59///
60/// Session-scoped database handle with policy (debug, metrics) and execution routing.
61///
62
63pub struct DbSession<C: CanisterKind> {
64    db: Db<C>,
65    debug: bool,
66    metrics: Option<&'static dyn MetricsSink>,
67}
68
69impl<C: CanisterKind> DbSession<C> {
70    /// Construct one session facade for a database handle.
71    #[must_use]
72    pub(crate) const fn new(db: Db<C>) -> Self {
73        Self {
74            db,
75            debug: false,
76            metrics: None,
77        }
78    }
79
80    /// Construct one session facade from store registry and runtime hooks.
81    #[must_use]
82    pub const fn new_with_hooks(
83        store: &'static LocalKey<StoreRegistry>,
84        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
85    ) -> Self {
86        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
87    }
88
89    /// Enable debug execution behavior where supported by executors.
90    #[must_use]
91    pub const fn debug(mut self) -> Self {
92        self.debug = true;
93        self
94    }
95
96    /// Attach one metrics sink for all session-executed operations.
97    #[must_use]
98    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
99        self.metrics = Some(sink);
100        self
101    }
102
103    // Shared fluent load wrapper construction keeps the session boundary in
104    // one place when load entry points differ only by missing-row policy.
105    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
106    where
107        E: EntityKind<Canister = C>,
108    {
109        FluentLoadQuery::new(self, Query::new(consistency))
110    }
111
112    // Shared fluent delete wrapper construction keeps the delete-mode handoff
113    // explicit at the session boundary instead of reassembling the same query
114    // shell in each public entry point.
115    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
116    where
117        E: PersistedRow<Canister = C>,
118    {
119        FluentDeleteQuery::new(self, Query::new(consistency).delete())
120    }
121
122    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
123        if let Some(sink) = self.metrics {
124            with_metrics_sink(sink, f)
125        } else {
126            f()
127        }
128    }
129
130    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
131    fn execute_save_with<E, T, R>(
132        &self,
133        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
134        map: impl FnOnce(T) -> R,
135    ) -> Result<R, InternalError>
136    where
137        E: PersistedRow<Canister = C> + EntityValue,
138    {
139        let contract = match self
140            .with_metrics(|| self.ensure_generated_compatible_accepted_row_decode_contract::<E>())
141        {
142            Ok(contract) => contract,
143            Err(error) => {
144                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
145
146                return Err(error);
147            }
148        };
149        let value = self.with_metrics(|| {
150            op(self
151                .save_executor::<E>()
152                .with_accepted_row_decode_contract(contract))
153        })?;
154
155        Ok(map(value))
156    }
157
158    // Execute save work after the caller has already proven that the accepted
159    // row contract is generated-compatible. SQL and structural writes use this
160    // after their pre-staging schema guard so mutation staging and save
161    // execution do not rerun schema-store reconciliation in the same statement.
162    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
163        &self,
164        accepted_row_decode_contract: AcceptedRowDecodeContract,
165        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
166        map: impl FnOnce(T) -> R,
167    ) -> Result<R, InternalError>
168    where
169        E: PersistedRow<Canister = C> + EntityValue,
170    {
171        let value = self.with_metrics(|| {
172            op(self
173                .save_executor::<E>()
174                .with_accepted_row_decode_contract(accepted_row_decode_contract))
175        })?;
176
177        Ok(map(value))
178    }
179
180    // Shared save-facade wrappers keep response shape explicit at call sites.
181    fn execute_save_entity<E>(
182        &self,
183        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
184    ) -> Result<E, InternalError>
185    where
186        E: PersistedRow<Canister = C> + EntityValue,
187    {
188        self.execute_save_with(op, std::convert::identity)
189    }
190
191    fn execute_save_batch<E>(
192        &self,
193        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
194    ) -> Result<WriteBatchResponse<E>, InternalError>
195    where
196        E: PersistedRow<Canister = C> + EntityValue,
197    {
198        self.execute_save_with(op, WriteBatchResponse::new)
199    }
200
201    // ---------------------------------------------------------------------
202    // Query entry points (public, fluent)
203    // ---------------------------------------------------------------------
204
205    /// Start a fluent load query with default missing-row policy (`Ignore`).
206    #[must_use]
207    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
208    where
209        E: EntityKind<Canister = C>,
210    {
211        self.fluent_load_query(MissingRowPolicy::Ignore)
212    }
213
214    /// Start a fluent load query with explicit missing-row policy.
215    #[must_use]
216    pub const fn load_with_consistency<E>(
217        &self,
218        consistency: MissingRowPolicy,
219    ) -> FluentLoadQuery<'_, E>
220    where
221        E: EntityKind<Canister = C>,
222    {
223        self.fluent_load_query(consistency)
224    }
225
226    /// Start a fluent delete query with default missing-row policy (`Ignore`).
227    #[must_use]
228    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
229    where
230        E: PersistedRow<Canister = C>,
231    {
232        self.fluent_delete_query(MissingRowPolicy::Ignore)
233    }
234
235    /// Start a fluent delete query with explicit missing-row policy.
236    #[must_use]
237    pub fn delete_with_consistency<E>(
238        &self,
239        consistency: MissingRowPolicy,
240    ) -> FluentDeleteQuery<'_, E>
241    where
242        E: PersistedRow<Canister = C>,
243    {
244        self.fluent_delete_query(consistency)
245    }
246
247    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
248    ///
249    /// This terminal bypasses query planning and access routing entirely.
250    #[must_use]
251    pub const fn select_one(&self) -> Value {
252        Value::Int(1)
253    }
254
255    /// Return one stable, human-readable index listing for the entity schema.
256    ///
257    /// Output format mirrors SQL-style introspection:
258    /// - `PRIMARY KEY (field)`
259    /// - `INDEX name (field_a, field_b)`
260    /// - `UNIQUE INDEX name (field_a, field_b)`
261    #[must_use]
262    pub fn show_indexes<E>(&self) -> Vec<String>
263    where
264        E: EntityKind<Canister = C>,
265    {
266        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
267    }
268
269    /// Return one stable, human-readable index listing for one schema model.
270    ///
271    /// This model-only helper is schema-owned and intentionally does not
272    /// attach runtime lifecycle state because it does not carry store
273    /// placement context.
274    #[must_use]
275    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
276        show_indexes_for_model(model)
277    }
278
279    // Return one stable, human-readable index listing for one resolved
280    // store/model pair, attaching the current runtime lifecycle state when the
281    // registry can resolve the backing store handle.
282    pub(in crate::db) fn show_indexes_for_store_model(
283        &self,
284        store_path: &str,
285        model: &'static EntityModel,
286    ) -> Vec<String> {
287        let runtime_state = self
288            .db
289            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
290            .map(|store| store.index_state());
291
292        show_indexes_for_model_with_runtime_state(model, runtime_state)
293    }
294
295    /// Return one stable generated-model list of field descriptors.
296    ///
297    /// This infallible Rust metadata helper intentionally reports the compiled
298    /// schema model. Use `try_show_columns` for the accepted persisted-schema
299    /// view used by SQL and diagnostics surfaces.
300    #[must_use]
301    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
302    where
303        E: EntityKind<Canister = C>,
304    {
305        self.show_columns_for_model(E::MODEL)
306    }
307
308    /// Return one stable generated-model list of field descriptors.
309    #[must_use]
310    pub fn show_columns_for_model(
311        &self,
312        model: &'static EntityModel,
313    ) -> Vec<EntityFieldDescription> {
314        describe_entity_fields(model)
315    }
316
317    /// Return field descriptors using the accepted persisted schema snapshot.
318    ///
319    /// This fallible variant is intended for SQL and diagnostics surfaces that
320    /// can report schema reconciliation failures. The infallible
321    /// `show_columns` helper remains generated-model based.
322    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
323    where
324        E: EntityKind<Canister = C>,
325    {
326        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
327
328        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
329    }
330
331    /// Return one stable list of runtime-registered entity names.
332    #[must_use]
333    pub fn show_entities(&self) -> Vec<String> {
334        self.db.runtime_entity_names()
335    }
336
337    /// Return one stable list of runtime-registered entity names.
338    ///
339    /// `SHOW TABLES` is only an alias for `SHOW ENTITIES`, so the typed
340    /// metadata surface keeps the same alias relationship.
341    #[must_use]
342    pub fn show_tables(&self) -> Vec<String> {
343        self.show_entities()
344    }
345
346    // Resolve the exact secondary-index set that is visible to planner-owned
347    // query planning for one recovered store/model pair.
348    fn visible_indexes_for_store_model(
349        &self,
350        store_path: &str,
351        model: &'static EntityModel,
352    ) -> Result<VisibleIndexes<'static>, QueryError> {
353        // Phase 1: resolve the recovered store state once at the session
354        // boundary so query/executor planning does not reopen lifecycle checks.
355        let store = self
356            .db
357            .recovered_store(store_path)
358            .map_err(QueryError::execute)?;
359        let state = store.index_state();
360        if state != IndexState::Ready {
361            return Ok(VisibleIndexes::none());
362        }
363        debug_assert_eq!(state, IndexState::Ready);
364
365        // Phase 2: planner-visible indexes are exactly the model-owned index
366        // declarations once the recovered store is query-visible.
367        Ok(VisibleIndexes::planner_visible(model.indexes()))
368    }
369
370    /// Return one generated-model schema description for the entity.
371    ///
372    /// This is a typed `DESCRIBE`-style introspection surface consumed by
373    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
374    /// schema view is required.
375    #[must_use]
376    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
377    where
378        E: EntityKind<Canister = C>,
379    {
380        self.describe_entity_model(E::MODEL)
381    }
382
383    /// Return one generated-model schema description for one schema model.
384    #[must_use]
385    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
386        describe_entity_model(model)
387    }
388
389    /// Return a schema description using the accepted persisted schema snapshot.
390    ///
391    /// This is the live-schema counterpart to `describe_entity`. It is fallible
392    /// because loading accepted schema authority can fail if startup
393    /// reconciliation rejects the stored metadata.
394    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
395    where
396        E: EntityKind<Canister = C>,
397    {
398        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
399
400        Ok(describe_entity_model_with_persisted_schema(
401            E::MODEL,
402            &snapshot,
403        ))
404    }
405
406    // Ensure and return the accepted schema snapshot for one generated entity.
407    // This may write the first snapshot for an empty store; otherwise it loads
408    // the latest stored snapshot and applies the current exact-match policy.
409    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
410    where
411        E: EntityKind<Canister = C>,
412    {
413        self.ensure_accepted_schema_snapshot_for_authority(&EntityAuthority::for_type::<E>())
414    }
415
416    // Ensure and return the accepted schema snapshot from already-resolved
417    // structural entity authority. SQL and fluent shared-plan cache paths use
418    // this shape after lowering has erased the concrete entity type.
419    fn ensure_accepted_schema_snapshot_for_authority(
420        &self,
421        authority: &EntityAuthority,
422    ) -> Result<AcceptedSchemaSnapshot, InternalError> {
423        let store = self.db.recovered_store(authority.store_path())?;
424
425        store.with_schema_mut(|schema_store| {
426            ensure_accepted_schema_snapshot(
427                schema_store,
428                authority.entity_tag(),
429                authority.entity_path(),
430                authority.model(),
431            )
432        })
433    }
434
435    // Ensure accepted schema metadata and derive the execution authority that
436    // consumes it. Keeping the pair together prevents session call sites from
437    // mixing a live-schema fingerprint with a generated-only row layout.
438    fn ensure_accepted_schema_snapshot_and_authority(
439        &self,
440        authority: EntityAuthority,
441    ) -> Result<(AcceptedSchemaSnapshot, EntityAuthority), InternalError> {
442        let accepted_schema = self.ensure_accepted_schema_snapshot_for_authority(&authority)?;
443        let accepted_row_layout =
444            AcceptedRowLayoutRuntimeDescriptor::from_accepted_schema(&accepted_schema)?;
445        let row_shape =
446            accepted_row_layout.generated_compatible_row_shape_for_model(authority.model())?;
447        let row_decode_contract = accepted_row_layout.row_decode_contract();
448        let authority = authority.with_accepted_row_decode_contract(row_shape, row_decode_contract);
449
450        Ok((accepted_schema, authority))
451    }
452
453    // Ensure accepted schema metadata is safe for write paths that still encode
454    // rows through generated field contracts. Returning only the snapshot keeps
455    // SQL write type checks unchanged while the schema-runtime descriptor guard
456    // rejects unsupported layout or payload drift before mutation staging.
457    fn ensure_generated_compatible_accepted_row_decode_contract<E>(
458        &self,
459    ) -> Result<AcceptedRowDecodeContract, InternalError>
460    where
461        E: EntityKind<Canister = C>,
462    {
463        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
464        let accepted_row_layout =
465            AcceptedRowLayoutRuntimeDescriptor::from_accepted_schema(&accepted_schema)?;
466        accepted_row_layout.generated_compatible_row_shape_for_model(E::MODEL)?;
467
468        Ok(accepted_row_layout.row_decode_contract())
469    }
470
471    /// Build one point-in-time storage report for observability endpoints.
472    pub fn storage_report(
473        &self,
474        name_to_path: &[(&'static str, &'static str)],
475    ) -> Result<StorageReport, InternalError> {
476        self.db.storage_report(name_to_path)
477    }
478
479    /// Build one point-in-time storage report using default entity-path labels.
480    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
481        self.db.storage_report_default()
482    }
483
484    /// Build one point-in-time integrity scan report for observability endpoints.
485    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
486        self.db.integrity_report()
487    }
488
489    // ---------------------------------------------------------------------
490    // Low-level executors (crate-internal; execution primitives)
491    // ---------------------------------------------------------------------
492
493    #[must_use]
494    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
495    where
496        E: EntityKind<Canister = C> + EntityValue,
497    {
498        LoadExecutor::new(self.db, self.debug)
499    }
500
501    #[must_use]
502    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
503    where
504        E: PersistedRow<Canister = C> + EntityValue,
505    {
506        DeleteExecutor::new(self.db)
507    }
508
509    #[must_use]
510    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
511    where
512        E: PersistedRow<Canister = C> + EntityValue,
513    {
514        SaveExecutor::new(self.db, self.debug)
515    }
516}