icydb-core 0.184.22

IcyDB — A schema-first typed query engine and persistence runtime for Internet Computer canisters
Documentation
//! Module: executor::aggregate::terminal_attribution
//! Responsibility: diagnostics-only scalar aggregate terminal attribution.
//! Boundary: keeps counters and instruction measurement out of reducer logic.

#[cfg(feature = "sql")]
use crate::db::diagnostics::measure_local_instruction_delta as measure_scalar_aggregate_terminal_phase;
use std::cell::Cell;

// Per-thread accumulator for the scalar aggregate capture currently in progress.
// It starts as `ScalarAggregateTerminalAttribution::none()`, is swapped for a fresh
// empty snapshot (and later restored) by `with_scalar_aggregate_terminal_attribution`,
// and is merged into by `record_scalar_aggregate_terminal_attribution`.
std::thread_local! {
    static SCALAR_AGGREGATE_TERMINAL_ATTRIBUTION: Cell<ScalarAggregateTerminalAttribution> =
        const { Cell::new(ScalarAggregateTerminalAttribution::none()) };
}

///
/// ScalarAggregateSinkMode
///
/// ScalarAggregateSinkMode records which executor-owned scalar aggregate path
/// satisfied one terminal set. It exists for diagnostics so fast-path terminal
/// sources remain distinguishable from buffered reducer work.
///
/// When several recordings are merged into one snapshot, the last non-`None`
/// mode wins.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) enum ScalarAggregateSinkMode {
    /// No terminal path has been recorded; this is the empty-snapshot value.
    #[default]
    None,
    /// Buffered reducer path; produced by `from_terminal_counts`, which also
    /// records terminal and expression counts (SQL builds only).
    #[cfg(feature = "sql")]
    Buffered,
    /// Produced by `record_existing_rows_terminal_attribution`, which records
    /// one terminal plus the number of rows ingested.
    ExistingRows,
    /// Produced by `record_index_prefix_cardinality_terminal_attribution`,
    /// which records one terminal plus its base-row instruction cost.
    IndexPrefixCardinality,
    /// Produced by `record_kernel_aggregate_terminal_attribution`, which
    /// records one terminal and no other counters.
    KernelAggregate,
}

impl ScalarAggregateSinkMode {
    /// Diagnostics label for this sink mode.
    ///
    /// Returns the variant name for every recorded path, and `None` when no
    /// terminal path was recorded.
    pub(in crate::db) const fn label(self) -> Option<&'static str> {
        let name = match self {
            // Nothing recorded: there is no label to report.
            Self::None => return None,
            #[cfg(feature = "sql")]
            Self::Buffered => "Buffered",
            Self::ExistingRows => "ExistingRows",
            Self::IndexPrefixCardinality => "IndexPrefixCardinality",
            Self::KernelAggregate => "KernelAggregate",
        };

        Some(name)
    }
}

///
/// ScalarAggregateTerminalAttribution
///
/// ScalarAggregateTerminalAttribution is the diagnostics-only executor snapshot
/// for one scalar aggregate terminal execution. It keeps base-row materialization,
/// reducer fold work, expression reuse counts, and terminal shape metrics together.
///
/// All counters are combined with saturating addition, so they clamp at
/// `u64::MAX` instead of overflowing.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct ScalarAggregateTerminalAttribution {
    /// Local instruction delta attributed to base-row materialization.
    pub(in crate::db) base_row_local_instructions: u64,
    /// Local instruction delta attributed to reducer fold work.
    pub(in crate::db) reducer_fold_local_instructions: u64,
    /// Number of input expression evaluations performed.
    pub(in crate::db) expression_evaluations: u64,
    /// Number of filter expression evaluations performed.
    pub(in crate::db) filter_evaluations: u64,
    /// Number of rows fed into the terminal path.
    pub(in crate::db) rows_ingested: u64,
    /// Number of terminals satisfied.
    pub(in crate::db) terminal_count: u64,
    /// Count of unique input expressions in the terminal set (summed when
    /// several recordings are merged).
    pub(in crate::db) unique_input_expr_count: u64,
    /// Count of unique filter expressions in the terminal set (summed when
    /// several recordings are merged).
    pub(in crate::db) unique_filter_expr_count: u64,
    /// Path that satisfied the terminal set; the last non-`None` mode merged wins.
    pub(in crate::db) sink_mode: ScalarAggregateSinkMode,
}

