mysql_binlog_connector_rust/
binlog_client.rs1use std::{collections::HashMap, time::Duration};
2
3use crate::{
4 binlog_error::BinlogError,
5 binlog_parser::BinlogParser,
6 binlog_stream::BinlogStream,
7 command::{authenticator::Authenticator, command_util::CommandUtil},
8 network::packet_channel::KeepAliveConfig,
9};
10
11pub enum StartPosition {
12 BinlogPosition(String, u32),
13 Gtid(String),
14 Latest,
15}
16
17#[derive(Default)]
18pub struct BinlogClient {
19 pub url: String,
21 pub binlog_filename: String,
24 pub binlog_position: u32,
26 pub server_id: u64,
29 pub gtid_enabled: bool,
31 pub gtid_set: String,
34 pub heartbeat_interval_secs: u64,
38 pub timeout_secs: u64,
42
43 pub keepalive_idle_secs: u64,
47 pub keepalive_interval_secs: u64,
51}
52
53const MIN_BINLOG_POSITION: u32 = 4;
54
55impl BinlogClient {
56 pub fn new(url: &str, server_id: u64, position: StartPosition) -> Self {
57 let mut client = Self {
58 url: url.to_string(),
59 server_id,
60 timeout_secs: 60,
61 ..Default::default()
62 };
63 match position {
64 StartPosition::BinlogPosition(binlog_filename, binlog_position) => {
65 client.binlog_filename = binlog_filename.to_string();
66 client.binlog_position = binlog_position;
67 }
68 StartPosition::Gtid(gtid_set) => {
69 client.gtid_set = gtid_set.to_string();
70 client.gtid_enabled = true;
71 }
72 StartPosition::Latest => {}
73 }
74 client
75 }
76
77 pub fn with_master_heartbeat(self, heartbeat_interval: Duration) -> Self {
78 Self {
79 heartbeat_interval_secs: heartbeat_interval.as_secs(),
80 ..self
81 }
82 }
83
84 pub fn with_read_timeout(self, timeout: Duration) -> Self {
85 Self {
86 timeout_secs: timeout.as_secs(),
87 ..self
88 }
89 }
90
91 pub fn with_keepalive(self, keepalive_idle: Duration, keepalive_interval: Duration) -> Self {
92 Self {
93 keepalive_idle_secs: keepalive_idle.as_secs(),
94 keepalive_interval_secs: keepalive_interval.as_secs(),
95 ..self
96 }
97 }
98
99 pub async fn connect(&mut self) -> Result<BinlogStream, BinlogError> {
100 let timeout_secs = if self.timeout_secs > 0 {
102 self.timeout_secs
103 } else {
104 60
105 };
106 let mut authenticator =
107 Authenticator::new(&self.url, timeout_secs, self.build_keepalive_config())?;
108 let mut channel = authenticator.connect().await?;
109
110 if self.gtid_enabled {
111 if self.gtid_set.is_empty() {
112 let (_, _, gtid_set) = CommandUtil::fetch_binlog_info(&mut channel).await?;
113 self.gtid_set = gtid_set;
114 }
115 } else {
116 if self.binlog_filename.is_empty() {
118 let (binlog_filename, binlog_position, _) =
119 CommandUtil::fetch_binlog_info(&mut channel).await?;
120 self.binlog_filename = binlog_filename;
121 self.binlog_position = binlog_position;
122 }
123
124 if self.binlog_position < MIN_BINLOG_POSITION {
125 self.binlog_position = MIN_BINLOG_POSITION;
126 }
127 }
128
129 let binlog_checksum = CommandUtil::fetch_binlog_checksum(&mut channel).await?;
131
132 CommandUtil::setup_binlog_connection(&mut channel).await?;
134
135 if self.heartbeat_interval_secs > 0 {
136 CommandUtil::enable_heartbeat(&mut channel, self.heartbeat_interval_secs).await?;
137 }
138
139 CommandUtil::dump_binlog(&mut channel, self).await?;
141
142 let parser = BinlogParser {
144 checksum_length: binlog_checksum.get_length(),
145 table_map_event_by_table_id: HashMap::new(),
146 };
147
148 Ok(BinlogStream { channel, parser })
149 }
150
151 fn build_keepalive_config(&self) -> Option<KeepAliveConfig> {
152 if self.keepalive_idle_secs == 0 || self.keepalive_interval_secs == 0 {
153 return None;
154 }
155
156 Some(KeepAliveConfig {
157 keepidle_secs: self.keepalive_idle_secs,
158 keepintvl_secs: self.keepalive_interval_secs,
159 })
160 }
161}