// krafka/admin.rs
1//! Admin client for Apache Kafka.
2//!
3//! This module provides administrative operations:
4//! - Create/delete/describe topics
5//! - Create additional partitions
6//! - List topics and partitions
7//! - Describe and alter configurations
8//! - Manage ACLs
9//! - Describe cluster and broker configs
10//! - Manage delegation tokens (create, renew, expire, describe)
11//! - Describe and alter client quotas
12//! - Describe broker log directories
13//! - Move replicas between log directories
14//! - Elect partition leaders (preferred / unclean)
15//! - Alter and list partition reassignments
16//! - Delete committed offsets for consumer groups
17//! - Describe and alter user SCRAM credentials
18//! - Describe active producers on partitions
19//! - Describe and list transactions
20//! - List client metrics resources
21//! - Write transaction markers / abort stuck transactions
22//! - Describe KRaft quorum (voters, observers, leader)
23//!
24//! # Authentication
25//!
26//! The admin client supports all authentication mechanisms:
27//! - PLAINTEXT (no auth)
28//! - TLS/SSL
29//! - SASL/PLAIN
30//! - SASL/SCRAM-SHA-256/512
31//! - SASL/AWS_MSK_IAM
32//!
33//! # Example
34//!
35//! ```rust,ignore
36//! use krafka::admin::AdminClient;
37//! use krafka::auth::AuthConfig;
38//!
39//! // With authentication
40//! let client = AdminClient::builder()
41//!     .bootstrap_servers("localhost:9092")
42//!     .auth(AuthConfig::sasl_plain("user", "password")?)
43//!     .build()
44//!     .await?;
45//! ```
46
47use std::collections::{HashMap, HashSet};
48use std::sync::Arc;
49use std::time::Duration;
50
51use bytes::Bytes;
52use tracing::{debug, info, warn};
53
54use crate::auth::{AuthConfig, ScramMechanism};
55use crate::error::{KrafkaError, Result};
56use crate::metadata::{ClusterMetadata, MetadataRecoveryStrategy, TopicInfo};
57use crate::metrics::ConnectionMetrics;
58use crate::network::{BrokerConnection, ConnectionConfig, ConnectionPool};
59
60use crate::protocol::{
61    AclBinding, AclBindingFilter, AclOperation, AclPatternType, AclPermissionType, AclResourceType,
62    AlterClientQuotasRequest, AlterClientQuotasResponse, AlterConfigOp,
63    AlterPartitionReassignmentsRequest, AlterPartitionReassignmentsResponse, AlterQuotaEntity,
64    AlterQuotaEntry, AlterQuotaOp, AlterReplicaLogDir, AlterReplicaLogDirsRequest,
65    AlterReplicaLogDirsResponse, AlterUserScramCredentialsRequest,
66    AlterUserScramCredentialsResponse, AlterableConfig, ApiKey, ConsumerGroupDescribeRequest,
67    ConsumerGroupDescribeResponse, CreatableRenewer, CreatableTopic, CreatableTopicConfig,
68    CreateAclsRequest, CreateAclsResponse, CreateDelegationTokenRequest,
69    CreateDelegationTokenResponse, CreatePartitionsRequest, CreatePartitionsResponse,
70    CreatePartitionsTopic, CreateTopicsRequest, CreateTopicsResponse, DeleteAclsRequest,
71    DeleteAclsResponse, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsPartition,
72    DeleteRecordsRequest, DeleteRecordsResponse, DeleteRecordsTopic, DeleteTopicState,
73    DeleteTopicsRequest, DeleteTopicsResponse, DescribableLogDirTopic, DescribeAclsRequest,
74    DescribeAclsResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse,
75    DescribeClusterRequest, DescribeClusterResponse, DescribeConfigsResponse,
76    DescribeDelegationTokenOwner, DescribeDelegationTokenRequest, DescribeDelegationTokenResponse,
77    DescribeGroupsRequest, DescribeGroupsResponse, DescribeLogDirsRequest, DescribeLogDirsResponse,
78    DescribeProducersRequest, DescribeProducersResponse, DescribeProducersTopicRequest,
79    DescribeQuorumPartitionRequest, DescribeQuorumRequest, DescribeQuorumResponse,
80    DescribeQuorumTopicRequest, DescribeTopicPartitionsCursor, DescribeTopicPartitionsRequest,
81    DescribeTopicPartitionsResponse, DescribeTransactionsRequest, DescribeTransactionsResponse,
82    DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse, ElectLeadersRequest,
83    ElectLeadersResponse, ElectLeadersTopicPartitions, ElectionType, ExpireDelegationTokenRequest,
84    ExpireDelegationTokenResponse, FinalizedFeature, FindCoordinatorRequest,
85    FindCoordinatorResponse, IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse,
86    ListClientMetricsResourcesRequest, ListClientMetricsResourcesResponse, ListGroupsRequest,
87    ListGroupsResponse, ListPartitionReassignmentsRequest, ListPartitionReassignmentsResponse,
88    ListPartitionReassignmentsTopic, ListTransactionsRequest, ListTransactionsResponse,
89    OffsetDeletePartitionRequest, OffsetDeleteRequest, OffsetDeleteResponse,
90    OffsetDeleteTopicRequest, OffsetForLeaderEpochPartition, OffsetForLeaderEpochRequest,
91    OffsetForLeaderEpochResponse, OffsetForLeaderEpochTopic, QuotaFilterComponent,
92    ReassignableTopic, RenewDelegationTokenRequest, RenewDelegationTokenResponse,
93    ScramCredentialDeletion, ScramCredentialUpsertion, SupportedFeature, UpdateFeaturesRequest,
94    UpdateFeaturesResponse, VersionedDecode, VersionedEncode, WritableTxnMarker,
95    WritableTxnMarkerTopic, WriteTxnMarkersRequest, WriteTxnMarkersResponse, validate_topic_name,
96    validate_topic_names, versions,
97};
98
99// Re-export for use by callers of `describe_configs`.
100pub use crate::protocol::DescribeConfigsRequest;
101
/// Default partition limit for DescribeTopicPartitions pagination
/// (maximum number of partitions returned per response page when the
/// caller does not specify a limit).
const DEFAULT_RESPONSE_PARTITION_LIMIT: i32 = 2000;
104
/// Configuration for creating a topic.
///
/// Build with [`NewTopic::new()`] (which validates the inputs) and add
/// per-topic overrides via [`NewTopic::with_config()`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct NewTopic {
    /// Topic name (validated by `new()` as non-empty and within the
    /// wire-format length limit).
    pub name: String,
    /// Number of partitions; -1 requests the broker default.
    pub num_partitions: i32,
    /// Replication factor; -1 requests the broker default.
    pub replication_factor: i16,
    /// Topic configuration overrides (key -> value, e.g. `"cleanup.policy"`).
    pub configs: HashMap<String, String>,
}
118
119impl NewTopic {
120    /// Create a new topic configuration.
121    ///
122    /// # Arguments
123    ///
124    /// * `name` — Topic name. Must be non-empty and no longer than `i16::MAX`
125    ///   bytes (the Kafka wire-format limit).
126    /// * `num_partitions` — Must be positive or -1 (use broker default).
127    /// * `replication_factor` — Must be positive or -1 (use broker default).
128    ///
129    /// # Errors
130    ///
131    /// Returns an error if:
132    /// - `name` is empty or exceeds the `i16::MAX`-byte wire-format limit, or
133    /// - `num_partitions` or `replication_factor` is zero or less than -1.
134    pub fn new(
135        name: impl Into<String>,
136        num_partitions: i32,
137        replication_factor: i16,
138    ) -> Result<Self> {
139        let name = name.into();
140        validate_topic_name(&name)?;
141        if num_partitions == 0 || num_partitions < -1 {
142            return Err(KrafkaError::config(format!(
143                "num_partitions must be positive or -1, got {num_partitions}"
144            )));
145        }
146        if replication_factor == 0 || replication_factor < -1 {
147            return Err(KrafkaError::config(format!(
148                "replication_factor must be positive or -1, got {replication_factor}"
149            )));
150        }
151        Ok(Self {
152            name,
153            num_partitions,
154            replication_factor,
155            configs: HashMap::new(),
156        })
157    }
158
159    /// Add a configuration option.
160    pub fn with_config(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
161        self.configs.insert(key.into(), value.into());
162        self
163    }
164}
165
/// Result of topic creation.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreateTopicResult {
    /// Topic name this result refers to.
    pub name: String,
    /// Error message if creation failed for this topic; `None` on success.
    pub error: Option<String>,
}

/// Result of topic deletion.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteTopicResult {
    /// Topic name this result refers to.
    pub name: String,
    /// Error message if deletion failed for this topic; `None` on success.
    pub error: Option<String>,
}

/// Result of partition creation.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreatePartitionsResult {
    /// Topic whose partition count was to be increased.
    pub topic: String,
    /// Error message if the operation failed for this topic; `None` on success.
    pub error: Option<String>,
}
195
/// A configuration entry.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ConfigEntry {
    /// Configuration key name.
    pub name: String,
    /// Configuration value; `None` if not set.
    pub value: Option<String>,
    /// Whether the config is read-only.
    pub read_only: bool,
    /// Whether this is the default value (v0 only; v1+ uses `config_source`).
    pub is_default: bool,
    /// Whether the config is sensitive (passwords, etc.).
    pub is_sensitive: bool,
    /// Configuration source (v1+). -1 if not available.
    pub config_source: i8,
    /// Synonyms for this configuration key (v1+).
    pub synonyms: Vec<ConfigSynonymEntry>,
    /// Configuration data type (v3+). 0 = UNKNOWN.
    pub config_type: i8,
    /// Configuration documentation (v3+).
    pub documentation: Option<String>,
}

/// A synonym for a configuration key.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ConfigSynonymEntry {
    /// Synonym key name.
    pub name: String,
    /// Synonym value; `None` if not set.
    pub value: Option<String>,
    /// Synonym source (same encoding as `ConfigEntry::config_source`).
    pub source: i8,
}

/// Result of config alteration.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterConfigResult {
    /// Name of the altered resource (topic, broker, etc.).
    pub resource_name: String,
    /// Error message if the alteration failed; `None` on success.
    pub error: Option<String>,
}
241
/// Result of describing ACLs.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeAclsResult {
    /// Error message if the describe failed; `None` on success.
    pub error: Option<String>,
    /// List of ACL bindings matching the filter.
    pub bindings: Vec<AclBinding>,
}

/// Result of creating ACLs.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreateAclsResult {
    /// Per-ACL results, in the same order as the creation requests.
    pub results: Vec<CreateAclResult>,
}

/// Result of a single ACL creation.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreateAclResult {
    /// Error message if this ACL could not be created; `None` on success.
    pub error: Option<String>,
}

/// Result of deleting ACLs.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteAclsResult {
    /// Per-filter results, in the same order as the deletion filters.
    pub filter_results: Vec<DeleteAclFilterResult>,
}

/// Result for a single ACL filter deletion.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteAclFilterResult {
    /// Error message if this filter failed; `None` on success.
    pub error: Option<String>,
    /// Number of ACLs deleted by this filter.
    pub deleted_count: usize,
}
285
/// Consumer group type (classic vs. new consumer protocol from KIP-848).
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GroupType {
    /// Classic consumer group protocol (JoinGroup/SyncGroup/Heartbeat).
    Classic,
    /// New consumer group protocol (KIP-848, ConsumerGroupHeartbeat).
    Consumer,
    /// Unknown or unrecognised group type; carries the raw string the
    /// broker reported.
    Unknown(String),
}
297
298impl std::fmt::Display for GroupType {
299    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
300        match self {
301            Self::Classic => f.write_str("classic"),
302            Self::Consumer => f.write_str("consumer"),
303            Self::Unknown(s) => f.write_str(s),
304        }
305    }
306}
307
/// Description of a consumer group.
///
/// This is a unified result type that covers both classic-protocol groups
/// (Key 15 — DescribeGroups) and KIP-848 consumer groups (Key 69 —
/// ConsumerGroupDescribe). The method [`AdminClient::describe_consumer_groups()`]
/// automatically detects each group's type and dispatches to the appropriate API.
///
/// Fields that are only available for one protocol type are wrapped in `Option`.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ConsumerGroupDescription {
    /// Group ID.
    pub group_id: String,
    /// Group type (classic or KIP-848 consumer).
    pub group_type: GroupType,
    /// Group state (e.g., "Stable", "Empty", "Dead", "PreparingRebalance", "Assigning").
    pub state: String,
    /// Protocol type (classic groups only, e.g., "consumer"); `None` for
    /// KIP-848 groups.
    pub protocol_type: Option<String>,
    /// Protocol / assignor name. For classic groups, the partition assignment strategy
    /// (e.g., "range", "roundrobin"). For KIP-848 groups, the server-side assignor
    /// (e.g., "uniform").
    pub assignor: Option<String>,
    /// Group epoch (KIP-848 groups only).
    pub group_epoch: Option<i32>,
    /// Assignment epoch (KIP-848 groups only).
    pub assignment_epoch: Option<i32>,
    /// Group members.
    pub members: Vec<ConsumerGroupMember>,
    /// Authorized operations bitfield (KIP-848 groups only; -2^31 if not requested).
    pub authorized_operations: Option<i32>,
    /// Error message if describing this group failed; `None` on success.
    pub error: Option<String>,
}
342
/// A member of a consumer group.
///
/// Fields that are only available for KIP-848 groups are wrapped in `Option`.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ConsumerGroupMember {
    /// Member ID assigned by the group coordinator.
    pub member_id: String,
    /// Group instance ID / instance ID (static membership); `None` for
    /// dynamic members.
    pub instance_id: Option<String>,
    /// Rack ID (KIP-848 groups only).
    pub rack_id: Option<String>,
    /// Current member epoch (KIP-848 groups only).
    pub member_epoch: Option<i32>,
    /// Client ID reported by the member.
    pub client_id: String,
    /// Client host the member connected from.
    pub client_host: String,
    /// Subscribed topic names (KIP-848 groups only).
    pub subscribed_topic_names: Option<Vec<String>>,
    /// Subscribed topic regex (KIP-848 groups only).
    pub subscribed_topic_regex: Option<String>,
    /// Current partition assignment (KIP-848 groups only).
    pub assignment: Option<Vec<TopicPartitionAssignment>>,
    /// Target partition assignment (KIP-848 groups only).
    pub target_assignment: Option<Vec<TopicPartitionAssignment>>,
    /// Member type (KIP-848 groups only). -1 = unknown, 0 = classic, 1 = consumer.
    pub member_type: Option<i8>,
}

/// Topic-partition assignment within a consumer group description.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TopicPartitionAssignment {
    /// Topic ID (UUID, raw 16 bytes).
    pub topic_id: [u8; 16],
    /// Topic name.
    pub topic_name: String,
    /// Assigned partition indices.
    pub partitions: Vec<i32>,
}
384
/// Listing entry for a consumer group.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ConsumerGroupListing {
    /// Group ID.
    pub group_id: String,
    /// Protocol type (e.g., "consumer").
    pub protocol_type: String,
    /// Group type (Kafka 3.7+, KIP-848). `None` if the broker is too old
    /// to report one.
    pub group_type: Option<GroupType>,
}

/// Result of [`AdminClient::describe_topic_partitions()`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeTopicPartitionsResult {
    /// Described topics.
    pub topics: Vec<TopicPartitionDescription>,
    /// Pagination cursor topic name for the next page, if more pages remain;
    /// `None` when the listing is complete.
    pub next_cursor_topic: Option<String>,
    /// Pagination cursor partition index for the next page.
    pub next_cursor_partition: Option<i32>,
}
408
/// Per-topic result from [`AdminClient::describe_topic_partitions()`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TopicPartitionDescription {
    /// Topic name; may be `None` in error responses.
    pub name: Option<String>,
    /// Topic ID (UUID, raw 16 bytes).
    pub topic_id: [u8; 16],
    /// Whether the topic is internal (e.g., `__consumer_offsets`).
    pub is_internal: bool,
    /// Per-partition details.
    pub partitions: Vec<PartitionDescription>,
    /// Authorized operations bitfield.
    pub topic_authorized_operations: i32,
    /// Error message if describing this topic failed; `None` on success.
    pub error: Option<String>,
}

/// Per-partition detail from [`AdminClient::describe_topic_partitions()`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct PartitionDescription {
    /// Partition index.
    pub partition_index: i32,
    /// Leader broker ID.
    pub leader_id: i32,
    /// Leader epoch.
    pub leader_epoch: i32,
    /// Replica broker IDs.
    pub replica_nodes: Vec<i32>,
    /// In-sync replica broker IDs.
    pub isr_nodes: Vec<i32>,
    /// Eligible leader replicas (KIP-966); `None` if the broker did not
    /// report them.
    pub eligible_leader_replicas: Option<Vec<i32>>,
    /// Last known ELR (KIP-966); `None` if the broker did not report it.
    pub last_known_elr: Option<Vec<i32>>,
    /// Offline replica broker IDs.
    pub offline_replicas: Vec<i32>,
    /// Error message if describing this partition failed; `None` on success.
    pub error: Option<String>,
}
450
/// Result of deleting records from a partition.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteRecordResult {
    /// Topic name.
    pub topic: String,
    /// Partition index.
    pub partition: i32,
    /// The new log start offset (low watermark) after deletion.
    pub low_watermark: i64,
    /// Error message if deletion failed for this partition; `None` on success.
    pub error: Option<String>,
}

/// Result of an OffsetForLeaderEpoch request for a partition.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LeaderEpochResult {
    /// Topic name.
    pub topic: String,
    /// Partition index.
    pub partition: i32,
    /// The leader epoch the end offset refers to.
    pub leader_epoch: i32,
    /// The end offset for this leader epoch.
    pub end_offset: i64,
    /// Error message if the lookup failed for this partition; `None` on success.
    pub error: Option<String>,
}
480
/// A principal authorized to renew a delegation token.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DelegationTokenRenewer {
    /// Principal type (e.g., `"User"`).
    pub principal_type: String,
    /// Principal name.
    pub principal_name: String,
}

/// A delegation token returned by [`AdminClient::create_delegation_token()`] or
/// [`AdminClient::describe_delegation_token()`].
///
/// `Debug` is implemented manually so the secret `hmac` is never printed.
#[non_exhaustive]
#[derive(Clone)]
pub struct DelegationToken {
    /// Token owner principal type (e.g., `"User"`).
    pub principal_type: String,
    /// Token owner principal name.
    pub principal_name: String,
    /// When the token was issued (ms since epoch).
    pub issue_timestamp_ms: i64,
    /// When the token expires (ms since epoch).
    pub expiry_timestamp_ms: i64,
    /// Maximum timestamp at which the token can be renewed (ms since epoch).
    pub max_timestamp_ms: i64,
    /// Unique token ID.
    pub token_id: String,
    /// HMAC of the delegation token (used for SASL authentication).
    /// Treated as a secret: redacted from `Debug` output.
    pub hmac: Bytes,
    /// Principals authorized to renew this token.
    ///
    /// Populated by [`AdminClient::describe_delegation_token()`]. Empty when
    /// returned from [`AdminClient::create_delegation_token()`] because the
    /// Create response does not include the renewer list.
    pub renewers: Vec<DelegationTokenRenewer>,
}
517
518impl std::fmt::Debug for DelegationToken {
519    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
520        f.debug_struct("DelegationToken")
521            .field("principal_type", &self.principal_type)
522            .field("principal_name", &self.principal_name)
523            .field("issue_timestamp_ms", &self.issue_timestamp_ms)
524            .field("expiry_timestamp_ms", &self.expiry_timestamp_ms)
525            .field("max_timestamp_ms", &self.max_timestamp_ms)
526            .field("token_id", &self.token_id)
527            .field("hmac", &"[REDACTED]")
528            .field("renewers", &self.renewers)
529            .finish()
530    }
531}
532
/// Result of creating a delegation token.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct CreateDelegationTokenResult {
    /// The created delegation token; `None` when creation failed.
    pub token: Option<DelegationToken>,
    /// Error message if creation failed; `None` on success.
    pub error: Option<String>,
}

/// Result of renewing a delegation token.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct RenewDelegationTokenResult {
    /// New expiry timestamp (ms since epoch).
    pub expiry_timestamp_ms: i64,
    /// Error message if renewal failed; `None` on success.
    pub error: Option<String>,
}

/// Result of expiring a delegation token.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ExpireDelegationTokenResult {
    /// New expiry timestamp (ms since epoch).
    pub expiry_timestamp_ms: i64,
    /// Error message if expiry failed; `None` on success.
    pub error: Option<String>,
}
562
/// A quota entity component describing who the quota applies to.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuotaEntityComponent {
    /// Entity type (e.g., `"user"`, `"client-id"`, `"ip"`).
    pub entity_type: String,
    /// Entity name. `None` represents the default entity for this type.
    pub entity_name: Option<String>,
}

/// A quota configuration value.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuotaConfig {
    /// Quota key (e.g., `"producer_byte_rate"`, `"consumer_byte_rate"`,
    /// `"request_percentage"`).
    pub key: String,
    /// Quota value.
    pub value: f64,
}

/// A quota entry describing the quotas applied to an entity.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuotaDescription {
    /// Entity components identifying who this quota applies to
    /// (user, client-id, ip).
    pub entity: Vec<QuotaEntityComponent>,
    /// Quota configuration values in effect for this entity.
    pub values: Vec<QuotaConfig>,
}

/// Result of describing client quotas.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeClientQuotasResult {
    /// Quota entries matching the filter.
    pub entries: Vec<QuotaDescription>,
    /// Error message if the describe failed; `None` on success.
    pub error: Option<String>,
}
603
/// Result of altering a single quota entity.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterClientQuotaResult {
    /// Entity components that were altered.
    pub entity: Vec<QuotaEntityComponent>,
    /// Error message if the alteration failed for this entity; `None` on success.
    pub error: Option<String>,
}

