1use bytes::{Bytes, BytesMut};
4use dashmap::DashMap;
5use parking_lot::{Mutex, RwLock};
6use std::cell::UnsafeCell;
7use std::collections::VecDeque;
8use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
9use std::sync::Arc;
10
11use crate::buffer::page::{PageId, INVALID_PAGE_ID, PAGE_SIZE};
12use crate::config::BufferPoolConfig;
13use crate::error::{QuillSQLError, QuillSQLResult};
14use crate::recovery::Lsn;
15use crate::storage::disk_scheduler::DiskScheduler;
16
/// Index of a frame slot inside a [`BufferPool`] (`0..capacity()`).
pub type FrameId = usize;

/// Default number of frames for a buffer pool.
pub const BUFFER_POOL_SIZE: usize = 5_000;
20
21#[derive(Debug, Default, Clone)]
22pub struct FrameMetaSnapshot {
23 pub page_id: PageId,
24 pub pin_count: u32,
25 pub is_dirty: bool,
26 pub lsn: Lsn,
27}
28
/// Per-frame bookkeeping, stored in atomics so it can be read and updated through `&self`.
///
/// Each field is independent; no operation updates several fields as one atomic step.
#[derive(Debug)]
pub struct FrameMeta {
    /// Page currently held by the frame; `INVALID_PAGE_ID` when the frame is empty.
    page_id: AtomicU32,
    /// Number of outstanding pins on the frame.
    pin_count: AtomicU32,
    /// Set via `mark_dirty`, reset via `clear_dirty`.
    is_dirty: AtomicBool,
    /// LSN recorded for the frame via `set_lsn`.
    lsn: AtomicU64,
}
36
37impl Default for FrameMeta {
38 fn default() -> Self {
39 Self {
40 page_id: AtomicU32::new(INVALID_PAGE_ID),
41 pin_count: AtomicU32::new(0),
42 is_dirty: AtomicBool::new(false),
43 lsn: AtomicU64::new(0),
44 }
45 }
46}
47
48impl FrameMeta {
49 pub fn snapshot(&self) -> FrameMetaSnapshot {
50 FrameMetaSnapshot {
51 page_id: self.page_id(),
52 pin_count: self.pin_count(),
53 is_dirty: self.is_dirty(),
54 lsn: self.lsn(),
55 }
56 }
57
58 pub fn page_id(&self) -> PageId {
59 self.page_id.load(Ordering::Acquire)
60 }
61
62 pub fn set_page_id(&self, page_id: PageId) {
63 self.page_id.store(page_id, Ordering::Release);
64 }
65
66 pub fn pin_count(&self) -> u32 {
67 self.pin_count.load(Ordering::Acquire)
68 }
69
70 pub fn set_pin_count(&self, count: u32) {
71 self.pin_count.store(count, Ordering::Release);
72 }
73
74 pub fn increment_pin(&self) -> u32 {
75 self.pin_count.fetch_add(1, Ordering::AcqRel) + 1
76 }
77
78 pub fn try_decrement_pin(&self) -> Option<u32> {
79 let mut current = self.pin_count.load(Ordering::Acquire);
80 loop {
81 if current == 0 {
82 return None;
83 }
84 match self.pin_count.compare_exchange(
85 current,
86 current - 1,
87 Ordering::AcqRel,
88 Ordering::Acquire,
89 ) {
90 Ok(_) => return Some(current - 1),
91 Err(observed) => current = observed,
92 }
93 }
94 }
95
96 pub fn reset_pin(&self) {
97 self.set_pin_count(0);
98 }
99
100 pub fn is_dirty(&self) -> bool {
101 self.is_dirty.load(Ordering::Acquire)
102 }
103
104 pub fn mark_dirty(&self) {
105 self.is_dirty.store(true, Ordering::Release);
106 }
107
108 pub fn clear_dirty(&self) {
109 self.is_dirty.store(false, Ordering::Release);
110 }
111
112 pub fn lsn(&self) -> Lsn {
113 self.lsn.load(Ordering::Acquire)
114 }
115
116 pub fn set_lsn(&self, lsn: Lsn) {
117 self.lsn.store(lsn, Ordering::Release);
118 }
119
120 pub fn initialize(&self, page_id: PageId) {
121 self.set_page_id(page_id);
122 self.reset_pin();
123 self.clear_dirty();
124 self.set_lsn(0);
125 }
126
127 pub fn clear(&self) {
128 self.initialize(INVALID_PAGE_ID);
129 }
130}
131
132#[derive(Debug)]
133pub struct BufferPool {
134 arena: Box<[UnsafeCell<u8>]>,
135 locks: Vec<RwLock<()>>,
136 meta: Vec<FrameMeta>,
137 page_table: DashMap<PageId, FrameId>,
138 free_list: Mutex<VecDeque<FrameId>>,
139 disk_scheduler: Arc<DiskScheduler>,
140}
141
142unsafe impl Sync for BufferPool {}
143
144impl BufferPool {
145 pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
146 Self::new_with_config(
147 BufferPoolConfig {
148 buffer_pool_size: num_pages,
149 ..Default::default()
150 },
151 disk_scheduler,
152 )
153 }
154
155 pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
156 let num_pages = config.buffer_pool_size;
157 let mut free_list = VecDeque::with_capacity(num_pages);
158 let mut meta = Vec::with_capacity(num_pages);
159 let mut locks = Vec::with_capacity(num_pages);
160 for frame_id in 0..num_pages {
161 free_list.push_back(frame_id);
162 meta.push(FrameMeta::default());
163 locks.push(RwLock::new(()));
164 }
165 let mut arena_vec: Vec<UnsafeCell<u8>> = Vec::with_capacity(num_pages * PAGE_SIZE);
166 arena_vec.resize_with(num_pages * PAGE_SIZE, || UnsafeCell::new(0u8));
167 let arena = arena_vec.into_boxed_slice();
168
169 Self {
170 arena,
171 locks,
172 meta,
173 page_table: DashMap::new(),
174 free_list: Mutex::new(free_list),
175 disk_scheduler,
176 }
177 }
178
179 pub fn capacity(&self) -> usize {
180 self.locks.len()
181 }
182
183 pub fn frame_lock(&self, frame_id: FrameId) -> &RwLock<()> {
184 &self.locks[frame_id]
185 }
186
187 pub unsafe fn frame_slice(&self, frame_id: FrameId) -> &[u8] {
194 let ptr = self.frame_ptr(frame_id) as *const u8;
195 std::slice::from_raw_parts(ptr, PAGE_SIZE)
196 }
197
198 #[allow(clippy::mut_from_ref)]
206 pub unsafe fn frame_slice_mut(&self, frame_id: FrameId) -> &mut [u8] {
207 let ptr = self.frame_ptr(frame_id);
208 std::slice::from_raw_parts_mut(ptr, PAGE_SIZE)
209 }
210
211 unsafe fn frame_ptr(&self, frame_id: FrameId) -> *mut u8 {
217 self.arena.as_ptr().add(frame_id * PAGE_SIZE) as *mut u8
218 }
219
220 pub fn frame_meta(&self, frame_id: FrameId) -> &FrameMeta {
221 &self.meta[frame_id]
222 }
223
224 pub fn clear_frame_meta(&self, frame_id: FrameId) {
225 self.meta[frame_id].clear();
226 }
227
228 pub fn pop_free_frame(&self) -> Option<FrameId> {
229 self.free_list.lock().pop_front()
230 }
231
232 pub fn has_free_frame(&self) -> bool {
233 !self.free_list.lock().is_empty()
234 }
235
236 pub fn push_free_frame(&self, frame_id: FrameId) {
237 self.free_list.lock().push_back(frame_id);
238 }
239
240 pub fn insert_mapping(&self, page_id: PageId, frame_id: FrameId) {
241 self.page_table.insert(page_id, frame_id);
242 }
243
244 pub fn remove_mapping_if(&self, page_id: PageId, frame_id: FrameId) -> bool {
245 self.page_table
246 .remove_if(&page_id, |_, current| *current == frame_id)
247 .is_some()
248 }
249
250 pub fn remove_mapping(&self, page_id: PageId) {
251 self.page_table.remove(&page_id);
252 }
253
254 pub fn lookup_frame(&self, page_id: PageId) -> Option<FrameId> {
255 self.page_table.get(&page_id).map(|entry| *entry.value())
256 }
257
258 pub fn read_page_from_disk(&self, page_id: PageId) -> QuillSQLResult<BytesMut> {
259 let rx = self.disk_scheduler.schedule_read(page_id)?;
260 let data = rx
261 .recv()
262 .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
263 Ok(data)
264 }
265
266 pub fn load_page_into_frame(&self, page_id: PageId, frame_id: FrameId) -> QuillSQLResult<()> {
267 let page_bytes = self.read_page_from_disk(page_id)?;
268 let slice = unsafe { self.frame_slice_mut(frame_id) };
269 let len = PAGE_SIZE.min(page_bytes.len());
270 slice[..len].copy_from_slice(&page_bytes[..len]);
271 if len < PAGE_SIZE {
272 slice[len..].fill(0);
273 }
274 let meta = self.frame_meta(frame_id);
275 meta.initialize(page_id);
276 Ok(())
277 }
278
279 pub fn write_page_to_disk(&self, page_id: PageId, bytes: Bytes) -> QuillSQLResult<()> {
280 self.disk_scheduler
281 .schedule_write(page_id, bytes)?
282 .recv()
283 .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
284 Ok(())
285 }
286
287 pub fn allocate_page_id(&self) -> QuillSQLResult<PageId> {
288 let rx = self.disk_scheduler.schedule_allocate()?;
289 let page_id = rx
290 .recv()
291 .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
292 if page_id == INVALID_PAGE_ID {
293 return Err(QuillSQLError::Internal(
294 "DiskScheduler returned INVALID_PAGE_ID".to_string(),
295 ));
296 }
297 Ok(page_id)
298 }
299
300 pub fn disk_scheduler(&self) -> Arc<DiskScheduler> {
301 self.disk_scheduler.clone()
302 }
303
304 pub fn reset_frame(&self, frame_id: FrameId) {
305 unsafe {
306 self.frame_slice_mut(frame_id).fill(0);
307 }
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::disk_manager::DiskManager;
    use tempfile::TempDir;

    /// Builds a pool of `num_pages` frames over a fresh database file.
    ///
    /// The `TempDir` is returned so the backing file lives as long as the test needs it.
    fn setup_pool(num_pages: usize) -> (TempDir, Arc<DiskScheduler>, BufferPool) {
        let temp_dir = TempDir::new().unwrap();
        let disk_manager = Arc::new(DiskManager::try_new(temp_dir.path().join("pool.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new(disk_manager));
        let config = BufferPoolConfig {
            buffer_pool_size: num_pages,
            ..Default::default()
        };
        let pool = BufferPool::new_with_config(config, scheduler.clone());
        (temp_dir, scheduler, pool)
    }

    /// Allocates a page id directly through the scheduler, bypassing the pool.
    fn allocate_page(scheduler: &DiskScheduler) -> PageId {
        scheduler
            .schedule_allocate()
            .unwrap()
            .recv()
            .unwrap()
            .unwrap()
    }

    #[test]
    fn load_page_into_frame_populates_arena_and_meta() {
        let (_tmp, scheduler, pool) = setup_pool(4);
        let page_id = allocate_page(&scheduler);

        let pattern = Bytes::from(vec![0xAAu8; PAGE_SIZE]);
        scheduler
            .schedule_write(page_id, pattern.clone())
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();

        let frame_id = pool.pop_free_frame().expect("free frame");
        pool.load_page_into_frame(page_id, frame_id).unwrap();

        let meta = pool.frame_meta(frame_id).snapshot();
        assert_eq!(meta.page_id, page_id);
        assert_eq!(meta.pin_count, 0);
        assert!(!meta.is_dirty);
        assert_eq!(meta.lsn, 0);

        let data = unsafe { pool.frame_slice(frame_id) };
        assert_eq!(data, pattern.as_ref());
    }

    #[test]
    fn write_page_to_disk_persists_arena_bytes() {
        let (_tmp, scheduler, pool) = setup_pool(4);
        let page_id = allocate_page(&scheduler);

        let frame_id = pool.pop_free_frame().expect("free frame");
        unsafe {
            pool.frame_slice_mut(frame_id).fill(0x3C);
        }
        let payload = Bytes::copy_from_slice(unsafe { pool.frame_slice(frame_id) });
        pool.write_page_to_disk(page_id, payload).unwrap();

        let read_back = scheduler
            .schedule_read(page_id)
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();
        // Without a length check an empty read would pass the `all` check vacuously.
        assert!(
            read_back.len() >= PAGE_SIZE,
            "short read: {} bytes",
            read_back.len()
        );
        assert!(read_back.iter().all(|b| *b == 0x3C));
    }

    #[test]
    fn reset_frame_clears_data_and_meta() {
        let (_tmp, scheduler, pool) = setup_pool(2);
        let page_id = allocate_page(&scheduler);
        let frame_id = pool.pop_free_frame().expect("free frame");

        let meta = pool.frame_meta(frame_id);
        meta.set_page_id(page_id);
        meta.set_pin_count(5);
        meta.mark_dirty();
        meta.set_lsn(99);
        unsafe {
            pool.frame_slice_mut(frame_id).fill(0x55);
        }

        pool.reset_frame(frame_id);
        pool.clear_frame_meta(frame_id);

        let meta = pool.frame_meta(frame_id).snapshot();
        assert_eq!(meta.page_id, INVALID_PAGE_ID);
        assert_eq!(meta.pin_count, 0);
        assert!(!meta.is_dirty);
        assert_eq!(meta.lsn, 0);

        let data = unsafe { pool.frame_slice(frame_id) };
        assert!(data.iter().all(|b| *b == 0));
    }

    #[test]
    fn page_table_insert_lookup_and_remove() {
        let (_tmp, scheduler, pool) = setup_pool(2);
        let page_id = allocate_page(&scheduler);
        let frame_id = 0;

        pool.insert_mapping(page_id, frame_id);
        assert_eq!(pool.lookup_frame(page_id), Some(frame_id));
        assert!(pool.remove_mapping_if(page_id, frame_id));
        assert!(pool.lookup_frame(page_id).is_none());

        pool.insert_mapping(page_id, frame_id);
        pool.remove_mapping(page_id);
        assert!(pool.lookup_frame(page_id).is_none());
    }
}