mysql_binlog_connector_rust/event/
table_map_event.rs

1use std::io::{Cursor, Read};
2
3use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
4use serde::{Deserialize, Serialize};
5
6use crate::{
7    binlog_error::BinlogError, column::column_type::ColumnType,
8    event::table_map::table_metadata::TableMetadata, ext::cursor_ext::CursorExt,
9};
10
11#[derive(Debug, Deserialize, Serialize, Clone)]
12pub struct TableMapEvent {
13    pub table_id: u64,
14    pub database_name: String,
15    pub table_name: String,
16    pub column_types: Vec<u8>,
17    pub column_metas: Vec<u16>,
18    pub null_bits: Vec<bool>,
19    /// Gets table metadata for MySQL 8.0.1+
20    pub table_metadata: Option<TableMetadata>,
21}
22
23impl TableMapEvent {
24    pub fn parse(cursor: &mut Cursor<&Vec<u8>>) -> Result<Self, BinlogError> {
25        // refer: https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Table__map__event.html
26        // table_id
27        let table_id = cursor.read_u48::<LittleEndian>()?;
28
29        // flags, Reserved for future use; currently always 0.
30        let _flags = cursor.read_u16::<LittleEndian>();
31
32        // database_name
33        let database_name_length = cursor.read_u8()?;
34        let database_name = cursor.read_string_without_terminator(database_name_length as usize)?;
35
36        // table_name
37        let table_name_length = cursor.read_u8()?;
38        let table_name = cursor.read_string_without_terminator(table_name_length as usize)?;
39
40        // column_count
41        let column_count = cursor.read_packed_number()?;
42
43        // column_types
44        let mut column_types = vec![0u8; column_count];
45        cursor.read_exact(&mut column_types)?;
46
47        // metadata_length, won't be used
48        let _metadata_length = cursor.read_packed_number()?;
49
50        // metadata
51        let column_metas = Self::read_metadatas(cursor, &column_types)?;
52
53        // nullable_bits
54        let null_bits = cursor.read_bits(column_count, false)?;
55
56        // table_metadata (MySQL 8.0.1+ and MariaDB 10.5+)
57        let mut table_metadata = None;
58        if cursor.available() > 0 {
59            table_metadata = Some(TableMetadata::parse(cursor, &column_types, &column_metas)?);
60        }
61
62        Ok(Self {
63            table_id,
64            database_name,
65            table_name,
66            column_types,
67            column_metas,
68            null_bits,
69            table_metadata,
70        })
71    }
72
73    fn read_metadatas(
74        cursor: &mut Cursor<&Vec<u8>>,
75        column_types: &Vec<u8>,
76    ) -> Result<Vec<u16>, BinlogError> {
77        let mut column_metadatas = Vec::with_capacity(column_types.len());
78        for column_type in column_types {
79            let column_metadata = match ColumnType::from_code(*column_type) {
80                ColumnType::Float
81                | ColumnType::Double
82                | ColumnType::Blob
83                | ColumnType::TinyBlob
84                | ColumnType::MediumBlob
85                | ColumnType::LongBlob
86                | ColumnType::Json
87                | ColumnType::Geometry
88                | ColumnType::Time2
89                | ColumnType::DateTime2
90                | ColumnType::TimeStamp2 => cursor.read_u8()? as u16,
91
92                ColumnType::Bit | ColumnType::VarChar | ColumnType::NewDecimal => {
93                    cursor.read_u16::<LittleEndian>()?
94                }
95
96                ColumnType::Set | ColumnType::Enum | ColumnType::String => {
97                    cursor.read_u16::<BigEndian>()?
98                }
99
100                _ => 0,
101            };
102            column_metadatas.push(column_metadata);
103        }
104
105        Ok(column_metadatas)
106    }
107}