use crate::{
db::{
access::{ExecutionPathPayload, LoweredAccess, lower_access},
data::{DataStore, DecodedDataStoreKey, StoreVisit},
executor::{
EntityAuthority, LoweredIndexPrefixCardinalityKey, PreparedAggregatePlan,
aggregate::{
AccessPlannedQuery, PageSpec, PreparedAggregateStreamingInputs,
PreparedScalarTerminalPreflight, ScalarAggregateOutput,
},
exact_count_cardinality_prefixes_for_plan,
pipeline::contracts::LoadExecutor,
plan_metrics::{record_plan_metrics, record_rows_scanned_for_path},
validate_executor_plan_for_authority,
},
index::IndexKeyKind,
query::builder::aggregate::{ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest},
registry::StoreHandle,
},
error::InternalError,
traits::{EntityKind, EntityValue},
types::EntityTag,
value::Value,
};
use std::ops::Bound;
pub(super) fn execute_count_primary_key_cardinality_terminal_request(
prepared: PreparedAggregateStreamingInputs<'_>,
) -> Result<ScalarAggregateOutput, InternalError> {
let lowered_access = prepared.lowered_access()?;
let (count, rows_scanned) = aggregate_count_from_pk_cardinality_with_store(
&prepared.logical_plan,
&lowered_access,
prepared.authority.entity_tag(),
prepared.store,
)?;
record_rows_scanned_for_path(prepared.authority.entity_path(), rows_scanned);
Ok(ScalarAggregateOutput::Count(count))
}
pub(super) fn try_prepare_scalar_terminal_preflight<'plan>(
plan: &'plan PreparedAggregatePlan,
request: &ScalarTerminalBoundaryRequest,
) -> Option<PreparedScalarTerminalPreflight<'plan>> {
match request {
ScalarTerminalBoundaryRequest::Count => try_prepare_index_prefix_cardinality_preflight(
plan,
|authority, logical_plan, prefixes| {
PreparedScalarTerminalPreflight::CountIndexPrefixCardinality {
authority,
logical_plan,
prefixes,
}
},
),
ScalarTerminalBoundaryRequest::Exists => try_prepare_index_prefix_cardinality_preflight(
plan,
|authority, logical_plan, prefixes| {
PreparedScalarTerminalPreflight::ExistsIndexPrefixCardinality {
authority,
logical_plan,
prefixes,
}
},
),
ScalarTerminalBoundaryRequest::IdTerminal { .. }
| ScalarTerminalBoundaryRequest::IdBySlot { .. }
| ScalarTerminalBoundaryRequest::NthBySlot { .. }
| ScalarTerminalBoundaryRequest::MedianBySlot { .. }
| ScalarTerminalBoundaryRequest::MinMaxBySlot { .. } => None,
}
}
fn try_prepare_index_prefix_cardinality_preflight<'plan>(
plan: &'plan PreparedAggregatePlan,
build: impl FnOnce(
EntityAuthority,
&'plan AccessPlannedQuery,
Vec<LoweredIndexPrefixCardinalityKey>,
) -> PreparedScalarTerminalPreflight<'plan>,
) -> Option<PreparedScalarTerminalPreflight<'plan>> {
let authority = plan.authority();
let logical_plan = plan.logical_plan();
let Ok(lowered_access) = lower_access(authority.entity_tag(), &logical_plan.access) else {
return None;
};
let Ok(index_prefix_specs) = plan.index_prefix_specs() else {
return None;
};
let prefixes = exact_count_cardinality_prefixes_for_plan(
logical_plan,
&lowered_access,
index_prefix_specs,
)?;
Some(build(authority, logical_plan, prefixes))
}
pub(super) fn execute_scalar_terminal_preflight<E>(
executor: &LoadExecutor<E>,
preflight: PreparedScalarTerminalPreflight<'_>,
) -> Result<Option<ScalarTerminalBoundaryOutput>, InternalError>
where
E: EntityKind + EntityValue,
{
match preflight {
PreparedScalarTerminalPreflight::CountIndexPrefixCardinality {
authority,
logical_plan,
prefixes,
} => execute_count_index_prefix_cardinality_preflight(
executor,
authority,
logical_plan,
&prefixes,
),
PreparedScalarTerminalPreflight::ExistsIndexPrefixCardinality {
authority,
logical_plan,
prefixes,
} => execute_exists_index_prefix_cardinality_preflight(
executor,
authority,
logical_plan,
&prefixes,
),
}
}
fn execute_count_index_prefix_cardinality_preflight<E>(
executor: &LoadExecutor<E>,
authority: EntityAuthority,
logical_plan: &AccessPlannedQuery,
prefixes: &[LoweredIndexPrefixCardinalityKey],
) -> Result<Option<ScalarTerminalBoundaryOutput>, InternalError>
where
E: EntityKind + EntityValue,
{
validate_executor_plan_for_authority(&authority, logical_plan)?;
let store = executor.db.recovered_store(authority.store_path())?;
let Some(count) =
count_index_prefix_cardinality(store, logical_plan.scalar_plan().page.as_ref(), prefixes)
else {
return Ok(None);
};
record_plan_metrics(authority.entity_path(), logical_plan);
record_index_prefix_cardinality_terminal(authority.entity_path());
Ok(Some(ScalarTerminalBoundaryOutput::Count(count)))
}
fn execute_exists_index_prefix_cardinality_preflight<E>(
executor: &LoadExecutor<E>,
authority: EntityAuthority,
logical_plan: &AccessPlannedQuery,
prefixes: &[LoweredIndexPrefixCardinalityKey],
) -> Result<Option<ScalarTerminalBoundaryOutput>, InternalError>
where
E: EntityKind + EntityValue,
{
validate_executor_plan_for_authority(&authority, logical_plan)?;
let store = executor.db.recovered_store(authority.store_path())?;
let Some(exists) =
exists_index_prefix_cardinality(store, logical_plan.scalar_plan().page.as_ref(), prefixes)
else {
return Ok(None);
};
record_plan_metrics(authority.entity_path(), logical_plan);
record_index_prefix_cardinality_terminal(authority.entity_path());
Ok(Some(ScalarTerminalBoundaryOutput::Exists(exists)))
}
fn count_index_prefix_cardinality(
store: StoreHandle,
page: Option<&PageSpec>,
prefixes: &[LoweredIndexPrefixCardinalityKey],
) -> Option<u32> {
let data_generation = store.with_data(DataStore::generation);
let available_rows = store.with_index(|store| {
let mut total = 0_u64;
for prefix in prefixes {
let count = store.exact_prefix_cardinality(
data_generation,
IndexKeyKind::User,
prefix.index_id(),
prefix.prefix_components(),
)?;
total = total.saturating_add(count);
}
Some(total)
});
let available_rows = available_rows?;
let available_rows = usize::try_from(available_rows).unwrap_or(usize::MAX);
let count_window = CountWindowResult::from_candidate_rows(page, available_rows);
Some(count_window.count())
}
fn exists_index_prefix_cardinality(
store: StoreHandle,
page: Option<&PageSpec>,
prefixes: &[LoweredIndexPrefixCardinalityKey],
) -> Option<bool> {
let Some(required_candidate_rows) = exists_window_required_candidate_rows(page) else {
return Some(false);
};
let data_generation = store.with_data(DataStore::generation);
store.with_index(|store| {
let mut available_rows = 0_u64;
for prefix in prefixes {
let count = store.exact_prefix_cardinality(
data_generation,
IndexKeyKind::User,
prefix.index_id(),
prefix.prefix_components(),
)?;
available_rows = available_rows.saturating_add(count);
if available_rows >= required_candidate_rows {
return Some(true);
}
}
Some(false)
})
}
fn exists_window_required_candidate_rows(page: Option<&PageSpec>) -> Option<u64> {
match page {
Some(PageSpec { limit: Some(0), .. }) => None,
Some(page) => Some(u64::from(page.offset).saturating_add(1)),
None => Some(1),
}
}
fn record_index_prefix_cardinality_terminal(entity_path: &'static str) {
record_rows_scanned_for_path(entity_path, 0);
#[cfg(feature = "diagnostics")]
super::terminal_attribution::record_index_prefix_cardinality_terminal_attribution();
}
fn aggregate_count_from_pk_cardinality_with_store(
logical_plan: &AccessPlannedQuery,
lowered_access: &LoweredAccess<'_, Value>,
entity_tag: EntityTag,
store: StoreHandle,
) -> Result<(u32, usize), InternalError> {
let page = logical_plan.scalar_plan().page.as_ref();
let Some(path) = lowered_access.executable().as_path() else {
return Err(InternalError::query_executor_invariant());
};
let available_rows = match path {
ExecutionPathPayload::FullScan => store.with_data(|data| {
let mut count = 0usize;
let _: Result<(), InternalError> = data.visit_entity(entity_tag, |_raw_key, _row| {
count = count.saturating_add(1);
Ok(StoreVisit::Continue)
});
count
}),
ExecutionPathPayload::KeyRange { start, end } => {
let start_raw =
DecodedDataStoreKey::try_from_structural_key(entity_tag, start)?.to_raw()?;
let end_raw =
DecodedDataStoreKey::try_from_structural_key(entity_tag, end)?.to_raw()?;
store.with_data(|data| {
let mut count = 0usize;
let _: Result<(), InternalError> = data.visit_range(
(Bound::Included(start_raw), Bound::Included(end_raw)),
|_raw_key, _row| {
count = count.saturating_add(1);
Ok(StoreVisit::Continue)
},
);
count
})
}
_ => {
return Err(InternalError::query_executor_invariant());
}
};
let count_window = CountWindowResult::from_candidate_rows(page, available_rows);
Ok((count_window.count(), count_window.rows_scanned()))
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct CountWindowResult {
count: u32,
rows_scanned: usize,
}
impl CountWindowResult {
fn from_candidate_rows(page: Option<&PageSpec>, available_rows: usize) -> Self {
let Some(page) = page else {
return Self::new(usize_to_u32_saturating(available_rows), available_rows);
};
let offset = usize::try_from(page.offset).unwrap_or(usize::MAX);
match page.limit {
Some(0) => Self::new(0, 0),
Some(limit) => {
let limit = usize::try_from(limit).unwrap_or(usize::MAX);
let rows_scanned = available_rows.min(offset.saturating_add(limit));
let count = available_rows.saturating_sub(offset).min(limit);
Self::new(usize_to_u32_saturating(count), rows_scanned)
}
None => {
let count = available_rows.saturating_sub(offset);
Self::new(usize_to_u32_saturating(count), available_rows)
}
}
}
const fn new(count: u32, rows_scanned: usize) -> Self {
Self {
count,
rows_scanned,
}
}
const fn count(self) -> u32 {
self.count
}
const fn rows_scanned(self) -> usize {
self.rows_scanned
}
}
fn usize_to_u32_saturating(value: usize) -> u32 {
u32::try_from(value).unwrap_or(u32::MAX)
}