1use std::fmt;
71use std::fmt::Formatter;
72use std::sync::Arc;
73
74use arrow_schema::Field;
75use arrow_schema::Schema;
76use arrow_schema::SchemaRef;
77use datafusion_common::ColumnStatistics;
78use datafusion_common::DataFusionError;
79use datafusion_common::Result as DFResult;
80use datafusion_common::Statistics;
81use datafusion_common::arrow::array::AsArray;
82use datafusion_common::arrow::array::RecordBatch;
83use datafusion_common::stats::Precision as DFPrecision;
84use datafusion_datasource::source::DataSource;
85use datafusion_execution::SendableRecordBatchStream;
86use datafusion_execution::TaskContext;
87use datafusion_physical_expr::EquivalenceProperties;
88use datafusion_physical_expr::Partitioning;
89use datafusion_physical_expr::PhysicalExpr;
90use datafusion_physical_expr::projection::ProjectionExprs;
91use datafusion_physical_expr::utils::reassign_expr_columns;
92use datafusion_physical_expr_common::sort_expr::LexOrdering;
93use datafusion_physical_plan::DisplayFormatType;
94use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
95use datafusion_physical_plan::filter_pushdown::PushedDown;
96use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
97use futures::StreamExt;
98use futures::TryStreamExt;
99use futures::future::try_join_all;
100use vortex::array::VortexSessionExecute;
101use vortex::array::arrow::ArrowSessionExt;
102use vortex::dtype::DType;
103use vortex::dtype::FieldPath;
104use vortex::dtype::Nullability;
105use vortex::error::VortexResult;
106use vortex::error::vortex_bail;
107use vortex::expr::Expression;
108use vortex::expr::and as vx_and;
109use vortex::expr::get_item;
110use vortex::expr::pack;
111use vortex::expr::root;
112use vortex::expr::stats::Precision;
113use vortex::expr::transform::replace;
114use vortex::io::session::RuntimeSessionExt;
115use vortex::scan::DataSourceRef;
116use vortex::scan::ScanRequest;
117use vortex::session::VortexSession;
118use vortex_utils::parallelism::get_available_parallelism;
119
120use crate::convert::exprs::DefaultExpressionConvertor;
121use crate::convert::exprs::ExpressionConvertor;
122use crate::convert::exprs::ProcessedProjection;
123use crate::convert::exprs::make_vortex_predicate;
124use crate::convert::stats::stats_set_to_df;
125
126pub struct VortexDataSourceBuilder {
166 data_source: DataSourceRef,
167 session: VortexSession,
168
169 arrow_schema: Option<SchemaRef>,
170 projection: Option<Vec<usize>>,
171}
172
173impl VortexDataSourceBuilder {
174 pub fn with_arrow_schema(mut self, arrow_schema: SchemaRef) -> Self {
182 self.arrow_schema = Some(arrow_schema);
183 self
184 }
185
186 pub fn with_projection(mut self, indices: Vec<usize>) -> Self {
191 self.projection = Some(indices);
192 self
193 }
194
195 pub fn with_some_projection(mut self, indices: Option<Vec<usize>>) -> Self {
197 self.projection = indices;
198 self
199 }
200
201 pub async fn build(self) -> VortexResult<VortexDataSource> {
207 let mut projection = root();
209
210 let mut arrow_schema = match self.arrow_schema {
212 Some(schema) => schema,
213 None => Arc::new(
214 self.session
215 .arrow()
216 .to_arrow_schema(self.data_source.dtype())?,
217 ),
218 };
219
220 if let Some(indices) = self.projection {
222 let fields = indices.iter().map(|&i| {
223 let name = arrow_schema.field(i).name().clone();
224 let expr = get_item(name.as_str(), root());
225 (name, expr)
226 });
227
228 projection = pack(fields, Nullability::NonNullable);
230
231 arrow_schema = Arc::new(Schema::new(
233 indices
234 .iter()
235 .map(|&i| arrow_schema.field(i).clone())
236 .collect::<Vec<_>>(),
237 ));
238 }
239
240 let DType::Struct(fields, ..) = projection.return_dtype(self.data_source.dtype())? else {
241 vortex_bail!("Projection does not evaluate to a struct");
242 };
243
244 let field_paths: Vec<_> = fields
246 .names()
247 .iter()
248 .cloned()
249 .map(FieldPath::from_name)
250 .collect();
251 let statistics = try_join_all(
252 field_paths
253 .iter()
254 .map(|path| self.data_source.field_statistics(path)),
255 )
256 .await?
257 .iter()
258 .zip(fields.fields())
259 .map(|(stats, dtype)| stats_set_to_df(stats, &dtype))
260 .collect::<VortexResult<Vec<_>>>()?;
261
262 Ok(VortexDataSource {
263 data_source: self.data_source,
264 session: self.session,
265 initial_schema: Arc::clone(&arrow_schema),
266 initial_projection: projection.clone(),
267 initial_statistics: statistics.clone(),
268 projected_projection: projection.clone(),
269 projected_schema: Arc::clone(&arrow_schema),
270 projected_statistics: statistics.clone(),
271 leftover_projection: None,
272 leftover_schema: arrow_schema,
273 leftover_statistics: statistics,
274 filter: None,
275 limit: None,
276 ordered: false,
277 num_partitions: get_available_parallelism().unwrap_or(1),
278 })
279 }
280}
281
282impl VortexDataSource {
283 pub fn builder(data_source: DataSourceRef, session: VortexSession) -> VortexDataSourceBuilder {
285 VortexDataSourceBuilder {
286 data_source,
287 session,
288 arrow_schema: None,
289 projection: None,
290 }
291 }
292}
293
294#[derive(Clone)]
311pub struct VortexDataSource {
312 data_source: DataSourceRef,
314 session: VortexSession,
316
317 initial_schema: SchemaRef,
320 initial_projection: Expression,
322 #[expect(dead_code)]
324 initial_statistics: Vec<ColumnStatistics>,
325
326 projected_projection: Expression,
330 projected_schema: SchemaRef,
332 projected_statistics: Vec<ColumnStatistics>,
334
335 leftover_projection: Option<ProjectionExprs>,
340 leftover_schema: SchemaRef,
343 leftover_statistics: Vec<ColumnStatistics>,
345
346 filter: Option<Expression>,
349 limit: Option<usize>,
351 ordered: bool,
353
354 num_partitions: usize,
359}
360
361impl fmt::Debug for VortexDataSource {
362 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
363 f.debug_struct("VortexScanSource")
364 .field("schema", &self.leftover_schema)
365 .field("projection", &format!("{}", &self.projected_projection))
366 .field("filter", &self.filter.as_ref().map(|e| format!("{}", e)))
367 .field("limit", &self.limit)
368 .finish()
369 }
370}
371
372impl DataSource for VortexDataSource {
373 fn open(
374 &self,
375 partition: usize,
376 _context: Arc<TaskContext>,
377 ) -> DFResult<SendableRecordBatchStream> {
378 if partition != 0 {
381 return Err(DataFusionError::Internal(format!(
382 "VortexScanSource: expected partition 0, got {partition}"
383 )));
384 }
385
386 let scan_request = ScanRequest {
389 projection: self.projected_projection.clone(),
390 filter: self.filter.clone(),
391 limit: self.limit.map(|l| u64::try_from(l).unwrap_or(u64::MAX)),
392 ordered: self.ordered,
393 ..Default::default()
394 };
395
396 let data_source = Arc::clone(&self.data_source);
397 let projected_schema = Arc::clone(&self.projected_schema);
398 let projected_target_field = Arc::new(Field::new_struct(
399 "",
400 projected_schema.fields().clone(),
401 false,
402 ));
403 let session = self.session.clone();
404 let num_partitions = self.num_partitions;
405
406 let leftover_projector = self
408 .leftover_projection
409 .as_ref()
410 .map(|proj| proj.make_projector(&self.projected_schema))
411 .transpose()?;
412
413 let stream = futures::stream::once(async move {
415 let scan = data_source
416 .scan(scan_request)
417 .await
418 .map_err(|e| DataFusionError::External(Box::new(e)))?;
419
420 let scan_streams = scan.partitions().map(|split_result| {
425 let split = split_result?;
426 split.execute()
427 });
428
429 let handle = session.handle();
430 let stream = scan_streams
431 .try_flatten_unordered(Some(num_partitions * 2))
432 .map(move |result| {
433 let session = session.clone();
434 let target_field = Arc::clone(&projected_target_field);
435 handle.spawn_cpu(move || {
436 let mut ctx = session.create_execution_ctx();
437 result.and_then(|chunk| {
438 let arrow = session.arrow().execute_arrow(
439 chunk,
440 Some(target_field.as_ref()),
441 &mut ctx,
442 )?;
443 Ok(RecordBatch::from(arrow.as_struct().clone()))
444 })
445 })
446 })
447 .buffered(num_partitions)
448 .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));
449
450 let stream = if let Some(projector) = leftover_projector {
452 stream
453 .map(move |batch_result| {
454 batch_result.and_then(|batch| projector.project_batch(&batch))
455 })
456 .boxed()
457 } else {
458 stream.boxed()
459 };
460
461 Ok::<_, DataFusionError>(stream)
462 })
463 .try_flatten();
464
465 Ok(Box::pin(RecordBatchStreamAdapter::new(
466 Arc::clone(&self.leftover_schema),
467 stream,
468 )))
469 }
470
471 fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
472 write!(
473 f,
474 "VortexScanSource: projection={}",
475 self.projected_projection
476 )?;
477 if let Some(filter) = &self.filter {
478 write!(f, ", filter={filter}")?;
479 }
480 if let Some(limit) = self.limit {
481 write!(f, ", limit={limit}")?;
482 }
483 Ok(())
484 }
485
486 fn repartitioned(
487 &self,
488 target_partitions: usize,
489 _repartition_file_min_size: usize,
490 output_ordering: Option<LexOrdering>,
491 ) -> DFResult<Option<Arc<dyn DataSource>>> {
492 let mut this = self.clone();
494 this.num_partitions = target_partitions;
495 this.ordered |= output_ordering.is_some();
496 Ok(Some(Arc::new(this)))
497 }
498
499 fn output_partitioning(&self) -> Partitioning {
500 Partitioning::UnknownPartitioning(1)
501 }
502
503 fn eq_properties(&self) -> EquivalenceProperties {
504 EquivalenceProperties::new(Arc::clone(&self.leftover_schema))
505 }
506
507 fn partition_statistics(&self, _partition: Option<usize>) -> DFResult<Arc<Statistics>> {
508 let num_rows = estimate_to_df_precision(&self.data_source.row_count());
511
512 let total_byte_size = estimate_to_df_precision(&self.data_source.byte_size());
514
515 let column_statistics = self.leftover_statistics.clone();
518
519 Ok(Arc::new(Statistics {
520 num_rows,
521 total_byte_size,
522 column_statistics,
523 }))
524 }
525
526 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
527 let mut this = self.clone();
528 this.limit = limit;
529 Some(Arc::new(this))
530 }
531
532 fn fetch(&self) -> Option<usize> {
533 self.limit
534 }
535
536 fn try_swapping_with_projection(
539 &self,
540 projection: &ProjectionExprs,
541 ) -> DFResult<Option<Arc<dyn DataSource>>> {
542 tracing::debug!(
543 "VortexScanSource: trying to swap with projection: {}",
544 projection
545 );
546
547 let convertor = DefaultExpressionConvertor::default();
548 let input_schema = self.initial_schema.as_ref();
549 let projected_schema = projection.project_schema(input_schema)?;
550
551 let ProcessedProjection {
555 scan_projection,
556 leftover_projection,
557 } = convertor.split_projection(projection.clone(), input_schema, &projected_schema)?;
558
559 let scan_projection = replace(scan_projection, &root(), self.initial_projection.clone());
562
563 let scan_dtype = scan_projection
565 .return_dtype(self.data_source.dtype())
566 .map_err(|e| DataFusionError::External(Box::new(e)))?;
567 let scan_output_schema = Arc::new(
568 self.session
569 .arrow()
570 .to_arrow_schema(&scan_dtype)
571 .map_err(|e| DataFusionError::External(Box::new(e)))?,
572 );
573
574 let leftover_projection = leftover_projection
576 .try_map_exprs(|expr| reassign_expr_columns(expr, &scan_output_schema))?;
577
578 let final_schema = Arc::new(projected_schema);
579
580 let mut this = self.clone();
581 this.projected_projection = scan_projection;
582 this.projected_schema = Arc::clone(&scan_output_schema);
583 this.projected_statistics =
584 vec![ColumnStatistics::new_unknown(); scan_output_schema.fields().len()];
585 this.leftover_projection = Some(leftover_projection);
586 this.leftover_schema = Arc::clone(&final_schema);
587 this.leftover_statistics =
588 vec![ColumnStatistics::new_unknown(); final_schema.fields().len()];
589
590 Ok(Some(Arc::new(this)))
591 }
592
593 fn try_pushdown_filters(
594 &self,
595 filters: Vec<Arc<dyn PhysicalExpr>>,
596 _config: &datafusion_common::config::ConfigOptions,
597 ) -> DFResult<FilterPushdownPropagation<Arc<dyn DataSource>>> {
598 if filters.is_empty() {
599 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
600 vec![],
601 ));
602 }
603
604 let convertor = DefaultExpressionConvertor::default();
605 let input_schema = self.initial_schema.as_ref();
606
607 let pushdown_results: Vec<PushedDown> = filters
610 .iter()
611 .map(|expr| {
612 if convertor.can_be_pushed_down(expr, input_schema) {
613 PushedDown::Yes
614 } else {
615 PushedDown::No
616 }
617 })
618 .collect();
619
620 if pushdown_results.iter().all(|p| matches!(p, PushedDown::No)) {
622 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
623 pushdown_results,
624 ));
625 }
626
627 let pushable: Vec<Arc<dyn PhysicalExpr>> = filters
629 .iter()
630 .zip(pushdown_results.iter())
631 .filter_map(|(expr, pushed)| match pushed {
632 PushedDown::Yes => Some(Arc::clone(expr)),
633 PushedDown::No => None,
634 })
635 .collect();
636
637 let vortex_pred = make_vortex_predicate(&convertor, &pushable)?;
639
640 let new_filter = match (&self.filter, vortex_pred) {
642 (Some(existing), Some(new_pred)) => Some(vx_and(existing.clone(), new_pred)),
643 (Some(existing), None) => Some(existing.clone()),
644 (None, Some(new_pred)) => Some(new_pred),
645 (None, None) => None,
646 };
647
648 let mut this = self.clone();
649 this.filter = new_filter;
650 Ok(
651 FilterPushdownPropagation::with_parent_pushdown_result(pushdown_results)
652 .with_updated_node(Arc::new(this) as _),
653 )
654 }
655}
656
657fn estimate_to_df_precision(est: &Precision<u64>) -> DFPrecision<usize> {
662 match est {
663 Precision::Exact(v) => DFPrecision::Exact(usize::try_from(*v).unwrap_or(usize::MAX)),
664 Precision::Inexact(v) => DFPrecision::Inexact(usize::try_from(*v).unwrap_or(usize::MAX)),
665 Precision::Absent => DFPrecision::Absent,
666 }
667}