reddb_server/storage/query/executors/bitmap_scan.rs
1//! Bitmap heap scan — Fase 5 P6 consumer of `TidBitmap`.
2//!
3//! Implements the second half of the PG bitmap-index-scan
4//! pipeline: given a `TidBitmap` produced by AND/OR-ing
5//! per-index bitmaps, walk the target heap pages in sorted
6//! order and fetch the rows corresponding to set bits.
7//!
8//! The win over a plain index scan is **sequential page
9//! access**: bitmap entries are sorted by TID, so successive
10//! fetches go to adjacent pages, giving the OS and buffer
11//! pool a prefetch-friendly stream instead of random seeks.
12//! For queries touching >1% of a large table the difference
13//! is 5-20× on spinning disks and ~2-3× on SSDs.
14//!
15//! Mirrors PG's `nodeBitmapHeapscan.c` modulo features we
16//! don't have:
17//!
18//! - **Lossy bitmap entries**: PG's tidbitmap can spill to
19//! page-level granularity when memory pressure mounts.
20//! `TidBitmap` doesn't, so the bitmap heap scan here
21//! always processes tuple-level entries.
22//! - **Prefetch hints**: PG calls `PrefetchBuffer` for the
23//! next few pages while the current page is being
24//! processed. We rely on the OS readahead for now.
25//! - **Parallel bitmap heap scans**: single-producer for now.
26//!
27//! This module is **not yet wired** into the canonical plan.
28//! A `BitmapHeapScan` logical plan node and its executor
29//! arm plug into `query_exec/table.rs` when the planner
30//! learns to emit bitmap paths.
31
32use crate::storage::index::tid_bitmap::TidBitmap;
33
/// Row-loading hook handed to the bitmap heap scan.
///
/// The scan only deals in TIDs; turning a TID into a row is delegated
/// to the caller (typically the runtime executor), because the row
/// shape depends on the target collection.
pub trait RowFetcher {
    /// Row representation produced by a successful fetch.
    type Row;
    /// Failure type reported when a fetch cannot be completed.
    type Error;

    /// Loads the row stored in slot `row_within_page` of heap page `page`.
    ///
    /// `Ok(None)` signals an empty slot (tombstone / deleted row); the
    /// scan skips such slots instead of treating them as errors.
    fn fetch(&self, page: u32, row_within_page: u32) -> Result<Option<Self::Row>, Self::Error>;
}
46
47/// Execute a bitmap heap scan over `bitmap`, invoking `fetcher`
48/// for each surviving TID in sorted order. Returns the
49/// materialised rows in the same TID order.
50///
51/// `rows_per_page` is the table's fixed row density — the
52/// planner supplies this from schema metadata. For reddb's
53/// default 8 KB pages with ~64-byte rows it's ~128.
54///
55/// Three-phase algorithm:
56///
57/// 1. **Group by page**: `bitmap.group_by_page(rows_per_page)`
58/// returns `(page_id, Vec<row_within_page>)` sorted by
59/// page. This turns the iteration into a sequential-read-
60/// friendly pattern.
61/// 2. **Fetch each page's rows**: for each page group, the
62/// fetcher is called once per target row within that page.
63/// The fetcher is responsible for caching the page's
64/// buffer so repeated fetches within the same page don't
65/// re-read the disk.
66/// 3. **Materialise the output**: rows from all pages flow
67/// into a single result Vec in their natural ascending
68/// TID order.
69pub fn execute_bitmap_scan<F: RowFetcher>(
70 bitmap: &TidBitmap,
71 fetcher: &F,
72 rows_per_page: u32,
73) -> Result<Vec<F::Row>, F::Error> {
74 let groups = bitmap.group_by_page(rows_per_page);
75 // Pre-allocate capacity for the expected output size —
76 // bitmap::len() gives exact row count since the bitmap
77 // is not lossy.
78 let mut out: Vec<F::Row> = Vec::with_capacity(bitmap.len() as usize);
79 for (page_id, rows_within) in groups {
80 for row in rows_within {
81 if let Some(row) = fetcher.fetch(page_id, row)? {
82 out.push(row);
83 }
84 }
85 }
86 Ok(out)
87}
88
/// Diagnostic counters reported next to a bitmap scan's row output.
///
/// Intended for `EXPLAIN ANALYZE`-style diagnostics and for the
/// planner's feedback loop, which adjusts cost estimates based on the
/// selectivity actually observed.
#[derive(Debug, Clone, Default)]
pub struct BitmapScanStats {
    /// Number of candidate TIDs present in the bitmap before any
    /// fetch was attempted.
    pub candidate_tids: u64,
    /// Number of rows actually returned (candidates minus tombstones).
    pub rows_returned: u64,
    /// Number of distinct pages touched by the scan. A good proxy for
    /// physical I/O: n pages × buffer-pool-hit ratio.
    pub pages_touched: u64,
}

