1use byteorder::{ByteOrder, LittleEndian};
13use serde::Serialize;
14use std::fmt;
15
16use super::constants::*;
17use super::header::{FormatDescriptionEvent, RotateEvent};
18
19#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
38pub enum BinlogEventType {
39 UnknownEvent,
41 StartEventV3,
43 QueryEvent,
45 StopEvent,
47 RotateEvent,
49 IntvarEvent,
51 LoadEvent,
53 SlaveEvent,
55 CreateFileEvent,
57 AppendBlockEvent,
59 ExecLoadEvent,
61 DeleteFileEvent,
63 NewLoadEvent,
65 RandEvent,
67 UserVarEvent,
69 FormatDescription,
71 XidEvent,
73 BeginLoadQueryEvent,
75 ExecuteLoadQueryEvent,
77 TableMapEvent,
79 PreGaWriteRowsEvent,
81 PreGaUpdateRowsEvent,
83 PreGaDeleteRowsEvent,
85 WriteRowsEventV1,
87 UpdateRowsEventV1,
89 DeleteRowsEventV1,
91 IncidentEvent,
93 HeartbeatEvent,
95 IgnorableLogEvent,
97 RowsQueryEvent,
99 WriteRowsEvent,
101 UpdateRowsEvent,
103 DeleteRowsEvent,
105 GtidLogEvent,
107 AnonymousGtidLogEvent,
109 PreviousGtidsLogEvent,
111 TransactionContextEvent,
113 ViewChangeEvent,
115 XaPrepareLogEvent,
117 PartialUpdateRowsEvent,
119 TransactionPayloadEvent,
121 HeartbeatEventV2,
123 Unknown(u8),
125}
126
127impl BinlogEventType {
128 pub fn from_u8(code: u8) -> Self {
130 match code {
131 UNKNOWN_EVENT => Self::UnknownEvent,
132 START_EVENT_V3 => Self::StartEventV3,
133 QUERY_EVENT => Self::QueryEvent,
134 STOP_EVENT => Self::StopEvent,
135 ROTATE_EVENT => Self::RotateEvent,
136 INTVAR_EVENT => Self::IntvarEvent,
137 LOAD_EVENT => Self::LoadEvent,
138 SLAVE_EVENT => Self::SlaveEvent,
139 CREATE_FILE_EVENT => Self::CreateFileEvent,
140 APPEND_BLOCK_EVENT => Self::AppendBlockEvent,
141 EXEC_LOAD_EVENT => Self::ExecLoadEvent,
142 DELETE_FILE_EVENT => Self::DeleteFileEvent,
143 NEW_LOAD_EVENT => Self::NewLoadEvent,
144 RAND_EVENT => Self::RandEvent,
145 USER_VAR_EVENT => Self::UserVarEvent,
146 FORMAT_DESCRIPTION_EVENT => Self::FormatDescription,
147 XID_EVENT => Self::XidEvent,
148 BEGIN_LOAD_QUERY_EVENT => Self::BeginLoadQueryEvent,
149 EXECUTE_LOAD_QUERY_EVENT => Self::ExecuteLoadQueryEvent,
150 TABLE_MAP_EVENT => Self::TableMapEvent,
151 PRE_GA_WRITE_ROWS_EVENT => Self::PreGaWriteRowsEvent,
152 PRE_GA_UPDATE_ROWS_EVENT => Self::PreGaUpdateRowsEvent,
153 PRE_GA_DELETE_ROWS_EVENT => Self::PreGaDeleteRowsEvent,
154 WRITE_ROWS_EVENT_V1 => Self::WriteRowsEventV1,
155 UPDATE_ROWS_EVENT_V1 => Self::UpdateRowsEventV1,
156 DELETE_ROWS_EVENT_V1 => Self::DeleteRowsEventV1,
157 INCIDENT_EVENT => Self::IncidentEvent,
158 HEARTBEAT_LOG_EVENT => Self::HeartbeatEvent,
159 IGNORABLE_LOG_EVENT => Self::IgnorableLogEvent,
160 ROWS_QUERY_LOG_EVENT => Self::RowsQueryEvent,
161 WRITE_ROWS_EVENT => Self::WriteRowsEvent,
162 UPDATE_ROWS_EVENT => Self::UpdateRowsEvent,
163 DELETE_ROWS_EVENT => Self::DeleteRowsEvent,
164 GTID_LOG_EVENT => Self::GtidLogEvent,
165 ANONYMOUS_GTID_LOG_EVENT => Self::AnonymousGtidLogEvent,
166 PREVIOUS_GTIDS_LOG_EVENT => Self::PreviousGtidsLogEvent,
167 TRANSACTION_CONTEXT_EVENT => Self::TransactionContextEvent,
168 VIEW_CHANGE_EVENT => Self::ViewChangeEvent,
169 XA_PREPARE_LOG_EVENT => Self::XaPrepareLogEvent,
170 PARTIAL_UPDATE_ROWS_EVENT => Self::PartialUpdateRowsEvent,
171 TRANSACTION_PAYLOAD_EVENT => Self::TransactionPayloadEvent,
172 HEARTBEAT_LOG_EVENT_V2 => Self::HeartbeatEventV2,
173 other => Self::Unknown(other),
174 }
175 }
176
177 pub fn type_code(&self) -> u8 {
179 match self {
180 Self::UnknownEvent => UNKNOWN_EVENT,
181 Self::StartEventV3 => START_EVENT_V3,
182 Self::QueryEvent => QUERY_EVENT,
183 Self::StopEvent => STOP_EVENT,
184 Self::RotateEvent => ROTATE_EVENT,
185 Self::IntvarEvent => INTVAR_EVENT,
186 Self::LoadEvent => LOAD_EVENT,
187 Self::SlaveEvent => SLAVE_EVENT,
188 Self::CreateFileEvent => CREATE_FILE_EVENT,
189 Self::AppendBlockEvent => APPEND_BLOCK_EVENT,
190 Self::ExecLoadEvent => EXEC_LOAD_EVENT,
191 Self::DeleteFileEvent => DELETE_FILE_EVENT,
192 Self::NewLoadEvent => NEW_LOAD_EVENT,
193 Self::RandEvent => RAND_EVENT,
194 Self::UserVarEvent => USER_VAR_EVENT,
195 Self::FormatDescription => FORMAT_DESCRIPTION_EVENT,
196 Self::XidEvent => XID_EVENT,
197 Self::BeginLoadQueryEvent => BEGIN_LOAD_QUERY_EVENT,
198 Self::ExecuteLoadQueryEvent => EXECUTE_LOAD_QUERY_EVENT,
199 Self::TableMapEvent => TABLE_MAP_EVENT,
200 Self::PreGaWriteRowsEvent => PRE_GA_WRITE_ROWS_EVENT,
201 Self::PreGaUpdateRowsEvent => PRE_GA_UPDATE_ROWS_EVENT,
202 Self::PreGaDeleteRowsEvent => PRE_GA_DELETE_ROWS_EVENT,
203 Self::WriteRowsEventV1 => WRITE_ROWS_EVENT_V1,
204 Self::UpdateRowsEventV1 => UPDATE_ROWS_EVENT_V1,
205 Self::DeleteRowsEventV1 => DELETE_ROWS_EVENT_V1,
206 Self::IncidentEvent => INCIDENT_EVENT,
207 Self::HeartbeatEvent => HEARTBEAT_LOG_EVENT,
208 Self::IgnorableLogEvent => IGNORABLE_LOG_EVENT,
209 Self::RowsQueryEvent => ROWS_QUERY_LOG_EVENT,
210 Self::WriteRowsEvent => WRITE_ROWS_EVENT,
211 Self::UpdateRowsEvent => UPDATE_ROWS_EVENT,
212 Self::DeleteRowsEvent => DELETE_ROWS_EVENT,
213 Self::GtidLogEvent => GTID_LOG_EVENT,
214 Self::AnonymousGtidLogEvent => ANONYMOUS_GTID_LOG_EVENT,
215 Self::PreviousGtidsLogEvent => PREVIOUS_GTIDS_LOG_EVENT,
216 Self::TransactionContextEvent => TRANSACTION_CONTEXT_EVENT,
217 Self::ViewChangeEvent => VIEW_CHANGE_EVENT,
218 Self::XaPrepareLogEvent => XA_PREPARE_LOG_EVENT,
219 Self::PartialUpdateRowsEvent => PARTIAL_UPDATE_ROWS_EVENT,
220 Self::TransactionPayloadEvent => TRANSACTION_PAYLOAD_EVENT,
221 Self::HeartbeatEventV2 => HEARTBEAT_LOG_EVENT_V2,
222 Self::Unknown(c) => *c,
223 }
224 }
225
226 pub fn name(&self) -> &'static str {
228 match self {
229 Self::UnknownEvent => "UNKNOWN",
230 Self::StartEventV3 => "START_V3",
231 Self::QueryEvent => "QUERY",
232 Self::StopEvent => "STOP",
233 Self::RotateEvent => "ROTATE",
234 Self::IntvarEvent => "INTVAR",
235 Self::LoadEvent => "LOAD",
236 Self::SlaveEvent => "SLAVE",
237 Self::CreateFileEvent => "CREATE_FILE",
238 Self::AppendBlockEvent => "APPEND_BLOCK",
239 Self::ExecLoadEvent => "EXEC_LOAD",
240 Self::DeleteFileEvent => "DELETE_FILE",
241 Self::NewLoadEvent => "NEW_LOAD",
242 Self::RandEvent => "RAND",
243 Self::UserVarEvent => "USER_VAR",
244 Self::FormatDescription => "FORMAT_DESCRIPTION",
245 Self::XidEvent => "XID",
246 Self::BeginLoadQueryEvent => "BEGIN_LOAD_QUERY",
247 Self::ExecuteLoadQueryEvent => "EXECUTE_LOAD_QUERY",
248 Self::TableMapEvent => "TABLE_MAP",
249 Self::PreGaWriteRowsEvent => "PRE_GA_WRITE_ROWS",
250 Self::PreGaUpdateRowsEvent => "PRE_GA_UPDATE_ROWS",
251 Self::PreGaDeleteRowsEvent => "PRE_GA_DELETE_ROWS",
252 Self::WriteRowsEventV1 => "WRITE_ROWS_V1",
253 Self::UpdateRowsEventV1 => "UPDATE_ROWS_V1",
254 Self::DeleteRowsEventV1 => "DELETE_ROWS_V1",
255 Self::IncidentEvent => "INCIDENT",
256 Self::HeartbeatEvent => "HEARTBEAT",
257 Self::IgnorableLogEvent => "IGNORABLE",
258 Self::RowsQueryEvent => "ROWS_QUERY",
259 Self::WriteRowsEvent => "WRITE_ROWS_V2",
260 Self::UpdateRowsEvent => "UPDATE_ROWS_V2",
261 Self::DeleteRowsEvent => "DELETE_ROWS_V2",
262 Self::GtidLogEvent => "GTID",
263 Self::AnonymousGtidLogEvent => "ANONYMOUS_GTID",
264 Self::PreviousGtidsLogEvent => "PREVIOUS_GTIDS",
265 Self::TransactionContextEvent => "TRANSACTION_CONTEXT",
266 Self::ViewChangeEvent => "VIEW_CHANGE",
267 Self::XaPrepareLogEvent => "XA_PREPARE",
268 Self::PartialUpdateRowsEvent => "PARTIAL_UPDATE_ROWS",
269 Self::TransactionPayloadEvent => "TRANSACTION_PAYLOAD",
270 Self::HeartbeatEventV2 => "HEARTBEAT_V2",
271 Self::Unknown(_) => "UNKNOWN",
272 }
273 }
274}
275
276impl fmt::Display for BinlogEventType {
277 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
278 match self {
279 Self::Unknown(c) => write!(f, "UNKNOWN({c})"),
280 _ => write!(f, "{}", self.name()),
281 }
282 }
283}
284
285#[derive(Debug, Clone, Serialize)]
308pub struct CommonEventHeader {
309 pub timestamp: u32,
311 pub type_code: BinlogEventType,
313 pub server_id: u32,
315 pub event_length: u32,
317 pub next_position: u32,
319 pub flags: u16,
321}
322
323impl CommonEventHeader {
324 pub fn parse(data: &[u8]) -> Option<Self> {
328 if data.len() < COMMON_HEADER_SIZE {
329 return None;
330 }
331
332 Some(Self {
333 timestamp: LittleEndian::read_u32(&data[EVENT_TIMESTAMP_OFFSET..]),
334 type_code: BinlogEventType::from_u8(data[EVENT_TYPE_OFFSET]),
335 server_id: LittleEndian::read_u32(&data[EVENT_SERVER_ID_OFFSET..]),
336 event_length: LittleEndian::read_u32(&data[EVENT_LENGTH_OFFSET..]),
337 next_position: LittleEndian::read_u32(&data[EVENT_NEXT_POSITION_OFFSET..]),
338 flags: LittleEndian::read_u16(&data[EVENT_FLAGS_OFFSET..]),
339 })
340 }
341
342 pub fn payload_offset(&self) -> usize {
344 COMMON_HEADER_SIZE
345 }
346
347 pub fn payload_length(&self, checksum_enabled: bool) -> usize {
349 let total = self.event_length as usize;
350 let overhead = COMMON_HEADER_SIZE
351 + if checksum_enabled {
352 BINLOG_CHECKSUM_LEN
353 } else {
354 0
355 };
356 total.saturating_sub(overhead)
357 }
358}
359
360#[derive(Debug, Clone, Serialize)]
365#[serde(tag = "type")]
366pub enum BinlogEvent {
367 FormatDescription(FormatDescriptionEvent),
369 Rotate(RotateEvent),
371 Stop,
373 Query {
375 #[serde(skip)]
377 payload: Vec<u8>,
378 },
379 Xid {
381 xid: u64,
383 },
384 Unknown {
386 type_code: u8,
388 #[serde(skip)]
390 payload: Vec<u8>,
391 },
392}
393
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a 19-byte header buffer with every field written little-endian at the
    /// byte positions the tests rely on (0, 4, 5, 9, 13, 17).
    fn header_bytes(
        timestamp: u32,
        type_code: u8,
        server_id: u32,
        event_length: u32,
        next_position: u32,
        flags: u16,
    ) -> Vec<u8> {
        let mut buf = vec![0u8; 19];
        LittleEndian::write_u32(&mut buf[0..], timestamp);
        buf[4] = type_code;
        LittleEndian::write_u32(&mut buf[5..], server_id);
        LittleEndian::write_u32(&mut buf[9..], event_length);
        LittleEndian::write_u32(&mut buf[13..], next_position);
        LittleEndian::write_u16(&mut buf[17..], flags);
        buf
    }

    #[test]
    fn event_type_roundtrip() {
        // Every known code must survive from_u8 -> type_code unchanged.
        for code in 0..=41u8 {
            assert_eq!(BinlogEventType::from_u8(code).type_code(), code);
        }

        // An unmapped code is preserved inside `Unknown`.
        let unmapped = BinlogEventType::from_u8(200);
        assert_eq!(unmapped.type_code(), 200);
        assert!(matches!(unmapped, BinlogEventType::Unknown(200)));
    }

    #[test]
    fn event_type_display() {
        let rendered = |ty: BinlogEventType| ty.to_string();

        assert_eq!(
            rendered(BinlogEventType::FormatDescription),
            "FORMAT_DESCRIPTION"
        );
        assert_eq!(rendered(BinlogEventType::StopEvent), "STOP");
        assert_eq!(rendered(BinlogEventType::Unknown(99)), "UNKNOWN(99)");
    }

    #[test]
    fn event_type_name() {
        let expectations = [
            (BinlogEventType::QueryEvent, "QUERY"),
            (BinlogEventType::TableMapEvent, "TABLE_MAP"),
            (BinlogEventType::WriteRowsEvent, "WRITE_ROWS_V2"),
            (BinlogEventType::GtidLogEvent, "GTID"),
            (BinlogEventType::HeartbeatEventV2, "HEARTBEAT_V2"),
        ];
        for (ty, expected) in expectations {
            assert_eq!(ty.name(), expected);
        }
    }

    #[test]
    fn parse_common_header() {
        let bytes = header_bytes(1_700_000_000, FORMAT_DESCRIPTION_EVENT, 42, 119, 123, 0x0001);

        let header = CommonEventHeader::parse(&bytes).unwrap();
        assert_eq!(header.timestamp, 1_700_000_000);
        assert_eq!(header.type_code, BinlogEventType::FormatDescription);
        assert_eq!(header.server_id, 42);
        assert_eq!(header.event_length, 119);
        assert_eq!(header.next_position, 123);
        assert_eq!(header.flags, 0x0001);
    }

    #[test]
    fn parse_common_header_too_short() {
        // One byte short of the 19-byte header.
        let truncated = vec![0u8; 18];
        assert!(CommonEventHeader::parse(&truncated).is_none());
    }

    #[test]
    fn payload_length_with_checksum() {
        // Only event_length (= 100) is set; every other header byte stays zero.
        let bytes = header_bytes(0, 0, 0, 100, 0, 0);
        let header = CommonEventHeader::parse(&bytes).unwrap();

        // 100 - 19 header bytes.
        assert_eq!(header.payload_length(false), 81);
        // 100 - 19 header bytes - 4 checksum bytes.
        assert_eq!(header.payload_length(true), 77);
    }
}