vortex_datafusion/persistent/
format.rs1use std::fmt::Debug;
5use std::fmt::Formatter;
6use std::sync::Arc;
7
8use arrow_schema::Schema;
9use arrow_schema::SchemaRef;
10use async_trait::async_trait;
11use datafusion_catalog::Session;
12use datafusion_common::ColumnStatistics;
13use datafusion_common::DataFusionError;
14use datafusion_common::GetExt;
15use datafusion_common::Result as DFResult;
16use datafusion_common::ScalarValue as DFScalarValue;
17use datafusion_common::Statistics;
18use datafusion_common::config::ConfigField;
19use datafusion_common::config_namespace;
20use datafusion_common::internal_datafusion_err;
21use datafusion_common::not_impl_err;
22use datafusion_common::parsers::CompressionTypeVariant;
23use datafusion_common::stats::Precision as DFPrecision;
24use datafusion_common_runtime::SpawnedTask;
25use datafusion_datasource::TableSchema;
26use datafusion_datasource::file::FileSource;
27use datafusion_datasource::file_compression_type::FileCompressionType;
28use datafusion_datasource::file_format::FileFormat;
29use datafusion_datasource::file_format::FileFormatFactory;
30use datafusion_datasource::file_scan_config::FileScanConfig;
31use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
32use datafusion_datasource::file_sink_config::FileSinkConfig;
33use datafusion_datasource::sink::DataSinkExec;
34use datafusion_datasource::source::DataSourceExec;
35use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
36use datafusion_expr::dml::InsertOp;
37use datafusion_physical_expr::LexRequirement;
38use datafusion_physical_plan::ExecutionPlan;
39use futures::FutureExt;
40use futures::StreamExt as _;
41use futures::TryStreamExt as _;
42use futures::stream;
43use object_store::ObjectMeta;
44use object_store::ObjectStore;
45use vortex::VortexSessionDefault;
46use vortex::array::arrow::ArrowSessionExt;
47use vortex::array::memory::MemorySessionExt;
48use vortex::dtype::DType;
49use vortex::dtype::Nullability;
50use vortex::dtype::PType;
51use vortex::error::VortexExpect;
52use vortex::error::VortexResult;
53use vortex::error::vortex_err;
54use vortex::expr::stats::Precision;
55use vortex::expr::stats::Stat;
56use vortex::file::EOF_SIZE;
57use vortex::file::MAX_POSTSCRIPT_SIZE;
58use vortex::file::OpenOptionsSessionExt;
59use vortex::file::VORTEX_FILE_EXTENSION;
60use vortex::io::object_store::ObjectStoreReadAt;
61use vortex::io::session::RuntimeSessionExt;
62use vortex::scalar::Scalar;
63use vortex::scalar::ScalarValue as VortexScalarValue;
64use vortex::session::VortexSession;
65
66use super::cache::CachedVortexMetadata;
67use super::sink::VortexSink;
68use super::source::VortexSource;
69use crate::PrecisionExt as _;
70use crate::convert::TryToDataFusion;
71use crate::convert::stats::is_constant_to_distinct_count;
72
/// Default value of `VortexTableOptions::footer_initial_read_size_bytes`:
/// `MAX_POSTSCRIPT_SIZE` plus `EOF_SIZE`, expressed in bytes.
const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
74
/// DataFusion file format for Vortex files.
///
/// Pairs the [`VortexSession`] used to open, read and write files with the
/// [`VortexTableOptions`] that tune how those files are opened and scanned.
pub struct VortexFormat {
    /// Session cloned into every reader, source and sink created by this format.
    session: VortexSession,
    /// Table-level options applied when opening files and building sources.
    opts: VortexTableOptions,
}
126
127impl Debug for VortexFormat {
128 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
129 f.debug_struct("VortexFormat")
130 .field("opts", &self.opts)
131 .finish()
132 }
133}
134
// Declares `VortexTableOptions` through DataFusion's `config_namespace!` macro; each option
// can be set by name through `set` (see `FileFormatFactory::create` in this file).
// NOTE(review): plain `//` comments are used on the fields on the assumption that the macro
// forwards `///` text into the option descriptions — confirm before converting them.
config_namespace! {
    pub struct VortexTableOptions {
        // Size, in bytes, passed to `with_initial_read_size` when a Vortex file is opened.
        pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
        // Forwarded to `VortexSource::with_projection_pushdown`; disabled by default.
        pub projection_pushdown: bool, default = false
        // When set, forwarded to `VortexSource::with_scan_concurrency`; when `None` the
        // source is left with whatever concurrency it picks on its own.
        pub scan_concurrency: Option<usize>, default = None
    }
}
176
// Marker impl with no methods: declares that equality on `VortexTableOptions` is total.
impl Eq for VortexTableOptions {}
178
/// Factory that produces [`VortexFormat`] instances for DataFusion.
///
/// Every format it creates shares a clone of the factory's [`VortexSession`].
#[derive(Debug)]
pub struct VortexFormatFactory {
    /// Session cloned into each format this factory creates.
    session: VortexSession,
    /// Baseline options used by `create`; `None` means start from
    /// `VortexTableOptions::default()`.
    options: Option<VortexTableOptions>,
}
222
223impl GetExt for VortexFormatFactory {
224 fn get_ext(&self) -> String {
225 VORTEX_FILE_EXTENSION.to_string()
226 }
227}
228
229impl VortexFormatFactory {
230 #[expect(
232 clippy::new_without_default,
233 reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
234 )]
235 pub fn new() -> Self {
236 Self {
237 session: VortexSession::default(),
238 options: None,
239 }
240 }
241
242 pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
248 Self {
249 session,
250 options: Some(options),
251 }
252 }
253
254 pub fn with_options(mut self, options: VortexTableOptions) -> Self {
271 self.options = Some(options);
272 self
273 }
274}
275
276impl FileFormatFactory for VortexFormatFactory {
277 #[expect(clippy::disallowed_types, reason = "required by trait signature")]
278 fn create(
279 &self,
280 _state: &dyn Session,
281 format_options: &std::collections::HashMap<String, String>,
282 ) -> DFResult<Arc<dyn FileFormat>> {
283 let mut opts = self.options.clone().unwrap_or_default();
284 for (key, value) in format_options {
285 if let Some(key) = key.strip_prefix("format.") {
286 opts.set(key, value)?;
287 } else {
288 tracing::trace!("Ignoring option '{key}'");
289 }
290 }
291
292 Ok(Arc::new(VortexFormat::new_with_options(
293 self.session.clone(),
294 opts,
295 )))
296 }
297
298 fn default(&self) -> Arc<dyn FileFormat> {
299 Arc::new(VortexFormat::new(self.session.clone()))
300 }
301}
302
303impl VortexFormat {
304 pub fn new(session: VortexSession) -> Self {
311 Self::new_with_options(session, VortexTableOptions::default())
312 }
313
314 pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self {
316 Self { session, opts }
317 }
318
319 pub fn options(&self) -> &VortexTableOptions {
322 &self.opts
323 }
324}
325
326#[async_trait]
327impl FileFormat for VortexFormat {
328 fn compression_type(&self) -> Option<FileCompressionType> {
329 None
330 }
331
332 fn get_ext(&self) -> String {
333 VORTEX_FILE_EXTENSION.to_string()
334 }
335
336 fn get_ext_with_compression(
337 &self,
338 file_compression_type: &FileCompressionType,
339 ) -> DFResult<String> {
340 match file_compression_type.get_variant() {
341 CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
342 _ => Err(DataFusionError::Internal(
343 "Vortex does not support file level compression.".into(),
344 )),
345 }
346 }
347
348 async fn infer_schema(
349 &self,
350 state: &dyn Session,
351 store: &Arc<dyn ObjectStore>,
352 objects: &[ObjectMeta],
353 ) -> DFResult<SchemaRef> {
354 let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
355
356 let mut file_schemas = stream::iter(objects.iter().cloned())
357 .map(|object| {
358 let store = Arc::clone(store);
359 let session = self.session.clone();
360 let opts = self.opts.clone();
361 let cache = Arc::clone(&file_metadata_cache);
362
363 SpawnedTask::spawn(async move {
364 if let Some(entry) = cache.get(&object.location)
366 && entry.is_valid_for(&object)
367 && let Some(cached_vortex) = entry
368 .file_metadata
369 .as_any()
370 .downcast_ref::<CachedVortexMetadata>()
371 {
372 let inferred_schema = session
373 .arrow()
374 .to_arrow_schema(cached_vortex.footer().dtype())?;
375 return VortexResult::Ok((object.location, inferred_schema));
376 }
377
378 let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
380 store,
381 object.location.clone(),
382 session.handle(),
383 session.allocator(),
384 ));
385
386 let vxf = session
387 .open_options()
388 .with_initial_read_size(opts.footer_initial_read_size_bytes)
389 .with_file_size(object.size)
390 .open_read(reader)
391 .await?;
392
393 let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
395 let entry = CachedFileMetadataEntry::new(object.clone(), cached_metadata);
396 cache.put(&object.location, entry);
397
398 let inferred_schema = session.arrow().to_arrow_schema(vxf.dtype())?;
399 VortexResult::Ok((object.location, inferred_schema))
400 })
401 .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
402 })
403 .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
404 .try_collect::<Vec<_>>()
405 .await
406 .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;
407
408 file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
410 let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);
411
412 Ok(Arc::new(Schema::try_merge(file_schemas)?))
413 }
414
415 #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
416 async fn infer_stats(
417 &self,
418 state: &dyn Session,
419 store: &Arc<dyn ObjectStore>,
420 table_schema: SchemaRef,
421 object: &ObjectMeta,
422 ) -> DFResult<Statistics> {
423 let object = object.clone();
424 let store = Arc::clone(store);
425 let session = self.session.clone();
426 let opts = self.opts.clone();
427 let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
428
429 SpawnedTask::spawn(async move {
430 let cached_metadata = file_metadata_cache
432 .get(&object.location)
433 .filter(|entry| entry.is_valid_for(&object))
434 .and_then(|entry| {
435 entry
436 .file_metadata
437 .as_any()
438 .downcast_ref::<CachedVortexMetadata>()
439 .map(|m| {
440 (
441 m.footer().dtype().clone(),
442 m.footer().statistics().cloned(),
443 m.footer().row_count(),
444 )
445 })
446 });
447
448 let (dtype, file_stats, row_count) = match cached_metadata {
449 Some(metadata) => metadata,
450 None => {
451 let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
453 store,
454 object.location.clone(),
455 session.handle(),
456 session.allocator(),
457 ));
458
459 let vxf = session
460 .open_options()
461 .with_initial_read_size(opts.footer_initial_read_size_bytes)
462 .with_file_size(object.size)
463 .open_read(reader)
464 .await
465 .map_err(|e| {
466 DataFusionError::Execution(format!(
467 "Failed to open Vortex file {}: {e}",
468 object.location
469 ))
470 })?;
471
472 let file_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
474 let entry = CachedFileMetadataEntry::new(object.clone(), file_metadata);
475 file_metadata_cache.put(&object.location, entry);
476
477 (
478 vxf.dtype().clone(),
479 vxf.file_stats().cloned(),
480 vxf.row_count(),
481 )
482 }
483 };
484
485 let struct_dtype = dtype
486 .as_struct_fields_opt()
487 .vortex_expect("dtype is not a struct");
488
489 let Some(file_stats) = file_stats else {
491 return Ok(Statistics {
493 num_rows: DFPrecision::Exact(
494 usize::try_from(row_count)
495 .map_err(|_| vortex_err!("Row count overflow"))
496 .vortex_expect("Row count overflow"),
497 ),
498 total_byte_size: DFPrecision::Absent,
499 column_statistics: vec![
500 ColumnStatistics::default();
501 table_schema.fields().len()
502 ],
503 });
504 };
505
506 let mut column_statistics = Vec::with_capacity(table_schema.fields().len());
507
508 for field in table_schema.fields().iter() {
509 let Some(col_idx) = struct_dtype.find(field.name()) else {
512 column_statistics.push(ColumnStatistics::default());
514 continue;
515 };
516 let (stats_set, stats_dtype) = file_stats.get(col_idx);
517
518 let column_size =
520 stats_set.get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into());
521
522 let target_dtype =
523 session
524 .arrow()
525 .from_arrow_field(field.as_ref())
526 .map_err(|e| {
527 DataFusionError::Execution(format!(
528 "Failed to derive Vortex DType for field {}: {e}",
529 field.name()
530 ))
531 })?;
532 let min = scalar_stat_to_df(
533 Stat::Min,
534 stats_set.get(Stat::Min),
535 stats_dtype,
536 &target_dtype,
537 );
538
539 let max = scalar_stat_to_df(
540 Stat::Max,
541 stats_set.get(Stat::Max),
542 stats_dtype,
543 &target_dtype,
544 );
545
546 let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
547
548 column_statistics.push(ColumnStatistics {
549 null_count: null_count.to_df(),
550 min_value: min.to_df(),
551 max_value: max.to_df(),
552 sum_value: DFPrecision::Absent,
553 distinct_count: is_constant_to_distinct_count(
554 stats_set.get_as::<bool>(
555 Stat::IsConstant,
556 &DType::Bool(Nullability::NonNullable),
557 ),
558 ),
559 byte_size: column_size.to_df(),
560 })
561 }
562
563 let total_byte_size = column_statistics
564 .iter()
565 .fold(DFPrecision::Exact(0), |acc, cs| acc.add(&cs.byte_size));
566
567 Ok(Statistics {
568 num_rows: DFPrecision::Exact(
569 usize::try_from(row_count)
570 .map_err(|_| vortex_err!("Row count overflow"))
571 .vortex_expect("Row count overflow"),
572 ),
573 total_byte_size,
574 column_statistics,
575 })
576 })
577 .await
578 .vortex_expect("Failed to spawn infer_stats")
579 }
580
581 async fn create_physical_plan(
582 &self,
583 state: &dyn Session,
584 file_scan_config: FileScanConfig,
585 ) -> DFResult<Arc<dyn ExecutionPlan>> {
586 let mut source = file_scan_config
587 .file_source()
588 .downcast_ref::<VortexSource>()
589 .cloned()
590 .ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;
591
592 source = source
593 .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());
594
595 let conf = FileScanConfigBuilder::from(file_scan_config)
596 .with_source(Arc::new(source))
597 .build();
598
599 Ok(DataSourceExec::from_data_source(conf))
600 }
601
602 async fn create_writer_physical_plan(
603 &self,
604 input: Arc<dyn ExecutionPlan>,
605 _state: &dyn Session,
606 conf: FileSinkConfig,
607 order_requirements: Option<LexRequirement>,
608 ) -> DFResult<Arc<dyn ExecutionPlan>> {
609 if conf.insert_op != InsertOp::Append {
610 return not_impl_err!("Overwrites are not implemented yet for Vortex");
611 }
612
613 let schema = Arc::clone(conf.output_schema());
614 let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));
615
616 Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
617 }
618
619 fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
620 let mut source = VortexSource::new(table_schema, self.session.clone())
621 .with_projection_pushdown(self.opts.projection_pushdown);
622
623 if let Some(scan_concurrency) = self.opts.scan_concurrency {
624 source = source.with_scan_concurrency(scan_concurrency);
625 }
626
627 Arc::new(source) as _
628 }
629}
630
631fn scalar_stat_to_df(
632 stat: Stat,
633 value: Precision<VortexScalarValue>,
634 stats_dtype: &DType,
635 target_dtype: &DType,
636) -> Precision<DFScalarValue> {
637 let Some(stat_dtype) = stat.dtype(stats_dtype) else {
638 return Precision::Absent;
639 };
640
641 value
642 .map(|stat_value| {
643 Scalar::try_new(stat_dtype, Some(stat_value))?
644 .cast(target_dtype)?
645 .try_to_df()
646 })
647 .transpose()
648 .unwrap_or(Precision::Absent)
649}
650
#[cfg(test)]
mod tests {

    use super::*;
    use crate::common_tests::TestSessionContext;

    /// A `STORED AS vortex` external table can be created and is then registered.
    #[tokio::test]
    async fn create_table() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        let ddl = "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/'";
        ctx.session.sql(ddl).await?;

        let registered = ctx.session.table_exist("my_tbl")?;
        assert!(registered);

        Ok(())
    }

    /// Table creation accepts Vortex-specific entries in the `OPTIONS` clause.
    #[tokio::test]
    async fn configure_format_source() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        let ddl = "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/' \
                OPTIONS( footer_initial_read_size_bytes '12345', scan_concurrency '3' );";
        let frame = ctx.session.sql(ddl).await?;
        frame.collect().await?;

        Ok(())
    }

    /// An option set on `VortexTableOptions` is visible through `VortexFormat::options`.
    #[test]
    fn format_plumbs_footer_initial_read_size() {
        let mut table_opts = VortexTableOptions::default();
        table_opts
            .set("footer_initial_read_size_bytes", "12345")
            .unwrap();

        let format = VortexFormat::new_with_options(VortexSession::default(), table_opts);
        let configured = format.options().footer_initial_read_size_bytes;
        assert_eq!(configured, 12345);
    }
}