// vortex_datafusion/persistent/format.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::sync::Arc;
8
9use arrow_schema::Schema;
10use arrow_schema::SchemaRef;
11use async_trait::async_trait;
12use datafusion_catalog::Session;
13use datafusion_common::ColumnStatistics;
14use datafusion_common::DataFusionError;
15use datafusion_common::GetExt;
16use datafusion_common::Result as DFResult;
17use datafusion_common::Statistics;
18use datafusion_common::config::ConfigField;
19use datafusion_common::config_namespace;
20use datafusion_common::internal_datafusion_err;
21use datafusion_common::not_impl_err;
22use datafusion_common::parsers::CompressionTypeVariant;
23use datafusion_common::stats::Precision;
24use datafusion_common_runtime::SpawnedTask;
25use datafusion_datasource::TableSchema;
26use datafusion_datasource::file::FileSource;
27use datafusion_datasource::file_compression_type::FileCompressionType;
28use datafusion_datasource::file_format::FileFormat;
29use datafusion_datasource::file_format::FileFormatFactory;
30use datafusion_datasource::file_scan_config::FileScanConfig;
31use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
32use datafusion_datasource::file_sink_config::FileSinkConfig;
33use datafusion_datasource::sink::DataSinkExec;
34use datafusion_datasource::source::DataSourceExec;
35use datafusion_expr::dml::InsertOp;
36use datafusion_physical_expr::LexRequirement;
37use datafusion_physical_plan::ExecutionPlan;
38use futures::FutureExt;
39use futures::StreamExt as _;
40use futures::TryStreamExt as _;
41use futures::stream;
42use object_store::ObjectMeta;
43use object_store::ObjectStore;
44use vortex::VortexSessionDefault;
45use vortex::dtype::DType;
46use vortex::dtype::Nullability;
47use vortex::dtype::PType;
48use vortex::dtype::arrow::FromArrowType;
49use vortex::error::VortexExpect;
50use vortex::error::VortexResult;
51use vortex::error::vortex_err;
52use vortex::expr::stats;
53use vortex::expr::stats::Stat;
54use vortex::file::EOF_SIZE;
55use vortex::file::MAX_POSTSCRIPT_SIZE;
56use vortex::file::OpenOptionsSessionExt;
57use vortex::file::VORTEX_FILE_EXTENSION;
58use vortex::io::object_store::ObjectStoreReadAt;
59use vortex::io::session::RuntimeSessionExt;
60use vortex::scalar::Scalar;
61use vortex::session::VortexSession;
62
63use super::cache::CachedVortexMetadata;
64use super::sink::VortexSink;
65use super::source::VortexSource;
66use crate::PrecisionExt as _;
67use crate::convert::TryToDataFusion;
68
/// Default initial footer read size: exactly enough bytes to cover the largest
/// possible postscript plus the fixed-size EOF trailer in one read.
const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
70
/// Vortex implementation of a DataFusion [`FileFormat`].
pub struct VortexFormat {
    // Shared Vortex session used to open, read, and write files.
    session: VortexSession,
    // Table-level configuration (footer read size, projection pushdown, scan concurrency).
    opts: VortexTableOptions,
}
76
77impl Debug for VortexFormat {
78    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
79        f.debug_struct("VortexFormat")
80            .field("opts", &self.opts)
81            .finish()
82    }
83}
84
// `config_namespace!` is a DataFusion macro that generates the struct plus the
// `ConfigField` plumbing, so these options can be set from string key/value pairs
// (see `VortexFormatFactory::create`). The doc comments on each entry double as
// the user-visible option descriptions, so they are left untouched here.
config_namespace! {
    /// Options to configure the [`VortexFormat`].
    ///
    /// Can be set through a DataFusion [`SessionConfig`].
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct VortexTableOptions {
        /// The number of bytes to read when parsing a file footer.
        ///
        /// Values smaller than `MAX_POSTSCRIPT_SIZE + EOF_SIZE` will be clamped to that minimum
        /// during footer parsing.
        pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
        /// Whether to enable projection pushdown into the underlying Vortex scan.
        ///
        /// When enabled, projection expressions may be partially evaluated during
        /// the scan. When disabled, Vortex reads only the referenced columns and
        /// all expressions are evaluated after the scan.
        pub projection_pushdown: bool, default = false
        /// The intra-partition scan concurrency, controlling the number of row splits to process
        /// concurrently per-thread within each file.
        ///
        /// This does not affect the overall parallelism
        /// across partitions, which is controlled by DataFusion's execution configuration.
        pub scan_concurrency: Option<usize>, default = None
    }
}
111
// All fields (`usize`, `bool`, `Option<usize>`) have total equality, so `Eq` is
// sound to assert on top of the macro-provided `PartialEq`.
// NOTE(review): assumes `config_namespace!` derives `PartialEq` but not `Eq` —
// confirm against the macro expansion if that macro changes.
impl Eq for VortexTableOptions {}
113
/// Minimal factory to create [`VortexFormat`] instances.
#[derive(Debug)]
pub struct VortexFormatFactory {
    // Session cloned into every format instance this factory produces.
    session: VortexSession,
    // Factory-level default options; `None` means "use `VortexTableOptions::default()`"
    // at creation time (see `FileFormatFactory::create`).
    options: Option<VortexTableOptions>,
}
120
121impl GetExt for VortexFormatFactory {
122    fn get_ext(&self) -> String {
123        VORTEX_FILE_EXTENSION.to_string()
124    }
125}
126
127impl VortexFormatFactory {
128    /// Creates a new instance with a default [`VortexSession`] and default options.
129    #[expect(
130        clippy::new_without_default,
131        reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
132    )]
133    pub fn new() -> Self {
134        Self {
135            session: VortexSession::default(),
136            options: None,
137        }
138    }
139
140    /// Creates a new instance with customized session and default options for all [`VortexFormat`] instances created from this factory.
141    ///
142    /// The options can be overridden by table-level configuration pass in [`FileFormatFactory::create`].
143    pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
144        Self {
145            session,
146            options: Some(options),
147        }
148    }
149
150    /// Override the default options for this factory.
151    ///
152    /// For example:
153    /// ```rust
154    /// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
155    ///
156    /// let factory = VortexFormatFactory::new().with_options(VortexTableOptions::default());
157    /// ```
158    pub fn with_options(mut self, options: VortexTableOptions) -> Self {
159        self.options = Some(options);
160        self
161    }
162}
163
164impl FileFormatFactory for VortexFormatFactory {
165    #[expect(clippy::disallowed_types, reason = "required by trait signature")]
166    fn create(
167        &self,
168        _state: &dyn Session,
169        format_options: &std::collections::HashMap<String, String>,
170    ) -> DFResult<Arc<dyn FileFormat>> {
171        let mut opts = self.options.clone().unwrap_or_default();
172        for (key, value) in format_options {
173            if let Some(key) = key.strip_prefix("format.") {
174                opts.set(key, value)?;
175            } else {
176                tracing::trace!("Ignoring options '{key}'");
177            }
178        }
179
180        Ok(Arc::new(VortexFormat::new_with_options(
181            self.session.clone(),
182            opts,
183        )))
184    }
185
186    fn default(&self) -> Arc<dyn FileFormat> {
187        Arc::new(VortexFormat::new(self.session.clone()))
188    }
189
190    fn as_any(&self) -> &dyn Any {
191        self
192    }
193}
194
195impl VortexFormat {
196    /// Create a new instance with default options.
197    pub fn new(session: VortexSession) -> Self {
198        Self::new_with_options(session, VortexTableOptions::default())
199    }
200
201    /// Creates a new instance with configured by a [`VortexTableOptions`].
202    pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self {
203        Self { session, opts }
204    }
205
206    /// Return the format specific configuration
207    pub fn options(&self) -> &VortexTableOptions {
208        &self.opts
209    }
210}
211
#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    // Vortex handles compression internally, so no file-level compression type
    // is reported to DataFusion.
    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    // Only the UNCOMPRESSED variant is valid; any compression wrapper around a
    // Vortex file is rejected as an internal error.
    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file level compression.".into(),
            )),
        }
    }

    /// Infers a single merged Arrow schema across all `objects`.
    ///
    /// Each file's footer is read concurrently (bounded by the session's
    /// `meta_fetch_concurrency`), served from the runtime's file-metadata cache
    /// when available, and written back to that cache after a cold read. The
    /// per-file schemas are then merged with [`Schema::try_merge`].
    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|object| {
                // Clone everything the spawned task needs to own.
                let store = store.clone();
                let session = self.session.clone();
                let opts = self.opts.clone();
                let cache = file_metadata_cache.clone();

                SpawnedTask::spawn(async move {
                    // Check if we have cached metadata for this file
                    if let Some(cached) = cache.get(&object)
                        && let Some(cached_vortex) =
                            cached.as_any().downcast_ref::<CachedVortexMetadata>()
                    {
                        let inferred_schema = cached_vortex.footer().dtype().to_arrow_schema()?;
                        return VortexResult::Ok((object.location, inferred_schema));
                    }

                    // Not cached or invalid - open the file
                    let reader = Arc::new(ObjectStoreReadAt::new(
                        store,
                        object.location.clone(),
                        session.handle(),
                    ));

                    let vxf = session
                        .open_options()
                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
                        .with_file_size(object.size)
                        .open_read(reader)
                        .await?;

                    // Cache the metadata
                    let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
                    cache.put(&object, cached_metadata);

                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((object.location, inferred_schema))
                })
                // A join failure here means the spawned task panicked or was
                // cancelled; treat that as fatal rather than a recoverable error.
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // Get consistent order of schemas for `Schema::try_merge`, as some filesystems don't have deterministic listing orders
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    /// Derives DataFusion [`Statistics`] for a single file from the footer's
    /// statistics: row count, per-column min/max/null-count/byte-size, and a
    /// distinct count of 1 for columns known to be constant.
    ///
    /// Footer metadata is served from (and written back to) the runtime's
    /// file-metadata cache, mirroring `infer_schema`.
    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        // Clone everything the spawned task needs to own.
        let object = object.clone();
        let store = store.clone();
        let session = self.session.clone();
        let opts = self.opts.clone();
        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

        SpawnedTask::spawn(async move {
            // Try to get cached metadata first
            let cached_metadata = file_metadata_cache.get(&object).and_then(|cached| {
                cached
                    .as_any()
                    .downcast_ref::<CachedVortexMetadata>()
                    .map(|m| {
                        (
                            m.footer().dtype().clone(),
                            m.footer().statistics().cloned(),
                            m.footer().row_count(),
                        )
                    })
            });

            let (dtype, file_stats, row_count) = match cached_metadata {
                Some(metadata) => metadata,
                None => {
                    // Not cached - open the file
                    let reader = Arc::new(ObjectStoreReadAt::new(
                        store,
                        object.location.clone(),
                        session.handle(),
                    ));

                    let vxf = session
                        .open_options()
                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
                        .with_file_size(object.size)
                        .open_read(reader)
                        .await
                        .map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to open Vortex file {}: {e}",
                                object.location
                            ))
                        })?;

                    // Cache the metadata
                    let cached = Arc::new(CachedVortexMetadata::new(&vxf));
                    file_metadata_cache.put(&object, cached);

                    (
                        vxf.dtype().clone(),
                        vxf.file_stats().cloned(),
                        vxf.row_count(),
                    )
                }
            };

            // A Vortex table file must be struct-typed at the top level; anything
            // else is an invariant violation.
            let struct_dtype = dtype
                .as_struct_fields_opt()
                .vortex_expect("dtype is not a struct");

            // Evaluate the statistics for each column that we are able to return to DataFusion.
            let Some(file_stats) = file_stats else {
                // If the file has no column stats, the best we can do is return a row count.
                // NOTE(review): a u64 -> usize overflow panics via `vortex_expect`
                // rather than surfacing as a DataFusionError — confirm intended.
                return Ok(Statistics {
                    num_rows: Precision::Exact(
                        usize::try_from(row_count)
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

            // Running (possibly inexact) total of uncompressed column byte sizes.
            let mut sum_of_column_byte_sizes = stats::Precision::exact(0_usize);
            let mut column_statistics = Vec::with_capacity(table_schema.fields().len());

            for field in table_schema.fields().iter() {
                // If the column does not exist, continue. This can happen if the schema has evolved
                // but we have not yet updated the Vortex file.
                let Some(col_idx) = struct_dtype.find(field.name()) else {
                    // The default sets all statistics to `Precision<Absent>`.
                    column_statistics.push(ColumnStatistics::default());
                    continue;
                };
                let (stats_set, stats_dtype) = file_stats.get(col_idx);

                // Update the total size in bytes.
                // A missing size stat contributes 0 but downgrades the sum to inexact.
                let column_size = stats_set
                    .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
                    .unwrap_or_else(|| stats::Precision::inexact(0_usize));
                sum_of_column_byte_sizes = sum_of_column_byte_sizes
                    .zip(column_size)
                    .map(|(acc, size)| acc + size);

                // TODO(connor): There's a lot that can go wrong here, should probably handle this
                // more gracefully...
                // Find the min statistic.
                let min = stats_set.get(Stat::Min).and_then(|pstat_val| {
                    pstat_val
                        .map(|stat_val| {
                            // Because of DataFusion's Schema evolution, it is possible that the
                            // type of the min/max stat has changed. Thus we construct the stat as
                            // the file datatype first and only then do we cast accordingly.
                            Scalar::try_new(
                                Stat::Min
                                    .dtype(stats_dtype)
                                    .vortex_expect("must have a valid dtype"),
                                Some(stat_val),
                            )
                            .vortex_expect("`Stat::Min` somehow had an incompatible `DType`")
                            .cast(&DType::from_arrow(field.as_ref()))
                            .vortex_expect("Unable to cast to target type that DataFusion wants")
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                });

                // Find the max statistic (same construct-then-cast dance as min).
                let max = stats_set.get(Stat::Max).and_then(|pstat_val| {
                    pstat_val
                        .map(|stat_val| {
                            Scalar::try_new(
                                Stat::Max
                                    .dtype(stats_dtype)
                                    .vortex_expect("must have a valid dtype"),
                                Some(stat_val),
                            )
                            .vortex_expect("`Stat::Max` somehow had an incompatible `DType`")
                            .cast(&DType::from_arrow(field.as_ref()))
                            .vortex_expect("Unable to cast to target type that DataFusion wants")
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                });

                let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());

                column_statistics.push(ColumnStatistics {
                    null_count: null_count.to_df(),
                    min_value: min.to_df(),
                    max_value: max.to_df(),
                    sum_value: Precision::Absent,
                    // An exactly-known constant column has exactly one distinct value.
                    distinct_count: stats_set
                        .get_as::<bool>(Stat::IsConstant, &DType::Bool(Nullability::NonNullable))
                        .and_then(|is_constant| is_constant.as_exact().map(|_| Precision::Exact(1)))
                        .unwrap_or(Precision::Absent),
                    // TODO(connor): Is this correct?
                    byte_size: column_size.to_df(),
                })
            }

            let total_byte_size = sum_of_column_byte_sizes.to_df();

            Ok(Statistics {
                num_rows: Precision::Exact(
                    usize::try_from(row_count)
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

    /// Wraps the scan config's [`VortexSource`] (attaching the runtime's
    /// file-metadata cache) in a [`DataSourceExec`].
    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // The source must have been produced by `Self::file_source`; anything
        // else is an internal error.
        let mut source = file_scan_config
            .file_source()
            .as_any()
            .downcast_ref::<VortexSource>()
            .cloned()
            .ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;

        source = source
            .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());

        let conf = FileScanConfigBuilder::from(file_scan_config)
            .with_source(Arc::new(source))
            .build();

        Ok(DataSourceExec::from_data_source(conf))
    }

    /// Builds a write plan ([`DataSinkExec`] over a [`VortexSink`]).
    ///
    /// Only `InsertOp::Append` is supported; overwrites return a
    /// not-implemented error.
    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

        let schema = conf.output_schema().clone();
        let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    /// Creates the [`VortexSource`] for a scan, applying the configured
    /// projection-pushdown flag and optional scan concurrency.
    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
        let mut source = VortexSource::new(table_schema, self.session.clone())
            .with_projection_pushdown(self.opts.projection_pushdown);

        if let Some(scan_concurrency) = self.opts.scan_concurrency {
            source = source.with_scan_concurrency(scan_concurrency);
        }

        Arc::new(source) as _
    }
}
526
#[cfg(test)]
mod tests {

    use super::*;
    use crate::common_tests::TestSessionContext;

    // Registering an external table with `STORED AS vortex` should succeed and
    // make the table visible in the session catalog.
    #[tokio::test]
    async fn create_table() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        ctx.session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex  \
                LOCATION 'table/'",
            )
            .await?;

        assert!(ctx.session.table_exist("my_tbl")?);

        Ok(())
    }

    // Table-level OPTIONS(...) keys must round-trip through
    // `VortexFormatFactory::create` without error.
    #[tokio::test]
    async fn configure_format_source() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        ctx.session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/' \
                OPTIONS( footer_initial_read_size_bytes '12345', scan_concurrency '3' );",
            )
            .await?
            .collect()
            .await?;

        Ok(())
    }

    // A value set via `ConfigField::set` must be observable through
    // `VortexFormat::options`.
    #[test]
    fn format_plumbs_footer_initial_read_size() {
        let mut opts = VortexTableOptions::default();
        opts.set("footer_initial_read_size_bytes", "12345").unwrap();

        let format = VortexFormat::new_with_options(VortexSession::default(), opts);
        assert_eq!(format.options().footer_initial_read_size_bytes, 12345);
    }
}