1mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18 db::{
19 Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20 FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21 QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22 executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
23 query::plan::VisibleIndexes,
24 schema::{
25 AcceptedRowDecodeContract, AcceptedRowLayoutRuntimeDescriptor, AcceptedSchemaSnapshot,
26 describe_entity_fields, describe_entity_fields_with_persisted_schema,
27 describe_entity_model, describe_entity_model_with_persisted_schema,
28 ensure_accepted_schema_snapshot, show_indexes_for_model,
29 show_indexes_for_model_with_runtime_state,
30 },
31 },
32 error::InternalError,
33 metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
34 model::entity::EntityModel,
35 traits::{CanisterKind, EntityKind, EntityValue, Path},
36 value::Value,
37};
38use std::thread::LocalKey;
39
40#[cfg(feature = "diagnostics")]
41pub use query::{
42 DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
43 QueryExecutionAttribution,
44};
45pub(in crate::db) use response::finalize_structural_grouped_projection_result;
46pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
47#[cfg(feature = "sql")]
48pub use sql::SqlStatementResult;
49#[cfg(all(feature = "sql", feature = "diagnostics"))]
50pub use sql::{
51 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
52 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
53};
54#[cfg(all(feature = "sql", feature = "diagnostics"))]
55pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
56
57pub struct DbSession<C: CanisterKind> {
64 db: Db<C>,
65 debug: bool,
66 metrics: Option<&'static dyn MetricsSink>,
67}
68
69impl<C: CanisterKind> DbSession<C> {
70 #[must_use]
72 pub(crate) const fn new(db: Db<C>) -> Self {
73 Self {
74 db,
75 debug: false,
76 metrics: None,
77 }
78 }
79
80 #[must_use]
82 pub const fn new_with_hooks(
83 store: &'static LocalKey<StoreRegistry>,
84 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
85 ) -> Self {
86 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
87 }
88
89 #[must_use]
91 pub const fn debug(mut self) -> Self {
92 self.debug = true;
93 self
94 }
95
96 #[must_use]
98 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
99 self.metrics = Some(sink);
100 self
101 }
102
103 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
106 where
107 E: EntityKind<Canister = C>,
108 {
109 FluentLoadQuery::new(self, Query::new(consistency))
110 }
111
112 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
116 where
117 E: PersistedRow<Canister = C>,
118 {
119 FluentDeleteQuery::new(self, Query::new(consistency).delete())
120 }
121
122 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
123 if let Some(sink) = self.metrics {
124 with_metrics_sink(sink, f)
125 } else {
126 f()
127 }
128 }
129
130 fn execute_save_with<E, T, R>(
132 &self,
133 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
134 map: impl FnOnce(T) -> R,
135 ) -> Result<R, InternalError>
136 where
137 E: PersistedRow<Canister = C> + EntityValue,
138 {
139 let contract = match self
140 .with_metrics(|| self.ensure_generated_compatible_accepted_row_decode_contract::<E>())
141 {
142 Ok(contract) => contract,
143 Err(error) => {
144 self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
145
146 return Err(error);
147 }
148 };
149 let value = self.with_metrics(|| {
150 op(self
151 .save_executor::<E>()
152 .with_accepted_row_decode_contract(contract))
153 })?;
154
155 Ok(map(value))
156 }
157
158 fn execute_save_with_checked_accepted_row_contract<E, T, R>(
163 &self,
164 accepted_row_decode_contract: AcceptedRowDecodeContract,
165 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
166 map: impl FnOnce(T) -> R,
167 ) -> Result<R, InternalError>
168 where
169 E: PersistedRow<Canister = C> + EntityValue,
170 {
171 let value = self.with_metrics(|| {
172 op(self
173 .save_executor::<E>()
174 .with_accepted_row_decode_contract(accepted_row_decode_contract))
175 })?;
176
177 Ok(map(value))
178 }
179
180 fn execute_save_entity<E>(
182 &self,
183 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
184 ) -> Result<E, InternalError>
185 where
186 E: PersistedRow<Canister = C> + EntityValue,
187 {
188 self.execute_save_with(op, std::convert::identity)
189 }
190
191 fn execute_save_batch<E>(
192 &self,
193 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
194 ) -> Result<WriteBatchResponse<E>, InternalError>
195 where
196 E: PersistedRow<Canister = C> + EntityValue,
197 {
198 self.execute_save_with(op, WriteBatchResponse::new)
199 }
200
201 #[must_use]
207 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
208 where
209 E: EntityKind<Canister = C>,
210 {
211 self.fluent_load_query(MissingRowPolicy::Ignore)
212 }
213
214 #[must_use]
216 pub const fn load_with_consistency<E>(
217 &self,
218 consistency: MissingRowPolicy,
219 ) -> FluentLoadQuery<'_, E>
220 where
221 E: EntityKind<Canister = C>,
222 {
223 self.fluent_load_query(consistency)
224 }
225
226 #[must_use]
228 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
229 where
230 E: PersistedRow<Canister = C>,
231 {
232 self.fluent_delete_query(MissingRowPolicy::Ignore)
233 }
234
235 #[must_use]
237 pub fn delete_with_consistency<E>(
238 &self,
239 consistency: MissingRowPolicy,
240 ) -> FluentDeleteQuery<'_, E>
241 where
242 E: PersistedRow<Canister = C>,
243 {
244 self.fluent_delete_query(consistency)
245 }
246
247 #[must_use]
251 pub const fn select_one(&self) -> Value {
252 Value::Int(1)
253 }
254
255 #[must_use]
262 pub fn show_indexes<E>(&self) -> Vec<String>
263 where
264 E: EntityKind<Canister = C>,
265 {
266 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
267 }
268
269 #[must_use]
275 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
276 show_indexes_for_model(model)
277 }
278
279 pub(in crate::db) fn show_indexes_for_store_model(
283 &self,
284 store_path: &str,
285 model: &'static EntityModel,
286 ) -> Vec<String> {
287 let runtime_state = self
288 .db
289 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
290 .map(|store| store.index_state());
291
292 show_indexes_for_model_with_runtime_state(model, runtime_state)
293 }
294
295 #[must_use]
301 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
302 where
303 E: EntityKind<Canister = C>,
304 {
305 self.show_columns_for_model(E::MODEL)
306 }
307
308 #[must_use]
310 pub fn show_columns_for_model(
311 &self,
312 model: &'static EntityModel,
313 ) -> Vec<EntityFieldDescription> {
314 describe_entity_fields(model)
315 }
316
317 pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
323 where
324 E: EntityKind<Canister = C>,
325 {
326 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
327
328 Ok(describe_entity_fields_with_persisted_schema(&snapshot))
329 }
330
331 #[must_use]
333 pub fn show_entities(&self) -> Vec<String> {
334 self.db.runtime_entity_names()
335 }
336
337 #[must_use]
342 pub fn show_tables(&self) -> Vec<String> {
343 self.show_entities()
344 }
345
346 fn visible_indexes_for_store_model(
349 &self,
350 store_path: &str,
351 model: &'static EntityModel,
352 ) -> Result<VisibleIndexes<'static>, QueryError> {
353 let store = self
356 .db
357 .recovered_store(store_path)
358 .map_err(QueryError::execute)?;
359 let state = store.index_state();
360 if state != IndexState::Ready {
361 return Ok(VisibleIndexes::none());
362 }
363 debug_assert_eq!(state, IndexState::Ready);
364
365 Ok(VisibleIndexes::planner_visible(model.indexes()))
368 }
369
370 #[must_use]
376 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
377 where
378 E: EntityKind<Canister = C>,
379 {
380 self.describe_entity_model(E::MODEL)
381 }
382
383 #[must_use]
385 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
386 describe_entity_model(model)
387 }
388
389 pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
395 where
396 E: EntityKind<Canister = C>,
397 {
398 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
399
400 Ok(describe_entity_model_with_persisted_schema(
401 E::MODEL,
402 &snapshot,
403 ))
404 }
405
406 fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
410 where
411 E: EntityKind<Canister = C>,
412 {
413 self.ensure_accepted_schema_snapshot_for_authority(&EntityAuthority::for_type::<E>())
414 }
415
416 fn ensure_accepted_schema_snapshot_for_authority(
420 &self,
421 authority: &EntityAuthority,
422 ) -> Result<AcceptedSchemaSnapshot, InternalError> {
423 let store = self.db.recovered_store(authority.store_path())?;
424
425 store.with_schema_mut(|schema_store| {
426 ensure_accepted_schema_snapshot(
427 schema_store,
428 authority.entity_tag(),
429 authority.entity_path(),
430 authority.model(),
431 )
432 })
433 }
434
435 fn ensure_accepted_schema_snapshot_and_authority(
439 &self,
440 authority: EntityAuthority,
441 ) -> Result<(AcceptedSchemaSnapshot, EntityAuthority), InternalError> {
442 let accepted_schema = self.ensure_accepted_schema_snapshot_for_authority(&authority)?;
443 let accepted_row_layout =
444 AcceptedRowLayoutRuntimeDescriptor::from_accepted_schema(&accepted_schema)?;
445 let row_shape =
446 accepted_row_layout.generated_compatible_row_shape_for_model(authority.model())?;
447 let row_decode_contract = accepted_row_layout.row_decode_contract();
448 let authority = authority.with_accepted_row_decode_contract(row_shape, row_decode_contract);
449
450 Ok((accepted_schema, authority))
451 }
452
453 fn ensure_generated_compatible_accepted_row_decode_contract<E>(
458 &self,
459 ) -> Result<AcceptedRowDecodeContract, InternalError>
460 where
461 E: EntityKind<Canister = C>,
462 {
463 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
464 let accepted_row_layout =
465 AcceptedRowLayoutRuntimeDescriptor::from_accepted_schema(&accepted_schema)?;
466 accepted_row_layout.generated_compatible_row_shape_for_model(E::MODEL)?;
467
468 Ok(accepted_row_layout.row_decode_contract())
469 }
470
471 pub fn storage_report(
473 &self,
474 name_to_path: &[(&'static str, &'static str)],
475 ) -> Result<StorageReport, InternalError> {
476 self.db.storage_report(name_to_path)
477 }
478
479 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
481 self.db.storage_report_default()
482 }
483
484 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
486 self.db.integrity_report()
487 }
488
489 #[must_use]
494 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
495 where
496 E: EntityKind<Canister = C> + EntityValue,
497 {
498 LoadExecutor::new(self.db, self.debug)
499 }
500
501 #[must_use]
502 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
503 where
504 E: PersistedRow<Canister = C> + EntityValue,
505 {
506 DeleteExecutor::new(self.db)
507 }
508
509 #[must_use]
510 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
511 where
512 E: PersistedRow<Canister = C> + EntityValue,
513 {
514 SaveExecutor::new(self.db, self.debug)
515 }
516}