quill_sql/storage/
heap_recovery.rs1use std::sync::Arc;
2
3use bytes::{Bytes, BytesMut};
4
5use crate::buffer::PAGE_SIZE;
6use crate::error::QuillSQLResult;
7use crate::recovery::resource_manager::{
8 register_resource_manager, RedoContext, ResourceManager, UndoContext,
9};
10use crate::recovery::wal::codec::{ResourceManagerId, WalFrame};
11use crate::storage::codec::TablePageHeaderCodec;
12use crate::storage::heap::wal_codec::{decode_heap_record, HeapRecordPayload, TupleMetaRepr};
13use crate::storage::page::{RecordId, TupleMeta};
14use crate::storage::table_heap::TableHeap;
15use std::sync::OnceLock;
16
/// Stateless recovery handler for heap (table page) WAL records.
///
/// The type carries no data; all behaviour lives in its inherent methods and
/// in its `ResourceManager` implementation.
#[derive(Default)]
struct HeapResourceManager;
19
20impl HeapResourceManager {
21 fn decode_payload(&self, frame: &WalFrame) -> QuillSQLResult<HeapRecordPayload> {
22 decode_heap_record(&frame.body, frame.info)
23 }
24
25 fn heap_txn_id(payload: &HeapRecordPayload) -> u64 {
26 match payload {
27 HeapRecordPayload::Insert(p) => p.op_txn_id,
28 HeapRecordPayload::Update(p) => p.op_txn_id,
29 HeapRecordPayload::Delete(p) => p.op_txn_id,
30 }
31 }
32
33 fn apply_tuple_meta_flag(
34 &self,
35 ctx: &UndoContext,
36 page_id: u32,
37 slot_idx: usize,
38 deleted: bool,
39 ) -> QuillSQLResult<()> {
40 if let Some(bpm) = &ctx.buffer_pool {
41 let rid = RecordId::new(page_id, slot_idx as u32);
42 let guard = bpm.fetch_page_read(page_id)?;
43 let (header, _hdr_len) = TablePageHeaderCodec::decode(guard.data())?;
44 drop(guard);
45 if slot_idx >= header.tuple_infos.len() {
46 return Ok(());
47 }
48 let mut new_meta = header.tuple_infos[slot_idx].meta;
49 new_meta.is_deleted = deleted;
50 let heap = TableHeap::recovery_view(bpm.clone());
51 let _ = heap.recover_set_tuple_meta(rid, new_meta);
52 let _ = bpm.flush_page(page_id);
53 return Ok(());
54 }
55 Ok(())
56 }
57
58 fn restore_tuple(
59 &self,
60 ctx: &UndoContext,
61 page_id: u32,
62 slot_idx: usize,
63 old_meta: TupleMetaRepr,
64 old_bytes: &[u8],
65 ) -> QuillSQLResult<()> {
66 if let Some(bpm) = &ctx.buffer_pool {
67 let heap = TableHeap::recovery_view(bpm.clone());
68 let rid = RecordId::new(page_id, slot_idx as u32);
69 let _ = heap.recover_set_tuple_bytes(rid, old_bytes);
70 let restored_meta: TupleMeta = old_meta.into();
71 let _ = heap.recover_set_tuple_meta(rid, restored_meta);
72 let _ = bpm.flush_page(page_id);
73 return Ok(());
74 }
75
76 let rx = ctx.disk_scheduler.schedule_read(page_id)?;
77 let buf: BytesMut = rx.recv().map_err(|e| {
78 crate::error::QuillSQLError::Internal(format!("WAL recovery read recv failed: {}", e))
79 })??;
80 let page_bytes = buf.to_vec();
81 let (mut header, _hdr_len) = TablePageHeaderCodec::decode(&page_bytes)?;
82 if slot_idx >= header.tuple_infos.len() {
83 return Ok(());
84 }
85 let tuple_count = header.tuple_infos.len();
86 let mut tuples_bytes: Vec<Vec<u8>> = Vec::with_capacity(tuple_count);
87 for i in 0..tuple_count {
88 let info = &header.tuple_infos[i];
89 let slice = &page_bytes[info.offset as usize..(info.offset + info.size) as usize];
90 if i == slot_idx {
91 tuples_bytes.push(old_bytes.to_vec());
92 } else {
93 tuples_bytes.push(slice.to_vec());
94 }
95 }
96 let mut tail = PAGE_SIZE;
97 for i in 0..tuple_count {
98 let size = tuples_bytes[i].len();
99 tail = tail.saturating_sub(size);
100 header.tuple_infos[i].offset = tail as u16;
101 header.tuple_infos[i].size = size as u16;
102 }
103 let restored_meta: TupleMeta = old_meta.into();
104 header.tuple_infos[slot_idx].meta = restored_meta;
105 let new_header_bytes = TablePageHeaderCodec::encode(&header);
106 let mut new_page = vec![0u8; PAGE_SIZE];
107 let max_hdr = std::cmp::min(new_header_bytes.len(), PAGE_SIZE);
108 new_page[0..max_hdr].copy_from_slice(&new_header_bytes[..max_hdr]);
109 for i in 0..tuple_count {
110 let off = header.tuple_infos[i].offset as usize;
111 let sz = header.tuple_infos[i].size as usize;
112 if off + sz <= PAGE_SIZE {
113 new_page[off..off + sz].copy_from_slice(&tuples_bytes[i][..sz]);
114 }
115 }
116 let rxw = ctx
117 .disk_scheduler
118 .schedule_write(page_id, Bytes::from(new_page))?;
119 rxw.recv().map_err(|e| {
120 crate::error::QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
121 })??;
122 Ok(())
123 }
124}
125
126impl ResourceManager for HeapResourceManager {
127 fn redo(&self, _frame: &WalFrame, _ctx: &RedoContext) -> QuillSQLResult<usize> {
128 Ok(0)
129 }
130
131 fn undo(&self, frame: &WalFrame, ctx: &UndoContext) -> QuillSQLResult<()> {
132 let payload = self.decode_payload(frame)?;
133 match payload {
134 HeapRecordPayload::Insert(body) => {
135 self.apply_tuple_meta_flag(ctx, body.page_id, body.slot_id as usize, true)
136 }
137 HeapRecordPayload::Update(body) => {
138 if let (Some(old_meta), Some(old_bytes)) =
139 (body.old_tuple_meta, body.old_tuple_data.as_deref())
140 {
141 self.restore_tuple(
142 ctx,
143 body.page_id,
144 body.slot_id as usize,
145 old_meta,
146 old_bytes,
147 )
148 } else {
149 Ok(())
150 }
151 }
152 HeapRecordPayload::Delete(body) => {
153 if let Some(old_bytes) = body.old_tuple_data.as_deref() {
154 self.restore_tuple(
155 ctx,
156 body.page_id,
157 body.slot_id as usize,
158 body.old_tuple_meta,
159 old_bytes,
160 )
161 } else {
162 self.apply_tuple_meta_flag(ctx, body.page_id, body.slot_id as usize, false)
163 }
164 }
165 }
166 }
167
168 fn transaction_id(&self, frame: &WalFrame) -> Option<u64> {
169 let payload = self.decode_payload(frame).ok()?;
170 Some(Self::heap_txn_id(&payload))
171 }
172}
173
174static REGISTER: OnceLock<()> = OnceLock::new();
175
176pub fn ensure_heap_resource_manager_registered() {
177 REGISTER.get_or_init(|| {
178 register_resource_manager(
179 ResourceManagerId::Heap,
180 Arc::new(HeapResourceManager::default()),
181 );
182 });
183}