datafusion_datasource_orc/
source.rs1use std::any::Any;
29use std::fmt::{Debug, Formatter};
30use std::sync::Arc;
31
32use datafusion_common::config::ConfigOptions;
33use datafusion_common::Statistics;
34use datafusion_datasource::as_file_source;
35use datafusion_datasource::file::FileSource;
36use datafusion_datasource::file_scan_config::FileScanConfig;
37use datafusion_datasource::file_stream::FileOpener;
38use datafusion_datasource::TableSchema;
39use datafusion_physical_expr::conjunction;
40use datafusion_physical_expr_common::physical_expr::fmt_sql;
41use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
42use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
43use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
44use datafusion_physical_plan::DisplayFormatType;
45use object_store::ObjectStore;
46use orc_rust::predicate::Predicate as OrcPredicate;
47
48use crate::opener::OrcOpener;
49use crate::options::OrcReadOptions;
50use crate::predicate::convert_physical_expr_to_predicate;
51
/// Rows per output batch used when neither the scan configuration nor the
/// ORC read options provide a batch size.
const DEFAULT_BATCH_SIZE: usize = 8 * 1024;
53
54#[derive(Clone)]
59pub struct OrcSource {
60 table_schema: TableSchema,
62 metrics: ExecutionPlanMetricsSet,
64 predicate: Option<Arc<dyn PhysicalExpr>>,
66 orc_predicate: Option<OrcPredicate>,
68 read_options: OrcReadOptions,
70 projected_statistics: Option<Statistics>,
72}
73
74impl Debug for OrcSource {
75 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
76 f.debug_struct("OrcSource")
77 .field("table_schema", &self.table_schema)
78 .field("predicate", &self.predicate)
79 .finish()
80 }
81}
82
83impl OrcSource {
84 pub fn new(table_schema: impl Into<TableSchema>) -> Self {
86 Self {
87 table_schema: table_schema.into(),
88 metrics: ExecutionPlanMetricsSet::new(),
89 predicate: None,
90 orc_predicate: None,
91 read_options: OrcReadOptions::default(),
92 projected_statistics: None,
93 }
94 }
95
96 pub fn with_read_options(mut self, read_options: OrcReadOptions) -> Self {
98 self.read_options = read_options;
99 if let Some(predicate) = self.predicate.clone() {
100 self.set_predicate(predicate);
101 }
102 self
103 }
104
105 pub fn read_options(&self) -> &OrcReadOptions {
107 &self.read_options
108 }
109
110 pub fn with_predicate(&self, predicate: Arc<dyn PhysicalExpr>) -> Self {
115 let mut source = self.clone();
116 source.set_predicate(predicate);
117 source
118 }
119
120 pub fn orc_predicate(&self) -> Option<&OrcPredicate> {
122 self.orc_predicate.as_ref()
123 }
124
125 fn set_predicate(&mut self, predicate: Arc<dyn PhysicalExpr>) {
126 self.predicate = Some(Arc::clone(&predicate));
127 if self.read_options.pushdown_predicate {
128 let file_schema = self.table_schema.file_schema();
129 self.orc_predicate =
130 convert_physical_expr_to_predicate(&predicate, file_schema.as_ref());
131 } else {
132 self.orc_predicate = None;
133 }
134 }
135}
136
137impl From<OrcSource> for Arc<dyn FileSource> {
139 fn from(source: OrcSource) -> Self {
140 as_file_source(source)
141 }
142}
143
144impl FileSource for OrcSource {
145 fn create_file_opener(
146 &self,
147 object_store: Arc<dyn ObjectStore>,
148 base_config: &FileScanConfig,
149 partition: usize,
150 ) -> Arc<dyn FileOpener> {
151 let file_schema = base_config.file_schema();
153 let projection: Arc<[usize]> = base_config
154 .file_column_projection_indices()
155 .map(|indices| indices.into())
156 .unwrap_or_else(|| (0..file_schema.fields().len()).collect::<Vec<_>>().into());
157
158 let batch_size = base_config
160 .batch_size
161 .or(self.read_options.batch_size)
162 .unwrap_or(DEFAULT_BATCH_SIZE);
163
164 let limit = base_config.limit;
166
167 let logical_file_schema = base_config.projected_file_schema();
169
170 let partition_fields = base_config.table_partition_cols().clone();
172
173 let metrics = self.metrics.clone();
175
176 let orc_predicate = self.orc_predicate.clone();
178
179 Arc::new(OrcOpener::new(
180 partition,
181 projection,
182 batch_size,
183 limit,
184 logical_file_schema,
185 partition_fields,
186 metrics,
187 object_store,
188 orc_predicate,
189 ))
190 }
191
192 fn as_any(&self) -> &dyn Any {
193 self
194 }
195
196 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
197 let mut source = self.clone();
198 source.read_options = source.read_options.with_batch_size(batch_size);
199 Arc::new(source)
200 }
201
202 fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
203 let mut source = self.clone();
204 source.table_schema = schema;
205 Arc::new(source)
206 }
207
208 fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
209 Arc::new(self.clone())
210 }
211
212 fn with_statistics(&self, statistics: datafusion_common::Statistics) -> Arc<dyn FileSource> {
213 let mut source = self.clone();
214 source.projected_statistics = Some(statistics);
215 Arc::new(source)
216 }
217
218 fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
219 if let Some(statistics) = self.projected_statistics.clone() {
220 if self.filter().is_some() {
221 Ok(statistics.to_inexact())
222 } else {
223 Ok(statistics)
224 }
225 } else {
226 Ok(datafusion_common::Statistics::new_unknown(
227 self.table_schema.table_schema().as_ref(),
228 ))
229 }
230 }
231
232 fn metrics(&self) -> &ExecutionPlanMetricsSet {
233 &self.metrics
234 }
235
236 fn file_type(&self) -> &str {
237 "orc"
238 }
239
240 fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
241 match t {
242 DisplayFormatType::Default | DisplayFormatType::Verbose => {
243 if let Some(predicate) = self.filter() {
244 write!(f, ", predicate={predicate}")?;
245 }
246 Ok(())
247 }
248 DisplayFormatType::TreeRender => {
249 if let Some(predicate) = self.filter() {
250 writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
251 }
252 Ok(())
253 }
254 }
255 }
256
257 fn try_pushdown_filters(
258 &self,
259 filters: Vec<Arc<dyn PhysicalExpr>>,
260 _config: &ConfigOptions,
261 ) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
262 let file_schema = self.table_schema.file_schema();
263 let total_filters = filters.len();
264 let mut supported = Vec::new();
265
266 for filter in filters {
267 if convert_physical_expr_to_predicate(&filter, file_schema.as_ref()).is_some() {
268 supported.push(filter);
269 }
270 }
271
272 if supported.is_empty() {
273 return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
274 vec![PushedDown::No; total_filters],
275 ));
276 }
277
278 let mut source = self.clone();
279 source.set_predicate(conjunction(supported));
282 let source = Arc::new(source);
283
284 Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![
285 PushedDown::No;
286 total_filters
287 ])
288 .with_updated_node(source))
289 }
290
291 fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
293 self.predicate.clone()
294 }
295}