mysql_binlog_connector_rust/
binlog_client.rs

1use std::collections::HashMap;
2
3use crate::{
4    binlog_error::BinlogError,
5    binlog_parser::BinlogParser,
6    binlog_stream::BinlogStream,
7    command::{authenticator::Authenticator, command_util::CommandUtil},
8};
9
/// Settings for a MySQL binlog replication session.
///
/// The type implements [`Default`]; populate the fields that matter and call
/// `connect` to obtain a `BinlogStream`. Replication starts either from a
/// GTID set (`gtid_enabled == true`) or from a binlog file name plus offset
/// (`gtid_enabled == false`).
#[derive(Default)]
pub struct BinlogClient {
    /// URL of the MySQL server, written as "mysql://user:password@host:port".
    pub url: String,
    /// Identifier of this replication client.
    /// No other client connected to the same MySQL server may use the same value.
    pub server_id: u64,
    /// Selects GTID-based replication when `true`, file/position-based
    /// replication when `false`.
    pub gtid_enabled: bool,
    /// GTID set to resume from, written as "uuid:1-100,uuid2:1-200".
    /// Consulted only when `gtid_enabled` is true; if left empty, `connect`
    /// fills it with the value fetched from the server.
    pub gtid_set: String,
    /// Binlog file to start reading from, e.g. "mysql-bin.000001".
    /// Consulted only when `gtid_enabled` is false; if left empty, `connect`
    /// fills both this field and `binlog_position` with values fetched from
    /// the server.
    pub binlog_filename: String,
    /// Offset inside `binlog_filename` at which to start reading.
    /// In file/position mode `connect` raises values below 4 to 4.
    pub binlog_position: u32,
    /// Heartbeat period in seconds.
    /// When no binlog event arrives within this period the server sends a
    /// heartbeat event instead; 0 means the server sends no heartbeat events.
    pub heartbeat_interval_secs: u64,
    /// Timeout, in seconds, for network operations such as establishing the
    /// connection and reading data.
    /// 0 selects the default of 60 seconds.
    pub timeout_secs: u64,
}
36
/// Lower bound that `BinlogClient::connect` applies to `binlog_position` in
/// file/position mode: any smaller configured offset is raised to this value.
/// NOTE(review): presumably 4 because a binlog file begins with a 4-byte
/// magic header — confirm against the MySQL binlog format documentation.
const MIN_BINLOG_POSITION: u32 = 4;
38
39impl BinlogClient {
40    pub async fn connect(&mut self) -> Result<BinlogStream, BinlogError> {
41        // init connect
42        let timeout_secs = if self.timeout_secs > 0 {
43            self.timeout_secs
44        } else {
45            60
46        };
47        let mut authenticator = Authenticator::new(&self.url, timeout_secs)?;
48        let mut channel = authenticator.connect().await?;
49
50        if self.gtid_enabled {
51            if self.gtid_set.is_empty() {
52                let (_, _, gtid_set) = CommandUtil::fetch_binlog_info(&mut channel).await?;
53                self.gtid_set = gtid_set;
54            }
55        } else {
56            // fetch binlog info
57            if self.binlog_filename.is_empty() {
58                let (binlog_filename, binlog_position, _) =
59                    CommandUtil::fetch_binlog_info(&mut channel).await?;
60                self.binlog_filename = binlog_filename;
61                self.binlog_position = binlog_position;
62            }
63
64            if self.binlog_position < MIN_BINLOG_POSITION {
65                self.binlog_position = MIN_BINLOG_POSITION;
66            }
67        }
68
69        // fetch binlog checksum
70        let binlog_checksum = CommandUtil::fetch_binlog_checksum(&mut channel).await?;
71
72        // setup connection
73        CommandUtil::setup_binlog_connection(&mut channel).await?;
74
75        if self.heartbeat_interval_secs > 0 {
76            CommandUtil::enable_heartbeat(&mut channel, self.heartbeat_interval_secs).await?;
77        }
78
79        // dump binlog
80        CommandUtil::dump_binlog(&mut channel, self).await?;
81
82        // list for binlog
83        let parser = BinlogParser {
84            checksum_length: binlog_checksum.get_length(),
85            table_map_event_by_table_id: HashMap::new(),
86        };
87
88        Ok(BinlogStream { channel, parser })
89    }
90}