use crate::{
db::{
cursor::{
ContinuationKeyRef, ContinuationRuntime, IndexScanContinuationInput, LoopAction,
WindowCursorContract,
},
data::{DecodedDataStoreKey, RawDataStoreKey},
direction::Direction,
executor::{
LoweredIndexPrefixSpec, LoweredIndexRangeSpec, LoweredIndexScanContract, LoweredKey,
record_row_check_index_entry_scanned, record_row_check_index_key_owned_entry,
record_row_check_index_row_identity_decoded,
},
index::{
IndexEntryExistenceWitness, IndexEntryRowWitness, IndexEntryValue, IndexKey,
RawIndexStoreKey,
predicate::{IndexPredicateExecution, eval_index_execution_on_decoded_key},
},
registry::StoreHandle,
},
error::InternalError,
types::EntityTag,
};
use std::{ops::Bound, sync::Arc};
type IndexComponentValues = Arc<[Vec<u8>]>;
pub(in crate::db::executor) type IndexComponentRows = Vec<(
DecodedDataStoreKey,
IndexEntryExistenceWitness,
IndexComponentValues,
)>;
pub(in crate::db::executor) struct PrimaryScan;
impl PrimaryScan {
pub(in crate::db::executor) fn decode_data_key(
raw: &RawDataStoreKey,
) -> Result<DecodedDataStoreKey, InternalError> {
DecodedDataStoreKey::try_from_raw(raw).map_err(|err| {
InternalError::identity_corruption(format!("failed to decode data key: {err}"))
})
}
}
pub(in crate::db::executor) struct IndexScan;
pub(in crate::db::executor) struct IndexDecodedKeyScanChunk {
keys: Vec<DecodedDataStoreKey>,
last_raw_key: Option<RawIndexStoreKey>,
}
impl IndexDecodedKeyScanChunk {
#[must_use]
const fn new(keys: Vec<DecodedDataStoreKey>, last_raw_key: Option<RawIndexStoreKey>) -> Self {
Self { keys, last_raw_key }
}
#[must_use]
pub(in crate::db::executor) fn into_parts(
self,
) -> (Vec<DecodedDataStoreKey>, Option<RawIndexStoreKey>) {
(self.keys, self.last_raw_key)
}
}
impl IndexScan {
const LIMITED_SCAN_PREALLOC_CAP: usize = 32;
pub(in crate::db::executor) fn prefix_structural(
store: StoreHandle,
entity_tag: EntityTag,
spec: &LoweredIndexPrefixSpec,
direction: Direction,
limit: usize,
predicate_execution: Option<IndexPredicateExecution<'_>>,
) -> Result<Vec<DecodedDataStoreKey>, InternalError> {
Self::resolve_data_values_in_raw_range_limited(
store,
entity_tag,
spec.lower(),
spec.upper(),
IndexScanContinuationInput::new(None, direction),
limit,
predicate_execution,
)
}
#[expect(clippy::too_many_arguments)]
pub(in crate::db::executor) fn components_structural(
store: StoreHandle,
entity_tag: EntityTag,
index: LoweredIndexScanContract,
lower: &Bound<LoweredKey>,
upper: &Bound<LoweredKey>,
continuation: IndexScanContinuationInput<'_>,
limit: usize,
component_indices: &[usize],
predicate_execution: Option<IndexPredicateExecution<'_>>,
) -> Result<IndexComponentRows, InternalError> {
if limit == 0 {
return Ok(Vec::new());
}
let continuation =
ContinuationRuntime::new(continuation, WindowCursorContract::unbounded());
let bounds = continuation.scan_bounds((lower, upper))?;
let mut out = Vec::with_capacity(limit.min(Self::LIMITED_SCAN_PREALLOC_CAP));
store.with_index(|index_store| {
index_store.visit_raw_entries_in_range(
(&bounds.0, &bounds.1),
continuation.direction(),
|raw_key, value| {
match Self::accept_scan_key(&continuation, raw_key)? {
LoopAction::Skip => return Ok(false),
LoopAction::Emit => {}
LoopAction::Stop => return Ok(true),
}
Self::decode_index_entry_and_push_with_components(
entity_tag,
&index,
raw_key,
value,
&mut out,
Some(limit),
component_indices,
"range resolve",
predicate_execution,
)
},
)
})?;
Ok(out)
}
pub(in crate::db::executor) fn range_structural(
store: StoreHandle,
entity_tag: EntityTag,
spec: &LoweredIndexRangeSpec,
continuation: IndexScanContinuationInput<'_>,
limit: usize,
predicate_execution: Option<IndexPredicateExecution<'_>>,
) -> Result<Vec<DecodedDataStoreKey>, InternalError> {
Self::resolve_data_values_in_raw_range_limited(
store,
entity_tag,
spec.lower(),
spec.upper(),
continuation,
limit,
predicate_execution,
)
}
pub(in crate::db::executor) fn chunk_structural(
store: StoreHandle,
entity_tag: EntityTag,
lower: &Bound<LoweredKey>,
upper: &Bound<LoweredKey>,
continuation: IndexScanContinuationInput<'_>,
max_entries: usize,
output_limit: Option<usize>,
) -> Result<IndexDecodedKeyScanChunk, InternalError> {
Self::resolve_chunk(
store,
entity_tag,
lower,
upper,
continuation,
max_entries,
output_limit,
)
}
fn resolve_data_values_in_raw_range_limited(
store: StoreHandle,
entity_tag: EntityTag,
lower: &Bound<LoweredKey>,
upper: &Bound<LoweredKey>,
continuation: IndexScanContinuationInput<'_>,
limit: usize,
predicate_execution: Option<IndexPredicateExecution<'_>>,
) -> Result<Vec<DecodedDataStoreKey>, InternalError> {
if limit == 0 {
return Ok(Vec::new());
}
let continuation =
ContinuationRuntime::new(continuation, WindowCursorContract::unbounded());
let bounds = continuation.scan_bounds((lower, upper))?;
let mut keys = Vec::with_capacity(limit.min(Self::LIMITED_SCAN_PREALLOC_CAP));
store.with_index(|index_store| {
index_store.visit_raw_entries_in_range(
(&bounds.0, &bounds.1),
continuation.direction(),
|raw_key, value| {
match Self::accept_scan_key(&continuation, raw_key)? {
LoopAction::Skip => return Ok(false),
LoopAction::Emit => {}
LoopAction::Stop => return Ok(true),
}
Self::decode_index_entry_and_push(
entity_tag,
raw_key,
value,
&mut keys,
Some(limit),
"range resolve",
predicate_execution,
)
},
)
})?;
Ok(keys)
}
fn resolve_chunk(
store: StoreHandle,
entity_tag: EntityTag,
lower: &Bound<LoweredKey>,
upper: &Bound<LoweredKey>,
continuation: IndexScanContinuationInput<'_>,
max_entries: usize,
output_limit: Option<usize>,
) -> Result<IndexDecodedKeyScanChunk, InternalError> {
if max_entries == 0 || matches!(output_limit, Some(0)) {
return Ok(IndexDecodedKeyScanChunk::new(Vec::new(), None));
}
let continuation =
ContinuationRuntime::new(continuation, WindowCursorContract::unbounded());
let bounds = continuation.scan_bounds((lower, upper))?;
let mut keys = Vec::with_capacity(max_entries.min(Self::LIMITED_SCAN_PREALLOC_CAP));
let mut last_raw_key = None;
let mut scanned_entries = 0usize;
store.with_index(|index_store| {
index_store.visit_raw_entries_in_range(
(&bounds.0, &bounds.1),
continuation.direction(),
|raw_key, value| {
match Self::accept_scan_key(&continuation, raw_key)? {
LoopAction::Skip => return Ok(false),
LoopAction::Emit => {}
LoopAction::Stop => return Ok(true),
}
last_raw_key = Some(raw_key.clone());
scanned_entries = scanned_entries.saturating_add(1);
if Self::decode_index_entry_and_push(
entity_tag,
raw_key,
value,
&mut keys,
output_limit,
"range stream",
None,
)? {
return Ok(true);
}
Ok(scanned_entries == max_entries)
},
)
})?;
let chunk = IndexDecodedKeyScanChunk::new(keys, last_raw_key);
Ok(chunk)
}
fn accept_scan_key(
continuation: &ContinuationRuntime<'_>,
raw_key: &RawIndexStoreKey,
) -> Result<LoopAction, InternalError> {
continuation.accept_key(ContinuationKeyRef::scan(raw_key))
}
fn decode_index_entry_and_push(
entity: EntityTag,
raw_key: &RawIndexStoreKey,
value: &IndexEntryValue,
out: &mut Vec<DecodedDataStoreKey>,
limit: Option<usize>,
context: &'static str,
index_predicate_execution: Option<IndexPredicateExecution<'_>>,
) -> Result<bool, InternalError> {
record_row_check_index_entry_scanned();
if let Some(execution) = index_predicate_execution {
let decoded_key = IndexKey::try_from_raw(raw_key)
.map_err(|err| InternalError::index_scan_key_corrupted_during(context, err))?;
if !eval_index_execution_on_decoded_key(&decoded_key, execution)? {
return Ok(false);
}
}
let row_witness = value
.decode_row_witness(raw_key)
.map_err(InternalError::index_entry_decode_failed)?;
record_row_check_index_key_owned_entry();
record_row_check_index_row_identity_decoded();
out.push(Self::data_key_from_row_witness(entity, &row_witness));
if let Some(limit) = limit
&& out.len() == limit
{
return Ok(true);
}
Ok(false)
}
#[expect(clippy::too_many_arguments)]
fn decode_index_entry_and_push_with_components(
entity: EntityTag,
index: &LoweredIndexScanContract,
raw_key: &RawIndexStoreKey,
value: &IndexEntryValue,
out: &mut IndexComponentRows,
limit: Option<usize>,
component_indices: &[usize],
context: &'static str,
index_predicate_execution: Option<IndexPredicateExecution<'_>>,
) -> Result<bool, InternalError> {
record_row_check_index_entry_scanned();
let decoded_key = IndexKey::try_from_raw(raw_key)
.map_err(|err| InternalError::index_scan_key_corrupted_during(context, err))?;
let mut components = Vec::with_capacity(component_indices.len());
for component_index in component_indices {
let Some(component) = decoded_key.component(*component_index) else {
return Err(InternalError::index_projection_component_required(
index.name(),
*component_index,
));
};
components.push(component.to_vec());
}
let components: Arc<[Vec<u8>]> = Arc::from(components);
if let Some(execution) = index_predicate_execution
&& !eval_index_execution_on_decoded_key(&decoded_key, execution)?
{
return Ok(false);
}
let row_witness = value
.decode_row_witness(raw_key)
.map_err(InternalError::index_entry_decode_failed)?;
record_row_check_index_key_owned_entry();
record_row_check_index_row_identity_decoded();
out.push((
Self::data_key_from_row_witness(entity, &row_witness),
row_witness.existence_witness(),
components,
));
if let Some(limit) = limit
&& out.len() == limit
{
return Ok(true);
}
Ok(false)
}
const fn data_key_from_row_witness(
entity: EntityTag,
row_witness: &IndexEntryRowWitness,
) -> DecodedDataStoreKey {
DecodedDataStoreKey::new(entity, row_witness.storage_key())
}
}