1use crate::page::{Page, PageType};
17use aegis_common::{AegisError, PageId, Result};
18use parking_lot::{Mutex, RwLock};
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21
22#[derive(Debug, Clone)]
28pub struct BufferPoolConfig {
29 pub pool_size: usize,
30 pub prefetch_size: usize,
31}
32
33impl Default for BufferPoolConfig {
34 fn default() -> Self {
35 Self {
36 pool_size: 1024,
37 prefetch_size: 8,
38 }
39 }
40}
41
42struct BufferFrame {
48 page: RwLock<Option<Page>>,
49 page_id: RwLock<Option<PageId>>,
50}
51
52impl BufferFrame {
53 fn new() -> Self {
54 Self {
55 page: RwLock::new(None),
56 page_id: RwLock::new(None),
57 }
58 }
59
60 #[allow(dead_code)]
61 fn is_occupied(&self) -> bool {
62 self.page_id.read().is_some()
63 }
64
65 fn get_page_id(&self) -> Option<PageId> {
66 *self.page_id.read()
67 }
68}
69
70pub struct BufferPool {
76 frames: Vec<Arc<BufferFrame>>,
77 page_table: RwLock<HashMap<PageId, usize>>,
78 free_list: Mutex<VecDeque<usize>>,
79 lru_list: Mutex<VecDeque<usize>>,
80 config: BufferPoolConfig,
81}
82
83impl BufferPool {
84 pub fn new(config: BufferPoolConfig) -> Self {
86 let mut frames = Vec::with_capacity(config.pool_size);
87 let mut free_list = VecDeque::with_capacity(config.pool_size);
88
89 for i in 0..config.pool_size {
90 frames.push(Arc::new(BufferFrame::new()));
91 free_list.push_back(i);
92 }
93
94 Self {
95 frames,
96 page_table: RwLock::new(HashMap::new()),
97 free_list: Mutex::new(free_list),
98 lru_list: Mutex::new(VecDeque::new()),
99 config,
100 }
101 }
102
103 pub fn with_capacity(num_pages: usize) -> Self {
105 Self::new(BufferPoolConfig {
106 pool_size: num_pages,
107 ..Default::default()
108 })
109 }
110
111 pub fn fetch_page(&self, page_id: PageId) -> Result<PageHandle> {
113 if let Some(&frame_id) = self.page_table.read().get(&page_id) {
114 let frame = &self.frames[frame_id];
115 if let Some(ref page) = *frame.page.read() {
116 page.pin();
117 self.update_lru(frame_id);
118 return Ok(PageHandle {
119 frame: Arc::clone(frame),
120 page_id,
121 });
122 }
123 }
124
125 let frame_id = self.get_free_frame()?;
126 let frame = &self.frames[frame_id];
127
128 let page = Page::new(page_id, PageType::Data);
129 page.pin();
130
131 *frame.page.write() = Some(page);
132 *frame.page_id.write() = Some(page_id);
133
134 self.page_table.write().insert(page_id, frame_id);
135 self.update_lru(frame_id);
136
137 Ok(PageHandle {
138 frame: Arc::clone(frame),
139 page_id,
140 })
141 }
142
143 pub fn new_page(&self, page_id: PageId, page_type: PageType) -> Result<PageHandle> {
145 if self.page_table.read().contains_key(&page_id) {
146 return Err(AegisError::Storage(format!(
147 "Page {} already exists",
148 page_id.0
149 )));
150 }
151
152 let frame_id = self.get_free_frame()?;
153 let frame = &self.frames[frame_id];
154
155 let page = Page::new(page_id, page_type);
156 page.pin();
157
158 *frame.page.write() = Some(page);
159 *frame.page_id.write() = Some(page_id);
160
161 self.page_table.write().insert(page_id, frame_id);
162 self.update_lru(frame_id);
163
164 Ok(PageHandle {
165 frame: Arc::clone(frame),
166 page_id,
167 })
168 }
169
170 pub fn flush_page(&self, page_id: PageId) -> Result<Option<Page>> {
172 if let Some(&frame_id) = self.page_table.read().get(&page_id) {
173 let frame = &self.frames[frame_id];
174 let page_guard = frame.page.read();
175
176 if let Some(ref page) = *page_guard {
177 if page.is_dirty() {
178 let cloned = Page::from_bytes(&page.to_bytes())?;
179 page.clear_dirty();
180 return Ok(Some(cloned));
181 }
182 }
183 }
184 Ok(None)
185 }
186
187 pub fn unpin_page(&self, page_id: PageId) {
189 if let Some(&frame_id) = self.page_table.read().get(&page_id) {
190 let frame = &self.frames[frame_id];
191 if let Some(ref page) = *frame.page.read() {
192 page.unpin();
193 }
194 }
195 }
196
197 pub fn stats(&self) -> BufferPoolStats {
199 let page_table = self.page_table.read();
200 let free_count = self.free_list.lock().len();
201
202 let mut dirty_count = 0;
203 let mut pinned_count = 0;
204
205 for &frame_id in page_table.values() {
206 let frame = &self.frames[frame_id];
207 if let Some(ref page) = *frame.page.read() {
208 if page.is_dirty() {
209 dirty_count += 1;
210 }
211 if page.is_pinned() {
212 pinned_count += 1;
213 }
214 }
215 }
216
217 BufferPoolStats {
218 total_frames: self.config.pool_size,
219 used_frames: page_table.len(),
220 free_frames: free_count,
221 dirty_pages: dirty_count,
222 pinned_pages: pinned_count,
223 }
224 }
225
226 fn get_free_frame(&self) -> Result<usize> {
227 if let Some(frame_id) = self.free_list.lock().pop_front() {
228 return Ok(frame_id);
229 }
230
231 self.evict_page()
232 }
233
234 fn evict_page(&self) -> Result<usize> {
235 let mut lru_list = self.lru_list.lock();
236
237 for _ in 0..lru_list.len() {
238 if let Some(frame_id) = lru_list.pop_front() {
239 let frame = &self.frames[frame_id];
240
241 if let Some(ref page) = *frame.page.read() {
242 if page.is_pinned() {
243 lru_list.push_back(frame_id);
244 continue;
245 }
246 }
247
248 if let Some(page_id) = frame.get_page_id() {
249 self.page_table.write().remove(&page_id);
250 }
251
252 *frame.page.write() = None;
253 *frame.page_id.write() = None;
254
255 return Ok(frame_id);
256 }
257 }
258
259 Err(AegisError::ResourceExhausted(
260 "No evictable pages in buffer pool".to_string(),
261 ))
262 }
263
264 fn update_lru(&self, frame_id: usize) {
265 let mut lru_list = self.lru_list.lock();
266 lru_list.retain(|&id| id != frame_id);
267 lru_list.push_back(frame_id);
268 }
269}
270
271pub struct PageHandle {
277 frame: Arc<BufferFrame>,
278 page_id: PageId,
279}
280
281impl PageHandle {
282 pub fn read(&self) -> impl std::ops::Deref<Target = Page> + '_ {
284 parking_lot::RwLockReadGuard::map(self.frame.page.read(), |opt| {
285 opt.as_ref().expect("Page should be present")
286 })
287 }
288
289 pub fn write(&self) -> impl std::ops::DerefMut<Target = Page> + '_ {
291 parking_lot::RwLockWriteGuard::map(self.frame.page.write(), |opt| {
292 opt.as_mut().expect("Page should be present")
293 })
294 }
295
296 pub fn page_id(&self) -> PageId {
298 self.page_id
299 }
300}
301
302impl Drop for PageHandle {
303 fn drop(&mut self) {
304 if let Some(ref page) = *self.frame.page.read() {
305 page.unpin();
306 }
307 }
308}
309
310#[derive(Debug, Clone)]
316pub struct BufferPoolStats {
317 pub total_frames: usize,
318 pub used_frames: usize,
319 pub free_frames: usize,
320 pub dirty_pages: usize,
321 pub pinned_pages: usize,
322}
323
324#[cfg(test)]
329mod tests {
330 use super::*;
331
332 #[test]
333 fn test_buffer_pool_new_page() {
334 let pool = BufferPool::with_capacity(10);
335 let handle = pool
336 .new_page(PageId(1), PageType::Data)
337 .expect("new_page should succeed");
338
339 assert_eq!(handle.page_id(), PageId(1));
340 }
341
342 #[test]
343 fn test_buffer_pool_fetch_page() {
344 let pool = BufferPool::with_capacity(10);
345
346 pool.new_page(PageId(1), PageType::Data)
347 .expect("new_page should succeed");
348
349 let handle = pool
350 .fetch_page(PageId(1))
351 .expect("fetch_page should succeed");
352 assert_eq!(handle.page_id(), PageId(1));
353 }
354
355 #[test]
356 fn test_buffer_pool_eviction() {
357 let pool = BufferPool::with_capacity(3);
358
359 let _h1 = pool
360 .new_page(PageId(1), PageType::Data)
361 .expect("new_page 1 should succeed");
362 let _h2 = pool
363 .new_page(PageId(2), PageType::Data)
364 .expect("new_page 2 should succeed");
365 let _h3 = pool
366 .new_page(PageId(3), PageType::Data)
367 .expect("new_page 3 should succeed");
368
369 drop(_h1);
370 drop(_h2);
371 drop(_h3);
372
373 let _h4 = pool
374 .new_page(PageId(4), PageType::Data)
375 .expect("new_page 4 after eviction should succeed");
376
377 let stats = pool.stats();
378 assert_eq!(stats.used_frames, 3);
379 }
380
381 #[test]
382 fn test_buffer_pool_stats() {
383 let pool = BufferPool::with_capacity(10);
384
385 let stats = pool.stats();
386 assert_eq!(stats.total_frames, 10);
387 assert_eq!(stats.used_frames, 0);
388 assert_eq!(stats.free_frames, 10);
389
390 let _h1 = pool
391 .new_page(PageId(1), PageType::Data)
392 .expect("new_page 1 should succeed");
393 let _h2 = pool
394 .new_page(PageId(2), PageType::Data)
395 .expect("new_page 2 should succeed");
396
397 let stats = pool.stats();
398 assert_eq!(stats.used_frames, 2);
399 }
400}