Skip to main content

kafkit_client/admin/
mod.rs

//! Admin client operations for topics, groups, configs, and cluster metadata.
//!
//! ```no_run
//! # async fn example() -> kafkit_client::Result<()> {
//! use kafkit_client::{KafkaClient, NewTopic};
//!
//! let admin = KafkaClient::new("localhost:9092").admin().connect().await?;
//! admin
//!     .create_topics([NewTopic::new("orders", 3, 1)])
//!     .await?;
//! # Ok(())
//! # }
//! ```
//!
15use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use bytes::{Buf, Bytes};
19use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
20use kafka_protocol::messages::alter_user_scram_credentials_request::ScramCredentialUpsertion;
21use kafka_protocol::messages::create_acls_request::AclCreation;
22use kafka_protocol::messages::create_partitions_request::{
23    CreatePartitionsAssignment, CreatePartitionsTopic,
24};
25use kafka_protocol::messages::create_topics_request::{CreatableTopic, CreatableTopicConfig};
26use kafka_protocol::messages::delete_acls_request::DeleteAclsFilter;
27use kafka_protocol::messages::describe_configs_request::DescribeConfigsResource;
28use kafka_protocol::messages::describe_log_dirs_request::DescribableLogDirTopic;
29use kafka_protocol::messages::incremental_alter_configs_request::{
30    AlterConfigsResource, AlterableConfig,
31};
32use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
33use kafka_protocol::messages::update_features_request::FeatureUpdateKey;
34use kafka_protocol::messages::{
35    AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse,
36    ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerProtocolAssignment,
37    ConsumerProtocolSubscription, CreateAclsRequest, CreateAclsResponse, CreatePartitionsRequest,
38    CreatePartitionsResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteAclsRequest,
39    DeleteAclsResponse, DeleteGroupsRequest, DeleteGroupsResponse, DeleteTopicsRequest,
40    DeleteTopicsResponse, DescribeAclsRequest, DescribeAclsResponse, DescribeClusterRequest,
41    DescribeClusterResponse, DescribeConfigsRequest, DescribeConfigsResponse,
42    DescribeGroupsRequest, DescribeGroupsResponse, DescribeLogDirsRequest, DescribeLogDirsResponse,
43    IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse, ListGroupsRequest,
44    ListGroupsResponse, MetadataRequest, MetadataResponse, ShareGroupDescribeRequest,
45    ShareGroupDescribeResponse, UpdateFeaturesRequest, UpdateFeaturesResponse,
46};
47use kafka_protocol::protocol::{Decodable, Request, StrBytes};
48use tracing::{debug, instrument};
49use uuid::Uuid;
50
51use crate::config::{AdminConfig, SaslMechanism};
52use crate::constants::{
53    ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP, CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
54    CREATE_ACLS_VERSION_CAP, CREATE_PARTITIONS_VERSION_CAP, CREATE_TOPICS_VERSION_CAP,
55    DELETE_ACLS_VERSION_CAP, DELETE_GROUPS_VERSION_CAP, DELETE_TOPICS_VERSION_CAP,
56    DESCRIBE_ACLS_VERSION_CAP, DESCRIBE_CLUSTER_VERSION_CAP, DESCRIBE_CONFIGS_VERSION_CAP,
57    DESCRIBE_GROUPS_VERSION_CAP, DESCRIBE_LOG_DIRS_VERSION_CAP,
58    INCREMENTAL_ALTER_CONFIGS_VERSION_CAP, LIST_GROUPS_VERSION_CAP, METADATA_VERSION_CAP,
59    SHARE_GROUP_DESCRIBE_VERSION_CAP, UPDATE_FEATURES_VERSION_CAP,
60};
61use crate::network::scram;
62use crate::network::{BrokerConnection, connect_to_any_bootstrap, duration_to_i32_ms};
63use crate::types::TopicPartition;
64use crate::{AdminError, BrokerError, Result, ValidationError};
65
#[derive(Debug, Clone)]
/// Topic definition used with [`KafkaAdmin::create_topics`].
///
/// `num_partitions` and `replication_factor` must both be positive. Additional
/// topic-level broker configs can be attached with [`NewTopic::with_config`].
///
/// ```no_run
/// # async fn example(admin: kafkit_client::KafkaAdmin) -> kafkit_client::Result<()> {
/// use kafkit_client::NewTopic;
///
/// admin
///     .create_topics([
///         NewTopic::new("orders", 6, 3)
///             .with_config("cleanup.policy", "delete")
///             .with_config("retention.ms", "604800000"),
///     ])
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct NewTopic {
    /// Topic name to create. Validated when the create request is built.
    pub name: String,
    /// Number of partitions for the new topic; must be positive.
    pub num_partitions: i32,
    /// Number of replicas Kafka should create for each partition; must be positive.
    pub replication_factor: i16,
    /// Topic-level configuration entries, keyed by config name.
    pub configs: BTreeMap<String, String>,
}
96
97impl NewTopic {
98    /// Creates a topic definition.
99    pub fn new(name: impl Into<String>, num_partitions: i32, replication_factor: i16) -> Self {
100        Self {
101            name: name.into(),
102            num_partitions,
103            replication_factor,
104            configs: BTreeMap::new(),
105        }
106    }
107
108    /// Adds a topic-level config entry and returns the updated definition.
109    pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
110        self.configs.insert(key.into(), value.into());
111        self
112    }
113
114    fn into_request_topic(self) -> Result<CreatableTopic> {
115        let name = validate_topic_name(self.name)?;
116        if self.num_partitions <= 0 {
117            return Err(AdminError::InvalidPartitionCount {
118                partitions: self.num_partitions,
119            }
120            .into());
121        }
122        if self.replication_factor <= 0 {
123            return Err(AdminError::InvalidReplicationFactor {
124                replication_factor: self.replication_factor,
125            }
126            .into());
127        }
128
129        let configs = self
130            .configs
131            .into_iter()
132            .map(|(key, value)| {
133                CreatableTopicConfig::default()
134                    .with_name(StrBytes::from_string(key))
135                    .with_value(Some(StrBytes::from_string(value)))
136            })
137            .collect();
138
139        Ok(CreatableTopic::default()
140            .with_name(StrBytes::from_string(name).into())
141            .with_num_partitions(self.num_partitions)
142            .with_replication_factor(self.replication_factor)
143            .with_configs(configs))
144    }
145}
146
#[derive(Debug, Clone, PartialEq, Eq)]
/// Partition increase request used with [`KafkaAdmin::create_partitions`].
///
/// Kafka can only increase a topic's partition count. `total_count` is the new
/// total number of partitions, not the number of partitions to add.
///
/// ```no_run
/// # async fn example(admin: kafkit_client::KafkaAdmin) -> kafkit_client::Result<()> {
/// use kafkit_client::NewPartitions;
///
/// admin
///     .create_partitions([("orders", NewPartitions::increase_to(12))])
///     .await?;
/// # Ok(())
/// # }
/// ```
pub struct NewPartitions {
    /// New total partition count for the topic; must be positive.
    pub total_count: i32,
    /// Optional replica broker assignments: one inner list of broker IDs per
    /// newly created partition. Empty means the broker picks assignments.
    pub assignments: Vec<Vec<i32>>,
}
169
170impl NewPartitions {
171    /// Creates a request to increase a topic to `total_count` partitions.
172    pub fn increase_to(total_count: i32) -> Self {
173        Self {
174            total_count,
175            assignments: Vec::new(),
176        }
177    }
178
179    /// Adds an explicit replica assignment for one new partition.
180    ///
181    /// Call this once for each new partition that needs an explicit broker
182    /// assignment.
183    pub fn with_assignment<I>(mut self, broker_ids: I) -> Self
184    where
185        I: IntoIterator<Item = i32>,
186    {
187        self.assignments.push(broker_ids.into_iter().collect());
188        self
189    }
190
191    fn into_request_topic(self, topic_name: String) -> Result<CreatePartitionsTopic> {
192        let name = validate_topic_name(topic_name)?;
193        if self.total_count <= 0 {
194            return Err(AdminError::InvalidPartitionCount {
195                partitions: self.total_count,
196            }
197            .into());
198        }
199
200        let assignments = (!self.assignments.is_empty()).then(|| {
201            self.assignments
202                .into_iter()
203                .map(|broker_ids| {
204                    CreatePartitionsAssignment::default()
205                        .with_broker_ids(broker_ids.into_iter().map(Into::into).collect())
206                })
207                .collect()
208        });
209
210        Ok(CreatePartitionsTopic::default()
211            .with_name(StrBytes::from_string(name).into())
212            .with_count(self.total_count)
213            .with_assignments(assignments))
214    }
215}
216
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Type of Kafka resource an ACL applies to.
pub enum ResourceType {
    /// Unrecognized resource type code.
    Unknown,
    /// Matches any resource type (filters only).
    Any,
    /// A topic.
    Topic,
    /// A consumer group.
    Group,
    /// The cluster as a whole.
    Cluster,
    /// A transactional ID.
    TransactionalId,
    /// A delegation token.
    DelegationToken,
    /// A user resource.
    User,
}

impl ResourceType {
    /// Encodes this resource type as its Kafka protocol code.
    fn as_protocol_value(self) -> i8 {
        match self {
            Self::Unknown => 0,
            Self::Any => 1,
            Self::Topic => 2,
            Self::Group => 3,
            Self::Cluster => 4,
            Self::TransactionalId => 5,
            Self::DelegationToken => 6,
            Self::User => 7,
        }
    }

    /// Decodes a Kafka protocol code, mapping unrecognized codes to `Unknown`.
    fn from_protocol_value(value: i8) -> Self {
        // Protocol codes are contiguous, so the variant is just an index.
        const BY_CODE: [ResourceType; 8] = [
            ResourceType::Unknown,
            ResourceType::Any,
            ResourceType::Topic,
            ResourceType::Group,
            ResourceType::Cluster,
            ResourceType::TransactionalId,
            ResourceType::DelegationToken,
            ResourceType::User,
        ];
        usize::try_from(value)
            .ok()
            .and_then(|index| BY_CODE.get(index).copied())
            .unwrap_or(ResourceType::Unknown)
    }
}
257
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Resource pattern type used by ACLs.
pub enum PatternType {
    /// Unrecognized pattern type code.
    Unknown,
    /// Matches any pattern type (filters only).
    Any,
    /// Matches literal, wildcard, and prefixed patterns (filters only).
    Match,
    /// Exact resource name match.
    Literal,
    /// Resource name prefix match.
    Prefixed,
}

impl PatternType {
    /// Encodes this pattern type as its Kafka protocol code.
    fn as_protocol_value(self) -> i8 {
        match self {
            Self::Unknown => 0,
            Self::Any => 1,
            Self::Match => 2,
            Self::Literal => 3,
            Self::Prefixed => 4,
        }
    }

    /// Decodes a Kafka protocol code, mapping unrecognized codes to `Unknown`.
    fn from_protocol_value(value: i8) -> Self {
        // Protocol codes are contiguous, so the variant is just an index.
        const BY_CODE: [PatternType; 5] = [
            PatternType::Unknown,
            PatternType::Any,
            PatternType::Match,
            PatternType::Literal,
            PatternType::Prefixed,
        ];
        usize::try_from(value)
            .ok()
            .and_then(|index| BY_CODE.get(index).copied())
            .unwrap_or(PatternType::Unknown)
    }
}
289
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Operation granted or denied by an ACL.
pub enum AclOperation {
    /// Unrecognized operation code.
    Unknown,
    /// Matches any operation (filters only).
    Any,
    /// All operations.
    All,
    /// Read a resource.
    Read,
    /// Write to a resource.
    Write,
    /// Create a resource.
    Create,
    /// Delete a resource.
    Delete,
    /// Alter a resource.
    Alter,
    /// Describe a resource.
    Describe,
    /// Cluster administration actions.
    ClusterAction,
    /// Describe configuration of a resource.
    DescribeConfigs,
    /// Alter configuration of a resource.
    AlterConfigs,
    /// Idempotent writes.
    IdempotentWrite,
    /// Create delegation tokens.
    CreateTokens,
    /// Describe delegation tokens.
    DescribeTokens,
    /// Two-phase commit transactions.
    TwoPhaseCommit,
}

impl AclOperation {
    /// Encodes this operation as its Kafka protocol code.
    fn as_protocol_value(self) -> i8 {
        match self {
            Self::Unknown => 0,
            Self::Any => 1,
            Self::All => 2,
            Self::Read => 3,
            Self::Write => 4,
            Self::Create => 5,
            Self::Delete => 6,
            Self::Alter => 7,
            Self::Describe => 8,
            Self::ClusterAction => 9,
            Self::DescribeConfigs => 10,
            Self::AlterConfigs => 11,
            Self::IdempotentWrite => 12,
            Self::CreateTokens => 13,
            Self::DescribeTokens => 14,
            Self::TwoPhaseCommit => 15,
        }
    }

