quill_sql/buffer/
buffer_manager.rs

1//! BufferManager coordinates replacement, dirty tracking, and WAL with a shared BufferPool.
2
3use std::sync::Arc;
4
5use bytes::Bytes;
6use dashmap::{DashMap, DashSet};
7use parking_lot::{Mutex, RwLock};
8
9use crate::buffer::buffer_pool::{BufferPool, FrameId};
10use crate::buffer::page::{self, PageId, ReadPageGuard, WritePageGuard, INVALID_PAGE_ID};
11use crate::catalog::SchemaRef;
12use crate::config::BufferPoolConfig;
13use crate::error::{QuillSQLError, QuillSQLResult};
14use crate::recovery::{Lsn, WalManager};
15use crate::storage::codec::{
16    BPlusTreeHeaderPageCodec, BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec,
17    BPlusTreePageCodec, TablePageCodec,
18};
19use crate::storage::disk_scheduler::DiskScheduler;
20use crate::storage::page::{
21    BPlusTreeHeaderPage, BPlusTreeInternalPage, BPlusTreeLeafPage, BPlusTreePage, TablePage,
22};
23use crate::utils::cache::lru_k::LRUKReplacer;
24use crate::utils::cache::tiny_lfu::TinyLFU;
25use crate::utils::cache::Replacer;
26
27#[derive(Debug)]
28pub struct BufferManager {
29    pool: Arc<BufferPool>,
30    replacer: Arc<RwLock<LRUKReplacer>>,
31    inflight_loads: DashMap<PageId, Arc<Mutex<()>>>,
32    tiny_lfu: Option<Arc<RwLock<TinyLFU>>>,
33    dirty_pages: DashSet<PageId>,
34    dirty_page_table: DashMap<PageId, Lsn>,
35    wal_manager: Arc<RwLock<Option<Arc<WalManager>>>>,
36}
37
38impl BufferManager {
39    pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
40        Self::new_with_config(
41            BufferPoolConfig {
42                buffer_pool_size: num_pages,
43                ..Default::default()
44            },
45            disk_scheduler,
46        )
47    }
48
49    pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
50        let pool = Arc::new(BufferPool::new_with_config(config, disk_scheduler));
51        let replacer = Arc::new(RwLock::new(LRUKReplacer::new(pool.capacity())));
52        let tiny_lfu = if config.tiny_lfu_enable {
53            Some(Arc::new(RwLock::new(TinyLFU::new(
54                pool.capacity().next_power_of_two(),
55                config.tiny_lfu_counters,
56            ))))
57        } else {
58            None
59        };
60
61        Self {
62            pool,
63            replacer,
64            inflight_loads: DashMap::new(),
65            tiny_lfu,
66            dirty_pages: DashSet::new(),
67            dirty_page_table: DashMap::new(),
68            wal_manager: Arc::new(RwLock::new(None)),
69        }
70    }
71
72    pub fn buffer_pool(&self) -> Arc<BufferPool> {
73        self.pool.clone()
74    }
75
76    pub fn replacer_arc(&self) -> Arc<RwLock<LRUKReplacer>> {
77        self.replacer.clone()
78    }
79
80    pub fn set_wal_manager(&self, wal_manager: Arc<WalManager>) {
81        *self.wal_manager.write() = Some(wal_manager);
82    }
83
84    pub fn wal_manager(&self) -> Option<Arc<WalManager>> {
85        self.wal_manager.read().clone()
86    }
87
88    pub fn dirty_page_ids(&self) -> Vec<PageId> {
89        self.dirty_pages.iter().map(|entry| *entry.key()).collect()
90    }
91
92    pub fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)> {
93        self.dirty_page_table
94            .iter()
95            .map(|entry| (*entry.key(), *entry.value()))
96            .collect()
97    }
98
99    fn note_dirty_page(&self, page_id: PageId, rec_lsn: Lsn) {
100        self.dirty_pages.insert(page_id);
101        self.dirty_page_table.entry(page_id).or_insert(rec_lsn);
102    }
103
104    pub fn new_page(self: &Arc<Self>) -> QuillSQLResult<WritePageGuard> {
105        if !self.pool.has_free_frame() && self.replacer.read().size() == 0 {
106            return Err(QuillSQLError::Storage(
107                "Cannot new page because buffer pool is full and no page to evict".to_string(),
108            ));
109        }
110
111        let frame_id = self.allocate_frame()?;
112        let page_id = self.pool.allocate_page_id()?;
113        self.pool.insert_mapping(page_id, frame_id);
114
115        {
116            let meta = self.pool.frame_meta(frame_id);
117            meta.initialize(page_id);
118            meta.increment_pin();
119        }
120
121        self.pool.reset_frame(frame_id);
122        self.replacer_record_access(frame_id)?;
123        self.mark_non_evictable(frame_id)?;
124        Ok(page::new_write_guard(Arc::clone(self), frame_id))
125    }
126
127    pub fn fetch_page_read(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<ReadPageGuard> {
128        if page_id == INVALID_PAGE_ID {
129            return Err(QuillSQLError::Storage(
130                "fetch_page_read: invalid page id".to_string(),
131            ));
132        }
133
134        let frame_id = self.ensure_frame(page_id)?;
135        {
136            let meta = self.pool.frame_meta(frame_id);
137            meta.increment_pin();
138        }
139        self.replacer_record_access(frame_id)?;
140        self.mark_non_evictable(frame_id)?;
141        Ok(page::new_read_guard(Arc::clone(self), frame_id))
142    }
143
144    pub fn fetch_page_write(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<WritePageGuard> {
145        if page_id == INVALID_PAGE_ID {
146            return Err(QuillSQLError::Storage(
147                "fetch_page_write: invalid page id".to_string(),
148            ));
149        }
150
151        let frame_id = self.ensure_frame(page_id)?;
152        {
153            let meta = self.pool.frame_meta(frame_id);
154            meta.increment_pin();
155        }
156        self.replacer_record_access(frame_id)?;
157        self.mark_non_evictable(frame_id)?;
158        Ok(page::new_write_guard(Arc::clone(self), frame_id))
159    }
160
161    pub fn complete_unpin(
162        &self,
163        page_id: PageId,
164        is_dirty: bool,
165        rec_lsn_hint: Option<Lsn>,
166    ) -> QuillSQLResult<()> {
167        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
168            let meta = self.pool.frame_meta(frame_id);
169            let new_pin_count = if meta.pin_count() > 0 {
170                meta.try_decrement_pin().unwrap_or(0)
171            } else {
172                0
173            };
174            if is_dirty {
175                meta.mark_dirty();
176                if let Some(lsn) = rec_lsn_hint {
177                    meta.set_lsn(lsn);
178                }
179                let rec_lsn = rec_lsn_hint.unwrap_or_else(|| meta.lsn());
180                self.note_dirty_page(page_id, rec_lsn);
181            }
182            if new_pin_count == 0 {
183                self.mark_evictable(frame_id)?;
184            }
185        }
186        Ok(())
187    }
188
189    pub fn fetch_table_page(
190        self: &Arc<Self>,
191        page_id: PageId,
192        schema: SchemaRef,
193    ) -> QuillSQLResult<(ReadPageGuard, TablePage)> {
194        let guard = self.fetch_page_read(page_id)?;
195        let (page, _) = TablePageCodec::decode(guard.data(), schema)?;
196        Ok((guard, page))
197    }
198
199    pub fn fetch_tree_page(
200        self: &Arc<Self>,
201        page_id: PageId,
202        key_schema: SchemaRef,
203    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreePage)> {
204        let guard = self.fetch_page_read(page_id)?;
205        let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone())?;
206        Ok((guard, page))
207    }
208
209    pub fn fetch_tree_internal_page(
210        self: &Arc<Self>,
211        page_id: PageId,
212        key_schema: SchemaRef,
213    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeInternalPage)> {
214        let guard = self.fetch_page_read(page_id)?;
215        let (page, _) = BPlusTreeInternalPageCodec::decode(guard.data(), key_schema.clone())?;
216        Ok((guard, page))
217    }
218
219    pub fn fetch_tree_leaf_page(
220        self: &Arc<Self>,
221        page_id: PageId,
222        key_schema: SchemaRef,
223    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeLeafPage)> {
224        let guard = self.fetch_page_read(page_id)?;
225        let (page, _) = BPlusTreeLeafPageCodec::decode(guard.data(), key_schema.clone())?;
226        Ok((guard, page))
227    }
228
229    pub fn fetch_header_page(
230        self: &Arc<Self>,
231        page_id: PageId,
232    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeHeaderPage)> {
233        let guard = self.fetch_page_read(page_id)?;
234        let (header, _) = BPlusTreeHeaderPageCodec::decode(guard.data())?;
235        Ok((guard, header))
236    }
237
238    pub fn prefetch_page(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<()> {
239        if let Ok(g) = self.fetch_page_read(page_id) {
240            drop(g);
241        }
242        Ok(())
243    }
244
245    pub fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
246        let Some(frame_id) = self.pool.lookup_frame(page_id) else {
247            return Ok(false);
248        };
249        let meta = self.pool.frame_meta(frame_id);
250        if !meta.is_dirty() {
251            self.dirty_pages.remove(&page_id);
252            self.dirty_page_table.remove(&page_id);
253            return Ok(false);
254        }
255        let lsn = meta.lsn();
256        self.ensure_wal_durable(lsn)?;
257        let bytes = {
258            let _lock = self.pool.frame_lock(frame_id).read();
259            let slice = unsafe { self.pool.frame_slice(frame_id) };
260            Bytes::copy_from_slice(slice)
261        };
262        self.pool.write_page_to_disk(page_id, bytes)?;
263        meta.clear_dirty();
264        self.dirty_pages.remove(&page_id);
265        self.dirty_page_table.remove(&page_id);
266        Ok(true)
267    }
268
269    pub fn flush_all_pages(&self) -> QuillSQLResult<()> {
270        if let Some(wal) = self.wal_manager.read().clone() {
271            wal.flush(None)?;
272        }
273        let dirty_ids: Vec<PageId> = self.dirty_pages.iter().map(|entry| *entry.key()).collect();
274        for page_id in dirty_ids {
275            let _ = self.flush_page(page_id)?;
276        }
277        Ok(())
278    }
279
280    pub fn delete_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
281        let (guard, created_here) = if let Some(existing) = self.inflight_loads.get(&page_id) {
282            (existing.clone(), false)
283        } else {
284            let arc = Arc::new(Mutex::new(()));
285            self.inflight_loads.insert(page_id, arc.clone());
286            (arc, true)
287        };
288        let lock = guard.lock();
289        let result = self.delete_page_inner(page_id);
290        drop(lock);
291        if created_here {
292            self.inflight_loads.remove(&page_id);
293        }
294        result
295    }
296
297    fn delete_page_inner(&self, page_id: PageId) -> QuillSQLResult<bool> {
298        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
299            let lock = self.pool.frame_lock(frame_id);
300            let guard = match lock.try_write() {
301                Some(g) => g,
302                None => return Ok(false),
303            };
304            drop(guard);
305            let meta = self.pool.frame_meta(frame_id);
306            if meta.page_id() != page_id {
307                self.pool.remove_mapping_if(page_id, frame_id);
308                return self.delete_page_inner(page_id);
309            }
310            if meta.pin_count() > 0 {
311                return Ok(false);
312            }
313            if !self.pool.remove_mapping_if(page_id, frame_id) {
314                return self.delete_page_inner(page_id);
315            }
316            self.pool.reset_frame(frame_id);
317            self.dirty_pages.remove(&page_id);
318            self.dirty_page_table.remove(&page_id);
319            self.pool.clear_frame_meta(frame_id);
320            {
321                let mut rep = self.replacer.write();
322                let _ = rep.set_evictable(frame_id, true);
323                rep.remove(frame_id);
324            }
325            self.pool.push_free_frame(frame_id);
326            self.pool
327                .disk_scheduler()
328                .schedule_deallocate(page_id)?
329                .recv()
330                .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
331            Ok(true)
332        } else {
333            self.pool
334                .disk_scheduler()
335                .schedule_deallocate(page_id)?
336                .recv()
337                .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
338            Ok(true)
339        }
340    }
341
342    fn ensure_frame(&self, page_id: PageId) -> QuillSQLResult<FrameId> {
343        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
344            self.replacer_record_access(frame_id)?;
345            if let Some(lfu) = &self.tiny_lfu {
346                lfu.write().admit_record(page_id as u64);
347            }
348            return Ok(frame_id);
349        }
350
351        let (guard, created_here) = if let Some(existing) = self.inflight_loads.get(&page_id) {
352            (existing.clone(), false)
353        } else {
354            let arc = Arc::new(Mutex::new(()));
355            self.inflight_loads.insert(page_id, arc.clone());
356            (arc, true)
357        };
358        let _lock = guard.lock();
359
360        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
361            if created_here {
362                self.inflight_loads.remove(&page_id);
363            }
364            self.replacer_record_access(frame_id)?;
365            if let Some(lfu) = &self.tiny_lfu {
366                lfu.write().admit_record(page_id as u64);
367            }
368            return Ok(frame_id);
369        }
370
371        if let Some(lfu) = &self.tiny_lfu {
372            let estimate = lfu.read().estimate(page_id as u64);
373            if estimate == 0 && !self.pool.has_free_frame() && self.replacer.read().size() == 0 {
374                if created_here {
375                    self.inflight_loads.remove(&page_id);
376                }
377                return Err(QuillSQLError::Storage(
378                    "Cannot allocate frame: admission denied and no space".to_string(),
379                ));
380            }
381        }
382
383        let frame_id = self.allocate_frame()?;
384        self.pool.load_page_into_frame(page_id, frame_id)?;
385        self.pool.insert_mapping(page_id, frame_id);
386
387        if let Some(lfu) = &self.tiny_lfu {
388            lfu.write().admit_record(page_id as u64);
389        }
390        if created_here {
391            self.inflight_loads.remove(&page_id);
392        }
393        self.replacer_record_access(frame_id)?;
394        Ok(frame_id)
395    }
396
397    fn allocate_frame(&self) -> QuillSQLResult<FrameId> {
398        if let Some(frame_id) = self.pool.pop_free_frame() {
399            return Ok(frame_id);
400        }
401        self.evict_victim_frame()
402    }
403
404    fn replacer_record_access(&self, frame_id: FrameId) -> QuillSQLResult<()> {
405        let mut rep = self.replacer.write();
406        let _ = rep.record_access(frame_id);
407        Ok(())
408    }
409
410    fn evict_victim_frame(&self) -> QuillSQLResult<FrameId> {
411        loop {
412            let victim = {
413                let mut rep = self.replacer.write();
414                match rep.evict() {
415                    Some(frame_id) => frame_id,
416                    None => {
417                        return Err(QuillSQLError::Storage(
418                            "Cannot allocate frame: buffer pool is full".to_string(),
419                        ))
420                    }
421                }
422            };
423
424            let snapshot = self.pool.frame_meta(victim).snapshot();
425            let page_id = snapshot.page_id;
426            let pin_count = snapshot.pin_count;
427            let is_dirty = snapshot.is_dirty;
428            let lsn = snapshot.lsn;
429
430            if pin_count > 0 {
431                let mut rep = self.replacer.write();
432                let _ = rep.record_access(victim);
433                let _ = rep.set_evictable(victim, false);
434                continue;
435            }
436
437            if page_id != INVALID_PAGE_ID {
438                if is_dirty {
439                    self.ensure_wal_durable(lsn)?;
440                    let bytes = Bytes::copy_from_slice(unsafe { self.pool.frame_slice(victim) });
441                    self.pool.write_page_to_disk(page_id, bytes)?;
442                    self.dirty_pages.remove(&page_id);
443                    self.dirty_page_table.remove(&page_id);
444                }
445                self.pool.remove_mapping(page_id);
446            }
447
448            self.pool.clear_frame_meta(victim);
449            self.pool.reset_frame(victim);
450            return Ok(victim);
451        }
452    }
453
454    fn mark_evictable(&self, frame_id: FrameId) -> QuillSQLResult<()> {
455        let mut rep = self.replacer.write();
456        let _ = rep.set_evictable(frame_id, true);
457        Ok(())
458    }
459
460    fn mark_non_evictable(&self, frame_id: FrameId) -> QuillSQLResult<()> {
461        let mut rep = self.replacer.write();
462        let _ = rep.set_evictable(frame_id, false);
463        Ok(())
464    }
465
466    fn ensure_wal_durable(&self, lsn: Lsn) -> QuillSQLResult<()> {
467        if lsn == 0 {
468            return Ok(());
469        }
470        if let Some(wal) = self.wal_manager.read().clone() {
471            if lsn > wal.durable_lsn() {
472                wal.flush(Some(lsn))?;
473                if wal.durable_lsn() < lsn {
474                    return Err(QuillSQLError::Internal(format!(
475                        "Flush blocked: page_lsn={} > durable_lsn={}",
476                        lsn,
477                        wal.durable_lsn()
478                    )));
479                }
480            }
481        }
482        Ok(())
483    }
484
485    pub fn clone_arc(self: &Arc<Self>) -> Arc<Self> {
486        self.clone()
487    }
488}
489
490#[cfg(test)]
491mod tests {
492    use super::*;
493    use crate::storage::disk_manager::DiskManager;
494    use crate::storage::disk_scheduler::DiskScheduler;
495    use std::sync::{Arc, Barrier};
496    use std::thread;
497    use tempfile::TempDir;
498
499    fn setup_manager(num_pages: usize) -> (TempDir, Arc<BufferManager>) {
500        let temp_dir = TempDir::new().unwrap();
501        let db_file = temp_dir.path().join("test.db");
502        let disk_manager = Arc::new(DiskManager::try_new(db_file).unwrap());
503        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager));
504        let manager = Arc::new(BufferManager::new(num_pages, disk_scheduler));
505        (temp_dir, manager)
506    }
507
508    #[test]
509    fn new_page_initializes_frame() {
510        let (_tmp, manager) = setup_manager(2);
511        let guard = manager.new_page().unwrap();
512        let page_id = guard.page_id();
513        let frame_id = guard.frame_id();
514
515        // Page should be zeroed
516        assert!(guard.data().iter().all(|b| *b == 0));
517        assert!(!guard.is_dirty());
518        assert_eq!(guard.lsn(), 0);
519
520        drop(guard);
521
522        let meta = manager.buffer_pool().frame_meta(frame_id).snapshot();
523        assert_eq!(meta.page_id, page_id);
524        assert_eq!(meta.pin_count, 0);
525        assert!(!meta.is_dirty);
526    }
527
528    #[test]
529    fn fetch_page_read_increments_pin_and_resets_on_drop() {
530        let (_tmp, manager) = setup_manager(2);
531        let guard = manager.new_page().unwrap();
532        let page_id = guard.page_id();
533        let frame_id = guard.frame_id();
534        drop(guard);
535
536        {
537            let read_guard = manager.fetch_page_read(page_id).unwrap();
538            assert_eq!(read_guard.pin_count(), 1);
539            assert_eq!(read_guard.frame_id(), frame_id);
540        }
541
542        let meta = manager.buffer_pool().frame_meta(frame_id).snapshot();
543        assert_eq!(meta.pin_count, 0);
544    }
545
546    #[test]
547    fn fetch_page_write_marks_dirty_and_tracks_lsn() {
548        let (_tmp, manager) = setup_manager(2);
549        let mut guard = manager.new_page().unwrap();
550        let page_id = guard.page_id();
551        guard.data_mut()[0] = 7;
552        guard.set_lsn(99);
553        guard.mark_dirty();
554        drop(guard);
555
556        let mut write_guard = manager.fetch_page_write(page_id).unwrap();
557        assert!(write_guard.is_dirty());
558        assert_eq!(write_guard.lsn(), 99);
559        write_guard.data_mut()[1] = 8;
560        drop(write_guard);
561
562        let meta = manager
563            .buffer_pool()
564            .frame_meta(manager.buffer_pool().lookup_frame(page_id).unwrap())
565            .snapshot();
566        assert!(meta.is_dirty);
567        assert_eq!(meta.lsn, 99);
568        assert_eq!(meta.pin_count, 0);
569    }
570
571    #[test]
572    fn flush_page_writes_back_and_clears_dirty_flag() {
573        let (_tmp, manager) = setup_manager(2);
574        let mut guard = manager.new_page().unwrap();
575        let page_id = guard.page_id();
576        guard.data_mut()[0] = 42;
577        guard.set_lsn(123);
578        guard.mark_dirty();
579        drop(guard);
580
581        assert!(manager.flush_page(page_id).unwrap());
582
583        let meta = manager
584            .buffer_pool()
585            .frame_meta(manager.buffer_pool().lookup_frame(page_id).unwrap())
586            .snapshot();
587        assert!(!meta.is_dirty);
588    }
589
590    #[test]
591    fn delete_page_releases_frame() {
592        let (_tmp, manager) = setup_manager(2);
593        let page_id = {
594            let guard = manager.new_page().unwrap();
595            guard.page_id()
596        };
597
598        assert!(manager.delete_page(page_id).unwrap());
599        assert!(manager.buffer_pool().lookup_frame(page_id).is_none());
600        assert!(manager.buffer_pool().has_free_frame());
601
602        // Ensure subsequent allocation succeeds and pool remains operational
603        let new_guard = manager.new_page().unwrap();
604        assert!(new_guard.frame_id() < manager.buffer_pool().capacity());
605    }
606
607    #[test]
608    fn concurrent_reads_do_not_leak_pins() {
609        const THREADS: usize = 8;
610        let (_tmp, manager) = setup_manager(4);
611        let (page_id, frame_id) = {
612            let mut guard = manager.new_page().unwrap();
613            guard.data_mut()[0] = 42;
614            (guard.page_id(), guard.frame_id())
615        };
616
617        let barrier = Arc::new(Barrier::new(THREADS));
618        let mut handles = Vec::with_capacity(THREADS);
619        for _ in 0..THREADS {
620            let mgr = manager.clone();
621            let barrier = barrier.clone();
622            handles.push(thread::spawn(move || {
623                barrier.wait();
624                for _ in 0..50 {
625                    let guard = mgr.fetch_page_read(page_id).expect("read page");
626                    assert_eq!(guard.data()[0], 42);
627                }
628            }));
629        }
630
631        for handle in handles {
632            handle.join().unwrap();
633        }
634
635        let pool = manager.buffer_pool();
636        let meta = pool.frame_meta(frame_id).snapshot();
637        assert_eq!(meta.pin_count, 0);
638        assert_eq!(meta.page_id, page_id);
639    }
640
641    #[test]
642    fn concurrent_writes_mark_dirty_and_flush_once() {
643        const THREADS: usize = 4;
644        let (_tmp, manager) = setup_manager(4);
645        let (page_id, frame_id) = {
646            let guard = manager.new_page().unwrap();
647            (guard.page_id(), guard.frame_id())
648        };
649
650        let barrier = Arc::new(Barrier::new(THREADS));
651        let mut handles = Vec::with_capacity(THREADS);
652        for tid in 0..THREADS {
653            let mgr = manager.clone();
654            let barrier = barrier.clone();
655            handles.push(thread::spawn(move || {
656                let lsn = (tid as Lsn) + 1;
657                barrier.wait();
658                for _ in 0..25 {
659                    let mut guard = mgr.fetch_page_write(page_id).expect("write guard");
660                    guard.data_mut()[tid] = (tid as u8) + 1;
661                    guard.set_lsn(lsn);
662                    guard.mark_dirty();
663                }
664            }));
665        }
666
667        for handle in handles {
668            handle.join().unwrap();
669        }
670
671        {
672            let pool = manager.buffer_pool();
673            let meta = pool.frame_meta(frame_id).snapshot();
674            assert!(meta.is_dirty);
675            assert_eq!(meta.pin_count, 0);
676            assert_eq!(meta.page_id, page_id);
677        }
678
679        assert!(manager.flush_page(page_id).unwrap());
680        {
681            let pool = manager.buffer_pool();
682            let meta = pool.frame_meta(frame_id).snapshot();
683            assert!(!meta.is_dirty);
684            assert_eq!(meta.pin_count, 0);
685        }
686
687        let read_back = manager
688            .buffer_pool()
689            .disk_scheduler()
690            .schedule_read(page_id)
691            .unwrap()
692            .recv()
693            .unwrap()
694            .unwrap();
695        for tid in 0..THREADS {
696            assert_eq!(read_back[tid], (tid as u8) + 1);
697        }
698    }
699}