1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166

use rowevents::parser::Parser;
use rowevents::stream::Stream;
use rowevents::event_header::EventHeader;
use rowevents::events::*;
use std::io::Result;
use std::io::{Error, ErrorKind};
extern crate regex;
use regex::Regex;

pub struct Reader {
    filename: String,
    parser: Parser,
    skip_next_event: bool,
    concerned_events: Vec<i8>,
    excluded_db_table_list: Vec<Regex>,
}

impl Reader {
    
    pub fn new(filename: &str) -> Result<Reader> {
        
        if let Some(stream) = Stream::from_file(filename) {
            let mut parser = Parser::new(stream);
            parser.read_binlog_file_header();
            Ok(Reader{
                filename: filename.to_string(),
                parser: parser,
                skip_next_event: false,
                concerned_events: Vec::with_capacity(20),
                excluded_db_table_list: Vec::with_capacity(20),
            })
        } else {
            Err(Error::new(ErrorKind::Other, "oh no!"))
        }
    }

    pub fn open_next_binlog_file(&mut self) -> bool {
        self.parser.read_next_binlog_file();
        self.parser.read_binlog_file_header()
    }
    
    #[inline]
    pub fn add_concerned_event(&mut self, event_type: i8) {
        self.concerned_events.push(event_type);
    }

    #[inline]
    pub fn is_concerned_event(&mut self, event_type: i8) -> bool  {
        self.concerned_events.len() == 0 || self.concerned_events.contains(&event_type)
    }

    pub fn add_excluded_db_table(&mut self, db_table_name: &str) {
        let regexp = db_table_name.replace(".", "\\.");
        let regexp = regexp.replace("*", "\\w*");
        self.excluded_db_table_list.push(Regex::new(&regexp).unwrap());
    }

    pub fn is_excluded(&mut self, db_name: &str, table_name: &str) -> bool {
        let db_table_name = db_name.to_string() + "." + table_name;
        for ref re in self.excluded_db_table_list.iter() {
            if re.is_match(&db_table_name) {
                return true;
            }
        }
        return false;
    }

    // ????
    pub fn read_event(&mut self) -> Result<(EventHeader, Event)> {
        if let Ok(eh) = self.read_event_header() {

            if self.skip_next_event || !self.is_concerned_event(eh.get_event_type()) {
                if let Ok(e) = self.read_unknown_event(&eh) {
                    // Recover from skip
                    self.set_skip_next_event(false);
                    Ok((eh, e))
                } else {
                    Err(Error::new(ErrorKind::Other, "oh no!"))
                }
            } else if let Ok(e) = self.read_event_detail(&eh) {
                match e {
                    Event::TableMap(ref e) => {
                        if self.is_excluded(&e.db_name, &e.table_name) {
                            // println!("Excluded {}.{}", e.db_name, e.table_name);
                            self.set_skip_next_event(true);
                        }
                    },

                    Event::Rotate(ref e) => {
                        println!("Open next binlog file...");
                        self.open_next_binlog_file();
                    },
                    _ => ()
                }

                Ok((eh, e))
                
            } else {
                Err(Error::new(ErrorKind::Other, "oh no!"))
            }
        } else {
            Err(Error::new(ErrorKind::Other, "oh no!"))
        }
    }

    #[inline]
    pub fn read_event_header(&mut self) -> Result<EventHeader> {
        self.parser.read_event_header()
    }

    pub fn read_event_detail(&mut self, eh: &EventHeader) -> Result<Event> {
        
        match eh.get_event_type() {
            QUERY_EVENT => self.parser.read_query_event(eh),

            STOP_EVENT | ROTATE_EVENT => {
                let e = self.parser.read_rotate_event(eh);
                self.open_next_binlog_file();
                e
            },

            FORMAT_DESCRIPTION_EVENT => self.parser.read_format_descriptor_event(eh),
            XID_EVENT => self.parser.read_xid_event(eh),

            TABLE_MAP_EVENT  => self.parser.read_table_map_event(eh),

            // WRITE_ROWS_EVENT  => self.parser.read_event(eh),
            // UPDATE_ROWS_EVENT  => self.parser.read_event(eh),
            // DELETE_ROWS_EVENT  => self.parser.read_event(eh),

            WRITE_ROWS_EVENT2 => self.parser.read_write_event(eh),
            UPDATE_ROWS_EVENT2 => self.parser.read_update_event(eh),
            DELETE_ROWS_EVENT2 => self.parser.read_delete_event(eh),

            _ => self.parser.read_unknown_event(eh)
        }
    }

    pub fn read_unknown_event(&mut self, eh: &EventHeader) -> Result<Event> {
        self.parser.read_unknown_event(eh)
    }

    pub fn set_skip_next_event(&mut self, skip: bool) {
        self.skip_next_event = skip;
    }

    #[inline]
    pub fn skip_next_event(&self) -> bool {
        self.skip_next_event
    }
}

impl Iterator for Reader {
    type Item = (EventHeader, Event);

    // next() is the only required method
    fn next(&mut self) -> Option<(EventHeader, Event)> {
        if let Ok((eh, e)) = self.read_event() {
            Some((eh, e))
        } else {
            None
        }
    }
}