Skip to main content

aegis_storage/
buffer.rs

//! Aegis Buffer - Buffer Pool Management
//!
//! LRU-based buffer pool for caching pages in memory. Provides efficient
//! page access with automatic eviction of least-recently-used pages when
//! the pool reaches capacity.
//!
//! Key Features:
//! - LRU eviction policy for cache management
//! - Pin counting to protect active pages from eviction
//! - Dirty page tracking for write-back optimization
//! - Thread-safe concurrent access
//!
//! @version 0.1.0
//! @author AutomataNexus Development Team
15
16use crate::page::{Page, PageType};
17use aegis_common::{AegisError, PageId, Result};
18use parking_lot::{Mutex, RwLock};
19use std::collections::{HashMap, VecDeque};
20use std::sync::Arc;
21
22// =============================================================================
23// Buffer Pool Configuration
24// =============================================================================
25
/// Tunable parameters for a [`BufferPool`].
#[derive(Debug, Clone)]
pub struct BufferPoolConfig {
    /// Number of frames the pool allocates up front; each frame holds at most
    /// one page.
    pub pool_size: usize,
    /// Prefetch size setting.
    ///
    /// NOTE(review): nothing in this file reads this field yet — confirm the
    /// intended unit (pages?) with whoever consumes it.
    pub prefetch_size: usize,
}

impl BufferPoolConfig {
    /// Frame count used by [`Default`].
    const DEFAULT_POOL_SIZE: usize = 1024;
    /// Prefetch size used by [`Default`].
    const DEFAULT_PREFETCH_SIZE: usize = 8;
}

