use std::{
any::Any,
cmp,
collections::{HashMap, HashSet},
fmt,
ops::Range,
sync::{Arc, atomic},
time::Instant,
};
use arrow_array::ArrayRef;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use bytes::Bytes;
use datafusion::{
catalog::{Session, TableProvider},
common::{ColumnStatistics, DFSchema, Statistics, stats::Precision},
datasource::{
listing::PartitionedFile,
physical_plan::{
FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource,
parquet::ParquetAccessPlan,
},
source::DataSourceExec,
},
error::{DataFusionError, Result as DfResult},
execution::object_store::ObjectStoreUrl,
logical_expr::{Expr, Operator, TableProviderFilterPushDown, TableType},
object_store::path::Path as ObjPath,
physical_expr::PhysicalExpr,
physical_plan::{ExecutionPlan, empty::EmptyExec, metrics::ExecutionPlanMetricsSet},
scalar::ScalarValue,
};
use futures::{FutureExt, future::BoxFuture};
use object_store::ObjectStore as OsObjectStore;
use parquet::{
arrow::{
arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReaderBuilder, RowSelection, RowSelector,
},
async_reader::{AsyncFileReader, ParquetObjectReader},
},
errors,
file::metadata::ParquetMetaData,
};
use roaring::RoaringBitmap;
use uuid::Uuid;
use crate::{
superfile::{LazyByteSource, fts::reader::BoolMode},
supertable::{
SuperfileEntry,
manifest::{ManifestSnapshot, add_sum_arrays, hll::HllSketch},
options::{DECIMAL128_PRECISION, DECIMAL128_SCALE},
query::{
candidate::CandidatePlan,
df_object_store::SuperfileObjectStore,
prune::{PruneLeaf, select_superfiles},
skip::{ScalarOp, ScalarPredicate},
superfile_reader::superfile_reader,
},
reader_cache::{DiskCacheStore, SuperfileReaderCache},
tombstones::SidecarCache,
},
};
/// Table qualifier used when building the qualified `DFSchema` for the pushed-down predicate.
pub(crate) const TABLE_NAME: &str = "supertable";
/// Prefix of each provider's object-store URL; a process-wide sequence number is appended.
const SUPERFILE_STORE_URL_PREFIX: &str = "superfile://supertable-";
/// Counter incremented once per constructed provider so every provider gets its own store URL.
static STORE_URL_SEQ: atomic::AtomicU64 = atomic::AtomicU64::new(0);
/// Fraction of a superfile's rows that, together with `PUSHDOWN_MIN_ROWS`, forms the gate on the
/// candidate estimate: estimates above the gate are not evaluated into a row bitmap.
const PUSHDOWN_MAX_FRACTION: f64 = 0.01;
/// Lower bound (in rows) of the candidate-estimate gate.
const PUSHDOWN_MIN_ROWS: u64 = 4096;
/// Fraction of a superfile's rows at or above which a candidate estimate is considered too dense
/// to evaluate into a row bitmap.
const PUSHDOWN_MAX_DENSITY: f64 = 0.5;
/// DataFusion table provider that scans the superfiles listed in a manifest snapshot.
pub(crate) struct SupertableProvider {
// Arrow schema reported to DataFusion and used for the Parquet source.
schema: SchemaRef,
// Snapshot listing the superfiles, their aggregate stats, and the table options.
manifest: Arc<ManifestSnapshot>,
// Reader cache handed to `superfile_reader` when opening a superfile.
store: Arc<dyn SuperfileReaderCache>,
// Optional disk cache, also handed to `superfile_reader`.
disk_cache: Option<Arc<DiskCacheStore>>,
// Optional source of per-superfile tombstone bitmaps; `None` means no row is treated as deleted.
tombstone_cache: Option<Arc<SidecarCache>>,
// Object-store URL unique to this provider instance (see `new`).
store_url: ObjectStoreUrl,
// When set, only superfiles whose id is in this set survive selection.
segment_filter: Option<HashSet<Uuid>>,
}
/// Compact debug rendering: the schema, the number of superfiles, and which optional caches
/// are attached. The caches and manifest contents themselves are not printed.
impl fmt::Debug for SupertableProvider {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let n_superfiles = self.manifest.superfiles.len();
        let has_disk_cache = self.disk_cache.is_some();
        let has_tombstone_cache = self.tombstone_cache.is_some();

