1use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use bytes::Bytes;
19use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
20use kafka_protocol::messages::alter_user_scram_credentials_request::ScramCredentialUpsertion;
21use kafka_protocol::messages::create_partitions_request::{
22 CreatePartitionsAssignment, CreatePartitionsTopic,
23};
24use kafka_protocol::messages::create_topics_request::{CreatableTopic, CreatableTopicConfig};
25use kafka_protocol::messages::describe_configs_request::DescribeConfigsResource;
26use kafka_protocol::messages::incremental_alter_configs_request::{
27 AlterConfigsResource, AlterableConfig,
28};
29use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
30use kafka_protocol::messages::update_features_request::FeatureUpdateKey;
31use kafka_protocol::messages::{
32 AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse,
33 ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, CreatePartitionsRequest,
34 CreatePartitionsResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteGroupsRequest,
35 DeleteGroupsResponse, DeleteTopicsRequest, DeleteTopicsResponse, DescribeClusterRequest,
36 DescribeClusterResponse, DescribeConfigsRequest, DescribeConfigsResponse,
37 IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse, ListGroupsRequest,
38 ListGroupsResponse, MetadataRequest, MetadataResponse, UpdateFeaturesRequest,
39 UpdateFeaturesResponse,
40};
41use kafka_protocol::protocol::{Request, StrBytes};
42use tracing::{debug, instrument};
43use uuid::Uuid;
44
45use crate::config::{AdminConfig, SaslMechanism};
46use crate::constants::{
47 ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP, CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
48 CREATE_PARTITIONS_VERSION_CAP, CREATE_TOPICS_VERSION_CAP, DELETE_GROUPS_VERSION_CAP,
49 DELETE_TOPICS_VERSION_CAP, DESCRIBE_CLUSTER_VERSION_CAP, DESCRIBE_CONFIGS_VERSION_CAP,
50 INCREMENTAL_ALTER_CONFIGS_VERSION_CAP, LIST_GROUPS_VERSION_CAP, METADATA_VERSION_CAP,
51 UPDATE_FEATURES_VERSION_CAP,
52};
53use crate::network::scram;
54use crate::network::{BrokerConnection, connect_to_any_bootstrap, duration_to_i32_ms};
55use crate::{AdminError, Result};
56
57#[derive(Debug, Clone)]
58pub struct NewTopic {
60 pub name: String,
62 pub num_partitions: i32,
64 pub replication_factor: i16,
66 pub configs: BTreeMap<String, String>,
68}
69
70impl NewTopic {
71 pub fn new(name: impl Into<String>, num_partitions: i32, replication_factor: i16) -> Self {
73 Self {
74 name: name.into(),
75 num_partitions,
76 replication_factor,
77 configs: BTreeMap::new(),
78 }
79 }
80
81 pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
83 self.configs.insert(key.into(), value.into());
84 self
85 }
86
87 fn into_request_topic(self) -> Result<CreatableTopic> {
88 let name = validate_topic_name(self.name)?;
89 if self.num_partitions <= 0 {
90 return Err(AdminError::InvalidPartitionCount {
91 partitions: self.num_partitions,
92 }
93 .into());
94 }
95 if self.replication_factor <= 0 {
96 return Err(AdminError::InvalidReplicationFactor {
97 replication_factor: self.replication_factor,
98 }
99 .into());
100 }
101
102 let configs = self
103 .configs
104 .into_iter()
105 .map(|(key, value)| {
106 CreatableTopicConfig::default()
107 .with_name(StrBytes::from_string(key))
108 .with_value(Some(StrBytes::from_string(value)))
109 })
110 .collect();
111
112 Ok(CreatableTopic::default()
113 .with_name(StrBytes::from_string(name).into())
114 .with_num_partitions(self.num_partitions)
115 .with_replication_factor(self.replication_factor)
116 .with_configs(configs))
117 }
118}
119
120#[derive(Debug, Clone, PartialEq, Eq)]
121pub struct NewPartitions {
123 pub total_count: i32,
125 pub assignments: Vec<Vec<i32>>,
127}
128
129impl NewPartitions {
130 pub fn increase_to(total_count: i32) -> Self {
132 Self {
133 total_count,
134 assignments: Vec::new(),
135 }
136 }
137
138 pub fn with_assignment<I>(mut self, broker_ids: I) -> Self
140 where
141 I: IntoIterator<Item = i32>,
142 {
143 self.assignments.push(broker_ids.into_iter().collect());
144 self
145 }
146
147 fn into_request_topic(self, topic_name: String) -> Result<CreatePartitionsTopic> {
148 let name = validate_topic_name(topic_name)?;
149 if self.total_count <= 0 {
150 return Err(AdminError::InvalidPartitionCount {
151 partitions: self.total_count,
152 }
153 .into());
154 }
155
156 let assignments = (!self.assignments.is_empty()).then(|| {
157 self.assignments
158 .into_iter()
159 .map(|broker_ids| {
160 CreatePartitionsAssignment::default()
161 .with_broker_ids(broker_ids.into_iter().map(Into::into).collect())
162 })
163 .collect()
164 });
165
166 Ok(CreatePartitionsTopic::default()
167 .with_name(StrBytes::from_string(name).into())
168 .with_count(self.total_count)
169 .with_assignments(assignments))
170 }
171}
172
173#[derive(Debug, Clone, PartialEq, Eq)]
174pub struct TopicListing {
176 pub name: String,
178 pub topic_id: Option<Uuid>,
180 pub is_internal: bool,
182}
183
/// Metadata for a single partition, as returned inside a `TopicDescription`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TopicPartitionDescription {
    /// Partition index within the topic.
    pub partition: i32,
    /// Broker id of the partition leader as reported by the metadata response.
    pub leader_id: i32,
    /// Leader epoch as reported by the metadata response.
    pub leader_epoch: i32,
    /// Broker ids from the response's `replica_nodes` field.
    pub replica_nodes: Vec<i32>,
    /// Broker ids from the response's `isr_nodes` field.
    pub isr_nodes: Vec<i32>,
    /// Broker ids from the response's `offline_replicas` field.
    pub offline_replicas: Vec<i32>,
}
200
201#[derive(Debug, Clone, PartialEq, Eq)]
202pub struct TopicDescription {
204 pub name: String,
206 pub topic_id: Option<Uuid>,
208 pub is_internal: bool,
210 pub partitions: Vec<TopicPartitionDescription>,
212}
213
/// One broker as reported by `KafkaAdmin::describe_cluster`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BrokerDescription {
    /// Numeric broker id.
    pub broker_id: i32,
    /// Host name reported for the broker.
    pub host: String,
    /// Port reported for the broker.
    pub port: i32,
    /// Rack reported for the broker, if any.
    pub rack: Option<String>,
    /// The `is_fenced` flag copied from the DescribeCluster response.
    pub is_fenced: bool,
}
228
229#[derive(Debug, Clone, PartialEq, Eq)]
230pub struct ClusterDescription {
232 pub cluster_id: String,
234 pub controller_id: i32,
236 pub brokers: Vec<BrokerDescription>,
238}
239
/// A feature name paired with its level, as returned by
/// `KafkaAdmin::finalized_feature_levels`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct BrokerFeatureLevel {
    /// Feature name.
    pub name: String,
    /// Level reported for the feature.
    pub level: i16,
}
248
249#[derive(Debug, Clone, PartialEq, Eq)]
250pub struct FeatureUpdate {
252 pub name: String,
254 pub max_version_level: i16,
256 pub upgrade_type: FeatureUpgradeType,
258}
259
260impl FeatureUpdate {
261 pub fn upgrade(name: impl Into<String>, max_version_level: i16) -> Self {
263 Self {
264 name: name.into(),
265 max_version_level,
266 upgrade_type: FeatureUpgradeType::Upgrade,
267 }
268 }
269
270 pub fn safe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
272 Self {
273 name: name.into(),
274 max_version_level,
275 upgrade_type: FeatureUpgradeType::SafeDowngrade,
276 }
277 }
278
279 pub fn unsafe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
281 Self {
282 name: name.into(),
283 max_version_level,
284 upgrade_type: FeatureUpgradeType::UnsafeDowngrade,
285 }
286 }
287
288 fn into_request_update(self, version: i16) -> Result<FeatureUpdateKey> {
289 let name = validate_feature_name(self.name)?;
290 let allow_downgrade = self.upgrade_type.allows_downgrade();
291 let mut update = FeatureUpdateKey::default()
292 .with_feature(StrBytes::from_string(name))
293 .with_max_version_level(self.max_version_level);
294 if version == 0 {
295 update = update.with_allow_downgrade(allow_downgrade);
296 } else {
297 update = update.with_upgrade_type(self.upgrade_type.as_protocol_value());
298 }
299 Ok(update)
300 }
301}
302
/// Kind of change a `FeatureUpdate` performs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FeatureUpgradeType {
    /// Encoded as `1`; the only variant that does not allow a downgrade.
    Upgrade,
    /// Encoded as `2`.
    SafeDowngrade,
    /// Encoded as `3`.
    UnsafeDowngrade,
}