/// Input for [`AdminClient::alter_client_quotas`].
///
/// Describes a set of quota operations (set or remove) to apply to a
/// single entity. An entity is identified by a list of (type, name) pairs —
/// for example `[("user", Some("alice")), ("client-id", None)]`.
#[derive(Debug, Clone)]
pub struct QuotaAlteration<'a> {
    /// Entity components (type, optional name). A `None` name targets the
    /// default entity for that type.
    pub entity: Vec<(&'a str, Option<&'a str>)>,
    /// Quota operations: `(key, Some(value))` sets the quota key;
    /// `(key, None)` removes it.
    pub ops: Vec<(&'a str, Option<f64>)>,
}
628
/// Filter for ACL operations (describe, delete).
///
/// This struct encapsulates all the filter parameters for ACL queries.
/// `Default` produces a match-everything filter; narrow it with the
/// builder-style setters on the `impl` block.
#[non_exhaustive]
#[derive(Debug, Clone, Default)]
pub struct AclFilter {
    /// Resource type to filter by.
    pub resource_type: AclResourceType,
    /// Resource name to filter by (`None` for any).
    pub resource_name: Option<String>,
    /// Name pattern type used for matching.
    pub pattern_type: AclPatternType,
    /// Principal to filter by (`None` for any).
    pub principal: Option<String>,
    /// Host to filter by (`None` for any).
    pub host: Option<String>,
    /// Operation to filter by.
    pub operation: AclOperation,
    /// Permission type to filter by.
    pub permission_type: AclPermissionType,
}
650
651impl AclFilter {
652    /// Create a new ACL filter that matches all ACLs.
653    pub fn all() -> Self {
654        Self::default()
655    }
656
657    /// Create a filter for a specific resource.
658    pub fn for_resource(resource_type: AclResourceType, resource_name: impl Into<String>) -> Self {
659        Self {
660            resource_type,
661            resource_name: Some(resource_name.into()),
662            ..Default::default()
663        }
664    }
665
666    /// Create a filter for a specific principal.
667    pub fn for_principal(principal: impl Into<String>) -> Self {
668        Self {
669            principal: Some(principal.into()),
670            ..Default::default()
671        }
672    }
673
674    /// Set the resource type.
675    pub fn resource_type(mut self, resource_type: AclResourceType) -> Self {
676        self.resource_type = resource_type;
677        self
678    }
679
680    /// Set the resource name.
681    pub fn resource_name(mut self, name: impl Into<String>) -> Self {
682        self.resource_name = Some(name.into());
683        self
684    }
685
686    /// Set the pattern type.
687    pub fn pattern_type(mut self, pattern_type: AclPatternType) -> Self {
688        self.pattern_type = pattern_type;
689        self
690    }
691
692    /// Set the principal.
693    pub fn principal(mut self, principal: impl Into<String>) -> Self {
694        self.principal = Some(principal.into());
695        self
696    }
697
698    /// Set the host.
699    pub fn host(mut self, host: impl Into<String>) -> Self {
700        self.host = Some(host.into());
701        self
702    }
703
704    /// Set the operation.
705    pub fn operation(mut self, operation: AclOperation) -> Self {
706        self.operation = operation;
707        self
708    }
709
710    /// Set the permission type.
711    pub fn permission_type(mut self, permission_type: AclPermissionType) -> Self {
712        self.permission_type = permission_type;
713        self
714    }
715}
716
/// Admin client configuration.
///
/// Use [`AdminConfig::builder()`] or [`Default::default()`] to construct.
#[derive(Debug, Clone)]
pub struct AdminConfig {
    /// Comma-separated bootstrap broker addresses.
    pub(crate) bootstrap_servers: String,
    /// Client ID sent in request headers.
    pub(crate) client_id: String,
    /// Per-request timeout.
    pub(crate) request_timeout: Duration,
    /// Metadata recovery strategy (KIP-899).
    pub(crate) metadata_recovery_strategy: MetadataRecoveryStrategy,
    /// Duration after which failing metadata refreshes trigger a rebootstrap
    /// (KIP-899). Only effective with
    /// [`MetadataRecoveryStrategy::Rebootstrap`]. Default: 300 s.
    pub(crate) metadata_recovery_rebootstrap_trigger: Duration,
    /// Authentication configuration (optional; `None` = PLAINTEXT).
    pub(crate) auth: Option<AuthConfig>,
    /// SOCKS5 proxy configuration (optional; only with the `socks5` feature).
    #[cfg(feature = "socks5")]
    pub(crate) proxy: Option<crate::network::ProxyConfig>,
}
740
741impl Default for AdminConfig {
742    fn default() -> Self {
743        Self {
744            bootstrap_servers: String::new(),
745            client_id: "krafka-admin".to_string(),
746            request_timeout: Duration::from_secs(30),
747            metadata_recovery_strategy: MetadataRecoveryStrategy::Rebootstrap,
748            metadata_recovery_rebootstrap_trigger: Duration::from_secs(300),
749            auth: None,
750            #[cfg(feature = "socks5")]
751            proxy: None,
752        }
753    }
754}
755
756impl AdminConfig {
757    /// Create a new config builder.
758    pub fn builder() -> AdminConfigBuilder {
759        AdminConfigBuilder::default()
760    }
761
762    /// Returns the bootstrap servers.
763    #[inline]
764    pub fn bootstrap_servers(&self) -> &str {
765        &self.bootstrap_servers
766    }
767
768    /// Returns the client ID.
769    #[inline]
770    pub fn client_id(&self) -> &str {
771        &self.client_id
772    }
773
774    /// Returns the request timeout.
775    #[inline]
776    pub fn request_timeout(&self) -> Duration {
777        self.request_timeout
778    }
779
780    /// Returns the metadata recovery strategy (KIP-899).
781    #[inline]
782    pub fn metadata_recovery_strategy(&self) -> MetadataRecoveryStrategy {
783        self.metadata_recovery_strategy
784    }
785
786    /// Returns the rebootstrap trigger duration (KIP-899).
787    #[inline]
788    pub fn metadata_recovery_rebootstrap_trigger(&self) -> Duration {
789        self.metadata_recovery_rebootstrap_trigger
790    }
791
792    /// Returns the authentication configuration, if set.
793    #[inline]
794    pub fn auth(&self) -> Option<&AuthConfig> {
795        self.auth.as_ref()
796    }
797
798    /// Returns the SOCKS5 proxy configuration, if set.
799    #[cfg(feature = "socks5")]
800    #[inline]
801    pub fn proxy(&self) -> Option<&crate::network::ProxyConfig> {
802        self.proxy.as_ref()
803    }
804}
805
/// Builder for AdminConfig.
///
/// Obtain via [`AdminConfig::builder()`]; finish with
/// [`AdminConfigBuilder::build()`].
#[must_use = "builders do nothing until .build() is called"]
#[derive(Debug, Default)]
pub struct AdminConfigBuilder {
    // Config under construction; starts from `AdminConfig::default()`.
    config: AdminConfig,
}
812
813impl AdminConfigBuilder {
814    /// Set bootstrap servers.
815    pub fn bootstrap_servers(mut self, servers: impl Into<String>) -> Self {
816        self.config.bootstrap_servers = servers.into();
817        self
818    }
819
820    /// Set client ID.
821    pub fn client_id(mut self, id: impl Into<String>) -> Self {
822        self.config.client_id = id.into();
823        self
824    }
825
826    /// Set request timeout.
827    pub fn request_timeout(mut self, timeout: Duration) -> Self {
828        self.config.request_timeout = timeout;
829        self
830    }
831
832    /// Set the metadata recovery strategy (KIP-899).
833    pub fn metadata_recovery_strategy(mut self, strategy: MetadataRecoveryStrategy) -> Self {
834        self.config.metadata_recovery_strategy = strategy;
835        self
836    }
837
838    /// Set the rebootstrap trigger duration (KIP-899).
839    ///
840    /// Only effective when [`MetadataRecoveryStrategy::Rebootstrap`] is set.
841    pub fn metadata_recovery_rebootstrap_trigger(mut self, duration: Duration) -> Self {
842        self.config.metadata_recovery_rebootstrap_trigger = duration;
843        self
844    }
845
846    /// Set authentication configuration.
847    pub fn auth(mut self, auth: AuthConfig) -> Self {
848        self.config.auth = Some(auth);
849        self
850    }
851
852    /// Set SOCKS5 proxy configuration.
853    #[cfg(feature = "socks5")]
854    pub fn proxy(mut self, proxy: crate::network::ProxyConfig) -> Self {
855        self.config.proxy = Some(proxy);
856        self
857    }
858
859    /// Build the AdminConfig.
860    pub fn build(self) -> AdminConfig {
861        self.config
862    }
863}
864
/// Result of deleting a single consumer group.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DeleteGroupResult {
    /// Group ID this result refers to.
    pub group_id: String,
    /// Error message if deletion failed for this group; `None` on success.
    pub error: Option<String>,
}

/// Cluster description returned by [`AdminClient::describe_cluster`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeClusterResult {
    /// Cluster ID.
    pub cluster_id: String,
    /// Controller broker ID.
    pub controller_id: i32,
    /// Brokers in the cluster.
    pub brokers: Vec<DescribeClusterBrokerInfo>,
    /// Authorized operations bitfield (-2^31 if not requested).
    pub cluster_authorized_operations: i32,
}

