Skip to main content

orc_rust/
row_index.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Row index structures for ORC files
19//!
20//! This module provides structures for parsing and accessing row-level indexes
21//! from ORC stripes. Row indexes contain statistics for each row group (default
22//! 10,000 rows) enabling efficient predicate pushdown and row group pruning.
23
24use std::collections::HashMap;
25
26use crate::bloom_filter::BloomFilter;
27use crate::error::Result;
28use crate::proto;
29use crate::statistics::ColumnStatistics;
30
31/// A single row group entry in a row index
32///
33/// According to ORC spec, each entry contains:
34/// - Statistics for the row group (min/max/null count)
35/// - Stream positions for seeking to the row group (for future use)
36#[derive(Debug, Clone)]
37pub struct RowGroupEntry {
38    /// Statistics for this row group
39    pub statistics: Option<ColumnStatistics>,
40
41    /// Stream positions for seeking
42    ///
43    /// According to ORC spec, positions encode differently for:
44    /// - Uncompressed: [RLE_run_byte_offset, num_values_to_consume]
45    /// - Compressed: [compression_chunk_start, decompressed_bytes, num_values]
46    ///
47    /// For columns with multiple streams, positions are concatenated:
48    /// [PRESENT_positions..., DATA_positions..., LENGTH_positions...]
49    ///
50    /// Note: Dictionary positions are NOT included (dictionary must be fully read)
51    pub positions: Vec<u64>,
52
53    /// Optional Bloom filter for this row group
54    pub bloom_filter: Option<BloomFilter>,
55}
56
57impl RowGroupEntry {
58    pub fn new(statistics: Option<ColumnStatistics>, positions: Vec<u64>) -> Self {
59        Self {
60            statistics,
61            positions,
62            bloom_filter: None,
63        }
64    }
65
66    pub fn with_bloom_filter(mut self, bloom_filter: Option<BloomFilter>) -> Self {
67        self.bloom_filter = bloom_filter;
68        self
69    }
70}
71
72/// Row index for a single column in a stripe
73///
74/// Only primitive columns have row indexes. Compound types (struct/list/map)
75/// delegate to their child columns.
76#[derive(Debug, Clone)]
77pub struct RowGroupIndex {
78    /// Row group entries, one per row group
79    entries: Vec<RowGroupEntry>,
80
81    /// Number of rows per row group (from row_index_stride, default 10,000)
82    rows_per_group: usize,
83
84    /// Column index this row index belongs to
85    column_index: usize,
86}
87
88impl RowGroupIndex {
89    pub fn new(entries: Vec<RowGroupEntry>, rows_per_group: usize, column_index: usize) -> Self {
90        Self {
91            entries,
92            rows_per_group,
93            column_index,
94        }
95    }
96
97    /// Get the number of row groups in this index
98    pub fn num_row_groups(&self) -> usize {
99        self.entries.len()
100    }
101
102    /// Get the number of rows per row group
103    pub fn rows_per_group(&self) -> usize {
104        self.rows_per_group
105    }
106
107    /// Get the column index this row index belongs to
108    pub fn column_index(&self) -> usize {
109        self.column_index
110    }
111
112    /// Get statistics for a specific row group
113    pub fn row_group_stats(&self, row_group_idx: usize) -> Option<&ColumnStatistics> {
114        self.entries
115            .get(row_group_idx)
116            .and_then(|entry| entry.statistics.as_ref())
117    }
118
119    /// Get an iterator over row group entries
120    pub fn entries(&self) -> impl Iterator<Item = &RowGroupEntry> {
121        self.entries.iter()
122    }
123
124    /// Get a mutable iterator over row group entries
125    pub(crate) fn entries_mut(&mut self) -> impl Iterator<Item = &mut RowGroupEntry> {
126        self.entries.iter_mut()
127    }
128
129    /// Get a specific row group entry
130    pub fn entry(&self, row_group_idx: usize) -> Option<&RowGroupEntry> {
131        self.entries.get(row_group_idx)
132    }
133}
134
135/// Row indexes for all columns in a stripe
136///
137/// This structure provides access to row group statistics for all primitive
138/// columns in a stripe, enabling row group-level filtering.
139#[derive(Debug, Clone)]
140pub struct StripeRowIndex {
141    /// Map from column index to its row group index
142    columns: HashMap<usize, RowGroupIndex>,
143
144    /// Total number of rows in the stripe
145    total_rows: usize,
146
147    /// Number of rows per row group
148    rows_per_group: usize,
149}
150
151impl StripeRowIndex {
152    pub fn new(
153        columns: HashMap<usize, RowGroupIndex>,
154        total_rows: usize,
155        rows_per_group: usize,
156    ) -> Self {
157        Self {
158            columns,
159            total_rows,
160            rows_per_group,
161        }
162    }
163
164    /// Get the row group index for a column
165    pub fn column(&self, column_idx: usize) -> Option<&RowGroupIndex> {
166        self.columns.get(&column_idx)
167    }
168
169    /// Get the number of row groups in this stripe
170    pub fn num_row_groups(&self) -> usize {
171        if self.rows_per_group == 0 {
172            return 0;
173        }
174        self.total_rows.div_ceil(self.rows_per_group)
175    }
176
177    /// Get statistics for a specific row group and column
178    pub fn row_group_stats(
179        &self,
180        column_idx: usize,
181        row_group_idx: usize,
182    ) -> Option<&ColumnStatistics> {
183        self.column(column_idx)
184            .and_then(|col_index| col_index.row_group_stats(row_group_idx))
185    }
186
187    /// Get the total number of rows in this stripe
188    pub fn total_rows(&self) -> usize {
189        self.total_rows
190    }
191
192    /// Get the number of rows per row group
193    pub fn rows_per_group(&self) -> usize {
194        self.rows_per_group
195    }
196
197    /// Get an iterator over all column indices that have row indexes
198    pub fn column_indices(&self) -> impl Iterator<Item = usize> + '_ {
199        self.columns.keys().copied()
200    }
201}
202
203/// Parse a `RowIndex` protobuf message into a `RowGroupIndex`
204fn parse_row_index(
205    proto: &proto::RowIndex,
206    column_index: usize,
207    rows_per_group: usize,
208) -> Result<RowGroupIndex> {
209    use crate::statistics::ColumnStatistics;
210
211    let entries: Result<Vec<RowGroupEntry>> = proto
212        .entry
213        .iter()
214        .map(|entry| {
215            let statistics = entry
216                .statistics
217                .as_ref()
218                .map(ColumnStatistics::try_from)
219                .transpose()?;
220            Ok(RowGroupEntry::new(statistics, entry.positions.clone()))
221        })
222        .collect();
223
224    Ok(RowGroupIndex::new(entries?, rows_per_group, column_index))
225}
226
227/// Parse row indexes from a stripe
228///
229/// According to ORC spec:
230/// - Only primitive columns have row indexes
231/// - Row indexes are stored in ROW_INDEX streams in the index section
232/// - Indexes are only loaded when predicate pushdown is used or seeking
233///
234/// This function parses all ROW_INDEX streams from the stripe's stream map.
235pub fn parse_stripe_row_indexes(
236    stripe_stream_map: &crate::stripe::StreamMap,
237    columns: &[crate::column::Column],
238    total_rows: usize,
239    rows_per_group: usize,
240) -> Result<StripeRowIndex> {
241    use crate::error::{DecodeProtoSnafu, IoSnafu};
242    use crate::proto::stream::Kind;
243    use prost::Message;
244    use snafu::ResultExt;
245
246    let mut row_indexes = HashMap::new();
247
248    for column in columns {
249        let column_id = column.column_id();
250
251        // Try to get ROW_INDEX stream for this column
252        let row_index_stream = stripe_stream_map.get_opt(column, Kind::RowIndex);
253
254        if let Some(mut decompressor) = row_index_stream {
255            // Decompress the stream
256            let mut buffer = Vec::new();
257            std::io::Read::read_to_end(&mut decompressor, &mut buffer).context(IoSnafu)?;
258
259            // Parse the protobuf message
260            let proto_row_index =
261                proto::RowIndex::decode(buffer.as_slice()).context(DecodeProtoSnafu)?;
262
263            // Parse into RowGroupIndex
264            let row_group_index =
265                parse_row_index(&proto_row_index, column_id as usize, rows_per_group)?;
266            row_indexes.insert(column_id as usize, row_group_index);
267        }
268    }
269
270    // Attach bloom filters if present
271    let bloom_filters = parse_bloom_filters(stripe_stream_map, columns)?;
272    for (column_id, filters) in bloom_filters {
273        if let Some(row_group_index) = row_indexes.get_mut(&column_id) {
274            let entry_count = row_group_index.num_row_groups();
275            assert_eq!(
276                entry_count,
277                filters.len(),
278                "Bloom filter count mismatch: expected {} but got {} for column {}",
279                entry_count,
280                filters.len(),
281                column_id
282            );
283            for (entry, bloom) in row_group_index.entries_mut().zip(filters.into_iter()) {
284                entry.bloom_filter = Some(bloom);
285            }
286        }
287    }
288
289    Ok(StripeRowIndex::new(row_indexes, total_rows, rows_per_group))
290}
291
292/// Parse Bloom filter indexes for the provided columns (if present)
293fn parse_bloom_filters(
294    stripe_stream_map: &crate::stripe::StreamMap,
295    columns: &[crate::column::Column],
296) -> Result<HashMap<usize, Vec<BloomFilter>>> {
297    use crate::error::{DecodeProtoSnafu, IoSnafu};
298    use crate::proto::stream::Kind;
299    use prost::Message;
300    use snafu::ResultExt;
301
302    let mut bloom_indexes = HashMap::new();
303
304    for column in columns {
305        let column_id = column.column_id();
306
307        let bloom_stream = stripe_stream_map
308            .get_opt(column, Kind::BloomFilter)
309            .or_else(|| stripe_stream_map.get_opt(column, Kind::BloomFilterUtf8));
310
311        if let Some(mut decompressor) = bloom_stream {
312            let mut buffer = Vec::new();
313            std::io::Read::read_to_end(&mut decompressor, &mut buffer).context(IoSnafu)?;
314
315            let proto_bloom_index =
316                proto::BloomFilterIndex::decode(buffer.as_slice()).context(DecodeProtoSnafu)?;
317
318            let filters: Vec<BloomFilter> = proto_bloom_index
319                .bloom_filter
320                .iter()
321                .filter_map(BloomFilter::try_from_proto)
322                .collect();
323
324            bloom_indexes.insert(column_id as usize, filters);
325        }
326    }
327
328    Ok(bloom_indexes)
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_row_group_index() {
        let entries = vec![
            RowGroupEntry::new(None, vec![1, 2, 3]),
            RowGroupEntry::new(None, vec![4, 5, 6]),
        ];
        let index = RowGroupIndex::new(entries, 10000, 0);

        assert_eq!(index.num_row_groups(), 2);
        assert_eq!(index.rows_per_group(), 10000);
        assert_eq!(index.column_index(), 0);
    }

    #[test]
    fn test_row_group_index_lookups() {
        let entries = vec![
            RowGroupEntry::new(None, vec![1, 2, 3]),
            RowGroupEntry::new(None, vec![4, 5, 6]),
        ];
        let index = RowGroupIndex::new(entries, 10000, 3);

        assert_eq!(index.entry(1).unwrap().positions, vec![4, 5, 6]);
        assert!(index.entry(2).is_none());
        // Entries without statistics and out-of-range groups both yield None.
        assert!(index.row_group_stats(0).is_none());
        assert!(index.row_group_stats(5).is_none());

        let all: Vec<u64> = index
            .entries()
            .flat_map(|entry| entry.positions.iter().copied())
            .collect();
        assert_eq!(all, vec![1, 2, 3, 4, 5, 6]);
    }

    #[test]
    fn test_stripe_row_index() {
        let mut columns = HashMap::new();
        let entries = vec![RowGroupEntry::new(None, vec![])];
        columns.insert(0, RowGroupIndex::new(entries, 10000, 0));

        let stripe_index = StripeRowIndex::new(columns, 50000, 10000);

        assert_eq!(stripe_index.num_row_groups(), 5);
        assert_eq!(stripe_index.total_rows(), 50000);
        assert_eq!(stripe_index.rows_per_group(), 10000);
        assert!(stripe_index.column(0).is_some());
        assert!(stripe_index.column(1).is_none());
        assert_eq!(stripe_index.column_indices().collect::<Vec<_>>(), vec![0]);
        assert!(stripe_index.row_group_stats(0, 0).is_none());
        assert!(stripe_index.row_group_stats(1, 0).is_none());
    }

    #[test]
    fn test_num_row_groups_partial_last_group() {
        let stripe_index = StripeRowIndex::new(HashMap::new(), 25001, 10000);
        assert_eq!(stripe_index.num_row_groups(), 3);
    }

    #[test]
    fn test_num_row_groups_zero_stride() {
        let stripe_index = StripeRowIndex::new(HashMap::new(), 50000, 0);
        assert_eq!(stripe_index.num_row_groups(), 0);
    }

    #[test]
    fn test_num_row_groups_empty_stripe() {
        let stripe_index = StripeRowIndex::new(HashMap::new(), 0, 10000);
        assert_eq!(stripe_index.num_row_groups(), 0);
    }

    #[test]
    fn test_with_bloom_filter_none() {
        let entry = RowGroupEntry::new(None, vec![7]).with_bloom_filter(None);
        assert!(entry.bloom_filter.is_none());
        assert!(entry.statistics.is_none());
        assert_eq!(entry.positions, vec![7]);
    }
}