mysql_binlog_connector_rust/
binlog_parser.rs

1use crate::{
2    binlog_error::BinlogError,
3    constants,
4    event::{
5        delete_rows_event::DeleteRowsEvent, event_data::*, event_header::EventHeader,
6        gtid_event::GtidEvent, previous_gtids_event::PreviousGtidsEvent, query_event::QueryEvent,
7        rotate_event::RotateEvent, rows_query_event::RowsQueryEvent,
8        table_map_event::TableMapEvent, transaction_payload_event::TransactionPayloadEvent,
9        update_rows_event::UpdateRowsEvent, write_rows_event::WriteRowsEvent,
10        xa_prepare_event::XaPrepareEvent, xid_event::XidEvent,
11    },
12    event::{event_type::EventType, format_description_event::FormatDescriptionEvent},
13};
14
15use std::{
16    collections::HashMap,
17    io::{Cursor, Read, Seek, SeekFrom},
18};
19
20pub struct BinlogParser {
21    pub checksum_length: u8,
22    pub table_map_event_by_table_id: HashMap<u64, TableMapEvent>,
23}
24
/// Four-byte signature expected at the start of the stream: `0xfe` followed by
/// the ASCII bytes of `"bin"`.
const MAGIC_VALUE: [u8; 4] = *b"\xfebin";
26
27impl BinlogParser {
28    pub fn check_magic<S: Read + Seek>(&mut self, stream: &mut S) -> Result<(), BinlogError> {
29        let mut magic = [0u8; 4];
30        stream.read_exact(&mut magic)?;
31        match magic {
32            MAGIC_VALUE => Ok(()),
33            _ => Err(BinlogError::UnexpectedData("bad magic".into())),
34        }
35    }
36
37    pub fn next<S: Read + Seek>(
38        &mut self,
39        stream: &mut S,
40    ) -> Result<(EventHeader, EventData), BinlogError> {
41        let header = EventHeader::parse(stream)?;
42        let data_length = header.event_length as usize
43            - constants::EVENT_HEADER_LENGTH
44            - self.checksum_length as usize;
45
46        let buf = self.read_event_data(stream, data_length)?;
47        let mut cursor = Cursor::new(&buf);
48
49        let event_type = EventType::from_code(header.event_type);
50        match event_type {
51            EventType::FormatDescription => {
52                let event_data = FormatDescriptionEvent::parse(&mut cursor, data_length)?;
53                self.checksum_length = event_data.checksum_type.get_length();
54                Ok((header, EventData::FormatDescription(event_data)))
55            }
56
57            EventType::PreviousGtids => Ok((
58                header,
59                EventData::PreviousGtids(PreviousGtidsEvent::parse(&mut cursor)?),
60            )),
61
62            EventType::Gtid => Ok((header, EventData::Gtid(GtidEvent::parse(&mut cursor)?))),
63
64            EventType::Query => Ok((header, EventData::Query(QueryEvent::parse(&mut cursor)?))),
65
66            EventType::TableMap => {
67                let event_data = TableMapEvent::parse(&mut cursor)?;
68                self.table_map_event_by_table_id
69                    .insert(event_data.table_id, event_data.clone());
70                Ok((header, EventData::TableMap(event_data)))
71            }
72
73            EventType::WriteRows | EventType::ExtWriteRows => {
74                let row_event_version = Self::get_row_event_version(&event_type);
75                let event_data = WriteRowsEvent::parse(
76                    &mut cursor,
77                    &mut self.table_map_event_by_table_id,
78                    row_event_version,
79                )?;
80                Ok((header, EventData::WriteRows(event_data)))
81            }
82
83            EventType::UpdateRows | EventType::ExtUpdateRows => {
84                let row_event_version = Self::get_row_event_version(&event_type);
85                let event_data = UpdateRowsEvent::parse(
86                    &mut cursor,
87                    &mut self.table_map_event_by_table_id,
88                    row_event_version,
89                )?;
90                Ok((header, EventData::UpdateRows(event_data)))
91            }
92
93            EventType::DeleteRows | EventType::ExtDeleteRows => {
94                let row_event_version = Self::get_row_event_version(&event_type);
95                let event_data = DeleteRowsEvent::parse(
96                    &mut cursor,
97                    &mut self.table_map_event_by_table_id,
98                    row_event_version,
99                )?;
100                Ok((header, EventData::DeleteRows(event_data)))
101            }
102
103            EventType::Xid => Ok((header, EventData::Xid(XidEvent::parse(&mut cursor)?))),
104
105            EventType::XaPrepare => Ok((
106                header,
107                EventData::XaPrepare(XaPrepareEvent::parse(&mut cursor)?),
108            )),
109
110            EventType::TransactionPayload => Ok((
111                header,
112                EventData::TransactionPayload(TransactionPayloadEvent::parse(&mut cursor)?),
113            )),
114
115            EventType::RowsQuery => Ok((
116                header,
117                EventData::RowsQuery(RowsQueryEvent::parse(&mut cursor)?),
118            )),
119
120            EventType::Rotate => Ok((header, EventData::Rotate(RotateEvent::parse(&mut cursor)?))),
121
122            EventType::HeartBeat => Ok((header, EventData::HeartBeat)),
123
124            _ => Ok((header, EventData::NotSupported)),
125        }
126    }
127
128    fn read_event_data<S: Read + Seek>(
129        &mut self,
130        stream: &mut S,
131        data_length: usize,
132    ) -> Result<Vec<u8>, BinlogError> {
133        // read data for current event
134        let mut buf = vec![0u8; data_length];
135        stream.read_exact(&mut buf)?;
136        // skip checksum
137        stream.seek(SeekFrom::Current(self.checksum_length as i64))?;
138        Ok(buf)
139    }
140
141    fn get_row_event_version(event_type: &EventType) -> u8 {
142        match event_type {
143            EventType::ExtWriteRows | EventType::ExtUpdateRows | EventType::ExtDeleteRows => 2,
144            _ => 1,
145        }
146    }
147}