mod query;
mod response;
#[cfg(feature = "sql")]
mod sql;
#[cfg(all(test, feature = "sql"))]
mod tests;
mod write;
use crate::{
db::{
Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
QueryError, StorageReport, StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
commit::CommitSchemaFingerprint,
executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
query::plan::VisibleIndexes,
schema::{
AcceptedRowDecodeContract, AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot,
SchemaInfo, accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
describe_entity_fields, describe_entity_fields_with_persisted_schema,
describe_entity_model, describe_entity_model_with_persisted_schema,
ensure_accepted_schema_snapshot, show_indexes_for_model,
show_indexes_for_model_with_runtime_state,
show_indexes_for_schema_info_with_runtime_state,
},
},
error::InternalError,
metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
model::entity::EntityModel,
traits::{CanisterKind, EntityKind, EntityValue, Path},
value::Value,
};
use std::{cell::RefCell, collections::HashMap, thread::LocalKey};
#[cfg(feature = "diagnostics")]
pub use query::{
DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
QueryExecutionAttribution,
};
pub(in crate::db) use response::finalize_structural_grouped_projection_result;
pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub use sql::{
SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
};
#[cfg(feature = "sql")]
pub use sql::{
SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport, SqlStatementResult,
SqlStatementSurface, sql_statement_entity_name, sql_statement_surface,
};
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
/// Session handle over a [`Db`] for canister kind `C`.
///
/// Bundles the database handle with a debug flag and an optional metrics
/// sink; the methods on this type build fluent queries, run saves, and expose
/// schema and storage introspection through that handle.
pub struct DbSession<C: CanisterKind> {
// Underlying database handle every operation is delegated to.
db: Db<C>,
// Debug flag forwarded to the load and save executors; `false` until `debug()` is called.
debug: bool,
// Optional sink that `with_metrics` installs around executed closures.
metrics: Option<&'static dyn MetricsSink>,
}
/// Value stored in `ACCEPTED_SCHEMA_QUERY_CACHES`: an accepted schema snapshot
/// paired with the fingerprint computed for it when the entry was inserted.
#[derive(Clone, Debug)]
struct AcceptedSchemaQueryCacheEntry {
// Accepted schema snapshot loaded for the query path.
snapshot: AcceptedSchemaSnapshot,
// Fingerprint derived from `snapshot` via `accepted_schema_cache_fingerprint`.
fingerprint: CommitSchemaFingerprint,
}
/// Tuple produced by `accepted_save_contract_for_descriptor`.
///
/// In order: the row decode contract, a clone of that contract (named the
/// mutation row decode contract where it is built), the accepted `SchemaInfo`,
/// and the commit schema fingerprint of the accepted snapshot.
pub(in crate::db) type AcceptedSaveContract = (
AcceptedRowDecodeContract,
AcceptedRowDecodeContract,
SchemaInfo,
CommitSchemaFingerprint,
);
/// Accepted schema snapshot together with its fingerprint, as handed out by
/// `DbSession::accepted_schema_catalog_context_for_query`.
#[derive(Clone, Debug)]
pub(in crate::db) struct AcceptedSchemaCatalogContext {
// Accepted schema snapshot this context was built from.
snapshot: AcceptedSchemaSnapshot,
// Fingerprint associated with `snapshot`.
fingerprint: CommitSchemaFingerprint,
}
impl AcceptedSchemaCatalogContext {
    /// Borrows the accepted schema snapshot carried by this context.
    #[must_use]
    pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
        let Self { snapshot, .. } = self;
        snapshot
    }

    /// Returns a copy of the fingerprint carried by this context.
    #[must_use]
    pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
        let Self { fingerprint, .. } = self;
        *fingerprint
    }

    /// Builds the `EntityAuthority` for entity type `E` from this context's
    /// accepted snapshot.
    ///
    /// # Errors
    ///
    /// Propagates whatever `EntityAuthority::from_accepted_schema_for_type`
    /// reports for the snapshot.
    pub(in crate::db) fn accepted_entity_authority_for<E: EntityKind>(
        &self,
    ) -> Result<EntityAuthority, InternalError> {
        let accepted = &self.snapshot;
        EntityAuthority::from_accepted_schema_for_type::<E>(accepted)
    }

    /// Builds the `SchemaInfo` for entity type `E` from `E::MODEL` and this
    /// context's accepted snapshot, using the expression-index-aware
    /// constructor with its flag set to `true`.
    #[must_use]
    pub(in crate::db) fn accepted_schema_info_for<E: EntityKind>(&self) -> SchemaInfo {
        let accepted = &self.snapshot;
        SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
            E::MODEL,
            accepted,
            true,
        )
    }
}
/// Assembles the `AcceptedSaveContract` tuple for entity type `E`.
///
/// The row decode contract is taken from `descriptor` and cloned for the
/// mutation slot; the schema info comes from `E::MODEL` plus
/// `accepted_schema`; the fingerprint is the commit schema fingerprint of
/// `accepted_schema`.
///
/// # Errors
///
/// Returns the error from `accepted_commit_schema_fingerprint` if the
/// fingerprint cannot be computed.
pub(in crate::db) fn accepted_save_contract_for_descriptor<E: EntityKind>(
    accepted_schema: &AcceptedSchemaSnapshot,
    descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
) -> Result<AcceptedSaveContract, InternalError> {
    let decode_contract = descriptor.row_decode_contract();
    let mutation_decode_contract = decode_contract.clone();
    let info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);

    // Fingerprint last: it is the only fallible step.
    let fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;

    let contract: AcceptedSaveContract =
        (decode_contract, mutation_decode_contract, info, fingerprint);
    Ok(contract)
}
// Thread-local cache of accepted schema snapshots used by the query path.
// Keys are `(db.cache_scope_id(), E::PATH)` pairs; entries are inserted by
// `accepted_schema_catalog_context_for_query` and removed by
// `invalidate_accepted_schema_query_cache_for_entity`.
thread_local! {
static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
RefCell::new(HashMap::default());
}
impl<C: CanisterKind> DbSession<C> {
/// Creates a session over `db` with debug disabled and no metrics sink.
#[must_use]
pub(crate) const fn new(db: Db<C>) -> Self {
Self {
db,
debug: false,
metrics: None,
}
}
/// Creates a session whose database handle is built with
/// `Db::new_with_hooks` from the given store registry key and entity
/// runtime hooks; debug and metrics start at the `new` defaults.
#[must_use]
pub const fn new_with_hooks(
store: &'static LocalKey<StoreRegistry>,
entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
) -> Self {
Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
}
/// Builder-style setter: returns the session with its debug flag set to `true`.
#[must_use]
pub const fn debug(mut self) -> Self {
self.debug = true;
self
}
/// Builder-style setter: returns the session with `sink` stored as its
/// metrics sink, replacing any sink set earlier.
#[must_use]
pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
self.metrics = Some(sink);
self
}
/// Builds a fluent load query for `E` bound to this session, wrapping a new
/// `Query` created with the given missing-row policy.
const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
where
E: EntityKind<Canister = C>,
{
FluentLoadQuery::new(self, Query::new(consistency))
}
/// Builds a fluent delete query for `E` bound to this session.
///
/// A new `Query` is created with the given missing-row policy and switched
/// to delete mode via `Query::delete` before being wrapped.
fn fluent_delete_query<E: PersistedRow<Canister = C>>(
    &self,
    consistency: MissingRowPolicy,
) -> FluentDeleteQuery<'_, E> {
    let delete_query = Query::new(consistency).delete();
    FluentDeleteQuery::new(self, delete_query)
}
/// Runs `f` and returns its result.
///
/// When a metrics sink is configured, `f` is executed through
/// `with_metrics_sink` with that sink; otherwise it is called directly.
fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
    match self.metrics {
        Some(sink) => with_metrics_sink(sink, f),
        None => f(),
    }
}
/// Resolves the accepted save schema for `E`, runs `op` against a
/// `SaveExecutor` built from it, and converts the success value with `map`.
///
/// Both the schema resolution and `op` run inside `with_metrics`.
///
/// # Errors
///
/// If schema resolution fails, the error is recorded through
/// `record_exec_error_for_path` (kind `ExecKind::Save`, path `E::PATH`) and
/// returned. Errors from `op` are returned unchanged.
fn execute_save_with<E, T, R>(
    &self,
    op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
    map: impl FnOnce(T) -> R,
) -> Result<R, InternalError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let resolved =
        self.with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>());
    let (contract, schema_info, schema_fingerprint) = resolved.map_err(|error| {
        // Record the failure before handing the error back to the caller.
        self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
        error
    })?;

    let outcome = self.with_metrics(|| {
        let executor = self.save_executor::<E>(contract, schema_info, schema_fingerprint);
        op(executor)
    });
    outcome.map(map)
}
/// Runs `op` against a `SaveExecutor` built from an accepted contract the
/// caller already holds, then converts the success value with `map`.
///
/// No schema resolution happens here: the three `accepted_*` arguments are
/// forwarded to `save_executor` as given. The executor is built and `op` is
/// run inside `with_metrics`.
///
/// # Errors
///
/// Returns any error produced by `op`.
fn execute_save_with_checked_accepted_row_contract<E, T, R>(
    &self,
    accepted_row_decode_contract: AcceptedRowDecodeContract,
    accepted_schema_info: SchemaInfo,
    accepted_schema_fingerprint: CommitSchemaFingerprint,
    op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
    map: impl FnOnce(T) -> R,
) -> Result<R, InternalError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let outcome = self.with_metrics(|| {
        let executor = self.save_executor::<E>(
            accepted_row_decode_contract,
            accepted_schema_info,
            accepted_schema_fingerprint,
        );
        op(executor)
    });
    outcome.map(map)
}
/// Runs a save operation that yields a single entity and returns that
/// entity unchanged.
///
/// # Errors
///
/// Returns whatever `execute_save_with` reports.
fn execute_save_entity<E>(
    &self,
    op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
) -> Result<E, InternalError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_save_with(op, |entity| entity)
}
/// Runs a save operation that yields a list of entities and wraps the list
/// in a `WriteBatchResponse`.
///
/// # Errors
///
/// Returns whatever `execute_save_with` reports.
fn execute_save_batch<E>(
    &self,
    op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
) -> Result<WriteBatchResponse<E>, InternalError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_save_with(op, |saved| WriteBatchResponse::new(saved))
}
/// Starts a fluent load query for `E` using `MissingRowPolicy::Ignore`.
#[must_use]
pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
where
E: EntityKind<Canister = C>,
{
self.fluent_load_query(MissingRowPolicy::Ignore)
}
/// Starts a fluent load query for `E` using the caller-supplied
/// missing-row policy.
#[must_use]
pub const fn load_with_consistency<E>(
&self,
consistency: MissingRowPolicy,
) -> FluentLoadQuery<'_, E>
where
E: EntityKind<Canister = C>,
{
self.fluent_load_query(consistency)
}
/// Starts a fluent delete query for `E` using `MissingRowPolicy::Ignore`.
#[must_use]
pub fn delete<E: PersistedRow<Canister = C>>(&self) -> FluentDeleteQuery<'_, E> {
    let consistency = MissingRowPolicy::Ignore;
    self.fluent_delete_query(consistency)
}
/// Starts a fluent delete query for `E` using the caller-supplied
/// missing-row policy.
#[must_use]
pub fn delete_with_consistency<E: PersistedRow<Canister = C>>(
    &self,
    consistency: MissingRowPolicy,
) -> FluentDeleteQuery<'_, E> {
    self.fluent_delete_query::<E>(consistency)
}
/// Returns the constant `Value::Int64(1)`; no store is touched.
#[must_use]
pub const fn select_one(&self) -> Value {
Value::Int64(1)
}
/// Lists index descriptions for entity `E`, rendered from its generated
/// model (`E::MODEL`) together with the index state of `E`'s store.
#[must_use]
pub fn show_indexes<E: EntityKind<Canister = C>>(&self) -> Vec<String> {
    let store_path = E::Store::PATH;
    let model = E::MODEL;
    self.show_indexes_for_store_model(store_path, model)
}
/// Lists index descriptions for `model` by delegating to the schema-level
/// `show_indexes_for_model`; no store runtime state is consulted here.
#[must_use]
pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
show_indexes_for_model(model)
}
/// Lists index descriptions for entity `E`, rendered from its accepted
/// schema info together with the index state of `E`'s store.
///
/// # Errors
///
/// Returns the error from `accepted_schema_info_for_entity` when the
/// accepted schema cannot be obtained.
pub fn try_show_indexes<E: EntityKind<Canister = C>>(
    &self,
) -> Result<Vec<String>, InternalError> {
    self.accepted_schema_info_for_entity::<E>()
        .map(|schema| self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
}
/// Lists index descriptions for `model`, annotated with the index state of
/// the store at `store_path` when that store can be looked up.
///
/// A failed `try_get_store` lookup is turned into an absent store via
/// `.ok()` rather than reported as an error.
pub(in crate::db) fn show_indexes_for_store_model(
    &self,
    store_path: &str,
    model: &'static EntityModel,
) -> Vec<String> {
    let located_store = self
        .db
        .with_store_registry(|stores| stores.try_get_store(store_path).ok());
    let runtime_state = located_store.map(|handle| handle.index_state());

    show_indexes_for_model_with_runtime_state(model, runtime_state)
}
/// Lists index descriptions for `schema`, annotated with the index state of
/// the store at `store_path` when that store can be looked up.
///
/// A failed `try_get_store` lookup is turned into an absent store via
/// `.ok()` rather than reported as an error.
pub(in crate::db) fn show_indexes_for_store_schema_info(
    &self,
    store_path: &str,
    schema: &SchemaInfo,
) -> Vec<String> {
    let located_store = self
        .db
        .with_store_registry(|stores| stores.try_get_store(store_path).ok());
    let runtime_state = located_store.map(|handle| handle.index_state());

    show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
}
/// Describes the fields of entity `E` from its generated model (`E::MODEL`).
#[must_use]
pub fn show_columns<E: EntityKind<Canister = C>>(&self) -> Vec<EntityFieldDescription> {
    let model = E::MODEL;
    self.show_columns_for_model(model)
}
/// Describes the fields of `model` by delegating to `describe_entity_fields`.
#[must_use]
pub fn show_columns_for_model(
&self,
model: &'static EntityModel,
) -> Vec<EntityFieldDescription> {
describe_entity_fields(model)
}
/// Describes the fields of entity `E` from its accepted schema snapshot.
///
/// # Errors
///
/// Returns the error from `ensure_accepted_schema_snapshot` when the
/// snapshot cannot be obtained.
pub fn try_show_columns<E: EntityKind<Canister = C>>(
    &self,
) -> Result<Vec<EntityFieldDescription>, InternalError> {
    self.ensure_accepted_schema_snapshot::<E>()
        .map(|accepted| describe_entity_fields_with_persisted_schema(&accepted))
}
/// Returns the runtime entity catalog, panicking on failure.
///
/// Use `try_show_entities` to receive the error instead.
///
/// # Panics
///
/// Panics with the message below if `try_show_entities` returns an error.
#[must_use]
pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
self.try_show_entities()
.expect("SHOW ENTITIES metadata requires accepted schema authority")
}
/// Returns the runtime entity catalog reported by the database handle.
///
/// # Errors
///
/// Returns the error from `Db::runtime_entity_catalog`.
pub fn try_show_entities(
&self,
) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
self.db.runtime_entity_catalog()
}
/// Returns the runtime store catalog reported by the database handle.
#[must_use]
pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
self.db.runtime_store_catalog()
}
/// Returns the runtime memory catalog reported by the database handle.
#[must_use]
pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
self.db.runtime_memory_catalog()
}
fn visible_indexes_for_store_accepted_schema(
&self,
store_path: &str,
schema_info: &SchemaInfo,
) -> Result<VisibleIndexes<'static>, QueryError> {
let store = self
.db
.recovered_store(store_path)
.map_err(QueryError::execute)?;
let state = store.index_state();
if state != IndexState::Ready {
return Ok(VisibleIndexes::none());
}
debug_assert_eq!(state, IndexState::Ready);
let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
debug_assert_eq!(
visible_indexes.accepted_expression_index_count(),
Some(visible_indexes.accepted_expression_indexes().len()),
);
Ok(visible_indexes)
}
/// Describes entity `E` from its generated model (`E::MODEL`).
#[must_use]
pub fn describe_entity<E: EntityKind<Canister = C>>(&self) -> EntitySchemaDescription {
    let model = E::MODEL;
    self.describe_entity_model(model)
}
/// Describes `model` by delegating to the schema-level `describe_entity_model`.
#[must_use]
pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
describe_entity_model(model)
}
/// Describes entity `E` from `E::MODEL` combined with its accepted schema
/// snapshot.
///
/// # Errors
///
/// Returns the error from `ensure_accepted_schema_snapshot` when the
/// snapshot cannot be obtained.
pub fn try_describe_entity<E: EntityKind<Canister = C>>(
    &self,
) -> Result<EntitySchemaDescription, InternalError> {
    self.ensure_accepted_schema_snapshot::<E>()
        .map(|accepted| describe_entity_model_with_persisted_schema(E::MODEL, &accepted))
}
/// Obtains the accepted schema snapshot for entity `E`.
///
/// Recovers `E`'s store and, with mutable access to its schema store, calls
/// the schema-level `ensure_accepted_schema_snapshot` for `E`'s tag, path,
/// and model.
///
/// # Errors
///
/// Returns an error if the store cannot be recovered or if the schema-level
/// call fails.
fn ensure_accepted_schema_snapshot<E: EntityKind<Canister = C>>(
    &self,
) -> Result<AcceptedSchemaSnapshot, InternalError> {
    let store_path = E::Store::PATH;
    let store = self.db.recovered_store(store_path)?;

    store.with_schema_mut(|schema_store| {
        ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
    })
}
/// Returns the accepted schema context for entity `E` on the query path,
/// served from the thread-local cache when possible.
///
/// The cache key is `(db.cache_scope_id(), E::PATH)`. On a hit, the cached
/// snapshot and fingerprint are cloned out. On a miss, the snapshot is
/// loaded, its cache fingerprint computed, and a copy is inserted into the
/// cache before the context is returned.
///
/// # Errors
///
/// Returns an error if loading the snapshot or computing its fingerprint
/// fails; nothing is cached in that case.
pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
    &self,
) -> Result<AcceptedSchemaCatalogContext, InternalError>
where
    E: EntityKind<Canister = C>,
{
    let cache_key = (self.db.cache_scope_id(), E::PATH);

    let cached =
        ACCEPTED_SCHEMA_QUERY_CACHES.with(|caches| caches.borrow().get(&cache_key).cloned());
    if let Some(AcceptedSchemaQueryCacheEntry {
        snapshot,
        fingerprint,
    }) = cached
    {
        return Ok(AcceptedSchemaCatalogContext {
            snapshot,
            fingerprint,
        });
    }

    // Cache miss: load, fingerprint, then remember a copy for later queries.
    let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
    let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
    ACCEPTED_SCHEMA_QUERY_CACHES.with(|caches| {
        let entry = AcceptedSchemaQueryCacheEntry {
            snapshot: snapshot.clone(),
            fingerprint,
        };
        caches.borrow_mut().insert(cache_key, entry);
    });

    Ok(AcceptedSchemaCatalogContext {
        snapshot,
        fingerprint,
    })
}
/// Drops the cached accepted schema entry for entity `E` in this session's
/// cache scope, if one exists. A missing entry is not an error.
fn invalidate_accepted_schema_query_cache_for_entity<E: EntityKind<Canister = C>>(&self) {
    let scope_id = self.db.cache_scope_id();
    let cache_key = (scope_id, E::PATH);

    ACCEPTED_SCHEMA_QUERY_CACHES.with(|caches| {
        caches.borrow_mut().remove(&cache_key);
    });
}
/// Loads the accepted schema snapshot for entity `E` on the query path.
///
/// If the schema store has a latest persisted snapshot for `E`'s tag, and
/// that snapshot both converts into an `AcceptedSchemaSnapshot` and passes
/// the generated-compatibility check against `E::MODEL`, it is returned as
/// is. In every other non-error case the schema-level
/// `ensure_accepted_schema_snapshot` is used instead.
///
/// # Errors
///
/// Returns an error if the store cannot be recovered, if reading the
/// persisted snapshot fails, if `AcceptedSchemaSnapshot::try_new` rejects
/// it, or if the fallback call fails. A failed compatibility check is not
/// an error; it only triggers the fallback.
fn load_accepted_schema_snapshot_for_query<E: EntityKind<Canister = C>>(
    &self,
) -> Result<AcceptedSchemaSnapshot, InternalError> {
    let store = self.db.recovered_store(E::Store::PATH)?;

    store.with_schema_mut(|schema_store| {
        if let Some(persisted) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
            let accepted = AcceptedSchemaSnapshot::try_new(persisted)?;
            let generated_compatible =
                AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
                    &accepted,
                    E::MODEL,
                )
                .is_ok();
            if generated_compatible {
                return Ok(accepted);
            }
        }

        ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
    })
}
/// Builds the `SchemaInfo` for entity `E` from `E::MODEL` and its accepted
/// schema snapshot, using the expression-index-aware constructor with its
/// flag set to `true`.
///
/// # Errors
///
/// Returns the error from `ensure_accepted_schema_snapshot` when the
/// snapshot cannot be obtained.
pub(in crate::db) fn accepted_schema_info_for_entity<E: EntityKind<Canister = C>>(
    &self,
) -> Result<SchemaInfo, InternalError> {
    self.ensure_accepted_schema_snapshot::<E>().map(|accepted| {
        SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
            E::MODEL,
            &accepted,
            true,
        )
    })
}
/// Builds the `EntityAuthority` for entity type `E` from the given accepted
/// schema snapshot. This is an associated function; it reads no session state.
///
/// # Errors
///
/// Propagates whatever `EntityAuthority::from_accepted_schema_for_type`
/// reports for the snapshot.
pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
accepted_schema: &AcceptedSchemaSnapshot,
) -> Result<EntityAuthority, InternalError>
where
E: EntityKind<Canister = C>,
{
EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
}
/// Resolves everything a save for entity `E` needs from its accepted schema.
///
/// Obtains the accepted snapshot, derives the row layout contract through
/// the generated-compatibility check against `E::MODEL`, and builds the save
/// contract from it. The returned tuple is the row decode contract, the
/// schema info, and the commit schema fingerprint; the mutation decode
/// contract from the full save contract is discarded.
///
/// # Errors
///
/// Returns an error if the snapshot cannot be obtained, if the
/// compatibility check fails, or if the save contract cannot be built.
fn ensure_generated_compatible_accepted_save_schema<E>(
    &self,
) -> Result<
    (
        AcceptedRowDecodeContract,
        SchemaInfo,
        CommitSchemaFingerprint,
    ),
    InternalError,
