1use byteorder::{ByteOrder, LittleEndian};
7use serde::Serialize;
8
9use super::event::BinlogEventType;
10
11#[derive(Debug, Clone, Serialize)]
51pub struct TableMapEvent {
52 pub table_id: u64,
54 pub database_name: String,
56 pub table_name: String,
58 pub column_count: u64,
60 pub column_types: Vec<u8>,
62}
63
64impl TableMapEvent {
65 pub fn parse(data: &[u8]) -> Option<Self> {
67 if data.len() < 10 {
68 return None;
69 }
70
71 let table_id = LittleEndian::read_u32(&data[0..]) as u64
73 | ((data[4] as u64) << 32)
74 | ((data[5] as u64) << 40);
75
76 let mut offset = 8;
78
79 if offset >= data.len() {
81 return None;
82 }
83 let db_len = data[offset] as usize;
84 offset += 1;
85 if offset + db_len + 1 > data.len() {
86 return None;
87 }
88 let database_name = std::str::from_utf8(&data[offset..offset + db_len])
89 .unwrap_or("")
90 .to_string();
91 offset += db_len + 1; if offset >= data.len() {
95 return None;
96 }
97 let tbl_len = data[offset] as usize;
98 offset += 1;
99 if offset + tbl_len + 1 > data.len() {
100 return None;
101 }
102 let table_name = std::str::from_utf8(&data[offset..offset + tbl_len])
103 .unwrap_or("")
104 .to_string();
105 offset += tbl_len + 1; if offset >= data.len() {
109 return None;
110 }
111 let (column_count, bytes_read) = read_lenenc_int(&data[offset..]);
112 offset += bytes_read;
113
114 let end = offset + column_count as usize;
116 if end > data.len() {
117 return None;
118 }
119 let column_types = data[offset..end].to_vec();
120
121 Some(TableMapEvent {
122 table_id,
123 database_name,
124 table_name,
125 column_count,
126 column_types,
127 })
128 }
129}
130
131#[derive(Debug, Clone, Serialize)]
136pub struct RowsEvent {
137 pub table_id: u64,
139 pub event_type: BinlogEventType,
141 pub flags: u16,
143 pub column_count: u64,
145 pub row_count: usize,
147}
148
149impl RowsEvent {
150 pub fn parse(data: &[u8], type_code: u8) -> Option<Self> {
154 if data.len() < 10 {
155 return None;
156 }
157
158 let event_type = BinlogEventType::from_u8(type_code);
159
160 let table_id = LittleEndian::read_u32(&data[0..]) as u64
162 | ((data[4] as u64) << 32)
163 | ((data[5] as u64) << 40);
164
165 let flags = LittleEndian::read_u16(&data[6..]);
167
168 let extra_len = LittleEndian::read_u16(&data[8..]) as usize;
170 let mut offset = 10 + extra_len.saturating_sub(2); if offset >= data.len() {
174 return None;
175 }
176 let (column_count, bytes_read) = read_lenenc_int(&data[offset..]);
177 offset += bytes_read;
178
179 let bitmap_len = (column_count as usize).div_ceil(8);
181 offset += bitmap_len; if type_code == 31 {
183 offset += bitmap_len; }
185
186 let row_count = if offset < data.len() { 1 } else { 0 };
189
190 Some(RowsEvent {
191 table_id,
192 event_type,
193 flags,
194 column_count,
195 row_count,
196 })
197 }
198}
199
200fn read_lenenc_int(data: &[u8]) -> (u64, usize) {
204 if data.is_empty() {
205 return (0, 0);
206 }
207 match data[0] {
208 0..=250 => (data[0] as u64, 1),
209 252 => {
210 if data.len() < 3 {
211 return (0, 1);
212 }
213 (LittleEndian::read_u16(&data[1..]) as u64, 3)
214 }
215 253 => {
216 if data.len() < 4 {
217 return (0, 1);
218 }
219 let v = data[1] as u64 | (data[2] as u64) << 8 | (data[3] as u64) << 16;
220 (v, 4)
221 }
222 254 => {
223 if data.len() < 9 {
224 return (0, 1);
225 }
226 (LittleEndian::read_u64(&data[1..]), 9)
227 }
228 _ => (0, 1), }
230}
231
232#[derive(Debug, Clone, Serialize)]
234pub struct BinlogEventSummary {
235 pub offset: u64,
237 pub event_type: String,
239 pub type_code: u8,
241 pub timestamp: u32,
243 pub server_id: u32,
245 pub event_length: u32,
247}
248
249#[derive(Debug, Clone, Serialize)]
251pub struct BinlogAnalysis {
252 pub format_description: FormatDescriptionEvent,
254 pub event_count: usize,
256 pub event_type_counts: std::collections::HashMap<String, usize>,
258 #[serde(skip_serializing_if = "Vec::is_empty")]
260 pub table_maps: Vec<TableMapEvent>,
261 #[serde(skip_serializing_if = "Vec::is_empty")]
263 pub events: Vec<BinlogEventSummary>,
264}
265
266use crate::binlog::constants::COMMON_HEADER_SIZE;
267use crate::binlog::header::{validate_binlog_magic, BinlogEventHeader, FormatDescriptionEvent};
268use std::io::{Read, Seek, SeekFrom};
269
270pub fn analyze_binlog<R: Read + Seek>(mut reader: R) -> Result<BinlogAnalysis, crate::IdbError> {
274 let mut magic = [0u8; 4];
276 reader
277 .read_exact(&mut magic)
278 .map_err(|e| crate::IdbError::Io(format!("Failed to read binlog magic: {e}")))?;
279
280 if !validate_binlog_magic(&magic) {
281 return Err(crate::IdbError::Parse(
282 "Not a valid MySQL binary log file (bad magic)".to_string(),
283 ));
284 }
285
286 let file_size = reader
287 .seek(SeekFrom::End(0))
288 .map_err(|e| crate::IdbError::Io(format!("Failed to seek: {e}")))?;
289 reader
290 .seek(SeekFrom::Start(4))
291 .map_err(|e| crate::IdbError::Io(format!("Failed to seek: {e}")))?;
292
293 let mut events = Vec::new();
294 let mut event_type_counts = std::collections::HashMap::new();
295 let mut table_maps = Vec::new();
296 let mut format_desc = None;
297
298 let mut position = 4u64;
299 let mut header_buf = vec![0u8; COMMON_HEADER_SIZE];
300
301 while position + COMMON_HEADER_SIZE as u64 <= file_size {
302 if reader.read_exact(&mut header_buf).is_err() {
303 break;
304 }
305
306 let hdr = match BinlogEventHeader::parse(&header_buf) {
307 Some(h) => h,
308 None => break,
309 };
310
311 if hdr.event_length < COMMON_HEADER_SIZE as u32 {
312 break;
313 }
314
315 let data_len = hdr.event_length as usize - COMMON_HEADER_SIZE;
316 let mut event_data = vec![0u8; data_len];
317 if reader.read_exact(&mut event_data).is_err() {
318 break;
319 }
320
321 let event_type = BinlogEventType::from_u8(hdr.type_code);
322
323 if hdr.type_code == 15 && format_desc.is_none() {
325 format_desc = FormatDescriptionEvent::parse(&event_data);
326 } else if hdr.type_code == 19 {
327 if let Some(tme) = TableMapEvent::parse(&event_data) {
328 table_maps.push(tme);
329 }
330 }
331
332 *event_type_counts
333 .entry(event_type.name().to_string())
334 .or_insert(0) += 1;
335
336 events.push(BinlogEventSummary {
337 offset: position,
338 event_type: event_type.name().to_string(),
339 type_code: hdr.type_code,
340 timestamp: hdr.timestamp,
341 server_id: hdr.server_id,
342 event_length: hdr.event_length,
343 });
344
345 position = if hdr.next_position > 0 {
346 hdr.next_position as u64
347 } else {
348 position + hdr.event_length as u64
349 };
350
351 if reader.seek(SeekFrom::Start(position)).is_err() {
353 break;
354 }
355 }
356
357 let format_description = format_desc.unwrap_or(FormatDescriptionEvent {
358 binlog_version: 0,
359 server_version: "unknown".to_string(),
360 create_timestamp: 0,
361 header_length: 19,
362 checksum_alg: 0,
363 });
364
365 Ok(BinlogAnalysis {
366 format_description,
367 event_count: events.len(),
368 event_type_counts,
369 table_maps,
370 events,
371 })
372}
373
374#[cfg(test)]
375mod tests {
376 use super::*;
377
378 #[test]
379 fn test_event_type_from_u8() {
380 assert_eq!(BinlogEventType::from_u8(2), BinlogEventType::QueryEvent);
381 assert_eq!(
382 BinlogEventType::from_u8(15),
383 BinlogEventType::FormatDescription
384 );
385 assert_eq!(BinlogEventType::from_u8(19), BinlogEventType::TableMapEvent);
386 assert_eq!(
387 BinlogEventType::from_u8(30),
388 BinlogEventType::WriteRowsEvent
389 );
390 assert_eq!(
391 BinlogEventType::from_u8(31),
392 BinlogEventType::UpdateRowsEvent
393 );
394 assert_eq!(
395 BinlogEventType::from_u8(32),
396 BinlogEventType::DeleteRowsEvent
397 );
398 assert_eq!(BinlogEventType::from_u8(255), BinlogEventType::Unknown(255));
399 }
400
401 #[test]
402 fn test_event_type_names() {
403 assert_eq!(
404 BinlogEventType::FormatDescription.name(),
405 "FORMAT_DESCRIPTION"
406 );
407 assert_eq!(BinlogEventType::TableMapEvent.name(), "TABLE_MAP");
408 assert_eq!(BinlogEventType::WriteRowsEvent.name(), "WRITE_ROWS_V2");
409 assert_eq!(BinlogEventType::GtidLogEvent.name(), "GTID");
410 }
411
412 #[test]
413 fn test_event_type_display() {
414 assert_eq!(format!("{}", BinlogEventType::QueryEvent), "QUERY");
415 assert_eq!(format!("{}", BinlogEventType::Unknown(99)), "UNKNOWN(99)");
416 }
417
418 #[test]
419 fn test_table_map_event_parse() {
420 let mut data = vec![0u8; 50];
421 LittleEndian::write_u32(&mut data[0..], 42);
423 data[4] = 0;
424 data[5] = 0;
425 LittleEndian::write_u16(&mut data[6..], 0);
427 data[8] = 4;
429 data[9..13].copy_from_slice(b"test");
430 data[13] = 0;
431 data[14] = 5;
433 data[15..20].copy_from_slice(b"users");
434 data[20] = 0;
435 data[21] = 3;
437 data[22] = 3; data[23] = 15; data[24] = 12; let tme = TableMapEvent::parse(&data).unwrap();
443 assert_eq!(tme.table_id, 42);
444 assert_eq!(tme.database_name, "test");
445 assert_eq!(tme.table_name, "users");
446 assert_eq!(tme.column_count, 3);
447 assert_eq!(tme.column_types, vec![3, 15, 12]);
448 }
449
450 #[test]
451 fn test_table_map_event_too_short() {
452 let data = vec![0u8; 5];
453 assert!(TableMapEvent::parse(&data).is_none());
454 }
455
456 #[test]
457 fn test_rows_event_parse() {
458 let mut data = vec![0u8; 30];
459 LittleEndian::write_u32(&mut data[0..], 42);
461 data[4] = 0;
462 data[5] = 0;
463 LittleEndian::write_u16(&mut data[6..], 1);
465 LittleEndian::write_u16(&mut data[8..], 2);
467 data[10] = 3;
469 data[11] = 0x07;
471 data[12] = 0x01;
473
474 let re = RowsEvent::parse(&data, 30).unwrap();
475 assert_eq!(re.table_id, 42);
476 assert_eq!(re.event_type, BinlogEventType::WriteRowsEvent);
477 assert_eq!(re.flags, 1);
478 assert_eq!(re.column_count, 3);
479 }
480
481 #[test]
482 fn test_lenenc_int() {
483 assert_eq!(read_lenenc_int(&[5]), (5, 1));
484 assert_eq!(read_lenenc_int(&[250]), (250, 1));
485 assert_eq!(read_lenenc_int(&[252, 0x01, 0x00]), (1, 3));
486 assert_eq!(read_lenenc_int(&[253, 0x01, 0x00, 0x00]), (1, 4));
487 }
488
489 #[test]
490 fn test_analyze_binlog_synthetic() {
491 use std::io::Cursor;
492
493 let mut binlog = Vec::new();
496 binlog.extend_from_slice(&[0xfe, 0x62, 0x69, 0x6e]); let fde_data_len = 100usize;
500 let fde_event_len = (COMMON_HEADER_SIZE + fde_data_len) as u32;
501
502 let mut fde_header = vec![0u8; 19];
503 LittleEndian::write_u32(&mut fde_header[0..], 1700000000); fde_header[4] = 15; LittleEndian::write_u32(&mut fde_header[5..], 1); LittleEndian::write_u32(&mut fde_header[9..], fde_event_len); LittleEndian::write_u32(&mut fde_header[13..], 4 + fde_event_len); binlog.extend_from_slice(&fde_header);
509
510 let mut fde_data = vec![0u8; fde_data_len];
511 LittleEndian::write_u16(&mut fde_data[0..], 4); let ver = b"8.0.35";
513 fde_data[2..2 + ver.len()].copy_from_slice(ver);
514 LittleEndian::write_u32(&mut fde_data[52..], 1700000000);
515 fde_data[56] = 19;
516 fde_data[95] = 1; binlog.extend_from_slice(&fde_data);
518
519 let cursor = Cursor::new(binlog);
520 let analysis = analyze_binlog(cursor).unwrap();
521
522 assert_eq!(analysis.event_count, 1);
523 assert_eq!(analysis.format_description.binlog_version, 4);
524 assert_eq!(analysis.format_description.server_version, "8.0.35");
525 assert_eq!(
526 analysis.event_type_counts.get("FORMAT_DESCRIPTION"),
527 Some(&1)
528 );
529 }
530
531 #[test]
532 fn test_analyze_binlog_bad_magic() {
533 use std::io::Cursor;
534 let data = vec![0u8; 100];
535 let cursor = Cursor::new(data);
536 assert!(analyze_binlog(cursor).is_err());
537 }
538}