vortex_scan/
arrow.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use arrow_array::cast::AsArray;
5use arrow_array::{RecordBatch, RecordBatchReader};
6use arrow_schema::{ArrowError, DataType, SchemaRef};
7use vortex_array::ArrayRef;
8use vortex_array::arrow::IntoArrowArray;
9use vortex_error::{VortexError, VortexResult};
10
11use crate::ScanBuilder;
12
13impl ScanBuilder<ArrayRef> {
14    /// Creates a new thread-safe `RecordBatchReader` from the scan builder.
15    ///
16    /// This reader can be cloned and passed to multiple threads for concurrent processing.
17    ///
18    /// The `schema` parameter is used to define the schema of the resulting record batches. In
19    /// general, it is not possible to exactly infer an Arrow schema from a Vortex
20    /// [`vortex_dtype::DType`], therefore it is required to be provided explicitly.
21    pub fn into_record_batch_reader(
22        self,
23        schema: SchemaRef,
24    ) -> VortexResult<impl RecordBatchReader + Send + Clone + 'static> {
25        let data_type = DataType::Struct(schema.fields().clone());
26
27        let iter = self
28            .into_array_iter()?
29            .map(move |chunk| to_record_batch(chunk, &data_type));
30
31        Ok(RecordBatchIteratorAdapter { iter, schema })
32    }
33
34    /// Creates a new `RecordBatchReader` from the scan builder that internally drives the scan
35    /// on multithreaded pool of workers.
36    #[cfg(feature = "tokio")]
37    pub fn into_record_batch_reader_multithread(
38        self,
39        schema: SchemaRef,
40    ) -> VortexResult<impl RecordBatchReader + Send + 'static> {
41        use arrow_array::RecordBatchIterator;
42
43        let data_type = DataType::Struct(schema.fields().clone());
44
45        let iter = self.into_iter_multithread(move |chunk| to_record_batch(chunk, &data_type))?;
46
47        Ok(RecordBatchIterator::new(iter, schema))
48    }
49}
50
51fn to_record_batch(
52    chunk: VortexResult<ArrayRef>,
53    data_type: &DataType,
54) -> Result<RecordBatch, ArrowError> {
55    chunk
56        .and_then(|array| {
57            let arrow = array.into_arrow(data_type)?;
58            Ok::<_, VortexError>(RecordBatch::from(arrow.as_struct().clone()))
59        })
60        .map_err(|e| ArrowError::ExternalError(Box::new(e)))
61}
62
/// Adapter that pairs an iterator of record batch results with the Arrow schema it reports.
///
/// Deriving `Clone` makes the adapter cloneable whenever the wrapped iterator is, which is what
/// lets `ScanBuilder::into_record_batch_reader` return a reader that is `Clone`.
#[derive(Clone)]
struct RecordBatchIteratorAdapter<I> {
    /// Source of the record batches (or errors) yielded by the adapter.
    iter: I,
    /// Schema handed back from `RecordBatchReader::schema`.
    schema: SchemaRef,
}
70
71impl<I> Iterator for RecordBatchIteratorAdapter<I>
72where
73    I: Iterator<Item = Result<RecordBatch, ArrowError>>,
74{
75    type Item = Result<RecordBatch, ArrowError>;
76
77    fn next(&mut self) -> Option<Self::Item> {
78        self.iter.next()
79    }
80}
81
82impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
83where
84    I: Iterator<Item = Result<RecordBatch, ArrowError>>,
85{
86    fn schema(&self) -> SchemaRef {
87        self.schema.clone()
88    }
89}