vortex_datafusion/persistent/
format.rs1use std::any::Any;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::sync::Arc;
8
9use arrow_schema::Schema;
10use arrow_schema::SchemaRef;
11use async_trait::async_trait;
12use datafusion_catalog::Session;
13use datafusion_common::ColumnStatistics;
14use datafusion_common::DataFusionError;
15use datafusion_common::GetExt;
16use datafusion_common::Result as DFResult;
17use datafusion_common::ScalarValue as DFScalarValue;
18use datafusion_common::Statistics;
19use datafusion_common::config::ConfigField;
20use datafusion_common::config_namespace;
21use datafusion_common::internal_datafusion_err;
22use datafusion_common::not_impl_err;
23use datafusion_common::parsers::CompressionTypeVariant;
24use datafusion_common::stats::Precision;
25use datafusion_common_runtime::SpawnedTask;
26use datafusion_datasource::TableSchema;
27use datafusion_datasource::file::FileSource;
28use datafusion_datasource::file_compression_type::FileCompressionType;
29use datafusion_datasource::file_format::FileFormat;
30use datafusion_datasource::file_format::FileFormatFactory;
31use datafusion_datasource::file_scan_config::FileScanConfig;
32use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
33use datafusion_datasource::file_sink_config::FileSinkConfig;
34use datafusion_datasource::sink::DataSinkExec;
35use datafusion_datasource::source::DataSourceExec;
36use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
37use datafusion_expr::dml::InsertOp;
38use datafusion_physical_expr::LexRequirement;
39use datafusion_physical_plan::ExecutionPlan;
40use futures::FutureExt;
41use futures::StreamExt as _;
42use futures::TryStreamExt as _;
43use futures::stream;
44use object_store::ObjectMeta;
45use object_store::ObjectStore;
46use vortex::VortexSessionDefault;
47use vortex::array::memory::MemorySessionExt;
48use vortex::dtype::DType;
49use vortex::dtype::Nullability;
50use vortex::dtype::PType;
51use vortex::dtype::arrow::FromArrowType;
52use vortex::error::VortexExpect;
53use vortex::error::VortexResult;
54use vortex::error::vortex_err;
55use vortex::expr::stats;
56use vortex::expr::stats::Stat;
57use vortex::file::EOF_SIZE;
58use vortex::file::MAX_POSTSCRIPT_SIZE;
59use vortex::file::OpenOptionsSessionExt;
60use vortex::file::VORTEX_FILE_EXTENSION;
61use vortex::io::object_store::ObjectStoreReadAt;
62use vortex::io::session::RuntimeSessionExt;
63use vortex::scalar::Scalar;
64use vortex::scalar::ScalarValue as VortexScalarValue;
65use vortex::session::VortexSession;
66
67use super::cache::CachedVortexMetadata;
68use super::sink::VortexSink;
69use super::source::VortexSource;
70use crate::PrecisionExt as _;
71use crate::convert::TryToDataFusion;
72use crate::convert::stats::is_constant_to_distinct_count;
73
/// Default value of the `footer_initial_read_size_bytes` table option.
///
/// Computed as `MAX_POSTSCRIPT_SIZE + EOF_SIZE`.
// NOTE(review): presumably sized so a single trailing read covers the largest possible
// postscript plus the end-of-file marker — confirm against the `vortex::file` docs.
const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
75
/// DataFusion [`FileFormat`] implementation for Vortex files.
///
/// Construct it with [`VortexFormat::new`] or [`VortexFormat::new_with_options`].
pub struct VortexFormat {
    /// Vortex session used to open files and handed to the sources and sinks this format
    /// creates.
    session: VortexSession,
    /// Table-level options applied when opening files and building file sources.
    opts: VortexTableOptions,
}
127
128impl Debug for VortexFormat {
129 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
130 f.debug_struct("VortexFormat")
131 .field("opts", &self.opts)
132 .finish()
133 }
134}
135
config_namespace! {
    /// Table-level options for reading Vortex files through [`VortexFormat`].
    pub struct VortexTableOptions {
        /// Size in bytes of the initial read issued when opening a file; passed to
        /// `with_initial_read_size` when a file is opened for schema or statistics inference.
        pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES

        /// Forwarded to `VortexSource::with_projection_pushdown` when a file source is created.
        pub projection_pushdown: bool, default = false

        /// When set, forwarded to `VortexSource::with_scan_concurrency`; when unset, the
        /// source is left with whatever concurrency it uses by default.
        pub scan_concurrency: Option<usize>, default = None
    }
}
177
// Marker impl: upgrades the existing `PartialEq` implementation to full equivalence.
// NOTE(review): `PartialEq` is presumably generated by `config_namespace!` — confirm.
impl Eq for VortexTableOptions {}
179
/// Factory that produces [`VortexFormat`] instances for DataFusion.
#[derive(Debug)]
pub struct VortexFormatFactory {
    /// Session cloned into every format this factory creates.
    session: VortexSession,
    /// Baseline options for created formats; `None` means start from
    /// `VortexTableOptions::default()`.
    options: Option<VortexTableOptions>,
}
223
224impl GetExt for VortexFormatFactory {
225 fn get_ext(&self) -> String {
226 VORTEX_FILE_EXTENSION.to_string()
227 }
228}
229
230impl VortexFormatFactory {
231 #[expect(
233 clippy::new_without_default,
234 reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
235 )]
236 pub fn new() -> Self {
237 Self {
238 session: VortexSession::default(),
239 options: None,
240 }
241 }
242
243 pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
249 Self {
250 session,
251 options: Some(options),
252 }
253 }
254
255 pub fn with_options(mut self, options: VortexTableOptions) -> Self {
272 self.options = Some(options);
273 self
274 }
275}
276
277impl FileFormatFactory for VortexFormatFactory {
278 #[expect(clippy::disallowed_types, reason = "required by trait signature")]
279 fn create(
280 &self,
281 _state: &dyn Session,
282 format_options: &std::collections::HashMap<String, String>,
283 ) -> DFResult<Arc<dyn FileFormat>> {
284 let mut opts = self.options.clone().unwrap_or_default();
285 for (key, value) in format_options {
286 if let Some(key) = key.strip_prefix("format.") {
287 opts.set(key, value)?;
288 } else {
289 tracing::trace!("Ignoring options '{key}'");
290 }
291 }
292
293 Ok(Arc::new(VortexFormat::new_with_options(
294 self.session.clone(),
295 opts,
296 )))
297 }
298
299 fn default(&self) -> Arc<dyn FileFormat> {
300 Arc::new(VortexFormat::new(self.session.clone()))
301 }
302
303 fn as_any(&self) -> &dyn Any {
304 self
305 }
306}
307
308impl VortexFormat {
309 pub fn new(session: VortexSession) -> Self {
316 Self::new_with_options(session, VortexTableOptions::default())
317 }
318
319 pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self {
321 Self { session, opts }
322 }
323
324 pub fn options(&self) -> &VortexTableOptions {
327 &self.opts
328 }
329}
330
331#[async_trait]
332impl FileFormat for VortexFormat {
333 fn as_any(&self) -> &dyn Any {
334 self
335 }
336
337 fn compression_type(&self) -> Option<FileCompressionType> {
338 None
339 }
340
341 fn get_ext(&self) -> String {
342 VORTEX_FILE_EXTENSION.to_string()
343 }
344
345 fn get_ext_with_compression(
346 &self,
347 file_compression_type: &FileCompressionType,
348 ) -> DFResult<String> {
349 match file_compression_type.get_variant() {
350 CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
351 _ => Err(DataFusionError::Internal(
352 "Vortex does not support file level compression.".into(),
353 )),
354 }
355 }
356
357 async fn infer_schema(
358 &self,
359 state: &dyn Session,
360 store: &Arc<dyn ObjectStore>,
361 objects: &[ObjectMeta],
362 ) -> DFResult<SchemaRef> {
363 let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
364
365 let mut file_schemas = stream::iter(objects.iter().cloned())
366 .map(|object| {
367 let store = Arc::clone(store);
368 let session = self.session.clone();
369 let opts = self.opts.clone();
370 let cache = Arc::clone(&file_metadata_cache);
371
372 SpawnedTask::spawn(async move {
373 if let Some(entry) = cache.get(&object.location)
375 && entry.is_valid_for(&object)
376 && let Some(cached_vortex) = entry
377 .file_metadata
378 .as_any()
379 .downcast_ref::<CachedVortexMetadata>()
380 {
381 let inferred_schema = cached_vortex.footer().dtype().to_arrow_schema()?;
382 return VortexResult::Ok((object.location, inferred_schema));
383 }
384
385 let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
387 store,
388 object.location.clone(),
389 session.handle(),
390 session.allocator(),
391 ));
392
393 let vxf = session
394 .open_options()
395 .with_initial_read_size(opts.footer_initial_read_size_bytes)
396 .with_file_size(object.size)
397 .open_read(reader)
398 .await?;
399
400 let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
402 let entry = CachedFileMetadataEntry::new(object.clone(), cached_metadata);
403 cache.put(&object.location, entry);
404
405 let inferred_schema = vxf.dtype().to_arrow_schema()?;
406 VortexResult::Ok((object.location, inferred_schema))
407 })
408 .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
409 })
410 .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
411 .try_collect::<Vec<_>>()
412 .await
413 .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;
414
415 file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
417 let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);
418
419 Ok(Arc::new(Schema::try_merge(file_schemas)?))
420 }
421
422 #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
423 async fn infer_stats(
424 &self,
425 state: &dyn Session,
426 store: &Arc<dyn ObjectStore>,
427 table_schema: SchemaRef,
428 object: &ObjectMeta,
429 ) -> DFResult<Statistics> {
430 let object = object.clone();
431 let store = Arc::clone(store);
432 let session = self.session.clone();
433 let opts = self.opts.clone();
434 let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
435
436 SpawnedTask::spawn(async move {
437 let cached_metadata = file_metadata_cache
439 .get(&object.location)
440 .filter(|entry| entry.is_valid_for(&object))
441 .and_then(|entry| {
442 entry
443 .file_metadata
444 .as_any()
445 .downcast_ref::<CachedVortexMetadata>()
446 .map(|m| {
447 (
448 m.footer().dtype().clone(),
449 m.footer().statistics().cloned(),
450 m.footer().row_count(),
451 )
452 })
453 });
454
455 let (dtype, file_stats, row_count) = match cached_metadata {
456 Some(metadata) => metadata,
457 None => {
458 let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
460 store,
461 object.location.clone(),
462 session.handle(),
463 session.allocator(),
464 ));
465
466 let vxf = session
467 .open_options()
468 .with_initial_read_size(opts.footer_initial_read_size_bytes)
469 .with_file_size(object.size)
470 .open_read(reader)
471 .await
472 .map_err(|e| {
473 DataFusionError::Execution(format!(
474 "Failed to open Vortex file {}: {e}",
475 object.location
476 ))
477 })?;
478
479 let file_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
481 let entry = CachedFileMetadataEntry::new(object.clone(), file_metadata);
482 file_metadata_cache.put(&object.location, entry);
483
484 (
485 vxf.dtype().clone(),
486 vxf.file_stats().cloned(),
487 vxf.row_count(),
488 )
489 }
490 };
491
492 let struct_dtype = dtype
493 .as_struct_fields_opt()
494 .vortex_expect("dtype is not a struct");
495
496 let Some(file_stats) = file_stats else {
498 return Ok(Statistics {
500 num_rows: Precision::Exact(
501 usize::try_from(row_count)
502 .map_err(|_| vortex_err!("Row count overflow"))
503 .vortex_expect("Row count overflow"),
504 ),
505 total_byte_size: Precision::Absent,
506 column_statistics: vec![
507 ColumnStatistics::default();
508 table_schema.fields().len()
509 ],
510 });
511 };
512
513 let mut column_statistics = Vec::with_capacity(table_schema.fields().len());
514
515 for field in table_schema.fields().iter() {
516 let Some(col_idx) = struct_dtype.find(field.name()) else {
519 column_statistics.push(ColumnStatistics::default());
521 continue;
522 };
523 let (stats_set, stats_dtype) = file_stats.get(col_idx);
524
525 let column_size =
527 stats_set.get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into());
528
529 let target_dtype = DType::from_arrow(field.as_ref());
530 let min = scalar_stat_to_df(
531 Stat::Min,
532 stats_set.get(Stat::Min),
533 stats_dtype,
534 &target_dtype,
535 );
536 let max = scalar_stat_to_df(
537 Stat::Max,
538 stats_set.get(Stat::Max),
539 stats_dtype,
540 &target_dtype,
541 );
542
543 let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
544
545 column_statistics.push(ColumnStatistics {
546 null_count: null_count.to_df(),
547 min_value: min.to_df(),
548 max_value: max.to_df(),
549 sum_value: Precision::Absent,
550 distinct_count: is_constant_to_distinct_count(
551 stats_set.get_as::<bool>(
552 Stat::IsConstant,
553 &DType::Bool(Nullability::NonNullable),
554 ),
555 ),
556 byte_size: column_size.to_df(),
557 })
558 }
559
560 let total_byte_size = column_statistics
561 .iter()
562 .fold(Precision::Exact(0), |acc, cs| acc.add(&cs.byte_size));
563
564 Ok(Statistics {
565 num_rows: Precision::Exact(
566 usize::try_from(row_count)
567 .map_err(|_| vortex_err!("Row count overflow"))
568 .vortex_expect("Row count overflow"),
569 ),
570 total_byte_size,
571 column_statistics,
572 })
573 })
574 .await
575 .vortex_expect("Failed to spawn infer_stats")
576 }
577
578 async fn create_physical_plan(
579 &self,
580 state: &dyn Session,
581 file_scan_config: FileScanConfig,
582 ) -> DFResult<Arc<dyn ExecutionPlan>> {
583 let mut source = file_scan_config
584 .file_source()
585 .as_any()
586 .downcast_ref::<VortexSource>()
587 .cloned()
588 .ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;
589
590 source = source
591 .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());
592
593 let conf = FileScanConfigBuilder::from(file_scan_config)
594 .with_source(Arc::new(source))
595 .build();
596
597 Ok(DataSourceExec::from_data_source(conf))
598 }
599
600 async fn create_writer_physical_plan(
601 &self,
602 input: Arc<dyn ExecutionPlan>,
603 _state: &dyn Session,
604 conf: FileSinkConfig,
605 order_requirements: Option<LexRequirement>,
606 ) -> DFResult<Arc<dyn ExecutionPlan>> {
607 if conf.insert_op != InsertOp::Append {
608 return not_impl_err!("Overwrites are not implemented yet for Vortex");
609 }
610
611 let schema = Arc::clone(conf.output_schema());
612 let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));
613
614 Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
615 }
616
617 fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
618 let mut source = VortexSource::new(table_schema, self.session.clone())
619 .with_projection_pushdown(self.opts.projection_pushdown);
620
621 if let Some(scan_concurrency) = self.opts.scan_concurrency {
622 source = source.with_scan_concurrency(scan_concurrency);
623 }
624
625 Arc::new(source) as _
626 }
627}
628
629fn scalar_stat_to_df(
630 stat: Stat,
631 value: Option<stats::Precision<VortexScalarValue>>,
632 stats_dtype: &DType,
633 target_dtype: &DType,
634) -> Option<stats::Precision<DFScalarValue>> {
635 let stat_dtype = stat.dtype(stats_dtype)?;
636
637 value?
638 .try_map(|stat_value| {
639 Scalar::try_new(stat_dtype, Some(stat_value))?
640 .cast(target_dtype)?
641 .try_to_df()
642 })
643 .ok()
644}
645
#[cfg(test)]
mod tests {