/// Broker entry in [`DescribeClusterResult`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeClusterBrokerInfo {
    /// Broker ID.
    pub broker_id: i32,
    /// Hostname.
    pub host: String,
    /// Port.
    pub port: i32,
    /// Rack, if one is assigned to this broker.
    pub rack: Option<String>,
}
902
/// Kafka admin client for cluster administration.
pub struct AdminClient {
    /// Client configuration (client ID, timeouts, auth, ...).
    config: AdminConfig,
    /// Shared, refreshable view of cluster metadata (brokers, topics).
    metadata: Arc<ClusterMetadata>,
    /// Pool of broker connections, looked up by broker ID.
    pool: Arc<ConnectionPool>,
    /// Whether the client has been closed; checked before RPCs and read in
    /// `Drop` to decide whether to warn about an unclean shutdown.
    closed: std::sync::atomic::AtomicBool,
}
914
915impl Drop for AdminClient {
916    fn drop(&mut self) {
917        // Warn when the client is dropped without an explicit `close()`:
918        // in-flight RPCs are terminated abruptly and connections are not
919        // cleanly shut down. Skip during panic unwinding.
920        if !self.closed.load(std::sync::atomic::Ordering::SeqCst) && !std::thread::panicking() {
921            warn!(
922                "AdminClient dropped without close(); in-flight RPCs may fail abruptly. \
923                 Call `AdminClient::close()` before drop."
924            );
925        }
926    }
927}
928
929impl AdminClient {
    /// Create a new admin client builder.
    ///
    /// Equivalent to `AdminClientBuilder::default()`.
    pub fn builder() -> AdminClientBuilder {
        AdminClientBuilder::default()
    }
934
935    /// Return an error if the admin client has been closed.
936    ///
937    /// **Note:** This is a best-effort check. A concurrent call to [`close()`](Self::close)
938    /// can race with the RPC that follows, in which case the RPC itself will fail
939    /// with a network error rather than an "AdminClient is closed" message.
940    #[inline]
941    fn check_not_closed(&self) -> Result<()> {
942        if self.is_closed() {
943            return Err(KrafkaError::invalid_state("AdminClient is closed"));
944        }
945        Ok(())
946    }
947
948    /// Get a connection to any available broker.
949    ///
950    /// Checks the client is not closed, picks the first available broker, and
951    /// returns a connection from the pool. Most admin commands can be sent to
952    /// any broker (the broker will forward as needed).
953    async fn get_any_broker_connection(&self) -> Result<Arc<BrokerConnection>> {
954        self.check_not_closed()?;
955        let brokers = self.metadata.brokers();
956        if brokers.is_empty() {
957            return Err(KrafkaError::broker(
958                crate::error::ErrorCode::UnknownServerError,
959                "no brokers available",
960            ));
961        }
962        let broker = &brokers[0];
963        self.pool
964            .get_connection_by_id(broker.id, broker.address())
965            .await
966    }
967
968    /// Create topics.
969    pub async fn create_topics(
970        &self,
971        topics: Vec<NewTopic>,
972        timeout: Duration,
973    ) -> Result<Vec<CreateTopicResult>> {
974        let conn = self.get_any_broker_connection().await?;
975
976        // Build request
977        let request = CreateTopicsRequest {
978            topics: topics
979                .iter()
980                .map(|t| CreatableTopic {
981                    name: t.name.clone(),
982                    num_partitions: t.num_partitions,
983                    replication_factor: t.replication_factor,
984                    assignments: Vec::new(),
985                    configs: t
986                        .configs
987                        .iter()
988                        .map(|(k, v)| CreatableTopicConfig {
989                            name: k.clone(),
990                            value: Some(v.clone()),
991                        })
992                        .collect(),
993                })
994                .collect(),
995            timeout_ms: crate::util::duration_to_millis_i32(timeout),
996            validate_only: false,
997        };
998
999        // Send request — negotiate API version with broker
1000        let version = conn
1001            .negotiate_api_version(
1002                ApiKey::CreateTopics,
1003                versions::CREATE_TOPICS_MAX,
1004                versions::CREATE_TOPICS_MIN,
1005            )
1006            .await
1007            .ok_or_else(|| {
1008                KrafkaError::protocol("no mutually supported CreateTopics API version")
1009            })?;
1010
1011        let response_bytes = conn
1012            .send_request(ApiKey::CreateTopics, version, |buf| {
1013                request.encode_versioned(version, buf)
1014            })
1015            .await?;
1016
1017        // Decode response
1018        let mut buf = response_bytes;
1019        let response = CreateTopicsResponse::decode_versioned(version, &mut buf)?;
1020
1021        // Convert to results
1022        let results = response
1023            .topics
1024            .into_iter()
1025            .map(|t| CreateTopicResult {
1026                name: t.name,
1027                error: if t.error_code.is_ok() {
1028                    None
1029                } else {
1030                    Some(
1031                        t.error_message
1032                            .unwrap_or_else(|| format!("{:?}", t.error_code)),
1033                    )
1034                },
1035            })
1036            .collect();
1037
1038        info!("Created {} topics", topics.len());
1039        Ok(results)
1040    }
1041
    /// Delete topics.
    ///
    /// Returns one [`DeleteTopicResult`] per topic; per-topic failures are
    /// reported via the result's `error` field rather than as an `Err`.
    ///
    /// # Errors
    /// Returns an error if a topic name fails validation, the client is
    /// closed, no broker is reachable, or the RPC/decoding fails.
    pub async fn delete_topics(
        &self,
        topics: Vec<String>,
        timeout: Duration,
    ) -> Result<Vec<DeleteTopicResult>> {
        // H6: reject oversize topic names at ingress so we never reach the
        // panicking `KafkaString::encode` path.
        validate_topic_names(topics.iter().map(String::as_str))?;
        let conn = self.get_any_broker_connection().await?;

        // Build request — populate both fields so the correct one is used
        // regardless of the negotiated version (v1–v5 use topic_names, v6+ use topics).
        let delete_topic_states: Vec<DeleteTopicState> = topics
            .iter()
            .map(|name| DeleteTopicState {
                name: Some(name.clone()),
                // Null UUID: deletion by topic name, not UUID.
                topic_id: [0u8; 16],
            })
            .collect();
        // `topics` is moved into the request below; remember the count for logging.
        let topic_count = topics.len();
        let request = DeleteTopicsRequest {
            topic_names: topics,
            topics: delete_topic_states,
            timeout_ms: crate::util::duration_to_millis_i32(timeout),
        };

        // Send request — negotiate API version with broker
        let version = conn
            .negotiate_api_version(
                ApiKey::DeleteTopics,
                versions::DELETE_TOPICS_MAX,
                versions::DELETE_TOPICS_MIN,
            )
            .await
            .ok_or_else(|| {
                KrafkaError::protocol("no mutually supported DeleteTopics API version")
            })?;

        let response_bytes = conn
            .send_request(ApiKey::DeleteTopics, version, |buf| {
                request.encode_versioned(version, buf)
            })
            .await?;

        // Decode response
        let mut buf = response_bytes;
        let response = DeleteTopicsResponse::decode_versioned(version, &mut buf)?;

        // Convert to results. `name` is optional on the wire; fall back to an
        // empty string when absent. A missing error message is replaced by
        // the Debug form of the error code.
        let results = response
            .responses
            .into_iter()
            .map(|r| DeleteTopicResult {
                name: r.name.unwrap_or_default(),
                error: if r.error_code.is_ok() {
                    None
                } else {
                    Some(
                        r.error_message
                            .unwrap_or_else(|| format!("{:?}", r.error_code)),
                    )
                },
            })
            .collect();

        info!("Deleted {} topics", topic_count);
        Ok(results)
    }
1112
    /// Increase the number of partitions for a topic.
    ///
    /// `new_total_count` is the desired *total* partition count, not a delta.
    /// A failure reported by the broker surfaces in the returned
    /// [`CreatePartitionsResult::error`] field rather than as an `Err`.
    ///
    /// Note: Partition count can only be increased, never decreased.
    ///
    /// # Errors
    /// Returns an error if the topic name fails validation, the client is
    /// closed, no broker is reachable, or the RPC/decoding fails.
    pub async fn create_partitions(
        &self,
        topic: impl Into<String>,
        new_total_count: i32,
        timeout: Duration,
    ) -> Result<CreatePartitionsResult> {
        let topic_name = topic.into();
        // H6: reject oversize topic names at ingress so we never reach the
        // panicking `KafkaString::encode` path.
        validate_topic_name(&topic_name)?;
        let conn = self.get_any_broker_connection().await?;

        // Build request — no explicit replica assignments; the broker
        // chooses placement for the new partitions.
        let request = CreatePartitionsRequest {
            topics: vec![CreatePartitionsTopic {
                name: topic_name.clone(),
                count: new_total_count,
                assignments: None,
            }],
            timeout_ms: crate::util::duration_to_millis_i32(timeout),
            validate_only: false,
        };

        // Send request — negotiate API version with broker
        let version = conn
            .negotiate_api_version(
                ApiKey::CreatePartitions,
                versions::CREATE_PARTITIONS_MAX,
                versions::CREATE_PARTITIONS_MIN,
            )
            .await
            .ok_or_else(|| {
                KrafkaError::protocol("no mutually supported CreatePartitions API version")
            })?;

        let response_bytes = conn
            .send_request(ApiKey::CreatePartitions, version, |buf| {
                request.encode_versioned(version, buf)
            })
            .await?;

        // Decode response
        let mut buf = response_bytes;
        let response = CreatePartitionsResponse::decode_versioned(version, &mut buf)?;

        // We sent exactly one topic, so only the first result entry matters.
        // An empty result list is mapped to a synthetic error.
        let result = response
            .results
            .into_iter()
            .next()
            .map(|r| CreatePartitionsResult {
                topic: r.name,
                error: if r.error_code.is_ok() {
                    None
                } else {
                    Some(
                        r.error_message
                            .unwrap_or_else(|| format!("{:?}", r.error_code)),
                    )
                },
            })
            .unwrap_or(CreatePartitionsResult {
                topic: topic_name.clone(),
                error: Some("no response received".to_string()),
            });

        if result.error.is_none() {
            info!(
                "Increased partitions for topic {} to {}",
                topic_name, new_total_count
            );
        }
        Ok(result)
    }
1189
    /// Describe configuration for one or more resources (topics, brokers, etc.).
    ///
    /// Uses DescribeConfigs (API Key 32). Build a [`DescribeConfigsRequest`]
    /// via its convenience constructors (`for_topic`, `for_broker`) or manually
    /// populate the `resources` field for multi-resource queries.
    ///
    /// Returns the config entries of all successfully described resources,
    /// flattened into a single list.
    ///
    /// NOTE(review): resources whose result carries an error code are
    /// silently omitted from the output, so callers cannot distinguish
    /// "resource has no configs" from "describe failed for this resource".
    pub async fn describe_configs(
        &self,
        request: DescribeConfigsRequest,
    ) -> Result<Vec<ConfigEntry>> {
        let conn = self.get_any_broker_connection().await?;

        let version = conn
            .negotiate_api_version(
                ApiKey::DescribeConfigs,
                versions::DESCRIBE_CONFIGS_MAX,
                versions::DESCRIBE_CONFIGS_MIN,
            )
            .await
            .ok_or_else(|| {
                KrafkaError::protocol("no mutually supported DescribeConfigs API version")
            })?;

        let response_bytes = conn
            .send_request(ApiKey::DescribeConfigs, version, |buf| {
                request.encode_versioned(version, buf)
            })
            .await?;

        let mut buf = response_bytes;
        let response = DescribeConfigsResponse::decode_versioned(version, &mut buf)?;

        // Flatten per-resource config lists; errored resources contribute
        // nothing (see NOTE above).
        let entries = response
            .results
            .into_iter()
            .flat_map(|r| {
                if !r.error_code.is_ok() {
                    return Vec::new();
                }
                r.configs
                    .into_iter()
                    .map(|c| ConfigEntry {
                        name: c.name,
                        value: c.value,
                        read_only: c.read_only,
                        is_default: c.is_default,
                        is_sensitive: c.is_sensitive,
                        config_source: c.config_source,
                        synonyms: c
                            .synonyms
                            .into_iter()
                            .map(|s| ConfigSynonymEntry {
                                name: s.name,
                                value: s.value,
                                source: s.source,
                            })
                            .collect(),
                        config_type: c.config_type,
                        documentation: c.documentation,
                    })
                    .collect()
            })
            .collect();

        Ok(entries)
    }
1255
1256    /// Alter configuration for a topic.
1257    ///
1258    /// Uses IncrementalAlterConfigs (API Key 44) to set individual config keys
1259    /// without replacing the entire config. Each key-value pair is applied as a
1260    /// SET operation.
1261    pub async fn alter_topic_config(
1262        &self,
1263        topic: &str,
1264        configs: HashMap<String, String>,
1265    ) -> Result<AlterConfigResult> {
1266        let conn = self.get_any_broker_connection().await?;
1267
1268        let request = IncrementalAlterConfigsRequest::for_topic(
1269            topic,
1270            configs
1271                .into_iter()
1272                .map(|(name, value)| AlterableConfig {
1273                    name,
1274                    config_operation: AlterConfigOp::Set,
1275                    value: Some(value),
1276                })
1277                .collect(),
1278        );
1279
1280        let version = conn
1281            .negotiate_api_version(
1282                ApiKey::IncrementalAlterConfigs,
1283                versions::INCREMENTAL_ALTER_CONFIGS_MAX,
1284                versions::INCREMENTAL_ALTER_CONFIGS_MIN,
1285            )
1286            .await
1287            .ok_or_else(|| {
1288                KrafkaError::protocol("no mutually supported IncrementalAlterConfigs API version")
1289            })?;
1290
1291        let response_bytes = conn
1292            .send_request(ApiKey::IncrementalAlterConfigs, version, |buf| {
1293                request.encode_versioned(version, buf)
1294            })
1295            .await?;
1296
1297        let mut buf = response_bytes;
1298        let response = IncrementalAlterConfigsResponse::decode_versioned(version, &mut buf)?;
1299
1300        let result = response
1301            .results
1302            .into_iter()
1303            .next()
1304            .map(|r| AlterConfigResult {
1305                resource_name: r.resource_name,
1306                error: if r.error_code.is_ok() {
1307                    None
1308                } else {
1309                    Some(
1310                        r.error_message
1311                            .unwrap_or_else(|| format!("{:?}", r.error_code)),
1312                    )
1313                },
1314            })
1315            .unwrap_or(AlterConfigResult {
1316                resource_name: topic.to_string(),
1317                error: Some("no response received".to_string()),
1318            });
1319
1320        if result.error.is_none() {
1321            info!("Altered config for topic {}", topic);
1322        }
1323        Ok(result)
1324    }
1325
1326    /// List all topics.
1327    pub async fn list_topics(&self) -> Result<Vec<String>> {
1328        self.check_not_closed()?;
1329        self.metadata.refresh().await?;
1330        Ok(self.metadata.topics().into_iter().map(|t| t.name).collect())
1331    }
1332
1333    /// Describe topics.
1334    pub async fn describe_topics(&self, topics: &[String]) -> Result<Vec<TopicInfo>> {
1335        self.check_not_closed()?;
1336        self.metadata.refresh().await?;
1337        let all_topics = self.metadata.topics();
1338
1339        let mut result = Vec::new();
1340        for topic_name in topics {
1341            if let Some(info) = all_topics.iter().find(|t| &t.name == topic_name) {
1342                result.push(info.clone());
1343            }
1344        }
1345        Ok(result)
1346    }
1347
    /// Describe the cluster using the DescribeCluster API (Key 60).
    ///
    /// Returns cluster metadata including cluster ID, controller, brokers,
    /// and authorized operations.
    ///
    /// # Errors
    /// Returns an error if the client is closed, no broker is reachable,
    /// the RPC/decoding fails, or the broker reports a top-level error code.
    pub async fn describe_cluster(&self) -> Result<DescribeClusterResult> {
        self.check_not_closed()?;
        let conn = self.get_any_broker_connection().await?;

        let request = DescribeClusterRequest::default();
        let version = conn
            .negotiate_api_version(
                ApiKey::DescribeCluster,
                versions::DESCRIBE_CLUSTER_MAX,
                versions::DESCRIBE_CLUSTER_MIN,
            )
            .await
            .ok_or_else(|| {
                KrafkaError::protocol("no mutually supported DescribeCluster API version")
            })?;

        let response_bytes = conn
            .send_request(ApiKey::DescribeCluster, version, |buf| {
                request.encode_versioned(version, buf)
            })
            .await?;

        let mut buf = response_bytes;
        let response = DescribeClusterResponse::decode_versioned(version, &mut buf)?;

        // Unlike the per-topic admin calls, this API has a single top-level
        // error code — surface it as an Err instead of a partial result.
        if !response.error_code.is_ok() {
            let msg = response
                .error_message
                .unwrap_or_else(|| format!("{:?}", response.error_code));
            return Err(KrafkaError::protocol(msg));
        }

        Ok(DescribeClusterResult {
            cluster_id: response.cluster_id,
            controller_id: response.controller_id,
            brokers: response
                .brokers
                .into_iter()
                .map(|b| DescribeClusterBrokerInfo {
                    broker_id: b.broker_id,
                    host: b.host,
                    port: b.port,
                    rack: b.rack,
                })
                .collect(),
            cluster_authorized_operations: response.cluster_authorized_operations,
        })
    }
1400
1401    /// Get partition count for a topic.
1402    pub async fn partition_count(&self, topic: &str) -> Result<Option<usize>> {
1403        self.check_not_closed()?;
1404        self.metadata.refresh().await?;
1405        Ok(self.metadata.partition_count(topic))
1406    }
1407
    /// Get the client ID.
    ///
    /// Returns the identifier configured for this client.
    #[inline]
    pub fn client_id(&self) -> &str {
        &self.config.client_id
    }
1413
    /// Get the request timeout.
    ///
    /// Returns the per-request timeout configured for this client.
    #[inline]
    pub fn request_timeout(&self) -> Duration {
        self.config.request_timeout
    }
1419
    /// Describe ACLs matching a filter.
    ///
    /// The broker groups matching ACLs per resource; this method flattens
    /// them into a single list of [`AclBinding`]s. A top-level broker error
    /// is reported via [`DescribeAclsResult::error`] rather than as an `Err`.
    ///
    /// # Example
    /// ```ignore
    /// // Describe all ACLs for a specific topic
    /// let filter = AclFilter::for_resource(AclResourceType::Topic, "my-topic");
    /// let result = admin.describe_acls(filter).await?;
    /// ```
    pub async fn describe_acls(&self, filter: AclFilter) -> Result<DescribeAclsResult> {
        self.check_not_closed()?;
        let conn = self.get_any_broker_connection().await?;

        let request = DescribeAclsRequest {
            resource_type: filter.resource_type,
            resource_name: filter.resource_name,
            pattern_type: filter.pattern_type,
            principal: filter.principal,
            host: filter.host,
            operation: filter.operation,
            permission_type: filter.permission_type,
        };

        let version = conn
            .negotiate_api_version(
                ApiKey::DescribeAcls,
                versions::DESCRIBE_ACLS_MAX,
                versions::DESCRIBE_ACLS_MIN,
            )
            .await
            .ok_or_else(|| {
                KrafkaError::protocol("no mutually supported DescribeAcls API version")
            })?;

        let response_bytes = conn
            .send_request(ApiKey::DescribeAcls, version, |buf| {
                request.encode_versioned(version, buf)
            })
            .await?;

        let mut buf = response_bytes;
        let response = DescribeAclsResponse::decode_versioned(version, &mut buf)?;

        // Flatten the per-resource grouping into standalone bindings; the
        // resource fields are repeated onto each ACL entry.
        let bindings = response
            .resources
            .into_iter()
            .flat_map(|res| {
                res.acls.into_iter().map(move |acl| AclBinding {
                    resource_type: res.resource_type,
                    resource_name: res.resource_name.clone(),
                    pattern_type: res.pattern_type,
                    principal: acl.principal,
                    host: acl.host,
                    operation: acl.operation,
                    permission_type: acl.permission_type,
                })
            })
            .collect();

        Ok(DescribeAclsResult {
            error: if response.error_code.is_ok() {
                None
            } else {
                Some(
                    response
                        .error_message
                        .unwrap_or_else(|| format!("{:?}", response.error_code)),
                )
            },
            bindings,
        })
    }
1491
1492    /// Create ACLs.
1493    ///
1494    /// # Arguments
1495    /// * `acls` - List of ACL bindings to create
1496    ///
1497    /// # Example
1498    /// ```ignore
1499    /// let acl = AclBinding::allow_read_topic("my-topic", "User:alice");
1500    /// admin.create_acls(vec![acl]).await?;
1501    /// ```
1502    pub async fn create_acls(&self, acls: Vec<AclBinding>) -> Result<CreateAclsResult> {
1503        let conn = self.get_any_broker_connection().await?;
1504
1505        let request = CreateAclsRequest {
1506            creations: acls.clone(),
1507        };
1508
1509        let version = conn
1510            .negotiate_api_version(
1511                ApiKey::CreateAcls,
1512                versions::CREATE_ACLS_MAX,
1513                versions::CREATE_ACLS_MIN,
1514            )
1515            .await
1516            .ok_or_else(|| KrafkaError::protocol("no mutually supported CreateAcls API version"))?;
1517
1518        let response_bytes = conn
1519            .send_request(ApiKey::CreateAcls, version, |buf| {
1520                request.encode_versioned(version, buf)
1521            })
1522            .await?;
1523
1524        let mut buf = response_bytes;
1525        let response = CreateAclsResponse::decode_versioned(version, &mut buf)?;
1526
1527        let results = response
1528            .results
1529            .into_iter()
1530            .map(|r| CreateAclResult {
1531                error: if r.error_code.is_ok() {
1532                    None
1533                } else {
1534                    Some(
1535                        r.error_message
1536                            .unwrap_or_else(|| format!("{:?}", r.error_code)),
1537                    )
1538                },
1539            })
1540            .collect();
1541
1542        info!("Created {} ACLs", acls.len());
1543        Ok(CreateAclsResult { results })
1544    }
1545
1546    /// Delete ACLs matching the specified filters.
1547    ///
1548    /// # Arguments
1549    /// * `filters` - List of ACL binding filters to match for deletion
1550    ///
1551    /// # Example
1552    /// ```ignore
1553    /// // Delete all ACLs for a specific topic
1554    /// let filter = AclBindingFilter {
1555    ///     resource_type: AclResourceType::Topic,
1556    ///     resource_name: Some("my-topic".to_string()),
1557    ///     pattern_type: AclPatternType::Literal,
1558    ///     principal: None,
1559    ///     host: None,
1560    ///     operation: AclOperation::Any,
1561    ///     permission_type: AclPermissionType::Any,
1562    /// };
1563    /// admin.delete_acls(vec![filter]).await?;
1564    /// ```
1565    pub async fn delete_acls(&self, filters: Vec<AclBindingFilter>) -> Result<DeleteAclsResult> {
1566        let conn = self.get_any_broker_connection().await?;
1567
1568        let request = DeleteAclsRequest {
1569            filters: filters.clone(),
1570        };
1571
1572        let version = conn
1573            .negotiate_api_version(
1574                ApiKey::DeleteAcls,
1575                versions::DELETE_ACLS_MAX,
1576                versions::DELETE_ACLS_MIN,
1577            )
1578            .await
1579            .ok_or_else(|| KrafkaError::protocol("no mutually supported DeleteAcls API version"))?;
1580
1581        let response_bytes = conn
1582            .send_request(ApiKey::DeleteAcls, version, |buf| {
1583                request.encode_versioned(version, buf)
1584            })
1585            .await?;
1586
1587        let mut buf = response_bytes;
1588        let response = DeleteAclsResponse::decode_versioned(version, &mut buf)?;
1589
1590        let filter_results = response
1591            .filter_results
1592            .into_iter()
1593            .map(|fr| DeleteAclFilterResult {
1594                error: if fr.error_code.is_ok() {
1595                    None
1596                } else {
1597                    Some(
1598                        fr.error_message
1599                            .unwrap_or_else(|| format!("{:?}", fr.error_code)),
1600                    )
1601                },
1602                deleted_count: fr.matching_acls.len(),
1603            })
1604            .collect();
1605
1606        info!("Deleted ACLs with {} filters", filters.len());
1607        Ok(DeleteAclsResult { filter_results })
1608    }
1609
1610    /// Describe consumer groups.
1611    ///
1612    /// Automatically detects whether each group uses the classic protocol or the
1613    /// new consumer protocol (KIP-848) and dispatches to the appropriate API:
1614    /// - **Classic groups** → DescribeGroups (Key 15)
1615    /// - **Consumer groups** → ConsumerGroupDescribe (Key 69)
1616    ///
1617    /// The returned [`ConsumerGroupDescription`] is a unified type.
1618    /// Fields specific to one protocol variant are `Option`-wrapped.
1619    ///
1620    /// # Example
1621    /// ```ignore
1622    /// let groups = admin
1623    ///     .describe_consumer_groups(vec!["my-group".to_string()])
1624    ///     .await?;
1625    /// for group in &groups {
1626    ///     println!("{}: type={}, state={}, members={}",
1627    ///         group.group_id, group.group_type, group.state, group.members.len());
1628    /// }
1629    /// ```
1630    pub async fn describe_consumer_groups(
1631        &self,
1632        group_ids: Vec<String>,
1633    ) -> Result<Vec<ConsumerGroupDescription>> {
1634        self.check_not_closed()?;
1635        let brokers = self.metadata.brokers();
1636        if brokers.is_empty() {
1637            return Err(KrafkaError::broker(
1638                crate::error::ErrorCode::UnknownServerError,
1639                "no brokers available",
1640            ));
1641        }
1642
1643        // Route each group to its coordinator broker.
1644        let mut coordinator_groups: HashMap<i32, Vec<String>> = HashMap::new();
1645        let any_broker = &brokers[0];
1646        let any_conn = self
1647            .pool
1648            .get_connection_by_id(any_broker.id, any_broker.address())
1649            .await?;
1650
1651        for group_id in &group_ids {
1652            let coord_request = FindCoordinatorRequest::for_group(group_id);
1653            let coord_version = any_conn
1654                .negotiate_api_version(
1655                    ApiKey::FindCoordinator,
1656                    versions::FIND_COORDINATOR_MAX,
1657                    versions::FIND_COORDINATOR_MIN,
1658                )
1659                .await
1660                .ok_or_else(|| {
1661                    KrafkaError::protocol("no mutually supported FindCoordinator API version")
1662                })?;
1663
1664            let coord_response_bytes = any_conn
1665                .send_request(ApiKey::FindCoordinator, coord_version, |buf| {
1666                    coord_request.encode_versioned(coord_version, buf)
1667                })
1668                .await?;
1669            let mut coord_buf = coord_response_bytes;
1670            let coord_response =
1671                FindCoordinatorResponse::decode_versioned(coord_version, &mut coord_buf)?;
1672
1673            if coord_response.error_code.is_ok() {
1674                coordinator_groups
1675                    .entry(coord_response.node_id)
1676                    .or_default()
1677                    .push(group_id.clone());
1678            } else {
1679                warn!(
1680                    "FindCoordinator failed for group '{}': {:?}, falling back to broker {}",
1681                    group_id, coord_response.error_code, any_broker.id
1682                );
1683                coordinator_groups
1684                    .entry(any_broker.id)
1685                    .or_default()
1686                    .push(group_id.clone());
1687            }
1688        }
1689
1690        let mut all_results = Vec::new();
1691
1692        for (broker_id, groups) in &coordinator_groups {
1693            let broker = brokers
1694                .iter()
1695                .find(|b| b.id == *broker_id)
1696                .unwrap_or(any_broker);
1697            let conn = self
1698                .pool
1699                .get_connection_by_id(broker.id, broker.address())
1700                .await?;
1701
1702            // Try ConsumerGroupDescribe (Key 69) first for all groups on this broker.
1703            let kip848_version = conn
1704                .negotiate_api_version(
1705                    ApiKey::ConsumerGroupDescribe,
1706                    versions::CONSUMER_GROUP_DESCRIBE_MAX,
1707                    versions::CONSUMER_GROUP_DESCRIBE_MIN,
1708                )
1709                .await;
1710
1711            let mut classic_fallback: Vec<String> = Vec::new();
1712            let mut maybe_classic: Vec<(String, ConsumerGroupDescription)> = Vec::new();
1713
1714            if let Some(version) = kip848_version {
1715                let request = ConsumerGroupDescribeRequest::new(groups.clone());
1716                let response_bytes = conn
1717                    .send_request(ApiKey::ConsumerGroupDescribe, version, |buf| {
1718                        request.encode_versioned(version, buf)
1719                    })
1720                    .await?;
1721
1722                let mut buf = response_bytes;
1723                let response = ConsumerGroupDescribeResponse::decode_versioned(version, &mut buf)?;
1724
1725                // ConsumerGroupDescribe (Key 69) returns per-group error codes
1726                // that tell us which groups need the classic DescribeGroups path:
1727                //
1728                //  • GroupIdNotFound  — classic group (Kafka 3.7–3.8 or 4.0+
1729                //                       with a group that was never a consumer group)
1730                //  • UnsupportedVersion — classic group (Kafka 3.9)
1731                //  • OK + empty members — ambiguous on 3.7–3.8; we try the
1732                //                         classic path too and prefer whichever
1733                //                         reports members.
1734
1735                for g in response.groups {
1736                    debug!(
1737                        "ConsumerGroupDescribe for '{}': error={:?}, state='{}', members={}",
1738                        g.group_id,
1739                        g.error_code,
1740                        g.group_state,
1741                        g.members.len()
1742                    );
1743                    if g.error_code == crate::error::ErrorCode::GroupIdNotFound
1744                        || g.error_code == crate::error::ErrorCode::UnsupportedVersion
1745                    {
1746                        // Classic-protocol group — fall back to DescribeGroups (Key 15).
1747                        debug!(
1748                            "ConsumerGroupDescribe for '{}' returned {:?}, \
1749                             will retry with DescribeGroups (Key 15)",
1750                            g.group_id, g.error_code
1751                        );
1752                        classic_fallback.push(g.group_id);
1753                        continue;
1754                    }
1755
1756                    let members_empty = g.members.is_empty() && g.error_code.is_ok();
1757                    let group_id_clone = g.group_id.clone();
1758
1759                    let desc = ConsumerGroupDescription {
1760                        group_id: g.group_id,
1761                        group_type: GroupType::Consumer,
1762                        state: g.group_state,
1763                        protocol_type: None,
1764                        assignor: Some(g.assignor_name),
1765                        group_epoch: Some(g.group_epoch),
1766                        assignment_epoch: Some(g.assignment_epoch),
1767                        members: g
1768                            .members
1769                            .into_iter()
1770                            .map(|m| ConsumerGroupMember {
1771                                member_id: m.member_id,
1772                                instance_id: m.instance_id,
1773                                rack_id: m.rack_id,
1774                                member_epoch: Some(m.member_epoch),
1775                                client_id: m.client_id,
1776                                client_host: m.client_host,
1777                                subscribed_topic_names: Some(m.subscribed_topic_names),
1778                                subscribed_topic_regex: m.subscribed_topic_regex,
1779                                assignment: Some(
1780                                    m.assignment
1781                                        .topic_partitions
1782                                        .into_iter()
1783                                        .map(|tp| TopicPartitionAssignment {
1784                                            topic_id: tp.topic_id,
1785                                            topic_name: tp.topic_name,
1786                                            partitions: tp.partitions,
1787                                        })
1788                                        .collect(),
1789                                ),
1790                                target_assignment: Some(
1791                                    m.target_assignment
1792                                        .topic_partitions
1793                                        .into_iter()
1794                                        .map(|tp| TopicPartitionAssignment {
1795                                            topic_id: tp.topic_id,
1796                                            topic_name: tp.topic_name,
1797                                            partitions: tp.partitions,
1798                                        })
1799                                        .collect(),
1800                                ),
1801                                member_type: Some(m.member_type),
1802                            })
1803                            .collect(),
1804                        authorized_operations: Some(g.authorized_operations),
1805                        error: if g.error_code.is_ok() {
1806                            None
1807                        } else {
1808                            let msg = g
1809                                .error_message
1810                                .unwrap_or_else(|| format!("{:?}", g.error_code));
1811                            Some(msg)
1812                        },
1813                    };
1814
1815                    // Kafka 3.7–3.8 (KIP-848 Early Access) may return OK
1816                    // with empty members for classic-protocol groups instead
1817                    // of GroupIdNotFound / UnsupportedVersion.  Try the
1818                    // classic DescribeGroups path and prefer whichever has
1819                    // members.
1820                    if members_empty {
1821                        maybe_classic.push((group_id_clone.clone(), desc));
1822                        classic_fallback.push(group_id_clone);
1823                    } else {
1824                        all_results.push(desc);
1825                    }
1826                }
1827            } else {
1828                // Broker does not support Key 69 — all groups are classic.
1829                classic_fallback = groups.clone();
1830            }
1831
1832            // Describe classic-protocol groups via DescribeGroups (Key 15).
1833            if !classic_fallback.is_empty() {
1834                let request = DescribeGroupsRequest {
1835                    groups: classic_fallback,
1836                    include_authorized_operations: false,
1837                };
1838
1839                let version = conn
1840                    .negotiate_api_version(
1841                        ApiKey::DescribeGroups,
1842                        versions::DESCRIBE_GROUPS_MAX,
1843                        versions::DESCRIBE_GROUPS_MIN,
1844                    )
1845                    .await
1846                    .ok_or_else(|| {
1847                        KrafkaError::protocol("no mutually supported DescribeGroups API version")
1848                    })?;
1849
1850                let response_bytes = conn
1851                    .send_request(ApiKey::DescribeGroups, version, |buf| {
1852                        request.encode_versioned(version, buf)
1853                    })
1854                    .await?;
1855
1856                let mut buf = response_bytes;
1857                let response = DescribeGroupsResponse::decode_versioned(version, &mut buf)?;
1858
1859                for g in response.groups {
1860                    debug!(
1861                        "DescribeGroups (classic) for '{}': error={:?}, state='{}', members={}",
1862                        g.group_id,
1863                        g.error_code,
1864                        g.group_state,
1865                        g.members.len()
1866                    );
1867                    let classic_desc = ConsumerGroupDescription {
1868                        group_id: g.group_id,
1869                        group_type: GroupType::Classic,
1870                        state: g.group_state,
1871                        protocol_type: Some(g.protocol_type),
1872                        assignor: Some(g.protocol_data),
1873                        group_epoch: None,
1874                        assignment_epoch: None,
1875                        members: g
1876                            .members
1877                            .into_iter()
1878                            .map(|m| ConsumerGroupMember {
1879                                member_id: m.member_id,
1880                                instance_id: m.group_instance_id,
1881                                rack_id: None,
1882                                member_epoch: None,
1883                                client_id: m.client_id,
1884                                client_host: m.client_host,
1885                                subscribed_topic_names: None,
1886                                subscribed_topic_regex: None,
1887                                assignment: None,
1888                                target_assignment: None,
1889                                member_type: None,
1890                            })
1891                            .collect(),
1892                        authorized_operations: None,
1893                        error: if g.error_code.is_ok() {
1894                            None
1895                        } else {
1896                            Some(format!("{:?}", g.error_code))
1897                        },
1898                    };
1899
1900                    // If this group was a maybe_classic candidate from
1901                    // ConsumerGroupDescribe, prefer whichever path found
1902                    // members. Remove from maybe_classic so we don't
1903                    // double-add it later.
1904                    if let Some(idx) = maybe_classic
1905                        .iter()
1906                        .position(|(id, _)| *id == classic_desc.group_id)
1907                    {
1908                        let (_, consumer_desc) = maybe_classic.swap_remove(idx);
1909                        if classic_desc.members.is_empty() {
1910                            // Neither path found members — keep the consumer result.
1911                            all_results.push(consumer_desc);
1912                        } else {
1913                            all_results.push(classic_desc);
1914                        }
1915                    } else {
1916                        all_results.push(classic_desc);
1917                    }
1918                }
1919            }
1920
1921            // Any remaining maybe_classic entries that weren't resolved
1922            // by the classic fallback (shouldn't happen, but be safe).
1923            for (_, desc) in maybe_classic {
1924                all_results.push(desc);
1925            }
1926        }
1927
1928        info!("Described {} consumer groups", all_results.len());
1929        Ok(all_results)
1930    }
1931
1932    /// List all consumer groups on the cluster.
1933    ///
1934    /// Returns a list of all consumer groups with their protocol types.
1935    ///
1936    /// # Example
1937    /// ```ignore
1938    /// let groups = admin.list_consumer_groups().await?;
1939    /// for group in &groups {
1940    ///     println!("{} ({})", group.group_id, group.protocol_type);
1941    /// }
1942    /// ```
1943    pub async fn list_consumer_groups(&self) -> Result<Vec<ConsumerGroupListing>> {
1944        self.check_not_closed()?;
1945        let brokers = self.metadata.brokers();
1946        if brokers.is_empty() {
1947            return Err(KrafkaError::broker(
1948                crate::error::ErrorCode::UnknownServerError,
1949                "no brokers available",
1950            ));
1951        }
1952
1953        // ListGroups returns groups managed by each broker, so we query all brokers
1954        let mut all_groups = Vec::new();
1955        let mut seen_ids = HashSet::new();
1956        let mut broker_failures = 0usize;
1957        let broker_count = brokers.len();
1958
1959        for broker in &brokers {
1960            let conn = match self
1961                .pool
1962                .get_connection_by_id(broker.id, broker.address())
1963                .await
1964            {
1965                Ok(c) => c,
1966                Err(e) => {
1967                    warn!(
1968                        "Failed to connect to broker {} for ListGroups, skipping: {}",
1969                        broker.id, e
1970                    );
1971                    broker_failures += 1;
1972                    continue;
1973                }
1974            };
1975
1976            let request = ListGroupsRequest {
1977                states_filter: Vec::new(),
1978                types_filter: Vec::new(),
1979            };
1980
1981            let version = match conn
1982                .negotiate_api_version(
1983                    ApiKey::ListGroups,
1984                    versions::LIST_GROUPS_MAX,
1985                    versions::LIST_GROUPS_MIN,
1986                )
1987                .await
1988            {
1989                Some(v) => v,
1990                None => {
1991                    warn!(
1992                        "No mutually supported ListGroups API version for broker {}, skipping",
1993                        broker.id
1994                    );
1995                    broker_failures += 1;
1996                    continue;
1997                }
1998            };
1999
2000            let response_bytes = match conn
2001                .send_request(ApiKey::ListGroups, version, |buf| {
2002                    request.encode_versioned(version, buf)
2003                })
2004                .await
2005            {
2006                Ok(r) => r,
2007                Err(e) => {
2008                    warn!("ListGroups RPC failed on broker {}: {}", broker.id, e);
2009                    broker_failures += 1;
2010                    continue;
2011                }
2012            };
2013
2014            let mut buf = response_bytes;
2015            let response = match ListGroupsResponse::decode_versioned(version, &mut buf) {
2016                Ok(r) => r,
2017                Err(e) => {
2018                    warn!("ListGroups decode failed on broker {}: {}", broker.id, e);
2019                    broker_failures += 1;
2020                    continue;
2021                }
2022            };
2023
2024            if !response.error_code.is_ok() {
2025                tracing::warn!(
2026                    "ListGroups error on broker {}: {:?}",
2027                    broker.id,
2028                    response.error_code
2029                );
2030                broker_failures += 1;
2031                continue;
2032            }
2033
2034            for group in response.groups {
2035                if seen_ids.insert(group.group_id.clone()) {
2036                    let group_type = group.group_type.map(|t| match t.as_str() {
2037                        "classic" => GroupType::Classic,
2038                        "consumer" => GroupType::Consumer,
2039                        other => GroupType::Unknown(other.to_string()),
2040                    });
2041                    all_groups.push(ConsumerGroupListing {
2042                        group_id: group.group_id,
2043                        protocol_type: group.protocol_type,
2044                        group_type,
2045                    });
2046                }
2047            }
2048        }
2049
2050        if broker_failures == broker_count {
2051            return Err(KrafkaError::invalid_state(
2052                "list_consumer_groups failed: all brokers returned errors",
2053            ));
2054        }
2055
2056        if broker_failures > 0 {
2057            warn!(
2058                "list_consumer_groups: {broker_failures}/{broker_count} brokers failed; \
2059                 results may be incomplete"
2060            );
2061        }
2062
2063        info!("Listed {} consumer groups", all_groups.len());
2064        Ok(all_groups)
2065    }
2066
    /// Delete records from topic partitions before the specified offsets.
    ///
    /// Records with offsets less than the specified offset for each partition
    /// will be marked for deletion. This adjusts the log start offset.
    ///
    /// Requests are grouped by partition leader and sent once per broker.
    /// If any partition answers `NotLeaderForPartition`, topic metadata is
    /// refreshed and the whole operation is retried exactly once.
    ///
    /// # Arguments
    /// * `offsets` - Map of (topic, partition) to the offset before which to delete
    /// * `timeout` - Operation timeout
    ///
    /// # Errors
    /// Fails on invalid topic names, when no brokers are known, when no
    /// mutually supported API version exists, or on transport/decode
    /// failures. Per-partition broker errors do not fail the call; they are
    /// carried in each returned [`DeleteRecordResult`]'s `error` field.
    ///
    /// # Example
    /// ```ignore
    /// use std::collections::HashMap;
    /// let mut offsets = HashMap::new();
    /// offsets.insert(("my-topic".to_string(), 0), 100i64);
    /// let results = admin.delete_records(offsets, Duration::from_secs(30)).await?;
    /// ```
    pub async fn delete_records(
        &self,
        offsets: HashMap<(String, i32), i64>,
        timeout: Duration,
    ) -> Result<Vec<DeleteRecordResult>> {
        self.check_not_closed()?;
        // H6: reject oversize topic names before any encoder reaches them.
        for (topic, _) in offsets.keys() {
            validate_topic_name(topic)?;
        }

        // Attempt 0 uses cached metadata; attempt 1 runs only after a stale
        // leader was observed and starts with a metadata refresh.
        for attempt in 0u8..2 {
            if attempt == 1 {
                let topics: Vec<&str> = offsets.keys().map(|(t, _)| t.as_str()).collect();
                // Best-effort refresh: a failure here just means we retry
                // with the same (possibly stale) leader mapping.
                let _ = self.metadata.refresh_for_topics(Some(&topics)).await;
            }

            let brokers = self.metadata.brokers();
            if brokers.is_empty() {
                return Err(KrafkaError::broker(
                    crate::error::ErrorCode::UnknownServerError,
                    "no brokers available",
                ));
            }

            // Group offsets by partition leader: broker id -> topic -> partitions.
            let mut leader_offsets: HashMap<i32, HashMap<String, Vec<DeleteRecordsPartition>>> =
                HashMap::new();
            let fallback_broker_id = brokers[0].id;

            for ((topic, partition), offset) in &offsets {
                // Unknown leader: route to an arbitrary broker; it will answer
                // NotLeaderForPartition and trigger the retry path below.
                let leader_id = self
                    .metadata
                    .leader(topic, *partition)
                    .unwrap_or(fallback_broker_id);
                leader_offsets
                    .entry(leader_id)
                    .or_default()
                    .entry(topic.clone())
                    .or_default()
                    .push(DeleteRecordsPartition {
                        partition_index: *partition,
                        offset: *offset,
                    });
            }

            let mut results = Vec::new();
            let mut has_stale_leader = false;

            // One DeleteRecords request per leader broker.
            for (broker_id, topics_map) in leader_offsets {
                let broker = brokers
                    .iter()
                    .find(|b| b.id == broker_id)
                    .unwrap_or(&brokers[0]);
                let conn = self
                    .pool
                    .get_connection_by_id(broker.id, broker.address())
                    .await?;

                let request = DeleteRecordsRequest {
                    topics: topics_map
                        .into_iter()
                        .map(|(name, partitions)| DeleteRecordsTopic { name, partitions })
                        .collect(),
                    timeout_ms: crate::util::duration_to_millis_i32(timeout),
                };

                let version = conn
                    .negotiate_api_version(
                        ApiKey::DeleteRecords,
                        versions::DELETE_RECORDS_MAX,
                        versions::DELETE_RECORDS_MIN,
                    )
                    .await
                    .ok_or_else(|| {
                        KrafkaError::protocol("no mutually supported DeleteRecords API version")
                    })?;

                let response_bytes = conn
                    .send_request(ApiKey::DeleteRecords, version, |buf| {
                        request.encode_versioned(version, buf)
                    })
                    .await?;

                let mut buf = response_bytes;
                let response = DeleteRecordsResponse::decode_versioned(version, &mut buf)?;

                // Flatten the per-topic/per-partition response into results,
                // remembering whether any partition reported a stale leader.
                for topic in response.topics {
                    let topic_name = topic.name;
                    for partition in topic.partitions {
                        if partition.error_code == crate::error::ErrorCode::NotLeaderForPartition {
                            has_stale_leader = true;
                        }
                        results.push(DeleteRecordResult {
                            topic: topic_name.clone(),
                            partition: partition.partition_index,
                            low_watermark: partition.low_watermark,
                            error: if partition.error_code.is_ok() {
                                None
                            } else {
                                Some(format!("{:?}", partition.error_code))
                            },
                        });
                    }
                }
            }

            // Retry once (discarding this attempt's results) if any leader
            // was stale; on the second attempt the results stand as-is.
            if has_stale_leader && attempt == 0 {
                warn!(
                    "NotLeaderForPartition in DeleteRecords response, retrying with refreshed metadata"
                );
                continue;
            }

            info!("Deleted records from {} partition(s)", results.len());
            return Ok(results);
        }
        // Unreachable in practice: attempt 1 always returns above. Kept so
        // the loop has a total result for the compiler.
        Err(KrafkaError::protocol(
            "DeleteRecords retry loop exhausted after metadata refresh",
        ))
    }
2204
    /// Get the end offset for each partition at the given leader epoch.
    ///
    /// This is used to detect log truncation after a leader change. For each
    /// topic-partition, the broker returns the end offset for the requested
    /// leader epoch. If the epoch is no longer valid, the broker returns
    /// the epoch and offset where the log was truncated.
    ///
    /// Requests are grouped by partition leader and sent once per broker.
    /// If any partition answers `NotLeaderForPartition`, topic metadata is
    /// refreshed and the whole operation is retried exactly once.
    ///
    /// # Arguments
    /// * `partitions` - List of (topic, partition, leader_epoch) tuples
    ///
    /// # Errors
    /// Fails on invalid topic names, when no brokers are known, when no
    /// mutually supported API version exists, or on transport/decode
    /// failures. Per-partition broker errors are carried in each returned
    /// [`LeaderEpochResult`]'s `error` field instead of failing the call.
    ///
    /// # Example
    /// ```ignore
    /// let results = admin.offset_for_leader_epoch(
    ///     vec![("my-topic".to_string(), 0, 5)]
    /// ).await?;
    /// for r in &results {
    ///     println!("{}:{} epoch={} end_offset={}", r.topic, r.partition, r.leader_epoch, r.end_offset);
    /// }
    /// ```
    pub async fn offset_for_leader_epoch(
        &self,
        partitions: Vec<(String, i32, i32)>,
    ) -> Result<Vec<LeaderEpochResult>> {
        self.check_not_closed()?;
        // H6: reject oversize topic names at ingress.
        for (topic, _, _) in &partitions {
            validate_topic_name(topic)?;
        }

        // Attempt 0 uses cached metadata; attempt 1 runs only after a stale
        // leader was observed and starts with a metadata refresh.
        for attempt in 0u8..2 {
            if attempt == 1 {
                let topics: Vec<&str> = partitions.iter().map(|(t, _, _)| t.as_str()).collect();
                // Best-effort refresh: a failure here just means we retry
                // with the same (possibly stale) leader mapping.
                let _ = self.metadata.refresh_for_topics(Some(&topics)).await;
            }

            let brokers = self.metadata.brokers();
            if brokers.is_empty() {
                return Err(KrafkaError::broker(
                    crate::error::ErrorCode::UnknownServerError,
                    "no brokers available",
                ));
            }

            // Group partitions by their leader broker: id -> topic -> partitions.
            let fallback_broker_id = brokers[0].id;
            let mut leader_partitions: HashMap<
                i32,
                HashMap<String, Vec<OffsetForLeaderEpochPartition>>,
            > = HashMap::new();

            for (topic, partition, leader_epoch) in &partitions {
                // Unknown leader: route to an arbitrary broker; it will answer
                // NotLeaderForPartition and trigger the retry path below.
                let leader_id = self
                    .metadata
                    .leader(topic, *partition)
                    .unwrap_or(fallback_broker_id);
                leader_partitions
                    .entry(leader_id)
                    .or_default()
                    .entry(topic.clone())
                    .or_default()
                    .push(OffsetForLeaderEpochPartition {
                        partition: *partition,
                        current_leader_epoch: -1, // consumer perspective
                        leader_epoch: *leader_epoch,
                    });
            }

            let mut results = Vec::new();
            let mut has_stale_leader = false;

            // One OffsetForLeaderEpoch request per leader broker.
            for (broker_id, topics_map) in leader_partitions {
                let broker = brokers
                    .iter()
                    .find(|b| b.id == broker_id)
                    .unwrap_or(&brokers[0]);
                let conn = self
                    .pool
                    .get_connection_by_id(broker.id, broker.address())
                    .await?;

                let request = OffsetForLeaderEpochRequest {
                    replica_id: -1, // -1 for consumer
                    topics: topics_map
                        .into_iter()
                        .map(|(topic, partitions)| OffsetForLeaderEpochTopic { topic, partitions })
                        .collect(),
                };

                let version = conn
                    .negotiate_api_version(
                        ApiKey::OffsetForLeaderEpoch,
                        versions::OFFSET_FOR_LEADER_EPOCH_MAX,
                        versions::OFFSET_FOR_LEADER_EPOCH_MIN,
                    )
                    .await
                    .ok_or_else(|| {
                        KrafkaError::protocol(
                            "no mutually supported OffsetForLeaderEpoch API version",
                        )
                    })?;

                let response_bytes = conn
                    .send_request(ApiKey::OffsetForLeaderEpoch, version, |buf| {
                        request.encode_versioned(version, buf)
                    })
                    .await?;

                let mut buf = response_bytes;
                let response = OffsetForLeaderEpochResponse::decode_versioned(version, &mut buf)?;

                // Flatten the per-topic/per-partition response into results,
                // remembering whether any partition reported a stale leader.
                for topic in response.topics {
                    let topic_name = topic.topic;
                    for partition in topic.partitions {
                        if partition.error_code == crate::error::ErrorCode::NotLeaderForPartition {
                            has_stale_leader = true;
                        }
                        results.push(LeaderEpochResult {
                            topic: topic_name.clone(),
                            partition: partition.partition,
                            leader_epoch: partition.leader_epoch,
                            end_offset: partition.end_offset,
                            error: if partition.error_code.is_ok() {
                                None
                            } else {
                                Some(format!("{:?}", partition.error_code))
                            },
                        });
                    }
                }
            }

            // Retry once (discarding this attempt's results) if any leader
            // was stale; on the second attempt the results stand as-is.
            if has_stale_leader && attempt == 0 {
                warn!(
                    "NotLeaderForPartition in OffsetForLeaderEpoch response, retrying with refreshed metadata"
                );
                continue;
            }

            info!(
                "Got leader epoch offsets for {} partition(s)",
                results.len()
            );
            return Ok(results);
        }
        // Unreachable in practice: attempt 1 always returns above. Kept so
        // the loop has a total result for the compiler.
        Err(KrafkaError::protocol(
            "OffsetForLeaderEpoch retry loop exhausted after metadata refresh",
        ))
    }
2353
2354    // ── Delegation Tokens ────────────────────────────────────────────────
2355
2356    /// Create a delegation token.
2357    ///
2358    /// Delegation tokens allow a principal to delegate authentication to
2359    /// another principal without sharing credentials (KIP-48). The token
2360    /// HMAC can be used for SASL/SCRAM authentication.
2361    ///
2362    /// # Arguments
2363    ///
2364    /// * `renewers` - Principals authorized to renew the token (type, name pairs).
2365    ///   Pass an empty slice to allow only the token owner to renew.
2366    /// * `max_lifetime` - Maximum token lifetime. Use `None` for the server
2367    ///   default (typically 7 days).
2368    pub async fn create_delegation_token(
2369        &self,
2370        renewers: &[(&str, &str)],
2371        max_lifetime: Option<Duration>,
2372    ) -> Result<CreateDelegationTokenResult> {
2373        let conn = self.get_any_broker_connection().await?;
2374
2375        let request = CreateDelegationTokenRequest {
2376            renewers: renewers
2377                .iter()
2378                .map(|(t, n)| CreatableRenewer {
2379                    principal_type: t.to_string(),
2380                    principal_name: n.to_string(),
2381                })
2382                .collect(),
2383            max_lifetime_ms: max_lifetime
2384                .map(crate::util::duration_to_millis_i64)
2385                .unwrap_or(-1),
2386            owner_principal_type: None,
2387            owner_principal_name: None,
2388        };
2389
2390        let version = conn
2391            .negotiate_api_version(
2392                ApiKey::CreateDelegationToken,
2393                versions::CREATE_DELEGATION_TOKEN_MAX,
2394                versions::CREATE_DELEGATION_TOKEN_MIN,
2395            )
2396            .await
2397            .ok_or_else(|| {
2398                KrafkaError::protocol("no mutually supported CreateDelegationToken API version")
2399            })?;
2400
2401        let response_bytes = conn
2402            .send_request(ApiKey::CreateDelegationToken, version, |buf| {
2403                request.encode_versioned(version, buf)
2404            })
2405            .await?;
2406
2407        let mut buf = response_bytes;
2408        let response = CreateDelegationTokenResponse::decode_versioned(version, &mut buf)?;
2409
2410        let result = if response.error_code.is_ok() {
2411            info!("Created delegation token");
2412            CreateDelegationTokenResult {
2413                token: Some(DelegationToken {
2414                    principal_type: response.principal_type,
2415                    principal_name: response.principal_name,
2416                    issue_timestamp_ms: response.issue_timestamp_ms,
2417                    expiry_timestamp_ms: response.expiry_timestamp_ms,
2418                    max_timestamp_ms: response.max_timestamp_ms,
2419                    token_id: response.token_id,
2420                    hmac: response.hmac,
2421                    renewers: Vec::new(),
2422                }),
2423                error: None,
2424            }
2425        } else {
2426            CreateDelegationTokenResult {
2427                token: None,
2428                error: Some(format!("{:?}", response.error_code)),
2429            }
2430        };
2431
2432        Ok(result)
2433    }
2434
2435    /// Renew a delegation token, extending its expiry time.
2436    ///
2437    /// # Arguments
2438    ///
2439    /// * `hmac` - HMAC of the token to renew (from [`DelegationToken::hmac`]).
2440    /// * `renew_period` - How long to extend the token's lifetime.
2441    pub async fn renew_delegation_token(
2442        &self,
2443        hmac: &[u8],
2444        renew_period: Duration,
2445    ) -> Result<RenewDelegationTokenResult> {
2446        let conn = self.get_any_broker_connection().await?;
2447
2448        let request = RenewDelegationTokenRequest {
2449            hmac: Bytes::copy_from_slice(hmac),
2450            renew_period_ms: crate::util::duration_to_millis_i64(renew_period),
2451        };
2452
2453        let version = conn
2454            .negotiate_api_version(
2455                ApiKey::RenewDelegationToken,
2456                versions::RENEW_DELEGATION_TOKEN_MAX,
2457                versions::RENEW_DELEGATION_TOKEN_MIN,
2458            )
2459            .await
2460            .ok_or_else(|| {
2461                KrafkaError::protocol("no mutually supported RenewDelegationToken API version")
2462            })?;
2463
2464        let response_bytes = conn
2465            .send_request(ApiKey::RenewDelegationToken, version, |buf| {
2466                request.encode_versioned(version, buf)
2467            })
2468            .await?;
2469
2470        let mut buf = response_bytes;
2471        let response = RenewDelegationTokenResponse::decode_versioned(version, &mut buf)?;
2472
2473        if response.error_code.is_ok() {
2474            info!("Renewed delegation token");
2475        }
2476
2477        Ok(RenewDelegationTokenResult {
2478            expiry_timestamp_ms: response.expiry_timestamp_ms,
2479            error: if response.error_code.is_ok() {
2480                None
2481            } else {
2482                Some(format!("{:?}", response.error_code))
2483            },
2484        })
2485    }
2486
2487    /// Expire a delegation token, revoking it before its natural expiry.
2488    ///
2489    /// # Arguments
2490    ///
2491    /// * `hmac` - HMAC of the token to expire (from [`DelegationToken::hmac`]).
2492    /// * `expiry_period` - How long until the token expires. Pass `None` to
2493    ///   expire the token immediately (sends `-1` to the broker).
2494    pub async fn expire_delegation_token(
2495        &self,
2496        hmac: &[u8],
2497        expiry_period: Option<Duration>,
2498    ) -> Result<ExpireDelegationTokenResult> {
2499        let conn = self.get_any_broker_connection().await?;
2500
2501        let request = ExpireDelegationTokenRequest {
2502            hmac: Bytes::copy_from_slice(hmac),
2503            expiry_period_ms: expiry_period
2504                .map(crate::util::duration_to_millis_i64)
2505                .unwrap_or(-1),
2506        };
2507
2508        let version = conn
2509            .negotiate_api_version(
2510                ApiKey::ExpireDelegationToken,
2511                versions::EXPIRE_DELEGATION_TOKEN_MAX,
2512                versions::EXPIRE_DELEGATION_TOKEN_MIN,
2513            )
2514            .await
2515            .ok_or_else(|| {
2516                KrafkaError::protocol("no mutually supported ExpireDelegationToken API version")
2517            })?;
2518
2519        let response_bytes = conn
2520            .send_request(ApiKey::ExpireDelegationToken, version, |buf| {
2521                request.encode_versioned(version, buf)
2522            })
2523            .await?;
2524
2525        let mut buf = response_bytes;
2526        let response = ExpireDelegationTokenResponse::decode_versioned(version, &mut buf)?;
2527
2528        if response.error_code.is_ok() {
2529            info!("Expired delegation token");
2530        }
2531
2532        Ok(ExpireDelegationTokenResult {
2533            expiry_timestamp_ms: response.expiry_timestamp_ms,
2534            error: if response.error_code.is_ok() {
2535                None
2536            } else {
2537                Some(format!("{:?}", response.error_code))
2538            },
2539        })
2540    }
2541
2542    /// Describe delegation tokens visible to the caller.
2543    ///
2544    /// # Arguments
2545    ///
2546    /// * `owners` - Filter by token owners (type, name pairs). Pass `None`
2547    ///   to return all tokens visible to the caller.
2548    pub async fn describe_delegation_token(
2549        &self,
2550        owners: Option<&[(&str, &str)]>,
2551    ) -> Result<Vec<DelegationToken>> {
2552        let conn = self.get_any_broker_connection().await?;
2553
2554        let request = DescribeDelegationTokenRequest {
2555            owners: owners.map(|o| {
2556                o.iter()
2557                    .map(|(t, n)| DescribeDelegationTokenOwner {
2558                        principal_type: t.to_string(),
2559                        principal_name: n.to_string(),
2560                    })
2561                    .collect()
2562            }),
2563        };
2564
2565        let version = conn
2566            .negotiate_api_version(
2567                ApiKey::DescribeDelegationToken,
2568                versions::DESCRIBE_DELEGATION_TOKEN_MAX,
2569                versions::DESCRIBE_DELEGATION_TOKEN_MIN,
2570            )
2571            .await
2572            .ok_or_else(|| {
2573                KrafkaError::protocol("no mutually supported DescribeDelegationToken API version")
2574            })?;
2575
2576        let response_bytes = conn
2577            .send_request(ApiKey::DescribeDelegationToken, version, |buf| {
2578                request.encode_versioned(version, buf)
2579            })
2580            .await?;
2581
2582        let mut buf = response_bytes;
2583        let response = DescribeDelegationTokenResponse::decode_versioned(version, &mut buf)?;
2584
2585        if !response.error_code.is_ok() {
2586            return Err(KrafkaError::broker(
2587                response.error_code,
2588                "DescribeDelegationToken failed",
2589            ));
2590        }
2591
2592        let tokens: Vec<DelegationToken> = response
2593            .tokens
2594            .into_iter()
2595            .map(|t| DelegationToken {
2596                principal_type: t.principal_type,
2597                principal_name: t.principal_name,
2598                issue_timestamp_ms: t.issue_timestamp_ms,
2599                expiry_timestamp_ms: t.expiry_timestamp_ms,
2600                max_timestamp_ms: t.max_timestamp_ms,
2601                token_id: t.token_id,
2602                hmac: t.hmac,
2603                renewers: t
2604                    .renewers
2605                    .into_iter()
2606                    .map(|r| DelegationTokenRenewer {
2607                        principal_type: r.principal_type,
2608                        principal_name: r.principal_name,
2609                    })
2610                    .collect(),
2611            })
2612            .collect();
2613
2614        info!("Described {} delegation token(s)", tokens.len());
2615        Ok(tokens)
2616    }
2617
2618    // ── Client Quotas ────────────────────────────────────────────────────
2619
2620    /// Describe client quotas matching the given filter.
2621    ///
2622    /// # Arguments
2623    ///
2624    /// * `components` - Filter components. Each component specifies an entity
2625    ///   type and match criteria. The broker returns entities matching **all**
2626    ///   components.
2627    /// * `strict` - If `true`, exclude entities with unspecified entity types
2628    ///   (i.e., only return entities that exactly match all given component types).
2629    ///
2630    /// # Filter Match Types
2631    ///
2632    /// Each component has a `match_type`:
2633    /// - `0` (exact): match the entity with the given name
2634    /// - `1` (default): match the default entity for this type
2635    /// - `2` (any specified): match any entity with a name (non-default)
2636    ///
2637    /// # Example
2638    ///
2639    /// ```rust,ignore
2640    /// // Describe all quotas for user "alice"
2641    /// let results = admin.describe_client_quotas(
2642    ///     &[("user", 0, Some("alice"))],
2643    ///     false,
2644    /// ).await?;
2645    /// ```
2646    pub async fn describe_client_quotas(
2647        &self,
2648        components: &[(&str, i8, Option<&str>)],
2649        strict: bool,
2650    ) -> Result<DescribeClientQuotasResult> {
2651        let conn = self.get_any_broker_connection().await?;
2652
2653        let request = DescribeClientQuotasRequest {
2654            components: components
2655                .iter()
2656                .map(
2657                    |(entity_type, match_type, match_value)| QuotaFilterComponent {
2658                        entity_type: entity_type.to_string(),
2659                        match_type: *match_type,
2660                        match_value: match_value.map(|v| v.to_string()),
2661                    },
2662                )
2663                .collect(),
2664            strict,
2665        };
2666
2667        let version = conn
2668            .negotiate_api_version(
2669                ApiKey::DescribeClientQuotas,
2670                versions::DESCRIBE_CLIENT_QUOTAS_MAX,
2671                versions::DESCRIBE_CLIENT_QUOTAS_MIN,
2672            )
2673            .await
2674            .ok_or_else(|| {
2675                KrafkaError::protocol("no mutually supported DescribeClientQuotas API version")
2676            })?;
2677
2678        let response_bytes = conn
2679            .send_request(ApiKey::DescribeClientQuotas, version, |buf| {
2680                request.encode_versioned(version, buf)
2681            })
2682            .await?;
2683
2684        let mut buf = response_bytes;
2685        let response = DescribeClientQuotasResponse::decode_versioned(version, &mut buf)?;
2686
2687        let entries = response
2688            .entries
2689            .unwrap_or_default()
2690            .into_iter()
2691            .map(|entry| QuotaDescription {
2692                entity: entry
2693                    .entity
2694                    .into_iter()
2695                    .map(|e| QuotaEntityComponent {
2696                        entity_type: e.entity_type,
2697                        entity_name: e.entity_name,
2698                    })
2699                    .collect(),
2700                values: entry
2701                    .values
2702                    .into_iter()
2703                    .map(|v| QuotaConfig {
2704                        key: v.key,
2705                        value: v.value,
2706                    })
2707                    .collect(),
2708            })
2709            .collect::<Vec<_>>();
2710
2711        if response.error_code.is_ok() {
2712            info!("Described {} client quota entry(ies)", entries.len());
2713        }
2714
2715        Ok(DescribeClientQuotasResult {
2716            entries,
2717            error: if response.error_code.is_ok() {
2718                None
2719            } else {
2720                let msg = response
2721                    .error_message
2722                    .unwrap_or_else(|| format!("{:?}", response.error_code));
2723                Some(msg)
2724            },
2725        })
2726    }
2727
2728    /// Alter client quotas.
2729    ///
2730    /// Each entry specifies an entity (user, client-id, ip) and a set of
2731    /// quota operations (set or remove). Results are returned per-entity.
2732    ///
2733    /// # Arguments
2734    ///
2735    /// * `entries` - Quota alterations. Each entry has an entity and operations.
2736    /// * `validate_only` - If `true`, validate the request without applying changes.
2737    ///
2738    /// # Example
2739    ///
2740    /// ```rust,ignore
2741    /// use krafka::admin::QuotaAlteration;
2742    ///
2743    /// // Set producer byte rate quota for user "alice"
2744    /// let results = admin.alter_client_quotas(
2745    ///     &[QuotaAlteration {
2746    ///         entity: vec![("user", Some("alice"))],
2747    ///         ops: vec![("producer_byte_rate", Some(1_048_576.0))],
2748    ///     }],
2749    ///     false,
2750    /// ).await?;
2751    /// ```
2752    pub async fn alter_client_quotas(
2753        &self,
2754        entries: &[QuotaAlteration<'_>],
2755        validate_only: bool,
2756    ) -> Result<Vec<AlterClientQuotaResult>> {
2757        let conn = self.get_any_broker_connection().await?;
2758
2759        let request = AlterClientQuotasRequest {
2760            entries: entries
2761                .iter()
2762                .map(|e| AlterQuotaEntry {
2763                    entity: e
2764                        .entity
2765                        .iter()
2766                        .map(|(t, n)| AlterQuotaEntity {
2767                            entity_type: t.to_string(),
2768                            entity_name: n.map(|v| v.to_string()),
2769                        })
2770                        .collect(),
2771                    ops: e
2772                        .ops
2773                        .iter()
2774                        .map(|(key, value)| AlterQuotaOp {
2775                            key: key.to_string(),
2776                            value: value.unwrap_or(0.0),
2777                            remove: value.is_none(),
2778                        })
2779                        .collect(),
2780                })
2781                .collect(),
2782            validate_only,
2783        };
2784
2785        let version = conn
2786            .negotiate_api_version(
2787                ApiKey::AlterClientQuotas,
2788                versions::ALTER_CLIENT_QUOTAS_MAX,
2789                versions::ALTER_CLIENT_QUOTAS_MIN,
2790            )
2791            .await
2792            .ok_or_else(|| {
2793                KrafkaError::protocol("no mutually supported AlterClientQuotas API version")
2794            })?;
2795
2796        let response_bytes = conn
2797            .send_request(ApiKey::AlterClientQuotas, version, |buf| {
2798                request.encode_versioned(version, buf)
2799            })
2800            .await?;
2801
2802        let mut buf = response_bytes;
2803        let response = AlterClientQuotasResponse::decode_versioned(version, &mut buf)?;
2804
2805        let results: Vec<AlterClientQuotaResult> = response
2806            .entries
2807            .into_iter()
2808            .map(|entry| AlterClientQuotaResult {
2809                entity: entry
2810                    .entity
2811                    .into_iter()
2812                    .map(|e| QuotaEntityComponent {
2813                        entity_type: e.entity_type,
2814                        entity_name: e.entity_name,
2815                    })
2816                    .collect(),
2817                error: if entry.error_code.is_ok() {
2818                    None
2819                } else {
2820                    let msg = entry
2821                        .error_message
2822                        .unwrap_or_else(|| format!("{:?}", entry.error_code));
2823                    Some(msg)
2824                },
2825            })
2826            .collect();
2827
2828        info!("Altered {} client quota entry(ies)", results.len());
2829        Ok(results)
2830    }
2831
2832    /// Delete consumer groups by ID.
2833    ///
2834    /// Returns one [`DeleteGroupResult`] per group. Each result may contain
2835    /// an error if that particular group could not be deleted (e.g., it has
2836    /// active members).
2837    pub async fn delete_consumer_groups(
2838        &self,
2839        group_ids: Vec<String>,
2840    ) -> Result<Vec<DeleteGroupResult>> {
2841        self.check_not_closed()?;
2842        let conn = self.get_any_broker_connection().await?;
2843
2844        let request = DeleteGroupsRequest::new(group_ids);
2845        let version = conn
2846            .negotiate_api_version(
2847                ApiKey::DeleteGroups,
2848                versions::DELETE_GROUPS_MAX,
2849                versions::DELETE_GROUPS_MIN,
2850            )
2851            .await
2852            .ok_or_else(|| {
2853                KrafkaError::protocol("no mutually supported DeleteGroups API version")
2854            })?;
2855
2856        let response_bytes = conn
2857            .send_request(ApiKey::DeleteGroups, version, |buf| {
2858                request.encode_versioned(version, buf)
2859            })
2860            .await?;
2861
2862        let mut buf = response_bytes;
2863        let response = DeleteGroupsResponse::decode_versioned(version, &mut buf)?;
2864
2865        let results = response
2866            .results
2867            .into_iter()
2868            .map(|r| DeleteGroupResult {
2869                group_id: r.group_id,
2870                error: if r.error_code.is_ok() {
2871                    None
2872                } else {
2873                    Some(format!("{:?}", r.error_code))
2874                },
2875            })
2876            .collect();
2877
2878        Ok(results)
2879    }
2880
2881    /// Describe topic partitions using the DescribeTopicPartitions API (Key 75).
2882    ///
2883    /// Returns detailed per-partition information including leader, replicas, ISR,
2884    /// eligible leader replicas (ELR), and offline replicas. Supports pagination
2885    /// for topics with many partitions.
2886    ///
2887    /// # Example
2888    /// ```ignore
2889    /// let result = admin
2890    ///     .describe_topic_partitions(vec!["my-topic".to_string()])
2891    ///     .await?;
2892    /// for topic in &result.topics {
2893    ///     println!("{}: {} partitions", topic.name.as_deref().unwrap_or("?"), topic.partitions.len());
2894    ///     for p in &topic.partitions {
2895    ///         println!("  partition {}: leader={}, isr={:?}", p.partition_index, p.leader_id, p.isr_nodes);
2896    ///     }
2897    /// }
2898    /// ```
2899    pub async fn describe_topic_partitions(
2900        &self,
2901        topics: Vec<String>,
2902    ) -> Result<DescribeTopicPartitionsResult> {
2903        self.check_not_closed()?;
2904        // H6: reject oversize topic names at ingress.
2905        validate_topic_names(topics.iter().map(String::as_str))?;
2906        let conn = self.get_any_broker_connection().await?;
2907
2908        let version = conn
2909            .negotiate_api_version(
2910                ApiKey::DescribeTopicPartitions,
2911                versions::DESCRIBE_TOPIC_PARTITIONS_MAX,
2912                versions::DESCRIBE_TOPIC_PARTITIONS_MIN,
2913            )
2914            .await
2915            .ok_or_else(|| {
2916                KrafkaError::protocol("no mutually supported DescribeTopicPartitions API version")
2917            })?;
2918
2919        // Collect all pages into a single result.
2920        let mut all_topics: Vec<TopicPartitionDescription> = Vec::new();
2921        let mut cursor = None;
2922
2923        loop {
2924            let request = DescribeTopicPartitionsRequest {
2925                topics: topics.clone(),
2926                response_partition_limit: DEFAULT_RESPONSE_PARTITION_LIMIT,
2927                cursor,
2928            };
2929
2930            let response_bytes = conn
2931                .send_request(ApiKey::DescribeTopicPartitions, version, |buf| {
2932                    request.encode_versioned(version, buf)
2933                })
2934                .await?;
2935
2936            let mut buf = response_bytes;
2937            let response = DescribeTopicPartitionsResponse::decode_versioned(version, &mut buf)?;
2938
2939            for t in response.topics {
2940                // Find existing topic entry (pagination may split partitions across pages).
2941                // Use topic_id as merge key; Kafka always assigns a non-zero UUID.
2942                // Fall back to name comparison if topic_id is the null UUID (defensive).
2943                let null_uuid = [0u8; 16];
2944                let existing = if t.topic_id != null_uuid {
2945                    all_topics.iter_mut().find(|e| e.topic_id == t.topic_id)
2946                } else {
2947                    all_topics.iter_mut().find(|e| e.name == t.name)
2948                };
2949                let partitions: Vec<PartitionDescription> = t
2950                    .partitions
2951                    .into_iter()
2952                    .map(|p| PartitionDescription {
2953                        partition_index: p.partition_index,
2954                        leader_id: p.leader_id,
2955                        leader_epoch: p.leader_epoch,
2956                        replica_nodes: p.replica_nodes,
2957                        isr_nodes: p.isr_nodes,
2958                        eligible_leader_replicas: p.eligible_leader_replicas,
2959                        last_known_elr: p.last_known_elr,
2960                        offline_replicas: p.offline_replicas,
2961                        error: if p.error_code.is_ok() {
2962                            None
2963                        } else {
2964                            Some(format!("{:?}", p.error_code))
2965                        },
2966                    })
2967                    .collect();
2968
2969                if let Some(entry) = existing {
2970                    entry.partitions.extend(partitions);
2971                } else {
2972                    all_topics.push(TopicPartitionDescription {
2973                        name: t.name,
2974                        topic_id: t.topic_id,
2975                        is_internal: t.is_internal,
2976                        partitions,
2977                        topic_authorized_operations: t.topic_authorized_operations,
2978                        error: if t.error_code.is_ok() {
2979                            None
2980                        } else {
2981                            Some(format!("{:?}", t.error_code))
2982                        },
2983                    });
2984                }
2985            }
2986
2987            // Check for more pages.
2988            match response.next_cursor {
2989                Some(c) => {
2990                    cursor = Some(DescribeTopicPartitionsCursor {
2991                        topic_name: c.topic_name,
2992                        partition_index: c.partition_index,
2993                    });
2994                }
2995                None => break,
2996            }
2997        }
2998
2999        info!("Described partitions for {} topics", all_topics.len());
3000        Ok(DescribeTopicPartitionsResult {
3001            topics: all_topics,
3002            next_cursor_topic: None,
3003            next_cursor_partition: None,
3004        })
3005    }
3006
    /// Get access to the connection pool.
    ///
    /// Returns a reference to the shared [`ConnectionPool`] this admin client
    /// uses for all broker connections.
    pub fn pool(&self) -> &Arc<ConnectionPool> {
        &self.pool
    }
3011
    /// Replace the bootstrap server list at runtime (KIP-899).
    ///
    /// The new addresses are used on the next metadata refresh that falls back
    /// to bootstrap servers. Does not close existing connections.
    ///
    /// # Errors
    ///
    /// Returns an error if `servers` is empty.
    pub fn update_seed_brokers(&self, servers: Vec<String>) -> Result<()> {
        // Delegates to the metadata layer, which owns the seed-broker list.
        self.metadata.update_seed_brokers(servers)
    }
3023
    /// Force a rebootstrap: close all connections, clear the metadata cache,
    /// and fall back to bootstrap servers (KIP-899).
    pub async fn rebootstrap(&self) {
        // The metadata layer performs the actual teardown and cache reset.
        self.metadata.rebootstrap().await;
    }
3029
3030    /// Close the admin client.
3031    ///
3032    /// Sets the closed flag and tears down all broker connections.
3033    /// In-flight RPCs that have not yet received a response will fail
3034    /// with a network error. Callers should ensure long-running admin
3035    /// operations have completed before calling `close()`.
3036    ///
3037    /// Calling `close()` more than once is a no-op.
3038    pub async fn close(&self) {
3039        if self.closed.swap(true, std::sync::atomic::Ordering::SeqCst) {
3040            return;
3041        }
3042        self.pool.close_all().await;
3043        info!("AdminClient closed");
3044    }
3045
    /// Check if the admin client is closed.
    ///
    /// Returns `true` once `close()` has been called.
    #[inline]
    pub fn is_closed(&self) -> bool {
        // SeqCst matches the ordering `close()` uses when setting the flag.
        self.closed.load(std::sync::atomic::Ordering::SeqCst)
    }
3051
    /// Get the shared connection metrics handle used by this admin client's broker pool.
    ///
    /// The handle is obtained from the connection pool and is reference-counted
    /// (`Arc`), so it can be held past the lifetime of this client.
    #[inline]
    pub fn connection_metrics(&self) -> Arc<ConnectionMetrics> {
        self.pool.metrics()
    }
3057
3058    /// Describe broker-supported and cluster-finalized features (KIP-584).
3059    ///
3060    /// Sends an `ApiVersions` request (v3+) to any broker and extracts the
3061    /// feature information from the tagged fields. The response includes:
3062    /// - Features supported by the responding broker (per-broker)
3063    /// - Cluster-wide finalized features and their epoch (cluster-wide)
3064    ///
3065    /// # Example
3066    /// ```ignore
3067    /// let features = admin.describe_features().await?;
3068    /// for f in &features.supported_features {
3069    ///     println!("{}: v{}–v{}", f.name, f.min_version, f.max_version);
3070    /// }
3071    /// for f in &features.finalized_features {
3072    ///     println!("{}: v{}–v{} (finalized)", f.name, f.min_version_level, f.max_version_level);
3073    /// }
3074    /// ```
3075    pub async fn describe_features(&self) -> Result<DescribeFeaturesResult> {
3076        self.check_not_closed()?;
3077        let conn = self.get_any_broker_connection().await?;
3078
3079        let request = crate::protocol::ApiVersionsRequest::new()
3080            .with_client_software("krafka", env!("CARGO_PKG_VERSION"));
3081
3082        let version = conn
3083            .negotiate_api_version(
3084                ApiKey::ApiVersions,
3085                versions::API_VERSIONS_MAX,
3086                // Need v3+ for tagged feature fields
3087                3,
3088            )
3089            .await
3090            .ok_or_else(|| {
3091                KrafkaError::protocol(
3092                    "no mutually supported ApiVersions v3+; feature discovery requires v3+",
3093                )
3094            })?;
3095
3096        let response_bytes = conn
3097            .send_request(ApiKey::ApiVersions, version, |buf| {
3098                if version >= 5 {
3099                    request.encode_v5(buf)
3100                } else {
3101                    request.encode_v3(buf)
3102                }
3103            })
3104            .await?;
3105
3106        let mut buf = response_bytes;
3107        let response = crate::protocol::ApiVersionsResponse::decode_v3(&mut buf)?;
3108
3109        if response.error_code != 0 {
3110            return Err(KrafkaError::broker(
3111                crate::error::ErrorCode::from(response.error_code),
3112                "ApiVersions request failed",
3113            ));
3114        }
3115
3116        Ok(DescribeFeaturesResult {
3117            supported_features: response.supported_features,
3118            finalized_features: response.finalized_features,
3119            finalized_features_epoch: response.finalized_features_epoch,
3120        })
3121    }
3122
3123    /// Update cluster-wide finalized feature version levels (KIP-584).
3124    ///
3125    /// This is a **destructive** operation — downgrades and deletions can be
3126    /// data-lossy. Only the controller broker serves this request; the client
3127    /// sends to any broker, which forwards to the controller.
3128    ///
3129    /// Requires `ALTER` permission on the cluster.
3130    ///
3131    /// # Example
3132    /// ```ignore
3133    /// use krafka::protocol::FeatureUpdateKey;
3134    ///
3135    /// let results = admin.update_features(
3136    ///     vec![FeatureUpdateKey::upgrade("metadata.version", 17)],
3137    ///     false, // validate_only
3138    /// ).await?;
3139    ///
3140    /// for result in &results.results {
3141    ///     if let Some(e) = &result.error {
3142    ///         eprintln!("Failed to update {}: {e}", result.feature);
3143    ///     }
3144    /// }
3145    /// ```
3146    pub async fn update_features(
3147        &self,
3148        feature_updates: Vec<crate::protocol::FeatureUpdateKey>,
3149        validate_only: bool,
3150    ) -> Result<UpdateFeaturesResult> {
3151        self.check_not_closed()?;
3152        let conn = self.get_any_broker_connection().await?;
3153
3154        let request = UpdateFeaturesRequest::new(feature_updates).with_validate_only(validate_only);
3155
3156        let version = conn
3157            .negotiate_api_version(
3158                ApiKey::UpdateFeatures,
3159                versions::UPDATE_FEATURES_MAX,
3160                versions::UPDATE_FEATURES_MIN,
3161            )
3162            .await
3163            .ok_or_else(|| {
3164                KrafkaError::protocol("no mutually supported UpdateFeatures API version")
3165            })?;
3166
3167        // validate_only requires v1+; reject early to avoid silently applying changes
3168        if validate_only && version < 1 {
3169            return Err(KrafkaError::protocol(
3170                "validate_only requires UpdateFeatures v1+, but broker only supports v0",
3171            ));
3172        }
3173
3174        let response_bytes = conn
3175            .send_request(ApiKey::UpdateFeatures, version, |buf| {
3176                request.encode_versioned(version, buf)
3177            })
3178            .await?;
3179
3180        let mut buf = response_bytes;
3181        let response = UpdateFeaturesResponse::decode_versioned(version, &mut buf)?;
3182
3183        if !response.is_ok() {
3184            let msg = response
3185                .error_message
3186                .unwrap_or_else(|| format!("{:?}", response.error_code));
3187            return Err(KrafkaError::protocol(msg));
3188        }
3189
3190        Ok(UpdateFeaturesResult {
3191            results: response
3192                .results
3193                .into_iter()
3194                .map(|r| UpdateFeatureResult {
3195                    feature: r.feature,
3196                    error: if r.error_code.is_ok() {
3197                        None
3198                    } else {
3199                        Some(
3200                            r.error_message
3201                                .unwrap_or_else(|| format!("{:?}", r.error_code)),
3202                        )
3203                    },
3204                })
3205                .collect(),
3206        })
3207    }
3208
3209    /// Describe log directories on all known brokers.
3210    ///
3211    /// Each broker maintains one or more log directories; this method queries
3212    /// every broker and returns per-directory information including sizes,
3213    /// partition assignments, and (v4+) volume capacity.
3214    ///
3215    /// Pass `None` for `topics` to describe **all** partitions on every
3216    /// broker, or pass a list of [`DescribableLogDirTopic`] to filter.
3217    ///
3218    /// # Example
3219    /// ```ignore
3220    /// // Describe all log dirs on every broker
3221    /// let dirs = admin.describe_log_dirs(None).await?;
3222    /// for dir in &dirs {
3223    ///     println!("broker {} dir {}: {:?}", dir.broker_id, dir.log_dir, dir.error);
3224    /// }
3225    ///
3226    /// // Describe specific topic partitions
3227    /// use krafka::protocol::DescribableLogDirTopic;
3228    /// let filter = vec![DescribableLogDirTopic {
3229    ///     topic: "my-topic".into(),
3230    ///     partitions: vec![0, 1, 2],
3231    /// }];
3232    /// let dirs = admin.describe_log_dirs(Some(filter)).await?;
3233    /// ```
3234    pub async fn describe_log_dirs(
3235        &self,
3236        topics: Option<Vec<DescribableLogDirTopic>>,
3237    ) -> Result<Vec<LogDirInfo>> {
3238        self.check_not_closed()?;
3239        // H6: reject oversize topic names at ingress.
3240        if let Some(ref ts) = topics {
3241            for t in ts {
3242                validate_topic_name(&t.topic)?;
3243            }
3244        }
3245        let brokers = self.metadata.brokers();
3246        if brokers.is_empty() {
3247            return Err(KrafkaError::broker(
3248                crate::error::ErrorCode::UnknownServerError,
3249                "no brokers available",
3250            ));
3251        }
3252
3253        let topic_scope = match &topics {
3254            None => "all".to_string(),
3255            Some(t) => format!("{} topic(s)", t.len()),
3256        };
3257
3258        let request = match &topics {
3259            None => DescribeLogDirsRequest::all(),
3260            Some(t) => DescribeLogDirsRequest::for_topics(t.clone()),
3261        };
3262
3263        let mut all_dirs = Vec::new();
3264
3265        for broker in &brokers {
3266            let conn = self
3267                .pool
3268                .get_connection_by_id(broker.id, broker.address())
3269                .await?;
3270
3271            let version = conn
3272                .negotiate_api_version(
3273                    ApiKey::DescribeLogDirs,
3274                    versions::DESCRIBE_LOG_DIRS_MAX,
3275                    versions::DESCRIBE_LOG_DIRS_MIN,
3276                )
3277                .await
3278                .ok_or_else(|| {
3279                    KrafkaError::protocol("no mutually supported DescribeLogDirs API version")
3280                })?;
3281
3282            let response_bytes = match conn
3283                .send_request(ApiKey::DescribeLogDirs, version, |buf| {
3284                    request.encode_versioned(version, buf)
3285                })
3286                .await
3287            {
3288                Ok(bytes) => bytes,
3289                Err(e) => {
3290                    warn!(
3291                        "DescribeLogDirs request failed on broker {} ({}): {}",
3292                        broker.id, topic_scope, e
3293                    );
3294                    continue;
3295                }
3296            };
3297
3298            let mut buf = response_bytes;
3299            let response = match DescribeLogDirsResponse::decode_versioned(version, &mut buf) {
3300                Ok(r) => r,
3301                Err(e) => {
3302                    warn!(
3303                        "DescribeLogDirs decode failed on broker {} ({}): {}",
3304                        broker.id, topic_scope, e
3305                    );
3306                    continue;
3307                }
3308            };
3309
3310            // v3+ top-level error code
3311            if !response.error_code.is_ok() {
3312                warn!(
3313                    "DescribeLogDirs top-level error on broker {} ({}): {:?}",
3314                    broker.id, topic_scope, response.error_code
3315                );
3316            }
3317
3318            // Pre-v3 brokers lack a top-level error code; empty results typically
3319            // signal CLUSTER_AUTHORIZATION_FAILED (matches Java client heuristic).
3320            if response.results.is_empty() && version < 3 {
3321                warn!(
3322                    "DescribeLogDirs returned empty results on broker {} (v{}, {}); \
3323                     likely CLUSTER_AUTHORIZATION_FAILED",
3324                    broker.id, version, topic_scope
3325                );
3326            }
3327
3328            for result in response.results {
3329                all_dirs.push(LogDirInfo {
3330                    broker_id: broker.id,
3331                    log_dir: result.log_dir,
3332                    error: if result.error_code.is_ok() {
3333                        None
3334                    } else {
3335                        Some(format!("{:?}", result.error_code))
3336                    },
3337                    topics: result
3338                        .topics
3339                        .into_iter()
3340                        .map(|t| LogDirTopicInfo {
3341                            name: t.name,
3342                            partitions: t
3343                                .partitions
3344                                .into_iter()
3345                                .map(|p| LogDirPartitionInfo {
3346                                    partition_index: p.partition_index,
3347                                    partition_size: p.partition_size,
3348                                    offset_lag: p.offset_lag,
3349                                    is_future_key: p.is_future_key,
3350                                })
3351                                .collect(),
3352                        })
3353                        .collect(),
3354                    total_bytes: result.total_bytes,
3355                    usable_bytes: result.usable_bytes,
3356                });
3357            }
3358        }
3359
3360        info!(
3361            "Described {} log dir(s) across {} broker(s)",
3362            all_dirs.len(),
3363            brokers.len()
3364        );
3365        Ok(all_dirs)
3366    }
3367
3368    /// Trigger leader election for the specified partitions.
3369    ///
3370    /// When `topic_partitions` is `None`, leaders for all partitions are
3371    /// elected. The `election_type` controls whether to perform a preferred
3372    /// or unclean leader election (requires broker v1+; v0 always does
3373    /// preferred election).
3374    ///
3375    /// Returns per-partition results — individual partitions may fail even
3376    /// when the RPC succeeds.
3377    ///
3378    /// # Example
3379    ///
3380    /// ```ignore
3381    /// use krafka::protocol::ElectionType;
3382    /// use std::time::Duration;
3383    ///
3384    /// // Preferred election for all partitions
3385    /// let results = admin
3386    ///     .elect_leaders(ElectionType::Preferred, None, Duration::from_secs(60))
3387    ///     .await?;
3388    /// ```
3389    pub async fn elect_leaders(
3390        &self,
3391        election_type: ElectionType,
3392        topic_partitions: Option<Vec<ElectLeadersTopicPartitions>>,
3393        timeout: Duration,
3394    ) -> Result<Vec<ElectLeadersResult>> {
3395        // H6: reject oversize topic names at ingress.
3396        if let Some(ref tps) = topic_partitions {
3397            for tp in tps {
3398                validate_topic_name(&tp.topic)?;
3399            }
3400        }
3401        let conn = self.get_any_broker_connection().await?;
3402
3403        let request = ElectLeadersRequest {
3404            election_type,
3405            topic_partitions,
3406            timeout_ms: crate::util::duration_to_millis_i32(timeout),
3407        };
3408
3409        let version = conn
3410            .negotiate_api_version(
3411                ApiKey::ElectLeaders,
3412                versions::ELECT_LEADERS_MAX,
3413                versions::ELECT_LEADERS_MIN,
3414            )
3415            .await
3416            .ok_or_else(|| {
3417                KrafkaError::protocol("no mutually supported ElectLeaders API version")
3418            })?;
3419
3420        let response_bytes = conn
3421            .send_request(ApiKey::ElectLeaders, version, |buf| {
3422                request.encode_versioned(version, buf)
3423            })
3424            .await?;
3425
3426        let mut buf = response_bytes;
3427        let response = ElectLeadersResponse::decode_versioned(version, &mut buf)?;
3428
3429        if !response.error_code.is_ok() {
3430            warn!("ElectLeaders top-level error: {:?}", response.error_code);
3431        }
3432
3433        let results = response
3434            .replica_election_results
3435            .into_iter()
3436            .map(|topic| ElectLeadersResult {
3437                topic: topic.topic,
3438                partitions: topic
3439                    .partition_results
3440                    .into_iter()
3441                    .map(|p| ElectLeadersPartitionInfo {
3442                        partition_id: p.partition_id,
3443                        error: if p.error_code.is_ok() {
3444                            None
3445                        } else {
3446                            Some(
3447                                p.error_message
3448                                    .unwrap_or_else(|| format!("{:?}", p.error_code)),
3449                            )
3450                        },
3451                    })
3452                    .collect(),
3453            })
3454            .collect::<Vec<_>>();
3455
3456        info!("ElectLeaders completed for {} topic(s)", results.len());
3457        Ok(results)
3458    }
3459
3460    /// Alter partition reassignments.
3461    ///
3462    /// Initiates or cancels partition reassignments. To cancel a pending
3463    /// reassignment, set `replicas` to `None` for that partition.
3464    ///
3465    /// **This is a destructive operation** — reassigning partitions moves data
3466    /// between brokers and can significantly impact cluster load.
3467    ///
3468    /// Returns per-partition results — individual partitions may fail even
3469    /// when the RPC succeeds.
3470    ///
3471    /// # Example
3472    ///
3473    /// ```ignore
3474    /// use krafka::protocol::{ReassignableTopic, ReassignablePartition};
3475    /// use std::time::Duration;
3476    ///
3477    /// let results = admin.alter_partition_reassignments(
3478    ///     vec![ReassignableTopic {
3479    ///         name: "my-topic".into(),
3480    ///         partitions: vec![ReassignablePartition {
3481    ///             partition_index: 0,
3482    ///             replicas: Some(vec![1, 2, 3]),
3483    ///         }],
3484    ///     }],
3485    ///     Duration::from_secs(60),
3486    /// ).await?;
3487    /// ```
3488    pub async fn alter_partition_reassignments(
3489        &self,
3490        topics: Vec<ReassignableTopic>,
3491        timeout: Duration,
3492    ) -> Result<AlterReassignmentsResult> {
3493        // H6: reject oversize topic names at ingress.
3494        for t in &topics {
3495            validate_topic_name(&t.name)?;
3496        }
3497        let conn = self.get_any_broker_connection().await?;
3498
3499        let request = AlterPartitionReassignmentsRequest {
3500            timeout_ms: crate::util::duration_to_millis_i32(timeout),
3501            topics,
3502        };
3503
3504        let version = conn
3505            .negotiate_api_version(
3506                ApiKey::AlterPartitionReassignments,
3507                versions::ALTER_PARTITION_REASSIGNMENTS_MAX,
3508                versions::ALTER_PARTITION_REASSIGNMENTS_MIN,
3509            )
3510            .await
3511            .ok_or_else(|| {
3512                KrafkaError::protocol(
3513                    "no mutually supported AlterPartitionReassignments API version",
3514                )
3515            })?;
3516
3517        let response_bytes = conn
3518            .send_request(ApiKey::AlterPartitionReassignments, version, |buf| {
3519                request.encode_versioned(version, buf)
3520            })
3521            .await?;
3522
3523        let mut buf = response_bytes;
3524        let response = AlterPartitionReassignmentsResponse::decode_versioned(version, &mut buf)?;
3525
3526        if !response.error_code.is_ok() {
3527            warn!(
3528                "AlterPartitionReassignments top-level error: {:?} — {}",
3529                response.error_code,
3530                response.error_message.as_deref().unwrap_or("(no message)")
3531            );
3532        }
3533
3534        let topic_results = response
3535            .responses
3536            .into_iter()
3537            .map(|t| ReassignmentTopicResult {
3538                name: t.name,
3539                partitions: t
3540                    .partitions
3541                    .into_iter()
3542                    .map(|p| ReassignmentPartitionResult {
3543                        partition_index: p.partition_index,
3544                        error: if p.error_code.is_ok() {
3545                            None
3546                        } else {
3547                            Some(
3548                                p.error_message
3549                                    .unwrap_or_else(|| format!("{:?}", p.error_code)),
3550                            )
3551                        },
3552                    })
3553                    .collect(),
3554            })
3555            .collect::<Vec<_>>();
3556
3557        info!(
3558            "AlterPartitionReassignments completed for {} topic(s)",
3559            topic_results.len()
3560        );
3561
3562        Ok(AlterReassignmentsResult {
3563            error: if response.error_code.is_ok() {
3564                None
3565            } else {
3566                Some(
3567                    response
3568                        .error_message
3569                        .unwrap_or_else(|| format!("{:?}", response.error_code)),
3570                )
3571            },
3572            topics: topic_results,
3573        })
3574    }
3575
3576    /// List ongoing partition reassignments.
3577    ///
3578    /// When `topics` is `None`, all ongoing reassignments are listed.
3579    /// Otherwise, only the specified topic-partitions are checked.
3580    ///
3581    /// # Example
3582    ///
3583    /// ```ignore
3584    /// // List all ongoing reassignments
3585    /// let reassignments = admin
3586    ///     .list_partition_reassignments(None, Duration::from_secs(60))
3587    ///     .await?;
3588    /// for topic in &reassignments {
3589    ///     for p in &topic.partitions {
3590    ///         println!("{} p{}: adding {:?}, removing {:?}",
3591    ///             topic.name, p.partition_index, p.adding_replicas, p.removing_replicas);
3592    ///     }
3593    /// }
3594    /// ```
3595    pub async fn list_partition_reassignments(
3596        &self,
3597        topics: Option<Vec<ListPartitionReassignmentsTopic>>,
3598        timeout: Duration,
3599    ) -> Result<Vec<PartitionReassignmentInfo>> {
3600        // H6: reject oversize topic names at ingress.
3601        if let Some(ref ts) = topics {
3602            for t in ts {
3603                validate_topic_name(&t.name)?;
3604            }
3605        }
3606        let conn = self.get_any_broker_connection().await?;
3607
3608        let request = ListPartitionReassignmentsRequest {
3609            timeout_ms: crate::util::duration_to_millis_i32(timeout),
3610            topics,
3611        };
3612
3613        let version = conn
3614            .negotiate_api_version(
3615                ApiKey::ListPartitionReassignments,
3616                versions::LIST_PARTITION_REASSIGNMENTS_MAX,
3617                versions::LIST_PARTITION_REASSIGNMENTS_MIN,
3618            )
3619            .await
3620            .ok_or_else(|| {
3621                KrafkaError::protocol(
3622                    "no mutually supported ListPartitionReassignments API version",
3623                )
3624            })?;
3625
3626        let response_bytes = conn
3627            .send_request(ApiKey::ListPartitionReassignments, version, |buf| {
3628                request.encode_versioned(version, buf)
3629            })
3630            .await?;
3631
3632        let mut buf = response_bytes;
3633        let response = ListPartitionReassignmentsResponse::decode_versioned(version, &mut buf)?;
3634
3635        if !response.error_code.is_ok() {
3636            warn!(
3637                "ListPartitionReassignments top-level error: {:?} — {}",
3638                response.error_code,
3639                response.error_message.as_deref().unwrap_or("(no message)")
3640            );
3641        }
3642
3643        let results = response
3644            .topics
3645            .into_iter()
3646            .map(|t| PartitionReassignmentInfo {
3647                name: t.name,
3648                partitions: t
3649                    .partitions
3650                    .into_iter()
3651                    .map(|p| PartitionReassignmentPartitionInfo {
3652                        partition_index: p.partition_index,
3653                        replicas: p.replicas,
3654                        adding_replicas: p.adding_replicas,
3655                        removing_replicas: p.removing_replicas,
3656                    })
3657                    .collect(),
3658            })
3659            .collect::<Vec<_>>();
3660
3661        info!(
3662            "Listed {} topic(s) with ongoing reassignments",
3663            results.len()
3664        );
3665        Ok(results)
3666    }
3667
3668    // ════════════════════════════════════════════════════════════════════
3669    // AlterReplicaLogDirs (API key 34)
3670    // ════════════════════════════════════════════════════════════════════
3671
3672    /// Move partition replicas to a different log directory on the broker.
3673    ///
3674    /// **This is a destructive operation** — moving replicas between log
3675    /// directories triggers data copying and can impact broker I/O.
3676    ///
3677    /// This is a per-broker operation. The request is sent to every broker
3678    /// that currently hosts at least one replica for the specified
3679    /// topic-partitions.
3680    ///
3681    /// Returns per-partition results — individual partitions may fail even
3682    /// when the RPC succeeds.
3683    ///
3684    /// # Example
3685    ///
3686    /// ```ignore
3687    /// use krafka::protocol::{AlterReplicaLogDir, AlterReplicaLogDirTopic};
3688    ///
3689    /// let results = admin.alter_replica_log_dirs(vec![
3690    ///     AlterReplicaLogDir {
3691    ///         path: "/data/kafka-logs-2".into(),
3692    ///         topics: vec![AlterReplicaLogDirTopic {
3693    ///             name: "my-topic".into(),
3694    ///             partitions: vec![0, 1],
3695    ///         }],
3696    ///     },
3697    /// ]).await?;
3698    /// ```
3699    pub async fn alter_replica_log_dirs(
3700        &self,
3701        dirs: Vec<AlterReplicaLogDir>,
3702    ) -> Result<Vec<AlterReplicaLogDirsResult>> {
3703        self.check_not_closed()?;
3704        // H6: reject oversize topic names at ingress.
3705        for d in &dirs {
3706            for t in &d.topics {
3707                validate_topic_name(&t.name)?;
3708            }
3709        }
3710        let brokers = self.metadata.brokers();
3711        if brokers.is_empty() {
3712            return Err(KrafkaError::broker(
3713                crate::error::ErrorCode::UnknownServerError,
3714                "no brokers available",
3715            ));
3716        }
3717
3718        let request = AlterReplicaLogDirsRequest { dirs };
3719        let mut all_results = Vec::new();
3720
3721        for broker in &brokers {
3722            let conn = self
3723                .pool
3724                .get_connection_by_id(broker.id, broker.address())
3725                .await?;
3726
3727            let version = conn
3728                .negotiate_api_version(
3729                    ApiKey::AlterReplicaLogDirs,
3730                    versions::ALTER_REPLICA_LOG_DIRS_MAX,
3731                    versions::ALTER_REPLICA_LOG_DIRS_MIN,
3732                )
3733                .await
3734                .ok_or_else(|| {
3735                    KrafkaError::protocol("no mutually supported AlterReplicaLogDirs API version")
3736                })?;
3737
3738            let response_bytes = match conn
3739                .send_request(ApiKey::AlterReplicaLogDirs, version, |buf| {
3740                    request.encode_versioned(version, buf)
3741                })
3742                .await
3743            {
3744                Ok(bytes) => bytes,
3745                Err(e) => {
3746                    warn!(
3747                        "AlterReplicaLogDirs request failed on broker {} ({} dir(s)): {}",
3748                        broker.id,
3749                        request.dirs.len(),
3750                        e
3751                    );
3752                    continue;
3753                }
3754            };
3755
3756            let mut buf = response_bytes;
3757            let response = match AlterReplicaLogDirsResponse::decode_versioned(version, &mut buf) {
3758                Ok(r) => r,
3759                Err(e) => {
3760                    warn!(
3761                        "AlterReplicaLogDirs decode failed on broker {} ({} dir(s)): {}",
3762                        broker.id,
3763                        request.dirs.len(),
3764                        e
3765                    );
3766                    continue;
3767                }
3768            };
3769
3770            for topic in response.results {
3771                all_results.push(AlterReplicaLogDirsResult {
3772                    broker_id: broker.id,
3773                    topic_name: topic.topic_name,
3774                    partitions: topic
3775                        .partitions
3776                        .into_iter()
3777                        .map(|p| AlterReplicaLogDirsPartitionResult {
3778                            partition_index: p.partition_index,
3779                            error: if p.error_code.is_ok() {
3780                                None
3781                            } else {
3782                                Some(format!("{:?}", p.error_code))
3783                            },
3784                        })
3785                        .collect(),
3786                });
3787            }
3788        }
3789
3790        info!(
3791            "AlterReplicaLogDirs completed for {} topic(s)",
3792            all_results.len()
3793        );
3794        Ok(all_results)
3795    }
3796
3797    // ════════════════════════════════════════════════════════════════════
3798    // OffsetDelete (API key 47)
3799    // ════════════════════════════════════════════════════════════════════
3800
3801    /// Delete committed offsets for a consumer group.
3802    ///
3803    /// **This is a destructive operation** — deleted offsets cannot be
3804    /// recovered. The consumer group must be in the `Empty` state.
3805    ///
3806    /// The request is sent to the group coordinator.
3807    ///
3808    /// # Example
3809    ///
3810    /// ```ignore
3811    /// let results = admin.delete_offsets(
3812    ///     "my-group",
3813    ///     &[("my-topic", &[0, 1, 2])],
3814    /// ).await?;
3815    /// ```
3816    pub async fn delete_consumer_group_offsets(
3817        &self,
3818        group_id: &str,
3819        topic_partitions: &[(&str, &[i32])],
3820    ) -> Result<OffsetDeleteResult> {
3821        self.check_not_closed()?;
3822
3823        // Find the group coordinator.
3824        let any_conn = self.get_any_broker_connection().await?;
3825        let coord_request = FindCoordinatorRequest::for_group(group_id);
3826        let coord_version = any_conn
3827            .negotiate_api_version(
3828                ApiKey::FindCoordinator,
3829                versions::FIND_COORDINATOR_MAX,
3830                versions::FIND_COORDINATOR_MIN,
3831            )
3832            .await
3833            .ok_or_else(|| {
3834                KrafkaError::protocol("no mutually supported FindCoordinator API version")
3835            })?;
3836        let coord_response_bytes = any_conn
3837            .send_request(ApiKey::FindCoordinator, coord_version, |buf| {
3838                coord_request.encode_versioned(coord_version, buf)
3839            })
3840            .await?;
3841        let mut coord_buf = coord_response_bytes;
3842        let coord_response =
3843            FindCoordinatorResponse::decode_versioned(coord_version, &mut coord_buf)?;
3844
3845        let coordinator = if coord_response.error_code.is_ok() {
3846            let addr = format!("{}:{}", coord_response.host, coord_response.port);
3847            self.pool
3848                .get_connection_by_id(coord_response.node_id, &addr)
3849                .await?
3850        } else {
3851            warn!(
3852                "FindCoordinator failed for group '{}': {:?}, using any broker",
3853                group_id, coord_response.error_code
3854            );
3855            any_conn
3856        };
3857
3858        let topics = topic_partitions
3859            .iter()
3860            .map(|(name, partitions)| OffsetDeleteTopicRequest {
3861                name: (*name).to_string(),
3862                partitions: partitions
3863                    .iter()
3864                    .map(|&p| OffsetDeletePartitionRequest { partition_index: p })
3865                    .collect(),
3866            })
3867            .collect();
3868
3869        let request = OffsetDeleteRequest {
3870            group_id: group_id.to_string(),
3871            topics,
3872        };
3873
3874        let version = coordinator
3875            .negotiate_api_version(
3876                ApiKey::OffsetDelete,
3877                versions::OFFSET_DELETE_MAX,
3878                versions::OFFSET_DELETE_MIN,
3879            )
3880            .await
3881            .ok_or_else(|| {
3882                KrafkaError::protocol("no mutually supported OffsetDelete API version")
3883            })?;
3884
3885        let response_bytes = coordinator
3886            .send_request(ApiKey::OffsetDelete, version, |buf| {
3887                request.encode_versioned(version, buf)
3888            })
3889            .await?;
3890
3891        let mut buf = response_bytes;
3892        let response = OffsetDeleteResponse::decode_versioned(version, &mut buf)?;
3893
3894        if !response.error_code.is_ok() {
3895            warn!("OffsetDelete top-level error: {:?}", response.error_code);
3896        }
3897
3898        let topics = response
3899            .topics
3900            .into_iter()
3901            .map(|t| OffsetDeleteTopicResult {
3902                name: t.name,
3903                partitions: t
3904                    .partitions
3905                    .into_iter()
3906                    .map(|p| OffsetDeletePartitionResult {
3907                        partition_index: p.partition_index,
3908                        error: if p.error_code.is_ok() {
3909                            None
3910                        } else {
3911                            Some(format!("{:?}", p.error_code))
3912                        },
3913                    })
3914                    .collect(),
3915            })
3916            .collect::<Vec<_>>();
3917
3918        info!("OffsetDelete completed for group {group_id}");
3919
3920        Ok(OffsetDeleteResult {
3921            error: if response.error_code.is_ok() {
3922                None
3923            } else {
3924                Some(format!("{:?}", response.error_code))
3925            },
3926            topics,
3927        })
3928    }
3929
3930    // ════════════════════════════════════════════════════════════════════
3931    // DescribeUserScramCredentials (API key 50)
3932    // ════════════════════════════════════════════════════════════════════
3933
3934    /// Describe SCRAM credentials for the specified users.
3935    ///
3936    /// When `users` is `None`, all SCRAM credentials are described.
3937    ///
3938    /// # Example
3939    ///
3940    /// ```ignore
3941    /// // Describe all SCRAM credentials
3942    /// let results = admin.describe_user_scram_credentials(None).await?;
3943    /// for user in &results {
3944    ///     println!("{}: {:?}", user.name, user.credential_infos);
3945    /// }
3946    /// ```
3947    pub async fn describe_user_scram_credentials(
3948        &self,
3949        users: Option<Vec<String>>,
3950    ) -> Result<DescribeUserScramCredentialsResult> {
3951        let conn = self.get_any_broker_connection().await?;
3952
3953        let request = DescribeUserScramCredentialsRequest { users };
3954
3955        let version = conn
3956            .negotiate_api_version(
3957                ApiKey::DescribeUserScramCredentials,
3958                versions::DESCRIBE_USER_SCRAM_CREDENTIALS_MAX,
3959                versions::DESCRIBE_USER_SCRAM_CREDENTIALS_MIN,
3960            )
3961            .await
3962            .ok_or_else(|| {
3963                KrafkaError::protocol(
3964                    "no mutually supported DescribeUserScramCredentials API version",
3965                )
3966            })?;
3967
3968        let response_bytes = conn
3969            .send_request(ApiKey::DescribeUserScramCredentials, version, |buf| {
3970                request.encode_versioned(version, buf)
3971            })
3972            .await?;
3973
3974        let mut buf = response_bytes;
3975        let response = DescribeUserScramCredentialsResponse::decode_versioned(version, &mut buf)?;
3976
3977        if !response.error_code.is_ok() {
3978            warn!(
3979                "DescribeUserScramCredentials top-level error: {:?} — {}",
3980                response.error_code,
3981                response.error_message.as_deref().unwrap_or("(no message)")
3982            );
3983        }
3984
3985        let users = response
3986            .results
3987            .into_iter()
3988            .map(|r| ScramCredentialUserResult {
3989                name: r.user,
3990                error: if r.error_code.is_ok() {
3991                    None
3992                } else {
3993                    r.error_message
3994                        .or_else(|| Some(format!("{:?}", r.error_code)))
3995                },
3996                credential_infos: r
3997                    .credential_infos
3998                    .into_iter()
3999                    .map(|c| ScramCredentialInfoResult {
4000                        mechanism: c.mechanism,
4001                        iterations: c.iterations,
4002                    })
4003                    .collect(),
4004            })
4005            .collect::<Vec<_>>();
4006
4007        info!(
4008            "DescribeUserScramCredentials returned {} user(s)",
4009            users.len()
4010        );
4011
4012        Ok(DescribeUserScramCredentialsResult {
4013            error: if response.error_code.is_ok() {
4014                None
4015            } else {
4016                response
4017                    .error_message
4018                    .or_else(|| Some(format!("{:?}", response.error_code)))
4019            },
4020            users,
4021        })
4022    }
4023
4024    // ════════════════════════════════════════════════════════════════════
4025    // AlterUserScramCredentials (API key 51)
4026    // ════════════════════════════════════════════════════════════════════
4027
4028    /// Alter (upsert or delete) SCRAM credentials for users.
4029    ///
4030    /// **This is a destructive operation** — deleting a SCRAM credential
4031    /// removes the user's ability to authenticate with that mechanism.
4032    ///
4033    /// # Example
4034    ///
4035    /// ```ignore
4036    /// use krafka::protocol::{ScramCredentialDeletion, ScramCredentialUpsertion};
4037    /// use krafka::auth::ScramMechanism;
4038    /// use zeroize::Zeroizing;
4039    ///
4040    /// let results = admin.alter_user_scram_credentials(
4041    ///     vec![ScramCredentialDeletion {
4042    ///         name: "alice".into(),
4043    ///         mechanism: ScramMechanism::Sha512,
4044    ///     }],
4045    ///     vec![ScramCredentialUpsertion {
4046    ///         name: "bob".into(),
4047    ///         mechanism: ScramMechanism::Sha256,
4048    ///         iterations: 8192,
4049    ///         salt: Zeroizing::new(vec![1, 2, 3]),
4050    ///         salted_password: Zeroizing::new(vec![4, 5, 6]),
4051    ///     }],
4052    /// ).await?;
4053    /// ```
4054    pub async fn alter_user_scram_credentials(
4055        &self,
4056        deletions: Vec<ScramCredentialDeletion>,
4057        upsertions: Vec<ScramCredentialUpsertion>,
4058    ) -> Result<Vec<AlterScramCredentialResult>> {
4059        let conn = self.get_any_broker_connection().await?;
4060
4061        let request = AlterUserScramCredentialsRequest {
4062            deletions,
4063            upsertions,
4064        };
4065
4066        let version = conn
4067            .negotiate_api_version(
4068                ApiKey::AlterUserScramCredentials,
4069                versions::ALTER_USER_SCRAM_CREDENTIALS_MAX,
4070                versions::ALTER_USER_SCRAM_CREDENTIALS_MIN,
4071            )
4072            .await
4073            .ok_or_else(|| {
4074                KrafkaError::protocol("no mutually supported AlterUserScramCredentials API version")
4075            })?;
4076
4077        let response_bytes = conn
4078            .send_request(ApiKey::AlterUserScramCredentials, version, |buf| {
4079                request.encode_versioned(version, buf)
4080            })
4081            .await?;
4082
4083        let mut buf = response_bytes;
4084        let response = AlterUserScramCredentialsResponse::decode_versioned(version, &mut buf)?;
4085
4086        let results = response
4087            .results
4088            .into_iter()
4089            .map(|r| AlterScramCredentialResult {
4090                user: r.user,
4091                error: if r.error_code.is_ok() {
4092                    None
4093                } else {
4094                    r.error_message
4095                        .or_else(|| Some(format!("{:?}", r.error_code)))
4096                },
4097            })
4098            .collect::<Vec<_>>();
4099
4100        info!(
4101            "AlterUserScramCredentials completed for {} user(s)",
4102            results.len()
4103        );
4104        Ok(results)
4105    }
4106
4107    // ════════════════════════════════════════════════════════════════════
4108    // DescribeProducers (API key 61)
4109    // ════════════════════════════════════════════════════════════════════
4110
    /// Describe active producers on the given topic-partitions.
    ///
    /// Routes each topic-partition to its leader broker via cached metadata
    /// for optimal performance. Falls back to any broker if the leader is
    /// unknown.
    ///
    /// Returns per-partition producer state useful for debugging
    /// transactional and idempotent producers.
    ///
    /// Per-broker request/decode failures are logged and skipped, so results
    /// from reachable brokers are still returned. A `NotLeaderForPartition`
    /// error on any partition triggers one metadata refresh and a single
    /// retry of the whole operation.
    ///
    /// # Example
    ///
    /// ```ignore
    /// let results = admin
    ///     .describe_producers(&[("my-topic", &[0, 1])])
    ///     .await?;
    /// ```
    pub async fn describe_producers(
        &self,
        topic_partitions: &[(&str, &[i32])],
    ) -> Result<Vec<DescribeProducersTopicResult>> {
        self.check_not_closed()?;

        // Two passes at most: attempt 0 uses cached metadata; attempt 1 only
        // runs after a stale-leader error, with freshly refreshed metadata.
        for attempt in 0u8..2 {
            if attempt == 1 {
                // Refresh metadata after a stale-leader error.
                let topics: Vec<&str> = topic_partitions.iter().map(|&(t, _)| t).collect();
                let _ = self.metadata.refresh_for_topics(Some(&topics)).await;
            }

            let brokers = self.metadata.brokers();
            if brokers.is_empty() {
                return Err(KrafkaError::broker(
                    crate::error::ErrorCode::UnknownServerError,
                    "no brokers available",
                ));
            }

            // Partitions with an unknown leader are routed to the first
            // broker; the response's per-partition error codes (handled
            // below) catch the case where that broker is not the leader.
            let fallback_id = brokers[0].id;

            // Group topic-partitions by leader broker.
            let mut by_leader: HashMap<i32, HashMap<String, Vec<i32>>> = HashMap::new();
            for &(topic, partitions) in topic_partitions {
                for &pid in partitions {
                    let leader = self.metadata.leader(topic, pid).unwrap_or(fallback_id);
                    by_leader
                        .entry(leader)
                        .or_default()
                        .entry(topic.to_string())
                        .or_default()
                        .push(pid);
                }
            }

            // Results merged by topic name, since two brokers can each
            // return partitions of the same topic.
            let mut all_results: HashMap<String, DescribeProducersTopicResult> = HashMap::new();
            let mut has_stale_leader = false;

            for (broker_id, topic_map) in by_leader {
                let broker = brokers
                    .iter()
                    .find(|b| b.id == broker_id)
                    .unwrap_or(&brokers[0]);
                let conn = self
                    .pool
                    .get_connection_by_id(broker.id, broker.address())
                    .await?;

                let topics = topic_map
                    .into_iter()
                    .map(|(name, partition_indexes)| DescribeProducersTopicRequest {
                        name,
                        partition_indexes,
                    })
                    .collect();

                let request = DescribeProducersRequest { topics };

                let version = conn
                    .negotiate_api_version(
                        ApiKey::DescribeProducers,
                        versions::DESCRIBE_PRODUCERS_MAX,
                        versions::DESCRIBE_PRODUCERS_MIN,
                    )
                    .await
                    .ok_or_else(|| {
                        KrafkaError::protocol("no mutually supported DescribeProducers API version")
                    })?;

                // Best-effort per broker: a failed send is logged and the
                // remaining brokers are still queried.
                let response_bytes = match conn
                    .send_request(ApiKey::DescribeProducers, version, |buf| {
                        request.encode_versioned(version, buf)
                    })
                    .await
                {
                    Ok(bytes) => bytes,
                    Err(e) => {
                        warn!(
                            "DescribeProducers request failed on broker {}: {}",
                            broker.id, e
                        );
                        continue;
                    }
                };

                let mut buf = response_bytes;
                let response = match DescribeProducersResponse::decode_versioned(version, &mut buf)
                {
                    Ok(r) => r,
                    Err(e) => {
                        warn!(
                            "DescribeProducers decode failed on broker {}: {}",
                            broker.id, e
                        );
                        continue;
                    }
                };

                for topic in response.topics {
                    let entry = all_results.entry(topic.name.clone()).or_insert_with(|| {
                        DescribeProducersTopicResult {
                            name: topic.name,
                            partitions: Vec::new(),
                        }
                    });
                    entry
                        .partitions
                        .extend(topic.partitions.into_iter().map(|p| {
                            // Deliberate side effect inside the mapping:
                            // remember that a stale leader was seen so the
                            // outer loop can refresh metadata and retry once.
                            if p.error_code == crate::error::ErrorCode::NotLeaderForPartition {
                                has_stale_leader = true;
                            }
                            DescribeProducersPartitionInfo {
                                partition_index: p.partition_index,
                                error: if p.error_code.is_ok() {
                                    None
                                } else {
                                    Some(
                                        p.error_message
                                            .unwrap_or_else(|| format!("{:?}", p.error_code)),
                                    )
                                },
                                active_producers: p
                                    .active_producers
                                    .into_iter()
                                    .map(|pr| ProducerStateInfo {
                                        producer_id: pr.producer_id,
                                        producer_epoch: pr.producer_epoch,
                                        last_sequence: pr.last_sequence,
                                        last_timestamp: pr.last_timestamp,
                                        coordinator_epoch: pr.coordinator_epoch,
                                        current_txn_start_offset: pr.current_txn_start_offset,
                                    })
                                    .collect(),
                            }
                        }));
                }
            }

            // Only the first attempt retries; attempt 1 returns whatever
            // was gathered, stale leaders included (as per-partition errors).
            if has_stale_leader && attempt == 0 {
                warn!(
                    "NotLeaderForPartition in DescribeProducers response, retrying with refreshed metadata"
                );
                continue;
            }

            let results: Vec<DescribeProducersTopicResult> = all_results.into_values().collect();
            info!("DescribeProducers returned {} topic(s)", results.len());
            return Ok(results);
        }
        // Unreachable in practice (attempt 1 always returns above); kept to
        // satisfy the compiler for the `for` loop's fall-through path.
        Err(KrafkaError::protocol(
            "DescribeProducers retry loop exhausted after metadata refresh",
        ))
    }
4282
4283    // ════════════════════════════════════════════════════════════════════
4284    // DescribeTransactions (API key 65)
4285    // ════════════════════════════════════════════════════════════════════
4286
4287    /// Describe the state of the given transactions.
4288    ///
4289    /// Routes each transactional ID to its transaction coordinator via
4290    /// `FindCoordinator`, groups by coordinator, and batches requests.
4291    ///
4292    /// # Example
4293    ///
4294    /// ```ignore
4295    /// let results = admin
4296    ///     .describe_transactions(&["txn-1", "txn-2"])
4297    ///     .await?;
4298    /// ```
4299    pub async fn describe_transactions(
4300        &self,
4301        transactional_ids: &[&str],
4302    ) -> Result<Vec<TransactionDescription>> {
4303        self.check_not_closed()?;
4304        let brokers = self.metadata.brokers();
4305        if brokers.is_empty() {
4306            return Err(KrafkaError::broker(
4307                crate::error::ErrorCode::UnknownServerError,
4308                "no brokers available",
4309            ));
4310        }
4311
4312        // Group transactional IDs by their coordinator broker.
4313        let any_broker = &brokers[0];
4314        let any_conn = self
4315            .pool
4316            .get_connection_by_id(any_broker.id, any_broker.address())
4317            .await?;
4318
4319        let mut coordinator_txns: HashMap<i32, Vec<String>> = HashMap::new();
4320
4321        for txn_id in transactional_ids {
4322            let coord_request = FindCoordinatorRequest::for_transaction(txn_id);
4323            let coord_version = any_conn
4324                .negotiate_api_version(
4325                    ApiKey::FindCoordinator,
4326                    versions::FIND_COORDINATOR_MAX,
4327                    versions::FIND_COORDINATOR_MIN,
4328                )
4329                .await
4330                .ok_or_else(|| {
4331                    KrafkaError::protocol("no mutually supported FindCoordinator API version")
4332                })?;
4333
4334            let coord_response_bytes = any_conn
4335                .send_request(ApiKey::FindCoordinator, coord_version, |buf| {
4336                    coord_request.encode_versioned(coord_version, buf)
4337                })
4338                .await?;
4339            let mut coord_buf = coord_response_bytes;
4340            let coord_response =
4341                FindCoordinatorResponse::decode_versioned(coord_version, &mut coord_buf)?;
4342
4343            if coord_response.error_code.is_ok() {
4344                coordinator_txns
4345                    .entry(coord_response.node_id)
4346                    .or_default()
4347                    .push((*txn_id).to_string());
4348            } else {
4349                warn!(
4350                    "FindCoordinator failed for txn '{}': {:?}, falling back to broker {}",
4351                    txn_id, coord_response.error_code, any_broker.id
4352                );
4353                coordinator_txns
4354                    .entry(any_broker.id)
4355                    .or_default()
4356                    .push((*txn_id).to_string());
4357            }
4358        }
4359
4360        let mut all_results = Vec::new();
4361
4362        for (broker_id, txn_ids) in coordinator_txns {
4363            let broker = brokers
4364                .iter()
4365                .find(|b| b.id == broker_id)
4366                .unwrap_or(any_broker);
4367            let conn = self
4368                .pool
4369                .get_connection_by_id(broker.id, broker.address())
4370                .await?;
4371
4372            let request = DescribeTransactionsRequest {
4373                transactional_ids: txn_ids,
4374            };
4375
4376            let version = conn
4377                .negotiate_api_version(
4378                    ApiKey::DescribeTransactions,
4379                    versions::DESCRIBE_TRANSACTIONS_MAX,
4380                    versions::DESCRIBE_TRANSACTIONS_MIN,
4381                )
4382                .await
4383                .ok_or_else(|| {
4384                    KrafkaError::protocol("no mutually supported DescribeTransactions API version")
4385                })?;
4386
4387            let response_bytes = match conn
4388                .send_request(ApiKey::DescribeTransactions, version, |buf| {
4389                    request.encode_versioned(version, buf)
4390                })
4391                .await
4392            {
4393                Ok(bytes) => bytes,
4394                Err(e) => {
4395                    warn!(
4396                        "DescribeTransactions request failed on broker {}: {}",
4397                        broker.id, e
4398                    );
4399                    continue;
4400                }
4401            };
4402
4403            let mut buf = response_bytes;
4404            let response = match DescribeTransactionsResponse::decode_versioned(version, &mut buf) {
4405                Ok(r) => r,
4406                Err(e) => {
4407                    warn!(
4408                        "DescribeTransactions decode failed on broker {}: {}",
4409                        broker.id, e
4410                    );
4411                    continue;
4412                }
4413            };
4414
4415            all_results.extend(response.transaction_states.into_iter().map(|s| {
4416                TransactionDescription {
4417                    transactional_id: s.transactional_id,
4418                    error: if s.error_code.is_ok() {
4419                        None
4420                    } else {
4421                        Some(format!("{:?}", s.error_code))
4422                    },
4423                    state: s.transaction_state,
4424                    timeout_ms: s.transaction_timeout_ms,
4425                    start_time_ms: s.transaction_start_time_ms,
4426                    producer_id: s.producer_id,
4427                    producer_epoch: s.producer_epoch,
4428                    topics: s
4429                        .topics
4430                        .into_iter()
4431                        .map(|t| TransactionTopicInfo {
4432                            topic: t.topic,
4433                            partitions: t.partitions,
4434                        })
4435                        .collect(),
4436                }
4437            }));
4438        }
4439
4440        info!(
4441            "DescribeTransactions returned {} transaction(s)",
4442            all_results.len()
4443        );
4444        Ok(all_results)
4445    }
4446
4447    // ════════════════════════════════════════════════════════════════════
4448    // ListTransactions (API key 66)
4449    // ════════════════════════════════════════════════════════════════════
4450
4451    /// List transactions matching the given filters.
4452    ///
4453    /// Queries **all** brokers and merges results, because each broker
4454    /// only knows about transactions it coordinates.
4455    ///
4456    /// Pass empty slices for `state_filters` and `producer_id_filters` to
4457    /// list all transactions.
4458    ///
4459    /// # Example
4460    ///
4461    /// ```ignore
4462    /// // List all ongoing transactions
4463    /// let txns = admin
4464    ///     .list_transactions(&["Ongoing"], &[], -1)
4465    ///     .await?;
4466    /// ```
4467    pub async fn list_transactions(
4468        &self,
4469        state_filters: &[&str],
4470        producer_id_filters: &[i64],
4471        duration_filter: i64,
4472    ) -> Result<ListTransactionsResult> {
4473        self.check_not_closed()?;
4474        let brokers = self.metadata.brokers();
4475        if brokers.is_empty() {
4476            return Err(KrafkaError::broker(
4477                crate::error::ErrorCode::UnknownServerError,
4478                "no brokers available",
4479            ));
4480        }
4481
4482        let request = ListTransactionsRequest {
4483            state_filters: state_filters.iter().map(|s| (*s).to_string()).collect(),
4484            producer_id_filters: producer_id_filters.to_vec(),
4485            duration_filter,
4486        };
4487
4488        let mut all_transactions = Vec::new();
4489        let mut all_unknown_state_filters = Vec::new();
4490        let mut last_error: Option<String> = None;
4491
4492        for broker in &brokers {
4493            let conn = self
4494                .pool
4495                .get_connection_by_id(broker.id, broker.address())
4496                .await?;
4497
4498            let version = conn
4499                .negotiate_api_version(
4500                    ApiKey::ListTransactions,
4501                    versions::LIST_TRANSACTIONS_MAX,
4502                    versions::LIST_TRANSACTIONS_MIN,
4503                )
4504                .await
4505                .ok_or_else(|| {
4506                    KrafkaError::protocol("no mutually supported ListTransactions API version")
4507                })?;
4508
4509            let response_bytes = match conn
4510                .send_request(ApiKey::ListTransactions, version, |buf| {
4511                    request.encode_versioned(version, buf)
4512                })
4513                .await
4514            {
4515                Ok(bytes) => bytes,
4516                Err(e) => {
4517                    warn!(
4518                        "ListTransactions request failed on broker {}: {}",
4519                        broker.id, e
4520                    );
4521                    continue;
4522                }
4523            };
4524
4525            let mut buf = response_bytes;
4526            let response = match ListTransactionsResponse::decode_versioned(version, &mut buf) {
4527                Ok(r) => r,
4528                Err(e) => {
4529                    warn!(
4530                        "ListTransactions decode failed on broker {}: {}",
4531                        broker.id, e
4532                    );
4533                    continue;
4534                }
4535            };
4536
4537            if !response.error_code.is_ok() {
4538                warn!(
4539                    "ListTransactions error on broker {}: {:?}",
4540                    broker.id, response.error_code
4541                );
4542                last_error = Some(format!("{:?}", response.error_code));
4543            }
4544
4545            for filter in response.unknown_state_filters {
4546                if !all_unknown_state_filters.contains(&filter) {
4547                    all_unknown_state_filters.push(filter);
4548                }
4549            }
4550
4551            all_transactions.extend(response.transaction_states.into_iter().map(|s| {
4552                TransactionListEntry {
4553                    transactional_id: s.transactional_id,
4554                    producer_id: s.producer_id,
4555                    state: s.transaction_state,
4556                }
4557            }));
4558        }
4559
4560        info!(
4561            "ListTransactions returned {} transaction(s) across {} broker(s)",
4562            all_transactions.len(),
4563            brokers.len()
4564        );
4565
4566        Ok(ListTransactionsResult {
4567            error: last_error,
4568            unknown_state_filters: all_unknown_state_filters,
4569            transactions: all_transactions,
4570        })
4571    }
4572
4573    // ════════════════════════════════════════════════════════════════════
4574    // ListClientMetricsResources (API key 74)
4575    // ════════════════════════════════════════════════════════════════════
4576
4577    /// List client metrics subscription resources (KIP-714).
4578    ///
4579    /// Returns the names of client metrics subscriptions configured on
4580    /// the cluster.
4581    ///
4582    /// # Example
4583    ///
4584    /// ```ignore
4585    /// let names = admin.list_client_metrics_resources().await?;
4586    /// for name in &names {
4587    ///     println!("subscription: {name}");
4588    /// }
4589    /// ```
4590    pub async fn list_client_metrics_resources(&self) -> Result<Vec<String>> {
4591        let conn = self.get_any_broker_connection().await?;
4592
4593        let request = ListClientMetricsResourcesRequest;
4594
4595        let version = conn
4596            .negotiate_api_version(
4597                ApiKey::ListClientMetricsResources,
4598                versions::LIST_CLIENT_METRICS_RESOURCES_MAX,
4599                versions::LIST_CLIENT_METRICS_RESOURCES_MIN,
4600            )
4601            .await
4602            .ok_or_else(|| {
4603                KrafkaError::protocol(
4604                    "no mutually supported ListClientMetricsResources API version",
4605                )
4606            })?;
4607
4608        let response_bytes = conn
4609            .send_request(ApiKey::ListClientMetricsResources, version, |buf| {
4610                request.encode_versioned(version, buf)
4611            })
4612            .await?;
4613
4614        let mut buf = response_bytes;
4615        let response = ListClientMetricsResourcesResponse::decode_versioned(version, &mut buf)?;
4616
4617        if !response.error_code.is_ok() {
4618            warn!(
4619                "ListClientMetricsResources error: {:?}",
4620                response.error_code
4621            );
4622        }
4623
4624        let names: Vec<String> = response
4625            .client_metrics_resources
4626            .into_iter()
4627            .map(|r| r.name)
4628            .collect();
4629
4630        info!(
4631            "ListClientMetricsResources returned {} resource(s)",
4632            names.len()
4633        );
4634        Ok(names)
4635    }
4636
4637    // ════════════════════════════════════════════════════════════════════
4638    // WriteTxnMarkers (API key 27)
4639    // ════════════════════════════════════════════════════════════════════
4640
4641    /// Write transaction markers (COMMIT or ABORT) to the given topic-partitions.
4642    ///
4643    /// This is an inter-broker API used to finalize transactions.
4644    /// The admin client exposes it primarily for **aborting stuck transactions**
4645    /// (`abort_transaction`).
4646    ///
4647    /// Each marker is sent to **all** brokers since the partitions may be led
4648    /// by different brokers. Per-broker errors are logged and skipped so
4649    /// results from reachable brokers are still returned.
4650    ///
4651    /// # Example
4652    ///
4653    /// ```ignore
4654    /// use krafka::protocol::{WritableTxnMarker, WritableTxnMarkerTopic};
4655    ///
4656    /// let results = admin
4657    ///     .write_txn_markers(&[WritableTxnMarker {
4658    ///         producer_id: 42,
4659    ///         producer_epoch: 5,
4660    ///         transaction_result: false, // ABORT
4661    ///         topics: vec![WritableTxnMarkerTopic {
4662    ///             name: "my-topic".into(),
4663    ///             partition_indexes: vec![0, 1],
4664    ///         }],
4665    ///         coordinator_epoch: 10,
4666    ///     }])
4667    ///     .await?;
4668    /// ```
4669    pub async fn write_txn_markers(
4670        &self,
4671        markers: &[WritableTxnMarker],
4672    ) -> Result<Vec<WriteTxnMarkersResult>> {
4673        self.check_not_closed()?;
4674        let conn = self.get_any_broker_connection().await?;
4675
4676        let request = WriteTxnMarkersRequest {
4677            markers: markers.to_vec(),
4678        };
4679
4680        let version = conn
4681            .negotiate_api_version(
4682                ApiKey::WriteTxnMarkers,
4683                versions::WRITE_TXN_MARKERS_MAX,
4684                versions::WRITE_TXN_MARKERS_MIN,
4685            )
4686            .await
4687            .ok_or_else(|| {
4688                KrafkaError::protocol("no mutually supported WriteTxnMarkers API version")
4689            })?;
4690
4691        let response_bytes = conn
4692            .send_request(ApiKey::WriteTxnMarkers, version, |buf| {
4693                request.encode_versioned(version, buf)
4694            })
4695            .await?;
4696
4697        let mut buf = response_bytes;
4698        let response = WriteTxnMarkersResponse::decode_versioned(version, &mut buf)?;
4699
4700        let results = response
4701            .markers
4702            .into_iter()
4703            .map(|m| WriteTxnMarkersResult {
4704                producer_id: m.producer_id,
4705                topics: m
4706                    .topics
4707                    .into_iter()
4708                    .map(|t| WriteTxnMarkersTopicResult {
4709                        name: t.name,
4710                        partitions: t
4711                            .partitions
4712                            .into_iter()
4713                            .map(|p| WriteTxnMarkersPartitionResult {
4714                                partition_index: p.partition_index,
4715                                error: if p.error_code.is_ok() {
4716                                    None
4717                                } else {
4718                                    Some(format!("{:?}", p.error_code))
4719                                },
4720                            })
4721                            .collect(),
4722                    })
4723                    .collect(),
4724            })
4725            .collect::<Vec<_>>();
4726
4727        info!(
4728            "WriteTxnMarkers returned {} marker result(s)",
4729            results.len()
4730        );
4731        Ok(results)
4732    }
4733
4734    /// Abort a stuck transaction by writing an ABORT marker.
4735    ///
4736    /// This is the admin-friendly wrapper around [`write_txn_markers`](Self::write_txn_markers)
4737    /// that looks up the transaction coordinator, discovers the affected
4738    /// partitions via [`describe_transactions`](Self::describe_transactions),
4739    /// and writes an ABORT marker.
4740    ///
4741    /// # Example
4742    ///
4743    /// ```ignore
4744    /// admin.abort_transaction("my-transactional-id").await?;
4745    /// ```
4746    pub async fn abort_transaction(
4747        &self,
4748        transactional_id: &str,
4749    ) -> Result<Vec<WriteTxnMarkersResult>> {
4750        self.check_not_closed()?;
4751
4752        // Describe the transaction to get producer_id, producer_epoch,
4753        // coordinator_epoch, and the affected topic-partitions.
4754        let descriptions = self.describe_transactions(&[transactional_id]).await?;
4755        let desc = descriptions
4756            .first()
4757            .ok_or_else(|| KrafkaError::protocol("no transaction description returned"))?;
4758
4759        if let Some(ref err) = desc.error {
4760            return Err(KrafkaError::protocol(format!(
4761                "cannot abort transaction '{}': {}",
4762                transactional_id, err,
4763            )));
4764        }
4765
4766        let topics: Vec<WritableTxnMarkerTopic> = desc
4767            .topics
4768            .iter()
4769            .map(|t| WritableTxnMarkerTopic {
4770                name: t.topic.clone(),
4771                partition_indexes: t.partitions.clone(),
4772            })
4773            .collect();
4774
4775        let marker = WritableTxnMarker {
4776            producer_id: desc.producer_id,
4777            producer_epoch: desc.producer_epoch,
4778            transaction_result: false, // ABORT
4779            topics,
4780            coordinator_epoch: 0, // Use 0 — the broker will validate
4781        };
4782
4783        self.write_txn_markers(&[marker]).await
4784    }
4785
4786    // ════════════════════════════════════════════════════════════════════
4787    // DescribeQuorum (API key 55)
4788    // ════════════════════════════════════════════════════════════════════
4789
4790    /// Describe the KRaft quorum for the given topic-partitions.
4791    ///
4792    /// In a KRaft-mode cluster this returns the current voters, observers,
4793    /// leader, leader epoch, and high watermark for each quorum partition.
4794    ///
4795    /// The primary use case is inspecting `__cluster_metadata` partition 0.
4796    ///
4797    /// # Example
4798    ///
4799    /// ```ignore
4800    /// let result = admin
4801    ///     .describe_quorum(&[("__cluster_metadata", &[0])])
4802    ///     .await?;
4803    /// ```
4804    pub async fn describe_metadata_quorum(
4805        &self,
4806        topic_partitions: &[(&str, &[i32])],
4807    ) -> Result<DescribeQuorumResult> {
4808        self.check_not_closed()?;
4809        let conn = self.get_any_broker_connection().await?;
4810
4811        let topics = topic_partitions
4812            .iter()
4813            .map(|(name, partitions)| DescribeQuorumTopicRequest {
4814                topic_name: (*name).to_string(),
4815                partitions: partitions
4816                    .iter()
4817                    .map(|&p| DescribeQuorumPartitionRequest { partition_index: p })
4818                    .collect(),
4819            })
4820            .collect();
4821
4822        let request = DescribeQuorumRequest { topics };
4823
4824        let version = conn
4825            .negotiate_api_version(
4826                ApiKey::DescribeQuorum,
4827                versions::DESCRIBE_QUORUM_MAX,
4828                versions::DESCRIBE_QUORUM_MIN,
4829            )
4830            .await
4831            .ok_or_else(|| {
4832                KrafkaError::protocol("no mutually supported DescribeQuorum API version")
4833            })?;
4834
4835        let response_bytes = conn
4836            .send_request(ApiKey::DescribeQuorum, version, |buf| {
4837                request.encode_versioned(version, buf)
4838            })
4839            .await?;
4840
4841        let mut buf = response_bytes;
4842        let response = DescribeQuorumResponse::decode_versioned(version, &mut buf)?;
4843
4844        if !response.error_code.is_ok() {
4845            warn!("DescribeQuorum top-level error: {:?}", response.error_code);
4846        }
4847
4848        let topics = response
4849            .topics
4850            .into_iter()
4851            .map(|t| QuorumTopicResult {
4852                topic_name: t.topic_name,
4853                partitions: t
4854                    .partitions
4855                    .into_iter()
4856                    .map(|p| QuorumPartitionResult {
4857                        partition_index: p.partition_index,
4858                        error: if p.error_code.is_ok() {
4859                            None
4860                        } else {
4861                            Some(format!("{:?}", p.error_code))
4862                        },
4863                        leader_id: p.leader_id,
4864                        leader_epoch: p.leader_epoch,
4865                        high_watermark: p.high_watermark,
4866                        current_voters: p
4867                            .current_voters
4868                            .into_iter()
4869                            .map(|v| QuorumReplicaInfo {
4870                                replica_id: v.replica_id,
4871                                log_end_offset: v.log_end_offset,
4872                            })
4873                            .collect(),
4874                        observers: p
4875                            .observers
4876                            .into_iter()
4877                            .map(|o| QuorumReplicaInfo {
4878                                replica_id: o.replica_id,
4879                                log_end_offset: o.log_end_offset,
4880                            })
4881                            .collect(),
4882                    })
4883                    .collect(),
4884            })
4885            .collect::<Vec<_>>();
4886
4887        info!("DescribeQuorum returned {} topic(s)", topics.len());
4888
4889        Ok(DescribeQuorumResult {
4890            error: if response.error_code.is_ok() {
4891                None
4892            } else {
4893                Some(format!("{:?}", response.error_code))
4894            },
4895            topics,
4896        })
4897    }
4898}
4899
/// Result from [`AdminClient::describe_features`] (KIP-584).
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeFeaturesResult {
    /// Features supported by the broker that served the request.
    pub supported_features: Vec<SupportedFeature>,
    /// Cluster-wide finalized features.
    pub finalized_features: Vec<FinalizedFeature>,
    /// Monotonically increasing epoch for finalized features (-1 if unknown).
    pub finalized_features_epoch: i64,
}

/// Result from [`AdminClient::update_features`] (KIP-584).
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct UpdateFeaturesResult {
    /// Per-feature results, one entry per requested feature update.
    pub results: Vec<UpdateFeatureResult>,
}

/// Per-feature result from [`AdminClient::update_features`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct UpdateFeatureResult {
    /// Feature name this result pertains to.
    pub feature: String,
    /// Error message, or `None` if the update succeeded.
    pub error: Option<String>,
}
4929
/// Information about a single broker log directory.
///
/// Returned by [`AdminClient::describe_log_dirs`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LogDirInfo {
    /// Broker that owns this log directory.
    pub broker_id: i32,
    /// Absolute path of the log directory on the broker.
    pub log_dir: String,
    /// Per-directory error, or `None` on success.
    pub error: Option<String>,
    /// Topics and partitions stored in this directory.
    pub topics: Vec<LogDirTopicInfo>,
    /// Total bytes of the volume (-1 if unknown; reported by DescribeLogDirs v4+).
    pub total_bytes: i64,
    /// Usable bytes on the volume (-1 if unknown; reported by DescribeLogDirs v4+).
    pub usable_bytes: i64,
}