    /// Decodes a Kafka protocol code, mapping unrecognized codes to `Unknown`.
    fn from_protocol_value(value: i8) -> Self {
        // Protocol codes are contiguous, so the variant is just an index.
        const BY_CODE: [AclOperation; 16] = [
            AclOperation::Unknown,
            AclOperation::Any,
            AclOperation::All,
            AclOperation::Read,
            AclOperation::Write,
            AclOperation::Create,
            AclOperation::Delete,
            AclOperation::Alter,
            AclOperation::Describe,
            AclOperation::ClusterAction,
            AclOperation::DescribeConfigs,
            AclOperation::AlterConfigs,
            AclOperation::IdempotentWrite,
            AclOperation::CreateTokens,
            AclOperation::DescribeTokens,
            AclOperation::TwoPhaseCommit,
        ];
        usize::try_from(value)
            .ok()
            .and_then(|index| BY_CODE.get(index).copied())
            .unwrap_or(AclOperation::Unknown)
    }
}
354
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Whether an ACL grants or denies access.
pub enum AclPermissionType {
    /// Unrecognized permission type code.
    Unknown,
    /// Matches any permission type (filters only).
    Any,
    /// Denies access.
    Deny,
    /// Grants access.
    Allow,
}

impl AclPermissionType {
    /// Encodes this permission type as its Kafka protocol code.
    fn as_protocol_value(self) -> i8 {
        match self {
            Self::Unknown => 0,
            Self::Any => 1,
            Self::Deny => 2,
            Self::Allow => 3,
        }
    }

    /// Decodes a Kafka protocol code, mapping unrecognized codes to `Unknown`.
    fn from_protocol_value(value: i8) -> Self {
        // Protocol codes are contiguous, so the variant is just an index.
        const BY_CODE: [AclPermissionType; 4] = [
            AclPermissionType::Unknown,
            AclPermissionType::Any,
            AclPermissionType::Deny,
            AclPermissionType::Allow,
        ];
        usize::try_from(value)
            .ok()
            .and_then(|index| BY_CODE.get(index).copied())
            .unwrap_or(AclPermissionType::Unknown)
    }
}
383
384#[derive(Debug, Clone, PartialEq, Eq)]
385/// Represents a pattern that ACLs use to match resources.
386pub struct ResourcePattern {
387    pub resource_type: ResourceType,
388    pub name: String,
389    pub pattern_type: PatternType,
390}
391
392impl ResourcePattern {
393    pub const WILDCARD_RESOURCE: &'static str = "*";
394
395    pub fn new(
396        resource_type: ResourceType,
397        name: impl Into<String>,
398        pattern_type: PatternType,
399    ) -> Self {
400        Self {
401            resource_type,
402            name: name.into(),
403            pattern_type,
404        }
405    }
406
407    pub fn to_filter(&self) -> ResourcePatternFilter {
408        ResourcePatternFilter::new(
409            self.resource_type,
410            Some(self.name.clone()),
411            self.pattern_type,
412        )
413    }
414
415    fn validate(&self) -> Result<()> {
416        if matches!(self.resource_type, ResourceType::Any) {
417            return Err(ValidationError::InvalidAclResourceType {
418                resource_type: "ANY".to_owned(),
419            }
420            .into());
421        }
422        if matches!(self.pattern_type, PatternType::Any | PatternType::Match) {
423            return Err(ValidationError::InvalidAclPatternType {
424                pattern_type: format!("{:?}", self.pattern_type),
425            }
426            .into());
427        }
428        if self.name.trim().is_empty() {
429            return Err(ValidationError::EmptyResourceName { resource: "ACL" }.into());
430        }
431        Ok(())
432    }
433}
434
435#[derive(Debug, Clone, PartialEq, Eq)]
436/// Filter form of [`ResourcePattern`].
437pub struct ResourcePatternFilter {
438    pub resource_type: ResourceType,
439    pub name: Option<String>,
440    pub pattern_type: PatternType,
441}
442
443impl ResourcePatternFilter {
444    pub fn any() -> Self {
445        Self::new(ResourceType::Any, None, PatternType::Any)
446    }
447
448    pub fn new(
449        resource_type: ResourceType,
450        name: Option<String>,
451        pattern_type: PatternType,
452    ) -> Self {
453        Self {
454            resource_type,
455            name,
456            pattern_type,
457        }
458    }
459}
460
461#[derive(Debug, Clone, PartialEq, Eq)]
462/// Access control entry: principal, host, operation, and permission type.
463pub struct AccessControlEntry {
464    pub principal: String,
465    pub host: String,
466    pub operation: AclOperation,
467    pub permission_type: AclPermissionType,
468}
469
470impl AccessControlEntry {
471    pub fn new(
472        principal: impl Into<String>,
473        host: impl Into<String>,
474        operation: AclOperation,
475        permission_type: AclPermissionType,
476    ) -> Self {
477        Self {
478            principal: principal.into(),
479            host: host.into(),
480            operation,
481            permission_type,
482        }
483    }
484
485    pub fn to_filter(&self) -> AccessControlEntryFilter {
486        AccessControlEntryFilter::new(
487            Some(self.principal.clone()),
488            Some(self.host.clone()),
489            self.operation,
490            self.permission_type,
491        )
492    }
493
494    fn validate(&self) -> Result<()> {
495        if self.principal.trim().is_empty() {
496            return Err(ValidationError::EmptyAclPrincipal.into());
497        }
498        if self.host.trim().is_empty() {
499            return Err(ValidationError::EmptyAclHost.into());
500        }
501        if matches!(self.operation, AclOperation::Any) {
502            return Err(ValidationError::InvalidAclOperation {
503                operation: "ANY".to_owned(),
504            }
505            .into());
506        }
507        if matches!(self.permission_type, AclPermissionType::Any) {
508            return Err(ValidationError::InvalidAclPermissionType {
509                permission_type: "ANY".to_owned(),
510            }
511            .into());
512        }
513        Ok(())
514    }
515}
516
517#[derive(Debug, Clone, PartialEq, Eq)]
518/// Filter form of [`AccessControlEntry`].
519pub struct AccessControlEntryFilter {
520    pub principal: Option<String>,
521    pub host: Option<String>,
522    pub operation: AclOperation,
523    pub permission_type: AclPermissionType,
524}
525
526impl AccessControlEntryFilter {
527    pub fn any() -> Self {
528        Self::new(None, None, AclOperation::Any, AclPermissionType::Any)
529    }
530
531    pub fn new(
532        principal: Option<String>,
533        host: Option<String>,
534        operation: AclOperation,
535        permission_type: AclPermissionType,
536    ) -> Self {
537        Self {
538            principal,
539            host,
540            operation,
541            permission_type,
542        }
543    }
544}
545
546#[derive(Debug, Clone, PartialEq, Eq)]
547/// Binding between a resource pattern and an access control entry.
548pub struct AclBinding {
549    pub pattern: ResourcePattern,
550    pub entry: AccessControlEntry,
551}
552
553impl AclBinding {
554    pub fn new(pattern: ResourcePattern, entry: AccessControlEntry) -> Self {
555        Self { pattern, entry }
556    }
557
558    pub fn to_filter(&self) -> AclBindingFilter {
559        AclBindingFilter::new(self.pattern.to_filter(), self.entry.to_filter())
560    }
561
562    fn validate(&self) -> Result<()> {
563        self.pattern.validate()?;
564        self.entry.validate()
565    }
566
567    fn into_creation(self) -> Result<AclCreation> {
568        self.validate()?;
569        Ok(AclCreation::default()
570            .with_resource_type(self.pattern.resource_type.as_protocol_value())
571            .with_resource_name(StrBytes::from_string(self.pattern.name))
572            .with_resource_pattern_type(self.pattern.pattern_type.as_protocol_value())
573            .with_principal(StrBytes::from_string(self.entry.principal))
574            .with_host(StrBytes::from_string(self.entry.host))
575            .with_operation(self.entry.operation.as_protocol_value())
576            .with_permission_type(self.entry.permission_type.as_protocol_value()))
577    }
578}
579
580#[derive(Debug, Clone, PartialEq, Eq)]
581/// Filter matching ACL bindings.
582pub struct AclBindingFilter {
583    pub pattern_filter: ResourcePatternFilter,
584    pub entry_filter: AccessControlEntryFilter,
585}
586
587impl AclBindingFilter {
588    pub fn any() -> Self {
589        Self::new(
590            ResourcePatternFilter::any(),
591            AccessControlEntryFilter::any(),
592        )
593    }
594
595    pub fn new(
596        pattern_filter: ResourcePatternFilter,
597        entry_filter: AccessControlEntryFilter,
598    ) -> Self {
599        Self {
600            pattern_filter,
601            entry_filter,
602        }
603    }
604
605    fn to_describe_request(&self) -> DescribeAclsRequest {
606        DescribeAclsRequest::default()
607            .with_resource_type_filter(self.pattern_filter.resource_type.as_protocol_value())
608            .with_resource_name_filter(self.pattern_filter.name.clone().map(StrBytes::from_string))
609            .with_pattern_type_filter(self.pattern_filter.pattern_type.as_protocol_value())
610            .with_principal_filter(
611                self.entry_filter
612                    .principal
613                    .clone()
614                    .map(StrBytes::from_string),
615            )
616            .with_host_filter(self.entry_filter.host.clone().map(StrBytes::from_string))
617            .with_operation(self.entry_filter.operation.as_protocol_value())
618            .with_permission_type(self.entry_filter.permission_type.as_protocol_value())
619    }
620
621    fn into_delete_filter(self) -> DeleteAclsFilter {
622        DeleteAclsFilter::default()
623            .with_resource_type_filter(self.pattern_filter.resource_type.as_protocol_value())
624            .with_resource_name_filter(self.pattern_filter.name.map(StrBytes::from_string))
625            .with_pattern_type_filter(self.pattern_filter.pattern_type.as_protocol_value())
626            .with_principal_filter(self.entry_filter.principal.map(StrBytes::from_string))
627            .with_host_filter(self.entry_filter.host.map(StrBytes::from_string))
628            .with_operation(self.entry_filter.operation.as_protocol_value())
629            .with_permission_type(self.entry_filter.permission_type.as_protocol_value())
630    }
631}
632
#[derive(Debug, Clone, PartialEq, Eq)]
/// ACLs deleted for one delete filter.
pub struct DeleteAclsResult {
    /// Bindings that matched the filter and were removed.
    pub matching_acls: Vec<AclBinding>,
}
638
#[derive(Debug, Clone, PartialEq, Eq)]
/// Summary of one topic returned by [`KafkaAdmin::list_topics`].
pub struct TopicListing {
    /// Topic name.
    pub name: String,
    /// Stable topic ID when the broker returns one.
    pub topic_id: Option<Uuid>,
    /// Whether Kafka marks the topic as internal.
    pub is_internal: bool,
}
649
#[derive(Debug, Clone, PartialEq, Eq)]
/// Metadata for one partition in a [`TopicDescription`].
pub struct TopicPartitionDescription {
    /// Partition number within the topic.
    pub partition: i32,
    /// Broker ID currently acting as leader for this partition.
    pub leader_id: i32,
    /// Leader epoch reported by the broker.
    pub leader_epoch: i32,
    /// Broker IDs hosting replicas for this partition.
    pub replica_nodes: Vec<i32>,
    /// Broker IDs currently in the in-sync replica set.
    pub isr_nodes: Vec<i32>,
    /// Broker IDs for replicas currently offline.
    pub offline_replicas: Vec<i32>,
}
666
#[derive(Debug, Clone, PartialEq, Eq)]
/// Detailed metadata for one topic returned by [`KafkaAdmin::describe_topics`].
pub struct TopicDescription {
    /// Topic name.
    pub name: String,
    /// Stable topic ID when the broker returns one.
    pub topic_id: Option<Uuid>,
    /// Whether Kafka marks the topic as internal.
    pub is_internal: bool,
    /// Partition metadata sorted by partition number.
    pub partitions: Vec<TopicPartitionDescription>,
}
679
#[derive(Debug, Clone, PartialEq, Eq)]
/// Broker endpoint metadata returned in a [`ClusterDescription`].
pub struct BrokerDescription {
    /// Broker ID assigned by the Kafka cluster.
    pub broker_id: i32,
    /// Broker host advertised by Kafka.
    pub host: String,
    /// Broker port advertised by Kafka.
    pub port: i32,
    /// Rack ID when the broker is configured with one.
    pub rack: Option<String>,
    /// Whether the broker is currently fenced.
    pub is_fenced: bool,
}
694
#[derive(Debug, Clone, PartialEq, Eq)]
/// Cluster metadata returned by [`KafkaAdmin::describe_cluster`].
pub struct ClusterDescription {
    /// Kafka cluster ID.
    pub cluster_id: String,
    /// Broker ID of the active controller.
    pub controller_id: i32,
    /// Brokers known to the cluster, sorted by broker ID.
    pub brokers: Vec<BrokerDescription>,
}
705
#[derive(Debug, Clone, PartialEq, Eq)]
/// Finalized feature level reported by the cluster.
pub struct BrokerFeatureLevel {
    /// Feature name.
    pub name: String,
    /// Finalized version level for the feature.
    pub level: i16,
}
714
#[derive(Debug, Clone, PartialEq, Eq)]
/// Finalized broker feature update sent with the `UpdateFeatures` API.
pub struct FeatureUpdate {
    /// Feature name. Validated when the update request is built.
    pub name: String,
    /// New finalized maximum version level. Values below 1 delete the finalized feature.
    pub max_version_level: i16,
    /// How the broker should apply an upgrade or downgrade.
    pub upgrade_type: FeatureUpgradeType,
}
725
726impl FeatureUpdate {
727    /// Creates an upgrade-only feature update.
728    pub fn upgrade(name: impl Into<String>, max_version_level: i16) -> Self {
729        Self {
730            name: name.into(),
731            max_version_level,
732            upgrade_type: FeatureUpgradeType::Upgrade,
733        }
734    }
735
736    /// Creates a safe downgrade or deletion update.
737    pub fn safe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
738        Self {
739            name: name.into(),
740            max_version_level,
741            upgrade_type: FeatureUpgradeType::SafeDowngrade,
742        }
743    }
744
745    /// Creates an unsafe downgrade or deletion update.
746    pub fn unsafe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
747        Self {
748            name: name.into(),
749            max_version_level,
750            upgrade_type: FeatureUpgradeType::UnsafeDowngrade,
751        }
752    }
753
754    fn into_request_update(self, version: i16) -> Result<FeatureUpdateKey> {
755        let name = validate_feature_name(self.name)?;
756        let allow_downgrade = self.upgrade_type.allows_downgrade();
757        let mut update = FeatureUpdateKey::default()
758            .with_feature(StrBytes::from_string(name))
759            .with_max_version_level(self.max_version_level);
760        if version == 0 {
761            update = update.with_allow_downgrade(allow_downgrade);
762        } else {
763            update = update.with_upgrade_type(self.upgrade_type.as_protocol_value());
764        }
765        Ok(update)
766    }
767}
768
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Type of finalized feature update to perform.
pub enum FeatureUpgradeType {
    /// Only allow upgrades.
    Upgrade,
    /// Allow safe, lossless downgrades.
    SafeDowngrade,
    /// Allow unsafe, potentially lossy downgrades.
    UnsafeDowngrade,
}

