// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! [`VortexDataSource`] implements DataFusion's [`DataSource`] trait, deferring scan construction
//! to [`DataSource::open`] so that pushed-down filters and limits are included in the
//! [`ScanRequest`]. A single DataFusion partition is used; Vortex handles internal parallelism
//! by driving splits concurrently via [`TryStreamExt::try_flatten_unordered`].
9use std::any::Any;
10use std::fmt;
11use std::fmt::Formatter;
12use std::num::NonZero;
13use std::num::NonZeroUsize;
14use std::sync::Arc;
15
16use arrow_schema::DataType;
17use arrow_schema::Schema;
18use arrow_schema::SchemaRef;
19use datafusion_common::ColumnStatistics;
20use datafusion_common::DataFusionError;
21use datafusion_common::Result as DFResult;
22use datafusion_common::Statistics;
23use datafusion_common::stats::Precision as DFPrecision;
24use datafusion_datasource::source::DataSource;
25use datafusion_execution::SendableRecordBatchStream;
26use datafusion_execution::TaskContext;
27use datafusion_physical_expr::EquivalenceProperties;
28use datafusion_physical_expr::Partitioning;
29use datafusion_physical_expr::PhysicalExpr;
30use datafusion_physical_expr::projection::ProjectionExprs;
31use datafusion_physical_expr::utils::reassign_expr_columns;
32use datafusion_physical_expr_common::sort_expr::LexOrdering;
33use datafusion_physical_plan::DisplayFormatType;
34use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
35use datafusion_physical_plan::filter_pushdown::PushedDown;
36use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
37use futures::StreamExt;
38use futures::TryStreamExt;
39use futures::future::try_join_all;
40use vortex::array::VortexSessionExecute;
41use vortex::array::arrow::ArrowArrayExecutor;
42use vortex::dtype::DType;
43use vortex::dtype::FieldPath;
44use vortex::dtype::Nullability;
45use vortex::error::VortexExpect;
46use vortex::error::VortexResult;
47use vortex::error::vortex_bail;
48use vortex::expr::Expression;
49use vortex::expr::and as vx_and;
50use vortex::expr::get_item;
51use vortex::expr::pack;
52use vortex::expr::root;
53use vortex::expr::stats::Precision;
54use vortex::expr::transform::replace;
55use vortex::io::session::RuntimeSessionExt;
56use vortex::scan::api::DataSourceRef;
57use vortex::scan::api::ScanRequest;
58use vortex::session::VortexSession;
59
60use crate::convert::exprs::DefaultExpressionConvertor;
61use crate::convert::exprs::ExpressionConvertor;
62use crate::convert::exprs::ProcessedProjection;
63use crate::convert::exprs::make_vortex_predicate;
64use crate::convert::stats::stats_set_to_df;
65
/// A builder for a [`VortexDataSource`].
pub struct VortexDataSourceBuilder {
    /// The underlying Vortex data source that the built [`VortexDataSource`] will scan.
    data_source: DataSourceRef,
    /// Vortex session used at read time (execution contexts, CPU task spawning).
    session: VortexSession,

