quill_sql/buffer/
buffer_pool.rs

1//! Buffer Pool Manager
2//! - Page table (PageId→FrameId), LRU-K replacer, optional TinyLFU admission
3//! - Read/Write page guards pin/unpin via RAII; flush dirty pages before eviction
4//! - Disk I/O offloaded to async DiskScheduler
5
6use bytes::Bytes;
7use dashmap::{DashMap, DashSet};
8use parking_lot::{Mutex, RwLock};
9use std::sync::atomic::Ordering;
10use std::{collections::VecDeque, sync::Arc};
11
12use crate::buffer::page::{
13    self, Page, PageId, ReadPageGuard, WritePageGuard, INVALID_PAGE_ID, PAGE_SIZE,
14};
15
16use crate::catalog::SchemaRef;
17use crate::error::{QuillSQLError, QuillSQLResult};
18use crate::storage::codec::{
19    BPlusTreeHeaderPageCodec, BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec,
20    BPlusTreePageCodec, TablePageCodec,
21};
22use crate::storage::disk_scheduler::DiskScheduler;
23use crate::storage::{
24    page::TablePage,
25    page::{BPlusTreeHeaderPage, BPlusTreeInternalPage, BPlusTreeLeafPage, BPlusTreePage},
26};
27
28use crate::config::BufferPoolConfig;
29use crate::recovery::Lsn;
30use crate::recovery::WalManager;
31use crate::utils::cache::lru_k::LRUKReplacer;
32use crate::utils::cache::tiny_lfu::TinyLFU;
33use crate::utils::cache::Replacer;
34
/// Index of a slot ("frame") inside the buffer pool's page array.
pub type FrameId = usize;

/// Default number of frames when no explicit configuration is supplied.
pub const BUFFER_POOL_SIZE: usize = 5000;
38
/// Central page cache: maps disk pages into a fixed set of in-memory frames.
///
/// Residency is tracked in `page_table`; victims are chosen by an LRU-K
/// replacer, optionally gated by a TinyLFU admission filter. Dirty pages are
/// tracked for checkpointing, and an optional WAL manager enforces the
/// write-ahead rule before a dirty page is flushed.
#[derive(Debug)]
pub struct BufferPoolManager {
    /// Fixed array of frames; each frame holds one page behind its own RwLock.
    pub pool: Vec<Arc<RwLock<Page>>>,
    /// LRU-K replacement policy over frame ids.
    pub replacer: Arc<RwLock<LRUKReplacer>>,
    /// Asynchronous disk I/O scheduler (read/write/allocate/deallocate).
    pub disk_scheduler: Arc<DiskScheduler>,
    /// PageId -> FrameId mapping for pages currently resident in the pool.
    pub page_table: Arc<DashMap<PageId, FrameId>>,
    /// Frames not currently mapped to any page.
    pub free_list: Arc<RwLock<VecDeque<FrameId>>>,
    /// Per-page inflight load guards to serialize concurrent loads of the same page
    pub inflight_loads: Arc<DashMap<PageId, Arc<Mutex<()>>>>,
    /// Optional TinyLFU admission filter
    pub tiny_lfu: Option<Arc<RwLock<TinyLFU>>>,
    /// Dirty page tracking for checkpoints and background writers
    pub dirty_pages: Arc<DashSet<PageId>>,
    /// Dirty page table: page id -> first dirty LSN (recLSN)
    pub dirty_page_table: DashMap<PageId, Lsn>,
    /// Optional WAL manager to enforce write-ahead rule on flush
    pub wal_manager: Arc<RwLock<Option<Arc<WalManager>>>>,
}
57
58impl BufferPoolManager {
59    #[inline]
60    fn replacer_set_evictable(&self, frame_id: FrameId, evictable: bool) -> QuillSQLResult<()> {
61        let mut rep = self.replacer.write();
62        rep.set_evictable(frame_id, evictable)
63            .map_err(|e| QuillSQLError::Internal(format!("replacer set_evictable failed: {}", e)))
64    }
65
66    #[inline]
67    fn replacer_record_access(&self, frame_id: FrameId) -> QuillSQLResult<()> {
68        let mut rep = self.replacer.write();
69        rep.record_access(frame_id)
70            .map_err(|e| QuillSQLError::Internal(format!("replacer record_access failed: {}", e)))
71    }
72    #[inline]
73    fn replacer_touch_and_set(&self, frame_id: FrameId, evictable: bool) -> QuillSQLResult<()> {
74        let mut rep = self.replacer.write();
75        rep.record_access(frame_id).map_err(|e| {
76            QuillSQLError::Internal(format!("replacer record_access failed: {}", e))
77        })?;
78        rep.set_evictable(frame_id, evictable)
79            .map_err(|e| QuillSQLError::Internal(format!("replacer set_evictable failed: {}", e)))
80    }
81
82    pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
83        Self::new_with_config(
84            BufferPoolConfig {
85                buffer_pool_size: num_pages,
86                ..Default::default()
87            },
88            disk_scheduler,
89        )
90    }
91
92    pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
93        let num_pages = config.buffer_pool_size;
94        let mut free_list = VecDeque::with_capacity(num_pages);
95        let mut pool = vec![];
96        for i in 0..num_pages {
97            free_list.push_back(i);
98            pool.push(Arc::new(RwLock::new(Page::empty())));
99        }
100
101        Self {
102            pool,
103            replacer: Arc::new(RwLock::new(LRUKReplacer::new(num_pages))),
104            disk_scheduler,
105            page_table: Arc::new(DashMap::new()),
106            free_list: Arc::new(RwLock::new(free_list)),
107            inflight_loads: Arc::new(DashMap::new()),
108            tiny_lfu: if config.tiny_lfu_enable {
109                Some(Arc::new(RwLock::new(TinyLFU::new(
110                    num_pages.next_power_of_two(),
111                    config.tiny_lfu_counters,
112                ))))
113            } else {
114                None
115            },
116            dirty_pages: Arc::new(DashSet::new()),
117            wal_manager: Arc::new(RwLock::new(None)),
118            dirty_page_table: DashMap::new(),
119        }
120    }
121
122    pub fn set_wal_manager(&self, wal_manager: Arc<WalManager>) {
123        *self.wal_manager.write() = Some(wal_manager);
124    }
125
126    pub fn wal_manager(&self) -> Option<Arc<WalManager>> {
127        self.wal_manager.read().clone()
128    }
129
130    pub fn dirty_page_ids(&self) -> Vec<PageId> {
131        self.dirty_pages.iter().map(|entry| *entry.key()).collect()
132    }
133
134    pub fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)> {
135        self.dirty_page_table
136            .iter()
137            .map(|entry| (*entry.key(), *entry.value()))
138            .collect()
139    }
140
    /// Record `page_id` as dirty, keeping the FIRST dirty LSN (recLSN) if one
    /// was already registered (dirty-page-table semantics for recovery).
    #[inline]
    fn note_dirty_page(&self, page_id: PageId, rec_lsn: Lsn) {
        self.dirty_pages.insert(page_id);
        // or_insert preserves an existing recLSN; later marks must not overwrite it.
        self.dirty_page_table.entry(page_id).or_insert(rec_lsn);
    }
