use crate::clients::client::IggyClient;
use crate::clients::producer::IggyProducer;
use crate::clients::producer_config::DirectConfig;
use crate::prelude::{IggyError, IggyExpiry, MaxTopicSize};
use crate::stream_builder::IggyProducerConfig;
use tracing::{error, trace};
pub(crate) async fn build_iggy_producer(
client: &IggyClient,
config: &IggyProducerConfig,
) -> Result<IggyProducer, IggyError> {
trace!("Extract config fields.");
let stream = config.stream_name();
let topic = config.topic_name();
let topic_partitions_count = config.topic_partitions_count();
let topic_replication_factor = config.topic_replication_factor();
let batch_length = config.batch_length();
let linger_time = config.linger_time();
let partitioning = config.partitioning().to_owned();
let send_retries = config.send_retries_count();
let send_retries_interval = config.send_retries_interval();
trace!("Build iggy producer");
let mut builder = client
.producer(stream, topic)?
.partitioning(partitioning)
.create_stream_if_not_exists()
.send_retries(send_retries, send_retries_interval)
.create_topic_if_not_exists(
topic_partitions_count,
topic_replication_factor,
IggyExpiry::ServerDefault,
MaxTopicSize::ServerDefault,
)
.direct(
DirectConfig::builder()
.batch_length(batch_length)
.linger_time(linger_time)
.build(),
);
if let Some(encryptor) = config.encryptor() {
builder = builder.encryptor(encryptor);
}
trace!("Initialize iggy producer");
let producer = builder.build();
producer.init().await.map_err(|err| {
error!("Failed to initialize consumer: {err}");
err
})?;
Ok(producer)
}