quill_sql/recovery/
resource_manager.rs

1use std::collections::HashMap;
2use std::sync::{Arc, RwLock};
3
4use once_cell::sync::Lazy;
5use std::sync::OnceLock;
6
7use crate::buffer::BufferManager;
8use crate::error::{QuillSQLError, QuillSQLResult};
9use crate::recovery::wal::codec::{decode_page_write, ResourceManagerId, WalFrame};
10use crate::recovery::Lsn;
11use crate::storage::disk_scheduler::DiskScheduler;
12
13#[derive(Clone)]
14pub struct RedoContext {
15    pub disk_scheduler: Arc<DiskScheduler>,
16    pub buffer_pool: Option<Arc<BufferManager>>,
17}
18
19#[derive(Clone)]
20pub struct UndoContext {
21    pub disk_scheduler: Arc<DiskScheduler>,
22    pub buffer_pool: Option<Arc<BufferManager>>,
23}
24
25pub trait ResourceManager: Send + Sync {
26    fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize>;
27    fn undo(&self, frame: &WalFrame, ctx: &UndoContext) -> QuillSQLResult<()>;
28
29    fn transaction_id(&self, _frame: &WalFrame) -> Option<u64> {
30        None
31    }
32}
33
34static REGISTRY: Lazy<RwLock<HashMap<ResourceManagerId, Arc<dyn ResourceManager>>>> =
35    Lazy::new(|| RwLock::new(HashMap::new()));
36
37pub fn register_resource_manager(id: ResourceManagerId, manager: Arc<dyn ResourceManager>) {
38    let mut guard = REGISTRY
39        .write()
40        .expect("resource manager registry poisoned");
41    guard.insert(id, manager);
42}
43
44pub fn get_resource_manager(id: ResourceManagerId) -> Option<Arc<dyn ResourceManager>> {
45    let guard = REGISTRY.read().expect("resource manager registry poisoned");
46    guard.get(&id).cloned()
47}
48
/// Stateless resource manager for full-page-image WAL records.
///
/// Redo rewrites the logged page image through the disk scheduler; undo is a no-op.
#[derive(Default)]
struct PageResourceManager;
51
52impl PageResourceManager {
53    fn page_requires_redo(
54        &self,
55        ctx: &RedoContext,
56        page_id: u32,
57        record_lsn: Lsn,
58    ) -> QuillSQLResult<bool> {
59        if let Some(bpm) = &ctx.buffer_pool {
60            match bpm.fetch_page_read(page_id) {
61                Ok(guard) => Ok(guard.lsn() < record_lsn),
62                Err(_) => Ok(true),
63            }
64        } else {
65            Ok(true)
66        }
67    }
68
69    fn redo_page_write(
70        &self,
71        ctx: &RedoContext,
72        payload: crate::recovery::wal::codec::PageWritePayload,
73    ) -> QuillSQLResult<()> {
74        debug_assert_eq!(payload.page_image.len(), crate::buffer::PAGE_SIZE);
75        let bytes = bytes::Bytes::from(payload.page_image);
76        let rx = ctx.disk_scheduler.schedule_write(payload.page_id, bytes)?;
77        rx.recv().map_err(|e| {
78            QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
79        })??;
80        Ok(())
81    }
82}
83
84impl ResourceManager for PageResourceManager {
85    fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize> {
86        if frame.info != 0 {
87            return Err(QuillSQLError::Internal(format!(
88                "Unknown Page info kind: {}",
89                frame.info
90            )));
91        }
92        let payload = decode_page_write(&frame.body)?;
93        if !self.page_requires_redo(ctx, payload.page_id, frame.lsn)? {
94            return Ok(0);
95        }
96        self.redo_page_write(ctx, payload)?;
97        Ok(1)
98    }
99
100    fn undo(&self, _frame: &WalFrame, _ctx: &UndoContext) -> QuillSQLResult<()> {
101        Ok(())
102    }
103}
104
105static DEFAULT_RESOURCE_MANAGERS: OnceLock<()> = OnceLock::new();
106
107pub fn ensure_default_resource_managers_registered() {
108    DEFAULT_RESOURCE_MANAGERS.get_or_init(|| {
109        register_resource_manager(
110            ResourceManagerId::Page,
111            Arc::new(PageResourceManager::default()),
112        );
113        crate::storage::heap_recovery::ensure_heap_resource_manager_registered();
114        crate::storage::index::index_recovery::ensure_index_resource_manager_registered();
115    });
116}