datafusion_datasource_orc/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! ORC [`FileSource`] implementation for DataFusion scans.
19//!
20//! `OrcSource` wires DataFusion's [`FileScanConfig`] to an ORC-specific
21//! [`FileOpener`] and manages scan-level options such as projections, limits,
22//! and predicate pushdown.
23//!
24//! [`FileScanConfig`]: datafusion_datasource::file_scan_config::FileScanConfig
25//! [`FileOpener`]: datafusion_datasource::file_stream::FileOpener
26//! [`FileSource`]: datafusion_datasource::file::FileSource
27
28use std::any::Any;
29use std::fmt::{Debug, Formatter};
30use std::sync::Arc;
31
32use datafusion_common::config::ConfigOptions;
33use datafusion_common::Statistics;
34use datafusion_datasource::as_file_source;
35use datafusion_datasource::file::FileSource;
36use datafusion_datasource::file_scan_config::FileScanConfig;
37use datafusion_datasource::file_stream::FileOpener;
38use datafusion_datasource::TableSchema;
39use datafusion_physical_expr::conjunction;
40use datafusion_physical_expr_common::physical_expr::fmt_sql;
41use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
42use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
43use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
44use datafusion_physical_plan::DisplayFormatType;
45use object_store::ObjectStore;
46use orc_rust::predicate::Predicate as OrcPredicate;
47
48use crate::opener::OrcOpener;
49use crate::options::OrcReadOptions;
50use crate::predicate::convert_physical_expr_to_predicate;
51
/// Fallback batch size handed to the opener when neither the scan config nor
/// the ORC read options specify one.
const DEFAULT_BATCH_SIZE: usize = 8192;
53
/// [`FileSource`] implementation describing how DataFusion scans ORC files.
///
/// Holds the scan-level state (schema, predicate, read options, statistics
/// and metrics) that is consulted when a file opener is created.
///
/// Supports projection, limit pushdown, and stripe-level predicate pushdown.
/// TODO: Add schema adapter support and column statistics pruning.
#[derive(Clone)]
pub struct OrcSource {
    /// Table schema; its file schema is what predicates are converted against
    table_schema: TableSchema,
    /// Execution plan metrics; a clone is handed to every opener created
    metrics: ExecutionPlanMetricsSet,
    /// Optional predicate filter pushed down from DataFusion
    predicate: Option<Arc<dyn PhysicalExpr>>,
    /// Converted orc-rust predicate (cached for efficiency).
    ///
    /// `None` when no predicate is set, when predicate pushdown is disabled
    /// in `read_options`, or when the conversion yields no predicate.
    orc_predicate: Option<OrcPredicate>,
    /// Read-path options for ORC scans
    read_options: OrcReadOptions,
    /// Statistics projected to the scan schema, if available
    projected_statistics: Option<Statistics>,
}
73
74impl Debug for OrcSource {
75    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
76        f.debug_struct("OrcSource")
77            .field("table_schema", &self.table_schema)
78            .field("predicate", &self.predicate)
79            .finish()
80    }
81}
82
83impl OrcSource {
84    /// Create a new OrcSource
85    pub fn new(table_schema: impl Into<TableSchema>) -> Self {
86        Self {
87            table_schema: table_schema.into(),
88            metrics: ExecutionPlanMetricsSet::new(),
89            predicate: None,
90            orc_predicate: None,
91            read_options: OrcReadOptions::default(),
92            projected_statistics: None,
93        }
94    }
95
96    /// Set read options for ORC scans.
97    pub fn with_read_options(mut self, read_options: OrcReadOptions) -> Self {
98        self.read_options = read_options;
99        if let Some(predicate) = self.predicate.clone() {
100            self.set_predicate(predicate);
101        }
102        self
103    }
104
105    /// Return the current read options.
106    pub fn read_options(&self) -> &OrcReadOptions {
107        &self.read_options
108    }
109
110    /// Create a new OrcSource with a predicate filter
111    ///
112    /// The predicate will be converted to an orc-rust Predicate and used
113    /// for stripe-level filtering during file reads.
114    pub fn with_predicate(&self, predicate: Arc<dyn PhysicalExpr>) -> Self {
115        let mut source = self.clone();
116        source.set_predicate(predicate);
117        source
118    }
119
120    /// Get the orc-rust predicate (if conversion was successful)
121    pub fn orc_predicate(&self) -> Option<&OrcPredicate> {
122        self.orc_predicate.as_ref()
123    }
124
125    fn set_predicate(&mut self, predicate: Arc<dyn PhysicalExpr>) {
126        self.predicate = Some(Arc::clone(&predicate));
127        if self.read_options.pushdown_predicate {
128            let file_schema = self.table_schema.file_schema();
129            self.orc_predicate =
130                convert_physical_expr_to_predicate(&predicate, file_schema.as_ref());
131        } else {
132            self.orc_predicate = None;
133        }
134    }
135}
136
137/// Allows easy conversion from OrcSource to Arc<dyn FileSource>
138impl From<OrcSource> for Arc<dyn FileSource> {
139    fn from(source: OrcSource) -> Self {
140        as_file_source(source)
141    }
142}
143
144impl FileSource for OrcSource {
145    fn create_file_opener(
146        &self,
147        object_store: Arc<dyn ObjectStore>,
148        base_config: &FileScanConfig,
149        partition: usize,
150    ) -> Arc<dyn FileOpener> {
151        // Extract projection indices from the file scan config
152        let file_schema = base_config.file_schema();
153        let projection: Arc<[usize]> = base_config
154            .file_column_projection_indices()
155            .map(|indices| indices.into())
156            .unwrap_or_else(|| (0..file_schema.fields().len()).collect::<Vec<_>>().into());
157
158        // Get batch size from config or use default
159        let batch_size = base_config
160            .batch_size
161            .or(self.read_options.batch_size)
162            .unwrap_or(DEFAULT_BATCH_SIZE);
163
164        // Get limit from config
165        let limit = base_config.limit;
166
167        // Get projected file schema (without partition columns)
168        let logical_file_schema = base_config.projected_file_schema();
169
170        // Get partition fields
171        let partition_fields = base_config.table_partition_cols().clone();
172
173        // Get metrics
174        let metrics = self.metrics.clone();
175
176        // Clone the orc predicate for the opener
177        let orc_predicate = self.orc_predicate.clone();
178
179        Arc::new(OrcOpener::new(
180            partition,
181            projection,
182            batch_size,
183            limit,
184            logical_file_schema,
185            partition_fields,
186            metrics,
187            object_store,
188            orc_predicate,
189        ))
190    }
191
192    fn as_any(&self) -> &dyn Any {
193        self
194    }
195
196    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
197        let mut source = self.clone();
198        source.read_options = source.read_options.with_batch_size(batch_size);
199        Arc::new(source)
200    }
201
202    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
203        let mut source = self.clone();
204        source.table_schema = schema;
205        Arc::new(source)
206    }
207
208    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
209        Arc::new(self.clone())
210    }
211
212    fn with_statistics(&self, statistics: datafusion_common::Statistics) -> Arc<dyn FileSource> {
213        let mut source = self.clone();
214        source.projected_statistics = Some(statistics);
215        Arc::new(source)
216    }
217
218    fn statistics(&self) -> datafusion_common::Result<datafusion_common::Statistics> {
219        if let Some(statistics) = self.projected_statistics.clone() {
220            if self.filter().is_some() {
221                Ok(statistics.to_inexact())
222            } else {
223                Ok(statistics)
224            }
225        } else {
226            Ok(datafusion_common::Statistics::new_unknown(
227                self.table_schema.table_schema().as_ref(),
228            ))
229        }
230    }
231
232    fn metrics(&self) -> &ExecutionPlanMetricsSet {
233        &self.metrics
234    }
235
236    fn file_type(&self) -> &str {
237        "orc"
238    }
239
240    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
241        match t {
242            DisplayFormatType::Default | DisplayFormatType::Verbose => {
243                if let Some(predicate) = self.filter() {
244                    write!(f, ", predicate={predicate}")?;
245                }
246                Ok(())
247            }
248            DisplayFormatType::TreeRender => {
249                if let Some(predicate) = self.filter() {
250                    writeln!(f, "predicate={}", fmt_sql(predicate.as_ref()))?;
251                }
252                Ok(())
253            }
254        }
255    }
256
257    fn try_pushdown_filters(
258        &self,
259        filters: Vec<Arc<dyn PhysicalExpr>>,
260        _config: &ConfigOptions,
261    ) -> datafusion_common::Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
262        let file_schema = self.table_schema.file_schema();
263        let total_filters = filters.len();
264        let mut supported = Vec::new();
265
266        for filter in filters {
267            if convert_physical_expr_to_predicate(&filter, file_schema.as_ref()).is_some() {
268                supported.push(filter);
269            }
270        }
271
272        if supported.is_empty() {
273            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
274                vec![PushedDown::No; total_filters],
275            ));
276        }
277
278        let mut source = self.clone();
279        // ORC predicates are currently used for stripe-level pruning only. We
280        // still report `No` so DataFusion applies the full filter above.
281        source.set_predicate(conjunction(supported));
282        let source = Arc::new(source);
283
284        Ok(FilterPushdownPropagation::with_parent_pushdown_result(vec![
285            PushedDown::No;
286            total_filters
287        ])
288        .with_updated_node(source))
289    }
290
291    /// Returns the filter expression that will be applied during the file scan.
292    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
293        self.predicate.clone()
294    }
295}