mysql_binlog_connector_rust/event/
transaction_payload_event.rs1use std::{
2 collections::HashMap,
3 io::{Cursor, Seek, SeekFrom},
4};
5
6use serde::{Deserialize, Serialize};
7
8use crate::{binlog_error::BinlogError, binlog_parser::BinlogParser, ext::cursor_ext::CursorExt};
9
10use super::{event_data::EventData, event_header::EventHeader};
11
12#[derive(Debug, Deserialize, Serialize, Clone)]
13pub struct TransactionPayloadEvent {
14 pub uncompressed_size: u32,
15 pub uncompressed_events: Vec<(EventHeader, EventData)>,
16}
17
18impl TransactionPayloadEvent {
19 pub fn parse(cursor: &mut Cursor<&Vec<u8>>) -> Result<Self, BinlogError> {
20 let (_compress_type, uncompressed_size) = Self::parse_meta(cursor)?;
22
23 let mut uncompressed_payload: Vec<u8> = Vec::new();
25 zstd::stream::copy_decode(cursor, &mut uncompressed_payload)?;
26
27 let mut payload_cursor = Cursor::new(uncompressed_payload);
29 let mut parser = BinlogParser {
30 checksum_length: 0,
31 table_map_event_by_table_id: HashMap::new(),
32 };
33
34 let mut uncompressed_events: Vec<(EventHeader, EventData)> = Vec::new();
36 while let Ok(e) = parser.next(&mut payload_cursor) {
37 uncompressed_events.push(e);
38 }
39
40 Ok(Self {
41 uncompressed_size: uncompressed_size as u32,
42 uncompressed_events,
43 })
44 }
45
46 fn parse_meta(cursor: &mut Cursor<&Vec<u8>>) -> Result<(usize, usize), BinlogError> {
47 let mut payload_size = 0;
48 let mut compress_type = 0;
49 let mut uncompressed_size = 0;
50
51 while cursor.available() > 0 {
52 let field_type = if cursor.available() >= 1 {
53 cursor.read_packed_number()?
54 } else {
55 0
56 };
57
58 if field_type == 0 {
60 break;
61 }
62
63 let field_length = if cursor.available() >= 1 {
64 cursor.read_packed_number()?
65 } else {
66 0
67 };
68
69 match field_type {
70 1 => payload_size = cursor.read_packed_number()?,
71
72 2 => compress_type = cursor.read_packed_number()?,
73
74 3 => uncompressed_size = cursor.read_packed_number()?,
75
76 _ => {
77 cursor.seek(SeekFrom::Current(field_length as i64))?;
78 }
79 }
80 }
81
82 if uncompressed_size == 0 {
83 uncompressed_size = payload_size;
84 }
85
86 Ok((compress_type, uncompressed_size))
87 }
88}