impl FeatureUpgradeType {
    /// Encodes this upgrade type as its Kafka protocol code.
    fn as_protocol_value(self) -> i8 {
        match self {
            Self::Upgrade => 1,
            Self::SafeDowngrade => 2,
            Self::UnsafeDowngrade => 3,
        }
    }

    /// Whether this update type permits lowering the finalized level.
    fn allows_downgrade(self) -> bool {
        match self {
            Self::Upgrade => false,
            Self::SafeDowngrade | Self::UnsafeDowngrade => true,
        }
    }
}
793
#[derive(Debug, Clone, PartialEq, Eq)]
/// Summary of one group returned by [`KafkaAdmin::list_groups`].
pub struct ConsumerGroupListing {
    /// Consumer group ID.
    pub group_id: String,
    /// Protocol type reported by Kafka.
    pub protocol_type: String,
    /// Group state when provided by the broker.
    pub state: Option<String>,
    /// Group type when provided by newer broker versions, such as `classic` or `share`.
    pub group_type: Option<String>,
}
806
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
// Internal selector for which wire API to use when describing a group.
enum DescribeGroupApi {
    // The consumer-group describe API.
    Consumer,
    // The classic group describe API.
    Classic,
    // The share-group describe API.
    Share,
}
813
#[derive(Debug, Clone, PartialEq, Eq)]
/// Detailed group metadata returned by group describe APIs.
pub struct ConsumerGroupDescription {
    /// Consumer group ID.
    pub group_id: String,
    /// Group state reported by Kafka.
    pub state: String,
    /// Protocol type, such as `consumer`, `classic`, or `share`.
    pub protocol_type: String,
    /// Protocol-specific data, usually the assignor or protocol name.
    pub protocol_data: String,
    /// Members currently in the group.
    pub members: Vec<ConsumerGroupMemberDescription>,
    /// Authorized operations bitset when returned by the broker.
    pub authorized_operations: Option<i32>,
}
830
#[derive(Debug, Clone, PartialEq, Eq)]
/// Metadata for one member of a described consumer group.
pub struct ConsumerGroupMemberDescription {
    /// Kafka-assigned member ID.
    pub member_id: String,
    /// Static membership instance ID, when configured.
    pub group_instance_id: Option<String>,
    /// Client ID reported by the member.
    pub client_id: String,
    /// Host reported by the member.
    pub client_host: String,
    // NOTE(review): despite the `_bytes` suffix, the docs below describe
    // element counts rather than byte lengths — confirm the names are intended.
    /// Count derived from member metadata. For consumer/share groups this is
    /// the number of subscribed topic names; for classic groups it is decoded
    /// from the classic subscription payload when possible.
    pub member_metadata_bytes: usize,
    /// Count derived from member assignment metadata. For consumer/share groups
    /// this is the number of assigned partitions; for classic groups it is
    /// decoded from the classic assignment payload when possible.
    pub member_assignment_bytes: usize,
}
851
/// Backwards-compatible alias for [`ConsumerGroupListing`]; prefer the new name.
pub type GroupListing = ConsumerGroupListing;

/// Backwards-compatible alias for [`ConsumerGroupDescription`]; prefer the new name.
pub type GroupDescription = ConsumerGroupDescription;

/// Backwards-compatible alias for [`ConsumerGroupMemberDescription`]; prefer the new name.
pub type GroupMemberDescription = ConsumerGroupMemberDescription;
860
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
/// Kafka resource type used by config APIs.
pub enum ConfigResourceType {
    /// Unknown or unsupported resource type.
    Unknown,
    /// Topic-level configuration.
    Topic,
    /// Broker configuration.
    Broker,
    /// Broker logger configuration.
    BrokerLogger,
    /// Client metrics configuration.
    ClientMetrics,
    /// Consumer group configuration.
    Group,
}

impl ConfigResourceType {
    /// (variant, Kafka protocol code) pairs; the codes are non-contiguous,
    /// so a single table backs both conversion directions.
    const CODES: [(ConfigResourceType, i8); 6] = [
        (ConfigResourceType::Unknown, 0),
        (ConfigResourceType::Topic, 2),
        (ConfigResourceType::Broker, 4),
        (ConfigResourceType::BrokerLogger, 8),
        (ConfigResourceType::ClientMetrics, 16),
        (ConfigResourceType::Group, 32),
    ];

    /// Encodes this resource type as its Kafka protocol code.
    fn as_protocol_value(self) -> i8 {
        Self::CODES
            .iter()
            .find(|&&(variant, _)| variant == self)
            .map(|&(_, code)| code)
            .expect("every ConfigResourceType variant is listed in CODES")
    }

    /// Decodes a Kafka protocol code, mapping unrecognized values to `Unknown`.
    fn from_protocol_value(value: i8) -> Self {
        Self::CODES
            .iter()
            .find(|&&(_, code)| code == value)
            .map(|&(variant, _)| variant)
            .unwrap_or(ConfigResourceType::Unknown)
    }
}
901
902#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
903/// Kafka resource identified for config describe or alter operations.
904///
905/// Use [`ConfigResource::topic`] and [`ConfigResource::group`] for the common
906/// cases, or [`ConfigResource::new`] for broker and other resource types.
907pub struct ConfigResource {
908    /// Type of Kafka resource.
909    pub resource_type: ConfigResourceType,
910    /// Resource name, such as a topic name, group ID, or broker ID string.
911    pub resource_name: String,
912}
913
914impl ConfigResource {
915    /// Creates a config resource identifier.
916    pub fn new(resource_type: ConfigResourceType, resource_name: impl Into<String>) -> Self {
917        Self {
918            resource_type,
919            resource_name: resource_name.into(),
920        }
921    }
922
923    /// Creates a topic config resource.
924    pub fn topic(resource_name: impl Into<String>) -> Self {
925        Self::new(ConfigResourceType::Topic, resource_name)
926    }
927
928    /// Creates a group config resource.
929    pub fn group(resource_name: impl Into<String>) -> Self {
930        Self::new(ConfigResourceType::Group, resource_name)
931    }
932}
933
#[derive(Debug, Clone, PartialEq, Eq)]
/// One Kafka configuration entry returned by [`KafkaAdmin::describe_configs`].
pub struct ConfigEntry {
    /// Config key.
    pub name: String,
    /// Config value. Sensitive entries are usually returned as `None`.
    pub value: Option<String>,
    /// Whether Kafka reports the config as read-only.
    pub read_only: bool,
    /// Raw broker protocol config source value; this client passes it through
    /// without decoding it.
    pub config_source: i8,
    /// Whether the broker marks the config as sensitive.
    pub is_sensitive: bool,
    /// Raw broker protocol config type value when supported; `None` when the
    /// broker did not return one (presumably older protocol versions — verify).
    pub config_type: Option<i8>,
    /// Broker-supplied config documentation when available.
    pub documentation: Option<String>,
}
952
#[derive(Debug, Clone, PartialEq, Eq)]
/// Described configuration for one Kafka resource.
pub struct ConfigResourceConfig {
    /// Resource that was described.
    pub resource: ConfigResource,
    /// Config entries keyed by config name; `BTreeMap` keeps them in sorted
    /// key order for stable iteration.
    pub entries: BTreeMap<String, ConfigEntry>,
}
961
#[derive(Debug, Clone, PartialEq, Eq)]
/// Log directory usage returned by [`KafkaAdmin::describe_log_dirs`].
pub struct BrokerLogDirs {
    /// ID of the broker that returned these log directories.
    pub broker_id: i32,
    /// Log directory details reported by that broker.
    pub log_dirs: Vec<LogDirDescription>,
}
970
#[derive(Debug, Clone, PartialEq, Eq)]
/// One broker log directory.
pub struct LogDirDescription {
    /// Absolute log directory path.
    pub log_dir: String,
    /// Raw broker protocol error code for this log directory, if any; in the
    /// Kafka protocol a code of `0` means no error.
    pub error_code: i16,
    /// Total size of the filesystem containing this log dir when returned by Kafka.
    pub total_bytes: Option<i64>,
    /// Usable size of the filesystem containing this log dir when returned by Kafka.
    pub usable_bytes: Option<i64>,
    /// Replica logs hosted in this directory.
    pub replicas: Vec<ReplicaLogDirDescription>,
}
985
#[derive(Debug, Clone, PartialEq, Eq)]
/// Size information for one replica log in a broker log directory.
pub struct ReplicaLogDirDescription {
    /// Topic name.
    pub topic: String,
    /// Partition number within the topic.
    pub partition: i32,
    /// Log segment size in bytes.
    pub size_bytes: i64,
    /// Offset lag reported by Kafka for this replica.
    pub offset_lag: i64,
    /// Whether this is a future replica log, per Kafka's DescribeLogDirs
    /// semantics.
    pub is_future: bool,
}
1000
impl ConfigResourceConfig {
    /// Returns the config entry with the given name, or `None` if the
    /// describe response did not include that key.
    pub fn entry(&self, name: &str) -> Option<&ConfigEntry> {
        self.entries.get(name)
    }
}
1007
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Operation type for an incremental config change.
pub enum AlterConfigOpType {
    /// Set or replace the config value.
    Set,
    /// Delete the config and fall back to Kafka defaults.
    Delete,
    /// Append to a list-valued config.
    Append,
    /// Remove from a list-valued config.
    Subtract,
}

impl AlterConfigOpType {
    /// Maps this operation to the wire-protocol operation code.
    fn as_protocol_value(self) -> i8 {
        match self {
            AlterConfigOpType::Set => 0,
            AlterConfigOpType::Delete => 1,
            AlterConfigOpType::Append => 2,
            AlterConfigOpType::Subtract => 3,
        }
    }
}
1031
#[derive(Debug, Clone, PartialEq, Eq)]
/// One config mutation used with [`KafkaAdmin::incremental_alter_configs`].
pub struct AlterConfigOp {
    /// Config key to mutate.
    pub name: String,
    /// Operation to apply to the key.
    pub op_type: AlterConfigOpType,
    /// Config value for operations that require one; `None` for
    /// [`AlterConfigOpType::Delete`].
    pub value: Option<String>,
}
1042
1043impl AlterConfigOp {
1044    /// Creates a config set operation.
1045    pub fn set(name: impl Into<String>, value: impl Into<String>) -> Self {
1046        Self {
1047            name: name.into(),
1048            op_type: AlterConfigOpType::Set,
1049            value: Some(value.into()),
1050        }
1051    }
1052
1053    /// Creates a config delete operation.
1054    pub fn delete(name: impl Into<String>) -> Self {
1055        Self {
1056            name: name.into(),
1057            op_type: AlterConfigOpType::Delete,
1058            value: None,
1059        }
1060    }
1061}
1062
#[derive(Debug, Clone)]
/// Client for Kafka admin operations.
///
/// `KafkaAdmin` is an async client for cluster, topic, group, config, feature,
/// and SCRAM credential operations. Construct it either from an [`AdminConfig`]
/// when you need explicit configuration, or through [`KafkaClient::admin`] for
/// the builder-style API.
///
/// ```no_run
/// # async fn example() -> kafkit_client::Result<()> {
/// use kafkit_client::{AdminConfig, KafkaAdmin, NewTopic};
///
/// let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;
///
/// admin.create_topics([NewTopic::new("orders", 3, 1)]).await?;
/// let topics = admin.list_topics().await?;
/// # let _ = topics;
/// # Ok(())
/// # }
/// ```
///
/// [`KafkaClient::admin`]: crate::KafkaClient::admin
pub struct KafkaAdmin {
    /// Connection and request settings shared by every admin operation.
    config: AdminConfig,
}
1088
1089impl KafkaAdmin {
    #[instrument(
        name = "admin.connect",
        level = "debug",
        skip(config),
        fields(
            bootstrap_server_count = config.bootstrap_servers.len(),
            client_id = %config.client_id
        )
    )]
    /// Connects to Kafka and returns an admin client.
    ///
    /// This performs a lightweight bootstrap connection so configuration,
    /// security settings, and broker reachability fail before the client is
    /// returned.
    ///
    /// # Errors
    ///
    /// Returns an error when the warm-up connection to the bootstrap servers
    /// fails.
    pub async fn connect(config: AdminConfig) -> Result<Self> {
        let admin = Self { config };
        // Fail fast: surface config/security/connectivity problems here rather
        // than on the first admin operation.
        admin.warm_up().await?;
        debug!("admin client connected");
        Ok(admin)
    }
