pub mod delete;
pub(crate) mod generated;
pub mod load;
mod macros;
#[cfg(all(test, feature = "sql"))]
mod tests;
#[cfg(feature = "sql")]
use crate::db::{
SqlStatementRoute,
sql::{
SqlGroupedRowsOutput, SqlProjectionRows, SqlQueryResult, SqlQueryRowsOutput,
render_value_text,
},
};
use crate::{
db::{
EntityFieldDescription, EntitySchemaDescription, PersistedRow, StorageReport,
query::{MissingRowPolicy, Query, QueryTracePlan},
response::{PagedGroupedResponse, Response, WriteBatchResponse, WriteResponse},
},
error::Error,
metrics::MetricsSink,
model::entity::EntityModel,
traits::{CanisterKind, EntityKind, EntityValue},
value::Value,
};
use icydb_core as core;
pub use delete::SessionDeleteQuery;
pub use load::{FluentLoadQuery, PagedLoadQuery};
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MutationMode {
Insert,
Replace,
Update,
}
impl MutationMode {
const fn into_core(self) -> core::db::MutationMode {
match self {
Self::Insert => core::db::MutationMode::Insert,
Self::Replace => core::db::MutationMode::Replace,
Self::Update => core::db::MutationMode::Update,
}
}
}
#[derive(Default)]
pub struct UpdatePatch {
inner: core::db::UpdatePatch,
}
impl UpdatePatch {
#[must_use]
pub const fn new() -> Self {
Self {
inner: core::db::UpdatePatch::new(),
}
}
pub fn set_field(
mut self,
model: &'static EntityModel,
field_name: &str,
value: Value,
) -> Result<Self, Error> {
self.inner = self.inner.set_field(model, field_name, value)?;
Ok(self)
}
}
#[cfg(feature = "sql")]
pub struct SqlParsedStatement {
inner: core::db::SqlParsedStatement,
}
#[cfg(feature = "sql")]
impl SqlParsedStatement {
#[must_use]
const fn from_core(inner: core::db::SqlParsedStatement) -> Self {
Self { inner }
}
#[must_use]
pub const fn route(&self) -> &SqlStatementRoute {
self.inner.route()
}
}
pub struct DbSession<C: CanisterKind> {
inner: core::db::DbSession<C>,
}
impl<C: CanisterKind> DbSession<C> {
const fn response_from_core<E>(inner: core::db::EntityResponse<E>) -> Response<E>
where
E: EntityKind,
{
Response::from_core(inner)
}
const fn write_response<E>(entity: E) -> WriteResponse<E>
where
E: EntityKind,
{
WriteResponse::new(entity)
}
fn write_batch_response<E>(
inner: icydb_core::db::WriteBatchResponse<E>,
) -> WriteBatchResponse<E>
where
E: EntityKind,
{
WriteBatchResponse::from_core(inner)
}
#[must_use]
pub const fn new(session: core::db::DbSession<C>) -> Self {
Self { inner: session }
}
#[must_use]
pub const fn debug(mut self) -> Self {
self.inner = self.inner.debug();
self
}
#[must_use]
pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
self.inner = self.inner.metrics_sink(sink);
self
}
#[must_use]
pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
FluentLoadQuery {
inner: self.inner.load::<E>(),
}
}
#[must_use]
pub const fn load_with_consistency<E>(
&self,
consistency: MissingRowPolicy,
) -> FluentLoadQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
FluentLoadQuery {
inner: self.inner.load_with_consistency::<E>(consistency),
}
}
#[cfg(feature = "sql")]
pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, Error>
where
E: EntityKind<Canister = C>,
{
Ok(self.inner.query_from_sql::<E>(sql)?)
}
#[cfg(feature = "sql")]
pub fn sql_statement_route(&self, sql: &str) -> Result<SqlStatementRoute, Error> {
let parsed = self.parse_sql_statement(sql)?;
Ok(parsed.route().clone())
}
#[cfg(feature = "sql")]
pub fn parse_sql_statement(&self, sql: &str) -> Result<SqlParsedStatement, Error> {
Ok(SqlParsedStatement::from_core(
self.inner.parse_sql_statement(sql)?,
))
}
#[cfg(feature = "sql")]
pub fn execute_sql<E>(&self, sql: &str) -> Result<Response<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::response_from_core(self.inner.execute_sql::<E>(sql)?))
}
#[cfg(feature = "sql")]
pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlQueryResult, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let parsed = self.parse_sql_statement(sql)?;
self.execute_sql_dispatch_parsed::<E>(&parsed)
}
#[cfg(feature = "sql")]
pub fn execute_sql_dispatch_parsed<E>(
&self,
parsed: &SqlParsedStatement,
) -> Result<SqlQueryResult, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let result = self.inner.execute_sql_dispatch_parsed::<E>(&parsed.inner)?;
Ok(Self::map_sql_dispatch_result(
result,
E::MODEL.name().to_string(),
))
}
#[cfg(feature = "sql")]
pub(crate) fn map_sql_dispatch_result(
result: core::db::SqlDispatchResult,
entity_name: String,
) -> SqlQueryResult {
match result {
core::db::SqlDispatchResult::Projection {
columns,
rows,
row_count,
} => {
let rows = Self::projection_rows_from_values(columns, rows, row_count);
Self::projection_sql_query_result(entity_name, rows)
}
core::db::SqlDispatchResult::ProjectionText {
columns,
rows,
row_count,
} => Self::projection_sql_query_result(
entity_name,
SqlProjectionRows::new(columns, rows, row_count),
),
core::db::SqlDispatchResult::Grouped {
columns,
rows,
row_count,
next_cursor,
} => SqlQueryResult::Grouped(SqlGroupedRowsOutput {
entity: entity_name,
columns,
rows: Self::grouped_rows_from_values(rows),
row_count,
next_cursor,
}),
core::db::SqlDispatchResult::Explain(explain) => SqlQueryResult::Explain {
entity: entity_name,
explain,
},
core::db::SqlDispatchResult::Describe(description) => {
SqlQueryResult::Describe(description)
}
core::db::SqlDispatchResult::ShowIndexes(indexes) => SqlQueryResult::ShowIndexes {
entity: entity_name,
indexes,
},
core::db::SqlDispatchResult::ShowColumns(columns) => SqlQueryResult::ShowColumns {
entity: entity_name,
columns,
},
core::db::SqlDispatchResult::ShowEntities(entities) => {
SqlQueryResult::ShowEntities { entities }
}
}
}
#[cfg(feature = "sql")]
fn projection_sql_query_result(entity_name: String, rows: SqlProjectionRows) -> SqlQueryResult {
SqlQueryResult::Projection(SqlQueryRowsOutput::from_projection(entity_name, rows))
}
#[cfg(feature = "sql")]
fn projection_rows_from_values(
columns: Vec<String>,
rows: Vec<Vec<Value>>,
row_count: u32,
) -> SqlProjectionRows {
let mut rendered_rows = Vec::with_capacity(rows.len());
let mut max_column_count = 0usize;
for row in rows {
let rendered_row = Self::render_sql_value_row(row);
max_column_count = max_column_count.max(rendered_row.len());
rendered_rows.push(rendered_row);
}
let columns = if max_column_count == 0 || columns.len() == max_column_count {
columns
} else {
Self::projection_columns(max_column_count)
};
SqlProjectionRows::new(columns, rendered_rows, row_count)
}
#[cfg(feature = "sql")]
fn grouped_rows_from_values(rows: Vec<core::db::GroupedRow>) -> Vec<Vec<String>> {
let mut rendered_rows = Vec::with_capacity(rows.len());
for row in rows {
let mut rendered_row =
Vec::with_capacity(row.group_key().len() + row.aggregate_values().len());
Self::render_sql_values_into(row.group_key(), &mut rendered_row);
Self::render_sql_values_into(row.aggregate_values(), &mut rendered_row);
rendered_rows.push(rendered_row);
}
rendered_rows
}
#[cfg(feature = "sql")]
fn render_sql_value_row(row: Vec<Value>) -> Vec<String> {
let mut rendered_row = Vec::with_capacity(row.len());
Self::render_sql_values_into(&row, &mut rendered_row);
rendered_row
}
#[cfg(feature = "sql")]
fn render_sql_values_into(values: &[Value], rendered_row: &mut Vec<String>) {
for value in values {
rendered_row.push(render_value_text(value));
}
}
#[cfg(feature = "sql")]
fn projection_columns(column_count: usize) -> Vec<String> {
(0..column_count)
.map(|index| format!("col_{index}"))
.collect()
}
pub(crate) const fn paged_grouped_response(
rows: Vec<core::db::GroupedRow>,
next_cursor: Option<String>,
execution_trace: Option<core::db::ExecutionTrace>,
) -> PagedGroupedResponse {
PagedGroupedResponse::new(rows, next_cursor, execution_trace)
}
#[cfg(feature = "sql")]
pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<crate::value::Value, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(self.inner.execute_sql_aggregate::<E>(sql)?)
}
#[cfg(feature = "sql")]
pub fn execute_sql_grouped<E>(
&self,
sql: &str,
cursor_token: Option<&str>,
) -> Result<PagedGroupedResponse, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let (rows, next_cursor, execution_trace) = self
.inner
.execute_sql_grouped_text_cursor::<E>(sql, cursor_token)?;
Ok(Self::paged_grouped_response(
rows,
next_cursor,
execution_trace,
))
}
#[must_use]
pub fn delete<E>(&self) -> SessionDeleteQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
SessionDeleteQuery {
inner: self.inner.delete::<E>(),
}
}
#[must_use]
pub fn delete_with_consistency<E>(
&self,
consistency: MissingRowPolicy,
) -> SessionDeleteQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
SessionDeleteQuery {
inner: self.inner.delete_with_consistency::<E>(consistency),
}
}
#[must_use]
pub fn show_indexes<E>(&self) -> Vec<String>
where
E: EntityKind<Canister = C>,
{
self.inner.show_indexes::<E>()
}
#[must_use]
pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
where
E: EntityKind<Canister = C>,
{
self.inner.show_columns::<E>()
}
#[must_use]
pub fn show_entities(&self) -> Vec<String> {
self.inner.show_entities()
}
#[must_use]
pub fn describe_entity<E>(&self) -> EntitySchemaDescription
where
E: EntityKind<Canister = C>,
{
self.inner.describe_entity::<E>()
}
pub fn storage_report(
&self,
name_to_path: &[(&'static str, &'static str)],
) -> Result<StorageReport, Error> {
Ok(self.inner.storage_report(name_to_path)?)
}
pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::response_from_core(self.inner.execute_query(query)?))
}
pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, Error>
where
E: EntityKind<Canister = C>,
{
Ok(self.inner.trace_query(query)?)
}
pub fn execute_grouped<E>(
&self,
query: &Query<E>,
cursor_token: Option<&str>,
) -> Result<PagedGroupedResponse, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let (rows, next_cursor, execution_trace) = self
.inner
.execute_grouped_text_cursor(query, cursor_token)?;
Ok(Self::paged_grouped_response(
rows,
next_cursor,
execution_trace,
))
}
pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::write_response(self.inner.insert(entity)?))
}
pub fn insert_many_atomic<E>(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<WriteBatchResponse<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::write_batch_response(
self.inner.insert_many_atomic(entities)?,
))
}
pub fn insert_many_non_atomic<E>(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<WriteBatchResponse<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::write_batch_response(
self.inner.insert_many_non_atomic(entities)?,
))
}
pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::write_response(self.inner.replace(entity)?))
}
pub fn replace_many_atomic<E>(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<WriteBatchResponse<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::write_batch_response(
self.inner.replace_many_atomic(entities)?,
))
}
pub fn replace_many_non_atomic<E>(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<WriteBatchResponse<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::write_batch_response(
self.inner.replace_many_non_atomic(entities)?,
))
}
pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::write_response(self.inner.update(entity)?))
}
pub fn mutate_structural<E>(
&self,
key: E::Key,
patch: UpdatePatch,
mode: MutationMode,
) -> Result<WriteResponse<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::write_response(self.inner.mutate_structural::<E>(
key,
patch.inner,
mode.into_core(),
)?))
}
pub fn update_many_atomic<E>(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<WriteBatchResponse<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::write_batch_response(
self.inner.update_many_atomic(entities)?,
))
}
pub fn update_many_non_atomic<E>(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<WriteBatchResponse<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::write_batch_response(
self.inner.update_many_non_atomic(entities)?,
))
}
}