// mysqlbinlog_network/client/sync.rs
use crate::mysql_binlog;
2use crate::mysql_binlog::event::EventData::{EventHeader, FormatDescriptionEvent};
3use crate::mysql_binlog::event::{ChecksumAlgorithm, EVENT_HEADER_SIZE};
4use crate::mysql_binlog::table_map::TableMap;
5use crate::none;
6use crate::none_ref;
7use crate::pkg::event::Event;
8use crate::pkg::mysql_gtid::Gtid;
9use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
10use mysql::consts::Command;
11use mysql::prelude::Queryable;
12use mysql::{Conn, Opts};
13use std::error::Error;
14use std::io::{Cursor, Read, Write};
15use std::ops::{Deref, DerefMut};
16use std::result;
17use std::str::FromStr;
18pub struct OffsetConfig {
19 pub pos: Option<(String, u32)>,
20 pub gtid: Option<Gtid>,
21}
/// Result type used throughout this module: any error is boxed as a
/// `dyn Error`, so `?` can propagate errors of different concrete types.
type Result<T> = result::Result<T, Box<dyn Error>>;
23pub struct Runner {
24 conn: Conn,
25 opt: Opts,
26 server_id: u32,
27 table_map: TableMap,
28 binlog_checksum_length: usize, }
30impl Deref for Runner {
31 type Target = Conn;
32 fn deref(&self) -> &Self::Target {
33 &self.conn
34 }
35}
36impl DerefMut for Runner {
37 fn deref_mut(&mut self) -> &mut Self::Target {
38 &mut self.conn
39 }
40}
41impl Runner {
42 pub fn new(url: &str, server_id: u32) -> Result<Self> {
43 let opt = mysql::Opts::from_url(url)?;
44 let conn = mysql::Conn::new(opt.clone())?;
45 Ok(Runner {
46 conn,
47 opt,
48 server_id,
49 table_map: TableMap::new(),
50 binlog_checksum_length: 0,
51 })
52 }
53 fn prepare(&mut self) -> Result<()> {
54 self.register_slave()?;
55 self.write_register_slave_command()?;
56 self.enable_semi_sync()
57 }
58 fn register_slave(&mut self) -> Result<()> {
59 let rsl: Vec<(String, String)> =
60 self.query("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'")?;
61 if rsl.len() == 1 {
62 let (_, s) = *none_ref!(rsl.get(0));
63 if !s.is_empty() {
64 self.query_drop("SET @master_binlog_checksum='NONE'")?;
65 }
66 }
67 self.query_drop("SET @master_heartbeat_period=30000000000;")?;
68 Ok(())
69 }
70 fn write_register_slave_command(&mut self) -> Result<()> {
71 let h_name = sys_info::hostname()?;
72 let mut data = vec![0u8; 0];
73 data.write_u32::<LittleEndian>(self.server_id)?;
75 data.write_u8(h_name.len() as u8)?;
76 data.write_all(h_name.as_bytes())?;
77 data.write_u8(none!(self.opt.get_user()).len() as u8)?;
78 data.write_all(none!(self.opt.get_user()).as_bytes())?;
79 data.write_u8(none!(self.opt.get_pass()).len() as u8)?;
80 data.write_all(none!(self.opt.get_pass()).as_bytes())?;
81 data.write_u16::<LittleEndian>(self.opt.get_tcp_port())?;
82 data.write_u32::<LittleEndian>(0)?;
83 data.write_u32::<LittleEndian>(0)?;
84 self.write_command(Command::COM_REGISTER_SLAVE, data.as_slice())?;
85 let rsl = self.read_packet()?;
86 println!("{:?}", rsl);
87 Ok(())
88 }
89 fn enable_semi_sync(&mut self) -> Result<()> {
90 let _: Vec<(String, String)> =
91 self.query("SHOW VARIABLES LIKE 'rpl_semi_sync_master_enabled';")?;
92 self.query_drop("SET @rpl_semi_sync_slave = 1;")?;
93 Ok(())
94 }
95 pub fn start_sync(&mut self, offset: OffsetConfig) -> Result<()> {
96 if offset.pos.is_some() {
97 self.prepare()?;
98 let mut data = vec![0u8; 0];
99 data.write_u32::<LittleEndian>(none_ref!(offset.pos).1)?;
100 data.write_u16::<LittleEndian>(0)?;
101 data.write_u32::<LittleEndian>(self.server_id)?;
102 data.write_all(none_ref!(offset.pos).0.as_bytes())?;
103 self.write_command(Command::COM_BINLOG_DUMP, data.as_slice())?;
104 } else if offset.gtid.is_some() {
105 self.prepare()?;
106 let mut data = vec![0u8; 0];
107 data.write_u16::<LittleEndian>(0)?;
108 data.write_u32::<LittleEndian>(self.server_id)?;
109 data.write_u32::<LittleEndian>("".len() as u32)?;
110 data.write_all("".as_bytes())?;
111 data.write_u64::<LittleEndian>(4)?;
112 let gtid = none_ref!(offset.gtid);
113 let gtiddata = gtid.encode()?;
114 data.write_u32::<LittleEndian>(gtiddata.len() as u32)?;
115 data.write_all(gtiddata.as_slice())?;
116 self.write_command(Command::COM_BINLOG_DUMP_GTID, data.as_slice())?;
117 } else {
118 return Err(Box::from("show set gtid or fileName/offset"));
119 }
120 Ok(())
121 }
122 fn parse_event(&mut self, data: Vec<u8>) -> Result<Event> {
123 let header: mysql_binlog::event::EventData;
124 match mysql_binlog::event::EventData::parse_header(&data[1..])? {
126 Some(EventHeader {
127 timestamp,
128 event_type: typ,
129 server_id,
130 event_size,
131 log_pos,
132 flags,
133 }) => {
134 header = mysql_binlog::event::EventData::EventHeader {
135 timestamp,
136 event_type: typ,
137 server_id,
138 event_size,
139 log_pos,
140 flags,
141 };
142 match typ {
143 mysql_binlog::event::TypeCode::XidEvent => {
144 let event = mysql_binlog::event::EventData::from_data(
145 typ,
146 &data[EVENT_HEADER_SIZE + 1..data.len() - self.binlog_checksum_length],
147 Some(&self.table_map),
148 )?;
149 Ok(Event { header, event })
150 }
151 mysql_binlog::event::TypeCode::RotateEvent => {
152 let event = mysql_binlog::event::EventData::from_data(
153 typ,
154 &data[EVENT_HEADER_SIZE + 1..data.len() - self.binlog_checksum_length],
155 Some(&self.table_map),
156 )?;
157 Ok(Event { header, event })
158 }
159 mysql_binlog::event::TypeCode::QueryEvent => {
160 let event = mysql_binlog::event::EventData::from_data(
161 typ,
162 &data[EVENT_HEADER_SIZE + 1..data.len() - self.binlog_checksum_length],
163 Some(&self.table_map),
164 )?;
165 Ok(Event { header, event })
166 }
167 mysql_binlog::event::TypeCode::TableMapEvent => {
168 let event = mysql_binlog::event::EventData::from_data(
169 typ,
170 &data[EVENT_HEADER_SIZE + 1..data.len() - self.binlog_checksum_length],
171 Some(&self.table_map),
172 )?;
173 if let Some(ref e) = event {
174 match e {
175 mysql_binlog::event::EventData::TableMapEvent {
176 table_id: d1,
177 schema_name: d2,
178 table_name: d3,
179 columns: d4,
180 ..
181 } => self
182 .table_map
183 .handle(*d1, d2.clone(), d3.clone(), d4.clone()),
184 _ => {
185 println!("nop")
186 }
187 }
188 }
189 Ok(Event { header, event })
190 }
191 mysql_binlog::event::TypeCode::UpdateRowsEventV2
192 | mysql_binlog::event::TypeCode::WriteRowsEventV2
193 | mysql_binlog::event::TypeCode::DeleteRowsEventV2 => {
194 let event = mysql_binlog::event::EventData::from_data(
195 typ,
196 &data[EVENT_HEADER_SIZE + 1..data.len() - self.binlog_checksum_length],
197 Some(&self.table_map),
198 )?;
199 Ok(Event { header, event })
200 }
201 mysql_binlog::event::TypeCode::FormatDescriptionEvent => {
202 let event = mysql_binlog::event::EventData::from_data(
203 typ,
204 &data[EVENT_HEADER_SIZE + 1..data.len()],
205 Some(&self.table_map),
206 )?;
207 if let Some(FormatDescriptionEvent {
208 checksum_algorithm: ca,
209 ..
210 }) = event.as_ref()
211 {
212 match ca {
213 ChecksumAlgorithm::None => self.binlog_checksum_length = 0,
214 ChecksumAlgorithm::CRC32 => self.binlog_checksum_length = 4,
215 ChecksumAlgorithm::Other(size) => {
216 self.binlog_checksum_length = *size as usize
217 }
218 }
219 }
220 Ok(Event { header, event })
221 }
222 _ => {
223 let event = mysql_binlog::event::EventData::from_data(
224 typ,
225 &data[EVENT_HEADER_SIZE + 1..data.len() - self.binlog_checksum_length],
226 Some(&self.table_map),
227 )?;
228 Ok(Event { header, event })
229 }
230 }
231 }
232 None => Err(Box::from("invalid event header")),
233 _ => Err(Box::from("parse event error")),
234 }
235 }
236 fn handle_error_packet(&mut self, data: Vec<u8>) -> Result<(u16, String, String)> {
237 let mut cursor = Cursor::new(data);
238 cursor.set_position(2);
239 let code = cursor.read_u16::<LittleEndian>()?;
240 let _ = cursor.read_u8()?;
241 let mut state = String::with_capacity(5);
242 cursor.read_to_string(&mut state)?;
243 let mut message = "".to_string();
244 cursor.read_to_string(&mut message);
245 Ok((code, state, message))
246 }
247 pub fn get_event(&mut self) -> Result<Event> {
248 loop {
249 match self.read_packet() {
250 Ok(data) => match none!(data.get(0)) {
251 &0 => return self.parse_event(data),
252 &0xff => {
253 let (code, state, message) = self.handle_error_packet(data)?;
254 return Err(Box::from(format!("{}{}:{}", code, state, message)));
255 }
256 &_ => continue,
257 },
258 Err(e) => return Err(Box::from(e.to_string())),
259 }
260 }
261 }
262}
263
// Smoke test against a live server: connects, starts a position-based dump
// and reads one event. It needs a MySQL server on 127.0.0.1:3306 that
// accepts the credentials below and has the named binlog file.
//
// `start_sync` runs `prepare` itself, so it is not called separately here.
// The event result is unwrapped so a failed read fails the test instead of
// being dropped.
#[test]
fn test_conn_progress() {
    let mut runner = Runner::new("mysql://root:123456@127.0.0.1:3306", 1111).unwrap();
    runner
        .start_sync(OffsetConfig {
            pos: Some(("mysql-bin.000132".to_string(), 194)),
            gtid: None,
        })
        .unwrap();
    runner.get_event().unwrap();
}