use super::{FilePredicates, FormatFileReader};
use crate::io::FileRead;
use crate::spec::DataField;
use crate::table::{ArrowRecordBatchStream, RowRange};
use crate::Error;
use async_trait::async_trait;
use bytes::Bytes;
use futures::{future::BoxFuture, StreamExt};
use orc_rust::projection::ProjectionMask;
use orc_rust::reader::AsyncChunkReader;
use orc_rust::ArrowReaderBuilder;
pub(crate) struct OrcFormatReader;
#[async_trait]
impl FormatFileReader for OrcFormatReader {
async fn read_batch_stream(
&self,
reader: Box<dyn FileRead>,
file_size: u64,
read_fields: &[DataField],
_predicates: Option<&FilePredicates>,
batch_size: Option<usize>,
row_selection: Option<Vec<RowRange>>,
) -> crate::Result<ArrowRecordBatchStream> {
let orc_reader = OrcFileReader::new(file_size, reader);
let builder = ArrowReaderBuilder::try_new_async(orc_reader)
.await
.map_err(|e| Error::UnexpectedError {
message: format!("Failed to open ORC file: {e}"),
source: Some(Box::new(e)),
})?;
let projected_names: Vec<&str> = read_fields.iter().map(|f| f.name()).collect();
let projection =
ProjectionMask::named_roots(builder.file_metadata().root_data_type(), &projected_names);
let mut builder = builder.with_projection(projection);
if let Some(size) = batch_size {
builder = builder.with_batch_size(size);
}
if let Some(ref ranges) = row_selection {
let total_rows: usize = builder
.file_metadata()
.stripe_metadatas()
.iter()
.map(|s| s.number_of_rows() as usize)
.sum();
let selection = build_range_row_selection(total_rows, ranges);
builder = builder.with_row_selection(selection);
}
let stream = builder.build_async();
Ok(stream
.map(|r| {
r.map_err(|e| Error::UnexpectedError {
message: format!("ORC read error: {e}"),
source: Some(Box::new(e)),
})
})
.boxed())
}
}
fn build_range_row_selection(
total_rows: usize,
row_ranges: &[RowRange],
) -> orc_rust::row_selection::RowSelection {
if total_rows == 0 {
return orc_rust::row_selection::RowSelection::default();
}
let file_end = total_rows as i64 - 1;
let mut local_ranges: Vec<(usize, usize)> = row_ranges
.iter()
.filter_map(|r| {
if r.to() < 0 || r.from() > file_end {
return None;
}
let local_start = r.from().max(0) as usize;
let local_end = (r.to().min(file_end) + 1) as usize;
Some((local_start, local_end))
})
.collect();
local_ranges.sort_by_key(|&(s, _)| s);
orc_rust::row_selection::RowSelection::from_consecutive_ranges(
local_ranges.into_iter().map(|(s, e)| s..e),
total_rows,
)
}
struct OrcFileReader {
file_size: u64,
r: Box<dyn FileRead>,
}
impl OrcFileReader {
fn new(file_size: u64, r: Box<dyn FileRead>) -> Self {
Self { file_size, r }
}
}
impl AsyncChunkReader for OrcFileReader {
fn len(&mut self) -> BoxFuture<'_, std::io::Result<u64>> {
Box::pin(std::future::ready(Ok(self.file_size)))
}
fn get_bytes(
&mut self,
offset_from_start: u64,
length: u64,
) -> BoxFuture<'_, std::io::Result<Bytes>> {
Box::pin(async move {
self.r
.read(offset_from_start..offset_from_start + length)
.await
.map_err(|e| std::io::Error::other(e.to_string()))
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use orc_rust::row_selection::RowSelector;
#[test]
fn test_build_range_row_selection_single_range() {
let ranges = vec![RowRange::new(2, 4)];
let sel = build_range_row_selection(6, &ranges);
let expected: orc_rust::row_selection::RowSelection = vec![
RowSelector::skip(2),
RowSelector::select(3),
RowSelector::skip(1),
]
.into();
assert_eq!(sel, expected);
}
#[test]
fn test_build_range_row_selection_with_offset() {
let ranges = vec![RowRange::new(1, 3)];
let sel = build_range_row_selection(5, &ranges);
let expected: orc_rust::row_selection::RowSelection = vec![
RowSelector::skip(1),
RowSelector::select(3),
RowSelector::skip(1),
]
.into();
assert_eq!(sel, expected);
}
#[test]
fn test_build_range_row_selection_out_of_file() {
let ranges = vec![RowRange::new(10, 20)];
let sel = build_range_row_selection(5, &ranges);
let expected: orc_rust::row_selection::RowSelection = vec![RowSelector::skip(5)].into();
assert_eq!(sel, expected);
}
}