impl ScalarAggregateTerminalAttribution {
    /// Empty snapshot: every counter is zero and the sink mode is
    /// [`ScalarAggregateSinkMode::None`].
    ///
    /// `const` so it can initialise the thread-local slot in a `const` block.
    pub(in crate::db) const fn none() -> Self {
        Self {
            sink_mode: ScalarAggregateSinkMode::None,
            base_row_local_instructions: 0,
            reducer_fold_local_instructions: 0,
            expression_evaluations: 0,
            filter_evaluations: 0,
            rows_ingested: 0,
            terminal_count: 0,
            unique_input_expr_count: 0,
            unique_filter_expr_count: 0,
        }
    }

    /// Snapshot describing the shape of one buffered terminal set.
    ///
    /// It records the terminal count and the unique input/filter expression
    /// counts, widening each to `u64` with saturation.
    #[cfg(feature = "sql")]
    pub(in crate::db::executor::aggregate) fn from_terminal_counts(
        terminal_count: usize,
        input_expr_count: usize,
        filter_expr_count: usize,
    ) -> Self {
        let mut snapshot = Self::none();
        snapshot.sink_mode = ScalarAggregateSinkMode::Buffered;
        snapshot.terminal_count = usize_to_u64(terminal_count);
        snapshot.unique_input_expr_count = usize_to_u64(input_expr_count);
        snapshot.unique_filter_expr_count = usize_to_u64(filter_expr_count);
        snapshot
    }

    /// Snapshot for one terminal answered via index-prefix cardinality,
    /// carrying the base-row instruction cost that was measured for it.
    const fn from_index_prefix_cardinality_terminal(base_row_local_instructions: u64) -> Self {
        let mut snapshot = Self::none();
        snapshot.sink_mode = ScalarAggregateSinkMode::IndexPrefixCardinality;
        snapshot.terminal_count = 1;
        snapshot.base_row_local_instructions = base_row_local_instructions;
        snapshot
    }

    /// Snapshot for one terminal on the existing-rows path, carrying the
    /// number of rows it ingested.
    fn from_existing_rows_terminal(rows_ingested: usize) -> Self {
        let mut snapshot = Self::none();
        snapshot.sink_mode = ScalarAggregateSinkMode::ExistingRows;
        snapshot.terminal_count = 1;
        snapshot.rows_ingested = usize_to_u64(rows_ingested);
        snapshot
    }

    /// Snapshot for one terminal on the kernel-aggregate path; only the
    /// terminal count and sink mode are set.
    const fn from_kernel_aggregate_terminal() -> Self {
        let mut snapshot = Self::none();
        snapshot.sink_mode = ScalarAggregateSinkMode::KernelAggregate;
        snapshot.terminal_count = 1;
        snapshot
    }

    /// Fold the runtime counters of `runtime` into `self`.
    ///
    /// Only reducer-fold instructions, expression/filter evaluations, and
    /// rows ingested are added. Base-row instructions, shape counts, and the
    /// sink mode of `self` are left untouched.
    #[cfg(feature = "sql")]
    pub(in crate::db::executor::aggregate) const fn merge_runtime(&mut self, runtime: Self) {
        let Self {
            reducer_fold_local_instructions,
            expression_evaluations,
            filter_evaluations,
            rows_ingested,
            ..
        } = runtime;

        self.reducer_fold_local_instructions = u64::saturating_add(
            self.reducer_fold_local_instructions,
            reducer_fold_local_instructions,
        );
        self.expression_evaluations =
            u64::saturating_add(self.expression_evaluations, expression_evaluations);
        self.filter_evaluations = u64::saturating_add(self.filter_evaluations, filter_evaluations);
        self.rows_ingested = u64::saturating_add(self.rows_ingested, rows_ingested);
    }

    /// Fold a whole recorded snapshot into `self`.
    ///
    /// Every counter is added with saturation. The sink mode is overwritten
    /// only when `other` actually recorded one (i.e. it is not `None`).
    fn merge_recorded(&mut self, other: Self) {
        let Self {
            base_row_local_instructions,
            reducer_fold_local_instructions,
            expression_evaluations,
            filter_evaluations,
            rows_ingested,
            terminal_count,
            unique_input_expr_count,
            unique_filter_expr_count,
            sink_mode,
        } = other;

        for (slot, delta) in [
            (&mut self.base_row_local_instructions, base_row_local_instructions),
            (&mut self.reducer_fold_local_instructions, reducer_fold_local_instructions),
            (&mut self.expression_evaluations, expression_evaluations),
            (&mut self.filter_evaluations, filter_evaluations),
            (&mut self.rows_ingested, rows_ingested),
            (&mut self.terminal_count, terminal_count),
            (&mut self.unique_input_expr_count, unique_input_expr_count),
            (&mut self.unique_filter_expr_count, unique_filter_expr_count),
        ] {
            *slot = slot.saturating_add(delta);
        }

        if !matches!(sink_mode, ScalarAggregateSinkMode::None) {
            self.sink_mode = sink_mode;
        }
    }
}