146
147    /// 创建一个新页面。
148    pub fn new_page(self: &Arc<Self>) -> QuillSQLResult<WritePageGuard> {
149        if self.free_list.read().is_empty() && self.replacer.read().size() == 0 {
150            return Err(QuillSQLError::Storage(
151                "Cannot new page because buffer pool is full and no page to evict".to_string(),
152            ));
153        }
154
155        let frame_id = self.allocate_frame()?;
156
157        let rx_alloc = self.disk_scheduler.schedule_allocate()?;
158        let new_page_id = rx_alloc.recv().map_err(|e| {
159            QuillSQLError::Internal(format!("Failed to receive allocated page_id: {}", e))
160        })??;
161        self.page_table.insert(new_page_id, frame_id);
162
163        let page_arc = self.pool[frame_id].clone();
164        {
165            let mut page_writer = page_arc.write();
166            *page_writer = Page::new(new_page_id);
167            page_writer.pin_count.store(1, Ordering::Relaxed);
168        }
169
170        self.replacer_touch_and_set(frame_id, false)?;
171
172        Ok(page::new_write_guard(self.clone(), page_arc))
173    }
174
175    /// 获取一个只读页面。
176    pub fn fetch_page_read(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<ReadPageGuard> {
177        if page_id == INVALID_PAGE_ID {
178            return Err(QuillSQLError::Storage(
179                "fetch_page_read: invalid page id".to_string(),
180            ));
181        }
182        // Robust retry to tolerate delete/evict races
183        let mut attempts = 0usize;
184        loop {
185            if let Ok(frame_id) = self.get_frame_for_page(page_id) {
186                let page_arc = self.pool[frame_id].clone();
187
188                // Validate mapping before pin
189                let reader = page_arc.read();
190                if reader.page_id() == page_id {
191                    // Pin now that mapping is validated
192                    reader.pin();
193                    // Mark non-evictable while pinned
194                    let _ = self.replacer_set_evictable(frame_id, false);
195                    drop(reader);
196                    return Ok(page::new_read_guard(self.clone(), page_arc));
197                } else {
198                    drop(reader);
199                    attempts += 1;
200                    if attempts > 128 {
201                        break;
202                    }
203                    std::hint::spin_loop();
204                    continue;
205                }
206            } else {
207                attempts += 1;
208                if attempts > 128 {
209                    break;
210                }
211                std::hint::spin_loop();
212                continue;
213            }
214        }
215        Err(QuillSQLError::Internal(format!(
216            "fetch_page_read: failed after {} retries",
217            attempts
218        )))
219    }
220
221    /// 获取一个可写页面。
222    pub fn fetch_page_write(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<WritePageGuard> {
223        if page_id == INVALID_PAGE_ID {
224            return Err(QuillSQLError::Storage(
225                "fetch_page_write: invalid page id".to_string(),
226            ));
227        }
228        let mut attempts = 0usize;
229        loop {
230            if let Ok(frame_id) = self.get_frame_for_page(page_id) {
231                let page_arc = self.pool[frame_id].clone();
232
233                // Validate mapping then pin
234                {
235                    let reader = page_arc.read();
236                    if reader.page_id() == page_id {
237                        reader.pin();
238                        // Mark non-evictable while pinned
239                        let _ = self.replacer_set_evictable(frame_id, false);
240                    } else {
241                        drop(reader);
242                        attempts += 1;
243                        if attempts > 128 {
244                            break;
245                        }
246                        std::hint::spin_loop();
247                        continue;
248                    }
249                    if false {
250                        eprintln!(
251                            "[LOCK DEBUG] thread={:?} attempt write page_id={}",
252                            std::thread::current().id(),
253                            reader.page_id()
254                        );
255                    }
256                }
257
258                let guard = page::new_write_guard(self.clone(), page_arc.clone());
259                if false {
260                    if let Some(r) = page_arc.try_read() {
261                        eprintln!(
262                            "[LOCK DEBUG] thread={:?} acquired write page_id={}",
263                            std::thread::current().id(),
264                            r.page_id()
265                        );
266                    } else {
267                        eprintln!(
268                            "[LOCK DEBUG] thread={:?} acquired write page_id=<busy>",
269                            std::thread::current().id()
270                        );
271                    }
272                }
273                return Ok(guard);
274            } else {
275                attempts += 1;
276                if attempts > 128 {
277                    break;
278                }
279                std::hint::spin_loop();
280                continue;
281            }
282        }
283        Err(QuillSQLError::Internal(format!(
284            "fetch_page_write: failed after {} retries",
285            attempts
286        )))
287    }
288
    /// Finish an unpin: decide evictability from the previous pin count and
    /// propagate the dirty flag into the page and the dirty-page table.
    ///
    /// Must be called WITHOUT holding the page latch; otherwise the lock order
    /// against `fetch_page_write` could deadlock.
    pub fn complete_unpin(
        &self,
        page_id: PageId,
        is_dirty: bool,
        old_pin_count: u32,
        rec_lsn_hint: Option<Lsn>,
    ) -> QuillSQLResult<()> {
        if let Some(frame_id_ref) = self.page_table.get(&page_id) {
            let frame_id = *frame_id_ref;
            if is_dirty {
                // Best effort: skip setting the in-page dirty bit if the latch
                // is contended; the dirty tracking below still happens.
                if let Some(mut p) = self.pool[frame_id].try_write() {
                    p.is_dirty = true;
                }
                // Fall back to the page's current LSN when no recLSN hint is given.
                let lsn = rec_lsn_hint.unwrap_or_else(|| self.pool[frame_id].read().page_lsn);
                self.note_dirty_page(page_id, lsn);
            }
            if old_pin_count == 1 {
                // The last pin was just released; the frame may now be evicted.
                self.replacer_set_evictable(frame_id, true)?;
            }
        }
        Ok(())
    }
313
314    /// 辅助函数:为给定的 page_id 查找或分配一个 frame。
315    fn get_frame_for_page(&self, page_id: PageId) -> QuillSQLResult<FrameId> {
316        if let Some(frame_id_ref) = self.page_table.get(&page_id) {
317            let frame_id = *frame_id_ref;
318            self.replacer_record_access(frame_id)?;
319            // record access for admission
320            if let Some(f) = &self.tiny_lfu {
321                f.write().admit_record(page_id as u64);
322            }
323            Ok(frame_id)
324        } else {
325            // Serialize concurrent loads for the same page_id
326            let (lock_arc, created_here) = if let Some(g) = self.inflight_loads.get(&page_id) {
327                (g.clone(), false)
328            } else {
329                let arc = Arc::new(Mutex::new(()));
330                self.inflight_loads.insert(page_id, arc.clone());
331                (arc, true)
332            };
333
334            let _lock_guard = lock_arc.lock();
335
336            // Re-check after acquiring inflight guard to avoid duplicate loads
337            if let Some(frame_id_ref2) = self.page_table.get(&page_id) {
338                let frame_id2 = *frame_id_ref2;
339                self.replacer_record_access(frame_id2)?;
340                return Ok(frame_id2);
341            }
342
343            // Optional admission: if LFU thinks it's cold and replacer is full, deny admission
344            if let Some(f) = &self.tiny_lfu {
345                let est = f.read().estimate(page_id as u64);
346                if est == 0 && self.free_list.read().is_empty() && self.replacer.read().size() == 0
347                {
348                    return Err(QuillSQLError::Storage(
349                        "Cannot allocate frame: admission denied and no space".to_string(),
350                    ));
351                }
352            }
353
354            let frame_id = match self.allocate_frame() {
355                Ok(fid) => fid,
356                Err(e) => {
357                    if created_here {
358                        self.inflight_loads.remove(&page_id);
359                    }
360                    return Err(e);
361                }
362            };
363
364            let page_data_bytes = match self.disk_scheduler.schedule_read(page_id) {
365                Ok(rx) => match rx.recv() {
366                    Ok(Ok(bytes)) => bytes,
367                    Ok(Err(e)) => {
368                        if created_here {
369                            self.inflight_loads.remove(&page_id);
370                        }
371                        return Err(e);
372                    }
373                    Err(e) => {
374                        if created_here {
375                            self.inflight_loads.remove(&page_id);
376                        }
377                        return Err(QuillSQLError::Internal(format!(
378                            "Channel disconnected: {}",
379                            e
380                        )));
381                    }
382                },
383                Err(e) => {
384                    if created_here {
385                        self.inflight_loads.remove(&page_id);
386                    }
387                    return Err(e);
388                }
389            };
390
391            let mut page_data_array = [0u8; PAGE_SIZE];
392            page_data_array.copy_from_slice(&page_data_bytes[..PAGE_SIZE]);
393
394            let page_arc = &self.pool[frame_id];
395            {
396                let mut page = page_arc.write();
397                *page = Page::new(page_id);
398                page.data = page_data_array;
399                // pin_count 将在调用者中设置
400            }
401
402            self.page_table.insert(page_id, frame_id);
403            if let Some(f) = &self.tiny_lfu {
404                f.write().admit_record(page_id as u64);
405            }
406            if created_here {
407                self.inflight_loads.remove(&page_id);
408            }
409
410            self.replacer_record_access(frame_id)?;
411            Ok(frame_id)
412        }
413    }
414
415    pub fn fetch_table_page(
416        self: &Arc<Self>,
417        page_id: PageId,
418        schema: SchemaRef,
419    ) -> QuillSQLResult<(ReadPageGuard, TablePage)> {
420        let guard = self.fetch_page_read(page_id)?;
421        // Guard exposes `data` directly; decode a typed view on demand
422        let (table_page, _) = TablePageCodec::decode(&guard.data, schema)?;
423        Ok((guard, table_page))
424    }
425
426    pub fn fetch_tree_page(
427        self: &Arc<Self>,
428        page_id: PageId,
429        key_schema: SchemaRef,
430    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreePage)> {
431        let guard = self.fetch_page_read(page_id)?;
432        let (tree_page, _) = BPlusTreePageCodec::decode(&guard.data, key_schema.clone())?;
433        Ok((guard, tree_page))
434    }
435
436    pub fn fetch_tree_internal_page(
437        self: &Arc<Self>,
438        page_id: PageId,
439        key_schema: SchemaRef,
440    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeInternalPage)> {
441        let guard = self.fetch_page_read(page_id)?;
442        let (tree_internal_page, _) =
443            BPlusTreeInternalPageCodec::decode(&guard.data, key_schema.clone())?;
444        Ok((guard, tree_internal_page))
445    }
446
447    pub fn fetch_tree_leaf_page(
448        self: &Arc<Self>,
449        page_id: PageId,
450        key_schema: SchemaRef,
451    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeLeafPage)> {
452        let guard = self.fetch_page_read(page_id)?;
453        let (tree_leaf_page, _) = BPlusTreeLeafPageCodec::decode(&guard.data, key_schema.clone())?;
454        Ok((guard, tree_leaf_page))
455    }
456
457    /// Best-effort prefetch: fetch+drop to warm the cache without holding the pin.
458    pub fn prefetch_page(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<()> {
459        if let Ok(g) = self.fetch_page_read(page_id) {
460            drop(g);
461        }
462        Ok(())
463    }
464
465    pub fn fetch_header_page(
466        self: &Arc<Self>,
467        page_id: PageId,
468    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeHeaderPage)> {
469        let guard = self.fetch_page_read(page_id)?;
470        let (header_page, _) = BPlusTreeHeaderPageCodec::decode(&guard.data)?;
471        Ok((guard, header_page))
472    }
473
    /// Flush a single page to disk if it is dirty.
    ///
    /// Enforces the write-ahead rule: when a WAL manager is installed, the WAL
    /// is flushed up to the page's LSN before page data is written. Returns
    /// `Ok(true)` if data was written, `Ok(false)` if the page was clean or
    /// not resident.
    pub fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
        if let Some(frame_id_ref) = self.page_table.get(&page_id) {
            let frame_id = *frame_id_ref;
            let page_arc = self.pool[frame_id].clone();

            // Hold write lock for the whole flush to avoid racing writers clearing dirty bit.
            let mut guard = page_arc.write();
            if !guard.is_dirty {
                self.dirty_pages.remove(&page_id);
                return Ok(false);
            }

            if let Some(wal) = self.wal_manager.read().clone() {
                let durable_lsn = wal.durable_lsn();
                if guard.page_lsn > durable_lsn {
                    // WAL must be durable up to the page's LSN before the page hits disk.
                    let target = guard.page_lsn;
                    wal.flush(Some(target))?;
                    if wal.durable_lsn() < target {
                        return Err(QuillSQLError::Internal(format!(
                            "Flush of page {} blocked: page_lsn={} > durable_lsn={}",
                            page_id,
                            guard.page_lsn,
                            wal.durable_lsn()
                        )));
                    }
                }
            }
            let data_bytes = Bytes::copy_from_slice(&guard.data);

            // Synchronous write: wait for the scheduler's acknowledgement.
            self.disk_scheduler
                .schedule_write(page_id, data_bytes)?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;

            guard.is_dirty = false;
            self.dirty_pages.remove(&page_id);
            // NOTE(review): `dirty_page_table` (recLSN map) is not pruned here;
            // confirm whether checkpoint code clears it elsewhere.
            Ok(true)
        } else {
            Ok(false)
        }
    }
515
516    pub fn flush_all_pages(&self) -> QuillSQLResult<()> {
517        if let Some(wal) = self.wal_manager.read().clone() {
518            wal.flush(None)?;
519        }
520        let dirty_ids: Vec<PageId> = self.dirty_pages.iter().map(|entry| *entry.key()).collect();
521        for page_id in dirty_ids {
522            let _ = self.flush_page(page_id)?;
523        }
524        Ok(())
525    }
526
    /// Remove `page_id` from the buffer pool and deallocate it on disk.
    ///
    /// Returns `Ok(false)` when the page is currently latched or pinned (the
    /// mapping is left untouched); `Ok(true)` once the page is gone. The
    /// per-page inflight lock is taken so deletion cannot race a page load.
    pub fn delete_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
        let (lock_arc, created_here) = if let Some(g) = self.inflight_loads.get(&page_id) {
            (g.clone(), false)
        } else {
            let arc = Arc::new(Mutex::new(()));
            self.inflight_loads.insert(page_id, arc.clone());
            (arc, true)
        };

        let mut inflight_lock = Some(lock_arc.lock());
        let mut cleanup = |bpm: &BufferPoolManager| {
            if let Some(g) = inflight_lock.take() {
                drop(g);
            }
            if created_here {
                bpm.inflight_loads.remove(&page_id);
            }
        };

        let result = (|| {
            loop {
                if let Some(frame_ref) = self.page_table.get(&page_id) {
                    let frame_id = *frame_ref;
                    let page_arc = self.pool[frame_id].clone();
                    drop(frame_ref);

                    let mut page_writer = match page_arc.try_write() {
                        Some(guard) => guard,
                        None => {
                            // The page is latched by another reader/writer:
                            // report failure and leave the page table unchanged.
                            return Ok(false);
                        }
                    };

                    if page_writer.page_id() != page_id {
                        // Frame was reused for another page; clean the stale mapping and retry
                        drop(page_writer);
                        let _ = self
                            .page_table
                            .remove_if(&page_id, |_, fid| *fid == frame_id);
                        continue;
                    }

                    if page_writer.get_pin_count() > 0 {
                        // Still pinned: abort and keep the mapping intact.
                        return Ok(false);
                    }

                    // Unmap only if the mapping still points at this frame;
                    // otherwise another thread won the race — retry.
                    if self
                        .page_table
                        .remove_if(&page_id, |_, fid| *fid == frame_id)
                        .is_none()
                    {
                        continue;
                    }

                    page_writer.destroy();
                    self.dirty_pages.remove(&page_id);
                    drop(page_writer);

                    {
                        let mut rep = self.replacer.write();
                        let _ = rep.set_evictable(frame_id, true);
                        let _ = rep.remove(frame_id);
                    }

                    self.free_list.write().push_back(frame_id);

                    self.disk_scheduler
                        .schedule_deallocate(page_id)?
                        .recv()
                        .map_err(|e| {
                            QuillSQLError::Internal(format!("Channel disconnected: {}", e))
                        })??;

                    return Ok(true);
                } else {
                    // Page not in buffer pool, but we should still try to deallocate from disk.
                    self.disk_scheduler
                        .schedule_deallocate(page_id)?
                        .recv()
                        .map_err(|e| {
                            QuillSQLError::Internal(format!("Channel disconnected: {}", e))
                        })??;
                    return Ok(true);
                }
            }
        })();

        cleanup(self);
        result
    }
619
620    fn allocate_frame(&self) -> QuillSQLResult<FrameId> {
621        // Try free list first
622        if let Some(frame_id) = self.free_list.write().pop_front() {
623            return Ok(frame_id);
624        }
625
626        // Then evict from replacer, with safety checks
627        loop {
628            let opt = { self.replacer.write().evict() };
629            let Some(frame_id) = opt else {
630                return Err(QuillSQLError::Storage(
631                    "Cannot allocate frame: buffer pool is full and all pages are pinned"
632                        .to_string(),
633                ));
634            };
635
636            let evicted_page_arc = self.pool[frame_id].clone();
637            // Use a scoped block to ensure the try_write guard (and Option) drop before next loop
638            let handled = {
639                let opt_guard = evicted_page_arc.try_write();
640                if let Some(evicted_page_writer) = opt_guard {
641                    let evicted_page_id = evicted_page_writer.page_id;
642                    if evicted_page_writer.get_pin_count() > 0 {
643                        drop(evicted_page_writer);
644                        let mut rep = self.replacer.write();
645                        rep.record_access(frame_id)?;
646                        rep.set_evictable(frame_id, true)?;
647                        false
648                    } else {
649                        let need_flush = evicted_page_writer.is_dirty;
650                        drop(evicted_page_writer);
651                        if need_flush {
652                            self.flush_page(evicted_page_id)?;
653                        }
654                        self.page_table.remove(&evicted_page_id);
655                        // signal handled and return after block
656                        // we cannot return here due to borrow; mark and handle below
657                        // store the decision in handled flag
658                        // but we need evicted_page_id only for flush which is done
659                        // so we can use a marker
660                        true
661                    }
662                } else {
663                    // Couldn't get exclusive lock; frame likely pinned. Try next.
664                    let mut rep = self.replacer.write();
665                    rep.record_access(frame_id)?;
666                    rep.set_evictable(frame_id, true)?;
667                    false
668                }
669            };
670            if handled {
671                return Ok(frame_id);
672            }
673        }
674    }
675}
676
677#[cfg(test)]
678mod tests {
679    use crate::buffer::buffer_pool::BufferPoolManager;
680    use crate::config::WalConfig;
681    use crate::recovery::wal_record::{
682        TransactionPayload, TransactionRecordKind, WalRecordPayload,
683    };
684    use crate::recovery::WalManager;
685    use crate::storage::disk_manager::DiskManager;
686    use crate::storage::disk_scheduler::DiskScheduler;
687    use crate::utils::cache::Replacer;
688    use std::fs;
689    use std::sync::Arc;
690    use tempfile::TempDir;
691
    /// Helper that prepares a clean environment for each test:
    /// a temporary directory, a DiskManager, and a BufferPoolManager.
    fn setup_test_environment(
        num_pages: usize,
    ) -> (
        TempDir, // RAII handle for the temp directory
        Arc<BufferPoolManager>,
    ) {
        let temp_dir = TempDir::new().unwrap();
        let db_path = temp_dir.path().join("test.db");

        let disk_manager = Arc::new(DiskManager::try_new(db_path).unwrap());
        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager));
        let buffer_pool_manager = Arc::new(BufferPoolManager::new(num_pages, disk_scheduler));

        (temp_dir, buffer_pool_manager)
    }