>
where
    E: EntityKind<Canister = C>,
{
    let accepted = self.ensure_accepted_schema_snapshot::<E>()?;
    let (row_layout, _) = AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
        &accepted,
        E::MODEL,
    )?;

    accepted_save_contract_for_descriptor::<E>(&accepted, &row_layout).map(
        |(decode_contract, _, info, fingerprint)| (decode_contract, info, fingerprint),
    )
}
/// Returns the storage report produced by the database handle for the given
/// `name_to_path` pairs, which are forwarded unchanged.
///
/// # Errors
///
/// Returns the error from `Db::storage_report`.
pub fn storage_report(
&self,
name_to_path: &[(&'static str, &'static str)],
) -> Result<StorageReport, InternalError> {
self.db.storage_report(name_to_path)
}
/// Returns the database handle's default storage report.
///
/// # Errors
///
/// Returns the error from `Db::storage_report_default`.
pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
self.db.storage_report_default()
}
/// Returns the integrity report produced by the database handle.
///
/// # Errors
///
/// Returns the error from `Db::integrity_report`.
pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
self.db.integrity_report()
}
/// Builds a `LoadExecutor` for `E` from this session's database handle and
/// debug flag.
#[must_use]
pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
where
E: EntityKind<Canister = C> + EntityValue,
{
LoadExecutor::new(self.db, self.debug)
}
/// Builds a `DeleteExecutor` for `E` from this session's database handle.
/// The debug flag is not passed to this executor.
#[must_use]
pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
where
E: PersistedRow<Canister = C> + EntityValue,
{
DeleteExecutor::new(self.db)
}
/// Builds a `SaveExecutor` for `E` from this session's database handle and
/// debug flag plus the caller-supplied accepted contract (row decode
/// contract, schema info, and schema fingerprint), all forwarded unchanged.
#[must_use]
pub(in crate::db) const fn save_executor<E>(
&self,
accepted_row_decode_contract: AcceptedRowDecodeContract,
accepted_schema_info: SchemaInfo,
accepted_schema_fingerprint: CommitSchemaFingerprint,
) -> SaveExecutor<E>
where
E: PersistedRow<Canister = C> + EntityValue,
{
SaveExecutor::new_with_accepted_contract(
self.db,
self.debug,
accepted_row_decode_contract,
accepted_schema_info,
accepted_schema_fingerprint,
)
}
}