1use std::sync::atomic::AtomicU32;
2use std::sync::Arc;
3
4use bytes::{Bytes, BytesMut};
5
6use crate::buffer::{BufferManager, PageId, PAGE_SIZE};
7use crate::catalog::EMPTY_SCHEMA_REF;
8use crate::error::{QuillSQLError, QuillSQLResult};
9use crate::recovery::resource_manager::{
10 register_resource_manager, RedoContext, ResourceManager, UndoContext,
11};
12use crate::recovery::wal::codec::{ResourceManagerId, WalFrame};
13use crate::recovery::Lsn;
14use crate::storage::codec::{TablePageHeaderCodec, TupleCodec};
15use crate::storage::heap::wal_codec::{
16 decode_heap_record, HeapDeletePayload, HeapInsertPayload, HeapRecordPayload, TupleMetaRepr,
17};
18use crate::storage::page::{RecordId, TablePageHeader, TupleInfo, TupleMeta};
19use crate::storage::table_heap::TableHeap;
20use crate::storage::tuple::Tuple;
21use crate::transaction::{CommandId, TransactionId};
22use std::sync::OnceLock;
23
24#[derive(Default)]
25struct HeapResourceManager;
26
27impl HeapResourceManager {
28 fn decode_payload(&self, frame: &WalFrame) -> QuillSQLResult<HeapRecordPayload> {
29 decode_heap_record(&frame.body, frame.info)
30 }
31
32 fn heap_txn_id(payload: &HeapRecordPayload) -> u64 {
33 match payload {
34 HeapRecordPayload::Insert(p) => p.op_txn_id,
35 HeapRecordPayload::Delete(p) => p.op_txn_id,
36 }
37 }
38
39 fn apply_tuple_meta_flag(
40 &self,
41 ctx: &UndoContext,
42 page_id: u32,
43 slot_idx: usize,
44 deleted: bool,
45 ) -> QuillSQLResult<()> {
46 if let Some(bpm) = &ctx.buffer_pool {
47 let rid = RecordId::new(page_id, slot_idx as u32);
48 let guard = bpm.fetch_page_read(page_id)?;
49 let (header, _hdr_len) = TablePageHeaderCodec::decode(guard.data())?;
50 drop(guard);
51 if slot_idx >= header.tuple_infos.len() {
52 return Ok(());
53 }
54 let mut new_meta = header.tuple_infos[slot_idx].meta;
55 if deleted {
56 new_meta.is_deleted = true;
57 } else {
58 new_meta.clear_delete();
59 }
60 let heap = TableHeap::recovery_view(bpm.clone());
61 let _ = heap.recover_set_tuple_meta(rid, new_meta);
62 let _ = bpm.flush_page(page_id);
63 return Ok(());
64 }
65 Ok(())
66 }
67
68 fn restore_tuple(
69 &self,
70 ctx: &UndoContext,
71 page_id: u32,
72 slot_idx: usize,
73 old_meta: TupleMetaRepr,
74 old_bytes: &[u8],
75 ) -> QuillSQLResult<()> {
76 if let Some(bpm) = &ctx.buffer_pool {
77 let heap = TableHeap::recovery_view(bpm.clone());
78 let rid = RecordId::new(page_id, slot_idx as u32);
79 let _ = heap.recover_set_tuple_bytes(rid, old_bytes);
80 let restored_meta: TupleMeta = old_meta.into();
81 let _ = heap.recover_set_tuple_meta(rid, restored_meta);
82 let _ = bpm.flush_page(page_id);
83 return Ok(());
84 }
85
86 let rx = ctx.disk_scheduler.schedule_read(page_id)?;
87 let buf: BytesMut = rx.recv().map_err(|e| {
88 crate::error::QuillSQLError::Internal(format!("WAL recovery read recv failed: {}", e))
89 })??;
90 let page_bytes = buf.to_vec();
91 let (mut header, _hdr_len) = TablePageHeaderCodec::decode(&page_bytes)?;
92 if slot_idx >= header.tuple_infos.len() {
93 return Ok(());
94 }
95 let tuple_count = header.tuple_infos.len();
96 let mut tuples_bytes: Vec<Vec<u8>> = Vec::with_capacity(tuple_count);
97 for i in 0..tuple_count {
98 let info = &header.tuple_infos[i];
99 let slice = &page_bytes[info.offset as usize..(info.offset + info.size) as usize];
100 if i == slot_idx {
101 tuples_bytes.push(old_bytes.to_vec());
102 } else {
103 tuples_bytes.push(slice.to_vec());
104 }
105 }
106 let mut tail = PAGE_SIZE;
107 for i in 0..tuple_count {
108 let size = tuples_bytes[i].len();
109 tail = tail.saturating_sub(size);
110 header.tuple_infos[i].offset = tail as u16;
111 header.tuple_infos[i].size = size as u16;
112 }
113 let restored_meta: TupleMeta = old_meta.into();
114 header.tuple_infos[slot_idx].meta = restored_meta;
115 let new_header_bytes = TablePageHeaderCodec::encode(&header);
116 let mut new_page = vec![0u8; PAGE_SIZE];
117 let max_hdr = std::cmp::min(new_header_bytes.len(), PAGE_SIZE);
118 new_page[0..max_hdr].copy_from_slice(&new_header_bytes[..max_hdr]);
119 for i in 0..tuple_count {
120 let off = header.tuple_infos[i].offset as usize;
121 let sz = header.tuple_infos[i].size as usize;
122 if off + sz <= PAGE_SIZE {
123 new_page[off..off + sz].copy_from_slice(&tuples_bytes[i][..sz]);
124 }
125 }
126 let rxw = ctx
127 .disk_scheduler
128 .schedule_write(page_id, Bytes::from(new_page))?;
129 rxw.recv().map_err(|e| {
130 crate::error::QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
131 })??;
132 Ok(())
133 }
134
135 fn apply_with_page<F>(
136 &self,
137 ctx: &RedoContext,
138 page_id: PageId,
139 frame_lsn: Lsn,
140 mutator: F,
141 ) -> QuillSQLResult<bool>
142 where
143 F: FnOnce(&mut [u8], Lsn) -> QuillSQLResult<bool>,
144 {
145 if let Some(bpm) = &ctx.buffer_pool {
146 if let Ok(mut guard) = bpm.fetch_page_write(page_id) {
147 let applied = mutator(guard.data_mut(), frame_lsn)?;
148 if applied {
149 guard.set_lsn(frame_lsn);
150 guard.mark_dirty();
151 }
152 return Ok(applied);
153 }
154 }
155
156 let mut data = Self::read_or_zero(ctx, page_id);
157 let applied = mutator(&mut data[..], frame_lsn)?;
158 if applied {
159 let rxw = ctx
160 .disk_scheduler
161 .schedule_write(page_id, Bytes::from(data))?;
162 rxw.recv().map_err(|e| {
163 QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
164 })??;
165 }
166 Ok(applied)
167 }
168
169 fn read_or_zero(ctx: &RedoContext, page_id: PageId) -> Vec<u8> {
170 match ctx.disk_scheduler.schedule_read(page_id) {
171 Ok(rx) => match rx.recv() {
172 Ok(Ok(bytes)) => {
173 if bytes.len() == PAGE_SIZE {
174 bytes.to_vec()
175 } else {
176 vec![0u8; PAGE_SIZE]
177 }
178 }
179 _ => vec![0u8; PAGE_SIZE],
180 },
181 Err(_) => vec![0u8; PAGE_SIZE],
182 }
183 }
184
185 fn redo_insert(
186 &self,
187 frame: &WalFrame,
188 ctx: &RedoContext,
189 body: &HeapInsertPayload,
190 ) -> QuillSQLResult<bool> {
191 self.apply_with_page(ctx, body.page_id, frame.lsn, |page_bytes, lsn| {
192 Self::apply_tuple_update(
193 page_bytes,
194 lsn,
195 body.slot_id as usize,
196 Some(body.tuple_meta.into()),
197 Some(&body.tuple_data),
198 true,
199 )
200 })
201 }
202
203 fn redo_delete(
204 &self,
205 frame: &WalFrame,
206 ctx: &RedoContext,
207 body: &HeapDeletePayload,
208 ) -> QuillSQLResult<bool> {
209 self.apply_with_page(ctx, body.page_id, frame.lsn, |page_bytes, lsn| {
210 Self::apply_tuple_update(
211 page_bytes,
212 lsn,
213 body.slot_id as usize,
214 Some(body.new_tuple_meta.into()),
215 None,
216 false,
217 )
218 })
219 }
220
221 fn collect_slots(header: &TablePageHeader, page: &[u8]) -> QuillSQLResult<Vec<Vec<u8>>> {
222 let mut slots = Vec::with_capacity(header.tuple_infos.len());
223 for info in &header.tuple_infos {
224 let start = info.offset as usize;
225 let end = start + info.size as usize;
226 if end > page.len() {
227 return Err(QuillSQLError::Storage(format!(
228 "heap redo tuple range [{}, {}) exceeds page size {}",
229 start,
230 end,
231 page.len()
232 )));
233 }
234 slots.push(page[start..end].to_vec());
235 }
236 Ok(slots)
237 }
238
239 fn pack_page(
240 header: &mut TablePageHeader,
241 tuples: &[Vec<u8>],
242 dest: &mut [u8],
243 ) -> QuillSQLResult<()> {
244 if header.tuple_infos.len() != tuples.len() {
245 return Err(QuillSQLError::Internal(
246 "heap redo tuple metadata count mismatch".to_string(),
247 ));
248 }
249 dest.fill(0);
250 let mut tail = PAGE_SIZE;
251 for (info, tuple) in header.tuple_infos.iter_mut().zip(tuples.iter()) {
252 let len = tuple.len();
253 if len > PAGE_SIZE {
254 return Err(QuillSQLError::Storage(
255 "tuple length exceeds page capacity".to_string(),
256 ));
257 }
258 if tail < len {
259 return Err(QuillSQLError::Storage(
260 "insufficient free space while rebuilding heap page".to_string(),
261 ));
262 }
263 tail -= len;
264 info.offset = tail as u16;
265 info.size = len as u16;
266 }
267 header.num_tuples = header.tuple_infos.len() as u16;
268 header.num_deleted_tuples = header
269 .tuple_infos
270 .iter()
271 .filter(|info| info.meta.is_deleted)
272 .count() as u16;
273 let header_bytes = TablePageHeaderCodec::encode(header);
274 if header_bytes.len() > tail {
275 return Err(QuillSQLError::Storage(
276 "heap page header overlaps tuple data during redo".to_string(),
277 ));
278 }
279 dest[..header_bytes.len()].copy_from_slice(&header_bytes);
280 for (info, tuple) in header.tuple_infos.iter().zip(tuples.iter()) {
281 let start = info.offset as usize;
282 let end = start + tuple.len();
283 dest[start..end].copy_from_slice(tuple);
284 }
285 Ok(())
286 }
287
288 fn apply_tuple_update(
289 page_bytes: &mut [u8],
290 lsn: Lsn,
291 slot: usize,
292 new_meta: Option<TupleMeta>,
293 new_tuple: Option<&[u8]>,
294 allow_insert: bool,
295 ) -> QuillSQLResult<bool> {
296 let (mut header, _) = TablePageHeaderCodec::decode(page_bytes)?;
297 if lsn != 0 && header.lsn >= lsn {
298 return Ok(false);
299 }
300 let existing_slots = header.tuple_infos.len();
301 if slot > existing_slots {
302 return Err(QuillSQLError::Storage(format!(
303 "heap redo slot {} beyond tuple count {}",
304 slot, existing_slots
305 )));
306 }
307
308 let mut tuples = Self::collect_slots(&header, page_bytes)?;
309
310 if slot == existing_slots {
311 if !allow_insert {
312 return Ok(false);
313 }
314 let Some(bytes) = new_tuple else {
315 return Err(QuillSQLError::Storage(
316 "heap redo insert missing tuple bytes".to_string(),
317 ));
318 };
319 let Some(meta) = new_meta else {
320 return Err(QuillSQLError::Storage(
321 "heap redo insert missing tuple metadata".to_string(),
322 ));
323 };
324 header.tuple_infos.push(TupleInfo {
325 offset: 0,
326 size: 0,
327 meta,
328 });
329 tuples.push(bytes.to_vec());
330 } else {
331 if let Some(meta) = new_meta {
332 header.tuple_infos[slot].meta = meta;
333 }
334 if let Some(bytes) = new_tuple {
335 tuples[slot] = bytes.to_vec();
336 }
337 }
338
339 header.lsn = lsn;
340 Self::pack_page(&mut header, &tuples, page_bytes)?;
341 Ok(true)
342 }
343}
344
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::page::TupleMeta;

    /// Decodes the heap page header stored at the front of `page`.
    fn page_header(page: &[u8]) -> TablePageHeader {
        let (header, _header_len) = TablePageHeaderCodec::decode(page).unwrap();
        header
    }

    /// Returns the bytes stored for slot `idx`, as located by `header`.
    fn stored_tuple<'p>(page: &'p [u8], header: &TablePageHeader, idx: usize) -> &'p [u8] {
        let info = &header.tuple_infos[idx];
        let start = info.offset as usize;
        let end = start + info.size as usize;
        &page[start..end]
    }

    #[test]
    fn apply_tuple_update_inserts_and_updates_lsn() {
        let insert_lsn = 10;
        let payload = vec![1u8, 2, 3];
        let mut page = vec![0u8; PAGE_SIZE];

        let inserted = HeapResourceManager::apply_tuple_update(
            &mut page,
            insert_lsn,
            0,
            Some(TupleMeta::new(1, 0)),
            Some(&payload),
            true,
        )
        .expect("apply insert");
        assert!(inserted);

        let header = page_header(&page);
        assert_eq!(header.lsn, insert_lsn);
        assert_eq!(header.num_tuples, 1);
        assert_eq!(stored_tuple(&page, &header, 0), payload.as_slice());

        // A record older than the page LSN is skipped and leaves the page LSN untouched.
        let stale_applied = HeapResourceManager::apply_tuple_update(
            &mut page,
            insert_lsn - 1,
            0,
            None,
            None,
            false,
        )
        .expect("apply older lsn");
        assert!(!stale_applied);
        assert_eq!(page_header(&page).lsn, insert_lsn);
    }

    #[test]
    fn apply_tuple_update_overwrites_and_repacks() {
        let mut page = vec![0u8; PAGE_SIZE];
        let mut meta = TupleMeta::new(2, 0);
        let short_tuple = vec![7u8, 7];
        HeapResourceManager::apply_tuple_update(
            &mut page,
            5,
            0,
            Some(meta),
            Some(&short_tuple),
            true,
        )
        .unwrap();

        // Overwrite the same slot with a longer tuple and a deleted marker.
        let update_lsn = 20;
        meta.mark_deleted(9, 0);
        let long_tuple = vec![9u8, 9, 9, 9, 9];
        let applied = HeapResourceManager::apply_tuple_update(
            &mut page,
            update_lsn,
            0,
            Some(meta),
            Some(&long_tuple),
            false,
        )
        .unwrap();
        assert!(applied);

        let header = page_header(&page);
        assert_eq!(header.lsn, update_lsn);
        assert_eq!(header.num_tuples, 1);
        assert!(header.tuple_infos[0].meta.is_deleted);
        assert_eq!(stored_tuple(&page, &header, 0), long_tuple.as_slice());
    }
}
425impl ResourceManager for HeapResourceManager {
426 fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize> {
427 let payload = self.decode_payload(frame)?;
428 let applied = match payload {
429 HeapRecordPayload::Insert(ref body) => self.redo_insert(frame, ctx, body)?,
430 HeapRecordPayload::Delete(ref body) => self.redo_delete(frame, ctx, body)?,
431 };
432 Ok(applied as usize)
433 }
434
435 fn undo(&self, frame: &WalFrame, ctx: &UndoContext) -> QuillSQLResult<()> {
436 let payload = self.decode_payload(frame)?;
437 match payload {
438 HeapRecordPayload::Insert(body) => {
439 self.apply_tuple_meta_flag(ctx, body.page_id, body.slot_id as usize, true)
440 }
441 HeapRecordPayload::Delete(body) => self.restore_tuple(
442 ctx,
443 body.page_id,
444 body.slot_id as usize,
445 body.old_tuple_meta,
446 &body.old_tuple_data,
447 ),
448 }
449 }
450
451 fn transaction_id(&self, frame: &WalFrame) -> Option<u64> {
452 let payload = self.decode_payload(frame).ok()?;
453 Some(Self::heap_txn_id(&payload))
454 }
455}
456
457static REGISTER: OnceLock<()> = OnceLock::new();
458
459pub fn ensure_heap_resource_manager_registered() {
460 REGISTER.get_or_init(|| {
461 register_resource_manager(
462 ResourceManagerId::Heap,
463 Arc::new(HeapResourceManager::default()),
464 );
465 });
466}
467
468impl TableHeap {
469 pub fn recovery_view(buffer_pool: Arc<BufferManager>) -> Self {
472 Self {
473 schema: EMPTY_SCHEMA_REF.clone(),
474 buffer_pool,
475 first_page_id: AtomicU32::new(0),
476 last_page_id: AtomicU32::new(0),
477 }
478 }
479
480 pub fn recover_set_tuple_meta(&self, rid: RecordId, meta: TupleMeta) -> QuillSQLResult<()> {
483 let mut guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
484 let (mut header, hdr_len) = TablePageHeaderCodec::decode(guard.data())?;
485 if (rid.slot_num as usize) >= header.tuple_infos.len() {
486 return Ok(());
487 }
488 let info = &mut header.tuple_infos[rid.slot_num as usize];
489 if info.meta.is_deleted != meta.is_deleted {
490 if meta.is_deleted {
491 header.num_deleted_tuples = header.num_deleted_tuples.saturating_add(1);
492 } else {
493 header.num_deleted_tuples = header.num_deleted_tuples.saturating_sub(1);
494 }
495 }
496 info.meta = meta;
497 let new_header = TablePageHeaderCodec::encode(&header);
498 let copy_len = std::cmp::min(hdr_len, new_header.len());
499 guard.data_mut()[0..copy_len].copy_from_slice(&new_header[..copy_len]);
500 guard.mark_dirty();
501 Ok(())
502 }
503
504 pub fn recover_set_tuple_bytes(&self, rid: RecordId, new_bytes: &[u8]) -> QuillSQLResult<()> {
507 let mut guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
508 let (mut header, _hdr_len) = TablePageHeaderCodec::decode(guard.data())?;
509 if (rid.slot_num as usize) >= header.tuple_infos.len() {
510 return Ok(());
511 }
512 let slot = rid.slot_num as usize;
513 let info = &mut header.tuple_infos[slot];
514 let off = info.offset as usize;
515 let sz = info.size as usize;
516 if new_bytes.len() == sz {
517 if off + sz <= crate::buffer::PAGE_SIZE {
518 guard.data_mut()[off..off + sz].copy_from_slice(new_bytes);
519 }
520 guard.mark_dirty();
521 return Ok(());
522 }
523 let n = header.tuple_infos.len();
524 let mut tuples: Vec<Vec<u8>> = Vec::with_capacity(n);
525 for i in 0..n {
526 let inf = &header.tuple_infos[i];
527 let s = &guard.data()[inf.offset as usize..(inf.offset + inf.size) as usize];
528 if i == slot {
529 tuples.push(new_bytes.to_vec());
530 } else {
531 tuples.push(s.to_vec());
532 }
533 }
534 let mut tail = crate::buffer::PAGE_SIZE;
535 for i in 0..n {
536 let sz = tuples[i].len();
537 tail = tail.saturating_sub(sz);
538 header.tuple_infos[i].offset = tail as u16;
539 header.tuple_infos[i].size = sz as u16;
540 }
541 let new_header = TablePageHeaderCodec::encode(&header);
542 for b in guard.data_mut().iter_mut() {
543 *b = 0;
544 }
545 let hdr_copy = std::cmp::min(new_header.len(), crate::buffer::PAGE_SIZE);
546 guard.data_mut()[0..hdr_copy].copy_from_slice(&new_header[..hdr_copy]);
547 for i in 0..n {
548 let off = header.tuple_infos[i].offset as usize;
549 let sz = header.tuple_infos[i].size as usize;
550 if off + sz <= crate::buffer::PAGE_SIZE {
551 guard.data_mut()[off..off + sz].copy_from_slice(&tuples[i][..sz]);
552 }
553 }
554 guard.mark_dirty();
555 Ok(())
556 }
557
558 pub fn recover_restore_tuple(
559 &self,
560 rid: RecordId,
561 meta: TupleMeta,
562 tuple: &Tuple,
563 ) -> QuillSQLResult<()> {
564 let bytes = TupleCodec::encode(tuple);
565 self.recover_set_tuple_bytes(rid, &bytes)?;
566 self.recover_set_tuple_meta(rid, meta)
567 }
568
569 pub fn recover_delete_tuple(
570 &self,
571 rid: RecordId,
572 txn_id: TransactionId,
573 cid: CommandId,
574 ) -> QuillSQLResult<()> {
575 let mut meta = self.tuple_meta(rid)?;
576 if meta.is_deleted {
577 return Ok(());
578 }
579 meta.mark_deleted(txn_id, cid);
580 self.recover_set_tuple_meta(rid, meta)
581 }
582}