Skip to main content

idb/innodb/
record.rs

1use byteorder::{BigEndian, ByteOrder};
2
3use crate::innodb::constants::*;
4
5/// Record type extracted from the info bits.
6#[derive(Debug, Clone, Copy, PartialEq, Eq)]
7pub enum RecordType {
8    /// Ordinary user record (leaf page).
9    Ordinary,
10    /// Node pointer record (non-leaf page).
11    NodePtr,
12    /// Infimum system record.
13    Infimum,
14    /// Supremum system record.
15    Supremum,
16}
17
18impl RecordType {
19    pub fn from_u8(val: u8) -> Self {
20        match val & 0x07 {
21            0 => RecordType::Ordinary,
22            1 => RecordType::NodePtr,
23            2 => RecordType::Infimum,
24            3 => RecordType::Supremum,
25            _ => RecordType::Ordinary,
26        }
27    }
28
29    pub fn name(&self) -> &'static str {
30        match self {
31            RecordType::Ordinary => "REC_STATUS_ORDINARY",
32            RecordType::NodePtr => "REC_STATUS_NODE_PTR",
33            RecordType::Infimum => "REC_STATUS_INFIMUM",
34            RecordType::Supremum => "REC_STATUS_SUPREMUM",
35        }
36    }
37}
38
39/// Parsed compact (new-style) record header.
40///
41/// In compact format, 5 bytes precede each record:
42/// - Byte 0: info bits (delete mark, min_rec flag) + n_owned upper nibble
43/// - Bytes 1-2: heap_no (13 bits) + rec_type (3 bits)
44/// - Bytes 3-4: next record offset (signed, relative)
45#[derive(Debug, Clone)]
46pub struct CompactRecordHeader {
47    /// Number of records owned by this record in the page directory.
48    pub n_owned: u8,
49    /// Delete mark flag.
50    pub delete_mark: bool,
51    /// Min-rec flag (leftmost record on a non-leaf level).
52    pub min_rec: bool,
53    /// Record's position in the heap.
54    pub heap_no: u16,
55    /// Record type.
56    pub rec_type: RecordType,
57    /// Relative offset to the next record (signed).
58    pub next_offset: i16,
59}
60
61impl CompactRecordHeader {
62    /// Parse a compact record header from the 5 bytes preceding the record origin.
63    ///
64    /// `data` should point to the start of the 5-byte extra header.
65    pub fn parse(data: &[u8]) -> Option<Self> {
66        if data.len() < REC_N_NEW_EXTRA_BYTES {
67            return None;
68        }
69
70        // Byte 0 layout: [info_bits(4) | n_owned(4)]
71        // Info bits (upper nibble): bit 5 = delete_mark, bit 4 = min_rec
72        // n_owned (lower nibble): bits 0-3
73        let byte0 = data[0];
74        let n_owned = byte0 & 0x0F;
75        let delete_mark = (byte0 & 0x20) != 0;
76        let min_rec = (byte0 & 0x10) != 0;
77
78        let two_bytes = BigEndian::read_u16(&data[1..3]);
79        let rec_type = RecordType::from_u8((two_bytes & 0x07) as u8);
80        let heap_no = (two_bytes >> 3) & 0x1FFF;
81
82        let next_offset = BigEndian::read_i16(&data[3..5]);
83
84        Some(CompactRecordHeader {
85            n_owned,
86            delete_mark,
87            min_rec,
88            heap_no,
89            rec_type,
90            next_offset,
91        })
92    }
93}
94
95/// A record position on a page, with its parsed header.
96#[derive(Debug, Clone)]
97pub struct RecordInfo {
98    /// Absolute offset of the record origin within the page.
99    pub offset: usize,
100    /// Parsed record header.
101    pub header: CompactRecordHeader,
102}
103
104/// Walk all user records on a compact-format INDEX page.
105///
106/// Starts from infimum and follows next-record offsets until reaching supremum.
107/// Returns a list of record positions (excluding infimum/supremum).
108pub fn walk_compact_records(page_data: &[u8]) -> Vec<RecordInfo> {
109    let mut records = Vec::new();
110
111    // Infimum record origin is at PAGE_NEW_INFIMUM (99)
112    let infimum_origin = PAGE_NEW_INFIMUM;
113    if page_data.len() < infimum_origin + 2 {
114        return records;
115    }
116
117    // Read infimum's next-record offset (at infimum_origin - 2, relative to origin)
118    let infimum_extra_start = infimum_origin - REC_N_NEW_EXTRA_BYTES;
119    if page_data.len() < infimum_extra_start + REC_N_NEW_EXTRA_BYTES {
120        return records;
121    }
122
123    let infimum_hdr = match CompactRecordHeader::parse(&page_data[infimum_extra_start..]) {
124        Some(h) => h,
125        None => return records,
126    };
127
128    // Follow the linked list
129    let mut current_offset = infimum_origin;
130    let mut next_rel = infimum_hdr.next_offset;
131
132    // Safety: limit iterations to prevent infinite loops
133    let max_iter = page_data.len();
134    let mut iterations = 0;
135
136    loop {
137        if iterations > max_iter {
138            break;
139        }
140        iterations += 1;
141
142        // Calculate next record's absolute offset
143        let next_abs = (current_offset as i32 + next_rel as i32) as usize;
144        if next_abs < REC_N_NEW_EXTRA_BYTES || next_abs >= page_data.len() {
145            break;
146        }
147
148        // Parse the record header (5 bytes before the origin)
149        let extra_start = next_abs - REC_N_NEW_EXTRA_BYTES;
150        if extra_start + REC_N_NEW_EXTRA_BYTES > page_data.len() {
151            break;
152        }
153
154        let hdr = match CompactRecordHeader::parse(&page_data[extra_start..]) {
155            Some(h) => h,
156            None => break,
157        };
158
159        // If we've reached supremum, stop
160        if hdr.rec_type == RecordType::Supremum {
161            break;
162        }
163
164        next_rel = hdr.next_offset;
165        records.push(RecordInfo {
166            offset: next_abs,
167            header: hdr,
168        });
169        current_offset = next_abs;
170
171        // next_offset of 0 means end of list
172        if next_rel == 0 {
173            break;
174        }
175    }
176
177    records
178}
179
180/// Parse the variable-length field lengths from a compact record's null bitmap
181/// and variable-length header. Returns the field data starting offset.
182///
183/// For SDI records and other known-format records, callers can use the
184/// record offset directly since field positions are fixed.
185pub fn read_variable_field_lengths(
186    page_data: &[u8],
187    record_origin: usize,
188    n_nullable: usize,
189    n_variable: usize,
190) -> Option<(Vec<bool>, Vec<usize>)> {
191    // The variable-length header grows backwards from the record origin,
192    // before the 5-byte compact extra header.
193    // Layout (backwards from origin - 5):
194    //   - null bitmap: ceil(n_nullable / 8) bytes
195    //   - variable-length field lengths: 1 or 2 bytes each
196
197    let null_bitmap_bytes = n_nullable.div_ceil(8);
198    let mut pos = record_origin - REC_N_NEW_EXTRA_BYTES;
199
200    // Read null bitmap
201    if pos < null_bitmap_bytes {
202        return None;
203    }
204    pos -= null_bitmap_bytes;
205    let mut nulls = Vec::with_capacity(n_nullable);
206    for i in 0..n_nullable {
207        let byte_idx = pos + (i / 8);
208        let bit_idx = i % 8;
209        if byte_idx >= page_data.len() {
210            return None;
211        }
212        nulls.push((page_data[byte_idx] & (1 << bit_idx)) != 0);
213    }
214
215    // Read variable-length field lengths
216    let mut var_lengths = Vec::with_capacity(n_variable);
217    for _ in 0..n_variable {
218        if pos == 0 {
219            return None;
220        }
221        pos -= 1;
222        if pos >= page_data.len() {
223            return None;
224        }
225        let len_byte = page_data[pos] as usize;
226        if len_byte & 0x80 != 0 {
227            // 2-byte length
228            if pos == 0 {
229                return None;
230            }
231            pos -= 1;
232            if pos >= page_data.len() {
233                return None;
234            }
235            let high_byte = page_data[pos] as usize;
236            let total_len = ((len_byte & 0x3F) << 8) | high_byte;
237            var_lengths.push(total_len);
238        } else {
239            var_lengths.push(len_byte);
240        }
241    }
242
243    Some((nulls, var_lengths))
244}
245
246#[cfg(test)]
247mod tests {
248    use super::*;
249    use byteorder::ByteOrder;
250
251    #[test]
252    fn test_record_type_from_u8() {
253        assert_eq!(RecordType::from_u8(0), RecordType::Ordinary);
254        assert_eq!(RecordType::from_u8(1), RecordType::NodePtr);
255        assert_eq!(RecordType::from_u8(2), RecordType::Infimum);
256        assert_eq!(RecordType::from_u8(3), RecordType::Supremum);
257    }
258
259    #[test]
260    fn test_compact_record_header_parse() {
261        // Build a 5-byte compact header:
262        // byte0: [info_bits(4) | n_owned(4)]
263        //   n_owned=1 in lower nibble, no info bits => 0x01
264        // bytes 1-2: heap_no=5 (5<<3=0x0028), rec_type=0 => 0x0028
265        // bytes 3-4: next_offset = 30 => 0x001E
266        let mut data = vec![0u8; 5];
267        data[0] = 0x01; // n_owned=1, no delete, no min_rec
268        BigEndian::write_u16(&mut data[1..3], 5 << 3); // heap_no=5, type=0
269        BigEndian::write_i16(&mut data[3..5], 30); // next=30
270
271        let hdr = CompactRecordHeader::parse(&data).unwrap();
272        assert_eq!(hdr.n_owned, 1);
273        assert!(!hdr.delete_mark);
274        assert!(!hdr.min_rec);
275        assert_eq!(hdr.heap_no, 5);
276        assert_eq!(hdr.rec_type, RecordType::Ordinary);
277        assert_eq!(hdr.next_offset, 30);
278    }
279
280    #[test]
281    fn test_compact_record_header_with_flags() {
282        let mut data = vec![0u8; 5];
283        // n_owned=3 (0x30), delete_mark (0x20), min_rec (0x10)
284        // => 0x30 | 0x20 | 0x10 = 0x70... wait, n_owned is bits 4-7 so n_owned=3 is 0x30
285        // delete_mark is bit 5 (0x20), min_rec is bit 4 (0x10)
286        // But if n_owned=3 takes bits 4-7, that's 0x30, which conflicts with bit 5 for delete.
287        // Actually in InnoDB: byte0 has info_bits in upper 4 bits and... let me recheck.
288        // The layout is: [info_bits(4) | n_owned(4)]
289        // info_bits: bit 7=unused, bit 6=unused, bit 5=delete_mark, bit 4=min_rec
290        // n_owned: bits 0-3
291        // So: delete_mark=1, min_rec=0, n_owned=2 => 0x20 | 0x02 = 0x22
292        data[0] = 0x22; // delete_mark=1, n_owned=2
293        BigEndian::write_u16(&mut data[1..3], (10 << 3) | 1); // heap_no=10, type=node_ptr
294        BigEndian::write_i16(&mut data[3..5], -50); // negative offset
295
296        let hdr = CompactRecordHeader::parse(&data).unwrap();
297        assert_eq!(hdr.n_owned, 2);
298        assert!(hdr.delete_mark);
299        assert!(!hdr.min_rec);
300        assert_eq!(hdr.heap_no, 10);
301        assert_eq!(hdr.rec_type, RecordType::NodePtr);
302        assert_eq!(hdr.next_offset, -50);
303    }
304}