1110
1111    #[instrument(name = "admin.create_topics", level = "debug", skip(self, topics))]
1112    /// Creates one or more topics.
1113    ///
1114    /// Existing topics are treated as success, matching the common
1115    /// create-if-missing setup flow. Passing an empty iterator is a no-op.
1116    pub async fn create_topics<I>(&self, topics: I) -> Result<()>
1117    where
1118        I: IntoIterator<Item = NewTopic>,
1119    {
1120        let topics = topics
1121            .into_iter()
1122            .map(NewTopic::into_request_topic)
1123            .collect::<Result<Vec<_>>>()?;
1124        if topics.is_empty() {
1125            return Ok(());
1126        }
1127
1128        let request = CreateTopicsRequest::default()
1129            .with_topics(topics)
1130            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
1131            .with_validate_only(false);
1132        let response: CreateTopicsResponse = self
1133            .send_request::<CreateTopicsRequest>(CREATE_TOPICS_VERSION_CAP, &request)
1134            .await?;
1135
1136        for topic in response.topics {
1137            let name = topic.name.0.to_string();
1138            if let Some(error) = topic
1139                .error_code
1140                .err()
1141                .filter(|error| !is_ignorable_create_topic_error(*error))
1142            {
1143                return Err(admin_broker_error("create_topic", Some(name), error));
1144            }
1145        }
1146
1147        Ok(())
1148    }
1149
1150    #[instrument(name = "admin.delete_topics", level = "debug", skip(self, topics))]
1151    /// Deletes one or more topics by name.
1152    ///
1153    /// Passing an empty iterator is a no-op. Kafka must have topic deletion
1154    /// enabled for the broker to complete the operation.
1155    pub async fn delete_topics<I, S>(&self, topics: I) -> Result<()>
1156    where
1157        I: IntoIterator<Item = S>,
1158        S: Into<String>,
1159    {
1160        let topic_names = topics
1161            .into_iter()
1162            .map(|topic| validate_topic_name(topic.into()))
1163            .collect::<Result<Vec<_>>>()?;
1164        if topic_names.is_empty() {
1165            return Ok(());
1166        }
1167
1168        let request = DeleteTopicsRequest::default()
1169            .with_topic_names(
1170                topic_names
1171                    .iter()
1172                    .cloned()
1173                    .map(StrBytes::from_string)
1174                    .map(Into::into)
1175                    .collect(),
1176            )
1177            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?);
1178        let response: DeleteTopicsResponse = self
1179            .send_request::<DeleteTopicsRequest>(DELETE_TOPICS_VERSION_CAP, &request)
1180            .await?;
1181
1182        for topic in response.responses {
1183            let name = topic
1184                .name
1185                .as_ref()
1186                .map(|name| name.0.to_string())
1187                .unwrap_or_else(|| "<unknown>".to_owned());
1188            if let Some(error) = topic.error_code.err() {
1189                return Err(admin_broker_error("delete_topic", Some(name), error));
1190            }
1191        }
1192
1193        Ok(())
1194    }
1195
1196    #[instrument(
1197        name = "admin.create_partitions",
1198        level = "debug",
1199        skip(self, partitions)
1200    )]
1201    /// Increases partition counts for one or more topics.
1202    ///
1203    /// Each item is a `(topic_name, NewPartitions)` pair. Kafka only supports
1204    /// increasing the total partition count; it cannot shrink a topic.
1205    pub async fn create_partitions<I, S>(&self, partitions: I) -> Result<()>
1206    where
1207        I: IntoIterator<Item = (S, NewPartitions)>,
1208        S: Into<String>,
1209    {
1210        let topics = partitions
1211            .into_iter()
1212            .map(|(topic, new_partitions)| new_partitions.into_request_topic(topic.into()))
1213            .collect::<Result<Vec<_>>>()?;
1214        if topics.is_empty() {
1215            return Ok(());
1216        }
1217
1218        let request = CreatePartitionsRequest::default()
1219            .with_topics(topics)
1220            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
1221            .with_validate_only(false);
1222        let response: CreatePartitionsResponse = self
1223            .send_request::<CreatePartitionsRequest>(CREATE_PARTITIONS_VERSION_CAP, &request)
1224            .await?;
1225
1226        for topic in response.results {
1227            let name = topic.name.0.to_string();
1228            if let Some(error) = topic.error_code.err() {
1229                return Err(admin_broker_error("create_partitions", Some(name), error));
1230            }
1231        }
1232
1233        Ok(())
1234    }
1235
1236    #[instrument(name = "admin.list_topics", level = "debug", skip(self))]
1237    /// Lists topics known to the cluster.
1238    ///
1239    /// The returned list is sorted by topic name.
1240    pub async fn list_topics(&self) -> Result<Vec<TopicListing>> {
1241        let response = self.fetch_metadata(None).await?;
1242        let mut topics = Vec::new();
1243
1244        for topic in response.topics {
1245            let Some(name) = topic.name.as_ref().map(|name| name.0.to_string()) else {
1246                continue;
1247            };
1248            if let Some(error) = topic.error_code.err() {
1249                return Err(admin_broker_error("list_topics", Some(name), error));
1250            }
1251
1252            topics.push(TopicListing {
1253                name,
1254                topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
1255                is_internal: topic.is_internal,
1256            });
1257        }
1258
1259        topics.sort_by(|left, right| left.name.cmp(&right.name));
1260        Ok(topics)
1261    }
1262
    #[instrument(name = "admin.describe_topics", level = "debug", skip(self, topics))]
    /// Describes selected topics.
    ///
    /// The returned descriptions preserve the input topic order. Passing an
    /// empty iterator returns an empty vector.
    ///
    /// # Errors
    ///
    /// Fails if a topic name is invalid, the metadata request fails, the
    /// broker reports an error for a topic or partition, or the response is
    /// missing one of the requested topics.
    pub async fn describe_topics<I, S>(&self, topics: I) -> Result<Vec<TopicDescription>>
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        // Validate all names up front so no metadata request is sent for a bad batch.
        let requested_topics = topics
            .into_iter()
            .map(|topic| validate_topic_name(topic.into()))
            .collect::<Result<Vec<_>>>()?;
        if requested_topics.is_empty() {
            return Ok(Vec::new());
        }

        let response = self.fetch_metadata(Some(&requested_topics)).await?;
        // Index descriptions by name so the output can be re-ordered to match
        // the caller's input order at the end.
        let mut descriptions = BTreeMap::new();

        for topic in response.topics {
            let name = topic
                .name
                .as_ref()
                .map(|name| name.0.to_string())
                .unwrap_or_default();
            if let Some(error) = topic.error_code.err() {
                // The broker may omit the name on errors; keep the message readable.
                let label = if name.is_empty() { "<unknown>" } else { &name };
                return Err(admin_broker_error(
                    "describe_topic",
                    Some(label.to_owned()),
                    error,
                ));
            }

            // Any partition-level error fails the whole describe call.
            let mut partitions = topic
                .partitions
                .into_iter()
                .map(|partition| {
                    if let Some(error) = partition.error_code.err() {
                        return Err(admin_broker_error(
                            "describe_topic_partition",
                            Some(format!("{name}:{}", partition.partition_index)),
                            error,
                        ));
                    }

                    Ok(TopicPartitionDescription {
                        partition: partition.partition_index,
                        leader_id: partition.leader_id.0,
                        leader_epoch: partition.leader_epoch,
                        replica_nodes: partition.replica_nodes.into_iter().map(|id| id.0).collect(),
                        isr_nodes: partition.isr_nodes.into_iter().map(|id| id.0).collect(),
                        offline_replicas: partition
                            .offline_replicas
                            .into_iter()
                            .map(|id| id.0)
                            .collect(),
                    })
                })
                .collect::<Result<Vec<_>>>()?;
            partitions.sort_by_key(|partition| partition.partition);

            descriptions.insert(
                name.clone(),
                TopicDescription {
                    name,
                    // A nil UUID means the broker did not supply a topic ID.
                    topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
                    is_internal: topic.is_internal,
                    partitions,
                },
            );
        }

        // Re-emit in input order; a topic missing from the response is
        // reported as an error rather than silently dropped.
        requested_topics
            .into_iter()
            .map(|topic| {
                descriptions.remove(&topic).ok_or_else(|| {
                    anyhow!("metadata response did not include topic '{topic}'").into()
                })
            })
            .collect()
    }
1347
1348    #[instrument(name = "admin.describe_cluster", level = "debug", skip(self))]
1349    /// Describes cluster ID, controller, and broker endpoints.
1350    pub async fn describe_cluster(&self) -> Result<ClusterDescription> {
1351        let (mut connection, version) = self
1352            .connect_with_version::<DescribeClusterRequest>(DESCRIBE_CLUSTER_VERSION_CAP)
1353            .await?;
1354        let mut request =
1355            DescribeClusterRequest::default().with_include_cluster_authorized_operations(false);
1356        if version >= 1 {
1357            request = request.with_endpoint_type(1);
1358        }
1359        if version >= 2 {
1360            request = request.with_include_fenced_brokers(true);
1361        }
1362
1363        let response: DescribeClusterResponse = connection
1364            .send_request::<DescribeClusterRequest>(&self.config.client_id, version, &request)
1365            .await?;
1366        if let Some(error) = response.error_code.err() {
1367            return Err(admin_broker_error(
1368                "describe_cluster",
1369                None::<String>,
1370                error,
1371            ));
1372        }
1373
1374        let mut brokers = response
1375            .brokers
1376            .into_iter()
1377            .map(|broker| BrokerDescription {
1378                broker_id: broker.broker_id.0,
1379                host: broker.host.to_string(),
1380                port: broker.port,
1381                rack: broker.rack.map(|rack| rack.to_string()),
1382                is_fenced: broker.is_fenced,
1383            })
1384            .collect::<Vec<_>>();
1385        brokers.sort_by_key(|broker| broker.broker_id);
1386
1387        Ok(ClusterDescription {
1388            cluster_id: response.cluster_id.to_string(),
1389            controller_id: response.controller_id.0,
1390            brokers,
1391        })
1392    }
1393
    #[instrument(name = "admin.describe_log_dirs", level = "debug", skip(self, brokers))]
    /// Describes broker log directories and replica log sizes.
    ///
    /// Kafka serves this API from each broker. Pass broker metadata returned by
    /// [`KafkaAdmin::describe_cluster`]. When `topics` is `None`, Kafka returns
    /// all topic-partitions hosted by each broker.
    ///
    /// Results are sorted by broker ID. If any broker query fails, the first
    /// failure is returned after all in-flight queries have completed.
    pub async fn describe_log_dirs<I>(
        &self,
        brokers: I,
        topics: Option<&[TopicPartition]>,
    ) -> Result<Vec<BrokerLogDirs>>
    where
        I: IntoIterator<Item = BrokerDescription>,
    {
        let request_topics = topics.map(describe_log_dirs_topics);
        // Fan out one task per broker, since each broker only reports its own
        // log directories.
        let mut handles = Vec::new();
        for broker in brokers {
            let config = self.config.clone();
            let request_topics = request_topics.clone();
            let broker_id = broker.broker_id;
            handles.push((
                broker_id,
                tokio::spawn(async move {
                    describe_log_dirs_for_broker(config, broker, request_topics).await
                }),
            ));
        }

        // Await every handle — even after a failure — so no spawned task is
        // abandoned; remember only the first error encountered.
        let mut results = Vec::with_capacity(handles.len());
        let mut first_error = None;
        for (broker_id, handle) in handles {
            match handle.await {
                Ok(Ok(broker_result)) => results.push(broker_result),
                Ok(Err(error)) => {
                    first_error.get_or_insert(error);
                }
                Err(error) => {
                    // The task itself failed (e.g. panicked) before producing a result.
                    first_error.get_or_insert(
                        anyhow!("describe log dirs for broker {broker_id} task failed: {error}")
                            .into(),
                    );
                }
            }
        }
        if let Some(error) = first_error {
            return Err(error);
        }
        results.sort_by_key(|broker| broker.broker_id);
        Ok(results)
    }
1444
1445    #[instrument(name = "admin.list_groups", level = "debug", skip(self))]
1446    /// Lists consumer groups known to the cluster.
1447    ///
1448    /// The returned list is sorted by group ID.
1449    pub async fn list_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
1450        let response: ListGroupsResponse = self
1451            .send_request::<ListGroupsRequest>(
1452                LIST_GROUPS_VERSION_CAP,
1453                &ListGroupsRequest::default(),
1454            )
1455            .await?;
1456        if let Some(error) = response.error_code.err() {
1457            return Err(admin_broker_error("list_groups", None::<String>, error));
1458        }
1459
1460        let mut groups = response
1461            .groups
1462            .into_iter()
1463            .map(|group| ConsumerGroupListing {
1464                group_id: group.group_id.to_string(),
1465                protocol_type: group.protocol_type.to_string(),
1466                state: (!group.group_state.is_empty()).then(|| group.group_state.to_string()),
1467                group_type: (!group.group_type.is_empty()).then(|| group.group_type.to_string()),
1468            })
1469            .collect::<Vec<_>>();
1470        groups.sort_by(|left, right| left.group_id.cmp(&right.group_id));
1471        Ok(groups)
1472    }
1473
    #[instrument(name = "admin.list_consumer_groups", level = "debug", skip(self))]
    /// Alias for [`KafkaAdmin::list_groups`].
    ///
    /// Provided for API familiarity; it delegates directly with no extra
    /// filtering or processing.
    pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
        self.list_groups().await
    }
