Skip to main content

featherdb_storage/
buffer_pool.rs

1//! Buffer pool with pluggable eviction algorithms
2//!
3//! Supports multiple eviction policies:
4//! - Clock: Simple second-chance algorithm (default)
5//! - LRU-2: Tracks last 2 access times for scan resistance
6//! - LIRS: Low Inter-reference Recency Set for workload adaptation
7
8use crate::eviction::{EvictionPolicy, EvictionPolicyFactory, EvictionPolicyType};
9use crate::{FileManager, Page, PageType};
10use featherdb_core::{Error, PageId, Result, TransactionId};
11use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
12use std::collections::{HashMap, HashSet};
13use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize, Ordering};
14use std::sync::Arc;
15
16/// Statistics for the buffer pool
/// Counters describing buffer-pool cache behaviour.
#[derive(Debug, Default)]
pub struct BufferPoolStats {
    pub cache_hits: AtomicUsize,
    pub cache_misses: AtomicUsize,
    pub pages_evicted: AtomicUsize,
    pub pages_flushed: AtomicUsize,
}

impl BufferPoolStats {
    /// Calculate hit ratio (0.0 to 1.0).
    ///
    /// With no recorded accesses at all, reports a perfect 1.0.
    pub fn hit_ratio(&self) -> f64 {
        let hits = self.cache_hits.load(Ordering::Relaxed);
        let misses = self.cache_misses.load(Ordering::Relaxed);
        match hits + misses {
            0 => 1.0,
            total => hits as f64 / total as f64,
        }
    }

    /// Reset all statistics back to zero.
    pub fn reset(&self) {
        for counter in [
            &self.cache_hits,
            &self.cache_misses,
            &self.pages_evicted,
            &self.pages_flushed,
        ] {
            counter.store(0, Ordering::Relaxed);
        }
    }

    /// Get a point-in-time copy of the current counters.
    pub fn snapshot(&self) -> BufferPoolStatsSnapshot {
        BufferPoolStatsSnapshot {
            cache_hits: self.cache_hits.load(Ordering::Relaxed),
            cache_misses: self.cache_misses.load(Ordering::Relaxed),
            pages_evicted: self.pages_evicted.load(Ordering::Relaxed),
            pages_flushed: self.pages_flushed.load(Ordering::Relaxed),
        }
    }
}

/// Immutable snapshot of buffer pool statistics
#[derive(Debug, Clone)]
pub struct BufferPoolStatsSnapshot {
    pub cache_hits: usize,
    pub cache_misses: usize,
    pub pages_evicted: usize,
    pub pages_flushed: usize,
}

