datafusion_datasource_avro/
source.rs1use std::any::Any;
21use std::sync::Arc;
22
23use crate::avro_to_arrow::Reader as AvroReader;
24
25use arrow::datatypes::SchemaRef;
26use datafusion_common::error::Result;
27use datafusion_common::Statistics;
28use datafusion_datasource::file::FileSource;
29use datafusion_datasource::file_scan_config::FileScanConfig;
30use datafusion_datasource::file_stream::FileOpener;
31use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
32use datafusion_physical_expr_common::sort_expr::LexOrdering;
33use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
34
35use object_store::ObjectStore;
36
37#[derive(Clone, Default)]
39pub struct AvroSource {
40 schema: Option<SchemaRef>,
41 batch_size: Option<usize>,
42 projection: Option<Vec<String>>,
43 metrics: ExecutionPlanMetricsSet,
44 projected_statistics: Option<Statistics>,
45 schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
46}
47
48impl AvroSource {
49 pub fn new() -> Self {
51 Self::default()
52 }
53
54 fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, R>> {
55 AvroReader::try_new(
56 reader,
57 Arc::clone(self.schema.as_ref().expect("Schema must set before open")),
58 self.batch_size.expect("Batch size must set before open"),
59 self.projection.clone(),
60 )
61 }
62}
63
64impl FileSource for AvroSource {
65 fn create_file_opener(
66 &self,
67 object_store: Arc<dyn ObjectStore>,
68 _base_config: &FileScanConfig,
69 _partition: usize,
70 ) -> Arc<dyn FileOpener> {
71 Arc::new(private::AvroOpener {
72 config: Arc::new(self.clone()),
73 object_store,
74 })
75 }
76
77 fn as_any(&self) -> &dyn Any {
78 self
79 }
80
81 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
82 let mut conf = self.clone();
83 conf.batch_size = Some(batch_size);
84 Arc::new(conf)
85 }
86
87 fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
88 let mut conf = self.clone();
89 conf.schema = Some(schema);
90 Arc::new(conf)
91 }
92 fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
93 let mut conf = self.clone();
94 conf.projected_statistics = Some(statistics);
95 Arc::new(conf)
96 }
97
98 fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
99 let mut conf = self.clone();
100 conf.projection = config.projected_file_column_names();
101 Arc::new(conf)
102 }
103
104 fn metrics(&self) -> &ExecutionPlanMetricsSet {
105 &self.metrics
106 }
107
108 fn statistics(&self) -> Result<Statistics> {
109 let statistics = &self.projected_statistics;
110 Ok(statistics
111 .clone()
112 .expect("projected_statistics must be set"))
113 }
114
115 fn file_type(&self) -> &str {
116 "avro"
117 }
118
119 fn repartitioned(
120 &self,
121 _target_partitions: usize,
122 _repartition_file_min_size: usize,
123 _output_ordering: Option<LexOrdering>,
124 _config: &FileScanConfig,
125 ) -> Result<Option<FileScanConfig>> {
126 Ok(None)
127 }
128
129 fn with_schema_adapter_factory(
130 &self,
131 schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
132 ) -> Result<Arc<dyn FileSource>> {
133 Ok(Arc::new(Self {
134 schema_adapter_factory: Some(schema_adapter_factory),
135 ..self.clone()
136 }))
137 }
138
139 fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
140 self.schema_adapter_factory.clone()
141 }
142}
143
144mod private {
145 use super::*;
146
147 use bytes::Buf;
148 use datafusion_datasource::{
149 file_meta::FileMeta, file_stream::FileOpenFuture, PartitionedFile,
150 };
151 use futures::StreamExt;
152 use object_store::{GetResultPayload, ObjectStore};
153
154 pub struct AvroOpener {
155 pub config: Arc<AvroSource>,
156 pub object_store: Arc<dyn ObjectStore>,
157 }
158
159 impl FileOpener for AvroOpener {
160 fn open(
161 &self,
162 file_meta: FileMeta,
163 _file: PartitionedFile,
164 ) -> Result<FileOpenFuture> {
165 let config = Arc::clone(&self.config);
166 let object_store = Arc::clone(&self.object_store);
167 Ok(Box::pin(async move {
168 let r = object_store.get(file_meta.location()).await?;
169 match r.payload {
170 GetResultPayload::File(file, _) => {
171 let reader = config.open(file)?;
172 Ok(futures::stream::iter(reader).boxed())
173 }
174 GetResultPayload::Stream(_) => {
175 let bytes = r.bytes().await?;
176 let reader = config.open(bytes.reader())?;
177 Ok(futures::stream::iter(reader).boxed())
178 }
179 }
180 }))
181 }
182 }
183}