use bytes::Bytes;
use crabka_protocol::records::{Attributes, Record, RecordBatch};
/// The kind of transaction marker a control batch can carry.
///
/// Each variant is turned into a numeric code by `MarkerType::type_code`,
/// which `build_marker_batch` embeds in the marker record's key.
// NOTE(review): `dead_code` is allowed presumably because nothing outside this
// module's tests constructs markers yet — confirm and drop once wired in.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MarkerType {
    /// Marker for a committed transaction.
    Commit,
    /// Marker for an aborted transaction.
    Abort,
}
impl MarkerType {
#[allow(dead_code)]
fn type_code(self) -> i16 {
match self {
MarkerType::Commit => 1,
MarkerType::Abort => 0,
}
}
}
/// Builds a one-record control batch carrying a transaction marker.
///
/// The returned batch has its attributes flagged as both transactional and
/// control, and contains exactly one record:
///
/// * the key is 4 bytes: a big-endian `i16` version (`0`) followed by the
///   big-endian `i16` code of `marker_type`;
/// * the value is `None`.
///
/// `producer_id`, `producer_epoch` and `base_offset` are copied into the batch
/// as given, `last_offset_delta` and the record's `offset_delta` are `0`, and
/// every field not mentioned here keeps its `Default` value.
// NOTE(review): Kafka's end-transaction markers normally carry a value
// (version + coordinator epoch); here it is left empty — confirm that every
// reader of these batches tolerates a missing value.
#[allow(dead_code)]
pub fn build_marker_batch(
    producer_id: i64,
    producer_epoch: i16,
    base_offset: i64,
    marker_type: MarkerType,
) -> RecordBatch {
    /// Version field written at the start of the marker key.
    const KEY_VERSION: i16 = 0;

    // Key layout: [version: i16 BE][marker type code: i16 BE].
    let key_bytes: Vec<u8> = [
        KEY_VERSION.to_be_bytes(),
        marker_type.type_code().to_be_bytes(),
    ]
    .concat();

    let marker_record = Record {
        key: Some(Bytes::from(key_bytes)),
        value: None,
        offset_delta: 0,
        ..Default::default()
    };

    RecordBatch {
        base_offset,
        last_offset_delta: 0,
        producer_id,
        producer_epoch,
        attributes: Attributes::default()
            .with_transactional(true)
            .with_control(true),
        records: vec![marker_record],
        ..RecordBatch::default()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;

    /// A marker batch must be flagged as both transactional and control.
    #[test]
    fn commit_marker_attribute_bits_set() {
        let batch = build_marker_batch(1000, 0, 7, MarkerType::Commit);
        let attributes = &batch.attributes;
        assert!(attributes.is_transactional());
        assert!(attributes.is_control_batch());
    }

    /// An abort marker's key is exactly four bytes: version 0, then type 0.
    #[test]
    fn abort_marker_key_starts_with_version_zero_then_type_zero() {
        let batch = build_marker_batch(1000, 0, 0, MarkerType::Abort);
        let key = batch.records[0].key.as_ref().unwrap();
        assert!(key.len() == 4);

        let version_bytes = &key[..2];
        let type_bytes = &key[2..];
        assert!(version_bytes == &0i16.to_be_bytes());
        assert!(type_bytes == &0i16.to_be_bytes());
    }

    /// A commit marker's key carries type code 1 after the version field.
    #[test]
    fn commit_marker_key_type_is_one() {
        let batch = build_marker_batch(1000, 0, 0, MarkerType::Commit);
        let key = batch.records[0].key.as_ref().unwrap();

        let type_bytes = &key[2..];
        assert!(type_bytes == &1i16.to_be_bytes());
    }
}