icydb-core 0.144.7

IcyDB — A schema-first typed query engine and persistence runtime for Internet Computer canisters
Documentation
#[cfg(feature = "diagnostics")]
use std::cell::Cell;
#[cfg(any(test, feature = "diagnostics"))]
use std::cell::RefCell;

#[cfg(feature = "diagnostics")]
pub(super) use crate::db::diagnostics::measure_local_instruction_delta as measure_direct_data_row_phase;

///
/// ScalarMaterializationLaneMetrics
///
/// ScalarMaterializationLaneMetrics aggregates one test-scoped or
/// metrics-scoped view of which shared scalar materialization lane actually
/// executed for one workload.
/// This keeps lane attribution explicit so runtime work can be tied back to
/// the executor contract instead of inferred indirectly from instruction
/// totals alone.
///

#[cfg(any(test, feature = "diagnostics"))]
#[cfg_attr(all(test, not(feature = "diagnostics")), allow(unreachable_pub))]
#[allow(clippy::struct_field_names)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ScalarMaterializationLaneMetrics {
    pub direct_data_row_path_hits: u64,
    pub direct_filtered_data_row_path_hits: u64,
    pub kernel_data_row_path_hits: u64,
    pub kernel_full_row_retained_path_hits: u64,
    pub kernel_slots_only_path_hits: u64,
}

///
/// DirectDataRowPhaseAttribution
///
/// DirectDataRowPhaseAttribution isolates the direct raw-row scalar lane into
/// scan-local subphases plus the later order/page windows that still matter
/// for warmed fluent perf work.
/// Non-direct executor lanes leave these counters at zero so the attribution
/// surface stays lane-local instead of pretending to describe every runtime.
///

#[cfg(feature = "diagnostics")]
#[allow(clippy::struct_field_names)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db::executor) struct DirectDataRowPhaseAttribution {
    pub(in crate::db::executor) scan_local_instructions: u64,
    pub(in crate::db::executor) key_stream_local_instructions: u64,
    pub(in crate::db::executor) row_read_local_instructions: u64,
    pub(in crate::db::executor) key_encode_local_instructions: u64,
    pub(in crate::db::executor) store_get_local_instructions: u64,
    pub(in crate::db::executor) order_window_local_instructions: u64,
    pub(in crate::db::executor) page_window_local_instructions: u64,
}

#[cfg(any(test, feature = "diagnostics"))]
std::thread_local! {
    static SCALAR_MATERIALIZATION_LANE_METRICS: RefCell<Option<ScalarMaterializationLaneMetrics>> = const {
        RefCell::new(None)
    };
}

#[cfg(feature = "diagnostics")]
std::thread_local! {
    static DIRECT_DATA_ROW_PHASE_ATTRIBUTION: Cell<DirectDataRowPhaseAttribution> = const {
        Cell::new(DirectDataRowPhaseAttribution {
            scan_local_instructions: 0,
            key_stream_local_instructions: 0,
            row_read_local_instructions: 0,
            key_encode_local_instructions: 0,
            store_get_local_instructions: 0,
            order_window_local_instructions: 0,
            page_window_local_instructions: 0,
        })
    };
}

#[cfg(any(test, feature = "diagnostics"))]
fn update_scalar_materialization_lane_metrics(
    update: impl FnOnce(&mut ScalarMaterializationLaneMetrics),
) {
    SCALAR_MATERIALIZATION_LANE_METRICS.with(|metrics| {
        let mut metrics = metrics.borrow_mut();
        let Some(metrics) = metrics.as_mut() else {
            return;
        };

        update(metrics);
    });
}

#[cfg(any(test, feature = "diagnostics"))]
pub(super) fn record_direct_data_row_path_hit() {
    update_scalar_materialization_lane_metrics(|metrics| {
        metrics.direct_data_row_path_hits = metrics.direct_data_row_path_hits.saturating_add(1);
    });
}

#[cfg(any(test, feature = "diagnostics"))]
pub(super) fn record_direct_filtered_data_row_path_hit() {
    update_scalar_materialization_lane_metrics(|metrics| {
        metrics.direct_filtered_data_row_path_hits =
            metrics.direct_filtered_data_row_path_hits.saturating_add(1);
    });
}

#[cfg(any(test, feature = "diagnostics"))]
pub(super) fn record_kernel_data_row_path_hit() {
    update_scalar_materialization_lane_metrics(|metrics| {
        metrics.kernel_data_row_path_hits = metrics.kernel_data_row_path_hits.saturating_add(1);
    });
}

#[cfg(any(test, feature = "diagnostics"))]
pub(super) fn record_kernel_full_row_retained_path_hit() {
    update_scalar_materialization_lane_metrics(|metrics| {
        metrics.kernel_full_row_retained_path_hits =
            metrics.kernel_full_row_retained_path_hits.saturating_add(1);
    });
}