    /// Optional Arrow schema override. When `None`, `build()` infers a schema from the
    /// source's Vortex DType. Not validated against the DType; errors surface at read time.
    arrow_schema: Option<SchemaRef>,
    /// Optional top-level field indices selecting a subset of columns to project.
    projection: Option<Vec<usize>>,
}
74
75impl VortexDataSourceBuilder {
76    /// Manually configure an Arrow schema to use when reading from the Vortex source.
77    /// If not specified, the data source will infer an Arrow schema from the Vortex DType.
78    ///
79    /// Note that this schema is not validated against the Vortex DType so any errors will be
80    /// deferred until read time.
81    pub fn with_arrow_schema(mut self, arrow_schema: SchemaRef) -> Self {
82        self.arrow_schema = Some(arrow_schema);
83        self
84    }
85
86    /// Configure an initial projection using top-level field indices.
87    pub fn with_projection(mut self, indices: Vec<usize>) -> Self {
88        self.projection = Some(indices);
89        self
90    }
91
92    /// Configure an initial projection using top-level field indices.
93    pub fn with_some_projection(mut self, indices: Option<Vec<usize>>) -> Self {
94        self.projection = indices;
95        self
96    }
97
98    /// Build the [`VortexDataSource`].
99    ///
100    /// FIXME(ngates): Note that due to the DataFusion API, this function eagerly resolves
101    ///   statistics for all projected columns. That said.. we only need to do this for aggregation
102    ///   reductions. Any stats used for pruning are handled internally. We could possibly look
103    ///   at the plan ourselves and decide whether there is any need for the stats?
104    pub async fn build(self) -> VortexResult<VortexDataSource> {
105        // The projection expression
106        let mut projection = root();
107
108        // Resolve the Arrow schema
109        let mut arrow_schema = match self.arrow_schema {
110            Some(schema) => schema,
111            None => {
112                let data_type = self.data_source.dtype().to_arrow_dtype()?;
113                let DataType::Struct(fields) = data_type else {
114                    vortex_bail!("Expected a struct-like DataType, found {}", data_type);
115                };
116                Arc::new(Schema::new(fields))
117            }
118        };
119
120        // Apply any selection and create a projection expression.
121        if let Some(indices) = self.projection {
122            let fields = indices.iter().map(|&i| {
123                let name = arrow_schema.field(i).name().clone();
124                let expr = get_item(name.as_str(), root());
125                (name, expr)
126            });
127
128            // Update the projection expression
129            projection = pack(fields, Nullability::NonNullable);
130
131            // Update the arrow schema
132            arrow_schema = Arc::new(Schema::new(
133                indices
134                    .iter()
135                    .map(|&i| arrow_schema.field(i).clone())
136                    .collect::<Vec<_>>(),
137            ));
138        }
139
140        let DType::Struct(fields, ..) = projection.return_dtype(self.data_source.dtype())? else {
141            vortex_bail!("Projection does not evaluate to a struct");
142        };
143
144        // We now compute initial statistics.
145        let field_paths: Vec<_> = fields
146            .names()
147            .iter()
148            .cloned()
149            .map(FieldPath::from_name)
150            .collect();
151        let statistics = try_join_all(
152            field_paths
153                .iter()
154                .map(|path| self.data_source.field_statistics(path)),
155        )
156        .await?
157        .iter()
158        .zip(fields.fields())
159        .map(|(stats, dtype)| stats_set_to_df(stats, &dtype))
160        .collect::<VortexResult<Vec<_>>>()?;
161
162        Ok(VortexDataSource {
163            data_source: self.data_source,
164            session: self.session,
165            initial_schema: arrow_schema.clone(),
166            initial_projection: projection.clone(),
167            initial_statistics: statistics.clone(),
168            projected_projection: projection.clone(),
169            projected_schema: arrow_schema.clone(),
170            projected_statistics: statistics.clone(),
171            leftover_projection: None,
172            leftover_schema: arrow_schema,
173            leftover_statistics: statistics,
174            filter: None,
175            limit: None,
176            ordered: false,
177            num_partitions: std::thread::available_parallelism().unwrap_or_else(|_| {
178                NonZero::new(1).vortex_expect("available parallelism must be non-zero")
179            }),
180        })
181    }
182}
183
184impl VortexDataSource {
185    /// Create a builder for a [`VortexDataSource`].
186    pub fn builder(data_source: DataSourceRef, session: VortexSession) -> VortexDataSourceBuilder {
187        VortexDataSourceBuilder {
188            data_source,
189            session,
190            arrow_schema: None,
191            projection: None,
192        }
193    }
194}
195
/// A DataFusion [`DataSource`] that defers Vortex scan construction to [`open`](DataSource::open).
///
/// Holds a [`DataSourceRef`] rather than pre-collected splits, so that filters and limits pushed
/// down by DataFusion's optimizer are included in the [`ScanRequest`]. A single DataFusion
/// partition is exposed; Vortex drives splits concurrently via
/// [`TryStreamExt::try_flatten_unordered`].
#[derive(Clone)]
pub struct VortexDataSource {
    /// The Vortex data source.
    data_source: DataSourceRef,
    /// Vortex session handle.
    session: VortexSession,

