1use crate::eviction::{EvictionPolicy, EvictionPolicyFactory, EvictionPolicyType};
9use crate::{FileManager, Page, PageType};
10use featherdb_core::{Error, PageId, Result, TransactionId};
11use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
12use std::collections::{HashMap, HashSet};
13use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
14use std::sync::Arc;
15
/// Lock-free activity counters for a buffer pool.
///
/// Every counter is read and written with `Ordering::Relaxed`: they are statistics,
/// not synchronization points, so a reader may see them slightly out of step with
/// each other.
#[derive(Debug, Default)]
pub struct BufferPoolStats {
    /// Lookups answered by a frame that was already resident.
    pub cache_hits: AtomicUsize,
    /// Lookups that had to read the page from the file manager.
    pub cache_misses: AtomicUsize,
    /// Frames removed from the pool to make room for others.
    pub pages_evicted: AtomicUsize,
    /// Page images written back through the file manager.
    pub pages_flushed: AtomicUsize,
}

impl BufferPoolStats {
    /// Fraction of lookups that were hits; `1.0` when no lookup has happened yet.
    pub fn hit_ratio(&self) -> f64 {
        self.snapshot().hit_ratio()
    }

    /// Sets every counter back to zero.
    pub fn reset(&self) {
        let counters = [
            &self.cache_hits,
            &self.cache_misses,
            &self.pages_evicted,
            &self.pages_flushed,
        ];
        for counter in counters {
            counter.store(0, Ordering::Relaxed);
        }
    }

    /// Copies the current counter values into a plain, cloneable struct.
    pub fn snapshot(&self) -> BufferPoolStatsSnapshot {
        let read = |counter: &AtomicUsize| counter.load(Ordering::Relaxed);
        BufferPoolStatsSnapshot {
            cache_hits: read(&self.cache_hits),
            cache_misses: read(&self.cache_misses),
            pages_evicted: read(&self.pages_evicted),
            pages_flushed: read(&self.pages_flushed),
        }
    }
}

/// Point-in-time copy of [`BufferPoolStats`].
#[derive(Debug, Clone)]
pub struct BufferPoolStatsSnapshot {
    /// Lookups answered by a resident frame.
    pub cache_hits: usize,
    /// Lookups that required a read from the file manager.
    pub cache_misses: usize,
    /// Frames evicted.
    pub pages_evicted: usize,
    /// Page images written back.
    pub pages_flushed: usize,
}

