Skip to main content

vortex_datafusion/persistent/
source.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Formatter;
5use std::ops::Range;
6use std::sync::Arc;
7use std::sync::Weak;
8
9use datafusion_common::Result as DFResult;
10use datafusion_common::config::ConfigOptions;
11use datafusion_datasource::TableSchema;
12use datafusion_datasource::file::FileSource;
13use datafusion_datasource::file_scan_config::FileScanConfig;
14use datafusion_datasource::file_stream::FileOpener;
15use datafusion_execution::cache::cache_manager::FileMetadataCache;
16use datafusion_physical_expr::EquivalenceProperties;
17use datafusion_physical_expr::PhysicalExprRef;
18use datafusion_physical_expr::PhysicalSortExpr;
19use datafusion_physical_expr::conjunction;
20use datafusion_physical_expr::projection::ProjectionExprs;
21use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
22use datafusion_physical_expr_common::physical_expr::fmt_sql;
23use datafusion_physical_plan::DisplayFormatType;
24use datafusion_physical_plan::PhysicalExpr;
25use datafusion_physical_plan::SortOrderPushdownResult;
26use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
27use datafusion_physical_plan::filter_pushdown::PushedDown;
28use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
29use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
30use object_store::ObjectStore;
31use object_store::path::Path;
32use vortex::error::VortexExpect;
33use vortex::file::VORTEX_FILE_EXTENSION;
34use vortex::layout::LayoutReader;
35use vortex::metrics::DefaultMetricsRegistry;
36use vortex::metrics::MetricsRegistry;
37use vortex::session::VortexSession;
38use vortex_utils::aliases::dash_map::DashMap;
39
40use super::opener::VortexOpener;
41use crate::VortexTableOptions;
42use crate::convert::exprs::DefaultExpressionConvertor;
43use crate::convert::exprs::ExpressionConvertor;
44use crate::persistent::reader::DefaultVortexReaderFactory;
45use crate::persistent::reader::VortexReaderFactory;
46
47/// File scan implementation for reading one or more `.vortex` files.
48///
49/// `VortexSource` is the lower-level read component underneath
50/// [`VortexFormat`]. It is the type DataFusion stores in a [`FileScanConfig`],
51/// and it is ultimately executed through [`DataSourceExec`].
52///
53/// ```text
54///             ▲
55///             │
56///             │  Produce a stream of
57///             │  RecordBatches
58///             │
59/// ┌───────────────────────┐
60/// │     DataSourceExec    │
61/// └───────────────────────┘
62///             ▲
63///             │ uses
64///             │
65/// ┌───────────────────────┐
66/// │      VortexSource     │
67/// └───────────────────────┘
68///             ▲
69///             │ opens `.vortex` files via
70///             │
71///        ObjectStore / VortexReadAt
72/// ```
73///
74/// Most applications reach `VortexSource` indirectly through
75/// [`VortexFormatFactory`]. Use `VortexSource` directly when you are
76/// constructing a `FileScanConfig` yourself or when you need to inject
77/// lower-level behavior such as a custom [`VortexReaderFactory`], an external
78/// [`VortexAccessPlan`], or a specific [`FileMetadataCache`].
79///
80/// # Example
81///
82/// ```rust
83/// use std::sync::Arc;
84///
85/// use arrow_schema::Schema;
86/// use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
87/// use datafusion_datasource::source::DataSourceExec;
88/// use datafusion_datasource::PartitionedFile;
89/// use datafusion_datasource::TableSchema;
90/// use datafusion_execution::object_store::ObjectStoreUrl;
91/// use vortex::VortexSessionDefault;
92/// use vortex::session::VortexSession;
93/// use vortex_datafusion::VortexSource;
94///
95/// let file_schema = Arc::new(Schema::empty());
96/// let source = Arc::new(
97///     VortexSource::new(
98///         TableSchema::from_file_schema(file_schema),
99///         VortexSession::default(),
100///     )
101///     .with_projection_pushdown(true)
102///     .with_scan_concurrency(4),
103/// );
104///
105/// let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
106///     .with_file(PartitionedFile::new("metrics.vortex", 1024))
107///     .build();
108///
109/// let exec = DataSourceExec::from_data_source(config);
110/// # let _ = exec;
111/// ```
112///
113/// # What `VortexSource` Handles
114///
115/// `VortexSource` is responsible for:
116///
117/// - translating DataFusion filters into Vortex predicates when possible,
118/// - retaining the full predicate for file pruning based on statistics and
119///   partition values,
120/// - configuring per-file readers and sharing parsed layout readers across
121///   partitions within the same scan,
122/// - carrying the table schema used for schema evolution and missing-column
123///   adaptation,
124/// - attaching a Vortex metrics registry to the read path.
125///
126/// # Projection And Predicate Behavior
127///
128/// `VortexSource` keeps two related predicate forms:
129///
130/// - `full_predicate`, which is used by DataFusion's `FilePruner` to skip whole
131///   files before they are opened,
132/// - `vortex_predicate`, which contains only the expressions Vortex can evaluate
133///   during the scan.
134///
135/// Projection handling depends on
136/// [`VortexTableOptions::projection_pushdown`]:
137///
138/// - when disabled, `VortexSource` still prunes unreferenced top-level columns,
139///   but DataFusion applies the full projection after the scan,
140/// - when enabled, the scan can evaluate a Vortex-native projection and leave
141///   only unsupported expressions for DataFusion.
142///
143/// # Observability
144///
145/// `VortexSource` owns a Vortex metrics registry for the lifetime of a physical
146/// scan. The registry is passed to the reader and scan builder so I/O and scan
147/// metrics accumulate as the query executes.
148///
149/// Use [`VortexMetricsFinder`] to merge those metrics back into DataFusion
150/// `MetricsSet` values after the plan has run.
151///
152/// # Execution Flow
153///
154/// At execution time:
155///
156/// 1. DataFusion calls `DataSourceExec`, which delegates file opening to
157///    `VortexSource`.
158/// 2. `VortexSource` creates a `VortexOpener` configured with the current
159///    projection, predicate, options, and metrics.
160/// 3. The opener adapts filters and schema for the specific file, applies any
161///    [`VortexAccessPlan`], and builds a Vortex scan.
162/// 4. Scan results are converted into Arrow `RecordBatch` values for
163///    DataFusion.
164///
165/// [`VortexFormat`]: crate::VortexFormat
166/// [`FileScanConfig`]: datafusion_datasource::file_scan_config::FileScanConfig
167/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
168/// [`VortexFormatFactory`]: crate::VortexFormatFactory
169/// [`VortexReaderFactory`]: crate::reader::VortexReaderFactory
170/// [`VortexAccessPlan`]: crate::VortexAccessPlan
171/// [`FileMetadataCache`]: datafusion_execution::cache::cache_manager::FileMetadataCache
172/// [`VortexTableOptions::projection_pushdown`]: crate::VortexTableOptions::projection_pushdown
173/// [`VortexMetricsFinder`]: crate::metrics::VortexMetricsFinder
#[derive(Clone)]
pub struct VortexSource {
    /// Vortex session; cloned into every `VortexOpener` created by this source.
    pub(crate) session: VortexSession,
    /// Table schema handed to each opener; its file schema is also what
    /// filter-pushdown eligibility is checked against.
    pub(crate) table_schema: TableSchema,
    /// Projection applied by the scan. Starts as "every table-schema field" and
    /// is merged with projections offered through `try_pushdown_projection`.
    pub(crate) projection: ProjectionExprs,
    /// Combined predicate expression containing all filters from DataFusion query planning.
    /// Used with FilePruner to skip files based on statistics and partition values.
    pub(crate) full_predicate: Option<PhysicalExprRef>,
    /// Subset of predicates that can be pushed down into Vortex scan operations.
    /// These are expressions that Vortex can efficiently evaluate during scanning.
    pub(crate) vortex_predicate: Option<PhysicalExprRef>,
    /// Batch size set through `FileSource::with_batch_size`. It must be `Some`
    /// by the time an opener is created (enforced there with `vortex_expect`).
    pub(crate) batch_size: Option<usize>,
    /// Metrics set returned from `FileSource::metrics`. Nothing in this file
    /// records into it; Vortex metrics go through `vx_metrics_registry`.
    _unused_df_metrics: ExecutionPlanMetricsSet,
    /// Shared layout readers, the source only lives as long as one scan.
    ///
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
    /// Shared full-file natural split ranges keyed by path.
    natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
    /// Decides which DataFusion filters can be pushed down; also shared with
    /// every opener.
    expression_convertor: Arc<dyn ExpressionConvertor>,
    /// Optional custom reader factory. When `None`, openers fall back to a
    /// `DefaultVortexReaderFactory` built over the scan's `ObjectStore`.
    pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
    /// Set to `true` by `try_pushdown_sort` when the requested ordering is
    /// already satisfied; openers then treat the scan as having an output ordering.
    pub(crate) ordered: bool,
    /// Vortex metrics registry shared with every opener created by this source.
    vx_metrics_registry: Arc<dyn MetricsRegistry>,
    /// Optional metadata cache forwarded to every opener.
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    /// Table-level options for this source. This file reads
    /// `projection_pushdown` and `scan_concurrency` from it when building openers.
    options: VortexTableOptions,
}
201
202impl VortexSource {
203    /// Creates a new `VortexSource` for a table schema and [`VortexSession`].
204    ///
205    /// The new source starts with:
206    ///
207    /// - all top-level columns projected,
208    /// - no pushed filters,
209    /// - a default Vortex metrics registry,
210    /// - default [`VortexTableOptions`].
211    pub fn new(table_schema: TableSchema, session: VortexSession) -> Self {
212        let full_schema = table_schema.table_schema();
213        let indices = (0..full_schema.fields().len()).collect::<Vec<_>>();
214        let projection = ProjectionExprs::from_indices(&indices, full_schema);
215
216        Self {
217            session,
218            table_schema,
219            projection,
220            full_predicate: None,
221            vortex_predicate: None,
222            batch_size: None,
223            _unused_df_metrics: Default::default(),
224            layout_readers: Arc::new(DashMap::default()),
225            natural_split_ranges: Arc::new(DashMap::default()),
226            expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
227            vortex_reader_factory: None,
228            vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
229            file_metadata_cache: None,
230            ordered: false,
231            options: VortexTableOptions::default(),
232        }
233    }
234
235    /// Enables or disables Vortex-native projection evaluation.
236    ///
237    /// This toggles whether `VortexSource` tries to split DataFusion projection
238    /// expressions into a Vortex scan projection plus a leftover DataFusion
239    /// projection.
240    pub fn with_projection_pushdown(mut self, enabled: bool) -> Self {
241        self.options.projection_pushdown = enabled;
242        self
243    }
244
245    /// Sets the [`ExpressionConvertor`] used to translate DataFusion expressions
246    /// into Vortex expressions.
247    ///
248    /// Override this when the default converter is insufficient for an engine
249    /// integration or for a custom schema-adaptation strategy.
250    pub fn with_expression_convertor(
251        mut self,
252        expr_convertor: Arc<dyn ExpressionConvertor>,
253    ) -> Self {
254        self.expression_convertor = expr_convertor;
255        self
256    }
257
258    /// Sets a custom factory for the underlying [`VortexReadAt`].
259    ///
260    /// Use this when reads need to go through an application-specific layer
261    /// rather than the default DataFusion [`ObjectStore`].
262    ///
263    /// [`VortexReadAt`]: vortex::io::VortexReadAt
264    pub fn with_vortex_reader_factory(
265        mut self,
266        vortex_reader_factory: Arc<dyn VortexReaderFactory>,
267    ) -> Self {
268        self.vortex_reader_factory = Some(vortex_reader_factory);
269        self
270    }
271
272    /// Returns the [`MetricsRegistry`] attached to this scan.
273    ///
274    /// The registry is populated as files are opened and scanned. In most
275    /// callers, [`crate::metrics::VortexMetricsFinder`] is the more convenient
276    /// public API for turning the registry contents into DataFusion metrics.
277    pub fn metrics_registry(&self) -> &Arc<dyn MetricsRegistry> {
278        &self.vx_metrics_registry
279    }
280
281    /// Overrides the metadata cache used to reuse Vortex footers across scans.
282    pub fn with_file_metadata_cache(
283        mut self,
284        file_metadata_cache: Arc<dyn FileMetadataCache>,
285    ) -> Self {
286        self.file_metadata_cache = Some(file_metadata_cache);
287        self
288    }
289
290    /// Sets the per-file Vortex scan concurrency.
291    ///
292    /// This is separate from DataFusion's partition-level parallelism.
293    pub fn with_scan_concurrency(mut self, scan_concurrency: usize) -> Self {
294        self.options.scan_concurrency = Some(scan_concurrency);
295        self
296    }
297
298    /// Returns the effective table options for this source.
299    pub fn options(&self) -> &VortexTableOptions {
300        &self.options
301    }
302
303    /// Replaces the table options for this source.
304    pub fn with_options(mut self, opts: VortexTableOptions) -> Self {
305        self.options = opts;
306        self
307    }
308
309    fn create_vortex_opener(
310        &self,
311        object_store: Arc<dyn ObjectStore>,
312        base_config: &FileScanConfig,
313        partition: usize,
314    ) -> DFResult<VortexOpener> {
315        let batch_size = self
316            .batch_size
317            .vortex_expect("batch_size must be supplied to VortexSource");
318
319        let expr_adapter_factory = base_config
320            .expr_adapter_factory
321            .clone()
322            .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory));
323
324        let vortex_reader_factory = self
325            .vortex_reader_factory
326            .clone()
327            .unwrap_or_else(|| Arc::new(DefaultVortexReaderFactory::new(object_store)));
328
329        let opener = VortexOpener {
330            partition,
331            session: self.session.clone(),
332            vortex_reader_factory,
333            projection: self.projection.clone(),
334            filter: self.vortex_predicate.clone(),
335            file_pruning_predicate: self.full_predicate.clone(),
336            expr_adapter_factory,
337            table_schema: self.table_schema.clone(),
338            batch_size,
339            limit: base_config.limit.map(|l| l as u64),
340            metrics_registry: Arc::clone(&self.vx_metrics_registry),
341            layout_readers: Arc::clone(&self.layout_readers),
342            natural_split_ranges: Arc::clone(&self.natural_split_ranges),
343            has_output_ordering: !base_config.output_ordering.is_empty() || self.ordered,
344            expression_convertor: Arc::clone(&self.expression_convertor),
345            file_metadata_cache: self.file_metadata_cache.clone(),
346            projection_pushdown: self.options.projection_pushdown,
347            scan_concurrency: self.options.scan_concurrency,
348        };
349
350        Ok(opener)
351    }
352}
353
354impl FileSource for VortexSource {
355    fn create_file_opener(
356        &self,
357        object_store: Arc<dyn ObjectStore>,
358        base_config: &FileScanConfig,
359        partition: usize,
360    ) -> DFResult<Arc<dyn FileOpener>> {
361        Ok(Arc::new(self.create_vortex_opener(
362            object_store,
363            base_config,
364            partition,
365        )?))
366    }
367
368    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
369        let mut source = self.clone();
370        source.batch_size = Some(batch_size);
371        Arc::new(source)
372    }
373
374    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
375        self.vortex_predicate.clone()
376    }
377
378    fn metrics(&self) -> &ExecutionPlanMetricsSet {
379        &self._unused_df_metrics
380    }
381
382    fn file_type(&self) -> &str {
383        VORTEX_FILE_EXTENSION
384    }
385
386    fn try_pushdown_sort(
387        &self,
388        order: &[PhysicalSortExpr],
389        eq_properties: &EquivalenceProperties,
390    ) -> DFResult<SortOrderPushdownResult<Arc<dyn FileSource>>> {
391        if order.is_empty() {
392            return Ok(SortOrderPushdownResult::Unsupported);
393        }
394
395        if eq_properties.ordering_satisfy(order.iter().cloned())? {
396            let mut this = self.clone();
397            this.ordered = true;
398
399            return Ok(SortOrderPushdownResult::Exact {
400                inner: Arc::new(this) as Arc<dyn FileSource>,
401            });
402        }
403
404        Ok(SortOrderPushdownResult::Unsupported)
405    }
406
407    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
408        match t {
409            DisplayFormatType::Default | DisplayFormatType::Verbose => {
410                if let Some(predicate) = &self.vortex_predicate {
411                    write!(f, ", predicate: {predicate}")?;
412                }
413            }
414            // Use TreeRender style key=value formatting to display the predicate
415            DisplayFormatType::TreeRender => {
416                if let Some(predicate) = &self.vortex_predicate {
417                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
418                };
419            }
420        }
421        Ok(())
422    }
423
424    fn supports_repartitioning(&self) -> bool {
425        true
426    }
427
428    fn try_pushdown_filters(
429        &self,
430        filters: Vec<Arc<dyn PhysicalExpr>>,
431        _config: &ConfigOptions,
432    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
433        if filters.is_empty() {
434            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
435                vec![],
436            ));
437        }
438
439        let mut source = self.clone();
440
441        // Combine new filters with existing predicate for file pruning.
442        // This full predicate is used by FilePruner to eliminate files.
443        source.full_predicate = match source.full_predicate {
444            Some(predicate) => Some(conjunction(
445                std::iter::once(predicate).chain(filters.clone()),
446            )),
447            None => Some(conjunction(filters.clone())),
448        };
449
450        let supported_filters = filters
451            .into_iter()
452            .map(|expr| {
453                if self
454                    .expression_convertor
455                    .can_be_pushed_down(&expr, self.table_schema.file_schema())
456                {
457                    PushedDownPredicate::supported(expr)
458                } else {
459                    PushedDownPredicate::unsupported(expr)
460                }
461            })
462            .collect::<Vec<_>>();
463
464        if supported_filters
465            .iter()
466            .all(|p| matches!(p.discriminant, PushedDown::No))
467        {
468            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
469                vec![PushedDown::No; supported_filters.len()],
470            )
471            .with_updated_node(Arc::new(source) as _));
472        }
473
474        let supported = supported_filters
475            .iter()
476            .filter_map(|p| match p.discriminant {
477                PushedDown::Yes => Some(&p.predicate),
478                PushedDown::No => None,
479            })
480            .cloned();
481
482        let predicate = match source.vortex_predicate {
483            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
484            None => conjunction(supported),
485        };
486
487        tracing::debug!(%predicate, "Saving predicate");
488
489        source.vortex_predicate = Some(predicate);
490
491        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
492            supported_filters.iter().map(|f| f.discriminant).collect(),
493        )
494        .with_updated_node(Arc::new(source) as _))
495    }
496
497    fn try_pushdown_projection(
498        &self,
499        projection: &ProjectionExprs,
500    ) -> DFResult<Option<Arc<dyn FileSource>>> {
501        let mut source = self.clone();
502        source.projection = self.projection.try_merge(projection)?;
503        Ok(Some(Arc::new(source)))
504    }
505
506    fn projection(&self) -> Option<&ProjectionExprs> {
507        Some(&self.projection)
508    }
509
510    fn table_schema(&self) -> &TableSchema {
511        &self.table_schema
512    }
513}
514
#[cfg(test)]
mod tests {
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_expr::expressions::Column;
    use object_store::memory::InMemory;
    use vortex::VortexSessionDefault;

