1use std::collections::{HashMap, HashSet};
48use std::sync::Arc;
49use std::time::Duration;
50
51use bytes::Bytes;
52use tracing::{debug, info, warn};
53
54use crate::auth::{AuthConfig, ScramMechanism};
55use crate::error::{KrafkaError, Result};
56use crate::metadata::{ClusterMetadata, MetadataRecoveryStrategy, TopicInfo};
57use crate::metrics::ConnectionMetrics;
58use crate::network::{BrokerConnection, ConnectionConfig, ConnectionPool};
59
60use crate::protocol::{
61 AclBinding, AclBindingFilter, AclOperation, AclPatternType, AclPermissionType, AclResourceType,
62 AlterClientQuotasRequest, AlterClientQuotasResponse, AlterConfigOp,
63 AlterPartitionReassignmentsRequest, AlterPartitionReassignmentsResponse, AlterQuotaEntity,
64 AlterQuotaEntry, AlterQuotaOp, AlterReplicaLogDir, AlterReplicaLogDirsRequest,
65 AlterReplicaLogDirsResponse, AlterUserScramCredentialsRequest,
66 AlterUserScramCredentialsResponse, AlterableConfig, ApiKey, ConsumerGroupDescribeRequest,
67 ConsumerGroupDescribeResponse, CreatableRenewer, CreatableTopic, CreatableTopicConfig,
68 CreateAclsRequest, CreateAclsResponse, CreateDelegationTokenRequest,
69 CreateDelegationTokenResponse, CreatePartitionsRequest, CreatePartitionsResponse,
70 CreatePartitionsTopic, CreateTopicsRequest, CreateTopicsResponse, DeleteAclsRequest,
71 DeleteAclsResponse, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsPartition,
72 DeleteRecordsRequest, DeleteRecordsResponse, DeleteRecordsTopic, DeleteTopicState,
73 DeleteTopicsRequest, DeleteTopicsResponse, DescribableLogDirTopic, DescribeAclsRequest,
74 DescribeAclsResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse,
75 DescribeClusterRequest, DescribeClusterResponse, DescribeConfigsResponse,
76 DescribeDelegationTokenOwner, DescribeDelegationTokenRequest, DescribeDelegationTokenResponse,
77 DescribeGroupsRequest, DescribeGroupsResponse, DescribeLogDirsRequest, DescribeLogDirsResponse,
78 DescribeProducersRequest, DescribeProducersResponse, DescribeProducersTopicRequest,
79 DescribeQuorumPartitionRequest, DescribeQuorumRequest, DescribeQuorumResponse,
80 DescribeQuorumTopicRequest, DescribeTopicPartitionsCursor, DescribeTopicPartitionsRequest,
81 DescribeTopicPartitionsResponse, DescribeTransactionsRequest, DescribeTransactionsResponse,
82 DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse, ElectLeadersRequest,
83 ElectLeadersResponse, ElectLeadersTopicPartitions, ElectionType, ExpireDelegationTokenRequest,
84 ExpireDelegationTokenResponse, FinalizedFeature, FindCoordinatorRequest,
85 FindCoordinatorResponse, IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse,
86 ListClientMetricsResourcesRequest, ListClientMetricsResourcesResponse, ListGroupsRequest,
87 ListGroupsResponse, ListPartitionReassignmentsRequest, ListPartitionReassignmentsResponse,
88 ListPartitionReassignmentsTopic, ListTransactionsRequest, ListTransactionsResponse,
89 OffsetDeletePartitionRequest, OffsetDeleteRequest, OffsetDeleteResponse,
90 OffsetDeleteTopicRequest, OffsetForLeaderEpochPartition, OffsetForLeaderEpochRequest,
91 OffsetForLeaderEpochResponse, OffsetForLeaderEpochTopic, QuotaFilterComponent,
92 ReassignableTopic, RenewDelegationTokenRequest, RenewDelegationTokenResponse,
93 ScramCredentialDeletion, ScramCredentialUpsertion, SupportedFeature, UpdateFeaturesRequest,
94 UpdateFeaturesResponse, VersionedDecode, VersionedEncode, WritableTxnMarker,
95 WritableTxnMarkerTopic, WriteTxnMarkersRequest, WriteTxnMarkersResponse, validate_topic_name,
96 validate_topic_names, versions,
97};
98
99pub use crate::protocol::DescribeConfigsRequest;
101
/// Default partition limit (2000) for responses.
///
/// NOTE(review): the code that reads this constant is outside this excerpt;
/// presumably it caps partitions per DescribeTopicPartitions response — confirm.
const DEFAULT_RESPONSE_PARTITION_LIMIT: i32 = 2000;
104
/// Specification of a topic to be created with `AdminClient::create_topics`.
///
/// Build instances with [`NewTopic::new`], which validates the name and the
/// numeric fields, then optionally chain [`NewTopic::with_config`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct NewTopic {
    /// Topic name.
    pub name: String,
    /// Partition count: positive, or `-1`
    /// (presumably "use the broker default" — confirm).
    pub num_partitions: i32,
    /// Replication factor: positive, or `-1`
    /// (presumably "use the broker default" — confirm).
    pub replication_factor: i16,
    /// Topic-level configuration overrides sent with the create request.
    pub configs: HashMap<String, String>,
}
118
119impl NewTopic {
120 pub fn new(
135 name: impl Into<String>,
136 num_partitions: i32,
137 replication_factor: i16,
138 ) -> Result<Self> {
139 let name = name.into();
140 validate_topic_name(&name)?;
141 if num_partitions == 0 || num_partitions < -1 {
142 return Err(KrafkaError::config(format!(
143 "num_partitions must be positive or -1, got {num_partitions}"
144 )));
145 }
146 if replication_factor == 0 || replication_factor < -1 {
147 return Err(KrafkaError::config(format!(
148 "replication_factor must be positive or -1, got {replication_factor}"
149 )));
150 }
151 Ok(Self {
152 name,
153 num_partitions,
154 replication_factor,
155 configs: HashMap::new(),
156 })
157 }
158
159 pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
161 self.configs.insert(key.into(), value.into());
162 self
163 }
164}
165
/// Per-topic outcome of a CreateTopics request.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreateTopicResult {
    /// Topic name as echoed in the response.
    pub name: String,
    /// `None` on success; otherwise the broker's error message, or the debug
    /// rendering of the error code when no message was supplied.
    pub error: Option<String>,
}
175
/// Per-topic outcome of a DeleteTopics request.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteTopicResult {
    /// Topic name from the response (empty when the response carried none).
    pub name: String,
    /// `None` on success; otherwise the broker's error message, or the debug
    /// rendering of the error code when no message was supplied.
    pub error: Option<String>,
}
185
/// Outcome of a CreatePartitions request for a single topic.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreatePartitionsResult {
    /// Topic the result applies to.
    pub topic: String,
    /// `None` on success; otherwise an error description.
    pub error: Option<String>,
}
195
196#[non_exhaustive]
198#[derive(Debug, Clone)]
199pub struct ConfigEntry {
200 pub name: String,
202 pub value: Option<String>,
204 pub read_only: bool,
206 pub is_default: bool,
208 pub is_sensitive: bool,
210 pub config_source: i8,
212 pub synonyms: Vec<ConfigSynonymEntry>,
214 pub config_type: i8,
216 pub documentation: Option<String>,
218}
219
/// A synonym of a configuration entry, as reported in a DescribeConfigs
/// response.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ConfigSynonymEntry {
    /// Synonym config key.
    pub name: String,
    /// Synonym value, if the broker returned one.
    pub value: Option<String>,
    /// Raw source code of the synonym as sent by the broker.
    pub source: i8,
}
231
/// Outcome of an IncrementalAlterConfigs request for one resource.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterConfigResult {
    /// Name of the resource the result applies to.
    pub resource_name: String,
    /// `None` on success; otherwise an error description.
    pub error: Option<String>,
}
241
242#[non_exhaustive]
244#[derive(Debug, Clone)]
245pub struct DescribeAclsResult {
246 pub error: Option<String>,
248 pub bindings: Vec<AclBinding>,
250}
251
252#[non_exhaustive]
254#[derive(Debug, Clone)]
255pub struct CreateAclsResult {
256 pub results: Vec<CreateAclResult>,
258}
259
/// Outcome of creating a single ACL binding.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreateAclResult {
    /// `None` on success; otherwise the broker's message or the debug
    /// rendering of the error code.
    pub error: Option<String>,
}
267
268#[non_exhaustive]
270#[derive(Debug, Clone)]
271pub struct DeleteAclsResult {
272 pub filter_results: Vec<DeleteAclFilterResult>,
274}
275
/// Outcome of one filter in a DeleteAcls request.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteAclFilterResult {
    /// `None` on success; otherwise the broker's message or the debug
    /// rendering of the error code.
    pub error: Option<String>,
    /// Number of matching ACLs listed in the response for this filter.
    pub deleted_count: usize,
}
285
/// Kind of a consumer group.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupType {
    /// Displayed as `classic`.
    Classic,
    /// Displayed as `consumer`.
    Consumer,
    /// Any other type string; displayed verbatim.
    Unknown(String),
}

