mysql/conn/
binlog_stream.rs

1// Copyright (c) 2021 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use mysql_common::{
10    binlog::{
11        consts::BinlogVersion::Version4,
12        events::{Event, TableMapEvent},
13        EventStreamReader,
14    },
15    io::ParseBuf,
16    packets::{ErrPacket, NetworkStreamTerminator, OkPacketDeserializer},
17};
18
19use crate::Conn;
20
21/// Binlog event stream.
22///
23/// Stream initialization is lazy, i.e. binlog won't be requested until this stream is polled.
24pub struct BinlogStream {
25    conn: Option<Conn>,
26    esr: EventStreamReader,
27}
28
29impl BinlogStream {
30    /// `conn` is a `Conn` with `request_binlog` executed on it.
31    pub(super) fn new(conn: Conn) -> Self {
32        BinlogStream {
33            conn: Some(conn),
34            esr: EventStreamReader::new(Version4),
35        }
36    }
37
38    /// Returns a table map event for the given table id.
39    pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
40        self.esr.get_tme(table_id)
41    }
42}
43
44impl Iterator for BinlogStream {
45    type Item = crate::Result<Event>;
46
47    fn next(&mut self) -> Option<Self::Item> {
48        let conn = self.conn.as_mut()?;
49
50        let packet = match conn.read_packet() {
51            Ok(packet) => packet,
52            Err(err) => {
53                self.conn = None;
54                return Some(Err(err));
55            }
56        };
57
58        let first_byte = packet.get(0).copied();
59
60        if first_byte == Some(255) {
61            if let Ok(ErrPacket::Error(err)) = ParseBuf(&*packet).parse(conn.0.capability_flags) {
62                self.conn = None;
63                return Some(Err(crate::Error::MySqlError(From::from(err))));
64            }
65        }
66
67        if first_byte == Some(254) && packet.len() < 8 {
68            if ParseBuf(&*packet)
69                .parse::<OkPacketDeserializer<NetworkStreamTerminator>>(conn.0.capability_flags)
70                .is_ok()
71            {
72                self.conn = None;
73                return None;
74            }
75        }
76
77        if first_byte == Some(0) {
78            let event_data = &packet[1..];
79            match self.esr.read(event_data) {
80                Ok(event) => {
81                    return Some(Ok(event));
82                }
83                Err(err) => return Some(Err(err.into())),
84            }
85        } else {
86            self.conn = None;
87            return Some(Err(crate::error::DriverError::UnexpectedPacket.into()));
88        }
89    }
90}