Skip to main content

kafkit_client/admin/
mod.rs

1//! Admin client operations for topics, groups, configs, and cluster metadata.
2//!
3//! ```no_run
4//! # async fn example() -> kafkit_client::Result<()> {
5//! use kafkit_client::{KafkaClient, NewTopic};
6//!
7//! let admin = KafkaClient::new("localhost:9092").admin().connect().await?;
8//! admin
9//!     .create_topics([NewTopic::new("orders", 3, 1)])
10//!     .await?;
11//! # Ok(())
12//! # }
13//! ```
14//!
15use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use bytes::Bytes;
19use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
20use kafka_protocol::messages::alter_user_scram_credentials_request::ScramCredentialUpsertion;
21use kafka_protocol::messages::create_partitions_request::{
22    CreatePartitionsAssignment, CreatePartitionsTopic,
23};
24use kafka_protocol::messages::create_topics_request::{CreatableTopic, CreatableTopicConfig};
25use kafka_protocol::messages::describe_configs_request::DescribeConfigsResource;
26use kafka_protocol::messages::incremental_alter_configs_request::{
27    AlterConfigsResource, AlterableConfig,
28};
29use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
30use kafka_protocol::messages::{
31    AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse,
32    ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, CreatePartitionsRequest,
33    CreatePartitionsResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteGroupsRequest,
34    DeleteGroupsResponse, DeleteTopicsRequest, DeleteTopicsResponse, DescribeClusterRequest,
35    DescribeClusterResponse, DescribeConfigsRequest, DescribeConfigsResponse,
36    IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse, ListGroupsRequest,
37    ListGroupsResponse, MetadataRequest, MetadataResponse,
38};
39use kafka_protocol::protocol::{Request, StrBytes};
40use tracing::{debug, instrument};
41use uuid::Uuid;
42
43use crate::config::{AdminConfig, SaslMechanism};
44use crate::constants::{
45    ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP, CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
46    CREATE_PARTITIONS_VERSION_CAP, CREATE_TOPICS_VERSION_CAP, DELETE_GROUPS_VERSION_CAP,
47    DELETE_TOPICS_VERSION_CAP, DESCRIBE_CLUSTER_VERSION_CAP, DESCRIBE_CONFIGS_VERSION_CAP,
48    INCREMENTAL_ALTER_CONFIGS_VERSION_CAP, LIST_GROUPS_VERSION_CAP, METADATA_VERSION_CAP,
49};
50use crate::network::scram;
51use crate::network::{BrokerConnection, connect_to_any_bootstrap, duration_to_i32_ms};
52use crate::{AdminError, Result};
53
54#[derive(Debug, Clone)]
55/// New Topic.
56pub struct NewTopic {
57    /// Name returned by Kafka.
58    pub name: String,
59    /// Num Partitions.
60    pub num_partitions: i32,
61    /// Replication Factor.
62    pub replication_factor: i16,
63    /// Topic-level configuration entries.
64    pub configs: BTreeMap<String, String>,
65}
66
67impl NewTopic {
68    /// Creates a new value.
69    pub fn new(name: impl Into<String>, num_partitions: i32, replication_factor: i16) -> Self {
70        Self {
71            name: name.into(),
72            num_partitions,
73            replication_factor,
74            configs: BTreeMap::new(),
75        }
76    }
77
78    /// Sets config and returns the updated value.
79    pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
80        self.configs.insert(key.into(), value.into());
81        self
82    }
83
84    fn into_request_topic(self) -> Result<CreatableTopic> {
85        let name = validate_topic_name(self.name)?;
86        if self.num_partitions <= 0 {
87            return Err(AdminError::InvalidPartitionCount {
88                partitions: self.num_partitions,
89            }
90            .into());
91        }
92        if self.replication_factor <= 0 {
93            return Err(AdminError::InvalidReplicationFactor {
94                replication_factor: self.replication_factor,
95            }
96            .into());
97        }
98
99        let configs = self
100            .configs
101            .into_iter()
102            .map(|(key, value)| {
103                CreatableTopicConfig::default()
104                    .with_name(StrBytes::from_string(key))
105                    .with_value(Some(StrBytes::from_string(value)))
106            })
107            .collect();
108
109        Ok(CreatableTopic::default()
110            .with_name(StrBytes::from_string(name).into())
111            .with_num_partitions(self.num_partitions)
112            .with_replication_factor(self.replication_factor)
113            .with_configs(configs))
114    }
115}
116
117#[derive(Debug, Clone, PartialEq, Eq)]
118/// New Partitions.
119pub struct NewPartitions {
120    /// Total Count.
121    pub total_count: i32,
122    /// Assignments.
123    pub assignments: Vec<Vec<i32>>,
124}
125
126impl NewPartitions {
127    /// Increase To.
128    pub fn increase_to(total_count: i32) -> Self {
129        Self {
130            total_count,
131            assignments: Vec::new(),
132        }
133    }
134
135    /// Sets assignment and returns the updated value.
136    pub fn with_assignment<I>(mut self, broker_ids: I) -> Self
137    where
138        I: IntoIterator<Item = i32>,
139    {
140        self.assignments.push(broker_ids.into_iter().collect());
141        self
142    }
143
144    fn into_request_topic(self, topic_name: String) -> Result<CreatePartitionsTopic> {
145        let name = validate_topic_name(topic_name)?;
146        if self.total_count <= 0 {
147            return Err(AdminError::InvalidPartitionCount {
148                partitions: self.total_count,
149            }
150            .into());
151        }
152
153        let assignments = (!self.assignments.is_empty()).then(|| {
154            self.assignments
155                .into_iter()
156                .map(|broker_ids| {
157                    CreatePartitionsAssignment::default()
158                        .with_broker_ids(broker_ids.into_iter().map(Into::into).collect())
159                })
160                .collect()
161        });
162
163        Ok(CreatePartitionsTopic::default()
164            .with_name(StrBytes::from_string(name).into())
165            .with_count(self.total_count)
166            .with_assignments(assignments))
167    }
168}
169
#[derive(Debug, Clone, PartialEq, Eq)]
/// Summary of one topic, as produced by [`KafkaAdmin::list_topics`].
pub struct TopicListing {
    /// Topic name taken from the metadata response.
    pub name: String,
    /// Topic id from the metadata response, or `None` when that id is the nil UUID.
    pub topic_id: Option<Uuid>,
    /// Internal-topic flag copied from the metadata response.
    pub is_internal: bool,
}
180
#[derive(Debug, Clone, PartialEq, Eq)]
/// Metadata of a single partition, as produced by [`KafkaAdmin::describe_topics`].
pub struct TopicPartitionDescription {
    /// Partition index within the topic.
    pub partition: i32,
    /// Broker id reported as the partition leader.
    pub leader_id: i32,
    /// Leader epoch reported for the partition.
    pub leader_epoch: i32,
    /// Broker ids listed as replicas of the partition.
    pub replica_nodes: Vec<i32>,
    /// Broker ids listed in the partition's in-sync replica set.
    pub isr_nodes: Vec<i32>,
    /// Broker ids listed as offline replicas of the partition.
    pub offline_replicas: Vec<i32>,
}
197
#[derive(Debug, Clone, PartialEq, Eq)]
/// Detailed description of one topic, as produced by [`KafkaAdmin::describe_topics`].
pub struct TopicDescription {
    /// Topic name taken from the metadata response.
    pub name: String,
    /// Topic id from the metadata response, or `None` when that id is the nil UUID.
    pub topic_id: Option<Uuid>,
    /// Internal-topic flag copied from the metadata response.
    pub is_internal: bool,
    /// Partition descriptions, sorted by partition index.
    pub partitions: Vec<TopicPartitionDescription>,
}
210
#[derive(Debug, Clone, PartialEq, Eq)]
/// One broker entry, as produced by [`KafkaAdmin::describe_cluster`].
pub struct BrokerDescription {
    /// Numeric id of the broker.
    pub broker_id: i32,
    /// Host name reported for the broker.
    pub host: String,
    /// Port reported for the broker.
    pub port: i32,
    /// Rack reported for the broker, if the response carried one.
    pub rack: Option<String>,
    /// Fenced flag copied from the response.
    pub is_fenced: bool,
}
225
#[derive(Debug, Clone, PartialEq, Eq)]
/// Cluster-level information, as produced by [`KafkaAdmin::describe_cluster`].
pub struct ClusterDescription {
    /// Cluster id reported in the response.
    pub cluster_id: String,
    /// Broker id reported as the controller.
    pub controller_id: i32,
    /// Brokers listed in the response, sorted by broker id.
    pub brokers: Vec<BrokerDescription>,
}
236
#[derive(Debug, Clone, PartialEq, Eq)]
/// A named broker feature paired with a level.
///
/// NOTE(review): presumably filled from a broker feature/version response; confirm against
/// the code that constructs this type.
pub struct BrokerFeatureLevel {
    /// Name of the feature.
    pub name: String,
    /// Level associated with the feature.
    pub level: i16,
}
245
#[derive(Debug, Clone, PartialEq, Eq)]
/// Summary of one group, as produced by [`KafkaAdmin::list_consumer_groups`].
pub struct ConsumerGroupListing {
    /// Id of the group.
    pub group_id: String,
    /// Protocol type reported for the group.
    pub protocol_type: String,
    /// Group state, or `None` when the response left the state empty.
    pub state: Option<String>,
    /// Group type, or `None` when the response left the type empty.
    pub group_type: Option<String>,
}
258
#[derive(Debug, Clone, PartialEq, Eq)]
/// Description of one group, as produced by [`KafkaAdmin::describe_consumer_groups`].
pub struct ConsumerGroupDescription {
    /// Id of the group.
    pub group_id: String,
    /// Group state reported in the response.
    pub state: String,
    /// Protocol type; `describe_consumer_groups` always sets this to `"consumer"`.
    pub protocol_type: String,
    /// Protocol data; `describe_consumer_groups` stores the group's assignor name here.
    pub protocol_data: String,
    /// Members of the group.
    pub members: Vec<ConsumerGroupMemberDescription>,
    /// Authorized-operations value from the response, or `None` when the response carried
    /// `i32::MIN`.
    pub authorized_operations: Option<i32>,
}
275
#[derive(Debug, Clone, PartialEq, Eq)]
/// Description of one group member, as produced by [`KafkaAdmin::describe_consumer_groups`].
pub struct ConsumerGroupMemberDescription {
    /// Id of the member.
    pub member_id: String,
    /// Group instance id of the member, if the response carried one.
    pub group_instance_id: Option<String>,
    /// Client id reported for the member.
    pub client_id: String,
    /// Client host reported for the member.
    pub client_host: String,
    /// NOTE(review): despite the name, `describe_consumer_groups` fills this with the number
    /// of subscribed topic names, not a byte length.
    pub member_metadata_bytes: usize,
    /// NOTE(review): despite the name, `describe_consumer_groups` fills this with the result
    /// of `assignment_partition_count`, not a byte length.
    pub member_assignment_bytes: usize,
}
292
/// Kind of entity a configuration resource refers to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum ConfigResourceType {
    /// Fallback for protocol values without a dedicated variant (encoded as `0`).
    Unknown,
    /// A topic (encoded as `2`).
    Topic,
    /// A broker (encoded as `4`).
    Broker,
    /// A broker logger (encoded as `8`).
    BrokerLogger,
    /// Client metrics (encoded as `16`).
    ClientMetrics,
    /// A group (encoded as `32`).
    Group,
}

