mysql_binlog_connector_rust/event/
event_header.rs

1use std::io::{Cursor, Read, Seek, SeekFrom};
2
3use byteorder::{LittleEndian, ReadBytesExt};
4use serde::{Deserialize, Serialize};
5
6use crate::{binlog_error::BinlogError, constants, ext::cursor_ext::CursorExt};
7
8#[derive(Debug, Deserialize, Serialize, Clone)]
9pub struct EventHeader {
10    pub timestamp: u32,
11    pub event_type: u8,
12    pub server_id: u32,
13    pub event_length: u32,
14    pub next_event_position: u32,
15    pub event_flags: u16,
16}
17
18impl EventHeader {
19    pub fn parse<S: Read + Seek>(stream: &mut S) -> Result<Self, BinlogError> {
20        // refer: https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Log__event__header.html
21        let mut buf = [0u8; constants::EVENT_HEADER_LENGTH];
22        stream.read_exact(&mut buf)?;
23
24        let mut cursor = Cursor::new(&buf);
25        Ok(Self {
26            timestamp: cursor.read_u32::<LittleEndian>()?,
27            event_type: cursor.read_u8()?,
28            server_id: cursor.read_u32::<LittleEndian>()?,
29            event_length: cursor.read_u32::<LittleEndian>()?,
30            next_event_position: cursor.read_u32::<LittleEndian>()?,
31            event_flags: cursor.read_u16::<LittleEndian>()?,
32        })
33    }
34
35    // Parse the common header for rows events:
36    // WriteRows / UpdateRows / DeleteRows
37    // ExtWriteRows / ExtUpdateRows / ExtDeleteRows
38    pub fn parse_rows_event_common_header(
39        cursor: &mut Cursor<&Vec<u8>>,
40        row_event_version: u8,
41    ) -> Result<(u64, usize, Vec<bool>), BinlogError> {
42        let table_id = cursor.read_u48::<LittleEndian>()?;
43        let _flags = cursor.read_u16::<LittleEndian>()?;
44
45        // ExtWriteRows/ExtUpdateRows/ExtDeleteRows, version 2, MySQL only
46        if row_event_version == 2 {
47            let extra_data_length = cursor.read_u16::<LittleEndian>()? as i64;
48            cursor.seek(SeekFrom::Current(extra_data_length - 2))?;
49        }
50
51        let column_count = cursor.read_packed_number()?;
52        let included_columns = cursor.read_bits(column_count, false)?;
53
54        Ok((table_id, column_count, included_columns))
55    }
56}