use crate::prelude::{EncryptorKind, Identifier, IggyDuration, IggyError, Partitioning};
use bon::Builder;
use std::str::FromStr;
use std::sync::Arc;
/// Configuration values for a producer bound to a single stream and topic.
///
/// Instances can be created through the generated `bon` builder
/// (`IggyProducerConfig::builder()`), where `String` members accept anything
/// that converts `Into<String>`, or through the constructors defined on the type.
#[derive(Builder, Debug, Clone)]
#[builder(on(String, into))]
pub struct IggyProducerConfig {
/// Identifier of the target stream.
stream_id: Identifier,
/// Name of the target stream.
stream_name: String,
/// Identifier of the target topic.
topic_id: Identifier,
/// Name of the target topic.
topic_name: String,
/// Number of partitions for the topic.
/// NOTE(review): presumably used when the topic has to be created — confirm with the producer.
topic_partitions_count: u32,
/// Optional replication factor for the topic; `None` leaves it unspecified.
topic_replication_factor: Option<u8>,
/// Batch length setting.
/// NOTE(review): presumably the maximum number of messages per batch — confirm with the producer.
batch_length: u32,
/// Linger time setting.
/// NOTE(review): presumably how long the producer waits before flushing a batch — confirm.
linger_time: IggyDuration,
/// Partitioning strategy applied to sent messages.
partitioning: Partitioning,
/// Optional number of send retries; `None` leaves it unspecified.
send_retries_count: Option<u32>,
/// Optional interval between send retries; `None` leaves it unspecified.
send_retries_interval: Option<IggyDuration>,
/// Optional shared encryptor; `None` means no encryptor is configured.
encryptor: Option<Arc<EncryptorKind>>,
}
impl Default for IggyProducerConfig {
    /// Builds a configuration that targets the `test_stream` stream and the
    /// `test_topic` topic.
    ///
    /// The remaining values are: batch length 100, linger time `5ms`, balanced
    /// partitioning, 1 topic partition, no replication factor, no encryptor,
    /// 3 send retries and a 1 second retry interval.
    ///
    /// # Panics
    ///
    /// Panics if the hard-coded names cannot be turned into identifiers or if
    /// the hard-coded `5ms` duration fails to parse.
    fn default() -> Self {
        const STREAM: &str = "test_stream";
        const TOPIC: &str = "test_topic";

        // Field expressions are evaluated top to bottom, so the fallible calls
        // run in the order: stream id, topic id, linger time.
        Self {
            stream_id: Identifier::from_str_value(STREAM).unwrap(),
            stream_name: STREAM.to_owned(),
            topic_id: Identifier::from_str_value(TOPIC).unwrap(),
            topic_name: TOPIC.to_owned(),
            topic_partitions_count: 1,
            topic_replication_factor: None,
            batch_length: 100,
            linger_time: IggyDuration::from_str("5ms").unwrap(),
            partitioning: Partitioning::balanced(),
            send_retries_count: Some(3),
            send_retries_interval: Some(IggyDuration::new_from_secs(1)),
            encryptor: None,
        }
    }
}
impl IggyProducerConfig {
    /// Creates a configuration from explicitly supplied values.
    ///
    /// Every argument is stored as given; nothing is validated or defaulted.
    // One argument per field mirrors the struct, hence the long signature.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        stream_id: Identifier,
        stream_name: String,
        topic_id: Identifier,
        topic_name: String,
        topic_partitions_count: u32,
        topic_replication_factor: Option<u8>,
        batch_length: u32,
        linger_time: IggyDuration,
        partitioning: Partitioning,
        encryptor: Option<Arc<EncryptorKind>>,
        send_retries_count: Option<u32>,
        send_retries_interval: Option<IggyDuration>,
    ) -> Self {
        // Listed in struct declaration order.
        Self {
            stream_id,
            stream_name,
            topic_id,
            topic_name,
            topic_partitions_count,
            topic_replication_factor,
            batch_length,
            linger_time,
            partitioning,
            send_retries_count,
            send_retries_interval,
            encryptor,
        }
    }

    /// Creates a configuration from a stream name and a topic name.
    ///
    /// Both identifiers are derived from the given names. The remaining values
    /// are fixed: balanced partitioning, 1 topic partition, no replication
    /// factor, no encryptor, 3 send retries and a 1 second retry interval.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Identifier::from_str_value` when either
    /// name is rejected; the stream name is checked first.
    pub fn from_stream_topic(
        stream: &str,
        topic: &str,
        batch_length: u32,
        linger_time: IggyDuration,
    ) -> Result<Self, IggyError> {
        let stream_identifier = Identifier::from_str_value(stream)?;
        let topic_identifier = Identifier::from_str_value(topic)?;

        Ok(Self::new(
            stream_identifier,
            stream.to_owned(),
            topic_identifier,
            topic.to_owned(),
            1,    // topic_partitions_count
            None, // topic_replication_factor
            batch_length,
            linger_time,
            Partitioning::balanced(),
            None,                                 // encryptor
            Some(3),                              // send_retries_count
            Some(IggyDuration::new_from_secs(1)), // send_retries_interval
        ))
    }
}
impl IggyProducerConfig {
    /// Returns a reference to the stream identifier.
    pub fn stream_id(&self) -> &Identifier {
        &self.stream_id
    }

