mysql_async/conn/
binlog_stream.rs1use futures_core::ready;
10use mysql_common::{
11 binlog::{
12 consts::BinlogVersion::Version4,
13 events::{Event, TableMapEvent},
14 EventStreamReader,
15 },
16 io::ParseBuf,
17 packets::{ErrPacket, NetworkStreamTerminator, OkPacketDeserializer},
18};
19
20use std::{
21 future::Future,
22 io::ErrorKind,
23 pin::Pin,
24 task::{Context, Poll},
25};
26
27use crate::connection_like::Connection;
28use crate::{error::DriverError, io::ReadPacket, Conn, Error, IoError, Result};
29
/// Stream of binlog [`Event`]s decoded from packets read off a connection.
///
/// Each poll reads one packet through `read_packet` and, when the packet
/// carries event data, hands the payload to `esr` for parsing.
pub struct BinlogStream {
    /// Packet-reading future that is polled for the next raw packet; it is
    /// created from the `Conn` passed to `new`.
    read_packet: ReadPacket<'static, 'static>,
    /// Event stream reader that turns packet payloads into binlog events and
    /// answers table map lookups.
    esr: EventStreamReader,
}
37
38impl BinlogStream {
39 pub(super) fn new(conn: Conn) -> Self {
41 BinlogStream {
42 read_packet: ReadPacket::new(conn),
43 esr: EventStreamReader::new(Version4),
44 }
45 }
46
47 pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
49 self.esr.get_tme(table_id)
50 }
51
52 pub async fn close(self) -> Result<()> {
55 match self.read_packet.0 {
56 Connection::Conn(conn) => {
59 if let Err(Error::Io(IoError::Io(ref error))) = conn.close_conn().await {
60 if error.kind() == ErrorKind::BrokenPipe {
63 return Ok(());
64 }
65 }
66 }
67 Connection::ConnMut(_) => {}
68 Connection::Tx(_) => {}
69 }
70
71 Ok(())
72 }
73}
74
75impl futures_core::stream::Stream for BinlogStream {
76 type Item = Result<Event>;
77
78 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79 let packet = match ready!(Pin::new(&mut self.read_packet).poll(cx)) {
80 Ok(packet) => packet,
81 Err(err) => return Poll::Ready(Some(Err(err.into()))),
82 };
83
84 let first_byte = packet.get(0).copied();
85
86 if first_byte == Some(255) {
87 if let Ok(ErrPacket::Error(err)) =
88 ParseBuf(&*packet).parse(self.read_packet.conn_ref().capabilities())
89 {
90 return Poll::Ready(Some(Err(From::from(err))));
91 }
92 }
93
94 if first_byte == Some(254) && packet.len() < 8 {
95 if ParseBuf(&*packet)
96 .parse::<OkPacketDeserializer<NetworkStreamTerminator>>(
97 self.read_packet.conn_ref().capabilities(),
98 )
99 .is_ok()
100 {
101 return Poll::Ready(None);
102 }
103 }
104
105 if first_byte == Some(0) {
106 let event_data = &packet[1..];
107 match self.esr.read(event_data) {
108 Ok(event) => {
109 return Poll::Ready(Some(Ok(event)));
110 }
111 Err(err) => return Poll::Ready(Some(Err(err.into()))),
112 }
113 } else {
114 return Poll::Ready(Some(Err(DriverError::UnexpectedPacket {
115 payload: packet.to_vec(),
116 }
117 .into())));
118 }
119 }
120}