/// Run one closure while collecting scalar aggregate terminal diagnostics.
///
/// The thread-local attribution slot is reset to
/// [`ScalarAggregateTerminalAttribution::none`] before `run` executes, so the
/// returned snapshot contains only what `run` recorded. The enclosing snapshot
/// is put back afterwards, which lets captures nest: an inner capture neither
/// sees nor clobbers what an outer capture has accumulated.
///
/// The enclosing snapshot is restored by a drop guard. It is therefore also
/// restored if `run` panics and the unwind is caught further up the stack,
/// instead of leaving the inner partial capture in the slot.
///
/// Returns the captured snapshot together with `run`'s output.
pub(in crate::db) fn with_scalar_aggregate_terminal_attribution<T>(
    run: impl FnOnce() -> T,
) -> (ScalarAggregateTerminalAttribution, T) {
    // Puts the enclosing capture's snapshot back into the slot when dropped.
    struct RestoreOuterAttribution {
        previous: ScalarAggregateTerminalAttribution,
    }

    impl Drop for RestoreOuterAttribution {
        fn drop(&mut self) {
            // `try_with` so that `drop` never panics (e.g. during thread-local
            // teardown), which would abort if we were already unwinding.
            let _ = SCALAR_AGGREGATE_TERMINAL_ATTRIBUTION
                .try_with(|attribution| attribution.set(self.previous));
        }
    }

    let restore = RestoreOuterAttribution {
        previous: SCALAR_AGGREGATE_TERMINAL_ATTRIBUTION
            .replace(ScalarAggregateTerminalAttribution::none()),
    };
    let output = run();
    // Read the inner capture before the guard restores the outer snapshot.
    let captured = SCALAR_AGGREGATE_TERMINAL_ATTRIBUTION.get();
    drop(restore);

    (captured, output)
}

/// Merge one recorded snapshot into the current thread's capture.
///
/// Counters are added with saturation and a non-`None` sink mode replaces the
/// current one (see `ScalarAggregateTerminalAttribution::merge_recorded`).
pub(in crate::db::executor::aggregate) fn record_scalar_aggregate_terminal_attribution(
    recorded: ScalarAggregateTerminalAttribution,
) {
    let mut merged = SCALAR_AGGREGATE_TERMINAL_ATTRIBUTION.get();
    merged.merge_recorded(recorded);
    SCALAR_AGGREGATE_TERMINAL_ATTRIBUTION.set(merged);
}

/// Record one terminal satisfied by the index-prefix cardinality path, along
/// with the base-row instruction cost measured for it.
pub(in crate::db::executor::aggregate) fn record_index_prefix_cardinality_terminal_attribution(
    base_row_local_instructions: u64,
) {
    let terminal = ScalarAggregateTerminalAttribution::from_index_prefix_cardinality_terminal(
        base_row_local_instructions,
    );
    record_scalar_aggregate_terminal_attribution(terminal);
}

/// Record one terminal satisfied by the existing-rows path, along with the
/// number of rows it ingested.
pub(in crate::db::executor::aggregate) fn record_existing_rows_terminal_attribution(
    rows_ingested: usize,
) {
    let terminal = ScalarAggregateTerminalAttribution::from_existing_rows_terminal(rows_ingested);
    record_scalar_aggregate_terminal_attribution(terminal);
}

/// Record one terminal satisfied by the kernel-aggregate path.
pub(in crate::db::executor::aggregate) fn record_kernel_aggregate_terminal_attribution() {
    let terminal = ScalarAggregateTerminalAttribution::from_kernel_aggregate_terminal();
    record_scalar_aggregate_terminal_attribution(terminal);
}

/// Measure the local instruction delta of one scalar aggregate phase.
///
/// The measurement is delegated to the diagnostics module's
/// `measure_local_instruction_delta`. The delta is returned alongside the
/// closure's output.
#[cfg(feature = "sql")]
pub(in crate::db::executor::aggregate) fn measure_phase<T>(run: impl FnOnce() -> T) -> (u64, T) {
    let (instructions, output) = measure_scalar_aggregate_terminal_phase(run);
    (instructions, output)
}

/// Widen a `usize` count to `u64`, saturating at `u64::MAX` if the value does
/// not fit (diagnostic counters should clamp rather than fail).
pub(in crate::db::executor::aggregate) fn usize_to_u64(value: usize) -> u64 {
    match u64::try_from(value) {
        Ok(widened) => widened,
        Err(_) => u64::MAX,
    }
}