1use dashmap::DashMap;
2use kyu_common::{KyuError, KyuResult};
3
4use crate::page_id::{FileId, FrameIdx, PageId, PoolId};
5use crate::page_store::PageStore;
6use crate::pool::Pool;
7
8pub struct BufferManager {
16 read_pool: Pool,
17 write_pool: Pool,
18 page_table: DashMap<PageId, (PoolId, FrameIdx)>,
20 store: Box<dyn PageStore>,
21}
22
23impl BufferManager {
24 pub fn new(total_frames: u32, read_ratio: f64, store: Box<dyn PageStore>) -> Self {
26 let read_frames = ((total_frames as f64) * read_ratio).round() as u32;
27 let write_frames = total_frames.saturating_sub(read_frames).max(1);
28 let read_frames = read_frames.max(1);
29
30 Self {
31 read_pool: Pool::new(read_frames),
32 write_pool: Pool::new(write_frames),
33 page_table: DashMap::new(),
34 store,
35 }
36 }
37
38 pub fn pin_read(&self, page_id: PageId) -> KyuResult<PinnedPage<'_>> {
40 self.pin_page(page_id, PoolId::Read)
41 }
42
43 pub fn pin_write(&self, page_id: PageId) -> KyuResult<PinnedPage<'_>> {
45 self.pin_page(page_id, PoolId::Write)
46 }
47
48 pub fn allocate_new_page(&self, file_id: FileId) -> KyuResult<(PageId, PinnedPage<'_>)> {
50 let page_idx = self.store.allocate_page(file_id)?;
51 let page_id = PageId::new(file_id, page_idx);
52 let pinned = self.pin_page(page_id, PoolId::Write)?;
53 Ok((page_id, pinned))
54 }
55
56 pub fn flush_all(&self) -> KyuResult<()> {
58 self.flush_pool(&self.read_pool)?;
59 self.flush_pool(&self.write_pool)?;
60 Ok(())
61 }
62
63 fn flush_pool(&self, pool: &Pool) -> KyuResult<()> {
65 for i in 0..pool.num_frames() {
66 let frame = pool.frame(FrameIdx(i));
67 if frame.is_dirty() && frame.has_valid_page() {
68 let data = unsafe { frame.data() };
71 self.store.write_page(frame.page_id(), data)?;
72 frame.clear_dirty();
73 }
74 }
75 Ok(())
76 }
77
78 fn pin_page(&self, page_id: PageId, preferred_pool: PoolId) -> KyuResult<PinnedPage<'_>> {
80 if let Some(entry) = self.page_table.get(&page_id) {
82 let (pool_id, frame_idx) = *entry;
83 let pool = self.get_pool(pool_id);
84 let frame = pool.frame(frame_idx);
85 frame.pin();
86 frame.set_recently_used();
87 return Ok(PinnedPage {
88 bm: self,
89 page_id,
90 pool_id,
91 frame_idx,
92 });
93 }
94
95 let pool = self.get_pool(preferred_pool);
97 let frame_idx = self.find_or_evict_frame(pool, preferred_pool)?;
98 let frame = pool.frame(frame_idx);
99
100 let data = unsafe { frame.data_mut() };
103 self.store.read_page(page_id, data)?;
104
105 frame.set_page_id(page_id);
107 frame.pin();
108 frame.set_recently_used();
109 frame.clear_dirty();
110
111 self.page_table.insert(page_id, (preferred_pool, frame_idx));
113
114 Ok(PinnedPage {
115 bm: self,
116 page_id,
117 pool_id: preferred_pool,
118 frame_idx,
119 })
120 }
121
122 fn find_or_evict_frame(&self, pool: &Pool, _pool_id: PoolId) -> KyuResult<FrameIdx> {
124 if let Some(idx) = pool.find_empty() {
126 return Ok(idx);
127 }
128
129 let idx = pool.find_evictable().ok_or_else(|| {
131 KyuError::Storage("buffer pool exhausted: all frames are pinned".into())
132 })?;
133
134 let frame = pool.frame(idx);
135
136 if frame.is_dirty() && frame.has_valid_page() {
138 let data = unsafe { frame.data() };
140 self.store.write_page(frame.page_id(), data)?;
141 }
142
143 if frame.has_valid_page() {
145 self.page_table.remove(&frame.page_id());
146 }
147
148 frame.reset();
149 Ok(idx)
150 }
151
152 fn get_pool(&self, pool_id: PoolId) -> &Pool {
153 match pool_id {
154 PoolId::Read => &self.read_pool,
155 PoolId::Write => &self.write_pool,
156 }
157 }
158
159 pub fn total_frames(&self) -> u32 {
161 self.read_pool.num_frames() + self.write_pool.num_frames()
162 }
163
164 pub fn stats(&self) -> BufferManagerStats {
166 BufferManagerStats {
167 read_pool_frames: self.read_pool.num_frames(),
168 write_pool_frames: self.write_pool.num_frames(),
169 read_pool_loaded: self.read_pool.loaded_count(),
170 write_pool_loaded: self.write_pool.loaded_count(),
171 read_pool_dirty: self.read_pool.dirty_count(),
172 write_pool_dirty: self.write_pool.dirty_count(),
173 read_pool_pinned: self.read_pool.pinned_count(),
174 write_pool_pinned: self.write_pool.pinned_count(),
175 page_table_entries: self.page_table.len() as u32,
176 }
177 }
178}
179
/// Counters reported by [`BufferManager::stats`].
#[derive(Clone, Debug)]
pub struct BufferManagerStats {
    /// Number of frames in the read pool.
    pub read_pool_frames: u32,
    /// Number of frames in the write pool.
    pub write_pool_frames: u32,
    /// Loaded-frame count reported by the read pool.
    pub read_pool_loaded: u32,
    /// Loaded-frame count reported by the write pool.
    pub write_pool_loaded: u32,
    /// Dirty-frame count reported by the read pool.
    pub read_pool_dirty: u32,
    /// Dirty-frame count reported by the write pool.
    pub write_pool_dirty: u32,
    /// Pinned-frame count reported by the read pool.
    pub read_pool_pinned: u32,
    /// Pinned-frame count reported by the write pool.
    pub write_pool_pinned: u32,
    /// Number of entries in the page table.
    pub page_table_entries: u32,
}
193
194pub struct PinnedPage<'bm> {
198 bm: &'bm BufferManager,
199 page_id: PageId,
200 pool_id: PoolId,
201 frame_idx: FrameIdx,
202}
203
204impl<'bm> PinnedPage<'bm> {
205 pub fn page_id(&self) -> PageId {
207 self.page_id
208 }
209
210 pub fn data(&self) -> &[u8] {
212 let pool = self.bm.get_pool(self.pool_id);
213 let frame = pool.frame(self.frame_idx);
214 unsafe { frame.data() }
217 }
218
219 pub fn data_mut(&mut self) -> &mut [u8] {
221 let pool = self.bm.get_pool(self.pool_id);
222 let frame = pool.frame(self.frame_idx);
223 frame.set_dirty();
224 unsafe { frame.data_mut() }
226 }
227
228 pub fn is_dirty(&self) -> bool {
230 let pool = self.bm.get_pool(self.pool_id);
231 pool.frame(self.frame_idx).is_dirty()
232 }
233
234 pub fn mark_dirty(&self) {
236 let pool = self.bm.get_pool(self.pool_id);
237 pool.frame(self.frame_idx).set_dirty();
238 }
239}
240
241impl Drop for PinnedPage<'_> {
242 fn drop(&mut self) {
243 let pool = self.bm.get_pool(self.pool_id);
244 pool.frame(self.frame_idx).unpin();
245 }
246}
247
#[cfg(test)]
mod tests {
    use super::*;
    use crate::page_id::PAGE_SIZE;
    use crate::page_store::MockPageStore;