    use super::*;
    use crate::convert::exprs::ProcessedProjection;

    /// Convertor that forwards every call to a [`DefaultExpressionConvertor`].
    ///
    /// It exists so tests have a distinct `Arc` whose identity can be compared
    /// against the one stored on the opener.
    struct TrackingExpressionConvertor {
        inner: DefaultExpressionConvertor,
    }

    impl ExpressionConvertor for TrackingExpressionConvertor {
        fn can_be_pushed_down(&self, expr: &PhysicalExprRef, schema: &Schema) -> bool {
            self.inner.can_be_pushed_down(expr, schema)
        }

        fn convert(&self, expr: &dyn PhysicalExpr) -> DFResult<vortex::expr::Expression> {
            self.inner.convert(expr)
        }

        fn split_projection(
            &self,
            source_projection: ProjectionExprs,
            input_schema: &Schema,
            output_schema: &Schema,
        ) -> DFResult<ProcessedProjection> {
            self.inner
                .split_projection(source_projection, input_schema, output_schema)
        }

        fn no_pushdown_projection(
            &self,
            source_projection: ProjectionExprs,
            input_schema: &Schema,
        ) -> DFResult<ProcessedProjection> {
            self.inner
                .no_pushdown_projection(source_projection, input_schema)
        }
    }

