quill_sql/storage/heap_recovery.rs

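//! Heap resource manager for WAL recovery: decodes heap WAL frames and undoes
//! their effects (insert/update/delete) against table pages. Registered under
//! `ResourceManagerId::Heap` via `ensure_heap_resource_manager_registered`.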
use std::sync::{Arc, OnceLock};

use bytes::{Bytes, BytesMut};

use crate::buffer::PAGE_SIZE;
use crate::error::QuillSQLResult;
use crate::recovery::resource_manager::{
    register_resource_manager, RedoContext, ResourceManager, UndoContext,
};
use crate::recovery::wal::codec::{ResourceManagerId, WalFrame};
use crate::storage::codec::TablePageHeaderCodec;
use crate::storage::heap::wal_codec::{decode_heap_record, HeapRecordPayload, TupleMetaRepr};
use crate::storage::page::{RecordId, TupleMeta};
use crate::storage::table_heap::TableHeap;

/// Resource manager for heap WAL records: undo reverses logged inserts,
/// updates, and deletes; redo is a no-op for this manager.
#[derive(Default)]
struct HeapResourceManager;

impl HeapResourceManager {
    fn decode_payload(&self, frame: &WalFrame) -> QuillSQLResult<HeapRecordPayload> {
        decode_heap_record(&frame.body, frame.info)
    }

    /// Transaction id recorded in the heap payload, regardless of record kind.
    fn heap_txn_id(payload: &HeapRecordPayload) -> u64 {
        match payload {
            HeapRecordPayload::Insert(p) => p.op_txn_id,
            HeapRecordPayload::Update(p) => p.op_txn_id,
            HeapRecordPayload::Delete(p) => p.op_txn_id,
        }
    }

    /// Sets or clears the `is_deleted` flag on a slot's metadata. Requires the
    /// buffer pool from the undo context; without it this is a no-op.
    fn apply_tuple_meta_flag(
        &self,
        ctx: &UndoContext,
        page_id: u32,
        slot_idx: usize,
        deleted: bool,
    ) -> QuillSQLResult<()> {
        if let Some(bpm) = &ctx.buffer_pool {
            let rid = RecordId::new(page_id, slot_idx as u32);
            let guard = bpm.fetch_page_read(page_id)?;
            let (header, _hdr_len) = TablePageHeaderCodec::decode(guard.data())?;
            drop(guard);
            if slot_idx >= header.tuple_infos.len() {
                return Ok(());
            }
            let mut new_meta = header.tuple_infos[slot_idx].meta;
            new_meta.is_deleted = deleted;
            // Best effort during recovery: meta update and flush failures are ignored.
            let heap = TableHeap::recovery_view(bpm.clone());
            let _ = heap.recover_set_tuple_meta(rid, new_meta);
            let _ = bpm.flush_page(page_id);
            return Ok(());
        }
        Ok(())
    }

    /// Restores a slot to its pre-image: old tuple bytes plus old metadata.
    /// Uses the buffer pool when available; otherwise rebuilds the raw page
    /// image and writes it back through the disk scheduler.
    fn restore_tuple(
        &self,
        ctx: &UndoContext,
        page_id: u32,
        slot_idx: usize,
        old_meta: TupleMetaRepr,
        old_bytes: &[u8],
    ) -> QuillSQLResult<()> {
        if let Some(bpm) = &ctx.buffer_pool {
            let heap = TableHeap::recovery_view(bpm.clone());
            let rid = RecordId::new(page_id, slot_idx as u32);
            let _ = heap.recover_set_tuple_bytes(rid, old_bytes);
            let restored_meta: TupleMeta = old_meta.into();
            let _ = heap.recover_set_tuple_meta(rid, restored_meta);
            let _ = bpm.flush_page(page_id);
            return Ok(());
        }

        // No buffer pool available: read the raw page, patch it, and write it back.
        let rx = ctx.disk_scheduler.schedule_read(page_id)?;
        let buf: BytesMut = rx.recv().map_err(|e| {
            crate::error::QuillSQLError::Internal(format!("WAL recovery read recv failed: {}", e))
        })??;
        let page_bytes = buf.to_vec();
        let (mut header, _hdr_len) = TablePageHeaderCodec::decode(&page_bytes)?;
        if slot_idx >= header.tuple_infos.len() {
            return Ok(());
        }
        // Collect every tuple's bytes, substituting the old image for the target slot.
        let tuple_count = header.tuple_infos.len();
        let mut tuples_bytes: Vec<Vec<u8>> = Vec::with_capacity(tuple_count);
        for i in 0..tuple_count {
            let info = &header.tuple_infos[i];
            let slice = &page_bytes[info.offset as usize..(info.offset + info.size) as usize];
            if i == slot_idx {
                tuples_bytes.push(old_bytes.to_vec());
            } else {
                tuples_bytes.push(slice.to_vec());
            }
        }
        // Repack tuples from the tail of the page and recompute slot offsets/sizes.
        let mut tail = PAGE_SIZE;
        for i in 0..tuple_count {
            let size = tuples_bytes[i].len();
            tail = tail.saturating_sub(size);
            header.tuple_infos[i].offset = tail as u16;
            header.tuple_infos[i].size = size as u16;
        }
        let restored_meta: TupleMeta = old_meta.into();
        header.tuple_infos[slot_idx].meta = restored_meta;
        let new_header_bytes = TablePageHeaderCodec::encode(&header);
        let mut new_page = vec![0u8; PAGE_SIZE];
        let max_hdr = std::cmp::min(new_header_bytes.len(), PAGE_SIZE);
        new_page[0..max_hdr].copy_from_slice(&new_header_bytes[..max_hdr]);
        for i in 0..tuple_count {
            let off = header.tuple_infos[i].offset as usize;
            let sz = header.tuple_infos[i].size as usize;
            if off + sz <= PAGE_SIZE {
                new_page[off..off + sz].copy_from_slice(&tuples_bytes[i][..sz]);
            }
        }
        let rxw = ctx
            .disk_scheduler
            .schedule_write(page_id, Bytes::from(new_page))?;
        rxw.recv().map_err(|e| {
            crate::error::QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
        })??;
        Ok(())
    }
}

impl ResourceManager for HeapResourceManager {
    // Heap records carry no redo work for this manager.
    fn redo(&self, _frame: &WalFrame, _ctx: &RedoContext) -> QuillSQLResult<usize> {
        Ok(0)
    }

    fn undo(&self, frame: &WalFrame, ctx: &UndoContext) -> QuillSQLResult<()> {
        let payload = self.decode_payload(frame)?;
        match payload {
            // Undo an insert by marking the slot deleted.
            HeapRecordPayload::Insert(body) => {
                self.apply_tuple_meta_flag(ctx, body.page_id, body.slot_id as usize, true)
            }
            // Undo an update by restoring the pre-image, when it was logged.
            HeapRecordPayload::Update(body) => {
                if let (Some(old_meta), Some(old_bytes)) =
                    (body.old_tuple_meta, body.old_tuple_data.as_deref())
                {
                    self.restore_tuple(
                        ctx,
                        body.page_id,
                        body.slot_id as usize,
                        old_meta,
                        old_bytes,
                    )
                } else {
                    Ok(())
                }
            }
            // Undo a delete by restoring the old tuple, or by clearing the
            // deleted flag if the old bytes were not logged.
            HeapRecordPayload::Delete(body) => {
                if let Some(old_bytes) = body.old_tuple_data.as_deref() {
                    self.restore_tuple(
                        ctx,
                        body.page_id,
                        body.slot_id as usize,
                        body.old_tuple_meta,
                        old_bytes,
                    )
                } else {
                    self.apply_tuple_meta_flag(ctx, body.page_id, body.slot_id as usize, false)
                }
            }
        }
    }

    fn transaction_id(&self, frame: &WalFrame) -> Option<u64> {
        let payload = self.decode_payload(frame).ok()?;
        Some(Self::heap_txn_id(&payload))
    }
}

static REGISTER: OnceLock<()> = OnceLock::new();

/// Registers the heap resource manager with the recovery subsystem.
/// Idempotent: the `OnceLock` guard ensures registration runs at most once.
pub fn ensure_heap_resource_manager_registered() {
    REGISTER.get_or_init(|| {
        register_resource_manager(
            ResourceManagerId::Heap,
            Arc::new(HeapResourceManager::default()),
        );
    });
}
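
// Illustrative usage (assumed call site, not defined in this module): recovery
// startup would invoke the hook once before replaying WAL so that frames tagged
// `ResourceManagerId::Heap` dispatch to `HeapResourceManager`:
//
//     ensure_heap_resource_manager_registered();
//     // ... replay WAL; heap frames are routed to this manager's undo ...
//
// Repeated calls are harmless because registration is guarded by `OnceLock`.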