use std::{collections::HashMap, io::Read, sync::Arc};
use bytes::Bytes;
use prost::Message;
use snafu::ResultExt;
use crate::{
column::Column,
compression::{Compression, Decompressor},
error::{self, IoSnafu, Result},
proto::{self, stream::Kind, StripeFooter},
reader::{metadata::FileMetadata, ChunkReader},
schema::RootDataType,
statistics::ColumnStatistics,
};
/// Metadata describing a single stripe of an ORC file, derived from the
/// file footer's `StripeInformation` (and, optionally, `StripeStatistics`).
#[derive(Debug, Clone)]
pub struct StripeMetadata {
    // Per-column statistics for this stripe; empty when constructed from
    // `StripeInformation` alone (see the `TryFrom` impls below).
    column_statistics: Vec<ColumnStatistics>,
    // Byte offset of the start of the stripe within the file.
    offset: u64,
    // Length in bytes of the stripe's index section.
    index_length: u64,
    // Length in bytes of the stripe's data section.
    data_length: u64,
    // Length in bytes of the stripe footer.
    footer_length: u64,
    // Number of rows stored in this stripe.
    number_of_rows: u64,
}
impl StripeMetadata {
    /// Per-column statistics for this stripe (empty if none were available).
    pub fn column_statistics(&self) -> &[ColumnStatistics] {
        &self.column_statistics
    }

    /// Byte offset of the start of the stripe within the file.
    pub fn offset(&self) -> u64 {
        self.offset
    }

    /// Length in bytes of the stripe's index section.
    pub fn index_length(&self) -> u64 {
        self.index_length
    }

    /// Length in bytes of the stripe's data section.
    pub fn data_length(&self) -> u64 {
        self.data_length
    }

    /// Length in bytes of the stripe footer.
    pub fn footer_length(&self) -> u64 {
        self.footer_length
    }

    /// Number of rows stored in this stripe.
    pub fn number_of_rows(&self) -> u64 {
        self.number_of_rows
    }

    /// Absolute file offset at which the stripe footer begins: directly
    /// after the stripe's index and data sections.
    pub fn footer_offset(&self) -> u64 {
        self.offset + self.index_length + self.data_length
    }
}
impl TryFrom<(&proto::StripeInformation, &proto::StripeStatistics)> for StripeMetadata {
    type Error = error::OrcError;

    /// Builds stripe metadata from the stripe information together with its
    /// per-column statistics. Fails on the first column statistic that
    /// cannot be converted.
    fn try_from(
        (info, stats): (&proto::StripeInformation, &proto::StripeStatistics),
    ) -> Result<Self> {
        let mut column_statistics = Vec::with_capacity(stats.col_stats.len());
        for col_stat in &stats.col_stats {
            column_statistics.push(col_stat.try_into()?);
        }
        Ok(Self {
            column_statistics,
            offset: info.offset(),
            index_length: info.index_length(),
            data_length: info.data_length(),
            footer_length: info.footer_length(),
            number_of_rows: info.number_of_rows(),
        })
    }
}
impl TryFrom<&proto::StripeInformation> for StripeMetadata {
    type Error = error::OrcError;