    /// Default-options sort expression over column `name` at position `index`.
    fn sort_column(name: &str, index: usize) -> PhysicalSortExpr {
        let column: PhysicalExprRef = Arc::new(Column::new(name, index));
        PhysicalSortExpr::new_default(column)
    }

    /// Two non-nullable `Int32` columns, `a` and `b`.
    fn sort_test_schema() -> Arc<Schema> {
        let fields: Vec<Field> = ["a", "b"]
            .into_iter()
            .map(|name| Field::new(name, DataType::Int32, false))
            .collect();
        Arc::new(Schema::new(fields))
    }

    /// Source over `schema` with a default session and default options.
    fn sort_test_source(schema: Arc<Schema>) -> VortexSource {
        let table_schema = TableSchema::from_file_schema(schema);
        VortexSource::new(table_schema, VortexSession::default())
    }

    /// Checks that `inner` is a `VortexSource` whose `ordered` flag is set.
    fn assert_ordered_source(inner: Arc<dyn FileSource>) -> anyhow::Result<()> {
        let Some(source) = inner.downcast_ref::<VortexSource>() else {
            anyhow::bail!("expected VortexSource");
        };

        assert!(source.ordered);
        Ok(())
    }

    #[test]
    fn try_pushdown_sort_returns_exact_when_ordering_is_satisfied() -> anyhow::Result<()> {
        let schema = sort_test_schema();
        let source = sort_test_source(Arc::clone(&schema));
        let order = vec![sort_column("a", 0), sort_column("b", 1)];
        let eq_properties = EquivalenceProperties::new_with_orderings(schema, [order.clone()]);

        let SortOrderPushdownResult::Exact { inner } =
            source.try_pushdown_sort(&order, &eq_properties)?
        else {
            anyhow::bail!("expected exact sort pushdown")
        };
        assert_ordered_source(inner)?;

        // The pushdown must hand back a new source, not mutate the original.
        assert!(!source.ordered);
        Ok(())
    }

    #[test]
    fn create_vortex_opener_preserves_expression_convertor() -> anyhow::Result<()> {
        let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let expression_convertor: Arc<dyn ExpressionConvertor> =
            Arc::new(TrackingExpressionConvertor {
                inner: DefaultExpressionConvertor::default(),
            });

        let mut source = VortexSource::new(
            TableSchema::from_file_schema(file_schema),
            VortexSession::default(),
        )
        .with_expression_convertor(Arc::clone(&expression_convertor));
        // Opener creation requires a batch size to be present.
        source.batch_size = Some(100);

        let config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            Arc::new(source.clone()),
        )
        .build();

        let object_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let opener = source.create_vortex_opener(object_store, &config, 0)?;

        // The opener must share the exact convertor instance we installed.
        assert!(Arc::ptr_eq(
            &opener.expression_convertor,
            &expression_convertor
        ));
        Ok(())
    }
}