mysql_binlog_connector_rust/event/
table_map_event.rs1use std::io::{Cursor, Read};
2
3use byteorder::{BigEndian, LittleEndian, ReadBytesExt};
4use serde::{Deserialize, Serialize};
5
6use crate::{
7 binlog_error::BinlogError, column::column_type::ColumnType,
8 event::table_map::table_metadata::TableMetadata, ext::cursor_ext::CursorExt,
9};
10
11#[derive(Debug, Deserialize, Serialize, Clone)]
12pub struct TableMapEvent {
13 pub table_id: u64,
14 pub database_name: String,
15 pub table_name: String,
16 pub column_types: Vec<u8>,
17 pub column_metas: Vec<u16>,
18 pub null_bits: Vec<bool>,
19 pub table_metadata: Option<TableMetadata>,
21}
22
23impl TableMapEvent {
24 pub fn parse(cursor: &mut Cursor<&Vec<u8>>) -> Result<Self, BinlogError> {
25 let table_id = cursor.read_u48::<LittleEndian>()?;
28
29 let _flags = cursor.read_u16::<LittleEndian>();
31
32 let database_name_length = cursor.read_u8()?;
34 let database_name = cursor.read_string_without_terminator(database_name_length as usize)?;
35
36 let table_name_length = cursor.read_u8()?;
38 let table_name = cursor.read_string_without_terminator(table_name_length as usize)?;
39
40 let column_count = cursor.read_packed_number()?;
42
43 let mut column_types = vec![0u8; column_count];
45 cursor.read_exact(&mut column_types)?;
46
47 let _metadata_length = cursor.read_packed_number()?;
49
50 let column_metas = Self::read_metadatas(cursor, &column_types)?;
52
53 let null_bits = cursor.read_bits(column_count, false)?;
55
56 let mut table_metadata = None;
58 if cursor.available() > 0 {
59 table_metadata = Some(TableMetadata::parse(cursor, &column_types, &column_metas)?);
60 }
61
62 Ok(Self {
63 table_id,
64 database_name,
65 table_name,
66 column_types,
67 column_metas,
68 null_bits,
69 table_metadata,
70 })
71 }
72
73 fn read_metadatas(
74 cursor: &mut Cursor<&Vec<u8>>,
75 column_types: &Vec<u8>,
76 ) -> Result<Vec<u16>, BinlogError> {
77 let mut column_metadatas = Vec::with_capacity(column_types.len());
78 for column_type in column_types {
79 let column_metadata = match ColumnType::from_code(*column_type) {
80 ColumnType::Float
81 | ColumnType::Double
82 | ColumnType::Blob
83 | ColumnType::TinyBlob
84 | ColumnType::MediumBlob
85 | ColumnType::LongBlob
86 | ColumnType::Json
87 | ColumnType::Geometry
88 | ColumnType::Time2
89 | ColumnType::DateTime2
90 | ColumnType::TimeStamp2 => cursor.read_u8()? as u16,
91
92 ColumnType::Bit | ColumnType::VarChar | ColumnType::NewDecimal => {
93 cursor.read_u16::<LittleEndian>()?
94 }
95
96 ColumnType::Set | ColumnType::Enum | ColumnType::String => {
97 cursor.read_u16::<BigEndian>()?
98 }
99
100 _ => 0,
101 };
102 column_metadatas.push(column_metadata);
103 }
104
105 Ok(column_metadatas)
106 }
107}