1mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18 db::{
19 Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20 FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21 QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
23 query::plan::VisibleIndexes,
24 schema::{
25 AcceptedSchemaSnapshot, describe_entity_fields,
26 describe_entity_fields_with_persisted_schema, describe_entity_model,
27 describe_entity_model_with_persisted_schema, ensure_initial_schema_snapshot,
28 show_indexes_for_model, show_indexes_for_model_with_runtime_state,
29 },
30 },
31 error::InternalError,
32 metrics::sink::{MetricsSink, with_metrics_sink},
33 model::entity::EntityModel,
34 traits::{CanisterKind, EntityKind, EntityValue, Path},
35 value::Value,
36};
37use std::thread::LocalKey;
38
39#[cfg(feature = "diagnostics")]
40pub use query::{
41 DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
42 QueryExecutionAttribution,
43};
44pub(in crate::db) use response::finalize_structural_grouped_projection_result;
45pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
46#[cfg(feature = "sql")]
47pub use sql::SqlStatementResult;
48#[cfg(all(feature = "sql", feature = "diagnostics"))]
49pub use sql::{
50 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
51 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
52};
53#[cfg(all(feature = "sql", feature = "diagnostics"))]
54pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
55
56pub struct DbSession<C: CanisterKind> {
63 db: Db<C>,
64 debug: bool,
65 metrics: Option<&'static dyn MetricsSink>,
66}
67
68impl<C: CanisterKind> DbSession<C> {
69 #[must_use]
71 pub(crate) const fn new(db: Db<C>) -> Self {
72 Self {
73 db,
74 debug: false,
75 metrics: None,
76 }
77 }
78
79 #[must_use]
81 pub const fn new_with_hooks(
82 store: &'static LocalKey<StoreRegistry>,
83 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
84 ) -> Self {
85 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
86 }
87
88 #[must_use]
90 pub const fn debug(mut self) -> Self {
91 self.debug = true;
92 self
93 }
94
95 #[must_use]
97 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
98 self.metrics = Some(sink);
99 self
100 }
101
102 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
105 where
106 E: EntityKind<Canister = C>,
107 {
108 FluentLoadQuery::new(self, Query::new(consistency))
109 }
110
111 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
115 where
116 E: PersistedRow<Canister = C>,
117 {
118 FluentDeleteQuery::new(self, Query::new(consistency).delete())
119 }
120
121 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
122 if let Some(sink) = self.metrics {
123 with_metrics_sink(sink, f)
124 } else {
125 f()
126 }
127 }
128
129 fn execute_save_with<E, T, R>(
131 &self,
132 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
133 map: impl FnOnce(T) -> R,
134 ) -> Result<R, InternalError>
135 where
136 E: PersistedRow<Canister = C> + EntityValue,
137 {
138 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
139
140 Ok(map(value))
141 }
142
143 fn execute_save_entity<E>(
145 &self,
146 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
147 ) -> Result<E, InternalError>
148 where
149 E: PersistedRow<Canister = C> + EntityValue,
150 {
151 self.execute_save_with(op, std::convert::identity)
152 }
153
154 fn execute_save_batch<E>(
155 &self,
156 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
157 ) -> Result<WriteBatchResponse<E>, InternalError>
158 where
159 E: PersistedRow<Canister = C> + EntityValue,
160 {
161 self.execute_save_with(op, WriteBatchResponse::new)
162 }
163
164 #[must_use]
170 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
171 where
172 E: EntityKind<Canister = C>,
173 {
174 self.fluent_load_query(MissingRowPolicy::Ignore)
175 }
176
177 #[must_use]
179 pub const fn load_with_consistency<E>(
180 &self,
181 consistency: MissingRowPolicy,
182 ) -> FluentLoadQuery<'_, E>
183 where
184 E: EntityKind<Canister = C>,
185 {
186 self.fluent_load_query(consistency)
187 }
188
189 #[must_use]
191 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
192 where
193 E: PersistedRow<Canister = C>,
194 {
195 self.fluent_delete_query(MissingRowPolicy::Ignore)
196 }
197
198 #[must_use]
200 pub fn delete_with_consistency<E>(
201 &self,
202 consistency: MissingRowPolicy,
203 ) -> FluentDeleteQuery<'_, E>
204 where
205 E: PersistedRow<Canister = C>,
206 {
207 self.fluent_delete_query(consistency)
208 }
209
210 #[must_use]
214 pub const fn select_one(&self) -> Value {
215 Value::Int(1)
216 }
217
218 #[must_use]
225 pub fn show_indexes<E>(&self) -> Vec<String>
226 where
227 E: EntityKind<Canister = C>,
228 {
229 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
230 }
231
232 #[must_use]
238 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
239 show_indexes_for_model(model)
240 }
241
242 pub(in crate::db) fn show_indexes_for_store_model(
246 &self,
247 store_path: &str,
248 model: &'static EntityModel,
249 ) -> Vec<String> {
250 let runtime_state = self
251 .db
252 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
253 .map(|store| store.index_state());
254
255 show_indexes_for_model_with_runtime_state(model, runtime_state)
256 }
257
258 #[must_use]
260 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
261 where
262 E: EntityKind<Canister = C>,
263 {
264 self.show_columns_for_model(E::MODEL)
265 }
266
267 #[must_use]
269 pub fn show_columns_for_model(
270 &self,
271 model: &'static EntityModel,
272 ) -> Vec<EntityFieldDescription> {
273 describe_entity_fields(model)
274 }
275
276 pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
282 where
283 E: EntityKind<Canister = C>,
284 {
285 let snapshot = self.accepted_initial_schema_snapshot::<E>()?;
286
287 Ok(describe_entity_fields_with_persisted_schema(
288 E::MODEL,
289 &snapshot,
290 ))
291 }
292
293 #[must_use]
295 pub fn show_entities(&self) -> Vec<String> {
296 self.db.runtime_entity_names()
297 }
298
299 #[must_use]
304 pub fn show_tables(&self) -> Vec<String> {
305 self.show_entities()
306 }
307
308 fn visible_indexes_for_store_model(
311 &self,
312 store_path: &str,
313 model: &'static EntityModel,
314 ) -> Result<VisibleIndexes<'static>, QueryError> {
315 let store = self
318 .db
319 .recovered_store(store_path)
320 .map_err(QueryError::execute)?;
321 let state = store.index_state();
322 if state != IndexState::Ready {
323 return Ok(VisibleIndexes::none());
324 }
325 debug_assert_eq!(state, IndexState::Ready);
326
327 Ok(VisibleIndexes::planner_visible(model.indexes()))
330 }
331
332 #[must_use]
337 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
338 where
339 E: EntityKind<Canister = C>,
340 {
341 self.describe_entity_model(E::MODEL)
342 }
343
344 #[must_use]
346 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
347 describe_entity_model(model)
348 }
349
350 pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
356 where
357 E: EntityKind<Canister = C>,
358 {
359 let snapshot = self.accepted_initial_schema_snapshot::<E>()?;
360
361 Ok(describe_entity_model_with_persisted_schema(
362 E::MODEL,
363 &snapshot,
364 ))
365 }
366
367 fn accepted_initial_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
371 where
372 E: EntityKind<Canister = C>,
373 {
374 let store = self.db.recovered_store(E::Store::PATH)?;
375
376 store.with_schema_mut(|schema_store| {
377 ensure_initial_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
378 })
379 }
380
381 pub fn storage_report(
383 &self,
384 name_to_path: &[(&'static str, &'static str)],
385 ) -> Result<StorageReport, InternalError> {
386 self.db.storage_report(name_to_path)
387 }
388
389 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
391 self.db.storage_report_default()
392 }
393
394 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
396 self.db.integrity_report()
397 }
398
399 #[must_use]
404 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
405 where
406 E: EntityKind<Canister = C> + EntityValue,
407 {
408 LoadExecutor::new(self.db, self.debug)
409 }
410
411 #[must_use]
412 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
413 where
414 E: PersistedRow<Canister = C> + EntityValue,
415 {
416 DeleteExecutor::new(self.db)
417 }
418
419 #[must_use]
420 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
421 where
422 E: PersistedRow<Canister = C> + EntityValue,
423 {
424 SaveExecutor::new(self.db, self.debug)
425 }
426}