mysql_binlog_connector_rust/
binlog_client.rs

1use std::{collections::HashMap, time::Duration};
2
3use crate::{
4    binlog_error::BinlogError,
5    binlog_parser::BinlogParser,
6    binlog_stream::BinlogStream,
7    command::{authenticator::Authenticator, command_util::CommandUtil},
8    network::packet_channel::KeepAliveConfig,
9};
10
11pub enum StartPosition {
12    BinlogPosition(String, u32),
13    Gtid(String),
14    Latest,
15}
16
17#[derive(Default)]
18pub struct BinlogClient {
19    /// MySQL server connection URL in format "mysql://user:password@host:port"
20    pub url: String,
21    /// Name of the binlog file to start replication from, e.g. "mysql-bin.000001"
22    /// Only used when gtid_enabled is false
23    pub binlog_filename: String,
24    /// Position in the binlog file to start replication from
25    pub binlog_position: u32,
26    /// Unique identifier for this replication client
27    /// Must be different from other clients connected to the same MySQL server
28    pub server_id: u64,
29    /// Whether to enable GTID mode for replication
30    pub gtid_enabled: bool,
31    /// GTID set in format "uuid:1-100,uuid2:1-200"
32    /// Only used when gtid_enabled is true
33    pub gtid_set: String,
34    /// Heartbeat interval in seconds
35    /// Server will send a heartbeat event if no binlog events are received within this interval
36    /// If heartbeat_interval_secs=0, server won't send heartbeat events
37    pub heartbeat_interval_secs: u64,
38    /// Network operation timeout in seconds
39    /// Maximum wait time for operations like connection establishment and data reading
40    /// If timeout_secs=0, the default value(60) will be used
41    pub timeout_secs: u64,
42
43    /// TCP keepalive idle time in seconds
44    /// The time period after which the first keepalive packet is sent if no data has been exchanged between the two endpoints
45    /// If keepalive_idle_secs=0, TCP keepalive will not be enabled
46    pub keepalive_idle_secs: u64,
47    /// TCP keepalive interval time in seconds
48    /// The time period between keepalive packets if the connection is still active
49    /// If keepalive_interval_secs=0, TCP keepalive will not be enabled
50    pub keepalive_interval_secs: u64,
51}
52
53const MIN_BINLOG_POSITION: u32 = 4;
54
55impl BinlogClient {
56    pub fn new(url: &str, server_id: u64, position: StartPosition) -> Self {
57        let mut client = Self {
58            url: url.to_string(),
59            server_id,
60            timeout_secs: 60,
61            ..Default::default()
62        };
63        match position {
64            StartPosition::BinlogPosition(binlog_filename, binlog_position) => {
65                client.binlog_filename = binlog_filename.to_string();
66                client.binlog_position = binlog_position;
67            }
68            StartPosition::Gtid(gtid_set) => {
69                client.gtid_set = gtid_set.to_string();
70                client.gtid_enabled = true;
71            }
72            StartPosition::Latest => {}
73        }
74        client
75    }
76
77    pub fn with_master_heartbeat(self, heartbeat_interval: Duration) -> Self {
78        Self {
79            heartbeat_interval_secs: heartbeat_interval.as_secs(),
80            ..self
81        }
82    }
83
84    pub fn with_read_timeout(self, timeout: Duration) -> Self {
85        Self {
86            timeout_secs: timeout.as_secs(),
87            ..self
88        }
89    }
90
91    pub fn with_keepalive(self, keepalive_idle: Duration, keepalive_interval: Duration) -> Self {
92        Self {
93            keepalive_idle_secs: keepalive_idle.as_secs(),
94            keepalive_interval_secs: keepalive_interval.as_secs(),
95            ..self
96        }
97    }
98
99    pub async fn connect(&mut self) -> Result<BinlogStream, BinlogError> {
100        // init connect
101        let timeout_secs = if self.timeout_secs > 0 {
102            self.timeout_secs
103        } else {
104            60
105        };
106        let mut authenticator =
107            Authenticator::new(&self.url, timeout_secs, self.build_keepalive_config())?;
108        let mut channel = authenticator.connect().await?;
109
110        if self.gtid_enabled {
111            if self.gtid_set.is_empty() {
112                let (_, _, gtid_set) = CommandUtil::fetch_binlog_info(&mut channel).await?;
113                self.gtid_set = gtid_set;
114            }
115        } else {
116            // fetch binlog info
117            if self.binlog_filename.is_empty() {
118                let (binlog_filename, binlog_position, _) =
119                    CommandUtil::fetch_binlog_info(&mut channel).await?;
120                self.binlog_filename = binlog_filename;
121                self.binlog_position = binlog_position;
122            }
123
124            if self.binlog_position < MIN_BINLOG_POSITION {
125                self.binlog_position = MIN_BINLOG_POSITION;
126            }
127        }
128
129        // fetch binlog checksum
130        let binlog_checksum = CommandUtil::fetch_binlog_checksum(&mut channel).await?;
131
132        // setup connection
133        CommandUtil::setup_binlog_connection(&mut channel).await?;
134
135        if self.heartbeat_interval_secs > 0 {
136            CommandUtil::enable_heartbeat(&mut channel, self.heartbeat_interval_secs).await?;
137        }
138
139        // dump binlog
140        CommandUtil::dump_binlog(&mut channel, self).await?;
141
142        // list for binlog
143        let parser = BinlogParser {
144            checksum_length: binlog_checksum.get_length(),
145            table_map_event_by_table_id: HashMap::new(),
146        };
147
148        Ok(BinlogStream { channel, parser })
149    }
150
151    fn build_keepalive_config(&self) -> Option<KeepAliveConfig> {
152        if self.keepalive_idle_secs == 0 || self.keepalive_interval_secs == 0 {
153            return None;
154        }
155
156        Some(KeepAliveConfig {
157            keepidle_secs: self.keepalive_idle_secs,
158            keepintvl_secs: self.keepalive_interval_secs,
159        })
160    }
161}