impl ConfigResourceType {
    /// Returns the numeric code that represents this resource type in protocol messages.
    fn as_protocol_value(self) -> i8 {
        match self {
            Self::Group => 32,
            Self::ClientMetrics => 16,
            Self::BrokerLogger => 8,
            Self::Broker => 4,
            Self::Topic => 2,
            Self::Unknown => 0,
        }
    }

    /// Decodes a numeric protocol code; every code without a dedicated variant (including `0`)
    /// maps to [`ConfigResourceType::Unknown`].
    fn from_protocol_value(value: i8) -> Self {
        // Only the variants that own a dedicated code take part in the reverse lookup.
        let known = [
            Self::Topic,
            Self::Broker,
            Self::BrokerLogger,
            Self::ClientMetrics,
            Self::Group,
        ];
        known
            .iter()
            .copied()
            .find(|candidate| candidate.as_protocol_value() == value)
            .unwrap_or(Self::Unknown)
    }
}
333
334#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
335/// Config Resource.
336pub struct ConfigResource {
337    /// Resource Type.
338    pub resource_type: ConfigResourceType,
339    /// Resource Name.
340    pub resource_name: String,
341}
342
343impl ConfigResource {
344    /// Creates a new value.
345    pub fn new(resource_type: ConfigResourceType, resource_name: impl Into<String>) -> Self {
346        Self {
347            resource_type,
348            resource_name: resource_name.into(),
349        }
350    }
351
352    /// Topic name.
353    pub fn topic(resource_name: impl Into<String>) -> Self {
354        Self::new(ConfigResourceType::Topic, resource_name)
355    }
356
357    /// Group.
358    pub fn group(resource_name: impl Into<String>) -> Self {
359        Self::new(ConfigResourceType::Group, resource_name)
360    }
361}
362
#[derive(Debug, Clone, PartialEq, Eq)]
/// One configuration entry of a described resource.
pub struct ConfigEntry {
    /// Name of the configuration entry.
    pub name: String,
    /// Value of the entry, or `None` when the response carried no value.
    pub value: Option<String>,
    /// Whether the entry is read-only.
    pub read_only: bool,
    /// Numeric code of the entry's config source.
    pub config_source: i8,
    /// Whether the entry is sensitive.
    pub is_sensitive: bool,
    /// Numeric code of the entry's config type, if available.
    pub config_type: Option<i8>,
    /// Documentation text of the entry, if available.
    pub documentation: Option<String>,
}
381
382#[derive(Debug, Clone, PartialEq, Eq)]
383/// Config Resource Config.
384pub struct ConfigResourceConfig {
385    /// Resource.
386    pub resource: ConfigResource,
387    /// Entries.
388    pub entries: BTreeMap<String, ConfigEntry>,
389}
390
391impl ConfigResourceConfig {
392    /// Entry.
393    pub fn entry(&self, name: &str) -> Option<&ConfigEntry> {
394        self.entries.get(name)
395    }
396}
397
/// Operation applied to a single configuration entry when altering configs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlterConfigOpType {
    /// Set the entry (encoded as `0`).
    Set,
    /// Delete the entry (encoded as `1`).
    Delete,
    /// Append to the entry (encoded as `2`).
    Append,
    /// Subtract from the entry (encoded as `3`).
    Subtract,
}

