use byteorder::{BigEndian, ByteOrder};
use serde::Serialize;
use crate::innodb::constants::{FIL_NULL, FIL_PAGE_DATA};
use crate::innodb::page::FilHeader;
use crate::innodb::page_types::PageType;
use crate::innodb::tablespace::Tablespace;
use crate::IdbError;
// Byte offsets within the undo page header (TRX_UNDO_PAGE_HDR), which
// starts at FIL_PAGE_DATA on every undo log page.
const TRX_UNDO_PAGE_TYPE: usize = 0;
const TRX_UNDO_PAGE_START: usize = 2;
const TRX_UNDO_PAGE_FREE: usize = 4;
#[allow(dead_code)]
const TRX_UNDO_PAGE_NODE: usize = 6;
const TRX_UNDO_PAGE_HDR_SIZE: usize = 18;

// Byte offsets within the undo segment header (TRX_UNDO_SEG_HDR), which
// immediately follows the undo page header.
const TRX_UNDO_STATE: usize = 0;
const TRX_UNDO_LAST_LOG: usize = 2;
#[allow(dead_code)]
const TRX_UNDO_FSEG_HEADER: usize = 4;
#[allow(dead_code)]
const TRX_UNDO_PAGE_LIST: usize = 14;
const TRX_UNDO_SEG_HDR_SIZE: usize = 30;

// Byte offsets within an undo log header, relative to the header start.
const TRX_UNDO_TRX_ID: usize = 0;
const TRX_UNDO_TRX_NO: usize = 8;
const TRX_UNDO_DEL_MARKS: usize = 16;
const TRX_UNDO_LOG_START: usize = 18;
const TRX_UNDO_XID_EXISTS: usize = 20;
const TRX_UNDO_DICT_TRANS: usize = 21;
const TRX_UNDO_TABLE_ID: usize = 22;
const TRX_UNDO_NEXT_LOG: usize = 30;
const TRX_UNDO_PREV_LOG: usize = 32;
/// Type of an undo page, decoded from the TRX_UNDO_PAGE_TYPE field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum UndoPageType {
    Insert,
    Update,
    Unknown(u16),
}

impl UndoPageType {
    /// Decodes the on-disk u16 value: 1 = insert undo, 2 = update undo,
    /// anything else is preserved as `Unknown`.
    pub fn from_u16(value: u16) -> Self {
        if value == 1 {
            UndoPageType::Insert
        } else if value == 2 {
            UndoPageType::Update
        } else {
            UndoPageType::Unknown(value)
        }
    }

    /// Uppercase display name of the page type.
    pub fn name(&self) -> &'static str {
        match *self {
            UndoPageType::Insert => "INSERT",
            UndoPageType::Update => "UPDATE",
            UndoPageType::Unknown(_) => "UNKNOWN",
        }
    }
}
/// State of an undo segment, decoded from the TRX_UNDO_STATE field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum UndoState {
    Active,
    Cached,
    ToFree,
    ToPurge,
    Prepared,
    Unknown(u16),
}

impl UndoState {
    /// Maps the on-disk u16 state code to the enum; 1..=5 are known states,
    /// anything else is preserved as `Unknown`.
    pub fn from_u16(value: u16) -> Self {
        match value {
            1 => Self::Active,
            2 => Self::Cached,
            3 => Self::ToFree,
            4 => Self::ToPurge,
            5 => Self::Prepared,
            other => Self::Unknown(other),
        }
    }