        let mut out = f.debug_struct("SupertableProvider");
        out.field("schema", &self.schema);
        out.field("n_superfiles", &n_superfiles);
        out.field("has_disk_cache", &has_disk_cache);
        out.field("has_tombstone_cache", &has_tombstone_cache);
        out.finish()
    }
}
impl SupertableProvider {
/// Creates an unrestricted provider over `manifest`.
///
/// Each call draws the next value from the process-wide `STORE_URL_SEQ` counter and uses it
/// to derive this provider's object-store URL.
///
/// # Panics
/// Panics if the counter-derived URL fails to parse, which is treated as an invariant violation.
pub(crate) fn new(
    schema: SchemaRef,
    manifest: Arc<ManifestSnapshot>,
    store: Arc<dyn SuperfileReaderCache>,
    disk_cache: Option<Arc<DiskCacheStore>>,
    tombstone_cache: Option<Arc<SidecarCache>>,
) -> Self {
    let raw_url = format!(
        "{SUPERFILE_STORE_URL_PREFIX}{}/",
        STORE_URL_SEQ.fetch_add(1, atomic::Ordering::Relaxed)
    );
    let store_url = ObjectStoreUrl::parse(raw_url)
        .expect("invariant: a counter-derived store URL is always valid");
    Self {
        segment_filter: None,
        store_url,
        tombstone_cache,
        disk_cache,
        store,
        manifest,
        schema,
    }
}
pub(crate) fn restricted_to(&self, segments: HashSet<Uuid>) -> Self {
let mut restricted = Self::new(
Arc::clone(&self.schema),
Arc::clone(&self.manifest),
Arc::clone(&self.store),
self.disk_cache.clone(),
self.tombstone_cache.clone(),
);
restricted.segment_filter = Some(segments);
restricted
}
/// Returns `true` when this provider carries a segment filter (see `restricted_to`).
pub(crate) fn is_segment_restricted(&self) -> bool {
self.segment_filter.is_some()
}
/// Borrows the manifest snapshot this provider scans.
pub(crate) fn manifest(&self) -> &Arc<ManifestSnapshot> {
&self.manifest
}
/// Reports whether `entry` has no tombstoned rows.
///
/// Without a tombstone cache every entry is clean. With one, the entry is clean only when its
/// bitmap resolves and is empty; a lookup error is conservatively reported as "not clean".
pub(crate) fn entry_is_clean(&self, entry: &SuperfileEntry) -> bool {
    let Some(cache) = self.tombstone_cache.as_ref() else {
        return true;
    };
    match cache.bitmap_for(entry.superfile_id, Instant::now()) {
        Ok(bitmap) => bitmap.is_empty(),
        Err(_) => false,
    }
}
/// Converts scalar predicates into prune leaves.
///
/// Every predicate yields a `PruneLeaf::Scalar`. An equality predicate on an FTS column with a
/// string literal additionally yields, ahead of it, a `PruneLeaf::TermPresence` requiring all
/// tokens of the literal, provided a tokenizer is configured and it produces at least one term.
fn predicates_to_prune_leaves(&self, predicates: Vec<ScalarPredicate>) -> Vec<PruneLeaf> {
    let options = &self.manifest.options;
    let mut leaves = Vec::with_capacity(predicates.len());
    for predicate in predicates {
        let is_fts_equality = predicate.op == ScalarOp::Eq
            && options
                .fts_columns
                .iter()
                .any(|cfg| cfg.column == predicate.column);
        if is_fts_equality
            && let Some(tokenizer) = options.tokenizer.as_ref()
            && let Some(text) = scalar_as_str(&predicate.value)
        {
            let terms: Vec<String> = tokenizer.tokenize(text).collect();
            if !terms.is_empty() {
                let column = predicate.column.clone();
                leaves.push(PruneLeaf::TermPresence {
                    column,
                    terms,
                    mode: BoolMode::And,
                });
            }
        }
        leaves.push(PruneLeaf::Scalar(predicate));
    }
    leaves
}
/// Selects the superfiles that may contain rows matching `filters`.
///
/// Filters are lowered to prune leaves and run through `select_superfiles`; when a segment
/// filter is set, entries outside it are then dropped.
///
/// # Errors
/// A selection failure is surfaced as `DataFusionError::Execution` carrying its message.
async fn select_survivors(&self, filters: &[Expr]) -> DfResult<Vec<Arc<SuperfileEntry>>> {
    let leaves =
        self.predicates_to_prune_leaves(exprs_to_scalar_predicates(filters, &self.schema));
    let selected = match select_superfiles(self.manifest.as_ref(), &leaves).await {
        Ok(entries) => entries,
        Err(e) => return Err(DataFusionError::Execution(e.to_string())),
    };
    let survivors = match self.segment_filter.as_ref() {
        None => selected,
        Some(allowed) => selected
            .into_iter()
            .filter(|entry| allowed.contains(&entry.superfile_id))
            .collect(),
    };
    Ok(survivors)
}
/// Collects the names of the configured FTS columns, borrowed from the manifest options.
fn fts_cols_set(&self) -> HashSet<&str> {
    let mut names = HashSet::new();
    for cfg in self.manifest.options.fts_columns.iter() {
        names.insert(cfg.column.as_str());
    }
    names
}
/// Test helper: number of superfiles left after pruning with `filters`.
///
/// # Panics
/// Panics if survivor selection fails.
#[cfg(test)]
pub(crate) async fn surviving_superfile_count(&self, filters: &[Expr]) -> usize {
    let survivors = self
        .select_survivors(filters)
        .await
        .expect("select survivors");
    survivors.len()
}
/// Builds DataFusion [`Statistics`] for `entries` from the manifest-level aggregates.
///
/// * `num_rows` is the sum of `n_docs` minus the tombstoned rows. It is `Exact` only when
///   every tombstone bitmap resolved and the deleted count does not exceed the row total;
///   otherwise the un-adjusted total is reported as `Inexact`.
/// * Per-column min/max/sum/null-count are `Exact` only when the set is "clean" (all bitmaps
///   resolved and nothing is deleted), because the aggregates were computed over all rows.
/// * Distinct counts for non-id columns come from merged HLL sketches and are always `Inexact`.
/// * The id column reports zero nulls and a distinct count equal to `num_rows`.
fn statistics_for(&self, entries: &[Arc<SuperfileEntry>]) -> Statistics {
    let total_rows: u64 = entries.iter().map(|e| e.n_docs).sum();
    let now = Instant::now();
    let mut deleted: u64 = 0;
    let mut views_resolved = true;
    if let Some(cache) = self.tombstone_cache.as_ref() {
        for entry in entries {
            match cache.bitmap_for(entry.superfile_id, now) {
                Ok(bitmap) => deleted += bitmap.len(),
                Err(_) => {
                    // One unresolved bitmap is enough to make the live-row count unknown.
                    views_resolved = false;
                    break;
                }
            }
        }
    }
    // A bitmap may carry ids that fall outside its file, so `deleted` can exceed
    // `total_rows`. A plain subtraction would then panic (debug) or wrap to a huge "exact"
    // count (release); fall back to the inexact total instead.
    let num_rows = match total_rows.checked_sub(deleted) {
        Some(live) if views_resolved => Precision::Exact(live as usize),
        _ => Precision::Inexact(total_rows as usize),
    };
    let clean = views_resolved && deleted == 0;
    // Aggregates cover every written row, so they are exact only for a clean set.
    let wrap = |v: ScalarValue| {
        if clean {
            Precision::Exact(v)
        } else {
            Precision::Inexact(v)
        }
    };
    let id_column = self.manifest.options.id_column.as_str();
    let column_statistics = self
        .schema
        .fields()
        .iter()
        .map(|field| {
            let name = field.name().as_str();
            let mut stats = ColumnStatistics::new_unknown();
            if name == id_column {
                if let Some((min, max)) = id_min_max(entries) {
                    stats.min_value = wrap(min);
                    stats.max_value = wrap(max);
                }
                stats.null_count = Precision::Exact(0);
                stats.distinct_count = num_rows;
                return stats;
            }
            if let Some((min, max)) = scalar_min_max(entries, name) {
                stats.min_value = wrap(min);
                stats.max_value = wrap(max);
            }
            if let Some(nulls) = scalar_null_count(entries, name) {
                stats.null_count = if clean {
                    Precision::Exact(nulls as usize)
                } else {
                    Precision::Inexact(nulls as usize)
                };
            }
            if let Some(sum) = scalar_sum(entries, name) {
                stats.sum_value = wrap(sum);
            }
            if let Some(distinct) = scalar_distinct(entries, name) {
                // HLL output is an estimate, never exact.
                stats.distinct_count = Precision::Inexact(distinct);
            }
            stats
        })
        .collect();
    Statistics {
        num_rows,
        total_byte_size: Precision::Absent,
        column_statistics,
    }
}
}
/// Returns the overall `(min, max)` id across `entries` as `Decimal128` scalars, or `None`
/// when `entries` is empty.
fn id_min_max(entries: &[Arc<SuperfileEntry>]) -> Option<(ScalarValue, ScalarValue)> {
    let (first, rest) = entries.split_first()?;
    let (lowest, highest) = rest
        .iter()
        .fold((first.id_min, first.id_max), |(lo, hi), entry| {
            (lo.min(entry.id_min), hi.max(entry.id_max))
        });
    let as_decimal =
        |v| ScalarValue::Decimal128(Some(v), DECIMAL128_PRECISION, DECIMAL128_SCALE);
    Some((as_decimal(lowest), as_decimal(highest)))
}
/// Sums the recorded null count of column `name` over `entries`.
///
/// Returns `None` if any entry lacks stats or a null count for the column, or if the sum
/// overflows `u64`. An empty slice yields `Some(0)`.
fn scalar_null_count(entries: &[Arc<SuperfileEntry>], name: &str) -> Option<u64> {
    let mut total = 0u64;
    for entry in entries {
        let nulls = entry.scalar_stats.get(name)?.null_count?;
        total = total.checked_add(nulls)?;
    }
    Some(total)
}
/// Adds up the per-superfile sum aggregates of column `name`.
///
/// Returns `None` when `entries` is empty, when any entry has no sum for the column, when
/// `add_sum_arrays` cannot combine two partial sums, or when the result cannot be read back
/// as a scalar.
fn scalar_sum(entries: &[Arc<SuperfileEntry>], name: &str) -> Option<ScalarValue> {
    let mut running: Option<ArrayRef> = None;
    for entry in entries {
        let stats = entry.scalar_stats.get(name)?;
        let partial = stats.sum.as_ref()?;
        running = match running.take() {
            Some(total) => Some(add_sum_arrays(&total, partial)?),
            None => Some(Arc::clone(partial)),
        };
    }
    let total = running?;
    ScalarValue::try_from_array(&total, 0).ok()
}
/// Estimates the distinct count of column `name` by merging each entry's HLL sketch.
///
/// Returns `None` when `entries` is empty, or when any entry lacks a sketch or its bytes do
/// not decode.
fn scalar_distinct(entries: &[Arc<SuperfileEntry>], name: &str) -> Option<usize> {
    let mut union: Option<HllSketch> = None;
    for entry in entries {
        let sketch = HllSketch::from_bytes(entry.scalar_stats.get(name)?.hll.as_ref()?)?;
        if let Some(acc) = union.as_mut() {
            acc.merge(&sketch);
        } else {
            union = Some(sketch);
        }
    }
    let estimate = union?.estimate();
    Some(estimate.round() as usize)
}
/// Folds the per-superfile min/max aggregates of column `name` into one `(min, max)` pair.
///
/// Returns `None` when `entries` is empty, when any entry lacks stats for the column, when a
/// stored bound is null or unreadable, or when two bounds are not comparable.
fn scalar_min_max(
    entries: &[Arc<SuperfileEntry>],
    name: &str,
) -> Option<(ScalarValue, ScalarValue)> {
    let mut bounds: Option<(ScalarValue, ScalarValue)> = None;
    for entry in entries {
        let stats = entry.scalar_stats.get(name)?;
        let lo = ScalarValue::try_from_array(&stats.min, 0).ok()?;
        let hi = ScalarValue::try_from_array(&stats.max, 0).ok()?;
        if lo.is_null() || hi.is_null() {
            return None;
        }
        let Some((best_lo, best_hi)) = bounds.take() else {
            // First entry seeds the running bounds.
            bounds = Some((lo, hi));
            continue;
        };
        let next_lo = if lo.partial_cmp(&best_lo)? == cmp::Ordering::Less {
            lo
        } else {
            best_lo
        };
        let next_hi = if hi.partial_cmp(&best_hi)? == cmp::Ordering::Greater {
            hi
        } else {
            best_hi
        };
        bounds = Some((next_lo, next_hi));
    }
    bounds
}
/// Borrows the string payload of a non-null `Utf8` / `LargeUtf8` scalar; `None` for anything else.
fn scalar_as_str(v: &ScalarValue) -> Option<&str> {
    if let ScalarValue::Utf8(Some(text)) | ScalarValue::LargeUtf8(Some(text)) = v {
        Some(text.as_str())
    } else {
        None
    }
}
#[async_trait]
impl TableProvider for SupertableProvider {
/// Exposes the provider as `&dyn Any` so callers can downcast to `SupertableProvider`.
fn as_any(&self) -> &dyn Any {
self
}
/// Returns a shared handle to the table schema.
fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}
/// Always reports a base table.
fn table_type(&self) -> TableType {
TableType::Base
}
/// Declares every filter as `Inexact`: the provider uses filters to narrow the scan, and
/// DataFusion keeps re-applying them to the returned rows.
fn supports_filters_pushdown(
    &self,
    filters: &[&Expr],
) -> DfResult<Vec<TableProviderFilterPushDown>> {
    let verdicts = filters
        .iter()
        .map(|_| TableProviderFilterPushDown::Inexact)
        .collect();
    Ok(verdicts)
}
/// Table-level statistics over every superfile in the manifest.
///
/// Returns `None` unless the manifest reports `is_in_process_only()`.
fn statistics(&self) -> Option<Statistics> {
    self.manifest
        .is_in_process_only()
        .then(|| self.statistics_for(&self.manifest.superfiles))
}
/// Plans a scan over the superfiles that survive manifest-level pruning.
///
/// Steps, in order:
/// 1. Prune superfiles with `select_survivors`; an empty result yields an `EmptyExec`.
/// 2. Prefetch tombstone bitmaps for the survivors (when a tombstone cache is attached).
/// 3. For each survivor, open its reader, estimate the index candidates, and either evaluate
///    them into a row bitmap or give up on them when the estimate is too large or too dense.
/// 4. Turn candidates and tombstones into a per-file `ParquetAccessPlan`.
/// 5. Build a `ParquetSource` that reuses the already-loaded Parquet metadata, register the
///    in-memory object store under this provider's URL, and return a `DataSourceExec`.
///
/// # Errors
/// Reader, estimate, evaluate, and tombstone-cache failures are returned as
/// `DataFusionError::Execution`; schema projection errors are propagated as-is.
async fn scan(
&self,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> DfResult<Arc<dyn ExecutionPlan>> {
let survivor_entries = self.select_survivors(filters).await?;
let survivors: Vec<&Arc<SuperfileEntry>> = survivor_entries.iter().collect();
// Nothing survived pruning: answer with an empty plan of the projected schema.
if survivors.is_empty() {
let projected = match projection {
Some(indices) => Arc::new(self.schema.project(indices)?),
None => Arc::clone(&self.schema),
};
return Ok(Arc::new(EmptyExec::new(projected)));
}
// A single timestamp is used for every tombstone-cache call made by this scan.
let now = Instant::now();
if let Some(cache) = self.tombstone_cache.as_ref() {
let ids: Vec<_> = survivors.iter().map(|e| e.superfile_id).collect();
cache.prefetch(&ids, now).await;
}
let candidate_plan = CandidatePlan::from_filters(
filters,
&self.fts_cols_set(),
self.manifest.options.tokenizer.as_ref(),
);
// Byte sources keyed by object path; they back the object store registered below.
let mut sources: HashMap<ObjPath, Arc<dyn LazyByteSource>> = HashMap::new();
// Everything the planning phase needs to remember about one surviving superfile.
struct SuperfileScan {
path: ObjPath,
size: u64,
candidates: Option<RoaringBitmap>,
tombstones: Arc<RoaringBitmap>,
parquet_meta: Arc<ParquetMetaData>,
}
let mut superfiles: Vec<SuperfileScan> = Vec::with_capacity(survivors.len());
for entry in &survivors {
let reader = superfile_reader(
&self.store,
self.disk_cache.as_ref(),
self.manifest.options.storage.as_ref(),
&entry.uri,
entry.subsection_offsets.as_ref(),
)
.await
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
let est = candidate_plan
.estimate(reader.as_ref())
.await
.map_err(|e| DataFusionError::Execution(e.to_string()))?;
// gate = max(PUSHDOWN_MAX_FRACTION * n_docs, PUSHDOWN_MIN_ROWS).
let gate =
((reader.n_docs() as f64 * PUSHDOWN_MAX_FRACTION) as u64).max(PUSHDOWN_MIN_ROWS);
let density_cap = (reader.n_docs() as f64 * PUSHDOWN_MAX_DENSITY) as u64;
// Skip evaluating the candidate bitmap when the estimate exceeds the gate or reaches
// the density cap; `None` means "no row-level candidate restriction" for this file.
let candidates = if est > gate || est >= density_cap {
None
} else {
candidate_plan
.evaluate(reader.as_ref())
.await
.map_err(|e| DataFusionError::Execution(e.to_string()))?
};
// Without a tombstone cache nothing is deleted, so use an empty bitmap.
let tombstones = match self.tombstone_cache.as_ref() {
Some(cache) => cache
.bitmap_for(entry.superfile_id, now)
.map_err(|e| DataFusionError::Execution(format!("tombstone cache: {e}")))?,
None => Arc::new(RoaringBitmap::new()),
};
let source = reader.byte_source();
let size = source.size();
let path = ObjPath::from(entry.uri.storage_path());
sources.insert(path.clone(), source);
superfiles.push(SuperfileScan {
path,
size,
candidates,
tombstones,
parquet_meta: Arc::clone(reader.parquet_metadata()),
});
}
let store: Arc<dyn OsObjectStore> = Arc::new(SuperfileObjectStore::from_sources(sources));
let mut files: Vec<PartitionedFile> = Vec::with_capacity(superfiles.len());
for seg in &superfiles {
// An access plan is only needed when rows are restricted by candidates or tombstones.
let access_plan = if seg.candidates.is_some() || !seg.tombstones.is_empty() {
let row_counts: Vec<u32> = seg
.parquet_meta
.row_groups()
.iter()
.map(|rg| rg.num_rows() as u32)
.collect();
build_access_plan(&row_counts, &seg.candidates, &seg.tombstones)
} else {
None
};
let mut file = PartitionedFile::new(seg.path.to_string(), seg.size);
if let Some(plan) = access_plan {
file = file.with_extensions(Arc::new(plan));
}
files.push(file);
}
// The Parquet-level predicate is only built when the candidate plan is `Unbounded`.
let index_bounded = !matches!(candidate_plan, CandidatePlan::Unbounded);
let predicate = if !index_bounded {
row_group_predicate(state, filters, &self.schema)
} else {
None
};
// The limit is only passed to the scan when there are no filters.
let effective_limit = if filters.is_empty() { limit } else { None };
let mut source = ParquetSource::new(Arc::clone(&self.schema));
if let Some(predicate) = predicate.as_ref() {
source = source
.with_predicate(Arc::clone(predicate))
.with_pushdown_filters(true)
.with_reorder_filters(true);
}
// Hand the Parquet metadata already held by the readers to the scan's reader factory.
let metas: HashMap<ObjPath, Arc<ParquetMetaData>> = superfiles
.iter()
.map(|s| (s.path.clone(), Arc::clone(&s.parquet_meta)))
.collect();
source = source.with_parquet_file_reader_factory(Arc::new(CachedMetadataReaderFactory {
store: Arc::clone(&store),
metas,
}));
// Statistics describe the surviving files; with filters present they are only an estimate.
let scan_stats = {
let stats = self.statistics_for(&survivor_entries);
if filters.is_empty() {
stats
} else {
stats.to_inexact()
}
};
// Register this scan's object store under the provider's own URL.
let url = self.store_url.clone();
state
.runtime_env()
.register_object_store(url.as_ref(), store);
let mut builder = FileScanConfigBuilder::new(url, Arc::new(source));
for file in files {
builder = builder.with_file(file);
}
let config = builder
.with_statistics(scan_stats)
.with_projection_indices(projection.cloned())?
.with_limit(effective_limit)
.build();
Ok(DataSourceExec::from_data_source(config))
}
}
/// Builds an access plan that skips the rows of `parquet_bytes` listed in `bitmap`.
///
/// Returns `Ok(None)` when no listed id falls inside the file, meaning a full scan.
///
/// # Errors
/// Fails when the Parquet metadata cannot be read from `parquet_bytes`.
pub(crate) fn tombstone_access_plan(
    parquet_bytes: &Bytes,
    bitmap: &RoaringBitmap,
) -> DfResult<Option<ParquetAccessPlan>> {
    let row_counts = row_group_rows_from_bytes(parquet_bytes)?;
    let plan = tombstone_access_plan_from_counts(&row_counts, bitmap);
    Ok(plan)
}
/// Builds an access plan that excludes the row ids in `bitmap`, given each row group's size.
///
/// Row ids are file-global: row group `i` covers the ids from the sum of the preceding counts
/// up to (exclusive) that sum plus its own count. Per row group:
/// * no deleted id: left as a full scan;
/// * every row deleted: the row group is skipped;
/// * otherwise: a row selection alternating "select" and "skip" runs.
///
/// Returns `None` when no row group contains a deleted id.
fn tombstone_access_plan_from_counts(
    row_counts: &[u32],
    bitmap: &RoaringBitmap,
) -> Option<ParquetAccessPlan> {
    let tombstoned: Vec<u32> = bitmap.iter().collect();
    let mut plan = ParquetAccessPlan::new_all(row_counts.len());
    let mut touched = false;
    let mut group_start: u32 = 0;
    for (rg_idx, &rows) in row_counts.iter().enumerate() {
        if rows == 0 {
            continue;
        }
        let first_row = group_start;
        let group_end = first_row + rows;
        group_start = group_end;

        // `tombstoned` is sorted, so this group's ids form one contiguous slice.
        let from = tombstoned.partition_point(|&id| id < first_row);
        let to = tombstoned.partition_point(|&id| id < group_end);
        let hits = &tombstoned[from..to];
        if hits.is_empty() {
            continue;
        }
        touched = true;
        if hits.len() as u32 == rows {
            plan.skip(rg_idx);
            continue;
        }

        let mut selectors: Vec<RowSelector> = Vec::new();
        // Next row (relative to the group) not yet covered by a selector.
        let mut next_row: u32 = 0;
        // Each chunk is a maximal run of consecutive deleted ids.
        for run in hits.chunk_by(|a, b| *b == *a + 1) {
            let run_start = run[0] - first_row;
            if run_start > next_row {
                selectors.push(RowSelector::select((run_start - next_row) as usize));
            }
            selectors.push(RowSelector::skip(run.len()));
            next_row = run_start + run.len() as u32;
        }
        if next_row < rows {
            selectors.push(RowSelector::select((rows - next_row) as usize));
        }
        plan.scan_selection(rg_idx, RowSelection::from(selectors));
    }
    touched.then_some(plan)
}
/// Reads the Parquet footer of `parquet_bytes` and returns the row count of each row group.
///
/// # Errors
/// Returns `DataFusionError::Execution` when the metadata cannot be parsed.
fn row_group_rows_from_bytes(parquet_bytes: &Bytes) -> DfResult<Vec<u32>> {
    let builder = match ParquetRecordBatchReaderBuilder::try_new(parquet_bytes.clone()) {
        Ok(builder) => builder,
        Err(e) => {
            return Err(DataFusionError::Execution(format!(
                "parquet metadata: {e}"
            )));
        }
    };
    let mut counts = Vec::new();
    for row_group in builder.metadata().row_groups() {
        counts.push(row_group.num_rows() as u32);
    }
    Ok(counts)
}
/// Reader factory that hands out readers pre-loaded with already-parsed Parquet metadata.
struct CachedMetadataReaderFactory {
// Object store the created readers fetch bytes from.
store: Arc<dyn OsObjectStore>,
// Parsed Parquet metadata keyed by object location; a miss falls back to reading the footer.
metas: HashMap<ObjPath, Arc<ParquetMetaData>>,
}
/// Debug output reports only how many superfiles have cached metadata.
impl fmt::Debug for CachedMetadataReaderFactory {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let cached = self.metas.len();
        let mut out = f.debug_struct("CachedMetadataReaderFactory");
        out.field("superfiles", &cached);
        out.finish()
    }
}
/// Async Parquet reader that answers metadata requests from a cached copy when it has one.
struct CachedMetadataReader {
// Underlying reader that serves all byte-range reads.
inner: ParquetObjectReader,
// Pre-parsed metadata, if any; `None` delegates metadata loading to `inner`.
meta: Option<Arc<ParquetMetaData>>,
}
/// Byte reads go straight to the wrapped reader; only metadata loading is short-circuited.
impl AsyncFileReader for CachedMetadataReader {
    /// Delegates a single range read to the wrapped reader.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, errors::Result<Bytes>> {
        self.inner.get_bytes(range)
    }

    /// Delegates a multi-range read to the wrapped reader.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, errors::Result<Vec<Bytes>>> {
        self.inner.get_byte_ranges(ranges)
    }

    /// Returns the cached metadata when present; otherwise loads it through the wrapped reader.
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, errors::Result<Arc<ParquetMetaData>>> {
        if let Some(cached) = self.meta.as_ref().map(Arc::clone) {
            return async move { Ok(cached) }.boxed();
        }
        self.inner.get_metadata(options)
    }
}
/// Creates one [`CachedMetadataReader`] per partitioned file.
impl ParquetFileReaderFactory for CachedMetadataReaderFactory {
    /// Builds a reader for `partitioned_file`, attaching the cached metadata for its location
    /// when available and forwarding the footer size hint when one is supplied.
    fn create_reader(
        &self,
        _partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        _metrics: &ExecutionPlanMetricsSet,
    ) -> DfResult<Box<dyn AsyncFileReader + Send>> {
        let object_meta = &partitioned_file.object_meta;
        let meta = self.metas.get(&object_meta.location).cloned();
        let base = ParquetObjectReader::new(Arc::clone(&self.store), object_meta.location.clone())
            .with_file_size(object_meta.size);
        let inner = match metadata_size_hint {
            Some(hint) => base.with_footer_size_hint(hint),
            None => base,
        };
        Ok(Box::new(CachedMetadataReader { inner, meta }))
    }
}
/// Combines index candidates and tombstones into one access plan for a file.
///
/// * With candidates: scan exactly the candidate rows that are not tombstoned.
/// * Without candidates: scan everything except tombstoned rows, or return `None` (full scan)
///   when there are no tombstones or none of them fall inside the file.
fn build_access_plan(
    row_counts: &[u32],
    candidates: &Option<RoaringBitmap>,
    tombstones: &RoaringBitmap,
) -> Option<ParquetAccessPlan> {
    let Some(keep) = candidates else {
        if tombstones.is_empty() {
            return None;
        }
        return tombstone_access_plan_from_counts(row_counts, tombstones);
    };
    let mut surviving = keep.clone();
    surviving -= tombstones;
    Some(selection_access_plan_from_counts(row_counts, &surviving))
}
/// Builds an access plan that reads only the row ids in `keep`, given each row group's size.
///
/// Row ids are file-global, laid out row group after row group. Per row group:
/// * no kept id: the row group is skipped;
/// * every row kept: left as a full scan;
/// * otherwise: a row selection alternating "skip" and "select" runs.
fn selection_access_plan_from_counts(
    row_counts: &[u32],
    keep: &RoaringBitmap,
) -> ParquetAccessPlan {
    let wanted: Vec<u32> = keep.iter().collect();
    let mut plan = ParquetAccessPlan::new_all(row_counts.len());
    let mut group_start: u32 = 0;
    for (rg_idx, &rows) in row_counts.iter().enumerate() {
        if rows == 0 {
            continue;
        }
        let first_row = group_start;
        let group_end = first_row + rows;
        group_start = group_end;

        // `wanted` is sorted, so this group's ids form one contiguous slice.
        let from = wanted.partition_point(|&id| id < first_row);
        let to = wanted.partition_point(|&id| id < group_end);
        let hits = &wanted[from..to];
        if hits.is_empty() {
            plan.skip(rg_idx);
            continue;
        }
        if hits.len() as u32 == rows {
            continue;
        }

        let mut selectors: Vec<RowSelector> = Vec::new();
        // Next row (relative to the group) not yet covered by a selector.
        let mut next_row: u32 = 0;
        // Each chunk is a maximal run of consecutive kept ids.
        for run in hits.chunk_by(|a, b| *b == *a + 1) {
            let run_start = run[0] - first_row;
            if run_start > next_row {
                selectors.push(RowSelector::skip((run_start - next_row) as usize));
            }
            selectors.push(RowSelector::select(run.len()));
            next_row = run_start + run.len() as u32;
        }
        if next_row < rows {
            selectors.push(RowSelector::skip((rows - next_row) as usize));
        }
        plan.scan_selection(rg_idx, RowSelection::from(selectors));
    }
    plan
}
/// Extracts every `column <op> literal` comparison reachable through top-level `AND`s in
/// `filters`, in filter order. Expressions of any other shape contribute nothing.
pub(crate) fn exprs_to_scalar_predicates(
    filters: &[Expr],
    schema: &SchemaRef,
) -> Vec<ScalarPredicate> {
    let mut predicates = Vec::new();
    filters
        .iter()
        .for_each(|filter| collect_conjuncts(filter, schema, &mut predicates));
    predicates
}
/// Walks `expr` through nested `AND`s (left before right) and appends each recognised
/// comparison leaf to `out`. Non-binary expressions and unrecognised leaves are ignored.
fn collect_conjuncts(expr: &Expr, schema: &SchemaRef, out: &mut Vec<ScalarPredicate>) {
    let Expr::BinaryExpr(binary) = expr else {
        return;
    };
    match binary.op {
        Operator::And => {
            collect_conjuncts(&binary.left, schema, out);
            collect_conjuncts(&binary.right, schema, out);
        }
        op => out.extend(leaf_to_predicate(&binary.left, op, &binary.right, schema)),
    }
}
/// Converts a single comparison into a [`ScalarPredicate`] with the column on the left.
///
/// Accepts `column <op> literal` and `literal <op> column` (the operator is mirrored for the
/// latter). Returns `None` for any other operand shape, an unsupported operator, or a column
/// that is not part of `schema`.
fn leaf_to_predicate(
    left: &Expr,
    op: Operator,
    right: &Expr,
    schema: &SchemaRef,
) -> Option<ScalarPredicate> {
    let (col, literal, literal_first) = match (left, right) {
        (Expr::Column(c), Expr::Literal(v, _)) => (c, v, false),
        (Expr::Literal(v, _), Expr::Column(c)) => (c, v, true),
        _ => return None,
    };
    let mapped = map_op(op)?;
    let scalar_op = if literal_first {
        flip_op(mapped)
    } else {
        mapped
    };
    if schema.field_with_name(&col.name).is_err() {
        return None;
    }
    Some(ScalarPredicate {
        column: col.name.clone(),
        op: scalar_op,
        value: literal.clone(),
    })
}
/// Maps a DataFusion comparison operator to its [`ScalarOp`]; `None` for anything else.
fn map_op(op: Operator) -> Option<ScalarOp> {
    let mapped = match op {
        Operator::Eq => ScalarOp::Eq,
        Operator::NotEq => ScalarOp::NotEq,
        Operator::Lt => ScalarOp::Lt,
        Operator::LtEq => ScalarOp::LtEq,
        Operator::Gt => ScalarOp::Gt,
        Operator::GtEq => ScalarOp::GtEq,
        _ => return None,
    };
    Some(mapped)
}
/// Mirrors a comparison so that `literal <op> column` can be rewritten as
/// `column <flipped> literal`. Equality and inequality are symmetric and stay unchanged.
fn flip_op(op: ScalarOp) -> ScalarOp {
    match op {
        ScalarOp::Lt => ScalarOp::Gt,
        ScalarOp::Gt => ScalarOp::Lt,
        ScalarOp::LtEq => ScalarOp::GtEq,
        ScalarOp::GtEq => ScalarOp::LtEq,
        symmetric @ (ScalarOp::Eq | ScalarOp::NotEq) => symmetric,
    }
}
/// AND-combines `filters` and lowers the result to a physical expression over `schema`.
///
/// The schema is first qualified with `TABLE_NAME`, falling back to an unqualified schema if
/// that fails. Returns `None` when there are no filters or when any planning step fails.
fn row_group_predicate(
    state: &dyn Session,
    filters: &[Expr],
    schema: &SchemaRef,
) -> Option<Arc<dyn PhysicalExpr>> {
    let mut conjuncts = filters.iter().cloned();
    let first = conjuncts.next()?;
    let combined = conjuncts.fold(first, |acc, next| acc.and(next));
    let df_schema = match DFSchema::try_from_qualified_schema(TABLE_NAME, schema.as_ref()) {
        Ok(qualified) => qualified,
        Err(_) => DFSchema::try_from(schema.as_ref().clone()).ok()?,
    };
    state.create_physical_expr(combined, &df_schema).ok()
}
// Unit tests for access-plan construction, filter lowering, pruning, and statistics.
#[cfg(test)]
mod tests {
use arrow_array::{Int64Array, LargeStringArray, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::{
prelude::{col, lit},
scalar::ScalarValue,
};
use object_store::memory::InMemory;
use rayon::ThreadPoolBuilder;
use tokio::runtime;
use super::*;
use crate::{
superfile::builder::FtsConfig,
supertable::{
Supertable, SupertableOptions,
manifest::{ScalarStatsAgg, SuperfileUri},
},
test_helpers::default_tokenizer,
};
// Writes an in-memory Parquet file with one Int64 column `v` holding 0..total, split into
// row groups of at most `rg_size` rows.
fn parquet_with_row_groups(total: i64, rg_size: usize) -> Bytes {
use parquet::{arrow::ArrowWriter, file::properties::WriterProperties};
let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
let arr = Int64Array::from((0..total).collect::<Vec<_>>());
let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(arr)]).expect("batch");
let props = WriterProperties::builder()
.set_max_row_group_row_count(Some(rg_size))
.build();
let mut buf = Vec::new();
{
let mut w =
ArrowWriter::try_new(&mut buf, Arc::clone(&schema), Some(props)).expect("writer");
w.write(&batch).expect("write");
w.close().expect("close");
}
Bytes::from(buf)
}
// Reads `bytes` through `plan` (row-group list plus overall row selection) and returns the
// surviving values of column 0.
fn read_with_plan(bytes: &Bytes, plan: ParquetAccessPlan) -> Vec<i64> {
let meta = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())
.expect("meta")
.metadata()
.clone();
let row_groups = plan.row_group_indexes();
let selection = plan
.into_overall_row_selection(meta.row_groups())
.expect("overall selection");
let mut builder = ParquetRecordBatchReaderBuilder::try_new(bytes.clone())
.expect("builder")
.with_row_groups(row_groups);
if let Some(sel) = selection {
builder = builder.with_row_selection(sel);
}
let reader = builder.build().expect("reader");
let mut got = Vec::new();
for b in reader {
let b = b.expect("batch");
let c = b
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.expect("int64 col");
for i in 0..c.len() {
got.push(c.value(i));
}
}
got
}
// A tombstone id beyond the file's rows must not produce an access plan.
#[test]
fn tombstone_access_plan_none_when_no_deletes_in_file() {
let bytes = parquet_with_row_groups(12, 4);
let mut bm = RoaringBitmap::new();
bm.insert(99);
assert!(
tombstone_access_plan(&bytes, &bm).expect("plan").is_none(),
"no deleted id falls inside the file → full scan (None)"
);
}
// Three row groups of four rows: group 1 is fully deleted, groups 0 and 2 partially.
#[test]
fn tombstone_access_plan_skips_deleted_across_row_groups() {
let bytes = parquet_with_row_groups(12, 4);
let mut bm = RoaringBitmap::new();
for id in [0u32, 1, 4, 5, 6, 7, 10] {
bm.insert(id);
}
let plan = tombstone_access_plan(&bytes, &bm)
.expect("plan")
.expect("some deletes");
assert!(!plan.should_scan(1), "fully-tombstoned row group 1 skipped");
assert!(plan.should_scan(0));
assert!(plan.should_scan(2));
let survivors = read_with_plan(&bytes, plan);
assert_eq!(survivors, vec![2, 3, 8, 9, 11]);
}
// Single row group with deletes at the first row, the last row, and alternating in between.
#[test]
fn tombstone_access_plan_handles_alternating_and_boundary_deletes() {
let bytes = parquet_with_row_groups(8, 8);
let mut bm = RoaringBitmap::new();
for id in [0u32, 2, 4, 7] {
bm.insert(id);
}
let plan = tombstone_access_plan(&bytes, &bm)
.expect("plan")
.expect("some deletes");
let survivors = read_with_plan(&bytes, plan);
assert_eq!(survivors, vec![1, 3, 5, 6]);
}
// Two nullable Int64 columns, `x` and `y`, for the filter-lowering tests.
fn schema_xy() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("x", DataType::Int64, true),
Field::new("y", DataType::Int64, true),
]))
}
// `x > 5` lowers to a single predicate unchanged.
#[test]
fn col_op_lit_maps_directly() {
let s = schema_xy();
let preds = exprs_to_scalar_predicates(&[col("x").gt(lit(5_i64))], &s);
assert_eq!(preds.len(), 1);
assert_eq!(preds[0].column, "x");
assert_eq!(preds[0].op, ScalarOp::Gt);
assert_eq!(preds[0].value, ScalarValue::Int64(Some(5)));
}
// `5 < x` is normalised to `x > 5`.
#[test]
fn lit_op_col_flips_operator() {
let s = schema_xy();
let preds = exprs_to_scalar_predicates(&[lit(5_i64).lt(col("x"))], &s);
assert_eq!(preds.len(), 1);
assert_eq!(preds[0].column, "x");
assert_eq!(preds[0].op, ScalarOp::Gt);
assert_eq!(preds[0].value, ScalarValue::Int64(Some(5)));
}
// An AND of two comparisons yields two predicates, left operand first.
#[test]
fn and_is_flattened_into_two_predicates() {
let s = schema_xy();
let expr = col("x").gt_eq(lit(5_i64)).and(col("x").lt_eq(lit(8_i64)));
let preds = exprs_to_scalar_predicates(&[expr], &s);
assert_eq!(preds.len(), 2);
assert_eq!(preds[0].op, ScalarOp::GtEq);
assert_eq!(preds[1].op, ScalarOp::LtEq);
}
// Separate top-level filters each contribute a predicate, in filter order.
#[test]
fn multiple_top_level_filters_each_contribute() {
let s = schema_xy();
let preds =
exprs_to_scalar_predicates(&[col("x").gt(lit(1_i64)), col("y").lt(lit(9_i64))], &s);
assert_eq!(preds.len(), 2);
assert_eq!(preds[0].column, "x");
assert_eq!(preds[1].column, "y");
}
// Column-to-column comparisons are not lowered.
#[test]
fn col_op_col_is_ignored() {
let s = schema_xy();
let preds = exprs_to_scalar_predicates(&[col("x").gt(col("y"))], &s);
assert!(preds.is_empty());
}
// Comparisons on a column missing from the schema are not lowered.
#[test]
fn unknown_column_is_ignored() {
let s = schema_xy();
let preds = exprs_to_scalar_predicates(&[col("z").gt(lit(1_i64))], &s);
assert!(preds.is_empty());
}
// Arithmetic (non-comparison) binary expressions are not lowered.
#[test]
fn non_comparison_operator_is_ignored() {
let s = schema_xy();
let preds = exprs_to_scalar_predicates(&[col("x") + lit(1_i64)], &s);
assert!(preds.is_empty());
}
// Schema with two non-null LargeUtf8 columns: `category` and `title`.
fn cat_title_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("category", DataType::LargeUtf8, false),
Field::new("title", DataType::LargeUtf8, false),
]))
}
// Options with `title` configured as the FTS column, the default tokenizer, and a
// single-threaded writer pool.
fn cat_title_opts() -> SupertableOptions {
let pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(1)
.build()
.expect("pool"),
);
SupertableOptions::new(
cat_title_schema(),
vec![FtsConfig {
column: "title".into(),
}],
vec![],
Some(default_tokenizer()),
)
.expect("opts")
.with_writer_pool(pool)
}
// Builds a batch for `cat_title_schema` from parallel category/title slices.
fn cat_title_batch(cats: &[&str], titles: &[&str]) -> RecordBatch {
RecordBatch::try_new(
cat_title_schema(),
vec![
Arc::new(LargeStringArray::from(cats.to_vec())),
Arc::new(LargeStringArray::from(titles.to_vec())),
],
)
.expect("batch")
}
// Three commits give three superfiles; an FTS equality prunes to one, while no filter or a
// filter matching every superfile prunes nothing.
#[test]
fn superfile_prune_index_helps_vs_does_not() {
let st = Supertable::create(cat_title_opts()).expect("create");
let mut w = st.writer().expect("writer");
w.append(&cat_title_batch(&["lang", "lang"], &["aardvark", "zebra"]))
.expect("a1");
w.commit().expect("c1");
w.append(&cat_title_batch(&["lang"], &["mango"]))
.expect("a2");
w.commit().expect("c2");
w.append(&cat_title_batch(&["lang", "lang"], &["delta", "sigma"]))
.expect("a3");
w.commit().expect("c3");
assert_eq!(st.reader().n_superfiles(), 3);
let reader = st.reader();
let provider = SupertableProvider::new(
st.options().scalar_schema(),
reader.manifest().clone(),
st.options().store.clone(),
st.options().disk_cache.clone(),
reader.tombstone_cache.clone(),
);
let rt = runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("rt");
assert_eq!(
rt.block_on(provider.surviving_superfile_count(&[col("title").eq(lit("mango"))])),
1,
"FTS bloom prunes to the single token holder"
);
assert_eq!(
rt.block_on(provider.surviving_superfile_count(&[])),
3,
"no predicate → full scan, nothing pruned"
);
assert_eq!(
rt.block_on(provider.surviving_superfile_count(&[col("category").eq(lit("lang"))])),
3,
"non-FTS predicate matching all superfiles prunes nothing"
);
}
// Fixture: a provider over two committed superfiles (2 rows + 1 row) plus a Tokio runtime.
fn provider_over_two_superfiles() -> (SupertableProvider, runtime::Runtime) {
let st = Supertable::create(cat_title_opts()).expect("create");
let mut w = st.writer().expect("writer");
w.append(&cat_title_batch(&["a", "a"], &["alpha beta", "gamma"]))
.expect("a1");
w.commit().expect("c1");
w.append(&cat_title_batch(&["b"], &["delta"])).expect("a2");
w.commit().expect("c2");
let reader = st.reader();
let provider = SupertableProvider::new(
st.options().scalar_schema(),
reader.manifest().clone(),
st.options().store.clone(),
st.options().disk_cache.clone(),
reader.tombstone_cache.clone(),
);
let rt = runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("rt");
(provider, rt)
}
// Covers `as_any`, `table_type`, `schema`, and the Debug output.
#[test]
fn trait_accessors_and_debug() {
let (provider, _rt) = provider_over_two_superfiles();
assert!(
provider
.as_any()
.downcast_ref::<SupertableProvider>()
.is_some()
);
assert!(matches!(provider.table_type(), TableType::Base));
let sch = provider.schema();
assert!(sch.field_with_name("category").is_ok());
assert!(sch.field_with_name("title").is_ok());
let dbg = format!("{provider:?}");
assert!(dbg.contains("SupertableProvider"));
assert!(dbg.contains("n_superfiles"));
}
// Every filter is reported as `Inexact`, one verdict per filter.
#[test]
fn supports_filters_pushdown_is_always_inexact() {
let (provider, _rt) = provider_over_two_superfiles();
let f1 = col("category").eq(lit("a"));
let f2 = col("title").eq(lit("alpha"));
let filters = [&f1, &f2];
let pushdown = provider
.supports_filters_pushdown(&filters)
.expect("pushdown");
assert_eq!(pushdown.len(), 2);
assert!(
pushdown
.iter()
.all(|p| matches!(p, TableProviderFilterPushDown::Inexact))
);
}
// With no deletes, the row count is exact and there is one stats entry per schema field.
#[test]
fn statistics_exact_on_clean_in_memory_flat_manifest() {
let (provider, _rt) = provider_over_two_superfiles();
let stats = provider.statistics().expect("flat-manifest statistics");
assert!(matches!(stats.num_rows, Precision::Exact(3)));
assert_eq!(
stats.column_statistics.len(),
provider.schema().fields().len()
);
}
// `restricted_to` flips the restriction flag and shares the same manifest allocation.
#[test]
fn manifest_accessor_and_restricted_to_idempotency_guard() {
let (provider, _rt) = provider_over_two_superfiles();
assert!(!provider.is_segment_restricted());
let ids: Vec<Uuid> = provider
.manifest()
.superfiles
.iter()
.map(|e| e.superfile_id)
.collect();
assert_eq!(ids.len(), 2);
let only_first: HashSet<Uuid> = [ids[0]].into_iter().collect();
let restricted = provider.restricted_to(only_first);
assert!(restricted.is_segment_restricted());
assert!(Arc::ptr_eq(restricted.manifest(), provider.manifest()));
}
// Freshly written superfiles with no deletes are all reported clean.
#[test]
fn entry_is_clean_true_without_tombstone_overlay() {
let (provider, _rt) = provider_over_two_superfiles();
for entry in provider.manifest().superfiles.iter() {
assert!(provider.entry_is_clean(entry));
}
}
// A provider restricted to one segment selects only that segment.
#[test]
fn restricted_provider_scans_only_its_segment() {
let (provider, rt) = provider_over_two_superfiles();
let first = provider.manifest().superfiles[0].superfile_id;
let only_first: HashSet<Uuid> = [first].into_iter().collect();
let restricted = provider.restricted_to(only_first);
assert_eq!(rt.block_on(provider.surviving_superfile_count(&[])), 2);
assert_eq!(rt.block_on(restricted.surviving_superfile_count(&[])), 1);
}
// Schema with a single nullable Int64 column `n`.
fn num_schema() -> SchemaRef {
Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, true)]))
}
// Options for `num_schema` with no FTS columns, no tokenizer, and a single-threaded pool.
fn num_opts() -> SupertableOptions {
let pool = Arc::new(
ThreadPoolBuilder::new()
.num_threads(1)
.build()
.expect("pool"),
);
SupertableOptions::new(num_schema(), vec![], vec![], None)
.expect("opts")
.with_writer_pool(pool)
}
// Builds a one-column batch for `num_schema`.
fn num_batch(vals: &[Option<i64>]) -> RecordBatch {
RecordBatch::try_new(
num_schema(),
vec![Arc::new(Int64Array::from(vals.to_vec()))],
)
.expect("batch")
}
// Min, max, and null count are merged across two superfiles and reported as exact.
#[test]
fn statistics_for_aggregates_scalar_stats_across_superfiles() {
let st = Supertable::create(num_opts()).expect("create");
let mut w = st.writer().expect("writer");
w.append(&num_batch(&[Some(1), Some(2), Some(3), None]))
.expect("a1");
w.commit().expect("c1");
w.append(&num_batch(&[Some(10), Some(20)])).expect("a2");
w.commit().expect("c2");
let reader = st.reader();
let provider = SupertableProvider::new(
st.options().scalar_schema(),
reader.manifest().clone(),
st.options().store.clone(),
st.options().disk_cache.clone(),
reader.tombstone_cache.clone(),
);
let stats = provider.statistics().expect("statistics");
assert!(matches!(stats.num_rows, Precision::Exact(6)));
let sch = provider.schema();
let n_idx = sch.index_of("n").expect("n column");
let cs = &stats.column_statistics[n_idx];
assert_eq!(cs.min_value, Precision::Exact(ScalarValue::Int64(Some(1))));
assert_eq!(cs.max_value, Precision::Exact(ScalarValue::Int64(Some(20))));
assert_eq!(cs.null_count, Precision::Exact(1));
}
// Hand-built manifest entry whose stats for `col` are created via
// `ScalarStatsAgg::from_min_max`, i.e. from min/max only.
fn entry_minmax_only(col: &str, min: &str, max: &str) -> Arc<SuperfileEntry> {
let mn: ArrayRef = Arc::new(LargeStringArray::from(vec![min]));
let mx: ArrayRef = Arc::new(LargeStringArray::from(vec![max]));
let mut scalar_stats = HashMap::new();
scalar_stats.insert(col.to_string(), ScalarStatsAgg::from_min_max(mn, mx));
Arc::new(SuperfileEntry {
superfile_id: Uuid::new_v4(),
uri: SuperfileUri::new_v4(),
n_docs: 1,
id_min: 0,
id_max: 0,
scalar_stats,
fts_summary: HashMap::new(),
vector_summary: HashMap::new(),
partition_key: Vec::new(),
partition_hint: None,
subsection_offsets: None,
})
}
// Each aggregate helper returns `None` when its statistic, or the whole column, is absent.
#[test]
fn scalar_statistics_helpers_return_none_when_stat_absent() {
let entries = vec![entry_minmax_only("s", "alpha", "omega")];
assert!(scalar_sum(&entries, "s").is_none(), "no sum stat → None");
assert!(
scalar_distinct(&entries, "s").is_none(),
"no hll stat → None"
);
assert!(
scalar_null_count(&entries, "s").is_none(),
"no null_count stat → None"
);
assert!(scalar_min_max(&entries, "s").is_some());
assert!(scalar_sum(&entries, "missing").is_none());
assert!(scalar_min_max(&entries, "missing").is_none());
assert!(scalar_null_count(&entries, "missing").is_none());
}
// The factory's Debug output names the type and the number of cached superfiles.
#[test]
fn cached_metadata_reader_factory_debug_reports_superfile_count() {
let store: Arc<dyn OsObjectStore> = Arc::new(InMemory::new());
let factory = CachedMetadataReaderFactory {
store,
metas: HashMap::new(),
};
let dbg = format!("{factory:?}");
assert!(
dbg.contains("CachedMetadataReaderFactory") && dbg.contains("superfiles: 0"),
"Debug missing fields: {dbg}"
);
}
}