// vortex_datafusion/persistent/format.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::sync::Arc;
8
9use arrow_schema::Schema;
10use arrow_schema::SchemaRef;
11use async_trait::async_trait;
12use datafusion_catalog::Session;
13use datafusion_common::ColumnStatistics;
14use datafusion_common::DataFusionError;
15use datafusion_common::GetExt;
16use datafusion_common::Result as DFResult;
17use datafusion_common::Statistics;
18use datafusion_common::config::ConfigField;
19use datafusion_common::config_namespace;
20use datafusion_common::internal_datafusion_err;
21use datafusion_common::not_impl_err;
22use datafusion_common::parsers::CompressionTypeVariant;
23use datafusion_common::stats::Precision;
24use datafusion_common_runtime::SpawnedTask;
25use datafusion_datasource::TableSchema;
26use datafusion_datasource::file::FileSource;
27use datafusion_datasource::file_compression_type::FileCompressionType;
28use datafusion_datasource::file_format::FileFormat;
29use datafusion_datasource::file_format::FileFormatFactory;
30use datafusion_datasource::file_scan_config::FileScanConfig;
31use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
32use datafusion_datasource::file_sink_config::FileSinkConfig;
33use datafusion_datasource::sink::DataSinkExec;
34use datafusion_datasource::source::DataSourceExec;
35use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
36use datafusion_expr::dml::InsertOp;
37use datafusion_physical_expr::LexRequirement;
38use datafusion_physical_plan::ExecutionPlan;
39use futures::FutureExt;
40use futures::StreamExt as _;
41use futures::TryStreamExt as _;
42use futures::stream;
43use object_store::ObjectMeta;
44use object_store::ObjectStore;
45use vortex::VortexSessionDefault;
46use vortex::array::memory::MemorySessionExt;
47use vortex::dtype::DType;
48use vortex::dtype::Nullability;
49use vortex::dtype::PType;
50use vortex::dtype::arrow::FromArrowType;
51use vortex::error::VortexExpect;
52use vortex::error::VortexResult;
53use vortex::error::vortex_err;
54use vortex::expr::stats;
55use vortex::expr::stats::Stat;
56use vortex::file::EOF_SIZE;
57use vortex::file::MAX_POSTSCRIPT_SIZE;
58use vortex::file::OpenOptionsSessionExt;
59use vortex::file::VORTEX_FILE_EXTENSION;
60use vortex::io::object_store::ObjectStoreReadAt;
61use vortex::io::session::RuntimeSessionExt;
62use vortex::scalar::Scalar;
63use vortex::session::VortexSession;
64
65use super::cache::CachedVortexMetadata;
66use super::sink::VortexSink;
67use super::source::VortexSource;
68use crate::PrecisionExt as _;
69use crate::convert::TryToDataFusion;
70
/// Default number of bytes to read when first parsing a file footer: enough to cover the
/// largest possible postscript plus the fixed-size end-of-file marker in a single read.
const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
72
/// Vortex implementation of a DataFusion [`FileFormat`].
pub struct VortexFormat {
    // Vortex session used to open, read, and write files.
    session: VortexSession,
    // Table-level configuration (footer read size, projection pushdown, scan concurrency).
    opts: VortexTableOptions,
}
78
79impl Debug for VortexFormat {
80    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
81        f.debug_struct("VortexFormat")
82            .field("opts", &self.opts)
83            .finish()
84    }
85}
86
config_namespace! {
    /// Options to configure the [`VortexFormat`].
    ///
    /// Can be set through a DataFusion [`SessionConfig`].
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    // NOTE(review): the `///` docs in this macro body are consumed by `config_namespace!`
    // and presumably surfaced as the user-visible description of each option — keep them
    // user-facing. Plain `//` comments like this one are stripped by the lexer and are
    // invisible to the macro.
    pub struct VortexTableOptions {
        /// The number of bytes to read when parsing a file footer.
        ///
        /// Values smaller than `MAX_POSTSCRIPT_SIZE + EOF_SIZE` will be clamped to that minimum
        /// during footer parsing.
        pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
        /// Whether to enable projection pushdown into the underlying Vortex scan.
        ///
        /// When enabled, projection expressions may be partially evaluated during
        /// the scan. When disabled, Vortex reads only the referenced columns and
        /// all expressions are evaluated after the scan.
        pub projection_pushdown: bool, default = false
        /// The intra-partition scan concurrency, controlling the number of row splits to process
        /// concurrently per-thread within each file.
        ///
        /// This does not affect the overall parallelism
        /// across partitions, which is controlled by DataFusion's execution configuration.
        pub scan_concurrency: Option<usize>, default = None
    }
}
113
// `Eq` is sound: every field (`usize`, `bool`, `Option<usize>`) has total equality.
// NOTE(review): `PartialEq` is presumably derived inside `config_namespace!` — confirm
// this marker stays valid if the macro's derives change.
impl Eq for VortexTableOptions {}
115
/// Minimal factory to create [`VortexFormat`] instances.
#[derive(Debug)]
pub struct VortexFormatFactory {
    // Session cloned into every `VortexFormat` produced by this factory.
    session: VortexSession,
    // Factory-level default options; `None` falls back to `VortexTableOptions::default()`.
    options: Option<VortexTableOptions>,
}
122
123impl GetExt for VortexFormatFactory {
124    fn get_ext(&self) -> String {
125        VORTEX_FILE_EXTENSION.to_string()
126    }
127}
128
129impl VortexFormatFactory {
130    /// Creates a new instance with a default [`VortexSession`] and default options.
131    #[expect(
132        clippy::new_without_default,
133        reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
134    )]
135    pub fn new() -> Self {
136        Self {
137            session: VortexSession::default(),
138            options: None,
139        }
140    }
141
142    /// Creates a new instance with customized session and default options for all [`VortexFormat`] instances created from this factory.
143    ///
144    /// The options can be overridden by table-level configuration pass in [`FileFormatFactory::create`].
145    pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
146        Self {
147            session,
148            options: Some(options),
149        }
150    }
151
152    /// Override the default options for this factory.
153    ///
154    /// For example:
155    /// ```rust
156    /// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
157    ///
158    /// let factory = VortexFormatFactory::new().with_options(VortexTableOptions::default());
159    /// ```
160    pub fn with_options(mut self, options: VortexTableOptions) -> Self {
161        self.options = Some(options);
162        self
163    }
164}
165
166impl FileFormatFactory for VortexFormatFactory {
167    #[expect(clippy::disallowed_types, reason = "required by trait signature")]
168    fn create(
169        &self,
170        _state: &dyn Session,
171        format_options: &std::collections::HashMap<String, String>,
172    ) -> DFResult<Arc<dyn FileFormat>> {
173        let mut opts = self.options.clone().unwrap_or_default();
174        for (key, value) in format_options {
175            if let Some(key) = key.strip_prefix("format.") {
176                opts.set(key, value)?;
177            } else {
178                tracing::trace!("Ignoring options '{key}'");
179            }
180        }
181
182        Ok(Arc::new(VortexFormat::new_with_options(
183            self.session.clone(),
184            opts,
185        )))
186    }
187
188    fn default(&self) -> Arc<dyn FileFormat> {
189        Arc::new(VortexFormat::new(self.session.clone()))
190    }
191
192    fn as_any(&self) -> &dyn Any {
193        self
194    }
195}
196
197impl VortexFormat {
198    /// Create a new instance with default options.
199    pub fn new(session: VortexSession) -> Self {
200        Self::new_with_options(session, VortexTableOptions::default())
201    }
202
203    /// Creates a new instance with configured by a [`VortexTableOptions`].
204    pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self {
205        Self { session, opts }
206    }
207
208    /// Return the format specific configuration
209    pub fn options(&self) -> &VortexTableOptions {
210        &self.opts
211    }
212}
213
#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Vortex files report no file-level compression type.
    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    /// Only `UNCOMPRESSED` is accepted: Vortex does not support file-level compression.
    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file level compression.".into(),
            )),
        }
    }

    /// Infers a merged Arrow schema across all `objects`.
    ///
    /// Each file's schema comes from its footer dtype, served from the session's file
    /// metadata cache when a valid entry exists, otherwise by opening the file (which
    /// also populates the cache). Footer reads are spawned as tasks and bounded by
    /// DataFusion's `meta_fetch_concurrency`.
    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|object| {
                let store = Arc::clone(store);
                let session = self.session.clone();
                let opts = self.opts.clone();
                let cache = Arc::clone(&file_metadata_cache);

                SpawnedTask::spawn(async move {
                    // Check if we have cached metadata for this file that is still valid
                    // (`is_valid_for` presumably compares size/etag — confirm in cache impl).
                    if let Some(entry) = cache.get(&object.location)
                        && entry.is_valid_for(&object)
                        && let Some(cached_vortex) = entry
                            .file_metadata
                            .as_any()
                            .downcast_ref::<CachedVortexMetadata>()
                    {
                        let inferred_schema = cached_vortex.footer().dtype().to_arrow_schema()?;
                        return VortexResult::Ok((object.location, inferred_schema));
                    }

                    // No cache entry, or it is invalid - open the file
                    let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
                        store,
                        object.location.clone(),
                        session.handle(),
                        session.allocator(),
                    ));

                    let vxf = session
                        .open_options()
                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
                        .with_file_size(object.size)
                        .open_read(reader)
                        .await?;

                    // Cache the metadata so subsequent infer_schema/infer_stats calls skip I/O
                    let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
                    let entry = CachedFileMetadataEntry::new(object.clone(), cached_metadata);
                    cache.put(&object.location, entry);

                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((object.location, inferred_schema))
                })
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // Get consistent order of schemas for `Schema::try_merge`, as some filesystems don't have deterministic listing orders
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    /// Derives DataFusion [`Statistics`] for one file from Vortex footer statistics.
    ///
    /// Uses cached footer metadata when valid, otherwise opens the file (populating the
    /// cache). Columns present in `table_schema` but missing from the file (schema
    /// evolution) get default/absent statistics.
    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        let object = object.clone();
        let store = Arc::clone(store);
        let session = self.session.clone();
        let opts = self.opts.clone();
        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

        SpawnedTask::spawn(async move {
            // Try to get cached metadata first: (dtype, per-column stats, row count)
            let cached_metadata = file_metadata_cache
                .get(&object.location)
                .filter(|entry| entry.is_valid_for(&object))
                .and_then(|entry| {
                    entry
                        .file_metadata
                        .as_any()
                        .downcast_ref::<CachedVortexMetadata>()
                        .map(|m| {
                            (
                                m.footer().dtype().clone(),
                                m.footer().statistics().cloned(),
                                m.footer().row_count(),
                            )
                        })
                });

            let (dtype, file_stats, row_count) = match cached_metadata {
                Some(metadata) => metadata,
                None => {
                    // No cache entry - open the file
                    let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
                        store,
                        object.location.clone(),
                        session.handle(),
                        session.allocator(),
                    ));

                    let vxf = session
                        .open_options()
                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
                        .with_file_size(object.size)
                        .open_read(reader)
                        .await
                        .map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to open Vortex file {}: {e}",
                                object.location
                            ))
                        })?;

                    // Cache the metadata for subsequent calls
                    let file_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
                    let entry = CachedFileMetadataEntry::new(object.clone(), file_metadata);
                    file_metadata_cache.put(&object.location, entry);

                    (
                        vxf.dtype().clone(),
                        vxf.file_stats().cloned(),
                        vxf.row_count(),
                    )
                }
            };

            // A Vortex table file is expected to have a struct top-level dtype; anything
            // else is an invariant violation and panics via `vortex_expect`.
            let struct_dtype = dtype
                .as_struct_fields_opt()
                .vortex_expect("dtype is not a struct");

            // Evaluate the statistics for each column that we are able to return to DataFusion.
            let Some(file_stats) = file_stats else {
                // If the file has no column stats, the best we can do is return a row count.
                return Ok(Statistics {
                    num_rows: Precision::Exact(
                        usize::try_from(row_count)
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

            // Running total of per-column uncompressed sizes; exactness degrades to
            // inexact as soon as any column's size stat is missing/inexact.
            let mut sum_of_column_byte_sizes = stats::Precision::exact(0_usize);
            let mut column_statistics = Vec::with_capacity(table_schema.fields().len());

            for field in table_schema.fields().iter() {
                // If the column does not exist, continue. This can happen if the schema has evolved
                // but we have not yet updated the Vortex file.
                let Some(col_idx) = struct_dtype.find(field.name()) else {
                    // The default sets all statistics to `Precision<Absent>`.
                    column_statistics.push(ColumnStatistics::default());
                    continue;
                };
                let (stats_set, stats_dtype) = file_stats.get(col_idx);

                // Update the total size in bytes.
                let column_size = stats_set
                    .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
                    .unwrap_or_else(|| stats::Precision::inexact(0_usize));
                sum_of_column_byte_sizes = sum_of_column_byte_sizes
                    .zip(column_size)
                    .map(|(acc, size)| acc + size);

                // TODO(connor): There's a lot that can go wrong here, should probably handle this
                // more gracefully...
                // Find the min statistic.
                let min = stats_set.get(Stat::Min).and_then(|pstat_val| {
                    pstat_val
                        .map(|stat_val| {
                            // Because of DataFusion's Schema evolution, it is possible that the
                            // type of the min/max stat has changed. Thus we construct the stat as
                            // the file datatype first and only then do we cast accordingly.
                            Scalar::try_new(
                                Stat::Min
                                    .dtype(stats_dtype)
                                    .vortex_expect("must have a valid dtype"),
                                Some(stat_val),
                            )
                            .vortex_expect("`Stat::Min` somehow had an incompatible `DType`")
                            .cast(&DType::from_arrow(field.as_ref()))
                            .vortex_expect("Unable to cast to target type that DataFusion wants")
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                });

                // Find the max statistic (same construct-then-cast dance as for min).
                let max = stats_set.get(Stat::Max).and_then(|pstat_val| {
                    pstat_val
                        .map(|stat_val| {
                            Scalar::try_new(
                                Stat::Max
                                    .dtype(stats_dtype)
                                    .vortex_expect("must have a valid dtype"),
                                Some(stat_val),
                            )
                            .vortex_expect("`Stat::Max` somehow had an incompatible `DType`")
                            .cast(&DType::from_arrow(field.as_ref()))
                            .vortex_expect("Unable to cast to target type that DataFusion wants")
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                });

                let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());

                column_statistics.push(ColumnStatistics {
                    null_count: null_count.to_df(),
                    min_value: min.to_df(),
                    max_value: max.to_df(),
                    sum_value: Precision::Absent,
                    // A column known to be constant has exactly one distinct value.
                    distinct_count: stats_set
                        .get_as::<bool>(Stat::IsConstant, &DType::Bool(Nullability::NonNullable))
                        .and_then(|is_constant| is_constant.as_exact().map(|_| Precision::Exact(1)))
                        .unwrap_or(Precision::Absent),
                    // TODO(connor): Is this correct?
                    byte_size: column_size.to_df(),
                })
            }

            let total_byte_size = sum_of_column_byte_sizes.to_df();

            Ok(Statistics {
                num_rows: Precision::Exact(
                    usize::try_from(row_count)
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

    /// Builds the scan execution plan, wiring the session's file metadata cache into the
    /// [`VortexSource`] carried by `file_scan_config`.
    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // The config's source must be a VortexSource; anything else is an internal error.
        let mut source = file_scan_config
            .file_source()
            .as_any()
            .downcast_ref::<VortexSource>()
            .cloned()
            .ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;

        source = source
            .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());

        let conf = FileScanConfigBuilder::from(file_scan_config)
            .with_source(Arc::new(source))
            .build();

        Ok(DataSourceExec::from_data_source(conf))
    }

    /// Builds the write execution plan (a [`DataSinkExec`] over a [`VortexSink`]).
    ///
    /// Only `InsertOp::Append` is supported; overwrites return a not-implemented error.
    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

        let schema = Arc::clone(conf.output_schema());
        let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    /// Creates the [`VortexSource`] for a scan, applying this format's configured
    /// projection-pushdown and scan-concurrency options.
    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
        let mut source = VortexSource::new(table_schema, self.session.clone())
            .with_projection_pushdown(self.opts.projection_pushdown);

        if let Some(scan_concurrency) = self.opts.scan_concurrency {
            source = source.with_scan_concurrency(scan_concurrency);
        }

        Arc::new(source) as _
    }
}
539
#[cfg(test)]
mod tests {

    use super::*;
    use crate::common_tests::TestSessionContext;

    /// `CREATE EXTERNAL TABLE ... STORED AS vortex` registers the table in the catalog.
    #[tokio::test]
    async fn create_table() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        ctx.session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex  \
                LOCATION 'table/'",
            )
            .await?;

        assert!(ctx.session.table_exist("my_tbl")?);

        Ok(())
    }

    /// Table-level `OPTIONS(...)` are parsed by the format factory without error.
    #[tokio::test]
    async fn configure_format_source() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        ctx.session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/' \
                OPTIONS( footer_initial_read_size_bytes '12345', scan_concurrency '3' );",
            )
            .await?
            .collect()
            .await?;

        Ok(())
    }

    /// Options set via `ConfigField::set` are visible through `VortexFormat::options`.
    #[test]
    fn format_plumbs_footer_initial_read_size() {
        let mut opts = VortexTableOptions::default();
        opts.set("footer_initial_read_size_bytes", "12345").unwrap();

        let format = VortexFormat::new_with_options(VortexSession::default(), opts);
        assert_eq!(format.options().footer_initial_read_size_bytes, 12345);
    }
}