vortex_datafusion/persistent/
format.rs1use std::any::Any;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::sync::Arc;
8
9use arrow_schema::Schema;
10use arrow_schema::SchemaRef;
11use async_trait::async_trait;
12use datafusion_catalog::Session;
13use datafusion_common::ColumnStatistics;
14use datafusion_common::DataFusionError;
15use datafusion_common::GetExt;
16use datafusion_common::Result as DFResult;
17use datafusion_common::ScalarValue as DFScalarValue;
18use datafusion_common::Statistics;
19use datafusion_common::config::ConfigField;
20use datafusion_common::config_namespace;
21use datafusion_common::internal_datafusion_err;
22use datafusion_common::not_impl_err;
23use datafusion_common::parsers::CompressionTypeVariant;
24use datafusion_common::stats::Precision;
25use datafusion_common_runtime::SpawnedTask;
26use datafusion_datasource::TableSchema;
27use datafusion_datasource::file::FileSource;
28use datafusion_datasource::file_compression_type::FileCompressionType;
29use datafusion_datasource::file_format::FileFormat;
30use datafusion_datasource::file_format::FileFormatFactory;
31use datafusion_datasource::file_scan_config::FileScanConfig;
32use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
33use datafusion_datasource::file_sink_config::FileSinkConfig;
34use datafusion_datasource::sink::DataSinkExec;
35use datafusion_datasource::source::DataSourceExec;
36use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
37use datafusion_expr::dml::InsertOp;
38use datafusion_physical_expr::LexRequirement;
39use datafusion_physical_plan::ExecutionPlan;
40use futures::FutureExt;
41use futures::StreamExt as _;
42use futures::TryStreamExt as _;
43use futures::stream;
44use object_store::ObjectMeta;
45use object_store::ObjectStore;
46use vortex::VortexSessionDefault;
47use vortex::array::arrow::ArrowSessionExt;
48use vortex::array::memory::MemorySessionExt;
49use vortex::dtype::DType;
50use vortex::dtype::Nullability;
51use vortex::dtype::PType;
52use vortex::error::VortexExpect;
53use vortex::error::VortexResult;
54use vortex::error::vortex_err;
55use vortex::expr::stats;
56use vortex::expr::stats::Stat;
57use vortex::file::EOF_SIZE;
58use vortex::file::MAX_POSTSCRIPT_SIZE;
59use vortex::file::OpenOptionsSessionExt;
60use vortex::file::VORTEX_FILE_EXTENSION;
61use vortex::io::object_store::ObjectStoreReadAt;
62use vortex::io::session::RuntimeSessionExt;
63use vortex::scalar::Scalar;
64use vortex::scalar::ScalarValue as VortexScalarValue;
65use vortex::session::VortexSession;
66
67use super::cache::CachedVortexMetadata;
68use super::sink::VortexSink;
69use super::source::VortexSource;
70use crate::PrecisionExt as _;
71use crate::convert::TryToDataFusion;
72use crate::convert::stats::is_constant_to_distinct_count;
73
74const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
75
/// DataFusion file format for Vortex files.
///
/// Pairs a [`VortexSession`] with the [`VortexTableOptions`] that are applied when files are
/// opened, scanned and written through this format.
pub struct VortexFormat {
    /// Session cloned into the readers, sources and sinks this format creates.
    session: VortexSession,
    /// Table-level options this format instance was configured with.
    opts: VortexTableOptions,
}
127
128impl Debug for VortexFormat {
129 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
130 f.debug_struct("VortexFormat")
131 .field("opts", &self.opts)
132 .finish()
133 }
134}
135
// Table-level options for the Vortex file format.
//
// NOTE(review): plain `//` comments are used inside the macro because `config_namespace!`
// appears to capture `///` text as the option description exposed at runtime — confirm before
// converting these to doc comments.
config_namespace! {
    pub struct VortexTableOptions {
        // Size in bytes handed to `with_initial_read_size` when a file is opened to read its
        // footer.
        pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
        // Forwarded to `VortexSource::with_projection_pushdown` when a file source is built.
        pub projection_pushdown: bool, default = false
        // When set, forwarded to `VortexSource::with_scan_concurrency`; when `None` the source
        // is left at whatever concurrency it picks on its own.
        pub scan_concurrency: Option<usize>, default = None
    }
}
177
// Marker impl: equality on `VortexTableOptions` is a full equivalence relation.
// NOTE(review): relies on a `PartialEq` impl that is not written in this file — presumably
// generated by `config_namespace!`; confirm.
impl Eq for VortexTableOptions {}
179
/// Factory that produces [`VortexFormat`] instances for DataFusion.
#[derive(Debug)]
pub struct VortexFormatFactory {
    /// Session cloned into every format this factory creates.
    session: VortexSession,
    /// Base options used by `create`; `None` means start from `VortexTableOptions::default()`.
    options: Option<VortexTableOptions>,
}
223
224impl GetExt for VortexFormatFactory {
225 fn get_ext(&self) -> String {
226 VORTEX_FILE_EXTENSION.to_string()
227 }
228}
229
230impl VortexFormatFactory {
231 #[expect(
233 clippy::new_without_default,
234 reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
235 )]
236 pub fn new() -> Self {
237 Self {
238 session: VortexSession::default(),
239 options: None,
240 }
241 }
242
243 pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
249 Self {
250 session,
251 options: Some(options),
252 }
253 }
254
255 pub fn with_options(mut self, options: VortexTableOptions) -> Self {
272 self.options = Some(options);
273 self
274 }
275}
276
277impl FileFormatFactory for VortexFormatFactory {
278 #[expect(clippy::disallowed_types, reason = "required by trait signature")]
279 fn create(
280 &self,
281 _state: &dyn Session,
282 format_options: &std::collections::HashMap<String, String>,
283 ) -> DFResult<Arc<dyn FileFormat>> {
284 let mut opts = self.options.clone().unwrap_or_default();
285 for (key, value) in format_options {
286 if let Some(key) = key.strip_prefix("format.") {
287 opts.set(key, value)?;
288 } else {
289 tracing::trace!("Ignoring options '{key}'");
290 }
291 }
292
293 Ok(Arc::new(VortexFormat::new_with_options(
294 self.session.clone(),
295 opts,
296 )))
297 }
298
299 fn default(&self) -> Arc<dyn FileFormat> {
300 Arc::new(VortexFormat::new(self.session.clone()))
301 }
302
303 fn as_any(&self) -> &dyn Any {
304 self
305 }
306}
307
308impl VortexFormat {
309 pub fn new(session: VortexSession) -> Self {
316 Self::new_with_options(session, VortexTableOptions::default())
317 }
318
319 pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self {
321 Self { session, opts }
322 }
323
324 pub fn options(&self) -> &VortexTableOptions {
327 &self.opts
328 }
329}
330
331#[async_trait]
332impl FileFormat for VortexFormat {
333 fn as_any(&self) -> &dyn Any {
334 self
335 }
336
337 fn compression_type(&self) -> Option<FileCompressionType> {
338 None
339 }
340
341 fn get_ext(&self) -> String {
342 VORTEX_FILE_EXTENSION.to_string()
343 }
344
345 fn get_ext_with_compression(
346 &self,
347 file_compression_type: &FileCompressionType,
348 ) -> DFResult<String> {
349 match file_compression_type.get_variant() {
350 CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
351 _ => Err(DataFusionError::Internal(
352 "Vortex does not support file level compression.".into(),
353 )),
354 }
355 }
356
357 async fn infer_schema(
358 &self,
359 state: &dyn Session,
360 store: &Arc<dyn ObjectStore>,
361 objects: &[ObjectMeta],
362 ) -> DFResult<SchemaRef> {
363 let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
364
365 let mut file_schemas = stream::iter(objects.iter().cloned())
366 .map(|object| {
367 let store = Arc::clone(store);
368 let session = self.session.clone();
369 let opts = self.opts.clone();
370 let cache = Arc::clone(&file_metadata_cache);
371
372 SpawnedTask::spawn(async move {
373 if let Some(entry) = cache.get(&object.location)
375 && entry.is_valid_for(&object)
376 && let Some(cached_vortex) = entry
377 .file_metadata
378 .as_any()
379 .downcast_ref::<CachedVortexMetadata>()
380 {
381 let inferred_schema = session
382 .arrow()
383 .to_arrow_schema(cached_vortex.footer().dtype())?;
384 return VortexResult::Ok((object.location, inferred_schema));
385 }
386
387 let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
389 store,
390 object.location.clone(),
391 session.handle(),
392 session.allocator(),
393 ));
394
395 let vxf = session
396 .open_options()
397 .with_initial_read_size(opts.footer_initial_read_size_bytes)
398 .with_file_size(object.size)
399 .open_read(reader)
400 .await?;
401
402 let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
404 let entry = CachedFileMetadataEntry::new(object.clone(), cached_metadata);
405 cache.put(&object.location, entry);
406
407 let inferred_schema = session.arrow().to_arrow_schema(vxf.dtype())?;
408 VortexResult::Ok((object.location, inferred_schema))
409 })
410 .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
411 })
412 .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
413 .try_collect::<Vec<_>>()
414 .await
415 .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;
416
417 file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
419 let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);
420
421 Ok(Arc::new(Schema::try_merge(file_schemas)?))
422 }
423
424 #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
425 async fn infer_stats(
426 &self,
427 state: &dyn Session,
428 store: &Arc<dyn ObjectStore>,
429 table_schema: SchemaRef,
430 object: &ObjectMeta,
431 ) -> DFResult<Statistics> {
432 let object = object.clone();
433 let store = Arc::clone(store);
434 let session = self.session.clone();
435 let opts = self.opts.clone();
436 let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
437
438 SpawnedTask::spawn(async move {
439 let cached_metadata = file_metadata_cache
441 .get(&object.location)
442 .filter(|entry| entry.is_valid_for(&object))
443 .and_then(|entry| {
444 entry
445 .file_metadata
446 .as_any()
447 .downcast_ref::<CachedVortexMetadata>()
448 .map(|m| {
449 (
450 m.footer().dtype().clone(),
451 m.footer().statistics().cloned(),
452 m.footer().row_count(),
453 )
454 })
455 });
456
457 let (dtype, file_stats, row_count) = match cached_metadata {
458 Some(metadata) => metadata,
459 None => {
460 let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
462 store,
463 object.location.clone(),
464 session.handle(),
465 session.allocator(),
466 ));
467
468 let vxf = session
469 .open_options()
470 .with_initial_read_size(opts.footer_initial_read_size_bytes)
471 .with_file_size(object.size)
472 .open_read(reader)
473 .await
474 .map_err(|e| {
475 DataFusionError::Execution(format!(
476 "Failed to open Vortex file {}: {e}",
477 object.location
478 ))
479 })?;
480
481 let file_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
483 let entry = CachedFileMetadataEntry::new(object.clone(), file_metadata);
484 file_metadata_cache.put(&object.location, entry);
485
486 (
487 vxf.dtype().clone(),
488 vxf.file_stats().cloned(),
489 vxf.row_count(),
490 )
491 }
492 };
493
494 let struct_dtype = dtype
495 .as_struct_fields_opt()
496 .vortex_expect("dtype is not a struct");
497
498 let Some(file_stats) = file_stats else {
500 return Ok(Statistics {
502 num_rows: Precision::Exact(
503 usize::try_from(row_count)
504 .map_err(|_| vortex_err!("Row count overflow"))
505 .vortex_expect("Row count overflow"),
506 ),
507 total_byte_size: Precision::Absent,
508 column_statistics: vec![
509 ColumnStatistics::default();
510 table_schema.fields().len()
511 ],
512 });
513 };
514
515 let mut column_statistics = Vec::with_capacity(table_schema.fields().len());
516
517 for field in table_schema.fields().iter() {
518 let Some(col_idx) = struct_dtype.find(field.name()) else {
521 column_statistics.push(ColumnStatistics::default());
523 continue;
524 };
525 let (stats_set, stats_dtype) = file_stats.get(col_idx);
526
527 let column_size =
529 stats_set.get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into());
530
531 let target_dtype =
532 session
533 .arrow()
534 .from_arrow_field(field.as_ref())
535 .map_err(|e| {
536 DataFusionError::Execution(format!(
537 "Failed to derive Vortex DType for field {}: {e}",
538 field.name()
539 ))
540 })?;
541 let min = scalar_stat_to_df(
542 Stat::Min,
543 stats_set.get(Stat::Min),
544 stats_dtype,
545 &target_dtype,
546 );
547 let max = scalar_stat_to_df(
548 Stat::Max,
549 stats_set.get(Stat::Max),
550 stats_dtype,
551 &target_dtype,
552 );
553
554 let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
555
556 column_statistics.push(ColumnStatistics {
557 null_count: null_count.to_df(),
558 min_value: min.to_df(),
559 max_value: max.to_df(),
560 sum_value: Precision::Absent,
561 distinct_count: is_constant_to_distinct_count(
562 stats_set.get_as::<bool>(
563 Stat::IsConstant,
564 &DType::Bool(Nullability::NonNullable),
565 ),
566 ),
567 byte_size: column_size.to_df(),
568 })
569 }
570
571 let total_byte_size = column_statistics
572 .iter()
573 .fold(Precision::Exact(0), |acc, cs| acc.add(&cs.byte_size));
574
575 Ok(Statistics {
576 num_rows: Precision::Exact(
577 usize::try_from(row_count)
578 .map_err(|_| vortex_err!("Row count overflow"))
579 .vortex_expect("Row count overflow"),
580 ),
581 total_byte_size,
582 column_statistics,
583 })
584 })
585 .await
586 .vortex_expect("Failed to spawn infer_stats")
587 }
588
589 async fn create_physical_plan(
590 &self,
591 state: &dyn Session,
592 file_scan_config: FileScanConfig,
593 ) -> DFResult<Arc<dyn ExecutionPlan>> {
594 let mut source = file_scan_config
595 .file_source()
596 .as_any()
597 .downcast_ref::<VortexSource>()
598 .cloned()
599 .ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;
600
601 source = source
602 .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());
603
604 let conf = FileScanConfigBuilder::from(file_scan_config)
605 .with_source(Arc::new(source))
606 .build();
607
608 Ok(DataSourceExec::from_data_source(conf))
609 }
610
611 async fn create_writer_physical_plan(
612 &self,
613 input: Arc<dyn ExecutionPlan>,
614 _state: &dyn Session,
615 conf: FileSinkConfig,
616 order_requirements: Option<LexRequirement>,
617 ) -> DFResult<Arc<dyn ExecutionPlan>> {
618 if conf.insert_op != InsertOp::Append {
619 return not_impl_err!("Overwrites are not implemented yet for Vortex");
620 }
621
622 let schema = Arc::clone(conf.output_schema());
623 let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));
624
625 Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
626 }
627
628 fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
629 let mut source = VortexSource::new(table_schema, self.session.clone())
630 .with_projection_pushdown(self.opts.projection_pushdown);
631
632 if let Some(scan_concurrency) = self.opts.scan_concurrency {
633 source = source.with_scan_concurrency(scan_concurrency);
634 }
635
636 Arc::new(source) as _
637 }
638}
639
640fn scalar_stat_to_df(
641 stat: Stat,
642 value: Option<stats::Precision<VortexScalarValue>>,
643 stats_dtype: &DType,
644 target_dtype: &DType,
645) -> Option<stats::Precision<DFScalarValue>> {
646 let stat_dtype = stat.dtype(stats_dtype)?;
647
648 value?
649 .try_map(|stat_value| {
650 Scalar::try_new(stat_dtype, Some(stat_value))?
651 .cast(target_dtype)?
652 .try_to_df()
653 })
654 .ok()
655}
656
#[cfg(test)]
mod tests {

