// mysqlbinlog_network/mysql_binlog/mod.rs
use std::fs::File;
21use std::io::{Read, Seek};
22use std::path::Path;
23
24pub mod binlog_file;
25mod bit_set;
26pub mod column_types;
27pub mod errors;
28pub mod event;
29mod jsonb;
30mod packet_helpers;
31pub mod table_map;
32mod tell;
33pub mod value;
34
35use event::EventData;
36use serde_derive::Serialize;
37
38use errors::{BinlogParseError, EventParseError};
39
/// A MySQL global transaction identifier: the originating server's UUID
/// paired with the transaction's sequence number on that server.
/// Rendered (by both `Serialize` and `to_string`) as `"<uuid>:<sequence>"`.
#[derive(Debug, Clone, Copy)]
pub struct Gtid(uuid::Uuid, u64);
43
44impl serde::Serialize for Gtid {
45 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
46 where
47 S: serde::Serializer,
48 {
49 let serialized = format!("{}:{}", self.0.to_hyphenated(), self.1);
50 serializer.serialize_str(&serialized)
51 }
52}
53
54impl ToString for Gtid {
55 fn to_string(&self) -> String {
56 format!("{}:{}", self.0.to_hyphenated(), self.1)
57 }
58}
59
/// Logical-clock fields copied from a MySQL `GtidLogEvent`.
/// Both fields are always populated together (see `EventIterator::next`,
/// which only stores this when the event carries both values).
#[derive(Debug, Clone, Copy, Serialize)]
pub struct LogicalTimestamp {
    /// `last_committed` value from the GTID event header.
    last_committed: u64,
    /// `sequence_number` value from the GTID event header.
    sequence_number: u64,
}
65
/// A flattened binlog event as yielded by [`EventIterator`]: the decoded
/// event plus the GTID / logical-timestamp context of the transaction it
/// belongs to.
#[derive(Debug, Serialize)]
pub struct BinlogEvent {
    pub type_code: event::TypeCode,
    /// Event-header timestamp (Unix seconds, as stored in the binlog).
    pub timestamp: u32,
    /// GTID of the enclosing transaction, if a `GtidLogEvent` preceded this
    /// event in the stream.
    pub gtid: Option<Gtid>,
    pub logical_timestamp: Option<LogicalTimestamp>,
    /// Schema the rows belong to; `None` for query events or unknown tables.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub table_name: Option<String>,
    /// Decoded row images; empty for non-row events.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub rows: Vec<event::RowEvent>,
    /// SQL text; only set for `QueryEvent`s.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub query: Option<String>,
    /// Byte offset of this event within the binlog file.
    pub offset: u64,
}
85
/// Iterator over the events of a binlog stream.
///
/// Carries cross-event state: the table map needed to decode row events,
/// and the GTID / logical timestamp of the current transaction, which are
/// stamped onto every yielded [`BinlogEvent`].
pub struct EventIterator<BR: Read + Seek> {
    events: binlog_file::BinlogEvents<BR>,
    table_map: table_map::TableMap,
    current_gtid: Option<Gtid>,
    logical_timestamp: Option<LogicalTimestamp>,
}
93
94impl<BR: Read + Seek> EventIterator<BR> {
95 fn new(bf: binlog_file::BinlogFile<BR>, start_offset: Option<u64>) -> Self {
96 EventIterator {
97 events: bf.events(start_offset),
98 table_map: table_map::TableMap::new(),
99 current_gtid: None,
100 logical_timestamp: None,
101 }
102 }
103}
104
impl<BR: Read + Seek> Iterator for EventIterator<BR> {
    type Item = Result<BinlogEvent, EventParseError>;

