mysql_binlog_connector_rust/event/
table_map_event.rs

1use std::io::{Cursor, Read};
2
3use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
4use serde::{Deserialize, Serialize};
5
6use crate::{
7    binlog_error::BinlogError, column::column_type::ColumnType, ext::cursor_ext::CursorExt,
8};
9
10#[derive(Debug, Deserialize, Serialize, Clone)]
11pub struct TableMapEvent {
12    pub table_id: u64,
13    pub database_name: String,
14    pub table_name: String,
15    pub column_types: Vec<u8>,
16    pub column_metas: Vec<u16>,
17    pub null_bits: Vec<bool>,
18}
19
20impl TableMapEvent {
21    pub fn parse(cursor: &mut Cursor<&Vec<u8>>) -> Result<Self, BinlogError> {
22        // refer: https://dev.mysql.com/doc/dev/mysql-server/latest/classbinary__log_1_1Table__map__event.html
23        // table_id
24        let table_id = cursor.read_u48::<LittleEndian>()?;
25
26        // flags, Reserved for future use; currently always 0.
27        let _flags = cursor.read_u16::<LittleEndian>();
28
29        // database_name
30        let database_name_length = cursor.read_u8()?;
31        let database_name = cursor.read_string_without_terminator(database_name_length as usize)?;
32
33        // table_name
34        let table_name_length = cursor.read_u8()?;
35        let table_name = cursor.read_string_without_terminator(table_name_length as usize)?;
36
37        // column_count
38        let column_count = cursor.read_packed_number()?;
39
40        // column_types
41        let mut column_types = vec![0u8; column_count];
42        cursor.read_exact(&mut column_types)?;
43
44        // metadata_length, won't be used
45        let _metadata_length = cursor.read_packed_number()?;
46
47        // metadata
48        let column_metas = Self::read_metadatas(cursor, &column_types)?;
49
50        // nullable_bits
51        let null_bits = cursor.read_bits(column_count, false)?;
52
53        // TODO: table_metadata
54
55        Ok(Self {
56            table_id,
57            database_name,
58            table_name,
59            column_types,
60            column_metas,
61            null_bits,
62        })
63    }
64
65    fn read_metadatas(
66        cursor: &mut Cursor<&Vec<u8>>,
67        column_types: &Vec<u8>,
68    ) -> Result<Vec<u16>, BinlogError> {
69        let mut column_metadatas = Vec::with_capacity(column_types.len());
70        for column_type in column_types {
71            let column_metadata = match ColumnType::from_code(*column_type) {
72                ColumnType::Float
73                | ColumnType::Double
74                | ColumnType::Blob
75                | ColumnType::TinyBlob
76                | ColumnType::MediumBlob
77                | ColumnType::LongBlob
78                | ColumnType::Json
79                | ColumnType::Geometry
80                | ColumnType::Time2
81                | ColumnType::DateTime2
82                | ColumnType::TimeStamp2 => cursor.read_u8()? as u16,
83
84                ColumnType::Bit | ColumnType::VarChar | ColumnType::NewDecimal => {
85                    cursor.read_u16::<LittleEndian>()?
86                }
87
88                ColumnType::Set | ColumnType::Enum | ColumnType::String => {
89                    cursor.read_u16::<BigEndian>()?
90                }
91
92                _ => 0,
93            };
94            column_metadatas.push(column_metadata);
95        }
96
97        Ok(column_metadatas)
98    }
99}