Skip to main content

vortex_datafusion/persistent/
format.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::sync::Arc;
8
9use arrow_schema::Schema;
10use arrow_schema::SchemaRef;
11use async_trait::async_trait;
12use datafusion_catalog::Session;
13use datafusion_common::ColumnStatistics;
14use datafusion_common::DataFusionError;
15use datafusion_common::GetExt;
16use datafusion_common::Result as DFResult;
17use datafusion_common::Statistics;
18use datafusion_common::config::ConfigField;
19use datafusion_common::config_namespace;
20use datafusion_common::internal_datafusion_err;
21use datafusion_common::not_impl_err;
22use datafusion_common::parsers::CompressionTypeVariant;
23use datafusion_common::stats::Precision;
24use datafusion_common_runtime::SpawnedTask;
25use datafusion_datasource::TableSchema;
26use datafusion_datasource::file::FileSource;
27use datafusion_datasource::file_compression_type::FileCompressionType;
28use datafusion_datasource::file_format::FileFormat;
29use datafusion_datasource::file_format::FileFormatFactory;
30use datafusion_datasource::file_scan_config::FileScanConfig;
31use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
32use datafusion_datasource::file_sink_config::FileSinkConfig;
33use datafusion_datasource::sink::DataSinkExec;
34use datafusion_datasource::source::DataSourceExec;
35use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
36use datafusion_expr::dml::InsertOp;
37use datafusion_physical_expr::LexRequirement;
38use datafusion_physical_plan::ExecutionPlan;
39use futures::FutureExt;
40use futures::StreamExt as _;
41use futures::TryStreamExt as _;
42use futures::stream;
43use object_store::ObjectMeta;
44use object_store::ObjectStore;
45use vortex::VortexSessionDefault;
46use vortex::dtype::DType;
47use vortex::dtype::Nullability;
48use vortex::dtype::PType;
49use vortex::dtype::arrow::FromArrowType;
50use vortex::error::VortexExpect;
51use vortex::error::VortexResult;
52use vortex::error::vortex_err;
53use vortex::expr::stats;
54use vortex::expr::stats::Stat;
55use vortex::file::EOF_SIZE;
56use vortex::file::MAX_POSTSCRIPT_SIZE;
57use vortex::file::OpenOptionsSessionExt;
58use vortex::file::VORTEX_FILE_EXTENSION;
59use vortex::io::object_store::ObjectStoreReadAt;
60use vortex::io::session::RuntimeSessionExt;
61use vortex::scalar::Scalar;
62use vortex::session::VortexSession;
63
64use super::cache::CachedVortexMetadata;
65use super::sink::VortexSink;
66use super::source::VortexSource;
67use crate::PrecisionExt as _;
68use crate::convert::TryToDataFusion;
69
/// Default number of bytes for the initial footer read: just enough to cover the
/// largest possible postscript plus the end-of-file marker in a single request.
const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
71
/// Vortex implementation of a DataFusion [`FileFormat`].
pub struct VortexFormat {
    /// Vortex session used to open files, drive I/O, and build sources/sinks.
    session: VortexSession,
    /// Table-level configuration (footer read size, projection pushdown,
    /// scan concurrency).
    opts: VortexTableOptions,
}
77
78impl Debug for VortexFormat {
79    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
80        f.debug_struct("VortexFormat")
81            .field("opts", &self.opts)
82            .finish()
83    }
84}
85
// NOTE: the `///` doc comments below double as the user-visible option
// descriptions generated by `config_namespace!`, so they are left verbatim.
// Plain `//` comments are stripped by the lexer and never reach the macro.
config_namespace! {
    /// Options to configure the [`VortexFormat`].
    ///
    /// Can be set through a DataFusion [`SessionConfig`].
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct VortexTableOptions {
        /// The number of bytes to read when parsing a file footer.
        ///
        /// Values smaller than `MAX_POSTSCRIPT_SIZE + EOF_SIZE` will be clamped to that minimum
        /// during footer parsing.
        pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
        /// Whether to enable projection pushdown into the underlying Vortex scan.
        ///
        /// When enabled, projection expressions may be partially evaluated during
        /// the scan. When disabled, Vortex reads only the referenced columns and
        /// all expressions are evaluated after the scan.
        pub projection_pushdown: bool, default = false
        /// The intra-partition scan concurrency, controlling the number of row splits to process
        /// concurrently per-thread within each file.
        ///
        /// This does not affect the overall parallelism
        /// across partitions, which is controlled by DataFusion's execution configuration.
        // When `None`, no explicit concurrency is set on the created source
        // (see `VortexFormat::file_source`).
        pub scan_concurrency: Option<usize>, default = None
    }
}
112
// Marker impl: relies on a field-wise `PartialEq` (presumably emitted by
// `config_namespace!` — confirm) being a total equivalence for these option
// types (`usize`, `bool`, `Option<usize>`), which it is.
impl Eq for VortexTableOptions {}
114
/// Minimal factory to create [`VortexFormat`] instances.
#[derive(Debug)]
pub struct VortexFormatFactory {
    /// Session shared by every [`VortexFormat`] this factory creates.
    session: VortexSession,
    /// Default options for created formats; `None` falls back to
    /// `VortexTableOptions::default()` in [`FileFormatFactory::create`].
    options: Option<VortexTableOptions>,
}
121
122impl GetExt for VortexFormatFactory {
123    fn get_ext(&self) -> String {
124        VORTEX_FILE_EXTENSION.to_string()
125    }
126}
127
128impl VortexFormatFactory {
129    /// Creates a new instance with a default [`VortexSession`] and default options.
130    #[expect(
131        clippy::new_without_default,
132        reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
133    )]
134    pub fn new() -> Self {
135        Self {
136            session: VortexSession::default(),
137            options: None,
138        }
139    }
140
141    /// Creates a new instance with customized session and default options for all [`VortexFormat`] instances created from this factory.
142    ///
143    /// The options can be overridden by table-level configuration pass in [`FileFormatFactory::create`].
144    pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
145        Self {
146            session,
147            options: Some(options),
148        }
149    }
150
151    /// Override the default options for this factory.
152    ///
153    /// For example:
154    /// ```rust
155    /// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
156    ///
157    /// let factory = VortexFormatFactory::new().with_options(VortexTableOptions::default());
158    /// ```
159    pub fn with_options(mut self, options: VortexTableOptions) -> Self {
160        self.options = Some(options);
161        self
162    }
163}
164
165impl FileFormatFactory for VortexFormatFactory {
166    #[expect(clippy::disallowed_types, reason = "required by trait signature")]
167    fn create(
168        &self,
169        _state: &dyn Session,
170        format_options: &std::collections::HashMap<String, String>,
171    ) -> DFResult<Arc<dyn FileFormat>> {
172        let mut opts = self.options.clone().unwrap_or_default();
173        for (key, value) in format_options {
174            if let Some(key) = key.strip_prefix("format.") {
175                opts.set(key, value)?;
176            } else {
177                tracing::trace!("Ignoring options '{key}'");
178            }
179        }
180
181        Ok(Arc::new(VortexFormat::new_with_options(
182            self.session.clone(),
183            opts,
184        )))
185    }
186
187    fn default(&self) -> Arc<dyn FileFormat> {
188        Arc::new(VortexFormat::new(self.session.clone()))
189    }
190
191    fn as_any(&self) -> &dyn Any {
192        self
193    }
194}
195
196impl VortexFormat {
197    /// Create a new instance with default options.
198    pub fn new(session: VortexSession) -> Self {
199        Self::new_with_options(session, VortexTableOptions::default())
200    }
201
202    /// Creates a new instance with configured by a [`VortexTableOptions`].
203    pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self {
204        Self { session, opts }
205    }
206
207    /// Return the format specific configuration
208    pub fn options(&self) -> &VortexTableOptions {
209        &self.opts
210    }
211}
212
#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Vortex files carry no file-level compression, so none is reported.
    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    /// Returns the extension for the given compression type.
    ///
    /// Only `UNCOMPRESSED` is accepted; any other variant is rejected because
    /// Vortex does not support file-level compression.
    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file level compression.".into(),
            )),
        }
    }

    /// Infers a merged Arrow schema across all `objects`.
    ///
    /// Each file's footer is read in a spawned task, bounded by the session's
    /// `meta_fetch_concurrency`. The runtime's file-metadata cache is consulted
    /// first and populated on a miss.
    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|object| {
                let store = store.clone();
                let session = self.session.clone();
                let opts = self.opts.clone();
                let cache = file_metadata_cache.clone();

                SpawnedTask::spawn(async move {
                    // Check whether we already have cached metadata for this file
                    // that is still valid for this `ObjectMeta` and is of the
                    // Vortex-specific cached type.
                    if let Some(entry) = cache.get(&object.location)
                        && entry.is_valid_for(&object)
                        && let Some(cached_vortex) = entry
                            .file_metadata
                            .as_any()
                            .downcast_ref::<CachedVortexMetadata>()
                    {
                        let inferred_schema = cached_vortex.footer().dtype().to_arrow_schema()?;
                        return VortexResult::Ok((object.location, inferred_schema));
                    }

                    // No entry, or the entry is stale — open the file to read its footer.
                    let reader = Arc::new(ObjectStoreReadAt::new(
                        store,
                        object.location.clone(),
                        session.handle(),
                    ));

                    let vxf = session
                        .open_options()
                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
                        .with_file_size(object.size)
                        .open_read(reader)
                        .await?;

                    // Cache the freshly-read metadata for subsequent calls
                    // (e.g. `infer_stats` on the same file).
                    let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
                    let entry = CachedFileMetadataEntry::new(object.clone(), cached_metadata);
                    cache.put(&object.location, entry);

                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((object.location, inferred_schema))
                })
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // Get consistent order of schemas for `Schema::try_merge`, as some filesystems don't
        // have deterministic listing orders (and `buffer_unordered` also shuffles results).
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    /// Derives DataFusion [`Statistics`] for a single file, using cached
    /// footer metadata when available and reading the footer otherwise.
    ///
    /// Columns present in `table_schema` but absent from the file (schema
    /// evolution) get default (absent) statistics. Min/max values are built in
    /// the file's dtype first and then cast to the table's Arrow type.
    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
    async fn infer_stats(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        let object = object.clone();
        let store = store.clone();
        let session = self.session.clone();
        let opts = self.opts.clone();
        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();

        SpawnedTask::spawn(async move {
            // Try to get cached metadata first: (dtype, per-column stats, row count).
            let cached_metadata = file_metadata_cache
                .get(&object.location)
                .filter(|entry| entry.is_valid_for(&object))
                .and_then(|entry| {
                    entry
                        .file_metadata
                        .as_any()
                        .downcast_ref::<CachedVortexMetadata>()
                        .map(|m| {
                            (
                                m.footer().dtype().clone(),
                                m.footer().statistics().cloned(),
                                m.footer().row_count(),
                            )
                        })
                });

            let (dtype, file_stats, row_count) = match cached_metadata {
                Some(metadata) => metadata,
                None => {
                    // No usable cache entry — open the file and read its footer.
                    let reader = Arc::new(ObjectStoreReadAt::new(
                        store,
                        object.location.clone(),
                        session.handle(),
                    ));

                    let vxf = session
                        .open_options()
                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
                        .with_file_size(object.size)
                        .open_read(reader)
                        .await
                        .map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to open Vortex file {}: {e}",
                                object.location
                            ))
                        })?;

                    // Cache the metadata for subsequent schema/stats requests.
                    let file_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
                    let entry = CachedFileMetadataEntry::new(object.clone(), file_metadata);
                    file_metadata_cache.put(&object.location, entry);

                    (
                        vxf.dtype().clone(),
                        vxf.file_stats().cloned(),
                        vxf.row_count(),
                    )
                }
            };

            // Top-level dtype of a Vortex table file is expected to be a struct;
            // a non-struct here is treated as an invariant violation (panics).
            let struct_dtype = dtype
                .as_struct_fields_opt()
                .vortex_expect("dtype is not a struct");

            // Evaluate the statistics for each column that we are able to return to DataFusion.
            let Some(file_stats) = file_stats else {
                // If the file has no column stats, the best we can do is return a row count.
                return Ok(Statistics {
                    // NOTE(review): `map_err` followed by `vortex_expect` still
                    // panics on a u64->usize overflow rather than returning Err.
                    num_rows: Precision::Exact(
                        usize::try_from(row_count)
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

            // Accumulate the total byte size as exact-until-proven-inexact:
            // any column missing a size degrades the sum's precision.
            let mut sum_of_column_byte_sizes = stats::Precision::exact(0_usize);
            let mut column_statistics = Vec::with_capacity(table_schema.fields().len());

            for field in table_schema.fields().iter() {
                // If the column does not exist, continue. This can happen if the schema has evolved
                // but we have not yet updated the Vortex file.
                let Some(col_idx) = struct_dtype.find(field.name()) else {
                    // The default sets all statistics to `Precision<Absent>`.
                    column_statistics.push(ColumnStatistics::default());
                    continue;
                };
                let (stats_set, stats_dtype) = file_stats.get(col_idx);

                // Update the total size in bytes.
                let column_size = stats_set
                    .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
                    .unwrap_or_else(|| stats::Precision::inexact(0_usize));
                sum_of_column_byte_sizes = sum_of_column_byte_sizes
                    .zip(column_size)
                    .map(|(acc, size)| acc + size);

                // TODO(connor): There's a lot that can go wrong here, should probably handle this
                // more gracefully...
                // Find the min statistic.
                let min = stats_set.get(Stat::Min).and_then(|pstat_val| {
                    pstat_val
                        .map(|stat_val| {
                            // Because of DataFusion's Schema evolution, it is possible that the
                            // type of the min/max stat has changed. Thus we construct the stat as
                            // the file datatype first and only then do we cast accordingly.
                            Scalar::try_new(
                                Stat::Min
                                    .dtype(stats_dtype)
                                    .vortex_expect("must have a valid dtype"),
                                Some(stat_val),
                            )
                            .vortex_expect("`Stat::Min` somehow had an incompatible `DType`")
                            .cast(&DType::from_arrow(field.as_ref()))
                            .vortex_expect("Unable to cast to target type that DataFusion wants")
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                });

                // Find the max statistic (same construct-then-cast dance as min).
                let max = stats_set.get(Stat::Max).and_then(|pstat_val| {
                    pstat_val
                        .map(|stat_val| {
                            Scalar::try_new(
                                Stat::Max
                                    .dtype(stats_dtype)
                                    .vortex_expect("must have a valid dtype"),
                                Some(stat_val),
                            )
                            .vortex_expect("`Stat::Max` somehow had an incompatible `DType`")
                            .cast(&DType::from_arrow(field.as_ref()))
                            .vortex_expect("Unable to cast to target type that DataFusion wants")
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                });

                let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());

                column_statistics.push(ColumnStatistics {
                    null_count: null_count.to_df(),
                    min_value: min.to_df(),
                    max_value: max.to_df(),
                    sum_value: Precision::Absent,
                    // An exactly-constant column has exactly one distinct value;
                    // otherwise we can't say anything.
                    distinct_count: stats_set
                        .get_as::<bool>(Stat::IsConstant, &DType::Bool(Nullability::NonNullable))
                        .and_then(|is_constant| is_constant.as_exact().map(|_| Precision::Exact(1)))
                        .unwrap_or(Precision::Absent),
                    // TODO(connor): Is this correct?
                    byte_size: column_size.to_df(),
                })
            }

            let total_byte_size = sum_of_column_byte_sizes.to_df();

            Ok(Statistics {
                // NOTE(review): as above, this panics (rather than erroring) if
                // the row count does not fit in usize.
                num_rows: Precision::Exact(
                    usize::try_from(row_count)
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

    /// Builds the physical scan plan, wiring the runtime's file-metadata
    /// cache into the [`VortexSource`] carried by the scan config.
    ///
    /// # Errors
    /// Returns an internal error if the config's file source is not a
    /// [`VortexSource`].
    async fn create_physical_plan(
        &self,
        state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let mut source = file_scan_config
            .file_source()
            .as_any()
            .downcast_ref::<VortexSource>()
            .cloned()
            .ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;

        source = source
            .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());

        let conf = FileScanConfigBuilder::from(file_scan_config)
            .with_source(Arc::new(source))
            .build();

        Ok(DataSourceExec::from_data_source(conf))
    }

    /// Builds the physical write plan for inserting into a Vortex table.
    ///
    /// # Errors
    /// Returns a not-implemented error for any insert op other than `Append`
    /// (overwrites are not supported yet).
    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

        let schema = conf.output_schema().clone();
        let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    /// Creates the [`VortexSource`] for a scan, applying the configured
    /// projection-pushdown flag and, when set, the scan concurrency.
    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
        let mut source = VortexSource::new(table_schema, self.session.clone())
            .with_projection_pushdown(self.opts.projection_pushdown);

        if let Some(scan_concurrency) = self.opts.scan_concurrency {
            source = source.with_scan_concurrency(scan_concurrency);
        }

        Arc::new(source) as _
    }
}
536
#[cfg(test)]
mod tests {

    use super::*;
    use crate::common_tests::TestSessionContext;

    /// `CREATE EXTERNAL TABLE ... STORED AS vortex` should register the table
    /// in the session catalog.
    #[tokio::test]
    async fn create_table() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        ctx.session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex  \
                LOCATION 'table/'",
            )
            .await?;

        assert!(ctx.session.table_exist("my_tbl")?);

        Ok(())
    }

    /// Table-level OPTIONS(...) entries should be accepted and parsed by the
    /// `VortexTableOptions` config namespace without error.
    #[tokio::test]
    async fn configure_format_source() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        ctx.session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/' \
                OPTIONS( footer_initial_read_size_bytes '12345', scan_concurrency '3' );",
            )
            .await?
            .collect()
            .await?;

        Ok(())
    }

    /// An option set via the string-keyed `set` API should be visible through
    /// `VortexFormat::options`.
    #[test]
    fn format_plumbs_footer_initial_read_size() {
        let mut opts = VortexTableOptions::default();
        opts.set("footer_initial_read_size_bytes", "12345").unwrap();

        let format = VortexFormat::new_with_options(VortexSession::default(), opts);
        assert_eq!(format.options().footer_initial_read_size_bytes, 12345);
    }
}