    /// Builds a buffer manager over a mock store with a 70 % read pool.
    fn make_bm(frames: u32) -> BufferManager {
        BufferManager::new(frames, 0.7, Box::new(MockPageStore::new()))
    }

    #[test]
    fn create_buffer_manager() {
        let bm = make_bm(10);
        assert_eq!(bm.total_frames(), 10);
        let stats = bm.stats();
        assert_eq!(stats.read_pool_frames, 7);
        assert_eq!(stats.write_pool_frames, 3);
    }

    #[test]
    fn pin_and_unpin_read() {
        let bm = make_bm(10);
        let pid = PageId::new(FileId(0), 0);
        {
            let page = bm.pin_read(pid).unwrap();
            assert_eq!(page.page_id(), pid);
            assert_eq!(page.data().len(), PAGE_SIZE);
            assert!(page.data().iter().all(|&b| b == 0));
        }
        // The handle went out of scope, so nothing is pinned any more.
        let stats = bm.stats();
        assert_eq!(stats.read_pool_pinned, 0);
    }

    #[test]
    fn pin_write_marks_dirty() {
        let bm = make_bm(10);
        let pid = PageId::new(FileId(0), 0);
        {
            let mut page = bm.pin_write(pid).unwrap();
            assert!(!page.is_dirty());
            page.data_mut()[0] = 42;
            assert!(page.is_dirty());
        }
    }

    #[test]
    fn pin_same_page_twice() {
        let bm = make_bm(10);
        let pid = PageId::new(FileId(0), 0);
        let p1 = bm.pin_read(pid).unwrap();
        let p2 = bm.pin_read(pid).unwrap();
        assert_eq!(p1.page_id(), p2.page_id());
        // The second pin must reuse the resident page, not load a copy.
        assert_eq!(bm.stats().page_table_entries, 1);
        drop(p1);
        drop(p2);
        assert_eq!(bm.stats().read_pool_pinned, 0);
    }

