use bytes::{Buf, BufMut};
use super::{VersionedDecode, VersionedEncode, non_nullable_string};
use crate::error::{ErrorCode, Result};
use crate::protocol::primitives::{Decode, Encode, KafkaString, TaggedFields, TryEncode};
use crate::protocol::{
array_len_i32, check_compact_array_len, check_decode_array_len, encode_compact_array_len,
};
/// Resource kinds an ACL can refer to.
///
/// Each discriminant is the byte produced by [`AclResourceType::to_i8`] and
/// accepted by [`AclResourceType::from_i8`].
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AclResourceType {
    /// Code `0`; also what [`AclResourceType::from_i8`] returns for any unrecognised byte.
    Unknown = 0,
    /// Code `1`; the derived `Default`.
    #[default]
    Any = 1,
    /// Code `2`.
    Topic = 2,
    /// Code `3`.
    Group = 3,
    /// Code `4`.
    Cluster = 4,
    /// Code `5`.
    TransactionalId = 5,
    /// Code `6`.
    DelegationToken = 6,
}

impl AclResourceType {
    /// Converts a wire byte into a variant.
    ///
    /// Bytes `1..=6` map to their variant; every other value (including `0`
    /// and negative bytes) yields [`AclResourceType::Unknown`].
    #[inline]
    pub fn from_i8(value: i8) -> Self {
        // Position in the table equals the wire code.
        const BY_CODE: [AclResourceType; 7] = [
            AclResourceType::Unknown,
            AclResourceType::Any,
            AclResourceType::Topic,
            AclResourceType::Group,
            AclResourceType::Cluster,
            AclResourceType::TransactionalId,
            AclResourceType::DelegationToken,
        ];
        if value < 0 {
            return Self::Unknown;
        }
        BY_CODE
            .get(value as usize)
            .copied()
            .unwrap_or(Self::Unknown)
    }

    /// Returns the wire byte for this variant (its discriminant).
    #[inline]
    pub fn to_i8(self) -> i8 {
        self as i8
    }
}
/// How the resource name of an ACL (or ACL filter) is interpreted.
///
/// Discriminants follow the wire codes of Kafka's `PatternType`:
/// `UNKNOWN = 0`, `ANY = 1`, `MATCH = 2`, `LITERAL = 3`, `PREFIXED = 4`.
/// Sending `2` for a literal pattern would be read by the broker as `MATCH`,
/// so the numeric values here must not drift from that table.
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AclPatternType {
    /// Code `0`; also what [`AclPatternType::from_i8`] returns for any unrecognised byte.
    Unknown = 0,
    /// Code `1`; the derived `Default`.
    #[default]
    Any = 1,
    /// Code `2` (Kafka `MATCH`).
    Match = 2,
    /// Code `3` (Kafka `LITERAL`).
    Literal = 3,
    /// Code `4` (Kafka `PREFIXED`).
    Prefixed = 4,
}

impl AclPatternType {
    /// Converts a wire byte into a variant.
    ///
    /// Bytes `1..=4` map to their variant; every other value yields
    /// [`AclPatternType::Unknown`].
    #[inline]
    pub fn from_i8(value: i8) -> Self {
        match value {
            1 => Self::Any,
            2 => Self::Match,
            3 => Self::Literal,
            4 => Self::Prefixed,
            _ => Self::Unknown,
        }
    }

    /// Returns the wire byte for this variant (its discriminant).
    #[inline]
    pub fn to_i8(self) -> i8 {
        self as i8
    }
}
/// Operations an ACL can grant or deny.
///
/// Each discriminant is the byte produced by [`AclOperation::to_i8`] and
/// accepted by [`AclOperation::from_i8`].
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AclOperation {
    /// Code `0`; also what [`AclOperation::from_i8`] returns for any unrecognised byte.
    Unknown = 0,
    /// Code `1`; the derived `Default`.
    #[default]
    Any = 1,
    /// Code `2`.
    All = 2,
    /// Code `3`.
    Read = 3,
    /// Code `4`.
    Write = 4,
    /// Code `5`.
    Create = 5,
    /// Code `6`.
    Delete = 6,
    /// Code `7`.
    Alter = 7,
    /// Code `8`.
    Describe = 8,
    /// Code `9`.
    ClusterAction = 9,
    /// Code `10`.
    DescribeConfigs = 10,
    /// Code `11`.
    AlterConfigs = 11,
    /// Code `12`.
    IdempotentWrite = 12,
}

impl AclOperation {
    /// Converts a wire byte into a variant.
    ///
    /// Bytes `1..=12` map to their variant; every other value (including `0`
    /// and negative bytes) yields [`AclOperation::Unknown`].
    #[inline]
    pub fn from_i8(value: i8) -> Self {
        // Position in the table equals the wire code.
        const BY_CODE: [AclOperation; 13] = [
            AclOperation::Unknown,
            AclOperation::Any,
            AclOperation::All,
            AclOperation::Read,
            AclOperation::Write,
            AclOperation::Create,
            AclOperation::Delete,
            AclOperation::Alter,
            AclOperation::Describe,
            AclOperation::ClusterAction,
            AclOperation::DescribeConfigs,
            AclOperation::AlterConfigs,
            AclOperation::IdempotentWrite,
        ];
        if value < 0 {
            return Self::Unknown;
        }
        BY_CODE
            .get(value as usize)
            .copied()
            .unwrap_or(Self::Unknown)
    }

