1use crate::buffer::buffer_pool::{BufferPool, FrameMetaSnapshot};
2use crate::buffer::{BufferManager, FrameId};
3use crate::error::{QuillSQLError, QuillSQLResult};
4use crate::recovery::Lsn;
5use derive_with::With;
6use parking_lot::{RwLockReadGuard, RwLockWriteGuard};
7use std::mem::{self, ManuallyDrop};
8use std::sync::atomic::{AtomicU32, Ordering};
9use std::sync::Arc;
10
11pub type PageId = u32;
12pub type AtomicPageId = AtomicU32;
13
14pub const INVALID_PAGE_ID: PageId = 0;
15pub const PAGE_SIZE: usize = 4096;
16
17#[derive(Debug, With)]
18pub struct PageMeta {
19 pub page_id: PageId,
20 pub pin_count: AtomicU32,
21 pub is_dirty: bool,
22 pub page_lsn: Lsn,
23}
24
25impl PageMeta {
26 pub fn empty() -> Self {
27 Self {
28 page_id: INVALID_PAGE_ID,
29 pin_count: AtomicU32::new(0),
30 is_dirty: false,
31 page_lsn: 0,
32 }
33 }
34
35 pub fn new(page_id: PageId) -> Self {
36 Self {
37 page_id,
38 pin_count: AtomicU32::new(0),
39 is_dirty: false,
40 page_lsn: 0,
41 }
42 }
43
44 pub fn destroy(&mut self) {
45 self.page_id = INVALID_PAGE_ID;
46 self.pin_count.store(0, Ordering::Relaxed);
47 self.is_dirty = false;
48 self.page_lsn = 0;
49 }
50}
51
52#[derive(Debug)]
53pub struct ReadPageGuard {
54 bpm: Arc<BufferManager>,
55 pool: Arc<BufferPool>,
56 frame_id: FrameId,
57 guard: ManuallyDrop<RwLockReadGuard<'static, ()>>,
58}
59
60impl ReadPageGuard {
61 pub fn pin_count(&self) -> u32 {
62 self.meta_snapshot().pin_count
63 }
64
65 pub fn data(&self) -> &[u8] {
66 unsafe { self.pool.frame_slice(self.frame_id) }
67 }
68
69 pub fn is_dirty(&self) -> bool {
70 self.meta_snapshot().is_dirty
71 }
72
73 pub fn page_id(&self) -> PageId {
74 self.meta_snapshot().page_id
75 }
76
77 pub fn lsn(&self) -> Lsn {
78 self.meta_snapshot().lsn
79 }
80
81 pub fn meta_snapshot(&self) -> FrameMetaSnapshot {
82 self.pool.frame_meta(self.frame_id).snapshot()
83 }
84
85 pub fn frame_id(&self) -> FrameId {
86 self.frame_id
87 }
88}
89
90impl Drop for ReadPageGuard {
91 fn drop(&mut self) {
92 let snapshot = self.meta_snapshot();
93 let page_id = snapshot.page_id;
94 let is_dirty = snapshot.is_dirty;
95 unsafe {
96 ManuallyDrop::drop(&mut self.guard);
97 }
98 if let Err(e) = self.bpm.complete_unpin(page_id, is_dirty, None) {
99 eprintln!("Warning: Failed to complete_unpin page {}: {}", page_id, e);
100 }
101 }
102}
103
104#[derive(Debug)]
105pub struct WritePageGuard {
106 bpm: Arc<BufferManager>,
107 pool: Arc<BufferPool>,
108 frame_id: FrameId,
109 guard: ManuallyDrop<RwLockWriteGuard<'static, ()>>,
110 first_dirty_lsn: Option<Lsn>,
111}
112
113impl WritePageGuard {
114 pub fn pin_count(&self) -> u32 {
115 self.meta_snapshot().pin_count
116 }
117
118 pub fn data(&self) -> &[u8] {
119 unsafe { self.pool.frame_slice(self.frame_id) }
120 }
121
122 pub fn data_mut(&mut self) -> &mut [u8] {
123 unsafe { self.pool.frame_slice_mut(self.frame_id) }
124 }
125
126 pub fn is_dirty(&self) -> bool {
127 self.meta_snapshot().is_dirty
128 }
129
130 pub fn page_id(&self) -> PageId {
131 self.meta_snapshot().page_id
132 }
133
134 pub fn lsn(&self) -> Lsn {
135 self.meta_snapshot().lsn
136 }
137
138 pub fn set_lsn(&mut self, lsn: Lsn) {
139 let meta = self.pool.frame_meta(self.frame_id);
140 meta.set_lsn(lsn);
141 if self.first_dirty_lsn.is_none() {
142 self.first_dirty_lsn = Some(lsn);
143 }
144 }
145
146 pub fn mark_dirty(&mut self) {
147 let meta = self.pool.frame_meta(self.frame_id);
148 meta.mark_dirty();
149 if self.first_dirty_lsn.is_none() {
150 self.first_dirty_lsn = Some(meta.lsn());
151 }
152 }
153
154 pub fn overwrite(&mut self, data: &[u8], new_lsn: Option<Lsn>) {
155 debug_assert_eq!(data.len(), PAGE_SIZE);
156 let slice = unsafe { self.pool.frame_slice_mut(self.frame_id) };
157 slice.copy_from_slice(data);
158 if let Some(lsn) = new_lsn {
159 self.set_lsn(lsn);
160 }
161 self.mark_dirty();
162 }
163
164 pub fn apply_page_image(&mut self, image: &[u8]) -> QuillSQLResult<Option<Lsn>> {
166 if image.len() != PAGE_SIZE {
167 return Err(QuillSQLError::Internal(format!(
168 "page {} image length {} differs from PAGE_SIZE",
169 self.page_id(),
170 image.len()
171 )));
172 }
173 if let Some(wal) = self.bpm.wal_manager() {
174 let prev_lsn = self.lsn();
175 let wal_result = wal.log_page_update(self.page_id(), prev_lsn, self.data(), image)?;
176 if let Some(result) = wal_result {
177 self.overwrite(image, Some(result.end_lsn));
178 return Ok(Some(result.end_lsn));
179 }
180 Ok(None)
181 } else {
182 let prev_lsn = self.lsn();
183 self.overwrite(image, Some(prev_lsn));
184 Ok(None)
185 }
186 }
187
188 pub fn meta_snapshot(&self) -> FrameMetaSnapshot {
189 self.pool.frame_meta(self.frame_id).snapshot()
190 }
191
192 pub fn frame_id(&self) -> FrameId {
193 self.frame_id
194 }
195}
196
197impl Drop for WritePageGuard {
198 fn drop(&mut self) {
199 let snapshot = self.meta_snapshot();
200 let page_id = snapshot.page_id;
201 let is_dirty = snapshot.is_dirty;
202 let lsn = snapshot.lsn;
203 unsafe {
204 ManuallyDrop::drop(&mut self.guard);
205 }
206 let rec_lsn_hint = if let Some(first) = self.first_dirty_lsn {
207 Some(first)
208 } else if is_dirty {
209 Some(lsn)
210 } else {
211 None
212 };
213 if let Err(e) = self.bpm.complete_unpin(page_id, is_dirty, rec_lsn_hint) {
214 eprintln!("Warning: Failed to complete_unpin page {}: {}", page_id, e);
215 }
216 }
217}
218
219pub(crate) fn new_read_guard(bpm: Arc<BufferManager>, frame_id: FrameId) -> ReadPageGuard {
220 let pool = bpm.buffer_pool();
221 let lock = pool.frame_lock(frame_id).read();
222 let static_guard =
223 unsafe { mem::transmute::<RwLockReadGuard<'_, ()>, RwLockReadGuard<'static, ()>>(lock) };
224 ReadPageGuard {
225 bpm,
226 pool,
227 frame_id,
228 guard: ManuallyDrop::new(static_guard),
229 }
230}
231
232pub(crate) fn new_write_guard(bpm: Arc<BufferManager>, frame_id: FrameId) -> WritePageGuard {
233 let pool = bpm.buffer_pool();
234 let lock = pool.frame_lock(frame_id).write();
235 let static_guard =
236 unsafe { mem::transmute::<RwLockWriteGuard<'_, ()>, RwLockWriteGuard<'static, ()>>(lock) };
237 WritePageGuard {
238 bpm,
239 pool,
240 frame_id,
241 guard: ManuallyDrop::new(static_guard),
242 first_dirty_lsn: None,
243 }
244}
245
246#[cfg(test)]
247mod tests {
248 use std::sync::atomic::Ordering;
249 use std::sync::Arc;
250 use tempfile::TempDir;
251
252 use crate::buffer::BufferManager;
253 use crate::config::WalConfig;
254 use crate::recovery::wal::codec::decode_payload;
255 use crate::recovery::wal_record::WalRecordPayload;
256 use crate::recovery::WalManager;
257 use crate::storage::disk_manager::DiskManager;
258 use crate::storage::disk_scheduler::DiskScheduler;
259
260 use super::{PageMeta, INVALID_PAGE_ID, PAGE_SIZE};
261
262 fn setup_real_bpm_environment(num_pages: usize) -> (TempDir, Arc<BufferManager>) {
263 let temp_dir = TempDir::new().unwrap();
264 let db_path = temp_dir.path().join("test.db");
265
266 let disk_manager = Arc::new(DiskManager::try_new(db_path).unwrap());
267 let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager));
268 let buffer_pool_manager = Arc::new(BufferManager::new(num_pages, disk_scheduler));
269
270 (temp_dir, buffer_pool_manager)
271 }
272
273 fn attach_wal(temp_dir: &TempDir, bpm: &Arc<BufferManager>) -> Arc<WalManager> {
274 let mut config = WalConfig::default();
275 config.directory = temp_dir.path().join("wal_guard");
276 let wal = Arc::new(WalManager::new(config, None, None).expect("wal manager"));
277 bpm.set_wal_manager(wal.clone());
278 wal
279 }
280
281 #[test]
282 fn test_page_struct_creation() {
283 let page = PageMeta::new(1);
284 assert_eq!(page.page_id, 1);
285 assert!(!page.is_dirty);
286 assert_eq!(page.pin_count.load(Ordering::Acquire), 0);
287
288 let empty_page = PageMeta::empty();
289 assert_eq!(empty_page.page_id, INVALID_PAGE_ID);
290 }
291
292 #[test]
293 fn test_read_guard_deref_and_drop() {
294 let (_temp_dir, bpm) = setup_real_bpm_environment(10);
295
296 let (page_id, frame_id) = {
297 let guard = bpm.new_page().unwrap();
298 let frame_id = guard.frame_id();
299 (guard.page_id(), frame_id)
300 };
301
302 {
303 let meta = bpm.buffer_pool().frame_meta(frame_id).snapshot();
304 assert_eq!(meta.pin_count, 0);
305 }
306
307 let read_guard = bpm.fetch_page_read(page_id).unwrap();
308 assert_eq!(read_guard.page_id(), page_id);
309 assert_eq!(read_guard.pin_count(), 1);
310 assert_eq!(read_guard.data().len(), PAGE_SIZE);
311 let snapshot = read_guard.meta_snapshot();
312 assert_eq!(snapshot.pin_count, 1);
313 drop(read_guard);
314
315 let meta = bpm.buffer_pool().frame_meta(frame_id).snapshot();
316 assert_eq!(meta.pin_count, 0);
317 }
318
319 #[test]
320 fn test_write_guard_deref_mut_and_drop() {
321 let (_temp_dir, bpm) = setup_real_bpm_environment(10);
322 let (page_id, frame_id) = {
323 let mut write_guard = bpm.new_page().unwrap();
324 write_guard.data_mut()[0] = 123;
325 write_guard.set_lsn(42);
326 write_guard.mark_dirty();
327 (write_guard.page_id(), write_guard.frame_id())
328 };
329
330 let meta = bpm.buffer_pool().frame_meta(frame_id).snapshot();
331 assert!(meta.is_dirty);
332 assert_eq!(meta.lsn, 42);
333 assert_eq!(meta.pin_count, 0);
334
335 let read_guard = bpm.fetch_page_read(page_id).unwrap();
336 assert_eq!(read_guard.data()[0], 123);
337 assert!(read_guard.is_dirty());
338 assert_eq!(read_guard.lsn(), 42);
339 let snapshot = read_guard.meta_snapshot();
340 assert_eq!(snapshot.lsn, 42);
341 assert!(snapshot.is_dirty);
342 assert_eq!(snapshot.pin_count, 1);
343 drop(read_guard);
344
345 let meta = bpm.buffer_pool().frame_meta(frame_id).snapshot();
346 assert!(meta.is_dirty);
347 assert_eq!(meta.lsn, 42);
348 assert_eq!(meta.pin_count, 0);
349 }
350
351 #[test]
352 fn test_write_guard_without_mutation_is_not_dirty() {
353 let (_temp_dir, bpm) = setup_real_bpm_environment(10);
354 let (page_id, frame_id) = {
355 let guard = bpm.new_page().unwrap();
356 (guard.page_id(), guard.frame_id())
357 };
358
359 {
360 let _write_guard = bpm.fetch_page_write(page_id).unwrap();
361 }
362
363 let read_guard = bpm.fetch_page_read(page_id).unwrap();
364 let snapshot = read_guard.meta_snapshot();
365 assert!(!snapshot.is_dirty);
366 assert_eq!(snapshot.lsn, 0);
367 assert_eq!(snapshot.pin_count, 1);
368 drop(read_guard);
369
370 let meta = bpm.buffer_pool().frame_meta(frame_id).snapshot();
371 assert!(!meta.is_dirty);
372 assert_eq!(meta.pin_count, 0);
373 }
374
375 #[test]
376 fn apply_page_image_logs_wal_records() {
377 let (temp_dir, bpm) = setup_real_bpm_environment(4);
378 let wal = attach_wal(&temp_dir, &bpm);
379
380 let mut guard = bpm.new_page().unwrap();
381 let page_id = guard.page_id();
382 let mut image = vec![0u8; PAGE_SIZE];
383 image[0] = 42;
384 let result = guard.apply_page_image(&image).expect("apply image");
385 assert!(result.is_some());
386 let lsn = result.unwrap();
387 assert_eq!(guard.lsn(), lsn);
388 drop(guard);
389
390 wal.flush(None).expect("flush wal");
391 let mut reader = wal.reader().expect("reader");
392 let frame = reader.next_frame().expect("frame").expect("frame");
393 match decode_payload(&frame).expect("payload") {
394 WalRecordPayload::PageWrite(payload) => {
395 assert_eq!(payload.page_id, page_id);
396 }
397 other => panic!("expected PageWrite, got {:?}", other),
398 }
399 assert!(reader.next_frame().expect("no extra").is_none());
400 }
401
402 #[test]
403 fn apply_page_image_rejects_invalid_length() {
404 let (_temp_dir, bpm) = setup_real_bpm_environment(2);
405 let mut guard = bpm.new_page().unwrap();
406 let err = guard
407 .apply_page_image(&vec![0u8; PAGE_SIZE - 1])
408 .expect_err("length mismatch should fail");
409 match err {
410 crate::error::QuillSQLError::Internal(msg) => {
411 assert!(
412 msg.contains("image length"),
413 "unexpected error message: {}",
414 msg
415 );
416 }
417 other => panic!("unexpected error: {:?}", other),
418 }
419 }
420}