1use std::collections::HashMap;
25
26use crate::error::Result;
27use crate::proto;
28use crate::statistics::ColumnStatistics;
29
30#[derive(Debug, Clone)]
36pub struct RowGroupEntry {
37 pub statistics: Option<ColumnStatistics>,
39
40 pub positions: Vec<u64>,
51}
52
53impl RowGroupEntry {
54 pub fn new(statistics: Option<ColumnStatistics>, positions: Vec<u64>) -> Self {
55 Self {
56 statistics,
57 positions,
58 }
59 }
60}
61
62#[derive(Debug, Clone)]
67pub struct RowGroupIndex {
68 entries: Vec<RowGroupEntry>,
70
71 rows_per_group: usize,
73
74 column_index: usize,
76}
77
78impl RowGroupIndex {
79 pub fn new(entries: Vec<RowGroupEntry>, rows_per_group: usize, column_index: usize) -> Self {
80 Self {
81 entries,
82 rows_per_group,
83 column_index,
84 }
85 }
86
87 pub fn num_row_groups(&self) -> usize {
89 self.entries.len()
90 }
91
92 pub fn rows_per_group(&self) -> usize {
94 self.rows_per_group
95 }
96
97 pub fn column_index(&self) -> usize {
99 self.column_index
100 }
101
102 pub fn row_group_stats(&self, row_group_idx: usize) -> Option<&ColumnStatistics> {
104 self.entries
105 .get(row_group_idx)
106 .and_then(|entry| entry.statistics.as_ref())
107 }
108
109 pub fn entries(&self) -> impl Iterator<Item = &RowGroupEntry> {
111 self.entries.iter()
112 }
113
114 pub fn entry(&self, row_group_idx: usize) -> Option<&RowGroupEntry> {
116 self.entries.get(row_group_idx)
117 }
118}
119
120#[derive(Debug, Clone)]
125pub struct StripeRowIndex {
126 columns: HashMap<usize, RowGroupIndex>,
128
129 total_rows: usize,
131
132 rows_per_group: usize,
134}
135
136impl StripeRowIndex {
137 pub fn new(
138 columns: HashMap<usize, RowGroupIndex>,
139 total_rows: usize,
140 rows_per_group: usize,
141 ) -> Self {
142 Self {
143 columns,
144 total_rows,
145 rows_per_group,
146 }
147 }
148
149 pub fn column(&self, column_idx: usize) -> Option<&RowGroupIndex> {
151 self.columns.get(&column_idx)
152 }
153
154 pub fn num_row_groups(&self) -> usize {
156 if self.rows_per_group == 0 {
157 return 0;
158 }
159 self.total_rows.div_ceil(self.rows_per_group)
160 }
161
162 pub fn row_group_stats(
164 &self,
165 column_idx: usize,
166 row_group_idx: usize,
167 ) -> Option<&ColumnStatistics> {
168 self.column(column_idx)
169 .and_then(|col_index| col_index.row_group_stats(row_group_idx))
170 }
171
172 pub fn total_rows(&self) -> usize {
174 self.total_rows
175 }
176
177 pub fn rows_per_group(&self) -> usize {
179 self.rows_per_group
180 }
181
182 pub fn column_indices(&self) -> impl Iterator<Item = usize> + '_ {
184 self.columns.keys().copied()
185 }
186}
187
188fn parse_row_index(
190 proto: &proto::RowIndex,
191 column_index: usize,
192 rows_per_group: usize,
193) -> Result<RowGroupIndex> {
194 use crate::statistics::ColumnStatistics;
195
196 let entries: Result<Vec<RowGroupEntry>> = proto
197 .entry
198 .iter()
199 .map(|entry| {
200 let statistics = entry
201 .statistics
202 .as_ref()
203 .map(ColumnStatistics::try_from)
204 .transpose()?;
205 Ok(RowGroupEntry::new(statistics, entry.positions.clone()))
206 })
207 .collect();
208
209 Ok(RowGroupIndex::new(entries?, rows_per_group, column_index))
210}
211
212pub fn parse_stripe_row_indexes(
221 stripe_stream_map: &crate::stripe::StreamMap,
222 columns: &[crate::column::Column],
223 total_rows: usize,
224 rows_per_group: usize,
225) -> Result<StripeRowIndex> {
226 use crate::error::{DecodeProtoSnafu, IoSnafu};
227 use crate::proto::stream::Kind;
228 use prost::Message;
229 use snafu::ResultExt;
230
231 let mut row_indexes = HashMap::new();
232
233 for column in columns {
234 let column_id = column.column_id();
235
236 let row_index_stream = stripe_stream_map.get_opt(column, Kind::RowIndex);
238
239 if let Some(mut decompressor) = row_index_stream {
240 let mut buffer = Vec::new();
242 std::io::Read::read_to_end(&mut decompressor, &mut buffer).context(IoSnafu)?;
243
244 let proto_row_index =
246 proto::RowIndex::decode(buffer.as_slice()).context(DecodeProtoSnafu)?;
247
248 let row_group_index =
250 parse_row_index(&proto_row_index, column_id as usize, rows_per_group)?;
251 row_indexes.insert(column_id as usize, row_group_index);
252 }
253 }
254
255 Ok(StripeRowIndex::new(row_indexes, total_rows, rows_per_group))
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    /// A column index reports its entry count and echoes the parameters it
    /// was constructed with.
    #[test]
    fn test_row_group_index() {
        let index = RowGroupIndex::new(
            vec![
                RowGroupEntry::new(None, vec![1, 2, 3]),
                RowGroupEntry::new(None, vec![4, 5, 6]),
            ],
            10000,
            0,
        );

        assert_eq!(index.num_row_groups(), 2);
        assert_eq!(index.rows_per_group(), 10000);
        assert_eq!(index.column_index(), 0);
    }

    /// The stripe-level group count comes from the row counts, not from the
    /// number of entries stored per column.
    #[test]
    fn test_stripe_row_index() {
        let single_entry_index =
            RowGroupIndex::new(vec![RowGroupEntry::new(None, vec![])], 10000, 0);
        let columns = HashMap::from([(0, single_entry_index)]);

        let stripe_index = StripeRowIndex::new(columns, 50000, 10000);

        assert_eq!(stripe_index.num_row_groups(), 5);
        assert_eq!(stripe_index.total_rows(), 50000);
        assert_eq!(stripe_index.rows_per_group(), 10000);
    }
}