datafusion_datasource_orc/
file_format.rs1use std::any::Any;
24use std::sync::Arc;
25
26use async_trait::async_trait;
27use datafusion_common::{GetExt, Result};
28use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
29use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
30use datafusion_datasource::source::DataSourceExec;
31use datafusion_datasource::TableSchema;
32use datafusion_physical_plan::ExecutionPlan;
33use futures::StreamExt;
34use futures::TryStreamExt;
35use object_store::ObjectStore;
36
37use crate::metadata::{read_orc_schema, read_orc_statistics};
38use crate::options::OrcFormatOptions;
39use crate::source::OrcSource;
40
41#[derive(Debug, Default)]
43pub struct OrcFormatFactory {
44 options: Option<OrcFormatOptions>,
45}
46
47impl OrcFormatFactory {
48 pub fn new() -> Self {
50 Self { options: None }
51 }
52
53 pub fn new_with_options(options: OrcFormatOptions) -> Self {
55 Self {
56 options: Some(options),
57 }
58 }
59}
60
61impl FileFormatFactory for OrcFormatFactory {
62 fn create(
63 &self,
64 _state: &dyn datafusion_session::Session,
65 format_options: &std::collections::HashMap<String, String>,
66 ) -> Result<Arc<dyn FileFormat>> {
67 let mut options = self.options.clone().unwrap_or_default();
68 options.apply_format_options(format_options)?;
70 Ok(Arc::new(OrcFormat::default().with_options(options)))
71 }
72
73 fn default(&self) -> Arc<dyn FileFormat> {
74 Arc::new(OrcFormat::default())
75 }
76
77 fn as_any(&self) -> &dyn Any {
78 self
79 }
80}
81
82impl GetExt for OrcFormatFactory {
83 fn get_ext(&self) -> String {
84 "orc".to_string()
85 }
86}
87
88#[derive(Debug, Default, Clone)]
90pub struct OrcFormat {
91 options: OrcFormatOptions,
92}
93
94impl OrcFormat {
95 pub fn new() -> Self {
97 Self::default()
98 }
99
100 pub fn with_options(mut self, options: OrcFormatOptions) -> Self {
102 self.options = options;
103 self
104 }
105
106 pub fn options(&self) -> &OrcFormatOptions {
108 &self.options
109 }
110}
111
112#[async_trait]
113impl FileFormat for OrcFormat {
114 fn as_any(&self) -> &dyn Any {
115 self
116 }
117
118 fn get_ext(&self) -> String {
119 "orc".to_string()
120 }
121
122 fn get_ext_with_compression(
123 &self,
124 _file_compression_type: &datafusion_datasource::file_compression_type::FileCompressionType,
125 ) -> Result<String> {
126 Ok("orc".to_string())
128 }
129
130 fn compression_type(
131 &self,
132 ) -> Option<datafusion_datasource::file_compression_type::FileCompressionType> {
133 None
135 }
136
137 async fn infer_schema(
138 &self,
139 state: &dyn datafusion_session::Session,
140 store: &Arc<dyn ObjectStore>,
141 objects: &[object_store::ObjectMeta],
142 ) -> Result<arrow::datatypes::SchemaRef> {
143 use futures::stream::iter;
144
145 let store_clone = Arc::clone(store);
147 let schemas: Vec<_> = iter(objects.iter())
148 .map(|object| {
149 let store = Arc::clone(&store_clone);
150 async move { read_orc_schema(&store, object).await }
151 })
152 .boxed() .buffered(state.config_options().execution.meta_fetch_concurrency)
154 .try_collect()
155 .await?;
156
157 let schemas: Vec<_> = schemas.into_iter().map(|s| (*s).clone()).collect();
160 let merged_schema = arrow::datatypes::Schema::try_merge(schemas)?;
161 Ok(Arc::new(merged_schema))
162 }
163
164 async fn infer_stats(
165 &self,
166 _state: &dyn datafusion_session::Session,
167 store: &Arc<dyn ObjectStore>,
168 table_schema: arrow::datatypes::SchemaRef,
169 object: &object_store::ObjectMeta,
170 ) -> Result<datafusion_common::Statistics> {
171 read_orc_statistics(store, object, table_schema).await
172 }
173
174 async fn create_physical_plan(
175 &self,
176 _state: &dyn datafusion_session::Session,
177 conf: datafusion_datasource::file_scan_config::FileScanConfig,
178 ) -> Result<Arc<dyn ExecutionPlan>> {
179 let file_schema = conf.file_schema();
182 let table_schema = TableSchema::from_file_schema(file_schema.clone());
183 let source =
184 Arc::new(OrcSource::new(table_schema).with_read_options(self.options.read.clone()));
185
186 let conf = FileScanConfigBuilder::from(conf)
188 .with_source(source)
189 .build();
190
191 Ok(DataSourceExec::from_data_source(conf))
193 }
194
195 fn file_source(&self) -> Arc<dyn datafusion_datasource::file::FileSource> {
196 Arc::new(
199 OrcSource::new(TableSchema::from_file_schema(Arc::new(
200 arrow::datatypes::Schema::empty(),
201 )))
202 .with_read_options(self.options.read.clone()),
203 )
204 }
205}