impl Default for BufferPoolConfig {
    /// Returns a configuration with 1024 frames and a prefetch size of 8.
    fn default() -> Self {
        let pool_size = Self::DEFAULT_POOL_SIZE;
        let prefetch_size = Self::DEFAULT_PREFETCH_SIZE;
        Self {
            pool_size,
            prefetch_size,
        }
    }
}
41
42// =============================================================================
43// Buffer Frame
44// =============================================================================
45
46/// A frame in the buffer pool holding a page.
47struct BufferFrame {
48    page: RwLock<Option<Page>>,
49    page_id: RwLock<Option<PageId>>,
50}
51
52impl BufferFrame {
53    fn new() -> Self {
54        Self {
55            page: RwLock::new(None),
56            page_id: RwLock::new(None),
57        }
58    }
59
60    #[allow(dead_code)]
61    fn is_occupied(&self) -> bool {
62        self.page_id.read().is_some()
63    }
64
65    fn get_page_id(&self) -> Option<PageId> {
66        *self.page_id.read()
67    }
68}
69
70// =============================================================================
71// Buffer Pool
72// =============================================================================
73
74/// LRU buffer pool for page caching.
75pub struct BufferPool {
76    frames: Vec<Arc<BufferFrame>>,
77    page_table: RwLock<HashMap<PageId, usize>>,
78    free_list: Mutex<VecDeque<usize>>,
79    lru_list: Mutex<VecDeque<usize>>,
80    config: BufferPoolConfig,
81}
82
83impl BufferPool {
84    /// Create a new buffer pool with the given configuration.
85    pub fn new(config: BufferPoolConfig) -> Self {
86        let mut frames = Vec::with_capacity(config.pool_size);
87        let mut free_list = VecDeque::with_capacity(config.pool_size);
88
89        for i in 0..config.pool_size {
90            frames.push(Arc::new(BufferFrame::new()));
91            free_list.push_back(i);
92        }
93
94        Self {
95            frames,
96            page_table: RwLock::new(HashMap::new()),
97            free_list: Mutex::new(free_list),
98            lru_list: Mutex::new(VecDeque::new()),
99            config,
100        }
101    }
102
103    /// Create a buffer pool with default configuration.
104    pub fn with_capacity(num_pages: usize) -> Self {
105        Self::new(BufferPoolConfig {
106            pool_size: num_pages,
107            ..Default::default()
108        })
109    }
110
111    /// Fetch a page from the pool, loading it if necessary.
112    pub fn fetch_page(&self, page_id: PageId) -> Result<PageHandle> {
113        if let Some(&frame_id) = self.page_table.read().get(&page_id) {
114            let frame = &self.frames[frame_id];
115            if let Some(ref page) = *frame.page.read() {
116                page.pin();
117                self.update_lru(frame_id);
118                return Ok(PageHandle {
119                    frame: Arc::clone(frame),
120                    page_id,
121                });
122            }
123        }
124
125        let frame_id = self.get_free_frame()?;
126        let frame = &self.frames[frame_id];
127
128        let page = Page::new(page_id, PageType::Data);
129        page.pin();
130
131        *frame.page.write() = Some(page);
132        *frame.page_id.write() = Some(page_id);
133
134        self.page_table.write().insert(page_id, frame_id);
135        self.update_lru(frame_id);
136
137        Ok(PageHandle {
138            frame: Arc::clone(frame),
139            page_id,
140        })
141    }
142
143    /// Create a new page in the pool.
144    pub fn new_page(&self, page_id: PageId, page_type: PageType) -> Result<PageHandle> {
145        if self.page_table.read().contains_key(&page_id) {
146            return Err(AegisError::Storage(format!(
147                "Page {} already exists",
148                page_id.0
149            )));
150        }
151
152        let frame_id = self.get_free_frame()?;
153        let frame = &self.frames[frame_id];
154
155        let page = Page::new(page_id, page_type);
156        page.pin();
157
158        *frame.page.write() = Some(page);
159        *frame.page_id.write() = Some(page_id);
160
161        self.page_table.write().insert(page_id, frame_id);
162        self.update_lru(frame_id);
163
164        Ok(PageHandle {
165            frame: Arc::clone(frame),
166            page_id,
167        })
168    }
169
170    /// Flush a specific page to storage.
171    pub fn flush_page(&self, page_id: PageId) -> Result<Option<Page>> {
172        if let Some(&frame_id) = self.page_table.read().get(&page_id) {
173            let frame = &self.frames[frame_id];
174            let page_guard = frame.page.read();
175
176            if let Some(ref page) = *page_guard {
177                if page.is_dirty() {
178                    let cloned = Page::from_bytes(&page.to_bytes())?;
179                    page.clear_dirty();
180                    return Ok(Some(cloned));
181                }
182            }
183        }
184        Ok(None)
185    }
186
187    /// Unpin a page, allowing it to be evicted.
188    pub fn unpin_page(&self, page_id: PageId) {
189        if let Some(&frame_id) = self.page_table.read().get(&page_id) {
190            let frame = &self.frames[frame_id];
191            if let Some(ref page) = *frame.page.read() {
192                page.unpin();
193            }
194        }
195    }
196
197    /// Get buffer pool statistics.
198    pub fn stats(&self) -> BufferPoolStats {
199        let page_table = self.page_table.read();
200        let free_count = self.free_list.lock().len();
201
202        let mut dirty_count = 0;
203        let mut pinned_count = 0;
204
205        for &frame_id in page_table.values() {
206            let frame = &self.frames[frame_id];
207            if let Some(ref page) = *frame.page.read() {
208                if page.is_dirty() {
209                    dirty_count += 1;
210                }
211                if page.is_pinned() {
212                    pinned_count += 1;
213                }
214            }
215        }
216
217        BufferPoolStats {
218            total_frames: self.config.pool_size,
219            used_frames: page_table.len(),
220            free_frames: free_count,
221            dirty_pages: dirty_count,
222            pinned_pages: pinned_count,
223        }
224    }
225
226    fn get_free_frame(&self) -> Result<usize> {
227        if let Some(frame_id) = self.free_list.lock().pop_front() {
228            return Ok(frame_id);
229        }
230
231        self.evict_page()
232    }
233
234    fn evict_page(&self) -> Result<usize> {
235        let mut lru_list = self.lru_list.lock();
236
237        for _ in 0..lru_list.len() {
238            if let Some(frame_id) = lru_list.pop_front() {
239                let frame = &self.frames[frame_id];
240
241                if let Some(ref page) = *frame.page.read() {
242                    if page.is_pinned() {
243                        lru_list.push_back(frame_id);
244                        continue;
245                    }
246                }
247
248                if let Some(page_id) = frame.get_page_id() {
249                    self.page_table.write().remove(&page_id);
250                }
251
252                *frame.page.write() = None;
253                *frame.page_id.write() = None;
254
255                return Ok(frame_id);
256            }
257        }
258
259        Err(AegisError::ResourceExhausted(
260            "No evictable pages in buffer pool".to_string(),
261        ))
262    }
263
264    fn update_lru(&self, frame_id: usize) {
265        let mut lru_list = self.lru_list.lock();
266        lru_list.retain(|&id| id != frame_id);
267        lru_list.push_back(frame_id);
268    }
269}
270
271// =============================================================================
272// Page Handle
273// =============================================================================
274
275/// RAII handle for accessing a page in the buffer pool.
276pub struct PageHandle {
277    frame: Arc<BufferFrame>,
278    page_id: PageId,
279}
280
281impl PageHandle {
282    /// Get a read reference to the page.
283    pub fn read(&self) -> impl std::ops::Deref<Target = Page> + '_ {
284        parking_lot::RwLockReadGuard::map(self.frame.page.read(), |opt| {
285            opt.as_ref().expect("Page should be present")
286        })
287    }
288
289    /// Get a write reference to the page.
290    pub fn write(&self) -> impl std::ops::DerefMut<Target = Page> + '_ {
291        parking_lot::RwLockWriteGuard::map(self.frame.page.write(), |opt| {
292            opt.as_mut().expect("Page should be present")
293        })
294    }
295
296    /// Get the page ID.
297    pub fn page_id(&self) -> PageId {
298        self.page_id
299    }
300}
301
302impl Drop for PageHandle {
303    fn drop(&mut self) {
304        if let Some(ref page) = *self.frame.page.read() {
305            page.unpin();
306        }
307    }
308}
309
310// =============================================================================
311// Statistics
312// =============================================================================
313
/// Point-in-time counters describing a buffer pool.
#[derive(Debug, Clone)]
pub struct BufferPoolStats {
    /// Configured number of frames in the pool.
    pub total_frames: usize,
    /// Number of entries in the page table (pages currently resident).
    pub used_frames: usize,
    /// Number of frames on the free list.
    pub free_frames: usize,
    /// Resident pages whose dirty flag is set.
    pub dirty_pages: usize,
    /// Resident pages that are currently pinned.
    pub pinned_pages: usize,
}
323
324// =============================================================================
325// Tests
326// =============================================================================
327
#[cfg(test)]
mod tests {
    use super::*;

