quill_sql/recovery/
resource_manager.rs1use std::collections::HashMap;
2use std::sync::{Arc, RwLock};
3
4use once_cell::sync::Lazy;
5use std::sync::OnceLock;
6
7use crate::buffer::BufferManager;
8use crate::error::{QuillSQLError, QuillSQLResult};
9use crate::recovery::wal::codec::{decode_page_write, ResourceManagerId, WalFrame};
10use crate::recovery::Lsn;
11use crate::storage::disk_scheduler::DiskScheduler;
12
13#[derive(Clone)]
14pub struct RedoContext {
15 pub disk_scheduler: Arc<DiskScheduler>,
16 pub buffer_pool: Option<Arc<BufferManager>>,
17}
18
19#[derive(Clone)]
20pub struct UndoContext {
21 pub disk_scheduler: Arc<DiskScheduler>,
22 pub buffer_pool: Option<Arc<BufferManager>>,
23}
24
25pub trait ResourceManager: Send + Sync {
26 fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize>;
27 fn undo(&self, frame: &WalFrame, ctx: &UndoContext) -> QuillSQLResult<()>;
28
29 fn transaction_id(&self, _frame: &WalFrame) -> Option<u64> {
30 None
31 }
32}
33
34static REGISTRY: Lazy<RwLock<HashMap<ResourceManagerId, Arc<dyn ResourceManager>>>> =
35 Lazy::new(|| RwLock::new(HashMap::new()));
36
37pub fn register_resource_manager(id: ResourceManagerId, manager: Arc<dyn ResourceManager>) {
38 let mut guard = REGISTRY
39 .write()
40 .expect("resource manager registry poisoned");
41 guard.insert(id, manager);
42}
43
44pub fn get_resource_manager(id: ResourceManagerId) -> Option<Arc<dyn ResourceManager>> {
45 let guard = REGISTRY.read().expect("resource manager registry poisoned");
46 guard.get(&id).cloned()
47}
48
49#[derive(Default)]
50struct PageResourceManager;
51
52impl PageResourceManager {
53 fn page_requires_redo(
54 &self,
55 ctx: &RedoContext,
56 page_id: u32,
57 record_lsn: Lsn,
58 ) -> QuillSQLResult<bool> {
59 if let Some(bpm) = &ctx.buffer_pool {
60 match bpm.fetch_page_read(page_id) {
61 Ok(guard) => Ok(guard.lsn() < record_lsn),
62 Err(_) => Ok(true),
63 }
64 } else {
65 Ok(true)
66 }
67 }
68
69 fn redo_page_write(
70 &self,
71 ctx: &RedoContext,
72 payload: crate::recovery::wal::codec::PageWritePayload,
73 ) -> QuillSQLResult<()> {
74 debug_assert_eq!(payload.page_image.len(), crate::buffer::PAGE_SIZE);
75 let bytes = bytes::Bytes::from(payload.page_image);
76 let rx = ctx.disk_scheduler.schedule_write(payload.page_id, bytes)?;
77 rx.recv().map_err(|e| {
78 QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
79 })??;
80 Ok(())
81 }
82}
83
84impl ResourceManager for PageResourceManager {
85 fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize> {
86 if frame.info != 0 {
87 return Err(QuillSQLError::Internal(format!(
88 "Unknown Page info kind: {}",
89 frame.info
90 )));
91 }
92 let payload = decode_page_write(&frame.body)?;
93 if !self.page_requires_redo(ctx, payload.page_id, frame.lsn)? {
94 return Ok(0);
95 }
96 self.redo_page_write(ctx, payload)?;
97 Ok(1)
98 }
99
100 fn undo(&self, _frame: &WalFrame, _ctx: &UndoContext) -> QuillSQLResult<()> {
101 Ok(())
102 }
103}
104
105static DEFAULT_RESOURCE_MANAGERS: OnceLock<()> = OnceLock::new();
106
107pub fn ensure_default_resource_managers_registered() {
108 DEFAULT_RESOURCE_MANAGERS.get_or_init(|| {
109 register_resource_manager(
110 ResourceManagerId::Page,
111 Arc::new(PageResourceManager::default()),
112 );
113 crate::storage::heap_recovery::ensure_heap_resource_manager_registered();
114 crate::storage::index::index_recovery::ensure_index_resource_manager_registered();
115 });
116}