//! Buffer manager: split read/write buffer pools over a `PageStore`.

use dashmap::DashMap;
use kyu_common::{KyuError, KyuResult};

use crate::page_id::{FileId, FrameIdx, PageId, PoolId};
use crate::page_store::PageStore;
use crate::pool::Pool;

/// Buffer manager with split read/write pools.
///
/// Pages are loaded from a `PageStore` into frames. The read pool holds
/// pages for analytical queries (70% default), the write pool holds pages
/// for ingestion/WAL staging (30% default).
///
/// Eviction uses the clock (second-chance) algorithm per pool.
pub struct BufferManager {
    /// Frames reserved for read-mostly (analytical) access.
    read_pool: Pool,
    /// Frames reserved for ingestion / WAL staging.
    write_pool: Pool,
    /// Maps PageId -> (PoolId, FrameIdx) for O(1) lookup of resident pages.
    page_table: DashMap<PageId, (PoolId, FrameIdx)>,
    /// Backing storage that pages are loaded from and flushed to.
    store: Box<dyn PageStore>,
}
22
23impl BufferManager {
24    /// Create a new buffer manager with the given total frame count and read ratio.
25    pub fn new(total_frames: u32, read_ratio: f64, store: Box<dyn PageStore>) -> Self {
26        let read_frames = ((total_frames as f64) * read_ratio).round() as u32;
27        let write_frames = total_frames.saturating_sub(read_frames).max(1);
28        let read_frames = read_frames.max(1);
29
30        Self {
31            read_pool: Pool::new(read_frames),
32            write_pool: Pool::new(write_frames),
33            page_table: DashMap::new(),
34            store,
35        }
36    }
37
38    /// Pin a page for reading. Loads from disk if not already in memory.
39    pub fn pin_read(&self, page_id: PageId) -> KyuResult<PinnedPage<'_>> {
40        self.pin_page(page_id, PoolId::Read)
41    }
42
43    /// Pin a page for writing. Loads from disk if not already in memory.
44    pub fn pin_write(&self, page_id: PageId) -> KyuResult<PinnedPage<'_>> {
45        self.pin_page(page_id, PoolId::Write)
46    }
47
48    /// Allocate a new page in the given file and pin it for writing.
49    pub fn allocate_new_page(&self, file_id: FileId) -> KyuResult<(PageId, PinnedPage<'_>)> {
50        let page_idx = self.store.allocate_page(file_id)?;
51        let page_id = PageId::new(file_id, page_idx);
52        let pinned = self.pin_page(page_id, PoolId::Write)?;
53        Ok((page_id, pinned))
54    }
55
56    /// Flush all dirty pages to disk.
57    pub fn flush_all(&self) -> KyuResult<()> {
58        self.flush_pool(&self.read_pool)?;
59        self.flush_pool(&self.write_pool)?;
60        Ok(())
61    }
62
63    /// Flush dirty pages from a specific pool.
64    fn flush_pool(&self, pool: &Pool) -> KyuResult<()> {
65        for i in 0..pool.num_frames() {
66            let frame = pool.frame(FrameIdx(i));
67            if frame.is_dirty() && frame.has_valid_page() {
68                // SAFETY: We're reading the data to write to disk.
69                // In a production system, we'd acquire a shared latch first.
70                let data = unsafe { frame.data() };
71                self.store.write_page(frame.page_id(), data)?;
72                frame.clear_dirty();
73            }
74        }
75        Ok(())
76    }
77
78    /// Pin a page in the specified pool.
79    fn pin_page(&self, page_id: PageId, preferred_pool: PoolId) -> KyuResult<PinnedPage<'_>> {
80        // 1. Check if already in memory
81        if let Some(entry) = self.page_table.get(&page_id) {
82            let (pool_id, frame_idx) = *entry;
83            let pool = self.get_pool(pool_id);
84            let frame = pool.frame(frame_idx);
85            frame.pin();
86            frame.set_recently_used();
87            return Ok(PinnedPage {
88                bm: self,
89                page_id,
90                pool_id,
91                frame_idx,
92            });
93        }
94
95        // 2. Find a frame in the preferred pool
96        let pool = self.get_pool(preferred_pool);
97        let frame_idx = self.find_or_evict_frame(pool, preferred_pool)?;
98        let frame = pool.frame(frame_idx);
99
100        // 3. Load page data from disk
101        // SAFETY: We have exclusive access to this frame (it was just evicted/empty).
102        let data = unsafe { frame.data_mut() };
103        self.store.read_page(page_id, data)?;
104
105        // 4. Set up the frame
106        frame.set_page_id(page_id);
107        frame.pin();
108        frame.set_recently_used();
109        frame.clear_dirty();
110
111        // 5. Update page table
112        self.page_table.insert(page_id, (preferred_pool, frame_idx));
113
114        Ok(PinnedPage {
115            bm: self,
116            page_id,
117            pool_id: preferred_pool,
118            frame_idx,
119        })
120    }
121
122    /// Find an empty frame or evict one.
123    fn find_or_evict_frame(&self, pool: &Pool, _pool_id: PoolId) -> KyuResult<FrameIdx> {
124        // Try to find an empty frame first
125        if let Some(idx) = pool.find_empty() {
126            return Ok(idx);
127        }
128
129        // Evict using clock algorithm
130        let idx = pool.find_evictable().ok_or_else(|| {
131            KyuError::Storage("buffer pool exhausted: all frames are pinned".into())
132        })?;
133
134        let frame = pool.frame(idx);
135
136        // Write dirty page to disk before evicting
137        if frame.is_dirty() && frame.has_valid_page() {
138            // SAFETY: Frame is not pinned (eviction only picks unpinned frames).
139            let data = unsafe { frame.data() };
140            self.store.write_page(frame.page_id(), data)?;
141        }
142
143        // Remove old mapping
144        if frame.has_valid_page() {
145            self.page_table.remove(&frame.page_id());
146        }
147
148        frame.reset();
149        Ok(idx)
150    }
151
152    fn get_pool(&self, pool_id: PoolId) -> &Pool {
153        match pool_id {
154            PoolId::Read => &self.read_pool,
155            PoolId::Write => &self.write_pool,
156        }
157    }
158
159    /// Get the total number of frames across both pools.
160    pub fn total_frames(&self) -> u32 {
161        self.read_pool.num_frames() + self.write_pool.num_frames()
162    }
163
164    /// Get statistics about the buffer manager.
165    pub fn stats(&self) -> BufferManagerStats {
166        BufferManagerStats {
167            read_pool_frames: self.read_pool.num_frames(),
168            write_pool_frames: self.write_pool.num_frames(),
169            read_pool_loaded: self.read_pool.loaded_count(),
170            write_pool_loaded: self.write_pool.loaded_count(),
171            read_pool_dirty: self.read_pool.dirty_count(),
172            write_pool_dirty: self.write_pool.dirty_count(),
173            read_pool_pinned: self.read_pool.pinned_count(),
174            write_pool_pinned: self.write_pool.pinned_count(),
175            page_table_entries: self.page_table.len() as u32,
176        }
177    }
178}
179
/// Statistics about buffer manager state.
///
/// All counts are point-in-time snapshots taken by `BufferManager::stats`.
#[derive(Clone, Debug)]
pub struct BufferManagerStats {
    /// Total frames in the read pool (fixed at construction).
    pub read_pool_frames: u32,
    /// Total frames in the write pool (fixed at construction).
    pub write_pool_frames: u32,
    /// Read-pool frames currently holding a valid page.
    pub read_pool_loaded: u32,
    /// Write-pool frames currently holding a valid page.
    pub write_pool_loaded: u32,
    /// Read-pool frames with unflushed modifications.
    pub read_pool_dirty: u32,
    /// Write-pool frames with unflushed modifications.
    pub write_pool_dirty: u32,
    /// Read-pool frames currently pinned (ineligible for eviction).
    pub read_pool_pinned: u32,
    /// Write-pool frames currently pinned (ineligible for eviction).
    pub write_pool_pinned: u32,
    /// Number of entries in the page table (resident pages).
    pub page_table_entries: u32,
}
193
/// RAII guard that automatically unpins a page when dropped.
///
/// Provides safe access to the underlying frame data. While the guard is
/// alive the frame cannot be selected for eviction.
pub struct PinnedPage<'bm> {
    /// Owning buffer manager; used to reach the frame and to unpin on drop.
    bm: &'bm BufferManager,
    /// Identity of the pinned page.
    page_id: PageId,
    /// Which pool the frame lives in.
    pool_id: PoolId,
    /// Index of the frame inside that pool.
    frame_idx: FrameIdx,
}
203
204impl<'bm> PinnedPage<'bm> {
205    /// Get the page ID.
206    pub fn page_id(&self) -> PageId {
207        self.page_id
208    }
209
210    /// Get a shared (read-only) view of the page data.
211    pub fn data(&self) -> &[u8] {
212        let pool = self.bm.get_pool(self.pool_id);
213        let frame = pool.frame(self.frame_idx);
214        // SAFETY: The page is pinned, so the frame won't be evicted.
215        // We hold a shared reference, which is safe for reading.
216        unsafe { frame.data() }
217    }
218
219    /// Get an exclusive (mutable) view of the page data. Marks the page as dirty.
220    pub fn data_mut(&mut self) -> &mut [u8] {
221        let pool = self.bm.get_pool(self.pool_id);
222        let frame = pool.frame(self.frame_idx);
223        frame.set_dirty();
224        // SAFETY: The page is pinned. We hold &mut self, preventing concurrent access.
225        unsafe { frame.data_mut() }
226    }
227
228    /// Check if this page is dirty.
229    pub fn is_dirty(&self) -> bool {
230        let pool = self.bm.get_pool(self.pool_id);
231        pool.frame(self.frame_idx).is_dirty()
232    }
233
234    /// Manually mark the page as dirty.
235    pub fn mark_dirty(&self) {
236        let pool = self.bm.get_pool(self.pool_id);
237        pool.frame(self.frame_idx).set_dirty();
238    }
239}
240
241impl Drop for PinnedPage<'_> {
242    fn drop(&mut self) {
243        let pool = self.bm.get_pool(self.pool_id);
244        pool.frame(self.frame_idx).unpin();
245    }
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use crate::page_id::PAGE_SIZE;
    use crate::page_store::MockPageStore;

    // Helper: buffer manager over a mock store with the default 70/30 split.
    fn make_bm(frames: u32) -> BufferManager {
        BufferManager::new(frames, 0.7, Box::new(MockPageStore::new()))
    }

    #[test]
    fn create_buffer_manager() {
        let bm = make_bm(10);
        assert_eq!(bm.total_frames(), 10);
        let stats = bm.stats();
        // 70/30 split of 10 frames.
        assert_eq!(stats.read_pool_frames, 7);
        assert_eq!(stats.write_pool_frames, 3);
    }

    #[test]
    fn pin_and_unpin_read() {
        let bm = make_bm(10);
        let pid = PageId::new(FileId(0), 0);
        {
            let page = bm.pin_read(pid).unwrap();
            assert_eq!(page.page_id(), pid);
            assert_eq!(page.data().len(), PAGE_SIZE);
            // Verify initial data is zeros (from MockPageStore)
            assert!(page.data().iter().all(|&b| b == 0));
        }
        // After drop, the frame should be unpinned
        let stats = bm.stats();
        assert_eq!(stats.read_pool_pinned, 0);
    }

    #[test]
    fn pin_write_marks_dirty() {
        let bm = make_bm(10);
        let pid = PageId::new(FileId(0), 0);
        {
            let mut page = bm.pin_write(pid).unwrap();
            // Pinning alone must not dirty the page; only data_mut() does.
            assert!(!page.is_dirty());
            page.data_mut()[0] = 42;
            assert!(page.is_dirty());
        }
    }

    #[test]
    fn pin_same_page_twice() {
        let bm = make_bm(10);
        let pid = PageId::new(FileId(0), 0);
        // Two concurrent pins of the same page must share one frame.
        let p1 = bm.pin_read(pid).unwrap();
        let p2 = bm.pin_read(pid).unwrap();
        assert_eq!(p1.page_id(), p2.page_id());
        drop(p1);
        drop(p2);
    }

    #[test]
    fn write_and_read_back() {
        let bm = make_bm(10);
        let pid = PageId::new(FileId(0), 0);

        // Write data
        {
            let mut page = bm.pin_write(pid).unwrap();
            page.data_mut()[0] = 0xAB;
            page.data_mut()[1] = 0xCD;
        }

        // Read it back (same frame, still in memory)
        {
            let page = bm.pin_read(pid).unwrap();
            assert_eq!(page.data()[0], 0xAB);
            assert_eq!(page.data()[1], 0xCD);
        }
    }

    #[test]
    fn allocate_new_page() {
        let bm = make_bm(10);
        let (pid, mut page) = bm.allocate_new_page(FileId(0)).unwrap();
        assert_eq!(pid.file_id, FileId(0));
        assert_eq!(pid.page_idx, 0);
        page.data_mut()[0] = 99;
        drop(page);

        // Page indices within a file are allocated sequentially.
        let (pid2, _page2) = bm.allocate_new_page(FileId(0)).unwrap();
        assert_eq!(pid2.page_idx, 1);
    }

    #[test]
    fn flush_all() {
        let store = MockPageStore::new();
        let bm = BufferManager::new(10, 0.7, Box::new(store));
        let pid = PageId::new(FileId(0), 0);

        {
            let mut page = bm.pin_write(pid).unwrap();
            page.data_mut()[0] = 0xFF;
        }

        bm.flush_all().unwrap();

        // Flushing must clear every dirty bit.
        let stats = bm.stats();
        assert_eq!(stats.write_pool_dirty, 0);
    }

    #[test]
    fn eviction_on_full_pool() {
        // Small pool: 3 read frames + 1 write frame
        let bm = make_bm(4);

        // Fill up the read pool (3 frames)
        for i in 0..3 {
            let pid = PageId::new(FileId(0), i);
            let page = bm.pin_read(pid).unwrap();
            drop(page); // Unpin immediately so it can be evicted
        }

        assert_eq!(bm.stats().read_pool_loaded, 3);

        // Pin a 4th page — should trigger eviction
        let pid = PageId::new(FileId(0), 3);
        let page = bm.pin_read(pid).unwrap();
        assert_eq!(page.data().len(), PAGE_SIZE);
        drop(page);
    }

    #[test]
    fn eviction_writes_dirty_page() {
        let bm = BufferManager::new(4, 0.75, Box::new(MockPageStore::new()));

        // Fill 3 read frames with dirty data
        for i in 0..3 {
            let pid = PageId::new(FileId(0), i);
            let page = bm.pin_read(pid).unwrap();
            page.mark_dirty();
            drop(page);
        }

        // Trigger eviction
        let pid = PageId::new(FileId(0), 10);
        let _page = bm.pin_read(pid).unwrap();

        // The evicted dirty page should have been written to the store
        // We can't easily check MockPageStore through the Box, but the
        // test passes if eviction doesn't error
    }

    #[test]
    fn buffer_pool_exhausted() {
        // 2 frames total: 1 read + 1 write (minimum)
        let bm = make_bm(2);

        // Pin both frames and keep them pinned
        let p1 = bm.pin_read(PageId::new(FileId(0), 0)).unwrap();

        // Try to pin another page in the same pool — should fail
        // since there's only 1 read frame and it's pinned
        let result = bm.pin_read(PageId::new(FileId(0), 1));
        assert!(result.is_err());

        drop(p1);
    }

    #[test]
    fn stats() {
        let bm = make_bm(10);
        let stats = bm.stats();
        assert_eq!(stats.read_pool_loaded, 0);
        assert_eq!(stats.write_pool_loaded, 0);
        assert_eq!(stats.page_table_entries, 0);

        let _p = bm.pin_read(PageId::new(FileId(0), 0)).unwrap();
        let stats = bm.stats();
        assert_eq!(stats.read_pool_loaded, 1);
        assert_eq!(stats.read_pool_pinned, 1);
        assert_eq!(stats.page_table_entries, 1);
    }

    #[test]
    fn split_pool_ratio() {
        let bm = BufferManager::new(100, 0.7, Box::new(MockPageStore::new()));
        let stats = bm.stats();
        assert_eq!(stats.read_pool_frames, 70);
        assert_eq!(stats.write_pool_frames, 30);
    }

    #[test]
    fn minimum_pool_sizes() {
        // Even with extreme ratios, each pool gets at least 1 frame
        let bm = BufferManager::new(2, 0.0, Box::new(MockPageStore::new()));
        assert!(bm.read_pool.num_frames() >= 1);
        assert!(bm.write_pool.num_frames() >= 1);
    }
}