use crate::storage::index::tid_bitmap::TidBitmap;
/// Source of rows for a bitmap scan, addressed by page and in-page position.
///
/// The scan functions in this file call [`RowFetcher::fetch`] once for every
/// candidate they visit.
pub trait RowFetcher {
    /// Value produced for a candidate that resolves to a row.
    type Row;

    /// Failure reported by [`RowFetcher::fetch`].
    type Error;

    /// Looks up the row at position `row_within_page` on `page`.
    ///
    /// `Ok(None)` means there is nothing to return for that position; the scans
    /// in this file skip such candidates instead of treating them as failures.
    ///
    /// # Errors
    ///
    /// Implementations return `Err` when the lookup itself fails.
    fn fetch(&self, page: u32, row_within_page: u32) -> Result<Option<Self::Row>, Self::Error>;
}
/// Eagerly fetches every row referenced by `bitmap` and returns them in a `Vec`.
///
/// Candidates are visited in the order produced by
/// `bitmap.group_by_page(rows_per_page)`: one page group at a time, and within
/// a group in the order its rows are listed. Candidates for which the fetcher
/// answers `Ok(None)` are left out of the result.
///
/// # Errors
///
/// Returns the first error reported by `fetcher`; rows collected before the
/// failure are dropped.
pub fn execute_bitmap_scan<F: RowFetcher>(
    bitmap: &TidBitmap,
    fetcher: &F,
    rows_per_page: u32,
) -> Result<Vec<F::Row>, F::Error> {
    let page_groups = bitmap.group_by_page(rows_per_page);

    // Reserve room for the case where every candidate resolves to a row.
    let mut fetched: Vec<F::Row> = Vec::with_capacity(bitmap.len() as usize);

    for (page, slots) in page_groups {
        for slot in slots {
            // Extending with an `Option` appends its value when present and is
            // a no-op for `None`.
            fetched.extend(fetcher.fetch(page, slot)?);
        }
    }

    Ok(fetched)
}
/// Counters describing one bitmap scan.
#[derive(Debug, Clone, Default)]
pub struct BitmapScanStats {
    /// Number of candidate TIDs the scan started from.
    pub candidate_tids: u64,
    /// Number of candidates that actually produced a row.
    pub rows_returned: u64,
    /// Number of page groups the candidates were spread over.
    pub pages_touched: u64,
}

