datafusion_datasource_avro/
source.rs1use std::any::Any;
21use std::sync::Arc;
22
23use crate::avro_to_arrow::Reader as AvroReader;
24
25use datafusion_common::error::Result;
26use datafusion_datasource::TableSchema;
27use datafusion_datasource::file::FileSource;
28use datafusion_datasource::file_scan_config::FileScanConfig;
29use datafusion_datasource::file_stream::FileOpener;
30use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
31use datafusion_physical_expr_common::sort_expr::LexOrdering;
32use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
33use datafusion_physical_plan::projection::ProjectionExprs;
34
35use object_store::ObjectStore;
36
37#[derive(Clone)]
39pub struct AvroSource {
40 table_schema: TableSchema,
41 batch_size: Option<usize>,
42 projection: SplitProjection,
43 metrics: ExecutionPlanMetricsSet,
44}
45
46impl AvroSource {
47 pub fn new(table_schema: impl Into<TableSchema>) -> Self {
49 let table_schema = table_schema.into();
50 Self {
51 projection: SplitProjection::unprojected(&table_schema),
52 table_schema,
53 batch_size: None,
54 metrics: ExecutionPlanMetricsSet::new(),
55 }
56 }
57
58 fn open<R: std::io::Read>(&self, reader: R) -> Result<AvroReader<'static, R>> {
59 let file_schema = self.table_schema.file_schema();
60 let projection = Some(
61 self.projection
62 .file_indices
63 .iter()
64 .map(|&idx| file_schema.field(idx).name().clone())
65 .collect::<Vec<_>>(),
66 );
67 AvroReader::try_new(
68 reader,
69 &Arc::clone(self.table_schema.file_schema()),
70 self.batch_size.expect("Batch size must set before open"),
71 projection.as_ref(),
72 )
73 }
74}
75
76impl FileSource for AvroSource {
77 fn create_file_opener(
78 &self,
79 object_store: Arc<dyn ObjectStore>,
80 _base_config: &FileScanConfig,
81 _partition: usize,
82 ) -> Result<Arc<dyn FileOpener>> {
83 let mut opener = Arc::new(private::AvroOpener {
84 config: Arc::new(self.clone()),
85 object_store,
86 }) as Arc<dyn FileOpener>;
87 opener = ProjectionOpener::try_new(
88 self.projection.clone(),
89 Arc::clone(&opener),
90 self.table_schema.file_schema(),
91 )?;
92 Ok(opener)
93 }
94
95 fn as_any(&self) -> &dyn Any {
96 self
97 }
98
99 fn table_schema(&self) -> &TableSchema {
100 &self.table_schema
101 }
102
103 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
104 let mut conf = self.clone();
105 conf.batch_size = Some(batch_size);
106 Arc::new(conf)
107 }
108
109 fn try_pushdown_projection(
110 &self,
111 projection: &ProjectionExprs,
112 ) -> Result<Option<Arc<dyn FileSource>>> {
113 let mut source = self.clone();
114 let new_projection = self.projection.source.try_merge(projection)?;
115 let split_projection =
116 SplitProjection::new(self.table_schema.file_schema(), &new_projection);
117 source.projection = split_projection;
118 Ok(Some(Arc::new(source)))
119 }
120
121 fn projection(&self) -> Option<&ProjectionExprs> {
122 Some(&self.projection.source)
123 }
124
125 fn metrics(&self) -> &ExecutionPlanMetricsSet {
126 &self.metrics
127 }
128
129 fn file_type(&self) -> &str {
130 "avro"
131 }
132
133 fn repartitioned(
134 &self,
135 _target_partitions: usize,
136 _repartition_file_min_size: usize,
137 _output_ordering: Option<LexOrdering>,
138 _config: &FileScanConfig,
139 ) -> Result<Option<FileScanConfig>> {
140 Ok(None)
141 }
142}
143
144mod private {
145 use super::*;
146
147 use bytes::Buf;
148 use datafusion_datasource::{PartitionedFile, file_stream::FileOpenFuture};
149 use futures::StreamExt;
150 use object_store::{GetResultPayload, ObjectStore};
151
152 pub struct AvroOpener {
153 pub config: Arc<AvroSource>,
154 pub object_store: Arc<dyn ObjectStore>,
155 }
156
157 impl FileOpener for AvroOpener {
158 fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
159 let config = Arc::clone(&self.config);
160 let object_store = Arc::clone(&self.object_store);
161 Ok(Box::pin(async move {
162 let r = object_store
163 .get(&partitioned_file.object_meta.location)
164 .await?;
165 match r.payload {
166 GetResultPayload::File(file, _) => {
167 let reader = config.open(file)?;
168 Ok(futures::stream::iter(reader)
169 .map(|r| r.map_err(Into::into))
170 .boxed())
171 }
172 GetResultPayload::Stream(_) => {
173 let bytes = r.bytes().await?;
174 let reader = config.open(bytes.reader())?;
175 Ok(futures::stream::iter(reader)
176 .map(|r| r.map_err(Into::into))
177 .boxed())
178 }
179 }
180 }))
181 }
182 }
183}