quill_sql/buffer/
buffer_manager.rs

1//! BufferManager coordinates replacement, dirty tracking, and WAL with a shared BufferPool.
2
3use std::sync::Arc;
4
5use bytes::Bytes;
6use dashmap::{DashMap, DashSet};
7use parking_lot::{Mutex, RwLock};
8
9use crate::buffer::buffer_pool::{BufferPool, FrameId, FrameMeta};
10use crate::buffer::page::{self, PageId, ReadPageGuard, WritePageGuard, INVALID_PAGE_ID};
11use crate::catalog::SchemaRef;
12use crate::config::BufferPoolConfig;
13use crate::error::{QuillSQLError, QuillSQLResult};
14use crate::recovery::{Lsn, WalManager};
15use crate::storage::codec::{
16    BPlusTreeHeaderPageCodec, BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec,
17    BPlusTreePageCodec, TablePageCodec,
18};
19use crate::storage::disk_scheduler::DiskScheduler;
20use crate::storage::page::{
21    BPlusTreeHeaderPage, BPlusTreeInternalPage, BPlusTreeLeafPage, BPlusTreePage, TablePage,
22};
23use crate::utils::cache::lru_k::LRUKReplacer;
24use crate::utils::cache::tiny_lfu::TinyLFU;
25use crate::utils::cache::Replacer;
26
27#[derive(Debug)]
28pub struct BufferManager {
29    pool: Arc<BufferPool>,
30    replacer: Arc<RwLock<LRUKReplacer>>,
31    inflight_loads: DashMap<PageId, Arc<Mutex<()>>>,
32    tiny_lfu: Option<Arc<RwLock<TinyLFU>>>,
33    dirty_pages: DashSet<PageId>,
34    dirty_page_table: DashMap<PageId, Lsn>,
35    wal_manager: Arc<RwLock<Option<Arc<WalManager>>>>,
36}
37
38impl BufferManager {
39    pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
40        Self::new_with_config(
41            BufferPoolConfig {
42                buffer_pool_size: num_pages,
43                ..Default::default()
44            },
45            disk_scheduler,
46        )
47    }
48
49    pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
50        let pool = Arc::new(BufferPool::new_with_config(config, disk_scheduler));
51        let replacer = Arc::new(RwLock::new(LRUKReplacer::new(pool.capacity())));
52        let tiny_lfu = if config.tiny_lfu_enable {
53            Some(Arc::new(RwLock::new(TinyLFU::new(
54                pool.capacity().next_power_of_two(),
55                config.tiny_lfu_counters,
56            ))))
57        } else {
58            None
59        };
60
61        Self {
62            pool,
63            replacer,
64            inflight_loads: DashMap::new(),
65            tiny_lfu,
66            dirty_pages: DashSet::new(),
67            dirty_page_table: DashMap::new(),
68            wal_manager: Arc::new(RwLock::new(None)),
69        }
70    }
71
72    pub fn buffer_pool(&self) -> Arc<BufferPool> {
73        self.pool.clone()
74    }
75
76    pub fn replacer_arc(&self) -> Arc<RwLock<LRUKReplacer>> {
77        self.replacer.clone()
78    }
79
80    pub fn set_wal_manager(&self, wal_manager: Arc<WalManager>) {
81        *self.wal_manager.write() = Some(wal_manager);
82    }
83
84    pub fn wal_manager(&self) -> Option<Arc<WalManager>> {
85        self.wal_manager.read().clone()
86    }
87
88    pub fn dirty_page_ids(&self) -> Vec<PageId> {
89        self.dirty_pages.iter().map(|entry| *entry.key()).collect()
90    }
91
92    pub fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)> {
93        self.dirty_page_table
94            .iter()
95            .map(|entry| (*entry.key(), *entry.value()))
96            .collect()
97    }
98
99    fn note_dirty_page(&self, page_id: PageId, rec_lsn: Lsn) {
100        self.dirty_pages.insert(page_id);
101        self.dirty_page_table.entry(page_id).or_insert(rec_lsn);
102    }
103
104    pub fn new_page(self: &Arc<Self>) -> QuillSQLResult<WritePageGuard> {
105        if !self.pool.has_free_frame() && self.replacer.read().size() == 0 {
106            return Err(QuillSQLError::Storage(
107                "Cannot new page because buffer pool is full and no page to evict".to_string(),
108            ));
109        }
110
111        let frame_id = self.allocate_frame()?;
112        let page_id = self.pool.allocate_page_id()?;
113        self.pool.insert_mapping(page_id, frame_id);
114
115        {
116            let mut meta = self.pool.frame_meta(frame_id);
117            meta.page_id = page_id;
118            meta.pin_count = 1;
119            meta.is_dirty = false;
120            meta.lsn = 0;
121        }
122
123        self.pool.reset_frame(frame_id);
124        self.replacer_record_access(frame_id)?;
125        self.mark_non_evictable(frame_id)?;
126        Ok(page::new_write_guard(Arc::clone(self), frame_id))
127    }
128
129    pub fn fetch_page_read(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<ReadPageGuard> {
130        if page_id == INVALID_PAGE_ID {
131            return Err(QuillSQLError::Storage(
132                "fetch_page_read: invalid page id".to_string(),
133            ));
134        }
135
136        let frame_id = self.ensure_frame(page_id)?;
137        {
138            let mut meta = self.pool.frame_meta(frame_id);
139            meta.pin_count += 1;
140        }
141        self.replacer_record_access(frame_id)?;
142        self.mark_non_evictable(frame_id)?;
143        Ok(page::new_read_guard(Arc::clone(self), frame_id))
144    }
145
146    pub fn fetch_page_write(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<WritePageGuard> {
147        if page_id == INVALID_PAGE_ID {
148            return Err(QuillSQLError::Storage(
149                "fetch_page_write: invalid page id".to_string(),
150            ));
151        }
152
153        let frame_id = self.ensure_frame(page_id)?;
154        {
155            let mut meta = self.pool.frame_meta(frame_id);
156            meta.pin_count += 1;
157        }
158        self.replacer_record_access(frame_id)?;
159        self.mark_non_evictable(frame_id)?;
160        Ok(page::new_write_guard(Arc::clone(self), frame_id))
161    }
162
163    pub fn complete_unpin(
164        &self,
165        page_id: PageId,
166        is_dirty: bool,
167        rec_lsn_hint: Option<Lsn>,
168    ) -> QuillSQLResult<()> {
169        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
170            let mut meta = self.pool.frame_meta(frame_id);
171            if meta.pin_count > 0 {
172                meta.pin_count -= 1;
173            }
174            if is_dirty {
175                meta.is_dirty = true;
176                if let Some(lsn) = rec_lsn_hint {
177                    meta.lsn = lsn;
178                }
179                self.note_dirty_page(page_id, rec_lsn_hint.unwrap_or(meta.lsn));
180            }
181            if meta.pin_count == 0 {
182                self.mark_evictable(frame_id)?;
183            }
184        }
185        Ok(())
186    }
187
188    pub fn fetch_table_page(
189        self: &Arc<Self>,
190        page_id: PageId,
191        schema: SchemaRef,
192    ) -> QuillSQLResult<(ReadPageGuard, TablePage)> {
193        let guard = self.fetch_page_read(page_id)?;
194        let (page, _) = TablePageCodec::decode(guard.data(), schema)?;
195        Ok((guard, page))
196    }
197
198    pub fn fetch_tree_page(
199        self: &Arc<Self>,
200        page_id: PageId,
201        key_schema: SchemaRef,
202    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreePage)> {
203        let guard = self.fetch_page_read(page_id)?;
204        let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone())?;
205        Ok((guard, page))
206    }
207
208    pub fn fetch_tree_internal_page(
209        self: &Arc<Self>,
210        page_id: PageId,
211        key_schema: SchemaRef,
212    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeInternalPage)> {
213        let guard = self.fetch_page_read(page_id)?;
214        let (page, _) = BPlusTreeInternalPageCodec::decode(guard.data(), key_schema.clone())?;
215        Ok((guard, page))
216    }
217
218    pub fn fetch_tree_leaf_page(
219        self: &Arc<Self>,
220        page_id: PageId,
221        key_schema: SchemaRef,
222    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeLeafPage)> {
223        let guard = self.fetch_page_read(page_id)?;
224        let (page, _) = BPlusTreeLeafPageCodec::decode(guard.data(), key_schema.clone())?;
225        Ok((guard, page))
226    }
227
228    pub fn fetch_header_page(
229        self: &Arc<Self>,
230        page_id: PageId,
231    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeHeaderPage)> {
232        let guard = self.fetch_page_read(page_id)?;
233        let (header, _) = BPlusTreeHeaderPageCodec::decode(guard.data())?;
234        Ok((guard, header))
235    }
236
237    pub fn prefetch_page(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<()> {
238        if let Ok(g) = self.fetch_page_read(page_id) {
239            drop(g);
240        }
241        Ok(())
242    }
243
244    pub fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
245        let Some(frame_id) = self.pool.lookup_frame(page_id) else {
246            return Ok(false);
247        };
248        let meta = self.pool.frame_meta(frame_id);
249        if !meta.is_dirty {
250            self.dirty_pages.remove(&page_id);
251            self.dirty_page_table.remove(&page_id);
252            return Ok(false);
253        }
254        let lsn = meta.lsn;
255        drop(meta);
256        self.ensure_wal_durable(lsn)?;
257        let bytes = {
258            let _lock = self.pool.frame_lock(frame_id).read();
259            let slice = unsafe { self.pool.frame_slice(frame_id) };
260            Bytes::copy_from_slice(slice)
261        };
262        self.pool.write_page_to_disk(page_id, bytes)?;
263        let mut meta = self.pool.frame_meta(frame_id);
264        meta.is_dirty = false;
265        self.dirty_pages.remove(&page_id);
266        self.dirty_page_table.remove(&page_id);
267        Ok(true)
268    }
269
270    pub fn flush_all_pages(&self) -> QuillSQLResult<()> {
271        if let Some(wal) = self.wal_manager.read().clone() {
272            wal.flush(None)?;
273        }
274        let dirty_ids: Vec<PageId> = self.dirty_pages.iter().map(|entry| *entry.key()).collect();
275        for page_id in dirty_ids {
276            let _ = self.flush_page(page_id)?;
277        }
278        Ok(())
279    }
280
281    pub fn delete_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
282        let (guard, created_here) = if let Some(existing) = self.inflight_loads.get(&page_id) {
283            (existing.clone(), false)
284        } else {
285            let arc = Arc::new(Mutex::new(()));
286            self.inflight_loads.insert(page_id, arc.clone());
287            (arc, true)
288        };
289        let lock = guard.lock();
290        let result = self.delete_page_inner(page_id);
291        drop(lock);
292        if created_here {
293            self.inflight_loads.remove(&page_id);
294        }
295        result
296    }
297
298    fn delete_page_inner(&self, page_id: PageId) -> QuillSQLResult<bool> {
299        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
300            let lock = self.pool.frame_lock(frame_id);
301            let guard = match lock.try_write() {
302                Some(g) => g,
303                None => return Ok(false),
304            };
305            drop(guard);
306            let meta = self.pool.frame_meta(frame_id);
307            if meta.page_id != page_id {
308                drop(meta);
309                self.pool.remove_mapping_if(page_id, frame_id);
310                return self.delete_page_inner(page_id);
311            }
312            if meta.pin_count > 0 {
313                drop(meta);
314                return Ok(false);
315            }
316            if !self.pool.remove_mapping_if(page_id, frame_id) {
317                drop(meta);
318                return self.delete_page_inner(page_id);
319            }
320            drop(meta);
321            self.pool.reset_frame(frame_id);
322            self.dirty_pages.remove(&page_id);
323            self.dirty_page_table.remove(&page_id);
324            {
325                let mut meta = self.pool.frame_meta(frame_id);
326                *meta = FrameMeta::default();
327            }
328            {
329                let mut rep = self.replacer.write();
330                let _ = rep.set_evictable(frame_id, true);
331                rep.remove(frame_id);
332            }
333            self.pool.push_free_frame(frame_id);
334            self.pool
335                .disk_scheduler()
336                .schedule_deallocate(page_id)?
337                .recv()
338                .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
339            Ok(true)
340        } else {
341            self.pool
342                .disk_scheduler()
343                .schedule_deallocate(page_id)?
344                .recv()
345                .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
346            Ok(true)
347        }
348    }
349
350    fn ensure_frame(&self, page_id: PageId) -> QuillSQLResult<FrameId> {
351        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
352            self.replacer_record_access(frame_id)?;
353            if let Some(lfu) = &self.tiny_lfu {
354                lfu.write().admit_record(page_id as u64);
355            }
356            return Ok(frame_id);
357        }
358
359        let (guard, created_here) = if let Some(existing) = self.inflight_loads.get(&page_id) {
360            (existing.clone(), false)
361        } else {
362            let arc = Arc::new(Mutex::new(()));
363            self.inflight_loads.insert(page_id, arc.clone());
364            (arc, true)
365        };
366        let _lock = guard.lock();
367
368        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
369            if created_here {
370                self.inflight_loads.remove(&page_id);
371            }
372            self.replacer_record_access(frame_id)?;
373            if let Some(lfu) = &self.tiny_lfu {
374                lfu.write().admit_record(page_id as u64);
375            }
376            return Ok(frame_id);
377        }
378
379        if let Some(lfu) = &self.tiny_lfu {
380            let estimate = lfu.read().estimate(page_id as u64);
381            if estimate == 0 && !self.pool.has_free_frame() && self.replacer.read().size() == 0 {
382                if created_here {
383                    self.inflight_loads.remove(&page_id);
384                }
385                return Err(QuillSQLError::Storage(
386                    "Cannot allocate frame: admission denied and no space".to_string(),
387                ));
388            }
389        }
390
391        let frame_id = self.allocate_frame()?;
392        self.pool.load_page_into_frame(page_id, frame_id)?;
393        self.pool.insert_mapping(page_id, frame_id);
394        {
395            let mut meta = self.pool.frame_meta(frame_id);
396            meta.page_id = page_id;
397            meta.pin_count = 0;
398            meta.is_dirty = false;
399            meta.lsn = 0;
400        }
401        if let Some(lfu) = &self.tiny_lfu {
402            lfu.write().admit_record(page_id as u64);
403        }
404        if created_here {
405            self.inflight_loads.remove(&page_id);
406        }
407        self.replacer_record_access(frame_id)?;
408        Ok(frame_id)
409    }
410
411    fn allocate_frame(&self) -> QuillSQLResult<FrameId> {
412        if let Some(frame_id) = self.pool.pop_free_frame() {
413            return Ok(frame_id);
414        }
415        self.evict_victim_frame()
416    }
417
418    fn replacer_record_access(&self, frame_id: FrameId) -> QuillSQLResult<()> {
419        let mut rep = self.replacer.write();
420        let _ = rep.record_access(frame_id);
421        Ok(())
422    }
423
424    fn evict_victim_frame(&self) -> QuillSQLResult<FrameId> {
425        loop {
426            let victim = {
427                let mut rep = self.replacer.write();
428                match rep.evict() {
429                    Some(frame_id) => frame_id,
430                    None => {
431                        return Err(QuillSQLError::Storage(
432                            "Cannot allocate frame: buffer pool is full".to_string(),
433                        ))
434                    }
435                }
436            };
437
438            let (page_id, pin_count, is_dirty, lsn) = {
439                let meta = self.pool.frame_meta(victim);
440                (meta.page_id, meta.pin_count, meta.is_dirty, meta.lsn)
441            };
442
443            if pin_count > 0 {
444                let mut rep = self.replacer.write();
445                let _ = rep.record_access(victim);
446                let _ = rep.set_evictable(victim, false);
447                continue;
448            }
449
450            if page_id != INVALID_PAGE_ID {
451                if is_dirty {
452                    self.ensure_wal_durable(lsn)?;
453                    let bytes = Bytes::copy_from_slice(unsafe { self.pool.frame_slice(victim) });
454                    self.pool.write_page_to_disk(page_id, bytes)?;
455                    self.dirty_pages.remove(&page_id);
456                    self.dirty_page_table.remove(&page_id);
457                }
458                self.pool.remove_mapping(page_id);
459            }
460
461            self.pool.clear_frame_meta(victim);
462            self.pool.reset_frame(victim);
463            return Ok(victim);
464        }
465    }
466
467    fn mark_evictable(&self, frame_id: FrameId) -> QuillSQLResult<()> {
468        let mut rep = self.replacer.write();
469        let _ = rep.set_evictable(frame_id, true);
470        Ok(())
471    }
472
473    fn mark_non_evictable(&self, frame_id: FrameId) -> QuillSQLResult<()> {
474        let mut rep = self.replacer.write();
475        let _ = rep.set_evictable(frame_id, false);
476        Ok(())
477    }
478
479    fn ensure_wal_durable(&self, lsn: Lsn) -> QuillSQLResult<()> {
480        if lsn == 0 {
481            return Ok(());
482        }
483        if let Some(wal) = self.wal_manager.read().clone() {
484            if lsn > wal.durable_lsn() {
485                wal.flush(Some(lsn))?;
486                if wal.durable_lsn() < lsn {
487                    return Err(QuillSQLError::Internal(format!(
488                        "Flush blocked: page_lsn={} > durable_lsn={}",
489                        lsn,
490                        wal.durable_lsn()
491                    )));
492                }
493            }
494        }
495        Ok(())
496    }
497
498    pub fn clone_arc(self: &Arc<Self>) -> Arc<Self> {
499        self.clone()
500    }
501}
502
503#[cfg(test)]
504mod tests {
505    use super::*;
506    use crate::storage::disk_manager::DiskManager;
507    use crate::storage::disk_scheduler::DiskScheduler;
508    use std::sync::{Arc, Barrier};
509    use std::thread;
510    use tempfile::TempDir;
511
512    fn setup_manager(num_pages: usize) -> (TempDir, Arc<BufferManager>) {
513        let temp_dir = TempDir::new().unwrap();
514        let db_file = temp_dir.path().join("test.db");
515        let disk_manager = Arc::new(DiskManager::try_new(db_file).unwrap());
516        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager));
517        let manager = Arc::new(BufferManager::new(num_pages, disk_scheduler));
518        (temp_dir, manager)
519    }
520
521    #[test]
522    fn new_page_initializes_frame() {
523        let (_tmp, manager) = setup_manager(2);
524        let guard = manager.new_page().unwrap();
525        let page_id = guard.page_id();
526        let frame_id = guard.frame_id();
527
528        // Page should be zeroed
529        assert!(guard.data().iter().all(|b| *b == 0));
530        assert!(!guard.is_dirty());
531        assert_eq!(guard.lsn(), 0);
532
533        drop(guard);
534
535        let meta = manager.buffer_pool().frame_meta(frame_id).clone();
536        assert_eq!(meta.page_id, page_id);
537        assert_eq!(meta.pin_count, 0);
538        assert!(!meta.is_dirty);
539    }
540
541    #[test]
542    fn fetch_page_read_increments_pin_and_resets_on_drop() {
543        let (_tmp, manager) = setup_manager(2);
544        let guard = manager.new_page().unwrap();
545        let page_id = guard.page_id();
546        let frame_id = guard.frame_id();
547        drop(guard);
548
549        {
550            let read_guard = manager.fetch_page_read(page_id).unwrap();
551            assert_eq!(read_guard.pin_count(), 1);
552            assert_eq!(read_guard.frame_id(), frame_id);
553        }
554
555        let meta = manager.buffer_pool().frame_meta(frame_id).clone();
556        assert_eq!(meta.pin_count, 0);
557    }
558
559    #[test]
560    fn fetch_page_write_marks_dirty_and_tracks_lsn() {
561        let (_tmp, manager) = setup_manager(2);
562        let mut guard = manager.new_page().unwrap();
563        let page_id = guard.page_id();
564        guard.data_mut()[0] = 7;
565        guard.set_lsn(99);
566        guard.mark_dirty();
567        drop(guard);
568
569        let mut write_guard = manager.fetch_page_write(page_id).unwrap();
570        assert!(write_guard.is_dirty());
571        assert_eq!(write_guard.lsn(), 99);
572        write_guard.data_mut()[1] = 8;
573        drop(write_guard);
574
575        let meta = manager
576            .buffer_pool()
577            .frame_meta(manager.buffer_pool().lookup_frame(page_id).unwrap())
578            .clone();
579        assert!(meta.is_dirty);
580        assert_eq!(meta.lsn, 99);
581        assert_eq!(meta.pin_count, 0);
582    }
583
584    #[test]
585    fn flush_page_writes_back_and_clears_dirty_flag() {
586        let (_tmp, manager) = setup_manager(2);
587        let mut guard = manager.new_page().unwrap();
588        let page_id = guard.page_id();
589        guard.data_mut()[0] = 42;
590        guard.set_lsn(123);
591        guard.mark_dirty();
592        drop(guard);
593
594        assert!(manager.flush_page(page_id).unwrap());
595
596        let meta = manager
597            .buffer_pool()
598            .frame_meta(manager.buffer_pool().lookup_frame(page_id).unwrap())
599            .clone();
600        assert!(!meta.is_dirty);
601    }
602
603    #[test]
604    fn delete_page_releases_frame() {
605        let (_tmp, manager) = setup_manager(2);
606        let page_id = {
607            let guard = manager.new_page().unwrap();
608            guard.page_id()
609        };
610
611        assert!(manager.delete_page(page_id).unwrap());
612        assert!(manager.buffer_pool().lookup_frame(page_id).is_none());
613        assert!(manager.buffer_pool().has_free_frame());
614
615        // Ensure subsequent allocation succeeds and pool remains operational
616        let new_guard = manager.new_page().unwrap();
617        assert!(new_guard.frame_id() < manager.buffer_pool().capacity());
618    }
619
620    #[test]
621    fn concurrent_reads_do_not_leak_pins() {
622        const THREADS: usize = 8;
623        let (_tmp, manager) = setup_manager(4);
624        let (page_id, frame_id) = {
625            let mut guard = manager.new_page().unwrap();
626            guard.data_mut()[0] = 42;
627            (guard.page_id(), guard.frame_id())
628        };
629
630        let barrier = Arc::new(Barrier::new(THREADS));
631        let mut handles = Vec::with_capacity(THREADS);
632        for _ in 0..THREADS {
633            let mgr = manager.clone();
634            let barrier = barrier.clone();
635            handles.push(thread::spawn(move || {
636                barrier.wait();
637                for _ in 0..50 {
638                    let guard = mgr.fetch_page_read(page_id).expect("read page");
639                    assert_eq!(guard.data()[0], 42);
640                }
641            }));
642        }
643
644        for handle in handles {
645            handle.join().unwrap();
646        }
647
648        let pool = manager.buffer_pool();
649        let meta = pool.frame_meta(frame_id);
650        assert_eq!(meta.pin_count, 0);
651        assert_eq!(meta.page_id, page_id);
652    }
653
654    #[test]
655    fn concurrent_writes_mark_dirty_and_flush_once() {
656        const THREADS: usize = 4;
657        let (_tmp, manager) = setup_manager(4);
658        let (page_id, frame_id) = {
659            let guard = manager.new_page().unwrap();
660            (guard.page_id(), guard.frame_id())
661        };
662
663        let barrier = Arc::new(Barrier::new(THREADS));
664        let mut handles = Vec::with_capacity(THREADS);
665        for tid in 0..THREADS {
666            let mgr = manager.clone();
667            let barrier = barrier.clone();
668            handles.push(thread::spawn(move || {
669                let lsn = (tid as Lsn) + 1;
670                barrier.wait();
671                for _ in 0..25 {
672                    let mut guard = mgr.fetch_page_write(page_id).expect("write guard");
673                    guard.data_mut()[tid] = (tid as u8) + 1;
674                    guard.set_lsn(lsn);
675                    guard.mark_dirty();
676                }
677            }));
678        }
679
680        for handle in handles {
681            handle.join().unwrap();
682        }
683
684        {
685            let pool = manager.buffer_pool();
686            let meta = pool.frame_meta(frame_id);
687            assert!(meta.is_dirty);
688            assert_eq!(meta.pin_count, 0);
689            assert_eq!(meta.page_id, page_id);
690        }
691
692        assert!(manager.flush_page(page_id).unwrap());
693        {
694            let pool = manager.buffer_pool();
695            let meta = pool.frame_meta(frame_id);
696            assert!(!meta.is_dirty);
697            assert_eq!(meta.pin_count, 0);
698        }
699
700        let read_back = manager
701            .buffer_pool()
702            .disk_scheduler()
703            .schedule_read(page_id)
704            .unwrap()
705            .recv()
706            .unwrap()
707            .unwrap();
708        for tid in 0..THREADS {
709            assert_eq!(read_back[tid], (tid as u8) + 1);
710        }
711    }
712}