use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{
array_len_i32, check_compact_array_len, check_decode_array_len, encode_compact_array_len,
};
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDirTopic {
pub name: String,
pub partitions: Vec<i32>,
}
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDir {
pub path: String,
pub topics: Vec<AlterReplicaLogDirTopic>,
}
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDirsRequest {
pub dirs: Vec<AlterReplicaLogDir>,
}
impl AlterReplicaLogDirsRequest {
pub fn api_key() -> ApiKey {
ApiKey::AlterReplicaLogDirs
}
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
buf.put_i32(array_len_i32(self.dirs.len())?);
for dir in &self.dirs {
KafkaString::new(&dir.path).try_encode(buf)?;
buf.put_i32(array_len_i32(dir.topics.len())?);
for topic in &dir.topics {
KafkaString::new(&topic.name).try_encode(buf)?;
buf.put_i32(array_len_i32(topic.partitions.len())?);
for &p in &topic.partitions {
p.encode(buf);
}
}
}
Ok(())
}
pub fn encode_v2(&self, buf: &mut impl BufMut) -> Result<()> {
encode_compact_array_len(self.dirs.len(), buf)?;
for dir in &self.dirs {
KafkaString::new(&dir.path).try_encode_compact(buf)?;
encode_compact_array_len(dir.topics.len(), buf)?;
for topic in &dir.topics {
KafkaString::new(&topic.name).try_encode_compact(buf)?;
encode_compact_array_len(topic.partitions.len(), buf)?;
for &p in &topic.partitions {
p.encode(buf);
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
impl VersionedEncode for AlterReplicaLogDirsRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
1 => self.encode_v1(buf),
2 => self.encode_v2(buf),
_ => unsupported_encode!("AlterReplicaLogDirsRequest", version),
}
}
}
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDirsPartitionResult {
pub partition_index: i32,
pub error_code: ErrorCode,
}
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDirsTopicResult {
pub topic_name: String,
pub partitions: Vec<AlterReplicaLogDirsPartitionResult>,
}
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDirsResponse {
pub throttle_time_ms: i32,
pub results: Vec<AlterReplicaLogDirsTopicResult>,
}
impl AlterReplicaLogDirsResponse {
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let topic_count = check_decode_array_len(i32::decode(buf)?)?;
let mut results = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let topic_name = non_nullable_string("topic_name", KafkaString::decode(buf)?.0)?;
let partition_count = check_decode_array_len(i32::decode(buf)?)?;
let mut partitions = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let partition_index = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
partitions.push(AlterReplicaLogDirsPartitionResult {
partition_index,
error_code,
});
}
results.push(AlterReplicaLogDirsTopicResult {
topic_name,
partitions,
});
}
Ok(Self {
throttle_time_ms,
results,
})
}
pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let topic_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut results = Vec::with_capacity(topic_count);
for _ in 0..topic_count {
let topic_name =
non_nullable_string("topic_name", KafkaString::decode_compact(buf)?.0)?;
let partition_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut partitions = Vec::with_capacity(partition_count);
for _ in 0..partition_count {
let partition_index = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let _ = TaggedFields::decode(buf)?;
partitions.push(AlterReplicaLogDirsPartitionResult {
partition_index,
error_code,
});
}
let _ = TaggedFields::decode(buf)?;
results.push(AlterReplicaLogDirsTopicResult {
topic_name,
partitions,
});
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
results,
})
}
}
impl VersionedDecode for AlterReplicaLogDirsResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
1 => Self::decode_v1(buf),
2 => Self::decode_v2(buf),
_ => unsupported_decode!("AlterReplicaLogDirsResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use bytes::BytesMut;
fn put_kafka_string(buf: &mut BytesMut, s: &str) {
buf.put_i16(s.len() as i16);
buf.put_slice(s.as_bytes());
}
fn put_compact_string(buf: &mut BytesMut, s: &str) {
crate::util::varint::encode_unsigned_varint((s.len() + 1) as u32, buf);
buf.put_slice(s.as_bytes());
}
fn put_empty_tagged_fields(buf: &mut BytesMut) {
crate::util::varint::encode_unsigned_varint(0, buf);
}
fn put_compact_array_len(buf: &mut BytesMut, count: usize) {
crate::util::varint::encode_unsigned_varint((count + 1) as u32, buf);
}
#[test]
fn test_alter_replica_log_dirs_api_key() {
assert_eq!(
AlterReplicaLogDirsRequest::api_key(),
ApiKey::AlterReplicaLogDirs
);
}
#[test]
fn test_alter_replica_log_dirs_encode_v1() {
let request = AlterReplicaLogDirsRequest {
dirs: vec![AlterReplicaLogDir {
path: "/data/kafka-logs".to_string(),
topics: vec![AlterReplicaLogDirTopic {
name: "my-topic".to_string(),
partitions: vec![0, 1, 2],
}],
}],
};
let mut buf = BytesMut::new();
request.encode_v1(&mut buf).unwrap();
assert!(!buf.is_empty());
}
#[test]
fn test_alter_replica_log_dirs_encode_v2() {
let request = AlterReplicaLogDirsRequest {
dirs: vec![AlterReplicaLogDir {
path: "/data/kafka-logs-2".to_string(),
topics: vec![AlterReplicaLogDirTopic {
name: "other-topic".to_string(),
partitions: vec![0],
}],
}],
};
let mut buf = BytesMut::new();
request.encode_v2(&mut buf).unwrap();
assert!(!buf.is_empty());
}
#[test]
fn test_alter_replica_log_dirs_versioned_dispatch() {
let request = AlterReplicaLogDirsRequest { dirs: Vec::new() };
for v in 1..=2 {
let mut buf = BytesMut::new();
request.encode_versioned(v, &mut buf).unwrap();
}
}
#[test]
fn test_alter_replica_log_dirs_versioned_unsupported() {
let request = AlterReplicaLogDirsRequest { dirs: Vec::new() };
let mut buf = BytesMut::new();
assert!(request.encode_versioned(0, &mut buf).is_err());
assert!(request.encode_versioned(3, &mut buf).is_err());
}
#[test]
fn test_alter_replica_log_dirs_response_decode_v1() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_i32(1); put_kafka_string(&mut buf, "my-topic");
buf.put_i32(2); buf.put_i32(0); buf.put_i16(0); buf.put_i32(1); buf.put_i16(0);
let resp = AlterReplicaLogDirsResponse::decode_v1(&mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 0);
assert_eq!(resp.results.len(), 1);
assert_eq!(resp.results[0].topic_name, "my-topic");
assert_eq!(resp.results[0].partitions.len(), 2);
assert!(resp.results[0].partitions[0].error_code.is_ok());
assert!(resp.results[0].partitions[1].error_code.is_ok());
}
#[test]
fn test_alter_replica_log_dirs_response_decode_v2() {
let mut buf = BytesMut::new();
buf.put_i32(100); put_compact_array_len(&mut buf, 1); put_compact_string(&mut buf, "my-topic");
put_compact_array_len(&mut buf, 1); buf.put_i32(0); buf.put_i16(0); put_empty_tagged_fields(&mut buf); put_empty_tagged_fields(&mut buf); put_empty_tagged_fields(&mut buf);
let resp = AlterReplicaLogDirsResponse::decode_v2(&mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 100);
assert_eq!(resp.results.len(), 1);
assert_eq!(resp.results[0].topic_name, "my-topic");
assert!(resp.results[0].partitions[0].error_code.is_ok());
}
#[test]
fn test_alter_replica_log_dirs_versioned_decode_dispatch() {
let mut buf = BytesMut::new();
buf.put_i32(0);
buf.put_i32(0); let resp = AlterReplicaLogDirsResponse::decode_versioned(1, &mut buf.freeze()).unwrap();
assert!(resp.results.is_empty());
let mut buf = BytesMut::new();
buf.put_i32(0);
put_compact_array_len(&mut buf, 0);
put_empty_tagged_fields(&mut buf);
let resp = AlterReplicaLogDirsResponse::decode_versioned(2, &mut buf.freeze()).unwrap();
assert!(resp.results.is_empty());
}
#[test]
fn test_alter_replica_log_dirs_versioned_decode_unsupported() {
let buf = BytesMut::new();
assert!(
AlterReplicaLogDirsResponse::decode_versioned(0, &mut buf.clone().freeze()).is_err()
);
assert!(AlterReplicaLogDirsResponse::decode_versioned(3, &mut buf.freeze()).is_err());
}
}