vortex_datafusion/persistent/
format.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::any::Any;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use arrow_schema::Schema;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::ColumnStatistics;
use datafusion_common::DataFusionError;
use datafusion_common::GetExt;
use datafusion_common::Result as DFResult;
use datafusion_common::Statistics;
use datafusion_common::config::ConfigField;
use datafusion_common::config_namespace;
use datafusion_common::not_impl_err;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::FileFormat;
use datafusion_datasource::file_format::FileFormatFactory;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::file_sink_config::FileSinkConfig;
use datafusion_datasource::sink::DataSinkExec;
use datafusion_datasource::source::DataSourceExec;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_plan::ExecutionPlan;
use futures::FutureExt;
use futures::StreamExt as _;
use futures::TryStreamExt as _;
use futures::stream;
use itertools::Itertools;
use object_store::ObjectMeta;
use object_store::ObjectStore;
use vortex::VortexSessionDefault;
use vortex::array::stats::StatsSet;
use vortex::dtype::DType;
use vortex::dtype::Nullability;
use vortex::dtype::PType;
use vortex::dtype::arrow::FromArrowType;
use vortex::error::VortexExpect;
use vortex::error::VortexResult;
use vortex::error::vortex_err;
use vortex::expr::stats;
use vortex::expr::stats::Stat;
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::scalar::Scalar;
use vortex::session::VortexSession;

use super::cache::VortexFileCache;
use super::sink::VortexSink;
use super::source::VortexSource;
use crate::PrecisionExt as _;
use crate::convert::TryToDataFusion;

/// Vortex implementation of a DataFusion [`FileFormat`].
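///
/// A minimal construction sketch with a default session; the names mirror this
/// file's imports, but whether `VortexFormat` is re-exported at the crate root
/// is an assumption here:
///
/// ```rust,ignore
/// use vortex::VortexSessionDefault;
/// use vortex::session::VortexSession;
/// use vortex_datafusion::VortexFormat;
///
/// // `VortexSession::default()` is provided by the `VortexSessionDefault` trait.
/// let format = VortexFormat::new(VortexSession::default());
/// ```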
pub struct VortexFormat {
    session: VortexSession,
    file_cache: VortexFileCache,
    opts: VortexOptions,
}

impl Debug for VortexFormat {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("VortexFormat")
            .field("opts", &self.opts)
            .finish()
    }
}

config_namespace! {
    /// Options to configure the [`VortexFormat`].
    ///
    /// Can be set through a DataFusion [`SessionConfig`].
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
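    ///
    /// A sketch of per-table configuration through the SQL `OPTIONS` clause,
    /// mirroring the tests at the bottom of this file; it assumes the
    /// [`VortexFormatFactory`] has already been registered with the session:
    ///
    /// ```rust,ignore
    /// # async fn example(session: datafusion::prelude::SessionContext) {
    /// session
    ///     .sql(
    ///         "CREATE EXTERNAL TABLE t (c1 INT) STORED AS vortex \
    ///          LOCATION '/tmp/t' OPTIONS( footer_cache_size_mb '128' )",
    ///     )
    ///     .await
    ///     .unwrap();
    /// # }
    /// ```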
    pub struct VortexOptions {
        /// The size, in megabytes, of the in-memory [`vortex::file::Footer`] cache.
        pub footer_cache_size_mb: usize, default = 64
        /// The size, in megabytes, of the in-memory segment cache.
        pub segment_cache_size_mb: usize, default = 0
    }
}

impl Eq for VortexOptions {}

/// Minimal factory to create [`VortexFormat`] instances.
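///
/// A registration sketch mirroring the tests at the bottom of this file; the
/// `register_vortex_format_factory` path is taken from this crate's
/// `persistent` module and may be re-exported elsewhere:
///
/// ```rust,ignore
/// use datafusion::execution::SessionStateBuilder;
/// use datafusion::prelude::SessionContext;
/// use vortex_datafusion::VortexFormatFactory;
///
/// let factory = VortexFormatFactory::new();
/// let mut builder = SessionStateBuilder::new().with_default_features();
/// register_vortex_format_factory(factory, &mut builder);
/// let session = SessionContext::new_with_state(builder.build());
/// ```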
#[derive(Debug)]
pub struct VortexFormatFactory {
    session: VortexSession,
    options: Option<VortexOptions>,
}

impl GetExt for VortexFormatFactory {
    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }
}

impl VortexFormatFactory {
    /// Creates a new instance with a default [`VortexSession`] and default options.
    #[expect(
        clippy::new_without_default,
        reason = "FormatFactory defines a `default` method, so a `Default` implementation would be confusing"
    )]
    pub fn new() -> Self {
        Self {
            session: VortexSession::default(),
            options: None,
        }
    }

    /// Creates a new instance with a customized session and default options for all
    /// [`VortexFormat`] instances created from this factory.
    ///
    /// The options can be overridden by table-level configuration passed in to
    /// [`FileFormatFactory::create`].
    pub fn new_with_options(session: VortexSession, options: VortexOptions) -> Self {
        Self {
            session,
            options: Some(options),
        }
    }

    /// Overrides the default options for this factory.
    ///
    /// For example:
    /// ```rust
    /// use vortex_datafusion::{VortexFormatFactory, VortexOptions};
    ///
    /// let factory = VortexFormatFactory::new().with_options(VortexOptions::default());
    /// ```
    pub fn with_options(mut self, options: VortexOptions) -> Self {
        self.options = Some(options);
        self
    }
}

impl FileFormatFactory for VortexFormatFactory {
    #[expect(clippy::disallowed_types, reason = "required by trait signature")]
    fn create(
        &self,
        _state: &dyn Session,
        format_options: &std::collections::HashMap<String, String>,
    ) -> DFResult<Arc<dyn FileFormat>> {
        let mut opts = self.options.clone().unwrap_or_default();
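        // Table-level options arrive namespaced under a `format.` prefix; strip
        // the prefix and layer them over this factory's defaults. Keys outside
        // the `format.` namespace are not ours, so they are skipped.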
        for (key, value) in format_options {
            if let Some(key) = key.strip_prefix("format.") {
                opts.set(key, value)?;
            } else {
                tracing::trace!("Ignoring option '{key}'");
            }
        }

        Ok(Arc::new(VortexFormat::new_with_options(
            self.session.clone(),
            opts,
        )))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(VortexFormat::new(self.session.clone()))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl VortexFormat {
    /// Creates a new instance with default options.
    pub fn new(session: VortexSession) -> Self {
        Self::new_with_options(session, VortexOptions::default())
    }

    /// Creates a new instance configured by a [`VortexOptions`].
    pub fn new_with_options(session: VortexSession, opts: VortexOptions) -> Self {
        Self {
            session: session.clone(),
            file_cache: VortexFileCache::new(
                opts.footer_cache_size_mb,
                opts.segment_cache_size_mb,
                session,
            ),
            opts,
        }
    }

    /// Returns the format-specific configuration.
    pub fn options(&self) -> &VortexOptions {
        &self.opts
    }
}

#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file level compression.".into(),
            )),
        }
    }

    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
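        // Open each file concurrently (bounded by `meta_fetch_concurrency`) and
        // convert its Vortex dtype into an Arrow schema.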
        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|o| {
                let store = store.clone();
                let cache = self.file_cache.clone();
                SpawnedTask::spawn(async move {
                    let vxf = cache.try_get(&o, store).await?;
                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((o.location, inferred_schema))
                })
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // Get a consistent order of schemas for `Schema::try_merge`, as some
        // filesystems don't have deterministic listing orders.
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
    async fn infer_stats(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        let object = object.clone();
        let store = store.clone();
        let cache = self.file_cache.clone();

        SpawnedTask::spawn(async move {
            let vxf = cache.try_get(&object, store.clone()).await.map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to open Vortex file {}: {e}",
                    object.location
                ))
            })?;

            let struct_dtype = vxf
                .dtype()
                .as_struct_fields_opt()
                .vortex_expect("dtype is not a struct");

            // Evaluate the statistics for each column that we are able to return to DataFusion.
            let Some(file_stats) = vxf.file_stats() else {
                // If the file has no column stats, the best we can do is return a row count.
                return Ok(Statistics {
                    num_rows: Precision::Exact(
                        usize::try_from(vxf.row_count())
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

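            // Align the file's statistics with the table schema: columns missing
            // from this file (e.g. after schema evolution) get an empty `StatsSet`.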
            let stats = table_schema
                .fields()
                .iter()
                .map(|field| struct_dtype.find(field.name()))
                .map(|idx| match idx {
                    None => StatsSet::default(),
                    Some(id) => file_stats[id].clone(),
                })
                .collect_vec();

            // Sum up the total byte size across all the columns, treating a column
            // with no size statistic as an inexact zero.
            let total_byte_size = stats
                .iter()
                .map(|stats_set| {
                    stats_set
                        .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
                        .unwrap_or_else(|| stats::Precision::inexact(0_usize))
                })
                .fold(stats::Precision::exact(0_usize), |acc, stats_set| {
                    acc.zip(stats_set).map(|(acc, stats_set)| acc + stats_set)
                });

            let total_byte_size = total_byte_size.to_df();

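            // Translate per-column Vortex stats into DataFusion `ColumnStatistics`:
            // null count, min/max (where the scalar converts cleanly), and a
            // distinct count of 1 for columns that are provably constant.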
            let column_statistics = stats
                .into_iter()
                .zip(table_schema.fields().iter())
                .map(|(stats_set, field)| {
                    let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
                    let min = stats_set.get(Stat::Min).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Min
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    let max = stats_set.get(Stat::Max).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Max
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    ColumnStatistics {
                        null_count: null_count.to_df(),
                        max_value: max.to_df(),
                        min_value: min.to_df(),
                        sum_value: Precision::Absent,
                        distinct_count: stats_set
                            .get_as::<bool>(
                                Stat::IsConstant,
                                &DType::Bool(Nullability::NonNullable),
                            )
                            .and_then(|is_constant| {
                                is_constant.as_exact().map(|_| Precision::Exact(1))
                            })
                            .unwrap_or(Precision::Absent),
                    }
                })
                .collect::<Vec<_>>();

            Ok(Statistics {
                num_rows: Precision::Exact(
                    usize::try_from(vxf.row_count())
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
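        // Hand the scan off to DataFusion's generic `DataSourceExec`, swapping in
        // our `VortexSource` while keeping the rest of the scan config unchanged.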
        let source = VortexSource::new(self.session.clone(), self.file_cache.clone());
        let source = Arc::new(source);

        Ok(DataSourceExec::from_data_source(
            FileScanConfigBuilder::from(file_scan_config)
                .with_source(source)
                .build(),
        ))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

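        // Append writes flow through `VortexSink`, wrapped in DataFusion's
        // generic `DataSinkExec`.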
        let schema = conf.output_schema().clone();
        let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(VortexSource::new(
            self.session.clone(),
            self.file_cache.clone(),
        ))
    }
}

#[cfg(test)]
mod tests {
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use tempfile::TempDir;

    use super::*;
    use crate::persistent::register_vortex_format_factory;

    #[tokio::test]
    async fn create_table() {
        let dir = TempDir::new().unwrap();

        let factory: VortexFormatFactory = VortexFormatFactory::new();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        let df = session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION '{}'",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap();

        assert_eq!(df.count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn configure_format_source() {
        let dir = TempDir::new().unwrap();

        let factory = VortexFormatFactory::new();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION '{}' \
                OPTIONS( segment_cache_size_mb '5' );",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
    }
}