use bytes::{BufMut, Bytes};
use crate::{
encode::ToByte,
error::Result,
prelude::Compression,
protocol::HeaderRequest,
utils::{compress, compress_lz4, compress_snappy, compress_zstd, now, to_crc},
};
const API_KEY_PRODUCE: i16 = 0;
const API_VERSION: i16 = 3;
const MESSAGE_MAGIC_BYTE: i8 = 2;
#[derive(Debug)]
pub struct ProduceRequest<'a> {
pub header: HeaderRequest<'a>,
pub transactional_id: Option<String>,
pub required_acks: i16,
pub timeout_ms: i32,
topic_partitions: Vec<TopicPartition<'a>>,
attributes: Attributes,
}
impl<'a> ProduceRequest<'a> {
pub fn new(
required_acks: i16,
timeout_ms: i32,
correlation_id: i32,
client_id: &'a str,
attributes: Attributes,
) -> ProduceRequest<'a> {
ProduceRequest {
header: HeaderRequest::new(API_KEY_PRODUCE, API_VERSION, correlation_id, client_id),
transactional_id: None,
required_acks,
timeout_ms,
topic_partitions: vec![],
attributes,
}
}
pub fn add(
&mut self,
topic: &'a str,
partition: i32,
key: Option<Bytes>,
value: Option<Bytes>,
headers: Vec<Header>,
) {
let message = Message::new(key, value, headers);
match self
.topic_partitions
.iter_mut()
.find(|tp| tp.index == topic)
{
Some(tp) => {
tp.add(partition, message);
}
None => {
let mut tp = TopicPartition::new(topic, self.attributes.clone());
tp.add(partition, message);
self.topic_partitions.push(tp);
}
}
}
}
impl ToByte for ProduceRequest<'_> {
fn encode<W: BufMut>(&self, buffer: &mut W) -> Result<()> {
tracing::trace!("Encoding ProduceRequest {:?}", self);
self.header.encode(buffer)?;
self.transactional_id.encode(buffer)?;
self.required_acks.encode(buffer)?;
self.timeout_ms.encode(buffer)?;
self.topic_partitions.encode(buffer)?;
Ok(())
}
}
#[derive(Debug)]
struct TopicPartition<'a> {
pub index: &'a str,
pub partitions: Vec<Partition>,
attributes: Attributes,
}
impl<'a> TopicPartition<'a> {
pub fn new(index: &'a str, attributes: Attributes) -> TopicPartition<'a> {
TopicPartition {
index,
partitions: vec![],
attributes,
}
}
pub fn add(&mut self, partition: i32, message: Message) {
match self
.partitions
.iter_mut()
.find(|p| p.partition == partition)
{
Some(p) => {
p.add(message);
}
None => {
let mut p = Partition::new(partition, self.attributes.clone());
p.add(message);
self.partitions.push(p);
}
}
}
}
impl ToByte for TopicPartition<'_> {
fn encode<W: BufMut>(&self, buffer: &mut W) -> Result<()> {
tracing::trace!("Encoding TopicPartition {:?}", self);
self.index.encode(buffer)?;
self.partitions.encode(buffer)?;
Ok(())
}
}
#[derive(Debug)]
struct Partition {
pub partition: i32,
pub batches: Vec<RecordBatch>,
attributes: Attributes,
}
impl Partition {
pub fn new(partition: i32, attributes: Attributes) -> Partition {
Partition {
partition,
batches: Vec::new(),
attributes,
}
}
pub fn add(&mut self, message: Message) {
if self.batches.is_empty() {
self.batches.push(RecordBatch::new(self.attributes.clone()));
}
self.batches[0].add(message);
}
}
impl ToByte for Partition {
fn encode<W: BufMut>(&self, out: &mut W) -> Result<()> {
tracing::trace!("Encoding Partition {:?}", self);
self.partition.encode(out)?;
let mut buf = Vec::with_capacity(4);
for msg in &self.batches {
msg._encode_to_buf(&mut buf)?;
}
buf.encode(out)
}
}
#[derive(Clone, Debug)]
pub struct Message {
pub key: Option<Bytes>,
pub value: Option<Bytes>,
pub headers: Vec<Header>,
}
impl Message {
pub fn new(key: Option<Bytes>, value: Option<Bytes>, headers: Vec<Header>) -> Message {
Message {
key,
value,
headers,
}
}
}
#[derive(Debug)]
pub struct RecordBatch {
base_offset: i64,
partition_leader_epoch: i32,
magic: i8,
crc: u32,
attributes: Attributes,
last_offset_delta: i32,
base_timestamp: i64,
max_timestamp: i64,
producer_id: i64,
producer_epoch: i16,
base_sequence: i32,
records: Vec<Record>,
}
#[derive(Clone, Debug, PartialEq)]
pub struct Attributes {
pub compression: Option<Compression>,
}
impl Attributes {
pub fn new(compression: Option<Compression>) -> Self {
Attributes { compression }
}
}
impl From<i16> for Attributes {
fn from(n: i16) -> Self {
let compression = match n & 0x07 {
1 => Some(Compression::Gzip),
2 => Some(Compression::Snappy),
3 => Some(Compression::Lz4),
4 => Some(Compression::Zstd),
_ => None,
};
Self::new(compression)
}
}
impl ToByte for Attributes {
fn encode<W: BufMut>(&self, out: &mut W) -> Result<()> {
let mut attr: i16 = 0;
attr = match self.compression {
Some(Compression::Gzip) => attr | 1,
Some(Compression::Snappy) => attr | 2,
Some(Compression::Lz4) => attr | 3,
Some(Compression::Zstd) => attr | 4,
None => attr,
};
attr.encode(out)?;
Ok(())
}
}
impl RecordBatch {
pub fn new(attributes: Attributes) -> Self {
Self {
base_offset: 0,
partition_leader_epoch: -1,
magic: MESSAGE_MAGIC_BYTE,
crc: 0,
attributes,
last_offset_delta: -1,
base_timestamp: now(),
max_timestamp: 0,
producer_id: -1,
producer_epoch: -1,
base_sequence: -1,
records: Vec::new(),
}
}
pub fn add(&mut self, message: Message) {
self.last_offset_delta += 1;
self.max_timestamp = now();
let timestamp_delta = self.max_timestamp - self.base_timestamp;
let offset_delta = self.last_offset_delta;
let record = Record::new(message, timestamp_delta as usize, offset_delta as usize);
self.records.push(record);
}
pub fn _encode_to_buf(&self, out: &mut Vec<u8>) -> Result<()> {
self.base_offset.encode(out)?;
let mut buf = Vec::with_capacity(4);
self.partition_leader_epoch.encode(&mut buf)?;
self.magic.encode(&mut buf)?;
let crc_pos = 5;
self.crc.encode(&mut buf)?;
self.attributes.encode(&mut buf)?;
self.last_offset_delta.encode(&mut buf)?;
self.base_timestamp.encode(&mut buf)?;
self.max_timestamp.encode(&mut buf)?;
self.producer_id.encode(&mut buf)?;
self.producer_epoch.encode(&mut buf)?;
self.base_sequence.encode(&mut buf)?;
match self.attributes.compression {
Some(Compression::Gzip) => {
let mut compressed = Vec::new();
for record in &self.records {
record.encode(&mut compressed)?;
}
compressed = compress(&compressed)?;
(self.records.len() as i32).encode(&mut buf)?;
buf.put(compressed.as_ref());
}
Some(Compression::Snappy) => {
let mut uncompressed = Vec::new();
for record in &self.records {
record.encode(&mut uncompressed)?;
}
let compressed = compress_snappy(&uncompressed)?;
(self.records.len() as i32).encode(&mut buf)?;
buf.put(compressed.as_ref());
}
Some(Compression::Lz4) => {
let mut uncompressed = Vec::new();
for record in &self.records {
record.encode(&mut uncompressed)?;
}
let compressed = compress_lz4(&uncompressed)?;
(self.records.len() as i32).encode(&mut buf)?;
buf.put(compressed.as_ref());
}
Some(Compression::Zstd) => {
let mut uncompressed = Vec::new();
for record in &self.records {
record.encode(&mut uncompressed)?;
}
let compressed = compress_zstd(&uncompressed)?;
(self.records.len() as i32).encode(&mut buf)?;
buf.put(compressed.as_ref());
}
None => self.records.encode(&mut buf)?,
}
let crc = to_crc(&buf[(crc_pos + 4)..]);
crc.encode(&mut &mut buf[crc_pos..crc_pos + 4])?;
buf.encode(out)?;
Ok(())
}
}
#[derive(Debug)]
pub struct Record {
attributes: i8,
timestamp_delta: usize,
offset_delta: usize,
key_length: usize,
key: Option<Bytes>,
value_length: usize,
value: Option<Bytes>,
headers: Vec<Header>,
}
impl Record {
pub fn new(message: Message, timestamp_delta: usize, offset_delta: usize) -> Self {
Self {
attributes: 0,
timestamp_delta,
offset_delta,
key_length: match &message.key {
Some(key) => key.len(),
None => 0,
},
key: message.key,
value_length: match &message.value {
Some(value) => value.len(),
None => 0,
},
value: message.value,
headers: message.headers,
}
}
pub fn _encode_to_buf(&self, out: &mut Vec<u8>) -> Result<()> {
self.attributes.encode(out)?;
self.timestamp_delta.encode(out)?;
self.offset_delta.encode(out)?;
self.key_length.encode(out)?;
out.put(self.key.clone().unwrap_or(Bytes::from("")));
self.value_length.encode(out)?;
out.put(self.value.clone().unwrap_or(Bytes::from("")));
let header_length = self.headers.len();
header_length.encode(out)?;
for header in &self.headers {
header.encode(out)?;
}
Ok(())
}
}
impl ToByte for Record {
fn encode<W: BufMut>(&self, out: &mut W) -> Result<()> {
let mut buf = Vec::with_capacity(4);
self._encode_to_buf(&mut buf)?;
let length = buf.len();
length.encode(out)?;
out.put(buf.as_ref());
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct Header {
header_key_length: usize,
header_key: String,
header_value_length: usize,
value: Bytes,
}
impl Header {
pub fn new(key: String, value: Bytes) -> Self {
Self {
header_key_length: key.len(),
header_key: key,
header_value_length: value.len(),
value,
}
}
}
impl ToByte for Header {
fn encode<W: BufMut>(&self, out: &mut W) -> Result<()> {
self.header_key_length.encode(out)?;
out.put(self.header_key.as_bytes());
self.header_value_length.encode(out)?;
out.put(self.value.as_ref());
Ok(())
}
}