use bytes::{BufMut, Bytes, BytesMut};
const I64: usize = 8;
const I32: usize = 4;
const NULL: i32 = -1;
const SAME_AS_PRIOR: i32 = -2;
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct SuppressRecordCtx {
pub topic: String,
pub partition: i32,
pub offset: i64,
pub timestamp: i64,
}
impl SuppressRecordCtx {
fn write(&self, b: &mut BytesMut) {
b.put_i64(self.timestamp);
b.put_i64(self.offset);
b.put_i32(i32::try_from(self.topic.len()).expect("topic length fits i32"));
b.extend_from_slice(self.topic.as_bytes());
b.put_i32(self.partition);
b.put_i32(0); }
fn read(buf: &[u8]) -> (Self, &[u8]) {
let timestamp = read_i64(&buf[0..I64]);
let offset = read_i64(&buf[I64..2 * I64]);
let mut o = 2 * I64;
let topic_len =
usize::try_from(read_i32(&buf[o..o + I32])).expect("non-negative topic len");
o += I32;
let topic = String::from_utf8(buf[o..o + topic_len].to_vec()).expect("utf-8 topic");
o += topic_len;
let partition = read_i32(&buf[o..o + I32]);
o += I32;
o += I32;
(
Self {
topic,
partition,
offset,
timestamp,
},
&buf[o..],
)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct BufferedChange {
pub ctx: SuppressRecordCtx,
pub prior: Option<Vec<u8>>,
pub old: Option<Vec<u8>>,
pub new: Option<Vec<u8>>,
pub buffer_time: i64,
}
fn add_value(b: &mut BytesMut, value: Option<&[u8]>) {
match value {
None => b.put_i32(NULL),
Some(v) => {
b.put_i32(i32::try_from(v.len()).expect("value length fits i32"));
b.extend_from_slice(v);
}
}
}
pub(crate) fn serialize_buffer_change(
ctx: &SuppressRecordCtx,
prior: Option<&[u8]>,
old: Option<&[u8]>,
new: Option<&[u8]>,
buffer_time: i64,
) -> Bytes {
let mut b = BytesMut::new();
ctx.write(&mut b);
add_value(&mut b, prior);
match old {
None => b.put_i32(NULL),
Some(o) if prior == Some(o) => b.put_i32(SAME_AS_PRIOR),
Some(o) => {
b.put_i32(i32::try_from(o.len()).expect("old value length fits i32"));
b.extend_from_slice(o);
}
}
add_value(&mut b, new);
b.put_i64(buffer_time);
b.freeze()
}
fn read_add_value(buf: &[u8], o: &mut usize) -> Option<Vec<u8>> {
let len = read_i32(&buf[*o..*o + I32]);
*o += I32;
if len == NULL {
return None;
}
let len = usize::try_from(len).expect("non-negative length");
let v = buf[*o..*o + len].to_vec();
*o += len;
Some(v)
}
pub(crate) fn deserialize_buffer_change(bytes: &[u8]) -> BufferedChange {
let (ctx, rest) = SuppressRecordCtx::read(bytes);
let base = bytes.len() - rest.len();
let mut o = base;
let prior = read_add_value(bytes, &mut o);
let old_tag = read_i32(&bytes[o..o + I32]);
o += I32;
let old = if old_tag == NULL {
None
} else if old_tag == SAME_AS_PRIOR {
prior.clone()
} else {
let len = usize::try_from(old_tag).expect("non-negative length");
let v = bytes[o..o + len].to_vec();
o += len;
Some(v)
};
let new = read_add_value(bytes, &mut o);
let buffer_time = read_i64(&bytes[o..o + I64]);
BufferedChange {
ctx,
prior,
old,
new,
buffer_time,
}
}
fn read_i64(b: &[u8]) -> i64 {
i64::from_be_bytes(b.try_into().expect("8 bytes"))
}
fn read_i32(b: &[u8]) -> i32 {
i32::from_be_bytes(b.try_into().expect("4 bytes"))
}
#[cfg(test)]
mod tests {
use super::*;
fn count(n: i64) -> Vec<u8> {
n.to_be_bytes().to_vec()
}
fn ctx(timestamp: i64) -> SuppressRecordCtx {
SuppressRecordCtx {
topic: "in".to_string(),
partition: 0,
offset: 0,
timestamp,
}
}
fn hex(bytes: &[u8]) -> String {
use std::fmt::Write as _;
bytes.iter().fold(String::new(), |mut s, b| {
let _ = write!(s, "{b:02x}");
s
})
}
const WC_FIRST: &str = "000000000000000a0000000000000000000000026\
96e0000000000000000ffffffffffffffff000000080000000000000001000000000000000a";
const WC_CHANGE: &str = "000000000000000c0000000000000000000000026\
96e0000000000000000000000080000000000000001fffffffe000000080000000000000002000000000000000c";
const TOMBSTONE: &str = "00000000000000140000000000000000000000026\
96e0000000000000000000000080000000000000001fffffffeffffffff0000000000000014";
#[test]
fn wc_first_matches_jvm_bytes() {
let bytes = serialize_buffer_change(&ctx(10), None, None, Some(&count(1)), 10);
assert_eq!(hex(&bytes), WC_FIRST);
}
#[test]
fn wc_change_matches_jvm_bytes() {
let bytes = serialize_buffer_change(
&ctx(12),
Some(&count(1)),
Some(&count(1)),
Some(&count(2)),
12,
);
assert_eq!(hex(&bytes), WC_CHANGE);
}
#[test]
fn tombstone_matches_jvm_bytes() {
let bytes = serialize_buffer_change(&ctx(20), Some(&count(1)), Some(&count(1)), None, 20);
assert_eq!(hex(&bytes), TOMBSTONE);
}
#[test]
fn ctx_layout_is_30_bytes_for_two_char_topic() {
let mut b = BytesMut::new();
ctx(10).write(&mut b);
assert_eq!(b.len(), 30);
}
#[test]
fn round_trips_wc_first() {
let bytes = serialize_buffer_change(&ctx(10), None, None, Some(&count(1)), 10);
let d = deserialize_buffer_change(&bytes);
assert_eq!(d.ctx, ctx(10));
assert_eq!(d.prior, None);
assert_eq!(d.old, None);
assert_eq!(d.new, Some(count(1)));
assert_eq!(d.buffer_time, 10);
}
#[test]
fn round_trips_same_as_prior_alias() {
let bytes = serialize_buffer_change(
&ctx(12),
Some(&count(1)),
Some(&count(1)),
Some(&count(2)),
12,
);
let d = deserialize_buffer_change(&bytes);
assert_eq!(d.prior, Some(count(1)));
assert_eq!(d.old, Some(count(1)));
assert_eq!(d.new, Some(count(2)));
assert_eq!(d.buffer_time, 12);
}
#[test]
fn round_trips_tombstone() {
let bytes = serialize_buffer_change(&ctx(20), Some(&count(1)), Some(&count(1)), None, 20);
let d = deserialize_buffer_change(&bytes);
assert_eq!(d.new, None);
assert_eq!(d.old, Some(count(1)));
assert_eq!(d.buffer_time, 20);
}
#[test]
fn round_trips_distinct_prior_and_old() {
let bytes = serialize_buffer_change(
&ctx(33),
Some(&count(7)),
Some(&count(9)),
Some(&count(11)),
33,
);
let d = deserialize_buffer_change(&bytes);
assert_eq!(d.prior, Some(count(7)));
assert_eq!(d.old, Some(count(9)));
assert_eq!(d.new, Some(count(11)));
assert_eq!(d.buffer_time, 33);
}
#[test]
fn longer_topic_name_round_trips() {
let c = SuppressRecordCtx {
topic: "KSTREAM-TOTABLE-0000000005".to_string(),
partition: 3,
offset: 42,
timestamp: 100,
};
let bytes = serialize_buffer_change(&c, None, Some(&count(5)), Some(&count(6)), 100);
let d = deserialize_buffer_change(&bytes);
assert_eq!(d.ctx, c);
assert_eq!(d.old, Some(count(5)));
assert_eq!(d.new, Some(count(6)));
}
}