quill_sql/buffer/
buffer_pool.rs

1//! Low-level buffer pool responsible for frame storage, page table, and disk I/O.
2
3use bytes::{Bytes, BytesMut};
4use dashmap::DashMap;
5use parking_lot::{Mutex, RwLock};
6use std::cell::UnsafeCell;
7use std::collections::VecDeque;
8use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
9use std::sync::Arc;
10
11use crate::buffer::page::{PageId, INVALID_PAGE_ID, PAGE_SIZE};
12use crate::config::BufferPoolConfig;
13use crate::error::{QuillSQLError, QuillSQLResult};
14use crate::recovery::Lsn;
15use crate::storage::disk_scheduler::DiskScheduler;
16
/// Index of a frame slot inside the buffer pool arena.
pub type FrameId = usize;
/// Default number of frames in a buffer pool.
pub const BUFFER_POOL_SIZE: usize = 5_000;
20
21#[derive(Debug, Default, Clone)]
22pub struct FrameMetaSnapshot {
23    pub page_id: PageId,
24    pub pin_count: u32,
25    pub is_dirty: bool,
26    pub lsn: Lsn,
27}
28
/// Per-frame bookkeeping stored as atomics so it can be read and updated
/// through a shared reference without taking the frame's data lock.
#[derive(Debug)]
pub struct FrameMeta {
    /// Page currently loaded in the frame.
    page_id: AtomicU32,
    /// Number of active pins on the frame.
    pin_count: AtomicU32,
    /// Set once the frame's bytes have been modified.
    is_dirty: AtomicBool,
    /// LSN associated with the frame's contents.
    lsn: AtomicU64,
}
36
37impl Default for FrameMeta {
38    fn default() -> Self {
39        Self {
40            page_id: AtomicU32::new(INVALID_PAGE_ID),
41            pin_count: AtomicU32::new(0),
42            is_dirty: AtomicBool::new(false),
43            lsn: AtomicU64::new(0),
44        }
45    }
46}
47
48impl FrameMeta {
49    pub fn snapshot(&self) -> FrameMetaSnapshot {
50        FrameMetaSnapshot {
51            page_id: self.page_id(),
52            pin_count: self.pin_count(),
53            is_dirty: self.is_dirty(),
54            lsn: self.lsn(),
55        }
56    }
57
58    pub fn page_id(&self) -> PageId {
59        self.page_id.load(Ordering::Acquire)
60    }
61
62    pub fn set_page_id(&self, page_id: PageId) {
63        self.page_id.store(page_id, Ordering::Release);
64    }
65
66    pub fn pin_count(&self) -> u32 {
67        self.pin_count.load(Ordering::Acquire)
68    }
69
70    pub fn set_pin_count(&self, count: u32) {
71        self.pin_count.store(count, Ordering::Release);
72    }
73
74    pub fn increment_pin(&self) -> u32 {
75        self.pin_count.fetch_add(1, Ordering::AcqRel) + 1
76    }
77
78    pub fn try_decrement_pin(&self) -> Option<u32> {
79        let mut current = self.pin_count.load(Ordering::Acquire);
80        loop {
81            if current == 0 {
82                return None;
83            }
84            match self.pin_count.compare_exchange(
85                current,
86                current - 1,
87                Ordering::AcqRel,
88                Ordering::Acquire,
89            ) {
90                Ok(_) => return Some(current - 1),
91                Err(observed) => current = observed,
92            }
93        }
94    }
95
96    pub fn reset_pin(&self) {
97        self.set_pin_count(0);
98    }
99
100    pub fn is_dirty(&self) -> bool {
101        self.is_dirty.load(Ordering::Acquire)
102    }
103
104    pub fn mark_dirty(&self) {
105        self.is_dirty.store(true, Ordering::Release);
106    }
107
108    pub fn clear_dirty(&self) {
109        self.is_dirty.store(false, Ordering::Release);
110    }
111
112    pub fn lsn(&self) -> Lsn {
113        self.lsn.load(Ordering::Acquire)
114    }
115
116    pub fn set_lsn(&self, lsn: Lsn) {
117        self.lsn.store(lsn, Ordering::Release);
118    }
119
120    pub fn initialize(&self, page_id: PageId) {
121        self.set_page_id(page_id);
122        self.reset_pin();
123        self.clear_dirty();
124        self.set_lsn(0);
125    }
126
127    pub fn clear(&self) {
128        self.initialize(INVALID_PAGE_ID);
129    }
130}
131
132#[derive(Debug)]
133pub struct BufferPool {
134    arena: Box<[UnsafeCell<u8>]>,
135    locks: Vec<RwLock<()>>,
136    meta: Vec<FrameMeta>,
137    page_table: DashMap<PageId, FrameId>,
138    free_list: Mutex<VecDeque<FrameId>>,
139    disk_scheduler: Arc<DiskScheduler>,
140}
141
// SAFETY: `arena` holds `UnsafeCell<u8>`, which is what stops `BufferPool` from
// being `Sync` automatically. Its bytes are exposed through `frame_slice` /
// `frame_slice_mut`, whose `# Safety` contracts require the caller to hold the
// matching per-frame `RwLock` from `locks` (read or write, respectively).
// NOTE(review): the safe methods `load_page_into_frame` and `reset_frame` also
// write frame bytes without taking that lock themselves. They rely on callers
// holding the frame's write lock, so that protocol must be upheld externally.
// The remaining fields are locks, atomics-based metadata, a `DashMap` and a
// `Mutex`. `disk_scheduler` is assumed to be `Sync`; confirm in DiskScheduler.
unsafe impl Sync for BufferPool {}
143
144impl BufferPool {
145    pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
146        Self::new_with_config(
147            BufferPoolConfig {
148                buffer_pool_size: num_pages,
149                ..Default::default()
150            },
151            disk_scheduler,
152        )
153    }
154
155    pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
156        let num_pages = config.buffer_pool_size;
157        let mut free_list = VecDeque::with_capacity(num_pages);
158        let mut meta = Vec::with_capacity(num_pages);
159        let mut locks = Vec::with_capacity(num_pages);
160        for frame_id in 0..num_pages {
161            free_list.push_back(frame_id);
162            meta.push(FrameMeta::default());
163            locks.push(RwLock::new(()));
164        }
165        let mut arena_vec: Vec<UnsafeCell<u8>> = Vec::with_capacity(num_pages * PAGE_SIZE);
166        arena_vec.resize_with(num_pages * PAGE_SIZE, || UnsafeCell::new(0u8));
167        let arena = arena_vec.into_boxed_slice();
168
169        Self {
170            arena,
171            locks,
172            meta,
173            page_table: DashMap::new(),
174            free_list: Mutex::new(free_list),
175            disk_scheduler,
176        }
177    }
178
179    pub fn capacity(&self) -> usize {
180        self.locks.len()
181    }
182
183    pub fn frame_lock(&self, frame_id: FrameId) -> &RwLock<()> {
184        &self.locks[frame_id]
185    }
186
187    /// Returns an immutable view over the page bytes stored in `frame_id`.
188    ///
189    /// # Safety
190    /// Caller must guarantee the frame lock is held for read (or write) and that the
191    /// frame remains pinned for the duration of the returned slice. Violating the
192    /// locking protocol allows concurrent mutation and results in undefined behavior.
193    pub unsafe fn frame_slice(&self, frame_id: FrameId) -> &[u8] {
194        let ptr = self.frame_ptr(frame_id) as *const u8;
195        std::slice::from_raw_parts(ptr, PAGE_SIZE)
196    }
197
198    /// Returns a mutable view over the page bytes stored in `frame_id`.
199    ///
200    /// # Safety
201    /// Caller must hold the frame's write lock and ensure no other references to the
202    /// underlying slice exist. The buffer manager enforces this by acquiring the
203    /// corresponding `RwLock` write guard before invoking this function. Misuse causes
204    /// aliasing mutable references and leads to undefined behavior.
205    #[allow(clippy::mut_from_ref)]
206    pub unsafe fn frame_slice_mut(&self, frame_id: FrameId) -> &mut [u8] {
207        let ptr = self.frame_ptr(frame_id);
208        std::slice::from_raw_parts_mut(ptr, PAGE_SIZE)
209    }
210
211    /// Computes the raw pointer to the page bytes backing `frame_id`.
212    ///
213    /// # Safety
214    /// Equivalent to `frame_slice`/`frame_slice_mut`; caller must uphold the same
215    /// locking and lifetime guarantees before dereferencing the pointer.
216    unsafe fn frame_ptr(&self, frame_id: FrameId) -> *mut u8 {
217        self.arena.as_ptr().add(frame_id * PAGE_SIZE) as *mut u8
218    }
219
220    pub fn frame_meta(&self, frame_id: FrameId) -> &FrameMeta {
221        &self.meta[frame_id]
222    }
223
224    pub fn clear_frame_meta(&self, frame_id: FrameId) {
225        self.meta[frame_id].clear();
226    }
227
228    pub fn pop_free_frame(&self) -> Option<FrameId> {
229        self.free_list.lock().pop_front()
230    }
231
232    pub fn has_free_frame(&self) -> bool {
233        !self.free_list.lock().is_empty()
234    }
235
236    pub fn push_free_frame(&self, frame_id: FrameId) {
237        self.free_list.lock().push_back(frame_id);
238    }
239
240    pub fn insert_mapping(&self, page_id: PageId, frame_id: FrameId) {
241        self.page_table.insert(page_id, frame_id);
242    }
243
244    pub fn remove_mapping_if(&self, page_id: PageId, frame_id: FrameId) -> bool {
245        self.page_table
246            .remove_if(&page_id, |_, current| *current == frame_id)
247            .is_some()
248    }
249
250    pub fn remove_mapping(&self, page_id: PageId) {
251        self.page_table.remove(&page_id);
252    }
253
254    pub fn lookup_frame(&self, page_id: PageId) -> Option<FrameId> {
255        self.page_table.get(&page_id).map(|entry| *entry.value())
256    }
257
258    pub fn read_page_from_disk(&self, page_id: PageId) -> QuillSQLResult<BytesMut> {
259        let rx = self.disk_scheduler.schedule_read(page_id)?;
260        let data = rx
261            .recv()
262            .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
263        Ok(data)
264    }
265
266    pub fn load_page_into_frame(&self, page_id: PageId, frame_id: FrameId) -> QuillSQLResult<()> {
267        let page_bytes = self.read_page_from_disk(page_id)?;
268        let slice = unsafe { self.frame_slice_mut(frame_id) };
269        let len = PAGE_SIZE.min(page_bytes.len());
270        slice[..len].copy_from_slice(&page_bytes[..len]);
271        if len < PAGE_SIZE {
272            slice[len..].fill(0);
273        }
274        let meta = self.frame_meta(frame_id);
275        meta.initialize(page_id);
276        Ok(())
277    }
278
279    pub fn write_page_to_disk(&self, page_id: PageId, bytes: Bytes) -> QuillSQLResult<()> {
280        self.disk_scheduler
281            .schedule_write(page_id, bytes)?
282            .recv()
283            .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
284        Ok(())
285    }
286
287    pub fn allocate_page_id(&self) -> QuillSQLResult<PageId> {
288        let rx = self.disk_scheduler.schedule_allocate()?;
289        let page_id = rx
290            .recv()
291            .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
292        if page_id == INVALID_PAGE_ID {
293            return Err(QuillSQLError::Internal(
294                "DiskScheduler returned INVALID_PAGE_ID".to_string(),
295            ));
296        }
297        Ok(page_id)
298    }
299
300    pub fn disk_scheduler(&self) -> Arc<DiskScheduler> {
301        self.disk_scheduler.clone()
302    }
303
304    pub fn reset_frame(&self, frame_id: FrameId) {
305        unsafe {
306            self.frame_slice_mut(frame_id).fill(0);
307        }
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::disk_manager::DiskManager;
    use tempfile::TempDir;

    /// Builds a pool of `num_pages` frames backed by a fresh file in a temp dir.
    ///
    /// The `TempDir` is returned so the backing file lives as long as the test.
    fn setup_pool(num_pages: usize) -> (TempDir, Arc<DiskScheduler>, BufferPool) {
        let temp_dir = TempDir::new().unwrap();
        let disk_manager = Arc::new(DiskManager::try_new(temp_dir.path().join("pool.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new(disk_manager));
        let config = BufferPoolConfig {
            buffer_pool_size: num_pages,
            ..Default::default()
        };
        let pool = BufferPool::new_with_config(config, scheduler.clone());
        (temp_dir, scheduler, pool)
    }

    /// Allocates a page id directly through the scheduler, bypassing the pool.
    fn allocate_page(scheduler: &Arc<DiskScheduler>) -> PageId {
        scheduler
            .schedule_allocate()
            .unwrap()
            .recv()
            .unwrap()
            .unwrap()
    }

    #[test]
    fn load_page_into_frame_populates_arena_and_meta() {
        let (_tmp, scheduler, pool) = setup_pool(4);
        let page_id = allocate_page(&scheduler);

        let pattern = Bytes::from(vec![0xAAu8; PAGE_SIZE]);
        scheduler
            .schedule_write(page_id, pattern.clone())
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();

        let frame_id = pool.pop_free_frame().expect("free frame");
        pool.load_page_into_frame(page_id, frame_id).unwrap();

        let meta = pool.frame_meta(frame_id).snapshot();
        assert_eq!(meta.page_id, page_id);
        assert_eq!(meta.pin_count, 0);
        assert!(!meta.is_dirty);
        assert_eq!(meta.lsn, 0);

        let data = unsafe { pool.frame_slice(frame_id) };
        assert_eq!(data, pattern.as_ref());
    }

    #[test]
    fn write_page_to_disk_persists_arena_bytes() {
        let (_tmp, scheduler, pool) = setup_pool(4);
        let page_id = allocate_page(&scheduler);

        let frame_id = pool.pop_free_frame().expect("free frame");
        unsafe {
            pool.frame_slice_mut(frame_id).fill(0x3C);
        }
        let payload = Bytes::copy_from_slice(unsafe { pool.frame_slice(frame_id) });
        pool.write_page_to_disk(page_id, payload).unwrap();

        let read_back = scheduler
            .schedule_read(page_id)
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();
        assert!(read_back.iter().all(|b| *b == 0x3C));
    }

    #[test]
    fn reset_frame_clears_data_and_meta() {
        let (_tmp, scheduler, pool) = setup_pool(2);
        let page_id = allocate_page(&scheduler);
        let frame_id = pool.pop_free_frame().expect("free frame");

        let meta = pool.frame_meta(frame_id);
        meta.set_page_id(page_id);
        meta.set_pin_count(5);
        meta.mark_dirty();
        meta.set_lsn(99);
        unsafe {
            pool.frame_slice_mut(frame_id).fill(0x55);
        }

        pool.reset_frame(frame_id);
        pool.clear_frame_meta(frame_id);

        let meta = pool.frame_meta(frame_id).snapshot();
        assert_eq!(meta.page_id, INVALID_PAGE_ID);
        assert_eq!(meta.pin_count, 0);
        assert!(!meta.is_dirty);
        assert_eq!(meta.lsn, 0);

        assert!(unsafe { pool.frame_slice(frame_id) }
            .iter()
            .all(|b| *b == 0));
    }

    #[test]
    fn page_table_insert_lookup_and_remove() {
        let (_tmp, scheduler, pool) = setup_pool(2);
        let page_id = allocate_page(&scheduler);
        let frame_id = 0;

        pool.insert_mapping(page_id, frame_id);
        assert_eq!(pool.lookup_frame(page_id), Some(frame_id));
        assert!(pool.remove_mapping_if(page_id, frame_id));
        assert!(pool.lookup_frame(page_id).is_none());

        pool.insert_mapping(page_id, frame_id);
        pool.remove_mapping(page_id);
        assert!(pool.lookup_frame(page_id).is_none());
    }
}