Skip to main content

reddb_server/storage/query/executors/
bitmap_scan.rs

//! Bitmap heap scan — Phase 5 P6 consumer of `TidBitmap`.
//!
//! Implements the second half of the PG bitmap-index-scan
//! pipeline: given a `TidBitmap` produced by AND/OR-ing
//! per-index bitmaps, walk the target heap pages in sorted
//! order and fetch the rows corresponding to set bits.
//!
//! The win over a plain index scan is **sequential page
//! access**: bitmap entries are sorted by TID, so successive
//! fetches go to adjacent pages, giving the OS and buffer
//! pool a prefetch-friendly stream instead of random seeks.
//! For queries touching >1% of a large table the difference
//! is 5-20× on spinning disks and ~2-3× on SSDs.
//!
//! Mirrors PG's `nodeBitmapHeapscan.c` modulo features we
//! don't have:
//!
//! - **Lossy bitmap entries**: PG's tidbitmap can spill to
//!   page-level granularity when memory pressure mounts.
//!   `TidBitmap` doesn't, so the bitmap heap scan here
//!   always processes tuple-level entries.
//! - **Prefetch hints**: PG calls `PrefetchBuffer` for the
//!   next few pages while the current page is being
//!   processed. We rely on the OS readahead for now.
//! - **Parallel bitmap heap scans**: single-producer for now.
//!
//! This module is **not yet wired** into the canonical plan.
//! A `BitmapHeapScan` logical plan node and its executor
//! arm plug into `query_exec/table.rs` when the planner
//! learns to emit bitmap paths.
31
32use crate::storage::index::tid_bitmap::TidBitmap;
33
/// Row-loading callback consumed by the bitmap heap scan.
///
/// The scan itself is agnostic of the row representation: whoever
/// drives it (typically the runtime executor) supplies an
/// implementation matching the target collection's row shape.
pub trait RowFetcher {
    /// Materialised row type handed back to the scan's caller.
    type Row;
    /// Failure type reported when a row cannot be loaded.
    type Error;

    /// Loads the row stored at slot `row_within_page` of heap page
    /// `page`.
    ///
    /// # Returns
    ///
    /// * `Ok(Some(row))` — the slot holds a live row.
    /// * `Ok(None)` — the slot is empty (tombstone / deleted); the
    ///   scan skips it without treating it as an error.
    ///
    /// # Errors
    ///
    /// Any `Err` is passed through to the scan's caller unchanged.
    fn fetch(
        &self,
        page: u32,
        row_within_page: u32,
    ) -> Result<Option<Self::Row>, Self::Error>;
}
46
47/// Execute a bitmap heap scan over `bitmap`, invoking `fetcher`
48/// for each surviving TID in sorted order. Returns the
49/// materialised rows in the same TID order.
50///
51/// `rows_per_page` is the table's fixed row density — the
52/// planner supplies this from schema metadata. For reddb's
53/// default 8 KB pages with ~64-byte rows it's ~128.
54///
55/// Three-phase algorithm:
56///
57/// 1. **Group by page**: `bitmap.group_by_page(rows_per_page)`
58///    returns `(page_id, Vec<row_within_page>)` sorted by
59///    page. This turns the iteration into a sequential-read-
60///    friendly pattern.
61/// 2. **Fetch each page's rows**: for each page group, the
62///    fetcher is called once per target row within that page.
63///    The fetcher is responsible for caching the page's
64///    buffer so repeated fetches within the same page don't
65///    re-read the disk.
66/// 3. **Materialise the output**: rows from all pages flow
67///    into a single result Vec in their natural ascending
68///    TID order.
69pub fn execute_bitmap_scan<F: RowFetcher>(
70    bitmap: &TidBitmap,
71    fetcher: &F,
72    rows_per_page: u32,
73) -> Result<Vec<F::Row>, F::Error> {
74    let groups = bitmap.group_by_page(rows_per_page);
75    // Pre-allocate capacity for the expected output size —
76    // bitmap::len() gives exact row count since the bitmap
77    // is not lossy.
78    let mut out: Vec<F::Row> = Vec::with_capacity(bitmap.len() as usize);
79    for (page_id, rows_within) in groups {
80        for row in rows_within {
81            if let Some(row) = fetcher.fetch(page_id, row)? {
82                out.push(row);
83            }
84        }
85    }
86    Ok(out)
87}
88
/// Counters describing one bitmap heap scan.
///
/// Intended for `EXPLAIN ANALYZE`-style diagnostics and for feeding
/// observed selectivity back into the planner's cost estimates.
#[derive(Debug, Clone, Default)]
pub struct BitmapScanStats {
    /// Number of TIDs present in the bitmap before any fetch ran.
    pub candidate_tids: u64,
    /// Number of rows actually produced, i.e. candidates minus
    /// tombstoned / deleted slots.
    pub rows_returned: u64,
    /// Number of distinct heap pages the scan visited. A good proxy
    /// for physical I/O: n pages × buffer-pool-hit ratio.
    pub pages_touched: u64,
}