/// Per-topic partition details within a log directory.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LogDirTopicInfo {
    /// Topic name.
    pub name: String,
    /// Partitions of this topic in the log directory.
    pub partitions: Vec<LogDirPartitionInfo>,
}

/// Per-partition details within a log directory.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct LogDirPartitionInfo {
    /// Partition index.
    pub partition_index: i32,
    /// Size of the log in bytes.
    pub partition_size: i64,
    /// Offset lag behind the high watermark.
    pub offset_lag: i64,
    /// Whether this is a future replica (i.e. a reassignment to this
    /// directory is in progress).
    pub is_future_key: bool,
}
4973
/// Per-topic result from [`AdminClient::elect_leaders`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ElectLeadersResult {
    /// Topic name.
    pub topic: String,
    /// Per-partition election results for this topic.
    pub partitions: Vec<ElectLeadersPartitionInfo>,
}

/// Per-partition result from [`AdminClient::elect_leaders`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ElectLeadersPartitionInfo {
    /// Partition ID.
    pub partition_id: i32,
    /// Error message, or `None` if the election succeeded on this partition.
    pub error: Option<String>,
}
4993
/// Result from [`AdminClient::alter_partition_reassignments`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterReassignmentsResult {
    /// Top-level (request-wide) error, or `None` on success.
    pub error: Option<String>,
    /// Per-topic results.
    pub topics: Vec<ReassignmentTopicResult>,
}