#[cfg(any(test, feature = "diagnostics"))]
pub(super) fn record_kernel_slots_only_path_hit() {
    update_scalar_materialization_lane_metrics(|metrics| {
        metrics.kernel_slots_only_path_hits = metrics.kernel_slots_only_path_hits.saturating_add(1);
    });
}

///
/// with_scalar_materialization_lane_metrics
///
/// Run one closure while collecting executor-owned scalar materialization lane
/// metrics on the current thread, then return the closure result plus the
/// aggregated snapshot.
///

#[cfg(any(test, feature = "diagnostics"))]
#[cfg_attr(all(test, not(feature = "diagnostics")), allow(unreachable_pub))]
pub fn with_scalar_materialization_lane_metrics<T>(
    f: impl FnOnce() -> T,
) -> (T, ScalarMaterializationLaneMetrics) {
    SCALAR_MATERIALIZATION_LANE_METRICS.with(|metrics| {
        debug_assert!(
            metrics.borrow().is_none(),
            "scalar materialization lane metrics captures should not nest"
        );
        *metrics.borrow_mut() = Some(ScalarMaterializationLaneMetrics::default());
    });

    let result = f();
    let metrics = SCALAR_MATERIALIZATION_LANE_METRICS
        .with(|metrics| metrics.borrow_mut().take().unwrap_or_default());

    (result, metrics)
}

#[cfg(feature = "diagnostics")]
pub(super) fn record_direct_data_row_scan_local_instructions(delta: u64) {
    update_direct_data_row_phase_attribution(delta, |current, delta| {
        current.scan_local_instructions = current.scan_local_instructions.saturating_add(delta);
    });
}

#[cfg(feature = "diagnostics")]
pub(super) fn record_direct_data_row_key_stream_local_instructions(delta: u64) {
    update_direct_data_row_phase_attribution(delta, |current, delta| {
        current.key_stream_local_instructions =
            current.key_stream_local_instructions.saturating_add(delta);
    });
}

#[cfg(feature = "diagnostics")]
pub(super) fn record_direct_data_row_row_read_local_instructions(delta: u64) {
    update_direct_data_row_phase_attribution(delta, |current, delta| {
        current.row_read_local_instructions =
            current.row_read_local_instructions.saturating_add(delta);
    });
}

#[cfg(feature = "diagnostics")]
pub(super) fn record_direct_data_row_key_encode_local_instructions(delta: u64) {
    update_direct_data_row_phase_attribution(delta, |current, delta| {
        current.key_encode_local_instructions =
            current.key_encode_local_instructions.saturating_add(delta);
    });
}

#[cfg(feature = "diagnostics")]
pub(super) fn record_direct_data_row_store_get_local_instructions(delta: u64) {
    update_direct_data_row_phase_attribution(delta, |current, delta| {
        current.store_get_local_instructions =
            current.store_get_local_instructions.saturating_add(delta);
    });
}

#[cfg(feature = "diagnostics")]
pub(super) fn record_direct_data_row_order_window_local_instructions(delta: u64) {
    update_direct_data_row_phase_attribution(delta, |current, delta| {
        current.order_window_local_instructions = current
            .order_window_local_instructions
            .saturating_add(delta);
    });
}

#[cfg(feature = "diagnostics")]
pub(super) fn record_direct_data_row_page_window_local_instructions(delta: u64) {
    update_direct_data_row_phase_attribution(delta, |current, delta| {
        current.page_window_local_instructions =
            current.page_window_local_instructions.saturating_add(delta);
    });
}

// Apply one direct-row phase counter update through the shared thread-local
// capture slot so individual bucket recorders only own bucket selection.
#[cfg(feature = "diagnostics")]
fn update_direct_data_row_phase_attribution(
    delta: u64,
    update: impl FnOnce(&mut DirectDataRowPhaseAttribution, u64),
) {
    if delta == 0 {
        return;
    }

    DIRECT_DATA_ROW_PHASE_ATTRIBUTION.with(|attribution| {
        let mut current = attribution.get();
        update(&mut current, delta);
        attribution.set(current);
    });
}

#[cfg(feature = "diagnostics")]
pub(in crate::db::executor) fn with_direct_data_row_phase_attribution<T>(
    f: impl FnOnce() -> T,
) -> (T, DirectDataRowPhaseAttribution) {
    let previous = DIRECT_DATA_ROW_PHASE_ATTRIBUTION.with(|attribution| {
        let previous = attribution.get();
        attribution.set(DirectDataRowPhaseAttribution::default());

        previous
    });

    let result = f();
    let captured = DIRECT_DATA_ROW_PHASE_ATTRIBUTION.with(|attribution| {
        let captured = attribution.get();
        attribution.set(previous);

        captured
    });

    (result, captured)
}