mysql_binlog_connector_rust/
binlog_client.rs1use std::collections::HashMap;
2
3use crate::{
4 binlog_error::BinlogError,
5 binlog_parser::BinlogParser,
6 binlog_stream::BinlogStream,
7 command::{authenticator::Authenticator, command_util::CommandUtil},
8};
9
10#[derive(Default)]
11pub struct BinlogClient {
12 pub url: String,
13 pub binlog_filename: String,
14 pub binlog_position: u32,
15 pub server_id: u64,
16 pub gtid_enabled: bool,
17 pub gtid_set: String,
18 pub heartbeat_interval_secs: u64,
19}
20
21const MIN_BINLOG_POSITION: u32 = 4;
22
23impl BinlogClient {
24 pub async fn connect(&mut self) -> Result<BinlogStream, BinlogError> {
25 let mut authenticator = Authenticator::new(&self.url)?;
27 let mut channel = authenticator.connect().await?;
28
29 if self.gtid_enabled {
30 if self.gtid_set.is_empty() {
31 let (_, _, gtid_set) = CommandUtil::fetch_binlog_info(&mut channel).await?;
32 self.gtid_set = gtid_set;
33 }
34 } else {
35 if self.binlog_filename.is_empty() {
37 let (binlog_filename, binlog_position, _) =
38 CommandUtil::fetch_binlog_info(&mut channel).await?;
39 self.binlog_filename = binlog_filename;
40 self.binlog_position = binlog_position;
41 }
42
43 if self.binlog_position < MIN_BINLOG_POSITION {
44 self.binlog_position = MIN_BINLOG_POSITION;
45 }
46 }
47
48 let binlog_checksum = CommandUtil::fetch_binlog_checksum(&mut channel).await?;
50
51 CommandUtil::setup_binlog_connection(&mut channel).await?;
53
54 if self.heartbeat_interval_secs > 0 {
55 CommandUtil::enable_heartbeat(&mut channel, self.heartbeat_interval_secs).await?;
56 }
57
58 CommandUtil::dump_binlog(&mut channel, self).await?;
60
61 let parser = BinlogParser {
63 checksum_length: binlog_checksum.get_length(),
64 table_map_event_by_table_id: HashMap::new(),
65 };
66
67 Ok(BinlogStream { channel, parser })
68 }
69}