mysqlbinlog_network/mysql_binlog/
binlog_file.rs1use std::fs::File;
2use std::io::{self, Read, Seek};
3use std::path::{Path, PathBuf};
4
5use crate::mysql_binlog::errors::{BinlogParseError, EventParseError};
6use crate::mysql_binlog::event::{Event, TypeCode};
7
/// A binlog file backed by any reader that supports both `Read` and `Seek`.
///
/// Values are built through `try_from_path` or `try_from_reader`, which check
/// the magic header and read the first event before handing the reader over.
pub struct BinlogFile<I>
where
    I: Seek + Read,
{
    /// Path the file was opened from; `None` when built from a bare reader.
    file_name: Option<PathBuf>,
    /// Underlying reader that events are read from.
    file: I,
    /// Position reported by the first event's `next_position()`; used as the
    /// default starting offset by `events`.
    first_event_offset: u64,
}
16
17pub struct BinlogEvents<I: Seek + Read> {
18 file: BinlogFile<I>,
19 offset: Option<u64>,
22}
23
24impl<I: Seek + Read> BinlogEvents<I> {
25 pub fn new(mut bf: BinlogFile<I>, start_offset: u64) -> Self {
26 bf.file.seek(io::SeekFrom::Start(start_offset)).unwrap();
27 BinlogEvents {
28 offset: Some(start_offset),
29 file: bf,
30 }
31 }
32}
33
34impl<I: Seek + Read> Iterator for BinlogEvents<I> {
35 type Item = Result<Event, EventParseError>;
36
37 fn next(&mut self) -> Option<Self::Item> {
38 let event = match self.offset {
39 Some(offset) => match self.file.read_at(offset) {
40 Ok(e) => e,
41 Err(EventParseError::Io(_)) => return None,
42 Err(EventParseError::EofError) => return None,
43 Err(e) => return Some(Err(e)),
44 },
45 None => return None,
46 };
47 if event.type_code() == TypeCode::RotateEvent {
48 self.offset = None;
49 } else {
50 self.offset = Some(event.next_position());
51 }
52 Some(Ok(event))
53 }
54}
55
56impl BinlogFile<File> {
57 pub fn try_from_path<R: AsRef<Path>>(path: R) -> Result<Self, BinlogParseError> {
61 let p = path.as_ref();
62 let fh = File::open(p).map_err(BinlogParseError::OpenError)?;
63 Self::try_new_from_reader_name(fh, Some(p.to_owned()))
64 }
65}
66
67impl<I: Seek + Read> BinlogFile<I> {
68 pub fn try_from_reader(reader: I) -> Result<Self, BinlogParseError> {
69 Self::try_new_from_reader_name(reader, None)
70 }
71
72 fn try_new_from_reader_name(
73 mut fh: I,
74 name: Option<PathBuf>,
75 ) -> Result<Self, BinlogParseError> {
76 let mut magic = [0u8; 4];
78 fh.read_exact(&mut magic)?;
79 if magic != [0xfeu8, 0x62, 0x69, 0x6e] {
80 return Err(BinlogParseError::BadMagic(magic).into());
81 }
82 let fde = Event::read(&mut fh, 4)?;
83 if fde.inner(None)?.is_some() {
84 } else {
86 return Err(BinlogParseError::BadFirstRecord.into());
87 }
88 Ok(BinlogFile {
89 file_name: name,
90 file: fh,
91 first_event_offset: fde.next_position(),
92 })
93 }
94
95 fn read_at(&mut self, offset: u64) -> Result<Event, EventParseError> {
96 self.file.seek(io::SeekFrom::Start(offset))?;
97 Event::read(&mut self.file, offset).map_err(|i| i.into())
98 }
99
100 pub fn events(self, offset: Option<u64>) -> BinlogEvents<I> {
103 let offset = offset.unwrap_or(self.first_event_offset);
104 BinlogEvents::new(self, offset)
105 }
106
107 pub fn file_name(&self) -> Option<&Path> {
108 self.file_name.as_ref().map(|a| a.as_ref())
109 }
110}