mod query;
mod response;
#[cfg(feature = "sql")]
mod sql;
#[cfg(all(test, feature = "sql"))]
mod tests;
mod write;
use crate::{
db::{
Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
QueryError, StorageReport, StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
commit::CommitSchemaFingerprint,
executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
query::plan::VisibleIndexes,
schema::{
AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
describe_entity_fields, describe_entity_fields_with_persisted_schema,
describe_entity_model, describe_entity_model_with_persisted_schema,
ensure_accepted_schema_snapshot, show_indexes_for_model,
show_indexes_for_model_with_runtime_state,
show_indexes_for_schema_info_with_runtime_state,
},
},
error::InternalError,
metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
model::entity::EntityModel,
traits::{CanisterKind, EntityKind, EntityValue, Path},
value::Value,
};
use std::{
cell::{OnceCell, RefCell},
collections::HashMap,
thread::LocalKey,
};
#[cfg(feature = "diagnostics")]
pub use query::{
DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
QueryExecutionAttribution,
};
pub(in crate::db) use response::finalize_structural_grouped_projection_result;
pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub use sql::{
SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
};
#[cfg(feature = "sql")]
pub use sql::{
SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport, SqlStatementResult,
SqlStatementSurface, sql_statement_entity_name, sql_statement_surface,
};
#[cfg(all(feature = "sql", feature = "diagnostics"))]
pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
pub struct DbSession<C: CanisterKind> {
db: Db<C>,
debug: bool,
metrics: Option<&'static dyn MetricsSink>,
}
#[cfg(test)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
pub schema_info_projections: u64,
pub persisted_schema_decodes: u64,
pub generated_compatible_row_layout_proofs: u64,
pub latest_by_entity_calls: u64,
pub visible_index_projections: u64,
}
#[derive(Clone, Debug)]
struct AcceptedSchemaQueryCacheEntry {
snapshot: AcceptedSchemaSnapshot,
identity: AcceptedCatalogIdentity,
}
pub(in crate::db) type AcceptedSaveContract = (
AcceptedRowDecodeContract,
AcceptedRowDecodeContract,
SchemaInfo,
CommitSchemaFingerprint,
);
#[derive(Clone, Debug)]
pub(in crate::db) struct AcceptedSchemaCatalogContext {
snapshot: AcceptedSchemaSnapshot,
identity: AcceptedCatalogIdentity,
schema_info: OnceCell<SchemaInfo>,
}
impl AcceptedSchemaCatalogContext {
#[must_use]
pub(in crate::db) const fn new(
snapshot: AcceptedSchemaSnapshot,
identity: AcceptedCatalogIdentity,
) -> Self {
Self {
snapshot,
identity,
schema_info: OnceCell::new(),
}
}
#[must_use]
pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
&self.snapshot
}
#[must_use]
pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
self.identity.accepted_schema_version()
}
#[must_use]
pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
self.identity.accepted_schema_fingerprint()
}
#[must_use]
pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
self.identity.fingerprint_method_version()
}
fn debug_assert_matches_entity<E>(&self)
where
E: EntityKind,
{
debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
debug_assert_eq!(self.identity.entity_path(), E::PATH);
debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
}
pub(in crate::db) fn accepted_entity_authority_for<E>(
&self,
) -> Result<EntityAuthority, InternalError>
where
E: EntityKind,
{
self.debug_assert_matches_entity::<E>();
let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
let (accepted_row_layout, row_proof) =
AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
&self.snapshot,
authority.model(),
)?;
let row_decode_contract = accepted_row_layout.row_decode_contract();
Ok(authority.with_accepted_row_decode_contract(
row_proof,
row_decode_contract,
self.accepted_schema_info_for::<E>(),
))
}
#[must_use]
pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
where
E: EntityKind,
{
self.debug_assert_matches_entity::<E>();
self.schema_info
.get_or_init(|| {
SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
E::MODEL,
&self.snapshot,
true,
)
})
.clone()
}
}
pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
accepted_schema: &AcceptedSchemaSnapshot,
descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
) -> Result<AcceptedSaveContract, InternalError>
where
E: EntityKind,
{
let row_decode_contract = descriptor.row_decode_contract();
let mutation_row_decode_contract = row_decode_contract.clone();
let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
Ok((
row_decode_contract,
mutation_row_decode_contract,
schema_info,
schema_fingerprint,
))
}
thread_local! {
static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
RefCell::new(HashMap::default());
}
impl<C: CanisterKind> DbSession<C> {
#[must_use]
pub(crate) const fn new(db: Db<C>) -> Self {
Self {
db,
debug: false,
metrics: None,
}
}
#[must_use]
pub const fn new_with_hooks(
store: &'static LocalKey<StoreRegistry>,
entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
) -> Self {
Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
}
#[cfg(test)]
pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
query::reset_visible_index_projection_count_for_tests();
}
#[cfg(test)]
pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
-> AcceptedCatalogRuntimeCounterSnapshot {
AcceptedCatalogRuntimeCounterSnapshot {
schema_info_projections:
crate::db::schema::accepted_schema_info_projection_count_for_tests(),
persisted_schema_decodes:
crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
generated_compatible_row_layout_proofs:
crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
latest_by_entity_calls:
crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
visible_index_projections: query::visible_index_projection_count_for_tests(),
}
}
#[must_use]
pub const fn debug(mut self) -> Self {
self.debug = true;
self
}
#[must_use]
pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
self.metrics = Some(sink);
self
}
const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
where
E: EntityKind<Canister = C>,
{
FluentLoadQuery::new(self, Query::new(consistency))
}
fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
FluentDeleteQuery::new(self, Query::new(consistency).delete())
}
fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
if let Some(sink) = self.metrics {
with_metrics_sink(sink, f)
} else {
f()
}
}
fn execute_save_with<E, T, R>(
&self,
op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
map: impl FnOnce(T) -> R,
) -> Result<R, InternalError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let (contract, schema_info, schema_fingerprint) = match self
.with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
{
Ok(authority) => authority,
Err(error) => {
self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
return Err(error);
}
};
let value = self.with_metrics(|| {
op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
})?;
Ok(map(value))
}
fn execute_save_with_checked_accepted_row_contract<E, T, R>(
&self,
accepted_row_decode_contract: AcceptedRowDecodeContract,
accepted_schema_info: SchemaInfo,
accepted_schema_fingerprint: CommitSchemaFingerprint,
op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
map: impl FnOnce(T) -> R,
) -> Result<R, InternalError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let value = self.with_metrics(|| {
op(self.save_executor::<E>(
accepted_row_decode_contract,
accepted_schema_info,
accepted_schema_fingerprint,
))
})?;
Ok(map(value))
}
fn execute_save_entity<E>(
&self,
op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
) -> Result<E, InternalError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
self.execute_save_with(op, std::convert::identity)
}
fn execute_save_batch<E>(
&self,
op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
) -> Result<WriteBatchResponse<E>, InternalError>
where
E: PersistedRow<Canister = C> + EntityValue,
{
self.execute_save_with(op, WriteBatchResponse::new)
}
#[must_use]
pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
where
E: EntityKind<Canister = C>,
{
self.fluent_load_query(MissingRowPolicy::Ignore)
}
#[must_use]
pub const fn load_with_consistency<E>(
&self,
consistency: MissingRowPolicy,
) -> FluentLoadQuery<'_, E>
where
E: EntityKind<Canister = C>,
{
self.fluent_load_query(consistency)
}
#[must_use]
pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
self.fluent_delete_query(MissingRowPolicy::Ignore)
}
#[must_use]
pub fn delete_with_consistency<E>(
&self,
consistency: MissingRowPolicy,
) -> FluentDeleteQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
self.fluent_delete_query(consistency)
}
#[must_use]
pub const fn select_one(&self) -> Value {
Value::Int64(1)
}
#[must_use]
pub fn show_indexes<E>(&self) -> Vec<String>
where
E: EntityKind<Canister = C>,
{
self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
}
#[must_use]
pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
show_indexes_for_model(model)
}
pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
where
E: EntityKind<Canister = C>,
{
let schema = self.accepted_schema_info_for_entity::<E>()?;
Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
}
pub(in crate::db) fn show_indexes_for_store_model(
&self,
store_path: &str,
model: &'static EntityModel,
) -> Vec<String> {
let runtime_state = self
.db
.with_store_registry(|registry| registry.try_get_store(store_path).ok())
.map(|store| store.index_state());
show_indexes_for_model_with_runtime_state(model, runtime_state)
}
pub(in crate::db) fn show_indexes_for_store_schema_info(
&self,
store_path: &str,
schema: &SchemaInfo,
) -> Vec<String> {
let runtime_state = self
.db
.with_store_registry(|registry| registry.try_get_store(store_path).ok())
.map(|store| store.index_state());
show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
}
#[must_use]
pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
where
E: EntityKind<Canister = C>,
{
self.show_columns_for_model(E::MODEL)
}
#[must_use]
pub fn show_columns_for_model(
&self,
model: &'static EntityModel,
) -> Vec<EntityFieldDescription> {
describe_entity_fields(model)
}
pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
where
E: EntityKind<Canister = C>,
{
let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
Ok(describe_entity_fields_with_persisted_schema(&snapshot))
}
#[must_use]
pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
self.try_show_entities()
.expect("SHOW ENTITIES metadata requires accepted schema authority")
}
pub fn try_show_entities(
&self,
) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
self.db.runtime_entity_catalog()
}
#[must_use]
pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
self.db.runtime_store_catalog()
}
#[must_use]
pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
self.db.runtime_memory_catalog()
}
fn visible_indexes_for_store_accepted_schema(
&self,
store_path: &str,
schema_info: &SchemaInfo,
) -> Result<VisibleIndexes<'static>, QueryError> {
let store = self
.db
.recovered_store(store_path)
.map_err(QueryError::execute)?;
let state = store.index_state();
if state != IndexState::Ready {
return Ok(VisibleIndexes::none());
}
debug_assert_eq!(state, IndexState::Ready);
let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
debug_assert_eq!(
visible_indexes.accepted_expression_index_count(),
Some(visible_indexes.accepted_expression_indexes().len()),
);
Ok(visible_indexes)
}
#[must_use]
pub fn describe_entity<E>(&self) -> EntitySchemaDescription
where
E: EntityKind<Canister = C>,
{
self.describe_entity_model(E::MODEL)
}
#[must_use]
pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
describe_entity_model(model)
}
pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
where
E: EntityKind<Canister = C>,
{
let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
Ok(describe_entity_model_with_persisted_schema(
E::MODEL,
&snapshot,
))
}
fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
where
E: EntityKind<Canister = C>,
{
let store = self.db.recovered_store(E::Store::PATH)?;
store.with_schema_mut(|schema_store| {
ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
})
}
pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
&self,
) -> Result<AcceptedSchemaCatalogContext, InternalError>
where
E: EntityKind<Canister = C>,
{
let cache_key = (self.db.cache_scope_id(), E::PATH);
if let Some(entry) =
ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
{
return Ok(AcceptedSchemaCatalogContext::new(
entry.snapshot,
entry.identity,
));
}
let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
let identity = AcceptedCatalogIdentity::new(
E::ENTITY_TAG,
E::PATH,
E::Store::PATH,
snapshot.persisted_snapshot().version(),
fingerprint,
);
ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
cache.borrow_mut().insert(
cache_key,
AcceptedSchemaQueryCacheEntry {
snapshot: snapshot.clone(),
identity,
},
);
});
Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
}
pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
&self,
) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
where
E: EntityKind<Canister = C>,
{
let store = self.db.recovered_store(E::Store::PATH)?;
store.with_schema_mut(|schema_store| {
schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
})
}
pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
&self,
selection: &AcceptedCatalogSnapshotSelection,
) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
where
E: EntityKind<Canister = C>,
{
let snapshot = selection.decode_verified()?;
if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
return Ok(None);
}
let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
let cache_key = (self.db.cache_scope_id(), E::PATH);
ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
cache.borrow_mut().insert(
cache_key,
AcceptedSchemaQueryCacheEntry {
snapshot,
identity: selection.identity(),
},
);
});
Ok(Some(context))
}
fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
where
E: EntityKind<Canister = C>,
{
let cache_key = (self.db.cache_scope_id(), E::PATH);
ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
cache.borrow_mut().remove(&cache_key);
});
}
fn load_accepted_schema_snapshot_for_query<E>(
&self,
) -> Result<AcceptedSchemaSnapshot, InternalError>
where
E: EntityKind<Canister = C>,
{
let store = self.db.recovered_store(E::Store::PATH)?;
store.with_schema_mut(|schema_store| {
if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
&accepted,
E::MODEL,
)
.is_ok()
{
return Ok(accepted);
}
}
ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
})
}
pub(in crate::db) fn accepted_schema_info_for_entity<E>(
&self,
) -> Result<SchemaInfo, InternalError>
where
E: EntityKind<Canister = C>,
{
let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
Ok(catalog.accepted_schema_info_for::<E>())
}
pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
accepted_schema: &AcceptedSchemaSnapshot,
) -> Result<EntityAuthority, InternalError>
where
E: EntityKind<Canister = C>,
{
EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
}
fn ensure_generated_compatible_accepted_save_schema<E>(
&self,
) -> Result<
(
AcceptedRowDecodeContract,
SchemaInfo,
CommitSchemaFingerprint,
),
InternalError,
>
where
E: EntityKind<Canister = C>,
{
let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
let (accepted_row_layout, _) =
AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
&accepted_schema,
E::MODEL,
)?;
let (row_decode_contract, _, schema_info, schema_fingerprint) =
accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
Ok((row_decode_contract, schema_info, schema_fingerprint))
}
pub fn storage_report(
&self,
name_to_path: &[(&'static str, &'static str)],
) -> Result<StorageReport, InternalError> {
self.db.storage_report(name_to_path)
}
pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
self.db.storage_report_default()
}
pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
self.db.integrity_report()
}
#[must_use]
pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
where
E: EntityKind<Canister = C> + EntityValue,
{
LoadExecutor::new(self.db, self.debug)
}
#[must_use]
pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
where
E: PersistedRow<Canister = C> + EntityValue,
{
DeleteExecutor::new(self.db)
}
#[must_use]
pub(in crate::db) const fn save_executor<E>(
&self,
accepted_row_decode_contract: AcceptedRowDecodeContract,
accepted_schema_info: SchemaInfo,
accepted_schema_fingerprint: CommitSchemaFingerprint,
) -> SaveExecutor<E>
where
E: PersistedRow<Canister = C> + EntityValue,
{
SaveExecutor::new_with_accepted_contract(
self.db,
self.debug,
accepted_row_decode_contract,
accepted_schema_info,
accepted_schema_fingerprint,
)
}
}