Skip to main content

vortex_datafusion/persistent/
format.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::sync::Arc;
8
9use arrow_schema::Schema;
10use arrow_schema::SchemaRef;
11use async_trait::async_trait;
12use datafusion_catalog::Session;
13use datafusion_common::ColumnStatistics;
14use datafusion_common::DataFusionError;
15use datafusion_common::GetExt;
16use datafusion_common::Result as DFResult;
17use datafusion_common::ScalarValue as DFScalarValue;
18use datafusion_common::Statistics;
19use datafusion_common::config::ConfigField;
20use datafusion_common::config_namespace;
21use datafusion_common::internal_datafusion_err;
22use datafusion_common::not_impl_err;
23use datafusion_common::parsers::CompressionTypeVariant;
24use datafusion_common::stats::Precision;
25use datafusion_common_runtime::SpawnedTask;
26use datafusion_datasource::TableSchema;
27use datafusion_datasource::file::FileSource;
28use datafusion_datasource::file_compression_type::FileCompressionType;
29use datafusion_datasource::file_format::FileFormat;
30use datafusion_datasource::file_format::FileFormatFactory;
31use datafusion_datasource::file_scan_config::FileScanConfig;
32use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
33use datafusion_datasource::file_sink_config::FileSinkConfig;
34use datafusion_datasource::sink::DataSinkExec;
35use datafusion_datasource::source::DataSourceExec;
36use datafusion_execution::cache::cache_manager::CachedFileMetadataEntry;
37use datafusion_expr::dml::InsertOp;
38use datafusion_physical_expr::LexRequirement;
39use datafusion_physical_plan::ExecutionPlan;
40use futures::FutureExt;
41use futures::StreamExt as _;
42use futures::TryStreamExt as _;
43use futures::stream;
44use object_store::ObjectMeta;
45use object_store::ObjectStore;
46use vortex::VortexSessionDefault;
47use vortex::array::memory::MemorySessionExt;
48use vortex::dtype::DType;
49use vortex::dtype::Nullability;
50use vortex::dtype::PType;
51use vortex::dtype::arrow::FromArrowType;
52use vortex::error::VortexExpect;
53use vortex::error::VortexResult;
54use vortex::error::vortex_err;
55use vortex::expr::stats;
56use vortex::expr::stats::Stat;
57use vortex::file::EOF_SIZE;
58use vortex::file::MAX_POSTSCRIPT_SIZE;
59use vortex::file::OpenOptionsSessionExt;
60use vortex::file::VORTEX_FILE_EXTENSION;
61use vortex::io::object_store::ObjectStoreReadAt;
62use vortex::io::session::RuntimeSessionExt;
63use vortex::scalar::Scalar;
64use vortex::scalar::ScalarValue as VortexScalarValue;
65use vortex::session::VortexSession;
66
67use super::cache::CachedVortexMetadata;
68use super::sink::VortexSink;
69use super::source::VortexSource;
70use crate::PrecisionExt as _;
71use crate::convert::TryToDataFusion;
72use crate::convert::stats::is_constant_to_distinct_count;
73
74const DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES: usize = MAX_POSTSCRIPT_SIZE as usize + EOF_SIZE;
75
/// DataFusion [`FileFormat`] implementation for `.vortex` files.
///
/// Most applications do not construct `VortexFormat` directly. Instead, they
/// register [`VortexFormatFactory`] with a [`SessionContext`] and let
/// DataFusion instantiate `VortexFormat` as tables are planned.
///
/// Construct `VortexFormat` directly when you are wiring a [`ListingTable`] by
/// hand and need to pass a file format into [`ListingOptions`].
///
/// # Example
///
/// ```no_run
/// use std::sync::Arc;
///
/// use datafusion::datasource::listing::ListingOptions;
/// use datafusion::datasource::listing::ListingTable;
/// use datafusion::datasource::listing::ListingTableConfig;
/// use datafusion::datasource::listing::ListingTableUrl;
/// use datafusion::prelude::SessionContext;
/// use tempfile::tempdir;
/// use vortex::VortexSessionDefault;
/// use vortex::session::VortexSession;
/// use vortex_datafusion::VortexFormat;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let ctx = SessionContext::new();
/// let dir = tempdir()?;
///
/// let format = Arc::new(VortexFormat::new(VortexSession::default()));
/// let table_url = ListingTableUrl::parse(dir.path().to_str().unwrap())?;
/// let config = ListingTableConfig::new(table_url)
///     .with_listing_options(
///         ListingOptions::new(format).with_session_config_options(ctx.state().config()),
///     )
///     .infer_schema(&ctx.state())
///     .await?;
///
/// let table = ListingTable::try_new(config)?;
/// # let _ = table;
/// # Ok(())
/// # }
/// ```
///
/// [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionContext.html
/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
/// [`ListingOptions`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingOptions.html
pub struct VortexFormat {
    /// Session used to open file footers and to build the scan sources and write sinks for
    /// this format.
    session: VortexSession,
    /// Table options. These control the footer read size used during schema/stats inference,
    /// plus the projection pushdown and scan concurrency settings passed to each `VortexSource`.
    opts: VortexTableOptions,
}
127
128impl Debug for VortexFormat {
129    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
130        f.debug_struct("VortexFormat")
131            .field("opts", &self.opts)
132            .finish()
133    }
134}
135
config_namespace! {
    /// Options to configure [`VortexFormat`] and [`VortexSource`].
    ///
    /// These options are usually set on a [`VortexFormatFactory`] and inherited
    /// by the `VortexFormat` / `VortexSource` instances created for individual
    /// tables. When a table is created through the factory, any `format.`-prefixed
    /// table option (for example `format.scan_concurrency`) is applied on top of
    /// the factory's defaults.
    ///
    /// # Example
    ///
    /// ```rust
    /// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
    ///
    /// let factory = VortexFormatFactory::new().with_options(VortexTableOptions {
    ///     projection_pushdown: true,
    ///     scan_concurrency: Some(8),
    ///     ..Default::default()
    /// });
    /// # let _ = factory;
    /// ```
    pub struct VortexTableOptions {
        /// The number of bytes to read when parsing a file footer.
        ///
        /// Defaults to `MAX_POSTSCRIPT_SIZE + EOF_SIZE`. Values smaller than that minimum
        /// will be clamped to it during footer parsing.
        pub footer_initial_read_size_bytes: usize, default = DEFAULT_FOOTER_INITIAL_READ_SIZE_BYTES
        /// Whether to enable projection pushdown into the underlying Vortex scan.
        ///
        /// When enabled, projection expressions may be partially evaluated during
        /// the scan. When disabled, Vortex reads only the referenced columns and
        /// all expressions are evaluated after the scan.
        pub projection_pushdown: bool, default = false
        /// The intra-partition scan concurrency, controlling the number of row splits to process
        /// concurrently per-thread within each file.
        ///
        /// This does not affect the overall parallelism
        /// across partitions, which is controlled by DataFusion's execution configuration.
        /// `None` leaves the `VortexSource` default in place.
        pub scan_concurrency: Option<usize>, default = None
    }
}