impl AlterConfigOpType {
    /// Returns the numeric code that represents this operation in protocol messages.
    fn as_protocol_value(self) -> i8 {
        const SET: i8 = 0;
        const DELETE: i8 = 1;
        const APPEND: i8 = 2;
        const SUBTRACT: i8 = 3;

        match self {
            Self::Subtract => SUBTRACT,
            Self::Append => APPEND,
            Self::Delete => DELETE,
            Self::Set => SET,
        }
    }
}
421
422#[derive(Debug, Clone, PartialEq, Eq)]
423/// Alter Config Op.
424pub struct AlterConfigOp {
425    /// Name returned by Kafka.
426    pub name: String,
427    /// Op Type.
428    pub op_type: AlterConfigOpType,
429    /// Record value.
430    pub value: Option<String>,
431}
432
433impl AlterConfigOp {
434    /// Set.
435    pub fn set(name: impl Into<String>, value: impl Into<String>) -> Self {
436        Self {
437            name: name.into(),
438            op_type: AlterConfigOpType::Set,
439            value: Some(value.into()),
440        }
441    }
442
443    /// Delete.
444    pub fn delete(name: impl Into<String>) -> Self {
445        Self {
446            name: name.into(),
447            op_type: AlterConfigOpType::Delete,
448            value: None,
449        }
450    }
451}
452
#[derive(Debug, Clone)]
/// Client for Kafka admin operations (topics, consumer groups, configs, cluster metadata).
pub struct KafkaAdmin {
    /// Admin settings; the methods of this client read the client id and the request timeout
    /// from here.
    config: AdminConfig,
}
458
459impl KafkaAdmin {
460    #[instrument(
461        name = "admin.connect",
462        level = "debug",
463        skip(config),
464        fields(
465            bootstrap_server_count = config.bootstrap_servers.len(),
466            client_id = %config.client_id
467        )
468    )]
469    /// Connects to Kafka and returns the client.
470    pub async fn connect(config: AdminConfig) -> Result<Self> {
471        let admin = Self { config };
472        admin.warm_up().await?;
473        debug!("admin client connected");
474        Ok(admin)
475    }
476
477    #[instrument(name = "admin.create_topics", level = "debug", skip(self, topics))]
478    /// Create Topics.
479    pub async fn create_topics<I>(&self, topics: I) -> Result<()>
480    where
481        I: IntoIterator<Item = NewTopic>,
482    {
483        let topics = topics
484            .into_iter()
485            .map(NewTopic::into_request_topic)
486            .collect::<Result<Vec<_>>>()?;
487        if topics.is_empty() {
488            return Ok(());
489        }
490
491        let request = CreateTopicsRequest::default()
492            .with_topics(topics)
493            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
494            .with_validate_only(false);
495        let response: CreateTopicsResponse = self
496            .send_request::<CreateTopicsRequest>(CREATE_TOPICS_VERSION_CAP, &request)
497            .await?;
498
499        for topic in response.topics {
500            let name = topic.name.0.to_string();
501            if let Some(error) = topic
502                .error_code
503                .err()
504                .filter(|error| !is_ignorable_create_topic_error(*error))
505            {
506                return Err(anyhow!("create topic '{name}' failed: {error}").into());
507            }
508        }
509
510        Ok(())
511    }
512
513    #[instrument(name = "admin.delete_topics", level = "debug", skip(self, topics))]
514    /// Delete Topics.
515    pub async fn delete_topics<I, S>(&self, topics: I) -> Result<()>
516    where
517        I: IntoIterator<Item = S>,
518        S: Into<String>,
519    {
520        let topic_names = topics
521            .into_iter()
522            .map(|topic| validate_topic_name(topic.into()))
523            .collect::<Result<Vec<_>>>()?;
524        if topic_names.is_empty() {
525            return Ok(());
526        }
527
528        let request = DeleteTopicsRequest::default()
529            .with_topic_names(
530                topic_names
531                    .iter()
532                    .cloned()
533                    .map(StrBytes::from_string)
534                    .map(Into::into)
535                    .collect(),
536            )
537            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?);
538        let response: DeleteTopicsResponse = self
539            .send_request::<DeleteTopicsRequest>(DELETE_TOPICS_VERSION_CAP, &request)
540            .await?;
541
542        for topic in response.responses {
543            let name = topic
544                .name
545                .as_ref()
546                .map(|name| name.0.to_string())
547                .unwrap_or_else(|| "<unknown>".to_owned());
548            if let Some(error) = topic.error_code.err() {
549                return Err(anyhow!("delete topic '{name}' failed: {error}").into());
550            }
551        }
552
553        Ok(())
554    }
555
556    #[instrument(
557        name = "admin.create_partitions",
558        level = "debug",
559        skip(self, partitions)
560    )]
561    /// Create Partitions.
562    pub async fn create_partitions<I, S>(&self, partitions: I) -> Result<()>
563    where
564        I: IntoIterator<Item = (S, NewPartitions)>,
565        S: Into<String>,
566    {
567        let topics = partitions
568            .into_iter()
569            .map(|(topic, new_partitions)| new_partitions.into_request_topic(topic.into()))
570            .collect::<Result<Vec<_>>>()?;
571        if topics.is_empty() {
572            return Ok(());
573        }
574
575        let request = CreatePartitionsRequest::default()
576            .with_topics(topics)
577            .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
578            .with_validate_only(false);
579        let response: CreatePartitionsResponse = self
580            .send_request::<CreatePartitionsRequest>(CREATE_PARTITIONS_VERSION_CAP, &request)
581            .await?;
582
583        for topic in response.results {
584            let name = topic.name.0.to_string();
585            if let Some(error) = topic.error_code.err() {
586                return Err(anyhow!(
587                    "create partitions for topic '{name}' failed: {}",
588                    topic
589                        .error_message
590                        .as_ref()
591                        .map(|message| message.to_string())
592                        .filter(|message| !message.is_empty())
593                        .unwrap_or_else(|| error.to_string())
594                )
595                .into());
596            }
597        }
598
599        Ok(())
600    }
601
602    #[instrument(name = "admin.list_topics", level = "debug", skip(self))]
603    /// List Topics.
604    pub async fn list_topics(&self) -> Result<Vec<TopicListing>> {
605        let response = self.fetch_metadata(None).await?;
606        let mut topics = Vec::new();
607
608        for topic in response.topics {
609            let Some(name) = topic.name.as_ref().map(|name| name.0.to_string()) else {
610                continue;
611            };
612            if let Some(error) = topic.error_code.err() {
613                return Err(anyhow!("list topics failed for '{name}': {error}").into());
614            }
615
616            topics.push(TopicListing {
617                name,
618                topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
619                is_internal: topic.is_internal,
620            });
621        }
622
623        topics.sort_by(|left, right| left.name.cmp(&right.name));
624        Ok(topics)
625    }
626
627    #[instrument(name = "admin.describe_topics", level = "debug", skip(self, topics))]
628    /// Describe Topics.
629    pub async fn describe_topics<I, S>(&self, topics: I) -> Result<Vec<TopicDescription>>
630    where
631        I: IntoIterator<Item = S>,
632        S: Into<String>,
633    {
634        let requested_topics = topics
635            .into_iter()
636            .map(|topic| validate_topic_name(topic.into()))
637            .collect::<Result<Vec<_>>>()?;
638        if requested_topics.is_empty() {
639            return Ok(Vec::new());
640        }
641
642        let response = self.fetch_metadata(Some(&requested_topics)).await?;
643        let mut descriptions = BTreeMap::new();
644
645        for topic in response.topics {
646            let name = topic
647                .name
648                .as_ref()
649                .map(|name| name.0.to_string())
650                .unwrap_or_default();
651            if let Some(error) = topic.error_code.err() {
652                let label = if name.is_empty() { "<unknown>" } else { &name };
653                return Err(anyhow!("describe topic '{label}' failed: {error}").into());
654            }
655
656            let mut partitions = topic
657                .partitions
658                .into_iter()
659                .map(|partition| {
660                    if let Some(error) = partition.error_code.err() {
661                        return Err(anyhow!(
662                            "describe topic '{name}' partition {} failed: {error}",
663                            partition.partition_index
664                        ));
665                    }
666
667                    Ok(TopicPartitionDescription {
668                        partition: partition.partition_index,
669                        leader_id: partition.leader_id.0,
670                        leader_epoch: partition.leader_epoch,
671                        replica_nodes: partition.replica_nodes.into_iter().map(|id| id.0).collect(),
672                        isr_nodes: partition.isr_nodes.into_iter().map(|id| id.0).collect(),
673                        offline_replicas: partition
674                            .offline_replicas
675                            .into_iter()
676                            .map(|id| id.0)
677                            .collect(),
678                    })
679                })
680                .collect::<std::result::Result<Vec<_>, _>>()?;
681            partitions.sort_by_key(|partition| partition.partition);
682
683            descriptions.insert(
684                name.clone(),
685                TopicDescription {
686                    name,
687                    topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
688                    is_internal: topic.is_internal,
689                    partitions,
690                },
691            );
692        }
693
694        requested_topics
695            .into_iter()
696            .map(|topic| {
697                descriptions.remove(&topic).ok_or_else(|| {
698                    anyhow!("metadata response did not include topic '{topic}'").into()
699                })
700            })
701            .collect()
702    }
703
704    #[instrument(name = "admin.describe_cluster", level = "debug", skip(self))]
705    /// Describe Cluster.
706    pub async fn describe_cluster(&self) -> Result<ClusterDescription> {
707        let (mut connection, version) = self
708            .connect_with_version::<DescribeClusterRequest>(DESCRIBE_CLUSTER_VERSION_CAP)
709            .await?;
710        let mut request =
711            DescribeClusterRequest::default().with_include_cluster_authorized_operations(false);
712        if version >= 1 {
713            request = request.with_endpoint_type(1);
714        }
715        if version >= 2 {
716            request = request.with_include_fenced_brokers(true);
717        }
718
719        let response: DescribeClusterResponse = connection
720            .send_request::<DescribeClusterRequest>(&self.config.client_id, version, &request)
721            .await?;
722        if let Some(error) = response.error_code.err() {
723            return Err(anyhow!("describe cluster failed: {error}").into());
724        }
725
726        let mut brokers = response
727            .brokers
728            .into_iter()
729            .map(|broker| BrokerDescription {
730                broker_id: broker.broker_id.0,
731                host: broker.host.to_string(),
732                port: broker.port,
733                rack: broker.rack.map(|rack| rack.to_string()),
734                is_fenced: broker.is_fenced,
735            })
736            .collect::<Vec<_>>();
737        brokers.sort_by_key(|broker| broker.broker_id);
738
739        Ok(ClusterDescription {
740            cluster_id: response.cluster_id.to_string(),
741            controller_id: response.controller_id.0,
742            brokers,
743        })
744    }
745
746    #[instrument(name = "admin.list_consumer_groups", level = "debug", skip(self))]
747    /// List Consumer Groups.
748    pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
749        let response: ListGroupsResponse = self
750            .send_request::<ListGroupsRequest>(
751                LIST_GROUPS_VERSION_CAP,
752                &ListGroupsRequest::default(),
753            )
754            .await?;
755        if let Some(error) = response.error_code.err() {
756            return Err(anyhow!("list consumer groups failed: {error}").into());
757        }
758
759        let mut groups = response
760            .groups
761            .into_iter()
762            .map(|group| ConsumerGroupListing {
763                group_id: group.group_id.to_string(),
764                protocol_type: group.protocol_type.to_string(),
765                state: (!group.group_state.is_empty()).then(|| group.group_state.to_string()),
766                group_type: (!group.group_type.is_empty()).then(|| group.group_type.to_string()),
767            })
768            .collect::<Vec<_>>();
769        groups.sort_by(|left, right| left.group_id.cmp(&right.group_id));
770        Ok(groups)
771    }
772
773    #[instrument(
774        name = "admin.describe_consumer_groups",
775        level = "debug",
776        skip(self, groups)
777    )]
778    /// Describe Consumer Groups.
779    pub async fn describe_consumer_groups<I, S>(
780        &self,
781        groups: I,
782    ) -> Result<Vec<ConsumerGroupDescription>>
783    where
784        I: IntoIterator<Item = S>,
785        S: Into<String>,
786    {
787        let group_ids = groups
788            .into_iter()
789            .map(|group| validate_group_id(group.into()))
790            .collect::<Result<Vec<_>>>()?;
791        if group_ids.is_empty() {
792            return Ok(Vec::new());
793        }
794
795        let request = ConsumerGroupDescribeRequest::default()
796            .with_group_ids(
797                group_ids
798                    .iter()
799                    .cloned()
800                    .map(StrBytes::from_string)
801                    .map(Into::into)
802                    .collect(),
803            )
804            .with_include_authorized_operations(false);
805        let response: ConsumerGroupDescribeResponse = self
806            .send_request::<ConsumerGroupDescribeRequest>(
807                CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
808                &request,
809            )
810            .await?;
811
812        let mut descriptions = BTreeMap::new();
813        for group in response.groups {
814            let group_id = group.group_id.to_string();
815            if let Some(error) = group.error_code.err() {
816                let message = group
817                    .error_message
818                    .as_ref()
819                    .map(ToString::to_string)
820                    .filter(|message| !message.is_empty())
821                    .unwrap_or_else(|| error.to_string());
822                return Err(
823                    anyhow!("describe consumer group '{group_id}' failed: {message}").into(),
824                );
825            }
826
827            descriptions.insert(
828                group_id.clone(),
829                ConsumerGroupDescription {
830                    group_id,
831                    state: group.group_state.to_string(),
832                    protocol_type: "consumer".to_owned(),
833                    protocol_data: group.assignor_name.to_string(),
834                    members: group
835                        .members
836                        .into_iter()
837                        .map(|member| ConsumerGroupMemberDescription {
838                            member_id: member.member_id.to_string(),
839                            group_instance_id: member
840                                .instance_id
841                                .map(|instance_id| instance_id.to_string()),
842                            client_id: member.client_id.to_string(),
843                            client_host: member.client_host.to_string(),
844                            member_metadata_bytes: member.subscribed_topic_names.len(),
845                            member_assignment_bytes: assignment_partition_count(&member.assignment),
846                        })
847                        .collect(),
848                    authorized_operations: (group.authorized_operations != i32::MIN)
849                        .then_some(group.authorized_operations),
850                },
851            );
852        }
853
854        group_ids
855            .into_iter()
856            .map(|group_id| {
857                descriptions.remove(&group_id).ok_or_else(|| {
858                    anyhow!("describe groups response did not include group '{group_id}'").into()
859                })
860            })
861            .collect()
862    }
863
864    #[instrument(
865        name = "admin.delete_consumer_groups",
866        level = "debug",
867        skip(self, groups)
868    )]
869    /// Delete Consumer Groups.
870    pub async fn delete_consumer_groups<I, S>(&self, groups: I) -> Result<()>
871    where
872        I: IntoIterator<Item = S>,
873        S: Into<String>,
874    {
875        let group_ids = groups
876            .into_iter()
877            .map(|group| validate_group_id(group.into()))
878            .collect::<Result<Vec<_>>>()?;
879        if group_ids.is_empty() {
880            return Ok(());
881        }
882
883        let request = DeleteGroupsRequest::default().with_groups_names(
884            group_ids
885                .iter()
886                .cloned()
887                .map(StrBytes::from_string)
888                .map(Into::into)
889                .collect(),
890        );
891        let response: DeleteGroupsResponse = self
892            .send_request::<DeleteGroupsRequest>(DELETE_GROUPS_VERSION_CAP, &request)
893            .await?;
894
895        for result in response.results {
896            if let Some(error) = result.error_code.err() {
897                return Err(anyhow!(
898                    "delete consumer group '{}' failed: {error}",
899                    &*result.group_id
900                )
901                .into());
902            }
903        }
904        Ok(())
905    }
906
907    #[instrument(
908        name = "admin.describe_configs",
909        level = "debug",
910        skip(self, resources)
911    )]
912    /// Describe Configs.
913    pub async fn describe_configs<I>(&self, resources: I) -> Result<Vec<ConfigResourceConfig>>
914    where
915        I: IntoIterator<Item = ConfigResource>,
916    {
917        let resources = resources.into_iter().collect::<Vec<_>>();
918        if resources.is_empty() {
919            return Ok(Vec::new());
920        }
921
922        let request = DescribeConfigsRequest::default()
923            .with_resources(
924                resources
925                    .iter()
926                    .map(|resource| {
927                        DescribeConfigsResource::default()
928                            .with_resource_type(resource.resource_type.as_protocol_value())
929                            .with_resource_name(StrBytes::from_string(
930                                resource.resource_name.clone(),
931                            ))
932                            .with_configuration_keys(None)
933                    })
934                    .collect(),
935            )
936            .with_include_synonyms(false);
937        let response: DescribeConfigsResponse = self
938            .send_request::<DescribeConfigsRequest>(DESCRIBE_CONFIGS_VERSION_CAP, &request)
939            .await?;
940
941        let mut described = BTreeMap::new();
942        for resource in response.results {
943            let resource_type = ConfigResourceType::from_protocol_value(resource.resource_type);
944            let resource_name = resource.resource_name.to_string();
945            if let Some(error) = resource.error_code.err() {
946                return Err(anyhow!(
947                    "describe configs for {:?} '{}' failed: {}",
948                    resource_type,
949                    resource_name,
950                    resource
951                        .error_message
952                        .as_ref()
953                        .map(|message| message.to_string())
954                        .filter(|message| !message.is_empty())
955                        .unwrap_or_else(|| error.to_string())
956                )
957                .into());
958            }
959
960            let entries = resource
961                .configs
962                .into_iter()
963                .map(|entry| {
964                    let name = entry.name.to_string();
965                    let config_entry = ConfigEntry {
966                        name: name.clone(),
967                        value: entry.value.map(|value| value.to_string()),
968                        read_only: entry.read_only,
969                        config_source: entry.config_source,
970                        is_sensitive: entry.is_sensitive,
971                        config_type: (response_supported_config_type(entry.config_type))
972                            .then_some(entry.config_type),
973                        documentation: entry.documentation.map(|doc| doc.to_string()),
974                    };
975                    (name, config_entry)
976                })
977                .collect();
978
979            described.insert(
980                (resource_type, resource_name.clone()),
981                ConfigResourceConfig {
982                    resource: ConfigResource::new(resource_type, resource_name),
983                    entries,
984                },
985            );
986        }
987
988        resources
989            .into_iter()
990            .map(|resource| {
991                described
992                    .remove(&(resource.resource_type, resource.resource_name.clone()))
993                    .ok_or_else(|| {
994                        anyhow!("describe configs response did not include {:?}", resource).into()
995                    })
996            })
997            .collect()
998    }
999
1000    #[instrument(
1001        name = "admin.incremental_alter_configs",
1002        level = "debug",
1003        skip(self, resources)
1004    )]
1005    /// Incremental Alter Configs.
1006    pub async fn incremental_alter_configs<I>(&self, resources: I) -> Result<()>
1007    where
1008        I: IntoIterator<Item = (ConfigResource, Vec<AlterConfigOp>)>,
1009    {
1010        let resources = resources
1011            .into_iter()
1012            .map(|(resource, ops)| {
1013                AlterConfigsResource::default()
1014                    .with_resource_type(resource.resource_type.as_protocol_value())
1015                    .with_resource_name(StrBytes::from_string(resource.resource_name))
1016                    .with_configs(
1017                        ops.into_iter()
1018                            .map(|op| {
1019                                AlterableConfig::default()
1020                                    .with_name(StrBytes::from_string(op.name))
1021                                    .with_config_operation(op.op_type.as_protocol_value())
1022                                    .with_value(op.value.map(StrBytes::from_string))
1023                            })
1024                            .collect(),
1025                    )
1026            })
1027            .collect::<Vec<_>>();
1028        if resources.is_empty() {
1029            return Ok(());
1030        }
1031
1032        let request = IncrementalAlterConfigsRequest::default()
1033            .with_resources(resources)
1034            .with_validate_only(false);
1035        let response: IncrementalAlterConfigsResponse = self
1036            .send_request::<IncrementalAlterConfigsRequest>(
1037                INCREMENTAL_ALTER_CONFIGS_VERSION_CAP,
1038                &request,
1039            )
1040            .await?;
1041
1042        for resource in response.responses {
1043            if let Some(error) = resource.error_code.err() {
1044                return Err(anyhow!(
1045                    "incremental alter configs for {:?} '{}' failed: {}",
1046                    ConfigResourceType::from_protocol_value(resource.resource_type),
1047                    resource.resource_name,
1048                    resource
1049                        .error_message
1050                        .as_ref()
1051                        .map(|message| message.to_string())
1052                        .filter(|message| !message.is_empty())
1053                        .unwrap_or_else(|| error.to_string())
1054                )
1055                .into());
1056            }
1057        }
1058
1059        Ok(())
1060    }
1061
1062    #[instrument(
1063        name = "admin.upsert_scram_credential",
1064        level = "debug",
1065        skip(self, user, password)
1066    )]
1067    /// Upsert Scram Credential.
1068    pub async fn upsert_scram_credential(
1069        &self,
1070        user: impl Into<String>,
1071        mechanism: SaslMechanism,
1072        password: impl AsRef<[u8]>,
1073    ) -> Result<()> {
1074        self.upsert_scram_credential_with_iterations(
1075            user,
1076            mechanism,
1077            password,
1078            scram::MIN_ITERATIONS,
1079        )
1080        .await
1081    }
1082
1083    #[instrument(
1084        name = "admin.upsert_scram_credential_with_iterations",
1085        level = "debug",
1086        skip(self, user, password)
1087    )]
1088    /// Upsert Scram Credential With Iterations.
1089    pub async fn upsert_scram_credential_with_iterations(
1090        &self,
1091        user: impl Into<String>,
1092        mechanism: SaslMechanism,
1093        password: impl AsRef<[u8]>,
1094        iterations: i32,
1095    ) -> Result<()> {
1096        let user = user.into();
1097        let mechanism_type = mechanism
1098            .scram_type()
1099            .ok_or_else(|| anyhow!("SCRAM credential upsertion requires a SCRAM mechanism"))?;
1100        let salt = scram::secure_random_bytes()?;
1101        let salted_password =
1102            scram::salted_password(mechanism, password.as_ref(), &salt, iterations)?;
1103        let request = AlterUserScramCredentialsRequest::default().with_upsertions(vec![
1104            ScramCredentialUpsertion::default()
1105                .with_name(StrBytes::from_string(user.clone()))
1106                .with_mechanism(mechanism_type)
1107                .with_iterations(iterations)
1108                .with_salt(Bytes::from(salt))
1109                .with_salted_password(Bytes::from(salted_password)),
1110        ]);
1111        let response: AlterUserScramCredentialsResponse = self
1112            .send_request::<AlterUserScramCredentialsRequest>(
1113                ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP,
1114                &request,
1115            )
1116            .await?;
1117
1118        for result in response.results {
1119            if let Some(error) = result.error_code.err() {
1120                return Err(anyhow!(
1121                    "alter SCRAM credential for user '{}' failed: {}",
1122                    result.user,
1123                    result
1124                        .error_message
1125                        .as_ref()
1126                        .map(|message| message.to_string())
1127                        .filter(|message| !message.is_empty())
1128                        .unwrap_or_else(|| error.to_string())
1129                )
1130                .into());
1131            }
1132        }
1133
1134        Ok(())
1135    }
1136
1137    /// Config.
1138    pub fn config(&self) -> &AdminConfig {
1139        &self.config
1140    }
1141
1142    /// Finalized Feature Levels.
1143    pub async fn finalized_feature_levels(&self) -> Result<Vec<BrokerFeatureLevel>> {
1144        let connection = connect_to_any_bootstrap(
1145            &self.config.bootstrap_servers,
1146            &self.config.client_id,
1147            self.config.request_timeout,
1148            self.config.security_protocol,
1149            &self.config.tls,
1150            &self.config.sasl,
1151        )
1152        .await?;
1153        Ok(connection
1154            .finalized_feature_levels()
1155            .into_iter()
1156            .map(|(name, level)| BrokerFeatureLevel { name, level })
1157            .collect())
1158    }
1159
1160    async fn warm_up(&self) -> Result<()> {
1161        let _ = connect_to_any_bootstrap(
1162            &self.config.bootstrap_servers,
1163            &self.config.client_id,
1164            self.config.request_timeout,
1165            self.config.security_protocol,
1166            &self.config.tls,
1167            &self.config.sasl,
1168        )
1169        .await?;
1170        Ok(())
1171    }
1172
1173    async fn fetch_metadata(&self, topics: Option<&[String]>) -> Result<MetadataResponse> {
1174        let (mut connection, version) = self
1175            .connect_with_version::<MetadataRequest>(METADATA_VERSION_CAP)
1176            .await?;
1177        let request = MetadataRequest::default()
1178            .with_topics(topics.map(|topics| {
1179                topics
1180                    .iter()
1181                    .cloned()
1182                    .map(StrBytes::from_string)
1183                    .map(|name| MetadataRequestTopic::default().with_name(Some(name.into())))
1184                    .collect()
1185            }))
1186            .with_allow_auto_topic_creation(false)
1187            .with_include_cluster_authorized_operations(false)
1188            .with_include_topic_authorized_operations(false);
1189        Ok(connection
1190            .send_request::<MetadataRequest>(&self.config.client_id, version, &request)
1191            .await?)
1192    }
1193
1194    async fn send_request<Req>(&self, version_cap: i16, request: &Req) -> Result<Req::Response>
1195    where
1196        Req: Request,
1197    {
1198        let (mut connection, version) = self.connect_with_version::<Req>(version_cap).await?;
1199        Ok(connection
1200            .send_request::<Req>(&self.config.client_id, version, request)
1201            .await?)
1202    }
1203
1204    async fn connect_with_version<Req>(&self, version_cap: i16) -> Result<(BrokerConnection, i16)>
1205    where
1206        Req: Request,
1207    {
1208        let connection = connect_to_any_bootstrap(
1209            &self.config.bootstrap_servers,
1210            &self.config.client_id,
1211            self.config.request_timeout,
1212            self.config.security_protocol,
1213            &self.config.tls,
1214            &self.config.sasl,
1215        )
1216        .await?;
1217        let version = connection.version_with_cap::<Req>(version_cap)?;
1218        Ok((connection, version))
1219    }
1220}
1221
1222fn validate_topic_name(topic: String) -> Result<String> {
1223    let topic = topic.trim();
1224    if topic.is_empty() {
1225        return Err(AdminError::EmptyTopicName.into());
1226    }
1227
1228    Ok(topic.to_owned())
1229}
1230
1231fn validate_group_id(group_id: String) -> Result<String> {
1232    let group_id = group_id.trim();
1233    if group_id.is_empty() {
1234        return Err(anyhow!("consumer group id cannot be empty").into());
1235    }
1236
1237    Ok(group_id.to_owned())
1238}
1239
1240fn assignment_partition_count(
1241    assignment: &kafka_protocol::messages::consumer_group_describe_response::Assignment,
1242) -> usize {
1243    assignment
1244        .topic_partitions
1245        .iter()
1246        .map(|topic| topic.partitions.len())
1247        .sum()
1248}
1249
1250fn is_ignorable_create_topic_error(error: ResponseError) -> bool {
1251    error == ResponseError::TopicAlreadyExists
1252}
1253
/// Reports whether `config_type` is non-negative; negative values are treated
/// as "no config type available".
fn response_supported_config_type(config_type: i8) -> bool {
    !config_type.is_negative()
}
1257
#[cfg(test)]
mod tests {
    use super::*;

