mysql_binlog_connector_rust/command/
command_util.rs1use crate::{
2 binlog_client::BinlogClient,
3 binlog_error::BinlogError,
4 constants::MysqlRespCode,
5 event::checksum_type::ChecksumType,
6 network::{
7 error_packet::ErrorPacket, packet_channel::PacketChannel,
8 result_set_row_packet::ResultSetRowPacket,
9 },
10};
11
12use super::{
13 dump_binlog_command::DumpBinlogCommand, dump_binlog_gtid_command::DumpBinlogGtidCommand,
14 gtid_set::GtidSet, query_command::QueryCommand,
15};
16
17pub struct CommandUtil {}
18
19impl CommandUtil {
20 pub async fn execute_query(
21 channel: &mut PacketChannel,
22 sql: &str,
23 ) -> Result<Vec<ResultSetRowPacket>, BinlogError> {
24 Self::execute_sql(channel, sql).await?;
25 while channel.read().await?[0] != MysqlRespCode::EOF {}
27 let mut result_sets = Vec::new();
29
30 let mut buf = channel.read().await?;
31 while buf[0] != MysqlRespCode::EOF {
32 Self::check_error_packet(&buf)?;
33 let result_set = ResultSetRowPacket::new(&buf)?;
34 result_sets.push(result_set);
35 buf = channel.read().await?;
36 }
37
38 Ok(result_sets)
39 }
40
41 pub async fn execute_sql(channel: &mut PacketChannel, sql: &str) -> Result<(), BinlogError> {
42 let mut command = QueryCommand {
43 sql: sql.to_string(),
44 };
45
46 channel.write(&command.to_bytes()?, 0).await?;
48
49 let buf = channel.read().await?;
51 Self::check_error_packet(&buf)
52 }
53
54 pub async fn fetch_binlog_info(
55 channel: &mut PacketChannel,
56 ) -> Result<(String, u32, String), BinlogError> {
57 let result_sets = Self::execute_query(channel, "show master status").await?;
58 if result_sets.is_empty() {
59 return Err(BinlogError::ConnectError(
60 "failed to fetch binlog filename and position".into(),
61 ));
62 }
63 let binlog_filename = result_sets[0].values[0].clone();
64 let binlog_position = result_sets[0].values[1].clone().parse::<u32>()?;
65 let gtid_set = result_sets[0].values[4].clone();
66 Ok((binlog_filename, binlog_position, gtid_set))
67 }
68
69 pub async fn fetch_binlog_checksum(
70 channel: &mut PacketChannel,
71 ) -> Result<ChecksumType, BinlogError> {
72 let result_set_rows =
73 Self::execute_query(channel, "select @@global.binlog_checksum").await?;
74 let mut checksum_name = "";
75 if !result_set_rows.is_empty() {
76 checksum_name = result_set_rows[0].values[0].as_str();
77 }
78 Ok(ChecksumType::from_name(checksum_name))
79 }
80
81 pub async fn setup_binlog_connection(channel: &mut PacketChannel) -> Result<(), BinlogError> {
82 let mut command = QueryCommand {
83 sql: "set @master_binlog_checksum= @@global.binlog_checksum".to_string(),
84 };
85 channel.write(&command.to_bytes()?, 0).await?;
86 let buf = channel.read().await?;
87 Self::check_error_packet(&buf)
88 }
89
90 pub async fn enable_heartbeat(
91 channel: &mut PacketChannel,
92 heartbeat_interval_secs: u64,
93 ) -> Result<(), BinlogError> {
94 let mut command = QueryCommand {
95 sql: format!(
96 "set @master_heartbeat_period={}",
97 heartbeat_interval_secs * 1000_000_000
98 ),
99 };
100 channel.write(&command.to_bytes()?, 0).await?;
101 let buf = channel.read().await?;
102 Self::check_error_packet(&buf)
103 }
104
105 pub async fn dump_binlog(
106 channel: &mut PacketChannel,
107 client: &BinlogClient,
108 ) -> Result<(), BinlogError> {
109 let buf = if client.gtid_enabled {
110 let mut command = DumpBinlogGtidCommand {
111 server_id: client.server_id,
112 gtid_set: GtidSet::new(&client.gtid_set)?,
113 };
114 command.to_bytes()?
115 } else {
116 let mut command = DumpBinlogCommand {
117 binlog_filename: client.binlog_filename.clone(),
118 binlog_position: client.binlog_position,
119 server_id: client.server_id,
120 };
121 command.to_bytes()?
122 };
123 channel.write(&buf, 0).await
124 }
125
126 pub fn parse_result(buf: &Vec<u8>) -> Result<(), BinlogError> {
127 match buf[0] {
128 MysqlRespCode::OK => Ok(()),
129
130 MysqlRespCode::ERROR => Self::check_error_packet(buf),
131
132 _ => Err(BinlogError::ConnectError("connect mysql failed".into())),
133 }
134 }
135
136 pub fn check_error_packet(buf: &Vec<u8>) -> Result<(), BinlogError> {
137 if buf[0] == MysqlRespCode::ERROR {
138 let error_packet = ErrorPacket::new(buf)?;
139 return Err(BinlogError::ConnectError(format!(
140 "connect mysql failed: {}",
141 error_packet.error_message
142 )));
143 }
144 Ok(())
145 }
146}