1use std::any::Any;
71use std::fmt;
72use std::fmt::Formatter;
73use std::sync::Arc;
74
75use arrow_schema::Field;
76use arrow_schema::Schema;
77use arrow_schema::SchemaRef;
78use datafusion_common::ColumnStatistics;
79use datafusion_common::DataFusionError;
80use datafusion_common::Result as DFResult;
81use datafusion_common::Statistics;
82use datafusion_common::arrow::array::AsArray;
83use datafusion_common::arrow::array::RecordBatch;
84use datafusion_common::stats::Precision as DFPrecision;
85use datafusion_datasource::source::DataSource;
86use datafusion_execution::SendableRecordBatchStream;
87use datafusion_execution::TaskContext;
88use datafusion_physical_expr::EquivalenceProperties;
89use datafusion_physical_expr::Partitioning;
90use datafusion_physical_expr::PhysicalExpr;
91use datafusion_physical_expr::projection::ProjectionExprs;
92use datafusion_physical_expr::utils::reassign_expr_columns;
93use datafusion_physical_expr_common::sort_expr::LexOrdering;
94use datafusion_physical_plan::DisplayFormatType;
95use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
96use datafusion_physical_plan::filter_pushdown::PushedDown;
97use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
98use futures::StreamExt;
99use futures::TryStreamExt;
100use futures::future::try_join_all;
101use vortex::array::VortexSessionExecute;
102use vortex::array::arrow::ArrowSessionExt;
103use vortex::dtype::DType;
104use vortex::dtype::FieldPath;
105use vortex::dtype::Nullability;
106use vortex::error::VortexResult;
107use vortex::error::vortex_bail;
108use vortex::expr::Expression;
109use vortex::expr::and as vx_and;
110use vortex::expr::get_item;
111use vortex::expr::pack;
112use vortex::expr::root;
113use vortex::expr::stats::Precision;
114use vortex::expr::transform::replace;
115use vortex::io::session::RuntimeSessionExt;
116use vortex::scan::DataSourceRef;
117use vortex::scan::ScanRequest;
118use vortex::session::VortexSession;
119use vortex_utils::parallelism::get_available_parallelism;
120
121use crate::convert::exprs::DefaultExpressionConvertor;
122use crate::convert::exprs::ExpressionConvertor;
123use crate::convert::exprs::ProcessedProjection;
124use crate::convert::exprs::make_vortex_predicate;
125use crate::convert::stats::stats_set_to_df;
126
127pub struct VortexDataSourceBuilder {
167 data_source: DataSourceRef,
168 session: VortexSession,
169
170 arrow_schema: Option<SchemaRef>,
171 projection: Option<Vec<usize>>,
172}
173
174impl VortexDataSourceBuilder {
175 pub fn with_arrow_schema(mut self, arrow_schema: SchemaRef) -> Self {
183 self.arrow_schema = Some(arrow_schema);
184 self
185 }
186
187 pub fn with_projection(mut self, indices: Vec<usize>) -> Self {
192 self.projection = Some(indices);
193 self
194 }
195
196 pub fn with_some_projection(mut self, indices: Option<Vec<usize>>) -> Self {
198 self.projection = indices;
199 self
200 }
201
202 pub async fn build(self) -> VortexResult<VortexDataSource> {
208 let mut projection = root();
210
211 let mut arrow_schema = match self.arrow_schema {
213 Some(schema) => schema,
214 None => Arc::new(
215 self.session
216 .arrow()
217 .to_arrow_schema(self.data_source.dtype())?,
218 ),
219 };
220
221 if let Some(indices) = self.projection {
223 let fields = indices.iter().map(|&i| {
224 let name = arrow_schema.field(i).name().clone();
225 let expr = get_item(name.as_str(), root());
226 (name, expr)
227 });
228
229 projection = pack(fields, Nullability::NonNullable);
231
232 arrow_schema = Arc::new(Schema::new(
234 indices
235 .iter()
236 .map(|&i| arrow_schema.field(i).clone())
237 .collect::<Vec<_>>(),
238 ));
239 }
240
241 let DType::Struct(fields, ..) = projection.return_dtype(self.data_source.dtype())? else {
242 vortex_bail!("Projection does not evaluate to a struct");
243 };
244
245 let field_paths: Vec<_> = fields
247 .names()
248 .iter()
249 .cloned()
250 .map(FieldPath::from_name)
251 .collect();
252 let statistics = try_join_all(
253 field_paths
254 .iter()
255 .map(|path| self.data_source.field_statistics(path)),
256 )
257 .await?
258 .iter()
259 .zip(fields.fields())
260 .map(|(stats, dtype)| stats_set_to_df(stats, &dtype))
261 .collect::<VortexResult<Vec<_>>>()?;
262
263 Ok(VortexDataSource {
264 data_source: self.data_source,
265 session: self.session,
266 initial_schema: Arc::clone(&arrow_schema),
267 initial_projection: projection.clone(),
268 initial_statistics: statistics.clone(),
269 projected_projection: projection.clone(),
270 projected_schema: Arc::clone(&arrow_schema),
271 projected_statistics: statistics.clone(),
272 leftover_projection: None,
273 leftover_schema: arrow_schema,
274 leftover_statistics: statistics,
275 filter: None,
276 limit: None,
277 ordered: false,
278 num_partitions: get_available_parallelism().unwrap_or(1),
279 })
280 }
281}
282
283impl VortexDataSource {
284 pub fn builder(data_source: DataSourceRef, session: VortexSession) -> VortexDataSourceBuilder {
286 VortexDataSourceBuilder {
287 data_source,
288 session,
289 arrow_schema: None,
290 projection: None,
291 }
292 }
293}
294
295#[derive(Clone)]
312pub struct VortexDataSource {
313 data_source: DataSourceRef,
315 session: VortexSession,
317
318 initial_schema: SchemaRef,
321 initial_projection: Expression,
323 #[expect(dead_code)]
325 initial_statistics: Vec<ColumnStatistics>,
326
327 projected_projection: Expression,
331 projected_schema: SchemaRef,
333 projected_statistics: Vec<ColumnStatistics>,
335
336 leftover_projection: Option<ProjectionExprs>,
341 leftover_schema: SchemaRef,
344 leftover_statistics: Vec<ColumnStatistics>,
346
347 filter: Option<Expression>,
350 limit: Option<usize>,
352 ordered: bool,
354
355 num_partitions: usize,
360}
361
362impl fmt::Debug for VortexDataSource {
363 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
364 f.debug_struct("VortexScanSource")
365 .field("schema", &self.leftover_schema)
366 .field("projection", &format!("{}", &self.projected_projection))
367 .field("filter", &self.filter.as_ref().map(|e| format!("{}", e)))
368 .field("limit", &self.limit)
369 .finish()
370 }
371}
372
373impl DataSource for VortexDataSource {
374 fn open(
375 &self,
376 partition: usize,
377 _context: Arc<TaskContext>,
378 ) -> DFResult<SendableRecordBatchStream> {
379 if partition != 0 {
382 return Err(DataFusionError::Internal(format!(
383 "VortexScanSource: expected partition 0, got {partition}"
384 )));
385 }
386
387 let scan_request = ScanRequest {
390 projection: self.projected_projection.clone(),
391 filter: self.filter.clone(),
392 limit: self.limit.map(|l| u64::try_from(l).unwrap_or(u64::MAX)),
393 ordered: self.ordered,
394 ..Default::default()
395 };
396
397 let data_source = Arc::clone(&self.data_source);
398 let projected_schema = Arc::clone(&self.projected_schema);
399 let projected_target_field = Arc::new(Field::new_struct(
400 "",
401 projected_schema.fields().clone(),
402 false,
403 ));
404 let session = self.session.clone();
405 let num_partitions = self.num_partitions;
406
407 let leftover_projector = self
409 .leftover_projection
410 .as_ref()
411 .map(|proj| proj.make_projector(&self.projected_schema))
412 .transpose()?;
413
414 let stream = futures::stream::once(async move {
416 let scan = data_source
417 .scan(scan_request)
418 .await
419 .map_err(|e| DataFusionError::External(Box::new(e)))?;
420
421 let scan_streams = scan.partitions().map(|split_result| {
426 let split = split_result?;
427 split.execute()
428 });
429
430 let handle = session.handle();
431 let stream = scan_streams
432 .try_flatten_unordered(Some(num_partitions * 2))
433 .map(move |result| {
434 let session = session.clone();
435 let target_field = Arc::clone(&projected_target_field);
436 handle.spawn_cpu(move || {
437 let mut ctx = session.create_execution_ctx();
438 result.and_then(|chunk| {
439 let arrow = session.arrow().execute_arrow(
440 chunk,
441 Some(target_field.as_ref()),
442 &mut ctx,
443 )?;
444 Ok(RecordBatch::from(arrow.as_struct().clone()))
445 })
446 })
447 })
448 .buffered(num_partitions)
449 .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));
450
451 let stream = if let Some(projector) = leftover_projector {
453 stream
454 .map(move |batch_result| {
455 batch_result.and_then(|batch| projector.project_batch(&batch))
456 })
457 .boxed()
458 } else {
459 stream.boxed()
460 };
461
462 Ok::<_, DataFusionError>(stream)
463 })
464 .try_flatten();
465
466 Ok(Box::pin(RecordBatchStreamAdapter::new(
467 Arc::clone(&self.leftover_schema),
468 stream,
469 )))
470 }
471
472 fn as_any(&self) -> &dyn Any {
473 self
474 }
475
476 fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
477 write!(
478 f,
479 "VortexScanSource: projection={}",
480 self.projected_projection
481 )?;
482 if let Some(filter) = &self.filter {
483 write!(f, ", filter={filter}")?;
484 }
485 if let Some(limit) = self.limit {
486 write!(f, ", limit={limit}")?;
487 }
488 Ok(())
489 }
490
491 fn repartitioned(
492 &self,
493 target_partitions: usize,
494 _repartition_file_min_size: usize,
495 output_ordering: Option<LexOrdering>,
496 ) -> DFResult<Option<Arc<dyn DataSource>>> {
497 let mut this = self.clone();
499 this.num_partitions = target_partitions;
500 this.ordered |= output_ordering.is_some();
501 Ok(Some(Arc::new(this)))
502 }
503
504 fn output_partitioning(&self) -> Partitioning {
505 Partitioning::UnknownPartitioning(1)
506 }
507
508 fn eq_properties(&self) -> EquivalenceProperties {
509 EquivalenceProperties::new(Arc::clone(&self.leftover_schema))
510 }
511
512 fn partition_statistics(&self, _partition: Option<usize>) -> DFResult<Statistics> {
513 let num_rows = estimate_to_df_precision(self.data_source.row_count().as_ref());
516
517 let total_byte_size = estimate_to_df_precision(self.data_source.byte_size().as_ref());
519
520 let column_statistics = self.leftover_statistics.clone();
523
524 Ok(Statistics {
525 num_rows,
526 total_byte_size,
527 column_statistics,
528 })
529 }
530
531 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
532 let mut this = self.clone();
533 this.limit = limit;
534 Some(Arc::new(this))
535 }
536
537 fn fetch(&self) -> Option<usize> {
538 self.limit
539 }
540
541 fn try_swapping_with_projection(
544 &self,
545 projection: &ProjectionExprs,
546 ) -> DFResult<Option<Arc<dyn DataSource>>> {
547 tracing::debug!(
548 "VortexScanSource: trying to swap with projection: {}",
549 projection
550 );
551
552 let convertor = DefaultExpressionConvertor::default();
553 let input_schema = self.initial_schema.as_ref();
554 let projected_schema = projection.project_schema(input_schema)?;
555
556 let ProcessedProjection {
560 scan_projection,
561 leftover_projection,
562 } = convertor.split_projection(projection.clone(), input_schema, &projected_schema)?;
563
564 let scan_projection = replace(scan_projection, &root(), self.initial_projection.clone());
567
568 let scan_dtype = scan_projection
570 .return_dtype(self.data_source.dtype())
571 .map_err(|e| DataFusionError::External(Box::new(e)))?;
572 let scan_output_schema = Arc::new(
573 self.session
574 .arrow()
575 .to_arrow_schema(&scan_dtype)
576 .map_err(|e| DataFusionError::External(Box::new(e)))?,
577 );
578
579 let leftover_projection = leftover_projection
581 .try_map_exprs(|expr| reassign_expr_columns(expr, &scan_output_schema))?;
582
583 let final_schema = Arc::new(projected_schema);
584
585 let mut this = self.clone();
586 this.projected_projection = scan_projection;
587 this.projected_schema = Arc::clone(&scan_output_schema);
588 this.projected_statistics =
589 vec![ColumnStatistics::new_unknown(); scan_output_schema.fields().len()];
590 this.leftover_projection = Some(leftover_projection);
591 this.leftover_schema = Arc::clone(&final_schema);
592 this.leftover_statistics =
593 vec![ColumnStatistics::new_unknown(); final_schema.fields().len()];
594
595 Ok(Some(Arc::new(this)))
596 }
597
598 fn try_pushdown_filters(
599 &self,
600 filters: Vec<Arc<dyn PhysicalExpr>>,
601 _config: &datafusion_common::config::ConfigOptions,
602 ) -> DFResult<FilterPushdownPropagation<Arc<dyn DataSource>>> {
603 if filters.is_empty() {
604 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
605 vec![],
606 ));
607 }
608
609 let convertor = DefaultExpressionConvertor::default();
610 let input_schema = self.initial_schema.as_ref();
611
612 let pushdown_results: Vec<PushedDown> = filters
615 .iter()
616 .map(|expr| {
617 if convertor.can_be_pushed_down(expr, input_schema) {
618 PushedDown::Yes
619 } else {
620 PushedDown::No
621 }
622 })
623 .collect();
624
625 if pushdown_results.iter().all(|p| matches!(p, PushedDown::No)) {
627 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
628 pushdown_results,
629 ));
630 }
631
632 let pushable: Vec<Arc<dyn PhysicalExpr>> = filters
634 .iter()
635 .zip(pushdown_results.iter())
636 .filter_map(|(expr, pushed)| match pushed {
637 PushedDown::Yes => Some(Arc::clone(expr)),
638 PushedDown::No => None,
639 })
640 .collect();
641
642 let vortex_pred = make_vortex_predicate(&convertor, &pushable)?;
644
645 let new_filter = match (&self.filter, vortex_pred) {
647 (Some(existing), Some(new_pred)) => Some(vx_and(existing.clone(), new_pred)),
648 (Some(existing), None) => Some(existing.clone()),
649 (None, Some(new_pred)) => Some(new_pred),
650 (None, None) => None,
651 };
652
653 let mut this = self.clone();
654 this.filter = new_filter;
655 Ok(
656 FilterPushdownPropagation::with_parent_pushdown_result(pushdown_results)
657 .with_updated_node(Arc::new(this) as _),
658 )
659 }
660}
661
662fn estimate_to_df_precision(est: Option<&Precision<u64>>) -> DFPrecision<usize> {
667 match est {
668 Some(Precision::Exact(v)) => DFPrecision::Exact(usize::try_from(*v).unwrap_or(usize::MAX)),
669 Some(Precision::Inexact(v)) => {
670 DFPrecision::Inexact(usize::try_from(*v).unwrap_or(usize::MAX))
671 }
672 None => DFPrecision::Absent,
673 }
674}