1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::{
    binlog_error::BinlogError,
    constants::MysqlRespCode,
    event::checksum_type::ChecksumType,
    network::{
        error_packet::ErrorPacket, packet_channel::PacketChannel,
        result_set_row_packet::ResultSetRowPacket,
    },
};

use super::{dump_binlog_command::DumpBinlogCommand, query_command::QueryCommand};

pub struct CommandUtil {}

impl CommandUtil {
    pub async fn execute_query(
        channel: &mut PacketChannel,
        sql: &str,
    ) -> Result<Vec<ResultSetRowPacket>, BinlogError> {
        Self::execute_sql(channel, sql).await?;
        // read to EOF
        while channel.read().await?[0] != MysqlRespCode::EOF {}
        // get result sets
        let mut result_sets = Vec::new();

        let mut buf = channel.read().await?;
        while buf[0] != MysqlRespCode::EOF {
            Self::check_error_packet(&buf)?;
            let result_set = ResultSetRowPacket::new(&buf)?;
            result_sets.push(result_set);
            buf = channel.read().await?;
        }

        Ok(result_sets)
    }

    pub async fn execute_sql(channel: &mut PacketChannel, sql: &str) -> Result<(), BinlogError> {
        let mut command = QueryCommand {
            sql: sql.to_string(),
        };

        // send the query command, sequence for non-authenticate commands are always 0
        channel.write(&command.to_bytes()?, 0).await?;

        // read the response packet
        let buf = channel.read().await?;
        Self::check_error_packet(&buf)
    }

    pub async fn fetch_binlog_info(
        channel: &mut PacketChannel,
    ) -> Result<(String, u32), BinlogError> {
        let result_sets = Self::execute_query(channel, "show master status").await?;
        if result_sets.is_empty() {
            return Err(BinlogError::ConnectError(
                "failed to fetch binlog filename and position".into(),
            ));
        }
        let binlog_filename = result_sets[0].values[0].clone();
        let binlog_position = result_sets[0].values[1].clone().parse::<u32>()?;
        Ok((binlog_filename, binlog_position))
    }

    pub async fn fetch_binlog_checksum(
        channel: &mut PacketChannel,
    ) -> Result<ChecksumType, BinlogError> {
        let result_set_rows =
            Self::execute_query(channel, "select @@global.binlog_checksum").await?;
        let mut checksum_name = "";
        if !result_set_rows.is_empty() {
            checksum_name = result_set_rows[0].values[0].as_str();
        }
        Ok(ChecksumType::from_name(checksum_name))
    }

    pub async fn setup_binlog_connection(channel: &mut PacketChannel) -> Result<(), BinlogError> {
        let mut command = QueryCommand {
            sql: "set @master_binlog_checksum= @@global.binlog_checksum".to_string(),
        };
        channel.write(&command.to_bytes()?, 0).await?;

        let buf = channel.read().await?;
        Self::check_error_packet(&buf)?;
        Ok(())
    }

    pub async fn dump_binlog(
        channel: &mut PacketChannel,
        binlog_filename: &str,
        binlog_position: u32,
        server_id: u64,
    ) -> Result<(), BinlogError> {
        let mut command = DumpBinlogCommand {
            binlog_filename: binlog_filename.to_string(),
            binlog_position,
            server_id,
        };

        let buf = command.to_bytes()?;
        channel.write(&buf, 0).await?;
        Ok(())
    }

    pub fn parse_result(buf: &Vec<u8>) -> Result<(), BinlogError> {
        match buf[0] {
            MysqlRespCode::OK => Ok(()),

            MysqlRespCode::ERROR => Self::check_error_packet(buf),

            _ => Err(BinlogError::ConnectError("connect mysql failed".into())),
        }
    }

    pub fn check_error_packet(buf: &Vec<u8>) -> Result<(), BinlogError> {
        if buf[0] == MysqlRespCode::ERROR {
            let error_packet = ErrorPacket::new(buf)?;
            return Err(BinlogError::ConnectError(format!(
                "connect mysql failed: {}",
                error_packet.error_message
            )));
        }
        Ok(())
    }
}