quill_sql/buffer/
page.rs

1use crate::buffer::buffer_pool::{BufferPool, FrameMetaSnapshot};
2use crate::buffer::{BufferManager, FrameId};
3use crate::error::{QuillSQLError, QuillSQLResult};
4use crate::recovery::Lsn;
5use derive_with::With;
6use parking_lot::{RwLockReadGuard, RwLockWriteGuard};
7use std::mem::{self, ManuallyDrop};
8use std::sync::atomic::{AtomicU32, Ordering};
9use std::sync::Arc;
10
/// Identifier of a page: a plain 32-bit unsigned integer.
pub type PageId = u32;
/// Atomic counterpart of [`PageId`] (same 32-bit width).
pub type AtomicPageId = AtomicU32;

/// Page id value reserved to mean "no page".
pub const INVALID_PAGE_ID: PageId = 0;
/// Size in bytes of one page image (4 KiB).
pub const PAGE_SIZE: usize = 4 * 1024;
16
/// Bookkeeping metadata describing a single page.
///
/// Besides `Debug`, the struct derives `With` from the `derive_with` crate.
#[derive(Debug, With)]
pub struct PageMeta {
    /// Id of the page; `INVALID_PAGE_ID` for metadata built by `empty()` or
    /// reset by `destroy()`.
    pub page_id: PageId,
    /// Pin counter, starting at zero.
    // NOTE(review): presumably the number of outstanding users of the page — confirm.
    pub pin_count: AtomicU32,
    /// Dirty flag, `false` on construction and after `destroy()`.
    // NOTE(review): presumably "modified since last written to disk" — confirm.
    pub is_dirty: bool,
    /// LSN associated with the page; zero on construction and after `destroy()`.
    pub page_lsn: Lsn,
}
24
25impl PageMeta {
26    pub fn empty() -> Self {
27        Self {
28            page_id: INVALID_PAGE_ID,
29            pin_count: AtomicU32::new(0),
30            is_dirty: false,
31            page_lsn: 0,
32        }
33    }
34
35    pub fn new(page_id: PageId) -> Self {
36        Self {
37            page_id,
38            pin_count: AtomicU32::new(0),
39            is_dirty: false,
40            page_lsn: 0,
41        }
42    }
43
44    pub fn destroy(&mut self) {
45        self.page_id = INVALID_PAGE_ID;
46        self.pin_count.store(0, Ordering::Relaxed);
47        self.is_dirty = false;
48        self.page_lsn = 0;
49    }
50}
51
/// Read-side handle on a buffer-pool frame.
///
/// Holds a read guard on the frame's lock for as long as the value lives.
/// Dropping the handle releases that guard and then calls
/// `BufferManager::complete_unpin` for the page.
#[derive(Debug)]
pub struct ReadPageGuard {
    /// Buffer manager that is notified through `complete_unpin` on drop.
    bpm: Arc<BufferManager>,
    /// Pool queried for the frame's bytes and metadata.
    pool: Arc<BufferPool>,
    /// Frame this guard refers to.
    frame_id: FrameId,
    /// Read guard on the frame lock with its lifetime erased to `'static`
    /// (see `new_read_guard`). Wrapped in `ManuallyDrop` so that `Drop` can
    /// release it explicitly before calling `complete_unpin`.
    guard: ManuallyDrop<RwLockReadGuard<'static, ()>>,
}
59
60impl ReadPageGuard {
61    pub fn pin_count(&self) -> u32 {
62        self.meta_snapshot().pin_count
63    }
64
65    pub fn data(&self) -> &[u8] {
66        unsafe { self.pool.frame_slice(self.frame_id) }
67    }
68
69    pub fn is_dirty(&self) -> bool {
70        self.meta_snapshot().is_dirty
71    }
72
73    pub fn page_id(&self) -> PageId {
74        self.meta_snapshot().page_id
75    }
76
77    pub fn lsn(&self) -> Lsn {
78        self.meta_snapshot().lsn
79    }
80
81    pub fn meta_snapshot(&self) -> FrameMetaSnapshot {
82        self.pool.frame_meta(self.frame_id).snapshot()
83    }
84
85    pub fn frame_id(&self) -> FrameId {
86        self.frame_id
87    }
88}
89
90impl Drop for ReadPageGuard {
91    fn drop(&mut self) {
92        let snapshot = self.meta_snapshot();
93        let page_id = snapshot.page_id;
94        let is_dirty = snapshot.is_dirty;
95        unsafe {
96            ManuallyDrop::drop(&mut self.guard);
97        }
98        if let Err(e) = self.bpm.complete_unpin(page_id, is_dirty, None) {
99            eprintln!("Warning: Failed to complete_unpin page {}: {}", page_id, e);
100        }
101    }
102}
103
/// Write-side handle on a buffer-pool frame.
///
/// Holds a write guard on the frame's lock for as long as the value lives.
/// Dropping the handle releases that guard and then calls
/// `BufferManager::complete_unpin`, passing along an LSN hint when one is
/// available.
#[derive(Debug)]
pub struct WritePageGuard {
    /// Buffer manager that is notified through `complete_unpin` on drop and
    /// asked for the WAL manager in `apply_page_image`.
    bpm: Arc<BufferManager>,
    /// Pool queried for the frame's bytes and metadata.
    pool: Arc<BufferPool>,
    /// Frame this guard refers to.
    frame_id: FrameId,
    /// Write guard on the frame lock with its lifetime erased to `'static`
    /// (see `new_write_guard`). Wrapped in `ManuallyDrop` so that `Drop` can
    /// release it explicitly before calling `complete_unpin`.
    guard: ManuallyDrop<RwLockWriteGuard<'static, ()>>,
    /// LSN recorded by the first `set_lsn` or `mark_dirty` call made through
    /// this guard; `None` until then. Passed to `complete_unpin` on drop.
    first_dirty_lsn: Option<Lsn>,
}
112
113impl WritePageGuard {
114    pub fn pin_count(&self) -> u32 {
115        self.meta_snapshot().pin_count
116    }
117
118    pub fn data(&self) -> &[u8] {
119        unsafe { self.pool.frame_slice(self.frame_id) }
120    }
121
122    pub fn data_mut(&mut self) -> &mut [u8] {
123        unsafe { self.pool.frame_slice_mut(self.frame_id) }
124    }
125
126    pub fn is_dirty(&self) -> bool {
127        self.meta_snapshot().is_dirty
128    }
129
130    pub fn page_id(&self) -> PageId {
131        self.meta_snapshot().page_id
132    }
133
134    pub fn lsn(&self) -> Lsn {
135        self.meta_snapshot().lsn
136    }
137
138    pub fn set_lsn(&mut self, lsn: Lsn) {
139        let meta = self.pool.frame_meta(self.frame_id);
140        meta.set_lsn(lsn);
141        if self.first_dirty_lsn.is_none() {
142            self.first_dirty_lsn = Some(lsn);
143        }
144    }
145
146    pub fn mark_dirty(&mut self) {
147        let meta = self.pool.frame_meta(self.frame_id);
148        meta.mark_dirty();
149        if self.first_dirty_lsn.is_none() {
150            self.first_dirty_lsn = Some(meta.lsn());
151        }
152    }
153
154    pub fn overwrite(&mut self, data: &[u8], new_lsn: Option<Lsn>) {
155        debug_assert_eq!(data.len(), PAGE_SIZE);
156        let slice = unsafe { self.pool.frame_slice_mut(self.frame_id) };
157        slice.copy_from_slice(data);
158        if let Some(lsn) = new_lsn {
159            self.set_lsn(lsn);
160        }
161        self.mark_dirty();
162    }
163
164    /// Apply a full page image; automatically emits WAL (full or delta) when configured.
165    pub fn apply_page_image(&mut self, image: &[u8]) -> QuillSQLResult<Option<Lsn>> {
166        if image.len() != PAGE_SIZE {
167            return Err(QuillSQLError::Internal(format!(
168                "page {} image length {} differs from PAGE_SIZE",
169                self.page_id(),
170                image.len()
171            )));
172        }
173        if let Some(wal) = self.bpm.wal_manager() {
174            let prev_lsn = self.lsn();
175            let wal_result = wal.log_page_update(self.page_id(), prev_lsn, self.data(), image)?;
176            if let Some(result) = wal_result {
177                self.overwrite(image, Some(result.end_lsn));
178                return Ok(Some(result.end_lsn));
179            }
180            Ok(None)
181        } else {
182            let prev_lsn = self.lsn();
183            self.overwrite(image, Some(prev_lsn));
184            Ok(None)
185        }
186    }
187
188    pub fn meta_snapshot(&self) -> FrameMetaSnapshot {
189        self.pool.frame_meta(self.frame_id).snapshot()
190    }
191
192    pub fn frame_id(&self) -> FrameId {
193        self.frame_id
194    }
195}
196
197impl Drop for WritePageGuard {
198    fn drop(&mut self) {
199        let snapshot = self.meta_snapshot();
200        let page_id = snapshot.page_id;
201        let is_dirty = snapshot.is_dirty;
202        let lsn = snapshot.lsn;
203        unsafe {
204            ManuallyDrop::drop(&mut self.guard);
205        }
206        let rec_lsn_hint = if let Some(first) = self.first_dirty_lsn {
207            Some(first)
208        } else if is_dirty {
209            Some(lsn)
210        } else {
211            None
212        };
213        if let Err(e) = self.bpm.complete_unpin(page_id, is_dirty, rec_lsn_hint) {
214            eprintln!("Warning: Failed to complete_unpin page {}: {}", page_id, e);
215        }
216    }
217}
218
219pub(crate) fn new_read_guard(bpm: Arc<BufferManager>, frame_id: FrameId) -> ReadPageGuard {
220    let pool = bpm.buffer_pool();
221    let lock = pool.frame_lock(frame_id).read();
222    let static_guard =
223        unsafe { mem::transmute::<RwLockReadGuard<'_, ()>, RwLockReadGuard<'static, ()>>(lock) };
224    ReadPageGuard {
225        bpm,
226        pool,
227        frame_id,
228        guard: ManuallyDrop::new(static_guard),
229    }
230}
231
232pub(crate) fn new_write_guard(bpm: Arc<BufferManager>, frame_id: FrameId) -> WritePageGuard {
233    let pool = bpm.buffer_pool();
234    let lock = pool.frame_lock(frame_id).write();
235    let static_guard =
236        unsafe { mem::transmute::<RwLockWriteGuard<'_, ()>, RwLockWriteGuard<'static, ()>>(lock) };
237    WritePageGuard {
238        bpm,
239        pool,
240        frame_id,
241        guard: ManuallyDrop::new(static_guard),
242        first_dirty_lsn: None,
243    }
244}
245
#[cfg(test)]
mod tests {
    use std::sync::atomic::Ordering;
    use std::sync::Arc;
    use tempfile::TempDir;