impl BufferPoolStatsSnapshot {
    /// Calculate hit ratio (0.0 to 1.0); 1.0 when no accesses were recorded.
    pub fn hit_ratio(&self) -> f64 {
        match self.cache_hits + self.cache_misses {
            0 => 1.0,
            total => self.cache_hits as f64 / total as f64,
        }
    }
}
77
78/// A frame in the buffer pool holding a page
79pub struct BufferFrame {
80    /// The page ID
81    page_id: PageId,
82    /// The page data
83    page: RwLock<Page>,
84    /// Number of active references (pinned when > 0)
85    pin_count: AtomicU32,
86    /// Recently accessed (for clock algorithm, kept for compatibility)
87    reference_bit: AtomicBool,
88    /// Modified since last flush
89    dirty: AtomicBool,
90    /// Transaction that dirtied this page (0 if clean or already committed/flushed)
91    /// Uses AtomicU64 for lock-free access; 0 means no transaction owns this page
92    dirty_by_txn: AtomicU64,
93}
94
95impl BufferFrame {
96    fn new(page_id: PageId, page: Page) -> Self {
97        BufferFrame {
98            page_id,
99            page: RwLock::new(page),
100            pin_count: AtomicU32::new(0),
101            reference_bit: AtomicBool::new(true),
102            dirty: AtomicBool::new(false),
103            dirty_by_txn: AtomicU64::new(0),
104        }
105    }
106
107    #[allow(dead_code)]
108    pub fn page_id(&self) -> PageId {
109        self.page_id
110    }
111
112    #[allow(dead_code)]
113    pub fn is_dirty(&self) -> bool {
114        self.dirty.load(Ordering::Acquire)
115    }
116
117    pub fn mark_dirty(&self) {
118        self.dirty.store(true, Ordering::Release);
119    }
120
121    /// Mark page as dirty with transaction ownership
122    pub fn mark_dirty_by(&self, txn_id: TransactionId) {
123        self.dirty_by_txn.store(txn_id.0, Ordering::Release);
124        self.dirty.store(true, Ordering::Release);
125    }
126
127    /// Clear transaction ownership (called after commit/flush)
128    pub fn clear_txn_ownership(&self) {
129        self.dirty_by_txn.store(0, Ordering::Release);
130    }
131
132    /// Check if this page was dirtied by a specific transaction
133    #[allow(dead_code)]
134    pub fn is_dirty_by(&self, txn_id: TransactionId) -> bool {
135        self.dirty_by_txn.load(Ordering::Acquire) == txn_id.0
136    }
137
138    /// Get the transaction that owns this dirty page (None if clean or committed)
139    pub fn dirty_txn(&self) -> Option<TransactionId> {
140        let txn_id = self.dirty_by_txn.load(Ordering::Acquire);
141        if txn_id == 0 {
142            None
143        } else {
144            Some(TransactionId(txn_id))
145        }
146    }
147
148    #[allow(dead_code)]
149    pub fn pin_count(&self) -> u32 {
150        self.pin_count.load(Ordering::Acquire)
151    }
152}
153
/// The buffer pool - caches pages in memory
///
/// Lock ordering used throughout this file: `frames` is always acquired
/// before `eviction_policy` when both are needed (see the insert paths).
pub struct BufferPool {
    /// Maximum number of pages to cache
    capacity: usize,
    /// Page cache: page_id -> frame
    frames: RwLock<HashMap<PageId, Arc<BufferFrame>>>,
    /// Eviction policy (behind a mutex for exclusive access during eviction)
    eviction_policy: Mutex<Box<dyn EvictionPolicy>>,
    /// File manager for disk I/O
    file_manager: Arc<FileManager>,
    /// Statistics (public so callers can read counters directly)
    pub stats: BufferPoolStats,
    /// Eviction policy type (for informational purposes)
    policy_type: EvictionPolicyType,
    /// Active (uncommitted) transactions - pages owned by these can't be evicted
    active_txns: RwLock<HashSet<TransactionId>>,
}
171
172impl BufferPool {
173    /// Create a new buffer pool with default Clock eviction
174    pub fn new(capacity: usize, file_manager: Arc<FileManager>) -> Self {
175        Self::with_policy(capacity, file_manager, EvictionPolicyType::Clock)
176    }
177
178    /// Create a new buffer pool with specified eviction policy
179    pub fn with_policy(
180        capacity: usize,
181        file_manager: Arc<FileManager>,
182        policy_type: EvictionPolicyType,
183    ) -> Self {
184        BufferPool {
185            capacity,
186            frames: RwLock::new(HashMap::new()),
187            eviction_policy: Mutex::new(policy_type.create(capacity)),
188            file_manager,
189            stats: BufferPoolStats::default(),
190            policy_type,
191            active_txns: RwLock::new(HashSet::new()),
192        }
193    }
194
195    /// Get the eviction policy type
196    pub fn eviction_policy_type(&self) -> EvictionPolicyType {
197        self.policy_type
198    }
199
200    /// Get the eviction policy name
201    pub fn eviction_policy_name(&self) -> &'static str {
202        self.eviction_policy.lock().name()
203    }
204
    /// Get a page for reading
    pub fn get_page(&self, page_id: PageId) -> Result<PageGuard> {
        self.get_page_internal(page_id, false)
    }

    /// Get a page for writing
    pub fn get_page_for_write(&self, page_id: PageId) -> Result<PageGuardMut> {
        let guard = self.get_page_internal(page_id, true)?;
        Ok(PageGuardMut { inner: guard })
    }

    /// Shared lookup path for reads and writes.
    ///
    /// Fast path: the page is cached — pin it, refresh its reference bit,
    /// count a hit, and (for writes) mark it dirty up front. Slow path:
    /// count a miss and fall through to the disk load.
    ///
    /// NOTE(review): a write request marks the frame dirty as soon as the
    /// guard is handed out, even if the caller never actually writes.
    fn get_page_internal(&self, page_id: PageId, for_write: bool) -> Result<PageGuard> {
        // Get current active transaction for ownership tracking.
        // Only writes take ownership; reads never do.
        let current_txn = if for_write {
            self.get_single_active_txn()
        } else {
            None
        };

        // Fast path: page in cache
        {
            let frames = self.frames.read();
            if let Some(frame) = frames.get(&page_id) {
                // Pin while still holding the map's read lock; removal from
                // the map happens under the write lock, so the entry cannot
                // vanish mid-pin.
                frame.pin_count.fetch_add(1, Ordering::AcqRel);
                frame.reference_bit.store(true, Ordering::Relaxed);
                self.stats.cache_hits.fetch_add(1, Ordering::Relaxed);

                // For Clock policy, the reference_bit set above is sufficient.
                // For other policies (LRU-2, LIRS), we need to update their internal state.
                // (Lock order: frames read lock is still held here, then
                // eviction_policy — consistent with the insert paths.)
                if self.policy_type != EvictionPolicyType::Clock {
                    let mut policy = self.eviction_policy.lock();
                    policy.on_access(page_id);
                }

                if for_write {
                    if let Some(txn_id) = current_txn {
                        frame.mark_dirty_by(txn_id);
                    } else {
                        frame.mark_dirty();
                    }
                }

                return Ok(PageGuard {
                    frame: frame.clone(),
                });
            }
        }

        // Slow path: load from disk (counts as a miss even if another
        // thread races us and loads the page first)
        self.stats.cache_misses.fetch_add(1, Ordering::Relaxed);
        self.load_page_with_txn(page_id, for_write, current_txn)
    }