impl std::fmt::Display for GroupType {
    /// Writes the lowercase type name, or the raw string for
    /// [`GroupType::Unknown`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Classic => f.write_str("classic"),
            Self::Consumer => f.write_str("consumer"),
            Self::Unknown(s) => f.write_str(s),
        }
    }
}
307
308#[non_exhaustive]
317#[derive(Debug, Clone)]
318pub struct ConsumerGroupDescription {
319 pub group_id: String,
321 pub group_type: GroupType,
323 pub state: String,
325 pub protocol_type: Option<String>,
327 pub assignor: Option<String>,
331 pub group_epoch: Option<i32>,
333 pub assignment_epoch: Option<i32>,
335 pub members: Vec<ConsumerGroupMember>,
337 pub authorized_operations: Option<i32>,
339 pub error: Option<String>,
341}
342
343#[non_exhaustive]
347#[derive(Debug, Clone)]
348pub struct ConsumerGroupMember {
349 pub member_id: String,
351 pub instance_id: Option<String>,
353 pub rack_id: Option<String>,
355 pub member_epoch: Option<i32>,
357 pub client_id: String,
359 pub client_host: String,
361 pub subscribed_topic_names: Option<Vec<String>>,
363 pub subscribed_topic_regex: Option<String>,
365 pub assignment: Option<Vec<TopicPartitionAssignment>>,
367 pub target_assignment: Option<Vec<TopicPartitionAssignment>>,
369 pub member_type: Option<i8>,
371}
372
/// Partitions of a single topic assigned to a group member.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TopicPartitionAssignment {
    /// 16-byte topic id.
    pub topic_id: [u8; 16],
    /// Topic name.
    pub topic_name: String,
    /// Partition indexes.
    pub partitions: Vec<i32>,
}
384
385#[non_exhaustive]
387#[derive(Debug, Clone)]
388pub struct ConsumerGroupListing {
389 pub group_id: String,
391 pub protocol_type: String,
393 pub group_type: Option<GroupType>,
395}
396
397#[non_exhaustive]
399#[derive(Debug, Clone)]
400pub struct DescribeTopicPartitionsResult {
401 pub topics: Vec<TopicPartitionDescription>,
403 pub next_cursor_topic: Option<String>,
405 pub next_cursor_partition: Option<i32>,
407}
408
409#[non_exhaustive]
411#[derive(Debug, Clone)]
412pub struct TopicPartitionDescription {
413 pub name: Option<String>,
415 pub topic_id: [u8; 16],
417 pub is_internal: bool,
419 pub partitions: Vec<PartitionDescription>,
421 pub topic_authorized_operations: i32,
423 pub error: Option<String>,
425}
426
/// Description of a single partition.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct PartitionDescription {
    /// Partition index.
    pub partition_index: i32,
    /// Broker id of the leader.
    pub leader_id: i32,
    /// Leader epoch.
    pub leader_epoch: i32,
    /// Broker ids of all replicas.
    pub replica_nodes: Vec<i32>,
    /// Broker ids of the in-sync replicas.
    pub isr_nodes: Vec<i32>,
    /// Eligible leader replicas, when reported.
    pub eligible_leader_replicas: Option<Vec<i32>>,
    /// Last known eligible leader replicas, when reported.
    pub last_known_elr: Option<Vec<i32>>,
    /// Broker ids of offline replicas.
    pub offline_replicas: Vec<i32>,
    /// Error description for this partition, if any.
    pub error: Option<String>,
}
450
/// Per-partition outcome of a DeleteRecords request.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteRecordResult {
    /// Topic name.
    pub topic: String,
    /// Partition index.
    pub partition: i32,
    /// Low watermark reported for the partition.
    pub low_watermark: i64,
    /// Error description for this partition, if any.
    pub error: Option<String>,
}
464
/// Per-partition outcome of an OffsetForLeaderEpoch request.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LeaderEpochResult {
    /// Topic name.
    pub topic: String,
    /// Partition index.
    pub partition: i32,
    /// Leader epoch reported for the partition.
    pub leader_epoch: i32,
    /// End offset reported for that epoch.
    pub end_offset: i64,
    /// Error description for this partition, if any.
    pub error: Option<String>,
}
480
/// A principal listed as a renewer of a delegation token.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DelegationTokenRenewer {
    /// Principal type.
    pub principal_type: String,
    /// Principal name.
    pub principal_name: String,
}
490
491#[non_exhaustive]
494#[derive(Clone)]
495pub struct DelegationToken {
496 pub principal_type: String,
498 pub principal_name: String,
500 pub issue_timestamp_ms: i64,
502 pub expiry_timestamp_ms: i64,
504 pub max_timestamp_ms: i64,
506 pub token_id: String,
508 pub hmac: Bytes,
510 pub renewers: Vec<DelegationTokenRenewer>,
516}
517
518impl std::fmt::Debug for DelegationToken {
519 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
520 f.debug_struct("DelegationToken")
521 .field("principal_type", &self.principal_type)
522 .field("principal_name", &self.principal_name)
523 .field("issue_timestamp_ms", &self.issue_timestamp_ms)
524 .field("expiry_timestamp_ms", &self.expiry_timestamp_ms)
525 .field("max_timestamp_ms", &self.max_timestamp_ms)
526 .field("token_id", &self.token_id)
527 .field("hmac", &"[REDACTED]")
528 .field("renewers", &self.renewers)
529 .finish()
530 }
531}
532
533#[non_exhaustive]
535#[derive(Debug, Clone)]
536pub struct CreateDelegationTokenResult {
537 pub token: Option<DelegationToken>,
539 pub error: Option<String>,
541}
542
/// Outcome of a RenewDelegationToken request.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct RenewDelegationTokenResult {
    /// Expiry timestamp in milliseconds.
    pub expiry_timestamp_ms: i64,
    /// Error description, if any.
    pub error: Option<String>,
}
552
/// Outcome of an ExpireDelegationToken request.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ExpireDelegationTokenResult {
    /// Expiry timestamp in milliseconds.
    pub expiry_timestamp_ms: i64,
    /// Error description, if any.
    pub error: Option<String>,
}
562
/// One component of a client-quota entity.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuotaEntityComponent {
    /// Entity type string.
    pub entity_type: String,
    /// Entity name, if any
    /// (NOTE(review): `None` presumably denotes the default entity — confirm).
    pub entity_name: Option<String>,
}
572
/// A single quota key/value pair.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuotaConfig {
    /// Quota key.
    pub key: String,
    /// Quota value.
    pub value: f64,
}
583
584#[non_exhaustive]
586#[derive(Debug, Clone)]
587pub struct QuotaDescription {
588 pub entity: Vec<QuotaEntityComponent>,
590 pub values: Vec<QuotaConfig>,
592}
593
594#[non_exhaustive]
596#[derive(Debug, Clone)]
597pub struct DescribeClientQuotasResult {
598 pub entries: Vec<QuotaDescription>,
600 pub error: Option<String>,
602}
603
604#[non_exhaustive]
606#[derive(Debug, Clone)]
607pub struct AlterClientQuotaResult {
608 pub entity: Vec<QuotaEntityComponent>,
610 pub error: Option<String>,
612}
613
/// Borrowed description of a quota change for one entity.
///
/// NOTE(review): the method consuming this type is outside this excerpt. By
/// the field shapes, `entity` is a list of `(entity_type, entity_name)` pairs
/// and `ops` a list of `(quota_key, value)` pairs where `None` presumably
/// removes the quota — confirm.
#[derive(Debug, Clone)]
pub struct QuotaAlteration<'a> {
    /// Entity components as `(type, optional name)` pairs.
    pub entity: Vec<(&'a str, Option<&'a str>)>,
    /// Quota operations as `(key, optional value)` pairs.
    pub ops: Vec<(&'a str, Option<f64>)>,
}
628
629#[non_exhaustive]
633#[derive(Debug, Clone, Default)]
634pub struct AclFilter {
635 pub resource_type: AclResourceType,
637 pub resource_name: Option<String>,
639 pub pattern_type: AclPatternType,
641 pub principal: Option<String>,
643 pub host: Option<String>,
645 pub operation: AclOperation,
647 pub permission_type: AclPermissionType,
649}
650
651impl AclFilter {
652 pub fn all() -> Self {
654 Self::default()
655 }
656
657 pub fn for_resource(resource_type: AclResourceType, resource_name: impl Into<String>) -> Self {
659 Self {
660 resource_type,
661 resource_name: Some(resource_name.into()),
662 ..Default::default()
663 }
664 }
665
666 pub fn for_principal(principal: impl Into<String>) -> Self {
668 Self {
669 principal: Some(principal.into()),
670 ..Default::default()
671 }
672 }
673
674 pub fn resource_type(mut self, resource_type: AclResourceType) -> Self {
676 self.resource_type = resource_type;
677 self
678 }
679
680 pub fn resource_name(mut self, name: impl Into<String>) -> Self {
682 self.resource_name = Some(name.into());
683 self
684 }
685
686 pub fn pattern_type(mut self, pattern_type: AclPatternType) -> Self {
688 self.pattern_type = pattern_type;
689 self
690 }
691
692 pub fn principal(mut self, principal: impl Into<String>) -> Self {
694 self.principal = Some(principal.into());
695 self
696 }
697
698 pub fn host(mut self, host: impl Into<String>) -> Self {
700 self.host = Some(host.into());
701 self
702 }
703
704 pub fn operation(mut self, operation: AclOperation) -> Self {
706 self.operation = operation;
707 self
708 }
709
710 pub fn permission_type(mut self, permission_type: AclPermissionType) -> Self {
712 self.permission_type = permission_type;
713 self
714 }
715}
716
717#[derive(Debug, Clone)]
721pub struct AdminConfig {
722 pub(crate) bootstrap_servers: String,
724 pub(crate) client_id: String,
726 pub(crate) request_timeout: Duration,
728 pub(crate) metadata_recovery_strategy: MetadataRecoveryStrategy,
730 pub(crate) metadata_recovery_rebootstrap_trigger: Duration,
734 pub(crate) auth: Option<AuthConfig>,
736 #[cfg(feature = "socks5")]
738 pub(crate) proxy: Option<crate::network::ProxyConfig>,
739}
740
741impl Default for AdminConfig {
742 fn default() -> Self {
743 Self {
744 bootstrap_servers: String::new(),
745 client_id: "krafka-admin".to_string(),
746 request_timeout: Duration::from_secs(30),
747 metadata_recovery_strategy: MetadataRecoveryStrategy::Rebootstrap,
748 metadata_recovery_rebootstrap_trigger: Duration::from_secs(300),
749 auth: None,
750 #[cfg(feature = "socks5")]
751 proxy: None,
752 }
753 }
754}
755
756impl AdminConfig {
757 pub fn builder() -> AdminConfigBuilder {
759 AdminConfigBuilder::default()
760 }
761
762 #[inline]
764 pub fn bootstrap_servers(&self) -> &str {
765 &self.bootstrap_servers
766 }
767
768 #[inline]
770 pub fn client_id(&self) -> &str {
771 &self.client_id
772 }
773
774 #[inline]
776 pub fn request_timeout(&self) -> Duration {
777 self.request_timeout
778 }
779
780 #[inline]
782 pub fn metadata_recovery_strategy(&self) -> MetadataRecoveryStrategy {
783 self.metadata_recovery_strategy
784 }
785
786 #[inline]
788 pub fn metadata_recovery_rebootstrap_trigger(&self) -> Duration {
789 self.metadata_recovery_rebootstrap_trigger
790 }
791
792 #[inline]
794 pub fn auth(&self) -> Option<&AuthConfig> {
795 self.auth.as_ref()
796 }
797
798 #[cfg(feature = "socks5")]
800 #[inline]
801 pub fn proxy(&self) -> Option<&crate::network::ProxyConfig> {
802 self.proxy.as_ref()
803 }
804}
805
806#[must_use = "builders do nothing until .build() is called"]
808#[derive(Debug, Default)]
809pub struct AdminConfigBuilder {
810 config: AdminConfig,
811}
812
813impl AdminConfigBuilder {
814 pub fn bootstrap_servers(mut self, servers: impl Into<String>) -> Self {
816 self.config.bootstrap_servers = servers.into();
817 self
818 }
819
820 pub fn client_id(mut self, id: impl Into<String>) -> Self {
822 self.config.client_id = id.into();
823 self
824 }
825
826 pub fn request_timeout(mut self, timeout: Duration) -> Self {
828 self.config.request_timeout = timeout;
829 self
830 }
831
832 pub fn metadata_recovery_strategy(mut self, strategy: MetadataRecoveryStrategy) -> Self {
834 self.config.metadata_recovery_strategy = strategy;
835 self
836 }
837
838 pub fn metadata_recovery_rebootstrap_trigger(mut self, duration: Duration) -> Self {
842 self.config.metadata_recovery_rebootstrap_trigger = duration;
843 self
844 }
845
846 pub fn auth(mut self, auth: AuthConfig) -> Self {
848 self.config.auth = Some(auth);
849 self
850 }
851
852 #[cfg(feature = "socks5")]
854 pub fn proxy(mut self, proxy: crate::network::ProxyConfig) -> Self {
855 self.config.proxy = Some(proxy);
856 self
857 }
858
859 pub fn build(self) -> AdminConfig {
861 self.config
862 }
863}
864
/// Per-group outcome of a DeleteGroups request.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteGroupResult {
    /// Group id.
    pub group_id: String,
    /// Error description, if any.
    pub error: Option<String>,
}
874
875#[non_exhaustive]
877#[derive(Debug, Clone)]
878pub struct DescribeClusterResult {
879 pub cluster_id: String,
881 pub controller_id: i32,
883 pub brokers: Vec<DescribeClusterBrokerInfo>,
885 pub cluster_authorized_operations: i32,
887}
888
/// One broker entry from a DescribeCluster response.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeClusterBrokerInfo {
    /// Broker id.
    pub broker_id: i32,
    /// Broker host.
    pub host: String,
    /// Broker port.
    pub port: i32,
    /// Rack, when reported.
    pub rack: Option<String>,
}
902
903pub struct AdminClient {
905 config: AdminConfig,
907 metadata: Arc<ClusterMetadata>,
909 pool: Arc<ConnectionPool>,
911 closed: std::sync::atomic::AtomicBool,
913}
914
915impl Drop for AdminClient {
916 fn drop(&mut self) {
917 if !self.closed.load(std::sync::atomic::Ordering::SeqCst) && !std::thread::panicking() {
921 warn!(
922 "AdminClient dropped without close(); in-flight RPCs may fail abruptly. \
923 Call `AdminClient::close()` before drop."
924 );
925 }
926 }
927}
928
929impl AdminClient {
930 pub fn builder() -> AdminClientBuilder {
932 AdminClientBuilder::default()
933 }
934
935 #[inline]
941 fn check_not_closed(&self) -> Result<()> {
942 if self.is_closed() {
943 return Err(KrafkaError::invalid_state("AdminClient is closed"));
944 }
945 Ok(())
946 }
947
948 async fn get_any_broker_connection(&self) -> Result<Arc<BrokerConnection>> {
954 self.check_not_closed()?;
955 let brokers = self.metadata.brokers();
956 if brokers.is_empty() {
957 return Err(KrafkaError::broker(
958 crate::error::ErrorCode::UnknownServerError,
959 "no brokers available",
960 ));
961 }
962 let broker = &brokers[0];
963 self.pool
964 .get_connection_by_id(broker.id, broker.address())
965 .await
966 }
967
968 pub async fn create_topics(
970 &self,
971 topics: Vec<NewTopic>,
972 timeout: Duration,
973 ) -> Result<Vec<CreateTopicResult>> {
974 let conn = self.get_any_broker_connection().await?;
975
976 let request = CreateTopicsRequest {
978 topics: topics
979 .iter()
980 .map(|t| CreatableTopic {
981 name: t.name.clone(),
982 num_partitions: t.num_partitions,
983 replication_factor: t.replication_factor,
984 assignments: Vec::new(),
985 configs: t
986 .configs
987 .iter()
988 .map(|(k, v)| CreatableTopicConfig {
989 name: k.clone(),
990 value: Some(v.clone()),
991 })
992 .collect(),
993 })
994 .collect(),
995 timeout_ms: crate::util::duration_to_millis_i32(timeout),
996 validate_only: false,
997 };
998
999 let version = conn
1001 .negotiate_api_version(
1002 ApiKey::CreateTopics,
1003 versions::CREATE_TOPICS_MAX,
1004 versions::CREATE_TOPICS_MIN,
1005 )
1006 .await
1007 .ok_or_else(|| {
1008 KrafkaError::protocol("no mutually supported CreateTopics API version")
1009 })?;
1010
1011 let response_bytes = conn
1012 .send_request(ApiKey::CreateTopics, version, |buf| {
1013 request.encode_versioned(version, buf)
1014 })
1015 .await?;
1016
1017 let mut buf = response_bytes;
1019 let response = CreateTopicsResponse::decode_versioned(version, &mut buf)?;
1020
1021 let results = response
1023 .topics
1024 .into_iter()
1025 .map(|t| CreateTopicResult {
1026 name: t.name,
1027 error: if t.error_code.is_ok() {
1028 None
1029 } else {
1030 Some(
1031 t.error_message
1032 .unwrap_or_else(|| format!("{:?}", t.error_code)),
1033 )
1034 },
1035 })
1036 .collect();
1037
1038 info!("Created {} topics", topics.len());
1039 Ok(results)
1040 }
1041
1042 pub async fn delete_topics(
1044 &self,
1045 topics: Vec<String>,
1046 timeout: Duration,
1047 ) -> Result<Vec<DeleteTopicResult>> {
1048 validate_topic_names(topics.iter().map(String::as_str))?;
1051 let conn = self.get_any_broker_connection().await?;
1052
1053 let delete_topic_states: Vec<DeleteTopicState> = topics
1056 .iter()
1057 .map(|name| DeleteTopicState {
1058 name: Some(name.clone()),
1059 topic_id: [0u8; 16],
1061 })
1062 .collect();
1063 let topic_count = topics.len();
1064 let request = DeleteTopicsRequest {
1065 topic_names: topics,
1066 topics: delete_topic_states,
1067 timeout_ms: crate::util::duration_to_millis_i32(timeout),
1068 };
1069
1070 let version = conn
1072 .negotiate_api_version(
1073 ApiKey::DeleteTopics,
1074 versions::DELETE_TOPICS_MAX,
1075 versions::DELETE_TOPICS_MIN,
1076 )
1077 .await
1078 .ok_or_else(|| {
1079 KrafkaError::protocol("no mutually supported DeleteTopics API version")
1080 })?;
1081
1082 let response_bytes = conn
1083 .send_request(ApiKey::DeleteTopics, version, |buf| {
1084 request.encode_versioned(version, buf)
1085 })
1086 .await?;
1087
1088 let mut buf = response_bytes;
1090 let response = DeleteTopicsResponse::decode_versioned(version, &mut buf)?;
1091
1092 let results = response
1094 .responses
1095 .into_iter()
1096 .map(|r| DeleteTopicResult {
1097 name: r.name.unwrap_or_default(),
1098 error: if r.error_code.is_ok() {
1099 None
1100 } else {
1101 Some(
1102 r.error_message
1103 .unwrap_or_else(|| format!("{:?}", r.error_code)),
1104 )
1105 },
1106 })
1107 .collect();
1108
1109 info!("Deleted {} topics", topic_count);
1110 Ok(results)
1111 }
1112
1113 pub async fn create_partitions(
1117 &self,
1118 topic: impl Into<String>,
1119 new_total_count: i32,
1120 timeout: Duration,
1121 ) -> Result<CreatePartitionsResult> {
1122 let topic_name = topic.into();
1123 validate_topic_name(&topic_name)?;
1126 let conn = self.get_any_broker_connection().await?;
1127
1128 let request = CreatePartitionsRequest {
1130 topics: vec![CreatePartitionsTopic {
1131 name: topic_name.clone(),
1132 count: new_total_count,
1133 assignments: None,
1134 }],
1135 timeout_ms: crate::util::duration_to_millis_i32(timeout),
1136 validate_only: false,
1137 };
1138
1139 let version = conn
1141 .negotiate_api_version(
1142 ApiKey::CreatePartitions,
1143 versions::CREATE_PARTITIONS_MAX,
1144 versions::CREATE_PARTITIONS_MIN,
1145 )
1146 .await
1147 .ok_or_else(|| {
1148 KrafkaError::protocol("no mutually supported CreatePartitions API version")
1149 })?;
1150
1151 let response_bytes = conn
1152 .send_request(ApiKey::CreatePartitions, version, |buf| {
1153 request.encode_versioned(version, buf)
1154 })
1155 .await?;
1156
1157 let mut buf = response_bytes;
1159 let response = CreatePartitionsResponse::decode_versioned(version, &mut buf)?;
1160
1161 let result = response
1162 .results
1163 .into_iter()
1164 .next()
1165 .map(|r| CreatePartitionsResult {
1166 topic: r.name,
1167 error: if r.error_code.is_ok() {
1168 None
1169 } else {
1170 Some(
1171 r.error_message
1172 .unwrap_or_else(|| format!("{:?}", r.error_code)),
1173 )
1174 },
1175 })
1176 .unwrap_or(CreatePartitionsResult {
1177 topic: topic_name.clone(),
1178 error: Some("no response received".to_string()),
1179 });
1180
1181 if result.error.is_none() {
1182 info!(
1183 "Increased partitions for topic {} to {}",
1184 topic_name, new_total_count
1185 );
1186 }
1187 Ok(result)
1188 }
1189
1190 pub async fn describe_configs(
1196 &self,
1197 request: DescribeConfigsRequest,
1198 ) -> Result<Vec<ConfigEntry>> {
1199 let conn = self.get_any_broker_connection().await?;
1200
1201 let version = conn
1202 .negotiate_api_version(
1203 ApiKey::DescribeConfigs,
1204 versions::DESCRIBE_CONFIGS_MAX,
1205 versions::DESCRIBE_CONFIGS_MIN,
1206 )
1207 .await
1208 .ok_or_else(|| {
1209 KrafkaError::protocol("no mutually supported DescribeConfigs API version")
1210 })?;
1211
1212 let response_bytes = conn
1213 .send_request(ApiKey::DescribeConfigs, version, |buf| {
1214 request.encode_versioned(version, buf)
1215 })
1216 .await?;
1217
1218 let mut buf = response_bytes;
1219 let response = DescribeConfigsResponse::decode_versioned(version, &mut buf)?;
1220
1221 let entries = response
1222 .results
1223 .into_iter()
1224 .flat_map(|r| {
1225 if !r.error_code.is_ok() {
1226 return Vec::new();
1227 }
1228 r.configs
1229 .into_iter()
1230 .map(|c| ConfigEntry {
1231 name: c.name,
1232 value: c.value,
1233 read_only: c.read_only,
1234 is_default: c.is_default,
1235 is_sensitive: c.is_sensitive,
1236 config_source: c.config_source,
1237 synonyms: c
1238 .synonyms
1239 .into_iter()
1240 .map(|s| ConfigSynonymEntry {
1241 name: s.name,
1242 value: s.value,
1243 source: s.source,
1244 })
1245 .collect(),
1246 config_type: c.config_type,
1247 documentation: c.documentation,
1248 })
1249 .collect()
1250 })
1251 .collect();
1252
1253 Ok(entries)
1254 }
1255
1256 pub async fn alter_topic_config(
1262 &self,
1263 topic: &str,
1264 configs: HashMap<String, String>,
1265 ) -> Result<AlterConfigResult> {
1266 let conn = self.get_any_broker_connection().await?;
1267
1268 let request = IncrementalAlterConfigsRequest::for_topic(
1269 topic,
1270 configs
1271 .into_iter()
1272 .map(|(name, value)| AlterableConfig {
1273 name,
1274 config_operation: AlterConfigOp::Set,
1275 value: Some(value),
1276 })
1277 .collect(),
1278 );
1279
1280 let version = conn
1281 .negotiate_api_version(
1282 ApiKey::IncrementalAlterConfigs,
1283 versions::INCREMENTAL_ALTER_CONFIGS_MAX,
1284 versions::INCREMENTAL_ALTER_CONFIGS_MIN,
1285 )
1286 .await
1287 .ok_or_else(|| {
1288 KrafkaError::protocol("no mutually supported IncrementalAlterConfigs API version")
1289 })?;
1290
1291 let response_bytes = conn
1292 .send_request(ApiKey::IncrementalAlterConfigs, version, |buf| {
1293 request.encode_versioned(version, buf)
1294 })
1295 .await?;
1296
1297 let mut buf = response_bytes;
1298 let response = IncrementalAlterConfigsResponse::decode_versioned(version, &mut buf)?;
1299
1300 let result = response
1301 .results
1302 .into_iter()
1303 .next()
1304 .map(|r| AlterConfigResult {
1305 resource_name: r.resource_name,
1306 error: if r.error_code.is_ok() {
1307 None
1308 } else {
1309 Some(
1310 r.error_message
1311 .unwrap_or_else(|| format!("{:?}", r.error_code)),
1312 )
1313 },
1314 })
1315 .unwrap_or(AlterConfigResult {
1316 resource_name: topic.to_string(),
1317 error: Some("no response received".to_string()),
1318 });
1319
1320 if result.error.is_none() {
1321 info!("Altered config for topic {}", topic);
1322 }
1323 Ok(result)
1324 }
1325
1326 pub async fn list_topics(&self) -> Result<Vec<String>> {
1328 self.check_not_closed()?;
1329 self.metadata.refresh().await?;
1330 Ok(self.metadata.topics().into_iter().map(|t| t.name).collect())
1331 }
1332
1333 pub async fn describe_topics(&self, topics: &[String]) -> Result<Vec<TopicInfo>> {
1335 self.check_not_closed()?;
1336 self.metadata.refresh().await?;
1337 let all_topics = self.metadata.topics();
1338
1339 let mut result = Vec::new();
1340 for topic_name in topics {
1341 if let Some(info) = all_topics.iter().find(|t| &t.name == topic_name) {
1342 result.push(info.clone());
1343 }
1344 }
1345 Ok(result)
1346 }
1347
1348 pub async fn describe_cluster(&self) -> Result<DescribeClusterResult> {
1353 self.check_not_closed()?;
1354 let conn = self.get_any_broker_connection().await?;
1355
1356 let request = DescribeClusterRequest::default();
1357 let version = conn
1358 .negotiate_api_version(
1359 ApiKey::DescribeCluster,
1360 versions::DESCRIBE_CLUSTER_MAX,
1361 versions::DESCRIBE_CLUSTER_MIN,
1362 )
1363 .await
1364 .ok_or_else(|| {
1365 KrafkaError::protocol("no mutually supported DescribeCluster API version")
1366 })?;
1367
1368 let response_bytes = conn
1369 .send_request(ApiKey::DescribeCluster, version, |buf| {
1370 request.encode_versioned(version, buf)
1371 })
1372 .await?;
1373
1374 let mut buf = response_bytes;
1375 let response = DescribeClusterResponse::decode_versioned(version, &mut buf)?;
1376
1377 if !response.error_code.is_ok() {
1378 let msg = response
1379 .error_message
1380 .unwrap_or_else(|| format!("{:?}", response.error_code));
1381 return Err(KrafkaError::protocol(msg));
1382 }
1383
1384 Ok(DescribeClusterResult {
1385 cluster_id: response.cluster_id,
1386 controller_id: response.controller_id,
1387 brokers: response
1388 .brokers
1389 .into_iter()
1390 .map(|b| DescribeClusterBrokerInfo {
1391 broker_id: b.broker_id,
1392 host: b.host,
1393 port: b.port,
1394 rack: b.rack,
1395 })
1396 .collect(),
1397 cluster_authorized_operations: response.cluster_authorized_operations,
1398 })
1399 }
1400
1401 pub async fn partition_count(&self, topic: &str) -> Result<Option<usize>> {
1403 self.check_not_closed()?;
1404 self.metadata.refresh().await?;
1405 Ok(self.metadata.partition_count(topic))
1406 }
1407
1408 #[inline]
1410 pub fn client_id(&self) -> &str {
1411 &self.config.client_id
1412 }
1413
1414 #[inline]
1416 pub fn request_timeout(&self) -> Duration {
1417 self.config.request_timeout
1418 }
1419
1420 pub async fn describe_acls(&self, filter: AclFilter) -> Result<DescribeAclsResult> {
1429 self.check_not_closed()?;
1430 let conn = self.get_any_broker_connection().await?;
1431
1432 let request = DescribeAclsRequest {
1433 resource_type: filter.resource_type,
1434 resource_name: filter.resource_name,
1435 pattern_type: filter.pattern_type,
1436 principal: filter.principal,
1437 host: filter.host,
1438 operation: filter.operation,
1439 permission_type: filter.permission_type,
1440 };
1441
1442 let version = conn
1443 .negotiate_api_version(
1444 ApiKey::DescribeAcls,
1445 versions::DESCRIBE_ACLS_MAX,
1446 versions::DESCRIBE_ACLS_MIN,
1447 )
1448 .await
1449 .ok_or_else(|| {
1450 KrafkaError::protocol("no mutually supported DescribeAcls API version")
1451 })?;
1452
1453 let response_bytes = conn
1454 .send_request(ApiKey::DescribeAcls, version, |buf| {
1455 request.encode_versioned(version, buf)
1456 })
1457 .await?;
1458
1459 let mut buf = response_bytes;
1460 let response = DescribeAclsResponse::decode_versioned(version, &mut buf)?;
1461
1462 let bindings = response
1463 .resources
1464 .into_iter()
1465 .flat_map(|res| {
1466 res.acls.into_iter().map(move |acl| AclBinding {
1467 resource_type: res.resource_type,
1468 resource_name: res.resource_name.clone(),
1469 pattern_type: res.pattern_type,
1470 principal: acl.principal,
1471 host: acl.host,
1472 operation: acl.operation,
1473 permission_type: acl.permission_type,
1474 })
1475 })
1476 .collect();
1477
1478 Ok(DescribeAclsResult {
1479 error: if response.error_code.is_ok() {
1480 None
1481 } else {
1482 Some(
1483 response
1484 .error_message
1485 .unwrap_or_else(|| format!("{:?}", response.error_code)),
1486 )
1487 },
1488 bindings,
1489 })
1490 }
1491
1492 pub async fn create_acls(&self, acls: Vec<AclBinding>) -> Result<CreateAclsResult> {
1503 let conn = self.get_any_broker_connection().await?;
1504
1505 let request = CreateAclsRequest {
1506 creations: acls.clone(),
1507 };
1508
1509 let version = conn
1510 .negotiate_api_version(
1511 ApiKey::CreateAcls,
1512 versions::CREATE_ACLS_MAX,
1513 versions::CREATE_ACLS_MIN,
1514 )
1515 .await
1516 .ok_or_else(|| KrafkaError::protocol("no mutually supported CreateAcls API version"))?;
1517
1518 let response_bytes = conn
1519 .send_request(ApiKey::CreateAcls, version, |buf| {
1520 request.encode_versioned(version, buf)
1521 })
1522 .await?;
1523
1524 let mut buf = response_bytes;
1525 let response = CreateAclsResponse::decode_versioned(version, &mut buf)?;
1526
1527 let results = response
1528 .results
1529 .into_iter()
1530 .map(|r| CreateAclResult {
1531 error: if r.error_code.is_ok() {
1532 None
1533 } else {
1534 Some(
1535 r.error_message
1536 .unwrap_or_else(|| format!("{:?}", r.error_code)),
1537 )
1538 },
1539 })
1540 .collect();
1541
1542 info!("Created {} ACLs", acls.len());
1543 Ok(CreateAclsResult { results })
1544 }
1545
1546 pub async fn delete_acls(&self, filters: Vec<AclBindingFilter>) -> Result<DeleteAclsResult> {
1566 let conn = self.get_any_broker_connection().await?;
1567
1568 let request = DeleteAclsRequest {
1569 filters: filters.clone(),
1570 };
1571
1572 let version = conn
1573 .negotiate_api_version(
1574 ApiKey::DeleteAcls,
1575 versions::DELETE_ACLS_MAX,
1576 versions::DELETE_ACLS_MIN,
1577 )
1578 .await
1579 .ok_or_else(|| KrafkaError::protocol("no mutually supported DeleteAcls API version"))?;
1580
1581 let response_bytes = conn
1582 .send_request(ApiKey::DeleteAcls, version, |buf| {
1583 request.encode_versioned(version, buf)
1584 })
1585 .await?;
1586
1587 let mut buf = response_bytes;
1588 let response = DeleteAclsResponse::decode_versioned(version, &mut buf)?;
1589
1590 let filter_results = response
1591 .filter_results
1592 .into_iter()
1593 .map(|fr| DeleteAclFilterResult {
1594 error: if fr.error_code.is_ok() {
1595 None
1596 } else {
1597 Some(
1598 fr.error_message
1599 .unwrap_or_else(|| format!("{:?}", fr.error_code)),
1600 )
1601 },
1602 deleted_count: fr.matching_acls.len(),
1603 })
1604 .collect();
1605
1606 info!("Deleted ACLs with {} filters", filters.len());
1607 Ok(DeleteAclsResult { filter_results })
1608 }
1609
1610 pub async fn describe_consumer_groups(
1631 &self,
1632 group_ids: Vec<String>,
1633 ) -> Result<Vec<ConsumerGroupDescription>> {
1634 self.check_not_closed()?;
1635 let brokers = self.metadata.brokers();
1636 if brokers.is_empty() {
1637 return Err(KrafkaError::broker(
1638 crate::error::ErrorCode::UnknownServerError,
1639 "no brokers available",
1640 ));
1641 }
1642
1643 let mut coordinator_groups: HashMap<i32, Vec<String>> = HashMap::new();
1645 let any_broker = &brokers[0];
1646 let any_conn = self
1647 .pool
1648 .get_connection_by_id(any_broker.id, any_broker.address())
1649 .await?;
1650
1651 for group_id in &group_ids {
1652 let coord_request = FindCoordinatorRequest::for_group(group_id);
1653 let coord_version = any_conn
1654 .negotiate_api_version(
1655 ApiKey::FindCoordinator,
1656 versions::FIND_COORDINATOR_MAX,
1657 versions::FIND_COORDINATOR_MIN,
1658 )
1659 .await
1660 .ok_or_else(|| {
1661 KrafkaError::protocol("no mutually supported FindCoordinator API version")
1662 })?;
1663
1664 let coord_response_bytes = any_conn
1665 .send_request(ApiKey::FindCoordinator, coord_version, |buf| {
1666 coord_request.encode_versioned(coord_version, buf)
1667 })
1668 .await?;
1669 let mut coord_buf = coord_response_bytes;
1670 let coord_response =
1671 FindCoordinatorResponse::decode_versioned(coord_version, &mut coord_buf)?;
1672
1673 if coord_response.error_code.is_ok() {
1674 coordinator_groups
1675 .entry(coord_response.node_id)
1676 .or_default()
1677 .push(group_id.clone());
1678 } else {
1679 warn!(
1680 "FindCoordinator failed for group '{}': {:?}, falling back to broker {}",
1681 group_id, coord_response.error_code, any_broker.id
1682 );
1683 coordinator_groups
1684 .entry(any_broker.id)
1685 .or_default()
1686 .push(group_id.clone());
1687 }
1688 }
1689
1690 let mut all_results = Vec::new();
1691
1692 for (broker_id, groups) in &coordinator_groups {
1693 let broker = brokers
1694 .iter()
1695 .find(|b| b.id == *broker_id)
1696 .unwrap_or(any_broker);
1697 let conn = self
1698 .pool
1699 .get_connection_by_id(broker.id, broker.address())
1700 .await?;
1701
1702 let kip848_version = conn
1704 .negotiate_api_version(
1705 ApiKey::ConsumerGroupDescribe,
1706 versions::CONSUMER_GROUP_DESCRIBE_MAX,
1707 versions::CONSUMER_GROUP_DESCRIBE_MIN,
1708 )
1709 .await;
1710
1711 let mut classic_fallback: Vec<String> = Vec::new();
1712 let mut maybe_classic: Vec<(String, ConsumerGroupDescription)> = Vec::new();
1713
1714 if let Some(version) = kip848_version {
1715 let request = ConsumerGroupDescribeRequest::new(groups.clone());
1716 let response_bytes = conn
1717 .send_request(ApiKey::ConsumerGroupDescribe, version, |buf| {
1718 request.encode_versioned(version, buf)
1719 })
1720 .await?;
1721
1722 let mut buf = response_bytes;
1723 let response = ConsumerGroupDescribeResponse::decode_versioned(version, &mut buf)?;
1724
1725 for g in response.groups {
1736 debug!(
1737 "ConsumerGroupDescribe for '{}': error={:?}, state='{}', members={}",
1738 g.group_id,
1739 g.error_code,
1740 g.group_state,
1741 g.members.len()
1742 );
1743 if g.error_code == crate::error::ErrorCode::GroupIdNotFound
1744 || g.error_code == crate::error::ErrorCode::UnsupportedVersion
1745 {
1746 debug!(
1748 "ConsumerGroupDescribe for '{}' returned {:?}, \
1749 will retry with DescribeGroups (Key 15)",
1750 g.group_id, g.error_code
1751 );
1752 classic_fallback.push(g.group_id);
1753 continue;
1754 }
1755
1756 let members_empty = g.members.is_empty() && g.error_code.is_ok();
1757 let group_id_clone = g.group_id.clone();
1758
1759 let desc = ConsumerGroupDescription {
1760 group_id: g.group_id,
1761 group_type: GroupType::Consumer,
1762 state: g.group_state,
1763 protocol_type: None,
1764 assignor: Some(g.assignor_name),
1765 group_epoch: Some(g.group_epoch),
1766 assignment_epoch: Some(g.assignment_epoch),
1767 members: g
1768 .members
1769 .into_iter()
1770 .map(|m| ConsumerGroupMember {
1771 member_id: m.member_id,
1772 instance_id: m.instance_id,
1773 rack_id: m.rack_id,
1774 member_epoch: Some(m.member_epoch),
1775 client_id: m.client_id,
1776 client_host: m.client_host,
1777 subscribed_topic_names: Some(m.subscribed_topic_names),
1778 subscribed_topic_regex: m.subscribed_topic_regex,
1779 assignment: Some(
1780 m.assignment
1781 .topic_partitions
1782 .into_iter()
1783 .map(|tp| TopicPartitionAssignment {
1784 topic_id: tp.topic_id,
1785 topic_name: tp.topic_name,
1786 partitions: tp.partitions,
1787 })
1788 .collect(),
1789 ),
1790 target_assignment: Some(
1791 m.target_assignment
1792 .topic_partitions
1793 .into_iter()
1794 .map(|tp| TopicPartitionAssignment {
1795 topic_id: tp.topic_id,
1796 topic_name: tp.topic_name,
1797 partitions: tp.partitions,
1798 })
1799 .collect(),
1800 ),
1801 member_type: Some(m.member_type),
1802 })
1803 .collect(),
1804 authorized_operations: Some(g.authorized_operations),
1805 error: if g.error_code.is_ok() {
1806 None
1807 } else {
1808 let msg = g
1809 .error_message
1810 .unwrap_or_else(|| format!("{:?}", g.error_code));
1811 Some(msg)
1812 },
1813 };
1814
1815 if members_empty {
1821 maybe_classic.push((group_id_clone.clone(), desc));
1822 classic_fallback.push(group_id_clone);
1823 } else {
1824 all_results.push(desc);
1825 }
1826 }
1827 } else {
1828 classic_fallback = groups.clone();
1830 }
1831
1832 if !classic_fallback.is_empty() {
1834 let request = DescribeGroupsRequest {
1835 groups: classic_fallback,
1836 include_authorized_operations: false,
1837 };
1838
1839 let version = conn
1840 .negotiate_api_version(
1841 ApiKey::DescribeGroups,
1842 versions::DESCRIBE_GROUPS_MAX,
1843 versions::DESCRIBE_GROUPS_MIN,
1844 )
1845 .await
1846 .ok_or_else(|| {
1847 KrafkaError::protocol("no mutually supported DescribeGroups API version")
1848 })?;
1849
1850 let response_bytes = conn
1851 .send_request(ApiKey::DescribeGroups, version, |buf| {
1852 request.encode_versioned(version, buf)
1853 })
1854 .await?;
1855
1856 let mut buf = response_bytes;
1857 let response = DescribeGroupsResponse::decode_versioned(version, &mut buf)?;
1858
1859 for g in response.groups {
1860 debug!(
1861 "DescribeGroups (classic) for '{}': error={:?}, state='{}', members={}",
1862 g.group_id,
1863 g.error_code,
1864 g.group_state,
1865 g.members.len()
1866 );
1867 let classic_desc = ConsumerGroupDescription {
1868 group_id: g.group_id,
1869 group_type: GroupType::Classic,
1870 state: g.group_state,
1871 protocol_type: Some(g.protocol_type),
1872 assignor: Some(g.protocol_data),
1873 group_epoch: None,
1874 assignment_epoch: None,
1875 members: g
1876 .members
1877 .into_iter()
1878 .map(|m| ConsumerGroupMember {
1879 member_id: m.member_id,
1880 instance_id: m.group_instance_id,
1881 rack_id: None,
1882 member_epoch: None,
1883 client_id: m.client_id,
1884 client_host: m.client_host,
1885 subscribed_topic_names: None,
1886 subscribed_topic_regex: None,
1887 assignment: None,
1888 target_assignment: None,
1889 member_type: None,
1890 })
1891 .collect(),
1892 authorized_operations: None,
1893 error: if g.error_code.is_ok() {
1894 None
1895 } else {
1896 Some(format!("{:?}", g.error_code))
1897 },
1898 };
1899
1900 if let Some(idx) = maybe_classic
1905 .iter()
1906 .position(|(id, _)| *id == classic_desc.group_id)
1907 {
1908 let (_, consumer_desc) = maybe_classic.swap_remove(idx);
1909 if classic_desc.members.is_empty() {
1910 all_results.push(consumer_desc);
1912 } else {
1913 all_results.push(classic_desc);
1914 }
1915 } else {
1916 all_results.push(classic_desc);
1917 }
1918 }
1919 }
1920
1921 for (_, desc) in maybe_classic {
1924 all_results.push(desc);
1925 }
1926 }
1927
1928 info!("Described {} consumer groups", all_results.len());
1929 Ok(all_results)
1930 }
1931
1932 pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
1944 self.check_not_closed()?;
1945 let brokers = self.metadata.brokers();
1946 if brokers.is_empty() {
1947 return Err(KrafkaError::broker(
1948 crate::error::ErrorCode::UnknownServerError,
1949 "no brokers available",
1950 ));
1951 }
1952
1953 let mut all_groups = Vec::new();
1955 let mut seen_ids = HashSet::new();
1956 let mut broker_failures = 0usize;
1957 let broker_count = brokers.len();
1958
1959 for broker in &brokers {
1960 let conn = match self
1961 .pool
1962 .get_connection_by_id(broker.id, broker.address())
1963 .await
1964 {
1965 Ok(c) => c,
1966 Err(e) => {
1967 warn!(
1968 "Failed to connect to broker {} for ListGroups, skipping: {}",
1969 broker.id, e
1970 );
1971 broker_failures += 1;
1972 continue;
1973 }
1974 };
1975
1976 let request = ListGroupsRequest {
1977 states_filter: Vec::new(),
1978 types_filter: Vec::new(),
1979 };
1980
1981 let version = match conn
1982 .negotiate_api_version(
1983 ApiKey::ListGroups,
1984 versions::LIST_GROUPS_MAX,
1985 versions::LIST_GROUPS_MIN,
1986 )
1987 .await
1988 {
1989 Some(v) => v,
1990 None => {
1991 warn!(
1992 "No mutually supported ListGroups API version for broker {}, skipping",
1993 broker.id
1994 );
1995 broker_failures += 1;
1996 continue;
1997 }
1998 };
1999
2000 let response_bytes = match conn
2001 .send_request(ApiKey::ListGroups, version, |buf| {
2002 request.encode_versioned(version, buf)
2003 })
2004 .await
2005 {
2006 Ok(r) => r,
2007 Err(e) => {
2008 warn!("ListGroups RPC failed on broker {}: {}", broker.id, e);
2009 broker_failures += 1;
2010 continue;
2011 }
2012 };
2013
2014 let mut buf = response_bytes;
2015 let response = match ListGroupsResponse::decode_versioned(version, &mut buf) {
2016 Ok(r) => r,
2017 Err(e) => {
2018 warn!("ListGroups decode failed on broker {}: {}", broker.id, e);
2019 broker_failures += 1;
2020 continue;
2021 }
2022 };
2023
2024 if !response.error_code.is_ok() {
2025 tracing::warn!(
2026 "ListGroups error on broker {}: {:?}",
2027 broker.id,
2028 response.error_code
2029 );
2030 broker_failures += 1;
2031 continue;
2032 }
2033
2034 for group in response.groups {
2035 if seen_ids.insert(group.group_id.clone()) {
2036 let group_type = group.group_type.map(|t| match t.as_str() {
2037 "classic" => GroupType::Classic,
2038 "consumer" => GroupType::Consumer,
2039 other => GroupType::Unknown(other.to_string()),
2040 });
2041 all_groups.push(ConsumerGroupListing {
2042 group_id: group.group_id,
2043 protocol_type: group.protocol_type,
2044 group_type,
2045 });
2046 }
2047 }
2048 }
2049
2050 if broker_failures == broker_count {
2051 return Err(KrafkaError::invalid_state(
2052 "list_consumer_groups failed: all brokers returned errors",
2053 ));
2054 }
2055
2056 if broker_failures > 0 {
2057 warn!(
2058 "list_consumer_groups: {broker_failures}/{broker_count} brokers failed; \
2059 results may be incomplete"
2060 );
2061 }
2062
2063 info!("Listed {} consumer groups", all_groups.len());
2064 Ok(all_groups)
2065 }
2066
2067 pub async fn delete_records(
2084 &self,
2085 offsets: HashMap<(String, i32), i64>,
2086 timeout: Duration,
2087 ) -> Result<Vec<DeleteRecordResult>> {
2088 self.check_not_closed()?;
2089 for (topic, _) in offsets.keys() {
2091 validate_topic_name(topic)?;
2092 }
2093
2094 for attempt in 0u8..2 {
2095 if attempt == 1 {
2096 let topics: Vec<&str> = offsets.keys().map(|(t, _)| t.as_str()).collect();
2097 let _ = self.metadata.refresh_for_topics(Some(&topics)).await;
2098 }
2099
2100 let brokers = self.metadata.brokers();
2101 if brokers.is_empty() {
2102 return Err(KrafkaError::broker(
2103 crate::error::ErrorCode::UnknownServerError,
2104 "no brokers available",
2105 ));
2106 }
2107
2108 let mut leader_offsets: HashMap<i32, HashMap<String, Vec<DeleteRecordsPartition>>> =
2110 HashMap::new();
2111 let fallback_broker_id = brokers[0].id;
2112
2113 for ((topic, partition), offset) in &offsets {
2114 let leader_id = self
2115 .metadata
2116 .leader(topic, *partition)
2117 .unwrap_or(fallback_broker_id);
2118 leader_offsets
2119 .entry(leader_id)
2120 .or_default()
2121 .entry(topic.clone())
2122 .or_default()
2123 .push(DeleteRecordsPartition {
2124 partition_index: *partition,
2125 offset: *offset,
2126 });
2127 }
2128
2129 let mut results = Vec::new();
2130 let mut has_stale_leader = false;
2131
2132 for (broker_id, topics_map) in leader_offsets {
2133 let broker = brokers
2134 .iter()
2135 .find(|b| b.id == broker_id)
2136 .unwrap_or(&brokers[0]);
2137 let conn = self
2138 .pool
2139 .get_connection_by_id(broker.id, broker.address())
2140 .await?;
2141
2142 let request = DeleteRecordsRequest {
2143 topics: topics_map
2144 .into_iter()
2145 .map(|(name, partitions)| DeleteRecordsTopic { name, partitions })
2146 .collect(),
2147 timeout_ms: crate::util::duration_to_millis_i32(timeout),
2148 };
2149
2150 let version = conn
2151 .negotiate_api_version(
2152 ApiKey::DeleteRecords,
2153 versions::DELETE_RECORDS_MAX,
2154 versions::DELETE_RECORDS_MIN,
2155 )
2156 .await
2157 .ok_or_else(|| {
2158 KrafkaError::protocol("no mutually supported DeleteRecords API version")
2159 })?;
2160
2161 let response_bytes = conn
2162 .send_request(ApiKey::DeleteRecords, version, |buf| {
2163 request.encode_versioned(version, buf)
2164 })
2165 .await?;
2166
2167 let mut buf = response_bytes;
2168 let response = DeleteRecordsResponse::decode_versioned(version, &mut buf)?;
2169
2170 for topic in response.topics {
2171 let topic_name = topic.name;
2172 for partition in topic.partitions {
2173 if partition.error_code == crate::error::ErrorCode::NotLeaderForPartition {
2174 has_stale_leader = true;
2175 }
2176 results.push(DeleteRecordResult {
2177 topic: topic_name.clone(),
2178 partition: partition.partition_index,
2179 low_watermark: partition.low_watermark,
2180 error: if partition.error_code.is_ok() {
2181 None
2182 } else {
2183 Some(format!("{:?}", partition.error_code))
2184 },
2185 });
2186 }
2187 }
2188 }
2189
2190 if has_stale_leader && attempt == 0 {
2191 warn!(
2192 "NotLeaderForPartition in DeleteRecords response, retrying with refreshed metadata"
2193 );
2194 continue;
2195 }
2196
2197 info!("Deleted records from {} partition(s)", results.len());
2198 return Ok(results);
2199 }
2200 Err(KrafkaError::protocol(
2201 "DeleteRecords retry loop exhausted after metadata refresh",
2202 ))
2203 }
2204
2205 pub async fn offset_for_leader_epoch(
2225 &self,
2226 partitions: Vec<(String, i32, i32)>,
2227 ) -> Result<Vec<LeaderEpochResult>> {
2228 self.check_not_closed()?;
2229 for (topic, _, _) in &partitions {
2231 validate_topic_name(topic)?;
2232 }
2233
2234 for attempt in 0u8..2 {
2235 if attempt == 1 {
2236 let topics: Vec<&str> = partitions.iter().map(|(t, _, _)| t.as_str()).collect();
2237 let _ = self.metadata.refresh_for_topics(Some(&topics)).await;
2238 }
2239
2240 let brokers = self.metadata.brokers();
2241 if brokers.is_empty() {
2242 return Err(KrafkaError::broker(
2243 crate::error::ErrorCode::UnknownServerError,
2244 "no brokers available",
2245 ));
2246 }
2247
2248 let fallback_broker_id = brokers[0].id;
2250 let mut leader_partitions: HashMap<
2251 i32,
2252 HashMap<String, Vec<OffsetForLeaderEpochPartition>>,
2253 > = HashMap::new();
2254
2255 for (topic, partition, leader_epoch) in &partitions {
2256 let leader_id = self
2257 .metadata
2258 .leader(topic, *partition)
2259 .unwrap_or(fallback_broker_id);
2260 leader_partitions
2261 .entry(leader_id)
2262 .or_default()
2263 .entry(topic.clone())
2264 .or_default()
2265 .push(OffsetForLeaderEpochPartition {
2266 partition: *partition,
2267 current_leader_epoch: -1, leader_epoch: *leader_epoch,
2269 });
2270 }
2271
2272 let mut results = Vec::new();
2273 let mut has_stale_leader = false;
2274
2275 for (broker_id, topics_map) in leader_partitions {
2276 let broker = brokers
2277 .iter()
2278 .find(|b| b.id == broker_id)
2279 .unwrap_or(&brokers[0]);
2280 let conn = self
2281 .pool
2282 .get_connection_by_id(broker.id, broker.address())
2283 .await?;
2284
2285 let request = OffsetForLeaderEpochRequest {
2286 replica_id: -1, topics: topics_map
2288 .into_iter()
2289 .map(|(topic, partitions)| OffsetForLeaderEpochTopic { topic, partitions })
2290 .collect(),
2291 };
2292
2293 let version = conn
2294 .negotiate_api_version(
2295 ApiKey::OffsetForLeaderEpoch,
2296 versions::OFFSET_FOR_LEADER_EPOCH_MAX,
2297 versions::OFFSET_FOR_LEADER_EPOCH_MIN,
2298 )
2299 .await
2300 .ok_or_else(|| {
2301 KrafkaError::protocol(
2302 "no mutually supported OffsetForLeaderEpoch API version",
2303 )
2304 })?;
2305
2306 let response_bytes = conn
2307 .send_request(ApiKey::OffsetForLeaderEpoch, version, |buf| {
2308 request.encode_versioned(version, buf)
2309 })
2310 .await?;
2311
2312 let mut buf = response_bytes;
2313 let response = OffsetForLeaderEpochResponse::decode_versioned(version, &mut buf)?;
2314
2315 for topic in response.topics {
2316 let topic_name = topic.topic;
2317 for partition in topic.partitions {
2318 if partition.error_code == crate::error::ErrorCode::NotLeaderForPartition {
2319 has_stale_leader = true;
2320 }
2321 results.push(LeaderEpochResult {
2322 topic: topic_name.clone(),
2323 partition: partition.partition,
2324 leader_epoch: partition.leader_epoch,
2325 end_offset: partition.end_offset,
2326 error: if partition.error_code.is_ok() {
2327 None
2328 } else {
2329 Some(format!("{:?}", partition.error_code))
2330 },
2331 });
2332 }
2333 }
2334 }
2335
2336 if has_stale_leader && attempt == 0 {
2337 warn!(
2338 "NotLeaderForPartition in OffsetForLeaderEpoch response, retrying with refreshed metadata"
2339 );
2340 continue;
2341 }
2342
2343 info!(
2344 "Got leader epoch offsets for {} partition(s)",
2345 results.len()
2346 );
2347 return Ok(results);
2348 }
2349 Err(KrafkaError::protocol(
2350 "OffsetForLeaderEpoch retry loop exhausted after metadata refresh",
2351 ))
2352 }
2353
2354 pub async fn create_delegation_token(
2369 &self,
2370 renewers: &[(&str, &str)],
2371 max_lifetime: Option<Duration>,
2372 ) -> Result<CreateDelegationTokenResult> {
2373 let conn = self.get_any_broker_connection().await?;
2374
2375 let request = CreateDelegationTokenRequest {
2376 renewers: renewers
2377 .iter()
2378 .map(|(t, n)| CreatableRenewer {
2379 principal_type: t.to_string(),
2380 principal_name: n.to_string(),
2381 })
2382 .collect(),
2383 max_lifetime_ms: max_lifetime
2384 .map(crate::util::duration_to_millis_i64)
2385 .unwrap_or(-1),
2386 owner_principal_type: None,
2387 owner_principal_name: None,
2388 };
2389
2390 let version = conn
2391 .negotiate_api_version(
2392 ApiKey::CreateDelegationToken,
2393 versions::CREATE_DELEGATION_TOKEN_MAX,
2394 versions::CREATE_DELEGATION_TOKEN_MIN,
2395 )
2396 .await
2397 .ok_or_else(|| {
2398 KrafkaError::protocol("no mutually supported CreateDelegationToken API version")
2399 })?;
2400
2401 let response_bytes = conn
2402 .send_request(ApiKey::CreateDelegationToken, version, |buf| {
2403 request.encode_versioned(version, buf)
2404 })
2405 .await?;
2406
2407 let mut buf = response_bytes;
2408 let response = CreateDelegationTokenResponse::decode_versioned(version, &mut buf)?;
2409
2410 let result = if response.error_code.is_ok() {
2411 info!("Created delegation token");
2412 CreateDelegationTokenResult {
2413 token: Some(DelegationToken {
2414 principal_type: response.principal_type,
2415 principal_name: response.principal_name,
2416 issue_timestamp_ms: response.issue_timestamp_ms,
2417 expiry_timestamp_ms: response.expiry_timestamp_ms,
2418 max_timestamp_ms: response.max_timestamp_ms,
2419 token_id: response.token_id,
2420 hmac: response.hmac,
2421 renewers: Vec::new(),
2422 }),
2423 error: None,
2424 }
2425 } else {
2426 CreateDelegationTokenResult {
2427 token: None,
2428 error: Some(format!("{:?}", response.error_code)),
2429 }
2430 };
2431
2432 Ok(result)
2433 }
2434
2435 pub async fn renew_delegation_token(
2442 &self,
2443 hmac: &[u8],
2444 renew_period: Duration,
2445 ) -> Result<RenewDelegationTokenResult> {
2446 let conn = self.get_any_broker_connection().await?;
2447
2448 let request = RenewDelegationTokenRequest {
2449 hmac: Bytes::copy_from_slice(hmac),
2450 renew_period_ms: crate::util::duration_to_millis_i64(renew_period),
2451 };
2452
2453 let version = conn
2454 .negotiate_api_version(
2455 ApiKey::RenewDelegationToken,
2456 versions::RENEW_DELEGATION_TOKEN_MAX,
2457 versions::RENEW_DELEGATION_TOKEN_MIN,
2458 )
2459 .await
2460 .ok_or_else(|| {
2461 KrafkaError::protocol("no mutually supported RenewDelegationToken API version")
2462 })?;
2463
2464 let response_bytes = conn
2465 .send_request(ApiKey::RenewDelegationToken, version, |buf| {
2466 request.encode_versioned(version, buf)
2467 })
2468 .await?;
2469
2470 let mut buf = response_bytes;
2471 let response = RenewDelegationTokenResponse::decode_versioned(version, &mut buf)?;
2472
2473 if response.error_code.is_ok() {
2474 info!("Renewed delegation token");
2475 }
2476
2477 Ok(RenewDelegationTokenResult {
2478 expiry_timestamp_ms: response.expiry_timestamp_ms,
2479 error: if response.error_code.is_ok() {
2480 None
2481 } else {
2482 Some(format!("{:?}", response.error_code))
2483 },
2484 })
2485 }
2486
2487 pub async fn expire_delegation_token(
2495 &self,
2496 hmac: &[u8],
2497 expiry_period: Option<Duration>,
2498 ) -> Result<ExpireDelegationTokenResult> {
2499 let conn = self.get_any_broker_connection().await?;
2500
2501 let request = ExpireDelegationTokenRequest {
2502 hmac: Bytes::copy_from_slice(hmac),
2503 expiry_period_ms: expiry_period
2504 .map(crate::util::duration_to_millis_i64)
2505 .unwrap_or(-1),
2506 };
2507
2508 let version = conn
2509 .negotiate_api_version(
2510 ApiKey::ExpireDelegationToken,
2511 versions::EXPIRE_DELEGATION_TOKEN_MAX,
2512 versions::EXPIRE_DELEGATION_TOKEN_MIN,
2513 )
2514 .await
2515 .ok_or_else(|| {
2516 KrafkaError::protocol("no mutually supported ExpireDelegationToken API version")
2517 })?;
2518
2519 let response_bytes = conn
2520 .send_request(ApiKey::ExpireDelegationToken, version, |buf| {
2521 request.encode_versioned(version, buf)
2522 })
2523 .await?;
2524
2525 let mut buf = response_bytes;
2526 let response = ExpireDelegationTokenResponse::decode_versioned(version, &mut buf)?;
2527
2528 if response.error_code.is_ok() {
2529 info!("Expired delegation token");
2530 }
2531
2532 Ok(ExpireDelegationTokenResult {
2533 expiry_timestamp_ms: response.expiry_timestamp_ms,
2534 error: if response.error_code.is_ok() {
2535 None
2536 } else {
2537 Some(format!("{:?}", response.error_code))
2538 },
2539 })
2540 }
2541
2542 pub async fn describe_delegation_token(
2549 &self,
2550 owners: Option<&[(&str, &str)]>,
2551 ) -> Result<Vec<DelegationToken>> {
2552 let conn = self.get_any_broker_connection().await?;
2553
2554 let request = DescribeDelegationTokenRequest {
2555 owners: owners.map(|o| {
2556 o.iter()
2557 .map(|(t, n)| DescribeDelegationTokenOwner {
2558 principal_type: t.to_string(),
2559 principal_name: n.to_string(),
2560 })
2561 .collect()
2562 }),
2563 };
2564
2565 let version = conn
2566 .negotiate_api_version(
2567 ApiKey::DescribeDelegationToken,
2568 versions::DESCRIBE_DELEGATION_TOKEN_MAX,
2569 versions::DESCRIBE_DELEGATION_TOKEN_MIN,
2570 )
2571 .await
2572 .ok_or_else(|| {
2573 KrafkaError::protocol("no mutually supported DescribeDelegationToken API version")
2574 })?;
2575
2576 let response_bytes = conn
2577 .send_request(ApiKey::DescribeDelegationToken, version, |buf| {
2578 request.encode_versioned(version, buf)
2579 })
2580 .await?;
2581
2582 let mut buf = response_bytes;
2583 let response = DescribeDelegationTokenResponse::decode_versioned(version, &mut buf)?;
2584
2585 if !response.error_code.is_ok() {
2586 return Err(KrafkaError::broker(
2587 response.error_code,
2588 "DescribeDelegationToken failed",
2589 ));
2590 }
2591
2592 let tokens: Vec<DelegationToken> = response
2593 .tokens
2594 .into_iter()
2595 .map(|t| DelegationToken {
2596 principal_type: t.principal_type,
2597 principal_name: t.principal_name,
2598 issue_timestamp_ms: t.issue_timestamp_ms,
2599 expiry_timestamp_ms: t.expiry_timestamp_ms,
2600 max_timestamp_ms: t.max_timestamp_ms,
2601 token_id: t.token_id,
2602 hmac: t.hmac,
2603 renewers: t
2604 .renewers
2605 .into_iter()
2606 .map(|r| DelegationTokenRenewer {
2607 principal_type: r.principal_type,
2608 principal_name: r.principal_name,
2609 })
2610 .collect(),
2611 })
2612 .collect();
2613
2614 info!("Described {} delegation token(s)", tokens.len());
2615 Ok(tokens)
2616 }
2617
2618 pub async fn describe_client_quotas(
2647 &self,
2648 components: &[(&str, i8, Option<&str>)],
2649 strict: bool,
2650 ) -> Result<DescribeClientQuotasResult> {
2651 let conn = self.get_any_broker_connection().await?;
2652
2653 let request = DescribeClientQuotasRequest {
2654 components: components
2655 .iter()
2656 .map(
2657 |(entity_type, match_type, match_value)| QuotaFilterComponent {
2658 entity_type: entity_type.to_string(),
2659 match_type: *match_type,
2660 match_value: match_value.map(|v| v.to_string()),
2661 },
2662 )
2663 .collect(),
2664 strict,
2665 };
2666
2667 let version = conn
2668 .negotiate_api_version(
2669 ApiKey::DescribeClientQuotas,
2670 versions::DESCRIBE_CLIENT_QUOTAS_MAX,
2671 versions::DESCRIBE_CLIENT_QUOTAS_MIN,
2672 )
2673 .await
2674 .ok_or_else(|| {
2675 KrafkaError::protocol("no mutually supported DescribeClientQuotas API version")
2676 })?;
2677
2678 let response_bytes = conn
2679 .send_request(ApiKey::DescribeClientQuotas, version, |buf| {
2680 request.encode_versioned(version, buf)
2681 })
2682 .await?;
2683
2684 let mut buf = response_bytes;
2685 let response = DescribeClientQuotasResponse::decode_versioned(version, &mut buf)?;
2686
2687 let entries = response
2688 .entries
2689 .unwrap_or_default()
2690 .into_iter()
2691 .map(|entry| QuotaDescription {
2692 entity: entry
2693 .entity
2694 .into_iter()
2695 .map(|e| QuotaEntityComponent {
2696 entity_type: e.entity_type,
2697 entity_name: e.entity_name,
2698 })
2699 .collect(),
2700 values: entry
2701 .values
2702 .into_iter()
2703 .map(|v| QuotaConfig {
2704 key: v.key,
2705 value: v.value,
2706 })
2707 .collect(),
2708 })
2709 .collect::<Vec<_>>();
2710
2711 if response.error_code.is_ok() {
2712 info!("Described {} client quota entry(ies)", entries.len());
2713 }
2714
2715 Ok(DescribeClientQuotasResult {
2716 entries,
2717 error: if response.error_code.is_ok() {
2718 None
2719 } else {
2720 let msg = response
2721 .error_message
2722 .unwrap_or_else(|| format!("{:?}", response.error_code));
2723 Some(msg)
2724 },
2725 })
2726 }
2727
2728 pub async fn alter_client_quotas(
2753 &self,
2754 entries: &[QuotaAlteration<'_>],
2755 validate_only: bool,
2756 ) -> Result<Vec<AlterClientQuotaResult>> {
2757 let conn = self.get_any_broker_connection().await?;
2758
2759 let request = AlterClientQuotasRequest {
2760 entries: entries
2761 .iter()
2762 .map(|e| AlterQuotaEntry {
2763 entity: e
2764 .entity
2765 .iter()
2766 .map(|(t, n)| AlterQuotaEntity {
2767 entity_type: t.to_string(),
2768 entity_name: n.map(|v| v.to_string()),
2769 })
2770 .collect(),
2771 ops: e
2772 .ops
2773 .iter()
2774 .map(|(key, value)| AlterQuotaOp {
2775 key: key.to_string(),
2776 value: value.unwrap_or(0.0),
2777 remove: value.is_none(),
2778 })
2779 .collect(),
2780 })
2781 .collect(),
2782 validate_only,
2783 };
2784
2785 let version = conn
2786 .negotiate_api_version(
2787 ApiKey::AlterClientQuotas,
2788 versions::ALTER_CLIENT_QUOTAS_MAX,
2789 versions::ALTER_CLIENT_QUOTAS_MIN,
2790 )
2791 .await
2792 .ok_or_else(|| {
2793 KrafkaError::protocol("no mutually supported AlterClientQuotas API version")
2794 })?;
2795
2796 let response_bytes = conn
2797 .send_request(ApiKey::AlterClientQuotas, version, |buf| {
2798 request.encode_versioned(version, buf)
2799 })
2800 .await?;
2801
2802 let mut buf = response_bytes;
2803 let response = AlterClientQuotasResponse::decode_versioned(version, &mut buf)?;
2804
2805 let results: Vec<AlterClientQuotaResult> = response
2806 .entries
2807 .into_iter()
2808 .map(|entry| AlterClientQuotaResult {
2809 entity: entry
2810 .entity
2811 .into_iter()
2812 .map(|e| QuotaEntityComponent {
2813 entity_type: e.entity_type,
2814 entity_name: e.entity_name,
2815 })
2816 .collect(),
2817 error: if entry.error_code.is_ok() {
2818 None
2819 } else {
2820 let msg = entry
2821 .error_message
2822 .unwrap_or_else(|| format!("{:?}", entry.error_code));
2823 Some(msg)
2824 },
2825 })
2826 .collect();
2827
2828 info!("Altered {} client quota entry(ies)", results.len());
2829 Ok(results)
2830 }
2831
2832 pub async fn delete_consumer_groups(
2838 &self,
2839 group_ids: Vec<String>,
2840 ) -> Result<Vec<DeleteGroupResult>> {
2841 self.check_not_closed()?;
2842 let conn = self.get_any_broker_connection().await?;
2843
2844 let request = DeleteGroupsRequest::new(group_ids);
2845 let version = conn
2846 .negotiate_api_version(
2847 ApiKey::DeleteGroups,
2848 versions::DELETE_GROUPS_MAX,
2849 versions::DELETE_GROUPS_MIN,
2850 )
2851 .await
2852 .ok_or_else(|| {
2853 KrafkaError::protocol("no mutually supported DeleteGroups API version")
2854 })?;
2855
2856 let response_bytes = conn
2857 .send_request(ApiKey::DeleteGroups, version, |buf| {
2858 request.encode_versioned(version, buf)
2859 })
2860 .await?;
2861
2862 let mut buf = response_bytes;
2863 let response = DeleteGroupsResponse::decode_versioned(version, &mut buf)?;
2864
2865 let results = response
2866 .results
2867 .into_iter()
2868 .map(|r| DeleteGroupResult {
2869 group_id: r.group_id,
2870 error: if r.error_code.is_ok() {
2871 None
2872 } else {
2873 Some(format!("{:?}", r.error_code))
2874 },
2875 })
2876 .collect();
2877
2878 Ok(results)
2879 }
2880
2881 pub async fn describe_topic_partitions(
2900 &self,
2901 topics: Vec<String>,
2902 ) -> Result<DescribeTopicPartitionsResult> {
2903 self.check_not_closed()?;
2904 validate_topic_names(topics.iter().map(String::as_str))?;
2906 let conn = self.get_any_broker_connection().await?;
2907
2908 let version = conn
2909 .negotiate_api_version(
2910 ApiKey::DescribeTopicPartitions,
2911 versions::DESCRIBE_TOPIC_PARTITIONS_MAX,
2912 versions::DESCRIBE_TOPIC_PARTITIONS_MIN,
2913 )
2914 .await
2915 .ok_or_else(|| {
2916 KrafkaError::protocol("no mutually supported DescribeTopicPartitions API version")
2917 })?;
2918
2919 let mut all_topics: Vec<TopicPartitionDescription> = Vec::new();
2921 let mut cursor = None;
2922
2923 loop {
2924 let request = DescribeTopicPartitionsRequest {
2925 topics: topics.clone(),
2926 response_partition_limit: DEFAULT_RESPONSE_PARTITION_LIMIT,
2927 cursor,
2928 };
2929
2930 let response_bytes = conn
2931 .send_request(ApiKey::DescribeTopicPartitions, version, |buf| {
2932 request.encode_versioned(version, buf)
2933 })
2934 .await?;
2935
2936 let mut buf = response_bytes;
2937 let response = DescribeTopicPartitionsResponse::decode_versioned(version, &mut buf)?;
2938
2939 for t in response.topics {
2940 let null_uuid = [0u8; 16];
2944 let existing = if t.topic_id != null_uuid {
2945 all_topics.iter_mut().find(|e| e.topic_id == t.topic_id)
2946 } else {
2947 all_topics.iter_mut().find(|e| e.name == t.name)
2948 };
2949 let partitions: Vec<PartitionDescription> = t
2950 .partitions
2951 .into_iter()
2952 .map(|p| PartitionDescription {
2953 partition_index: p.partition_index,
2954 leader_id: p.leader_id,
2955 leader_epoch: p.leader_epoch,
2956 replica_nodes: p.replica_nodes,
2957 isr_nodes: p.isr_nodes,
2958 eligible_leader_replicas: p.eligible_leader_replicas,
2959 last_known_elr: p.last_known_elr,
2960 offline_replicas: p.offline_replicas,
2961 error: if p.error_code.is_ok() {
2962 None
2963 } else {
2964 Some(format!("{:?}", p.error_code))
2965 },
2966 })
2967 .collect();
2968
2969 if let Some(entry) = existing {
2970 entry.partitions.extend(partitions);
2971 } else {
2972 all_topics.push(TopicPartitionDescription {
2973 name: t.name,
2974 topic_id: t.topic_id,
2975 is_internal: t.is_internal,
2976 partitions,
2977 topic_authorized_operations: t.topic_authorized_operations,
2978 error: if t.error_code.is_ok() {
2979 None
2980 } else {
2981 Some(format!("{:?}", t.error_code))
2982 },
2983 });
2984 }
2985 }
2986
2987 match response.next_cursor {
2989 Some(c) => {
2990 cursor = Some(DescribeTopicPartitionsCursor {
2991 topic_name: c.topic_name,
2992 partition_index: c.partition_index,
2993 });
2994 }
2995 None => break,
2996 }
2997 }
2998
2999 info!("Described partitions for {} topics", all_topics.len());
3000 Ok(DescribeTopicPartitionsResult {
3001 topics: all_topics,
3002 next_cursor_topic: None,
3003 next_cursor_partition: None,
3004 })
3005 }
3006
3007 pub fn pool(&self) -> &Arc<ConnectionPool> {
3009 &self.pool
3010 }
3011
3012 pub fn update_seed_brokers(&self, servers: Vec<String>) -> Result<()> {
3021 self.metadata.update_seed_brokers(servers)
3022 }
3023
3024 pub async fn rebootstrap(&self) {
3027 self.metadata.rebootstrap().await;
3028 }
3029
3030 pub async fn close(&self) {
3039 if self.closed.swap(true, std::sync::atomic::Ordering::SeqCst) {
3040 return;
3041 }
3042 self.pool.close_all().await;
3043 info!("AdminClient closed");
3044 }
3045
3046 #[inline]
3048 pub fn is_closed(&self) -> bool {
3049 self.closed.load(std::sync::atomic::Ordering::SeqCst)
3050 }
3051
3052 #[inline]
3054 pub fn connection_metrics(&self) -> Arc<ConnectionMetrics> {
3055 self.pool.metrics()
3056 }
3057
3058 pub async fn describe_features(&self) -> Result<DescribeFeaturesResult> {
3076 self.check_not_closed()?;
3077 let conn = self.get_any_broker_connection().await?;
3078
3079 let request = crate::protocol::ApiVersionsRequest::new()
3080 .with_client_software("krafka", env!("CARGO_PKG_VERSION"));
3081
3082 let version = conn
3083 .negotiate_api_version(
3084 ApiKey::ApiVersions,
3085 versions::API_VERSIONS_MAX,
3086 3,
3088 )
3089 .await
3090 .ok_or_else(|| {
3091 KrafkaError::protocol(
3092 "no mutually supported ApiVersions v3+; feature discovery requires v3+",
3093 )
3094 })?;
3095
3096 let response_bytes = conn
3097 .send_request(ApiKey::ApiVersions, version, |buf| {
3098 if version >= 5 {
3099 request.encode_v5(buf)
3100 } else {
3101 request.encode_v3(buf)
3102 }
3103 })
3104 .await?;
3105
3106 let mut buf = response_bytes;
3107 let response = crate::protocol::ApiVersionsResponse::decode_v3(&mut buf)?;
3108
3109 if response.error_code != 0 {
3110 return Err(KrafkaError::broker(
3111 crate::error::ErrorCode::from(response.error_code),
3112 "ApiVersions request failed",
3113 ));
3114 }
3115
3116 Ok(DescribeFeaturesResult {
3117 supported_features: response.supported_features,
3118 finalized_features: response.finalized_features,
3119 finalized_features_epoch: response.finalized_features_epoch,
3120 })
3121 }
3122
3123 pub async fn update_features(
3147 &self,
3148 feature_updates: Vec<crate::protocol::FeatureUpdateKey>,
3149 validate_only: bool,
3150 ) -> Result<UpdateFeaturesResult> {
3151 self.check_not_closed()?;
3152 let conn = self.get_any_broker_connection().await?;
3153
3154 let request = UpdateFeaturesRequest::new(feature_updates).with_validate_only(validate_only);
3155
3156 let version = conn
3157 .negotiate_api_version(
3158 ApiKey::UpdateFeatures,
3159 versions::UPDATE_FEATURES_MAX,
3160 versions::UPDATE_FEATURES_MIN,
3161 )
3162 .await
3163 .ok_or_else(|| {
3164 KrafkaError::protocol("no mutually supported UpdateFeatures API version")
3165 })?;
3166
3167 if validate_only && version < 1 {
3169 return Err(KrafkaError::protocol(
3170 "validate_only requires UpdateFeatures v1+, but broker only supports v0",
3171 ));
3172 }
3173
3174 let response_bytes = conn
3175 .send_request(ApiKey::UpdateFeatures, version, |buf| {
3176 request.encode_versioned(version, buf)
3177 })
3178 .await?;
3179
3180 let mut buf = response_bytes;
3181 let response = UpdateFeaturesResponse::decode_versioned(version, &mut buf)?;
3182
3183 if !response.is_ok() {
3184 let msg = response
3185 .error_message
3186 .unwrap_or_else(|| format!("{:?}", response.error_code));
3187 return Err(KrafkaError::protocol(msg));
3188 }
3189
3190 Ok(UpdateFeaturesResult {
3191 results: response
3192 .results
3193 .into_iter()
3194 .map(|r| UpdateFeatureResult {
3195 feature: r.feature,
3196 error: if r.error_code.is_ok() {
3197 None
3198 } else {
3199 Some(
3200 r.error_message
3201 .unwrap_or_else(|| format!("{:?}", r.error_code)),
3202 )
3203 },
3204 })
3205 .collect(),
3206 })
3207 }
3208
3209 pub async fn describe_log_dirs(
3235 &self,
3236 topics: Option<Vec<DescribableLogDirTopic>>,
3237 ) -> Result<Vec<LogDirInfo>> {
3238 self.check_not_closed()?;
3239 if let Some(ref ts) = topics {
3241 for t in ts {
3242 validate_topic_name(&t.topic)?;
3243 }
3244 }
3245 let brokers = self.metadata.brokers();
3246 if brokers.is_empty() {
3247 return Err(KrafkaError::broker(
3248 crate::error::ErrorCode::UnknownServerError,
3249 "no brokers available",
3250 ));
3251 }
3252
3253 let topic_scope = match &topics {
3254 None => "all".to_string(),
3255 Some(t) => format!("{} topic(s)", t.len()),
3256 };
3257
3258 let request = match &topics {
3259 None => DescribeLogDirsRequest::all(),
3260 Some(t) => DescribeLogDirsRequest::for_topics(t.clone()),
3261 };
3262
3263 let mut all_dirs = Vec::new();
3264
3265 for broker in &brokers {
3266 let conn = self
3267 .pool
3268 .get_connection_by_id(broker.id, broker.address())
3269 .await?;
3270
3271 let version = conn
3272 .negotiate_api_version(
3273 ApiKey::DescribeLogDirs,
3274 versions::DESCRIBE_LOG_DIRS_MAX,
3275 versions::DESCRIBE_LOG_DIRS_MIN,
3276 )
3277 .await
3278 .ok_or_else(|| {
3279 KrafkaError::protocol("no mutually supported DescribeLogDirs API version")
3280 })?;
3281
3282 let response_bytes = match conn
3283 .send_request(ApiKey::DescribeLogDirs, version, |buf| {
3284 request.encode_versioned(version, buf)
3285 })
3286 .await
3287 {
3288 Ok(bytes) => bytes,
3289 Err(e) => {
3290 warn!(
3291 "DescribeLogDirs request failed on broker {} ({}): {}",
3292 broker.id, topic_scope, e
3293 );
3294 continue;
3295 }
3296 };
3297
3298 let mut buf = response_bytes;
3299 let response = match DescribeLogDirsResponse::decode_versioned(version, &mut buf) {
3300 Ok(r) => r,
3301 Err(e) => {
3302 warn!(
3303 "DescribeLogDirs decode failed on broker {} ({}): {}",
3304 broker.id, topic_scope, e
3305 );
3306 continue;
3307 }
3308 };
3309
3310 if !response.error_code.is_ok() {
3312 warn!(
3313 "DescribeLogDirs top-level error on broker {} ({}): {:?}",
3314 broker.id, topic_scope, response.error_code
3315 );
3316 }
3317
3318 if response.results.is_empty() && version < 3 {
3321 warn!(
3322 "DescribeLogDirs returned empty results on broker {} (v{}, {}); \
3323 likely CLUSTER_AUTHORIZATION_FAILED",
3324 broker.id, version, topic_scope
3325 );
3326 }
3327
3328 for result in response.results {
3329 all_dirs.push(LogDirInfo {
3330 broker_id: broker.id,
3331 log_dir: result.log_dir,
3332 error: if result.error_code.is_ok() {
3333 None
3334 } else {
3335 Some(format!("{:?}", result.error_code))
3336 },
3337 topics: result
3338 .topics
3339 .into_iter()
3340 .map(|t| LogDirTopicInfo {
3341 name: t.name,
3342 partitions: t
3343 .partitions
3344 .into_iter()
3345 .map(|p| LogDirPartitionInfo {
3346 partition_index: p.partition_index,
3347 partition_size: p.partition_size,
3348 offset_lag: p.offset_lag,
3349 is_future_key: p.is_future_key,
3350 })
3351 .collect(),
3352 })
3353 .collect(),
3354 total_bytes: result.total_bytes,
3355 usable_bytes: result.usable_bytes,
3356 });
3357 }
3358 }
3359
3360 info!(
3361 "Described {} log dir(s) across {} broker(s)",
3362 all_dirs.len(),
3363 brokers.len()
3364 );
3365 Ok(all_dirs)
3366 }
3367
3368 pub async fn elect_leaders(
3390 &self,
3391 election_type: ElectionType,
3392 topic_partitions: Option<Vec<ElectLeadersTopicPartitions>>,
3393 timeout: Duration,
3394 ) -> Result<Vec<ElectLeadersResult>> {
3395 if let Some(ref tps) = topic_partitions {
3397 for tp in tps {
3398 validate_topic_name(&tp.topic)?;
3399 }
3400 }
3401 let conn = self.get_any_broker_connection().await?;
3402
3403 let request = ElectLeadersRequest {
3404 election_type,
3405 topic_partitions,
3406 timeout_ms: crate::util::duration_to_millis_i32(timeout),
3407 };
3408
3409 let version = conn
3410 .negotiate_api_version(
3411 ApiKey::ElectLeaders,
3412 versions::ELECT_LEADERS_MAX,
3413 versions::ELECT_LEADERS_MIN,
3414 )
3415 .await
3416 .ok_or_else(|| {
3417 KrafkaError::protocol("no mutually supported ElectLeaders API version")
3418 })?;
3419
3420 let response_bytes = conn
3421 .send_request(ApiKey::ElectLeaders, version, |buf| {
3422 request.encode_versioned(version, buf)
3423 })
3424 .await?;
3425
3426 let mut buf = response_bytes;
3427 let response = ElectLeadersResponse::decode_versioned(version, &mut buf)?;
3428
3429 if !response.error_code.is_ok() {
3430 warn!("ElectLeaders top-level error: {:?}", response.error_code);
3431 }
3432
3433 let results = response
3434 .replica_election_results
3435 .into_iter()
3436 .map(|topic| ElectLeadersResult {
3437 topic: topic.topic,
3438 partitions: topic
3439 .partition_results
3440 .into_iter()
3441 .map(|p| ElectLeadersPartitionInfo {
3442 partition_id: p.partition_id,
3443 error: if p.error_code.is_ok() {
3444 None
3445 } else {
3446 Some(
3447 p.error_message
3448 .unwrap_or_else(|| format!("{:?}", p.error_code)),
3449 )
3450 },
3451 })
3452 .collect(),
3453 })
3454 .collect::<Vec<_>>();
3455
3456 info!("ElectLeaders completed for {} topic(s)", results.len());
3457 Ok(results)
3458 }
3459
3460 pub async fn alter_partition_reassignments(
3489 &self,
3490 topics: Vec<ReassignableTopic>,
3491 timeout: Duration,
3492 ) -> Result<AlterReassignmentsResult> {
3493 for t in &topics {
3495 validate_topic_name(&t.name)?;
3496 }
3497 let conn = self.get_any_broker_connection().await?;
3498
3499 let request = AlterPartitionReassignmentsRequest {
3500 timeout_ms: crate::util::duration_to_millis_i32(timeout),
3501 topics,
3502 };
3503
3504 let version = conn
3505 .negotiate_api_version(
3506 ApiKey::AlterPartitionReassignments,
3507 versions::ALTER_PARTITION_REASSIGNMENTS_MAX,
3508 versions::ALTER_PARTITION_REASSIGNMENTS_MIN,
3509 )
3510 .await
3511 .ok_or_else(|| {
3512 KrafkaError::protocol(
3513 "no mutually supported AlterPartitionReassignments API version",
3514 )
3515 })?;
3516
3517 let response_bytes = conn
3518 .send_request(ApiKey::AlterPartitionReassignments, version, |buf| {
3519 request.encode_versioned(version, buf)
3520 })
3521 .await?;
3522
3523 let mut buf = response_bytes;
3524 let response = AlterPartitionReassignmentsResponse::decode_versioned(version, &mut buf)?;
3525
3526 if !response.error_code.is_ok() {
3527 warn!(
3528 "AlterPartitionReassignments top-level error: {:?} — {}",
3529 response.error_code,
3530 response.error_message.as_deref().unwrap_or("(no message)")
3531 );
3532 }
3533
3534 let topic_results = response
3535 .responses
3536 .into_iter()
3537 .map(|t| ReassignmentTopicResult {
3538 name: t.name,
3539 partitions: t
3540 .partitions
3541 .into_iter()
3542 .map(|p| ReassignmentPartitionResult {
3543 partition_index: p.partition_index,
3544 error: if p.error_code.is_ok() {
3545 None
3546 } else {
3547 Some(
3548 p.error_message
3549 .unwrap_or_else(|| format!("{:?}", p.error_code)),
3550 )
3551 },
3552 })
3553 .collect(),
3554 })
3555 .collect::<Vec<_>>();
3556
3557 info!(
3558 "AlterPartitionReassignments completed for {} topic(s)",
3559 topic_results.len()
3560 );
3561
3562 Ok(AlterReassignmentsResult {
3563 error: if response.error_code.is_ok() {
3564 None
3565 } else {
3566 Some(
3567 response
3568 .error_message
3569 .unwrap_or_else(|| format!("{:?}", response.error_code)),
3570 )
3571 },
3572 topics: topic_results,
3573 })
3574 }
3575
3576 pub async fn list_partition_reassignments(
3596 &self,
3597 topics: Option<Vec<ListPartitionReassignmentsTopic>>,
3598 timeout: Duration,
3599 ) -> Result<Vec<PartitionReassignmentInfo>> {
3600 if let Some(ref ts) = topics {
3602 for t in ts {
3603 validate_topic_name(&t.name)?;
3604 }
3605 }
3606 let conn = self.get_any_broker_connection().await?;
3607
3608 let request = ListPartitionReassignmentsRequest {
3609 timeout_ms: crate::util::duration_to_millis_i32(timeout),
3610 topics,
3611 };
3612
3613 let version = conn
3614 .negotiate_api_version(
3615 ApiKey::ListPartitionReassignments,
3616 versions::LIST_PARTITION_REASSIGNMENTS_MAX,
3617 versions::LIST_PARTITION_REASSIGNMENTS_MIN,
3618 )
3619 .await
3620 .ok_or_else(|| {
3621 KrafkaError::protocol(
3622 "no mutually supported ListPartitionReassignments API version",
3623 )
3624 })?;
3625
3626 let response_bytes = conn
3627 .send_request(ApiKey::ListPartitionReassignments, version, |buf| {
3628 request.encode_versioned(version, buf)
3629 })
3630 .await?;
3631
3632 let mut buf = response_bytes;
3633 let response = ListPartitionReassignmentsResponse::decode_versioned(version, &mut buf)?;
3634
3635 if !response.error_code.is_ok() {
3636 warn!(
3637 "ListPartitionReassignments top-level error: {:?} — {}",
3638 response.error_code,
3639 response.error_message.as_deref().unwrap_or("(no message)")
3640 );
3641 }
3642
3643 let results = response
3644 .topics
3645 .into_iter()
3646 .map(|t| PartitionReassignmentInfo {
3647 name: t.name,
3648 partitions: t
3649 .partitions
3650 .into_iter()
3651 .map(|p| PartitionReassignmentPartitionInfo {
3652 partition_index: p.partition_index,
3653 replicas: p.replicas,
3654 adding_replicas: p.adding_replicas,
3655 removing_replicas: p.removing_replicas,
3656 })
3657 .collect(),
3658 })
3659 .collect::<Vec<_>>();
3660
3661 info!(
3662 "Listed {} topic(s) with ongoing reassignments",
3663 results.len()
3664 );
3665 Ok(results)
3666 }
3667
3668 pub async fn alter_replica_log_dirs(
3700 &self,
3701 dirs: Vec<AlterReplicaLogDir>,
3702 ) -> Result<Vec<AlterReplicaLogDirsResult>> {
3703 self.check_not_closed()?;
3704 for d in &dirs {
3706 for t in &d.topics {
3707 validate_topic_name(&t.name)?;
3708 }
3709 }
3710 let brokers = self.metadata.brokers();
3711 if brokers.is_empty() {
3712 return Err(KrafkaError::broker(
3713 crate::error::ErrorCode::UnknownServerError,
3714 "no brokers available",
3715 ));
3716 }
3717
3718 let request = AlterReplicaLogDirsRequest { dirs };
3719 let mut all_results = Vec::new();
3720
3721 for broker in &brokers {
3722 let conn = self
3723 .pool
3724 .get_connection_by_id(broker.id, broker.address())
3725 .await?;
3726
3727 let version = conn
3728 .negotiate_api_version(
3729 ApiKey::AlterReplicaLogDirs,
3730 versions::ALTER_REPLICA_LOG_DIRS_MAX,
3731 versions::ALTER_REPLICA_LOG_DIRS_MIN,
3732 )
3733 .await
3734 .ok_or_else(|| {
3735 KrafkaError::protocol("no mutually supported AlterReplicaLogDirs API version")
3736 })?;
3737
3738 let response_bytes = match conn
3739 .send_request(ApiKey::AlterReplicaLogDirs, version, |buf| {
3740 request.encode_versioned(version, buf)
3741 })
3742 .await
3743 {
3744 Ok(bytes) => bytes,
3745 Err(e) => {
3746 warn!(
3747 "AlterReplicaLogDirs request failed on broker {} ({} dir(s)): {}",
3748 broker.id,
3749 request.dirs.len(),
3750 e
3751 );
3752 continue;
3753 }
3754 };
3755
3756 let mut buf = response_bytes;
3757 let response = match AlterReplicaLogDirsResponse::decode_versioned(version, &mut buf) {
3758 Ok(r) => r,
3759 Err(e) => {
3760 warn!(
3761 "AlterReplicaLogDirs decode failed on broker {} ({} dir(s)): {}",
3762 broker.id,
3763 request.dirs.len(),
3764 e
3765 );
3766 continue;
3767 }
3768 };
3769
3770 for topic in response.results {
3771 all_results.push(AlterReplicaLogDirsResult {
3772 broker_id: broker.id,
3773 topic_name: topic.topic_name,
3774 partitions: topic
3775 .partitions
3776 .into_iter()
3777 .map(|p| AlterReplicaLogDirsPartitionResult {
3778 partition_index: p.partition_index,
3779 error: if p.error_code.is_ok() {
3780 None
3781 } else {
3782 Some(format!("{:?}", p.error_code))
3783 },
3784 })
3785 .collect(),
3786 });
3787 }
3788 }
3789
3790 info!(
3791 "AlterReplicaLogDirs completed for {} topic(s)",
3792 all_results.len()
3793 );
3794 Ok(all_results)
3795 }
3796
3797 pub async fn delete_consumer_group_offsets(
3817 &self,
3818 group_id: &str,
3819 topic_partitions: &[(&str, &[i32])],
3820 ) -> Result<OffsetDeleteResult> {
3821 self.check_not_closed()?;
3822
3823 let any_conn = self.get_any_broker_connection().await?;
3825 let coord_request = FindCoordinatorRequest::for_group(group_id);
3826 let coord_version = any_conn
3827 .negotiate_api_version(
3828 ApiKey::FindCoordinator,
3829 versions::FIND_COORDINATOR_MAX,
3830 versions::FIND_COORDINATOR_MIN,
3831 )
3832 .await
3833 .ok_or_else(|| {
3834 KrafkaError::protocol("no mutually supported FindCoordinator API version")
3835 })?;
3836 let coord_response_bytes = any_conn
3837 .send_request(ApiKey::FindCoordinator, coord_version, |buf| {
3838 coord_request.encode_versioned(coord_version, buf)
3839 })
3840 .await?;
3841 let mut coord_buf = coord_response_bytes;
3842 let coord_response =
3843 FindCoordinatorResponse::decode_versioned(coord_version, &mut coord_buf)?;
3844
3845 let coordinator = if coord_response.error_code.is_ok() {
3846 let addr = format!("{}:{}", coord_response.host, coord_response.port);
3847 self.pool
3848 .get_connection_by_id(coord_response.node_id, &addr)
3849 .await?
3850 } else {
3851 warn!(
3852 "FindCoordinator failed for group '{}': {:?}, using any broker",
3853 group_id, coord_response.error_code
3854 );
3855 any_conn
3856 };
3857
3858 let topics = topic_partitions
3859 .iter()
3860 .map(|(name, partitions)| OffsetDeleteTopicRequest {
3861 name: (*name).to_string(),
3862 partitions: partitions
3863 .iter()
3864 .map(|&p| OffsetDeletePartitionRequest { partition_index: p })
3865 .collect(),
3866 })
3867 .collect();
3868
3869 let request = OffsetDeleteRequest {
3870 group_id: group_id.to_string(),
3871 topics,
3872 };
3873
3874 let version = coordinator
3875 .negotiate_api_version(
3876 ApiKey::OffsetDelete,
3877 versions::OFFSET_DELETE_MAX,
3878 versions::OFFSET_DELETE_MIN,
3879 )
3880 .await
3881 .ok_or_else(|| {
3882 KrafkaError::protocol("no mutually supported OffsetDelete API version")
3883 })?;
3884
3885 let response_bytes = coordinator
3886 .send_request(ApiKey::OffsetDelete, version, |buf| {
3887 request.encode_versioned(version, buf)
3888 })
3889 .await?;
3890
3891 let mut buf = response_bytes;
3892 let response = OffsetDeleteResponse::decode_versioned(version, &mut buf)?;
3893
3894 if !response.error_code.is_ok() {
3895 warn!("OffsetDelete top-level error: {:?}", response.error_code);
3896 }
3897
3898 let topics = response
3899 .topics
3900 .into_iter()
3901 .map(|t| OffsetDeleteTopicResult {
3902 name: t.name,
3903 partitions: t
3904 .partitions
3905 .into_iter()
3906 .map(|p| OffsetDeletePartitionResult {
3907 partition_index: p.partition_index,
3908 error: if p.error_code.is_ok() {
3909 None
3910 } else {
3911 Some(format!("{:?}", p.error_code))
3912 },
3913 })
3914 .collect(),
3915 })
3916 .collect::<Vec<_>>();
3917
3918 info!("OffsetDelete completed for group {group_id}");
3919
3920 Ok(OffsetDeleteResult {
3921 error: if response.error_code.is_ok() {
3922 None
3923 } else {
3924 Some(format!("{:?}", response.error_code))
3925 },
3926 topics,
3927 })
3928 }
3929
3930 pub async fn describe_user_scram_credentials(
3948 &self,
3949 users: Option<Vec<String>>,
3950 ) -> Result<DescribeUserScramCredentialsResult> {
3951 let conn = self.get_any_broker_connection().await?;
3952
3953 let request = DescribeUserScramCredentialsRequest { users };
3954
3955 let version = conn
3956 .negotiate_api_version(
3957 ApiKey::DescribeUserScramCredentials,
3958 versions::DESCRIBE_USER_SCRAM_CREDENTIALS_MAX,
3959 versions::DESCRIBE_USER_SCRAM_CREDENTIALS_MIN,
3960 )
3961 .await
3962 .ok_or_else(|| {
3963 KrafkaError::protocol(
3964 "no mutually supported DescribeUserScramCredentials API version",
3965 )
3966 })?;
3967
3968 let response_bytes = conn
3969 .send_request(ApiKey::DescribeUserScramCredentials, version, |buf| {
3970 request.encode_versioned(version, buf)
3971 })
3972 .await?;
3973
3974 let mut buf = response_bytes;
3975 let response = DescribeUserScramCredentialsResponse::decode_versioned(version, &mut buf)?;
3976
3977 if !response.error_code.is_ok() {
3978 warn!(
3979 "DescribeUserScramCredentials top-level error: {:?} — {}",
3980 response.error_code,
3981 response.error_message.as_deref().unwrap_or("(no message)")
3982 );
3983 }
3984
3985 let users = response
3986 .results
3987 .into_iter()
3988 .map(|r| ScramCredentialUserResult {
3989 name: r.user,
3990 error: if r.error_code.is_ok() {
3991 None
3992 } else {
3993 r.error_message
3994 .or_else(|| Some(format!("{:?}", r.error_code)))
3995 },
3996 credential_infos: r
3997 .credential_infos
3998 .into_iter()
3999 .map(|c| ScramCredentialInfoResult {
4000 mechanism: c.mechanism,
4001 iterations: c.iterations,
4002 })
4003 .collect(),
4004 })
4005 .collect::<Vec<_>>();
4006
4007 info!(
4008 "DescribeUserScramCredentials returned {} user(s)",
4009 users.len()
4010 );
4011
4012 Ok(DescribeUserScramCredentialsResult {
4013 error: if response.error_code.is_ok() {
4014 None
4015 } else {
4016 response
4017 .error_message
4018 .or_else(|| Some(format!("{:?}", response.error_code)))
4019 },
4020 users,
4021 })
4022 }
4023
4024 pub async fn alter_user_scram_credentials(
4055 &self,
4056 deletions: Vec<ScramCredentialDeletion>,
4057 upsertions: Vec<ScramCredentialUpsertion>,
4058 ) -> Result<Vec<AlterScramCredentialResult>> {
4059 let conn = self.get_any_broker_connection().await?;
4060
4061 let request = AlterUserScramCredentialsRequest {
4062 deletions,
4063 upsertions,
4064 };
4065
4066 let version = conn
4067 .negotiate_api_version(
4068 ApiKey::AlterUserScramCredentials,
4069 versions::ALTER_USER_SCRAM_CREDENTIALS_MAX,
4070 versions::ALTER_USER_SCRAM_CREDENTIALS_MIN,
4071 )
4072 .await
4073 .ok_or_else(|| {
4074 KrafkaError::protocol("no mutually supported AlterUserScramCredentials API version")
4075 })?;
4076
4077 let response_bytes = conn
4078 .send_request(ApiKey::AlterUserScramCredentials, version, |buf| {
4079 request.encode_versioned(version, buf)
4080 })
4081 .await?;
4082
4083 let mut buf = response_bytes;
4084 let response = AlterUserScramCredentialsResponse::decode_versioned(version, &mut buf)?;
4085
4086 let results = response
4087 .results
4088 .into_iter()
4089 .map(|r| AlterScramCredentialResult {
4090 user: r.user,
4091 error: if r.error_code.is_ok() {
4092 None
4093 } else {
4094 r.error_message
4095 .or_else(|| Some(format!("{:?}", r.error_code)))
4096 },
4097 })
4098 .collect::<Vec<_>>();
4099
4100 info!(
4101 "AlterUserScramCredentials completed for {} user(s)",
4102 results.len()
4103 );
4104 Ok(results)
4105 }
4106
4107 pub async fn describe_producers(
4128 &self,
4129 topic_partitions: &[(&str, &[i32])],
4130 ) -> Result<Vec<DescribeProducersTopicResult>> {
4131 self.check_not_closed()?;
4132
4133 for attempt in 0u8..2 {
4134 if attempt == 1 {
4135 let topics: Vec<&str> = topic_partitions.iter().map(|&(t, _)| t).collect();
4137 let _ = self.metadata.refresh_for_topics(Some(&topics)).await;
4138 }
4139
4140 let brokers = self.metadata.brokers();
4141 if brokers.is_empty() {
4142 return Err(KrafkaError::broker(
4143 crate::error::ErrorCode::UnknownServerError,
4144 "no brokers available",
4145 ));
4146 }
4147
4148 let fallback_id = brokers[0].id;
4149
4150 let mut by_leader: HashMap<i32, HashMap<String, Vec<i32>>> = HashMap::new();
4152 for &(topic, partitions) in topic_partitions {
4153 for &pid in partitions {
4154 let leader = self.metadata.leader(topic, pid).unwrap_or(fallback_id);
4155 by_leader
4156 .entry(leader)
4157 .or_default()
4158 .entry(topic.to_string())
4159 .or_default()
4160 .push(pid);
4161 }
4162 }
4163
4164 let mut all_results: HashMap<String, DescribeProducersTopicResult> = HashMap::new();
4165 let mut has_stale_leader = false;
4166
4167 for (broker_id, topic_map) in by_leader {
4168 let broker = brokers
4169 .iter()
4170 .find(|b| b.id == broker_id)
4171 .unwrap_or(&brokers[0]);
4172 let conn = self
4173 .pool
4174 .get_connection_by_id(broker.id, broker.address())
4175 .await?;
4176
4177 let topics = topic_map
4178 .into_iter()
4179 .map(|(name, partition_indexes)| DescribeProducersTopicRequest {
4180 name,
4181 partition_indexes,
4182 })
4183 .collect();
4184
4185 let request = DescribeProducersRequest { topics };
4186
4187 let version = conn
4188 .negotiate_api_version(
4189 ApiKey::DescribeProducers,
4190 versions::DESCRIBE_PRODUCERS_MAX,
4191 versions::DESCRIBE_PRODUCERS_MIN,
4192 )
4193 .await
4194 .ok_or_else(|| {
4195 KrafkaError::protocol("no mutually supported DescribeProducers API version")
4196 })?;
4197
4198 let response_bytes = match conn
4199 .send_request(ApiKey::DescribeProducers, version, |buf| {
4200 request.encode_versioned(version, buf)
4201 })
4202 .await
4203 {
4204 Ok(bytes) => bytes,
4205 Err(e) => {
4206 warn!(
4207 "DescribeProducers request failed on broker {}: {}",
4208 broker.id, e
4209 );
4210 continue;
4211 }
4212 };
4213
4214 let mut buf = response_bytes;
4215 let response = match DescribeProducersResponse::decode_versioned(version, &mut buf)
4216 {
4217 Ok(r) => r,
4218 Err(e) => {
4219 warn!(
4220 "DescribeProducers decode failed on broker {}: {}",
4221 broker.id, e
4222 );
4223 continue;
4224 }
4225 };
4226
4227 for topic in response.topics {
4228 let entry = all_results.entry(topic.name.clone()).or_insert_with(|| {
4229 DescribeProducersTopicResult {
4230 name: topic.name,
4231 partitions: Vec::new(),
4232 }
4233 });
4234 entry
4235 .partitions
4236 .extend(topic.partitions.into_iter().map(|p| {
4237 if p.error_code == crate::error::ErrorCode::NotLeaderForPartition {
4238 has_stale_leader = true;
4239 }
4240 DescribeProducersPartitionInfo {
4241 partition_index: p.partition_index,
4242 error: if p.error_code.is_ok() {
4243 None
4244 } else {
4245 Some(
4246 p.error_message
4247 .unwrap_or_else(|| format!("{:?}", p.error_code)),
4248 )
4249 },
4250 active_producers: p
4251 .active_producers
4252 .into_iter()
4253 .map(|pr| ProducerStateInfo {
4254 producer_id: pr.producer_id,
4255 producer_epoch: pr.producer_epoch,
4256 last_sequence: pr.last_sequence,
4257 last_timestamp: pr.last_timestamp,
4258 coordinator_epoch: pr.coordinator_epoch,
4259 current_txn_start_offset: pr.current_txn_start_offset,
4260 })
4261 .collect(),
4262 }
4263 }));
4264 }
4265 }
4266
4267 if has_stale_leader && attempt == 0 {
4268 warn!(
4269 "NotLeaderForPartition in DescribeProducers response, retrying with refreshed metadata"
4270 );
4271 continue;
4272 }
4273
4274 let results: Vec<DescribeProducersTopicResult> = all_results.into_values().collect();
4275 info!("DescribeProducers returned {} topic(s)", results.len());
4276 return Ok(results);
4277 }
4278 Err(KrafkaError::protocol(
4279 "DescribeProducers retry loop exhausted after metadata refresh",
4280 ))
4281 }
4282
4283 pub async fn describe_transactions(
4300 &self,
4301 transactional_ids: &[&str],
4302 ) -> Result<Vec<TransactionDescription>> {
4303 self.check_not_closed()?;
4304 let brokers = self.metadata.brokers();
4305 if brokers.is_empty() {
4306 return Err(KrafkaError::broker(
4307 crate::error::ErrorCode::UnknownServerError,
4308 "no brokers available",
4309 ));
4310 }
4311
4312 let any_broker = &brokers[0];
4314 let any_conn = self
4315 .pool
4316 .get_connection_by_id(any_broker.id, any_broker.address())
4317 .await?;
4318
4319 let mut coordinator_txns: HashMap<i32, Vec<String>> = HashMap::new();
4320
4321 for txn_id in transactional_ids {
4322 let coord_request = FindCoordinatorRequest::for_transaction(txn_id);
4323 let coord_version = any_conn
4324 .negotiate_api_version(
4325 ApiKey::FindCoordinator,
4326 versions::FIND_COORDINATOR_MAX,
4327 versions::FIND_COORDINATOR_MIN,
4328 )
4329 .await
4330 .ok_or_else(|| {
4331 KrafkaError::protocol("no mutually supported FindCoordinator API version")
4332 })?;
4333
4334 let coord_response_bytes = any_conn
4335 .send_request(ApiKey::FindCoordinator, coord_version, |buf| {
4336 coord_request.encode_versioned(coord_version, buf)
4337 })
4338 .await?;
4339 let mut coord_buf = coord_response_bytes;
4340 let coord_response =
4341 FindCoordinatorResponse::decode_versioned(coord_version, &mut coord_buf)?;
4342
4343 if coord_response.error_code.is_ok() {
4344 coordinator_txns
4345 .entry(coord_response.node_id)
4346 .or_default()
4347 .push((*txn_id).to_string());
4348 } else {
4349 warn!(
4350 "FindCoordinator failed for txn '{}': {:?}, falling back to broker {}",
4351 txn_id, coord_response.error_code, any_broker.id
4352 );
4353 coordinator_txns
4354 .entry(any_broker.id)
4355 .or_default()
4356 .push((*txn_id).to_string());
4357 }
4358 }
4359
4360 let mut all_results = Vec::new();
4361
4362 for (broker_id, txn_ids) in coordinator_txns {
4363 let broker = brokers
4364 .iter()
4365 .find(|b| b.id == broker_id)
4366 .unwrap_or(any_broker);
4367 let conn = self
4368 .pool
4369 .get_connection_by_id(broker.id, broker.address())
4370 .await?;
4371
4372 let request = DescribeTransactionsRequest {
4373 transactional_ids: txn_ids,
4374 };
4375
4376 let version = conn
4377 .negotiate_api_version(
4378 ApiKey::DescribeTransactions,
4379 versions::DESCRIBE_TRANSACTIONS_MAX,
4380 versions::DESCRIBE_TRANSACTIONS_MIN,
4381 )
4382 .await
4383 .ok_or_else(|| {
4384 KrafkaError::protocol("no mutually supported DescribeTransactions API version")
4385 })?;
4386
4387 let response_bytes = match conn
4388 .send_request(ApiKey::DescribeTransactions, version, |buf| {
4389 request.encode_versioned(version, buf)
4390 })
4391 .await
4392 {
4393 Ok(bytes) => bytes,
4394 Err(e) => {
4395 warn!(
4396 "DescribeTransactions request failed on broker {}: {}",
4397 broker.id, e
4398 );
4399 continue;
4400 }
4401 };
4402
4403 let mut buf = response_bytes;
4404 let response = match DescribeTransactionsResponse::decode_versioned(version, &mut buf) {
4405 Ok(r) => r,
4406 Err(e) => {
4407 warn!(
4408 "DescribeTransactions decode failed on broker {}: {}",
4409 broker.id, e
4410 );
4411 continue;
4412 }
4413 };
4414
4415 all_results.extend(response.transaction_states.into_iter().map(|s| {
4416 TransactionDescription {
4417 transactional_id: s.transactional_id,
4418 error: if s.error_code.is_ok() {
4419 None
4420 } else {
4421 Some(format!("{:?}", s.error_code))
4422 },
4423 state: s.transaction_state,
4424 timeout_ms: s.transaction_timeout_ms,
4425 start_time_ms: s.transaction_start_time_ms,
4426 producer_id: s.producer_id,
4427 producer_epoch: s.producer_epoch,
4428 topics: s
4429 .topics
4430 .into_iter()
4431 .map(|t| TransactionTopicInfo {
4432 topic: t.topic,
4433 partitions: t.partitions,
4434 })
4435 .collect(),
4436 }
4437 }));
4438 }
4439
4440 info!(
4441 "DescribeTransactions returned {} transaction(s)",
4442 all_results.len()
4443 );
4444 Ok(all_results)
4445 }
4446
4447 pub async fn list_transactions(
4468 &self,
4469 state_filters: &[&str],
4470 producer_id_filters: &[i64],
4471 duration_filter: i64,
4472 ) -> Result<ListTransactionsResult> {
4473 self.check_not_closed()?;
4474 let brokers = self.metadata.brokers();
4475 if brokers.is_empty() {
4476 return Err(KrafkaError::broker(
4477 crate::error::ErrorCode::UnknownServerError,
4478 "no brokers available",
4479 ));
4480 }
4481
4482 let request = ListTransactionsRequest {
4483 state_filters: state_filters.iter().map(|s| (*s).to_string()).collect(),
4484 producer_id_filters: producer_id_filters.to_vec(),
4485 duration_filter,
4486 };
4487
4488 let mut all_transactions = Vec::new();
4489 let mut all_unknown_state_filters = Vec::new();
4490 let mut last_error: Option<String> = None;
4491
4492 for broker in &brokers {
4493 let conn = self
4494 .pool
4495 .get_connection_by_id(broker.id, broker.address())
4496 .await?;
4497
4498 let version = conn
4499 .negotiate_api_version(
4500 ApiKey::ListTransactions,
4501 versions::LIST_TRANSACTIONS_MAX,
4502 versions::LIST_TRANSACTIONS_MIN,
4503 )
4504 .await
4505 .ok_or_else(|| {
4506 KrafkaError::protocol("no mutually supported ListTransactions API version")
4507 })?;
4508
4509 let response_bytes = match conn
4510 .send_request(ApiKey::ListTransactions, version, |buf| {
4511 request.encode_versioned(version, buf)
4512 })
4513 .await
4514 {
4515 Ok(bytes) => bytes,
4516 Err(e) => {
4517 warn!(
4518 "ListTransactions request failed on broker {}: {}",
4519 broker.id, e
4520 );
4521 continue;
4522 }
4523 };
4524
4525 let mut buf = response_bytes;
4526 let response = match ListTransactionsResponse::decode_versioned(version, &mut buf) {
4527 Ok(r) => r,
4528 Err(e) => {
4529 warn!(
4530 "ListTransactions decode failed on broker {}: {}",
4531 broker.id, e
4532 );
4533 continue;
4534 }
4535 };
4536
4537 if !response.error_code.is_ok() {
4538 warn!(
4539 "ListTransactions error on broker {}: {:?}",
4540 broker.id, response.error_code
4541 );
4542 last_error = Some(format!("{:?}", response.error_code));
4543 }
4544
4545 for filter in response.unknown_state_filters {
4546 if !all_unknown_state_filters.contains(&filter) {
4547 all_unknown_state_filters.push(filter);
4548 }
4549 }
4550
4551 all_transactions.extend(response.transaction_states.into_iter().map(|s| {
4552 TransactionListEntry {
4553 transactional_id: s.transactional_id,
4554 producer_id: s.producer_id,
4555 state: s.transaction_state,
4556 }
4557 }));
4558 }
4559
4560 info!(
4561 "ListTransactions returned {} transaction(s) across {} broker(s)",
4562 all_transactions.len(),
4563 brokers.len()
4564 );
4565
4566 Ok(ListTransactionsResult {
4567 error: last_error,
4568 unknown_state_filters: all_unknown_state_filters,
4569 transactions: all_transactions,
4570 })
4571 }
4572
4573 pub async fn list_client_metrics_resources(&self) -> Result<Vec<String>> {
4591 let conn = self.get_any_broker_connection().await?;
4592
4593 let request = ListClientMetricsResourcesRequest;
4594
4595 let version = conn
4596 .negotiate_api_version(
4597 ApiKey::ListClientMetricsResources,
4598 versions::LIST_CLIENT_METRICS_RESOURCES_MAX,
4599 versions::LIST_CLIENT_METRICS_RESOURCES_MIN,
4600 )
4601 .await
4602 .ok_or_else(|| {
4603 KrafkaError::protocol(
4604 "no mutually supported ListClientMetricsResources API version",
4605 )
4606 })?;
4607
4608 let response_bytes = conn
4609 .send_request(ApiKey::ListClientMetricsResources, version, |buf| {
4610 request.encode_versioned(version, buf)
4611 })
4612 .await?;
4613
4614 let mut buf = response_bytes;
4615 let response = ListClientMetricsResourcesResponse::decode_versioned(version, &mut buf)?;
4616
4617 if !response.error_code.is_ok() {
4618 warn!(
4619 "ListClientMetricsResources error: {:?}",
4620 response.error_code
4621 );
4622 }
4623
4624 let names: Vec<String> = response
4625 .client_metrics_resources
4626 .into_iter()
4627 .map(|r| r.name)
4628 .collect();
4629
4630 info!(
4631 "ListClientMetricsResources returned {} resource(s)",
4632 names.len()
4633 );
4634 Ok(names)
4635 }
4636
4637 pub async fn write_txn_markers(
4670 &self,
4671 markers: &[WritableTxnMarker],
4672 ) -> Result<Vec<WriteTxnMarkersResult>> {
4673 self.check_not_closed()?;
4674 let conn = self.get_any_broker_connection().await?;
4675
4676 let request = WriteTxnMarkersRequest {
4677 markers: markers.to_vec(),
4678 };
4679
4680 let version = conn
4681 .negotiate_api_version(
4682 ApiKey::WriteTxnMarkers,
4683 versions::WRITE_TXN_MARKERS_MAX,
4684 versions::WRITE_TXN_MARKERS_MIN,
4685 )
4686 .await
4687 .ok_or_else(|| {
4688 KrafkaError::protocol("no mutually supported WriteTxnMarkers API version")
4689 })?;
4690
4691 let response_bytes = conn
4692 .send_request(ApiKey::WriteTxnMarkers, version, |buf| {
4693 request.encode_versioned(version, buf)
4694 })
4695 .await?;
4696
4697 let mut buf = response_bytes;
4698 let response = WriteTxnMarkersResponse::decode_versioned(version, &mut buf)?;
4699
4700 let results = response
4701 .markers
4702 .into_iter()
4703 .map(|m| WriteTxnMarkersResult {
4704 producer_id: m.producer_id,
4705 topics: m
4706 .topics
4707 .into_iter()
4708 .map(|t| WriteTxnMarkersTopicResult {
4709 name: t.name,
4710 partitions: t
4711 .partitions
4712 .into_iter()
4713 .map(|p| WriteTxnMarkersPartitionResult {
4714 partition_index: p.partition_index,
4715 error: if p.error_code.is_ok() {
4716 None
4717 } else {
4718 Some(format!("{:?}", p.error_code))
4719 },
4720 })
4721 .collect(),
4722 })
4723 .collect(),
4724 })
4725 .collect::<Vec<_>>();
4726
4727 info!(
4728 "WriteTxnMarkers returned {} marker result(s)",
4729 results.len()
4730 );
4731 Ok(results)
4732 }
4733
4734 pub async fn abort_transaction(
4747 &self,
4748 transactional_id: &str,
4749 ) -> Result<Vec<WriteTxnMarkersResult>> {
4750 self.check_not_closed()?;
4751
4752 let descriptions = self.describe_transactions(&[transactional_id]).await?;
4755 let desc = descriptions
4756 .first()
4757 .ok_or_else(|| KrafkaError::protocol("no transaction description returned"))?;
4758
4759 if let Some(ref err) = desc.error {
4760 return Err(KrafkaError::protocol(format!(
4761 "cannot abort transaction '{}': {}",
4762 transactional_id, err,
4763 )));
4764 }
4765
4766 let topics: Vec<WritableTxnMarkerTopic> = desc
4767 .topics
4768 .iter()
4769 .map(|t| WritableTxnMarkerTopic {
4770 name: t.topic.clone(),
4771 partition_indexes: t.partitions.clone(),
4772 })
4773 .collect();
4774
4775 let marker = WritableTxnMarker {
4776 producer_id: desc.producer_id,
4777 producer_epoch: desc.producer_epoch,
4778 transaction_result: false, topics,
4780 coordinator_epoch: 0, };
4782
4783 self.write_txn_markers(&[marker]).await
4784 }
4785
4786 pub async fn describe_metadata_quorum(
4805 &self,
4806 topic_partitions: &[(&str, &[i32])],
4807 ) -> Result<DescribeQuorumResult> {
4808 self.check_not_closed()?;
4809 let conn = self.get_any_broker_connection().await?;
4810
4811 let topics = topic_partitions
4812 .iter()
4813 .map(|(name, partitions)| DescribeQuorumTopicRequest {
4814 topic_name: (*name).to_string(),
4815 partitions: partitions
4816 .iter()
4817 .map(|&p| DescribeQuorumPartitionRequest { partition_index: p })
4818 .collect(),
4819 })
4820 .collect();
4821
4822 let request = DescribeQuorumRequest { topics };
4823
4824 let version = conn
4825 .negotiate_api_version(
4826 ApiKey::DescribeQuorum,
4827 versions::DESCRIBE_QUORUM_MAX,
4828 versions::DESCRIBE_QUORUM_MIN,
4829 )
4830 .await
4831 .ok_or_else(|| {
4832 KrafkaError::protocol("no mutually supported DescribeQuorum API version")
4833 })?;
4834
4835 let response_bytes = conn
4836 .send_request(ApiKey::DescribeQuorum, version, |buf| {
4837 request.encode_versioned(version, buf)
4838 })
4839 .await?;
4840
4841 let mut buf = response_bytes;
4842 let response = DescribeQuorumResponse::decode_versioned(version, &mut buf)?;
4843
4844 if !response.error_code.is_ok() {
4845 warn!("DescribeQuorum top-level error: {:?}", response.error_code);
4846 }
4847
4848 let topics = response
4849 .topics
4850 .into_iter()
4851 .map(|t| QuorumTopicResult {
4852 topic_name: t.topic_name,
4853 partitions: t
4854 .partitions
4855 .into_iter()
4856 .map(|p| QuorumPartitionResult {
4857 partition_index: p.partition_index,
4858 error: if p.error_code.is_ok() {
4859 None
4860 } else {
4861 Some(format!("{:?}", p.error_code))
4862 },
4863 leader_id: p.leader_id,
4864 leader_epoch: p.leader_epoch,
4865 high_watermark: p.high_watermark,
4866 current_voters: p
4867 .current_voters
4868 .into_iter()
4869 .map(|v| QuorumReplicaInfo {
4870 replica_id: v.replica_id,
4871 log_end_offset: v.log_end_offset,
4872 })
4873 .collect(),
4874 observers: p
4875 .observers
4876 .into_iter()
4877 .map(|o| QuorumReplicaInfo {
4878 replica_id: o.replica_id,
4879 log_end_offset: o.log_end_offset,
4880 })
4881 .collect(),
4882 })
4883 .collect(),
4884 })
4885 .collect::<Vec<_>>();
4886
4887 info!("DescribeQuorum returned {} topic(s)", topics.len());
4888
4889 Ok(DescribeQuorumResult {
4890 error: if response.error_code.is_ok() {
4891 None
4892 } else {
4893 Some(format!("{:?}", response.error_code))
4894 },
4895 topics,
4896 })
4897 }
4898}
4899
/// Result of a describe-features operation.
///
/// NOTE(review): populated outside this view; field meanings are inferred
/// from their names — confirm against the producing method.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeFeaturesResult {
    /// Supported features.
    pub supported_features: Vec<SupportedFeature>,
    /// Finalized features.
    pub finalized_features: Vec<FinalizedFeature>,
    /// Epoch associated with the finalized features.
    pub finalized_features_epoch: i64,
}
4911
/// Result of an update-features operation.
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct UpdateFeaturesResult {
    /// Individual feature update outcomes.
    pub results: Vec<UpdateFeatureResult>,
}
4919
/// Outcome of updating a single feature.
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct UpdateFeatureResult {
    /// Name of the feature.
    pub feature: String,
    /// Error text, if any (presumably `None` on success — confirm).
    pub error: Option<String>,
}
4929
/// Information about one log directory on a broker.
///
/// NOTE(review): populated outside this view; field meanings are inferred
/// from their names — confirm against the producing method.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LogDirInfo {
    /// Id of the broker the log directory belongs to.
    pub broker_id: i32,
    /// Log directory path.
    pub log_dir: String,
    /// Error text, if any (presumably `None` on success — confirm).
    pub error: Option<String>,
    /// Topics with data in this log directory.
    pub topics: Vec<LogDirTopicInfo>,
    /// Total bytes (presumably of the volume holding the directory — confirm).
    pub total_bytes: i64,
    /// Usable bytes (presumably of the volume holding the directory — confirm).
    pub usable_bytes: i64,
}
4949
/// Per-topic entry inside a [`LogDirInfo`].
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LogDirTopicInfo {
    /// Topic name.
    pub name: String,
    /// Partition entries for this topic.
    pub partitions: Vec<LogDirPartitionInfo>,
}
4959
/// Per-partition entry inside a [`LogDirTopicInfo`].
///
/// NOTE(review): populated outside this view; field meanings are inferred
/// from their names — confirm against the producing method.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LogDirPartitionInfo {
    /// Partition index.
    pub partition_index: i32,
    /// Partition size (presumably in bytes — confirm).
    pub partition_size: i64,
    /// Offset lag of the replica.
    pub offset_lag: i64,
    /// "Is future" flag as reported for this replica.
    pub is_future_key: bool,
}
4973
/// Per-topic result of a leader election request.
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ElectLeadersResult {
    /// Topic name.
    pub topic: String,
    /// Per-partition election outcomes.
    pub partitions: Vec<ElectLeadersPartitionInfo>,
}
4983
/// Per-partition outcome of a leader election request.
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ElectLeadersPartitionInfo {
    /// Partition id.
    pub partition_id: i32,
    /// Error text, if any (presumably `None` on success — confirm).
    pub error: Option<String>,
}
4993
/// Result of an alter-partition-reassignments request.
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterReassignmentsResult {
    /// Top-level error text, if any (presumably `None` on success — confirm).
    pub error: Option<String>,
    /// Per-topic results.
    pub topics: Vec<ReassignmentTopicResult>,
}
5003
/// Per-topic entry of an [`AlterReassignmentsResult`].
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ReassignmentTopicResult {
    /// Topic name.
    pub name: String,
    /// Per-partition results.
    pub partitions: Vec<ReassignmentPartitionResult>,
}
5013
/// Per-partition entry of a [`ReassignmentTopicResult`].
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ReassignmentPartitionResult {
    /// Partition index.
    pub partition_index: i32,
    /// Error text, if any (presumably `None` on success — confirm).
    pub error: Option<String>,
}
5023
/// Reassignment information for one topic.
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct PartitionReassignmentInfo {
    /// Topic name.
    pub name: String,
    /// Per-partition reassignment details.
    pub partitions: Vec<PartitionReassignmentPartitionInfo>,
}
5033
/// Reassignment details for one partition.
///
/// NOTE(review): populated outside this view; field meanings are inferred
/// from their names — confirm against the producing method.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct PartitionReassignmentPartitionInfo {
    /// Partition index.
    pub partition_index: i32,
    /// Replica broker ids.
    pub replicas: Vec<i32>,
    /// Replica broker ids being added.
    pub adding_replicas: Vec<i32>,
    /// Replica broker ids being removed.
    pub removing_replicas: Vec<i32>,
}
5047
/// Per-broker, per-topic result of an alter-replica-log-dirs request.
///
/// NOTE(review): populated outside this view; field meanings are inferred
/// from their names — confirm against the producing method.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDirsResult {
    /// Id of the broker this result refers to.
    pub broker_id: i32,
    /// Topic name.
    pub topic_name: String,
    /// Per-partition results.
    pub partitions: Vec<AlterReplicaLogDirsPartitionResult>,
}
5063
/// Per-partition entry of an [`AlterReplicaLogDirsResult`].
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDirsPartitionResult {
    /// Partition index.
    pub partition_index: i32,
    /// Error text, if any (presumably `None` on success — confirm).
    pub error: Option<String>,
}
5073
/// Result of an offset-delete request.
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct OffsetDeleteResult {
    /// Top-level error text, if any (presumably `None` on success — confirm).
    pub error: Option<String>,
    /// Per-topic results.
    pub topics: Vec<OffsetDeleteTopicResult>,
}
5083
/// Per-topic entry of an [`OffsetDeleteResult`].
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct OffsetDeleteTopicResult {
    /// Topic name.
    pub name: String,
    /// Per-partition results.
    pub partitions: Vec<OffsetDeletePartitionResult>,
}
5093
/// Per-partition entry of an [`OffsetDeleteTopicResult`].
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct OffsetDeletePartitionResult {
    /// Partition index.
    pub partition_index: i32,
    /// Error text, if any (presumably `None` on success — confirm).
    pub error: Option<String>,
}
5103
/// Result of a describe-user-SCRAM-credentials request.
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeUserScramCredentialsResult {
    /// Top-level error text, if any (presumably `None` on success — confirm).
    pub error: Option<String>,
    /// Per-user results.
    pub users: Vec<ScramCredentialUserResult>,
}
5113
/// Per-user entry of a [`DescribeUserScramCredentialsResult`].
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ScramCredentialUserResult {
    /// User name.
    pub name: String,
    /// Error text for this user, if any (presumably `None` on success — confirm).
    pub error: Option<String>,
    /// Credential entries for this user.
    pub credential_infos: Vec<ScramCredentialInfoResult>,
}
5125
/// One SCRAM credential entry of a [`ScramCredentialUserResult`].
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ScramCredentialInfoResult {
    /// SCRAM mechanism of the credential.
    pub mechanism: ScramMechanism,
    /// Iteration count of the credential.
    pub iterations: i32,
}
5135
/// Per-user outcome of an alter-user-SCRAM-credentials request.
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterScramCredentialResult {
    /// User name.
    pub user: String,
    /// Error text, if any (presumably `None` on success — confirm).
    pub error: Option<String>,
}
5145
/// Per-topic result of a describe-producers request.
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeProducersTopicResult {
    /// Topic name.
    pub name: String,
    /// Per-partition producer information.
    pub partitions: Vec<DescribeProducersPartitionInfo>,
}
5155
/// Per-partition entry of a [`DescribeProducersTopicResult`].
///
/// NOTE(review): populated outside this view — confirm against the producer.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeProducersPartitionInfo {
    /// Partition index.
    pub partition_index: i32,
    /// Error text, if any (presumably `None` on success — confirm).
    pub error: Option<String>,
    /// Producer states reported for this partition.
    pub active_producers: Vec<ProducerStateInfo>,
}
5167
/// State of one producer on a partition.
///
/// NOTE(review): populated outside this view; field meanings are inferred
/// from their names — confirm against the producing method.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ProducerStateInfo {
    /// Producer id.
    pub producer_id: i64,
    /// Producer epoch.
    pub producer_epoch: i32,
    /// Last sequence number.
    pub last_sequence: i32,
    /// Last timestamp (presumably epoch milliseconds — confirm).
    pub last_timestamp: i64,
    /// Coordinator epoch.
    pub coordinator_epoch: i32,
    /// Start offset of the current transaction.
    pub current_txn_start_offset: i64,
}
5185
/// Description of one transaction, built from a DescribeTransactions
/// response entry.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TransactionDescription {
    /// Transactional id, copied from the response entry.
    pub transactional_id: String,
    /// `None` when the entry's error code is OK, otherwise its `Debug` rendering.
    pub error: Option<String>,
    /// Transaction state string as reported in the response.
    pub state: String,
    /// Transaction timeout in milliseconds, as reported in the response.
    pub timeout_ms: i32,
    /// Transaction start time in milliseconds, as reported in the response.
    pub start_time_ms: i64,
    /// Producer id, copied from the response entry.
    pub producer_id: i64,
    /// Producer epoch, copied from the response entry.
    pub producer_epoch: i16,
    /// Topic partitions listed for the transaction in the response.
    pub topics: Vec<TransactionTopicInfo>,
}
5207
/// One topic and its partitions within a [`TransactionDescription`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TransactionTopicInfo {
    /// Topic name.
    pub topic: String,
    /// Partition indexes of this topic listed for the transaction.
    pub partitions: Vec<i32>,
}
5217
/// Merged result of a ListTransactions call across brokers.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ListTransactionsResult {
    /// Most recent error recorded while querying brokers, if any.
    pub error: Option<String>,
    /// Unknown state filters reported by brokers, de-duplicated in first-seen order.
    pub unknown_state_filters: Vec<String>,
    /// Transaction entries from all brokers that answered, concatenated.
    pub transactions: Vec<TransactionListEntry>,
}
5229
/// One transaction entry in a [`ListTransactionsResult`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TransactionListEntry {
    /// Transactional id, copied from the response entry.
    pub transactional_id: String,
    /// Producer id, copied from the response entry.
    pub producer_id: i64,
    /// Transaction state string as reported in the response.
    pub state: String,
}
5241
/// Per-partition outcome of a WriteTxnMarkers request.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct WriteTxnMarkersPartitionResult {
    /// Partition index, copied from the response.
    pub partition_index: i32,
    /// `None` when the partition's error code is OK, otherwise its `Debug` rendering.
    pub error: Option<String>,
}
5251
/// Per-topic outcome of a WriteTxnMarkers request.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct WriteTxnMarkersTopicResult {
    /// Topic name, copied from the response.
    pub name: String,
    /// Outcomes for the partitions of this topic.
    pub partitions: Vec<WriteTxnMarkersPartitionResult>,
}
5261
/// Outcome for one marker entry of a WriteTxnMarkers response.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct WriteTxnMarkersResult {
    /// Producer id the marker entry refers to, copied from the response.
    pub producer_id: i64,
    /// Per-topic outcomes for this marker.
    pub topics: Vec<WriteTxnMarkersTopicResult>,
}
5271
/// State of one replica (voter or observer) in a DescribeQuorum response.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuorumReplicaInfo {
    /// Replica id, copied from the response.
    pub replica_id: i32,
    /// Log end offset of the replica, copied from the response.
    pub log_end_offset: i64,
}
5281
/// Quorum state of one partition from a DescribeQuorum response.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuorumPartitionResult {
    /// Partition index, copied from the response.
    pub partition_index: i32,
    /// `None` when the partition's error code is OK, otherwise its `Debug` rendering.
    pub error: Option<String>,
    /// Leader id, copied from the response.
    pub leader_id: i32,
    /// Leader epoch, copied from the response.
    pub leader_epoch: i32,
    /// High watermark, copied from the response.
    pub high_watermark: i64,
    /// Replicas listed as current voters in the response.
    pub current_voters: Vec<QuorumReplicaInfo>,
    /// Replicas listed as observers in the response.
    pub observers: Vec<QuorumReplicaInfo>,
}
5301
/// Per-topic entry of a [`DescribeQuorumResult`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuorumTopicResult {
    /// Topic name, copied from the response.
    pub topic_name: String,
    /// Quorum state of the topic's partitions.
    pub partitions: Vec<QuorumPartitionResult>,
}
5311
/// Result of a DescribeQuorum request.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeQuorumResult {
    /// `None` when the top-level error code is OK, otherwise its `Debug` rendering.
    pub error: Option<String>,
    /// Per-topic quorum state.
    pub topics: Vec<QuorumTopicResult>,
}
5321
/// Builder for [`AdminClient`].
///
/// Starts from `AdminConfig::default()` (via the derived `Default`) and is
/// consumed by `build()`.
#[must_use = "builders do nothing until .build() is called"]
#[derive(Debug, Default)]
pub struct AdminClientBuilder {
    /// Configuration accumulated by the builder methods.
    config: AdminConfig,
}
5328
5329impl AdminClientBuilder {
5330 pub fn bootstrap_servers(mut self, servers: impl Into<String>) -> Self {
5332 self.config.bootstrap_servers = servers.into();
5333 self
5334 }
5335
5336 pub fn client_id(mut self, id: impl Into<String>) -> Self {
5338 self.config.client_id = id.into();
5339 self
5340 }
5341
5342 pub fn request_timeout(mut self, timeout: Duration) -> Self {
5344 self.config.request_timeout = timeout;
5345 self
5346 }
5347
5348 pub fn auth(mut self, auth: AuthConfig) -> Self {
5363 self.config.auth = Some(auth);
5364 self
5365 }
5366
5367 #[cfg(feature = "socks5")]
5371 pub fn proxy(mut self, proxy: crate::network::ProxyConfig) -> Self {
5372 self.config.proxy = Some(proxy);
5373 self
5374 }
5375
5376 pub fn sasl_plain(
5378 mut self,
5379 username: impl Into<String>,
5380 password: impl Into<String>,
5381 ) -> crate::Result<Self> {
5382 self.config.auth = Some(AuthConfig::sasl_plain(username, password)?);
5383 Ok(self)
5384 }
5385
5386 pub fn sasl_scram_sha256(
5388 mut self,
5389 username: impl Into<String>,
5390 password: impl Into<String>,
5391 ) -> Self {
5392 self.config.auth = Some(AuthConfig::sasl_scram_sha256(username, password));
5393 self
5394 }
5395
5396 pub fn sasl_scram_sha512(
5398 mut self,
5399 username: impl Into<String>,
5400 password: impl Into<String>,
5401 ) -> Self {
5402 self.config.auth = Some(AuthConfig::sasl_scram_sha512(username, password));
5403 self
5404 }
5405
5406 pub fn sasl_oauthbearer(mut self, token: impl Into<String>) -> Self {
5411 self.config.auth = Some(AuthConfig::sasl_oauthbearer(token));
5412 self
5413 }
5414
5415 pub fn sasl_oauthbearer_provider(
5420 mut self,
5421 provider: impl crate::auth::OAuthBearerTokenProvider + 'static,
5422 ) -> Self {
5423 self.config.auth = Some(AuthConfig::sasl_oauthbearer_provider(provider));
5424 self
5425 }
5426
5427 pub async fn build(self) -> Result<AdminClient> {
5429 if self.config.bootstrap_servers.is_empty() {
5430 return Err(KrafkaError::config("bootstrap.servers is required"));
5431 }
5432
5433 let bootstrap_servers =
5434 crate::util::parse_bootstrap_servers(&self.config.bootstrap_servers)?;
5435
5436 let mut conn_config_builder = ConnectionConfig::builder()
5438 .client_id(&self.config.client_id)
5439 .request_timeout(self.config.request_timeout);
5440
5441 if let Some(ref auth) = self.config.auth {
5442 conn_config_builder = conn_config_builder.auth(auth.clone());
5443 }
5444
5445 #[cfg(feature = "socks5")]
5446 if let Some(ref proxy) = self.config.proxy {
5447 conn_config_builder = conn_config_builder.proxy(proxy.clone());
5448 }
5449
5450 let mut conn_config = conn_config_builder.build();
5451 conn_config.init_tls().await?;
5452
5453 let pool = Arc::new(ConnectionPool::new(conn_config));
5454 pool.start_idle_evictor();
5455 let metadata = Arc::new(
5456 ClusterMetadata::new(bootstrap_servers, pool.clone(), Duration::from_secs(300))
5457 .with_recovery_strategy(self.config.metadata_recovery_strategy)
5458 .with_rebootstrap_trigger(self.config.metadata_recovery_rebootstrap_trigger),
5459 );
5460
5461 metadata.refresh().await?;
5462
5463 info!(
5464 "AdminClient initialized with auth: {}",
5465 if self.config.auth.is_some() {
5466 "configured"
5467 } else {
5468 "none"
5469 }
5470 );
5471
5472 Ok(AdminClient {
5473 config: self.config,
5474 metadata,
5475 pool,
5476 closed: std::sync::atomic::AtomicBool::new(false),
5477 })
5478 }
5479}
5480
// Unit tests for the admin types and builder; none of them opens a network
// connection (no builder is ever `.build()`-ed here).
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    /// `NewTopic::new` plus `with_config` stores name, counts and configs.
    #[test]
    fn test_new_topic() {
        let topic = NewTopic::new("test-topic", 3, 2)
            .unwrap()
            .with_config("cleanup.policy", "compact")
            .with_config("retention.ms", "86400000");

        assert_eq!(topic.name, "test-topic");
        assert_eq!(topic.num_partitions, 3);
        assert_eq!(topic.replication_factor, 2);
        assert_eq!(topic.configs.len(), 2);
    }

    /// Positive values and -1 are accepted for both counts; 0 and values
    /// below -1 are rejected.
    #[test]
    fn test_new_topic_validation() {
        assert!(NewTopic::new("t", 1, 1).is_ok());
        assert!(NewTopic::new("t", -1, -1).is_ok());
        assert!(NewTopic::new("t", 0, 1).is_err());
        assert!(NewTopic::new("t", -2, 1).is_err());
        assert!(NewTopic::new("t", 1, 0).is_err());
        assert!(NewTopic::new("t", 1, -2).is_err());
    }

    /// Empty names and names longer than `i16::MAX` bytes are rejected with
    /// distinct messages; a name of exactly `i16::MAX` bytes is accepted.
    #[test]
    fn test_new_topic_name_validation_rejects_empty_and_oversize() {
        let empty = NewTopic::new("", 1, 1).unwrap_err().to_string();
        assert!(
            empty.contains("topic name cannot be empty"),
            "expected empty-name error, got: {empty}"
        );

        let oversize = "x".repeat(i16::MAX as usize + 1);
        let err = NewTopic::new(oversize, 1, 1).unwrap_err().to_string();
        assert!(
            err.contains("exceeds protocol limit"),
            "expected protocol-limit error, got: {err}"
        );

        // Boundary: the largest accepted length.
        let max_ok = "x".repeat(i16::MAX as usize);
        assert!(NewTopic::new(max_ok, 1, 1).is_ok());
    }

    /// Pins the default client id, request timeout and recovery strategy.
    #[test]
    fn test_admin_config_default() {
        let config = AdminConfig::default();
        assert_eq!(config.client_id, "krafka-admin");
        assert_eq!(config.request_timeout, Duration::from_secs(30));
        assert_eq!(
            config.metadata_recovery_strategy,
            MetadataRecoveryStrategy::Rebootstrap
        );
    }

    /// `DescribeAclsResult` can be constructed and read back.
    #[test]
    fn test_describe_acls_result() {
        let result = DescribeAclsResult {
            error: None,
            bindings: vec![
                AclBinding::allow_read_topic("my-topic", "User:alice"),
                AclBinding::allow_write_topic("my-topic", "User:bob"),
            ],
        };
        assert!(result.error.is_none());
        assert_eq!(result.bindings.len(), 2);
    }

    /// `CreateAclsResult` keeps per-entry success and failure separate.
    #[test]
    fn test_create_acls_result() {
        let result = CreateAclsResult {
            results: vec![
                CreateAclResult { error: None },
                CreateAclResult {
                    error: Some("ACL already exists".to_string()),
                },
            ],
        };
        assert!(result.results[0].error.is_none());
        assert!(result.results[1].error.is_some());
    }

    /// `DeleteAclsResult` keeps per-filter deletion counts.
    #[test]
    fn test_delete_acls_result() {
        let result = DeleteAclsResult {
            filter_results: vec![
                DeleteAclFilterResult {
                    error: None,
                    deleted_count: 3,
                },
                DeleteAclFilterResult {
                    error: None,
                    deleted_count: 0,
                },
            ],
        };
        assert_eq!(result.filter_results[0].deleted_count, 3);
        assert_eq!(result.filter_results[1].deleted_count, 0);
    }

    /// Covers the `AclFilter` constructors and the chained setters.
    #[test]
    fn test_acl_filter_builder() {
        use crate::protocol::{AclOperation, AclPatternType, AclPermissionType, AclResourceType};

        // `all()` matches anything: every enum is `Any`, every string unset.
        let filter = AclFilter::all();
        assert_eq!(filter.resource_type, AclResourceType::Any);
        assert_eq!(filter.pattern_type, AclPatternType::Any);
        assert_eq!(filter.operation, AclOperation::Any);
        assert_eq!(filter.permission_type, AclPermissionType::Any);
        assert!(filter.resource_name.is_none());
        assert!(filter.principal.is_none());
        assert!(filter.host.is_none());

        // `for_resource` sets the resource type and name.
        let filter = AclFilter::for_resource(AclResourceType::Topic, "my-topic");
        assert_eq!(filter.resource_type, AclResourceType::Topic);
        assert_eq!(filter.resource_name, Some("my-topic".to_string()));

        // `for_principal` sets the principal.
        let filter = AclFilter::for_principal("User:alice");
        assert_eq!(filter.principal, Some("User:alice".to_string()));

        // Every setter, chained from `all()`.
        let filter = AclFilter::all()
            .resource_type(AclResourceType::Group)
            .resource_name("my-group")
            .pattern_type(AclPatternType::Literal)
            .principal("User:bob")
            .host("localhost")
            .operation(AclOperation::Read)
            .permission_type(AclPermissionType::Allow);

        assert_eq!(filter.resource_type, AclResourceType::Group);
        assert_eq!(filter.resource_name, Some("my-group".to_string()));
        assert_eq!(filter.pattern_type, AclPatternType::Literal);
        assert_eq!(filter.principal, Some("User:bob".to_string()));
        assert_eq!(filter.host, Some("localhost".to_string()));
        assert_eq!(filter.operation, AclOperation::Read);
        assert_eq!(filter.permission_type, AclPermissionType::Allow);
    }

    /// `auth()` stores the supplied `AuthConfig` in the builder.
    #[test]
    fn test_admin_builder_with_auth() {
        use crate::auth::AuthConfig;

        let builder = AdminClient::builder()
            .bootstrap_servers("broker:9093")
            .auth(AuthConfig::sasl_plain("user", "pass").unwrap());

        let auth = builder.config.auth.as_ref().unwrap();
        assert!(auth.requires_sasl());
        assert!(!auth.requires_tls());
        assert!(auth.plain_credentials.is_some());
    }

    /// `sasl_plain()` yields SASL_PLAINTEXT with the PLAIN mechanism.
    #[test]
    fn test_admin_builder_sasl_plain() {
        let builder = AdminClient::builder()
            .bootstrap_servers("broker:9093")
            .sasl_plain("admin", "admin-secret")
            .unwrap();

        let auth = builder.config.auth.as_ref().unwrap();
        assert_eq!(
            auth.security_protocol,
            crate::auth::SecurityProtocol::SaslPlaintext
        );
        assert_eq!(auth.sasl_mechanism, Some(crate::auth::SaslMechanism::Plain));
        let creds = auth.plain_credentials.as_ref().unwrap();
        assert_eq!(creds.username, "admin");
    }

    /// Both SCRAM shortcuts select their mechanism and store credentials.
    #[test]
    fn test_admin_builder_sasl_scram() {
        let builder = AdminClient::builder()
            .bootstrap_servers("broker:9093")
            .sasl_scram_sha256("user", "pass");

        let auth = builder.config.auth.as_ref().unwrap();
        assert_eq!(
            auth.sasl_mechanism,
            Some(crate::auth::SaslMechanism::ScramSha256)
        );
        assert!(auth.scram_credentials.is_some());

        let builder = AdminClient::builder()
            .bootstrap_servers("broker:9093")
            .sasl_scram_sha512("user", "pass");

        let auth = builder.config.auth.as_ref().unwrap();
        assert_eq!(
            auth.sasl_mechanism,
            Some(crate::auth::SaslMechanism::ScramSha512)
        );
        assert!(auth.scram_credentials.is_some());
    }

    /// An AWS MSK IAM `AuthConfig` requires TLS and SASL and carries its
    /// credentials and a TLS config.
    #[test]
    fn test_admin_builder_aws_msk_iam() {
        use crate::auth::AuthConfig;

        let auth = AuthConfig::aws_msk_iam("AKID", "secret", "us-east-1");
        let builder = AdminClient::builder()
            .bootstrap_servers("broker:9094")
            .auth(auth);

        let auth = builder.config.auth.as_ref().unwrap();
        assert!(auth.requires_tls());
        assert!(auth.requires_sasl());
        assert_eq!(
            auth.sasl_mechanism,
            Some(crate::auth::SaslMechanism::AwsMskIam)
        );
        assert!(auth.aws_msk_iam_credentials.is_some());
        assert!(auth.tls_config.is_some());
    }

    /// Without any auth call the builder has no `AuthConfig`.
    #[test]
    fn test_admin_builder_no_auth_by_default() {
        let builder = AdminClient::builder().bootstrap_servers("broker:9092");

        assert!(builder.config.auth.is_none());
    }

    /// `ConsumerGroupDescription` can be constructed with two members and
    /// read back.
    #[test]
    fn test_consumer_group_description() {
        let desc = ConsumerGroupDescription {
            group_id: "my-group".to_string(),
            group_type: GroupType::Classic,
            state: "Stable".to_string(),
            protocol_type: Some("consumer".to_string()),
            assignor: Some("range".to_string()),
            group_epoch: None,
            assignment_epoch: None,
            members: vec![
                ConsumerGroupMember {
                    member_id: "member-1".to_string(),
                    instance_id: Some("instance-1".to_string()),
                    rack_id: None,
                    member_epoch: None,
                    client_id: "my-client".to_string(),
                    client_host: "/127.0.0.1".to_string(),
                    subscribed_topic_names: None,
                    subscribed_topic_regex: None,
                    assignment: None,
                    target_assignment: None,
                    member_type: None,
                },
                ConsumerGroupMember {
                    member_id: "member-2".to_string(),
                    instance_id: None,
                    rack_id: None,
                    member_epoch: None,
                    client_id: "other-client".to_string(),
                    client_host: "/192.168.1.1".to_string(),
                    subscribed_topic_names: None,
                    subscribed_topic_regex: None,
                    assignment: None,
                    target_assignment: None,
                    member_type: None,
                },
            ],
            authorized_operations: None,
            error: None,
        };
        assert_eq!(desc.group_id, "my-group");
        assert_eq!(desc.group_type, GroupType::Classic);
        assert_eq!(desc.state, "Stable");
        assert_eq!(desc.members.len(), 2);
        assert!(desc.members[0].instance_id.is_some());
        assert!(desc.members[1].instance_id.is_none());
        assert!(desc.error.is_none());
    }

    /// `ConsumerGroupListing` can be constructed and read back.
    #[test]
    fn test_consumer_group_listing() {
        let listing = ConsumerGroupListing {
            group_id: "my-group".to_string(),
            protocol_type: "consumer".to_string(),
            group_type: Some(GroupType::Consumer),
        };
        assert_eq!(listing.group_id, "my-group");
        assert_eq!(listing.protocol_type, "consumer");
        assert_eq!(listing.group_type, Some(GroupType::Consumer));
    }

    /// `DeleteRecordResult` in both its success and error shapes.
    #[test]
    fn test_delete_record_result() {
        let result = DeleteRecordResult {
            topic: "my-topic".to_string(),
            partition: 0,
            low_watermark: 100,
            error: None,
        };
        assert_eq!(result.topic, "my-topic");
        assert_eq!(result.partition, 0);
        assert_eq!(result.low_watermark, 100);
        assert!(result.error.is_none());

        let result_err = DeleteRecordResult {
            topic: "my-topic".to_string(),
            partition: 1,
            low_watermark: -1,
            error: Some("NotLeaderOrFollower".to_string()),
        };
        assert!(result_err.error.is_some());
    }

    /// `LeaderEpochResult` can be constructed and read back.
    #[test]
    fn test_leader_epoch_result() {
        let result = LeaderEpochResult {
            topic: "my-topic".to_string(),
            partition: 0,
            leader_epoch: 5,
            end_offset: 1000,
            error: None,
        };
        assert_eq!(result.topic, "my-topic");
        assert_eq!(result.leader_epoch, 5);
        assert_eq!(result.end_offset, 1000);
        assert!(result.error.is_none());
    }

    /// Compile-time check that `AdminClient` is `Send + Sync`.
    #[test]
    fn test_admin_client_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<AdminClient>();
    }

    /// With the `socks5` feature, a proxy set on the `AdminConfig` builder is
    /// readable from the built config.
    #[cfg(feature = "socks5")]
    #[test]
    fn test_admin_config_builder_proxy_round_trip() {
        let config = AdminConfig::builder()
            .proxy(crate::network::ProxyConfig::new("proxy:1080"))
            .build();
        let proxy = config.proxy().expect("proxy should be set");
        assert_eq!(proxy.address(), "proxy:1080");
    }
}