datafusion_orc/
file_source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::physical_exec::OrcOpener;
19use datafusion::common::DataFusionError;
20use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource};
21use datafusion::datasource::table_schema::TableSchema;
22use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
23use datafusion::physical_plan::projection::ProjectionExprs;
24use object_store::ObjectStore;
25use std::any::Any;
26use std::sync::Arc;
27
28#[derive(Debug, Clone)]
29pub struct OrcSource {
30    metrics: ExecutionPlanMetricsSet,
31    batch_size: usize,
32    table_schema: TableSchema,
33    projection: ProjectionExprs,
34}
35
36impl OrcSource {
37    pub fn new(table_schema: TableSchema) -> Self {
38        let table_schema_ref = table_schema.table_schema();
39        let projection = ProjectionExprs::from_indices(
40            &(0..table_schema_ref.fields().len()).collect::<Vec<_>>(),
41            table_schema_ref,
42        );
43        Self {
44            metrics: ExecutionPlanMetricsSet::default(),
45            batch_size: 1024,
46            table_schema,
47            projection,
48        }
49    }
50}
51
52impl FileSource for OrcSource {
53    fn create_file_opener(
54        &self,
55        object_store: Arc<dyn ObjectStore>,
56        config: &FileScanConfig,
57        _partition: usize,
58    ) -> Result<Arc<dyn FileOpener>, DataFusionError> {
59        OrcOpener::try_new(
60            object_store,
61            self.table_schema.table_schema().clone(),
62            config.batch_size.unwrap_or(self.batch_size),
63            self.projection.clone(),
64        )
65        .map(|f| Arc::new(f) as Arc<dyn FileOpener>)
66    }
67
68    fn as_any(&self) -> &dyn Any {
69        self
70    }
71
72    fn table_schema(&self) -> &TableSchema {
73        &self.table_schema
74    }
75
76    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
77        Arc::new(Self {
78            batch_size,
79            ..self.clone()
80        })
81    }
82
83    fn projection(&self) -> Option<&ProjectionExprs> {
84        Some(&self.projection)
85    }
86
87    fn metrics(&self) -> &ExecutionPlanMetricsSet {
88        &self.metrics
89    }
90
91    fn file_type(&self) -> &str {
92        "orc"
93    }
94
95    fn try_pushdown_projection(
96        &self,
97        projection: &ProjectionExprs,
98    ) -> Result<Option<Arc<dyn FileSource>>, DataFusionError> {
99        let mut source = self.clone();
100        source.projection = self.projection.try_merge(projection)?;
101        Ok(Some(Arc::new(source)))
102    }
103}