    use crate::buffer::BufferManager;
    use crate::config::WalConfig;
    use crate::recovery::wal::codec::decode_payload;
    use crate::recovery::wal_record::WalRecordPayload;
    use crate::recovery::WalManager;
    use crate::storage::disk_manager::DiskManager;
    use crate::storage::disk_scheduler::DiskScheduler;

    use super::{PageMeta, INVALID_PAGE_ID, PAGE_SIZE};

    /// Creates a `BufferManager` (constructed with `num_pages`) backed by a
    /// database file inside a fresh temporary directory.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive for
    /// the duration of the test.
    fn setup_real_bpm_environment(num_pages: usize) -> (TempDir, Arc<BufferManager>) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");

        let disk_manager = Arc::new(DiskManager::try_new(db_path).unwrap());
        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager));
        let buffer_pool_manager = Arc::new(BufferManager::new(num_pages, disk_scheduler));

        (temp_dir, buffer_pool_manager)
    }

    /// Creates a WAL manager writing under `<temp_dir>/wal_guard`, attaches it
    /// to `bpm`, and returns it.
    fn attach_wal(temp_dir: &TempDir, bpm: &Arc<BufferManager>) -> Arc<WalManager> {
        let mut config = WalConfig::default();
        config.directory = temp_dir.path().join("wal_guard");
        let wal = Arc::new(WalManager::new(config, None, None).expect("wal manager"));
        bpm.set_wal_manager(wal.clone());
        wal
    }

    /// `PageMeta::new` starts clean and unpinned; `PageMeta::empty` uses the
    /// invalid page id.
    #[test]
    fn test_page_struct_creation() {
        let page = PageMeta::new(1);
        assert_eq!(page.page_id, 1);
        assert!(!page.is_dirty);
        assert_eq!(page.pin_count.load(Ordering::Acquire), 0);

        let empty_page = PageMeta::empty();
        assert_eq!(empty_page.page_id, INVALID_PAGE_ID);
    }

    /// A read guard reports a pin count of one while alive and the frame goes
    /// back to zero pins once the guard is dropped.
    #[test]
    fn test_read_guard_deref_and_drop() {
        let (_temp_dir, bpm) = setup_real_bpm_environment(10);

        // The guard returned by `new_page` is dropped at the end of this block.
        let (page_id, frame_id) = {
            let guard = bpm.new_page().unwrap();
            let frame_id = guard.frame_id();
            (guard.page_id(), frame_id)
        };

        {
            let meta = bpm.buffer_pool().frame_meta(frame_id).snapshot();
            assert_eq!(meta.pin_count, 0);
        }

        let read_guard = bpm.fetch_page_read(page_id).unwrap();
        assert_eq!(read_guard.page_id(), page_id);
        assert_eq!(read_guard.pin_count(), 1);
        assert_eq!(read_guard.data().len(), PAGE_SIZE);
        let snapshot = read_guard.meta_snapshot();
        assert_eq!(snapshot.pin_count, 1);
        drop(read_guard);

        let meta = bpm.buffer_pool().frame_meta(frame_id).snapshot();
        assert_eq!(meta.pin_count, 0);
    }

    /// Bytes, LSN and dirty flag written through a write guard stay visible
    /// after the guard is dropped and through a later read guard.
    #[test]
    fn test_write_guard_deref_mut_and_drop() {
        let (_temp_dir, bpm) = setup_real_bpm_environment(10);
        let (page_id, frame_id) = {
            let mut write_guard = bpm.new_page().unwrap();
            write_guard.data_mut()[0] = 123;
            write_guard.set_lsn(42);
            write_guard.mark_dirty();
            (write_guard.page_id(), write_guard.frame_id())
        };

        let meta = bpm.buffer_pool().frame_meta(frame_id).snapshot();
        assert!(meta.is_dirty);
        assert_eq!(meta.lsn, 42);
        assert_eq!(meta.pin_count, 0);

        let read_guard = bpm.fetch_page_read(page_id).unwrap();
        assert_eq!(read_guard.data()[0], 123);
        assert!(read_guard.is_dirty());
        assert_eq!(read_guard.lsn(), 42);
        let snapshot = read_guard.meta_snapshot();
        assert_eq!(snapshot.lsn, 42);
        assert!(snapshot.is_dirty);
        assert_eq!(snapshot.pin_count, 1);
        drop(read_guard);

        let meta = bpm.buffer_pool().frame_meta(frame_id).snapshot();
        assert!(meta.is_dirty);
        assert_eq!(meta.lsn, 42);
        assert_eq!(meta.pin_count, 0);
    }

    /// Taking and dropping a write guard without touching the page leaves the
    /// frame clean with LSN zero.
    #[test]
    fn test_write_guard_without_mutation_is_not_dirty() {
        let (_temp_dir, bpm) = setup_real_bpm_environment(10);
        let (page_id, frame_id) = {
            let guard = bpm.new_page().unwrap();
            (guard.page_id(), guard.frame_id())
        };

        {
            let _write_guard = bpm.fetch_page_write(page_id).unwrap();
        }

        let read_guard = bpm.fetch_page_read(page_id).unwrap();
        let snapshot = read_guard.meta_snapshot();
        assert!(!snapshot.is_dirty);
        assert_eq!(snapshot.lsn, 0);
        assert_eq!(snapshot.pin_count, 1);
        drop(read_guard);

        let meta = bpm.buffer_pool().frame_meta(frame_id).snapshot();
        assert!(!meta.is_dirty);
        assert_eq!(meta.pin_count, 0);
    }

    /// With a WAL manager attached, `apply_page_image` returns the new LSN and
    /// exactly one `PageWrite` record for the page ends up in the log.
    #[test]
    fn apply_page_image_logs_wal_records() {
        let (temp_dir, bpm) = setup_real_bpm_environment(4);
        let wal = attach_wal(&temp_dir, &bpm);

        let mut guard = bpm.new_page().unwrap();
        let page_id = guard.page_id();
        let mut image = vec![0u8; PAGE_SIZE];
        image[0] = 42;
        // A single `expect` replaces the former `is_some()` + `unwrap()` pair
        // and names the invariant in the panic message.
        let lsn = guard
            .apply_page_image(&image)
            .expect("apply image")
            .expect("apply_page_image should return an LSN when a WAL manager is attached");
        assert_eq!(guard.lsn(), lsn);
        drop(guard);

        wal.flush(None).expect("flush wal");
        let mut reader = wal.reader().expect("reader");
        let frame = reader.next_frame().expect("frame").expect("frame");
        match decode_payload(&frame).expect("payload") {
            WalRecordPayload::PageWrite(payload) => {
                assert_eq!(payload.page_id, page_id);
            }
            other => panic!("expected PageWrite, got {:?}", other),
        }
        assert!(reader.next_frame().expect("no extra").is_none());
    }

    /// An image that is one byte short of `PAGE_SIZE` is rejected with an
    /// `Internal` error mentioning the image length.
    #[test]
    fn apply_page_image_rejects_invalid_length() {
        let (_temp_dir, bpm) = setup_real_bpm_environment(2);
        let mut guard = bpm.new_page().unwrap();
        // A fixed-size array avoids allocating a `Vec` just to borrow it.
        let err = guard
            .apply_page_image(&[0u8; PAGE_SIZE - 1])
            .expect_err("length mismatch should fail");
        match err {
            crate::error::QuillSQLError::Internal(msg) => {
                assert!(
                    msg.contains("image length"),
                    "unexpected error message: {}",
                    msg
                );
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }
}
420}