    #[test]
    fn write_and_read_back() {
        let bm = make_bm(10);
        let pid = PageId::new(FileId(0), 0);

        {
            let mut page = bm.pin_write(pid).unwrap();
            page.data_mut()[0] = 0xAB;
            page.data_mut()[1] = 0xCD;
        }

        {
            let page = bm.pin_read(pid).unwrap();
            assert_eq!(page.data()[0], 0xAB);
            assert_eq!(page.data()[1], 0xCD);
        }
    }

    #[test]
    fn allocate_new_page() {
        let bm = make_bm(10);
        let (pid, mut page) = bm.allocate_new_page(FileId(0)).unwrap();
        assert_eq!(pid.file_id, FileId(0));
        assert_eq!(pid.page_idx, 0);
        page.data_mut()[0] = 99;
        drop(page);

        let (pid2, _page2) = bm.allocate_new_page(FileId(0)).unwrap();
        assert_eq!(pid2.page_idx, 1);
    }

    #[test]
    fn flush_all() {
        let store = MockPageStore::new();
        let bm = BufferManager::new(10, 0.7, Box::new(store));
        let pid = PageId::new(FileId(0), 0);

        {
            let mut page = bm.pin_write(pid).unwrap();
            page.data_mut()[0] = 0xFF;
        }

        bm.flush_all().unwrap();

        let stats = bm.stats();
        assert_eq!(stats.write_pool_dirty, 0);
    }

    #[test]
    fn eviction_on_full_pool() {
        // 4 frames at a 0.7 ratio give a 3-frame read pool.
        let bm = make_bm(4);

        for i in 0..3 {
            let pid = PageId::new(FileId(0), i);
            let page = bm.pin_read(pid).unwrap();
            drop(page);
        }

        assert_eq!(bm.stats().read_pool_loaded, 3);

        // The read pool is full, so this pin has to evict a page.
        let pid = PageId::new(FileId(0), 3);
        let page = bm.pin_read(pid).unwrap();
        assert_eq!(page.data().len(), PAGE_SIZE);
        drop(page);
    }

    #[test]
    fn eviction_writes_dirty_page() {
        // 4 frames at a 0.75 ratio give a 3-frame read pool.
        let bm = BufferManager::new(4, 0.75, Box::new(MockPageStore::new()));

        for i in 0..3 {
            let pid = PageId::new(FileId(0), i);
            let page = bm.pin_read(pid).unwrap();
            page.mark_dirty();
            drop(page);
        }
        assert_eq!(bm.stats().read_pool_dirty, 3);

        // Every read frame holds a dirty page, so this pin evicts one of them.
        let pid = PageId::new(FileId(0), 10);
        let _page = bm.pin_read(pid).unwrap();

        let stats = bm.stats();
        // The victim was replaced by a clean page; the other two stay dirty.
        assert_eq!(stats.read_pool_dirty, 2);
        assert_eq!(stats.read_pool_loaded, 3);
        assert_eq!(stats.read_pool_pinned, 1);
        // One mapping was removed for the victim and one added for the new page.
        assert_eq!(stats.page_table_entries, 3);
    }

    #[test]
    fn buffer_pool_exhausted() {
        // 2 frames at a 0.7 ratio give a single-frame read pool.
        let bm = make_bm(2);

        let p1 = bm.pin_read(PageId::new(FileId(0), 0)).unwrap();

        // The only read frame is pinned, so nothing can be evicted.
        let result = bm.pin_read(PageId::new(FileId(0), 1));
        assert!(result.is_err());

        drop(p1);
    }

    #[test]
    fn stats() {
        let bm = make_bm(10);
        let stats = bm.stats();
        assert_eq!(stats.read_pool_loaded, 0);
        assert_eq!(stats.write_pool_loaded, 0);
        assert_eq!(stats.page_table_entries, 0);

        let _p = bm.pin_read(PageId::new(FileId(0), 0)).unwrap();
        let stats = bm.stats();
        assert_eq!(stats.read_pool_loaded, 1);
        assert_eq!(stats.read_pool_pinned, 1);
        assert_eq!(stats.page_table_entries, 1);
    }

    #[test]
    fn split_pool_ratio() {
        let bm = BufferManager::new(100, 0.7, Box::new(MockPageStore::new()));
        let stats = bm.stats();
        assert_eq!(stats.read_pool_frames, 70);
        assert_eq!(stats.write_pool_frames, 30);
    }

    #[test]
    fn minimum_pool_sizes() {
        // Even with a zero read ratio, both pools must keep at least one frame.
        let bm = BufferManager::new(2, 0.0, Box::new(MockPageStore::new()));
        assert!(bm.read_pool.num_frames() >= 1);
        assert!(bm.write_pool.num_frames() >= 1);
    }
}