mysql_binlog_connector_rust/
binlog_parser.rs1use crate::{
2 binlog_error::BinlogError,
3 constants,
4 event::{
5 delete_rows_event::DeleteRowsEvent, event_data::*, event_header::EventHeader,
6 gtid_event::GtidEvent, previous_gtids_event::PreviousGtidsEvent, query_event::QueryEvent,
7 rotate_event::RotateEvent, rows_query_event::RowsQueryEvent,
8 table_map_event::TableMapEvent, transaction_payload_event::TransactionPayloadEvent,
9 update_rows_event::UpdateRowsEvent, write_rows_event::WriteRowsEvent,
10 xa_prepare_event::XaPrepareEvent, xid_event::XidEvent,
11 },
12 event::{event_type::EventType, format_description_event::FormatDescriptionEvent},
13};
14
15use std::{
16 collections::HashMap,
17 io::{Cursor, Read, Seek, SeekFrom},
18};
19
20pub struct BinlogParser {
21 pub checksum_length: u8,
22 pub table_map_event_by_table_id: HashMap<u64, TableMapEvent>,
23}
24
/// Four-byte signature expected at the start of a binlog stream:
/// `0xfe` followed by the ASCII bytes `bin`.
const MAGIC_VALUE: [u8; 4] = [0xfe, b'b', b'i', b'n'];
26
27impl BinlogParser {
28 pub fn check_magic<S: Read + Seek>(&mut self, stream: &mut S) -> Result<(), BinlogError> {
29 let mut magic = [0u8; 4];
30 stream.read_exact(&mut magic)?;
31 match magic {
32 MAGIC_VALUE => Ok(()),
33 _ => Err(BinlogError::UnexpectedData("bad magic".into())),
34 }
35 }
36
37 pub fn next<S: Read + Seek>(
38 &mut self,
39 stream: &mut S,
40 ) -> Result<(EventHeader, EventData), BinlogError> {
41 let header = EventHeader::parse(stream)?;
42 let data_length = header.event_length as usize
43 - constants::EVENT_HEADER_LENGTH
44 - self.checksum_length as usize;
45
46 let buf = self.read_event_data(stream, data_length)?;
47 let mut cursor = Cursor::new(&buf);
48
49 let event_type = EventType::from_code(header.event_type);
50 match event_type {
51 EventType::FormatDescription => {
52 let event_data = FormatDescriptionEvent::parse(&mut cursor, data_length)?;
53 self.checksum_length = event_data.checksum_type.get_length();
54 Ok((header, EventData::FormatDescription(event_data)))
55 }
56
57 EventType::PreviousGtids => Ok((
58 header,
59 EventData::PreviousGtids(PreviousGtidsEvent::parse(&mut cursor)?),
60 )),
61
62 EventType::Gtid => Ok((header, EventData::Gtid(GtidEvent::parse(&mut cursor)?))),
63
64 EventType::Query => Ok((header, EventData::Query(QueryEvent::parse(&mut cursor)?))),
65
66 EventType::TableMap => {
67 let event_data = TableMapEvent::parse(&mut cursor)?;
68 self.table_map_event_by_table_id
69 .insert(event_data.table_id, event_data.clone());
70 Ok((header, EventData::TableMap(event_data)))
71 }
72
73 EventType::WriteRows | EventType::ExtWriteRows => {
74 let row_event_version = Self::get_row_event_version(&event_type);
75 let event_data = WriteRowsEvent::parse(
76 &mut cursor,
77 &mut self.table_map_event_by_table_id,
78 row_event_version,
79 )?;
80 Ok((header, EventData::WriteRows(event_data)))
81 }
82
83 EventType::UpdateRows | EventType::ExtUpdateRows => {
84 let row_event_version = Self::get_row_event_version(&event_type);
85 let event_data = UpdateRowsEvent::parse(
86 &mut cursor,
87 &mut self.table_map_event_by_table_id,
88 row_event_version,
89 )?;
90 Ok((header, EventData::UpdateRows(event_data)))
91 }
92
93 EventType::DeleteRows | EventType::ExtDeleteRows => {
94 let row_event_version = Self::get_row_event_version(&event_type);
95 let event_data = DeleteRowsEvent::parse(
96 &mut cursor,
97 &mut self.table_map_event_by_table_id,
98 row_event_version,
99 )?;
100 Ok((header, EventData::DeleteRows(event_data)))
101 }
102
103 EventType::Xid => Ok((header, EventData::Xid(XidEvent::parse(&mut cursor)?))),
104
105 EventType::XaPrepare => Ok((
106 header,
107 EventData::XaPrepare(XaPrepareEvent::parse(&mut cursor)?),
108 )),
109
110 EventType::TransactionPayload => Ok((
111 header,
112 EventData::TransactionPayload(TransactionPayloadEvent::parse(&mut cursor)?),
113 )),
114
115 EventType::RowsQuery => Ok((
116 header,
117 EventData::RowsQuery(RowsQueryEvent::parse(&mut cursor)?),
118 )),
119
120 EventType::Rotate => Ok((header, EventData::Rotate(RotateEvent::parse(&mut cursor)?))),
121
122 EventType::HeartBeat => Ok((header, EventData::HeartBeat)),
123
124 _ => Ok((header, EventData::NotSupported)),
125 }
126 }
127
128 fn read_event_data<S: Read + Seek>(
129 &mut self,
130 stream: &mut S,
131 data_length: usize,
132 ) -> Result<Vec<u8>, BinlogError> {
133 let mut buf = vec![0u8; data_length];
135 stream.read_exact(&mut buf)?;
136 stream.seek(SeekFrom::Current(self.checksum_length as i64))?;
138 Ok(buf)
139 }
140
141 fn get_row_event_version(event_type: &EventType) -> u8 {
142 match event_type {
143 EventType::ExtWriteRows | EventType::ExtUpdateRows | EventType::ExtDeleteRows => 2,
144 _ => 1,
145 }
146 }
147}