// vortex_datafusion/persistent/source.rs
use std::any::Any;
5use std::fmt::Formatter;
6use std::sync::Arc;
7use std::sync::Weak;
8
9use datafusion_common::Result as DFResult;
10use datafusion_common::Statistics;
11use datafusion_common::config::ConfigOptions;
12use datafusion_datasource::TableSchema;
13use datafusion_datasource::file::FileSource;
14use datafusion_datasource::file_scan_config::FileScanConfig;
15use datafusion_datasource::file_stream::FileOpener;
16use datafusion_datasource::schema_adapter::DefaultSchemaAdapterFactory;
17use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
18use datafusion_physical_expr::PhysicalExprRef;
19use datafusion_physical_expr::conjunction;
20use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
21use datafusion_physical_expr_common::physical_expr::fmt_sql;
22use datafusion_physical_plan::DisplayFormatType;
23use datafusion_physical_plan::PhysicalExpr;
24use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
25use datafusion_physical_plan::filter_pushdown::PushedDown;
26use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
27use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
28use object_store::ObjectStore;
29use object_store::path::Path;
30use vortex::error::VortexExpect as _;
31use vortex::file::VORTEX_FILE_EXTENSION;
32use vortex::layout::LayoutReader;
33use vortex::metrics::MetricsSessionExt;
34use vortex::session::VortexSession;
35use vortex_utils::aliases::dash_map::DashMap;
36
37use super::cache::VortexFileCache;
38use super::metrics::PARTITION_LABEL;
39use super::opener::VortexOpener;
40use crate::convert::exprs::DefaultExpressionConvertor;
41use crate::convert::exprs::ExpressionConvertor;
42use crate::convert::exprs::can_be_pushed_down;
43use crate::vendor::schema_rewriter::DF52PhysicalExprAdapterFactory;
44
/// DataFusion [`FileSource`] implementation for scanning Vortex files.
///
/// Holds the scan-time configuration that DataFusion threads through its
/// planning hooks (`with_batch_size`, `with_schema`, `try_pushdown_filters`,
/// ...) and builds a [`VortexOpener`] per partition in `create_file_opener`.
#[derive(Clone)]
pub struct VortexSource {
    pub(crate) session: VortexSession,
    pub(crate) file_cache: VortexFileCache,
    // Conjunction of *all* filters DataFusion pushed into this source,
    // including ones Vortex cannot evaluate; used for file-level pruning.
    pub(crate) full_predicate: Option<PhysicalExprRef>,
    // Conjunction of only the filters accepted by `can_be_pushed_down`;
    // evaluated inside the Vortex scan itself.
    pub(crate) vortex_predicate: Option<PhysicalExprRef>,
    pub(crate) batch_size: Option<usize>,
    pub(crate) projected_statistics: Option<Statistics>,
    pub(crate) table_schema: Option<TableSchema>,
    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    // The `FileSource` trait requires returning a metrics set; Vortex reports
    // through its own session metrics instead, so this one is never populated.
    _unused_df_metrics: ExecutionPlanMetricsSet,
    // Weak handles let concurrent partitions share one layout reader per file
    // path without keeping readers alive after every scan has dropped them.
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
    // Translates DataFusion physical expressions into Vortex expressions;
    // replaceable via `with_expression_convertor`.
    expression_convertor: Arc<dyn ExpressionConvertor>,
}
70
71impl VortexSource {
72 pub(crate) fn new(session: VortexSession, file_cache: VortexFileCache) -> Self {
73 Self {
74 session,
75 file_cache,
76 full_predicate: None,
77 vortex_predicate: None,
78 batch_size: None,
79 projected_statistics: None,
80 table_schema: None,
81 schema_adapter_factory: None,
82 expr_adapter_factory: None,
83 _unused_df_metrics: Default::default(),
84 layout_readers: Arc::new(DashMap::default()),
85 expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
86 }
87 }
88
89 pub fn with_expression_convertor(
91 mut self,
92 expr_convertor: Arc<dyn ExpressionConvertor>,
93 ) -> Self {
94 self.expression_convertor = expr_convertor;
95 self
96 }
97}
98
99impl FileSource for VortexSource {
100 fn create_file_opener(
101 &self,
102 object_store: Arc<dyn ObjectStore>,
103 base_config: &FileScanConfig,
104 partition: usize,
105 ) -> Arc<dyn FileOpener> {
106 let partition_metrics = self
107 .session
108 .metrics()
109 .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());
110
111 let batch_size = self
112 .batch_size
113 .vortex_expect("batch_size must be supplied to VortexSource");
114
115 let expr_adapter = self
116 .expr_adapter_factory
117 .as_ref()
118 .or(base_config.expr_adapter_factory.as_ref());
119
120 if expr_adapter.is_some() {
121 tracing::warn!(
122 "Schema evolution with VortexSource may not work as expected if you override the adapter."
123 );
124 }
125
126 let schema_adapter = self.schema_adapter_factory.as_ref();
127
128 let (expr_adapter_factory, schema_adapter_factory) = match (expr_adapter, schema_adapter) {
130 (Some(expr_adapter), Some(schema_adapter)) => {
131 (Some(expr_adapter.clone()), schema_adapter.clone())
132 }
133 (Some(expr_adapter), None) => (
134 Some(expr_adapter.clone()),
135 Arc::new(DefaultSchemaAdapterFactory) as _,
136 ),
137 (None, Some(schema_adapter)) => {
138 (None, schema_adapter.clone())
140 }
141 (None, None) => (
142 Some(Arc::new(DF52PhysicalExprAdapterFactory) as _),
143 Arc::new(DefaultSchemaAdapterFactory) as _,
144 ),
145 };
146
147 let projection = base_config.file_column_projection_indices().map(Arc::from);
148
149 let table_schema = base_config.table_schema.clone();
150
151 let opener = VortexOpener {
152 session: self.session.clone(),
153 object_store,
154 projection,
155 filter: self.vortex_predicate.clone(),
156 file_pruning_predicate: self.full_predicate.clone(),
157 expr_adapter_factory,
158 schema_adapter_factory,
159 table_schema,
160 file_cache: self.file_cache.clone(),
161 batch_size,
162 limit: base_config.limit,
163 metrics: partition_metrics,
164 layout_readers: self.layout_readers.clone(),
165 has_output_ordering: !base_config.output_ordering.is_empty(),
166 expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
167 };
168
169 Arc::new(opener)
170 }
171
172 fn as_any(&self) -> &dyn Any {
173 self
174 }
175
176 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
177 let mut source = self.clone();
178 source.batch_size = Some(batch_size);
179 Arc::new(source)
180 }
181
182 fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
183 let mut source = self.clone();
184 source.table_schema = Some(schema);
185 Arc::new(source)
186 }
187
188 fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
189 Arc::new(self.clone())
190 }
191
192 fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
193 let mut source = self.clone();
194 source.projected_statistics = Some(statistics);
195 Arc::new(source)
196 }
197
198 fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
199 self.vortex_predicate.clone()
200 }
201
202 fn metrics(&self) -> &ExecutionPlanMetricsSet {
203 &self._unused_df_metrics
204 }
205
206 fn statistics(&self) -> DFResult<Statistics> {
207 let statistics = self
208 .projected_statistics
209 .clone()
210 .vortex_expect("projected_statistics must be set");
211
212 if self.vortex_predicate.is_some() {
213 Ok(statistics.to_inexact())
214 } else {
215 Ok(statistics)
216 }
217 }
218
219 fn file_type(&self) -> &str {
220 VORTEX_FILE_EXTENSION
221 }
222
223 fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
224 match t {
225 DisplayFormatType::Default | DisplayFormatType::Verbose => {
226 if let Some(ref predicate) = self.vortex_predicate {
227 write!(f, ", predicate: {predicate}")?;
228 }
229 }
230 DisplayFormatType::TreeRender => {
232 if let Some(ref predicate) = self.vortex_predicate {
233 writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
234 };
235 }
236 }
237 Ok(())
238 }
239
240 fn try_pushdown_filters(
241 &self,
242 filters: Vec<Arc<dyn PhysicalExpr>>,
243 _config: &ConfigOptions,
244 ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
245 if filters.is_empty() {
246 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
247 vec![],
248 ));
249 }
250
251 let Some(table_schema) = self.table_schema.as_ref() else {
252 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
253 vec![PushedDown::No; filters.len()],
254 ));
255 };
256
257 let mut source = self.clone();
258
259 source.full_predicate = match source.full_predicate {
262 Some(predicate) => Some(conjunction(
263 std::iter::once(predicate).chain(filters.clone()),
264 )),
265 None => Some(conjunction(filters.clone())),
266 };
267
268 let supported_filters = filters
269 .into_iter()
270 .map(|expr| {
271 if can_be_pushed_down(&expr, table_schema.file_schema()) {
272 PushedDownPredicate::supported(expr)
273 } else {
274 PushedDownPredicate::unsupported(expr)
275 }
276 })
277 .collect::<Vec<_>>();
278
279 if supported_filters
280 .iter()
281 .all(|p| matches!(p.discriminant, PushedDown::No))
282 {
283 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
284 vec![PushedDown::No; supported_filters.len()],
285 )
286 .with_updated_node(Arc::new(source) as _));
287 }
288
289 let supported = supported_filters
290 .iter()
291 .filter_map(|p| match p.discriminant {
292 PushedDown::Yes => Some(&p.predicate),
293 PushedDown::No => None,
294 })
295 .cloned();
296
297 let predicate = match source.vortex_predicate {
298 Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
299 None => conjunction(supported),
300 };
301
302 tracing::debug!(%predicate, "Saving predicate");
303
304 source.vortex_predicate = Some(predicate);
305
306 Ok(FilterPushdownPropagation::with_parent_pushdown_result(
307 supported_filters.iter().map(|f| f.discriminant).collect(),
308 )
309 .with_updated_node(Arc::new(source) as _))
310 }
311
312 fn with_schema_adapter_factory(
313 &self,
314 factory: Arc<dyn SchemaAdapterFactory>,
315 ) -> DFResult<Arc<dyn FileSource>> {
316 let mut source = self.clone();
317 source.schema_adapter_factory = Some(factory);
318 Ok(Arc::new(source))
319 }
320
321 fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
322 self.schema_adapter_factory.clone()
323 }
324}