vortex_datafusion/persistent/
source.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Formatter;
6use std::sync::{Arc, Weak};
7
8use arrow_schema::SchemaRef;
9use datafusion_common::config::ConfigOptions;
10use datafusion_common::{Result as DFResult, Statistics};
11use datafusion_datasource::file::FileSource;
12use datafusion_datasource::file_scan_config::FileScanConfig;
13use datafusion_datasource::file_stream::FileOpener;
14use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
15use datafusion_physical_expr::schema_rewriter::{
16    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
17};
18use datafusion_physical_expr::{PhysicalExprRef, conjunction};
19use datafusion_physical_plan::filter_pushdown::{
20    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
21};
22use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
23use datafusion_physical_plan::{DisplayFormatType, PhysicalExpr};
24use object_store::ObjectStore;
25use object_store::path::Path;
26use vortex::error::VortexExpect as _;
27use vortex::file::VORTEX_FILE_EXTENSION;
28use vortex::layout::LayoutReader;
29use vortex::metrics::VortexMetrics;
30use vortex_utils::aliases::dash_map::DashMap;
31
32use super::cache::VortexFileCache;
33use super::metrics::PARTITION_LABEL;
34use super::opener::VortexOpener;
35use crate::convert::exprs::can_be_pushed_down;
36
37/// Execution plan for reading one or more Vortex files, intended to be consumed by [`DataSourceExec`].
38///
39/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
#[derive(Clone)]
pub struct VortexSource {
    /// Shared cache of opened Vortex files; see [`VortexFileCache`].
    pub(crate) file_cache: VortexFileCache,
    /// Filter predicate accumulated via `try_pushdown_filters`, passed to each opener.
    pub(crate) predicate: Option<PhysicalExprRef>,
    /// Output batch size. Must be set (via `with_batch_size`) before
    /// `create_file_opener` is called, which panics otherwise.
    pub(crate) batch_size: Option<usize>,
    /// Statistics projected onto the scan's output columns; set via `with_statistics`.
    pub(crate) projected_statistics: Option<Statistics>,
    /// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema.
    pub(crate) arrow_file_schema: Option<SchemaRef>,
    /// Factory adapting file record batches to the table schema; set via `with_schema_adapter_factory`.
    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    /// Factory rewriting physical expressions against each file's schema;
    /// takes precedence over the one on [`FileScanConfig`] when both are present.
    pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    /// Vortex-native metrics sink; per-partition children are derived in `create_file_opener`.
    pub(crate) metrics: VortexMetrics,
    // Exists only to satisfy `FileSource::metrics`; real metrics flow through `metrics` above.
    _unused_df_metrics: ExecutionPlanMetricsSet,
    /// Shared layout readers, the source only lives as long as one scan.
    ///
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
}
57
58impl VortexSource {
59    pub(crate) fn new(file_cache: VortexFileCache, metrics: VortexMetrics) -> Self {
60        Self {
61            file_cache,
62            metrics,
63            predicate: None,
64            batch_size: None,
65            projected_statistics: None,
66            arrow_file_schema: None,
67            schema_adapter_factory: None,
68            expr_adapter_factory: None,
69            _unused_df_metrics: Default::default(),
70            layout_readers: Arc::new(DashMap::default()),
71        }
72    }
73
74    /// Sets a [`PhysicalExprAdapterFactory`] for the [`VortexSource`].
75    /// Currently, this must be provided in order to filter columns in files that have a different data type from the unified table schema.
76    ///
77    /// This factory will take precedence when opening files over instances provided by the [`FileScanConfig`].
78    pub fn with_expr_adapter_factory(
79        &self,
80        expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
81    ) -> Arc<dyn FileSource> {
82        let mut source = self.clone();
83        source.expr_adapter_factory = Some(expr_adapter_factory);
84        Arc::new(source)
85    }
86}
87
88impl FileSource for VortexSource {
89    fn create_file_opener(
90        &self,
91        object_store: Arc<dyn ObjectStore>,
92        base_config: &FileScanConfig,
93        partition: usize,
94    ) -> Arc<dyn FileOpener> {
95        let partition_metrics = self
96            .metrics
97            .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());
98
99        let batch_size = self
100            .batch_size
101            .vortex_expect("batch_size must be supplied to VortexSource");
102
103        let expr_adapter = self
104            .expr_adapter_factory
105            .as_ref()
106            .or(base_config.expr_adapter_factory.as_ref());
107        let schema_adapter = self.schema_adapter_factory.as_ref();
108
109        // This match is here to support the behavior defined by [`ListingTable`], see https://github.com/apache/datafusion/issues/16800 for more details.
110        let (expr_adapter_factory, schema_adapter_factory) = match (expr_adapter, schema_adapter) {
111            (Some(expr_adapter), Some(schema_adapter)) => {
112                (Some(expr_adapter.clone()), schema_adapter.clone())
113            }
114            (Some(expr_adapter), None) => (
115                Some(expr_adapter.clone()),
116                Arc::new(DefaultSchemaAdapterFactory) as _,
117            ),
118            (None, Some(schema_adapter)) => {
119                // If no `PhysicalExprAdapterFactory` is specified, we only use the provided `SchemaAdapterFactory`
120                (None, schema_adapter.clone())
121            }
122            (None, None) => (
123                Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
124                Arc::new(DefaultSchemaAdapterFactory) as _,
125            ),
126        };
127
128        let projection = base_config.file_column_projection_indices().map(Arc::from);
129
130        let opener = VortexOpener {
131            object_store,
132            projection,
133            filter: self.predicate.clone(),
134            expr_adapter_factory,
135            schema_adapter_factory,
136            partition_fields: base_config.table_partition_cols.clone(),
137            logical_schema: base_config.file_schema.clone(),
138            file_cache: self.file_cache.clone(),
139            batch_size,
140            limit: base_config.limit,
141            metrics: partition_metrics,
142            layout_readers: self.layout_readers.clone(),
143        };
144
145        Arc::new(opener)
146    }
147
148    fn as_any(&self) -> &dyn Any {
149        self
150    }
151
152    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
153        let mut source = self.clone();
154        source.batch_size = Some(batch_size);
155        Arc::new(source)
156    }
157
158    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
159        let mut source = self.clone();
160        source.arrow_file_schema = Some(schema);
161        Arc::new(source)
162    }
163
164    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
165        Arc::new(self.clone())
166    }
167
168    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
169        let mut source = self.clone();
170        source.projected_statistics = Some(statistics);
171        Arc::new(source)
172    }
173
174    fn metrics(&self) -> &ExecutionPlanMetricsSet {
175        &self._unused_df_metrics
176    }
177
178    fn statistics(&self) -> DFResult<Statistics> {
179        let statistics = self
180            .projected_statistics
181            .clone()
182            .vortex_expect("projected_statistics must be set");
183
184        if self.predicate.is_some() {
185            Ok(statistics.to_inexact())
186        } else {
187            Ok(statistics)
188        }
189    }
190
191    fn file_type(&self) -> &str {
192        VORTEX_FILE_EXTENSION
193    }
194
195    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
196        match t {
197            DisplayFormatType::Default | DisplayFormatType::Verbose => {
198                if let Some(ref predicate) = self.predicate {
199                    write!(f, ", predicate: {predicate}")?;
200                }
201            }
202            // Use TreeRender style key=value formatting to display the predicate
203            DisplayFormatType::TreeRender => {
204                if let Some(ref predicate) = self.predicate {
205                    writeln!(f, "predicate={predicate}")?;
206                };
207            }
208        }
209        Ok(())
210    }
211
212    fn try_pushdown_filters(
213        &self,
214        filters: Vec<Arc<dyn PhysicalExpr>>,
215        _config: &ConfigOptions,
216    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
217        let Some(schema) = self.arrow_file_schema.as_ref() else {
218            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
219                vec![PushedDown::No; filters.len()],
220            ));
221        };
222
223        let mut source = self.clone();
224
225        let filters = filters
226            .into_iter()
227            .map(|expr| {
228                if can_be_pushed_down(&expr, schema) {
229                    PushedDownPredicate::supported(expr)
230                } else {
231                    PushedDownPredicate::unsupported(expr)
232                }
233            })
234            .collect::<Vec<_>>();
235
236        if filters
237            .iter()
238            .all(|p| matches!(p.discriminant, PushedDown::No))
239        {
240            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
241                vec![PushedDown::No; filters.len()],
242            ));
243        }
244
245        let supported = filters
246            .iter()
247            .filter_map(|p| match p.discriminant {
248                PushedDown::Yes => Some(&p.predicate),
249                PushedDown::No => None,
250            })
251            .cloned();
252
253        let predicate = match source.predicate {
254            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
255            None => conjunction(supported),
256        };
257        source.predicate = Some(predicate);
258
259        let pushdown_propagation = if source.predicate.clone().is_some() {
260            FilterPushdownPropagation::with_parent_pushdown_result(
261                filters.iter().map(|f| f.discriminant).collect(),
262            )
263            .with_updated_node(Arc::new(source) as _)
264        } else {
265            FilterPushdownPropagation::with_parent_pushdown_result(vec![
266                PushedDown::No;
267                filters.len()
268            ])
269        };
270
271        Ok(pushdown_propagation)
272    }
273
274    fn with_schema_adapter_factory(
275        &self,
276        factory: Arc<dyn SchemaAdapterFactory>,
277    ) -> DFResult<Arc<dyn FileSource>> {
278        let mut source = self.clone();
279        source.schema_adapter_factory = Some(factory);
280        Ok(Arc::new(source))
281    }
282
283    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
284        None
285    }
286}