257
258    fn load_page_with_txn(
259        &self,
260        page_id: PageId,
261        for_write: bool,
262        txn_id: Option<TransactionId>,
263    ) -> Result<PageGuard> {
264        // Evict if at capacity
265        {
266            let frames = self.frames.read();
267            if frames.len() >= self.capacity {
268                drop(frames);
269                self.evict_one()?;
270            }
271        }
272
273        // Read from disk
274        let page_data = self.file_manager.read_page(page_id)?;
275        let page = Page::from_bytes(page_data)?;
276
277        // Verify checksum
278        if !page.verify_checksum() {
279            return Err(Error::CorruptedPage(page_id));
280        }
281
282        // Create frame
283        let frame = Arc::new(BufferFrame::new(page_id, page));
284        frame.pin_count.fetch_add(1, Ordering::AcqRel);
285
286        if for_write {
287            if let Some(tid) = txn_id {
288                frame.mark_dirty_by(tid);
289            } else {
290                frame.mark_dirty();
291            }
292        }
293
294        // Insert into cache
295        {
296            let mut frames = self.frames.write();
297            let mut policy = self.eviction_policy.lock();
298
299            // Double-check another thread didn't load it
300            if let Some(existing) = frames.get(&page_id) {
301                existing.pin_count.fetch_add(1, Ordering::AcqRel);
302                return Ok(PageGuard {
303                    frame: existing.clone(),
304                });
305            }
306
307            frames.insert(page_id, frame.clone());
308            policy.on_load(page_id);
309        }
310
311        Ok(PageGuard { frame })
312    }
313
314    /// Allocate a new page
315    pub fn allocate_page(&self, page_type: PageType) -> Result<(PageId, PageGuardMut)> {
316        // Get current active transaction for ownership tracking
317        let current_txn = self.get_single_active_txn();
318
319        // Evict if at capacity
320        {
321            let frames = self.frames.read();
322            if frames.len() >= self.capacity {
323                drop(frames);
324                self.evict_one()?;
325            }
326        }
327
328        // Extend file
329        let page_id = self.file_manager.extend()?;
330
331        // Create new page
332        let page = Page::new(page_id, page_type, self.file_manager.page_size());
333
334        // Create frame
335        let frame = Arc::new(BufferFrame::new(page_id, page));
336        frame.pin_count.fetch_add(1, Ordering::AcqRel);
337
338        // Mark dirty with transaction ownership if we have an active transaction
339        if let Some(txn_id) = current_txn {
340            frame.mark_dirty_by(txn_id);
341        } else {
342            frame.mark_dirty();
343        }
344
345        // Insert into cache
346        {
347            let mut frames = self.frames.write();
348            let mut policy = self.eviction_policy.lock();
349
350            frames.insert(page_id, frame.clone());
351            policy.on_load(page_id);
352        }
353
354        Ok((
355            page_id,
356            PageGuardMut {
357                inner: PageGuard { frame },
358            },
359        ))
360    }
361
362    /// Copy a page for modification (copy-on-write)
363    pub fn copy_page(&self, page_id: PageId) -> Result<(PageId, PageGuardMut)> {
364        // Get current active transaction for ownership tracking
365        let current_txn = self.get_single_active_txn();
366
367        // Read the original page
368        let original = self.get_page(page_id)?;
369        let page_data = {
370            let page = original.read();
371            page.as_bytes().to_vec()
372        };
373        drop(original);
374
375        // Allocate new page
376        let new_page_id = self.file_manager.extend()?;
377        let mut new_page = Page::from_bytes(page_data)?;
378        new_page.set_page_id(new_page_id);
379
380        // Create frame
381        let frame = Arc::new(BufferFrame::new(new_page_id, new_page));
382        frame.pin_count.fetch_add(1, Ordering::AcqRel);
383
384        // Mark dirty with transaction ownership if we have an active transaction
385        if let Some(txn_id) = current_txn {
386            frame.mark_dirty_by(txn_id);
387        } else {
388            frame.mark_dirty();
389        }
390
391        // Insert into cache
392        {
393            let mut frames = self.frames.write();
394            let mut policy = self.eviction_policy.lock();
395
396            frames.insert(new_page_id, frame.clone());
397            policy.on_load(new_page_id);
398        }
399
400        Ok((
401            new_page_id,
402            PageGuardMut {
403                inner: PageGuard { frame },
404            },
405        ))
406    }
407
408    /// Evict one page using the configured eviction policy
409    fn evict_one(&self) -> Result<()> {
410        // Try to find a victim using the eviction policy
411        let max_attempts = self.capacity * 2;
412
413        for _ in 0..max_attempts {
414            let victim_id = {
415                let mut policy = self.eviction_policy.lock();
416                policy.select_victim()
417            };
418
419            let Some(page_id) = victim_id else {
420                // No victim available
421                return Err(Error::BufferPoolFull {
422                    capacity: self.capacity,
423                    pinned: self.count_pinned(),
424                });
425            };
426
427            // Check if the page can be evicted
428            let frame_opt = {
429                let frames = self.frames.read();
430                frames.get(&page_id).cloned()
431            };
432
433            if let Some(frame) = frame_opt {
434                // Check if page can be evicted (not pinned, not dirty by active txn)
435                if !self.can_evict_frame(&frame) {
436                    // Remove from policy and try again
437                    let mut policy = self.eviction_policy.lock();
438                    policy.remove(page_id);
439                    continue;
440                }
441
442                // Found victim! Flush if dirty (only committed pages reach here)
443                if frame.dirty.load(Ordering::Acquire) {
444                    self.flush_frame(&frame)?;
445                }
446
447                // Remove from cache and policy
448                {
449                    let mut frames = self.frames.write();
450                    let mut policy = self.eviction_policy.lock();
451
452                    frames.remove(&page_id);
453                    policy.remove(page_id);
454                }
455
456                self.stats.pages_evicted.fetch_add(1, Ordering::Relaxed);
457                return Ok(());
458            } else {
459                // Page not in cache, remove from policy
460                let mut policy = self.eviction_policy.lock();
461                policy.remove(page_id);
462            }
463        }
464
465        Err(Error::BufferPoolFull {
466            capacity: self.capacity,
467            pinned: self.count_pinned(),
468        })
469    }
470
471    fn flush_frame(&self, frame: &BufferFrame) -> Result<()> {
472        let mut page = frame.page.write();
473
474        // Compute checksum
475        page.compute_checksum();
476
477        // Write to disk
478        self.file_manager
479            .write_page(frame.page_id, page.as_bytes())?;
480
481        frame.dirty.store(false, Ordering::Release);
482        self.stats.pages_flushed.fetch_add(1, Ordering::Relaxed);
483
484        Ok(())
485    }
486
487    /// Flush all dirty pages
488    ///
489    /// Skips pages owned by active (uncommitted) transactions to prevent
490    /// uncommitted data from being persisted.
491    pub fn flush_all(&self) -> Result<()> {
492        let frames = self.frames.read();
493        let active = self.active_txns.read();
494
495        for frame in frames.values() {
496            if frame.dirty.load(Ordering::Acquire) {
497                // Skip pages owned by active transactions
498                if let Some(txn_id) = frame.dirty_txn() {
499                    if active.contains(&txn_id) {
500                        continue;
501                    }
502                }
503                self.flush_frame(frame)?;
504            }
505        }
506
507        Ok(())
508    }
509
510    /// Flush a specific page
511    pub fn flush_page(&self, page_id: PageId) -> Result<()> {
512        let frames = self.frames.read();
513
514        if let Some(frame) = frames.get(&page_id) {
515            if frame.dirty.load(Ordering::Acquire) {
516                self.flush_frame(frame)?;
517            }
518        }
519
520        Ok(())
521    }
522
523    // ==================== Transaction-Aware Methods ====================
524
525    /// Register a transaction as active (its dirty pages can't be evicted)
526    pub fn register_active_txn(&self, txn_id: TransactionId) {
527        self.active_txns.write().insert(txn_id);
528    }
529
530    /// Unregister a transaction (called on commit or abort)
531    pub fn unregister_active_txn(&self, txn_id: TransactionId) {
532        self.active_txns.write().remove(&txn_id);
533    }
534
535    /// Get a page for write within a transaction context
536    ///
537    /// This marks the page as dirty and records the transaction that modified it.
538    /// Pages owned by uncommitted transactions cannot be evicted.
539    pub fn get_page_for_write_in_txn(
540        &self,
541        page_id: PageId,
542        txn_id: TransactionId,
543    ) -> Result<PageGuardMut> {
544        let guard = self.get_page_internal(page_id, true)?;
545        // Mark with transaction ownership
546        guard.frame.mark_dirty_by(txn_id);
547        Ok(PageGuardMut { inner: guard })
548    }
549
550    /// Allocate a new page within a transaction context
551    pub fn allocate_page_in_txn(
552        &self,
553        page_type: PageType,
554        txn_id: TransactionId,
555    ) -> Result<(PageId, PageGuardMut)> {
556        // Evict if at capacity
557        {
558            let frames = self.frames.read();
559            if frames.len() >= self.capacity {
560                drop(frames);
561                self.evict_one()?;
562            }
563        }
564
565        // Extend file
566        let page_id = self.file_manager.extend()?;
567
568        // Create new page
569        let page = Page::new(page_id, page_type, self.file_manager.page_size());
570
571        // Create frame
572        let frame = Arc::new(BufferFrame::new(page_id, page));
573        frame.pin_count.fetch_add(1, Ordering::AcqRel);
574        frame.mark_dirty_by(txn_id);
575
576        // Insert into cache
577        {
578            let mut frames = self.frames.write();
579            let mut policy = self.eviction_policy.lock();
580
581            frames.insert(page_id, frame.clone());
582            policy.on_load(page_id);
583        }
584
585        Ok((
586            page_id,
587            PageGuardMut {
588                inner: PageGuard { frame },
589            },
590        ))
591    }
592
593    /// Flush only pages dirtied by a specific transaction
594    ///
595    /// Called during commit to persist the transaction's changes.
596    /// After flushing, clears the transaction ownership from the pages.
597    pub fn flush_transaction(&self, txn_id: TransactionId) -> Result<()> {
598        let frames = self.frames.read();
599
600        for frame in frames.values() {
601            if frame.is_dirty_by(txn_id) {
602                self.flush_frame(frame)?;
603                frame.clear_txn_ownership();
604            }
605        }
606
607        Ok(())
608    }
609
    /// Discard dirty pages from an aborted transaction
    ///
    /// This clears the dirty flag and transaction ownership for pages
    /// that were modified by the aborted transaction, effectively
    /// discarding their uncommitted changes.
    ///
    /// Note: The in-memory page data still contains the aborted writes,
    /// but since the dirty flag is cleared, they won't be persisted.
    /// When the page is evicted and re-read, the original data is restored.
    pub fn discard_transaction(&self, txn_id: TransactionId) -> Result<()> {
        // First pass: collect pages to discard under the read lock only, so
        // ordinary page traffic is not blocked while we scan the whole map
        let pages_to_discard: Vec<PageId> = {
            let frames = self.frames.read();
            frames
                .iter()
                .filter(|(_, frame)| frame.is_dirty_by(txn_id))
                .map(|(page_id, _)| *page_id)
                .collect()
        };

        // Second pass: for each page, clear dirty flag and remove if unpinned.
        // The write lock is re-acquired per page, so ownership is re-checked
        // in case it changed between the two passes.
        for page_id in pages_to_discard {
            // Get write lock and process the page
            let mut frames_write = self.frames.write();

            if let Some(frame) = frames_write.get(&page_id) {
                // Only process if still owned by this transaction
                if frame.is_dirty_by(txn_id) {
                    // Clear the dirty flag so the page won't be flushed
                    frame.dirty.store(false, Ordering::Release);
                    frame.clear_txn_ownership();

                    // If unpinned, remove from cache so it gets reloaded from disk.
                    // NOTE(review): a pinned page keeps its aborted in-memory
                    // writes until it naturally leaves the cache — callers
                    // holding guards across an abort will still observe them.
                    if frame.pin_count.load(Ordering::Acquire) == 0 {
                        frames_write.remove(&page_id);
                        let mut policy = self.eviction_policy.lock();
                        policy.remove(page_id);
                    }
                }
            }
        }

        Ok(())
    }
