mysql_binlog_connector_rust/
binlog_stream.rs

1use std::io::Cursor;
2
3use byteorder::ReadBytesExt;
4
5use crate::{
6    binlog_error::BinlogError,
7    binlog_parser::BinlogParser,
8    command::command_util::CommandUtil,
9    constants::MysqlRespCode,
10    event::{event_data::EventData, event_header::EventHeader},
11    network::packet_channel::PacketChannel,
12};
13
14pub struct BinlogStream {
15    pub channel: PacketChannel,
16    pub parser: BinlogParser,
17}
18
19impl BinlogStream {
20    pub async fn read(&mut self) -> Result<(EventHeader, EventData), BinlogError> {
21        let buf = self.channel.read().await?;
22        let mut cursor = Cursor::new(&buf);
23
24        if cursor.read_u8()? == MysqlRespCode::ERROR {
25            CommandUtil::parse_result(&buf)?;
26        }
27
28        // parse events, execute the callback
29        self.parser.next(&mut cursor)
30    }
31
32    pub async fn close(&mut self) -> Result<(), BinlogError> {
33        self.channel.close().await?;
34        Ok(())
35    }
36}