1use std::sync::Arc;
4
5use bytes::Bytes;
6use dashmap::{DashMap, DashSet};
7use parking_lot::{Mutex, RwLock};
8
9use crate::buffer::buffer_pool::{BufferPool, FrameId};
10use crate::buffer::page::{self, PageId, ReadPageGuard, WritePageGuard, INVALID_PAGE_ID};
11use crate::catalog::SchemaRef;
12use crate::config::BufferPoolConfig;
13use crate::error::{QuillSQLError, QuillSQLResult};
14use crate::recovery::{Lsn, WalManager};
15use crate::storage::codec::{
16 BPlusTreeHeaderPageCodec, BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec,
17 BPlusTreePageCodec, TablePageCodec,
18};
19use crate::storage::disk_scheduler::DiskScheduler;
20use crate::storage::page::{
21 BPlusTreeHeaderPage, BPlusTreeInternalPage, BPlusTreeLeafPage, BPlusTreePage, TablePage,
22};
23use crate::utils::cache::lru_k::LRUKReplacer;
24use crate::utils::cache::tiny_lfu::TinyLFU;
25use crate::utils::cache::Replacer;
26
27#[derive(Debug)]
28pub struct BufferManager {
29 pool: Arc<BufferPool>,
30 replacer: Arc<RwLock<LRUKReplacer>>,
31 inflight_loads: DashMap<PageId, Arc<Mutex<()>>>,
32 tiny_lfu: Option<Arc<RwLock<TinyLFU>>>,
33 dirty_pages: DashSet<PageId>,
34 dirty_page_table: DashMap<PageId, Lsn>,
35 wal_manager: Arc<RwLock<Option<Arc<WalManager>>>>,
36}
37
38impl BufferManager {
39 pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
40 Self::new_with_config(
41 BufferPoolConfig {
42 buffer_pool_size: num_pages,
43 ..Default::default()
44 },
45 disk_scheduler,
46 )
47 }
48
49 pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
50 let pool = Arc::new(BufferPool::new_with_config(config, disk_scheduler));
51 let replacer = Arc::new(RwLock::new(LRUKReplacer::new(pool.capacity())));
52 let tiny_lfu = if config.tiny_lfu_enable {
53 Some(Arc::new(RwLock::new(TinyLFU::new(
54 pool.capacity().next_power_of_two(),
55 config.tiny_lfu_counters,
56 ))))
57 } else {
58 None
59 };
60
61 Self {
62 pool,
63 replacer,
64 inflight_loads: DashMap::new(),
65 tiny_lfu,
66 dirty_pages: DashSet::new(),
67 dirty_page_table: DashMap::new(),
68 wal_manager: Arc::new(RwLock::new(None)),
69 }
70 }
71
72 pub fn buffer_pool(&self) -> Arc<BufferPool> {
73 self.pool.clone()
74 }
75
76 pub fn replacer_arc(&self) -> Arc<RwLock<LRUKReplacer>> {
77 self.replacer.clone()
78 }
79
80 pub fn set_wal_manager(&self, wal_manager: Arc<WalManager>) {
81 *self.wal_manager.write() = Some(wal_manager);
82 }
83
84 pub fn wal_manager(&self) -> Option<Arc<WalManager>> {
85 self.wal_manager.read().clone()
86 }
87
88 pub fn dirty_page_ids(&self) -> Vec<PageId> {
89 self.dirty_pages.iter().map(|entry| *entry.key()).collect()
90 }
91
92 pub fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)> {
93 self.dirty_page_table
94 .iter()
95 .map(|entry| (*entry.key(), *entry.value()))
96 .collect()
97 }
98
99 fn note_dirty_page(&self, page_id: PageId, rec_lsn: Lsn) {
100 self.dirty_pages.insert(page_id);
101 self.dirty_page_table.entry(page_id).or_insert(rec_lsn);
102 }
103
104 pub fn new_page(self: &Arc<Self>) -> QuillSQLResult<WritePageGuard> {
105 if !self.pool.has_free_frame() && self.replacer.read().size() == 0 {
106 return Err(QuillSQLError::Storage(
107 "Cannot new page because buffer pool is full and no page to evict".to_string(),
108 ));
109 }
110
111 let frame_id = self.allocate_frame()?;
112 let page_id = self.pool.allocate_page_id()?;
113 self.pool.insert_mapping(page_id, frame_id);
114
115 {
116 let meta = self.pool.frame_meta(frame_id);
117 meta.initialize(page_id);
118 meta.increment_pin();
119 }
120
121 self.pool.reset_frame(frame_id);
122 self.replacer_record_access(frame_id)?;
123 self.mark_non_evictable(frame_id)?;
124 Ok(page::new_write_guard(Arc::clone(self), frame_id))
125 }
126
127 pub fn fetch_page_read(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<ReadPageGuard> {
128 if page_id == INVALID_PAGE_ID {
129 return Err(QuillSQLError::Storage(
130 "fetch_page_read: invalid page id".to_string(),
131 ));
132 }
133
134 let frame_id = self.ensure_frame(page_id)?;
135 {
136 let meta = self.pool.frame_meta(frame_id);
137 meta.increment_pin();
138 }
139 self.replacer_record_access(frame_id)?;
140 self.mark_non_evictable(frame_id)?;
141 Ok(page::new_read_guard(Arc::clone(self), frame_id))
142 }
143
144 pub fn fetch_page_write(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<WritePageGuard> {
145 if page_id == INVALID_PAGE_ID {
146 return Err(QuillSQLError::Storage(
147 "fetch_page_write: invalid page id".to_string(),
148 ));
149 }
150
151 let frame_id = self.ensure_frame(page_id)?;
152 {
153 let meta = self.pool.frame_meta(frame_id);
154 meta.increment_pin();
155 }
156 self.replacer_record_access(frame_id)?;
157 self.mark_non_evictable(frame_id)?;
158 Ok(page::new_write_guard(Arc::clone(self), frame_id))
159 }
160
161 pub fn complete_unpin(
162 &self,
163 page_id: PageId,
164 is_dirty: bool,
165 rec_lsn_hint: Option<Lsn>,
166 ) -> QuillSQLResult<()> {
167 if let Some(frame_id) = self.pool.lookup_frame(page_id) {
168 let meta = self.pool.frame_meta(frame_id);
169 let new_pin_count = if meta.pin_count() > 0 {
170 meta.try_decrement_pin().unwrap_or(0)
171 } else {
172 0
173 };
174 if is_dirty {
175 meta.mark_dirty();
176 if let Some(lsn) = rec_lsn_hint {
177 meta.set_lsn(lsn);
178 }
179 let rec_lsn = rec_lsn_hint.unwrap_or_else(|| meta.lsn());
180 self.note_dirty_page(page_id, rec_lsn);
181 }
182 if new_pin_count == 0 {
183 self.mark_evictable(frame_id)?;
184 }
185 }
186 Ok(())
187 }
188
189 pub fn fetch_table_page(
190 self: &Arc<Self>,
191 page_id: PageId,
192 schema: SchemaRef,
193 ) -> QuillSQLResult<(ReadPageGuard, TablePage)> {
194 let guard = self.fetch_page_read(page_id)?;
195 let (page, _) = TablePageCodec::decode(guard.data(), schema)?;
196 Ok((guard, page))
197 }
198
199 pub fn fetch_tree_page(
200 self: &Arc<Self>,
201 page_id: PageId,
202 key_schema: SchemaRef,
203 ) -> QuillSQLResult<(ReadPageGuard, BPlusTreePage)> {
204 let guard = self.fetch_page_read(page_id)?;
205 let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone())?;
206 Ok((guard, page))
207 }
208
209 pub fn fetch_tree_internal_page(
210 self: &Arc<Self>,
211 page_id: PageId,
212 key_schema: SchemaRef,
213 ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeInternalPage)> {
214 let guard = self.fetch_page_read(page_id)?;
215 let (page, _) = BPlusTreeInternalPageCodec::decode(guard.data(), key_schema.clone())?;
216 Ok((guard, page))
217 }
218
219 pub fn fetch_tree_leaf_page(
220 self: &Arc<Self>,
221 page_id: PageId,
222 key_schema: SchemaRef,
223 ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeLeafPage)> {
224 let guard = self.fetch_page_read(page_id)?;
225 let (page, _) = BPlusTreeLeafPageCodec::decode(guard.data(), key_schema.clone())?;
226 Ok((guard, page))
227 }
228
229 pub fn fetch_header_page(
230 self: &Arc<Self>,
231 page_id: PageId,
232 ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeHeaderPage)> {
233 let guard = self.fetch_page_read(page_id)?;
234 let (header, _) = BPlusTreeHeaderPageCodec::decode(guard.data())?;
235 Ok((guard, header))
236 }
237
238 pub fn prefetch_page(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<()> {
239 if let Ok(g) = self.fetch_page_read(page_id) {
240 drop(g);
241 }
242 Ok(())
243 }
244
245 pub fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
246 let Some(frame_id) = self.pool.lookup_frame(page_id) else {
247 return Ok(false);
248 };
249 let meta = self.pool.frame_meta(frame_id);
250 if !meta.is_dirty() {
251 self.dirty_pages.remove(&page_id);
252 self.dirty_page_table.remove(&page_id);
253 return Ok(false);
254 }
255 let lsn = meta.lsn();
256 self.ensure_wal_durable(lsn)?;
257 let bytes = {
258 let _lock = self.pool.frame_lock(frame_id).read();
259 let slice = unsafe { self.pool.frame_slice(frame_id) };
260 Bytes::copy_from_slice(slice)
261 };
262 self.pool.write_page_to_disk(page_id, bytes)?;
263 meta.clear_dirty();
264 self.dirty_pages.remove(&page_id);
265 self.dirty_page_table.remove(&page_id);
266 Ok(true)
267 }
268
269 pub fn flush_all_pages(&self) -> QuillSQLResult<()> {
270 if let Some(wal) = self.wal_manager.read().clone() {
271 wal.flush(None)?;
272 }
273 let dirty_ids: Vec<PageId> = self.dirty_pages.iter().map(|entry| *entry.key()).collect();
274 for page_id in dirty_ids {
275 let _ = self.flush_page(page_id)?;
276 }
277 Ok(())
278 }
279
280 pub fn delete_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
281 let (guard, created_here) = if let Some(existing) = self.inflight_loads.get(&page_id) {
282 (existing.clone(), false)
283 } else {
284 let arc = Arc::new(Mutex::new(()));
285 self.inflight_loads.insert(page_id, arc.clone());
286 (arc, true)
287 };
288 let lock = guard.lock();
289 let result = self.delete_page_inner(page_id);
290 drop(lock);
291 if created_here {
292 self.inflight_loads.remove(&page_id);
293 }
294 result
295 }
296
297 fn delete_page_inner(&self, page_id: PageId) -> QuillSQLResult<bool> {
298 if let Some(frame_id) = self.pool.lookup_frame(page_id) {
299 let lock = self.pool.frame_lock(frame_id);
300 let guard = match lock.try_write() {
301 Some(g) => g,
302 None => return Ok(false),
303 };
304 drop(guard);
305 let meta = self.pool.frame_meta(frame_id);
306 if meta.page_id() != page_id {
307 self.pool.remove_mapping_if(page_id, frame_id);
308 return self.delete_page_inner(page_id);
309 }
310 if meta.pin_count() > 0 {
311 return Ok(false);
312 }
313 if !self.pool.remove_mapping_if(page_id, frame_id) {
314 return self.delete_page_inner(page_id);
315 }
316 self.pool.reset_frame(frame_id);
317 self.dirty_pages.remove(&page_id);
318 self.dirty_page_table.remove(&page_id);
319 self.pool.clear_frame_meta(frame_id);
320 {
321 let mut rep = self.replacer.write();
322 let _ = rep.set_evictable(frame_id, true);
323 rep.remove(frame_id);
324 }
325 self.pool.push_free_frame(frame_id);
326 self.pool
327 .disk_scheduler()
328 .schedule_deallocate(page_id)?
329 .recv()
330 .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
331 Ok(true)
332 } else {
333 self.pool
334 .disk_scheduler()
335 .schedule_deallocate(page_id)?
336 .recv()
337 .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
338 Ok(true)
339 }
340 }
341
342 fn ensure_frame(&self, page_id: PageId) -> QuillSQLResult<FrameId> {
343 if let Some(frame_id) = self.pool.lookup_frame(page_id) {
344 self.replacer_record_access(frame_id)?;
345 if let Some(lfu) = &self.tiny_lfu {
346 lfu.write().admit_record(page_id as u64);
347 }
348 return Ok(frame_id);
349 }
350
351 let (guard, created_here) = if let Some(existing) = self.inflight_loads.get(&page_id) {
352 (existing.clone(), false)
353 } else {
354 let arc = Arc::new(Mutex::new(()));
355 self.inflight_loads.insert(page_id, arc.clone());
356 (arc, true)
357 };
358 let _lock = guard.lock();
359
360 if let Some(frame_id) = self.pool.lookup_frame(page_id) {
361 if created_here {
362 self.inflight_loads.remove(&page_id);
363 }
364 self.replacer_record_access(frame_id)?;
365 if let Some(lfu) = &self.tiny_lfu {
366 lfu.write().admit_record(page_id as u64);
367 }
368 return Ok(frame_id);
369 }
370
371 if let Some(lfu) = &self.tiny_lfu {
372 let estimate = lfu.read().estimate(page_id as u64);
373 if estimate == 0 && !self.pool.has_free_frame() && self.replacer.read().size() == 0 {
374 if created_here {
375 self.inflight_loads.remove(&page_id);
376 }
377 return Err(QuillSQLError::Storage(
378 "Cannot allocate frame: admission denied and no space".to_string(),
379 ));
380 }
381 }
382
383 let frame_id = self.allocate_frame()?;
384 self.pool.load_page_into_frame(page_id, frame_id)?;
385 self.pool.insert_mapping(page_id, frame_id);
386
387 if let Some(lfu) = &self.tiny_lfu {
388 lfu.write().admit_record(page_id as u64);
389 }
390 if created_here {
391 self.inflight_loads.remove(&page_id);
392 }
393 self.replacer_record_access(frame_id)?;
394 Ok(frame_id)
395 }
396
397 fn allocate_frame(&self) -> QuillSQLResult<FrameId> {
398 if let Some(frame_id) = self.pool.pop_free_frame() {
399 return Ok(frame_id);
400 }
401 self.evict_victim_frame()
402 }
403
404 fn replacer_record_access(&self, frame_id: FrameId) -> QuillSQLResult<()> {
405 let mut rep = self.replacer.write();
406 let _ = rep.record_access(frame_id);
407 Ok(())
408 }
409
410 fn evict_victim_frame(&self) -> QuillSQLResult<FrameId> {
411 loop {
412 let victim = {
413 let mut rep = self.replacer.write();
414 match rep.evict() {
415 Some(frame_id) => frame_id,
416 None => {
417 return Err(QuillSQLError::Storage(
418 "Cannot allocate frame: buffer pool is full".to_string(),
419 ))
420 }
421 }
422 };
423
424 let snapshot = self.pool.frame_meta(victim).snapshot();
425 let page_id = snapshot.page_id;
426 let pin_count = snapshot.pin_count;
427 let is_dirty = snapshot.is_dirty;
428 let lsn = snapshot.lsn;
429
430 if pin_count > 0 {
431 let mut rep = self.replacer.write();
432 let _ = rep.record_access(victim);
433 let _ = rep.set_evictable(victim, false);
434 continue;
435 }
436
437 if page_id != INVALID_PAGE_ID {
438 if is_dirty {
439 self.ensure_wal_durable(lsn)?;
440 let bytes = Bytes::copy_from_slice(unsafe { self.pool.frame_slice(victim) });
441 self.pool.write_page_to_disk(page_id, bytes)?;
442 self.dirty_pages.remove(&page_id);
443 self.dirty_page_table.remove(&page_id);
444 }
445 self.pool.remove_mapping(page_id);
446 }
447
448 self.pool.clear_frame_meta(victim);
449 self.pool.reset_frame(victim);
450 return Ok(victim);
451 }
452 }
453
454 fn mark_evictable(&self, frame_id: FrameId) -> QuillSQLResult<()> {
455 let mut rep = self.replacer.write();
456 let _ = rep.set_evictable(frame_id, true);
457 Ok(())
458 }
459
460 fn mark_non_evictable(&self, frame_id: FrameId) -> QuillSQLResult<()> {
461 let mut rep = self.replacer.write();
462 let _ = rep.set_evictable(frame_id, false);
463 Ok(())
464 }
465
466 fn ensure_wal_durable(&self, lsn: Lsn) -> QuillSQLResult<()> {
467 if lsn == 0 {
468 return Ok(());
469 }
470 if let Some(wal) = self.wal_manager.read().clone() {
471 if lsn > wal.durable_lsn() {
472 wal.flush(Some(lsn))?;
473 if wal.durable_lsn() < lsn {
474 return Err(QuillSQLError::Internal(format!(
475 "Flush blocked: page_lsn={} > durable_lsn={}",
476 lsn,
477 wal.durable_lsn()
478 )));
479 }
480 }
481 }
482 Ok(())
483 }
484
485 pub fn clone_arc(self: &Arc<Self>) -> Arc<Self> {
486 self.clone()
487 }
488}
489
490#[cfg(test)]
491mod tests {
492 use super::*;
493 use crate::storage::disk_manager::DiskManager;
494 use crate::storage::disk_scheduler::DiskScheduler;
495 use std::sync::{Arc, Barrier};
496 use std::thread;
497 use tempfile::TempDir;
498
499 fn setup_manager(num_pages: usize) -> (TempDir, Arc<BufferManager>) {
500 let temp_dir = TempDir::new().unwrap();
501 let db_file = temp_dir.path().join("test.db");
502 let disk_manager = Arc::new(DiskManager::try_new(db_file).unwrap());
503 let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager));
504 let manager = Arc::new(BufferManager::new(num_pages, disk_scheduler));
505 (temp_dir, manager)
506 }
507
508 #[test]
509 fn new_page_initializes_frame() {
510 let (_tmp, manager) = setup_manager(2);
511 let guard = manager.new_page().unwrap();
512 let page_id = guard.page_id();
513 let frame_id = guard.frame_id();
514
515 assert!(guard.data().iter().all(|b| *b == 0));
517 assert!(!guard.is_dirty());
518 assert_eq!(guard.lsn(), 0);
519
520 drop(guard);
521
522 let meta = manager.buffer_pool().frame_meta(frame_id).snapshot();
523 assert_eq!(meta.page_id, page_id);
524 assert_eq!(meta.pin_count, 0);
525 assert!(!meta.is_dirty);
526 }
527
528 #[test]
529 fn fetch_page_read_increments_pin_and_resets_on_drop() {
530 let (_tmp, manager) = setup_manager(2);
531 let guard = manager.new_page().unwrap();
532 let page_id = guard.page_id();
533 let frame_id = guard.frame_id();
534 drop(guard);
535
536 {
537 let read_guard = manager.fetch_page_read(page_id).unwrap();
538 assert_eq!(read_guard.pin_count(), 1);
539 assert_eq!(read_guard.frame_id(), frame_id);
540 }
541
542 let meta = manager.buffer_pool().frame_meta(frame_id).snapshot();
543 assert_eq!(meta.pin_count, 0);
544 }
545
546 #[test]
547 fn fetch_page_write_marks_dirty_and_tracks_lsn() {
548 let (_tmp, manager) = setup_manager(2);
549 let mut guard = manager.new_page().unwrap();
550 let page_id = guard.page_id();
551 guard.data_mut()[0] = 7;
552 guard.set_lsn(99);
553 guard.mark_dirty();
554 drop(guard);
555
556 let mut write_guard = manager.fetch_page_write(page_id).unwrap();
557 assert!(write_guard.is_dirty());
558 assert_eq!(write_guard.lsn(), 99);
559 write_guard.data_mut()[1] = 8;
560 drop(write_guard);
561
562 let meta = manager
563 .buffer_pool()
564 .frame_meta(manager.buffer_pool().lookup_frame(page_id).unwrap())
565 .snapshot();
566 assert!(meta.is_dirty);
567 assert_eq!(meta.lsn, 99);
568 assert_eq!(meta.pin_count, 0);
569 }
570
571 #[test]
572 fn flush_page_writes_back_and_clears_dirty_flag() {
573 let (_tmp, manager) = setup_manager(2);
574 let mut guard = manager.new_page().unwrap();
575 let page_id = guard.page_id();
576 guard.data_mut()[0] = 42;
577 guard.set_lsn(123);
578 guard.mark_dirty();
579 drop(guard);
580
581 assert!(manager.flush_page(page_id).unwrap());
582
583 let meta = manager
584 .buffer_pool()
585 .frame_meta(manager.buffer_pool().lookup_frame(page_id).unwrap())
586 .snapshot();
587 assert!(!meta.is_dirty);
588 }
589
590 #[test]
591 fn delete_page_releases_frame() {
592 let (_tmp, manager) = setup_manager(2);
593 let page_id = {
594 let guard = manager.new_page().unwrap();
595 guard.page_id()
596 };
597
598 assert!(manager.delete_page(page_id).unwrap());
599 assert!(manager.buffer_pool().lookup_frame(page_id).is_none());
600 assert!(manager.buffer_pool().has_free_frame());
601
602 let new_guard = manager.new_page().unwrap();
604 assert!(new_guard.frame_id() < manager.buffer_pool().capacity());
605 }
606
607 #[test]
608 fn concurrent_reads_do_not_leak_pins() {
609 const THREADS: usize = 8;
610 let (_tmp, manager) = setup_manager(4);
611 let (page_id, frame_id) = {
612 let mut guard = manager.new_page().unwrap();
613 guard.data_mut()[0] = 42;
614 (guard.page_id(), guard.frame_id())
615 };
616
617 let barrier = Arc::new(Barrier::new(THREADS));
618 let mut handles = Vec::with_capacity(THREADS);
619 for _ in 0..THREADS {
620 let mgr = manager.clone();
621 let barrier = barrier.clone();
622 handles.push(thread::spawn(move || {
623 barrier.wait();
624 for _ in 0..50 {
625 let guard = mgr.fetch_page_read(page_id).expect("read page");
626 assert_eq!(guard.data()[0], 42);
627 }
628 }));
629 }
630
631 for handle in handles {
632 handle.join().unwrap();
633 }
634
635 let pool = manager.buffer_pool();
636 let meta = pool.frame_meta(frame_id).snapshot();
637 assert_eq!(meta.pin_count, 0);
638 assert_eq!(meta.page_id, page_id);
639 }
640
641 #[test]
642 fn concurrent_writes_mark_dirty_and_flush_once() {
643 const THREADS: usize = 4;
644 let (_tmp, manager) = setup_manager(4);
645 let (page_id, frame_id) = {
646 let guard = manager.new_page().unwrap();
647 (guard.page_id(), guard.frame_id())
648 };
649
650 let barrier = Arc::new(Barrier::new(THREADS));
651 let mut handles = Vec::with_capacity(THREADS);
652 for tid in 0..THREADS {
653 let mgr = manager.clone();
654 let barrier = barrier.clone();
655 handles.push(thread::spawn(move || {
656 let lsn = (tid as Lsn) + 1;
657 barrier.wait();
658 for _ in 0..25 {
659 let mut guard = mgr.fetch_page_write(page_id).expect("write guard");
660 guard.data_mut()[tid] = (tid as u8) + 1;
661 guard.set_lsn(lsn);
662 guard.mark_dirty();
663 }
664 }));
665 }
666
667 for handle in handles {
668 handle.join().unwrap();
669 }
670
671 {
672 let pool = manager.buffer_pool();
673 let meta = pool.frame_meta(frame_id).snapshot();
674 assert!(meta.is_dirty);
675 assert_eq!(meta.pin_count, 0);
676 assert_eq!(meta.page_id, page_id);
677 }
678
679 assert!(manager.flush_page(page_id).unwrap());
680 {
681 let pool = manager.buffer_pool();
682 let meta = pool.frame_meta(frame_id).snapshot();
683 assert!(!meta.is_dirty);
684 assert_eq!(meta.pin_count, 0);
685 }
686
687 let read_back = manager
688 .buffer_pool()
689 .disk_scheduler()
690 .schedule_read(page_id)
691 .unwrap()
692 .recv()
693 .unwrap()
694 .unwrap();
695 for tid in 0..THREADS {
696 assert_eq!(read_back[tid], (tid as u8) + 1);
697 }
698 }
699}