impl FeatureUpgradeType {
    /// Numeric value placed in the request's `upgrade_type` field.
    fn as_protocol_value(self) -> i8 {
        let value: i8 = match self {
            FeatureUpgradeType::Upgrade => 1,
            FeatureUpgradeType::SafeDowngrade => 2,
            FeatureUpgradeType::UnsafeDowngrade => 3,
        };
        value
    }

    /// Whether this kind maps to `allow_downgrade = true` in version 0 requests.
    fn allows_downgrade(self) -> bool {
        match self {
            FeatureUpgradeType::Upgrade => false,
            FeatureUpgradeType::SafeDowngrade | FeatureUpgradeType::UnsafeDowngrade => true,
        }
    }
}
327
/// One group as reported by `KafkaAdmin::list_consumer_groups`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ConsumerGroupListing {
    /// Group id.
    pub group_id: String,
    /// Protocol type reported for the group.
    pub protocol_type: String,
    /// Group state; `None` when the response field was empty.
    pub state: Option<String>,
    /// Group type; `None` when the response field was empty.
    pub group_type: Option<String>,
}
340
341#[derive(Debug, Clone, PartialEq, Eq)]
342pub struct ConsumerGroupDescription {
344 pub group_id: String,
346 pub state: String,
348 pub protocol_type: String,
350 pub protocol_data: String,
352 pub members: Vec<ConsumerGroupMemberDescription>,
354 pub authorized_operations: Option<i32>,
356}
357
/// One member of a group, as returned inside a `ConsumerGroupDescription`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ConsumerGroupMemberDescription {
    /// Member id.
    pub member_id: String,
    /// Group instance id, when the response carried one.
    pub group_instance_id: Option<String>,
    /// Client id reported for the member.
    pub client_id: String,
    /// Client host reported for the member.
    pub client_host: String,
    /// NOTE(review): despite the name, `describe_consumer_groups` stores the number of
    /// subscribed topic names here, not a byte length.
    pub member_metadata_bytes: usize,
    /// NOTE(review): despite the name, `describe_consumer_groups` stores the result of
    /// `assignment_partition_count` here — presumably a partition count; confirm.
    pub member_assignment_bytes: usize,
}
374
/// Kind of resource a configuration belongs to.
///
/// Variant order is significant: it drives the derived `PartialOrd`/`Ord`.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub enum ConfigResourceType {
    /// Encoded as `0`; also the result of decoding any unrecognized value.
    Unknown,
    /// Encoded as `2`.
    Topic,
    /// Encoded as `4`.
    Broker,
    /// Encoded as `8`.
    BrokerLogger,
    /// Encoded as `16`.
    ClientMetrics,
    /// Encoded as `32`.
    Group,
}

impl ConfigResourceType {
    /// Numeric value placed in a request's `resource_type` field.
    fn as_protocol_value(self) -> i8 {
        match self {
            ConfigResourceType::Unknown => 0,
            ConfigResourceType::Topic => 2,
            ConfigResourceType::Broker => 4,
            ConfigResourceType::BrokerLogger => 8,
            ConfigResourceType::ClientMetrics => 16,
            ConfigResourceType::Group => 32,
        }
    }

