mysql/conn/
binlog_stream.rs1use mysql_common::{
10 binlog::{
11 consts::BinlogVersion::Version4,
12 events::{Event, TableMapEvent},
13 EventStreamReader,
14 },
15 io::ParseBuf,
16 packets::{ErrPacket, NetworkStreamTerminator, OkPacketDeserializer},
17};
18
19use crate::Conn;
20
21pub struct BinlogStream {
25 conn: Option<Conn>,
26 esr: EventStreamReader,
27}
28
29impl BinlogStream {
30 pub(super) fn new(conn: Conn) -> Self {
32 BinlogStream {
33 conn: Some(conn),
34 esr: EventStreamReader::new(Version4),
35 }
36 }
37
38 pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
40 self.esr.get_tme(table_id)
41 }
42}
43
44impl Iterator for BinlogStream {
45 type Item = crate::Result<Event>;
46
47 fn next(&mut self) -> Option<Self::Item> {
48 let conn = self.conn.as_mut()?;
49
50 let packet = match conn.read_packet() {
51 Ok(packet) => packet,
52 Err(err) => {
53 self.conn = None;
54 return Some(Err(err));
55 }
56 };
57
58 let first_byte = packet.get(0).copied();
59
60 if first_byte == Some(255) {
61 if let Ok(ErrPacket::Error(err)) = ParseBuf(&*packet).parse(conn.0.capability_flags) {
62 self.conn = None;
63 return Some(Err(crate::Error::MySqlError(From::from(err))));
64 }
65 }
66
67 if first_byte == Some(254) && packet.len() < 8 {
68 if ParseBuf(&*packet)
69 .parse::<OkPacketDeserializer<NetworkStreamTerminator>>(conn.0.capability_flags)
70 .is_ok()
71 {
72 self.conn = None;
73 return None;
74 }
75 }
76
77 if first_byte == Some(0) {
78 let event_data = &packet[1..];
79 match self.esr.read(event_data) {
80 Ok(event) => {
81 return Some(Ok(event));
82 }
83 Err(err) => return Some(Err(err.into())),
84 }
85 } else {
86 self.conn = None;
87 return Some(Err(crate::error::DriverError::UnexpectedPacket.into()));
88 }
89 }
90}