impl BitmapScanStats {
    /// Fraction of candidate TIDs that survived the tombstone check.
    ///
    /// Values near 1.0 mean the bitmap is well-pruned; values near 0.0
    /// mean the index was stale and VACUUM should run. A scan with no
    /// candidates at all reports 0.0 rather than dividing by zero.
    pub fn survival_ratio(&self) -> f64 {
        match self.candidate_tids {
            0 => 0.0,
            total => self.rows_returned as f64 / total as f64,
        }
    }
}
117
118/// Variant of `execute_bitmap_scan` that also fills a
119/// `BitmapScanStats` struct alongside the row output. Used
120/// by `EXPLAIN ANALYZE` paths and by the runtime's
121/// cardinality feedback loop.
122pub fn execute_bitmap_scan_with_stats<F: RowFetcher>(
123 bitmap: &TidBitmap,
124 fetcher: &F,
125 rows_per_page: u32,
126) -> Result<(Vec<F::Row>, BitmapScanStats), F::Error> {
127 let groups = bitmap.group_by_page(rows_per_page);
128 let mut stats = BitmapScanStats {
129 candidate_tids: bitmap.len(),
130 rows_returned: 0,
131 pages_touched: groups.len() as u64,
132 };
133 let mut out: Vec<F::Row> = Vec::with_capacity(bitmap.len() as usize);
134 for (page_id, rows_within) in groups {
135 for row in rows_within {
136 if let Some(row) = fetcher.fetch(page_id, row)? {
137 out.push(row);
138 stats.rows_returned += 1;
139 }
140 }
141 }
142 Ok((out, stats))
143}
144
145/// Phase 3.6 wiring entry point. Combines a list of per-index
146/// bitmaps via the requested boolean op and runs a single heap
147/// fetch over the result. The planner uses this when WHERE has
148/// multi-index AND/OR predicates.
149///
150/// `combine` controls the merge: `BitmapCombine::And` produces
151/// the intersection (rows matching every index), `Or` produces
152/// the union (rows matching any index).
153///
154/// Returns the scan rows in TID-sorted order. Caller can wrap
155/// in `execute_bitmap_scan_with_stats` instead if it wants the
156/// diagnostic counters.
157pub fn execute_combined_bitmap_scan<F: RowFetcher>(
158 bitmaps: Vec<TidBitmap>,
159 combine: BitmapCombine,
160 fetcher: &F,
161 rows_per_page: u32,
162) -> Result<Vec<F::Row>, F::Error> {
163 let merged = match combine {
164 BitmapCombine::And => crate::storage::index::tid_bitmap::intersect_all(bitmaps),
165 BitmapCombine::Or => crate::storage::index::tid_bitmap::union_all(bitmaps),
166 };
167 execute_bitmap_scan(&merged, fetcher, rows_per_page)
168}
169
/// Boolean operator that `execute_combined_bitmap_scan` applies when
/// merging per-index bitmaps.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BitmapCombine {
    /// Intersection: keep only TIDs present in every input bitmap.
    And,
    /// Union: keep TIDs present in any input bitmap.
    Or,
}
176
177// ──────── Issue #768 / S9 — pull-based bitmap heap scan ────────
178
179/// Lazily-evaluated, pull-based counterpart to
180/// [`execute_bitmap_scan`].
181///
182/// `execute_bitmap_scan` fetches every surviving row into a single
183/// `Vec<F::Row>` before returning — server memory scales with the
184/// result-set cardinality. `BitmapScanRows` instead holds only the
185/// page-grouped TID metadata (small: one `u32` page id plus the set
186/// row offsets per touched page) and fetches **one row at a time**
187/// on demand, skipping tombstones inline. Driven by the S1
188/// [`ChunkProducer`](crate::server::output_stream::ChunkProducer),
189/// the resident row payload is a single row, not the whole heap
190/// scan.
191///
192/// Order is identical to `execute_bitmap_scan`: rows are produced in
193/// ascending `(page, row_within_page)` TID order, so collecting the
194/// iterator reproduces the eager `Vec` exactly — the S9 parity
195/// contract.
196///
197/// Errors are surfaced as `Some(Err(_))`; the caller decides whether
198/// to abort. After a fetch error the iterator may still be polled,
199/// but well-behaved drivers stop on the first error.
200pub struct BitmapScanRows<'a, F: RowFetcher> {
201 fetcher: &'a F,
202 groups: std::vec::IntoIter<(u32, Vec<u32>)>,
203 current_page: u32,
204 current_rows: std::vec::IntoIter<u32>,
205 candidate_tids: u64,
206 rows_returned: u64,
207 pages_touched: u64,
208}
209
210impl<'a, F: RowFetcher> BitmapScanRows<'a, F> {
211 /// Diagnostic counters accumulated so far. After the iterator is
212 /// fully drained these equal what
213 /// [`execute_bitmap_scan_with_stats`] would have reported.
214 pub fn stats(&self) -> BitmapScanStats {
215 BitmapScanStats {
216 candidate_tids: self.candidate_tids,
217 rows_returned: self.rows_returned,
218 pages_touched: self.pages_touched,
219 }
220 }
221}
222
223impl<'a, F: RowFetcher> Iterator for BitmapScanRows<'a, F> {
224 type Item = Result<F::Row, F::Error>;
225
226 fn next(&mut self) -> Option<Self::Item> {
227 loop {
228 // Drain the current page's row offsets first; advance to
229 // the next page group only when exhausted.
230 match self.current_rows.next() {
231 Some(row) => match self.fetcher.fetch(self.current_page, row) {
232 Ok(Some(r)) => {
233 self.rows_returned += 1;
234 return Some(Ok(r));
235 }
236 // Tombstone / deleted slot — skip without yielding.
237 Ok(None) => continue,
238 Err(e) => return Some(Err(e)),
239 },
240 None => match self.groups.next() {
241 Some((page_id, rows_within)) => {
242 self.current_page = page_id;
243 self.current_rows = rows_within.into_iter();
244 }
245 None => return None,
246 },
247 }
248 }
249 }
250}
251
252/// Construct a pull-based [`BitmapScanRows`] over `bitmap`. The page
253/// grouping (sequential-read-friendly TID ordering) is computed up
254/// front — it is metadata-sized, not row-sized — while the heap
255/// fetches stay lazy.
256pub fn execute_bitmap_scan_stream<'a, F: RowFetcher>(
257 bitmap: &TidBitmap,
258 fetcher: &'a F,
259 rows_per_page: u32,
260) -> BitmapScanRows<'a, F> {
261 let groups = bitmap.group_by_page(rows_per_page);
262 let pages_touched = groups.len() as u64;
263 BitmapScanRows {
264 fetcher,
265 groups: groups.into_iter(),
266 current_page: 0,
267 current_rows: Vec::new().into_iter(),
268 candidate_tids: bitmap.len(),
269 rows_returned: 0,
270 pages_touched,
271 }
272}
273
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Fetcher backed by an in-memory `(page, row) -> Option<u64>` map.
    /// A stored `None` (or a missing key) stands for a tombstone that
    /// the scan has to skip.
    struct MapFetcher {
        rows: HashMap<(u32, u32), Option<u64>>,
    }

    impl RowFetcher for MapFetcher {
        type Row = u64;
        type Error = ();
        fn fetch(&self, page: u32, row: u32) -> Result<Option<u64>, ()> {
            match self.rows.get(&(page, row)) {
                Some(slot) => Ok(*slot),
                None => Ok(None),
            }
        }
    }

    /// Fetcher that fails on exactly one TID and synthesises a row for
    /// every other one; used to drive the `Some(Err(_))` path.
    struct ErrFetcher {
        fail_at: (u32, u32),
    }

    impl RowFetcher for ErrFetcher {
        type Row = u64;
        type Error = &'static str;
        fn fetch(&self, page: u32, row: u32) -> Result<Option<u64>, &'static str> {
            if (page, row) != self.fail_at {
                return Ok(Some(u64::from(page) * 1000 + u64::from(row)));
            }
            Err("fetch failed")
        }
    }

    /// Builds a bitmap with exactly the given TIDs set.
    fn bitmap_with(tids: &[u32]) -> TidBitmap {
        let mut bitmap = TidBitmap::new();
        tids.iter().for_each(|&tid| {
            bitmap.insert(tid).unwrap();
        });
        bitmap
    }

    #[test]
    fn bitmap_stream_matches_eager_scan() {
        // Acceptance #3 / #5: the streaming path must reproduce the
        // materialising path on a small fixture.
        let rows_per_page = 128u32;
        let tids: Vec<u32> = vec![0, 1, 5, 130, 131, 400];
        let bitmap = bitmap_with(&tids);
        let rows: HashMap<(u32, u32), Option<u64>> = tids
            .iter()
            .map(|&tid| {
                let key = (tid / rows_per_page, tid % rows_per_page);
                (key, Some(u64::from(tid)))
            })
            .collect();
        let fetcher = MapFetcher { rows };

        let eager = execute_bitmap_scan(&bitmap, &fetcher, rows_per_page).unwrap();
        let streamed: Vec<u64> = execute_bitmap_scan_stream(&bitmap, &fetcher, rows_per_page)
            .map(Result::unwrap)
            .collect();
        assert_eq!(eager, streamed);
    }

    #[test]
    fn bitmap_stream_skips_tombstones_and_tracks_stats() {
        let rows_per_page = 128u32;
        let bitmap = bitmap_with(&[0, 1, 2, 3]);
        // Slots 1 and 3 are tombstones.
        let rows: HashMap<(u32, u32), Option<u64>> = vec![
            ((0, 0), Some(10u64)),
            ((0, 1), None),
            ((0, 2), Some(12u64)),
            ((0, 3), None),
        ]
        .into_iter()
        .collect();
        let fetcher = MapFetcher { rows };

        let mut stream = execute_bitmap_scan_stream(&bitmap, &fetcher, rows_per_page);
        let collected: Vec<u64> = stream.by_ref().map(Result::unwrap).collect();
        assert_eq!(collected, vec![10, 12]);

        let stats = stream.stats();
        assert_eq!(stats.candidate_tids, 4);
        assert_eq!(stats.rows_returned, 2);

        // The drained stream must agree with the eager stats path.
        let (_eager_rows, eager_stats) =
            execute_bitmap_scan_with_stats(&bitmap, &fetcher, rows_per_page).unwrap();
        assert_eq!(eager_stats.rows_returned, stats.rows_returned);
        assert_eq!(eager_stats.candidate_tids, stats.candidate_tids);
        assert_eq!(eager_stats.pages_touched, stats.pages_touched);
    }

    #[test]
    fn bitmap_stream_surfaces_fetch_errors() {
        let bitmap = bitmap_with(&[0, 1, 2]);
        let fetcher = ErrFetcher { fail_at: (0, 1) };
        let mut stream = execute_bitmap_scan_stream(&bitmap, &fetcher, 128);

        // TID (0, 0) is healthy and yields its synthesised row.
        assert_eq!(stream.next(), Some(Ok(0)));
        // TID (0, 1) is the configured failure point.
        assert_eq!(stream.next(), Some(Err("fetch failed")));
    }
}