    /// Returns the wire byte for this variant (its discriminant).
    #[inline]
    pub fn to_i8(self) -> i8 {
        self as i8
    }
}
/// Whether an ACL allows or denies its operation.
///
/// Each discriminant is the byte produced by [`AclPermissionType::to_i8`] and
/// accepted by [`AclPermissionType::from_i8`].
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AclPermissionType {
    /// Code `0`; also what [`AclPermissionType::from_i8`] returns for any unrecognised byte.
    Unknown = 0,
    /// Code `1`; the derived `Default`.
    #[default]
    Any = 1,
    /// Code `2`.
    Deny = 2,
    /// Code `3`.
    Allow = 3,
}

impl AclPermissionType {
    /// Converts a wire byte into a variant.
    ///
    /// Bytes `1..=3` map to their variant; every other value yields
    /// [`AclPermissionType::Unknown`].
    #[inline]
    pub fn from_i8(value: i8) -> Self {
        // Only the named, non-fallback variants are candidates; anything that
        // matches none of their codes falls through to `Unknown`.
        [Self::Any, Self::Deny, Self::Allow]
            .iter()
            .copied()
            .find(|candidate| candidate.to_i8() == value)
            .unwrap_or(Self::Unknown)
    }

    /// Returns the wire byte for this variant (its discriminant).
    #[inline]
    pub fn to_i8(self) -> i8 {
        self as i8
    }
}
/// A fully specified ACL entry, as sent in [`CreateAclsRequest`].
///
/// Unlike the filter types in this file, every string field is mandatory.
#[derive(Debug, Clone)]
pub struct AclBinding {
    /// Kind of resource the ACL applies to.
    pub resource_type: AclResourceType,
    /// Name of the resource.
    pub resource_name: String,
    /// How `resource_name` is interpreted.
    pub pattern_type: AclPatternType,
    /// Principal the ACL applies to (the tests in this file use the `User:<name>` form).
    pub principal: String,
    /// Host the ACL applies to; the `allow_*_topic` constructors use `"*"`.
    pub host: String,
    /// Operation being allowed or denied.
    pub operation: AclOperation,
    /// Whether the operation is allowed or denied.
    pub permission_type: AclPermissionType,
}

impl AclBinding {
    /// Builds a binding whose pattern type is [`AclPatternType::Literal`].
    ///
    /// Use [`AclBinding::with_pattern_type`] to pick a different pattern type.
    pub fn new(
        resource_type: AclResourceType,
        resource_name: impl Into<String>,
        principal: impl Into<String>,
        host: impl Into<String>,
        operation: AclOperation,
        permission_type: AclPermissionType,
    ) -> Self {
        let resource_name = resource_name.into();
        let principal = principal.into();
        let host = host.into();
        Self {
            resource_type,
            resource_name,
            pattern_type: AclPatternType::Literal,
            principal,
            host,
            operation,
            permission_type,
        }
    }

    /// Returns the binding with its pattern type replaced; all other fields are kept.
    pub fn with_pattern_type(self, pattern_type: AclPatternType) -> Self {
        Self {
            pattern_type,
            ..self
        }
    }

    /// `Allow` + `Read` on `topic` for `principal`, from host `"*"`.
    pub fn allow_read_topic(topic: impl Into<String>, principal: impl Into<String>) -> Self {
        Self::allow_on_topic(topic, principal, AclOperation::Read)
    }

    /// `Allow` + `Write` on `topic` for `principal`, from host `"*"`.
    pub fn allow_write_topic(topic: impl Into<String>, principal: impl Into<String>) -> Self {
        Self::allow_on_topic(topic, principal, AclOperation::Write)
    }

    /// `Allow` + `All` on `topic` for `principal`, from host `"*"`.
    pub fn allow_all_topic(topic: impl Into<String>, principal: impl Into<String>) -> Self {
        Self::allow_on_topic(topic, principal, AclOperation::All)
    }

    /// Shared body of the `allow_*_topic` constructors: a topic-scoped `Allow`
    /// binding for `operation` with host `"*"`.
    fn allow_on_topic(
        topic: impl Into<String>,
        principal: impl Into<String>,
        operation: AclOperation,
    ) -> Self {
        Self::new(
            AclResourceType::Topic,
            topic,
            principal,
            "*",
            operation,
            AclPermissionType::Allow,
        )
    }
}
/// Filter sent to list ACLs.
///
/// The three string fields are optional; `None` is passed through to
/// `KafkaString` as-is when encoding.
#[derive(Debug, Clone)]
pub struct DescribeAclsRequest {
    /// Resource type filter.
    pub resource_type: AclResourceType,
    /// Resource name filter, or `None`.
    pub resource_name: Option<String>,
    /// Pattern type filter.
    pub pattern_type: AclPatternType,
    /// Principal filter, or `None`.
    pub principal: Option<String>,
    /// Host filter, or `None`.
    pub host: Option<String>,
    /// Operation filter.
    pub operation: AclOperation,
    /// Permission type filter.
    pub permission_type: AclPermissionType,
}

