Skip to main content

idb/binlog/
events.rs

1//! Binlog row-based event parsing and analysis.
2//!
3//! Provides parsing for TABLE_MAP and row-based events (WRITE/UPDATE/DELETE
4//! ROWS v2), plus top-level binlog file analysis via [`analyze_binlog`].
5
6use byteorder::{ByteOrder, LittleEndian};
7use serde::Serialize;
8
9use super::event::BinlogEventType;
10
11/// Parsed TABLE_MAP event (type 19).
12///
13/// Maps a table_id to a database.table name and column type information.
14/// This event precedes row-based events to provide schema context.
15///
16/// # Examples
17///
18/// ```
19/// use idb::binlog::events::TableMapEvent;
20/// use byteorder::{LittleEndian, ByteOrder};
21///
22/// let mut data = vec![0u8; 50];
23/// // table_id (6 bytes LE)
24/// LittleEndian::write_u32(&mut data[0..], 42);
25/// data[4] = 0; data[5] = 0;
26/// // flags (2 bytes)
27/// LittleEndian::write_u16(&mut data[6..], 0);
28/// // database name length (1 byte) + name + NUL
29/// data[8] = 4; // "test"
30/// data[9..13].copy_from_slice(b"test");
31/// data[13] = 0; // NUL
32/// // table name length (1 byte) + name + NUL
33/// data[14] = 5; // "users"
34/// data[15..20].copy_from_slice(b"users");
35/// data[20] = 0; // NUL
36/// // column_count (packed integer)
37/// data[21] = 3;
38/// // column_types
39/// data[22] = 3; // LONG
40/// data[23] = 15; // VARCHAR
41/// data[24] = 12; // DATETIME
42///
43/// let tme = TableMapEvent::parse(&data).unwrap();
44/// assert_eq!(tme.table_id, 42);
45/// assert_eq!(tme.database_name, "test");
46/// assert_eq!(tme.table_name, "users");
47/// assert_eq!(tme.column_count, 3);
48/// assert_eq!(tme.column_types, vec![3, 15, 12]);
49/// ```
50#[derive(Debug, Clone, Serialize)]
51pub struct TableMapEvent {
52    /// Internal table ID.
53    pub table_id: u64,
54    /// Database (schema) name.
55    pub database_name: String,
56    /// Table name.
57    pub table_name: String,
58    /// Number of columns.
59    pub column_count: u64,
60    /// Column type codes.
61    pub column_types: Vec<u8>,
62}
63
64impl TableMapEvent {
65    /// Parse a TABLE_MAP event from the event data (after the common header).
66    pub fn parse(data: &[u8]) -> Option<Self> {
67        if data.len() < 10 {
68            return None;
69        }
70
71        // table_id: 6 bytes LE
72        let table_id = LittleEndian::read_u32(&data[0..]) as u64
73            | ((data[4] as u64) << 32)
74            | ((data[5] as u64) << 40);
75
76        // flags: 2 bytes (skip)
77        let mut offset = 8;
78
79        // database name: 1-byte length + string + NUL
80        if offset >= data.len() {
81            return None;
82        }
83        let db_len = data[offset] as usize;
84        offset += 1;
85        if offset + db_len + 1 > data.len() {
86            return None;
87        }
88        let database_name = std::str::from_utf8(&data[offset..offset + db_len])
89            .unwrap_or("")
90            .to_string();
91        offset += db_len + 1; // skip NUL
92
93        // table name: 1-byte length + string + NUL
94        if offset >= data.len() {
95            return None;
96        }
97        let tbl_len = data[offset] as usize;
98        offset += 1;
99        if offset + tbl_len + 1 > data.len() {
100            return None;
101        }
102        let table_name = std::str::from_utf8(&data[offset..offset + tbl_len])
103            .unwrap_or("")
104            .to_string();
105        offset += tbl_len + 1; // skip NUL
106
107        // column_count: packed integer (lenenc)
108        if offset >= data.len() {
109            return None;
110        }
111        let (column_count, bytes_read) = read_lenenc_int(&data[offset..]);
112        offset += bytes_read;
113
114        // column_types: column_count bytes
115        let end = offset + column_count as usize;
116        if end > data.len() {
117            return None;
118        }
119        let column_types = data[offset..end].to_vec();
120
121        Some(TableMapEvent {
122            table_id,
123            database_name,
124            table_name,
125            column_count,
126            column_types,
127        })
128    }
129}
130
131/// Parsed row-based event summary (types 30-32).
132///
133/// Contains metadata about row changes. Full row data decoding is not
134/// performed — only the event structure and row count are extracted.
135#[derive(Debug, Clone, Serialize)]
136pub struct RowsEvent {
137    /// Internal table ID (matches TABLE_MAP event).
138    pub table_id: u64,
139    /// Event type.
140    pub event_type: BinlogEventType,
141    /// Event flags.
142    pub flags: u16,
143    /// Number of columns involved.
144    pub column_count: u64,
145    /// Approximate row count (estimated from data size).
146    pub row_count: usize,
147}
148
149impl RowsEvent {
150    /// Parse a row-based event from the event data (after the common header).
151    ///
152    /// Supports WRITE_ROWS_V2 (30), UPDATE_ROWS_V2 (31), DELETE_ROWS_V2 (32).
153    pub fn parse(data: &[u8], type_code: u8) -> Option<Self> {
154        if data.len() < 10 {
155            return None;
156        }
157
158        let event_type = BinlogEventType::from_u8(type_code);
159
160        // table_id: 6 bytes LE
161        let table_id = LittleEndian::read_u32(&data[0..]) as u64
162            | ((data[4] as u64) << 32)
163            | ((data[5] as u64) << 40);
164
165        // flags: 2 bytes
166        let flags = LittleEndian::read_u16(&data[6..]);
167
168        // extra_data_length: 2 bytes (v2 events)
169        let extra_len = LittleEndian::read_u16(&data[8..]) as usize;
170        let mut offset = 10 + extra_len.saturating_sub(2); // extra_len includes itself
171
172        // column_count: packed integer
173        if offset >= data.len() {
174            return None;
175        }
176        let (column_count, bytes_read) = read_lenenc_int(&data[offset..]);
177        offset += bytes_read;
178
179        // Skip column bitmaps to estimate row count from remaining data
180        let bitmap_len = (column_count as usize).div_ceil(8);
181        offset += bitmap_len; // columns_before_image
182        if type_code == 31 {
183            offset += bitmap_len; // columns_after_image for UPDATE
184        }
185
186        // Rough row count estimate: we can't parse row data without column metadata,
187        // so count is estimated as 1 if there's remaining data
188        let row_count = if offset < data.len() { 1 } else { 0 };
189
190        Some(RowsEvent {
191            table_id,
192            event_type,
193            flags,
194            column_count,
195            row_count,
196        })
197    }
198}
199
/// Decode a MySQL packed (length-encoded) integer from the front of `data`.
///
/// Returns `(value, bytes_consumed)`:
///
/// * empty input yields `(0, 0)`;
/// * a first byte of 0..=250 is the value itself, consuming 1 byte;
/// * 252, 253 and 254 announce a 2-, 3- or 8-byte little-endian value,
///   consuming 3, 4 or 9 bytes in total;
/// * 251 (NULL), 255 (undefined) and a multi-byte form whose payload is cut
///   short all yield `(0, 1)`.
fn read_lenenc_int(data: &[u8]) -> (u64, usize) {
    let Some((&prefix, payload)) = data.split_first() else {
        return (0, 0);
    };

    // Payload width in bytes announced by the prefix.
    let width = match prefix {
        0..=250 => return (u64::from(prefix), 1),
        252 => 2,
        253 => 3,
        254 => 8,
        _ => return (0, 1), // 251 = NULL, 255 = undefined
    };

    match payload.get(..width) {
        Some(bytes) => {
            // Zero-extend the little-endian payload to a full u64.
            let mut le = [0u8; 8];
            le[..width].copy_from_slice(bytes);
            (u64::from_le_bytes(le), width + 1)
        }
        // Payload truncated: only the prefix byte counts as consumed.
        None => (0, 1),
    }
}
231
232/// Summary of a single binlog event for analysis output.
233#[derive(Debug, Clone, Serialize)]
234pub struct BinlogEventSummary {
235    /// File offset of the event.
236    pub offset: u64,
237    /// Event type name.
238    pub event_type: String,
239    /// Event type code.
240    pub type_code: u8,
241    /// Unix timestamp.
242    pub timestamp: u32,
243    /// Server ID.
244    pub server_id: u32,
245    /// Total event size in bytes.
246    pub event_length: u32,
247}
248
249/// Top-level analysis result for a binary log file.
250#[derive(Debug, Clone, Serialize)]
251pub struct BinlogAnalysis {
252    /// Format description from the first event.
253    pub format_description: FormatDescriptionEvent,
254    /// Total number of events.
255    pub event_count: usize,
256    /// Count of events by type name.
257    pub event_type_counts: std::collections::HashMap<String, usize>,
258    /// TABLE_MAP events found.
259    #[serde(skip_serializing_if = "Vec::is_empty")]
260    pub table_maps: Vec<TableMapEvent>,
261    /// Individual event summaries.
262    #[serde(skip_serializing_if = "Vec::is_empty")]
263    pub events: Vec<BinlogEventSummary>,
264}
265
266use crate::binlog::constants::COMMON_HEADER_SIZE;
267use crate::binlog::header::{validate_binlog_magic, BinlogEventHeader, FormatDescriptionEvent};
268use std::io::{Read, Seek, SeekFrom};
269
270/// Analyze a binary log file from a reader.
271///
272/// Reads all events, collecting summaries, TABLE_MAP events, and type counts.
273pub fn analyze_binlog<R: Read + Seek>(mut reader: R) -> Result<BinlogAnalysis, crate::IdbError> {
274    // Validate magic
275    let mut magic = [0u8; 4];
276    reader
277        .read_exact(&mut magic)
278        .map_err(|e| crate::IdbError::Io(format!("Failed to read binlog magic: {e}")))?;
279
280    if !validate_binlog_magic(&magic) {
281        return Err(crate::IdbError::Parse(
282            "Not a valid MySQL binary log file (bad magic)".to_string(),
283        ));
284    }
285
286    let file_size = reader
287        .seek(SeekFrom::End(0))
288        .map_err(|e| crate::IdbError::Io(format!("Failed to seek: {e}")))?;
289    reader
290        .seek(SeekFrom::Start(4))
291        .map_err(|e| crate::IdbError::Io(format!("Failed to seek: {e}")))?;
292
293    let mut events = Vec::new();
294    let mut event_type_counts = std::collections::HashMap::new();
295    let mut table_maps = Vec::new();
296    let mut format_desc = None;
297
298    let mut position = 4u64;
299    let mut header_buf = vec![0u8; COMMON_HEADER_SIZE];
300
301    while position + COMMON_HEADER_SIZE as u64 <= file_size {
302        if reader.read_exact(&mut header_buf).is_err() {
303            break;
304        }
305
306        let hdr = match BinlogEventHeader::parse(&header_buf) {
307            Some(h) => h,
308            None => break,
309        };
310
311        if hdr.event_length < COMMON_HEADER_SIZE as u32 {
312            break;
313        }
314
315        let data_len = hdr.event_length as usize - COMMON_HEADER_SIZE;
316        let mut event_data = vec![0u8; data_len];
317        if reader.read_exact(&mut event_data).is_err() {
318            break;
319        }
320
321        let event_type = BinlogEventType::from_u8(hdr.type_code);
322
323        // Parse specific event types
324        if hdr.type_code == 15 && format_desc.is_none() {
325            format_desc = FormatDescriptionEvent::parse(&event_data);
326        } else if hdr.type_code == 19 {
327            if let Some(tme) = TableMapEvent::parse(&event_data) {
328                table_maps.push(tme);
329            }
330        }
331
332        *event_type_counts
333            .entry(event_type.name().to_string())
334            .or_insert(0) += 1;
335
336        events.push(BinlogEventSummary {
337            offset: position,
338            event_type: event_type.name().to_string(),
339            type_code: hdr.type_code,
340            timestamp: hdr.timestamp,
341            server_id: hdr.server_id,
342            event_length: hdr.event_length,
343        });
344
345        position = if hdr.next_position > 0 {
346            hdr.next_position as u64
347        } else {
348            position + hdr.event_length as u64
349        };
350
351        // Seek to next event position (in case of padding or checksum)
352        if reader.seek(SeekFrom::Start(position)).is_err() {
353            break;
354        }
355    }
356
357    let format_description = format_desc.unwrap_or(FormatDescriptionEvent {
358        binlog_version: 0,
359        server_version: "unknown".to_string(),
360        create_timestamp: 0,
361        header_length: 19,
362        checksum_alg: 0,
363    });
364
365    Ok(BinlogAnalysis {
366        format_description,
367        event_count: events.len(),
368        event_type_counts,
369        table_maps,
370        events,
371    })
372}
373
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_event_type_from_u8() {
        assert_eq!(BinlogEventType::from_u8(2), BinlogEventType::QueryEvent);
        assert_eq!(
            BinlogEventType::from_u8(15),
            BinlogEventType::FormatDescription
        );
        assert_eq!(BinlogEventType::from_u8(19), BinlogEventType::TableMapEvent);
        assert_eq!(
            BinlogEventType::from_u8(30),
            BinlogEventType::WriteRowsEvent
        );
        assert_eq!(
            BinlogEventType::from_u8(31),
            BinlogEventType::UpdateRowsEvent
        );
        assert_eq!(
            BinlogEventType::from_u8(32),
            BinlogEventType::DeleteRowsEvent
        );
        assert_eq!(BinlogEventType::from_u8(255), BinlogEventType::Unknown(255));
    }

    #[test]
    fn test_event_type_names() {
        assert_eq!(
            BinlogEventType::FormatDescription.name(),
            "FORMAT_DESCRIPTION"
        );
        assert_eq!(BinlogEventType::TableMapEvent.name(), "TABLE_MAP");
        assert_eq!(BinlogEventType::WriteRowsEvent.name(), "WRITE_ROWS_V2");
        assert_eq!(BinlogEventType::GtidLogEvent.name(), "GTID");
    }

    #[test]
    fn test_event_type_display() {
        assert_eq!(format!("{}", BinlogEventType::QueryEvent), "QUERY");
        assert_eq!(format!("{}", BinlogEventType::Unknown(99)), "UNKNOWN(99)");
    }

    #[test]
    fn test_table_map_event_parse() {
        let mut data = vec![0u8; 50];
        // table_id = 42
        LittleEndian::write_u32(&mut data[0..], 42);
        data[4] = 0;
        data[5] = 0;
        // flags
        LittleEndian::write_u16(&mut data[6..], 0);
        // db name: "test"
        data[8] = 4;
        data[9..13].copy_from_slice(b"test");
        data[13] = 0;
        // table name: "users"
        data[14] = 5;
        data[15..20].copy_from_slice(b"users");
        data[20] = 0;
        // column_count = 3
        data[21] = 3;
        // column types
        data[22] = 3; // LONG
        data[23] = 15; // VARCHAR
        data[24] = 12; // DATETIME

        let tme = TableMapEvent::parse(&data).unwrap();
        assert_eq!(tme.table_id, 42);
        assert_eq!(tme.database_name, "test");
        assert_eq!(tme.table_name, "users");
        assert_eq!(tme.column_count, 3);
        assert_eq!(tme.column_types, vec![3, 15, 12]);
    }

    #[test]
    fn test_table_map_event_too_short() {
        let data = vec![0u8; 5];
        assert!(TableMapEvent::parse(&data).is_none());
    }

    #[test]
    fn test_table_map_event_truncated_column_types() {
        // Same layout as the happy-path test, but the buffer ends after two of
        // the three announced column type bytes.
        let mut data = vec![0u8; 24];
        LittleEndian::write_u32(&mut data[0..], 42);
        data[8] = 4;
        data[9..13].copy_from_slice(b"test");
        data[14] = 5;
        data[15..20].copy_from_slice(b"users");
        data[21] = 3; // column_count = 3, only data[22] and data[23] follow

        assert!(TableMapEvent::parse(&data).is_none());
    }

    #[test]
    fn test_rows_event_parse() {
        let mut data = vec![0u8; 30];
        // table_id = 42
        LittleEndian::write_u32(&mut data[0..], 42);
        data[4] = 0;
        data[5] = 0;
        // flags
        LittleEndian::write_u16(&mut data[6..], 1);
        // extra_data_length = 2 (minimum, self-inclusive)
        LittleEndian::write_u16(&mut data[8..], 2);
        // column_count = 3
        data[10] = 3;
        // bitmap (1 byte for 3 columns)
        data[11] = 0x07;
        // some row data
        data[12] = 0x01;

        let re = RowsEvent::parse(&data, 30).unwrap();
        assert_eq!(re.table_id, 42);
        assert_eq!(re.event_type, BinlogEventType::WriteRowsEvent);
        assert_eq!(re.flags, 1);
        assert_eq!(re.column_count, 3);
        // Bytes remain after the bitmap, so one row is estimated.
        assert_eq!(re.row_count, 1);
    }

    #[test]
    fn test_rows_event_too_short() {
        let data = vec![0u8; 9];
        assert!(RowsEvent::parse(&data, 30).is_none());
    }

    #[test]
    fn test_rows_event_update_has_second_bitmap() {
        // 8-byte prefix + extra_data_length + column_count + two more bytes.
        let mut data = vec![0u8; 13];
        LittleEndian::write_u32(&mut data[0..], 7);
        LittleEndian::write_u16(&mut data[8..], 2);
        data[10] = 3; // column_count
        data[11] = 0x07;
        data[12] = 0x07;

        // For WRITE the last byte is row data ...
        let write = RowsEvent::parse(&data, 30).unwrap();
        assert_eq!(write.row_count, 1);

        // ... for UPDATE it is the after-image bitmap, leaving no row data.
        let update = RowsEvent::parse(&data, 31).unwrap();
        assert_eq!(update.event_type, BinlogEventType::UpdateRowsEvent);
        assert_eq!(update.column_count, 3);
        assert_eq!(update.row_count, 0);
    }

    #[test]
    fn test_lenenc_int() {
        assert_eq!(read_lenenc_int(&[5]), (5, 1));
        assert_eq!(read_lenenc_int(&[250]), (250, 1));
        assert_eq!(read_lenenc_int(&[252, 0x01, 0x00]), (1, 3));
        assert_eq!(read_lenenc_int(&[253, 0x01, 0x00, 0x00]), (1, 4));
        assert_eq!(read_lenenc_int(&[254, 0x01, 0, 0, 0, 0, 0, 0, 0]), (1, 9));
    }

    #[test]
    fn test_lenenc_int_edge_cases() {
        // Empty input consumes nothing.
        assert_eq!(read_lenenc_int(&[]), (0, 0));
        // NULL and undefined prefixes.
        assert_eq!(read_lenenc_int(&[251]), (0, 1));
        assert_eq!(read_lenenc_int(&[255]), (0, 1));
        // Truncated multi-byte forms.
        assert_eq!(read_lenenc_int(&[252, 0x01]), (0, 1));
        assert_eq!(read_lenenc_int(&[253, 0x01, 0x00]), (0, 1));
        assert_eq!(read_lenenc_int(&[254, 0x01, 0x00, 0x00]), (0, 1));
    }

    #[test]
    fn test_analyze_binlog_synthetic() {
        use std::io::Cursor;

        // Build a minimal synthetic binlog:
        // 4-byte magic + FDE event (19-byte header + FDE data)
        let mut binlog = Vec::new();
        binlog.extend_from_slice(&[0xfe, 0x62, 0x69, 0x6e]); // magic

        // Build FDE event
        let fde_data_len = 100usize;
        let fde_event_len = (COMMON_HEADER_SIZE + fde_data_len) as u32;

        let mut fde_header = vec![0u8; 19];
        LittleEndian::write_u32(&mut fde_header[0..], 1700000000); // timestamp
        fde_header[4] = 15; // FORMAT_DESCRIPTION_EVENT
        LittleEndian::write_u32(&mut fde_header[5..], 1); // server_id
        LittleEndian::write_u32(&mut fde_header[9..], fde_event_len); // event_length
        LittleEndian::write_u32(&mut fde_header[13..], 4 + fde_event_len); // next_position
        binlog.extend_from_slice(&fde_header);

        let mut fde_data = vec![0u8; fde_data_len];
        LittleEndian::write_u16(&mut fde_data[0..], 4); // binlog_version
        let ver = b"8.0.35";
        fde_data[2..2 + ver.len()].copy_from_slice(ver);
        LittleEndian::write_u32(&mut fde_data[52..], 1700000000);
        fde_data[56] = 19;
        fde_data[95] = 1; // checksum_alg = CRC32
        binlog.extend_from_slice(&fde_data);

        let cursor = Cursor::new(binlog);
        let analysis = analyze_binlog(cursor).unwrap();

        assert_eq!(analysis.event_count, 1);
        assert_eq!(analysis.format_description.binlog_version, 4);
        assert_eq!(analysis.format_description.server_version, "8.0.35");
        assert_eq!(
            analysis.event_type_counts.get("FORMAT_DESCRIPTION"),
            Some(&1)
        );
    }

    #[test]
    fn test_analyze_binlog_bad_magic() {
        use std::io::Cursor;
        let data = vec![0u8; 100];
        let cursor = Cursor::new(data);
        assert!(analyze_binlog(cursor).is_err());
    }
}