1use std::any::Any;
10use std::fmt;
11use std::fmt::Formatter;
12use std::num::NonZero;
13use std::num::NonZeroUsize;
14use std::sync::Arc;
15
16use arrow_schema::DataType;
17use arrow_schema::Schema;
18use arrow_schema::SchemaRef;
19use datafusion_common::ColumnStatistics;
20use datafusion_common::DataFusionError;
21use datafusion_common::Result as DFResult;
22use datafusion_common::Statistics;
23use datafusion_common::stats::Precision as DFPrecision;
24use datafusion_datasource::source::DataSource;
25use datafusion_execution::SendableRecordBatchStream;
26use datafusion_execution::TaskContext;
27use datafusion_physical_expr::EquivalenceProperties;
28use datafusion_physical_expr::Partitioning;
29use datafusion_physical_expr::PhysicalExpr;
30use datafusion_physical_expr::projection::ProjectionExprs;
31use datafusion_physical_expr::utils::reassign_expr_columns;
32use datafusion_physical_expr_common::sort_expr::LexOrdering;
33use datafusion_physical_plan::DisplayFormatType;
34use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
35use datafusion_physical_plan::filter_pushdown::PushedDown;
36use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
37use futures::StreamExt;
38use futures::TryStreamExt;
39use futures::future::try_join_all;
40use vortex::array::VortexSessionExecute;
41use vortex::array::arrow::ArrowArrayExecutor;
42use vortex::dtype::DType;
43use vortex::dtype::FieldPath;
44use vortex::dtype::Nullability;
45use vortex::error::VortexExpect;
46use vortex::error::VortexResult;
47use vortex::error::vortex_bail;
48use vortex::expr::Expression;
49use vortex::expr::and as vx_and;
50use vortex::expr::get_item;
51use vortex::expr::pack;
52use vortex::expr::root;
53use vortex::expr::stats::Precision;
54use vortex::expr::transform::replace;
55use vortex::io::session::RuntimeSessionExt;
56use vortex::scan::api::DataSourceRef;
57use vortex::scan::api::ScanRequest;
58use vortex::session::VortexSession;
59
60use crate::convert::exprs::DefaultExpressionConvertor;
61use crate::convert::exprs::ExpressionConvertor;
62use crate::convert::exprs::ProcessedProjection;
63use crate::convert::exprs::make_vortex_predicate;
64use crate::convert::stats::stats_set_to_df;
65
/// Builder for [`VortexDataSource`], configuring an optional Arrow schema
/// override and an optional column projection before the source is
/// constructed via [`VortexDataSourceBuilder::build`].
pub struct VortexDataSourceBuilder {
    // The underlying Vortex data source the scan will read from.
    data_source: DataSourceRef,
    // Vortex session used to run scans and schedule CPU-bound work.
    session: VortexSession,

    // Arrow schema override; when `None`, `build` derives the schema from
    // the data source's DType.
    arrow_schema: Option<SchemaRef>,
    // Optional projection given as column indices into the Arrow schema.
    projection: Option<Vec<usize>>,
}
74
impl VortexDataSourceBuilder {
    /// Overrides the Arrow schema used for the scan instead of deriving it
    /// from the data source's DType in [`Self::build`].
    pub fn with_arrow_schema(mut self, arrow_schema: SchemaRef) -> Self {
        self.arrow_schema = Some(arrow_schema);
        self
    }

    /// Restricts the scan to the given column indices of the Arrow schema.
    pub fn with_projection(mut self, indices: Vec<usize>) -> Self {
        self.projection = Some(indices);
        self
    }

    /// Like [`Self::with_projection`], but takes an optional projection,
    /// replacing any previously set value (passing `None` clears it).
    pub fn with_some_projection(mut self, indices: Option<Vec<usize>>) -> Self {
        self.projection = indices;
        self
    }