    /// Decodes a response's `resource_type`; anything unrecognized becomes `Unknown`.
    fn from_protocol_value(value: i8) -> Self {
        // Every variant that has a dedicated (non-fallback) encoding.
        const KNOWN: [ConfigResourceType; 5] = [
            ConfigResourceType::Topic,
            ConfigResourceType::Broker,
            ConfigResourceType::BrokerLogger,
            ConfigResourceType::ClientMetrics,
            ConfigResourceType::Group,
        ];
        KNOWN
            .iter()
            .copied()
            .find(|kind| kind.as_protocol_value() == value)
            .unwrap_or(ConfigResourceType::Unknown)
    }
}
415
416#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
417pub struct ConfigResource {
419 pub resource_type: ConfigResourceType,
421 pub resource_name: String,
423}
424
425impl ConfigResource {
426 pub fn new(resource_type: ConfigResourceType, resource_name: impl Into<String>) -> Self {
428 Self {
429 resource_type,
430 resource_name: resource_name.into(),
431 }
432 }
433
434 pub fn topic(resource_name: impl Into<String>) -> Self {
436 Self::new(ConfigResourceType::Topic, resource_name)
437 }
438
439 pub fn group(resource_name: impl Into<String>) -> Self {
441 Self::new(ConfigResourceType::Group, resource_name)
442 }
443}
444
/// One configuration entry as returned by `KafkaAdmin::describe_configs`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ConfigEntry {
    /// Config name.
    pub name: String,
    /// Config value, when the response carried one.
    pub value: Option<String>,
    /// The `read_only` flag copied from the response.
    pub read_only: bool,
    /// Raw `config_source` value copied from the response.
    pub config_source: i8,
    /// The `is_sensitive` flag copied from the response.
    pub is_sensitive: bool,
    /// Raw `config_type` value; `None` when `response_supported_config_type` rejects it.
    pub config_type: Option<i8>,
    /// Documentation string, when the response carried one.
    pub documentation: Option<String>,
}
463
464#[derive(Debug, Clone, PartialEq, Eq)]
465pub struct ConfigResourceConfig {
467 pub resource: ConfigResource,
469 pub entries: BTreeMap<String, ConfigEntry>,
471}
472
473impl ConfigResourceConfig {
474 pub fn entry(&self, name: &str) -> Option<&ConfigEntry> {
476 self.entries.get(name)
477 }
478}
479
/// Operation applied to one config key by an incremental alter-configs request.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AlterConfigOpType {
    /// Encoded as `0`.
    Set,
    /// Encoded as `1`.
    Delete,
    /// Encoded as `2`.
    Append,
    /// Encoded as `3`.
    Subtract,
}