    /// Uppercase display name of the state.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Active => "ACTIVE",
            Self::Cached => "CACHED",
            Self::ToFree => "TO_FREE",
            Self::ToPurge => "TO_PURGE",
            Self::Prepared => "PREPARED",
            Self::Unknown(_) => "UNKNOWN",
        }
    }
}
/// Parsed undo page header (TRX_UNDO_PAGE_HDR), present on every undo page.
#[derive(Debug, Clone, Serialize)]
pub struct UndoPageHeader {
    /// Whether the page holds insert or update undo records.
    pub page_type: UndoPageType,
    /// Byte offset of the first undo record on the page.
    pub start: u16,
    /// Byte offset of the first free byte after the last record.
    pub free: u16,
}
/// Parsed undo segment header (TRX_UNDO_SEG_HDR), located right after the
/// undo page header.
#[derive(Debug, Clone, Serialize)]
pub struct UndoSegmentHeader {
    /// Segment state (active, cached, to-free, ...).
    pub state: UndoState,
    /// Byte offset of the last undo log header on the page; used as the
    /// starting point when walking the log-header chain.
    pub last_log: u16,
}
impl UndoPageHeader {
    /// Parses the undo page header located at FIL_PAGE_DATA.
    ///
    /// Returns `None` when the buffer is too short to hold a full header.
    pub fn parse(page_data: &[u8]) -> Option<Self> {
        // Bounds check and slice in one step: `get` yields None when the
        // page is shorter than FIL_PAGE_DATA + TRX_UNDO_PAGE_HDR_SIZE.
        let hdr = page_data.get(FIL_PAGE_DATA..FIL_PAGE_DATA + TRX_UNDO_PAGE_HDR_SIZE)?;
        let raw_type = BigEndian::read_u16(&hdr[TRX_UNDO_PAGE_TYPE..]);
        let start = BigEndian::read_u16(&hdr[TRX_UNDO_PAGE_START..]);
        let free = BigEndian::read_u16(&hdr[TRX_UNDO_PAGE_FREE..]);
        Some(UndoPageHeader {
            page_type: UndoPageType::from_u16(raw_type),
            start,
            free,
        })
    }
}
impl UndoSegmentHeader {
    /// Parses the undo segment header, which sits immediately after the
    /// undo page header. Returns `None` when the buffer is too short.
    pub fn parse(page_data: &[u8]) -> Option<Self> {
        let base = FIL_PAGE_DATA + TRX_UNDO_PAGE_HDR_SIZE;
        // `get` performs the length check that the original if-guard did.
        let hdr = page_data.get(base..base + TRX_UNDO_SEG_HDR_SIZE)?;
        let state = UndoState::from_u16(BigEndian::read_u16(&hdr[TRX_UNDO_STATE..]));
        let last_log = BigEndian::read_u16(&hdr[TRX_UNDO_LAST_LOG..]);
        Some(UndoSegmentHeader { state, last_log })
    }
}
/// Parsed undo log header: per-transaction metadata at the start of an
/// undo log within an undo page.
#[derive(Debug, Clone, Serialize)]
pub struct UndoLogHeader {
    /// Transaction id (TRX_UNDO_TRX_ID field).
    pub trx_id: u64,
    /// Transaction number (TRX_UNDO_TRX_NO field).
    pub trx_no: u64,
    /// True when the TRX_UNDO_DEL_MARKS field is non-zero.
    pub del_marks: bool,
    /// Byte offset of the first undo record of this log (TRX_UNDO_LOG_START).
    pub log_start: u16,
    /// True when the single XID-exists byte is non-zero.
    pub xid_exists: bool,
    /// True when the single dict-trans byte is non-zero.
    pub dict_trans: bool,
    /// Table id (TRX_UNDO_TABLE_ID field).
    pub table_id: u64,
    /// Offset of the next undo log header on the page; 0 means none.
    pub next_log: u16,
    /// Offset of the previous undo log header on the page; 0 means none.
    pub prev_log: u16,
}
impl UndoLogHeader {
    /// Parses an undo log header at byte `log_offset` within the page.
    ///
    /// Returns `None` when the page is too short to contain a complete
    /// header at that offset.
    pub fn parse(page_data: &[u8], log_offset: usize) -> Option<Self> {
        // The header spans up to and including the 2-byte prev-log field,
        // i.e. TRX_UNDO_PREV_LOG + 2 (= 34) bytes.
        if page_data.len() < log_offset + TRX_UNDO_PREV_LOG + 2 {
            return None;
        }
        let d = &page_data[log_offset..];
        Some(UndoLogHeader {
            trx_id: BigEndian::read_u64(&d[TRX_UNDO_TRX_ID..]),
            trx_no: BigEndian::read_u64(&d[TRX_UNDO_TRX_NO..]),
            del_marks: BigEndian::read_u16(&d[TRX_UNDO_DEL_MARKS..]) != 0,
            log_start: BigEndian::read_u16(&d[TRX_UNDO_LOG_START..]),
            // Single-byte flags: any non-zero value counts as set.
            xid_exists: d[TRX_UNDO_XID_EXISTS] != 0,
            dict_trans: d[TRX_UNDO_DICT_TRANS] != 0,
            table_id: BigEndian::read_u64(&d[TRX_UNDO_TABLE_ID..]),
            next_log: BigEndian::read_u16(&d[TRX_UNDO_NEXT_LOG..]),
            prev_log: BigEndian::read_u16(&d[TRX_UNDO_PREV_LOG..]),
        })
    }
}
/// Header of a rollback-segment array page (page 0 of an undo tablespace
/// when its type is RSEG_ARRAY).
#[derive(Debug, Clone, Serialize)]
pub struct RsegArrayHeader {
    /// Number of rollback-segment slots recorded in the array.
    pub size: u32,
}
impl RsegArrayHeader {
pub fn parse(page_data: &[u8]) -> Option<Self> {
let base = FIL_PAGE_DATA;
if page_data.len() < base + 4 {
return None;
}
Some(RsegArrayHeader {
size: BigEndian::read_u32(&page_data[base..]),
})
}
pub fn read_slots(page_data: &[u8], max_slots: usize) -> Vec<u32> {
let base = FIL_PAGE_DATA + 4; let mut slots = Vec::new();
for i in 0..max_slots {
let offset = base + i * 4;
if offset + 4 > page_data.len() {
break;
}
let page_no = BigEndian::read_u32(&page_data[offset..]);
if page_no != 0 && page_no != crate::innodb::constants::FIL_NULL {
slots.push(page_no);
}
}
slots
}
}
// Byte offsets within the rollback segment header (at FIL_PAGE_DATA).
const TRX_RSEG_MAX_SIZE: usize = 0;
const TRX_RSEG_HISTORY_SIZE: usize = 4;
#[allow(dead_code)]
const TRX_RSEG_HISTORY: usize = 8;
// Start of the undo-slot array within the rollback segment header.
const TRX_RSEG_SLOTS_OFFSET: usize = 24;
// Number of undo slots per rollback segment.
const TRX_RSEG_N_SLOTS: usize = 1024;
/// Parsed rollback segment header page.
#[derive(Debug, Clone, Serialize)]
pub struct RollbackSegmentHeader {
    /// TRX_RSEG_MAX_SIZE field value.
    pub max_size: u32,
    /// TRX_RSEG_HISTORY_SIZE field value.
    pub history_size: u32,
    /// All TRX_RSEG_N_SLOTS raw slot values; 0 / FIL_NULL mark unused slots.
    pub slots: Vec<u32>,
}
impl RollbackSegmentHeader {
    /// Parses a rollback segment header page, including the full slot array.
    ///
    /// Returns `None` when the page is too short to hold header + slots.
    pub fn parse(page_data: &[u8]) -> Option<Self> {
        let base = FIL_PAGE_DATA;
        let needed = base + TRX_RSEG_SLOTS_OFFSET + TRX_RSEG_N_SLOTS * 4;
        if page_data.len() < needed {
            return None;
        }
        let d = &page_data[base..];
        // All slot values are kept raw here; filtering happens in
        // `active_slots`.
        let slots: Vec<u32> = (0..TRX_RSEG_N_SLOTS)
            .map(|i| BigEndian::read_u32(&d[TRX_RSEG_SLOTS_OFFSET + i * 4..]))
            .collect();
        Some(RollbackSegmentHeader {
            max_size: BigEndian::read_u32(&d[TRX_RSEG_MAX_SIZE..]),
            history_size: BigEndian::read_u32(&d[TRX_RSEG_HISTORY_SIZE..]),
            slots,
        })
    }

