vortex_datafusion/persistent/
source.rs1use std::fmt::Formatter;
5use std::ops::Range;
6use std::sync::Arc;
7use std::sync::Weak;
8
9use datafusion_common::Result as DFResult;
10use datafusion_common::config::ConfigOptions;
11use datafusion_datasource::TableSchema;
12use datafusion_datasource::file::FileSource;
13use datafusion_datasource::file_scan_config::FileScanConfig;
14use datafusion_datasource::file_stream::FileOpener;
15use datafusion_execution::cache::cache_manager::FileMetadataCache;
16use datafusion_physical_expr::EquivalenceProperties;
17use datafusion_physical_expr::PhysicalExprRef;
18use datafusion_physical_expr::PhysicalSortExpr;
19use datafusion_physical_expr::conjunction;
20use datafusion_physical_expr::projection::ProjectionExprs;
21use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
22use datafusion_physical_expr_common::physical_expr::fmt_sql;
23use datafusion_physical_plan::DisplayFormatType;
24use datafusion_physical_plan::PhysicalExpr;
25use datafusion_physical_plan::SortOrderPushdownResult;
26use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
27use datafusion_physical_plan::filter_pushdown::PushedDown;
28use datafusion_physical_plan::filter_pushdown::PushedDownPredicate;
29use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
30use object_store::ObjectStore;
31use object_store::path::Path;
32use vortex::error::VortexExpect;
33use vortex::file::VORTEX_FILE_EXTENSION;
34use vortex::layout::LayoutReader;
35use vortex::metrics::DefaultMetricsRegistry;
36use vortex::metrics::MetricsRegistry;
37use vortex::session::VortexSession;
38use vortex_utils::aliases::dash_map::DashMap;
39
40use super::opener::VortexOpener;
41use crate::VortexTableOptions;
42use crate::convert::exprs::DefaultExpressionConvertor;
43use crate::convert::exprs::ExpressionConvertor;
44use crate::persistent::reader::DefaultVortexReaderFactory;
45use crate::persistent::reader::VortexReaderFactory;
46
47#[derive(Clone)]
175pub struct VortexSource {
176 pub(crate) session: VortexSession,
177 pub(crate) table_schema: TableSchema,
178 pub(crate) projection: ProjectionExprs,
179 pub(crate) full_predicate: Option<PhysicalExprRef>,
182 pub(crate) vortex_predicate: Option<PhysicalExprRef>,
185 pub(crate) batch_size: Option<usize>,
186 _unused_df_metrics: ExecutionPlanMetricsSet,
187 layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
191 natural_split_ranges: Arc<DashMap<Path, Arc<[Range<u64>]>>>,
193 expression_convertor: Arc<dyn ExpressionConvertor>,
194 pub(crate) vortex_reader_factory: Option<Arc<dyn VortexReaderFactory>>,
195 pub(crate) ordered: bool,
196 vx_metrics_registry: Arc<dyn MetricsRegistry>,
197 file_metadata_cache: Option<Arc<dyn FileMetadataCache>>,
198 options: VortexTableOptions,
200}
201
202impl VortexSource {
203 pub fn new(table_schema: TableSchema, session: VortexSession) -> Self {
212 let full_schema = table_schema.table_schema();
213 let indices = (0..full_schema.fields().len()).collect::<Vec<_>>();
214 let projection = ProjectionExprs::from_indices(&indices, full_schema);
215
216 Self {
217 session,
218 table_schema,
219 projection,
220 full_predicate: None,
221 vortex_predicate: None,
222 batch_size: None,
223 _unused_df_metrics: Default::default(),
224 layout_readers: Arc::new(DashMap::default()),
225 natural_split_ranges: Arc::new(DashMap::default()),
226 expression_convertor: Arc::new(DefaultExpressionConvertor::default()),
227 vortex_reader_factory: None,
228 vx_metrics_registry: Arc::new(DefaultMetricsRegistry::default()),
229 file_metadata_cache: None,
230 ordered: false,
231 options: VortexTableOptions::default(),
232 }
233 }
234
235 pub fn with_projection_pushdown(mut self, enabled: bool) -> Self {
241 self.options.projection_pushdown = enabled;
242 self
243 }
244
245 pub fn with_expression_convertor(
251 mut self,
252 expr_convertor: Arc<dyn ExpressionConvertor>,
253 ) -> Self {
254 self.expression_convertor = expr_convertor;
255 self
256 }
257
258 pub fn with_vortex_reader_factory(
265 mut self,
266 vortex_reader_factory: Arc<dyn VortexReaderFactory>,
267 ) -> Self {
268 self.vortex_reader_factory = Some(vortex_reader_factory);
269 self
270 }
271
272 pub fn metrics_registry(&self) -> &Arc<dyn MetricsRegistry> {
278 &self.vx_metrics_registry
279 }
280
281 pub fn with_file_metadata_cache(
283 mut self,
284 file_metadata_cache: Arc<dyn FileMetadataCache>,
285 ) -> Self {
286 self.file_metadata_cache = Some(file_metadata_cache);
287 self
288 }
289
290 pub fn with_scan_concurrency(mut self, scan_concurrency: usize) -> Self {
294 self.options.scan_concurrency = Some(scan_concurrency);
295 self
296 }
297
298 pub fn options(&self) -> &VortexTableOptions {
300 &self.options
301 }
302
303 pub fn with_options(mut self, opts: VortexTableOptions) -> Self {
305 self.options = opts;
306 self
307 }
308
309 fn create_vortex_opener(
310 &self,
311 object_store: Arc<dyn ObjectStore>,
312 base_config: &FileScanConfig,
313 partition: usize,
314 ) -> DFResult<VortexOpener> {
315 let batch_size = self
316 .batch_size
317 .vortex_expect("batch_size must be supplied to VortexSource");
318
319 let expr_adapter_factory = base_config
320 .expr_adapter_factory
321 .clone()
322 .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory));
323
324 let vortex_reader_factory = self
325 .vortex_reader_factory
326 .clone()
327 .unwrap_or_else(|| Arc::new(DefaultVortexReaderFactory::new(object_store)));
328
329 let opener = VortexOpener {
330 partition,
331 session: self.session.clone(),
332 vortex_reader_factory,
333 projection: self.projection.clone(),
334 filter: self.vortex_predicate.clone(),
335 file_pruning_predicate: self.full_predicate.clone(),
336 expr_adapter_factory,
337 table_schema: self.table_schema.clone(),
338 batch_size,
339 limit: base_config.limit.map(|l| l as u64),
340 metrics_registry: Arc::clone(&self.vx_metrics_registry),
341 layout_readers: Arc::clone(&self.layout_readers),
342 natural_split_ranges: Arc::clone(&self.natural_split_ranges),
343 has_output_ordering: !base_config.output_ordering.is_empty() || self.ordered,
344 expression_convertor: Arc::clone(&self.expression_convertor),
345 file_metadata_cache: self.file_metadata_cache.clone(),
346 projection_pushdown: self.options.projection_pushdown,
347 scan_concurrency: self.options.scan_concurrency,
348 };
349
350 Ok(opener)
351 }
352}
353
354impl FileSource for VortexSource {
355 fn create_file_opener(
356 &self,
357 object_store: Arc<dyn ObjectStore>,
358 base_config: &FileScanConfig,
359 partition: usize,
360 ) -> DFResult<Arc<dyn FileOpener>> {
361 Ok(Arc::new(self.create_vortex_opener(
362 object_store,
363 base_config,
364 partition,
365 )?))
366 }
367
368 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
369 let mut source = self.clone();
370 source.batch_size = Some(batch_size);
371 Arc::new(source)
372 }
373
374 fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
375 self.vortex_predicate.clone()
376 }
377
378 fn metrics(&self) -> &ExecutionPlanMetricsSet {
379 &self._unused_df_metrics
380 }
381
382 fn file_type(&self) -> &str {
383 VORTEX_FILE_EXTENSION
384 }
385
386 fn try_pushdown_sort(
387 &self,
388 order: &[PhysicalSortExpr],
389 eq_properties: &EquivalenceProperties,
390 ) -> DFResult<SortOrderPushdownResult<Arc<dyn FileSource>>> {
391 if order.is_empty() {
392 return Ok(SortOrderPushdownResult::Unsupported);
393 }
394
395 if eq_properties.ordering_satisfy(order.iter().cloned())? {
396 let mut this = self.clone();
397 this.ordered = true;
398
399 return Ok(SortOrderPushdownResult::Exact {
400 inner: Arc::new(this) as Arc<dyn FileSource>,
401 });
402 }
403
404 Ok(SortOrderPushdownResult::Unsupported)
405 }
406
407 fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
408 match t {
409 DisplayFormatType::Default | DisplayFormatType::Verbose => {
410 if let Some(predicate) = &self.vortex_predicate {
411 write!(f, ", predicate: {predicate}")?;
412 }
413 }
414 DisplayFormatType::TreeRender => {
416 if let Some(predicate) = &self.vortex_predicate {
417 writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
418 };
419 }
420 }
421 Ok(())
422 }
423
424 fn supports_repartitioning(&self) -> bool {
425 true
426 }
427
428 fn try_pushdown_filters(
429 &self,
430 filters: Vec<Arc<dyn PhysicalExpr>>,
431 _config: &ConfigOptions,
432 ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
433 if filters.is_empty() {
434 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
435 vec![],
436 ));
437 }
438
439 let mut source = self.clone();
440
441 source.full_predicate = match source.full_predicate {
444 Some(predicate) => Some(conjunction(
445 std::iter::once(predicate).chain(filters.clone()),
446 )),
447 None => Some(conjunction(filters.clone())),
448 };
449
450 let supported_filters = filters
451 .into_iter()
452 .map(|expr| {
453 if self
454 .expression_convertor
455 .can_be_pushed_down(&expr, self.table_schema.file_schema())
456 {
457 PushedDownPredicate::supported(expr)
458 } else {
459 PushedDownPredicate::unsupported(expr)
460 }
461 })
462 .collect::<Vec<_>>();
463
464 if supported_filters
465 .iter()
466 .all(|p| matches!(p.discriminant, PushedDown::No))
467 {
468 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
469 vec![PushedDown::No; supported_filters.len()],
470 )
471 .with_updated_node(Arc::new(source) as _));
472 }
473
474 let supported = supported_filters
475 .iter()
476 .filter_map(|p| match p.discriminant {
477 PushedDown::Yes => Some(&p.predicate),
478 PushedDown::No => None,
479 })
480 .cloned();
481
482 let predicate = match source.vortex_predicate {
483 Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
484 None => conjunction(supported),
485 };
486
487 tracing::debug!(%predicate, "Saving predicate");
488
489 source.vortex_predicate = Some(predicate);
490
491 Ok(FilterPushdownPropagation::with_parent_pushdown_result(
492 supported_filters.iter().map(|f| f.discriminant).collect(),
493 )
494 .with_updated_node(Arc::new(source) as _))
495 }
496
497 fn try_pushdown_projection(
498 &self,
499 projection: &ProjectionExprs,
500 ) -> DFResult<Option<Arc<dyn FileSource>>> {
501 let mut source = self.clone();
502 source.projection = self.projection.try_merge(projection)?;
503 Ok(Some(Arc::new(source)))
504 }
505
506 fn projection(&self) -> Option<&ProjectionExprs> {
507 Some(&self.projection)
508 }
509
510 fn table_schema(&self) -> &TableSchema {
511 &self.table_schema
512 }
513}
514
#[cfg(test)]
mod tests {
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_expr::expressions::Column;
    use object_store::memory::InMemory;
    use vortex::VortexSessionDefault;

