1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use mysql_common::{
binlog::{
consts::BinlogVersion::Version4,
events::{Event, TableMapEvent},
EventStreamReader,
},
io::ParseBuf,
packets::{ErrPacket, NetworkStreamTerminator, OkPacketDeserializer},
};
use crate::Conn;
/// Blocking stream of binlog events read from a MySQL connection.
///
/// The `Iterator` implementation for this type yields `crate::Result<Event>`
/// items, one per event packet received from the server.
pub struct BinlogStream {
/// Connection the packets are read from.
///
/// Reset to `None` when reading a packet fails, when the server sends an
/// error or stream-terminator packet, or when an unexpected packet arrives;
/// once it is `None`, iteration yields `None`.
conn: Option<Conn>,
/// Reader that decodes the payload of each event packet into an `Event`
/// and serves the table map lookups exposed through `get_tme`.
esr: EventStreamReader,
}
impl BinlogStream {
    /// Builds a stream that reads from `conn`, pairing it with an event
    /// reader constructed for binlog `Version4`.
    pub(super) fn new(conn: Conn) -> Self {
        let esr = EventStreamReader::new(Version4);
        Self {
            conn: Some(conn),
            esr,
        }
    }

    /// Returns the table map event the underlying event reader holds for
    /// `table_id`, or `None` if the reader has none for that id.
    ///
    /// This is a thin delegation to `EventStreamReader::get_tme`.
    pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
        let reader = &self.esr;
        reader.get_tme(table_id)
    }
}
impl Iterator for BinlogStream {
type Item = crate::Result<Event>;
fn next(&mut self) -> Option<Self::Item> {
let conn = self.conn.as_mut()?;
let packet = match conn.read_packet() {
Ok(packet) => packet,
Err(err) => {
self.conn = None;
return Some(Err(err));
}
};
let first_byte = packet.get(0).copied();
if first_byte == Some(255) {
if let Ok(ErrPacket::Error(err)) = ParseBuf(&*packet).parse(conn.0.capability_flags) {
self.conn = None;
return Some(Err(crate::Error::MySqlError(From::from(err))));
}
}
if first_byte == Some(254) && packet.len() < 8 {
if ParseBuf(&*packet)
.parse::<OkPacketDeserializer<NetworkStreamTerminator>>(conn.0.capability_flags)
.is_ok()
{
self.conn = None;
return None;
}
}
if first_byte == Some(0) {
let event_data = &packet[1..];
match self.esr.read(event_data) {
Ok(event) => {
return Some(Ok(event));
}
Err(err) => return Some(Err(err.into())),
}
} else {
self.conn = None;
return Some(Err(crate::error::DriverError::UnexpectedPacket.into()));
}
}
}