Skip to main content

idb/innodb/
record.rs

1//! Row-level record parsing for InnoDB compact format.
2//!
3//! InnoDB stores rows in compact record format (MySQL 5.0+), where each record
4//! has a 5-byte header containing the info bits, record type, heap number, and
5//! next-record pointer. This module provides [`RecordType`] classification and
6//! [`walk_compact_records`] to traverse the singly-linked record chain within
7//! an INDEX page, starting from the infimum record.
8
9use byteorder::{BigEndian, ByteOrder};
10
11use crate::innodb::constants::*;
12
13/// Record type extracted from the info bits.
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
15pub enum RecordType {
16    /// Ordinary user record (leaf page).
17    Ordinary,
18    /// Node pointer record (non-leaf page).
19    NodePtr,
20    /// Infimum system record.
21    Infimum,
22    /// Supremum system record.
23    Supremum,
24}
25
26impl RecordType {
27    /// Convert a 3-bit status value from the record header to a `RecordType`.
28    ///
29    /// Only the lowest 3 bits of `val` are used.
30    pub fn from_u8(val: u8) -> Self {
31        match val & 0x07 {
32            0 => RecordType::Ordinary,
33            1 => RecordType::NodePtr,
34            2 => RecordType::Infimum,
35            3 => RecordType::Supremum,
36            _ => RecordType::Ordinary,
37        }
38    }
39
40    /// Returns the MySQL source-style name for this record type (e.g. `"REC_STATUS_ORDINARY"`).
41    pub fn name(&self) -> &'static str {
42        match self {
43            RecordType::Ordinary => "REC_STATUS_ORDINARY",
44            RecordType::NodePtr => "REC_STATUS_NODE_PTR",
45            RecordType::Infimum => "REC_STATUS_INFIMUM",
46            RecordType::Supremum => "REC_STATUS_SUPREMUM",
47        }
48    }
49}
50
51/// Parsed compact (new-style) record header.
52///
53/// In compact format, 5 bytes precede each record:
54/// - Byte 0: info bits (delete mark, min_rec flag) + n_owned upper nibble
55/// - Bytes 1-2: heap_no (13 bits) + rec_type (3 bits)
56/// - Bytes 3-4: next record offset (signed, relative)
57#[derive(Debug, Clone)]
58pub struct CompactRecordHeader {
59    /// Number of records owned by this record in the page directory.
60    pub n_owned: u8,
61    /// Delete mark flag.
62    pub delete_mark: bool,
63    /// Min-rec flag (leftmost record on a non-leaf level).
64    pub min_rec: bool,
65    /// Record's position in the heap.
66    pub heap_no: u16,
67    /// Record type.
68    pub rec_type: RecordType,
69    /// Relative offset to the next record (signed).
70    pub next_offset: i16,
71}
72
73impl CompactRecordHeader {
74    /// Parse a compact record header from the 5 bytes preceding the record origin.
75    ///
76    /// `data` should point to the start of the 5-byte extra header.
77    pub fn parse(data: &[u8]) -> Option<Self> {
78        if data.len() < REC_N_NEW_EXTRA_BYTES {
79            return None;
80        }
81
82        // Byte 0 layout: [info_bits(4) | n_owned(4)]
83        // Info bits (upper nibble): bit 5 = delete_mark, bit 4 = min_rec
84        // n_owned (lower nibble): bits 0-3
85        let byte0 = data[0];
86        let n_owned = byte0 & 0x0F;
87        let delete_mark = (byte0 & 0x20) != 0;
88        let min_rec = (byte0 & 0x10) != 0;
89
90        let two_bytes = BigEndian::read_u16(&data[1..3]);
91        let rec_type = RecordType::from_u8((two_bytes & 0x07) as u8);
92        let heap_no = (two_bytes >> 3) & 0x1FFF;
93
94        let next_offset = BigEndian::read_i16(&data[3..5]);
95
96        Some(CompactRecordHeader {
97            n_owned,
98            delete_mark,
99            min_rec,
100            heap_no,
101            rec_type,
102            next_offset,
103        })
104    }
105}
106
107/// A record position on a page, with its parsed header.
108#[derive(Debug, Clone)]
109pub struct RecordInfo {
110    /// Absolute offset of the record origin within the page.
111    pub offset: usize,
112    /// Parsed record header.
113    pub header: CompactRecordHeader,
114}
115
116/// Walk all user records on a compact-format INDEX page.
117///
118/// Starts from infimum and follows next-record offsets until reaching supremum.
119/// Returns a list of record positions (excluding infimum/supremum).
120pub fn walk_compact_records(page_data: &[u8]) -> Vec<RecordInfo> {
121    let mut records = Vec::new();
122
123    // Infimum record origin is at PAGE_NEW_INFIMUM (99)
124    let infimum_origin = PAGE_NEW_INFIMUM;
125    if page_data.len() < infimum_origin + 2 {
126        return records;
127    }
128
129    // Read infimum's next-record offset (at infimum_origin - 2, relative to origin)
130    let infimum_extra_start = infimum_origin - REC_N_NEW_EXTRA_BYTES;
131    if page_data.len() < infimum_extra_start + REC_N_NEW_EXTRA_BYTES {
132        return records;
133    }
134
135    let infimum_hdr = match CompactRecordHeader::parse(&page_data[infimum_extra_start..]) {
136        Some(h) => h,
137        None => return records,
138    };
139
140    // Follow the linked list
141    let mut current_offset = infimum_origin;
142    let mut next_rel = infimum_hdr.next_offset;
143
144    // Safety: limit iterations to prevent infinite loops
145    let max_iter = page_data.len();
146    let mut iterations = 0;
147
148    loop {
149        if iterations > max_iter {
150            break;
151        }
152        iterations += 1;
153
154        // Calculate next record's absolute offset
155        let next_abs = (current_offset as i32 + next_rel as i32) as usize;
156        if next_abs < REC_N_NEW_EXTRA_BYTES || next_abs >= page_data.len() {
157            break;
158        }
159
160        // Parse the record header (5 bytes before the origin)
161        let extra_start = next_abs - REC_N_NEW_EXTRA_BYTES;
162        if extra_start + REC_N_NEW_EXTRA_BYTES > page_data.len() {
163            break;
164        }
165
166        let hdr = match CompactRecordHeader::parse(&page_data[extra_start..]) {
167            Some(h) => h,
168            None => break,
169        };
170
171        // If we've reached supremum, stop
172        if hdr.rec_type == RecordType::Supremum {
173            break;
174        }
175
176        next_rel = hdr.next_offset;
177        records.push(RecordInfo {
178            offset: next_abs,
179            header: hdr,
180        });
181        current_offset = next_abs;
182
183        // next_offset of 0 means end of list
184        if next_rel == 0 {
185            break;
186        }
187    }
188
189    records
190}
191
192/// Parse the variable-length field lengths from a compact record's null bitmap
193/// and variable-length header. Returns the field data starting offset.
194///
195/// For SDI records and other known-format records, callers can use the
196/// record offset directly since field positions are fixed.
197pub fn read_variable_field_lengths(
198    page_data: &[u8],
199    record_origin: usize,
200    n_nullable: usize,
201    n_variable: usize,
202) -> Option<(Vec<bool>, Vec<usize>)> {
203    // The variable-length header grows backwards from the record origin,
204    // before the 5-byte compact extra header.
205    // Layout (backwards from origin - 5):
206    //   - null bitmap: ceil(n_nullable / 8) bytes
207    //   - variable-length field lengths: 1 or 2 bytes each
208
209    let null_bitmap_bytes = n_nullable.div_ceil(8);
210    let mut pos = record_origin - REC_N_NEW_EXTRA_BYTES;
211
212    // Read null bitmap
213    if pos < null_bitmap_bytes {
214        return None;
215    }
216    pos -= null_bitmap_bytes;
217    let mut nulls = Vec::with_capacity(n_nullable);
218    for i in 0..n_nullable {
219        let byte_idx = pos + (i / 8);
220        let bit_idx = i % 8;
221        if byte_idx >= page_data.len() {
222            return None;
223        }
224        nulls.push((page_data[byte_idx] & (1 << bit_idx)) != 0);
225    }
226
227    // Read variable-length field lengths
228    let mut var_lengths = Vec::with_capacity(n_variable);
229    for _ in 0..n_variable {
230        if pos == 0 {
231            return None;
232        }
233        pos -= 1;
234        if pos >= page_data.len() {
235            return None;
236        }
237        let len_byte = page_data[pos] as usize;
238        if len_byte & 0x80 != 0 {
239            // 2-byte length
240            if pos == 0 {
241                return None;
242            }
243            pos -= 1;
244            if pos >= page_data.len() {
245                return None;
246            }
247            let high_byte = page_data[pos] as usize;
248            let total_len = ((len_byte & 0x3F) << 8) | high_byte;
249            var_lengths.push(total_len);
250        } else {
251            var_lengths.push(len_byte);
252        }
253    }
254
255    Some((nulls, var_lengths))
256}
257
258#[cfg(test)]
259mod tests {
260    use super::*;
261    use byteorder::ByteOrder;
262
263    #[test]
264    fn test_record_type_from_u8() {
265        assert_eq!(RecordType::from_u8(0), RecordType::Ordinary);
266        assert_eq!(RecordType::from_u8(1), RecordType::NodePtr);
267        assert_eq!(RecordType::from_u8(2), RecordType::Infimum);
268        assert_eq!(RecordType::from_u8(3), RecordType::Supremum);
269    }
270
271    #[test]
272    fn test_compact_record_header_parse() {
273        // Build a 5-byte compact header:
274        // byte0: [info_bits(4) | n_owned(4)]
275        //   n_owned=1 in lower nibble, no info bits => 0x01
276        // bytes 1-2: heap_no=5 (5<<3=0x0028), rec_type=0 => 0x0028
277        // bytes 3-4: next_offset = 30 => 0x001E
278        let mut data = vec![0u8; 5];
279        data[0] = 0x01; // n_owned=1, no delete, no min_rec
280        BigEndian::write_u16(&mut data[1..3], 5 << 3); // heap_no=5, type=0
281        BigEndian::write_i16(&mut data[3..5], 30); // next=30
282
283        let hdr = CompactRecordHeader::parse(&data).unwrap();
284        assert_eq!(hdr.n_owned, 1);
285        assert!(!hdr.delete_mark);
286        assert!(!hdr.min_rec);
287        assert_eq!(hdr.heap_no, 5);
288        assert_eq!(hdr.rec_type, RecordType::Ordinary);
289        assert_eq!(hdr.next_offset, 30);
290    }
291
292    #[test]
293    fn test_compact_record_header_with_flags() {
294        let mut data = vec![0u8; 5];
295        // n_owned=3 (0x30), delete_mark (0x20), min_rec (0x10)
296        // => 0x30 | 0x20 | 0x10 = 0x70... wait, n_owned is bits 4-7 so n_owned=3 is 0x30
297        // delete_mark is bit 5 (0x20), min_rec is bit 4 (0x10)
298        // But if n_owned=3 takes bits 4-7, that's 0x30, which conflicts with bit 5 for delete.
299        // Actually in InnoDB: byte0 has info_bits in upper 4 bits and... let me recheck.
300        // The layout is: [info_bits(4) | n_owned(4)]
301        // info_bits: bit 7=unused, bit 6=unused, bit 5=delete_mark, bit 4=min_rec
302        // n_owned: bits 0-3
303        // So: delete_mark=1, min_rec=0, n_owned=2 => 0x20 | 0x02 = 0x22
304        data[0] = 0x22; // delete_mark=1, n_owned=2
305        BigEndian::write_u16(&mut data[1..3], (10 << 3) | 1); // heap_no=10, type=node_ptr
306        BigEndian::write_i16(&mut data[3..5], -50); // negative offset
307
308        let hdr = CompactRecordHeader::parse(&data).unwrap();
309        assert_eq!(hdr.n_owned, 2);
310        assert!(hdr.delete_mark);
311        assert!(!hdr.min_rec);
312        assert_eq!(hdr.heap_no, 10);
313        assert_eq!(hdr.rec_type, RecordType::NodePtr);
314        assert_eq!(hdr.next_offset, -50);
315    }
316}