    /// Returns the stream name.
    pub fn stream_name(&self) -> &str {
        self.stream_name.as_str()
    }

    /// Returns a reference to the topic identifier.
    pub fn topic_id(&self) -> &Identifier {
        &self.topic_id
    }

    /// Returns the topic name.
    pub fn topic_name(&self) -> &str {
        self.topic_name.as_str()
    }

    /// Returns the configured batch length.
    pub fn batch_length(&self) -> u32 {
        self.batch_length
    }

    /// Returns the configured linger time.
    pub fn linger_time(&self) -> IggyDuration {
        self.linger_time
    }

    /// Returns a reference to the partitioning strategy.
    pub fn partitioning(&self) -> &Partitioning {
        &self.partitioning
    }

    /// Returns the configured number of topic partitions.
    pub fn topic_partitions_count(&self) -> u32 {
        self.topic_partitions_count
    }

    /// Returns the topic replication factor, if one was set.
    pub fn topic_replication_factor(&self) -> Option<u8> {
        self.topic_replication_factor
    }

    /// Returns a new handle to the shared encryptor, if one was set.
    ///
    /// Only the `Arc` is cloned; the encryptor itself is not copied.
    pub fn encryptor(&self) -> Option<Arc<EncryptorKind>> {
        self.encryptor.as_ref().map(Arc::clone)
    }

    /// Returns the number of send retries, if one was set.
    pub fn send_retries_count(&self) -> Option<u32> {
        self.send_retries_count
    }

