datafusion_datasource_arrow/
source.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Execution plan for reading Arrow IPC files
//!
//! # Naming Note
//!
//! The naming in this module can be confusing:
//! - `ArrowFileOpener` handles the Arrow IPC **file format**
//!   (with footer, supports parallel reading)
//! - `ArrowStreamFileOpener` handles the Arrow IPC **stream format**
//!   (without footer, sequential only)
//! - `ArrowSource` is the unified `FileSource` implementation that uses either opener
//!   depending on the format specified at construction
//!
//! Despite the name "ArrowStreamFileOpener", it still reads from files - the "Stream"
//! refers to the Arrow IPC stream format, not streaming I/O. Both formats can be stored
//! in files on disk or object storage.
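//!
//! # Example
//!
//! A minimal sketch of constructing a source for each format. The
//! single-column schema is a placeholder, and the example is marked `ignore`
//! because it elides the surrounding scan setup:
//!
//! ```ignore
//! use std::sync::Arc;
//! use arrow::datatypes::{DataType, Field, Schema};
//! use datafusion_datasource::file::FileSource;
//!
//! let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]));
//!
//! // IPC *file* format: has a footer, so range-based parallel reads are possible
//! let file_source: Arc<dyn FileSource> = ArrowSource::new_file_source(schema.clone()).into();
//!
//! // IPC *stream* format: no footer, always read sequentially
//! let stream_source: Arc<dyn FileSource> = ArrowSource::new_stream_file_source(schema).into();
//! ```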

use std::sync::Arc;
use std::{any::Any, io::Cursor};

use datafusion_datasource::{TableSchema, as_file_source};

use arrow::buffer::Buffer;
use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader};
use datafusion_common::error::Result;
use datafusion_common::exec_datafusion_err;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_scan_config::FileScanConfig;
use datafusion_datasource::projection::{ProjectionOpener, SplitProjection};
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::projection::ProjectionExprs;

use datafusion_datasource::file_stream::FileOpenFuture;
use datafusion_datasource::file_stream::FileOpener;
use futures::StreamExt;
use itertools::Itertools;
use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};

/// Enum indicating which Arrow IPC format to use
#[derive(Clone, Copy, Debug)]
enum ArrowFormat {
    /// Arrow IPC file format (with footer, supports parallel reading)
    File,
    /// Arrow IPC stream format (without footer, sequential only)
    Stream,
}

/// `FileOpener` for Arrow IPC stream format. Supports only sequential reading.
pub(crate) struct ArrowStreamFileOpener {
    object_store: Arc<dyn ObjectStore>,
    projection: Option<Vec<usize>>,
}

impl FileOpener for ArrowStreamFileOpener {
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
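        // The stream format has no footer, so there is no cheap way to locate
        // record batches by byte offset; reject range-based (parallel) reads
        // up front.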
        if partitioned_file.range.is_some() {
            return Err(exec_datafusion_err!(
                "ArrowStreamFileOpener does not support range-based reading"
            ));
        }
        let object_store = Arc::clone(&self.object_store);
        let projection = self.projection.clone();

        Ok(Box::pin(async move {
            let r = object_store
                .get(&partitioned_file.object_meta.location)
                .await?;

            let stream = match r.payload {
                #[cfg(not(target_arch = "wasm32"))]
                GetResultPayload::File(file, _) => futures::stream::iter(
                    StreamReader::try_new(file.try_clone()?, projection.clone())?,
                )
                .map(|r| r.map_err(Into::into))
                .boxed(),
                GetResultPayload::Stream(_) => {
                    let bytes = r.bytes().await?;
                    let cursor = Cursor::new(bytes);
                    futures::stream::iter(StreamReader::try_new(
                        cursor,
                        projection.clone(),
                    )?)
                    .map(|r| r.map_err(Into::into))
                    .boxed()
                }
            };

            Ok(stream)
        }))
    }
}

/// `FileOpener` for Arrow IPC file format. Supports range-based parallel reading.
pub(crate) struct ArrowFileOpener {
    object_store: Arc<dyn ObjectStore>,
    projection: Option<Vec<usize>>,
}

impl FileOpener for ArrowFileOpener {
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        let object_store = Arc::clone(&self.object_store);
        let projection = self.projection.clone();

        Ok(Box::pin(async move {
            let range = partitioned_file.range.clone();
            match range {
                None => {
                    let r = object_store
                        .get(&partitioned_file.object_meta.location)
                        .await?;
                    let stream = match r.payload {
                        #[cfg(not(target_arch = "wasm32"))]
                        GetResultPayload::File(file, _) => futures::stream::iter(
                            FileReader::try_new(file.try_clone()?, projection.clone())?,
                        )
                        .map(|r| r.map_err(Into::into))
                        .boxed(),
                        GetResultPayload::Stream(_) => {
                            let bytes = r.bytes().await?;
                            let cursor = Cursor::new(bytes);
                            futures::stream::iter(FileReader::try_new(
                                cursor,
                                projection.clone(),
                            )?)
                            .map(|r| r.map_err(Into::into))
                            .boxed()
                        }
                    };

                    Ok(stream)
                }
                Some(range) => {
                    // When a range is present, the file may have been split into
                    // multiple parts so it can be scanned in parallel; read the
                    // footer first to locate the blocks that belong to this range.
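                    // The Arrow IPC file format ends with a fixed-size trailer: a
                    // 4-byte little-endian footer length followed by the 6-byte
                    // magic "ARROW1". Fetching the last 10 bytes is therefore
                    // enough to recover the footer length.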
                    let get_option = GetOptions {
                        range: Some(GetRange::Suffix(10)),
                        ..Default::default()
                    };
                    let get_result = object_store
                        .get_opts(&partitioned_file.object_meta.location, get_option)
                        .await?;
                    let footer_len_buf = get_result.bytes().await?;
                    let footer_len = arrow_ipc::reader::read_footer_length(
                        footer_len_buf[..].try_into().unwrap(),
                    )?;
                    // fetch the footer bytes now that their length is known
                    let get_option = GetOptions {
                        range: Some(GetRange::Suffix(10 + (footer_len as u64))),
                        ..Default::default()
                    };
                    let get_result = object_store
                        .get_opts(&partitioned_file.object_meta.location, get_option)
                        .await?;
                    let footer_buf = get_result.bytes().await?;
                    let footer = arrow_ipc::root_as_footer(
                        footer_buf[..footer_len].try_into().unwrap(),
                    )
                    .map_err(|err| {
                        exec_datafusion_err!("Unable to get root as footer: {err:?}")
                    })?;
                    // build decoder according to footer & projection
                    let schema =
                        arrow_ipc::convert::fb_to_schema(footer.schema().unwrap());
                    let mut decoder = FileDecoder::new(schema.into(), footer.version());
                    if let Some(projection) = projection {
                        decoder = decoder.with_projection(projection);
                    }
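                    // Dictionary batches can be referenced by any record batch, so
                    // all of them are fetched and registered with the decoder before
                    // decoding the record batches in this partition's range.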
                    let dict_ranges = footer
                        .dictionaries()
                        .iter()
                        .flatten()
                        .map(|block| {
                            let block_len =
                                block.bodyLength() as u64 + block.metaDataLength() as u64;
                            let block_offset = block.offset() as u64;
                            block_offset..block_offset + block_len
                        })
                        .collect_vec();
                    let dict_results = object_store
                        .get_ranges(&partitioned_file.object_meta.location, &dict_ranges)
                        .await?;
                    for (dict_block, dict_result) in
                        footer.dictionaries().iter().flatten().zip(dict_results)
                    {
                        decoder
                            .read_dictionary(dict_block, &Buffer::from(dict_result))?;
                    }

                    // select the record batches whose starting offset falls within
                    // this partition's range; each batch start lies in exactly one
                    // partition's range, so no batch is read twice
                    let recordbatches = footer
                        .recordBatches()
                        .iter()
                        .flatten()
                        .filter(|block| {
                            let block_offset = block.offset() as u64;
                            block_offset >= range.start as u64
                                && block_offset < range.end as u64
                        })
                        .copied()
                        .collect_vec();

                    let recordbatch_ranges = recordbatches
                        .iter()
                        .map(|block| {
                            let block_len =
                                block.bodyLength() as u64 + block.metaDataLength() as u64;
                            let block_offset = block.offset() as u64;
                            block_offset..block_offset + block_len
                        })
                        .collect_vec();

                    let recordbatch_results = object_store
                        .get_ranges(
                            &partitioned_file.object_meta.location,
                            &recordbatch_ranges,
                        )
                        .await?;

                    let stream = futures::stream::iter(
                        recordbatches
                            .into_iter()
                            .zip(recordbatch_results)
                            .filter_map(move |(block, data)| {
                                decoder
                                    .read_record_batch(&block, &Buffer::from(data))
                                    .transpose()
                            }),
                    )
                    .map(|r| r.map_err(Into::into))
                    .boxed();

                    Ok(stream)
                }
            }
        }))
    }
}

/// `FileSource` for both Arrow IPC file and stream formats
#[derive(Clone)]
pub struct ArrowSource {
    format: ArrowFormat,
    metrics: ExecutionPlanMetricsSet,
    projection: SplitProjection,
    table_schema: TableSchema,
}

impl ArrowSource {
    /// Creates an [`ArrowSource`] for the Arrow IPC file format (with footer)
    pub fn new_file_source(table_schema: impl Into<TableSchema>) -> Self {
        let table_schema = table_schema.into();
        Self {
            format: ArrowFormat::File,
            metrics: ExecutionPlanMetricsSet::new(),
            projection: SplitProjection::unprojected(&table_schema),
            table_schema,
        }
    }

    /// Creates an [`ArrowSource`] for the Arrow IPC stream format (without footer)
    pub fn new_stream_file_source(table_schema: impl Into<TableSchema>) -> Self {
        let table_schema = table_schema.into();
        Self {
            format: ArrowFormat::Stream,
            metrics: ExecutionPlanMetricsSet::new(),
            projection: SplitProjection::unprojected(&table_schema),
            table_schema,
        }
    }
}

impl FileSource for ArrowSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        _base_config: &FileScanConfig,
        _partition: usize,
    ) -> Result<Arc<dyn FileOpener>> {
        let split_projection = self.projection.clone();

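        // The inner opener prunes columns at read time using the file-level
        // indices from the split projection; the `ProjectionOpener` wrapper
        // below then applies the rest of the projection expressions.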
        let opener: Arc<dyn FileOpener> = match self.format {
            ArrowFormat::File => Arc::new(ArrowFileOpener {
                object_store,
                projection: Some(split_projection.file_indices.clone()),
            }),
            ArrowFormat::Stream => Arc::new(ArrowStreamFileOpener {
                object_store,
                projection: Some(split_projection.file_indices.clone()),
            }),
        };
        ProjectionOpener::try_new(
            split_projection,
            opener,
            self.table_schema.file_schema(),
        )
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

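    /// Batch size is ignored: Arrow IPC files are read back one record batch
    /// at a time, exactly as the batches were written.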
    fn with_batch_size(&self, _batch_size: usize) -> Arc<dyn FileSource> {
        Arc::new(self.clone())
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self.metrics
    }

    fn file_type(&self) -> &str {
        match self.format {
            ArrowFormat::File => "arrow",
            ArrowFormat::Stream => "arrow_stream",
        }
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
        config: &FileScanConfig,
    ) -> Result<Option<FileScanConfig>> {
        match self.format {
            ArrowFormat::Stream => {
                // The Arrow IPC stream format doesn't support range-based parallel
                // reading because it lacks a footer recording where each record
                // batch lives. Without that information, we would either have to
                // read the entire file once just to record the offsets of the
                // record batches and dictionaries (essentially recreating the
                // footer), or have each partition read the whole file up to its
                // offset, duplicating a lot of I/O. We avoid both by keeping a
                // single partition and reading sequentially.
                Ok(None)
            }
            ArrowFormat::File => {
                // Use the default trait implementation logic for file format
                use datafusion_datasource::file_groups::FileGroupPartitioner;

                if config.file_compression_type.is_compressed() {
                    return Ok(None);
                }

                let repartitioned_file_groups_option = FileGroupPartitioner::new()
                    .with_target_partitions(target_partitions)
                    .with_repartition_file_min_size(repartition_file_min_size)
                    .with_preserve_order_within_groups(output_ordering.is_some())
                    .repartition_file_groups(&config.file_groups);

                if let Some(repartitioned_file_groups) = repartitioned_file_groups_option
                {
                    let mut source = config.clone();
                    source.file_groups = repartitioned_file_groups;
                    return Ok(Some(source));
                }
                Ok(None)
            }
        }
    }

    fn table_schema(&self) -> &TableSchema {
        &self.table_schema
    }

    fn try_pushdown_projection(
        &self,
        projection: &ProjectionExprs,
    ) -> Result<Option<Arc<dyn FileSource>>> {
        let mut source = self.clone();
        source.projection = SplitProjection::new(
            self.table_schema().file_schema(),
            &source.projection.source.try_merge(projection)?,
        );
        Ok(Some(Arc::new(source)))
    }

    fn projection(&self) -> Option<&ProjectionExprs> {
        Some(&self.projection.source)
    }
}

/// `FileOpener` wrapper for both Arrow IPC file and stream formats
pub struct ArrowOpener {
    pub inner: Arc<dyn FileOpener>,
}

impl FileOpener for ArrowOpener {
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
        self.inner.open(partitioned_file)
    }
}

impl ArrowOpener {
    /// Creates a new [`ArrowOpener`]
    pub fn new(inner: Arc<dyn FileOpener>) -> Self {
        Self { inner }
    }

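    /// Creates an [`ArrowOpener`] for the Arrow IPC file format (with footer)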
    pub fn new_file_opener(
        object_store: Arc<dyn ObjectStore>,
        projection: Option<Vec<usize>>,
    ) -> Self {
        Self {
            inner: Arc::new(ArrowFileOpener {
                object_store,
                projection,
            }),
        }
    }

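    /// Creates an [`ArrowOpener`] for the Arrow IPC stream format (without footer)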
    pub fn new_stream_file_opener(
        object_store: Arc<dyn ObjectStore>,
        projection: Option<Vec<usize>>,
    ) -> Self {
        Self {
            inner: Arc::new(ArrowStreamFileOpener {
                object_store,
                projection,
            }),
        }
    }
}

impl From<ArrowSource> for Arc<dyn FileSource> {
    fn from(source: ArrowSource) -> Self {
        as_file_source(source)
    }
}

#[cfg(test)]
mod tests {
    use std::{fs::File, io::Read};

    use arrow::datatypes::{DataType, Field, Schema};
    use arrow_ipc::reader::{FileReader, StreamReader};
    use bytes::Bytes;
    use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use object_store::memory::InMemory;

    use super::*;

    #[tokio::test]
    async fn test_file_opener_without_ranges() -> Result<()> {
        for filename in ["example.arrow", "example_stream.arrow"] {
            let path = format!("tests/data/{filename}");
            let path_str = path.as_str();
            let mut file = File::open(path_str)?;
            let file_size = file.metadata()?.len();

            let mut buffer = Vec::new();
            file.read_to_end(&mut buffer)?;
            let bytes = Bytes::from(buffer);

            let object_store = Arc::new(InMemory::new());
            let partitioned_file = PartitionedFile::new(filename, file_size);
            object_store
                .put(&partitioned_file.object_meta.location, bytes.into())
                .await?;

            let schema = match FileReader::try_new(File::open(path_str)?, None) {
                Ok(reader) => reader.schema(),
                Err(_) => StreamReader::try_new(File::open(path_str)?, None)?.schema(),
            };

            let source: Arc<dyn FileSource> = if filename.contains("stream") {
                Arc::new(ArrowSource::new_stream_file_source(schema))
            } else {
                Arc::new(ArrowSource::new_file_source(schema))
            };

            let scan_config = FileScanConfigBuilder::new(
                ObjectStoreUrl::local_filesystem(),
                source.clone(),
            )
            .build();

            let file_opener = source.create_file_opener(object_store, &scan_config, 0)?;
            let mut stream = file_opener.open(partitioned_file)?.await?;

            assert!(stream.next().await.is_some());
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_file_opener_with_ranges() -> Result<()> {
        let filename = "example.arrow";
        let path = format!("tests/data/{filename}");
        let path_str = path.as_str();
        let mut file = File::open(path_str)?;
        let file_size = file.metadata()?.len();

        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer)?;
        let bytes = Bytes::from(buffer);

        let object_store = Arc::new(InMemory::new());
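        // A range spanning almost the whole file: every record batch offset
        // falls inside it, so the range-based read path should produce batches.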
        let partitioned_file = PartitionedFile::new_with_range(
            filename.into(),
            file_size,
            0,
            (file_size - 1) as i64,
        );
        object_store
            .put(&partitioned_file.object_meta.location, bytes.into())
            .await?;

        let schema = FileReader::try_new(File::open(path_str)?, None)?.schema();

        let source = Arc::new(ArrowSource::new_file_source(schema));

        let scan_config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            source.clone(),
        )
        .build();

        let file_opener = source.create_file_opener(object_store, &scan_config, 0)?;
        let mut stream = file_opener.open(partitioned_file)?.await?;

        assert!(stream.next().await.is_some());

        Ok(())
    }

    #[tokio::test]
    async fn test_stream_opener_errors_with_ranges() -> Result<()> {
        let filename = "example_stream.arrow";
        let path = format!("tests/data/{filename}");
        let path_str = path.as_str();
        let mut file = File::open(path_str)?;
        let file_size = file.metadata()?.len();

        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer)?;
        let bytes = Bytes::from(buffer);

        let object_store = Arc::new(InMemory::new());
        let partitioned_file = PartitionedFile::new_with_range(
            filename.into(),
            file_size,
            0,
            (file_size - 1) as i64,
        );
        object_store
            .put(&partitioned_file.object_meta.location, bytes.into())
            .await?;

        let schema = StreamReader::try_new(File::open(path_str)?, None)?.schema();

        let source = Arc::new(ArrowSource::new_stream_file_source(schema));

        let scan_config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            source.clone(),
        )
        .build();

        let file_opener = source.create_file_opener(object_store, &scan_config, 0)?;
        let result = file_opener.open(partitioned_file);
        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_arrow_stream_repartitioning_not_supported() -> Result<()> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("f0", DataType::Int64, false)]));
        let source = ArrowSource::new_stream_file_source(schema);

        let config = FileScanConfigBuilder::new(
            ObjectStoreUrl::local_filesystem(),
            Arc::new(source.clone()) as Arc<dyn FileSource>,
        )
        .build();

        for target_partitions in [2, 4, 8, 16] {
            let result =
                source.repartitioned(target_partitions, 1024 * 1024, None, &config)?;

            assert!(
                result.is_none(),
                "Stream format should not support repartitioning with {target_partitions} partitions",
            );
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_stream_opener_with_projection() -> Result<()> {
        let filename = "example_stream.arrow";
        let path = format!("tests/data/{filename}");
        let path_str = path.as_str();
        let mut file = File::open(path_str)?;
        let file_size = file.metadata()?.len();

        let mut buffer = Vec::new();
        file.read_to_end(&mut buffer)?;
        let bytes = Bytes::from(buffer);

        let object_store = Arc::new(InMemory::new());
        let partitioned_file = PartitionedFile::new(filename, file_size);
        object_store
            .put(&partitioned_file.object_meta.location, bytes.into())
            .await?;

        let opener = ArrowStreamFileOpener {
            object_store,
            projection: Some(vec![0]), // just the first column
        };

        let mut stream = opener.open(partitioned_file)?.await?;

        if let Some(batch) = stream.next().await {
            let batch = batch?;
            assert_eq!(
                batch.num_columns(),
                1,
                "Projection should result in 1 column"
            );
        } else {
            panic!("Expected at least one batch");
        }

        Ok(())
    }
}