    /// Builds the [`VortexDataSource`], resolving the Arrow schema, the
    /// Vortex projection expression, and per-column statistics.
    ///
    /// # Errors
    ///
    /// Fails if the source's DType (or the projection applied to it) does not
    /// evaluate to a struct, or if fetching/converting field statistics fails.
    pub async fn build(self) -> VortexResult<VortexDataSource> {
        // Default to the identity projection over the source's root struct.
        let mut projection = root();

        // Resolve the Arrow schema: use the override if one was supplied,
        // otherwise derive it from the data source's DType.
        let mut arrow_schema = match self.arrow_schema {
            Some(schema) => schema,
            None => {
                let data_type = self.data_source.dtype().to_arrow_dtype()?;
                let DataType::Struct(fields) = data_type else {
                    vortex_bail!("Expected a struct-like DataType, found {}", data_type);
                };
                Arc::new(Schema::new(fields))
            }
        };

        if let Some(indices) = self.projection {
            // Build a `pack` expression that selects the projected columns by
            // name from the root struct. This must use the schema *before* it
            // is narrowed below, since the indices refer to the full schema.
            let fields = indices.iter().map(|&i| {
                let name = arrow_schema.field(i).name().clone();
                let expr = get_item(name.as_str(), root());
                (name, expr)
            });

            projection = pack(fields, Nullability::NonNullable);

            // Narrow the Arrow schema to the projected columns, in projection
            // order.
            arrow_schema = Arc::new(Schema::new(
                indices
                    .iter()
                    .map(|&i| arrow_schema.field(i).clone())
                    .collect::<Vec<_>>(),
            ));
        }

        // The projection must evaluate to a struct so its fields can be
        // enumerated for statistics collection below.
        let DType::Struct(fields, ..) = projection.return_dtype(self.data_source.dtype())? else {
            vortex_bail!("Projection does not evaluate to a struct");
        };

        // Fetch per-field statistics concurrently, then convert each stats
        // set into DataFusion `ColumnStatistics`.
        let field_paths: Vec<_> = fields
            .names()
            .iter()
            .cloned()
            .map(FieldPath::from_name)
            .collect();
        let statistics = try_join_all(
            field_paths
                .iter()
                .map(|path| self.data_source.field_statistics(path)),
        )
        .await?
        .iter()
        .zip(fields.fields())
        .map(|(stats, dtype)| stats_set_to_df(stats, &dtype))
        .collect::<VortexResult<Vec<_>>>()?;

        Ok(VortexDataSource {
            data_source: self.data_source,
            session: self.session,
            initial_schema: arrow_schema.clone(),
            initial_projection: projection.clone(),
            initial_statistics: statistics.clone(),
            // Until DataFusion pushes a projection down, the "projected" and
            // "leftover" stages mirror the initial state.
            projected_projection: projection.clone(),
            projected_schema: arrow_schema.clone(),
            projected_statistics: statistics.clone(),
            leftover_projection: None,
            leftover_schema: arrow_schema,
            leftover_statistics: statistics,
            filter: None,
            limit: None,
            ordered: false,
            num_partitions: std::thread::available_parallelism().unwrap_or_else(|_| {
                NonZero::new(1).vortex_expect("available parallelism must be non-zero")
            }),
        })
    }
}
183
184impl VortexDataSource {
185 pub fn builder(data_source: DataSourceRef, session: VortexSession) -> VortexDataSourceBuilder {
187 VortexDataSourceBuilder {
188 data_source,
189 session,
190 arrow_schema: None,
191 projection: None,
192 }
193 }
194}
195
/// A DataFusion [`DataSource`] that reads record batches from a Vortex scan.
///
/// The source tracks three stages of schema / projection / statistics:
/// - `initial_*`: the state produced by [`VortexDataSourceBuilder::build`];
/// - `projected_*`: the projection pushed down into the Vortex scan;
/// - `leftover_*`: expressions evaluated in DataFusion over the scan output.
#[derive(Clone)]
pub struct VortexDataSource {
    // The underlying Vortex data source.
    data_source: DataSourceRef,
    // Session used to run scans and schedule CPU-bound conversion work.
    session: VortexSession,

