// datafusion_datasource_orc/file_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! ORC [`FileFormat`] implementations and factory wiring.
19//!
20//! This module provides the entry points DataFusion uses to discover ORC support
21//! via [`FileFormatFactory`] and to build physical plans via [`FileFormat`].
22
23use std::any::Any;
24use std::sync::Arc;
25
26use async_trait::async_trait;
27use datafusion_common::{GetExt, Result};
28use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
29use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
30use datafusion_datasource::source::DataSourceExec;
31use datafusion_datasource::TableSchema;
32use datafusion_physical_plan::ExecutionPlan;
33use futures::StreamExt;
34use futures::TryStreamExt;
35use object_store::ObjectStore;
36
37use crate::metadata::{read_orc_schema, read_orc_statistics};
38use crate::options::OrcFormatOptions;
39use crate::source::OrcSource;
40
/// Factory for creating [`OrcFormat`] instances.
#[derive(Debug, Default)]
pub struct OrcFormatFactory {
    // Optional pre-configured options; `None` means fall back to
    // `OrcFormatOptions::default()` when a format is created.
    options: Option<OrcFormatOptions>,
}
46
47impl OrcFormatFactory {
48    /// Creates an instance of [`OrcFormatFactory`]
49    pub fn new() -> Self {
50        Self { options: None }
51    }
52
53    /// Creates an instance of [`OrcFormatFactory`] with default options.
54    pub fn new_with_options(options: OrcFormatOptions) -> Self {
55        Self {
56            options: Some(options),
57        }
58    }
59}
60
61impl FileFormatFactory for OrcFormatFactory {
62    fn create(
63        &self,
64        _state: &dyn datafusion_session::Session,
65        format_options: &std::collections::HashMap<String, String>,
66    ) -> Result<Arc<dyn FileFormat>> {
67        let mut options = self.options.clone().unwrap_or_default();
68        // TODO: plumb DataFusion session defaults once ORC config options exist.
69        options.apply_format_options(format_options)?;
70        Ok(Arc::new(OrcFormat::default().with_options(options)))
71    }
72
73    fn default(&self) -> Arc<dyn FileFormat> {
74        Arc::new(OrcFormat::default())
75    }
76
77    fn as_any(&self) -> &dyn Any {
78        self
79    }
80}
81
82impl GetExt for OrcFormatFactory {
83    fn get_ext(&self) -> String {
84        "orc".to_string()
85    }
86}
87
/// The Apache ORC [`FileFormat`] implementation.
#[derive(Debug, Default, Clone)]
pub struct OrcFormat {
    // Format-level read configuration, propagated into each `OrcSource`.
    options: OrcFormatOptions,
}
93
94impl OrcFormat {
95    /// Construct a new Format with default options
96    pub fn new() -> Self {
97        Self::default()
98    }
99
100    /// Override ORC format options.
101    pub fn with_options(mut self, options: OrcFormatOptions) -> Self {
102        self.options = options;
103        self
104    }
105
106    /// Return the configured ORC format options.
107    pub fn options(&self) -> &OrcFormatOptions {
108        &self.options
109    }
110}
111
#[async_trait]
impl FileFormat for OrcFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// File extension for ORC files ("orc", without the dot).
    fn get_ext(&self) -> String {
        "orc".to_string()
    }

    fn get_ext_with_compression(
        &self,
        _file_compression_type: &datafusion_datasource::file_compression_type::FileCompressionType,
    ) -> Result<String> {
        // ORC files have built-in compression, so the extension is always "orc"
        Ok("orc".to_string())
    }

    fn compression_type(
        &self,
    ) -> Option<datafusion_datasource::file_compression_type::FileCompressionType> {
        // ORC files have built-in compression support
        None
    }

    /// Infer a unified Arrow schema by reading the footer schema of every
    /// object and merging them with [`arrow::datatypes::Schema::try_merge`].
    ///
    /// Schema reads run concurrently, bounded by the session's
    /// `execution.meta_fetch_concurrency` setting. Returns an error if any
    /// individual read fails or if the schemas cannot be merged.
    async fn infer_schema(
        &self,
        state: &dyn datafusion_session::Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[object_store::ObjectMeta],
    ) -> Result<arrow::datatypes::SchemaRef> {
        use futures::stream::iter;

        // Read schemas from all objects concurrently
        let store_clone = Arc::clone(store);
        let schemas: Vec<_> = iter(objects.iter())
            .map(|object| {
                // Each future owns its own Arc so it can be `async move`d.
                let store = Arc::clone(&store_clone);
                async move { read_orc_schema(&store, object).await }
            })
            .boxed() // Workaround for lifetime issues
            .buffered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect()
            .await?;

        // Merge all schemas
        // Schema::try_merge needs owned Schema, not Arc<Schema>
        let schemas: Vec<_> = schemas.into_iter().map(|s| (*s).clone()).collect();
        let merged_schema = arrow::datatypes::Schema::try_merge(schemas)?;
        Ok(Arc::new(merged_schema))
    }

    /// Compute per-file statistics for `object` by delegating to
    /// [`read_orc_statistics`], mapped onto `table_schema`.
    async fn infer_stats(
        &self,
        _state: &dyn datafusion_session::Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: arrow::datatypes::SchemaRef,
        object: &object_store::ObjectMeta,
    ) -> Result<datafusion_common::Statistics> {
        read_orc_statistics(store, object, table_schema).await
    }

    /// Build the physical scan plan: wrap an [`OrcSource`] (carrying this
    /// format's read options) into the provided [`FileScanConfig`] and return
    /// a [`DataSourceExec`] over it.
    async fn create_physical_plan(
        &self,
        _state: &dyn datafusion_session::Session,
        conf: datafusion_datasource::file_scan_config::FileScanConfig,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Create OrcSource from the file scan config
        // conf.file_schema() returns SchemaRef, we need to convert it to TableSchema
        let file_schema = conf.file_schema();
        let table_schema = TableSchema::from_file_schema(file_schema.clone());
        let source =
            Arc::new(OrcSource::new(table_schema).with_read_options(self.options.read.clone()));

        // Create new FileScanConfig with OrcSource
        let conf = FileScanConfigBuilder::from(conf)
            .with_source(source)
            .build();

        // Create DataSourceExec
        Ok(DataSourceExec::from_data_source(conf))
    }

    /// Return a default [`OrcSource`] built over an empty schema.
    fn file_source(&self) -> Arc<dyn datafusion_datasource::file::FileSource> {
        // Return a default OrcSource
        // The actual schema will be set when create_physical_plan is called
        Arc::new(
            OrcSource::new(TableSchema::from_file_schema(Arc::new(
                arrow::datatypes::Schema::empty(),
            )))
            .with_read_options(self.options.read.clone()),
        )
    }
}
205}