use bytes::Bytes;
use dashmap::DashMap;
use parking_lot::{Mutex, MutexGuard, RwLock};
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::sync::Arc;

use crate::buffer::page::{PageId, INVALID_PAGE_ID, PAGE_SIZE};
use crate::config::BufferPoolConfig;
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::recovery::Lsn;
use crate::storage::disk_scheduler::DiskScheduler;

pub type FrameId = usize;

pub const BUFFER_POOL_SIZE: usize = 5000;

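/// Per-frame bookkeeping: the page currently held in the frame, how many
/// callers have it pinned, whether it carries unflushed changes, and the
/// LSN recorded for recovery.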
#[derive(Debug, Default, Clone)]
pub struct FrameMeta {
    pub page_id: PageId,
    pub pin_count: u32,
    pub is_dirty: bool,
    pub lsn: Lsn,
}

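/// Fixed-capacity page cache. Page bytes live in one contiguous arena of
/// `capacity() * PAGE_SIZE` bytes; `locks[i]` and `meta[i]` guard frame `i`,
/// `page_table` maps resident pages to frames, and `free_list` tracks unused
/// frames. Raw page I/O is delegated to the shared `DiskScheduler`.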
#[derive(Debug)]
pub struct BufferPool {
    arena: Box<[UnsafeCell<u8>]>,
    locks: Vec<RwLock<()>>,
    meta: Vec<Mutex<FrameMeta>>,
    page_table: DashMap<PageId, FrameId>,
    free_list: Mutex<VecDeque<FrameId>>,
    disk_scheduler: Arc<DiskScheduler>,
}

// SAFETY: the `UnsafeCell` arena makes `BufferPool` `!Sync` by default. Sharing
// it across threads is sound only when access to a frame's bytes is serialized
// through the per-frame `RwLock` returned by `frame_lock`; see the safety
// contracts on `frame_slice` and `frame_slice_mut`.
unsafe impl Sync for BufferPool {}

impl BufferPool {
    /// Creates a pool with `num_pages` frames and otherwise default configuration.
    pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
        Self::new_with_config(
            BufferPoolConfig {
                buffer_pool_size: num_pages,
                ..Default::default()
            },
            disk_scheduler,
        )
    }

    pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
        let num_pages = config.buffer_pool_size;
        let mut free_list = VecDeque::with_capacity(num_pages);
        let mut meta = Vec::with_capacity(num_pages);
        let mut locks = Vec::with_capacity(num_pages);
        for frame_id in 0..num_pages {
            free_list.push_back(frame_id);
            meta.push(Mutex::new(FrameMeta::default()));
            locks.push(RwLock::new(()));
        }
        let mut arena_vec: Vec<UnsafeCell<u8>> = Vec::with_capacity(num_pages * PAGE_SIZE);
        arena_vec.resize_with(num_pages * PAGE_SIZE, || UnsafeCell::new(0u8));
        let arena = arena_vec.into_boxed_slice();

        Self {
            arena,
            locks,
            meta,
            page_table: DashMap::new(),
            free_list: Mutex::new(free_list),
            disk_scheduler,
        }
    }

    /// Number of frames in the pool.
    pub fn capacity(&self) -> usize {
        self.locks.len()
    }

    /// Returns the latch guarding the bytes of `frame_id`.
    pub fn frame_lock(&self, frame_id: FrameId) -> &RwLock<()> {
        &self.locks[frame_id]
    }

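    /// # Safety
    ///
    /// `frame_id` must be less than `capacity()`, and the caller must hold at
    /// least a read lock on `frame_lock(frame_id)` for as long as the returned
    /// slice is alive, so no writer can mutate the same frame concurrently.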
    pub unsafe fn frame_slice(&self, frame_id: FrameId) -> &[u8] {
        let ptr = self.frame_ptr(frame_id) as *const u8;
        std::slice::from_raw_parts(ptr, PAGE_SIZE)
    }

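    /// # Safety
    ///
    /// `frame_id` must be less than `capacity()`, and the caller must hold the
    /// write lock on `frame_lock(frame_id)` for the lifetime of the returned
    /// slice, so the mutable access is exclusive.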
    #[allow(clippy::mut_from_ref)]
    pub unsafe fn frame_slice_mut(&self, frame_id: FrameId) -> &mut [u8] {
        let ptr = self.frame_ptr(frame_id);
        std::slice::from_raw_parts_mut(ptr, PAGE_SIZE)
    }

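    /// # Safety
    ///
    /// `frame_id` must be less than `capacity()`; otherwise the pointer
    /// arithmetic walks past the end of the arena.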
    unsafe fn frame_ptr(&self, frame_id: FrameId) -> *mut u8 {
        self.arena.as_ptr().add(frame_id * PAGE_SIZE) as *mut u8
    }

    /// Locks and returns the metadata of `frame_id`.
    pub fn frame_meta(&self, frame_id: FrameId) -> MutexGuard<'_, FrameMeta> {
        self.meta[frame_id].lock()
    }

    pub fn clear_frame_meta(&self, frame_id: FrameId) {
        let mut meta = self.meta[frame_id].lock();
        *meta = FrameMeta::default();
    }

    pub fn pop_free_frame(&self) -> Option<FrameId> {
        self.free_list.lock().pop_front()
    }

    pub fn has_free_frame(&self) -> bool {
        !self.free_list.lock().is_empty()
    }

    pub fn push_free_frame(&self, frame_id: FrameId) {
        self.free_list.lock().push_back(frame_id);
    }

    pub fn insert_mapping(&self, page_id: PageId, frame_id: FrameId) {
        self.page_table.insert(page_id, frame_id);
    }

    /// Removes the mapping only if `page_id` still points at `frame_id`;
    /// returns whether a mapping was removed.
    pub fn remove_mapping_if(&self, page_id: PageId, frame_id: FrameId) -> bool {
        self.page_table
            .remove_if(&page_id, |_, current| *current == frame_id)
            .is_some()
    }

    pub fn remove_mapping(&self, page_id: PageId) {
        self.page_table.remove(&page_id);
    }

    pub fn lookup_frame(&self, page_id: PageId) -> Option<FrameId> {
        self.page_table.get(&page_id).map(|entry| *entry.value())
    }

    /// Reads `page_id` from disk, blocking until the scheduler completes the request.
    pub fn read_page_from_disk(&self, page_id: PageId) -> QuillSQLResult<Vec<u8>> {
        let rx = self.disk_scheduler.schedule_read(page_id)?;
        let data = rx
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
        Ok(data.to_vec())
    }

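    /// Synchronously reads `page_id` into `frame_id`, zero-filling any tail if
    /// the on-disk image is shorter than `PAGE_SIZE`, and resets the frame's
    /// metadata to an unpinned, clean state.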
    pub fn load_page_into_frame(&self, page_id: PageId, frame_id: FrameId) -> QuillSQLResult<()> {
        let page_bytes = self.read_page_from_disk(page_id)?;
        let slice = unsafe { self.frame_slice_mut(frame_id) };
        let len = PAGE_SIZE.min(page_bytes.len());
        slice[..len].copy_from_slice(&page_bytes[..len]);
        if len < PAGE_SIZE {
            slice[len..].fill(0);
        }
        let mut meta = self.meta[frame_id].lock();
        meta.page_id = page_id;
        meta.is_dirty = false;
        meta.pin_count = 0;
        meta.lsn = 0;
        Ok(())
    }

    /// Writes `bytes` to `page_id`, blocking until the scheduler acknowledges the write.
    pub fn write_page_to_disk(&self, page_id: PageId, bytes: Bytes) -> QuillSQLResult<()> {
        self.disk_scheduler
            .schedule_write(page_id, bytes)?
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
        Ok(())
    }

    /// Allocates a fresh page id on disk, rejecting `INVALID_PAGE_ID`.
    pub fn allocate_page_id(&self) -> QuillSQLResult<PageId> {
        let rx = self.disk_scheduler.schedule_allocate()?;
        let page_id = rx
            .recv()
            .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
        if page_id == INVALID_PAGE_ID {
            return Err(QuillSQLError::Internal(
                "DiskScheduler returned INVALID_PAGE_ID".to_string(),
            ));
        }
        Ok(page_id)
    }

    pub fn disk_scheduler(&self) -> Arc<DiskScheduler> {
        self.disk_scheduler.clone()
    }

    /// Zeroes the page bytes of `frame_id`; metadata is cleared separately via
    /// `clear_frame_meta`.
    pub fn reset_frame(&self, frame_id: FrameId) {
        unsafe {
            self.frame_slice_mut(frame_id).fill(0);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::disk_manager::DiskManager;
    use tempfile::TempDir;

    fn setup_pool(num_pages: usize) -> (TempDir, Arc<DiskScheduler>, BufferPool) {
        let temp_dir = TempDir::new().unwrap();
        let disk_manager = Arc::new(DiskManager::try_new(temp_dir.path().join("pool.db")).unwrap());
        let scheduler = Arc::new(DiskScheduler::new(disk_manager));
        let config = BufferPoolConfig {
            buffer_pool_size: num_pages,
            ..Default::default()
        };
        let pool = BufferPool::new_with_config(config, scheduler.clone());
        (temp_dir, scheduler, pool)
    }

    #[test]
    fn load_page_into_frame_populates_arena_and_meta() {
        let (_tmp, scheduler, pool) = setup_pool(4);
        let rx_alloc = scheduler.schedule_allocate().unwrap();
        let page_id = rx_alloc.recv().unwrap().unwrap();

        let pattern = Bytes::from(vec![0xAA; PAGE_SIZE]);
        scheduler
            .schedule_write(page_id, pattern.clone())
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();

        let frame_id = pool.pop_free_frame().expect("free frame");
        pool.load_page_into_frame(page_id, frame_id).unwrap();

        {
            let meta = pool.frame_meta(frame_id);
            assert_eq!(meta.page_id, page_id);
            assert_eq!(meta.pin_count, 0);
            assert!(!meta.is_dirty);
            assert_eq!(meta.lsn, 0);
        }

        let data = unsafe { pool.frame_slice(frame_id) };
        assert_eq!(data, pattern.as_ref());
    }

    #[test]
    fn write_page_to_disk_persists_arena_bytes() {
        let (_tmp, scheduler, pool) = setup_pool(4);
        let page_id = scheduler
            .schedule_allocate()
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();

        let frame_id = pool.pop_free_frame().expect("free frame");
        unsafe {
            pool.frame_slice_mut(frame_id).fill(0x3C);
        }
        let payload = Bytes::copy_from_slice(unsafe { pool.frame_slice(frame_id) });
        pool.write_page_to_disk(page_id, payload.clone()).unwrap();

        let read_back = scheduler
            .schedule_read(page_id)
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();
        assert!(read_back.iter().all(|b| *b == 0x3C));
    }

    #[test]
    fn reset_frame_clears_data_and_meta() {
        let (_tmp, scheduler, pool) = setup_pool(2);
        let page_id = scheduler
            .schedule_allocate()
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();
        let frame_id = pool.pop_free_frame().expect("free frame");

        {
            let mut meta = pool.frame_meta(frame_id);
            meta.page_id = page_id;
            meta.pin_count = 5;
            meta.is_dirty = true;
            meta.lsn = 99;
        }
        unsafe {
            pool.frame_slice_mut(frame_id).fill(0x55);
        }

        pool.reset_frame(frame_id);
        pool.clear_frame_meta(frame_id);

        let meta = pool.frame_meta(frame_id);
        assert_eq!(meta.page_id, INVALID_PAGE_ID);
        assert_eq!(meta.pin_count, 0);
        assert!(!meta.is_dirty);
        assert_eq!(meta.lsn, 0);
        drop(meta);

        assert!(unsafe { pool.frame_slice(frame_id) }
            .iter()
            .all(|b| *b == 0));
    }

    #[test]
    fn page_table_insert_lookup_and_remove() {
        let (_tmp, scheduler, pool) = setup_pool(2);
        let page_id = scheduler
            .schedule_allocate()
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();
        let frame_id = 0;

        pool.insert_mapping(page_id, frame_id);
        assert_eq!(pool.lookup_frame(page_id), Some(frame_id));
        assert!(pool.remove_mapping_if(page_id, frame_id));
        assert!(pool.lookup_frame(page_id).is_none());

        pool.insert_mapping(page_id, frame_id);
        pool.remove_mapping(page_id);
        assert!(pool.lookup_frame(page_id).is_none());
    }
}