1use std::any::Any;
71use std::fmt;
72use std::fmt::Formatter;
73use std::sync::Arc;
74
75use arrow_schema::DataType;
76use arrow_schema::Schema;
77use arrow_schema::SchemaRef;
78use datafusion_common::ColumnStatistics;
79use datafusion_common::DataFusionError;
80use datafusion_common::Result as DFResult;
81use datafusion_common::Statistics;
82use datafusion_common::stats::Precision as DFPrecision;
83use datafusion_datasource::source::DataSource;
84use datafusion_execution::SendableRecordBatchStream;
85use datafusion_execution::TaskContext;
86use datafusion_physical_expr::EquivalenceProperties;
87use datafusion_physical_expr::Partitioning;
88use datafusion_physical_expr::PhysicalExpr;
89use datafusion_physical_expr::projection::ProjectionExprs;
90use datafusion_physical_expr::utils::reassign_expr_columns;
91use datafusion_physical_expr_common::sort_expr::LexOrdering;
92use datafusion_physical_plan::DisplayFormatType;
93use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
94use datafusion_physical_plan::filter_pushdown::PushedDown;
95use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
96use futures::StreamExt;
97use futures::TryStreamExt;
98use futures::future::try_join_all;
99use vortex::array::VortexSessionExecute;
100use vortex::array::arrow::ArrowArrayExecutor;
101use vortex::dtype::DType;
102use vortex::dtype::FieldPath;
103use vortex::dtype::Nullability;
104use vortex::error::VortexResult;
105use vortex::error::vortex_bail;
106use vortex::expr::Expression;
107use vortex::expr::and as vx_and;
108use vortex::expr::get_item;
109use vortex::expr::pack;
110use vortex::expr::root;
111use vortex::expr::stats::Precision;
112use vortex::expr::transform::replace;
113use vortex::io::session::RuntimeSessionExt;
114use vortex::scan::DataSourceRef;
115use vortex::scan::ScanRequest;
116use vortex::session::VortexSession;
117use vortex_utils::parallelism::get_available_parallelism;
118
119use crate::convert::exprs::DefaultExpressionConvertor;
120use crate::convert::exprs::ExpressionConvertor;
121use crate::convert::exprs::ProcessedProjection;
122use crate::convert::exprs::make_vortex_predicate;
123use crate::convert::stats::stats_set_to_df;
124
125pub struct VortexDataSourceBuilder {
165 data_source: DataSourceRef,
166 session: VortexSession,
167
168 arrow_schema: Option<SchemaRef>,
169 projection: Option<Vec<usize>>,
170}
171
172impl VortexDataSourceBuilder {
173 pub fn with_arrow_schema(mut self, arrow_schema: SchemaRef) -> Self {
181 self.arrow_schema = Some(arrow_schema);
182 self
183 }
184
185 pub fn with_projection(mut self, indices: Vec<usize>) -> Self {
190 self.projection = Some(indices);
191 self
192 }
193
194 pub fn with_some_projection(mut self, indices: Option<Vec<usize>>) -> Self {
196 self.projection = indices;
197 self
198 }
199
200 pub async fn build(self) -> VortexResult<VortexDataSource> {
206 let mut projection = root();
208
209 let mut arrow_schema = match self.arrow_schema {
211 Some(schema) => schema,
212 None => {
213 let data_type = self.data_source.dtype().to_arrow_dtype()?;
214 let DataType::Struct(fields) = data_type else {
215 vortex_bail!("Expected a struct-like DataType, found {}", data_type);
216 };
217 Arc::new(Schema::new(fields))
218 }
219 };
220
221 if let Some(indices) = self.projection {
223 let fields = indices.iter().map(|&i| {
224 let name = arrow_schema.field(i).name().clone();
225 let expr = get_item(name.as_str(), root());
226 (name, expr)
227 });
228
229 projection = pack(fields, Nullability::NonNullable);
231
232 arrow_schema = Arc::new(Schema::new(
234 indices
235 .iter()
236 .map(|&i| arrow_schema.field(i).clone())
237 .collect::<Vec<_>>(),
238 ));
239 }
240
241 let DType::Struct(fields, ..) = projection.return_dtype(self.data_source.dtype())? else {
242 vortex_bail!("Projection does not evaluate to a struct");
243 };
244
245 let field_paths: Vec<_> = fields
247 .names()
248 .iter()
249 .cloned()
250 .map(FieldPath::from_name)
251 .collect();
252 let statistics = try_join_all(
253 field_paths
254 .iter()
255 .map(|path| self.data_source.field_statistics(path)),
256 )
257 .await?
258 .iter()
259 .zip(fields.fields())
260 .map(|(stats, dtype)| stats_set_to_df(stats, &dtype))
261 .collect::<VortexResult<Vec<_>>>()?;
262
263 Ok(VortexDataSource {
264 data_source: self.data_source,
265 session: self.session,
266 initial_schema: Arc::clone(&arrow_schema),
267 initial_projection: projection.clone(),
268 initial_statistics: statistics.clone(),
269 projected_projection: projection.clone(),
270 projected_schema: Arc::clone(&arrow_schema),
271 projected_statistics: statistics.clone(),
272 leftover_projection: None,
273 leftover_schema: arrow_schema,
274 leftover_statistics: statistics,
275 filter: None,
276 limit: None,
277 ordered: false,
278 num_partitions: get_available_parallelism().unwrap_or(1),
279 })
280 }
281}
282
283impl VortexDataSource {
284 pub fn builder(data_source: DataSourceRef, session: VortexSession) -> VortexDataSourceBuilder {
286 VortexDataSourceBuilder {
287 data_source,
288 session,
289 arrow_schema: None,
290 projection: None,
291 }
292 }
293}
294
295#[derive(Clone)]
312pub struct VortexDataSource {
313 data_source: DataSourceRef,
315 session: VortexSession,
317
318 initial_schema: SchemaRef,
321 initial_projection: Expression,
323 #[expect(dead_code)]
325 initial_statistics: Vec<ColumnStatistics>,
326
327 projected_projection: Expression,
331 projected_schema: SchemaRef,
333 projected_statistics: Vec<ColumnStatistics>,
335
336 leftover_projection: Option<ProjectionExprs>,
341 leftover_schema: SchemaRef,
344 leftover_statistics: Vec<ColumnStatistics>,
346
347 filter: Option<Expression>,
350 limit: Option<usize>,
352 ordered: bool,
354
355 num_partitions: usize,
360}
361
362impl fmt::Debug for VortexDataSource {
363 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
364 f.debug_struct("VortexScanSource")
365 .field("schema", &self.leftover_schema)
366 .field("projection", &format!("{}", &self.projected_projection))
367 .field("filter", &self.filter.as_ref().map(|e| format!("{}", e)))
368 .field("limit", &self.limit)
369 .finish()
370 }
371}
372
373impl DataSource for VortexDataSource {
374 fn open(
375 &self,
376 partition: usize,
377 _context: Arc<TaskContext>,
378 ) -> DFResult<SendableRecordBatchStream> {
379 if partition != 0 {
382 return Err(DataFusionError::Internal(format!(
383 "VortexScanSource: expected partition 0, got {partition}"
384 )));
385 }
386
387 let scan_request = ScanRequest {
390 projection: self.projected_projection.clone(),
391 filter: self.filter.clone(),
392 limit: self.limit.map(|l| u64::try_from(l).unwrap_or(u64::MAX)),
393 ordered: self.ordered,
394 ..Default::default()
395 };
396
397 let data_source = Arc::clone(&self.data_source);
398 let projected_schema = Arc::clone(&self.projected_schema);
399 let session = self.session.clone();
400 let num_partitions = self.num_partitions;
401
402 let leftover_projector = self
404 .leftover_projection
405 .as_ref()
406 .map(|proj| proj.make_projector(&self.projected_schema))
407 .transpose()?;
408
409 let stream = futures::stream::once(async move {
411 let scan = data_source
412 .scan(scan_request)
413 .await
414 .map_err(|e| DataFusionError::External(Box::new(e)))?;
415
416 let scan_streams = scan.partitions().map(|split_result| {
421 let split = split_result?;
422 split.execute()
423 });
424
425 let handle = session.handle();
426 let stream = scan_streams
427 .try_flatten_unordered(Some(num_partitions * 2))
428 .map(move |result| {
429 let session = session.clone();
430 let schema = Arc::clone(&projected_schema);
431 handle.spawn_cpu(move || {
432 let mut ctx = session.create_execution_ctx();
433 result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx))
434 })
435 })
436 .buffered(num_partitions)
437 .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));
438
439 let stream = if let Some(projector) = leftover_projector {
441 stream
442 .map(move |batch_result| {
443 batch_result.and_then(|batch| projector.project_batch(&batch))
444 })
445 .boxed()
446 } else {
447 stream.boxed()
448 };
449
450 Ok::<_, DataFusionError>(stream)
451 })
452 .try_flatten();
453
454 Ok(Box::pin(RecordBatchStreamAdapter::new(
455 Arc::clone(&self.leftover_schema),
456 stream,
457 )))
458 }
459
460 fn as_any(&self) -> &dyn Any {
461 self
462 }
463
464 fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
465 write!(
466 f,
467 "VortexScanSource: projection={}",
468 self.projected_projection
469 )?;
470 if let Some(filter) = &self.filter {
471 write!(f, ", filter={filter}")?;
472 }
473 if let Some(limit) = self.limit {
474 write!(f, ", limit={limit}")?;
475 }
476 Ok(())
477 }
478
479 fn repartitioned(
480 &self,
481 target_partitions: usize,
482 _repartition_file_min_size: usize,
483 output_ordering: Option<LexOrdering>,
484 ) -> DFResult<Option<Arc<dyn DataSource>>> {
485 let mut this = self.clone();
487 this.num_partitions = target_partitions;
488 this.ordered |= output_ordering.is_some();
489 Ok(Some(Arc::new(this)))
490 }
491
492 fn output_partitioning(&self) -> Partitioning {
493 Partitioning::UnknownPartitioning(1)
494 }
495
496 fn eq_properties(&self) -> EquivalenceProperties {
497 EquivalenceProperties::new(Arc::clone(&self.leftover_schema))
498 }
499
500 fn partition_statistics(&self, _partition: Option<usize>) -> DFResult<Statistics> {
501 let num_rows = estimate_to_df_precision(self.data_source.row_count().as_ref());
504
505 let total_byte_size = estimate_to_df_precision(self.data_source.byte_size().as_ref());
507
508 let column_statistics = self.leftover_statistics.clone();
511
512 Ok(Statistics {
513 num_rows,
514 total_byte_size,
515 column_statistics,
516 })
517 }
518
519 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
520 let mut this = self.clone();
521 this.limit = limit;
522 Some(Arc::new(this))
523 }
524
525 fn fetch(&self) -> Option<usize> {
526 self.limit
527 }
528
529 fn try_swapping_with_projection(
532 &self,
533 projection: &ProjectionExprs,
534 ) -> DFResult<Option<Arc<dyn DataSource>>> {
535 tracing::debug!(
536 "VortexScanSource: trying to swap with projection: {}",
537 projection
538 );
539
540 let convertor = DefaultExpressionConvertor::default();
541 let input_schema = self.initial_schema.as_ref();
542 let projected_schema = projection.project_schema(input_schema)?;
543
544 let ProcessedProjection {
548 scan_projection,
549 leftover_projection,
550 } = convertor.split_projection(projection.clone(), input_schema, &projected_schema)?;
551
552 let scan_projection = replace(scan_projection, &root(), self.initial_projection.clone());
555
556 let scan_dtype = scan_projection
558 .return_dtype(self.data_source.dtype())
559 .map_err(|e| DataFusionError::External(Box::new(e)))?;
560 let scan_arrow_type = scan_dtype
561 .to_arrow_dtype()
562 .map_err(|e| DataFusionError::External(Box::new(e)))?;
563 let DataType::Struct(scan_fields) = scan_arrow_type else {
564 return Err(DataFusionError::Internal(
565 "Scan projection must produce a struct type".to_string(),
566 ));
567 };
568 let scan_output_schema = Arc::new(Schema::new(scan_fields));
569
570 let leftover_projection = leftover_projection
572 .try_map_exprs(|expr| reassign_expr_columns(expr, &scan_output_schema))?;
573
574 let final_schema = Arc::new(projected_schema);
575
576 let mut this = self.clone();
577 this.projected_projection = scan_projection;
578 this.projected_schema = Arc::clone(&scan_output_schema);
579 this.projected_statistics =
580 vec![ColumnStatistics::new_unknown(); scan_output_schema.fields().len()];
581 this.leftover_projection = Some(leftover_projection);
582 this.leftover_schema = Arc::clone(&final_schema);
583 this.leftover_statistics =
584 vec![ColumnStatistics::new_unknown(); final_schema.fields().len()];
585
586 Ok(Some(Arc::new(this)))
587 }
588
589 fn try_pushdown_filters(
590 &self,
591 filters: Vec<Arc<dyn PhysicalExpr>>,
592 _config: &datafusion_common::config::ConfigOptions,
593 ) -> DFResult<FilterPushdownPropagation<Arc<dyn DataSource>>> {
594 if filters.is_empty() {
595 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
596 vec![],
597 ));
598 }
599
600 let convertor = DefaultExpressionConvertor::default();
601 let input_schema = self.initial_schema.as_ref();
602
603 let pushdown_results: Vec<PushedDown> = filters
606 .iter()
607 .map(|expr| {
608 if convertor.can_be_pushed_down(expr, input_schema) {
609 PushedDown::Yes
610 } else {
611 PushedDown::No
612 }
613 })
614 .collect();
615
616 if pushdown_results.iter().all(|p| matches!(p, PushedDown::No)) {
618 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
619 pushdown_results,
620 ));
621 }
622
623 let pushable: Vec<Arc<dyn PhysicalExpr>> = filters
625 .iter()
626 .zip(pushdown_results.iter())
627 .filter_map(|(expr, pushed)| match pushed {
628 PushedDown::Yes => Some(Arc::clone(expr)),
629 PushedDown::No => None,
630 })
631 .collect();
632
633 let vortex_pred = make_vortex_predicate(&convertor, &pushable)?;
635
636 let new_filter = match (&self.filter, vortex_pred) {
638 (Some(existing), Some(new_pred)) => Some(vx_and(existing.clone(), new_pred)),
639 (Some(existing), None) => Some(existing.clone()),
640 (None, Some(new_pred)) => Some(new_pred),
641 (None, None) => None,
642 };
643
644 let mut this = self.clone();
645 this.filter = new_filter;
646 Ok(
647 FilterPushdownPropagation::with_parent_pushdown_result(pushdown_results)
648 .with_updated_node(Arc::new(this) as _),
649 )
650 }
651}
652
653fn estimate_to_df_precision(est: Option<&Precision<u64>>) -> DFPrecision<usize> {
658 match est {
659 Some(Precision::Exact(v)) => DFPrecision::Exact(usize::try_from(*v).unwrap_or(usize::MAX)),
660 Some(Precision::Inexact(v)) => {
661 DFPrecision::Inexact(usize::try_from(*v).unwrap_or(usize::MAX))
662 }
663 None => DFPrecision::Absent,
664 }
665}