mysql_binlog_connector_rust/event/
row_event.rs

1use std::io::Cursor;
2
3use serde::{Deserialize, Serialize};
4
5use crate::{
6    binlog_error::BinlogError,
7    column::{column_type::ColumnType, column_value::ColumnValue},
8    ext::cursor_ext::CursorExt,
9};
10
11use super::table_map_event::TableMapEvent;
12
13#[derive(Debug, Deserialize, Serialize, Clone)]
14pub struct RowEvent {
15    pub column_values: Vec<ColumnValue>,
16}
17
18impl RowEvent {
19    #[allow(clippy::needless_range_loop)]
20    pub fn parse(
21        cursor: &mut Cursor<&Vec<u8>>,
22        table_map_event: &TableMapEvent,
23        included_columns: &[bool],
24    ) -> Result<Self, BinlogError> {
25        let null_columns = cursor.read_bits(included_columns.len(), false)?;
26        let mut column_values = Vec::with_capacity(table_map_event.column_types.len());
27        let mut skipped_column_count = 0;
28        for i in 0..table_map_event.column_types.len() {
29            if !included_columns[i] {
30                skipped_column_count += 1;
31                column_values.push(ColumnValue::None);
32                continue;
33            }
34
35            let index = i - skipped_column_count;
36            if null_columns[index] {
37                column_values.push(ColumnValue::None);
38                continue;
39            }
40
41            let column_meta = table_map_event.column_metas[i];
42            let mut column_type = table_map_event.column_types[i];
43            let mut column_length = column_meta;
44
45            if column_type == ColumnType::String as u8 && column_meta >= 256 {
46                (column_type, column_length) =
47                    ColumnType::parse_string_column_meta(column_meta, column_type)?;
48            }
49
50            let col_value = ColumnValue::parse(
51                cursor,
52                ColumnType::from_code(column_type),
53                column_meta,
54                column_length,
55            )?;
56            column_values.push(col_value);
57        }
58
59        Ok(Self { column_values })
60    }
61}