datafusion_orc/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Integration with the [Apache DataFusion](https://datafusion.apache.org/) query engine,
//! allowing ORC files to be queried through its SQL and DataFrame APIs.
20//!
21//! # Example usage
22//!
23//! ```no_run
24//! # use datafusion::prelude::*;
25//! # use datafusion::error::Result;
26//! # use datafusion_orc::{OrcReadOptions, SessionContextOrcExt};
27//! # #[tokio::main]
28//! # async fn main() -> Result<()> {
29//! let ctx = SessionContext::new();
30//! ctx.register_orc(
31//!     "table1",
32//!     "/path/to/file.orc",
33//!     OrcReadOptions::default(),
34//! )
35//! .await?;
36//!
37//! ctx.sql("select a, b from table1")
38//!     .await?
39//!     .show()
40//!     .await?;
41//! # Ok(())
42//! # }
43//! ```
44
45use std::sync::Arc;
46
47use datafusion::arrow::datatypes::SchemaRef;
48use datafusion::common::exec_err;
49use datafusion::config::TableOptions;
50use datafusion::dataframe::DataFrame;
51use datafusion::datasource::listing::{
52    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
53};
54use datafusion::error::Result;
55use datafusion::execution::config::SessionConfig;
56use datafusion::execution::context::{DataFilePaths, SessionContext, SessionState};
57use datafusion::execution::options::ReadOptions;
58
59use async_trait::async_trait;
60
61mod file_format;
62mod file_source;
63mod object_store_reader;
64mod physical_exec;
65
66pub use file_format::OrcFormat;
67pub use file_source::OrcSource;
68
/// Configuration options for reading ORC files.
///
/// Use [`Default::default`] to obtain the conventional settings.
#[derive(Clone, Debug)]
pub struct OrcReadOptions<'a> {
    /// File extension used to select ORC files; passed to
    /// `ListingOptions::with_file_extension` when listing options are built.
    pub file_extension: &'a str,
}
74
75impl Default for OrcReadOptions<'_> {
76    fn default() -> Self {
77        Self {
78            file_extension: "orc",
79        }
80    }
81}
82
83#[async_trait]
84impl ReadOptions<'_> for OrcReadOptions<'_> {
85    fn to_listing_options(
86        &self,
87        _config: &SessionConfig,
88        _table_options: TableOptions,
89    ) -> ListingOptions {
90        ListingOptions::new(Arc::new(OrcFormat)).with_file_extension(self.file_extension)
91    }
92
93    async fn get_resolved_schema(
94        &self,
95        config: &SessionConfig,
96        state: SessionState,
97        table_path: ListingTableUrl,
98    ) -> Result<SchemaRef> {
99        self._get_resolved_schema(config, state, table_path, None)
100            .await
101    }
102}
103
104/// Exposes new functions for registering ORC tables onto a DataFusion [`SessionContext`]
105/// to enable querying them using the SQL or DataFrame API.
106pub trait SessionContextOrcExt {
107    fn read_orc<P: DataFilePaths + Send>(
108        &self,
109        table_paths: P,
110        options: OrcReadOptions<'_>,
111    ) -> impl std::future::Future<Output = Result<DataFrame>> + Send;
112
113    fn register_orc(
114        &self,
115        name: &str,
116        table_path: &str,
117        options: OrcReadOptions<'_>,
118    ) -> impl std::future::Future<Output = Result<()>> + Send;
119}
120
121impl SessionContextOrcExt for SessionContext {
122    async fn read_orc<P: DataFilePaths + Send>(
123        &self,
124        table_paths: P,
125        options: OrcReadOptions<'_>,
126    ) -> Result<DataFrame> {
127        // SessionContext::_read_type
128        let table_paths = table_paths.to_urls()?;
129        let session_config = self.copied_config();
130        let listing_options = ListingOptions::new(Arc::new(OrcFormat)).with_file_extension(".orc");
131
132        let option_extension = listing_options.file_extension.clone();
133
134        if table_paths.is_empty() {
135            return exec_err!("No table paths were provided");
136        }
137
138        // check if the file extension matches the expected extension
139        for path in &table_paths {
140            let file_path = path.as_str();
141            if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() {
142                return exec_err!(
143                    "File path '{file_path}' does not match the expected extension '{option_extension}'"
144                );
145            }
146        }
147
148        let resolved_schema = options
149            .get_resolved_schema(&session_config, self.state(), table_paths[0].clone())
150            .await?;
151        let config = ListingTableConfig::new_with_multi_paths(table_paths)
152            .with_listing_options(listing_options)
153            .with_schema(resolved_schema);
154        let provider = ListingTable::try_new(config)?;
155        self.read_table(Arc::new(provider))
156    }
157
158    async fn register_orc(
159        &self,
160        name: &str,
161        table_path: &str,
162        options: OrcReadOptions<'_>,
163    ) -> Result<()> {
164        let listing_options =
165            options.to_listing_options(&self.copied_config(), self.copied_table_options());
166        self.register_listing_table(name, table_path, listing_options, None, None)
167            .await?;
168        Ok(())
169    }
170}
171
#[cfg(test)]
mod tests {
    use datafusion::assert_batches_sorted_eq;

    use super::*;

    /// Registers the sample ORC file as `table1` and checks two of its columns
    /// through a SQL query.
    #[tokio::test]
    async fn dataframe() -> Result<()> {
        let session = SessionContext::new();
        let read_options = OrcReadOptions::default();
        session
            .register_orc(
                "table1",
                "tests/basic/data/alltypes.snappy.orc",
                read_options,
            )
            .await?;

        let query = session
            .sql("select int16, utf8 from table1 limit 5")
            .await?;
        let batches = query.collect().await?;

        assert_batches_sorted_eq!(
            [
                "+-------+--------+",
                "| int16 | utf8   |",
                "+-------+--------+",
                "|       |        |",
                "| -1    |        |",
                "| 0     |        |",
                "| 1     | a      |",
                "| 32767 | encode |",
                "+-------+--------+",
            ],
            &batches
        );

        Ok(())
    }
}