use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, KrafkaError, Result};
use crate::protocol::api::ApiKey;
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{check_compact_array_len, encode_compact_array_len};
#[derive(Debug, Clone)]
pub struct FindCoordinatorRequest {
pub key: String,
pub key_type: i8,
}
impl FindCoordinatorRequest {
pub fn for_group(group_id: &str) -> Self {
Self {
key: group_id.to_string(),
key_type: 0,
}
}
pub fn for_transaction(transactional_id: &str) -> Self {
Self {
key: transactional_id.to_string(),
key_type: 1,
}
}
pub fn api_key() -> ApiKey {
ApiKey::FindCoordinator
}
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString::new(&self.key).try_encode(buf)?;
self.key_type.encode(buf);
Ok(())
}
pub fn encode_v3(&self, buf: &mut impl BufMut) -> Result<()> {
KafkaString::new(&self.key).try_encode_compact(buf)?;
self.key_type.encode(buf);
TaggedFields::default().try_encode(buf)?;
Ok(())
}
pub fn encode_v4(&self, buf: &mut impl BufMut) -> Result<()> {
self.key_type.encode(buf);
encode_compact_array_len(1, buf)?;
KafkaString::new(&self.key).try_encode_compact(buf)?;
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct FindCoordinatorResponse {
pub throttle_time_ms: i32,
pub error_code: ErrorCode,
pub error_message: Option<String>,
pub node_id: i32,
pub host: String,
pub port: i32,
}
struct BatchedCoordinatorEntry {
node_id: i32,
host: String,
port: i32,
error_code: ErrorCode,
error_message: Option<String>,
}
impl FindCoordinatorResponse {
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode(buf)?.0;
let node_id = i32::decode(buf)?;
let host = non_nullable_string("coordinator host", KafkaString::decode(buf)?.0)?;
let port = i32::decode(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
error_message,
node_id,
host,
port,
})
}
pub fn decode_v3(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let node_id = i32::decode(buf)?;
let host = non_nullable_string("coordinator host", KafkaString::decode_compact(buf)?.0)?;
let port = i32::decode(buf)?;
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
error_message,
node_id,
host,
port,
})
}
fn decode_batched_coordinator(buf: &mut impl Buf) -> Result<BatchedCoordinatorEntry> {
let _ = non_nullable_string("coordinator key", KafkaString::decode_compact(buf)?.0)?;
let node_id = i32::decode(buf)?;
let host = non_nullable_string("coordinator host", KafkaString::decode_compact(buf)?.0)?;
let port = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let _ = TaggedFields::decode(buf)?;
Ok(BatchedCoordinatorEntry {
node_id,
host,
port,
error_code,
error_message,
})
}
pub fn decode_v4(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let count = check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
if count == 0 {
let _ = TaggedFields::decode(buf)?;
return Err(KrafkaError::protocol(
"FindCoordinator v4: empty coordinators array",
));
}
let BatchedCoordinatorEntry {
node_id,
host,
port,
error_code,
error_message,
} = Self::decode_batched_coordinator(buf)?;
for _ in 1..count {
let _ = Self::decode_batched_coordinator(buf)?;
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
error_message,
node_id,
host,
port,
})
}
}
impl VersionedEncode for FindCoordinatorRequest {
fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
match version {
1..=2 => self.encode_v1(buf)?,
3 => self.encode_v3(buf)?,
4..=6 => self.encode_v4(buf)?,
_ => return unsupported_encode!("FindCoordinatorRequest", version),
}
Ok(())
}
}
impl VersionedDecode for FindCoordinatorResponse {
fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
match version {
1..=2 => Self::decode_v1(buf),
3 => Self::decode_v3(buf),
4..=6 => Self::decode_v4(buf),
_ => unsupported_decode!("FindCoordinatorResponse", version),
}
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
use crate::protocol::*;
use crate::util::varint;
use bytes::BytesMut;
use rstest::rstest;
#[test]
fn test_find_coordinator_request() {
let request = FindCoordinatorRequest::for_group("my-group");
assert_eq!(request.key, "my-group");
assert_eq!(request.key_type, 0);
let request = FindCoordinatorRequest::for_transaction("my-txn");
assert_eq!(request.key, "my-txn");
assert_eq!(request.key_type, 1);
}
#[test]
fn test_find_coordinator_request_v1_encode() {
let request = FindCoordinatorRequest {
key: "grp".to_string(),
key_type: 0,
};
let mut buf = BytesMut::new();
request.encode_versioned(1, &mut buf).unwrap();
let mut r = buf.freeze();
let key = KafkaString::decode(&mut r).unwrap().0.unwrap();
assert_eq!(key, "grp");
assert_eq!(i8::decode(&mut r).unwrap(), 0);
}
#[test]
fn test_find_coordinator_request_below_min_rejected() {
let request = FindCoordinatorRequest {
key: "g".to_string(),
key_type: 0,
};
let mut buf = BytesMut::new();
assert!(request.encode_versioned(0, &mut buf).is_err());
}
#[test]
fn test_find_coordinator_response_decode_v1() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_i16(0); let msg = b"";
buf.put_i16(msg.len() as i16);
buf.put_slice(msg); buf.put_i32(1); let host = b"broker1";
buf.put_i16(host.len() as i16);
buf.put_slice(host); buf.put_i32(9092);
let resp = FindCoordinatorResponse::decode_versioned(1, &mut buf.freeze()).unwrap();
assert_eq!(resp.node_id, 1);
assert_eq!(resp.host, "broker1");
assert_eq!(resp.port, 9092);
assert_eq!(resp.throttle_time_ms, 0);
}
#[rstest]
#[case::find_coordinator_v0(0)]
fn test_find_coordinator_encode_below_min(#[case] version: i16) {
let request = FindCoordinatorRequest {
key: "g".to_string(),
key_type: 0,
};
let mut buf = BytesMut::new();
assert!(request.encode_versioned(version, &mut buf).is_err());
}
#[rstest]
#[case::v1(1)]
#[case::v2(2)]
fn test_find_coordinator_request_v1_v2(#[case] version: i16) {
let request = FindCoordinatorRequest {
key: "my-group".to_string(),
key_type: 0, };
let mut buf = BytesMut::new();
request.encode_versioned(version, &mut buf).unwrap();
let mut buf2 = BytesMut::new();
request.encode_v1(&mut buf2).unwrap();
assert_eq!(buf, buf2);
}
#[test]
fn test_find_coordinator_request_v3_flexible() {
let request = FindCoordinatorRequest {
key: "txn-id-1".to_string(),
key_type: 1, };
let mut buf_v2 = BytesMut::new();
request.encode_versioned(2, &mut buf_v2).unwrap();
let mut buf_v3 = BytesMut::new();
request.encode_versioned(3, &mut buf_v3).unwrap();
assert_ne!(
buf_v2.as_ref(),
buf_v3.as_ref(),
"v3 flexible should differ from v2"
);
}
#[test]
fn test_find_coordinator_request_v4_batched_keys() {
let request = FindCoordinatorRequest {
key: "my-group".to_string(),
key_type: 0,
};
let mut buf_v3 = BytesMut::new();
request.encode_versioned(3, &mut buf_v3).unwrap();
let mut buf_v4 = BytesMut::new();
request.encode_versioned(4, &mut buf_v4).unwrap();
assert_ne!(
buf_v3.as_ref(),
buf_v4.as_ref(),
"v4 batched should differ from v3"
);
}
#[test]
fn test_find_coordinator_response_decode_v1_wire_format() {
let mut buf = BytesMut::new();
buf.put_i32(0); buf.put_i16(0); buf.put_i16(-1); buf.put_i32(1); let host = b"broker-1";
buf.put_i16(host.len() as i16);
buf.put_slice(host);
buf.put_i32(9092);
let resp = FindCoordinatorResponse::decode_versioned(1, &mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 0);
assert!(resp.error_code.is_ok());
assert!(resp.error_message.is_none());
assert_eq!(resp.node_id, 1);
assert_eq!(resp.host, "broker-1");
assert_eq!(resp.port, 9092);
}
#[test]
fn test_find_coordinator_response_decode_v3_flexible() {
let mut buf = BytesMut::new();
buf.put_i32(50); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf);
buf.put_i32(2); let host = b"kafka-0.internal";
varint::encode_unsigned_varint(host.len() as u32 + 1, &mut buf);
buf.put_slice(host);
buf.put_i32(9093); varint::encode_unsigned_varint(0, &mut buf);
let resp = FindCoordinatorResponse::decode_versioned(3, &mut buf.freeze()).unwrap();
assert_eq!(resp.throttle_time_ms, 50);
assert_eq!(resp.node_id, 2);
assert_eq!(resp.host, "kafka-0.internal");
assert_eq!(resp.port, 9093);
}
#[test]
fn test_find_coordinator_response_decode_v4_batched() {
let mut buf = BytesMut::new();
buf.put_i32(0); varint::encode_unsigned_varint(2, &mut buf);
let key = b"my-group";
varint::encode_unsigned_varint(key.len() as u32 + 1, &mut buf);
buf.put_slice(key);
buf.put_i32(3); let host = b"broker-3";
varint::encode_unsigned_varint(host.len() as u32 + 1, &mut buf);
buf.put_slice(host);
buf.put_i32(9094); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf);
let resp = FindCoordinatorResponse::decode_versioned(4, &mut buf.freeze()).unwrap();
assert_eq!(resp.node_id, 3);
assert_eq!(resp.host, "broker-3");
assert_eq!(resp.port, 9094);
assert!(resp.error_code.is_ok());
}
#[test]
fn test_find_coordinator_response_decode_v4_rejects_null_key_in_remaining_entry() {
let mut buf = BytesMut::new();
buf.put_i32(0); varint::encode_unsigned_varint(3, &mut buf);
let key = b"my-group";
varint::encode_unsigned_varint(key.len() as u32 + 1, &mut buf);
buf.put_slice(key);
buf.put_i32(3); let host = b"broker-3";
varint::encode_unsigned_varint(host.len() as u32 + 1, &mut buf);
buf.put_slice(host);
buf.put_i32(9094); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(0, &mut buf); buf.put_i32(4); let host = b"broker-4";
varint::encode_unsigned_varint(host.len() as u32 + 1, &mut buf);
buf.put_slice(host);
buf.put_i32(9095); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
let err = FindCoordinatorResponse::decode_versioned(4, &mut buf.freeze()).unwrap_err();
assert!(
err.to_string().contains("coordinator key must not be null"),
"expected non-null coordinator key error, got: {err}"
);
}
#[test]
fn test_find_coordinator_response_decode_v4_rejects_null_host_in_remaining_entry() {
let mut buf = BytesMut::new();
buf.put_i32(0); varint::encode_unsigned_varint(3, &mut buf);
let key = b"my-group";
varint::encode_unsigned_varint(key.len() as u32 + 1, &mut buf);
buf.put_slice(key);
buf.put_i32(3); let host = b"broker-3";
varint::encode_unsigned_varint(host.len() as u32 + 1, &mut buf);
buf.put_slice(host);
buf.put_i32(9094); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf);
let key = b"other-group";
varint::encode_unsigned_varint(key.len() as u32 + 1, &mut buf);
buf.put_slice(key);
buf.put_i32(4); varint::encode_unsigned_varint(0, &mut buf); buf.put_i32(9095); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf);
varint::encode_unsigned_varint(0, &mut buf);
let err = FindCoordinatorResponse::decode_versioned(4, &mut buf.freeze()).unwrap_err();
assert!(
err.to_string()
.contains("coordinator host must not be null"),
"expected non-null coordinator host error, got: {err}"
);
}
#[rstest]
#[case::v5(5)]
#[case::v6(6)]
fn test_find_coordinator_request_v5_v6_same_as_v4(#[case] version: i16) {
let request = FindCoordinatorRequest {
key: "my-group".to_string(),
key_type: 0,
};
let mut buf_v4 = BytesMut::new();
request.encode_versioned(4, &mut buf_v4).unwrap();
let mut buf = BytesMut::new();
request.encode_versioned(version, &mut buf).unwrap();
assert_eq!(buf, buf_v4, "v{version} encode should equal v4");
}
#[test]
fn test_find_coordinator_request_v4_exact_bytes() {
let request = FindCoordinatorRequest {
key: "grp".to_string(),
key_type: 0,
};
let mut buf = BytesMut::new();
request.encode_versioned(4, &mut buf).unwrap();
let mut expected = BytesMut::new();
expected.put_i8(0); varint::encode_unsigned_varint(2, &mut expected); varint::encode_unsigned_varint(4, &mut expected);
expected.put_slice(b"grp");
varint::encode_unsigned_varint(0, &mut expected);
assert_eq!(
buf, expected,
"v4 wire bytes mismatch: got {buf:?}, expected {expected:?}"
);
}
#[rstest]
#[case::v5(5)]
#[case::v6(6)]
fn test_find_coordinator_response_v5_v6_decode(#[case] version: i16) {
let mut buf = BytesMut::new();
buf.put_i32(0); varint::encode_unsigned_varint(2, &mut buf); let key = b"grp";
varint::encode_unsigned_varint(key.len() as u32 + 1, &mut buf);
buf.put_slice(key);
buf.put_i32(7); let host = b"host-7";
varint::encode_unsigned_varint(host.len() as u32 + 1, &mut buf);
buf.put_slice(host);
buf.put_i32(9092); buf.put_i16(0); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf); varint::encode_unsigned_varint(0, &mut buf);
let resp = FindCoordinatorResponse::decode_versioned(version, &mut buf.freeze()).unwrap();
assert_eq!(resp.node_id, 7);
assert_eq!(resp.host, "host-7");
assert_eq!(resp.port, 9092);
}
}