1479
    #[instrument(name = "admin.describe_groups", level = "debug", skip(self, groups))]
    /// Describes groups, routing each group to the appropriate Kafka describe API.
    ///
    /// Newer brokers report group type in `ListGroups`; this method uses that
    /// metadata to describe consumer, share, and classic groups. The returned
    /// descriptions preserve the input group order.
    ///
    /// # Errors
    ///
    /// Fails if a group ID is invalid, any underlying list/describe request
    /// fails, or a requested group is missing from the combined responses.
    pub async fn describe_groups<I, S>(&self, groups: I) -> Result<Vec<GroupDescription>>
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let group_ids = groups
            .into_iter()
            .map(|group| validate_group_id(group.into()))
            .collect::<Result<Vec<_>>>()?;
        if group_ids.is_empty() {
            return Ok(Vec::new());
        }

        // ListGroups supplies the (optional) group type used to pick the
        // describe API for each requested group.
        let listed_group_types = self
            .list_groups()
            .await?
            .into_iter()
            .map(|group| (group.group_id, group.group_type))
            .collect::<BTreeMap<_, _>>();
        // Bucket each requested group by the API that can describe it.
        let mut consumer_groups = Vec::new();
        let mut share_groups = Vec::new();
        let mut classic_groups = Vec::new();
        for group_id in &group_ids {
            match describe_group_api_for_listed_type(
                // Groups absent from the listing have no type; the router
                // decides the fallback API.
                listed_group_types
                    .get(group_id)
                    .and_then(|group_type| group_type.as_deref()),
            ) {
                DescribeGroupApi::Share => {
                    share_groups.push(group_id.clone());
                }
                DescribeGroupApi::Classic => {
                    classic_groups.push(group_id.clone());
                }
                DescribeGroupApi::Consumer => {
                    consumer_groups.push(group_id.clone());
                }
            }
        }

        // Describe each bucket with its own API, then merge the results keyed
        // by group ID.
        let mut descriptions = BTreeMap::new();
        for group in self.describe_consumer_groups(consumer_groups).await? {
            descriptions.insert(group.group_id.clone(), group);
        }
        for group in self.describe_share_groups(share_groups).await? {
            descriptions.insert(group.group_id.clone(), group);
        }
        for group in self.describe_classic_groups(classic_groups).await? {
            descriptions.insert(group.group_id.clone(), group);
        }

        // Re-emit in the caller's input order; a missing group is an error.
        group_ids
            .into_iter()
            .map(|group_id| {
                descriptions.remove(&group_id).ok_or_else(|| {
                    anyhow!("describe groups response did not include group '{group_id}'").into()
                })
            })
            .collect()
    }
1546
    #[instrument(
        name = "admin.describe_consumer_groups",
        level = "debug",
        skip(self, groups)
    )]
    /// Describes KIP-848 consumer groups.
    ///
    /// Use [`KafkaAdmin::describe_groups`] when the group type is not known.
    /// The returned descriptions preserve the input group order.
    pub async fn describe_consumer_groups<I, S>(
        &self,
        groups: I,
    ) -> Result<Vec<ConsumerGroupDescription>>
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let group_ids = groups
            .into_iter()
            .map(|group| validate_group_id(group.into()))
            .collect::<Result<Vec<_>>>()?;
        if group_ids.is_empty() {
            return Ok(Vec::new());
        }

        let request = ConsumerGroupDescribeRequest::default()
            .with_group_ids(
                group_ids
                    .iter()
                    .cloned()
                    .map(StrBytes::from_string)
                    .map(Into::into)
                    .collect(),
            )
            .with_include_authorized_operations(false);
        let response: ConsumerGroupDescribeResponse = self
            .send_request::<ConsumerGroupDescribeRequest>(
                CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
                &request,
            )
            .await?;

        // Index by group ID so output can be re-ordered to match the input.
        let mut descriptions = BTreeMap::new();
        for group in response.groups {
            let group_id = group.group_id.to_string();
            if let Some(error) = group.error_code.err() {
                return Err(admin_broker_error(
                    "describe_consumer_group",
                    Some(group_id),
                    error,
                ));
            }

            descriptions.insert(
                group_id.clone(),
                ConsumerGroupDescription {
                    group_id,
                    state: group.group_state.to_string(),
                    // This response carries no protocol type field, so
                    // "consumer" is hard-coded here.
                    protocol_type: "consumer".to_owned(),
                    protocol_data: group.assignor_name.to_string(),
                    members: group
                        .members
                        .into_iter()
                        .map(|member| ConsumerGroupMemberDescription {
                            member_id: member.member_id.to_string(),
                            group_instance_id: member
                                .instance_id
                                .map(|instance_id| instance_id.to_string()),
                            client_id: member.client_id.to_string(),
                            client_host: member.client_host.to_string(),
                            // NOTE(review): despite the "bytes" suffix, this
                            // stores the subscribed-topic count here — confirm
                            // the field's intended meaning.
                            member_metadata_bytes: member.subscribed_topic_names.len(),
                            member_assignment_bytes: assignment_partition_count(&member.assignment),
                        })
                        .collect(),
                    // i32::MIN marks authorized operations as absent.
                    authorized_operations: (group.authorized_operations != i32::MIN)
                        .then_some(group.authorized_operations),
                },
            );
        }

        // Re-emit in the caller's input order; a missing group is an error.
        group_ids
            .into_iter()
            .map(|group_id| {
                descriptions.remove(&group_id).ok_or_else(|| {
                    anyhow!("describe groups response did not include group '{group_id}'").into()
                })
            })
            .collect()
    }
1635
    #[instrument(
        name = "admin.describe_classic_groups",
        level = "debug",
        skip(self, groups)
    )]
    /// Describes classic consumer groups with Kafka's legacy describe-groups API.
    ///
    /// Use [`KafkaAdmin::describe_groups`] when the group type is not known.
    /// The returned descriptions preserve the input group order.
    pub async fn describe_classic_groups<I, S>(
        &self,
        groups: I,
    ) -> Result<Vec<ConsumerGroupDescription>>
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        let group_ids = groups
            .into_iter()
            .map(|group| validate_group_id(group.into()))
            .collect::<Result<Vec<_>>>()?;
        if group_ids.is_empty() {
            return Ok(Vec::new());
        }

        let request = DescribeGroupsRequest::default()
            .with_groups(
                group_ids
                    .iter()
                    .cloned()
                    .map(StrBytes::from_string)
                    .map(Into::into)
                    .collect(),
            )
            .with_include_authorized_operations(false);
        let response: DescribeGroupsResponse = self
            .send_request::<DescribeGroupsRequest>(DESCRIBE_GROUPS_VERSION_CAP, &request)
            .await?;

        // Index by group ID so output can be re-ordered to match the input.
        let mut descriptions = BTreeMap::new();
        for group in response.groups {
            let group_id = group.group_id.to_string();
            if let Some(error) = group.error_code.err() {
                return Err(admin_broker_error(
                    "describe_classic_group",
                    Some(group_id),
                    error,
                ));
            }

            descriptions.insert(
                group_id.clone(),
                ConsumerGroupDescription {
                    group_id,
                    state: group.group_state.to_string(),
                    protocol_type: group.protocol_type.to_string(),
                    protocol_data: group.protocol_data.to_string(),
                    members: group
                        .members
                        .into_iter()
                        .map(|member| {
                            // Decode the classic protocol blobs into counts
                            // (per the helper names: topics subscribed and
                            // partitions assigned).
                            let member_metadata_bytes =
                                classic_subscription_topic_count(&member.member_metadata);
                            let member_assignment_bytes =
                                classic_assignment_partition_count(&member.member_assignment);
                            ConsumerGroupMemberDescription {
                                member_id: member.member_id.to_string(),
                                group_instance_id: member
                                    .group_instance_id
                                    .map(|instance_id| instance_id.to_string()),
                                client_id: member.client_id.to_string(),
                                client_host: member.client_host.to_string(),
                                member_metadata_bytes,
                                member_assignment_bytes,
                            }
                        })
                        .collect(),
                    // i32::MIN marks authorized operations as absent.
                    authorized_operations: (group.authorized_operations != i32::MIN)
                        .then_some(group.authorized_operations),
                },
            );
        }

        // Re-emit in the caller's input order; a missing group is an error.
        group_ids
            .into_iter()
            .map(|group_id| {
                descriptions.remove(&group_id).ok_or_else(|| {
                    anyhow!("describe classic groups response did not include group '{group_id}'")
                        .into()
                })
            })
            .collect()
    }
1728
    #[instrument(
        name = "admin.describe_share_groups",
        level = "debug",
        skip(self, groups)
    )]
    /// Describes Kafka share groups.
    ///
    /// Returns one description per requested group, in the same order the
    /// group IDs were supplied; an empty iterator yields an empty `Vec`
    /// without any network traffic.
    ///
    /// Use [`KafkaAdmin::describe_groups`] when the group type is not known.
    pub async fn describe_share_groups<I, S>(
        &self,
        groups: I,
    ) -> Result<Vec<ConsumerGroupDescription>>
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        // Validate every requested ID up front so a bad ID fails before the
        // request is sent.
        let group_ids = groups
            .into_iter()
            .map(|group| validate_group_id(group.into()))
            .collect::<Result<Vec<_>>>()?;
        if group_ids.is_empty() {
            return Ok(Vec::new());
        }

        let request = ShareGroupDescribeRequest::default()
            .with_group_ids(
                group_ids
                    .iter()
                    .cloned()
                    .map(StrBytes::from_string)
                    .map(Into::into)
                    .collect(),
            )
            .with_include_authorized_operations(false);
        let response: ShareGroupDescribeResponse = self
            .send_request::<ShareGroupDescribeRequest>(SHARE_GROUP_DESCRIBE_VERSION_CAP, &request)
            .await?;

        // Index descriptions by group ID so results can be re-emitted in the
        // caller's requested order below.
        let mut descriptions = BTreeMap::new();
        for group in response.groups {
            let group_id = group.group_id.to_string();
            // Any per-group error fails the whole call rather than producing
            // a partial result set.
            if let Some(error) = group.error_code.err() {
                return Err(admin_broker_error(
                    "describe_share_group",
                    Some(group_id),
                    error,
                ));
            }

            descriptions.insert(
                group_id.clone(),
                ConsumerGroupDescription {
                    group_id,
                    state: group.group_state.to_string(),
                    // Share groups carry no classic protocol type; label them
                    // uniformly so callers can tell them apart.
                    protocol_type: "share".to_owned(),
                    protocol_data: group.assignor_name.to_string(),
                    members: group
                        .members
                        .into_iter()
                        .map(|member| ConsumerGroupMemberDescription {
                            member_id: member.member_id.to_string(),
                            // Share-group members have no static instance ID.
                            group_instance_id: None,
                            client_id: member.client_id.to_string(),
                            client_host: member.client_host.to_string(),
                            // NOTE(review): despite the `_bytes` field names,
                            // these hold counts (subscribed topics /
                            // assigned partitions), mirroring the classic
                            // path which also stores counts here.
                            member_metadata_bytes: member.subscribed_topic_names.len(),
                            member_assignment_bytes: share_assignment_partition_count(
                                &member.assignment,
                            ),
                        })
                        .collect(),
                    // i32::MIN appears to be the "not populated" sentinel for
                    // authorized operations — map it to None.
                    authorized_operations: (group.authorized_operations != i32::MIN)
                        .then_some(group.authorized_operations),
                },
            );
        }

        // Re-emit in request order; a group missing from the response is
        // surfaced as an error rather than silently dropped.
        group_ids
            .into_iter()
            .map(|group_id| {
                descriptions.remove(&group_id).ok_or_else(|| {
                    anyhow!("describe share groups response did not include group '{group_id}'")
                        .into()
                })
            })
            .collect()
    }
1815
1816    #[instrument(name = "admin.delete_groups", level = "debug", skip(self, groups))]
1817    /// Deletes one or more groups by ID.
1818    ///
1819    /// Passing an empty iterator is a no-op.
1820    pub async fn delete_groups<I, S>(&self, groups: I) -> Result<()>
1821    where
1822        I: IntoIterator<Item = S>,
1823        S: Into<String>,
1824    {
1825        let group_ids = groups
1826            .into_iter()
1827            .map(|group| validate_group_id(group.into()))
1828            .collect::<Result<Vec<_>>>()?;
1829        if group_ids.is_empty() {
1830            return Ok(());
1831        }
1832
1833        let request = DeleteGroupsRequest::default().with_groups_names(
1834            group_ids
1835                .iter()
1836                .cloned()
1837                .map(StrBytes::from_string)
1838                .map(Into::into)
1839                .collect(),
1840        );
1841        let response: DeleteGroupsResponse = self
1842            .send_request::<DeleteGroupsRequest>(DELETE_GROUPS_VERSION_CAP, &request)
1843            .await?;
1844
1845        for result in response.results {
1846            if let Some(error) = result.error_code.err() {
1847                return Err(admin_broker_error(
1848                    "delete_group",
1849                    Some(result.group_id.to_string()),
1850                    error,
1851                ));
1852            }
1853        }
1854        Ok(())
1855    }
1856
    #[instrument(
        name = "admin.delete_consumer_groups",
        level = "debug",
        skip(self, groups)
    )]
    /// Alias for [`KafkaAdmin::delete_groups`].
    ///
    /// Provided to mirror Kafka's `Admin#deleteConsumerGroups` naming;
    /// behavior is identical to [`KafkaAdmin::delete_groups`].
    pub async fn delete_consumer_groups<I, S>(&self, groups: I) -> Result<()>
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.delete_groups(groups).await
    }