    use super::*;
    use crate::common_tests::TestSessionContext;

    /// `CREATE EXTERNAL TABLE ... STORED AS vortex` registers the table with the session.
    #[tokio::test]
    async fn create_table() -> anyhow::Result<()> {
        let test_ctx = TestSessionContext::default();
        let ddl = "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/'";

        test_ctx.session.sql(ddl).await?;

        let registered = test_ctx.session.table_exist("my_tbl")?;
        assert!(registered);

        Ok(())
    }

    /// Table options given in the `OPTIONS(...)` clause are accepted by the format factory.
    #[tokio::test]
    async fn configure_format_source() -> anyhow::Result<()> {
        let test_ctx = TestSessionContext::default();
        let ddl = "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/' \
                OPTIONS( footer_initial_read_size_bytes '12345', scan_concurrency '3' );";

        let frame = test_ctx.session.sql(ddl).await?;
        frame.collect().await?;

        Ok(())
    }

    /// An option set on `VortexTableOptions` is visible through `VortexFormat::options`.
    #[test]
    fn format_plumbs_footer_initial_read_size() {
        let mut table_options = VortexTableOptions::default();
        table_options
            .set("footer_initial_read_size_bytes", "12345")
            .unwrap();

        let vortex_format =
            VortexFormat::new_with_options(VortexSession::default(), table_options);
        let configured = vortex_format.options().footer_initial_read_size_bytes;
        assert_eq!(configured, 12345);
    }
}