mysql_binlog_connector_rust/event/
query_event.rs

1use std::io::{Cursor, Read, Seek, SeekFrom};
2
3use byteorder::{LittleEndian, ReadBytesExt};
4use serde::{Deserialize, Serialize};
5
6use crate::{binlog_error::BinlogError, ext::cursor_ext::CursorExt};
7
8#[derive(Debug, Deserialize, Serialize, Clone)]
9pub struct QueryEvent {
10    pub thread_id: u32,
11    pub exec_time: u32,
12    pub error_code: u16,
13    pub schema: String,
14    pub query: String,
15}
16
17impl QueryEvent {
18    pub fn parse(cursor: &mut Cursor<&Vec<u8>>) -> Result<Self, BinlogError> {
19        // refer: https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Query__event.html
20        // Post-Header for Query_event
21        let thread_id = cursor.read_u32::<LittleEndian>()?;
22        let exec_time = cursor.read_u32::<LittleEndian>()?;
23        let schema_length = cursor.read_u8()?;
24        let error_code = cursor.read_u16::<LittleEndian>()?;
25        let status_vars_length = cursor.read_u16::<LittleEndian>()? as i64;
26
27        // skip, Body for Query_event
28        cursor.seek(SeekFrom::Current(status_vars_length))?;
29
30        // Format: schema_length + 1, The currently selected database, as a null-terminated string.
31        let schema = cursor.read_string_without_terminator(schema_length as usize)?;
32
33        let mut query = String::new();
34        cursor.read_to_string(&mut query)?;
35
36        Ok(Self {
37            thread_id,
38            exec_time,
39            error_code,
40            schema,
41            query,
42        })
43    }
44}