    /// A page created in an empty pool reports the id it was created with.
    #[test]
    fn test_buffer_pool_new_page() {
        let pool = BufferPool::with_capacity(10);

        let created = pool.new_page(PageId(1), PageType::Data);
        let handle = created.expect("new_page should succeed");

        assert_eq!(handle.page_id(), PageId(1));
    }

    /// A page that was created earlier can be fetched again by id.
    #[test]
    fn test_buffer_pool_fetch_page() {
        let pool = BufferPool::with_capacity(10);

        // The handle is a temporary here, so it is dropped (and the page
        // unpinned) at the end of this statement.
        pool.new_page(PageId(1), PageType::Data)
            .expect("new_page should succeed");

        let fetched = pool.fetch_page(PageId(1));
        let handle = fetched.expect("fetch_page should succeed");
        assert_eq!(handle.page_id(), PageId(1));
    }

    /// Once every frame is occupied by an unpinned page, creating one more
    /// page evicts a resident page instead of failing.
    #[test]
    fn test_buffer_pool_eviction() {
        let pool = BufferPool::with_capacity(3);

        let first = pool
            .new_page(PageId(1), PageType::Data)
            .expect("new_page 1 should succeed");
        let second = pool
            .new_page(PageId(2), PageType::Data)
            .expect("new_page 2 should succeed");
        let third = pool
            .new_page(PageId(3), PageType::Data)
            .expect("new_page 3 should succeed");

        // Release all pins so every frame becomes an eviction candidate.
        drop(first);
        drop(second);
        drop(third);

        let _fourth = pool
            .new_page(PageId(4), PageType::Data)
            .expect("new_page 4 after eviction should succeed");

        let after = pool.stats();
        assert_eq!(after.used_frames, 3);
    }

    /// Statistics reflect the configured size and the number of resident pages.
    #[test]
    fn test_buffer_pool_stats() {
        let pool = BufferPool::with_capacity(10);

        let before = pool.stats();
        assert_eq!(before.total_frames, 10);
        assert_eq!(before.used_frames, 0);
        assert_eq!(before.free_frames, 10);

        // Keep both handles alive until the end of the test.
        let _first = pool
            .new_page(PageId(1), PageType::Data)
            .expect("new_page 1 should succeed");
        let _second = pool
            .new_page(PageId(2), PageType::Data)
            .expect("new_page 2 should succeed");

        let after = pool.stats();
        assert_eq!(after.used_frames, 2);
    }
}