    /// Builds stripe metadata from stripe information alone; column
    /// statistics are left empty. This conversion cannot actually fail, but
    /// returns `Result` for symmetry with the statistics-aware conversion.
    fn try_from(info: &proto::StripeInformation) -> Result<Self> {
        Ok(Self {
            column_statistics: Vec::new(),
            number_of_rows: info.number_of_rows(),
            footer_length: info.footer_length(),
            data_length: info.data_length(),
            index_length: info.index_length(),
            offset: info.offset(),
        })
    }
}
/// A fully loaded stripe: the projected columns, the raw bytes of every
/// stream belonging to those columns, the row count, and the writer's
/// timezone (when recorded in the stripe footer).
#[derive(Debug)]
pub struct Stripe {
    // Projected columns, each sharing the decoded stripe footer.
    columns: Vec<Column>,
    // Raw stream bytes keyed by `(column_id, stream kind)`.
    stream_map: StreamMap,
    // Number of rows in this stripe.
    number_of_rows: usize,
    // Writer timezone parsed from the stripe footer, if present.
    tz: Option<chrono_tz::Tz>,
}
impl Stripe {
pub fn new<R: ChunkReader>(
reader: &mut R,
file_metadata: &FileMetadata,
projected_data_type: &RootDataType,
info: &StripeMetadata,
) -> Result<Self> {
let footer = reader
.get_bytes(info.footer_offset(), info.footer_length())
.context(IoSnafu)?;
let footer = Arc::new(deserialize_stripe_footer(
footer,
file_metadata.compression(),
)?);
let columns: Vec<Column> = projected_data_type
.children()
.iter()
.map(|col| {
Column::new(
col.name().to_string(),
col.data_type().clone(),
footer.clone(),
)
})
.collect();
let mut stream_map = HashMap::new();
let mut stream_offset = info.offset();
for stream in &footer.streams {
let length = stream.length();
let column_id = stream.column();
if projected_data_type.contains_column_index(column_id as usize) {
let kind = stream.kind();
let data = reader.get_bytes(stream_offset, length).context(IoSnafu)?;
stream_map.insert((column_id, kind), data);
}
stream_offset += length;
}
let tz = footer
.writer_timezone
.as_ref()
.map(|a| a.parse::<chrono_tz::Tz>().unwrap());
Ok(Self {
columns,
stream_map: StreamMap {
inner: stream_map,
compression: file_metadata.compression(),
},
number_of_rows: info.number_of_rows() as usize,
tz,
})
}
#[cfg(feature = "async")]
pub async fn new_async<R: crate::reader::AsyncChunkReader>(
reader: &mut R,
file_metadata: &FileMetadata,
projected_data_type: &RootDataType,
info: &StripeMetadata,
) -> Result<Self> {
let footer = reader
.get_bytes(info.footer_offset(), info.footer_length())
.await
.context(IoSnafu)?;
let footer = Arc::new(deserialize_stripe_footer(
footer,
file_metadata.compression(),
)?);
let columns: Vec<Column> = projected_data_type
.children()
.iter()
.map(|col| {
Column::new(
col.name().to_string(),
col.data_type().clone(),
footer.clone(),
)
})
.collect();
let mut stream_map = HashMap::new();
let mut stream_offset = info.offset();
for stream in &footer.streams {
let length = stream.length();
let column_id = stream.column();
if projected_data_type.contains_column_index(column_id as usize) {
let kind = stream.kind();
let data = reader
.get_bytes(stream_offset, length)
.await
.context(IoSnafu)?;
stream_map.insert((column_id, kind), data);
}
stream_offset += length;
}
let tz = footer
.writer_timezone
.as_ref()
.map(|a| a.parse::<chrono_tz::Tz>().unwrap());
Ok(Self {
columns,
stream_map: StreamMap {
inner: stream_map,
compression: file_metadata.compression(),
},
number_of_rows: info.number_of_rows() as usize,
tz,
})
}
pub fn number_of_rows(&self) -> usize {
self.number_of_rows
}
pub fn stream_map(&self) -> &StreamMap {
&self.stream_map
}
pub fn columns(&self) -> &[Column] {
&self.columns
}
pub fn writer_tz(&self) -> Option<chrono_tz::Tz> {
self.tz
}
pub fn read_row_indexes(
&self,
file_metadata: &crate::reader::metadata::FileMetadata,
) -> Result<crate::row_index::StripeRowIndex> {
let rows_per_group = file_metadata.row_index_stride().unwrap_or(10_000);
crate::row_index::parse_stripe_row_indexes(
&self.stream_map,
&self.columns,
self.number_of_rows,
rows_per_group,
)
}
}
/// Raw stream bytes of a stripe, keyed by `(column_id, stream kind)`,
/// together with the compression kind needed to decode them.
#[derive(Debug)]
pub struct StreamMap {
    // Stream bytes indexed by `(column_id, kind)`.
    inner: HashMap<(u32, Kind), Bytes>,
    // Compression used by the file; `None` means uncompressed.
    compression: Option<Compression>,
}
impl StreamMap {
    /// Returns a decompressor over the stream for `(column, kind)`, or an
    /// empty decompressor when no such stream exists.
    pub fn get(&self, column: &Column, kind: Kind) -> Decompressor {
        match self.get_opt(column, kind) {
            Some(decompressor) => decompressor,
            None => Decompressor::empty(),
        }
    }

    /// Returns a decompressor over the stream for `(column, kind)`, if the
    /// stream is present in this stripe.
    pub fn get_opt(&self, column: &Column, kind: Kind) -> Option<Decompressor> {
        let key = (column.column_id(), kind);
        let data = self.inner.get(&key)?.clone();
        Some(Decompressor::new(data, self.compression, vec![]))
    }
}
fn deserialize_stripe_footer(
bytes: Bytes,
compression: Option<Compression>,
) -> Result<StripeFooter> {
let mut buffer = vec![];
Decompressor::new(bytes, compression, vec![])
.read_to_end(&mut buffer)
.context(error::IoSnafu)?;
StripeFooter::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
}