// vortex_datafusion/persistent/source.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Formatter;
6use std::sync::Arc;
7use std::sync::Weak;
8
9use datafusion_common::Result as DFResult;
10use datafusion_common::config::ConfigOptions;
11use datafusion_datasource::TableSchema;
12use datafusion_datasource::file::FileSource;
13use datafusion_datasource::file_scan_config::FileScanConfig;
14use datafusion_datasource::file_stream::FileOpener;
15use datafusion_execution::cache::cache_manager::FileMetadataCache;
16use datafusion_physical_expr::PhysicalExprRef;
17use datafusion_physical_expr::conjunction;
18use datafusion_physical_expr::projection::ProjectionExprs;
19use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
20use datafusion_physical_expr_common::physical_expr::fmt_sql;
21use datafusion_physical_plan::DisplayFormatType;
22use datafusion_physical_plan::PhysicalExpr;
23use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
24use datafusion_physical_plan::filter_pushdown::PushedDown;
25use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
26use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
27use object_store::ObjectStore;
28use object_store::path::Path;
29use vortex::error::VortexExpect;
30use vortex::file::VORTEX_FILE_EXTENSION;
31use vortex::layout::LayoutReader;
32use vortex::metrics::DefaultMetricsRegistry;
33use vortex::metrics::MetricsRegistry;
34use vortex::session::VortexSession;
35use vortex_utils::aliases::dash_map::DashMap;
36
37use super::opener::VortexOpener;
38use crate::VortexTableOptions;
39use crate::convert::exprs::DefaultExpressionConvertor;
40use crate::convert::exprs::ExpressionConvertor;
41use crate::persistent::reader::DefaultVortexReaderFactory;
42use crate::persistent::reader::VortexReaderFactory;
43
44/// Execution plan for reading one or more Vortex files, intended to be consumed by [`DataSourceExec`].
45///
46/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
/// Execution plan for reading one or more Vortex files, intended to be consumed by [`DataSourceExec`].
///
/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
#[derive(Clone)]
pub struct VortexSource {
    /// Vortex session handle, cloned into each [`VortexOpener`] this source creates.
    pub(crate) session: VortexSession,
    /// Schema of the table being scanned (file schema plus partition columns).
    pub(crate) table_schema: TableSchema,
    /// Column/expression projection applied to the scan output; defaults to all columns.
    pub(crate) projection: ProjectionExprs,
    /// Combined predicate expression containing all filters from DataFusion query planning.
    /// Used with FilePruner to skip files based on statistics and partition values.
    pub(crate) full_predicate: Option<PhysicalExprRef>,
    /// Subset of predicates that can be pushed down into Vortex scan operations.
    /// These are expressions that Vortex can efficiently evaluate during scanning.
    pub(crate) vortex_predicate: Option<PhysicalExprRef>,
    /// Batch size set by DataFusion via [`FileSource::with_batch_size`];
    /// must be present before `create_file_opener` is called.
    pub(crate) batch_size: Option<usize>,
    // The `FileSource` trait requires returning a metrics set; Vortex reports its
    // own metrics through `vx_metrics_registry` instead, so this stays unused.
    _unused_df_metrics: ExecutionPlanMetricsSet,
    /// Shared layout readers, the source only lives as long as one scan.
    ///
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
    /// Controls how DataFusion physical expressions are converted for pushdown into Vortex.
    expression_convertor: Arc<dyn ExpressionConvertor>,
    /// Optional user-supplied factory for the underlying Vortex readers; a default
    /// object-store-backed factory is created per scan when `None`.
    pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
    /// Registry collecting Vortex-side scan metrics, exposed via `metrics_registry()`.
    vx_metrics_registry: Arc<dyn MetricsRegistry>,
    /// Optional cache for file metadata, forwarded to each opener.
    file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
    /// Table-level options, e.g. projection pushdown and scan concurrency.
    options: VortexTableOptions,
}
71
impl VortexSource {
    /// Creates a new VortexSource with default configuration and a provided [`VortexSession`].
    /// Meant to be used with a [`FileScanConfig`] to scan a file with the provided schema.
    ///
    /// Can be configured using the provided methods.
    pub fn new(table_schema: TableSchema, session: VortexSession) -> Self {
        // Start with an identity projection over every column of the full table
        // schema; DataFusion may later narrow it via `try_pushdown_projection`.
        let full_schema = table_schema.table_schema();
        let indices = (0..full_schema.fields().len()).collect::<Vec<_>>();
        let projection = ProjectionExprs::from_indices(&indices, full_schema);

        Self {
            session,
            table_schema,
            projection,
            full_predicate: None,
            vortex_predicate: None,
            // Filled in later by `FileSource::with_batch_size`.
            batch_size: None,
            _unused_df_metrics: Default::default(),
            layout_readers: Arc::new(DashMap::default()),
            expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
            vortex_reader_factory: None,
            vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
            file_metadata_cache: None,
            options: VortexTableOptions::default(),
        }
    }

