datafusion_datasource_avro/
source.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for reading Avro files

use std::any::Any;
use std::sync::Arc;

use crate::avro_to_arrow::Reader as AvroReader;

use datafusion_common::error::Result;
use datafusion_datasource::TableSchema;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::file_stream::FileOpener;
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::projection::ProjectionExprs;

use object_store::ObjectStore;

/// [`AvroSource`] holds the extra configuration that is necessary for opening Avro files
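///
/// # Example
///
/// A minimal, non-compiled sketch of building a source. It assumes an Arrow
/// `SchemaRef` converts into a [`TableSchema`] via `Into`; adapt the
/// conversion to however the table schema is constructed in your code.
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow::datatypes::{DataType, Field, Schema};
///
/// // Hypothetical file schema, for illustration only.
/// let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
///
/// // `new` accepts any `impl Into<TableSchema>` (assumed here to cover `SchemaRef`).
/// let source = AvroSource::new(schema);
/// ```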
#[derive(Clone)]
pub struct AvroSource {
    /// Schema of the table this source scans
    table_schema: TableSchema,
    /// Number of rows per produced batch; must be set before files are opened
    batch_size: Option<usize>,
    /// Projection split into file column indices and the original projection expressions
    projection: SplitProjection,
    /// Execution metrics for this source
    metrics: ExecutionPlanMetricsSet,
}

impl AvroSource {
    /// Initialize an AvroSource with the provided schema
    pub fn new(table_schema: impl Into<TableSchema>) -> Self {
        let table_schema = table_schema.into();
        Self {
            projection: SplitProjection::unprojected(&table_schema),
            table_schema,
            batch_size: None,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

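    /// Build an [`AvroReader`] over `reader`, selecting only the file columns
    /// referenced by the pushed-down projection.
    ///
    /// The batch size must already have been set via
    /// [`FileSource::with_batch_size`]; otherwise this panics.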
    fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, R>> {
        let file_schema = self.table_schema.file_schema();
        // Map the projected file-schema column indices to column names, since
        // the Avro reader selects columns by name.
        let projection = Some(
            self.projection
                .file_indices
                .iter()
                .map(|&idx| file_schema.field(idx).name().clone())
                .collect::<Vec<_>>(),
        );
        AvroReader::try_new(
            reader,
            file_schema,
            self.batch_size.expect("Batch size must be set before open"),
            projection.as_ref(),
        )
    }
}

impl FileSource for AvroSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        _base_config: &FileScanConfig,
        _partition: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        let mut opener = Arc::new(private::AvroOpener {
            config: Arc::new(self.clone()),
            object_store,
        }) as Arc<dyn FileOpener>;
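        // Wrap the base opener so the split projection is applied to the
        // batches decoded from the Avro file.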
        opener = ProjectionOpener::try_new(
            self.projection.clone(),
            Arc::clone(&opener),
            self.table_schema.file_schema(),
        )?;
        Ok(opener)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_schema(&self) -> &TableSchema {
        &self.table_schema
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut conf = self.clone();
        conf.batch_size = Some(batch_size);
        Arc::new(conf)
    }

    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
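        // Merge the incoming projection into the existing one and re-split it
        // into file column indices plus the remaining projection expressions.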
        let mut source = self.clone();
        let new_projection = self.projection.source.try_merge(projection)?;
        let split_projection =
            SplitProjection::new(self.table_schema.file_schema(), &new_projection);
        source.projection = split_projection;
        Ok(Some(Arc::new(source)))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        Some(&self.projection.source)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        "avro"
    }

    fn repartitioned(
        &self,
        _target_partitions: usize,
        _repartition_file_min_size: usize,
        _output_ordering: Option<LexOrdering>,
        _config: &FileScanConfig,
    ) -> Result<Option<FileScanConfig>> {
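        // Splitting Avro files into byte ranges is not supported here, so
        // return `None` to signal that no repartitioning was performed.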
        Ok(None)
    }
}

mod private {
    use super::*;

    use bytes::Buf;
    use datafusion_datasource::{PartitionedFile, file_stream::FileOpenFuture};
    use futures::StreamExt;
    use object_store::{GetResultPayload, ObjectStore};

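    /// A [`FileOpener`] that fetches Avro files from an [`ObjectStore`] and
    /// decodes them with the reader configuration held in [`AvroSource`].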
    pub struct AvroOpener {
        pub config: Arc<AvroSource>,
        pub object_store: Arc<dyn ObjectStore>,
    }

    impl FileOpener for AvroOpener {
        fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
            let config = Arc::clone(&self.config);
            let object_store = Arc::clone(&self.object_store);
            Ok(Box::pin(async move {
                let r = object_store
                    .get(&partitioned_file.object_meta.location)
                    .await?;
                match r.payload {
                    GetResultPayload::File(file, _) => {
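                        // A local file handle implements `Read`, so it can be
                        // passed to the Avro reader directly.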
                        let reader = config.open(file)?;
                        Ok(futures::stream::iter(reader)
                            .map(|r| r.map_err(Into::into))
                            .boxed())
                    }
                    GetResultPayload::Stream(_) => {
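                        // Buffer the streamed bytes in memory first, since the
                        // Avro reader requires a synchronous `Read` source.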
                        let bytes = r.bytes().await?;
                        let reader = config.open(bytes.reader())?;
                        Ok(futures::stream::iter(reader)
                            .map(|r| r.map_err(Into::into))
                            .boxed())
                    }
                }
            }))
        }
    }
}