Skip to main content

vortex_datafusion/persistent/
format.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::fmt::Formatter;
6use std::sync::Arc;
7
8use arrow_schema::Schema;
9use arrow_schema::SchemaRef;
10use async_trait::async_trait;
11use datafusion_catalog::Session;
12use datafusion_common::ColumnStatistics;
13use datafusion_common::DataFusionError;
14use datafusion_common::GetExt;
15use datafusion_common::Result as DFResult;
16use datafusion_common::ScalarValue as DFScalarValue;
17use datafusion_common::Statistics;
18use datafusion_common::config::ConfigField;
19use datafusion_common::config_namespace;
20use datafusion_common::internal_datafusion_err;
21use datafusion_common::not_impl_err;
22use datafusion_common::parsers::CompressionTypeVariant;
23use datafusion_common::stats::Precision as DFPrecision;
24use datafusion_common_runtime::SpawnedTask;
25use datafusion_datasource::TableSchema;
26use datafusion_datasource::file::FileSource;
27use datafusion_datasource::file_compression_type::FileCompressionType;
28use datafusion_datasource::file_format::FileFormat;
29use datafusion_datasource::file_format::FileFormatFactory;
30use datafusion_datasource::file_scan_config::FileScanConfig;
31use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
32use datafusion_datasource::file_sink_config::FileSinkConfig;
33use datafusion_datasource::sink::DataSinkExec;
34use datafusion_datasource::source::DataSourceExec;
35use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
36use datafusion_expr::dml::InsertOp;
37use datafusion_physical_expr::LexRequirement;
38use datafusion_physical_plan::ExecutionPlan;
39use futures::FutureExt;
40use futures::StreamExt as _;
41use futures::TryStreamExt as _;
42use futures::stream;
43use object_store::ObjectMeta;
44use object_store::ObjectStore;
45use vortex::VortexSessionDefault;
46use vortex::array::arrow::ArrowSessionExt;
47use vortex::array::memory::MemorySessionExt;
48use vortex::dtype::DType;
49use vortex::dtype::Nullability;
50use vortex::dtype::PType;
51use vortex::error::VortexExpect;
52use vortex::error::VortexResult;
53use vortex::error::vortex_err;
54use vortex::expr::stats::Precision;
55use vortex::expr::stats::Stat;
56use vortex::file::EOF_SIZE;
57use vortex::file::MAX_POSTSCRIPT_SIZE;
58use vortex::file::OpenOptionsSessionExt;
59use vortex::file::VORTEX_FILE_EXTENSION;
60use vortex::io::object_store::ObjectStoreReadAt;
61use vortex::io::session::RuntimeSessionExt;
62use vortex::scalar::Scalar;
63use vortex::scalar::ScalarValue as VortexScalarValue;
64use vortex::session::VortexSession;
65
66use super::cache::CachedVortexMetadata;
67use super::sink::VortexSink;
68use super::source::VortexSource;
69use crate::PrecisionExt as _;
70use crate::convert::TryToDataFusion;
71use crate::convert::stats::is_constant_to_distinct_count;
72
73const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
74
/// DataFusion [`FileFormat`] implementation for `.vortex` files.
///
/// Most applications do not construct `VortexFormat` directly. Instead, they
/// register [`VortexFormatFactory`] with a [`SessionContext`] and let
/// DataFusion instantiate `VortexFormat` as tables are planned.
///
/// Construct `VortexFormat` directly when you are wiring a [`ListingTable`] by
/// hand and need to pass a file format into [`ListingOptions`].
///
/// # Example
///
/// ```no_run
/// use std::sync::Arc;
///
/// use datafusion::datasource::listing::ListingOptions;
/// use datafusion::datasource::listing::ListingTable;
/// use datafusion::datasource::listing::ListingTableConfig;
/// use datafusion::datasource::listing::ListingTableUrl;
/// use datafusion::prelude::SessionContext;
/// use tempfile::tempdir;
/// use vortex::VortexSessionDefault;
/// use vortex::session::VortexSession;
/// use vortex_datafusion::VortexFormat;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let ctx = SessionContext::new();
/// let dir = tempdir()?;
///
/// let format = Arc::new(VortexFormat::new(VortexSession::default()));
/// let table_url = ListingTableUrl::parse(dir.path().to_str().unwrap())?;
/// let config = ListingTableConfig::new(table_url)
///     .with_listing_options(
///         ListingOptions::new(format).with_session_config_options(ctx.state().config()),
///     )
///     .infer_schema(&ctx.state())
///     .await?;
///
/// let table = ListingTable::try_new(config)?;
/// # let _ = table;
/// # Ok(())
/// # }
/// ```
///
/// [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionContext.html
/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
/// [`ListingOptions`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingOptions.html
pub struct VortexFormat {
    /// Vortex session used to open files and convert dtypes; it is cloned into the
    /// sources and sinks this format creates.
    session: VortexSession,
    /// Format-level options (footer read size, projection pushdown, scan concurrency).
    opts: VortexTableOptions,
}
126
127impl Debug for VortexFormat {
128    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
129        f.debug_struct("VortexFormat")
130            .field("opts", &self.opts)
131            .finish()
132    }
133}
134
config_namespace! {
    /// Options to configure [`VortexFormat`] and [`VortexSource`].
    ///
    /// These options are usually set on a [`VortexFormatFactory`] and inherited
    /// by the `VortexFormat` / `VortexSource` instances created for individual
    /// tables. Individual tables can override them: keys carrying the `format.`
    /// prefix that reach [`FileFormatFactory::create`] are applied on top of the
    /// factory's baseline.
    ///
    /// # Example
    ///
    /// ```rust
    /// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
    ///
    /// let factory = VortexFormatFactory::new().with_options(VortexTableOptions {
    ///     projection_pushdown: true,
    ///     scan_concurrency: Some(8),
    ///     ..Default::default()
    /// });
    /// # let _ = factory;
    /// ```
    pub struct VortexTableOptions {
        /// The number of bytes to read when parsing a file footer.
        ///
        /// Values smaller than `MAX_POSTSCRIPT_SIZE + EOF_SIZE` will be clamped to that minimum
        /// during footer parsing.
        pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
        /// Whether to enable projection pushdown into the underlying Vortex scan.
        ///
        /// When enabled, projection expressions may be partially evaluated during
        /// the scan. When disabled, Vortex reads only the referenced columns and
        /// all expressions are evaluated after the scan.
        pub projection_pushdown: bool, default = false
        /// The intra-partition scan concurrency, controlling the number of row splits to process
        /// concurrently per-thread within each file.
        ///
        /// This does not affect the overall parallelism
        /// across partitions, which is controlled by DataFusion's execution configuration.
        pub scan_concurrency: Option<usize>, default = None
    }
}
176
// Marker impl: promotes the existing `PartialEq` to full equivalence. `Eq` has `PartialEq` as a
// supertrait; that impl is presumably generated by `config_namespace!` above - TODO confirm.
impl Eq for VortexTableOptions {}
178
/// Registration entry point for the file-backed Vortex integration.
///
/// `VortexFormatFactory` is the type most applications use. Register it with a
/// DataFusion session, and DataFusion will create [`VortexFormat`] values for
/// `CREATE EXTERNAL TABLE`, [`ListingTable`], and URL-table scans.
///
/// The factory stores a [`VortexSession`] and default [`VortexTableOptions`].
/// Those defaults are copied into the formats and sources created for each
/// table.
///
/// # Example
///
/// ```no_run
/// use std::sync::Arc;
///
/// use datafusion::datasource::provider::DefaultTableFactory;
/// use datafusion::execution::SessionStateBuilder;
/// use datafusion_common::GetExt;
/// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
///
/// let factory = Arc::new(VortexFormatFactory::new().with_options(VortexTableOptions {
///     projection_pushdown: true,
///     ..Default::default()
/// }));
///
/// let mut state_builder = SessionStateBuilder::new()
///     .with_default_features()
///     .with_table_factory(
///         factory.get_ext().to_uppercase(),
///         Arc::new(DefaultTableFactory::new()),
///     );
///
/// if let Some(file_formats) = state_builder.file_formats() {
///     file_formats.push(factory.clone() as _);
/// }
/// ```
///
/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
#[derive(Debug)]
pub struct VortexFormatFactory {
    /// Session cloned into every [`VortexFormat`] this factory produces.
    session: VortexSession,
    /// Baseline options for created formats. `None` means
    /// `VortexTableOptions::default()` is used as the baseline.
    options: Option<VortexTableOptions>,
}
222
223impl GetExt for VortexFormatFactory {
224    fn get_ext(&self) -> String {
225        VORTEX_FILE_EXTENSION.to_string()
226    }
227}
228
229impl VortexFormatFactory {
230    /// Creates a factory with a default [`VortexSession`] and default options.
231    #[expect(
232        clippy::new_without_default,
233        reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
234    )]
235    pub fn new() -> Self {
236        Self {
237            session: VortexSession::default(),
238            options: None,
239        }
240    }
241
242    /// Creates a factory with an explicit session and default options.
243    ///
244    /// The supplied options become the baseline for every [`VortexFormat`]
245    /// created by this factory. DataFusion may still override them with
246    /// table-level options passed into [`FileFormatFactory::create`].
247    pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
248        Self {
249            session,
250            options: Some(options),
251        }
252    }
253
254    /// Overrides the default options for this factory.
255    ///
256    /// This is the usual way to turn on features such as projection pushdown for
257    /// every table created through the factory.
258    ///
259    /// # Example
260    ///
261    /// ```rust
262    /// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
263    ///
264    /// let factory = VortexFormatFactory::new().with_options(VortexTableOptions {
265    ///     projection_pushdown: true,
266    ///     ..Default::default()
267    /// });
268    /// # let _ = factory;
269    /// ```
270    pub fn with_options(mut self, options: VortexTableOptions) -> Self {
271        self.options = Some(options);
272        self
273    }
274}
275
276impl FileFormatFactory for VortexFormatFactory {
277    #[expect(clippy::disallowed_types, reason = "required by trait signature")]
278    fn create(
279        &self,
280        _state: &dyn Session,
281        format_options: &std::collections::HashMap<String, String>,
282    ) -> DFResult<Arc<dyn FileFormat>> {
283        let mut opts = self.options.clone().unwrap_or_default();
284        for (key, value) in format_options {
285            if let Some(key) = key.strip_prefix("format.") {
286                opts.set(key, value)?;
287            } else {
288                tracing::trace!("Ignoring option '{key}'");
289            }
290        }
291
292        Ok(Arc::new(VortexFormat::new_with_options(
293            self.session.clone(),
294            opts,
295        )))
296    }
297
298    fn default(&self) -> Arc<dyn FileFormat> {
299        Arc::new(VortexFormat::new(self.session.clone()))
300    }
301}
302
303impl VortexFormat {
304    /// Creates a format with default [`VortexTableOptions`].
305    ///
306    /// Prefer [`VortexFormatFactory`] when registering with a session. Construct
307    /// `VortexFormat` directly when building [`ListingOptions`] manually.
308    ///
309    /// [`ListingOptions`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingOptions.html
310    pub fn new(session: VortexSession) -> Self {
311        Self::new_with_options(session, VortexTableOptions::default())
312    }
313
314    /// Creates a format with explicit [`VortexTableOptions`].
315    pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self {
316        Self { session, opts }
317    }
318
319    /// Returns the format-specific configuration that will be copied into the
320    /// [`VortexSource`] created for a scan.
321    pub fn options(&self) -> &VortexTableOptions {
322        &self.opts
323    }
324}
325
326#[async_trait]
327impl FileFormat for VortexFormat {
328    fn compression_type(&self) -> Option<FileCompressionType> {
329        None
330    }
331
332    fn get_ext(&self) -> String {
333        VORTEX_FILE_EXTENSION.to_string()
334    }
335
336    fn get_ext_with_compression(
337        &self,
338        file_compression_type: &FileCompressionType,
339    ) -> DFResult<String> {
340        match file_compression_type.get_variant() {
341            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
342            _ => Err(DataFusionError::Internal(
343                "Vortex does not support file level compression.".into(),
344            )),
345        }
346    }
347
348    async fn infer_schema(
349        &self,
350        state: &dyn Session,
351        store: &Arc<dyn ObjectStore>,
352        objects: &[ObjectMeta],
353    ) -> DFResult<SchemaRef> {
354        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
355
356        let mut file_schemas = stream::iter(objects.iter().cloned())
357            .map(|object| {
358                let store = Arc::clone(store);
359                let session = self.session.clone();
360                let opts = self.opts.clone();
361                let cache = Arc::clone(&file_metadata_cache);
362
363                SpawnedTask::spawn(async move {
364                    // Check if we have entry metadata for this file
365                    if let Some(entry) = cache.get(&object.location)
366                        && entry.is_valid_for(&object)
367                        && let Some(cached_vortex) = entry
368                            .file_metadata
369                            .as_any()
370                            .downcast_ref::<CachedVortexMetadata>()
371                    {
372                        let inferred_schema = session
373                            .arrow()
374                            .to_arrow_schema(cached_vortex.footer().dtype())?;
375                        return VortexResult::Ok((object.location, inferred_schema));
376                    }
377
378                    // Not entry or invalid - open the file
379                    let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
380                        store,
381                        object.location.clone(),
382                        session.handle(),
383                        session.allocator(),
384                    ));
385
386                    let vxf = session
387                        .open_options()
388                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
389                        .with_file_size(object.size)
390                        .open_read(reader)
391                        .await?;
392
393                    // Cache the metadata
394                    let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
395                    let entry = CachedFileMetadataEntry::new(object.clone(), cached_metadata);
396                    cache.put(&object.location, entry);
397
398                    let inferred_schema = session.arrow().to_arrow_schema(vxf.dtype())?;
399                    VortexResult::Ok((object.location, inferred_schema))
400                })
401                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
402            })
403            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
404            .try_collect::<Vec<_>>()
405            .await
406            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;
407
408        // Get consistent order of schemas for `Schema::try_merge`, as some filesystems don't have deterministic listing orders
409        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
410        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);
411
412        Ok(Arc::new(Schema::try_merge(file_schemas)?))
413    }
414
415    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
416    async fn infer_stats(
417        &self,
418        state: &dyn Session,
419        store: &Arc<dyn ObjectStore>,
420        table_schema: SchemaRef,
421        object: &ObjectMeta,
422    ) -> DFResult<Statistics> {
423        let object = object.clone();
424        let store = Arc::clone(store);
425        let session = self.session.clone();
426        let opts = self.opts.clone();
427        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
428
429        SpawnedTask::spawn(async move {
430            // Try to get entry metadata first
431            let cached_metadata = file_metadata_cache
432                .get(&object.location)
433                .filter(|entry| entry.is_valid_for(&object))
434                .and_then(|entry| {
435                    entry
436                        .file_metadata
437                        .as_any()
438                        .downcast_ref::<CachedVortexMetadata>()
439                        .map(|m| {
440                            (
441                                m.footer().dtype().clone(),
442                                m.footer().statistics().cloned(),
443                                m.footer().row_count(),
444                            )
445                        })
446                });
447
448            let (dtype, file_stats, row_count) = match cached_metadata {
449                Some(metadata) => metadata,
450                None => {
451                    // Not entry - open the file
452                    let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
453                        store,
454                        object.location.clone(),
455                        session.handle(),
456                        session.allocator(),
457                    ));
458
459                    let vxf = session
460                        .open_options()
461                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
462                        .with_file_size(object.size)
463                        .open_read(reader)
464                        .await
465                        .map_err(|e| {
466                            DataFusionError::Execution(format!(
467                                "Failed to open Vortex file {}: {e}",
468                                object.location
469                            ))
470                        })?;
471
472                    // Cache the metadata
473                    let file_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
474                    let entry = CachedFileMetadataEntry::new(object.clone(), file_metadata);
475                    file_metadata_cache.put(&object.location, entry);
476
477                    (
478                        vxf.dtype().clone(),
479                        vxf.file_stats().cloned(),
480                        vxf.row_count(),
481                    )
482                }
483            };
484
485            let struct_dtype = dtype
486                .as_struct_fields_opt()
487                .vortex_expect("dtype is not a struct");
488
489            // Evaluate the statistics for each column that we are able to return to DataFusion.
490            let Some(file_stats) = file_stats else {
491                // If the file has no column stats, the best we can do is return a row count.
492                return Ok(Statistics {
493                    num_rows: DFPrecision::Exact(
494                        usize::try_from(row_count)
495                            .map_err(|_| vortex_err!("Row count overflow"))
496                            .vortex_expect("Row count overflow"),
497                    ),
498                    total_byte_size: DFPrecision::Absent,
499                    column_statistics: vec![
500                        ColumnStatistics::default();
501                        table_schema.fields().len()
502                    ],
503                });
504            };
505
506            let mut column_statistics = Vec::with_capacity(table_schema.fields().len());
507
508            for field in table_schema.fields().iter() {
509                // If the column does not exist, continue. This can happen if the schema has evolved
510                // but we have not yet updated the Vortex file.
511                let Some(col_idx) = struct_dtype.find(field.name()) else {
512                    // The default sets all statistics to `Precision<Absent>`.
513                    column_statistics.push(ColumnStatistics::default());
514                    continue;
515                };
516                let (stats_set, stats_dtype) = file_stats.get(col_idx);
517
518                // Update the total size in bytes.
519                let column_size =
520                    stats_set.get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into());
521
522                let target_dtype =
523                    session
524                        .arrow()
525                        .from_arrow_field(field.as_ref())
526                        .map_err(|e| {
527                            DataFusionError::Execution(format!(
528                                "Failed to derive Vortex DType for field {}: {e}",
529                                field.name()
530                            ))
531                        })?;
532                let min = scalar_stat_to_df(
533                    Stat::Min,
534                    stats_set.get(Stat::Min),
535                    stats_dtype,
536                    &target_dtype,
537                );
538
539                let max = scalar_stat_to_df(
540                    Stat::Max,
541                    stats_set.get(Stat::Max),
542                    stats_dtype,
543                    &target_dtype,
544                );
545
546                let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
547
548                column_statistics.push(ColumnStatistics {
549                    null_count: null_count.to_df(),
550                    min_value: min.to_df(),
551                    max_value: max.to_df(),
552                    sum_value: DFPrecision::Absent,
553                    distinct_count: is_constant_to_distinct_count(
554                        stats_set.get_as::<bool>(
555                            Stat::IsConstant,
556                            &DType::Bool(Nullability::NonNullable),
557                        ),
558                    ),
559                    byte_size: column_size.to_df(),
560                })
561            }
562
563            let total_byte_size = column_statistics
564                .iter()
565                .fold(DFPrecision::Exact(0), |acc, cs| acc.add(&cs.byte_size));
566
567            Ok(Statistics {
568                num_rows: DFPrecision::Exact(
569                    usize::try_from(row_count)
570                        .map_err(|_| vortex_err!("Row count overflow"))
571                        .vortex_expect("Row count overflow"),
572                ),
573                total_byte_size,
574                column_statistics,
575            })
576        })
577        .await
578        .vortex_expect("Failed to spawn infer_stats")
579    }
580
581    async fn create_physical_plan(
582        &self,
583        state: &dyn Session,
584        file_scan_config: FileScanConfig,
585    ) -> DFResult<Arc<dyn ExecutionPlan>> {
586        let mut source = file_scan_config
587            .file_source()
588            .downcast_ref::<VortexSource>()
589            .cloned()
590            .ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;
591
592        source = source
593            .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());
594
595        let conf = FileScanConfigBuilder::from(file_scan_config)
596            .with_source(Arc::new(source))
597            .build();
598
599        Ok(DataSourceExec::from_data_source(conf))
600    }
601
602    async fn create_writer_physical_plan(
603        &self,
604        input: Arc<dyn ExecutionPlan>,
605        _state: &dyn Session,
606        conf: FileSinkConfig,
607        order_requirements: Option<LexRequirement>,
608    ) -> DFResult<Arc<dyn ExecutionPlan>> {
609        if conf.insert_op != InsertOp::Append {
610            return not_impl_err!("Overwrites are not implemented yet for Vortex");
611        }
612
613        let schema = Arc::clone(conf.output_schema());
614        let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));
615
616        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
617    }
618
619    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
620        let mut source = VortexSource::new(table_schema, self.session.clone())
621            .with_projection_pushdown(self.opts.projection_pushdown);
622
623        if let Some(scan_concurrency) = self.opts.scan_concurrency {
624            source = source.with_scan_concurrency(scan_concurrency);
625        }
626
627        Arc::new(source) as _
628    }
629}
630
631fn scalar_stat_to_df(
632    stat: Stat,
633    value: Precision<VortexScalarValue>,
634    stats_dtype: &DType,
635    target_dtype: &DType,
636) -> Precision<DFScalarValue> {
637    let Some(stat_dtype) = stat.dtype(stats_dtype) else {
638        return Precision::Absent;
639    };
640
641    value
642        .map(|stat_value| {
643            Scalar::try_new(stat_dtype, Some(stat_value))?
644                .cast(target_dtype)?
645                .try_to_df()
646        })
647        .transpose()
648        .unwrap_or(Precision::Absent)
649}
650
#[cfg(test)]
mod tests {