709
    #[test]
    fn test_new_page_and_basic_fetch() {
        let (_temp_dir, bpm) = setup_test_environment(10);

        // 1. Create a new page.
        let mut page0_guard = bpm.new_page().unwrap();
        let page0_id = page0_guard.page_id();

        // 2. Write some data through the write guard.
        let test_data = b"Hello, World!";
        page0_guard.data[..test_data.len()].copy_from_slice(test_data);
        assert!(page0_guard.is_dirty); // mutable access must mark the page dirty

        // 3. Read the data back and verify it.
        assert_eq!(&page0_guard.data[..test_data.len()], test_data);

        // 4. Drop the write guard first, then take a read guard.
        drop(page0_guard);

        let page0_read_guard = bpm.fetch_page_read(page0_id).unwrap();
        assert_eq!(page0_read_guard.page_id(), page0_id);
        assert_eq!(&page0_read_guard.data[..test_data.len()], test_data);
        assert_eq!(page0_read_guard.pin_count(), 1);

        // 5. Drop the read guard.
        drop(page0_read_guard);

        // 6. Re-fetch: the pin count starts again at 1.
        let final_guard = bpm.fetch_page_read(page0_id).unwrap();
        assert_eq!(final_guard.pin_count(), 1);
        assert_eq!(final_guard.is_dirty, true); // the dirty bit must survive unpin
    }
