mysql_binlog_connector_rust/command/
command_util.rs

1use crate::{
2    binlog_client::BinlogClient,
3    binlog_error::BinlogError,
4    constants::MysqlRespCode,
5    event::checksum_type::ChecksumType,
6    network::{
7        error_packet::ErrorPacket, packet_channel::PacketChannel,
8        result_set_row_packet::ResultSetRowPacket,
9    },
10};
11
12use super::{
13    dump_binlog_command::DumpBinlogCommand, dump_binlog_gtid_command::DumpBinlogGtidCommand,
14    gtid_set::GtidSet, query_command::QueryCommand,
15};
16
17pub struct CommandUtil {}
18
19impl CommandUtil {
20    pub async fn execute_query(
21        channel: &mut PacketChannel,
22        sql: &str,
23    ) -> Result<Vec<ResultSetRowPacket>, BinlogError> {
24        Self::execute_sql(channel, sql).await?;
25        // read to EOF
26        while channel.read().await?[0] != MysqlRespCode::EOF {}
27        // get result sets
28        let mut result_sets = Vec::new();
29
30        let mut buf = channel.read().await?;
31        while buf[0] != MysqlRespCode::EOF {
32            Self::check_error_packet(&buf)?;
33            let result_set = ResultSetRowPacket::new(&buf)?;
34            result_sets.push(result_set);
35            buf = channel.read().await?;
36        }
37
38        Ok(result_sets)
39    }
40
41    pub async fn execute_sql(channel: &mut PacketChannel, sql: &str) -> Result<(), BinlogError> {
42        let mut command = QueryCommand {
43            sql: sql.to_string(),
44        };
45
46        // send the query command, sequence for non-authenticate commands are always 0
47        channel.write(&command.to_bytes()?, 0).await?;
48
49        // read the response packet
50        let buf = channel.read().await?;
51        Self::check_error_packet(&buf)
52    }
53
54    pub async fn fetch_binlog_info(
55        channel: &mut PacketChannel,
56    ) -> Result<(String, u32, String), BinlogError> {
57        let result_sets = Self::execute_query(channel, "show master status").await?;
58        if result_sets.is_empty() {
59            return Err(BinlogError::ConnectError(
60                "failed to fetch binlog filename and position".into(),
61            ));
62        }
63        let binlog_filename = result_sets[0].values[0].clone();
64        let binlog_position = result_sets[0].values[1].clone().parse::<u32>()?;
65        let gtid_set = result_sets[0].values[4].clone();
66        Ok((binlog_filename, binlog_position, gtid_set))
67    }
68
69    pub async fn fetch_binlog_checksum(
70        channel: &mut PacketChannel,
71    ) -> Result<ChecksumType, BinlogError> {
72        let result_set_rows =
73            Self::execute_query(channel, "select @@global.binlog_checksum").await?;
74        let mut checksum_name = "";
75        if !result_set_rows.is_empty() {
76            checksum_name = result_set_rows[0].values[0].as_str();
77        }
78        Ok(ChecksumType::from_name(checksum_name))
79    }
80
81    pub async fn setup_binlog_connection(channel: &mut PacketChannel) -> Result<(), BinlogError> {
82        let mut command = QueryCommand {
83            sql: "set @master_binlog_checksum= @@global.binlog_checksum".to_string(),
84        };
85        channel.write(&command.to_bytes()?, 0).await?;
86        let buf = channel.read().await?;
87        Self::check_error_packet(&buf)
88    }
89
90    pub async fn enable_heartbeat(
91        channel: &mut PacketChannel,
92        heartbeat_interval_secs: u64,
93    ) -> Result<(), BinlogError> {
94        let mut command = QueryCommand {
95            sql: format!(
96                "set @master_heartbeat_period={}",
97                heartbeat_interval_secs * 1000_000_000
98            ),
99        };
100        channel.write(&command.to_bytes()?, 0).await?;
101        let buf = channel.read().await?;
102        Self::check_error_packet(&buf)
103    }
104
105    pub async fn dump_binlog(
106        channel: &mut PacketChannel,
107        client: &BinlogClient,
108    ) -> Result<(), BinlogError> {
109        let buf = if client.gtid_enabled {
110            let mut command = DumpBinlogGtidCommand {
111                server_id: client.server_id,
112                gtid_set: GtidSet::new(&client.gtid_set)?,
113            };
114            command.to_bytes()?
115        } else {
116            let mut command = DumpBinlogCommand {
117                binlog_filename: client.binlog_filename.clone(),
118                binlog_position: client.binlog_position,
119                server_id: client.server_id,
120            };
121            command.to_bytes()?
122        };
123        channel.write(&buf, 0).await
124    }
125
126    pub fn parse_result(buf: &Vec<u8>) -> Result<(), BinlogError> {
127        match buf[0] {
128            MysqlRespCode::OK => Ok(()),
129
130            MysqlRespCode::ERROR => Self::check_error_packet(buf),
131
132            _ => Err(BinlogError::ConnectError("connect mysql failed".into())),
133        }
134    }
135
136    pub fn check_error_packet(buf: &Vec<u8>) -> Result<(), BinlogError> {
137        if buf[0] == MysqlRespCode::ERROR {
138            let error_packet = ErrorPacket::new(buf)?;
139            return Err(BinlogError::ConnectError(format!(
140                "connect mysql failed: {}",
141                error_packet.error_message
142            )));
143        }
144        Ok(())
145    }
146}