use crate::{
db::{
access::{ExecutionPathPayload, LoweredAccess},
data::{DataStore, DecodedDataStoreKey, RawDataStoreKey, StoreVisit},
executor::{
EntityAuthority, LoweredIndexPrefixCardinalityPlan, PreparedAggregatePlan,
aggregate::{
AccessPlannedQuery, PageSpec, PreparedAggregateStreamingInputs,
PreparedScalarTerminalPreflight, ScalarAggregateOutput,
},
exact_count_cardinality_prefixes_for_plan,
pipeline::contracts::LoadExecutor,
plan_metrics::{record_plan_metrics, record_rows_scanned_for_path},
validate_executor_plan_for_authority,
},
index::{IndexId, IndexKeyKind},
query::builder::aggregate::{ScalarTerminalBoundaryOutput, ScalarTerminalBoundaryRequest},
registry::StoreHandle,
},
error::InternalError,
traits::{EntityKind, EntityValue},
types::EntityTag,
value::Value,
};
use std::ops::Bound;
#[cfg(feature = "sql")]
use crate::db::access::LoweredIndexPrefixCardinalitySpec;
#[cfg(feature = "diagnostics")]
use crate::db::diagnostics::measure_local_instruction_delta as measure_count_terminal_phase;
/// Runs `run` and returns the local instruction delta it consumed together
/// with its result.
///
/// With the `diagnostics` feature enabled this delegates to
/// `crate::db::diagnostics::measure_local_instruction_delta`.
#[cfg(feature = "diagnostics")]
fn measure_index_prefix_cardinality_preflight<T>(run: impl FnOnce() -> T) -> (u64, T) {
    measure_count_terminal_phase(run)
}