    // --- Phase 1: Initial (from the builder, before any optimizer pushdown) ---
    /// The Arrow schema of the data source before any DataFusion projection pushdown.
    initial_schema: SchemaRef,
    /// The initial Vortex projection expression (e.g. column selection from the builder).
    initial_projection: Expression,
    /// Column statistics for the initial projection columns.
    /// NOTE(review): currently unread (hence the `allow(dead_code)`); kept for parity with
    /// the projected/leftover phases.
    #[allow(dead_code)]
    initial_statistics: Vec<ColumnStatistics>,

    // --- Phase 2: Projected (pushed into the Vortex scan) ---
    /// The Vortex projection expression sent in the [`ScanRequest`].
    /// Composed with `initial_projection` so it operates on the original source columns.
    projected_projection: Expression,
    /// The Arrow schema of the Vortex scan output (before any leftover projection).
    projected_schema: SchemaRef,
    /// Column statistics for the projected (scan output) columns.
    projected_statistics: Vec<ColumnStatistics>,

    // --- Phase 3: Leftover (applied by DataFusion after the scan) ---
    /// DataFusion projection expressions that could not be pushed into the Vortex scan.
    /// Applied after converting arrays to record batches in [`DataSource::open`].
    /// `None` when all projection expressions were successfully pushed down.
    leftover_projection: Option<ProjectionExprs>,
    /// The Arrow schema after applying the leftover projection.
    /// This is the output schema seen by DataFusion.
    leftover_schema: SchemaRef,
    /// Column statistics matching `leftover_schema`.
    leftover_statistics: Vec<ColumnStatistics>,

    /// An optional filter expression.
    /// Populated by [`DataSource::try_pushdown_filters`] when DataFusion pushes filters down.
    filter: Option<Expression>,
    /// An optional row limit populated by [`DataSource::with_fetch`].
    limit: Option<usize>,
    /// Whether to preserve the order of the output rows.
    /// Set when DataFusion supplies an output ordering via [`DataSource::repartitioned`].
    ordered: bool,