impl BufferPoolStatsSnapshot {
    /// Fraction of lookups that were hits; `1.0` when the snapshot recorded no lookups.
    pub fn hit_ratio(&self) -> f64 {
        match self.cache_hits + self.cache_misses {
            0 => 1.0,
            lookups => self.cache_hits as f64 / lookups as f64,
        }
    }
}
77
78pub struct BufferFrame {
80 page_id: PageId,
82 page: RwLock<Page>,
84 pin_count: AtomicU32,
86 reference_bit: AtomicBool,
88 dirty: AtomicBool,
90 dirty_by_txn: AtomicU64,
93}
94
95impl BufferFrame {
96 fn new(page_id: PageId, page: Page) -> Self {
97 BufferFrame {
98 page_id,
99 page: RwLock::new(page),
100 pin_count: AtomicU32::new(0),
101 reference_bit: AtomicBool::new(true),
102 dirty: AtomicBool::new(false),
103 dirty_by_txn: AtomicU64::new(0),
104 }
105 }
106
107 #[allow(dead_code)]
108 pub fn page_id(&self) -> PageId {
109 self.page_id
110 }
111
112 #[allow(dead_code)]
113 pub fn is_dirty(&self) -> bool {
114 self.dirty.load(Ordering::Acquire)
115 }
116
117 pub fn mark_dirty(&self) {
118 self.dirty.store(true, Ordering::Release);
119 }
120
121 pub fn mark_dirty_by(&self, txn_id: TransactionId) {
123 self.dirty_by_txn.store(txn_id.0, Ordering::Release);
124 self.dirty.store(true, Ordering::Release);
125 }
126
127 pub fn clear_txn_ownership(&self) {
129 self.dirty_by_txn.store(0, Ordering::Release);
130 }
131
132 #[allow(dead_code)]
134 pub fn is_dirty_by(&self, txn_id: TransactionId) -> bool {
135 self.dirty_by_txn.load(Ordering::Acquire) == txn_id.0
136 }
137
138 pub fn dirty_txn(&self) -> Option<TransactionId> {
140 let txn_id = self.dirty_by_txn.load(Ordering::Acquire);
141 if txn_id == 0 {
142 None
143 } else {
144 Some(TransactionId(txn_id))
145 }
146 }
147
148 #[allow(dead_code)]
149 pub fn pin_count(&self) -> u32 {
150 self.pin_count.load(Ordering::Acquire)
151 }
152}
153
154pub struct BufferPool {
156 capacity: usize,
158 frames: RwLock<HashMap<PageId, Arc<BufferFrame>>>,
160 eviction_policy: Mutex<Box<dyn EvictionPolicy>>,
162 file_manager: Arc<FileManager>,
164 pub stats: BufferPoolStats,
166 policy_type: EvictionPolicyType,
168 active_txns: RwLock<HashSet<TransactionId>>,
170}
171
172impl BufferPool {
173 pub fn new(capacity: usize, file_manager: Arc<FileManager>) -> Self {
175 Self::with_policy(capacity, file_manager, EvictionPolicyType::Clock)
176 }
177
178 pub fn with_policy(
180 capacity: usize,
181 file_manager: Arc<FileManager>,
182 policy_type: EvictionPolicyType,
183 ) -> Self {
184 BufferPool {
185 capacity,
186 frames: RwLock::new(HashMap::new()),
187 eviction_policy: Mutex::new(policy_type.create(capacity)),
188 file_manager,
189 stats: BufferPoolStats::default(),
190 policy_type,
191 active_txns: RwLock::new(HashSet::new()),
192 }
193 }
194
195 pub fn eviction_policy_type(&self) -> EvictionPolicyType {
197 self.policy_type
198 }
199
200 pub fn eviction_policy_name(&self) -> &'static str {
202 self.eviction_policy.lock().name()
203 }
204
205 pub fn get_page(&self, page_id: PageId) -> Result<PageGuard> {
207 self.get_page_internal(page_id, false)
208 }
209
210 pub fn get_page_for_write(&self, page_id: PageId) -> Result<PageGuardMut> {
212 let guard = self.get_page_internal(page_id, true)?;
213 Ok(PageGuardMut { inner: guard })
214 }
215
216 fn get_page_internal(&self, page_id: PageId, for_write: bool) -> Result<PageGuard> {
217 let current_txn = if for_write {
219 self.get_single_active_txn()
220 } else {
221 None
222 };
223
224 {
226 let frames = self.frames.read();
227 if let Some(frame) = frames.get(&page_id) {
228 frame.pin_count.fetch_add(1, Ordering::AcqRel);
229 frame.reference_bit.store(true, Ordering::Relaxed);
230 self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);
231
232 if self.policy_type != EvictionPolicyType::Clock {
235 let mut policy = self.eviction_policy.lock();
236 policy.on_access(page_id);
237 }
238
239 if for_write {
240 if let Some(txn_id) = current_txn {
241 frame.mark_dirty_by(txn_id);
242 } else {
243 frame.mark_dirty();
244 }
245 }
246
247 return Ok(PageGuard {
248 frame: frame.clone(),
249 });
250 }
251 }
252
253 self.stats.cache_misses.fetch_add(1, Ordering::Relaxed);
255 self.load_page_with_txn(page_id, for_write, current_txn)
256 }
257
258 fn load_page_with_txn(
259 &self,
260 page_id: PageId,
261 for_write: bool,
262 txn_id: Option<TransactionId>,
263 ) -> Result<PageGuard> {
264 {
266 let frames = self.frames.read();
267 if frames.len() >= self.capacity {
268 drop(frames);
269 self.evict_one()?;
270 }
271 }
272
273 let page_data = self.file_manager.read_page(page_id)?;
275 let page = Page::from_bytes(page_data)?;
276
277 if !page.verify_checksum() {
279 return Err(Error::CorruptedPage(page_id));
280 }
281
282 let frame = Arc::new(BufferFrame::new(page_id, page));
284 frame.pin_count.fetch_add(1, Ordering::AcqRel);
285
286 if for_write {
287 if let Some(tid) = txn_id {
288 frame.mark_dirty_by(tid);
289 } else {
290 frame.mark_dirty();
291 }
292 }
293
294 {
296 let mut frames = self.frames.write();
297 let mut policy = self.eviction_policy.lock();
298
299 if let Some(existing) = frames.get(&page_id) {
301 existing.pin_count.fetch_add(1, Ordering::AcqRel);
302 return Ok(PageGuard {
303 frame: existing.clone(),
304 });
305 }
306
307 frames.insert(page_id, frame.clone());
308 policy.on_load(page_id);
309 }
310
311 Ok(PageGuard { frame })
312 }
313
314 pub fn allocate_page(&self, page_type: PageType) -> Result<(PageId, PageGuardMut)> {
316 let current_txn = self.get_single_active_txn();
318
319 {
321 let frames = self.frames.read();
322 if frames.len() >= self.capacity {
323 drop(frames);
324 self.evict_one()?;
325 }
326 }
327
328 let page_id = self.file_manager.extend()?;
330
331 let page = Page::new(page_id, page_type, self.file_manager.page_size());
333
334 let frame = Arc::new(BufferFrame::new(page_id, page));
336 frame.pin_count.fetch_add(1, Ordering::AcqRel);
337
338 if let Some(txn_id) = current_txn {
340 frame.mark_dirty_by(txn_id);
341 } else {
342 frame.mark_dirty();
343 }
344
345 {
347 let mut frames = self.frames.write();
348 let mut policy = self.eviction_policy.lock();
349
350 frames.insert(page_id, frame.clone());
351 policy.on_load(page_id);
352 }
353
354 Ok((
355 page_id,
356 PageGuardMut {
357 inner: PageGuard { frame },
358 },
359 ))
360 }
361
362 pub fn copy_page(&self, page_id: PageId) -> Result<(PageId, PageGuardMut)> {
364 let current_txn = self.get_single_active_txn();
366
367 let original = self.get_page(page_id)?;
369 let page_data = {
370 let page = original.read();
371 page.as_bytes().to_vec()
372 };
373 drop(original);
374
375 let new_page_id = self.file_manager.extend()?;
377 let mut new_page = Page::from_bytes(page_data)?;
378 new_page.set_page_id(new_page_id);
379
380 let frame = Arc::new(BufferFrame::new(new_page_id, new_page));
382 frame.pin_count.fetch_add(1, Ordering::AcqRel);
383
384 if let Some(txn_id) = current_txn {
386 frame.mark_dirty_by(txn_id);
387 } else {
388 frame.mark_dirty();
389 }
390
391 {
393 let mut frames = self.frames.write();
394 let mut policy = self.eviction_policy.lock();
395
396 frames.insert(new_page_id, frame.clone());
397 policy.on_load(new_page_id);
398 }
399
400 Ok((
401 new_page_id,
402 PageGuardMut {
403 inner: PageGuard { frame },
404 },
405 ))
406 }
407
408 fn evict_one(&self) -> Result<()> {
410 let max_attempts = self.capacity * 2;
412
413 for _ in 0..max_attempts {
414 let victim_id = {
415 let mut policy = self.eviction_policy.lock();
416 policy.select_victim()
417 };
418
419 let Some(page_id) = victim_id else {
420 return Err(Error::BufferPoolFull {
422 capacity: self.capacity,
423 pinned: self.count_pinned(),
424 });
425 };
426
427 let frame_opt = {
429 let frames = self.frames.read();
430 frames.get(&page_id).cloned()
431 };
432
433 if let Some(frame) = frame_opt {
434 if !self.can_evict_frame(&frame) {
436 let mut policy = self.eviction_policy.lock();
438 policy.remove(page_id);
439 continue;
440 }
441
442 if frame.dirty.load(Ordering::Acquire) {
444 self.flush_frame(&frame)?;
445 }
446
447 {
449 let mut frames = self.frames.write();
450 let mut policy = self.eviction_policy.lock();
451
452 frames.remove(&page_id);
453 policy.remove(page_id);
454 }
455
456 self.stats.pages_evicted.fetch_add(1, Ordering::Relaxed);
457 return Ok(());
458 } else {
459 let mut policy = self.eviction_policy.lock();
461 policy.remove(page_id);
462 }
463 }
464
465 Err(Error::BufferPoolFull {
466 capacity: self.capacity,
467 pinned: self.count_pinned(),
468 })
469 }
470
471 fn flush_frame(&self, frame: &BufferFrame) -> Result<()> {
472 let mut page = frame.page.write();
473
474 page.compute_checksum();
476
477 self.file_manager
479 .write_page(frame.page_id, page.as_bytes())?;
480
481 frame.dirty.store(false, Ordering::Release);
482 self.stats.pages_flushed.fetch_add(1, Ordering::Relaxed);
483
484 Ok(())
485 }
486
487 pub fn flush_all(&self) -> Result<()> {
492 let frames = self.frames.read();
493 let active = self.active_txns.read();
494
495 for frame in frames.values() {
496 if frame.dirty.load(Ordering::Acquire) {
497 if let Some(txn_id) = frame.dirty_txn() {
499 if active.contains(&txn_id) {
500 continue;
501 }
502 }
503 self.flush_frame(frame)?;
504 }
505 }
506
507 Ok(())
508 }
509
510 pub fn flush_page(&self, page_id: PageId) -> Result<()> {
512 let frames = self.frames.read();
513
514 if let Some(frame) = frames.get(&page_id) {
515 if frame.dirty.load(Ordering::Acquire) {
516 self.flush_frame(frame)?;
517 }
518 }
519
520 Ok(())
521 }
522
523 pub fn register_active_txn(&self, txn_id: TransactionId) {
527 self.active_txns.write().insert(txn_id);
528 }
529
530 pub fn unregister_active_txn(&self, txn_id: TransactionId) {
532 self.active_txns.write().remove(&txn_id);
533 }
534
535 pub fn get_page_for_write_in_txn(
540 &self,
541 page_id: PageId,
542 txn_id: TransactionId,
543 ) -> Result<PageGuardMut> {
544 let guard = self.get_page_internal(page_id, true)?;
545 guard.frame.mark_dirty_by(txn_id);
547 Ok(PageGuardMut { inner: guard })
548 }
549
550 pub fn allocate_page_in_txn(
552 &self,
553 page_type: PageType,
554 txn_id: TransactionId,
555 ) -> Result<(PageId, PageGuardMut)> {
556 {
558 let frames = self.frames.read();
559 if frames.len() >= self.capacity {
560 drop(frames);
561 self.evict_one()?;
562 }
563 }
564
565 let page_id = self.file_manager.extend()?;
567
568 let page = Page::new(page_id, page_type, self.file_manager.page_size());
570
571 let frame = Arc::new(BufferFrame::new(page_id, page));
573 frame.pin_count.fetch_add(1, Ordering::AcqRel);
574 frame.mark_dirty_by(txn_id);
575
576 {
578 let mut frames = self.frames.write();
579 let mut policy = self.eviction_policy.lock();
580
581 frames.insert(page_id, frame.clone());
582 policy.on_load(page_id);
583 }
584
585 Ok((
586 page_id,
587 PageGuardMut {
588 inner: PageGuard { frame },
589 },
590 ))
591 }
592
593 pub fn flush_transaction(&self, txn_id: TransactionId) -> Result<()> {
598 let frames = self.frames.read();
599
600 for frame in frames.values() {
601 if frame.is_dirty_by(txn_id) {
602 self.flush_frame(frame)?;
603 frame.clear_txn_ownership();
604 }
605 }
606
607 Ok(())
608 }
609
610 pub fn discard_transaction(&self, txn_id: TransactionId) -> Result<()> {
620 let pages_to_discard: Vec<PageId> = {
622 let frames = self.frames.read();
623 frames
624 .iter()
625 .filter(|(_, frame)| frame.is_dirty_by(txn_id))
626 .map(|(page_id, _)| *page_id)
627 .collect()
628 };
629
630 for page_id in pages_to_discard {
632 let mut frames_write = self.frames.write();
634
635 if let Some(frame) = frames_write.get(&page_id) {
636 if frame.is_dirty_by(txn_id) {
638 frame.dirty.store(false, Ordering::Release);
640 frame.clear_txn_ownership();
641
642 if frame.pin_count.load(Ordering::Acquire) == 0 {
644 frames_write.remove(&page_id);
645 let mut policy = self.eviction_policy.lock();
646 policy.remove(page_id);
647 }
648 }
649 }
650 }
651
652 Ok(())
653 }
654
655 fn get_single_active_txn(&self) -> Option<TransactionId> {
662 let active = self.active_txns.read();
663 if active.len() == 1 {
664 active.iter().next().copied()
665 } else {
666 None
667 }
668 }
669
670 fn can_evict_frame(&self, frame: &BufferFrame) -> bool {
676 if frame.pin_count.load(Ordering::Acquire) > 0 {
678 return false;
679 }
680
681 if let Some(txn_id) = frame.dirty_txn() {
683 let active = self.active_txns.read();
684 if active.contains(&txn_id) {
685 return false;
686 }
687 }
688
689 true
690 }
691
692 fn count_pinned(&self) -> usize {
694 let frames = self.frames.read();
695 frames
696 .values()
697 .filter(|f| f.pin_count.load(Ordering::Relaxed) > 0)
698 .count()
699 }
700
701 pub fn size(&self) -> usize {
703 self.frames.read().len()
704 }
705
706 pub fn capacity(&self) -> usize {
708 self.capacity
709 }
710
711 pub fn stats_snapshot(&self) -> BufferPoolStatsSnapshot {
713 self.stats.snapshot()
714 }
715}
716
717impl Drop for BufferPool {
718 fn drop(&mut self) {
719 let _ = self.flush_all();
721 }
722}
723
724pub struct PageGuard {
726 frame: Arc<BufferFrame>,
727}
728
729impl PageGuard {
730 pub fn read(&self) -> RwLockReadGuard<'_, Page> {
731 self.frame.reference_bit.store(true, Ordering::Relaxed);
732 self.frame.page.read()
733 }
734
735 pub fn page_id(&self) -> PageId {
736 self.frame.page_id
737 }
738}
739
740impl Drop for PageGuard {
741 fn drop(&mut self) {
742 self.frame.pin_count.fetch_sub(1, Ordering::AcqRel);
743 }
744}
745
746pub struct PageGuardMut {
748 inner: PageGuard,
749}
750
751impl PageGuardMut {
752 pub fn read(&self) -> RwLockReadGuard<'_, Page> {
753 self.inner.read()
754 }
755
756 pub fn write(&self) -> RwLockWriteGuard<'_, Page> {
757 self.inner
758 .frame
759 .reference_bit
760 .store(true, Ordering::Relaxed);
761 self.inner.frame.mark_dirty();
762 self.inner.frame.page.write()
763 }
764
765 pub fn page_id(&self) -> PageId {
766 self.inner.page_id()
767 }
768
769 pub fn apply_redo(&mut self, data: &[u8]) -> Result<()> {
770 let mut page = self.write();
771 page.apply_redo(data)
772 }
773}
774
775impl Drop for PageGuardMut {
776 fn drop(&mut self) {
777 }
779}
780
#[cfg(test)]
mod tests {
    use super::*;
    use crate::FileManager;
    use featherdb_core::Config;
    use tempfile::TempDir;

