#[cfg(feature = "sql")]
use crate::db::diagnostics::measure_local_instruction_delta as measure_scalar_aggregate_terminal_phase;
use std::cell::Cell;
std::thread_local! {
static SCALAR_AGGREGATE_TERMINAL_ATTRIBUTION: Cell<ScalarAggregateTerminalAttribution> =
const { Cell::new(ScalarAggregateTerminalAttribution::none()) };
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) enum ScalarAggregateSinkMode {
#[default]
None,
#[cfg(feature = "sql")]
Buffered,
ExistingRows,
IndexPrefixCardinality,
KernelAggregate,
}
impl ScalarAggregateSinkMode {
pub(in crate::db) const fn label(self) -> Option<&'static str> {
match self {
Self::None => None,
#[cfg(feature = "sql")]
Self::Buffered => Some("Buffered"),
Self::ExistingRows => Some("ExistingRows"),
Self::IndexPrefixCardinality => Some("IndexPrefixCardinality"),
Self::KernelAggregate => Some("KernelAggregate"),
}
}
}
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct ScalarAggregateTerminalAttribution {
pub(in crate::db) base_row_local_instructions: u64,
pub(in crate::db) reducer_fold_local_instructions: u64,
pub(in crate::db) expression_evaluations: u64,
pub(in crate::db) filter_evaluations: u64,
pub(in crate::db) rows_ingested: u64,
pub(in crate::db) terminal_count: u64,
pub(in crate::db) unique_input_expr_count: u64,
pub(in crate::db) unique_filter_expr_count: u64,
pub(in crate::db) sink_mode: ScalarAggregateSinkMode,
}
impl ScalarAggregateTerminalAttribution {
pub(in crate::db) const fn none() -> Self {
Self {
base_row_local_instructions: 0,
reducer_fold_local_instructions: 0,
expression_evaluations: 0,
filter_evaluations: 0,
rows_ingested: 0,
terminal_count: 0,
unique_input_expr_count: 0,
unique_filter_expr_count: 0,
sink_mode: ScalarAggregateSinkMode::None,
}
}
#[cfg(feature = "sql")]
pub(in crate::db::executor::aggregate) fn from_terminal_counts(
terminal_count: usize,
input_expr_count: usize,
filter_expr_count: usize,
) -> Self {
Self {
terminal_count: usize_to_u64(terminal_count),
unique_input_expr_count: usize_to_u64(input_expr_count),
unique_filter_expr_count: usize_to_u64(filter_expr_count),
sink_mode: ScalarAggregateSinkMode::Buffered,
..Self::none()
}
}
const fn from_index_prefix_cardinality_terminal() -> Self {
Self {
terminal_count: 1,
sink_mode: ScalarAggregateSinkMode::IndexPrefixCardinality,
..Self::none()
}
}
fn from_existing_rows_terminal(rows_ingested: usize) -> Self {
Self {
rows_ingested: usize_to_u64(rows_ingested),
terminal_count: 1,
sink_mode: ScalarAggregateSinkMode::ExistingRows,
..Self::none()
}
}
const fn from_kernel_aggregate_terminal() -> Self {
Self {
terminal_count: 1,
sink_mode: ScalarAggregateSinkMode::KernelAggregate,
..Self::none()
}
}
#[cfg(feature = "sql")]
pub(in crate::db::executor::aggregate) const fn merge_runtime(&mut self, runtime: Self) {
self.reducer_fold_local_instructions = self
.reducer_fold_local_instructions
.saturating_add(runtime.reducer_fold_local_instructions);
self.expression_evaluations = self
.expression_evaluations
.saturating_add(runtime.expression_evaluations);
self.filter_evaluations = self
.filter_evaluations
.saturating_add(runtime.filter_evaluations);
self.rows_ingested = self.rows_ingested.saturating_add(runtime.rows_ingested);
}
fn merge_recorded(&mut self, other: Self) {
self.base_row_local_instructions = self
.base_row_local_instructions
.saturating_add(other.base_row_local_instructions);
self.reducer_fold_local_instructions = self
.reducer_fold_local_instructions
.saturating_add(other.reducer_fold_local_instructions);
self.expression_evaluations = self
.expression_evaluations
.saturating_add(other.expression_evaluations);
self.filter_evaluations = self
.filter_evaluations
.saturating_add(other.filter_evaluations);
self.rows_ingested = self.rows_ingested.saturating_add(other.rows_ingested);
self.terminal_count = self.terminal_count.saturating_add(other.terminal_count);
self.unique_input_expr_count = self
.unique_input_expr_count
.saturating_add(other.unique_input_expr_count);
self.unique_filter_expr_count = self
.unique_filter_expr_count
.saturating_add(other.unique_filter_expr_count);
if other.sink_mode != ScalarAggregateSinkMode::None {
self.sink_mode = other.sink_mode;
}
}
}
pub(in crate::db) fn with_scalar_aggregate_terminal_attribution<T>(
run: impl FnOnce() -> T,
) -> (ScalarAggregateTerminalAttribution, T) {
let previous = SCALAR_AGGREGATE_TERMINAL_ATTRIBUTION.with(|attribution| {
let previous = attribution.get();
attribution.set(ScalarAggregateTerminalAttribution::none());
previous
});
let output = run();
let captured = SCALAR_AGGREGATE_TERMINAL_ATTRIBUTION.with(|attribution| {
let captured = attribution.get();
attribution.set(previous);
captured
});
(captured, output)
}
pub(in crate::db::executor::aggregate) fn record_scalar_aggregate_terminal_attribution(
recorded: ScalarAggregateTerminalAttribution,
) {
SCALAR_AGGREGATE_TERMINAL_ATTRIBUTION.with(|attribution| {
let mut current = attribution.get();
current.merge_recorded(recorded);
attribution.set(current);
});
}
pub(in crate::db::executor::aggregate) fn record_index_prefix_cardinality_terminal_attribution() {
record_scalar_aggregate_terminal_attribution(
ScalarAggregateTerminalAttribution::from_index_prefix_cardinality_terminal(),
);
}
pub(in crate::db::executor::aggregate) fn record_existing_rows_terminal_attribution(
rows_ingested: usize,
) {
record_scalar_aggregate_terminal_attribution(
ScalarAggregateTerminalAttribution::from_existing_rows_terminal(rows_ingested),
);
}
pub(in crate::db::executor::aggregate) fn record_kernel_aggregate_terminal_attribution() {
record_scalar_aggregate_terminal_attribution(
ScalarAggregateTerminalAttribution::from_kernel_aggregate_terminal(),
);
}
#[cfg(feature = "sql")]
pub(in crate::db::executor::aggregate) fn measure_phase<T>(run: impl FnOnce() -> T) -> (u64, T) {
measure_scalar_aggregate_terminal_phase(run)
}
pub(in crate::db::executor::aggregate) fn usize_to_u64(value: usize) -> u64 {
u64::try_from(value).unwrap_or(u64::MAX)
}