// quill_sql/storage/heap/heap_recovery.rs

1use std::sync::atomic::AtomicU32;
2use std::sync::Arc;
3
4use bytes::{Bytes, BytesMut};
5
6use crate::buffer::{BufferManager, PageId, PAGE_SIZE};
7use crate::catalog::EMPTY_SCHEMA_REF;
8use crate::error::{QuillSQLError, QuillSQLResult};
9use crate::recovery::resource_manager::{
10    register_resource_manager, RedoContext, ResourceManager, UndoContext,
11};
12use crate::recovery::wal::codec::{ResourceManagerId, WalFrame};
13use crate::recovery::Lsn;
14use crate::storage::codec::{TablePageHeaderCodec, TupleCodec};
15use crate::storage::heap::wal_codec::{
16    decode_heap_record, HeapDeletePayload, HeapInsertPayload, HeapRecordPayload, TupleMetaRepr,
17};
18use crate::storage::page::{RecordId, TablePageHeader, TupleInfo, TupleMeta};
19use crate::storage::table_heap::TableHeap;
20use crate::storage::tuple::Tuple;
21use crate::transaction::{CommandId, TransactionId};
22use std::sync::OnceLock;
23
/// Stateless WAL resource manager for heap (table page) records.
///
/// Registered under `ResourceManagerId::Heap` by `ensure_heap_resource_manager_registered`;
/// all redo/undo state is carried by the WAL frame and the redo/undo context.
#[derive(Debug, Default)]
struct HeapResourceManager;
26
27impl HeapResourceManager {
28    fn decode_payload(&self, frame: &WalFrame) -> QuillSQLResult<HeapRecordPayload> {
29        decode_heap_record(&frame.body, frame.info)
30    }
31
32    fn heap_txn_id(payload: &HeapRecordPayload) -> u64 {
33        match payload {
34            HeapRecordPayload::Insert(p) => p.op_txn_id,
35            HeapRecordPayload::Delete(p) => p.op_txn_id,
36        }
37    }
38
39    fn apply_tuple_meta_flag(
40        &self,
41        ctx: &UndoContext,
42        page_id: u32,
43        slot_idx: usize,
44        deleted: bool,
45    ) -> QuillSQLResult<()> {
46        if let Some(bpm) = &ctx.buffer_pool {
47            let rid = RecordId::new(page_id, slot_idx as u32);
48            let guard = bpm.fetch_page_read(page_id)?;
49            let (header, _hdr_len) = TablePageHeaderCodec::decode(guard.data())?;
50            drop(guard);
51            if slot_idx >= header.tuple_infos.len() {
52                return Ok(());
53            }
54            let mut new_meta = header.tuple_infos[slot_idx].meta;
55            if deleted {
56                new_meta.is_deleted = true;
57            } else {
58                new_meta.clear_delete();
59            }
60            let heap = TableHeap::recovery_view(bpm.clone());
61            let _ = heap.recover_set_tuple_meta(rid, new_meta);
62            let _ = bpm.flush_page(page_id);
63            return Ok(());
64        }
65        Ok(())
66    }
67
68    fn restore_tuple(
69        &self,
70        ctx: &UndoContext,
71        page_id: u32,
72        slot_idx: usize,
73        old_meta: TupleMetaRepr,
74        old_bytes: &[u8],
75    ) -> QuillSQLResult<()> {
76        if let Some(bpm) = &ctx.buffer_pool {
77            let heap = TableHeap::recovery_view(bpm.clone());
78            let rid = RecordId::new(page_id, slot_idx as u32);
79            let _ = heap.recover_set_tuple_bytes(rid, old_bytes);
80            let restored_meta: TupleMeta = old_meta.into();
81            let _ = heap.recover_set_tuple_meta(rid, restored_meta);
82            let _ = bpm.flush_page(page_id);
83            return Ok(());
84        }
85
86        let rx = ctx.disk_scheduler.schedule_read(page_id)?;
87        let buf: BytesMut = rx.recv().map_err(|e| {
88            crate::error::QuillSQLError::Internal(format!("WAL recovery read recv failed: {}", e))
89        })??;
90        let page_bytes = buf.to_vec();
91        let (mut header, _hdr_len) = TablePageHeaderCodec::decode(&page_bytes)?;
92        if slot_idx >= header.tuple_infos.len() {
93            return Ok(());
94        }
95        let tuple_count = header.tuple_infos.len();
96        let mut tuples_bytes: Vec<Vec<u8>> = Vec::with_capacity(tuple_count);
97        for i in 0..tuple_count {
98            let info = &header.tuple_infos[i];
99            let slice = &page_bytes[info.offset as usize..(info.offset + info.size) as usize];
100            if i == slot_idx {
101                tuples_bytes.push(old_bytes.to_vec());
102            } else {
103                tuples_bytes.push(slice.to_vec());
104            }
105        }
106        let mut tail = PAGE_SIZE;
107        for i in 0..tuple_count {
108            let size = tuples_bytes[i].len();
109            tail = tail.saturating_sub(size);
110            header.tuple_infos[i].offset = tail as u16;
111            header.tuple_infos[i].size = size as u16;
112        }
113        let restored_meta: TupleMeta = old_meta.into();
114        header.tuple_infos[slot_idx].meta = restored_meta;
115        let new_header_bytes = TablePageHeaderCodec::encode(&header);
116        let mut new_page = vec![0u8; PAGE_SIZE];
117        let max_hdr = std::cmp::min(new_header_bytes.len(), PAGE_SIZE);
118        new_page[0..max_hdr].copy_from_slice(&new_header_bytes[..max_hdr]);
119        for i in 0..tuple_count {
120            let off = header.tuple_infos[i].offset as usize;
121            let sz = header.tuple_infos[i].size as usize;
122            if off + sz <= PAGE_SIZE {
123                new_page[off..off + sz].copy_from_slice(&tuples_bytes[i][..sz]);
124            }
125        }
126        let rxw = ctx
127            .disk_scheduler
128            .schedule_write(page_id, Bytes::from(new_page))?;
129        rxw.recv().map_err(|e| {
130            crate::error::QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
131        })??;
132        Ok(())
133    }
134
135    fn apply_with_page<F>(
136        &self,
137        ctx: &RedoContext,
138        page_id: PageId,
139        frame_lsn: Lsn,
140        mutator: F,
141    ) -> QuillSQLResult<bool>
142    where
143        F: FnOnce(&mut [u8], Lsn) -> QuillSQLResult<bool>,
144    {
145        if let Some(bpm) = &ctx.buffer_pool {
146            if let Ok(mut guard) = bpm.fetch_page_write(page_id) {
147                let applied = mutator(guard.data_mut(), frame_lsn)?;
148                if applied {
149                    guard.set_lsn(frame_lsn);
150                    guard.mark_dirty();
151                }
152                return Ok(applied);
153            }
154        }
155
156        let mut data = Self::read_or_zero(ctx, page_id);
157        let applied = mutator(&mut data[..], frame_lsn)?;
158        if applied {
159            let rxw = ctx
160                .disk_scheduler
161                .schedule_write(page_id, Bytes::from(data))?;
162            rxw.recv().map_err(|e| {
163                QuillSQLError::Internal(format!("WAL recovery write recv failed: {}", e))
164            })??;
165        }
166        Ok(applied)
167    }
168
169    fn read_or_zero(ctx: &RedoContext, page_id: PageId) -> Vec<u8> {
170        match ctx.disk_scheduler.schedule_read(page_id) {
171            Ok(rx) => match rx.recv() {
172                Ok(Ok(bytes)) => {
173                    if bytes.len() == PAGE_SIZE {
174                        bytes.to_vec()
175                    } else {
176                        vec![0u8; PAGE_SIZE]
177                    }
178                }
179                _ => vec![0u8; PAGE_SIZE],
180            },
181            Err(_) => vec![0u8; PAGE_SIZE],
182        }
183    }
184
185    fn redo_insert(
186        &self,
187        frame: &WalFrame,
188        ctx: &RedoContext,
189        body: &HeapInsertPayload,
190    ) -> QuillSQLResult<bool> {
191        self.apply_with_page(ctx, body.page_id, frame.lsn, |page_bytes, lsn| {
192            Self::apply_tuple_update(
193                page_bytes,
194                lsn,
195                body.slot_id as usize,
196                Some(body.tuple_meta.into()),
197                Some(&body.tuple_data),
198                true,
199            )
200        })
201    }
202
203    fn redo_delete(
204        &self,
205        frame: &WalFrame,
206        ctx: &RedoContext,
207        body: &HeapDeletePayload,
208    ) -> QuillSQLResult<bool> {
209        self.apply_with_page(ctx, body.page_id, frame.lsn, |page_bytes, lsn| {
210            Self::apply_tuple_update(
211                page_bytes,
212                lsn,
213                body.slot_id as usize,
214                Some(body.new_tuple_meta.into()),
215                None,
216                false,
217            )
218        })
219    }
220
221    fn collect_slots(header: &TablePageHeader, page: &[u8]) -> QuillSQLResult<Vec<Vec<u8>>> {
222        let mut slots = Vec::with_capacity(header.tuple_infos.len());
223        for info in &header.tuple_infos {
224            let start = info.offset as usize;
225            let end = start + info.size as usize;
226            if end > page.len() {
227                return Err(QuillSQLError::Storage(format!(
228                    "heap redo tuple range [{}, {}) exceeds page size {}",
229                    start,
230                    end,
231                    page.len()
232                )));
233            }
234            slots.push(page[start..end].to_vec());
235        }
236        Ok(slots)
237    }
238
239    fn pack_page(
240        header: &mut TablePageHeader,
241        tuples: &[Vec<u8>],
242        dest: &mut [u8],
243    ) -> QuillSQLResult<()> {
244        if header.tuple_infos.len() != tuples.len() {
245            return Err(QuillSQLError::Internal(
246                "heap redo tuple metadata count mismatch".to_string(),
247            ));
248        }
249        dest.fill(0);
250        let mut tail = PAGE_SIZE;
251        for (info, tuple) in header.tuple_infos.iter_mut().zip(tuples.iter()) {
252            let len = tuple.len();
253            if len > PAGE_SIZE {
254                return Err(QuillSQLError::Storage(
255                    "tuple length exceeds page capacity".to_string(),
256                ));
257            }
258            if tail < len {
259                return Err(QuillSQLError::Storage(
260                    "insufficient free space while rebuilding heap page".to_string(),
261                ));
262            }
263            tail -= len;
264            info.offset = tail as u16;
265            info.size = len as u16;
266        }
267        header.num_tuples = header.tuple_infos.len() as u16;
268        header.num_deleted_tuples = header
269            .tuple_infos
270            .iter()
271            .filter(|info| info.meta.is_deleted)
272            .count() as u16;
273        let header_bytes = TablePageHeaderCodec::encode(header);
274        if header_bytes.len() > tail {
275            return Err(QuillSQLError::Storage(
276                "heap page header overlaps tuple data during redo".to_string(),
277            ));
278        }
279        dest[..header_bytes.len()].copy_from_slice(&header_bytes);
280        for (info, tuple) in header.tuple_infos.iter().zip(tuples.iter()) {
281            let start = info.offset as usize;
282            let end = start + tuple.len();
283            dest[start..end].copy_from_slice(tuple);
284        }
285        Ok(())
286    }
287
288    fn apply_tuple_update(
289        page_bytes: &mut [u8],
290        lsn: Lsn,
291        slot: usize,
292        new_meta: Option<TupleMeta>,
293        new_tuple: Option<&[u8]>,
294        allow_insert: bool,
295    ) -> QuillSQLResult<bool> {
296        let (mut header, _) = TablePageHeaderCodec::decode(page_bytes)?;
297        if lsn != 0 && header.lsn >= lsn {
298            return Ok(false);
299        }
300        let existing_slots = header.tuple_infos.len();
301        if slot > existing_slots {
302            return Err(QuillSQLError::Storage(format!(
303                "heap redo slot {} beyond tuple count {}",
304                slot, existing_slots
305            )));
306        }
307
308        let mut tuples = Self::collect_slots(&header, page_bytes)?;
309
310        if slot == existing_slots {
311            if !allow_insert {
312                return Ok(false);
313            }
314            let Some(bytes) = new_tuple else {
315                return Err(QuillSQLError::Storage(
316                    "heap redo insert missing tuple bytes".to_string(),
317                ));
318            };
319            let Some(meta) = new_meta else {
320                return Err(QuillSQLError::Storage(
321                    "heap redo insert missing tuple metadata".to_string(),
322                ));
323            };
324            header.tuple_infos.push(TupleInfo {
325                offset: 0,
326                size: 0,
327                meta,
328            });
329            tuples.push(bytes.to_vec());
330        } else {
331            if let Some(meta) = new_meta {
332                header.tuple_infos[slot].meta = meta;
333            }
334            if let Some(bytes) = new_tuple {
335                tuples[slot] = bytes.to_vec();
336            }
337        }
338
339        header.lsn = lsn;
340        Self::pack_page(&mut header, &tuples, page_bytes)?;
341        Ok(true)
342    }
343}
344
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::page::TupleMeta;

    /// Decodes the header of a serialized heap page, panicking on malformed input.
    fn decode_header(page: &[u8]) -> TablePageHeader {
        TablePageHeaderCodec::decode(page).unwrap().0
    }

    /// Returns the raw bytes stored for `slot` according to `header`.
    fn stored_bytes<'a>(page: &'a [u8], header: &TablePageHeader, slot: usize) -> &'a [u8] {
        let info = &header.tuple_infos[slot];
        let start = info.offset as usize;
        &page[start..start + info.size as usize]
    }

    #[test]
    fn apply_tuple_update_inserts_and_updates_lsn() {
        let mut page = vec![0u8; PAGE_SIZE];
        let lsn1 = 10;
        let meta = TupleMeta::new(1, 0);
        let data1 = vec![1u8, 2, 3];
        let applied = HeapResourceManager::apply_tuple_update(
            &mut page,
            lsn1,
            0,
            Some(meta),
            Some(&data1),
            true,
        )
        .expect("apply insert");
        assert!(applied);
        let header = decode_header(&page);
        assert_eq!(header.lsn, lsn1);
        assert_eq!(header.num_tuples, 1);
        assert_eq!(stored_bytes(&page, &header, 0), data1.as_slice());

        // Older LSN should be ignored
        let skipped =
            HeapResourceManager::apply_tuple_update(&mut page, lsn1 - 1, 0, None, None, false)
                .expect("apply older lsn");
        assert!(!skipped);
        let header_after = decode_header(&page);
        assert_eq!(header_after.lsn, lsn1);
    }

    #[test]
    fn apply_tuple_update_overwrites_and_repacks() {
        let mut page = vec![0u8; PAGE_SIZE];
        let lsn1 = 5;
        let mut meta = TupleMeta::new(2, 0);
        let data_small = vec![7u8, 7];
        HeapResourceManager::apply_tuple_update(
            &mut page,
            lsn1,
            0,
            Some(meta),
            Some(&data_small),
            true,
        )
        .unwrap();

        let lsn2 = 20;
        meta.mark_deleted(9, 0);
        let data_large = vec![9u8, 9, 9, 9, 9];
        let applied = HeapResourceManager::apply_tuple_update(
            &mut page,
            lsn2,
            0,
            Some(meta),
            Some(&data_large),
            false,
        )
        .unwrap();
        assert!(applied);

        let header = decode_header(&page);
        assert_eq!(header.lsn, lsn2);
        assert_eq!(header.num_tuples, 1);
        assert!(header.tuple_infos[0].meta.is_deleted);
        assert_eq!(stored_bytes(&page, &header, 0), data_large.as_slice());
    }

    #[test]
    fn apply_tuple_update_meta_only_keeps_bytes_and_counts_deletes() {
        let mut page = vec![0u8; PAGE_SIZE];
        let first = vec![1u8, 1, 1];
        let second = vec![2u8, 2];
        let mut meta = TupleMeta::new(3, 0);
        HeapResourceManager::apply_tuple_update(&mut page, 1, 0, Some(meta), Some(&first), true)
            .unwrap();
        HeapResourceManager::apply_tuple_update(&mut page, 2, 1, Some(meta), Some(&second), true)
            .unwrap();

        // A delete redo carries only new metadata; the tuple bytes must survive the repack.
        meta.mark_deleted(4, 0);
        let applied =
            HeapResourceManager::apply_tuple_update(&mut page, 3, 0, Some(meta), None, false)
                .unwrap();
        assert!(applied);

        let header = decode_header(&page);
        assert_eq!(header.lsn, 3);
        assert_eq!(header.num_tuples, 2);
        assert_eq!(header.num_deleted_tuples, 1);
        assert!(header.tuple_infos[0].meta.is_deleted);
        assert_eq!(stored_bytes(&page, &header, 0), first.as_slice());
        assert_eq!(stored_bytes(&page, &header, 1), second.as_slice());
    }

    #[test]
    fn apply_tuple_update_rejects_slot_past_end() {
        let mut page = vec![0u8; PAGE_SIZE];
        let data = vec![5u8];
        let result = HeapResourceManager::apply_tuple_update(
            &mut page,
            1,
            1,
            Some(TupleMeta::new(1, 0)),
            Some(&data),
            true,
        );
        assert!(result.is_err());
    }

    #[test]
    fn apply_tuple_update_without_insert_permission_leaves_page_untouched() {
        let mut page = vec![0u8; PAGE_SIZE];
        let before = page.clone();
        let applied = HeapResourceManager::apply_tuple_update(
            &mut page,
            3,
            0,
            Some(TupleMeta::new(1, 0)),
            None,
            false,
        )
        .expect("skip without insert");
        assert!(!applied);
        assert_eq!(page, before);
    }
}
425impl ResourceManager for HeapResourceManager {
426    fn redo(&self, frame: &WalFrame, ctx: &RedoContext) -> QuillSQLResult<usize> {
427        let payload = self.decode_payload(frame)?;
428        let applied = match payload {
429            HeapRecordPayload::Insert(ref body) => self.redo_insert(frame, ctx, body)?,
430            HeapRecordPayload::Delete(ref body) => self.redo_delete(frame, ctx, body)?,
431        };
432        Ok(applied as usize)
433    }
434
435    fn undo(&self, frame: &WalFrame, ctx: &UndoContext) -> QuillSQLResult<()> {
436        let payload = self.decode_payload(frame)?;
437        match payload {
438            HeapRecordPayload::Insert(body) => {
439                self.apply_tuple_meta_flag(ctx, body.page_id, body.slot_id as usize, true)
440            }
441            HeapRecordPayload::Delete(body) => self.restore_tuple(
442                ctx,
443                body.page_id,
444                body.slot_id as usize,
445                body.old_tuple_meta,
446                &body.old_tuple_data,
447            ),
448        }
449    }
450
451    fn transaction_id(&self, frame: &WalFrame) -> Option<u64> {
452        let payload = self.decode_payload(frame).ok()?;
453        Some(Self::heap_txn_id(&payload))
454    }
455}
456
457static REGISTER: OnceLock<()> = OnceLock::new();
458
459pub fn ensure_heap_resource_manager_registered() {
460    REGISTER.get_or_init(|| {
461        register_resource_manager(
462            ResourceManagerId::Heap,
463            Arc::new(HeapResourceManager::default()),
464        );
465    });
466}
467
468impl TableHeap {
469    /// Construct a lightweight TableHeap view for recovery operations.
470    /// This instance uses an empty schema and does not rely on first/last page ids.
471    pub fn recovery_view(buffer_pool: Arc<BufferManager>) -> Self {
472        Self {
473            schema: EMPTY_SCHEMA_REF.clone(),
474            buffer_pool,
475            first_page_id: AtomicU32::new(0),
476            last_page_id: AtomicU32::new(0),
477        }
478    }
479
480    /// Recovery-only API: set tuple meta without emitting WAL.
481    /// Only used by RecoveryManager during UNDO.
482    pub fn recover_set_tuple_meta(&self, rid: RecordId, meta: TupleMeta) -> QuillSQLResult<()> {
483        let mut guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
484        let (mut header, hdr_len) = TablePageHeaderCodec::decode(guard.data())?;
485        if (rid.slot_num as usize) >= header.tuple_infos.len() {
486            return Ok(());
487        }
488        let info = &mut header.tuple_infos[rid.slot_num as usize];
489        if info.meta.is_deleted != meta.is_deleted {
490            if meta.is_deleted {
491                header.num_deleted_tuples = header.num_deleted_tuples.saturating_add(1);
492            } else {
493                header.num_deleted_tuples = header.num_deleted_tuples.saturating_sub(1);
494            }
495        }
496        info.meta = meta;
497        let new_header = TablePageHeaderCodec::encode(&header);
498        let copy_len = std::cmp::min(hdr_len, new_header.len());
499        guard.data_mut()[0..copy_len].copy_from_slice(&new_header[..copy_len]);
500        guard.mark_dirty();
501        Ok(())
502    }
503
504    /// Recovery-only API: set tuple raw bytes without emitting WAL.
505    /// If size mismatches, repack tuple area and update offsets.
506    pub fn recover_set_tuple_bytes(&self, rid: RecordId, new_bytes: &[u8]) -> QuillSQLResult<()> {
507        let mut guard = self.buffer_pool.fetch_page_write(rid.page_id)?;
508        let (mut header, _hdr_len) = TablePageHeaderCodec::decode(guard.data())?;
509        if (rid.slot_num as usize) >= header.tuple_infos.len() {
510            return Ok(());
511        }
512        let slot = rid.slot_num as usize;
513        let info = &mut header.tuple_infos[slot];
514        let off = info.offset as usize;
515        let sz = info.size as usize;
516        if new_bytes.len() == sz {
517            if off + sz <= crate::buffer::PAGE_SIZE {
518                guard.data_mut()[off..off + sz].copy_from_slice(new_bytes);
519            }
520            guard.mark_dirty();
521            return Ok(());
522        }
523        let n = header.tuple_infos.len();
524        let mut tuples: Vec<Vec<u8>> = Vec::with_capacity(n);
525        for i in 0..n {
526            let inf = &header.tuple_infos[i];
527            let s = &guard.data()[inf.offset as usize..(inf.offset + inf.size) as usize];
528            if i == slot {
529                tuples.push(new_bytes.to_vec());
530            } else {
531                tuples.push(s.to_vec());
532            }
533        }
534        let mut tail = crate::buffer::PAGE_SIZE;
535        for i in 0..n {
536            let sz = tuples[i].len();
537            tail = tail.saturating_sub(sz);
538            header.tuple_infos[i].offset = tail as u16;
539            header.tuple_infos[i].size = sz as u16;
540        }
541        let new_header = TablePageHeaderCodec::encode(&header);
542        for b in guard.data_mut().iter_mut() {
543            *b = 0;
544        }
545        let hdr_copy = std::cmp::min(new_header.len(), crate::buffer::PAGE_SIZE);
546        guard.data_mut()[0..hdr_copy].copy_from_slice(&new_header[..hdr_copy]);
547        for i in 0..n {
548            let off = header.tuple_infos[i].offset as usize;
549            let sz = header.tuple_infos[i].size as usize;
550            if off + sz <= crate::buffer::PAGE_SIZE {
551                guard.data_mut()[off..off + sz].copy_from_slice(&tuples[i][..sz]);
552            }
553        }
554        guard.mark_dirty();
555        Ok(())
556    }
557
558    pub fn recover_restore_tuple(
559        &self,
560        rid: RecordId,
561        meta: TupleMeta,
562        tuple: &Tuple,
563    ) -> QuillSQLResult<()> {
564        let bytes = TupleCodec::encode(tuple);
565        self.recover_set_tuple_bytes(rid, &bytes)?;
566        self.recover_set_tuple_meta(rid, meta)
567    }
568
569    pub fn recover_delete_tuple(
570        &self,
571        rid: RecordId,
572        txn_id: TransactionId,
573        cid: CommandId,
574    ) -> QuillSQLResult<()> {
575        let mut meta = self.tuple_meta(rid)?;
576        if meta.is_deleted {
577            return Ok(());
578        }
579        meta.mark_deleted(txn_id, cid);
580        self.recover_set_tuple_meta(rid, meta)
581    }
582}