1use std::fmt;
2use std::io::{self, Cursor, ErrorKind, Read, Seek};
3
4use byteorder::{ByteOrder, LittleEndian, ReadBytesExt};
5use serde_derive::Serialize;
6use uuid::Uuid;
7
8use crate::mysql_binlog::bit_set::BitSet;
9use crate::mysql_binlog::column_types::ColumnType;
10use crate::mysql_binlog::errors::EventParseError::EofError;
11use crate::mysql_binlog::errors::{ColumnParseError, EventParseError};
12use crate::mysql_binlog::packet_helpers::*;
13use crate::mysql_binlog::table_map::{SingleTableMap, TableMap};
14use crate::mysql_binlog::tell::Tell;
15use crate::mysql_binlog::value::MySQLValue;
16
/// Binlog event type, decoded from the one-byte type code in the common event header.
///
/// Variants are declared in wire-code order: `Unknown` is code 0 and
/// `PreviousGtidsLogEvent` is code 35 (see `TypeCode::from_byte`). Any other byte
/// is preserved in `OtherUnknown`. Serialized with SCREAMING_SNAKE_CASE variant names.
#[derive(Debug, PartialEq, Eq, Clone, Copy, Serialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum TypeCode {
    Unknown,
    StartEventV3,
    QueryEvent,
    StopEvent,
    RotateEvent,
    IntvarEvent,
    LoadEvent,
    SlaveEvent,
    CreateFileEvent,
    AppendBlockEvent,
    ExecLoadEvent,
    DeleteFileEvent,
    NewLoadEvent,
    RandEvent,
    UserVarEvent,
    FormatDescriptionEvent,
    XidEvent,
    BeginLoadQueryEvent,
    ExecuteLoadQueryEvent,
    TableMapEvent,
    PreGaWriteRowsEvent,
    PreGaUpdateRowsEvent,
    PreGaDeleteRowsEvent,
    WriteRowsEventV1,
    UpdateRowsEventV1,
    DeleteRowsEventV1,
    IncidentEvent,
    HeartbeatLogEvent,
    IgnorableLogEvent,
    RowsQueryLogEvent,
    WriteRowsEventV2,
    UpdateRowsEventV2,
    DeleteRowsEventV2,
    GtidLogEvent,
    AnonymousGtidLogEvent,
    PreviousGtidsLogEvent,
    /// A type code outside 0..=35; the raw byte is kept.
    OtherUnknown(u8),
}
58
59impl TypeCode {
60 pub fn from_byte(b: u8) -> Self {
61 match b {
62 0 => TypeCode::Unknown,
63 1 => TypeCode::StartEventV3,
64 2 => TypeCode::QueryEvent,
65 3 => TypeCode::StopEvent,
66 4 => TypeCode::RotateEvent,
67 5 => TypeCode::IntvarEvent,
68 6 => TypeCode::LoadEvent,
69 7 => TypeCode::SlaveEvent,
70 8 => TypeCode::CreateFileEvent,
71 9 => TypeCode::AppendBlockEvent,
72 10 => TypeCode::ExecLoadEvent,
73 11 => TypeCode::DeleteFileEvent,
74 12 => TypeCode::NewLoadEvent,
75 13 => TypeCode::RandEvent,
76 14 => TypeCode::UserVarEvent,
77 15 => TypeCode::FormatDescriptionEvent,
78 16 => TypeCode::XidEvent,
79 17 => TypeCode::BeginLoadQueryEvent,
80 18 => TypeCode::ExecuteLoadQueryEvent,
81 19 => TypeCode::TableMapEvent,
82 20 => TypeCode::PreGaWriteRowsEvent,
83 21 => TypeCode::PreGaUpdateRowsEvent,
84 22 => TypeCode::PreGaDeleteRowsEvent,
85 23 => TypeCode::WriteRowsEventV1,
86 24 => TypeCode::UpdateRowsEventV1,
87 25 => TypeCode::DeleteRowsEventV1,
88 26 => TypeCode::IncidentEvent,
89 27 => TypeCode::HeartbeatLogEvent,
90 28 => TypeCode::IgnorableLogEvent,
91 29 => TypeCode::RowsQueryLogEvent,
92 30 => TypeCode::WriteRowsEventV2,
93 31 => TypeCode::UpdateRowsEventV2,
94 32 => TypeCode::DeleteRowsEventV2,
95 33 => TypeCode::GtidLogEvent,
96 34 => TypeCode::AnonymousGtidLogEvent,
97 35 => TypeCode::PreviousGtidsLogEvent,
98 i => TypeCode::OtherUnknown(i),
99 }
100 }
101}
102
/// Checksum algorithm byte carried by a format description event.
///
/// Decoded by `From<u8>`: 0x00 is `None`, 0x01 is `CRC32`, and any other byte is
/// kept in `Other`.
#[derive(Debug, Serialize)]
pub enum ChecksumAlgorithm {
    None,
    CRC32,
    /// Unrecognized algorithm byte, kept verbatim.
    Other(u8),
}
109
110impl From<u8> for ChecksumAlgorithm {
111 fn from(byte: u8) -> Self {
112 match byte {
113 0x00 => ChecksumAlgorithm::None,
114 0x01 => ChecksumAlgorithm::CRC32,
115 other => ChecksumAlgorithm::Other(other),
116 }
117 }
118}
119
/// One decoded row image with one entry per table column. `parse_one_row` stores
/// `None` for a column absent from the image, and `Some(value)` (possibly
/// `MySQLValue::Null`) for a present column.
pub type RowData = Vec<Option<MySQLValue>>;
121
/// A decoded binlog event header or event body.
///
/// `EventHeader` is produced by `EventData::parse_header`. Every other variant is
/// produced by `EventData::from_data` from an event body.
#[derive(Debug)]
pub enum EventData {
    /// The common event header; all integer fields are read little-endian.
    EventHeader {
        timestamp: u32,
        event_type: TypeCode,
        server_id: u32,
        event_size: u32,
        log_pos: u32,
        flags: u16,
    },
    /// XID event: the body's leading 8-byte little-endian value.
    XIDEvent {
        xid: u64,
    },
    /// Rotate event.
    RotateEvent {
        /// The 8-byte little-endian value at the start of the body.
        pos: u64,
        /// The rest of the body, decoded as UTF-8.
        next_log_name: String,
    },
    /// GTID event.
    GtidLogEvent {
        flags: u8,
        uuid: Uuid,
        /// The 8-byte little-endian value following the UUID (presumably the GTID
        /// sequence number / GNO — confirm).
        coordinate: u64,
        /// Only present when the body carries the 0x02 logical-clock marker.
        last_committed: Option<u64>,
        /// Only present when the body carries the 0x02 logical-clock marker.
        sequence_number: Option<u64>,
    },
    /// Query (statement) event.
    QueryEvent {
        thread_id: u32,
        exec_time: u32,
        error_code: i16,
        /// Default schema, decoded lossily from UTF-8.
        schema: String,
        /// Statement text: the remainder of the body after the schema's terminator.
        query: String,
    },
    /// Format description event; only binlog version 4 is parsed.
    FormatDescriptionEvent {
        binlog_version: u16,
        /// The 50-byte server version field, truncated at its first NUL byte.
        server_version: String,
        create_timestamp: u32,
        common_header_len: u8,
        checksum_algorithm: ChecksumAlgorithm,
    },
    /// Table map event describing the columns of `table_id`.
    TableMapEvent {
        /// 6-byte little-endian table id widened to `u64`.
        table_id: u64,
        schema_name: String,
        table_name: String,
        /// One entry per column, with per-column metadata already applied.
        columns: Vec<ColumnType>,
        /// Nullability bitmap with one bit per column.
        null_bitmap: BitSet,
    },
    /// Write (insert) rows event, v1 or v2. `rows` is empty when no table map was
    /// available for `table_id`.
    WriteRowsEvent {
        table_id: u64,
        rows: Vec<RowEvent>,
    },
    /// Update rows event, v1 or v2. `rows` is empty when no table map was available
    /// for `table_id`.
    UpdateRowsEvent {
        table_id: u64,
        rows: Vec<RowEvent>,
    },
    /// Delete rows event, v1 or v2. `rows` is empty when no table map was available
    /// for `table_id`.
    DeleteRowsEvent {
        table_id: u64,
        rows: Vec<RowEvent>,
    },
}
180
/// Intermediate result of `parse_rows_event`. `EventData::from_data` repackages it
/// into the matching write/update/delete variant.
struct RowsEvent {
    table_id: u64,
    rows: Vec<RowEvent>,
}
185
186fn parse_one_row<R: Read + Seek>(
187 mut cursor: &mut R,
188 this_table_map: &SingleTableMap,
189 present_bitmask: &BitSet,
190) -> Result<RowData, ColumnParseError> {
191 let num_set_columns = present_bitmask.bits_set();
192 let null_bitmask_size = (num_set_columns + 7) >> 3;
193 let mut row = Vec::with_capacity(this_table_map.columns.len());
194 let null_bitmask = BitSet::from_slice(
195 num_set_columns,
196 &read_nbytes(&mut cursor, null_bitmask_size)?,
197 )
198 .unwrap();
199 let mut null_index = 0;
200 for (i, column_definition) in this_table_map.columns.iter().enumerate() {
201 if !present_bitmask.is_set(i) {
202 row.push(None);
203 continue;
204 }
205 let is_null = null_bitmask.is_set(null_index);
206 let val = if is_null {
207 MySQLValue::Null
208 } else {
209 column_definition.read_value(&mut cursor)?
211 };
212 row.push(Some(val));
213 null_index += 1;
214 }
215 Ok(row)
217}
218
/// One row change decoded from a rows event.
///
/// Because of `#[serde(untagged)]`, serialization emits only the variant's fields,
/// not the variant name.
#[derive(Debug, Serialize)]
#[serde(untagged)]
pub enum RowEvent {
    /// Row image from a write (insert) rows event.
    NewRow {
        cols: RowData,
    },
    /// Row image from a delete rows event.
    DeletedRow {
        cols: RowData,
    },
    /// Before and after images from an update rows event.
    UpdatedRow {
        before_cols: RowData,
        after_cols: RowData,
    },
}
233
234impl RowEvent {
235 pub fn cols(&self) -> Option<&RowData> {
236 match self {
237 RowEvent::NewRow { cols } => Some(cols),
238 RowEvent::DeletedRow { cols } => Some(cols),
239 RowEvent::UpdatedRow { .. } => None,
240 }
241 }
242}
243
244fn parse_rows_event<R: Read + Seek>(
245 type_code: TypeCode,
246 data_len: usize,
247 mut cursor: &mut R,
248 table_map: Option<&TableMap>,
249) -> Result<RowsEvent, ColumnParseError> {
250 let mut table_id_buf = [0u8; 8];
251 cursor.read_exact(&mut table_id_buf[0..6])?;
252 let table_id = LittleEndian::read_u64(&table_id_buf);
253 cursor.seek(io::SeekFrom::Current(2))?;
255 match type_code {
256 TypeCode::WriteRowsEventV2 | TypeCode::UpdateRowsEventV2 | TypeCode::DeleteRowsEventV2 => {
257 let _ = cursor.read_i16::<LittleEndian>()?;
258 }
259 _ => {}
260 }
261 let num_columns = read_variable_length_integer(&mut cursor)? as usize;
262 let bitmask_size = (num_columns + 7) >> 3;
263 let before_column_bitmask =
264 BitSet::from_slice(num_columns, &read_nbytes(&mut cursor, bitmask_size)?).unwrap();
265 let after_column_bitmask = match type_code {
266 TypeCode::UpdateRowsEventV1 | TypeCode::UpdateRowsEventV2 => {
267 Some(BitSet::from_slice(num_columns, &read_nbytes(&mut cursor, bitmask_size)?).unwrap())
268 }
269 _ => None,
270 };
271 let mut rows = Vec::with_capacity(1);
272 if let Some(table_map) = table_map {
273 if let Some(this_table_map) = table_map.get(table_id) {
274 loop {
275 let pos = cursor.tell()? as usize;
276 if data_len - pos < 1 {
277 break;
278 }
279 match type_code {
280 TypeCode::WriteRowsEventV1 | TypeCode::WriteRowsEventV2 => {
281 rows.push(RowEvent::NewRow {
282 cols: parse_one_row(
283 &mut cursor,
284 this_table_map,
285 &before_column_bitmask,
286 )?,
287 });
288 }
289 TypeCode::UpdateRowsEventV1 | TypeCode::UpdateRowsEventV2 => {
290 rows.push(RowEvent::UpdatedRow {
291 before_cols: parse_one_row(
292 &mut cursor,
293 this_table_map,
294 &before_column_bitmask,
295 )?,
296 after_cols: parse_one_row(
297 &mut cursor,
298 this_table_map,
299 after_column_bitmask.as_ref().unwrap(),
300 )?,
301 })
302 }
303 TypeCode::DeleteRowsEventV1 | TypeCode::DeleteRowsEventV2 => {
304 rows.push(RowEvent::DeletedRow {
305 cols: parse_one_row(
306 &mut cursor,
307 this_table_map,
308 &before_column_bitmask,
309 )?,
310 });
311 }
312 _ => unimplemented!(),
313 }
314 }
315 }
316 }
317 Ok(RowsEvent { table_id, rows })
318}
/// Size in bytes of the common event header (timestamp, type code, server id,
/// event size, log position, flags).
pub const EVENT_HEADER_SIZE: usize = 19;
/// Size in bytes of the checksum trailing an event.
pub const BINLOG_CHECKSUM_LENGTH: usize = 4;
321impl EventData {
322 pub fn parse_header(data: &[u8]) -> Result<Option<EventData>, EventParseError> {
323 if data.len() < EVENT_HEADER_SIZE {
324 return Err(EofError);
325 }
326 let mut cursor = Cursor::new(data);
327 Ok(Some(EventData::EventHeader {
328 timestamp: cursor.read_u32::<LittleEndian>()?,
329 event_type: TypeCode::from_byte(cursor.read_u8()?),
330 server_id: cursor.read_u32::<LittleEndian>()?,
331 event_size: cursor.read_u32::<LittleEndian>()?,
332 log_pos: cursor.read_u32::<LittleEndian>()?,
333 flags: cursor.read_u16::<LittleEndian>()?,
334 }))
335 }
336 pub fn from_data(
337 type_code: TypeCode,
338 data: &[u8],
339 table_map: Option<&TableMap>,
340 ) -> Result<Option<Self>, EventParseError> {
341 let mut cursor = Cursor::new(data);
342 match type_code {
343 TypeCode::XidEvent => Ok(Some(EventData::XIDEvent {
344 xid: cursor.read_u64::<LittleEndian>()?,
345 })),
346 TypeCode::RotateEvent => {
347 let log_name = match String::from_utf8(Vec::from(&data[8..])) {
348 Ok(d) => d,
349 Err(_e) => return Err(EofError),
350 };
351 Ok(Some(EventData::RotateEvent {
352 pos: cursor.read_u64::<LittleEndian>()?,
353 next_log_name: log_name,
354 }))
355 }
356 TypeCode::FormatDescriptionEvent => {
357 let binlog_version = cursor.read_u16::<LittleEndian>()?;
358 if binlog_version != 4 {
359 unimplemented!("can only parse a version 4 binary log");
360 }
361 let mut server_version_buf = [0u8; 50];
362 cursor.read_exact(&mut server_version_buf)?;
363 let server_version = ::std::str::from_utf8(
364 server_version_buf
365 .split(|c| *c == 0x00)
366 .next()
367 .unwrap_or(&[]),
368 )
369 .unwrap()
370 .to_owned();
371 let create_timestamp = cursor.read_u32::<LittleEndian>()?;
372 let common_header_len = cursor.read_u8()?;
373 let event_types = data.len() - 2 - 50 - 4 - 1 - 5;
374 let mut event_sizes_tables = vec![0u8; event_types];
375 cursor.read_exact(&mut event_sizes_tables)?;
376 let checksum_algo = ChecksumAlgorithm::from(cursor.read_u8()?);
377 let mut checksum_buf = [0u8; 4];
378 cursor.read_exact(&mut checksum_buf)?;
379 Ok(Some(EventData::FormatDescriptionEvent {
380 binlog_version,
381 server_version,
382 create_timestamp,
383 common_header_len,
384 checksum_algorithm: checksum_algo,
385 }))
386 }
387 TypeCode::GtidLogEvent => {
388 let flags = cursor.read_u8()?;
389 let mut uuid_buf = [0u8; 16];
390 cursor.read_exact(&mut uuid_buf)?;
391 let uuid = Uuid::from_slice(&uuid_buf)?;
392 let offset = cursor.read_u64::<LittleEndian>()?;
393 let (last_committed, sequence_number) = match cursor.read_u8() {
394 Ok(0x02) => {
395 let last_committed = cursor.read_u64::<LittleEndian>()?;
396 let sequence_number = cursor.read_u64::<LittleEndian>()?;
397 (Some(last_committed), Some(sequence_number))
398 }
399 _ => (None, None),
400 };
401 Ok(Some(EventData::GtidLogEvent {
402 flags,
403 uuid,
404 coordinate: offset,
405 last_committed,
406 sequence_number,
407 }))
408 }
409 TypeCode::QueryEvent => {
410 let thread_id = cursor.read_u32::<LittleEndian>()?;
411 let execution_time = cursor.read_u32::<LittleEndian>()?;
412 let schema_len = cursor.read_u8()?;
413 let error_code = cursor.read_i16::<LittleEndian>()?;
414 let _status_vars = read_two_byte_length_prefixed_bytes(&mut cursor)?;
415 let schema =
416 String::from_utf8_lossy(&read_nbytes(&mut cursor, schema_len)?).into_owned();
417 cursor.seek(io::SeekFrom::Current(1))?;
418 let mut statement = String::new();
419 cursor.read_to_string(&mut statement)?;
420 Ok(Some(EventData::QueryEvent {
421 thread_id,
422 exec_time: execution_time,
423 error_code,
424 schema,
425 query: statement,
426 }))
427 }
428 TypeCode::TableMapEvent => {
429 let mut table_id_buf = [0u8; 8];
430 cursor.read_exact(&mut table_id_buf[0..6])?;
431 let table_id = LittleEndian::read_u64(&table_id_buf);
432 cursor.seek(io::SeekFrom::Current(2))?;
434 let schema_name = read_one_byte_length_prefixed_string(&mut cursor)?;
435 cursor.seek(io::SeekFrom::Current(1))?;
437 let table_name = read_one_byte_length_prefixed_string(&mut cursor)?;
438 cursor.seek(io::SeekFrom::Current(1))?;
440 let column_count = read_variable_length_integer(&mut cursor)? as usize;
442 let mut columns = Vec::with_capacity(column_count);
443 for _ in 0..column_count {
444 let column_type = ColumnType::from_byte(cursor.read_u8()?);
445 columns.push(column_type);
446 }
447 let _metadata_length = read_variable_length_integer(&mut cursor)? as usize;
451 let final_columns = columns
452 .into_iter()
453 .map(|c| c.read_metadata(&mut cursor))
454 .collect::<Result<Vec<_>, _>>()?;
455 let num_columns = final_columns.len();
458 let null_bitmask_size = (num_columns + 7) >> 3;
459 let null_bitmap_source = read_nbytes(&mut cursor, null_bitmask_size)?;
460 let nullable_bitmap = BitSet::from_slice(num_columns, &null_bitmap_source).unwrap();
461 Ok(Some(EventData::TableMapEvent {
462 table_id,
463 schema_name,
464 table_name,
465 columns: final_columns,
466 null_bitmap: nullable_bitmap,
467 }))
468 }
469 TypeCode::WriteRowsEventV1 | TypeCode::WriteRowsEventV2 => {
470 let ev = parse_rows_event(type_code, data.len(), &mut cursor, table_map)?;
471 Ok(Some(EventData::WriteRowsEvent {
472 table_id: ev.table_id,
473 rows: ev.rows,
474 }))
475 }
476 TypeCode::UpdateRowsEventV1 | TypeCode::UpdateRowsEventV2 => {
477 let ev = parse_rows_event(type_code, data.len(), &mut cursor, table_map)?;
478 Ok(Some(EventData::UpdateRowsEvent {
479 table_id: ev.table_id,
480 rows: ev.rows,
481 }))
482 }
483 TypeCode::DeleteRowsEventV1 | TypeCode::DeleteRowsEventV2 => {
484 let ev = parse_rows_event(type_code, data.len(), &mut cursor, table_map)?;
485 Ok(Some(EventData::DeleteRowsEvent {
486 table_id: ev.table_id,
487 rows: ev.rows,
488 }))
489 }
490 _ => Ok(None),
491 }
492 }
493}
494
/// A raw binlog event as read by `Event::read`: header fields plus the undecoded
/// body. Decode the body with `Event::inner`.
pub struct Event {
    timestamp: u32,
    type_code: TypeCode,
    server_id: u32,
    /// `event_size` field of the header (presumably including header and checksum).
    event_length: u32,
    /// `log_pos` field of the header, exposed via `Event::next_position`.
    next_position: u32,
    flags: u16,
    /// Event body without the header and, when `HAS_CHECKSUM` is set, without the
    /// trailing checksum.
    data: Vec<u8>,
    /// Caller-supplied offset passed to `Event::read`, stored verbatim.
    offset: u64,
}
505
506impl fmt::Debug for Event {
507 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
508 write!(f, "Event {{ timestamp: {:?}, type_code: {:?}, server_id: {:?}, data_len: {:?}, offset: {:?} }}", self.timestamp, self.type_code, self.server_id, self.data.len(), self.offset)
509 }
510}
511
/// Whether `Event::read` treats each event as ending in a 4-byte checksum that is
/// excluded from `Event::data`. Hard-coded to `true`.
const HAS_CHECKSUM: bool = true;
514
515impl Event {
516 pub fn read<R: Read>(reader: &mut R, offset: u64) -> Result<Self, EventParseError> {
517 let mut header = [0u8; 19];
518 match reader.read_exact(&mut header) {
519 Ok(_) => {}
520 Err(ref e) if e.kind() == ErrorKind::UnexpectedEof => {
521 return Err(EventParseError::EofError.into())
522 }
523 Err(e) => return Err(e.into()),
524 }
525 let mut c = Cursor::new(header);
526 let timestamp = c.read_u32::<LittleEndian>()?;
527 let type_code = TypeCode::from_byte(c.read_u8()?);
528 let server_id = c.read_u32::<LittleEndian>()?;
529 let event_length = c.read_u32::<LittleEndian>()?;
530 let next_position = c.read_u32::<LittleEndian>()?;
531 let flags = c.read_u16::<LittleEndian>()?;
532 let mut data_length: usize = (event_length - 19) as usize;
533 if HAS_CHECKSUM {
534 data_length -= 4;
535 }
536 let mut data = vec![0u8; data_length];
538 reader.read_exact(&mut data)?;
539 Ok(Event {
541 timestamp,
542 type_code,
543 server_id,
544 event_length,
545 next_position,
546 flags,
547 data,
548 offset,
549 })
550 }
551
552 pub fn type_code(&self) -> TypeCode {
553 self.type_code
554 }
555
556 pub fn timestamp(&self) -> u32 {
557 self.timestamp
558 }
559
560 pub fn next_position(&self) -> u64 {
561 u64::from(self.next_position)
562 }
563
564 pub fn inner(
565 &self,
566 table_map: Option<&TableMap>,
567 ) -> Result<Option<EventData>, EventParseError> {
568 EventData::from_data(self.type_code, &self.data, table_map).map_err(Into::into)
569 }
570
571 pub fn data(&self) -> &Vec<u8> {
572 &self.data
573 }
574
575 pub fn flags(&self) -> u16 {
576 self.flags
577 }
578
579 pub fn event_length(&self) -> u32 {
580 self.event_length
581 }
582
583 pub fn offset(&self) -> u64 {
584 self.offset
585 }
586}