use anyhow::Result;
use clap::Subcommand;
use mockforge_core::config::{load_config, KafkaConfig};
use mockforge_kafka::{KafkaFixture, KafkaMockBroker};
use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication};
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::message::{Header, Headers, Message, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::Offset;
use std::path::PathBuf;
use std::time::Duration;
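/// `mockforge kafka` subcommands exposed by the CLI.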
#[derive(Subcommand)]
pub enum KafkaCommands {
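    /// Display broker metrics in text or Prometheus format.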
#[command(verbatim_doc_comment)]
Metrics {
#[arg(short, long, default_value = "text")]
format: String,
},
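    /// Start a mock Kafka broker.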
#[command(verbatim_doc_comment)]
Serve {
#[arg(short, long, default_value = "9092")]
port: u16,
#[arg(long, default_value = "127.0.0.1")]
host: String,
#[arg(short, long)]
config: Option<PathBuf>,
},
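    /// Manage topics on the broker.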
#[command(verbatim_doc_comment)]
Topic {
#[command(subcommand)]
topic_command: KafkaTopicCommands,
},
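    /// Inspect consumer groups.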
#[command(verbatim_doc_comment)]
Groups {
#[command(subcommand)]
groups_command: KafkaGroupsCommands,
},
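    /// Produce a single message to a topic.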
#[command(verbatim_doc_comment)]
Produce {
#[arg(short, long)]
topic: String,
#[arg(short, long)]
key: Option<String>,
#[arg(short = 'm', long)]
value: String,
#[arg(short, long)]
partition: Option<i32>,
#[arg(short = 'H', long)]
header: Vec<String>,
},
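    /// Consume messages from a topic.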
#[command(verbatim_doc_comment)]
Consume {
#[arg(short, long)]
topic: String,
#[arg(short, long)]
group: Option<String>,
#[arg(short, long)]
partition: Option<i32>,
#[arg(short, long, default_value = "latest")]
from: String,
#[arg(short, long)]
count: Option<usize>,
},
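    /// Load and inspect message fixtures.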
#[command(verbatim_doc_comment)]
Fixtures {
#[command(subcommand)]
fixtures_command: KafkaFixturesCommands,
},
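    /// Simulate consumer-group scenarios such as lag and rebalances.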
#[command(verbatim_doc_comment)]
Simulate {
#[command(subcommand)]
simulate_command: KafkaSimulateCommands,
},
}
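/// Topic management subcommands.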
#[derive(Subcommand)]
pub enum KafkaTopicCommands {
Create {
name: String,
#[arg(short, long, default_value = "3")]
partitions: i32,
#[arg(short, long, default_value = "1")]
replication_factor: i32,
},
List,
Describe {
name: String,
},
Delete {
name: String,
},
}
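/// Consumer-group subcommands.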
#[derive(Subcommand)]
pub enum KafkaGroupsCommands {
List,
Describe {
group_id: String,
},
Offsets {
group_id: String,
},
}
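/// Fixture subcommands.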
#[derive(Subcommand)]
pub enum KafkaFixturesCommands {
Load {
directory: PathBuf,
},
List,
StartAutoProduce,
StopAutoProduce,
}
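/// Simulation subcommands.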
#[derive(Subcommand)]
pub enum KafkaSimulateCommands {
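    /// Commit offsets behind the high watermark to simulate consumer lag.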
Lag {
#[arg(short, long)]
group: String,
#[arg(short, long)]
topic: String,
#[arg(short, long)]
lag: i64,
},
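    /// Briefly join and leave a group to trigger a rebalance.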
Rebalance {
#[arg(short, long)]
group: String,
},
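    /// Reset a group's offsets to earliest, latest, or a specific value.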
ResetOffsets {
#[arg(short, long)]
group: String,
#[arg(short, long)]
topic: String,
#[arg(short = 'o', long, default_value = "earliest")]
to: String,
},
}
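/// Entry point used by the CLI dispatcher; delegates to [`execute_kafka_command`].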
pub async fn handle_kafka_command(command: KafkaCommands) -> Result<()> {
execute_kafka_command(command).await
}
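/// Execute a `kafka` subcommand. Client-mode subcommands (produce, consume,
/// topic, groups, simulate, metrics) assume a broker listening on
/// localhost:9092, the default address of `kafka serve`.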
pub async fn execute_kafka_command(command: KafkaCommands) -> Result<()> {
match command {
KafkaCommands::Serve { port, host, config } => {
let mut kafka_config = if let Some(config_path) = config {
let server_config = load_config(config_path).await?;
server_config.kafka
} else {
KafkaConfig::default()
};
kafka_config.port = port;
kafka_config.host = host.clone();
let broker = KafkaMockBroker::new(kafka_config).await?;
println!("Starting Kafka broker on {}:{}", host, port);
broker.start().await?;
Ok(())
}
KafkaCommands::Topic { topic_command } => execute_topic_command(topic_command).await,
KafkaCommands::Groups { groups_command } => execute_groups_command(groups_command).await,
KafkaCommands::Produce {
topic,
key,
value,
partition,
header,
} => {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.create()
.map_err(|e| anyhow::anyhow!("Producer creation failed: {}", e))?;
let mut record = FutureRecord::to(&topic).payload(&value);
if let Some(k) = &key {
record = record.key(k);
}
if let Some(p) = partition {
record = record.partition(p);
}
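            // Headers are passed as "key:value"; split on the first ':' only
            // so header values may themselves contain colons.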
if !header.is_empty() {
let mut owned_headers = OwnedHeaders::new();
for h in &header {
let parts: Vec<&str> = h.splitn(2, ':').collect();
if parts.len() == 2 {
owned_headers = owned_headers.insert(Header {
key: parts[0],
value: Some(parts[1].as_bytes()),
});
} else {
return Err(anyhow::anyhow!("Invalid header format: {}", h));
}
}
record = record.headers(owned_headers);
}
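            // A zero queue timeout fails immediately if the local producer
            // queue is full; actual delivery is awaited via the future.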
let delivery_status = producer.send(record, Duration::from_secs(0)).await;
match delivery_status {
Ok(delivery) => {
println!(
"Message produced to topic {} partition {} at offset {}",
topic, delivery.partition, delivery.offset
);
}
Err((e, _)) => {
return Err(anyhow::anyhow!("Failed to produce message: {}", e));
}
}
Ok(())
}
KafkaCommands::Consume {
topic,
group,
partition,
from,
count,
} => {
let group_id = group.unwrap_or_else(|| "cli-consumer".to_string());
            // Map --from to a starting position, accepting both the CLI
            // default spellings ("earliest"/"latest") and the librdkafka
            // spellings ("beginning"/"end"). The original match rejected the
            // default value "latest".
            let offset = match from.as_str() {
                "beginning" | "earliest" => Offset::Beginning,
                "end" | "latest" => Offset::End,
                _ => return Err(anyhow::anyhow!("Invalid 'from' value: {}", from)),
            };
            let consumer: StreamConsumer = ClientConfig::new()
                .set("bootstrap.servers", "localhost:9092")
                .set("group.id", &group_id)
                .set("enable.auto.commit", "false")
                .set(
                    "auto.offset.reset",
                    if matches!(offset, Offset::Beginning) { "earliest" } else { "latest" },
                )
                .create()
                .map_err(|e| anyhow::anyhow!("Consumer creation failed: {}", e))?;
            if let Some(p) = partition {
                // Attach the start offset to the assignment itself; calling
                // seek() on a freshly assigned partition before the first
                // poll fails in rdkafka.
                let mut tpl = TopicPartitionList::new();
                tpl.add_partition_offset(&topic, p, offset)
                    .map_err(|e| anyhow::anyhow!("Failed to set partition offset: {}", e))?;
                consumer
                    .assign(&tpl)
                    .map_err(|e| anyhow::anyhow!("Assign failed: {}", e))?;
            } else {
consumer
.subscribe(&[&topic])
.map_err(|e| anyhow::anyhow!("Subscribe failed: {}", e))?;
}
let mut message_count = 0;
let max_count = count.unwrap_or(usize::MAX);
println!("Consuming from topic {}...", topic);
if let Some(p) = partition {
println!(" Partition: {}", p);
}
println!(" From: {}", from);
loop {
if message_count >= max_count {
break;
}
match consumer.recv().await {
Ok(message) => {
message_count += 1;
println!("Message {}:", message_count);
if let Some(key) = message.key() {
println!(" Key: {}", String::from_utf8_lossy(key));
}
if let Some(payload) = message.payload() {
println!(" Value: {}", String::from_utf8_lossy(payload));
}
println!(" Partition: {}", message.partition());
println!(" Offset: {}", message.offset());
if let Some(headers) = message.headers() {
for header in headers.iter() {
println!(
" Header {}: {}",
header.key,
String::from_utf8_lossy(header.value.unwrap_or(&[]))
);
}
}
println!();
}
Err(e) => {
return Err(anyhow::anyhow!("Receive failed: {}", e));
}
}
}
println!("Consumed {} messages", message_count);
Ok(())
}
KafkaCommands::Fixtures { fixtures_command } => {
execute_fixtures_command(fixtures_command).await
}
KafkaCommands::Simulate { simulate_command } => {
execute_simulate_command(simulate_command).await
}
KafkaCommands::Metrics { format } => {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("group.id", "metrics-consumer")
.create()
.map_err(|e| anyhow::anyhow!("Consumer creation failed: {}", e))?;
let metadata = consumer
.fetch_metadata(None, Duration::from_secs(30))
.map_err(|e| anyhow::anyhow!("Fetch metadata failed: {}", e))?;
if format == "prometheus" {
println!("# Kafka Metrics");
println!("kafka_topics_total {}", metadata.topics().len());
let mut total_partitions = 0;
for topic in metadata.topics() {
let partitions = topic.partitions().len();
total_partitions += partitions;
println!("kafka_topic_partitions{{topic=\"{}\"}} {}", topic.name(), partitions);
}
println!("kafka_partitions_total {}", total_partitions);
println!("kafka_brokers_total {}", metadata.brokers().len());
} else {
println!("Kafka Broker Metrics:");
println!(" Brokers: {}", metadata.brokers().len());
println!(" Topics: {}", metadata.topics().len());
let mut total_partitions = 0;
for topic in metadata.topics() {
let partitions = topic.partitions().len();
total_partitions += partitions;
println!(" {}: {} partitions", topic.name(), partitions);
}
println!(" Total Partitions: {}", total_partitions);
}
Ok(())
}
}
}
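/// Handle `kafka topic` subcommands via the admin API.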
async fn execute_topic_command(command: KafkaTopicCommands) -> Result<()> {
match command {
KafkaTopicCommands::Create {
name,
partitions,
replication_factor,
} => {
let admin: AdminClient<_> = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.create()
.map_err(|e| anyhow::anyhow!("Admin client creation failed: {}", e))?;
            let topics = vec![NewTopic::new(
                name.as_str(),
                partitions,
                TopicReplication::Fixed(replication_factor),
            )];
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(30)));
            let results = admin
                .create_topics(&topics, &options)
                .await
                .map_err(|e| anyhow::anyhow!("Create topic failed: {}", e))?;
            // create_topics reports success or failure per topic; surface
            // per-topic errors instead of silently printing success.
            for result in results {
                if let Err((topic_name, code)) = result {
                    return Err(anyhow::anyhow!("Create topic '{}' failed: {}", topic_name, code));
                }
            }
println!(
"Topic '{}' created successfully with {} partitions (replication factor: {})",
name, partitions, replication_factor
);
Ok(())
}
KafkaTopicCommands::List => {
let admin: AdminClient<_> = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.create()
.map_err(|e| anyhow::anyhow!("Admin client creation failed: {}", e))?;
let metadata = admin
.inner()
.fetch_metadata(None, Duration::from_secs(30))
.map_err(|e| anyhow::anyhow!("Fetch metadata failed: {}", e))?;
println!("Topics:");
for topic in metadata.topics() {
println!(" {} ({} partitions)", topic.name(), topic.partitions().len());
}
Ok(())
}
KafkaTopicCommands::Describe { name } => {
let admin: AdminClient<_> = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.create()
.map_err(|e| anyhow::anyhow!("Admin client creation failed: {}", e))?;
let metadata = admin
.inner()
.fetch_metadata(None, Duration::from_secs(30))
.map_err(|e| anyhow::anyhow!("Fetch metadata failed: {}", e))?;
let topic = metadata
.topics()
.iter()
.find(|t| t.name() == name)
.ok_or_else(|| anyhow::anyhow!("Topic {} not found", name))?;
println!("Topic: {}", topic.name());
println!("Partitions: {}", topic.partitions().len());
for partition in topic.partitions() {
println!(
" Partition {}: Leader={}, Replicas={:?}",
partition.id(),
partition.leader(),
partition.replicas().to_vec()
);
}
Ok(())
}
KafkaTopicCommands::Delete { name } => {
let admin: AdminClient<_> = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.create()
.map_err(|e| anyhow::anyhow!("Admin client creation failed: {}", e))?;
let options = AdminOptions::new().request_timeout(Some(Duration::from_secs(30)));
            let results = admin
                .delete_topics(&[name.as_str()], &options)
                .await
                .map_err(|e| anyhow::anyhow!("Delete topic failed: {}", e))?;
            // delete_topics also reports per-topic results; check them.
            for result in results {
                if let Err((topic_name, code)) = result {
                    return Err(anyhow::anyhow!("Delete topic '{}' failed: {}", topic_name, code));
                }
            }
println!("Topic '{}' deleted successfully", name);
Ok(())
}
}
}
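/// Handle `kafka groups` subcommands.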
async fn execute_groups_command(command: KafkaGroupsCommands) -> Result<()> {
match command {
KafkaGroupsCommands::List => {
let admin: AdminClient<_> = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.create()
.map_err(|e| anyhow::anyhow!("Admin client creation failed: {}", e))?;
let groups = admin
.inner()
.fetch_group_list(None, Duration::from_secs(30))
.map_err(|e| anyhow::anyhow!("List groups failed: {}", e))?;
println!("Consumer Groups:");
for group in groups.groups() {
println!(" {}", group.name());
}
Ok(())
}
KafkaGroupsCommands::Describe { group_id } => {
let admin: AdminClient<_> = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.create()
.map_err(|e| anyhow::anyhow!("Admin client creation failed: {}", e))?;
let groups = admin
.inner()
.fetch_group_list(None, Duration::from_secs(30))
.map_err(|e| anyhow::anyhow!("List groups failed: {}", e))?;
let group = groups
.groups()
.iter()
.find(|g| g.name() == group_id)
.ok_or_else(|| anyhow::anyhow!("Consumer group {} not found", group_id))?;
println!("Consumer Group: {}", group_id);
println!(" State: {}", group.state());
println!(" Protocol: {}", group.protocol());
println!(" Protocol Type: {}", group.protocol_type());
println!(" Members: {}", group.members().len());
Ok(())
}
KafkaGroupsCommands::Offsets { group_id } => {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("group.id", &group_id)
.set("enable.auto.commit", "false")
.create()
.map_err(|e| anyhow::anyhow!("Consumer creation failed: {}", e))?;
let metadata = consumer
.fetch_metadata(None, Duration::from_secs(30))
.map_err(|e| anyhow::anyhow!("Fetch metadata failed: {}", e))?;
let mut tpl = TopicPartitionList::new();
for topic in metadata.topics() {
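                // Skip Kafka-internal topics such as __consumer_offsets.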
if topic.name().starts_with("__") {
continue;
}
for partition in topic.partitions() {
tpl.add_partition(topic.name(), partition.id());
}
}
let committed = consumer
.committed_offsets(tpl, Duration::from_secs(30))
.map_err(|e| anyhow::anyhow!("Failed to fetch committed offsets: {}", e))?;
println!("Consumer group offsets for '{}':", group_id);
for elem in committed.elements() {
let offset = match elem.offset() {
Offset::Offset(v) => v.to_string(),
Offset::Beginning => "beginning".to_string(),
Offset::End => "end".to_string(),
Offset::Stored => "stored".to_string(),
Offset::Invalid => "invalid".to_string(),
Offset::OffsetTail(v) => format!("tail({})", v),
};
println!(" {}[{}] -> {}", elem.topic(), elem.partition(), offset);
}
Ok(())
}
}
}
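/// Handle `kafka fixtures` subcommands.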
async fn execute_fixtures_command(command: KafkaFixturesCommands) -> Result<()> {
match command {
KafkaFixturesCommands::Load { directory } => {
if !directory.exists() {
return Err(anyhow::anyhow!("Directory does not exist: {}", directory.display()));
}
if !directory.is_dir() {
return Err(anyhow::anyhow!("Path is not a directory: {}", directory.display()));
}
match KafkaFixture::load_from_dir(&directory) {
Ok(fixtures) => {
if fixtures.is_empty() {
println!("No fixture files found in {}", directory.display());
println!("Fixture files should be YAML files (.yaml or .yml) containing KafkaFixture definitions.");
return Ok(());
}
println!(
"Successfully loaded {} fixtures from {}",
fixtures.len(),
directory.display()
);
let mut topics = std::collections::HashSet::new();
let mut auto_produce_count = 0;
for fixture in &fixtures {
topics.insert(&fixture.topic);
if fixture.auto_produce.as_ref().is_some_and(|ap| ap.enabled) {
auto_produce_count += 1;
}
}
println!("Fixtures cover {} unique topics", topics.len());
if auto_produce_count > 0 {
println!("{} fixtures have auto-produce enabled", auto_produce_count);
}
println!("\nFixtures loaded:");
for fixture in &fixtures {
println!(" ✓ {} ({})", fixture.identifier, fixture.name);
}
println!(
"\nNote: Fixtures are loaded for validation. In a running mock broker,"
);
println!(
"these would be available for message generation and auto-production."
);
Ok(())
}
Err(e) => Err(anyhow::anyhow!(
"Failed to load fixtures from {}: {}",
directory.display(),
e
)),
}
}
KafkaFixturesCommands::List => {
let fixture_dirs = vec![
std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")),
PathBuf::from("./fixtures"),
PathBuf::from("./kafka-fixtures"),
];
let mut all_fixtures = Vec::new();
let mut found_dirs = Vec::new();
for dir in fixture_dirs {
if dir.exists() && dir.is_dir() {
match KafkaFixture::load_from_dir(&dir) {
Ok(fixtures) => {
if !fixtures.is_empty() {
all_fixtures.extend(fixtures);
found_dirs.push(dir);
}
}
Err(e) => {
tracing::warn!("Failed to load fixtures from {}: {}", dir.display(), e);
}
}
}
}
if all_fixtures.is_empty() {
println!(
"No fixtures found. Checked directories: ./, ./fixtures, ./kafka-fixtures"
);
println!("Create YAML fixture files in one of these directories to define message templates.");
return Ok(());
}
println!(
"Found {} fixtures in {} director{}:",
all_fixtures.len(),
found_dirs.len(),
if found_dirs.len() == 1 { "y" } else { "ies" }
);
for dir in &found_dirs {
println!(" {}", dir.display());
}
println!("\nFixtures:");
for fixture in &all_fixtures {
println!(" {}: {}", fixture.identifier, fixture.name);
println!(" Topic: {}", fixture.topic);
println!(
" Partition: {}",
fixture.partition.map_or("all".to_string(), |p| p.to_string())
);
if let Some(auto_produce) = &fixture.auto_produce {
if auto_produce.enabled {
println!(" Auto-produce: {} msg/sec", auto_produce.rate_per_second);
if let Some(duration) = auto_produce.duration_seconds {
println!(" Duration: {} seconds", duration);
}
if let Some(count) = auto_produce.total_count {
println!(" Total count: {} messages", count);
}
} else {
println!(" Auto-produce: disabled");
}
} else {
println!(" Auto-produce: not configured");
}
println!(" Headers: {}", fixture.headers.len());
println!();
}
Ok(())
}
KafkaFixturesCommands::StartAutoProduce => {
println!("Auto-produce start requested - ensure fixtures have auto_produce.enabled = true in their configuration");
Ok(())
}
KafkaFixturesCommands::StopAutoProduce => {
println!("Auto-produce stop requested - this would disable auto_produce for all running fixtures");
Ok(())
}
}
}
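/// Handle `kafka simulate` subcommands.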
async fn execute_simulate_command(command: KafkaSimulateCommands) -> Result<()> {
match command {
KafkaSimulateCommands::Lag { group, topic, lag } => {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("group.id", "lag-simulator")
.set("enable.auto.commit", "false")
.create()
.map_err(|e| anyhow::anyhow!("Consumer creation failed: {}", e))?;
let metadata = consumer
.fetch_metadata(Some(topic.as_str()), Duration::from_secs(30))
.map_err(|e| anyhow::anyhow!("Fetch metadata failed: {}", e))?;
let topic_metadata = metadata
.topics()
.iter()
.find(|t| t.name() == topic)
.ok_or_else(|| anyhow::anyhow!("Topic {} not found", topic))?;
let mut tpl = TopicPartitionList::new();
for partition in topic_metadata.partitions() {
let (low_watermark, high_watermark) = consumer
.fetch_watermarks(&topic, partition.id(), Duration::from_secs(30))
.map_err(|e| {
anyhow::anyhow!(
"Fetch watermarks failed for partition {}: {}",
partition.id(),
e
)
})?;
                let target_offset = if lag >= 0 {
                    // Clamp to the low watermark so the committed offset can
                    // never fall below the start of the log or go negative
                    // (saturating_sub alone did not prevent negative values).
                    (high_watermark - lag).max(low_watermark)
                } else {
                    low_watermark
                };
let _ =
tpl.add_partition_offset(&topic, partition.id(), Offset::Offset(target_offset));
}
let group_consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("group.id", &group)
.set("enable.auto.commit", "false")
.create()
.map_err(|e| anyhow::anyhow!("Consumer creation failed: {}", e))?;
group_consumer
.commit(&tpl, CommitMode::Sync)
.map_err(|e| anyhow::anyhow!("Failed to commit lagged offsets: {}", e))?;
println!(
"Simulated lag of {} messages for group {} on topic {} (set offsets behind high watermark)",
lag, group, topic
);
Ok(())
}
KafkaSimulateCommands::Rebalance { group } => {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("group.id", &group)
.set("enable.auto.commit", "false")
.create()
.map_err(|e| anyhow::anyhow!("Consumer creation failed: {}", e))?;
let metadata = consumer
.fetch_metadata(None, Duration::from_secs(30))
.map_err(|e| anyhow::anyhow!("Fetch metadata failed: {}", e))?;
let topics: Vec<&str> = metadata
.topics()
.iter()
.map(|t| t.name())
.filter(|name| !name.starts_with("__"))
.collect();
if topics.is_empty() {
return Err(anyhow::anyhow!(
"No non-internal topics available to trigger rebalance"
));
}
consumer
.subscribe(&topics)
.map_err(|e| anyhow::anyhow!("Subscribe failed: {}", e))?;
            // A consumer only joins the group (and thus triggers a rebalance
            // for existing members) while it is being polled; a bare sleep
            // never sends the join request, so drive the consumer briefly
            // before leaving again.
            let _ = tokio::time::timeout(Duration::from_millis(500), consumer.recv()).await;
            consumer.unsubscribe();
println!(
"Triggered rebalance probe for group {} by joining/leaving topics: {}",
group,
topics.join(", ")
);
Ok(())
}
KafkaSimulateCommands::ResetOffsets { group, topic, to } => {
let admin: AdminClient<_> = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.create()
.map_err(|e| anyhow::anyhow!("Admin client creation failed: {}", e))?;
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9092")
.set("group.id", &group)
.set("enable.auto.commit", "false")
.create()
.map_err(|e| anyhow::anyhow!("Consumer creation failed: {}", e))?;
let metadata = admin
.inner()
.fetch_metadata(Some(topic.as_str()), Duration::from_secs(30))
.map_err(|e| anyhow::anyhow!("Fetch metadata failed: {}", e))?;
let topic_metadata = metadata
.topics()
.iter()
.find(|t| t.name() == topic)
.ok_or_else(|| anyhow::anyhow!("Topic {} not found", topic))?;
let mut tpl = TopicPartitionList::new();
for partition in topic_metadata.partitions() {
let (low_watermark, high_watermark) = consumer
.fetch_watermarks(&topic, partition.id(), Duration::from_secs(30))
.map_err(|e| {
anyhow::anyhow!(
"Fetch watermarks failed for partition {}: {}",
partition.id(),
e
)
})?;
let target_offset = match to.as_str() {
"earliest" => low_watermark,
"latest" => high_watermark,
offset_str => offset_str
.parse()
.map_err(|_| anyhow::anyhow!("Invalid offset: {}", offset_str))?,
};
let _ =
tpl.add_partition_offset(&topic, partition.id(), Offset::Offset(target_offset));
}
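            // As with lag simulation, out-of-band offset commits are only
            // accepted while the group has no active members.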
consumer
.commit(&tpl, CommitMode::Sync)
.map_err(|e| anyhow::anyhow!("Failed to commit offsets: {}", e))?;
println!("Successfully reset offsets for group {} on topic {} to {}", group, topic, to);
Ok(())
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_kafka_commands_metrics_variant() {
let _cmd = KafkaCommands::Metrics {
format: "text".to_string(),
};
}
#[test]
fn test_kafka_commands_serve_variant() {
let _cmd = KafkaCommands::Serve {
port: 9092,
host: "127.0.0.1".to_string(),
config: None,
};
}
#[test]
fn test_kafka_commands_produce_variant() {
let _cmd = KafkaCommands::Produce {
topic: "test-topic".to_string(),
key: Some("key1".to_string()),
value: "test message".to_string(),
partition: None,
header: vec![],
};
}
#[test]
fn test_kafka_commands_produce_with_partition() {
let _cmd = KafkaCommands::Produce {
topic: "test-topic".to_string(),
key: None,
value: "message".to_string(),
partition: Some(2),
header: vec!["header1:value1".to_string()],
};
}
#[test]
fn test_kafka_topic_create_variant() {
let _cmd = KafkaTopicCommands::Create {
name: "new-topic".to_string(),
partitions: 3,
replication_factor: 1,
};
}
#[test]
fn test_kafka_topic_list_variant() {
let _cmd = KafkaTopicCommands::List;
}
#[test]
fn test_kafka_groups_list_variant() {
let _cmd = KafkaGroupsCommands::List;
}
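    // Additional smoke tests in the same spirit as those above, covering the
    // consume and simulate variants.
    #[test]
    fn test_kafka_commands_consume_variant() {
        let _cmd = KafkaCommands::Consume {
            topic: "test-topic".to_string(),
            group: None,
            partition: Some(0),
            from: "earliest".to_string(),
            count: Some(10),
        };
    }
    #[test]
    fn test_kafka_simulate_lag_variant() {
        let _cmd = KafkaSimulateCommands::Lag {
            group: "test-group".to_string(),
            topic: "test-topic".to_string(),
            lag: 100,
        };
    }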
}