1mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18 db::{
19 Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20 FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21 QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22 executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
23 query::plan::VisibleIndexes,
24 schema::{
25 AcceptedRowDecodeContract, AcceptedRowLayoutRuntimeDescriptor, AcceptedSchemaSnapshot,
26 describe_entity_fields, describe_entity_fields_with_persisted_schema,
27 describe_entity_model, describe_entity_model_with_persisted_schema,
28 ensure_accepted_schema_snapshot, show_indexes_for_model,
29 show_indexes_for_model_with_runtime_state,
30 },
31 },
32 error::InternalError,
33 metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
34 model::entity::EntityModel,
35 traits::{CanisterKind, EntityKind, EntityValue, Path},
36 value::Value,
37};
38use std::thread::LocalKey;
39
40#[cfg(feature = "diagnostics")]
41pub use query::{
42 DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
43 QueryExecutionAttribution,
44};
45pub(in crate::db) use response::finalize_structural_grouped_projection_result;
46pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
47#[cfg(feature = "sql")]
48pub use sql::SqlStatementResult;
49#[cfg(all(feature = "sql", feature = "diagnostics"))]
50pub use sql::{
51 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
52 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
53};
54#[cfg(all(feature = "sql", feature = "diagnostics"))]
55pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
56
57pub struct DbSession<C: CanisterKind> {
64 db: Db<C>,
65 debug: bool,
66 metrics: Option<&'static dyn MetricsSink>,
67}
68
69impl<C: CanisterKind> DbSession<C> {
70 #[must_use]
72 pub(crate) const fn new(db: Db<C>) -> Self {
73 Self {
74 db,
75 debug: false,
76 metrics: None,
77 }
78 }
79
80 #[must_use]
82 pub const fn new_with_hooks(
83 store: &'static LocalKey<StoreRegistry>,
84 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
85 ) -> Self {
86 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
87 }
88
89 #[must_use]
91 pub const fn debug(mut self) -> Self {
92 self.debug = true;
93 self
94 }
95
96 #[must_use]
98 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
99 self.metrics = Some(sink);
100 self
101 }
102
103 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
106 where
107 E: EntityKind<Canister = C>,
108 {
109 FluentLoadQuery::new(self, Query::new(consistency))
110 }
111
112 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
116 where
117 E: PersistedRow<Canister = C>,
118 {
119 FluentDeleteQuery::new(self, Query::new(consistency).delete())
120 }
121
122 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
123 if let Some(sink) = self.metrics {
124 with_metrics_sink(sink, f)
125 } else {
126 f()
127 }
128 }
129
130 fn execute_save_with<E, T, R>(
132 &self,
133 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
134 map: impl FnOnce(T) -> R,
135 ) -> Result<R, InternalError>
136 where
137 E: PersistedRow<Canister = C> + EntityValue,
138 {
139 let contract = match self
140 .with_metrics(|| self.ensure_generated_compatible_accepted_row_decode_contract::<E>())
141 {
142 Ok(contract) => contract,
143 Err(error) => {
144 self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
145
146 return Err(error);
147 }
148 };
149 let value = self.with_metrics(|| op(self.save_executor::<E>(contract)))?;
150
151 Ok(map(value))
152 }
153
154 fn execute_save_with_checked_accepted_row_contract<E, T, R>(
159 &self,
160 accepted_row_decode_contract: AcceptedRowDecodeContract,
161 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
162 map: impl FnOnce(T) -> R,
163 ) -> Result<R, InternalError>
164 where
165 E: PersistedRow<Canister = C> + EntityValue,
166 {
167 let value =
168 self.with_metrics(|| op(self.save_executor::<E>(accepted_row_decode_contract)))?;
169
170 Ok(map(value))
171 }
172
173 fn execute_save_entity<E>(
175 &self,
176 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
177 ) -> Result<E, InternalError>
178 where
179 E: PersistedRow<Canister = C> + EntityValue,
180 {
181 self.execute_save_with(op, std::convert::identity)
182 }
183
184 fn execute_save_batch<E>(
185 &self,
186 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
187 ) -> Result<WriteBatchResponse<E>, InternalError>
188 where
189 E: PersistedRow<Canister = C> + EntityValue,
190 {
191 self.execute_save_with(op, WriteBatchResponse::new)
192 }
193
194 #[must_use]
200 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
201 where
202 E: EntityKind<Canister = C>,
203 {
204 self.fluent_load_query(MissingRowPolicy::Ignore)
205 }
206
207 #[must_use]
209 pub const fn load_with_consistency<E>(
210 &self,
211 consistency: MissingRowPolicy,
212 ) -> FluentLoadQuery<'_, E>
213 where
214 E: EntityKind<Canister = C>,
215 {
216 self.fluent_load_query(consistency)
217 }
218
219 #[must_use]
221 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
222 where
223 E: PersistedRow<Canister = C>,
224 {
225 self.fluent_delete_query(MissingRowPolicy::Ignore)
226 }
227
228 #[must_use]
230 pub fn delete_with_consistency<E>(
231 &self,
232 consistency: MissingRowPolicy,
233 ) -> FluentDeleteQuery<'_, E>
234 where
235 E: PersistedRow<Canister = C>,
236 {
237 self.fluent_delete_query(consistency)
238 }
239
240 #[must_use]
244 pub const fn select_one(&self) -> Value {
245 Value::Int(1)
246 }
247
248 #[must_use]
255 pub fn show_indexes<E>(&self) -> Vec<String>
256 where
257 E: EntityKind<Canister = C>,
258 {
259 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
260 }
261
262 #[must_use]
268 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
269 show_indexes_for_model(model)
270 }
271
272 pub(in crate::db) fn show_indexes_for_store_model(
276 &self,
277 store_path: &str,
278 model: &'static EntityModel,
279 ) -> Vec<String> {
280 let runtime_state = self
281 .db
282 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
283 .map(|store| store.index_state());
284
285 show_indexes_for_model_with_runtime_state(model, runtime_state)
286 }
287
288 #[must_use]
294 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
295 where
296 E: EntityKind<Canister = C>,
297 {
298 self.show_columns_for_model(E::MODEL)
299 }
300
301 #[must_use]
303 pub fn show_columns_for_model(
304 &self,
305 model: &'static EntityModel,
306 ) -> Vec<EntityFieldDescription> {
307 describe_entity_fields(model)
308 }
309
310 pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
316 where
317 E: EntityKind<Canister = C>,
318 {
319 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
320
321 Ok(describe_entity_fields_with_persisted_schema(&snapshot))
322 }
323
324 #[must_use]
326 pub fn show_entities(&self) -> Vec<String> {
327 self.db.runtime_entity_names()
328 }
329
330 #[must_use]
335 pub fn show_tables(&self) -> Vec<String> {
336 self.show_entities()
337 }
338
339 fn visible_indexes_for_store_model(
342 &self,
343 store_path: &str,
344 model: &'static EntityModel,
345 ) -> Result<VisibleIndexes<'static>, QueryError> {
346 let store = self
349 .db
350 .recovered_store(store_path)
351 .map_err(QueryError::execute)?;
352 let state = store.index_state();
353 if state != IndexState::Ready {
354 return Ok(VisibleIndexes::none());
355 }
356 debug_assert_eq!(state, IndexState::Ready);
357
358 Ok(VisibleIndexes::planner_visible(model.indexes()))
361 }
362
363 #[must_use]
369 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
370 where
371 E: EntityKind<Canister = C>,
372 {
373 self.describe_entity_model(E::MODEL)
374 }
375
376 #[must_use]
378 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
379 describe_entity_model(model)
380 }
381
382 pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
388 where
389 E: EntityKind<Canister = C>,
390 {
391 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
392
393 Ok(describe_entity_model_with_persisted_schema(
394 E::MODEL,
395 &snapshot,
396 ))
397 }
398
399 fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
403 where
404 E: EntityKind<Canister = C>,
405 {
406 self.ensure_accepted_schema_snapshot_for_authority(&EntityAuthority::for_type::<E>())
407 }
408
409 fn ensure_accepted_schema_snapshot_for_authority(
413 &self,
414 authority: &EntityAuthority,
415 ) -> Result<AcceptedSchemaSnapshot, InternalError> {
416 let store = self.db.recovered_store(authority.store_path())?;
417
418 store.with_schema_mut(|schema_store| {
419 ensure_accepted_schema_snapshot(
420 schema_store,
421 authority.entity_tag(),
422 authority.entity_path(),
423 authority.model(),
424 )
425 })
426 }
427
428 fn ensure_accepted_schema_snapshot_and_authority(
432 &self,
433 authority: EntityAuthority,
434 ) -> Result<(AcceptedSchemaSnapshot, EntityAuthority), InternalError> {
435 let accepted_schema = self.ensure_accepted_schema_snapshot_for_authority(&authority)?;
436 let (accepted_row_layout, row_shape) =
437 AcceptedRowLayoutRuntimeDescriptor::from_generated_compatible_schema(
438 &accepted_schema,
439 authority.model(),
440 )?;
441 let row_decode_contract = accepted_row_layout.row_decode_contract();
442 let authority = authority.with_accepted_row_decode_contract(row_shape, row_decode_contract);
443
444 Ok((accepted_schema, authority))
445 }
446
447 fn ensure_generated_compatible_accepted_row_decode_contract<E>(
452 &self,
453 ) -> Result<AcceptedRowDecodeContract, InternalError>
454 where
455 E: EntityKind<Canister = C>,
456 {
457 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
458 let (accepted_row_layout, _) =
459 AcceptedRowLayoutRuntimeDescriptor::from_generated_compatible_schema(
460 &accepted_schema,
461 E::MODEL,
462 )?;
463
464 Ok(accepted_row_layout.row_decode_contract())
465 }
466
467 pub fn storage_report(
469 &self,
470 name_to_path: &[(&'static str, &'static str)],
471 ) -> Result<StorageReport, InternalError> {
472 self.db.storage_report(name_to_path)
473 }
474
475 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
477 self.db.storage_report_default()
478 }
479
480 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
482 self.db.integrity_report()
483 }
484
485 #[must_use]
490 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
491 where
492 E: EntityKind<Canister = C> + EntityValue,
493 {
494 LoadExecutor::new(self.db, self.debug)
495 }
496
497 #[must_use]
498 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
499 where
500 E: PersistedRow<Canister = C> + EntityValue,
501 {
502 DeleteExecutor::new(self.db)
503 }
504
505 #[must_use]
506 pub(in crate::db) const fn save_executor<E>(
507 &self,
508 accepted_row_decode_contract: AcceptedRowDecodeContract,
509 ) -> SaveExecutor<E>
510 where
511 E: PersistedRow<Canister = C> + EntityValue,
512 {
513 SaveExecutor::new_with_accepted_contract(self.db, self.debug, accepted_row_decode_contract)
514 }
515}