use std::time::Duration;
use super::DEFAULT_BROKER;
use crate::{EnvVarExp, Result};
use clap::Args;
use human_units::iec::iec_unit;
use tansu_generator::Generate;
use tansu_sans_io::ErrorCode;
use url::Url;
#[iec_unit(symbol = "B/s")]
#[derive(Copy, Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
struct Throughput(pub u32);
#[derive(Args, Clone, Debug)]
pub(super) struct Arg {
#[arg(long, default_value = DEFAULT_BROKER, env = "ADVERTISED_LISTENER_URL")]
broker: EnvVarExp<Url>,
#[clap(value_parser)]
topic: String,
#[arg(long, default_value = "0")]
partition: i32,
#[arg(long, env = "SCHEMA_REGISTRY")]
schema_registry: EnvVarExp<Url>,
#[arg(long, default_value = "1")]
batch_size: u32,
#[clap(long, group = "output")]
per_second: Option<u32>,
#[clap(long, group = "output")]
throughput: Option<Throughput>,
#[arg(long, default_value = "1")]
producers: u32,
#[arg(long)]
duration_seconds: Option<u64>,
#[arg(long, env = "OTEL_EXPORTER_OTLP_ENDPOINT")]
otlp_endpoint_url: Option<EnvVarExp<Url>>,
}
impl Arg {
pub(super) async fn main(self) -> Result<ErrorCode> {
let generate = Generate::builder()
.broker(self.broker.into_inner())
.topic(self.topic)
.partition(self.partition)
.schema_registry(self.schema_registry.into_inner())
.batch_size(self.batch_size)
.per_second(self.per_second)
.throughput(self.throughput.map(|throughput| throughput.0))
.producers(self.producers)
.duration(self.duration_seconds.map(Duration::from_secs))
.otlp_endpoint_url(
self.otlp_endpoint_url
.map(|expression| expression.into_inner()),
)
.build()?;
generate.main().await.map_err(Into::into)
}
}