    /// The requested partition count from DataFusion, populated by [`DataSource::repartitioned`].
    /// We use this as a hint for how many splits to execute concurrently in `open()`, but we
    /// always declare to DataFusion that we only have a single partition so that we can
    /// internally manage concurrency and fix the problem of partition skew.
    /// Defaults to the host's available parallelism (see the builder).
    num_partitions: NonZeroUsize,
}
252
253impl fmt::Debug for VortexDataSource {
254    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
255        f.debug_struct("VortexScanSource")
256            .field("schema", &self.leftover_schema)
257            .field("projection", &format!("{}", &self.projected_projection))
258            .field("filter", &self.filter.as_ref().map(|e| format!("{}", e)))
259            .field("limit", &self.limit)
260            .finish()
261    }
262}
263
impl DataSource for VortexDataSource {
    /// Open the (single) partition, building the Vortex scan lazily on first poll so that
    /// all optimizer pushdowns (projection, filter, limit, ordering) are reflected in the
    /// [`ScanRequest`].
    fn open(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        // VortexScanSource always uses a single partition since Vortex handles parallelism
        // and concurrency internally.
        if partition != 0 {
            return Err(DataFusionError::Internal(format!(
                "VortexScanSource: expected partition 0, got {partition}"
            )));
        }

        // Build the scan request with pushed-down projection, filter, and limit.
        // The projection is included so the scan can prune columns at the I/O level.
        let scan_request = ScanRequest {
            projection: self.projected_projection.clone(),
            filter: self.filter.clone(),
            // usize -> u64 can only fail on targets where usize is wider than 64 bits;
            // saturate rather than error in that case.
            limit: self.limit.map(|l| u64::try_from(l).unwrap_or(u64::MAX)),
            ordered: self.ordered,
            ..Default::default()
        };

        // Clone everything the deferred stream needs; the stream must be 'static and
        // cannot borrow from `self`.
        let data_source = self.data_source.clone();
        let projected_schema = self.projected_schema.clone();
        let session = self.session.clone();
        let num_partitions = self.num_partitions;

        // Pre-build the leftover projector (if any) so we can apply it after batch conversion.
        let leftover_projector = self
            .leftover_projection
            .as_ref()
            .map(|proj| proj.make_projector(&self.projected_schema))
            .transpose()?;

        // Defer the async DataSource::scan() call to the first poll of the stream.
        let stream = futures::stream::once(async move {
            let scan = data_source
                .scan(scan_request)
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;

            // Each split.execute() returns a lazy stream whose early polls do preparation
            // work (expression resolution, layout traversal, first I/O spawns). We use
            // try_flatten_unordered to poll multiple split streams concurrently so that
            // the next split is already warm when the current one finishes.
            let scan_streams = scan.partitions().map(|split_result| {
                let split = split_result?;
                split.execute()
            });

            let handle = session.handle();
            let stream = scan_streams
                // Keep up to 2x the hinted partition count of split streams in flight.
                .try_flatten_unordered(Some(num_partitions.get() * 2))
                .map(move |result| {
                    let session = session.clone();
                    let schema = projected_schema.clone();
                    // Offload the chunk -> RecordBatch conversion onto CPU worker threads.
                    handle.spawn_cpu(move || {
                        let mut ctx = session.create_execution_ctx();
                        result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx))
                    })
                })
                // `buffered` (not `buffer_unordered`) keeps conversions concurrent while
                // yielding results in the order the chunks arrived above.
                .buffered(num_partitions.get())
                .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));

            // Apply leftover projection (expressions that couldn't be pushed into Vortex).
            let stream = if let Some(projector) = leftover_projector {
                stream
                    .map(move |batch_result| {
                        batch_result.and_then(|batch| projector.project_batch(&batch))
                    })
                    .boxed()
            } else {
                stream.boxed()
            };

            Ok::<_, DataFusionError>(stream)
        })
        .try_flatten();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.leftover_schema.clone(),
            stream,
        )))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Render this source in EXPLAIN output: projection, plus filter/limit when present.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        write!(
            f,
            "VortexScanSource: projection={}",
            self.projected_projection
        )?;
        if let Some(ref filter) = self.filter {
            write!(f, ", filter={filter}")?;
        }
        if let Some(limit) = self.limit {
            write!(f, ", limit={limit}")?;
        }
        Ok(())
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        _repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
    ) -> DFResult<Option<Arc<dyn DataSource>>> {
        // We still expose a single partition to DataFusion (see `output_partitioning`);
        // the requested target is only recorded as an internal concurrency hint for
        // `open()`, and any requested output ordering is remembered.
        let mut this = self.clone();
        this.num_partitions = NonZero::new(target_partitions)
            .ok_or_else(|| DataFusionError::Internal("non-zero partitions".to_string()))?;
        this.ordered |= output_ordering.is_some();
        Ok(Some(Arc::new(this)))
    }

    fn output_partitioning(&self) -> Partitioning {
        // Always one partition: Vortex manages split-level parallelism internally.
        Partitioning::UnknownPartitioning(1)
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new(self.leftover_schema.clone())
    }

    fn partition_statistics(&self, _partition: Option<usize>) -> DFResult<Statistics> {
        // FIXME(ngates): this should be adjusted based on filters. See DuckDB for heuristics,
        //  and in the future, store the selectivity stats in the session.
        let num_rows = estimate_to_df_precision(&self.data_source.row_count());

        // FIXME(ngates): byte size should be adjusted for the initial projection...
        let total_byte_size = estimate_to_df_precision(&self.data_source.byte_size());

        // Column statistics must match the output schema (leftover_schema), which may differ
        // from the initial schema after try_swapping_with_projection adds computed columns.
        let column_statistics = self.leftover_statistics.clone();

        Ok(Statistics {
            num_rows,
            total_byte_size,
            column_statistics,
        })
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        // Note: this replaces any previously configured limit rather than combining with it.
        let mut this = self.clone();
        this.limit = limit;
        Some(Arc::new(this))
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }

    // Note that we're explicitly "swapping" the projection. That means everything we do must
    // be computed over the original input schema, rather than the projected output schema.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> DFResult<Option<Arc<dyn DataSource>>> {
        tracing::debug!(
            "VortexScanSource: trying to swap with projection: {}",
            projection
        );

        let convertor = DefaultExpressionConvertor::default();
        let input_schema = self.initial_schema.as_ref();
        let projected_schema = projection.project_schema(input_schema)?;

        // Use the shared ExpressionConvertor to split the projection into a Vortex
        // scan_projection and a leftover DataFusion projection for expressions that
        // can't be pushed down (e.g., unsupported scalar functions, decimal binary).
        let ProcessedProjection {
            scan_projection,
            leftover_projection,
        } = convertor.split_projection(projection.clone(), input_schema, &projected_schema)?;

        // Compose with the initial projection so the scan operates on the original
        // source columns, not the initial projection's output columns.
        let scan_projection = replace(scan_projection, &root(), self.initial_projection.clone());

        // Compute the scan output schema from the Vortex expression's return dtype.
        let scan_dtype = scan_projection
            .return_dtype(self.data_source.dtype())
            .map_err(|e| DataFusionError::External(Box::new(e)))?;
        let scan_arrow_type = scan_dtype
            .to_arrow_dtype()
            .map_err(|e| DataFusionError::External(Box::new(e)))?;
        let DataType::Struct(scan_fields) = scan_arrow_type else {
            return Err(DataFusionError::Internal(
                "Scan projection must produce a struct type".to_string(),
            ));
        };
        let scan_output_schema = Arc::new(Schema::new(scan_fields));

        // Remap the leftover column references to match the scan output schema.
        let leftover_projection = leftover_projection
            .try_map_exprs(|expr| reassign_expr_columns(expr, &scan_output_schema))?;

        let final_schema = Arc::new(projected_schema);

        let mut this = self.clone();
        this.projected_projection = scan_projection;
        this.projected_schema = scan_output_schema.clone();
        // The old statistics no longer line up with the new schemas; reset to unknown.
        this.projected_statistics =
            vec![ColumnStatistics::new_unknown(); scan_output_schema.fields().len()];
        this.leftover_projection = Some(leftover_projection);
        this.leftover_schema = final_schema.clone();
        this.leftover_statistics =
            vec![ColumnStatistics::new_unknown(); final_schema.fields().len()];

        Ok(Some(Arc::new(this)))
    }

    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &datafusion_common::config::ConfigOptions,
    ) -> DFResult<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        if filters.is_empty() {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![],
            ));
        }

        let convertor = DefaultExpressionConvertor::default();
        let input_schema = self.initial_schema.as_ref();

        // Classify each filter: pushable filters are passed into the ScanRequest in open(),
        // so we can safely claim PushedDown::Yes for them.
        let pushdown_results: Vec<PushedDown> = filters
            .iter()
            .map(|expr| {
                if convertor.can_be_pushed_down(expr, input_schema) {
                    PushedDown::Yes
                } else {
                    PushedDown::No
                }
            })
            .collect();

        // If nothing can be pushed down, return early.
        if pushdown_results.iter().all(|p| matches!(p, PushedDown::No)) {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                pushdown_results,
            ));
        }

        // Collect the pushable filter expressions.
        let pushable: Vec<Arc<dyn PhysicalExpr>> = filters
            .iter()
            .zip(pushdown_results.iter())
            .filter_map(|(expr, pushed)| match pushed {
                PushedDown::Yes => Some(expr.clone()),
                PushedDown::No => None,
            })
            .collect();

        // Convert to Vortex conjunction.
        let vortex_pred = make_vortex_predicate(&convertor, &pushable)?;

        // Combine with existing filter: later pushdowns are AND-ed onto earlier ones.
        let new_filter = match (&self.filter, vortex_pred) {
            (Some(existing), Some(new_pred)) => Some(vx_and(existing.clone(), new_pred)),
            (Some(existing), None) => Some(existing.clone()),
            (None, Some(new_pred)) => Some(new_pred),
            (None, None) => None,
        };

        let mut this = self.clone();
        this.filter = new_filter;
        Ok(
            FilterPushdownPropagation::with_parent_pushdown_result(pushdown_results)
                .with_updated_node(Arc::new(this) as _),
        )
    }
}
544
545/// Convert a Vortex [`Option<Precision>`] to a DataFusion [`Precision`](DFPrecision).
546fn estimate_to_df_precision(est: &Option<Precision<u64>>) -> DFPrecision<usize> {
547    match est {
548        Some(Precision::Exact(v)) => DFPrecision::Exact(usize::try_from(*v).unwrap_or(usize::MAX)),
549        Some(Precision::Inexact(v)) => {
550            DFPrecision::Inexact(usize::try_from(*v).unwrap_or(usize::MAX))
551        }
552        None => DFPrecision::Absent,
553    }
554}