Skip to main content

krafka/
admin.rs

1//! Admin client for Apache Kafka.
2//!
3//! This module provides administrative operations:
4//! - Create/delete/describe topics
5//! - Create additional partitions
6//! - List topics and partitions
7//! - Describe and alter configurations
8//! - Manage ACLs
9//! - Describe cluster and broker configs
10//!
11//! # Authentication
12//!
13//! The admin client supports all authentication mechanisms:
14//! - PLAINTEXT (no auth)
15//! - TLS/SSL
16//! - SASL/PLAIN
17//! - SASL/SCRAM-SHA-256/512
18//! - SASL/AWS_MSK_IAM
19//!
20//! # Example
21//!
22//! ```rust,ignore
23//! use krafka::admin::AdminClient;
24//! use krafka::auth::AuthConfig;
25//!
26//! // With authentication
27//! let client = AdminClient::builder()
28//!     .bootstrap_servers("localhost:9092")
29//!     .auth(AuthConfig::sasl_plain("user", "password"))
30//!     .build()
31//!     .await?;
32//! ```
33
34use std::collections::{HashMap, HashSet};
35use std::sync::Arc;
36use std::time::Duration;
37
38use tracing::{info, warn};
39
40use crate::auth::AuthConfig;
41use crate::error::{KrafkaError, Result};
42use crate::metadata::{BrokerInfo, ClusterMetadata, TopicInfo};
43use crate::network::{ConnectionConfig, ConnectionPool};
44
45use crate::protocol::{
46    AclBinding, AclBindingFilter, AclOperation, AclPatternType, AclPermissionType, AclResourceType,
47    AlterConfigsRequest, AlterConfigsResponse, ApiKey, CreatableTopic, CreatableTopicConfig,
48    CreateAclsRequest, CreateAclsResponse, CreatePartitionsRequest, CreatePartitionsResponse,
49    CreatePartitionsTopic, CreateTopicsRequest, CreateTopicsResponse, DeleteAclsRequest,
50    DeleteAclsResponse, DeleteRecordsPartition, DeleteRecordsRequest, DeleteRecordsResponse,
51    DeleteRecordsTopic, DeleteTopicsRequest, DeleteTopicsResponse, DescribeAclsRequest,
52    DescribeAclsResponse, DescribeConfigsRequest, DescribeConfigsResponse, DescribeGroupsRequest,
53    DescribeGroupsResponse, FindCoordinatorRequest, FindCoordinatorResponse, ListGroupsRequest,
54    ListGroupsResponse, OffsetForLeaderEpochPartition, OffsetForLeaderEpochRequest,
55    OffsetForLeaderEpochResponse, OffsetForLeaderEpochTopic,
56};
57
58/// Configuration for creating a topic.
59#[derive(Debug, Clone)]
60pub struct NewTopic {
61    /// Topic name.
62    pub name: String,
63    /// Number of partitions.
64    pub num_partitions: i32,
65    /// Replication factor.
66    pub replication_factor: i16,
67    /// Topic configuration overrides.
68    pub configs: HashMap<String, String>,
69}
70
71impl NewTopic {
72    /// Create a new topic configuration.
73    pub fn new(name: impl Into<String>, num_partitions: i32, replication_factor: i16) -> Self {
74        Self {
75            name: name.into(),
76            num_partitions,
77            replication_factor,
78            configs: HashMap::new(),
79        }
80    }
81
82    /// Add a configuration option.
83    pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
84        self.configs.insert(key.into(), value.into());
85        self
86    }
87}
88
89/// Result of topic creation.
90#[derive(Debug, Clone)]
91pub struct CreateTopicResult {
92    /// Topic name.
93    pub name: String,
94    /// Error message if any.
95    pub error: Option<String>,
96}
97
98/// Result of topic deletion.
99#[derive(Debug, Clone)]
100pub struct DeleteTopicResult {
101    /// Topic name.
102    pub name: String,
103    /// Error message if any.
104    pub error: Option<String>,
105}
106
107/// Result of partition creation.
108#[derive(Debug, Clone)]
109pub struct CreatePartitionsResult {
110    /// Topic name.
111    pub topic: String,
112    /// Error message if any.
113    pub error: Option<String>,
114}
115
116/// A configuration entry.
117#[derive(Debug, Clone)]
118pub struct ConfigEntry {
119    /// Configuration name.
120    pub name: String,
121    /// Configuration value.
122    pub value: Option<String>,
123    /// Whether the config is read-only.
124    pub read_only: bool,
125    /// Whether this is the default value.
126    pub is_default: bool,
127    /// Whether the config is sensitive (passwords, etc.).
128    pub is_sensitive: bool,
129}
130
131/// Result of config alteration.
132#[derive(Debug, Clone)]
133pub struct AlterConfigResult {
134    /// Resource name.
135    pub resource_name: String,
136    /// Error message if any.
137    pub error: Option<String>,
138}
139
140/// Result of describing ACLs.
141#[derive(Debug, Clone)]
142pub struct DescribeAclsResult {
143    /// Error message if any.
144    pub error: Option<String>,
145    /// List of ACL bindings found.
146    pub bindings: Vec<AclBinding>,
147}
148
149/// Result of creating ACLs.
150#[derive(Debug, Clone)]
151pub struct CreateAclsResult {
152    /// Results for each ACL creation.
153    pub results: Vec<CreateAclResult>,
154}
155
156/// Result of a single ACL creation.
157#[derive(Debug, Clone)]
158pub struct CreateAclResult {
159    /// Error message if any.
160    pub error: Option<String>,
161}
162
163/// Result of deleting ACLs.
164#[derive(Debug, Clone)]
165pub struct DeleteAclsResult {
166    /// Results for each filter.
167    pub filter_results: Vec<DeleteAclFilterResult>,
168}
169
170/// Result for a single ACL filter deletion.
171#[derive(Debug, Clone)]
172pub struct DeleteAclFilterResult {
173    /// Error message if any.
174    pub error: Option<String>,
175    /// Number of ACLs deleted by this filter.
176    pub deleted_count: usize,
177}
178
179/// Description of a consumer group.
180#[derive(Debug, Clone)]
181pub struct ConsumerGroupDescription {
182    /// Group ID.
183    pub group_id: String,
184    /// Group state (e.g., "Stable", "Empty", "Dead", "PreparingRebalance").
185    pub state: String,
186    /// Protocol type (e.g., "consumer").
187    pub protocol_type: String,
188    /// Protocol (e.g., assignor name like "range", "roundrobin").
189    pub protocol: String,
190    /// Group members.
191    pub members: Vec<ConsumerGroupMember>,
192    /// Error message if any.
193    pub error: Option<String>,
194}
195
196/// A member of a consumer group.
197#[derive(Debug, Clone)]
198pub struct ConsumerGroupMember {
199    /// Member ID.
200    pub member_id: String,
201    /// Group instance ID (static membership).
202    pub group_instance_id: Option<String>,
203    /// Client ID.
204    pub client_id: String,
205    /// Client host.
206    pub client_host: String,
207}
208
209/// Listing entry for a consumer group.
210#[derive(Debug, Clone)]
211pub struct ConsumerGroupListing {
212    /// Group ID.
213    pub group_id: String,
214    /// Protocol type (e.g., "consumer").
215    pub protocol_type: String,
216}
217
218/// Result of deleting records from a partition.
219#[derive(Debug, Clone)]
220pub struct DeleteRecordResult {
221    /// Topic name.
222    pub topic: String,
223    /// Partition index.
224    pub partition: i32,
225    /// The new log start offset (low watermark) after deletion.
226    pub low_watermark: i64,
227    /// Error message if any.
228    pub error: Option<String>,
229}
230
231/// Result of an OffsetForLeaderEpoch request for a partition.
232#[derive(Debug, Clone)]
233pub struct LeaderEpochResult {
234    /// Topic name.
235    pub topic: String,
236    /// Partition index.
237    pub partition: i32,
238    /// The leader epoch.
239    pub leader_epoch: i32,
240    /// The end offset for this leader epoch.
241    pub end_offset: i64,
242    /// Error message if any.
243    pub error: Option<String>,
244}
245
246/// Filter for ACL operations (describe, delete).
247///
248/// This struct encapsulates all the filter parameters for ACL queries.
249#[derive(Debug, Clone, Default)]
250pub struct AclFilter {
251    /// Resource type to filter by.
252    pub resource_type: AclResourceType,
253    /// Resource name to filter by (None for any).
254    pub resource_name: Option<String>,
255    /// Pattern type for matching.
256    pub pattern_type: AclPatternType,
257    /// Principal to filter by (None for any).
258    pub principal: Option<String>,
259    /// Host to filter by (None for any).
260    pub host: Option<String>,
261    /// Operation to filter by.
262    pub operation: AclOperation,
263    /// Permission type to filter by.
264    pub permission_type: AclPermissionType,
265}
266
267impl AclFilter {
268    /// Create a new ACL filter that matches all ACLs.
269    pub fn all() -> Self {
270        Self::default()
271    }
272
273    /// Create a filter for a specific resource.
274    pub fn for_resource(resource_type: AclResourceType, resource_name: impl Into<String>) -> Self {
275        Self {
276            resource_type,
277            resource_name: Some(resource_name.into()),
278            ..Default::default()
279        }
280    }
281
282    /// Create a filter for a specific principal.
283    pub fn for_principal(principal: impl Into<String>) -> Self {
284        Self {
285            principal: Some(principal.into()),
286            ..Default::default()
287        }
288    }
289
290    /// Set the resource type.
291    pub fn resource_type(mut self, resource_type: AclResourceType) -> Self {
292        self.resource_type = resource_type;
293        self
294    }
295
296    /// Set the resource name.
297    pub fn resource_name(mut self, name: impl Into<String>) -> Self {
298        self.resource_name = Some(name.into());
299        self
300    }
301
302    /// Set the pattern type.
303    pub fn pattern_type(mut self, pattern_type: AclPatternType) -> Self {
304        self.pattern_type = pattern_type;
305        self
306    }
307
308    /// Set the principal.
309    pub fn principal(mut self, principal: impl Into<String>) -> Self {
310        self.principal = Some(principal.into());
311        self
312    }
313
314    /// Set the host.
315    pub fn host(mut self, host: impl Into<String>) -> Self {
316        self.host = Some(host.into());
317        self
318    }
319
320    /// Set the operation.
321    pub fn operation(mut self, operation: AclOperation) -> Self {
322        self.operation = operation;
323        self
324    }
325
326    /// Set the permission type.
327    pub fn permission_type(mut self, permission_type: AclPermissionType) -> Self {
328        self.permission_type = permission_type;
329        self
330    }
331}
332
333/// Admin client configuration.
334#[derive(Debug, Clone)]
335pub struct AdminConfig {
336    /// Bootstrap servers.
337    pub bootstrap_servers: String,
338    /// Client ID.
339    pub client_id: String,
340    /// Request timeout.
341    pub request_timeout: Duration,
342    /// Authentication configuration (optional).
343    pub auth: Option<AuthConfig>,
344}
345
346impl Default for AdminConfig {
347    fn default() -> Self {
348        Self {
349            bootstrap_servers: String::new(),
350            client_id: "krafka-admin".to_string(),
351            request_timeout: Duration::from_secs(30),
352            auth: None,
353        }
354    }
355}
356
357/// Kafka admin client for cluster administration.
358pub struct AdminClient {
359    /// Configuration.
360    config: AdminConfig,
361    /// Cluster metadata.
362    metadata: Arc<ClusterMetadata>,
363    /// Connection pool.
364    pool: Arc<ConnectionPool>,
365}
366
367impl AdminClient {
368    /// Create a new admin client builder.
369    pub fn builder() -> AdminClientBuilder {
370        AdminClientBuilder::default()
371    }
372
373    /// Create topics.
374    pub async fn create_topics(
375        &self,
376        topics: Vec<NewTopic>,
377        timeout: Duration,
378    ) -> Result<Vec<CreateTopicResult>> {
379        // Get any broker connection (controller for leadership, but any broker forwards)
380        let brokers = self.metadata.brokers().await;
381        if brokers.is_empty() {
382            return Err(KrafkaError::broker(
383                crate::error::ErrorCode::UnknownServerError,
384                "no brokers available",
385            ));
386        }
387
388        let broker = &brokers[0];
389        let conn = self
390            .pool
391            .get_connection_by_id(broker.id, &broker.address())
392            .await?;
393
394        // Build request
395        let request = CreateTopicsRequest {
396            topics: topics
397                .iter()
398                .map(|t| CreatableTopic {
399                    name: t.name.clone(),
400                    num_partitions: t.num_partitions,
401                    replication_factor: t.replication_factor,
402                    assignments: Vec::new(),
403                    configs: t
404                        .configs
405                        .iter()
406                        .map(|(k, v)| CreatableTopicConfig {
407                            name: k.clone(),
408                            value: Some(v.clone()),
409                        })
410                        .collect(),
411                })
412                .collect(),
413            timeout_ms: crate::util::duration_to_millis_i32(timeout),
414            validate_only: false,
415        };
416
417        // Send request
418        let response_bytes = conn
419            .send_request(ApiKey::CreateTopics, 0, |buf| {
420                request.encode_v0(buf);
421            })
422            .await?;
423
424        // Decode response
425        let mut buf = response_bytes;
426        let response = CreateTopicsResponse::decode_v0(&mut buf)?;
427
428        // Convert to results
429        let results = response
430            .topics
431            .into_iter()
432            .map(|t| CreateTopicResult {
433                name: t.name,
434                error: if t.error_code.is_ok() {
435                    None
436                } else {
437                    Some(format!("{:?}", t.error_code))
438                },
439            })
440            .collect();
441
442        info!("Created {} topics", topics.len());
443        Ok(results)
444    }
445
446    /// Delete topics.
447    pub async fn delete_topics(
448        &self,
449        topics: Vec<String>,
450        timeout: Duration,
451    ) -> Result<Vec<DeleteTopicResult>> {
452        // Get any broker connection
453        let brokers = self.metadata.brokers().await;
454        if brokers.is_empty() {
455            return Err(KrafkaError::broker(
456                crate::error::ErrorCode::UnknownServerError,
457                "no brokers available",
458            ));
459        }
460
461        let broker = &brokers[0];
462        let conn = self
463            .pool
464            .get_connection_by_id(broker.id, &broker.address())
465            .await?;
466
467        // Build request
468        let request = DeleteTopicsRequest {
469            topic_names: topics.clone(),
470            timeout_ms: crate::util::duration_to_millis_i32(timeout),
471        };
472
473        // Send request
474        let response_bytes = conn
475            .send_request(ApiKey::DeleteTopics, 0, |buf| {
476                request.encode_v0(buf);
477            })
478            .await?;
479
480        // Decode response
481        let mut buf = response_bytes;
482        let response = DeleteTopicsResponse::decode_v0(&mut buf)?;
483
484        // Convert to results
485        let results = response
486            .responses
487            .into_iter()
488            .map(|r| DeleteTopicResult {
489                name: r.name.unwrap_or_default(),
490                error: if r.error_code.is_ok() {
491                    None
492                } else {
493                    Some(format!("{:?}", r.error_code))
494                },
495            })
496            .collect();
497
498        info!("Deleted {} topics", topics.len());
499        Ok(results)
500    }
501
502    /// Increase the number of partitions for a topic.
503    ///
504    /// Note: Partition count can only be increased, never decreased.
505    pub async fn create_partitions(
506        &self,
507        topic: impl Into<String>,
508        new_total_count: i32,
509        timeout: Duration,
510    ) -> Result<CreatePartitionsResult> {
511        let topic_name = topic.into();
512
513        // Get any broker connection
514        let brokers = self.metadata.brokers().await;
515        if brokers.is_empty() {
516            return Err(KrafkaError::broker(
517                crate::error::ErrorCode::UnknownServerError,
518                "no brokers available",
519            ));
520        }
521
522        let broker = &brokers[0];
523        let conn = self
524            .pool
525            .get_connection_by_id(broker.id, &broker.address())
526            .await?;
527
528        // Build request
529        let request = CreatePartitionsRequest {
530            topics: vec![CreatePartitionsTopic {
531                name: topic_name.clone(),
532                count: new_total_count,
533                assignments: None,
534            }],
535            timeout_ms: crate::util::duration_to_millis_i32(timeout),
536            validate_only: false,
537        };
538
539        // Send request
540        let response_bytes = conn
541            .send_request(ApiKey::CreatePartitions, 0, |buf| {
542                request.encode_v0(buf);
543            })
544            .await?;
545
546        // Decode response
547        let mut buf = response_bytes;
548        let response = CreatePartitionsResponse::decode_v0(&mut buf)?;
549
550        let result = response
551            .results
552            .into_iter()
553            .next()
554            .map(|r| CreatePartitionsResult {
555                topic: r.name,
556                error: if r.error_code.is_ok() {
557                    None
558                } else {
559                    Some(
560                        r.error_message
561                            .unwrap_or_else(|| format!("{:?}", r.error_code)),
562                    )
563                },
564            })
565            .unwrap_or(CreatePartitionsResult {
566                topic: topic_name.clone(),
567                error: Some("no response received".to_string()),
568            });
569
570        if result.error.is_none() {
571            info!(
572                "Increased partitions for topic {} to {}",
573                topic_name, new_total_count
574            );
575        }
576        Ok(result)
577    }
578
579    /// Describe configuration for a topic.
580    pub async fn describe_topic_config(&self, topic: &str) -> Result<Vec<ConfigEntry>> {
581        let brokers = self.metadata.brokers().await;
582        if brokers.is_empty() {
583            return Err(KrafkaError::broker(
584                crate::error::ErrorCode::UnknownServerError,
585                "no brokers available",
586            ));
587        }
588
589        let broker = &brokers[0];
590        let conn = self
591            .pool
592            .get_connection_by_id(broker.id, &broker.address())
593            .await?;
594
595        let request = DescribeConfigsRequest::for_topic(topic);
596
597        let response_bytes = conn
598            .send_request(ApiKey::DescribeConfigs, 0, |buf| {
599                request.encode_v0(buf);
600            })
601            .await?;
602
603        let mut buf = response_bytes;
604        let response = DescribeConfigsResponse::decode_v0(&mut buf)?;
605
606        let entries = response
607            .results
608            .into_iter()
609            .flat_map(|r| {
610                if !r.error_code.is_ok() {
611                    return Vec::new();
612                }
613                r.configs
614                    .into_iter()
615                    .map(|c| ConfigEntry {
616                        name: c.name,
617                        value: c.value,
618                        read_only: c.read_only,
619                        is_default: c.is_default,
620                        is_sensitive: c.is_sensitive,
621                    })
622                    .collect()
623            })
624            .collect();
625
626        Ok(entries)
627    }
628
629    /// Describe configuration for a broker.
630    pub async fn describe_broker_config(&self, broker_id: i32) -> Result<Vec<ConfigEntry>> {
631        let brokers = self.metadata.brokers().await;
632        if brokers.is_empty() {
633            return Err(KrafkaError::broker(
634                crate::error::ErrorCode::UnknownServerError,
635                "no brokers available",
636            ));
637        }
638
639        let broker = &brokers[0];
640        let conn = self
641            .pool
642            .get_connection_by_id(broker.id, &broker.address())
643            .await?;
644
645        let request = DescribeConfigsRequest::for_broker(broker_id);
646
647        let response_bytes = conn
648            .send_request(ApiKey::DescribeConfigs, 0, |buf| {
649                request.encode_v0(buf);
650            })
651            .await?;
652
653        let mut buf = response_bytes;
654        let response = DescribeConfigsResponse::decode_v0(&mut buf)?;
655
656        let entries = response
657            .results
658            .into_iter()
659            .flat_map(|r| {
660                if !r.error_code.is_ok() {
661                    return Vec::new();
662                }
663                r.configs
664                    .into_iter()
665                    .map(|c| ConfigEntry {
666                        name: c.name,
667                        value: c.value,
668                        read_only: c.read_only,
669                        is_default: c.is_default,
670                        is_sensitive: c.is_sensitive,
671                    })
672                    .collect()
673            })
674            .collect();
675
676        Ok(entries)
677    }
678
679    /// Alter configuration for a topic.
680    ///
681    /// Note: This replaces all dynamic configs. To modify a single config,
682    /// first describe the topic config and then set all desired values.
683    pub async fn alter_topic_config(
684        &self,
685        topic: &str,
686        configs: HashMap<String, String>,
687    ) -> Result<AlterConfigResult> {
688        let brokers = self.metadata.brokers().await;
689        if brokers.is_empty() {
690            return Err(KrafkaError::broker(
691                crate::error::ErrorCode::UnknownServerError,
692                "no brokers available",
693            ));
694        }
695
696        let broker = &brokers[0];
697        let conn = self
698            .pool
699            .get_connection_by_id(broker.id, &broker.address())
700            .await?;
701
702        let request = AlterConfigsRequest::for_topic(topic, configs.into_iter().collect());
703
704        let response_bytes = conn
705            .send_request(ApiKey::AlterConfigs, 0, |buf| {
706                request.encode_v0(buf);
707            })
708            .await?;
709
710        let mut buf = response_bytes;
711        let response = AlterConfigsResponse::decode_v0(&mut buf)?;
712
713        let result = response
714            .results
715            .into_iter()
716            .next()
717            .map(|r| AlterConfigResult {
718                resource_name: r.resource_name,
719                error: if r.error_code.is_ok() {
720                    None
721                } else {
722                    Some(
723                        r.error_message
724                            .unwrap_or_else(|| format!("{:?}", r.error_code)),
725                    )
726                },
727            })
728            .unwrap_or(AlterConfigResult {
729                resource_name: topic.to_string(),
730                error: Some("no response received".to_string()),
731            });
732
733        if result.error.is_none() {
734            info!("Altered config for topic {}", topic);
735        }
736        Ok(result)
737    }
738
739    /// List all topics.
740    pub async fn list_topics(&self) -> Result<Vec<String>> {
741        self.metadata.refresh().await?;
742        Ok(self
743            .metadata
744            .topics()
745            .await
746            .into_iter()
747            .map(|t| t.name)
748            .collect())
749    }
750
751    /// Describe topics.
752    pub async fn describe_topics(&self, topics: &[String]) -> Result<Vec<TopicInfo>> {
753        self.metadata.refresh().await?;
754        let all_topics = self.metadata.topics().await;
755
756        let mut result = Vec::new();
757        for topic_name in topics {
758            if let Some(info) = all_topics.iter().find(|t| &t.name == topic_name) {
759                result.push(info.clone());
760            }
761        }
762        Ok(result)
763    }
764
765    /// Describe the cluster.
766    pub async fn describe_cluster(&self) -> Result<ClusterDescription> {
767        self.metadata.refresh().await?;
768        let brokers = self.metadata.brokers().await;
769        let controller = self.metadata.controller().await;
770
771        Ok(ClusterDescription {
772            controller_id: controller.map(|c| c.id),
773            brokers,
774        })
775    }
776
777    /// Get partition count for a topic.
778    pub async fn partition_count(&self, topic: &str) -> Result<Option<usize>> {
779        self.metadata.refresh().await?;
780        Ok(self.metadata.partition_count(topic).await)
781    }
782
783    /// Get the client ID.
784    pub fn client_id(&self) -> &str {
785        &self.config.client_id
786    }
787
788    /// Get the request timeout.
789    pub fn request_timeout(&self) -> Duration {
790        self.config.request_timeout
791    }
792
793    /// Describe ACLs matching a filter.
794    ///
795    /// # Arguments
796    /// * `resource_type` - Type of resource (Topic, Group, Cluster, etc.)
797    /// * `resource_name` - Name of the resource (use None to match any)
798    /// * `pattern_type` - Pattern type (Literal, Prefixed, Any)
799    /// * `principal` - Principal (use None to match any)
800    /// * `host` - Host (use None to match any)
801    /// * `operation` - Operation (use Any to match all)
802    /// * `permission_type` - Permission type (use Any to match all)
803    ///
804    /// # Example
805    /// ```ignore
806    /// // Describe all ACLs for a specific topic
807    /// let result = admin.describe_acls(
808    ///     AclResourceType::Topic,
809    ///     Some("my-topic"),
810    ///     AclPatternType::Literal,
811    ///     None,
812    ///     None,
813    ///     AclOperation::Any,
814    ///     AclPermissionType::Any,
815    /// ).await?;
816    /// ```
817    #[allow(clippy::too_many_arguments)]
818    pub async fn describe_acls(
819        &self,
820        resource_type: AclResourceType,
821        resource_name: Option<&str>,
822        pattern_type: AclPatternType,
823        principal: Option<&str>,
824        host: Option<&str>,
825        operation: AclOperation,
826        permission_type: AclPermissionType,
827    ) -> Result<DescribeAclsResult> {
828        self.describe_acls_with_filter(AclFilter {
829            resource_type,
830            resource_name: resource_name.map(|s| s.to_string()),
831            pattern_type,
832            principal: principal.map(|s| s.to_string()),
833            host: host.map(|s| s.to_string()),
834            operation,
835            permission_type,
836        })
837        .await
838    }
839
840    /// Describe ACLs matching a filter.
841    ///
842    /// This is the preferred method for describing ACLs as it uses a structured
843    /// filter object.
844    ///
845    /// # Example
846    /// ```ignore
847    /// // Describe all ACLs for a specific topic
848    /// let filter = AclFilter::for_resource(AclResourceType::Topic, "my-topic");
849    /// let result = admin.describe_acls_with_filter(filter).await?;
850    /// ```
851    pub async fn describe_acls_with_filter(&self, filter: AclFilter) -> Result<DescribeAclsResult> {
852        let brokers = self.metadata.brokers().await;
853        if brokers.is_empty() {
854            return Err(KrafkaError::broker(
855                crate::error::ErrorCode::UnknownServerError,
856                "no brokers available",
857            ));
858        }
859
860        let broker = &brokers[0];
861        let conn = self
862            .pool
863            .get_connection_by_id(broker.id, &broker.address())
864            .await?;
865
866        let request = DescribeAclsRequest {
867            resource_type: filter.resource_type,
868            resource_name: filter.resource_name,
869            pattern_type: filter.pattern_type,
870            principal: filter.principal,
871            host: filter.host,
872            operation: filter.operation,
873            permission_type: filter.permission_type,
874        };
875
876        let response_bytes = conn
877            .send_request(ApiKey::DescribeAcls, 0, |buf| {
878                request.encode_v0(buf);
879            })
880            .await?;
881
882        let mut buf = response_bytes;
883        let response = DescribeAclsResponse::decode_v0(&mut buf)?;
884
885        let bindings = response
886            .resources
887            .into_iter()
888            .flat_map(|res| {
889                res.acls.into_iter().map(move |acl| AclBinding {
890                    resource_type: res.resource_type,
891                    resource_name: res.resource_name.clone(),
892                    pattern_type: res.pattern_type,
893                    principal: acl.principal,
894                    host: acl.host,
895                    operation: acl.operation,
896                    permission_type: acl.permission_type,
897                })
898            })
899            .collect();
900
901        Ok(DescribeAclsResult {
902            error: if response.error_code.is_ok() {
903                None
904            } else {
905                Some(
906                    response
907                        .error_message
908                        .unwrap_or_else(|| format!("{:?}", response.error_code)),
909                )
910            },
911            bindings,
912        })
913    }
914
915    /// Create ACLs.
916    ///
917    /// # Arguments
918    /// * `acls` - List of ACL bindings to create
919    ///
920    /// # Example
921    /// ```ignore
922    /// let acl = AclBinding::allow_read_topic("my-topic", "User:alice");
923    /// admin.create_acls(vec![acl]).await?;
924    /// ```
925    pub async fn create_acls(&self, acls: Vec<AclBinding>) -> Result<CreateAclsResult> {
926        let brokers = self.metadata.brokers().await;
927        if brokers.is_empty() {
928            return Err(KrafkaError::broker(
929                crate::error::ErrorCode::UnknownServerError,
930                "no brokers available",
931            ));
932        }
933
934        let broker = &brokers[0];
935        let conn = self
936            .pool
937            .get_connection_by_id(broker.id, &broker.address())
938            .await?;
939
940        let request = CreateAclsRequest {
941            creations: acls.clone(),
942        };
943
944        let response_bytes = conn
945            .send_request(ApiKey::CreateAcls, 0, |buf| {
946                request.encode_v0(buf);
947            })
948            .await?;
949
950        let mut buf = response_bytes;
951        let response = CreateAclsResponse::decode_v0(&mut buf)?;
952
953        let results = response
954            .results
955            .into_iter()
956            .map(|r| CreateAclResult {
957                error: if r.error_code.is_ok() {
958                    None
959                } else {
960                    Some(
961                        r.error_message
962                            .unwrap_or_else(|| format!("{:?}", r.error_code)),
963                    )
964                },
965            })
966            .collect();
967
968        info!("Created {} ACLs", acls.len());
969        Ok(CreateAclsResult { results })
970    }
971
972    /// Delete ACLs matching the specified filters.
973    ///
974    /// # Arguments
975    /// * `filters` - List of ACL binding filters to match for deletion
976    ///
977    /// # Example
978    /// ```ignore
979    /// // Delete all ACLs for a specific topic
980    /// let filter = AclBindingFilter {
981    ///     resource_type: AclResourceType::Topic,
982    ///     resource_name: Some("my-topic".to_string()),
983    ///     pattern_type: AclPatternType::Literal,
984    ///     principal: None,
985    ///     host: None,
986    ///     operation: AclOperation::Any,
987    ///     permission_type: AclPermissionType::Any,
988    /// };
989    /// admin.delete_acls(vec![filter]).await?;
990    /// ```
991    pub async fn delete_acls(&self, filters: Vec<AclBindingFilter>) -> Result<DeleteAclsResult> {
992        let brokers = self.metadata.brokers().await;
993        if brokers.is_empty() {
994            return Err(KrafkaError::broker(
995                crate::error::ErrorCode::UnknownServerError,
996                "no brokers available",
997            ));
998        }
999
1000        let broker = &brokers[0];
1001        let conn = self
1002            .pool
1003            .get_connection_by_id(broker.id, &broker.address())
1004            .await?;
1005
1006        let request = DeleteAclsRequest {
1007            filters: filters.clone(),
1008        };
1009
1010        let response_bytes = conn
1011            .send_request(ApiKey::DeleteAcls, 0, |buf| {
1012                request.encode_v0(buf);
1013            })
1014            .await?;
1015
1016        let mut buf = response_bytes;
1017        let response = DeleteAclsResponse::decode_v0(&mut buf)?;
1018
1019        let filter_results = response
1020            .filter_results
1021            .into_iter()
1022            .map(|fr| DeleteAclFilterResult {
1023                error: if fr.error_code.is_ok() {
1024                    None
1025                } else {
1026                    Some(
1027                        fr.error_message
1028                            .unwrap_or_else(|| format!("{:?}", fr.error_code)),
1029                    )
1030                },
1031                deleted_count: fr.matching_acls.len(),
1032            })
1033            .collect();
1034
1035        info!("Deleted ACLs with {} filters", filters.len());
1036        Ok(DeleteAclsResult { filter_results })
1037    }
1038
1039    /// Describe consumer groups.
1040    ///
1041    /// Returns detailed information about each group including state, members,
1042    /// and partition assignments.
1043    ///
1044    /// # Example
1045    /// ```ignore
1046    /// let groups = admin.describe_groups(vec!["my-group".to_string()]).await?;
1047    /// for group in &groups {
1048    ///     println!("{}: state={}, members={}", group.group_id, group.state, group.members.len());
1049    /// }
1050    /// ```
1051    pub async fn describe_groups(
1052        &self,
1053        group_ids: Vec<String>,
1054    ) -> Result<Vec<ConsumerGroupDescription>> {
1055        let brokers = self.metadata.brokers().await;
1056        if brokers.is_empty() {
1057            return Err(KrafkaError::broker(
1058                crate::error::ErrorCode::UnknownServerError,
1059                "no brokers available",
1060            ));
1061        }
1062
1063        // Group the group_ids by their coordinator broker
1064        let mut coordinator_groups: HashMap<i32, Vec<String>> = HashMap::new();
1065        let any_broker = &brokers[0];
1066        let any_conn = self
1067            .pool
1068            .get_connection_by_id(any_broker.id, &any_broker.address())
1069            .await?;
1070
1071        for group_id in &group_ids {
1072            let coord_request = FindCoordinatorRequest::for_group(group_id);
1073            let coord_response_bytes = any_conn
1074                .send_request(ApiKey::FindCoordinator, 1, |buf| {
1075                    coord_request.encode_v1(buf);
1076                })
1077                .await?;
1078            let mut coord_buf = coord_response_bytes;
1079            let coord_response = FindCoordinatorResponse::decode_v1(&mut coord_buf)?;
1080
1081            if coord_response.error_code.is_ok() {
1082                coordinator_groups
1083                    .entry(coord_response.node_id)
1084                    .or_default()
1085                    .push(group_id.clone());
1086            } else {
1087                // Fallback to first broker if coordinator lookup fails
1088                warn!(
1089                    "FindCoordinator failed for group '{}': {:?}, falling back to broker {}",
1090                    group_id, coord_response.error_code, any_broker.id
1091                );
1092                coordinator_groups
1093                    .entry(any_broker.id)
1094                    .or_default()
1095                    .push(group_id.clone());
1096            }
1097        }
1098
1099        let mut all_results = Vec::new();
1100
1101        for (broker_id, groups) in coordinator_groups {
1102            // Find broker address
1103            let broker = brokers
1104                .iter()
1105                .find(|b| b.id == broker_id)
1106                .unwrap_or(any_broker);
1107            let conn = self
1108                .pool
1109                .get_connection_by_id(broker.id, &broker.address())
1110                .await?;
1111
1112            let request = DescribeGroupsRequest {
1113                groups: groups.clone(),
1114            };
1115
1116            let response_bytes = conn
1117                .send_request(ApiKey::DescribeGroups, 0, |buf| {
1118                    request.encode_v0(buf);
1119                })
1120                .await?;
1121
1122            let mut buf = response_bytes;
1123            let response = DescribeGroupsResponse::decode_v0(&mut buf)?;
1124
1125            for g in response.groups {
1126                all_results.push(ConsumerGroupDescription {
1127                    group_id: g.group_id,
1128                    state: g.group_state,
1129                    protocol_type: g.protocol_type,
1130                    protocol: g.protocol_data,
1131                    members: g
1132                        .members
1133                        .into_iter()
1134                        .map(|m| ConsumerGroupMember {
1135                            member_id: m.member_id,
1136                            group_instance_id: m.group_instance_id,
1137                            client_id: m.client_id,
1138                            client_host: m.client_host,
1139                        })
1140                        .collect(),
1141                    error: if g.error_code.is_ok() {
1142                        None
1143                    } else {
1144                        Some(format!("{:?}", g.error_code))
1145                    },
1146                });
1147            }
1148        }
1149
1150        info!("Described {} groups", all_results.len());
1151        Ok(all_results)
1152    }
1153
1154    /// List all consumer groups on the cluster.
1155    ///
1156    /// Returns a list of all consumer groups with their protocol types.
1157    ///
1158    /// # Example
1159    /// ```ignore
1160    /// let groups = admin.list_consumer_groups().await?;
1161    /// for group in &groups {
1162    ///     println!("{} ({})", group.group_id, group.protocol_type);
1163    /// }
1164    /// ```
1165    pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
1166        let brokers = self.metadata.brokers().await;
1167        if brokers.is_empty() {
1168            return Err(KrafkaError::broker(
1169                crate::error::ErrorCode::UnknownServerError,
1170                "no brokers available",
1171            ));
1172        }
1173
1174        // ListGroups returns groups managed by each broker, so we query all brokers
1175        let mut all_groups = Vec::new();
1176        let mut seen_ids = HashSet::new();
1177
1178        for broker in &brokers {
1179            let conn = match self
1180                .pool
1181                .get_connection_by_id(broker.id, &broker.address())
1182                .await
1183            {
1184                Ok(c) => c,
1185                Err(_) => continue, // Skip unreachable brokers
1186            };
1187
1188            let request = ListGroupsRequest;
1189
1190            let response_bytes = match conn
1191                .send_request(ApiKey::ListGroups, 0, |buf| {
1192                    request.encode_v0(buf);
1193                })
1194                .await
1195            {
1196                Ok(r) => r,
1197                Err(e) => {
1198                    warn!("ListGroups RPC failed on broker {}: {}", broker.id, e);
1199                    continue;
1200                }
1201            };
1202
1203            let mut buf = response_bytes;
1204            let response = match ListGroupsResponse::decode_v0(&mut buf) {
1205                Ok(r) => r,
1206                Err(e) => {
1207                    warn!("ListGroups decode failed on broker {}: {}", broker.id, e);
1208                    continue;
1209                }
1210            };
1211
1212            if !response.error_code.is_ok() {
1213                tracing::warn!(
1214                    "ListGroups error on broker {}: {:?}",
1215                    broker.id,
1216                    response.error_code
1217                );
1218                continue;
1219            }
1220
1221            for group in response.groups {
1222                if seen_ids.insert(group.group_id.clone()) {
1223                    all_groups.push(ConsumerGroupListing {
1224                        group_id: group.group_id,
1225                        protocol_type: group.protocol_type,
1226                    });
1227                }
1228            }
1229        }
1230
1231        info!("Listed {} consumer groups", all_groups.len());
1232        Ok(all_groups)
1233    }
1234
1235    /// Delete records from topic partitions before the specified offsets.
1236    ///
1237    /// Records with offsets less than the specified offset for each partition
1238    /// will be marked for deletion. This adjusts the log start offset.
1239    ///
1240    /// # Arguments
1241    /// * `offsets` - Map of (topic, partition) to the offset before which to delete
1242    /// * `timeout` - Operation timeout
1243    ///
1244    /// # Example
1245    /// ```ignore
1246    /// use std::collections::HashMap;
1247    /// let mut offsets = HashMap::new();
1248    /// offsets.insert(("my-topic".to_string(), 0), 100i64);
1249    /// let results = admin.delete_records(offsets, Duration::from_secs(30)).await?;
1250    /// ```
1251    pub async fn delete_records(
1252        &self,
1253        offsets: HashMap<(String, i32), i64>,
1254        timeout: Duration,
1255    ) -> Result<Vec<DeleteRecordResult>> {
1256        let brokers = self.metadata.brokers().await;
1257        if brokers.is_empty() {
1258            return Err(KrafkaError::broker(
1259                crate::error::ErrorCode::UnknownServerError,
1260                "no brokers available",
1261            ));
1262        }
1263
1264        // Group offsets by partition leader
1265        let mut leader_offsets: HashMap<i32, HashMap<String, Vec<DeleteRecordsPartition>>> =
1266            HashMap::new();
1267        let fallback_broker_id = brokers[0].id;
1268
1269        for ((topic, partition), offset) in &offsets {
1270            let leader_id = self
1271                .metadata
1272                .leader(topic, *partition)
1273                .await
1274                .unwrap_or(fallback_broker_id);
1275            leader_offsets
1276                .entry(leader_id)
1277                .or_default()
1278                .entry(topic.clone())
1279                .or_default()
1280                .push(DeleteRecordsPartition {
1281                    partition_index: *partition,
1282                    offset: *offset,
1283                });
1284        }
1285
1286        let mut results = Vec::new();
1287
1288        for (broker_id, topics_map) in leader_offsets {
1289            let broker = brokers
1290                .iter()
1291                .find(|b| b.id == broker_id)
1292                .unwrap_or(&brokers[0]);
1293            let conn = self
1294                .pool
1295                .get_connection_by_id(broker.id, &broker.address())
1296                .await?;
1297
1298            let request = DeleteRecordsRequest {
1299                topics: topics_map
1300                    .into_iter()
1301                    .map(|(name, partitions)| DeleteRecordsTopic { name, partitions })
1302                    .collect(),
1303                timeout_ms: crate::util::duration_to_millis_i32(timeout),
1304            };
1305
1306            let response_bytes = conn
1307                .send_request(ApiKey::DeleteRecords, 0, |buf| {
1308                    request.encode_v0(buf);
1309                })
1310                .await?;
1311
1312            let mut buf = response_bytes;
1313            let response = DeleteRecordsResponse::decode_v0(&mut buf)?;
1314
1315            for topic in response.topics {
1316                for partition in topic.partitions {
1317                    results.push(DeleteRecordResult {
1318                        topic: topic.name.clone(),
1319                        partition: partition.partition_index,
1320                        low_watermark: partition.low_watermark,
1321                        error: if partition.error_code.is_ok() {
1322                            None
1323                        } else {
1324                            Some(format!("{:?}", partition.error_code))
1325                        },
1326                    });
1327                }
1328            }
1329        }
1330
1331        info!("Deleted records from {} partition(s)", results.len());
1332        Ok(results)
1333    }
1334
1335    /// Get the end offset for each partition at the given leader epoch.
1336    ///
1337    /// This is used to detect log truncation after a leader change. For each
1338    /// topic-partition, the broker returns the end offset for the requested
1339    /// leader epoch. If the epoch is no longer valid, the broker returns
1340    /// the epoch and offset where the log was truncated.
1341    ///
1342    /// # Arguments
1343    /// * `partitions` - List of (topic, partition, leader_epoch) tuples
1344    ///
1345    /// # Example
1346    /// ```ignore
1347    /// let results = admin.offset_for_leader_epoch(
1348    ///     vec![("my-topic".to_string(), 0, 5)]
1349    /// ).await?;
1350    /// for r in &results {
1351    ///     println!("{}:{} epoch={} end_offset={}", r.topic, r.partition, r.leader_epoch, r.end_offset);
1352    /// }
1353    /// ```
1354    pub async fn offset_for_leader_epoch(
1355        &self,
1356        partitions: Vec<(String, i32, i32)>,
1357    ) -> Result<Vec<LeaderEpochResult>> {
1358        let brokers = self.metadata.brokers().await;
1359        if brokers.is_empty() {
1360            return Err(KrafkaError::broker(
1361                crate::error::ErrorCode::UnknownServerError,
1362                "no brokers available",
1363            ));
1364        }
1365
1366        // Group partitions by their leader broker
1367        let fallback_broker_id = brokers[0].id;
1368        let mut leader_partitions: HashMap<
1369            i32,
1370            HashMap<String, Vec<OffsetForLeaderEpochPartition>>,
1371        > = HashMap::new();
1372
1373        for (topic, partition, leader_epoch) in &partitions {
1374            let leader_id = self
1375                .metadata
1376                .leader(topic, *partition)
1377                .await
1378                .unwrap_or(fallback_broker_id);
1379            leader_partitions
1380                .entry(leader_id)
1381                .or_default()
1382                .entry(topic.clone())
1383                .or_default()
1384                .push(OffsetForLeaderEpochPartition {
1385                    partition: *partition,
1386                    current_leader_epoch: -1, // consumer perspective
1387                    leader_epoch: *leader_epoch,
1388                });
1389        }
1390
1391        let mut results = Vec::new();
1392
1393        for (broker_id, topics_map) in leader_partitions {
1394            let broker = brokers
1395                .iter()
1396                .find(|b| b.id == broker_id)
1397                .unwrap_or(&brokers[0]);
1398            let conn = self
1399                .pool
1400                .get_connection_by_id(broker.id, &broker.address())
1401                .await?;
1402
1403            let request = OffsetForLeaderEpochRequest {
1404                replica_id: -1, // -1 for consumer
1405                topics: topics_map
1406                    .into_iter()
1407                    .map(|(topic, partitions)| OffsetForLeaderEpochTopic { topic, partitions })
1408                    .collect(),
1409            };
1410
1411            let response_bytes = conn
1412                .send_request(ApiKey::OffsetForLeaderEpoch, 2, |buf| {
1413                    request.encode_v2(buf);
1414                })
1415                .await?;
1416
1417            let mut buf = response_bytes;
1418            let response = OffsetForLeaderEpochResponse::decode_v2(&mut buf)?;
1419
1420            for topic in response.topics {
1421                for partition in topic.partitions {
1422                    results.push(LeaderEpochResult {
1423                        topic: topic.topic.clone(),
1424                        partition: partition.partition,
1425                        leader_epoch: partition.leader_epoch,
1426                        end_offset: partition.end_offset,
1427                        error: if partition.error_code.is_ok() {
1428                            None
1429                        } else {
1430                            Some(format!("{:?}", partition.error_code))
1431                        },
1432                    });
1433                }
1434            }
1435        }
1436
1437        info!(
1438            "Got leader epoch offsets for {} partition(s)",
1439            results.len()
1440        );
1441        Ok(results)
1442    }
1443
1444    /// Get access to the connection pool.
1445    pub fn pool(&self) -> &Arc<ConnectionPool> {
1446        &self.pool
1447    }
1448}
1449
1450/// Description of a Kafka cluster.
1451#[derive(Debug, Clone)]
1452pub struct ClusterDescription {
1453    /// Controller broker ID.
1454    pub controller_id: Option<i32>,
1455    /// List of brokers.
1456    pub brokers: Vec<BrokerInfo>,
1457}
1458
1459/// Builder for AdminClient.
1460#[must_use = "builders do nothing until .build() is called"]
1461#[derive(Debug, Default)]
1462pub struct AdminClientBuilder {
1463    config: AdminConfig,
1464}
1465
1466impl AdminClientBuilder {
1467    /// Set bootstrap servers.
1468    pub fn bootstrap_servers(mut self, servers: impl Into<String>) -> Self {
1469        self.config.bootstrap_servers = servers.into();
1470        self
1471    }
1472
1473    /// Set client ID.
1474    pub fn client_id(mut self, id: impl Into<String>) -> Self {
1475        self.config.client_id = id.into();
1476        self
1477    }
1478
1479    /// Set request timeout.
1480    pub fn request_timeout(mut self, timeout: Duration) -> Self {
1481        self.config.request_timeout = timeout;
1482        self
1483    }
1484
1485    /// Set authentication configuration.
1486    ///
1487    /// # Example
1488    ///
1489    /// ```rust,ignore
1490    /// use krafka::admin::AdminClient;
1491    /// use krafka::auth::AuthConfig;
1492    ///
1493    /// let client = AdminClient::builder()
1494    ///     .bootstrap_servers("localhost:9092")
1495    ///     .auth(AuthConfig::sasl_plain("user", "password"))
1496    ///     .build()
1497    ///     .await?;
1498    /// ```
1499    pub fn auth(mut self, auth: AuthConfig) -> Self {
1500        self.config.auth = Some(auth);
1501        self
1502    }
1503
1504    /// Configure SASL/PLAIN authentication.
1505    pub fn sasl_plain(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
1506        self.config.auth = Some(AuthConfig::sasl_plain(username, password));
1507        self
1508    }
1509
1510    /// Configure SASL/SCRAM-SHA-256 authentication.
1511    pub fn sasl_scram_sha256(
1512        mut self,
1513        username: impl Into<String>,
1514        password: impl Into<String>,
1515    ) -> Self {
1516        self.config.auth = Some(AuthConfig::sasl_scram_sha256(username, password));
1517        self
1518    }
1519
1520    /// Configure SASL/SCRAM-SHA-512 authentication.
1521    pub fn sasl_scram_sha512(
1522        mut self,
1523        username: impl Into<String>,
1524        password: impl Into<String>,
1525    ) -> Self {
1526        self.config.auth = Some(AuthConfig::sasl_scram_sha512(username, password));
1527        self
1528    }
1529
1530    /// Configure SASL/OAUTHBEARER authentication.
1531    ///
1532    /// Uses a static OAuth 2.0 bearer token. For token refresh, reconnect
1533    /// with a new token. For SASL extensions, use `.auth(AuthConfig::sasl_oauthbearer_token(...))`.
1534    pub fn sasl_oauthbearer(mut self, token: impl Into<String>) -> Self {
1535        self.config.auth = Some(AuthConfig::sasl_oauthbearer(token));
1536        self
1537    }
1538
1539    /// Build the admin client.
1540    pub async fn build(self) -> Result<AdminClient> {
1541        if self.config.bootstrap_servers.is_empty() {
1542            return Err(KrafkaError::config("bootstrap_servers is required"));
1543        }
1544
1545        let bootstrap_servers: Vec<String> = self
1546            .config
1547            .bootstrap_servers
1548            .split(',')
1549            .map(|s| s.trim().to_string())
1550            .filter(|s| !s.is_empty())
1551            .collect();
1552
1553        if bootstrap_servers.is_empty() {
1554            return Err(KrafkaError::config("no bootstrap servers specified"));
1555        }
1556
1557        // Create connection config with client ID and auth
1558        let mut conn_config_builder = ConnectionConfig::builder()
1559            .client_id(&self.config.client_id)
1560            .request_timeout(self.config.request_timeout);
1561
1562        if let Some(ref auth) = self.config.auth {
1563            conn_config_builder = conn_config_builder.auth(auth.clone());
1564        }
1565
1566        let conn_config = conn_config_builder.build();
1567
1568        let pool = Arc::new(ConnectionPool::new(conn_config));
1569        let metadata = Arc::new(ClusterMetadata::new(
1570            bootstrap_servers,
1571            pool.clone(),
1572            Duration::from_secs(300),
1573        ));
1574
1575        metadata.refresh().await?;
1576
1577        info!(
1578            "AdminClient initialized with auth: {}",
1579            if self.config.auth.is_some() {
1580                "configured"
1581            } else {
1582                "none"
1583            }
1584        );
1585
1586        Ok(AdminClient {
1587            config: self.config,
1588            metadata,
1589            pool,
1590        })
1591    }
1592}
1593
1594#[cfg(test)]
1595mod tests {
1596    use super::*;
1597
1598    #[test]
1599    fn test_new_topic() {
1600        let topic = NewTopic::new("test-topic", 3, 2)
1601            .with_config("cleanup.policy", "compact")
1602            .with_config("retention.ms", "86400000");
1603
1604        assert_eq!(topic.name, "test-topic");
1605        assert_eq!(topic.num_partitions, 3);
1606        assert_eq!(topic.replication_factor, 2);
1607        assert_eq!(topic.configs.len(), 2);
1608    }
1609
1610    #[test]
1611    fn test_admin_config_default() {
1612        let config = AdminConfig::default();
1613        assert_eq!(config.client_id, "krafka-admin");
1614        assert_eq!(config.request_timeout, Duration::from_secs(30));
1615    }
1616
1617    #[test]
1618    fn test_describe_acls_result() {
1619        let result = DescribeAclsResult {
1620            error: None,
1621            bindings: vec![
1622                AclBinding::allow_read_topic("my-topic", "User:alice"),
1623                AclBinding::allow_write_topic("my-topic", "User:bob"),
1624            ],
1625        };
1626        assert!(result.error.is_none());
1627        assert_eq!(result.bindings.len(), 2);
1628    }
1629
1630    #[test]
1631    fn test_create_acls_result() {
1632        let result = CreateAclsResult {
1633            results: vec![
1634                CreateAclResult { error: None },
1635                CreateAclResult {
1636                    error: Some("ACL already exists".to_string()),
1637                },
1638            ],
1639        };
1640        assert!(result.results[0].error.is_none());
1641        assert!(result.results[1].error.is_some());
1642    }
1643
1644    #[test]
1645    fn test_delete_acls_result() {
1646        let result = DeleteAclsResult {
1647            filter_results: vec![
1648                DeleteAclFilterResult {
1649                    error: None,
1650                    deleted_count: 3,
1651                },
1652                DeleteAclFilterResult {
1653                    error: None,
1654                    deleted_count: 0,
1655                },
1656            ],
1657        };
1658        assert_eq!(result.filter_results[0].deleted_count, 3);
1659        assert_eq!(result.filter_results[1].deleted_count, 0);
1660    }
1661
1662    #[test]
1663    fn test_acl_filter_builder() {
1664        use crate::protocol::{AclOperation, AclPatternType, AclPermissionType, AclResourceType};
1665
1666        // Test default filter (matches everything)
1667        let filter = AclFilter::all();
1668        assert_eq!(filter.resource_type, AclResourceType::Any);
1669        assert_eq!(filter.pattern_type, AclPatternType::Any);
1670        assert_eq!(filter.operation, AclOperation::Any);
1671        assert_eq!(filter.permission_type, AclPermissionType::Any);
1672        assert!(filter.resource_name.is_none());
1673        assert!(filter.principal.is_none());
1674        assert!(filter.host.is_none());
1675
1676        // Test filter for specific resource
1677        let filter = AclFilter::for_resource(AclResourceType::Topic, "my-topic");
1678        assert_eq!(filter.resource_type, AclResourceType::Topic);
1679        assert_eq!(filter.resource_name, Some("my-topic".to_string()));
1680
1681        // Test filter for specific principal
1682        let filter = AclFilter::for_principal("User:alice");
1683        assert_eq!(filter.principal, Some("User:alice".to_string()));
1684
1685        // Test builder chain
1686        let filter = AclFilter::all()
1687            .resource_type(AclResourceType::Group)
1688            .resource_name("my-group")
1689            .pattern_type(AclPatternType::Literal)
1690            .principal("User:bob")
1691            .host("localhost")
1692            .operation(AclOperation::Read)
1693            .permission_type(AclPermissionType::Allow);
1694
1695        assert_eq!(filter.resource_type, AclResourceType::Group);
1696        assert_eq!(filter.resource_name, Some("my-group".to_string()));
1697        assert_eq!(filter.pattern_type, AclPatternType::Literal);
1698        assert_eq!(filter.principal, Some("User:bob".to_string()));
1699        assert_eq!(filter.host, Some("localhost".to_string()));
1700        assert_eq!(filter.operation, AclOperation::Read);
1701        assert_eq!(filter.permission_type, AclPermissionType::Allow);
1702    }
1703
1704    #[test]
1705    fn test_admin_builder_with_auth() {
1706        use crate::auth::AuthConfig;
1707
1708        let builder = AdminClient::builder()
1709            .bootstrap_servers("broker:9093")
1710            .auth(AuthConfig::sasl_plain("user", "pass"));
1711
1712        let auth = builder.config.auth.as_ref().unwrap();
1713        assert!(auth.requires_sasl());
1714        assert!(!auth.requires_tls());
1715        assert!(auth.plain_credentials.is_some());
1716    }
1717
1718    #[test]
1719    fn test_admin_builder_sasl_plain() {
1720        let builder = AdminClient::builder()
1721            .bootstrap_servers("broker:9093")
1722            .sasl_plain("admin", "admin-secret");
1723
1724        let auth = builder.config.auth.as_ref().unwrap();
1725        assert_eq!(
1726            auth.security_protocol,
1727            crate::auth::SecurityProtocol::SaslPlaintext
1728        );
1729        assert_eq!(auth.sasl_mechanism, Some(crate::auth::SaslMechanism::Plain));
1730        let creds = auth.plain_credentials.as_ref().unwrap();
1731        assert_eq!(creds.username, "admin");
1732    }
1733
1734    #[test]
1735    fn test_admin_builder_sasl_scram() {
1736        let builder = AdminClient::builder()
1737            .bootstrap_servers("broker:9093")
1738            .sasl_scram_sha256("user", "pass");
1739
1740        let auth = builder.config.auth.as_ref().unwrap();
1741        assert_eq!(
1742            auth.sasl_mechanism,
1743            Some(crate::auth::SaslMechanism::ScramSha256)
1744        );
1745        assert!(auth.scram_credentials.is_some());
1746
1747        let builder = AdminClient::builder()
1748            .bootstrap_servers("broker:9093")
1749            .sasl_scram_sha512("user", "pass");
1750
1751        let auth = builder.config.auth.as_ref().unwrap();
1752        assert_eq!(
1753            auth.sasl_mechanism,
1754            Some(crate::auth::SaslMechanism::ScramSha512)
1755        );
1756        assert!(auth.scram_credentials.is_some());
1757    }
1758
1759    #[test]
1760    fn test_admin_builder_aws_msk_iam() {
1761        use crate::auth::AuthConfig;
1762
1763        let auth = AuthConfig::aws_msk_iam("AKID", "secret", "us-east-1");
1764        let builder = AdminClient::builder()
1765            .bootstrap_servers("broker:9094")
1766            .auth(auth);
1767
1768        let auth = builder.config.auth.as_ref().unwrap();
1769        assert!(auth.requires_tls());
1770        assert!(auth.requires_sasl());
1771        assert_eq!(
1772            auth.sasl_mechanism,
1773            Some(crate::auth::SaslMechanism::AwsMskIam)
1774        );
1775        assert!(auth.aws_msk_iam_credentials.is_some());
1776        assert!(auth.tls_config.is_some());
1777    }
1778
1779    #[test]
1780    fn test_admin_builder_no_auth_by_default() {
1781        let builder = AdminClient::builder().bootstrap_servers("broker:9092");
1782
1783        assert!(builder.config.auth.is_none());
1784    }
1785
1786    #[test]
1787    fn test_consumer_group_description() {
1788        let desc = ConsumerGroupDescription {
1789            group_id: "my-group".to_string(),
1790            state: "Stable".to_string(),
1791            protocol_type: "consumer".to_string(),
1792            protocol: "range".to_string(),
1793            members: vec![
1794                ConsumerGroupMember {
1795                    member_id: "member-1".to_string(),
1796                    group_instance_id: Some("instance-1".to_string()),
1797                    client_id: "my-client".to_string(),
1798                    client_host: "/127.0.0.1".to_string(),
1799                },
1800                ConsumerGroupMember {
1801                    member_id: "member-2".to_string(),
1802                    group_instance_id: None,
1803                    client_id: "other-client".to_string(),
1804                    client_host: "/192.168.1.1".to_string(),
1805                },
1806            ],
1807            error: None,
1808        };
1809        assert_eq!(desc.group_id, "my-group");
1810        assert_eq!(desc.state, "Stable");
1811        assert_eq!(desc.members.len(), 2);
1812        assert!(desc.members[0].group_instance_id.is_some());
1813        assert!(desc.members[1].group_instance_id.is_none());
1814        assert!(desc.error.is_none());
1815    }
1816
1817    #[test]
1818    fn test_consumer_group_listing() {
1819        let listing = ConsumerGroupListing {
1820            group_id: "my-group".to_string(),
1821            protocol_type: "consumer".to_string(),
1822        };
1823        assert_eq!(listing.group_id, "my-group");
1824        assert_eq!(listing.protocol_type, "consumer");
1825    }
1826
1827    #[test]
1828    fn test_delete_record_result() {
1829        let result = DeleteRecordResult {
1830            topic: "my-topic".to_string(),
1831            partition: 0,
1832            low_watermark: 100,
1833            error: None,
1834        };
1835        assert_eq!(result.topic, "my-topic");
1836        assert_eq!(result.partition, 0);
1837        assert_eq!(result.low_watermark, 100);
1838        assert!(result.error.is_none());
1839
1840        let result_err = DeleteRecordResult {
1841            topic: "my-topic".to_string(),
1842            partition: 1,
1843            low_watermark: -1,
1844            error: Some("NotLeaderOrFollower".to_string()),
1845        };
1846        assert!(result_err.error.is_some());
1847    }
1848
1849    #[test]
1850    fn test_leader_epoch_result() {
1851        let result = LeaderEpochResult {
1852            topic: "my-topic".to_string(),
1853            partition: 0,
1854            leader_epoch: 5,
1855            end_offset: 1000,
1856            error: None,
1857        };
1858        assert_eq!(result.topic, "my-topic");
1859        assert_eq!(result.leader_epoch, 5);
1860        assert_eq!(result.end_offset, 1000);
1861        assert!(result.error.is_none());
1862    }
1863}