    use super::*;
    use crate::convert::exprs::ProcessedProjection;

    /// Convertor that forwards every call to the default implementation.
    /// The tests only rely on its `Arc` identity.
    struct TrackingExpressionConvertor {
        inner: DefaultExpressionConvertor,
    }

    impl ExpressionConvertor for TrackingExpressionConvertor {
        fn can_be_pushed_down(&self, expr: &PhysicalExprRef, schema: &Schema) -> bool {
            self.inner.can_be_pushed_down(expr, schema)
        }

        fn convert(&self, expr: &dyn PhysicalExpr) -> DFResult<vortex::expr::Expression> {
            self.inner.convert(expr)
        }

        fn split_projection(
            &self,
            source_projection: ProjectionExprs,
            input_schema: &Schema,
            output_schema: &Schema,
        ) -> DFResult<ProcessedProjection> {
            self.inner
                .split_projection(source_projection, input_schema, output_schema)
        }

        fn no_pushdown_projection(
            &self,
            source_projection: ProjectionExprs,
            input_schema: &Schema,
        ) -> DFResult<ProcessedProjection> {
            self.inner
                .no_pushdown_projection(source_projection, input_schema)
        }
    }

    /// Default-options sort expression over the column `name` at `index`.
    fn sort_column(name: &str, index: usize) -> PhysicalSortExpr {
        let column: PhysicalExprRef = Arc::new(Column::new(name, index));
        PhysicalSortExpr::new_default(column)
    }