    /// Slot values that point at an undo segment page, i.e. everything
    /// that is neither 0 nor FIL_NULL.
    pub fn active_slots(&self) -> Vec<u32> {
        let mut active = Vec::new();
        for &slot in &self.slots {
            if slot != 0 && slot != FIL_NULL {
                active.push(slot);
            }
        }
        active
    }
}
/// Decodes InnoDB's variable-length ("compressed") integer encoding.
///
/// The leading byte selects the width:
/// * `0x00..=0x7F` — 1 byte, the byte itself is the value
/// * `0x80..=0xBF` — 2 bytes, 14-bit value
/// * `0xC0..=0xDF` — 3 bytes, 21-bit value
/// * `0xE0..=0xEF` — 4 bytes, 28-bit value
/// * `0xF0`        — 5 bytes, full 32-bit big-endian value follows
///
/// Returns `(value, bytes_consumed)`, or `None` when `offset` is out of
/// range, the buffer is truncated, or the leading byte is invalid (> 0xF0).
pub fn read_compressed(data: &[u8], offset: usize) -> Option<(u64, usize)> {
    // Checked indexing replaces the explicit `offset >= data.len()` guard.
    let b = *data.get(offset)?;
    if b < 0x80 {
        Some((b as u64, 1))
    } else if b < 0xC0 {
        if offset + 2 > data.len() {
            return None;
        }
        let val = ((b as u64 - 0x80) << 8) | data[offset + 1] as u64;
        Some((val, 2))
    } else if b < 0xE0 {
        if offset + 3 > data.len() {
            return None;
        }
        let val =
            ((b as u64 - 0xC0) << 16) | (data[offset + 1] as u64) << 8 | data[offset + 2] as u64;
        Some((val, 3))
    } else if b < 0xF0 {
        if offset + 4 > data.len() {
            return None;
        }
        let val = ((b as u64 - 0xE0) << 24)
            | (data[offset + 1] as u64) << 16
            | (data[offset + 2] as u64) << 8
            | data[offset + 3] as u64;
        Some((val, 4))
    } else if b == 0xF0 {
        if offset + 5 > data.len() {
            return None;
        }
        // Stdlib big-endian decode; keeps this pure parsing helper free of
        // the byteorder dependency.
        let val = u32::from_be_bytes([
            data[offset + 1],
            data[offset + 2],
            data[offset + 3],
            data[offset + 4],
        ]) as u64;
        Some((val, 5))
    } else {
        None
    }
}
/// Walks the chain of undo log headers on a page, starting at
/// `start_offset` (normally the segment header's last-log offset) and
/// following each header's prev-log pointer; headers are therefore
/// returned newest-first.
///
/// Corrupt pages may contain prev-log pointer cycles; a visited set stops
/// the walk on the first repeated offset (matching the cycle guard used by
/// `parse_undo_records`), and the walk is additionally capped at 1000
/// headers.
pub fn walk_undo_log_headers(page_data: &[u8], start_offset: u16) -> Vec<UndoLogHeader> {
    let mut headers = Vec::new();
    let mut offset = start_offset as usize;
    let mut visited = std::collections::HashSet::new();
    let max_iterations = 1000;
    for _ in 0..max_iterations {
        if offset == 0 || offset >= page_data.len() {
            break;
        }
        // Stop on prev-log cycles instead of emitting duplicate headers.
        if !visited.insert(offset) {
            break;
        }
        match UndoLogHeader::parse(page_data, offset) {
            Some(hdr) => {
                let prev = hdr.prev_log;
                headers.push(hdr);
                if prev == 0 {
                    break;
                }
                offset = prev as usize;
            }
            None => break,
        }
    }
    headers
}
/// Type of an individual undo record, taken from the low nibble of the
/// record's type byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum UndoRecordType {
    InsertRec,
    UpdExistRec,
    UpdDelRec,
    DelMarkRec,
    Unknown(u8),
}

impl UndoRecordType {
    /// Classifies a raw type byte; the upper nibble carries info bits and
    /// is masked off before classification.
    pub fn from_type_byte(byte: u8) -> Self {
        // Delegate so the numeric mapping lives in exactly one place.
        Self::from_u8(byte & 0x0F)
    }

    /// Classifies an already-masked type value (11..=14 are known types).
    pub fn from_u8(val: u8) -> Self {
        match val {
            11 => UndoRecordType::InsertRec,
            12 => UndoRecordType::UpdExistRec,
            13 => UndoRecordType::UpdDelRec,
            14 => UndoRecordType::DelMarkRec,
            v => UndoRecordType::Unknown(v),
        }
    }

    /// Short uppercase name for display purposes.
    pub fn name(&self) -> &'static str {
        match self {
            UndoRecordType::InsertRec => "INSERT",
            UndoRecordType::UpdExistRec => "UPD_EXIST",
            UndoRecordType::UpdDelRec => "UPD_DEL",
            UndoRecordType::DelMarkRec => "DEL_MARK",
            UndoRecordType::Unknown(_) => "UNKNOWN",
        }
    }
}

