use byteorder::{BigEndian, ByteOrder};
use crate::innodb::constants::*;
/// Record status stored in the low three bits of a record header's
/// heap-number/status word.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordType {
    /// Status code 0 (also used for every code this parser does not know).
    Ordinary,
    /// Status code 1.
    NodePtr,
    /// Status code 2.
    Infimum,
    /// Status code 3.
    Supremum,
}

impl RecordType {
    /// Decodes a status code.
    ///
    /// Only the low three bits of `val` are inspected. Codes 0 through 3 map
    /// to their dedicated variants; the remaining codes (4 through 7) are
    /// reported as [`RecordType::Ordinary`] rather than as an error.
    pub fn from_u8(val: u8) -> Self {
        // Indexed by the 3-bit status code.
        const BY_STATUS: [RecordType; 8] = [
            RecordType::Ordinary,
            RecordType::NodePtr,
            RecordType::Infimum,
            RecordType::Supremum,
            RecordType::Ordinary,
            RecordType::Ordinary,
            RecordType::Ordinary,
            RecordType::Ordinary,
        ];
        BY_STATUS[usize::from(val & 0x07)]
    }

    /// Returns the `REC_STATUS_*` identifier used to display this status.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Ordinary => "REC_STATUS_ORDINARY",
            Self::NodePtr => "REC_STATUS_NODE_PTR",
            Self::Infimum => "REC_STATUS_INFIMUM",
            Self::Supremum => "REC_STATUS_SUPREMUM",
        }
    }
}
/// Decoded fixed-size header of a compact-format record, as produced by
/// `CompactRecordHeader::parse` from the first five bytes of its input.
#[derive(Debug, Clone)]
pub struct CompactRecordHeader {
/// Low four bits of header byte 0.
pub n_owned: u8,
/// Bit `0x20` of header byte 0.
pub delete_mark: bool,
/// Bit `0x10` of header byte 0.
pub min_rec: bool,
/// Upper 13 bits of the big-endian word in header bytes 1-2.
pub heap_no: u16,
/// Status decoded from the low three bits of the same word.
pub rec_type: RecordType,
/// Signed big-endian value from header bytes 3-4. `walk_compact_records`
/// adds it to the current record's page offset to find the next record.
pub next_offset: i16,
}
impl CompactRecordHeader {
    /// Decodes a compact-format record header from the start of `data`.
    ///
    /// Layout read by this function (all multi-byte values big-endian):
    ///
    /// * byte 0: `0x20` = delete mark, `0x10` = min-rec flag, low nibble =
    ///   `n_owned`
    /// * bytes 1-2: upper 13 bits = heap number, low 3 bits = record status
    /// * bytes 3-4: signed offset to the next record
    ///
    /// Returns `None` when `data` holds fewer than `REC_N_NEW_EXTRA_BYTES`
    /// bytes. Bytes past the header are ignored.
    pub fn parse(data: &[u8]) -> Option<Self> {
        if data.len() < REC_N_NEW_EXTRA_BYTES {
            return None;
        }

        let info_and_owned = data[0];
        let heap_and_status = BigEndian::read_u16(&data[1..3]);

        Some(Self {
            n_owned: info_and_owned & 0x0F,
            delete_mark: info_and_owned & 0x20 != 0,
            min_rec: info_and_owned & 0x10 != 0,
            heap_no: (heap_and_status >> 3) & 0x1FFF,
            rec_type: RecordType::from_u8((heap_and_status & 0x07) as u8),
            next_offset: BigEndian::read_i16(&data[3..5]),
        })
    }
}
/// Decoded fixed-size header of a redundant-format record, as produced by
/// `RedundantRecordHeader::parse` from the first six bytes of its input.
#[derive(Debug, Clone)]
pub struct RedundantRecordHeader {
/// Low four bits of header byte 0.
pub n_owned: u8,
/// Bit `0x20` of header byte 0.
pub delete_mark: bool,
/// Bit `0x10` of header byte 0.
pub min_rec: bool,
/// Upper 13 bits of the big-endian word in header bytes 1-2.
pub heap_no: u16,
/// Status decoded from the low three bits of the same word.
pub rec_type: RecordType,
/// Unsigned big-endian value from header bytes 4-5. `walk_redundant_records`
/// uses it directly as the page offset of the next record.
pub next_offset: u16,
/// Field count decoded from the header.
pub n_fields: u16,
/// Flag decoded from the header; presumably set when field end offsets are
/// stored in one byte each (TODO confirm against the on-disk format).
pub one_byte_offs: bool,
}
impl RedundantRecordHeader {
    /// Decodes a redundant-format (old-style) record header from the start
    /// of `data`.
    ///
    /// Layout read by this function (all multi-byte values big-endian):
    ///
    /// * byte 0: `0x20` = delete mark, `0x10` = min-rec flag, low nibble =
    ///   `n_owned`
    /// * bytes 1-2: upper 13 bits = heap number
    /// * bytes 2-3: mask `0x07FE`, shifted right by one = number of fields
    ///   (10 bits, straddling the byte boundary)
    /// * byte 3, bit 0: one-byte-offsets flag
    /// * bytes 4-5: unsigned pointer to the next record
    ///
    /// Returns `None` when `data` holds fewer than `REC_N_OLD_EXTRA_BYTES`
    /// bytes. Bytes past the header are ignored.
    pub fn parse(data: &[u8]) -> Option<Self> {
        if data.len() < REC_N_OLD_EXTRA_BYTES {
            return None;
        }

        let info_and_owned = data[0];
        let heap_word = BigEndian::read_u16(&data[1..3]);
        // The field count shares byte 2 with the heap number, hence the
        // overlapping read.
        let fields_word = BigEndian::read_u16(&data[2..4]);

        Some(Self {
            n_owned: info_and_owned & 0x0F,
            delete_mark: info_and_owned & 0x20 != 0,
            min_rec: info_and_owned & 0x10 != 0,
            heap_no: (heap_word >> 3) & 0x1FFF,
            // NOTE(review): these three bits are also the top bits of the
            // field count in this layout, so the decoded status may not be
            // meaningful for redundant records. Kept unchanged because
            // callers read this field.
            rec_type: RecordType::from_u8((heap_word & 0x07) as u8),
            next_offset: BigEndian::read_u16(&data[4..6]),
            // Ten bits sitting just above the one-byte-offsets flag.
            n_fields: (fields_word >> 1) & 0x03FF,
            // Least significant bit of the byte preceding the next pointer.
            one_byte_offs: data[3] & 0x01 != 0,
        })
    }
}
/// A parsed record header in either of the two supported layouts.
#[derive(Debug, Clone)]
pub enum RecordHeader {
/// Header parsed by `CompactRecordHeader::parse`.
Compact(CompactRecordHeader),
/// Header parsed by `RedundantRecordHeader::parse`.
Redundant(RedundantRecordHeader),
}
impl RecordHeader {
pub fn n_owned(&self) -> u8 {
match self {
Self::Compact(h) => h.n_owned,
Self::Redundant(h) => h.n_owned,
}
}
pub fn delete_mark(&self) -> bool {
match self {
Self::Compact(h) => h.delete_mark,
Self::Redundant(h) => h.delete_mark,
}
}
pub fn min_rec(&self) -> bool {
match self {
Self::Compact(h) => h.min_rec,
Self::Redundant(h) => h.min_rec,
}
}
pub fn heap_no(&self) -> u16 {
match self {
Self::Compact(h) => h.heap_no,
Self::Redundant(h) => h.heap_no,
}
}
pub fn rec_type(&self) -> RecordType {
match self {
Self::Compact(h) => h.rec_type,
Self::Redundant(h) => h.rec_type,
}
}
pub fn next_offset_raw(&self) -> i16 {
match self {
Self::Compact(h) => h.next_offset,
Self::Redundant(h) => h.next_offset as i16,
}
}
}
/// One record located by `walk_compact_records` or `walk_redundant_records`.
#[derive(Debug, Clone)]
pub struct RecordInfo {
/// Byte offset of the record within the page slice handed to the walker.
/// The walkers parse the record's header from the bytes immediately before
/// this offset.
pub offset: usize,
/// The header parsed for this record.
pub header: RecordHeader,
}
/// Follows the compact-format record chain of a page, starting at the
/// infimum record, and returns every record visited before the supremum.
///
/// Each record's `next_offset` is added to that record's own page offset to
/// locate its successor. The walk ends when:
///
/// * the computed offset falls outside `page_data` or leaves no room for a
///   header before it (this includes sums that go negative),
/// * a header cannot be parsed,
/// * a record with status [`RecordType::Supremum`] is reached (it is not
///   included in the result),
/// * a record whose `next_offset` is zero has been recorded, or
/// * `page_data.len() + 1` steps have been taken, which bounds the work on
///   pages whose chain loops back on itself.
///
/// An empty vector is returned when the page is too short to contain the
/// infimum record. Note that the infimum's own `next_offset` is followed
/// without a zero check, so an infimum pointing at itself is reported once.
pub fn walk_compact_records(page_data: &[u8]) -> Vec<RecordInfo> {
    let mut found = Vec::new();
    let page_len = page_data.len();

    // This also guarantees that the infimum header itself is inside the page.
    if page_len < PAGE_NEW_INFIMUM + 2 {
        return found;
    }
    let infimum_hdr_at = PAGE_NEW_INFIMUM - REC_N_NEW_EXTRA_BYTES;
    let Some(infimum) = CompactRecordHeader::parse(&page_data[infimum_hdr_at..]) else {
        return found;
    };

    let mut origin = PAGE_NEW_INFIMUM;
    let mut delta = infimum.next_offset;

    for _ in 0..=page_len {
        // A negative sum turns into a huge `usize` and fails the range test.
        let target = (origin as i32 + delta as i32) as usize;
        if !(REC_N_NEW_EXTRA_BYTES..page_len).contains(&target) {
            break;
        }

        let hdr_at = target - REC_N_NEW_EXTRA_BYTES;
        let Some(hdr) = CompactRecordHeader::parse(&page_data[hdr_at..]) else {
            break;
        };
        if matches!(hdr.rec_type, RecordType::Supremum) {
            break;
        }

        delta = hdr.next_offset;
        found.push(RecordInfo {
            offset: target,
            header: RecordHeader::Compact(hdr),
        });
        origin = target;

        if delta == 0 {
            break;
        }
    }

    found
}
/// Follows the redundant-format record chain of a page, starting at the
/// infimum record, and returns every record visited.
///
/// Each record's `next_offset` is used directly as the page offset of its
/// successor. The walk ends when:
///
/// * the offset falls outside `page_data` or leaves no room for a header
///   before it,
/// * a header cannot be parsed,
/// * a record whose decoded status is [`RecordType::Supremum`] is reached
///   (it is not included in the result),
/// * a record whose `next_offset` is zero has been recorded, or
/// * `page_data.len() + 1` steps have been taken, which bounds the work on
///   pages whose chain loops back on itself.
///
/// An empty vector is returned when the page is too short to contain the
/// infimum record.
///
/// NOTE(review): the supremum test relies on the status bits produced by
/// `RedundantRecordHeader::parse`; confirm that those bits really identify
/// the supremum in this format, otherwise the final record of the chain may
/// be reported as a regular record.
pub fn walk_redundant_records(page_data: &[u8]) -> Vec<RecordInfo> {
    let mut found = Vec::new();
    let page_len = page_data.len();

    // This also guarantees that the infimum header itself is inside the page.
    if page_len < PAGE_OLD_INFIMUM + 2 {
        return found;
    }
    let infimum_hdr_at = PAGE_OLD_INFIMUM - REC_N_OLD_EXTRA_BYTES;
    let Some(infimum) = RedundantRecordHeader::parse(&page_data[infimum_hdr_at..]) else {
        return found;
    };

    let mut origin = usize::from(infimum.next_offset);

    for _ in 0..=page_len {
        if !(REC_N_OLD_EXTRA_BYTES..page_len).contains(&origin) {
            break;
        }

        let hdr_at = origin - REC_N_OLD_EXTRA_BYTES;
        let Some(hdr) = RedundantRecordHeader::parse(&page_data[hdr_at..]) else {
            break;
        };
        if matches!(hdr.rec_type, RecordType::Supremum) {
            break;
        }

        let successor = usize::from(hdr.next_offset);
        found.push(RecordInfo {
            offset: origin,
            header: RecordHeader::Redundant(hdr),
        });

        if successor == 0 {
            break;
        }
        origin = successor;
    }

    found
}
/// Reads the NULL bitmap and the variable-length field lengths stored in
/// front of a compact-format record header.
///
/// Both areas grow *backwards* from the fixed header, which starts
/// `REC_N_NEW_EXTRA_BYTES` before `record_origin`:
///
/// * The NULL bitmap comes first. Nullable column `i` is bit `i % 8` of the
///   byte `i / 8` positions before the header, so columns 0-7 live in the
///   byte directly in front of the header, columns 8-15 in the byte before
///   that, and so on.
/// * The length entries follow, one per variable-length field, each read at
///   the next lower address. A first byte with bit `0x80` set starts a
///   two-byte entry: its low six bits are the high part of the length and
///   the following (lower-address) byte is the low part. Bit `0x40` of the
///   first byte is masked off and not reported.
///
/// # Parameters
///
/// * `page_data` - the page containing the record.
/// * `record_origin` - offset of the record origin within `page_data`.
/// * `n_nullable` - number of NULL-bitmap bits to decode.
/// * `n_variable` - number of length entries to decode.
///
/// # Returns
///
/// `Some((nulls, lengths))` with one entry per requested bit and length, or
/// `None` when the header, bitmap, or a length entry would lie before the
/// start of the page or outside `page_data`.
///
/// NOTE(review): every first byte with bit `0x80` set is treated as a
/// two-byte entry because this function has no column metadata; callers
/// whose columns store lengths 128-255 in a single byte must account for
/// that themselves.
pub fn read_variable_field_lengths(
    page_data: &[u8],
    record_origin: usize,
    n_nullable: usize,
    n_variable: usize,
) -> Option<(Vec<bool>, Vec<usize>)> {
    // An origin too close to the page start has no room for the fixed header.
    let header_start = record_origin.checked_sub(REC_N_NEW_EXTRA_BYTES)?;
    let null_bitmap_bytes = n_nullable.div_ceil(8);
    let bitmap_start = header_start.checked_sub(null_bitmap_bytes)?;

    let mut nulls = Vec::with_capacity(n_nullable);
    for i in 0..n_nullable {
        // `i / 8 < null_bitmap_bytes <= header_start`, so this cannot wrap.
        let bitmap_byte = *page_data.get(header_start - 1 - i / 8)?;
        nulls.push(bitmap_byte & (1 << (i % 8)) != 0);
    }

    // Every entry consumes at least one byte, so at most `bitmap_start`
    // entries can exist; capping the capacity keeps a bogus `n_variable`
    // from triggering a huge allocation.
    let mut var_lengths = Vec::with_capacity(n_variable.min(bitmap_start));
    let mut cursor = bitmap_start;
    for _ in 0..n_variable {
        cursor = cursor.checked_sub(1)?;
        let first = usize::from(*page_data.get(cursor)?);
        if first & 0x80 == 0 {
            var_lengths.push(first);
            continue;
        }
        cursor = cursor.checked_sub(1)?;
        let low = usize::from(*page_data.get(cursor)?);
        var_lengths.push(((first & 0x3F) << 8) | low);
    }

    Some((nulls, var_lengths))
}
#[cfg(test)]
mod tests {
    use super::*;
    use byteorder::ByteOrder;

