datafusion_datasource_avro/
file_format.rs1use std::any::Any;
21use std::collections::HashMap;
22use std::fmt;
23use std::sync::Arc;
24
25use crate::avro_to_arrow::read_avro_schema_from_reader;
26use crate::source::AvroSource;
27
28use arrow::datatypes::Schema;
29use arrow::datatypes::SchemaRef;
30use datafusion_common::internal_err;
31use datafusion_common::parsers::CompressionTypeVariant;
32use datafusion_common::GetExt;
33use datafusion_common::DEFAULT_AVRO_EXTENSION;
34use datafusion_common::{Result, Statistics};
35use datafusion_datasource::file::FileSource;
36use datafusion_datasource::file_compression_type::FileCompressionType;
37use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
38use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
39use datafusion_datasource::source::DataSourceExec;
40use datafusion_physical_plan::ExecutionPlan;
41use datafusion_session::Session;
42
43use async_trait::async_trait;
44use object_store::{GetResultPayload, ObjectMeta, ObjectStore};
45
/// Factory that hands out [`AvroFormat`] instances.
///
/// This is a zero-sized unit struct with no fields of its own.
#[derive(Default)]
pub struct AvroFormatFactory;
50impl AvroFormatFactory {
51 pub fn new() -> Self {
53 Self {}
54 }
55}
56
57impl FileFormatFactory for AvroFormatFactory {
58 fn create(
59 &self,
60 _state: &dyn Session,
61 _format_options: &HashMap<String, String>,
62 ) -> Result<Arc<dyn FileFormat>> {
63 Ok(Arc::new(AvroFormat))
64 }
65
66 fn default(&self) -> Arc<dyn FileFormat> {
67 Arc::new(AvroFormat)
68 }
69
70 fn as_any(&self) -> &dyn Any {
71 self
72 }
73}
74
75impl fmt::Debug for AvroFormatFactory {
76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77 f.debug_struct("AvroFormatFactory").finish()
78 }
79}
80
81impl GetExt for AvroFormatFactory {
82 fn get_ext(&self) -> String {
83 DEFAULT_AVRO_EXTENSION[1..].to_string()
85 }
86}
87
/// The Avro file format: a zero-sized unit struct implementing `FileFormat`.
#[derive(Default, Debug)]
pub struct AvroFormat;
91
92#[async_trait]
93impl FileFormat for AvroFormat {
94 fn as_any(&self) -> &dyn Any {
95 self
96 }
97
98 fn get_ext(&self) -> String {
99 AvroFormatFactory::new().get_ext()
100 }
101
102 fn get_ext_with_compression(
103 &self,
104 file_compression_type: &FileCompressionType,
105 ) -> Result<String> {
106 let ext = self.get_ext();
107 match file_compression_type.get_variant() {
108 CompressionTypeVariant::UNCOMPRESSED => Ok(ext),
109 _ => internal_err!("Avro FileFormat does not support compression."),
110 }
111 }
112
113 fn compression_type(&self) -> Option<FileCompressionType> {
114 None
115 }
116
117 async fn infer_schema(
118 &self,
119 _state: &dyn Session,
120 store: &Arc<dyn ObjectStore>,
121 objects: &[ObjectMeta],
122 ) -> Result<SchemaRef> {
123 let mut schemas = vec![];
124 for object in objects {
125 let r = store.as_ref().get(&object.location).await?;
126 let schema = match r.payload {
127 GetResultPayload::File(mut file, _) => {
128 read_avro_schema_from_reader(&mut file)?
129 }
130 GetResultPayload::Stream(_) => {
131 let data = r.bytes().await?;
133 read_avro_schema_from_reader(&mut data.as_ref())?
134 }
135 };
136 schemas.push(schema);
137 }
138 let merged_schema = Schema::try_merge(schemas)?;
139 Ok(Arc::new(merged_schema))
140 }
141
142 async fn infer_stats(
143 &self,
144 _state: &dyn Session,
145 _store: &Arc<dyn ObjectStore>,
146 table_schema: SchemaRef,
147 _object: &ObjectMeta,
148 ) -> Result<Statistics> {
149 Ok(Statistics::new_unknown(&table_schema))
150 }
151
152 async fn create_physical_plan(
153 &self,
154 _state: &dyn Session,
155 conf: FileScanConfig,
156 ) -> Result<Arc<dyn ExecutionPlan>> {
157 let config = FileScanConfigBuilder::from(conf)
158 .with_source(self.file_source())
159 .build();
160 Ok(DataSourceExec::from_data_source(config))
161 }
162
163 fn file_source(&self) -> Arc<dyn FileSource> {
164 Arc::new(AvroSource::new())
165 }
166}