use super::DEFAULT_BROKER;
use crate::Result;
use clap::Subcommand;
use tansu_cat::Cat;
use tansu_sans_io::ErrorCode;
use url::Url;
#[derive(Clone, Debug, Subcommand)]
pub(super) enum Command {
Produce {
#[arg(long, default_value = DEFAULT_BROKER, env = "ADVERTISED_LISTENER_URL")]
broker: Url,
#[clap(value_parser)]
topic: String,
#[clap(value_parser, default_value = "-")]
file: String,
#[arg(long, default_value = "0")]
partition: i32,
#[arg(long, env = "SCHEMA_REGISTRY")]
schema_registry: Option<Url>,
},
Consume {
#[arg(long, default_value = DEFAULT_BROKER, env = "ADVERTISED_LISTENER_URL")]
broker: Url,
#[clap(value_parser)]
topic: String,
#[arg(long, default_value = "0")]
partition: i32,
#[arg(long, env = "SCHEMA_REGISTRY")]
schema_registry: Option<Url>,
#[arg(long, default_value = "5000")]
max_wait_time_ms: i32,
#[arg(long, default_value = "1")]
min_bytes: i32,
#[arg(long, default_value = "52428800")]
max_bytes: Option<i32>,
#[arg(long, default_value = "0")]
fetch_offset: i64,
#[arg(long, default_value = "1048576")]
partition_max_bytes: i32,
},
}
impl From<Command> for Cat {
fn from(value: Command) -> Self {
match value {
Command::Produce {
broker,
topic,
file,
partition,
schema_registry,
} => Cat::produce()
.broker(broker)
.topic(topic)
.partition(partition)
.schema_registry(schema_registry)
.file_name(file)
.build(),
Command::Consume {
broker,
topic,
partition,
schema_registry,
max_wait_time_ms,
min_bytes,
max_bytes,
fetch_offset,
partition_max_bytes,
} => Cat::consume()
.broker(broker)
.topic(topic)
.partition(partition)
.schema_registry(schema_registry)
.max_wait_time_ms(max_wait_time_ms)
.min_bytes(min_bytes)
.max_bytes(max_bytes)
.fetch_offset(fetch_offset)
.partition_max_bytes(partition_max_bytes)
.build(),
}
}
}
impl Command {
pub(super) async fn main(self) -> Result<ErrorCode> {
Cat::from(self).main().await.map_err(Into::into)
}
}