vibesql_storage/buffer/
mod.rs

1//! Buffer Pool - LRU Cache for Page Management
2//!
3//! This module provides an LRU (Least Recently Used) buffer pool that caches
4//! hot pages in memory, improving disk-backed B+ tree performance.
5
6#[cfg(target_arch = "wasm32")]
7use std::sync::Mutex;
8use std::{
9    num::NonZeroUsize,
10    sync::{
11        atomic::{AtomicU64, Ordering},
12        Arc,
13    },
14};
15
16use lru::LruCache;
17#[cfg(not(target_arch = "wasm32"))]
18use parking_lot::Mutex;
19
20use crate::{
21    page::{Page, PageId, PageManager},
22    StorageError,
23};
24
/// Acquire a mutex guard regardless of which `Mutex` implementation is in use.
///
/// Native builds use `parking_lot::Mutex`, whose `lock()` yields the guard directly.
#[cfg(not(target_arch = "wasm32"))]
macro_rules! lock {
    ($mutex:expr) => {
        $mutex.lock()
    };
}

/// Acquire a mutex guard regardless of which `Mutex` implementation is in use.
///
/// wasm32 builds use `std::sync::Mutex`, whose `lock()` yields
/// `Result<Guard, PoisonError>`; the result is unwrapped here.
#[cfg(target_arch = "wasm32")]
macro_rules! lock {
    ($mutex:expr) => {
        $mutex.lock().unwrap()
    };
}
40
/// Counters describing how the buffer pool is serving page requests.
///
/// Every counter is an atomic updated with relaxed ordering, so the
/// statistics can be bumped and read through a shared reference.
#[derive(Debug, Default)]
pub struct BufferPoolStats {
    /// Lookups answered from the in-memory cache
    hits: AtomicU64,
    /// Lookups that had to fall back to the page manager
    misses: AtomicU64,
    /// Pages that have been removed from the cache
    evictions: AtomicU64,
}

impl BufferPoolStats {
    /// Build a tracker with all counters at zero
    pub fn new() -> Self {
        Self {
            hits: AtomicU64::new(0),
            misses: AtomicU64::new(0),
            evictions: AtomicU64::new(0),
        }
    }

    /// Total cache hits recorded so far
    pub fn hits(&self) -> u64 {
        self.hits.load(Ordering::Relaxed)
    }

    /// Total cache misses recorded so far
    pub fn misses(&self) -> u64 {
        self.misses.load(Ordering::Relaxed)
    }

    /// Total evictions recorded so far
    pub fn evictions(&self) -> u64 {
        self.evictions.load(Ordering::Relaxed)
    }

    /// Fraction of lookups served from the cache, in the range 0.0 to 1.0
    ///
    /// Returns 0.0 when no lookups have been recorded yet.
    pub fn hit_rate(&self) -> f64 {
        let (hit_count, miss_count) = (self.hits(), self.misses());
        match hit_count + miss_count {
            0 => 0.0,
            lookups => hit_count as f64 / lookups as f64,
        }
    }

