use std::any::Any;
use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
use arrow_schema::Field;
use arrow_schema::Schema;
use arrow_schema::SchemaRef;
use datafusion_common::ColumnStatistics;
use datafusion_common::DataFusionError;
use datafusion_common::Result as DFResult;
use datafusion_common::Statistics;
use datafusion_common::arrow::array::AsArray;
use datafusion_common::arrow::array::RecordBatch;
use datafusion_common::stats::Precision as DFPrecision;
use datafusion_datasource::source::DataSource;
use datafusion_execution::SendableRecordBatchStream;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::projection::ProjectionExprs;
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::DisplayFormatType;
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
use datafusion_physical_plan::filter_pushdown::PushedDown;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use futures::StreamExt;
use futures::TryStreamExt;
use futures::future::try_join_all;
use vortex::array::VortexSessionExecute;
use vortex::array::arrow::ArrowSessionExt;
use vortex::dtype::DType;
use vortex::dtype::FieldPath;
use vortex::dtype::Nullability;
use vortex::error::VortexResult;
use vortex::error::vortex_bail;
use vortex::expr::Expression;
use vortex::expr::and as vx_and;
use vortex::expr::get_item;
use vortex::expr::pack;
use vortex::expr::root;
use vortex::expr::stats::Precision;
use vortex::expr::transform::replace;
use vortex::io::session::RuntimeSessionExt;
use vortex::scan::DataSourceRef;
use vortex::scan::ScanRequest;
use vortex::session::VortexSession;
use vortex_utils::parallelism::get_available_parallelism;
use crate::convert::exprs::DefaultExpressionConvertor;
use crate::convert::exprs::ExpressionConvertor;
use crate::convert::exprs::ProcessedProjection;
use crate::convert::exprs::make_vortex_predicate;
use crate::convert::stats::stats_set_to_df;
/// Builder for [`VortexDataSource`]; obtain one via `VortexDataSource::builder`.
///
/// Lets callers optionally override the Arrow schema and select a subset of columns
/// before `build` resolves the projection and fetches column statistics.
pub struct VortexDataSourceBuilder {
    /// The Vortex source that will be scanned.
    data_source: DataSourceRef,
    /// Session used for Arrow schema conversion and, later, scan execution.
    session: VortexSession,
    /// Optional Arrow schema override; when `None`, `build` derives one from the source dtype.
    arrow_schema: Option<SchemaRef>,
    /// Optional column indices (into the Arrow schema) to project, in output order.
    projection: Option<Vec<usize>>,
}
impl VortexDataSourceBuilder {
pub fn with_arrow_schema(mut self, arrow_schema: SchemaRef) -> Self {
self.arrow_schema = Some(arrow_schema);
self
}
pub fn with_projection(mut self, indices: Vec<usize>) -> Self {
self.projection = Some(indices);
self
}
pub fn with_some_projection(mut self, indices: Option<Vec<usize>>) -> Self {
self.projection = indices;
self
}
pub async fn build(self) -> VortexResult<VortexDataSource> {
let mut projection = root();
let mut arrow_schema = match self.arrow_schema {
Some(schema) => schema,
None => Arc::new(
self.session
.arrow()
.to_arrow_schema(self.data_source.dtype())?,
),
};
if let Some(indices) = self.projection {
let fields = indices.iter().map(|&i| {
let name = arrow_schema.field(i).name().clone();
let expr = get_item(name.as_str(), root());
(name, expr)
});
projection = pack(fields, Nullability::NonNullable);
arrow_schema = Arc::new(Schema::new(
indices
.iter()
.map(|&i| arrow_schema.field(i).clone())
.collect::<Vec<_>>(),
));
}
let DType::Struct(fields, ..) = projection.return_dtype(self.data_source.dtype())? else {
vortex_bail!("Projection does not evaluate to a struct");
};
let field_paths: Vec<_> = fields
.names()
.iter()
.cloned()
.map(FieldPath::from_name)
.collect();
let statistics = try_join_all(
field_paths
.iter()
.map(|path| self.data_source.field_statistics(path)),
)
.await?
.iter()
.zip(fields.fields())
.map(|(stats, dtype)| stats_set_to_df(stats, &dtype))
.collect::<VortexResult<Vec<_>>>()?;
Ok(VortexDataSource {
data_source: self.data_source,
session: self.session,
initial_schema: Arc::clone(&arrow_schema),
initial_projection: projection.clone(),
initial_statistics: statistics.clone(),
projected_projection: projection.clone(),
projected_schema: Arc::clone(&arrow_schema),
projected_statistics: statistics.clone(),
leftover_projection: None,
leftover_schema: arrow_schema,
leftover_statistics: statistics,
filter: None,
limit: None,
ordered: false,
num_partitions: get_available_parallelism().unwrap_or(1),
})
}
}
impl VortexDataSource {
    /// Starts configuring a [`VortexDataSource`] that scans `data_source` using `session`.
    ///
    /// The returned builder has neither an Arrow schema override nor a column projection set.
    pub fn builder(data_source: DataSourceRef, session: VortexSession) -> VortexDataSourceBuilder {
        VortexDataSourceBuilder {
            projection: None,
            arrow_schema: None,
            session,
            data_source,
        }
    }
}
/// A DataFusion [`DataSource`] that scans a Vortex [`DataSourceRef`].
///
/// Projection state is tracked in three stages:
/// - `initial_*` fields reflect what the builder configured;
/// - `projected_*` fields describe what is requested from the Vortex scan;
/// - `leftover_*` fields describe what DataFusion sees after any remaining
///   projection is applied to each scanned batch.
#[derive(Clone)]
pub struct VortexDataSource {
    /// The Vortex source scanned in `open`.
    data_source: DataSourceRef,
    /// Session used for Arrow conversion and for spawning CPU work during the scan.
    session: VortexSession,
    /// Arrow schema after the builder's projection; projections and filters
    /// pushed into this node are resolved against it.
    initial_schema: SchemaRef,
    /// Vortex projection configured by the builder (`root()` or a `pack` of selected columns).
    initial_projection: Expression,
    /// Column statistics computed at build time (not read at present).
    #[expect(dead_code)]
    initial_statistics: Vec<ColumnStatistics>,
    /// Projection sent to the Vortex scan as `ScanRequest::projection`.
    projected_projection: Expression,
    /// Arrow schema of batches produced by the scan, before any leftover projection.
    projected_schema: SchemaRef,
    /// Column statistics for `projected_schema`.
    projected_statistics: Vec<ColumnStatistics>,
    /// DataFusion projection applied to each scanned batch, covering the parts of an
    /// absorbed projection that were not pushed into the scan.
    leftover_projection: Option<ProjectionExprs>,
    /// Output schema reported to DataFusion.
    leftover_schema: SchemaRef,
    /// Column statistics reported by `partition_statistics`.
    leftover_statistics: Vec<ColumnStatistics>,
    /// Predicate pushed into the Vortex scan.
    filter: Option<Expression>,
    /// Row limit pushed into the Vortex scan.
    limit: Option<usize>,
    /// Forwarded to the scan as `ScanRequest::ordered`.
    ordered: bool,
    /// Concurrency used when flattening and converting scan output in `open`.
    num_partitions: usize,
}
impl fmt::Debug for VortexDataSource {
    /// Shows the schema reported to DataFusion, the scan projection, any pushed-down
    /// filter and the row limit; session and statistics are omitted.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let projection_text = self.projected_projection.to_string();
        let filter_text = self.filter.as_ref().map(|expr| expr.to_string());
        let mut dbg = f.debug_struct("VortexScanSource");
        dbg.field("schema", &self.leftover_schema);
        dbg.field("projection", &projection_text);
        dbg.field("filter", &filter_text);
        dbg.field("limit", &self.limit);
        dbg.finish()
    }
}
impl DataSource for VortexDataSource {
fn open(
&self,
partition: usize,
_context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
if partition != 0 {
return Err(DataFusionError::Internal(format!(
"VortexScanSource: expected partition 0, got {partition}"
)));
}
let scan_request = ScanRequest {
projection: self.projected_projection.clone(),
filter: self.filter.clone(),
limit: self.limit.map(|l| u64::try_from(l).unwrap_or(u64::MAX)),
ordered: self.ordered,
..Default::default()
};
let data_source = Arc::clone(&self.data_source);
let projected_schema = Arc::clone(&self.projected_schema);
let projected_target_field = Arc::new(Field::new_struct(
"",
projected_schema.fields().clone(),
false,
));
let session = self.session.clone();
let num_partitions = self.num_partitions;
let leftover_projector = self
.leftover_projection
.as_ref()
.map(|proj| proj.make_projector(&self.projected_schema))
.transpose()?;
let stream = futures::stream::once(async move {
let scan = data_source
.scan(scan_request)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let scan_streams = scan.partitions().map(|split_result| {
let split = split_result?;
split.execute()
});
let handle = session.handle();
let stream = scan_streams
.try_flatten_unordered(Some(num_partitions * 2))
.map(move |result| {
let session = session.clone();
let target_field = Arc::clone(&projected_target_field);
handle.spawn_cpu(move || {
let mut ctx = session.create_execution_ctx();
result.and_then(|chunk| {
let arrow = session.arrow().execute_arrow(
chunk,
Some(target_field.as_ref()),
&mut ctx,
)?;
Ok(RecordBatch::from(arrow.as_struct().clone()))
})
})
})
.buffered(num_partitions)
.map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));
let stream = if let Some(projector) = leftover_projector {
stream
.map(move |batch_result| {
batch_result.and_then(|batch| projector.project_batch(&batch))
})
.boxed()
} else {
stream.boxed()
};
Ok::<_, DataFusionError>(stream)
})
.try_flatten();
Ok(Box::pin(RecordBatchStreamAdapter::new(
Arc::clone(&self.leftover_schema),
stream,
)))
}
fn as_any(&self) -> &dyn Any {
self
}
fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
write!(
f,
"VortexScanSource: projection={}",
self.projected_projection
)?;
if let Some(filter) = &self.filter {
write!(f, ", filter={filter}")?;
}
if let Some(limit) = self.limit {
write!(f, ", limit={limit}")?;
}
Ok(())
}
fn repartitioned(
&self,
target_partitions: usize,
_repartition_file_min_size: usize,
output_ordering: Option<LexOrdering>,
) -> DFResult<Option<Arc<dyn DataSource>>> {
let mut this = self.clone();
this.num_partitions = target_partitions;
this.ordered |= output_ordering.is_some();
Ok(Some(Arc::new(this)))
}
fn output_partitioning(&self) -> Partitioning {
Partitioning::UnknownPartitioning(1)
}
fn eq_properties(&self) -> EquivalenceProperties {
EquivalenceProperties::new(Arc::clone(&self.leftover_schema))
}
fn partition_statistics(&self, _partition: Option<usize>) -> DFResult<Statistics> {
let num_rows = estimate_to_df_precision(self.data_source.row_count().as_ref());
let total_byte_size = estimate_to_df_precision(self.data_source.byte_size().as_ref());
let column_statistics = self.leftover_statistics.clone();
Ok(Statistics {
num_rows,
total_byte_size,
column_statistics,
})
}
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
let mut this = self.clone();
this.limit = limit;
Some(Arc::new(this))
}
fn fetch(&self) -> Option<usize> {
self.limit
}
fn try_swapping_with_projection(
&self,
projection: &ProjectionExprs,
) -> DFResult<Option<Arc<dyn DataSource>>> {
tracing::debug!(
"VortexScanSource: trying to swap with projection: {}",
projection
);
let convertor = DefaultExpressionConvertor::default();
let input_schema = self.initial_schema.as_ref();
let projected_schema = projection.project_schema(input_schema)?;
let ProcessedProjection {
scan_projection,
leftover_projection,
} = convertor.split_projection(projection.clone(), input_schema, &projected_schema)?;
let scan_projection = replace(scan_projection, &root(), self.initial_projection.clone());
let scan_dtype = scan_projection
.return_dtype(self.data_source.dtype())
.map_err(|e| DataFusionError::External(Box::new(e)))?;
let scan_output_schema = Arc::new(
self.session
.arrow()
.to_arrow_schema(&scan_dtype)
.map_err(|e| DataFusionError::External(Box::new(e)))?,
);
let leftover_projection = leftover_projection
.try_map_exprs(|expr| reassign_expr_columns(expr, &scan_output_schema))?;
let final_schema = Arc::new(projected_schema);
let mut this = self.clone();
this.projected_projection = scan_projection;
this.projected_schema = Arc::clone(&scan_output_schema);
this.projected_statistics =
vec![ColumnStatistics::new_unknown(); scan_output_schema.fields().len()];
this.leftover_projection = Some(leftover_projection);
this.leftover_schema = Arc::clone(&final_schema);
this.leftover_statistics =
vec![ColumnStatistics::new_unknown(); final_schema.fields().len()];
Ok(Some(Arc::new(this)))
}
fn try_pushdown_filters(
&self,
filters: Vec<Arc<dyn PhysicalExpr>>,
_config: &datafusion_common::config::ConfigOptions,
) -> DFResult<FilterPushdownPropagation<Arc<dyn DataSource>>> {
if filters.is_empty() {
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
vec![],
));
}
let convertor = DefaultExpressionConvertor::default();
let input_schema = self.initial_schema.as_ref();
let pushdown_results: Vec<PushedDown> = filters
.iter()
.map(|expr| {
if convertor.can_be_pushed_down(expr, input_schema) {
PushedDown::Yes
} else {
PushedDown::No
}
})
.collect();
if pushdown_results.iter().all(|p| matches!(p, PushedDown::No)) {
return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
pushdown_results,
));
}
let pushable: Vec<Arc<dyn PhysicalExpr>> = filters
.iter()
.zip(pushdown_results.iter())
.filter_map(|(expr, pushed)| match pushed {
PushedDown::Yes => Some(Arc::clone(expr)),
PushedDown::No => None,
})
.collect();
let vortex_pred = make_vortex_predicate(&convertor, &pushable)?;
let new_filter = match (&self.filter, vortex_pred) {
(Some(existing), Some(new_pred)) => Some(vx_and(existing.clone(), new_pred)),
(Some(existing), None) => Some(existing.clone()),
(None, Some(new_pred)) => Some(new_pred),
(None, None) => None,
};
let mut this = self.clone();
this.filter = new_filter;
Ok(
FilterPushdownPropagation::with_parent_pushdown_result(pushdown_results)
.with_updated_node(Arc::new(this) as _),
)
}
}
/// Converts an optional Vortex `u64` estimate into DataFusion's `usize` precision.
///
/// Exactness is preserved. Values that do not fit in `usize` saturate to `usize::MAX`.
/// A missing estimate becomes `Absent`.
fn estimate_to_df_precision(est: Option<&Precision<u64>>) -> DFPrecision<usize> {
    let saturate = |value: u64| usize::try_from(value).unwrap_or(usize::MAX);
    let Some(estimate) = est else {
        return DFPrecision::Absent;
    };
    match estimate {
        Precision::Exact(v) => DFPrecision::Exact(saturate(*v)),
        Precision::Inexact(v) => DFPrecision::Inexact(saturate(*v)),
    }
}