pub mod delete;
pub(crate) mod generated;
pub mod load;
mod macros;
#[cfg(feature = "sql")]
use crate::db::sql::{SqlProjectionRows, SqlQueryResult, SqlQueryRowsOutput, render_value_text};
use crate::{
db::{
EntityFieldDescription, EntitySchemaDescription, PersistedRow, StorageReport,
query::{MissingRowPolicy, Query, QueryTracePlan},
response::QueryResponse,
},
error::{Error, ErrorKind, ErrorOrigin, RuntimeErrorKind},
metrics::MetricsSink,
model::entity::EntityModel,
traits::{CanisterKind, EntityKind, EntityValue, Path},
value::Value,
};
use icydb_core as core;
pub use delete::SessionDeleteQuery;
pub use load::{FluentLoadQuery, PagedLoadQuery};
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum MutationMode {
Insert,
Replace,
Update,
}
impl MutationMode {
const fn into_core(self) -> core::db::MutationMode {
match self {
Self::Insert => core::db::MutationMode::Insert,
Self::Replace => core::db::MutationMode::Replace,
Self::Update => core::db::MutationMode::Update,
}
}
}
#[derive(Default)]
pub struct UpdatePatch {
inner: core::db::UpdatePatch,
}
impl UpdatePatch {
#[must_use]
pub const fn new() -> Self {
Self {
inner: core::db::UpdatePatch::new(),
}
}
pub fn set_field(
mut self,
model: &'static EntityModel,
field_name: &str,
value: Value,
) -> Result<Self, Error> {
self.inner = self.inner.set_field(model, field_name, value)?;
Ok(self)
}
}
pub struct DbSession<C: CanisterKind> {
inner: core::db::DbSession<C>,
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
#[expect(clippy::missing_const_for_fn)]
fn read_sql_response_decode_local_instruction_counter() -> u64 {
#[cfg(target_arch = "wasm32")]
{
canic_cdk::api::performance_counter(1)
}
#[cfg(not(target_arch = "wasm32"))]
{
0
}
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
fn measure_sql_response_decode_stage<T>(run: impl FnOnce() -> T) -> (u64, T) {
let start = read_sql_response_decode_local_instruction_counter();
let result = run();
let delta = read_sql_response_decode_local_instruction_counter().saturating_sub(start);
(delta, result)
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
const fn finalize_public_sql_query_attribution(
mut attribution: crate::db::SqlQueryExecutionAttribution,
response_decode_local_instructions: u64,
) -> crate::db::SqlQueryExecutionAttribution {
attribution.response_decode_local_instructions = response_decode_local_instructions;
attribution.execute_local_instructions = attribution
.planner_local_instructions
.saturating_add(attribution.store_local_instructions)
.saturating_add(attribution.executor_local_instructions)
.saturating_add(response_decode_local_instructions);
attribution.total_local_instructions = attribution
.compile_local_instructions
.saturating_add(attribution.execute_local_instructions);
attribution
}
impl<C: CanisterKind> DbSession<C> {
fn query_response_from_core<E>(inner: core::db::LoadQueryResult<E>) -> QueryResponse<E>
where
E: EntityKind,
{
QueryResponse::from_core(inner)
}
#[must_use]
pub const fn new(session: core::db::DbSession<C>) -> Self {
Self { inner: session }
}
#[must_use]
pub const fn debug(mut self) -> Self {
self.inner = self.inner.debug();
self
}
#[must_use]
pub fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
self.inner = self.inner.metrics_sink(sink);
self
}
#[must_use]
pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
FluentLoadQuery {
inner: self.inner.load::<E>(),
}
}
#[must_use]
pub const fn load_with_consistency<E>(
&self,
consistency: MissingRowPolicy,
) -> FluentLoadQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
FluentLoadQuery {
inner: self.inner.load_with_consistency::<E>(consistency),
}
}
#[cfg(feature = "diagnostics")]
#[doc(hidden)]
pub fn execute_query_result_with_attribution<E>(
&self,
query: &Query<E>,
) -> Result<(QueryResponse<E>, crate::db::QueryExecutionAttribution), Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let (result, attribution) = self.inner.execute_query_result_with_attribution(query)?;
Ok((Self::query_response_from_core(result), attribution))
}
#[cfg(feature = "sql")]
pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlQueryResult, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(crate::db::sql::sql_query_result_from_statement(
self.inner.execute_sql_query::<E>(sql)?,
E::MODEL.name().to_string(),
))
}
#[cfg(all(feature = "sql", feature = "diagnostics"))]
#[doc(hidden)]
pub fn execute_sql_query_with_attribution<E>(
&self,
sql: &str,
) -> Result<(SqlQueryResult, crate::db::SqlQueryExecutionAttribution), Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let (result, mut attribution) = self.inner.execute_sql_query_with_attribution::<E>(sql)?;
let entity_name = E::MODEL.name().to_string();
let (response_decode_local_instructions, result) =
measure_sql_response_decode_stage(|| {
crate::db::sql::sql_query_result_from_statement(result, entity_name)
});
attribution =
finalize_public_sql_query_attribution(attribution, response_decode_local_instructions);
Ok((result, attribution))
}
#[cfg(feature = "sql")]
pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlQueryResult, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(crate::db::sql::sql_query_result_from_statement(
self.inner.execute_sql_update::<E>(sql)?,
E::MODEL.name().to_string(),
))
}
#[cfg(feature = "sql")]
fn projection_selection<E>(
selected_fields: Option<&[String]>,
) -> Result<(Vec<String>, Vec<usize>), Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
match selected_fields {
None => Ok((
E::MODEL
.fields()
.iter()
.map(|field| field.name().to_string())
.collect(),
(0..E::MODEL.fields().len()).collect(),
)),
Some(fields) => {
let mut indices = Vec::with_capacity(fields.len());
for field in fields {
let index = E::MODEL
.fields()
.iter()
.position(|candidate| candidate.name() == field.as_str())
.ok_or_else(|| {
Error::new(
ErrorKind::Runtime(RuntimeErrorKind::Unsupported),
ErrorOrigin::Query,
format!(
"RETURNING field '{field}' does not exist on the target entity '{}'",
E::PATH
),
)
})?;
indices.push(index);
}
Ok((fields.to_vec(), indices))
}
}
}
#[cfg(feature = "sql")]
pub(crate) fn sql_query_rows_output_from_entities<E>(
entity_name: String,
entities: Vec<E>,
selected_fields: Option<&[String]>,
) -> Result<SqlQueryRowsOutput, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let (columns, indices) = Self::projection_selection::<E>(selected_fields)?;
let mut rows = Vec::with_capacity(entities.len());
for entity in entities {
let mut rendered = Vec::with_capacity(indices.len());
for index in &indices {
let value = entity.get_value_by_index(*index).ok_or_else(|| {
Error::new(
ErrorKind::Runtime(RuntimeErrorKind::Internal),
ErrorOrigin::Query,
format!(
"RETURNING projection row must align with declared columns: entity='{}' index={index}",
E::PATH
),
)
})?;
rendered.push(render_value_text(&value));
}
rows.push(rendered);
}
let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
Ok(SqlQueryRowsOutput::from_projection(
entity_name,
SqlProjectionRows::new(columns, rows, row_count),
))
}
#[must_use]
pub fn delete<E>(&self) -> SessionDeleteQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
SessionDeleteQuery {
inner: self.inner.delete::<E>(),
}
}
#[must_use]
pub fn delete_with_consistency<E>(
&self,
consistency: MissingRowPolicy,
) -> SessionDeleteQuery<'_, E>
where
E: PersistedRow<Canister = C>,
{
SessionDeleteQuery {
inner: self.inner.delete_with_consistency::<E>(consistency),
}
}
#[must_use]
pub fn show_indexes<E>(&self) -> Vec<String>
where
E: EntityKind<Canister = C>,
{
self.inner.show_indexes::<E>()
}
#[must_use]
pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
where
E: EntityKind<Canister = C>,
{
self.inner.show_columns::<E>()
}
#[must_use]
pub fn show_entities(&self) -> Vec<String> {
self.inner.show_entities()
}
#[must_use]
pub fn show_tables(&self) -> Vec<String> {
self.inner.show_tables()
}
#[must_use]
pub fn describe_entity<E>(&self) -> EntitySchemaDescription
where
E: EntityKind<Canister = C>,
{
self.inner.describe_entity::<E>()
}
pub fn storage_report(
&self,
name_to_path: &[(&'static str, &'static str)],
) -> Result<StorageReport, Error> {
Ok(self.inner.storage_report(name_to_path)?)
}
pub fn execute_query<E>(&self, query: &Query<E>) -> Result<QueryResponse<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(Self::query_response_from_core(
self.inner.execute_query_result(query)?,
))
}
pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, Error>
where
E: EntityKind<Canister = C>,
{
Ok(self.inner.trace_query(query)?)
}
pub fn insert<E>(&self, entity: E) -> Result<E, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(self.inner.insert(entity)?)
}
#[cfg(feature = "sql")]
pub fn insert_returning_all<E>(&self, entity: E) -> Result<SqlQueryRowsOutput, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let entity = self.inner.insert(entity)?;
Self::sql_query_rows_output_from_entities::<E>(E::PATH.to_string(), vec![entity], None)
}
#[cfg(feature = "sql")]
pub fn insert_returning<E, I, S>(
&self,
entity: E,
fields: I,
) -> Result<SqlQueryRowsOutput, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let entity = self.inner.insert(entity)?;
let fields = fields
.into_iter()
.map(|field| field.as_ref().to_string())
.collect::<Vec<_>>();
Self::sql_query_rows_output_from_entities::<E>(
E::PATH.to_string(),
vec![entity],
Some(fields.as_slice()),
)
}
pub fn create<I>(&self, input: I) -> Result<I::Entity, Error>
where
I: crate::traits::EntityCreateInput,
I::Entity: PersistedRow<Canister = C> + EntityValue,
{
Ok(self.inner.create(input)?)
}
#[cfg(feature = "sql")]
pub fn create_returning_all<I>(&self, input: I) -> Result<SqlQueryRowsOutput, Error>
where
I: crate::traits::EntityCreateInput,
I::Entity: PersistedRow<Canister = C> + EntityValue,
{
let entity = self.inner.create(input)?;
Self::sql_query_rows_output_from_entities::<I::Entity>(
I::Entity::PATH.to_string(),
vec![entity],
None,
)
}
#[cfg(feature = "sql")]
pub fn create_returning<I, F, S>(
&self,
input: I,
fields: F,
) -> Result<SqlQueryRowsOutput, Error>
where
I: crate::traits::EntityCreateInput,
I::Entity: PersistedRow<Canister = C> + EntityValue,
F: IntoIterator<Item = S>,
S: AsRef<str>,
{
let entity = self.inner.create(input)?;
let fields = fields
.into_iter()
.map(|field| field.as_ref().to_string())
.collect::<Vec<_>>();
Self::sql_query_rows_output_from_entities::<I::Entity>(
I::Entity::PATH.to_string(),
vec![entity],
Some(fields.as_slice()),
)
}
pub fn insert_many_atomic<E>(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(self.inner.insert_many_atomic(entities)?.entities())
}
pub fn insert_many_non_atomic<E>(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(self.inner.insert_many_non_atomic(entities)?.entities())
}
pub fn replace<E>(&self, entity: E) -> Result<E, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(self.inner.replace(entity)?)
}
pub fn replace_many_atomic<E>(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(self.inner.replace_many_atomic(entities)?.entities())
}
pub fn replace_many_non_atomic<E>(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(self.inner.replace_many_non_atomic(entities)?.entities())
}
pub fn update<E>(&self, entity: E) -> Result<E, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(self.inner.update(entity)?)
}
#[cfg(feature = "sql")]
pub fn update_returning_all<E>(&self, entity: E) -> Result<SqlQueryRowsOutput, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
let entity = self.inner.update(entity)?;
Self::sql_query_rows_output_from_entities::<E>(E::PATH.to_string(), vec![entity], None)
}
#[cfg(feature = "sql")]
pub fn update_returning<E, I, S>(
&self,
entity: E,
fields: I,
) -> Result<SqlQueryRowsOutput, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
I: IntoIterator<Item = S>,
S: AsRef<str>,
{
let entity = self.inner.update(entity)?;
let fields = fields
.into_iter()
.map(|field| field.as_ref().to_string())
.collect::<Vec<_>>();
Self::sql_query_rows_output_from_entities::<E>(
E::PATH.to_string(),
vec![entity],
Some(fields.as_slice()),
)
}
pub fn mutate_structural<E>(
&self,
key: E::Key,
patch: UpdatePatch,
mode: MutationMode,
) -> Result<E, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(self
.inner
.mutate_structural::<E>(key, patch.inner, mode.into_core())?)
}
pub fn update_many_atomic<E>(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(self.inner.update_many_atomic(entities)?.entities())
}
pub fn update_many_non_atomic<E>(
&self,
entities: impl IntoIterator<Item = E>,
) -> Result<Vec<E>, Error>
where
E: PersistedRow<Canister = C> + EntityValue,
{
Ok(self.inner.update_many_non_atomic(entities)?.entities())
}
}
#[cfg(all(test, feature = "sql", feature = "diagnostics"))]
mod tests {
use super::finalize_public_sql_query_attribution;
use crate::db::SqlQueryExecutionAttribution;
#[test]
fn public_sql_perf_attribution_total_stays_exhaustive_after_decode_finalize() {
let finalized = finalize_public_sql_query_attribution(
SqlQueryExecutionAttribution {
compile_local_instructions: 11,
compile_cache_key_local_instructions: 0,
compile_cache_lookup_local_instructions: 1,
compile_parse_local_instructions: 2,
compile_parse_tokenize_local_instructions: 1,
compile_parse_select_local_instructions: 1,
compile_parse_expr_local_instructions: 0,
compile_parse_predicate_local_instructions: 0,
compile_aggregate_lane_check_local_instructions: 0,
compile_prepare_local_instructions: 3,
compile_lower_local_instructions: 4,
compile_bind_local_instructions: 1,
compile_cache_insert_local_instructions: 0,
planner_local_instructions: 13,
store_local_instructions: 17,
executor_local_instructions: 17,
pure_covering_decode_local_instructions: 0,
pure_covering_row_assembly_local_instructions: 0,
grouped_stream_local_instructions: 0,
grouped_fold_local_instructions: 0,
grouped_finalize_local_instructions: 0,
grouped_count_borrowed_hash_computations: 0,
grouped_count_bucket_candidate_checks: 0,
grouped_count_existing_group_hits: 0,
grouped_count_new_group_inserts: 0,
grouped_count_row_materialization_local_instructions: 0,
grouped_count_group_lookup_local_instructions: 0,
grouped_count_existing_group_update_local_instructions: 0,
grouped_count_new_group_insert_local_instructions: 0,
store_get_calls: 3,
response_decode_local_instructions: 0,
execute_local_instructions: 47,
total_local_instructions: 58,
sql_compiled_command_cache_hits: 0,
sql_compiled_command_cache_misses: 0,
shared_query_plan_cache_hits: 0,
shared_query_plan_cache_misses: 0,
},
19,
);
assert_eq!(
finalized.execute_local_instructions,
finalized
.planner_local_instructions
.saturating_add(finalized.store_local_instructions)
.saturating_add(finalized.executor_local_instructions)
.saturating_add(finalized.response_decode_local_instructions),
"public SQL execute totals should include planner, store, executor, and decode work",
);
assert_eq!(
finalized.total_local_instructions,
finalized
.compile_local_instructions
.saturating_add(finalized.execute_local_instructions),
"public SQL total instructions should remain exhaustive across compiler, planner, store, executor, and decode",
);
}
}