    /// Frame capacity used by every test pool.
    const TEST_CAPACITY: usize = 10;

    /// Pool over a fresh temporary database, using the default Clock policy.
    fn create_test_buffer_pool() -> (TempDir, BufferPool) {
        create_test_buffer_pool_with_policy(EvictionPolicyType::Clock)
    }

    /// Pool over a fresh temporary database, using `policy`.
    ///
    /// The `TempDir` is returned so the directory outlives the pool.
    fn create_test_buffer_pool_with_policy(policy: EvictionPolicyType) -> (TempDir, BufferPool) {
        let tmp = TempDir::new().unwrap();
        let db_path = tmp.path().join("test.db");
        let config = Config::new(&db_path);

        let file_manager = Arc::new(FileManager::open(&config).unwrap());
        file_manager.init_if_needed(&config).unwrap();

        let pool = BufferPool::with_policy(TEST_CAPACITY, file_manager, policy);
        (tmp, pool)
    }

    /// Allocates `count` leaf pages, unpinning each immediately, and returns their ids.
    fn allocate_unpinned(bp: &BufferPool, count: usize) -> Vec<PageId> {
        (0..count)
            .map(|_| {
                let (page_id, guard) = bp.allocate_page(PageType::Leaf).unwrap();
                drop(guard);
                page_id
            })
            .collect()
    }