    /// Two non-nullable `Int32` columns, `a` and `b`.
    fn sort_test_schema() -> Arc<Schema> {
        let fields = vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Fresh source over `schema` with a default session.
    fn sort_test_source(schema: Arc<Schema>) -> VortexSource {
        let table_schema = TableSchema::from_file_schema(schema);
        VortexSource::new(table_schema, VortexSession::default())
    }

    /// Fails unless `inner` is a `VortexSource` whose `ordered` flag is set.
    fn assert_ordered_source(inner: Arc<dyn FileSource>) -> anyhow::Result<()> {
        let Some(source) = inner.downcast_ref::<VortexSource>() else {
            anyhow::bail!("expected VortexSource");
        };

        assert!(source.ordered);
        Ok(())
    }

    #[test]
    fn try_pushdown_sort_returns_exact_when_ordering_is_satisfied() -> anyhow::Result<()> {
        let schema = sort_test_schema();
        let source = sort_test_source(Arc::clone(&schema));
        let order = vec![sort_column("a", 0), sort_column("b", 1)];
        let eq_properties = EquivalenceProperties::new_with_orderings(schema, [order.clone()]);

        let SortOrderPushdownResult::Exact { inner } =
            source.try_pushdown_sort(&order, &eq_properties)?
        else {
            anyhow::bail!("expected exact sort pushdown");
        };
        assert_ordered_source(inner)?;

        // Pushdown must hand back a modified copy, leaving the receiver alone.
        assert!(!source.ordered);
        Ok(())
    }

    #[test]
    fn create_vortex_opener_preserves_expression_convertor() -> anyhow::Result<()> {
        let file_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let expression_convertor: Arc<dyn ExpressionConvertor> =
            Arc::new(TrackingExpressionConvertor {
                inner: DefaultExpressionConvertor::default(),
            });

        // Opener creation needs a batch size, so the fixture sets one.
        let source = VortexSource {
            batch_size: Some(100),
            ..VortexSource::new(
                TableSchema::from_file_schema(file_schema),
                VortexSession::default(),
            )
            .with_expression_convertor(Arc::clone(&expression_convertor))
        };

        let config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            Arc::new(source.clone()),
        )
        .build();

        let store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
        let opener = source.create_vortex_opener(store, &config, 0)?;

        // The opener must hold the very same convertor instance, not a default.
        assert!(Arc::ptr_eq(
            &opener.expression_convertor,
            &expression_convertor
        ));
        Ok(())
    }
}