mysql_binlog_connector_rust/
binlog_client.rs1use std::collections::HashMap;
2
3use crate::{
4 binlog_error::BinlogError,
5 binlog_parser::BinlogParser,
6 binlog_stream::BinlogStream,
7 command::{authenticator::Authenticator, command_util::CommandUtil},
8};
9
10#[derive(Default)]
11pub struct BinlogClient {
12 pub url: String,
14 pub binlog_filename: String,
17 pub binlog_position: u32,
19 pub server_id: u64,
22 pub gtid_enabled: bool,
24 pub gtid_set: String,
27 pub heartbeat_interval_secs: u64,
31 pub timeout_secs: u64,
35}
36
37const MIN_BINLOG_POSITION: u32 = 4;
38
39impl BinlogClient {
40 pub async fn connect(&mut self) -> Result<BinlogStream, BinlogError> {
41 let timeout_secs = if self.timeout_secs > 0 {
43 self.timeout_secs
44 } else {
45 60
46 };
47 let mut authenticator = Authenticator::new(&self.url, timeout_secs)?;
48 let mut channel = authenticator.connect().await?;
49
50 if self.gtid_enabled {
51 if self.gtid_set.is_empty() {
52 let (_, _, gtid_set) = CommandUtil::fetch_binlog_info(&mut channel).await?;
53 self.gtid_set = gtid_set;
54 }
55 } else {
56 if self.binlog_filename.is_empty() {
58 let (binlog_filename, binlog_position, _) =
59 CommandUtil::fetch_binlog_info(&mut channel).await?;
60 self.binlog_filename = binlog_filename;
61 self.binlog_position = binlog_position;
62 }
63
64 if self.binlog_position < MIN_BINLOG_POSITION {
65 self.binlog_position = MIN_BINLOG_POSITION;
66 }
67 }
68
69 let binlog_checksum = CommandUtil::fetch_binlog_checksum(&mut channel).await?;
71
72 CommandUtil::setup_binlog_connection(&mut channel).await?;
74
75 if self.heartbeat_interval_secs > 0 {
76 CommandUtil::enable_heartbeat(&mut channel, self.heartbeat_interval_secs).await?;
77 }
78
79 CommandUtil::dump_binlog(&mut channel, self).await?;
81
82 let parser = BinlogParser {
84 checksum_length: binlog_checksum.get_length(),
85 table_map_event_by_table_id: HashMap::new(),
86 };
87
88 Ok(BinlogStream { channel, parser })
89 }
90}