Skip to main content

vortex_datafusion/persistent/
source.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Formatter;
6use std::ops::Range;
7use std::sync::Arc;
8use std::sync::Weak;
9
10use datafusion_common::Result as DFResult;
11use datafusion_common::config::ConfigOptions;
12use datafusion_datasource::TableSchema;
13use datafusion_datasource::file::FileSource;
14use datafusion_datasource::file_scan_config::FileScanConfig;
15use datafusion_datasource::file_stream::FileOpener;
16use datafusion_execution::cache::cache_manager::FileMetadataCache;
17use datafusion_physical_expr::PhysicalExprRef;
18use datafusion_physical_expr::conjunction;
19use datafusion_physical_expr::projection::ProjectionExprs;
20use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
21use datafusion_physical_expr_common::physical_expr::fmt_sql;
22use datafusion_physical_plan::DisplayFormatType;
23use datafusion_physical_plan::PhysicalExpr;
24use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
25use datafusion_physical_plan::filter_pushdown::PushedDown;
26use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
27use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
28use object_store::ObjectStore;
29use object_store::path::Path;
30use vortex::error::VortexExpect;
31use vortex::file::VORTEX_FILE_EXTENSION;
32use vortex::layout::LayoutReader;
33use vortex::metrics::DefaultMetricsRegistry;
34use vortex::metrics::MetricsRegistry;
35use vortex::session::VortexSession;
36use vortex_utils::aliases::dash_map::DashMap;
37
38use super::opener::VortexOpener;
39use crate::VortexTableOptions;
40use crate::convert::exprs::DefaultExpressionConvertor;
41use crate::convert::exprs::ExpressionConvertor;
42use crate::persistent::reader::DefaultVortexReaderFactory;
43use crate::persistent::reader::VortexReaderFactory;
44
45/// File scan implementation for reading one or more `.vortex` files.
46///
47/// `VortexSource` is the lower-level read component underneath
48/// [`VortexFormat`]. It is the type DataFusion stores in a [`FileScanConfig`],
49/// and it is ultimately executed through [`DataSourceExec`].
50///
51/// ```text
52///             ▲
53///             │
54///             │  Produce a stream of
55///             │  RecordBatches
56///             │
57/// ┌───────────────────────┐
58/// │     DataSourceExec    │
59/// └───────────────────────┘
60///             ▲
61///             │ uses
62///             │
63/// ┌───────────────────────┐
64/// │      VortexSource     │
65/// └───────────────────────┘
66///             ▲
67///             │ opens `.vortex` files via
68///             │
69///        ObjectStore / VortexReadAt
70/// ```
71///
72/// Most applications reach `VortexSource` indirectly through
73/// [`VortexFormatFactory`]. Use `VortexSource` directly when you are
74/// constructing a `FileScanConfig` yourself or when you need to inject
75/// lower-level behavior such as a custom [`VortexReaderFactory`], an external
76/// [`VortexAccessPlan`], or a specific [`FileMetadataCache`].
77///
78/// # Example
79///
80/// ```rust
81/// use std::sync::Arc;
82///
83/// use arrow_schema::Schema;
84/// use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
85/// use datafusion_datasource::source::DataSourceExec;
86/// use datafusion_datasource::PartitionedFile;
87/// use datafusion_datasource::TableSchema;
88/// use datafusion_execution::object_store::ObjectStoreUrl;
89/// use vortex::VortexSessionDefault;
90/// use vortex::session::VortexSession;
91/// use vortex_datafusion::VortexSource;
92///
93/// let file_schema = Arc::new(Schema::empty());
94/// let source = Arc::new(
95///     VortexSource::new(
96///         TableSchema::from_file_schema(file_schema),
97///         VortexSession::default(),
98///     )
99///     .with_projection_pushdown(true)
100///     .with_scan_concurrency(4),
101/// );
102///
103/// let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
104///     .with_file(PartitionedFile::new("metrics.vortex", 1024))
105///     .build();
106///
107/// let exec = DataSourceExec::from_data_source(config);
108/// # let _ = exec;
109/// ```
110///
111/// # What `VortexSource` Handles
112///
113/// `VortexSource` is responsible for:
114///
115/// - translating DataFusion filters into Vortex predicates when possible,
116/// - retaining the full predicate for file pruning based on statistics and
117///   partition values,
118/// - configuring per-file readers and sharing parsed layout readers across
119///   partitions within the same scan,
120/// - carrying the table schema used for schema evolution and missing-column
121///   adaptation,
122/// - attaching a Vortex metrics registry to the read path.
123///
124/// # Projection And Predicate Behavior
125///
126/// `VortexSource` keeps two related predicate forms:
127///
128/// - `full_predicate`, which is used by DataFusion's `FilePruner` to skip whole
129///   files before they are opened,
130/// - `vortex_predicate`, which contains only the expressions Vortex can evaluate
131///   during the scan.
132///
133/// Projection handling depends on
134/// [`VortexTableOptions::projection_pushdown`]:
135///
136/// - when disabled, `VortexSource` still prunes unreferenced top-level columns,
137///   but DataFusion applies the full projection after the scan,
138/// - when enabled, the scan can evaluate a Vortex-native projection and leave
139///   only unsupported expressions for DataFusion.
140///
141/// # Observability
142///
143/// `VortexSource` owns a Vortex metrics registry for the lifetime of a physical
144/// scan. The registry is passed to the reader and scan builder so I/O and scan
145/// metrics accumulate as the query executes.
146///
147/// Use [`VortexMetricsFinder`] to merge those metrics back into DataFusion
148/// `MetricsSet` values after the plan has run.
149///
150/// # Execution Flow
151///
152/// At execution time:
153///
154/// 1. DataFusion calls `DataSourceExec`, which delegates file opening to
155///    `VortexSource`.
156/// 2. `VortexSource` creates a `VortexOpener` configured with the current
157///    projection, predicate, options, and metrics.
158/// 3. The opener adapts filters and schema for the specific file, applies any
159///    [`VortexAccessPlan`], and builds a Vortex scan.
160/// 4. Scan results are converted into Arrow `RecordBatch` values for
161///    DataFusion.
162///
163/// [`VortexFormat`]: crate::VortexFormat
164/// [`FileScanConfig`]: datafusion_datasource::file_scan_config::FileScanConfig
165/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
166/// [`VortexFormatFactory`]: crate::VortexFormatFactory
167/// [`VortexReaderFactory`]: crate::reader::VortexReaderFactory
168/// [`VortexAccessPlan`]: crate::VortexAccessPlan
169/// [`FileMetadataCache`]: datafusion_execution::cache::cache_manager::FileMetadataCache
170/// [`VortexTableOptions::projection_pushdown`]: crate::VortexTableOptions::projection_pushdown
171/// [`VortexMetricsFinder`]: crate::metrics::VortexMetricsFinder
#[derive(Clone)]
pub struct VortexSource {
    /// Vortex session cloned into every opener created by this source.
    pub(crate) session: VortexSession,
    /// Schema of the scanned table. The full table schema seeds the initial
    /// projection; the file schema is consulted when deciding filter pushdown.
    pub(crate) table_schema: TableSchema,
    /// Projection applied by the scan. Starts as every column of the table
    /// schema and is merged with any projection pushed down by DataFusion.
    pub(crate) projection: ProjectionExprs,
    /// Combined predicate expression containing all filters from DataFusion query planning.
    /// Used with FilePruner to skip files based on statistics and partition values.
    pub(crate) full_predicate: Option<PhysicalExprRef>,
    /// Subset of predicates that can be pushed down into Vortex scan operations.
    /// These are expressions that Vortex can efficiently evaluate during scanning.
    pub(crate) vortex_predicate: Option<PhysicalExprRef>,
    /// Batch size for the scan. Set through `FileSource::with_batch_size`;
    /// creating an opener panics if it is still `None`.
    pub(crate) batch_size: Option<usize>,
    /// Metrics set handed back from `FileSource::metrics`. Nothing in this
    /// file records into it.
    _unused_df_metrics: ExecutionPlanMetricsSet,
    /// Shared layout readers, the source only lives as long as one scan.
    ///
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
    /// Shared full-file natural split ranges keyed by path.
    natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
    /// Decides which DataFusion filters can be pushed down and is passed on
    /// to every opener.
    expression_convertor: Arc<dyn ExpressionConvertor>,
    /// Optional custom reader factory. When `None`, openers get a
    /// `DefaultVortexReaderFactory` built over the scan's object store.
    pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
    /// Vortex metrics registry shared with every opener created by this source.
    vx_metrics_registry: Arc<dyn MetricsRegistry>,
    /// Optional metadata cache forwarded to every opener.
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    /// Table-level options. `projection_pushdown` and `scan_concurrency` are
    /// copied into each opener.
    options: VortexTableOptions,
}
198
199impl VortexSource {
200    /// Creates a new `VortexSource` for a table schema and [`VortexSession`].
201    ///
202    /// The new source starts with:
203    ///
204    /// - all top-level columns projected,
205    /// - no pushed filters,
206    /// - a default Vortex metrics registry,
207    /// - default [`VortexTableOptions`].
208    pub fn new(table_schema: TableSchema, session: VortexSession) -> Self {
209        let full_schema = table_schema.table_schema();
210        let indices = (0..full_schema.fields().len()).collect::<Vec<_>>();
211        let projection = ProjectionExprs::from_indices(&indices, full_schema);
212
213        Self {
214            session,
215            table_schema,
216            projection,
217            full_predicate: None,
218            vortex_predicate: None,
219            batch_size: None,
220            _unused_df_metrics: Default::default(),
221            layout_readers: Arc::new(DashMap::default()),
222            natural_split_ranges: Arc::new(DashMap::default()),
223            expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
224            vortex_reader_factory: None,
225            vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
226            file_metadata_cache: None,
227            options: VortexTableOptions::default(),
228        }
229    }
230
231    /// Enables or disables Vortex-native projection evaluation.
232    ///
233    /// This toggles whether `VortexSource` tries to split DataFusion projection
234    /// expressions into a Vortex scan projection plus a leftover DataFusion
235    /// projection.
236    pub fn with_projection_pushdown(mut self, enabled: bool) -> Self {
237        self.options.projection_pushdown = enabled;
238        self
239    }
240
241    /// Sets the [`ExpressionConvertor`] used to translate DataFusion expressions
242    /// into Vortex expressions.
243    ///
244    /// Override this when the default converter is insufficient for an engine
245    /// integration or for a custom schema-adaptation strategy.
246    pub fn with_expression_convertor(
247        mut self,
248        expr_convertor: Arc<dyn ExpressionConvertor>,
249    ) -> Self {
250        self.expression_convertor = expr_convertor;
251        self
252    }
253
254    /// Sets a custom factory for the underlying [`VortexReadAt`].
255    ///
256    /// Use this when reads need to go through an application-specific layer
257    /// rather than the default DataFusion [`ObjectStore`].
258    ///
259    /// [`VortexReadAt`]: vortex::io::VortexReadAt
260    pub fn with_vortex_reader_factory(
261        mut self,
262        vortex_reader_factory: Arc<dyn VortexReaderFactory>,
263    ) -> Self {
264        self.vortex_reader_factory = Some(vortex_reader_factory);
265        self
266    }
267
268    /// Returns the [`MetricsRegistry`] attached to this scan.
269    ///
270    /// The registry is populated as files are opened and scanned. In most
271    /// callers, [`crate::metrics::VortexMetricsFinder`] is the more convenient
272    /// public API for turning the registry contents into DataFusion metrics.
273    pub fn metrics_registry(&self) -> &Arc<dyn MetricsRegistry> {
274        &self.vx_metrics_registry
275    }
276
277    /// Overrides the metadata cache used to reuse Vortex footers across scans.
278    pub fn with_file_metadata_cache(
279        mut self,
280        file_metadata_cache: Arc<dyn FileMetadataCache>,
281    ) -> Self {
282        self.file_metadata_cache = Some(file_metadata_cache);
283        self
284    }
285
286    /// Sets the per-file Vortex scan concurrency.
287    ///
288    /// This is separate from DataFusion's partition-level parallelism.
289    pub fn with_scan_concurrency(mut self, scan_concurrency: usize) -> Self {
290        self.options.scan_concurrency = Some(scan_concurrency);
291        self
292    }
293
294    /// Returns the effective table options for this source.
295    pub fn options(&self) -> &VortexTableOptions {
296        &self.options
297    }
298
299    /// Replaces the table options for this source.
300    pub fn with_options(mut self, opts: VortexTableOptions) -> Self {
301        self.options = opts;
302        self
303    }
304
305    fn create_vortex_opener(
306        &self,
307        object_store: Arc<dyn ObjectStore>,
308        base_config: &FileScanConfig,
309        partition: usize,
310    ) -> DFResult<VortexOpener> {
311        let batch_size = self
312            .batch_size
313            .vortex_expect("batch_size must be supplied to VortexSource");
314
315        let expr_adapter_factory = base_config
316            .expr_adapter_factory
317            .clone()
318            .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory));
319
320        let vortex_reader_factory = self
321            .vortex_reader_factory
322            .clone()
323            .unwrap_or_else(|| Arc::new(DefaultVortexReaderFactory::new(object_store)));
324
325        let opener = VortexOpener {
326            partition,
327            session: self.session.clone(),
328            vortex_reader_factory,
329            projection: self.projection.clone(),
330            filter: self.vortex_predicate.clone(),
331            file_pruning_predicate: self.full_predicate.clone(),
332            expr_adapter_factory,
333            table_schema: self.table_schema.clone(),
334            batch_size,
335            limit: base_config.limit.map(|l| l as u64),
336            metrics_registry: Arc::clone(&self.vx_metrics_registry),
337            layout_readers: Arc::clone(&self.layout_readers),
338            natural_split_ranges: Arc::clone(&self.natural_split_ranges),
339            has_output_ordering: !base_config.output_ordering.is_empty(),
340            expression_convertor: Arc::clone(&self.expression_convertor),
341            file_metadata_cache: self.file_metadata_cache.clone(),
342            projection_pushdown: self.options.projection_pushdown,
343            scan_concurrency: self.options.scan_concurrency,
344        };
345
346        Ok(opener)
347    }
348}
349
350impl FileSource for VortexSource {
351    fn create_file_opener(
352        &self,
353        object_store: Arc<dyn ObjectStore>,
354        base_config: &FileScanConfig,
355        partition: usize,
356    ) -> DFResult<Arc<dyn FileOpener>> {
357        Ok(Arc::new(self.create_vortex_opener(
358            object_store,
359            base_config,
360            partition,
361        )?))
362    }
363
364    fn as_any(&self) -> &dyn Any {
365        self
366    }
367
368    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
369        let mut source = self.clone();
370        source.batch_size = Some(batch_size);
371        Arc::new(source)
372    }
373
374    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
375        self.vortex_predicate.clone()
376    }
377
378    fn metrics(&self) -> &ExecutionPlanMetricsSet {
379        &self._unused_df_metrics
380    }
381
382    fn file_type(&self) -> &str {
383        VORTEX_FILE_EXTENSION
384    }
385
386    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
387        match t {
388            DisplayFormatType::Default | DisplayFormatType::Verbose => {
389                if let Some(predicate) = &self.vortex_predicate {
390                    write!(f, ", predicate: {predicate}")?;
391                }
392            }
393            // Use TreeRender style key=value formatting to display the predicate
394            DisplayFormatType::TreeRender => {
395                if let Some(predicate) = &self.vortex_predicate {
396                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
397                };
398            }
399        }
400        Ok(())
401    }
402
403    fn supports_repartitioning(&self) -> bool {
404        true
405    }
406
407    fn try_pushdown_filters(
408        &self,
409        filters: Vec<Arc<dyn PhysicalExpr>>,
410        _config: &ConfigOptions,
411    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
412        if filters.is_empty() {
413            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
414                vec![],
415            ));
416        }
417
418        let mut source = self.clone();
419
420        // Combine new filters with existing predicate for file pruning.
421        // This full predicate is used by FilePruner to eliminate files.
422        source.full_predicate = match source.full_predicate {
423            Some(predicate) => Some(conjunction(
424                std::iter::once(predicate).chain(filters.clone()),
425            )),
426            None => Some(conjunction(filters.clone())),
427        };
428
429        let supported_filters = filters
430            .into_iter()
431            .map(|expr| {
432                if self
433                    .expression_convertor
434                    .can_be_pushed_down(&expr, self.table_schema.file_schema())
435                {
436                    PushedDownPredicate::supported(expr)
437                } else {
438                    PushedDownPredicate::unsupported(expr)
439                }
440            })
441            .collect::<Vec<_>>();
442
443        if supported_filters
444            .iter()
445            .all(|p| matches!(p.discriminant, PushedDown::No))
446        {
447            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
448                vec![PushedDown::No; supported_filters.len()],
449            )
450            .with_updated_node(Arc::new(source) as _));
451        }
452
453        let supported = supported_filters
454            .iter()
455            .filter_map(|p| match p.discriminant {
456                PushedDown::Yes => Some(&p.predicate),
457                PushedDown::No => None,
458            })
459            .cloned();
460
461        let predicate = match source.vortex_predicate {
462            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
463            None => conjunction(supported),
464        };
465
466        tracing::debug!(%predicate, "Saving predicate");
467
468        source.vortex_predicate = Some(predicate);
469
470        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
471            supported_filters.iter().map(|f| f.discriminant).collect(),
472        )
473        .with_updated_node(Arc::new(source) as _))
474    }
475
476    fn try_pushdown_projection(
477        &self,
478        projection: &ProjectionExprs,
479    ) -> DFResult<Option<Arc<dyn FileSource>>> {
480        let mut source = self.clone();
481        source.projection = self.projection.try_merge(projection)?;
482        Ok(Some(Arc::new(source)))
483    }
484
485    fn projection(&self) -> Option<&ProjectionExprs> {
486        Some(&self.projection)
487    }
488
489    fn table_schema(&self) -> &TableSchema {
490        &self.table_schema
491    }
492}
493
#[cfg(test)]
mod tests {
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use object_store::memory::InMemory;
    use vortex::VortexSessionDefault;

