datafusion_datasource_arrow/source.rs

use std::sync::Arc;
use std::{any::Any, io::Cursor};

use datafusion_datasource::{TableSchema, as_file_source};

use arrow::buffer::Buffer;
use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader};
use datafusion_common::error::Result;
use datafusion_common::exec_datafusion_err;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::projection::ProjectionExprs;

use datafusion_datasource::file_stream::FileOpenFuture;
use datafusion_datasource::file_stream::FileOpener;
use futures::StreamExt;
use itertools::Itertools;
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};

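/// Which Arrow IPC encoding a file uses: the random-access file format or the
/// sequential stream format.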
57#[derive(Clone, Copy, Debug)]
59enum ArrowFormat {
60 File,
62 Stream,
64}
65
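/// Opens Arrow IPC stream-format files from an [`ObjectStore`]. The stream
/// format has no footer, so it can only be read sequentially and range-based
/// (partial) reads are rejected.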
pub(crate) struct ArrowStreamFileOpener {
    object_store: Arc<dyn ObjectStore>,
    projection: Option<Vec<usize>>,
}

impl FileOpener for ArrowStreamFileOpener {
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        if partitioned_file.range.is_some() {
            return Err(exec_datafusion_err!(
                "ArrowStreamFileOpener does not support range-based reading"
            ));
        }
        let object_store = Arc::clone(&self.object_store);
        let projection = self.projection.clone();

        Ok(Box::pin(async move {
            let r = object_store
                .get(&partitioned_file.object_meta.location)
                .await?;

            // A local file handle can be handed to the reader directly; a
            // remote result is buffered fully into memory before decoding.
            let stream = match r.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(file, _) => futures::stream::iter(
                    StreamReader::try_new(file.try_clone()?, projection.clone())?,
                )
                .map(|r| r.map_err(Into::into))
                .boxed(),
                GetResultPayload::Stream(_) => {
                    let bytes = r.bytes().await?;
                    let cursor = Cursor::new(bytes);
                    futures::stream::iter(StreamReader::try_new(
                        cursor,
                        projection.clone(),
                    )?)
                    .map(|r| r.map_err(Into::into))
                    .boxed()
                }
            };

            Ok(stream)
        }))
    }
}

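/// Opens Arrow IPC file-format files from an [`ObjectStore`]. The file format
/// ends with a footer that indexes every dictionary and record batch, which
/// makes range-based reads possible: only the blocks that start inside the
/// assigned byte range are fetched and decoded.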
pub(crate) struct ArrowFileOpener {
    object_store: Arc<dyn ObjectStore>,
    projection: Option<Vec<usize>>,
}

impl FileOpener for ArrowFileOpener {
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let object_store = Arc::clone(&self.object_store);
        let projection = self.projection.clone();

        Ok(Box::pin(async move {
            let range = partitioned_file.range.clone();
            match range {
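                // No byte range assigned: read the whole file front to back.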
                None => {
                    let r = object_store
                        .get(&partitioned_file.object_meta.location)
                        .await?;
                    let stream = match r.payload {
                        #[cfg(not(target_arch = "wasm32"))]
                        GetResultPayload::File(file, _) => futures::stream::iter(
                            FileReader::try_new(file.try_clone()?, projection.clone())?,
                        )
                        .map(|r| r.map_err(Into::into))
                        .boxed(),
                        GetResultPayload::Stream(_) => {
                            let bytes = r.bytes().await?;
                            let cursor = Cursor::new(bytes);
                            futures::stream::iter(FileReader::try_new(
                                cursor,
                                projection.clone(),
                            )?)
                            .map(|r| r.map_err(Into::into))
                            .boxed()
                        }
                    };

                    Ok(stream)
                }
                Some(range) => {
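                    // An Arrow IPC file ends with a 4-byte footer length
                    // followed by the 6-byte `ARROW1` magic, so the last 10
                    // bytes are enough to learn how large the footer is.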
                    let get_option = GetOptions {
                        range: Some(GetRange::Suffix(10)),
                        ..Default::default()
                    };
                    let get_result = object_store
                        .get_opts(&partitioned_file.object_meta.location, get_option)
                        .await?;
                    let footer_len_buf = get_result.bytes().await?;
                    let footer_len = arrow_ipc::reader::read_footer_length(
                        footer_len_buf[..].try_into().unwrap(),
                    )?;
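                    // The footer sits immediately before the 10-byte trailer;
                    // fetch it and decode the flatbuffer-encoded metadata.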
                    let get_option = GetOptions {
                        range: Some(GetRange::Suffix(10 + (footer_len as u64))),
                        ..Default::default()
                    };
                    let get_result = object_store
                        .get_opts(&partitioned_file.object_meta.location, get_option)
                        .await?;
                    let footer_buf = get_result.bytes().await?;
                    let footer = arrow_ipc::root_as_footer(
                        footer_buf[..footer_len].try_into().unwrap(),
                    )
                    .map_err(|err| {
                        exec_datafusion_err!("Unable to get root as footer: {err:?}")
                    })?;
                    let schema =
                        arrow_ipc::convert::fb_to_schema(footer.schema().unwrap());
                    let mut decoder = FileDecoder::new(schema.into(), footer.version());
                    if let Some(projection) = projection {
                        decoder = decoder.with_projection(projection);
                    }
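                    // Dictionaries are shared across record batches, so all
                    // of them are fetched and decoded regardless of the
                    // assigned byte range.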
                    let dict_ranges = footer
                        .dictionaries()
                        .iter()
                        .flatten()
                        .map(|block| {
                            let block_len =
                                block.bodyLength() as u64 + block.metaDataLength() as u64;
                            let block_offset = block.offset() as u64;
                            block_offset..block_offset + block_len
                        })
                        .collect_vec();
                    let dict_results = object_store
                        .get_ranges(&partitioned_file.object_meta.location, &dict_ranges)
                        .await?;
                    for (dict_block, dict_result) in
                        footer.dictionaries().iter().flatten().zip(dict_results)
                    {
                        decoder
                            .read_dictionary(dict_block, &Buffer::from(dict_result))?;
                    }

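                    // Keep only the record batches whose offset falls inside
                    // the assigned byte range; a batch that straddles a range
                    // boundary is decoded by the partition that owns its
                    // starting offset, so no batch is read twice.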
                    let recordbatches = footer
                        .recordBatches()
                        .iter()
                        .flatten()
                        .filter(|block| {
                            let block_offset = block.offset() as u64;
                            block_offset >= range.start as u64
                                && block_offset < range.end as u64
                        })
                        .copied()
                        .collect_vec();

                    let recordbatch_ranges = recordbatches
                        .iter()
                        .map(|block| {
                            let block_len =
                                block.bodyLength() as u64 + block.metaDataLength() as u64;
                            let block_offset = block.offset() as u64;
                            block_offset..block_offset + block_len
                        })
                        .collect_vec();

                    let recordbatch_results = object_store
                        .get_ranges(
                            &partitioned_file.object_meta.location,
                            &recordbatch_ranges,
                        )
                        .await?;

                    let stream = futures::stream::iter(
                        recordbatches
                            .into_iter()
                            .zip(recordbatch_results)
                            .filter_map(move |(block, data)| {
                                decoder
                                    .read_record_batch(&block, &Buffer::from(data))
                                    .transpose()
                            }),
                    )
                    .map(|r| r.map_err(Into::into))
                    .boxed();

                    Ok(stream)
                }
            }
        }))
    }
}

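/// [`FileSource`] implementation for Arrow IPC data, covering both the
/// random-access file encoding and the sequential stream encoding.
///
/// A minimal construction sketch (the schema below is illustrative):
///
/// ```ignore
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
/// let source: Arc<dyn FileSource> = ArrowSource::new_file_source(schema).into();
/// ```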
257#[derive(Clone)]
259pub struct ArrowSource {
260 format: ArrowFormat,
261 metrics: ExecutionPlanMetricsSet,
262 projection: SplitProjection,
263 table_schema: TableSchema,
264}
265
impl ArrowSource {
    /// Creates a source for the Arrow IPC file format.
    pub fn new_file_source(table_schema: impl Into<TableSchema>) -> Self {
        let table_schema = table_schema.into();
        Self {
            format: ArrowFormat::File,
            metrics: ExecutionPlanMetricsSet::new(),
            projection: SplitProjection::unprojected(&table_schema),
            table_schema,
        }
    }

    /// Creates a source for the Arrow IPC stream format.
    pub fn new_stream_file_source(table_schema: impl Into<TableSchema>) -> Self {
        let table_schema = table_schema.into();
        Self {
            format: ArrowFormat::Stream,
            metrics: ExecutionPlanMetricsSet::new(),
            projection: SplitProjection::unprojected(&table_schema),
            table_schema,
        }
    }
}

impl FileSource for ArrowSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        _base_config: &FileScanConfig,
        _partition: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        let split_projection = self.projection.clone();

        let opener: Arc<dyn FileOpener> = match self.format {
            ArrowFormat::File => Arc::new(ArrowFileOpener {
                object_store,
                projection: Some(split_projection.file_indices.clone()),
            }),
            ArrowFormat::Stream => Arc::new(ArrowStreamFileOpener {
                object_store,
                projection: Some(split_projection.file_indices.clone()),
            }),
        };
        // Wrap the format-specific opener so any projection expressions that
        // go beyond plain column selection are evaluated on top of it.
        ProjectionOpener::try_new(
            split_projection,
            opener,
            self.table_schema.file_schema(),
        )
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
        Arc::new(Self { ..self.clone() })
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        match self.format {
            ArrowFormat::File => "arrow",
            ArrowFormat::Stream => "arrow_stream",
        }
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
        config: &FileScanConfig,
    ) -> Result<Option<FileScanConfig>> {
        match self.format {
            ArrowFormat::Stream => {
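                // The stream format has no footer indexing record batches,
                // so files cannot be split into byte ranges.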
                Ok(None)
            }
            ArrowFormat::File => {
                use datafusion_datasource::file_groups::FileGroupPartitioner;

                if config.file_compression_type.is_compressed() {
                    return Ok(None);
                }

                let repartitioned_file_groups_option = FileGroupPartitioner::new()
                    .with_target_partitions(target_partitions)
                    .with_repartition_file_min_size(repartition_file_min_size)
                    .with_preserve_order_within_groups(output_ordering.is_some())
                    .repartition_file_groups(&config.file_groups);

                if let Some(repartitioned_file_groups) = repartitioned_file_groups_option
                {
                    let mut source = config.clone();
                    source.file_groups = repartitioned_file_groups;
                    return Ok(Some(source));
                }
                Ok(None)
            }
        }
    }

    fn table_schema(&self) -> &TableSchema {
        &self.table_schema
    }

    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        // Merge the incoming projection into whatever has already been
        // pushed down, then recompute the file-level column split.
        let mut source = self.clone();
        source.projection = SplitProjection::new(
            self.table_schema().file_schema(),
            &source.projection.source.try_merge(projection)?,
        );
        Ok(Some(Arc::new(source)))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        Some(&self.projection.source)
    }
}

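/// Wrapper around a concrete [`FileOpener`] that simply delegates `open` to
/// the inner opener.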
pub struct ArrowOpener {
    pub inner: Arc<dyn FileOpener>,
}

impl FileOpener for ArrowOpener {
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        self.inner.open(partitioned_file)
    }
}

impl ArrowOpener {
    pub fn new(inner: Arc<dyn FileOpener>) -> Self {
        Self { inner }
    }

    /// Builds an opener for the Arrow IPC file format.
    pub fn new_file_opener(
        object_store: Arc<dyn ObjectStore>,
        projection: Option<Vec<usize>>,
    ) -> Self {
        Self {
            inner: Arc::new(ArrowFileOpener {
                object_store,
                projection,
            }),
        }
    }

    /// Builds an opener for the Arrow IPC stream format.
    pub fn new_stream_file_opener(
        object_store: Arc<dyn ObjectStore>,
        projection: Option<Vec<usize>>,
    ) -> Self {
        Self {
            inner: Arc::new(ArrowStreamFileOpener {
                object_store,
                projection,
            }),
        }
    }
}

impl From<ArrowSource> for Arc<dyn FileSource> {
    fn from(source: ArrowSource) -> Self {
        as_file_source(source)
    }
}

449#[cfg(test)]
450mod tests {
451 use std::{fs::File, io::Read};
452
453 use arrow::datatypes::{DataType, Field, Schema};
454 use arrow_ipc::reader::{FileReader, StreamReader};
455 use bytes::Bytes;
456 use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
457 use datafusion_execution::object_store::ObjectStoreUrl;
458 use object_store::memory::InMemory;
459
460 use super::*;
461
    #[tokio::test]
    async fn test_file_opener_without_ranges() -> Result<()> {
        for filename in ["example.arrow", "example_stream.arrow"] {
            let path = format!("tests/data/{filename}");
            let path_str = path.as_str();
            let mut file = File::open(path_str)?;
            let file_size = file.metadata()?.len();

            let mut buffer = Vec::new();
            file.read_to_end(&mut buffer)?;
            let bytes = Bytes::from(buffer);

            let object_store = Arc::new(InMemory::new());
            let partitioned_file = PartitionedFile::new(filename, file_size);
            object_store
                .put(&partitioned_file.object_meta.location, bytes.into())
                .await?;

            // One fixture uses the file format and the other the stream
            // format; fall back to the stream reader when the file reader
            // rejects the input.
            let schema = match FileReader::try_new(File::open(path_str)?, None) {
                Ok(reader) => reader.schema(),
                Err(_) => StreamReader::try_new(File::open(path_str)?, None)?.schema(),
            };

            let source: Arc<dyn FileSource> = if filename.contains("stream") {
                Arc::new(ArrowSource::new_stream_file_source(schema))
            } else {
                Arc::new(ArrowSource::new_file_source(schema))
            };

            let scan_config = FileScanConfigBuilder::new(
                ObjectStoreUrl::local_filesystem(),
                source.clone(),
            )
            .build();

            let file_opener = source.create_file_opener(object_store, &scan_config, 0)?;
            let mut stream = file_opener.open(partitioned_file)?.await?;

            assert!(stream.next().await.is_some());
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_file_opener_with_ranges() -> Result<()> {
        let filename = "example.arrow";
        let path = format!("tests/data/{filename}");
        let path_str = path.as_str();
        let mut file = File::open(path_str)?;
        let file_size = file.metadata()?.len();

        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer)?;
        let bytes = Bytes::from(buffer);

        let object_store = Arc::new(InMemory::new());
        let partitioned_file = PartitionedFile::new_with_range(
            filename.into(),
            file_size,
            0,
            (file_size - 1) as i64,
        );
        object_store
            .put(&partitioned_file.object_meta.location, bytes.into())
            .await?;

        let schema = FileReader::try_new(File::open(path_str)?, None)?.schema();

        let source = Arc::new(ArrowSource::new_file_source(schema));

        let scan_config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            source.clone(),
        )
        .build();

        let file_opener = source.create_file_opener(object_store, &scan_config, 0)?;
        let mut stream = file_opener.open(partitioned_file)?.await?;

        assert!(stream.next().await.is_some());

        Ok(())
    }

    #[tokio::test]
    async fn test_stream_opener_errors_with_ranges() -> Result<()> {
        let filename = "example_stream.arrow";
        let path = format!("tests/data/{filename}");
        let path_str = path.as_str();
        let mut file = File::open(path_str)?;
        let file_size = file.metadata()?.len();

        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer)?;
        let bytes = Bytes::from(buffer);

        let object_store = Arc::new(InMemory::new());
        let partitioned_file = PartitionedFile::new_with_range(
            filename.into(),
            file_size,
            0,
            (file_size - 1) as i64,
        );
        object_store
            .put(&partitioned_file.object_meta.location, bytes.into())
            .await?;

        let schema = StreamReader::try_new(File::open(path_str)?, None)?.schema();

        let source = Arc::new(ArrowSource::new_stream_file_source(schema));

        let scan_config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            source.clone(),
        )
        .build();

        let file_opener = source.create_file_opener(object_store, &scan_config, 0)?;
        // The stream opener rejects range-based reads up front.
        let result = file_opener.open(partitioned_file);
        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_arrow_stream_repartitioning_not_supported() -> Result<()> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("f0", DataType::Int64, false)]));
        let source = ArrowSource::new_stream_file_source(schema);

        let config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            Arc::new(source.clone()) as Arc<dyn FileSource>,
        )
        .build();

        for target_partitions in [2, 4, 8, 16] {
            let result =
                source.repartitioned(target_partitions, 1024 * 1024, None, &config)?;

            assert!(
                result.is_none(),
                "Stream format should not support repartitioning with {target_partitions} partitions",
            );
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_stream_opener_with_projection() -> Result<()> {
        let filename = "example_stream.arrow";
        let path = format!("tests/data/{filename}");
        let path_str = path.as_str();
        let mut file = File::open(path_str)?;
        let file_size = file.metadata()?.len();

        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer)?;
        let bytes = Bytes::from(buffer);

        let object_store = Arc::new(InMemory::new());
        let partitioned_file = PartitionedFile::new(filename, file_size);
        object_store
            .put(&partitioned_file.object_meta.location, bytes.into())
            .await?;

        let opener = ArrowStreamFileOpener {
            object_store,
            projection: Some(vec![0]),
        };

        let mut stream = opener.open(partitioned_file)?.await?;

        if let Some(batch) = stream.next().await {
            let batch = batch?;
            assert_eq!(
                batch.num_columns(),
                1,
                "Projection should result in 1 column"
            );
        } else {
            panic!("Expected at least one batch");
        }

        Ok(())
    }
}