quill_sql/buffer/
buffer_manager.rs

1//! BufferManager coordinates replacement, dirty tracking, and WAL with a shared BufferPool.
2
3use std::sync::Arc;
4
5use bytes::Bytes;
6use dashmap::{DashMap, DashSet};
7use parking_lot::{Mutex, RwLock};
8
9use crate::buffer::buffer_pool::{BufferPool, FrameId, FrameMetaSnapshot};
10use crate::buffer::page::{self, PageId, ReadPageGuard, WritePageGuard, INVALID_PAGE_ID};
11use crate::catalog::SchemaRef;
12use crate::config::BufferPoolConfig;
13use crate::error::{QuillSQLError, QuillSQLResult};
14use crate::recovery::{Lsn, WalManager};
15use crate::storage::codec::{
16    BPlusTreeHeaderPageCodec, BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec,
17    BPlusTreePageCodec, TablePageCodec,
18};
19use crate::storage::disk_scheduler::DiskScheduler;
20use crate::storage::page::{
21    BPlusTreeHeaderPage, BPlusTreeInternalPage, BPlusTreeLeafPage, BPlusTreePage, TablePage,
22};
23use crate::utils::cache::lru_k::LRUKReplacer;
24use crate::utils::cache::tiny_lfu::TinyLFU;
25use crate::utils::cache::Replacer;
26
27#[derive(Debug)]
28pub struct BufferManager {
29    pool: Arc<BufferPool>,
30    replacer: Arc<RwLock<LRUKReplacer>>,
31    inflight_loads: DashMap<PageId, Arc<Mutex<()>>>,
32    tiny_lfu: Option<Arc<RwLock<TinyLFU>>>,
33    dirty_pages: DashSet<PageId>,
34    dirty_page_table: DashMap<PageId, Lsn>,
35    wal_manager: Arc<RwLock<Option<Arc<WalManager>>>>,
36}
37
38impl BufferManager {
39    pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
40        Self::new_with_config(
41            BufferPoolConfig {
42                buffer_pool_size: num_pages,
43                ..Default::default()
44            },
45            disk_scheduler,
46        )
47    }
48
49    pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
50        let pool = Arc::new(BufferPool::new_with_config(config, disk_scheduler));
51        let replacer = Arc::new(RwLock::new(LRUKReplacer::new(pool.capacity())));
52        let tiny_lfu = if config.tiny_lfu_enable {
53            Some(Arc::new(RwLock::new(TinyLFU::new(
54                pool.capacity().next_power_of_two(),
55                config.tiny_lfu_counters,
56            ))))
57        } else {
58            None
59        };
60
61        Self {
62            pool,
63            replacer,
64            inflight_loads: DashMap::new(),
65            tiny_lfu,
66            dirty_pages: DashSet::new(),
67            dirty_page_table: DashMap::new(),
68            wal_manager: Arc::new(RwLock::new(None)),
69        }
70    }
71
72    pub fn buffer_pool(&self) -> Arc<BufferPool> {
73        self.pool.clone()
74    }
75
76    pub fn replacer_arc(&self) -> Arc<RwLock<LRUKReplacer>> {
77        self.replacer.clone()
78    }
79
80    pub fn set_wal_manager(&self, wal_manager: Arc<WalManager>) {
81        *self.wal_manager.write() = Some(wal_manager);
82    }
83
84    pub fn wal_manager(&self) -> Option<Arc<WalManager>> {
85        self.wal_manager.read().clone()
86    }
87
88    pub fn frame_meta_snapshot(&self) -> Vec<FrameMetaSnapshot> {
89        self.pool.frame_meta_snapshot()
90    }
91
92    pub fn dirty_page_ids(&self) -> Vec<PageId> {
93        self.dirty_pages.iter().map(|entry| *entry.key()).collect()
94    }
95
96    pub fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)> {
97        self.dirty_page_table
98            .iter()
99            .map(|entry| (*entry.key(), *entry.value()))
100            .collect()
101    }
102
103    fn note_dirty_page(&self, page_id: PageId, rec_lsn: Lsn) {
104        self.dirty_pages.insert(page_id);
105        self.dirty_page_table.entry(page_id).or_insert(rec_lsn);
106    }
107
108    pub fn new_page(self: &Arc<Self>) -> QuillSQLResult<WritePageGuard> {
109        if !self.pool.has_free_frame() && self.replacer.read().size() == 0 {
110            return Err(QuillSQLError::Storage(
111                "Cannot new page because buffer pool is full and no page to evict".to_string(),
112            ));
113        }
114
115        let frame_id = self.allocate_frame()?;
116        let page_id = self.pool.allocate_page_id()?;
117        self.pool.insert_mapping(page_id, frame_id);
118
119        {
120            let meta = self.pool.frame_meta(frame_id);
121            meta.initialize(page_id);
122            meta.increment_pin();
123        }
124
125        self.pool.reset_frame(frame_id);
126        self.replacer_record_access(frame_id)?;
127        self.mark_non_evictable(frame_id)?;
128        Ok(page::new_write_guard(Arc::clone(self), frame_id))
129    }
130
131    pub fn fetch_page_read(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<ReadPageGuard> {
132        if page_id == INVALID_PAGE_ID {
133            return Err(QuillSQLError::Storage(
134                "fetch_page_read: invalid page id".to_string(),
135            ));
136        }
137
138        let frame_id = self.ensure_frame(page_id)?;
139        {
140            let meta = self.pool.frame_meta(frame_id);
141            meta.increment_pin();
142        }
143        self.replacer_record_access(frame_id)?;
144        self.mark_non_evictable(frame_id)?;
145        Ok(page::new_read_guard(Arc::clone(self), frame_id))
146    }
147
148    pub fn fetch_page_write(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<WritePageGuard> {
149        if page_id == INVALID_PAGE_ID {
150            return Err(QuillSQLError::Storage(
151                "fetch_page_write: invalid page id".to_string(),
152            ));
153        }
154
155        let frame_id = self.ensure_frame(page_id)?;
156        {
157            let meta = self.pool.frame_meta(frame_id);
158            meta.increment_pin();
159        }
160        self.replacer_record_access(frame_id)?;
161        self.mark_non_evictable(frame_id)?;
162        Ok(page::new_write_guard(Arc::clone(self), frame_id))
163    }
164
165    pub fn complete_unpin(
166        &self,
167        page_id: PageId,
168        is_dirty: bool,
169        rec_lsn_hint: Option<Lsn>,
170    ) -> QuillSQLResult<()> {
171        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
172            let meta = self.pool.frame_meta(frame_id);
173            let new_pin_count = if meta.pin_count() > 0 {
174                meta.try_decrement_pin().unwrap_or(0)
175            } else {
176                0
177            };
178            if is_dirty {
179                meta.mark_dirty();
180                if let Some(lsn) = rec_lsn_hint {
181                    meta.set_lsn(lsn);
182                }
183                let rec_lsn = rec_lsn_hint.unwrap_or_else(|| meta.lsn());
184                self.note_dirty_page(page_id, rec_lsn);
185            }
186            if new_pin_count == 0 {
187                self.mark_evictable(frame_id)?;
188            }
189        }
190        Ok(())
191    }
192
193    pub fn fetch_table_page(
194        self: &Arc<Self>,
195        page_id: PageId,
196        schema: SchemaRef,
197    ) -> QuillSQLResult<(ReadPageGuard, TablePage)> {
198        let guard = self.fetch_page_read(page_id)?;
199        let (page, _) = TablePageCodec::decode(guard.data(), schema)?;
200        Ok((guard, page))
201    }
202
203    pub fn fetch_tree_page(
204        self: &Arc<Self>,
205        page_id: PageId,
206        key_schema: SchemaRef,
207    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreePage)> {
208        let guard = self.fetch_page_read(page_id)?;
209        let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone())?;
210        Ok((guard, page))
211    }
212
213    pub fn fetch_tree_internal_page(
214        self: &Arc<Self>,
215        page_id: PageId,
216        key_schema: SchemaRef,
217    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeInternalPage)> {
218        let guard = self.fetch_page_read(page_id)?;
219        let (page, _) = BPlusTreeInternalPageCodec::decode(guard.data(), key_schema.clone())?;
220        Ok((guard, page))
221    }
222
223    pub fn fetch_tree_leaf_page(
224        self: &Arc<Self>,
225        page_id: PageId,
226        key_schema: SchemaRef,
227    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeLeafPage)> {
228        let guard = self.fetch_page_read(page_id)?;
229        let (page, _) = BPlusTreeLeafPageCodec::decode(guard.data(), key_schema.clone())?;
230        Ok((guard, page))
231    }
232
233    pub fn fetch_header_page(
234        self: &Arc<Self>,
235        page_id: PageId,
236    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeHeaderPage)> {
237        let guard = self.fetch_page_read(page_id)?;
238        let (header, _) = BPlusTreeHeaderPageCodec::decode(guard.data())?;
239        Ok((guard, header))
240    }
241
242    pub fn prefetch_page(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<()> {
243        if let Ok(g) = self.fetch_page_read(page_id) {
244            drop(g);
245        }
246        Ok(())
247    }
248
249    pub fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
250        let Some(frame_id) = self.pool.lookup_frame(page_id) else {
251            return Ok(false);
252        };
253        let meta = self.pool.frame_meta(frame_id);
254        if !meta.is_dirty() {
255            self.dirty_pages.remove(&page_id);
256            self.dirty_page_table.remove(&page_id);
257            return Ok(false);
258        }
259        let lsn = meta.lsn();
260        self.ensure_wal_durable(lsn)?;
261        let bytes = {
262            let _lock = self.pool.frame_lock(frame_id).read();
263            let slice = unsafe { self.pool.frame_slice(frame_id) };
264            Bytes::copy_from_slice(slice)
265        };
266        self.pool.write_page_to_disk(page_id, bytes)?;
267        meta.clear_dirty();
268        self.dirty_pages.remove(&page_id);
269        self.dirty_page_table.remove(&page_id);
270        Ok(true)
271    }
272
273    pub fn flush_all_pages(&self) -> QuillSQLResult<()> {
274        if let Some(wal) = self.wal_manager.read().clone() {
275            wal.flush(None)?;
276        }
277        let dirty_ids: Vec<PageId> = self.dirty_pages.iter().map(|entry| *entry.key()).collect();
278        for page_id in dirty_ids {
279            let _ = self.flush_page(page_id)?;
280        }
281        Ok(())
282    }
283
284    pub fn delete_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
285        let (guard, created_here) = if let Some(existing) = self.inflight_loads.get(&page_id) {
286            (existing.clone(), false)
287        } else {
288            let arc = Arc::new(Mutex::new(()));
289            self.inflight_loads.insert(page_id, arc.clone());
290            (arc, true)
291        };
292        let lock = guard.lock();
293        let result = self.delete_page_inner(page_id);
294        drop(lock);
295        if created_here {
296            self.inflight_loads.remove(&page_id);
297        }
298        result
299    }
300
301    fn delete_page_inner(&self, page_id: PageId) -> QuillSQLResult<bool> {
302        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
303            let lock = self.pool.frame_lock(frame_id);
304            let guard = match lock.try_write() {
305                Some(g) => g,
306                None => return Ok(false),
307            };
308            drop(guard);
309            let meta = self.pool.frame_meta(frame_id);
310            if meta.page_id() != page_id {
311                self.pool.remove_mapping_if(page_id, frame_id);
312                return self.delete_page_inner(page_id);
313            }
314            if meta.pin_count() > 0 {
315                return Ok(false);
316            }
317            if !self.pool.remove_mapping_if(page_id, frame_id) {
318                return self.delete_page_inner(page_id);
319            }
320            self.pool.reset_frame(frame_id);
321            self.dirty_pages.remove(&page_id);
322            self.dirty_page_table.remove(&page_id);
323            self.pool.clear_frame_meta(frame_id);
324            {
325                let mut rep = self.replacer.write();
326                let _ = rep.set_evictable(frame_id, true);
327                rep.remove(frame_id);
328            }
329            self.pool.push_free_frame(frame_id);
330            self.pool
331                .disk_scheduler()
332                .schedule_deallocate(page_id)?
333                .recv()
334                .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
335            Ok(true)
336        } else {
337            self.pool
338                .disk_scheduler()
339                .schedule_deallocate(page_id)?
340                .recv()
341                .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
342            Ok(true)
343        }
344    }
345
346    fn ensure_frame(&self, page_id: PageId) -> QuillSQLResult<FrameId> {
347        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
348            self.replacer_record_access(frame_id)?;
349            if let Some(lfu) = &self.tiny_lfu {
350                lfu.write().admit_record(page_id as u64);
351            }
352            return Ok(frame_id);
353        }
354
355        let (guard, created_here) = if let Some(existing) = self.inflight_loads.get(&page_id) {
356            (existing.clone(), false)
357        } else {
358            let arc = Arc::new(Mutex::new(()));
359            self.inflight_loads.insert(page_id, arc.clone());
360            (arc, true)
361        };
362        let _lock = guard.lock();
363
364        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
365            if created_here {
366                self.inflight_loads.remove(&page_id);
367            }
368            self.replacer_record_access(frame_id)?;
369            if let Some(lfu) = &self.tiny_lfu {
370                lfu.write().admit_record(page_id as u64);
371            }
372            return Ok(frame_id);
373        }
374
375        if let Some(lfu) = &self.tiny_lfu {
376            let estimate = lfu.read().estimate(page_id as u64);
377            if estimate == 0 && !self.pool.has_free_frame() && self.replacer.read().size() == 0 {
378                if created_here {
379                    self.inflight_loads.remove(&page_id);
380                }
381                return Err(QuillSQLError::Storage(
382                    "Cannot allocate frame: admission denied and no space".to_string(),
383                ));
384            }
385        }
386
387        let frame_id = self.allocate_frame()?;
388        self.pool.load_page_into_frame(page_id, frame_id)?;
389        self.pool.insert_mapping(page_id, frame_id);
390
391        if let Some(lfu) = &self.tiny_lfu {
392            lfu.write().admit_record(page_id as u64);
393        }
394        if created_here {
395            self.inflight_loads.remove(&page_id);
396        }
397        self.replacer_record_access(frame_id)?;
398        Ok(frame_id)
399    }
400
401    fn allocate_frame(&self) -> QuillSQLResult<FrameId> {
402        if let Some(frame_id) = self.pool.pop_free_frame() {
403            return Ok(frame_id);
404        }
405        self.evict_victim_frame()
406    }
407
408    fn replacer_record_access(&self, frame_id: FrameId) -> QuillSQLResult<()> {
409        let mut rep = self.replacer.write();
410        let _ = rep.record_access(frame_id);
411        Ok(())
412    }
413
414    fn evict_victim_frame(&self) -> QuillSQLResult<FrameId> {
415        loop {
416            let victim = {
417                let mut rep = self.replacer.write();
418                match rep.evict() {
419                    Some(frame_id) => frame_id,
420                    None => {
421                        return Err(QuillSQLError::Storage(
422                            "Cannot allocate frame: buffer pool is full".to_string(),
423                        ))
424                    }
425                }
426            };
427
428            let snapshot = self.pool.frame_meta(victim).snapshot();
429            let page_id = snapshot.page_id;
430            let pin_count = snapshot.pin_count;
431            let is_dirty = snapshot.is_dirty;
432            let lsn = snapshot.lsn;
433
434            if pin_count > 0 {
435                let mut rep = self.replacer.write();
436                let _ = rep.record_access(victim);
437                let _ = rep.set_evictable(victim, false);
438                continue;
439            }
440
441            if page_id != INVALID_PAGE_ID {
442                if is_dirty {
443                    self.ensure_wal_durable(lsn)?;
444                    let bytes = Bytes::copy_from_slice(unsafe { self.pool.frame_slice(victim) });
445                    self.pool.write_page_to_disk(page_id, bytes)?;
446                    self.dirty_pages.remove(&page_id);
447                    self.dirty_page_table.remove(&page_id);
448                }
449                self.pool.remove_mapping(page_id);
450            }
451
452            self.pool.clear_frame_meta(victim);
453            self.pool.reset_frame(victim);
454            return Ok(victim);
455        }
456    }
457
458    fn mark_evictable(&self, frame_id: FrameId) -> QuillSQLResult<()> {
459        let mut rep = self.replacer.write();
460        let _ = rep.set_evictable(frame_id, true);
461        Ok(())
462    }
463
464    fn mark_non_evictable(&self, frame_id: FrameId) -> QuillSQLResult<()> {
465        let mut rep = self.replacer.write();
466        let _ = rep.set_evictable(frame_id, false);
467        Ok(())
468    }
469
470    fn ensure_wal_durable(&self, lsn: Lsn) -> QuillSQLResult<()> {
471        if lsn == 0 {
472            return Ok(());
473        }
474        if let Some(wal) = self.wal_manager.read().clone() {
475            if lsn > wal.durable_lsn() {
476                wal.flush(Some(lsn))?;
477                if wal.durable_lsn() < lsn {
478                    return Err(QuillSQLError::Internal(format!(
479                        "Flush blocked: page_lsn={} > durable_lsn={}",
480                        lsn,
481                        wal.durable_lsn()
482                    )));
483                }
484            }
485        }
486        Ok(())
487    }
488
489    pub fn clone_arc(self: &Arc<Self>) -> Arc<Self> {
490        self.clone()
491    }
492}
493
494#[cfg(test)]
495mod tests {
496    use super::*;
497    use crate::storage::disk_manager::DiskManager;
498    use crate::storage::disk_scheduler::DiskScheduler;
499    use std::sync::{Arc, Barrier};
500    use std::thread;
501    use tempfile::TempDir;
502
503    fn setup_manager(num_pages: usize) -> (TempDir, Arc<BufferManager>) {
504        let temp_dir = TempDir::new().unwrap();
505        let db_file = temp_dir.path().join("test.db");
506        let disk_manager = Arc::new(DiskManager::try_new(db_file).unwrap());
507        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager));
508        let manager = Arc::new(BufferManager::new(num_pages, disk_scheduler));
509        (temp_dir, manager)
510    }
511
512    #[test]
513    fn new_page_initializes_frame() {
514        let (_tmp, manager) = setup_manager(2);
515        let guard = manager.new_page().unwrap();
516        let page_id = guard.page_id();
517        let frame_id = guard.frame_id();
518
519        // Page should be zeroed
520        assert!(guard.data().iter().all(|b| *b == 0));
521        assert!(!guard.is_dirty());
522        assert_eq!(guard.lsn(), 0);
523
524        drop(guard);
525
526        let meta = manager.buffer_pool().frame_meta(frame_id).snapshot();
527        assert_eq!(meta.page_id, page_id);
528        assert_eq!(meta.pin_count, 0);
529        assert!(!meta.is_dirty);
530    }
531
532    #[test]
533    fn fetch_page_read_increments_pin_and_resets_on_drop() {
534        let (_tmp, manager) = setup_manager(2);
535        let guard = manager.new_page().unwrap();
536        let page_id = guard.page_id();
537        let frame_id = guard.frame_id();
538        drop(guard);
539
540        {
541            let read_guard = manager.fetch_page_read(page_id).unwrap();
542            assert_eq!(read_guard.pin_count(), 1);
543            assert_eq!(read_guard.frame_id(), frame_id);
544        }
545
546        let meta = manager.buffer_pool().frame_meta(frame_id).snapshot();
547        assert_eq!(meta.pin_count, 0);
548    }
549
550    #[test]
551    fn fetch_page_write_marks_dirty_and_tracks_lsn() {
552        let (_tmp, manager) = setup_manager(2);
553        let mut guard = manager.new_page().unwrap();
554        let page_id = guard.page_id();
555        guard.data_mut()[0] = 7;
556        guard.set_lsn(99);
557        guard.mark_dirty();
558        drop(guard);
559
560        let mut write_guard = manager.fetch_page_write(page_id).unwrap();
561        assert!(write_guard.is_dirty());
562        assert_eq!(write_guard.lsn(), 99);
563        write_guard.data_mut()[1] = 8;
564        drop(write_guard);
565
566        let meta = manager
567            .buffer_pool()
568            .frame_meta(manager.buffer_pool().lookup_frame(page_id).unwrap())
569            .snapshot();
570        assert!(meta.is_dirty);
571        assert_eq!(meta.lsn, 99);
572        assert_eq!(meta.pin_count, 0);
573    }
574
575    #[test]
576    fn flush_page_writes_back_and_clears_dirty_flag() {
577        let (_tmp, manager) = setup_manager(2);
578        let mut guard = manager.new_page().unwrap();
579        let page_id = guard.page_id();
580        guard.data_mut()[0] = 42;
581        guard.set_lsn(123);
582        guard.mark_dirty();
583        drop(guard);
584
585        assert!(manager.flush_page(page_id).unwrap());
586
587        let meta = manager
588            .buffer_pool()
589            .frame_meta(manager.buffer_pool().lookup_frame(page_id).unwrap())
590            .snapshot();
591        assert!(!meta.is_dirty);
592    }
593
594    #[test]
595    fn delete_page_releases_frame() {
596        let (_tmp, manager) = setup_manager(2);
597        let page_id = {
598            let guard = manager.new_page().unwrap();
599            guard.page_id()
600        };
601
602        assert!(manager.delete_page(page_id).unwrap());
603        assert!(manager.buffer_pool().lookup_frame(page_id).is_none());
604        assert!(manager.buffer_pool().has_free_frame());
605
606        // Ensure subsequent allocation succeeds and pool remains operational
607        let new_guard = manager.new_page().unwrap();
608        assert!(new_guard.frame_id() < manager.buffer_pool().capacity());
609    }
610
611    #[test]
612    fn concurrent_reads_do_not_leak_pins() {
613        const THREADS: usize = 8;
614        let (_tmp, manager) = setup_manager(4);
615        let (page_id, frame_id) = {
616            let mut guard = manager.new_page().unwrap();
617            guard.data_mut()[0] = 42;
618            (guard.page_id(), guard.frame_id())
619        };
620
621        let barrier = Arc::new(Barrier::new(THREADS));
622        let mut handles = Vec::with_capacity(THREADS);
623        for _ in 0..THREADS {
624            let mgr = manager.clone();
625            let barrier = barrier.clone();
626            handles.push(thread::spawn(move || {
627                barrier.wait();
628                for _ in 0..50 {
629                    let guard = mgr.fetch_page_read(page_id).expect("read page");
630                    assert_eq!(guard.data()[0], 42);
631                }
632            }));
633        }
634
635        for handle in handles {
636            handle.join().unwrap();
637        }
638
639        let pool = manager.buffer_pool();
640        let meta = pool.frame_meta(frame_id).snapshot();
641        assert_eq!(meta.pin_count, 0);
642        assert_eq!(meta.page_id, page_id);
643    }
644
645    #[test]
646    fn concurrent_writes_mark_dirty_and_flush_once() {
647        const THREADS: usize = 4;
648        let (_tmp, manager) = setup_manager(4);
649        let (page_id, frame_id) = {
650            let guard = manager.new_page().unwrap();
651            (guard.page_id(), guard.frame_id())
652        };
653
654        let barrier = Arc::new(Barrier::new(THREADS));
655        let mut handles = Vec::with_capacity(THREADS);
656        for tid in 0..THREADS {
657            let mgr = manager.clone();
658            let barrier = barrier.clone();
659            handles.push(thread::spawn(move || {
660                let lsn = (tid as Lsn) + 1;
661                barrier.wait();
662                for _ in 0..25 {
663                    let mut guard = mgr.fetch_page_write(page_id).expect("write guard");
664                    guard.data_mut()[tid] = (tid as u8) + 1;
665                    guard.set_lsn(lsn);
666                    guard.mark_dirty();
667                }
668            }));
669        }
670
671        for handle in handles {
672            handle.join().unwrap();
673        }
674
675        {
676            let pool = manager.buffer_pool();
677            let meta = pool.frame_meta(frame_id).snapshot();
678            assert!(meta.is_dirty);
679            assert_eq!(meta.pin_count, 0);
680            assert_eq!(meta.page_id, page_id);
681        }
682
683        assert!(manager.flush_page(page_id).unwrap());
684        {
685            let pool = manager.buffer_pool();
686            let meta = pool.frame_meta(frame_id).snapshot();
687            assert!(!meta.is_dirty);
688            assert_eq!(meta.pin_count, 0);
689        }
690
691        let read_back = manager
692            .buffer_pool()
693            .disk_scheduler()
694            .schedule_read(page_id)
695            .unwrap()
696            .recv()
697            .unwrap()
698            .unwrap();
699        for tid in 0..THREADS {
700            assert_eq!(read_back[tid], (tid as u8) + 1);
701        }
702    }
703}