    /// Add one to the given counter
    fn bump(counter: &AtomicU64) {
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one more cache hit
    fn record_hit(&self) {
        Self::bump(&self.hits);
    }

    /// Count one more cache miss
    fn record_miss(&self) {
        Self::bump(&self.misses);
    }

    /// Count one more eviction
    fn record_eviction(&self) {
        Self::bump(&self.evictions);
    }
}
100
101/// LRU buffer pool for caching pages in memory
102#[derive(Debug)]
103pub struct BufferPool {
104    /// LRU cache mapping page IDs to pages
105    cache: Arc<Mutex<LruCache<PageId, Page>>>,
106    /// Page manager for disk I/O
107    page_manager: Arc<PageManager>,
108    /// Maximum number of pages to cache
109    capacity: usize,
110    /// Performance statistics
111    stats: BufferPoolStats,
112}
113
114impl BufferPool {
115    /// Create a new buffer pool with the specified capacity
116    ///
117    /// # Arguments
118    /// * `page_manager` - Page manager for disk I/O
119    /// * `capacity` - Maximum number of pages to cache (default: 1000 = ~4MB)
120    pub fn new(page_manager: Arc<PageManager>, capacity: usize) -> Self {
121        let capacity = if capacity == 0 { 1000 } else { capacity };
122        let cache = Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap())));
123
124        BufferPool { cache, page_manager, capacity, stats: BufferPoolStats::new() }
125    }
126
127    /// Get a page from the cache or load from disk
128    ///
129    /// # Arguments
130    /// * `page_id` - ID of the page to retrieve
131    ///
132    /// # Returns
133    /// The requested page, either from cache or loaded from disk
134    pub fn get_page(&self, page_id: PageId) -> Result<Page, StorageError> {
135        // Try to get from cache first
136        {
137            let mut cache = lock!(self.cache);
138
139            if let Some(page) = cache.get(&page_id) {
140                self.stats.record_hit();
141                return Ok(page.clone());
142            }
143        }
144
145        // Cache miss - load from disk
146        self.stats.record_miss();
147        let page = self.page_manager.read_page(page_id)?;
148
149        // Insert into cache
150        self.put_page_internal(page.clone())?;
151
152        Ok(page)
153    }
154
155    /// Put a page into the cache
156    ///
157    /// # Arguments
158    /// * `page` - The page to cache
159    pub fn put_page(&self, page: Page) -> Result<(), StorageError> {
160        self.put_page_internal(page)
161    }
162
163    /// Internal method to put a page into the cache
164    fn put_page_internal(&self, page: Page) -> Result<(), StorageError> {
165        let mut cache = lock!(self.cache);
166
167        // If cache is at capacity, LRU will evict automatically
168        if let Some((_evicted_id, mut evicted_page)) = cache.push(page.id, page) {
169            // If evicted page is dirty, write to disk
170            if evicted_page.dirty {
171                drop(cache); // Release lock before disk I/O
172                self.page_manager.write_page(&mut evicted_page)?;
173            }
174            self.stats.record_eviction();
175        }
176
177        Ok(())
178    }
179
180    /// Flush all dirty pages to disk
181    pub fn flush_dirty(&self) -> Result<(), StorageError> {
182        let cache = lock!(self.cache);
183
184        // Collect dirty pages (we need to release the lock before writing)
185        let dirty_pages: Vec<(PageId, Page)> = cache
186            .iter()
187            .filter(|(_, page)| page.dirty)
188            .map(|(id, page)| (*id, page.clone()))
189            .collect();
190
191        drop(cache); // Release lock before disk I/O
192
193        // Write all dirty pages
194        for (_id, mut page) in dirty_pages {
195            self.page_manager.write_page(&mut page)?;
196
197            // Update the page in cache to mark it clean
198            let mut cache = lock!(self.cache);
199            if let Some(cached_page) = cache.get_mut(&page.id) {
200                cached_page.mark_clean();
201            }
202        }
203
204        Ok(())
205    }
206
207    /// Manually evict a page from the cache
208    ///
209    /// # Arguments
210    /// * `page_id` - ID of the page to evict
211    pub fn evict(&self, page_id: PageId) -> Result<(), StorageError> {
212        let mut cache = lock!(self.cache);
213
214        if let Some(mut page) = cache.pop(&page_id) {
215            // If page is dirty, write to disk
216            if page.dirty {
217                drop(cache); // Release lock before disk I/O
218                self.page_manager.write_page(&mut page)?;
219            }
220            self.stats.record_eviction();
221        }
222
223        Ok(())
224    }
225
226    /// Get the current capacity of the buffer pool
227    pub fn capacity(&self) -> usize {
228        self.capacity
229    }
230
231    /// Get the current number of pages in the cache
232    pub fn size(&self) -> Result<usize, StorageError> {
233        let cache = lock!(self.cache);
234        Ok(cache.len())
235    }
236
237    /// Get statistics for the buffer pool
238    pub fn stats(&self) -> &BufferPoolStats {
239        &self.stats
240    }
241}
242
#[cfg(all(test, not(target_arch = "wasm32")))]
mod tests {
    use tempfile::TempDir;

    use super::*;
    use crate::NativeStorage;

    /// Build a buffer pool of the requested capacity on top of a fresh
    /// temporary directory.
    ///
    /// The `TempDir` is handed back so the directory stays alive for the
    /// duration of the test; the page manager is returned for direct disk
    /// access alongside the pool.
    fn pool_with_capacity(capacity: usize) -> (TempDir, Arc<PageManager>, BufferPool) {
        let temp_dir = TempDir::new().unwrap();
        let storage = Arc::new(NativeStorage::new(temp_dir.path()).unwrap());
        let page_manager = Arc::new(PageManager::new("test.db", storage).unwrap());
        let buffer_pool = BufferPool::new(page_manager.clone(), capacity);
        (temp_dir, page_manager, buffer_pool)
    }

    /// Build a dirty page whose first byte is `marker`.
    fn dirty_page(id: PageId, marker: u8) -> Page {
        let mut page = Page::new(id);
        page.data[0] = marker;
        page.mark_dirty();
        page
    }

    #[test]
    fn test_buffer_pool_creation() {
        let (_temp_dir, _page_manager, buffer_pool) = pool_with_capacity(10);

        assert_eq!(buffer_pool.capacity(), 10);
        assert_eq!(buffer_pool.size().unwrap(), 0);
    }