    /// A valid `NewTopic` carries its name, sizing, and configs into the request topic.
    #[test]
    fn new_topic_maps_to_create_topics_request() {
        let request_topic = NewTopic::new("orders", 3, 2)
            .with_config("cleanup.policy", "compact")
            .into_request_topic()
            .expect("topic should be valid");

        assert_eq!(request_topic.name.0.to_string(), "orders");
        assert_eq!(request_topic.num_partitions, 3);
        assert_eq!(request_topic.replication_factor, 2);

        let config_names: Vec<String> = request_topic
            .configs
            .iter()
            .map(|config| config.name.to_string())
            .collect();
        assert_eq!(config_names, ["cleanup.policy"]);
    }

    /// A zero partition count is rejected with a descriptive message.
    #[test]
    fn new_topic_rejects_invalid_partition_count() {
        let message = NewTopic::new("orders", 0, 1)
            .into_request_topic()
            .expect_err("invalid topic should fail")
            .to_string();

        assert!(message.contains("topic partition count must be positive"));
    }

    /// Blank names and a zero replication factor map to their typed admin errors.
    #[test]
    fn new_topic_rejects_empty_names_and_invalid_replication_factor() {
        let blank_name = NewTopic::new("  ", 1, 1).into_request_topic().unwrap_err();
        assert!(matches!(
            blank_name,
            crate::Error::Admin(AdminError::EmptyTopicName)
        ));

        let zero_replication = NewTopic::new("orders", 1, 0)
            .into_request_topic()
            .unwrap_err();
        assert!(matches!(
            zero_replication,
            crate::Error::Admin(AdminError::InvalidReplicationFactor {
                replication_factor: 0
            })
        ));
    }

