quill_sql/recovery/
resource_manager.rs

1use std::collections::HashMap;
2use std::sync::{Arc, RwLock};
3
4use once_cell::sync::Lazy;
5use std::sync::OnceLock;
6
7use crate::buffer::BufferManager;
8use crate::error::{QuillSQLError, QuillSQLResult};
9use crate::recovery::wal::codec::{
10    decode_page_delta, decode_page_write, ResourceManagerId, WalFrame,
11};
12use crate::recovery::Lsn;
13use crate::storage::disk_scheduler::DiskScheduler;
14
15#[derive(Clone)]
16pub struct RedoContext {
17    pub disk_scheduler: Arc<DiskScheduler>,
18    pub buffer_pool: Option<Arc<BufferManager>>,
19}
20
21#[derive(Clone)]
22pub struct UndoContext {
23    pub disk_scheduler: Arc<DiskScheduler>,
24    pub buffer_pool: Option<Arc<BufferManager>>,
25}
26
27pub trait ResourceManager: Send + Sync {
28    fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize>;
29    fn undo(&self, frame: &WalFrame, ctx: &UndoContext) -> QuillSQLResult<()>;
30
31    fn transaction_id(&self, _frame: &WalFrame) -> Option<u64> {
32        None
33    }
34}
35
36static REGISTRY: Lazy<RwLock<HashMap<ResourceManagerId, Arc<dyn ResourceManager>>>> =
37    Lazy::new(|| RwLock::new(HashMap::new()));
38
39pub fn register_resource_manager(id: ResourceManagerId, manager: Arc<dyn ResourceManager>) {
40    let mut guard = REGISTRY
41        .write()
42        .expect("resource manager registry poisoned");
43    guard.insert(id, manager);
44}
45
46pub fn get_resource_manager(id: ResourceManagerId) -> Option<Arc<dyn ResourceManager>> {
47    let guard = REGISTRY.read().expect("resource manager registry poisoned");
48    guard.get(&id).cloned()
49}
50
51#[derive(Default)]
52struct PageResourceManager;
53
54impl PageResourceManager {
55    fn page_requires_redo(
56        &self,
57        ctx: &RedoContext,
58        page_id: u32,
59        record_lsn: Lsn,
60    ) -> QuillSQLResult<bool> {
61        if let Some(bpm) = &ctx.buffer_pool {
62            match bpm.fetch_page_read(page_id) {
63                Ok(guard) => Ok(guard.lsn() < record_lsn),
64                Err(_) => Ok(true),
65            }
66        } else {
67            Ok(true)
68        }
69    }
70
71    fn redo_page_write(
72        &self,
73        ctx: &RedoContext,
74        payload: crate::recovery::wal::codec::PageWritePayload,
75    ) -> QuillSQLResult<()> {
76        debug_assert_eq!(payload.page_image.len(), crate::buffer::PAGE_SIZE);
77        let bytes = bytes::Bytes::from(payload.page_image);
78        let rx = ctx.disk_scheduler.schedule_write(payload.page_id, bytes)?;
79        rx.recv().map_err(|e| {
80            QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
81        })??;
82        Ok(())
83    }
84
85    fn redo_page_delta(
86        &self,
87        ctx: &RedoContext,
88        payload: crate::recovery::wal::codec::PageDeltaPayload,
89    ) -> QuillSQLResult<()> {
90        let rx = ctx.disk_scheduler.schedule_read(payload.page_id)?;
91        let buf: bytes::BytesMut = rx.recv().map_err(|e| {
92            QuillSQLError::Internal(format!("WAL recovery read recv failed: {}", e))
93        })??;
94        if buf.len() != crate::buffer::PAGE_SIZE {
95            return Err(QuillSQLError::Internal(format!(
96                "Unexpected page size {} while applying delta",
97                buf.len()
98            )));
99        }
100        let mut page_bytes = buf.to_vec();
101        let start = payload.offset as usize;
102        if start >= crate::buffer::PAGE_SIZE {
103            return Err(QuillSQLError::Internal(format!(
104                "PageDelta start out of bounds: offset={} page_size={}",
105                start,
106                crate::buffer::PAGE_SIZE
107            )));
108        }
109        let end = match start.checked_add(payload.data.len()) {
110            Some(e) if e <= crate::buffer::PAGE_SIZE => e,
111            _ => {
112                return Err(QuillSQLError::Internal(format!(
113                    "PageDelta out of bounds: offset={} len={} page_size={}",
114                    start,
115                    payload.data.len(),
116                    crate::buffer::PAGE_SIZE
117                )))
118            }
119        };
120        page_bytes[start..end].copy_from_slice(&payload.data);
121        let rxw = ctx
122            .disk_scheduler
123            .schedule_write(payload.page_id, bytes::Bytes::from(page_bytes))?;
124        rxw.recv().map_err(|e| {
125            QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
126        })??;
127        Ok(())
128    }
129}
130
131impl ResourceManager for PageResourceManager {
132    fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize> {
133        match frame.info {
134            0 => {
135                let payload = decode_page_write(&frame.body)?;
136                if !self.page_requires_redo(ctx, payload.page_id, frame.lsn)? {
137                    return Ok(0);
138                }
139                self.redo_page_write(ctx, payload)?;
140                Ok(1)
141            }
142            1 => {
143                let payload = decode_page_delta(&frame.body)?;
144                if !self.page_requires_redo(ctx, payload.page_id, frame.lsn)? {
145                    return Ok(0);
146                }
147                self.redo_page_delta(ctx, payload)?;
148                Ok(1)
149            }
150            other => Err(QuillSQLError::Internal(format!(
151                "Unknown Page info kind: {}",
152                other
153            ))),
154        }
155    }
156
157    fn undo(&self, _frame: &WalFrame, _ctx: &UndoContext) -> QuillSQLResult<()> {
158        Ok(())
159    }
160}
161
162static DEFAULT_RESOURCE_MANAGERS: OnceLock<()> = OnceLock::new();
163
164pub fn ensure_default_resource_managers_registered() {
165    DEFAULT_RESOURCE_MANAGERS.get_or_init(|| {
166        register_resource_manager(
167            ResourceManagerId::Page,
168            Arc::new(PageResourceManager::default()),
169        );
170        crate::storage::heap_recovery::ensure_heap_resource_manager_registered();
171    });
172}