use std::{cmp::min, fmt, io};
use bitvec::prelude::*;
use bytes::BufMut;
use saturating::Saturating as S;
use crate::{
binlog::{
BinlogCtx,
consts::{BinlogVersion, EventType, RowsEventFlags},
row::BinlogRow,
},
io::ParseBuf,
misc::{
raw::{
RawBytes, RawFlags,
bytes::{BareBytes, EofBytes},
int::*,
},
unexpected_buf_eof,
},
proto::{MyDeserialize, MySerialize},
};
use super::{BinlogEventHeader, TableMapEvent};
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct RowsEvent<'a> {
event_type: EventType,
table_id: RawInt<LeU48>,
flags: RawFlags<RowsEventFlags, LeU16>,
extra_data: RawBytes<'a, BareBytes<{ u16::MAX as usize - 2 }>>,
num_columns: RawInt<LenEnc>,
columns_before_image: Option<RawBytes<'a, BareBytes<0x2000000000000000>>>,
columns_after_image: Option<RawBytes<'a, BareBytes<0x2000000000000000>>>,
rows_data: RawBytes<'a, EofBytes>,
}
impl<'a> RowsEvent<'a> {
pub fn event_type(&self) -> EventType {
self.event_type
}
pub fn table_id(&self) -> u64 {
self.table_id.0
}
pub fn num_columns(&self) -> u64 {
self.num_columns.0
}
pub fn columns_before_image(&'a self) -> Option<&'a BitSlice<u8>> {
match self.columns_before_image {
Some(ref bytes) => {
let slice = BitSlice::from_slice(bytes.as_bytes());
Some(&slice[..self.num_columns() as usize])
}
None => None,
}
}
pub fn columns_after_image(&'a self) -> Option<&'a BitSlice<u8>> {
match self.columns_after_image {
Some(ref bytes) => {
let slice = BitSlice::from_slice(bytes.as_bytes());
Some(&slice[..self.num_columns() as usize])
}
None => None,
}
}
pub fn rows_data(&'a self) -> &'a [u8] {
self.rows_data.as_bytes()
}
pub fn flags(&self) -> RowsEventFlags {
self.flags.get()
}
pub fn flags_raw(&self) -> u16 {
self.flags.0
}
pub fn len(&self, _version: BinlogVersion) -> usize {
let mut len = S(0);
len += S(6); len += S(2); len += S(2); len += S(min(self.extra_data.len(), u16::MAX as usize - 2)); len += S(crate::misc::lenenc_int_len(self.num_columns()) as usize); let bitmap_len = self.num_columns().div_ceil(8) as usize;
if self.columns_before_image.is_some() {
len += S(bitmap_len); }
if self.columns_after_image.is_some() {
len += S(bitmap_len); }
len += S(self.rows_data.len());
min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
}
pub fn rows<'b>(&'b self, table_map_event: &'b TableMapEvent<'b>) -> RowsEventRows<'b> {
RowsEventRows {
rows_event: self,
table_map_event,
rows_data: ParseBuf(self.rows_data.as_bytes()),
}
}
pub fn into_owned(self) -> RowsEvent<'static> {
RowsEvent {
event_type: self.event_type,
table_id: self.table_id,
flags: self.flags,
extra_data: self.extra_data.into_owned(),
num_columns: self.num_columns,
columns_before_image: self.columns_before_image.map(|x| x.into_owned()),
columns_after_image: self.columns_after_image.map(|x| x.into_owned()),
rows_data: self.rows_data.into_owned(),
}
}
}
pub struct RowsEventCtx<'a> {
pub event_type: EventType,
pub binlog_ctx: BinlogCtx<'a>,
}
impl<'de> MyDeserialize<'de> for RowsEvent<'de> {
const SIZE: Option<usize> = None;
type Ctx = RowsEventCtx<'de>;
fn deserialize(ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
let post_header_len = ctx
.binlog_ctx
.fde
.get_event_type_header_length(ctx.event_type);
let is_delete_event = ctx.event_type == EventType::DELETE_ROWS_EVENT
|| ctx.event_type == EventType::DELETE_ROWS_EVENT_V1;
let is_update_event = ctx.event_type == EventType::UPDATE_ROWS_EVENT
|| ctx.event_type == EventType::UPDATE_ROWS_EVENT_V1
|| ctx.event_type == EventType::PARTIAL_UPDATE_ROWS_EVENT;
let table_id = if post_header_len == 6 {
let value = buf.parse::<RawInt<LeU32>>(())?;
RawInt::new(value.0 as u64)
} else {
buf.parse(())?
};
let flags = buf.parse(())?;
let extra_data = if post_header_len
== ctx
.binlog_ctx
.fde
.get_event_type_header_length(EventType::WRITE_ROWS_EVENT)
{
let extra_data_len = buf.checked_eat_u16_le().ok_or_else(unexpected_buf_eof)? as usize;
buf.parse(extra_data_len.saturating_sub(2))?
} else {
RawBytes::new(&[][..])
};
let num_columns: RawInt<LenEnc> = buf.parse(())?;
let bitmap_len = num_columns.0.div_ceil(8) as usize;
let mut columns_before_image = None;
let mut columns_after_image = None;
if is_update_event {
columns_before_image = Some(buf.parse(bitmap_len)?);
columns_after_image = Some(buf.parse(bitmap_len)?);
} else if is_delete_event {
columns_before_image = Some(buf.parse(bitmap_len)?);
} else {
columns_after_image = Some(buf.parse(bitmap_len)?);
}
let rows_data = buf.parse(())?;
Ok(Self {
event_type: ctx.event_type,
table_id,
flags,
extra_data,
num_columns,
columns_before_image,
columns_after_image,
rows_data,
})
}
}
impl MySerialize for RowsEvent<'_> {
fn serialize(&self, buf: &mut Vec<u8>) {
self.table_id.serialize(&mut *buf);
self.flags.serialize(&mut *buf);
if self.event_type == EventType::WRITE_ROWS_EVENT
|| self.event_type == EventType::UPDATE_ROWS_EVENT
|| self.event_type == EventType::DELETE_ROWS_EVENT
|| self.event_type == EventType::PARTIAL_UPDATE_ROWS_EVENT
{
let len = min(self.extra_data.len().saturating_add(2), u16::MAX as usize) as u16;
buf.put_u16_le(len);
self.extra_data.serialize(&mut *buf);
}
self.num_columns.serialize(&mut *buf);
if let Some(bitmap) = &self.columns_before_image {
bitmap.serialize(&mut *buf);
}
if let Some(bitmap) = &self.columns_after_image {
bitmap.serialize(&mut *buf);
}
self.rows_data.serialize(buf);
}
}
#[derive(Clone, Eq, PartialEq)]
pub struct RowsEventRows<'a> {
rows_event: &'a RowsEvent<'a>,
table_map_event: &'a TableMapEvent<'a>,
rows_data: ParseBuf<'a>,
}
impl<'a> RowsEventRows<'a> {
pub(crate) fn new(
rows_event: &'a RowsEvent<'a>,
table_map_event: &'a TableMapEvent<'a>,
rows_data: ParseBuf<'a>,
) -> Self {
Self {
rows_event,
table_map_event,
rows_data,
}
}
}
impl<'a> Iterator for RowsEventRows<'a> {
type Item = io::Result<(Option<BinlogRow>, Option<BinlogRow>)>;
fn next(&mut self) -> Option<Self::Item> {
let mut row_before = None;
let mut row_after = None;
if self.rows_data.is_empty() {
return None;
}
if let Some(cols) = self.rows_event.columns_before_image() {
let ctx = (
self.rows_event.num_columns(),
cols,
false,
self.table_map_event,
);
row_before = match self.rows_data.parse(ctx) {
Ok(row_before) => Some(row_before),
Err(err) => return Some(Err(err)),
};
}
if let Some(cols) = self.rows_event.columns_after_image() {
let ctx = (
self.rows_event.num_columns(),
cols,
self.rows_event.event_type == EventType::PARTIAL_UPDATE_ROWS_EVENT,
self.table_map_event,
);
row_after = match self.rows_data.parse(ctx) {
Ok(row_after) => Some(row_after),
Err(err) => return Some(Err(err)),
};
}
Some(Ok((row_before, row_after)))
}
}
impl fmt::Debug for RowsEventRows<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list().entries(self.clone()).finish()
}
}