use std::sync::Arc;

use bytes::Bytes;
use dashmap::{DashMap, DashSet};
use parking_lot::{Mutex, RwLock};

use crate::buffer::buffer_pool::{BufferPool, FrameId, FrameMeta};
use crate::buffer::page::{self, PageId, ReadPageGuard, WritePageGuard, INVALID_PAGE_ID};
use crate::catalog::SchemaRef;
use crate::config::BufferPoolConfig;
use crate::error::{QuillSQLError, QuillSQLResult};
use crate::recovery::{Lsn, WalManager};
use crate::storage::codec::{
    BPlusTreeHeaderPageCodec, BPlusTreeInternalPageCodec, BPlusTreeLeafPageCodec,
    BPlusTreePageCodec, TablePageCodec,
};
use crate::storage::disk_scheduler::DiskScheduler;
use crate::storage::page::{
    BPlusTreeHeaderPage, BPlusTreeInternalPage, BPlusTreeLeafPage, BPlusTreePage, TablePage,
};
use crate::utils::cache::lru_k::LRUKReplacer;
use crate::utils::cache::tiny_lfu::TinyLFU;
use crate::utils::cache::Replacer;

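/// Thread-safe page cache built on top of [`BufferPool`]: LRU-K eviction,
/// optional TinyLFU admission, single-flight page loading, dirty-page
/// tracking for checkpoints, and WAL-before-data flushing.
///
/// A minimal usage sketch (the file name is illustrative):
///
/// ```ignore
/// let disk_manager = Arc::new(DiskManager::try_new("example.db")?);
/// let scheduler = Arc::new(DiskScheduler::new(disk_manager));
/// let manager = Arc::new(BufferManager::new(64, scheduler));
/// let mut page = manager.new_page()?;
/// page.data_mut()[0] = 1;
/// page.mark_dirty();
/// drop(page); // unpin: the frame becomes evictable again
/// ```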
#[derive(Debug)]
pub struct BufferManager {
    pool: Arc<BufferPool>,
    replacer: Arc<RwLock<LRUKReplacer>>,
    /// Per-page mutexes that serialize concurrent loads of the same page.
    inflight_loads: DashMap<PageId, Arc<Mutex<()>>>,
    /// Optional admission filter; `None` when disabled in the config.
    tiny_lfu: Option<Arc<RwLock<TinyLFU>>>,
    dirty_pages: DashSet<PageId>,
    /// Dirty page table: page id -> recLSN (first LSN that dirtied the page).
    dirty_page_table: DashMap<PageId, Lsn>,
    wal_manager: Arc<RwLock<Option<Arc<WalManager>>>>,
}

impl BufferManager {
    pub fn new(num_pages: usize, disk_scheduler: Arc<DiskScheduler>) -> Self {
        Self::new_with_config(
            BufferPoolConfig {
                buffer_pool_size: num_pages,
                ..Default::default()
            },
            disk_scheduler,
        )
    }

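    /// Builds the manager from an explicit [`BufferPoolConfig`]; the TinyLFU
    /// admission filter is constructed only when `tiny_lfu_enable` is set.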
    pub fn new_with_config(config: BufferPoolConfig, disk_scheduler: Arc<DiskScheduler>) -> Self {
        // Read the admission settings before `config` is handed to the pool.
        let tiny_lfu_enable = config.tiny_lfu_enable;
        let tiny_lfu_counters = config.tiny_lfu_counters;
        let pool = Arc::new(BufferPool::new_with_config(config, disk_scheduler));
        let replacer = Arc::new(RwLock::new(LRUKReplacer::new(pool.capacity())));
        let tiny_lfu = if tiny_lfu_enable {
            Some(Arc::new(RwLock::new(TinyLFU::new(
                pool.capacity().next_power_of_two(),
                tiny_lfu_counters,
            ))))
        } else {
            None
        };

        Self {
            pool,
            replacer,
            inflight_loads: DashMap::new(),
            tiny_lfu,
            dirty_pages: DashSet::new(),
            dirty_page_table: DashMap::new(),
            wal_manager: Arc::new(RwLock::new(None)),
        }
    }

    pub fn buffer_pool(&self) -> Arc<BufferPool> {
        self.pool.clone()
    }

    pub fn replacer_arc(&self) -> Arc<RwLock<LRUKReplacer>> {
        self.replacer.clone()
    }

    pub fn set_wal_manager(&self, wal_manager: Arc<WalManager>) {
        *self.wal_manager.write() = Some(wal_manager);
    }

    pub fn wal_manager(&self) -> Option<Arc<WalManager>> {
        self.wal_manager.read().clone()
    }

    pub fn dirty_page_ids(&self) -> Vec<PageId> {
        self.dirty_pages.iter().map(|entry| *entry.key()).collect()
    }

    pub fn dirty_page_table_snapshot(&self) -> Vec<(PageId, Lsn)> {
        self.dirty_page_table
            .iter()
            .map(|entry| (*entry.key(), *entry.value()))
            .collect()
    }

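    /// Marks `page_id` dirty. `or_insert` keeps the first LSN recorded for the
    /// page, so the dirty page table holds an ARIES-style recLSN: the oldest
    /// update that might not yet be on disk.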
    fn note_dirty_page(&self, page_id: PageId, rec_lsn: Lsn) {
        self.dirty_pages.insert(page_id);
        self.dirty_page_table.entry(page_id).or_insert(rec_lsn);
    }

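    /// Allocates a brand-new page: claims a frame (evicting if needed), assigns
    /// a fresh page id, zeroes the frame, and returns it pinned (pin count 1)
    /// and non-evictable behind a write guard.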
    pub fn new_page(self: &Arc<Self>) -> QuillSQLResult<WritePageGuard> {
        if !self.pool.has_free_frame() && self.replacer.read().size() == 0 {
            return Err(QuillSQLError::Storage(
                "Cannot new page because buffer pool is full and no page to evict".to_string(),
            ));
        }

        let frame_id = self.allocate_frame()?;
        let page_id = self.pool.allocate_page_id()?;
        self.pool.insert_mapping(page_id, frame_id);

        {
            let mut meta = self.pool.frame_meta(frame_id);
            meta.page_id = page_id;
            meta.pin_count = 1;
            meta.is_dirty = false;
            meta.lsn = 0;
        }

        self.pool.reset_frame(frame_id);
        self.replacer_record_access(frame_id)?;
        self.mark_non_evictable(frame_id)?;
        Ok(page::new_write_guard(Arc::clone(self), frame_id))
    }

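    /// Fetches `page_id` for shared access, loading it from disk if it is not
    /// resident. The returned guard holds a pin; dropping it unpins the page.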
    pub fn fetch_page_read(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<ReadPageGuard> {
        if page_id == INVALID_PAGE_ID {
            return Err(QuillSQLError::Storage(
                "fetch_page_read: invalid page id".to_string(),
            ));
        }

        let frame_id = self.ensure_frame(page_id)?;
        {
            let mut meta = self.pool.frame_meta(frame_id);
            meta.pin_count += 1;
        }
        self.replacer_record_access(frame_id)?;
        self.mark_non_evictable(frame_id)?;
        Ok(page::new_read_guard(Arc::clone(self), frame_id))
    }

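    /// Same as [`Self::fetch_page_read`], but returns an exclusive write guard.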
    pub fn fetch_page_write(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<WritePageGuard> {
        if page_id == INVALID_PAGE_ID {
            return Err(QuillSQLError::Storage(
                "fetch_page_write: invalid page id".to_string(),
            ));
        }

        let frame_id = self.ensure_frame(page_id)?;
        {
            let mut meta = self.pool.frame_meta(frame_id);
            meta.pin_count += 1;
        }
        self.replacer_record_access(frame_id)?;
        self.mark_non_evictable(frame_id)?;
        Ok(page::new_write_guard(Arc::clone(self), frame_id))
    }

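    /// Called when a guard is dropped: decrements the pin count, records
    /// dirtiness (using `rec_lsn_hint` as the recLSN when provided), and makes
    /// the frame evictable once the pin count reaches zero.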
    pub fn complete_unpin(
        &self,
        page_id: PageId,
        is_dirty: bool,
        rec_lsn_hint: Option<Lsn>,
    ) -> QuillSQLResult<()> {
        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
            let mut meta = self.pool.frame_meta(frame_id);
            if meta.pin_count > 0 {
                meta.pin_count -= 1;
            }
            if is_dirty {
                meta.is_dirty = true;
                if let Some(lsn) = rec_lsn_hint {
                    meta.lsn = lsn;
                }
                self.note_dirty_page(page_id, rec_lsn_hint.unwrap_or(meta.lsn));
            }
            if meta.pin_count == 0 {
                self.mark_evictable(frame_id)?;
            }
        }
        Ok(())
    }

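    /// Fetches a page and decodes it as a [`TablePage`]; the B+Tree helpers
    /// below follow the same fetch-then-decode pattern.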
    pub fn fetch_table_page(
        self: &Arc<Self>,
        page_id: PageId,
        schema: SchemaRef,
    ) -> QuillSQLResult<(ReadPageGuard, TablePage)> {
        let guard = self.fetch_page_read(page_id)?;
        let (page, _) = TablePageCodec::decode(guard.data(), schema)?;
        Ok((guard, page))
    }

    pub fn fetch_tree_page(
        self: &Arc<Self>,
        page_id: PageId,
        key_schema: SchemaRef,
    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreePage)> {
        let guard = self.fetch_page_read(page_id)?;
        let (page, _) = BPlusTreePageCodec::decode(guard.data(), key_schema.clone())?;
        Ok((guard, page))
    }

    pub fn fetch_tree_internal_page(
        self: &Arc<Self>,
        page_id: PageId,
        key_schema: SchemaRef,
    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeInternalPage)> {
        let guard = self.fetch_page_read(page_id)?;
        let (page, _) = BPlusTreeInternalPageCodec::decode(guard.data(), key_schema.clone())?;
        Ok((guard, page))
    }

    pub fn fetch_tree_leaf_page(
        self: &Arc<Self>,
        page_id: PageId,
        key_schema: SchemaRef,
    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeLeafPage)> {
        let guard = self.fetch_page_read(page_id)?;
        let (page, _) = BPlusTreeLeafPageCodec::decode(guard.data(), key_schema.clone())?;
        Ok((guard, page))
    }

    pub fn fetch_header_page(
        self: &Arc<Self>,
        page_id: PageId,
    ) -> QuillSQLResult<(ReadPageGuard, BPlusTreeHeaderPage)> {
        let guard = self.fetch_page_read(page_id)?;
        let (header, _) = BPlusTreeHeaderPageCodec::decode(guard.data())?;
        Ok((guard, header))
    }

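    /// Best-effort warm-up: pulls the page into the pool and immediately drops
    /// the guard. Load errors are swallowed on purpose.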
    pub fn prefetch_page(self: &Arc<Self>, page_id: PageId) -> QuillSQLResult<()> {
        // Ignore the result: prefetching is purely advisory.
        let _ = self.fetch_page_read(page_id);
        Ok(())
    }

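    /// Writes a single dirty page back to disk, enforcing the WAL rule: the
    /// log must be durable up to the page's LSN before the data page goes out.
    /// Returns `Ok(false)` if the page is not resident or not dirty.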
    pub fn flush_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
        let Some(frame_id) = self.pool.lookup_frame(page_id) else {
            return Ok(false);
        };
        let meta = self.pool.frame_meta(frame_id);
        if !meta.is_dirty {
            self.dirty_pages.remove(&page_id);
            self.dirty_page_table.remove(&page_id);
            return Ok(false);
        }
        let lsn = meta.lsn;
        drop(meta);
        // WAL-before-data: never write a page whose log records are not durable.
        self.ensure_wal_durable(lsn)?;
        let bytes = {
            // Hold the frame's read lock only long enough to copy the bytes.
            let _lock = self.pool.frame_lock(frame_id).read();
            let slice = unsafe { self.pool.frame_slice(frame_id) };
            Bytes::copy_from_slice(slice)
        };
        self.pool.write_page_to_disk(page_id, bytes)?;
        let mut meta = self.pool.frame_meta(frame_id);
        meta.is_dirty = false;
        self.dirty_pages.remove(&page_id);
        self.dirty_page_table.remove(&page_id);
        Ok(true)
    }

    pub fn flush_all_pages(&self) -> QuillSQLResult<()> {
        // Flush the whole WAL once up front instead of once per page.
        if let Some(wal) = self.wal_manager.read().clone() {
            wal.flush(None)?;
        }
        let dirty_ids: Vec<PageId> = self.dirty_pages.iter().map(|entry| *entry.key()).collect();
        for page_id in dirty_ids {
            let _ = self.flush_page(page_id)?;
        }
        Ok(())
    }

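    /// Deletes a page, serializing with in-flight loads of the same page via
    /// the per-page mutex so a concurrent `ensure_frame` cannot resurrect it
    /// mid-delete.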
    pub fn delete_page(&self, page_id: PageId) -> QuillSQLResult<bool> {
        // `entry` makes the lookup-or-insert atomic; the get-then-insert it
        // replaces could race and hand two threads two different mutexes.
        let guard = self
            .inflight_loads
            .entry(page_id)
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .value()
            .clone();
        let lock = guard.lock();
        let result = self.delete_page_inner(page_id);
        drop(lock);
        self.inflight_loads.remove(&page_id);
        result
    }

    fn delete_page_inner(&self, page_id: PageId) -> QuillSQLResult<bool> {
        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
            // Refuse to delete while any reader or writer holds the frame latch.
            let lock = self.pool.frame_lock(frame_id);
            let guard = match lock.try_write() {
                Some(g) => g,
                None => return Ok(false),
            };
            drop(guard);
            let meta = self.pool.frame_meta(frame_id);
            if meta.page_id != page_id {
                // Stale mapping; clear it and retry.
                drop(meta);
                self.pool.remove_mapping_if(page_id, frame_id);
                return self.delete_page_inner(page_id);
            }
            if meta.pin_count > 0 {
                drop(meta);
                return Ok(false);
            }
            if !self.pool.remove_mapping_if(page_id, frame_id) {
                drop(meta);
                return self.delete_page_inner(page_id);
            }
            drop(meta);
            self.pool.reset_frame(frame_id);
            self.dirty_pages.remove(&page_id);
            self.dirty_page_table.remove(&page_id);
            {
                let mut meta = self.pool.frame_meta(frame_id);
                *meta = FrameMeta::default();
            }
            {
                let mut rep = self.replacer.write();
                let _ = rep.set_evictable(frame_id, true);
                rep.remove(frame_id);
            }
            self.pool.push_free_frame(frame_id);
            self.pool
                .disk_scheduler()
                .schedule_deallocate(page_id)?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
            Ok(true)
        } else {
            // Not resident: just deallocate on disk.
            self.pool
                .disk_scheduler()
                .schedule_deallocate(page_id)?
                .recv()
                .map_err(|e| QuillSQLError::Internal(format!("Channel disconnected: {}", e)))??;
            Ok(true)
        }
    }

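    /// Resolves `page_id` to a resident frame, loading from disk at most once
    /// per page at a time (single-flight). Fast path: already mapped. Slow
    /// path: take the per-page mutex, re-check the mapping, consult TinyLFU
    /// admission, then load into a newly allocated frame.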
    fn ensure_frame(&self, page_id: PageId) -> QuillSQLResult<FrameId> {
        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
            self.replacer_record_access(frame_id)?;
            if let Some(lfu) = &self.tiny_lfu {
                lfu.write().admit_record(page_id as u64);
            }
            return Ok(frame_id);
        }

        // Atomic lookup-or-insert of the per-page load mutex (see delete_page).
        let guard = self
            .inflight_loads
            .entry(page_id)
            .or_insert_with(|| Arc::new(Mutex::new(())))
            .value()
            .clone();
        let _lock = guard.lock();

        // Another thread may have loaded the page while we waited on the mutex.
        if let Some(frame_id) = self.pool.lookup_frame(page_id) {
            self.inflight_loads.remove(&page_id);
            self.replacer_record_access(frame_id)?;
            if let Some(lfu) = &self.tiny_lfu {
                lfu.write().admit_record(page_id as u64);
            }
            return Ok(frame_id);
        }

        // TinyLFU admission: a cold page may not displace warmer residents
        // when the pool has neither free frames nor eviction candidates.
        if let Some(lfu) = &self.tiny_lfu {
            let estimate = lfu.read().estimate(page_id as u64);
            if estimate == 0 && !self.pool.has_free_frame() && self.replacer.read().size() == 0 {
                self.inflight_loads.remove(&page_id);
                return Err(QuillSQLError::Storage(
                    "Cannot allocate frame: admission denied and no space".to_string(),
                ));
            }
        }

        let frame_id = self.allocate_frame()?;
        self.pool.load_page_into_frame(page_id, frame_id)?;
        self.pool.insert_mapping(page_id, frame_id);
        {
            let mut meta = self.pool.frame_meta(frame_id);
            meta.page_id = page_id;
            meta.pin_count = 0;
            meta.is_dirty = false;
            meta.lsn = 0;
        }
        if let Some(lfu) = &self.tiny_lfu {
            lfu.write().admit_record(page_id as u64);
        }
        self.inflight_loads.remove(&page_id);
        self.replacer_record_access(frame_id)?;
        Ok(frame_id)
    }

    fn allocate_frame(&self) -> QuillSQLResult<FrameId> {
        if let Some(frame_id) = self.pool.pop_free_frame() {
            return Ok(frame_id);
        }
        self.evict_victim_frame()
    }

    fn replacer_record_access(&self, frame_id: FrameId) -> QuillSQLResult<()> {
        let mut rep = self.replacer.write();
        let _ = rep.record_access(frame_id);
        Ok(())
    }

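    /// Loops until the replacer yields an unpinned victim. Dirty victims are
    /// written back (WAL first) before the frame is recycled.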
    fn evict_victim_frame(&self) -> QuillSQLResult<FrameId> {
        loop {
            let victim = {
                let mut rep = self.replacer.write();
                match rep.evict() {
                    Some(frame_id) => frame_id,
                    None => {
                        return Err(QuillSQLError::Storage(
                            "Cannot allocate frame: buffer pool is full".to_string(),
                        ))
                    }
                }
            };

            let (page_id, pin_count, is_dirty, lsn) = {
                let meta = self.pool.frame_meta(victim);
                (meta.page_id, meta.pin_count, meta.is_dirty, meta.lsn)
            };

            // The replacer can race with a fresh pin; re-arm the frame and retry.
            if pin_count > 0 {
                let mut rep = self.replacer.write();
                let _ = rep.record_access(victim);
                let _ = rep.set_evictable(victim, false);
                continue;
            }

            if page_id != INVALID_PAGE_ID {
                if is_dirty {
                    // WAL-before-data, then write back the victim's contents.
                    self.ensure_wal_durable(lsn)?;
                    let bytes = Bytes::copy_from_slice(unsafe { self.pool.frame_slice(victim) });
                    self.pool.write_page_to_disk(page_id, bytes)?;
                    self.dirty_pages.remove(&page_id);
                    self.dirty_page_table.remove(&page_id);
                }
                self.pool.remove_mapping(page_id);
            }

            self.pool.clear_frame_meta(victim);
            self.pool.reset_frame(victim);
            return Ok(victim);
        }
    }

    fn mark_evictable(&self, frame_id: FrameId) -> QuillSQLResult<()> {
        let mut rep = self.replacer.write();
        let _ = rep.set_evictable(frame_id, true);
        Ok(())
    }

    fn mark_non_evictable(&self, frame_id: FrameId) -> QuillSQLResult<()> {
        let mut rep = self.replacer.write();
        let _ = rep.set_evictable(frame_id, false);
        Ok(())
    }

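    /// Blocks until the WAL is durable at least up to `lsn` (LSN 0 means the
    /// page carries no logged updates and can be written immediately).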
    fn ensure_wal_durable(&self, lsn: Lsn) -> QuillSQLResult<()> {
        if lsn == 0 {
            return Ok(());
        }
        if let Some(wal) = self.wal_manager.read().clone() {
            if lsn > wal.durable_lsn() {
                wal.flush(Some(lsn))?;
                if wal.durable_lsn() < lsn {
                    return Err(QuillSQLError::Internal(format!(
                        "Flush blocked: page_lsn={} > durable_lsn={}",
                        lsn,
                        wal.durable_lsn()
                    )));
                }
            }
        }
        Ok(())
    }

    pub fn clone_arc(self: &Arc<Self>) -> Arc<Self> {
        self.clone()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::disk_manager::DiskManager;
    use crate::storage::disk_scheduler::DiskScheduler;
    use std::sync::{Arc, Barrier};
    use std::thread;
    use tempfile::TempDir;

    fn setup_manager(num_pages: usize) -> (TempDir, Arc<BufferManager>) {
        let temp_dir = TempDir::new().unwrap();
        let db_file = temp_dir.path().join("test.db");
        let disk_manager = Arc::new(DiskManager::try_new(db_file).unwrap());
        let disk_scheduler = Arc::new(DiskScheduler::new(disk_manager));
        let manager = Arc::new(BufferManager::new(num_pages, disk_scheduler));
        (temp_dir, manager)
    }

    #[test]
    fn new_page_initializes_frame() {
        let (_tmp, manager) = setup_manager(2);
        let guard = manager.new_page().unwrap();
        let page_id = guard.page_id();
        let frame_id = guard.frame_id();

        assert!(guard.data().iter().all(|b| *b == 0));
        assert!(!guard.is_dirty());
        assert_eq!(guard.lsn(), 0);

        drop(guard);

        let meta = manager.buffer_pool().frame_meta(frame_id).clone();
        assert_eq!(meta.page_id, page_id);
        assert_eq!(meta.pin_count, 0);
        assert!(!meta.is_dirty);
    }

    #[test]
    fn fetch_page_read_increments_pin_and_resets_on_drop() {
        let (_tmp, manager) = setup_manager(2);
        let guard = manager.new_page().unwrap();
        let page_id = guard.page_id();
        let frame_id = guard.frame_id();
        drop(guard);

        {
            let read_guard = manager.fetch_page_read(page_id).unwrap();
            assert_eq!(read_guard.pin_count(), 1);
            assert_eq!(read_guard.frame_id(), frame_id);
        }

        let meta = manager.buffer_pool().frame_meta(frame_id).clone();
        assert_eq!(meta.pin_count, 0);
    }

    #[test]
    fn fetch_page_write_marks_dirty_and_tracks_lsn() {
        let (_tmp, manager) = setup_manager(2);
        let mut guard = manager.new_page().unwrap();
        let page_id = guard.page_id();
        guard.data_mut()[0] = 7;
        guard.set_lsn(99);
        guard.mark_dirty();
        drop(guard);

        let mut write_guard = manager.fetch_page_write(page_id).unwrap();
        assert!(write_guard.is_dirty());
        assert_eq!(write_guard.lsn(), 99);
        write_guard.data_mut()[1] = 8;
        drop(write_guard);

        let meta = manager
            .buffer_pool()
            .frame_meta(manager.buffer_pool().lookup_frame(page_id).unwrap())
            .clone();
        assert!(meta.is_dirty);
        assert_eq!(meta.lsn, 99);
        assert_eq!(meta.pin_count, 0);
    }

    #[test]
    fn flush_page_writes_back_and_clears_dirty_flag() {
        let (_tmp, manager) = setup_manager(2);
        let mut guard = manager.new_page().unwrap();
        let page_id = guard.page_id();
        guard.data_mut()[0] = 42;
        guard.set_lsn(123);
        guard.mark_dirty();
        drop(guard);

        assert!(manager.flush_page(page_id).unwrap());

        let meta = manager
            .buffer_pool()
            .frame_meta(manager.buffer_pool().lookup_frame(page_id).unwrap())
            .clone();
        assert!(!meta.is_dirty);
    }

    #[test]
    fn delete_page_releases_frame() {
        let (_tmp, manager) = setup_manager(2);
        let page_id = {
            let guard = manager.new_page().unwrap();
            guard.page_id()
        };

        assert!(manager.delete_page(page_id).unwrap());
        assert!(manager.buffer_pool().lookup_frame(page_id).is_none());
        assert!(manager.buffer_pool().has_free_frame());

        let new_guard = manager.new_page().unwrap();
        assert!(new_guard.frame_id() < manager.buffer_pool().capacity());
    }

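    // Hammers one page with shared reads from many threads and checks that
    // every pin taken by a guard is released on drop.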
    #[test]
    fn concurrent_reads_do_not_leak_pins() {
        const THREADS: usize = 8;
        let (_tmp, manager) = setup_manager(4);
        let (page_id, frame_id) = {
            let mut guard = manager.new_page().unwrap();
            guard.data_mut()[0] = 42;
            (guard.page_id(), guard.frame_id())
        };

        let barrier = Arc::new(Barrier::new(THREADS));
        let mut handles = Vec::with_capacity(THREADS);
        for _ in 0..THREADS {
            let mgr = manager.clone();
            let barrier = barrier.clone();
            handles.push(thread::spawn(move || {
                barrier.wait();
                for _ in 0..50 {
                    let guard = mgr.fetch_page_read(page_id).expect("read page");
                    assert_eq!(guard.data()[0], 42);
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let pool = manager.buffer_pool();
        let meta = pool.frame_meta(frame_id);
        assert_eq!(meta.pin_count, 0);
        assert_eq!(meta.page_id, page_id);
    }

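    // Interleaves exclusive writers on one page, then verifies the dirty flag,
    // a successful flush, and that the bytes survive the round trip to disk.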
    #[test]
    fn concurrent_writes_mark_dirty_and_flush_once() {
        const THREADS: usize = 4;
        let (_tmp, manager) = setup_manager(4);
        let (page_id, frame_id) = {
            let guard = manager.new_page().unwrap();
            (guard.page_id(), guard.frame_id())
        };

        let barrier = Arc::new(Barrier::new(THREADS));
        let mut handles = Vec::with_capacity(THREADS);
        for tid in 0..THREADS {
            let mgr = manager.clone();
            let barrier = barrier.clone();
            handles.push(thread::spawn(move || {
                let lsn = (tid as Lsn) + 1;
                barrier.wait();
                for _ in 0..25 {
                    let mut guard = mgr.fetch_page_write(page_id).expect("write guard");
                    guard.data_mut()[tid] = (tid as u8) + 1;
                    guard.set_lsn(lsn);
                    guard.mark_dirty();
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        {
            let pool = manager.buffer_pool();
            let meta = pool.frame_meta(frame_id);
            assert!(meta.is_dirty);
            assert_eq!(meta.pin_count, 0);
            assert_eq!(meta.page_id, page_id);
        }

        assert!(manager.flush_page(page_id).unwrap());
        {
            let pool = manager.buffer_pool();
            let meta = pool.frame_meta(frame_id);
            assert!(!meta.is_dirty);
            assert_eq!(meta.pin_count, 0);
        }

        let read_back = manager
            .buffer_pool()
            .disk_scheduler()
            .schedule_read(page_id)
            .unwrap()
            .recv()
            .unwrap()
            .unwrap();
        for tid in 0..THREADS {
            assert_eq!(read_back[tid], (tid as u8) + 1);
        }
    }
}
712}