/// Runs `run` and returns its result paired with a zero instruction delta.
///
/// Without the `diagnostics` feature there is nothing to measure, so the
/// measurement slot is always `0`.
#[cfg(not(feature = "diagnostics"))]
fn measure_index_prefix_cardinality_preflight<T>(run: impl FnOnce() -> T) -> (u64, T) {
    let output = run();
    (0, output)
}
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    /// Answers a direct `COUNT` request from exact index-prefix cardinality
    /// metadata, without materializing rows.
    ///
    /// Returns `Ok(None)` when the metadata cannot answer the request, for
    /// example when `prefixes` is empty or targets more than one index. On
    /// success the cardinality terminal is recorded for the entity.
    ///
    /// # Errors
    /// Propagates failures from recovering the authority's store.
    #[cfg(feature = "sql")]
    pub(in crate::db) fn execute_direct_count_index_prefix_cardinality_request(
        &self,
        authority: EntityAuthority,
        page: Option<&PageSpec>,
        prefixes: &[LoweredIndexPrefixCardinalitySpec],
    ) -> Result<Option<ScalarTerminalBoundaryOutput>, InternalError> {
        let store = self.db.recovered_store(authority.store_path())?;
        let (instructions, maybe_count) = measure_index_prefix_cardinality_preflight(|| {
            count_index_prefix_cardinality_specs(store, page, prefixes)
        });
        match maybe_count {
            Some(count) => {
                record_index_prefix_cardinality_terminal(authority.entity_path(), instructions);
                Ok(Some(ScalarTerminalBoundaryOutput::Count(count)))
            }
            None => Ok(None),
        }
    }
}
/// Executes a `COUNT` terminal whose access path is a full entity scan or a
/// primary-key range, then records how many rows were scanned for the entity.
///
/// # Errors
/// Propagates failures from lowering the access plan and from
/// `aggregate_count_from_pk_cardinality_with_store`.
pub(super) fn execute_count_primary_key_cardinality_terminal_request(
    prepared: PreparedAggregateStreamingInputs<'_>,
) -> Result<ScalarAggregateOutput, InternalError> {
    let access = prepared.lowered_access()?;
    let entity_tag = prepared.authority.entity_tag();
    let (count, scanned) = aggregate_count_from_pk_cardinality_with_store(
        &prepared.logical_plan,
        &access,
        entity_tag,
        prepared.store,
    )?;
    record_rows_scanned_for_path(prepared.authority.entity_path(), scanned);
    Ok(ScalarAggregateOutput::Count(count))
}
/// Attempts to prepare a metadata-only preflight for a scalar terminal.
///
/// `COUNT` asks for index-prefix cardinality with `allow_ordered_plan = true`.
/// `EXISTS` only qualifies when the plan is an index multi-lookup with more
/// than one lookup value, and asks with `allow_ordered_plan = false`. Every
/// other terminal returns `None`, as does any plan the cardinality helpers
/// decline.
pub(super) fn try_prepare_scalar_terminal_preflight<'plan>(
    plan: &'plan PreparedAggregatePlan,
    request: &ScalarTerminalBoundaryRequest,
) -> Option<PreparedScalarTerminalPreflight<'plan>> {
    match request {
        ScalarTerminalBoundaryRequest::Count => try_prepare_index_prefix_cardinality_preflight(
            plan,
            true,
            |count_authority, count_plan, count_prefixes| {
                PreparedScalarTerminalPreflight::CountIndexPrefixCardinality {
                    authority: count_authority,
                    logical_plan: count_plan,
                    prefixes: count_prefixes,
                }
            },
        ),
        ScalarTerminalBoundaryRequest::Exists => {
            if !exists_index_prefix_cardinality_preflight_supported(plan.logical_plan()) {
                return None;
            }
            try_prepare_index_prefix_cardinality_preflight(
                plan,
                false,
                |exists_authority, exists_plan, exists_prefixes| {
                    PreparedScalarTerminalPreflight::ExistsIndexPrefixCardinality {
                        authority: exists_authority,
                        logical_plan: exists_plan,
                        prefixes: exists_prefixes,
                    }
                },
            )
        }
        ScalarTerminalBoundaryRequest::IdTerminal { .. }
        | ScalarTerminalBoundaryRequest::IdBySlot { .. }
        | ScalarTerminalBoundaryRequest::NthBySlot { .. }
        | ScalarTerminalBoundaryRequest::MedianBySlot { .. }
        | ScalarTerminalBoundaryRequest::MinMaxBySlot { .. } => None,
    }
}
/// Reports whether an `EXISTS` terminal may use the index-prefix cardinality
/// preflight.
///
/// This requires the plan's access to be a single path that exposes an index
/// multi-lookup contract carrying more than one lookup value.
fn exists_index_prefix_cardinality_preflight_supported(logical_plan: &AccessPlannedQuery) -> bool {
    logical_plan
        .access
        .as_path()
        .and_then(|path| path.as_index_multi_lookup_contract())
        .is_some_and(|(_index, lookup_values)| lookup_values.len() > 1)
}
/// Resolves exact index-prefix cardinality prefixes for `plan` and wraps them
/// in a preflight built by `build`.
///
/// Returns `None` when the plan's index prefix specs cannot be produced, or
/// when `exact_count_cardinality_prefixes_for_plan` declines the plan.
/// `allow_ordered_plan` is passed through to that helper unchanged.
fn try_prepare_index_prefix_cardinality_preflight<'plan>(
    plan: &'plan PreparedAggregatePlan,
    allow_ordered_plan: bool,
    build: impl FnOnce(
        EntityAuthority,
        &'plan AccessPlannedQuery,
        LoweredIndexPrefixCardinalityPlan<'plan>,
    ) -> PreparedScalarTerminalPreflight<'plan>,
) -> Option<PreparedScalarTerminalPreflight<'plan>> {
    let authority = plan.authority();
    let logical_plan = plan.logical_plan();
    let specs = plan.index_prefix_specs().ok()?;
    exact_count_cardinality_prefixes_for_plan(
        authority.entity_tag(),
        logical_plan,
        specs,
        allow_ordered_plan,
    )
    .map(|prefixes| build(authority, logical_plan, prefixes))
}
/// Executes a preflight produced by `try_prepare_scalar_terminal_preflight`,
/// dispatching to the `COUNT` or `EXISTS` index-prefix cardinality executor.
///
/// `Ok(None)` means the cardinality metadata could not answer the terminal.
///
/// # Errors
/// Propagates plan-validation and store-recovery failures from the executor
/// that handles the variant.
pub(super) fn execute_scalar_terminal_preflight<E>(
    executor: &LoadExecutor<E>,
    preflight: PreparedScalarTerminalPreflight<'_>,
) -> Result<Option<ScalarTerminalBoundaryOutput>, InternalError>
where
    E: EntityKind + EntityValue,
{
    match preflight {
        PreparedScalarTerminalPreflight::CountIndexPrefixCardinality {
            authority: count_authority,
            logical_plan: count_plan,
            prefixes: count_prefixes,
        } => execute_count_index_prefix_cardinality_preflight(
            executor,
            count_authority,
            count_plan,
            count_prefixes,
        ),
        PreparedScalarTerminalPreflight::ExistsIndexPrefixCardinality {
            authority: exists_authority,
            logical_plan: exists_plan,
            prefixes: exists_prefixes,
        } => execute_exists_index_prefix_cardinality_preflight(
            executor,
            exists_authority,
            exists_plan,
            exists_prefixes,
        ),
    }
}
/// Answers a prepared `COUNT` preflight from exact index-prefix cardinality.
///
/// The plan is validated against `authority` first. When the metadata yields
/// a count, plan metrics and the cardinality terminal are recorded. Otherwise
/// `Ok(None)` is returned and nothing is recorded.
///
/// # Errors
/// Propagates plan-validation and store-recovery failures.
fn execute_count_index_prefix_cardinality_preflight<E>(
    executor: &LoadExecutor<E>,
    authority: EntityAuthority,
    logical_plan: &AccessPlannedQuery,
    prefixes: LoweredIndexPrefixCardinalityPlan<'_>,
) -> Result<Option<ScalarTerminalBoundaryOutput>, InternalError>
where
    E: EntityKind + EntityValue,
{
    validate_executor_plan_for_authority(&authority, logical_plan)?;
    let store = executor.db.recovered_store(authority.store_path())?;
    let (instructions, outcome) = measure_index_prefix_cardinality_preflight(|| {
        count_index_prefix_cardinality(store, logical_plan.scalar_plan().page.as_ref(), prefixes)
    });
    if let Some(count) = outcome {
        record_plan_metrics(authority.entity_path(), logical_plan);
        record_index_prefix_cardinality_terminal(authority.entity_path(), instructions);
        return Ok(Some(ScalarTerminalBoundaryOutput::Count(count)));
    }
    Ok(None)
}
/// Answers a prepared `EXISTS` preflight from exact index-prefix cardinality.
///
/// The plan is validated against `authority` first. When the metadata decides
/// the answer, plan metrics and the cardinality terminal are recorded.
/// Otherwise `Ok(None)` is returned and nothing is recorded.
///
/// # Errors
/// Propagates plan-validation and store-recovery failures.
fn execute_exists_index_prefix_cardinality_preflight<E>(
    executor: &LoadExecutor<E>,
    authority: EntityAuthority,
    logical_plan: &AccessPlannedQuery,
    prefixes: LoweredIndexPrefixCardinalityPlan<'_>,
) -> Result<Option<ScalarTerminalBoundaryOutput>, InternalError>
where
    E: EntityKind + EntityValue,
{
    validate_executor_plan_for_authority(&authority, logical_plan)?;
    let store = executor.db.recovered_store(authority.store_path())?;
    let (instructions, answer) = measure_index_prefix_cardinality_preflight(|| {
        exists_index_prefix_cardinality(store, logical_plan.scalar_plan().page.as_ref(), prefixes)
    });
    match answer {
        None => Ok(None),
        Some(exists) => {
            record_plan_metrics(authority.entity_path(), logical_plan);
            record_index_prefix_cardinality_terminal(authority.entity_path(), instructions);
            Ok(Some(ScalarTerminalBoundaryOutput::Exists(exists)))
        }
    }
}
/// Counts the rows in the `page` window using exact cardinality metadata for
/// the plan's index prefixes, each truncated to the plan's `prefix_len`.
///
/// Returns `Some(0)` immediately when the window needs no candidate rows.
/// Returns `None` when any spec has fewer components than `prefix_len`, or
/// when the index store cannot provide an exact cardinality sum.
fn count_index_prefix_cardinality(
    store: StoreHandle,
    page: Option<&PageSpec>,
    prefixes: LoweredIndexPrefixCardinalityPlan<'_>,
) -> Option<u32> {
    let stop_after = count_window_required_candidate_rows(page);
    if stop_after == Some(0) {
        return Some(0);
    }
    let prefix_len = prefixes.prefix_len();
    let specs = prefixes.specs();
    // Truncation to `prefix_len` below slices each spec, so every spec must be at least that long.
    let every_spec_long_enough = specs
        .iter()
        .all(|spec| spec.prefix_components().len() >= prefix_len);
    if !every_spec_long_enough {
        return None;
    }
    let generation = store.with_data(DataStore::generation);
    let candidate_rows = index_prefix_cardinality_sum(
        store,
        generation,
        prefixes.index_id(),
        specs
            .iter()
            .map(|spec| &spec.prefix_components()[..prefix_len]),
        stop_after,
    )?;
    let candidate_rows = usize::try_from(candidate_rows).unwrap_or(usize::MAX);
    Some(CountWindowResult::from_candidate_rows(page, candidate_rows).count())
}
/// SQL-path `COUNT` over lowered prefix specs, using each spec's full
/// component list as its index prefix.
///
/// Returns `Some(0)` immediately when the window needs no candidate rows.
/// Returns `None` when `prefixes` is empty, when the specs do not all target
/// the same index, or when the index store cannot provide an exact sum.
#[cfg(feature = "sql")]
fn count_index_prefix_cardinality_specs(
    store: StoreHandle,
    page: Option<&PageSpec>,
    prefixes: &[LoweredIndexPrefixCardinalitySpec],
) -> Option<u32> {
    let stop_after = count_window_required_candidate_rows(page);
    if matches!(stop_after, Some(0)) {
        return Some(0);
    }
    let index_id = common_prefix_cardinality_index_id(prefixes)?;
    let generation = store.with_data(DataStore::generation);
    let component_prefixes = prefixes
        .iter()
        .map(LoweredIndexPrefixCardinalitySpec::prefix_components);
    index_prefix_cardinality_sum(store, generation, index_id, component_prefixes, stop_after)
        .map(|rows| usize::try_from(rows).unwrap_or(usize::MAX))
        .map(|rows| CountWindowResult::from_candidate_rows(page, rows).count())
}
/// Returns the index id shared by every spec in `prefixes`.
///
/// Returns `None` when `prefixes` is empty or when any spec targets a
/// different index than the first one.
#[cfg(feature = "sql")]
fn common_prefix_cardinality_index_id(
    prefixes: &[LoweredIndexPrefixCardinalitySpec],
) -> Option<IndexId> {
    let shared = prefixes.first()?.index_id();
    if prefixes.iter().any(|spec| spec.index_id() != shared) {
        return None;
    }
    Some(shared)
}
/// Returns how many leading candidate rows must be observed to evaluate the
/// `COUNT` window described by `page`, or `None` when the window is unbounded
/// (no page, or no `limit`).
///
/// For a bounded window this is `offset + limit`, saturating at `u64::MAX`.
/// A `limit` of zero makes the window empty regardless of `offset`, so no
/// candidate rows are needed and `Some(0)` is returned. This lets callers
/// short-circuit instead of scanning `offset` rows (or querying cardinality
/// metadata) only to count zero of them. It mirrors the explicit zero-limit
/// handling in `exists_window_required_candidate_rows`.
fn count_window_required_candidate_rows(page: Option<&PageSpec>) -> Option<u64> {
    let page = page?;
    let limit = page.limit?;
    if limit == 0 {
        return Some(0);
    }
    Some(u64::from(page.offset).saturating_add(u64::from(limit)))
}
/// Decides `EXISTS` for the `page` window from exact index-prefix cardinality.
///
/// A zero `limit` answers `Some(false)` without reading the store. Otherwise
/// the window needs `offset + 1` rows (see
/// `exists_window_required_candidate_rows`). That number is passed to the
/// cardinality sum as its `stop_after` hint, and the answer is whether the
/// sum reaches it. Returns `None` when any spec is shorter than the plan's
/// `prefix_len`, or when the index store cannot provide an exact sum.
fn exists_index_prefix_cardinality(
    store: StoreHandle,
    page: Option<&PageSpec>,
    prefixes: LoweredIndexPrefixCardinalityPlan<'_>,
) -> Option<bool> {
    let needed = match exists_window_required_candidate_rows(page) {
        Some(needed) => needed,
        None => return Some(false),
    };
    let prefix_len = prefixes.prefix_len();
    let specs = prefixes.specs();
    // Each spec is sliced to `prefix_len` below; a shorter one cannot be answered here.
    if specs
        .iter()
        .any(|spec| spec.prefix_components().len() < prefix_len)
    {
        return None;
    }
    let generation = store.with_data(DataStore::generation);
    let available = index_prefix_cardinality_sum(
        store,
        generation,
        prefixes.index_id(),
        specs
            .iter()
            .map(|spec| &spec.prefix_components()[..prefix_len]),
        Some(needed),
    )?;
    Some(available >= needed)
}
/// Sums exact cardinalities of user index keys under `index_id` across
/// `component_prefixes`, by delegating to the index store's
/// `exact_prefix_cardinality_sum` at `data_generation`.
///
/// `stop_after` is forwarded unchanged. `None` is returned whenever the index
/// store itself returns `None`.
fn index_prefix_cardinality_sum<'a>(
    store: StoreHandle,
    data_generation: u64,
    index_id: IndexId,
    component_prefixes: impl IntoIterator<Item = &'a [Vec<u8>]>,
    stop_after: Option<u64>,
) -> Option<u64> {
    store.with_index(|index_store| {
        let key_kind = IndexKeyKind::User;
        index_store.exact_prefix_cardinality_sum(
            data_generation,
            key_kind,
            index_id,
            component_prefixes,
            stop_after,
        )
    })
}
fn exists_window_required_candidate_rows(page: Option<&PageSpec>) -> Option<u64> {
match page {
Some(PageSpec { limit: Some(0), .. }) => None,
Some(page) => Some(u64::from(page.offset).saturating_add(1)),
None => Some(1),
}
}
/// Records a terminal answered purely from index-prefix cardinality metadata.
///
/// Reports zero rows scanned for `entity_path`. With the `diagnostics`
/// feature it also forwards `base_row_local_instructions` to the
/// terminal-attribution recorder.
fn record_index_prefix_cardinality_terminal(
    entity_path: &'static str,
    base_row_local_instructions: u64,
) {
    record_rows_scanned_for_path(entity_path, 0);
    #[cfg(feature = "diagnostics")]
    {
        super::terminal_attribution::record_index_prefix_cardinality_terminal_attribution(
            base_row_local_instructions,
        );
    }
    #[cfg(not(feature = "diagnostics"))]
    {
        // Only diagnostics attribution consumes this value; silence the unused-variable lint.
        let _ = base_row_local_instructions;
    }
}
fn aggregate_count_from_pk_cardinality_with_store(
logical_plan: &AccessPlannedQuery,
lowered_access: &LoweredAccess<'_, Value>,
entity_tag: EntityTag,
store: StoreHandle,
) -> Result<(u32, usize), InternalError> {
let page = logical_plan.scalar_plan().page.as_ref();
let Some(path) = lowered_access.executable().as_path() else {
return Err(InternalError::query_executor_invariant());
};
let candidate_rows = match path {
ExecutionPathPayload::FullScan => count_full_entity_candidate_rows(store, entity_tag, page),
ExecutionPathPayload::KeyRange { start, end } => {
let start_raw =
DecodedDataStoreKey::try_from_structural_key(entity_tag, start)?.to_raw()?;
let end_raw =
DecodedDataStoreKey::try_from_structural_key(entity_tag, end)?.to_raw()?;
count_data_range_candidate_rows(store, start_raw, end_raw, page)
}
_ => {
return Err(InternalError::query_executor_invariant());
}
};
let count_window = CountWindowResult::from_candidate_rows(page, candidate_rows.available);
Ok((count_window.count(), candidate_rows.scanned))
}
/// Determines how many rows of `entity_tag` are available to a `COUNT` window.
///
/// The data store's exact per-entity count is preferred (zero rows scanned).
/// Without it, the entity's rows are visited. The visit stops once the
/// window's required candidate rows have been seen, and is skipped entirely
/// when that requirement is zero.
///
/// NOTE(review): any error returned by `visit_entity` is discarded, so the
/// scanned total may be partial in that case. Confirm this best-effort
/// behaviour is intended.
fn count_full_entity_candidate_rows(
    store: StoreHandle,
    entity_tag: EntityTag,
    page: Option<&PageSpec>,
) -> CountCandidateRows {
    let exact = store.with_data(|data| data.exact_entity_count(entity_tag));
    if let Some(available) = exact {
        return CountCandidateRows::metadata(available);
    }
    let scan_limit = count_scan_limit(page);
    store.with_data(|data| {
        if scan_limit == Some(0) {
            return CountCandidateRows::scanned(0);
        }
        let mut seen = 0usize;
        let _: Result<(), InternalError> = data.visit_entity(entity_tag, |_raw_key, _row| {
            seen = seen.saturating_add(1);
            Ok(count_store_visit(seen, scan_limit))
        });
        CountCandidateRows::scanned(seen)
    })
}
/// Counts the rows in the inclusive raw key range `start_raw..=end_raw` that
/// are available to a `COUNT` window.
///
/// The visit stops once the window's required candidate rows have been seen,
/// and is skipped entirely when that requirement is zero.
///
/// NOTE(review): any error returned by `visit_range` is discarded, so the
/// scanned total may be partial in that case. Confirm this best-effort
/// behaviour is intended.
fn count_data_range_candidate_rows(
    store: StoreHandle,
    start_raw: RawDataStoreKey,
    end_raw: RawDataStoreKey,
    page: Option<&PageSpec>,
) -> CountCandidateRows {
    let scan_limit = count_scan_limit(page);
    store.with_data(|data| {
        if scan_limit == Some(0) {
            return CountCandidateRows::scanned(0);
        }
        let bounds = (Bound::Included(start_raw), Bound::Included(end_raw));
        let mut visited = 0usize;
        let _: Result<(), InternalError> = data.visit_range(bounds, |_raw_key, _row| {
            visited = visited.saturating_add(1);
            Ok(count_store_visit(visited, scan_limit))
        });
        CountCandidateRows::scanned(visited)
    })
}
/// Converts the `COUNT` window's required candidate rows into a `usize` scan
/// limit, saturating at `usize::MAX`. `None` means scan without a limit.
fn count_scan_limit(page: Option<&PageSpec>) -> Option<usize> {
    let required = count_window_required_candidate_rows(page)?;
    Some(usize::try_from(required).unwrap_or(usize::MAX))
}
/// Tells a counting store visit whether to keep going.
///
/// Stops once `count` has reached `scan_limit`; never stops when there is no
/// limit.
fn count_store_visit(count: usize, scan_limit: Option<usize>) -> StoreVisit {
    match scan_limit {
        Some(limit) if count >= limit => StoreVisit::Stop,
        _ => StoreVisit::Continue,
    }
}
/// Rows available to a `COUNT` window, and how many rows were physically
/// visited to learn that.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct CountCandidateRows {
    /// Candidate rows the window can draw from.
    available: usize,
    /// Rows visited in the data store (zero when answered from metadata).
    scanned: usize,
}

