vortex_datafusion/persistent/
source.rs1use std::any::Any;
5use std::fmt::Formatter;
6use std::ops::Range;
7use std::sync::Arc;
8use std::sync::Weak;
9
10use datafusion_common::Result as DFResult;
11use datafusion_common::config::ConfigOptions;
12use datafusion_datasource::TableSchema;
13use datafusion_datasource::file::FileSource;
14use datafusion_datasource::file_scan_config::FileScanConfig;
15use datafusion_datasource::file_stream::FileOpener;
16use datafusion_execution::cache::cache_manager::FileMetadataCache;
17use datafusion_physical_expr::PhysicalExprRef;
18use datafusion_physical_expr::conjunction;
19use datafusion_physical_expr::projection::ProjectionExprs;
20use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
21use datafusion_physical_expr_common::physical_expr::fmt_sql;
22use datafusion_physical_plan::DisplayFormatType;
23use datafusion_physical_plan::PhysicalExpr;
24use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
25use datafusion_physical_plan::filter_pushdown::PushedDown;
26use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
27use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
28use object_store::ObjectStore;
29use object_store::path::Path;
30use vortex::error::VortexExpect;
31use vortex::file::VORTEX_FILE_EXTENSION;
32use vortex::layout::LayoutReader;
33use vortex::metrics::DefaultMetricsRegistry;
34use vortex::metrics::MetricsRegistry;
35use vortex::session::VortexSession;
36use vortex_utils::aliases::dash_map::DashMap;
37
38use super::opener::VortexOpener;
39use crate::VortexTableOptions;
40use crate::convert::exprs::DefaultExpressionConvertor;
41use crate::convert::exprs::ExpressionConvertor;
42use crate::persistent::reader::DefaultVortexReaderFactory;
43use crate::persistent::reader::VortexReaderFactory;
44
45#[derive(Clone)]
173pub struct VortexSource {
174 pub(crate) session: VortexSession,
175 pub(crate) table_schema: TableSchema,
176 pub(crate) projection: ProjectionExprs,
177 pub(crate) full_predicate: Option<PhysicalExprRef>,
180 pub(crate) vortex_predicate: Option<PhysicalExprRef>,
183 pub(crate) batch_size: Option<usize>,
184 _unused_df_metrics: ExecutionPlanMetricsSet,
185 layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
189 natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
191 expression_convertor: Arc<dyn ExpressionConvertor>,
192 pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
193 vx_metrics_registry: Arc<dyn MetricsRegistry>,
194 file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
195 options: VortexTableOptions,
197}
198
199impl VortexSource {
200 pub fn new(table_schema: TableSchema, session: VortexSession) -> Self {
209 let full_schema = table_schema.table_schema();
210 let indices = (0..full_schema.fields().len()).collect::<Vec<_>>();
211 let projection = ProjectionExprs::from_indices(&indices, full_schema);
212
213 Self {
214 session,
215 table_schema,
216 projection,
217 full_predicate: None,
218 vortex_predicate: None,
219 batch_size: None,
220 _unused_df_metrics: Default::default(),
221 layout_readers: Arc::new(DashMap::default()),
222 natural_split_ranges: Arc::new(DashMap::default()),
223 expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
224 vortex_reader_factory: None,
225 vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
226 file_metadata_cache: None,
227 options: VortexTableOptions::default(),
228 }
229 }
230
231 pub fn with_projection_pushdown(mut self, enabled: bool) -> Self {
237 self.options.projection_pushdown = enabled;
238 self
239 }
240
241 pub fn with_expression_convertor(
247 mut self,
248 expr_convertor: Arc<dyn ExpressionConvertor>,
249 ) -> Self {
250 self.expression_convertor = expr_convertor;
251 self
252 }
253
254 pub fn with_vortex_reader_factory(
261 mut self,
262 vortex_reader_factory: Arc<dyn VortexReaderFactory>,
263 ) -> Self {
264 self.vortex_reader_factory = Some(vortex_reader_factory);
265 self
266 }
267
268 pub fn metrics_registry(&self) -> &Arc<dyn MetricsRegistry> {
274 &self.vx_metrics_registry
275 }
276
277 pub fn with_file_metadata_cache(
279 mut self,
280 file_metadata_cache: Arc<dyn FileMetadataCache>,
281 ) -> Self {
282 self.file_metadata_cache = Some(file_metadata_cache);
283 self
284 }
285
286 pub fn with_scan_concurrency(mut self, scan_concurrency: usize) -> Self {
290 self.options.scan_concurrency = Some(scan_concurrency);
291 self
292 }
293
294 pub fn options(&self) -> &VortexTableOptions {
296 &self.options
297 }
298
299 pub fn with_options(mut self, opts: VortexTableOptions) -> Self {
301 self.options = opts;
302 self
303 }
304
305 fn create_vortex_opener(
306 &self,
307 object_store: Arc<dyn ObjectStore>,
308 base_config: &FileScanConfig,
309 partition: usize,
310 ) -> DFResult<VortexOpener> {
311 let batch_size = self
312 .batch_size
313 .vortex_expect("batch_size must be supplied to VortexSource");
314
315 let expr_adapter_factory = base_config
316 .expr_adapter_factory
317 .clone()
318 .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory));
319
320 let vortex_reader_factory = self
321 .vortex_reader_factory
322 .clone()
323 .unwrap_or_else(|| Arc::new(DefaultVortexReaderFactory::new(object_store)));
324
325 let opener = VortexOpener {
326 partition,
327 session: self.session.clone(),
328 vortex_reader_factory,
329 projection: self.projection.clone(),
330 filter: self.vortex_predicate.clone(),
331 file_pruning_predicate: self.full_predicate.clone(),
332 expr_adapter_factory,
333 table_schema: self.table_schema.clone(),
334 batch_size,
335 limit: base_config.limit.map(|l| l as u64),
336 metrics_registry: Arc::clone(&self.vx_metrics_registry),
337 layout_readers: Arc::clone(&self.layout_readers),
338 natural_split_ranges: Arc::clone(&self.natural_split_ranges),
339 has_output_ordering: !base_config.output_ordering.is_empty(),
340 expression_convertor: Arc::clone(&self.expression_convertor),
341 file_metadata_cache: self.file_metadata_cache.clone(),
342 projection_pushdown: self.options.projection_pushdown,
343 scan_concurrency: self.options.scan_concurrency,
344 };
345
346 Ok(opener)
347 }
348}
349
350impl FileSource for VortexSource {
351 fn create_file_opener(
352 &self,
353 object_store: Arc<dyn ObjectStore>,
354 base_config: &FileScanConfig,
355 partition: usize,
356 ) -> DFResult<Arc<dyn FileOpener>> {
357 Ok(Arc::new(self.create_vortex_opener(
358 object_store,
359 base_config,
360 partition,
361 )?))
362 }
363
364 fn as_any(&self) -> &dyn Any {
365 self
366 }
367
368 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
369 let mut source = self.clone();
370 source.batch_size = Some(batch_size);
371 Arc::new(source)
372 }
373
374 fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
375 self.vortex_predicate.clone()
376 }
377
378 fn metrics(&self) -> &ExecutionPlanMetricsSet {
379 &self._unused_df_metrics
380 }
381
382 fn file_type(&self) -> &str {
383 VORTEX_FILE_EXTENSION
384 }
385
386 fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
387 match t {
388 DisplayFormatType::Default | DisplayFormatType::Verbose => {
389 if let Some(predicate) = &self.vortex_predicate {
390 write!(f, ", predicate: {predicate}")?;
391 }
392 }
393 DisplayFormatType::TreeRender => {
395 if let Some(predicate) = &self.vortex_predicate {
396 writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
397 };
398 }
399 }
400 Ok(())
401 }
402
403 fn supports_repartitioning(&self) -> bool {
404 true
405 }
406
407 fn try_pushdown_filters(
408 &self,
409 filters: Vec<Arc<dyn PhysicalExpr>>,
410 _config: &ConfigOptions,
411 ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
412 if filters.is_empty() {
413 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
414 vec![],
415 ));
416 }
417
418 let mut source = self.clone();
419
420 source.full_predicate = match source.full_predicate {
423 Some(predicate) => Some(conjunction(
424 std::iter::once(predicate).chain(filters.clone()),
425 )),
426 None => Some(conjunction(filters.clone())),
427 };
428
429 let supported_filters = filters
430 .into_iter()
431 .map(|expr| {
432 if self
433 .expression_convertor
434 .can_be_pushed_down(&expr, self.table_schema.file_schema())
435 {
436 PushedDownPredicate::supported(expr)
437 } else {
438 PushedDownPredicate::unsupported(expr)
439 }
440 })
441 .collect::<Vec<_>>();
442
443 if supported_filters
444 .iter()
445 .all(|p| matches!(p.discriminant, PushedDown::No))
446 {
447 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
448 vec![PushedDown::No; supported_filters.len()],
449 )
450 .with_updated_node(Arc::new(source) as _));
451 }
452
453 let supported = supported_filters
454 .iter()
455 .filter_map(|p| match p.discriminant {
456 PushedDown::Yes => Some(&p.predicate),
457 PushedDown::No => None,
458 })
459 .cloned();
460
461 let predicate = match source.vortex_predicate {
462 Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
463 None => conjunction(supported),
464 };
465
466 tracing::debug!(%predicate, "Saving predicate");
467
468 source.vortex_predicate = Some(predicate);
469
470 Ok(FilterPushdownPropagation::with_parent_pushdown_result(
471 supported_filters.iter().map(|f| f.discriminant).collect(),
472 )
473 .with_updated_node(Arc::new(source) as _))
474 }
475
476 fn try_pushdown_projection(
477 &self,
478 projection: &ProjectionExprs,
479 ) -> DFResult<Option<Arc<dyn FileSource>>> {
480 let mut source = self.clone();
481 source.projection = self.projection.try_merge(projection)?;
482 Ok(Some(Arc::new(source)))
483 }
484
485 fn projection(&self) -> Option<&ProjectionExprs> {
486 Some(&self.projection)
487 }
488
489 fn table_schema(&self) -> &TableSchema {
490 &self.table_schema
491 }
492}
493
#[cfg(test)]
mod tests {
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use object_store::memory::InMemory;
    use vortex::VortexSessionDefault;