impl BitmapScanStats {
    /// Fraction of candidate TIDs that turned out to be live rows.
    ///
    /// A value close to 1.0 means the bitmap was well-pruned; a value
    /// close to 0.0 means the index was stale and VACUUM should run.
    ///
    /// When `candidate_tids` is zero the ratio is reported as `0.0`
    /// rather than dividing by zero, so callers that act on low
    /// ratios may want to check `candidate_tids` first.
    pub fn survival_ratio(&self) -> f64 {
        match self.candidate_tids {
            0 => 0.0,
            total => self.rows_returned as f64 / total as f64,
        }
    }
}
117
118/// Variant of `execute_bitmap_scan` that also fills a
119/// `BitmapScanStats` struct alongside the row output. Used
120/// by `EXPLAIN ANALYZE` paths and by the runtime's
121/// cardinality feedback loop.
122pub fn execute_bitmap_scan_with_stats<F: RowFetcher>(
123    bitmap: &TidBitmap,
124    fetcher: &F,
125    rows_per_page: u32,
126) -> Result<(Vec<F::Row>, BitmapScanStats), F::Error> {
127    let groups = bitmap.group_by_page(rows_per_page);
128    let mut stats = BitmapScanStats {
129        candidate_tids: bitmap.len(),
130        rows_returned: 0,
131        pages_touched: groups.len() as u64,
132    };
133    let mut out: Vec<F::Row> = Vec::with_capacity(bitmap.len() as usize);
134    for (page_id, rows_within) in groups {
135        for row in rows_within {
136            if let Some(row) = fetcher.fetch(page_id, row)? {
137                out.push(row);
138                stats.rows_returned += 1;
139            }
140        }
141    }
142    Ok((out, stats))
143}
144
145/// Phase 3.6 wiring entry point. Combines a list of per-index
146/// bitmaps via the requested boolean op and runs a single heap
147/// fetch over the result. The planner uses this when WHERE has
148/// multi-index AND/OR predicates.
149///
150/// `combine` controls the merge: `BitmapCombine::And` produces
151/// the intersection (rows matching every index), `Or` produces
152/// the union (rows matching any index).
153///
154/// Returns the scan rows in TID-sorted order. Caller can wrap
155/// in `execute_bitmap_scan_with_stats` instead if it wants the
156/// diagnostic counters.
157pub fn execute_combined_bitmap_scan<F: RowFetcher>(
158    bitmaps: Vec<TidBitmap>,
159    combine: BitmapCombine,
160    fetcher: &F,
161    rows_per_page: u32,
162) -> Result<Vec<F::Row>, F::Error> {
163    let merged = match combine {
164        BitmapCombine::And => crate::storage::index::tid_bitmap::intersect_all(bitmaps),
165        BitmapCombine::Or => crate::storage::index::tid_bitmap::union_all(bitmaps),
166    };
167    execute_bitmap_scan(&merged, fetcher, rows_per_page)
168}
169
/// Selects how `execute_combined_bitmap_scan` merges its input
/// bitmaps.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BitmapCombine {
    /// Intersection: keep only TIDs present in every bitmap.
    And,
    /// Union: keep TIDs present in at least one bitmap.
    Or,
}
176
177// ──────── Issue #768 / S9 — pull-based bitmap heap scan ────────
178
179/// Lazily-evaluated, pull-based counterpart to
180/// [`execute_bitmap_scan`].
181///
182/// `execute_bitmap_scan` fetches every surviving row into a single
183/// `Vec<F::Row>` before returning — server memory scales with the
184/// result-set cardinality. `BitmapScanRows` instead holds only the
185/// page-grouped TID metadata (small: one `u32` page id plus the set
186/// row offsets per touched page) and fetches **one row at a time**
187/// on demand, skipping tombstones inline. Driven by the S1
188/// [`ChunkProducer`](crate::server::output_stream::ChunkProducer),
189/// the resident row payload is a single row, not the whole heap
190/// scan.
191///
192/// Order is identical to `execute_bitmap_scan`: rows are produced in
193/// ascending `(page, row_within_page)` TID order, so collecting the
194/// iterator reproduces the eager `Vec` exactly — the S9 parity
195/// contract.
196///
197/// Errors are surfaced as `Some(Err(_))`; the caller decides whether
198/// to abort. After a fetch error the iterator may still be polled,
199/// but well-behaved drivers stop on the first error.
200pub struct BitmapScanRows<'a, F: RowFetcher> {
201    fetcher: &'a F,
202    groups: std::vec::IntoIter<(u32, Vec<u32>)>,
203    current_page: u32,
204    current_rows: std::vec::IntoIter<u32>,
205    candidate_tids: u64,
206    rows_returned: u64,
207    pages_touched: u64,
208}
209
210impl<'a, F: RowFetcher> BitmapScanRows<'a, F> {
211    /// Diagnostic counters accumulated so far. After the iterator is
212    /// fully drained these equal what
213    /// [`execute_bitmap_scan_with_stats`] would have reported.
214    pub fn stats(&self) -> BitmapScanStats {
215        BitmapScanStats {
216            candidate_tids: self.candidate_tids,
217            rows_returned: self.rows_returned,
218            pages_touched: self.pages_touched,
219        }
220    }
221}
222
223impl<'a, F: RowFetcher> Iterator for BitmapScanRows<'a, F> {
224    type Item = Result<F::Row, F::Error>;
225
226    fn next(&mut self) -> Option<Self::Item> {
227        loop {
228            // Drain the current page's row offsets first; advance to
229            // the next page group only when exhausted.
230            match self.current_rows.next() {
231                Some(row) => match self.fetcher.fetch(self.current_page, row) {
232                    Ok(Some(r)) => {
233                        self.rows_returned += 1;
234                        return Some(Ok(r));
235                    }
236                    // Tombstone / deleted slot — skip without yielding.
237                    Ok(None) => continue,
238                    Err(e) => return Some(Err(e)),
239                },
240                None => match self.groups.next() {
241                    Some((page_id, rows_within)) => {
242                        self.current_page = page_id;
243                        self.current_rows = rows_within.into_iter();
244                    }
245                    None => return None,
246                },
247            }
248        }
249    }
250}
251
252/// Construct a pull-based [`BitmapScanRows`] over `bitmap`. The page
253/// grouping (sequential-read-friendly TID ordering) is computed up
254/// front — it is metadata-sized, not row-sized — while the heap
255/// fetches stay lazy.
256pub fn execute_bitmap_scan_stream<'a, F: RowFetcher>(
257    bitmap: &TidBitmap,
258    fetcher: &'a F,
259    rows_per_page: u32,
260) -> BitmapScanRows<'a, F> {
261    let groups = bitmap.group_by_page(rows_per_page);
262    let pages_touched = groups.len() as u64;
263    BitmapScanRows {
264        fetcher,
265        groups: groups.into_iter(),
266        current_page: 0,
267        current_rows: Vec::new().into_iter(),
268        candidate_tids: bitmap.len(),
269        rows_returned: 0,
270        pages_touched,
271    }
272}
273
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// In-memory fetcher backed by a `(page, row) -> Option<u64>`
    /// map. A stored `None` models a tombstone the scan must skip; a
    /// missing key behaves the same way.
    struct MapFetcher {
        rows: HashMap<(u32, u32), Option<u64>>,
    }

    impl RowFetcher for MapFetcher {
        type Row = u64;
        type Error = ();

        fn fetch(&self, page: u32, row: u32) -> Result<Option<u64>, ()> {
            match self.rows.get(&(page, row)) {
                Some(slot) => Ok(*slot),
                None => Ok(None),
            }
        }
    }

    /// Fetcher that fails on exactly one TID, used to exercise the
    /// `Some(Err(_))` path of the streaming iterator.
    struct ErrFetcher {
        fail_at: (u32, u32),
    }

    impl RowFetcher for ErrFetcher {
        type Row = u64;
        type Error = &'static str;

        fn fetch(&self, page: u32, row: u32) -> Result<Option<u64>, &'static str> {
            if (page, row) == self.fail_at {
                return Err("fetch failed");
            }
            Ok(Some(u64::from(page) * 1000 + u64::from(row)))
        }
    }

    /// Builds a bitmap containing exactly the given TIDs.
    fn bitmap_with(tids: &[u32]) -> TidBitmap {
        let mut bitmap = TidBitmap::new();
        for tid in tids.iter().copied() {
            bitmap.insert(tid).unwrap();
        }
        bitmap
    }

    #[test]
    fn bitmap_stream_matches_eager_scan() {
        // Acceptance #3 / #5: the streaming path must agree with the
        // materialising path on a small fixture.
        let rows_per_page = 128u32;
        let tids: Vec<u32> = vec![0, 1, 5, 130, 131, 400];
        let bitmap = bitmap_with(&tids);
        let rows: HashMap<(u32, u32), Option<u64>> = tids
            .iter()
            .map(|&tid| {
                let key = (tid / rows_per_page, tid % rows_per_page);
                (key, Some(u64::from(tid)))
            })
            .collect();
        let fetcher = MapFetcher { rows };

        let eager = execute_bitmap_scan(&bitmap, &fetcher, rows_per_page).unwrap();
        let streamed: Vec<u64> = execute_bitmap_scan_stream(&bitmap, &fetcher, rows_per_page)
            .map(Result::unwrap)
            .collect();
        assert_eq!(eager, streamed);
    }

    #[test]
    fn bitmap_stream_skips_tombstones_and_tracks_stats() {
        let rows_per_page = 128u32;
        let tids: Vec<u32> = vec![0, 1, 2, 3];
        let bitmap = bitmap_with(&tids);
        // Slots 1 and 3 are tombstones.
        let rows: HashMap<(u32, u32), Option<u64>> = vec![
            ((0, 0), Some(10u64)),
            ((0, 1), None),
            ((0, 2), Some(12u64)),
            ((0, 3), None),
        ]
        .into_iter()
        .collect();
        let fetcher = MapFetcher { rows };

        let mut stream = execute_bitmap_scan_stream(&bitmap, &fetcher, rows_per_page);
        let collected: Vec<u64> = stream.by_ref().map(Result::unwrap).collect();
        assert_eq!(collected, vec![10, 12]);

        let stats = stream.stats();
        assert_eq!(stats.candidate_tids, 4);
        assert_eq!(stats.rows_returned, 2);

        // Parity with the eager stats path.
        let (_eager_rows, eager_stats) =
            execute_bitmap_scan_with_stats(&bitmap, &fetcher, rows_per_page).unwrap();
        assert_eq!(eager_stats.rows_returned, stats.rows_returned);
        assert_eq!(eager_stats.candidate_tids, stats.candidate_tids);
        assert_eq!(eager_stats.pages_touched, stats.pages_touched);
    }

    #[test]
    fn bitmap_stream_surfaces_fetch_errors() {
        let bitmap = bitmap_with(&[0, 1, 2]);
        let fetcher = ErrFetcher { fail_at: (0, 1) };
        let mut stream = execute_bitmap_scan_stream(&bitmap, &fetcher, 128);
        // TID (0, 0) is fetched successfully ...
        assert_eq!(stream.next(), Some(Ok(0)));
        // ... and TID (0, 1) surfaces the fetcher's error.
        assert_eq!(stream.next(), Some(Err("fetch failed")));
    }
}