mysqlbinlog_network/mysql_binlog/
event.rs

1use std::fmt;
2use std::io::{self, Cursor, ErrorKind, Read, Seek};
3
4use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
5use serde_derive::Serialize;
6use uuid::Uuid;
7
8use crate::mysql_binlog::bit_set::BitSet;
9use crate::mysql_binlog::column_types::ColumnType;
10use crate::mysql_binlog::errors::EventParseError::EofError;
11use crate::mysql_binlog::errors::{ColumnParseError, EventParseError};
12use crate::mysql_binlog::packet_helpers::*;
13use crate::mysql_binlog::table_map::{SingleTableMap, TableMap};
14use crate::mysql_binlog::tell::Tell;
15use crate::mysql_binlog::value::MySQLValue;
16
/// Type of a binlog event, decoded from the one-byte type code of an event header.
///
/// Variants are declared in the order of their numeric codes as mapped by
/// `TypeCode::from_byte`: `Unknown` is 0 and `PreviousGtidsLogEvent` is 35.
/// Serialized names use SCREAMING_SNAKE_CASE (see the `serde` attribute).
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum TypeCode {
    Unknown,
    StartEventV3,
    QueryEvent,
    StopEvent,
    RotateEvent,
    IntvarEvent,
    LoadEvent,
    SlaveEvent,
    CreateFileEvent,
    AppendBlockEvent,
    ExecLoadEvent,
    DeleteFileEvent,
    NewLoadEvent,
    RandEvent,
    UserVarEvent,
    FormatDescriptionEvent,
    XidEvent,
    BeginLoadQueryEvent,
    ExecuteLoadQueryEvent,
    TableMapEvent,
    PreGaWriteRowsEvent,
    PreGaUpdateRowsEvent,
    PreGaDeleteRowsEvent,
    WriteRowsEventV1,
    UpdateRowsEventV1,
    DeleteRowsEventV1,
    IncidentEvent,
    HeartbeatLogEvent,
    IgnorableLogEvent,
    RowsQueryLogEvent,
    WriteRowsEventV2,
    UpdateRowsEventV2,
    DeleteRowsEventV2,
    GtidLogEvent,
    AnonymousGtidLogEvent,
    PreviousGtidsLogEvent,
    /// Any type code without a named variant (a byte above 35); holds the raw byte.
    OtherUnknown(u8),
}
58
59impl TypeCode {
60    pub fn from_byte(b: u8) -> Self {
61        match b {
62            0 => TypeCode::Unknown,
63            1 => TypeCode::StartEventV3,
64            2 => TypeCode::QueryEvent,
65            3 => TypeCode::StopEvent,
66            4 => TypeCode::RotateEvent,
67            5 => TypeCode::IntvarEvent,
68            6 => TypeCode::LoadEvent,
69            7 => TypeCode::SlaveEvent,
70            8 => TypeCode::CreateFileEvent,
71            9 => TypeCode::AppendBlockEvent,
72            10 => TypeCode::ExecLoadEvent,
73            11 => TypeCode::DeleteFileEvent,
74            12 => TypeCode::NewLoadEvent,
75            13 => TypeCode::RandEvent,
76            14 => TypeCode::UserVarEvent,
77            15 => TypeCode::FormatDescriptionEvent,
78            16 => TypeCode::XidEvent,
79            17 => TypeCode::BeginLoadQueryEvent,
80            18 => TypeCode::ExecuteLoadQueryEvent,
81            19 => TypeCode::TableMapEvent,
82            20 => TypeCode::PreGaWriteRowsEvent,
83            21 => TypeCode::PreGaUpdateRowsEvent,
84            22 => TypeCode::PreGaDeleteRowsEvent,
85            23 => TypeCode::WriteRowsEventV1,
86            24 => TypeCode::UpdateRowsEventV1,
87            25 => TypeCode::DeleteRowsEventV1,
88            26 => TypeCode::IncidentEvent,
89            27 => TypeCode::HeartbeatLogEvent,
90            28 => TypeCode::IgnorableLogEvent,
91            29 => TypeCode::RowsQueryLogEvent,
92            30 => TypeCode::WriteRowsEventV2,
93            31 => TypeCode::UpdateRowsEventV2,
94            32 => TypeCode::DeleteRowsEventV2,
95            33 => TypeCode::GtidLogEvent,
96            34 => TypeCode::AnonymousGtidLogEvent,
97            35 => TypeCode::PreviousGtidsLogEvent,
98            i => TypeCode::OtherUnknown(i),
99        }
100    }
101}
102
/// Checksum algorithm byte decoded from a format description event.
///
/// See the `From<u8>` impl for the byte mapping.
#[derive(Debug, Serialize)]
pub enum ChecksumAlgorithm {
    /// Byte `0x00`.
    None,
    /// Byte `0x01`.
    CRC32,
    /// Any other byte, preserved verbatim.
    Other(u8),
}
109
110impl From<u8> for ChecksumAlgorithm {
111    fn from(byte: u8) -> Self {
112        match byte {
113            0x00 => ChecksumAlgorithm::None,
114            0x01 => ChecksumAlgorithm::CRC32,
115            other => ChecksumAlgorithm::Other(other),
116        }
117    }
118}
119
/// One decoded row image, with one entry per column of the table map.
///
/// As built by `parse_one_row`: `None` means the column is absent from the
/// event's column bitmask, while a present SQL NULL is `Some(MySQLValue::Null)`.
pub type RowData = Vec<Option<MySQLValue>>;
121
/// Decoded payload of a binlog event, as produced by `EventData::parse_header`
/// and `EventData::from_data`.
#[derive(Debug)]
pub enum EventData {
    /// The fixed-size common header; fields are read little-endian in this order.
    EventHeader {
        timestamp: u32,
        event_type: TypeCode,
        server_id: u32,
        event_size: u32,
        log_pos: u32,
        flags: u16,
    },
    /// Body of an `XidEvent`: a single little-endian `u64`.
    XIDEvent {
        xid: u64,
    },
    /// Body of a `RotateEvent`.
    RotateEvent {
        /// First eight bytes of the body (little-endian).
        pos: u64,
        /// Remainder of the body after the position, decoded as UTF-8.
        next_log_name: String,
    },
    /// Body of a `GtidLogEvent`.
    GtidLogEvent {
        flags: u8,
        /// Built from the 16 bytes following `flags`.
        uuid: Uuid,
        /// Little-endian `u64` following the UUID.
        coordinate: u64,
        /// Only populated when the byte following `coordinate` is `0x02`.
        last_committed: Option<u64>,
        /// Only populated when the byte following `coordinate` is `0x02`.
        sequence_number: Option<u64>,
    },
    /// Body of a `QueryEvent`.
    QueryEvent {
        thread_id: u32,
        exec_time: u32,
        error_code: i16,
        /// Schema name, decoded lossily from UTF-8.
        schema: String,
        /// Everything after the schema name and its one-byte terminator.
        query: String,
    },
    /// Body of a `FormatDescriptionEvent`.
    FormatDescriptionEvent {
        /// Only version 4 is accepted by the parser.
        binlog_version: u16,
        /// Taken from a 50-byte field, cut at the first NUL byte.
        server_version: String,
        create_timestamp: u32,
        common_header_len: u8,
        checksum_algorithm: ChecksumAlgorithm,
    },
    /// Body of a `TableMapEvent`.
    TableMapEvent {
        /// Read from a six-byte little-endian field.
        table_id: u64,
        schema_name: String,
        table_name: String,
        /// Column types after their per-column metadata has been read.
        columns: Vec<ColumnType>,
        /// One bit per column, read after the column metadata.
        null_bitmap: BitSet,
    },
    /// Rows of a V1 or V2 write-rows event; `rows` is empty when no table map
    /// entry was available for `table_id`.
    WriteRowsEvent {
        table_id: u64,
        rows: Vec<RowEvent>,
    },
    /// Rows of a V1 or V2 update-rows event; `rows` is empty when no table map
    /// entry was available for `table_id`.
    UpdateRowsEvent {
        table_id: u64,
        rows: Vec<RowEvent>,
    },
    /// Rows of a V1 or V2 delete-rows event; `rows` is empty when no table map
    /// entry was available for `table_id`.
    DeleteRowsEvent {
        table_id: u64,
        rows: Vec<RowEvent>,
    },
}
180
/// Intermediate result of `parse_rows_event`, before `EventData::from_data`
/// wraps it in the write/update/delete variant matching the type code.
struct RowsEvent {
    table_id: u64,
    rows: Vec<RowEvent>,
}
185
186fn parse_one_row<R: Read + Seek>(
187    mut cursor: &mut R,
188    this_table_map: &SingleTableMap,
189    present_bitmask: &BitSet,
190) -> Result<RowData, ColumnParseError> {
191    let num_set_columns = present_bitmask.bits_set();
192    let null_bitmask_size = (num_set_columns + 7) >> 3;
193    let mut row = Vec::with_capacity(this_table_map.columns.len());
194    let null_bitmask = BitSet::from_slice(
195        num_set_columns,
196        &read_nbytes(&mut cursor, null_bitmask_size)?,
197    )
198    .unwrap();
199    let mut null_index = 0;
200    for (i, column_definition) in this_table_map.columns.iter().enumerate() {
201        if !present_bitmask.is_set(i) {
202            row.push(None);
203            continue;
204        }
205        let is_null = null_bitmask.is_set(null_index);
206        let val = if is_null {
207            MySQLValue::Null
208        } else {
209            //println!("parsing column {} ({:?})", i, column_definition);
210            column_definition.read_value(&mut cursor)?
211        };
212        row.push(Some(val));
213        null_index += 1;
214    }
215    //println!("finished row: {:?}", row);
216    Ok(row)
217}
218
/// A single row change carried by a rows event.
///
/// Serialized untagged, so only the field names distinguish the variants.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum RowEvent {
    /// Row produced from a write-rows event.
    NewRow {
        cols: RowData,
    },
    /// Row produced from a delete-rows event.
    DeletedRow {
        cols: RowData,
    },
    /// Row produced from an update-rows event: the image before and after the change.
    UpdatedRow {
        before_cols: RowData,
        after_cols: RowData,
    },
}
233
234impl RowEvent {
235    pub fn cols(&self) -> Option<&RowData> {
236        match self {
237            RowEvent::NewRow { cols } => Some(cols),
238            RowEvent::DeletedRow { cols } => Some(cols),
239            RowEvent::UpdatedRow { .. } => None,
240        }
241    }
242}
243
244fn parse_rows_event<R: Read + Seek>(
245    type_code: TypeCode,
246    data_len: usize,
247    mut cursor: &mut R,
248    table_map: Option<&TableMap>,
249) -> Result<RowsEvent, ColumnParseError> {
250    let mut table_id_buf = [0u8; 8];
251    cursor.read_exact(&mut table_id_buf[0..6])?;
252    let table_id = LittleEndian::read_u64(&table_id_buf);
253    // two-byte reserved value
254    cursor.seek(io::SeekFrom::Current(2))?;
255    match type_code {
256        TypeCode::WriteRowsEventV2 | TypeCode::UpdateRowsEventV2 | TypeCode::DeleteRowsEventV2 => {
257            let _ = cursor.read_i16::<LittleEndian>()?;
258        }
259        _ => {}
260    }
261    let num_columns = read_variable_length_integer(&mut cursor)? as usize;
262    let bitmask_size = (num_columns + 7) >> 3;
263    let before_column_bitmask =
264        BitSet::from_slice(num_columns, &read_nbytes(&mut cursor, bitmask_size)?).unwrap();
265    let after_column_bitmask = match type_code {
266        TypeCode::UpdateRowsEventV1 | TypeCode::UpdateRowsEventV2 => {
267            Some(BitSet::from_slice(num_columns, &read_nbytes(&mut cursor, bitmask_size)?).unwrap())
268        }
269        _ => None,
270    };
271    let mut rows = Vec::with_capacity(1);
272    if let Some(table_map) = table_map {
273        if let Some(this_table_map) = table_map.get(table_id) {
274            loop {
275                let pos = cursor.tell()? as usize;
276                if data_len - pos < 1 {
277                    break;
278                }
279                match type_code {
280                    TypeCode::WriteRowsEventV1 | TypeCode::WriteRowsEventV2 => {
281                        rows.push(RowEvent::NewRow {
282                            cols: parse_one_row(
283                                &mut cursor,
284                                this_table_map,
285                                &before_column_bitmask,
286                            )?,
287                        });
288                    }
289                    TypeCode::UpdateRowsEventV1 | TypeCode::UpdateRowsEventV2 => {
290                        rows.push(RowEvent::UpdatedRow {
291                            before_cols: parse_one_row(
292                                &mut cursor,
293                                this_table_map,
294                                &before_column_bitmask,
295                            )?,
296                            after_cols: parse_one_row(
297                                &mut cursor,
298                                this_table_map,
299                                after_column_bitmask.as_ref().unwrap(),
300                            )?,
301                        })
302                    }
303                    TypeCode::DeleteRowsEventV1 | TypeCode::DeleteRowsEventV2 => {
304                        rows.push(RowEvent::DeletedRow {
305                            cols: parse_one_row(
306                                &mut cursor,
307                                this_table_map,
308                                &before_column_bitmask,
309                            )?,
310                        });
311                    }
312                    _ => unimplemented!(),
313                }
314            }
315        }
316    }
317    Ok(RowsEvent { table_id, rows })
318}
/// Size in bytes of the common event header; `EventData::parse_header` rejects
/// input shorter than this.
pub const EVENT_HEADER_SIZE: usize = 19;
/// Length in bytes of a binlog checksum — presumably the per-event trailer that
/// `Event::read` excludes from the body; TODO confirm against callers.
pub const BINLOG_CHECKSUM_LENGTH: usize = 4;
321impl EventData {
322    pub fn parse_header(data: &[u8]) -> Result<Option<EventData>, EventParseError> {
323        if data.len() < EVENT_HEADER_SIZE {
324            return Err(EofError);
325        }
326        let mut cursor = Cursor::new(data);
327        Ok(Some(EventData::EventHeader {
328            timestamp: cursor.read_u32::<LittleEndian>()?,
329            event_type: TypeCode::from_byte(cursor.read_u8()?),
330            server_id: cursor.read_u32::<LittleEndian>()?,
331            event_size: cursor.read_u32::<LittleEndian>()?,
332            log_pos: cursor.read_u32::<LittleEndian>()?,
333            flags: cursor.read_u16::<LittleEndian>()?,
334        }))
335    }
336    pub fn from_data(
337        type_code: TypeCode,
338        data: &[u8],
339        table_map: Option<&TableMap>,
340    ) -> Result<Option<Self>, EventParseError> {
341        let mut cursor = Cursor::new(data);
342        match type_code {
343            TypeCode::XidEvent => Ok(Some(EventData::XIDEvent {
344                xid: cursor.read_u64::<LittleEndian>()?,
345            })),
346            TypeCode::RotateEvent => {
347                let log_name = match String::from_utf8(Vec::from(&data[8..])) {
348                    Ok(d) => d,
349                    Err(_e) => return Err(EofError),
350                };
351                Ok(Some(EventData::RotateEvent {
352                    pos: cursor.read_u64::<LittleEndian>()?,
353                    next_log_name: log_name,
354                }))
355            }
356            TypeCode::FormatDescriptionEvent => {
357                let binlog_version = cursor.read_u16::<LittleEndian>()?;
358                if binlog_version != 4 {
359                    unimplemented!("can only parse a version 4 binary log");
360                }
361                let mut server_version_buf = [0u8; 50];
362                cursor.read_exact(&mut server_version_buf)?;
363                let server_version = ::std::str::from_utf8(
364                    server_version_buf
365                        .split(|c| *c == 0x00)
366                        .next()
367                        .unwrap_or(&[]),
368                )
369                .unwrap()
370                .to_owned();
371                let create_timestamp = cursor.read_u32::<LittleEndian>()?;
372                let common_header_len = cursor.read_u8()?;
373                let event_types = data.len() - 2 - 50 - 4 - 1 - 5;
374                let mut event_sizes_tables = vec![0u8; event_types];
375                cursor.read_exact(&mut event_sizes_tables)?;
376                let checksum_algo = ChecksumAlgorithm::from(cursor.read_u8()?);
377                let mut checksum_buf = [0u8; 4];
378                cursor.read_exact(&mut checksum_buf)?;
379                Ok(Some(EventData::FormatDescriptionEvent {
380                    binlog_version,
381                    server_version,
382                    create_timestamp,
383                    common_header_len,
384                    checksum_algorithm: checksum_algo,
385                }))
386            }
387            TypeCode::GtidLogEvent => {
388                let flags = cursor.read_u8()?;
389                let mut uuid_buf = [0u8; 16];
390                cursor.read_exact(&mut uuid_buf)?;
391                let uuid = Uuid::from_slice(&uuid_buf)?;
392                let offset = cursor.read_u64::<LittleEndian>()?;
393                let (last_committed, sequence_number) = match cursor.read_u8() {
394                    Ok(0x02) => {
395                        let last_committed = cursor.read_u64::<LittleEndian>()?;
396                        let sequence_number = cursor.read_u64::<LittleEndian>()?;
397                        (Some(last_committed), Some(sequence_number))
398                    }
399                    _ => (None, None),
400                };
401                Ok(Some(EventData::GtidLogEvent {
402                    flags,
403                    uuid,
404                    coordinate: offset,
405                    last_committed,
406                    sequence_number,
407                }))
408            }
409            TypeCode::QueryEvent => {
410                let thread_id = cursor.read_u32::<LittleEndian>()?;
411                let execution_time = cursor.read_u32::<LittleEndian>()?;
412                let schema_len = cursor.read_u8()?;
413                let error_code = cursor.read_i16::<LittleEndian>()?;
414                let _status_vars = read_two_byte_length_prefixed_bytes(&mut cursor)?;
415                let schema =
416                    String::from_utf8_lossy(&read_nbytes(&mut cursor, schema_len)?).into_owned();
417                cursor.seek(io::SeekFrom::Current(1))?;
418                let mut statement = String::new();
419                cursor.read_to_string(&mut statement)?;
420                Ok(Some(EventData::QueryEvent {
421                    thread_id,
422                    exec_time: execution_time,
423                    error_code,
424                    schema,
425                    query: statement,
426                }))
427            }
428            TypeCode::TableMapEvent => {
429                let mut table_id_buf = [0u8; 8];
430                cursor.read_exact(&mut table_id_buf[0..6])?;
431                let table_id = LittleEndian::read_u64(&table_id_buf);
432                // two-byte reserved value
433                cursor.seek(io::SeekFrom::Current(2))?;
434                let schema_name = read_one_byte_length_prefixed_string(&mut cursor)?;
435                // nul byte
436                cursor.seek(io::SeekFrom::Current(1))?;
437                let table_name = read_one_byte_length_prefixed_string(&mut cursor)?;
438                // nul byte
439                cursor.seek(io::SeekFrom::Current(1))?;
440                //println!("parsing table map for {}.{}", schema_name, table_name);
441                let column_count = read_variable_length_integer(&mut cursor)? as usize;
442                let mut columns = Vec::with_capacity(column_count);
443                for _ in 0..column_count {
444                    let column_type = ColumnType::from_byte(cursor.read_u8()?);
445                    columns.push(column_type);
446                }
447                //let pos = cursor.tell()? as usize;
448                //println!("column types: {:?}", columns);
449                //println!("top of metadata: remaining table map data: {:?}", &data[pos..]);
450                let _metadata_length = read_variable_length_integer(&mut cursor)? as usize;
451                let final_columns = columns
452                    .into_iter()
453                    .map(|c| c.read_metadata(&mut cursor))
454                    .collect::<Result<Vec<_>, _>>()?;
455                //println!("finished decoding metadata; columns: {:?}", final_columns);
456                //let end_of_map_pos = cursor.seek(io::SeekFrom::Current(0))? as usize;
457                let num_columns = final_columns.len();
458                let null_bitmask_size = (num_columns + 7) >> 3;
459                let null_bitmap_source = read_nbytes(&mut cursor, null_bitmask_size)?;
460                let nullable_bitmap = BitSet::from_slice(num_columns, &null_bitmap_source).unwrap();
461                Ok(Some(EventData::TableMapEvent {
462                    table_id,
463                    schema_name,
464                    table_name,
465                    columns: final_columns,
466                    null_bitmap: nullable_bitmap,
467                }))
468            }
469            TypeCode::WriteRowsEventV1 | TypeCode::WriteRowsEventV2 => {
470                let ev = parse_rows_event(type_code, data.len(), &mut cursor, table_map)?;
471                Ok(Some(EventData::WriteRowsEvent {
472                    table_id: ev.table_id,
473                    rows: ev.rows,
474                }))
475            }
476            TypeCode::UpdateRowsEventV1 | TypeCode::UpdateRowsEventV2 => {
477                let ev = parse_rows_event(type_code, data.len(), &mut cursor, table_map)?;
478                Ok(Some(EventData::UpdateRowsEvent {
479                    table_id: ev.table_id,
480                    rows: ev.rows,
481                }))
482            }
483            TypeCode::DeleteRowsEventV1 | TypeCode::DeleteRowsEventV2 => {
484                let ev = parse_rows_event(type_code, data.len(), &mut cursor, table_map)?;
485                Ok(Some(EventData::DeleteRowsEvent {
486                    table_id: ev.table_id,
487                    rows: ev.rows,
488                }))
489            }
490            _ => Ok(None),
491        }
492    }
493}
494
/// A raw binlog event: the decoded common header fields plus the undecoded body.
///
/// Built by `Event::read`; the body is decoded on demand by `Event::inner`.
pub struct Event {
    timestamp: u32,
    type_code: TypeCode,
    server_id: u32,
    // Total event length as stated in the header.
    event_length: u32,
    next_position: u32,
    flags: u16,
    // Event body; see `Event::read` for how its length is derived.
    data: Vec<u8>,
    // Caller-supplied offset passed to `Event::read`.
    offset: u64,
}
505
506impl fmt::Debug for Event {
507    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
508        write!(f, "Event {{ timestamp: {:?}, type_code: {:?}, server_id: {:?}, data_len: {:?}, offset: {:?} }}", self.timestamp, self.type_code, self.server_id, self.data.len(), self.offset)
509    }
510}
511
// When true, `Event::read` subtracts four trailing bytes from the stated event
// length before reading the body, so the body excludes them.
// TODO: determine this by examining the server version
const HAS_CHECKSUM: bool = true;
514
515impl Event {
516    pub fn read<R: Read>(reader: &mut R, offset: u64) -> Result<Self, EventParseError> {
517        let mut header = [0u8; 19];
518        match reader.read_exact(&mut header) {
519            Ok(_) => {}
520            Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
521                return Err(EventParseError::EofError.into())
522            }
523            Err(e) => return Err(e.into()),
524        }
525        let mut c = Cursor::new(header);
526        let timestamp = c.read_u32::<LittleEndian>()?;
527        let type_code = TypeCode::from_byte(c.read_u8()?);
528        let server_id = c.read_u32::<LittleEndian>()?;
529        let event_length = c.read_u32::<LittleEndian>()?;
530        let next_position = c.read_u32::<LittleEndian>()?;
531        let flags = c.read_u16::<LittleEndian>()?;
532        let mut data_length: usize = (event_length - 19) as usize;
533        if HAS_CHECKSUM {
534            data_length -= 4;
535        }
536        //println!("finished reading event header with type_code {:?} event_length {} and next_position {}", type_code, event_length, next_position);
537        let mut data = vec![0u8; data_length];
538        reader.read_exact(&mut data)?;
539        //println!("finished reading body");
540        Ok(Event {
541            timestamp,
542            type_code,
543            server_id,
544            event_length,
545            next_position,
546            flags,
547            data,
548            offset,
549        })
550    }
551
552    pub fn type_code(&self) -> TypeCode {
553        self.type_code
554    }
555
556    pub fn timestamp(&self) -> u32 {
557        self.timestamp
558    }
559
560    pub fn next_position(&self) -> u64 {
561        u64::from(self.next_position)
562    }
563
564    pub fn inner(
565        &self,
566        table_map: Option<&TableMap>,
567    ) -> Result<Option<EventData>, EventParseError> {
568        EventData::from_data(self.type_code, &self.data, table_map).map_err(Into::into)
569    }
570
571    pub fn data(&self) -> &Vec<u8> {
572        &self.data
573    }
574
575    pub fn flags(&self) -> u16 {
576        self.flags
577    }
578
579    pub fn event_length(&self) -> u32 {
580        self.event_length
581    }
582
583    pub fn offset(&self) -> u64 {
584        self.offset
585    }
586}