1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use parquet_format_async_temp::PageLocation;

use crate::error::Error;

/// A half-open range `[start, start + length)`, stored as its start and its length.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Interval {
    /// Position at which the interval begins
    pub start: usize,
    /// Number of positions the interval spans
    pub length: usize,
}

impl Interval {
    /// Creates a new interval that begins at `start` and spans `length` positions.
    pub fn new(start: usize, length: usize) -> Self {
        Interval { start, length }
    }
}

/// Returns the (row) interval covered by each page, in the order of `locations`.
///
/// Every page except the last spans from its own `first_row_index` up to the
/// `first_row_index` of the following page; the last page spans up to `num_rows`.
/// An empty `locations` yields an empty vector.
///
/// # Errors
/// Returns an error (instead of panicking or wrapping around) when the page index is
/// inconsistent: a `first_row_index` is negative or does not fit in `usize`, the indices
/// decrease from one page to the next, the last page starts beyond `num_rows`, or
/// `num_rows` does not fit in an `i64`.
fn compute_page_row_intervals(
    locations: &[PageLocation],
    num_rows: usize,
) -> Result<Vec<Interval>, Error> {
    let last_location = match locations.last() {
        Some(location) => location,
        None => return Ok(vec![]),
    };

    let mut intervals = Vec::with_capacity(locations.len());

    for pair in locations.windows(2) {
        let first = pair[0].first_row_index;
        let next = pair[1].first_row_index;
        let start = usize::try_from(first)?;
        // Reject a negative `next` up front: once both indices are known to be
        // non-negative the subtraction below cannot overflow, and a decreasing index
        // gives a negative difference that the conversion reports as an error.
        usize::try_from(next)?;
        let length = usize::try_from(next - first)?;
        intervals.push(Interval::new(start, length));
    }

    let last_first_row = last_location.first_row_index;
    let start = usize::try_from(last_first_row)?;
    // Subtract in the signed domain (both operands are non-negative here, so this cannot
    // overflow): a last page starting past `num_rows` gives a negative length, which the
    // conversion turns into an error rather than an unsigned underflow.
    let length = usize::try_from(i64::try_from(num_rows)? - last_first_row)?;
    intervals.push(Interval::new(start, length));

    Ok(intervals)
}

/// Returns the row intervals `(start, len)` of the pages flagged in `selected`
/// (for a given column).
///
/// `selected[i]` says whether the `i`-th page of `locations` is kept. The two are paired
/// positionally, so pairing stops at the shorter of the two.
///
/// # Errors
/// Propagates any error from computing the per-page row intervals.
pub fn compute_rows(
    selected: &[bool],
    locations: &[PageLocation],
    num_rows: usize,
) -> Result<Vec<Interval>, Error> {
    let page_intervals = compute_page_row_intervals(locations, num_rows)?;

    let rows: Vec<Interval> = page_intervals
        .into_iter()
        .zip(selected)
        .filter(|(_, keep)| **keep)
        .map(|(page, _)| page)
        .collect();

    Ok(rows)
}

/// A struct describing a page together with the rows selected from it in a filter
/// pushdown. As built by [`select_pages`], `selected_rows` is empty when none of the
/// requested intervals overlaps the page.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FilteredPage {
    /// Location of the page in the file
    /// ([`select_pages`] fills it from the page location's `offset`)
    pub start: u64,
    /// Length of the page
    /// ([`select_pages`] fills it from the page location's `compressed_page_size`)
    pub length: usize,
    /// rows to select from the page
    /// ([`select_pages`] expresses each interval's start relative to the page's first row)
    pub selected_rows: Vec<Interval>,
    /// Number of rows in the page
    pub num_rows: usize,
}

/// Intersects `probe` with every entry of `intervals`.
///
/// Returns the overlapping parts, in the order of `intervals`, with each start expressed
/// relative to `probe.start`. Entries that do not overlap `probe` are skipped.
fn is_in(probe: Interval, intervals: &[Interval]) -> Vec<Interval> {
    let mut overlaps = Vec::new();

    for candidate in intervals {
        let candidate_end = candidate.start + candidate.length;
        let probe_end = probe.start + probe.length;

        // Disjoint when the probe begins at/after the candidate's end or ends at/before
        // the candidate's start.
        if probe.start >= candidate_end || probe_end <= candidate.start {
            continue;
        }

        let start = probe.start.max(candidate.start);
        let end = probe_end.min(candidate_end);
        overlaps.push(Interval::new(start - probe.start, end - start));
    }

    overlaps
}

/// Given a set of selected [`Interval`]s of rows and the set of [`PageLocation`], returns
/// a set of [`FilteredPage`] with the same number of items as `locations`.
///
/// Each returned page carries the parts of `intervals` that fall inside it, with starts
/// relative to the page's first row, and the number of rows of the page.
///
/// # Errors
/// Propagates any error from computing the per-page row intervals, and errors if a
/// location's `offset` or `compressed_page_size` cannot be converted to the unsigned
/// field types of [`FilteredPage`].
pub fn select_pages(
    intervals: &[Interval],
    locations: &[PageLocation],
    num_rows: usize,
) -> Result<Vec<FilteredPage>, Error> {
    let page_intervals = compute_page_row_intervals(locations, num_rows)?;

    let mut pages = Vec::with_capacity(locations.len());
    for (page_rows, location) in page_intervals.into_iter().zip(locations) {
        let selected_rows = is_in(page_rows, intervals);
        let start = u64::try_from(location.offset)?;
        let length = usize::try_from(location.compressed_page_size)?;
        pages.push(FilteredPage {
            start,
            length,
            selected_rows,
            num_rows: page_rows.length,
        });
    }

    Ok(pages)
}