mysql_binlog_connector_rust/event/
row_event.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
use std::io::Cursor;

use serde::{Deserialize, Serialize};

use crate::{
    binlog_error::BinlogError,
    column::{column_type::ColumnType, column_value::ColumnValue},
    ext::cursor_ext::CursorExt,
};

use super::table_map_event::TableMapEvent;

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct RowEvent {
    pub column_values: Vec<ColumnValue>,
}

impl RowEvent {
    #[allow(clippy::needless_range_loop)]
    pub fn parse(
        cursor: &mut Cursor<&Vec<u8>>,
        table_map_event: &TableMapEvent,
        included_columns: &[bool],
    ) -> Result<Self, BinlogError> {
        let null_columns = cursor.read_bits(included_columns.len(), false)?;
        let mut column_values = Vec::with_capacity(table_map_event.column_types.len());
        let mut skipped_column_count = 0;
        for i in 0..table_map_event.column_types.len() {
            if !included_columns[i] {
                skipped_column_count += 1;
                column_values.push(ColumnValue::None);
                continue;
            }

            let index = i - skipped_column_count;
            if null_columns[index] {
                column_values.push(ColumnValue::None);
                continue;
            }

            let column_meta = table_map_event.column_metas[i];
            let mut column_type = table_map_event.column_types[i];
            let mut column_length = column_meta;

            if column_type == ColumnType::String as u8 && column_meta >= 256 {
                (column_type, column_length) =
                    ColumnType::parse_string_column_meta(column_meta, column_type)?;
            }

            let col_value = ColumnValue::parse(
                cursor,
                ColumnType::from_code(column_type),
                column_meta,
                column_length,
            )?;
            column_values.push(col_value);
        }

        Ok(Self { column_values })
    }
}