mysqlbinlog_network/mysql_binlog/
mod.rs

//! Parser for the MySQL binary log format.
//!
//! # Limitations
//!
//! - Targets Percona and Oracle MySQL 5.6 and 5.7. It has not been tested with MariaDB, MySQL 8.0, or older versions of MySQL.
//! - As with other MySQL 5.6/5.7 implementations, `UNSIGNED BIGINT` values between `2^63` and `2^64` cannot be represented safely, because `i64` is used internally for all integral data types.
//!
//! # Example
//!
//! A simple command-line event parser and printer:
//!
//! ```no_run
//! fn main() {
//!     for event in crate::mysql_binlog::parse_file("bin-log.000001").unwrap() {
//!         println!("{:?}", event.unwrap());
//!     }
//! }
//! ```

use std::fs::File;
use std::io::{Read, Seek};
use std::path::Path;

pub mod binlog_file;
mod bit_set;
pub mod column_types;
pub mod errors;
pub mod event;
mod jsonb;
mod packet_helpers;
pub mod table_map;
mod tell;
pub mod value;

use event::EventData;
use serde_derive::Serialize;

use errors::{BinlogParseError, EventParseError};

#[derive(Debug, Clone, Copy)]
/// Global Transaction ID, rendered as `"<source_uuid>:<transaction_id>"` when
/// serialized or converted to a string.
pub struct Gtid(uuid::Uuid, u64);

impl serde::Serialize for Gtid {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        let serialized = format!("{}:{}", self.0.to_hyphenated(), self.1);
        serializer.serialize_str(&serialized)
    }
}

impl std::fmt::Display for Gtid {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}:{}", self.0.to_hyphenated(), self.1)
    }
}

#[derive(Debug, Clone, Copy, Serialize)]
pub struct LogicalTimestamp {
    last_committed: u64,
    sequence_number: u64,
}

#[derive(Debug, Serialize)]
/// A binlog event as returned by [`EventIterator`]. Filters out internal events
/// like the TableMapEvent and simplifies mapping GTIDs to individual events.
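///
/// A sketch of typical field access (the file name is hypothetical):
///
/// ```no_run
/// for event in crate::mysql_binlog::parse_file("bin-log.000001").unwrap() {
///     let event = event.unwrap();
///     // `gtid` is carried over from the GtidLogEvent preceding this event, if any
///     println!("{:?} {:?} {:?}", event.type_code, event.gtid, event.table_name);
/// }
/// ```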
pub struct BinlogEvent {
    pub type_code: event::TypeCode,
    // warning: Y2038 Problem ahead
    pub timestamp: u32,
    pub gtid: Option<Gtid>,
    pub logical_timestamp: Option<LogicalTimestamp>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub schema_name: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub table_name: Option<String>,
    #[serde(skip_serializing_if = "Vec::is_empty")]
    pub rows: Vec<event::RowEvent>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub query: Option<String>,
    pub offset: u64,
}

/// Iterator over [`BinlogEvent`]s
pub struct EventIterator<BR: Read + Seek> {
    events: binlog_file::BinlogEvents<BR>,
    table_map: table_map::TableMap,
    current_gtid: Option<Gtid>,
    logical_timestamp: Option<LogicalTimestamp>,
}

impl<BR: Read + Seek> EventIterator<BR> {
    fn new(bf: binlog_file::BinlogFile<BR>, start_offset: Option<u64>) -> Self {
        EventIterator {
            events: bf.events(start_offset),
            table_map: table_map::TableMap::new(),
            current_gtid: None,
            logical_timestamp: None,
        }
    }
}

impl<BR: Read + Seek> Iterator for EventIterator<BR> {
    type Item = Result<BinlogEvent, EventParseError>;

    fn next(&mut self) -> Option<Self::Item> {
        while let Some(event) = self.events.next() {
            let event = match event {
                Ok(event) => event,
                Err(e) => return Some(Err(e)),
            };
            let offset = event.offset();
            match event.inner(Some(&self.table_map)) {
                Ok(Some(e)) => match e {
                    // Remember the GTID and logical timestamp so they can be attached
                    // to the row/query events that follow in this transaction.
                    EventData::GtidLogEvent {
                        uuid,
                        coordinate,
                        last_committed,
                        sequence_number,
                        ..
                    } => {
                        self.current_gtid = Some(Gtid(uuid, coordinate));
                        if let (Some(last_committed), Some(sequence_number)) =
                            (last_committed, sequence_number)
                        {
                            self.logical_timestamp = Some(LogicalTimestamp {
                                last_committed,
                                sequence_number,
                            });
                        } else {
                            self.logical_timestamp = None;
                        }
                    }
                    // Record the table metadata; later row events refer to it by table_id.
                    EventData::TableMapEvent {
                        table_id,
                        schema_name,
                        table_name,
                        columns,
                        ..
                    } => {
                        self.table_map
                            .handle(table_id, schema_name, table_name, columns);
                    }
                    EventData::QueryEvent { query, .. } => {
                        return Some(Ok(BinlogEvent {
                            offset,
                            type_code: event.type_code(),
                            timestamp: event.timestamp(),
                            gtid: self.current_gtid,
                            logical_timestamp: self.logical_timestamp,
                            table_name: None,
                            schema_name: None,
                            rows: Vec::new(),
                            query: Some(query),
                        }))
                    }
                    // Row events only carry a table_id; resolve the schema and table
                    // names from the table map built up by earlier TableMapEvents.
                    EventData::WriteRowsEvent { table_id, rows }
                    | EventData::UpdateRowsEvent { table_id, rows }
                    | EventData::DeleteRowsEvent { table_id, rows } => {
                        let maybe_table = self.table_map.get(table_id);
                        let message = BinlogEvent {
                            offset,
                            type_code: event.type_code(),
                            timestamp: event.timestamp(),
                            gtid: self.current_gtid,
                            logical_timestamp: self.logical_timestamp,
                            table_name: maybe_table.as_ref().map(|a| a.table_name.to_owned()),
                            schema_name: maybe_table.as_ref().map(|a| a.schema_name.to_owned()),
                            rows,
                            query: None,
                        };
                        return Some(Ok(message));
                    }
                    u => {
                        eprintln!("unhandled event: {:?}", u);
                    }
                },
                Ok(None) => {
                    // this event doesn't have an inner type, which means we don't currently
                    // care about it. Example: PreviousGtidEvent
                }
                Err(e) => return Some(Err(e)),
            }
        }
        None
    }
}

/// Builder to configure Binary Log reading
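///
/// ## Example
///
/// A minimal sketch; the file name and byte offset are hypothetical:
///
/// ```no_run
/// let events = crate::mysql_binlog::BinlogFileParserBuilder::try_from_path("bin-log.000001")
///     .unwrap()
///     .start_position(1234)
///     .build();
/// for event in events {
///     println!("{:?}", event.unwrap());
/// }
/// ```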
pub struct BinlogFileParserBuilder<BR: Read + Seek> {
    bf: binlog_file::BinlogFile<BR>,
    start_position: Option<u64>,
}

impl BinlogFileParserBuilder<File> {
    /// Construct a new BinlogFileParserBuilder from some path
    pub fn try_from_path<P: AsRef<Path>>(file_name: P) -> Result<Self, BinlogParseError> {
        let bf = binlog_file::BinlogFile::try_from_path(file_name.as_ref())?;
        Ok(BinlogFileParserBuilder {
            bf,
            start_position: None,
        })
    }
}

impl<BR: Read + Seek> BinlogFileParserBuilder<BR> {
    /// Construct a new BinlogFileParserBuilder from some object implementing Read and Seek
    pub fn try_from_reader(r: BR) -> Result<Self, BinlogParseError> {
        let bf = binlog_file::BinlogFile::try_from_reader(r)?;
        Ok(BinlogFileParserBuilder {
            bf,
            start_position: None,
        })
    }

    /// Set the byte offset at which to begin emitting events. NOTE: the beginning of the binlog
    /// is always read first for the Format Descriptor Event (FDE). NOTE: column mappings may be
    /// incorrect when starting mid-file, because earlier TableMapEvents (TMEs) may be missed.
    pub fn start_position(mut self, pos: u64) -> Self {
        self.start_position = Some(pos);
        self
    }

    /// Consume this builder, returning an iterator of [`BinlogEvent`] structs
    pub fn build(self) -> EventIterator<BR> {
        EventIterator::new(self.bf, self.start_position)
    }
}

/// Parse events from an object implementing the [`std::io::Read`] and [`std::io::Seek`] traits
///
/// ## Errors
///
/// - returns an immediate error if the reader does not begin with a valid Format Descriptor Event
/// - each call to the iterator can return an error if there is an I/O or parsing error
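///
/// ## Example
///
/// A sketch using a [`std::fs::File`]; any `Read + Seek` source (e.g. a `Cursor`) works the same way:
///
/// ```no_run
/// let file = std::fs::File::open("bin-log.000001").unwrap();
/// for event in crate::mysql_binlog::parse_reader(file).unwrap() {
///     println!("{:?}", event.unwrap());
/// }
/// ```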
pub fn parse_reader<R: Read + Seek + 'static>(r: R) -> Result<EventIterator<R>, BinlogParseError> {
    BinlogFileParserBuilder::try_from_reader(r).map(|b| b.build())
}

/// Parse all events in the file at the given path
///
/// ## Errors
///
/// - returns an immediate error if the file could not be opened or if it does not contain a valid Format Descriptor Event
/// - each call to the iterator can return an error if there is an I/O or parsing error
pub fn parse_file<P: AsRef<Path>>(file_name: P) -> Result<EventIterator<File>, BinlogParseError> {
    BinlogFileParserBuilder::try_from_path(file_name).map(|b| b.build())
}

#[cfg(test)]
mod tests {
    use assert_matches::assert_matches;

    use bigdecimal::BigDecimal;

    use super::{parse_file, parse_reader};
    use crate::mysql_binlog::event::TypeCode;
    use crate::mysql_binlog::value::MySQLValue;

    #[test]
    fn test_parse_file() {
        let results = parse_file("test_data/bin-log.000001")
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(results.len(), 5);
        assert_eq!(results[0].type_code, TypeCode::QueryEvent);
        assert_eq!(results[0].query, Some("CREATE TABLE foo(id BIGINT AUTO_INCREMENT PRIMARY KEY, val_decimal DECIMAL(10, 5) NOT NULL, comment VARCHAR(255) NOT NULL)".to_owned()));
        assert_eq!(results[2].timestamp, 1550192291);
        assert_eq!(
            results[2].gtid.unwrap().to_string(),
            "87cee3a4-6b31-11e7-bdfd-0d98d6698870:14918"
        );
        assert_eq!(
            results[2].schema_name.as_ref().map(|s| s.as_str()),
            Some("bltest")
        );
        assert_eq!(
            results[2].table_name.as_ref().map(|s| s.as_str()),
            Some("foo")
        );
        let cols = results[2].rows[0].cols().unwrap();
        assert_matches!(cols[0], Some(MySQLValue::SignedInteger(1)));
        assert_matches!(cols[1], Some(MySQLValue::Decimal(_)));
        if let Some(MySQLValue::Decimal(ref d)) = cols[1] {
            assert_eq!(*d, "0.1".parse::<BigDecimal>().unwrap());
        }
        assert_matches!(cols[2], Some(MySQLValue::String(_)));
    }

    #[test]
    fn test_parse_reader() {
        let f = std::fs::File::open("test_data/bin-log.000001").unwrap();
        let results = parse_reader(f)
            .unwrap()
            .collect::<Result<Vec<_>, _>>()
            .unwrap();
        assert_eq!(results.len(), 5);
        assert_eq!(results[0].type_code, TypeCode::QueryEvent);
        assert_eq!(results[0].query, Some("CREATE TABLE foo(id BIGINT AUTO_INCREMENT PRIMARY KEY, val_decimal DECIMAL(10, 5) NOT NULL, comment VARCHAR(255) NOT NULL)".to_owned()));
    }
}