1use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use bytes::Bytes;
19use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
20use kafka_protocol::messages::alter_user_scram_credentials_request::ScramCredentialUpsertion;
21use kafka_protocol::messages::create_partitions_request::{
22 CreatePartitionsAssignment, CreatePartitionsTopic,
23};
24use kafka_protocol::messages::create_topics_request::{CreatableTopic, CreatableTopicConfig};
25use kafka_protocol::messages::describe_configs_request::DescribeConfigsResource;
26use kafka_protocol::messages::incremental_alter_configs_request::{
27 AlterConfigsResource, AlterableConfig,
28};
29use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
30use kafka_protocol::messages::{
31 AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse,
32 ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, CreatePartitionsRequest,
33 CreatePartitionsResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteGroupsRequest,
34 DeleteGroupsResponse, DeleteTopicsRequest, DeleteTopicsResponse, DescribeClusterRequest,
35 DescribeClusterResponse, DescribeConfigsRequest, DescribeConfigsResponse,
36 IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse, ListGroupsRequest,
37 ListGroupsResponse, MetadataRequest, MetadataResponse,
38};
39use kafka_protocol::protocol::{Request, StrBytes};
40use tracing::{debug, instrument};
41use uuid::Uuid;
42
43use crate::config::{AdminConfig, SaslMechanism};
44use crate::constants::{
45 ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP, CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
46 CREATE_PARTITIONS_VERSION_CAP, CREATE_TOPICS_VERSION_CAP, DELETE_GROUPS_VERSION_CAP,
47 DELETE_TOPICS_VERSION_CAP, DESCRIBE_CLUSTER_VERSION_CAP, DESCRIBE_CONFIGS_VERSION_CAP,
48 INCREMENTAL_ALTER_CONFIGS_VERSION_CAP, LIST_GROUPS_VERSION_CAP, METADATA_VERSION_CAP,
49};
50use crate::network::scram;
51use crate::network::{BrokerConnection, connect_to_any_bootstrap, duration_to_i32_ms};
52use crate::{AdminError, Result};
53
54#[derive(Debug, Clone)]
55pub struct NewTopic {
57 pub name: String,
59 pub num_partitions: i32,
61 pub replication_factor: i16,
63 pub configs: BTreeMap<String, String>,
65}
66
67impl NewTopic {
68 pub fn new(name: impl Into<String>, num_partitions: i32, replication_factor: i16) -> Self {
70 Self {
71 name: name.into(),
72 num_partitions,
73 replication_factor,
74 configs: BTreeMap::new(),
75 }
76 }
77
78 pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
80 self.configs.insert(key.into(), value.into());
81 self
82 }
83
84 fn into_request_topic(self) -> Result<CreatableTopic> {
85 let name = validate_topic_name(self.name)?;
86 if self.num_partitions <= 0 {
87 return Err(AdminError::InvalidPartitionCount {
88 partitions: self.num_partitions,
89 }
90 .into());
91 }
92 if self.replication_factor <= 0 {
93 return Err(AdminError::InvalidReplicationFactor {
94 replication_factor: self.replication_factor,
95 }
96 .into());
97 }
98
99 let configs = self
100 .configs
101 .into_iter()
102 .map(|(key, value)| {
103 CreatableTopicConfig::default()
104 .with_name(StrBytes::from_string(key))
105 .with_value(Some(StrBytes::from_string(value)))
106 })
107 .collect();
108
109 Ok(CreatableTopic::default()
110 .with_name(StrBytes::from_string(name).into())
111 .with_num_partitions(self.num_partitions)
112 .with_replication_factor(self.replication_factor)
113 .with_configs(configs))
114 }
115}
116
117#[derive(Debug, Clone, PartialEq, Eq)]
118pub struct NewPartitions {
120 pub total_count: i32,
122 pub assignments: Vec<Vec<i32>>,
124}
125
126impl NewPartitions {
127 pub fn increase_to(total_count: i32) -> Self {
129 Self {
130 total_count,
131 assignments: Vec::new(),
132 }
133 }
134
135 pub fn with_assignment<I>(mut self, broker_ids: I) -> Self
137 where
138 I: IntoIterator<Item = i32>,
139 {
140 self.assignments.push(broker_ids.into_iter().collect());
141 self
142 }
143
144 fn into_request_topic(self, topic_name: String) -> Result<CreatePartitionsTopic> {
145 let name = validate_topic_name(topic_name)?;
146 if self.total_count <= 0 {
147 return Err(AdminError::InvalidPartitionCount {
148 partitions: self.total_count,
149 }
150 .into());
151 }
152
153 let assignments = (!self.assignments.is_empty()).then(|| {
154 self.assignments
155 .into_iter()
156 .map(|broker_ids| {
157 CreatePartitionsAssignment::default()
158 .with_broker_ids(broker_ids.into_iter().map(Into::into).collect())
159 })
160 .collect()
161 });
162
163 Ok(CreatePartitionsTopic::default()
164 .with_name(StrBytes::from_string(name).into())
165 .with_count(self.total_count)
166 .with_assignments(assignments))
167 }
168}
169
170#[derive(Debug, Clone, PartialEq, Eq)]
171pub struct TopicListing {
173 pub name: String,
175 pub topic_id: Option<Uuid>,
177 pub is_internal: bool,
179}
180
181#[derive(Debug, Clone, PartialEq, Eq)]
182pub struct TopicPartitionDescription {
184 pub partition: i32,
186 pub leader_id: i32,
188 pub leader_epoch: i32,
190 pub replica_nodes: Vec<i32>,
192 pub isr_nodes: Vec<i32>,
194 pub offline_replicas: Vec<i32>,
196}
197
198#[derive(Debug, Clone, PartialEq, Eq)]
199pub struct TopicDescription {
201 pub name: String,
203 pub topic_id: Option<Uuid>,
205 pub is_internal: bool,
207 pub partitions: Vec<TopicPartitionDescription>,
209}
210
211#[derive(Debug, Clone, PartialEq, Eq)]
212pub struct BrokerDescription {
214 pub broker_id: i32,
216 pub host: String,
218 pub port: i32,
220 pub rack: Option<String>,
222 pub is_fenced: bool,
224}
225
226#[derive(Debug, Clone, PartialEq, Eq)]
227pub struct ClusterDescription {
229 pub cluster_id: String,
231 pub controller_id: i32,
233 pub brokers: Vec<BrokerDescription>,
235}
236
237#[derive(Debug, Clone, PartialEq, Eq)]
238pub struct BrokerFeatureLevel {
240 pub name: String,
242 pub level: i16,
244}
245
246#[derive(Debug, Clone, PartialEq, Eq)]
247pub struct ConsumerGroupListing {
249 pub group_id: String,
251 pub protocol_type: String,
253 pub state: Option<String>,
255 pub group_type: Option<String>,
257}
258
259#[derive(Debug, Clone, PartialEq, Eq)]
260pub struct ConsumerGroupDescription {
262 pub group_id: String,
264 pub state: String,
266 pub protocol_type: String,
268 pub protocol_data: String,
270 pub members: Vec<ConsumerGroupMemberDescription>,
272 pub authorized_operations: Option<i32>,
274}
275
276#[derive(Debug, Clone, PartialEq, Eq)]
277pub struct ConsumerGroupMemberDescription {
279 pub member_id: String,
281 pub group_instance_id: Option<String>,
283 pub client_id: String,
285 pub client_host: String,
287 pub member_metadata_bytes: usize,
289 pub member_assignment_bytes: usize,
291}
292
293#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
294pub enum ConfigResourceType {
296 Unknown,
298 Topic,
300 Broker,
302 BrokerLogger,
304 ClientMetrics,
306 Group,
308}
309
310impl ConfigResourceType {
311 fn as_protocol_value(self) -> i8 {
312 match self {
313 Self::Unknown => 0,
314 Self::Topic => 2,
315 Self::Broker => 4,
316 Self::BrokerLogger => 8,
317 Self::ClientMetrics => 16,
318 Self::Group => 32,
319 }
320 }
321
322 fn from_protocol_value(value: i8) -> Self {
323 match value {
324 2 => Self::Topic,
325 4 => Self::Broker,
326 8 => Self::BrokerLogger,
327 16 => Self::ClientMetrics,
328 32 => Self::Group,
329 _ => Self::Unknown,
330 }
331 }
332}
333
334#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
335pub struct ConfigResource {
337 pub resource_type: ConfigResourceType,
339 pub resource_name: String,
341}
342
343impl ConfigResource {
344 pub fn new(resource_type: ConfigResourceType, resource_name: impl Into<String>) -> Self {
346 Self {
347 resource_type,
348 resource_name: resource_name.into(),
349 }
350 }
351
352 pub fn topic(resource_name: impl Into<String>) -> Self {
354 Self::new(ConfigResourceType::Topic, resource_name)
355 }
356
357 pub fn group(resource_name: impl Into<String>) -> Self {
359 Self::new(ConfigResourceType::Group, resource_name)
360 }
361}
362
363#[derive(Debug, Clone, PartialEq, Eq)]
364pub struct ConfigEntry {
366 pub name: String,
368 pub value: Option<String>,
370 pub read_only: bool,
372 pub config_source: i8,
374 pub is_sensitive: bool,
376 pub config_type: Option<i8>,
378 pub documentation: Option<String>,
380}
381
382#[derive(Debug, Clone, PartialEq, Eq)]
383pub struct ConfigResourceConfig {
385 pub resource: ConfigResource,
387 pub entries: BTreeMap<String, ConfigEntry>,
389}
390
391impl ConfigResourceConfig {
392 pub fn entry(&self, name: &str) -> Option<&ConfigEntry> {
394 self.entries.get(name)
395 }
396}
397
398#[derive(Debug, Clone, Copy, PartialEq, Eq)]
399pub enum AlterConfigOpType {
401 Set,
403 Delete,
405 Append,
407 Subtract,
409}
410
411impl AlterConfigOpType {
412 fn as_protocol_value(self) -> i8 {
413 match self {
414 Self::Set => 0,
415 Self::Delete => 1,
416 Self::Append => 2,
417 Self::Subtract => 3,
418 }
419 }
420}
421
422#[derive(Debug, Clone, PartialEq, Eq)]
423pub struct AlterConfigOp {
425 pub name: String,
427 pub op_type: AlterConfigOpType,
429 pub value: Option<String>,
431}
432
433impl AlterConfigOp {
434 pub fn set(name: impl Into<String>, value: impl Into<String>) -> Self {
436 Self {
437 name: name.into(),
438 op_type: AlterConfigOpType::Set,
439 value: Some(value.into()),
440 }
441 }
442
443 pub fn delete(name: impl Into<String>) -> Self {
445 Self {
446 name: name.into(),
447 op_type: AlterConfigOpType::Delete,
448 value: None,
449 }
450 }
451}
452
453#[derive(Debug, Clone)]
454pub struct KafkaAdmin {
456 config: AdminConfig,
457}
458
459impl KafkaAdmin {
460 #[instrument(
461 name = "admin.connect",
462 level = "debug",
463 skip(config),
464 fields(
465 bootstrap_server_count = config.bootstrap_servers.len(),
466 client_id = %config.client_id
467 )
468 )]
469 pub async fn connect(config: AdminConfig) -> Result<Self> {
471 let admin = Self { config };
472 admin.warm_up().await?;
473 debug!("admin client connected");
474 Ok(admin)
475 }
476
477 #[instrument(name = "admin.create_topics", level = "debug", skip(self, topics))]
478 pub async fn create_topics<I>(&self, topics: I) -> Result<()>
480 where
481 I: IntoIterator<Item = NewTopic>,
482 {
483 let topics = topics
484 .into_iter()
485 .map(NewTopic::into_request_topic)
486 .collect::<Result<Vec<_>>>()?;
487 if topics.is_empty() {
488 return Ok(());
489 }
490
491 let request = CreateTopicsRequest::default()
492 .with_topics(topics)
493 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
494 .with_validate_only(false);
495 let response: CreateTopicsResponse = self
496 .send_request::<CreateTopicsRequest>(CREATE_TOPICS_VERSION_CAP, &request)
497 .await?;
498
499 for topic in response.topics {
500 let name = topic.name.0.to_string();
501 if let Some(error) = topic
502 .error_code
503 .err()
504 .filter(|error| !is_ignorable_create_topic_error(*error))
505 {
506 return Err(anyhow!("create topic '{name}' failed: {error}").into());
507 }
508 }
509
510 Ok(())
511 }
512
513 #[instrument(name = "admin.delete_topics", level = "debug", skip(self, topics))]
514 pub async fn delete_topics<I, S>(&self, topics: I) -> Result<()>
516 where
517 I: IntoIterator<Item = S>,
518 S: Into<String>,
519 {
520 let topic_names = topics
521 .into_iter()
522 .map(|topic| validate_topic_name(topic.into()))
523 .collect::<Result<Vec<_>>>()?;
524 if topic_names.is_empty() {
525 return Ok(());
526 }
527
528 let request = DeleteTopicsRequest::default()
529 .with_topic_names(
530 topic_names
531 .iter()
532 .cloned()
533 .map(StrBytes::from_string)
534 .map(Into::into)
535 .collect(),
536 )
537 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?);
538 let response: DeleteTopicsResponse = self
539 .send_request::<DeleteTopicsRequest>(DELETE_TOPICS_VERSION_CAP, &request)
540 .await?;
541
542 for topic in response.responses {
543 let name = topic
544 .name
545 .as_ref()
546 .map(|name| name.0.to_string())
547 .unwrap_or_else(|| "<unknown>".to_owned());
548 if let Some(error) = topic.error_code.err() {
549 return Err(anyhow!("delete topic '{name}' failed: {error}").into());
550 }
551 }
552
553 Ok(())
554 }
555
556 #[instrument(
557 name = "admin.create_partitions",
558 level = "debug",
559 skip(self, partitions)
560 )]
561 pub async fn create_partitions<I, S>(&self, partitions: I) -> Result<()>
563 where
564 I: IntoIterator<Item = (S, NewPartitions)>,
565 S: Into<String>,
566 {
567 let topics = partitions
568 .into_iter()
569 .map(|(topic, new_partitions)| new_partitions.into_request_topic(topic.into()))
570 .collect::<Result<Vec<_>>>()?;
571 if topics.is_empty() {
572 return Ok(());
573 }
574
575 let request = CreatePartitionsRequest::default()
576 .with_topics(topics)
577 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
578 .with_validate_only(false);
579 let response: CreatePartitionsResponse = self
580 .send_request::<CreatePartitionsRequest>(CREATE_PARTITIONS_VERSION_CAP, &request)
581 .await?;
582
583 for topic in response.results {
584 let name = topic.name.0.to_string();
585 if let Some(error) = topic.error_code.err() {
586 return Err(anyhow!(
587 "create partitions for topic '{name}' failed: {}",
588 topic
589 .error_message
590 .as_ref()
591 .map(|message| message.to_string())
592 .filter(|message| !message.is_empty())
593 .unwrap_or_else(|| error.to_string())
594 )
595 .into());
596 }
597 }
598
599 Ok(())
600 }
601
602 #[instrument(name = "admin.list_topics", level = "debug", skip(self))]
603 pub async fn list_topics(&self) -> Result<Vec<TopicListing>> {
605 let response = self.fetch_metadata(None).await?;
606 let mut topics = Vec::new();
607
608 for topic in response.topics {
609 let Some(name) = topic.name.as_ref().map(|name| name.0.to_string()) else {
610 continue;
611 };
612 if let Some(error) = topic.error_code.err() {
613 return Err(anyhow!("list topics failed for '{name}': {error}").into());
614 }
615
616 topics.push(TopicListing {
617 name,
618 topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
619 is_internal: topic.is_internal,
620 });
621 }
622
623 topics.sort_by(|left, right| left.name.cmp(&right.name));
624 Ok(topics)
625 }
626
627 #[instrument(name = "admin.describe_topics", level = "debug", skip(self, topics))]
628 pub async fn describe_topics<I, S>(&self, topics: I) -> Result<Vec<TopicDescription>>
630 where
631 I: IntoIterator<Item = S>,
632 S: Into<String>,
633 {
634 let requested_topics = topics
635 .into_iter()
636 .map(|topic| validate_topic_name(topic.into()))
637 .collect::<Result<Vec<_>>>()?;
638 if requested_topics.is_empty() {
639 return Ok(Vec::new());
640 }
641
642 let response = self.fetch_metadata(Some(&requested_topics)).await?;
643 let mut descriptions = BTreeMap::new();
644
645 for topic in response.topics {
646 let name = topic
647 .name
648 .as_ref()
649 .map(|name| name.0.to_string())
650 .unwrap_or_default();
651 if let Some(error) = topic.error_code.err() {
652 let label = if name.is_empty() { "<unknown>" } else { &name };
653 return Err(anyhow!("describe topic '{label}' failed: {error}").into());
654 }
655
656 let mut partitions = topic
657 .partitions
658 .into_iter()
659 .map(|partition| {
660 if let Some(error) = partition.error_code.err() {
661 return Err(anyhow!(
662 "describe topic '{name}' partition {} failed: {error}",
663 partition.partition_index
664 ));
665 }
666
667 Ok(TopicPartitionDescription {
668 partition: partition.partition_index,
669 leader_id: partition.leader_id.0,
670 leader_epoch: partition.leader_epoch,
671 replica_nodes: partition.replica_nodes.into_iter().map(|id| id.0).collect(),
672 isr_nodes: partition.isr_nodes.into_iter().map(|id| id.0).collect(),
673 offline_replicas: partition
674 .offline_replicas
675 .into_iter()
676 .map(|id| id.0)
677 .collect(),
678 })
679 })
680 .collect::<std::result::Result<Vec<_>, _>>()?;
681 partitions.sort_by_key(|partition| partition.partition);
682
683 descriptions.insert(
684 name.clone(),
685 TopicDescription {
686 name,
687 topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
688 is_internal: topic.is_internal,
689 partitions,
690 },
691 );
692 }
693
694 requested_topics
695 .into_iter()
696 .map(|topic| {
697 descriptions.remove(&topic).ok_or_else(|| {
698 anyhow!("metadata response did not include topic '{topic}'").into()
699 })
700 })
701 .collect()
702 }
703
704 #[instrument(name = "admin.describe_cluster", level = "debug", skip(self))]
705 pub async fn describe_cluster(&self) -> Result<ClusterDescription> {
707 let (mut connection, version) = self
708 .connect_with_version::<DescribeClusterRequest>(DESCRIBE_CLUSTER_VERSION_CAP)
709 .await?;
710 let mut request =
711 DescribeClusterRequest::default().with_include_cluster_authorized_operations(false);
712 if version >= 1 {
713 request = request.with_endpoint_type(1);
714 }
715 if version >= 2 {
716 request = request.with_include_fenced_brokers(true);
717 }
718
719 let response: DescribeClusterResponse = connection
720 .send_request::<DescribeClusterRequest>(&self.config.client_id, version, &request)
721 .await?;
722 if let Some(error) = response.error_code.err() {
723 return Err(anyhow!("describe cluster failed: {error}").into());
724 }
725
726 let mut brokers = response
727 .brokers
728 .into_iter()
729 .map(|broker| BrokerDescription {
730 broker_id: broker.broker_id.0,
731 host: broker.host.to_string(),
732 port: broker.port,
733 rack: broker.rack.map(|rack| rack.to_string()),
734 is_fenced: broker.is_fenced,
735 })
736 .collect::<Vec<_>>();
737 brokers.sort_by_key(|broker| broker.broker_id);
738
739 Ok(ClusterDescription {
740 cluster_id: response.cluster_id.to_string(),
741 controller_id: response.controller_id.0,
742 brokers,
743 })
744 }
745
746 #[instrument(name = "admin.list_consumer_groups", level = "debug", skip(self))]
747 pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
749 let response: ListGroupsResponse = self
750 .send_request::<ListGroupsRequest>(
751 LIST_GROUPS_VERSION_CAP,
752 &ListGroupsRequest::default(),
753 )
754 .await?;
755 if let Some(error) = response.error_code.err() {
756 return Err(anyhow!("list consumer groups failed: {error}").into());
757 }
758
759 let mut groups = response
760 .groups
761 .into_iter()
762 .map(|group| ConsumerGroupListing {
763 group_id: group.group_id.to_string(),
764 protocol_type: group.protocol_type.to_string(),
765 state: (!group.group_state.is_empty()).then(|| group.group_state.to_string()),
766 group_type: (!group.group_type.is_empty()).then(|| group.group_type.to_string()),
767 })
768 .collect::<Vec<_>>();
769 groups.sort_by(|left, right| left.group_id.cmp(&right.group_id));
770 Ok(groups)
771 }
772
773 #[instrument(
774 name = "admin.describe_consumer_groups",
775 level = "debug",
776 skip(self, groups)
777 )]
778 pub async fn describe_consumer_groups<I, S>(
780 &self,
781 groups: I,
782 ) -> Result<Vec<ConsumerGroupDescription>>
783 where
784 I: IntoIterator<Item = S>,
785 S: Into<String>,
786 {
787 let group_ids = groups
788 .into_iter()
789 .map(|group| validate_group_id(group.into()))
790 .collect::<Result<Vec<_>>>()?;
791 if group_ids.is_empty() {
792 return Ok(Vec::new());
793 }
794
795 let request = ConsumerGroupDescribeRequest::default()
796 .with_group_ids(
797 group_ids
798 .iter()
799 .cloned()
800 .map(StrBytes::from_string)
801 .map(Into::into)
802 .collect(),
803 )
804 .with_include_authorized_operations(false);
805 let response: ConsumerGroupDescribeResponse = self
806 .send_request::<ConsumerGroupDescribeRequest>(
807 CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
808 &request,
809 )
810 .await?;
811
812 let mut descriptions = BTreeMap::new();
813 for group in response.groups {
814 let group_id = group.group_id.to_string();
815 if let Some(error) = group.error_code.err() {
816 let message = group
817 .error_message
818 .as_ref()
819 .map(ToString::to_string)
820 .filter(|message| !message.is_empty())
821 .unwrap_or_else(|| error.to_string());
822 return Err(
823 anyhow!("describe consumer group '{group_id}' failed: {message}").into(),
824 );
825 }
826
827 descriptions.insert(
828 group_id.clone(),
829 ConsumerGroupDescription {
830 group_id,
831 state: group.group_state.to_string(),
832 protocol_type: "consumer".to_owned(),
833 protocol_data: group.assignor_name.to_string(),
834 members: group
835 .members
836 .into_iter()
837 .map(|member| ConsumerGroupMemberDescription {
838 member_id: member.member_id.to_string(),
839 group_instance_id: member
840 .instance_id
841 .map(|instance_id| instance_id.to_string()),
842 client_id: member.client_id.to_string(),
843 client_host: member.client_host.to_string(),
844 member_metadata_bytes: member.subscribed_topic_names.len(),
845 member_assignment_bytes: assignment_partition_count(&member.assignment),
846 })
847 .collect(),
848 authorized_operations: (group.authorized_operations != i32::MIN)
849 .then_some(group.authorized_operations),
850 },
851 );
852 }
853
854 group_ids
855 .into_iter()
856 .map(|group_id| {
857 descriptions.remove(&group_id).ok_or_else(|| {
858 anyhow!("describe groups response did not include group '{group_id}'").into()
859 })
860 })
861 .collect()
862 }
863
864 #[instrument(
865 name = "admin.delete_consumer_groups",
866 level = "debug",
867 skip(self, groups)
868 )]
869 pub async fn delete_consumer_groups<I, S>(&self, groups: I) -> Result<()>
871 where
872 I: IntoIterator<Item = S>,
873 S: Into<String>,
874 {
875 let group_ids = groups
876 .into_iter()
877 .map(|group| validate_group_id(group.into()))
878 .collect::<Result<Vec<_>>>()?;
879 if group_ids.is_empty() {
880 return Ok(());
881 }
882
883 let request = DeleteGroupsRequest::default().with_groups_names(
884 group_ids
885 .iter()
886 .cloned()
887 .map(StrBytes::from_string)
888 .map(Into::into)
889 .collect(),
890 );
891 let response: DeleteGroupsResponse = self
892 .send_request::<DeleteGroupsRequest>(DELETE_GROUPS_VERSION_CAP, &request)
893 .await?;
894
895 for result in response.results {
896 if let Some(error) = result.error_code.err() {
897 return Err(anyhow!(
898 "delete consumer group '{}' failed: {error}",
899 &*result.group_id
900 )
901 .into());
902 }
903 }
904 Ok(())
905 }
906
907 #[instrument(
908 name = "admin.describe_configs",
909 level = "debug",
910 skip(self, resources)
911 )]
912 pub async fn describe_configs<I>(&self, resources: I) -> Result<Vec<ConfigResourceConfig>>
914 where
915 I: IntoIterator<Item = ConfigResource>,
916 {
917 let resources = resources.into_iter().collect::<Vec<_>>();
918 if resources.is_empty() {
919 return Ok(Vec::new());
920 }
921
922 let request = DescribeConfigsRequest::default()
923 .with_resources(
924 resources
925 .iter()
926 .map(|resource| {
927 DescribeConfigsResource::default()
928 .with_resource_type(resource.resource_type.as_protocol_value())
929 .with_resource_name(StrBytes::from_string(
930 resource.resource_name.clone(),
931 ))
932 .with_configuration_keys(None)
933 })
934 .collect(),
935 )
936 .with_include_synonyms(false);
937 let response: DescribeConfigsResponse = self
938 .send_request::<DescribeConfigsRequest>(DESCRIBE_CONFIGS_VERSION_CAP, &request)
939 .await?;
940
941 let mut described = BTreeMap::new();
942 for resource in response.results {
943 let resource_type = ConfigResourceType::from_protocol_value(resource.resource_type);
944 let resource_name = resource.resource_name.to_string();
945 if let Some(error) = resource.error_code.err() {
946 return Err(anyhow!(
947 "describe configs for {:?} '{}' failed: {}",
948 resource_type,
949 resource_name,
950 resource
951 .error_message
952 .as_ref()
953 .map(|message| message.to_string())
954 .filter(|message| !message.is_empty())
955 .unwrap_or_else(|| error.to_string())
956 )
957 .into());
958 }
959
960 let entries = resource
961 .configs
962 .into_iter()
963 .map(|entry| {
964 let name = entry.name.to_string();
965 let config_entry = ConfigEntry {
966 name: name.clone(),
967 value: entry.value.map(|value| value.to_string()),
968 read_only: entry.read_only,
969 config_source: entry.config_source,
970 is_sensitive: entry.is_sensitive,
971 config_type: (response_supported_config_type(entry.config_type))
972 .then_some(entry.config_type),
973 documentation: entry.documentation.map(|doc| doc.to_string()),
974 };
975 (name, config_entry)
976 })
977 .collect();
978
979 described.insert(
980 (resource_type, resource_name.clone()),
981 ConfigResourceConfig {
982 resource: ConfigResource::new(resource_type, resource_name),
983 entries,
984 },
985 );
986 }
987
988 resources
989 .into_iter()
990 .map(|resource| {
991 described
992 .remove(&(resource.resource_type, resource.resource_name.clone()))
993 .ok_or_else(|| {
994 anyhow!("describe configs response did not include {:?}", resource).into()
995 })
996 })
997 .collect()
998 }
999
1000 #[instrument(
1001 name = "admin.incremental_alter_configs",
1002 level = "debug",
1003 skip(self, resources)
1004 )]
1005 pub async fn incremental_alter_configs<I>(&self, resources: I) -> Result<()>
1007 where
1008 I: IntoIterator<Item = (ConfigResource, Vec<AlterConfigOp>)>,
1009 {
1010 let resources = resources
1011 .into_iter()
1012 .map(|(resource, ops)| {
1013 AlterConfigsResource::default()
1014 .with_resource_type(resource.resource_type.as_protocol_value())
1015 .with_resource_name(StrBytes::from_string(resource.resource_name))
1016 .with_configs(
1017 ops.into_iter()
1018 .map(|op| {
1019 AlterableConfig::default()
1020 .with_name(StrBytes::from_string(op.name))
1021 .with_config_operation(op.op_type.as_protocol_value())
1022 .with_value(op.value.map(StrBytes::from_string))
1023 })
1024 .collect(),
1025 )
1026 })
1027 .collect::<Vec<_>>();
1028 if resources.is_empty() {
1029 return Ok(());
1030 }
1031
1032 let request = IncrementalAlterConfigsRequest::default()
1033 .with_resources(resources)
1034 .with_validate_only(false);
1035 let response: IncrementalAlterConfigsResponse = self
1036 .send_request::<IncrementalAlterConfigsRequest>(
1037 INCREMENTAL_ALTER_CONFIGS_VERSION_CAP,
1038 &request,
1039 )
1040 .await?;
1041
1042 for resource in response.responses {
1043 if let Some(error) = resource.error_code.err() {
1044 return Err(anyhow!(
1045 "incremental alter configs for {:?} '{}' failed: {}",
1046 ConfigResourceType::from_protocol_value(resource.resource_type),
1047 resource.resource_name,
1048 resource
1049 .error_message
1050 .as_ref()
1051 .map(|message| message.to_string())
1052 .filter(|message| !message.is_empty())
1053 .unwrap_or_else(|| error.to_string())
1054 )
1055 .into());
1056 }
1057 }
1058
1059 Ok(())
1060 }
1061
1062 #[instrument(
1063 name = "admin.upsert_scram_credential",
1064 level = "debug",
1065 skip(self, user, password)
1066 )]
1067 pub async fn upsert_scram_credential(
1069 &self,
1070 user: impl Into<String>,
1071 mechanism: SaslMechanism,
1072 password: impl AsRef<[u8]>,
1073 ) -> Result<()> {
1074 self.upsert_scram_credential_with_iterations(
1075 user,
1076 mechanism,
1077 password,
1078 scram::MIN_ITERATIONS,
1079 )
1080 .await
1081 }
1082
1083 #[instrument(
1084 name = "admin.upsert_scram_credential_with_iterations",
1085 level = "debug",
1086 skip(self, user, password)
1087 )]
1088 pub async fn upsert_scram_credential_with_iterations(
1090 &self,
1091 user: impl Into<String>,
1092 mechanism: SaslMechanism,
1093 password: impl AsRef<[u8]>,
1094 iterations: i32,
1095 ) -> Result<()> {
1096 let user = user.into();
1097 let mechanism_type = mechanism
1098 .scram_type()
1099 .ok_or_else(|| anyhow!("SCRAM credential upsertion requires a SCRAM mechanism"))?;
1100 let salt = scram::secure_random_bytes()?;
1101 let salted_password =
1102 scram::salted_password(mechanism, password.as_ref(), &salt, iterations)?;
1103 let request = AlterUserScramCredentialsRequest::default().with_upsertions(vec![
1104 ScramCredentialUpsertion::default()
1105 .with_name(StrBytes::from_string(user.clone()))
1106 .with_mechanism(mechanism_type)
1107 .with_iterations(iterations)
1108 .with_salt(Bytes::from(salt))
1109 .with_salted_password(Bytes::from(salted_password)),
1110 ]);
1111 let response: AlterUserScramCredentialsResponse = self
1112 .send_request::<AlterUserScramCredentialsRequest>(
1113 ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP,
1114 &request,
1115 )
1116 .await?;
1117
1118 for result in response.results {
1119 if let Some(error) = result.error_code.err() {
1120 return Err(anyhow!(
1121 "alter SCRAM credential for user '{}' failed: {}",
1122 result.user,
1123 result
1124 .error_message
1125 .as_ref()
1126 .map(|message| message.to_string())
1127 .filter(|message| !message.is_empty())
1128 .unwrap_or_else(|| error.to_string())
1129 )
1130 .into());
1131 }
1132 }
1133
1134 Ok(())
1135 }
1136
1137 pub fn config(&self) -> &AdminConfig {
1139 &self.config
1140 }
1141
1142 pub async fn finalized_feature_levels(&self) -> Result<Vec<BrokerFeatureLevel>> {
1144 let connection = connect_to_any_bootstrap(
1145 &self.config.bootstrap_servers,
1146 &self.config.client_id,
1147 self.config.request_timeout,
1148 self.config.security_protocol,
1149 &self.config.tls,
1150 &self.config.sasl,
1151 )
1152 .await?;
1153 Ok(connection
1154 .finalized_feature_levels()
1155 .into_iter()
1156 .map(|(name, level)| BrokerFeatureLevel { name, level })
1157 .collect())
1158 }
1159
1160 async fn warm_up(&self) -> Result<()> {
1161 let _ = connect_to_any_bootstrap(
1162 &self.config.bootstrap_servers,
1163 &self.config.client_id,
1164 self.config.request_timeout,
1165 self.config.security_protocol,
1166 &self.config.tls,
1167 &self.config.sasl,
1168 )
1169 .await?;
1170 Ok(())
1171 }
1172
1173 async fn fetch_metadata(&self, topics: Option<&[String]>) -> Result<MetadataResponse> {
1174 let (mut connection, version) = self
1175 .connect_with_version::<MetadataRequest>(METADATA_VERSION_CAP)
1176 .await?;
1177 let request = MetadataRequest::default()
1178 .with_topics(topics.map(|topics| {
1179 topics
1180 .iter()
1181 .cloned()
1182 .map(StrBytes::from_string)
1183 .map(|name| MetadataRequestTopic::default().with_name(Some(name.into())))
1184 .collect()
1185 }))
1186 .with_allow_auto_topic_creation(false)
1187 .with_include_cluster_authorized_operations(false)
1188 .with_include_topic_authorized_operations(false);
1189 Ok(connection
1190 .send_request::<MetadataRequest>(&self.config.client_id, version, &request)
1191 .await?)
1192 }
1193
1194 async fn send_request<Req>(&self, version_cap: i16, request: &Req) -> Result<Req::Response>
1195 where
1196 Req: Request,
1197 {
1198 let (mut connection, version) = self.connect_with_version::<Req>(version_cap).await?;
1199 Ok(connection
1200 .send_request::<Req>(&self.config.client_id, version, request)
1201 .await?)
1202 }
1203
1204 async fn connect_with_version<Req>(&self, version_cap: i16) -> Result<(BrokerConnection, i16)>
1205 where
1206 Req: Request,
1207 {
1208 let connection = connect_to_any_bootstrap(
1209 &self.config.bootstrap_servers,
1210 &self.config.client_id,
1211 self.config.request_timeout,
1212 self.config.security_protocol,
1213 &self.config.tls,
1214 &self.config.sasl,
1215 )
1216 .await?;
1217 let version = connection.version_with_cap::<Req>(version_cap)?;
1218 Ok((connection, version))
1219 }
1220}
1221
1222fn validate_topic_name(topic: String) -> Result<String> {
1223 let topic = topic.trim();
1224 if topic.is_empty() {
1225 return Err(AdminError::EmptyTopicName.into());
1226 }
1227
1228 Ok(topic.to_owned())
1229}
1230
1231fn validate_group_id(group_id: String) -> Result<String> {
1232 let group_id = group_id.trim();
1233 if group_id.is_empty() {
1234 return Err(anyhow!("consumer group id cannot be empty").into());
1235 }
1236
1237 Ok(group_id.to_owned())
1238}
1239
1240fn assignment_partition_count(
1241 assignment: &kafka_protocol::messages::consumer_group_describe_response::Assignment,
1242) -> usize {
1243 assignment
1244 .topic_partitions
1245 .iter()
1246 .map(|topic| topic.partitions.len())
1247 .sum()
1248}
1249
1250fn is_ignorable_create_topic_error(error: ResponseError) -> bool {
1251 error == ResponseError::TopicAlreadyExists
1252}
1253
1254fn response_supported_config_type(config_type: i8) -> bool {
1255 config_type >= 0
1256}
1257
1258#[cfg(test)]
1259mod tests {
1260 use super::*;
1261
1262 #[test]
1263 fn new_topic_maps_to_create_topics_request() {
1264 let topic = NewTopic::new("orders", 3, 2)
1265 .with_config("cleanup.policy", "compact")
1266 .into_request_topic()
1267 .expect("topic should be valid");
1268
1269 assert_eq!(topic.name.0.to_string(), "orders");
1270 assert_eq!(topic.num_partitions, 3);
1271 assert_eq!(topic.replication_factor, 2);
1272 assert_eq!(topic.configs.len(), 1);
1273 assert_eq!(topic.configs[0].name.to_string(), "cleanup.policy");
1274 }
1275
1276 #[test]
1277 fn new_topic_rejects_invalid_partition_count() {
1278 let error = NewTopic::new("orders", 0, 1)
1279 .into_request_topic()
1280 .expect_err("invalid topic should fail");
1281 assert!(
1282 error
1283 .to_string()
1284 .contains("topic partition count must be positive")
1285 );
1286 }
1287
1288 #[test]
1289 fn new_topic_rejects_empty_names_and_invalid_replication_factor() {
1290 let error = NewTopic::new(" ", 1, 1).into_request_topic().unwrap_err();
1291 assert!(matches!(
1292 error,
1293 crate::Error::Admin(AdminError::EmptyTopicName)
1294 ));
1295
1296 let error = NewTopic::new("orders", 1, 0)
1297 .into_request_topic()
1298 .unwrap_err();
1299 assert!(matches!(
1300 error,
1301 crate::Error::Admin(AdminError::InvalidReplicationFactor {
1302 replication_factor: 0
1303 })
1304 ));
1305 }
1306
1307 #[test]
1308 fn new_partitions_maps_assignments_and_rejects_invalid_input() {
1309 let topic = NewPartitions::increase_to(4)
1310 .with_assignment([1, 2])
1311 .with_assignment([2, 3])
1312 .into_request_topic("orders".to_owned())
1313 .unwrap();
1314 assert_eq!(topic.name.to_string(), "orders");
1315 assert_eq!(topic.count, 4);
1316 assert_eq!(topic.assignments.unwrap().len(), 2);
1317
1318 let error = NewPartitions::increase_to(0)
1319 .into_request_topic("orders".to_owned())
1320 .unwrap_err();
1321 assert!(matches!(
1322 error,
1323 crate::Error::Admin(AdminError::InvalidPartitionCount { partitions: 0 })
1324 ));
1325
1326 let error = NewPartitions::increase_to(1)
1327 .into_request_topic(" ".to_owned())
1328 .unwrap_err();
1329 assert!(matches!(
1330 error,
1331 crate::Error::Admin(AdminError::EmptyTopicName)
1332 ));
1333 }
1334
1335 #[test]
1336 fn config_resource_and_operation_protocol_values_are_stable() {
1337 assert_eq!(ConfigResourceType::Unknown.as_protocol_value(), 0);
1338 assert_eq!(ConfigResourceType::Topic.as_protocol_value(), 2);
1339 assert_eq!(ConfigResourceType::Broker.as_protocol_value(), 4);
1340 assert_eq!(ConfigResourceType::BrokerLogger.as_protocol_value(), 8);
1341 assert_eq!(ConfigResourceType::ClientMetrics.as_protocol_value(), 16);
1342 assert_eq!(ConfigResourceType::Group.as_protocol_value(), 32);
1343 assert_eq!(
1344 ConfigResourceType::from_protocol_value(2),
1345 ConfigResourceType::Topic
1346 );
1347 assert_eq!(
1348 ConfigResourceType::from_protocol_value(99),
1349 ConfigResourceType::Unknown
1350 );
1351
1352 assert_eq!(
1353 AlterConfigOp::set("cleanup.policy", "compact")
1354 .op_type
1355 .as_protocol_value(),
1356 0
1357 );
1358 assert_eq!(
1359 AlterConfigOp::delete("retention.ms")
1360 .op_type
1361 .as_protocol_value(),
1362 1
1363 );
1364 assert_eq!(AlterConfigOpType::Append.as_protocol_value(), 2);
1365 assert_eq!(AlterConfigOpType::Subtract.as_protocol_value(), 3);
1366 assert!(response_supported_config_type(0));
1367 assert!(!response_supported_config_type(-1));
1368 }
1369
1370 #[test]
1371 fn config_resource_config_entry_lookup_returns_named_entry() {
1372 let mut entries = BTreeMap::new();
1373 entries.insert(
1374 "cleanup.policy".to_owned(),
1375 ConfigEntry {
1376 name: "cleanup.policy".to_owned(),
1377 value: Some("compact".to_owned()),
1378 read_only: false,
1379 config_source: 1,
1380 is_sensitive: false,
1381 config_type: Some(2),
1382 documentation: None,
1383 },
1384 );
1385 let config = ConfigResourceConfig {
1386 resource: ConfigResource::topic("orders"),
1387 entries,
1388 };
1389
1390 assert_eq!(
1391 config.entry("cleanup.policy").unwrap().value.as_deref(),
1392 Some("compact")
1393 );
1394 assert!(config.entry("missing").is_none());
1395 }
1396
1397 #[test]
1398 fn topic_already_exists_is_ignorable() {
1399 assert!(is_ignorable_create_topic_error(
1400 ResponseError::TopicAlreadyExists
1401 ));
1402 assert!(!is_ignorable_create_topic_error(
1403 ResponseError::UnknownTopicOrPartition
1404 ));
1405 }
1406}