    /// The four defined status codes each decode to their own variant.
    #[test]
    fn test_record_type_from_u8() {
        let cases = [
            (0u8, RecordType::Ordinary),
            (1, RecordType::NodePtr),
            (2, RecordType::Infimum),
            (3, RecordType::Supremum),
        ];
        for (code, expected) in cases {
            assert_eq!(RecordType::from_u8(code), expected);
        }
    }

    /// Compact header with no flag bits, heap number 5 and a forward offset.
    #[test]
    fn test_compact_record_header_parse() {
        let mut raw = [0u8; 5];
        raw[0] = 0x01; // n_owned = 1, no info bits
        BigEndian::write_u16(&mut raw[1..3], 5 << 3); // heap_no = 5, status = 0
        BigEndian::write_i16(&mut raw[3..5], 30);

        let parsed = CompactRecordHeader::parse(&raw).unwrap();

        assert_eq!(parsed.n_owned, 1);
        assert!(!parsed.delete_mark);
        assert!(!parsed.min_rec);
        assert_eq!(parsed.heap_no, 5);
        assert_eq!(parsed.rec_type, RecordType::Ordinary);
        assert_eq!(parsed.next_offset, 30);
    }

    /// Compact header with the delete mark set, node-pointer status and a
    /// backward (negative) offset.
    #[test]
    fn test_compact_record_header_with_flags() {
        let mut raw = [0u8; 5];
        raw[0] = 0x22; // delete mark (0x20) + n_owned = 2
        BigEndian::write_u16(&mut raw[1..3], (10 << 3) | 1); // heap_no = 10, status = 1
        BigEndian::write_i16(&mut raw[3..5], -50);

        let parsed = CompactRecordHeader::parse(&raw).unwrap();

        assert_eq!(parsed.n_owned, 2);
        assert!(parsed.delete_mark);
        assert!(!parsed.min_rec);
        assert_eq!(parsed.heap_no, 10);
        assert_eq!(parsed.rec_type, RecordType::NodePtr);
        assert_eq!(parsed.next_offset, -50);
    }