    /// Enable or disable expression pushdown into the underlying Vortex scan.
    pub fn with_projection_pushdown(mut self, enabled: bool) -> Self {
        self.options.projection_pushdown = enabled;
        self
    }

    /// Set an [`ExpressionConvertor`] to control how DataFusion expressions are converted and pushed down.
    pub fn with_expression_convertor(
        mut self,
        expr_convertor: Arc<dyn ExpressionConvertor>,
    ) -> Self {
        self.expression_convertor = expr_convertor;
        self
    }

    /// Set a user-defined factory to create the underlying [`VortexReadAt`]
    ///
    /// [`VortexReadAt`]: vortex::io::VortexReadAt
    pub fn with_vortex_reader_factory(
        mut self,
        vortex_reader_factory: Arc<dyn VortexReaderFactory>,
    ) -> Self {
        self.vortex_reader_factory = Some(vortex_reader_factory);
        self
    }

    /// Returns the [`MetricsRegistry`] attached to this source.
    pub fn metrics_registry(&self) -> &Arc<dyn MetricsRegistry> {
        &self.vx_metrics_registry
    }

    /// Override the file metadata cache
    pub fn with_file_metadata_cache(
        mut self,
        file_metadata_cache: Arc<dyn FileMetadataCache>,
    ) -> Self {
        self.file_metadata_cache = Some(file_metadata_cache);
        self
    }

    /// Set the underlying scan concurrency. This limit is used per Vortex scan operation.
    pub fn with_scan_concurrency(mut self, scan_concurrency: usize) -> Self {
        self.options.scan_concurrency = Some(scan_concurrency);
        self
    }

    /// Returns the table options for this source.
    pub fn options(&self) -> &VortexTableOptions {
        &self.options
    }