1870
    #[instrument(name = "admin.describe_acls", level = "debug", skip(self, filter))]
    /// Describes ACL bindings matching a filter.
    ///
    /// The shape mirrors Kafka's `Admin#describeAcls(AclBindingFilter)`.
    ///
    /// Bindings are sorted by resource name, then principal, then host, so
    /// the result is deterministic regardless of broker ordering.
    pub async fn describe_acls(&self, filter: AclBindingFilter) -> Result<Vec<AclBinding>> {
        let request = filter.to_describe_request();
        let response: DescribeAclsResponse = self
            .send_request::<DescribeAclsRequest>(DESCRIBE_ACLS_VERSION_CAP, &request)
            .await?;
        // DescribeAcls reports a single top-level error, not per-resource.
        if let Some(error) = response.error_code.err() {
            return Err(admin_broker_error("describe_acls", None::<String>, error));
        }

        // The response groups ACL entries under their resource pattern;
        // flatten to one binding per (pattern, entry) pair.
        let mut bindings = Vec::new();
        for resource in response.resources {
            let pattern = ResourcePattern::new(
                ResourceType::from_protocol_value(resource.resource_type),
                resource.resource_name.to_string(),
                PatternType::from_protocol_value(resource.pattern_type),
            );
            for acl in resource.acls {
                bindings.push(AclBinding::new(
                    // Each entry under this resource shares the same pattern.
                    pattern.clone(),
                    AccessControlEntry::new(
                        acl.principal.to_string(),
                        acl.host.to_string(),
                        AclOperation::from_protocol_value(acl.operation),
                        AclPermissionType::from_protocol_value(acl.permission_type),
                    ),
                ));
            }
        }
        // Deterministic ordering for callers and tests.
        bindings.sort_by(|left, right| {
            left.pattern
                .name
                .cmp(&right.pattern.name)
                .then_with(|| left.entry.principal.cmp(&right.entry.principal))
                .then_with(|| left.entry.host.cmp(&right.entry.host))
        });
        Ok(bindings)
    }
1912
1913    #[instrument(name = "admin.create_acls", level = "debug", skip(self, acls))]
1914    /// Creates ACL bindings.
1915    ///
1916    /// The shape mirrors Kafka's `Admin#createAcls(Collection<AclBinding>)`.
1917    pub async fn create_acls<I>(&self, acls: I) -> Result<()>
1918    where
1919        I: IntoIterator<Item = AclBinding>,
1920    {
1921        let creations = acls
1922            .into_iter()
1923            .map(AclBinding::into_creation)
1924            .collect::<Result<Vec<_>>>()?;
1925        if creations.is_empty() {
1926            return Ok(());
1927        }
1928
1929        let request = CreateAclsRequest::default().with_creations(creations);
1930        let response: CreateAclsResponse = self
1931            .send_request::<CreateAclsRequest>(CREATE_ACLS_VERSION_CAP, &request)
1932            .await?;
1933        for result in response.results {
1934            if let Some(error) = result.error_code.err() {
1935                return Err(admin_broker_error("create_acl", None::<String>, error));
1936            }
1937        }
1938        Ok(())
1939    }
1940
    #[instrument(name = "admin.delete_acls", level = "debug", skip(self, filters))]
    /// Deletes ACL bindings matching each filter.
    ///
    /// The shape mirrors Kafka's `Admin#deleteAcls(Collection<AclBindingFilter>)`.
    ///
    /// Returns one [`DeleteAclsResult`] per filter listing the bindings that
    /// filter matched and removed.
    pub async fn delete_acls<I>(&self, filters: I) -> Result<Vec<DeleteAclsResult>>
    where
        I: IntoIterator<Item = AclBindingFilter>,
    {
        let filters = filters
            .into_iter()
            .map(AclBindingFilter::into_delete_filter)
            .collect::<Vec<_>>();
        // An empty filter set is a no-op.
        if filters.is_empty() {
            return Ok(Vec::new());
        }

        let request = DeleteAclsRequest::default().with_filters(filters);
        let response: DeleteAclsResponse = self
            .send_request::<DeleteAclsRequest>(DELETE_ACLS_VERSION_CAP, &request)
            .await?;
        let mut results = Vec::new();
        // NOTE(review): results are paired with filters positionally — this
        // relies on the broker echoing filter results in request order.
        for result in response.filter_results {
            // A per-filter error fails the whole call.
            if let Some(error) = result.error_code.err() {
                return Err(admin_broker_error("delete_acls", None::<String>, error));
            }
            let mut matching_acls = Vec::new();
            for acl in result.matching_acls {
                // Individual matched ACLs can also carry errors (e.g. a
                // binding that could not be removed); fail on the first.
                if let Some(error) = acl.error_code.err() {
                    return Err(admin_broker_error(
                        "delete_acl",
                        Some(acl.resource_name.to_string()),
                        error,
                    ));
                }
                matching_acls.push(AclBinding::new(
                    ResourcePattern::new(
                        ResourceType::from_protocol_value(acl.resource_type),
                        acl.resource_name.to_string(),
                        PatternType::from_protocol_value(acl.pattern_type),
                    ),
                    AccessControlEntry::new(
                        acl.principal.to_string(),
                        acl.host.to_string(),
                        AclOperation::from_protocol_value(acl.operation),
                        AclPermissionType::from_protocol_value(acl.permission_type),
                    ),
                ));
            }
            results.push(DeleteAclsResult { matching_acls });
        }
        Ok(results)
    }
1993
    #[instrument(
        name = "admin.describe_configs",
        level = "debug",
        skip(self, resources)
    )]
    /// Describes configuration entries for Kafka resources.
    ///
    /// Results are returned in the same order the resources were requested.
    ///
    /// ```no_run
    /// # async fn example(admin: kafkit_client::KafkaAdmin) -> kafkit_client::Result<()> {
    /// use kafkit_client::ConfigResource;
    ///
    /// let configs = admin
    ///     .describe_configs([ConfigResource::topic("orders")])
    ///     .await?;
    ///
    /// if let Some(retention) = configs[0].entry("retention.ms") {
    ///     println!("retention: {:?}", retention.value);
    /// }
    /// # Ok(())
    /// # }
    /// ```
    pub async fn describe_configs<I>(&self, resources: I) -> Result<Vec<ConfigResourceConfig>>
    where
        I: IntoIterator<Item = ConfigResource>,
    {
        let resources = resources.into_iter().collect::<Vec<_>>();
        // An empty request is a no-op.
        if resources.is_empty() {
            return Ok(Vec::new());
        }

        let request = DescribeConfigsRequest::default()
            .with_resources(
                resources
                    .iter()
                    .map(|resource| {
                        DescribeConfigsResource::default()
                            .with_resource_type(resource.resource_type.as_protocol_value())
                            .with_resource_name(StrBytes::from_string(
                                resource.resource_name.clone(),
                            ))
                            // `None` requests every config entry for the resource.
                            .with_configuration_keys(None)
                    })
                    .collect(),
            )
            .with_include_synonyms(false);
        let response: DescribeConfigsResponse = self
            .send_request::<DescribeConfigsRequest>(DESCRIBE_CONFIGS_VERSION_CAP, &request)
            .await?;

        // Index results by (type, name) so they can be re-emitted in the
        // caller's requested order below.
        let mut described = BTreeMap::new();
        for resource in response.results {
            let resource_type = ConfigResourceType::from_protocol_value(resource.resource_type);
            let resource_name = resource.resource_name.to_string();
            // A per-resource error fails the whole call.
            if let Some(error) = resource.error_code.err() {
                return Err(admin_broker_error(
                    "describe_configs",
                    Some(format!("{resource_type:?}:{resource_name}")),
                    error,
                ));
            }

            let entries = resource
                .configs
                .into_iter()
                .map(|entry| {
                    let name = entry.name.to_string();
                    let config_entry = ConfigEntry {
                        name: name.clone(),
                        value: entry.value.map(|value| value.to_string()),
                        read_only: entry.read_only,
                        config_source: entry.config_source,
                        is_sensitive: entry.is_sensitive,
                        // Negative config types are treated as unsupported and
                        // mapped to None.
                        config_type: (response_supported_config_type(entry.config_type))
                            .then_some(entry.config_type),
                        documentation: entry.documentation.map(|doc| doc.to_string()),
                    };
                    (name, config_entry)
                })
                .collect();

            described.insert(
                (resource_type, resource_name.clone()),
                ConfigResourceConfig {
                    resource: ConfigResource::new(resource_type, resource_name),
                    entries,
                },
            );
        }

        // Re-emit in request order; a resource missing from the response is
        // surfaced as an error.
        resources
            .into_iter()
            .map(|resource| {
                described
                    .remove(&(resource.resource_type, resource.resource_name.clone()))
                    .ok_or_else(|| {
                        anyhow!("describe configs response did not include {:?}", resource).into()
                    })
            })
            .collect()
    }
2094
    #[instrument(
        name = "admin.incremental_alter_configs",
        level = "debug",
        skip(self, resources)
    )]
    /// Applies incremental config changes to Kafka resources.
    ///
    /// ```no_run
    /// # async fn example(admin: kafkit_client::KafkaAdmin) -> kafkit_client::Result<()> {
    /// use kafkit_client::{AlterConfigOp, ConfigResource};
    ///
    /// admin
    ///     .incremental_alter_configs([(
    ///         ConfigResource::topic("orders"),
    ///         vec![AlterConfigOp::set("retention.ms", "604800000")],
    ///     )])
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn incremental_alter_configs<I>(&self, resources: I) -> Result<()>
    where
        I: IntoIterator<Item = (ConfigResource, Vec<AlterConfigOp>)>,
    {
        // Map each (resource, ops) pair onto the wire representation.
        let resources = resources
            .into_iter()
            .map(|(resource, ops)| {
                AlterConfigsResource::default()
                    .with_resource_type(resource.resource_type.as_protocol_value())
                    .with_resource_name(StrBytes::from_string(resource.resource_name))
                    .with_configs(
                        ops.into_iter()
                            .map(|op| {
                                AlterableConfig::default()
                                    .with_name(StrBytes::from_string(op.name))
                                    .with_config_operation(op.op_type.as_protocol_value())
                                    // None value means "delete" per-op semantics
                                    // are carried by `config_operation`.
                                    .with_value(op.value.map(StrBytes::from_string))
                            })
                            .collect(),
                    )
            })
            .collect::<Vec<_>>();
        // An empty resource set is a no-op.
        if resources.is_empty() {
            return Ok(());
        }

        // validate_only = false: the changes are applied, not dry-run.
        let request = IncrementalAlterConfigsRequest::default()
            .with_resources(resources)
            .with_validate_only(false);
        let response: IncrementalAlterConfigsResponse = self
            .send_request::<IncrementalAlterConfigsRequest>(
                INCREMENTAL_ALTER_CONFIGS_VERSION_CAP,
                &request,
            )
            .await?;

        // Any per-resource failure fails the whole call.
        for resource in response.responses {
            if let Some(error) = resource.error_code.err() {
                let resource_type = ConfigResourceType::from_protocol_value(resource.resource_type);
                return Err(admin_broker_error(
                    "incremental_alter_configs",
                    Some(format!("{resource_type:?}:{}", resource.resource_name)),
                    error,
                ));
            }
        }

        Ok(())
    }