impl AlterConfigOpType {
    /// Numeric value placed in the request's `config_operation` field.
    fn as_protocol_value(self) -> i8 {
        let value: i8 = match self {
            AlterConfigOpType::Set => 0,
            AlterConfigOpType::Delete => 1,
            AlterConfigOpType::Append => 2,
            AlterConfigOpType::Subtract => 3,
        };
        value
    }
}
503
504#[derive(Debug, Clone, PartialEq, Eq)]
505pub struct AlterConfigOp {
507 pub name: String,
509 pub op_type: AlterConfigOpType,
511 pub value: Option<String>,
513}
514
515impl AlterConfigOp {
516 pub fn set(name: impl Into<String>, value: impl Into<String>) -> Self {
518 Self {
519 name: name.into(),
520 op_type: AlterConfigOpType::Set,
521 value: Some(value.into()),
522 }
523 }
524
525 pub fn delete(name: impl Into<String>) -> Self {
527 Self {
528 name: name.into(),
529 op_type: AlterConfigOpType::Delete,
530 value: None,
531 }
532 }
533}
534
535#[derive(Debug, Clone)]
536pub struct KafkaAdmin {
538 config: AdminConfig,
539}
540
541impl KafkaAdmin {
542 #[instrument(
543 name = "admin.connect",
544 level = "debug",
545 skip(config),
546 fields(
547 bootstrap_server_count = config.bootstrap_servers.len(),
548 client_id = %config.client_id
549 )
550 )]
551 pub async fn connect(config: AdminConfig) -> Result<Self> {
553 let admin = Self { config };
554 admin.warm_up().await?;
555 debug!("admin client connected");
556 Ok(admin)
557 }
558
559 #[instrument(name = "admin.create_topics", level = "debug", skip(self, topics))]
560 pub async fn create_topics<I>(&self, topics: I) -> Result<()>
562 where
563 I: IntoIterator<Item = NewTopic>,
564 {
565 let topics = topics
566 .into_iter()
567 .map(NewTopic::into_request_topic)
568 .collect::<Result<Vec<_>>>()?;
569 if topics.is_empty() {
570 return Ok(());
571 }
572
573 let request = CreateTopicsRequest::default()
574 .with_topics(topics)
575 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
576 .with_validate_only(false);
577 let response: CreateTopicsResponse = self
578 .send_request::<CreateTopicsRequest>(CREATE_TOPICS_VERSION_CAP, &request)
579 .await?;
580
581 for topic in response.topics {
582 let name = topic.name.0.to_string();
583 if let Some(error) = topic
584 .error_code
585 .err()
586 .filter(|error| !is_ignorable_create_topic_error(*error))
587 {
588 return Err(anyhow!("create topic '{name}' failed: {error}").into());
589 }
590 }
591
592 Ok(())
593 }
594
595 #[instrument(name = "admin.delete_topics", level = "debug", skip(self, topics))]
596 pub async fn delete_topics<I, S>(&self, topics: I) -> Result<()>
598 where
599 I: IntoIterator<Item = S>,
600 S: Into<String>,
601 {
602 let topic_names = topics
603 .into_iter()
604 .map(|topic| validate_topic_name(topic.into()))
605 .collect::<Result<Vec<_>>>()?;
606 if topic_names.is_empty() {
607 return Ok(());
608 }
609
610 let request = DeleteTopicsRequest::default()
611 .with_topic_names(
612 topic_names
613 .iter()
614 .cloned()
615 .map(StrBytes::from_string)
616 .map(Into::into)
617 .collect(),
618 )
619 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?);
620 let response: DeleteTopicsResponse = self
621 .send_request::<DeleteTopicsRequest>(DELETE_TOPICS_VERSION_CAP, &request)
622 .await?;
623
624 for topic in response.responses {
625 let name = topic
626 .name
627 .as_ref()
628 .map(|name| name.0.to_string())
629 .unwrap_or_else(|| "<unknown>".to_owned());
630 if let Some(error) = topic.error_code.err() {
631 return Err(anyhow!("delete topic '{name}' failed: {error}").into());
632 }
633 }
634
635 Ok(())
636 }
637
638 #[instrument(
639 name = "admin.create_partitions",
640 level = "debug",
641 skip(self, partitions)
642 )]
643 pub async fn create_partitions<I, S>(&self, partitions: I) -> Result<()>
645 where
646 I: IntoIterator<Item = (S, NewPartitions)>,
647 S: Into<String>,
648 {
649 let topics = partitions
650 .into_iter()
651 .map(|(topic, new_partitions)| new_partitions.into_request_topic(topic.into()))
652 .collect::<Result<Vec<_>>>()?;
653 if topics.is_empty() {
654 return Ok(());
655 }
656
657 let request = CreatePartitionsRequest::default()
658 .with_topics(topics)
659 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
660 .with_validate_only(false);
661 let response: CreatePartitionsResponse = self
662 .send_request::<CreatePartitionsRequest>(CREATE_PARTITIONS_VERSION_CAP, &request)
663 .await?;
664
665 for topic in response.results {
666 let name = topic.name.0.to_string();
667 if let Some(error) = topic.error_code.err() {
668 return Err(anyhow!(
669 "create partitions for topic '{name}' failed: {}",
670 topic
671 .error_message
672 .as_ref()
673 .map(|message| message.to_string())
674 .filter(|message| !message.is_empty())
675 .unwrap_or_else(|| error.to_string())
676 )
677 .into());
678 }
679 }
680
681 Ok(())
682 }
683
684 #[instrument(name = "admin.list_topics", level = "debug", skip(self))]
685 pub async fn list_topics(&self) -> Result<Vec<TopicListing>> {
687 let response = self.fetch_metadata(None).await?;
688 let mut topics = Vec::new();
689
690 for topic in response.topics {
691 let Some(name) = topic.name.as_ref().map(|name| name.0.to_string()) else {
692 continue;
693 };
694 if let Some(error) = topic.error_code.err() {
695 return Err(anyhow!("list topics failed for '{name}': {error}").into());
696 }
697
698 topics.push(TopicListing {
699 name,
700 topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
701 is_internal: topic.is_internal,
702 });
703 }
704
705 topics.sort_by(|left, right| left.name.cmp(&right.name));
706 Ok(topics)
707 }
708
709 #[instrument(name = "admin.describe_topics", level = "debug", skip(self, topics))]
710 pub async fn describe_topics<I, S>(&self, topics: I) -> Result<Vec<TopicDescription>>
712 where
713 I: IntoIterator<Item = S>,
714 S: Into<String>,
715 {
716 let requested_topics = topics
717 .into_iter()
718 .map(|topic| validate_topic_name(topic.into()))
719 .collect::<Result<Vec<_>>>()?;
720 if requested_topics.is_empty() {
721 return Ok(Vec::new());
722 }
723
724 let response = self.fetch_metadata(Some(&requested_topics)).await?;
725 let mut descriptions = BTreeMap::new();
726
727 for topic in response.topics {
728 let name = topic
729 .name
730 .as_ref()
731 .map(|name| name.0.to_string())
732 .unwrap_or_default();
733 if let Some(error) = topic.error_code.err() {
734 let label = if name.is_empty() { "<unknown>" } else { &name };
735 return Err(anyhow!("describe topic '{label}' failed: {error}").into());
736 }
737
738 let mut partitions = topic
739 .partitions
740 .into_iter()
741 .map(|partition| {
742 if let Some(error) = partition.error_code.err() {
743 return Err(anyhow!(
744 "describe topic '{name}' partition {} failed: {error}",
745 partition.partition_index
746 ));
747 }
748
749 Ok(TopicPartitionDescription {
750 partition: partition.partition_index,
751 leader_id: partition.leader_id.0,
752 leader_epoch: partition.leader_epoch,
753 replica_nodes: partition.replica_nodes.into_iter().map(|id| id.0).collect(),
754 isr_nodes: partition.isr_nodes.into_iter().map(|id| id.0).collect(),
755 offline_replicas: partition
756 .offline_replicas
757 .into_iter()
758 .map(|id| id.0)
759 .collect(),
760 })
761 })
762 .collect::<std::result::Result<Vec<_>, _>>()?;
763 partitions.sort_by_key(|partition| partition.partition);
764
765 descriptions.insert(
766 name.clone(),
767 TopicDescription {
768 name,
769 topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
770 is_internal: topic.is_internal,
771 partitions,
772 },
773 );
774 }
775
776 requested_topics
777 .into_iter()
778 .map(|topic| {
779 descriptions.remove(&topic).ok_or_else(|| {
780 anyhow!("metadata response did not include topic '{topic}'").into()
781 })
782 })
783 .collect()
784 }
785
786 #[instrument(name = "admin.describe_cluster", level = "debug", skip(self))]
787 pub async fn describe_cluster(&self) -> Result<ClusterDescription> {
789 let (mut connection, version) = self
790 .connect_with_version::<DescribeClusterRequest>(DESCRIBE_CLUSTER_VERSION_CAP)
791 .await?;
792 let mut request =
793 DescribeClusterRequest::default().with_include_cluster_authorized_operations(false);
794 if version >= 1 {
795 request = request.with_endpoint_type(1);
796 }
797 if version >= 2 {
798 request = request.with_include_fenced_brokers(true);
799 }
800
801 let response: DescribeClusterResponse = connection
802 .send_request::<DescribeClusterRequest>(&self.config.client_id, version, &request)
803 .await?;
804 if let Some(error) = response.error_code.err() {
805 return Err(anyhow!("describe cluster failed: {error}").into());
806 }
807
808 let mut brokers = response
809 .brokers
810 .into_iter()
811 .map(|broker| BrokerDescription {
812 broker_id: broker.broker_id.0,
813 host: broker.host.to_string(),
814 port: broker.port,
815 rack: broker.rack.map(|rack| rack.to_string()),
816 is_fenced: broker.is_fenced,
817 })
818 .collect::<Vec<_>>();
819 brokers.sort_by_key(|broker| broker.broker_id);
820
821 Ok(ClusterDescription {
822 cluster_id: response.cluster_id.to_string(),
823 controller_id: response.controller_id.0,
824 brokers,
825 })
826 }
827
828 #[instrument(name = "admin.list_consumer_groups", level = "debug", skip(self))]
829 pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
831 let response: ListGroupsResponse = self
832 .send_request::<ListGroupsRequest>(
833 LIST_GROUPS_VERSION_CAP,
834 &ListGroupsRequest::default(),
835 )
836 .await?;
837 if let Some(error) = response.error_code.err() {
838 return Err(anyhow!("list consumer groups failed: {error}").into());
839 }
840
841 let mut groups = response
842 .groups
843 .into_iter()
844 .map(|group| ConsumerGroupListing {
845 group_id: group.group_id.to_string(),
846 protocol_type: group.protocol_type.to_string(),
847 state: (!group.group_state.is_empty()).then(|| group.group_state.to_string()),
848 group_type: (!group.group_type.is_empty()).then(|| group.group_type.to_string()),
849 })
850 .collect::<Vec<_>>();
851 groups.sort_by(|left, right| left.group_id.cmp(&right.group_id));
852 Ok(groups)
853 }
854
855 #[instrument(
856 name = "admin.describe_consumer_groups",
857 level = "debug",
858 skip(self, groups)
859 )]
860 pub async fn describe_consumer_groups<I, S>(
862 &self,
863 groups: I,
864 ) -> Result<Vec<ConsumerGroupDescription>>
865 where
866 I: IntoIterator<Item = S>,
867 S: Into<String>,
868 {
869 let group_ids = groups
870 .into_iter()
871 .map(|group| validate_group_id(group.into()))
872 .collect::<Result<Vec<_>>>()?;
873 if group_ids.is_empty() {
874 return Ok(Vec::new());
875 }
876
877 let request = ConsumerGroupDescribeRequest::default()
878 .with_group_ids(
879 group_ids
880 .iter()
881 .cloned()
882 .map(StrBytes::from_string)
883 .map(Into::into)
884 .collect(),
885 )
886 .with_include_authorized_operations(false);
887 let response: ConsumerGroupDescribeResponse = self
888 .send_request::<ConsumerGroupDescribeRequest>(
889 CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
890 &request,
891 )
892 .await?;
893
894 let mut descriptions = BTreeMap::new();
895 for group in response.groups {
896 let group_id = group.group_id.to_string();
897 if let Some(error) = group.error_code.err() {
898 let message = group
899 .error_message
900 .as_ref()
901 .map(ToString::to_string)
902 .filter(|message| !message.is_empty())
903 .unwrap_or_else(|| error.to_string());
904 return Err(
905 anyhow!("describe consumer group '{group_id}' failed: {message}").into(),
906 );
907 }
908
909 descriptions.insert(
910 group_id.clone(),
911 ConsumerGroupDescription {
912 group_id,
913 state: group.group_state.to_string(),
914 protocol_type: "consumer".to_owned(),
915 protocol_data: group.assignor_name.to_string(),
916 members: group
917 .members
918 .into_iter()
919 .map(|member| ConsumerGroupMemberDescription {
920 member_id: member.member_id.to_string(),
921 group_instance_id: member
922 .instance_id
923 .map(|instance_id| instance_id.to_string()),
924 client_id: member.client_id.to_string(),
925 client_host: member.client_host.to_string(),
926 member_metadata_bytes: member.subscribed_topic_names.len(),
927 member_assignment_bytes: assignment_partition_count(&member.assignment),
928 })
929 .collect(),
930 authorized_operations: (group.authorized_operations != i32::MIN)
931 .then_some(group.authorized_operations),
932 },
933 );
934 }
935
936 group_ids
937 .into_iter()
938 .map(|group_id| {
939 descriptions.remove(&group_id).ok_or_else(|| {
940 anyhow!("describe groups response did not include group '{group_id}'").into()
941 })
942 })
943 .collect()
944 }
945
946 #[instrument(
947 name = "admin.delete_consumer_groups",
948 level = "debug",
949 skip(self, groups)
950 )]
951 pub async fn delete_consumer_groups<I, S>(&self, groups: I) -> Result<()>
953 where
954 I: IntoIterator<Item = S>,
955 S: Into<String>,
956 {
957 let group_ids = groups
958 .into_iter()
959 .map(|group| validate_group_id(group.into()))
960 .collect::<Result<Vec<_>>>()?;
961 if group_ids.is_empty() {
962 return Ok(());
963 }
964
965 let request = DeleteGroupsRequest::default().with_groups_names(
966 group_ids
967 .iter()
968 .cloned()
969 .map(StrBytes::from_string)
970 .map(Into::into)
971 .collect(),
972 );
973 let response: DeleteGroupsResponse = self
974 .send_request::<DeleteGroupsRequest>(DELETE_GROUPS_VERSION_CAP, &request)
975 .await?;
976
977 for result in response.results {
978 if let Some(error) = result.error_code.err() {
979 return Err(anyhow!(
980 "delete consumer group '{}' failed: {error}",
981 &*result.group_id
982 )
983 .into());
984 }
985 }
986 Ok(())
987 }
988
989 #[instrument(
990 name = "admin.describe_configs",
991 level = "debug",
992 skip(self, resources)
993 )]
994 pub async fn describe_configs<I>(&self, resources: I) -> Result<Vec<ConfigResourceConfig>>
996 where
997 I: IntoIterator<Item = ConfigResource>,
998 {
999 let resources = resources.into_iter().collect::<Vec<_>>();
1000 if resources.is_empty() {
1001 return Ok(Vec::new());
1002 }
1003
1004 let request = DescribeConfigsRequest::default()
1005 .with_resources(
1006 resources
1007 .iter()
1008 .map(|resource| {
1009 DescribeConfigsResource::default()
1010 .with_resource_type(resource.resource_type.as_protocol_value())
1011 .with_resource_name(StrBytes::from_string(
1012 resource.resource_name.clone(),
1013 ))
1014 .with_configuration_keys(None)
1015 })
1016 .collect(),
1017 )
1018 .with_include_synonyms(false);
1019 let response: DescribeConfigsResponse = self
1020 .send_request::<DescribeConfigsRequest>(DESCRIBE_CONFIGS_VERSION_CAP, &request)
1021 .await?;
1022
1023 let mut described = BTreeMap::new();
1024 for resource in response.results {
1025 let resource_type = ConfigResourceType::from_protocol_value(resource.resource_type);
1026 let resource_name = resource.resource_name.to_string();
1027 if let Some(error) = resource.error_code.err() {
1028 return Err(anyhow!(
1029 "describe configs for {:?} '{}' failed: {}",
1030 resource_type,
1031 resource_name,
1032 resource
1033 .error_message
1034 .as_ref()
1035 .map(|message| message.to_string())
1036 .filter(|message| !message.is_empty())
1037 .unwrap_or_else(|| error.to_string())
1038 )
1039 .into());
1040 }
1041
1042 let entries = resource
1043 .configs
1044 .into_iter()
1045 .map(|entry| {
1046 let name = entry.name.to_string();
1047 let config_entry = ConfigEntry {
1048 name: name.clone(),
1049 value: entry.value.map(|value| value.to_string()),
1050 read_only: entry.read_only,
1051 config_source: entry.config_source,
1052 is_sensitive: entry.is_sensitive,
1053 config_type: (response_supported_config_type(entry.config_type))
1054 .then_some(entry.config_type),
1055 documentation: entry.documentation.map(|doc| doc.to_string()),
1056 };
1057 (name, config_entry)
1058 })
1059 .collect();
1060
1061 described.insert(
1062 (resource_type, resource_name.clone()),
1063 ConfigResourceConfig {
1064 resource: ConfigResource::new(resource_type, resource_name),
1065 entries,
1066 },
1067 );
1068 }
1069
1070 resources
1071 .into_iter()
1072 .map(|resource| {
1073 described
1074 .remove(&(resource.resource_type, resource.resource_name.clone()))
1075 .ok_or_else(|| {
1076 anyhow!("describe configs response did not include {:?}", resource).into()
1077 })
1078 })
1079 .collect()
1080 }
1081
1082 #[instrument(
1083 name = "admin.incremental_alter_configs",
1084 level = "debug",
1085 skip(self, resources)
1086 )]
1087 pub async fn incremental_alter_configs<I>(&self, resources: I) -> Result<()>
1089 where
1090 I: IntoIterator<Item = (ConfigResource, Vec<AlterConfigOp>)>,
1091 {
1092 let resources = resources
1093 .into_iter()
1094 .map(|(resource, ops)| {
1095 AlterConfigsResource::default()
1096 .with_resource_type(resource.resource_type.as_protocol_value())
1097 .with_resource_name(StrBytes::from_string(resource.resource_name))
1098 .with_configs(
1099 ops.into_iter()
1100 .map(|op| {
1101 AlterableConfig::default()
1102 .with_name(StrBytes::from_string(op.name))
1103 .with_config_operation(op.op_type.as_protocol_value())
1104 .with_value(op.value.map(StrBytes::from_string))
1105 })
1106 .collect(),
1107 )
1108 })
1109 .collect::<Vec<_>>();
1110 if resources.is_empty() {
1111 return Ok(());
1112 }
1113
1114 let request = IncrementalAlterConfigsRequest::default()
1115 .with_resources(resources)
1116 .with_validate_only(false);
1117 let response: IncrementalAlterConfigsResponse = self
1118 .send_request::<IncrementalAlterConfigsRequest>(
1119 INCREMENTAL_ALTER_CONFIGS_VERSION_CAP,
1120 &request,
1121 )
1122 .await?;
1123
1124 for resource in response.responses {
1125 if let Some(error) = resource.error_code.err() {
1126 return Err(anyhow!(
1127 "incremental alter configs for {:?} '{}' failed: {}",
1128 ConfigResourceType::from_protocol_value(resource.resource_type),
1129 resource.resource_name,
1130 resource
1131 .error_message
1132 .as_ref()
1133 .map(|message| message.to_string())
1134 .filter(|message| !message.is_empty())
1135 .unwrap_or_else(|| error.to_string())
1136 )
1137 .into());
1138 }
1139 }
1140
1141 Ok(())
1142 }
1143
1144 #[instrument(
1145 name = "admin.upsert_scram_credential",
1146 level = "debug",
1147 skip(self, user, password)
1148 )]
1149 pub async fn upsert_scram_credential(
1151 &self,
1152 user: impl Into<String>,
1153 mechanism: SaslMechanism,
1154 password: impl AsRef<[u8]>,
1155 ) -> Result<()> {
1156 self.upsert_scram_credential_with_iterations(
1157 user,
1158 mechanism,
1159 password,
1160 scram::MIN_ITERATIONS,
1161 )
1162 .await
1163 }
1164
1165 #[instrument(
1166 name = "admin.upsert_scram_credential_with_iterations",
1167 level = "debug",
1168 skip(self, user, password)
1169 )]
1170 pub async fn upsert_scram_credential_with_iterations(
1172 &self,
1173 user: impl Into<String>,
1174 mechanism: SaslMechanism,
1175 password: impl AsRef<[u8]>,
1176 iterations: i32,
1177 ) -> Result<()> {
1178 let user = user.into();
1179 let mechanism_type = mechanism
1180 .scram_type()
1181 .ok_or_else(|| anyhow!("SCRAM credential upsertion requires a SCRAM mechanism"))?;
1182 let salt = scram::secure_random_bytes()?;
1183 let salted_password =
1184 scram::salted_password(mechanism, password.as_ref(), &salt, iterations)?;
1185 let request = AlterUserScramCredentialsRequest::default().with_upsertions(vec![
1186 ScramCredentialUpsertion::default()
1187 .with_name(StrBytes::from_string(user.clone()))
1188 .with_mechanism(mechanism_type)
1189 .with_iterations(iterations)
1190 .with_salt(Bytes::from(salt))
1191 .with_salted_password(Bytes::from(salted_password)),
1192 ]);
1193 let response: AlterUserScramCredentialsResponse = self
1194 .send_request::<AlterUserScramCredentialsRequest>(
1195 ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP,
1196 &request,
1197 )
1198 .await?;
1199
1200 for result in response.results {
1201 if let Some(error) = result.error_code.err() {
1202 return Err(anyhow!(
1203 "alter SCRAM credential for user '{}' failed: {}",
1204 result.user,
1205 result
1206 .error_message
1207 .as_ref()
1208 .map(|message| message.to_string())
1209 .filter(|message| !message.is_empty())
1210 .unwrap_or_else(|| error.to_string())
1211 )
1212 .into());
1213 }
1214 }
1215
1216 Ok(())
1217 }
1218
1219 pub fn config(&self) -> &AdminConfig {
1221 &self.config
1222 }
1223
1224 pub async fn finalized_feature_levels(&self) -> Result<Vec<BrokerFeatureLevel>> {
1226 let connection = connect_to_any_bootstrap(
1227 &self.config.bootstrap_servers,
1228 &self.config.client_id,
1229 self.config.request_timeout,
1230 self.config.security_protocol,
1231 &self.config.tls,
1232 &self.config.sasl,
1233 &self.config.tcp_connector,
1234 )
1235 .await?;
1236 Ok(connection
1237 .finalized_feature_levels()
1238 .into_iter()
1239 .map(|(name, level)| BrokerFeatureLevel { name, level })
1240 .collect())
1241 }
1242
1243 pub async fn update_features<I>(&self, updates: I) -> Result<()>
1245 where
1246 I: IntoIterator<Item = FeatureUpdate>,
1247 {
1248 self.update_features_inner(updates, false).await
1249 }
1250
1251 pub async fn validate_feature_updates<I>(&self, updates: I) -> Result<()>
1253 where
1254 I: IntoIterator<Item = FeatureUpdate>,
1255 {
1256 self.update_features_inner(updates, true).await
1257 }
1258
1259 async fn update_features_inner<I>(&self, updates: I, validate_only: bool) -> Result<()>
1260 where
1261 I: IntoIterator<Item = FeatureUpdate>,
1262 {
1263 let (mut connection, version) = self
1264 .connect_with_version::<UpdateFeaturesRequest>(UPDATE_FEATURES_VERSION_CAP)
1265 .await?;
1266 let feature_updates = updates
1267 .into_iter()
1268 .map(|update| update.into_request_update(version))
1269 .collect::<Result<Vec<_>>>()?;
1270 if feature_updates.is_empty() {
1271 return Ok(());
1272 }
1273
1274 let mut request = UpdateFeaturesRequest::default()
1275 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
1276 .with_feature_updates(feature_updates);
1277 if version >= 1 {
1278 request = request.with_validate_only(validate_only);
1279 } else if validate_only {
1280 return Err(anyhow!("validate-only feature updates require UpdateFeatures v1+").into());
1281 }
1282
1283 let response: UpdateFeaturesResponse = connection
1284 .send_request::<UpdateFeaturesRequest>(&self.config.client_id, version, &request)
1285 .await?;
1286 if let Some(error) = response.error_code.err() {
1287 return Err(anyhow!(
1288 "update features failed: {}",
1289 response
1290 .error_message
1291 .as_ref()
1292 .map(|message| message.to_string())
1293 .filter(|message| !message.is_empty())
1294 .unwrap_or_else(|| error.to_string())
1295 )
1296 .into());
1297 }
1298
1299 for result in response.results {
1300 if let Some(error) = result.error_code.err() {
1301 let feature = result.feature.to_string();
1302 return Err(anyhow!(
1303 "update feature '{feature}' failed: {}",
1304 result
1305 .error_message
1306 .as_ref()
1307 .map(|message| message.to_string())
1308 .filter(|message| !message.is_empty())
1309 .unwrap_or_else(|| error.to_string())
1310 )
1311 .into());
1312 }
1313 }
1314
1315 Ok(())
1316 }
1317
1318 async fn warm_up(&self) -> Result<()> {
1319 let _ = connect_to_any_bootstrap(
1320 &self.config.bootstrap_servers,
1321 &self.config.client_id,
1322 self.config.request_timeout,
1323 self.config.security_protocol,
1324 &self.config.tls,
1325 &self.config.sasl,
1326 &self.config.tcp_connector,
1327 )
1328 .await?;
1329 Ok(())
1330 }
1331
1332 async fn fetch_metadata(&self, topics: Option<&[String]>) -> Result<MetadataResponse> {
1333 let (mut connection, version) = self
1334 .connect_with_version::<MetadataRequest>(METADATA_VERSION_CAP)
1335 .await?;
1336 let request = MetadataRequest::default()
1337 .with_topics(topics.map(|topics| {
1338 topics
1339 .iter()
1340 .cloned()
1341 .map(StrBytes::from_string)
1342 .map(|name| MetadataRequestTopic::default().with_name(Some(name.into())))
1343 .collect()
1344 }))
1345 .with_allow_auto_topic_creation(false)
1346 .with_include_cluster_authorized_operations(false)
1347 .with_include_topic_authorized_operations(false);
1348 Ok(connection
1349 .send_request::<MetadataRequest>(&self.config.client_id, version, &request)
1350 .await?)
1351 }
1352
1353 async fn send_request<Req>(&self, version_cap: i16, request: &Req) -> Result<Req::Response>
1354 where
1355 Req: Request,
1356 {
1357 let (mut connection, version) = self.connect_with_version::<Req>(version_cap).await?;
1358 Ok(connection
1359 .send_request::<Req>(&self.config.client_id, version, request)
1360 .await?)
1361 }
1362
1363 async fn connect_with_version<Req>(&self, version_cap: i16) -> Result<(BrokerConnection, i16)>
1364 where
1365 Req: Request,
1366 {
1367 let connection = connect_to_any_bootstrap(
1368 &self.config.bootstrap_servers,
1369 &self.config.client_id,
1370 self.config.request_timeout,
1371 self.config.security_protocol,
1372 &self.config.tls,
1373 &self.config.sasl,
1374 &self.config.tcp_connector,
1375 )
1376 .await?;
1377 let version = connection.version_with_cap::<Req>(version_cap)?;
1378 Ok((connection, version))
1379 }
1380}
1381
1382fn validate_topic_name(topic: String) -> Result<String> {
1383 let topic = topic.trim();
1384 if topic.is_empty() {
1385 return Err(AdminError::EmptyTopicName.into());
1386 }
1387
1388 Ok(topic.to_owned())
1389}
1390
1391fn validate_group_id(group_id: String) -> Result<String> {
1392 let group_id = group_id.trim();
1393 if group_id.is_empty() {
1394 return Err(anyhow!("consumer group id cannot be empty").into());
1395 }
1396
1397 Ok(group_id.to_owned())
1398}
1399
1400fn validate_feature_name(feature: String) -> Result<String> {
1401 let feature = feature.trim();
1402 if feature.is_empty() {
1403 return Err(anyhow!("feature names must be non-empty").into());
1404 }
1405
1406 Ok(feature.to_owned())
1407}
1408
1409fn assignment_partition_count(
1410 assignment: &kafka_protocol::messages::consumer_group_describe_response::Assignment,
1411) -> usize {
1412 assignment
1413 .topic_partitions
1414 .iter()
1415 .map(|topic| topic.partitions.len())
1416 .sum()
1417}
1418
1419fn is_ignorable_create_topic_error(error: ResponseError) -> bool {
1420 error == ResponseError::TopicAlreadyExists
1421}
1422
/// Returns `true` when `config_type` is zero or positive.
///
/// NOTE(review): presumably a negative value marks a config type the response
/// does not carry — confirm against the callers.
fn response_supported_config_type(config_type: i8) -> bool {
    !config_type.is_negative()
}
1426
#[cfg(test)]
mod tests {
    use super::*;