impl CountCandidateRows {
    /// Builds a result from an exact metadata count; nothing was scanned.
    /// Counts that do not fit in `usize` saturate to `usize::MAX`.
    fn metadata(available: u64) -> Self {
        let available = usize::try_from(available).unwrap_or(usize::MAX);
        Self {
            available,
            scanned: 0,
        }
    }

    /// Builds a result from a scan in which every visited row is available.
    const fn scanned(count: usize) -> Self {
        Self {
            scanned: count,
            available: count,
        }
    }
}
/// Final `COUNT` value after applying a page window to the available rows.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
struct CountWindowResult {
    count: u32,
}

impl CountWindowResult {
    /// Applies `page` (offset first, then limit) to `available_rows`.
    ///
    /// With no page every available row counts. Otherwise `offset` rows are
    /// skipped (never going below zero) and the remainder is capped by
    /// `limit`; a `limit` of zero always yields zero. The final value
    /// saturates at `u32::MAX`.
    fn from_candidate_rows(page: Option<&PageSpec>, available_rows: usize) -> Self {
        let windowed = match page {
            None => available_rows,
            Some(page) => {
                let offset = usize::try_from(page.offset).unwrap_or(usize::MAX);
                let after_offset = available_rows.saturating_sub(offset);
                match page.limit {
                    Some(0) => 0,
                    Some(limit) => {
                        after_offset.min(usize::try_from(limit).unwrap_or(usize::MAX))
                    }
                    None => after_offset,
                }
            }
        };
        Self::new(usize_to_u32_saturating(windowed))
    }

    /// Wraps an already-windowed count.
    const fn new(count: u32) -> Self {
        Self { count }
    }

    /// The windowed count.
    const fn count(self) -> u32 {
        self.count
    }
}
/// Narrows `value` to `u32`, clamping anything larger to `u32::MAX`.
fn usize_to_u32_saturating(value: usize) -> u32 {
    match u32::try_from(value) {
        Ok(narrowed) => narrowed,
        Err(_) => u32::MAX,
    }
}