    // Arrow schema as of build time, before any projection pushdown.
    initial_schema: SchemaRef,
    // Vortex projection expression as of build time; used as the base when
    // a DataFusion projection is swapped in.
    initial_projection: Expression,
    #[allow(dead_code)]
    initial_statistics: Vec<ColumnStatistics>,

    // Projection expression handed to the Vortex scan.
    projected_projection: Expression,
    // Arrow schema of the scan's output (after `projected_projection`).
    projected_schema: SchemaRef,
    // Column statistics matching `projected_schema`.
    projected_statistics: Vec<ColumnStatistics>,

    // Expressions that could not be pushed into the scan; applied to each
    // record batch after it is produced.
    leftover_projection: Option<ProjectionExprs>,
    // Final output schema of this source, after the leftover projection.
    leftover_schema: SchemaRef,
    // Column statistics matching `leftover_schema`.
    leftover_statistics: Vec<ColumnStatistics>,

    // Optional Vortex filter expression accumulated via filter pushdown.
    filter: Option<Expression>,
    // Optional row limit (fetch) pushed down from DataFusion.
    limit: Option<usize>,
    // Whether the scan must preserve row order.
    ordered: bool,

    // Concurrency hint used to bound in-flight work in `open`; the source
    // still reports a single output partition.
    num_partitions: NonZeroUsize,
}
252
253impl fmt::Debug for VortexDataSource {
254 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
255 f.debug_struct("VortexScanSource")
256 .field("schema", &self.leftover_schema)
257 .field("projection", &format!("{}", &self.projected_projection))
258 .field("filter", &self.filter.as_ref().map(|e| format!("{}", e)))
259 .field("limit", &self.limit)
260 .finish()
261 }
262}
263
impl DataSource for VortexDataSource {
    /// Opens the record-batch stream for the given partition.
    ///
    /// Only partition 0 is valid (see [`Self::output_partitioning`]); the
    /// configured `num_partitions` only bounds internal concurrency.
    fn open(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        if partition != 0 {
            return Err(DataFusionError::Internal(format!(
                "VortexScanSource: expected partition 0, got {partition}"
            )));
        }

        // Assemble the Vortex scan request from everything pushed down so far.
        let scan_request = ScanRequest {
            projection: self.projected_projection.clone(),
            filter: self.filter.clone(),
            // Saturate rather than fail if the limit does not fit in a u64.
            limit: self.limit.map(|l| u64::try_from(l).unwrap_or(u64::MAX)),
            ordered: self.ordered,
            ..Default::default()
        };

        // Clone everything the async stream below needs to own.
        let data_source = self.data_source.clone();
        let projected_schema = self.projected_schema.clone();
        let session = self.session.clone();
        let num_partitions = self.num_partitions;

        // Pre-build the projector for expressions that could not be pushed
        // into the scan; it runs over the scan's output batches.
        let leftover_projector = self
            .leftover_projection
            .as_ref()
            .map(|proj| proj.make_projector(&self.projected_schema))
            .transpose()?;

        // `open` is synchronous but constructing the scan is async, so defer
        // it into a one-shot stream that is flattened into the batch stream.
        let stream = futures::stream::once(async move {
            let scan = data_source
                .scan(scan_request)
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;

            // One chunk stream per scan split.
            let scan_streams = scan.partitions().map(|split_result| {
                let split = split_result?;
                split.execute()
            });

            let handle = session.handle();
            let stream = scan_streams
                // Interleave chunks across splits, keeping at most
                // 2 * num_partitions streams in flight.
                .try_flatten_unordered(Some(num_partitions.get() * 2))
                .map(move |result| {
                    let session = session.clone();
                    let schema = projected_schema.clone();
                    // Convert each Vortex chunk to an Arrow RecordBatch on
                    // the CPU pool, keeping the async runtime responsive.
                    handle.spawn_cpu(move || {
                        let mut ctx = session.create_execution_ctx();
                        result.and_then(|chunk| chunk.execute_record_batch(&schema, &mut ctx))
                    })
                })
                // Await up to num_partitions conversions concurrently, in order.
                .buffered(num_partitions.get())
                .map(|result| result.map_err(|e| DataFusionError::External(Box::new(e))));

            // Apply the leftover projection (if any) to each record batch.
            let stream = if let Some(projector) = leftover_projector {
                stream
                    .map(move |batch_result| {
                        batch_result.and_then(|batch| projector.project_batch(&batch))
                    })
                    .boxed()
            } else {
                stream.boxed()
            };

            Ok::<_, DataFusionError>(stream)
        })
        .try_flatten();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.leftover_schema.clone(),
            stream,
        )))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Formats this source for `EXPLAIN` output, under the plan-facing name
    /// `VortexScanSource`.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
        write!(
            f,
            "VortexScanSource: projection={}",
            self.projected_projection
        )?;
        if let Some(ref filter) = self.filter {
            write!(f, ", filter={filter}")?;
        }
        if let Some(limit) = self.limit {
            write!(f, ", limit={limit}")?;
        }
        Ok(())
    }

    /// Records the requested parallelism as an internal concurrency hint.
    ///
    /// The output remains a single partition (see `output_partitioning`);
    /// `target_partitions` only bounds in-flight work inside `open`. If an
    /// output ordering is requested, the scan is marked as ordered.
    fn repartitioned(
        &self,
        target_partitions: usize,
        _repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
    ) -> DFResult<Option<Arc<dyn DataSource>>> {
        let mut this = self.clone();
        this.num_partitions = NonZero::new(target_partitions)
            .ok_or_else(|| DataFusionError::Internal("non-zero partitions".to_string()))?;
        this.ordered |= output_ordering.is_some();
        Ok(Some(Arc::new(this)))
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(1)
    }

    fn eq_properties(&self) -> EquivalenceProperties {
        EquivalenceProperties::new(self.leftover_schema.clone())
    }

    /// Reports statistics for this source: row count and byte size come from
    /// the data source's estimates, column statistics from the final
    /// (leftover) schema stage.
    fn partition_statistics(&self, _partition: Option<usize>) -> DFResult<Statistics> {
        let num_rows = estimate_to_df_precision(&self.data_source.row_count());

        let total_byte_size = estimate_to_df_precision(&self.data_source.byte_size());

        let column_statistics = self.leftover_statistics.clone();

        Ok(Statistics {
            num_rows,
            total_byte_size,
            column_statistics,
        })
    }

    /// Returns a copy of this source with the given fetch limit, replacing
    /// any previously configured limit.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
        let mut this = self.clone();
        this.limit = limit;
        Some(Arc::new(this))
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }

    /// Splits a DataFusion projection into a part pushed into the Vortex scan
    /// and a leftover part evaluated over the scan's output batches, returning
    /// an updated copy of this source.
    fn try_swapping_with_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> DFResult<Option<Arc<dyn DataSource>>> {
        tracing::debug!(
            "VortexScanSource: trying to swap with projection: {}",
            projection
        );

        let convertor = DefaultExpressionConvertor::default();
        let input_schema = self.initial_schema.as_ref();
        let projected_schema = projection.project_schema(input_schema)?;

        // Split into the portion convertible to a Vortex expression and the
        // remainder that must stay in DataFusion.
        let ProcessedProjection {
            scan_projection,
            leftover_projection,
        } = convertor.split_projection(projection.clone(), input_schema, &projected_schema)?;

        // Compose with the builder-time projection: the new scan projection
        // is expressed relative to `root()`, so substitute the initial
        // projection for it.
        let scan_projection = replace(scan_projection, &root(), self.initial_projection.clone());

        // Derive the Arrow schema the scan will now produce.
        let scan_dtype = scan_projection
            .return_dtype(self.data_source.dtype())
            .map_err(|e| DataFusionError::External(Box::new(e)))?;
        let scan_arrow_type = scan_dtype
            .to_arrow_dtype()
            .map_err(|e| DataFusionError::External(Box::new(e)))?;
        let DataType::Struct(scan_fields) = scan_arrow_type else {
            return Err(DataFusionError::Internal(
                "Scan projection must produce a struct type".to_string(),
            ));
        };
        let scan_output_schema = Arc::new(Schema::new(scan_fields));

        // Re-bind the leftover expressions' column indices against the scan's
        // new output schema.
        let leftover_projection = leftover_projection
            .try_map_exprs(|expr| reassign_expr_columns(expr, &scan_output_schema))?;

        let final_schema = Arc::new(projected_schema);

        let mut this = self.clone();
        this.projected_projection = scan_projection;
        this.projected_schema = scan_output_schema.clone();
        // Statistics for the new shapes are unknown until recomputed.
        this.projected_statistics =
            vec![ColumnStatistics::new_unknown(); scan_output_schema.fields().len()];
        this.leftover_projection = Some(leftover_projection);
        this.leftover_schema = final_schema.clone();
        this.leftover_statistics =
            vec![ColumnStatistics::new_unknown(); final_schema.fields().len()];

        Ok(Some(Arc::new(this)))
    }

    /// Attempts to push the given filters into the Vortex scan.
    ///
    /// Filters convertible to Vortex expressions are AND-ed into any existing
    /// scan filter; the rest are reported back as not pushed down so
    /// DataFusion keeps evaluating them.
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &datafusion_common::config::ConfigOptions,
    ) -> DFResult<FilterPushdownPropagation<Arc<dyn DataSource>>> {
        if filters.is_empty() {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![],
            ));
        }

        let convertor = DefaultExpressionConvertor::default();
        let input_schema = self.initial_schema.as_ref();

        // Classify each filter as pushable or not, preserving input order so
        // the result aligns with `filters`.
        let pushdown_results: Vec<PushedDown> = filters
            .iter()
            .map(|expr| {
                if convertor.can_be_pushed_down(expr, input_schema) {
                    PushedDown::Yes
                } else {
                    PushedDown::No
                }
            })
            .collect();

        // Nothing pushable: report back without creating a new node.
        if pushdown_results.iter().all(|p| matches!(p, PushedDown::No)) {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                pushdown_results,
            ));
        }

        let pushable: Vec<Arc<dyn PhysicalExpr>> = filters
            .iter()
            .zip(pushdown_results.iter())
            .filter_map(|(expr, pushed)| match pushed {
                PushedDown::Yes => Some(expr.clone()),
                PushedDown::No => None,
            })
            .collect();

        let vortex_pred = make_vortex_predicate(&convertor, &pushable)?;

        // Conjoin the new predicate with any previously pushed-down filter.
        let new_filter = match (&self.filter, vortex_pred) {
            (Some(existing), Some(new_pred)) => Some(vx_and(existing.clone(), new_pred)),
            (Some(existing), None) => Some(existing.clone()),
            (None, Some(new_pred)) => Some(new_pred),
            (None, None) => None,
        };

        let mut this = self.clone();
        this.filter = new_filter;
        Ok(
            FilterPushdownPropagation::with_parent_pushdown_result(pushdown_results)
                .with_updated_node(Arc::new(this) as _),
        )
    }
}
544
545fn estimate_to_df_precision(est: &Option<Precision<u64>>) -> DFPrecision<usize> {
547 match est {
548 Some(Precision::Exact(v)) => DFPrecision::Exact(usize::try_from(*v).unwrap_or(usize::MAX)),
549 Some(Precision::Inexact(v)) => {
550 DFPrecision::Inexact(usize::try_from(*v).unwrap_or(usize::MAX))
551 }
552 None => DFPrecision::Absent,
553 }
554}