/// Per-topic result from [`AdminClient::alter_partition_reassignments`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ReassignmentTopicResult {
    /// Topic name.
    pub name: String,
    /// Per-partition results.
    pub partitions: Vec<ReassignmentPartitionResult>,
}

/// Per-partition result from [`AdminClient::alter_partition_reassignments`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ReassignmentPartitionResult {
    /// Partition index.
    pub partition_index: i32,
    /// Error message, or `None` if the reassignment was accepted.
    pub error: Option<String>,
}

/// Per-topic ongoing reassignment info from [`AdminClient::list_partition_reassignments`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct PartitionReassignmentInfo {
    /// Topic name.
    pub name: String,
    /// Per-partition reassignment details.
    pub partitions: Vec<PartitionReassignmentPartitionInfo>,
}

/// Per-partition ongoing reassignment info from [`AdminClient::list_partition_reassignments`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct PartitionReassignmentPartitionInfo {
    /// Partition index.
    pub partition_index: i32,
    /// Current replica set.
    pub replicas: Vec<i32>,
    /// Replicas currently being added by the reassignment.
    pub adding_replicas: Vec<i32>,
    /// Replicas currently being removed by the reassignment.
    pub removing_replicas: Vec<i32>,
}
5047
5048// ════════════════════════════════════════════════════════════════════════
5049// Result types for new admin APIs
5050// ════════════════════════════════════════════════════════════════════════
5051
/// Per-topic result from [`AdminClient::alter_replica_log_dirs`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDirsResult {
    /// Broker that processed the request.
    pub broker_id: i32,
    /// Topic name.
    pub topic_name: String,
    /// Per-partition results.
    pub partitions: Vec<AlterReplicaLogDirsPartitionResult>,
}