742
    #[test]
    fn test_unpin_and_eviction_logic() {
        let (_temp_dir, bpm) = setup_test_environment(3);

        // 1. Create three pages, filling the buffer pool.
        let page1 = bpm.new_page().unwrap();
        let page1_id = page1.page_id();
        let page2 = bpm.new_page().unwrap();
        let page2_id = page2.page_id();
        let page3 = bpm.new_page().unwrap();
        let page3_id = page3.page_id();

        // The replacer is empty because every page is still pinned.
        assert_eq!(bpm.replacer.read().size(), 0);

        // 2. Drop page1; it should become evictable.
        drop(page1);
        assert_eq!(bpm.replacer.read().size(), 1);

        // 3. Drop page2; it should become evictable too.
        drop(page2);
        assert_eq!(bpm.replacer.read().size(), 2);

        // 4. Creating another page forces an eviction. Under LRU-K, page1 was
        //    unpinned first and should be the victim.
        let page4 = bpm.new_page().unwrap();
        assert_ne!(page4.page_id(), page1_id);

        // 5. page1 must no longer be present in the page table.
        assert!(bpm.page_table.get(&page1_id).is_none());
        assert!(bpm.page_table.get(&page2_id).is_some());
        assert!(bpm.page_table.get(&page3_id).is_some());

        // 6. page3 is still pinned, so only page2 remains in the replacer.
        assert_eq!(bpm.replacer.read().size(), 1);
    }
