use byteorder::{ByteOrder, LittleEndian};
use serde::Serialize;
use super::event::BinlogEventType;
/// Decoded body of a table-map event, as produced by [`TableMapEvent::parse`].
#[derive(Debug, Clone, Serialize)]
pub struct TableMapEvent {
/// Table identifier, assembled from the first six bytes of the event body (little-endian).
pub table_id: u64,
/// Database name read from a one-byte-length-prefixed field.
/// Empty when the bytes are not valid UTF-8.
pub database_name: String,
/// Table name read from a one-byte-length-prefixed field.
/// Empty when the bytes are not valid UTF-8.
pub table_name: String,
/// Number of columns, read as a length-encoded integer.
pub column_count: u64,
/// The `column_count` bytes that follow the column count, one per column.
/// NOTE(review): presumably MySQL column type codes — confirm against the format spec.
pub column_types: Vec<u8>,
}
impl TableMapEvent {
    /// Parses the body of a table-map event (the bytes that follow the common
    /// event header).
    ///
    /// Fields are consumed in this order:
    /// - 6 bytes: table id, little-endian
    /// - 2 bytes: skipped, not interpreted here
    /// - database name: 1 length byte, the name bytes, then 1 trailing byte
    /// - table name: same encoding as the database name
    /// - column count: length-encoded integer
    /// - `column_count` bytes: one type byte per column
    ///
    /// Returns `None` when `data` is shorter than 10 bytes or when any field
    /// would extend past the end of `data`. This includes a column count larger
    /// than the number of bytes that remain.
    ///
    /// A name that is not valid UTF-8 is returned as an empty string rather
    /// than failing the whole parse.
    pub fn parse(data: &[u8]) -> Option<Self> {
        if data.len() < 10 {
            return None;
        }

        let table_id = LittleEndian::read_u32(&data[0..]) as u64
            | ((data[4] as u64) << 32)
            | ((data[5] as u64) << 40);

        // Skip the 6-byte table id and the 2 bytes after it.
        let mut offset = 8;
        let database_name = Self::read_name(data, &mut offset)?;
        let table_name = Self::read_name(data, &mut offset)?;

        // At least one byte must remain for the column-count prefix.
        if offset >= data.len() {
            return None;
        }
        let (column_count, bytes_read) = read_lenenc_int(&data[offset..]);
        offset += bytes_read;

        // `column_count` comes straight from the file and can be any 64-bit
        // value. Compare it with the bytes that are left *before* doing any
        // arithmetic, so a hostile count cannot overflow `offset + count` or
        // produce an inverted slice range.
        let remaining = (data.len() - offset) as u64;
        if column_count > remaining {
            return None;
        }
        let end = offset + column_count as usize;
        let column_types = data[offset..end].to_vec();

        Some(TableMapEvent {
            table_id,
            database_name,
            table_name,
            column_count,
            column_types,
        })
    }

