vortex_datafusion/persistent/
source.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::any::Any;
use std::fmt::Formatter;
use std::sync::Arc;
use std::sync::Weak;

use datafusion_common::Result as DFResult;
use datafusion_common::Statistics;
use datafusion_common::config::ConfigOptions;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr::conjunction;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use datafusion_physical_expr_common::physical_expr::fmt_sql;
use datafusion_physical_plan::DisplayFormatType;
use datafusion_physical_plan::PhysicalExpr;
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
use datafusion_physical_plan::filter_pushdown::PushedDown;
use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use object_store::ObjectStore;
use object_store::path::Path;
use vortex::error::VortexExpect as _;
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::layout::LayoutReader;
use vortex::metrics::MetricsSessionExt;
use vortex::session::VortexSession;
use vortex_utils::aliases::dash_map::DashMap;

use super::cache::VortexFileCache;
use super::metrics::PARTITION_LABEL;
use super::opener::VortexOpener;
use crate::convert::exprs::DefaultExpressionConvertor;
use crate::convert::exprs::ExpressionConvertor;
use crate::convert::exprs::can_be_pushed_down;
use crate::vendor::schema_rewriter::DF52PhysicalExprAdapterFactory;

/// Execution plan for reading one or more Vortex files, intended to be consumed by [`DataSourceExec`].
///
/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
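///
/// # Example
///
/// A minimal sketch, assuming a `VortexSession` and a `VortexFileCache` are
/// already in scope (their construction is elided here):
///
/// ```ignore
/// // Build the source; DataFusion drives it through the `FileSource` trait,
/// // which supplies the batch size and table schema before scanning begins.
/// let source = VortexSource::new(session, file_cache);
/// let source = source.with_batch_size(8192); // returns Arc<dyn FileSource>
/// ```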
#[derive(Clone)]
pub struct VortexSource {
    pub(crate) session: VortexSession,
    pub(crate) file_cache: VortexFileCache,
    /// Combined predicate expression containing all filters from DataFusion query planning.
    /// Used with FilePruner to skip files based on statistics and partition values.
    pub(crate) full_predicate: Option<PhysicalExprRef>,
    /// Subset of predicates that can be pushed down into Vortex scan operations.
    /// These are expressions that Vortex can efficiently evaluate during scanning.
    pub(crate) vortex_predicate: Option<PhysicalExprRef>,
    pub(crate) batch_size: Option<usize>,
    pub(crate) projected_statistics: Option<Statistics>,
    pub(crate) table_schema: Option<TableSchema>,
    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    _unused_df_metrics: ExecutionPlanMetricsSet,
    /// Shared layout readers; the source only lives as long as one scan.
    ///
    /// Sharing the readers lets us read each layout from the file only once, even across partitions.
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
    expression_convertor: Arc<dyn ExpressionConvertor>,
}

impl VortexSource {
    pub(crate) fn new(session: VortexSession, file_cache: VortexFileCache) -> Self {
        Self {
            session,
            file_cache,
            full_predicate: None,
            vortex_predicate: None,
            batch_size: None,
            projected_statistics: None,
            table_schema: None,
            schema_adapter_factory: None,
            expr_adapter_factory: None,
            _unused_df_metrics: Default::default(),
            layout_readers: Arc::new(DashMap::default()),
            expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
        }
    }

    /// Sets the [`ExpressionConvertor`] that controls how DataFusion expressions are converted and pushed down.
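    ///
    /// # Example
    ///
    /// A sketch of overriding the convertor; `MyConvertor` stands in for any
    /// user-defined [`ExpressionConvertor`] implementation (hypothetical):
    ///
    /// ```ignore
    /// let source = VortexSource::new(session, file_cache)
    ///     .with_expression_convertor(Arc::new(MyConvertor::default()));
    /// ```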
    pub fn with_expression_convertor(
        mut self,
        expr_convertor: Arc<dyn ExpressionConvertor>,
    ) -> Self {
        self.expression_convertor = expr_convertor;
        self
    }
}

impl FileSource for VortexSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Arc<dyn FileOpener> {
        let partition_metrics = self
            .session
            .metrics()
            .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());

        let batch_size = self
            .batch_size
            .vortex_expect("batch_size must be supplied to VortexSource");

        let expr_adapter = self
            .expr_adapter_factory
            .as_ref()
            .or(base_config.expr_adapter_factory.as_ref());

        if expr_adapter.is_some() {
            tracing::warn!(
                "Schema evolution with VortexSource may not work as expected if you override the adapter."
            );
        }

        let schema_adapter = self.schema_adapter_factory.as_ref();

        // This match supports the behavior defined by [`ListingTable`]; see https://github.com/apache/datafusion/issues/16800 for more details.
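        // The four cases below mirror those defaults:
        //  - both factories provided: use both as given;
        //  - only an expression adapter: pair it with the default schema adapter;
        //  - only a schema adapter: skip expression adaptation entirely;
        //  - neither: fall back to the vendored DF52 expression adapter plus the
        //    default schema adapter.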
        let (expr_adapter_factory, schema_adapter_factory) = match (expr_adapter, schema_adapter) {
            (Some(expr_adapter), Some(schema_adapter)) => {
                (Some(expr_adapter.clone()), schema_adapter.clone())
            }
            (Some(expr_adapter), None) => (
                Some(expr_adapter.clone()),
                Arc::new(DefaultSchemaAdapterFactory) as _,
            ),
            (None, Some(schema_adapter)) => {
                // If no `PhysicalExprAdapterFactory` is specified, we only use the provided `SchemaAdapterFactory`
                (None, schema_adapter.clone())
            }
            (None, None) => (
                Some(Arc::new(DF52PhysicalExprAdapterFactory) as _),
                Arc::new(DefaultSchemaAdapterFactory) as _,
            ),
        };

        let projection = base_config.file_column_projection_indices().map(Arc::from);

        let table_schema = base_config.table_schema.clone();

        let opener = VortexOpener {
            session: self.session.clone(),
            object_store,
            projection,
            filter: self.vortex_predicate.clone(),
            file_pruning_predicate: self.full_predicate.clone(),
            expr_adapter_factory,
            schema_adapter_factory,
            table_schema,
            file_cache: self.file_cache.clone(),
            batch_size,
            limit: base_config.limit,
            metrics: partition_metrics,
            layout_readers: self.layout_readers.clone(),
            has_output_ordering: !base_config.output_ordering.is_empty(),
            expression_convertor: self.expression_convertor.clone(),
        };

        Arc::new(opener)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut source = self.clone();
        source.batch_size = Some(batch_size);
        Arc::new(source)
    }

    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
        let mut source = self.clone();
        source.table_schema = Some(schema);
        Arc::new(source)
    }

    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(self.clone())
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut source = self.clone();
        source.projected_statistics = Some(statistics);
        Arc::new(source)
    }

    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
        self.vortex_predicate.clone()
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self._unused_df_metrics
    }

    fn statistics(&self) -> DFResult<Statistics> {
        let statistics = self
            .projected_statistics
            .clone()
            .vortex_expect("projected_statistics must be set");

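        // A pushed-down predicate filters rows during the scan, so the projected
        // statistics become upper bounds rather than exact counts.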
        if self.vortex_predicate.is_some() {
            Ok(statistics.to_inexact())
        } else {
            Ok(statistics)
        }
    }

    fn file_type(&self) -> &str {
        VORTEX_FILE_EXTENSION
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                if let Some(ref predicate) = self.vortex_predicate {
                    write!(f, ", predicate: {predicate}")?;
                }
            }
            // Use TreeRender style key=value formatting to display the predicate
            DisplayFormatType::TreeRender => {
                if let Some(ref predicate) = self.vortex_predicate {
                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
                }
            }
        }
        Ok(())
    }

    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
        if filters.is_empty() {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![],
            ));
        }

        let Some(table_schema) = self.table_schema.as_ref() else {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![PushedDown::No; filters.len()],
            ));
        };

        let mut source = self.clone();

        // Combine new filters with existing predicate for file pruning.
        // This full predicate is used by FilePruner to eliminate files.
        source.full_predicate = match source.full_predicate {
            Some(predicate) => Some(conjunction(
                std::iter::once(predicate).chain(filters.clone()),
            )),
            None => Some(conjunction(filters.clone())),
        };

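        // Classify each filter: only expressions that Vortex can evaluate natively
        // are pushed into the scan; the rest remain with DataFusion.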
        let supported_filters = filters
            .into_iter()
            .map(|expr| {
                if can_be_pushed_down(&expr, table_schema.file_schema()) {
                    PushedDownPredicate::supported(expr)
                } else {
                    PushedDownPredicate::unsupported(expr)
                }
            })
            .collect::<Vec<_>>();

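        // Even if nothing can be pushed into the scan itself, return the updated
        // node so the combined `full_predicate` is still available for file pruning.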
        if supported_filters
            .iter()
            .all(|p| matches!(p.discriminant, PushedDown::No))
        {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![PushedDown::No; supported_filters.len()],
            )
            .with_updated_node(Arc::new(source) as _));
        }

        let supported = supported_filters
            .iter()
            .filter_map(|p| match p.discriminant {
                PushedDown::Yes => Some(&p.predicate),
                PushedDown::No => None,
            })
            .cloned();

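        // Fold the newly supported filters into any predicate pushed down earlier.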
        let predicate = match source.vortex_predicate {
            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
            None => conjunction(supported),
        };

        tracing::debug!(%predicate, "Saving predicate");

        source.vortex_predicate = Some(predicate);

        Ok(FilterPushdownPropagation::with_parent_pushdown_result(
            supported_filters.iter().map(|f| f.discriminant).collect(),
        )
        .with_updated_node(Arc::new(source) as _))
    }

    fn with_schema_adapter_factory(
        &self,
        factory: Arc<dyn SchemaAdapterFactory>,
    ) -> DFResult<Arc<dyn FileSource>> {
        let mut source = self.clone();
        source.schema_adapter_factory = Some(factory);
        Ok(Arc::new(source))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        self.schema_adapter_factory.clone()
    }
}