use std::{collections::{HashMap, HashSet}, io::Read, sync::Arc};
use bytes::Bytes;
use prost::Message;
use snafu::ResultExt;
use crate::{
column::Column,
compression::{Compression, Decompressor},
error::{self, IoSnafu, Result},
proto::{self, stream::Kind, StripeFooter},
reader::{metadata::FileMetadata, ChunkReader},
schema::RootDataType,
statistics::ColumnStatistics,
};
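
/// Metadata describing a single stripe: its offset and section lengths within
/// the file, its row count, and per-column statistics when the file provides
/// stripe statistics.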
#[derive(Debug, Clone)]
pub struct StripeMetadata {
column_statistics: Vec<ColumnStatistics>,
offset: u64,
index_length: u64,
data_length: u64,
footer_length: u64,
number_of_rows: u64,
}
impl StripeMetadata {
pub fn offset(&self) -> u64 {
self.offset
}
pub fn index_length(&self) -> u64 {
self.index_length
}
pub fn data_length(&self) -> u64 {
self.data_length
}
pub fn footer_length(&self) -> u64 {
self.footer_length
}
pub fn number_of_rows(&self) -> u64 {
self.number_of_rows
}
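/// Per-column statistics for this stripe; empty when the file metadata
/// carried no stripe statistics.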
pub fn column_statistics(&self) -> &[ColumnStatistics] {
&self.column_statistics
}
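/// Absolute byte position of the stripe footer: the stripe offset plus the
/// index and data section lengths.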
pub fn footer_offset(&self) -> u64 {
self.offset + self.index_length + self.data_length
}
}
impl TryFrom<(&proto::StripeInformation, &proto::StripeStatistics)> for StripeMetadata {
type Error = error::OrcError;
fn try_from(value: (&proto::StripeInformation, &proto::StripeStatistics)) -> Result<Self> {
let column_statistics = value
.1
.col_stats
.iter()
.map(TryFrom::try_from)
.collect::<Result<Vec<_>>>()?;
Ok(Self {
column_statistics,
offset: value.0.offset(),
index_length: value.0.index_length(),
data_length: value.0.data_length(),
footer_length: value.0.footer_length(),
number_of_rows: value.0.number_of_rows(),
})
}
}
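
// Fallback when stripe statistics are not available in the file metadata:
// build the stripe metadata with an empty set of column statistics.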
impl TryFrom<&proto::StripeInformation> for StripeMetadata {
type Error = error::OrcError;
fn try_from(value: &proto::StripeInformation) -> Result<Self> {
Ok(Self {
column_statistics: vec![],
offset: value.offset(),
index_length: value.index_length(),
data_length: value.data_length(),
footer_length: value.footer_length(),
number_of_rows: value.number_of_rows(),
})
}
}
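
/// A stripe that has been read into memory: the projected columns, the raw
/// stream bytes needed to decode them, the row count, and the writer's
/// timezone (if recorded in the stripe footer).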
#[derive(Debug)]
pub struct Stripe {
columns: Vec<Column>,
stream_map: Arc<StreamMap>,
number_of_rows: usize,
tz: Option<chrono_tz::Tz>,
}
impl Stripe {
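/// Read a stripe synchronously: fetch and decode its footer, build the
/// projected columns, and load only the streams those columns require.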
pub fn new<R: ChunkReader>(
reader: &mut R,
file_metadata: &Arc<FileMetadata>,
projected_data_type: &RootDataType,
info: &StripeMetadata,
) -> Result<Self> {
let compression = file_metadata.compression();
let footer = reader
.get_bytes(info.footer_offset(), info.footer_length())
.context(IoSnafu)?;
let footer = Arc::new(deserialize_stripe_footer(footer, compression)?);
let columns: Vec<Column> = projected_data_type
.children()
.iter()
.map(|col| Column::new(col.name(), col.data_type(), &footer))
.collect();
let column_ids = collect_required_column_ids(&columns);
let mut stream_map = HashMap::new();
let mut stream_offset = info.offset();
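// Streams are laid out back to back starting at the stripe offset, so the
// running offset advances by each stream's length even when the stream is
// skipped.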
for stream in &footer.streams {
let length = stream.length();
let column_id = stream.column();
if column_ids.contains(&column_id) {
let kind = stream.kind();
let data = Column::read_stream(reader, stream_offset, length)?;
stream_map.insert((column_id, kind), data);
}
stream_offset += length;
}
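// NOTE: parsing panics if the writer timezone is not a recognized IANA
// timezone name (e.g. "UTC" or "America/Los_Angeles").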
let tz: Option<chrono_tz::Tz> = footer
.writer_timezone
.as_ref()
.map(|a| a.parse::<chrono_tz::Tz>().unwrap());
Ok(Self {
columns,
stream_map: Arc::new(StreamMap {
inner: stream_map,
compression,
}),
number_of_rows: info.number_of_rows() as usize,
tz,
})
}
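/// Asynchronous counterpart of [`Stripe::new`], reading the stripe footer and
/// streams through a [`crate::reader::AsyncChunkReader`].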
#[cfg(feature = "async")]
pub async fn new_async<R: crate::reader::AsyncChunkReader>(
reader: &mut R,
file_metadata: &Arc<FileMetadata>,
projected_data_type: &RootDataType,
info: &StripeMetadata,
) -> Result<Self> {
let compression = file_metadata.compression();
let footer = reader
.get_bytes(info.footer_offset(), info.footer_length())
.await
.context(IoSnafu)?;
let footer = Arc::new(deserialize_stripe_footer(footer, compression)?);
let columns: Vec<Column> = projected_data_type
.children()
.iter()
.map(|col| Column::new(col.name(), col.data_type(), &footer))
.collect();
let column_ids = collect_required_column_ids(&columns);
let mut stream_map = HashMap::new();
let mut stream_offset = info.offset();
for stream in &footer.streams {
let length = stream.length();
let column_id = stream.column();
if column_ids.contains(&column_id) {
let kind = stream.kind();
let data = Column::read_stream_async(reader, stream_offset, length).await?;
stream_map.insert((column_id, kind), data);
}
stream_offset += length;
}
let tz: Option<chrono_tz::Tz> = footer
.writer_timezone
.as_ref()
.map(|a| a.parse::<chrono_tz::Tz>().unwrap());
Ok(Self {
columns,
stream_map: Arc::new(StreamMap {
inner: stream_map,
compression,
}),
number_of_rows: info.number_of_rows() as usize,
tz,
})
}
pub fn number_of_rows(&self) -> usize {
self.number_of_rows
}
pub fn stream_map(&self) -> &StreamMap {
&self.stream_map
}
pub fn columns(&self) -> &[Column] {
&self.columns
}
pub fn writer_tz(&self) -> Option<chrono_tz::Tz> {
self.tz
}
}
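
/// Raw stream bytes for a stripe, keyed by `(column id, stream kind)`, along
/// with the compression needed to decode them.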
#[derive(Debug)]
pub struct StreamMap {
pub inner: HashMap<(u32, Kind), Bytes>,
pub compression: Option<Compression>,
}
impl StreamMap {
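/// Decompressor over a column's stream, or an empty decompressor when the
/// stripe has no such stream.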
pub fn get(&self, column: &Column, kind: Kind) -> Decompressor {
self.get_opt(column, kind)
.unwrap_or_else(Decompressor::empty)
}
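/// Decompressor over a column's stream, or `None` when the stripe has no such
/// stream for that column.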
pub fn get_opt(&self, column: &Column, kind: Kind) -> Option<Decompressor> {
let column_id = column.column_id();
self.inner
.get(&(column_id, kind))
.cloned()
.map(|data| Decompressor::new(data, self.compression, vec![]))
}
}
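
/// Decompress (when compression is configured) and protobuf-decode a stripe
/// footer.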
fn deserialize_stripe_footer(
bytes: Bytes,
compression: Option<Compression>,
) -> Result<StripeFooter> {
let mut buffer = vec![];
Decompressor::new(bytes, compression, vec![])
.read_to_end(&mut buffer)
.context(error::IoSnafu)?;
StripeFooter::decode(buffer.as_slice()).context(error::DecodeProtoSnafu)
}
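
/// Collect the ids of the projected columns and, recursively, of all their
/// children; only streams belonging to these ids are read from the stripe.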
fn collect_required_column_ids(columns: &[Column]) -> HashSet<u32> {
let mut set = HashSet::new();
for column in columns {
set.insert(column.column_id());
set.extend(collect_required_column_ids(&column.children()));
}
set
}