impl std::fmt::Display for UndoRecordType {
    /// Displays the record type via its uppercase `name()`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.name())
    }
}
/// One modified-field entry from a modify-type undo record.
#[derive(Debug, Clone, Serialize)]
pub struct UndoUpdateField {
    /// Field number as stored in the record (compressed integer).
    pub field_no: u64,
    /// Raw field bytes as stored in the undo record.
    pub data: Vec<u8>,
}
/// Lightweight description of one undo record, produced by
/// `walk_undo_records`.
#[derive(Debug, Clone, Serialize)]
pub struct UndoRecord {
    /// Byte offset of the record on the page.
    pub offset: usize,
    /// Record type decoded from the low nibble of the type byte.
    pub record_type: UndoRecordType,
    /// Info bits from the upper nibble of the type byte.
    pub info_bits: u8,
    /// Offset of the next record; 0 terminates the list.
    pub next_offset: u16,
    /// Length of the record body between the 3-byte record header and the
    /// next record (or the page's free pointer for the last record).
    pub data_len: usize,
}
/// Walks the singly-linked list of undo records on a page.
///
/// `start_offset` is the first record (from the page header's start field),
/// `free_offset` marks the end of used space, and `max_records` bounds the
/// walk so corrupt next-pointers cannot loop forever.
pub fn walk_undo_records(
    page_data: &[u8],
    start_offset: u16,
    free_offset: u16,
    max_records: usize,
) -> Vec<UndoRecord> {
    let mut records = Vec::new();
    let mut offset = start_offset as usize;
    for _ in 0..max_records {
        // Compare in usize space: the previous `offset as u16` cast would
        // wrap for buffers larger than 64 KiB and defeat this check.
        if offset == 0 || offset >= page_data.len() || offset >= free_offset as usize {
            break;
        }
        // Need at least the 2-byte next pointer plus the type byte.
        if offset + 3 > page_data.len() {
            break;
        }
        let next_offset = BigEndian::read_u16(&page_data[offset..]);
        let type_byte = page_data[offset + 2];
        let record_type = UndoRecordType::from_type_byte(type_byte);
        // Upper nibble of the type byte holds the info bits.
        let info_bits = type_byte >> 4;
        // The record body runs to the next record, or to the free pointer
        // for the final record in the list.
        let end = if next_offset > 0 && (next_offset as usize) < page_data.len() {
            next_offset as usize
        } else {
            free_offset as usize
        };
        let data_len = if end > offset + 3 { end - offset - 3 } else { 0 };
        records.push(UndoRecord {
            offset,
            record_type,
            info_bits,
            next_offset,
            data_len,
        });
        if next_offset == 0 {
            break;
        }
        offset = next_offset as usize;
    }
    records
}
/// Fully decoded undo record, produced by `parse_undo_records`.
#[derive(Debug, Clone, Serialize)]
pub struct DetailedUndoRecord {
    /// Byte offset of the record on the page.
    pub offset: usize,
    /// Decoded record type.
    pub record_type: UndoRecordType,
    /// Undo number (compressed integer following the type byte).
    pub undo_no: u64,
    /// Table id (compressed integer following the undo number).
    pub table_id: u64,
    /// Primary-key field values captured from the record.
    pub pk_fields: Vec<Vec<u8>>,
    /// Transaction id read from modify-type records (6-byte big-endian).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub trx_id: Option<u64>,
    /// Raw 7-byte roll pointer read from modify-type records.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub roll_ptr: Option<[u8; 7]>,
    /// Updated-field entries (modify-type records only).
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub update_fields: Vec<UndoUpdateField>,
}
/// Best-effort parser for the undo records on a single undo page.
///
/// Walks the record list between the page header's start and free offsets.
/// For each record it decodes the 2-byte next pointer, the type byte, the
/// compressed undo number and table id, and — for modify-type records —
/// the stored transaction id, roll pointer and updated-field list,
/// followed by one primary-key field. Parsing stops at the first record
/// that cannot be decoded; corrupt pointer cycles are detected via a
/// visited set.
pub fn parse_undo_records(page_data: &[u8]) -> Vec<DetailedUndoRecord> {
    let mut records = Vec::new();
    let page_hdr = match UndoPageHeader::parse(page_data) {
        Some(h) => h,
        None => return records,
    };
    let start = page_hdr.start as usize;
    let free = page_hdr.free as usize;
    // An empty or malformed page (start >= free) holds no records.
    if start == 0 || start >= page_data.len() || free == 0 || start >= free {
        return records;
    }
    let mut pos = start;
    // Track record offsets so next-pointer cycles terminate the walk.
    let mut visited = std::collections::HashSet::new();
    while pos >= start && pos < free && pos + 3 <= page_data.len() {
        if !visited.insert(pos) {
            break;
        }
        let rec_offset = pos;
        // Defensive bounds checks; the loop condition already guarantees
        // pos + 3 <= len, so these only guard against future edits.
        if pos + 2 > page_data.len() {
            break;
        }
        let next = BigEndian::read_u16(&page_data[pos..]) as usize;
        pos += 2;
        if pos >= page_data.len() {
            break;
        }
        // Low nibble of the type byte is the record type; the upper nibble
        // (compilation info bits) is ignored here.
        let type_cmpl = page_data[pos];
        let rec_type = UndoRecordType::from_u8(type_cmpl & 0x0F);
        pos += 1;
        let (undo_no, consumed) = match read_compressed(page_data, pos) {
            Some(v) => v,
            None => break,
        };
        pos += consumed;
        let (table_id, consumed) = match read_compressed(page_data, pos) {
            Some(v) => v,
            None => break,
        };
        pos += consumed;
        let mut trx_id = None;
        let mut roll_ptr = None;
        let mut update_fields = Vec::new();
        // Modify-type records additionally carry the stored trx id, roll
        // pointer and the list of updated fields.
        let is_modify = matches!(
            rec_type,
            UndoRecordType::UpdExistRec | UndoRecordType::UpdDelRec | UndoRecordType::DelMarkRec
        );
        if is_modify {
            // 6-byte big-endian transaction id, widened into a u64.
            if pos + 6 <= page_data.len() {
                let mut buf = [0u8; 8];
                buf[2..8].copy_from_slice(&page_data[pos..pos + 6]);
                trx_id = Some(BigEndian::read_u64(&buf));
                pos += 6;
            }
            // 7-byte roll pointer, kept raw.
            if pos + 7 <= page_data.len() {
                let mut rp = [0u8; 7];
                rp.copy_from_slice(&page_data[pos..pos + 7]);
                roll_ptr = Some(rp);
                pos += 7;
            }
            // Updated fields: compressed count, then (field_no, len, bytes)
            // triples. The count is capped at 256 to bound corrupt input.
            if let Some((n_fields, consumed)) = read_compressed(page_data, pos) {
                pos += consumed;
                for _ in 0..n_fields.min(256) {
                    let (field_no, c1) = match read_compressed(page_data, pos) {
                        Some(v) => v,
                        None => break,
                    };
                    pos += c1;
                    let (flen, c2) = match read_compressed(page_data, pos) {
                        Some(v) => v,
                        None => break,
                    };
                    pos += c2;
                    let flen = flen as usize;
                    if flen > 0 && pos + flen <= page_data.len() && flen < 65536 {
                        update_fields.push(UndoUpdateField {
                            field_no,
                            data: page_data[pos..pos + flen].to_vec(),
                        });
                        pos += flen;
                    } else if flen == 0 {
                        // Zero-length field: record it with empty data.
                        update_fields.push(UndoUpdateField {
                            field_no,
                            data: Vec::new(),
                        });
                    } else {
                        break;
                    }
                }
            }
        }
        // One primary-key field: compressed length followed by the bytes.
        let mut pk_fields = Vec::new();
        if let Some((pk_len, consumed)) = read_compressed(page_data, pos) {
            pos += consumed;
            let pk_len = pk_len as usize;
            // 8192 caps the field length to bound corrupt input.
            if pk_len > 0 && pos + pk_len <= page_data.len() && pk_len < 8192 {
                pk_fields.push(page_data[pos..pos + pk_len].to_vec());
            }
        }
        records.push(DetailedUndoRecord {
            offset: rec_offset,
            record_type: rec_type,
            undo_no,
            table_id,
            pk_fields,
            trx_id,
            roll_ptr,
            update_fields,
        });
        // Follow the next pointer; stop at 0, out-of-range, or a
        // non-advancing pointer (which would otherwise loop forever).
        if next == 0 || next >= free || next <= rec_offset {
            break;
        }
        pos = next;
    }
    records
}
/// Summary of a single undo segment page.
#[derive(Debug, Clone, Serialize)]
pub struct UndoSegmentInfo {
    /// Page number within the tablespace.
    pub page_no: u64,
    /// Parsed undo page header.
    pub page_header: UndoPageHeader,
    /// Parsed undo segment header.
    pub segment_header: UndoSegmentHeader,
    /// Undo log headers found by walking the last-log chain.
    pub log_headers: Vec<UndoLogHeader>,
    /// Number of undo records counted on the page.
    pub record_count: usize,
}
/// Result of analyzing an undo tablespace.
#[derive(Debug, Clone, Serialize)]
pub struct UndoAnalysis {
    /// Rollback-segment page numbers read from the rseg array page;
    /// empty when analysis fell back to a full page scan.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub rseg_slots: Vec<u32>,
    /// Parsed rollback-segment headers (rseg-array path only).
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub rseg_headers: Vec<RsegInfo>,
    /// Undo segments discovered in the tablespace.
    pub segments: Vec<UndoSegmentInfo>,
    /// Total number of undo log headers across all segments.
    pub total_transactions: usize,
    /// Number of segments whose state is ACTIVE.
    pub active_transactions: usize,
}
/// Per-rollback-segment summary.
#[derive(Debug, Clone, Serialize)]
pub struct RsegInfo {
    /// Page number of the rollback segment header page.
    pub page_no: u32,
    /// TRX_RSEG_MAX_SIZE field value.
    pub max_size: u32,
    /// TRX_RSEG_HISTORY_SIZE field value.
    pub history_size: u32,
    /// Number of slots that point at live undo segment pages.
    pub active_slot_count: usize,
}
/// Analyzes an undo tablespace, choosing the strategy from page 0's type:
/// a rollback-segment array page is followed slot by slot; anything else
/// falls back to scanning every page for undo log pages.
pub fn analyze_undo_tablespace(ts: &mut Tablespace) -> Result<UndoAnalysis, IdbError> {
    let page0 = ts.read_page(0)?;
    match FilHeader::parse(&page0).map(|h| h.page_type) {
        Some(PageType::RsegArray) => analyze_via_rseg_array(ts),
        _ => analyze_via_scan(ts),
    }
}
/// Analyzes an undo tablespace whose page 0 is a rollback-segment array
/// page: follows array slots -> rollback segment pages -> undo segment
/// pages, collecting per-segment summaries.
fn analyze_via_rseg_array(ts: &mut Tablespace) -> Result<UndoAnalysis, IdbError> {
    let page0 = ts.read_page(0)?;
    let rseg_array = RsegArrayHeader::parse(&page0);
    // Cap the slot count at 128 to bound the walk on corrupt size fields.
    let rseg_slots = rseg_array
        .map(|a| {
            let max = a.size.min(128) as usize;
            RsegArrayHeader::read_slots(&page0, max)
        })
        .unwrap_or_default();
    let mut rseg_headers = Vec::new();
    let mut segments = Vec::new();
    for &rseg_page_no in &rseg_slots {
        // Unreadable rollback segment pages are skipped, not fatal.
        let rseg_page = match ts.read_page(rseg_page_no as u64) {
            Ok(p) => p,
            Err(_) => continue,
        };
        if let Some(rseg_hdr) = RollbackSegmentHeader::parse(&rseg_page) {
            let active_slots = rseg_hdr.active_slots();
            rseg_headers.push(RsegInfo {
                page_no: rseg_page_no,
                max_size: rseg_hdr.max_size,
                history_size: rseg_hdr.history_size,
                active_slot_count: active_slots.len(),
            });
            // Each active slot points at an undo segment page; unreadable
            // or unparsable segments are silently skipped.
            for &undo_page_no in &active_slots {
                if let Ok(info) = read_undo_segment(ts, undo_page_no as u64) {
                    segments.push(info);
                }
            }
        }
    }
    // One undo log header corresponds to one transaction.
    let total_transactions: usize = segments.iter().map(|s| s.log_headers.len()).sum();
    let active_transactions = segments
        .iter()
        .filter(|s| s.segment_header.state == UndoState::Active)
        .count();
    Ok(UndoAnalysis {
        rseg_slots,
        rseg_headers,
        segments,
        total_transactions,
        active_transactions,
    })
}
/// Fallback analysis: scans every page of the tablespace and treats each
/// UNDO_LOG page with a non-empty segment header as an undo segment.
fn analyze_via_scan(ts: &mut Tablespace) -> Result<UndoAnalysis, IdbError> {
    let page_count = ts.page_count();
    let mut segments = Vec::new();
    for page_num in 0..page_count {
        // Unreadable or unparsable pages are skipped, not fatal.
        let page_data = match ts.read_page(page_num) {
            Ok(d) => d,
            Err(_) => continue,
        };
        let fil_hdr = match FilHeader::parse(&page_data) {
            Some(h) => h,
            None => continue,
        };
        if fil_hdr.page_type != PageType::UndoLog {
            continue;
        }
        let page_hdr = match UndoPageHeader::parse(&page_data) {
            Some(h) => h,
            None => continue,
        };
        let seg_hdr = match UndoSegmentHeader::parse(&page_data) {
            Some(h) => h,
            None => continue,
        };
        // A zero last-log offset means no undo log headers on this page.
        if seg_hdr.last_log == 0 {
            continue;
        }
        let log_headers = walk_undo_log_headers(&page_data, seg_hdr.last_log);
        // Cap the record walk at 10000 to bound corrupt input.
        let record_count =
            walk_undo_records(&page_data, page_hdr.start, page_hdr.free, 10000).len();
        segments.push(UndoSegmentInfo {
            page_no: page_num,
            page_header: page_hdr,
            segment_header: seg_hdr,
            log_headers,
            record_count,
        });
    }
    // One undo log header corresponds to one transaction.
    let total_transactions: usize = segments.iter().map(|s| s.log_headers.len()).sum();
    let active_transactions = segments
        .iter()
        .filter(|s| s.segment_header.state == UndoState::Active)
        .count();
    Ok(UndoAnalysis {
        // No rseg array page in this path, so slot/header lists stay empty.
        rseg_slots: Vec::new(),
        rseg_headers: Vec::new(),
        segments,
        total_transactions,
        active_transactions,
    })
}
/// Reads and summarizes a single undo segment page: both headers plus a
/// count of the undo records reachable from the page header's start offset.
fn read_undo_segment(ts: &mut Tablespace, page_no: u64) -> Result<UndoSegmentInfo, IdbError> {
    let page_data = ts.read_page(page_no)?;
    let page_header = UndoPageHeader::parse(&page_data)
        .ok_or_else(|| IdbError::Parse("Cannot parse undo page header".to_string()))?;
    let segment_header = UndoSegmentHeader::parse(&page_data)
        .ok_or_else(|| IdbError::Parse("Cannot parse undo segment header".to_string()))?;
    let log_headers = walk_undo_log_headers(&page_data, segment_header.last_log);
    // Cap the record walk at 10000 to bound corrupt input.
    let record_count =
        walk_undo_records(&page_data, page_header.start, page_header.free, 10000).len();
    Ok(UndoSegmentInfo {
        page_no,
        page_header,
        segment_header,
        log_headers,
        record_count,
    })
}
// Unit tests covering enum decoding, header parsing, the compressed-integer
// reader, and the record/log-header walkers (all on synthetic page buffers).
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_undo_page_type() {
        assert_eq!(UndoPageType::from_u16(1), UndoPageType::Insert);
        assert_eq!(UndoPageType::from_u16(2), UndoPageType::Update);
        assert_eq!(UndoPageType::from_u16(1).name(), "INSERT");
        assert_eq!(UndoPageType::from_u16(2).name(), "UPDATE");
    }

    #[test]
    fn test_undo_state() {
        assert_eq!(UndoState::from_u16(1), UndoState::Active);
        assert_eq!(UndoState::from_u16(2), UndoState::Cached);
        assert_eq!(UndoState::from_u16(3), UndoState::ToFree);
        assert_eq!(UndoState::from_u16(4), UndoState::ToPurge);
        assert_eq!(UndoState::from_u16(5), UndoState::Prepared);
        assert_eq!(UndoState::from_u16(1).name(), "ACTIVE");
    }

    #[test]
    fn test_undo_page_header_parse() {
        let mut page = vec![0u8; 256];
        let base = FIL_PAGE_DATA;
        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_TYPE..], 1);
        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_START..], 100);
        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_FREE..], 200);
        let hdr = UndoPageHeader::parse(&page).unwrap();
        assert_eq!(hdr.page_type, UndoPageType::Insert);
        assert_eq!(hdr.start, 100);
        assert_eq!(hdr.free, 200);
    }

    #[test]
    fn test_walk_undo_log_headers_single() {
        let mut page = vec![0u8; 256];
        let seg_base = FIL_PAGE_DATA + TRX_UNDO_PAGE_HDR_SIZE;
        let log_offset = seg_base + TRX_UNDO_SEG_HDR_SIZE;
        BigEndian::write_u16(&mut page[seg_base + TRX_UNDO_LAST_LOG..], log_offset as u16);
        // Write a single log header: trx_id, trx_no, del_marks, log_start,
        // next_log = 0 and prev_log = 0 terminate the chain.
        BigEndian::write_u64(&mut page[log_offset..], 1001);
        BigEndian::write_u64(&mut page[log_offset + 8..], 500);
        BigEndian::write_u16(&mut page[log_offset + 16..], 1);
        BigEndian::write_u16(&mut page[log_offset + 18..], 120);
        BigEndian::write_u16(&mut page[log_offset + 30..], 0);
        BigEndian::write_u16(&mut page[log_offset + 32..], 0);
        let headers = walk_undo_log_headers(&page, log_offset as u16);
        assert_eq!(headers.len(), 1);
        assert_eq!(headers[0].trx_id, 1001);
        assert_eq!(headers[0].trx_no, 500);
        assert!(headers[0].del_marks);
    }

    #[test]
    fn test_walk_undo_log_headers_chain() {
        let mut page = vec![0u8; 512];
        let offset1 = 100usize;
        let offset2 = 200usize;
        // Newest header at offset2 points back (prev_log) to offset1, so
        // the walk starting at offset2 yields newest-first order.
        BigEndian::write_u64(&mut page[offset2..], 2002);
        BigEndian::write_u64(&mut page[offset2 + 8..], 600);
        BigEndian::write_u16(&mut page[offset2 + 30..], 0);
        BigEndian::write_u16(&mut page[offset2 + 32..], offset1 as u16);
        BigEndian::write_u64(&mut page[offset1..], 1001);
        BigEndian::write_u64(&mut page[offset1 + 8..], 500);
        BigEndian::write_u16(&mut page[offset1 + 30..], offset2 as u16);
        BigEndian::write_u16(&mut page[offset1 + 32..], 0);
        let headers = walk_undo_log_headers(&page, offset2 as u16);
        assert_eq!(headers.len(), 2);
        assert_eq!(headers[0].trx_id, 2002);
        assert_eq!(headers[1].trx_id, 1001);
    }

    #[test]
    fn test_rollback_segment_header_parse() {
        let page_size = 16384;
        let mut page = vec![0u8; page_size];
        let base = FIL_PAGE_DATA;
        BigEndian::write_u32(&mut page[base + TRX_RSEG_MAX_SIZE..], 1000);
        BigEndian::write_u32(&mut page[base + TRX_RSEG_HISTORY_SIZE..], 42);
        // Slot 0 points at page 5; slot 1 is explicitly FIL_NULL (unused).
        BigEndian::write_u32(&mut page[base + TRX_RSEG_SLOTS_OFFSET..], 5);
        BigEndian::write_u32(&mut page[base + TRX_RSEG_SLOTS_OFFSET + 4..], FIL_NULL);
        let hdr = RollbackSegmentHeader::parse(&page).unwrap();
        assert_eq!(hdr.max_size, 1000);
        assert_eq!(hdr.history_size, 42);
        let active = hdr.active_slots();
        assert_eq!(active, vec![5]);
    }

    #[test]
    fn test_undo_segment_header_parse() {
        let mut page = vec![0u8; 256];
        let base = FIL_PAGE_DATA + TRX_UNDO_PAGE_HDR_SIZE;
        BigEndian::write_u16(&mut page[base + TRX_UNDO_STATE..], 1);
        BigEndian::write_u16(&mut page[base + TRX_UNDO_LAST_LOG..], 150);
        let hdr = UndoSegmentHeader::parse(&page).unwrap();
        assert_eq!(hdr.state, UndoState::Active);
        assert_eq!(hdr.last_log, 150);
    }

    #[test]
    fn test_undo_record_type_classification() {
        assert_eq!(
            UndoRecordType::from_type_byte(11),
            UndoRecordType::InsertRec
        );
        assert_eq!(
            UndoRecordType::from_type_byte(12),
            UndoRecordType::UpdExistRec
        );
        assert_eq!(
            UndoRecordType::from_type_byte(13),
            UndoRecordType::UpdDelRec
        );
        assert_eq!(
            UndoRecordType::from_type_byte(14),
            UndoRecordType::DelMarkRec
        );
        assert_eq!(
            UndoRecordType::from_type_byte(0),
            UndoRecordType::Unknown(0)
        );
    }

    #[test]
    fn test_read_compressed_1byte() {
        assert_eq!(read_compressed(&[0x00], 0), Some((0, 1)));
        assert_eq!(read_compressed(&[0x7F], 0), Some((127, 1)));
        assert_eq!(read_compressed(&[0x42], 0), Some((0x42, 1)));
    }

    #[test]
    fn test_read_compressed_2byte() {
        assert_eq!(read_compressed(&[0x80, 0x01], 0), Some((1, 2)));
        assert_eq!(read_compressed(&[0xBF, 0xFF], 0), Some((0x3FFF, 2)));
    }

    #[test]
    fn test_read_compressed_3byte() {
        assert_eq!(read_compressed(&[0xC0, 0x00, 0x01], 0), Some((1, 3)));
        assert_eq!(read_compressed(&[0xDF, 0xFF, 0xFF], 0), Some((0x1FFFFF, 3)));
    }

    #[test]
    fn test_read_compressed_4byte() {
        assert_eq!(read_compressed(&[0xE0, 0x00, 0x00, 0x01], 0), Some((1, 4)));
        assert_eq!(
            read_compressed(&[0xEF, 0xFF, 0xFF, 0xFF], 0),
            Some((0x0FFFFFFF, 4))
        );
    }

    // The upper nibble carries info bits and must not affect classification.
    #[test]
    fn test_undo_record_type_masks_upper_bits() {
        assert_eq!(
            UndoRecordType::from_type_byte(0xFB), UndoRecordType::InsertRec
        );
        assert_eq!(
            UndoRecordType::from_type_byte(0x2E), UndoRecordType::DelMarkRec
        );
    }

    #[test]
    fn test_read_compressed_5byte() {
        assert_eq!(
            read_compressed(&[0xF0, 0x00, 0x00, 0x00, 0x42], 0),
            Some((0x42, 5))
        );
        assert_eq!(
            read_compressed(&[0xF0, 0xFF, 0xFF, 0xFF, 0xFF], 0),
            Some((0xFFFFFFFF, 5))
        );
    }

    #[test]
    fn test_undo_record_type_names() {
        assert_eq!(UndoRecordType::InsertRec.name(), "INSERT");
        assert_eq!(UndoRecordType::UpdExistRec.name(), "UPD_EXIST");
        assert_eq!(UndoRecordType::UpdDelRec.name(), "UPD_DEL");
        assert_eq!(UndoRecordType::DelMarkRec.name(), "DEL_MARK");
        assert_eq!(UndoRecordType::Unknown(0).name(), "UNKNOWN");
    }

    #[test]
    fn test_walk_undo_records_single() {
        let mut page = vec![0u8; 256];
        let offset = 100usize;
        // next pointer = 0 (terminates), type byte = 11 (insert record).
        BigEndian::write_u16(&mut page[offset..], 0);
        page[offset + 2] = 11;
        let records = walk_undo_records(&page, offset as u16, 200, 100);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].offset, 100);
        assert_eq!(records[0].record_type, UndoRecordType::InsertRec);
        assert_eq!(records[0].next_offset, 0);
    }

    #[test]
    fn test_walk_undo_records_chain() {
        let mut page = vec![0u8; 512];
        let o1 = 100usize;
        let o2 = 150usize;
        let o3 = 200usize;
        BigEndian::write_u16(&mut page[o1..], o2 as u16);
        page[o1 + 2] = 11;
        BigEndian::write_u16(&mut page[o2..], o3 as u16);
        page[o2 + 2] = 12;
        BigEndian::write_u16(&mut page[o3..], 0);
        page[o3 + 2] = 14;
        let records = walk_undo_records(&page, o1 as u16, 300, 100);
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].record_type, UndoRecordType::InsertRec);
        assert_eq!(records[1].record_type, UndoRecordType::UpdExistRec);
        assert_eq!(records[2].record_type, UndoRecordType::DelMarkRec);
        // 50-byte gap minus the 3-byte record header = 47 bytes of body.
        assert_eq!(records[0].data_len, 47);
        assert_eq!(records[1].data_len, 47);
    }

    #[test]
    fn test_walk_undo_records_respects_free_offset() {
        let mut page = vec![0u8; 512];
        let o1 = 100usize;
        let o2 = 200usize;
        BigEndian::write_u16(&mut page[o1..], o2 as u16);
        page[o1 + 2] = 11;
        BigEndian::write_u16(&mut page[o2..], 0);
        page[o2 + 2] = 12;
        // free offset 150 lies before o2, so only the first record counts.
        let records = walk_undo_records(&page, o1 as u16, 150, 100);
        assert_eq!(records.len(), 1);
    }

    #[test]
    fn test_walk_undo_records_respects_max() {
        let mut page = vec![0u8; 512];
        let o1 = 100usize;
        let o2 = 150usize;
        let o3 = 200usize;
        BigEndian::write_u16(&mut page[o1..], o2 as u16);
        page[o1 + 2] = 11;
        BigEndian::write_u16(&mut page[o2..], o3 as u16);
        page[o2 + 2] = 12;
        BigEndian::write_u16(&mut page[o3..], 0);
        page[o3 + 2] = 14;
        // max_records = 2 truncates a 3-record chain.
        let records = walk_undo_records(&page, o1 as u16, 300, 2);
        assert_eq!(records.len(), 2);
    }

    #[test]
    fn test_walk_undo_records_empty() {
        let page = vec![0u8; 256];
        // start offset 0 means no records at all.
        let records = walk_undo_records(&page, 0, 200, 100);
        assert_eq!(records.len(), 0);
    }

    #[test]
    fn test_read_compressed_insufficient_data() {
        assert_eq!(read_compressed(&[], 0), None);
        // Each multi-byte prefix is missing its trailing bytes.
        assert_eq!(read_compressed(&[0x80], 0), None);
        assert_eq!(read_compressed(&[0xC0, 0x00], 0), None);
        assert_eq!(read_compressed(&[0xE0, 0x00, 0x00], 0), None);
        assert_eq!(read_compressed(&[0xF0, 0x00, 0x00, 0x00], 0), None);
    }

    #[test]
    fn test_read_compressed_with_offset() {
        let data = [0x00, 0x00, 0x42];
        assert_eq!(read_compressed(&data, 2), Some((0x42, 1)));
    }

    // Leading bytes above 0xF0 are not a valid encoding.
    #[test]
    fn test_read_compressed_invalid_leading_byte() {
        assert_eq!(read_compressed(&[0xF1], 0), None);
        assert_eq!(read_compressed(&[0xFF], 0), None);
    }

    #[test]
    fn test_undo_record_type_from_u8() {
        assert_eq!(UndoRecordType::from_u8(11), UndoRecordType::InsertRec);
        assert_eq!(UndoRecordType::from_u8(12), UndoRecordType::UpdExistRec);
        assert_eq!(UndoRecordType::from_u8(13), UndoRecordType::UpdDelRec);
        assert_eq!(UndoRecordType::from_u8(14), UndoRecordType::DelMarkRec);
        assert_eq!(UndoRecordType::from_u8(99), UndoRecordType::Unknown(99));
    }

    #[test]
    fn test_parse_undo_records_empty_page() {
        let mut page = vec![0u8; 256];
        let base = FIL_PAGE_DATA;
        // start == free means no record space is in use.
        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_TYPE..], 2);
        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_START..], 100);
        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_FREE..], 100);
        assert!(parse_undo_records(&page).is_empty());
    }

    #[test]
    fn test_parse_undo_records_single_insert() {
        let mut page = vec![0u8; 512];
        let base = FIL_PAGE_DATA;
        let start_offset: u16 = 100;
        let free_offset: u16 = 120;
        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_TYPE..], 1);
        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_START..], start_offset);
        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_FREE..], free_offset);
        let mut pos = start_offset as usize;
        // next pointer = 0 terminates after this record.
        BigEndian::write_u16(&mut page[pos..], 0);
        pos += 2;
        // Type byte 11 = insert record.
        page[pos] = 11;
        pos += 1;
        // Compressed undo_no = 5 and table_id = 42 (both single-byte).
        page[pos] = 5;
        pos += 1;
        page[pos] = 42;
        pos += 1;
        // Primary key: compressed length 4, then the 4 value bytes.
        page[pos] = 4;
        pos += 1;
        BigEndian::write_u32(&mut page[pos..], 1);
        let records = parse_undo_records(&page);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].record_type, UndoRecordType::InsertRec);
        assert_eq!(records[0].undo_no, 5);
        assert_eq!(records[0].table_id, 42);
        assert_eq!(records[0].pk_fields.len(), 1);
        assert_eq!(records[0].pk_fields[0], vec![0, 0, 0, 1]);
        assert!(records[0].trx_id.is_none());
    }

    #[test]
    fn test_parse_undo_records_del_mark() {
        let mut page = vec![0u8; 512];
        let base = FIL_PAGE_DATA;
        let start_offset: u16 = 100;
        let free_offset: u16 = 200;
        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_TYPE..], 2);
        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_START..], start_offset);
        BigEndian::write_u16(&mut page[base + TRX_UNDO_PAGE_FREE..], free_offset);
        let mut pos = start_offset as usize;
        BigEndian::write_u16(&mut page[pos..], 0);
        pos += 2;
        // Type byte 14 = delete-mark record (a modify-type record).
        page[pos] = 14;
        pos += 1;
        page[pos] = 10;
        pos += 1;
        page[pos] = 7;
        pos += 1;
        // 6-byte big-endian trx id = 100.
        page[pos] = 0;
        page[pos + 1] = 0;
        page[pos + 2] = 0;
        page[pos + 3] = 0;
        page[pos + 4] = 0;
        page[pos + 5] = 100;
        pos += 6;
        // 7-byte roll pointer 1..=7.
        for i in 0..7 {
            page[pos + i] = (i + 1) as u8;
        }
        pos += 7;
        // Zero updated fields.
        page[pos] = 0;
        pos += 1;
        // Primary key: compressed length 2, then the value bytes.
        page[pos] = 2;
        pos += 1;
        page[pos] = 0x00;
        page[pos + 1] = 0x05;
        let records = parse_undo_records(&page);
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].record_type, UndoRecordType::DelMarkRec);
        assert_eq!(records[0].table_id, 7);
        assert_eq!(records[0].trx_id, Some(100));
        assert_eq!(records[0].roll_ptr, Some([1, 2, 3, 4, 5, 6, 7]));
    }

    // A page shorter than the header must be rejected, not panic.
    #[test]
    fn test_parse_undo_records_bounds_safety() {
        let page = vec![0u8; 30];
        assert!(parse_undo_records(&page).is_empty());
    }

    #[test]
    fn test_detailed_undo_record_serialization() {
        let rec = DetailedUndoRecord {
            offset: 100,
            record_type: UndoRecordType::DelMarkRec,
            undo_no: 5,
            table_id: 42,
            pk_fields: vec![vec![0, 0, 0, 1]],
            trx_id: Some(100),
            roll_ptr: Some([1, 2, 3, 4, 5, 6, 7]),
            update_fields: vec![],
        };
        let json = serde_json::to_string(&rec).unwrap();
        assert!(json.contains("\"record_type\":\"DelMarkRec\""));
        assert!(json.contains("\"table_id\":42"));
        assert!(json.contains("\"trx_id\":100"));
    }
}