    use super::*;
    use crate::common_tests::TestSessionContext;

    /// `STORED AS vortex` DDL registers the table with the session.
    #[tokio::test]
    async fn create_table() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        let ddl = "CREATE EXTERNAL TABLE my_tbl \
            (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
            STORED AS vortex \
            LOCATION 'table/'";
        ctx.session.sql(ddl).await?;

        let registered = ctx.session.table_exist("my_tbl")?;
        assert!(registered);

        Ok(())
    }

    /// Format options supplied through `OPTIONS(...)` are accepted when the table is created.
    #[tokio::test]
    async fn configure_format_source() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        let ddl = "CREATE EXTERNAL TABLE my_tbl \
            (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
            STORED AS vortex \
            LOCATION 'table/' \
            OPTIONS( footer_initial_read_size_bytes '12345', scan_concurrency '3' );";
        let frame = ctx.session.sql(ddl).await?;
        frame.collect().await?;

        Ok(())
    }

    /// An option set on `VortexTableOptions` is visible through `VortexFormat::options`.
    #[test]
    fn format_plumbs_footer_initial_read_size() {
        let mut table_options = VortexTableOptions::default();
        table_options
            .set("footer_initial_read_size_bytes", "12345")
            .unwrap();

        let format = VortexFormat::new_with_options(VortexSession::default(), table_options);
        let configured = format.options().footer_initial_read_size_bytes;
        assert_eq!(configured, 12345);
    }
}