mysql_async/conn/
binlog_stream.rs

1// Copyright (c) 2020 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use futures_core::ready;
10use mysql_common::{
11    binlog::{
12        consts::BinlogVersion::Version4,
13        events::{Event, TableMapEvent},
14        EventStreamReader,
15    },
16    io::ParseBuf,
17    packets::{ErrPacket, NetworkStreamTerminator, OkPacketDeserializer},
18};
19
20use std::{
21    future::Future,
22    io::ErrorKind,
23    pin::Pin,
24    task::{Context, Poll},
25};
26
27use crate::connection_like::Connection;
28use crate::{error::DriverError, io::ReadPacket, Conn, Error, IoError, Result};
29
30/// Binlog event stream.
31///
32/// Stream initialization is lazy, i.e. binlog won't be requested until this stream is polled.
33pub struct BinlogStream {
34    read_packet: ReadPacket<'static, 'static>,
35    esr: EventStreamReader,
36}
37
38impl BinlogStream {
39    /// `conn` is a `Conn` with `request_binlog` executed on it.
40    pub(super) fn new(conn: Conn) -> Self {
41        BinlogStream {
42            read_packet: ReadPacket::new(conn),
43            esr: EventStreamReader::new(Version4),
44        }
45    }
46
47    /// Returns a table map event for the given table id.
48    pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
49        self.esr.get_tme(table_id)
50    }
51
52    /// Closes the stream's `Conn`. Additionally, the connection is dropped, so its associated
53    /// pool (if any) will regain a connection slot.
54    pub async fn close(self) -> Result<()> {
55        match self.read_packet.0 {
56            // `close_conn` requires ownership of `Conn`. That's okay, because
57            // `BinLogStream`'s connection is always owned.
58            Connection::Conn(conn) => {
59                if let Err(Error::Io(IoError::Io(ref error))) = conn.close_conn().await {
60                    // If the binlog was requested with the flag BINLOG_DUMP_NON_BLOCK,
61                    // the connection's file handler will already have been closed (EOF).
62                    if error.kind() == ErrorKind::BrokenPipe {
63                        return Ok(());
64                    }
65                }
66            }
67            Connection::ConnMut(_) => {}
68            Connection::Tx(_) => {}
69        }
70
71        Ok(())
72    }
73}
74
75impl futures_core::stream::Stream for BinlogStream {
76    type Item = Result<Event>;
77
78    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79        let packet = match ready!(Pin::new(&mut self.read_packet).poll(cx)) {
80            Ok(packet) => packet,
81            Err(err) => return Poll::Ready(Some(Err(err.into()))),
82        };
83
84        let first_byte = packet.get(0).copied();
85
86        if first_byte == Some(255) {
87            if let Ok(ErrPacket::Error(err)) =
88                ParseBuf(&*packet).parse(self.read_packet.conn_ref().capabilities())
89            {
90                return Poll::Ready(Some(Err(From::from(err))));
91            }
92        }
93
94        if first_byte == Some(254) && packet.len() < 8 {
95            if ParseBuf(&*packet)
96                .parse::<OkPacketDeserializer<NetworkStreamTerminator>>(
97                    self.read_packet.conn_ref().capabilities(),
98                )
99                .is_ok()
100            {
101                return Poll::Ready(None);
102            }
103        }
104
105        if first_byte == Some(0) {
106            let event_data = &packet[1..];
107            match self.esr.read(event_data) {
108                Ok(event) => {
109                    return Poll::Ready(Some(Ok(event)));
110                }
111                Err(err) => return Poll::Ready(Some(Err(err.into()))),
112            }
113        } else {
114            return Poll::Ready(Some(Err(DriverError::UnexpectedPacket {
115                payload: packet.to_vec(),
116            }
117            .into())));
118        }
119    }
120}