orc_rust/
row_index.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Row index structures for ORC files
19//!
20//! This module provides structures for parsing and accessing row-level indexes
21//! from ORC stripes. Row indexes contain statistics for each row group (default
22//! 10,000 rows) enabling efficient predicate pushdown and row group pruning.
23
24use std::collections::HashMap;
25
26use crate::error::Result;
27use crate::proto;
28use crate::statistics::ColumnStatistics;
29
30/// A single row group entry in a row index
31///
32/// According to ORC spec, each entry contains:
33/// - Statistics for the row group (min/max/null count)
34/// - Stream positions for seeking to the row group (for future use)
35#[derive(Debug, Clone)]
36pub struct RowGroupEntry {
37    /// Statistics for this row group
38    pub statistics: Option<ColumnStatistics>,
39
40    /// Stream positions for seeking
41    ///
42    /// According to ORC spec, positions encode differently for:
43    /// - Uncompressed: [RLE_run_byte_offset, num_values_to_consume]
44    /// - Compressed: [compression_chunk_start, decompressed_bytes, num_values]
45    ///
46    /// For columns with multiple streams, positions are concatenated:
47    /// [PRESENT_positions..., DATA_positions..., LENGTH_positions...]
48    ///
49    /// Note: Dictionary positions are NOT included (dictionary must be fully read)
50    pub positions: Vec<u64>,
51}
52
53impl RowGroupEntry {
54    pub fn new(statistics: Option<ColumnStatistics>, positions: Vec<u64>) -> Self {
55        Self {
56            statistics,
57            positions,
58        }
59    }
60}
61
62/// Row index for a single column in a stripe
63///
64/// Only primitive columns have row indexes. Compound types (struct/list/map)
65/// delegate to their child columns.
66#[derive(Debug, Clone)]
67pub struct RowGroupIndex {
68    /// Row group entries, one per row group
69    entries: Vec<RowGroupEntry>,
70
71    /// Number of rows per row group (from row_index_stride, default 10,000)
72    rows_per_group: usize,
73
74    /// Column index this row index belongs to
75    column_index: usize,
76}
77
78impl RowGroupIndex {
79    pub fn new(entries: Vec<RowGroupEntry>, rows_per_group: usize, column_index: usize) -> Self {
80        Self {
81            entries,
82            rows_per_group,
83            column_index,
84        }
85    }
86
87    /// Get the number of row groups in this index
88    pub fn num_row_groups(&self) -> usize {
89        self.entries.len()
90    }
91
92    /// Get the number of rows per row group
93    pub fn rows_per_group(&self) -> usize {
94        self.rows_per_group
95    }
96
97    /// Get the column index this row index belongs to
98    pub fn column_index(&self) -> usize {
99        self.column_index
100    }
101
102    /// Get statistics for a specific row group
103    pub fn row_group_stats(&self, row_group_idx: usize) -> Option<&ColumnStatistics> {
104        self.entries
105            .get(row_group_idx)
106            .and_then(|entry| entry.statistics.as_ref())
107    }
108
109    /// Get an iterator over row group entries
110    pub fn entries(&self) -> impl Iterator<Item = &RowGroupEntry> {
111        self.entries.iter()
112    }
113
114    /// Get a specific row group entry
115    pub fn entry(&self, row_group_idx: usize) -> Option<&RowGroupEntry> {
116        self.entries.get(row_group_idx)
117    }
118}
119
120/// Row indexes for all columns in a stripe
121///
122/// This structure provides access to row group statistics for all primitive
123/// columns in a stripe, enabling row group-level filtering.
124#[derive(Debug, Clone)]
125pub struct StripeRowIndex {
126    /// Map from column index to its row group index
127    columns: HashMap<usize, RowGroupIndex>,
128
129    /// Total number of rows in the stripe
130    total_rows: usize,
131
132    /// Number of rows per row group
133    rows_per_group: usize,
134}
135
136impl StripeRowIndex {
137    pub fn new(
138        columns: HashMap<usize, RowGroupIndex>,
139        total_rows: usize,
140        rows_per_group: usize,
141    ) -> Self {
142        Self {
143            columns,
144            total_rows,
145            rows_per_group,
146        }
147    }
148
149    /// Get the row group index for a column
150    pub fn column(&self, column_idx: usize) -> Option<&RowGroupIndex> {
151        self.columns.get(&column_idx)
152    }
153
154    /// Get the number of row groups in this stripe
155    pub fn num_row_groups(&self) -> usize {
156        if self.rows_per_group == 0 {
157            return 0;
158        }
159        self.total_rows.div_ceil(self.rows_per_group)
160    }
161
162    /// Get statistics for a specific row group and column
163    pub fn row_group_stats(
164        &self,
165        column_idx: usize,
166        row_group_idx: usize,
167    ) -> Option<&ColumnStatistics> {
168        self.column(column_idx)
169            .and_then(|col_index| col_index.row_group_stats(row_group_idx))
170    }
171
172    /// Get the total number of rows in this stripe
173    pub fn total_rows(&self) -> usize {
174        self.total_rows
175    }
176
177    /// Get the number of rows per row group
178    pub fn rows_per_group(&self) -> usize {
179        self.rows_per_group
180    }
181
182    /// Get an iterator over all column indices that have row indexes
183    pub fn column_indices(&self) -> impl Iterator<Item = usize> + '_ {
184        self.columns.keys().copied()
185    }
186}
187
188/// Parse a `RowIndex` protobuf message into a `RowGroupIndex`
189fn parse_row_index(
190    proto: &proto::RowIndex,
191    column_index: usize,
192    rows_per_group: usize,
193) -> Result<RowGroupIndex> {
194    use crate::statistics::ColumnStatistics;
195
196    let entries: Result<Vec<RowGroupEntry>> = proto
197        .entry
198        .iter()
199        .map(|entry| {
200            let statistics = entry
201                .statistics
202                .as_ref()
203                .map(ColumnStatistics::try_from)
204                .transpose()?;
205            Ok(RowGroupEntry::new(statistics, entry.positions.clone()))
206        })
207        .collect();
208
209    Ok(RowGroupIndex::new(entries?, rows_per_group, column_index))
210}
211
212/// Parse row indexes from a stripe
213///
214/// According to ORC spec:
215/// - Only primitive columns have row indexes
216/// - Row indexes are stored in ROW_INDEX streams in the index section
217/// - Indexes are only loaded when predicate pushdown is used or seeking
218///
219/// This function parses all ROW_INDEX streams from the stripe's stream map.
220pub fn parse_stripe_row_indexes(
221    stripe_stream_map: &crate::stripe::StreamMap,
222    columns: &[crate::column::Column],
223    total_rows: usize,
224    rows_per_group: usize,
225) -> Result<StripeRowIndex> {
226    use crate::error::{DecodeProtoSnafu, IoSnafu};
227    use crate::proto::stream::Kind;
228    use prost::Message;
229    use snafu::ResultExt;
230
231    let mut row_indexes = HashMap::new();
232
233    for column in columns {
234        let column_id = column.column_id();
235
236        // Try to get ROW_INDEX stream for this column
237        let row_index_stream = stripe_stream_map.get_opt(column, Kind::RowIndex);
238
239        if let Some(mut decompressor) = row_index_stream {
240            // Decompress the stream
241            let mut buffer = Vec::new();
242            std::io::Read::read_to_end(&mut decompressor, &mut buffer).context(IoSnafu)?;
243
244            // Parse the protobuf message
245            let proto_row_index =
246                proto::RowIndex::decode(buffer.as_slice()).context(DecodeProtoSnafu)?;
247
248            // Parse into RowGroupIndex
249            let row_group_index =
250                parse_row_index(&proto_row_index, column_id as usize, rows_per_group)?;
251            row_indexes.insert(column_id as usize, row_group_index);
252        }
253    }
254
255    Ok(StripeRowIndex::new(row_indexes, total_rows, rows_per_group))
256}
257
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_row_group_index() {
        let entries = vec![
            RowGroupEntry::new(None, vec![1, 2, 3]),
            RowGroupEntry::new(None, vec![4, 5, 6]),
        ];
        let index = RowGroupIndex::new(entries, 10000, 0);

        assert_eq!(index.num_row_groups(), 2);
        assert_eq!(index.rows_per_group(), 10000);
        assert_eq!(index.column_index(), 0);
    }

    #[test]
    fn test_row_group_index_lookups() {
        let entries = vec![
            RowGroupEntry::new(None, vec![1, 2, 3]),
            RowGroupEntry::new(None, vec![4, 5, 6]),
        ];
        let index = RowGroupIndex::new(entries, 10000, 3);

        assert_eq!(index.entry(1).unwrap().positions, vec![4, 5, 6]);
        assert!(index.entry(2).is_none());
        // Missing statistics and out-of-range row groups both yield `None`.
        assert!(index.row_group_stats(0).is_none());
        assert!(index.row_group_stats(2).is_none());

        let positions: Vec<Vec<u64>> = index.entries().map(|e| e.positions.clone()).collect();
        assert_eq!(positions, vec![vec![1, 2, 3], vec![4, 5, 6]]);
    }

    #[test]
    fn test_stripe_row_index() {
        let mut columns = HashMap::new();
        let entries = vec![RowGroupEntry::new(None, vec![])];
        columns.insert(0, RowGroupIndex::new(entries, 10000, 0));

        let stripe_index = StripeRowIndex::new(columns, 50000, 10000);

        assert_eq!(stripe_index.num_row_groups(), 5);
        assert_eq!(stripe_index.total_rows(), 50000);
        assert_eq!(stripe_index.rows_per_group(), 10000);
    }

    #[test]
    fn test_stripe_row_index_lookups() {
        let mut columns = HashMap::new();
        let entries = vec![RowGroupEntry::new(None, vec![])];
        columns.insert(2, RowGroupIndex::new(entries, 10000, 2));

        let stripe_index = StripeRowIndex::new(columns, 10000, 10000);

        assert_eq!(
            stripe_index.column(2).map(RowGroupIndex::column_index),
            Some(2)
        );
        assert!(stripe_index.column(0).is_none());
        assert!(stripe_index.row_group_stats(2, 0).is_none());
        assert!(stripe_index.row_group_stats(0, 0).is_none());
        assert_eq!(stripe_index.column_indices().collect::<Vec<_>>(), vec![2]);
    }

    #[test]
    fn test_num_row_groups_edge_cases() {
        // A partially filled last row group still counts as a row group.
        assert_eq!(
            StripeRowIndex::new(HashMap::new(), 25_001, 10_000).num_row_groups(),
            3
        );
        // An empty stripe has no row groups.
        assert_eq!(
            StripeRowIndex::new(HashMap::new(), 0, 10_000).num_row_groups(),
            0
        );
        // A zero stride reports no row groups instead of dividing by zero.
        assert_eq!(
            StripeRowIndex::new(HashMap::new(), 50_000, 0).num_row_groups(),
            0
        );
    }
}