# Admin

`KafkaAdmin` provides Kafka cluster, topic, consumer group, config, ACL, log
directory, feature, and SCRAM credential operations through the Kafka protocol.

Use admin calls for application setup, deployment checks, tooling, and
operational workflows. Keep topic creation and config mutation explicit, because
these operations change cluster state.
## Initial Release Scope
The initial admin API scope is intentionally limited to operations that are
implemented, documented, and covered by tests:
- Cluster metadata: `describe_cluster`.
- Topics: create, delete, list, describe, and increase partitions.
- Consumer groups: list, describe, and delete classic, consumer, and share
groups when the broker supports the underlying APIs.
- Configs: describe and incremental alter for supported Kafka resource types.
- ACLs: describe, create, and delete bindings.
- Broker log directories: describe per-broker log directory usage.
- Feature levels: inspect and update finalized broker features.
- SCRAM: upsert user credentials for supported mechanisms.

Unsupported Kafka admin APIs are not part of the release contract until they are
represented by explicit `KafkaAdmin` methods and integration coverage.
## Connect
```rust,no_run
use kafkit_client::{AdminConfig, KafkaAdmin};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;

    let cluster = admin.describe_cluster().await?;
    println!("cluster id: {}", cluster.cluster_id);
    println!("controller id: {}", cluster.controller_id);

    Ok(())
}
```
## Topics
```rust,no_run
use kafkit_client::{AdminConfig, KafkaAdmin, NewPartitions, NewTopic};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;

    admin
        .create_topics([
            NewTopic::new("orders", 3, 1)
                .with_config("cleanup.policy", "compact,delete"),
        ])
        .await?;

    admin
        .create_partitions([("orders", NewPartitions::increase_to(6))])
        .await?;

    let topics = admin.list_topics().await?;
    println!("topics: {}", topics.len());

    let descriptions = admin.describe_topics(["orders"]).await?;
    for topic in descriptions {
        println!("{} has {} partitions", topic.name, topic.partitions.len());
    }

    Ok(())
}
```
`delete_topics(...)` is also available. Treat deletes as irreversible from the
client's point of view.
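
Because a delete cannot be undone from the client, tooling may want a confirmation step in front of it. The following is a minimal sketch of such a guard, written against the standard library only. `confirm_deletes`, `DeleteGuardError`, and the protected-prefix rule are hypothetical names introduced for illustration; they are not part of `kafkit_client`.

```rust
/// Hypothetical guard error; not a `kafkit_client` type.
#[derive(Debug, PartialEq)]
enum DeleteGuardError {
    /// The topic was requested but never explicitly confirmed by the operator.
    NotConfirmed(String),
    /// The topic name starts with a prefix the tool refuses to delete.
    Protected(String),
}

/// Returns the topics that are approved for deletion, or the first problem found.
/// A topic is approved only if it is not protected and appears in `confirmed`.
fn confirm_deletes<'a>(
    requested: &[&'a str],
    confirmed: &[&str],
    protected_prefixes: &[&str],
) -> Result<Vec<&'a str>, DeleteGuardError> {
    let mut approved = Vec::new();
    for &topic in requested {
        if protected_prefixes.iter().any(|prefix| topic.starts_with(prefix)) {
            return Err(DeleteGuardError::Protected(topic.to_string()));
        }
        if !confirmed.iter().any(|name| *name == topic) {
            return Err(DeleteGuardError::NotConfirmed(topic.to_string()));
        }
        approved.push(topic);
    }
    Ok(approved)
}
```

A tool built this way would pass only the approved names on to `delete_topics(...)`, so a typo or a missing confirmation fails before any request reaches the broker.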
## Consumer Groups
```rust,no_run
use kafkit_client::{AdminConfig, KafkaAdmin};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;

    let groups = admin.list_consumer_groups().await?;
    for group in &groups {
        println!("group: {}", group.group_id);
    }

    let descriptions = admin.describe_consumer_groups(["orders-reader"]).await?;
    for group in descriptions {
        println!("{} members: {}", group.group_id, group.members.len());
    }

    Ok(())
}
```
## Configs
Describe configs before mutating them so tooling can show the current value and
source.
```rust,no_run
use kafkit_client::{
    AdminConfig, AlterConfigOp, ConfigResource, KafkaAdmin,
};

#[tokio::main]
async fn main() -> kafkit_client::Result<()> {
    let admin = KafkaAdmin::connect(AdminConfig::new("localhost:9092")).await?;

    let configs = admin
        .describe_configs([ConfigResource::topic("orders")])
        .await?;
    for config in &configs {
        if let Some(entry) = config.entry("cleanup.policy") {
            println!("cleanup.policy = {:?}", entry.value);
        }
    }

    admin
        .incremental_alter_configs([(
            ConfigResource::topic("orders"),
            vec![AlterConfigOp::set("retention.ms", "604800000")],
        )])
        .await?;

    Ok(())
}
```
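
One way to act on the describe-before-mutate advice is to compute a change plan from the described values and show it to the operator before sending anything. The sketch below uses only the standard library and assumes the described entries have already been flattened into a key-to-value map. `PlannedSet` and `plan_config_changes` are hypothetical helpers, not `kafkit_client` APIs.

```rust
use std::collections::BTreeMap;

/// Hypothetical plan entry: one key whose value would change.
#[derive(Debug, PartialEq)]
struct PlannedSet {
    key: String,
    /// Current value as described, or `None` if the key was not present.
    from: Option<String>,
    /// Value the tool intends to set.
    to: String,
}

/// Compares described values with desired values and returns only the keys
/// that differ. Keys absent from `desired` are left untouched, so this sketch
/// never plans a delete.
fn plan_config_changes(
    current: &BTreeMap<String, String>,
    desired: &BTreeMap<String, String>,
) -> Vec<PlannedSet> {
    desired
        .iter()
        .filter(|(key, to)| current.get(*key) != Some(*to))
        .map(|(key, to)| PlannedSet {
            key: key.clone(),
            from: current.get(key).cloned(),
            to: to.clone(),
        })
        .collect()
}
```

Each `PlannedSet` could then be rendered as a `from -> to` line for review and, once approved, mapped to an `AlterConfigOp::set(...)` call like the one shown above. An empty plan means no alter request is needed.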
## Security Administration
The admin client includes SCRAM credential upsert helpers. Use them only from
trusted operational tooling, and redact passwords before they can reach logs or
tracing subscribers.
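
A common way to keep a password out of logs is to wrap it in a type whose `Debug` and `Display` output is redacted. The sketch below uses only the standard library. `Secret` and `ScramUpsertRequest` are hypothetical types for illustration and are not part of `kafkit_client`; the crate's real SCRAM request types may look different.

```rust
use std::fmt;

/// Hypothetical wrapper that hides its contents from formatted output.
struct Secret(String);

impl Secret {
    fn new(value: impl Into<String>) -> Self {
        Secret(value.into())
    }

    /// Explicit accessor, so every read of the raw value is easy to audit.
    fn expose(&self) -> &str {
        &self.0
    }
}

impl fmt::Debug for Secret {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Secret(<redacted>)")
    }
}

impl fmt::Display for Secret {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("<redacted>")
    }
}

/// Hypothetical request shape used by operational tooling. Deriving `Debug`
/// is safe here because the password field formats as redacted.
#[derive(Debug)]
struct ScramUpsertRequest {
    user: String,
    mechanism: String,
    password: Secret,
}
```

With this shape, logging the whole request with `{:?}` prints the user and mechanism but not the password, and the raw value is only available through `expose()` at the point where it is handed to the admin call.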