/// Per-partition result from [`AdminClient::alter_replica_log_dirs`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterReplicaLogDirsPartitionResult {
    /// Partition index.
    pub partition_index: i32,
    /// Error message, or `None` on success.
    pub error: Option<String>,
}

/// Result from [`AdminClient::delete_offsets`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct OffsetDeleteResult {
    /// Top-level (group-wide) error, or `None` on success.
    pub error: Option<String>,
    /// Per-topic results.
    pub topics: Vec<OffsetDeleteTopicResult>,
}

/// Per-topic result from [`AdminClient::delete_offsets`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct OffsetDeleteTopicResult {
    /// Topic name.
    pub name: String,
    /// Per-partition results.
    pub partitions: Vec<OffsetDeletePartitionResult>,
}

/// Per-partition result from [`AdminClient::delete_offsets`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct OffsetDeletePartitionResult {
    /// Partition index.
    pub partition_index: i32,
    /// Error message, or `None` on success.
    pub error: Option<String>,
}
5103
/// Result from [`AdminClient::describe_user_scram_credentials`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeUserScramCredentialsResult {
    /// Top-level (request-wide) error, or `None` on success.
    pub error: Option<String>,
    /// Per-user results.
    pub users: Vec<ScramCredentialUserResult>,
}

/// Per-user result from [`AdminClient::describe_user_scram_credentials`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ScramCredentialUserResult {
    /// User name.
    pub name: String,
    /// Per-user error message, or `None` on success.
    pub error: Option<String>,
    /// Credential info entries, one per configured SCRAM mechanism.
    pub credential_infos: Vec<ScramCredentialInfoResult>,
}

