1use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use bytes::{Buf, Bytes};
19use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
20use kafka_protocol::messages::alter_user_scram_credentials_request::ScramCredentialUpsertion;
21use kafka_protocol::messages::create_partitions_request::{
22 CreatePartitionsAssignment, CreatePartitionsTopic,
23};
24use kafka_protocol::messages::create_topics_request::{CreatableTopic, CreatableTopicConfig};
25use kafka_protocol::messages::describe_configs_request::DescribeConfigsResource;
26use kafka_protocol::messages::incremental_alter_configs_request::{
27 AlterConfigsResource, AlterableConfig,
28};
29use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
30use kafka_protocol::messages::update_features_request::FeatureUpdateKey;
31use kafka_protocol::messages::{
32 AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse,
33 ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerProtocolAssignment,
34 ConsumerProtocolSubscription, CreatePartitionsRequest, CreatePartitionsResponse,
35 CreateTopicsRequest, CreateTopicsResponse, DeleteGroupsRequest, DeleteGroupsResponse,
36 DeleteTopicsRequest, DeleteTopicsResponse, DescribeClusterRequest, DescribeClusterResponse,
37 DescribeConfigsRequest, DescribeConfigsResponse, DescribeGroupsRequest, DescribeGroupsResponse,
38 IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse, ListGroupsRequest,
39 ListGroupsResponse, MetadataRequest, MetadataResponse, ShareGroupDescribeRequest,
40 ShareGroupDescribeResponse, UpdateFeaturesRequest, UpdateFeaturesResponse,
41};
42use kafka_protocol::protocol::{Decodable, Request, StrBytes};
43use tracing::{debug, instrument};
44use uuid::Uuid;
45
46use crate::config::{AdminConfig, SaslMechanism};
47use crate::constants::{
48 ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP, CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
49 CREATE_PARTITIONS_VERSION_CAP, CREATE_TOPICS_VERSION_CAP, DELETE_GROUPS_VERSION_CAP,
50 DELETE_TOPICS_VERSION_CAP, DESCRIBE_CLUSTER_VERSION_CAP, DESCRIBE_CONFIGS_VERSION_CAP,
51 DESCRIBE_GROUPS_VERSION_CAP, INCREMENTAL_ALTER_CONFIGS_VERSION_CAP, LIST_GROUPS_VERSION_CAP,
52 METADATA_VERSION_CAP, SHARE_GROUP_DESCRIBE_VERSION_CAP, UPDATE_FEATURES_VERSION_CAP,
53};
54use crate::network::scram;
55use crate::network::{BrokerConnection, connect_to_any_bootstrap, duration_to_i32_ms};
56use crate::{AdminError, Result};
57
58#[derive(Debug, Clone)]
59pub struct NewTopic {
61 pub name: String,
63 pub num_partitions: i32,
65 pub replication_factor: i16,
67 pub configs: BTreeMap<String, String>,
69}
70
71impl NewTopic {
72 pub fn new(name: impl Into<String>, num_partitions: i32, replication_factor: i16) -> Self {
74 Self {
75 name: name.into(),
76 num_partitions,
77 replication_factor,
78 configs: BTreeMap::new(),
79 }
80 }
81
82 pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
84 self.configs.insert(key.into(), value.into());
85 self
86 }
87
88 fn into_request_topic(self) -> Result<CreatableTopic> {
89 let name = validate_topic_name(self.name)?;
90 if self.num_partitions <= 0 {
91 return Err(AdminError::InvalidPartitionCount {
92 partitions: self.num_partitions,
93 }
94 .into());
95 }
96 if self.replication_factor <= 0 {
97 return Err(AdminError::InvalidReplicationFactor {
98 replication_factor: self.replication_factor,
99 }
100 .into());
101 }
102
103 let configs = self
104 .configs
105 .into_iter()
106 .map(|(key, value)| {
107 CreatableTopicConfig::default()
108 .with_name(StrBytes::from_string(key))
109 .with_value(Some(StrBytes::from_string(value)))
110 })
111 .collect();
112
113 Ok(CreatableTopic::default()
114 .with_name(StrBytes::from_string(name).into())
115 .with_num_partitions(self.num_partitions)
116 .with_replication_factor(self.replication_factor)
117 .with_configs(configs))
118 }
119}
120
121#[derive(Debug, Clone, PartialEq, Eq)]
122pub struct NewPartitions {
124 pub total_count: i32,
126 pub assignments: Vec<Vec<i32>>,
128}
129
130impl NewPartitions {
131 pub fn increase_to(total_count: i32) -> Self {
133 Self {
134 total_count,
135 assignments: Vec::new(),
136 }
137 }
138
139 pub fn with_assignment<I>(mut self, broker_ids: I) -> Self
141 where
142 I: IntoIterator<Item = i32>,
143 {
144 self.assignments.push(broker_ids.into_iter().collect());
145 self
146 }
147
148 fn into_request_topic(self, topic_name: String) -> Result<CreatePartitionsTopic> {
149 let name = validate_topic_name(topic_name)?;
150 if self.total_count <= 0 {
151 return Err(AdminError::InvalidPartitionCount {
152 partitions: self.total_count,
153 }
154 .into());
155 }
156
157 let assignments = (!self.assignments.is_empty()).then(|| {
158 self.assignments
159 .into_iter()
160 .map(|broker_ids| {
161 CreatePartitionsAssignment::default()
162 .with_broker_ids(broker_ids.into_iter().map(Into::into).collect())
163 })
164 .collect()
165 });
166
167 Ok(CreatePartitionsTopic::default()
168 .with_name(StrBytes::from_string(name).into())
169 .with_count(self.total_count)
170 .with_assignments(assignments))
171 }
172}
173
174#[derive(Debug, Clone, PartialEq, Eq)]
175pub struct TopicListing {
177 pub name: String,
179 pub topic_id: Option<Uuid>,
181 pub is_internal: bool,
183}
184
185#[derive(Debug, Clone, PartialEq, Eq)]
186pub struct TopicPartitionDescription {
188 pub partition: i32,
190 pub leader_id: i32,
192 pub leader_epoch: i32,
194 pub replica_nodes: Vec<i32>,
196 pub isr_nodes: Vec<i32>,
198 pub offline_replicas: Vec<i32>,
200}
201
202#[derive(Debug, Clone, PartialEq, Eq)]
203pub struct TopicDescription {
205 pub name: String,
207 pub topic_id: Option<Uuid>,
209 pub is_internal: bool,
211 pub partitions: Vec<TopicPartitionDescription>,
213}
214
215#[derive(Debug, Clone, PartialEq, Eq)]
216pub struct BrokerDescription {
218 pub broker_id: i32,
220 pub host: String,
222 pub port: i32,
224 pub rack: Option<String>,
226 pub is_fenced: bool,
228}
229
230#[derive(Debug, Clone, PartialEq, Eq)]
231pub struct ClusterDescription {
233 pub cluster_id: String,
235 pub controller_id: i32,
237 pub brokers: Vec<BrokerDescription>,
239}
240
241#[derive(Debug, Clone, PartialEq, Eq)]
242pub struct BrokerFeatureLevel {
244 pub name: String,
246 pub level: i16,
248}
249
250#[derive(Debug, Clone, PartialEq, Eq)]
251pub struct FeatureUpdate {
253 pub name: String,
255 pub max_version_level: i16,
257 pub upgrade_type: FeatureUpgradeType,
259}
260
261impl FeatureUpdate {
262 pub fn upgrade(name: impl Into<String>, max_version_level: i16) -> Self {
264 Self {
265 name: name.into(),
266 max_version_level,
267 upgrade_type: FeatureUpgradeType::Upgrade,
268 }
269 }
270
271 pub fn safe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
273 Self {
274 name: name.into(),
275 max_version_level,
276 upgrade_type: FeatureUpgradeType::SafeDowngrade,
277 }
278 }
279
280 pub fn unsafe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
282 Self {
283 name: name.into(),
284 max_version_level,
285 upgrade_type: FeatureUpgradeType::UnsafeDowngrade,
286 }
287 }
288
289 fn into_request_update(self, version: i16) -> Result<FeatureUpdateKey> {
290 let name = validate_feature_name(self.name)?;
291 let allow_downgrade = self.upgrade_type.allows_downgrade();
292 let mut update = FeatureUpdateKey::default()
293 .with_feature(StrBytes::from_string(name))
294 .with_max_version_level(self.max_version_level);
295 if version == 0 {
296 update = update.with_allow_downgrade(allow_downgrade);
297 } else {
298 update = update.with_upgrade_type(self.upgrade_type.as_protocol_value());
299 }
300 Ok(update)
301 }
302}
303
304#[derive(Debug, Clone, Copy, PartialEq, Eq)]
305pub enum FeatureUpgradeType {
307 Upgrade,
309 SafeDowngrade,
311 UnsafeDowngrade,
313}
314
315impl FeatureUpgradeType {
316 fn as_protocol_value(self) -> i8 {
317 match self {
318 Self::Upgrade => 1,
319 Self::SafeDowngrade => 2,
320 Self::UnsafeDowngrade => 3,
321 }
322 }
323
324 fn allows_downgrade(self) -> bool {
325 !matches!(self, Self::Upgrade)
326 }
327}
328
329#[derive(Debug, Clone, PartialEq, Eq)]
330pub struct ConsumerGroupListing {
332 pub group_id: String,
334 pub protocol_type: String,
336 pub state: Option<String>,
338 pub group_type: Option<String>,
340}
341
342#[derive(Debug, Clone, PartialEq, Eq)]
343pub struct ConsumerGroupDescription {
345 pub group_id: String,
347 pub state: String,
349 pub protocol_type: String,
351 pub protocol_data: String,
353 pub members: Vec<ConsumerGroupMemberDescription>,
355 pub authorized_operations: Option<i32>,
357}
358
359#[derive(Debug, Clone, PartialEq, Eq)]
360pub struct ConsumerGroupMemberDescription {
362 pub member_id: String,
364 pub group_instance_id: Option<String>,
366 pub client_id: String,
368 pub client_host: String,
370 pub member_metadata_bytes: usize,
372 pub member_assignment_bytes: usize,
374}
375
376pub type GroupListing = ConsumerGroupListing;
378
379pub type GroupDescription = ConsumerGroupDescription;
381
382pub type GroupMemberDescription = ConsumerGroupMemberDescription;
384
385#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
386pub enum ConfigResourceType {
388 Unknown,
390 Topic,
392 Broker,
394 BrokerLogger,
396 ClientMetrics,
398 Group,
400}
401
402impl ConfigResourceType {
403 fn as_protocol_value(self) -> i8 {
404 match self {
405 Self::Unknown => 0,
406 Self::Topic => 2,
407 Self::Broker => 4,
408 Self::BrokerLogger => 8,
409 Self::ClientMetrics => 16,
410 Self::Group => 32,
411 }
412 }
413
414 fn from_protocol_value(value: i8) -> Self {
415 match value {
416 2 => Self::Topic,
417 4 => Self::Broker,
418 8 => Self::BrokerLogger,
419 16 => Self::ClientMetrics,
420 32 => Self::Group,
421 _ => Self::Unknown,
422 }
423 }
424}
425
426#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
427pub struct ConfigResource {
429 pub resource_type: ConfigResourceType,
431 pub resource_name: String,
433}
434
435impl ConfigResource {
436 pub fn new(resource_type: ConfigResourceType, resource_name: impl Into<String>) -> Self {
438 Self {
439 resource_type,
440 resource_name: resource_name.into(),
441 }
442 }
443
444 pub fn topic(resource_name: impl Into<String>) -> Self {
446 Self::new(ConfigResourceType::Topic, resource_name)
447 }
448
449 pub fn group(resource_name: impl Into<String>) -> Self {
451 Self::new(ConfigResourceType::Group, resource_name)
452 }
453}
454
455#[derive(Debug, Clone, PartialEq, Eq)]
456pub struct ConfigEntry {
458 pub name: String,
460 pub value: Option<String>,
462 pub read_only: bool,
464 pub config_source: i8,
466 pub is_sensitive: bool,
468 pub config_type: Option<i8>,
470 pub documentation: Option<String>,
472}
473
474#[derive(Debug, Clone, PartialEq, Eq)]
475pub struct ConfigResourceConfig {
477 pub resource: ConfigResource,
479 pub entries: BTreeMap<String, ConfigEntry>,
481}
482
483impl ConfigResourceConfig {
484 pub fn entry(&self, name: &str) -> Option<&ConfigEntry> {
486 self.entries.get(name)
487 }
488}
489
490#[derive(Debug, Clone, Copy, PartialEq, Eq)]
491pub enum AlterConfigOpType {
493 Set,
495 Delete,
497 Append,
499 Subtract,
501}
502
503impl AlterConfigOpType {
504 fn as_protocol_value(self) -> i8 {
505 match self {
506 Self::Set => 0,
507 Self::Delete => 1,
508 Self::Append => 2,
509 Self::Subtract => 3,
510 }
511 }
512}
513
514#[derive(Debug, Clone, PartialEq, Eq)]
515pub struct AlterConfigOp {
517 pub name: String,
519 pub op_type: AlterConfigOpType,
521 pub value: Option<String>,
523}
524
525impl AlterConfigOp {
526 pub fn set(name: impl Into<String>, value: impl Into<String>) -> Self {
528 Self {
529 name: name.into(),
530 op_type: AlterConfigOpType::Set,
531 value: Some(value.into()),
532 }
533 }
534
535 pub fn delete(name: impl Into<String>) -> Self {
537 Self {
538 name: name.into(),
539 op_type: AlterConfigOpType::Delete,
540 value: None,
541 }
542 }
543}
544
545#[derive(Debug, Clone)]
546pub struct KafkaAdmin {
548 config: AdminConfig,
549}
550
551impl KafkaAdmin {
552 #[instrument(
553 name = "admin.connect",
554 level = "debug",
555 skip(config),
556 fields(
557 bootstrap_server_count = config.bootstrap_servers.len(),
558 client_id = %config.client_id
559 )
560 )]
561 pub async fn connect(config: AdminConfig) -> Result<Self> {
563 let admin = Self { config };
564 admin.warm_up().await?;
565 debug!("admin client connected");
566 Ok(admin)
567 }
568
569 #[instrument(name = "admin.create_topics", level = "debug", skip(self, topics))]
570 pub async fn create_topics<I>(&self, topics: I) -> Result<()>
572 where
573 I: IntoIterator<Item = NewTopic>,
574 {
575 let topics = topics
576 .into_iter()
577 .map(NewTopic::into_request_topic)
578 .collect::<Result<Vec<_>>>()?;
579 if topics.is_empty() {
580 return Ok(());
581 }
582
583 let request = CreateTopicsRequest::default()
584 .with_topics(topics)
585 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
586 .with_validate_only(false);
587 let response: CreateTopicsResponse = self
588 .send_request::<CreateTopicsRequest>(CREATE_TOPICS_VERSION_CAP, &request)
589 .await?;
590
591 for topic in response.topics {
592 let name = topic.name.0.to_string();
593 if let Some(error) = topic
594 .error_code
595 .err()
596 .filter(|error| !is_ignorable_create_topic_error(*error))
597 {
598 return Err(anyhow!("create topic '{name}' failed: {error}").into());
599 }
600 }
601
602 Ok(())
603 }
604
605 #[instrument(name = "admin.delete_topics", level = "debug", skip(self, topics))]
606 pub async fn delete_topics<I, S>(&self, topics: I) -> Result<()>
608 where
609 I: IntoIterator<Item = S>,
610 S: Into<String>,
611 {
612 let topic_names = topics
613 .into_iter()
614 .map(|topic| validate_topic_name(topic.into()))
615 .collect::<Result<Vec<_>>>()?;
616 if topic_names.is_empty() {
617 return Ok(());
618 }
619
620 let request = DeleteTopicsRequest::default()
621 .with_topic_names(
622 topic_names
623 .iter()
624 .cloned()
625 .map(StrBytes::from_string)
626 .map(Into::into)
627 .collect(),
628 )
629 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?);
630 let response: DeleteTopicsResponse = self
631 .send_request::<DeleteTopicsRequest>(DELETE_TOPICS_VERSION_CAP, &request)
632 .await?;
633
634 for topic in response.responses {
635 let name = topic
636 .name
637 .as_ref()
638 .map(|name| name.0.to_string())
639 .unwrap_or_else(|| "<unknown>".to_owned());
640 if let Some(error) = topic.error_code.err() {
641 return Err(anyhow!("delete topic '{name}' failed: {error}").into());
642 }
643 }
644
645 Ok(())
646 }
647
648 #[instrument(
649 name = "admin.create_partitions",
650 level = "debug",
651 skip(self, partitions)
652 )]
653 pub async fn create_partitions<I, S>(&self, partitions: I) -> Result<()>
655 where
656 I: IntoIterator<Item = (S, NewPartitions)>,
657 S: Into<String>,
658 {
659 let topics = partitions
660 .into_iter()
661 .map(|(topic, new_partitions)| new_partitions.into_request_topic(topic.into()))
662 .collect::<Result<Vec<_>>>()?;
663 if topics.is_empty() {
664 return Ok(());
665 }
666
667 let request = CreatePartitionsRequest::default()
668 .with_topics(topics)
669 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
670 .with_validate_only(false);
671 let response: CreatePartitionsResponse = self
672 .send_request::<CreatePartitionsRequest>(CREATE_PARTITIONS_VERSION_CAP, &request)
673 .await?;
674
675 for topic in response.results {
676 let name = topic.name.0.to_string();
677 if let Some(error) = topic.error_code.err() {
678 return Err(anyhow!(
679 "create partitions for topic '{name}' failed: {}",
680 topic
681 .error_message
682 .as_ref()
683 .map(|message| message.to_string())
684 .filter(|message| !message.is_empty())
685 .unwrap_or_else(|| error.to_string())
686 )
687 .into());
688 }
689 }
690
691 Ok(())
692 }
693
694 #[instrument(name = "admin.list_topics", level = "debug", skip(self))]
695 pub async fn list_topics(&self) -> Result<Vec<TopicListing>> {
697 let response = self.fetch_metadata(None).await?;
698 let mut topics = Vec::new();
699
700 for topic in response.topics {
701 let Some(name) = topic.name.as_ref().map(|name| name.0.to_string()) else {
702 continue;
703 };
704 if let Some(error) = topic.error_code.err() {
705 return Err(anyhow!("list topics failed for '{name}': {error}").into());
706 }
707
708 topics.push(TopicListing {
709 name,
710 topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
711 is_internal: topic.is_internal,
712 });
713 }
714
715 topics.sort_by(|left, right| left.name.cmp(&right.name));
716 Ok(topics)
717 }
718
719 #[instrument(name = "admin.describe_topics", level = "debug", skip(self, topics))]
720 pub async fn describe_topics<I, S>(&self, topics: I) -> Result<Vec<TopicDescription>>
722 where
723 I: IntoIterator<Item = S>,
724 S: Into<String>,
725 {
726 let requested_topics = topics
727 .into_iter()
728 .map(|topic| validate_topic_name(topic.into()))
729 .collect::<Result<Vec<_>>>()?;
730 if requested_topics.is_empty() {
731 return Ok(Vec::new());
732 }
733
734 let response = self.fetch_metadata(Some(&requested_topics)).await?;
735 let mut descriptions = BTreeMap::new();
736
737 for topic in response.topics {
738 let name = topic
739 .name
740 .as_ref()
741 .map(|name| name.0.to_string())
742 .unwrap_or_default();
743 if let Some(error) = topic.error_code.err() {
744 let label = if name.is_empty() { "<unknown>" } else { &name };
745 return Err(anyhow!("describe topic '{label}' failed: {error}").into());
746 }
747
748 let mut partitions = topic
749 .partitions
750 .into_iter()
751 .map(|partition| {
752 if let Some(error) = partition.error_code.err() {
753 return Err(anyhow!(
754 "describe topic '{name}' partition {} failed: {error}",
755 partition.partition_index
756 ));
757 }
758
759 Ok(TopicPartitionDescription {
760 partition: partition.partition_index,
761 leader_id: partition.leader_id.0,
762 leader_epoch: partition.leader_epoch,
763 replica_nodes: partition.replica_nodes.into_iter().map(|id| id.0).collect(),
764 isr_nodes: partition.isr_nodes.into_iter().map(|id| id.0).collect(),
765 offline_replicas: partition
766 .offline_replicas
767 .into_iter()
768 .map(|id| id.0)
769 .collect(),
770 })
771 })
772 .collect::<std::result::Result<Vec<_>, _>>()?;
773 partitions.sort_by_key(|partition| partition.partition);
774
775 descriptions.insert(
776 name.clone(),
777 TopicDescription {
778 name,
779 topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
780 is_internal: topic.is_internal,
781 partitions,
782 },
783 );
784 }
785
786 requested_topics
787 .into_iter()
788 .map(|topic| {
789 descriptions.remove(&topic).ok_or_else(|| {
790 anyhow!("metadata response did not include topic '{topic}'").into()
791 })
792 })
793 .collect()
794 }
795
796 #[instrument(name = "admin.describe_cluster", level = "debug", skip(self))]
797 pub async fn describe_cluster(&self) -> Result<ClusterDescription> {
799 let (mut connection, version) = self
800 .connect_with_version::<DescribeClusterRequest>(DESCRIBE_CLUSTER_VERSION_CAP)
801 .await?;
802 let mut request =
803 DescribeClusterRequest::default().with_include_cluster_authorized_operations(false);
804 if version >= 1 {
805 request = request.with_endpoint_type(1);
806 }
807 if version >= 2 {
808 request = request.with_include_fenced_brokers(true);
809 }
810
811 let response: DescribeClusterResponse = connection
812 .send_request::<DescribeClusterRequest>(&self.config.client_id, version, &request)
813 .await?;
814 if let Some(error) = response.error_code.err() {
815 return Err(anyhow!("describe cluster failed: {error}").into());
816 }
817
818 let mut brokers = response
819 .brokers
820 .into_iter()
821 .map(|broker| BrokerDescription {
822 broker_id: broker.broker_id.0,
823 host: broker.host.to_string(),
824 port: broker.port,
825 rack: broker.rack.map(|rack| rack.to_string()),
826 is_fenced: broker.is_fenced,
827 })
828 .collect::<Vec<_>>();
829 brokers.sort_by_key(|broker| broker.broker_id);
830
831 Ok(ClusterDescription {
832 cluster_id: response.cluster_id.to_string(),
833 controller_id: response.controller_id.0,
834 brokers,
835 })
836 }
837
838 #[instrument(name = "admin.list_groups", level = "debug", skip(self))]
839 pub async fn list_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
841 let response: ListGroupsResponse = self
842 .send_request::<ListGroupsRequest>(
843 LIST_GROUPS_VERSION_CAP,
844 &ListGroupsRequest::default(),
845 )
846 .await?;
847 if let Some(error) = response.error_code.err() {
848 return Err(anyhow!("list groups failed: {error}").into());
849 }
850
851 let mut groups = response
852 .groups
853 .into_iter()
854 .map(|group| ConsumerGroupListing {
855 group_id: group.group_id.to_string(),
856 protocol_type: group.protocol_type.to_string(),
857 state: (!group.group_state.is_empty()).then(|| group.group_state.to_string()),
858 group_type: (!group.group_type.is_empty()).then(|| group.group_type.to_string()),
859 })
860 .collect::<Vec<_>>();
861 groups.sort_by(|left, right| left.group_id.cmp(&right.group_id));
862 Ok(groups)
863 }
864
865 #[instrument(name = "admin.list_consumer_groups", level = "debug", skip(self))]
866 pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
868 self.list_groups().await
869 }
870
871 #[instrument(name = "admin.describe_groups", level = "debug", skip(self, groups))]
872 pub async fn describe_groups<I, S>(&self, groups: I) -> Result<Vec<GroupDescription>>
874 where
875 I: IntoIterator<Item = S>,
876 S: Into<String>,
877 {
878 let group_ids = groups
879 .into_iter()
880 .map(|group| validate_group_id(group.into()))
881 .collect::<Result<Vec<_>>>()?;
882 if group_ids.is_empty() {
883 return Ok(Vec::new());
884 }
885
886 let listed_group_types = self
887 .list_groups()
888 .await?
889 .into_iter()
890 .map(|group| (group.group_id, group.group_type))
891 .collect::<BTreeMap<_, _>>();
892 let mut consumer_groups = Vec::new();
893 let mut share_groups = Vec::new();
894 let mut classic_groups = Vec::new();
895 for group_id in &group_ids {
896 match listed_group_types
897 .get(group_id)
898 .and_then(|group_type| group_type.as_deref())
899 {
900 Some(group_type) if group_type.eq_ignore_ascii_case("share") => {
901 share_groups.push(group_id.clone());
902 }
903 Some(group_type) if group_type.eq_ignore_ascii_case("classic") => {
904 classic_groups.push(group_id.clone());
905 }
906 _ => consumer_groups.push(group_id.clone()),
907 }
908 }
909
910 let mut descriptions = BTreeMap::new();
911 for group in self.describe_consumer_groups(consumer_groups).await? {
912 descriptions.insert(group.group_id.clone(), group);
913 }
914 for group in self.describe_share_groups(share_groups).await? {
915 descriptions.insert(group.group_id.clone(), group);
916 }
917 for group in self.describe_classic_groups(classic_groups).await? {
918 descriptions.insert(group.group_id.clone(), group);
919 }
920
921 group_ids
922 .into_iter()
923 .map(|group_id| {
924 descriptions.remove(&group_id).ok_or_else(|| {
925 anyhow!("describe groups response did not include group '{group_id}'").into()
926 })
927 })
928 .collect()
929 }
930
931 #[instrument(
932 name = "admin.describe_consumer_groups",
933 level = "debug",
934 skip(self, groups)
935 )]
936 pub async fn describe_consumer_groups<I, S>(
938 &self,
939 groups: I,
940 ) -> Result<Vec<ConsumerGroupDescription>>
941 where
942 I: IntoIterator<Item = S>,
943 S: Into<String>,
944 {
945 let group_ids = groups
946 .into_iter()
947 .map(|group| validate_group_id(group.into()))
948 .collect::<Result<Vec<_>>>()?;
949 if group_ids.is_empty() {
950 return Ok(Vec::new());
951 }
952
953 let request = ConsumerGroupDescribeRequest::default()
954 .with_group_ids(
955 group_ids
956 .iter()
957 .cloned()
958 .map(StrBytes::from_string)
959 .map(Into::into)
960 .collect(),
961 )
962 .with_include_authorized_operations(false);
963 let response: ConsumerGroupDescribeResponse = self
964 .send_request::<ConsumerGroupDescribeRequest>(
965 CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
966 &request,
967 )
968 .await?;
969
970 let mut descriptions = BTreeMap::new();
971 for group in response.groups {
972 let group_id = group.group_id.to_string();
973 if let Some(error) = group.error_code.err() {
974 let message = group
975 .error_message
976 .as_ref()
977 .map(ToString::to_string)
978 .filter(|message| !message.is_empty())
979 .unwrap_or_else(|| error.to_string());
980 return Err(
981 anyhow!("describe consumer group '{group_id}' failed: {message}").into(),
982 );
983 }
984
985 descriptions.insert(
986 group_id.clone(),
987 ConsumerGroupDescription {
988 group_id,
989 state: group.group_state.to_string(),
990 protocol_type: "consumer".to_owned(),
991 protocol_data: group.assignor_name.to_string(),
992 members: group
993 .members
994 .into_iter()
995 .map(|member| ConsumerGroupMemberDescription {
996 member_id: member.member_id.to_string(),
997 group_instance_id: member
998 .instance_id
999 .map(|instance_id| instance_id.to_string()),
1000 client_id: member.client_id.to_string(),
1001 client_host: member.client_host.to_string(),
1002 member_metadata_bytes: member.subscribed_topic_names.len(),
1003 member_assignment_bytes: assignment_partition_count(&member.assignment),
1004 })
1005 .collect(),
1006 authorized_operations: (group.authorized_operations != i32::MIN)
1007 .then_some(group.authorized_operations),
1008 },
1009 );
1010 }
1011
1012 group_ids
1013 .into_iter()
1014 .map(|group_id| {
1015 descriptions.remove(&group_id).ok_or_else(|| {
1016 anyhow!("describe groups response did not include group '{group_id}'").into()
1017 })
1018 })
1019 .collect()
1020 }
1021
1022 #[instrument(
1023 name = "admin.describe_classic_groups",
1024 level = "debug",
1025 skip(self, groups)
1026 )]
1027 pub async fn describe_classic_groups<I, S>(
1029 &self,
1030 groups: I,
1031 ) -> Result<Vec<ConsumerGroupDescription>>
1032 where
1033 I: IntoIterator<Item = S>,
1034 S: Into<String>,
1035 {
1036 let group_ids = groups
1037 .into_iter()
1038 .map(|group| validate_group_id(group.into()))
1039 .collect::<Result<Vec<_>>>()?;
1040 if group_ids.is_empty() {
1041 return Ok(Vec::new());
1042 }
1043
1044 let request = DescribeGroupsRequest::default()
1045 .with_groups(
1046 group_ids
1047 .iter()
1048 .cloned()
1049 .map(StrBytes::from_string)
1050 .map(Into::into)
1051 .collect(),
1052 )
1053 .with_include_authorized_operations(false);
1054 let response: DescribeGroupsResponse = self
1055 .send_request::<DescribeGroupsRequest>(DESCRIBE_GROUPS_VERSION_CAP, &request)
1056 .await?;
1057
1058 let mut descriptions = BTreeMap::new();
1059 for group in response.groups {
1060 let group_id = group.group_id.to_string();
1061 if let Some(error) = group.error_code.err() {
1062 let message = group
1063 .error_message
1064 .as_ref()
1065 .map(ToString::to_string)
1066 .filter(|message| !message.is_empty())
1067 .unwrap_or_else(|| error.to_string());
1068 return Err(
1069 anyhow!("describe classic group '{group_id}' failed: {message}").into(),
1070 );
1071 }
1072
1073 descriptions.insert(
1074 group_id.clone(),
1075 ConsumerGroupDescription {
1076 group_id,
1077 state: group.group_state.to_string(),
1078 protocol_type: group.protocol_type.to_string(),
1079 protocol_data: group.protocol_data.to_string(),
1080 members: group
1081 .members
1082 .into_iter()
1083 .map(|member| {
1084 let member_metadata_bytes =
1085 classic_subscription_topic_count(&member.member_metadata);
1086 let member_assignment_bytes =
1087 classic_assignment_partition_count(&member.member_assignment);
1088 ConsumerGroupMemberDescription {
1089 member_id: member.member_id.to_string(),
1090 group_instance_id: member
1091 .group_instance_id
1092 .map(|instance_id| instance_id.to_string()),
1093 client_id: member.client_id.to_string(),
1094 client_host: member.client_host.to_string(),
1095 member_metadata_bytes,
1096 member_assignment_bytes,
1097 }
1098 })
1099 .collect(),
1100 authorized_operations: (group.authorized_operations != i32::MIN)
1101 .then_some(group.authorized_operations),
1102 },
1103 );
1104 }
1105
1106 group_ids
1107 .into_iter()
1108 .map(|group_id| {
1109 descriptions.remove(&group_id).ok_or_else(|| {
1110 anyhow!("describe classic groups response did not include group '{group_id}'")
1111 .into()
1112 })
1113 })
1114 .collect()
1115 }
1116
1117 #[instrument(
1118 name = "admin.describe_share_groups",
1119 level = "debug",
1120 skip(self, groups)
1121 )]
1122 pub async fn describe_share_groups<I, S>(
1124 &self,
1125 groups: I,
1126 ) -> Result<Vec<ConsumerGroupDescription>>
1127 where
1128 I: IntoIterator<Item = S>,
1129 S: Into<String>,
1130 {
1131 let group_ids = groups
1132 .into_iter()
1133 .map(|group| validate_group_id(group.into()))
1134 .collect::<Result<Vec<_>>>()?;
1135 if group_ids.is_empty() {
1136 return Ok(Vec::new());
1137 }
1138
1139 let request = ShareGroupDescribeRequest::default()
1140 .with_group_ids(
1141 group_ids
1142 .iter()
1143 .cloned()
1144 .map(StrBytes::from_string)
1145 .map(Into::into)
1146 .collect(),
1147 )
1148 .with_include_authorized_operations(false);
1149 let response: ShareGroupDescribeResponse = self
1150 .send_request::<ShareGroupDescribeRequest>(SHARE_GROUP_DESCRIBE_VERSION_CAP, &request)
1151 .await?;
1152
1153 let mut descriptions = BTreeMap::new();
1154 for group in response.groups {
1155 let group_id = group.group_id.to_string();
1156 if let Some(error) = group.error_code.err() {
1157 let message = group
1158 .error_message
1159 .as_ref()
1160 .map(ToString::to_string)
1161 .filter(|message| !message.is_empty())
1162 .unwrap_or_else(|| error.to_string());
1163 return Err(anyhow!("describe share group '{group_id}' failed: {message}").into());
1164 }
1165
1166 descriptions.insert(
1167 group_id.clone(),
1168 ConsumerGroupDescription {
1169 group_id,
1170 state: group.group_state.to_string(),
1171 protocol_type: "share".to_owned(),
1172 protocol_data: group.assignor_name.to_string(),
1173 members: group
1174 .members
1175 .into_iter()
1176 .map(|member| ConsumerGroupMemberDescription {
1177 member_id: member.member_id.to_string(),
1178 group_instance_id: None,
1179 client_id: member.client_id.to_string(),
1180 client_host: member.client_host.to_string(),
1181 member_metadata_bytes: member.subscribed_topic_names.len(),
1182 member_assignment_bytes: share_assignment_partition_count(
1183 &member.assignment,
1184 ),
1185 })
1186 .collect(),
1187 authorized_operations: (group.authorized_operations != i32::MIN)
1188 .then_some(group.authorized_operations),
1189 },
1190 );
1191 }
1192
1193 group_ids
1194 .into_iter()
1195 .map(|group_id| {
1196 descriptions.remove(&group_id).ok_or_else(|| {
1197 anyhow!("describe share groups response did not include group '{group_id}'")
1198 .into()
1199 })
1200 })
1201 .collect()
1202 }
1203
1204 #[instrument(name = "admin.delete_groups", level = "debug", skip(self, groups))]
1205 pub async fn delete_groups<I, S>(&self, groups: I) -> Result<()>
1207 where
1208 I: IntoIterator<Item = S>,
1209 S: Into<String>,
1210 {
1211 let group_ids = groups
1212 .into_iter()
1213 .map(|group| validate_group_id(group.into()))
1214 .collect::<Result<Vec<_>>>()?;
1215 if group_ids.is_empty() {
1216 return Ok(());
1217 }
1218
1219 let request = DeleteGroupsRequest::default().with_groups_names(
1220 group_ids
1221 .iter()
1222 .cloned()
1223 .map(StrBytes::from_string)
1224 .map(Into::into)
1225 .collect(),
1226 );
1227 let response: DeleteGroupsResponse = self
1228 .send_request::<DeleteGroupsRequest>(DELETE_GROUPS_VERSION_CAP, &request)
1229 .await?;
1230
1231 for result in response.results {
1232 if let Some(error) = result.error_code.err() {
1233 return Err(anyhow!("delete group '{}' failed: {error}", &*result.group_id).into());
1234 }
1235 }
1236 Ok(())
1237 }
1238
1239 #[instrument(
1240 name = "admin.delete_consumer_groups",
1241 level = "debug",
1242 skip(self, groups)
1243 )]
1244 pub async fn delete_consumer_groups<I, S>(&self, groups: I) -> Result<()>
1246 where
1247 I: IntoIterator<Item = S>,
1248 S: Into<String>,
1249 {
1250 self.delete_groups(groups).await
1251 }
1252
1253 #[instrument(
1254 name = "admin.describe_configs",
1255 level = "debug",
1256 skip(self, resources)
1257 )]
1258 pub async fn describe_configs<I>(&self, resources: I) -> Result<Vec<ConfigResourceConfig>>
1260 where
1261 I: IntoIterator<Item = ConfigResource>,
1262 {
1263 let resources = resources.into_iter().collect::<Vec<_>>();
1264 if resources.is_empty() {
1265 return Ok(Vec::new());
1266 }
1267
1268 let request = DescribeConfigsRequest::default()
1269 .with_resources(
1270 resources
1271 .iter()
1272 .map(|resource| {
1273 DescribeConfigsResource::default()
1274 .with_resource_type(resource.resource_type.as_protocol_value())
1275 .with_resource_name(StrBytes::from_string(
1276 resource.resource_name.clone(),
1277 ))
1278 .with_configuration_keys(None)
1279 })
1280 .collect(),
1281 )
1282 .with_include_synonyms(false);
1283 let response: DescribeConfigsResponse = self
1284 .send_request::<DescribeConfigsRequest>(DESCRIBE_CONFIGS_VERSION_CAP, &request)
1285 .await?;
1286
1287 let mut described = BTreeMap::new();
1288 for resource in response.results {
1289 let resource_type = ConfigResourceType::from_protocol_value(resource.resource_type);
1290 let resource_name = resource.resource_name.to_string();
1291 if let Some(error) = resource.error_code.err() {
1292 return Err(anyhow!(
1293 "describe configs for {:?} '{}' failed: {}",
1294 resource_type,
1295 resource_name,
1296 resource
1297 .error_message
1298 .as_ref()
1299 .map(|message| message.to_string())
1300 .filter(|message| !message.is_empty())
1301 .unwrap_or_else(|| error.to_string())
1302 )
1303 .into());
1304 }
1305
1306 let entries = resource
1307 .configs
1308 .into_iter()
1309 .map(|entry| {
1310 let name = entry.name.to_string();
1311 let config_entry = ConfigEntry {
1312 name: name.clone(),
1313 value: entry.value.map(|value| value.to_string()),
1314 read_only: entry.read_only,
1315 config_source: entry.config_source,
1316 is_sensitive: entry.is_sensitive,
1317 config_type: (response_supported_config_type(entry.config_type))
1318 .then_some(entry.config_type),
1319 documentation: entry.documentation.map(|doc| doc.to_string()),
1320 };
1321 (name, config_entry)
1322 })
1323 .collect();
1324
1325 described.insert(
1326 (resource_type, resource_name.clone()),
1327 ConfigResourceConfig {
1328 resource: ConfigResource::new(resource_type, resource_name),
1329 entries,
1330 },
1331 );
1332 }
1333
1334 resources
1335 .into_iter()
1336 .map(|resource| {
1337 described
1338 .remove(&(resource.resource_type, resource.resource_name.clone()))
1339 .ok_or_else(|| {
1340 anyhow!("describe configs response did not include {:?}", resource).into()
1341 })
1342 })
1343 .collect()
1344 }
1345
1346 #[instrument(
1347 name = "admin.incremental_alter_configs",
1348 level = "debug",
1349 skip(self, resources)
1350 )]
1351 pub async fn incremental_alter_configs<I>(&self, resources: I) -> Result<()>
1353 where
1354 I: IntoIterator<Item = (ConfigResource, Vec<AlterConfigOp>)>,
1355 {
1356 let resources = resources
1357 .into_iter()
1358 .map(|(resource, ops)| {
1359 AlterConfigsResource::default()
1360 .with_resource_type(resource.resource_type.as_protocol_value())
1361 .with_resource_name(StrBytes::from_string(resource.resource_name))
1362 .with_configs(
1363 ops.into_iter()
1364 .map(|op| {
1365 AlterableConfig::default()
1366 .with_name(StrBytes::from_string(op.name))
1367 .with_config_operation(op.op_type.as_protocol_value())
1368 .with_value(op.value.map(StrBytes::from_string))
1369 })
1370 .collect(),
1371 )
1372 })
1373 .collect::<Vec<_>>();
1374 if resources.is_empty() {
1375 return Ok(());
1376 }
1377
1378 let request = IncrementalAlterConfigsRequest::default()
1379 .with_resources(resources)
1380 .with_validate_only(false);
1381 let response: IncrementalAlterConfigsResponse = self
1382 .send_request::<IncrementalAlterConfigsRequest>(
1383 INCREMENTAL_ALTER_CONFIGS_VERSION_CAP,
1384 &request,
1385 )
1386 .await?;
1387
1388 for resource in response.responses {
1389 if let Some(error) = resource.error_code.err() {
1390 return Err(anyhow!(
1391 "incremental alter configs for {:?} '{}' failed: {}",
1392 ConfigResourceType::from_protocol_value(resource.resource_type),
1393 resource.resource_name,
1394 resource
1395 .error_message
1396 .as_ref()
1397 .map(|message| message.to_string())
1398 .filter(|message| !message.is_empty())
1399 .unwrap_or_else(|| error.to_string())
1400 )
1401 .into());
1402 }
1403 }
1404
1405 Ok(())
1406 }
1407
1408 #[instrument(
1409 name = "admin.upsert_scram_credential",
1410 level = "debug",
1411 skip(self, user, password)
1412 )]
1413 pub async fn upsert_scram_credential(
1415 &self,
1416 user: impl Into<String>,
1417 mechanism: SaslMechanism,
1418 password: impl AsRef<[u8]>,
1419 ) -> Result<()> {
1420 self.upsert_scram_credential_with_iterations(
1421 user,
1422 mechanism,
1423 password,
1424 scram::MIN_ITERATIONS,
1425 )
1426 .await
1427 }
1428
1429 #[instrument(
1430 name = "admin.upsert_scram_credential_with_iterations",
1431 level = "debug",
1432 skip(self, user, password)
1433 )]
1434 pub async fn upsert_scram_credential_with_iterations(
1436 &self,
1437 user: impl Into<String>,
1438 mechanism: SaslMechanism,
1439 password: impl AsRef<[u8]>,
1440 iterations: i32,
1441 ) -> Result<()> {
1442 let user = user.into();
1443 let mechanism_type = mechanism
1444 .scram_type()
1445 .ok_or_else(|| anyhow!("SCRAM credential upsertion requires a SCRAM mechanism"))?;
1446 let salt = scram::secure_random_bytes()?;
1447 let salted_password =
1448 scram::salted_password(mechanism, password.as_ref(), &salt, iterations)?;
1449 let request = AlterUserScramCredentialsRequest::default().with_upsertions(vec![
1450 ScramCredentialUpsertion::default()
1451 .with_name(StrBytes::from_string(user.clone()))
1452 .with_mechanism(mechanism_type)
1453 .with_iterations(iterations)
1454 .with_salt(Bytes::from(salt))
1455 .with_salted_password(Bytes::from(salted_password)),
1456 ]);
1457 let response: AlterUserScramCredentialsResponse = self
1458 .send_request::<AlterUserScramCredentialsRequest>(
1459 ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP,
1460 &request,
1461 )
1462 .await?;
1463
1464 for result in response.results {
1465 if let Some(error) = result.error_code.err() {
1466 return Err(anyhow!(
1467 "alter SCRAM credential for user '{}' failed: {}",
1468 result.user,
1469 result
1470 .error_message
1471 .as_ref()
1472 .map(|message| message.to_string())
1473 .filter(|message| !message.is_empty())
1474 .unwrap_or_else(|| error.to_string())
1475 )
1476 .into());
1477 }
1478 }
1479
1480 Ok(())
1481 }
1482
1483 pub fn config(&self) -> &AdminConfig {
1485 &self.config
1486 }
1487
1488 pub async fn finalized_feature_levels(&self) -> Result<Vec<BrokerFeatureLevel>> {
1490 let connection = connect_to_any_bootstrap(
1491 &self.config.bootstrap_servers,
1492 &self.config.client_id,
1493 self.config.request_timeout,
1494 self.config.security_protocol,
1495 &self.config.tls,
1496 &self.config.sasl,
1497 &self.config.tcp_connector,
1498 )
1499 .await?;
1500 Ok(connection
1501 .finalized_feature_levels()
1502 .into_iter()
1503 .map(|(name, level)| BrokerFeatureLevel { name, level })
1504 .collect())
1505 }
1506
1507 pub async fn update_features<I>(&self, updates: I) -> Result<()>
1509 where
1510 I: IntoIterator<Item = FeatureUpdate>,
1511 {
1512 self.update_features_inner(updates, false).await
1513 }
1514
1515 pub async fn validate_feature_updates<I>(&self, updates: I) -> Result<()>
1517 where
1518 I: IntoIterator<Item = FeatureUpdate>,
1519 {
1520 self.update_features_inner(updates, true).await
1521 }
1522
1523 async fn update_features_inner<I>(&self, updates: I, validate_only: bool) -> Result<()>
1524 where
1525 I: IntoIterator<Item = FeatureUpdate>,
1526 {
1527 let (mut connection, version) = self
1528 .connect_with_version::<UpdateFeaturesRequest>(UPDATE_FEATURES_VERSION_CAP)
1529 .await?;
1530 let feature_updates = updates
1531 .into_iter()
1532 .map(|update| update.into_request_update(version))
1533 .collect::<Result<Vec<_>>>()?;
1534 if feature_updates.is_empty() {
1535 return Ok(());
1536 }
1537
1538 let mut request = UpdateFeaturesRequest::default()
1539 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
1540 .with_feature_updates(feature_updates);
1541 if version >= 1 {
1542 request = request.with_validate_only(validate_only);
1543 } else if validate_only {
1544 return Err(anyhow!("validate-only feature updates require UpdateFeatures v1+").into());
1545 }
1546
1547 let response: UpdateFeaturesResponse = connection
1548 .send_request::<UpdateFeaturesRequest>(&self.config.client_id, version, &request)
1549 .await?;
1550 if let Some(error) = response.error_code.err() {
1551 return Err(anyhow!(
1552 "update features failed: {}",
1553 response
1554 .error_message
1555 .as_ref()
1556 .map(|message| message.to_string())
1557 .filter(|message| !message.is_empty())
1558 .unwrap_or_else(|| error.to_string())
1559 )
1560 .into());
1561 }
1562
1563 for result in response.results {
1564 if let Some(error) = result.error_code.err() {
1565 let feature = result.feature.to_string();
1566 return Err(anyhow!(
1567 "update feature '{feature}' failed: {}",
1568 result
1569 .error_message
1570 .as_ref()
1571 .map(|message| message.to_string())
1572 .filter(|message| !message.is_empty())
1573 .unwrap_or_else(|| error.to_string())
1574 )
1575 .into());
1576 }
1577 }
1578
1579 Ok(())
1580 }
1581
1582 async fn warm_up(&self) -> Result<()> {
1583 let _ = connect_to_any_bootstrap(
1584 &self.config.bootstrap_servers,
1585 &self.config.client_id,
1586 self.config.request_timeout,
1587 self.config.security_protocol,
1588 &self.config.tls,
1589 &self.config.sasl,
1590 &self.config.tcp_connector,
1591 )
1592 .await?;
1593 Ok(())
1594 }
1595
1596 async fn fetch_metadata(&self, topics: Option<&[String]>) -> Result<MetadataResponse> {
1597 let (mut connection, version) = self
1598 .connect_with_version::<MetadataRequest>(METADATA_VERSION_CAP)
1599 .await?;
1600 let request = MetadataRequest::default()
1601 .with_topics(topics.map(|topics| {
1602 topics
1603 .iter()
1604 .cloned()
1605 .map(StrBytes::from_string)
1606 .map(|name| MetadataRequestTopic::default().with_name(Some(name.into())))
1607 .collect()
1608 }))
1609 .with_allow_auto_topic_creation(false)
1610 .with_include_cluster_authorized_operations(false)
1611 .with_include_topic_authorized_operations(false);
1612 Ok(connection
1613 .send_request::<MetadataRequest>(&self.config.client_id, version, &request)
1614 .await?)
1615 }
1616
1617 async fn send_request<Req>(&self, version_cap: i16, request: &Req) -> Result<Req::Response>
1618 where
1619 Req: Request,
1620 {
1621 let (mut connection, version) = self.connect_with_version::<Req>(version_cap).await?;
1622 Ok(connection
1623 .send_request::<Req>(&self.config.client_id, version, request)
1624 .await?)
1625 }
1626
1627 async fn connect_with_version<Req>(&self, version_cap: i16) -> Result<(BrokerConnection, i16)>
1628 where
1629 Req: Request,
1630 {
1631 let connection = connect_to_any_bootstrap(
1632 &self.config.bootstrap_servers,
1633 &self.config.client_id,
1634 self.config.request_timeout,
1635 self.config.security_protocol,
1636 &self.config.tls,
1637 &self.config.sasl,
1638 &self.config.tcp_connector,
1639 )
1640 .await?;
1641 let version = connection.version_with_cap::<Req>(version_cap)?;
1642 Ok((connection, version))
1643 }
1644}
1645
1646fn validate_topic_name(topic: String) -> Result<String> {
1647 let topic = topic.trim();
1648 if topic.is_empty() {
1649 return Err(AdminError::EmptyTopicName.into());
1650 }
1651
1652 Ok(topic.to_owned())
1653}
1654
1655fn validate_group_id(group_id: String) -> Result<String> {
1656 let group_id = group_id.trim();
1657 if group_id.is_empty() {
1658 return Err(anyhow!("consumer group id cannot be empty").into());
1659 }
1660
1661 Ok(group_id.to_owned())
1662}
1663
1664fn validate_feature_name(feature: String) -> Result<String> {
1665 let feature = feature.trim();
1666 if feature.is_empty() {
1667 return Err(anyhow!("feature names must be non-empty").into());
1668 }
1669
1670 Ok(feature.to_owned())
1671}
1672
1673fn assignment_partition_count(
1674 assignment: &kafka_protocol::messages::consumer_group_describe_response::Assignment,
1675) -> usize {
1676 assignment
1677 .topic_partitions
1678 .iter()
1679 .map(|topic| topic.partitions.len())
1680 .sum()
1681}
1682
1683fn share_assignment_partition_count(
1684 assignment: &kafka_protocol::messages::share_group_describe_response::Assignment,
1685) -> usize {
1686 assignment
1687 .topic_partitions
1688 .iter()
1689 .map(|topic| topic.partitions.len())
1690 .sum()
1691}
1692
1693fn classic_subscription_topic_count(metadata: &Bytes) -> usize {
1694 let Some((version, mut body)) = classic_protocol_body(metadata) else {
1695 return 0;
1696 };
1697 ConsumerProtocolSubscription::decode(&mut body, version)
1698 .map(|subscription| subscription.topics.len())
1699 .unwrap_or(0)
1700}
1701
1702fn classic_assignment_partition_count(assignment: &Bytes) -> usize {
1703 let Some((version, mut body)) = classic_protocol_body(assignment) else {
1704 return 0;
1705 };
1706 ConsumerProtocolAssignment::decode(&mut body, version)
1707 .map(|assignment| {
1708 assignment
1709 .assigned_partitions
1710 .iter()
1711 .map(|topic| topic.partitions.len())
1712 .sum()
1713 })
1714 .unwrap_or(0)
1715}
1716
1717fn classic_protocol_body(bytes: &Bytes) -> Option<(i16, Bytes)> {
1718 let mut body = bytes.clone();
1719 if body.remaining() < 2 {
1720 return None;
1721 }
1722 let version = body.get_i16();
1723 Some((version, body))
1724}
1725
1726fn is_ignorable_create_topic_error(error: ResponseError) -> bool {
1727 error == ResponseError::TopicAlreadyExists
1728}
1729
1730fn response_supported_config_type(config_type: i8) -> bool {
1731 config_type >= 0
1732}
1733
1734#[cfg(test)]
1735mod tests {
1736 use super::*;
1737
1738 #[test]
1739 fn new_topic_maps_to_create_topics_request() {
1740 let topic = NewTopic::new("orders", 3, 2)
1741 .with_config("cleanup.policy", "compact")
1742 .into_request_topic()
1743 .expect("topic should be valid");
1744
1745 assert_eq!(topic.name.0.to_string(), "orders");
1746 assert_eq!(topic.num_partitions, 3);
1747 assert_eq!(topic.replication_factor, 2);
1748 assert_eq!(topic.configs.len(), 1);
1749 assert_eq!(topic.configs[0].name.to_string(), "cleanup.policy");
1750 }
1751
1752 #[test]
1753 fn new_topic_rejects_invalid_partition_count() {
1754 let error = NewTopic::new("orders", 0, 1)
1755 .into_request_topic()
1756 .expect_err("invalid topic should fail");
1757 assert!(
1758 error
1759 .to_string()
1760 .contains("topic partition count must be positive")
1761 );
1762 }
1763
1764 #[test]
1765 fn new_topic_rejects_empty_names_and_invalid_replication_factor() {
1766 let error = NewTopic::new(" ", 1, 1).into_request_topic().unwrap_err();
1767 assert!(matches!(
1768 error,
1769 crate::Error::Admin(AdminError::EmptyTopicName)
1770 ));
1771
1772 let error = NewTopic::new("orders", 1, 0)
1773 .into_request_topic()
1774 .unwrap_err();
1775 assert!(matches!(
1776 error,
1777 crate::Error::Admin(AdminError::InvalidReplicationFactor {
1778 replication_factor: 0
1779 })
1780 ));
1781 }
1782
1783 #[test]
1784 fn new_partitions_maps_assignments_and_rejects_invalid_input() {
1785 let topic = NewPartitions::increase_to(4)
1786 .with_assignment([1, 2])
1787 .with_assignment([2, 3])
1788 .into_request_topic("orders".to_owned())
1789 .unwrap();
1790 assert_eq!(topic.name.to_string(), "orders");
1791 assert_eq!(topic.count, 4);
1792 assert_eq!(topic.assignments.unwrap().len(), 2);
1793
1794 let error = NewPartitions::increase_to(0)
1795 .into_request_topic("orders".to_owned())
1796 .unwrap_err();
1797 assert!(matches!(
1798 error,
1799 crate::Error::Admin(AdminError::InvalidPartitionCount { partitions: 0 })
1800 ));
1801
1802 let error = NewPartitions::increase_to(1)
1803 .into_request_topic(" ".to_owned())
1804 .unwrap_err();
1805 assert!(matches!(
1806 error,
1807 crate::Error::Admin(AdminError::EmptyTopicName)
1808 ));
1809 }
1810
1811 #[test]
1812 fn config_resource_and_operation_protocol_values_are_stable() {
1813 assert_eq!(ConfigResourceType::Unknown.as_protocol_value(), 0);
1814 assert_eq!(ConfigResourceType::Topic.as_protocol_value(), 2);
1815 assert_eq!(ConfigResourceType::Broker.as_protocol_value(), 4);
1816 assert_eq!(ConfigResourceType::BrokerLogger.as_protocol_value(), 8);
1817 assert_eq!(ConfigResourceType::ClientMetrics.as_protocol_value(), 16);
1818 assert_eq!(ConfigResourceType::Group.as_protocol_value(), 32);
1819 assert_eq!(
1820 ConfigResourceType::from_protocol_value(2),
1821 ConfigResourceType::Topic
1822 );
1823 assert_eq!(
1824 ConfigResourceType::from_protocol_value(99),
1825 ConfigResourceType::Unknown
1826 );
1827
1828 assert_eq!(
1829 AlterConfigOp::set("cleanup.policy", "compact")
1830 .op_type
1831 .as_protocol_value(),
1832 0
1833 );
1834 assert_eq!(
1835 AlterConfigOp::delete("retention.ms")
1836 .op_type
1837 .as_protocol_value(),
1838 1
1839 );
1840 assert_eq!(AlterConfigOpType::Append.as_protocol_value(), 2);
1841 assert_eq!(AlterConfigOpType::Subtract.as_protocol_value(), 3);
1842 assert!(response_supported_config_type(0));
1843 assert!(!response_supported_config_type(-1));
1844 }
1845
1846 #[test]
1847 fn config_resource_config_entry_lookup_returns_named_entry() {
1848 let mut entries = BTreeMap::new();
1849 entries.insert(
1850 "cleanup.policy".to_owned(),
1851 ConfigEntry {
1852 name: "cleanup.policy".to_owned(),
1853 value: Some("compact".to_owned()),
1854 read_only: false,
1855 config_source: 1,
1856 is_sensitive: false,
1857 config_type: Some(2),
1858 documentation: None,
1859 },
1860 );
1861 let config = ConfigResourceConfig {
1862 resource: ConfigResource::topic("orders"),
1863 entries,
1864 };
1865
1866 assert_eq!(
1867 config.entry("cleanup.policy").unwrap().value.as_deref(),
1868 Some("compact")
1869 );
1870 assert!(config.entry("missing").is_none());
1871 }
1872
1873 #[test]
1874 fn feature_update_maps_to_modern_update_features_request() {
1875 let update = FeatureUpdate::upgrade("share.version", 1)
1876 .into_request_update(2)
1877 .unwrap();
1878 assert_eq!(update.feature.to_string(), "share.version");
1879 assert_eq!(update.max_version_level, 1);
1880 assert_eq!(update.upgrade_type, 1);
1881 assert!(!update.allow_downgrade);
1882 }
1883
1884 #[test]
1885 fn feature_update_maps_to_legacy_downgrade_flag() {
1886 let update = FeatureUpdate::safe_downgrade("metadata.version", 0)
1887 .into_request_update(0)
1888 .unwrap();
1889 assert_eq!(update.feature.to_string(), "metadata.version");
1890 assert_eq!(update.max_version_level, 0);
1891 assert_eq!(update.upgrade_type, 1);
1892 assert!(update.allow_downgrade);
1893 }
1894
1895 #[test]
1896 fn feature_update_rejects_empty_name() {
1897 let error = FeatureUpdate::upgrade(" ", 1)
1898 .into_request_update(2)
1899 .unwrap_err();
1900 assert!(
1901 error
1902 .to_string()
1903 .contains("feature names must be non-empty")
1904 );
1905 }
1906
1907 #[test]
1908 fn topic_already_exists_is_ignorable() {
1909 assert!(is_ignorable_create_topic_error(
1910 ResponseError::TopicAlreadyExists
1911 ));
1912 assert!(!is_ignorable_create_topic_error(
1913 ResponseError::UnknownTopicOrPartition
1914 ));
1915 }
1916}