---
layout: default
title: Admin Client
nav_order: 5
description: "Cluster administration - topics, partitions, and configuration"
---
# Admin Client Guide
This guide covers administrative operations using the Krafka AdminClient.
## Overview
The AdminClient provides cluster administration capabilities:
- Topic management (create, delete, describe, list)
- Consumer group management (describe, list, KIP-848 describe)
- Topic partition details (paginated describe with ELR)
- Record deletion (delete records before an offset)
- Leader epoch queries (detect log truncation)
- Cluster information
- Partition management
- ACL management
- Delegation token management (create, describe, renew, expire)
- Client quota management (describe, alter)
- Cluster feature versioning (describe, update — KIP-584)
- Log directory inspection (describe log dirs with volume capacity)
- Move replicas between log directories
- Delete consumer group committed offsets
- SCRAM credential management (describe, alter — KIP-554)
- Transaction debugging (describe producers, describe/list transactions — KIP-664)
- Client metrics resource discovery (KIP-714)
### API Version Negotiation
The AdminClient automatically negotiates the best API version for each RPC
using the broker's `ApiVersions` response. This ensures forward compatibility
with newer Kafka releases while gracefully falling back to older protocol
versions on legacy brokers. If a broker does not support a required API, the
client returns a clear protocol error.
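The negotiation itself boils down to intersecting the client's and broker's supported version ranges. A minimal sketch of that logic — the type and function names here are illustrative, not krafka's actual internals, which track one range per API key from the `ApiVersions` response:

```rust
// Sketch: pick the highest API version both client and broker support.
#[derive(Debug, PartialEq)]
enum Negotiation {
    Version(i16),
    Unsupported,
}

fn negotiate(client_min: i16, client_max: i16, broker_min: i16, broker_max: i16) -> Negotiation {
    let low = client_min.max(broker_min);
    let high = client_max.min(broker_max);
    if low <= high {
        // Use the newest version both sides speak.
        Negotiation::Version(high)
    } else {
        // No overlap: surface a clear protocol error, as described above.
        Negotiation::Unsupported
    }
}

fn main() {
    // Client speaks v0..=v4, legacy broker only v0..=v2: fall back to v2.
    assert_eq!(negotiate(0, 4, 0, 2), Negotiation::Version(2));
    // Broker requires at least v5, client tops out at v4: protocol error.
    assert_eq!(negotiate(0, 4, 5, 9), Negotiation::Unsupported);
    println!("negotiation sketch ok");
}
```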
## Basic Usage
```rust
use krafka::admin::AdminClient;
use krafka::error::Result;
#[tokio::main]
async fn main() -> Result<()> {
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.build()
.await?;
// List all topics
let topics = admin.list_topics().await?;
println!("Topics: {:?}", topics);
Ok(())
}
```
## Authentication
The AdminClient supports all SASL authentication mechanisms:
### SASL/PLAIN
```rust
use krafka::admin::AdminClient;
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.sasl_plain("username", "password")
.build()
.await?;
```
### SASL/SCRAM-SHA-256
```rust
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.sasl_scram_sha256("username", "password")
.build()
.await?;
```
### SASL/SCRAM-SHA-512
```rust
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.sasl_scram_sha512("username", "password")
.build()
.await?;
```
### Generic AuthConfig
For AWS MSK IAM or advanced configurations:
```rust
use krafka::admin::AdminClient;
use krafka::auth::AuthConfig;
let auth = AuthConfig::aws_msk_iam("access_key", "secret_key", "us-east-1");
let admin = AdminClient::builder()
.bootstrap_servers("msk-broker:9098")
.auth(auth)
.build()
.await?;
```
## Topic Management
### Creating Topics
```rust
use krafka::admin::{AdminClient, NewTopic};
use std::time::Duration;
let admin = AdminClient::builder()
.bootstrap_servers("localhost:9092")
.build()
.await?;
// Simple topic creation
let topic = NewTopic::new("my-topic", 6, 3);
let results = admin
.create_topics(vec![topic], Duration::from_secs(30))
.await?;
for result in results {
match result.error {
None => println!("Created: {}", result.name),
Some(e) => println!("Failed to create {}: {}", result.name, e),
}
}
```
### Creating Topics with Configuration
```rust
use krafka::admin::{AdminClient, NewTopic};
use std::time::Duration;
let topic = NewTopic::new("compacted-topic", 12, 3)
.with_config("cleanup.policy", "compact")
.with_config("min.insync.replicas", "2")
.with_config("retention.ms", "604800000"); // 7 days
admin.create_topics(vec![topic], Duration::from_secs(30)).await?;
```
### Deleting Topics
```rust
use std::time::Duration;
let results = admin
.delete_topics(
vec!["topic-to-delete".to_string()],
Duration::from_secs(30),
)
.await?;
for result in results {
match result.error {
None => println!("Deleted: {}", result.name),
Some(e) => println!("Failed to delete {}: {}", result.name, e),
}
}
```
### Listing Topics
```rust
let topics = admin.list_topics().await?;
println!("Topics in cluster:");
for topic in topics {
println!(" - {}", topic);
}
```
### Describing Topics
```rust
let descriptions = admin
.describe_topics(&["topic1".to_string(), "topic2".to_string()])
.await?;
for topic in descriptions {
println!("Topic: {}", topic.name);
println!(" Partitions: {}", topic.partitions.len());
for partition in &topic.partitions {
println!(
" Partition {}: leader={}, replicas={:?}, isr={:?}",
partition.partition,
partition.leader,
partition.replicas,
partition.isr
);
}
}
```
### Increasing Partition Count
You can increase the number of partitions for an existing topic (but never decrease):
```rust
use std::time::Duration;
let result = admin
.create_partitions("my-topic", 12, Duration::from_secs(30))
.await?;
match result.error {
None => println!("Partitions increased to 12"),
Some(e) => println!("Failed: {}", e),
}
```
## Configuration Management
### Describing Configuration
```rust
use krafka::admin::DescribeConfigsRequest;
let configs = admin.describe_configs(DescribeConfigsRequest::for_topic("my-topic")).await?;
println!("Topic configuration:");
for config in configs {
let value = config.value.as_deref().unwrap_or("(null)");
let flags = format!(
"{}{}{}",
if config.read_only { "R" } else { "" },
if config.is_default { "D" } else { "" },
if config.is_sensitive { "S" } else { "" }
);
println!(" {}: {} [{}]", config.name, value, flags);
}
```
### Describing Broker Configuration
```rust
let configs = admin.describe_configs(DescribeConfigsRequest::for_broker(0)).await?;
println!("Broker 0 configuration:");
for config in configs {
    let value = config.value.as_deref().unwrap_or("(null)");
    println!("  {}: {}", config.name, value);
}
```
### Altering Topic Configuration
```rust
use std::collections::HashMap;
let mut configs = HashMap::new();
configs.insert("retention.ms".to_string(), "86400000".to_string()); // 1 day
configs.insert("cleanup.policy".to_string(), "compact".to_string());
let result = admin.alter_topic_config("my-topic", configs).await?;
match result.error {
None => println!("Configuration updated"),
Some(e) => println!("Failed: {}", e),
}
```
## Cluster Information
### Describing the Cluster
```rust
let cluster = admin.describe_cluster().await?;
println!("Cluster info:");
println!(" Cluster ID: {}", cluster.cluster_id);
println!(" Controller: {}", cluster.controller_id);
println!(" Brokers:");
for broker in cluster.brokers {
println!(
" - {} at {}:{} (rack: {:?})",
broker.broker_id, broker.host, broker.port, broker.rack
);
}
```
### Getting Partition Count
```rust
if let Some(count) = admin.partition_count("my-topic").await? {
println!("Topic has {} partitions", count);
} else {
println!("Topic not found");
}
```
## Error Handling
```rust
use krafka::admin::{AdminClient, NewTopic};
use krafka::error::KrafkaError;
use std::time::Duration;
async fn create_topic_if_not_exists(
admin: &AdminClient,
name: &str,
partitions: i32,
replication_factor: i16,
) -> Result<(), KrafkaError> {
// Check if topic exists
let topics = admin.list_topics().await?;
if topics.contains(&name.to_string()) {
println!("Topic {} already exists", name);
return Ok(());
}
// Create the topic
let topic = NewTopic::new(name, partitions, replication_factor);
let results = admin
.create_topics(vec![topic], Duration::from_secs(30))
.await?;
for result in results {
if let Some(error) = result.error {
return Err(KrafkaError::broker(
krafka::error::ErrorCode::UnknownServerError,
error,
));
}
}
println!("Created topic: {}", name);
Ok(())
}
```
## Common Topic Configurations
| Config | Type | Default | Description |
|--------|------|---------|-------------|
| `cleanup.policy` | String | `delete` | `delete` or `compact` |
| `compression.type` | String | `producer` | Compression type |
| `retention.ms` | Long | `604800000` (7 days) | Message retention time (-1 = infinite) |
| `retention.bytes` | Long | `-1` | Max partition size (-1 = infinite) |
| `segment.bytes` | Int | 1GB | Segment file size |
| `min.insync.replicas` | Int | `1` | Min ISR for writes with acks=all |
| `max.message.bytes` | Int | 1MB | Max message size |
| `unclean.leader.election.enable` | Bool | `false` | Allow unclean leader election |
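The millisecond values used for `retention.ms` throughout this guide are easy to derive; a tiny illustrative helper (not part of krafka):

```rust
// Sketch: convert a retention period in days to the milliseconds
// expected by retention.ms.
fn days_to_ms(days: u64) -> u64 {
    days * 24 * 60 * 60 * 1000
}

fn main() {
    // 7 days — the value used in the topic-creation example above.
    assert_eq!(days_to_ms(7), 604_800_000);
    // 1 day — the value used in the alter-config example.
    assert_eq!(days_to_ms(1), 86_400_000);
    println!("ok");
}
```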
## Best Practices
### Always Check Results
```rust
let results = admin.create_topics(topics, timeout).await?;
let mut success = true;
for result in results {
if let Some(error) = &result.error {
eprintln!("Failed to create {}: {}", result.name, error);
success = false;
}
}
if !success {
return Err(KrafkaError::invalid_state("Some topics failed to create"));
}
```
### Use Appropriate Timeouts
```rust
use std::time::Duration;
// Short timeout for simple operations
admin.list_topics().await?; // Uses default timeout
// Longer timeout for operations that may take time
admin.create_topics(new_topics, Duration::from_secs(60)).await?;
admin.delete_topics(topic_names, Duration::from_secs(60)).await?;
```
### Handle Topic Already Exists
```rust
let results = admin.create_topics(vec![topic], timeout).await?;
for result in results {
match &result.error {
None => println!("Created: {}", result.name),
Some(e) if e.contains("TOPIC_ALREADY_EXISTS") => {
println!("Topic {} already exists (OK)", result.name);
}
Some(e) => {
return Err(KrafkaError::broker(
krafka::error::ErrorCode::UnknownServerError,
e.clone(),
));
}
}
}
```
## ACL Management
The AdminClient supports Access Control List (ACL) management for Kafka security.
### Using AclFilter (Recommended)
The `AclFilter` struct provides a cleaner API for ACL queries:
```rust
use krafka::admin::AclFilter;
use krafka::protocol::AclResourceType;
// Filter that matches all ACLs
let all_acls = admin.describe_acls_with_filter(AclFilter::all()).await?;
// Filter for a specific topic
let topic_acls = admin.describe_acls_with_filter(
AclFilter::for_resource(AclResourceType::Topic, "my-topic")
).await?;
// Filter for a specific principal
let user_acls = admin.describe_acls_with_filter(
AclFilter::for_principal("User:alice")
).await?;
// Builder pattern for complex filters
let filter = AclFilter::all()
.resource_type(AclResourceType::Group)
.resource_name("my-consumer-group")
.principal("User:bob");
let result = admin.describe_acls_with_filter(filter).await?;
```
### Describe ACLs
Query existing ACLs matching a filter:
```rust
use krafka::protocol::{AclResourceType, AclPatternType, AclOperation, AclPermissionType};
// Find all ACLs for a specific topic
let result = admin.describe_acls(
AclResourceType::Topic,
Some("my-topic"),
AclPatternType::Literal,
None, // any principal
None, // any host
AclOperation::Any,
AclPermissionType::Any,
).await?;
if let Some(error) = result.error {
println!("Error: {}", error);
} else {
for binding in result.bindings {
println!("ACL: {:?} {} {:?} on {}",
binding.permission_type,
binding.principal,
binding.operation,
binding.resource_name);
}
}
```
### Create ACLs
Create new access control entries:
```rust
use krafka::protocol::AclBinding;
// Create a simple read ACL
let read_acl = AclBinding::allow_read_topic("my-topic", "User:alice");
// Create a write ACL
let write_acl = AclBinding::allow_write_topic("my-topic", "User:bob");
// Create ACLs
let result = admin.create_acls(vec![read_acl, write_acl]).await?;
for (i, r) in result.results.iter().enumerate() {
match &r.error {
None => println!("ACL {} created successfully", i),
Some(e) => println!("ACL {} failed: {}", i, e),
}
}
```
### Delete ACLs
Delete ACLs matching a filter:
```rust
use krafka::protocol::{AclBindingFilter, AclResourceType, AclPatternType, AclOperation, AclPermissionType};
// Delete all ACLs for a topic
let filter = AclBindingFilter {
resource_type: AclResourceType::Topic,
resource_name: Some("my-topic".to_string()),
pattern_type: AclPatternType::Literal,
principal: None,
host: None,
operation: AclOperation::Any,
permission_type: AclPermissionType::Any,
};
let result = admin.delete_acls(vec![filter]).await?;
for (i, fr) in result.filter_results.iter().enumerate() {
match &fr.error {
None => println!("Filter {} deleted {} ACLs", i, fr.deleted_count),
Some(e) => println!("Filter {} failed: {}", i, e),
}
}
```
## Consumer Group Management
### Describing Consumer Groups
Get detailed information about one or more consumer groups. The method
automatically detects each group's type (classic or KIP-848 consumer protocol)
and dispatches to the appropriate API (Key 15 or Key 69). The request is routed
to each group's coordinator broker via FindCoordinator:
```rust
let descriptions = admin
.describe_consumer_groups(vec!["my-group".to_string(), "other-group".to_string()])
.await?;
for group in &descriptions {
println!("Group: {} (type: {}, state: {})", group.group_id, group.group_type, group.state);
if let Some(assignor) = &group.assignor {
println!(" Assignor: {}", assignor);
}
if let Some(epoch) = group.group_epoch {
println!(" Epoch: {}", epoch);
}
for member in &group.members {
println!(
" Member: {} (client: {}, host: {}, instance: {:?})",
member.member_id, member.client_id, member.client_host,
member.instance_id
);
}
if let Some(error) = &group.error {
println!(" Error: {}", error);
}
}
```
> **Note:** Classic-protocol groups return `protocol_type` and `assignor` but
> no epoch or assignment details. KIP-848 groups return `group_epoch`,
> `assignment_epoch`, per-member subscriptions, and topic-UUID-based
> current/target assignments.
### Listing Consumer Groups
List all consumer groups across the cluster:
```rust
let groups = admin.list_consumer_groups().await?;
println!("Consumer groups:");
for group in &groups {
println!(" {} (type: {:?}, protocol: {})", group.group_id, group.group_type, group.protocol_type);
}
```
> **Note:** `list_consumer_groups()` queries all brokers in the cluster and deduplicates results, since consumer groups are managed by their respective group coordinators.
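The merge-and-deduplicate step can be sketched as follows — the `Group` struct here is a stand-in for the real listing type, and each inner `Vec` stands in for one broker's response:

```rust
use std::collections::HashSet;

// Each broker reports only the groups it coordinates; a full-cluster
// listing merges the per-broker answers and drops duplicates.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
struct Group {
    group_id: String,
}

fn merge_broker_listings(per_broker: Vec<Vec<Group>>) -> Vec<Group> {
    let mut seen = HashSet::new();
    let mut merged = Vec::new();
    for groups in per_broker {
        for g in groups {
            // Keep the first occurrence of each group id.
            if seen.insert(g.group_id.clone()) {
                merged.push(g);
            }
        }
    }
    merged
}

fn main() {
    let broker1 = vec![Group { group_id: "g1".into() }, Group { group_id: "g2".into() }];
    let broker2 = vec![Group { group_id: "g2".into() }, Group { group_id: "g3".into() }];
    let merged = merge_broker_listings(vec![broker1, broker2]);
    assert_eq!(merged.len(), 3);
    println!("merged {} groups", merged.len());
}
```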
## Topic Partition Details
### Describing Topic Partitions
Use `describe_topic_partitions()` for paginated, detailed partition information
including ELR (eligible leader replicas) from KIP-966:
```rust
let result = admin
.describe_topic_partitions(vec!["my-topic".to_string()])
.await?;
for topic in &result.topics {
println!(
"Topic: {} (internal: {}, id: {:?})",
topic.name.as_deref().unwrap_or("?"),
topic.is_internal,
topic.topic_id
);
for p in &topic.partitions {
println!(
" Partition {}: leader={}, epoch={}, replicas={:?}, isr={:?}",
p.partition_index, p.leader_id, p.leader_epoch,
p.replica_nodes, p.isr_nodes
);
if let Some(elr) = &p.eligible_leader_replicas {
println!(" ELR: {:?}", elr);
}
if let Some(last_elr) = &p.last_known_elr {
println!(" Last known ELR: {:?}", last_elr);
}
if !p.offline_replicas.is_empty() {
println!(" Offline: {:?}", p.offline_replicas);
}
}
}
```
> **Note:** The DescribeTopicPartitions API (Key 75) is available on Kafka 4.0+.
> It automatically handles pagination for topics with many partitions (default
> limit 2000 partitions per page). All pages are collected into a single result.
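The pagination behavior can be sketched as a cursor loop. The closure below stands in for one DescribeTopicPartitions round trip; the names and the page size of 2 are illustrative (the real default page limit is 2000):

```rust
// Sketch: collect all pages of a cursor-based API into one result.
// `fetch_page` takes the cursor from the previous page (None for the
// first page) and returns that page plus the next cursor, if any.
fn collect_pages<F>(mut fetch_page: F) -> Vec<i32>
where
    F: FnMut(Option<i32>) -> (Vec<i32>, Option<i32>),
{
    let mut all = Vec::new();
    let mut cursor = None;
    loop {
        let (page, next) = fetch_page(cursor);
        all.extend(page);
        match next {
            Some(c) => cursor = Some(c),
            None => break,
        }
    }
    all
}

fn main() {
    // Simulate a 5-partition topic served in pages of 2.
    let partitions: Vec<i32> = (0..5).collect();
    let collected = collect_pages(|cursor| {
        let start = cursor.unwrap_or(0) as usize;
        let end = (start + 2).min(partitions.len());
        let next = if end < partitions.len() { Some(end as i32) } else { None };
        (partitions[start..end].to_vec(), next)
    });
    assert_eq!(collected, vec![0, 1, 2, 3, 4]);
    println!("collected {} partitions", collected.len());
}
```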
## Record Deletion
### Deleting Records
Delete records from topic partitions before a specified offset. Records with offsets less than the specified offset are marked for deletion (this adjusts the log start offset). Requests are automatically routed to each partition's leader broker:
```rust
use std::collections::HashMap;
use std::time::Duration;
let mut offsets = HashMap::new();
offsets.insert(("my-topic".to_string(), 0), 100i64); // Delete before offset 100
offsets.insert(("my-topic".to_string(), 1), 250i64); // Delete before offset 250
let results = admin
.delete_records(offsets, Duration::from_secs(30))
.await?;
for result in &results {
match &result.error {
None => println!(
"Deleted records from {}:{}, new low watermark: {}",
result.topic, result.partition, result.low_watermark
),
Some(e) => println!(
"Failed to delete from {}:{}: {}",
result.topic, result.partition, e
),
}
}
```
> **Note:** Deleted records are not immediately removed from disk. The broker adjusts the log start offset, and records before that offset become inaccessible. Physical deletion happens during log segment cleanup.
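Conceptually, a record's visibility after `delete_records` depends only on the log start offset; a one-function sketch (not krafka's API):

```rust
// Sketch: a record is "deleted" once its offset falls below the
// partition's log start offset (the low watermark).
fn is_accessible(record_offset: i64, log_start_offset: i64) -> bool {
    record_offset >= log_start_offset
}

fn main() {
    // After delete_records(.., 100): offsets 0..=99 become inaccessible.
    assert!(!is_accessible(99, 100));
    // Offset 100 itself remains readable.
    assert!(is_accessible(100, 100));
    println!("ok");
}
```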
## Leader Epoch Queries
### OffsetForLeaderEpoch
Query the end offset for a given leader epoch. This is used to detect log truncation after leader changes. Requests are routed to each partition's leader broker:
```rust
// Query the end offset for leader epoch 5 on partition 0 of "my-topic"
let results = admin
.offset_for_leader_epoch(vec![
("my-topic".to_string(), 0, 5),
("my-topic".to_string(), 1, 3),
])
.await?;
for result in &results {
match &result.error {
None => println!(
"{}:{} epoch={} end_offset={}",
result.topic, result.partition,
result.leader_epoch, result.end_offset
),
Some(e) => println!(
"{}:{} error: {}",
result.topic, result.partition, e
),
}
}
```
This API is useful for:
- **Log truncation detection**: After a leader change, check if the log was truncated
- **Consumer offset validation**: Ensure a consumer's saved offset is still valid
- **Replication diagnostics**: Verify epoch boundaries across replicas
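For the truncation-detection case, the check reduces to comparing a saved position against the epoch's end offset. A minimal sketch under that assumption (function name is illustrative, not a krafka API):

```rust
// Sketch: if the broker's end offset for the consumer's last-known
// leader epoch is below the consumer's saved position, the log was
// truncated past that position and the saved offset is invalid.
fn was_truncated(saved_offset: i64, epoch_end_offset: i64) -> bool {
    epoch_end_offset < saved_offset
}

fn main() {
    // Consumer saved offset 500, but the epoch now ends at 450: truncated.
    assert!(was_truncated(500, 450));
    // Epoch end at or past the saved offset: position still valid.
    assert!(!was_truncated(500, 500));
    println!("ok");
}
```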
## Delegation Tokens
Delegation tokens (KIP-48) allow a principal to delegate authentication to
another principal without sharing credentials. The token HMAC can be used for
SASL/SCRAM authentication.
### Creating a Token
```rust
use std::time::Duration;
// Create a token that "alice" can renew, with a 24-hour lifetime
let result = admin
.create_delegation_token(
&[("User", "alice")],
Some(Duration::from_secs(86_400)),
)
.await?;
match result.token {
Some(token) => println!("Created token: {} (HMAC {} bytes)", token.token_id, token.hmac.len()),
None => println!("Error: {}", result.error.unwrap()),
}
```
Pass an empty renewers slice to allow only the token owner to renew.
Use `None` for `max_lifetime` to accept the server default (typically 7 days).
### Describing Tokens
```rust
// Describe all tokens visible to the caller
let tokens = admin.describe_delegation_tokens(None).await?;
for token in &tokens {
println!(
"Token {} owned by {}:{}, expires at {}, {} renewer(s)",
token.token_id,
token.principal_type,
token.principal_name,
token.expiry_timestamp_ms,
token.renewers.len(),
);
}
// Describe tokens for a specific owner
let tokens = admin
.describe_delegation_tokens(Some(&[("User", "alice")]))
.await?;
```
### Renewing a Token
```rust
use std::time::Duration;
// Obtain a token (e.g., from a prior create call)
let result = admin
.create_delegation_token(&[("User", "alice")], Some(Duration::from_secs(86_400)))
.await?;
let token = result.token.expect("token created");
// Extend the token's lifetime by 1 hour
let result = admin
.renew_delegation_token(&token.hmac, Duration::from_secs(3_600))
.await?;
match result.error {
None => println!("New expiry: {}", result.expiry_timestamp_ms),
Some(e) => println!("Renew failed: {}", e),
}
```
### Expiring a Token
```rust
use std::time::Duration;
// Obtain a token (e.g., from describe)
let tokens = admin.describe_delegation_tokens(None).await?;
let token = &tokens[0];
// Expire a token immediately
let result = admin.expire_delegation_token(&token.hmac, None).await?;
// Expire a token after a grace period
let result = admin
.expire_delegation_token(&token.hmac, Some(Duration::from_secs(60)))
.await?;
```
### Protocol Versions
| API | Versions | Notes |
|-----|----------|-------|
| CreateDelegationToken | v1–v3 | v1 baseline (v0 removed in Kafka 4.0), v2 flexible encoding, v3 owner principal override |
| RenewDelegationToken | v1–v2 | v1 baseline, v2 flexible encoding |
| ExpireDelegationToken | v1–v2 | v1 baseline, v2 flexible encoding |
| DescribeDelegationToken | v1–v3 | v1 baseline, v2 flexible encoding, v3 token requester fields |
## Client Quotas
Client quotas control the resource usage of clients (producer/consumer byte
rates, request percentages, etc.). Use `describe_client_quotas` to query
current quotas and `alter_client_quotas` to change them.
### Describing Quotas
```rust
// Describe all quotas for user "alice" (match_type 0 = exact match)
let result = admin
.describe_client_quotas(&[("user", 0, Some("alice"))], false)
.await?;
for entry in &result.entries {
let entity: Vec<_> = entry.entity.iter().map(|e| {
format!("{}={}", e.entity_type, e.entity_name.as_deref().unwrap_or("<default>"))
}).collect();
println!("Entity: {}", entity.join(", "));
for v in &entry.values {
println!(" {} = {}", v.key, v.value);
}
}
```
Filter match types:
- `0` — exact: match the entity with the given name
- `1` — default: match the default entity for this type
- `2` — any specified: match any entity with a name (non-default)
When `strict` is `true`, only entities that exactly match all given component
types are returned (entities with additional unspecified types are excluded).
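The strict-mode exclusion rule can be sketched as a predicate over entity component types — the map-based representation here is a stand-in, not krafka's actual structs:

```rust
use std::collections::HashMap;

// Sketch of strict matching: an entity passes only if every component
// type it carries was specified in the filter. Entities with extra,
// unspecified component types are excluded.
fn matches_strict(entity: &HashMap<&str, &str>, filter_types: &[&str]) -> bool {
    entity.keys().all(|t| filter_types.contains(t))
}

fn main() {
    let mut user_only = HashMap::new();
    user_only.insert("user", "alice");

    let mut user_and_client = HashMap::new();
    user_and_client.insert("user", "alice");
    user_and_client.insert("client-id", "app-1");

    // Filtering strictly on "user" keeps the user-only entity...
    assert!(matches_strict(&user_only, &["user"]));
    // ...but excludes the (user, client-id) entity.
    assert!(!matches_strict(&user_and_client, &["user"]));
    println!("ok");
}
```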
### Altering Quotas
```rust
use krafka::admin::QuotaAlteration;
// Set producer byte rate for user "alice"
let results = admin
.alter_client_quotas(
&[QuotaAlteration {
entity: vec![("user", Some("alice"))],
ops: vec![
("producer_byte_rate", Some(1_048_576.0)), // set to 1 MiB/s
("consumer_byte_rate", None), // remove quota
],
}],
false,
)
.await?;
for result in &results {
match &result.error {
None => println!("Quota altered successfully"),
Some(e) => println!("Error: {}", e),
}
}
// Dry-run validation (validate_only = true)
let results = admin
.alter_client_quotas(
&[QuotaAlteration {
entity: vec![("user", Some("alice"))],
ops: vec![("producer_byte_rate", Some(1_048_576.0))],
}],
true,
)
.await?;
```
## Feature Versioning (KIP-584)
Kafka 2.7+ supports cluster-wide feature flags that control the finalized
version range for features like `metadata.version`. Use `describe_features` to
discover what the cluster supports and `update_features` to upgrade, downgrade,
or delete finalized feature levels.
### Describing Features
```rust
let features = admin.describe_features().await?;
println!("Epoch: {}", features.finalized_features_epoch);
for f in &features.supported_features {
println!("supported: {} [{}, {}]", f.name, f.min_version, f.max_version);
}
for f in &features.finalized_features {
println!("finalized: {} [{}, {}]", f.name, f.min_version_level, f.max_version_level);
}
```
### Updating Features
```rust
use krafka::protocol::messages::FeatureUpdateKey;
// Upgrade metadata.version to level 17
let results = admin
.update_features(
vec![FeatureUpdateKey::upgrade("metadata.version", 17)],
false, // validate_only
)
.await?;
for r in &results.results {
match &r.error {
None => println!("{}: ok", r.feature),
Some(e) => println!("{}: {}", r.feature, e),
}
}
// Dry-run validation (validate_only = true, requires v1+)
let results = admin
.update_features(
vec![FeatureUpdateKey::upgrade("metadata.version", 17)],
true,
)
.await?;
```
Upgrade types:
- `FeatureUpdateKey::upgrade(name, level)` — raise to a higher level
- `FeatureUpdateKey::safe_downgrade(name, level)` — lower the level safely
- `FeatureUpdateKey::unsafe_downgrade(name, level)` — forceful downgrade (may lose data)
- `FeatureUpdateKey::delete(name)` — remove the finalized feature entirely
When the broker supports `UpdateFeatures` v1+, the request uses the typed
`UpgradeType` field. On older v0 brokers, the client falls back to the boolean
`AllowDowngrade` flag.
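That fallback can be sketched as a mapping from the typed upgrade kinds onto v0's single boolean — the enum and function names below are illustrative, not krafka's internals:

```rust
// Sketch: on UpdateFeatures v0 there is no UpgradeType field, only a
// boolean AllowDowngrade, so the typed kinds collapse onto it.
#[derive(Clone, Copy)]
enum UpgradeType {
    Upgrade,
    SafeDowngrade,
    UnsafeDowngrade,
}

fn allow_downgrade_for_v0(t: UpgradeType) -> bool {
    match t {
        UpgradeType::Upgrade => false,
        // v0 cannot distinguish safe from unsafe downgrades.
        UpgradeType::SafeDowngrade | UpgradeType::UnsafeDowngrade => true,
    }
}

fn main() {
    assert!(!allow_downgrade_for_v0(UpgradeType::Upgrade));
    assert!(allow_downgrade_for_v0(UpgradeType::SafeDowngrade));
    println!("ok");
}
```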
## Log Directory Inspection
`describe_log_dirs()` queries every broker and returns per-directory information
including partition sizes, offset lag, future-replica status, and (v4+) volume
capacity.
### Describe All Log Directories
```rust
let dirs = admin.describe_log_dirs(None).await?;
for dir in &dirs {
println!("broker {} — {} (total: {}, usable: {})",
dir.broker_id, dir.log_dir, dir.total_bytes, dir.usable_bytes);
if let Some(err) = &dir.error {
eprintln!(" error: {err}");
}
for topic in &dir.topics {
for p in &topic.partitions {
println!(" {}-{}: {} bytes, lag {}{}",
topic.name, p.partition_index, p.partition_size,
p.offset_lag, if p.is_future_key { " (future)" } else { "" });
}
}
}
```
### Describe Specific Topics
```rust
use krafka::protocol::DescribableLogDirTopic;
let filter = vec![DescribableLogDirTopic {
topic: "my-topic".into(),
partitions: vec![0, 1, 2],
}];
let dirs = admin.describe_log_dirs(Some(filter)).await?;
```
### Result Fields
| Field | Type | Description |
|-------|------|-------------|
| `broker_id` | `i32` | Broker that owns the directory |
| `log_dir` | `String` | Absolute path on the broker |
| `error` | `Option<String>` | Per-directory error (e.g., `KAFKA_STORAGE_ERROR`) |
| `total_bytes` | `i64` | Volume total bytes (-1 if unknown, v4+) |
| `usable_bytes` | `i64` | Volume free bytes (-1 if unknown, v4+) |
| `topics[].partitions[].partition_size` | `i64` | Log size in bytes |
| `topics[].partitions[].offset_lag` | `i64` | Lag behind high watermark |
| `topics[].partitions[].is_future_key` | `bool` | Future replica (reassignment) |
### Protocol Versions
| Version | Notes |
|---------|-------|
| v1 | Baseline (v0 removed in Kafka 4.0) |
| v2 | Flexible encoding (compact strings + tagged fields) |
| v3 | Top-level `ErrorCode` in response |
| v4 | `TotalBytes` + `UsableBytes` per log directory |
## Leader Election
`elect_leaders()` triggers a leader election for the specified partitions.
Supports preferred election (elect the preferred replica) and unclean election
(elect the first live replica even without in-sync replicas).
### Preferred Election for All Partitions
```rust
use krafka::protocol::ElectionType;
use std::time::Duration;
let results = admin
.elect_leaders(ElectionType::Preferred, None, Duration::from_secs(60))
.await?;
for topic in &results {
for p in &topic.partitions {
if let Some(err) = &p.error {
eprintln!("{}-{}: {err}", topic.topic, p.partition_id);
}
}
}
```
### Unclean Election for Specific Partitions
```rust
use krafka::protocol::{ElectionType, ElectLeadersTopicPartitions};
use std::time::Duration;
let results = admin
.elect_leaders(
ElectionType::Unclean,
Some(vec![ElectLeadersTopicPartitions {
topic: "my-topic".into(),
partitions: vec![0, 1],
}]),
Duration::from_secs(60),
)
.await?;
```
### Protocol Versions
| Version | Notes |
|---------|-------|
| v0 | Baseline (preferred election only) |
| v1 | Adds `ElectionType` for preferred/unclean (KIP-460); top-level error code |
| v2 | Flexible encoding (compact strings + tagged fields) |
## Partition Reassignment
`alter_partition_reassignments()` initiates or cancels partition reassignments.
`list_partition_reassignments()` lists all ongoing reassignments.
> **Warning**: Reassigning partitions moves data between brokers and can
> significantly impact cluster load.
### Start a Reassignment
```rust
use krafka::protocol::{ReassignableTopic, ReassignablePartition};
use std::time::Duration;
let result = admin.alter_partition_reassignments(
vec![ReassignableTopic {
name: "my-topic".into(),
partitions: vec![ReassignablePartition {
partition_index: 0,
replicas: Some(vec![1, 2, 3]),
}],
}],
Duration::from_secs(60),
).await?;
if let Some(err) = &result.error {
eprintln!("Top-level error: {err}");
}
for topic in &result.topics {
for p in &topic.partitions {
if let Some(err) = &p.error {
eprintln!("{}-{}: {err}", topic.name, p.partition_index);
}
}
}
```
### Cancel a Pending Reassignment
```rust
use krafka::protocol::{ReassignableTopic, ReassignablePartition};
use std::time::Duration;
// Set replicas to None to cancel
let result = admin.alter_partition_reassignments(
vec![ReassignableTopic {
name: "my-topic".into(),
partitions: vec![ReassignablePartition {
partition_index: 0,
replicas: None, // cancel pending reassignment
}],
}],
Duration::from_secs(60),
).await?;
```
### List Ongoing Reassignments
```rust
use std::time::Duration;
let reassignments = admin
.list_partition_reassignments(None, Duration::from_secs(60))
.await?;
for topic in &reassignments {
for p in &topic.partitions {
println!("{} p{}: replicas={:?} adding={:?} removing={:?}",
topic.name, p.partition_index, p.replicas,
p.adding_replicas, p.removing_replicas);
}
}
```
### AlterPartitionReassignments Protocol Versions
| Version | Notes |
|---------|-------|
| v0 | Baseline (flexible encoding from the start) |
### ListPartitionReassignments Protocol Versions
| Version | Notes |
|---------|-------|
| v0 | Baseline (flexible encoding from the start) |
## SCRAM Credential Management
Manage SASL/SCRAM credentials (KIP-554) for users.
### Describe SCRAM Credentials
```rust
// Describe all users
let result = admin.describe_user_scram_credentials(None).await?;
for user in &result.users {
println!("{}: {:?}", user.name, user.credential_infos);
}
// Describe specific users
let result = admin
.describe_user_scram_credentials(Some(vec!["alice".into(), "bob".into()]))
.await?;
```
### Alter SCRAM Credentials
```rust
use krafka::protocol::{ScramCredentialDeletion, ScramCredentialUpsertion};
use krafka::auth::ScramMechanism;
use zeroize::Zeroizing;
let results = admin.alter_user_scram_credentials(
vec![ScramCredentialDeletion {
name: "alice".into(),
mechanism: ScramMechanism::Sha512,
}],
vec![ScramCredentialUpsertion {
name: "bob".into(),
mechanism: ScramMechanism::Sha256,
iterations: 8192,
salt: Zeroizing::new(vec![1, 2, 3]),
salted_password: Zeroizing::new(vec![4, 5, 6]),
}],
).await?;
```
### SCRAM Credential Protocol Versions
| API | Notes |
|-----|-------|
| DescribeUserScramCredentials v0 | Baseline (KIP-554, flexible from v0) |
| AlterUserScramCredentials v0 | Baseline (KIP-554, flexible from v0) |
## Log Directory Management
### Move Replicas Between Log Directories
```rust
use krafka::protocol::{AlterReplicaLogDir, AlterReplicaLogDirTopic};
let results = admin.alter_replica_log_dirs(vec![
AlterReplicaLogDir {
path: "/data/kafka-logs-2".into(),
topics: vec![AlterReplicaLogDirTopic {
name: "my-topic".into(),
partitions: vec![0, 1],
}],
},
]).await?;
```
### AlterReplicaLogDirs Protocol Versions
| Version | Notes |
|---------|-------|
| v1 | Baseline (non-flexible encoding) |
| v2 | Flexible encoding |
## Offset Management
### Delete Consumer Group Offsets
```rust
let result = admin.delete_offsets(
"my-group",
&[("my-topic", &[0, 1, 2])],
).await?;
if let Some(err) = &result.error {
eprintln!("Top-level error: {err}");
}
```
### OffsetDelete Protocol Versions
| Version | Notes |
|---------|-------|
| v0 | Baseline (non-flexible encoding) |
## Transaction Debugging
### Describe Producers
Inspect active producers on partitions (useful for debugging stuck transactions).
```rust
let results = admin
.describe_producers(&[("my-topic", &[0, 1])])
.await?;
for topic in &results {
for p in &topic.partitions {
for pr in &p.active_producers {
println!("p{}: producer_id={} epoch={} txn_offset={}",
p.partition_index, pr.producer_id,
pr.producer_epoch, pr.current_txn_start_offset);
}
}
}
```
### Describe Transactions
```rust
let results = admin
.describe_transactions(&["txn-1", "txn-2"])
.await?;
for txn in &results {
println!("{}: state={} producer_id={}", txn.transactional_id, txn.state, txn.producer_id);
}
```
### List Transactions
```rust
// List all ongoing transactions
let result = admin.list_transactions(&["Ongoing"], &[], -1).await?;
for txn in &result.transactions {
println!("{}: state={} producer_id={}", txn.transactional_id, txn.state, txn.producer_id);
}
```
### Transaction Debug Protocol Versions
| API | Notes |
|-----|-------|
| DescribeProducers v0 | Baseline (KIP-664, flexible from v0) |
| DescribeTransactions v0 | Baseline (KIP-664, flexible from v0) |
| ListTransactions v0 | Baseline (KIP-664, flexible from v0) |
| ListTransactions v1 | Adds DurationFilter (KIP-994) |
## Client Metrics Resources
### List Client Metrics Subscriptions
```rust
let names = admin.list_client_metrics_resources().await?;
for name in &names {
println!("subscription: {name}");
}
```
### ListClientMetricsResources Protocol Versions
| Version | Notes |
|---------|-------|
| v0 | Baseline (KIP-714, flexible from v0) |
## Transaction Markers (WriteTxnMarkers)
### Write Transaction Markers
Write COMMIT or ABORT markers for transactions. Primarily useful for aborting
stuck (hanging) transactions via the `abort_transaction` convenience method.
```rust
// Abort a stuck transaction
admin.abort_transaction("my-transactional-id").await?;
```
For low-level control, use `write_txn_markers` directly:
```rust
use krafka::protocol::{WritableTxnMarker, WritableTxnMarkerTopic};
let results = admin
.write_txn_markers(&[WritableTxnMarker {
producer_id: 42,
producer_epoch: 5,
transaction_result: false, // ABORT
topics: vec![WritableTxnMarkerTopic {
name: "my-topic".into(),
partition_indexes: vec![0, 1],
}],
coordinator_epoch: 10,
}])
.await?;
```
### WriteTxnMarkers Protocol Versions
| API | Notes |
|-----|-------|
| WriteTxnMarkers v1 | Baseline (flexible encoding, v0 removed in Kafka 4.0) |
| WriteTxnMarkers v2 | Adds TransactionVersion field (KIP-1228) |
## KRaft Quorum (DescribeQuorum)
### Describe Quorum
Inspect the KRaft quorum for cluster metadata partitions. Returns voter and
observer replicas, leader info, and high watermark.
```rust
let result = admin
.describe_quorum(&[("__cluster_metadata", &[0])])
.await?;
for topic in &result.topics {
for partition in &topic.partitions {
println!(
"partition {} leader={} epoch={} hw={}",
partition.partition_index,
partition.leader_id,
partition.leader_epoch,
partition.high_watermark
);
for voter in &partition.current_voters {
println!(" voter {} log_end_offset={}", voter.replica_id, voter.log_end_offset);
}
}
}
```
### DescribeQuorum Protocol Versions
| API | Notes |
|-----|-------|
| DescribeQuorum v0 | Baseline (KIP-595, flexible from v0) |
| DescribeQuorum v1 | Adds LastFetchTimestamp + LastCaughtUpTimestamp (KIP-836) |
| DescribeQuorum v2 | Adds Nodes, ErrorMessage, ReplicaDirectoryId (KIP-853) |
## Next Steps
- [Interceptors Guide](interceptors.md) - Producer and consumer interceptor hooks
- [Configuration Reference](configuration.md) - All admin client options
- [Architecture Overview](architecture.md) - How admin client works internally