    // A valid `NewTopic` keeps its name, sizing and config entries when
    // converted into a request topic.
    #[test]
    fn new_topic_maps_to_create_topics_request() {
        let request_topic = NewTopic::new("orders", 3, 2)
            .with_config("cleanup.policy", "compact")
            .into_request_topic()
            .expect("topic should be valid");

        assert_eq!(request_topic.name.0.to_string(), "orders");
        assert_eq!(request_topic.num_partitions, 3);
        assert_eq!(request_topic.replication_factor, 2);

        // Checks both the number of configs and the name of the only entry.
        let config_names: Vec<String> = request_topic
            .configs
            .iter()
            .map(|config| config.name.to_string())
            .collect();
        assert_eq!(config_names, ["cleanup.policy"]);
    }

    // A partition count of zero is rejected with a descriptive message.
    #[test]
    fn new_topic_rejects_invalid_partition_count() {
        let message = NewTopic::new("orders", 0, 1)
            .into_request_topic()
            .expect_err("invalid topic should fail")
            .to_string();

        assert!(message.contains("topic partition count must be positive"));
    }

    // Blank names and a zero replication factor each map to their own
    // `AdminError` variant.
    #[test]
    fn new_topic_rejects_empty_names_and_invalid_replication_factor() {
        let blank_name = NewTopic::new(" ", 1, 1).into_request_topic();
        assert!(matches!(
            blank_name,
            Err(crate::Error::Admin(AdminError::EmptyTopicName))
        ));

        let zero_replication = NewTopic::new("orders", 1, 0).into_request_topic();
        assert!(matches!(
            zero_replication,
            Err(crate::Error::Admin(AdminError::InvalidReplicationFactor {
                replication_factor: 0
            }))
        ));
    }