impl DescribeAclsRequest {
    /// Filter with every enum field set to its `Any` variant and every string field `None`.
    pub fn all() -> Self {
        Self {
            resource_type: AclResourceType::Any,
            resource_name: None,
            pattern_type: AclPatternType::Any,
            principal: None,
            host: None,
            operation: AclOperation::Any,
            permission_type: AclPermissionType::Any,
        }
    }

    /// Same as [`DescribeAclsRequest::all`], but restricted to the topic resource named `topic`.
    pub fn for_topic(topic: impl Into<String>) -> Self {
        Self {
            resource_type: AclResourceType::Topic,
            resource_name: Some(topic.into()),
            ..Self::all()
        }
    }

    /// Writes the v1 body: the seven filter fields in declaration order, with
    /// strings in the non-compact encoding.
    ///
    /// # Errors
    /// Propagates any error from `KafkaString::try_encode`.
    pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
        let Self {
            resource_type,
            resource_name,
            pattern_type,
            principal,
            host,
            operation,
            permission_type,
        } = self;

        resource_type.to_i8().encode(buf);
        KafkaString(resource_name.clone()).try_encode(buf)?;
        pattern_type.to_i8().encode(buf);
        KafkaString(principal.clone()).try_encode(buf)?;
        KafkaString(host.clone()).try_encode(buf)?;
        operation.to_i8().encode(buf);
        permission_type.to_i8().encode(buf);
        Ok(())
    }

    /// Writes the v2 body: same field order as v1, but with compact strings
    /// and a trailing empty tagged-field section.
    ///
    /// # Errors
    /// Propagates any error from `KafkaString::try_encode_compact` or
    /// `TaggedFields::try_encode`.
    pub fn encode_v2(&self, buf: &mut impl BufMut) -> Result<()> {
        let Self {
            resource_type,
            resource_name,
            pattern_type,
            principal,
            host,
            operation,
            permission_type,
        } = self;

        resource_type.to_i8().encode(buf);
        KafkaString(resource_name.clone()).try_encode_compact(buf)?;
        pattern_type.to_i8().encode(buf);
        KafkaString(principal.clone()).try_encode_compact(buf)?;
        KafkaString(host.clone()).try_encode_compact(buf)?;
        operation.to_i8().encode(buf);
        permission_type.to_i8().encode(buf);
        TaggedFields::default().try_encode(buf)
    }
}
/// Decoded DescribeAcls response; built by `decode_v1` / `decode_v2`.
#[derive(Debug, Clone)]
pub struct DescribeAclsResponse {
/// First field of the response (an `i32`).
pub throttle_time_ms: i32,
/// Top-level error code, converted from the wire `i16`.
pub error_code: ErrorCode,
/// Top-level error message; nullable on the wire.
pub error_message: Option<String>,
/// One entry per resource, in wire order.
pub resources: Vec<DescribeAclsResource>,
}
/// One resource entry of a [`DescribeAclsResponse`], with the ACLs listed for it.
#[derive(Debug, Clone)]
pub struct DescribeAclsResource {
/// Resource type, converted from the wire byte.
pub resource_type: AclResourceType,
/// Resource name; decoding fails if the wire value is null.
pub resource_name: String,
/// Pattern type, converted from the wire byte.
pub pattern_type: AclPatternType,
/// ACL entries for this resource, in wire order.
pub acls: Vec<AclDescription>,
}
/// One ACL entry inside a [`DescribeAclsResource`].
#[derive(Debug, Clone)]
pub struct AclDescription {
/// Principal; decoding fails if the wire value is null.
pub principal: String,
/// Host; decoding fails if the wire value is null.
pub host: String,
/// Operation, converted from the wire byte.
pub operation: AclOperation,
/// Permission type, converted from the wire byte.
pub permission_type: AclPermissionType,
}
impl DescribeAclsResponse {
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode(buf)?.0;
let resource_count = check_decode_array_len(i32::decode(buf)?)?;
let mut resources = Vec::with_capacity(resource_count);
for _ in 0..resource_count {
let resource_type = AclResourceType::from_i8(i8::decode(buf)?);
let resource_name = non_nullable_string("resource name", KafkaString::decode(buf)?.0)?;
let pattern_type = AclPatternType::from_i8(i8::decode(buf)?);
let acl_count = check_decode_array_len(i32::decode(buf)?)?;
let mut acls = Vec::with_capacity(acl_count);
for _ in 0..acl_count {
let principal = non_nullable_string("principal", KafkaString::decode(buf)?.0)?;
let host = non_nullable_string("host", KafkaString::decode(buf)?.0)?;
let operation = AclOperation::from_i8(i8::decode(buf)?);
let permission_type = AclPermissionType::from_i8(i8::decode(buf)?);
acls.push(AclDescription {
principal,
host,
operation,
permission_type,
});
}
resources.push(DescribeAclsResource {
resource_type,
resource_name,
pattern_type,
acls,
});
}
Ok(Self {
throttle_time_ms,
error_code,
error_message,
resources,
})
}
pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let resource_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut resources = Vec::with_capacity(resource_count);
for _ in 0..resource_count {
let resource_type = AclResourceType::from_i8(i8::decode(buf)?);
let resource_name =
non_nullable_string("resource name", KafkaString::decode_compact(buf)?.0)?;
let pattern_type = AclPatternType::from_i8(i8::decode(buf)?);
let acl_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut acls = Vec::with_capacity(acl_count);
for _ in 0..acl_count {
let principal =
non_nullable_string("principal", KafkaString::decode_compact(buf)?.0)?;
let host = non_nullable_string("host", KafkaString::decode_compact(buf)?.0)?;
let operation = AclOperation::from_i8(i8::decode(buf)?);
let permission_type = AclPermissionType::from_i8(i8::decode(buf)?);
let _ = TaggedFields::decode(buf)?;
acls.push(AclDescription {
principal,
host,
operation,
permission_type,
});
}
let _ = TaggedFields::decode(buf)?;
resources.push(DescribeAclsResource {
resource_type,
resource_name,
pattern_type,
acls,
});
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
error_code,
error_message,
resources,
})
}
}
#[derive(Debug, Clone)]
pub struct CreateAclsRequest {
pub creations: Vec<AclBinding>,
}
impl CreateAclsRequest {
pub fn new(creations: Vec<AclBinding>) -> Self {
Self { creations }
}
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
array_len_i32(self.creations.len())?.encode(buf);
for acl in &self.creations {
(acl.resource_type.to_i8()).encode(buf);
KafkaString(Some(acl.resource_name.clone())).try_encode(buf)?;
(acl.pattern_type.to_i8()).encode(buf);
KafkaString(Some(acl.principal.clone())).try_encode(buf)?;
KafkaString(Some(acl.host.clone())).try_encode(buf)?;
(acl.operation.to_i8()).encode(buf);
(acl.permission_type.to_i8()).encode(buf);
}
Ok(())
}
pub fn encode_v2(&self, buf: &mut impl BufMut) -> Result<()> {
encode_compact_array_len(self.creations.len(), buf)?;
for acl in &self.creations {
(acl.resource_type.to_i8()).encode(buf);
KafkaString(Some(acl.resource_name.clone())).try_encode_compact(buf)?;
(acl.pattern_type.to_i8()).encode(buf);
KafkaString(Some(acl.principal.clone())).try_encode_compact(buf)?;
KafkaString(Some(acl.host.clone())).try_encode_compact(buf)?;
(acl.operation.to_i8()).encode(buf);
(acl.permission_type.to_i8()).encode(buf);
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
/// Decoded CreateAcls response; built by `decode_v1` / `decode_v2`.
#[derive(Debug, Clone)]
pub struct CreateAclsResponse {
/// First field of the response (an `i32`).
pub throttle_time_ms: i32,
/// Per-entry results, in wire order.
pub results: Vec<CreateAclsResult>,
}
/// One result entry of a [`CreateAclsResponse`].
#[derive(Debug, Clone)]
pub struct CreateAclsResult {
/// Error code for this entry, converted from the wire `i16`.
pub error_code: ErrorCode,
/// Error message for this entry; nullable on the wire.
pub error_message: Option<String>,
}
impl CreateAclsResponse {
    /// Decodes the v1 layout: `i32` array length and non-compact strings.
    ///
    /// # Errors
    /// Propagates primitive decode errors and array-length validation errors.
    pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
        let throttle_time_ms = i32::decode(buf)?;

        let expected = check_decode_array_len(i32::decode(buf)?)?;
        let mut results = Vec::with_capacity(expected);
        while results.len() < expected {
            // Initialisers run top to bottom, matching the wire order.
            results.push(CreateAclsResult {
                error_code: ErrorCode::from_i16(i16::decode(buf)?),
                error_message: KafkaString::decode(buf)?.0,
            });
        }

        Ok(Self {
            throttle_time_ms,
            results,
        })
    }

    /// Decodes the v2 layout: unsigned-varint array length, compact strings,
    /// and a discarded tagged-field section after each result and at the end.
    ///
    /// # Errors
    /// Same sources as [`CreateAclsResponse::decode_v1`], plus varint and
    /// tagged-field decode errors.
    pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
        let throttle_time_ms = i32::decode(buf)?;

        let expected = check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
        let mut results = Vec::with_capacity(expected);
        while results.len() < expected {
            let error_code = ErrorCode::from_i16(i16::decode(buf)?);
            let error_message = KafkaString::decode_compact(buf)?.0;
            let _ = TaggedFields::decode(buf)?;
            results.push(CreateAclsResult {
                error_code,
                error_message,
            });
        }
        let _ = TaggedFields::decode(buf)?;

        Ok(Self {
            throttle_time_ms,
            results,
        })
    }

    /// `true` when no result carries a failing error code (vacuously `true`
    /// for an empty result list).
    pub fn is_ok(&self) -> bool {
        !self
            .results
            .iter()
            .any(|result| !result.error_code.is_ok())
    }
}
/// Request body carrying the filters that select ACLs to delete.
#[derive(Debug, Clone)]
pub struct DeleteAclsRequest {
/// Filters; encoded in this order.
pub filters: Vec<AclBindingFilter>,
}
#[derive(Debug, Clone)]
pub struct AclBindingFilter {
pub resource_type: AclResourceType,
pub resource_name: Option<String>,
pub pattern_type: AclPatternType,
pub principal: Option<String>,
pub host: Option<String>,
pub operation: AclOperation,
pub permission_type: AclPermissionType,
}
impl AclBindingFilter {
pub fn matching(binding: &AclBinding) -> Self {
Self {
resource_type: binding.resource_type,
resource_name: Some(binding.resource_name.clone()),
pattern_type: binding.pattern_type,
principal: Some(binding.principal.clone()),
host: Some(binding.host.clone()),
operation: binding.operation,
permission_type: binding.permission_type,
}
}
pub fn for_topic(topic: impl Into<String>) -> Self {
Self {
resource_type: AclResourceType::Topic,
resource_name: Some(topic.into()),
pattern_type: AclPatternType::Any,
principal: None,
host: None,
operation: AclOperation::Any,
permission_type: AclPermissionType::Any,
}
}
}
impl DeleteAclsRequest {
pub fn new(filters: Vec<AclBindingFilter>) -> Self {
Self { filters }
}
pub fn encode_v1(&self, buf: &mut impl BufMut) -> Result<()> {
array_len_i32(self.filters.len())?.encode(buf);
for filter in &self.filters {
(filter.resource_type.to_i8()).encode(buf);
KafkaString(filter.resource_name.clone()).try_encode(buf)?;
(filter.pattern_type.to_i8()).encode(buf);
KafkaString(filter.principal.clone()).try_encode(buf)?;
KafkaString(filter.host.clone()).try_encode(buf)?;
(filter.operation.to_i8()).encode(buf);
(filter.permission_type.to_i8()).encode(buf);
}
Ok(())
}
pub fn encode_v2(&self, buf: &mut impl BufMut) -> Result<()> {
encode_compact_array_len(self.filters.len(), buf)?;
for filter in &self.filters {
(filter.resource_type.to_i8()).encode(buf);
KafkaString(filter.resource_name.clone()).try_encode_compact(buf)?;
(filter.pattern_type.to_i8()).encode(buf);
KafkaString(filter.principal.clone()).try_encode_compact(buf)?;
KafkaString(filter.host.clone()).try_encode_compact(buf)?;
(filter.operation.to_i8()).encode(buf);
(filter.permission_type.to_i8()).encode(buf);
TaggedFields::default().try_encode(buf)?;
}
TaggedFields::default().try_encode(buf)?;
Ok(())
}
}
/// Decoded DeleteAcls response; built by `decode_v1` / `decode_v2`.
#[derive(Debug, Clone)]
pub struct DeleteAclsResponse {
/// First field of the response (an `i32`).
pub throttle_time_ms: i32,
/// Per-filter results, in wire order.
pub filter_results: Vec<DeleteAclsFilterResult>,
}
/// One per-filter entry of a [`DeleteAclsResponse`].
#[derive(Debug, Clone)]
pub struct DeleteAclsFilterResult {
/// Error code for this filter, converted from the wire `i16`.
pub error_code: ErrorCode,
/// Error message for this filter; nullable on the wire.
pub error_message: Option<String>,
/// ACL entries listed under this filter, in wire order.
pub matching_acls: Vec<DeleteAclsMatchingAcl>,
}
/// One ACL entry inside a [`DeleteAclsFilterResult`], with its own error status.
#[derive(Debug, Clone)]
pub struct DeleteAclsMatchingAcl {
/// Error code for this ACL, converted from the wire `i16`.
pub error_code: ErrorCode,
/// Error message for this ACL; nullable on the wire.
pub error_message: Option<String>,
/// Resource type, converted from the wire byte.
pub resource_type: AclResourceType,
/// Resource name; decoding fails if the wire value is null.
pub resource_name: String,
/// Pattern type, converted from the wire byte.
pub pattern_type: AclPatternType,
/// Principal; decoding fails if the wire value is null.
pub principal: String,
/// Host; decoding fails if the wire value is null.
pub host: String,
/// Operation, converted from the wire byte.
pub operation: AclOperation,
/// Permission type, converted from the wire byte.
pub permission_type: AclPermissionType,
}
impl DeleteAclsResponse {
pub fn decode_v1(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let filter_count = check_decode_array_len(i32::decode(buf)?)?;
let mut filter_results = Vec::with_capacity(filter_count);
for _ in 0..filter_count {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode(buf)?.0;
let matching_count = check_decode_array_len(i32::decode(buf)?)?;
let mut matching_acls = Vec::with_capacity(matching_count);
for _ in 0..matching_count {
let acl_error_code = ErrorCode::from_i16(i16::decode(buf)?);
let acl_error_message = KafkaString::decode(buf)?.0;
let resource_type = AclResourceType::from_i8(i8::decode(buf)?);
let resource_name =
non_nullable_string("resource name", KafkaString::decode(buf)?.0)?;
let pattern_type = AclPatternType::from_i8(i8::decode(buf)?);
let principal = non_nullable_string("principal", KafkaString::decode(buf)?.0)?;
let host = non_nullable_string("host", KafkaString::decode(buf)?.0)?;
let operation = AclOperation::from_i8(i8::decode(buf)?);
let permission_type = AclPermissionType::from_i8(i8::decode(buf)?);
matching_acls.push(DeleteAclsMatchingAcl {
error_code: acl_error_code,
error_message: acl_error_message,
resource_type,
resource_name,
pattern_type,
principal,
host,
operation,
permission_type,
});
}
filter_results.push(DeleteAclsFilterResult {
error_code,
error_message,
matching_acls,
});
}
Ok(Self {
throttle_time_ms,
filter_results,
})
}
pub fn decode_v2(buf: &mut impl Buf) -> Result<Self> {
let throttle_time_ms = i32::decode(buf)?;
let filter_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut filter_results = Vec::with_capacity(filter_count);
for _ in 0..filter_count {
let error_code = ErrorCode::from_i16(i16::decode(buf)?);
let error_message = KafkaString::decode_compact(buf)?.0;
let matching_count =
check_compact_array_len(crate::util::varint::decode_unsigned_varint(buf)?)?;
let mut matching_acls = Vec::with_capacity(matching_count);
for _ in 0..matching_count {
let acl_error_code = ErrorCode::from_i16(i16::decode(buf)?);
let acl_error_message = KafkaString::decode_compact(buf)?.0;
let resource_type = AclResourceType::from_i8(i8::decode(buf)?);
let resource_name =
non_nullable_string("resource name", KafkaString::decode_compact(buf)?.0)?;
let pattern_type = AclPatternType::from_i8(i8::decode(buf)?);
let principal =
non_nullable_string("principal", KafkaString::decode_compact(buf)?.0)?;
let host = non_nullable_string("host", KafkaString::decode_compact(buf)?.0)?;
let operation = AclOperation::from_i8(i8::decode(buf)?);
let permission_type = AclPermissionType::from_i8(i8::decode(buf)?);
let _ = TaggedFields::decode(buf)?;
matching_acls.push(DeleteAclsMatchingAcl {
error_code: acl_error_code,
error_message: acl_error_message,
resource_type,
resource_name,
pattern_type,
principal,
host,
operation,
permission_type,
});
}
let _ = TaggedFields::decode(buf)?;
filter_results.push(DeleteAclsFilterResult {
error_code,
error_message,
matching_acls,
});
}
let _ = TaggedFields::decode(buf)?;
Ok(Self {
throttle_time_ms,
filter_results,
})
}
}
/// Version dispatch for encoding: v1 uses `encode_v1`, v2 and v3 share
/// `encode_v2`, and any other version is reported via `unsupported_encode!`.
impl VersionedEncode for DescribeAclsRequest {
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            1 => self.encode_v1(buf),
            2..=3 => self.encode_v2(buf),
            _ => unsupported_encode!("DescribeAclsRequest", version),
        }
    }
}
/// Version dispatch for decoding: v1 uses `decode_v1`, v2 and v3 share
/// `decode_v2`, and any other version is reported via `unsupported_decode!`.
impl VersionedDecode for DescribeAclsResponse {
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        if version == 1 {
            return Self::decode_v1(buf);
        }
        if version == 2 || version == 3 {
            return Self::decode_v2(buf);
        }
        unsupported_decode!("DescribeAclsResponse", version)
    }
}
/// Version dispatch for encoding: v1 uses `encode_v1`, v2 and v3 share
/// `encode_v2`, and any other version is reported via `unsupported_encode!`.
impl VersionedEncode for CreateAclsRequest {
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            1 => self.encode_v1(buf),
            2..=3 => self.encode_v2(buf),
            _ => unsupported_encode!("CreateAclsRequest", version),
        }
    }
}
/// Version dispatch for decoding: v1 uses `decode_v1`, v2 and v3 share
/// `decode_v2`, and any other version is reported via `unsupported_decode!`.
impl VersionedDecode for CreateAclsResponse {
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        if version == 1 {
            return Self::decode_v1(buf);
        }
        if version == 2 || version == 3 {
            return Self::decode_v2(buf);
        }
        unsupported_decode!("CreateAclsResponse", version)
    }
}
/// Version dispatch for encoding: v1 uses `encode_v1`, v2 and v3 share
/// `encode_v2`, and any other version is reported via `unsupported_encode!`.
impl VersionedEncode for DeleteAclsRequest {
    fn encode_versioned(&self, version: i16, buf: &mut impl BufMut) -> Result<()> {
        match version {
            1 => self.encode_v1(buf),
            2..=3 => self.encode_v2(buf),
            _ => unsupported_encode!("DeleteAclsRequest", version),
        }
    }
}
/// Version dispatch for decoding: v1 uses `decode_v1`, v2 and v3 share
/// `decode_v2`, and any other version is reported via `unsupported_decode!`.
impl VersionedDecode for DeleteAclsResponse {
    fn decode_versioned(version: i16, buf: &mut impl Buf) -> Result<Self> {
        if version == 1 {
            return Self::decode_v1(buf);
        }
        if version == 2 || version == 3 {
            return Self::decode_v2(buf);
        }
        unsupported_decode!("DeleteAclsResponse", version)
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;
    use bytes::BytesMut;
    use rstest::rstest;

    /// Spot-checks both directions of the resource-type byte mapping.
    #[test]
    fn test_acl_resource_type() {
        assert_eq!(AclResourceType::Topic.to_i8(), 2);
        assert_eq!(AclResourceType::Group.to_i8(), 3);
        assert_eq!(AclResourceType::Cluster.to_i8(), 4);
        assert_eq!(AclResourceType::from_i8(2), AclResourceType::Topic);
        assert_eq!(AclResourceType::from_i8(99), AclResourceType::Unknown);
    }

    /// Spot-checks both directions of the operation byte mapping.
    #[test]
    fn test_acl_operation() {
        assert_eq!(AclOperation::Read.to_i8(), 3);
        assert_eq!(AclOperation::Write.to_i8(), 4);
        assert_eq!(AclOperation::from_i8(3), AclOperation::Read);
        assert_eq!(AclOperation::from_i8(99), AclOperation::Unknown);
    }

    /// Spot-checks both directions of the permission-type byte mapping.
    #[test]
    fn test_acl_permission_type() {
        assert_eq!(AclPermissionType::Allow.to_i8(), 3);
        assert_eq!(AclPermissionType::Deny.to_i8(), 2);
        assert_eq!(AclPermissionType::from_i8(3), AclPermissionType::Allow);
    }

    /// `allow_read_topic` fills in topic scope, host `"*"`, Read and Allow.
    #[test]
    fn test_acl_binding() {
        let binding = AclBinding::allow_read_topic("my-topic", "User:alice");
        assert_eq!(binding.resource_type, AclResourceType::Topic);
        assert_eq!(binding.resource_name, "my-topic");
        assert_eq!(binding.principal, "User:alice");
        assert_eq!(binding.host, "*");
        assert_eq!(binding.operation, AclOperation::Read);
        assert_eq!(binding.permission_type, AclPermissionType::Allow);
    }

    /// Checks the two `DescribeAclsRequest` constructors.
    #[test]
    fn test_describe_acls_request() {
        let request = DescribeAclsRequest::all();
        assert_eq!(request.resource_type, AclResourceType::Any);
        assert!(request.resource_name.is_none());

        let request = DescribeAclsRequest::for_topic("my-topic");
        assert_eq!(request.resource_type, AclResourceType::Topic);
        assert_eq!(request.resource_name.as_deref(), Some("my-topic"));
    }

    /// A two-binding request encodes to a non-empty v1 body.
    #[test]
    fn test_create_acls_request() {
        let bindings = vec![
            AclBinding::allow_read_topic("topic1", "User:alice"),
            AclBinding::allow_write_topic("topic2", "User:bob"),
        ];
        let request = CreateAclsRequest::new(bindings);
        assert_eq!(request.creations.len(), 2);

        let mut buf = BytesMut::new();
        request.encode_v1(&mut buf).unwrap();
        assert!(!buf.is_empty());
    }

    /// `AclBindingFilter::matching` copies the binding's strings into `Some`.
    #[test]
    fn test_delete_acls_filter() {
        let binding = AclBinding::allow_read_topic("my-topic", "User:alice");
        let filter = AclBindingFilter::matching(&binding);
        assert_eq!(filter.resource_name.as_deref(), Some("my-topic"));
        assert_eq!(filter.principal.as_deref(), Some("User:alice"));
    }

    /// v1 and v2 encodings differ in length, and v3 dispatches to the v2 encoder.
    #[test]
    fn test_describe_acls_v2_flexible() {
        let request = DescribeAclsRequest::all();

        let mut v1 = BytesMut::new();
        request.encode_v1(&mut v1).unwrap();
        let mut v2 = BytesMut::new();
        request.encode_v2(&mut v2).unwrap();
        assert_ne!(v1.len(), v2.len());
        assert!(!v2.is_empty());

        let mut v3 = BytesMut::new();
        request.encode_versioned(3, &mut v3).unwrap();
        assert_eq!(v2, v3);
    }

    /// Decodes a hand-built v2 response with one resource holding one ACL.
    ///
    /// Enum bytes are taken from `to_i8()` rather than written as literals so
    /// the fixture does not duplicate the enum numbering.
    #[test]
    fn test_describe_acls_response_v2_roundtrip() {
        let mut buf = BytesMut::new();
        buf.put_i32(0); // throttle_time_ms
        buf.put_i16(0); // error_code
        buf.put_u8(0); // error_message
        buf.put_u8(2); // resources: compact length for 1 element
        buf.put_i8(AclResourceType::Topic.to_i8());
        buf.put_u8(6); // compact length for a 5-byte string
        buf.put_slice(b"test1");
        buf.put_i8(AclPatternType::Literal.to_i8());
        buf.put_u8(2); // acls: compact length for 1 element
        buf.put_u8(6);
        buf.put_slice(b"User:");
        buf.put_u8(2);
        buf.put_slice(b"*");
        buf.put_i8(AclOperation::Write.to_i8());
        buf.put_i8(AclPermissionType::Allow.to_i8());
        buf.put_u8(0); // tagged fields: ACL
        buf.put_u8(0); // tagged fields: resource
        buf.put_u8(0); // tagged fields: response

        let mut frozen = buf.freeze();
        let resp = DescribeAclsResponse::decode_v2(&mut frozen).unwrap();
        assert!(resp.error_code.is_ok());
        assert_eq!(resp.resources.len(), 1);

        let resource = &resp.resources[0];
        assert_eq!(resource.resource_type, AclResourceType::Topic);
        assert_eq!(resource.resource_name, "test1");
        assert_eq!(resource.pattern_type, AclPatternType::Literal);
        assert_eq!(resource.acls.len(), 1);

        let acl = &resource.acls[0];
        assert_eq!(acl.principal, "User:");
        assert_eq!(acl.host, "*");
        assert_eq!(acl.operation, AclOperation::Write);
        assert_eq!(acl.permission_type, AclPermissionType::Allow);
    }

    /// v1 and v2 encodings differ in length, and v3 dispatches to the v2 encoder.
    #[test]
    fn test_create_acls_v2_flexible() {
        let request = CreateAclsRequest::new(vec![AclBinding {
            resource_type: AclResourceType::Topic,
            resource_name: "test".to_string(),
            pattern_type: AclPatternType::Literal,
            principal: "User:alice".to_string(),
            host: "*".to_string(),
            operation: AclOperation::Read,
            permission_type: AclPermissionType::Allow,
        }]);

        let mut v1 = BytesMut::new();
        request.encode_v1(&mut v1).unwrap();
        let mut v2 = BytesMut::new();
        request.encode_v2(&mut v2).unwrap();
        assert_ne!(v1.len(), v2.len());

        let mut v3 = BytesMut::new();
        request.encode_versioned(3, &mut v3).unwrap();
        assert_eq!(v2, v3);
    }

    /// Decodes a hand-built v2 response containing a single successful result.
    #[test]
    fn test_create_acls_response_v2_roundtrip() {
        let mut buf = BytesMut::new();
        buf.put_i32(0); // throttle_time_ms
        buf.put_u8(2); // results: compact length for 1 element
        buf.put_i16(0); // error_code
        buf.put_u8(0); // error_message
        buf.put_u8(0); // tagged fields: result
        buf.put_u8(0); // tagged fields: response

        let mut frozen = buf.freeze();
        let resp = CreateAclsResponse::decode_v2(&mut frozen).unwrap();
        assert_eq!(resp.results.len(), 1);
        assert!(resp.results[0].error_code.is_ok());
    }

    /// v1 and v2 encodings differ in length, and v3 dispatches to the v2 encoder.
    #[test]
    fn test_delete_acls_v2_flexible() {
        let filter = AclBindingFilter::for_topic("test");
        let request = DeleteAclsRequest::new(vec![filter]);

        let mut v1 = BytesMut::new();
        request.encode_v1(&mut v1).unwrap();
        let mut v2 = BytesMut::new();
        request.encode_v2(&mut v2).unwrap();
        assert_ne!(v1.len(), v2.len());

        let mut v3 = BytesMut::new();
        request.encode_versioned(3, &mut v3).unwrap();
        assert_eq!(v2, v3);
    }

    /// Decodes a hand-built v2 response with one filter result and no matching ACLs.
    #[test]
    fn test_delete_acls_response_v2_roundtrip() {
        let mut buf = BytesMut::new();
        buf.put_i32(5); // throttle_time_ms
        buf.put_u8(2); // filter_results: compact length for 1 element
        buf.put_i16(0); // error_code
        buf.put_u8(0); // error_message
        buf.put_u8(1); // matching_acls: compact length for 0 elements
        buf.put_u8(0); // tagged fields: filter result
        buf.put_u8(0); // tagged fields: response

        let mut frozen = buf.freeze();
        let resp = DeleteAclsResponse::decode_v2(&mut frozen).unwrap();
        assert_eq!(resp.throttle_time_ms, 5);
        assert_eq!(resp.filter_results.len(), 1);
        assert!(resp.filter_results[0].matching_acls.is_empty());
    }

    /// Versions with no dedicated match arm in `encode_versioned` must be rejected.
    #[rstest]
    #[case::da_v0(0)]
    fn test_describe_acls_encode_below_min(#[case] version: i16) {
        let request = DescribeAclsRequest {
            resource_type: AclResourceType::Any,
            resource_name: None,
            pattern_type: AclPatternType::Any,
            principal: None,
            host: None,
            operation: AclOperation::All,
            permission_type: AclPermissionType::Allow,
        };
        let mut buf = BytesMut::new();
        assert!(request.encode_versioned(version, &mut buf).is_err());
    }
}