1use std::collections::HashMap;
25
26use crate::bloom_filter::BloomFilter;
27use crate::error::Result;
28use crate::proto;
29use crate::statistics::ColumnStatistics;
30
31#[derive(Debug, Clone)]
37pub struct RowGroupEntry {
38 pub statistics: Option<ColumnStatistics>,
40
41 pub positions: Vec<u64>,
52
53 pub bloom_filter: Option<BloomFilter>,
55}
56
57impl RowGroupEntry {
58 pub fn new(statistics: Option<ColumnStatistics>, positions: Vec<u64>) -> Self {
59 Self {
60 statistics,
61 positions,
62 bloom_filter: None,
63 }
64 }
65
66 pub fn with_bloom_filter(mut self, bloom_filter: Option<BloomFilter>) -> Self {
67 self.bloom_filter = bloom_filter;
68 self
69 }
70}
71
72#[derive(Debug, Clone)]
77pub struct RowGroupIndex {
78 entries: Vec<RowGroupEntry>,
80
81 rows_per_group: usize,
83
84 column_index: usize,
86}
87
88impl RowGroupIndex {
89 pub fn new(entries: Vec<RowGroupEntry>, rows_per_group: usize, column_index: usize) -> Self {
90 Self {
91 entries,
92 rows_per_group,
93 column_index,
94 }
95 }
96
97 pub fn num_row_groups(&self) -> usize {
99 self.entries.len()
100 }
101
102 pub fn rows_per_group(&self) -> usize {
104 self.rows_per_group
105 }
106
107 pub fn column_index(&self) -> usize {
109 self.column_index
110 }
111
112 pub fn row_group_stats(&self, row_group_idx: usize) -> Option<&ColumnStatistics> {
114 self.entries
115 .get(row_group_idx)
116 .and_then(|entry| entry.statistics.as_ref())
117 }
118
119 pub fn entries(&self) -> impl Iterator<Item = &RowGroupEntry> {
121 self.entries.iter()
122 }
123
124 pub(crate) fn entries_mut(&mut self) -> impl Iterator<Item = &mut RowGroupEntry> {
126 self.entries.iter_mut()
127 }
128
129 pub fn entry(&self, row_group_idx: usize) -> Option<&RowGroupEntry> {
131 self.entries.get(row_group_idx)
132 }
133}
134
135#[derive(Debug, Clone)]
140pub struct StripeRowIndex {
141 columns: HashMap<usize, RowGroupIndex>,
143
144 total_rows: usize,
146
147 rows_per_group: usize,
149}
150
151impl StripeRowIndex {
152 pub fn new(
153 columns: HashMap<usize, RowGroupIndex>,
154 total_rows: usize,
155 rows_per_group: usize,
156 ) -> Self {
157 Self {
158 columns,
159 total_rows,
160 rows_per_group,
161 }
162 }
163
164 pub fn column(&self, column_idx: usize) -> Option<&RowGroupIndex> {
166 self.columns.get(&column_idx)
167 }
168
169 pub fn num_row_groups(&self) -> usize {
171 if self.rows_per_group == 0 {
172 return 0;
173 }
174 self.total_rows.div_ceil(self.rows_per_group)
175 }
176
177 pub fn row_group_stats(
179 &self,
180 column_idx: usize,
181 row_group_idx: usize,
182 ) -> Option<&ColumnStatistics> {
183 self.column(column_idx)
184 .and_then(|col_index| col_index.row_group_stats(row_group_idx))
185 }
186
187 pub fn total_rows(&self) -> usize {
189 self.total_rows
190 }
191
192 pub fn rows_per_group(&self) -> usize {
194 self.rows_per_group
195 }
196
197 pub fn column_indices(&self) -> impl Iterator<Item = usize> + '_ {
199 self.columns.keys().copied()
200 }
201}
202
203fn parse_row_index(
205 proto: &proto::RowIndex,
206 column_index: usize,
207 rows_per_group: usize,
208) -> Result<RowGroupIndex> {
209 use crate::statistics::ColumnStatistics;
210
211 let entries: Result<Vec<RowGroupEntry>> = proto
212 .entry
213 .iter()
214 .map(|entry| {
215 let statistics = entry
216 .statistics
217 .as_ref()
218 .map(ColumnStatistics::try_from)
219 .transpose()?;
220 Ok(RowGroupEntry::new(statistics, entry.positions.clone()))
221 })
222 .collect();
223
224 Ok(RowGroupIndex::new(entries?, rows_per_group, column_index))
225}
226
227pub fn parse_stripe_row_indexes(
236 stripe_stream_map: &crate::stripe::StreamMap,
237 columns: &[crate::column::Column],
238 total_rows: usize,
239 rows_per_group: usize,
240) -> Result<StripeRowIndex> {
241 use crate::error::{DecodeProtoSnafu, IoSnafu};
242 use crate::proto::stream::Kind;
243 use prost::Message;
244 use snafu::ResultExt;
245
246 let mut row_indexes = HashMap::new();
247
248 for column in columns {
249 let column_id = column.column_id();
250
251 let row_index_stream = stripe_stream_map.get_opt(column, Kind::RowIndex);
253
254 if let Some(mut decompressor) = row_index_stream {
255 let mut buffer = Vec::new();
257 std::io::Read::read_to_end(&mut decompressor, &mut buffer).context(IoSnafu)?;
258
259 let proto_row_index =
261 proto::RowIndex::decode(buffer.as_slice()).context(DecodeProtoSnafu)?;
262
263 let row_group_index =
265 parse_row_index(&proto_row_index, column_id as usize, rows_per_group)?;
266 row_indexes.insert(column_id as usize, row_group_index);
267 }
268 }
269
270 let bloom_filters = parse_bloom_filters(stripe_stream_map, columns)?;
272 for (column_id, filters) in bloom_filters {
273 if let Some(row_group_index) = row_indexes.get_mut(&column_id) {
274 let entry_count = row_group_index.num_row_groups();
275 assert_eq!(
276 entry_count,
277 filters.len(),
278 "Bloom filter count mismatch: expected {} but got {} for column {}",
279 entry_count,
280 filters.len(),
281 column_id
282 );
283 for (entry, bloom) in row_group_index.entries_mut().zip(filters.into_iter()) {
284 entry.bloom_filter = Some(bloom);
285 }
286 }
287 }
288
289 Ok(StripeRowIndex::new(row_indexes, total_rows, rows_per_group))
290}
291
292fn parse_bloom_filters(
294 stripe_stream_map: &crate::stripe::StreamMap,
295 columns: &[crate::column::Column],
296) -> Result<HashMap<usize, Vec<BloomFilter>>> {
297 use crate::error::{DecodeProtoSnafu, IoSnafu};
298 use crate::proto::stream::Kind;
299 use prost::Message;
300 use snafu::ResultExt;
301
302 let mut bloom_indexes = HashMap::new();
303
304 for column in columns {
305 let column_id = column.column_id();
306
307 let bloom_stream = stripe_stream_map
308 .get_opt(column, Kind::BloomFilter)
309 .or_else(|| stripe_stream_map.get_opt(column, Kind::BloomFilterUtf8));
310
311 if let Some(mut decompressor) = bloom_stream {
312 let mut buffer = Vec::new();
313 std::io::Read::read_to_end(&mut decompressor, &mut buffer).context(IoSnafu)?;
314
315 let proto_bloom_index =
316 proto::BloomFilterIndex::decode(buffer.as_slice()).context(DecodeProtoSnafu)?;
317
318 let filters: Vec<BloomFilter> = proto_bloom_index
319 .bloom_filter
320 .iter()
321 .filter_map(BloomFilter::try_from_proto)
322 .collect();
323
324 bloom_indexes.insert(column_id as usize, filters);
325 }
326 }
327
328 Ok(bloom_indexes)
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// A column index reports the entry count, group size and column index it
    /// was built with.
    #[test]
    fn test_row_group_index() {
        let index = RowGroupIndex::new(
            vec![
                RowGroupEntry::new(None, vec![1, 2, 3]),
                RowGroupEntry::new(None, vec![4, 5, 6]),
            ],
            10000,
            0,
        );

        assert_eq!(index.num_row_groups(), 2);
        assert_eq!(index.rows_per_group(), 10000);
        assert_eq!(index.column_index(), 0);
    }

    /// The stripe-level row group count is derived from the row totals, not
    /// from the number of entries stored per column.
    #[test]
    fn test_stripe_row_index() {
        let single_entry = vec![RowGroupEntry::new(None, vec![])];
        let columns = HashMap::from([(0, RowGroupIndex::new(single_entry, 10000, 0))]);

        let stripe_index = StripeRowIndex::new(columns, 50000, 10000);

        assert_eq!(stripe_index.num_row_groups(), 5);
        assert_eq!(stripe_index.total_rows(), 50000);
        assert_eq!(stripe_index.rows_per_group(), 10000);
    }
}