use bytes::Bytes;
use dashmap::{DashMap, DashSet};
use parking_lot::{Mutex, RwLock};
use std::sync::atomic::Ordering;
use std::{collections::VecDeque, sync::Arc};

use crate::buffer::page::{
    self, Page, PageId, ReadPageGuard, WritePageGuard, INVALID_PAGE_ID, PAGE_SIZE,
};

use crate::catalog::SchemaRef;
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::storage::codec::{
    BPlusTreeHeaderPageCodec, BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec,
    BPlusTreePageCodec, TablePageCodec,
};
use crate::storage::disk_scheduler::DiskScheduler;
use crate::storage::page::{
    BPlusTreeHeaderPage, BPlusTreeInternalPage, BPlusTreeLeafPage, BPlusTreePage, TablePage,
};

use crate::config::BufferPoolConfig;
use crate::recovery::{Lsn, WalManager};
use crate::utils::cache::lru_k::LRUKReplacer;
use crate::utils::cache::tiny_lfu::TinyLFU;
use crate::utils::cache::Replacer;

pub type FrameId = usize;

pub const BUFFER_POOL_SIZE: usize = 5000;

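/// Central page cache for the storage engine. It owns the frame pool and
/// coordinates pinning, LRU-K eviction (with optional TinyLFU admission),
/// dirty-page tracking, and WAL-ordered flushing.
///
/// A minimal usage sketch, assuming the crate's usual `DiskManager` /
/// `DiskScheduler` wiring (as in the tests below) and with error handling
/// elided:
///
/// ```ignore
/// let disk_manager = Arc::new(DiskManager::try_new(db_path)?);
/// let scheduler = Arc::new(DiskScheduler::new(disk_manager));
/// let bpm = Arc::new(BufferPoolManager::new(64, scheduler));
///
/// let mut page = bpm.new_page()?;        // pinned for writing
/// let pid = page.page_id();
/// page.data[0] = 1;                      // the write guard marks the page dirty
/// drop(page);                            // unpin
/// let back = bpm.fetch_page_read(pid)?;  // shared re-fetch
/// ```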
#[derive(Debug)]
pub struct BufferPoolManager {
    pub pool: Vec<Arc<RwLock<Page>>>,
    pub replacer: Arc<RwLock<LRUKReplacer>>,
    pub disk_scheduler: Arc<DiskScheduler>,
    pub page_table: Arc<DashMap<PageId, FrameId>>,
    pub free_list: Arc<RwLock<VecDeque<FrameId>>>,
    /// Per-page mutexes that deduplicate concurrent loads of the same page.
    pub inflight_loads: Arc<DashMap<PageId, Arc<Mutex<()>>>>,
    /// Optional TinyLFU admission filter for cold-page admission decisions.
    pub tiny_lfu: Option<Arc<RwLock<TinyLFU>>>,
    /// Pages with in-memory changes not yet flushed to disk.
    pub dirty_pages: Arc<DashSet<PageId>>,
    /// Maps each dirty page to the LSN of its first unflushed change.
    pub dirty_page_table: DashMap<PageId, Lsn>,
    pub wal_manager: Arc<RwLock<Option<Arc<WalManager>>>>,
}

impl BufferPoolManager {
    #[inline]
    fn replacer_set_evictable(&self, frame_id: FrameId, evictable: bool) -> QuillSQLResult<()> {
        let mut rep = self.replacer.write();
        rep.set_evictable(frame_id, evictable)
            .map_err(|e| QuillSQLError::Internal(format!("replacer set_evictable failed: {}", e)))
    }

    #[inline]
    fn replacer_record_access(&self, frame_id: FrameId) -> QuillSQLResult<()> {
        let mut rep = self.replacer.write();
        rep.record_access(frame_id)
            .map_err(|e| QuillSQLError::Internal(format!("replacer record_access failed: {}", e)))
    }

    #[inline]
    fn replacer_touch_and_set(&self, frame_id: FrameId, evictable: bool) -> QuillSQLResult<()> {
        let mut rep = self.replacer.write();
        rep.record_access(frame_id).map_err(|e| {
            QuillSQLError::Internal(format!("replacer record_access failed: {}", e))
        })?;
        rep.set_evictable(frame_id, evictable)
            .map_err(|e| QuillSQLError::Internal(format!("replacer set_evictable failed: {}", e)))
    }

    pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
        Self::new_with_config(
            BufferPoolConfig {
                buffer_pool_size: num_pages,
                ..Default::default()
            },
            disk_scheduler,
        )
    }

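    /// Builds a pool from an explicit [`BufferPoolConfig`].
    ///
    /// A sketch of config-driven construction; only the fields used below
    /// (`buffer_pool_size`, `tiny_lfu_enable`, `tiny_lfu_counters`) are
    /// assumed, everything else comes from `Default`, and `scheduler` is a
    /// pre-built `Arc<DiskScheduler>`:
    ///
    /// ```ignore
    /// let config = BufferPoolConfig {
    ///     buffer_pool_size: 1024,
    ///     tiny_lfu_enable: true,
    ///     ..Default::default()
    /// };
    /// let bpm = Arc::new(BufferPoolManager::new_with_config(config, scheduler));
    /// ```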
    pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
        let num_pages = config.buffer_pool_size;
        let mut free_list = VecDeque::with_capacity(num_pages);
        let mut pool = Vec::with_capacity(num_pages);
        for i in 0..num_pages {
            free_list.push_back(i);
            pool.push(Arc::new(RwLock::new(Page::empty())));
        }

        Self {
            pool,
            replacer: Arc::new(RwLock::new(LRUKReplacer::new(num_pages))),
            disk_scheduler,
            page_table: Arc::new(DashMap::new()),
            free_list: Arc::new(RwLock::new(free_list)),
            inflight_loads: Arc::new(DashMap::new()),
            tiny_lfu: if config.tiny_lfu_enable {
                Some(Arc::new(RwLock::new(TinyLFU::new(
                    num_pages.next_power_of_two(),
                    config.tiny_lfu_counters,
                ))))
            } else {
                None
            },
            dirty_pages: Arc::new(DashSet::new()),
            dirty_page_table: DashMap::new(),
            wal_manager: Arc::new(RwLock::new(None)),
        }
    }

    pub fn set_wal_manager(&self, wal_manager: Arc<WalManager>) {
        *self.wal_manager.write() = Some(wal_manager);
    }

    pub fn wal_manager(&self) -> Option<Arc<WalManager>> {
        self.wal_manager.read().clone()
    }

    pub fn dirty_page_ids(&self) -> Vec<PageId> {
        self.dirty_pages.iter().map(|entry| *entry.key()).collect()
    }

    pub fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)> {
        self.dirty_page_table
            .iter()
            .map(|entry| (*entry.key(), *entry.value()))
            .collect()
    }

    #[inline]
    fn note_dirty_page(&self, page_id: PageId, rec_lsn: Lsn) {
        self.dirty_pages.insert(page_id);
        self.dirty_page_table.entry(page_id).or_insert(rec_lsn);
    }

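    /// Allocates a fresh on-disk page, installs it in a frame, and returns it
    /// behind a write guard with an initial pin count of 1.
    ///
    /// Typical create-then-revisit flow (a sketch; error handling elided):
    ///
    /// ```ignore
    /// let mut guard = bpm.new_page()?;
    /// let pid = guard.page_id();
    /// guard.data[..5].copy_from_slice(b"hello");
    /// drop(guard);                        // unpin; frame becomes evictable
    /// let again = bpm.fetch_page_read(pid)?;
    /// ```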
    pub fn new_page(self: &Arc<Self>) -> QuillSQLResult<WritePageGuard> {
        if self.free_list.read().is_empty() && self.replacer.read().size() == 0 {
            return Err(QuillSQLError::Storage(
                "Cannot create new page: buffer pool is full and no page can be evicted"
                    .to_string(),
            ));
        }

        let frame_id = self.allocate_frame()?;

        let rx_alloc = self.disk_scheduler.schedule_allocate()?;
        let new_page_id = rx_alloc.recv().map_err(|e| {
            QuillSQLError::Internal(format!("Failed to receive allocated page_id: {}", e))
        })??;
        self.page_table.insert(new_page_id, frame_id);

        let page_arc = self.pool[frame_id].clone();
        {
            let mut page_writer = page_arc.write();
            *page_writer = Page::new(new_page_id);
            page_writer.pin_count.store(1, Ordering::Relaxed);
        }

        self.replacer_touch_and_set(frame_id, false)?;

        Ok(page::new_write_guard(self.clone(), page_arc))
    }

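    /// Fetches a page for shared access, pinning its frame. If the frame is
    /// recycled for another page between the page-table lookup and the latch,
    /// the lookup is retried (bounded at 128 spins).
    ///
    /// ```ignore
    /// let guard = bpm.fetch_page_read(pid)?;
    /// assert_eq!(guard.page_id(), pid);
    /// drop(guard); // releases the pin
    /// ```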
    pub fn fetch_page_read(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<ReadPageGuard> {
        if page_id == INVALID_PAGE_ID {
            return Err(QuillSQLError::Storage(
                "fetch_page_read: invalid page id".to_string(),
            ));
        }
        let mut attempts = 0usize;
        loop {
            if let Ok(frame_id) = self.get_frame_for_page(page_id) {
                let page_arc = self.pool[frame_id].clone();

                let reader = page_arc.read();
                if reader.page_id() == page_id {
                    reader.pin();
                    let _ = self.replacer_set_evictable(frame_id, false);
                    drop(reader);
                    return Ok(page::new_read_guard(self.clone(), page_arc));
                } else {
                    // The frame was recycled for another page between the
                    // table lookup and taking the latch; retry.
                    drop(reader);
                    attempts += 1;
                    if attempts > 128 {
                        break;
                    }
                    std::hint::spin_loop();
                    continue;
                }
            } else {
                attempts += 1;
                if attempts > 128 {
                    break;
                }
                std::hint::spin_loop();
                continue;
            }
        }
        Err(QuillSQLError::Internal(format!(
            "fetch_page_read: failed after {} retries",
            attempts
        )))
    }

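    /// Fetches a page for exclusive access. The pin is taken under a shared
    /// latch first; the returned [`WritePageGuard`] is what acquires write
    /// ownership of the frame.
    ///
    /// ```ignore
    /// let mut guard = bpm.fetch_page_write(pid)?;
    /// guard.data[0] = 42; // dirtiness is recorded when the guard drops
    /// ```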
    pub fn fetch_page_write(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<WritePageGuard> {
        if page_id == INVALID_PAGE_ID {
            return Err(QuillSQLError::Storage(
                "fetch_page_write: invalid page id".to_string(),
            ));
        }
        let mut attempts = 0usize;
        loop {
            if let Ok(frame_id) = self.get_frame_for_page(page_id) {
                let page_arc = self.pool[frame_id].clone();

                {
                    let reader = page_arc.read();
                    if reader.page_id() == page_id {
                        reader.pin();
                        let _ = self.replacer_set_evictable(frame_id, false);
                    } else {
                        drop(reader);
                        attempts += 1;
                        if attempts > 128 {
                            break;
                        }
                        std::hint::spin_loop();
                        continue;
                    }
                }

                return Ok(page::new_write_guard(self.clone(), page_arc));
            } else {
                attempts += 1;
                if attempts > 128 {
                    break;
                }
                std::hint::spin_loop();
                continue;
            }
        }
        Err(QuillSQLError::Internal(format!(
            "fetch_page_write: failed after {} retries",
            attempts
        )))
    }

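    /// Invoked by page guards when they drop: records dirtiness (using the
    /// recLSN hint when provided, else the page's current `page_lsn`) and,
    /// on release of the last pin, marks the frame evictable again.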
    pub fn complete_unpin(
        &self,
        page_id: PageId,
        is_dirty: bool,
        old_pin_count: u32,
        rec_lsn_hint: Option<Lsn>,
    ) -> QuillSQLResult<()> {
        if let Some(frame_id_ref) = self.page_table.get(&page_id) {
            let frame_id = *frame_id_ref;
            if is_dirty {
                if let Some(mut p) = self.pool[frame_id].try_write() {
                    p.is_dirty = true;
                }
                let lsn = rec_lsn_hint.unwrap_or_else(|| self.pool[frame_id].read().page_lsn);
                self.note_dirty_page(page_id, lsn);
            }
            if old_pin_count == 1 {
                self.replacer_set_evictable(frame_id, true)?;
            }
        }
        Ok(())
    }

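    /// Resolves `page_id` to a frame, loading it from disk on a miss.
    /// Concurrent misses on the same page collapse onto a single disk read
    /// via the per-page `inflight_loads` mutex (single-flight loading).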
    fn get_frame_for_page(&self, page_id: PageId) -> QuillSQLResult<FrameId> {
        if let Some(frame_id_ref) = self.page_table.get(&page_id) {
            let frame_id = *frame_id_ref;
            self.replacer_record_access(frame_id)?;
            if let Some(f) = &self.tiny_lfu {
                f.write().admit_record(page_id as u64);
            }
            Ok(frame_id)
        } else {
            // Single-flight: only one thread loads a given page from disk;
            // others block on the per-page mutex and re-check the page table.
            let (lock_arc, created_here) = if let Some(g) = self.inflight_loads.get(&page_id) {
                (g.clone(), false)
            } else {
                let arc = Arc::new(Mutex::new(()));
                self.inflight_loads.insert(page_id, arc.clone());
                (arc, true)
            };

            let _lock_guard = lock_arc.lock();

            // Another thread may have finished loading while we waited.
            if let Some(frame_id_ref2) = self.page_table.get(&page_id) {
                let frame_id2 = *frame_id_ref2;
                self.replacer_record_access(frame_id2)?;
                return Ok(frame_id2);
            }

            if let Some(f) = &self.tiny_lfu {
                let est = f.read().estimate(page_id as u64);
                if est == 0 && self.free_list.read().is_empty() && self.replacer.read().size() == 0
                {
                    return Err(QuillSQLError::Storage(
                        "Cannot allocate frame: admission denied and no space".to_string(),
                    ));
                }
            }

            let frame_id = match self.allocate_frame() {
                Ok(fid) => fid,
                Err(e) => {
                    if created_here {
                        self.inflight_loads.remove(&page_id);
                    }
                    return Err(e);
                }
            };

            let page_data_bytes = match self.disk_scheduler.schedule_read(page_id) {
                Ok(rx) => match rx.recv() {
                    Ok(Ok(bytes)) => bytes,
                    Ok(Err(e)) => {
                        if created_here {
                            self.inflight_loads.remove(&page_id);
                        }
                        return Err(e);
                    }
                    Err(e) => {
                        if created_here {
                            self.inflight_loads.remove(&page_id);
                        }
                        return Err(QuillSQLError::Internal(format!(
                            "Channel disconnected: {}",
                            e
                        )));
                    }
                },
                Err(e) => {
                    if created_here {
                        self.inflight_loads.remove(&page_id);
                    }
                    return Err(e);
                }
            };

            let mut page_data_array = [0u8; PAGE_SIZE];
            page_data_array.copy_from_slice(&page_data_bytes[..PAGE_SIZE]);

            let page_arc = &self.pool[frame_id];
            {
                let mut page = page_arc.write();
                *page = Page::new(page_id);
                page.data = page_data_array;
            }

            self.page_table.insert(page_id, frame_id);
            if let Some(f) = &self.tiny_lfu {
                f.write().admit_record(page_id as u64);
            }
            if created_here {
                self.inflight_loads.remove(&page_id);
            }

            self.replacer_record_access(frame_id)?;
            Ok(frame_id)
        }
    }

    pub fn fetch_table_page(
        self: &Arc<Self>,
        page_id: PageId,
        schema: SchemaRef,
    ) -> QuillSQLResult<(ReadPageGuard, TablePage)> {
        let guard = self.fetch_page_read(page_id)?;
        let (table_page, _) = TablePageCodec::decode(&guard.data, schema)?;
        Ok((guard, table_page))
    }

    pub fn fetch_tree_page(
        self: &Arc<Self>,
        page_id: PageId,
        key_schema: SchemaRef,
    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreePage)> {
        let guard = self.fetch_page_read(page_id)?;
        let (tree_page, _) = BPlusTreePageCodec::decode(&guard.data, key_schema)?;
        Ok((guard, tree_page))
    }

    pub fn fetch_tree_internal_page(
        self: &Arc<Self>,
        page_id: PageId,
        key_schema: SchemaRef,
    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeInternalPage)> {
        let guard = self.fetch_page_read(page_id)?;
        let (tree_internal_page, _) = BPlusTreeInternalPageCodec::decode(&guard.data, key_schema)?;
        Ok((guard, tree_internal_page))
    }

    pub fn fetch_tree_leaf_page(
        self: &Arc<Self>,
        page_id: PageId,
        key_schema: SchemaRef,
    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeLeafPage)> {
        let guard = self.fetch_page_read(page_id)?;
        let (tree_leaf_page, _) = BPlusTreeLeafPageCodec::decode(&guard.data, key_schema)?;
        Ok((guard, tree_leaf_page))
    }

    pub fn prefetch_page(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<()> {
        // Best effort: pull the page into the pool, then drop the pin at once.
        if let Ok(guard) = self.fetch_page_read(page_id) {
            drop(guard);
        }
        Ok(())
    }

    pub fn fetch_header_page(
        self: &Arc<Self>,
        page_id: PageId,
    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeHeaderPage)> {
        let guard = self.fetch_page_read(page_id)?;
        let (header_page, _) = BPlusTreeHeaderPageCodec::decode(&guard.data)?;
        Ok((guard, header_page))
    }

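    /// Flushes one page, enforcing the write-ahead rule: the WAL is forced up
    /// to the page's `page_lsn` before the data page may reach disk. Returns
    /// `Ok(false)` when the page is absent or already clean.
    ///
    /// A durability sketch, assuming a WAL manager was attached beforehand:
    ///
    /// ```ignore
    /// bpm.set_wal_manager(wal.clone());
    /// // ... mutate pages, appending WAL records and advancing page_lsn ...
    /// bpm.flush_page(pid)?; // forces WAL first, then writes the page
    /// ```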
    pub fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
        if let Some(frame_id_ref) = self.page_table.get(&page_id) {
            let frame_id = *frame_id_ref;
            let page_arc = self.pool[frame_id].clone();

            let mut guard = page_arc.write();
            if !guard.is_dirty {
                self.dirty_pages.remove(&page_id);
                return Ok(false);
            }

            // WAL rule: the page may only reach disk once every log record up
            // to its page_lsn is durable.
            if let Some(wal) = self.wal_manager.read().clone() {
                let durable_lsn = wal.durable_lsn();
                if guard.page_lsn > durable_lsn {
                    let target = guard.page_lsn;
                    wal.flush(Some(target))?;
                    if wal.durable_lsn() < target {
                        return Err(QuillSQLError::Internal(format!(
                            "Flush of page {} blocked: page_lsn={} > durable_lsn={}",
                            page_id,
                            guard.page_lsn,
                            wal.durable_lsn()
                        )));
                    }
                }
            }
            let data_bytes = Bytes::copy_from_slice(&guard.data);

            self.disk_scheduler
                .schedule_write(page_id, data_bytes)?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;

            guard.is_dirty = false;
            self.dirty_pages.remove(&page_id);
            Ok(true)
        } else {
            Ok(false)
        }
    }

    pub fn flush_all_pages(&self) -> QuillSQLResult<()> {
        if let Some(wal) = self.wal_manager.read().clone() {
            wal.flush(None)?;
        }
        let dirty_ids: Vec<PageId> = self.dirty_pages.iter().map(|entry| *entry.key()).collect();
        for page_id in dirty_ids {
            let _ = self.flush_page(page_id)?;
        }
        Ok(())
    }

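    /// Drops a page from the pool and deallocates it on disk. Returns
    /// `Ok(false)` without side effects when the page is pinned or its latch
    /// is busy; stale page-table mappings are cleared without disturbing the
    /// frame's current occupant.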
    pub fn delete_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
        let (lock_arc, created_here) = if let Some(g) = self.inflight_loads.get(&page_id) {
            (g.clone(), false)
        } else {
            let arc = Arc::new(Mutex::new(()));
            self.inflight_loads.insert(page_id, arc.clone());
            (arc, true)
        };

        let mut inflight_lock = Some(lock_arc.lock());
        let mut cleanup = |bpm: &BufferPoolManager| {
            if let Some(g) = inflight_lock.take() {
                drop(g);
            }
            if created_here {
                bpm.inflight_loads.remove(&page_id);
            }
        };

        let result = (|| {
            loop {
                if let Some(frame_ref) = self.page_table.get(&page_id) {
                    let frame_id = *frame_ref;
                    let page_arc = self.pool[frame_id].clone();
                    drop(frame_ref);

                    let mut page_writer = match page_arc.try_write() {
                        Some(guard) => guard,
                        None => {
                            // Someone holds the latch; treat as "still in use".
                            return Ok(false);
                        }
                    };

                    if page_writer.page_id() != page_id {
                        // Stale mapping: the frame was recycled for another
                        // page. Remove only the stale entry and retry.
                        drop(page_writer);
                        let _ = self
                            .page_table
                            .remove_if(&page_id, |_, fid| *fid == frame_id);
                        continue;
                    }

                    if page_writer.get_pin_count() > 0 {
                        return Ok(false);
                    }

                    if self
                        .page_table
                        .remove_if(&page_id, |_, fid| *fid == frame_id)
                        .is_none()
                    {
                        continue;
                    }

                    page_writer.destroy();
                    self.dirty_pages.remove(&page_id);
                    drop(page_writer);

                    {
                        let mut rep = self.replacer.write();
                        let _ = rep.set_evictable(frame_id, true);
                        let _ = rep.remove(frame_id);
                    }

                    self.free_list.write().push_back(frame_id);

                    self.disk_scheduler
                        .schedule_deallocate(page_id)?
                        .recv()
                        .map_err(|e| {
                            QuillSQLError::Internal(format!("Channel disconnected: {}", e))
                        })??;

                    return Ok(true);
                } else {
                    // Not cached: deallocate on disk only.
                    self.disk_scheduler
                        .schedule_deallocate(page_id)?
                        .recv()
                        .map_err(|e| {
                            QuillSQLError::Internal(format!("Channel disconnected: {}", e))
                        })??;
                    return Ok(true);
                }
            }
        })();

        cleanup(self);
        result
    }

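    /// Hands out a frame: from the free list when possible, otherwise by
    /// evicting a replacer victim. Dirty victims are flushed before reuse;
    /// victims that turn out to be pinned or latch-busy are re-queued and
    /// another candidate is tried.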
    fn allocate_frame(&self) -> QuillSQLResult<FrameId> {
        if let Some(frame_id) = self.free_list.write().pop_front() {
            return Ok(frame_id);
        }

        loop {
            let opt = { self.replacer.write().evict() };
            let Some(frame_id) = opt else {
                return Err(QuillSQLError::Storage(
                    "Cannot allocate frame: buffer pool is full and all pages are pinned"
                        .to_string(),
                ));
            };

            let evicted_page_arc = self.pool[frame_id].clone();
            let handled = {
                let opt_guard = evicted_page_arc.try_write();
                if let Some(evicted_page_writer) = opt_guard {
                    let evicted_page_id = evicted_page_writer.page_id;
                    if evicted_page_writer.get_pin_count() > 0 {
                        // Pinned after the eviction decision: requeue and retry.
                        drop(evicted_page_writer);
                        let mut rep = self.replacer.write();
                        rep.record_access(frame_id)?;
                        rep.set_evictable(frame_id, true)?;
                        false
                    } else {
                        let need_flush = evicted_page_writer.is_dirty;
                        drop(evicted_page_writer);
                        if need_flush {
                            self.flush_page(evicted_page_id)?;
                        }
                        self.page_table.remove(&evicted_page_id);
                        true
                    }
                } else {
                    // Latch is busy: requeue the frame and pick another victim.
                    let mut rep = self.replacer.write();
                    rep.record_access(frame_id)?;
                    rep.set_evictable(frame_id, true)?;
                    false
                }
            };
            if handled {
                return Ok(frame_id);
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::buffer::buffer_pool::BufferPoolManager;
    use crate::config::WalConfig;
    use crate::recovery::wal_record::{
        TransactionPayload, TransactionRecordKind, WalRecordPayload,
    };
    use crate::recovery::WalManager;
    use crate::storage::disk_manager::DiskManager;
    use crate::storage::disk_scheduler::DiskScheduler;
    use crate::utils::cache::Replacer;
    use std::fs;
    use std::sync::Arc;
    use tempfile::TempDir;

    fn setup_test_environment(num_pages: usize) -> (TempDir, Arc<BufferPoolManager>) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");

        let disk_manager = Arc::new(DiskManager::try_new(db_path).unwrap());
        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager));
        let buffer_pool_manager = Arc::new(BufferPoolManager::new(num_pages, disk_scheduler));

        (temp_dir, buffer_pool_manager)
    }

    #[test]
    fn test_new_page_and_basic_fetch() {
        let (_temp_dir, bpm) = setup_test_environment(10);

        let mut page0_guard = bpm.new_page().unwrap();
        let page0_id = page0_guard.page_id();

        let test_data = b"Hello, World!";
        page0_guard.data[..test_data.len()].copy_from_slice(test_data);
        assert!(page0_guard.is_dirty);
        assert_eq!(&page0_guard.data[..test_data.len()], test_data);

        drop(page0_guard);

        let page0_read_guard = bpm.fetch_page_read(page0_id).unwrap();
        assert_eq!(page0_read_guard.page_id(), page0_id);
        assert_eq!(&page0_read_guard.data[..test_data.len()], test_data);
        assert_eq!(page0_read_guard.pin_count(), 1);

        drop(page0_read_guard);

        let final_guard = bpm.fetch_page_read(page0_id).unwrap();
        assert_eq!(final_guard.pin_count(), 1);
        assert!(final_guard.is_dirty);
    }

    #[test]
    fn test_unpin_and_eviction_logic() {
        let (_temp_dir, bpm) = setup_test_environment(3);

        let page1 = bpm.new_page().unwrap();
        let page1_id = page1.page_id();
        let page2 = bpm.new_page().unwrap();
        let page2_id = page2.page_id();
        let page3 = bpm.new_page().unwrap();
        let page3_id = page3.page_id();

        assert_eq!(bpm.replacer.read().size(), 0);

        drop(page1);
        assert_eq!(bpm.replacer.read().size(), 1);

        drop(page2);
        assert_eq!(bpm.replacer.read().size(), 2);

        let page4 = bpm.new_page().unwrap();
        assert_ne!(page4.page_id(), page1_id);

        assert!(bpm.page_table.get(&page1_id).is_none());
        assert!(bpm.page_table.get(&page2_id).is_some());
        assert!(bpm.page_table.get(&page3_id).is_some());

        assert_eq!(bpm.replacer.read().size(), 1);
    }

    #[test]
    fn test_flush_page() {
        let (temp_dir, bpm) = setup_test_environment(10);
        let db_path = temp_dir.path().join("test.db");

        let page_id = {
            let mut guard = bpm.new_page().unwrap();
            guard.data[0..4].copy_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
            guard.page_id()
        };

        let flush_result = bpm.flush_page(page_id).unwrap();
        assert!(flush_result);

        let guard = bpm.fetch_page_read(page_id).unwrap();
        assert!(!guard.is_dirty);
        drop(guard);

        let file_data = fs::read(db_path).unwrap();

        let mut found = false;
        for i in 0..=(file_data.len().saturating_sub(4)) {
            if file_data[i..i + 4] == [0xDE, 0xAD, 0xBE, 0xEF] {
                found = true;
                break;
            }
        }

        assert!(found, "Test data was not written to disk correctly");
    }

    #[test]
    fn test_flush_requires_durable_wal() {
        let (temp_dir, bpm) = setup_test_environment(4);
        let wal_dir = temp_dir.path().join("wal");
        let scheduler = bpm.disk_scheduler.clone();
        let wal = Arc::new(
            WalManager::new(
                WalConfig {
                    directory: wal_dir.clone(),
                    ..WalConfig::default()
                },
                scheduler,
                None,
                None,
            )
            .expect("wal manager"),
        );
        bpm.set_wal_manager(wal.clone());

        let page_id;
        let lsn;
        {
            let mut guard = bpm.new_page().expect("new page");
            page_id = guard.page_id();
            guard.data[0..4].copy_from_slice(b"walu");
            lsn = wal
                .append_record_with(|_ctx| {
                    WalRecordPayload::Transaction(TransactionPayload {
                        marker: TransactionRecordKind::Begin,
                        txn_id: 1,
                    })
                })
                .expect("append wal record")
                .end_lsn;
            guard.page_lsn = lsn;
        }

        let flushed = bpm
            .flush_page(page_id)
            .expect("flush should auto-flush wal");
        assert!(flushed, "page should flush once wal is durable");
        assert!(wal.durable_lsn() >= lsn, "wal durable lsn should advance");

        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
            .expect("wal directory")
            .filter_map(|entry| entry.ok())
            .collect();
        assert!(
            !wal_files.is_empty(),
            "wal flush should create segment file"
        );
        let total_size: u64 = wal_files
            .iter()
            .map(|entry| entry.metadata().map(|m| m.len()).unwrap_or(0))
            .sum();
        assert!(total_size > 0, "wal segment should contain data");
    }

    #[test]
    fn test_delete_page() {
        let (_temp_dir, bpm) = setup_test_environment(10);

        let page1_id = bpm.new_page().unwrap().page_id();
        drop(bpm.new_page().unwrap());
        assert_eq!(bpm.page_table.len(), 2);
        assert_eq!(bpm.free_list.read().len(), 8);

        drop(bpm.fetch_page_read(page1_id).unwrap());
        let deleted = bpm.delete_page(page1_id).unwrap();
        assert!(deleted);

        assert!(bpm.page_table.get(&page1_id).is_none());
        assert_eq!(bpm.page_table.len(), 1);
        assert_eq!(bpm.free_list.read().len(), 9);
        assert_eq!(bpm.replacer.read().size(), 1);
        let refetched_guard = bpm.fetch_page_read(page1_id).unwrap();
        assert!(refetched_guard.data.iter().all(|&b| b == 0));
    }

    #[test]
    fn test_delete_pinned_page_fails() {
        let (_temp_dir, bpm) = setup_test_environment(10);

        let guard = bpm.new_page().unwrap();
        let page_id = guard.page_id();

        let deleted = bpm.delete_page(page_id).unwrap();
        assert!(!deleted);
        assert!(bpm.page_table.get(&page_id).is_some());
    }

    #[test]
    fn test_delete_page_concurrent_fetch_preserves_mapping() {
        use std::sync::mpsc;
        use std::thread;

        let (_temp_dir, bpm) = setup_test_environment(6);

        let page_id = {
            let guard = bpm.new_page().unwrap();
            let pid = guard.page_id();
            drop(guard);
            pid
        };

        drop(bpm.fetch_page_read(page_id).unwrap());
        assert!(bpm.page_table.get(&page_id).is_some());

        let (ready_tx, ready_rx) = mpsc::channel();
        let (release_tx, release_rx) = mpsc::channel();

        let bpm_fetch = bpm.clone();
        let fetcher = thread::spawn(move || {
            let guard = bpm_fetch.fetch_page_read(page_id).unwrap();
            ready_tx.send(()).unwrap();
            release_rx.recv().unwrap();
            drop(guard);
        });

        ready_rx.recv().unwrap();

        let frame_id = *bpm.page_table.get(&page_id).unwrap();
        assert_eq!(bpm.pool[frame_id].read().get_pin_count(), 1);

        let deleted = bpm.delete_page(page_id).unwrap();
        assert!(!deleted);
        assert!(bpm.page_table.get(&page_id).is_some());

        release_tx.send(()).unwrap();
        fetcher.join().unwrap();

        let frame_id = *bpm.page_table.get(&page_id).unwrap();
        assert_eq!(bpm.pool[frame_id].read().get_pin_count(), 0);
        assert_eq!(bpm.replacer.read().size(), 1);

        assert!(bpm.delete_page(page_id).unwrap());
    }

    #[test]
    fn test_delete_page_stale_mapping_does_not_remove_new_page() {
        let (_temp_dir, bpm) = setup_test_environment(6);

        let page_a = bpm.new_page().unwrap().page_id();
        let page_b = bpm.new_page().unwrap().page_id();

        drop(bpm.fetch_page_read(page_a).unwrap());
        drop(bpm.fetch_page_read(page_b).unwrap());

        let frame_a = *bpm.page_table.get(&page_a).unwrap();
        let frame_b = *bpm.page_table.get(&page_b).unwrap();
        assert_ne!(frame_a, frame_b);

        // Simulate a stale mapping: point page_a at the frame that actually
        // holds page_b.
        bpm.page_table.insert(page_a, frame_b);

        assert!(bpm.delete_page(page_a).unwrap());
        assert!(bpm.page_table.get(&page_a).is_none());

        let frame_b_after = *bpm.page_table.get(&page_b).unwrap();
        assert_eq!(frame_b_after, frame_b);
        assert_eq!(bpm.pool[frame_b_after].read().page_id(), page_b);
    }

    #[test]
    fn test_buffer_pool_is_full() {
        let (_temp_dir, bpm) = setup_test_environment(2);

        let _page1 = bpm.new_page().unwrap();
        let _page2 = bpm.new_page().unwrap();

        assert_eq!(bpm.replacer.read().size(), 0);
        assert!(bpm.free_list.read().is_empty());

        let page3_result = bpm.new_page();
        assert!(page3_result.is_err());
    }

    #[test]
    fn test_concurrent_reads_and_exclusive_write() {
        let (_temp_dir, bpm) = setup_test_environment(10);

        let page_id = {
            let mut guard = bpm.new_page().unwrap();
            guard.data[0] = 42;
            guard.page_id()
        };

        let read_guard1 = bpm.fetch_page_read(page_id).unwrap();
        assert_eq!(read_guard1.data[0], 42);
        assert_eq!(read_guard1.pin_count(), 1);
        drop(read_guard1);

        let mut write_guard = bpm.fetch_page_write(page_id).unwrap();
        write_guard.data[0] = 99;
        assert_eq!(write_guard.data[0], 99);
    }

    #[test]
    fn test_concurrent_same_page_fetch_single_frame() {
        use std::thread;
        let (_temp_dir, bpm) = setup_test_environment(8);

        let page_id = bpm.new_page().unwrap().page_id();
        drop(bpm.fetch_page_read(page_id).unwrap());

        let threads = (0..8)
            .map(|_| {
                let bpm_c = bpm.clone();
                thread::spawn(move || {
                    for _ in 0..100 {
                        let g = bpm_c.fetch_page_read(page_id).unwrap();
                        assert_eq!(g.page_id(), page_id);
                    }
                })
            })
            .collect::<Vec<_>>();

        for t in threads {
            t.join().unwrap();
        }

        let frame_id = bpm.page_table.get(&page_id).map(|r| *r).unwrap();
        assert_eq!(bpm.pool[frame_id].read().page_id(), page_id);
    }

    #[test]
    fn test_delete_vs_fetch_race_safety() {
        use std::thread;
        let (_temp_dir, bpm) = setup_test_environment(8);

        let page_id = bpm.new_page().unwrap().page_id();
        drop(bpm.fetch_page_read(page_id).unwrap());

        let bpm_del = bpm.clone();
        let deleter = thread::spawn(move || {
            for _ in 0..1000 {
                let _ = bpm_del.delete_page(page_id).unwrap();
            }
        });

        let bpm_fetch = bpm.clone();
        let fetcher = thread::spawn(move || {
            for _ in 0..1000 {
                let _ = bpm_fetch.fetch_page_read(page_id).unwrap();
            }
        });

        deleter.join().unwrap();
        fetcher.join().unwrap();

        let frame_id_opt = bpm.page_table.get(&page_id).map(|r| *r);
        if let Some(frame_id) = frame_id_opt {
            assert_eq!(bpm.pool[frame_id].read().page_id(), page_id);
        }
    }
}