// mysql_async/conn/binlog_stream/mod.rs
use futures_core::ready;
10use mysql_common::{
11 binlog::{
12 consts::{BinlogVersion::Version4, EventType},
13 events::{Event, TableMapEvent, TransactionPayloadEvent},
14 EventStreamReader,
15 },
16 io::ParseBuf,
17 packets::{ComRegisterSlave, ErrPacket, NetworkStreamTerminator, OkPacketDeserializer},
18};
19
20use std::{
21 future::Future,
22 io::{Cursor, ErrorKind},
23 pin::Pin,
24 task::{Context, Poll},
25};
26
27use crate::{connection_like::Connection, queryable::Queryable};
28use crate::{error::DriverError, io::ReadPacket, Conn, Error, IoError, Result};
29
30use self::request::BinlogStreamRequest;
31
32pub mod request;
33
34impl super::Conn {
35 pub async fn get_binlog_stream(
40 mut self,
41 request: BinlogStreamRequest<'_>,
42 ) -> Result<BinlogStream> {
43 self.request_binlog(request).await?;
44
45 Ok(BinlogStream::new(self))
46 }
47
48 async fn register_as_slave(
49 &mut self,
50 com_register_slave: ComRegisterSlave<'_>,
51 ) -> crate::Result<()> {
52 self.query_drop("SET @master_binlog_checksum='ALL'").await?;
53 self.write_command(&com_register_slave).await?;
54
55 self.read_packet().await?;
57
58 Ok(())
59 }
60
61 async fn request_binlog(&mut self, request: BinlogStreamRequest<'_>) -> crate::Result<()> {
62 self.register_as_slave(request.register_slave).await?;
63 self.write_command(&request.binlog_request.as_cmd()).await?;
64 Ok(())
65 }
66}
67
/// Stream of binlog [`Event`]s read from a connection.
///
/// Events are produced through the `futures_core::stream::Stream`
/// implementation for this type.
pub struct BinlogStream {
    /// Future that reads the next packet from the underlying connection.
    read_packet: ReadPacket<'static, 'static>,
    /// Reader that turns packet payloads into [`Event`]s and serves table map
    /// lookups.
    esr: EventStreamReader,
    /// Decompressed body of a transaction payload event that is still being
    /// iterated over, if any.
    tpe: Option<Cursor<Vec<u8>>>,
}
78
79impl BinlogStream {
80 pub(super) fn new(conn: Conn) -> Self {
82 BinlogStream {
83 read_packet: ReadPacket::new(conn),
84 esr: EventStreamReader::new(Version4),
85 tpe: None,
86 }
87 }
88
89 pub fn get_tme(&self, table_id: u64) -> Option<&TableMapEvent<'static>> {
91 self.esr.get_tme(table_id)
92 }
93
94 pub async fn close(self) -> Result<()> {
97 match self.read_packet.0 {
98 Connection::Conn(conn) => {
101 if let Err(Error::Io(IoError::Io(ref error))) = conn.close_conn().await {
102 if error.kind() == ErrorKind::BrokenPipe {
105 return Ok(());
106 }
107 }
108 }
109 Connection::ConnMut(_) => {}
110 Connection::Tx(_) => {}
111 }
112
113 Ok(())
114 }
115}
116
117impl futures_core::stream::Stream for BinlogStream {
118 type Item = Result<Event>;
119
120 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
121 {
122 let Self {
123 ref mut tpe,
124 ref mut esr,
125 ..
126 } = *self;
127
128 if let Some(tpe) = tpe.as_mut() {
129 match esr.read_decompressed(tpe) {
130 Ok(Some(event)) => return Poll::Ready(Some(Ok(event))),
131 Ok(None) => self.tpe = None,
132 Err(err) => return Poll::Ready(Some(Err(err.into()))),
133 }
134 }
135 }
136
137 let packet = match ready!(Pin::new(&mut self.read_packet).poll(cx)) {
138 Ok(packet) => packet,
139 Err(err) => return Poll::Ready(Some(Err(err.into()))),
140 };
141
142 let first_byte = packet.first().copied();
143
144 if first_byte == Some(255) {
145 if let Ok(ErrPacket::Error(err)) =
146 ParseBuf(&packet).parse(self.read_packet.conn_ref().capabilities())
147 {
148 return Poll::Ready(Some(Err(From::from(err))));
149 }
150 }
151
152 if first_byte == Some(254)
153 && packet.len() < 8
154 && ParseBuf(&packet)
155 .parse::<OkPacketDeserializer<NetworkStreamTerminator>>(
156 self.read_packet.conn_ref().capabilities(),
157 )
158 .is_ok()
159 {
160 return Poll::Ready(None);
161 }
162
163 if first_byte == Some(0) {
164 let event_data = &packet[1..];
165 match self.esr.read(event_data) {
166 Ok(Some(event)) => {
167 if event.header().event_type_raw() == EventType::TRANSACTION_PAYLOAD_EVENT as u8
168 {
169 match event.read_event::<TransactionPayloadEvent<'_>>() {
170 Ok(e) => self.tpe = Some(Cursor::new(e.danger_decompress())),
171 Err(_) => (),
172 }
173 }
174 Poll::Ready(Some(Ok(event)))
175 }
176 Ok(None) => Poll::Ready(None),
177 Err(err) => Poll::Ready(Some(Err(err.into()))),
178 }
179 } else {
180 Poll::Ready(Some(Err(DriverError::UnexpectedPacket {
181 payload: packet.to_vec(),
182 }
183 .into())))
184 }
185 }
186}
187
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use futures_util::StreamExt;
    use mysql_common::binlog::events::EventData;
    use tokio::time::timeout;

    use crate::prelude::*;
    use crate::{test_misc::get_opts, *};

    /// Upper bound for waiting on the next event and on shutdown operations.
    const WAIT: Duration = Duration::from_secs(10);

    /// Writes some rows inside a transaction so the binlog has events to read.
    ///
    /// Creates the `customers` table, inserts 100 rows in one transaction and
    /// drops the table again.
    async fn gen_dummy_data(conn: &mut Conn) -> super::Result<()> {
        "CREATE TABLE IF NOT EXISTS customers (customer_id int not null)"
            .ignore(&mut *conn)
            .await?;

        let mut tx = conn.start_transaction(Default::default()).await?;
        for i in 0_u8..100 {
            "INSERT INTO customers(customer_id) VALUES (?)"
                .with((i,))
                .ignore(&mut tx)
                .await?;
        }
        tx.commit().await?;

        "DROP TABLE customers".ignore(conn).await?;

        Ok(())
    }

    /// Opens a connection (from `pool` if given) and prepares it for a binlog test.
    ///
    /// Returns the connection together with the file name and position taken
    /// from the first row of `SHOW BINARY LOGS`, read before the dummy data
    /// is generated.
    ///
    /// # Panics
    ///
    /// Panics if the server reports a `GTID_MODE` that does not start with
    /// `ON`, or if any of the setup queries fails.
    async fn create_binlog_stream_conn(pool: Option<&Pool>) -> super::Result<(Conn, Vec<u8>, u64)> {
        let mut conn = match pool {
            None => Conn::new(get_opts()).await.unwrap(),
            Some(pool) => pool.get_conn().await.unwrap(),
        };

        // Best effort: a failure to enable compression is deliberately ignored.
        if conn.server_version() >= (8, 0, 31) && conn.server_version() < (9, 0, 0) {
            let _ = "SET binlog_transaction_compression=ON"
                .ignore(&mut conn)
                .await;
        }

        if let Ok(Some(gtid_mode)) = "SELECT @@GLOBAL.GTID_MODE"
            .first::<String, _>(&mut conn)
            .await
        {
            if !gtid_mode.starts_with("ON") {
                panic!(
                    "GTID_MODE is disabled \
                     (enable using --gtid_mode=ON --enforce_gtid_consistency=ON)"
                );
            }
        }

        let row: crate::Row = "SHOW BINARY LOGS".first(&mut conn).await.unwrap().unwrap();
        let filename = row.get(0).unwrap();
        let position = row.get(1).unwrap();

        gen_dummy_data(&mut conn).await.unwrap();
        Ok((conn, filename, position))
    }

    /// Reads events until the stream ends, yields an error, or no event
    /// arrives within [`WAIT`], and returns how many events were seen.
    ///
    /// Every event must have a known type, and every rows event must decode
    /// against its table map event.
    async fn drain_and_check_rows(binlog_stream: &mut super::BinlogStream) -> super::Result<usize> {
        let mut events_num = 0;
        while let Ok(Some(event)) = timeout(WAIT, binlog_stream.next()).await {
            let event = event.unwrap();
            events_num += 1;

            event.header().event_type().unwrap();

            if let EventData::RowsEvent(re) = event.read_data()?.unwrap() {
                let tme = binlog_stream.get_tme(re.table_id());
                for row in re.rows(tme.unwrap()) {
                    row.unwrap();
                }
            }
        }
        Ok(events_num)
    }

    /// Closes `binlog_stream`, panicking if that takes longer than [`WAIT`] or fails.
    async fn close_stream(binlog_stream: super::BinlogStream) {
        timeout(WAIT, binlog_stream.close())
            .await
            .unwrap()
            .unwrap();
    }

    #[tokio::test]
    async fn should_read_binlog() -> super::Result<()> {
        // First with standalone connections, then with pooled ones.
        read_binlog_streams_and_close_their_connections(None, (12, 13, 14))
            .await
            .unwrap();

        let pool = Pool::new(get_opts());
        read_binlog_streams_and_close_their_connections(Some(&pool), (15, 16, 17))
            .await
            .unwrap();

        // The pool must still be able to shut down after its connections were
        // turned into streams and closed.
        timeout(WAIT, pool.disconnect()).await.unwrap().unwrap();

        Ok(())
    }

    /// Reads the binlog with up to three differently configured requests and
    /// closes each stream afterwards.
    ///
    /// `binlog_server_ids` supplies one server id per request: plain
    /// file/position, GTID (skipped on MariaDB) and non-blocking.
    async fn read_binlog_streams_and_close_their_connections(
        pool: Option<&Pool>,
        binlog_server_ids: (u32, u32, u32),
    ) -> super::Result<()> {
        // Plain file/position request.
        let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();
        let is_mariadb = conn.inner.is_mariadb;

        let mut binlog_stream = conn
            .get_binlog_stream(
                BinlogStreamRequest::new(binlog_server_ids.0)
                    .with_filename(&filename)
                    .with_pos(pos),
            )
            .await
            .unwrap();

        let events_num = drain_and_check_rows(&mut binlog_stream).await?;
        assert!(events_num > 0);
        close_stream(binlog_stream).await;

        // GTID based request.
        if !is_mariadb {
            let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();

            let mut binlog_stream = conn
                .get_binlog_stream(
                    BinlogStreamRequest::new(binlog_server_ids.1)
                        .with_gtid()
                        .with_filename(&filename)
                        .with_pos(pos),
                )
                .await
                .unwrap();

            let events_num = drain_and_check_rows(&mut binlog_stream).await?;
            assert!(events_num > 0);
            close_stream(binlog_stream).await;
        }

        // Non-blocking request: no timeout is applied here, the loop runs
        // until the stream itself ends.
        let (conn, filename, pos) = create_binlog_stream_conn(pool).await.unwrap();

        let mut binlog_stream = conn
            .get_binlog_stream(
                BinlogStreamRequest::new(binlog_server_ids.2)
                    .with_filename(&filename)
                    .with_pos(pos)
                    .with_non_blocking(),
            )
            .await
            .unwrap();

        let mut events_num = 0;
        while let Some(event) = binlog_stream.next().await {
            let event = event.unwrap();
            events_num += 1;
            event.header().event_type().unwrap();
            event.read_data().unwrap();
        }
        assert!(events_num > 0);
        close_stream(binlog_stream).await;

        Ok(())
    }
}