    /// Set the table options for this source.
    pub fn with_options(mut self, opts: VortexTableOptions) -> Self {
        self.options = opts;
        self
    }
}
156
157impl FileSource for VortexSource {
158    fn create_file_opener(
159        &self,
160        object_store: Arc<dyn ObjectStore>,
161        base_config: &FileScanConfig,
162        partition: usize,
163    ) -> DFResult<Arc<dyn FileOpener>> {
164        let batch_size = self
165            .batch_size
166            .vortex_expect("batch_size must be supplied to VortexSource");
167
168        let expr_adapter_factory = base_config
169            .expr_adapter_factory
170            .clone()
171            .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory));
172
173        let vortex_reader_factory = self
174            .vortex_reader_factory
175            .clone()
176            .unwrap_or_else(|| Arc::new(DefaultVortexReaderFactory::new(object_store)));
177
178        let opener = VortexOpener {
179            partition,
180            session: self.session.clone(),
181            vortex_reader_factory,
182            projection: self.projection.clone(),
183            filter: self.vortex_predicate.clone(),
184            file_pruning_predicate: self.full_predicate.clone(),
185            expr_adapter_factory,
186            table_schema: self.table_schema.clone(),
187            batch_size,
188            limit: base_config.limit.map(|l| l as u64),
189            metrics_registry: self.vx_metrics_registry.clone(),
190            layout_readers: self.layout_readers.clone(),
191            has_output_ordering: !base_config.output_ordering.is_empty(),
192            expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
193            file_metadata_cache: self.file_metadata_cache.clone(),
194            projection_pushdown: self.options.projection_pushdown,
195            scan_concurrency: self.options.scan_concurrency,
196        };
197
198        Ok(Arc::new(opener))
199    }
200
201    fn as_any(&self) -> &dyn Any {
202        self
203    }
204
205    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
206        let mut source = self.clone();
207        source.batch_size = Some(batch_size);
208        Arc::new(source)
209    }
210
211    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
212        self.vortex_predicate.clone()
213    }
214
215    fn metrics(&self) -> &ExecutionPlanMetricsSet {
216        &self._unused_df_metrics
217    }
218
219    fn file_type(&self) -> &str {
220        VORTEX_FILE_EXTENSION
221    }
222
223    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
224        match t {
225            DisplayFormatType::Default | DisplayFormatType::Verbose => {
226                if let Some(ref predicate) = self.vortex_predicate {
227                    write!(f, ", predicate: {predicate}")?;
228                }
229            }
230            // Use TreeRender style key=value formatting to display the predicate
231            DisplayFormatType::TreeRender => {
232                if let Some(ref predicate) = self.vortex_predicate {
233                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
234                };
235            }
236        }
237        Ok(())
238    }
239
240    fn supports_repartitioning(&self) -> bool {
241        true
242    }
243
244    fn try_pushdown_filters(
245        &self,
246        filters: Vec<Arc<dyn PhysicalExpr>>,
247        _config: &ConfigOptions,
248    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
249        if filters.is_empty() {
250            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
251                vec![],
252            ));
253        }
254
255        let mut source = self.clone();
256
257        // Combine new filters with existing predicate for file pruning.
258        // This full predicate is used by FilePruner to eliminate files.
259        source.full_predicate = match source.full_predicate {
260            Some(predicate) => Some(conjunction(
261                std::iter::once(predicate).chain(filters.clone()),
262            )),
263            None => Some(conjunction(filters.clone())),
264        };
265
266        let supported_filters = filters
267            .into_iter()
268            .map(|expr| {
269                if self
270                    .expression_convertor
271                    .can_be_pushed_down(&expr, self.table_schema.file_schema())
272                {
273                    PushedDownPredicate::supported(expr)
274                } else {
275                    PushedDownPredicate::unsupported(expr)
276                }
277            })
278            .collect::<Vec<_>>();
279
280        if supported_filters
281            .iter()
282            .all(|p| matches!(p.discriminant, PushedDown::No))
283        {
284            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
285                vec![PushedDown::No; supported_filters.len()],
286            )
287            .with_updated_node(Arc::new(source) as _));
288        }
289
290        let supported = supported_filters
291            .iter()
292            .filter_map(|p| match p.discriminant {
293                PushedDown::Yes => Some(&p.predicate),
294                PushedDown::No => None,
295            })
296            .cloned();
297
298        let predicate = match source.vortex_predicate {
299            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
300            None => conjunction(supported),
301        };
302
303        tracing::debug!(%predicate, "Saving predicate");
304
305        source.vortex_predicate = Some(predicate);
306
307        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
308            supported_filters.iter().map(|f| f.discriminant).collect(),
309        )
310        .with_updated_node(Arc::new(source) as _))
311    }
312
313    fn try_pushdown_projection(
314        &self,
315        projection: &ProjectionExprs,
316    ) -> DFResult<Option<Arc<dyn FileSource>>> {
317        let mut source = self.clone();
318        source.projection = self.projection.try_merge(projection)?;
319        Ok(Some(Arc::new(source)))
320    }
321
322    fn projection(&self) -> Option<&ProjectionExprs> {
323        Some(&self.projection)
324    }
325
326    fn table_schema(&self) -> &TableSchema {
327        &self.table_schema
328    }
329}