    #[test]
    fn test_buffer_pool_allocate() {
        let (_tmp, bp) = create_test_buffer_pool();

        let (page_id, guard) = bp.allocate_page(PageType::Leaf).unwrap();
        assert!(page_id.0 >= 3);

        let page = guard.read();
        assert_eq!(page.page_type(), PageType::Leaf);
    }

    #[test]
    fn test_buffer_pool_read_write() {
        let (_tmp, bp) = create_test_buffer_pool();

        let (page_id, writer) = bp.allocate_page(PageType::Leaf).unwrap();
        {
            let mut page = writer.write();
            page.insert_cell(b"hello");
        }
        drop(writer);

        bp.flush_page(page_id).unwrap();

        let reader = bp.get_page(page_id).unwrap();
        let page = reader.read();
        assert_eq!(page.get_cell(0), Some(b"hello".as_slice()));
    }

    #[test]
    fn test_buffer_pool_eviction_clock() {
        let (_tmp, bp) = create_test_buffer_pool_with_policy(EvictionPolicyType::Clock);

        allocate_unpinned(&bp, 15);

        assert!(bp.stats_snapshot().pages_evicted > 0);
        assert_eq!(bp.eviction_policy_name(), "Clock");
    }

    #[test]
    fn test_buffer_pool_eviction_lru2() {
        let (_tmp, bp) = create_test_buffer_pool_with_policy(EvictionPolicyType::Lru2);
        assert_eq!(bp.eviction_policy_name(), "LRU-2");

        allocate_unpinned(&bp, 15);

        assert!(bp.stats_snapshot().pages_evicted > 0);
    }

    #[test]
    fn test_buffer_pool_eviction_lirs() {
        let (_tmp, bp) = create_test_buffer_pool_with_policy(EvictionPolicyType::Lirs);
        assert_eq!(bp.eviction_policy_name(), "LIRS");

        allocate_unpinned(&bp, 15);

        assert!(bp.stats_snapshot().pages_evicted > 0);
    }

    #[test]
    fn test_buffer_pool_stats() {
        let (_tmp, bp) = create_test_buffer_pool();

        let page_id = allocate_unpinned(&bp, 1)[0];
        let _guard = bp.get_page(page_id).unwrap();

        let stats = bp.stats_snapshot();
        assert!(stats.cache_hits > 0);
        assert!(stats.hit_ratio() > 0.0);
    }

    #[test]
    fn test_buffer_pool_hit_ratio() {
        let (_tmp, bp) = create_test_buffer_pool();

        let page_ids = allocate_unpinned(&bp, 5);

        for _round in 0..10 {
            for page_id in page_ids.iter().copied() {
                let _guard = bp.get_page(page_id).unwrap();
            }
        }

        let ratio = bp.stats_snapshot().hit_ratio();
        assert!(ratio > 0.9, "Hit ratio should be high: {}", ratio);
    }
}
927}