1use std::collections::BTreeMap;
16
17use anyhow::anyhow;
18use bytes::{Buf, Bytes};
19use kafka_protocol::error::{ParseResponseErrorCode, ResponseError};
20use kafka_protocol::messages::alter_user_scram_credentials_request::ScramCredentialUpsertion;
21use kafka_protocol::messages::create_acls_request::AclCreation;
22use kafka_protocol::messages::create_partitions_request::{
23 CreatePartitionsAssignment, CreatePartitionsTopic,
24};
25use kafka_protocol::messages::create_topics_request::{CreatableTopic, CreatableTopicConfig};
26use kafka_protocol::messages::delete_acls_request::DeleteAclsFilter;
27use kafka_protocol::messages::describe_configs_request::DescribeConfigsResource;
28use kafka_protocol::messages::describe_log_dirs_request::DescribableLogDirTopic;
29use kafka_protocol::messages::incremental_alter_configs_request::{
30 AlterConfigsResource, AlterableConfig,
31};
32use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
33use kafka_protocol::messages::update_features_request::FeatureUpdateKey;
34use kafka_protocol::messages::{
35 AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse,
36 ConsumerGroupDescribeRequest, ConsumerGroupDescribeResponse, ConsumerProtocolAssignment,
37 ConsumerProtocolSubscription, CreateAclsRequest, CreateAclsResponse, CreatePartitionsRequest,
38 CreatePartitionsResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteAclsRequest,
39 DeleteAclsResponse, DeleteGroupsRequest, DeleteGroupsResponse, DeleteTopicsRequest,
40 DeleteTopicsResponse, DescribeAclsRequest, DescribeAclsResponse, DescribeClusterRequest,
41 DescribeClusterResponse, DescribeConfigsRequest, DescribeConfigsResponse,
42 DescribeGroupsRequest, DescribeGroupsResponse, DescribeLogDirsRequest, DescribeLogDirsResponse,
43 IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse, ListGroupsRequest,
44 ListGroupsResponse, MetadataRequest, MetadataResponse, ShareGroupDescribeRequest,
45 ShareGroupDescribeResponse, UpdateFeaturesRequest, UpdateFeaturesResponse,
46};
47use kafka_protocol::protocol::{Decodable, Request, StrBytes};
48use tracing::{debug, instrument};
49use uuid::Uuid;
50
51use crate::config::{AdminConfig, SaslMechanism};
52use crate::constants::{
53 ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP, CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
54 CREATE_ACLS_VERSION_CAP, CREATE_PARTITIONS_VERSION_CAP, CREATE_TOPICS_VERSION_CAP,
55 DELETE_ACLS_VERSION_CAP, DELETE_GROUPS_VERSION_CAP, DELETE_TOPICS_VERSION_CAP,
56 DESCRIBE_ACLS_VERSION_CAP, DESCRIBE_CLUSTER_VERSION_CAP, DESCRIBE_CONFIGS_VERSION_CAP,
57 DESCRIBE_GROUPS_VERSION_CAP, DESCRIBE_LOG_DIRS_VERSION_CAP,
58 INCREMENTAL_ALTER_CONFIGS_VERSION_CAP, LIST_GROUPS_VERSION_CAP, METADATA_VERSION_CAP,
59 SHARE_GROUP_DESCRIBE_VERSION_CAP, UPDATE_FEATURES_VERSION_CAP,
60};
61use crate::network::scram;
62use crate::network::{BrokerConnection, connect_to_any_bootstrap, duration_to_i32_ms};
63use crate::types::TopicPartition;
64use crate::{AdminError, BrokerError, Result, ValidationError};
65
66#[derive(Debug, Clone)]
67pub struct NewTopic {
87 pub name: String,
89 pub num_partitions: i32,
91 pub replication_factor: i16,
93 pub configs: BTreeMap<String, String>,
95}
96
97impl NewTopic {
98 pub fn new(name: impl Into<String>, num_partitions: i32, replication_factor: i16) -> Self {
100 Self {
101 name: name.into(),
102 num_partitions,
103 replication_factor,
104 configs: BTreeMap::new(),
105 }
106 }
107
108 pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
110 self.configs.insert(key.into(), value.into());
111 self
112 }
113
114 fn into_request_topic(self) -> Result<CreatableTopic> {
115 let name = validate_topic_name(self.name)?;
116 if self.num_partitions <= 0 {
117 return Err(AdminError::InvalidPartitionCount {
118 partitions: self.num_partitions,
119 }
120 .into());
121 }
122 if self.replication_factor <= 0 {
123 return Err(AdminError::InvalidReplicationFactor {
124 replication_factor: self.replication_factor,
125 }
126 .into());
127 }
128
129 let configs = self
130 .configs
131 .into_iter()
132 .map(|(key, value)| {
133 CreatableTopicConfig::default()
134 .with_name(StrBytes::from_string(key))
135 .with_value(Some(StrBytes::from_string(value)))
136 })
137 .collect();
138
139 Ok(CreatableTopic::default()
140 .with_name(StrBytes::from_string(name).into())
141 .with_num_partitions(self.num_partitions)
142 .with_replication_factor(self.replication_factor)
143 .with_configs(configs))
144 }
145}
146
147#[derive(Debug, Clone, PartialEq, Eq)]
148pub struct NewPartitions {
164 pub total_count: i32,
166 pub assignments: Vec<Vec<i32>>,
168}
169
170impl NewPartitions {
171 pub fn increase_to(total_count: i32) -> Self {
173 Self {
174 total_count,
175 assignments: Vec::new(),
176 }
177 }
178
179 pub fn with_assignment<I>(mut self, broker_ids: I) -> Self
184 where
185 I: IntoIterator<Item = i32>,
186 {
187 self.assignments.push(broker_ids.into_iter().collect());
188 self
189 }
190
191 fn into_request_topic(self, topic_name: String) -> Result<CreatePartitionsTopic> {
192 let name = validate_topic_name(topic_name)?;
193 if self.total_count <= 0 {
194 return Err(AdminError::InvalidPartitionCount {
195 partitions: self.total_count,
196 }
197 .into());
198 }
199
200 let assignments = (!self.assignments.is_empty()).then(|| {
201 self.assignments
202 .into_iter()
203 .map(|broker_ids| {
204 CreatePartitionsAssignment::default()
205 .with_broker_ids(broker_ids.into_iter().map(Into::into).collect())
206 })
207 .collect()
208 });
209
210 Ok(CreatePartitionsTopic::default()
211 .with_name(StrBytes::from_string(name).into())
212 .with_count(self.total_count)
213 .with_assignments(assignments))
214 }
215}
216
217#[derive(Debug, Clone, Copy, PartialEq, Eq)]
218pub enum ResourceType {
220 Unknown,
221 Any,
222 Topic,
223 Group,
224 Cluster,
225 TransactionalId,
226 DelegationToken,
227 User,
228}
229
230impl ResourceType {
231 fn as_protocol_value(self) -> i8 {
232 match self {
233 Self::Unknown => 0,
234 Self::Any => 1,
235 Self::Topic => 2,
236 Self::Group => 3,
237 Self::Cluster => 4,
238 Self::TransactionalId => 5,
239 Self::DelegationToken => 6,
240 Self::User => 7,
241 }
242 }
243
244 fn from_protocol_value(value: i8) -> Self {
245 match value {
246 1 => Self::Any,
247 2 => Self::Topic,
248 3 => Self::Group,
249 4 => Self::Cluster,
250 5 => Self::TransactionalId,
251 6 => Self::DelegationToken,
252 7 => Self::User,
253 _ => Self::Unknown,
254 }
255 }
256}
257
258#[derive(Debug, Clone, Copy, PartialEq, Eq)]
259pub enum PatternType {
261 Unknown,
262 Any,
263 Match,
264 Literal,
265 Prefixed,
266}
267
268impl PatternType {
269 fn as_protocol_value(self) -> i8 {
270 match self {
271 Self::Unknown => 0,
272 Self::Any => 1,
273 Self::Match => 2,
274 Self::Literal => 3,
275 Self::Prefixed => 4,
276 }
277 }
278
279 fn from_protocol_value(value: i8) -> Self {
280 match value {
281 1 => Self::Any,
282 2 => Self::Match,
283 3 => Self::Literal,
284 4 => Self::Prefixed,
285 _ => Self::Unknown,
286 }
287 }
288}
289
290#[derive(Debug, Clone, Copy, PartialEq, Eq)]
291pub enum AclOperation {
293 Unknown,
294 Any,
295 All,
296 Read,
297 Write,
298 Create,
299 Delete,
300 Alter,
301 Describe,
302 ClusterAction,
303 DescribeConfigs,
304 AlterConfigs,
305 IdempotentWrite,
306 CreateTokens,
307 DescribeTokens,
308 TwoPhaseCommit,
309}
310
311impl AclOperation {
312 fn as_protocol_value(self) -> i8 {
313 match self {
314 Self::Unknown => 0,
315 Self::Any => 1,
316 Self::All => 2,
317 Self::Read => 3,
318 Self::Write => 4,
319 Self::Create => 5,
320 Self::Delete => 6,
321 Self::Alter => 7,
322 Self::Describe => 8,
323 Self::ClusterAction => 9,
324 Self::DescribeConfigs => 10,
325 Self::AlterConfigs => 11,
326 Self::IdempotentWrite => 12,
327 Self::CreateTokens => 13,
328 Self::DescribeTokens => 14,
329 Self::TwoPhaseCommit => 15,
330 }
331 }
332
333 fn from_protocol_value(value: i8) -> Self {
334 match value {
335 1 => Self::Any,
336 2 => Self::All,
337 3 => Self::Read,
338 4 => Self::Write,
339 5 => Self::Create,
340 6 => Self::Delete,
341 7 => Self::Alter,
342 8 => Self::Describe,
343 9 => Self::ClusterAction,
344 10 => Self::DescribeConfigs,
345 11 => Self::AlterConfigs,
346 12 => Self::IdempotentWrite,
347 13 => Self::CreateTokens,
348 14 => Self::DescribeTokens,
349 15 => Self::TwoPhaseCommit,
350 _ => Self::Unknown,
351 }
352 }
353}
354
355#[derive(Debug, Clone, Copy, PartialEq, Eq)]
356pub enum AclPermissionType {
358 Unknown,
359 Any,
360 Deny,
361 Allow,
362}
363
364impl AclPermissionType {
365 fn as_protocol_value(self) -> i8 {
366 match self {
367 Self::Unknown => 0,
368 Self::Any => 1,
369 Self::Deny => 2,
370 Self::Allow => 3,
371 }
372 }
373
374 fn from_protocol_value(value: i8) -> Self {
375 match value {
376 1 => Self::Any,
377 2 => Self::Deny,
378 3 => Self::Allow,
379 _ => Self::Unknown,
380 }
381 }
382}
383
384#[derive(Debug, Clone, PartialEq, Eq)]
385pub struct ResourcePattern {
387 pub resource_type: ResourceType,
388 pub name: String,
389 pub pattern_type: PatternType,
390}
391
392impl ResourcePattern {
393 pub const WILDCARD_RESOURCE: &'static str = "*";
394
395 pub fn new(
396 resource_type: ResourceType,
397 name: impl Into<String>,
398 pattern_type: PatternType,
399 ) -> Self {
400 Self {
401 resource_type,
402 name: name.into(),
403 pattern_type,
404 }
405 }
406
407 pub fn to_filter(&self) -> ResourcePatternFilter {
408 ResourcePatternFilter::new(
409 self.resource_type,
410 Some(self.name.clone()),
411 self.pattern_type,
412 )
413 }
414
415 fn validate(&self) -> Result<()> {
416 if matches!(self.resource_type, ResourceType::Any) {
417 return Err(ValidationError::InvalidAclResourceType {
418 resource_type: "ANY".to_owned(),
419 }
420 .into());
421 }
422 if matches!(self.pattern_type, PatternType::Any | PatternType::Match) {
423 return Err(ValidationError::InvalidAclPatternType {
424 pattern_type: format!("{:?}", self.pattern_type),
425 }
426 .into());
427 }
428 if self.name.trim().is_empty() {
429 return Err(ValidationError::EmptyResourceName { resource: "ACL" }.into());
430 }
431 Ok(())
432 }
433}
434
435#[derive(Debug, Clone, PartialEq, Eq)]
436pub struct ResourcePatternFilter {
438 pub resource_type: ResourceType,
439 pub name: Option<String>,
440 pub pattern_type: PatternType,
441}
442
443impl ResourcePatternFilter {
444 pub fn any() -> Self {
445 Self::new(ResourceType::Any, None, PatternType::Any)
446 }
447
448 pub fn new(
449 resource_type: ResourceType,
450 name: Option<String>,
451 pattern_type: PatternType,
452 ) -> Self {
453 Self {
454 resource_type,
455 name,
456 pattern_type,
457 }
458 }
459}
460
461#[derive(Debug, Clone, PartialEq, Eq)]
462pub struct AccessControlEntry {
464 pub principal: String,
465 pub host: String,
466 pub operation: AclOperation,
467 pub permission_type: AclPermissionType,
468}
469
470impl AccessControlEntry {
471 pub fn new(
472 principal: impl Into<String>,
473 host: impl Into<String>,
474 operation: AclOperation,
475 permission_type: AclPermissionType,
476 ) -> Self {
477 Self {
478 principal: principal.into(),
479 host: host.into(),
480 operation,
481 permission_type,
482 }
483 }
484
485 pub fn to_filter(&self) -> AccessControlEntryFilter {
486 AccessControlEntryFilter::new(
487 Some(self.principal.clone()),
488 Some(self.host.clone()),
489 self.operation,
490 self.permission_type,
491 )
492 }
493
494 fn validate(&self) -> Result<()> {
495 if self.principal.trim().is_empty() {
496 return Err(ValidationError::EmptyAclPrincipal.into());
497 }
498 if self.host.trim().is_empty() {
499 return Err(ValidationError::EmptyAclHost.into());
500 }
501 if matches!(self.operation, AclOperation::Any) {
502 return Err(ValidationError::InvalidAclOperation {
503 operation: "ANY".to_owned(),
504 }
505 .into());
506 }
507 if matches!(self.permission_type, AclPermissionType::Any) {
508 return Err(ValidationError::InvalidAclPermissionType {
509 permission_type: "ANY".to_owned(),
510 }
511 .into());
512 }
513 Ok(())
514 }
515}
516
517#[derive(Debug, Clone, PartialEq, Eq)]
518pub struct AccessControlEntryFilter {
520 pub principal: Option<String>,
521 pub host: Option<String>,
522 pub operation: AclOperation,
523 pub permission_type: AclPermissionType,
524}
525
526impl AccessControlEntryFilter {
527 pub fn any() -> Self {
528 Self::new(None, None, AclOperation::Any, AclPermissionType::Any)
529 }
530
531 pub fn new(
532 principal: Option<String>,
533 host: Option<String>,
534 operation: AclOperation,
535 permission_type: AclPermissionType,
536 ) -> Self {
537 Self {
538 principal,
539 host,
540 operation,
541 permission_type,
542 }
543 }
544}
545
546#[derive(Debug, Clone, PartialEq, Eq)]
547pub struct AclBinding {
549 pub pattern: ResourcePattern,
550 pub entry: AccessControlEntry,
551}
552
553impl AclBinding {
554 pub fn new(pattern: ResourcePattern, entry: AccessControlEntry) -> Self {
555 Self { pattern, entry }
556 }
557
558 pub fn to_filter(&self) -> AclBindingFilter {
559 AclBindingFilter::new(self.pattern.to_filter(), self.entry.to_filter())
560 }
561
562 fn validate(&self) -> Result<()> {
563 self.pattern.validate()?;
564 self.entry.validate()
565 }
566
567 fn into_creation(self) -> Result<AclCreation> {
568 self.validate()?;
569 Ok(AclCreation::default()
570 .with_resource_type(self.pattern.resource_type.as_protocol_value())
571 .with_resource_name(StrBytes::from_string(self.pattern.name))
572 .with_resource_pattern_type(self.pattern.pattern_type.as_protocol_value())
573 .with_principal(StrBytes::from_string(self.entry.principal))
574 .with_host(StrBytes::from_string(self.entry.host))
575 .with_operation(self.entry.operation.as_protocol_value())
576 .with_permission_type(self.entry.permission_type.as_protocol_value()))
577 }
578}
579
580#[derive(Debug, Clone, PartialEq, Eq)]
581pub struct AclBindingFilter {
583 pub pattern_filter: ResourcePatternFilter,
584 pub entry_filter: AccessControlEntryFilter,
585}
586
587impl AclBindingFilter {
588 pub fn any() -> Self {
589 Self::new(
590 ResourcePatternFilter::any(),
591 AccessControlEntryFilter::any(),
592 )
593 }
594
595 pub fn new(
596 pattern_filter: ResourcePatternFilter,
597 entry_filter: AccessControlEntryFilter,
598 ) -> Self {
599 Self {
600 pattern_filter,
601 entry_filter,
602 }
603 }
604
605 fn to_describe_request(&self) -> DescribeAclsRequest {
606 DescribeAclsRequest::default()
607 .with_resource_type_filter(self.pattern_filter.resource_type.as_protocol_value())
608 .with_resource_name_filter(self.pattern_filter.name.clone().map(StrBytes::from_string))
609 .with_pattern_type_filter(self.pattern_filter.pattern_type.as_protocol_value())
610 .with_principal_filter(
611 self.entry_filter
612 .principal
613 .clone()
614 .map(StrBytes::from_string),
615 )
616 .with_host_filter(self.entry_filter.host.clone().map(StrBytes::from_string))
617 .with_operation(self.entry_filter.operation.as_protocol_value())
618 .with_permission_type(self.entry_filter.permission_type.as_protocol_value())
619 }
620
621 fn into_delete_filter(self) -> DeleteAclsFilter {
622 DeleteAclsFilter::default()
623 .with_resource_type_filter(self.pattern_filter.resource_type.as_protocol_value())
624 .with_resource_name_filter(self.pattern_filter.name.map(StrBytes::from_string))
625 .with_pattern_type_filter(self.pattern_filter.pattern_type.as_protocol_value())
626 .with_principal_filter(self.entry_filter.principal.map(StrBytes::from_string))
627 .with_host_filter(self.entry_filter.host.map(StrBytes::from_string))
628 .with_operation(self.entry_filter.operation.as_protocol_value())
629 .with_permission_type(self.entry_filter.permission_type.as_protocol_value())
630 }
631}
632
633#[derive(Debug, Clone, PartialEq, Eq)]
634pub struct DeleteAclsResult {
636 pub matching_acls: Vec<AclBinding>,
637}
638
639#[derive(Debug, Clone, PartialEq, Eq)]
640pub struct TopicListing {
642 pub name: String,
644 pub topic_id: Option<Uuid>,
646 pub is_internal: bool,
648}
649
650#[derive(Debug, Clone, PartialEq, Eq)]
651pub struct TopicPartitionDescription {
653 pub partition: i32,
655 pub leader_id: i32,
657 pub leader_epoch: i32,
659 pub replica_nodes: Vec<i32>,
661 pub isr_nodes: Vec<i32>,
663 pub offline_replicas: Vec<i32>,
665}
666
667#[derive(Debug, Clone, PartialEq, Eq)]
668pub struct TopicDescription {
670 pub name: String,
672 pub topic_id: Option<Uuid>,
674 pub is_internal: bool,
676 pub partitions: Vec<TopicPartitionDescription>,
678}
679
680#[derive(Debug, Clone, PartialEq, Eq)]
681pub struct BrokerDescription {
683 pub broker_id: i32,
685 pub host: String,
687 pub port: i32,
689 pub rack: Option<String>,
691 pub is_fenced: bool,
693}
694
695#[derive(Debug, Clone, PartialEq, Eq)]
696pub struct ClusterDescription {
698 pub cluster_id: String,
700 pub controller_id: i32,
702 pub brokers: Vec<BrokerDescription>,
704}
705
706#[derive(Debug, Clone, PartialEq, Eq)]
707pub struct BrokerFeatureLevel {
709 pub name: String,
711 pub level: i16,
713}
714
715#[derive(Debug, Clone, PartialEq, Eq)]
716pub struct FeatureUpdate {
718 pub name: String,
720 pub max_version_level: i16,
722 pub upgrade_type: FeatureUpgradeType,
724}
725
726impl FeatureUpdate {
727 pub fn upgrade(name: impl Into<String>, max_version_level: i16) -> Self {
729 Self {
730 name: name.into(),
731 max_version_level,
732 upgrade_type: FeatureUpgradeType::Upgrade,
733 }
734 }
735
736 pub fn safe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
738 Self {
739 name: name.into(),
740 max_version_level,
741 upgrade_type: FeatureUpgradeType::SafeDowngrade,
742 }
743 }
744
745 pub fn unsafe_downgrade(name: impl Into<String>, max_version_level: i16) -> Self {
747 Self {
748 name: name.into(),
749 max_version_level,
750 upgrade_type: FeatureUpgradeType::UnsafeDowngrade,
751 }
752 }
753
754 fn into_request_update(self, version: i16) -> Result<FeatureUpdateKey> {
755 let name = validate_feature_name(self.name)?;
756 let allow_downgrade = self.upgrade_type.allows_downgrade();
757 let mut update = FeatureUpdateKey::default()
758 .with_feature(StrBytes::from_string(name))
759 .with_max_version_level(self.max_version_level);
760 if version == 0 {
761 update = update.with_allow_downgrade(allow_downgrade);
762 } else {
763 update = update.with_upgrade_type(self.upgrade_type.as_protocol_value());
764 }
765 Ok(update)
766 }
767}
768
769#[derive(Debug, Clone, Copy, PartialEq, Eq)]
770pub enum FeatureUpgradeType {
772 Upgrade,
774 SafeDowngrade,
776 UnsafeDowngrade,
778}
779
780impl FeatureUpgradeType {
781 fn as_protocol_value(self) -> i8 {
782 match self {
783 Self::Upgrade => 1,
784 Self::SafeDowngrade => 2,
785 Self::UnsafeDowngrade => 3,
786 }
787 }
788
789 fn allows_downgrade(self) -> bool {
790 !matches!(self, Self::Upgrade)
791 }
792}
793
794#[derive(Debug, Clone, PartialEq, Eq)]
795pub struct ConsumerGroupListing {
797 pub group_id: String,
799 pub protocol_type: String,
801 pub state: Option<String>,
803 pub group_type: Option<String>,
805}
806
807#[derive(Debug, Clone, Copy, PartialEq, Eq)]
808enum DescribeGroupApi {
809 Consumer,
810 Classic,
811 Share,
812}
813
814#[derive(Debug, Clone, PartialEq, Eq)]
815pub struct ConsumerGroupDescription {
817 pub group_id: String,
819 pub state: String,
821 pub protocol_type: String,
823 pub protocol_data: String,
825 pub members: Vec<ConsumerGroupMemberDescription>,
827 pub authorized_operations: Option<i32>,
829}
830
831#[derive(Debug, Clone, PartialEq, Eq)]
832pub struct ConsumerGroupMemberDescription {
834 pub member_id: String,
836 pub group_instance_id: Option<String>,
838 pub client_id: String,
840 pub client_host: String,
842 pub member_metadata_bytes: usize,
846 pub member_assignment_bytes: usize,
850}
851
852pub type GroupListing = ConsumerGroupListing;
854
855pub type GroupDescription = ConsumerGroupDescription;
857
858pub type GroupMemberDescription = ConsumerGroupMemberDescription;
860
861#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
862pub enum ConfigResourceType {
864 Unknown,
866 Topic,
868 Broker,
870 BrokerLogger,
872 ClientMetrics,
874 Group,
876}
877
878impl ConfigResourceType {
879 fn as_protocol_value(self) -> i8 {
880 match self {
881 Self::Unknown => 0,
882 Self::Topic => 2,
883 Self::Broker => 4,
884 Self::BrokerLogger => 8,
885 Self::ClientMetrics => 16,
886 Self::Group => 32,
887 }
888 }
889
890 fn from_protocol_value(value: i8) -> Self {
891 match value {
892 2 => Self::Topic,
893 4 => Self::Broker,
894 8 => Self::BrokerLogger,
895 16 => Self::ClientMetrics,
896 32 => Self::Group,
897 _ => Self::Unknown,
898 }
899 }
900}
901
902#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
903pub struct ConfigResource {
908 pub resource_type: ConfigResourceType,
910 pub resource_name: String,
912}
913
914impl ConfigResource {
915 pub fn new(resource_type: ConfigResourceType, resource_name: impl Into<String>) -> Self {
917 Self {
918 resource_type,
919 resource_name: resource_name.into(),
920 }
921 }
922
923 pub fn topic(resource_name: impl Into<String>) -> Self {
925 Self::new(ConfigResourceType::Topic, resource_name)
926 }
927
928 pub fn group(resource_name: impl Into<String>) -> Self {
930 Self::new(ConfigResourceType::Group, resource_name)
931 }
932}
933
934#[derive(Debug, Clone, PartialEq, Eq)]
935pub struct ConfigEntry {
937 pub name: String,
939 pub value: Option<String>,
941 pub read_only: bool,
943 pub config_source: i8,
945 pub is_sensitive: bool,
947 pub config_type: Option<i8>,
949 pub documentation: Option<String>,
951}
952
953#[derive(Debug, Clone, PartialEq, Eq)]
954pub struct ConfigResourceConfig {
956 pub resource: ConfigResource,
958 pub entries: BTreeMap<String, ConfigEntry>,
960}
961
962#[derive(Debug, Clone, PartialEq, Eq)]
963pub struct BrokerLogDirs {
965 pub broker_id: i32,
967 pub log_dirs: Vec<LogDirDescription>,
969}
970
971#[derive(Debug, Clone, PartialEq, Eq)]
972pub struct LogDirDescription {
974 pub log_dir: String,
976 pub error_code: i16,
978 pub total_bytes: Option<i64>,
980 pub usable_bytes: Option<i64>,
982 pub replicas: Vec<ReplicaLogDirDescription>,
984}
985
986#[derive(Debug, Clone, PartialEq, Eq)]
987pub struct ReplicaLogDirDescription {
989 pub topic: String,
991 pub partition: i32,
993 pub size_bytes: i64,
995 pub offset_lag: i64,
997 pub is_future: bool,
999}
1000
1001impl ConfigResourceConfig {
1002 pub fn entry(&self, name: &str) -> Option<&ConfigEntry> {
1004 self.entries.get(name)
1005 }
1006}
1007
1008#[derive(Debug, Clone, Copy, PartialEq, Eq)]
1009pub enum AlterConfigOpType {
1011 Set,
1013 Delete,
1015 Append,
1017 Subtract,
1019}
1020
1021impl AlterConfigOpType {
1022 fn as_protocol_value(self) -> i8 {
1023 match self {
1024 Self::Set => 0,
1025 Self::Delete => 1,
1026 Self::Append => 2,
1027 Self::Subtract => 3,
1028 }
1029 }
1030}
1031
1032#[derive(Debug, Clone, PartialEq, Eq)]
1033pub struct AlterConfigOp {
1035 pub name: String,
1037 pub op_type: AlterConfigOpType,
1039 pub value: Option<String>,
1041}
1042
1043impl AlterConfigOp {
1044 pub fn set(name: impl Into<String>, value: impl Into<String>) -> Self {
1046 Self {
1047 name: name.into(),
1048 op_type: AlterConfigOpType::Set,
1049 value: Some(value.into()),
1050 }
1051 }
1052
1053 pub fn delete(name: impl Into<String>) -> Self {
1055 Self {
1056 name: name.into(),
1057 op_type: AlterConfigOpType::Delete,
1058 value: None,
1059 }
1060 }
1061}
1062
1063#[derive(Debug, Clone)]
1064pub struct KafkaAdmin {
1086 config: AdminConfig,
1087}
1088
1089impl KafkaAdmin {
1090 #[instrument(
1091 name = "admin.connect",
1092 level = "debug",
1093 skip(config),
1094 fields(
1095 bootstrap_server_count = config.bootstrap_servers.len(),
1096 client_id = %config.client_id
1097 )
1098 )]
1099 pub async fn connect(config: AdminConfig) -> Result<Self> {
1105 let admin = Self { config };
1106 admin.warm_up().await?;
1107 debug!("admin client connected");
1108 Ok(admin)
1109 }
1110
1111 #[instrument(name = "admin.create_topics", level = "debug", skip(self, topics))]
1112 pub async fn create_topics<I>(&self, topics: I) -> Result<()>
1117 where
1118 I: IntoIterator<Item = NewTopic>,
1119 {
1120 let topics = topics
1121 .into_iter()
1122 .map(NewTopic::into_request_topic)
1123 .collect::<Result<Vec<_>>>()?;
1124 if topics.is_empty() {
1125 return Ok(());
1126 }
1127
1128 let request = CreateTopicsRequest::default()
1129 .with_topics(topics)
1130 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
1131 .with_validate_only(false);
1132 let response: CreateTopicsResponse = self
1133 .send_request::<CreateTopicsRequest>(CREATE_TOPICS_VERSION_CAP, &request)
1134 .await?;
1135
1136 for topic in response.topics {
1137 let name = topic.name.0.to_string();
1138 if let Some(error) = topic
1139 .error_code
1140 .err()
1141 .filter(|error| !is_ignorable_create_topic_error(*error))
1142 {
1143 return Err(admin_broker_error("create_topic", Some(name), error));
1144 }
1145 }
1146
1147 Ok(())
1148 }
1149
1150 #[instrument(name = "admin.delete_topics", level = "debug", skip(self, topics))]
1151 pub async fn delete_topics<I, S>(&self, topics: I) -> Result<()>
1156 where
1157 I: IntoIterator<Item = S>,
1158 S: Into<String>,
1159 {
1160 let topic_names = topics
1161 .into_iter()
1162 .map(|topic| validate_topic_name(topic.into()))
1163 .collect::<Result<Vec<_>>>()?;
1164 if topic_names.is_empty() {
1165 return Ok(());
1166 }
1167
1168 let request = DeleteTopicsRequest::default()
1169 .with_topic_names(
1170 topic_names
1171 .iter()
1172 .cloned()
1173 .map(StrBytes::from_string)
1174 .map(Into::into)
1175 .collect(),
1176 )
1177 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?);
1178 let response: DeleteTopicsResponse = self
1179 .send_request::<DeleteTopicsRequest>(DELETE_TOPICS_VERSION_CAP, &request)
1180 .await?;
1181
1182 for topic in response.responses {
1183 let name = topic
1184 .name
1185 .as_ref()
1186 .map(|name| name.0.to_string())
1187 .unwrap_or_else(|| "<unknown>".to_owned());
1188 if let Some(error) = topic.error_code.err() {
1189 return Err(admin_broker_error("delete_topic", Some(name), error));
1190 }
1191 }
1192
1193 Ok(())
1194 }
1195
1196 #[instrument(
1197 name = "admin.create_partitions",
1198 level = "debug",
1199 skip(self, partitions)
1200 )]
1201 pub async fn create_partitions<I, S>(&self, partitions: I) -> Result<()>
1206 where
1207 I: IntoIterator<Item = (S, NewPartitions)>,
1208 S: Into<String>,
1209 {
1210 let topics = partitions
1211 .into_iter()
1212 .map(|(topic, new_partitions)| new_partitions.into_request_topic(topic.into()))
1213 .collect::<Result<Vec<_>>>()?;
1214 if topics.is_empty() {
1215 return Ok(());
1216 }
1217
1218 let request = CreatePartitionsRequest::default()
1219 .with_topics(topics)
1220 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
1221 .with_validate_only(false);
1222 let response: CreatePartitionsResponse = self
1223 .send_request::<CreatePartitionsRequest>(CREATE_PARTITIONS_VERSION_CAP, &request)
1224 .await?;
1225
1226 for topic in response.results {
1227 let name = topic.name.0.to_string();
1228 if let Some(error) = topic.error_code.err() {
1229 return Err(admin_broker_error("create_partitions", Some(name), error));
1230 }
1231 }
1232
1233 Ok(())
1234 }
1235
1236 #[instrument(name = "admin.list_topics", level = "debug", skip(self))]
1237 pub async fn list_topics(&self) -> Result<Vec<TopicListing>> {
1241 let response = self.fetch_metadata(None).await?;
1242 let mut topics = Vec::new();
1243
1244 for topic in response.topics {
1245 let Some(name) = topic.name.as_ref().map(|name| name.0.to_string()) else {
1246 continue;
1247 };
1248 if let Some(error) = topic.error_code.err() {
1249 return Err(admin_broker_error("list_topics", Some(name), error));
1250 }
1251
1252 topics.push(TopicListing {
1253 name,
1254 topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
1255 is_internal: topic.is_internal,
1256 });
1257 }
1258
1259 topics.sort_by(|left, right| left.name.cmp(&right.name));
1260 Ok(topics)
1261 }
1262
1263 #[instrument(name = "admin.describe_topics", level = "debug", skip(self, topics))]
1264 pub async fn describe_topics<I, S>(&self, topics: I) -> Result<Vec<TopicDescription>>
1269 where
1270 I: IntoIterator<Item = S>,
1271 S: Into<String>,
1272 {
1273 let requested_topics = topics
1274 .into_iter()
1275 .map(|topic| validate_topic_name(topic.into()))
1276 .collect::<Result<Vec<_>>>()?;
1277 if requested_topics.is_empty() {
1278 return Ok(Vec::new());
1279 }
1280
1281 let response = self.fetch_metadata(Some(&requested_topics)).await?;
1282 let mut descriptions = BTreeMap::new();
1283
1284 for topic in response.topics {
1285 let name = topic
1286 .name
1287 .as_ref()
1288 .map(|name| name.0.to_string())
1289 .unwrap_or_default();
1290 if let Some(error) = topic.error_code.err() {
1291 let label = if name.is_empty() { "<unknown>" } else { &name };
1292 return Err(admin_broker_error(
1293 "describe_topic",
1294 Some(label.to_owned()),
1295 error,
1296 ));
1297 }
1298
1299 let mut partitions = topic
1300 .partitions
1301 .into_iter()
1302 .map(|partition| {
1303 if let Some(error) = partition.error_code.err() {
1304 return Err(admin_broker_error(
1305 "describe_topic_partition",
1306 Some(format!("{name}:{}", partition.partition_index)),
1307 error,
1308 ));
1309 }
1310
1311 Ok(TopicPartitionDescription {
1312 partition: partition.partition_index,
1313 leader_id: partition.leader_id.0,
1314 leader_epoch: partition.leader_epoch,
1315 replica_nodes: partition.replica_nodes.into_iter().map(|id| id.0).collect(),
1316 isr_nodes: partition.isr_nodes.into_iter().map(|id| id.0).collect(),
1317 offline_replicas: partition
1318 .offline_replicas
1319 .into_iter()
1320 .map(|id| id.0)
1321 .collect(),
1322 })
1323 })
1324 .collect::<Result<Vec<_>>>()?;
1325 partitions.sort_by_key(|partition| partition.partition);
1326
1327 descriptions.insert(
1328 name.clone(),
1329 TopicDescription {
1330 name,
1331 topic_id: (!topic.topic_id.is_nil()).then_some(topic.topic_id),
1332 is_internal: topic.is_internal,
1333 partitions,
1334 },
1335 );
1336 }
1337
1338 requested_topics
1339 .into_iter()
1340 .map(|topic| {
1341 descriptions.remove(&topic).ok_or_else(|| {
1342 anyhow!("metadata response did not include topic '{topic}'").into()
1343 })
1344 })
1345 .collect()
1346 }
1347
1348 #[instrument(name = "admin.describe_cluster", level = "debug", skip(self))]
1349 pub async fn describe_cluster(&self) -> Result<ClusterDescription> {
1351 let (mut connection, version) = self
1352 .connect_with_version::<DescribeClusterRequest>(DESCRIBE_CLUSTER_VERSION_CAP)
1353 .await?;
1354 let mut request =
1355 DescribeClusterRequest::default().with_include_cluster_authorized_operations(false);
1356 if version >= 1 {
1357 request = request.with_endpoint_type(1);
1358 }
1359 if version >= 2 {
1360 request = request.with_include_fenced_brokers(true);
1361 }
1362
1363 let response: DescribeClusterResponse = connection
1364 .send_request::<DescribeClusterRequest>(&self.config.client_id, version, &request)
1365 .await?;
1366 if let Some(error) = response.error_code.err() {
1367 return Err(admin_broker_error(
1368 "describe_cluster",
1369 None::<String>,
1370 error,
1371 ));
1372 }
1373
1374 let mut brokers = response
1375 .brokers
1376 .into_iter()
1377 .map(|broker| BrokerDescription {
1378 broker_id: broker.broker_id.0,
1379 host: broker.host.to_string(),
1380 port: broker.port,
1381 rack: broker.rack.map(|rack| rack.to_string()),
1382 is_fenced: broker.is_fenced,
1383 })
1384 .collect::<Vec<_>>();
1385 brokers.sort_by_key(|broker| broker.broker_id);
1386
1387 Ok(ClusterDescription {
1388 cluster_id: response.cluster_id.to_string(),
1389 controller_id: response.controller_id.0,
1390 brokers,
1391 })
1392 }
1393
1394 #[instrument(name = "admin.describe_log_dirs", level = "debug", skip(self, brokers))]
1395 pub async fn describe_log_dirs<I>(
1401 &self,
1402 brokers: I,
1403 topics: Option<&[TopicPartition]>,
1404 ) -> Result<Vec<BrokerLogDirs>>
1405 where
1406 I: IntoIterator<Item = BrokerDescription>,
1407 {
1408 let request_topics = topics.map(describe_log_dirs_topics);
1409 let mut handles = Vec::new();
1410 for broker in brokers {
1411 let config = self.config.clone();
1412 let request_topics = request_topics.clone();
1413 let broker_id = broker.broker_id;
1414 handles.push((
1415 broker_id,
1416 tokio::spawn(async move {
1417 describe_log_dirs_for_broker(config, broker, request_topics).await
1418 }),
1419 ));
1420 }
1421
1422 let mut results = Vec::with_capacity(handles.len());
1423 let mut first_error = None;
1424 for (broker_id, handle) in handles {
1425 match handle.await {
1426 Ok(Ok(broker_result)) => results.push(broker_result),
1427 Ok(Err(error)) => {
1428 first_error.get_or_insert(error);
1429 }
1430 Err(error) => {
1431 first_error.get_or_insert(
1432 anyhow!("describe log dirs for broker {broker_id} task failed: {error}")
1433 .into(),
1434 );
1435 }
1436 }
1437 }
1438 if let Some(error) = first_error {
1439 return Err(error);
1440 }
1441 results.sort_by_key(|broker| broker.broker_id);
1442 Ok(results)
1443 }
1444
1445 #[instrument(name = "admin.list_groups", level = "debug", skip(self))]
1446 pub async fn list_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
1450 let response: ListGroupsResponse = self
1451 .send_request::<ListGroupsRequest>(
1452 LIST_GROUPS_VERSION_CAP,
1453 &ListGroupsRequest::default(),
1454 )
1455 .await?;
1456 if let Some(error) = response.error_code.err() {
1457 return Err(admin_broker_error("list_groups", None::<String>, error));
1458 }
1459
1460 let mut groups = response
1461 .groups
1462 .into_iter()
1463 .map(|group| ConsumerGroupListing {
1464 group_id: group.group_id.to_string(),
1465 protocol_type: group.protocol_type.to_string(),
1466 state: (!group.group_state.is_empty()).then(|| group.group_state.to_string()),
1467 group_type: (!group.group_type.is_empty()).then(|| group.group_type.to_string()),
1468 })
1469 .collect::<Vec<_>>();
1470 groups.sort_by(|left, right| left.group_id.cmp(&right.group_id));
1471 Ok(groups)
1472 }
1473
1474 #[instrument(name = "admin.list_consumer_groups", level = "debug", skip(self))]
1475 pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
1477 self.list_groups().await
1478 }
1479
1480 #[instrument(name = "admin.describe_groups", level = "debug", skip(self, groups))]
1481 pub async fn describe_groups<I, S>(&self, groups: I) -> Result<Vec<GroupDescription>>
1487 where
1488 I: IntoIterator<Item = S>,
1489 S: Into<String>,
1490 {
1491 let group_ids = groups
1492 .into_iter()
1493 .map(|group| validate_group_id(group.into()))
1494 .collect::<Result<Vec<_>>>()?;
1495 if group_ids.is_empty() {
1496 return Ok(Vec::new());
1497 }
1498
1499 let listed_group_types = self
1500 .list_groups()
1501 .await?
1502 .into_iter()
1503 .map(|group| (group.group_id, group.group_type))
1504 .collect::<BTreeMap<_, _>>();
1505 let mut consumer_groups = Vec::new();
1506 let mut share_groups = Vec::new();
1507 let mut classic_groups = Vec::new();
1508 for group_id in &group_ids {
1509 match describe_group_api_for_listed_type(
1510 listed_group_types
1511 .get(group_id)
1512 .and_then(|group_type| group_type.as_deref()),
1513 ) {
1514 DescribeGroupApi::Share => {
1515 share_groups.push(group_id.clone());
1516 }
1517 DescribeGroupApi::Classic => {
1518 classic_groups.push(group_id.clone());
1519 }
1520 DescribeGroupApi::Consumer => {
1521 consumer_groups.push(group_id.clone());
1522 }
1523 }
1524 }
1525
1526 let mut descriptions = BTreeMap::new();
1527 for group in self.describe_consumer_groups(consumer_groups).await? {
1528 descriptions.insert(group.group_id.clone(), group);
1529 }
1530 for group in self.describe_share_groups(share_groups).await? {
1531 descriptions.insert(group.group_id.clone(), group);
1532 }
1533 for group in self.describe_classic_groups(classic_groups).await? {
1534 descriptions.insert(group.group_id.clone(), group);
1535 }
1536
1537 group_ids
1538 .into_iter()
1539 .map(|group_id| {
1540 descriptions.remove(&group_id).ok_or_else(|| {
1541 anyhow!("describe groups response did not include group '{group_id}'").into()
1542 })
1543 })
1544 .collect()
1545 }
1546
1547 #[instrument(
1548 name = "admin.describe_consumer_groups",
1549 level = "debug",
1550 skip(self, groups)
1551 )]
1552 pub async fn describe_consumer_groups<I, S>(
1556 &self,
1557 groups: I,
1558 ) -> Result<Vec<ConsumerGroupDescription>>
1559 where
1560 I: IntoIterator<Item = S>,
1561 S: Into<String>,
1562 {
1563 let group_ids = groups
1564 .into_iter()
1565 .map(|group| validate_group_id(group.into()))
1566 .collect::<Result<Vec<_>>>()?;
1567 if group_ids.is_empty() {
1568 return Ok(Vec::new());
1569 }
1570
1571 let request = ConsumerGroupDescribeRequest::default()
1572 .with_group_ids(
1573 group_ids
1574 .iter()
1575 .cloned()
1576 .map(StrBytes::from_string)
1577 .map(Into::into)
1578 .collect(),
1579 )
1580 .with_include_authorized_operations(false);
1581 let response: ConsumerGroupDescribeResponse = self
1582 .send_request::<ConsumerGroupDescribeRequest>(
1583 CONSUMER_GROUP_DESCRIBE_VERSION_CAP,
1584 &request,
1585 )
1586 .await?;
1587
1588 let mut descriptions = BTreeMap::new();
1589 for group in response.groups {
1590 let group_id = group.group_id.to_string();
1591 if let Some(error) = group.error_code.err() {
1592 return Err(admin_broker_error(
1593 "describe_consumer_group",
1594 Some(group_id),
1595 error,
1596 ));
1597 }
1598
1599 descriptions.insert(
1600 group_id.clone(),
1601 ConsumerGroupDescription {
1602 group_id,
1603 state: group.group_state.to_string(),
1604 protocol_type: "consumer".to_owned(),
1605 protocol_data: group.assignor_name.to_string(),
1606 members: group
1607 .members
1608 .into_iter()
1609 .map(|member| ConsumerGroupMemberDescription {
1610 member_id: member.member_id.to_string(),
1611 group_instance_id: member
1612 .instance_id
1613 .map(|instance_id| instance_id.to_string()),
1614 client_id: member.client_id.to_string(),
1615 client_host: member.client_host.to_string(),
1616 member_metadata_bytes: member.subscribed_topic_names.len(),
1617 member_assignment_bytes: assignment_partition_count(&member.assignment),
1618 })
1619 .collect(),
1620 authorized_operations: (group.authorized_operations != i32::MIN)
1621 .then_some(group.authorized_operations),
1622 },
1623 );
1624 }
1625
1626 group_ids
1627 .into_iter()
1628 .map(|group_id| {
1629 descriptions.remove(&group_id).ok_or_else(|| {
1630 anyhow!("describe groups response did not include group '{group_id}'").into()
1631 })
1632 })
1633 .collect()
1634 }
1635
1636 #[instrument(
1637 name = "admin.describe_classic_groups",
1638 level = "debug",
1639 skip(self, groups)
1640 )]
1641 pub async fn describe_classic_groups<I, S>(
1645 &self,
1646 groups: I,
1647 ) -> Result<Vec<ConsumerGroupDescription>>
1648 where
1649 I: IntoIterator<Item = S>,
1650 S: Into<String>,
1651 {
1652 let group_ids = groups
1653 .into_iter()
1654 .map(|group| validate_group_id(group.into()))
1655 .collect::<Result<Vec<_>>>()?;
1656 if group_ids.is_empty() {
1657 return Ok(Vec::new());
1658 }
1659
1660 let request = DescribeGroupsRequest::default()
1661 .with_groups(
1662 group_ids
1663 .iter()
1664 .cloned()
1665 .map(StrBytes::from_string)
1666 .map(Into::into)
1667 .collect(),
1668 )
1669 .with_include_authorized_operations(false);
1670 let response: DescribeGroupsResponse = self
1671 .send_request::<DescribeGroupsRequest>(DESCRIBE_GROUPS_VERSION_CAP, &request)
1672 .await?;
1673
1674 let mut descriptions = BTreeMap::new();
1675 for group in response.groups {
1676 let group_id = group.group_id.to_string();
1677 if let Some(error) = group.error_code.err() {
1678 return Err(admin_broker_error(
1679 "describe_classic_group",
1680 Some(group_id),
1681 error,
1682 ));
1683 }
1684
1685 descriptions.insert(
1686 group_id.clone(),
1687 ConsumerGroupDescription {
1688 group_id,
1689 state: group.group_state.to_string(),
1690 protocol_type: group.protocol_type.to_string(),
1691 protocol_data: group.protocol_data.to_string(),
1692 members: group
1693 .members
1694 .into_iter()
1695 .map(|member| {
1696 let member_metadata_bytes =
1697 classic_subscription_topic_count(&member.member_metadata);
1698 let member_assignment_bytes =
1699 classic_assignment_partition_count(&member.member_assignment);
1700 ConsumerGroupMemberDescription {
1701 member_id: member.member_id.to_string(),
1702 group_instance_id: member
1703 .group_instance_id
1704 .map(|instance_id| instance_id.to_string()),
1705 client_id: member.client_id.to_string(),
1706 client_host: member.client_host.to_string(),
1707 member_metadata_bytes,
1708 member_assignment_bytes,
1709 }
1710 })
1711 .collect(),
1712 authorized_operations: (group.authorized_operations != i32::MIN)
1713 .then_some(group.authorized_operations),
1714 },
1715 );
1716 }
1717
1718 group_ids
1719 .into_iter()
1720 .map(|group_id| {
1721 descriptions.remove(&group_id).ok_or_else(|| {
1722 anyhow!("describe classic groups response did not include group '{group_id}'")
1723 .into()
1724 })
1725 })
1726 .collect()
1727 }
1728
1729 #[instrument(
1730 name = "admin.describe_share_groups",
1731 level = "debug",
1732 skip(self, groups)
1733 )]
1734 pub async fn describe_share_groups<I, S>(
1738 &self,
1739 groups: I,
1740 ) -> Result<Vec<ConsumerGroupDescription>>
1741 where
1742 I: IntoIterator<Item = S>,
1743 S: Into<String>,
1744 {
1745 let group_ids = groups
1746 .into_iter()
1747 .map(|group| validate_group_id(group.into()))
1748 .collect::<Result<Vec<_>>>()?;
1749 if group_ids.is_empty() {
1750 return Ok(Vec::new());
1751 }
1752
1753 let request = ShareGroupDescribeRequest::default()
1754 .with_group_ids(
1755 group_ids
1756 .iter()
1757 .cloned()
1758 .map(StrBytes::from_string)
1759 .map(Into::into)
1760 .collect(),
1761 )
1762 .with_include_authorized_operations(false);
1763 let response: ShareGroupDescribeResponse = self
1764 .send_request::<ShareGroupDescribeRequest>(SHARE_GROUP_DESCRIBE_VERSION_CAP, &request)
1765 .await?;
1766
1767 let mut descriptions = BTreeMap::new();
1768 for group in response.groups {
1769 let group_id = group.group_id.to_string();
1770 if let Some(error) = group.error_code.err() {
1771 return Err(admin_broker_error(
1772 "describe_share_group",
1773 Some(group_id),
1774 error,
1775 ));
1776 }
1777
1778 descriptions.insert(
1779 group_id.clone(),
1780 ConsumerGroupDescription {
1781 group_id,
1782 state: group.group_state.to_string(),
1783 protocol_type: "share".to_owned(),
1784 protocol_data: group.assignor_name.to_string(),
1785 members: group
1786 .members
1787 .into_iter()
1788 .map(|member| ConsumerGroupMemberDescription {
1789 member_id: member.member_id.to_string(),
1790 group_instance_id: None,
1791 client_id: member.client_id.to_string(),
1792 client_host: member.client_host.to_string(),
1793 member_metadata_bytes: member.subscribed_topic_names.len(),
1794 member_assignment_bytes: share_assignment_partition_count(
1795 &member.assignment,
1796 ),
1797 })
1798 .collect(),
1799 authorized_operations: (group.authorized_operations != i32::MIN)
1800 .then_some(group.authorized_operations),
1801 },
1802 );
1803 }
1804
1805 group_ids
1806 .into_iter()
1807 .map(|group_id| {
1808 descriptions.remove(&group_id).ok_or_else(|| {
1809 anyhow!("describe share groups response did not include group '{group_id}'")
1810 .into()
1811 })
1812 })
1813 .collect()
1814 }
1815
1816 #[instrument(name = "admin.delete_groups", level = "debug", skip(self, groups))]
1817 pub async fn delete_groups<I, S>(&self, groups: I) -> Result<()>
1821 where
1822 I: IntoIterator<Item = S>,
1823 S: Into<String>,
1824 {
1825 let group_ids = groups
1826 .into_iter()
1827 .map(|group| validate_group_id(group.into()))
1828 .collect::<Result<Vec<_>>>()?;
1829 if group_ids.is_empty() {
1830 return Ok(());
1831 }
1832
1833 let request = DeleteGroupsRequest::default().with_groups_names(
1834 group_ids
1835 .iter()
1836 .cloned()
1837 .map(StrBytes::from_string)
1838 .map(Into::into)
1839 .collect(),
1840 );
1841 let response: DeleteGroupsResponse = self
1842 .send_request::<DeleteGroupsRequest>(DELETE_GROUPS_VERSION_CAP, &request)
1843 .await?;
1844
1845 for result in response.results {
1846 if let Some(error) = result.error_code.err() {
1847 return Err(admin_broker_error(
1848 "delete_group",
1849 Some(result.group_id.to_string()),
1850 error,
1851 ));
1852 }
1853 }
1854 Ok(())
1855 }
1856
1857 #[instrument(
1858 name = "admin.delete_consumer_groups",
1859 level = "debug",
1860 skip(self, groups)
1861 )]
1862 pub async fn delete_consumer_groups<I, S>(&self, groups: I) -> Result<()>
1864 where
1865 I: IntoIterator<Item = S>,
1866 S: Into<String>,
1867 {
1868 self.delete_groups(groups).await
1869 }
1870
1871 #[instrument(name = "admin.describe_acls", level = "debug", skip(self, filter))]
1872 pub async fn describe_acls(&self, filter: AclBindingFilter) -> Result<Vec<AclBinding>> {
1876 let request = filter.to_describe_request();
1877 let response: DescribeAclsResponse = self
1878 .send_request::<DescribeAclsRequest>(DESCRIBE_ACLS_VERSION_CAP, &request)
1879 .await?;
1880 if let Some(error) = response.error_code.err() {
1881 return Err(admin_broker_error("describe_acls", None::<String>, error));
1882 }
1883
1884 let mut bindings = Vec::new();
1885 for resource in response.resources {
1886 let pattern = ResourcePattern::new(
1887 ResourceType::from_protocol_value(resource.resource_type),
1888 resource.resource_name.to_string(),
1889 PatternType::from_protocol_value(resource.pattern_type),
1890 );
1891 for acl in resource.acls {
1892 bindings.push(AclBinding::new(
1893 pattern.clone(),
1894 AccessControlEntry::new(
1895 acl.principal.to_string(),
1896 acl.host.to_string(),
1897 AclOperation::from_protocol_value(acl.operation),
1898 AclPermissionType::from_protocol_value(acl.permission_type),
1899 ),
1900 ));
1901 }
1902 }
1903 bindings.sort_by(|left, right| {
1904 left.pattern
1905 .name
1906 .cmp(&right.pattern.name)
1907 .then_with(|| left.entry.principal.cmp(&right.entry.principal))
1908 .then_with(|| left.entry.host.cmp(&right.entry.host))
1909 });
1910 Ok(bindings)
1911 }
1912
1913 #[instrument(name = "admin.create_acls", level = "debug", skip(self, acls))]
1914 pub async fn create_acls<I>(&self, acls: I) -> Result<()>
1918 where
1919 I: IntoIterator<Item = AclBinding>,
1920 {
1921 let creations = acls
1922 .into_iter()
1923 .map(AclBinding::into_creation)
1924 .collect::<Result<Vec<_>>>()?;
1925 if creations.is_empty() {
1926 return Ok(());
1927 }
1928
1929 let request = CreateAclsRequest::default().with_creations(creations);
1930 let response: CreateAclsResponse = self
1931 .send_request::<CreateAclsRequest>(CREATE_ACLS_VERSION_CAP, &request)
1932 .await?;
1933 for result in response.results {
1934 if let Some(error) = result.error_code.err() {
1935 return Err(admin_broker_error("create_acl", None::<String>, error));
1936 }
1937 }
1938 Ok(())
1939 }
1940
1941 #[instrument(name = "admin.delete_acls", level = "debug", skip(self, filters))]
1942 pub async fn delete_acls<I>(&self, filters: I) -> Result<Vec<DeleteAclsResult>>
1946 where
1947 I: IntoIterator<Item = AclBindingFilter>,
1948 {
1949 let filters = filters
1950 .into_iter()
1951 .map(AclBindingFilter::into_delete_filter)
1952 .collect::<Vec<_>>();
1953 if filters.is_empty() {
1954 return Ok(Vec::new());
1955 }
1956
1957 let request = DeleteAclsRequest::default().with_filters(filters);
1958 let response: DeleteAclsResponse = self
1959 .send_request::<DeleteAclsRequest>(DELETE_ACLS_VERSION_CAP, &request)
1960 .await?;
1961 let mut results = Vec::new();
1962 for result in response.filter_results {
1963 if let Some(error) = result.error_code.err() {
1964 return Err(admin_broker_error("delete_acls", None::<String>, error));
1965 }
1966 let mut matching_acls = Vec::new();
1967 for acl in result.matching_acls {
1968 if let Some(error) = acl.error_code.err() {
1969 return Err(admin_broker_error(
1970 "delete_acl",
1971 Some(acl.resource_name.to_string()),
1972 error,
1973 ));
1974 }
1975 matching_acls.push(AclBinding::new(
1976 ResourcePattern::new(
1977 ResourceType::from_protocol_value(acl.resource_type),
1978 acl.resource_name.to_string(),
1979 PatternType::from_protocol_value(acl.pattern_type),
1980 ),
1981 AccessControlEntry::new(
1982 acl.principal.to_string(),
1983 acl.host.to_string(),
1984 AclOperation::from_protocol_value(acl.operation),
1985 AclPermissionType::from_protocol_value(acl.permission_type),
1986 ),
1987 ));
1988 }
1989 results.push(DeleteAclsResult { matching_acls });
1990 }
1991 Ok(results)
1992 }
1993
1994 #[instrument(
1995 name = "admin.describe_configs",
1996 level = "debug",
1997 skip(self, resources)
1998 )]
1999 pub async fn describe_configs<I>(&self, resources: I) -> Result<Vec<ConfigResourceConfig>>
2016 where
2017 I: IntoIterator<Item = ConfigResource>,
2018 {
2019 let resources = resources.into_iter().collect::<Vec<_>>();
2020 if resources.is_empty() {
2021 return Ok(Vec::new());
2022 }
2023
2024 let request = DescribeConfigsRequest::default()
2025 .with_resources(
2026 resources
2027 .iter()
2028 .map(|resource| {
2029 DescribeConfigsResource::default()
2030 .with_resource_type(resource.resource_type.as_protocol_value())
2031 .with_resource_name(StrBytes::from_string(
2032 resource.resource_name.clone(),
2033 ))
2034 .with_configuration_keys(None)
2035 })
2036 .collect(),
2037 )
2038 .with_include_synonyms(false);
2039 let response: DescribeConfigsResponse = self
2040 .send_request::<DescribeConfigsRequest>(DESCRIBE_CONFIGS_VERSION_CAP, &request)
2041 .await?;
2042
2043 let mut described = BTreeMap::new();
2044 for resource in response.results {
2045 let resource_type = ConfigResourceType::from_protocol_value(resource.resource_type);
2046 let resource_name = resource.resource_name.to_string();
2047 if let Some(error) = resource.error_code.err() {
2048 return Err(admin_broker_error(
2049 "describe_configs",
2050 Some(format!("{resource_type:?}:{resource_name}")),
2051 error,
2052 ));
2053 }
2054
2055 let entries = resource
2056 .configs
2057 .into_iter()
2058 .map(|entry| {
2059 let name = entry.name.to_string();
2060 let config_entry = ConfigEntry {
2061 name: name.clone(),
2062 value: entry.value.map(|value| value.to_string()),
2063 read_only: entry.read_only,
2064 config_source: entry.config_source,
2065 is_sensitive: entry.is_sensitive,
2066 config_type: (response_supported_config_type(entry.config_type))
2067 .then_some(entry.config_type),
2068 documentation: entry.documentation.map(|doc| doc.to_string()),
2069 };
2070 (name, config_entry)
2071 })
2072 .collect();
2073
2074 described.insert(
2075 (resource_type, resource_name.clone()),
2076 ConfigResourceConfig {
2077 resource: ConfigResource::new(resource_type, resource_name),
2078 entries,
2079 },
2080 );
2081 }
2082
2083 resources
2084 .into_iter()
2085 .map(|resource| {
2086 described
2087 .remove(&(resource.resource_type, resource.resource_name.clone()))
2088 .ok_or_else(|| {
2089 anyhow!("describe configs response did not include {:?}", resource).into()
2090 })
2091 })
2092 .collect()
2093 }
2094
2095 #[instrument(
2096 name = "admin.incremental_alter_configs",
2097 level = "debug",
2098 skip(self, resources)
2099 )]
2100 pub async fn incremental_alter_configs<I>(&self, resources: I) -> Result<()>
2116 where
2117 I: IntoIterator<Item = (ConfigResource, Vec<AlterConfigOp>)>,
2118 {
2119 let resources = resources
2120 .into_iter()
2121 .map(|(resource, ops)| {
2122 AlterConfigsResource::default()
2123 .with_resource_type(resource.resource_type.as_protocol_value())
2124 .with_resource_name(StrBytes::from_string(resource.resource_name))
2125 .with_configs(
2126 ops.into_iter()
2127 .map(|op| {
2128 AlterableConfig::default()
2129 .with_name(StrBytes::from_string(op.name))
2130 .with_config_operation(op.op_type.as_protocol_value())
2131 .with_value(op.value.map(StrBytes::from_string))
2132 })
2133 .collect(),
2134 )
2135 })
2136 .collect::<Vec<_>>();
2137 if resources.is_empty() {
2138 return Ok(());
2139 }
2140
2141 let request = IncrementalAlterConfigsRequest::default()
2142 .with_resources(resources)
2143 .with_validate_only(false);
2144 let response: IncrementalAlterConfigsResponse = self
2145 .send_request::<IncrementalAlterConfigsRequest>(
2146 INCREMENTAL_ALTER_CONFIGS_VERSION_CAP,
2147 &request,
2148 )
2149 .await?;
2150
2151 for resource in response.responses {
2152 if let Some(error) = resource.error_code.err() {
2153 let resource_type = ConfigResourceType::from_protocol_value(resource.resource_type);
2154 return Err(admin_broker_error(
2155 "incremental_alter_configs",
2156 Some(format!("{resource_type:?}:{}", resource.resource_name)),
2157 error,
2158 ));
2159 }
2160 }
2161
2162 Ok(())
2163 }
2164
2165 #[instrument(
2166 name = "admin.upsert_scram_credential",
2167 level = "debug",
2168 skip(self, user, password)
2169 )]
2170 pub async fn upsert_scram_credential(
2177 &self,
2178 user: impl Into<String>,
2179 mechanism: SaslMechanism,
2180 password: impl AsRef<[u8]>,
2181 ) -> Result<()> {
2182 self.upsert_scram_credential_with_iterations(
2183 user,
2184 mechanism,
2185 password,
2186 scram::MIN_ITERATIONS,
2187 )
2188 .await
2189 }
2190
2191 #[instrument(
2192 name = "admin.upsert_scram_credential_with_iterations",
2193 level = "debug",
2194 skip(self, user, password)
2195 )]
2196 pub async fn upsert_scram_credential_with_iterations(
2198 &self,
2199 user: impl Into<String>,
2200 mechanism: SaslMechanism,
2201 password: impl AsRef<[u8]>,
2202 iterations: i32,
2203 ) -> Result<()> {
2204 let user = user.into();
2205 let mechanism_type = mechanism
2206 .scram_type()
2207 .ok_or_else(|| anyhow!("SCRAM credential upsertion requires a SCRAM mechanism"))?;
2208 let salt = scram::secure_random_bytes()?;
2209 let salted_password =
2210 scram::salted_password(mechanism, password.as_ref(), &salt, iterations)?;
2211 let request = AlterUserScramCredentialsRequest::default().with_upsertions(vec![
2212 ScramCredentialUpsertion::default()
2213 .with_name(StrBytes::from_string(user.clone()))
2214 .with_mechanism(mechanism_type)
2215 .with_iterations(iterations)
2216 .with_salt(Bytes::from(salt))
2217 .with_salted_password(Bytes::from(salted_password)),
2218 ]);
2219 let response: AlterUserScramCredentialsResponse = self
2220 .send_request::<AlterUserScramCredentialsRequest>(
2221 ALTER_USER_SCRAM_CREDENTIALS_VERSION_CAP,
2222 &request,
2223 )
2224 .await?;
2225
2226 for result in response.results {
2227 if let Some(error) = result.error_code.err() {
2228 return Err(admin_broker_error(
2229 "alter_scram_credential",
2230 Some(result.user.to_string()),
2231 error,
2232 ));
2233 }
2234 }
2235
2236 Ok(())
2237 }
2238
2239 pub fn config(&self) -> &AdminConfig {
2241 &self.config
2242 }
2243
2244 pub async fn finalized_feature_levels(&self) -> Result<Vec<BrokerFeatureLevel>> {
2246 let connection = connect_to_any_bootstrap(
2247 &self.config.bootstrap_servers,
2248 &self.config.client_id,
2249 self.config.request_timeout,
2250 self.config.security_protocol,
2251 &self.config.tls,
2252 &self.config.sasl,
2253 &self.config.tcp_connector,
2254 )
2255 .await?;
2256 Ok(connection
2257 .finalized_feature_levels()
2258 .into_iter()
2259 .map(|(name, level)| BrokerFeatureLevel { name, level })
2260 .collect())
2261 }
2262
2263 pub async fn update_features<I>(&self, updates: I) -> Result<()>
2269 where
2270 I: IntoIterator<Item = FeatureUpdate>,
2271 {
2272 self.update_features_inner(updates, false).await
2273 }
2274
2275 pub async fn validate_feature_updates<I>(&self, updates: I) -> Result<()>
2277 where
2278 I: IntoIterator<Item = FeatureUpdate>,
2279 {
2280 self.update_features_inner(updates, true).await
2281 }
2282
2283 async fn update_features_inner<I>(&self, updates: I, validate_only: bool) -> Result<()>
2284 where
2285 I: IntoIterator<Item = FeatureUpdate>,
2286 {
2287 let (mut connection, version) = self
2288 .connect_with_version::<UpdateFeaturesRequest>(UPDATE_FEATURES_VERSION_CAP)
2289 .await?;
2290 let feature_updates = updates
2291 .into_iter()
2292 .map(|update| update.into_request_update(version))
2293 .collect::<Result<Vec<_>>>()?;
2294 if feature_updates.is_empty() {
2295 return Ok(());
2296 }
2297
2298 let mut request = UpdateFeaturesRequest::default()
2299 .with_timeout_ms(duration_to_i32_ms(self.config.request_timeout)?)
2300 .with_feature_updates(feature_updates);
2301 if version >= 1 {
2302 request = request.with_validate_only(validate_only);
2303 } else if validate_only {
2304 return Err(anyhow!("validate-only feature updates require UpdateFeatures v1+").into());
2305 }
2306
2307 let response: UpdateFeaturesResponse = connection
2308 .send_request::<UpdateFeaturesRequest>(&self.config.client_id, version, &request)
2309 .await?;
2310 if let Some(error) = response.error_code.err() {
2311 return Err(admin_broker_error("update_features", None::<String>, error));
2312 }
2313
2314 for result in response.results {
2315 if let Some(error) = result.error_code.err() {
2316 let feature = result.feature.to_string();
2317 return Err(admin_broker_error("update_feature", Some(feature), error));
2318 }
2319 }
2320
2321 Ok(())
2322 }
2323
2324 async fn warm_up(&self) -> Result<()> {
2325 let _ = connect_to_any_bootstrap(
2326 &self.config.bootstrap_servers,
2327 &self.config.client_id,
2328 self.config.request_timeout,
2329 self.config.security_protocol,
2330 &self.config.tls,
2331 &self.config.sasl,
2332 &self.config.tcp_connector,
2333 )
2334 .await?;
2335 Ok(())
2336 }
2337
2338 async fn fetch_metadata(&self, topics: Option<&[String]>) -> Result<MetadataResponse> {
2339 let (mut connection, version) = self
2340 .connect_with_version::<MetadataRequest>(METADATA_VERSION_CAP)
2341 .await?;
2342 let request = MetadataRequest::default()
2343 .with_topics(topics.map(|topics| {
2344 topics
2345 .iter()
2346 .cloned()
2347 .map(StrBytes::from_string)
2348 .map(|name| MetadataRequestTopic::default().with_name(Some(name.into())))
2349 .collect()
2350 }))
2351 .with_allow_auto_topic_creation(false)
2352 .with_include_cluster_authorized_operations(false)
2353 .with_include_topic_authorized_operations(false);
2354 Ok(connection
2355 .send_request::<MetadataRequest>(&self.config.client_id, version, &request)
2356 .await?)
2357 }
2358
2359 async fn send_request<Req>(&self, version_cap: i16, request: &Req) -> Result<Req::Response>
2360 where
2361 Req: Request,
2362 {
2363 let (mut connection, version) = self.connect_with_version::<Req>(version_cap).await?;
2364 Ok(connection
2365 .send_request::<Req>(&self.config.client_id, version, request)
2366 .await?)
2367 }
2368
2369 async fn connect_with_version<Req>(&self, version_cap: i16) -> Result<(BrokerConnection, i16)>
2370 where
2371 Req: Request,
2372 {
2373 let connection = connect_to_any_bootstrap(
2374 &self.config.bootstrap_servers,
2375 &self.config.client_id,
2376 self.config.request_timeout,
2377 self.config.security_protocol,
2378 &self.config.tls,
2379 &self.config.sasl,
2380 &self.config.tcp_connector,
2381 )
2382 .await?;
2383 let version = connection.version_with_cap::<Req>(version_cap)?;
2384 Ok((connection, version))
2385 }
2386}
2387
2388fn validate_topic_name(topic: String) -> Result<String> {
2389 let topic = topic.trim();
2390 if topic.is_empty() {
2391 return Err(AdminError::EmptyTopicName.into());
2392 }
2393
2394 Ok(topic.to_owned())
2395}
2396
2397fn admin_broker_error(
2398 operation: &'static str,
2399 resource: impl Into<Option<String>>,
2400 error: ResponseError,
2401) -> crate::Error {
2402 BrokerError::response(operation, resource, error).into()
2403}
2404
2405async fn describe_log_dirs_for_broker(
2406 mut config: AdminConfig,
2407 broker: BrokerDescription,
2408 request_topics: Option<Vec<DescribableLogDirTopic>>,
2409) -> Result<BrokerLogDirs> {
2410 let address = format!("{}:{}", broker.host, broker.port);
2411 config.bootstrap_servers = vec![address];
2412 let broker_admin = KafkaAdmin { config };
2413 let response: DescribeLogDirsResponse = broker_admin
2414 .send_request::<DescribeLogDirsRequest>(
2415 DESCRIBE_LOG_DIRS_VERSION_CAP,
2416 &DescribeLogDirsRequest::default().with_topics(request_topics),
2417 )
2418 .await?;
2419 if let Some(error) = ResponseError::try_from_code(response.error_code) {
2420 return Err(admin_broker_error(
2421 "describe_log_dirs",
2422 Some(format!("broker {}", broker.broker_id)),
2423 error,
2424 ));
2425 }
2426
2427 let mut log_dirs = Vec::new();
2428 for result in response.results {
2429 let replicas = result
2430 .topics
2431 .into_iter()
2432 .flat_map(|topic| {
2433 let topic_name = topic.name.to_string();
2434 topic
2435 .partitions
2436 .into_iter()
2437 .map(move |partition| ReplicaLogDirDescription {
2438 topic: topic_name.clone(),
2439 partition: partition.partition_index,
2440 size_bytes: partition.partition_size.max(0),
2441 offset_lag: partition.offset_lag,
2442 is_future: partition.is_future_key,
2443 })
2444 })
2445 .collect();
2446 log_dirs.push(LogDirDescription {
2447 log_dir: result.log_dir.to_string(),
2448 error_code: result.error_code,
2449 total_bytes: (result.total_bytes >= 0).then_some(result.total_bytes),
2450 usable_bytes: (result.usable_bytes >= 0).then_some(result.usable_bytes),
2451 replicas,
2452 });
2453 }
2454 log_dirs.sort_by(|left, right| left.log_dir.cmp(&right.log_dir));
2455 Ok(BrokerLogDirs {
2456 broker_id: broker.broker_id,
2457 log_dirs,
2458 })
2459}
2460
2461fn describe_log_dirs_topics(partitions: &[TopicPartition]) -> Vec<DescribableLogDirTopic> {
2462 let mut topics = BTreeMap::<String, Vec<i32>>::new();
2463 for partition in partitions {
2464 topics
2465 .entry(partition.topic.clone())
2466 .or_default()
2467 .push(partition.partition);
2468 }
2469 topics
2470 .into_iter()
2471 .map(|(topic, mut partitions)| {
2472 partitions.sort_unstable();
2473 partitions.dedup();
2474 DescribableLogDirTopic::default()
2475 .with_topic(StrBytes::from_string(topic).into())
2476 .with_partitions(partitions)
2477 })
2478 .collect()
2479}
2480
2481fn validate_group_id(group_id: String) -> Result<String> {
2482 let group_id = group_id.trim();
2483 if group_id.is_empty() {
2484 return Err(ValidationError::EmptyConsumerGroupId.into());
2485 }
2486
2487 Ok(group_id.to_owned())
2488}
2489
2490fn describe_group_api_for_listed_type(group_type: Option<&str>) -> DescribeGroupApi {
2491 match group_type {
2492 Some(group_type) if group_type.eq_ignore_ascii_case("consumer") => {
2493 DescribeGroupApi::Consumer
2494 }
2495 Some(group_type) if group_type.eq_ignore_ascii_case("share") => DescribeGroupApi::Share,
2496 _ => DescribeGroupApi::Classic,
2497 }
2498}
2499
2500fn validate_feature_name(feature: String) -> Result<String> {
2501 let feature = feature.trim();
2502 if feature.is_empty() {
2503 return Err(ValidationError::EmptyFeatureName.into());
2504 }
2505
2506 Ok(feature.to_owned())
2507}
2508
2509fn assignment_partition_count(
2510 assignment: &kafka_protocol::messages::consumer_group_describe_response::Assignment,
2511) -> usize {
2512 assignment
2513 .topic_partitions
2514 .iter()
2515 .map(|topic| topic.partitions.len())
2516 .sum()
2517}
2518
2519fn share_assignment_partition_count(
2520 assignment: &kafka_protocol::messages::share_group_describe_response::Assignment,
2521) -> usize {
2522 assignment
2523 .topic_partitions
2524 .iter()
2525 .map(|topic| topic.partitions.len())
2526 .sum()
2527}
2528
2529fn classic_subscription_topic_count(metadata: &Bytes) -> usize {
2530 let Some((version, mut body)) = classic_protocol_body(metadata) else {
2531 return 0;
2532 };
2533 ConsumerProtocolSubscription::decode(&mut body, version)
2534 .map(|subscription| subscription.topics.len())
2535 .unwrap_or(0)
2536}
2537
2538fn classic_assignment_partition_count(assignment: &Bytes) -> usize {
2539 let Some((version, mut body)) = classic_protocol_body(assignment) else {
2540 return 0;
2541 };
2542 ConsumerProtocolAssignment::decode(&mut body, version)
2543 .map(|assignment| {
2544 assignment
2545 .assigned_partitions
2546 .iter()
2547 .map(|topic| topic.partitions.len())
2548 .sum()
2549 })
2550 .unwrap_or(0)
2551}
2552
2553fn classic_protocol_body(bytes: &Bytes) -> Option<(i16, Bytes)> {
2554 let mut body = bytes.clone();
2555 if body.remaining() < 2 {
2556 return None;
2557 }
2558 let version = body.get_i16();
2559 Some((version, body))
2560}
2561
2562fn is_ignorable_create_topic_error(error: ResponseError) -> bool {
2563 error == ResponseError::TopicAlreadyExists
2564}
2565
2566fn response_supported_config_type(config_type: i8) -> bool {
2567 config_type >= 0
2568}
2569
2570#[cfg(test)]
2571mod tests {
2572 use super::*;
2573 use bytes::{BufMut, BytesMut};
2574 use kafka_protocol::protocol::Encodable;
2575
2576 #[test]
2577 fn new_topic_maps_to_create_topics_request() {
2578 let topic = NewTopic::new("orders", 3, 2)
2579 .with_config("cleanup.policy", "compact")
2580 .into_request_topic()
2581 .expect("topic should be valid");
2582
2583 assert_eq!(topic.name.0.to_string(), "orders");
2584 assert_eq!(topic.num_partitions, 3);
2585 assert_eq!(topic.replication_factor, 2);
2586 assert_eq!(topic.configs.len(), 1);
2587 assert_eq!(topic.configs[0].name.to_string(), "cleanup.policy");
2588 }
2589
2590 #[test]
2591 fn new_topic_rejects_invalid_partition_count() {
2592 let error = NewTopic::new("orders", 0, 1)
2593 .into_request_topic()
2594 .expect_err("invalid topic should fail");
2595 assert!(
2596 error
2597 .to_string()
2598 .contains("topic partition count must be positive")
2599 );
2600 }
2601
2602 #[test]
2603 fn new_topic_rejects_empty_names_and_invalid_replication_factor() {
2604 let error = NewTopic::new(" ", 1, 1).into_request_topic().unwrap_err();
2605 assert!(matches!(
2606 error,
2607 crate::Error::Admin(AdminError::EmptyTopicName)
2608 ));
2609
2610 let error = NewTopic::new("orders", 1, 0)
2611 .into_request_topic()
2612 .unwrap_err();
2613 assert!(matches!(
2614 error,
2615 crate::Error::Admin(AdminError::InvalidReplicationFactor {
2616 replication_factor: 0
2617 })
2618 ));
2619 }
2620
2621 #[test]
2622 fn new_partitions_maps_assignments_and_rejects_invalid_input() {
2623 let topic = NewPartitions::increase_to(4)
2624 .with_assignment([1, 2])
2625 .with_assignment([2, 3])
2626 .into_request_topic("orders".to_owned())
2627 .unwrap();
2628 assert_eq!(topic.name.to_string(), "orders");
2629 assert_eq!(topic.count, 4);
2630 assert_eq!(topic.assignments.unwrap().len(), 2);
2631
2632 let error = NewPartitions::increase_to(0)
2633 .into_request_topic("orders".to_owned())
2634 .unwrap_err();
2635 assert!(matches!(
2636 error,
2637 crate::Error::Admin(AdminError::InvalidPartitionCount { partitions: 0 })
2638 ));
2639
2640 let error = NewPartitions::increase_to(1)
2641 .into_request_topic(" ".to_owned())
2642 .unwrap_err();
2643 assert!(matches!(
2644 error,
2645 crate::Error::Admin(AdminError::EmptyTopicName)
2646 ));
2647 }
2648
2649 #[test]
2650 fn config_resource_and_operation_protocol_values_are_stable() {
2651 assert_eq!(ConfigResourceType::Unknown.as_protocol_value(), 0);
2652 assert_eq!(ConfigResourceType::Topic.as_protocol_value(), 2);
2653 assert_eq!(ConfigResourceType::Broker.as_protocol_value(), 4);
2654 assert_eq!(ConfigResourceType::BrokerLogger.as_protocol_value(), 8);
2655 assert_eq!(ConfigResourceType::ClientMetrics.as_protocol_value(), 16);
2656 assert_eq!(ConfigResourceType::Group.as_protocol_value(), 32);
2657 assert_eq!(
2658 ConfigResourceType::from_protocol_value(2),
2659 ConfigResourceType::Topic
2660 );
2661 assert_eq!(
2662 ConfigResourceType::from_protocol_value(99),
2663 ConfigResourceType::Unknown
2664 );
2665
2666 assert_eq!(
2667 AlterConfigOp::set("cleanup.policy", "compact")
2668 .op_type
2669 .as_protocol_value(),
2670 0
2671 );
2672 assert_eq!(
2673 AlterConfigOp::delete("retention.ms")
2674 .op_type
2675 .as_protocol_value(),
2676 1
2677 );
2678 assert_eq!(AlterConfigOpType::Append.as_protocol_value(), 2);
2679 assert_eq!(AlterConfigOpType::Subtract.as_protocol_value(), 3);
2680 assert!(response_supported_config_type(0));
2681 assert!(!response_supported_config_type(-1));
2682 }
2683
2684 #[test]
2685 fn config_resource_config_entry_lookup_returns_named_entry() {
2686 let mut entries = BTreeMap::new();
2687 entries.insert(
2688 "cleanup.policy".to_owned(),
2689 ConfigEntry {
2690 name: "cleanup.policy".to_owned(),
2691 value: Some("compact".to_owned()),
2692 read_only: false,
2693 config_source: 1,
2694 is_sensitive: false,
2695 config_type: Some(2),
2696 documentation: None,
2697 },
2698 );
2699 let config = ConfigResourceConfig {
2700 resource: ConfigResource::topic("orders"),
2701 entries,
2702 };
2703
2704 assert_eq!(
2705 config.entry("cleanup.policy").unwrap().value.as_deref(),
2706 Some("compact")
2707 );
2708 assert!(config.entry("missing").is_none());
2709 }
2710
2711 #[test]
2712 fn describe_group_api_routing_uses_legacy_api_for_classic_groups() {
2713 assert_eq!(
2714 describe_group_api_for_listed_type(None),
2715 DescribeGroupApi::Classic
2716 );
2717 assert_eq!(
2718 describe_group_api_for_listed_type(Some("classic")),
2719 DescribeGroupApi::Classic
2720 );
2721 assert_eq!(
2722 describe_group_api_for_listed_type(Some("consumer")),
2723 DescribeGroupApi::Consumer
2724 );
2725 assert_eq!(
2726 describe_group_api_for_listed_type(Some("share")),
2727 DescribeGroupApi::Share
2728 );
2729 }
2730
2731 #[test]
2732 fn acl_binding_creation_validates_and_maps_protocol_values() {
2733 let creation = AclBinding::new(
2734 ResourcePattern::new(ResourceType::Topic, "orders", PatternType::Prefixed),
2735 AccessControlEntry::new(
2736 "User:alice",
2737 "*",
2738 AclOperation::Read,
2739 AclPermissionType::Allow,
2740 ),
2741 )
2742 .into_creation()
2743 .unwrap();
2744
2745 assert_eq!(creation.resource_type, 2);
2746 assert_eq!(creation.resource_name.to_string(), "orders");
2747 assert_eq!(creation.resource_pattern_type, 4);
2748 assert_eq!(creation.principal.to_string(), "User:alice");
2749 assert_eq!(creation.host.to_string(), "*");
2750 assert_eq!(creation.operation, 3);
2751 assert_eq!(creation.permission_type, 3);
2752
2753 let error = AclBinding::new(
2754 ResourcePattern::new(ResourceType::Any, "orders", PatternType::Literal),
2755 AccessControlEntry::new(
2756 "User:alice",
2757 "*",
2758 AclOperation::Read,
2759 AclPermissionType::Allow,
2760 ),
2761 )
2762 .into_creation()
2763 .unwrap_err();
2764 assert!(error.to_string().contains("resource_type must not be ANY"));
2765
2766 let error = AclBinding::new(
2767 ResourcePattern::new(ResourceType::Topic, "orders", PatternType::Literal),
2768 AccessControlEntry::new(
2769 "User:alice",
2770 "*",
2771 AclOperation::Any,
2772 AclPermissionType::Allow,
2773 ),
2774 )
2775 .into_creation()
2776 .unwrap_err();
2777 assert!(error.to_string().contains("operation must not be ANY"));
2778 }
2779
2780 #[test]
2781 fn acl_binding_filters_map_to_describe_and_delete_requests() {
2782 let filter = AclBindingFilter::new(
2783 ResourcePatternFilter::new(
2784 ResourceType::Topic,
2785 Some("orders".to_owned()),
2786 PatternType::Literal,
2787 ),
2788 AccessControlEntryFilter::new(
2789 Some("User:alice".to_owned()),
2790 Some("*".to_owned()),
2791 AclOperation::Read,
2792 AclPermissionType::Allow,
2793 ),
2794 );
2795
2796 let describe = filter.to_describe_request();
2797 assert_eq!(describe.resource_type_filter, 2);
2798 assert_eq!(
2799 describe
2800 .resource_name_filter
2801 .as_ref()
2802 .map(ToString::to_string),
2803 Some("orders".to_owned())
2804 );
2805 assert_eq!(describe.pattern_type_filter, 3);
2806 assert_eq!(
2807 describe.principal_filter.as_ref().map(ToString::to_string),
2808 Some("User:alice".to_owned())
2809 );
2810 assert_eq!(
2811 describe.host_filter.as_ref().map(ToString::to_string),
2812 Some("*".to_owned())
2813 );
2814 assert_eq!(describe.operation, 3);
2815 assert_eq!(describe.permission_type, 3);
2816
2817 let delete = filter.into_delete_filter();
2818 assert_eq!(delete.resource_type_filter, 2);
2819 assert_eq!(
2820 delete
2821 .resource_name_filter
2822 .as_ref()
2823 .map(ToString::to_string),
2824 Some("orders".to_owned())
2825 );
2826 assert_eq!(delete.pattern_type_filter, 3);
2827 assert_eq!(
2828 delete.principal_filter.as_ref().map(ToString::to_string),
2829 Some("User:alice".to_owned())
2830 );
2831 assert_eq!(
2832 delete.host_filter.as_ref().map(ToString::to_string),
2833 Some("*".to_owned())
2834 );
2835 assert_eq!(delete.operation, 3);
2836 assert_eq!(delete.permission_type, 3);
2837 }
2838
2839 #[test]
2840 fn classic_group_payload_counts_decode_subscription_and_assignment() {
2841 let subscription = ConsumerProtocolSubscription::default().with_topics(vec![
2842 StrBytes::from_static_str("orders"),
2843 StrBytes::from_static_str("payments"),
2844 ]);
2845 let subscription_bytes = encode_classic_protocol(&subscription, 3);
2846 assert_eq!(classic_subscription_topic_count(&subscription_bytes), 2);
2847
2848 let assignment = ConsumerProtocolAssignment::default().with_assigned_partitions(vec![
2849 kafka_protocol::messages::consumer_protocol_assignment::TopicPartition::default()
2850 .with_topic(StrBytes::from_static_str("orders").into())
2851 .with_partitions(vec![0, 1]),
2852 kafka_protocol::messages::consumer_protocol_assignment::TopicPartition::default()
2853 .with_topic(StrBytes::from_static_str("payments").into())
2854 .with_partitions(vec![0]),
2855 ]);
2856 let assignment_bytes = encode_classic_protocol(&assignment, 3);
2857 assert_eq!(classic_assignment_partition_count(&assignment_bytes), 3);
2858
2859 assert_eq!(
2860 classic_subscription_topic_count(&Bytes::from_static(b"\x00")),
2861 0
2862 );
2863 assert_eq!(
2864 classic_assignment_partition_count(&Bytes::from_static(b"\x00\x63")),
2865 0
2866 );
2867 }
2868
2869 #[test]
2870 fn feature_update_maps_to_modern_update_features_request() {
2871 let update = FeatureUpdate::upgrade("share.version", 1)
2872 .into_request_update(2)
2873 .unwrap();
2874 assert_eq!(update.feature.to_string(), "share.version");
2875 assert_eq!(update.max_version_level, 1);
2876 assert_eq!(update.upgrade_type, 1);
2877 assert!(!update.allow_downgrade);
2878 }
2879
2880 #[test]
2881 fn feature_update_maps_to_legacy_downgrade_flag() {
2882 let update = FeatureUpdate::safe_downgrade("metadata.version", 0)
2883 .into_request_update(0)
2884 .unwrap();
2885 assert_eq!(update.feature.to_string(), "metadata.version");
2886 assert_eq!(update.max_version_level, 0);
2887 assert_eq!(update.upgrade_type, 1);
2888 assert!(update.allow_downgrade);
2889 }
2890
2891 #[test]
2892 fn feature_update_rejects_empty_name() {
2893 let error = FeatureUpdate::upgrade(" ", 1)
2894 .into_request_update(2)
2895 .unwrap_err();
2896 assert!(
2897 error
2898 .to_string()
2899 .contains("feature names must be non-empty")
2900 );
2901 }
2902
2903 #[test]
2904 fn topic_already_exists_is_ignorable() {
2905 assert!(is_ignorable_create_topic_error(
2906 ResponseError::TopicAlreadyExists
2907 ));
2908 assert!(!is_ignorable_create_topic_error(
2909 ResponseError::UnknownTopicOrPartition
2910 ));
2911 }
2912
2913 fn encode_classic_protocol<T>(message: &T, version: i16) -> Bytes
2914 where
2915 T: Encodable,
2916 {
2917 let mut bytes = BytesMut::new();
2918 bytes.put_i16(version);
2919 message.encode(&mut bytes, version).unwrap();
2920 bytes.freeze()
2921 }
2922}