1mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18 db::{
19 Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20 FluentLoadQuery, IndexState, IntegrityReport, MigrationPlan, MigrationRegistry,
21 MigrationRunOutcome, MissingRowPolicy, PersistedRow, Query, QueryError,
22 SchemaMigrationDescriptor, SchemaMigrationExecutionOutcome, SchemaMigrationPlanner,
23 StorageReport, StoreRegistry, WriteBatchResponse,
24 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
25 query::plan::VisibleIndexes,
26 schema::{
27 describe_entity_fields, describe_entity_model, show_indexes_for_model,
28 show_indexes_for_model_with_runtime_state,
29 },
30 },
31 error::InternalError,
32 metrics::sink::{MetricsSink, with_metrics_sink},
33 model::entity::EntityModel,
34 traits::{CanisterKind, EntityKind, EntityValue, Path},
35 value::Value,
36};
37use std::thread::LocalKey;
38
39#[cfg(feature = "diagnostics")]
40pub use query::{
41 DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
42 QueryExecutionAttribution,
43};
44pub(in crate::db) use response::finalize_structural_grouped_projection_result;
45pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
46#[cfg(feature = "sql")]
47pub use sql::SqlStatementResult;
48#[cfg(all(feature = "sql", feature = "diagnostics"))]
49pub use sql::{
50 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
51 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
52};
53#[cfg(all(feature = "sql", feature = "diagnostics"))]
54pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
55
56pub struct DbSession<C: CanisterKind> {
63 db: Db<C>,
64 debug: bool,
65 metrics: Option<&'static dyn MetricsSink>,
66}
67
68impl<C: CanisterKind> DbSession<C> {
69 #[must_use]
71 pub(crate) const fn new(db: Db<C>) -> Self {
72 Self {
73 db,
74 debug: false,
75 metrics: None,
76 }
77 }
78
79 #[must_use]
81 pub const fn new_with_hooks(
82 store: &'static LocalKey<StoreRegistry>,
83 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
84 ) -> Self {
85 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
86 }
87
88 #[must_use]
90 pub const fn debug(mut self) -> Self {
91 self.debug = true;
92 self
93 }
94
95 #[must_use]
97 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
98 self.metrics = Some(sink);
99 self
100 }
101
102 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
105 where
106 E: EntityKind<Canister = C>,
107 {
108 FluentLoadQuery::new(self, Query::new(consistency))
109 }
110
111 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
115 where
116 E: PersistedRow<Canister = C>,
117 {
118 FluentDeleteQuery::new(self, Query::new(consistency).delete())
119 }
120
121 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
122 if let Some(sink) = self.metrics {
123 with_metrics_sink(sink, f)
124 } else {
125 f()
126 }
127 }
128
129 fn execute_save_with<E, T, R>(
131 &self,
132 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
133 map: impl FnOnce(T) -> R,
134 ) -> Result<R, InternalError>
135 where
136 E: PersistedRow<Canister = C> + EntityValue,
137 {
138 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
139
140 Ok(map(value))
141 }
142
143 fn execute_save_entity<E>(
145 &self,
146 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
147 ) -> Result<E, InternalError>
148 where
149 E: PersistedRow<Canister = C> + EntityValue,
150 {
151 self.execute_save_with(op, std::convert::identity)
152 }
153
154 fn execute_save_batch<E>(
155 &self,
156 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
157 ) -> Result<WriteBatchResponse<E>, InternalError>
158 where
159 E: PersistedRow<Canister = C> + EntityValue,
160 {
161 self.execute_save_with(op, WriteBatchResponse::new)
162 }
163
164 #[must_use]
170 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
171 where
172 E: EntityKind<Canister = C>,
173 {
174 self.fluent_load_query(MissingRowPolicy::Ignore)
175 }
176
177 #[must_use]
179 pub const fn load_with_consistency<E>(
180 &self,
181 consistency: MissingRowPolicy,
182 ) -> FluentLoadQuery<'_, E>
183 where
184 E: EntityKind<Canister = C>,
185 {
186 self.fluent_load_query(consistency)
187 }
188
189 #[must_use]
191 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
192 where
193 E: PersistedRow<Canister = C>,
194 {
195 self.fluent_delete_query(MissingRowPolicy::Ignore)
196 }
197
198 #[must_use]
200 pub fn delete_with_consistency<E>(
201 &self,
202 consistency: MissingRowPolicy,
203 ) -> FluentDeleteQuery<'_, E>
204 where
205 E: PersistedRow<Canister = C>,
206 {
207 self.fluent_delete_query(consistency)
208 }
209
210 #[must_use]
214 pub const fn select_one(&self) -> Value {
215 Value::Int(1)
216 }
217
218 #[must_use]
225 pub fn show_indexes<E>(&self) -> Vec<String>
226 where
227 E: EntityKind<Canister = C>,
228 {
229 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
230 }
231
232 #[must_use]
238 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
239 show_indexes_for_model(model)
240 }
241
242 pub(in crate::db) fn show_indexes_for_store_model(
246 &self,
247 store_path: &str,
248 model: &'static EntityModel,
249 ) -> Vec<String> {
250 let runtime_state = self
251 .db
252 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
253 .map(|store| store.index_state());
254
255 show_indexes_for_model_with_runtime_state(model, runtime_state)
256 }
257
258 #[must_use]
260 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
261 where
262 E: EntityKind<Canister = C>,
263 {
264 self.show_columns_for_model(E::MODEL)
265 }
266
267 #[must_use]
269 pub fn show_columns_for_model(
270 &self,
271 model: &'static EntityModel,
272 ) -> Vec<EntityFieldDescription> {
273 describe_entity_fields(model)
274 }
275
276 #[must_use]
278 pub fn show_entities(&self) -> Vec<String> {
279 self.db.runtime_entity_names()
280 }
281
282 #[must_use]
287 pub fn show_tables(&self) -> Vec<String> {
288 self.show_entities()
289 }
290
291 fn visible_indexes_for_store_model(
294 &self,
295 store_path: &str,
296 model: &'static EntityModel,
297 ) -> Result<VisibleIndexes<'static>, QueryError> {
298 let store = self
301 .db
302 .recovered_store(store_path)
303 .map_err(QueryError::execute)?;
304 let state = store.index_state();
305 if state != IndexState::Ready {
306 return Ok(VisibleIndexes::none());
307 }
308 debug_assert_eq!(state, IndexState::Ready);
309
310 Ok(VisibleIndexes::planner_visible(model.indexes()))
313 }
314
315 #[must_use]
320 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
321 where
322 E: EntityKind<Canister = C>,
323 {
324 self.describe_entity_model(E::MODEL)
325 }
326
327 #[must_use]
329 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
330 describe_entity_model(model)
331 }
332
333 pub fn storage_report(
335 &self,
336 name_to_path: &[(&'static str, &'static str)],
337 ) -> Result<StorageReport, InternalError> {
338 self.db.storage_report(name_to_path)
339 }
340
341 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
343 self.db.storage_report_default()
344 }
345
346 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
348 self.db.integrity_report()
349 }
350
351 pub fn execute_migration_plan(
356 &self,
357 plan: &MigrationPlan,
358 max_steps: usize,
359 ) -> Result<MigrationRunOutcome, InternalError> {
360 self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
361 }
362
363 pub fn execute_schema_migration_descriptor(
369 &self,
370 registry: &mut MigrationRegistry,
371 planner: &SchemaMigrationPlanner,
372 descriptor: &SchemaMigrationDescriptor,
373 max_steps: usize,
374 ) -> Result<SchemaMigrationExecutionOutcome, InternalError> {
375 self.with_metrics(|| {
376 self.db
377 .execute_schema_migration_descriptor(registry, planner, descriptor, max_steps)
378 })
379 }
380
381 #[must_use]
386 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
387 where
388 E: EntityKind<Canister = C> + EntityValue,
389 {
390 LoadExecutor::new(self.db, self.debug)
391 }
392
393 #[must_use]
394 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
395 where
396 E: PersistedRow<Canister = C> + EntityValue,
397 {
398 DeleteExecutor::new(self.db)
399 }
400
401 #[must_use]
402 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
403 where
404 E: PersistedRow<Canister = C> + EntityValue,
405 {
406 SaveExecutor::new(self.db, self.debug)
407 }
408}