mysql_async/conn/binlog_stream/
mod.rs

1// Copyright (c) 2020 Anatoly Ikorsky
2//
3// Licensed under the Apache License, Version 2.0
4// <LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0> or the MIT
5// license <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6// option. All files in the project carrying such notice may not be copied,
7// modified, or distributed except according to those terms.
8
9use futures_core::ready;
10use mysql_common::{
11    binlog::{
12        consts::{BinlogVersion::Version4, EventType},
13        events::{Event, TableMapEvent, TransactionPayloadEvent},
14        EventStreamReader,
15    },
16    io::ParseBuf,
17    packets::{ComRegisterSlave, ErrPacket, NetworkStreamTerminator, OkPacketDeserializer},
18};
19
20use std::{
21    future::Future,
22    io::{Cursor, ErrorKind},
23    pin::Pin,
24    task::{Context, Poll},
25};
26
27use crate::{connection_like::Connection, queryable::Queryable};
28use crate::{error::DriverError, io::ReadPacket, Conn, Error, IoError, Result};
29
30use self::request::BinlogStreamRequest;
31
32pub mod request;
33
34impl super::Conn {
35    /// Turns this connection into a binlog stream.
36    ///
37    /// You can use SHOW BINARY LOGS to get the current logfile and position from the master.
38    /// If the request’s filename is empty, the server will send the binlog-stream of the first known binlog.
39    pub async fn get_binlog_stream(
40        mut self,
41        request: BinlogStreamRequest<'_>,
42    ) -> Result<BinlogStream> {
43        self.request_binlog(request).await?;
44
45        Ok(BinlogStream::new(self))
46    }
47
48    async fn register_as_slave(
49        &mut self,
50        com_register_slave: ComRegisterSlave<'_>,
51    ) -> crate::Result<()> {
52        self.query_drop("SET @master_binlog_checksum='ALL'").await?;
53        self.write_command(&com_register_slave).await?;
54
55        // Server will respond with OK.
56        self.read_packet().await?;
57
58        Ok(())
59    }
60
61    async fn request_binlog(&mut self, request: BinlogStreamRequest<'_>) -> crate::Result<()> {
62        self.register_as_slave(request.register_slave).await?;
63        self.write_command(&request.binlog_request.as_cmd()).await?;
64        Ok(())
65    }
66}
67
/// Binlog event stream.
///
/// Created by `Conn::get_binlog_stream`, which requests the binlog from the server before
/// constructing the stream; polling the stream only reads events from the connection.
pub struct BinlogStream {
    /// Packet reader wrapping the connection the binlog was requested on.
    read_packet: ReadPacket<'static, 'static>,
    /// Turns packet payloads into events and resolves table map events by table id.
    esr: EventStreamReader,
    // TODO: Use 'static reader here (requires impl on the mysql_common side).
    /// Uncompressed Transaction_payload_event we are iterating over (if any).
    tpe: Option<Cursor<Vec<u8>>>,
}
78
79impl BinlogStream {
80    /// `conn` is a `Conn` with `request_binlog` executed on it.
81    pub(super) fn new(conn: Conn) -> Self {
82        BinlogStream {
83            read_packet: ReadPacket::new(conn),
84            esr: EventStreamReader::new(Version4),
85            tpe: None,
86        }
87    }
88
89    /// Returns a table map event for the given table id.
90    pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
91        self.esr.get_tme(table_id)
92    }
93
94    /// Closes the stream's `Conn`. Additionally, the connection is dropped, so its associated
95    /// pool (if any) will regain a connection slot.
96    pub async fn close(self) -> Result<()> {
97        match self.read_packet.0 {
98            // `close_conn` requires ownership of `Conn`. That's okay, because
99            // `BinLogStream`'s connection is always owned.
100            Connection::Conn(conn) => {
101                if let Err(Error::Io(IoError::Io(ref error))) = conn.close_conn().await {
102                    // If the binlog was requested with the flag BINLOG_DUMP_NON_BLOCK,
103                    // the connection's file handler will already have been closed (EOF).
104                    if error.kind() == ErrorKind::BrokenPipe {
105                        return Ok(());
106                    }
107                }
108            }
109            Connection::ConnMut(_) => {}
110            Connection::Tx(_) => {}
111        }
112
113        Ok(())
114    }
115}
116
117impl futures_core::stream::Stream for BinlogStream {
118    type Item = Result<Event>;
119
120    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121        {
122            let Self {
123                ref mut tpe,
124                ref mut esr,
125                ..
126            } = *self;
127
128            if let Some(tpe) = tpe.as_mut() {
129                match esr.read_decompressed(tpe) {
130                    Ok(Some(event)) => return Poll::Ready(Some(Ok(event))),
131                    Ok(None) => self.tpe = None,
132                    Err(err) => return Poll::Ready(Some(Err(err.into()))),
133                }
134            }
135        }
136
137        let packet = match ready!(Pin::new(&mut self.read_packet).poll(cx)) {
138            Ok(packet) => packet,
139            Err(err) => return Poll::Ready(Some(Err(err.into()))),
140        };
141
142        let first_byte = packet.first().copied();
143
144        if first_byte == Some(255) {
145            if let Ok(ErrPacket::Error(err)) =
146                ParseBuf(&packet).parse(self.read_packet.conn_ref().capabilities())
147            {
148                return Poll::Ready(Some(Err(From::from(err))));
149            }
150        }
151
152        if first_byte == Some(254)
153            && packet.len() < 8
154            && ParseBuf(&packet)
155                .parse::<OkPacketDeserializer<NetworkStreamTerminator>>(
156                    self.read_packet.conn_ref().capabilities(),
157                )
158                .is_ok()
159        {
160            return Poll::Ready(None);
161        }
162
163        if first_byte == Some(0) {
164            let event_data = &packet[1..];
165            match self.esr.read(event_data) {
166                Ok(Some(event)) => {
167                    if event.header().event_type_raw() == EventType::TRANSACTION_PAYLOAD_EVENT as u8
168                    {
169                        match event.read_event::<TransactionPayloadEvent<'_>>() {
170                            Ok(e) => self.tpe = Some(Cursor::new(e.danger_decompress())),
171                            Err(_) => (/* TODO: Log the error */),
172                        }
173                    }
174                    Poll::Ready(Some(Ok(event)))
175                }
176                Ok(None) => Poll::Ready(None),
177                Err(err) => Poll::Ready(Some(Err(err.into()))),
178            }
179        } else {
180            Poll::Ready(Some(Err(DriverError::UnexpectedPacket {
181                payload: packet.to_vec(),
182            }
183            .into())))
184        }
185    }
186}
187
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_util::StreamExt;
    use mysql_common::binlog::events::EventData;
    use tokio::time::timeout;

    use crate::prelude::*;
    use crate::{test_misc::get_opts, *};

    /// Longest time a test waits for one event, a stream close or a pool disconnect.
    const WAIT: Duration = Duration::from_secs(10);

    /// Writes some rows within a transaction so that the binlog has something to show,
    /// then drops the table again.
    async fn gen_dummy_data(conn: &mut Conn) -> super::Result<()> {
        "CREATE TABLE IF NOT EXISTS customers (customer_id int not null)"
            .ignore(&mut *conn)
            .await?;

        let mut transaction = conn.start_transaction(Default::default()).await?;
        for customer_id in 0_u8..100 {
            "INSERT INTO customers(customer_id) VALUES (?)"
                .with((customer_id,))
                .ignore(&mut transaction)
                .await?;
        }
        transaction.commit().await?;

        "DROP TABLE customers".ignore(conn).await?;

        Ok(())
    }

    /// Opens a connection (taken from `pool`, if given), remembers the first row of
    /// `SHOW BINARY LOGS` and generates dummy data afterwards.
    ///
    /// Returns the connection along with the first two columns of that row.
    async fn create_binlog_stream_conn(pool: Option<&Pool>) -> super::Result<(Conn, Vec<u8>, u64)> {
        let mut conn = if let Some(pool) = pool {
            pool.get_conn().await.unwrap()
        } else {
            Conn::new(get_opts()).await.unwrap()
        };

        let version = conn.server_version();
        if version >= (8, 0, 31) && version < (9, 0, 0) {
            // Best effort: the outcome of this statement is deliberately ignored.
            let _ = "SET binlog_transaction_compression=ON"
                .ignore(&mut conn)
                .await;
        }

        let gtid_mode = "SELECT @@GLOBAL.GTID_MODE"
            .first::<String, _>(&mut conn)
            .await;
        if let Ok(Some(gtid_mode)) = gtid_mode {
            assert!(
                gtid_mode.starts_with("ON"),
                "GTID_MODE is disabled \
                        (enable using --gtid_mode=ON --enforce_gtid_consistency=ON)"
            );
        }

        let first_log: crate::Row = "SHOW BINARY LOGS".first(&mut conn).await.unwrap().unwrap();
        let filename: Vec<u8> = first_log.get(0).unwrap();
        let position: u64 = first_log.get(1).unwrap();

        gen_dummy_data(&mut conn).await.unwrap();
        Ok((conn, filename, position))
    }

    /// Reads `stream` until it ends, yields something unreadable by `timeout`'s standards
    /// (i.e. no event within `WAIT`), verifying each event on the way.
    ///
    /// Returns the number of events seen.
    async fn drain_and_check_rows(stream: &mut super::BinlogStream) -> super::Result<usize> {
        let mut seen = 0;
        while let Ok(Some(event)) = timeout(WAIT, stream.next()).await {
            let event = event.unwrap();
            seen += 1;

            // assert that event type is known
            event.header().event_type().unwrap();

            // iterate over rows of an event
            if let EventData::RowsEvent(rows_event) = event.read_data()?.unwrap() {
                let table_map = stream.get_tme(rows_event.table_id());
                for row in rows_event.rows(table_map.unwrap()) {
                    row.unwrap();
                }
            }
        }
        Ok(seen)
    }

    /// Closes `stream`, panicking if that fails or takes longer than `WAIT`.
    async fn close_in_time(stream: super::BinlogStream) {
        timeout(WAIT, stream.close()).await.unwrap().unwrap();
    }

    #[tokio::test]
    async fn should_read_binlog() -> super::Result<()> {
        read_binlog_streams_and_close_their_connections(None, (12, 13, 14))
            .await
            .unwrap();

        let pool = Pool::new(get_opts());
        read_binlog_streams_and_close_their_connections(Some(&pool), (15, 16, 17))
            .await
            .unwrap();

        // Disconnecting the pool verifies that closing the binlog connections
        // left the pool in a sane state.
        timeout(WAIT, pool.disconnect()).await.unwrap().unwrap();

        Ok(())
    }

    /// Reads the binlog in every supported way, closing each stream once it is drained.
    ///
    /// `binlog_server_ids` provides one server id per requested stream.
    async fn read_binlog_streams_and_close_their_connections(
        pool: Option<&Pool>,
        binlog_server_ids: (u32, u32, u32),
    ) -> super::Result<()> {
        let (dump_id, gtid_id, non_block_id) = binlog_server_ids;

        // iterate using COM_BINLOG_DUMP
        let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();
        let is_mariadb = conn.inner.is_mariadb;

        let request = BinlogStreamRequest::new(dump_id)
            .with_filename(&filename)
            .with_pos(pos);
        let mut binlog_stream = conn.get_binlog_stream(request).await.unwrap();

        let events_num = drain_and_check_rows(&mut binlog_stream).await?;
        assert!(events_num > 0);
        close_in_time(binlog_stream).await;

        if !is_mariadb {
            // iterate using COM_BINLOG_DUMP_GTID
            let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();

            let request = BinlogStreamRequest::new(gtid_id)
                .with_gtid()
                .with_filename(&filename)
                .with_pos(pos);
            let mut binlog_stream = conn.get_binlog_stream(request).await.unwrap();

            let events_num = drain_and_check_rows(&mut binlog_stream).await?;
            assert!(events_num > 0);
            close_in_time(binlog_stream).await;
        }

        // iterate using COM_BINLOG_DUMP with BINLOG_DUMP_NON_BLOCK flag
        let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();

        let request = BinlogStreamRequest::new(non_block_id)
            .with_filename(&filename)
            .with_pos(pos)
            .with_non_blocking();
        let mut binlog_stream = conn.get_binlog_stream(request).await.unwrap();

        // A non-blocking stream terminates on its own, hence no timeout per event here.
        let mut events_num = 0;
        while let Some(event) = binlog_stream.next().await {
            let event = event.unwrap();
            events_num += 1;
            event.header().event_type().unwrap();
            event.read_data().unwrap();
        }
        assert!(events_num > 0);
        close_in_time(binlog_stream).await;

        Ok(())
    }
}
376}