779
780    #[test]
781    fn test_flush_page() {
782        let (temp_dir, bpm) = setup_test_environment(10);
783        let db_path = temp_dir.path().join("test.db");
784
785        // 1. 创建一个新页面并写入数据
786        let page_id = {
787            let mut guard = bpm.new_page().unwrap();
788            guard.data[0..4].copy_from_slice(&[0xDE, 0xAD, 0xBE, 0xEF]);
789            guard.page_id()
790            // guard 在此 drop,unpin 时 is_dirty 应该为 true
791        };
792
793        // 2. 调用 flush_page
794        let flush_result = bpm.flush_page(page_id).unwrap();
795        assert!(flush_result);
796
797        // 3. 验证页面的脏位已被清除
798        let guard = bpm.fetch_page_read(page_id).unwrap();
799        assert!(!guard.is_dirty);
800        drop(guard);
801
802        // 4. 验证数据确实被写入磁盘(不检查具体位置,因为DiskManager的实现细节可能不同)
803        let file_data = fs::read(db_path).unwrap();
804
805        // 搜索整个文件,确认数据已写入
806        let mut found = false;
807        for i in 0..=(file_data.len().saturating_sub(4)) {
808            if &file_data[i..i + 4] == &[0xDE, 0xAD, 0xBE, 0xEF] {
809                found = true;
810                break;
811            }
812        }
813
814        assert!(found, "Test data was not written to disk correctly");
815    }
816
    #[test]
    fn test_flush_requires_durable_wal() {
        let (temp_dir, bpm) = setup_test_environment(4);
        let wal_dir = temp_dir.path().join("wal");
        let scheduler = bpm.disk_scheduler.clone();
        // Attach a WAL manager so flush_page has a WAL dependency to honor.
        let wal = Arc::new(
            WalManager::new(
                WalConfig {
                    directory: wal_dir.clone(),
                    ..WalConfig::default()
                },
                scheduler,
                None,
                None,
            )
            .expect("wal manager"),
        );
        bpm.set_wal_manager(wal.clone());

        let page_id;
        let lsn;
        {
            let mut guard = bpm.new_page().expect("new page");
            page_id = guard.page_id();
            guard.data[0..4].copy_from_slice(b"walu");
            // Append a WAL record and stamp the page with its end LSN, making
            // the page's durability depend on the WAL being durable up to `lsn`.
            lsn = wal
                .append_record_with(|_ctx| {
                    WalRecordPayload::Transaction(TransactionPayload {
                        marker: TransactionRecordKind::Begin,
                        txn_id: 1,
                    })
                })
                .expect("append wal record")
                .end_lsn;
            guard.page_lsn = lsn;
        }

        // flush_page is expected to flush the WAL first (write-ahead rule) and
        // then succeed, advancing the durable LSN past the page's LSN.
        let flushed = bpm
            .flush_page(page_id)
            .expect("flush should auto-flush wal");
        assert!(flushed, "page should flush once wal is durable");
        assert!(wal.durable_lsn() >= lsn, "wal durable lsn should advance");

        // The WAL directory must now hold at least one non-empty segment file,
        // proving the record physically reached disk.
        let wal_files: Vec<_> = std::fs::read_dir(&wal_dir)
            .expect("wal directory")
            .filter_map(|entry| entry.ok())
            .collect();
        assert!(
            !wal_files.is_empty(),
            "wal flush should create segment file"
        );
        let total_size: u64 = wal_files
            .iter()
            .map(|entry| entry.metadata().map(|m| m.len()).unwrap_or(0))
            .sum();
        assert!(total_size > 0, "wal segment should contain data");
    }
