use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use roaring::RoaringBitmap;
/// Converts a [`RowSelection`] into a [`RoaringBitmap`] holding the index of
/// every selected row.
///
/// Selectors are walked in order while tracking a running row offset: a skip
/// selector only advances the offset, a select selector inserts the range
/// `offset..offset + row_count` into the bitmap and then advances the offset.
///
/// Returns `None` when the selection cannot be represented:
/// * `total_rows` is greater than `u32::MAX` (the bitmap stores `u32` values),
/// * a non-empty selected run ends past `total_rows`, or
/// * the running row offset would overflow `u64`.
///
/// Skip selectors are not checked against `total_rows` on their own; a skip
/// that runs past the end is only rejected if a non-empty select follows it.
pub fn row_selection_to_roaring(
    selection: &RowSelection,
    total_rows: u64,
) -> Option<RoaringBitmap> {
    if total_rows > u64::from(u32::MAX) {
        return None;
    }

    let mut bitmap = RoaringBitmap::new();
    let mut offset: u64 = 0;

    for selector in selection.iter() {
        let len = selector.row_count as u64;
        // Checked so that absurdly large row counts are reported as `None`
        // instead of panicking (debug) or wrapping around (release).
        let end = offset.checked_add(len)?;

        if selector.skip {
            offset = end;
            continue;
        }
        if len == 0 {
            continue;
        }
        if end > total_rows {
            return None;
        }

        // Lossless casts: offset <= end <= total_rows <= u32::MAX holds here.
        bitmap.insert_range(offset as u32..end as u32);
        offset = end;
    }

    Some(bitmap)
}
/// Converts a [`RoaringBitmap`] of selected row indices into a [`RowSelection`]
/// spanning `total_rows` rows.
///
/// Each maximal run of consecutive set bits becomes one select selector and
/// each gap between runs (including the leading gap and the tail after the
/// last set bit) becomes one skip selector.
///
/// Bitmap entries at or beyond `total_rows` are ignored, and a run that
/// crosses `total_rows` is cut off there, so the selectors built here never
/// cover more than `total_rows` rows. When `total_rows` is zero no selectors
/// are produced.
pub fn roaring_to_row_selection(bitmap: &RoaringBitmap, total_rows: usize) -> RowSelection {
    let mut selectors = Vec::new();
    let mut cursor = 0usize;
    let mut iter = bitmap.iter().peekable();

    while cursor < total_rows {
        // The bitmap iterates in ascending order, so the peeked value is the
        // start of the next run. Anything at or past `total_rows` is out of
        // range and is treated the same as an exhausted bitmap.
        let next_in_range = iter
            .peek()
            .map(|&value| value as usize)
            .filter(|&start| start < total_rows);

        let Some(run_start) = next_in_range else {
            // No more selected rows in range: skip the remainder.
            selectors.push(RowSelector::skip(total_rows - cursor));
            break;
        };

        if run_start > cursor {
            selectors.push(RowSelector::skip(run_start - cursor));
        }

        // Consume consecutive set bits, stopping at the first gap or at
        // `total_rows`, whichever comes first. The run is never empty because
        // the peeked value equals `run_start` and `run_start < total_rows`.
        let mut run_end = run_start;
        while run_end < total_rows && iter.next_if(|&value| value as usize == run_end).is_some() {
            run_end += 1;
        }

        selectors.push(RowSelector::select(run_end - run_start));
        cursor = run_end;
    }

    RowSelection::from(selectors)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Converting a selection to a bitmap and back must yield the same selectors.
    #[test]
    fn row_selection_round_trip() {
        let selectors = vec![
            RowSelector::skip(3),
            RowSelector::select(2),
            RowSelector::skip(1),
            RowSelector::select(3),
        ];
        let selection = RowSelection::from(selectors);

        let bitmap = row_selection_to_roaring(&selection, 9).expect("should convert");
        let round_trip: Vec<RowSelector> = roaring_to_row_selection(&bitmap, 9).into();
        let original: Vec<RowSelector> = selection.into();

        assert_eq!(original, round_trip);
    }

    /// A row count one past `u32::MAX` cannot be represented and is refused.
    #[test]
    fn roaring_rejects_large_datasets() {
        let too_many_rows = u64::from(u32::MAX) + 1;
        let selection = RowSelection::from(vec![RowSelector::select(1000)]);

        assert!(
            row_selection_to_roaring(&selection, too_many_rows).is_none(),
            "Should return None for datasets exceeding u32::MAX"
        );
    }

    /// A row count of exactly `u32::MAX` is still within the accepted range.
    #[test]
    fn roaring_accepts_max_u32_dataset() {
        let row_limit = u64::from(u32::MAX);
        let selection = RowSelection::from(vec![RowSelector::select(1000)]);

        assert!(
            row_selection_to_roaring(&selection, row_limit).is_some(),
            "Should accept datasets at exactly u32::MAX"
        );
    }

    /// Selecting more rows than `total_rows` allows is reported as `None`.
    #[test]
    fn roaring_rejects_malformed_selection() {
        let selectors = vec![RowSelector::select(3), RowSelector::select(3)];
        let selection = RowSelection::from(selectors);

        assert!(
            row_selection_to_roaring(&selection, 5).is_none(),
            "Should return None for selections that exceed total_rows"
        );
    }
}