654
655    /// Get the single active transaction, if exactly one exists
656    ///
657    /// This is used to automatically associate dirty pages with transactions
658    /// when B-tree operations use the standard get_page_for_write() path.
659    /// If there's exactly one active transaction, we can infer that any
660    /// write operation belongs to it.
661    fn get_single_active_txn(&self) -> Option<TransactionId> {
662        let active = self.active_txns.read();
663        if active.len() == 1 {
664            active.iter().next().copied()
665        } else {
666            None
667        }
668    }
669
670    /// Check if a page can be evicted
671    ///
672    /// A page can be evicted if:
673    /// 1. It's not pinned (pin_count == 0)
674    /// 2. It's not dirty by an active (uncommitted) transaction
675    fn can_evict_frame(&self, frame: &BufferFrame) -> bool {
676        // Can't evict pinned pages
677        if frame.pin_count.load(Ordering::Acquire) > 0 {
678            return false;
679        }
680
681        // Can't evict pages dirty by active transactions
682        if let Some(txn_id) = frame.dirty_txn() {
683            let active = self.active_txns.read();
684            if active.contains(&txn_id) {
685                return false;
686            }
687        }
688
689        true
690    }
691
692    /// Count the number of currently pinned pages
693    fn count_pinned(&self) -> usize {
694        let frames = self.frames.read();
695        frames
696            .values()
697            .filter(|f| f.pin_count.load(Ordering::Relaxed) > 0)
698            .count()
699    }
700
701    /// Get the number of pages in the buffer pool
702    pub fn size(&self) -> usize {
703        self.frames.read().len()
704    }
705
706    /// Get the capacity of the buffer pool
707    pub fn capacity(&self) -> usize {
708        self.capacity
709    }
710
711    /// Get a snapshot of current statistics
712    pub fn stats_snapshot(&self) -> BufferPoolStatsSnapshot {
713        self.stats.snapshot()
714    }
715}
716
impl Drop for BufferPool {
    fn drop(&mut self) {
        // Best-effort: try to flush remaining dirty pages. Errors are
        // swallowed because Drop cannot report them; pages owned by
        // still-active transactions are skipped by flush_all and their
        // in-memory changes are discarded with the pool.
        let _ = self.flush_all();
    }
}
723
724/// RAII guard for reading a page
725pub struct PageGuard {
726    frame: Arc<BufferFrame>,
727}
728
729impl PageGuard {
730    pub fn read(&self) -> RwLockReadGuard<'_, Page> {
731        self.frame.reference_bit.store(true, Ordering::Relaxed);
732        self.frame.page.read()
733    }
734
735    pub fn page_id(&self) -> PageId {
736        self.frame.page_id
737    }
738}
739
740impl Drop for PageGuard {
741    fn drop(&mut self) {
742        self.frame.pin_count.fetch_sub(1, Ordering::AcqRel);
743    }
744}
745
746/// RAII guard for writing a page
747pub struct PageGuardMut {
748    inner: PageGuard,
749}
750
751impl PageGuardMut {
752    pub fn read(&self) -> RwLockReadGuard<'_, Page> {
753        self.inner.read()
754    }
755
756    pub fn write(&self) -> RwLockWriteGuard<'_, Page> {
757        self.inner
758            .frame
759            .reference_bit
760            .store(true, Ordering::Relaxed);
761        self.inner.frame.mark_dirty();
762        self.inner.frame.page.write()
763    }
764
765    pub fn page_id(&self) -> PageId {
766        self.inner.page_id()
767    }
768
769    pub fn apply_redo(&mut self, data: &[u8]) -> Result<()> {
770        let mut page = self.write();
771        page.apply_redo(data)
772    }
773}
774
775impl Drop for PageGuardMut {
776    fn drop(&mut self) {
777        // inner's drop will handle unpin
778    }
779}
780
#[cfg(test)]
mod tests {
    use super::*;
    use crate::FileManager;
    use featherdb_core::Config;
    use tempfile::TempDir;

