1use bytes::{Bytes, BytesMut};
4use dashmap::DashMap;
5use parking_lot::{Mutex, RwLock};
6use std::cell::UnsafeCell;
7use std::collections::VecDeque;
8use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
9use std::sync::Arc;
10
11use crate::buffer::page::{PageId, INVALID_PAGE_ID, PAGE_SIZE};
12use crate::config::BufferPoolConfig;
13use crate::error::{QuillSQLError, QuillSQLResult};
14use crate::recovery::Lsn;
15use crate::storage::disk_scheduler::DiskScheduler;
16
/// Index of a frame (one page-sized slot) inside the buffer pool.
///
/// In this file it is used to index the per-frame lock and metadata vectors
/// and to compute a frame's byte offset in the arena.
pub type FrameId = usize;

/// Frame-count constant exported by this module.
///
/// NOTE(review): not referenced anywhere in this file; presumably the default
/// pool size used by the configuration layer — confirm against callers.
pub const BUFFER_POOL_SIZE: usize = 5_000;
20
21#[derive(Debug, Default, Clone)]
22pub struct FrameMetaSnapshot {
23 pub page_id: PageId,
24 pub pin_count: u32,
25 pub is_dirty: bool,
26 pub lsn: Lsn,
27}
28
/// Per-frame bookkeeping stored in atomics so it can be read and updated
/// through a shared reference.
///
/// Each field is an independent atomic; nothing in this type ties updates of
/// different fields together.
#[derive(Debug)]
pub struct FrameMeta {
    /// Id of the page recorded for this frame (`INVALID_PAGE_ID` by default).
    page_id: AtomicU32,
    /// Pin counter for this frame.
    pin_count: AtomicU32,
    /// Dirty flag for this frame.
    is_dirty: AtomicBool,
    /// LSN recorded for this frame.
    lsn: AtomicU64,
}
36
37impl Default for FrameMeta {
38 fn default() -> Self {
39 Self {
40 page_id: AtomicU32::new(INVALID_PAGE_ID),
41 pin_count: AtomicU32::new(0),
42 is_dirty: AtomicBool::new(false),
43 lsn: AtomicU64::new(0),
44 }
45 }
46}
47
48impl FrameMeta {
49 pub fn snapshot(&self) -> FrameMetaSnapshot {
50 FrameMetaSnapshot {
51 page_id: self.page_id(),
52 pin_count: self.pin_count(),
53 is_dirty: self.is_dirty(),
54 lsn: self.lsn(),
55 }
56 }
57
58 pub fn page_id(&self) -> PageId {
59 self.page_id.load(Ordering::Acquire)
60 }
61
62 pub fn set_page_id(&self, page_id: PageId) {
63 self.page_id.store(page_id, Ordering::Release);
64 }
65
66 pub fn pin_count(&self) -> u32 {
67 self.pin_count.load(Ordering::Acquire)
68 }
69
70 pub fn set_pin_count(&self, count: u32) {
71 self.pin_count.store(count, Ordering::Release);
72 }
73
74 pub fn increment_pin(&self) -> u32 {
75 self.pin_count.fetch_add(1, Ordering::AcqRel) + 1
76 }
77
78 pub fn try_decrement_pin(&self) -> Option<u32> {
79 let mut current = self.pin_count.load(Ordering::Acquire);
80 loop {
81 if current == 0 {
82 return None;
83 }
84 match self.pin_count.compare_exchange(
85 current,
86 current - 1,
87 Ordering::AcqRel,
88 Ordering::Acquire,
89 ) {
90 Ok(_) => return Some(current - 1),
91 Err(observed) => current = observed,
92 }
93 }
94 }
95
96 pub fn reset_pin(&self) {
97 self.set_pin_count(0);
98 }
99
100 pub fn is_dirty(&self) -> bool {
101 self.is_dirty.load(Ordering::Acquire)
102 }
103
104 pub fn mark_dirty(&self) {
105 self.is_dirty.store(true, Ordering::Release);
106 }
107
108 pub fn clear_dirty(&self) {
109 self.is_dirty.store(false, Ordering::Release);
110 }
111
112 pub fn lsn(&self) -> Lsn {
113 self.lsn.load(Ordering::Acquire)
114 }
115
116 pub fn set_lsn(&self, lsn: Lsn) {
117 self.lsn.store(lsn, Ordering::Release);
118 }
119
120 pub fn initialize(&self, page_id: PageId) {
121 self.set_page_id(page_id);
122 self.reset_pin();
123 self.clear_dirty();
124 self.set_lsn(0);
125 }
126
127 pub fn clear(&self) {
128 self.initialize(INVALID_PAGE_ID);
129 }
130}
131
132#[derive(Debug)]
133pub struct BufferPool {
134 arena: Box<[UnsafeCell<u8>]>,
135 locks: Vec<RwLock<()>>,
136 meta: Vec<FrameMeta>,
137 page_table: DashMap<PageId, FrameId>,
138 free_list: Mutex<VecDeque<FrameId>>,
139 disk_scheduler: Arc<DiskScheduler>,
140}
141
142unsafe impl Sync for BufferPool {}
143
144impl BufferPool {
145 pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
146 Self::new_with_config(
147 BufferPoolConfig {
148 buffer_pool_size: num_pages,
149 ..Default::default()
150 },
151 disk_scheduler,
152 )
153 }
154
155 pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
156 let num_pages = config.buffer_pool_size;
157 let mut free_list = VecDeque::with_capacity(num_pages);
158 let mut meta = Vec::with_capacity(num_pages);
159 let mut locks = Vec::with_capacity(num_pages);
160 for frame_id in 0..num_pages {
161 free_list.push_back(frame_id);
162 meta.push(FrameMeta::default());
163 locks.push(RwLock::new(()));
164 }
165 let mut arena_vec: Vec<UnsafeCell<u8>> = Vec::with_capacity(num_pages * PAGE_SIZE);
166 arena_vec.resize_with(num_pages * PAGE_SIZE, || UnsafeCell::new(0u8));
167 let arena = arena_vec.into_boxed_slice();
168
169 Self {
170 arena,
171 locks,
172 meta,
173 page_table: DashMap::new(),
174 free_list: Mutex::new(free_list),
175 disk_scheduler,
176 }
177 }
178
179 pub fn capacity(&self) -> usize {
180 self.locks.len()
181 }
182
183 pub fn frame_meta_snapshot(&self) -> Vec<FrameMetaSnapshot> {
184 (0..self.capacity())
185 .map(|frame_id| self.meta[frame_id].snapshot())
186 .collect()
187 }
188
189 pub fn frame_lock(&self, frame_id: FrameId) -> &RwLock<()> {
190 &self.locks[frame_id]
191 }
192
193 pub unsafe fn frame_slice(&self, frame_id: FrameId) -> &[u8] {
200 let ptr = self.frame_ptr(frame_id) as *const u8;
201 std::slice::from_raw_parts(ptr, PAGE_SIZE)
202 }
203
204 #[allow(clippy::mut_from_ref)]
212 pub unsafe fn frame_slice_mut(&self, frame_id: FrameId) -> &mut [u8] {
213 let ptr = self.frame_ptr(frame_id);
214 std::slice::from_raw_parts_mut(ptr, PAGE_SIZE)
215 }
216
217 unsafe fn frame_ptr(&self, frame_id: FrameId) -> *mut u8 {
223 self.arena.as_ptr().add(frame_id * PAGE_SIZE) as *mut u8
224 }
225
226 pub fn frame_meta(&self, frame_id: FrameId) -> &FrameMeta {
227 &self.meta[frame_id]
228 }
229
230 pub fn clear_frame_meta(&self, frame_id: FrameId) {
231 self.meta[frame_id].clear();
232 }
233
234 pub fn pop_free_frame(&self) -> Option<FrameId> {
235 self.free_list.lock().pop_front()
236 }
237
238 pub fn has_free_frame(&self) -> bool {
239 !self.free_list.lock().is_empty()
240 }
241
242 pub fn push_free_frame(&self, frame_id: FrameId) {
243 self.free_list.lock().push_back(frame_id);
244 }
245
246 pub fn insert_mapping(&self, page_id: PageId, frame_id: FrameId) {
247 self.page_table.insert(page_id, frame_id);
248 }
249
250 pub fn remove_mapping_if(&self, page_id: PageId, frame_id: FrameId) -> bool {
251 self.page_table
252 .remove_if(&page_id, |_, current| *current == frame_id)
253 .is_some()
254 }
255
256 pub fn remove_mapping(&self, page_id: PageId) {
257 self.page_table.remove(&page_id);
258 }
259
260 pub fn lookup_frame(&self, page_id: PageId) -> Option<FrameId> {
261 self.page_table.get(&page_id).map(|entry| *entry.value())
262 }
263
264 pub fn read_page_from_disk(&self, page_id: PageId) -> QuillSQLResult<BytesMut> {
265 let rx = self.disk_scheduler.schedule_read(page_id)?;
266 let data = rx
267 .recv()
268 .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
269 Ok(data)
270 }
271
272 pub fn load_page_into_frame(&self, page_id: PageId, frame_id: FrameId) -> QuillSQLResult<()> {
273 let page_bytes = self.read_page_from_disk(page_id)?;
274 let slice = unsafe { self.frame_slice_mut(frame_id) };
275 let len = PAGE_SIZE.min(page_bytes.len());
276 slice[..len].copy_from_slice(&page_bytes[..len]);
277 if len < PAGE_SIZE {
278 slice[len..].fill(0);
279 }
280 let meta = self.frame_meta(frame_id);
281 meta.initialize(page_id);
282 Ok(())
283 }
284
285 pub fn write_page_to_disk(&self, page_id: PageId, bytes: Bytes) -> QuillSQLResult<()> {
286 self.disk_scheduler
287 .schedule_write(page_id, bytes)?
288 .recv()
289 .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
290 Ok(())
291 }
292
293 pub fn allocate_page_id(&self) -> QuillSQLResult<PageId> {
294 let rx = self.disk_scheduler.schedule_allocate()?;
295 let page_id = rx
296 .recv()
297 .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
298 if page_id == INVALID_PAGE_ID {
299 return Err(QuillSQLError::Internal(
300 "DiskScheduler returned INVALID_PAGE_ID".to_string(),
301 ));
302 }
303 Ok(page_id)
304 }
305
306 pub fn disk_scheduler(&self) -> Arc<DiskScheduler> {
307 self.disk_scheduler.clone()
308 }
309
310 pub fn reset_frame(&self, frame_id: FrameId) {
311 unsafe {
312 self.frame_slice_mut(frame_id).fill(0);
313 }
314 }
315}
316
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::disk_manager::DiskManager;
    use tempfile::TempDir;

    /// Creates a pool of `num_pages` frames backed by a fresh `pool.db` file
    /// inside a temporary directory. The `TempDir` is returned so that the
    /// directory stays alive for the duration of the test.
    fn setup_pool(num_pages: usize) -> (TempDir, Arc<DiskScheduler>, BufferPool) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("pool.db");
        let disk_manager = Arc::new(DiskManager::try_new(db_path).unwrap());
        let scheduler = Arc::new(DiskScheduler::new(disk_manager));
        let config = BufferPoolConfig {
            buffer_pool_size: num_pages,
            ..Default::default()
        };
        let pool = BufferPool::new_with_config(config, scheduler.clone());
        (temp_dir, scheduler, pool)
    }

    /// Allocates a page id directly through the scheduler, bypassing the pool.
    fn allocate_page(scheduler: &DiskScheduler) -> PageId {
        scheduler
            .schedule_allocate()
            .unwrap()
            .recv()
            .unwrap()
            .unwrap()
    }

    /// Loading a page copies its on-disk bytes into the frame and resets the
    /// frame's metadata for that page.
    #[test]
    fn load_page_into_frame_populates_arena_and_meta() {
        let (_tmp, scheduler, pool) = setup_pool(4);
        let page_id = allocate_page(&scheduler);

        let fill = vec![0xAA_u8; PAGE_SIZE];
        let pattern = Bytes::copy_from_slice(&fill);
        scheduler
            .schedule_write(page_id, pattern.clone())
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();

        let frame_id = pool.pop_free_frame().expect("free frame");
        pool.load_page_into_frame(page_id, frame_id).unwrap();

        let snapshot = pool.frame_meta(frame_id).snapshot();
        assert_eq!(snapshot.page_id, page_id);
        assert_eq!(snapshot.pin_count, 0);
        assert!(!snapshot.is_dirty);
        assert_eq!(snapshot.lsn, 0);

        let frame_bytes = unsafe { pool.frame_slice(frame_id) };
        assert_eq!(frame_bytes, pattern.as_ref());
    }

    /// Bytes copied out of a frame and written through the pool can be read
    /// back unchanged via the scheduler.
    #[test]
    fn write_page_to_disk_persists_arena_bytes() {
        let (_tmp, scheduler, pool) = setup_pool(4);
        let page_id = allocate_page(&scheduler);

        let frame_id = pool.pop_free_frame().expect("free frame");
        unsafe {
            pool.frame_slice_mut(frame_id).fill(0x3C);
        }
        let payload = Bytes::copy_from_slice(unsafe { pool.frame_slice(frame_id) });
        pool.write_page_to_disk(page_id, payload).unwrap();

        let read_back = scheduler
            .schedule_read(page_id)
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();
        assert!(read_back.iter().all(|byte| *byte == 0x3C));
    }

    /// `reset_frame` zeroes the frame bytes and `clear_frame_meta` returns the
    /// metadata to the empty state.
    #[test]
    fn reset_frame_clears_data_and_meta() {
        let (_tmp, scheduler, pool) = setup_pool(2);
        let page_id = allocate_page(&scheduler);
        let frame_id = pool.pop_free_frame().expect("free frame");

        // Dirty up both the metadata and the frame contents first.
        let meta = pool.frame_meta(frame_id);
        meta.set_page_id(page_id);
        meta.set_pin_count(5);
        meta.mark_dirty();
        meta.set_lsn(99);
        unsafe {
            pool.frame_slice_mut(frame_id).fill(0x55);
        }

        pool.reset_frame(frame_id);
        pool.clear_frame_meta(frame_id);

        let snapshot = pool.frame_meta(frame_id).snapshot();
        assert_eq!(snapshot.page_id, INVALID_PAGE_ID);
        assert_eq!(snapshot.pin_count, 0);
        assert!(!snapshot.is_dirty);
        assert_eq!(snapshot.lsn, 0);

        let frame_bytes = unsafe { pool.frame_slice(frame_id) };
        assert!(frame_bytes.iter().all(|byte| *byte == 0));
    }

    /// Mappings can be inserted, looked up, and removed both conditionally
    /// and unconditionally.
    #[test]
    fn page_table_insert_lookup_and_remove() {
        let (_tmp, scheduler, pool) = setup_pool(2);
        let page_id = allocate_page(&scheduler);
        let frame_id = 0;

        pool.insert_mapping(page_id, frame_id);
        assert_eq!(pool.lookup_frame(page_id), Some(frame_id));
        assert!(pool.remove_mapping_if(page_id, frame_id));
        assert!(pool.lookup_frame(page_id).is_none());

        pool.insert_mapping(page_id, frame_id);
        pool.remove_mapping(page_id);
        assert!(pool.lookup_frame(page_id).is_none());
    }
}