quill_sql/buffer/
buffer_pool.rs

//! Low-level buffer pool responsible for frame storage, page table, and disk I/O.
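//!
//! Illustrative fetch path (a sketch only; the real pinning and eviction
//! protocol lives in the buffer manager layer and is not shown here):
//!
//! ```ignore
//! // Assumes `pool: &BufferPool` and a `page_id` that already exists on disk.
//! let frame_id = match pool.lookup_frame(page_id) {
//!     Some(frame_id) => frame_id, // page already resident
//!     None => {
//!         let frame_id = pool.pop_free_frame().expect("eviction not shown");
//!         pool.load_page_into_frame(page_id, frame_id)?; // read bytes from disk
//!         pool.insert_mapping(page_id, frame_id);        // publish page -> frame
//!         frame_id
//!     }
//! };
//! let _guard = pool.frame_lock(frame_id).read(); // lock before touching the bytes
//! let bytes = unsafe { pool.frame_slice(frame_id) };
//! ```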

use bytes::Bytes;
use dashmap::DashMap;
use parking_lot::{Mutex, MutexGuard, RwLock};
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::sync::Arc;

use crate::buffer::page::{PageId, INVALID_PAGE_ID, PAGE_SIZE};
use crate::config::BufferPoolConfig;
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::recovery::Lsn;
use crate::storage::disk_scheduler::DiskScheduler;

pub type FrameId = usize;

pub const BUFFER_POOL_SIZE: usize = 5000;

#[derive(Debug, Default, Clone)]
pub struct FrameMeta {
    pub page_id: PageId,
    pub pin_count: u32,
    pub is_dirty: bool,
    pub lsn: Lsn,
}

#[derive(Debug)]
pub struct BufferPool {
    arena: Box<[UnsafeCell<u8>]>,
    locks: Vec<RwLock<()>>,
    meta: Vec<Mutex<FrameMeta>>,
    page_table: DashMap<PageId, FrameId>,
    free_list: Mutex<VecDeque<FrameId>>,
    disk_scheduler: Arc<DiskScheduler>,
}

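// SAFETY: `UnsafeCell<u8>` makes the arena `!Sync` by default. Sharing the pool
// across threads is sound only because every access to a frame's bytes is
// expected to go through the matching per-frame `RwLock` in `locks` (see the
// safety contracts on `frame_slice` / `frame_slice_mut`).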
unsafe impl Sync for BufferPool {}

impl BufferPool {
    pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
        Self::new_with_config(
            BufferPoolConfig {
                buffer_pool_size: num_pages,
                ..Default::default()
            },
            disk_scheduler,
        )
    }

    pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
        let num_pages = config.buffer_pool_size;
        let mut free_list = VecDeque::with_capacity(num_pages);
        let mut meta = Vec::with_capacity(num_pages);
        let mut locks = Vec::with_capacity(num_pages);
        for frame_id in 0..num_pages {
            free_list.push_back(frame_id);
            meta.push(Mutex::new(FrameMeta::default()));
            locks.push(RwLock::new(()));
        }
        let mut arena_vec: Vec<UnsafeCell<u8>> = Vec::with_capacity(num_pages * PAGE_SIZE);
        arena_vec.resize_with(num_pages * PAGE_SIZE, || UnsafeCell::new(0u8));
        let arena = arena_vec.into_boxed_slice();

        Self {
            arena,
            locks,
            meta,
            page_table: DashMap::new(),
            free_list: Mutex::new(free_list),
            disk_scheduler,
        }
    }

    pub fn capacity(&self) -> usize {
        self.locks.len()
    }

    pub fn frame_lock(&self, frame_id: FrameId) -> &RwLock<()> {
        &self.locks[frame_id]
    }

    /// Returns an immutable view over the page bytes stored in `frame_id`.
    ///
    /// # Safety
    /// Caller must guarantee the frame lock is held for read (or write) and that the
    /// frame remains pinned for the duration of the returned slice. Violating the
    /// locking protocol allows concurrent mutation and results in undefined behavior.
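    ///
    /// Illustrative use under the expected protocol (sketch, not compiled):
    ///
    /// ```ignore
    /// let _read_guard = pool.frame_lock(frame_id).read(); // hold for the whole borrow
    /// let bytes = unsafe { pool.frame_slice(frame_id) };
    /// assert_eq!(bytes.len(), PAGE_SIZE);
    /// ```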
    pub unsafe fn frame_slice(&self, frame_id: FrameId) -> &[u8] {
        let ptr = self.frame_ptr(frame_id) as *const u8;
        std::slice::from_raw_parts(ptr, PAGE_SIZE)
    }

    /// Returns a mutable view over the page bytes stored in `frame_id`.
    ///
    /// # Safety
    /// Caller must hold the frame's write lock and ensure no other references to the
    /// underlying slice exist. The buffer manager enforces this by acquiring the
    /// corresponding `RwLock` write guard before invoking this function. Misuse causes
    /// aliasing mutable references and leads to undefined behavior.
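    ///
    /// Illustrative use under the expected protocol (sketch, not compiled):
    ///
    /// ```ignore
    /// let _write_guard = pool.frame_lock(frame_id).write(); // exclusive access
    /// let bytes = unsafe { pool.frame_slice_mut(frame_id) };
    /// bytes.fill(0);
    /// pool.frame_meta(frame_id).is_dirty = true; // remember to flush later
    /// ```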
    #[allow(clippy::mut_from_ref)]
    pub unsafe fn frame_slice_mut(&self, frame_id: FrameId) -> &mut [u8] {
        let ptr = self.frame_ptr(frame_id);
        std::slice::from_raw_parts_mut(ptr, PAGE_SIZE)
    }

    /// Computes the raw pointer to the page bytes backing `frame_id`.
    ///
    /// # Safety
    /// Equivalent to `frame_slice`/`frame_slice_mut`; caller must uphold the same
    /// locking and lifetime guarantees before dereferencing the pointer.
    unsafe fn frame_ptr(&self, frame_id: FrameId) -> *mut u8 {
        self.arena.as_ptr().add(frame_id * PAGE_SIZE) as *mut u8
    }

    pub fn frame_meta(&self, frame_id: FrameId) -> MutexGuard<'_, FrameMeta> {
        self.meta[frame_id].lock()
    }

    pub fn clear_frame_meta(&self, frame_id: FrameId) {
        let mut meta = self.meta[frame_id].lock();
        *meta = FrameMeta::default();
    }

    pub fn pop_free_frame(&self) -> Option<FrameId> {
        self.free_list.lock().pop_front()
    }

    pub fn has_free_frame(&self) -> bool {
        !self.free_list.lock().is_empty()
    }

    pub fn push_free_frame(&self, frame_id: FrameId) {
        self.free_list.lock().push_back(frame_id);
    }

    pub fn insert_mapping(&self, page_id: PageId, frame_id: FrameId) {
        self.page_table.insert(page_id, frame_id);
    }

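    /// Removes the `page_id` mapping only if it still points at `frame_id`,
    /// guarding against a concurrent re-mapping of the page to another frame.
    /// Returns `true` if an entry was removed.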
    pub fn remove_mapping_if(&self, page_id: PageId, frame_id: FrameId) -> bool {
        self.page_table
            .remove_if(&page_id, |_, current| *current == frame_id)
            .is_some()
    }

    pub fn remove_mapping(&self, page_id: PageId) {
        self.page_table.remove(&page_id);
    }

    pub fn lookup_frame(&self, page_id: PageId) -> Option<FrameId> {
        self.page_table.get(&page_id).map(|entry| *entry.value())
    }

    pub fn read_page_from_disk(&self, page_id: PageId) -> QuillSQLResult<Vec<u8>> {
        let rx = self.disk_scheduler.schedule_read(page_id)?;
        let data = rx
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
        Ok(data.to_vec())
    }

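    /// Reads `page_id` from disk into `frame_id`, zero-padding short reads,
    /// and resets the frame metadata (clean, unpinned, LSN 0).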
    pub fn load_page_into_frame(&self, page_id: PageId, frame_id: FrameId) -> QuillSQLResult<()> {
        let page_bytes = self.read_page_from_disk(page_id)?;
        let slice = unsafe { self.frame_slice_mut(frame_id) };
        let len = PAGE_SIZE.min(page_bytes.len());
        slice[..len].copy_from_slice(&page_bytes[..len]);
        if len < PAGE_SIZE {
            slice[len..].fill(0);
        }
        let mut meta = self.meta[frame_id].lock();
        meta.page_id = page_id;
        meta.is_dirty = false;
        meta.pin_count = 0;
        meta.lsn = 0;
        Ok(())
    }

    pub fn write_page_to_disk(&self, page_id: PageId, bytes: Bytes) -> QuillSQLResult<()> {
        self.disk_scheduler
            .schedule_write(page_id, bytes)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
        Ok(())
    }

    pub fn allocate_page_id(&self) -> QuillSQLResult<PageId> {
        let rx = self.disk_scheduler.schedule_allocate()?;
        let page_id = rx
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
        if page_id == INVALID_PAGE_ID {
            return Err(QuillSQLError::Internal(
                "DiskScheduler returned INVALID_PAGE_ID".to_string(),
            ));
        }
        Ok(page_id)
    }

    pub fn disk_scheduler(&self) -> Arc<DiskScheduler> {
        self.disk_scheduler.clone()
    }

    pub fn reset_frame(&self, frame_id: FrameId) {
        unsafe {
            self.frame_slice_mut(frame_id).fill(0);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::disk_manager::DiskManager;
    use tempfile::TempDir;

    fn setup_pool(num_pages: usize) -> (TempDir, Arc<DiskScheduler>, BufferPool) {
        let temp_dir = TempDir::new().unwrap();
        let disk_manager = Arc::new(DiskManager::try_new(temp_dir.path().join("pool.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new(disk_manager));
        let config = BufferPoolConfig {
            buffer_pool_size: num_pages,
            ..Default::default()
        };
        let pool = BufferPool::new_with_config(config, scheduler.clone());
        (temp_dir, scheduler, pool)
    }

    #[test]
    fn load_page_into_frame_populates_arena_and_meta() {
        let (_tmp, scheduler, pool) = setup_pool(4);
        let rx_alloc = scheduler.schedule_allocate().unwrap();
        let page_id = rx_alloc.recv().unwrap().unwrap();

        let pattern = Bytes::copy_from_slice(&vec![0xAA; PAGE_SIZE]);
        scheduler
            .schedule_write(page_id, pattern.clone())
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();

        let frame_id = pool.pop_free_frame().expect("free frame");
        pool.load_page_into_frame(page_id, frame_id).unwrap();

        {
            let meta = pool.frame_meta(frame_id);
            assert_eq!(meta.page_id, page_id);
            assert_eq!(meta.pin_count, 0);
            assert!(!meta.is_dirty);
            assert_eq!(meta.lsn, 0);
        }

        let data = unsafe { pool.frame_slice(frame_id) };
        assert_eq!(data, pattern.as_ref());
    }

    #[test]
    fn write_page_to_disk_persists_arena_bytes() {
        let (_tmp, scheduler, pool) = setup_pool(4);
        let page_id = scheduler
            .schedule_allocate()
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();

        let frame_id = pool.pop_free_frame().expect("free frame");
        unsafe {
            pool.frame_slice_mut(frame_id).fill(0x3C);
        }
        let payload = Bytes::copy_from_slice(unsafe { pool.frame_slice(frame_id) });
        pool.write_page_to_disk(page_id, payload.clone()).unwrap();

        let read_back = scheduler
            .schedule_read(page_id)
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();
        assert!(read_back.iter().all(|b| *b == 0x3C));
    }

    #[test]
    fn reset_frame_clears_data_and_meta() {
        let (_tmp, scheduler, pool) = setup_pool(2);
        let page_id = scheduler
            .schedule_allocate()
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();
        let frame_id = pool.pop_free_frame().expect("free frame");

        {
            let mut meta = pool.frame_meta(frame_id);
            meta.page_id = page_id;
            meta.pin_count = 5;
            meta.is_dirty = true;
            meta.lsn = 99;
        }
        unsafe {
            pool.frame_slice_mut(frame_id).fill(0x55);
        }

        pool.reset_frame(frame_id);
        pool.clear_frame_meta(frame_id);

        let meta = pool.frame_meta(frame_id);
        assert_eq!(meta.page_id, INVALID_PAGE_ID);
        assert_eq!(meta.pin_count, 0);
        assert!(!meta.is_dirty);
        assert_eq!(meta.lsn, 0);
        drop(meta);

        assert!(unsafe { pool.frame_slice(frame_id) }
            .iter()
            .all(|b| *b == 0));
    }

    #[test]
    fn page_table_insert_lookup_and_remove() {
        let (_tmp, scheduler, pool) = setup_pool(2);
        let page_id = scheduler
            .schedule_allocate()
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();
        let frame_id = 0;

        pool.insert_mapping(page_id, frame_id);
        assert_eq!(pool.lookup_frame(page_id), Some(frame_id));
        assert!(pool.remove_mapping_if(page_id, frame_id));
        assert!(pool.lookup_frame(page_id).is_none());

        pool.insert_mapping(page_id, frame_id);
        pool.remove_mapping(page_id);
        assert!(pool.lookup_frame(page_id).is_none());
    }
}