mysqlbinlog_network/mysql_binlog/
binlog_file.rs

1use std::fs::File;
2use std::io::{self, Read, Seek};
3use std::path::{Path, PathBuf};
4
5use crate::mysql_binlog::errors::{BinlogParseError, EventParseError};
6use crate::mysql_binlog::event::{Event, TypeCode};
7
8/// Low level wrapper around a single Binlog file. Use this if you
9/// want to introspect all events (including internal events like the FDE
10/// and TME)
11pub struct BinlogFile<I: Seek + Read> {
12    file_name: Option<PathBuf>,
13    file: I,
14    first_event_offset: u64,
15}
16
17pub struct BinlogEvents<I: Seek + Read> {
18    file: BinlogFile<I>,
19    // if the offset is None, it means that we can't read any more
20    // for whatever reason
21    offset: Option<u64>,
22}
23
24impl<I: Seek + Read> BinlogEvents<I> {
25    pub fn new(mut bf: BinlogFile<I>, start_offset: u64) -> Self {
26        bf.file.seek(io::SeekFrom::Start(start_offset)).unwrap();
27        BinlogEvents {
28            offset: Some(start_offset),
29            file: bf,
30        }
31    }
32}
33
34impl<I: Seek + Read> Iterator for BinlogEvents<I> {
35    type Item = Result<Event, EventParseError>;
36
37    fn next(&mut self) -> Option<Self::Item> {
38        let event = match self.offset {
39            Some(offset) => match self.file.read_at(offset) {
40                Ok(e) => e,
41                Err(EventParseError::Io(_)) => return None,
42                Err(EventParseError::EofError) => return None,
43                Err(e) => return Some(Err(e)),
44            },
45            None => return None,
46        };
47        if event.type_code() == TypeCode::RotateEvent {
48            self.offset = None;
49        } else {
50            self.offset = Some(event.next_position());
51        }
52        Some(Ok(event))
53    }
54}
55
56impl BinlogFile<File> {
57    /// Construct a new BinLogFile from the given path
58    ///
59    /// Opens the file and reads/parses the FDE at construction time
60    pub fn try_from_path<R: AsRef<Path>>(path: R) -> Result<Self, BinlogParseError> {
61        let p = path.as_ref();
62        let fh = File::open(p).map_err(BinlogParseError::OpenError)?;
63        Self::try_new_from_reader_name(fh, Some(p.to_owned()))
64    }
65}
66
67impl<I: Seek + Read> BinlogFile<I> {
68    pub fn try_from_reader(reader: I) -> Result<Self, BinlogParseError> {
69        Self::try_new_from_reader_name(reader, None)
70    }
71
72    fn try_new_from_reader_name(
73        mut fh: I,
74        name: Option<PathBuf>,
75    ) -> Result<Self, BinlogParseError> {
76        // read the magic bytes
77        let mut magic = [0u8; 4];
78        fh.read_exact(&mut magic)?;
79        if magic != [0xfeu8, 0x62, 0x69, 0x6e] {
80            return Err(BinlogParseError::BadMagic(magic).into());
81        }
82        let fde = Event::read(&mut fh, 4)?;
83        if fde.inner(None)?.is_some() {
84            // XXX: todo: thread through common_header_len
85        } else {
86            return Err(BinlogParseError::BadFirstRecord.into());
87        }
88        Ok(BinlogFile {
89            file_name: name,
90            file: fh,
91            first_event_offset: fde.next_position(),
92        })
93    }
94
95    fn read_at(&mut self, offset: u64) -> Result<Event, EventParseError> {
96        self.file.seek(io::SeekFrom::Start(offset))?;
97        Event::read(&mut self.file, offset).map_err(|i| i.into())
98    }
99
100    /// Iterate throgh events in this BinLog file, optionally from the given
101    /// starting offset.
102    pub fn events(self, offset: Option<u64>) -> BinlogEvents<I> {
103        let offset = offset.unwrap_or(self.first_event_offset);
104        BinlogEvents::new(self, offset)
105    }
106
107    pub fn file_name(&self) -> Option<&Path> {
108        self.file_name.as_ref().map(|a| a.as_ref())
109    }
110}