// `config_namespace!` provides `PartialEq`. Every field (`usize`, `bool`, `Option<usize>`) has a
// total equality, so `Eq` is sound.
impl Eq for VortexTableOptions {}
179
/// Registration entry point for the file-backed Vortex integration.
///
/// `VortexFormatFactory` is the type most applications use. Register it with a
/// DataFusion session, and DataFusion will create [`VortexFormat`] values for
/// `CREATE EXTERNAL TABLE`, [`ListingTable`], and URL-table scans.
///
/// The factory stores a [`VortexSession`] and default [`VortexTableOptions`].
/// Those defaults are copied into the formats and sources created for each
/// table.
///
/// # Example
///
/// ```no_run
/// use std::sync::Arc;
///
/// use datafusion::datasource::provider::DefaultTableFactory;
/// use datafusion::execution::SessionStateBuilder;
/// use datafusion_common::GetExt;
/// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
///
/// let factory = Arc::new(VortexFormatFactory::new().with_options(VortexTableOptions {
///     projection_pushdown: true,
///     ..Default::default()
/// }));
///
/// let mut state_builder = SessionStateBuilder::new()
///     .with_default_features()
///     .with_table_factory(
///         factory.get_ext().to_uppercase(),
///         Arc::new(DefaultTableFactory::new()),
///     );
///
/// if let Some(file_formats) = state_builder.file_formats() {
///     file_formats.push(factory.clone() as _);
/// }
/// ```
///
/// [`ListingTable`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingTable.html
#[derive(Debug)]
pub struct VortexFormatFactory {
    /// Session cloned into every [`VortexFormat`] this factory creates.
    session: VortexSession,
    /// Baseline table options used by `create`. `None` means
    /// `VortexTableOptions::default()`.
    options: Option<VortexTableOptions>,
}
223
224impl GetExt for VortexFormatFactory {
225    fn get_ext(&self) -> String {
226        VORTEX_FILE_EXTENSION.to_string()
227    }
228}
229
230impl VortexFormatFactory {
231    /// Creates a factory with a default [`VortexSession`] and default options.
232    #[expect(
233        clippy::new_without_default,
234        reason = "FormatFactory defines `default` method, so having `Default` implementation is confusing"
235    )]
236    pub fn new() -> Self {
237        Self {
238            session: VortexSession::default(),
239            options: None,
240        }
241    }
242
243    /// Creates a factory with an explicit session and default options.
244    ///
245    /// The supplied options become the baseline for every [`VortexFormat`]
246    /// created by this factory. DataFusion may still override them with
247    /// table-level options passed into [`FileFormatFactory::create`].
248    pub fn new_with_options(session: VortexSession, options: VortexTableOptions) -> Self {
249        Self {
250            session,
251            options: Some(options),
252        }
253    }
254
255    /// Overrides the default options for this factory.
256    ///
257    /// This is the usual way to turn on features such as projection pushdown for
258    /// every table created through the factory.
259    ///
260    /// # Example
261    ///
262    /// ```rust
263    /// use vortex_datafusion::{VortexFormatFactory, VortexTableOptions};
264    ///
265    /// let factory = VortexFormatFactory::new().with_options(VortexTableOptions {
266    ///     projection_pushdown: true,
267    ///     ..Default::default()
268    /// });
269    /// # let _ = factory;
270    /// ```
271    pub fn with_options(mut self, options: VortexTableOptions) -> Self {
272        self.options = Some(options);
273        self
274    }
275}
276
277impl FileFormatFactory for VortexFormatFactory {
278    #[expect(clippy::disallowed_types, reason = "required by trait signature")]
279    fn create(
280        &self,
281        _state: &dyn Session,
282        format_options: &std::collections::HashMap<String, String>,
283    ) -> DFResult<Arc<dyn FileFormat>> {
284        let mut opts = self.options.clone().unwrap_or_default();
285        for (key, value) in format_options {
286            if let Some(key) = key.strip_prefix("format.") {
287                opts.set(key, value)?;
288            } else {
289                tracing::trace!("Ignoring options '{key}'");
290            }
291        }
292
293        Ok(Arc::new(VortexFormat::new_with_options(
294            self.session.clone(),
295            opts,
296        )))
297    }
298
299    fn default(&self) -> Arc<dyn FileFormat> {
300        Arc::new(VortexFormat::new(self.session.clone()))
301    }
302
303    fn as_any(&self) -> &dyn Any {
304        self
305    }
306}
307
308impl VortexFormat {
309    /// Creates a format with default [`VortexTableOptions`].
310    ///
311    /// Prefer [`VortexFormatFactory`] when registering with a session. Construct
312    /// `VortexFormat` directly when building [`ListingOptions`] manually.
313    ///
314    /// [`ListingOptions`]: https://docs.rs/datafusion/latest/datafusion/datasource/listing/struct.ListingOptions.html
315    pub fn new(session: VortexSession) -> Self {
316        Self::new_with_options(session, VortexTableOptions::default())
317    }
318
319    /// Creates a format with explicit [`VortexTableOptions`].
320    pub fn new_with_options(session: VortexSession, opts: VortexTableOptions) -> Self {
321        Self { session, opts }
322    }
323
324    /// Returns the format-specific configuration that will be copied into the
325    /// [`VortexSource`] created for a scan.
326    pub fn options(&self) -> &VortexTableOptions {
327        &self.opts
328    }
329}
330
331#[async_trait]
332impl FileFormat for VortexFormat {
333    fn as_any(&self) -> &dyn Any {
334        self
335    }
336
337    fn compression_type(&self) -> Option<FileCompressionType> {
338        None
339    }
340
341    fn get_ext(&self) -> String {
342        VORTEX_FILE_EXTENSION.to_string()
343    }
344
345    fn get_ext_with_compression(
346        &self,
347        file_compression_type: &FileCompressionType,
348    ) -> DFResult<String> {
349        match file_compression_type.get_variant() {
350            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
351            _ => Err(DataFusionError::Internal(
352                "Vortex does not support file level compression.".into(),
353            )),
354        }
355    }
356
357    async fn infer_schema(
358        &self,
359        state: &dyn Session,
360        store: &Arc<dyn ObjectStore>,
361        objects: &[ObjectMeta],
362    ) -> DFResult<SchemaRef> {
363        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
364
365        let mut file_schemas = stream::iter(objects.iter().cloned())
366            .map(|object| {
367                let store = Arc::clone(store);
368                let session = self.session.clone();
369                let opts = self.opts.clone();
370                let cache = Arc::clone(&file_metadata_cache);
371
372                SpawnedTask::spawn(async move {
373                    // Check if we have entry metadata for this file
374                    if let Some(entry) = cache.get(&object.location)
375                        && entry.is_valid_for(&object)
376                        && let Some(cached_vortex) = entry
377                            .file_metadata
378                            .as_any()
379                            .downcast_ref::<CachedVortexMetadata>()
380                    {
381                        let inferred_schema = cached_vortex.footer().dtype().to_arrow_schema()?;
382                        return VortexResult::Ok((object.location, inferred_schema));
383                    }
384
385                    // Not entry or invalid - open the file
386                    let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
387                        store,
388                        object.location.clone(),
389                        session.handle(),
390                        session.allocator(),
391                    ));
392
393                    let vxf = session
394                        .open_options()
395                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
396                        .with_file_size(object.size)
397                        .open_read(reader)
398                        .await?;
399
400                    // Cache the metadata
401                    let cached_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
402                    let entry = CachedFileMetadataEntry::new(object.clone(), cached_metadata);
403                    cache.put(&object.location, entry);
404
405                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
406                    VortexResult::Ok((object.location, inferred_schema))
407                })
408                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
409            })
410            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
411            .try_collect::<Vec<_>>()
412            .await
413            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;
414
415        // Get consistent order of schemas for `Schema::try_merge`, as some filesystems don't have deterministic listing orders
416        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
417        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);
418
419        Ok(Arc::new(Schema::try_merge(file_schemas)?))
420    }
421
422    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
423    async fn infer_stats(
424        &self,
425        state: &dyn Session,
426        store: &Arc<dyn ObjectStore>,
427        table_schema: SchemaRef,
428        object: &ObjectMeta,
429    ) -> DFResult<Statistics> {
430        let object = object.clone();
431        let store = Arc::clone(store);
432        let session = self.session.clone();
433        let opts = self.opts.clone();
434        let file_metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
435
436        SpawnedTask::spawn(async move {
437            // Try to get entry metadata first
438            let cached_metadata = file_metadata_cache
439                .get(&object.location)
440                .filter(|entry| entry.is_valid_for(&object))
441                .and_then(|entry| {
442                    entry
443                        .file_metadata
444                        .as_any()
445                        .downcast_ref::<CachedVortexMetadata>()
446                        .map(|m| {
447                            (
448                                m.footer().dtype().clone(),
449                                m.footer().statistics().cloned(),
450                                m.footer().row_count(),
451                            )
452                        })
453                });
454
455            let (dtype, file_stats, row_count) = match cached_metadata {
456                Some(metadata) => metadata,
457                None => {
458                    // Not entry - open the file
459                    let reader = Arc::new(ObjectStoreReadAt::new_with_allocator(
460                        store,
461                        object.location.clone(),
462                        session.handle(),
463                        session.allocator(),
464                    ));
465
466                    let vxf = session
467                        .open_options()
468                        .with_initial_read_size(opts.footer_initial_read_size_bytes)
469                        .with_file_size(object.size)
470                        .open_read(reader)
471                        .await
472                        .map_err(|e| {
473                            DataFusionError::Execution(format!(
474                                "Failed to open Vortex file {}: {e}",
475                                object.location
476                            ))
477                        })?;
478
479                    // Cache the metadata
480                    let file_metadata = Arc::new(CachedVortexMetadata::new(&vxf));
481                    let entry = CachedFileMetadataEntry::new(object.clone(), file_metadata);
482                    file_metadata_cache.put(&object.location, entry);
483
484                    (
485                        vxf.dtype().clone(),
486                        vxf.file_stats().cloned(),
487                        vxf.row_count(),
488                    )
489                }
490            };
491
492            let struct_dtype = dtype
493                .as_struct_fields_opt()
494                .vortex_expect("dtype is not a struct");
495
496            // Evaluate the statistics for each column that we are able to return to DataFusion.
497            let Some(file_stats) = file_stats else {
498                // If the file has no column stats, the best we can do is return a row count.
499                return Ok(Statistics {
500                    num_rows: Precision::Exact(
501                        usize::try_from(row_count)
502                            .map_err(|_| vortex_err!("Row count overflow"))
503                            .vortex_expect("Row count overflow"),
504                    ),
505                    total_byte_size: Precision::Absent,
506                    column_statistics: vec![
507                        ColumnStatistics::default();
508                        table_schema.fields().len()
509                    ],
510                });
511            };
512
513            let mut column_statistics = Vec::with_capacity(table_schema.fields().len());
514
515            for field in table_schema.fields().iter() {
516                // If the column does not exist, continue. This can happen if the schema has evolved
517                // but we have not yet updated the Vortex file.
518                let Some(col_idx) = struct_dtype.find(field.name()) else {
519                    // The default sets all statistics to `Precision<Absent>`.
520                    column_statistics.push(ColumnStatistics::default());
521                    continue;
522                };
523                let (stats_set, stats_dtype) = file_stats.get(col_idx);
524
525                // Update the total size in bytes.
526                let column_size =
527                    stats_set.get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into());
528
529                let target_dtype = DType::from_arrow(field.as_ref());
530                let min = scalar_stat_to_df(
531                    Stat::Min,
532                    stats_set.get(Stat::Min),
533                    stats_dtype,
534                    &target_dtype,
535                );
536                let max = scalar_stat_to_df(
537                    Stat::Max,
538                    stats_set.get(Stat::Max),
539                    stats_dtype,
540                    &target_dtype,
541                );
542
543                let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
544
545                column_statistics.push(ColumnStatistics {
546                    null_count: null_count.to_df(),
547                    min_value: min.to_df(),
548                    max_value: max.to_df(),
549                    sum_value: Precision::Absent,
550                    distinct_count: is_constant_to_distinct_count(
551                        stats_set.get_as::<bool>(
552                            Stat::IsConstant,
553                            &DType::Bool(Nullability::NonNullable),
554                        ),
555                    ),
556                    byte_size: column_size.to_df(),
557                })
558            }
559
560            let total_byte_size = column_statistics
561                .iter()
562                .fold(Precision::Exact(0), |acc, cs| acc.add(&cs.byte_size));
563
564            Ok(Statistics {
565                num_rows: Precision::Exact(
566                    usize::try_from(row_count)
567                        .map_err(|_| vortex_err!("Row count overflow"))
568                        .vortex_expect("Row count overflow"),
569                ),
570                total_byte_size,
571                column_statistics,
572            })
573        })
574        .await
575        .vortex_expect("Failed to spawn infer_stats")
576    }
577
578    async fn create_physical_plan(
579        &self,
580        state: &dyn Session,
581        file_scan_config: FileScanConfig,
582    ) -> DFResult<Arc<dyn ExecutionPlan>> {
583        let mut source = file_scan_config
584            .file_source()
585            .as_any()
586            .downcast_ref::<VortexSource>()
587            .cloned()
588            .ok_or_else(|| internal_datafusion_err!("Expected VortexSource"))?;
589
590        source = source
591            .with_file_metadata_cache(state.runtime_env().cache_manager.get_file_metadata_cache());
592
593        let conf = FileScanConfigBuilder::from(file_scan_config)
594            .with_source(Arc::new(source))
595            .build();
596
597        Ok(DataSourceExec::from_data_source(conf))
598    }
599
600    async fn create_writer_physical_plan(
601        &self,
602        input: Arc<dyn ExecutionPlan>,
603        _state: &dyn Session,
604        conf: FileSinkConfig,
605        order_requirements: Option<LexRequirement>,
606    ) -> DFResult<Arc<dyn ExecutionPlan>> {
607        if conf.insert_op != InsertOp::Append {
608            return not_impl_err!("Overwrites are not implemented yet for Vortex");
609        }
610
611        let schema = Arc::clone(conf.output_schema());
612        let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));
613
614        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
615    }
616
617    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
618        let mut source = VortexSource::new(table_schema, self.session.clone())
619            .with_projection_pushdown(self.opts.projection_pushdown);
620
621        if let Some(scan_concurrency) = self.opts.scan_concurrency {
622            source = source.with_scan_concurrency(scan_concurrency);
623        }
624
625        Arc::new(source) as _
626    }
627}
628
629fn scalar_stat_to_df(
630    stat: Stat,
631    value: Option<stats::Precision<VortexScalarValue>>,
632    stats_dtype: &DType,
633    target_dtype: &DType,
634) -> Option<stats::Precision<DFScalarValue>> {
635    let stat_dtype = stat.dtype(stats_dtype)?;
636
637    value?
638        .try_map(|stat_value| {
639            Scalar::try_new(stat_dtype, Some(stat_value))?
640                .cast(target_dtype)?
641                .try_to_df()
642        })
643        .ok()
644}
645
#[cfg(test)]
mod tests {

    use super::*;
    use crate::common_tests::TestSessionContext;

    /// `STORED AS vortex` resolves to the Vortex format and registers the table.
    #[tokio::test]
    async fn create_table() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();
        let ddl = "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex  \
                LOCATION 'table/'";

        ctx.session.sql(ddl).await?;
        let registered = ctx.session.table_exist("my_tbl")?;
        assert!(registered);

        Ok(())
    }

    /// Table-level `OPTIONS(...)` are accepted by the Vortex format factory.
    #[tokio::test]
    async fn configure_format_source() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();
        let ddl = "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'table/' \
                OPTIONS( footer_initial_read_size_bytes '12345', scan_concurrency '3' );";

        let frame = ctx.session.sql(ddl).await?;
        frame.collect().await?;

        Ok(())
    }

    /// A footer read size set through the string-keyed config API is visible on the format.
    #[test]
    fn format_plumbs_footer_initial_read_size() {
        let expected: usize = 12345;

        let mut opts = VortexTableOptions::default();
        opts.set("footer_initial_read_size_bytes", "12345").unwrap();

        let format = VortexFormat::new_with_options(VortexSession::default(), opts);
        let actual = format.options().footer_initial_read_size_bytes;
        assert_eq!(actual, expected);
    }
}