mod query;
#[cfg(feature = "sql")]
mod sql;
#[cfg(all(test, feature = "sql"))]
mod tests;
mod write;
use crate::{
db::{
Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
commit::EntityRuntimeHooks,
executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
query::plan::VisibleIndexes,
schema::{
describe_entity_fields, describe_entity_model, show_indexes_for_model,
show_indexes_for_model_with_runtime_state,
},
},
error::InternalError,
metrics::sink::{MetricsSink, with_metrics_sink},
model::entity::EntityModel,
traits::{CanisterKind, EntityKind, EntityValue, Path},
value::Value,
};
use std::thread::LocalKey;
#[cfg(feature = "diagnostics")]
pub use query::QueryExecutionAttribution;
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub use sql::SqlQueryExecutionAttribution;
#[cfg(feature = "sql")]
pub use sql::SqlStatementResult;
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
pub struct DbSession<C: CanisterKind> {
db: Db<C>,
debug: bool,
metrics: Option<&'static dyn MetricsSink>,
}
impl<C: CanisterKind> DbSession<C> {
#[must_use]
pub(crate) const fn new(db: Db<C>) -> Self {
Self {
db,
debug: false,
metrics: None,
}
}
#[must_use]
pub const fn new_with_hooks(
store: &'static LocalKey<StoreRegistry>,
entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
) -> Self {
Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
}
#[must_use]
pub const fn debug(mut self) -> Self {
self.debug = true;
self
}
#[must_use]
pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
self.metrics = Some(sink);
self
}
const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
where
E: EntityKind<Canister = C>,
{
FluentLoadQuery::new(self, Query::new(consistency))
}
fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
FluentDeleteQuery::new(self, Query::new(consistency).delete())
}
fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
if let Some(sink) = self.metrics {
with_metrics_sink(sink, f)
} else {
f()
}
}
fn execute_save_with<E, T, R>(
&self,
op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
map: impl FnOnce(T) -> R,
) -> Result<R, InternalError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
Ok(map(value))
}
fn execute_save_entity<E>(
&self,
op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
) -> Result<E, InternalError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
self.execute_save_with(op, std::convert::identity)
}
fn execute_save_batch<E>(
&self,
op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
) -> Result<WriteBatchResponse<E>, InternalError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
self.execute_save_with(op, WriteBatchResponse::new)
}
#[must_use]
pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
where
E: EntityKind<Canister = C>,
{
self.fluent_load_query(MissingRowPolicy::Ignore)
}
#[must_use]
pub const fn load_with_consistency<E>(
&self,
consistency: MissingRowPolicy,
) -> FluentLoadQuery<'_, E>
where
E: EntityKind<Canister = C>,
{
self.fluent_load_query(consistency)
}
#[must_use]
pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
self.fluent_delete_query(MissingRowPolicy::Ignore)
}
#[must_use]
pub fn delete_with_consistency<E>(
&self,
consistency: MissingRowPolicy,
) -> FluentDeleteQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
self.fluent_delete_query(consistency)
}
#[must_use]
pub const fn select_one(&self) -> Value {
Value::Int(1)
}
#[must_use]
pub fn show_indexes<E>(&self) -> Vec<String>
where
E: EntityKind<Canister = C>,
{
self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
}
#[must_use]
pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
show_indexes_for_model(model)
}
pub(in crate::db) fn show_indexes_for_store_model(
&self,
store_path: &str,
model: &'static EntityModel,
) -> Vec<String> {
let runtime_state = self
.db
.with_store_registry(|registry| registry.try_get_store(store_path).ok())
.map(|store| store.index_state());
show_indexes_for_model_with_runtime_state(model, runtime_state)
}
#[must_use]
pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
where
E: EntityKind<Canister = C>,
{
self.show_columns_for_model(E::MODEL)
}
#[must_use]
pub fn show_columns_for_model(
&self,
model: &'static EntityModel,
) -> Vec<EntityFieldDescription> {
describe_entity_fields(model)
}
#[must_use]
pub fn show_entities(&self) -> Vec<String> {
self.db.runtime_entity_names()
}
#[must_use]
pub fn show_tables(&self) -> Vec<String> {
self.show_entities()
}
fn visible_indexes_for_store_model(
&self,
store_path: &str,
model: &'static EntityModel,
) -> Result<VisibleIndexes<'static>, QueryError> {
let store = self
.db
.recovered_store(store_path)
.map_err(QueryError::execute)?;
let state = store.index_state();
if state != IndexState::Ready {
return Ok(VisibleIndexes::none());
}
debug_assert_eq!(state, IndexState::Ready);
Ok(VisibleIndexes::planner_visible(model.indexes()))
}
#[must_use]
pub fn describe_entity<E>(&self) -> EntitySchemaDescription
where
E: EntityKind<Canister = C>,
{
self.describe_entity_model(E::MODEL)
}
#[must_use]
pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
describe_entity_model(model)
}
pub fn storage_report(
&self,
name_to_path: &[(&'static str, &'static str)],
) -> Result<StorageReport, InternalError> {
self.db.storage_report(name_to_path)
}
pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
self.db.storage_report_default()
}
pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
self.db.integrity_report()
}
pub fn execute_migration_plan(
&self,
plan: &MigrationPlan,
max_steps: usize,
) -> Result<MigrationRunOutcome, InternalError> {
self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
}
#[must_use]
pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
where
E: EntityKind<Canister = C> + EntityValue,
{
LoadExecutor::new(self.db, self.debug)
}
#[must_use]
pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
where
E: PersistedRow<Canister = C> + EntityValue,
{
DeleteExecutor::new(self.db)
}
#[must_use]
pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
where
E: PersistedRow<Canister = C> + EntityValue,
{
SaveExecutor::new(self.db, self.debug)
}
}