mod query;
#[cfg(feature = "sql")]
mod sql;
#[cfg(all(test, feature = "sql"))]
mod tests;
mod write;
use crate::{
db::{
Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy, PersistedRow, Query,
QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
commit::EntityRuntimeHooks,
cursor::decode_optional_cursor_token,
executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
schema::{describe_entity_model, show_indexes_for_model},
},
error::InternalError,
metrics::sink::{MetricsSink, with_metrics_sink},
model::entity::EntityModel,
traits::{CanisterKind, EntityKind, EntityValue},
value::Value,
};
use std::thread::LocalKey;
#[cfg(feature = "sql")]
pub use sql::{SqlDispatchResult, SqlParsedStatement, SqlStatementRoute};
/// Decodes an optional cursor token into its optional byte payload.
///
/// Delegates to `decode_optional_cursor_token` and re-expresses any decode
/// failure as a `QueryError` via `QueryError::from_cursor_plan_error`.
fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
    match decode_optional_cursor_token(cursor_token) {
        Ok(cursor_bytes) => Ok(cursor_bytes),
        Err(plan_err) => Err(QueryError::from_cursor_plan_error(plan_err)),
    }
}
/// Session handle over a `Db<C>` for one canister kind `C`.
///
/// Bundles the database handle with a debug flag and an optional metrics
/// sink; the methods on this type build queries, executors, and reports
/// from those three pieces of state.
pub struct DbSession<C: CanisterKind> {
/// Database handle that the session's operations delegate to.
db: Db<C>,
/// Debug flag handed to every executor this session constructs.
debug: bool,
/// Optional metrics sink; when present, `with_metrics` runs work through it.
metrics: Option<&'static dyn MetricsSink>,
}
impl<C: CanisterKind> DbSession<C> {
/// Wraps `db` in a session with the debug flag off and no metrics sink.
#[must_use]
pub(crate) const fn new(db: Db<C>) -> Self {
    Self {
        metrics: None,
        debug: false,
        db,
    }
}
/// Builds a session from a store registry and a set of entity runtime hooks.
///
/// The database handle is created with `Db::new_with_hooks` and then wrapped
/// by `Self::new`, so the resulting session starts with debug disabled and
/// no metrics sink.
#[must_use]
pub const fn new_with_hooks(
    store: &'static LocalKey<StoreRegistry>,
    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
) -> Self {
    let db = Db::new_with_hooks(store, entity_runtime_hooks);
    Self::new(db)
}
#[must_use]
pub const fn debug(mut self) -> Self {
self.debug = true;
self
}
#[must_use]
pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
self.metrics = Some(sink);
self
}
/// Runs `f` and returns its result, routing it through the session's
/// metrics sink when one is configured.
///
/// With a sink, the closure is handed to `with_metrics_sink`; without one
/// it is simply called.
fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
    match self.metrics {
        Some(sink) => with_metrics_sink(sink, f),
        None => f(),
    }
}
/// Runs a save operation on a fresh `SaveExecutor<E>` and maps its success
/// value.
///
/// `op` receives the executor and is invoked inside `with_metrics`; on
/// success its value is passed through `map`. An `InternalError` from `op`
/// is returned as-is and `map` is not called.
fn execute_save_with<E, T, R>(
    &self,
    op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
    map: impl FnOnce(T) -> R,
) -> Result<R, InternalError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let outcome = self.with_metrics(|| op(self.save_executor::<E>()));
    outcome.map(map)
}
/// Runs a single-entity save operation and returns the entity produced by
/// `op` without further transformation.
fn execute_save_entity<E>(
    &self,
    op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
) -> Result<E, InternalError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    // The saved entity is already the desired return value, so map it to itself.
    self.execute_save_with(op, |entity| entity)
}
/// Runs a batch save operation and wraps the entities produced by `op` in a
/// `WriteBatchResponse`.
fn execute_save_batch<E>(
    &self,
    op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
) -> Result<WriteBatchResponse<E>, InternalError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let into_response = WriteBatchResponse::<E>::new;
    self.execute_save_with(op, into_response)
}
/// Starts a fluent load query for entity `E` using
/// `MissingRowPolicy::Ignore`.
///
/// Equivalent to `load_with_consistency(MissingRowPolicy::Ignore)`.
#[must_use]
pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
where
    E: EntityKind<Canister = C>,
{
    self.load_with_consistency::<E>(MissingRowPolicy::Ignore)
}
/// Starts a fluent load query for entity `E` with the caller-chosen
/// missing-row policy.
///
/// The returned builder borrows this session and carries a new `Query`
/// created from `consistency`.
#[must_use]
pub const fn load_with_consistency<E>(
    &self,
    consistency: MissingRowPolicy,
) -> FluentLoadQuery<'_, E>
where
    E: EntityKind<Canister = C>,
{
    let query = Query::new(consistency);
    FluentLoadQuery::new(self, query)
}
/// Starts a fluent delete query for entity `E` using
/// `MissingRowPolicy::Ignore`.
///
/// Equivalent to `delete_with_consistency(MissingRowPolicy::Ignore)`.
#[must_use]
pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
where
    E: PersistedRow<Canister = C>,
{
    self.delete_with_consistency::<E>(MissingRowPolicy::Ignore)
}
/// Starts a fluent delete query for entity `E` with the caller-chosen
/// missing-row policy.
///
/// The returned builder borrows this session and carries a new `Query`
/// created from `consistency` and switched to delete mode via `.delete()`.
#[must_use]
pub fn delete_with_consistency<E>(
    &self,
    consistency: MissingRowPolicy,
) -> FluentDeleteQuery<'_, E>
where
    E: PersistedRow<Canister = C>,
{
    let delete_query = Query::new(consistency).delete();
    FluentDeleteQuery::new(self, delete_query)
}
/// Returns the constant value `Value::Int(1)`.
///
/// NOTE(review): presumably the session-level counterpart of SQL
/// `SELECT 1` — confirm against callers.
#[must_use]
pub const fn select_one(&self) -> Value {
    // Named so the fixed result is explicit at the return site.
    const ONE: Value = Value::Int(1);
    ONE
}
#[must_use]
pub fn show_indexes<E>(&self) -> Vec<String>
where
E: EntityKind<Canister = C>,
{
self.show_indexes_for_model(E::MODEL)
}
/// Returns the index listing for an explicit entity model.
///
/// Forwards `model` to the schema-level `show_indexes_for_model` function
/// and returns its strings unchanged; no session state is consulted.
#[must_use]
pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
// Resolves to the imported free function, not to this method.
show_indexes_for_model(model)
}
#[must_use]
pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
where
E: EntityKind<Canister = C>,
{
self.show_columns_for_model(E::MODEL)
}
/// Returns the field descriptions for an explicit entity model.
///
/// Describes `model` with `describe_entity_model` and copies the resulting
/// field descriptions into an owned vector.
#[must_use]
pub fn show_columns_for_model(
    &self,
    model: &'static EntityModel,
) -> Vec<EntityFieldDescription> {
    let description = describe_entity_model(model);
    description.fields().to_vec()
}
/// Returns the entity names reported by the underlying database handle.
///
/// This is a direct pass-through to `runtime_entity_names` on the wrapped
/// `Db`.
#[must_use]
pub fn show_entities(&self) -> Vec<String> {
self.db.runtime_entity_names()
}
#[must_use]
pub fn describe_entity<E>(&self) -> EntitySchemaDescription
where
E: EntityKind<Canister = C>,
{
self.describe_entity_model(E::MODEL)
}
/// Returns the schema description for an explicit entity model.
///
/// Forwards `model` to the schema-level `describe_entity_model` function
/// and returns its result unchanged; no session state is consulted.
#[must_use]
pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
// Resolves to the imported free function, not to this method.
describe_entity_model(model)
}
/// Builds a storage report by delegating to the underlying database handle.
///
/// `name_to_path` is passed through untouched as a slice of static string
/// pairs.
/// NOTE(review): the pairs presumably map store names to paths, judging by
/// the parameter name — confirm against `Db::storage_report`.
///
/// # Errors
///
/// Returns whatever `InternalError` the database handle's `storage_report`
/// produces.
pub fn storage_report(
&self,
name_to_path: &[(&'static str, &'static str)],
) -> Result<StorageReport, InternalError> {
self.db.storage_report(name_to_path)
}
/// Builds an integrity report by delegating to the underlying database
/// handle.
///
/// # Errors
///
/// Returns whatever `InternalError` the database handle's
/// `integrity_report` produces.
pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
self.db.integrity_report()
}
/// Executes `plan` on the underlying database handle, passing `max_steps`
/// through unchanged.
///
/// The call is made inside `with_metrics`, so it runs under the session's
/// metrics sink when one is configured.
///
/// # Errors
///
/// Returns whatever `InternalError` the database handle's
/// `execute_migration_plan` produces.
pub fn execute_migration_plan(
    &self,
    plan: &MigrationPlan,
    max_steps: usize,
) -> Result<MigrationRunOutcome, InternalError> {
    let run_plan = || self.db.execute_migration_plan(plan, max_steps);
    self.with_metrics(run_plan)
}
#[must_use]
pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
where
E: EntityKind<Canister = C> + EntityValue,
{
LoadExecutor::new(self.db, self.debug)
}
#[must_use]
pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
where
E: PersistedRow<Canister = C> + EntityValue,
{
DeleteExecutor::new(self.db, self.debug)
}
#[must_use]
pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
where
E: PersistedRow<Canister = C> + EntityValue,
{
SaveExecutor::new(self.db, self.debug)
}
}