1mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18 db::{
19 Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20 FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21 QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22 executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
23 query::plan::VisibleIndexes,
24 schema::{
25 AcceptedRowLayoutRuntimeDescriptor, AcceptedSchemaSnapshot, describe_entity_fields,
26 describe_entity_fields_with_persisted_schema, describe_entity_model,
27 describe_entity_model_with_persisted_schema, ensure_accepted_schema_snapshot,
28 show_indexes_for_model, show_indexes_for_model_with_runtime_state,
29 },
30 },
31 error::InternalError,
32 metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
33 model::entity::EntityModel,
34 traits::{CanisterKind, EntityKind, EntityValue, Path},
35 value::Value,
36};
37use std::thread::LocalKey;
38
39#[cfg(feature = "diagnostics")]
40pub use query::{
41 DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
42 QueryExecutionAttribution,
43};
44pub(in crate::db) use response::finalize_structural_grouped_projection_result;
45pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
46#[cfg(feature = "sql")]
47pub use sql::SqlStatementResult;
48#[cfg(all(feature = "sql", feature = "diagnostics"))]
49pub use sql::{
50 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
51 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
52};
53#[cfg(all(feature = "sql", feature = "diagnostics"))]
54pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
55
56pub struct DbSession<C: CanisterKind> {
63 db: Db<C>,
64 debug: bool,
65 metrics: Option<&'static dyn MetricsSink>,
66}
67
68impl<C: CanisterKind> DbSession<C> {
69 #[must_use]
71 pub(crate) const fn new(db: Db<C>) -> Self {
72 Self {
73 db,
74 debug: false,
75 metrics: None,
76 }
77 }
78
79 #[must_use]
81 pub const fn new_with_hooks(
82 store: &'static LocalKey<StoreRegistry>,
83 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
84 ) -> Self {
85 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
86 }
87
88 #[must_use]
90 pub const fn debug(mut self) -> Self {
91 self.debug = true;
92 self
93 }
94
95 #[must_use]
97 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
98 self.metrics = Some(sink);
99 self
100 }
101
102 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
105 where
106 E: EntityKind<Canister = C>,
107 {
108 FluentLoadQuery::new(self, Query::new(consistency))
109 }
110
111 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
115 where
116 E: PersistedRow<Canister = C>,
117 {
118 FluentDeleteQuery::new(self, Query::new(consistency).delete())
119 }
120
121 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
122 if let Some(sink) = self.metrics {
123 with_metrics_sink(sink, f)
124 } else {
125 f()
126 }
127 }
128
129 fn execute_save_with<E, T, R>(
131 &self,
132 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
133 map: impl FnOnce(T) -> R,
134 ) -> Result<R, InternalError>
135 where
136 E: PersistedRow<Canister = C> + EntityValue,
137 {
138 if let Err(error) =
139 self.with_metrics(|| self.ensure_generated_compatible_accepted_schema_snapshot::<E>())
140 {
141 self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
142
143 return Err(error);
144 }
145
146 self.execute_save_with_checked_accepted_schema(op, map)
147 }
148
149 fn execute_save_with_checked_accepted_schema<E, T, R>(
154 &self,
155 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
156 map: impl FnOnce(T) -> R,
157 ) -> Result<R, InternalError>
158 where
159 E: PersistedRow<Canister = C> + EntityValue,
160 {
161 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
162
163 Ok(map(value))
164 }
165
166 fn execute_save_entity<E>(
168 &self,
169 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
170 ) -> Result<E, InternalError>
171 where
172 E: PersistedRow<Canister = C> + EntityValue,
173 {
174 self.execute_save_with(op, std::convert::identity)
175 }
176
177 fn execute_save_batch<E>(
178 &self,
179 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
180 ) -> Result<WriteBatchResponse<E>, InternalError>
181 where
182 E: PersistedRow<Canister = C> + EntityValue,
183 {
184 self.execute_save_with(op, WriteBatchResponse::new)
185 }
186
187 #[must_use]
193 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
194 where
195 E: EntityKind<Canister = C>,
196 {
197 self.fluent_load_query(MissingRowPolicy::Ignore)
198 }
199
200 #[must_use]
202 pub const fn load_with_consistency<E>(
203 &self,
204 consistency: MissingRowPolicy,
205 ) -> FluentLoadQuery<'_, E>
206 where
207 E: EntityKind<Canister = C>,
208 {
209 self.fluent_load_query(consistency)
210 }
211
212 #[must_use]
214 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
215 where
216 E: PersistedRow<Canister = C>,
217 {
218 self.fluent_delete_query(MissingRowPolicy::Ignore)
219 }
220
221 #[must_use]
223 pub fn delete_with_consistency<E>(
224 &self,
225 consistency: MissingRowPolicy,
226 ) -> FluentDeleteQuery<'_, E>
227 where
228 E: PersistedRow<Canister = C>,
229 {
230 self.fluent_delete_query(consistency)
231 }
232
233 #[must_use]
237 pub const fn select_one(&self) -> Value {
238 Value::Int(1)
239 }
240
241 #[must_use]
248 pub fn show_indexes<E>(&self) -> Vec<String>
249 where
250 E: EntityKind<Canister = C>,
251 {
252 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
253 }
254
255 #[must_use]
261 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
262 show_indexes_for_model(model)
263 }
264
265 pub(in crate::db) fn show_indexes_for_store_model(
269 &self,
270 store_path: &str,
271 model: &'static EntityModel,
272 ) -> Vec<String> {
273 let runtime_state = self
274 .db
275 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
276 .map(|store| store.index_state());
277
278 show_indexes_for_model_with_runtime_state(model, runtime_state)
279 }
280
281 #[must_use]
287 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
288 where
289 E: EntityKind<Canister = C>,
290 {
291 self.show_columns_for_model(E::MODEL)
292 }
293
294 #[must_use]
296 pub fn show_columns_for_model(
297 &self,
298 model: &'static EntityModel,
299 ) -> Vec<EntityFieldDescription> {
300 describe_entity_fields(model)
301 }
302
303 pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
309 where
310 E: EntityKind<Canister = C>,
311 {
312 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
313
314 Ok(describe_entity_fields_with_persisted_schema(&snapshot))
315 }
316
317 #[must_use]
319 pub fn show_entities(&self) -> Vec<String> {
320 self.db.runtime_entity_names()
321 }
322
323 #[must_use]
328 pub fn show_tables(&self) -> Vec<String> {
329 self.show_entities()
330 }
331
332 fn visible_indexes_for_store_model(
335 &self,
336 store_path: &str,
337 model: &'static EntityModel,
338 ) -> Result<VisibleIndexes<'static>, QueryError> {
339 let store = self
342 .db
343 .recovered_store(store_path)
344 .map_err(QueryError::execute)?;
345 let state = store.index_state();
346 if state != IndexState::Ready {
347 return Ok(VisibleIndexes::none());
348 }
349 debug_assert_eq!(state, IndexState::Ready);
350
351 Ok(VisibleIndexes::planner_visible(model.indexes()))
354 }
355
356 #[must_use]
362 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
363 where
364 E: EntityKind<Canister = C>,
365 {
366 self.describe_entity_model(E::MODEL)
367 }
368
369 #[must_use]
371 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
372 describe_entity_model(model)
373 }
374
375 pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
381 where
382 E: EntityKind<Canister = C>,
383 {
384 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
385
386 Ok(describe_entity_model_with_persisted_schema(
387 E::MODEL,
388 &snapshot,
389 ))
390 }
391
392 fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
396 where
397 E: EntityKind<Canister = C>,
398 {
399 self.ensure_accepted_schema_snapshot_for_authority(EntityAuthority::for_type::<E>())
400 }
401
402 fn ensure_accepted_schema_snapshot_for_authority(
406 &self,
407 authority: EntityAuthority,
408 ) -> Result<AcceptedSchemaSnapshot, InternalError> {
409 let store = self.db.recovered_store(authority.store_path())?;
410
411 store.with_schema_mut(|schema_store| {
412 ensure_accepted_schema_snapshot(
413 schema_store,
414 authority.entity_tag(),
415 authority.entity_path(),
416 authority.model(),
417 )
418 })
419 }
420
421 fn ensure_accepted_schema_snapshot_and_authority(
425 &self,
426 authority: EntityAuthority,
427 ) -> Result<(AcceptedSchemaSnapshot, EntityAuthority), InternalError> {
428 let accepted_schema = self.ensure_accepted_schema_snapshot_for_authority(authority)?;
429 let accepted_row_layout =
430 AcceptedRowLayoutRuntimeDescriptor::from_accepted_schema(&accepted_schema)?;
431 let authority = authority.with_accepted_row_layout(&accepted_row_layout)?;
432
433 Ok((accepted_schema, authority))
434 }
435
436 fn ensure_generated_compatible_accepted_schema_snapshot<E>(
441 &self,
442 ) -> Result<AcceptedSchemaSnapshot, InternalError>
443 where
444 E: EntityKind<Canister = C>,
445 {
446 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
447 let _accepted_row_layout =
448 AcceptedRowLayoutRuntimeDescriptor::from_generated_compatible_accepted_schema(
449 &accepted_schema,
450 E::MODEL,
451 )?;
452
453 Ok(accepted_schema)
454 }
455
456 pub fn storage_report(
458 &self,
459 name_to_path: &[(&'static str, &'static str)],
460 ) -> Result<StorageReport, InternalError> {
461 self.db.storage_report(name_to_path)
462 }
463
464 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
466 self.db.storage_report_default()
467 }
468
469 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
471 self.db.integrity_report()
472 }
473
474 #[must_use]
479 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
480 where
481 E: EntityKind<Canister = C> + EntityValue,
482 {
483 LoadExecutor::new(self.db, self.debug)
484 }
485
486 #[must_use]
487 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
488 where
489 E: PersistedRow<Canister = C> + EntityValue,
490 {
491 DeleteExecutor::new(self.db)
492 }
493
494 #[must_use]
495 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
496 where
497 E: PersistedRow<Canister = C> + EntityValue,
498 {
499 SaveExecutor::new(self.db, self.debug)
500 }
501}