mysql_binlog_connector_rust/event/
transaction_payload_event.rs

1use std::{
2    collections::HashMap,
3    io::{Cursor, Seek, SeekFrom},
4};
5
6use serde::{Deserialize, Serialize};
7
8use crate::{binlog_error::BinlogError, binlog_parser::BinlogParser, ext::cursor_ext::CursorExt};
9
10use super::{event_data::EventData, event_header::EventHeader};
11
12#[derive(Debug, Deserialize, Serialize, Clone)]
13pub struct TransactionPayloadEvent {
14    pub uncompressed_size: u32,
15    pub uncompressed_events: Vec<(EventHeader, EventData)>,
16}
17
18impl TransactionPayloadEvent {
19    pub fn parse(cursor: &mut Cursor<&Vec<u8>>) -> Result<Self, BinlogError> {
20        // refer: https://dev.mysql.com/doc/refman/8.0/en/binary-log-transaction-compression.html
21        let (_compress_type, uncompressed_size) = Self::parse_meta(cursor)?;
22
23        // read the rest data as payload and decompress it, currently only support zstd
24        let mut uncompressed_payload: Vec<u8> = Vec::new();
25        zstd::stream::copy_decode(cursor, &mut uncompressed_payload)?;
26
27        // construct a new parser from the payload
28        let mut payload_cursor = Cursor::new(uncompressed_payload);
29        let mut parser = BinlogParser {
30            checksum_length: 0,
31            table_map_event_by_table_id: HashMap::new(),
32        };
33
34        // parse events in payload
35        let mut uncompressed_events: Vec<(EventHeader, EventData)> = Vec::new();
36        while let Ok(e) = parser.next(&mut payload_cursor) {
37            uncompressed_events.push(e);
38        }
39
40        Ok(Self {
41            uncompressed_size: uncompressed_size as u32,
42            uncompressed_events,
43        })
44    }
45
46    fn parse_meta(cursor: &mut Cursor<&Vec<u8>>) -> Result<(usize, usize), BinlogError> {
47        let mut payload_size = 0;
48        let mut compress_type = 0;
49        let mut uncompressed_size = 0;
50
51        while cursor.available() > 0 {
52            let field_type = if cursor.available() >= 1 {
53                cursor.read_packed_number()?
54            } else {
55                0
56            };
57
58            // we have reached the end of the Event Data Header
59            if field_type == 0 {
60                break;
61            }
62
63            let field_length = if cursor.available() >= 1 {
64                cursor.read_packed_number()?
65            } else {
66                0
67            };
68
69            match field_type {
70                1 => payload_size = cursor.read_packed_number()?,
71
72                2 => compress_type = cursor.read_packed_number()?,
73
74                3 => uncompressed_size = cursor.read_packed_number()?,
75
76                _ => {
77                    cursor.seek(SeekFrom::Current(field_length as i64))?;
78                }
79            }
80        }
81
82        if uncompressed_size == 0 {
83            uncompressed_size = payload_size;
84        }
85
86        Ok((compress_type, uncompressed_size))
87    }
88}