datafusion_datasource_avro/
file_format.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Apache Avro [`FileFormat`] abstractions
19
20use std::any::Any;
21use std::collections::HashMap;
22use std::fmt;
23use std::sync::Arc;
24
25use crate::avro_to_arrow::read_avro_schema_from_reader;
26use crate::source::AvroSource;
27
28use arrow::datatypes::Schema;
29use arrow::datatypes::SchemaRef;
30use datafusion_common::internal_err;
31use datafusion_common::parsers::CompressionTypeVariant;
32use datafusion_common::GetExt;
33use datafusion_common::DEFAULT_AVRO_EXTENSION;
34use datafusion_common::{Result, Statistics};
35use datafusion_datasource::file::FileSource;
36use datafusion_datasource::file_compression_type::FileCompressionType;
37use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
38use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
39use datafusion_datasource::source::DataSourceExec;
40use datafusion_physical_plan::ExecutionPlan;
41use datafusion_session::Session;
42
43use async_trait::async_trait;
44use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
45
/// Factory type that produces [`AvroFormat`] instances.
///
/// The factory is a unit struct: it holds no configuration of its own.
#[derive(Default)]
pub struct AvroFormatFactory;
49
50impl AvroFormatFactory {
51    /// Creates an instance of [`AvroFormatFactory`]
52    pub fn new() -> Self {
53        Self {}
54    }
55}
56
57impl FileFormatFactory for AvroFormatFactory {
58    fn create(
59        &self,
60        _state: &dyn Session,
61        _format_options: &HashMap<String, String>,
62    ) -> Result<Arc<dyn FileFormat>> {
63        Ok(Arc::new(AvroFormat))
64    }
65
66    fn default(&self) -> Arc<dyn FileFormat> {
67        Arc::new(AvroFormat)
68    }
69
70    fn as_any(&self) -> &dyn Any {
71        self
72    }
73}
74
75impl fmt::Debug for AvroFormatFactory {
76    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77        f.debug_struct("AvroFormatFactory").finish()
78    }
79}
80
81impl GetExt for AvroFormatFactory {
82    fn get_ext(&self) -> String {
83        // Removes the dot, i.e. ".avro" -> "avro"
84        DEFAULT_AVRO_EXTENSION[1..].to_string()
85    }
86}
87
/// [`FileFormat`] implementation for Apache Avro files.
#[derive(Debug, Default)]
pub struct AvroFormat;
91
92#[async_trait]
93impl FileFormat for AvroFormat {
94    fn as_any(&self) -> &dyn Any {
95        self
96    }
97
98    fn get_ext(&self) -> String {
99        AvroFormatFactory::new().get_ext()
100    }
101
102    fn get_ext_with_compression(
103        &self,
104        file_compression_type: &FileCompressionType,
105    ) -> Result<String> {
106        let ext = self.get_ext();
107        match file_compression_type.get_variant() {
108            CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
109            _ => internal_err!("Avro FileFormat does not support compression."),
110        }
111    }
112
113    fn compression_type(&self) -> Option<FileCompressionType> {
114        None
115    }
116
117    async fn infer_schema(
118        &self,
119        _state: &dyn Session,
120        store: &Arc<dyn ObjectStore>,
121        objects: &[ObjectMeta],
122    ) -> Result<SchemaRef> {
123        let mut schemas = vec![];
124        for object in objects {
125            let r = store.as_ref().get(&object.location).await?;
126            let schema = match r.payload {
127                GetResultPayload::File(mut file, _) => {
128                    read_avro_schema_from_reader(&mut file)?
129                }
130                GetResultPayload::Stream(_) => {
131                    // TODO: Fetching entire file to get schema is potentially wasteful
132                    let data = r.bytes().await?;
133                    read_avro_schema_from_reader(&mut data.as_ref())?
134                }
135            };
136            schemas.push(schema);
137        }
138        let merged_schema = Schema::try_merge(schemas)?;
139        Ok(Arc::new(merged_schema))
140    }
141
142    async fn infer_stats(
143        &self,
144        _state: &dyn Session,
145        _store: &Arc<dyn ObjectStore>,
146        table_schema: SchemaRef,
147        _object: &ObjectMeta,
148    ) -> Result<Statistics> {
149        Ok(Statistics::new_unknown(&table_schema))
150    }
151
152    async fn create_physical_plan(
153        &self,
154        _state: &dyn Session,
155        conf: FileScanConfig,
156    ) -> Result<Arc<dyn ExecutionPlan>> {
157        let config = FileScanConfigBuilder::from(conf)
158            .with_source(self.file_source())
159            .build();
160        Ok(DataSourceExec::from_data_source(config))
161    }
162
163    fn file_source(&self) -> Arc<dyn FileSource> {
164        Arc::new(AvroSource::new())
165    }
166}