mysql_binlog_connector_rust/event/
format_description_event.rs

1use crate::{binlog_error::BinlogError, event::event_type::EventType};
2use byteorder::{LittleEndian, ReadBytesExt};
3use serde::{Deserialize, Serialize};
4use std::io::{Cursor, Read, Seek, SeekFrom};
5
6use super::checksum_type::ChecksumType;
7
8#[derive(Debug, Deserialize, Serialize, Clone)]
9pub struct FormatDescriptionEvent {
10    pub binlog_version: u16,
11    pub server_version: String,
12    pub create_timestamp: u32,
13    pub header_length: u8,
14    pub checksum_type: ChecksumType,
15}
16
17impl FormatDescriptionEvent {
18    pub fn parse(cursor: &mut Cursor<&Vec<u8>>, data_length: usize) -> Result<Self, BinlogError> {
19        // refer: https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Format__description__event.html
20        // binlog_version: 2 bytes
21        let binlog_version = cursor.read_u16::<LittleEndian>()?;
22
23        // server_version: 50 bytes
24        let mut server_version_buf = [0u8; 50];
25        cursor.read_exact(&mut server_version_buf)?;
26        let server_version = std::str::from_utf8(&server_version_buf)
27            .unwrap()
28            .to_string();
29
30        // create_timestamp: 4 bytes
31        let create_timestamp = cursor.read_u32::<LittleEndian>()?;
32
33        // header_length: 1 byte
34        // Length of the Binlog Event Header of next events. Should always be 19.
35        let header_length = cursor.read_u8()?;
36
37        // post-header (76 : n), it is an array of n bytes,
38        // one byte per event type that the server knows about, n = count of all event types,
39        // the 14th (EventType::FormatDescription - 1) byte contains the payload length of FormatDescription,
40        cursor.seek(SeekFrom::Current(EventType::FormatDescription as i64 - 1))?;
41        let payload_length = cursor.read_u8()? as usize;
42
43        // after the header and payload, it is the checksum type, 1 byte
44        let mut checksum_type = 0;
45        let checksum_block_length = data_length - payload_length;
46        if checksum_block_length > 0 {
47            // seek to the end of payload
48            let current_pos = 2 + 50 + 4 + 1 + EventType::FormatDescription as u8;
49            cursor.seek(SeekFrom::Current(
50                payload_length as i64 - current_pos as i64,
51            ))?;
52            // read checksum type, refer: https://mariadb.com/kb/en/format_description_event/
53            checksum_type = cursor.read_u8()?;
54        }
55
56        Ok(Self {
57            binlog_version,
58            server_version,
59            create_timestamp,
60            header_length,
61            checksum_type: ChecksumType::from_code(checksum_type),
62        })
63    }
64}