1use std::sync::Arc;
4
5use bytes::Bytes;
6use dashmap::{DashMap, DashSet};
7use parking_lot::{Mutex, RwLock};
8
9use crate::buffer::buffer_pool::{BufferPool, FrameId, FrameMetaSnapshot};
10use crate::buffer::page::{self, PageId, ReadPageGuard, WritePageGuard, INVALID_PAGE_ID};
11use crate::catalog::SchemaRef;
12use crate::config::BufferPoolConfig;
13use crate::error::{QuillSQLError, QuillSQLResult};
14use crate::recovery::{Lsn, WalManager};
15use crate::storage::codec::{
16 BPlusTreeHeaderPageCodec, BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec,
17 BPlusTreePageCodec, TablePageCodec,
18};
19use crate::storage::disk_scheduler::DiskScheduler;
20use crate::storage::page::{
21 BPlusTreeHeaderPage, BPlusTreeInternalPage, BPlusTreeLeafPage, BPlusTreePage, TablePage,
22};
23use crate::utils::cache::lru_k::LRUKReplacer;
24use crate::utils::cache::tiny_lfu::TinyLFU;
25use crate::utils::cache::Replacer;
26
/// Coordinates page residency on top of a [`BufferPool`]: frame allocation,
/// eviction through an LRU-K replacer, optional TinyLFU admission tracking,
/// dirty-page bookkeeping and write-ahead-log ordering before pages hit disk.
#[derive(Debug)]
pub struct BufferManager {
    /// Frame storage, page-to-frame mappings and disk access.
    pool: Arc<BufferPool>,
    /// LRU-K replacer that picks eviction victims among evictable frames.
    replacer: Arc<RwLock<LRUKReplacer>>,
    /// Per-page mutexes used to serialize page loads and deletions.
    inflight_loads: DashMap<PageId, Arc<Mutex<()>>>,
    /// Frequency sketch; `Some` only when `tiny_lfu_enable` is set in the config.
    tiny_lfu: Option<Arc<RwLock<TinyLFU>>>,
    /// Ids of pages currently tracked as dirty.
    dirty_pages: DashSet<PageId>,
    /// For each dirty page, the LSN recorded the first time it was noted dirty.
    dirty_page_table: DashMap<PageId, Lsn>,
    /// WAL manager; `None` until `set_wal_manager` is called.
    wal_manager: Arc<RwLock<Option<Arc<WalManager>>>>,
}
37
38impl BufferManager {
39 pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
40 Self::new_with_config(
41 BufferPoolConfig {
42 buffer_pool_size: num_pages,
43 ..Default::default()
44 },
45 disk_scheduler,
46 )
47 }
48
49 pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
50 let pool = Arc::new(BufferPool::new_with_config(config, disk_scheduler));
51 let replacer = Arc::new(RwLock::new(LRUKReplacer::new(pool.capacity())));
52 let tiny_lfu = if config.tiny_lfu_enable {
53 Some(Arc::new(RwLock::new(TinyLFU::new(
54 pool.capacity().next_power_of_two(),
55 config.tiny_lfu_counters,
56 ))))
57 } else {
58 None
59 };
60
61 Self {
62 pool,
63 replacer,
64 inflight_loads: DashMap::new(),
65 tiny_lfu,
66 dirty_pages: DashSet::new(),
67 dirty_page_table: DashMap::new(),
68 wal_manager: Arc::new(RwLock::new(None)),
69 }
70 }
71
72 pub fn buffer_pool(&self) -> Arc<BufferPool> {
73 self.pool.clone()
74 }
75
76 pub fn replacer_arc(&self) -> Arc<RwLock<LRUKReplacer>> {
77 self.replacer.clone()
78 }
79
80 pub fn set_wal_manager(&self, wal_manager: Arc<WalManager>) {
81 *self.wal_manager.write() = Some(wal_manager);
82 }
83
84 pub fn wal_manager(&self) -> Option<Arc<WalManager>> {
85 self.wal_manager.read().clone()
86 }
87
88 pub fn frame_meta_snapshot(&self) -> Vec<FrameMetaSnapshot> {
89 self.pool.frame_meta_snapshot()
90 }
91
92 pub fn dirty_page_ids(&self) -> Vec<PageId> {
93 self.dirty_pages.iter().map(|entry| *entry.key()).collect()
94 }
95
96 pub fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)> {
97 self.dirty_page_table
98 .iter()
99 .map(|entry| (*entry.key(), *entry.value()))
100 .collect()
101 }
102
103 fn note_dirty_page(&self, page_id: PageId, rec_lsn: Lsn) {
104 self.dirty_pages.insert(page_id);
105 self.dirty_page_table.entry(page_id).or_insert(rec_lsn);
106 }
107
108 pub fn new_page(self: &Arc<Self>) -> QuillSQLResult<WritePageGuard> {
109 if !self.pool.has_free_frame() && self.replacer.read().size() == 0 {
110 return Err(QuillSQLError::Storage(
111 "Cannot new page because buffer pool is full and no page to evict".to_string(),
112 ));
113 }
114
115 let frame_id = self.allocate_frame()?;
116 let page_id = self.pool.allocate_page_id()?;
117 self.pool.insert_mapping(page_id, frame_id);
118
119 {
120 let meta = self.pool.frame_meta(frame_id);
121 meta.initialize(page_id);
122 meta.increment_pin();
123 }
124
125 self.pool.reset_frame(frame_id);
126 self.replacer_record_access(frame_id)?;
127 self.mark_non_evictable(frame_id)?;
128 Ok(page::new_write_guard(Arc::clone(self), frame_id))
129 }
130
131 pub fn fetch_page_read(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<ReadPageGuard> {
132 if page_id == INVALID_PAGE_ID {
133 return Err(QuillSQLError::Storage(
134 "fetch_page_read: invalid page id".to_string(),
135 ));
136 }
137
138 let frame_id = self.ensure_frame(page_id)?;
139 {
140 let meta = self.pool.frame_meta(frame_id);
141 meta.increment_pin();
142 }
143 self.replacer_record_access(frame_id)?;
144 self.mark_non_evictable(frame_id)?;
145 Ok(page::new_read_guard(Arc::clone(self), frame_id))
146 }
147
148 pub fn fetch_page_write(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<WritePageGuard> {
149 if page_id == INVALID_PAGE_ID {
150 return Err(QuillSQLError::Storage(
151 "fetch_page_write: invalid page id".to_string(),
152 ));
153 }
154
155 let frame_id = self.ensure_frame(page_id)?;
156 {
157 let meta = self.pool.frame_meta(frame_id);
158 meta.increment_pin();
159 }
160 self.replacer_record_access(frame_id)?;
161 self.mark_non_evictable(frame_id)?;
162 Ok(page::new_write_guard(Arc::clone(self), frame_id))
163 }
164
165 pub fn complete_unpin(
166 &self,
167 page_id: PageId,
168 is_dirty: bool,
169 rec_lsn_hint: Option<Lsn>,
170 ) -> QuillSQLResult<()> {
171 if let Some(frame_id) = self.pool.lookup_frame(page_id) {
172 let meta = self.pool.frame_meta(frame_id);
173 let new_pin_count = if meta.pin_count() > 0 {
174 meta.try_decrement_pin().unwrap_or(0)
175 } else {
176 0
177 };
178 if is_dirty {
179 meta.mark_dirty();
180 if let Some(lsn) = rec_lsn_hint {
181 meta.set_lsn(lsn);
182 }
183 let rec_lsn = rec_lsn_hint.unwrap_or_else(|| meta.lsn());
184 self.note_dirty_page(page_id, rec_lsn);
185 }
186 if new_pin_count == 0 {
187 self.mark_evictable(frame_id)?;
188 }
189 }
190 Ok(())
191 }
192
193 pub fn fetch_table_page(
194 self: &Arc<Self>,
195 page_id: PageId,
196 schema: SchemaRef,
197 ) -> QuillSQLResult<(ReadPageGuard, TablePage)> {
198 let guard = self.fetch_page_read(page_id)?;
199 let (page, _) = TablePageCodec::decode(guard.data(), schema)?;
200 Ok((guard, page))
201 }
202
203 pub fn fetch_tree_page(
204 self: &Arc<Self>,
205 page_id: PageId,
206 key_schema: SchemaRef,
207 ) -> QuillSQLResult<(ReadPageGuard, BPlusTreePage)> {
208 let guard = self.fetch_page_read(page_id)?;
209 let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone())?;
210 Ok((guard, page))
211 }
212
213 pub fn fetch_tree_internal_page(
214 self: &Arc<Self>,
215 page_id: PageId,
216 key_schema: SchemaRef,
217 ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeInternalPage)> {
218 let guard = self.fetch_page_read(page_id)?;
219 let (page, _) = BPlusTreeInternalPageCodec::decode(guard.data(), key_schema.clone())?;
220 Ok((guard, page))
221 }
222
223 pub fn fetch_tree_leaf_page(
224 self: &Arc<Self>,
225 page_id: PageId,
226 key_schema: SchemaRef,
227 ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeLeafPage)> {
228 let guard = self.fetch_page_read(page_id)?;
229 let (page, _) = BPlusTreeLeafPageCodec::decode(guard.data(), key_schema.clone())?;
230 Ok((guard, page))
231 }
232
233 pub fn fetch_header_page(
234 self: &Arc<Self>,
235 page_id: PageId,
236 ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeHeaderPage)> {
237 let guard = self.fetch_page_read(page_id)?;
238 let (header, _) = BPlusTreeHeaderPageCodec::decode(guard.data())?;
239 Ok((guard, header))
240 }
241
242 pub fn prefetch_page(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<()> {
243 if let Ok(g) = self.fetch_page_read(page_id) {
244 drop(g);
245 }
246 Ok(())
247 }
248
249 pub fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
250 let Some(frame_id) = self.pool.lookup_frame(page_id) else {
251 return Ok(false);
252 };
253 let meta = self.pool.frame_meta(frame_id);
254 if !meta.is_dirty() {
255 self.dirty_pages.remove(&page_id);
256 self.dirty_page_table.remove(&page_id);
257 return Ok(false);
258 }
259 let lsn = meta.lsn();
260 self.ensure_wal_durable(lsn)?;
261 let bytes = {
262 let _lock = self.pool.frame_lock(frame_id).read();
263 let slice = unsafe { self.pool.frame_slice(frame_id) };
264 Bytes::copy_from_slice(slice)
265 };
266 self.pool.write_page_to_disk(page_id, bytes)?;
267 meta.clear_dirty();
268 self.dirty_pages.remove(&page_id);
269 self.dirty_page_table.remove(&page_id);
270 Ok(true)
271 }
272
273 pub fn flush_all_pages(&self) -> QuillSQLResult<()> {
274 if let Some(wal) = self.wal_manager.read().clone() {
275 wal.flush(None)?;
276 }
277 let dirty_ids: Vec<PageId> = self.dirty_pages.iter().map(|entry| *entry.key()).collect();
278 for page_id in dirty_ids {
279 let _ = self.flush_page(page_id)?;
280 }
281 Ok(())
282 }
283
284 pub fn delete_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
285 let (guard, created_here) = if let Some(existing) = self.inflight_loads.get(&page_id) {
286 (existing.clone(), false)
287 } else {
288 let arc = Arc::new(Mutex::new(()));
289 self.inflight_loads.insert(page_id, arc.clone());
290 (arc, true)
291 };
292 let lock = guard.lock();
293 let result = self.delete_page_inner(page_id);
294 drop(lock);
295 if created_here {
296 self.inflight_loads.remove(&page_id);
297 }
298 result
299 }
300
301 fn delete_page_inner(&self, page_id: PageId) -> QuillSQLResult<bool> {
302 if let Some(frame_id) = self.pool.lookup_frame(page_id) {
303 let lock = self.pool.frame_lock(frame_id);
304 let guard = match lock.try_write() {
305 Some(g) => g,
306 None => return Ok(false),
307 };
308 drop(guard);
309 let meta = self.pool.frame_meta(frame_id);
310 if meta.page_id() != page_id {
311 self.pool.remove_mapping_if(page_id, frame_id);
312 return self.delete_page_inner(page_id);
313 }
314 if meta.pin_count() > 0 {
315 return Ok(false);
316 }
317 if !self.pool.remove_mapping_if(page_id, frame_id) {
318 return self.delete_page_inner(page_id);
319 }
320 self.pool.reset_frame(frame_id);
321 self.dirty_pages.remove(&page_id);
322 self.dirty_page_table.remove(&page_id);
323 self.pool.clear_frame_meta(frame_id);
324 {
325 let mut rep = self.replacer.write();
326 let _ = rep.set_evictable(frame_id, true);
327 rep.remove(frame_id);
328 }
329 self.pool.push_free_frame(frame_id);
330 self.pool
331 .disk_scheduler()
332 .schedule_deallocate(page_id)?
333 .recv()
334 .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
335 Ok(true)
336 } else {
337 self.pool
338 .disk_scheduler()
339 .schedule_deallocate(page_id)?
340 .recv()
341 .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
342 Ok(true)
343 }
344 }
345
346 fn ensure_frame(&self, page_id: PageId) -> QuillSQLResult<FrameId> {
347 if let Some(frame_id) = self.pool.lookup_frame(page_id) {
348 self.replacer_record_access(frame_id)?;
349 if let Some(lfu) = &self.tiny_lfu {
350 lfu.write().admit_record(page_id as u64);
351 }
352 return Ok(frame_id);
353 }
354
355 let (guard, created_here) = if let Some(existing) = self.inflight_loads.get(&page_id) {
356 (existing.clone(), false)
357 } else {
358 let arc = Arc::new(Mutex::new(()));
359 self.inflight_loads.insert(page_id, arc.clone());
360 (arc, true)
361 };
362 let _lock = guard.lock();
363
364 if let Some(frame_id) = self.pool.lookup_frame(page_id) {
365 if created_here {
366 self.inflight_loads.remove(&page_id);
367 }
368 self.replacer_record_access(frame_id)?;
369 if let Some(lfu) = &self.tiny_lfu {
370 lfu.write().admit_record(page_id as u64);
371 }
372 return Ok(frame_id);
373 }
374
375 if let Some(lfu) = &self.tiny_lfu {
376 let estimate = lfu.read().estimate(page_id as u64);
377 if estimate == 0 && !self.pool.has_free_frame() && self.replacer.read().size() == 0 {
378 if created_here {
379 self.inflight_loads.remove(&page_id);
380 }
381 return Err(QuillSQLError::Storage(
382 "Cannot allocate frame: admission denied and no space".to_string(),
383 ));
384 }
385 }
386
387 let frame_id = self.allocate_frame()?;
388 self.pool.load_page_into_frame(page_id, frame_id)?;
389 self.pool.insert_mapping(page_id, frame_id);
390
391 if let Some(lfu) = &self.tiny_lfu {
392 lfu.write().admit_record(page_id as u64);
393 }
394 if created_here {
395 self.inflight_loads.remove(&page_id);
396 }
397 self.replacer_record_access(frame_id)?;
398 Ok(frame_id)
399 }
400
401 fn allocate_frame(&self) -> QuillSQLResult<FrameId> {
402 if let Some(frame_id) = self.pool.pop_free_frame() {
403 return Ok(frame_id);
404 }
405 self.evict_victim_frame()
406 }
407
408 fn replacer_record_access(&self, frame_id: FrameId) -> QuillSQLResult<()> {
409 let mut rep = self.replacer.write();
410 let _ = rep.record_access(frame_id);
411 Ok(())
412 }
413
414 fn evict_victim_frame(&self) -> QuillSQLResult<FrameId> {
415 loop {
416 let victim = {
417 let mut rep = self.replacer.write();
418 match rep.evict() {
419 Some(frame_id) => frame_id,
420 None => {
421 return Err(QuillSQLError::Storage(
422 "Cannot allocate frame: buffer pool is full".to_string(),
423 ))
424 }
425 }
426 };
427
428 let snapshot = self.pool.frame_meta(victim).snapshot();
429 let page_id = snapshot.page_id;
430 let pin_count = snapshot.pin_count;
431 let is_dirty = snapshot.is_dirty;
432 let lsn = snapshot.lsn;
433
434 if pin_count > 0 {
435 let mut rep = self.replacer.write();
436 let _ = rep.record_access(victim);
437 let _ = rep.set_evictable(victim, false);
438 continue;
439 }
440
441 if page_id != INVALID_PAGE_ID {
442 if is_dirty {
443 self.ensure_wal_durable(lsn)?;
444 let bytes = Bytes::copy_from_slice(unsafe { self.pool.frame_slice(victim) });
445 self.pool.write_page_to_disk(page_id, bytes)?;
446 self.dirty_pages.remove(&page_id);
447 self.dirty_page_table.remove(&page_id);
448 }
449 self.pool.remove_mapping(page_id);
450 }
451
452 self.pool.clear_frame_meta(victim);
453 self.pool.reset_frame(victim);
454 return Ok(victim);
455 }
456 }
457
458 fn mark_evictable(&self, frame_id: FrameId) -> QuillSQLResult<()> {
459 let mut rep = self.replacer.write();
460 let _ = rep.set_evictable(frame_id, true);
461 Ok(())
462 }
463
464 fn mark_non_evictable(&self, frame_id: FrameId) -> QuillSQLResult<()> {
465 let mut rep = self.replacer.write();
466 let _ = rep.set_evictable(frame_id, false);
467 Ok(())
468 }
469
470 fn ensure_wal_durable(&self, lsn: Lsn) -> QuillSQLResult<()> {
471 if lsn == 0 {
472 return Ok(());
473 }
474 if let Some(wal) = self.wal_manager.read().clone() {
475 if lsn > wal.durable_lsn() {
476 wal.flush(Some(lsn))?;
477 if wal.durable_lsn() < lsn {
478 return Err(QuillSQLError::Internal(format!(
479 "Flush blocked: page_lsn={} > durable_lsn={}",
480 lsn,
481 wal.durable_lsn()
482 )));
483 }
484 }
485 }
486 Ok(())
487 }
488
489 pub fn clone_arc(self: &Arc<Self>) -> Arc<Self> {
490 self.clone()
491 }
492}
493
494#[cfg(test)]
495mod tests {
496 use super::*;
497 use crate::storage::disk_manager::DiskManager;
498 use crate::storage::disk_scheduler::DiskScheduler;
499 use std::sync::{Arc, Barrier};
500 use std::thread;
501 use tempfile::TempDir;
502
503 fn setup_manager(num_pages: usize) -> (TempDir, Arc<BufferManager>) {
504 let temp_dir = TempDir::new().unwrap();
505 let db_file = temp_dir.path().join("test.db");
506 let disk_manager = Arc::new(DiskManager::try_new(db_file).unwrap());
507 let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager));
508 let manager = Arc::new(BufferManager::new(num_pages, disk_scheduler));
509 (temp_dir, manager)
510 }
511
/// A fresh page is zero-filled, clean, has LSN 0, and is unpinned once its guard drops.
#[test]
fn new_page_initializes_frame() {
    let (_tmp, manager) = setup_manager(2);

    let (page_id, frame_id) = {
        let guard = manager.new_page().unwrap();
        let ids = (guard.page_id(), guard.frame_id());

        assert!(guard.data().iter().all(|byte| *byte == 0));
        assert!(!guard.is_dirty());
        assert_eq!(guard.lsn(), 0);
        ids
    };

    let snapshot = manager.buffer_pool().frame_meta(frame_id).snapshot();
    assert_eq!(snapshot.page_id, page_id);
    assert_eq!(snapshot.pin_count, 0);
    assert!(!snapshot.is_dirty);
}
531
/// A read guard pins the page exactly once and the pin is released on drop.
#[test]
fn fetch_page_read_increments_pin_and_resets_on_drop() {
    let (_tmp, manager) = setup_manager(2);
    let (page_id, frame_id) = {
        let created = manager.new_page().unwrap();
        (created.page_id(), created.frame_id())
    };

    let read_guard = manager.fetch_page_read(page_id).unwrap();
    assert_eq!(read_guard.pin_count(), 1);
    assert_eq!(read_guard.frame_id(), frame_id);
    drop(read_guard);

    let pins_after = manager
        .buffer_pool()
        .frame_meta(frame_id)
        .snapshot()
        .pin_count;
    assert_eq!(pins_after, 0);
}
549
/// Dirtiness and LSN set through one write guard are visible to the next
/// one and survive in the frame metadata after all guards are gone.
#[test]
fn fetch_page_write_marks_dirty_and_tracks_lsn() {
    let (_tmp, manager) = setup_manager(2);

    let mut first = manager.new_page().unwrap();
    let page_id = first.page_id();
    first.data_mut()[0] = 7;
    first.set_lsn(99);
    first.mark_dirty();
    drop(first);

    let mut second = manager.fetch_page_write(page_id).unwrap();
    assert!(second.is_dirty());
    assert_eq!(second.lsn(), 99);
    second.data_mut()[1] = 8;
    drop(second);

    let pool = manager.buffer_pool();
    let frame_id = pool.lookup_frame(page_id).unwrap();
    let snapshot = pool.frame_meta(frame_id).snapshot();
    assert!(snapshot.is_dirty);
    assert_eq!(snapshot.lsn, 99);
    assert_eq!(snapshot.pin_count, 0);
}
574
/// Flushing a dirty page reports a write and leaves the frame clean.
#[test]
fn flush_page_writes_back_and_clears_dirty_flag() {
    let (_tmp, manager) = setup_manager(2);

    let mut guard = manager.new_page().unwrap();
    let page_id = guard.page_id();
    guard.data_mut()[0] = 42;
    guard.set_lsn(123);
    guard.mark_dirty();
    drop(guard);

    let flushed = manager.flush_page(page_id).unwrap();
    assert!(flushed);

    let pool = manager.buffer_pool();
    let frame_id = pool.lookup_frame(page_id).unwrap();
    assert!(!pool.frame_meta(frame_id).snapshot().is_dirty);
}
593
/// Deleting an unpinned page drops its mapping and frees its frame for reuse.
#[test]
fn delete_page_releases_frame() {
    let (_tmp, manager) = setup_manager(2);

    let guard = manager.new_page().unwrap();
    let page_id = guard.page_id();
    drop(guard);

    let deleted = manager.delete_page(page_id).unwrap();
    assert!(deleted);
    assert!(manager.buffer_pool().lookup_frame(page_id).is_none());
    assert!(manager.buffer_pool().has_free_frame());

    let replacement = manager.new_page().unwrap();
    assert!(replacement.frame_id() < manager.buffer_pool().capacity());
}
610
/// Many threads reading the same page concurrently leave no pins behind.
#[test]
fn concurrent_reads_do_not_leak_pins() {
    const THREADS: usize = 8;
    let (_tmp, manager) = setup_manager(4);

    let mut seed = manager.new_page().unwrap();
    seed.data_mut()[0] = 42;
    let page_id = seed.page_id();
    let frame_id = seed.frame_id();
    drop(seed);

    let barrier = Arc::new(Barrier::new(THREADS));
    let workers: Vec<_> = (0..THREADS)
        .map(|_| {
            let mgr = Arc::clone(&manager);
            let start_line = Arc::clone(&barrier);
            thread::spawn(move || {
                // Release all readers at the same moment.
                start_line.wait();
                for _ in 0..50 {
                    let guard = mgr.fetch_page_read(page_id).expect("read page");
                    assert_eq!(guard.data()[0], 42);
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let snapshot = manager.buffer_pool().frame_meta(frame_id).snapshot();
    assert_eq!(snapshot.pin_count, 0);
    assert_eq!(snapshot.page_id, page_id);
}
644
/// Concurrent writers each stamp their own byte; afterwards the page is
/// dirty and unpinned, a flush cleans it, and the bytes are readable from disk.
#[test]
fn concurrent_writes_mark_dirty_and_flush_once() {
    const THREADS: usize = 4;
    let (_tmp, manager) = setup_manager(4);

    let seed = manager.new_page().unwrap();
    let page_id = seed.page_id();
    let frame_id = seed.frame_id();
    drop(seed);

    let barrier = Arc::new(Barrier::new(THREADS));
    let workers: Vec<_> = (0..THREADS)
        .map(|tid| {
            let mgr = Arc::clone(&manager);
            let start_line = Arc::clone(&barrier);
            thread::spawn(move || {
                let lsn = (tid as Lsn) + 1;
                // Release all writers at the same moment.
                start_line.wait();
                for _ in 0..25 {
                    let mut guard = mgr.fetch_page_write(page_id).expect("write guard");
                    guard.data_mut()[tid] = (tid as u8) + 1;
                    guard.set_lsn(lsn);
                    guard.mark_dirty();
                }
            })
        })
        .collect();

    for worker in workers {
        worker.join().unwrap();
    }

    let before_flush = manager.buffer_pool().frame_meta(frame_id).snapshot();
    assert!(before_flush.is_dirty);
    assert_eq!(before_flush.pin_count, 0);
    assert_eq!(before_flush.page_id, page_id);

    assert!(manager.flush_page(page_id).unwrap());

    let after_flush = manager.buffer_pool().frame_meta(frame_id).snapshot();
    assert!(!after_flush.is_dirty);
    assert_eq!(after_flush.pin_count, 0);

    // Read straight from disk, bypassing the buffer pool.
    let on_disk = manager
        .buffer_pool()
        .disk_scheduler()
        .schedule_read(page_id)
        .unwrap()
        .recv()
        .unwrap()
        .unwrap();
    (0..THREADS).for_each(|tid| {
        assert_eq!(on_disk[tid], (tid as u8) + 1);
    });
}
703}