use crate::{
db::{
DbSession, EntityResponse, PersistedRow, Query, QueryError,
executor::{
ScalarNumericFieldBoundaryRequest, ScalarProjectionBoundaryOutput,
ScalarProjectionBoundaryRequest, ScalarTerminalBoundaryOutput,
ScalarTerminalBoundaryRequest,
},
query::builder::{
AvgBySlotTerminal, AvgDistinctBySlotTerminal, CountDistinctBySlotTerminal,
CountRowsTerminal, DistinctValuesBySlotTerminal, ExistsRowsTerminal, FirstIdTerminal,
FirstValueBySlotTerminal, LastIdTerminal, LastValueBySlotTerminal, MaxIdBySlotTerminal,
MaxIdTerminal, MedianIdBySlotTerminal, MinIdBySlotTerminal, MinIdTerminal,
MinMaxIdBySlotTerminal, NthIdBySlotTerminal, SumBySlotTerminal,
SumDistinctBySlotTerminal, ValuesBySlotTerminal, ValuesBySlotWithIdsTerminal,
},
query::plan::FieldSlot,
},
traits::{CanisterKind, EntityValue},
types::{Decimal, Id},
value::Value,
};
/// Optional pair of entity ids for entity `E`; used as the success type of the
/// id-pair terminal helpers on `DbSession`.
type FluentIdPair<E> = Option<(Id<E>, Id<E>)>;
impl<C: CanisterKind> DbSession<C> {
/// Runs one scalar terminal request against the plan that `execute_with_plan`
/// supplies for `query`.
///
/// The request is moved into the closure and forwarded, together with the plan,
/// to `execute_scalar_terminal_request`. The undecoded
/// `ScalarTerminalBoundaryOutput` is handed back so each caller can extract the
/// shape it expects.
fn execute_scalar_terminal_boundary<E>(
    &self,
    query: &Query<E>,
    request: ScalarTerminalBoundaryRequest,
) -> Result<ScalarTerminalBoundaryOutput, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_with_plan(query, move |executor, prepared_plan| {
        executor.execute_scalar_terminal_request(prepared_plan, request)
    })
}
/// Runs one scalar projection request for `target_field` against the plan that
/// `execute_with_plan` supplies for `query`.
///
/// Both the slot and the request are moved into the closure and passed through
/// unchanged; the undecoded `ScalarProjectionBoundaryOutput` is returned.
fn execute_scalar_projection_boundary<E>(
    &self,
    query: &Query<E>,
    target_field: FieldSlot,
    request: ScalarProjectionBoundaryRequest,
) -> Result<ScalarProjectionBoundaryOutput, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_with_plan(query, move |executor, prepared_plan| {
        executor.execute_scalar_projection_boundary(prepared_plan, target_field, request)
    })
}
/// Executes a `CountRowsTerminal` strategy and returns the resulting count.
///
/// # Errors
///
/// Propagates any error from the scalar terminal boundary. A failure to
/// decode the output as a count is wrapped with `QueryError::execute`.
pub(in crate::db) fn execute_fluent_count_rows_terminal<E>(
    &self,
    query: &Query<E>,
    strategy: CountRowsTerminal,
) -> Result<u32, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_scalar_terminal_boundary(query, strategy.into_executor_request())?
        .into_count()
        .map_err(QueryError::execute)
}
/// Executes an `ExistsRowsTerminal` strategy and returns the resulting flag.
///
/// # Errors
///
/// Propagates any error from the scalar terminal boundary. A failure to
/// decode the output as an exists-flag is wrapped with `QueryError::execute`.
pub(in crate::db) fn execute_fluent_exists_rows_terminal<E>(
    &self,
    query: &Query<E>,
    strategy: ExistsRowsTerminal,
) -> Result<bool, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_scalar_terminal_boundary(query, strategy.into_executor_request())?
        .into_exists()
        .map_err(QueryError::execute)
}
/// Shared helper for terminals whose result is a single optional id.
///
/// Runs `request` through the scalar terminal boundary, then decodes the
/// output with `into_id::<E>()`. Decode failures are wrapped with
/// `QueryError::execute`; boundary errors are propagated as-is.
fn execute_scalar_id_terminal_boundary<E>(
    &self,
    query: &Query<E>,
    request: ScalarTerminalBoundaryRequest,
) -> Result<Option<Id<E>>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let boundary_output = self.execute_scalar_terminal_boundary(query, request)?;

    boundary_output.into_id::<E>().map_err(QueryError::execute)
}
/// Executes a `MinIdTerminal` strategy and returns the resulting optional id.
///
/// The strategy is converted into an executor request and handed to the
/// shared single-id terminal helper.
pub(in crate::db) fn execute_fluent_min_id_terminal<E>(
    &self,
    query: &Query<E>,
    strategy: MinIdTerminal,
) -> Result<Option<Id<E>>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let terminal_request = strategy.into_executor_request();
    self.execute_scalar_id_terminal_boundary(query, terminal_request)
}
/// Executes a `MaxIdTerminal` strategy and returns the resulting optional id.
///
/// The strategy is converted into an executor request and handed to the
/// shared single-id terminal helper.
pub(in crate::db) fn execute_fluent_max_id_terminal<E>(
    &self,
    query: &Query<E>,
    strategy: MaxIdTerminal,
) -> Result<Option<Id<E>>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let terminal_request = strategy.into_executor_request();
    self.execute_scalar_id_terminal_boundary(query, terminal_request)
}
/// Executes a `MinIdBySlotTerminal` strategy and returns the resulting
/// optional id.
///
/// The strategy is converted into an executor request and handed to the
/// shared single-id terminal helper.
pub(in crate::db) fn execute_fluent_min_id_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: MinIdBySlotTerminal,
) -> Result<Option<Id<E>>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let terminal_request = strategy.into_executor_request();
    self.execute_scalar_id_terminal_boundary(query, terminal_request)
}
/// Executes a `MaxIdBySlotTerminal` strategy and returns the resulting
/// optional id.
///
/// The strategy is converted into an executor request and handed to the
/// shared single-id terminal helper.
pub(in crate::db) fn execute_fluent_max_id_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: MaxIdBySlotTerminal,
) -> Result<Option<Id<E>>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let terminal_request = strategy.into_executor_request();
    self.execute_scalar_id_terminal_boundary(query, terminal_request)
}
/// Shared helper for terminals whose result is an optional pair of ids.
///
/// Runs `request` through the scalar terminal boundary, then decodes the
/// output with `into_id_pair::<E>()`. Decode failures are wrapped with
/// `QueryError::execute`; boundary errors are propagated as-is.
fn execute_scalar_id_pair_terminal_boundary<E>(
    &self,
    query: &Query<E>,
    request: ScalarTerminalBoundaryRequest,
) -> Result<FluentIdPair<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let boundary_output = self.execute_scalar_terminal_boundary(query, request)?;

    boundary_output
        .into_id_pair::<E>()
        .map_err(QueryError::execute)
}
/// Executes a `FirstIdTerminal` strategy and returns the resulting optional id.
///
/// The strategy is converted into an executor request and handed to the
/// shared single-id terminal helper.
pub(in crate::db) fn execute_fluent_first_id_terminal<E>(
    &self,
    query: &Query<E>,
    strategy: FirstIdTerminal,
) -> Result<Option<Id<E>>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let terminal_request = strategy.into_executor_request();
    self.execute_scalar_id_terminal_boundary(query, terminal_request)
}
/// Executes a `LastIdTerminal` strategy and returns the resulting optional id.
///
/// The strategy is converted into an executor request and handed to the
/// shared single-id terminal helper.
pub(in crate::db) fn execute_fluent_last_id_terminal<E>(
    &self,
    query: &Query<E>,
    strategy: LastIdTerminal,
) -> Result<Option<Id<E>>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let terminal_request = strategy.into_executor_request();
    self.execute_scalar_id_terminal_boundary(query, terminal_request)
}
/// Executes an `NthIdBySlotTerminal` strategy and returns the resulting
/// optional id.
///
/// The strategy is converted into an executor request and handed to the
/// shared single-id terminal helper.
pub(in crate::db) fn execute_fluent_nth_id_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: NthIdBySlotTerminal,
) -> Result<Option<Id<E>>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let terminal_request = strategy.into_executor_request();
    self.execute_scalar_id_terminal_boundary(query, terminal_request)
}
/// Executes a `MedianIdBySlotTerminal` strategy and returns the resulting
/// optional id.
///
/// The strategy is converted into an executor request and handed to the
/// shared single-id terminal helper.
pub(in crate::db) fn execute_fluent_median_id_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: MedianIdBySlotTerminal,
) -> Result<Option<Id<E>>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let terminal_request = strategy.into_executor_request();
    self.execute_scalar_id_terminal_boundary(query, terminal_request)
}
/// Executes a `MinMaxIdBySlotTerminal` strategy and returns the resulting
/// optional id pair.
///
/// The strategy is converted into an executor request and handed to the
/// shared id-pair terminal helper.
pub(in crate::db) fn execute_fluent_min_max_id_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: MinMaxIdBySlotTerminal,
) -> Result<FluentIdPair<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let terminal_request = strategy.into_executor_request();
    self.execute_scalar_id_pair_terminal_boundary(query, terminal_request)
}
/// Runs one numeric field request for `target_field` against the plan that
/// `execute_with_plan` supplies for `query`.
///
/// The slot and request are moved into the closure and forwarded unchanged;
/// the optional `Decimal` produced by the callee is returned directly.
fn execute_numeric_field_boundary<E>(
    &self,
    query: &Query<E>,
    target_field: FieldSlot,
    request: ScalarNumericFieldBoundaryRequest,
) -> Result<Option<Decimal>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_with_plan(query, move |executor, prepared_plan| {
        executor.execute_numeric_field_boundary(prepared_plan, target_field, request)
    })
}
/// Executes a `SumBySlotTerminal` strategy and returns the optional `Decimal`
/// result.
///
/// The strategy is split into its target slot and numeric request, which are
/// then passed to the shared numeric field helper.
pub(in crate::db) fn execute_fluent_sum_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: SumBySlotTerminal,
) -> Result<Option<Decimal>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (slot, numeric_request) = strategy.into_executor_request();

    self.execute_numeric_field_boundary(query, slot, numeric_request)
}
/// Executes a `SumDistinctBySlotTerminal` strategy and returns the optional
/// `Decimal` result.
///
/// The strategy is split into its target slot and numeric request, which are
/// then passed to the shared numeric field helper.
pub(in crate::db) fn execute_fluent_sum_distinct_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: SumDistinctBySlotTerminal,
) -> Result<Option<Decimal>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (slot, numeric_request) = strategy.into_executor_request();

    self.execute_numeric_field_boundary(query, slot, numeric_request)
}
/// Executes an `AvgBySlotTerminal` strategy and returns the optional `Decimal`
/// result.
///
/// The strategy is split into its target slot and numeric request, which are
/// then passed to the shared numeric field helper.
pub(in crate::db) fn execute_fluent_avg_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: AvgBySlotTerminal,
) -> Result<Option<Decimal>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (slot, numeric_request) = strategy.into_executor_request();

    self.execute_numeric_field_boundary(query, slot, numeric_request)
}
/// Executes an `AvgDistinctBySlotTerminal` strategy and returns the optional
/// `Decimal` result.
///
/// The strategy is split into its target slot and numeric request, which are
/// then passed to the shared numeric field helper.
pub(in crate::db) fn execute_fluent_avg_distinct_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: AvgDistinctBySlotTerminal,
) -> Result<Option<Decimal>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (slot, numeric_request) = strategy.into_executor_request();

    self.execute_numeric_field_boundary(query, slot, numeric_request)
}
/// Executes a `ValuesBySlotTerminal` strategy and returns the projected values.
///
/// # Errors
///
/// Propagates any error from the scalar projection boundary. A failure to
/// decode the output as a value list is wrapped with `QueryError::execute`.
pub(in crate::db) fn execute_fluent_values_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: ValuesBySlotTerminal,
) -> Result<Vec<Value>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (slot, projection_request) = strategy.into_executor_request();

    self.execute_scalar_projection_boundary(query, slot, projection_request)?
        .into_values()
        .map_err(QueryError::execute)
}
/// Executes a `DistinctValuesBySlotTerminal` strategy and returns the
/// projected values.
///
/// # Errors
///
/// Propagates any error from the scalar projection boundary. A failure to
/// decode the output as a value list is wrapped with `QueryError::execute`.
pub(in crate::db) fn execute_fluent_distinct_values_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: DistinctValuesBySlotTerminal,
) -> Result<Vec<Value>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (slot, projection_request) = strategy.into_executor_request();

    self.execute_scalar_projection_boundary(query, slot, projection_request)?
        .into_values()
        .map_err(QueryError::execute)
}
/// Executes a `CountDistinctBySlotTerminal` strategy and returns the
/// resulting count.
///
/// # Errors
///
/// Propagates any error from the scalar projection boundary. A failure to
/// decode the output as a count is wrapped with `QueryError::execute`.
pub(in crate::db) fn execute_fluent_count_distinct_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: CountDistinctBySlotTerminal,
) -> Result<u32, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (slot, projection_request) = strategy.into_executor_request();

    self.execute_scalar_projection_boundary(query, slot, projection_request)?
        .into_count()
        .map_err(QueryError::execute)
}
/// Executes a `ValuesBySlotWithIdsTerminal` strategy and returns
/// `(id, value)` pairs.
///
/// # Errors
///
/// Propagates any error from the scalar projection boundary. A failure to
/// decode the output via `into_values_with_ids::<E>()` is wrapped with
/// `QueryError::execute`.
pub(in crate::db) fn execute_fluent_values_by_with_ids_slot<E>(
    &self,
    query: &Query<E>,
    strategy: ValuesBySlotWithIdsTerminal,
) -> Result<Vec<(Id<E>, Value)>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (slot, projection_request) = strategy.into_executor_request();

    self.execute_scalar_projection_boundary(query, slot, projection_request)?
        .into_values_with_ids::<E>()
        .map_err(QueryError::execute)
}
/// Executes a `FirstValueBySlotTerminal` strategy and returns the optional
/// terminal value.
///
/// # Errors
///
/// Propagates any error from the scalar projection boundary. A failure to
/// decode the output via `into_terminal_value()` is wrapped with
/// `QueryError::execute`.
pub(in crate::db) fn execute_fluent_first_value_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: FirstValueBySlotTerminal,
) -> Result<Option<Value>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (slot, projection_request) = strategy.into_executor_request();

    self.execute_scalar_projection_boundary(query, slot, projection_request)?
        .into_terminal_value()
        .map_err(QueryError::execute)
}
/// Executes a `LastValueBySlotTerminal` strategy and returns the optional
/// terminal value.
///
/// # Errors
///
/// Propagates any error from the scalar projection boundary. A failure to
/// decode the output via `into_terminal_value()` is wrapped with
/// `QueryError::execute`.
pub(in crate::db) fn execute_fluent_last_value_by_slot<E>(
    &self,
    query: &Query<E>,
    strategy: LastValueBySlotTerminal,
) -> Result<Option<Value>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    let (slot, projection_request) = strategy.into_executor_request();

    self.execute_scalar_projection_boundary(query, slot, projection_request)?
        .into_terminal_value()
        .map_err(QueryError::execute)
}
/// Runs the `bytes` terminal for `query` and returns its `u64` result.
///
/// Delegates to `execute_with_plan`, calling `bytes` with the supplied plan.
pub(in crate::db) fn execute_fluent_bytes<E>(&self, query: &Query<E>) -> Result<u64, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_with_plan(query, |executor, prepared_plan| {
        executor.bytes(prepared_plan)
    })
}
/// Runs the `bytes_by_slot` terminal for `target_slot` and returns its `u64`
/// result.
///
/// Delegates to `execute_with_plan`; the slot is moved into the closure and
/// passed on together with the supplied plan.
pub(in crate::db) fn execute_fluent_bytes_by_slot<E>(
    &self,
    query: &Query<E>,
    target_slot: FieldSlot,
) -> Result<u64, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_with_plan(query, move |executor, prepared_plan| {
        executor.bytes_by_slot(prepared_plan, target_slot)
    })
}
/// Runs the `take` terminal with `take_count` and returns the resulting
/// `EntityResponse`.
///
/// Delegates to `execute_with_plan`; the count is moved into the closure and
/// passed on together with the supplied plan.
pub(in crate::db) fn execute_fluent_take<E>(
    &self,
    query: &Query<E>,
    take_count: u32,
) -> Result<EntityResponse<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_with_plan(query, move |executor, prepared_plan| {
        executor.take(prepared_plan, take_count)
    })
}
/// Runs the `top_k_by_slot` terminal for `target_slot` with `take_count` and
/// returns the resulting `EntityResponse`.
///
/// Delegates to `execute_with_plan`; both arguments are moved into the
/// closure and passed on together with the supplied plan.
pub(in crate::db) fn execute_fluent_top_k_rows_by_slot<E>(
    &self,
    query: &Query<E>,
    target_slot: FieldSlot,
    take_count: u32,
) -> Result<EntityResponse<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_with_plan(query, move |executor, prepared_plan| {
        executor.top_k_by_slot(prepared_plan, target_slot, take_count)
    })
}
/// Runs the `bottom_k_by_slot` terminal for `target_slot` with `take_count`
/// and returns the resulting `EntityResponse`.
///
/// Delegates to `execute_with_plan`; both arguments are moved into the
/// closure and passed on together with the supplied plan.
pub(in crate::db) fn execute_fluent_bottom_k_rows_by_slot<E>(
    &self,
    query: &Query<E>,
    target_slot: FieldSlot,
    take_count: u32,
) -> Result<EntityResponse<E>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_with_plan(query, move |executor, prepared_plan| {
        executor.bottom_k_by_slot(prepared_plan, target_slot, take_count)
    })
}
/// Runs the `top_k_by_values_slot` terminal for `target_slot` with
/// `take_count` and returns the resulting values.
///
/// Delegates to `execute_with_plan`; both arguments are moved into the
/// closure and passed on together with the supplied plan.
pub(in crate::db) fn execute_fluent_top_k_values_by_slot<E>(
    &self,
    query: &Query<E>,
    target_slot: FieldSlot,
    take_count: u32,
) -> Result<Vec<Value>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_with_plan(query, move |executor, prepared_plan| {
        executor.top_k_by_values_slot(prepared_plan, target_slot, take_count)
    })
}
/// Runs the `bottom_k_by_values_slot` terminal for `target_slot` with
/// `take_count` and returns the resulting values.
///
/// Delegates to `execute_with_plan`; both arguments are moved into the
/// closure and passed on together with the supplied plan.
pub(in crate::db) fn execute_fluent_bottom_k_values_by_slot<E>(
    &self,
    query: &Query<E>,
    target_slot: FieldSlot,
    take_count: u32,
) -> Result<Vec<Value>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_with_plan(query, move |executor, prepared_plan| {
        executor.bottom_k_by_values_slot(prepared_plan, target_slot, take_count)
    })
}
/// Runs the `top_k_by_with_ids_slot` terminal for `target_slot` with
/// `take_count` and returns the resulting `(id, value)` pairs.
///
/// Delegates to `execute_with_plan`; both arguments are moved into the
/// closure and passed on together with the supplied plan.
pub(in crate::db) fn execute_fluent_top_k_values_with_ids_by_slot<E>(
    &self,
    query: &Query<E>,
    target_slot: FieldSlot,
    take_count: u32,
) -> Result<Vec<(Id<E>, Value)>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_with_plan(query, move |executor, prepared_plan| {
        executor.top_k_by_with_ids_slot(prepared_plan, target_slot, take_count)
    })
}
/// Runs the `bottom_k_by_with_ids_slot` terminal for `target_slot` with
/// `take_count` and returns the resulting `(id, value)` pairs.
///
/// Delegates to `execute_with_plan`; both arguments are moved into the
/// closure and passed on together with the supplied plan.
pub(in crate::db) fn execute_fluent_bottom_k_values_with_ids_by_slot<E>(
    &self,
    query: &Query<E>,
    target_slot: FieldSlot,
    take_count: u32,
) -> Result<Vec<(Id<E>, Value)>, QueryError>
where
    E: PersistedRow<Canister = C> + EntityValue,
{
    self.execute_with_plan(query, move |executor, prepared_plan| {
        executor.bottom_k_by_with_ids_slot(prepared_plan, target_slot, take_count)
    })
}
}