    #[test]
    fn test_cache_hit() {
        let (_temp_dir, page_manager, buffer_pool) = pool_with_capacity(10);

        // Persist a page directly so the pool has something to load
        let mut on_disk = dirty_page(1, 42);
        page_manager.write_page(&mut on_disk).unwrap();

        // First lookup has to go to disk
        let first = buffer_pool.get_page(1).unwrap();
        assert_eq!(first.data[0], 42);
        assert_eq!(buffer_pool.stats().misses(), 1);
        assert_eq!(buffer_pool.stats().hits(), 0);

        // Second lookup is served from the cache
        let second = buffer_pool.get_page(1).unwrap();
        assert_eq!(second.data[0], 42);
        assert_eq!(buffer_pool.stats().misses(), 1);
        assert_eq!(buffer_pool.stats().hits(), 1);
    }

    #[test]
    fn test_cache_miss() {
        let (_temp_dir, _page_manager, buffer_pool) = pool_with_capacity(10);

        // Nothing has been written for this page yet
        let page = buffer_pool.get_page(1).unwrap();
        assert_eq!(page.id, 1);
        assert_eq!(buffer_pool.stats().misses(), 1);
        assert_eq!(buffer_pool.stats().hits(), 0);
    }

    #[test]
    fn test_eviction() {
        let (_temp_dir, _page_manager, buffer_pool) = pool_with_capacity(3);

        // Occupy every slot
        for i in 1..=3 {
            let mut page = Page::new(i);
            page.data[0] = i as u8;
            buffer_pool.put_page(page).unwrap();
        }

        assert_eq!(buffer_pool.size().unwrap(), 3);
        assert_eq!(buffer_pool.stats().evictions(), 0);

        // A fourth page has to push one out
        buffer_pool.put_page(Page::new(4)).unwrap();

        assert_eq!(buffer_pool.size().unwrap(), 3);
        assert_eq!(buffer_pool.stats().evictions(), 1);
    }

    #[test]
    fn test_dirty_page_write_on_eviction() {
        let (_temp_dir, page_manager, buffer_pool) = pool_with_capacity(2);

        // One dirty page followed by a clean one fills the pool
        buffer_pool.put_page(dirty_page(1, 42)).unwrap();
        buffer_pool.put_page(Page::new(2)).unwrap();

        // The third insertion evicts page 1, which must be written back
        buffer_pool.put_page(Page::new(3)).unwrap();

        assert_eq!(buffer_pool.stats().evictions(), 1);

        // Bypass the pool to confirm the write reached disk
        let read_page = page_manager.read_page(1).unwrap();
        assert_eq!(read_page.data[0], 42);
    }

    #[test]
    fn test_flush_dirty_pages() {
        let (_temp_dir, page_manager, buffer_pool) = pool_with_capacity(10);

        // Cache several dirty pages
        for i in 1..=5 {
            let mut page = Page::new(i);
            page.data[0] = i as u8;
            page.mark_dirty();
            buffer_pool.put_page(page).unwrap();
        }

        buffer_pool.flush_dirty().unwrap();

        // Each page must now be readable straight from disk
        for i in 1..=5 {
            let page = page_manager.read_page(i).unwrap();
            assert_eq!(page.data[0], i as u8);
        }
    }

    #[test]
    fn test_cache_statistics() {
        let (_temp_dir, _page_manager, buffer_pool) = pool_with_capacity(10);

        // Two misses interleaved with three hits
        buffer_pool.get_page(1).unwrap(); // miss
        buffer_pool.get_page(1).unwrap(); // hit
        buffer_pool.get_page(2).unwrap(); // miss
        buffer_pool.get_page(1).unwrap(); // hit
        buffer_pool.get_page(2).unwrap(); // hit

        let stats = buffer_pool.stats();
        assert_eq!(stats.hits(), 3);
        assert_eq!(stats.misses(), 2);
        assert_eq!(stats.hit_rate(), 0.6);
    }

    #[test]
    fn test_manual_eviction() {
        let (_temp_dir, page_manager, buffer_pool) = pool_with_capacity(10);

        buffer_pool.put_page(dirty_page(1, 42)).unwrap();
        assert_eq!(buffer_pool.size().unwrap(), 1);

        // Explicitly remove the page from the cache
        buffer_pool.evict(1).unwrap();

        assert_eq!(buffer_pool.size().unwrap(), 0);
        assert_eq!(buffer_pool.stats().evictions(), 1);

        // The dirty contents must have been written back
        let read_page = page_manager.read_page(1).unwrap();
        assert_eq!(read_page.data[0], 42);
    }

    #[test]
    fn test_zero_capacity_defaults_to_1000() {
        let (_temp_dir, _page_manager, buffer_pool) = pool_with_capacity(0);

        assert_eq!(buffer_pool.capacity(), 1000);
    }
}