    use super::*;
    use crate::convert::exprs::ProcessedProjection;

    /// Convertor that forwards every call to [`DefaultExpressionConvertor`].
    ///
    /// It exists only so the test has a distinct `Arc` whose identity can be
    /// compared against the one stored in the opener.
    struct TrackingExpressionConvertor {
        inner: DefaultExpressionConvertor,
    }

    impl ExpressionConvertor for TrackingExpressionConvertor {
        fn can_be_pushed_down(&self, expr: &PhysicalExprRef, schema: &Schema) -> bool {
            self.inner.can_be_pushed_down(expr, schema)
        }

        fn convert(&self, expr: &dyn PhysicalExpr) -> DFResult<vortex::expr::Expression> {
            self.inner.convert(expr)
        }

        fn split_projection(
            &self,
            source_projection: ProjectionExprs,
            input_schema: &Schema,
            output_schema: &Schema,
        ) -> DFResult<ProcessedProjection> {
            self.inner
                .split_projection(source_projection, input_schema, output_schema)
        }

        fn no_pushdown_projection(
            &self,
            source_projection: ProjectionExprs,
            input_schema: &Schema,
        ) -> DFResult<ProcessedProjection> {
            self.inner
                .no_pushdown_projection(source_projection, input_schema)
        }
    }

    /// The opener must carry the exact convertor instance set on the source,
    /// not a fresh default.
    #[test]
    fn create_vortex_opener_preserves_expression_convertor() -> anyhow::Result<()> {
        let expression_convertor: Arc<dyn ExpressionConvertor> =
            Arc::new(TrackingExpressionConvertor {
                inner: DefaultExpressionConvertor::default(),
            });

        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let source = VortexSource {
            // Normally DataFusion sets this through `with_batch_size`.
            batch_size: Some(100),
            ..VortexSource::new(
                TableSchema::from_file_schema(Arc::new(schema)),
                VortexSession::default(),
            )
            .with_expression_convertor(Arc::clone(&expression_convertor))
        };

        let config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            Arc::new(source.clone()),
        )
        .build();

        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let opener = source.create_vortex_opener(store, &config, 0)?;

        assert!(Arc::ptr_eq(
            &opener.expression_convertor,
            &expression_convertor
        ));
        Ok(())
    }
}