    use super::*;
    use crate::common_tests::TestSessionContext;

    /// `CREATE EXTERNAL TABLE ... STORED AS vortex` registers the table in the session.
    #[tokio::test]
    async fn create_table() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();
        let ddl = "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex  \
                LOCATION 'table/'";

        ctx.session.sql(ddl).await?;

        let registered = ctx.session.table_exist("my_tbl")?;
        assert!(registered);

        Ok(())
    }

    /// Table-level `OPTIONS(...)` naming Vortex options are accepted when creating the table.
    #[tokio::test]
    async fn configure_format_source() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();
        let ddl = "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/' \
                OPTIONS( footer_initial_read_size_bytes '12345', scan_concurrency '3' );";

        let frame = ctx.session.sql(ddl).await?;
        frame.collect().await?;

        Ok(())
    }

    /// An option set through the string-keyed `set` API is visible via `VortexFormat::options`.
    #[test]
    fn format_plumbs_footer_initial_read_size() {
        let mut table_opts = VortexTableOptions::default();
        table_opts
            .set("footer_initial_read_size_bytes", "12345")
            .unwrap();

        let format = VortexFormat::new_with_options(VortexSession::default(), table_opts);
        let configured = format.options().footer_initial_read_size_bytes;
        assert_eq!(configured, 12345);
    }
}