    /// Build a Clock-policy pool backed by a throwaway temp database.
    fn create_test_buffer_pool() -> (TempDir, BufferPool) {
        create_test_buffer_pool_with_policy(EvictionPolicyType::Clock)
    }

    /// Build a capacity-10 pool with the given eviction policy.
    fn create_test_buffer_pool_with_policy(policy: EvictionPolicyType) -> (TempDir, BufferPool) {
        let dir = TempDir::new().unwrap();
        let db_path = dir.path().join("test.db");
        let config = Config::new(&db_path);

        let file_manager = Arc::new(FileManager::open(&config).unwrap());
        file_manager.init_if_needed(&config).unwrap();

        (dir, BufferPool::with_policy(10, file_manager, policy))
    }

    /// Shared body for the eviction tests: overflow the pool and verify
    /// both the policy name and that evictions actually happened.
    fn overflow_and_check(policy: EvictionPolicyType, expected_name: &str) {
        let (_tmp, pool) = create_test_buffer_pool_with_policy(policy);
        assert_eq!(pool.eviction_policy_name(), expected_name);

        // Allocate more pages than capacity; drop each guard immediately
        // so the pages are unpinned and evictable
        for _ in 0..15 {
            let (_, guard) = pool.allocate_page(PageType::Leaf).unwrap();
            drop(guard);
        }

        assert!(pool.stats.pages_evicted.load(Ordering::Relaxed) > 0);
    }

