use std::{
collections::{BTreeMap, BTreeSet},
pin::Pin,
sync::Arc,
time::Instant,
};
use aisle::PruneRequest;
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Schema, SchemaRef};
use fusio::{
DynFs,
executor::{Executor, Mutex, Timer},
};
use fusio_parquet::reader::AsyncReader;
use futures::{Stream, StreamExt, TryStreamExt, stream};
use parquet::{
arrow::{
ProjectionMask,
arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions},
},
errors::ParquetError,
file::metadata::{PageIndexPolicy, ParquetMetaDataReader},
};
use typed_arrow_dyn::DynRow;
use crate::{
db::DbInner,
extractor::{KeyExtractError, KeyProjection, projection_for_columns},
inmem::{
immutable::{self, ImmutableSegment, memtable::ImmutableVisibleEntry},
mutable::memtable::DynRowScanEntry,
},
key::{KeyOwned, KeyRow},
mode::fingerprint_schema,
mutation::DynMutation,
mvcc::{MVCC_COMMIT_COL, Timestamp},
ondisk::{
bloom::{BatchedAsyncReader, BloomFilterCache, SstBloomFilterProvider},
metadata::ParquetMetadataCache,
scan::{DeleteStreamWithExtractor, SstableScan, UnpinExec},
sstable::{
ParquetStreamOptions, SsTableError, open_parquet_stream_with_metadata,
split_predicate_for_row_filter, storage_path_from_manifest, validate_page_indexes,
},
},
query::{
Expr, ScalarValue,
scan::{
DeleteSelection, RowSet, ScanPlan, ScanSelection, SstScanSelection, SstSelection,
key_bounds_for_predicate, key_range_for_predicate, next_prefix_string,
projection_with_predicate,
},
stream::{
Order, OwnedImmutableScan, OwnedMutableScan, ScanStream, merge::MergeStream,
package::PackageStream,
},
},
transaction::{Snapshot as TxSnapshot, TransactionScan},
};
/// Returns the Parquet metadata for `path`, consulting `cache` before touching storage.
///
/// On a cache miss the file is opened through `fs`, its metadata is loaded with
/// `PageIndexPolicy::Optional`, checked by `validate_page_indexes`, and stored in
/// the cache before being handed back.
///
/// # Errors
/// Filesystem failures are reported as `SsTableError::Fs`, Parquet decoding
/// failures as `SsTableError::Parquet`; an error from `validate_page_indexes` is
/// propagated unchanged.
async fn parquet_metadata_cached<E>(
    fs: Arc<dyn DynFs>,
    path: &fusio::path::Path,
    cache: Arc<E::Mutex<ParquetMetadataCache>>,
    executor: E,
) -> Result<Arc<parquet::file::metadata::ParquetMetaData>, SsTableError>
where
    E: Executor + Clone + 'static,
{
    // The guard is a temporary, so the cache lock is released before any I/O starts.
    let cached = cache.lock().await.get(path);
    if let Some(hit) = cached {
        return Ok(hit);
    }

    let file = fs.open(path).await.map_err(SsTableError::Fs)?;
    let file_len = file.size().await.map_err(SsTableError::Fs)?;
    let mut reader = AsyncReader::new(file, file_len, UnpinExec(executor))
        .await
        .map_err(SsTableError::Fs)?;

    let loader = ParquetMetaDataReader::new().with_page_index_policy(PageIndexPolicy::Optional);
    let parsed = loader
        .load_and_finish(&mut reader, file_len)
        .await
        .map_err(SsTableError::Parquet)?;
    validate_page_indexes(path, &parsed)?;

    let shared = Arc::new(parsed);
    cache.lock().await.insert(path, Arc::clone(&shared));
    Ok(shared)
}
/// Looks up, or builds and caches, the `aisle::Pruner` for `schema`.
///
/// Pruners are keyed by the schema fingerprint. The cache lock is held for the
/// whole lookup-or-insert sequence. Returns `None` when `aisle::Pruner::try_new`
/// fails for this schema; nothing is cached in that case.
async fn pruner_for_schema<E>(
    cache: Arc<E::Mutex<crate::db::PrunerCache>>,
    schema: &SchemaRef,
) -> Option<Arc<aisle::Pruner>>
where
    E: Executor + Clone + 'static,
{
    let key = fingerprint_schema(schema);
    let mut entries = cache.lock().await;
    let fresh = match entries.get(&key) {
        Some(existing) => return Some(Arc::clone(existing)),
        None => match aisle::Pruner::try_new(Arc::clone(schema)) {
            Ok(built) => Arc::new(built),
            Err(_) => return None,
        },
    };
    entries.insert(key, Arc::clone(&fresh));
    Some(fresh)
}
/// Shared handles used while pruning SST files during scan planning.
struct PruneContext<E>
where
E: Executor + Clone + 'static,
{
/// Filesystem used to open SST data files and delete sidecars.
fs: Arc<dyn DynFs>,
/// Cache of parsed Parquet metadata, looked up by file path.
metadata_cache: Arc<E::Mutex<ParquetMetadataCache>>,
/// Bloom filter cache handed to `SstBloomFilterProvider`.
bloom_cache: Arc<E::Mutex<BloomFilterCache>>,
/// Cache of `aisle::Pruner` instances, looked up by schema fingerprint.
pruner_cache: Arc<E::Mutex<crate::db::PrunerCache>>,
/// Executor cloned into the readers created during pruning.
executor: E,
}
/// Row count passed as the first argument to `PackageStream::with_limit` when
/// packaging scan output (presumably the number of rows per emitted batch).
pub const DEFAULT_SCAN_BATCH_ROWS: usize = 1024;
/// Elapsed-time measurements, in nanoseconds, for the setup phases of a scan.
///
/// Every field defaults to zero; a phase that is not executed leaves its field at zero.
#[derive(Debug, Clone, Copy, Default)]
pub struct ScanSetupProfile {
/// Time spent in `begin_snapshot` when the builder has to create a snapshot itself.
snapshot_ns: u64,
/// Time spent planning the scan.
plan_scan_ns: u64,
/// Time spent in `build_scan_streams`.
build_scan_streams_ns: u64,
/// Time spent constructing the `MergeStream`.
merge_init_ns: u64,
/// Time spent constructing the `PackageStream`.
package_init_ns: u64,
}
/// Read-only accessors for the recorded phase timings.
impl ScanSetupProfile {
/// Nanoseconds recorded for snapshot creation.
pub fn snapshot_ns(&self) -> u64 {
self.snapshot_ns
}
/// Nanoseconds recorded for scan planning.
pub fn plan_scan_ns(&self) -> u64 {
self.plan_scan_ns
}
/// Nanoseconds recorded for building the per-source scan streams.
pub fn build_scan_streams_ns(&self) -> u64 {
self.build_scan_streams_ns
}
/// Nanoseconds recorded for merge-stream initialization.
pub fn merge_init_ns(&self) -> u64 {
self.merge_init_ns
}
/// Nanoseconds recorded for package-stream initialization.
pub fn package_init_ns(&self) -> u64 {
self.package_init_ns
}
}
/// Scan planning on top of a transaction snapshot.
impl TxSnapshot {
/// Plans a scan of `db` as seen by this snapshot.
///
/// Thin wrapper over `plan_scan_with_context` with `force_predicate_columns`
/// set to `false`.
pub(crate) async fn plan_scan<FS, E>(
&self,
db: &DbInner<FS, E>,
predicate: &Expr,
projected_schema: Option<&SchemaRef>,
limit: Option<usize>,
) -> Result<ScanPlan, crate::db::DBError>
where
FS: crate::manifest::ManifestFs<E>,
E: Executor + Timer + Clone,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
self.plan_scan_with_context(db, predicate, projected_schema, limit, false)
.await
}
/// Builds a `ScanPlan` covering the mutable memtable, the sealed immutable
/// segments, and the SST files of the snapshot's latest version.
///
/// * `predicate` - filter to apply; `Expr::True` means no filtering.
/// * `projected_schema` - columns the caller wants back; `None` selects the table schema.
/// * `limit` - stored on the plan unchanged.
/// * `force_predicate_columns` - when `true`, the scan schema always includes the
///   predicate columns and (unless the predicate is `Expr::True`) the whole
///   predicate is added to the residual.
///
/// # Errors
/// Returns `DBError::UnsupportedPredicate` when the predicate contains a
/// bloom-filter expression, and propagates errors from projection building and
/// SST pruning.
async fn plan_scan_with_context<FS, E>(
&self,
db: &DbInner<FS, E>,
predicate: &Expr,
projected_schema: Option<&SchemaRef>,
limit: Option<usize>,
force_predicate_columns: bool,
) -> Result<ScanPlan, crate::db::DBError>
where
FS: crate::manifest::ManifestFs<E>,
E: Executor + Timer + Clone,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
// Bloom-filter predicates are rejected up front.
if let Some(column) = find_bloom_filter_column(predicate) {
return Err(crate::db::DBError::UnsupportedPredicate {
reason: format!(
"bloom filter predicates are not supported yet (column '{column}')"
),
});
}
let projected_schema = projected_schema.cloned();
// Without an explicit projection the full table schema is returned.
let projection_schema = projected_schema.as_ref().unwrap_or(&db.schema);
// Schema computed from the projection and the predicate by `projection_with_predicate`.
let predicate_scan_schema =
projection_with_predicate(&db.schema, projection_schema, Some(predicate))?;
let split = split_predicate_for_row_filter(predicate, &predicate_scan_schema);
let mut pushdown_predicate = split.pushdown;
let mut residual_predicate = split.residual;
let read_ts = self.read_view().read_ts();
let key_schema = db.extractor().key_schema();
let key_bounds = key_bounds_for_predicate(predicate, &key_schema);
let key_range = key_range_for_predicate(predicate, &key_schema);
// Prune sealed immutable segments while holding the seal-state lock; each kept
// segment gets a row set spanning all of its entries.
let (immutable_indexes, immutable_row_sets) = {
let seal = db.seal_state_lock();
let prune_input: Vec<&ImmutableSegment> =
seal.immutables.iter().map(|arc| arc.as_ref()).collect();
let indexes = immutable::prune_segments(&prune_input, key_bounds.as_ref(), read_ts);
let row_sets = indexes
.iter()
.filter_map(|idx| seal.immutables.get(*idx))
.map(|segment| RowSet::all(segment.entry_count()))
.collect();
(indexes, row_sets)
};
// The mutable memtable is skipped when its key bounds do not overlap the
// predicate's bounds, or when the first value of `commit_ts_bounds` is greater
// than `read_ts`.
let mutable_row_set = {
let row_count = db.mem.row_count();
if row_count == 0 {
RowSet::all(0)
} else {
let (min_key, max_key) = db.mem.key_bounds();
let mut keep = true;
if let (Some(min_key), Some(max_key)) = (min_key.as_ref(), max_key.as_ref())
&& let Some(bounds) = key_bounds.as_ref()
&& !bounds.overlaps(min_key, max_key)
{
keep = false;
}
if keep
&& let Some((min_commit_ts, _)) = db.mem.commit_ts_bounds()
&& min_commit_ts > read_ts
{
keep = false;
}
if keep {
RowSet::all(row_count)
} else {
RowSet::none(row_count)
}
}
};
let fs = Arc::clone(&db.fs);
let executor: E = (**db.executor()).clone();
let prune_ctx = PruneContext {
fs: Arc::clone(&fs),
metadata_cache: db.metadata_cache(),
bloom_cache: db.bloom_cache(),
pruner_cache: db.pruner_cache(),
executor: executor.clone(),
};
// Residual for the in-memory sources: the whole predicate when predicate columns
// are forced, otherwise whatever `split_predicate_for_non_sst` leaves as residual.
let has_non_sst = !mutable_row_set.is_empty() || !immutable_indexes.is_empty();
let non_sst_residual = if force_predicate_columns && !matches!(predicate, Expr::True) {
Some(predicate.clone())
} else if has_non_sst {
split_predicate_for_non_sst(predicate, &key_schema).residual
} else {
None
};
residual_predicate = combine_predicates_with_and(residual_predicate, non_sst_residual);
let mut sst_selections = Vec::new();
let mut sst_requires_full_residual = false;
// Walk every SST entry of the latest version (if there is one).
for entry in self
.table_snapshot()
.latest_version
.as_ref()
.map(|v| v.ssts())
.unwrap_or(&[])
.iter()
.flatten()
{
// Key-range pruning: skip when the bounds are empty or miss the file's key stats.
if let Some(bounds) = key_bounds.as_ref() {
if bounds.is_empty() {
continue;
}
if let Some(stats) = entry.stats()
&& let (Some(min_key), Some(max_key)) =
(stats.min_key.as_ref(), stats.max_key.as_ref())
&& !bounds.overlaps(min_key, max_key)
{
continue;
}
}
// Visibility pruning: skip files whose `min_commit_ts` stat is after `read_ts`.
if let Some(min_commit_ts) = entry.stats().and_then(|stats| stats.min_commit_ts)
&& min_commit_ts > read_ts
{
continue;
}
let (mut selection, requires_residual) = prune_sst_selection(
&prune_ctx,
&storage_path_from_manifest(&db.sst_root, entry.data_path()),
predicate,
read_ts,
&predicate_scan_schema,
&key_schema,
)
.await?;
// A file with no surviving rows is still kept when it has a delete sidecar.
let has_delete_sidecar = entry.delete_path().is_some();
if selection.row_set.is_empty() && !has_delete_sidecar {
continue;
}
if requires_residual {
sst_requires_full_residual = true;
}
if let Some(delete_path) = entry.delete_path() {
let delete_selection = plan_delete_sidecar_selection(
Arc::clone(&prune_ctx.fs),
&storage_path_from_manifest(&db.sst_root, delete_path),
&key_schema,
Arc::clone(&prune_ctx.metadata_cache),
prune_ctx.executor.clone(),
)
.await?;
selection.delete_selection = Some(delete_selection);
}
sst_selections.push(SstScanSelection {
entry: entry.clone(),
selection: ScanSelection::Sst(selection),
});
}
// If any selected SST reported a residual, the whole predicate becomes the residual.
if sst_requires_full_residual && !matches!(predicate, Expr::True) {
residual_predicate = Some(predicate.clone());
}
// With no SST selected there is nothing to push a row filter into.
if sst_selections.is_empty() {
pushdown_predicate = None;
}
// Predicate columns are only read when forced or when a residual remains.
let needs_predicate_columns = force_predicate_columns || residual_predicate.is_some();
let scan_schema = if needs_predicate_columns {
Arc::clone(&predicate_scan_schema)
} else {
Arc::clone(projection_schema)
};
// The SST selections were projected for `predicate_scan_schema`; recompute the
// projections when the scan schema chosen above differs.
if !needs_predicate_columns && scan_schema.as_ref() != predicate_scan_schema.as_ref() {
for sst in &mut sst_selections {
if let ScanSelection::Sst(selection) = &mut sst.selection {
reproject_sst_selection(selection, &scan_schema, &key_schema)?;
}
}
}
Ok(ScanPlan {
pushdown_predicate,
immutable_indexes,
mutable_row_set,
immutable_row_sets,
mutable_selection: key_range
.clone()
.map(ScanSelection::KeyRange)
.unwrap_or(ScanSelection::AllRows),
immutable_selection: key_range
.map(ScanSelection::KeyRange)
.unwrap_or(ScanSelection::AllRows),
sst_selections,
residual_predicate,
projected_schema,
scan_schema,
limit,
read_ts,
_snapshot: self.table_snapshot().clone(),
})
}
}
async fn prune_sst_selection<E>(
ctx: &PruneContext<E>,
data_path: &fusio::path::Path,
predicate: &Expr,
read_ts: Timestamp,
scan_schema: &SchemaRef,
key_schema: &SchemaRef,
) -> Result<(SstSelection, bool), SsTableError>
where
E: Executor + Clone + 'static,
{
let metadata = parquet_metadata_cached(
Arc::clone(&ctx.fs),
data_path,
Arc::clone(&ctx.metadata_cache),
ctx.executor.clone(),
)
.await?;
let options = ArrowReaderOptions::new().with_page_index(true);
let arrow_metadata = ArrowReaderMetadata::try_new(Arc::clone(&metadata), options)
.map_err(SsTableError::Parquet)?;
let schema = arrow_metadata.schema();
let requires_residual = split_predicate_for_row_filter(predicate, schema)
.residual
.is_some();
let commit_predicate = Expr::lt_eq(MVCC_COMMIT_COL, ScalarValue::UInt64(Some(read_ts.get())));
let prune_predicate = if matches!(predicate, Expr::True) {
commit_predicate
} else {
Expr::and(vec![predicate.clone(), commit_predicate])
};
let bloom_file = ctx.fs.open(data_path).await.map_err(SsTableError::Fs)?;
let size = bloom_file.size().await.map_err(SsTableError::Fs)?;
let reader = BatchedAsyncReader::new(bloom_file, size, ctx.executor.clone())
.await
.map_err(SsTableError::Fs)?;
let mut provider = SstBloomFilterProvider::new(
data_path.clone(),
Arc::clone(&metadata),
reader,
Arc::clone(&ctx.bloom_cache),
);
let prune_result =
if let Some(pruner) = pruner_for_schema::<E>(Arc::clone(&ctx.pruner_cache), schema).await {
pruner
.prune_ir_with_bloom_provider(
metadata.as_ref(),
std::slice::from_ref(&prune_predicate),
&mut provider,
)
.await
} else {
PruneRequest::new(metadata.as_ref(), schema.as_ref())
.with_predicate(&prune_predicate)
.enable_page_index(true)
.enable_bloom_filter(true)
.prune_async(&mut provider)
.await
};
let mut row_groups = prune_result.row_groups().to_vec();
row_groups.sort_unstable();
row_groups.dedup();
let selected_row_groups_rows = if row_groups.is_empty() {
0usize
} else {
row_groups
.iter()
.map(|idx| {
usize::try_from(metadata.row_group(*idx).num_rows()).map_err(|_| {
SsTableError::Parquet(ParquetError::General(
"parquet row group count exceeds usize::MAX".to_string(),
))
})
})
.collect::<Result<Vec<_>, _>>()?
.into_iter()
.sum::<usize>()
};
let total_rows = selected_row_groups_rows;
let row_set = match prune_result.row_selection().cloned() {
Some(selection) => RowSet::from_row_selection(total_rows, selection).map_err(|err| {
SsTableError::RowSelection {
reason: err.to_string(),
}
})?,
None => RowSet::all(total_rows),
};
let total_row_groups = metadata.num_row_groups();
let row_groups = if row_groups.len() == total_row_groups {
None
} else {
Some(row_groups)
};
let (projection, projected_schema) =
sst_projection_from_metadata(&arrow_metadata, scan_schema, key_schema)?;
Ok((
SstSelection {
row_groups,
row_set,
metadata,
projection,
projected_schema,
delete_selection: None,
},
requires_residual,
))
}
/// Computes the Parquet projection for one SST file.
///
/// The projected columns are the union of the `scan_schema` fields, the
/// `key_schema` fields, and the MVCC commit column, taken in the file's own
/// column order. Returns the root-column mask together with the Arrow schema made
/// of the projected fields.
///
/// # Errors
/// Returns `KeyExtractError::NoSuchField` (converted into `SsTableError`) when a
/// required column is not present in the file schema.
fn sst_projection_from_metadata(
    arrow_metadata: &ArrowReaderMetadata,
    scan_schema: &SchemaRef,
    key_schema: &SchemaRef,
) -> Result<(ProjectionMask, SchemaRef), SsTableError> {
    // Names still to be located in the file schema.
    let mut outstanding: BTreeSet<String> = scan_schema
        .fields()
        .iter()
        .chain(key_schema.fields().iter())
        .map(|field| field.name().to_string())
        .collect();
    outstanding.insert(MVCC_COMMIT_COL.to_string());

    // Each name is claimed by the first file column that carries it.
    let (root_indices, projected_fields): (Vec<usize>, Vec<_>) = arrow_metadata
        .schema()
        .fields()
        .iter()
        .enumerate()
        .filter(|(_, field)| outstanding.remove(field.name()))
        .map(|(idx, field)| (idx, field.clone()))
        .unzip();

    if let Some(missing) = outstanding.first() {
        return Err(KeyExtractError::NoSuchField {
            name: missing.to_string(),
        }
        .into());
    }

    let projection = ProjectionMask::roots(arrow_metadata.parquet_schema(), root_indices);
    Ok((projection, Arc::new(Schema::new(projected_fields))))
}
/// Recomputes the projection mask and projected schema of `selection` for a new
/// `scan_schema`, reusing the Parquet metadata already stored on the selection.
///
/// # Errors
/// Propagates failures from building the Arrow reader metadata and from
/// `sst_projection_from_metadata`; `selection` is left untouched on error.
fn reproject_sst_selection(
    selection: &mut SstSelection,
    scan_schema: &SchemaRef,
    key_schema: &SchemaRef,
) -> Result<(), SsTableError> {
    let arrow_metadata = ArrowReaderMetadata::try_new(
        Arc::clone(&selection.metadata),
        ArrowReaderOptions::new().with_page_index(true),
    )
    .map_err(SsTableError::Parquet)?;

    sst_projection_from_metadata(&arrow_metadata, scan_schema, key_schema).map(
        |(projection, projected_schema)| {
            selection.projection = projection;
            selection.projected_schema = projected_schema;
        },
    )
}
/// Maps every field of `target_schema` to the position of the first same-named
/// field in `base_schema`, preserving the target's field order.
///
/// # Errors
/// Returns `KeyExtractError::NoSuchField` for the first target field that has no
/// counterpart in `base_schema`.
fn schema_projection_indices(
    base_schema: &SchemaRef,
    target_schema: &SchemaRef,
) -> Result<Vec<usize>, KeyExtractError> {
    target_schema
        .fields()
        .iter()
        .map(|wanted| {
            base_schema
                .fields()
                .iter()
                .position(|candidate| candidate.name() == wanted.name())
                .ok_or_else(|| KeyExtractError::NoSuchField {
                    name: wanted.name().to_string(),
                })
        })
        .collect()
}
/// Plans how to read a delete sidecar file: loads its metadata (through the cache)
/// and builds a projection containing the key columns plus the MVCC commit column.
///
/// # Errors
/// Propagates metadata loading failures, and returns `KeyExtractError::NoSuchField`
/// (converted into `SsTableError`) when the sidecar lacks a required column.
async fn plan_delete_sidecar_selection<E>(
    fs: Arc<dyn DynFs>,
    delete_path: &fusio::path::Path,
    key_schema: &SchemaRef,
    metadata_cache: Arc<E::Mutex<ParquetMetadataCache>>,
    executor: E,
) -> Result<DeleteSelection, SsTableError>
where
    E: Executor + Clone + 'static,
{
    let metadata = parquet_metadata_cached(fs, delete_path, metadata_cache, executor).await?;
    let arrow_metadata = ArrowReaderMetadata::try_new(
        Arc::clone(&metadata),
        ArrowReaderOptions::new().with_page_index(true),
    )
    .map_err(SsTableError::Parquet)?;

    // Names still to be located in the sidecar's schema.
    let mut outstanding: BTreeSet<String> = key_schema
        .fields()
        .iter()
        .map(|field| field.name().to_string())
        .collect();
    outstanding.insert(MVCC_COMMIT_COL.to_string());

    let root_indices: Vec<usize> = arrow_metadata
        .schema()
        .fields()
        .iter()
        .enumerate()
        .filter_map(|(idx, field)| outstanding.remove(field.name()).then_some(idx))
        .collect();

    if let Some(missing) = outstanding.first() {
        return Err(KeyExtractError::NoSuchField {
            name: missing.to_string(),
        }
        .into());
    }

    let projection = ProjectionMask::roots(arrow_metadata.parquet_schema(), root_indices);
    Ok(DeleteSelection {
        metadata,
        projection,
    })
}
/// Scan execution helpers on the database core.
impl<FS, E> DbInner<FS, E>
where
FS: crate::manifest::ManifestFs<E>,
E: Executor + Timer + Clone,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
/// Test-only: executes an already-built `plan` (without any transaction overlay)
/// and returns the resulting stream of record batches.
#[cfg(test)]
pub(crate) async fn execute_scan<'a>(
&'a self,
plan: ScanPlan,
) -> Result<impl Stream<Item = Result<RecordBatch, crate::db::DBError>> + 'a, crate::db::DBError>
{
// Schema of the batches handed back: the requested projection or the table schema.
let result_projection = plan
.projected_schema
.clone()
.unwrap_or_else(|| Arc::clone(&self.schema));
let scan_schema = Arc::clone(&plan.scan_schema);
let streams = self.build_scan_streams(&plan, None).await?;
// No source streams at all: return an empty stream.
if streams.is_empty() {
let stream = stream::empty::<Result<RecordBatch, crate::db::DBError>>();
return Ok(Box::pin(stream)
as Pin<
Box<dyn Stream<Item = Result<RecordBatch, crate::db::DBError>> + 'a>,
>);
}
let ScanPlan {
residual_predicate,
limit,
..
} = plan;
// With a residual predicate the limit is handed to the packaging stage instead
// of the merge stage (presumably so rows rejected by the filter do not count
// against the limit).
let needs_post_filter = residual_predicate.is_some();
let limit_for_merge = if needs_post_filter { None } else { limit };
let limit_for_package = if needs_post_filter { limit } else { None };
let merge = MergeStream::from_vec(streams, limit_for_merge, Some(Order::Asc))
.await
.map_err(crate::db::DBError::from)?;
let package = PackageStream::with_limit(
DEFAULT_SCAN_BATCH_ROWS,
merge,
Arc::clone(&scan_schema),
Arc::clone(&result_projection),
residual_predicate,
limit_for_package,
)
.map_err(crate::db::DBError::from)?;
let mapped = package.map(|result| result.map_err(crate::db::DBError::from));
Ok(Box::pin(mapped)
as Pin<
Box<dyn Stream<Item = Result<RecordBatch, crate::db::DBError>> + 'a>,
>)
}
/// Builds one `ScanStream` per data source selected by `plan`.
///
/// Streams are pushed in this order: the optional transaction overlay, the
/// mutable memtable (when its row set is non-empty), each selected immutable
/// segment, then each selected SST.
///
/// # Errors
/// Returns `SsTableError::InvalidScanSelection` when an SST entry carries a
/// non-SST selection or its delete-sidecar selection does not match the entry,
/// and propagates errors from opening the underlying scans and Parquet streams.
pub(crate) async fn build_scan_streams<'a>(
&'a self,
plan: &ScanPlan,
txn_scan: Option<TransactionScan<'a>>,
) -> Result<Vec<ScanStream<'a, E>>, crate::db::DBError> {
let mut streams = Vec::new();
if let Some(txn_scan) = txn_scan {
streams.push(ScanStream::from(txn_scan));
}
let scan_schema = Arc::clone(&plan.scan_schema);
let key_schema = self.extractor().key_schema();
// Mutable memtable: full scan or key-range scan depending on the planned selection.
if !plan.mutable_row_set.is_empty() {
let mutable_scan = match &plan.mutable_selection {
ScanSelection::AllRows | ScanSelection::Sst(_) => OwnedMutableScan::from_guard(
self.mem.read(),
Some(Arc::clone(&scan_schema)),
plan.read_ts,
)?,
ScanSelection::KeyRange(range) => OwnedMutableScan::from_guard_range(
self.mem.read(),
Some(Arc::clone(&scan_schema)),
plan.read_ts,
range.start.clone(),
range.end.clone(),
)?,
};
streams.push(ScanStream::from(mutable_scan));
}
// Clone the selected immutable segments under the seal-state lock; indexes that
// no longer resolve are skipped.
let immutables: Vec<Arc<ImmutableSegment>> = {
let seal = self.seal_state_lock();
plan.immutable_indexes
.iter()
.filter_map(|idx| seal.immutables.get(*idx).cloned())
.collect()
};
for segment in immutables {
let owned = match &plan.immutable_selection {
ScanSelection::AllRows | ScanSelection::Sst(_) => OwnedImmutableScan::from_arc(
Arc::clone(&segment),
Some(Arc::clone(&scan_schema)),
plan.read_ts,
)?,
ScanSelection::KeyRange(range) => OwnedImmutableScan::from_arc_range(
Arc::clone(&segment),
Some(Arc::clone(&scan_schema)),
plan.read_ts,
range.start.clone(),
range.end.clone(),
)?,
};
streams.push(ScanStream::from(owned));
}
for sst in plan.sst_selections() {
// SST entries must carry an `Sst` selection; anything else is a planning error.
let selection = match &sst.selection {
ScanSelection::Sst(selection) => selection,
ScanSelection::AllRows => {
return Err(crate::db::DBError::SsTable(
SsTableError::InvalidScanSelection {
selection: "AllRows",
},
));
}
ScanSelection::KeyRange(_) => {
return Err(crate::db::DBError::SsTable(
SsTableError::InvalidScanSelection {
selection: "KeyRange",
},
));
}
};
let data_path = storage_path_from_manifest(&self.sst_root, sst.entry.data_path());
let executor: E = (**self.executor()).clone();
let projected_schema = Arc::clone(&selection.projected_schema);
// Positions of the scan-schema and key columns within the file's projected schema.
let projection_indices = schema_projection_indices(&projected_schema, &scan_schema)?;
let key_indices = schema_projection_indices(&projected_schema, &key_schema)?;
let data_extractor: Arc<dyn KeyProjection> =
projection_for_columns(projected_schema, key_indices)?.into();
let options = ParquetStreamOptions {
projection: Some(selection.projection.clone()),
row_groups: selection.row_groups.clone(),
row_selection: selection.row_set.to_row_selection(),
row_filter_predicate: plan.pushdown_predicate.as_ref(),
};
let data_stream = open_parquet_stream_with_metadata(
Arc::clone(&self.fs),
data_path,
Arc::clone(&selection.metadata),
options,
executor.clone(),
)
.await
.map_err(crate::db::DBError::SsTable)?;
// The delete sidecar is read in full: no row-group, row, or predicate filtering.
let delete_stream_with_extractor = if let Some(delete_path) = sst.entry.delete_path() {
let delete_selection = selection.delete_selection.as_ref().ok_or_else(|| {
crate::db::DBError::SsTable(SsTableError::InvalidScanSelection {
selection: "missing delete sidecar selection",
})
})?;
let delete_path = storage_path_from_manifest(&self.sst_root, delete_path);
let options = ParquetStreamOptions {
projection: Some(delete_selection.projection.clone()),
row_groups: None,
row_selection: None,
row_filter_predicate: None,
};
let stream = open_parquet_stream_with_metadata(
Arc::clone(&self.fs),
delete_path,
Arc::clone(&delete_selection.metadata),
options,
executor.clone(),
)
.await
.map_err(crate::db::DBError::SsTable)?;
Some(DeleteStreamWithExtractor {
stream,
extractor: Arc::clone(self.delete_extractor()),
})
} else {
// A planned sidecar selection for an entry without a sidecar is inconsistent.
if selection.delete_selection.is_some() {
return Err(crate::db::DBError::SsTable(
SsTableError::InvalidScanSelection {
selection: "unexpected delete sidecar selection",
},
));
}
None
};
let sstable_scan = SstableScan::new(
data_stream,
delete_stream_with_extractor,
data_extractor,
projection_indices,
Some(Order::Asc),
plan.read_ts,
);
streams.push(ScanStream::from(sstable_scan));
}
Ok(streams)
}
/// Collects the rows visible at `read_ts` from every sealed immutable segment,
/// skipping tombstones.
///
/// # Errors
/// Returns the first scan error, or an Arrow `ComputeError` wrapping a failure to
/// convert a row into its owned form.
pub(crate) fn scan_immutable_rows_at(
&self,
read_ts: Timestamp,
) -> Result<Vec<(KeyRow, DynRow)>, KeyExtractError> {
let mut rows = Vec::new();
// Clone the segment list so the seal-state lock is not held while scanning.
let segments = self.seal_state_lock().immutables.clone();
for segment in segments {
let scan = segment.scan_visible(None, read_ts)?;
for result in scan {
match result {
Ok(ImmutableVisibleEntry::Row(key_view, row_raw)) => {
let row = row_raw.into_owned().map_err(|err| {
KeyExtractError::Arrow(arrow_schema::ArrowError::ComputeError(
err.to_string(),
))
})?;
let (key_row, _) = key_view.into_parts();
rows.push((key_row, row));
}
Ok(ImmutableVisibleEntry::Tombstone(_)) => {}
Err(err) => return Err(KeyExtractError::from(err)),
}
}
}
Ok(rows)
}
/// Collects the rows visible at `read_ts` from the mutable memtable, skipping
/// tombstones.
///
/// # Errors
/// Returns the first scan error, or an Arrow `ComputeError` wrapping a failure to
/// convert a row into its owned form.
pub(crate) fn scan_mutable_rows_at(
&self,
read_ts: Timestamp,
) -> Result<Vec<(KeyRow, DynRow)>, KeyExtractError> {
let mut rows = Vec::new();
let mem = self.mem.read();
let scan = mem.scan_visible(None, read_ts)?;
for entry in scan {
match entry {
Ok(DynRowScanEntry::Row(key_view, row_raw)) => {
let row = row_raw.into_owned().map_err(|err| {
KeyExtractError::Arrow(arrow_schema::ArrowError::ComputeError(
err.to_string(),
))
})?;
let (key_row, _) = key_view.into_parts();
rows.push((key_row, row));
}
Ok(DynRowScanEntry::Tombstone(_)) => {}
Err(err) => return Err(KeyExtractError::from(err)),
}
}
Ok(rows)
}
/// Test-only: starts a scan builder that takes its snapshot lazily.
#[cfg(test)]
pub fn scan(&self) -> ScanBuilder<'_, FS, E> {
ScanBuilder::new(self)
}
}
/// Where a `ScanBuilder` gets the snapshot it reads through.
pub(crate) enum SnapshotSource<'a> {
/// A snapshot is taken with `begin_snapshot` when the stream is built.
Lazy,
/// The scan uses a clone of this existing snapshot.
Preexisting(&'a TxSnapshot),
}
/// Borrowed view of a transaction's staged mutations, overlaid on a scan.
pub(crate) struct StagedOverlay<'a> {
/// Staged mutations keyed by owned key; the overlay is ignored when this map is empty.
pub(crate) staging: &'a BTreeMap<KeyOwned, DynMutation<DynRow, ()>>,
/// Schema passed to `TransactionScan::new` together with `staging`.
pub(crate) schema: &'a SchemaRef,
}
/// Builder for a scan over a database, optionally filtered, projected, and limited.
pub struct ScanBuilder<'a, FS, E>
where
FS: crate::manifest::ManifestFs<E>,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
/// Database core the scan runs against.
db: &'a DbInner<FS, E>,
/// Whether to take a fresh snapshot or reuse an existing one.
snapshot_source: SnapshotSource<'a>,
/// Staged transaction mutations to overlay on the scan, if any.
staged_overlay: Option<StagedOverlay<'a>>,
/// Filter predicate; `None` is treated as `Expr::True`.
predicate: Option<Expr>,
/// Requested output schema; `None` leaves the projection to the planner.
projection: Option<SchemaRef>,
/// Optional limit forwarded to the planner.
limit: Option<usize>,
}
impl<'a, FS, E> ScanBuilder<'a, FS, E>
where
FS: crate::manifest::ManifestFs<E>,
E: Executor + Timer + Clone + 'static,
<FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
pub(crate) fn new(db: &'a DbInner<FS, E>) -> Self {
Self {
db,
snapshot_source: SnapshotSource::Lazy,
staged_overlay: None,
predicate: None,
projection: None,
limit: None,
}
}
pub(crate) fn from_snapshot(db: &'a DbInner<FS, E>, snapshot: &'a TxSnapshot) -> Self {
Self {
db,
snapshot_source: SnapshotSource::Preexisting(snapshot),
staged_overlay: None,
predicate: None,
projection: None,
limit: None,
}
}
pub(crate) fn from_snapshot_with_db(
db: &'a super::DB<FS, E>,
snapshot: &'a TxSnapshot,
) -> Self {
Self::from_snapshot(&db.inner, snapshot)
}
pub(crate) fn with_transaction_overlay(
db: &'a DbInner<FS, E>,
snapshot: &'a TxSnapshot,
staging: &'a BTreeMap<KeyOwned, DynMutation<DynRow, ()>>,
schema: &'a SchemaRef,
) -> Self {
Self {
db,
snapshot_source: SnapshotSource::Preexisting(snapshot),
staged_overlay: Some(StagedOverlay { staging, schema }),
predicate: None,
projection: None,
limit: None,
}
}
#[must_use]
pub fn filter(mut self, predicate: Expr) -> Self {
self.predicate = Some(predicate);
self
}
#[must_use]
pub fn projection(mut self, schema: SchemaRef) -> Self {
self.projection = Some(schema);
self
}
#[must_use]
pub fn limit(mut self, limit: usize) -> Self {
self.limit = Some(limit);
self
}
pub async fn stream(
self,
) -> Result<impl Stream<Item = Result<RecordBatch, crate::db::DBError>> + 'a, crate::db::DBError>
{
let (stream, _) = self.stream_with_profile().await?;
Ok(stream)
}
pub async fn stream_with_profile(
self,
) -> Result<
(
impl Stream<Item = Result<RecordBatch, crate::db::DBError>> + 'a,
ScanSetupProfile,
),
crate::db::DBError,
> {
let Self {
db,
snapshot_source,
staged_overlay,
predicate,
projection,
limit,
} = self;
let predicate = predicate.unwrap_or(Expr::True);
let mut profile = ScanSetupProfile::default();
let snapshot = match snapshot_source {
SnapshotSource::Preexisting(snap) => snap.clone(),
SnapshotSource::Lazy => {
let started = Instant::now();
let snapshot = db.begin_snapshot().await?;
profile.snapshot_ns = duration_ns_u64(started.elapsed());
snapshot
}
};
let txn_scan = match staged_overlay {
Some(overlay) if !overlay.staging.is_empty() => {
let started = Instant::now();
let plan = snapshot
.plan_scan_with_context(db, &predicate, projection.as_ref(), limit, true)
.await?;
profile.plan_scan_ns = duration_ns_u64(started.elapsed());
Some((
TransactionScan::new(
overlay.staging,
overlay.schema,
plan.read_ts,
Some(&plan.scan_schema),
)
.map_err(crate::db::DBError::from)?,
plan,
))
}
_ => None,
};
let plan = match txn_scan {
Some((scan, plan)) => {
return execute_with_txn_scan(db, plan, Some(scan), profile).await;
}
None => {
let started = Instant::now();
let plan = snapshot
.plan_scan(db, &predicate, projection.as_ref(), limit)
.await?;
profile.plan_scan_ns = duration_ns_u64(started.elapsed());
plan
}
};
execute_with_txn_scan(db, plan, None, profile).await
}
pub async fn collect(self) -> Result<Vec<RecordBatch>, crate::db::DBError> {
self.stream().await?.try_collect().await
}
}
/// Turns a `ScanPlan` into a boxed stream of record batches and fills in the
/// remaining fields of `profile`.
///
/// The per-source streams (plus the optional transaction overlay) are merged in
/// ascending order and then packaged into batches. When the plan has a residual
/// predicate the limit is given to the packaging stage rather than the merge
/// stage. If no source stream exists, an empty stream is returned and the merge
/// and package timings stay at their incoming values.
///
/// # Errors
/// Propagates errors from building the source streams, the merge stream, or the
/// package stream.
async fn execute_with_txn_scan<'a, FS, E>(
    db: &'a DbInner<FS, E>,
    plan: ScanPlan,
    txn_scan: Option<TransactionScan<'a>>,
    mut profile: ScanSetupProfile,
) -> Result<
    (
        Pin<Box<dyn Stream<Item = Result<RecordBatch, crate::db::DBError>> + 'a>>,
        ScanSetupProfile,
    ),
    crate::db::DBError,