impl BitmapScanStats {
    /// Fraction of candidates that produced a row (`rows_returned / candidate_tids`).
    ///
    /// Returns `0.0` when there were no candidates, so the division by zero is
    /// never performed.
    pub fn survival_ratio(&self) -> f64 {
        match self.candidate_tids {
            0 => 0.0,
            candidates => self.rows_returned as f64 / candidates as f64,
        }
    }
}
/// Eager bitmap scan that also reports [`BitmapScanStats`] for the run.
///
/// Rows are fetched exactly as in `execute_bitmap_scan`. The returned stats hold
/// `bitmap.len()` as `candidate_tids`, the number of page groups produced by
/// `bitmap.group_by_page(rows_per_page)` as `pages_touched`, and the number of
/// rows in the returned `Vec` as `rows_returned`.
///
/// # Errors
///
/// Returns the first error reported by `fetcher`; no rows or stats are
/// returned in that case.
pub fn execute_bitmap_scan_with_stats<F: RowFetcher>(
    bitmap: &TidBitmap,
    fetcher: &F,
    rows_per_page: u32,
) -> Result<(Vec<F::Row>, BitmapScanStats), F::Error> {
    let page_groups = bitmap.group_by_page(rows_per_page);
    let candidate_tids = bitmap.len();
    let pages_touched = page_groups.len() as u64;

    let mut fetched: Vec<F::Row> = Vec::with_capacity(candidate_tids as usize);
    for (page, slots) in page_groups {
        for slot in slots {
            let maybe_row = fetcher.fetch(page, slot)?;
            if let Some(live) = maybe_row {
                fetched.push(live);
            }
        }
    }

    // Every returned row was pushed exactly once, so the output length is the
    // returned-row count.
    let stats = BitmapScanStats {
        candidate_tids,
        rows_returned: fetched.len() as u64,
        pages_touched,
    };
    Ok((fetched, stats))
}
/// Merges several bitmaps into one and runs an eager scan over the result.
///
/// `BitmapCombine::And` merges with `tid_bitmap::intersect_all`, and
/// `BitmapCombine::Or` merges with `tid_bitmap::union_all`. The merged bitmap is
/// then handed to `execute_bitmap_scan` together with `fetcher` and
/// `rows_per_page`.
///
/// # Errors
///
/// Propagates whatever error `execute_bitmap_scan` returns.
pub fn execute_combined_bitmap_scan<F: RowFetcher>(
    bitmaps: Vec<TidBitmap>,
    combine: BitmapCombine,
    fetcher: &F,
    rows_per_page: u32,
) -> Result<Vec<F::Row>, F::Error> {
    let combined = match combine {
        BitmapCombine::Or => crate::storage::index::tid_bitmap::union_all(bitmaps),
        BitmapCombine::And => crate::storage::index::tid_bitmap::intersect_all(bitmaps),
    };

    execute_bitmap_scan(&combined, fetcher, rows_per_page)
}
/// How `execute_combined_bitmap_scan` merges its input bitmaps before scanning.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BitmapCombine {
    /// Merge with `tid_bitmap::intersect_all`.
    And,
    /// Merge with `tid_bitmap::union_all`.
    Or,
}
/// Lazy bitmap-scan iterator returned by `execute_bitmap_scan_stream`.
///
/// Nothing is fetched until the iterator is advanced, so a value of this type
/// that is dropped without being consumed performs no fetches at all; the
/// `#[must_use]` attribute makes the compiler warn about that mistake.
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct BitmapScanRows<'a, F: RowFetcher> {
    /// Row source queried for every candidate.
    fetcher: &'a F,
    /// Page groups not yet started, as `(page id, rows within that page)`.
    groups: std::vec::IntoIter<(u32, Vec<u32>)>,
    /// Page whose rows are currently being drained from `current_rows`.
    current_page: u32,
    /// Rows of `current_page` that have not been fetched yet.
    current_rows: std::vec::IntoIter<u32>,
    /// Candidate count recorded when the iterator was built.
    candidate_tids: u64,
    /// Number of rows yielded as `Ok` so far.
    rows_returned: u64,
    /// Page-group count recorded when the iterator was built.
    pages_touched: u64,
}
impl<'a, F: RowFetcher> BitmapScanRows<'a, F> {
pub fn stats(&self) -> BitmapScanStats {
BitmapScanStats {
candidate_tids: self.candidate_tids,
rows_returned: self.rows_returned,
pages_touched: self.pages_touched,
}
}
}
impl<'a, F: RowFetcher> Iterator for BitmapScanRows<'a, F> {
    type Item = Result<F::Row, F::Error>;

