1use std::sync::Arc;
46
47use datafusion::arrow::datatypes::SchemaRef;
48use datafusion::common::exec_err;
49use datafusion::config::TableOptions;
50use datafusion::dataframe::DataFrame;
51use datafusion::datasource::listing::{
52 ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
53};
54use datafusion::error::Result;
55use datafusion::execution::config::SessionConfig;
56use datafusion::execution::context::{DataFilePaths, SessionContext, SessionState};
57use datafusion::execution::options::ReadOptions;
58
59use async_trait::async_trait;
60
61mod file_format;
62mod file_source;
63mod object_store_reader;
64mod physical_exec;
65
66pub use file_format::OrcFormat;
67pub use file_source::OrcSource;
68
/// Options controlling how ORC files are located when they are read or registered.
///
/// `Debug` is derived alongside `Clone` so the options can be logged and used in
/// assertion output like any other public configuration type.
#[derive(Clone, Debug)]
pub struct OrcReadOptions<'a> {
    /// File extension handed to the listing options; the `Default` impl uses `"orc"`.
    pub file_extension: &'a str,
}
74
75impl Default for OrcReadOptions<'_> {
76 fn default() -> Self {
77 Self {
78 file_extension: "orc",
79 }
80 }
81}
82
83#[async_trait]
84impl ReadOptions<'_> for OrcReadOptions<'_> {
85 fn to_listing_options(
86 &self,
87 _config: &SessionConfig,
88 _table_options: TableOptions,
89 ) -> ListingOptions {
90 ListingOptions::new(Arc::new(OrcFormat)).with_file_extension(self.file_extension)
91 }
92
93 async fn get_resolved_schema(
94 &self,
95 config: &SessionConfig,
96 state: SessionState,
97 table_path: ListingTableUrl,
98 ) -> Result<SchemaRef> {
99 self._get_resolved_schema(config, state, table_path, None)
100 .await
101 }
102}
103
104pub trait SessionContextOrcExt {
107 fn read_orc<P: DataFilePaths + Send>(
108 &self,
109 table_paths: P,
110 options: OrcReadOptions<'_>,
111 ) -> impl std::future::Future<Output = Result<DataFrame>> + Send;
112
113 fn register_orc(
114 &self,
115 name: &str,
116 table_path: &str,
117 options: OrcReadOptions<'_>,
118 ) -> impl std::future::Future<Output = Result<()>> + Send;
119}
120
121impl SessionContextOrcExt for SessionContext {
122 async fn read_orc<P: DataFilePaths + Send>(
123 &self,
124 table_paths: P,
125 options: OrcReadOptions<'_>,
126 ) -> Result<DataFrame> {
127 let table_paths = table_paths.to_urls()?;
129 let session_config = self.copied_config();
130 let listing_options = ListingOptions::new(Arc::new(OrcFormat)).with_file_extension(".orc");
131
132 let option_extension = listing_options.file_extension.clone();
133
134 if table_paths.is_empty() {
135 return exec_err!("No table paths were provided");
136 }
137
138 for path in &table_paths {
140 let file_path = path.as_str();
141 if !file_path.ends_with(option_extension.clone().as_str()) && !path.is_collection() {
142 return exec_err!(
143 "File path '{file_path}' does not match the expected extension '{option_extension}'"
144 );
145 }
146 }
147
148 let resolved_schema = options
149 .get_resolved_schema(&session_config, self.state(), table_paths[0].clone())
150 .await?;
151 let config = ListingTableConfig::new_with_multi_paths(table_paths)
152 .with_listing_options(listing_options)
153 .with_schema(resolved_schema);
154 let provider = ListingTable::try_new(config)?;
155 self.read_table(Arc::new(provider))
156 }
157
158 async fn register_orc(
159 &self,
160 name: &str,
161 table_path: &str,
162 options: OrcReadOptions<'_>,
163 ) -> Result<()> {
164 let listing_options =
165 options.to_listing_options(&self.copied_config(), self.copied_table_options());
166 self.register_listing_table(name, table_path, listing_options, None, None)
167 .await?;
168 Ok(())
169 }
170}
171
#[cfg(test)]
mod tests {
    use datafusion::assert_batches_sorted_eq;

    use super::*;

    /// Registers an ORC fixture as a table and checks that a SQL projection over
    /// it returns the expected rows.
    #[tokio::test]
    async fn dataframe() -> Result<()> {
        let ctx = SessionContext::new();
        ctx.register_orc(
            "table1",
            "tests/basic/data/alltypes.snappy.orc",
            OrcReadOptions::default(),
        )
        .await?;

        let actual = ctx
            .sql("select int16, utf8 from table1 limit 5")
            .await?
            .collect()
            .await?;

        // Every row is padded to the column widths fixed by the border lines;
        // the comparison is line-exact, so unpadded rows could never match.
        assert_batches_sorted_eq!(
            [
                "+-------+--------+",
                "| int16 | utf8   |",
                "+-------+--------+",
                "|       |        |",
                "| -1    |        |",
                "| 0     |        |",
                "| 1     | a      |",
                "| 32767 | encode |",
                "+-------+--------+",
            ],
            &actual
        );

        Ok(())
    }
}