    /// Returns the interval between send retries, if one was set.
    pub fn send_retries_interval(&self) -> Option<IggyDuration> {
        self.send_retries_interval
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder must store every supplied value and leave unset optional
    /// members as `None`.
    #[test]
    fn should_be_equal() {
        let stream = "test_stream";
        let topic = "test_topic";

        let config = IggyProducerConfig::builder()
            .stream_id(Identifier::from_str_value(stream).unwrap())
            .stream_name(stream)
            .topic_id(Identifier::from_str_value(topic).unwrap())
            .topic_name(topic)
            .topic_partitions_count(3)
            .batch_length(100)
            .linger_time(IggyDuration::from_str("5ms").unwrap())
            .partitioning(Partitioning::balanced())
            .send_retries_count(3)
            .send_retries_interval(IggyDuration::new_from_secs(1))
            .build();

        assert_eq!(
            config.stream_id(),
            &Identifier::from_str_value("test_stream").unwrap()
        );
        assert_eq!(config.stream_name(), "test_stream");
        assert_eq!(
            config.topic_id(),
            &Identifier::from_str_value("test_topic").unwrap()
        );
        assert_eq!(config.topic_name(), "test_topic");
        assert_eq!(config.batch_length(), 100);
        assert_eq!(config.linger_time(), IggyDuration::from_str("5ms").unwrap());
        assert_eq!(config.partitioning(), &Partitioning::balanced());
        assert_eq!(config.topic_partitions_count(), 3);
        assert_eq!(config.topic_replication_factor(), None);
        assert_eq!(config.send_retries_count(), Some(3));
        assert_eq!(
            config.send_retries_interval(),
            Some(IggyDuration::new_from_secs(1))
        );
        // The encryptor was never set on the builder.
        assert!(config.encryptor().is_none());
    }

    /// `Default` must produce the documented test stream/topic configuration.
    #[test]
    fn should_be_default() {
        let stream_id = Identifier::from_str_value("test_stream").unwrap();
        let topic_id = Identifier::from_str_value("test_topic").unwrap();

        let config = IggyProducerConfig::default();

        assert_eq!(config.stream_id(), &stream_id);
        assert_eq!(config.stream_name(), "test_stream");
        assert_eq!(config.topic_id(), &topic_id);
        assert_eq!(config.topic_name(), "test_topic");
        assert_eq!(config.batch_length(), 100);
        assert_eq!(config.linger_time(), IggyDuration::from_str("5ms").unwrap());
        assert_eq!(config.partitioning(), &Partitioning::balanced());
        assert_eq!(config.topic_partitions_count(), 1);
        assert_eq!(config.topic_replication_factor(), None);
        assert_eq!(config.send_retries_count(), Some(3));
        assert_eq!(
            config.send_retries_interval(),
            Some(IggyDuration::new_from_secs(1))
        );
        assert!(config.encryptor().is_none());
    }

    /// `new` must store its arguments verbatim, including `None` for all
    /// optional values.
    #[test]
    fn should_be_new() {
        let stream_id = Identifier::from_str_value("test_stream").unwrap();
        let topic_id = Identifier::from_str_value("test_topic").unwrap();

        let config = IggyProducerConfig::new(
            stream_id.clone(),
            String::from("test_stream"),
            topic_id.clone(),
            String::from("test_topic"),
            3,
            None,
            100,
            IggyDuration::from_str("5ms").unwrap(),
            Partitioning::balanced(),
            None,
            None,
            None,
        );

        assert_eq!(config.stream_id(), &stream_id);
        assert_eq!(config.stream_name(), "test_stream");
        assert_eq!(config.topic_id(), &topic_id);
        assert_eq!(config.topic_name(), "test_topic");
        assert_eq!(config.batch_length(), 100);
        assert_eq!(config.linger_time(), IggyDuration::from_str("5ms").unwrap());
        assert_eq!(config.partitioning(), &Partitioning::balanced());
        assert_eq!(config.topic_partitions_count(), 3);
        assert_eq!(config.topic_replication_factor(), None);
        assert_eq!(config.send_retries_count(), None);
        assert_eq!(config.send_retries_interval(), None);
        assert!(config.encryptor().is_none());
    }

    /// `from_stream_topic` must derive identifiers from the names and apply
    /// its fixed defaults, including the retry settings.
    #[test]
    fn should_be_from_stream_topic() {
        let stream_id = Identifier::from_str_value("test_stream").unwrap();
        let topic_id = Identifier::from_str_value("test_topic").unwrap();

        // `expect` both checks for success and yields the value, so a separate
        // `is_ok()` assertion is unnecessary.
        let config = IggyProducerConfig::from_stream_topic(
            "test_stream",
            "test_topic",
            100,
            IggyDuration::from_str("5ms").unwrap(),
        )
        .expect("valid stream and topic names should produce a config");

        assert_eq!(config.stream_id(), &stream_id);
        assert_eq!(config.stream_name(), "test_stream");
        assert_eq!(config.topic_id(), &topic_id);
        assert_eq!(config.topic_name(), "test_topic");
        assert_eq!(config.batch_length(), 100);
        assert_eq!(config.linger_time(), IggyDuration::from_str("5ms").unwrap());
        assert_eq!(config.partitioning(), &Partitioning::balanced());
        assert_eq!(config.topic_partitions_count(), 1);
        assert_eq!(config.topic_replication_factor(), None);
        // Retry defaults set by the constructor were previously unchecked.
        assert_eq!(config.send_retries_count(), Some(3));
        assert_eq!(
            config.send_retries_interval(),
            Some(IggyDuration::new_from_secs(1))
        );
        assert!(config.encryptor().is_none());
    }
}