use std::collections::HashMap;
use crate::bloom_filter::BloomFilter;
use crate::error::Result;
use crate::proto;
use crate::statistics::ColumnStatistics;
#[derive(Debug, Clone)]
pub struct RowGroupEntry {
pub statistics: Option<ColumnStatistics>,
pub positions: Vec<u64>,
pub bloom_filter: Option<BloomFilter>,
}
impl RowGroupEntry {
pub fn new(statistics: Option<ColumnStatistics>, positions: Vec<u64>) -> Self {
Self {
statistics,
positions,
bloom_filter: None,
}
}
pub fn with_bloom_filter(mut self, bloom_filter: Option<BloomFilter>) -> Self {
self.bloom_filter = bloom_filter;
self
}
}
#[derive(Debug, Clone)]
pub struct RowGroupIndex {
entries: Vec<RowGroupEntry>,
rows_per_group: usize,
column_index: usize,
}
impl RowGroupIndex {
pub fn new(entries: Vec<RowGroupEntry>, rows_per_group: usize, column_index: usize) -> Self {
Self {
entries,
rows_per_group,
column_index,
}
}
pub fn num_row_groups(&self) -> usize {
self.entries.len()
}
pub fn rows_per_group(&self) -> usize {
self.rows_per_group
}
pub fn column_index(&self) -> usize {
self.column_index
}
pub fn row_group_stats(&self, row_group_idx: usize) -> Option<&ColumnStatistics> {
self.entries
.get(row_group_idx)
.and_then(|entry| entry.statistics.as_ref())
}
pub fn entries(&self) -> impl Iterator<Item = &RowGroupEntry> {
self.entries.iter()
}
pub(crate) fn entries_mut(&mut self) -> impl Iterator<Item = &mut RowGroupEntry> {
self.entries.iter_mut()
}
pub fn entry(&self, row_group_idx: usize) -> Option<&RowGroupEntry> {
self.entries.get(row_group_idx)
}
}
#[derive(Debug, Clone)]
pub struct StripeRowIndex {
columns: HashMap<usize, RowGroupIndex>,
total_rows: usize,
rows_per_group: usize,
}
impl StripeRowIndex {
pub fn new(
columns: HashMap<usize, RowGroupIndex>,
total_rows: usize,
rows_per_group: usize,
) -> Self {
Self {
columns,
total_rows,
rows_per_group,
}
}
pub fn column(&self, column_idx: usize) -> Option<&RowGroupIndex> {
self.columns.get(&column_idx)
}
pub fn num_row_groups(&self) -> usize {
if self.rows_per_group == 0 {
return 0;
}
self.total_rows.div_ceil(self.rows_per_group)
}
pub fn row_group_stats(
&self,
column_idx: usize,
row_group_idx: usize,
) -> Option<&ColumnStatistics> {
self.column(column_idx)
.and_then(|col_index| col_index.row_group_stats(row_group_idx))
}
pub fn total_rows(&self) -> usize {
self.total_rows
}
pub fn rows_per_group(&self) -> usize {
self.rows_per_group
}
pub fn column_indices(&self) -> impl Iterator<Item = usize> + '_ {
self.columns.keys().copied()
}
}
fn parse_row_index(
proto: &proto::RowIndex,
column_index: usize,
rows_per_group: usize,
) -> Result<RowGroupIndex> {
use crate::statistics::ColumnStatistics;
let entries: Result<Vec<RowGroupEntry>> = proto
.entry
.iter()
.map(|entry| {
let statistics = entry
.statistics
.as_ref()
.map(ColumnStatistics::try_from)
.transpose()?;
Ok(RowGroupEntry::new(statistics, entry.positions.clone()))
})
.collect();
Ok(RowGroupIndex::new(entries?, rows_per_group, column_index))
}
pub fn parse_stripe_row_indexes(
stripe_stream_map: &crate::stripe::StreamMap,
columns: &[crate::column::Column],
total_rows: usize,
rows_per_group: usize,
) -> Result<StripeRowIndex> {
use crate::error::{DecodeProtoSnafu, IoSnafu};
use crate::proto::stream::Kind;
use prost::Message;
use snafu::ResultExt;
let mut row_indexes = HashMap::new();
for column in columns {
let column_id = column.column_id();
let row_index_stream = stripe_stream_map.get_opt(column, Kind::RowIndex);
if let Some(mut decompressor) = row_index_stream {
let mut buffer = Vec::new();
std::io::Read::read_to_end(&mut decompressor, &mut buffer).context(IoSnafu)?;
let proto_row_index =
proto::RowIndex::decode(buffer.as_slice()).context(DecodeProtoSnafu)?;
let row_group_index =
parse_row_index(&proto_row_index, column_id as usize, rows_per_group)?;
row_indexes.insert(column_id as usize, row_group_index);
}
}
let bloom_filters = parse_bloom_filters(stripe_stream_map, columns)?;
for (column_id, filters) in bloom_filters {
if let Some(row_group_index) = row_indexes.get_mut(&column_id) {
let entry_count = row_group_index.num_row_groups();
assert_eq!(
entry_count,
filters.len(),
"Bloom filter count mismatch: expected {} but got {} for column {}",
entry_count,
filters.len(),
column_id
);
for (entry, bloom) in row_group_index.entries_mut().zip(filters.into_iter()) {
entry.bloom_filter = Some(bloom);
}
}
}
Ok(StripeRowIndex::new(row_indexes, total_rows, rows_per_group))
}
fn parse_bloom_filters(
stripe_stream_map: &crate::stripe::StreamMap,
columns: &[crate::column::Column],
) -> Result<HashMap<usize, Vec<BloomFilter>>> {
use crate::error::{DecodeProtoSnafu, IoSnafu};
use crate::proto::stream::Kind;
use prost::Message;
use snafu::ResultExt;
let mut bloom_indexes = HashMap::new();
for column in columns {
let column_id = column.column_id();
let bloom_stream = stripe_stream_map
.get_opt(column, Kind::BloomFilter)
.or_else(|| stripe_stream_map.get_opt(column, Kind::BloomFilterUtf8));
if let Some(mut decompressor) = bloom_stream {
let mut buffer = Vec::new();
std::io::Read::read_to_end(&mut decompressor, &mut buffer).context(IoSnafu)?;
let proto_bloom_index =
proto::BloomFilterIndex::decode(buffer.as_slice()).context(DecodeProtoSnafu)?;
let filters: Vec<BloomFilter> = proto_bloom_index
.bloom_filter
.iter()
.filter_map(BloomFilter::try_from_proto)
.collect();
bloom_indexes.insert(column_id as usize, filters);
}
}
Ok(bloom_indexes)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_row_group_index() {
let entries = vec![
RowGroupEntry::new(None, vec![1, 2, 3]),
RowGroupEntry::new(None, vec![4, 5, 6]),
];
let index = RowGroupIndex::new(entries, 10000, 0);
assert_eq!(index.num_row_groups(), 2);
assert_eq!(index.rows_per_group(), 10000);
assert_eq!(index.column_index(), 0);
}
#[test]
fn test_stripe_row_index() {
let mut columns = HashMap::new();
let entries = vec![RowGroupEntry::new(None, vec![])];
columns.insert(0, RowGroupIndex::new(entries, 10000, 0));
let stripe_index = StripeRowIndex::new(columns, 50000, 10000);
assert_eq!(stripe_index.num_row_groups(), 5);
assert_eq!(stripe_index.total_rows(), 50000);
assert_eq!(stripe_index.rows_per_group(), 10000);
}
}