/// SCRAM credential info for a user.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ScramCredentialInfoResult {
    /// SCRAM mechanism the credential is stored for.
    pub mechanism: ScramMechanism,
    /// Number of hash iterations used for the credential.
    pub iterations: i32,
}

/// Per-user result from [`AdminClient::alter_user_scram_credentials`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct AlterScramCredentialResult {
    /// User name.
    pub user: String,
    /// Error message, or `None` on success.
    pub error: Option<String>,
}
5145
/// Per-topic result from [`AdminClient::describe_producers`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeProducersTopicResult {
    /// Topic name.
    pub name: String,
    /// Per-partition results.
    pub partitions: Vec<DescribeProducersPartitionInfo>,
}

/// Per-partition result from [`AdminClient::describe_producers`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeProducersPartitionInfo {
    /// Partition index.
    pub partition_index: i32,
    /// Error message, or `None` on success.
    pub error: Option<String>,
    /// Producers currently active on this partition.
    pub active_producers: Vec<ProducerStateInfo>,
}

/// Active producer state on a partition.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ProducerStateInfo {
    /// Producer ID.
    pub producer_id: i64,
    /// Producer epoch.
    pub producer_epoch: i32,
    /// Last sequence number sent. `-1` if unknown.
    pub last_sequence: i32,
    /// Last timestamp sent (ms since epoch). `-1` if unknown.
    pub last_timestamp: i64,
    /// Coordinator epoch.
    pub coordinator_epoch: i32,
    /// Current transaction start offset. `-1` if not in a transaction.
    pub current_txn_start_offset: i64,
}
5185
/// Transaction description from [`AdminClient::describe_transactions`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TransactionDescription {
    /// Transactional ID.
    pub transactional_id: String,
    /// Error message, or `None` on success.
    pub error: Option<String>,
    /// Current state (e.g. "Ongoing", "PrepareCommit", "PrepareAbort").
    pub state: String,
    /// Transaction timeout in milliseconds.
    pub timeout_ms: i32,
    /// Transaction start time in milliseconds since epoch.
    pub start_time_ms: i64,
    /// Producer ID.
    pub producer_id: i64,
    /// Producer epoch.
    pub producer_epoch: i16,
    /// Topic-partitions involved in the transaction.
    pub topics: Vec<TransactionTopicInfo>,
}