    /// Redundant header with the delete mark set. Byte 3 is populated, but
    /// the fields derived from it are not asserted here.
    #[test]
    fn test_redundant_record_header_parse() {
        let mut raw = [0u8; 6];
        raw[0] = 0x22; // delete mark (0x20) + n_owned = 2
        BigEndian::write_u16(&mut raw[1..3], 8 << 3); // heap_no = 8
        raw[3] = 0x40;
        BigEndian::write_u16(&mut raw[4..6], 300);

        let parsed = RedundantRecordHeader::parse(&raw).unwrap();

        assert_eq!(parsed.n_owned, 2);
        assert!(parsed.delete_mark);
        assert_eq!(parsed.heap_no, 8);
        assert_eq!(parsed.rec_type, RecordType::Ordinary);
        assert_eq!(parsed.next_offset, 300);
    }

    /// Redundant header with no flag bits set.
    #[test]
    fn test_redundant_record_header_no_flags() {
        let mut raw = [0u8; 6];
        raw[0] = 0x01; // n_owned = 1, no info bits
        BigEndian::write_u16(&mut raw[1..3], 3 << 3); // heap_no = 3
        BigEndian::write_u16(&mut raw[4..6], 150);

        let parsed = RedundantRecordHeader::parse(&raw).unwrap();

        assert_eq!(parsed.n_owned, 1);
        assert!(!parsed.delete_mark);
        assert!(!parsed.min_rec);
        assert_eq!(parsed.heap_no, 3);
        assert_eq!(parsed.rec_type, RecordType::Ordinary);
        assert_eq!(parsed.next_offset, 150);
    }

    /// The `RecordHeader` accessors forward to the wrapped compact header.
    #[test]
    fn test_record_header_enum_accessors() {
        let mut raw = [0u8; 5];
        raw[0] = 0x22; // delete mark (0x20) + n_owned = 2
        BigEndian::write_u16(&mut raw[1..3], 5 << 3); // heap_no = 5, status = 0
        BigEndian::write_i16(&mut raw[3..5], 42);

        let wrapped = RecordHeader::Compact(CompactRecordHeader::parse(&raw).unwrap());

        assert_eq!(wrapped.n_owned(), 2);
        assert!(wrapped.delete_mark());
        assert_eq!(wrapped.heap_no(), 5);
        assert_eq!(wrapped.rec_type(), RecordType::Ordinary);
        assert_eq!(wrapped.next_offset_raw(), 42);
    }
}