    /// `NewPartitions` keeps its assignments and rejects a zero count or a blank topic.
    #[test]
    fn new_partitions_maps_assignments_and_rejects_invalid_input() {
        let request_topic = NewPartitions::increase_to(4)
            .with_assignment([1, 2])
            .with_assignment([2, 3])
            .into_request_topic("orders".to_owned())
            .unwrap();
        assert_eq!(request_topic.name.to_string(), "orders");
        assert_eq!(request_topic.count, 4);
        let assignments = request_topic.assignments.unwrap();
        assert_eq!(assignments.len(), 2);

        let zero_count = NewPartitions::increase_to(0)
            .into_request_topic("orders".to_owned())
            .unwrap_err();
        assert!(matches!(
            zero_count,
            crate::Error::Admin(AdminError::InvalidPartitionCount { partitions: 0 })
        ));

        let blank_topic = NewPartitions::increase_to(1)
            .into_request_topic(" ".to_owned())
            .unwrap_err();
        assert!(matches!(
            blank_topic,
            crate::Error::Admin(AdminError::EmptyTopicName)
        ));
    }

    /// Protocol values for resource types and alter operations must not drift.
    #[test]
    fn config_resource_and_operation_protocol_values_are_stable() {
        let resource_type_values = [
            (ConfigResourceType::Unknown, 0),
            (ConfigResourceType::Topic, 2),
            (ConfigResourceType::Broker, 4),
            (ConfigResourceType::BrokerLogger, 8),
            (ConfigResourceType::ClientMetrics, 16),
            (ConfigResourceType::Group, 32),
        ];
        for (resource_type, protocol_value) in resource_type_values {
            assert_eq!(resource_type.as_protocol_value(), protocol_value);
        }

        assert_eq!(
            ConfigResourceType::from_protocol_value(2),
            ConfigResourceType::Topic
        );
        assert_eq!(
            ConfigResourceType::from_protocol_value(99),
            ConfigResourceType::Unknown
        );

        let set_op = AlterConfigOp::set("cleanup.policy", "compact");
        assert_eq!(set_op.op_type.as_protocol_value(), 0);
        let delete_op = AlterConfigOp::delete("retention.ms");
        assert_eq!(delete_op.op_type.as_protocol_value(), 1);
        assert_eq!(AlterConfigOpType::Append.as_protocol_value(), 2);
        assert_eq!(AlterConfigOpType::Subtract.as_protocol_value(), 3);

        assert!(response_supported_config_type(0));
        assert!(!response_supported_config_type(-1));
    }

    /// `ConfigResourceConfig::entry` finds entries by name and returns `None` otherwise.
    #[test]
    fn config_resource_config_entry_lookup_returns_named_entry() {
        let cleanup_policy = ConfigEntry {
            name: "cleanup.policy".to_owned(),
            value: Some("compact".to_owned()),
            read_only: false,
            config_source: 1,
            is_sensitive: false,
            config_type: Some(2),
            documentation: None,
        };
        let config = ConfigResourceConfig {
            resource: ConfigResource::topic("orders"),
            entries: BTreeMap::from([("cleanup.policy".to_owned(), cleanup_policy)]),
        };

        let found = config.entry("cleanup.policy").unwrap();
        assert_eq!(found.value.as_deref(), Some("compact"));
        assert!(config.entry("missing").is_none());
    }

    /// Only `TopicAlreadyExists` counts as an ignorable create-topic error.
    #[test]
    fn topic_already_exists_is_ignorable() {
        let already_exists = ResponseError::TopicAlreadyExists;
        let unknown_topic = ResponseError::UnknownTopicOrPartition;

        assert!(is_ignorable_create_topic_error(already_exists));
        assert!(!is_ignorable_create_topic_error(unknown_topic));
    }
}