Skip to main content

kafkit_client/admin/
mod.rs

1//! Admin client operations for topics, groups, configs, and cluster metadata.
2//!
3//! ```no_run
4//! # async fn example() -> kafkit_client::Result<()> {
5//! use kafkit_client::{KafkaClient, NewTopic};
6//!
7//! let admin = KafkaClient::new("localhost:9092").admin().connect().await?;
8//! admin
9//!     .create_topics([NewTopic::new("orders", 3, 1)])
10//!     .await?;
11//! # Ok(())
12//! # }
13//! ```
14//!
15use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use bytes::{Buf, Bytes};
19use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
20use kafka_protocol::messages::alter_user_scram_credentials_request::ScramCredentialUpsertion;
21use kafka_protocol::messages::create_partitions_request::{
22    CreatePartitionsAssignment, CreatePartitionsTopic,
23};
24use kafka_protocol::messages::create_topics_request::{CreatableTopic, CreatableTopicConfig};
25use kafka_protocol::messages::describe_configs_request::DescribeConfigsResource;
26use kafka_protocol::messages::incremental_alter_configs_request::{
27    AlterConfigsResource, AlterableConfig,
28};
29use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
30use kafka_protocol::messages::update_features_request::FeatureUpdateKey;
31use kafka_protocol::messages::{
32    AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse,
33    ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerProtocolAssignment,
34    ConsumerProtocolSubscription, CreatePartitionsRequest, CreatePartitionsResponse,
35    CreateTopicsRequest, CreateTopicsResponse, DeleteGroupsRequest, DeleteGroupsResponse,
36    DeleteTopicsRequest, DeleteTopicsResponse, DescribeClusterRequest, DescribeClusterResponse,
37    DescribeConfigsRequest, DescribeConfigsResponse, DescribeGroupsRequest, DescribeGroupsResponse,
38    IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse, ListGroupsRequest,
39    ListGroupsResponse, MetadataRequest, MetadataResponse, ShareGroupDescribeRequest,
40    ShareGroupDescribeResponse, UpdateFeaturesRequest, UpdateFeaturesResponse,
41};
42use kafka_protocol::protocol::{Decodable, Request, StrBytes};
43use tracing::{debug, instrument};
44use uuid::Uuid;
45
46use crate::config::{AdminConfig, SaslMechanism};
47use crate::constants::{
48    ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP, CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
49    CREATE_PARTITIONS_VERSION_CAP, CREATE_TOPICS_VERSION_CAP, DELETE_GROUPS_VERSION_CAP,
50    DELETE_TOPICS_VERSION_CAP, DESCRIBE_CLUSTER_VERSION_CAP, DESCRIBE_CONFIGS_VERSION_CAP,
51    DESCRIBE_GROUPS_VERSION_CAP, INCREMENTAL_ALTER_CONFIGS_VERSION_CAP, LIST_GROUPS_VERSION_CAP,
52    METADATA_VERSION_CAP, SHARE_GROUP_DESCRIBE_VERSION_CAP, UPDATE_FEATURES_VERSION_CAP,
53};
54use crate::network::scram;
55use crate::network::{BrokerConnection, connect_to_any_bootstrap, duration_to_i32_ms};
56use crate::{AdminError, Result};
57
58#[derive(Debug, Clone)]
59/// New Topic.
60pub struct NewTopic {
61    /// Name returned by Kafka.
62    pub name: String,
63    /// Num Partitions.
64    pub num_partitions: i32,
65    /// Replication Factor.
66    pub replication_factor: i16,
67    /// Topic-level configuration entries.
68    pub configs: BTreeMap<String, String>,
69}
70
71impl NewTopic {
72    /// Creates a new value.
73    pub fn new(name: impl Into<String>, num_partitions: i32, replication_factor: i16) -> Self {
74        Self {
75            name: name.into(),
76            num_partitions,
77            replication_factor,
78            configs: BTreeMap::new(),
79        }
80    }
81
82    /// Sets config and returns the updated value.
83    pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
84        self.configs.insert(key.into(), value.into());
85        self
86    }
87
88    fn into_request_topic(self) -> Result<CreatableTopic> {
89        let name = validate_topic_name(self.name)?;
90        if self.num_partitions <= 0 {
91            return Err(AdminError::InvalidPartitionCount {
92                partitions: self.num_partitions,
93            }
94            .into());
95        }
96        if self.replication_factor <= 0 {
97            return Err(AdminError::InvalidReplicationFactor {
98                replication_factor: self.replication_factor,
99            }
100            .into());
101        }
102
103        let configs = self
104            .configs
105            .into_iter()
106            .map(|(key, value)| {
107                CreatableTopicConfig::default()
108                    .with_name(StrBytes::from_string(key))
109                    .with_value(Some(StrBytes::from_string(value)))
110            })
111            .collect();
112
113        Ok(CreatableTopic::default()
114            .with_name(StrBytes::from_string(name).into())
115            .with_num_partitions(self.num_partitions)
116            .with_replication_factor(self.replication_factor)
117            .with_configs(configs))
118    }
119}
120
121#[derive(Debug, Clone, PartialEq, Eq)]
122/// New Partitions.
123pub struct NewPartitions {
124    /// Total Count.
125    pub total_count: i32,
126    /// Assignments.
127    pub assignments: Vec<Vec<i32>>,
128}
129
130impl NewPartitions {
131    /// Increase To.
132    pub fn increase_to(total_count: i32) -> Self {
133        Self {
134            total_count,
135            assignments: Vec::new(),
136        }
137    }
138
139    /// Sets assignment and returns the updated value.
140    pub fn with_assignment<I>(mut self, broker_ids: I) -> Self
141    where
142        I: IntoIterator<Item = i32>,
143    {
144        self.assignments.push(broker_ids.into_iter().collect());
145        self
146    }
147
148    fn into_request_topic(self, topic_name: String) -> Result<CreatePartitionsTopic> {
149        let name = validate_topic_name(topic_name)?;
150        if self.total_count <= 0 {
151            return Err(AdminError::InvalidPartitionCount {
152                partitions: self.total_count,
153            }
154            .into());
155        }
156
157        let assignments = (!self.assignments.is_empty()).then(|| {
158            self.assignments
159                .into_iter()
160                .map(|broker_ids| {
161                    CreatePartitionsAssignment::default()
162                        .with_broker_ids(broker_ids.into_iter().map(Into::into).collect())
163                })
164                .collect()
165        });
166
167        Ok(CreatePartitionsTopic::default()
168            .with_name(StrBytes::from_string(name).into())
169            .with_count(self.total_count)
170            .with_assignments(assignments))
171    }
172}
173
174#[derive(Debug, Clone, PartialEq, Eq)]
175/// Topic Listing.
176pub struct TopicListing {
177    /// Name returned by Kafka.
178    pub name: String,
179    /// Topic Id.
180    pub topic_id: Option<Uuid>,
181    /// Is Internal.
182    pub is_internal: bool,
183}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
186/// Topic Partition Description.
187pub struct TopicPartitionDescription {
188    /// Partition number.
189    pub partition: i32,
190    /// Leader Id.
191    pub leader_id: i32,
192    /// Leader Epoch.
193    pub leader_epoch: i32,
194    /// Replica Nodes.
195    pub replica_nodes: Vec<i32>,
196    /// Isr Nodes.
197    pub isr_nodes: Vec<i32>,
198    /// Offline Replicas.
199    pub offline_replicas: Vec<i32>,
200}
201
202#[derive(Debug, Clone, PartialEq, Eq)]
203/// Topic Description.
204pub struct TopicDescription {
205    /// Name returned by Kafka.
206    pub name: String,
207    /// Topic Id.
208    pub topic_id: Option<Uuid>,
209    /// Is Internal.
210    pub is_internal: bool,
211    /// Partitions.
212    pub partitions: Vec<TopicPartitionDescription>,
213}
214
215#[derive(Debug, Clone, PartialEq, Eq)]
216/// Broker Description.
217pub struct BrokerDescription {
218    /// Broker Id.
219    pub broker_id: i32,
220    /// Host.
221    pub host: String,
222    /// Port.
223    pub port: i32,
224    /// Rack.
225    pub rack: Option<String>,
226    /// Is Fenced.
227    pub is_fenced: bool,
228}
229
230#[derive(Debug, Clone, PartialEq, Eq)]
231/// Cluster Description.
232pub struct ClusterDescription {
233    /// Cluster Id.
234    pub cluster_id: String,
235    /// Controller Id.
236    pub controller_id: i32,
237    /// Brokers.
238    pub brokers: Vec<BrokerDescription>,
239}
240
241#[derive(Debug, Clone, PartialEq, Eq)]
242/// Broker Feature Level.
243pub struct BrokerFeatureLevel {
244    /// Name returned by Kafka.
245    pub name: String,
246    /// Level.
247    pub level: i16,
248}
249
250#[derive(Debug, Clone, PartialEq, Eq)]
251/// Finalized broker feature update.
252pub struct FeatureUpdate {
253    /// Feature name.
254    pub name: String,
255    /// New finalized maximum version level. Values below 1 delete the finalized feature.
256    pub max_version_level: i16,
257    /// How the broker should apply an upgrade or downgrade.
258    pub upgrade_type: FeatureUpgradeType,
259}
260
261impl FeatureUpdate {
262    /// Creates an upgrade-only feature update.
263    pub fn upgrade(name: impl Into<String>, max_version_level: i16) -> Self {
264        Self {
265            name: name.into(),
266            max_version_level,
267            upgrade_type: FeatureUpgradeType::Upgrade,
268        }
269    }
270
271    /// Creates a safe downgrade or deletion update.
272    pub fn safe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
273        Self {
274            name: name.into(),
275            max_version_level,
276            upgrade_type: FeatureUpgradeType::SafeDowngrade,
277        }
278    }
279
280    /// Creates an unsafe downgrade or deletion update.
281    pub fn unsafe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
282        Self {
283            name: name.into(),
284            max_version_level,
285            upgrade_type: FeatureUpgradeType::UnsafeDowngrade,
286        }
287    }
288
289    fn into_request_update(self, version: i16) -> Result<FeatureUpdateKey> {
290        let name = validate_feature_name(self.name)?;
291        let allow_downgrade = self.upgrade_type.allows_downgrade();
292        let mut update = FeatureUpdateKey::default()
293            .with_feature(StrBytes::from_string(name))
294            .with_max_version_level(self.max_version_level);
295        if version == 0 {
296            update = update.with_allow_downgrade(allow_downgrade);
297        } else {
298            update = update.with_upgrade_type(self.upgrade_type.as_protocol_value());
299        }
300        Ok(update)
301    }
302}
303
304#[derive(Debug, Clone, Copy, PartialEq, Eq)]
305/// Type of finalized feature update to perform.
306pub enum FeatureUpgradeType {
307    /// Only allow upgrades.
308    Upgrade,
309    /// Allow safe, lossless downgrades.
310    SafeDowngrade,
311    /// Allow unsafe, potentially lossy downgrades.
312    UnsafeDowngrade,
313}
314
315impl FeatureUpgradeType {
316    fn as_protocol_value(self) -> i8 {
317        match self {
318            Self::Upgrade => 1,
319            Self::SafeDowngrade => 2,
320            Self::UnsafeDowngrade => 3,
321        }
322    }
323
324    fn allows_downgrade(self) -> bool {
325        !matches!(self, Self::Upgrade)
326    }
327}
328
329#[derive(Debug, Clone, PartialEq, Eq)]
330/// Consumer Group Listing.
331pub struct ConsumerGroupListing {
332    /// Group Id.
333    pub group_id: String,
334    /// Protocol Type.
335    pub protocol_type: String,
336    /// State.
337    pub state: Option<String>,
338    /// Group Type.
339    pub group_type: Option<String>,
340}
341
342#[derive(Debug, Clone, PartialEq, Eq)]
343/// Consumer Group Description.
344pub struct ConsumerGroupDescription {
345    /// Group Id.
346    pub group_id: String,
347    /// State.
348    pub state: String,
349    /// Protocol Type.
350    pub protocol_type: String,
351    /// Protocol Data.
352    pub protocol_data: String,
353    /// Members.
354    pub members: Vec<ConsumerGroupMemberDescription>,
355    /// Authorized Operations.
356    pub authorized_operations: Option<i32>,
357}
358
359#[derive(Debug, Clone, PartialEq, Eq)]
360/// Consumer Group Member Description.
361pub struct ConsumerGroupMemberDescription {
362    /// Member Id.
363    pub member_id: String,
364    /// Group Instance Id.
365    pub group_instance_id: Option<String>,
366    /// Client Id.
367    pub client_id: String,
368    /// Client Host.
369    pub client_host: String,
370    /// Member Metadata Bytes.
371    pub member_metadata_bytes: usize,
372    /// Member Assignment Bytes.
373    pub member_assignment_bytes: usize,
374}
375
376/// Group Listing.
377pub type GroupListing = ConsumerGroupListing;
378
379/// Group Description.
380pub type GroupDescription = ConsumerGroupDescription;
381
382/// Group Member Description.
383pub type GroupMemberDescription = ConsumerGroupMemberDescription;
384
385#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
386/// Config Resource Type.
387pub enum ConfigResourceType {
388    /// Unknown.
389    Unknown,
390    /// Topic name.
391    Topic,
392    /// Broker.
393    Broker,
394    /// Broker logger.
395    BrokerLogger,
396    /// Client metrics.
397    ClientMetrics,
398    /// Group.
399    Group,
400}
401
402impl ConfigResourceType {
403    fn as_protocol_value(self) -> i8 {
404        match self {
405            Self::Unknown => 0,
406            Self::Topic => 2,
407            Self::Broker => 4,
408            Self::BrokerLogger => 8,
409            Self::ClientMetrics => 16,
410            Self::Group => 32,
411        }
412    }
413
414    fn from_protocol_value(value: i8) -> Self {
415        match value {
416            2 => Self::Topic,
417            4 => Self::Broker,
418            8 => Self::BrokerLogger,
419            16 => Self::ClientMetrics,
420            32 => Self::Group,
421            _ => Self::Unknown,
422        }
423    }
424}
425
426#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
427/// Config Resource.
428pub struct ConfigResource {
429    /// Resource Type.
430    pub resource_type: ConfigResourceType,
431    /// Resource Name.
432    pub resource_name: String,
433}
434
435impl ConfigResource {
436    /// Creates a new value.
437    pub fn new(resource_type: ConfigResourceType, resource_name: impl Into<String>) -> Self {
438        Self {
439            resource_type,
440            resource_name: resource_name.into(),
441        }
442    }
443
444    /// Topic name.
445    pub fn topic(resource_name: impl Into<String>) -> Self {
446        Self::new(ConfigResourceType::Topic, resource_name)
447    }
448
449    /// Group.
450    pub fn group(resource_name: impl Into<String>) -> Self {
451        Self::new(ConfigResourceType::Group, resource_name)
452    }
453}
454
455#[derive(Debug, Clone, PartialEq, Eq)]
456/// Config Entry.
457pub struct ConfigEntry {
458    /// Name returned by Kafka.
459    pub name: String,
460    /// Record value.
461    pub value: Option<String>,
462    /// Read Only.
463    pub read_only: bool,
464    /// Config Source.
465    pub config_source: i8,
466    /// Is Sensitive.
467    pub is_sensitive: bool,
468    /// Config Type.
469    pub config_type: Option<i8>,
470    /// Documentation.
471    pub documentation: Option<String>,
472}
473
474#[derive(Debug, Clone, PartialEq, Eq)]
475/// Config Resource Config.
476pub struct ConfigResourceConfig {
477    /// Resource.
478    pub resource: ConfigResource,
479    /// Entries.
480    pub entries: BTreeMap<String, ConfigEntry>,
481}
482
483impl ConfigResourceConfig {
484    /// Entry.
485    pub fn entry(&self, name: &str) -> Option<&ConfigEntry> {
486        self.entries.get(name)
487    }
488}
489
490#[derive(Debug, Clone, Copy, PartialEq, Eq)]
491/// Alter Config Op Type.
492pub enum AlterConfigOpType {
493    /// Set.
494    Set,
495    /// Delete.
496    Delete,
497    /// Append.
498    Append,
499    /// Subtract.
500    Subtract,
501}
502
503impl AlterConfigOpType {
504    fn as_protocol_value(self) -> i8 {
505        match self {
506            Self::Set => 0,
507            Self::Delete => 1,
508            Self::Append => 2,
509            Self::Subtract => 3,
510        }
511    }
512}
513
514#[derive(Debug, Clone, PartialEq, Eq)]
515/// Alter Config Op.
516pub struct AlterConfigOp {
517    /// Name returned by Kafka.
518    pub name: String,
519    /// Op Type.
520    pub op_type: AlterConfigOpType,
521    /// Record value.
522    pub value: Option<String>,
523}
524
525impl AlterConfigOp {
526    /// Set.
527    pub fn set(name: impl Into<String>, value: impl Into<String>) -> Self {
528        Self {
529            name: name.into(),
530            op_type: AlterConfigOpType::Set,
531            value: Some(value.into()),
532        }
533    }
534
535    /// Delete.
536    pub fn delete(name: impl Into<String>) -> Self {
537        Self {
538            name: name.into(),
539            op_type: AlterConfigOpType::Delete,
540            value: None,
541        }
542    }
543}
544
545#[derive(Debug, Clone)]
546/// Client for Kafka admin operations.
547pub struct KafkaAdmin {
548    config: AdminConfig,
549}
550
551impl KafkaAdmin {
552    #[instrument(
553        name = "admin.connect",
554        level = "debug",
555        skip(config),
556        fields(
557            bootstrap_server_count = config.bootstrap_servers.len(),
558            client_id = %config.client_id
559        )
560    )]
561    /// Connects to Kafka and returns the client.
562    pub async fn connect(config: AdminConfig) -> Result<Self> {
563        let admin = Self { config };
564        admin.warm_up().await?;
565        debug!("admin client connected");
566        Ok(admin)
567    }
568
569    #[instrument(name = "admin.create_topics", level = "debug", skip(self, topics))]
570    /// Create Topics.
571    pub async fn create_topics<I>(&self, topics: I) -> Result<()>
572    where
573        I: IntoIterator<Item = NewTopic>,
574    {
575        let topics = topics
576            .into_iter()
577            .map(NewTopic::into_request_topic)
578            .collect::<Result<Vec<_>>>()?;
579        if topics.is_empty() {
580            return Ok(());
581        }
582
583        let request = CreateTopicsRequest::default()
584            .with_topics(topics)
585            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
586            .with_validate_only(false);
587        let response: CreateTopicsResponse = self
588            .send_request::<CreateTopicsRequest>(CREATE_TOPICS_VERSION_CAP, &request)
589            .await?;
590
591        for topic in response.topics {
592            let name = topic.name.0.to_string();
593            if let Some(error) = topic
594                .error_code
595                .err()
596                .filter(|error| !is_ignorable_create_topic_error(*error))
597            {
598                return Err(anyhow!("create topic '{name}' failed: {error}").into());
599            }
600        }
601
602        Ok(())
603    }
604
605    #[instrument(name = "admin.delete_topics", level = "debug", skip(self, topics))]
606    /// Delete Topics.
607    pub async fn delete_topics<I, S>(&self, topics: I) -> Result<()>
608    where
609        I: IntoIterator<Item = S>,
610        S: Into<String>,
611    {
612        let topic_names = topics
613            .into_iter()
614            .map(|topic| validate_topic_name(topic.into()))
615            .collect::<Result<Vec<_>>>()?;
616        if topic_names.is_empty() {
617            return Ok(());
618        }
619
620        let request = DeleteTopicsRequest::default()
621            .with_topic_names(
622                topic_names
623                    .iter()
624                    .cloned()
625                    .map(StrBytes::from_string)
626                    .map(Into::into)
627                    .collect(),
628            )
629            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?);
630        let response: DeleteTopicsResponse = self
631            .send_request::<DeleteTopicsRequest>(DELETE_TOPICS_VERSION_CAP, &request)
632            .await?;
633
634        for topic in response.responses {
635            let name = topic
636                .name
637                .as_ref()
638                .map(|name| name.0.to_string())
639                .unwrap_or_else(|| "<unknown>".to_owned());
640            if let Some(error) = topic.error_code.err() {
641                return Err(anyhow!("delete topic '{name}' failed: {error}").into());
642            }
643        }
644
645        Ok(())
646    }
647
648    #[instrument(
649        name = "admin.create_partitions",
650        level = "debug",
651        skip(self, partitions)
652    )]
653    /// Create Partitions.
654    pub async fn create_partitions<I, S>(&self, partitions: I) -> Result<()>
655    where
656        I: IntoIterator<Item = (S, NewPartitions)>,
657        S: Into<String>,
658    {
659        let topics = partitions
660            .into_iter()
661            .map(|(topic, new_partitions)| new_partitions.into_request_topic(topic.into()))
662            .collect::<Result<Vec<_>>>()?;
663        if topics.is_empty() {
664            return Ok(());
665        }
666
667        let request = CreatePartitionsRequest::default()
668            .with_topics(topics)
669            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
670            .with_validate_only(false);
671        let response: CreatePartitionsResponse = self
672            .send_request::<CreatePartitionsRequest>(CREATE_PARTITIONS_VERSION_CAP, &request)
673            .await?;
674
675        for topic in response.results {
676            let name = topic.name.0.to_string();
677            if let Some(error) = topic.error_code.err() {
678                return Err(anyhow!(
679                    "create partitions for topic '{name}' failed: {}",
680                    topic
681                        .error_message
682                        .as_ref()
683                        .map(|message| message.to_string())
684                        .filter(|message| !message.is_empty())
685                        .unwrap_or_else(|| error.to_string())
686                )
687                .into());
688            }
689        }
690
691        Ok(())
692    }
693
694    #[instrument(name = "admin.list_topics", level = "debug", skip(self))]
695    /// List Topics.
696    pub async fn list_topics(&self) -> Result<Vec<TopicListing>> {
697        let response = self.fetch_metadata(None).await?;
698        let mut topics = Vec::new();
699
700        for topic in response.topics {
701            let Some(name) = topic.name.as_ref().map(|name| name.0.to_string()) else {
702                continue;
703            };
704            if let Some(error) = topic.error_code.err() {
705                return Err(anyhow!("list topics failed for '{name}': {error}").into());
706            }
707
708            topics.push(TopicListing {
709                name,
710                topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
711                is_internal: topic.is_internal,
712            });
713        }
714
715        topics.sort_by(|left, right| left.name.cmp(&right.name));
716        Ok(topics)
717    }
718
719    #[instrument(name = "admin.describe_topics", level = "debug", skip(self, topics))]
720    /// Describe Topics.
721    pub async fn describe_topics<I, S>(&self, topics: I) -> Result<Vec<TopicDescription>>
722    where
723        I: IntoIterator<Item = S>,
724        S: Into<String>,
725    {
726        let requested_topics = topics
727            .into_iter()
728            .map(|topic| validate_topic_name(topic.into()))
729            .collect::<Result<Vec<_>>>()?;
730        if requested_topics.is_empty() {
731            return Ok(Vec::new());
732        }
733
734        let response = self.fetch_metadata(Some(&requested_topics)).await?;
735        let mut descriptions = BTreeMap::new();
736
737        for topic in response.topics {
738            let name = topic
739                .name
740                .as_ref()
741                .map(|name| name.0.to_string())
742                .unwrap_or_default();
743            if let Some(error) = topic.error_code.err() {
744                let label = if name.is_empty() { "<unknown>" } else { &name };
745                return Err(anyhow!("describe topic '{label}' failed: {error}").into());
746            }
747
748            let mut partitions = topic
749                .partitions
750                .into_iter()
751                .map(|partition| {
752                    if let Some(error) = partition.error_code.err() {
753                        return Err(anyhow!(
754                            "describe topic '{name}' partition {} failed: {error}",
755                            partition.partition_index
756                        ));
757                    }
758
759                    Ok(TopicPartitionDescription {
760                        partition: partition.partition_index,
761                        leader_id: partition.leader_id.0,
762                        leader_epoch: partition.leader_epoch,
763                        replica_nodes: partition.replica_nodes.into_iter().map(|id| id.0).collect(),
764                        isr_nodes: partition.isr_nodes.into_iter().map(|id| id.0).collect(),
765                        offline_replicas: partition
766                            .offline_replicas
767                            .into_iter()
768                            .map(|id| id.0)
769                            .collect(),
770                    })
771                })
772                .collect::<std::result::Result<Vec<_>, _>>()?;
773            partitions.sort_by_key(|partition| partition.partition);
774
775            descriptions.insert(
776                name.clone(),
777                TopicDescription {
778                    name,
779                    topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
780                    is_internal: topic.is_internal,
781                    partitions,
782                },
783            );
784        }
785
786        requested_topics
787            .into_iter()
788            .map(|topic| {
789                descriptions.remove(&topic).ok_or_else(|| {
790                    anyhow!("metadata response did not include topic '{topic}'").into()
791                })
792            })
793            .collect()
794    }
795
796    #[instrument(name = "admin.describe_cluster", level = "debug", skip(self))]
797    /// Describe Cluster.
798    pub async fn describe_cluster(&self) -> Result<ClusterDescription> {
799        let (mut connection, version) = self
800            .connect_with_version::<DescribeClusterRequest>(DESCRIBE_CLUSTER_VERSION_CAP)
801            .await?;
802        let mut request =
803            DescribeClusterRequest::default().with_include_cluster_authorized_operations(false);
804        if version >= 1 {
805            request = request.with_endpoint_type(1);
806        }
807        if version >= 2 {
808            request = request.with_include_fenced_brokers(true);
809        }
810
811        let response: DescribeClusterResponse = connection
812            .send_request::<DescribeClusterRequest>(&self.config.client_id, version, &request)
813            .await?;
814        if let Some(error) = response.error_code.err() {
815            return Err(anyhow!("describe cluster failed: {error}").into());
816        }
817
818        let mut brokers = response
819            .brokers
820            .into_iter()
821            .map(|broker| BrokerDescription {
822                broker_id: broker.broker_id.0,
823                host: broker.host.to_string(),
824                port: broker.port,
825                rack: broker.rack.map(|rack| rack.to_string()),
826                is_fenced: broker.is_fenced,
827            })
828            .collect::<Vec<_>>();
829        brokers.sort_by_key(|broker| broker.broker_id);
830
831        Ok(ClusterDescription {
832            cluster_id: response.cluster_id.to_string(),
833            controller_id: response.controller_id.0,
834            brokers,
835        })
836    }
837
838    #[instrument(name = "admin.list_groups", level = "debug", skip(self))]
839    /// List Groups.
840    pub async fn list_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
841        let response: ListGroupsResponse = self
842            .send_request::<ListGroupsRequest>(
843                LIST_GROUPS_VERSION_CAP,
844                &ListGroupsRequest::default(),
845            )
846            .await?;
847        if let Some(error) = response.error_code.err() {
848            return Err(anyhow!("list groups failed: {error}").into());
849        }
850
851        let mut groups = response
852            .groups
853            .into_iter()
854            .map(|group| ConsumerGroupListing {
855                group_id: group.group_id.to_string(),
856                protocol_type: group.protocol_type.to_string(),
857                state: (!group.group_state.is_empty()).then(|| group.group_state.to_string()),
858                group_type: (!group.group_type.is_empty()).then(|| group.group_type.to_string()),
859            })
860            .collect::<Vec<_>>();
861        groups.sort_by(|left, right| left.group_id.cmp(&right.group_id));
862        Ok(groups)
863    }
864
865    #[instrument(name = "admin.list_consumer_groups", level = "debug", skip(self))]
866    /// List Consumer Groups.
867    pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
868        self.list_groups().await
869    }
870
871    #[instrument(name = "admin.describe_groups", level = "debug", skip(self, groups))]
872    /// Describe Groups.
873    pub async fn describe_groups<I, S>(&self, groups: I) -> Result<Vec<GroupDescription>>
874    where
875        I: IntoIterator<Item = S>,
876        S: Into<String>,
877    {
878        let group_ids = groups
879            .into_iter()
880            .map(|group| validate_group_id(group.into()))
881            .collect::<Result<Vec<_>>>()?;
882        if group_ids.is_empty() {
883            return Ok(Vec::new());
884        }
885
886        let listed_group_types = self
887            .list_groups()
888            .await?
889            .into_iter()
890            .map(|group| (group.group_id, group.group_type))
891            .collect::<BTreeMap<_, _>>();
892        let mut consumer_groups = Vec::new();
893        let mut share_groups = Vec::new();
894        let mut classic_groups = Vec::new();
895        for group_id in &group_ids {
896            match listed_group_types
897                .get(group_id)
898                .and_then(|group_type| group_type.as_deref())
899            {
900                Some(group_type) if group_type.eq_ignore_ascii_case("share") => {
901                    share_groups.push(group_id.clone());
902                }
903                Some(group_type) if group_type.eq_ignore_ascii_case("classic") => {
904                    classic_groups.push(group_id.clone());
905                }
906                _ => consumer_groups.push(group_id.clone()),
907            }
908        }
909
910        let mut descriptions = BTreeMap::new();
911        for group in self.describe_consumer_groups(consumer_groups).await? {
912            descriptions.insert(group.group_id.clone(), group);
913        }
914        for group in self.describe_share_groups(share_groups).await? {
915            descriptions.insert(group.group_id.clone(), group);
916        }
917        for group in self.describe_classic_groups(classic_groups).await? {
918            descriptions.insert(group.group_id.clone(), group);
919        }
920
921        group_ids
922            .into_iter()
923            .map(|group_id| {
924                descriptions.remove(&group_id).ok_or_else(|| {
925                    anyhow!("describe groups response did not include group '{group_id}'").into()
926                })
927            })
928            .collect()
929    }
930
931    #[instrument(
932        name = "admin.describe_consumer_groups",
933        level = "debug",
934        skip(self, groups)
935    )]
936    /// Describe Consumer Groups.
937    pub async fn describe_consumer_groups<I, S>(
938        &self,
939        groups: I,
940    ) -> Result<Vec<ConsumerGroupDescription>>
941    where
942        I: IntoIterator<Item = S>,
943        S: Into<String>,
944    {
945        let group_ids = groups
946            .into_iter()
947            .map(|group| validate_group_id(group.into()))
948            .collect::<Result<Vec<_>>>()?;
949        if group_ids.is_empty() {
950            return Ok(Vec::new());
951        }
952
953        let request = ConsumerGroupDescribeRequest::default()
954            .with_group_ids(
955                group_ids
956                    .iter()
957                    .cloned()
958                    .map(StrBytes::from_string)
959                    .map(Into::into)
960                    .collect(),
961            )
962            .with_include_authorized_operations(false);
963        let response: ConsumerGroupDescribeResponse = self
964            .send_request::<ConsumerGroupDescribeRequest>(
965                CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
966                &request,
967            )
968            .await?;
969
970        let mut descriptions = BTreeMap::new();
971        for group in response.groups {
972            let group_id = group.group_id.to_string();
973            if let Some(error) = group.error_code.err() {
974                let message = group
975                    .error_message
976                    .as_ref()
977                    .map(ToString::to_string)
978                    .filter(|message| !message.is_empty())
979                    .unwrap_or_else(|| error.to_string());
980                return Err(
981                    anyhow!("describe consumer group '{group_id}' failed: {message}").into(),
982                );
983            }
984
985            descriptions.insert(
986                group_id.clone(),
987                ConsumerGroupDescription {
988                    group_id,
989                    state: group.group_state.to_string(),
990                    protocol_type: "consumer".to_owned(),
991                    protocol_data: group.assignor_name.to_string(),
992                    members: group
993                        .members
994                        .into_iter()
995                        .map(|member| ConsumerGroupMemberDescription {
996                            member_id: member.member_id.to_string(),
997                            group_instance_id: member
998                                .instance_id
999                                .map(|instance_id| instance_id.to_string()),
1000                            client_id: member.client_id.to_string(),
1001                            client_host: member.client_host.to_string(),
1002                            member_metadata_bytes: member.subscribed_topic_names.len(),
1003                            member_assignment_bytes: assignment_partition_count(&member.assignment),
1004                        })
1005                        .collect(),
1006                    authorized_operations: (group.authorized_operations != i32::MIN)
1007                        .then_some(group.authorized_operations),
1008                },
1009            );
1010        }
1011
1012        group_ids
1013            .into_iter()
1014            .map(|group_id| {
1015                descriptions.remove(&group_id).ok_or_else(|| {
1016                    anyhow!("describe groups response did not include group '{group_id}'").into()
1017                })
1018            })
1019            .collect()
1020    }
1021
1022    #[instrument(
1023        name = "admin.describe_classic_groups",
1024        level = "debug",
1025        skip(self, groups)
1026    )]
1027    /// Describe Classic Groups.
1028    pub async fn describe_classic_groups<I, S>(
1029        &self,
1030        groups: I,
1031    ) -> Result<Vec<ConsumerGroupDescription>>
1032    where
1033        I: IntoIterator<Item = S>,
1034        S: Into<String>,
1035    {
1036        let group_ids = groups
1037            .into_iter()
1038            .map(|group| validate_group_id(group.into()))
1039            .collect::<Result<Vec<_>>>()?;
1040        if group_ids.is_empty() {
1041            return Ok(Vec::new());
1042        }
1043
1044        let request = DescribeGroupsRequest::default()
1045            .with_groups(
1046                group_ids
1047                    .iter()
1048                    .cloned()
1049                    .map(StrBytes::from_string)
1050                    .map(Into::into)
1051                    .collect(),
1052            )
1053            .with_include_authorized_operations(false);
1054        let response: DescribeGroupsResponse = self
1055            .send_request::<DescribeGroupsRequest>(DESCRIBE_GROUPS_VERSION_CAP, &request)
1056            .await?;
1057
1058        let mut descriptions = BTreeMap::new();
1059        for group in response.groups {
1060            let group_id = group.group_id.to_string();
1061            if let Some(error) = group.error_code.err() {
1062                let message = group
1063                    .error_message
1064                    .as_ref()
1065                    .map(ToString::to_string)
1066                    .filter(|message| !message.is_empty())
1067                    .unwrap_or_else(|| error.to_string());
1068                return Err(
1069                    anyhow!("describe classic group '{group_id}' failed: {message}").into(),
1070                );
1071            }
1072
1073            descriptions.insert(
1074                group_id.clone(),
1075                ConsumerGroupDescription {
1076                    group_id,
1077                    state: group.group_state.to_string(),
1078                    protocol_type: group.protocol_type.to_string(),
1079                    protocol_data: group.protocol_data.to_string(),
1080                    members: group
1081                        .members
1082                        .into_iter()
1083                        .map(|member| {
1084                            let member_metadata_bytes =
1085                                classic_subscription_topic_count(&member.member_metadata);
1086                            let member_assignment_bytes =
1087                                classic_assignment_partition_count(&member.member_assignment);
1088                            ConsumerGroupMemberDescription {
1089                                member_id: member.member_id.to_string(),
1090                                group_instance_id: member
1091                                    .group_instance_id
1092                                    .map(|instance_id| instance_id.to_string()),
1093                                client_id: member.client_id.to_string(),
1094                                client_host: member.client_host.to_string(),
1095                                member_metadata_bytes,
1096                                member_assignment_bytes,
1097                            }
1098                        })
1099                        .collect(),
1100                    authorized_operations: (group.authorized_operations != i32::MIN)
1101                        .then_some(group.authorized_operations),
1102                },
1103            );
1104        }
1105
1106        group_ids
1107            .into_iter()
1108            .map(|group_id| {
1109                descriptions.remove(&group_id).ok_or_else(|| {
1110                    anyhow!("describe classic groups response did not include group '{group_id}'")
1111                        .into()
1112                })
1113            })
1114            .collect()
1115    }
1116
1117    #[instrument(
1118        name = "admin.describe_share_groups",
1119        level = "debug",
1120        skip(self, groups)
1121    )]
1122    /// Describe Share Groups.
1123    pub async fn describe_share_groups<I, S>(
1124        &self,
1125        groups: I,
1126    ) -> Result<Vec<ConsumerGroupDescription>>
1127    where
1128        I: IntoIterator<Item = S>,
1129        S: Into<String>,
1130    {
1131        let group_ids = groups
1132            .into_iter()
1133            .map(|group| validate_group_id(group.into()))
1134            .collect::<Result<Vec<_>>>()?;
1135        if group_ids.is_empty() {
1136            return Ok(Vec::new());
1137        }
1138
1139        let request = ShareGroupDescribeRequest::default()
1140            .with_group_ids(
1141                group_ids
1142                    .iter()
1143                    .cloned()
1144                    .map(StrBytes::from_string)
1145                    .map(Into::into)
1146                    .collect(),
1147            )
1148            .with_include_authorized_operations(false);
1149        let response: ShareGroupDescribeResponse = self
1150            .send_request::<ShareGroupDescribeRequest>(SHARE_GROUP_DESCRIBE_VERSION_CAP, &request)
1151            .await?;
1152
1153        let mut descriptions = BTreeMap::new();
1154        for group in response.groups {
1155            let group_id = group.group_id.to_string();
1156            if let Some(error) = group.error_code.err() {
1157                let message = group
1158                    .error_message
1159                    .as_ref()
1160                    .map(ToString::to_string)
1161                    .filter(|message| !message.is_empty())
1162                    .unwrap_or_else(|| error.to_string());
1163                return Err(anyhow!("describe share group '{group_id}' failed: {message}").into());
1164            }
1165
1166            descriptions.insert(
1167                group_id.clone(),
1168                ConsumerGroupDescription {
1169                    group_id,
1170                    state: group.group_state.to_string(),
1171                    protocol_type: "share".to_owned(),
1172                    protocol_data: group.assignor_name.to_string(),
1173                    members: group
1174                        .members
1175                        .into_iter()
1176                        .map(|member| ConsumerGroupMemberDescription {
1177                            member_id: member.member_id.to_string(),
1178                            group_instance_id: None,
1179                            client_id: member.client_id.to_string(),
1180                            client_host: member.client_host.to_string(),
1181                            member_metadata_bytes: member.subscribed_topic_names.len(),
1182                            member_assignment_bytes: share_assignment_partition_count(
1183                                &member.assignment,
1184                            ),
1185                        })
1186                        .collect(),
1187                    authorized_operations: (group.authorized_operations != i32::MIN)
1188                        .then_some(group.authorized_operations),
1189                },
1190            );
1191        }
1192
1193        group_ids
1194            .into_iter()
1195            .map(|group_id| {
1196                descriptions.remove(&group_id).ok_or_else(|| {
1197                    anyhow!("describe share groups response did not include group '{group_id}'")
1198                        .into()
1199                })
1200            })
1201            .collect()
1202    }
1203
1204    #[instrument(name = "admin.delete_groups", level = "debug", skip(self, groups))]
1205    /// Delete Groups.
1206    pub async fn delete_groups<I, S>(&self, groups: I) -> Result<()>
1207    where
1208        I: IntoIterator<Item = S>,
1209        S: Into<String>,
1210    {
1211        let group_ids = groups
1212            .into_iter()
1213            .map(|group| validate_group_id(group.into()))
1214            .collect::<Result<Vec<_>>>()?;
1215        if group_ids.is_empty() {
1216            return Ok(());
1217        }
1218
1219        let request = DeleteGroupsRequest::default().with_groups_names(
1220            group_ids
1221                .iter()
1222                .cloned()
1223                .map(StrBytes::from_string)
1224                .map(Into::into)
1225                .collect(),
1226        );
1227        let response: DeleteGroupsResponse = self
1228            .send_request::<DeleteGroupsRequest>(DELETE_GROUPS_VERSION_CAP, &request)
1229            .await?;
1230
1231        for result in response.results {
1232            if let Some(error) = result.error_code.err() {
1233                return Err(anyhow!("delete group '{}' failed: {error}", &*result.group_id).into());
1234            }
1235        }
1236        Ok(())
1237    }
1238
1239    #[instrument(
1240        name = "admin.delete_consumer_groups",
1241        level = "debug",
1242        skip(self, groups)
1243    )]
1244    /// Delete Consumer Groups.
1245    pub async fn delete_consumer_groups<I, S>(&self, groups: I) -> Result<()>
1246    where
1247        I: IntoIterator<Item = S>,
1248        S: Into<String>,
1249    {
1250        self.delete_groups(groups).await
1251    }
1252
1253    #[instrument(
1254        name = "admin.describe_configs",
1255        level = "debug",
1256        skip(self, resources)
1257    )]
1258    /// Describe Configs.
1259    pub async fn describe_configs<I>(&self, resources: I) -> Result<Vec<ConfigResourceConfig>>
1260    where
1261        I: IntoIterator<Item = ConfigResource>,
1262    {
1263        let resources = resources.into_iter().collect::<Vec<_>>();
1264        if resources.is_empty() {
1265            return Ok(Vec::new());
1266        }
1267
1268        let request = DescribeConfigsRequest::default()
1269            .with_resources(
1270                resources
1271                    .iter()
1272                    .map(|resource| {
1273                        DescribeConfigsResource::default()
1274                            .with_resource_type(resource.resource_type.as_protocol_value())
1275                            .with_resource_name(StrBytes::from_string(
1276                                resource.resource_name.clone(),
1277                            ))
1278                            .with_configuration_keys(None)
1279                    })
1280                    .collect(),
1281            )
1282            .with_include_synonyms(false);
1283        let response: DescribeConfigsResponse = self
1284            .send_request::<DescribeConfigsRequest>(DESCRIBE_CONFIGS_VERSION_CAP, &request)
1285            .await?;
1286
1287        let mut described = BTreeMap::new();
1288        for resource in response.results {
1289            let resource_type = ConfigResourceType::from_protocol_value(resource.resource_type);
1290            let resource_name = resource.resource_name.to_string();
1291            if let Some(error) = resource.error_code.err() {
1292                return Err(anyhow!(
1293                    "describe configs for {:?} '{}' failed: {}",
1294                    resource_type,
1295                    resource_name,
1296                    resource
1297                        .error_message
1298                        .as_ref()
1299                        .map(|message| message.to_string())
1300                        .filter(|message| !message.is_empty())
1301                        .unwrap_or_else(|| error.to_string())
1302                )
1303                .into());
1304            }
1305
1306            let entries = resource
1307                .configs
1308                .into_iter()
1309                .map(|entry| {
1310                    let name = entry.name.to_string();
1311                    let config_entry = ConfigEntry {
1312                        name: name.clone(),
1313                        value: entry.value.map(|value| value.to_string()),
1314                        read_only: entry.read_only,
1315                        config_source: entry.config_source,
1316                        is_sensitive: entry.is_sensitive,
1317                        config_type: (response_supported_config_type(entry.config_type))
1318                            .then_some(entry.config_type),
1319                        documentation: entry.documentation.map(|doc| doc.to_string()),
1320                    };
1321                    (name, config_entry)
1322                })
1323                .collect();
1324
1325            described.insert(
1326                (resource_type, resource_name.clone()),
1327                ConfigResourceConfig {
1328                    resource: ConfigResource::new(resource_type, resource_name),
1329                    entries,
1330                },
1331            );
1332        }
1333
1334        resources
1335            .into_iter()
1336            .map(|resource| {
1337                described
1338                    .remove(&(resource.resource_type, resource.resource_name.clone()))
1339                    .ok_or_else(|| {
1340                        anyhow!("describe configs response did not include {:?}", resource).into()
1341                    })
1342            })
1343            .collect()
1344    }
1345
1346    #[instrument(
1347        name = "admin.incremental_alter_configs",
1348        level = "debug",
1349        skip(self, resources)
1350    )]
1351    /// Incremental Alter Configs.
1352    pub async fn incremental_alter_configs<I>(&self, resources: I) -> Result<()>
1353    where
1354        I: IntoIterator<Item = (ConfigResource, Vec<AlterConfigOp>)>,
1355    {
1356        let resources = resources
1357            .into_iter()
1358            .map(|(resource, ops)| {
1359                AlterConfigsResource::default()
1360                    .with_resource_type(resource.resource_type.as_protocol_value())
1361                    .with_resource_name(StrBytes::from_string(resource.resource_name))
1362                    .with_configs(
1363                        ops.into_iter()
1364                            .map(|op| {
1365                                AlterableConfig::default()
1366                                    .with_name(StrBytes::from_string(op.name))
1367                                    .with_config_operation(op.op_type.as_protocol_value())
1368                                    .with_value(op.value.map(StrBytes::from_string))
1369                            })
1370                            .collect(),
1371                    )
1372            })
1373            .collect::<Vec<_>>();
1374        if resources.is_empty() {
1375            return Ok(());
1376        }
1377
1378        let request = IncrementalAlterConfigsRequest::default()
1379            .with_resources(resources)
1380            .with_validate_only(false);
1381        let response: IncrementalAlterConfigsResponse = self
1382            .send_request::<IncrementalAlterConfigsRequest>(
1383                INCREMENTAL_ALTER_CONFIGS_VERSION_CAP,
1384                &request,
1385            )
1386            .await?;
1387
1388        for resource in response.responses {
1389            if let Some(error) = resource.error_code.err() {
1390                return Err(anyhow!(
1391                    "incremental alter configs for {:?} '{}' failed: {}",
1392                    ConfigResourceType::from_protocol_value(resource.resource_type),
1393                    resource.resource_name,
1394                    resource
1395                        .error_message
1396                        .as_ref()
1397                        .map(|message| message.to_string())
1398                        .filter(|message| !message.is_empty())
1399                        .unwrap_or_else(|| error.to_string())
1400                )
1401                .into());
1402            }
1403        }
1404
1405        Ok(())
1406    }
1407
1408    #[instrument(
1409        name = "admin.upsert_scram_credential",
1410        level = "debug",
1411        skip(self, user, password)
1412    )]
1413    /// Upsert Scram Credential.
1414    pub async fn upsert_scram_credential(
1415        &self,
1416        user: impl Into<String>,
1417        mechanism: SaslMechanism,
1418        password: impl AsRef<[u8]>,
1419    ) -> Result<()> {
1420        self.upsert_scram_credential_with_iterations(
1421            user,
1422            mechanism,
1423            password,
1424            scram::MIN_ITERATIONS,
1425        )
1426        .await
1427    }
1428
1429    #[instrument(
1430        name = "admin.upsert_scram_credential_with_iterations",
1431        level = "debug",
1432        skip(self, user, password)
1433    )]
1434    /// Upsert Scram Credential With Iterations.
1435    pub async fn upsert_scram_credential_with_iterations(
1436        &self,
1437        user: impl Into<String>,
1438        mechanism: SaslMechanism,
1439        password: impl AsRef<[u8]>,
1440        iterations: i32,
1441    ) -> Result<()> {
1442        let user = user.into();
1443        let mechanism_type = mechanism
1444            .scram_type()
1445            .ok_or_else(|| anyhow!("SCRAM credential upsertion requires a SCRAM mechanism"))?;
1446        let salt = scram::secure_random_bytes()?;
1447        let salted_password =
1448            scram::salted_password(mechanism, password.as_ref(), &salt, iterations)?;
1449        let request = AlterUserScramCredentialsRequest::default().with_upsertions(vec![
1450            ScramCredentialUpsertion::default()
1451                .with_name(StrBytes::from_string(user.clone()))
1452                .with_mechanism(mechanism_type)
1453                .with_iterations(iterations)
1454                .with_salt(Bytes::from(salt))
1455                .with_salted_password(Bytes::from(salted_password)),
1456        ]);
1457        let response: AlterUserScramCredentialsResponse = self
1458            .send_request::<AlterUserScramCredentialsRequest>(
1459                ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP,
1460                &request,
1461            )
1462            .await?;
1463
1464        for result in response.results {
1465            if let Some(error) = result.error_code.err() {
1466                return Err(anyhow!(
1467                    "alter SCRAM credential for user '{}' failed: {}",
1468                    result.user,
1469                    result
1470                        .error_message
1471                        .as_ref()
1472                        .map(|message| message.to_string())
1473                        .filter(|message| !message.is_empty())
1474                        .unwrap_or_else(|| error.to_string())
1475                )
1476                .into());
1477            }
1478        }
1479
1480        Ok(())
1481    }
1482
1483    /// Config.
1484    pub fn config(&self) -> &AdminConfig {
1485        &self.config
1486    }
1487
1488    /// Finalized Feature Levels.
1489    pub async fn finalized_feature_levels(&self) -> Result<Vec<BrokerFeatureLevel>> {
1490        let connection = connect_to_any_bootstrap(
1491            &self.config.bootstrap_servers,
1492            &self.config.client_id,
1493            self.config.request_timeout,
1494            self.config.security_protocol,
1495            &self.config.tls,
1496            &self.config.sasl,
1497            &self.config.tcp_connector,
1498        )
1499        .await?;
1500        Ok(connection
1501            .finalized_feature_levels()
1502            .into_iter()
1503            .map(|(name, level)| BrokerFeatureLevel { name, level })
1504            .collect())
1505    }
1506
1507    /// Updates finalized broker feature levels.
1508    pub async fn update_features<I>(&self, updates: I) -> Result<()>
1509    where
1510        I: IntoIterator<Item = FeatureUpdate>,
1511    {
1512        self.update_features_inner(updates, false).await
1513    }
1514
1515    /// Validates finalized broker feature updates without applying them.
1516    pub async fn validate_feature_updates<I>(&self, updates: I) -> Result<()>
1517    where
1518        I: IntoIterator<Item = FeatureUpdate>,
1519    {
1520        self.update_features_inner(updates, true).await
1521    }
1522
1523    async fn update_features_inner<I>(&self, updates: I, validate_only: bool) -> Result<()>
1524    where
1525        I: IntoIterator<Item = FeatureUpdate>,
1526    {
1527        let (mut connection, version) = self
1528            .connect_with_version::<UpdateFeaturesRequest>(UPDATE_FEATURES_VERSION_CAP)
1529            .await?;
1530        let feature_updates = updates
1531            .into_iter()
1532            .map(|update| update.into_request_update(version))
1533            .collect::<Result<Vec<_>>>()?;
1534        if feature_updates.is_empty() {
1535            return Ok(());
1536        }
1537
1538        let mut request = UpdateFeaturesRequest::default()
1539            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
1540            .with_feature_updates(feature_updates);
1541        if version >= 1 {
1542            request = request.with_validate_only(validate_only);
1543        } else if validate_only {
1544            return Err(anyhow!("validate-only feature updates require UpdateFeatures v1+").into());
1545        }
1546
1547        let response: UpdateFeaturesResponse = connection
1548            .send_request::<UpdateFeaturesRequest>(&self.config.client_id, version, &request)
1549            .await?;
1550        if let Some(error) = response.error_code.err() {
1551            return Err(anyhow!(
1552                "update features failed: {}",
1553                response
1554                    .error_message
1555                    .as_ref()
1556                    .map(|message| message.to_string())
1557                    .filter(|message| !message.is_empty())
1558                    .unwrap_or_else(|| error.to_string())
1559            )
1560            .into());
1561        }
1562
1563        for result in response.results {
1564            if let Some(error) = result.error_code.err() {
1565                let feature = result.feature.to_string();
1566                return Err(anyhow!(
1567                    "update feature '{feature}' failed: {}",
1568                    result
1569                        .error_message
1570                        .as_ref()
1571                        .map(|message| message.to_string())
1572                        .filter(|message| !message.is_empty())
1573                        .unwrap_or_else(|| error.to_string())
1574                )
1575                .into());
1576            }
1577        }
1578
1579        Ok(())
1580    }
1581
1582    async fn warm_up(&self) -> Result<()> {
1583        let _ = connect_to_any_bootstrap(
1584            &self.config.bootstrap_servers,
1585            &self.config.client_id,
1586            self.config.request_timeout,
1587            self.config.security_protocol,
1588            &self.config.tls,
1589            &self.config.sasl,
1590            &self.config.tcp_connector,
1591        )
1592        .await?;
1593        Ok(())
1594    }
1595
1596    async fn fetch_metadata(&self, topics: Option<&[String]>) -> Result<MetadataResponse> {
1597        let (mut connection, version) = self
1598            .connect_with_version::<MetadataRequest>(METADATA_VERSION_CAP)
1599            .await?;
1600        let request = MetadataRequest::default()
1601            .with_topics(topics.map(|topics| {
1602                topics
1603                    .iter()
1604                    .cloned()
1605                    .map(StrBytes::from_string)
1606                    .map(|name| MetadataRequestTopic::default().with_name(Some(name.into())))
1607                    .collect()
1608            }))
1609            .with_allow_auto_topic_creation(false)
1610            .with_include_cluster_authorized_operations(false)
1611            .with_include_topic_authorized_operations(false);
1612        Ok(connection
1613            .send_request::<MetadataRequest>(&self.config.client_id, version, &request)
1614            .await?)
1615    }
1616
1617    async fn send_request<Req>(&self, version_cap: i16, request: &Req) -> Result<Req::Response>
1618    where
1619        Req: Request,
1620    {
1621        let (mut connection, version) = self.connect_with_version::<Req>(version_cap).await?;
1622        Ok(connection
1623            .send_request::<Req>(&self.config.client_id, version, request)
1624            .await?)
1625    }
1626
1627    async fn connect_with_version<Req>(&self, version_cap: i16) -> Result<(BrokerConnection, i16)>
1628    where
1629        Req: Request,
1630    {
1631        let connection = connect_to_any_bootstrap(
1632            &self.config.bootstrap_servers,
1633            &self.config.client_id,
1634            self.config.request_timeout,
1635            self.config.security_protocol,
1636            &self.config.tls,
1637            &self.config.sasl,
1638            &self.config.tcp_connector,
1639        )
1640        .await?;
1641        let version = connection.version_with_cap::<Req>(version_cap)?;
1642        Ok((connection, version))
1643    }
1644}
1645
1646fn validate_topic_name(topic: String) -> Result<String> {
1647    let topic = topic.trim();
1648    if topic.is_empty() {
1649        return Err(AdminError::EmptyTopicName.into());
1650    }
1651
1652    Ok(topic.to_owned())
1653}
1654
1655fn validate_group_id(group_id: String) -> Result<String> {
1656    let group_id = group_id.trim();
1657    if group_id.is_empty() {
1658        return Err(anyhow!("consumer group id cannot be empty").into());
1659    }
1660
1661    Ok(group_id.to_owned())
1662}
1663
1664fn validate_feature_name(feature: String) -> Result<String> {
1665    let feature = feature.trim();
1666    if feature.is_empty() {
1667        return Err(anyhow!("feature names must be non-empty").into());
1668    }
1669
1670    Ok(feature.to_owned())
1671}
1672
1673fn assignment_partition_count(
1674    assignment: &kafka_protocol::messages::consumer_group_describe_response::Assignment,
1675) -> usize {
1676    assignment
1677        .topic_partitions
1678        .iter()
1679        .map(|topic| topic.partitions.len())
1680        .sum()
1681}
1682
1683fn share_assignment_partition_count(
1684    assignment: &kafka_protocol::messages::share_group_describe_response::Assignment,
1685) -> usize {
1686    assignment
1687        .topic_partitions
1688        .iter()
1689        .map(|topic| topic.partitions.len())
1690        .sum()
1691}
1692
1693fn classic_subscription_topic_count(metadata: &Bytes) -> usize {
1694    let Some((version, mut body)) = classic_protocol_body(metadata) else {
1695        return 0;
1696    };
1697    ConsumerProtocolSubscription::decode(&mut body, version)
1698        .map(|subscription| subscription.topics.len())
1699        .unwrap_or(0)
1700}
1701
1702fn classic_assignment_partition_count(assignment: &Bytes) -> usize {
1703    let Some((version, mut body)) = classic_protocol_body(assignment) else {
1704        return 0;
1705    };
1706    ConsumerProtocolAssignment::decode(&mut body, version)
1707        .map(|assignment| {
1708            assignment
1709                .assigned_partitions
1710                .iter()
1711                .map(|topic| topic.partitions.len())
1712                .sum()
1713        })
1714        .unwrap_or(0)
1715}
1716
1717fn classic_protocol_body(bytes: &Bytes) -> Option<(i16, Bytes)> {
1718    let mut body = bytes.clone();
1719    if body.remaining() < 2 {
1720        return None;
1721    }
1722    let version = body.get_i16();
1723    Some((version, body))
1724}
1725
1726fn is_ignorable_create_topic_error(error: ResponseError) -> bool {
1727    error == ResponseError::TopicAlreadyExists
1728}
1729
1730fn response_supported_config_type(config_type: i8) -> bool {
1731    config_type >= 0
1732}
1733
1734#[cfg(test)]
1735mod tests {
1736    use super::*;
1737
1738    #[test]
1739    fn new_topic_maps_to_create_topics_request() {
1740        let topic = NewTopic::new("orders", 3, 2)
1741            .with_config("cleanup.policy", "compact")
1742            .into_request_topic()
1743            .expect("topic should be valid");
1744
1745        assert_eq!(topic.name.0.to_string(), "orders");
1746        assert_eq!(topic.num_partitions, 3);
1747        assert_eq!(topic.replication_factor, 2);
1748        assert_eq!(topic.configs.len(), 1);
1749        assert_eq!(topic.configs[0].name.to_string(), "cleanup.policy");
1750    }
1751
1752    #[test]
1753    fn new_topic_rejects_invalid_partition_count() {
1754        let error = NewTopic::new("orders", 0, 1)
1755            .into_request_topic()
1756            .expect_err("invalid topic should fail");
1757        assert!(
1758            error
1759                .to_string()
1760                .contains("topic partition count must be positive")
1761        );
1762    }
1763
1764    #[test]
1765    fn new_topic_rejects_empty_names_and_invalid_replication_factor() {
1766        let error = NewTopic::new("  ", 1, 1).into_request_topic().unwrap_err();
1767        assert!(matches!(
1768            error,
1769            crate::Error::Admin(AdminError::EmptyTopicName)
1770        ));
1771
1772        let error = NewTopic::new("orders", 1, 0)
1773            .into_request_topic()
1774            .unwrap_err();
1775        assert!(matches!(
1776            error,
1777            crate::Error::Admin(AdminError::InvalidReplicationFactor {
1778                replication_factor: 0
1779            })
1780        ));
1781    }
1782
1783    #[test]
1784    fn new_partitions_maps_assignments_and_rejects_invalid_input() {
1785        let topic = NewPartitions::increase_to(4)
1786            .with_assignment([1, 2])
1787            .with_assignment([2, 3])
1788            .into_request_topic("orders".to_owned())
1789            .unwrap();
1790        assert_eq!(topic.name.to_string(), "orders");
1791        assert_eq!(topic.count, 4);
1792        assert_eq!(topic.assignments.unwrap().len(), 2);
1793
1794        let error = NewPartitions::increase_to(0)
1795            .into_request_topic("orders".to_owned())
1796            .unwrap_err();
1797        assert!(matches!(
1798            error,
1799            crate::Error::Admin(AdminError::InvalidPartitionCount { partitions: 0 })
1800        ));
1801
1802        let error = NewPartitions::increase_to(1)
1803            .into_request_topic(" ".to_owned())
1804            .unwrap_err();
1805        assert!(matches!(
1806            error,
1807            crate::Error::Admin(AdminError::EmptyTopicName)
1808        ));
1809    }
1810
1811    #[test]
1812    fn config_resource_and_operation_protocol_values_are_stable() {
1813        assert_eq!(ConfigResourceType::Unknown.as_protocol_value(), 0);
1814        assert_eq!(ConfigResourceType::Topic.as_protocol_value(), 2);
1815        assert_eq!(ConfigResourceType::Broker.as_protocol_value(), 4);
1816        assert_eq!(ConfigResourceType::BrokerLogger.as_protocol_value(), 8);
1817        assert_eq!(ConfigResourceType::ClientMetrics.as_protocol_value(), 16);
1818        assert_eq!(ConfigResourceType::Group.as_protocol_value(), 32);
1819        assert_eq!(
1820            ConfigResourceType::from_protocol_value(2),
1821            ConfigResourceType::Topic
1822        );
1823        assert_eq!(
1824            ConfigResourceType::from_protocol_value(99),
1825            ConfigResourceType::Unknown
1826        );
1827
1828        assert_eq!(
1829            AlterConfigOp::set("cleanup.policy", "compact")
1830                .op_type
1831                .as_protocol_value(),
1832            0
1833        );
1834        assert_eq!(
1835            AlterConfigOp::delete("retention.ms")
1836                .op_type
1837                .as_protocol_value(),
1838            1
1839        );
1840        assert_eq!(AlterConfigOpType::Append.as_protocol_value(), 2);
1841        assert_eq!(AlterConfigOpType::Subtract.as_protocol_value(), 3);
1842        assert!(response_supported_config_type(0));
1843        assert!(!response_supported_config_type(-1));
1844    }
1845
1846    #[test]
1847    fn config_resource_config_entry_lookup_returns_named_entry() {
1848        let mut entries = BTreeMap::new();
1849        entries.insert(
1850            "cleanup.policy".to_owned(),
1851            ConfigEntry {
1852                name: "cleanup.policy".to_owned(),
1853                value: Some("compact".to_owned()),
1854                read_only: false,
1855                config_source: 1,
1856                is_sensitive: false,
1857                config_type: Some(2),
1858                documentation: None,
1859            },
1860        );
1861        let config = ConfigResourceConfig {
1862            resource: ConfigResource::topic("orders"),
1863            entries,
1864        };
1865
1866        assert_eq!(
1867            config.entry("cleanup.policy").unwrap().value.as_deref(),
1868            Some("compact")
1869        );
1870        assert!(config.entry("missing").is_none());
1871    }
1872
1873    #[test]
1874    fn feature_update_maps_to_modern_update_features_request() {
1875        let update = FeatureUpdate::upgrade("share.version", 1)
1876            .into_request_update(2)
1877            .unwrap();
1878        assert_eq!(update.feature.to_string(), "share.version");
1879        assert_eq!(update.max_version_level, 1);
1880        assert_eq!(update.upgrade_type, 1);
1881        assert!(!update.allow_downgrade);
1882    }
1883
1884    #[test]
1885    fn feature_update_maps_to_legacy_downgrade_flag() {
1886        let update = FeatureUpdate::safe_downgrade("metadata.version", 0)
1887            .into_request_update(0)
1888            .unwrap();
1889        assert_eq!(update.feature.to_string(), "metadata.version");
1890        assert_eq!(update.max_version_level, 0);
1891        assert_eq!(update.upgrade_type, 1);
1892        assert!(update.allow_downgrade);
1893    }
1894
1895    #[test]
1896    fn feature_update_rejects_empty_name() {
1897        let error = FeatureUpdate::upgrade(" ", 1)
1898            .into_request_update(2)
1899            .unwrap_err();
1900        assert!(
1901            error
1902                .to_string()
1903                .contains("feature names must be non-empty")
1904        );
1905    }
1906
1907    #[test]
1908    fn topic_already_exists_is_ignorable() {
1909        assert!(is_ignorable_create_topic_error(
1910            ResponseError::TopicAlreadyExists
1911        ));
1912        assert!(!is_ignorable_create_topic_error(
1913            ResponseError::UnknownTopicOrPartition
1914        ));
1915    }
1916}