>
where
    FS: crate::manifest::ManifestFs<E>,
    E: Executor + Timer + Clone + 'static,
    <FS as fusio::fs::Fs>::File: fusio::durability::FileCommit,
{
    let scan_schema = Arc::clone(&plan.scan_schema);
    // Output schema: the requested projection, or the table schema when none was given.
    let result_projection = match plan.projected_schema.as_ref() {
        Some(projected) => Arc::clone(projected),
        None => Arc::clone(&db.schema),
    };

    let streams_timer = Instant::now();
    let streams = db.build_scan_streams(&plan, txn_scan).await?;
    profile.build_scan_streams_ns = duration_ns_u64(streams_timer.elapsed());

    if streams.is_empty() {
        let empty: Pin<Box<dyn Stream<Item = Result<RecordBatch, crate::db::DBError>> + 'a>> =
            Box::pin(stream::empty::<Result<RecordBatch, crate::db::DBError>>());
        return Ok((empty, profile));
    }

    let ScanPlan {
        residual_predicate: post_filter,
        limit: row_limit,
        ..
    } = plan;
    // Exactly one stage enforces the limit, depending on whether a post-filter runs.
    let (limit_for_merge, limit_for_package) = if post_filter.is_some() {
        (None, row_limit)
    } else {
        (row_limit, None)
    };

    let merge_timer = Instant::now();
    let merged = MergeStream::from_vec(streams, limit_for_merge, Some(Order::Asc))
        .await
        .map_err(crate::db::DBError::from)?;
    profile.merge_init_ns = duration_ns_u64(merge_timer.elapsed());

    let package_timer = Instant::now();
    let packaged = PackageStream::with_limit(
        DEFAULT_SCAN_BATCH_ROWS,
        merged,
        Arc::clone(&scan_schema),
        Arc::clone(&result_projection),
        post_filter,
        limit_for_package,
    )
    .map_err(crate::db::DBError::from)?;
    profile.package_init_ns = duration_ns_u64(package_timer.elapsed());

    let batches: Pin<Box<dyn Stream<Item = Result<RecordBatch, crate::db::DBError>> + 'a>> =
        Box::pin(packaged.map(|item| item.map_err(crate::db::DBError::from)));
    Ok((batches, profile))
}
/// Converts `duration` to whole nanoseconds, saturating at `u64::MAX` when the
/// value does not fit in a `u64`.
fn duration_ns_u64(duration: std::time::Duration) -> u64 {
    match u64::try_from(duration.as_nanos()) {
        Ok(nanos) => nanos,
        Err(_) => u64::MAX,
    }
}
/// Returns the column of the first bloom-filter expression found in `predicate`,
/// searching depth-first through `And`, `Or`, and `Not` nodes. Other node kinds
/// are not descended into.
fn find_bloom_filter_column(predicate: &Expr) -> Option<&str> {
    if let Expr::BloomFilterEq { column, .. } | Expr::BloomFilterInList { column, .. } = predicate
    {
        return Some(column.as_str());
    }
    match predicate {
        Expr::Not(inner) => find_bloom_filter_column(inner.as_ref()),
        Expr::And(children) | Expr::Or(children) => {
            for child in children.iter() {
                if let Some(column) = find_bloom_filter_column(child) {
                    return Some(column);
                }
            }
            None
        }
        _ => None,
    }
}
/// Result of splitting a predicate for the in-memory (non-SST) sources.
#[derive(Debug)]
struct NonSstPredicateSplit {
/// Part of the predicate that the splitter classified as pushable, if any.
pushdown: Option<Expr>,
/// Part of the predicate that must still be evaluated as a residual, if any.
residual: Option<Expr>,
}
/// Splits `predicate` into a pushable part and a residual part for the in-memory
/// sources.
///
/// `Expr::True` yields neither part and `Expr::False` is entirely pushable,
/// whatever the key schema looks like. For any other predicate, a key schema
/// that does not have exactly one field sends the whole predicate to the
/// residual; otherwise the split is decided per sub-expression against that
/// single key column.
fn split_predicate_for_non_sst(predicate: &Expr, key_schema: &SchemaRef) -> NonSstPredicateSplit {
    match predicate {
        Expr::True => {
            return NonSstPredicateSplit {
                pushdown: None,
                residual: None,
            };
        }
        Expr::False => {
            return NonSstPredicateSplit {
                pushdown: Some(Expr::False),
                residual: None,
            };
        }
        _ => {}
    }

    let key_fields = key_schema.fields();
    match (key_fields.len(), key_fields.first()) {
        (1, Some(key_field)) => {
            split_predicate_for_non_sst_inner(predicate, key_field.name(), key_field.data_type())
        }
        _ => NonSstPredicateSplit {
            pushdown: None,
            residual: Some(predicate.clone()),
        },
    }
}
/// Recursive worker behind `split_predicate_for_non_sst` for a single key column.
///
/// A leaf is fully pushable when it is:
/// * a comparison on the key column other than `NotEq`,
/// * a `Between` on the key column, or
/// * a `StartsWith` on a string-typed key column with a non-empty prefix for
///   which `next_prefix_string` returns a value.
///
/// `And` nodes are split child by child and the parts recombined. Every other
/// expression goes to the residual in full.
fn split_predicate_for_non_sst_inner(
    predicate: &Expr,
    key_column: &str,
    key_type: &DataType,
) -> NonSstPredicateSplit {
    let pushable = match predicate {
        Expr::True => {
            return NonSstPredicateSplit {
                pushdown: None,
                residual: None,
            };
        }
        Expr::False => {
            return NonSstPredicateSplit {
                pushdown: Some(Expr::False),
                residual: None,
            };
        }
        Expr::And(children) => {
            let mut pushdown_parts = Vec::new();
            let mut residual_parts = Vec::new();
            for child in children {
                let NonSstPredicateSplit { pushdown, residual } =
                    split_predicate_for_non_sst_inner(child, key_column, key_type);
                pushdown_parts.extend(pushdown);
                residual_parts.extend(residual);
            }
            return NonSstPredicateSplit {
                pushdown: combine_and_parts(pushdown_parts),
                residual: combine_and_parts(residual_parts),
            };
        }
        Expr::Cmp { column, op, .. } => {
            column == key_column && !matches!(op, aisle::CmpOp::NotEq)
        }
        Expr::Between { column, .. } => column == key_column,
        Expr::StartsWith { column, prefix } => {
            column == key_column
                && !prefix.is_empty()
                && is_string_key_type(key_type)
                && next_prefix_string(prefix).is_some()
        }
        _ => false,
    };

    if pushable {
        NonSstPredicateSplit {
            pushdown: Some(predicate.clone()),
            residual: None,
        }
    } else {
        NonSstPredicateSplit {
            pushdown: None,
            residual: Some(predicate.clone()),
        }
    }
}
/// Reports whether `data_type` is one of the Arrow UTF-8 string types
/// (`Utf8`, `LargeUtf8`, or `Utf8View`).
fn is_string_key_type(data_type: &DataType) -> bool {
    [DataType::Utf8, DataType::LargeUtf8, DataType::Utf8View].contains(data_type)
}
/// Collapses a list of conjuncts into one optional expression: `None` for an
/// empty list, the expression itself for a single element, and an `Expr::and`
/// over all of them otherwise.
fn combine_and_parts(mut predicates: Vec<Expr>) -> Option<Expr> {
    if predicates.len() > 1 {
        return Some(Expr::and(predicates));
    }
    // Zero or one element left: `pop` yields `None` or that single expression.
    predicates.pop()
}
/// AND-combines two optional predicates: both present gives `Expr::and` of the
/// two (left first), one present gives that one, neither gives `None`.
fn combine_predicates_with_and(lhs: Option<Expr>, rhs: Option<Expr>) -> Option<Expr> {
    match (lhs, rhs) {
        (Some(left), Some(right)) => Some(Expr::and(vec![left, right])),
        (left, right) => left.or(right),
    }
}
// Unit tests for the non-SST predicate splitter.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow_schema::{DataType, Field, Schema};
use super::*;
// `Expr::True` yields neither pushdown nor residual, even with a composite key.
#[test]
fn split_non_sst_true_with_composite_key_keeps_residual_empty() {
let key_schema = Arc::new(Schema::new(vec![
Field::new("k1", DataType::Utf8, false),
Field::new("k2", DataType::Int64, false),
]));
let split = split_predicate_for_non_sst(&Expr::True, &key_schema);
assert!(split.pushdown.is_none());
assert!(split.residual.is_none());
}
// A `char::MAX` prefix is expected to stay in the residual (the test name
// attributes this to the prefix having no upper bound).
#[test]
fn split_non_sst_starts_with_without_upper_bound_requires_residual() {
let key_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
let predicate = Expr::StartsWith {
column: "id".to_string(),
prefix: char::MAX.to_string(),
};
let split = split_predicate_for_non_sst(&predicate, &key_schema);
assert!(split.pushdown.is_none());
assert!(split.residual.is_some());
}
// U+D7FF is the last scalar value before the surrogate range; the prefix is
// expected to be fully pushable.
#[test]
fn split_non_sst_starts_with_unicode_gap_keeps_full_pushdown() {
let key_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Utf8, false)]));
let predicate = Expr::StartsWith {
column: "id".to_string(),
prefix: "\u{d7ff}".to_string(),
};
let split = split_predicate_for_non_sst(&predicate, &key_schema);
assert!(split.pushdown.is_some());
assert!(split.residual.is_none());
}
// `StartsWith` on a non-string key column cannot be pushed down.
#[test]
fn split_non_sst_starts_with_on_non_string_key_requires_residual() {
let key_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
let predicate = Expr::StartsWith {
column: "id".to_string(),
prefix: "12".to_string(),
};
let split = split_predicate_for_non_sst(&predicate, &key_schema);
assert!(split.pushdown.is_none());
assert!(split.residual.is_some());
}
}