vortex_datafusion/persistent/
format.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::any::Any;
5use std::fmt::{Debug, Formatter};
6use std::sync::Arc;
7
8use arrow_schema::{Schema, SchemaRef};
9use async_trait::async_trait;
10use datafusion_catalog::Session;
11use datafusion_common::config::ConfigField;
12use datafusion_common::parsers::CompressionTypeVariant;
13use datafusion_common::stats::Precision;
14use datafusion_common::{
15    ColumnStatistics, DataFusionError, GetExt, Result as DFResult, Statistics, config_namespace,
16    not_impl_err,
17};
18use datafusion_common_runtime::SpawnedTask;
19use datafusion_datasource::file::FileSource;
20use datafusion_datasource::file_compression_type::FileCompressionType;
21use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
22use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
23use datafusion_datasource::file_sink_config::FileSinkConfig;
24use datafusion_datasource::sink::DataSinkExec;
25use datafusion_datasource::source::DataSourceExec;
26use datafusion_expr::dml::InsertOp;
27use datafusion_physical_expr::LexRequirement;
28use datafusion_physical_plan::ExecutionPlan;
29use futures::{FutureExt, StreamExt as _, TryStreamExt as _, stream};
30use itertools::Itertools;
31use object_store::{ObjectMeta, ObjectStore};
32use vortex::dtype::arrow::FromArrowType;
33use vortex::dtype::{DType, Nullability, PType};
34use vortex::error::{VortexExpect, VortexResult, vortex_err};
35use vortex::file::VORTEX_FILE_EXTENSION;
36use vortex::scalar::Scalar;
37use vortex::session::VortexSession;
38use vortex::stats::{Stat, StatsSet};
39use vortex::{VortexSessionDefault, stats};
40
41use super::cache::VortexFileCache;
42use super::sink::VortexSink;
43use super::source::VortexSource;
44use crate::PrecisionExt as _;
45use crate::convert::TryToDataFusion;
46
47/// Vortex implementation of a DataFusion [`FileFormat`].
/// Vortex implementation of a DataFusion [`FileFormat`].
pub struct VortexFormat {
    // Session handed to the sources/sinks this format creates.
    session: VortexSession,
    // Cache of opened Vortex files (footer + segment caches, sized by `opts`),
    // shared across schema inference, stats inference and scans.
    file_cache: VortexFileCache,
    // Format configuration (cache sizes); see `VortexOptions`.
    opts: VortexOptions,
}
53
impl Debug for VortexFormat {
    // Manual impl that shows only `opts`; `session` and `file_cache` are
    // omitted — presumably they are not `Debug` or too noisy to print (TODO
    // confirm), hence no `#[derive(Debug)]`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("VortexFormat")
            .field("opts", &self.opts)
            .finish()
    }
}
61
// `config_namespace!` (from `datafusion_common`) generates the `ConfigField`
// plumbing so these options can be set from string key/value pairs — see the
// `opts.set(key, value)` call in `FileFormatFactory::create`.
config_namespace! {
    /// Options to configure the [`VortexFormat`].
    ///
    /// Can be set through a DataFusion [`SessionConfig`].
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
    pub struct VortexOptions {
        /// The size of the in-memory [`vortex::file::Footer`] cache.
        pub footer_cache_size_mb: usize, default = 64
        /// The size of the in-memory segment cache.
        pub segment_cache_size_mb: usize, default = 0
    }
}
75
76impl Eq for VortexOptions {}
77
78/// Minimal factory to create [`VortexFormat`] instances.
/// Minimal factory to create [`VortexFormat`] instances.
#[derive(Debug)]
pub struct VortexFormatFactory {
    // Session cloned into every `VortexFormat` this factory creates.
    session: VortexSession,
    // Default options for created formats; `None` falls back to
    // `VortexOptions::default()` (see `FileFormatFactory::create`).
    options: Option<VortexOptions>,
}
84
85impl GetExt for VortexFormatFactory {
86    fn get_ext(&self) -> String {
87        VORTEX_FILE_EXTENSION.to_string()
88    }
89}
90
91impl VortexFormatFactory {
92    /// Creates a new instance with a default [`VortexSession`] and default options.
93    #[allow(clippy::new_without_default)] // FormatFactory defines `default` method, so having `Default` implementation is confusing.
94    pub fn new() -> Self {
95        Self {
96            session: VortexSession::default(),
97            options: None,
98        }
99    }
100
101    /// Creates a new instance with customized session and default options for all [`VortexFormat`] instances created from this factory.
102    ///
103    /// The options can be overridden by table-level configuration pass in [`FileFormatFactory::create`].
104    pub fn new_with_options(session: VortexSession, options: VortexOptions) -> Self {
105        Self {
106            session,
107            options: Some(options),
108        }
109    }
110
111    /// Override the default options for this factory.
112    ///
113    /// For example:
114    /// ```rust
115    /// use vortex_datafusion::{VortexFormatFactory, VortexOptions};
116    ///
117    /// let factory = VortexFormatFactory::new().with_options(VortexOptions::default());
118    /// ```
119    pub fn with_options(mut self, options: VortexOptions) -> Self {
120        self.options = Some(options);
121        self
122    }
123}
124
125impl FileFormatFactory for VortexFormatFactory {
126    #[allow(clippy::disallowed_types)]
127    fn create(
128        &self,
129        _state: &dyn Session,
130        format_options: &std::collections::HashMap<String, String>,
131    ) -> DFResult<Arc<dyn FileFormat>> {
132        let mut opts = self.options.clone().unwrap_or_default();
133        for (key, value) in format_options {
134            if let Some(key) = key.strip_prefix("format.") {
135                opts.set(key, value)?;
136            } else {
137                tracing::trace!("Ignoring options '{key}'");
138            }
139        }
140
141        Ok(Arc::new(VortexFormat::new_with_options(
142            self.session.clone(),
143            opts,
144        )))
145    }
146
147    fn default(&self) -> Arc<dyn FileFormat> {
148        Arc::new(VortexFormat::new(self.session.clone()))
149    }
150
151    fn as_any(&self) -> &dyn Any {
152        self
153    }
154}
155
156impl VortexFormat {
157    /// Create a new instance with default options.
158    pub fn new(session: VortexSession) -> Self {
159        Self::new_with_options(session, VortexOptions::default())
160    }
161
162    /// Creates a new instance with configured by a [`VortexOptions`].
163    pub fn new_with_options(session: VortexSession, opts: VortexOptions) -> Self {
164        Self {
165            session: session.clone(),
166            file_cache: VortexFileCache::new(
167                opts.footer_cache_size_mb,
168                opts.segment_cache_size_mb,
169                session,
170            ),
171            opts,
172        }
173    }
174
175    /// Return the format specific configuration
176    pub fn options(&self) -> &VortexOptions {
177        &self.opts
178    }
179}
180
#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Vortex manages compression internally, so no external file-level
    /// compression type is exposed to DataFusion.
    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    /// Only `UNCOMPRESSED` is accepted; any other variant is an error because
    /// Vortex does not support file-level compression.
    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file level compression.".into(),
            )),
        }
    }

    /// Infers a single Arrow schema for a collection of Vortex files by
    /// opening each file (concurrently, bounded by the session's
    /// `meta_fetch_concurrency`) and merging the per-file schemas with
    /// [`Schema::try_merge`].
    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|o| {
                let store = store.clone();
                let cache = self.file_cache.clone();
                // Each file is opened on a spawned task; the cache avoids
                // re-reading footers for files already seen.
                SpawnedTask::spawn(async move {
                    let vxf = cache.try_get(&o, store).await?;
                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((o.location, inferred_schema))
                })
                // A join failure (task panicked/cancelled) is a bug, not a
                // recoverable error — hence the expect.
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // Get consistent order of schemas for `Schema::try_merge`, as some filesystems don't have deterministic listing orders
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    /// Derives DataFusion [`Statistics`] for one file from the statistics
    /// recorded in its Vortex footer.
    ///
    /// The row count is always exact; total byte size, null counts, min/max
    /// bounds and constant-column distinct counts are populated only when the
    /// file carries column statistics, otherwise they are reported `Absent`.
    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
    async fn infer_stats(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        // Clone everything the spawned task needs ('static future).
        let object = object.clone();
        let store = store.clone();
        let cache = self.file_cache.clone();

        SpawnedTask::spawn(async move {
            let vxf = cache.try_get(&object, store.clone()).await.map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to open Vortex file {}: {e}",
                    object.location
                ))
            })?;

            // A table file's top-level dtype must be a struct; anything else
            // is a broken invariant, hence the expect.
            let struct_dtype = vxf
                .dtype()
                .as_struct_fields_opt()
                .vortex_expect("dtype is not a struct");

            // Evaluate the statistics for each column that we are able to return to DataFusion.
            let Some(file_stats) = vxf.file_stats() else {
                // If the file has no column stats, the best we can do is return a row count.
                return Ok(Statistics {
                    num_rows: Precision::Exact(
                        usize::try_from(vxf.row_count())
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

            // Align the file's per-column stats with the table schema: columns
            // the file does not contain fall back to an empty `StatsSet`.
            let stats = table_schema
                .fields()
                .iter()
                .map(|field| struct_dtype.find(field.name()))
                .map(|idx| match idx {
                    None => StatsSet::default(),
                    Some(id) => file_stats[id].clone(),
                })
                .collect_vec();

            // Sum the per-column uncompressed sizes. A column with no size
            // stat contributes an inexact 0; precisions are combined via
            // `zip`, so inexactness should propagate into the total (per
            // `vortex::stats::Precision` semantics — confirm).
            let total_byte_size = stats
                .iter()
                .map(|stats_set| {
                    stats_set
                        .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
                        .unwrap_or_else(|| stats::Precision::inexact(0_usize))
                })
                .fold(stats::Precision::exact(0_usize), |acc, stats_set| {
                    acc.zip(stats_set).map(|(acc, stats_set)| acc + stats_set)
                });

            // Sum up the total byte size across all the columns.
            let total_byte_size = total_byte_size.to_df();

            let column_statistics = stats
                .into_iter()
                .zip(table_schema.fields().iter())
                .map(|(stats_set, field)| {
                    let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
                    // Rehydrate the stored min bound as a `Scalar` of the
                    // stat's dtype, then convert it to a DataFusion scalar.
                    // Conversion failures are discarded (`.ok()`), leaving the
                    // bound absent rather than failing stats inference.
                    let min = stats_set.get(Stat::Min).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Min
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    // Same as `min` above, for the max bound.
                    let max = stats_set.get(Stat::Max).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Max
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    ColumnStatistics {
                        null_count: null_count.to_df(),
                        max_value: max.to_df(),
                        min_value: min.to_df(),
                        sum_value: Precision::Absent,
                        // A column proven constant has exactly one distinct
                        // value.
                        // NOTE(review): this yields Exact(1) even for a
                        // zero-row file — confirm `IsConstant` is never exact
                        // on empty files.
                        distinct_count: stats_set
                            .get_as::<bool>(
                                Stat::IsConstant,
                                &DType::Bool(Nullability::NonNullable),
                            )
                            .and_then(|is_constant| {
                                is_constant.as_exact().map(|_| Precision::Exact(1))
                            })
                            .unwrap_or(Precision::Absent),
                    }
                })
                .collect::<Vec<_>>();

            Ok(Statistics {
                num_rows: Precision::Exact(
                    usize::try_from(vxf.row_count())
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

    /// Builds the physical scan plan: a [`DataSourceExec`] over the given
    /// scan configuration with a [`VortexSource`] as the file source.
    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let source = VortexSource::new(self.session.clone(), self.file_cache.clone());
        let source = Arc::new(source);

        Ok(DataSourceExec::from_data_source(
            FileScanConfigBuilder::from(file_scan_config)
                .with_source(source)
                .build(),
        ))
    }

    /// Builds the physical write plan: a [`DataSinkExec`] over a
    /// [`VortexSink`]. Only `InsertOp::Append` is supported; other insert
    /// operations return a not-implemented error.
    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

        let schema = conf.output_schema().clone();
        let sink = Arc::new(VortexSink::new(conf, schema, self.session.clone()));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(VortexSource::new(
            self.session.clone(),
            self.file_cache.clone(),
        ))
    }
}
403
#[cfg(test)]
mod tests {
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use tempfile::TempDir;

    use super::*;
    use crate::persistent::register_vortex_format_factory;

    /// Builds a `SessionContext` with the Vortex format factory registered.
    fn vortex_context() -> SessionContext {
        let mut builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(VortexFormatFactory::new(), &mut builder);
        SessionContext::new_with_state(builder.build())
    }

    #[tokio::test]
    async fn create_table() {
        let dir = TempDir::new().unwrap();
        let session = vortex_context();

        // Creating an external table over an empty directory should succeed
        // and yield zero rows.
        let df = session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex  \
                LOCATION '{}'",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap();

        assert_eq!(df.count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn configure_format_source() {
        let dir = TempDir::new().unwrap();
        let session = vortex_context();

        // Format-level OPTIONS must be accepted when creating the table.
        session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION '{}' \
                OPTIONS( segment_cache_size_mb '5' );",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
    }
}