1use std::collections::{HashMap, HashSet};
35use std::sync::Arc;
36use std::time::Duration;
37
38use tracing::{info, warn};
39
40use crate::auth::AuthConfig;
41use crate::error::{KrafkaError, Result};
42use crate::metadata::{BrokerInfo, ClusterMetadata, TopicInfo};
43use crate::network::{ConnectionConfig, ConnectionPool};
44
45use crate::protocol::{
46 AclBinding, AclBindingFilter, AclOperation, AclPatternType, AclPermissionType, AclResourceType,
47 AlterConfigsRequest, AlterConfigsResponse, ApiKey, CreatableTopic, CreatableTopicConfig,
48 CreateAclsRequest, CreateAclsResponse, CreatePartitionsRequest, CreatePartitionsResponse,
49 CreatePartitionsTopic, CreateTopicsRequest, CreateTopicsResponse, DeleteAclsRequest,
50 DeleteAclsResponse, DeleteRecordsPartition, DeleteRecordsRequest, DeleteRecordsResponse,
51 DeleteRecordsTopic, DeleteTopicsRequest, DeleteTopicsResponse, DescribeAclsRequest,
52 DescribeAclsResponse, DescribeConfigsRequest, DescribeConfigsResponse, DescribeGroupsRequest,
53 DescribeGroupsResponse, FindCoordinatorRequest, FindCoordinatorResponse, ListGroupsRequest,
54 ListGroupsResponse, OffsetForLeaderEpochPartition, OffsetForLeaderEpochRequest,
55 OffsetForLeaderEpochResponse, OffsetForLeaderEpochTopic,
56};
57
58#[derive(Debug, Clone)]
60pub struct NewTopic {
61 pub name: String,
63 pub num_partitions: i32,
65 pub replication_factor: i16,
67 pub configs: HashMap<String, String>,
69}
70
71impl NewTopic {
72 pub fn new(name: impl Into<String>, num_partitions: i32, replication_factor: i16) -> Self {
74 Self {
75 name: name.into(),
76 num_partitions,
77 replication_factor,
78 configs: HashMap::new(),
79 }
80 }
81
82 pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
84 self.configs.insert(key.into(), value.into());
85 self
86 }
87}
88
89#[derive(Debug, Clone)]
91pub struct CreateTopicResult {
92 pub name: String,
94 pub error: Option<String>,
96}
97
98#[derive(Debug, Clone)]
100pub struct DeleteTopicResult {
101 pub name: String,
103 pub error: Option<String>,
105}
106
107#[derive(Debug, Clone)]
109pub struct CreatePartitionsResult {
110 pub topic: String,
112 pub error: Option<String>,
114}
115
116#[derive(Debug, Clone)]
118pub struct ConfigEntry {
119 pub name: String,
121 pub value: Option<String>,
123 pub read_only: bool,
125 pub is_default: bool,
127 pub is_sensitive: bool,
129}
130
131#[derive(Debug, Clone)]
133pub struct AlterConfigResult {
134 pub resource_name: String,
136 pub error: Option<String>,
138}
139
140#[derive(Debug, Clone)]
142pub struct DescribeAclsResult {
143 pub error: Option<String>,
145 pub bindings: Vec<AclBinding>,
147}
148
149#[derive(Debug, Clone)]
151pub struct CreateAclsResult {
152 pub results: Vec<CreateAclResult>,
154}
155
156#[derive(Debug, Clone)]
158pub struct CreateAclResult {
159 pub error: Option<String>,
161}
162
163#[derive(Debug, Clone)]
165pub struct DeleteAclsResult {
166 pub filter_results: Vec<DeleteAclFilterResult>,
168}
169
170#[derive(Debug, Clone)]
172pub struct DeleteAclFilterResult {
173 pub error: Option<String>,
175 pub deleted_count: usize,
177}
178
179#[derive(Debug, Clone)]
181pub struct ConsumerGroupDescription {
182 pub group_id: String,
184 pub state: String,
186 pub protocol_type: String,
188 pub protocol: String,
190 pub members: Vec<ConsumerGroupMember>,
192 pub error: Option<String>,
194}
195
196#[derive(Debug, Clone)]
198pub struct ConsumerGroupMember {
199 pub member_id: String,
201 pub group_instance_id: Option<String>,
203 pub client_id: String,
205 pub client_host: String,
207}
208
209#[derive(Debug, Clone)]
211pub struct ConsumerGroupListing {
212 pub group_id: String,
214 pub protocol_type: String,
216}
217
218#[derive(Debug, Clone)]
220pub struct DeleteRecordResult {
221 pub topic: String,
223 pub partition: i32,
225 pub low_watermark: i64,
227 pub error: Option<String>,
229}
230
231#[derive(Debug, Clone)]
233pub struct LeaderEpochResult {
234 pub topic: String,
236 pub partition: i32,
238 pub leader_epoch: i32,
240 pub end_offset: i64,
242 pub error: Option<String>,
244}
245
246#[derive(Debug, Clone, Default)]
250pub struct AclFilter {
251 pub resource_type: AclResourceType,
253 pub resource_name: Option<String>,
255 pub pattern_type: AclPatternType,
257 pub principal: Option<String>,
259 pub host: Option<String>,
261 pub operation: AclOperation,
263 pub permission_type: AclPermissionType,
265}
266
267impl AclFilter {
268 pub fn all() -> Self {
270 Self::default()
271 }
272
273 pub fn for_resource(resource_type: AclResourceType, resource_name: impl Into<String>) -> Self {
275 Self {
276 resource_type,
277 resource_name: Some(resource_name.into()),
278 ..Default::default()
279 }
280 }
281
282 pub fn for_principal(principal: impl Into<String>) -> Self {
284 Self {
285 principal: Some(principal.into()),
286 ..Default::default()
287 }
288 }
289
290 pub fn resource_type(mut self, resource_type: AclResourceType) -> Self {
292 self.resource_type = resource_type;
293 self
294 }
295
296 pub fn resource_name(mut self, name: impl Into<String>) -> Self {
298 self.resource_name = Some(name.into());
299 self
300 }
301
302 pub fn pattern_type(mut self, pattern_type: AclPatternType) -> Self {
304 self.pattern_type = pattern_type;
305 self
306 }
307
308 pub fn principal(mut self, principal: impl Into<String>) -> Self {
310 self.principal = Some(principal.into());
311 self
312 }
313
314 pub fn host(mut self, host: impl Into<String>) -> Self {
316 self.host = Some(host.into());
317 self
318 }
319
320 pub fn operation(mut self, operation: AclOperation) -> Self {
322 self.operation = operation;
323 self
324 }
325
326 pub fn permission_type(mut self, permission_type: AclPermissionType) -> Self {
328 self.permission_type = permission_type;
329 self
330 }
331}
332
333#[derive(Debug, Clone)]
335pub struct AdminConfig {
336 pub bootstrap_servers: String,
338 pub client_id: String,
340 pub request_timeout: Duration,
342 pub auth: Option<AuthConfig>,
344}
345
346impl Default for AdminConfig {
347 fn default() -> Self {
348 Self {
349 bootstrap_servers: String::new(),
350 client_id: "krafka-admin".to_string(),
351 request_timeout: Duration::from_secs(30),
352 auth: None,
353 }
354 }
355}
356
357pub struct AdminClient {
359 config: AdminConfig,
361 metadata: Arc<ClusterMetadata>,
363 pool: Arc<ConnectionPool>,
365}
366
367impl AdminClient {
368 pub fn builder() -> AdminClientBuilder {
370 AdminClientBuilder::default()
371 }
372
373 pub async fn create_topics(
375 &self,
376 topics: Vec<NewTopic>,
377 timeout: Duration,
378 ) -> Result<Vec<CreateTopicResult>> {
379 let brokers = self.metadata.brokers().await;
381 if brokers.is_empty() {
382 return Err(KrafkaError::broker(
383 crate::error::ErrorCode::UnknownServerError,
384 "no brokers available",
385 ));
386 }
387
388 let broker = &brokers[0];
389 let conn = self
390 .pool
391 .get_connection_by_id(broker.id, &broker.address())
392 .await?;
393
394 let request = CreateTopicsRequest {
396 topics: topics
397 .iter()
398 .map(|t| CreatableTopic {
399 name: t.name.clone(),
400 num_partitions: t.num_partitions,
401 replication_factor: t.replication_factor,
402 assignments: Vec::new(),
403 configs: t
404 .configs
405 .iter()
406 .map(|(k, v)| CreatableTopicConfig {
407 name: k.clone(),
408 value: Some(v.clone()),
409 })
410 .collect(),
411 })
412 .collect(),
413 timeout_ms: crate::util::duration_to_millis_i32(timeout),
414 validate_only: false,
415 };
416
417 let response_bytes = conn
419 .send_request(ApiKey::CreateTopics, 0, |buf| {
420 request.encode_v0(buf);
421 })
422 .await?;
423
424 let mut buf = response_bytes;
426 let response = CreateTopicsResponse::decode_v0(&mut buf)?;
427
428 let results = response
430 .topics
431 .into_iter()
432 .map(|t| CreateTopicResult {
433 name: t.name,
434 error: if t.error_code.is_ok() {
435 None
436 } else {
437 Some(format!("{:?}", t.error_code))
438 },
439 })
440 .collect();
441
442 info!("Created {} topics", topics.len());
443 Ok(results)
444 }
445
446 pub async fn delete_topics(
448 &self,
449 topics: Vec<String>,
450 timeout: Duration,
451 ) -> Result<Vec<DeleteTopicResult>> {
452 let brokers = self.metadata.brokers().await;
454 if brokers.is_empty() {
455 return Err(KrafkaError::broker(
456 crate::error::ErrorCode::UnknownServerError,
457 "no brokers available",
458 ));
459 }
460
461 let broker = &brokers[0];
462 let conn = self
463 .pool
464 .get_connection_by_id(broker.id, &broker.address())
465 .await?;
466
467 let request = DeleteTopicsRequest {
469 topic_names: topics.clone(),
470 timeout_ms: crate::util::duration_to_millis_i32(timeout),
471 };
472
473 let response_bytes = conn
475 .send_request(ApiKey::DeleteTopics, 0, |buf| {
476 request.encode_v0(buf);
477 })
478 .await?;
479
480 let mut buf = response_bytes;
482 let response = DeleteTopicsResponse::decode_v0(&mut buf)?;
483
484 let results = response
486 .responses
487 .into_iter()
488 .map(|r| DeleteTopicResult {
489 name: r.name.unwrap_or_default(),
490 error: if r.error_code.is_ok() {
491 None
492 } else {
493 Some(format!("{:?}", r.error_code))
494 },
495 })
496 .collect();
497
498 info!("Deleted {} topics", topics.len());
499 Ok(results)
500 }
501
502 pub async fn create_partitions(
506 &self,
507 topic: impl Into<String>,
508 new_total_count: i32,
509 timeout: Duration,
510 ) -> Result<CreatePartitionsResult> {
511 let topic_name = topic.into();
512
513 let brokers = self.metadata.brokers().await;
515 if brokers.is_empty() {
516 return Err(KrafkaError::broker(
517 crate::error::ErrorCode::UnknownServerError,
518 "no brokers available",
519 ));
520 }
521
522 let broker = &brokers[0];
523 let conn = self
524 .pool
525 .get_connection_by_id(broker.id, &broker.address())
526 .await?;
527
528 let request = CreatePartitionsRequest {
530 topics: vec![CreatePartitionsTopic {
531 name: topic_name.clone(),
532 count: new_total_count,
533 assignments: None,
534 }],
535 timeout_ms: crate::util::duration_to_millis_i32(timeout),
536 validate_only: false,
537 };
538
539 let response_bytes = conn
541 .send_request(ApiKey::CreatePartitions, 0, |buf| {
542 request.encode_v0(buf);
543 })
544 .await?;
545
546 let mut buf = response_bytes;
548 let response = CreatePartitionsResponse::decode_v0(&mut buf)?;
549
550 let result = response
551 .results
552 .into_iter()
553 .next()
554 .map(|r| CreatePartitionsResult {
555 topic: r.name,
556 error: if r.error_code.is_ok() {
557 None
558 } else {
559 Some(
560 r.error_message
561 .unwrap_or_else(|| format!("{:?}", r.error_code)),
562 )
563 },
564 })
565 .unwrap_or(CreatePartitionsResult {
566 topic: topic_name.clone(),
567 error: Some("no response received".to_string()),
568 });
569
570 if result.error.is_none() {
571 info!(
572 "Increased partitions for topic {} to {}",
573 topic_name, new_total_count
574 );
575 }
576 Ok(result)
577 }
578
579 pub async fn describe_topic_config(&self, topic: &str) -> Result<Vec<ConfigEntry>> {
581 let brokers = self.metadata.brokers().await;
582 if brokers.is_empty() {
583 return Err(KrafkaError::broker(
584 crate::error::ErrorCode::UnknownServerError,
585 "no brokers available",
586 ));
587 }
588
589 let broker = &brokers[0];
590 let conn = self
591 .pool
592 .get_connection_by_id(broker.id, &broker.address())
593 .await?;
594
595 let request = DescribeConfigsRequest::for_topic(topic);
596
597 let response_bytes = conn
598 .send_request(ApiKey::DescribeConfigs, 0, |buf| {
599 request.encode_v0(buf);
600 })
601 .await?;
602
603 let mut buf = response_bytes;
604 let response = DescribeConfigsResponse::decode_v0(&mut buf)?;
605
606 let entries = response
607 .results
608 .into_iter()
609 .flat_map(|r| {
610 if !r.error_code.is_ok() {
611 return Vec::new();
612 }
613 r.configs
614 .into_iter()
615 .map(|c| ConfigEntry {
616 name: c.name,
617 value: c.value,
618 read_only: c.read_only,
619 is_default: c.is_default,
620 is_sensitive: c.is_sensitive,
621 })
622 .collect()
623 })
624 .collect();
625
626 Ok(entries)
627 }
628
629 pub async fn describe_broker_config(&self, broker_id: i32) -> Result<Vec<ConfigEntry>> {
631 let brokers = self.metadata.brokers().await;
632 if brokers.is_empty() {
633 return Err(KrafkaError::broker(
634 crate::error::ErrorCode::UnknownServerError,
635 "no brokers available",
636 ));
637 }
638
639 let broker = &brokers[0];
640 let conn = self
641 .pool
642 .get_connection_by_id(broker.id, &broker.address())
643 .await?;
644
645 let request = DescribeConfigsRequest::for_broker(broker_id);
646
647 let response_bytes = conn
648 .send_request(ApiKey::DescribeConfigs, 0, |buf| {
649 request.encode_v0(buf);
650 })
651 .await?;
652
653 let mut buf = response_bytes;
654 let response = DescribeConfigsResponse::decode_v0(&mut buf)?;
655
656 let entries = response
657 .results
658 .into_iter()
659 .flat_map(|r| {
660 if !r.error_code.is_ok() {
661 return Vec::new();
662 }
663 r.configs
664 .into_iter()
665 .map(|c| ConfigEntry {
666 name: c.name,
667 value: c.value,
668 read_only: c.read_only,
669 is_default: c.is_default,
670 is_sensitive: c.is_sensitive,
671 })
672 .collect()
673 })
674 .collect();
675
676 Ok(entries)
677 }
678
679 pub async fn alter_topic_config(
684 &self,
685 topic: &str,
686 configs: HashMap<String, String>,
687 ) -> Result<AlterConfigResult> {
688 let brokers = self.metadata.brokers().await;
689 if brokers.is_empty() {
690 return Err(KrafkaError::broker(
691 crate::error::ErrorCode::UnknownServerError,
692 "no brokers available",
693 ));
694 }
695
696 let broker = &brokers[0];
697 let conn = self
698 .pool
699 .get_connection_by_id(broker.id, &broker.address())
700 .await?;
701
702 let request = AlterConfigsRequest::for_topic(topic, configs.into_iter().collect());
703
704 let response_bytes = conn
705 .send_request(ApiKey::AlterConfigs, 0, |buf| {
706 request.encode_v0(buf);
707 })
708 .await?;
709
710 let mut buf = response_bytes;
711 let response = AlterConfigsResponse::decode_v0(&mut buf)?;
712
713 let result = response
714 .results
715 .into_iter()
716 .next()
717 .map(|r| AlterConfigResult {
718 resource_name: r.resource_name,
719 error: if r.error_code.is_ok() {
720 None
721 } else {
722 Some(
723 r.error_message
724 .unwrap_or_else(|| format!("{:?}", r.error_code)),
725 )
726 },
727 })
728 .unwrap_or(AlterConfigResult {
729 resource_name: topic.to_string(),
730 error: Some("no response received".to_string()),
731 });
732
733 if result.error.is_none() {
734 info!("Altered config for topic {}", topic);
735 }
736 Ok(result)
737 }
738
739 pub async fn list_topics(&self) -> Result<Vec<String>> {
741 self.metadata.refresh().await?;
742 Ok(self
743 .metadata
744 .topics()
745 .await
746 .into_iter()
747 .map(|t| t.name)
748 .collect())
749 }
750
751 pub async fn describe_topics(&self, topics: &[String]) -> Result<Vec<TopicInfo>> {
753 self.metadata.refresh().await?;
754 let all_topics = self.metadata.topics().await;
755
756 let mut result = Vec::new();
757 for topic_name in topics {
758 if let Some(info) = all_topics.iter().find(|t| &t.name == topic_name) {
759 result.push(info.clone());
760 }
761 }
762 Ok(result)
763 }
764
765 pub async fn describe_cluster(&self) -> Result<ClusterDescription> {
767 self.metadata.refresh().await?;
768 let brokers = self.metadata.brokers().await;
769 let controller = self.metadata.controller().await;
770
771 Ok(ClusterDescription {
772 controller_id: controller.map(|c| c.id),
773 brokers,
774 })
775 }
776
777 pub async fn partition_count(&self, topic: &str) -> Result<Option<usize>> {
779 self.metadata.refresh().await?;
780 Ok(self.metadata.partition_count(topic).await)
781 }
782
783 pub fn client_id(&self) -> &str {
785 &self.config.client_id
786 }
787
788 pub fn request_timeout(&self) -> Duration {
790 self.config.request_timeout
791 }
792
793 #[allow(clippy::too_many_arguments)]
818 pub async fn describe_acls(
819 &self,
820 resource_type: AclResourceType,
821 resource_name: Option<&str>,
822 pattern_type: AclPatternType,
823 principal: Option<&str>,
824 host: Option<&str>,
825 operation: AclOperation,
826 permission_type: AclPermissionType,
827 ) -> Result<DescribeAclsResult> {
828 self.describe_acls_with_filter(AclFilter {
829 resource_type,
830 resource_name: resource_name.map(|s| s.to_string()),
831 pattern_type,
832 principal: principal.map(|s| s.to_string()),
833 host: host.map(|s| s.to_string()),
834 operation,
835 permission_type,
836 })
837 .await
838 }
839
840 pub async fn describe_acls_with_filter(&self, filter: AclFilter) -> Result<DescribeAclsResult> {
852 let brokers = self.metadata.brokers().await;
853 if brokers.is_empty() {
854 return Err(KrafkaError::broker(
855 crate::error::ErrorCode::UnknownServerError,
856 "no brokers available",
857 ));
858 }
859
860 let broker = &brokers[0];
861 let conn = self
862 .pool
863 .get_connection_by_id(broker.id, &broker.address())
864 .await?;
865
866 let request = DescribeAclsRequest {
867 resource_type: filter.resource_type,
868 resource_name: filter.resource_name,
869 pattern_type: filter.pattern_type,
870 principal: filter.principal,
871 host: filter.host,
872 operation: filter.operation,
873 permission_type: filter.permission_type,
874 };
875
876 let response_bytes = conn
877 .send_request(ApiKey::DescribeAcls, 0, |buf| {
878 request.encode_v0(buf);
879 })
880 .await?;
881
882 let mut buf = response_bytes;
883 let response = DescribeAclsResponse::decode_v0(&mut buf)?;
884
885 let bindings = response
886 .resources
887 .into_iter()
888 .flat_map(|res| {
889 res.acls.into_iter().map(move |acl| AclBinding {
890 resource_type: res.resource_type,
891 resource_name: res.resource_name.clone(),
892 pattern_type: res.pattern_type,
893 principal: acl.principal,
894 host: acl.host,
895 operation: acl.operation,
896 permission_type: acl.permission_type,
897 })
898 })
899 .collect();
900
901 Ok(DescribeAclsResult {
902 error: if response.error_code.is_ok() {
903 None
904 } else {
905 Some(
906 response
907 .error_message
908 .unwrap_or_else(|| format!("{:?}", response.error_code)),
909 )
910 },
911 bindings,
912 })
913 }
914
915 pub async fn create_acls(&self, acls: Vec<AclBinding>) -> Result<CreateAclsResult> {
926 let brokers = self.metadata.brokers().await;
927 if brokers.is_empty() {
928 return Err(KrafkaError::broker(
929 crate::error::ErrorCode::UnknownServerError,
930 "no brokers available",
931 ));
932 }
933
934 let broker = &brokers[0];
935 let conn = self
936 .pool
937 .get_connection_by_id(broker.id, &broker.address())
938 .await?;
939
940 let request = CreateAclsRequest {
941 creations: acls.clone(),
942 };
943
944 let response_bytes = conn
945 .send_request(ApiKey::CreateAcls, 0, |buf| {
946 request.encode_v0(buf);
947 })
948 .await?;
949
950 let mut buf = response_bytes;
951 let response = CreateAclsResponse::decode_v0(&mut buf)?;
952
953 let results = response
954 .results
955 .into_iter()
956 .map(|r| CreateAclResult {
957 error: if r.error_code.is_ok() {
958 None
959 } else {
960 Some(
961 r.error_message
962 .unwrap_or_else(|| format!("{:?}", r.error_code)),
963 )
964 },
965 })
966 .collect();
967
968 info!("Created {} ACLs", acls.len());
969 Ok(CreateAclsResult { results })
970 }
971
972 pub async fn delete_acls(&self, filters: Vec<AclBindingFilter>) -> Result<DeleteAclsResult> {
992 let brokers = self.metadata.brokers().await;
993 if brokers.is_empty() {
994 return Err(KrafkaError::broker(
995 crate::error::ErrorCode::UnknownServerError,
996 "no brokers available",
997 ));
998 }
999
1000 let broker = &brokers[0];
1001 let conn = self
1002 .pool
1003 .get_connection_by_id(broker.id, &broker.address())
1004 .await?;
1005
1006 let request = DeleteAclsRequest {
1007 filters: filters.clone(),
1008 };
1009
1010 let response_bytes = conn
1011 .send_request(ApiKey::DeleteAcls, 0, |buf| {
1012 request.encode_v0(buf);
1013 })
1014 .await?;
1015
1016 let mut buf = response_bytes;
1017 let response = DeleteAclsResponse::decode_v0(&mut buf)?;
1018
1019 let filter_results = response
1020 .filter_results
1021 .into_iter()
1022 .map(|fr| DeleteAclFilterResult {
1023 error: if fr.error_code.is_ok() {
1024 None
1025 } else {
1026 Some(
1027 fr.error_message
1028 .unwrap_or_else(|| format!("{:?}", fr.error_code)),
1029 )
1030 },
1031 deleted_count: fr.matching_acls.len(),
1032 })
1033 .collect();
1034
1035 info!("Deleted ACLs with {} filters", filters.len());
1036 Ok(DeleteAclsResult { filter_results })
1037 }
1038
1039 pub async fn describe_groups(
1052 &self,
1053 group_ids: Vec<String>,
1054 ) -> Result<Vec<ConsumerGroupDescription>> {
1055 let brokers = self.metadata.brokers().await;
1056 if brokers.is_empty() {
1057 return Err(KrafkaError::broker(
1058 crate::error::ErrorCode::UnknownServerError,
1059 "no brokers available",
1060 ));
1061 }
1062
1063 let mut coordinator_groups: HashMap<i32, Vec<String>> = HashMap::new();
1065 let any_broker = &brokers[0];
1066 let any_conn = self
1067 .pool
1068 .get_connection_by_id(any_broker.id, &any_broker.address())
1069 .await?;
1070
1071 for group_id in &group_ids {
1072 let coord_request = FindCoordinatorRequest::for_group(group_id);
1073 let coord_response_bytes = any_conn
1074 .send_request(ApiKey::FindCoordinator, 1, |buf| {
1075 coord_request.encode_v1(buf);
1076 })
1077 .await?;
1078 let mut coord_buf = coord_response_bytes;
1079 let coord_response = FindCoordinatorResponse::decode_v1(&mut coord_buf)?;
1080
1081 if coord_response.error_code.is_ok() {
1082 coordinator_groups
1083 .entry(coord_response.node_id)
1084 .or_default()
1085 .push(group_id.clone());
1086 } else {
1087 warn!(
1089 "FindCoordinator failed for group '{}': {:?}, falling back to broker {}",
1090 group_id, coord_response.error_code, any_broker.id
1091 );
1092 coordinator_groups
1093 .entry(any_broker.id)
1094 .or_default()
1095 .push(group_id.clone());
1096 }
1097 }
1098
1099 let mut all_results = Vec::new();
1100
1101 for (broker_id, groups) in coordinator_groups {
1102 let broker = brokers
1104 .iter()
1105 .find(|b| b.id == broker_id)
1106 .unwrap_or(any_broker);
1107 let conn = self
1108 .pool
1109 .get_connection_by_id(broker.id, &broker.address())
1110 .await?;
1111
1112 let request = DescribeGroupsRequest {
1113 groups: groups.clone(),
1114 };
1115
1116 let response_bytes = conn
1117 .send_request(ApiKey::DescribeGroups, 0, |buf| {
1118 request.encode_v0(buf);
1119 })
1120 .await?;
1121
1122 let mut buf = response_bytes;
1123 let response = DescribeGroupsResponse::decode_v0(&mut buf)?;
1124
1125 for g in response.groups {
1126 all_results.push(ConsumerGroupDescription {
1127 group_id: g.group_id,
1128 state: g.group_state,
1129 protocol_type: g.protocol_type,
1130 protocol: g.protocol_data,
1131 members: g
1132 .members
1133 .into_iter()
1134 .map(|m| ConsumerGroupMember {
1135 member_id: m.member_id,
1136 group_instance_id: m.group_instance_id,
1137 client_id: m.client_id,
1138 client_host: m.client_host,
1139 })
1140 .collect(),
1141 error: if g.error_code.is_ok() {
1142 None
1143 } else {
1144 Some(format!("{:?}", g.error_code))
1145 },
1146 });
1147 }
1148 }
1149
1150 info!("Described {} groups", all_results.len());
1151 Ok(all_results)
1152 }
1153
1154 pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
1166 let brokers = self.metadata.brokers().await;
1167 if brokers.is_empty() {
1168 return Err(KrafkaError::broker(
1169 crate::error::ErrorCode::UnknownServerError,
1170 "no brokers available",
1171 ));
1172 }
1173
1174 let mut all_groups = Vec::new();
1176 let mut seen_ids = HashSet::new();
1177
1178 for broker in &brokers {
1179 let conn = match self
1180 .pool
1181 .get_connection_by_id(broker.id, &broker.address())
1182 .await
1183 {
1184 Ok(c) => c,
1185 Err(_) => continue, };
1187
1188 let request = ListGroupsRequest;
1189
1190 let response_bytes = match conn
1191 .send_request(ApiKey::ListGroups, 0, |buf| {
1192 request.encode_v0(buf);
1193 })
1194 .await
1195 {
1196 Ok(r) => r,
1197 Err(e) => {
1198 warn!("ListGroups RPC failed on broker {}: {}", broker.id, e);
1199 continue;
1200 }
1201 };
1202
1203 let mut buf = response_bytes;
1204 let response = match ListGroupsResponse::decode_v0(&mut buf) {
1205 Ok(r) => r,
1206 Err(e) => {
1207 warn!("ListGroups decode failed on broker {}: {}", broker.id, e);
1208 continue;
1209 }
1210 };
1211
1212 if !response.error_code.is_ok() {
1213 tracing::warn!(
1214 "ListGroups error on broker {}: {:?}",
1215 broker.id,
1216 response.error_code
1217 );
1218 continue;
1219 }
1220
1221 for group in response.groups {
1222 if seen_ids.insert(group.group_id.clone()) {
1223 all_groups.push(ConsumerGroupListing {
1224 group_id: group.group_id,
1225 protocol_type: group.protocol_type,
1226 });
1227 }
1228 }
1229 }
1230
1231 info!("Listed {} consumer groups", all_groups.len());
1232 Ok(all_groups)
1233 }
1234
1235 pub async fn delete_records(
1252 &self,
1253 offsets: HashMap<(String, i32), i64>,
1254 timeout: Duration,
1255 ) -> Result<Vec<DeleteRecordResult>> {
1256 let brokers = self.metadata.brokers().await;
1257 if brokers.is_empty() {
1258 return Err(KrafkaError::broker(
1259 crate::error::ErrorCode::UnknownServerError,
1260 "no brokers available",
1261 ));
1262 }
1263
1264 let mut leader_offsets: HashMap<i32, HashMap<String, Vec<DeleteRecordsPartition>>> =
1266 HashMap::new();
1267 let fallback_broker_id = brokers[0].id;
1268
1269 for ((topic, partition), offset) in &offsets {
1270 let leader_id = self
1271 .metadata
1272 .leader(topic, *partition)
1273 .await
1274 .unwrap_or(fallback_broker_id);
1275 leader_offsets
1276 .entry(leader_id)
1277 .or_default()
1278 .entry(topic.clone())
1279 .or_default()
1280 .push(DeleteRecordsPartition {
1281 partition_index: *partition,
1282 offset: *offset,
1283 });
1284 }
1285
1286 let mut results = Vec::new();
1287
1288 for (broker_id, topics_map) in leader_offsets {
1289 let broker = brokers
1290 .iter()
1291 .find(|b| b.id == broker_id)
1292 .unwrap_or(&brokers[0]);
1293 let conn = self
1294 .pool
1295 .get_connection_by_id(broker.id, &broker.address())
1296 .await?;
1297
1298 let request = DeleteRecordsRequest {
1299 topics: topics_map
1300 .into_iter()
1301 .map(|(name, partitions)| DeleteRecordsTopic { name, partitions })
1302 .collect(),
1303 timeout_ms: crate::util::duration_to_millis_i32(timeout),
1304 };
1305
1306 let response_bytes = conn
1307 .send_request(ApiKey::DeleteRecords, 0, |buf| {
1308 request.encode_v0(buf);
1309 })
1310 .await?;
1311
1312 let mut buf = response_bytes;
1313 let response = DeleteRecordsResponse::decode_v0(&mut buf)?;
1314
1315 for topic in response.topics {
1316 for partition in topic.partitions {
1317 results.push(DeleteRecordResult {
1318 topic: topic.name.clone(),
1319 partition: partition.partition_index,
1320 low_watermark: partition.low_watermark,
1321 error: if partition.error_code.is_ok() {
1322 None
1323 } else {
1324 Some(format!("{:?}", partition.error_code))
1325 },
1326 });
1327 }
1328 }
1329 }
1330
1331 info!("Deleted records from {} partition(s)", results.len());
1332 Ok(results)
1333 }
1334
1335 pub async fn offset_for_leader_epoch(
1355 &self,
1356 partitions: Vec<(String, i32, i32)>,
1357 ) -> Result<Vec<LeaderEpochResult>> {
1358 let brokers = self.metadata.brokers().await;
1359 if brokers.is_empty() {
1360 return Err(KrafkaError::broker(
1361 crate::error::ErrorCode::UnknownServerError,
1362 "no brokers available",
1363 ));
1364 }
1365
1366 let fallback_broker_id = brokers[0].id;
1368 let mut leader_partitions: HashMap<
1369 i32,
1370 HashMap<String, Vec<OffsetForLeaderEpochPartition>>,
1371 > = HashMap::new();
1372
1373 for (topic, partition, leader_epoch) in &partitions {
1374 let leader_id = self
1375 .metadata
1376 .leader(topic, *partition)
1377 .await
1378 .unwrap_or(fallback_broker_id);
1379 leader_partitions
1380 .entry(leader_id)
1381 .or_default()
1382 .entry(topic.clone())
1383 .or_default()
1384 .push(OffsetForLeaderEpochPartition {
1385 partition: *partition,
1386 current_leader_epoch: -1, leader_epoch: *leader_epoch,
1388 });
1389 }
1390
1391 let mut results = Vec::new();
1392
1393 for (broker_id, topics_map) in leader_partitions {
1394 let broker = brokers
1395 .iter()
1396 .find(|b| b.id == broker_id)
1397 .unwrap_or(&brokers[0]);
1398 let conn = self
1399 .pool
1400 .get_connection_by_id(broker.id, &broker.address())
1401 .await?;
1402
1403 let request = OffsetForLeaderEpochRequest {
1404 replica_id: -1, topics: topics_map
1406 .into_iter()
1407 .map(|(topic, partitions)| OffsetForLeaderEpochTopic { topic, partitions })
1408 .collect(),
1409 };
1410
1411 let response_bytes = conn
1412 .send_request(ApiKey::OffsetForLeaderEpoch, 2, |buf| {
1413 request.encode_v2(buf);
1414 })
1415 .await?;
1416
1417 let mut buf = response_bytes;
1418 let response = OffsetForLeaderEpochResponse::decode_v2(&mut buf)?;
1419
1420 for topic in response.topics {
1421 for partition in topic.partitions {
1422 results.push(LeaderEpochResult {
1423 topic: topic.topic.clone(),
1424 partition: partition.partition,
1425 leader_epoch: partition.leader_epoch,
1426 end_offset: partition.end_offset,
1427 error: if partition.error_code.is_ok() {
1428 None
1429 } else {
1430 Some(format!("{:?}", partition.error_code))
1431 },
1432 });
1433 }
1434 }
1435 }
1436
1437 info!(
1438 "Got leader epoch offsets for {} partition(s)",
1439 results.len()
1440 );
1441 Ok(results)
1442 }
1443
1444 pub fn pool(&self) -> &Arc<ConnectionPool> {
1446 &self.pool
1447 }
1448}
1449
1450#[derive(Debug, Clone)]
1452pub struct ClusterDescription {
1453 pub controller_id: Option<i32>,
1455 pub brokers: Vec<BrokerInfo>,
1457}
1458
1459#[must_use = "builders do nothing until .build() is called"]
1461#[derive(Debug, Default)]
1462pub struct AdminClientBuilder {
1463 config: AdminConfig,
1464}
1465
1466impl AdminClientBuilder {
1467 pub fn bootstrap_servers(mut self, servers: impl Into<String>) -> Self {
1469 self.config.bootstrap_servers = servers.into();
1470 self
1471 }
1472
1473 pub fn client_id(mut self, id: impl Into<String>) -> Self {
1475 self.config.client_id = id.into();
1476 self
1477 }
1478
1479 pub fn request_timeout(mut self, timeout: Duration) -> Self {
1481 self.config.request_timeout = timeout;
1482 self
1483 }
1484
1485 pub fn auth(mut self, auth: AuthConfig) -> Self {
1500 self.config.auth = Some(auth);
1501 self
1502 }
1503
1504 pub fn sasl_plain(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
1506 self.config.auth = Some(AuthConfig::sasl_plain(username, password));
1507 self
1508 }
1509
1510 pub fn sasl_scram_sha256(
1512 mut self,
1513 username: impl Into<String>,
1514 password: impl Into<String>,
1515 ) -> Self {
1516 self.config.auth = Some(AuthConfig::sasl_scram_sha256(username, password));
1517 self
1518 }
1519
1520 pub fn sasl_scram_sha512(
1522 mut self,
1523 username: impl Into<String>,
1524 password: impl Into<String>,
1525 ) -> Self {
1526 self.config.auth = Some(AuthConfig::sasl_scram_sha512(username, password));
1527 self
1528 }
1529
1530 pub fn sasl_oauthbearer(mut self, token: impl Into<String>) -> Self {
1535 self.config.auth = Some(AuthConfig::sasl_oauthbearer(token));
1536 self
1537 }
1538
1539 pub async fn build(self) -> Result<AdminClient> {
1541 if self.config.bootstrap_servers.is_empty() {
1542 return Err(KrafkaError::config("bootstrap_servers is required"));
1543 }
1544
1545 let bootstrap_servers: Vec<String> = self
1546 .config
1547 .bootstrap_servers
1548 .split(',')
1549 .map(|s| s.trim().to_string())
1550 .filter(|s| !s.is_empty())
1551 .collect();
1552
1553 if bootstrap_servers.is_empty() {
1554 return Err(KrafkaError::config("no bootstrap servers specified"));
1555 }
1556
1557 let mut conn_config_builder = ConnectionConfig::builder()
1559 .client_id(&self.config.client_id)
1560 .request_timeout(self.config.request_timeout);
1561
1562 if let Some(ref auth) = self.config.auth {
1563 conn_config_builder = conn_config_builder.auth(auth.clone());
1564 }
1565
1566 let conn_config = conn_config_builder.build();
1567
1568 let pool = Arc::new(ConnectionPool::new(conn_config));
1569 let metadata = Arc::new(ClusterMetadata::new(
1570 bootstrap_servers,
1571 pool.clone(),
1572 Duration::from_secs(300),
1573 ));
1574
1575 metadata.refresh().await?;
1576
1577 info!(
1578 "AdminClient initialized with auth: {}",
1579 if self.config.auth.is_some() {
1580 "configured"
1581 } else {
1582 "none"
1583 }
1584 );
1585
1586 Ok(AdminClient {
1587 config: self.config,
1588 metadata,
1589 pool,
1590 })
1591 }
1592}
1593
1594#[cfg(test)]
1595mod tests {
1596 use super::*;
1597
1598 #[test]
1599 fn test_new_topic() {
1600 let topic = NewTopic::new("test-topic", 3, 2)
1601 .with_config("cleanup.policy", "compact")
1602 .with_config("retention.ms", "86400000");
1603
1604 assert_eq!(topic.name, "test-topic");
1605 assert_eq!(topic.num_partitions, 3);
1606 assert_eq!(topic.replication_factor, 2);
1607 assert_eq!(topic.configs.len(), 2);
1608 }
1609
1610 #[test]
1611 fn test_admin_config_default() {
1612 let config = AdminConfig::default();
1613 assert_eq!(config.client_id, "krafka-admin");
1614 assert_eq!(config.request_timeout, Duration::from_secs(30));
1615 }
1616
1617 #[test]
1618 fn test_describe_acls_result() {
1619 let result = DescribeAclsResult {
1620 error: None,
1621 bindings: vec![
1622 AclBinding::allow_read_topic("my-topic", "User:alice"),
1623 AclBinding::allow_write_topic("my-topic", "User:bob"),
1624 ],
1625 };
1626 assert!(result.error.is_none());
1627 assert_eq!(result.bindings.len(), 2);
1628 }
1629
1630 #[test]
1631 fn test_create_acls_result() {
1632 let result = CreateAclsResult {
1633 results: vec![
1634 CreateAclResult { error: None },
1635 CreateAclResult {
1636 error: Some("ACL already exists".to_string()),
1637 },
1638 ],
1639 };
1640 assert!(result.results[0].error.is_none());
1641 assert!(result.results[1].error.is_some());
1642 }
1643
1644 #[test]
1645 fn test_delete_acls_result() {
1646 let result = DeleteAclsResult {
1647 filter_results: vec![
1648 DeleteAclFilterResult {
1649 error: None,
1650 deleted_count: 3,
1651 },
1652 DeleteAclFilterResult {
1653 error: None,
1654 deleted_count: 0,
1655 },
1656 ],
1657 };
1658 assert_eq!(result.filter_results[0].deleted_count, 3);
1659 assert_eq!(result.filter_results[1].deleted_count, 0);
1660 }
1661
1662 #[test]
1663 fn test_acl_filter_builder() {
1664 use crate::protocol::{AclOperation, AclPatternType, AclPermissionType, AclResourceType};
1665
1666 let filter = AclFilter::all();
1668 assert_eq!(filter.resource_type, AclResourceType::Any);
1669 assert_eq!(filter.pattern_type, AclPatternType::Any);
1670 assert_eq!(filter.operation, AclOperation::Any);
1671 assert_eq!(filter.permission_type, AclPermissionType::Any);
1672 assert!(filter.resource_name.is_none());
1673 assert!(filter.principal.is_none());
1674 assert!(filter.host.is_none());
1675
1676 let filter = AclFilter::for_resource(AclResourceType::Topic, "my-topic");
1678 assert_eq!(filter.resource_type, AclResourceType::Topic);
1679 assert_eq!(filter.resource_name, Some("my-topic".to_string()));
1680
1681 let filter = AclFilter::for_principal("User:alice");
1683 assert_eq!(filter.principal, Some("User:alice".to_string()));
1684
1685 let filter = AclFilter::all()
1687 .resource_type(AclResourceType::Group)
1688 .resource_name("my-group")
1689 .pattern_type(AclPatternType::Literal)
1690 .principal("User:bob")
1691 .host("localhost")
1692 .operation(AclOperation::Read)
1693 .permission_type(AclPermissionType::Allow);
1694
1695 assert_eq!(filter.resource_type, AclResourceType::Group);
1696 assert_eq!(filter.resource_name, Some("my-group".to_string()));
1697 assert_eq!(filter.pattern_type, AclPatternType::Literal);
1698 assert_eq!(filter.principal, Some("User:bob".to_string()));
1699 assert_eq!(filter.host, Some("localhost".to_string()));
1700 assert_eq!(filter.operation, AclOperation::Read);
1701 assert_eq!(filter.permission_type, AclPermissionType::Allow);
1702 }
1703
1704 #[test]
1705 fn test_admin_builder_with_auth() {
1706 use crate::auth::AuthConfig;
1707
1708 let builder = AdminClient::builder()
1709 .bootstrap_servers("broker:9093")
1710 .auth(AuthConfig::sasl_plain("user", "pass"));
1711
1712 let auth = builder.config.auth.as_ref().unwrap();
1713 assert!(auth.requires_sasl());
1714 assert!(!auth.requires_tls());
1715 assert!(auth.plain_credentials.is_some());
1716 }
1717
1718 #[test]
1719 fn test_admin_builder_sasl_plain() {
1720 let builder = AdminClient::builder()
1721 .bootstrap_servers("broker:9093")
1722 .sasl_plain("admin", "admin-secret");
1723
1724 let auth = builder.config.auth.as_ref().unwrap();
1725 assert_eq!(
1726 auth.security_protocol,
1727 crate::auth::SecurityProtocol::SaslPlaintext
1728 );
1729 assert_eq!(auth.sasl_mechanism, Some(crate::auth::SaslMechanism::Plain));
1730 let creds = auth.plain_credentials.as_ref().unwrap();
1731 assert_eq!(creds.username, "admin");
1732 }
1733
1734 #[test]
1735 fn test_admin_builder_sasl_scram() {
1736 let builder = AdminClient::builder()
1737 .bootstrap_servers("broker:9093")
1738 .sasl_scram_sha256("user", "pass");
1739
1740 let auth = builder.config.auth.as_ref().unwrap();
1741 assert_eq!(
1742 auth.sasl_mechanism,
1743 Some(crate::auth::SaslMechanism::ScramSha256)
1744 );
1745 assert!(auth.scram_credentials.is_some());
1746
1747 let builder = AdminClient::builder()
1748 .bootstrap_servers("broker:9093")
1749 .sasl_scram_sha512("user", "pass");
1750
1751 let auth = builder.config.auth.as_ref().unwrap();
1752 assert_eq!(
1753 auth.sasl_mechanism,
1754 Some(crate::auth::SaslMechanism::ScramSha512)
1755 );
1756 assert!(auth.scram_credentials.is_some());
1757 }
1758
1759 #[test]
1760 fn test_admin_builder_aws_msk_iam() {
1761 use crate::auth::AuthConfig;
1762
1763 let auth = AuthConfig::aws_msk_iam("AKID", "secret", "us-east-1");
1764 let builder = AdminClient::builder()
1765 .bootstrap_servers("broker:9094")
1766 .auth(auth);
1767
1768 let auth = builder.config.auth.as_ref().unwrap();
1769 assert!(auth.requires_tls());
1770 assert!(auth.requires_sasl());
1771 assert_eq!(
1772 auth.sasl_mechanism,
1773 Some(crate::auth::SaslMechanism::AwsMskIam)
1774 );
1775 assert!(auth.aws_msk_iam_credentials.is_some());
1776 assert!(auth.tls_config.is_some());
1777 }
1778
1779 #[test]
1780 fn test_admin_builder_no_auth_by_default() {
1781 let builder = AdminClient::builder().bootstrap_servers("broker:9092");
1782
1783 assert!(builder.config.auth.is_none());
1784 }
1785
1786 #[test]
1787 fn test_consumer_group_description() {
1788 let desc = ConsumerGroupDescription {
1789 group_id: "my-group".to_string(),
1790 state: "Stable".to_string(),
1791 protocol_type: "consumer".to_string(),
1792 protocol: "range".to_string(),
1793 members: vec![
1794 ConsumerGroupMember {
1795 member_id: "member-1".to_string(),
1796 group_instance_id: Some("instance-1".to_string()),
1797 client_id: "my-client".to_string(),
1798 client_host: "/127.0.0.1".to_string(),
1799 },
1800 ConsumerGroupMember {
1801 member_id: "member-2".to_string(),
1802 group_instance_id: None,
1803 client_id: "other-client".to_string(),
1804 client_host: "/192.168.1.1".to_string(),
1805 },
1806 ],
1807 error: None,
1808 };
1809 assert_eq!(desc.group_id, "my-group");
1810 assert_eq!(desc.state, "Stable");
1811 assert_eq!(desc.members.len(), 2);
1812 assert!(desc.members[0].group_instance_id.is_some());
1813 assert!(desc.members[1].group_instance_id.is_none());
1814 assert!(desc.error.is_none());
1815 }
1816
1817 #[test]
1818 fn test_consumer_group_listing() {
1819 let listing = ConsumerGroupListing {
1820 group_id: "my-group".to_string(),
1821 protocol_type: "consumer".to_string(),
1822 };
1823 assert_eq!(listing.group_id, "my-group");
1824 assert_eq!(listing.protocol_type, "consumer");
1825 }
1826
1827 #[test]
1828 fn test_delete_record_result() {
1829 let result = DeleteRecordResult {
1830 topic: "my-topic".to_string(),
1831 partition: 0,
1832 low_watermark: 100,
1833 error: None,
1834 };
1835 assert_eq!(result.topic, "my-topic");
1836 assert_eq!(result.partition, 0);
1837 assert_eq!(result.low_watermark, 100);
1838 assert!(result.error.is_none());
1839
1840 let result_err = DeleteRecordResult {
1841 topic: "my-topic".to_string(),
1842 partition: 1,
1843 low_watermark: -1,
1844 error: Some("NotLeaderOrFollower".to_string()),
1845 };
1846 assert!(result_err.error.is_some());
1847 }
1848
1849 #[test]
1850 fn test_leader_epoch_result() {
1851 let result = LeaderEpochResult {
1852 topic: "my-topic".to_string(),
1853 partition: 0,
1854 leader_epoch: 5,
1855 end_offset: 1000,
1856 error: None,
1857 };
1858 assert_eq!(result.topic, "my-topic");
1859 assert_eq!(result.leader_epoch, 5);
1860 assert_eq!(result.end_offset, 1000);
1861 assert!(result.error.is_none());
1862 }
1863}