    /// Pulls raw events until one is worth surfacing to the caller.
    ///
    /// GTID and table-map events are consumed internally to update iterator
    /// state; query and row events are returned as [`BinlogEvent`]s; other
    /// event types are logged to stderr and skipped.
    fn next(&mut self) -> Option<Self::Item> {
        while let Some(event) = self.events.next() {
            let event = match event {
                Ok(event) => event,
                // Propagate read/parse errors; the caller may keep iterating.
                Err(e) => return Some(Err(e)),
            };
            let offset = event.offset();
            // Decoding row events requires the table map built so far.
            match event.inner(Some(&self.table_map)) {
                Ok(Some(e)) => match e {
                    // Start of a new transaction: remember its GTID and
                    // (if present) logical timestamp for subsequent events.
                    EventData::GtidLogEvent {
                        uuid,
                        coordinate,
                        last_committed,
                        sequence_number,
                        ..
                    } => {
                        self.current_gtid = Some(Gtid(uuid, coordinate));
                        // Only keep the logical timestamp when both halves
                        // are present; otherwise clear any stale value.
                        if let (Some(last_committed), Some(sequence_number)) =
                            (last_committed, sequence_number)
                        {
                            self.logical_timestamp = Some(LogicalTimestamp {
                                last_committed,
                                sequence_number,
                            });
                        } else {
                            self.logical_timestamp = None;
                        }
                    }
                    // Record the table definition so later row events for
                    // this table_id can be decoded; not surfaced to callers.
                    EventData::TableMapEvent {
                        table_id,
                        schema_name,
                        table_name,
                        columns,
                        ..
                    } => {
                        self.table_map
                            .handle(table_id, schema_name, table_name, columns);
                    }
                    // Statement-based event: surface the SQL text.
                    EventData::QueryEvent { query, .. } => {
                        return Some(Ok(BinlogEvent {
                            offset,
                            type_code: event.type_code(),
                            timestamp: event.timestamp(),
                            gtid: self.current_gtid,
                            logical_timestamp: self.logical_timestamp,
                            table_name: None,
                            schema_name: None,
                            rows: Vec::new(),
                            query: Some(query),
                        }))
                    }
                    // Row-based events: attach schema/table names from the
                    // table map when the table_id is known.
                    EventData::WriteRowsEvent { table_id, rows }
                    | EventData::UpdateRowsEvent { table_id, rows }
                    | EventData::DeleteRowsEvent { table_id, rows } => {
                        let maybe_table = self.table_map.get(table_id);
                        let message = BinlogEvent {
                            offset,
                            type_code: event.type_code(),
                            timestamp: event.timestamp(),
                            gtid: self.current_gtid,
                            logical_timestamp: self.logical_timestamp,
                            table_name: maybe_table.as_ref().map(|a| a.table_name.to_owned()),
                            schema_name: maybe_table.as_ref().map(|a| a.schema_name.to_owned()),
                            rows,
                            query: None,
                        };
                        return Some(Ok(message));
                    }
                    // Decoded but unhandled variants are logged and skipped.
                    u => {
                        eprintln!("unhandled event: {:?}", u);
                    }
                },
                Ok(None) => {
                    // Event types the parser deliberately ignores.
                }
                Err(e) => return Some(Err(e)),
            }
        }
        // Underlying stream exhausted.
        None
    }
}
190
/// Builder for configuring binlog parsing (currently just the starting
/// byte offset) before constructing an [`EventIterator`].
pub struct BinlogFileParserBuilder<BR: Read + Seek> {
    bf: binlog_file::BinlogFile<BR>,
    start_position: Option<u64>,
}
196
197impl BinlogFileParserBuilder<File> {
198 pub fn try_from_path<P: AsRef<Path>>(file_name: P) -> Result<Self, BinlogParseError> {
200 let bf = binlog_file::BinlogFile::try_from_path(file_name.as_ref())?;
201 Ok(BinlogFileParserBuilder {
202 bf: bf,
203 start_position: None,
204 })
205 }
206}
207
208impl<BR: Read + Seek> BinlogFileParserBuilder<BR> {
209 pub fn try_from_reader(r: BR) -> Result<Self, BinlogParseError> {
211 let bf = binlog_file::BinlogFile::try_from_reader(r)?;
212 Ok(BinlogFileParserBuilder {
213 bf: bf,
214 start_position: None,
215 })
216 }
217
218 pub fn start_position(mut self, pos: u64) -> Self {
222 self.start_position = Some(pos);
223 self
224 }
225
226 pub fn build(self) -> EventIterator<BR> {
228 EventIterator::new(self.bf, self.start_position)
229 }
230}
231
232pub fn parse_reader<R: Read + Seek + 'static>(r: R) -> Result<EventIterator<R>, BinlogParseError> {
239 BinlogFileParserBuilder::try_from_reader(r).map(|b| b.build())
240}
241
242pub fn parse_file<P: AsRef<Path>>(file_name: P) -> Result<EventIterator<File>, BinlogParseError> {
249 BinlogFileParserBuilder::try_from_path(file_name).map(|b| b.build())
250}
251
#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;

    use bigdecimal::BigDecimal;

    use super::{parse_file, parse_reader};
    // Import each name exactly once, via `super`, so the paths resolve
    // regardless of where this module sits in the crate hierarchy.
    // (The previous duplicate `use crate::...` / `use crate::mysql_binlog::...`
    // pairs imported `TypeCode` and `MySQLValue` twice — a compile error.)
    use super::event::TypeCode;
    use super::value::MySQLValue;

    #[test]
    fn test_parse_file() {
        let results = parse_file("test_data/bin-log.000001")
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(results.len(), 5);
        assert_eq!(results[0].type_code, TypeCode::QueryEvent);
        assert_eq!(results[0].query, Some("CREATE TABLE foo(id BIGINT AUTO_INCREMENT PRIMARY KEY, val_decimal DECIMAL(10, 5) NOT NULL, comment VARCHAR(255) NOT NULL)".to_owned()));
        assert_eq!(results[2].timestamp, 1550192291);
        assert_eq!(
            results[2].gtid.unwrap().to_string(),
            "87cee3a4-6b31-11e7-bdfd-0d98d6698870:14918"
        );
        assert_eq!(
            results[2].schema_name.as_ref().map(|s| s.as_str()),
            Some("bltest")
        );
        assert_eq!(
            results[2].table_name.as_ref().map(|s| s.as_str()),
            Some("foo")
        );
        let cols = results[2].rows[0].cols().unwrap();
        assert_matches!(cols[0], Some(MySQLValue::SignedInteger(1)));
        assert_matches!(cols[1], Some(MySQLValue::Decimal(_)));
        if let Some(MySQLValue::Decimal(ref d)) = cols[1] {
            assert_eq!(*d, "0.1".parse::<BigDecimal>().unwrap());
        }
        assert_matches!(cols[2], Some(MySQLValue::String(_)));
    }

    #[test]
    fn test_parse_reader() {
        let f = std::fs::File::open("test_data/bin-log.000001").unwrap();
        let results = parse_reader(f)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(results.len(), 5);
        assert_eq!(results[0].type_code, TypeCode::QueryEvent);
        assert_eq!(results[0].query, Some("CREATE TABLE foo(id BIGINT AUTO_INCREMENT PRIMARY KEY, val_decimal DECIMAL(10, 5) NOT NULL, comment VARCHAR(255) NOT NULL)".to_owned()));
    }
}