datafusion_orc/
file_source.rs1use crate::physical_exec::OrcOpener;
19use datafusion::common::DataFusionError;
20use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource};
21use datafusion::datasource::table_schema::TableSchema;
22use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
23use datafusion::physical_plan::projection::ProjectionExprs;
24use object_store::ObjectStore;
25use std::any::Any;
26use std::sync::Arc;
27
/// [`FileSource`] implementation for reading ORC files.
///
/// Carries the configuration needed to build an `OrcOpener` for a scan:
/// the table schema, the projection to apply, the batch size, and the
/// metrics set exposed to the execution plan.
#[derive(Debug, Clone)]
pub struct OrcSource {
    /// Metrics set returned from `FileSource::metrics`.
    metrics: ExecutionPlanMetricsSet,
    /// Fallback batch size passed to the opener when the scan config does
    /// not specify one; replaced by `with_batch_size`.
    batch_size: usize,
    /// Table schema returned from `FileSource::table_schema`; its schema is
    /// handed to the opener.
    table_schema: TableSchema,
    /// Projection passed to the opener. Starts as every table column and is
    /// merged with projections pushed down via `try_pushdown_projection`.
    projection: ProjectionExprs,
}
35
36impl OrcSource {
37 pub fn new(table_schema: TableSchema) -> Self {
38 let table_schema_ref = table_schema.table_schema();
39 let projection = ProjectionExprs::from_indices(
40 &(0..table_schema_ref.fields().len()).collect::<Vec<_>>(),
41 table_schema_ref,
42 );
43 Self {
44 metrics: ExecutionPlanMetricsSet::default(),
45 batch_size: 1024,
46 table_schema,
47 projection,
48 }
49 }
50}
51
52impl FileSource for OrcSource {
53 fn create_file_opener(
54 &self,
55 object_store: Arc<dyn ObjectStore>,
56 config: &FileScanConfig,
57 _partition: usize,
58 ) -> Result<Arc<dyn FileOpener>, DataFusionError> {
59 OrcOpener::try_new(
60 object_store,
61 self.table_schema.table_schema().clone(),
62 config.batch_size.unwrap_or(self.batch_size),
63 self.projection.clone(),
64 )
65 .map(|f| Arc::new(f) as Arc<dyn FileOpener>)
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71
72 fn table_schema(&self) -> &TableSchema {
73 &self.table_schema
74 }
75
76 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
77 Arc::new(Self {
78 batch_size,
79 ..self.clone()
80 })
81 }
82
83 fn projection(&self) -> Option<&ProjectionExprs> {
84 Some(&self.projection)
85 }
86
87 fn metrics(&self) -> &ExecutionPlanMetricsSet {
88 &self.metrics
89 }
90
91 fn file_type(&self) -> &str {
92 "orc"
93 }
94
95 fn try_pushdown_projection(
96 &self,
97 projection: &ProjectionExprs,
98 ) -> Result<Option<Arc<dyn FileSource>>, DataFusionError> {
99 let mut source = self.clone();
100 source.projection = self.projection.try_merge(projection)?;
101 Ok(Some(Arc::new(source)))
102 }
103}