1mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18 db::{
19 Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20 FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21 QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22 commit::CommitSchemaFingerprint,
23 executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
24 query::plan::VisibleIndexes,
25 schema::{
26 AcceptedRowDecodeContract, AcceptedRowLayoutRuntimeDescriptor, AcceptedSchemaSnapshot,
27 SchemaInfo, accepted_commit_schema_fingerprint_for_model, describe_entity_fields,
28 describe_entity_fields_with_persisted_schema, describe_entity_model,
29 describe_entity_model_with_persisted_schema, ensure_accepted_schema_snapshot,
30 show_indexes_for_model, show_indexes_for_model_with_runtime_state,
31 },
32 },
33 error::InternalError,
34 metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
35 model::entity::EntityModel,
36 traits::{CanisterKind, EntityKind, EntityValue, Path},
37 value::Value,
38};
39use std::thread::LocalKey;
40
41#[cfg(feature = "diagnostics")]
42pub use query::{
43 DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
44 QueryExecutionAttribution,
45};
46pub(in crate::db) use response::finalize_structural_grouped_projection_result;
47pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
48#[cfg(feature = "sql")]
49pub use sql::SqlStatementResult;
50#[cfg(all(feature = "sql", feature = "diagnostics"))]
51pub use sql::{
52 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
53 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
54};
55#[cfg(all(feature = "sql", feature = "diagnostics"))]
56pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
57
58pub struct DbSession<C: CanisterKind> {
65 db: Db<C>,
66 debug: bool,
67 metrics: Option<&'static dyn MetricsSink>,
68}
69
70impl<C: CanisterKind> DbSession<C> {
71 #[must_use]
73 pub(crate) const fn new(db: Db<C>) -> Self {
74 Self {
75 db,
76 debug: false,
77 metrics: None,
78 }
79 }
80
81 #[must_use]
83 pub const fn new_with_hooks(
84 store: &'static LocalKey<StoreRegistry>,
85 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
86 ) -> Self {
87 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
88 }
89
90 #[must_use]
92 pub const fn debug(mut self) -> Self {
93 self.debug = true;
94 self
95 }
96
97 #[must_use]
99 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
100 self.metrics = Some(sink);
101 self
102 }
103
104 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
107 where
108 E: EntityKind<Canister = C>,
109 {
110 FluentLoadQuery::new(self, Query::new(consistency))
111 }
112
113 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
117 where
118 E: PersistedRow<Canister = C>,
119 {
120 FluentDeleteQuery::new(self, Query::new(consistency).delete())
121 }
122
123 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
124 if let Some(sink) = self.metrics {
125 with_metrics_sink(sink, f)
126 } else {
127 f()
128 }
129 }
130
131 fn execute_save_with<E, T, R>(
133 &self,
134 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
135 map: impl FnOnce(T) -> R,
136 ) -> Result<R, InternalError>
137 where
138 E: PersistedRow<Canister = C> + EntityValue,
139 {
140 let (contract, schema_info, schema_fingerprint) = match self
141 .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
142 {
143 Ok(authority) => authority,
144 Err(error) => {
145 self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
146
147 return Err(error);
148 }
149 };
150 let value = self.with_metrics(|| {
151 op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
152 })?;
153
154 Ok(map(value))
155 }
156
157 fn execute_save_with_checked_accepted_row_contract<E, T, R>(
162 &self,
163 accepted_row_decode_contract: AcceptedRowDecodeContract,
164 accepted_schema_info: SchemaInfo,
165 accepted_schema_fingerprint: CommitSchemaFingerprint,
166 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
167 map: impl FnOnce(T) -> R,
168 ) -> Result<R, InternalError>
169 where
170 E: PersistedRow<Canister = C> + EntityValue,
171 {
172 let value = self.with_metrics(|| {
173 op(self.save_executor::<E>(
174 accepted_row_decode_contract,
175 accepted_schema_info,
176 accepted_schema_fingerprint,
177 ))
178 })?;
179
180 Ok(map(value))
181 }
182
183 fn execute_save_entity<E>(
185 &self,
186 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
187 ) -> Result<E, InternalError>
188 where
189 E: PersistedRow<Canister = C> + EntityValue,
190 {
191 self.execute_save_with(op, std::convert::identity)
192 }
193
194 fn execute_save_batch<E>(
195 &self,
196 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
197 ) -> Result<WriteBatchResponse<E>, InternalError>
198 where
199 E: PersistedRow<Canister = C> + EntityValue,
200 {
201 self.execute_save_with(op, WriteBatchResponse::new)
202 }
203
204 #[must_use]
210 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
211 where
212 E: EntityKind<Canister = C>,
213 {
214 self.fluent_load_query(MissingRowPolicy::Ignore)
215 }
216
217 #[must_use]
219 pub const fn load_with_consistency<E>(
220 &self,
221 consistency: MissingRowPolicy,
222 ) -> FluentLoadQuery<'_, E>
223 where
224 E: EntityKind<Canister = C>,
225 {
226 self.fluent_load_query(consistency)
227 }
228
229 #[must_use]
231 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
232 where
233 E: PersistedRow<Canister = C>,
234 {
235 self.fluent_delete_query(MissingRowPolicy::Ignore)
236 }
237
238 #[must_use]
240 pub fn delete_with_consistency<E>(
241 &self,
242 consistency: MissingRowPolicy,
243 ) -> FluentDeleteQuery<'_, E>
244 where
245 E: PersistedRow<Canister = C>,
246 {
247 self.fluent_delete_query(consistency)
248 }
249
250 #[must_use]
254 pub const fn select_one(&self) -> Value {
255 Value::Int(1)
256 }
257
258 #[must_use]
265 pub fn show_indexes<E>(&self) -> Vec<String>
266 where
267 E: EntityKind<Canister = C>,
268 {
269 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
270 }
271
272 #[must_use]
278 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
279 show_indexes_for_model(model)
280 }
281
282 pub(in crate::db) fn show_indexes_for_store_model(
286 &self,
287 store_path: &str,
288 model: &'static EntityModel,
289 ) -> Vec<String> {
290 let runtime_state = self
291 .db
292 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
293 .map(|store| store.index_state());
294
295 show_indexes_for_model_with_runtime_state(model, runtime_state)
296 }
297
298 #[must_use]
304 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
305 where
306 E: EntityKind<Canister = C>,
307 {
308 self.show_columns_for_model(E::MODEL)
309 }
310
311 #[must_use]
313 pub fn show_columns_for_model(
314 &self,
315 model: &'static EntityModel,
316 ) -> Vec<EntityFieldDescription> {
317 describe_entity_fields(model)
318 }
319
320 pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
326 where
327 E: EntityKind<Canister = C>,
328 {
329 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
330
331 Ok(describe_entity_fields_with_persisted_schema(&snapshot))
332 }
333
334 #[must_use]
336 pub fn show_entities(&self) -> Vec<String> {
337 self.db.runtime_entity_names()
338 }
339
340 #[must_use]
345 pub fn show_tables(&self) -> Vec<String> {
346 self.show_entities()
347 }
348
349 fn visible_indexes_for_store_model(
352 &self,
353 store_path: &str,
354 model: &'static EntityModel,
355 ) -> Result<VisibleIndexes<'static>, QueryError> {
356 let store = self
359 .db
360 .recovered_store(store_path)
361 .map_err(QueryError::execute)?;
362 let state = store.index_state();
363 if state != IndexState::Ready {
364 return Ok(VisibleIndexes::none());
365 }
366 debug_assert_eq!(state, IndexState::Ready);
367
368 Ok(VisibleIndexes::planner_visible(model.indexes()))
371 }
372
373 #[must_use]
379 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
380 where
381 E: EntityKind<Canister = C>,
382 {
383 self.describe_entity_model(E::MODEL)
384 }
385
386 #[must_use]
388 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
389 describe_entity_model(model)
390 }
391
392 pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
398 where
399 E: EntityKind<Canister = C>,
400 {
401 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
402
403 Ok(describe_entity_model_with_persisted_schema(
404 E::MODEL,
405 &snapshot,
406 ))
407 }
408
409 fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
413 where
414 E: EntityKind<Canister = C>,
415 {
416 let store = self.db.recovered_store(E::Store::PATH)?;
417
418 store.with_schema_mut(|schema_store| {
419 ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
420 })
421 }
422
423 pub(in crate::db) fn accepted_schema_info_for_entity<E>(
427 &self,
428 ) -> Result<SchemaInfo, InternalError>
429 where
430 E: EntityKind<Canister = C>,
431 {
432 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
433
434 Ok(SchemaInfo::from_accepted_snapshot_for_model(
435 E::MODEL,
436 &accepted_schema,
437 ))
438 }
439
440 fn accepted_authority_for_schema(
444 authority: EntityAuthority,
445 accepted_schema: &AcceptedSchemaSnapshot,
446 ) -> Result<EntityAuthority, InternalError> {
447 let (accepted_row_layout, row_shape) =
448 AcceptedRowLayoutRuntimeDescriptor::from_generated_compatible_schema(
449 accepted_schema,
450 authority.model(),
451 )?;
452 let row_decode_contract = accepted_row_layout.row_decode_contract();
453 let schema_info =
454 SchemaInfo::from_accepted_snapshot_for_model(authority.model(), accepted_schema);
455
456 Ok(
457 authority.with_accepted_row_decode_contract(
458 row_shape,
459 row_decode_contract,
460 schema_info,
461 ),
462 )
463 }
464
465 pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
469 accepted_schema: &AcceptedSchemaSnapshot,
470 ) -> Result<EntityAuthority, InternalError>
471 where
472 E: EntityKind<Canister = C>,
473 {
474 Self::accepted_authority_for_schema(EntityAuthority::for_type::<E>(), accepted_schema)
475 }
476
477 pub(in crate::db) fn accepted_entity_authority<E>(
481 &self,
482 ) -> Result<(AcceptedSchemaSnapshot, EntityAuthority), InternalError>
483 where
484 E: EntityKind<Canister = C>,
485 {
486 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
487 let authority = Self::accepted_entity_authority_for_schema::<E>(&accepted_schema)?;
488
489 Ok((accepted_schema, authority))
490 }
491
492 fn ensure_generated_compatible_accepted_save_schema<E>(
497 &self,
498 ) -> Result<
499 (
500 AcceptedRowDecodeContract,
501 SchemaInfo,
502 CommitSchemaFingerprint,
503 ),
504 InternalError,
505 >
506 where
507 E: EntityKind<Canister = C>,
508 {
509 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
510 let (accepted_row_layout, _) =
511 AcceptedRowLayoutRuntimeDescriptor::from_generated_compatible_schema(
512 &accepted_schema,
513 E::MODEL,
514 )?;
515 let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, &accepted_schema);
516 let schema_fingerprint =
517 accepted_commit_schema_fingerprint_for_model(E::MODEL, &accepted_schema)?;
518
519 Ok((
520 accepted_row_layout.row_decode_contract(),
521 schema_info,
522 schema_fingerprint,
523 ))
524 }
525
526 pub fn storage_report(
528 &self,
529 name_to_path: &[(&'static str, &'static str)],
530 ) -> Result<StorageReport, InternalError> {
531 self.db.storage_report(name_to_path)
532 }
533
534 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
536 self.db.storage_report_default()
537 }
538
539 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
541 self.db.integrity_report()
542 }
543
544 #[must_use]
549 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
550 where
551 E: EntityKind<Canister = C> + EntityValue,
552 {
553 LoadExecutor::new(self.db, self.debug)
554 }
555
556 #[must_use]
557 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
558 where
559 E: PersistedRow<Canister = C> + EntityValue,
560 {
561 DeleteExecutor::new(self.db)
562 }
563
564 #[must_use]
565 pub(in crate::db) const fn save_executor<E>(
566 &self,
567 accepted_row_decode_contract: AcceptedRowDecodeContract,
568 accepted_schema_info: SchemaInfo,
569 accepted_schema_fingerprint: CommitSchemaFingerprint,
570 ) -> SaveExecutor<E>
571 where
572 E: PersistedRow<Canister = C> + EntityValue,
573 {
574 SaveExecutor::new_with_accepted_contract(
575 self.db,
576 self.debug,
577 accepted_row_decode_contract,
578 accepted_schema_info,
579 accepted_schema_fingerprint,
580 )
581 }
582}