    /// Produces the next fetched row, or the next fetch error.
    ///
    /// Candidates for which the fetcher answers `Ok(None)` are skipped silently.
    /// A fetch error is yielded as `Some(Err(_))` and does not end the scan:
    /// the failing candidate is consumed, and a later call carries on with the
    /// candidate after it. `None` is returned once every page group has been
    /// drained.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(slot) = self.current_rows.next() {
                // `transpose` maps Ok(Some(r)) -> Some(Ok(r)), Err(e) -> Some(Err(e))
                // and Ok(None) -> None, i.e. "nothing to yield for this candidate".
                let outcome = self.fetcher.fetch(self.current_page, slot).transpose();
                if let Some(item) = outcome {
                    if item.is_ok() {
                        self.rows_returned += 1;
                    }
                    return Some(item);
                }
            } else {
                // Current page is exhausted: move on to the next group, or stop
                // when there is none left.
                let (page_id, rows_within) = self.groups.next()?;
                self.current_page = page_id;
                self.current_rows = rows_within.into_iter();
            }
        }
    }
}
/// Builds a lazy, row-at-a-time bitmap scan.
///
/// The page grouping (`bitmap.group_by_page(rows_per_page)`) is computed here,
/// up front, but `fetcher` is not called until the returned iterator is
/// advanced. The iterator's `candidate_tids` is `bitmap.len()` and its
/// `pages_touched` is the number of page groups, both fixed at this point.
pub fn execute_bitmap_scan_stream<'a, F: RowFetcher>(
    bitmap: &TidBitmap,
    fetcher: &'a F,
    rows_per_page: u32,
) -> BitmapScanRows<'a, F> {
    let page_groups = bitmap.group_by_page(rows_per_page);
    let total_pages = page_groups.len() as u64;

    // Start with an already-exhausted row iterator so that the first `next()`
    // call immediately pulls the first page group.
    let no_rows_yet: Vec<u32> = Vec::new();

    BitmapScanRows {
        fetcher,
        groups: page_groups.into_iter(),
        current_page: 0,
        current_rows: no_rows_yet.into_iter(),
        candidate_tids: bitmap.len(),
        rows_returned: 0,
        pages_touched: total_pages,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Fetcher backed by an in-memory map keyed by `(page, row)`.
    ///
    /// A missing key and a stored `None` both come back as `Ok(None)`.
    struct MapFetcher {
        rows: HashMap<(u32, u32), Option<u64>>,
    }

    impl RowFetcher for MapFetcher {
        type Row = u64;
        type Error = ();

        fn fetch(&self, page: u32, row: u32) -> Result<Option<u64>, ()> {
            let stored = self.rows.get(&(page, row)).and_then(|slot| *slot);
            Ok(stored)
        }
    }

    /// Fetcher that fails at exactly one `(page, row)` and succeeds everywhere else.
    struct ErrFetcher {
        fail_at: (u32, u32),
    }

    impl RowFetcher for ErrFetcher {
        type Row = u64;
        type Error = &'static str;

        fn fetch(&self, page: u32, row: u32) -> Result<Option<u64>, &'static str> {
            if (page, row) == self.fail_at {
                return Err("fetch failed");
            }
            Ok(Some(u64::from(page) * 1000 + u64::from(row)))
        }
    }

    /// Builds a bitmap containing exactly `tids`.
    fn bitmap_with(tids: &[u32]) -> TidBitmap {
        let mut bitmap = TidBitmap::new();
        for tid in tids.iter().copied() {
            bitmap.insert(tid).unwrap();
        }
        bitmap
    }

    #[test]
    fn bitmap_stream_matches_eager_scan() {
        let rows_per_page = 128u32;
        let tids: Vec<u32> = vec![0, 1, 5, 130, 131, 400];
        let bitmap = bitmap_with(&tids);

        // Every TID resolves to a row whose value is the TID itself.
        let rows: HashMap<(u32, u32), Option<u64>> = tids
            .iter()
            .map(|&tid| {
                let key = (tid / rows_per_page, tid % rows_per_page);
                (key, Some(u64::from(tid)))
            })
            .collect();
        let fetcher = MapFetcher { rows };

        let eager = execute_bitmap_scan(&bitmap, &fetcher, rows_per_page).unwrap();
        let streamed: Vec<u64> = execute_bitmap_scan_stream(&bitmap, &fetcher, rows_per_page)
            .map(Result::unwrap)
            .collect();

        assert_eq!(eager, streamed);
    }

    #[test]
    fn bitmap_stream_skips_tombstones_and_tracks_stats() {
        let rows_per_page = 128u32;
        let tids: Vec<u32> = vec![0, 1, 2, 3];
        let bitmap = bitmap_with(&tids);

        // Rows 1 and 3 are present in the bitmap but fetch as `None`.
        let rows: HashMap<(u32, u32), Option<u64>> = vec![
            ((0, 0), Some(10u64)),
            ((0, 1), None),
            ((0, 2), Some(12u64)),
            ((0, 3), None),
        ]
        .into_iter()
        .collect();
        let fetcher = MapFetcher { rows };

        let mut stream = execute_bitmap_scan_stream(&bitmap, &fetcher, rows_per_page);
        let collected: Vec<u64> = stream.by_ref().map(|item| item.unwrap()).collect();
        assert_eq!(collected, vec![10, 12]);

        let stream_stats = stream.stats();
        assert_eq!(stream_stats.candidate_tids, 4);
        assert_eq!(stream_stats.rows_returned, 2);

        // The eager variant must report the same counters.
        let (_eager_rows, eager_stats) =
            execute_bitmap_scan_with_stats(&bitmap, &fetcher, rows_per_page).unwrap();
        assert_eq!(eager_stats.rows_returned, stream_stats.rows_returned);
        assert_eq!(eager_stats.candidate_tids, stream_stats.candidate_tids);
        assert_eq!(eager_stats.pages_touched, stream_stats.pages_touched);
    }

    #[test]
    fn bitmap_stream_surfaces_fetch_errors() {
        let bitmap = bitmap_with(&[0, 1, 2]);
        let fetcher = ErrFetcher { fail_at: (0, 1) };

        let mut stream = execute_bitmap_scan_stream(&bitmap, &fetcher, 128);
        assert_eq!(stream.next(), Some(Ok(0)));
        assert_eq!(stream.next(), Some(Err("fetch failed")));
    }
}