Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18    db::{
19        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20        FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21        QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
23        query::plan::VisibleIndexes,
24        schema::{
25            AcceptedSchemaSnapshot, describe_entity_fields,
26            describe_entity_fields_with_persisted_schema, describe_entity_model,
27            describe_entity_model_with_persisted_schema, ensure_initial_schema_snapshot,
28            show_indexes_for_model, show_indexes_for_model_with_runtime_state,
29        },
30    },
31    error::InternalError,
32    metrics::sink::{MetricsSink, with_metrics_sink},
33    model::entity::EntityModel,
34    traits::{CanisterKind, EntityKind, EntityValue, Path},
35    value::Value,
36};
37use std::thread::LocalKey;
38
39#[cfg(feature = "diagnostics")]
40pub use query::{
41    DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
42    QueryExecutionAttribution,
43};
44pub(in crate::db) use response::finalize_structural_grouped_projection_result;
45pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
46#[cfg(feature = "sql")]
47pub use sql::SqlStatementResult;
48#[cfg(all(feature = "sql", feature = "diagnostics"))]
49pub use sql::{
50    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
51    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
52};
53#[cfg(all(feature = "sql", feature = "diagnostics"))]
54pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
55
56///
57/// DbSession
58///
59/// Session-scoped database handle with policy (debug, metrics) and execution routing.
60///
61
62pub struct DbSession<C: CanisterKind> {
63    db: Db<C>,
64    debug: bool,
65    metrics: Option<&'static dyn MetricsSink>,
66}
67
68impl<C: CanisterKind> DbSession<C> {
69    /// Construct one session facade for a database handle.
70    #[must_use]
71    pub(crate) const fn new(db: Db<C>) -> Self {
72        Self {
73            db,
74            debug: false,
75            metrics: None,
76        }
77    }
78
79    /// Construct one session facade from store registry and runtime hooks.
80    #[must_use]
81    pub const fn new_with_hooks(
82        store: &'static LocalKey<StoreRegistry>,
83        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
84    ) -> Self {
85        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
86    }
87
88    /// Enable debug execution behavior where supported by executors.
89    #[must_use]
90    pub const fn debug(mut self) -> Self {
91        self.debug = true;
92        self
93    }
94
95    /// Attach one metrics sink for all session-executed operations.
96    #[must_use]
97    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
98        self.metrics = Some(sink);
99        self
100    }
101
102    // Shared fluent load wrapper construction keeps the session boundary in
103    // one place when load entry points differ only by missing-row policy.
104    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
105    where
106        E: EntityKind<Canister = C>,
107    {
108        FluentLoadQuery::new(self, Query::new(consistency))
109    }
110
111    // Shared fluent delete wrapper construction keeps the delete-mode handoff
112    // explicit at the session boundary instead of reassembling the same query
113    // shell in each public entry point.
114    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
115    where
116        E: PersistedRow<Canister = C>,
117    {
118        FluentDeleteQuery::new(self, Query::new(consistency).delete())
119    }
120
121    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
122        if let Some(sink) = self.metrics {
123            with_metrics_sink(sink, f)
124        } else {
125            f()
126        }
127    }
128
129    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
130    fn execute_save_with<E, T, R>(
131        &self,
132        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
133        map: impl FnOnce(T) -> R,
134    ) -> Result<R, InternalError>
135    where
136        E: PersistedRow<Canister = C> + EntityValue,
137    {
138        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
139
140        Ok(map(value))
141    }
142
143    // Shared save-facade wrappers keep response shape explicit at call sites.
144    fn execute_save_entity<E>(
145        &self,
146        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
147    ) -> Result<E, InternalError>
148    where
149        E: PersistedRow<Canister = C> + EntityValue,
150    {
151        self.execute_save_with(op, std::convert::identity)
152    }
153
154    fn execute_save_batch<E>(
155        &self,
156        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
157    ) -> Result<WriteBatchResponse<E>, InternalError>
158    where
159        E: PersistedRow<Canister = C> + EntityValue,
160    {
161        self.execute_save_with(op, WriteBatchResponse::new)
162    }
163
164    // ---------------------------------------------------------------------
165    // Query entry points (public, fluent)
166    // ---------------------------------------------------------------------
167
168    /// Start a fluent load query with default missing-row policy (`Ignore`).
169    #[must_use]
170    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
171    where
172        E: EntityKind<Canister = C>,
173    {
174        self.fluent_load_query(MissingRowPolicy::Ignore)
175    }
176
177    /// Start a fluent load query with explicit missing-row policy.
178    #[must_use]
179    pub const fn load_with_consistency<E>(
180        &self,
181        consistency: MissingRowPolicy,
182    ) -> FluentLoadQuery<'_, E>
183    where
184        E: EntityKind<Canister = C>,
185    {
186        self.fluent_load_query(consistency)
187    }
188
189    /// Start a fluent delete query with default missing-row policy (`Ignore`).
190    #[must_use]
191    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
192    where
193        E: PersistedRow<Canister = C>,
194    {
195        self.fluent_delete_query(MissingRowPolicy::Ignore)
196    }
197
198    /// Start a fluent delete query with explicit missing-row policy.
199    #[must_use]
200    pub fn delete_with_consistency<E>(
201        &self,
202        consistency: MissingRowPolicy,
203    ) -> FluentDeleteQuery<'_, E>
204    where
205        E: PersistedRow<Canister = C>,
206    {
207        self.fluent_delete_query(consistency)
208    }
209
210    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
211    ///
212    /// This terminal bypasses query planning and access routing entirely.
213    #[must_use]
214    pub const fn select_one(&self) -> Value {
215        Value::Int(1)
216    }
217
218    /// Return one stable, human-readable index listing for the entity schema.
219    ///
220    /// Output format mirrors SQL-style introspection:
221    /// - `PRIMARY KEY (field)`
222    /// - `INDEX name (field_a, field_b)`
223    /// - `UNIQUE INDEX name (field_a, field_b)`
224    #[must_use]
225    pub fn show_indexes<E>(&self) -> Vec<String>
226    where
227        E: EntityKind<Canister = C>,
228    {
229        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
230    }
231
232    /// Return one stable, human-readable index listing for one schema model.
233    ///
234    /// This model-only helper is schema-owned and intentionally does not
235    /// attach runtime lifecycle state because it does not carry store
236    /// placement context.
237    #[must_use]
238    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
239        show_indexes_for_model(model)
240    }
241
242    // Return one stable, human-readable index listing for one resolved
243    // store/model pair, attaching the current runtime lifecycle state when the
244    // registry can resolve the backing store handle.
245    pub(in crate::db) fn show_indexes_for_store_model(
246        &self,
247        store_path: &str,
248        model: &'static EntityModel,
249    ) -> Vec<String> {
250        let runtime_state = self
251            .db
252            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
253            .map(|store| store.index_state());
254
255        show_indexes_for_model_with_runtime_state(model, runtime_state)
256    }
257
258    /// Return one stable list of field descriptors for the entity schema.
259    #[must_use]
260    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
261    where
262        E: EntityKind<Canister = C>,
263    {
264        self.show_columns_for_model(E::MODEL)
265    }
266
267    /// Return one stable list of field descriptors for one schema model.
268    #[must_use]
269    pub fn show_columns_for_model(
270        &self,
271        model: &'static EntityModel,
272    ) -> Vec<EntityFieldDescription> {
273        describe_entity_fields(model)
274    }
275
276    /// Return field descriptors using the accepted persisted schema snapshot.
277    ///
278    /// This fallible variant is intended for SQL and diagnostics surfaces that
279    /// can report schema reconciliation failures. The infallible
280    /// `show_columns` helper remains generated-model based.
281    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
282    where
283        E: EntityKind<Canister = C>,
284    {
285        let snapshot = self.accepted_initial_schema_snapshot::<E>()?;
286
287        Ok(describe_entity_fields_with_persisted_schema(
288            E::MODEL,
289            &snapshot,
290        ))
291    }
292
293    /// Return one stable list of runtime-registered entity names.
294    #[must_use]
295    pub fn show_entities(&self) -> Vec<String> {
296        self.db.runtime_entity_names()
297    }
298
299    /// Return one stable list of runtime-registered entity names.
300    ///
301    /// `SHOW TABLES` is only an alias for `SHOW ENTITIES`, so the typed
302    /// metadata surface keeps the same alias relationship.
303    #[must_use]
304    pub fn show_tables(&self) -> Vec<String> {
305        self.show_entities()
306    }
307
308    // Resolve the exact secondary-index set that is visible to planner-owned
309    // query planning for one recovered store/model pair.
310    fn visible_indexes_for_store_model(
311        &self,
312        store_path: &str,
313        model: &'static EntityModel,
314    ) -> Result<VisibleIndexes<'static>, QueryError> {
315        // Phase 1: resolve the recovered store state once at the session
316        // boundary so query/executor planning does not reopen lifecycle checks.
317        let store = self
318            .db
319            .recovered_store(store_path)
320            .map_err(QueryError::execute)?;
321        let state = store.index_state();
322        if state != IndexState::Ready {
323            return Ok(VisibleIndexes::none());
324        }
325        debug_assert_eq!(state, IndexState::Ready);
326
327        // Phase 2: planner-visible indexes are exactly the model-owned index
328        // declarations once the recovered store is query-visible.
329        Ok(VisibleIndexes::planner_visible(model.indexes()))
330    }
331
332    /// Return one structured schema description for the entity.
333    ///
334    /// This is a typed `DESCRIBE`-style introspection surface consumed by
335    /// developer tooling and pre-EXPLAIN debugging.
336    #[must_use]
337    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
338    where
339        E: EntityKind<Canister = C>,
340    {
341        self.describe_entity_model(E::MODEL)
342    }
343
344    /// Return one structured schema description for one schema model.
345    #[must_use]
346    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
347        describe_entity_model(model)
348    }
349
350    /// Return a schema description using the accepted persisted schema snapshot.
351    ///
352    /// This is the live-schema counterpart to `describe_entity`. It is fallible
353    /// because loading accepted schema authority can fail if startup
354    /// reconciliation rejects the stored metadata.
355    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
356    where
357        E: EntityKind<Canister = C>,
358    {
359        let snapshot = self.accepted_initial_schema_snapshot::<E>()?;
360
361        Ok(describe_entity_model_with_persisted_schema(
362            E::MODEL,
363            &snapshot,
364        ))
365    }
366
367    // Load the accepted initial schema snapshot for one generated entity after
368    // enforcing recovery/reconciliation. Later schema-evolution work will
369    // replace the initial-version lookup with accepted live-version authority.
370    fn accepted_initial_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
371    where
372        E: EntityKind<Canister = C>,
373    {
374        let store = self.db.recovered_store(E::Store::PATH)?;
375
376        store.with_schema_mut(|schema_store| {
377            ensure_initial_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
378        })
379    }
380
381    /// Build one point-in-time storage report for observability endpoints.
382    pub fn storage_report(
383        &self,
384        name_to_path: &[(&'static str, &'static str)],
385    ) -> Result<StorageReport, InternalError> {
386        self.db.storage_report(name_to_path)
387    }
388
389    /// Build one point-in-time storage report using default entity-path labels.
390    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
391        self.db.storage_report_default()
392    }
393
394    /// Build one point-in-time integrity scan report for observability endpoints.
395    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
396        self.db.integrity_report()
397    }
398
399    // ---------------------------------------------------------------------
400    // Low-level executors (crate-internal; execution primitives)
401    // ---------------------------------------------------------------------
402
403    #[must_use]
404    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
405    where
406        E: EntityKind<Canister = C> + EntityValue,
407    {
408        LoadExecutor::new(self.db, self.debug)
409    }
410
411    #[must_use]
412    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
413    where
414        E: PersistedRow<Canister = C> + EntityValue,
415    {
416        DeleteExecutor::new(self.db)
417    }
418
419    #[must_use]
420    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
421    where
422        E: PersistedRow<Canister = C> + EntityValue,
423    {
424        SaveExecutor::new(self.db, self.debug)
425    }
426}