quill_sql/recovery/
resource_manager.rs1use std::collections::HashMap;
2use std::sync::{Arc, RwLock};
3
4use once_cell::sync::Lazy;
5use std::sync::OnceLock;
6
7use crate::buffer::BufferManager;
8use crate::error::{QuillSQLError, QuillSQLResult};
9use crate::recovery::wal::codec::{
10 decode_page_delta, decode_page_write, ResourceManagerId, WalFrame,
11};
12use crate::recovery::Lsn;
13use crate::storage::disk_scheduler::DiskScheduler;
14
15#[derive(Clone)]
16pub struct RedoContext {
17 pub disk_scheduler: Arc<DiskScheduler>,
18 pub buffer_pool: Option<Arc<BufferManager>>,
19}
20
21#[derive(Clone)]
22pub struct UndoContext {
23 pub disk_scheduler: Arc<DiskScheduler>,
24 pub buffer_pool: Option<Arc<BufferManager>>,
25}
26
27pub trait ResourceManager: Send + Sync {
28 fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize>;
29 fn undo(&self, frame: &WalFrame, ctx: &UndoContext) -> QuillSQLResult<()>;
30
31 fn transaction_id(&self, _frame: &WalFrame) -> Option<u64> {
32 None
33 }
34}
35
36static REGISTRY: Lazy<RwLock<HashMap<ResourceManagerId, Arc<dyn ResourceManager>>>> =
37 Lazy::new(|| RwLock::new(HashMap::new()));
38
39pub fn register_resource_manager(id: ResourceManagerId, manager: Arc<dyn ResourceManager>) {
40 let mut guard = REGISTRY
41 .write()
42 .expect("resource manager registry poisoned");
43 guard.insert(id, manager);
44}
45
46pub fn get_resource_manager(id: ResourceManagerId) -> Option<Arc<dyn ResourceManager>> {
47 let guard = REGISTRY.read().expect("resource manager registry poisoned");
48 guard.get(&id).cloned()
49}
50
51#[derive(Default)]
52struct PageResourceManager;
53
54impl PageResourceManager {
55 fn page_requires_redo(
56 &self,
57 ctx: &RedoContext,
58 page_id: u32,
59 record_lsn: Lsn,
60 ) -> QuillSQLResult<bool> {
61 if let Some(bpm) = &ctx.buffer_pool {
62 match bpm.fetch_page_read(page_id) {
63 Ok(guard) => Ok(guard.lsn() < record_lsn),
64 Err(_) => Ok(true),
65 }
66 } else {
67 Ok(true)
68 }
69 }
70
71 fn redo_page_write(
72 &self,
73 ctx: &RedoContext,
74 payload: crate::recovery::wal::codec::PageWritePayload,
75 ) -> QuillSQLResult<()> {
76 debug_assert_eq!(payload.page_image.len(), crate::buffer::PAGE_SIZE);
77 let bytes = bytes::Bytes::from(payload.page_image);
78 let rx = ctx.disk_scheduler.schedule_write(payload.page_id, bytes)?;
79 rx.recv().map_err(|e| {
80 QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
81 })??;
82 Ok(())
83 }
84
85 fn redo_page_delta(
86 &self,
87 ctx: &RedoContext,
88 payload: crate::recovery::wal::codec::PageDeltaPayload,
89 ) -> QuillSQLResult<()> {
90 let rx = ctx.disk_scheduler.schedule_read(payload.page_id)?;
91 let buf: bytes::BytesMut = rx.recv().map_err(|e| {
92 QuillSQLError::Internal(format!("WAL recovery read recv failed: {}", e))
93 })??;
94 if buf.len() != crate::buffer::PAGE_SIZE {
95 return Err(QuillSQLError::Internal(format!(
96 "Unexpected page size {} while applying delta",
97 buf.len()
98 )));
99 }
100 let mut page_bytes = buf.to_vec();
101 let start = payload.offset as usize;
102 if start >= crate::buffer::PAGE_SIZE {
103 return Err(QuillSQLError::Internal(format!(
104 "PageDelta start out of bounds: offset={} page_size={}",
105 start,
106 crate::buffer::PAGE_SIZE
107 )));
108 }
109 let end = match start.checked_add(payload.data.len()) {
110 Some(e) if e <= crate::buffer::PAGE_SIZE => e,
111 _ => {
112 return Err(QuillSQLError::Internal(format!(
113 "PageDelta out of bounds: offset={} len={} page_size={}",
114 start,
115 payload.data.len(),
116 crate::buffer::PAGE_SIZE
117 )))
118 }
119 };
120 page_bytes[start..end].copy_from_slice(&payload.data);
121 let rxw = ctx
122 .disk_scheduler
123 .schedule_write(payload.page_id, bytes::Bytes::from(page_bytes))?;
124 rxw.recv().map_err(|e| {
125 QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
126 })??;
127 Ok(())
128 }
129}
130
131impl ResourceManager for PageResourceManager {
132 fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize> {
133 match frame.info {
134 0 => {
135 let payload = decode_page_write(&frame.body)?;
136 if !self.page_requires_redo(ctx, payload.page_id, frame.lsn)? {
137 return Ok(0);
138 }
139 self.redo_page_write(ctx, payload)?;
140 Ok(1)
141 }
142 1 => {
143 let payload = decode_page_delta(&frame.body)?;
144 if !self.page_requires_redo(ctx, payload.page_id, frame.lsn)? {
145 return Ok(0);
146 }
147 self.redo_page_delta(ctx, payload)?;
148 Ok(1)
149 }
150 other => Err(QuillSQLError::Internal(format!(
151 "Unknown Page info kind: {}",
152 other
153 ))),
154 }
155 }
156
157 fn undo(&self, _frame: &WalFrame, _ctx: &UndoContext) -> QuillSQLResult<()> {
158 Ok(())
159 }
160}
161
162static DEFAULT_RESOURCE_MANAGERS: OnceLock<()> = OnceLock::new();
163
164pub fn ensure_default_resource_managers_registered() {
165 DEFAULT_RESOURCE_MANAGERS.get_or_init(|| {
166 register_resource_manager(
167 ResourceManagerId::Page,
168 Arc::new(PageResourceManager::default()),
169 );
170 crate::storage::heap_recovery::ensure_heap_resource_manager_registered();
171 });
172}