874
875    #[test]
876    fn test_delete_page() {
877        let (_temp_dir, bpm) = setup_test_environment(10);
878
879        // 1. 创建一些页面
880        let page1_id = bpm.new_page().unwrap().page_id();
881        drop(bpm.new_page().unwrap()); // unpin
882
883        assert_eq!(bpm.page_table.len(), 2);
884        assert_eq!(bpm.free_list.read().len(), 8);
885
886        // 2. 删除一个未被 pin 的页面 (page1)
887        drop(bpm.fetch_page_read(page1_id).unwrap()); // unpin page1
888        let deleted = bpm.delete_page(page1_id).unwrap();
889        assert!(deleted);
890
891        // 3. 验证其已被移除
892        assert!(bpm.page_table.get(&page1_id).is_none());
893        assert_eq!(bpm.page_table.len(), 1);
894        assert_eq!(bpm.free_list.read().len(), 9); // free_list 增加
895        assert_eq!(bpm.replacer.read().size(), 1); // 另一个页面还在
896
897        // 4. 尝试获取被删除的页面,应该会从磁盘重新读取(内容为空)
898        let refetched_guard = bpm.fetch_page_read(page1_id).unwrap();
899        assert!(refetched_guard.data.iter().all(|&b| b == 0));
900    }
901
902    #[test]
903    fn test_delete_pinned_page_fails() {
904        let (_temp_dir, bpm) = setup_test_environment(10);
905
906        let guard = bpm.new_page().unwrap();
907        let page_id = guard.page_id();
908
909        // 尝试删除一个被 pin 的页面
910        let deleted = bpm.delete_page(page_id).unwrap();
911        assert!(!deleted); // 应该失败
912
913        // 验证页面仍然存在
914        assert!(bpm.page_table.get(&page_id).is_some());
915    }
916
917    #[test]
918    fn test_delete_page_concurrent_fetch_preserves_mapping() {
919        use std::sync::mpsc;
920        use std::thread;
921
922        let (_temp_dir, bpm) = setup_test_environment(6);
923
924        let page_id = {
925            let guard = bpm.new_page().unwrap();
926            let pid = guard.page_id();
927            drop(guard);
928            pid
929        };
930
931        // Ensure page exists in buffer pool and can be evicted initially
932        drop(bpm.fetch_page_read(page_id).unwrap());
933        assert!(bpm.page_table.get(&page_id).is_some());
934
935        let (ready_tx, ready_rx) = mpsc::channel();
936        let (release_tx, release_rx) = mpsc::channel();
937
938        let bpm_fetch = bpm.clone();
939        let fetcher = thread::spawn(move || {
940            let guard = bpm_fetch.fetch_page_read(page_id).unwrap();
941            ready_tx.send(()).unwrap();
942            release_rx.recv().unwrap();
943            drop(guard);
944        });
945
946        // Wait until the fetcher has pinned the page
947        ready_rx.recv().unwrap();
948
949        let frame_id = *bpm.page_table.get(&page_id).unwrap();
950        assert_eq!(bpm.pool[frame_id].read().get_pin_count(), 1);
951
952        // delete_page should fail because the page is currently pinned and keep the mapping intact
953        let deleted = bpm.delete_page(page_id).unwrap();
954        assert!(!deleted);
955        assert!(bpm.page_table.get(&page_id).is_some());
956
957        // Release the read guard so complete_unpin can mark it evictable again
958        release_tx.send(()).unwrap();
959        fetcher.join().unwrap();
960
961        let frame_id = *bpm.page_table.get(&page_id).unwrap();
962        assert_eq!(bpm.pool[frame_id].read().get_pin_count(), 0);
963        assert_eq!(bpm.replacer.read().size(), 1);
964
965        // Now delete should succeed because the page is no longer pinned
966        assert!(bpm.delete_page(page_id).unwrap());
967    }
968
969    #[test]
970    fn test_delete_page_stale_mapping_does_not_remove_new_page() {
971        let (_temp_dir, bpm) = setup_test_environment(6);
972
973        let page_a = bpm.new_page().unwrap().page_id();
974        let page_b = bpm.new_page().unwrap().page_id();
975
976        drop(bpm.fetch_page_read(page_a).unwrap());
977        drop(bpm.fetch_page_read(page_b).unwrap());
978
979        let frame_a = *bpm.page_table.get(&page_a).unwrap();
980        let frame_b = *bpm.page_table.get(&page_b).unwrap();
981        assert_ne!(frame_a, frame_b);
982
983        // Simulate a race where the page table temporarily points page_a at page_b's frame
984        bpm.page_table.insert(page_a, frame_b);
985
986        assert!(bpm.delete_page(page_a).unwrap());
987        assert!(bpm.page_table.get(&page_a).is_none());
988
989        let frame_b_after = *bpm.page_table.get(&page_b).unwrap();
990        assert_eq!(frame_b_after, frame_b);
991        assert_eq!(bpm.pool[frame_b_after].read().page_id(), page_b);
992    }
993
994    #[test]
995    fn test_buffer_pool_is_full() {
996        let (_temp_dir, bpm) = setup_test_environment(2);
997
998        // 创建两个页面,填满缓冲池,并且一直持有它们的 guard
999        let _page1 = bpm.new_page().unwrap();
1000        let _page2 = bpm.new_page().unwrap();
1001
1002        // 此时缓冲池已满,且所有页面都被 pin 住,无法驱逐
1003        assert_eq!(bpm.replacer.read().size(), 0);
1004        assert!(bpm.free_list.read().is_empty());
1005
1006        // 尝试创建第三个页面,应该会失败
1007        let page3_result = bpm.new_page();
1008        assert!(page3_result.is_err());
1009    }
1010
1011    #[test]
1012    fn test_concurrent_reads_and_exclusive_write() {
1013        let (_temp_dir, bpm) = setup_test_environment(10);
1014
1015        // 创建一个页面
1016        let page_id = {
1017            let mut guard = bpm.new_page().unwrap();
1018            guard.data[0] = 42;
1019            guard.page_id()
1020        };
1021
1022        // 1. 获取一个读保护器
1023        let read_guard1 = bpm.fetch_page_read(page_id).unwrap();
1024        assert_eq!(read_guard1.data[0], 42);
1025        assert_eq!(read_guard1.pin_count(), 1);
1026        drop(read_guard1);
1027
1028        // 2. 验证写操作是独占的
1029        let mut write_guard = bpm.fetch_page_write(page_id).unwrap();
1030        write_guard.data[0] = 99;
1031        assert_eq!(write_guard.data[0], 99);
1032    }
1033
1034    #[test]
1035    fn test_concurrent_same_page_fetch_single_frame() {
1036        use std::thread;
1037        let (_temp_dir, bpm) = setup_test_environment(8);
1038
1039        // Prepare a page on disk by allocating once
1040        let page_id = bpm.new_page().unwrap().page_id();
1041        // Unpin to allow others to fetch
1042        drop(bpm.fetch_page_read(page_id).unwrap());
1043
1044        let threads = (0..8)
1045            .map(|_| {
1046                let bpm_c = bpm.clone();
1047                thread::spawn(move || {
1048                    for _ in 0..100 {
1049                        let g = bpm_c.fetch_page_read(page_id).unwrap();
1050                        assert_eq!(g.page_id(), page_id);
1051                    }
1052                })
1053            })
1054            .collect::<Vec<_>>();
1055
1056        for t in threads {
1057            t.join().unwrap();
1058        }
1059
1060        // Ensure only one frame is mapped to this page
1061        let frame_id = bpm.page_table.get(&page_id).map(|r| *r).unwrap();
1062        // Verify that the frame actually holds the page
1063        assert_eq!(bpm.pool[frame_id].read().page_id(), page_id);
1064    }
1065
1066    #[test]
1067    fn test_delete_vs_fetch_race_safety() {
1068        use std::thread;
1069        let (_temp_dir, bpm) = setup_test_environment(8);
1070
1071        let page_id = bpm.new_page().unwrap().page_id();
1072        drop(bpm.fetch_page_read(page_id).unwrap());
1073
1074        let bpm_del = bpm.clone();
1075        let deleter = thread::spawn(move || {
1076            // Try to delete repeatedly; it may fail if pinned, but must not corrupt state
1077            for _ in 0..1000 {
1078                let _ = bpm_del.delete_page(page_id).unwrap();
1079            }
1080        });
1081
1082        let bpm_fetch = bpm.clone();
1083        let fetcher = thread::spawn(move || {
1084            for _ in 0..1000 {
1085                let _ = bpm_fetch.fetch_page_read(page_id).unwrap();
1086            }
1087        });
1088
1089        deleter.join().unwrap();
1090        fetcher.join().unwrap();
1091
1092        // State must remain consistent: either mapped and readable, or not mapped but deletable
1093        let frame_id_opt = bpm.page_table.get(&page_id).map(|r| *r);
1094        if let Some(frame_id) = frame_id_opt {
1095            assert_eq!(bpm.pool[frame_id].read().page_id(), page_id);
1096        }
1097    }
1098}