datafusion_orc/
file_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use datafusion::arrow::datatypes::{Schema, SchemaRef};
24use datafusion::common::Statistics;
25use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
26use datafusion::datasource::file_format::FileFormat;
27use datafusion::datasource::physical_plan::{FileScanConfig, FileSource};
28use datafusion::datasource::table_schema::TableSchema;
29use datafusion::error::{DataFusionError, Result};
30use datafusion::physical_plan::ExecutionPlan;
31use futures::TryStreamExt;
32use orc_rust::reader::metadata::read_metadata_async;
33
34use crate::OrcSource;
35use async_trait::async_trait;
36use datafusion::catalog::Session;
37use datafusion::datasource::source::DataSourceExec;
38use futures_util::StreamExt;
39use object_store::path::Path;
40use object_store::{ObjectMeta, ObjectStore};
41
42use super::object_store_reader::ObjectStoreReader;
43
44async fn fetch_schema(store: &Arc<dyn ObjectStore>, file: &ObjectMeta) -> Result<(Path, Schema)> {
45    let loc_path = file.location.clone();
46    let mut reader = ObjectStoreReader::new(Arc::clone(store), file.clone());
47    let metadata = read_metadata_async(&mut reader)
48        .await
49        .map_err(|e| DataFusionError::External(Box::new(e)))?;
50    let schema = metadata
51        .root_data_type()
52        .create_arrow_schema(&HashMap::default());
53    Ok((loc_path, schema))
54}
55
/// [`FileFormat`] implementation for ORC files.
///
/// The type carries no state, so every value is interchangeable; it can be
/// created with the literal `OrcFormat` or through [`Default`].
#[derive(Clone, Debug, Default)]
pub struct OrcFormat;
58
59#[async_trait]
60impl FileFormat for OrcFormat {
61    fn as_any(&self) -> &dyn Any {
62        self
63    }
64
65    fn get_ext(&self) -> String {
66        "orc".to_string()
67    }
68
69    fn get_ext_with_compression(&self, _compression: &FileCompressionType) -> Result<String> {
70        Ok("orc".to_string())
71    }
72
73    fn compression_type(&self) -> Option<FileCompressionType> {
74        None
75    }
76
77    async fn infer_schema(
78        &self,
79        state: &dyn Session,
80        store: &Arc<dyn ObjectStore>,
81        objects: &[ObjectMeta],
82    ) -> Result<SchemaRef> {
83        let mut schemas: Vec<_> = futures::stream::iter(objects)
84            .map(|object| fetch_schema(store, object))
85            .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
86            .buffered(state.config_options().execution.meta_fetch_concurrency)
87            .try_collect()
88            .await?;
89
90        // Schema inference adds fields based the order they are seen
91        // which depends on the order the files are processed. For some
92        // object stores (like local file systems) the order returned from list
93        // is not deterministic. Thus, to ensure deterministic schema inference
94        // sort the files first.
95        // https://github.com/apache/datafusion/pull/6629
96        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));
97
98        let schemas = schemas
99            .into_iter()
100            .map(|(_, schema)| schema)
101            .collect::<Vec<_>>();
102
103        let schema = Schema::try_merge(schemas)?;
104
105        Ok(Arc::new(schema))
106    }
107
108    async fn infer_stats(
109        &self,
110        _state: &dyn Session,
111        _store: &Arc<dyn ObjectStore>,
112        table_schema: SchemaRef,
113        _object: &ObjectMeta,
114    ) -> Result<Statistics> {
115        Ok(Statistics::new_unknown(&table_schema))
116    }
117
118    async fn create_physical_plan(
119        &self,
120        _state: &dyn Session,
121        conf: FileScanConfig,
122    ) -> Result<Arc<dyn ExecutionPlan>> {
123        Ok(DataSourceExec::from_data_source(conf))
124    }
125
126    fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
127        Arc::new(OrcSource::new(table_schema))
128    }
129}