mysql_binlog_connector_rust/event/
table_map_event.rs1use std::io::{Cursor, Read};
2
3use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
4use serde::{Deserialize, Serialize};
5
6use crate::{
7 binlog_error::BinlogError, column::column_type::ColumnType, ext::cursor_ext::CursorExt,
8};
9
10#[derive(Debug, Deserialize, Serialize, Clone)]
11pub struct TableMapEvent {
12 pub table_id: u64,
13 pub database_name: String,
14 pub table_name: String,
15 pub column_types: Vec<u8>,
16 pub column_metas: Vec<u16>,
17 pub null_bits: Vec<bool>,
18}
19
20impl TableMapEvent {
21 pub fn parse(cursor: &mut Cursor<&Vec<u8>>) -> Result<Self, BinlogError> {
22 let table_id = cursor.read_u48::<LittleEndian>()?;
25
26 let _flags = cursor.read_u16::<LittleEndian>();
28
29 let database_name_length = cursor.read_u8()?;
31 let database_name = cursor.read_string_without_terminator(database_name_length as usize)?;
32
33 let table_name_length = cursor.read_u8()?;
35 let table_name = cursor.read_string_without_terminator(table_name_length as usize)?;
36
37 let column_count = cursor.read_packed_number()?;
39
40 let mut column_types = vec![0u8; column_count];
42 cursor.read_exact(&mut column_types)?;
43
44 let _metadata_length = cursor.read_packed_number()?;
46
47 let column_metas = Self::read_metadatas(cursor, &column_types)?;
49
50 let null_bits = cursor.read_bits(column_count, false)?;
52
53 Ok(Self {
56 table_id,
57 database_name,
58 table_name,
59 column_types,
60 column_metas,
61 null_bits,
62 })
63 }
64
65 fn read_metadatas(
66 cursor: &mut Cursor<&Vec<u8>>,
67 column_types: &Vec<u8>,
68 ) -> Result<Vec<u16>, BinlogError> {
69 let mut column_metadatas = Vec::with_capacity(column_types.len());
70 for column_type in column_types {
71 let column_metadata = match ColumnType::from_code(*column_type) {
72 ColumnType::Float
73 | ColumnType::Double
74 | ColumnType::Blob
75 | ColumnType::TinyBlob
76 | ColumnType::MediumBlob
77 | ColumnType::LongBlob
78 | ColumnType::Json
79 | ColumnType::Geometry
80 | ColumnType::Time2
81 | ColumnType::DateTime2
82 | ColumnType::TimeStamp2 => cursor.read_u8()? as u16,
83
84 ColumnType::Bit | ColumnType::VarChar | ColumnType::NewDecimal => {
85 cursor.read_u16::<LittleEndian>()?
86 }
87
88 ColumnType::Set | ColumnType::Enum | ColumnType::String => {
89 cursor.read_u16::<BigEndian>()?
90 }
91
92 _ => 0,
93 };
94 column_metadatas.push(column_metadata);
95 }
96
97 Ok(column_metadatas)
98 }
99}