/// Topic-partitions involved in a transaction.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TransactionTopicInfo {
    /// Topic name.
    pub topic: String,
    /// Partition indexes of this topic touched by the transaction.
    pub partitions: Vec<i32>,
}

/// Result from [`AdminClient::list_transactions`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct ListTransactionsResult {
    /// Top-level error, or `None` on success.
    pub error: Option<String>,
    /// State filters that were not recognized by the coordinator.
    pub unknown_state_filters: Vec<String>,
    /// Listed transactions.
    pub transactions: Vec<TransactionListEntry>,
}

/// A single transaction entry from [`AdminClient::list_transactions`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct TransactionListEntry {
    /// Transactional ID.
    pub transactional_id: String,
    /// Producer ID.
    pub producer_id: i64,
    /// Current transaction state.
    pub state: String,
}
5241
/// Per-partition result from [`AdminClient::write_txn_markers`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct WriteTxnMarkersPartitionResult {
    /// Partition index.
    pub partition_index: i32,
    /// Error string, or `None` on success.
    pub error: Option<String>,
}

/// Per-topic result from [`AdminClient::write_txn_markers`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct WriteTxnMarkersTopicResult {
    /// Topic name.
    pub name: String,
    /// Per-partition results.
    pub partitions: Vec<WriteTxnMarkersPartitionResult>,
}

/// Result for one producer marker from [`AdminClient::write_txn_markers`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct WriteTxnMarkersResult {
    /// Producer ID this result pertains to.
    pub producer_id: i64,
    /// Per-topic results for this producer's markers.
    pub topics: Vec<WriteTxnMarkersTopicResult>,
}
5271
/// Replica (voter or observer) info from [`AdminClient::describe_quorum`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuorumReplicaInfo {
    /// Replica broker ID.
    pub replica_id: i32,
    /// Last known log end offset, or -1 if unknown.
    pub log_end_offset: i64,
}

/// Per-partition quorum info from [`AdminClient::describe_quorum`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuorumPartitionResult {
    /// Partition index.
    pub partition_index: i32,
    /// Per-partition error (debug representation of the broker error
    /// code), or `None` on success.
    pub error: Option<String>,
    /// Leader broker ID, or -1 if unknown.
    pub leader_id: i32,
    /// Latest known leader epoch.
    pub leader_epoch: i32,
    /// High watermark offset.
    pub high_watermark: i64,
    /// Current voters in the quorum.
    pub current_voters: Vec<QuorumReplicaInfo>,
    /// Observers (non-voting replicas).
    pub observers: Vec<QuorumReplicaInfo>,
}

/// Per-topic quorum info from [`AdminClient::describe_quorum`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct QuorumTopicResult {
    /// Topic name.
    pub topic_name: String,
    /// Per-partition quorum results.
    pub partitions: Vec<QuorumPartitionResult>,
}

/// Result from [`AdminClient::describe_quorum`].
#[non_exhaustive]
#[derive(Debug, Clone)]
pub struct DescribeQuorumResult {
    /// Top-level error (debug representation of the broker error code),
    /// or `None` on success.
    pub error: Option<String>,
    /// Per-topic quorum data.
    pub topics: Vec<QuorumTopicResult>,
}
5321
/// Builder for [`AdminClient`].
///
/// Configure with the chained setters, then call [`build`](Self::build).
#[must_use = "builders do nothing until .build() is called"]
#[derive(Debug, Default)]
pub struct AdminClientBuilder {
    // All settings accumulate into the config that `build()` consumes.
    config: AdminConfig,
}

impl AdminClientBuilder {
    /// Set bootstrap servers (required; [`build`](Self::build) fails when unset).
    pub fn bootstrap_servers(mut self, servers: impl Into<String>) -> Self {
        self.config.bootstrap_servers = servers.into();
        self
    }

    /// Set client ID.
    pub fn client_id(mut self, id: impl Into<String>) -> Self {
        self.config.client_id = id.into();
        self
    }

    /// Set request timeout.
    pub fn request_timeout(mut self, timeout: Duration) -> Self {
        self.config.request_timeout = timeout;
        self
    }

    /// Set authentication configuration.
    ///
    /// Overwrites any auth previously set via the `sasl_*` helpers.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use krafka::admin::AdminClient;
    /// use krafka::auth::AuthConfig;
    ///
    /// let client = AdminClient::builder()
    ///     .bootstrap_servers("localhost:9092")
    ///     .auth(AuthConfig::sasl_plain("user", "password")?)
    ///     .build()
    ///     .await?;
    /// ```
    pub fn auth(mut self, auth: AuthConfig) -> Self {
        self.config.auth = Some(auth);
        self
    }

    /// Set SOCKS5 proxy configuration.
    ///
    /// Routes all broker connections through the specified SOCKS5 proxy.
    #[cfg(feature = "socks5")]
    pub fn proxy(mut self, proxy: crate::network::ProxyConfig) -> Self {
        self.config.proxy = Some(proxy);
        self
    }

    /// Configure SASL/PLAIN authentication.
    ///
    /// Fallible because `AuthConfig::sasl_plain` validates the credentials.
    pub fn sasl_plain(
        mut self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> crate::Result<Self> {
        self.config.auth = Some(AuthConfig::sasl_plain(username, password)?);
        Ok(self)
    }

    /// Configure SASL/SCRAM-SHA-256 authentication.
    pub fn sasl_scram_sha256(
        mut self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        self.config.auth = Some(AuthConfig::sasl_scram_sha256(username, password));
        self
    }

    /// Configure SASL/SCRAM-SHA-512 authentication.
    pub fn sasl_scram_sha512(
        mut self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        self.config.auth = Some(AuthConfig::sasl_scram_sha512(username, password));
        self
    }

    /// Configure SASL/OAUTHBEARER authentication with a static token.
    ///
    /// For automatic token refresh, use [`sasl_oauthbearer_provider()`](Self::sasl_oauthbearer_provider).
    /// For SASL extensions, use `.auth(AuthConfig::sasl_oauthbearer_token(...))`.
    pub fn sasl_oauthbearer(mut self, token: impl Into<String>) -> Self {
        self.config.auth = Some(AuthConfig::sasl_oauthbearer(token));
        self
    }

    /// Configure SASL/OAUTHBEARER authentication with an async token provider.
    ///
    /// The provider is called on every new broker connection, ensuring
    /// tokens are always fresh.
    pub fn sasl_oauthbearer_provider(
        mut self,
        provider: impl crate::auth::OAuthBearerTokenProvider + 'static,
    ) -> Self {
        self.config.auth = Some(AuthConfig::sasl_oauthbearer_provider(provider));
        self
    }

    /// Build the admin client.
    ///
    /// Parses the bootstrap server list, initializes TLS (if configured),
    /// creates the shared connection pool, and performs an initial metadata
    /// refresh before returning.
    ///
    /// # Errors
    ///
    /// Returns an error if `bootstrap.servers` is empty or unparseable, if
    /// TLS initialization fails, or if the initial metadata refresh cannot
    /// reach the cluster.
    pub async fn build(self) -> Result<AdminClient> {
        if self.config.bootstrap_servers.is_empty() {
            return Err(KrafkaError::config("bootstrap.servers is required"));
        }

        let bootstrap_servers =
            crate::util::parse_bootstrap_servers(&self.config.bootstrap_servers)?;

        // Create connection config with client ID and auth
        let mut conn_config_builder = ConnectionConfig::builder()
            .client_id(&self.config.client_id)
            .request_timeout(self.config.request_timeout);

        if let Some(ref auth) = self.config.auth {
            conn_config_builder = conn_config_builder.auth(auth.clone());
        }

        #[cfg(feature = "socks5")]
        if let Some(ref proxy) = self.config.proxy {
            conn_config_builder = conn_config_builder.proxy(proxy.clone());
        }

        // TLS initialization is async, so it runs after the builder has
        // produced the config.
        let mut conn_config = conn_config_builder.build();
        conn_config.init_tls().await?;

        let pool = Arc::new(ConnectionPool::new(conn_config));
        pool.start_idle_evictor();
        // NOTE(review): the 300 s Duration looks like a metadata max-age —
        // confirm against ClusterMetadata::new.
        let metadata = Arc::new(
            ClusterMetadata::new(bootstrap_servers, pool.clone(), Duration::from_secs(300))
                .with_recovery_strategy(self.config.metadata_recovery_strategy)
                .with_rebootstrap_trigger(self.config.metadata_recovery_rebootstrap_trigger),
        );

        // Eager refresh: an unreachable cluster or bad config fails here
        // rather than on the first admin call.
        metadata.refresh().await?;

        info!(
            "AdminClient initialized with auth: {}",
            if self.config.auth.is_some() {
                "configured"
            } else {
                "none"
            }
        );

        Ok(AdminClient {
            config: self.config,
            metadata,
            pool,
            closed: std::sync::atomic::AtomicBool::new(false),
        })
    }
}
5480
5481#[cfg(test)]
5482#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
5483mod tests {
5484    use super::*;
5485
5486    #[test]
5487    fn test_new_topic() {
5488        let topic = NewTopic::new("test-topic", 3, 2)
5489            .unwrap()
5490            .with_config("cleanup.policy", "compact")
5491            .with_config("retention.ms", "86400000");
5492
5493        assert_eq!(topic.name, "test-topic");
5494        assert_eq!(topic.num_partitions, 3);
5495        assert_eq!(topic.replication_factor, 2);
5496        assert_eq!(topic.configs.len(), 2);
5497    }
5498
5499    #[test]
5500    fn test_new_topic_validation() {
5501        assert!(NewTopic::new("t", 1, 1).is_ok());
5502        assert!(NewTopic::new("t", -1, -1).is_ok());
5503        assert!(NewTopic::new("t", 0, 1).is_err());
5504        assert!(NewTopic::new("t", -2, 1).is_err());
5505        assert!(NewTopic::new("t", 1, 0).is_err());
5506        assert!(NewTopic::new("t", 1, -2).is_err());
5507    }
5508
    /// H6: empty / oversize topic names must be rejected at `NewTopic::new`
    /// so the panicking `KafkaString::encode` path is unreachable from the
    /// public API.
    #[test]
    fn test_new_topic_name_validation_rejects_empty_and_oversize() {
        // Empty name: rejected with a dedicated error message.
        let empty = NewTopic::new("", 1, 1).unwrap_err().to_string();
        assert!(
            empty.contains("topic name cannot be empty"),
            "expected empty-name error, got: {empty}"
        );

        // Oversize name: one byte past the i16::MAX protocol limit.
        let oversize = "x".repeat(i16::MAX as usize + 1);
        let err = NewTopic::new(oversize, 1, 1).unwrap_err().to_string();
        assert!(
            err.contains("exceeds protocol limit"),
            "expected protocol-limit error, got: {err}"
        );

        // Boundary: exactly i16::MAX bytes is accepted.
        let max_ok = "x".repeat(i16::MAX as usize);
        assert!(NewTopic::new(max_ok, 1, 1).is_ok());
    }
5531
    #[test]
    fn test_admin_config_default() {
        // Defaults: client ID, 30 s request timeout, rebootstrap recovery.
        let config = AdminConfig::default();
        assert_eq!(config.client_id, "krafka-admin");
        assert_eq!(config.request_timeout, Duration::from_secs(30));
        assert_eq!(
            config.metadata_recovery_strategy,
            MetadataRecoveryStrategy::Rebootstrap
        );
    }
5542
5543    #[test]
5544    fn test_describe_acls_result() {
5545        let result = DescribeAclsResult {
5546            error: None,
5547            bindings: vec![
5548                AclBinding::allow_read_topic("my-topic", "User:alice"),
5549                AclBinding::allow_write_topic("my-topic", "User:bob"),
5550            ],
5551        };
5552        assert!(result.error.is_none());
5553        assert_eq!(result.bindings.len(), 2);
5554    }
5555
5556    #[test]
5557    fn test_create_acls_result() {
5558        let result = CreateAclsResult {
5559            results: vec![
5560                CreateAclResult { error: None },
5561                CreateAclResult {
5562                    error: Some("ACL already exists".to_string()),
5563                },
5564            ],
5565        };
5566        assert!(result.results[0].error.is_none());
5567        assert!(result.results[1].error.is_some());
5568    }
5569
5570    #[test]
5571    fn test_delete_acls_result() {
5572        let result = DeleteAclsResult {
5573            filter_results: vec![
5574                DeleteAclFilterResult {
5575                    error: None,
5576                    deleted_count: 3,
5577                },
5578                DeleteAclFilterResult {
5579                    error: None,
5580                    deleted_count: 0,
5581                },
5582            ],
5583        };
5584        assert_eq!(result.filter_results[0].deleted_count, 3);
5585        assert_eq!(result.filter_results[1].deleted_count, 0);
5586    }
5587
    #[test]
    fn test_acl_filter_builder() {
        use crate::protocol::{AclOperation, AclPatternType, AclPermissionType, AclResourceType};

        // Test default filter (matches everything: all enums `Any`, all
        // optional match fields `None`)
        let filter = AclFilter::all();
        assert_eq!(filter.resource_type, AclResourceType::Any);
        assert_eq!(filter.pattern_type, AclPatternType::Any);
        assert_eq!(filter.operation, AclOperation::Any);
        assert_eq!(filter.permission_type, AclPermissionType::Any);
        assert!(filter.resource_name.is_none());
        assert!(filter.principal.is_none());
        assert!(filter.host.is_none());

        // Test filter for specific resource
        let filter = AclFilter::for_resource(AclResourceType::Topic, "my-topic");
        assert_eq!(filter.resource_type, AclResourceType::Topic);
        assert_eq!(filter.resource_name, Some("my-topic".to_string()));

        // Test filter for specific principal
        let filter = AclFilter::for_principal("User:alice");
        assert_eq!(filter.principal, Some("User:alice".to_string()));

        // Test builder chain: every setter sticks and they compose freely
        let filter = AclFilter::all()
            .resource_type(AclResourceType::Group)
            .resource_name("my-group")
            .pattern_type(AclPatternType::Literal)
            .principal("User:bob")
            .host("localhost")
            .operation(AclOperation::Read)
            .permission_type(AclPermissionType::Allow);

        assert_eq!(filter.resource_type, AclResourceType::Group);
        assert_eq!(filter.resource_name, Some("my-group".to_string()));
        assert_eq!(filter.pattern_type, AclPatternType::Literal);
        assert_eq!(filter.principal, Some("User:bob".to_string()));
        assert_eq!(filter.host, Some("localhost".to_string()));
        assert_eq!(filter.operation, AclOperation::Read);
        assert_eq!(filter.permission_type, AclPermissionType::Allow);
    }
5629
    #[test]
    fn test_admin_builder_with_auth() {
        use crate::auth::AuthConfig;

        let builder = AdminClient::builder()
            .bootstrap_servers("broker:9093")
            .auth(AuthConfig::sasl_plain("user", "pass").unwrap());

        // SASL/PLAIN implies SASL but not TLS, with plain credentials stored.
        let auth = builder.config.auth.as_ref().unwrap();
        assert!(auth.requires_sasl());
        assert!(!auth.requires_tls());
        assert!(auth.plain_credentials.is_some());
    }
5643
    #[test]
    fn test_admin_builder_sasl_plain() {
        // The sasl_plain convenience setter must populate the same auth
        // config as AuthConfig::sasl_plain.
        let builder = AdminClient::builder()
            .bootstrap_servers("broker:9093")
            .sasl_plain("admin", "admin-secret")
            .unwrap();

        let auth = builder.config.auth.as_ref().unwrap();
        assert_eq!(
            auth.security_protocol,
            crate::auth::SecurityProtocol::SaslPlaintext
        );
        assert_eq!(auth.sasl_mechanism, Some(crate::auth::SaslMechanism::Plain));
        let creds = auth.plain_credentials.as_ref().unwrap();
        assert_eq!(creds.username, "admin");
    }
5660
    #[test]
    fn test_admin_builder_sasl_scram() {
        // SHA-256 variant selects the ScramSha256 mechanism.
        let builder = AdminClient::builder()
            .bootstrap_servers("broker:9093")
            .sasl_scram_sha256("user", "pass");

        let auth = builder.config.auth.as_ref().unwrap();
        assert_eq!(
            auth.sasl_mechanism,
            Some(crate::auth::SaslMechanism::ScramSha256)
        );
        assert!(auth.scram_credentials.is_some());

        // SHA-512 variant selects the ScramSha512 mechanism.
        let builder = AdminClient::builder()
            .bootstrap_servers("broker:9093")
            .sasl_scram_sha512("user", "pass");

        let auth = builder.config.auth.as_ref().unwrap();
        assert_eq!(
            auth.sasl_mechanism,
            Some(crate::auth::SaslMechanism::ScramSha512)
        );
        assert!(auth.scram_credentials.is_some());
    }
5685
    #[test]
    fn test_admin_builder_aws_msk_iam() {
        use crate::auth::AuthConfig;

        let auth = AuthConfig::aws_msk_iam("AKID", "secret", "us-east-1");
        let builder = AdminClient::builder()
            .bootstrap_servers("broker:9094")
            .auth(auth);

        // AWS MSK IAM requires both TLS and SASL, and carries its own
        // credential set plus a TLS config.
        let auth = builder.config.auth.as_ref().unwrap();
        assert!(auth.requires_tls());
        assert!(auth.requires_sasl());
        assert_eq!(
            auth.sasl_mechanism,
            Some(crate::auth::SaslMechanism::AwsMskIam)
        );
        assert!(auth.aws_msk_iam_credentials.is_some());
        assert!(auth.tls_config.is_some());
    }
5705
5706    #[test]
5707    fn test_admin_builder_no_auth_by_default() {
5708        let builder = AdminClient::builder().bootstrap_servers("broker:9092");
5709
5710        assert!(builder.config.auth.is_none());
5711    }
5712
    #[test]
    fn test_consumer_group_description() {
        // A classic stable group with two members: one with a static
        // instance ID, one dynamic (instance_id = None).
        let desc = ConsumerGroupDescription {
            group_id: "my-group".to_string(),
            group_type: GroupType::Classic,
            state: "Stable".to_string(),
            protocol_type: Some("consumer".to_string()),
            assignor: Some("range".to_string()),
            group_epoch: None,
            assignment_epoch: None,
            members: vec![
                ConsumerGroupMember {
                    member_id: "member-1".to_string(),
                    instance_id: Some("instance-1".to_string()),
                    rack_id: None,
                    member_epoch: None,
                    client_id: "my-client".to_string(),
                    client_host: "/127.0.0.1".to_string(),
                    subscribed_topic_names: None,
                    subscribed_topic_regex: None,
                    assignment: None,
                    target_assignment: None,
                    member_type: None,
                },
                ConsumerGroupMember {
                    member_id: "member-2".to_string(),
                    instance_id: None,
                    rack_id: None,
                    member_epoch: None,
                    client_id: "other-client".to_string(),
                    client_host: "/192.168.1.1".to_string(),
                    subscribed_topic_names: None,
                    subscribed_topic_regex: None,
                    assignment: None,
                    target_assignment: None,
                    member_type: None,
                },
            ],
            authorized_operations: None,
            error: None,
        };
        assert_eq!(desc.group_id, "my-group");
        assert_eq!(desc.group_type, GroupType::Classic);
        assert_eq!(desc.state, "Stable");
        assert_eq!(desc.members.len(), 2);
        assert!(desc.members[0].instance_id.is_some());
        assert!(desc.members[1].instance_id.is_none());
        assert!(desc.error.is_none());
    }
5762
5763    #[test]
5764    fn test_consumer_group_listing() {
5765        let listing = ConsumerGroupListing {
5766            group_id: "my-group".to_string(),
5767            protocol_type: "consumer".to_string(),
5768            group_type: Some(GroupType::Consumer),
5769        };
5770        assert_eq!(listing.group_id, "my-group");
5771        assert_eq!(listing.protocol_type, "consumer");
5772        assert_eq!(listing.group_type, Some(GroupType::Consumer));
5773    }
5774
5775    #[test]
5776    fn test_delete_record_result() {
5777        let result = DeleteRecordResult {
5778            topic: "my-topic".to_string(),
5779            partition: 0,
5780            low_watermark: 100,
5781            error: None,
5782        };
5783        assert_eq!(result.topic, "my-topic");
5784        assert_eq!(result.partition, 0);
5785        assert_eq!(result.low_watermark, 100);
5786        assert!(result.error.is_none());
5787
5788        let result_err = DeleteRecordResult {
5789            topic: "my-topic".to_string(),
5790            partition: 1,
5791            low_watermark: -1,
5792            error: Some("NotLeaderOrFollower".to_string()),
5793        };
5794        assert!(result_err.error.is_some());
5795    }
5796
5797    #[test]
5798    fn test_leader_epoch_result() {
5799        let result = LeaderEpochResult {
5800            topic: "my-topic".to_string(),
5801            partition: 0,
5802            leader_epoch: 5,
5803            end_offset: 1000,
5804            error: None,
5805        };
5806        assert_eq!(result.topic, "my-topic");
5807        assert_eq!(result.leader_epoch, 5);
5808        assert_eq!(result.end_offset, 1000);
5809        assert!(result.error.is_none());
5810    }
5811
5812    #[test]
5813    fn test_admin_client_is_send_sync() {
5814        fn assert_send_sync<T: Send + Sync>() {}
5815        assert_send_sync::<AdminClient>();
5816    }
5817
5818    #[cfg(feature = "socks5")]
5819    #[test]
5820    fn test_admin_config_builder_proxy_round_trip() {
5821        let config = AdminConfig::builder()
5822            .proxy(crate::network::ProxyConfig::new("proxy:1080"))
5823            .build();
5824        let proxy = config.proxy().expect("proxy should be set");
5825        assert_eq!(proxy.address(), "proxy:1080");
5826    }
5827}