datafusion_datasource_avro/
source.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! Execution plan for reading Avro files
19
20use std::any::Any;
21use std::sync::Arc;
22
23use crate::avro_to_arrow::Reader as AvroReader;
24
25use arrow::datatypes::SchemaRef;
26use datafusion_common::error::Result;
27use datafusion_common::Statistics;
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_scan_config::FileScanConfig;
30use datafusion_datasource::file_stream::FileOpener;
31use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
32use datafusion_physical_expr_common::sort_expr::LexOrdering;
33use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
34
35use object_store::ObjectStore;
36
37/// AvroSource holds the extra configuration that is necessary for opening avro files
38#[derive(Clone, Default)]
39pub struct AvroSource {
40    schema: Option<SchemaRef>,
41    batch_size: Option<usize>,
42    projection: Option<Vec<String>>,
43    metrics: ExecutionPlanMetricsSet,
44    projected_statistics: Option<Statistics>,
45    schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
46}
47
48impl AvroSource {
49    /// Initialize an AvroSource with default values
50    pub fn new() -> Self {
51        Self::default()
52    }
53
54    fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, R>> {
55        AvroReader::try_new(
56            reader,
57            Arc::clone(self.schema.as_ref().expect("Schema must set before open")),
58            self.batch_size.expect("Batch size must set before open"),
59            self.projection.clone(),
60        )
61    }
62}
63
64impl FileSource for AvroSource {
65    fn create_file_opener(
66        &self,
67        object_store: Arc<dyn ObjectStore>,
68        _base_config: &FileScanConfig,
69        _partition: usize,
70    ) -> Arc<dyn FileOpener> {
71        Arc::new(private::AvroOpener {
72            config: Arc::new(self.clone()),
73            object_store,
74        })
75    }
76
77    fn as_any(&self) -> &dyn Any {
78        self
79    }
80
81    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
82        let mut conf = self.clone();
83        conf.batch_size = Some(batch_size);
84        Arc::new(conf)
85    }
86
87    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
88        let mut conf = self.clone();
89        conf.schema = Some(schema);
90        Arc::new(conf)
91    }
92    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
93        let mut conf = self.clone();
94        conf.projected_statistics = Some(statistics);
95        Arc::new(conf)
96    }
97
98    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
99        let mut conf = self.clone();
100        conf.projection = config.projected_file_column_names();
101        Arc::new(conf)
102    }
103
104    fn metrics(&self) -> &ExecutionPlanMetricsSet {
105        &self.metrics
106    }
107
108    fn statistics(&self) -> Result<Statistics> {
109        let statistics = &self.projected_statistics;
110        Ok(statistics
111            .clone()
112            .expect("projected_statistics must be set"))
113    }
114
115    fn file_type(&self) -> &str {
116        "avro"
117    }
118
119    fn repartitioned(
120        &self,
121        _target_partitions: usize,
122        _repartition_file_min_size: usize,
123        _output_ordering: Option<LexOrdering>,
124        _config: &FileScanConfig,
125    ) -> Result<Option<FileScanConfig>> {
126        Ok(None)
127    }
128
129    fn with_schema_adapter_factory(
130        &self,
131        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
132    ) -> Result<Arc<dyn FileSource>> {
133        Ok(Arc::new(Self {
134            schema_adapter_factory: Some(schema_adapter_factory),
135            ..self.clone()
136        }))
137    }
138
139    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
140        self.schema_adapter_factory.clone()
141    }
142}
143
144mod private {
145    use super::*;
146
147    use bytes::Buf;
148    use datafusion_datasource::{
149        file_meta::FileMeta, file_stream::FileOpenFuture, PartitionedFile,
150    };
151    use futures::StreamExt;
152    use object_store::{GetResultPayload, ObjectStore};
153
154    pub struct AvroOpener {
155        pub config: Arc<AvroSource>,
156        pub object_store: Arc<dyn ObjectStore>,
157    }
158
159    impl FileOpener for AvroOpener {
160        fn open(
161            &self,
162            file_meta: FileMeta,
163            _file: PartitionedFile,
164        ) -> Result<FileOpenFuture> {
165            let config = Arc::clone(&self.config);
166            let object_store = Arc::clone(&self.object_store);
167            Ok(Box::pin(async move {
168                let r = object_store.get(file_meta.location()).await?;
169                match r.payload {
170                    GetResultPayload::File(file, _) => {
171                        let reader = config.open(file)?;
172                        Ok(futures::stream::iter(reader).boxed())
173                    }
174                    GetResultPayload::Stream(_) => {
175                        let bytes = r.bytes().await?;
176                        let reader = config.open(bytes.reader())?;
177                        Ok(futures::stream::iter(reader).boxed())
178                    }
179                }
180            }))
181        }
182    }
183}