use std::{borrow::Cow, cmp::min, io};
use saturating::Saturating as S;
use crate::{
binlog::{
BinlogCtx, BinlogEvent, BinlogStruct,
consts::{BinlogVersion, EventType, Gno, GtidFlags},
},
io::ParseBuf,
misc::{
raw::{RawConst, RawFlags, int::*},
read_varlen_uint, varlen_uint_size, write_varlen_uint,
},
packets::Tag,
proto::{MyDeserialize, MySerialize},
};
use super::BinlogEventHeader;
define_const!(
ConstU8,
LogicalTimestampTypecode,
InvalidLogicalTimestampTypecode("Invalid logical timestamp typecode value for GTID event"),
2
);
mod field_id {
pub const GTID_FLAGS: u64 = 0;
pub const SID: u64 = 1;
pub const GNO: u64 = 2;
pub const TAG: u64 = 3;
pub const LAST_COMMITTED: u64 = 4;
pub const SEQUENCE_NUMBER: u64 = 5;
pub const IMMEDIATE_COMMIT_TIMESTAMP: u64 = 6;
pub const ORIGINAL_COMMIT_TIMESTAMP: u64 = 7;
pub const TRANSACTION_LENGTH: u64 = 8;
pub const IMMEDIATE_SERVER_VERSION: u64 = 9;
pub const ORIGINAL_SERVER_VERSION: u64 = 10;
pub const COMMIT_GROUP_TICKET: u64 = 11;
}
#[derive(Debug, Clone, Eq, PartialEq, Hash)]
pub struct GtidEvent {
flags: RawFlags<GtidFlags, u8>,
sid: [u8; Self::ENCODED_SID_LENGTH],
gno: RawConst<LeU64, Gno>,
tag: Option<Tag<'static>>,
lc_typecode: Option<LogicalTimestampTypecode>,
last_committed: RawInt<LeU64>,
sequence_number: RawInt<LeU64>,
immediate_commit_timestamp: RawInt<LeU56>,
original_commit_timestamp: RawInt<LeU56>,
tx_length: RawInt<LenEnc>,
original_server_version: RawInt<LeU32>,
immediate_server_version: RawInt<LeU32>,
serialization_version: u8,
commit_group_ticket: u64,
}
impl GtidEvent {
pub const POST_HEADER_LENGTH: usize = 1 + Self::ENCODED_SID_LENGTH + 8 + 1 + 16;
pub const ENCODED_SID_LENGTH: usize = 16;
pub const LOGICAL_TIMESTAMP_TYPECODE: u8 = 2;
pub const IMMEDIATE_COMMIT_TIMESTAMP_LENGTH: usize = 7;
pub const ORIGINAL_COMMIT_TIMESTAMP_LENGTH: usize = 7;
pub const UNDEFINED_SERVER_VERSION: u32 = 999_999;
pub const IMMEDIATE_SERVER_VERSION_LENGTH: usize = 4;
pub const COMMIT_GROUP_TICKET_UNSET: u64 = 0;
pub const TAGGED_SERIALIZATION_VERSION_V2: u8 = 2;
pub fn new(sid: [u8; Self::ENCODED_SID_LENGTH], gno: u64) -> Self {
Self {
flags: Default::default(),
sid,
gno: RawConst::new(gno),
tag: None,
lc_typecode: Some(LogicalTimestampTypecode::default()),
last_committed: Default::default(),
sequence_number: Default::default(),
immediate_commit_timestamp: Default::default(),
original_commit_timestamp: Default::default(),
tx_length: Default::default(),
original_server_version: Default::default(),
immediate_server_version: Default::default(),
serialization_version: 0,
commit_group_ticket: Self::COMMIT_GROUP_TICKET_UNSET,
}
}
pub fn new_tagged(sid: [u8; Self::ENCODED_SID_LENGTH], tag: Tag<'static>, gno: u64) -> Self {
Self {
flags: Default::default(),
sid,
gno: RawConst::new(gno),
tag: Some(tag),
lc_typecode: None,
last_committed: Default::default(),
sequence_number: Default::default(),
immediate_commit_timestamp: Default::default(),
original_commit_timestamp: Default::default(),
tx_length: Default::default(),
original_server_version: RawInt::new(Self::UNDEFINED_SERVER_VERSION),
immediate_server_version: RawInt::new(Self::UNDEFINED_SERVER_VERSION),
serialization_version: Self::TAGGED_SERIALIZATION_VERSION_V2,
commit_group_ticket: Self::COMMIT_GROUP_TICKET_UNSET,
}
}
pub fn is_tagged(&self) -> bool {
self.tag.is_some()
}
pub fn event_type(&self) -> EventType {
if self.tag.is_some() {
EventType::GTID_TAGGED_LOG_EVENT
} else {
EventType::GTID_EVENT
}
}
pub fn with_flags(mut self, flags: GtidFlags) -> Self {
self.flags = RawFlags::new(flags.bits());
self
}
pub fn flags_raw(&self) -> u8 {
self.flags.0
}
pub fn flags(&self) -> GtidFlags {
self.flags.get()
}
pub fn with_sid(mut self, sid: [u8; Self::ENCODED_SID_LENGTH]) -> Self {
self.sid = sid;
self
}
pub fn sid(&self) -> [u8; Self::ENCODED_SID_LENGTH] {
self.sid
}
pub fn with_gno(mut self, gno: u64) -> Self {
self.gno = RawConst::new(gno);
self
}
pub fn gno(&self) -> u64 {
self.gno.0
}
pub fn tag(&self) -> Option<&Tag<'static>> {
self.tag.as_ref()
}
pub fn with_tag(mut self, tag: Tag<'static>) -> Self {
self.tag = Some(tag);
self.serialization_version = Self::TAGGED_SERIALIZATION_VERSION_V2;
self
}
pub fn lc_typecode(&self) -> Option<u8> {
self.lc_typecode.as_ref().map(|x| x.value())
}
pub fn with_lc_typecode(mut self) -> Self {
self.lc_typecode = Some(LogicalTimestampTypecode::default());
self
}
pub fn with_last_committed(mut self, last_committed: u64) -> Self {
self.last_committed = RawInt::new(last_committed);
self
}
pub fn last_committed(&self) -> u64 {
self.last_committed.0
}
pub fn with_sequence_number(mut self, sequence_number: u64) -> Self {
self.sequence_number = RawInt::new(sequence_number);
self
}
pub fn sequence_number(&self) -> u64 {
self.sequence_number.0
}
pub fn with_immediate_commit_timestamp(mut self, immediate_commit_timestamp: u64) -> Self {
self.immediate_commit_timestamp = RawInt::new(immediate_commit_timestamp);
self
}
pub fn immediate_commit_timestamp(&self) -> u64 {
self.immediate_commit_timestamp.0
}
pub fn with_original_commit_timestamp(mut self, original_commit_timestamp: u64) -> Self {
self.original_commit_timestamp = RawInt::new(original_commit_timestamp);
self
}
pub fn original_commit_timestamp(&self) -> u64 {
self.original_commit_timestamp.0
}
pub fn with_tx_length(mut self, tx_length: u64) -> Self {
self.tx_length = RawInt::new(tx_length);
self
}
pub fn tx_length(&self) -> u64 {
self.tx_length.0
}
pub fn with_original_server_version(mut self, original_server_version: u32) -> Self {
self.original_server_version = RawInt::new(original_server_version);
self
}
pub fn original_server_version(&self) -> u32 {
self.original_server_version.0
}
pub fn with_immediate_server_version(mut self, immediate_server_version: u32) -> Self {
self.immediate_server_version = RawInt::new(immediate_server_version);
self
}
pub fn immediate_server_version(&self) -> u32 {
self.immediate_server_version.0
}
pub fn commit_group_ticket(&self) -> u64 {
self.commit_group_ticket
}
pub fn with_commit_group_ticket(mut self, ticket: u64) -> Self {
self.commit_group_ticket = ticket;
self
}
}
fn compute_self_inclusive_payload_size(
extra_overhead: usize,
fields_size: usize,
lnif: u64,
) -> u64 {
let fixed = extra_overhead as u64 + varlen_uint_size(lnif) as u64 + fields_size as u64;
let mut ps = fixed + varlen_uint_size(fields_size as u64) as u64;
loop {
let next = fixed + varlen_uint_size(ps) as u64;
if next == ps {
return ps;
}
ps = next;
}
}
fn read_varlen_int(buf: &mut ParseBuf<'_>) -> io::Result<i64> {
let unsigned = read_varlen_uint(buf)?;
let sign = unsigned & 1;
let magnitude = (unsigned >> 1) as i64;
if sign != 0 {
Ok(-magnitude - 1)
} else {
Ok(magnitude)
}
}
fn write_varlen_int(buf: &mut Vec<u8>, value: i64) {
let unsigned = if value >= 0 {
(value as u64) << 1
} else {
((-(value + 1)) as u64) << 1 | 1
};
write_varlen_uint(buf, unsigned);
}
fn read_serialized_uuid(buf: &mut ParseBuf<'_>) -> io::Result<[u8; 16]> {
let mut uuid = [0u8; 16];
for (i, byte) in uuid.iter_mut().enumerate() {
let val = read_varlen_uint(buf)?;
if val > u8::MAX as u64 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("UUID byte {} out of range: {}", i, val),
));
}
*byte = val as u8;
}
Ok(uuid)
}
fn write_serialized_uuid(buf: &mut Vec<u8>, uuid: &[u8; 16]) {
for &byte in uuid.iter() {
write_varlen_uint(buf, byte as u64);
}
}
fn read_varlen_string<'a>(buf: &mut ParseBuf<'a>) -> io::Result<Cow<'a, str>> {
let raw_len = read_varlen_uint(buf)?;
let len: usize = raw_len.try_into().map_err(|_| {
io::Error::new(
io::ErrorKind::InvalidData,
format!("varlen string length {} exceeds platform usize", raw_len),
)
})?;
if buf.len() < len {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected end of buffer reading varlen string",
));
}
let bytes = &buf.0[..len];
buf.0 = &buf.0[len..];
let s = std::str::from_utf8(bytes)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, format!("invalid UTF-8: {}", e)))?;
Ok(Cow::Borrowed(s))
}
impl<'de> MyDeserialize<'de> for GtidEvent {
const SIZE: Option<usize> = None;
type Ctx = BinlogCtx<'de>;
fn deserialize(ctx: Self::Ctx, buf: &mut ParseBuf<'de>) -> io::Result<Self> {
let event_type = EventType::try_from(ctx.event_type_raw)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
match event_type {
EventType::GTID_EVENT | EventType::ANONYMOUS_GTID_EVENT => {
Self::deserialize_untagged(buf)
}
EventType::GTID_TAGGED_LOG_EVENT => Self::deserialize_tagged(buf),
_ => Err(io::Error::other("unexpected event type for GtidEvent")),
}
}
}
impl GtidEvent {
fn deserialize_untagged(buf: &mut ParseBuf<'_>) -> io::Result<Self> {
let mut sbuf: ParseBuf<'_> = buf.parse(1 + Self::ENCODED_SID_LENGTH + 8)?;
let flags = sbuf.parse_unchecked(())?;
let sid: [u8; Self::ENCODED_SID_LENGTH] = sbuf.parse_unchecked(())?;
let gno = sbuf.parse_unchecked(())?;
let mut lc_typecode = None;
let mut last_committed = RawInt::new(0);
let mut sequence_number = RawInt::new(0);
let mut immediate_commit_timestamp = RawInt::new(0);
let mut original_commit_timestamp = RawInt::new(0);
let mut tx_length = RawInt::new(0);
let mut original_server_version = RawInt::new(Self::UNDEFINED_SERVER_VERSION);
let mut immediate_server_version = RawInt::new(Self::UNDEFINED_SERVER_VERSION);
if !buf.is_empty() && buf.0[0] == Self::LOGICAL_TIMESTAMP_TYPECODE {
lc_typecode = Some(buf.parse_unchecked(())?);
let mut sbuf: ParseBuf<'_> = buf.parse(16)?;
last_committed = sbuf.parse_unchecked(())?;
sequence_number = sbuf.parse_unchecked(())?;
if buf.len() >= Self::IMMEDIATE_COMMIT_TIMESTAMP_LENGTH {
immediate_commit_timestamp = buf.parse_unchecked(())?;
if immediate_commit_timestamp.0 & (1 << 55) != 0 {
immediate_commit_timestamp.0 &= !(1 << 55);
original_commit_timestamp = buf.parse(())?;
} else {
original_commit_timestamp = immediate_commit_timestamp;
}
}
if !buf.is_empty() {
tx_length = buf.parse_unchecked(())?;
}
if buf.len() >= Self::IMMEDIATE_SERVER_VERSION_LENGTH {
immediate_server_version = buf.parse_unchecked(())?;
if immediate_server_version.0 & (1 << 31) != 0 {
immediate_server_version.0 &= !(1 << 31);
original_server_version = buf.parse(())?;
} else {
original_server_version = immediate_server_version;
}
}
}
Ok(Self {
flags,
sid,
gno,
tag: None,
lc_typecode,
last_committed,
sequence_number,
immediate_commit_timestamp,
original_commit_timestamp,
tx_length,
original_server_version,
immediate_server_version,
serialization_version: 0,
commit_group_ticket: Self::COMMIT_GROUP_TICKET_UNSET,
})
}
fn deserialize_tagged(buf: &mut ParseBuf<'_>) -> io::Result<Self> {
if buf.is_empty() {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"unexpected end reading serialization version",
));
}
let serialization_version = buf.0[0];
buf.0 = &buf.0[1..];
match serialization_version {
Self::TAGGED_SERIALIZATION_VERSION_V2 => {
Self::deserialize_tagged_v2(serialization_version, buf)
}
v => Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("unsupported tagged GTID serialization version {v}"),
)),
}
}
fn deserialize_tagged_v2(
serialization_version: u8,
buf: &mut ParseBuf<'_>,
) -> io::Result<Self> {
let buf_len_at_start = buf.len() + std::mem::size_of_val(&serialization_version);
let payload_size = read_varlen_uint(buf)?;
let last_non_ignorable_field_id = read_varlen_uint(buf)?;
let envelope_consumed = (buf_len_at_start - buf.len()) as u64;
if payload_size < envelope_consumed {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"payload_size ({}) is smaller than envelope overhead ({})",
payload_size, envelope_consumed,
),
));
}
let fields_len = (payload_size - envelope_consumed) as usize;
if buf.len() < fields_len {
return Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
format!(
"buffer has {} bytes but payload declares {} field bytes",
buf.len(),
fields_len,
),
));
}
let mut payload_buf = ParseBuf(&buf.0[..fields_len]);
buf.0 = &buf.0[fields_len..];
let mut flags = RawFlags::new(0);
let mut sid = [0u8; 16];
let mut tag: Option<Tag<'static>> = None;
let mut gno = 0u64;
let mut last_committed = 0u64;
let mut sequence_number = 0u64;
let mut immediate_commit_timestamp = 0u64;
let mut original_commit_timestamp = 0u64;
let mut tx_length = 0u64;
let mut original_server_version = Self::UNDEFINED_SERVER_VERSION;
let mut immediate_server_version = Self::UNDEFINED_SERVER_VERSION;
let mut commit_group_ticket = Self::COMMIT_GROUP_TICKET_UNSET;
let mut seen_original_commit_timestamp = false;
let mut seen_original_server_version = false;
while !payload_buf.is_empty() {
let fid = read_varlen_uint(&mut payload_buf)?;
match fid {
field_id::GTID_FLAGS => {
let val = read_varlen_uint(&mut payload_buf)?;
if val > u8::MAX as u64 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("gtid_flags value out of u8 range: {}", val),
));
}
flags = RawFlags::new(val as u8);
}
field_id::SID => {
sid = read_serialized_uuid(&mut payload_buf)?;
}
field_id::GNO => {
let val = read_varlen_int(&mut payload_buf)?;
if val < 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("negative gno value: {}", val),
));
}
gno = val as u64;
}
field_id::TAG => {
let tag_str = read_varlen_string(&mut payload_buf)?;
tag = Some(
Tag::new(tag_str.into_owned())
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?,
);
}
field_id::LAST_COMMITTED => {
let val = read_varlen_int(&mut payload_buf)?;
if val < 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("negative last_committed value: {}", val),
));
}
last_committed = val as u64;
}
field_id::SEQUENCE_NUMBER => {
let val = read_varlen_int(&mut payload_buf)?;
if val < 0 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("negative sequence_number value: {}", val),
));
}
sequence_number = val as u64;
}
field_id::IMMEDIATE_COMMIT_TIMESTAMP => {
immediate_commit_timestamp = read_varlen_uint(&mut payload_buf)?;
}
field_id::ORIGINAL_COMMIT_TIMESTAMP => {
original_commit_timestamp = read_varlen_uint(&mut payload_buf)?;
seen_original_commit_timestamp = true;
}
field_id::TRANSACTION_LENGTH => {
tx_length = read_varlen_uint(&mut payload_buf)?;
}
field_id::IMMEDIATE_SERVER_VERSION => {
let val = read_varlen_uint(&mut payload_buf)?;
if val > u32::MAX as u64 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("immediate_server_version out of u32 range: {}", val),
));
}
immediate_server_version = val as u32;
}
field_id::ORIGINAL_SERVER_VERSION => {
let val = read_varlen_uint(&mut payload_buf)?;
if val > u32::MAX as u64 {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("original_server_version out of u32 range: {}", val),
));
}
original_server_version = val as u32;
seen_original_server_version = true;
}
field_id::COMMIT_GROUP_TICKET => {
commit_group_ticket = read_varlen_uint(&mut payload_buf)?;
}
_ => {
if fid <= last_non_ignorable_field_id {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!(
"unknown non-ignorable field {} in GTID_TAGGED_LOG_EVENT \
(last_non_ignorable_field_id = {})",
fid, last_non_ignorable_field_id,
),
));
}
break;
}
}
}
if !seen_original_commit_timestamp {
original_commit_timestamp = immediate_commit_timestamp;
}
if !seen_original_server_version {
original_server_version = immediate_server_version;
}
let tag = tag.ok_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidData,
"GTID_TAGGED_LOG_EVENT missing tag field",
)
})?;
Ok(Self {
flags,
sid,
gno: RawConst::new(gno),
tag: Some(tag),
lc_typecode: None,
last_committed: RawInt::new(last_committed),
sequence_number: RawInt::new(sequence_number),
immediate_commit_timestamp: RawInt::new(immediate_commit_timestamp),
original_commit_timestamp: RawInt::new(original_commit_timestamp),
tx_length: RawInt::new(tx_length),
original_server_version: RawInt::new(original_server_version),
immediate_server_version: RawInt::new(immediate_server_version),
serialization_version,
commit_group_ticket,
})
}
}
impl MySerialize for GtidEvent {
fn serialize(&self, buf: &mut Vec<u8>) {
if self.tag.is_some() {
self.serialize_tagged(buf);
} else {
self.serialize_untagged(buf);
}
}
}
impl GtidEvent {
fn serialize_untagged(&self, buf: &mut Vec<u8>) {
self.flags.serialize(&mut *buf);
self.sid.serialize(&mut *buf);
self.gno.serialize(&mut *buf);
match self.lc_typecode {
Some(lc_typecode) => lc_typecode.serialize(&mut *buf),
None => return,
};
self.last_committed.serialize(&mut *buf);
self.sequence_number.serialize(&mut *buf);
let mut immediate_commit_timestamp_with_flag = *self.immediate_commit_timestamp;
if self.immediate_commit_timestamp != self.original_commit_timestamp {
immediate_commit_timestamp_with_flag |= 1 << 55;
} else {
immediate_commit_timestamp_with_flag &= !(1 << 55);
}
RawInt::<LeU56>::new(immediate_commit_timestamp_with_flag).serialize(&mut *buf);
if self.immediate_commit_timestamp != self.original_commit_timestamp {
self.original_commit_timestamp.serialize(&mut *buf);
}
self.tx_length.serialize(&mut *buf);
let mut immediate_server_version_with_flag = *self.immediate_server_version;
if self.immediate_server_version != self.original_server_version {
immediate_server_version_with_flag |= 1 << 31;
} else {
immediate_server_version_with_flag &= !(1 << 31);
}
RawInt::<LeU32>::new(immediate_server_version_with_flag).serialize(&mut *buf);
if self.immediate_server_version != self.original_server_version {
self.original_server_version.serialize(&mut *buf);
}
}
fn serialize_tagged(&self, buf: &mut Vec<u8>) {
match self.serialization_version {
Self::TAGGED_SERIALIZATION_VERSION_V2 => self.serialize_tagged_v2(buf),
_ => unreachable!(
"unsupported tagged GTID serialization version {}",
self.serialization_version
),
}
}
fn write_tagged_fields_v2(&self, fields: &mut Vec<u8>) {
write_varlen_uint(fields, field_id::GTID_FLAGS);
write_varlen_uint(fields, self.flags.0 as u64);
write_varlen_uint(fields, field_id::SID);
write_serialized_uuid(fields, &self.sid);
assert!(self.gno.0 <= i64::MAX as u64, "gno exceeds i64::MAX");
write_varlen_uint(fields, field_id::GNO);
write_varlen_int(fields, self.gno.0 as i64);
write_varlen_uint(fields, field_id::TAG);
let tag_bytes = self
.tag
.as_ref()
.map(|t| t.as_str().as_bytes())
.unwrap_or(b"");
write_varlen_uint(fields, tag_bytes.len() as u64);
fields.extend_from_slice(tag_bytes);
assert!(
self.last_committed.0 <= i64::MAX as u64,
"last_committed exceeds i64::MAX"
);
write_varlen_uint(fields, field_id::LAST_COMMITTED);
write_varlen_int(fields, self.last_committed.0 as i64);
assert!(
self.sequence_number.0 <= i64::MAX as u64,
"sequence_number exceeds i64::MAX"
);
write_varlen_uint(fields, field_id::SEQUENCE_NUMBER);
write_varlen_int(fields, self.sequence_number.0 as i64);
write_varlen_uint(fields, field_id::IMMEDIATE_COMMIT_TIMESTAMP);
write_varlen_uint(fields, self.immediate_commit_timestamp.0);
if self.original_commit_timestamp != self.immediate_commit_timestamp {
write_varlen_uint(fields, field_id::ORIGINAL_COMMIT_TIMESTAMP);
write_varlen_uint(fields, self.original_commit_timestamp.0);
}
write_varlen_uint(fields, field_id::TRANSACTION_LENGTH);
write_varlen_uint(fields, self.tx_length.0);
write_varlen_uint(fields, field_id::IMMEDIATE_SERVER_VERSION);
write_varlen_uint(fields, self.immediate_server_version.0 as u64);
if self.original_server_version != self.immediate_server_version {
write_varlen_uint(fields, field_id::ORIGINAL_SERVER_VERSION);
write_varlen_uint(fields, self.original_server_version.0 as u64);
}
if self.commit_group_ticket != Self::COMMIT_GROUP_TICKET_UNSET {
write_varlen_uint(fields, field_id::COMMIT_GROUP_TICKET);
write_varlen_uint(fields, self.commit_group_ticket);
}
}
fn serialize_tagged_v2(&self, buf: &mut Vec<u8>) {
let mut fields = Vec::new();
self.write_tagged_fields_v2(&mut fields);
let last_non_ignorable_field_id: u64 = 0;
let version_byte_size = std::mem::size_of_val(&self.serialization_version);
let payload_size = compute_self_inclusive_payload_size(
version_byte_size,
fields.len(),
last_non_ignorable_field_id,
);
buf.reserve(
1 + varlen_uint_size(payload_size)
+ varlen_uint_size(last_non_ignorable_field_id)
+ fields.len(),
);
buf.push(self.serialization_version);
write_varlen_uint(buf, payload_size);
write_varlen_uint(buf, last_non_ignorable_field_id);
buf.extend_from_slice(&fields);
}
}
impl<'a> BinlogStruct<'a> for GtidEvent {
fn len(&self, _version: BinlogVersion) -> usize {
if self.tag.is_some() {
self.len_tagged()
} else {
self.len_untagged()
}
}
}
impl GtidEvent {
fn len_untagged(&self) -> usize {
let mut len = S(0);
len += S(1); len += S(Self::ENCODED_SID_LENGTH); len += S(8); len += S(1); len += S(8); len += S(8);
len += S(7); if self.immediate_commit_timestamp != self.original_commit_timestamp {
len += S(7); }
len += S(crate::misc::lenenc_int_len(*self.tx_length) as usize); len += S(4); if self.immediate_server_version != self.original_server_version {
len += S(4); }
min(len.0, u32::MAX as usize - BinlogEventHeader::LEN)
}
fn len_tagged(&self) -> usize {
match self.serialization_version {
Self::TAGGED_SERIALIZATION_VERSION_V2 => self.len_tagged_v2(),
_ => unreachable!(
"unsupported tagged GTID serialization version {}",
self.serialization_version
),
}
}
fn len_tagged_v2(&self) -> usize {
let mut fields = Vec::new();
self.write_tagged_fields_v2(&mut fields);
let last_non_ignorable_field_id: u64 = 0;
let version_byte_size = std::mem::size_of_val(&self.serialization_version);
let total_size = compute_self_inclusive_payload_size(
version_byte_size,
fields.len(),
last_non_ignorable_field_id,
) as usize;
min(total_size, u32::MAX as usize - BinlogEventHeader::LEN)
}
}
impl<'a> BinlogEvent<'a> for GtidEvent {
const EVENT_TYPE: EventType = EventType::GTID_EVENT;
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn varlen_uint_roundtrip() {
let test_cases: &[u64] = &[
0,
1,
63,
64,
127,
128,
255,
256,
0x3FFF,
0x4000,
0x1F_FFFF,
0x20_0000,
0x0FFF_FFFF,
0x1000_0000,
0x07_FFFF_FFFF,
0x08_0000_0000,
0x03FF_FFFF_FFFF,
0x0400_0000_0000,
0x01_FFFF_FFFF_FFFF,
0x02_0000_0000_0000,
0x00FF_FFFF_FFFF_FFFF,
0x0100_0000_0000_0000,
u64::MAX / 2,
u64::MAX,
];
for &value in test_cases {
let mut buf = Vec::new();
write_varlen_uint(&mut buf, value);
let mut parse_buf = ParseBuf(&buf);
let decoded = read_varlen_uint(&mut parse_buf).unwrap();
assert_eq!(
value, decoded,
"roundtrip failed for {} (0x{:X}), encoded {:?}",
value, value, buf
);
assert!(
parse_buf.is_empty(),
"leftover bytes for {} (0x{:X})",
value,
value
);
}
}
#[test]
fn varlen_uint_byte_count() {
let mut buf = Vec::new();
write_varlen_uint(&mut buf, 0);
assert_eq!(buf.len(), 1);
buf.clear();
write_varlen_uint(&mut buf, 127);
assert_eq!(buf.len(), 1);
buf.clear();
write_varlen_uint(&mut buf, 128);
assert_eq!(buf.len(), 2);
buf.clear();
write_varlen_uint(&mut buf, u64::MAX);
assert_eq!(buf.len(), 9);
}
#[test]
fn varlen_int_roundtrip() {
let test_cases: &[i64] = &[
0,
1,
-1,
127,
-128,
i64::MAX,
i64::MIN,
i64::MAX / 2,
i64::MIN / 2,
42,
-42,
];
for &value in test_cases {
let mut buf = Vec::new();
write_varlen_int(&mut buf, value);
let mut parse_buf = ParseBuf(&buf);
let decoded = read_varlen_int(&mut parse_buf).unwrap();
assert_eq!(
value, decoded,
"signed roundtrip failed for {} (0x{:X})",
value, value
);
assert!(parse_buf.is_empty(), "leftover bytes for signed {}", value);
}
}
#[test]
fn tagged_event_creation() {
let sid = [1u8; 16];
let tag = Tag::new("domain_1").unwrap();
let event = GtidEvent::new_tagged(sid, tag, 42);
assert!(event.is_tagged());
assert_eq!(event.sid(), sid);
assert_eq!(event.tag().unwrap().as_str(), "domain_1");
assert_eq!(event.gno(), 42);
assert_eq!(event.flags_raw(), 0);
}
#[test]
fn untagged_event_creation() {
let sid = [1u8; 16];
let event = GtidEvent::new(sid, 42);
assert!(!event.is_tagged());
assert_eq!(event.sid(), sid);
assert!(event.tag().is_none());
assert_eq!(event.gno(), 42);
}
#[test]
fn tagged_event_serialize_deserialize_roundtrip() {
let sid = *uuid::Uuid::parse_str("3E11FA47-71CA-11E1-9E33-C80AA9429562")
.unwrap()
.as_bytes();
let tag = Tag::new("domain_1").unwrap();
let event = GtidEvent::new_tagged(sid, tag, 42)
.with_last_committed(10)
.with_sequence_number(11)
.with_immediate_commit_timestamp(1234567890)
.with_original_commit_timestamp(1234567890)
.with_tx_length(256)
.with_immediate_server_version(90200)
.with_original_server_version(90200);
let mut buf = Vec::new();
event.serialize(&mut buf);
let mut parse_buf = ParseBuf(&buf);
let decoded = GtidEvent::deserialize_tagged(&mut parse_buf).unwrap();
assert!(decoded.is_tagged());
assert_eq!(decoded.sid(), event.sid());
assert_eq!(
decoded.tag().unwrap().as_str(),
event.tag().unwrap().as_str()
);
assert_eq!(decoded.gno(), event.gno());
assert_eq!(decoded.last_committed(), event.last_committed());
assert_eq!(decoded.sequence_number(), event.sequence_number());
assert_eq!(
decoded.immediate_commit_timestamp(),
event.immediate_commit_timestamp()
);
assert_eq!(
decoded.original_commit_timestamp(),
event.original_commit_timestamp()
);
assert_eq!(decoded.tx_length(), event.tx_length());
assert_eq!(
decoded.immediate_server_version(),
event.immediate_server_version()
);
assert_eq!(
decoded.original_server_version(),
event.original_server_version()
);
}
#[test]
fn tagged_event_with_different_timestamps() {
let sid = [0xABu8; 16];
let tag = Tag::new("backup").unwrap();
let event = GtidEvent::new_tagged(sid, tag, 1)
.with_immediate_commit_timestamp(2000)
.with_original_commit_timestamp(1000)
.with_immediate_server_version(90200)
.with_original_server_version(90100);
let mut buf = Vec::new();
event.serialize(&mut buf);
let mut parse_buf = ParseBuf(&buf);
let decoded = GtidEvent::deserialize_tagged(&mut parse_buf).unwrap();
assert_eq!(decoded.immediate_commit_timestamp(), 2000);
assert_eq!(decoded.original_commit_timestamp(), 1000);
assert_eq!(decoded.immediate_server_version(), 90200);
assert_eq!(decoded.original_server_version(), 90100);
}
#[test]
fn tagged_event_roundtrip_preserves_explicit_zero_originals() {
let sid = [0xCDu8; 16];
let tag = Tag::new("roundtrip").unwrap();
let event = GtidEvent::new_tagged(sid, tag.clone(), 5)
.with_immediate_commit_timestamp(99999)
.with_original_commit_timestamp(0)
.with_immediate_server_version(90200)
.with_original_server_version(90200);
let mut buf = Vec::new();
event.serialize(&mut buf);
let mut parse_buf = ParseBuf(&buf);
let decoded = GtidEvent::deserialize_tagged(&mut parse_buf).unwrap();
assert_eq!(decoded.original_commit_timestamp(), 0);
assert_eq!(decoded.immediate_commit_timestamp(), 99999);
let event = GtidEvent::new_tagged(sid, tag, 5)
.with_immediate_commit_timestamp(50000)
.with_original_commit_timestamp(50000)
.with_immediate_server_version(90200)
.with_original_server_version(GtidEvent::UNDEFINED_SERVER_VERSION);
buf.clear();
event.serialize(&mut buf);
let mut parse_buf = ParseBuf(&buf);
let decoded = GtidEvent::deserialize_tagged(&mut parse_buf).unwrap();
assert_eq!(
decoded.original_server_version(),
GtidEvent::UNDEFINED_SERVER_VERSION,
);
assert_eq!(decoded.immediate_server_version(), 90200);
}
#[test]
fn deserialize_rejects_unknown_non_ignorable_field() {
let mut fields = Vec::new();
write_varlen_uint(&mut fields, field_id::GTID_FLAGS);
write_varlen_uint(&mut fields, 0);
write_varlen_uint(&mut fields, 99);
write_varlen_uint(&mut fields, 0);
let last_non_ignorable: u64 = 100;
let fields_size = fields.len() as u64;
let payload_size = fields_size
+ varlen_uint_size(fields_size) as u64
+ varlen_uint_size(last_non_ignorable) as u64;
let mut buf = Vec::new();
buf.push(GtidEvent::TAGGED_SERIALIZATION_VERSION_V2);
write_varlen_uint(&mut buf, payload_size);
write_varlen_uint(&mut buf, last_non_ignorable);
buf.extend_from_slice(&fields);
let mut parse_buf = ParseBuf(&buf);
let result = GtidEvent::deserialize_tagged(&mut parse_buf);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
assert!(err.to_string().contains("unknown non-ignorable field"));
}
#[test]
fn deserialize_rejects_unknown_serialization_version() {
let mut buf = Vec::new();
buf.push(0u8); write_varlen_uint(&mut buf, 10); write_varlen_uint(&mut buf, 0);
let mut parse_buf = ParseBuf(&buf);
let result = GtidEvent::deserialize_tagged(&mut parse_buf);
assert!(result.is_err());
let err = result.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::InvalidData);
assert!(
err.to_string()
.contains("unsupported tagged GTID serialization version"),
);
}
fn build_raw_tagged_payload(field_pairs: &[(u64, Vec<u8>)]) -> Vec<u8> {
build_raw_tagged_payload_with_lnif(field_pairs, field_id::IMMEDIATE_SERVER_VERSION)
}
fn build_raw_tagged_payload_with_lnif(
field_pairs: &[(u64, Vec<u8>)],
last_non_ignorable: u64,
) -> Vec<u8> {
let mut fields = Vec::new();
for (fid, data) in field_pairs {
write_varlen_uint(&mut fields, *fid);
fields.extend_from_slice(data);
}
let version_byte: u8 = GtidEvent::TAGGED_SERIALIZATION_VERSION_V2;
let version_byte_size = std::mem::size_of_val(&version_byte);
let payload_size = compute_self_inclusive_payload_size(
version_byte_size,
fields.len(),
last_non_ignorable,
);
let mut buf = Vec::new();
buf.push(version_byte); write_varlen_uint(&mut buf, payload_size);
write_varlen_uint(&mut buf, last_non_ignorable);
buf.extend_from_slice(&fields);
buf
}
fn encode_varlen_uint(value: u64) -> Vec<u8> {
let mut buf = Vec::new();
write_varlen_uint(&mut buf, value);
buf
}
fn encode_varlen_int(value: i64) -> Vec<u8> {
let mut buf = Vec::new();
write_varlen_int(&mut buf, value);
buf
}
fn encode_uuid(uuid: &[u8; 16]) -> Vec<u8> {
let mut buf = Vec::new();
write_serialized_uuid(&mut buf, uuid);
buf
}
fn encode_varlen_string(s: &str) -> Vec<u8> {
let mut buf = Vec::new();
write_varlen_uint(&mut buf, s.len() as u64);
buf.extend_from_slice(s.as_bytes());
buf
}
#[test]
fn deserialize_rejects_negative_signed_fields() {
let sid = [0u8; 16];
let buf = build_raw_tagged_payload(&[
(field_id::GTID_FLAGS, encode_varlen_uint(0)),
(field_id::SID, encode_uuid(&sid)),
(field_id::GNO, encode_varlen_int(-1)),
(field_id::TAG, encode_varlen_string("test")),
(field_id::LAST_COMMITTED, encode_varlen_int(0)),
(field_id::SEQUENCE_NUMBER, encode_varlen_int(0)),
(field_id::IMMEDIATE_COMMIT_TIMESTAMP, encode_varlen_uint(0)),
(field_id::TRANSACTION_LENGTH, encode_varlen_uint(0)),
(field_id::IMMEDIATE_SERVER_VERSION, encode_varlen_uint(0)),
]);
let mut parse_buf = ParseBuf(&buf);
let err = GtidEvent::deserialize_tagged(&mut parse_buf).unwrap_err();
assert!(err.to_string().contains("negative gno"), "{}", err);
let buf = build_raw_tagged_payload(&[
(field_id::GTID_FLAGS, encode_varlen_uint(0)),
(field_id::SID, encode_uuid(&sid)),
(field_id::GNO, encode_varlen_int(1)),
(field_id::TAG, encode_varlen_string("test")),
(field_id::LAST_COMMITTED, encode_varlen_int(-5)),
(field_id::SEQUENCE_NUMBER, encode_varlen_int(0)),
(field_id::IMMEDIATE_COMMIT_TIMESTAMP, encode_varlen_uint(0)),
(field_id::TRANSACTION_LENGTH, encode_varlen_uint(0)),
(field_id::IMMEDIATE_SERVER_VERSION, encode_varlen_uint(0)),
]);
let mut parse_buf = ParseBuf(&buf);
let err = GtidEvent::deserialize_tagged(&mut parse_buf).unwrap_err();
assert!(
err.to_string().contains("negative last_committed"),
"{}",
err
);
let buf = build_raw_tagged_payload(&[
(field_id::GTID_FLAGS, encode_varlen_uint(0)),
(field_id::SID, encode_uuid(&sid)),
(field_id::GNO, encode_varlen_int(1)),
(field_id::TAG, encode_varlen_string("test")),
(field_id::LAST_COMMITTED, encode_varlen_int(0)),
(field_id::SEQUENCE_NUMBER, encode_varlen_int(-100)),
(field_id::IMMEDIATE_COMMIT_TIMESTAMP, encode_varlen_uint(0)),
(field_id::TRANSACTION_LENGTH, encode_varlen_uint(0)),
(field_id::IMMEDIATE_SERVER_VERSION, encode_varlen_uint(0)),
]);
let mut parse_buf = ParseBuf(&buf);
let err = GtidEvent::deserialize_tagged(&mut parse_buf).unwrap_err();
assert!(
err.to_string().contains("negative sequence_number"),
"{}",
err
);
}
#[test]
fn deserialize_rejects_missing_tag() {
let sid = [0u8; 16];
let buf = build_raw_tagged_payload(&[
(field_id::GTID_FLAGS, encode_varlen_uint(0)),
(field_id::SID, encode_uuid(&sid)),
(field_id::GNO, encode_varlen_int(1)),
(field_id::LAST_COMMITTED, encode_varlen_int(0)),
(field_id::SEQUENCE_NUMBER, encode_varlen_int(0)),
(field_id::IMMEDIATE_COMMIT_TIMESTAMP, encode_varlen_uint(0)),
(field_id::TRANSACTION_LENGTH, encode_varlen_uint(0)),
(field_id::IMMEDIATE_SERVER_VERSION, encode_varlen_uint(0)),
]);
let mut parse_buf = ParseBuf(&buf);
let err = GtidEvent::deserialize_tagged(&mut parse_buf).unwrap_err();
assert!(err.to_string().contains("missing tag field"), "{}", err);
}
#[test]
fn deserialize_rejects_empty_buffer() {
let buf: &[u8] = &[];
let mut parse_buf = ParseBuf(buf);
let err = GtidEvent::deserialize_tagged(&mut parse_buf).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
}
#[test]
fn deserialize_rejects_truncated_varlen() {
let buf: &[u8] = &[GtidEvent::TAGGED_SERIALIZATION_VERSION_V2, 0xFF, 0x01, 0x02];
let mut parse_buf = ParseBuf(buf);
let err = GtidEvent::deserialize_tagged(&mut parse_buf).unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::UnexpectedEof);
}
#[test]
fn tagged_event_roundtrip_with_max_gno() {
let sid = [0xFFu8; 16];
let tag = Tag::new("maxgno").unwrap();
let event = GtidEvent::new_tagged(sid, tag, i64::MAX as u64)
.with_last_committed(i64::MAX as u64)
.with_sequence_number(i64::MAX as u64);
let mut buf = Vec::new();
event.serialize(&mut buf);
let mut parse_buf = ParseBuf(&buf);
let decoded = GtidEvent::deserialize_tagged(&mut parse_buf).unwrap();
assert_eq!(decoded.gno(), i64::MAX as u64);
assert_eq!(decoded.last_committed(), i64::MAX as u64);
assert_eq!(decoded.sequence_number(), i64::MAX as u64);
}
}