datafusion_orc/
file_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::any::Any;
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::sync::Arc;
22
23use datafusion::arrow::datatypes::{Schema, SchemaRef};
24use datafusion::common::Statistics;
25use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
26use datafusion::datasource::file_format::FileFormat;
27use datafusion::datasource::physical_plan::{FileScanConfig, FileSource};
28use datafusion::error::{DataFusionError, Result};
29use datafusion::physical_plan::ExecutionPlan;
30use futures::TryStreamExt;
31use orc_rust::reader::metadata::read_metadata_async;
32
33use crate::OrcSource;
34use async_trait::async_trait;
35use datafusion::catalog::Session;
36use datafusion::datasource::source::DataSourceExec;
37use futures_util::StreamExt;
38use object_store::path::Path;
39use object_store::{ObjectMeta, ObjectStore};
40
41use super::object_store_reader::ObjectStoreReader;
42
43async fn fetch_schema(store: &Arc<dyn ObjectStore>, file: &ObjectMeta) -> Result<(Path, Schema)> {
44    let loc_path = file.location.clone();
45    let mut reader = ObjectStoreReader::new(Arc::clone(store), file.clone());
46    let metadata = read_metadata_async(&mut reader)
47        .await
48        .map_err(|e| DataFusionError::External(Box::new(e)))?;
49    let schema = metadata
50        .root_data_type()
51        .create_arrow_schema(&HashMap::default());
52    Ok((loc_path, schema))
53}
54
/// Field-less marker type that carries the ORC implementation of
/// DataFusion's `FileFormat` trait.
///
/// It holds no state, so every value of this type is interchangeable.
#[derive(Debug, Clone)]
pub struct OrcFormat;
57
58#[async_trait]
59impl FileFormat for OrcFormat {
60    fn as_any(&self) -> &dyn Any {
61        self
62    }
63
64    fn get_ext(&self) -> String {
65        "orc".to_string()
66    }
67
68    fn get_ext_with_compression(&self, _compression: &FileCompressionType) -> Result<String> {
69        Ok("orc".to_string())
70    }
71
72    fn compression_type(&self) -> Option<FileCompressionType> {
73        None
74    }
75
76    async fn infer_schema(
77        &self,
78        state: &dyn Session,
79        store: &Arc<dyn ObjectStore>,
80        objects: &[ObjectMeta],
81    ) -> Result<SchemaRef> {
82        let mut schemas: Vec<_> = futures::stream::iter(objects)
83            .map(|object| fetch_schema(store, object))
84            .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552
85            .buffered(state.config_options().execution.meta_fetch_concurrency)
86            .try_collect()
87            .await?;
88
89        // Schema inference adds fields based the order they are seen
90        // which depends on the order the files are processed. For some
91        // object stores (like local file systems) the order returned from list
92        // is not deterministic. Thus, to ensure deterministic schema inference
93        // sort the files first.
94        // https://github.com/apache/datafusion/pull/6629
95        schemas.sort_by(|(location1, _), (location2, _)| location1.cmp(location2));
96
97        let schemas = schemas
98            .into_iter()
99            .map(|(_, schema)| schema)
100            .collect::<Vec<_>>();
101
102        let schema = Schema::try_merge(schemas)?;
103
104        Ok(Arc::new(schema))
105    }
106
107    async fn infer_stats(
108        &self,
109        _state: &dyn Session,
110        _store: &Arc<dyn ObjectStore>,
111        table_schema: SchemaRef,
112        _object: &ObjectMeta,
113    ) -> Result<Statistics> {
114        Ok(Statistics::new_unknown(&table_schema))
115    }
116
117    async fn create_physical_plan(
118        &self,
119        _state: &dyn Session,
120        conf: FileScanConfig,
121    ) -> Result<Arc<dyn ExecutionPlan>> {
122        Ok(DataSourceExec::from_data_source(conf))
123    }
124
125    fn file_source(&self) -> Arc<dyn FileSource> {
126        Arc::new(OrcSource::default())
127    }
128}