quill_sql/buffer/
buffer_pool.rs

//! Low-level buffer pool responsible for frame storage, the page table, and disk I/O.
2
3use bytes::{Bytes, BytesMut};
4use dashmap::DashMap;
5use parking_lot::{Mutex, RwLock};
6use std::cell::UnsafeCell;
7use std::collections::VecDeque;
8use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
9use std::sync::Arc;
10
11use crate::buffer::page::{PageId, INVALID_PAGE_ID, PAGE_SIZE};
12use crate::config::BufferPoolConfig;
13use crate::error::{QuillSQLError, QuillSQLResult};
14use crate::recovery::Lsn;
15use crate::storage::disk_scheduler::DiskScheduler;
16
/// Index of a frame inside the buffer pool.
pub type FrameId = usize;

/// Frame-count constant exported by this module.
///
/// NOTE(review): not referenced anywhere in this file — presumably the default
/// pool size consumed by configuration code elsewhere; confirm against callers.
pub const BUFFER_POOL_SIZE: usize = 5_000;
20
21#[derive(Debug, Default, Clone)]
22pub struct FrameMetaSnapshot {
23    pub page_id: PageId,
24    pub pin_count: u32,
25    pub is_dirty: bool,
26    pub lsn: Lsn,
27}
28
29#[derive(Debug)]
30pub struct FrameMeta {
31    page_id: AtomicU32,
32    pin_count: AtomicU32,
33    is_dirty: AtomicBool,
34    lsn: AtomicU64,
35}
36
37impl Default for FrameMeta {
38    fn default() -> Self {
39        Self {
40            page_id: AtomicU32::new(INVALID_PAGE_ID),
41            pin_count: AtomicU32::new(0),
42            is_dirty: AtomicBool::new(false),
43            lsn: AtomicU64::new(0),
44        }
45    }
46}
47
48impl FrameMeta {
49    pub fn snapshot(&self) -> FrameMetaSnapshot {
50        FrameMetaSnapshot {
51            page_id: self.page_id(),
52            pin_count: self.pin_count(),
53            is_dirty: self.is_dirty(),
54            lsn: self.lsn(),
55        }
56    }
57
58    pub fn page_id(&self) -> PageId {
59        self.page_id.load(Ordering::Acquire)
60    }
61
62    pub fn set_page_id(&self, page_id: PageId) {
63        self.page_id.store(page_id, Ordering::Release);
64    }
65
66    pub fn pin_count(&self) -> u32 {
67        self.pin_count.load(Ordering::Acquire)
68    }
69
70    pub fn set_pin_count(&self, count: u32) {
71        self.pin_count.store(count, Ordering::Release);
72    }
73
74    pub fn increment_pin(&self) -> u32 {
75        self.pin_count.fetch_add(1, Ordering::AcqRel) + 1
76    }
77
78    pub fn try_decrement_pin(&self) -> Option<u32> {
79        let mut current = self.pin_count.load(Ordering::Acquire);
80        loop {
81            if current == 0 {
82                return None;
83            }
84            match self.pin_count.compare_exchange(
85                current,
86                current - 1,
87                Ordering::AcqRel,
88                Ordering::Acquire,
89            ) {
90                Ok(_) => return Some(current - 1),
91                Err(observed) => current = observed,
92            }
93        }
94    }
95
96    pub fn reset_pin(&self) {
97        self.set_pin_count(0);
98    }
99
100    pub fn is_dirty(&self) -> bool {
101        self.is_dirty.load(Ordering::Acquire)
102    }
103
104    pub fn mark_dirty(&self) {
105        self.is_dirty.store(true, Ordering::Release);
106    }
107
108    pub fn clear_dirty(&self) {
109        self.is_dirty.store(false, Ordering::Release);
110    }
111
112    pub fn lsn(&self) -> Lsn {
113        self.lsn.load(Ordering::Acquire)
114    }
115
116    pub fn set_lsn(&self, lsn: Lsn) {
117        self.lsn.store(lsn, Ordering::Release);
118    }
119
120    pub fn initialize(&self, page_id: PageId) {
121        self.set_page_id(page_id);
122        self.reset_pin();
123        self.clear_dirty();
124        self.set_lsn(0);
125    }
126
127    pub fn clear(&self) {
128        self.initialize(INVALID_PAGE_ID);
129    }
130}
131
132#[derive(Debug)]
133pub struct BufferPool {
134    arena: Box<[UnsafeCell<u8>]>,
135    locks: Vec<RwLock<()>>,
136    meta: Vec<FrameMeta>,
137    page_table: DashMap<PageId, FrameId>,
138    free_list: Mutex<VecDeque<FrameId>>,
139    disk_scheduler: Arc<DiskScheduler>,
140}
141
142unsafe impl Sync for BufferPool {}
143
144impl BufferPool {
145    pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
146        Self::new_with_config(
147            BufferPoolConfig {
148                buffer_pool_size: num_pages,
149                ..Default::default()
150            },
151            disk_scheduler,
152        )
153    }
154
155    pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
156        let num_pages = config.buffer_pool_size;
157        let mut free_list = VecDeque::with_capacity(num_pages);
158        let mut meta = Vec::with_capacity(num_pages);
159        let mut locks = Vec::with_capacity(num_pages);
160        for frame_id in 0..num_pages {
161            free_list.push_back(frame_id);
162            meta.push(FrameMeta::default());
163            locks.push(RwLock::new(()));
164        }
165        let mut arena_vec: Vec<UnsafeCell<u8>> = Vec::with_capacity(num_pages * PAGE_SIZE);
166        arena_vec.resize_with(num_pages * PAGE_SIZE, || UnsafeCell::new(0u8));
167        let arena = arena_vec.into_boxed_slice();
168
169        Self {
170            arena,
171            locks,
172            meta,
173            page_table: DashMap::new(),
174            free_list: Mutex::new(free_list),
175            disk_scheduler,
176        }
177    }
178
179    pub fn capacity(&self) -> usize {
180        self.locks.len()
181    }
182
183    pub fn frame_meta_snapshot(&self) -> Vec<FrameMetaSnapshot> {
184        (0..self.capacity())
185            .map(|frame_id| self.meta[frame_id].snapshot())
186            .collect()
187    }
188
189    pub fn frame_lock(&self, frame_id: FrameId) -> &RwLock<()> {
190        &self.locks[frame_id]
191    }
192
193    /// Returns an immutable view over the page bytes stored in `frame_id`.
194    ///
195    /// # Safety
196    /// Caller must guarantee the frame lock is held for read (or write) and that the
197    /// frame remains pinned for the duration of the returned slice. Violating the
198    /// locking protocol allows concurrent mutation and results in undefined behavior.
199    pub unsafe fn frame_slice(&self, frame_id: FrameId) -> &[u8] {
200        let ptr = self.frame_ptr(frame_id) as *const u8;
201        std::slice::from_raw_parts(ptr, PAGE_SIZE)
202    }
203
204    /// Returns a mutable view over the page bytes stored in `frame_id`.
205    ///
206    /// # Safety
207    /// Caller must hold the frame's write lock and ensure no other references to the
208    /// underlying slice exist. The buffer manager enforces this by acquiring the
209    /// corresponding `RwLock` write guard before invoking this function. Misuse causes
210    /// aliasing mutable references and leads to undefined behavior.
211    #[allow(clippy::mut_from_ref)]
212    pub unsafe fn frame_slice_mut(&self, frame_id: FrameId) -> &mut [u8] {
213        let ptr = self.frame_ptr(frame_id);
214        std::slice::from_raw_parts_mut(ptr, PAGE_SIZE)
215    }
216
217    /// Computes the raw pointer to the page bytes backing `frame_id`.
218    ///
219    /// # Safety
220    /// Equivalent to `frame_slice`/`frame_slice_mut`; caller must uphold the same
221    /// locking and lifetime guarantees before dereferencing the pointer.
222    unsafe fn frame_ptr(&self, frame_id: FrameId) -> *mut u8 {
223        self.arena.as_ptr().add(frame_id * PAGE_SIZE) as *mut u8
224    }
225
226    pub fn frame_meta(&self, frame_id: FrameId) -> &FrameMeta {
227        &self.meta[frame_id]
228    }
229
230    pub fn clear_frame_meta(&self, frame_id: FrameId) {
231        self.meta[frame_id].clear();
232    }
233
234    pub fn pop_free_frame(&self) -> Option<FrameId> {
235        self.free_list.lock().pop_front()
236    }
237
238    pub fn has_free_frame(&self) -> bool {
239        !self.free_list.lock().is_empty()
240    }
241
242    pub fn push_free_frame(&self, frame_id: FrameId) {
243        self.free_list.lock().push_back(frame_id);
244    }
245
246    pub fn insert_mapping(&self, page_id: PageId, frame_id: FrameId) {
247        self.page_table.insert(page_id, frame_id);
248    }
249
250    pub fn remove_mapping_if(&self, page_id: PageId, frame_id: FrameId) -> bool {
251        self.page_table
252            .remove_if(&page_id, |_, current| *current == frame_id)
253            .is_some()
254    }
255
256    pub fn remove_mapping(&self, page_id: PageId) {
257        self.page_table.remove(&page_id);
258    }
259
260    pub fn lookup_frame(&self, page_id: PageId) -> Option<FrameId> {
261        self.page_table.get(&page_id).map(|entry| *entry.value())
262    }
263
264    pub fn read_page_from_disk(&self, page_id: PageId) -> QuillSQLResult<BytesMut> {
265        let rx = self.disk_scheduler.schedule_read(page_id)?;
266        let data = rx
267            .recv()
268            .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
269        Ok(data)
270    }
271
272    pub fn load_page_into_frame(&self, page_id: PageId, frame_id: FrameId) -> QuillSQLResult<()> {
273        let page_bytes = self.read_page_from_disk(page_id)?;
274        let slice = unsafe { self.frame_slice_mut(frame_id) };
275        let len = PAGE_SIZE.min(page_bytes.len());
276        slice[..len].copy_from_slice(&page_bytes[..len]);
277        if len < PAGE_SIZE {
278            slice[len..].fill(0);
279        }
280        let meta = self.frame_meta(frame_id);
281        meta.initialize(page_id);
282        Ok(())
283    }
284
285    pub fn write_page_to_disk(&self, page_id: PageId, bytes: Bytes) -> QuillSQLResult<()> {
286        self.disk_scheduler
287            .schedule_write(page_id, bytes)?
288            .recv()
289            .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
290        Ok(())
291    }
292
293    pub fn allocate_page_id(&self) -> QuillSQLResult<PageId> {
294        let rx = self.disk_scheduler.schedule_allocate()?;
295        let page_id = rx
296            .recv()
297            .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
298        if page_id == INVALID_PAGE_ID {
299            return Err(QuillSQLError::Internal(
300                "DiskScheduler returned INVALID_PAGE_ID".to_string(),
301            ));
302        }
303        Ok(page_id)
304    }
305
306    pub fn disk_scheduler(&self) -> Arc<DiskScheduler> {
307        self.disk_scheduler.clone()
308    }
309
310    pub fn reset_frame(&self, frame_id: FrameId) {
311        unsafe {
312            self.frame_slice_mut(frame_id).fill(0);
313        }
314    }
315}
316
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::disk_manager::DiskManager;
    use tempfile::TempDir;

    /// Builds a pool of `num_pages` frames backed by a database file inside a
    /// fresh temporary directory. The `TempDir` is returned so the caller keeps
    /// it alive for as long as the pool is in use.
    fn setup_pool(num_pages: usize) -> (TempDir, Arc<DiskScheduler>, BufferPool) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("pool.db");
        let disk_manager = Arc::new(DiskManager::try_new(db_path).unwrap());
        let scheduler = Arc::new(DiskScheduler::new(disk_manager));
        let config = BufferPoolConfig {
            buffer_pool_size: num_pages,
            ..Default::default()
        };
        let pool = BufferPool::new_with_config(config, scheduler.clone());
        (temp_dir, scheduler, pool)
    }

    /// Allocates a page id straight through the scheduler.
    fn allocate_page(scheduler: &Arc<DiskScheduler>) -> PageId {
        scheduler
            .schedule_allocate()
            .unwrap()
            .recv()
            .unwrap()
            .unwrap()
    }

    #[test]
    fn load_page_into_frame_populates_arena_and_meta() {
        let (_tmp, scheduler, pool) = setup_pool(4);
        let page_id = allocate_page(&scheduler);

        // Put a recognizable pattern on disk first.
        let pattern = Bytes::from(vec![0xAAu8; PAGE_SIZE]);
        scheduler
            .schedule_write(page_id, pattern.clone())
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();

        let frame_id = pool.pop_free_frame().expect("free frame");
        pool.load_page_into_frame(page_id, frame_id).unwrap();

        let snapshot = pool.frame_meta(frame_id).snapshot();
        assert_eq!(snapshot.page_id, page_id);
        assert_eq!(snapshot.pin_count, 0);
        assert!(!snapshot.is_dirty);
        assert_eq!(snapshot.lsn, 0);

        // Single-threaded test: nothing else can touch the frame.
        let loaded = unsafe { pool.frame_slice(frame_id) };
        assert_eq!(loaded, pattern.as_ref());
    }

    #[test]
    fn write_page_to_disk_persists_arena_bytes() {
        let (_tmp, scheduler, pool) = setup_pool(4);
        let page_id = allocate_page(&scheduler);

        let frame_id = pool.pop_free_frame().expect("free frame");
        // Single-threaded test: nothing else can touch the frame.
        let payload = unsafe {
            pool.frame_slice_mut(frame_id).fill(0x3C);
            Bytes::copy_from_slice(pool.frame_slice(frame_id))
        };
        pool.write_page_to_disk(page_id, payload).unwrap();

        let read_back = scheduler
            .schedule_read(page_id)
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();
        assert!(read_back.iter().all(|byte| *byte == 0x3C));
    }

    #[test]
    fn reset_frame_clears_data_and_meta() {
        let (_tmp, scheduler, pool) = setup_pool(2);
        let page_id = allocate_page(&scheduler);
        let frame_id = pool.pop_free_frame().expect("free frame");

        // Dirty both the metadata and the bytes so the reset has something to undo.
        let live_meta = pool.frame_meta(frame_id);
        live_meta.set_page_id(page_id);
        live_meta.set_pin_count(5);
        live_meta.mark_dirty();
        live_meta.set_lsn(99);
        unsafe {
            pool.frame_slice_mut(frame_id).fill(0x55);
        }

        pool.reset_frame(frame_id);
        pool.clear_frame_meta(frame_id);

        let snapshot = pool.frame_meta(frame_id).snapshot();
        assert_eq!(snapshot.page_id, INVALID_PAGE_ID);
        assert_eq!(snapshot.pin_count, 0);
        assert!(!snapshot.is_dirty);
        assert_eq!(snapshot.lsn, 0);

        let bytes = unsafe { pool.frame_slice(frame_id) };
        assert!(bytes.iter().all(|byte| *byte == 0));
    }

    #[test]
    fn page_table_insert_lookup_and_remove() {
        let (_tmp, scheduler, pool) = setup_pool(2);
        let page_id = allocate_page(&scheduler);
        let frame_id = 0;

        // Conditional removal with the matching frame id.
        pool.insert_mapping(page_id, frame_id);
        assert_eq!(pool.lookup_frame(page_id), Some(frame_id));
        assert!(pool.remove_mapping_if(page_id, frame_id));
        assert!(pool.lookup_frame(page_id).is_none());

        // Unconditional removal.
        pool.insert_mapping(page_id, frame_id);
        pool.remove_mapping(page_id);
        assert!(pool.lookup_frame(page_id).is_none());
    }
}