    /// Reads one length-prefixed name starting at `*offset` and advances
    /// `*offset` past the name and the single byte that follows it.
    ///
    /// Returns `None` if the length byte, the name, or the trailing byte would
    /// fall outside `data`. The trailing byte is skipped without being checked.
    fn read_name(data: &[u8], offset: &mut usize) -> Option<String> {
        let len = usize::from(*data.get(*offset)?);
        let start = *offset + 1;
        let end = start + len;
        // One more byte must exist after the name itself.
        if end + 1 > data.len() {
            return None;
        }
        let name = std::str::from_utf8(&data[start..end])
            .unwrap_or("")
            .to_string();
        *offset = end + 1;
        Some(name)
    }
}
/// Summary of a rows event, as produced by [`RowsEvent::parse`].
#[derive(Debug, Clone, Serialize)]
pub struct RowsEvent {
/// Table identifier, assembled from the first six bytes of the event body (little-endian).
pub table_id: u64,
/// Event type derived from the type code passed to `parse`.
pub event_type: BinlogEventType,
/// 16-bit little-endian value read at byte offset 6 of the event body.
pub flags: u16,
/// Number of columns, read as a length-encoded integer.
pub column_count: u64,
/// Set by `parse` to 1 when bytes remain after the column bitmap(s), otherwise 0.
/// It is a presence indicator, not an actual count of rows.
pub row_count: usize,
}
impl RowsEvent {
    /// Parses the leading portion of a rows-event body.
    ///
    /// `data` is the event body (after the common header) and `type_code` is
    /// the event's numeric type, which selects the resulting `event_type` and
    /// decides whether one or two column bitmaps are skipped.
    ///
    /// The body is read as: 6-byte table id, 2-byte flags, 2-byte extra-data
    /// length, the extra data itself, a length-encoded column count, and then
    /// one column bitmap (two when `type_code` is 31).
    ///
    /// Returns `None` when `data` is shorter than 10 bytes or when the extra
    /// data reaches the end of the buffer. Row images are not decoded:
    /// `row_count` is 1 if any bytes remain after the bitmap(s) and 0 otherwise.
    pub fn parse(data: &[u8], type_code: u8) -> Option<Self> {
        // Table id (6) + flags (2) + extra-data length (2).
        const FIXED_PREFIX_LEN: usize = 10;

        if data.len() < FIXED_PREFIX_LEN {
            return None;
        }
        let event_type = BinlogEventType::from_u8(type_code);

        // Widen the 6-byte little-endian table id to 8 bytes and decode it in one go.
        let mut id_bytes = [0u8; 8];
        id_bytes[..6].copy_from_slice(&data[..6]);
        let table_id = u64::from_le_bytes(id_bytes);

        let flags = LittleEndian::read_u16(&data[6..]);
        let extra_len = LittleEndian::read_u16(&data[8..]) as usize;

        // The stored extra-data length is reduced by 2 (never below zero)
        // before being used as the number of bytes to skip.
        let mut cursor = FIXED_PREFIX_LEN + extra_len.saturating_sub(2);
        if cursor >= data.len() {
            return None;
        }

        let (column_count, consumed) = read_lenenc_int(&data[cursor..]);
        cursor += consumed;

        // Each bitmap holds one bit per column, rounded up to whole bytes.
        let bitmap_len = (column_count as usize).div_ceil(8);
        cursor += bitmap_len;
        // Type code 31 carries a second bitmap of the same size.
        if type_code == 31 {
            cursor += bitmap_len;
        }

        let row_count = usize::from(cursor < data.len());

        Some(RowsEvent {
            table_id,
            event_type,
            flags,
            column_count,
            row_count,
        })
    }
}
/// Decodes a length-encoded integer from the start of `data`.
///
/// Returns `(value, bytes_consumed)`:
/// - empty input yields `(0, 0)`;
/// - a first byte of 0..=250 is the value itself, consuming 1 byte;
/// - 252, 253 and 254 introduce a 2-, 3- or 8-byte little-endian value,
///   consuming 3, 4 or 9 bytes in total;
/// - any other first byte, or a payload that is cut short, yields `(0, 1)`.
fn read_lenenc_int(data: &[u8]) -> (u64, usize) {
    let Some(&prefix) = data.first() else {
        return (0, 0);
    };

    // Total encoded width (prefix byte included) for the multi-byte forms.
    let width = match prefix {
        0..=250 => return (u64::from(prefix), 1),
        252 => 3,
        253 => 4,
        254 => 9,
        _ => return (0, 1),
    };
    if data.len() < width {
        return (0, 1);
    }

    // Fold the payload from its last byte to its first so that the first
    // payload byte ends up least significant (little-endian).
    let value = data[1..width]
        .iter()
        .rev()
        .fold(0u64, |acc, &byte| (acc << 8) | u64::from(byte));
    (value, width)
}
/// One entry per event visited by [`analyze_binlog`].
#[derive(Debug, Clone, Serialize)]
pub struct BinlogEventSummary {
/// Byte offset in the file at which the event's header starts.
pub offset: u64,
/// Name of the event type, as returned by `BinlogEventType::name`.
pub event_type: String,
/// Raw numeric type code taken from the event header.
pub type_code: u8,
/// Timestamp field copied from the event header.
pub timestamp: u32,
/// Server id field copied from the event header.
pub server_id: u32,
/// Event length field copied from the event header; `analyze_binlog` treats it as
/// including the common header.
pub event_length: u32,
}
/// Result of scanning a binary log with [`analyze_binlog`].
#[derive(Debug, Clone, Serialize)]
pub struct BinlogAnalysis {
/// The first format-description event (type code 15) that parsed successfully,
/// or a placeholder with `server_version` set to "unknown" when none did.
pub format_description: FormatDescriptionEvent,
/// Number of events visited; equal to `events.len()`.
pub event_count: usize,
/// Number of visited events per event-type name.
pub event_type_counts: std::collections::HashMap<String, usize>,
/// Every table-map event (type code 19) whose body parsed successfully.
/// Left out of the serialized output when empty.
#[serde(skip_serializing_if = "Vec::is_empty")]
pub table_maps: Vec<TableMapEvent>,
/// Header summary of every visited event, in file order.
/// Left out of the serialized output when empty.
#[serde(skip_serializing_if = "Vec::is_empty")]
pub events: Vec<BinlogEventSummary>,
}
use crate::binlog::constants::COMMON_HEADER_SIZE;
use crate::binlog::header::{validate_binlog_magic, BinlogEventHeader, FormatDescriptionEvent};
use std::io::{Read, Seek, SeekFrom};
/// Walks a MySQL binary log and summarizes the events it contains.
///
/// The reader must be positioned at the start of the file. After the 4-byte
/// magic is validated, events are visited one at a time starting at offset 4.
/// Each visited event is counted by type name and recorded as a
/// [`BinlogEventSummary`]. The first format-description event (type code 15)
/// that parses and every parsable table-map event (type code 19) are decoded
/// further.
///
/// Scanning is best-effort. It stops without an error at the first event whose
/// header cannot be read or parsed, whose declared length is smaller than the
/// common header, or whose declared length runs past the end of the file.
/// Everything gathered up to that point is returned.
///
/// # Errors
///
/// - `IdbError::Io` if the magic bytes cannot be read or the initial seeks fail.
/// - `IdbError::Parse` if the magic bytes are not those of a binary log.
pub fn analyze_binlog<R: Read + Seek>(mut reader: R) -> Result<BinlogAnalysis, crate::IdbError> {
    const FORMAT_DESCRIPTION_TYPE_CODE: u8 = 15;
    const TABLE_MAP_TYPE_CODE: u8 = 19;

    let mut magic = [0u8; 4];
    reader
        .read_exact(&mut magic)
        .map_err(|e| crate::IdbError::Io(format!("Failed to read binlog magic: {e}")))?;
    if !validate_binlog_magic(&magic) {
        return Err(crate::IdbError::Parse(
            "Not a valid MySQL binary log file (bad magic)".to_string(),
        ));
    }

    let file_size = reader
        .seek(SeekFrom::End(0))
        .map_err(|e| crate::IdbError::Io(format!("Failed to seek: {e}")))?;
    reader
        .seek(SeekFrom::Start(4))
        .map_err(|e| crate::IdbError::Io(format!("Failed to seek: {e}")))?;

    let mut events = Vec::new();
    let mut event_type_counts = std::collections::HashMap::new();
    let mut table_maps = Vec::new();
    let mut format_desc = None;

    let mut position = 4u64;
    let mut header_buf = vec![0u8; COMMON_HEADER_SIZE];
    // Reused for every event body so the loop does not allocate per event.
    let mut event_data: Vec<u8> = Vec::new();

    while position + COMMON_HEADER_SIZE as u64 <= file_size {
        if reader.read_exact(&mut header_buf).is_err() {
            break;
        }
        let hdr = match BinlogEventHeader::parse(&header_buf) {
            Some(h) => h,
            None => break,
        };
        if hdr.event_length < COMMON_HEADER_SIZE as u32 {
            break;
        }

        // The length comes from the file. Check that the event fits before
        // sizing a buffer from it, so a corrupt header cannot trigger a
        // multi-gigabyte allocation.
        let event_end = position + hdr.event_length as u64;
        if event_end > file_size {
            break;
        }

        let data_len = hdr.event_length as usize - COMMON_HEADER_SIZE;
        event_data.clear();
        event_data.resize(data_len, 0);
        if reader.read_exact(&mut event_data).is_err() {
            break;
        }

        match hdr.type_code {
            FORMAT_DESCRIPTION_TYPE_CODE if format_desc.is_none() => {
                format_desc = FormatDescriptionEvent::parse(&event_data);
            }
            TABLE_MAP_TYPE_CODE => {
                if let Some(tme) = TableMapEvent::parse(&event_data) {
                    table_maps.push(tme);
                }
            }
            _ => {}
        }

        let type_name = BinlogEventType::from_u8(hdr.type_code).name().to_string();
        *event_type_counts.entry(type_name.clone()).or_insert(0) += 1;
        events.push(BinlogEventSummary {
            offset: position,
            event_type: type_name,
            type_code: hdr.type_code,
            timestamp: hdr.timestamp,
            server_id: hdr.server_id,
            event_length: hdr.event_length,
        });

        // Follow the header's next-position only when it moves forward. A zero,
        // stale, or wrapped value would otherwise send the scan back over
        // events it has already seen and loop forever. In that case advance by
        // the event's own length instead.
        let declared_next = hdr.next_position as u64;
        position = if declared_next > position {
            declared_next
        } else {
            event_end
        };
        if reader.seek(SeekFrom::Start(position)).is_err() {
            break;
        }
    }

    // Placeholder used when no format-description event could be parsed.
    let format_description = format_desc.unwrap_or_else(|| FormatDescriptionEvent {
        binlog_version: 0,
        server_version: "unknown".to_string(),
        create_timestamp: 0,
        header_length: 19,
        checksum_alg: 0,
    });

    Ok(BinlogAnalysis {
        format_description,
        event_count: events.len(),
        event_type_counts,
        table_maps,
        events,
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Numeric type codes map onto the expected `BinlogEventType` variants.
    #[test]
    fn test_event_type_from_u8() {
        let expectations = [
            (2u8, BinlogEventType::QueryEvent),
            (15, BinlogEventType::FormatDescription),
            (19, BinlogEventType::TableMapEvent),
            (30, BinlogEventType::WriteRowsEvent),
            (31, BinlogEventType::UpdateRowsEvent),
            (32, BinlogEventType::DeleteRowsEvent),
            (255, BinlogEventType::Unknown(255)),
        ];
        for (code, variant) in expectations {
            assert_eq!(BinlogEventType::from_u8(code), variant);
        }
    }

    /// Variants report their expected names.
    #[test]
    fn test_event_type_names() {
        let expectations = [
            (BinlogEventType::FormatDescription, "FORMAT_DESCRIPTION"),
            (BinlogEventType::TableMapEvent, "TABLE_MAP"),
            (BinlogEventType::WriteRowsEvent, "WRITE_ROWS_V2"),
            (BinlogEventType::GtidLogEvent, "GTID"),
        ];
        for (variant, name) in expectations {
            assert_eq!(variant.name(), name);
        }
    }

    /// `Display` output for a known and an unknown variant.
    #[test]
    fn test_event_type_display() {
        assert_eq!(BinlogEventType::QueryEvent.to_string(), "QUERY");
        assert_eq!(BinlogEventType::Unknown(99).to_string(), "UNKNOWN(99)");
    }

    /// A well-formed table-map body round-trips through `TableMapEvent::parse`.
    #[test]
    fn test_table_map_event_parse() {
        let mut payload: Vec<u8> = Vec::with_capacity(50);
        // 6-byte little-endian table id (42).
        payload.extend_from_slice(&[42, 0, 0, 0, 0, 0]);
        // Two bytes the parser skips.
        payload.extend_from_slice(&[0, 0]);
        // Database name: length, bytes, trailing zero.
        payload.push(4);
        payload.extend_from_slice(b"test");
        payload.push(0);
        // Table name: length, bytes, trailing zero.
        payload.push(5);
        payload.extend_from_slice(b"users");
        payload.push(0);
        // Column count followed by one type byte per column.
        payload.push(3);
        payload.extend_from_slice(&[3, 15, 12]);
        // Pad with zeros to the same 50-byte size as before.
        payload.resize(50, 0);

        let parsed = TableMapEvent::parse(&payload).unwrap();
        assert_eq!(parsed.table_id, 42);
        assert_eq!(parsed.database_name, "test");
        assert_eq!(parsed.table_name, "users");
        assert_eq!(parsed.column_count, 3);
        assert_eq!(parsed.column_types, vec![3, 15, 12]);
    }

    /// Bodies shorter than the fixed prefix are rejected.
    #[test]
    fn test_table_map_event_too_short() {
        let payload = [0u8; 5];
        assert!(TableMapEvent::parse(&payload).is_none());
    }

    /// A minimal rows-event body yields the expected header fields.
    #[test]
    fn test_rows_event_parse() {
        let mut payload: Vec<u8> = Vec::with_capacity(30);
        // 6-byte little-endian table id (42).
        payload.extend_from_slice(&42u32.to_le_bytes());
        payload.extend_from_slice(&[0, 0]);
        // Flags, then the extra-data length field.
        payload.extend_from_slice(&1u16.to_le_bytes());
        payload.extend_from_slice(&2u16.to_le_bytes());
        // Column count, then two further non-zero bytes.
        payload.extend_from_slice(&[3, 0x07, 0x01]);
        payload.resize(30, 0);

        let parsed = RowsEvent::parse(&payload, 30).unwrap();
        assert_eq!(parsed.table_id, 42);
        assert_eq!(parsed.event_type, BinlogEventType::WriteRowsEvent);
        assert_eq!(parsed.flags, 1);
        assert_eq!(parsed.column_count, 3);
    }

    /// One-, two- and three-byte-payload length-encoded integers.
    #[test]
    fn test_lenenc_int() {
        let cases: [(&[u8], (u64, usize)); 4] = [
            (&[5], (5, 1)),
            (&[250], (250, 1)),
            (&[252, 0x01, 0x00], (1, 3)),
            (&[253, 0x01, 0x00, 0x00], (1, 4)),
        ];
        for (encoded, expected) in cases {
            assert_eq!(read_lenenc_int(encoded), expected);
        }
    }

    /// A synthetic log holding a single format-description event is analyzed.
    #[test]
    fn test_analyze_binlog_synthetic() {
        use std::io::Cursor;

        const BODY_LEN: usize = 100;
        let event_len = (COMMON_HEADER_SIZE + BODY_LEN) as u32;

        // Magic bytes: 0xfe followed by "bin".
        let mut binlog: Vec<u8> = vec![0xfe, b'b', b'i', b'n'];

        // 19-byte event header, written field by field.
        binlog.extend_from_slice(&1_700_000_000u32.to_le_bytes()); // bytes 0..4
        binlog.push(15); // byte 4: type code
        binlog.extend_from_slice(&1u32.to_le_bytes()); // bytes 5..9
        binlog.extend_from_slice(&event_len.to_le_bytes()); // bytes 9..13
        binlog.extend_from_slice(&(4 + event_len).to_le_bytes()); // bytes 13..17
        binlog.extend_from_slice(&[0, 0]); // bytes 17..19 left zero

        // Event body: version number, server version string, then a few fixed offsets.
        let mut body = vec![0u8; BODY_LEN];
        body[..2].copy_from_slice(&4u16.to_le_bytes());
        body[2..8].copy_from_slice(b"8.0.35");
        body[52..56].copy_from_slice(&1_700_000_000u32.to_le_bytes());
        body[56] = 19;
        body[95] = 1;
        binlog.extend_from_slice(&body);

        let analysis = analyze_binlog(Cursor::new(binlog)).unwrap();
        assert_eq!(analysis.event_count, 1);
        assert_eq!(analysis.format_description.binlog_version, 4);
        assert_eq!(analysis.format_description.server_version, "8.0.35");
        assert_eq!(
            analysis.event_type_counts.get("FORMAT_DESCRIPTION"),
            Some(&1)
        );
    }

    /// Input without the binlog magic is rejected.
    #[test]
    fn test_analyze_binlog_bad_magic() {
        use std::io::Cursor;

        let not_a_binlog = Cursor::new(vec![0u8; 100]);
        assert!(analyze_binlog(not_a_binlog).is_err());
    }
}