    // `NewPartitions` forwards the target count and every assignment, and
    // rejects a zero count or a blank topic name.
    #[test]
    fn new_partitions_maps_assignments_and_rejects_invalid_input() {
        let grown = NewPartitions::increase_to(4)
            .with_assignment([1, 2])
            .with_assignment([2, 3])
            .into_request_topic(String::from("orders"))
            .unwrap();
        assert_eq!(grown.name.to_string(), "orders");
        assert_eq!(grown.count, 4);
        let assignments = grown.assignments.unwrap();
        assert_eq!(assignments.len(), 2);

        let zero_count = NewPartitions::increase_to(0).into_request_topic(String::from("orders"));
        assert!(matches!(
            zero_count,
            Err(crate::Error::Admin(AdminError::InvalidPartitionCount {
                partitions: 0
            }))
        ));

        let blank_name = NewPartitions::increase_to(1).into_request_topic(String::from(" "));
        assert!(matches!(
            blank_name,
            Err(crate::Error::Admin(AdminError::EmptyTopicName))
        ));
    }

    // Pins the numeric protocol values of resource types and alter-config
    // operations, plus the sign check on response config types.
    #[test]
    fn config_resource_and_operation_protocol_values_are_stable() {
        let resource_values = [
            (ConfigResourceType::Unknown, 0),
            (ConfigResourceType::Topic, 2),
            (ConfigResourceType::Broker, 4),
            (ConfigResourceType::BrokerLogger, 8),
            (ConfigResourceType::ClientMetrics, 16),
            (ConfigResourceType::Group, 32),
        ];
        for (resource_type, expected) in resource_values {
            assert_eq!(resource_type.as_protocol_value(), expected);
        }

        assert_eq!(
            ConfigResourceType::from_protocol_value(2),
            ConfigResourceType::Topic
        );
        // Values outside the known set fall back to `Unknown`.
        assert_eq!(
            ConfigResourceType::from_protocol_value(99),
            ConfigResourceType::Unknown
        );

        let set_op = AlterConfigOp::set("cleanup.policy", "compact");
        assert_eq!(set_op.op_type.as_protocol_value(), 0);
        let delete_op = AlterConfigOp::delete("retention.ms");
        assert_eq!(delete_op.op_type.as_protocol_value(), 1);
        assert_eq!(AlterConfigOpType::Append.as_protocol_value(), 2);
        assert_eq!(AlterConfigOpType::Subtract.as_protocol_value(), 3);

        assert!(response_supported_config_type(0));
        assert!(!response_supported_config_type(-1));
    }

