use crate::{
db::{
GroupedRow,
cursor::GroupedPlannedCursor,
executor::{
PreparedAggregatePlan,
aggregate::AggregateKind,
pipeline::{
contracts::{GroupedCursorPage, GroupedRouteStage, LoadExecutor},
entrypoints::execute_prepared_grouped_route_runtime,
grouped_runtime::resolve_grouped_route_for_plan,
},
},
query::plan::{GroupedExecutionConfig, global_distinct_group_spec_for_semantic_aggregate},
},
error::InternalError,
traits::{EntityKind, EntityValue},
value::Value,
};
/// Hard cap on the number of groups for a global DISTINCT aggregate routed
/// through the grouped runtime; the output contract accepts at most one row.
const GLOBAL_DISTINCT_GROUPED_MAX_GROUPS: u64 = 1;
/// Hard cap on grouped-state bytes for a global DISTINCT aggregate (16 MiB).
const GLOBAL_DISTINCT_GROUPED_MAX_GROUP_BYTES: u64 = 16 << 20;
/// Validation rules applied to the page produced when a global DISTINCT
/// aggregate is executed through the grouped runtime.
///
/// An accepted page has no continuation cursor and holds zero or one row.
/// Any row present must have an empty group key and exactly one aggregate
/// value. Every violation is reported as a query-executor invariant error.
struct GlobalDistinctGroupedOutputContract;

impl GlobalDistinctGroupedOutputContract {
    /// Invariant error: the page carried a continuation cursor.
    fn continuation_cursor_forbidden() -> InternalError {
        InternalError::query_executor_invariant(
            "global DISTINCT grouped aggregate must not emit continuation cursor",
        )
    }

    /// Invariant error: the page held `found` rows (more than one).
    fn grouped_row_count_invalid(found: usize) -> InternalError {
        let message = format!(
            "global DISTINCT grouped aggregate must emit at most one grouped row, found {found}"
        );
        InternalError::query_executor_invariant(message)
    }

    /// Invariant error: the single row had a non-empty group key.
    fn grouped_key_must_be_empty() -> InternalError {
        InternalError::query_executor_invariant(
            "global DISTINCT grouped aggregate row must have empty grouped key",
        )
    }

    /// Invariant error: the single row carried `found` aggregate values
    /// instead of exactly one.
    fn aggregate_value_count_invalid(found: usize) -> InternalError {
        let message = format!(
            "global DISTINCT grouped aggregate row must have one aggregate value, found {found}"
        );
        InternalError::query_executor_invariant(message)
    }

    /// Decodes a grouped page into the scalar aggregate result.
    ///
    /// Returns `Ok(None)` for an empty page. Otherwise returns the result of
    /// [`Self::decode_row`] on the page's only row.
    ///
    /// # Errors
    ///
    /// Fails if the page has a continuation cursor, which is checked first. It
    /// also fails if the page has more than one row, or if the single row
    /// fails [`Self::decode_row`].
    fn decode_page(page: GroupedCursorPage) -> Result<Option<Value>, InternalError> {
        if page.next_cursor.is_some() {
            return Err(Self::continuation_cursor_forbidden());
        }

        let rows: &[GroupedRow] = &page.rows;
        match rows {
            [] => Ok(None),
            [only_row] => Self::decode_row(only_row),
            too_many => Err(Self::grouped_row_count_invalid(too_many.len())),
        }
    }

    /// Extracts the aggregate value from the page's single grouped row.
    ///
    /// # Errors
    ///
    /// Fails if the row's group key is non-empty, which is checked first. It
    /// also fails if the row does not carry exactly one aggregate value.
    fn decode_row(row: &GroupedRow) -> Result<Option<Value>, InternalError> {
        if !row.group_key().is_empty() {
            return Err(Self::grouped_key_must_be_empty());
        }

        let values = row.aggregate_values();
        match values.len() {
            1 => Ok(values.first().cloned()),
            count => Err(Self::aggregate_value_count_invalid(count)),
        }
    }
}
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    /// Resolves a grouped route for a global DISTINCT aggregate of `kind`
    /// over `target_field`.
    ///
    /// The grouped shape comes from
    /// `global_distinct_group_spec_for_semantic_aggregate`. Its execution
    /// config is hard-limited to `GLOBAL_DISTINCT_GROUPED_MAX_GROUPS` groups
    /// and `GLOBAL_DISTINCT_GROUPED_MAX_GROUP_BYTES` bytes. The route is
    /// resolved with `GroupedPlannedCursor::none()` and this executor's
    /// `debug` flag.
    ///
    /// # Errors
    ///
    /// If the group spec cannot be built, the failure reason is converted via
    /// `into_global_distinct_prepare_internal_error(kind)`. Errors from
    /// `resolve_grouped_route_for_plan` are returned unchanged.
    pub(in crate::db::executor::aggregate) fn prepare_global_distinct_grouped_route(
        &self,
        plan: PreparedAggregatePlan,
        kind: AggregateKind,
        target_field: &str,
    ) -> Result<GroupedRouteStage, InternalError> {
        let hard_limited_config = GroupedExecutionConfig::with_hard_limits(
            GLOBAL_DISTINCT_GROUPED_MAX_GROUPS,
            GLOBAL_DISTINCT_GROUPED_MAX_GROUP_BYTES,
        );
        let grouped_shape =
            global_distinct_group_spec_for_semantic_aggregate(kind, target_field, hard_limited_config)
                .map_err(|reason| reason.into_global_distinct_prepare_internal_error(kind))?;

        let grouped_plan = plan.into_grouped_load_plan(grouped_shape);
        resolve_grouped_route_for_plan(grouped_plan, GroupedPlannedCursor::none(), self.debug)
    }

    /// Runs a route from `prepare_global_distinct_grouped_route` and decodes
    /// the scalar result.
    ///
    /// The runtime is prepared with `None` for both optional arguments of
    /// `prepare_grouped_route_runtime`. The second element returned by
    /// `execute_prepared_grouped_route_runtime` is discarded, and the page is
    /// decoded by `GlobalDistinctGroupedOutputContract::decode_page`.
    ///
    /// # Errors
    ///
    /// Returns errors from runtime preparation and execution unchanged. Also
    /// returns any output-contract violation found while decoding the page.
    pub(in crate::db::executor::aggregate) fn execute_prepared_global_distinct_grouped_aggregate(
        &self,
        route: GroupedRouteStage,
    ) -> Result<Option<Value>, InternalError> {
        let prepared_runtime = self.prepare_grouped_route_runtime(route, None, None)?;
        let (page, _) = execute_prepared_grouped_route_runtime(prepared_runtime)?;

        GlobalDistinctGroupedOutputContract::decode_page(page)
    }
}