mysql_binlog_connector_rust/event/
event_header.rs1use std::io::{Cursor, Read, Seek, SeekFrom};
2
3use byteorder::{LittleEndian, ReadBytesExt};
4use serde::{Deserialize, Serialize};
5
6use crate::{binlog_error::BinlogError, constants, ext::cursor_ext::CursorExt};
7
8#[derive(Debug, Deserialize, Serialize, Clone)]
9pub struct EventHeader {
10 pub timestamp: u32,
11 pub event_type: u8,
12 pub server_id: u32,
13 pub event_length: u32,
14 pub next_event_position: u32,
15 pub event_flags: u16,
16}
17
18impl EventHeader {
19 pub fn parse<S: Read + Seek>(stream: &mut S) -> Result<Self, BinlogError> {
20 let mut buf = [0u8; constants::EVENT_HEADER_LENGTH];
22 stream.read_exact(&mut buf)?;
23
24 let mut cursor = Cursor::new(&buf);
25 Ok(Self {
26 timestamp: cursor.read_u32::<LittleEndian>()?,
27 event_type: cursor.read_u8()?,
28 server_id: cursor.read_u32::<LittleEndian>()?,
29 event_length: cursor.read_u32::<LittleEndian>()?,
30 next_event_position: cursor.read_u32::<LittleEndian>()?,
31 event_flags: cursor.read_u16::<LittleEndian>()?,
32 })
33 }
34
35 pub fn parse_rows_event_common_header(
39 cursor: &mut Cursor<&Vec<u8>>,
40 row_event_version: u8,
41 ) -> Result<(u64, usize, Vec<bool>), BinlogError> {
42 let table_id = cursor.read_u48::<LittleEndian>()?;
43 let _flags = cursor.read_u16::<LittleEndian>()?;
44
45 if row_event_version == 2 {
47 let extra_data_length = cursor.read_u16::<LittleEndian>()? as i64;
48 cursor.seek(SeekFrom::Current(extra_data_length - 2))?;
49 }
50
51 let column_count = cursor.read_packed_number()?;
52 let included_columns = cursor.read_bits(column_count, false)?;
53
54 Ok((table_id, column_count, included_columns))
55 }
56}