// idb/binlog/event.rs

//! Binlog event types and common event header.
//!
//! Every binlog event begins with a 19-byte common header containing the
//! timestamp, event type, server ID, total length, next-event position, and
//! flags. The [`CommonEventHeader`] struct parses this header, and the
//! [`BinlogEventType`] enum maps the type code byte to a named variant.
//!
//! The [`BinlogEvent`] enum wraps the parsed payload for recognized event
//! types (FORMAT_DESCRIPTION, ROTATE, STOP, QUERY, XID) with an `Unknown`
//! fallback for everything else.
11
12use byteorder::{ByteOrder, LittleEndian};
13use serde::Serialize;
14use std::fmt;
15
16use super::constants::*;
17use super::header::{FormatDescriptionEvent, RotateEvent};
18
19/// MySQL binary log event type codes.
20///
21/// Covers all event types from MySQL 5.0 through 9.x (42 named variants).
22/// Unrecognized type codes are preserved in the `Unknown(u8)` variant
23/// for forward compatibility.
24///
25/// # Examples
26///
27/// ```
28/// use idb::binlog::BinlogEventType;
29///
30/// let t = BinlogEventType::from_u8(15);
31/// assert_eq!(t, BinlogEventType::FormatDescription);
32/// assert_eq!(t.name(), "FORMAT_DESCRIPTION");
33///
34/// let u = BinlogEventType::from_u8(200);
35/// assert!(matches!(u, BinlogEventType::Unknown(200)));
36/// ```
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
38pub enum BinlogEventType {
39    /// Unknown or invalid event (type code 0).
40    UnknownEvent,
41    /// Start event v3 (pre-v4 format).
42    StartEventV3,
43    /// SQL query event.
44    QueryEvent,
45    /// Server shutdown.
46    StopEvent,
47    /// Binlog file rotation.
48    RotateEvent,
49    /// Integer session variable.
50    IntvarEvent,
51    /// LOAD DATA INFILE event (deprecated).
52    LoadEvent,
53    /// Slave event (internal replication, deprecated).
54    SlaveEvent,
55    /// Create file for LOAD DATA (deprecated).
56    CreateFileEvent,
57    /// Append block for LOAD DATA (deprecated).
58    AppendBlockEvent,
59    /// Execute LOAD DATA (deprecated).
60    ExecLoadEvent,
61    /// Delete file for LOAD DATA (deprecated).
62    DeleteFileEvent,
63    /// New LOAD DATA INFILE event (deprecated).
64    NewLoadEvent,
65    /// Random seed for RAND().
66    RandEvent,
67    /// User-defined variable.
68    UserVarEvent,
69    /// Format description event (binlog v4 header).
70    FormatDescription,
71    /// XA transaction commit.
72    XidEvent,
73    /// Begin LOAD QUERY event.
74    BeginLoadQueryEvent,
75    /// Execute LOAD QUERY event.
76    ExecuteLoadQueryEvent,
77    /// Table map (row-based replication).
78    TableMapEvent,
79    /// Pre-GA write rows event.
80    PreGaWriteRowsEvent,
81    /// Pre-GA update rows event.
82    PreGaUpdateRowsEvent,
83    /// Pre-GA delete rows event.
84    PreGaDeleteRowsEvent,
85    /// Write rows v1.
86    WriteRowsEventV1,
87    /// Update rows v1.
88    UpdateRowsEventV1,
89    /// Delete rows v1.
90    DeleteRowsEventV1,
91    /// Incident event.
92    IncidentEvent,
93    /// Heartbeat event.
94    HeartbeatEvent,
95    /// Ignorable log event.
96    IgnorableLogEvent,
97    /// Rows query event.
98    RowsQueryEvent,
99    /// Write rows v2.
100    WriteRowsEvent,
101    /// Update rows v2.
102    UpdateRowsEvent,
103    /// Delete rows v2.
104    DeleteRowsEvent,
105    /// GTID event.
106    GtidLogEvent,
107    /// Anonymous GTID event.
108    AnonymousGtidLogEvent,
109    /// Previous GTIDs event.
110    PreviousGtidsLogEvent,
111    /// Transaction context event (Group Replication).
112    TransactionContextEvent,
113    /// View change event (Group Replication).
114    ViewChangeEvent,
115    /// XA prepare log event.
116    XaPrepareLogEvent,
117    /// Partial update rows event (MySQL 8.0+).
118    PartialUpdateRowsEvent,
119    /// Transaction payload event (MySQL 8.0.20+).
120    TransactionPayloadEvent,
121    /// Heartbeat v2 event (MySQL 8.0.26+).
122    HeartbeatEventV2,
123    /// Unrecognized event type code (forward compatibility).
124    Unknown(u8),
125}
126
127impl BinlogEventType {
128    /// Map a raw type code byte to a [`BinlogEventType`] variant.
129    pub fn from_u8(code: u8) -> Self {
130        match code {
131            UNKNOWN_EVENT => Self::UnknownEvent,
132            START_EVENT_V3 => Self::StartEventV3,
133            QUERY_EVENT => Self::QueryEvent,
134            STOP_EVENT => Self::StopEvent,
135            ROTATE_EVENT => Self::RotateEvent,
136            INTVAR_EVENT => Self::IntvarEvent,
137            LOAD_EVENT => Self::LoadEvent,
138            SLAVE_EVENT => Self::SlaveEvent,
139            CREATE_FILE_EVENT => Self::CreateFileEvent,
140            APPEND_BLOCK_EVENT => Self::AppendBlockEvent,
141            EXEC_LOAD_EVENT => Self::ExecLoadEvent,
142            DELETE_FILE_EVENT => Self::DeleteFileEvent,
143            NEW_LOAD_EVENT => Self::NewLoadEvent,
144            RAND_EVENT => Self::RandEvent,
145            USER_VAR_EVENT => Self::UserVarEvent,
146            FORMAT_DESCRIPTION_EVENT => Self::FormatDescription,
147            XID_EVENT => Self::XidEvent,
148            BEGIN_LOAD_QUERY_EVENT => Self::BeginLoadQueryEvent,
149            EXECUTE_LOAD_QUERY_EVENT => Self::ExecuteLoadQueryEvent,
150            TABLE_MAP_EVENT => Self::TableMapEvent,
151            PRE_GA_WRITE_ROWS_EVENT => Self::PreGaWriteRowsEvent,
152            PRE_GA_UPDATE_ROWS_EVENT => Self::PreGaUpdateRowsEvent,
153            PRE_GA_DELETE_ROWS_EVENT => Self::PreGaDeleteRowsEvent,
154            WRITE_ROWS_EVENT_V1 => Self::WriteRowsEventV1,
155            UPDATE_ROWS_EVENT_V1 => Self::UpdateRowsEventV1,
156            DELETE_ROWS_EVENT_V1 => Self::DeleteRowsEventV1,
157            INCIDENT_EVENT => Self::IncidentEvent,
158            HEARTBEAT_LOG_EVENT => Self::HeartbeatEvent,
159            IGNORABLE_LOG_EVENT => Self::IgnorableLogEvent,
160            ROWS_QUERY_LOG_EVENT => Self::RowsQueryEvent,
161            WRITE_ROWS_EVENT => Self::WriteRowsEvent,
162            UPDATE_ROWS_EVENT => Self::UpdateRowsEvent,
163            DELETE_ROWS_EVENT => Self::DeleteRowsEvent,
164            GTID_LOG_EVENT => Self::GtidLogEvent,
165            ANONYMOUS_GTID_LOG_EVENT => Self::AnonymousGtidLogEvent,
166            PREVIOUS_GTIDS_LOG_EVENT => Self::PreviousGtidsLogEvent,
167            TRANSACTION_CONTEXT_EVENT => Self::TransactionContextEvent,
168            VIEW_CHANGE_EVENT => Self::ViewChangeEvent,
169            XA_PREPARE_LOG_EVENT => Self::XaPrepareLogEvent,
170            PARTIAL_UPDATE_ROWS_EVENT => Self::PartialUpdateRowsEvent,
171            TRANSACTION_PAYLOAD_EVENT => Self::TransactionPayloadEvent,
172            HEARTBEAT_LOG_EVENT_V2 => Self::HeartbeatEventV2,
173            other => Self::Unknown(other),
174        }
175    }
176
177    /// Return the raw type code byte.
178    pub fn type_code(&self) -> u8 {
179        match self {
180            Self::UnknownEvent => UNKNOWN_EVENT,
181            Self::StartEventV3 => START_EVENT_V3,
182            Self::QueryEvent => QUERY_EVENT,
183            Self::StopEvent => STOP_EVENT,
184            Self::RotateEvent => ROTATE_EVENT,
185            Self::IntvarEvent => INTVAR_EVENT,
186            Self::LoadEvent => LOAD_EVENT,
187            Self::SlaveEvent => SLAVE_EVENT,
188            Self::CreateFileEvent => CREATE_FILE_EVENT,
189            Self::AppendBlockEvent => APPEND_BLOCK_EVENT,
190            Self::ExecLoadEvent => EXEC_LOAD_EVENT,
191            Self::DeleteFileEvent => DELETE_FILE_EVENT,
192            Self::NewLoadEvent => NEW_LOAD_EVENT,
193            Self::RandEvent => RAND_EVENT,
194            Self::UserVarEvent => USER_VAR_EVENT,
195            Self::FormatDescription => FORMAT_DESCRIPTION_EVENT,
196            Self::XidEvent => XID_EVENT,
197            Self::BeginLoadQueryEvent => BEGIN_LOAD_QUERY_EVENT,
198            Self::ExecuteLoadQueryEvent => EXECUTE_LOAD_QUERY_EVENT,
199            Self::TableMapEvent => TABLE_MAP_EVENT,
200            Self::PreGaWriteRowsEvent => PRE_GA_WRITE_ROWS_EVENT,
201            Self::PreGaUpdateRowsEvent => PRE_GA_UPDATE_ROWS_EVENT,
202            Self::PreGaDeleteRowsEvent => PRE_GA_DELETE_ROWS_EVENT,
203            Self::WriteRowsEventV1 => WRITE_ROWS_EVENT_V1,
204            Self::UpdateRowsEventV1 => UPDATE_ROWS_EVENT_V1,
205            Self::DeleteRowsEventV1 => DELETE_ROWS_EVENT_V1,
206            Self::IncidentEvent => INCIDENT_EVENT,
207            Self::HeartbeatEvent => HEARTBEAT_LOG_EVENT,
208            Self::IgnorableLogEvent => IGNORABLE_LOG_EVENT,
209            Self::RowsQueryEvent => ROWS_QUERY_LOG_EVENT,
210            Self::WriteRowsEvent => WRITE_ROWS_EVENT,
211            Self::UpdateRowsEvent => UPDATE_ROWS_EVENT,
212            Self::DeleteRowsEvent => DELETE_ROWS_EVENT,
213            Self::GtidLogEvent => GTID_LOG_EVENT,
214            Self::AnonymousGtidLogEvent => ANONYMOUS_GTID_LOG_EVENT,
215            Self::PreviousGtidsLogEvent => PREVIOUS_GTIDS_LOG_EVENT,
216            Self::TransactionContextEvent => TRANSACTION_CONTEXT_EVENT,
217            Self::ViewChangeEvent => VIEW_CHANGE_EVENT,
218            Self::XaPrepareLogEvent => XA_PREPARE_LOG_EVENT,
219            Self::PartialUpdateRowsEvent => PARTIAL_UPDATE_ROWS_EVENT,
220            Self::TransactionPayloadEvent => TRANSACTION_PAYLOAD_EVENT,
221            Self::HeartbeatEventV2 => HEARTBEAT_LOG_EVENT_V2,
222            Self::Unknown(c) => *c,
223        }
224    }
225
226    /// Returns the MySQL source-style name for this event type.
227    pub fn name(&self) -> &'static str {
228        match self {
229            Self::UnknownEvent => "UNKNOWN",
230            Self::StartEventV3 => "START_V3",
231            Self::QueryEvent => "QUERY",
232            Self::StopEvent => "STOP",
233            Self::RotateEvent => "ROTATE",
234            Self::IntvarEvent => "INTVAR",
235            Self::LoadEvent => "LOAD",
236            Self::SlaveEvent => "SLAVE",
237            Self::CreateFileEvent => "CREATE_FILE",
238            Self::AppendBlockEvent => "APPEND_BLOCK",
239            Self::ExecLoadEvent => "EXEC_LOAD",
240            Self::DeleteFileEvent => "DELETE_FILE",
241            Self::NewLoadEvent => "NEW_LOAD",
242            Self::RandEvent => "RAND",
243            Self::UserVarEvent => "USER_VAR",
244            Self::FormatDescription => "FORMAT_DESCRIPTION",
245            Self::XidEvent => "XID",
246            Self::BeginLoadQueryEvent => "BEGIN_LOAD_QUERY",
247            Self::ExecuteLoadQueryEvent => "EXECUTE_LOAD_QUERY",
248            Self::TableMapEvent => "TABLE_MAP",
249            Self::PreGaWriteRowsEvent => "PRE_GA_WRITE_ROWS",
250            Self::PreGaUpdateRowsEvent => "PRE_GA_UPDATE_ROWS",
251            Self::PreGaDeleteRowsEvent => "PRE_GA_DELETE_ROWS",
252            Self::WriteRowsEventV1 => "WRITE_ROWS_V1",
253            Self::UpdateRowsEventV1 => "UPDATE_ROWS_V1",
254            Self::DeleteRowsEventV1 => "DELETE_ROWS_V1",
255            Self::IncidentEvent => "INCIDENT",
256            Self::HeartbeatEvent => "HEARTBEAT",
257            Self::IgnorableLogEvent => "IGNORABLE",
258            Self::RowsQueryEvent => "ROWS_QUERY",
259            Self::WriteRowsEvent => "WRITE_ROWS_V2",
260            Self::UpdateRowsEvent => "UPDATE_ROWS_V2",
261            Self::DeleteRowsEvent => "DELETE_ROWS_V2",
262            Self::GtidLogEvent => "GTID",
263            Self::AnonymousGtidLogEvent => "ANONYMOUS_GTID",
264            Self::PreviousGtidsLogEvent => "PREVIOUS_GTIDS",
265            Self::TransactionContextEvent => "TRANSACTION_CONTEXT",
266            Self::ViewChangeEvent => "VIEW_CHANGE",
267            Self::XaPrepareLogEvent => "XA_PREPARE",
268            Self::PartialUpdateRowsEvent => "PARTIAL_UPDATE_ROWS",
269            Self::TransactionPayloadEvent => "TRANSACTION_PAYLOAD",
270            Self::HeartbeatEventV2 => "HEARTBEAT_V2",
271            Self::Unknown(_) => "UNKNOWN",
272        }
273    }
274}
275
276impl fmt::Display for BinlogEventType {
277    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
278        match self {
279            Self::Unknown(c) => write!(f, "UNKNOWN({c})"),
280            _ => write!(f, "{}", self.name()),
281        }
282    }
283}
284
285/// The 19-byte common header present at the start of every binlog event.
286///
287/// All integer fields are stored in little-endian byte order.
288///
289/// # Examples
290///
291/// ```
292/// use idb::binlog::CommonEventHeader;
293/// use byteorder::{LittleEndian, ByteOrder};
294///
295/// let mut data = vec![0u8; 19];
296/// LittleEndian::write_u32(&mut data[0..], 1_700_000_000);  // timestamp
297/// data[4] = 15;                                             // FORMAT_DESCRIPTION_EVENT
298/// LittleEndian::write_u32(&mut data[5..], 1);               // server_id
299/// LittleEndian::write_u32(&mut data[9..], 119);             // event_length
300/// LittleEndian::write_u32(&mut data[13..], 123);            // next_position
301/// LittleEndian::write_u16(&mut data[17..], 0);              // flags
302///
303/// let hdr = CommonEventHeader::parse(&data).unwrap();
304/// assert_eq!(hdr.timestamp, 1_700_000_000);
305/// assert_eq!(hdr.event_length, 119);
306/// ```
307#[derive(Debug, Clone, Serialize)]
308pub struct CommonEventHeader {
309    /// Seconds since Unix epoch when the event was created.
310    pub timestamp: u32,
311    /// Event type.
312    pub type_code: BinlogEventType,
313    /// Server ID of the originating MySQL server.
314    pub server_id: u32,
315    /// Total event size in bytes (header + payload + optional checksum).
316    pub event_length: u32,
317    /// Absolute file position of the next event.
318    pub next_position: u32,
319    /// Event flags (e.g. `LOG_EVENT_BINLOG_IN_USE_F`).
320    pub flags: u16,
321}
322
323impl CommonEventHeader {
324    /// Parse a common event header from a byte slice.
325    ///
326    /// Returns `None` if the slice is shorter than [`COMMON_HEADER_SIZE`] (19 bytes).
327    pub fn parse(data: &[u8]) -> Option<Self> {
328        if data.len() < COMMON_HEADER_SIZE {
329            return None;
330        }
331
332        Some(Self {
333            timestamp: LittleEndian::read_u32(&data[EVENT_TIMESTAMP_OFFSET..]),
334            type_code: BinlogEventType::from_u8(data[EVENT_TYPE_OFFSET]),
335            server_id: LittleEndian::read_u32(&data[EVENT_SERVER_ID_OFFSET..]),
336            event_length: LittleEndian::read_u32(&data[EVENT_LENGTH_OFFSET..]),
337            next_position: LittleEndian::read_u32(&data[EVENT_NEXT_POSITION_OFFSET..]),
338            flags: LittleEndian::read_u16(&data[EVENT_FLAGS_OFFSET..]),
339        })
340    }
341
342    /// Byte offset where the event payload begins (always 19).
343    pub fn payload_offset(&self) -> usize {
344        COMMON_HEADER_SIZE
345    }
346
347    /// Size of the event payload in bytes, excluding header and optional checksum.
348    pub fn payload_length(&self, checksum_enabled: bool) -> usize {
349        let total = self.event_length as usize;
350        let overhead = COMMON_HEADER_SIZE
351            + if checksum_enabled {
352                BINLOG_CHECKSUM_LEN
353            } else {
354                0
355            };
356        total.saturating_sub(overhead)
357    }
358}
359
360/// Parsed binlog event payload.
361///
362/// Wraps the typed payload for recognized event types. Events not yet
363/// supported by full parsing are stored as `Unknown` with their raw bytes.
364#[derive(Debug, Clone, Serialize)]
365#[serde(tag = "type")]
366pub enum BinlogEvent {
367    /// FORMAT_DESCRIPTION_EVENT — always the first event after the magic bytes.
368    FormatDescription(FormatDescriptionEvent),
369    /// ROTATE_EVENT — signals rotation to the next binlog file.
370    Rotate(RotateEvent),
371    /// STOP_EVENT — server shutdown, no payload.
372    Stop,
373    /// QUERY_EVENT — SQL query (full parsing deferred to #135).
374    Query {
375        /// Raw payload bytes (query text + metadata).
376        #[serde(skip)]
377        payload: Vec<u8>,
378    },
379    /// XID_EVENT — transaction commit with XA transaction ID.
380    Xid {
381        /// The XID value.
382        xid: u64,
383    },
384    /// Unrecognized or not-yet-parsed event type.
385    Unknown {
386        /// Raw type code.
387        type_code: u8,
388        /// Raw payload bytes.
389        #[serde(skip)]
390        payload: Vec<u8>,
391    },
392}
393
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a 19-byte common header with every field written little-endian
    /// in wire order.
    fn header_bytes(
        timestamp: u32,
        type_code: u8,
        server_id: u32,
        event_length: u32,
        next_position: u32,
        flags: u16,
    ) -> Vec<u8> {
        let mut raw = Vec::with_capacity(19);
        raw.extend_from_slice(&timestamp.to_le_bytes());
        raw.push(type_code);
        raw.extend_from_slice(&server_id.to_le_bytes());
        raw.extend_from_slice(&event_length.to_le_bytes());
        raw.extend_from_slice(&next_position.to_le_bytes());
        raw.extend_from_slice(&flags.to_le_bytes());
        raw
    }

    #[test]
    fn event_type_roundtrip() {
        // Every named code maps back to the byte it came from.
        for code in 0u8..=41 {
            assert_eq!(BinlogEventType::from_u8(code).type_code(), code);
        }

        // A code without a named variant is carried through untouched.
        let unknown = BinlogEventType::from_u8(200);
        assert!(matches!(unknown, BinlogEventType::Unknown(200)));
        assert_eq!(unknown.type_code(), 200);
    }

    #[test]
    fn event_type_display() {
        let rendered = [
            (BinlogEventType::FormatDescription, "FORMAT_DESCRIPTION"),
            (BinlogEventType::StopEvent, "STOP"),
            (BinlogEventType::Unknown(99), "UNKNOWN(99)"),
        ];
        for (event_type, expected) in rendered {
            assert_eq!(event_type.to_string(), expected);
        }
    }

    #[test]
    fn event_type_name() {
        let named = [
            (BinlogEventType::QueryEvent, "QUERY"),
            (BinlogEventType::TableMapEvent, "TABLE_MAP"),
            (BinlogEventType::WriteRowsEvent, "WRITE_ROWS_V2"),
            (BinlogEventType::GtidLogEvent, "GTID"),
            (BinlogEventType::HeartbeatEventV2, "HEARTBEAT_V2"),
        ];
        for (event_type, expected) in named {
            assert_eq!(event_type.name(), expected);
        }
    }

    #[test]
    fn parse_common_header() {
        let raw = header_bytes(1_700_000_000, FORMAT_DESCRIPTION_EVENT, 42, 119, 123, 0x0001);

        let hdr = CommonEventHeader::parse(&raw).unwrap();
        assert_eq!(hdr.timestamp, 1_700_000_000);
        assert_eq!(hdr.type_code, BinlogEventType::FormatDescription);
        assert_eq!(hdr.server_id, 42);
        assert_eq!(hdr.event_length, 119);
        assert_eq!(hdr.next_position, 123);
        assert_eq!(hdr.flags, 0x0001);
    }

    #[test]
    fn parse_common_header_too_short() {
        // One byte short of a full header.
        assert!(CommonEventHeader::parse(&[0u8; 18]).is_none());
    }

    #[test]
    fn payload_length_with_checksum() {
        // Only event_length (100) is set; every other field is zero.
        let raw = header_bytes(0, 0, 0, 100, 0, 0);
        let hdr = CommonEventHeader::parse(&raw).unwrap();

        // Without checksum: 100 - 19 = 81
        assert_eq!(hdr.payload_length(false), 81);
        // With checksum: 100 - 19 - 4 = 77
        assert_eq!(hdr.payload_length(true), 77);
    }
}