mysqlbinlog_network/client/
sync.rs

1use crate::mysql_binlog;
2use crate::mysql_binlog::event::EventData::{EventHeader, FormatDescriptionEvent};
3use crate::mysql_binlog::event::{ChecksumAlgorithm, EVENT_HEADER_SIZE};
4use crate::mysql_binlog::table_map::TableMap;
5use crate::none;
6use crate::none_ref;
7use crate::pkg::event::Event;
8use crate::pkg::mysql_gtid::Gtid;
9use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
10use mysql::consts::Command;
11use mysql::prelude::Queryable;
12use mysql::{Conn, Opts};
13use std::error::Error;
14use std::io::{Cursor, Read, Write};
15use std::ops::{Deref, DerefMut};
16use std::result;
17use std::str::FromStr;
18pub struct OffsetConfig {
19    pub pos: Option<(String, u32)>,
20    pub gtid: Option<Gtid>,
21}
22type Result<T> = result::Result<T, Box<dyn Error>>;
23pub struct Runner {
24    conn: Conn,
25    opt: Opts,
26    server_id: u32,
27    table_map: TableMap,
28    binlog_checksum_length: usize, // if checksum , length = 4
29}
30impl Deref for Runner {
31    type Target = Conn;
32    fn deref(&self) -> &Self::Target {
33        &self.conn
34    }
35}
36impl DerefMut for Runner {
37    fn deref_mut(&mut self) -> &mut Self::Target {
38        &mut self.conn
39    }
40}
41impl Runner {
42    pub fn new(url: &str, server_id: u32) -> Result<Self> {
43        let opt = mysql::Opts::from_url(url)?;
44        let conn = mysql::Conn::new(opt.clone())?;
45        Ok(Runner {
46            conn,
47            opt,
48            server_id,
49            table_map: TableMap::new(),
50            binlog_checksum_length: 0,
51        })
52    }
53    fn prepare(&mut self) -> Result<()> {
54        self.register_slave()?;
55        self.write_register_slave_command()?;
56        self.enable_semi_sync()
57    }
58    fn register_slave(&mut self) -> Result<()> {
59        let rsl: Vec<(String, String)> =
60            self.query("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'")?;
61        if rsl.len() == 1 {
62            let (_, s) = *none_ref!(rsl.get(0));
63            if !s.is_empty() {
64                self.query_drop("SET @master_binlog_checksum='NONE'")?;
65            }
66        }
67        self.query_drop("SET @master_heartbeat_period=30000000000;")?;
68        Ok(())
69    }
70    fn write_register_slave_command(&mut self) -> Result<()> {
71        let h_name = sys_info::hostname()?;
72        let mut data = vec![0u8; 0];
73        // slave
74        data.write_u32::<LittleEndian>(self.server_id)?;
75        data.write_u8(h_name.len() as u8)?;
76        data.write_all(h_name.as_bytes())?;
77        data.write_u8(none!(self.opt.get_user()).len() as u8)?;
78        data.write_all(none!(self.opt.get_user()).as_bytes())?;
79        data.write_u8(none!(self.opt.get_pass()).len() as u8)?;
80        data.write_all(none!(self.opt.get_pass()).as_bytes())?;
81        data.write_u16::<LittleEndian>(self.opt.get_tcp_port())?;
82        data.write_u32::<LittleEndian>(0)?;
83        data.write_u32::<LittleEndian>(0)?;
84        self.write_command(Command::COM_REGISTER_SLAVE, data.as_slice())?;
85        let rsl = self.read_packet()?;
86        println!("{:?}", rsl);
87        Ok(())
88    }
89    fn enable_semi_sync(&mut self) -> Result<()> {
90        let _: Vec<(String, String)> =
91            self.query("SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled';")?;
92        self.query_drop("SET @rpl_semi_sync_slave = 1;")?;
93        Ok(())
94    }
95    pub fn start_sync(&mut self, offset: OffsetConfig) -> Result<()> {
96        if offset.pos.is_some() {
97            self.prepare()?;
98            let mut data = vec![0u8; 0];
99            data.write_u32::<LittleEndian>(none_ref!(offset.pos).1)?;
100            data.write_u16::<LittleEndian>(0)?;
101            data.write_u32::<LittleEndian>(self.server_id)?;
102            data.write_all(none_ref!(offset.pos).0.as_bytes())?;
103            self.write_command(Command::COM_BINLOG_DUMP, data.as_slice())?;
104        } else if offset.gtid.is_some() {
105            self.prepare()?;
106            let mut data = vec![0u8; 0];
107            data.write_u16::<LittleEndian>(0)?;
108            data.write_u32::<LittleEndian>(self.server_id)?;
109            data.write_u32::<LittleEndian>("".len() as u32)?;
110            data.write_all("".as_bytes())?;
111            data.write_u64::<LittleEndian>(4)?;
112            let gtid = none_ref!(offset.gtid);
113            let gtiddata = gtid.encode()?;
114            data.write_u32::<LittleEndian>(gtiddata.len() as u32)?;
115            data.write_all(gtiddata.as_slice())?;
116            self.write_command(Command::COM_BINLOG_DUMP_GTID, data.as_slice())?;
117        } else {
118            return Err(Box::from("show set gtid or fileName/offset"));
119        }
120        Ok(())
121    }
122    fn parse_event(&mut self, data: Vec<u8>) -> Result<Event> {
123        let header: mysql_binlog::event::EventData;
124        // parse Header
125        match mysql_binlog::event::EventData::parse_header(&data[1..])? {
126            Some(EventHeader {
127                timestamp,
128                event_type: typ,
129                server_id,
130                event_size,
131                log_pos,
132                flags,
133            }) => {
134                header = mysql_binlog::event::EventData::EventHeader {
135                    timestamp,
136                    event_type: typ,
137                    server_id,
138                    event_size,
139                    log_pos,
140                    flags,
141                };
142                match typ {
143                    mysql_binlog::event::TypeCode::XidEvent => {
144                        let event = mysql_binlog::event::EventData::from_data(
145                            typ,
146                            &data[EVENT_HEADER_SIZE + 1..data.len() - self.binlog_checksum_length],
147                            Some(&self.table_map),
148                        )?;
149                        Ok(Event { header, event })
150                    }
151                    mysql_binlog::event::TypeCode::RotateEvent => {
152                        let event = mysql_binlog::event::EventData::from_data(
153                            typ,
154                            &data[EVENT_HEADER_SIZE + 1..data.len() - self.binlog_checksum_length],
155                            Some(&self.table_map),
156                        )?;
157                        Ok(Event { header, event })
158                    }
159                    mysql_binlog::event::TypeCode::QueryEvent => {
160                        let event = mysql_binlog::event::EventData::from_data(
161                            typ,
162                            &data[EVENT_HEADER_SIZE + 1..data.len() - self.binlog_checksum_length],
163                            Some(&self.table_map),
164                        )?;
165                        Ok(Event { header, event })
166                    }
167                    mysql_binlog::event::TypeCode::TableMapEvent => {
168                        let event = mysql_binlog::event::EventData::from_data(
169                            typ,
170                            &data[EVENT_HEADER_SIZE + 1..data.len() - self.binlog_checksum_length],
171                            Some(&self.table_map),
172                        )?;
173                        if let Some(ref e) = event {
174                            match e {
175                                mysql_binlog::event::EventData::TableMapEvent {
176                                    table_id: d1,
177                                    schema_name: d2,
178                                    table_name: d3,
179                                    columns: d4,
180                                    ..
181                                } => self
182                                    .table_map
183                                    .handle(*d1, d2.clone(), d3.clone(), d4.clone()),
184                                _ => {
185                                    println!("nop")
186                                }
187                            }
188                        }
189                        Ok(Event { header, event })
190                    }
191                    mysql_binlog::event::TypeCode::UpdateRowsEventV2
192                    | mysql_binlog::event::TypeCode::WriteRowsEventV2
193                    | mysql_binlog::event::TypeCode::DeleteRowsEventV2 => {
194                        let event = mysql_binlog::event::EventData::from_data(
195                            typ,
196                            &data[EVENT_HEADER_SIZE + 1..data.len() - self.binlog_checksum_length],
197                            Some(&self.table_map),
198                        )?;
199                        Ok(Event { header, event })
200                    }
201                    mysql_binlog::event::TypeCode::FormatDescriptionEvent => {
202                        let event = mysql_binlog::event::EventData::from_data(
203                            typ,
204                            &data[EVENT_HEADER_SIZE + 1..data.len()],
205                            Some(&self.table_map),
206                        )?;
207                        if let Some(FormatDescriptionEvent {
208                            checksum_algorithm: ca,
209                            ..
210                        }) = event.as_ref()
211                        {
212                            match ca {
213                                ChecksumAlgorithm::None => self.binlog_checksum_length = 0,
214                                ChecksumAlgorithm::CRC32 => self.binlog_checksum_length = 4,
215                                ChecksumAlgorithm::Other(size) => {
216                                    self.binlog_checksum_length = *size as usize
217                                }
218                            }
219                        }
220                        Ok(Event { header, event })
221                    }
222                    _ => {
223                        let event = mysql_binlog::event::EventData::from_data(
224                            typ,
225                            &data[EVENT_HEADER_SIZE + 1..data.len() - self.binlog_checksum_length],
226                            Some(&self.table_map),
227                        )?;
228                        Ok(Event { header, event })
229                    }
230                }
231            }
232            None => Err(Box::from("invalid event header")),
233            _ => Err(Box::from("parse event error")),
234        }
235    }
236    fn handle_error_packet(&mut self, data: Vec<u8>) -> Result<(u16, String, String)> {
237        let mut cursor = Cursor::new(data);
238        cursor.set_position(2);
239        let code = cursor.read_u16::<LittleEndian>()?;
240        let _ = cursor.read_u8()?;
241        let mut state = String::with_capacity(5);
242        cursor.read_to_string(&mut state)?;
243        let mut message = "".to_string();
244        cursor.read_to_string(&mut message);
245        Ok((code, state, message))
246    }
247    pub fn get_event(&mut self) -> Result<Event> {
248        loop {
249            match self.read_packet() {
250                Ok(data) => match none!(data.get(0)) {
251                    &0 => return self.parse_event(data),
252                    &0xff => {
253                        let (code, state, message) = self.handle_error_packet(data)?;
254                        return Err(Box::from(format!("{}{}:{}", code, state, message)));
255                    }
256                    &_ => continue,
257                },
258                Err(e) => return Err(Box::from(e.to_string())),
259            }
260        }
261    }
262}
263
264#[test]
265fn test_conn_progress() {
266    let mut runner = Runner::new("mysql://root:123456@127.0.0.1:3306", 1111).unwrap();
267    runner.prepare().unwrap();
268    runner
269        .start_sync(OffsetConfig {
270            pos: Some(("mysql-bin.000132".to_string(), 194)),
271            gtid: None,
272        })
273        .unwrap();
274    runner.get_event();
275}