vortex_datafusion/persistent/format.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::config::ConfigField;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
    ColumnStatistics, DataFusionError, GetExt, Result as DFResult, Statistics, config_namespace,
    not_impl_err,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::FileSinkConfig;
use datafusion_datasource::sink::DataSinkExec;
use datafusion_datasource::source::DataSourceExec;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_plan::ExecutionPlan;
use futures::{FutureExt, StreamExt as _, TryStreamExt as _, stream};
use itertools::Itertools;
use object_store::{ObjectMeta, ObjectStore};
use vortex::dtype::arrow::FromArrowType;
use vortex::dtype::{DType, Nullability, PType};
use vortex::error::{VortexExpect, VortexResult, vortex_err};
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::metrics::VortexMetrics;
use vortex::scalar::Scalar;
use vortex::session::VortexSession;
use vortex::stats;
use vortex::stats::{Stat, StatsSet};

use super::cache::VortexFileCache;
use super::sink::VortexSink;
use super::source::VortexSource;
use crate::PrecisionExt as _;
use crate::convert::TryToDataFusion;
/// Vortex implementation of a DataFusion [`FileFormat`].
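///
/// A minimal usage sketch (illustrative only; assumes the `datafusion` crate is
/// available and that `VortexFormat` is re-exported at the crate root):
///
/// ```rust,ignore
/// use std::sync::Arc;
///
/// use datafusion::datasource::listing::ListingOptions;
/// use vortex::session::VortexSession;
/// use vortex_datafusion::VortexFormat;
///
/// let format = Arc::new(VortexFormat::new(Arc::new(VortexSession::default())));
/// // Scan `.vortex` files in a listing table using this format.
/// let listing = ListingOptions::new(format).with_file_extension(".vortex");
/// ```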
pub struct VortexFormat {
    session: Arc<VortexSession>,
    file_cache: VortexFileCache,
    opts: VortexOptions,
}

impl Debug for VortexFormat {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("VortexFormat")
            .field("opts", &self.opts)
            .finish()
    }
}

config_namespace! {
    /// Options to configure the [`VortexFormat`].
    ///
    /// Can be set through a DataFusion [`SessionConfig`].
    ///
    /// [`SessionConfig`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionConfig.html
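    ///
    /// Options can also be set per table via SQL `OPTIONS`, mirroring the
    /// `configure_format_source` test below (the path is illustrative):
    ///
    /// ```sql
    /// CREATE EXTERNAL TABLE my_tbl (c1 VARCHAR NOT NULL, c2 INT NOT NULL)
    /// STORED AS vortex
    /// LOCATION '/path/to/table/'
    /// OPTIONS( segment_cache_size_mb '5' );
    /// ```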
    pub struct VortexOptions {
        /// The size, in megabytes, of the in-memory [`vortex::file::Footer`] cache.
        pub footer_cache_size_mb: usize, default = 64
        /// The size, in megabytes, of the in-memory segment cache.
        pub segment_cache_size_mb: usize, default = 0
    }
}

impl Eq for VortexOptions {}

/// Minimal factory to create [`VortexFormat`] instances.
#[derive(Debug)]
pub struct VortexFormatFactory {
    session: Arc<VortexSession>,
    options: Option<VortexOptions>,
}

impl GetExt for VortexFormatFactory {
    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }
}

impl VortexFormatFactory {
    /// Creates a new instance with a default [`VortexSession`] and default options.
    #[allow(clippy::new_without_default)] // `FileFormatFactory` defines a `default` method, so a `Default` implementation would be confusing.
    pub fn new() -> Self {
        Self {
            session: Arc::new(VortexSession::default()),
            options: None,
        }
    }

    /// Creates a new instance with a customized session and default options for all [`VortexFormat`] instances created from this factory.
    ///
    /// The options can be overridden by table-level configuration passed to [`FileFormatFactory::create`].
    pub fn new_with_options(session: Arc<VortexSession>, options: VortexOptions) -> Self {
        Self {
            session,
            options: Some(options),
        }
    }

    /// Overrides the default options for this factory.
    ///
    /// For example:
    /// ```rust
    /// use vortex_datafusion::{VortexFormatFactory, VortexOptions};
    ///
    /// let factory = VortexFormatFactory::new().with_options(VortexOptions::default());
    /// ```
    pub fn with_options(mut self, options: VortexOptions) -> Self {
        self.options = Some(options);
        self
    }
}

impl FileFormatFactory for VortexFormatFactory {
    #[allow(clippy::disallowed_types)]
    fn create(
        &self,
        _state: &dyn Session,
        format_options: &std::collections::HashMap<String, String>,
    ) -> DFResult<Arc<dyn FileFormat>> {
        let mut opts = self.options.clone().unwrap_or_default();
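        // Table-level options arrive prefixed with `format.` (e.g.
        // `format.segment_cache_size_mb`); apply those and ignore any other keys.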
        for (key, value) in format_options {
            if let Some(key) = key.strip_prefix("format.") {
                opts.set(key, value)?;
            } else {
                tracing::trace!("Ignoring option '{key}'");
            }
        }

        Ok(Arc::new(VortexFormat::new_with_options(
            self.session.clone(),
            opts,
        )))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(VortexFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Default for VortexFormat {
    fn default() -> Self {
        Self::new(Arc::new(VortexSession::default()))
    }
}

impl VortexFormat {
    /// Creates a new instance with default options.
    pub fn new(session: Arc<VortexSession>) -> Self {
        Self::new_with_options(session, VortexOptions::default())
    }

    /// Creates a new instance configured by a [`VortexOptions`].
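    ///
    /// For example (illustrative; assumes `VortexFormat` is re-exported at the
    /// crate root alongside `VortexOptions`):
    ///
    /// ```rust,ignore
    /// use std::sync::Arc;
    ///
    /// use vortex::session::VortexSession;
    /// use vortex_datafusion::{VortexFormat, VortexOptions};
    ///
    /// let mut opts = VortexOptions::default();
    /// opts.footer_cache_size_mb = 128;
    /// let format = VortexFormat::new_with_options(Arc::new(VortexSession::default()), opts);
    /// ```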
    pub fn new_with_options(session: Arc<VortexSession>, opts: VortexOptions) -> Self {
        Self {
            session: session.clone(),
            file_cache: VortexFileCache::new(
                opts.footer_cache_size_mb,
                opts.segment_cache_size_mb,
                session,
            ),
            opts,
        }
    }

    /// Returns the format-specific configuration.
    pub fn options(&self) -> &VortexOptions {
        &self.opts
    }
}

#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file level compression.".into(),
            )),
        }
    }

    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
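        // Open each file on a spawned task, bounded by the session's
        // `meta_fetch_concurrency`, and convert its Vortex dtype into an Arrow schema.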
        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|o| {
                let store = store.clone();
                let cache = self.file_cache.clone();
                SpawnedTask::spawn(async move {
                    let vxf = cache.try_get(&o, store).await?;
                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((o.location, inferred_schema))
                })
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // Sort schemas into a consistent order for `Schema::try_merge`, as some
        // filesystems don't have deterministic listing orders.
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
    async fn infer_stats(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        let object = object.clone();
        let store = store.clone();
        let cache = self.file_cache.clone();

        SpawnedTask::spawn(async move {
            let vxf = cache.try_get(&object, store.clone()).await.map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to open Vortex file {}: {e}",
                    object.location
                ))
            })?;

            let struct_dtype = vxf
                .dtype()
                .as_struct_fields_opt()
                .vortex_expect("dtype is not a struct");

            // Evaluate the statistics for each column that we are able to return to DataFusion.
            let Some(file_stats) = vxf.file_stats() else {
                // If the file has no column stats, the best we can do is return a row count.
                return Ok(Statistics {
                    num_rows: Precision::Exact(
                        usize::try_from(vxf.row_count())
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

            let stats = table_schema
                .fields()
                .iter()
                .map(|field| struct_dtype.find(field.name()))
                .map(|idx| match idx {
                    None => StatsSet::default(),
                    Some(id) => file_stats[id].clone(),
                })
                .collect_vec();

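            // Sum per-column uncompressed sizes. A column with no recorded size
            // contributes an inexact zero, making the overall total inexact.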
            let total_byte_size = stats
                .iter()
                .map(|stats_set| {
                    stats_set
                        .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
                        .unwrap_or_else(|| stats::Precision::inexact(0_usize))
                })
                .fold(stats::Precision::exact(0_usize), |acc, stats_set| {
                    acc.zip(stats_set).map(|(acc, stats_set)| acc + stats_set)
                });

            // Convert the summed byte size into a DataFusion `Precision`.
            let total_byte_size = total_byte_size.to_df();

            let column_statistics = stats
                .into_iter()
                .zip(table_schema.fields().iter())
                .map(|(stats_set, field)| {
                    let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
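                    // Convert the file's min/max statistics into DataFusion
                    // scalars; values that fail conversion are dropped.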
                    let min = stats_set.get(Stat::Min).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Min
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    let max = stats_set.get(Stat::Max).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Max
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    ColumnStatistics {
                        null_count: null_count.to_df(),
                        max_value: max.to_df(),
                        min_value: min.to_df(),
                        sum_value: Precision::Absent,
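                        // Report a distinct count of one when the `IsConstant`
                        // stat is known exactly.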
                        distinct_count: stats_set
                            .get_as::<bool>(
                                Stat::IsConstant,
                                &DType::Bool(Nullability::NonNullable),
                            )
                            .and_then(|is_constant| {
                                is_constant.as_exact().map(|_| Precision::Exact(1))
                            })
                            .unwrap_or(Precision::Absent),
                    }
                })
                .collect::<Vec<_>>();

            Ok(Statistics {
                num_rows: Precision::Exact(
                    usize::try_from(vxf.row_count())
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let source = VortexSource::new(self.file_cache.clone(), self.session.metrics().clone());
        let source = Arc::new(source);

        Ok(DataSourceExec::from_data_source(
            FileScanConfigBuilder::from(file_scan_config)
                .with_source(source)
                .build(),
        ))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

        let schema = conf.output_schema().clone();
        let sink = Arc::new(VortexSink::new(conf, schema));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self) -> Arc<dyn FileSource> {
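        // Unlike `create_physical_plan`, which reuses the session's metrics, this
        // entry point starts from a fresh default metrics registry.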
        Arc::new(VortexSource::new(
            self.file_cache.clone(),
            VortexMetrics::default(),
        ))
    }
}

#[cfg(test)]
mod tests {
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use tempfile::TempDir;

    use super::*;
    use crate::persistent::register_vortex_format_factory;

    #[tokio::test]
    async fn create_table() {
        let dir = TempDir::new().unwrap();

        let factory = VortexFormatFactory::new();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        let df = session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION '{}'",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap();

        assert_eq!(df.count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn configure_format_source() {
        let dir = TempDir::new().unwrap();

        let factory = VortexFormatFactory::new();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION '{}' \
                OPTIONS( segment_cache_size_mb '5' );",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
    }
}
467}