    // `entry` finds a config by name and yields `None` for unknown names.
    #[test]
    fn config_resource_config_entry_lookup_returns_named_entry() {
        let cleanup_policy = ConfigEntry {
            name: "cleanup.policy".to_owned(),
            value: Some("compact".to_owned()),
            read_only: false,
            config_source: 1,
            is_sensitive: false,
            config_type: Some(2),
            documentation: None,
        };
        let resource_config = ConfigResourceConfig {
            resource: ConfigResource::topic("orders"),
            entries: BTreeMap::from([("cleanup.policy".to_owned(), cleanup_policy)]),
        };

        let found = resource_config.entry("cleanup.policy").unwrap();
        assert_eq!(found.value.as_deref(), Some("compact"));
        assert!(resource_config.entry("missing").is_none());
    }

    // At request version 2 an upgrade carries the upgrade type and leaves the
    // legacy downgrade flag unset.
    #[test]
    fn feature_update_maps_to_modern_update_features_request() {
        let request_update = FeatureUpdate::upgrade("share.version", 1)
            .into_request_update(2)
            .unwrap();

        assert_eq!(request_update.feature.to_string(), "share.version");
        assert_eq!(
            (
                request_update.max_version_level,
                request_update.upgrade_type,
                request_update.allow_downgrade,
            ),
            (1, 1, false)
        );
    }

    // At request version 0 a safe downgrade is expressed through the legacy
    // `allow_downgrade` flag.
    #[test]
    fn feature_update_maps_to_legacy_downgrade_flag() {
        let request_update = FeatureUpdate::safe_downgrade("metadata.version", 0)
            .into_request_update(0)
            .unwrap();

        assert_eq!(request_update.feature.to_string(), "metadata.version");
        assert_eq!(
            (
                request_update.max_version_level,
                request_update.upgrade_type,
                request_update.allow_downgrade,
            ),
            (0, 1, true)
        );
    }

    // A whitespace-only feature name is rejected.
    #[test]
    fn feature_update_rejects_empty_name() {
        let message = FeatureUpdate::upgrade(" ", 1)
            .into_request_update(2)
            .unwrap_err()
            .to_string();

        assert!(message.contains("feature names must be non-empty"));
    }

    // Only `TopicAlreadyExists` counts as ignorable.
    #[test]
    fn topic_already_exists_is_ignorable() {
        let already_exists = is_ignorable_create_topic_error(ResponseError::TopicAlreadyExists);
        let unknown_topic = is_ignorable_create_topic_error(ResponseError::UnknownTopicOrPartition);

        assert!(already_exists);
        assert!(!unknown_topic);
    }
}