2164
2165    #[instrument(
2166        name = "admin.upsert_scram_credential",
2167        level = "debug",
2168        skip(self, user, password)
2169    )]
2170    /// Creates or replaces a SCRAM credential for a user.
2171    ///
2172    /// `mechanism` must be [`SaslMechanism::ScramSha256`] or
2173    /// [`SaslMechanism::ScramSha512`]. Use
2174    /// [`KafkaAdmin::upsert_scram_credential_with_iterations`] to override the
2175    /// default iteration count.
2176    pub async fn upsert_scram_credential(
2177        &self,
2178        user: impl Into<String>,
2179        mechanism: SaslMechanism,
2180        password: impl AsRef<[u8]>,
2181    ) -> Result<()> {
2182        self.upsert_scram_credential_with_iterations(
2183            user,
2184            mechanism,
2185            password,
2186            scram::MIN_ITERATIONS,
2187        )
2188        .await
2189    }
2190
2191    #[instrument(
2192        name = "admin.upsert_scram_credential_with_iterations",
2193        level = "debug",
2194        skip(self, user, password)
2195    )]
2196    /// Creates or replaces a SCRAM credential with an explicit iteration count.
2197    pub async fn upsert_scram_credential_with_iterations(
2198        &self,
2199        user: impl Into<String>,
2200        mechanism: SaslMechanism,
2201        password: impl AsRef<[u8]>,
2202        iterations: i32,
2203    ) -> Result<()> {
2204        let user = user.into();
2205        let mechanism_type = mechanism
2206            .scram_type()
2207            .ok_or_else(|| anyhow!("SCRAM credential upsertion requires a SCRAM mechanism"))?;
2208        let salt = scram::secure_random_bytes()?;
2209        let salted_password =
2210            scram::salted_password(mechanism, password.as_ref(), &salt, iterations)?;
2211        let request = AlterUserScramCredentialsRequest::default().with_upsertions(vec![
2212            ScramCredentialUpsertion::default()
2213                .with_name(StrBytes::from_string(user.clone()))
2214                .with_mechanism(mechanism_type)
2215                .with_iterations(iterations)
2216                .with_salt(Bytes::from(salt))
2217                .with_salted_password(Bytes::from(salted_password)),
2218        ]);
2219        let response: AlterUserScramCredentialsResponse = self
2220            .send_request::<AlterUserScramCredentialsRequest>(
2221                ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP,
2222                &request,
2223            )
2224            .await?;
2225
2226        for result in response.results {
2227            if let Some(error) = result.error_code.err() {
2228                return Err(admin_broker_error(
2229                    "alter_scram_credential",
2230                    Some(result.user.to_string()),
2231                    error,
2232                ));
2233            }
2234        }
2235
2236        Ok(())
2237    }
2238
    /// Returns the configuration used by this admin client.
    ///
    /// The reference borrows from `self`; clone the [`AdminConfig`] if an
    /// owned copy is needed.
    pub fn config(&self) -> &AdminConfig {
        &self.config
    }
2243
2244    /// Returns finalized broker feature levels visible from a bootstrap broker.
2245    pub async fn finalized_feature_levels(&self) -> Result<Vec<BrokerFeatureLevel>> {
2246        let connection = connect_to_any_bootstrap(
2247            &self.config.bootstrap_servers,
2248            &self.config.client_id,
2249            self.config.request_timeout,
2250            self.config.security_protocol,
2251            &self.config.tls,
2252            &self.config.sasl,
2253            &self.config.tcp_connector,
2254        )
2255        .await?;
2256        Ok(connection
2257            .finalized_feature_levels()
2258            .into_iter()
2259            .map(|(name, level)| BrokerFeatureLevel { name, level })
2260            .collect())
2261    }
2262
    /// Updates finalized broker feature levels.
    ///
    /// Feature updates are cluster-level operations. Consider calling
    /// [`KafkaAdmin::validate_feature_updates`] first when supported by the
    /// broker.
    pub async fn update_features<I>(&self, updates: I) -> Result<()>
    where
        I: IntoIterator<Item = FeatureUpdate>,
    {
        // validate_only = false: apply the updates for real.
        self.update_features_inner(updates, false).await
    }
2274
    /// Validates finalized broker feature updates without applying them.
    ///
    /// Requires a broker that negotiates UpdateFeatures v1 or newer; older
    /// brokers cannot perform a dry run and the call fails.
    pub async fn validate_feature_updates<I>(&self, updates: I) -> Result<()>
    where
        I: IntoIterator<Item = FeatureUpdate>,
    {
        // validate_only = true: dry-run only.
        self.update_features_inner(updates, true).await
    }
2282
2283    async fn update_features_inner<I>(&self, updates: I, validate_only: bool) -> Result<()>
2284    where
2285        I: IntoIterator<Item = FeatureUpdate>,
2286    {
2287        let (mut connection, version) = self
2288            .connect_with_version::<UpdateFeaturesRequest>(UPDATE_FEATURES_VERSION_CAP)
2289            .await?;
2290        let feature_updates = updates
2291            .into_iter()
2292            .map(|update| update.into_request_update(version))
2293            .collect::<Result<Vec<_>>>()?;
2294        if feature_updates.is_empty() {
2295            return Ok(());
2296        }
2297
2298        let mut request = UpdateFeaturesRequest::default()
2299            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
2300            .with_feature_updates(feature_updates);
2301        if version >= 1 {
2302            request = request.with_validate_only(validate_only);
2303        } else if validate_only {
2304            return Err(anyhow!("validate-only feature updates require UpdateFeatures v1+").into());
2305        }
2306
2307        let response: UpdateFeaturesResponse = connection
2308            .send_request::<UpdateFeaturesRequest>(&self.config.client_id, version, &request)
2309            .await?;
2310        if let Some(error) = response.error_code.err() {
2311            return Err(admin_broker_error("update_features", None::<String>, error));
2312        }
2313
2314        for result in response.results {
2315            if let Some(error) = result.error_code.err() {
2316                let feature = result.feature.to_string();
2317                return Err(admin_broker_error("update_feature", Some(feature), error));
2318            }
2319        }
2320
2321        Ok(())
2322    }
2323
2324    async fn warm_up(&self) -> Result<()> {
2325        let _ = connect_to_any_bootstrap(
2326            &self.config.bootstrap_servers,
2327            &self.config.client_id,
2328            self.config.request_timeout,
2329            self.config.security_protocol,
2330            &self.config.tls,
2331            &self.config.sasl,
2332            &self.config.tcp_connector,
2333        )
2334        .await?;
2335        Ok(())
2336    }
2337
    /// Fetches cluster metadata from a bootstrap broker.
    ///
    /// `topics`: a `None` list sends a null topic set (which the Kafka
    /// protocol treats as "all topics"); an explicit list restricts the
    /// response to those topics.
    async fn fetch_metadata(&self, topics: Option<&[String]>) -> Result<MetadataResponse> {
        let (mut connection, version) = self
            .connect_with_version::<MetadataRequest>(METADATA_VERSION_CAP)
            .await?;
        let request = MetadataRequest::default()
            .with_topics(topics.map(|topics| {
                topics
                    .iter()
                    .cloned()
                    .map(StrBytes::from_string)
                    .map(|name| MetadataRequestTopic::default().with_name(Some(name.into())))
                    .collect()
            }))
            // Metadata lookups must never create topics as a side effect.
            .with_allow_auto_topic_creation(false)
            .with_include_cluster_authorized_operations(false)
            .with_include_topic_authorized_operations(false);
        Ok(connection
            .send_request::<MetadataRequest>(&self.config.client_id, version, &request)
            .await?)
    }
2358
2359    async fn send_request<Req>(&self, version_cap: i16, request: &Req) -> Result<Req::Response>
2360    where
2361        Req: Request,
2362    {
2363        let (mut connection, version) = self.connect_with_version::<Req>(version_cap).await?;
2364        Ok(connection
2365            .send_request::<Req>(&self.config.client_id, version, request)
2366            .await?)
2367    }
2368
    /// Opens a bootstrap connection and resolves the API version to use for
    /// `Req`, capped at `version_cap`.
    async fn connect_with_version<Req>(&self, version_cap: i16) -> Result<(BrokerConnection, i16)>
    where
        Req: Request,
    {
        let connection = connect_to_any_bootstrap(
            &self.config.bootstrap_servers,
            &self.config.client_id,
            self.config.request_timeout,
            self.config.security_protocol,
            &self.config.tls,
            &self.config.sasl,
            &self.config.tcp_connector,
        )
        .await?;
        // Cap the negotiated version so requests stay within what this client
        // implements.
        let version = connection.version_with_cap::<Req>(version_cap)?;
        Ok((connection, version))
    }
