use bytes::{Buf, BufMut, Bytes};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, KrafkaError, ProtocolErrorKind, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{
Decode, Encode, KafkaBytes, KafkaString, TaggedFields, TryEncode,
};
use crate::protocol::{
array_len_i32, check_compact_array_len, check_compact_nullable_array_len,
check_decode_array_len, check_decode_nullable_array_len,
};
#[derive(Debug, Clone)]
pub struct ProduceRequest {
pub transactional_id: Option<String>,
pub acks: i16,
pub timeout_ms: i32,
pub topic_data: Vec<ProduceTopicData>,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ProduceTopicData {
pub name: String,
pub topic_id: Option<[u8; 16]>,
pub partition_data: Vec<ProducePartitionData>,
}
#[derive(Debug, Clone)]
pub struct ProducePartitionData {
pub index: i32,
pub records: Bytes,
}
impl ProduceRequest {
pub fn api_key() -> ApiKey {
ApiKey::Produce
}
pub fn encode_v3(&self, buf: &mut impl BufMut) -> Result<()> {
match &self.transactional_id {
Some(id) => KafkaString::new(id).try_encode(buf)?,
None => KafkaString::null().try_encode(buf)?,
}
self.acks.encode(buf);
self.timeout_ms.encode(buf);
buf.put_i32(array_len_i32(self.topic_data.len())?);
for topic in &self.topic_data {
KafkaString::new(&topic.name).try_encode(buf)?;
buf.put_i32(array_len_i32(topic.partition_data.len())?);
for partition in &topic.partition_data {
partition.index.encode(buf);
KafkaBytes::new(partition.records.clone()).try_encode(buf)?;
}
}
Ok(())
}
pub fn encode_v9(&self, buf: &mut impl BufMut) -> Result<()> {
match &self.transactional_id {
Some(id) => KafkaString::new(id).try_encode_compact(buf)?,
None => KafkaString::null().try_encode_compact(buf)?,
}
self.acks.encode(buf);
self.timeout_ms.encode(buf);
let topics_len = u32::try_from(self.topic_data.len().saturating_add(1)).map_err(|_| {
KrafkaError::protocol_kind(ProtocolErrorKind::InvalidLength, "topics array too large")
})?;
crate::util::varint::encode_unsigned_varint(topics_len, buf);
for topic in &self.topic_data {
KafkaString::new(&topic.name).try_encode_compact(buf)?;
let parts_len =
u32::try_from(topic.partition_data.len().saturating_add(1)).map_err(|_| {
KrafkaError::protocol_kind(
ProtocolErrorKind::InvalidLength,
"partitions array too large",
)
})?;
crate::util::varint::encode_unsigned_varint(parts_len, buf);
for partition in &topic.partition_data {
partition.index.encode(buf);
KafkaBytes::new(partition.records.clone()).try_encode_compact(buf)?;
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v13(&self, buf: &mut impl BufMut) -> Result<()> {
match &self.transactional_id {
Some(id) => KafkaString::new(id).try_encode_compact(buf)?,
None => KafkaString::null().try_encode_compact(buf)?,
}
self.acks.encode(buf);
self.timeout_ms.encode(buf);
let topics_len = u32::try_from(self.topic_data.len().saturating_add(1)).map_err(|_| {
KrafkaError::protocol_kind(ProtocolErrorKind::InvalidLength, "topics array too large")
})?;
crate::util::varint::encode_unsigned_varint(topics_len, buf);
for topic in &self.topic_data {
let topic_id = topic.topic_id.ok_or_else(|| {
KrafkaError::protocol_kind(
ProtocolErrorKind::InvalidValue,
"topic_id is required for Produce v13+ (KIP-516)",
)
})?;
buf.put_slice(&topic_id);
let parts_len =
u32::try_from(topic.partition_data.len().saturating_add(1)).map_err(|_| {
KrafkaError::protocol_kind(
ProtocolErrorKind::InvalidLength,
"partitions array too large",
)
})?;
crate::util::varint::encode_unsigned_varint(parts_len, buf);
for partition in &topic.partition_data {
partition.index.encode(buf);
KafkaBytes::new(partition.records.clone()).try_encode_compact(buf)?;
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone, Default)]
pub struct ProduceResponse {
pub responses: Vec<ProduceTopicResponse>,
pub throttle_time_ms: i32,
}
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct ProduceTopicResponse {
pub name: String,
pub topic_id: Option<[u8; 16]>,
pub partition_responses: Vec<ProducePartitionResponse>,
}
#[derive(Debug, Clone)]
pub struct ProducePartitionResponse {
pub index: i32,
pub error_code: ErrorCode,
pub base_offset: i64,
pub log_append_time_ms: i64,
pub log_start_offset: i64,
}
impl ProduceResponse {
pub fn decode_v3(buf: &mut impl Buf) -> Result<Self> {
let topic_count = check_decode_array_len(i32::decode(buf)?)?;
let mut responses = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let name = non_nullable_string("topic name", KafkaString::decode(buf)?.0)?;
let partition_count = check_decode_array_len(i32::decode(buf)?)?;
let mut partition_responses = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let index = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let base_offset = i64::decode(buf)?;
let log_append_time_ms = i64::decode(buf)?;
partition_responses.push(ProducePartitionResponse {
index,
error_code,
base_offset,
log_append_time_ms,
log_start_offset: -1,
});
}
responses.push(ProduceTopicResponse {
name,
topic_id: None,
partition_responses,
});
}
let throttle_time_ms = i32::decode(buf)?;
Ok(Self {
responses,
throttle_time_ms,
})
}
pub fn decode_v5(buf: &mut impl Buf) -> Result<Self> {
let topic_count = check_decode_array_len(i32::decode(buf)?)?;
let mut responses = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let name = non_nullable_string("topic name", KafkaString::decode(buf)?.0)?;
let partition_count = check_decode_array_len(i32::decode(buf)?)?;
let mut partition_responses = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let index = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let base_offset = i64::decode(buf)?;
let log_append_time_ms = i64::decode(buf)?;
let log_start_offset = i64::decode(buf)?;
partition_responses.push(ProducePartitionResponse {
index,
error_code,
base_offset,
log_append_time_ms,
log_start_offset,
});
}
responses.push(ProduceTopicResponse {
name,
topic_id: None,
partition_responses,
});
}
let throttle_time_ms = i32::decode(buf)?;
Ok(Self {
responses,
throttle_time_ms,
})
}
pub fn decode_v8(buf: &mut impl Buf) -> Result<Self> {
let topic_count = check_decode_array_len(i32::decode(buf)?)?;
let mut responses = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let name = non_nullable_string("topic name", KafkaString::decode(buf)?.0)?;
let partition_count = check_decode_array_len(i32::decode(buf)?)?;
let mut partition_responses = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let index = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let base_offset = i64::decode(buf)?;
let log_append_time_ms = i64::decode(buf)?;
let log_start_offset = i64::decode(buf)?;
let record_errors_count = check_decode_nullable_array_len(i32::decode(buf)?)?;
for _ in 0..record_errors_count {
let _ = i32::decode(buf)?; let _ = KafkaString::decode(buf)?; }
let _ = KafkaString::decode(buf)?;
partition_responses.push(ProducePartitionResponse {
index,
error_code,
base_offset,
log_append_time_ms,
log_start_offset,
});
}
responses.push(ProduceTopicResponse {
name,
topic_id: None,
partition_responses,
});
}
let throttle_time_ms = i32::decode(buf)?;
Ok(Self {
responses,
throttle_time_ms,
})
}
pub fn decode_v9(buf: &mut impl Buf) -> Result<Self> {
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut responses = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let name = non_nullable_string("topic name", KafkaString::decode_compact(buf)?.0)?;
let part_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partition_responses = Vec::with_capacity(part_count);
for _ in 0..part_count {
let index = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let base_offset = i64::decode(buf)?;
let log_append_time_ms = i64::decode(buf)?;
let log_start_offset = i64::decode(buf)?;
let re_count = check_compact_nullable_array_len(
crate::util::varint::decode_unsigned_varint(buf)?,
)?;
if re_count > 0 {
for _ in 0..re_count {
let _ = i32::decode(buf)?;
let _ = KafkaString::decode_compact(buf)?;
let _ = TaggedFields::decode(buf)?;
}
}
let _ = KafkaString::decode_compact(buf)?;
let _ = TaggedFields::decode(buf)?;
partition_responses.push(ProducePartitionResponse {
index,
error_code,
base_offset,
log_append_time_ms,
log_start_offset,
});
}
let _ = TaggedFields::decode(buf)?;
responses.push(ProduceTopicResponse {
name,
topic_id: None,
partition_responses,
});
}
let throttle_time_ms = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
Ok(Self {
responses,
throttle_time_ms,
})
}
pub fn decode_v13(buf: &mut impl Buf) -> Result<Self> {
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut responses = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
if buf.remaining() < 16 {
return Err(KrafkaError::protocol_kind(
ProtocolErrorKind::TruncatedFrame,
"not enough bytes for topic_id UUID",
));
}
let mut topic_id = [0u8; 16];
buf.copy_to_slice(&mut topic_id);
let part_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partition_responses = Vec::with_capacity(part_count);
for _ in 0..part_count {
let index = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let base_offset = i64::decode(buf)?;
let log_append_time_ms = i64::decode(buf)?;
let log_start_offset = i64::decode(buf)?;
let re_count = check_compact_nullable_array_len(
crate::util::varint::decode_unsigned_varint(buf)?,
)?;
if re_count > 0 {
for _ in 0..re_count {
let _ = i32::decode(buf)?;
let _ = KafkaString::decode_compact(buf)?;
let _ = TaggedFields::decode(buf)?;
}
}
let _ = KafkaString::decode_compact(buf)?;
let _ = TaggedFields::decode(buf)?;
partition_responses.push(ProducePartitionResponse {
index,
error_code,
base_offset,
log_append_time_ms,
log_start_offset,
});
}
let _ = TaggedFields::decode(buf)?;
responses.push(ProduceTopicResponse {
name: String::new(),
topic_id: Some(topic_id),
partition_responses,
});
}
let throttle_time_ms = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
Ok(Self {
responses,
throttle_time_ms,
})
}
}
impl VersionedEncode for ProduceRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
3..=8 => self.encode_v3(buf)?,
9..=12 => self.encode_v9(buf)?,
13 => self.encode_v13(buf)?,
_ => return unsupported_encode!("ProduceRequest", version),
}
Ok(())
}
}
impl VersionedDecode for ProduceResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
3..=4 => Self::decode_v3(buf),
5..=7 => Self::decode_v5(buf),
8 => Self::decode_v8(buf),
9..=12 => Self::decode_v9(buf),
13 => Self::decode_v13(buf),
_ => unsupported_decode!("ProduceResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::protocol::*;
use crate::util::varint;
use bytes::BytesMut;
use rstest::rstest;
fn sample_produce_request() -> ProduceRequest {
ProduceRequest {
transactional_id: None,
acks: -1,
timeout_ms: 30_000,
topic_data: vec![ProduceTopicData {
name: "orders".to_string(),
topic_id: None,
partition_data: vec![ProducePartitionData {
index: 0,
records: bytes::Bytes::from_static(&[0xCA, 0xFE]),
}],
}],
}
}
fn sample_produce_request_with_data() -> ProduceRequest {
ProduceRequest {
transactional_id: Some("txn-abc".to_string()),
acks: -1,
timeout_ms: 30_000,
topic_data: vec![
ProduceTopicData {
name: "orders".to_string(),
topic_id: None,
partition_data: vec![
ProducePartitionData {
index: 0,
records: Bytes::from_static(&[0xDE, 0xAD, 0xBE, 0xEF]),
},
ProducePartitionData {
index: 1,
records: Bytes::from_static(&[0xCA, 0xFE]),
},
],
},
ProduceTopicData {
name: "events".to_string(),
topic_id: None,
partition_data: vec![ProducePartitionData {
index: 0,
records: Bytes::from_static(&[0x01, 0x02, 0x03]),
}],
},
],
}
}
#[test]
fn test_produce_request_v3_encodes_transactional_id() {
let request = sample_produce_request();
let mut buf = BytesMut::new();
request.encode_versioned(3, &mut buf).unwrap();
let mut r = buf.freeze();
let tid = KafkaString::decode(&mut r).unwrap().0;
assert!(tid.is_none());
assert_eq!(i16::decode(&mut r).unwrap(), -1); assert_eq!(i32::decode(&mut r).unwrap(), 30_000); }
#[test]
fn test_produce_request_v9_flexible_encoding() {
let request = sample_produce_request();
let mut buf = BytesMut::new();
request.encode_versioned(9, &mut buf).unwrap();
let mut buf3 = BytesMut::new();
request.encode_versioned(3, &mut buf3).unwrap();
assert!(!buf.is_empty());
assert!(!buf3.is_empty());
}
#[test]
fn test_produce_request_below_min_rejected() {
let request = sample_produce_request();
let mut buf = BytesMut::new();
assert!(request.encode_versioned(0, &mut buf).is_err());
let mut buf2 = BytesMut::new();
assert!(request.encode_versioned(2, &mut buf2).is_err());
}
#[test]
fn test_produce_response_decode_v3() {
let mut buf = BytesMut::new();
buf.put_i32(1);
let topic = b"orders";
buf.put_i16(topic.len() as i16);
buf.put_slice(topic);
buf.put_i32(1);
buf.put_i32(0); buf.put_i16(0); buf.put_i64(42); buf.put_i64(1_700_000_000_000); buf.put_i32(100);
let resp = ProduceResponse::decode_versioned(3, &mut buf.freeze()).unwrap();
assert_eq!(resp.responses.len(), 1);
assert_eq!(resp.responses[0].name, "orders");
assert_eq!(resp.responses[0].partition_responses[0].base_offset, 42);
assert_eq!(
resp.responses[0].partition_responses[0].log_append_time_ms,
1_700_000_000_000
);
assert_eq!(resp.throttle_time_ms, 100);
}
#[test]
fn test_produce_response_decode_v5_has_log_start_offset() {
let mut buf = BytesMut::new();
buf.put_i32(1);
let topic = b"t1";
buf.put_i16(topic.len() as i16);
buf.put_slice(topic);
buf.put_i32(1);
buf.put_i32(0); buf.put_i16(0); buf.put_i64(100); buf.put_i64(-1); buf.put_i64(50); buf.put_i32(0);
let resp = ProduceResponse::decode_versioned(5, &mut buf.freeze()).unwrap();
assert_eq!(
resp.responses[0].partition_responses[0].log_start_offset,
50
);
}
#[rstest]
#[case::produce_v0(0)]
#[case::produce_v1(1)]
#[case::produce_v2(2)]
fn test_produce_encode_below_min(#[case] version: i16) {
let request = sample_produce_request();
let mut buf = BytesMut::new();
assert!(request.encode_versioned(version, &mut buf).is_err());
}
#[rstest]
#[case::v3_min(3)]
#[case::v8_last_non_flexible(8)]
fn test_produce_request_encode_non_flexible(#[case] version: i16) {
let request = sample_produce_request_with_data();
let mut buf = BytesMut::new();
request.encode_versioned(version, &mut buf).unwrap();
assert!(!buf.is_empty());
let mut buf2 = BytesMut::new();
request.encode_v3(&mut buf2).unwrap();
assert_eq!(buf, buf2);
}
#[rstest]
#[case::v9_first_flexible(9)]
#[case::v11_max(11)]
fn test_produce_request_encode_flexible(#[case] version: i16) {
let request = sample_produce_request_with_data();
let mut buf = BytesMut::new();
request.encode_versioned(version, &mut buf).unwrap();
assert!(!buf.is_empty());
let mut buf_v3 = BytesMut::new();
request.encode_v3(&mut buf_v3).unwrap();
assert_ne!(
buf.as_ref(),
buf_v3.as_ref(),
"Flexible should differ from non-flexible"
);
}
#[test]
fn test_produce_request_null_transactional_id() {
let request = ProduceRequest {
transactional_id: None,
acks: 1,
timeout_ms: 5000,
topic_data: vec![ProduceTopicData {
name: "t".to_string(),
topic_id: None,
partition_data: vec![ProducePartitionData {
index: 0,
records: Bytes::from_static(&[0x00]),
}],
}],
};
let mut buf_v3 = BytesMut::new();
request.encode_versioned(3, &mut buf_v3).unwrap();
assert_eq!(i16::from_be_bytes([buf_v3[0], buf_v3[1]]), -1);
let mut buf_v9 = BytesMut::new();
request.encode_versioned(9, &mut buf_v9).unwrap();
assert_eq!(buf_v9[0], 0x00);
}
#[test]
fn test_produce_request_empty_topic_data() {
let request = ProduceRequest {
transactional_id: None,
acks: 0,
timeout_ms: 1000,
topic_data: vec![],
};
let mut buf = BytesMut::new();
request.encode_versioned(3, &mut buf).unwrap();
assert!(!buf.is_empty());
let mut buf_flex = BytesMut::new();
request.encode_versioned(9, &mut buf_flex).unwrap();
assert!(!buf_flex.is_empty());
}
#[test]
fn test_produce_response_decode_v3_wire_format() {
let mut buf = BytesMut::new();
buf.put_i32(1);
let topic = b"orders";
buf.put_i16(topic.len() as i16);
buf.put_slice(topic);
buf.put_i32(1);
buf.put_i32(0);
buf.put_i16(0);
buf.put_i64(42);
buf.put_i64(1_700_000_000_000);
buf.put_i32(100);
let resp = ProduceResponse::decode_versioned(3, &mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 100);
assert_eq!(resp.responses.len(), 1);
assert_eq!(resp.responses[0].name, "orders");
assert_eq!(resp.responses[0].partition_responses.len(), 1);
assert_eq!(resp.responses[0].partition_responses[0].index, 0);
assert!(resp.responses[0].partition_responses[0].error_code.is_ok());
assert_eq!(resp.responses[0].partition_responses[0].base_offset, 42);
assert_eq!(
resp.responses[0].partition_responses[0].log_append_time_ms,
1_700_000_000_000
);
assert_eq!(
resp.responses[0].partition_responses[0].log_start_offset,
-1
);
}
#[test]
fn test_produce_response_decode_v5_log_start_offset() {
let mut buf = BytesMut::new();
buf.put_i32(1); let topic = b"events";
buf.put_i16(topic.len() as i16);
buf.put_slice(topic);
buf.put_i32(1); buf.put_i32(2); buf.put_i16(0); buf.put_i64(100); buf.put_i64(-1); buf.put_i64(50); buf.put_i32(0);
let resp = ProduceResponse::decode_versioned(5, &mut buf.freeze()).unwrap();
assert_eq!(
resp.responses[0].partition_responses[0].log_start_offset,
50
);
assert_eq!(resp.responses[0].partition_responses[0].base_offset, 100);
}
#[test]
fn test_produce_response_decode_v9_flexible() {
let mut buf = BytesMut::new();
varint::encode_unsigned_varint(2, &mut buf); let topic = b"test-topic";
varint::encode_unsigned_varint(topic.len() as u32 + 1, &mut buf);
buf.put_slice(topic);
varint::encode_unsigned_varint(2, &mut buf); buf.put_i32(0); buf.put_i16(0); buf.put_i64(999); buf.put_i64(-1); buf.put_i64(10); varint::encode_unsigned_varint(1, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
buf.put_i32(50);
varint::encode_unsigned_varint(0, &mut buf);
let resp = ProduceResponse::decode_versioned(9, &mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 50);
assert_eq!(resp.responses.len(), 1);
assert_eq!(resp.responses[0].name, "test-topic");
assert_eq!(resp.responses[0].partition_responses[0].base_offset, 999);
assert_eq!(
resp.responses[0].partition_responses[0].log_start_offset,
10
);
}
#[test]
fn test_produce_response_decode_v9_multi_topic_multi_partition() {
let mut buf = BytesMut::new();
varint::encode_unsigned_varint(3, &mut buf);
varint::encode_unsigned_varint(3, &mut buf); buf.put_slice(b"t1");
varint::encode_unsigned_varint(3, &mut buf); for p in [0i32, 1] {
buf.put_i32(p); buf.put_i16(0); buf.put_i64(p as i64 * 100); buf.put_i64(-1); buf.put_i64(0); varint::encode_unsigned_varint(1, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf); }
varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(3, &mut buf);
buf.put_slice(b"t2");
varint::encode_unsigned_varint(2, &mut buf); buf.put_i32(0);
buf.put_i16(0);
buf.put_i64(500);
buf.put_i64(-1);
buf.put_i64(0);
varint::encode_unsigned_varint(1, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
buf.put_i32(0); varint::encode_unsigned_varint(0, &mut buf);
let resp = ProduceResponse::decode_versioned(11, &mut buf.freeze()).unwrap();
assert_eq!(resp.responses.len(), 2);
assert_eq!(resp.responses[0].name, "t1");
assert_eq!(resp.responses[0].partition_responses.len(), 2);
assert_eq!(resp.responses[0].partition_responses[1].base_offset, 100);
assert_eq!(resp.responses[1].name, "t2");
assert_eq!(resp.responses[1].partition_responses[0].base_offset, 500);
}
#[test]
fn test_produce_response_decode_v12_same_as_v11() {
let mut buf = BytesMut::new();
varint::encode_unsigned_varint(2, &mut buf); let topic = b"tp";
varint::encode_unsigned_varint(topic.len() as u32 + 1, &mut buf);
buf.put_slice(topic);
varint::encode_unsigned_varint(2, &mut buf); buf.put_i32(0);
buf.put_i16(0);
buf.put_i64(42);
buf.put_i64(-1);
buf.put_i64(0);
varint::encode_unsigned_varint(1, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf); buf.put_i32(5); varint::encode_unsigned_varint(0, &mut buf);
let resp = ProduceResponse::decode_versioned(12, &mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 5);
assert_eq!(resp.responses[0].name, "tp");
assert_eq!(resp.responses[0].partition_responses[0].base_offset, 42);
}
#[test]
fn test_produce_response_decode_v13_topic_id() {
let mut buf = BytesMut::new();
varint::encode_unsigned_varint(2, &mut buf); let topic_id: [u8; 16] = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
buf.put_slice(&topic_id);
varint::encode_unsigned_varint(2, &mut buf); buf.put_i32(0);
buf.put_i16(0);
buf.put_i64(99);
buf.put_i64(-1);
buf.put_i64(0);
varint::encode_unsigned_varint(1, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
buf.put_i32(0); varint::encode_unsigned_varint(0, &mut buf);
let resp = ProduceResponse::decode_versioned(13, &mut buf.freeze()).unwrap();
assert_eq!(resp.responses[0].topic_id, Some(topic_id));
assert!(resp.responses[0].name.is_empty());
assert_eq!(resp.responses[0].partition_responses[0].base_offset, 99);
}
#[test]
fn test_produce_request_encode_v13_topic_id() {
let topic_id: [u8; 16] = [0xAA; 16];
let request = ProduceRequest {
transactional_id: None,
acks: -1,
timeout_ms: 1500,
topic_data: vec![ProduceTopicData {
name: String::new(),
topic_id: Some(topic_id),
partition_data: vec![],
}],
};
let mut buf = BytesMut::new();
request.encode_v13(&mut buf).unwrap();
let mut cur = &buf[..];
let txn_varint = varint::decode_unsigned_varint(&mut cur).unwrap();
assert_eq!(txn_varint, 0);
assert_eq!(cur.get_i16(), -1); assert_eq!(cur.get_i32(), 1500); let topics_varint = varint::decode_unsigned_varint(&mut cur).unwrap();
assert_eq!(topics_varint, 2);
let mut read_id = [0u8; 16];
cur.copy_to_slice(&mut read_id);
assert_eq!(read_id, topic_id);
}
}