mysql_binlog_connector_rust/
binlog_client.rs

1use std::collections::HashMap;
2
3use crate::{
4    binlog_error::BinlogError,
5    binlog_parser::BinlogParser,
6    binlog_stream::BinlogStream,
7    command::{authenticator::Authenticator, command_util::CommandUtil},
8};
9
10#[derive(Default)]
11pub struct BinlogClient {
12    pub url: String,
13    pub binlog_filename: String,
14    pub binlog_position: u32,
15    pub server_id: u64,
16    pub gtid_enabled: bool,
17    pub gtid_set: String,
18    pub heartbeat_interval_secs: u64,
19}
20
21const MIN_BINLOG_POSITION: u32 = 4;
22
23impl BinlogClient {
24    pub async fn connect(&mut self) -> Result<BinlogStream, BinlogError> {
25        // init connect
26        let mut authenticator = Authenticator::new(&self.url)?;
27        let mut channel = authenticator.connect().await?;
28
29        if self.gtid_enabled {
30            if self.gtid_set.is_empty() {
31                let (_, _, gtid_set) = CommandUtil::fetch_binlog_info(&mut channel).await?;
32                self.gtid_set = gtid_set;
33            }
34        } else {
35            // fetch binlog info
36            if self.binlog_filename.is_empty() {
37                let (binlog_filename, binlog_position, _) =
38                    CommandUtil::fetch_binlog_info(&mut channel).await?;
39                self.binlog_filename = binlog_filename;
40                self.binlog_position = binlog_position;
41            }
42
43            if self.binlog_position < MIN_BINLOG_POSITION {
44                self.binlog_position = MIN_BINLOG_POSITION;
45            }
46        }
47
48        // fetch binlog checksum
49        let binlog_checksum = CommandUtil::fetch_binlog_checksum(&mut channel).await?;
50
51        // setup connection
52        CommandUtil::setup_binlog_connection(&mut channel).await?;
53
54        if self.heartbeat_interval_secs > 0 {
55            CommandUtil::enable_heartbeat(&mut channel, self.heartbeat_interval_secs).await?;
56        }
57
58        // dump binlog
59        CommandUtil::dump_binlog(&mut channel, self).await?;
60
61        // list for binlog
62        let parser = BinlogParser {
63            checksum_length: binlog_checksum.get_length(),
64            table_map_event_by_table_id: HashMap::new(),
65        };
66
67        Ok(BinlogStream { channel, parser })
68    }
69}