2386}
2387
2388fn validate_topic_name(topic: String) -> Result<String> {
2389    let topic = topic.trim();
2390    if topic.is_empty() {
2391        return Err(AdminError::EmptyTopicName.into());
2392    }
2393
2394    Ok(topic.to_owned())
2395}
2396
/// Wraps a broker-reported [`ResponseError`] into the crate error type,
/// tagging it with the admin operation name and an optional resource label.
fn admin_broker_error(
    operation: &'static str,
    resource: impl Into<Option<String>>,
    error: ResponseError,
) -> crate::Error {
    BrokerError::response(operation, resource, error).into()
}
2404
/// Queries a single broker for the contents of its log directories.
///
/// The admin config is re-pointed at `broker`'s advertised address because
/// log-directory contents are local to each broker. `request_topics`: `None`
/// describes everything; otherwise only the listed topic/partitions.
async fn describe_log_dirs_for_broker(
    mut config: AdminConfig,
    broker: BrokerDescription,
    request_topics: Option<Vec<DescribableLogDirTopic>>,
) -> Result<BrokerLogDirs> {
    let address = format!("{}:{}", broker.host, broker.port);
    // Replace the bootstrap list so the request lands on this exact broker.
    config.bootstrap_servers = vec![address];
    let broker_admin = KafkaAdmin { config };
    let response: DescribeLogDirsResponse = broker_admin
        .send_request::<DescribeLogDirsRequest>(
            DESCRIBE_LOG_DIRS_VERSION_CAP,
            &DescribeLogDirsRequest::default().with_topics(request_topics),
        )
        .await?;
    if let Some(error) = ResponseError::try_from_code(response.error_code) {
        return Err(admin_broker_error(
            "describe_log_dirs",
            Some(format!("broker {}", broker.broker_id)),
            error,
        ));
    }

    let mut log_dirs = Vec::new();
    for result in response.results {
        let replicas = result
            .topics
            .into_iter()
            .flat_map(|topic| {
                let topic_name = topic.name.to_string();
                topic
                    .partitions
                    .into_iter()
                    .map(move |partition| ReplicaLogDirDescription {
                        topic: topic_name.clone(),
                        partition: partition.partition_index,
                        // Clamp so callers never see a negative size.
                        size_bytes: partition.partition_size.max(0),
                        offset_lag: partition.offset_lag,
                        is_future: partition.is_future_key,
                    })
            })
            .collect();
        log_dirs.push(LogDirDescription {
            log_dir: result.log_dir.to_string(),
            error_code: result.error_code,
            // Negative totals are treated as "not reported" and mapped to None.
            total_bytes: (result.total_bytes >= 0).then_some(result.total_bytes),
            usable_bytes: (result.usable_bytes >= 0).then_some(result.usable_bytes),
            replicas,
        });
    }
    // Deterministic ordering by directory path.
    log_dirs.sort_by(|left, right| left.log_dir.cmp(&right.log_dir));
    Ok(BrokerLogDirs {
        broker_id: broker.broker_id,
        log_dirs,
    })
}
2460
2461fn describe_log_dirs_topics(partitions: &[TopicPartition]) -> Vec<DescribableLogDirTopic> {
2462    let mut topics = BTreeMap::<String, Vec<i32>>::new();
2463    for partition in partitions {
2464        topics
2465            .entry(partition.topic.clone())
2466            .or_default()
2467            .push(partition.partition);
2468    }
2469    topics
2470        .into_iter()
2471        .map(|(topic, mut partitions)| {
2472            partitions.sort_unstable();
2473            partitions.dedup();
2474            DescribableLogDirTopic::default()
2475                .with_topic(StrBytes::from_string(topic).into())
2476                .with_partitions(partitions)
2477        })
2478        .collect()
2479}
2480
2481fn validate_group_id(group_id: String) -> Result<String> {
2482    let group_id = group_id.trim();
2483    if group_id.is_empty() {
2484        return Err(ValidationError::EmptyConsumerGroupId.into());
2485    }
2486
2487    Ok(group_id.to_owned())
2488}
2489
2490fn describe_group_api_for_listed_type(group_type: Option<&str>) -> DescribeGroupApi {
2491    match group_type {
2492        Some(group_type) if group_type.eq_ignore_ascii_case("consumer") => {
2493            DescribeGroupApi::Consumer
2494        }
2495        Some(group_type) if group_type.eq_ignore_ascii_case("share") => DescribeGroupApi::Share,
2496        _ => DescribeGroupApi::Classic,
2497    }
2498}
2499
2500fn validate_feature_name(feature: String) -> Result<String> {
2501    let feature = feature.trim();
2502    if feature.is_empty() {
2503        return Err(ValidationError::EmptyFeatureName.into());
2504    }
2505
2506    Ok(feature.to_owned())
2507}
2508
2509fn assignment_partition_count(
2510    assignment: &kafka_protocol::messages::consumer_group_describe_response::Assignment,
2511) -> usize {
2512    assignment
2513        .topic_partitions
2514        .iter()
2515        .map(|topic| topic.partitions.len())
2516        .sum()
2517}
2518
2519fn share_assignment_partition_count(
2520    assignment: &kafka_protocol::messages::share_group_describe_response::Assignment,
2521) -> usize {
2522    assignment
2523        .topic_partitions
2524        .iter()
2525        .map(|topic| topic.partitions.len())
2526        .sum()
2527}
2528
2529fn classic_subscription_topic_count(metadata: &Bytes) -> usize {
2530    let Some((version, mut body)) = classic_protocol_body(metadata) else {
2531        return 0;
2532    };
2533    ConsumerProtocolSubscription::decode(&mut body, version)
2534        .map(|subscription| subscription.topics.len())
2535        .unwrap_or(0)
2536}
2537
2538fn classic_assignment_partition_count(assignment: &Bytes) -> usize {
2539    let Some((version, mut body)) = classic_protocol_body(assignment) else {
2540        return 0;
2541    };
2542    ConsumerProtocolAssignment::decode(&mut body, version)
2543        .map(|assignment| {
2544            assignment
2545                .assigned_partitions
2546                .iter()
2547                .map(|topic| topic.partitions.len())
2548                .sum()
2549        })
2550        .unwrap_or(0)
2551}
2552
2553fn classic_protocol_body(bytes: &Bytes) -> Option<(i16, Bytes)> {
2554    let mut body = bytes.clone();
2555    if body.remaining() < 2 {
2556        return None;
2557    }
2558    let version = body.get_i16();
2559    Some((version, body))
2560}
2561
2562fn is_ignorable_create_topic_error(error: ResponseError) -> bool {
2563    error == ResponseError::TopicAlreadyExists
2564}
2565
/// Whether a DescribeConfigs entry carries a usable config type.
///
/// Brokers report a negative value when the type field is not populated;
/// any non-negative value is considered supported.
fn response_supported_config_type(config_type: i8) -> bool {
    !config_type.is_negative()
}
2569
/// Unit tests for the admin module's request mapping, validation, and
/// classic consumer-protocol decoding helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::{BufMut, BytesMut};
    use kafka_protocol::protocol::Encodable;

    // Builder fields (name, partitions, replication, configs) carry through
    // to the CreateTopics request topic unchanged.
    #[test]
    fn new_topic_maps_to_create_topics_request() {
        let topic = NewTopic::new("orders", 3, 2)
            .with_config("cleanup.policy", "compact")
            .into_request_topic()
            .expect("topic should be valid");

        assert_eq!(topic.name.0.to_string(), "orders");
        assert_eq!(topic.num_partitions, 3);
        assert_eq!(topic.replication_factor, 2);
        assert_eq!(topic.configs.len(), 1);
        assert_eq!(topic.configs[0].name.to_string(), "cleanup.policy");
    }

    #[test]
    fn new_topic_rejects_invalid_partition_count() {
        let error = NewTopic::new("orders", 0, 1)
            .into_request_topic()
            .expect_err("invalid topic should fail");
        assert!(
            error
                .to_string()
                .contains("topic partition count must be positive")
        );
    }

    #[test]
    fn new_topic_rejects_empty_names_and_invalid_replication_factor() {
        // Whitespace-only names count as empty.
        let error = NewTopic::new("  ", 1, 1).into_request_topic().unwrap_err();
        assert!(matches!(
            error,
            crate::Error::Admin(AdminError::EmptyTopicName)
        ));

        let error = NewTopic::new("orders", 1, 0)
            .into_request_topic()
            .unwrap_err();
        assert!(matches!(
            error,
            crate::Error::Admin(AdminError::InvalidReplicationFactor {
                replication_factor: 0
            })
        ));
    }

    #[test]
    fn new_partitions_maps_assignments_and_rejects_invalid_input() {
        let topic = NewPartitions::increase_to(4)
            .with_assignment([1, 2])
            .with_assignment([2, 3])
            .into_request_topic("orders".to_owned())
            .unwrap();
        assert_eq!(topic.name.to_string(), "orders");
        assert_eq!(topic.count, 4);
        assert_eq!(topic.assignments.unwrap().len(), 2);

        let error = NewPartitions::increase_to(0)
            .into_request_topic("orders".to_owned())
            .unwrap_err();
        assert!(matches!(
            error,
            crate::Error::Admin(AdminError::InvalidPartitionCount { partitions: 0 })
        ));

        // Topic names are validated here too, not only in NewTopic.
        let error = NewPartitions::increase_to(1)
            .into_request_topic(" ".to_owned())
            .unwrap_err();
        assert!(matches!(
            error,
            crate::Error::Admin(AdminError::EmptyTopicName)
        ));
    }

    // These numeric values are wire-protocol constants; changing them would
    // break compatibility with brokers, so the test pins them explicitly.
    #[test]
    fn config_resource_and_operation_protocol_values_are_stable() {
        assert_eq!(ConfigResourceType::Unknown.as_protocol_value(), 0);
        assert_eq!(ConfigResourceType::Topic.as_protocol_value(), 2);
        assert_eq!(ConfigResourceType::Broker.as_protocol_value(), 4);
        assert_eq!(ConfigResourceType::BrokerLogger.as_protocol_value(), 8);
        assert_eq!(ConfigResourceType::ClientMetrics.as_protocol_value(), 16);
        assert_eq!(ConfigResourceType::Group.as_protocol_value(), 32);
        assert_eq!(
            ConfigResourceType::from_protocol_value(2),
            ConfigResourceType::Topic
        );
        // Unrecognized protocol values fall back to Unknown.
        assert_eq!(
            ConfigResourceType::from_protocol_value(99),
            ConfigResourceType::Unknown
        );

        assert_eq!(
            AlterConfigOp::set("cleanup.policy", "compact")
                .op_type
                .as_protocol_value(),
            0
        );
        assert_eq!(
            AlterConfigOp::delete("retention.ms")
                .op_type
                .as_protocol_value(),
            1
        );
        assert_eq!(AlterConfigOpType::Append.as_protocol_value(), 2);
        assert_eq!(AlterConfigOpType::Subtract.as_protocol_value(), 3);
        assert!(response_supported_config_type(0));
        assert!(!response_supported_config_type(-1));
    }

    #[test]
    fn config_resource_config_entry_lookup_returns_named_entry() {
        let mut entries = BTreeMap::new();
        entries.insert(
            "cleanup.policy".to_owned(),
            ConfigEntry {
                name: "cleanup.policy".to_owned(),
                value: Some("compact".to_owned()),
                read_only: false,
                config_source: 1,
                is_sensitive: false,
                config_type: Some(2),
                documentation: None,
            },
        );
        let config = ConfigResourceConfig {
            resource: ConfigResource::topic("orders"),
            entries,
        };

        assert_eq!(
            config.entry("cleanup.policy").unwrap().value.as_deref(),
            Some("compact")
        );
        assert!(config.entry("missing").is_none());
    }

    // A missing or unrecognized group type must route to the classic
    // DescribeGroups API; only "consumer"/"share" pick the modern APIs.
    #[test]
    fn describe_group_api_routing_uses_legacy_api_for_classic_groups() {
        assert_eq!(
            describe_group_api_for_listed_type(None),
            DescribeGroupApi::Classic
        );
        assert_eq!(
            describe_group_api_for_listed_type(Some("classic")),
            DescribeGroupApi::Classic
        );
        assert_eq!(
            describe_group_api_for_listed_type(Some("consumer")),
            DescribeGroupApi::Consumer
        );
        assert_eq!(
            describe_group_api_for_listed_type(Some("share")),
            DescribeGroupApi::Share
        );
    }

    #[test]
    fn acl_binding_creation_validates_and_maps_protocol_values() {
        // Valid binding: every field maps to its wire-protocol numeric code.
        let creation = AclBinding::new(
            ResourcePattern::new(ResourceType::Topic, "orders", PatternType::Prefixed),
            AccessControlEntry::new(
                "User:alice",
                "*",
                AclOperation::Read,
                AclPermissionType::Allow,
            ),
        )
        .into_creation()
        .unwrap();

        assert_eq!(creation.resource_type, 2);
        assert_eq!(creation.resource_name.to_string(), "orders");
        assert_eq!(creation.resource_pattern_type, 4);
        assert_eq!(creation.principal.to_string(), "User:alice");
        assert_eq!(creation.host.to_string(), "*");
        assert_eq!(creation.operation, 3);
        assert_eq!(creation.permission_type, 3);

        // ANY is only meaningful in filters, never in a concrete creation.
        let error = AclBinding::new(
            ResourcePattern::new(ResourceType::Any, "orders", PatternType::Literal),
            AccessControlEntry::new(
                "User:alice",
                "*",
                AclOperation::Read,
                AclPermissionType::Allow,
            ),
        )
        .into_creation()
        .unwrap_err();
        assert!(error.to_string().contains("resource_type must not be ANY"));

        let error = AclBinding::new(
            ResourcePattern::new(ResourceType::Topic, "orders", PatternType::Literal),
            AccessControlEntry::new(
                "User:alice",
                "*",
                AclOperation::Any,
                AclPermissionType::Allow,
            ),
        )
        .into_creation()
        .unwrap_err();
        assert!(error.to_string().contains("operation must not be ANY"));
    }

    // One filter should produce equivalent DescribeAcls and DeleteAcls
    // request payloads — both mappings are checked against the same input.
    #[test]
    fn acl_binding_filters_map_to_describe_and_delete_requests() {
        let filter = AclBindingFilter::new(
            ResourcePatternFilter::new(
                ResourceType::Topic,
                Some("orders".to_owned()),
                PatternType::Literal,
            ),
            AccessControlEntryFilter::new(
                Some("User:alice".to_owned()),
                Some("*".to_owned()),
                AclOperation::Read,
                AclPermissionType::Allow,
            ),
        );

        let describe = filter.to_describe_request();
        assert_eq!(describe.resource_type_filter, 2);
        assert_eq!(
            describe
                .resource_name_filter
                .as_ref()
                .map(ToString::to_string),
            Some("orders".to_owned())
        );
        assert_eq!(describe.pattern_type_filter, 3);
        assert_eq!(
            describe.principal_filter.as_ref().map(ToString::to_string),
            Some("User:alice".to_owned())
        );
        assert_eq!(
            describe.host_filter.as_ref().map(ToString::to_string),
            Some("*".to_owned())
        );
        assert_eq!(describe.operation, 3);
        assert_eq!(describe.permission_type, 3);

        let delete = filter.into_delete_filter();
        assert_eq!(delete.resource_type_filter, 2);
        assert_eq!(
            delete
                .resource_name_filter
                .as_ref()
                .map(ToString::to_string),
            Some("orders".to_owned())
        );
        assert_eq!(delete.pattern_type_filter, 3);
        assert_eq!(
            delete.principal_filter.as_ref().map(ToString::to_string),
            Some("User:alice".to_owned())
        );
        assert_eq!(
            delete.host_filter.as_ref().map(ToString::to_string),
            Some("*".to_owned())
        );
        assert_eq!(delete.operation, 3);
        assert_eq!(delete.permission_type, 3);
    }

    // Round-trips real encoded payloads through the counting helpers, then
    // checks that truncated/garbage bytes degrade to a count of 0.
    #[test]
    fn classic_group_payload_counts_decode_subscription_and_assignment() {
        let subscription = ConsumerProtocolSubscription::default().with_topics(vec![
            StrBytes::from_static_str("orders"),
            StrBytes::from_static_str("payments"),
        ]);
        let subscription_bytes = encode_classic_protocol(&subscription, 3);
        assert_eq!(classic_subscription_topic_count(&subscription_bytes), 2);

        let assignment = ConsumerProtocolAssignment::default().with_assigned_partitions(vec![
            kafka_protocol::messages::consumer_protocol_assignment::TopicPartition::default()
                .with_topic(StrBytes::from_static_str("orders").into())
                .with_partitions(vec![0, 1]),
            kafka_protocol::messages::consumer_protocol_assignment::TopicPartition::default()
                .with_topic(StrBytes::from_static_str("payments").into())
                .with_partitions(vec![0]),
        ]);
        let assignment_bytes = encode_classic_protocol(&assignment, 3);
        assert_eq!(classic_assignment_partition_count(&assignment_bytes), 3);

        // One byte: too short for the i16 version prefix.
        assert_eq!(
            classic_subscription_topic_count(&Bytes::from_static(b"\x00")),
            0
        );
        // Valid-length prefix (version 0x0063) but no decodable body.
        assert_eq!(
            classic_assignment_partition_count(&Bytes::from_static(b"\x00\x63")),
            0
        );
    }

    #[test]
    fn feature_update_maps_to_modern_update_features_request() {
        let update = FeatureUpdate::upgrade("share.version", 1)
            .into_request_update(2)
            .unwrap();
        assert_eq!(update.feature.to_string(), "share.version");
        assert_eq!(update.max_version_level, 1);
        assert_eq!(update.upgrade_type, 1);
        assert!(!update.allow_downgrade);
    }

    // Older request versions express safe downgrade via the allow_downgrade
    // flag rather than a distinct upgrade_type value.
    #[test]
    fn feature_update_maps_to_legacy_downgrade_flag() {
        let update = FeatureUpdate::safe_downgrade("metadata.version", 0)
            .into_request_update(0)
            .unwrap();
        assert_eq!(update.feature.to_string(), "metadata.version");
        assert_eq!(update.max_version_level, 0);
        assert_eq!(update.upgrade_type, 1);
        assert!(update.allow_downgrade);
    }

    #[test]
    fn feature_update_rejects_empty_name() {
        let error = FeatureUpdate::upgrade(" ", 1)
            .into_request_update(2)
            .unwrap_err();
        assert!(
            error
                .to_string()
                .contains("feature names must be non-empty")
        );
    }

    #[test]
    fn topic_already_exists_is_ignorable() {
        assert!(is_ignorable_create_topic_error(
            ResponseError::TopicAlreadyExists
        ));
        assert!(!is_ignorable_create_topic_error(
            ResponseError::UnknownTopicOrPartition
        ));
    }

    // Test helper: encodes a message the way classic group members do —
    // a 2-byte big-endian version prefix followed by the encoded body.
    fn encode_classic_protocol<T>(message: &T, version: i16) -> Bytes
    where
        T: Encodable,
    {
        let mut bytes = BytesMut::new();
        bytes.put_i16(version);
        message.encode(&mut bytes, version).unwrap();
        bytes.freeze()
    }
}