    #[test]
    fn test_buffer_pool_allocate() {
        let (_tmp, pool) = create_test_buffer_pool();

        let (page_id, guard) = pool.allocate_page(PageType::Leaf).unwrap();
        assert!(page_id.0 >= 3); // First data page

        let page = guard.read();
        assert_eq!(page.page_type(), PageType::Leaf);
    }

    #[test]
    fn test_buffer_pool_read_write() {
        let (_tmp, pool) = create_test_buffer_pool();

        // Allocate, write a cell, then unpin
        let (page_id, guard) = pool.allocate_page(PageType::Leaf).unwrap();
        guard.write().insert_cell(b"hello");
        drop(guard);

        // Persist and read back
        pool.flush_page(page_id).unwrap();
        let guard = pool.get_page(page_id).unwrap();
        assert_eq!(guard.read().get_cell(0), Some(b"hello".as_slice()));
    }

    #[test]
    fn test_buffer_pool_eviction_clock() {
        overflow_and_check(EvictionPolicyType::Clock, "Clock");
    }

    #[test]
    fn test_buffer_pool_eviction_lru2() {
        overflow_and_check(EvictionPolicyType::Lru2, "LRU-2");
    }

    #[test]
    fn test_buffer_pool_eviction_lirs() {
        overflow_and_check(EvictionPolicyType::Lirs, "LIRS");
    }

    #[test]
    fn test_buffer_pool_stats() {
        let (_tmp, pool) = create_test_buffer_pool();

        let (page_id, guard) = pool.allocate_page(PageType::Leaf).unwrap();
        drop(guard);

        // Re-reading a cached page must register as a hit
        let _guard = pool.get_page(page_id).unwrap();

        let stats = pool.stats_snapshot();
        assert!(stats.cache_hits > 0);
        assert!(stats.hit_ratio() > 0.0);
    }

    #[test]
    fn test_buffer_pool_hit_ratio() {
        let (_tmp, pool) = create_test_buffer_pool();

        // Create some pages, unpinning each as we go
        let page_ids: Vec<_> = (0..5)
            .map(|_| {
                let (page_id, guard) = pool.allocate_page(PageType::Leaf).unwrap();
                drop(guard);
                page_id
            })
            .collect();

        // Re-access every page repeatedly: all of these should be hits
        for _ in 0..10 {
            for &page_id in &page_ids {
                let _guard = pool.get_page(page_id).unwrap();
            }
        }

        let stats = pool.stats_snapshot();
        assert!(
            stats.hit_ratio() > 0.9,
            "Hit ratio should be high: {}",
            stats.hit_ratio()
        );
    }
}