    use super::*;
    use crate::convert::exprs::ProcessedProjection;

    /// A distinct [`ExpressionConvertor`] type that forwards every call to
    /// [`DefaultExpressionConvertor`]. The test below uses it as a custom convertor whose
    /// `Arc` identity can be compared.
    struct TrackingExpressionConvertor {
        inner: DefaultExpressionConvertor,
    }

    impl ExpressionConvertor for TrackingExpressionConvertor {
        fn can_be_pushed_down(&self, expr: &PhysicalExprRef, schema: &Schema) -> bool {
            self.inner.can_be_pushed_down(expr, schema)
        }

        fn convert(&self, expr: &dyn PhysicalExpr) -> DFResult<vortex::expr::Expression> {
            self.inner.convert(expr)
        }

        fn split_projection(
            &self,
            source_projection: ProjectionExprs,
            input_schema: &Schema,
            output_schema: &Schema,
        ) -> DFResult<ProcessedProjection> {
            self.inner
                .split_projection(source_projection, input_schema, output_schema)
        }

        fn no_pushdown_projection(
            &self,
            source_projection: ProjectionExprs,
            input_schema: &Schema,
        ) -> DFResult<ProcessedProjection> {
            self.inner
                .no_pushdown_projection(source_projection, input_schema)
        }
    }

    /// The opener must carry the exact convertor instance configured on the source, not a
    /// freshly constructed default.
    #[test]
    fn create_vortex_opener_preserves_expression_convertor() -> anyhow::Result<()> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let table_schema = TableSchema::from_file_schema(Arc::new(schema));

        let custom_convertor: Arc<dyn ExpressionConvertor> =
            Arc::new(TrackingExpressionConvertor {
                inner: DefaultExpressionConvertor::default(),
            });

        let mut source = VortexSource::new(table_schema, VortexSession::default())
            .with_expression_convertor(Arc::clone(&custom_convertor));
        // Creating an opener panics without a batch size, so set one up front.
        source.batch_size = Some(100);

        let scan_config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            Arc::new(source.clone()),
        )
        .build();

        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let opener = source.create_vortex_opener(store, &scan_config, 0)?;

        // Pointer equality: same allocation, hence the same convertor instance.
        assert!(Arc::ptr_eq(
            &opener.expression_convertor,
            &custom_convertor
        ));
        Ok(())
    }
}