use crate::prelude::{Identifier, IggyDuration, IggyError};
use crate::stream_builder::{IggyConsumerConfig, IggyProducerConfig};
use bon::Builder;
#[derive(Builder, Default, Debug, Clone)]
pub struct IggyStreamConfig {
consumer_config: IggyConsumerConfig,
producer_config: IggyProducerConfig,
}
impl IggyStreamConfig {
pub fn new(consumer_config: IggyConsumerConfig, producer_config: IggyProducerConfig) -> Self {
Self {
consumer_config,
producer_config,
}
}
pub fn from_stream_topic(
stream: &str,
topic: &str,
batch_length: u32,
linger_time: IggyDuration,
polling_interval: IggyDuration,
) -> Result<Self, IggyError> {
let consumer_config =
IggyConsumerConfig::from_stream_topic(stream, topic, batch_length, polling_interval)?;
let producer_config =
IggyProducerConfig::from_stream_topic(stream, topic, batch_length, linger_time)?;
Ok(Self {
consumer_config,
producer_config,
})
}
}
impl IggyStreamConfig {
pub fn consumer_config(&self) -> &IggyConsumerConfig {
&self.consumer_config
}
pub fn producer_config(&self) -> &IggyProducerConfig {
&self.producer_config
}
pub fn stream_id(&self) -> &Identifier {
self.producer_config.stream_id()
}
pub fn stream_name(&self) -> &str {
self.producer_config.stream_name()
}
pub fn topic_id(&self) -> &Identifier {
self.producer_config.topic_id()
}
pub fn topic_name(&self) -> &str {
self.producer_config.topic_name()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn should_be_equal() {
let consumer_config = IggyConsumerConfig::from_stream_topic(
"test_stream",
"test_topic",
100,
IggyDuration::from_str("5ms").unwrap(),
)
.unwrap();
let producer_config = IggyProducerConfig::from_stream_topic(
"test_stream",
"test_topic",
100,
IggyDuration::from_str("5ms").unwrap(),
)
.unwrap();
let config = IggyStreamConfig::new(consumer_config, producer_config);
assert_eq!(config.stream_name(), "test_stream");
assert_eq!(config.topic_name(), "test_topic");
assert_eq!(config.consumer_config().batch_length(), 100);
assert_eq!(config.producer_config().batch_length(), 100);
assert_eq!(
config.consumer_config().polling_interval(),
IggyDuration::from_str("5ms").unwrap()
);
assert_eq!(
config.producer_config().linger_time(),
IggyDuration::from_str("5ms").unwrap()
);
}
#[test]
fn should_be_default() {
let config = IggyStreamConfig::default();
assert_eq!(config.stream_name(), "test_stream");
assert_eq!(config.topic_name(), "test_topic");
assert_eq!(config.consumer_config().batch_length(), 100);
assert_eq!(config.producer_config().batch_length(), 100);
assert_eq!(
config.consumer_config().polling_interval(),
IggyDuration::from_str("5ms").unwrap()
);
assert_eq!(
config.producer_config().linger_time(),
IggyDuration::from_str("5ms").unwrap()
);
}
#[test]
fn should_be_from_stream_topic() {
let res = IggyStreamConfig::from_stream_topic(
"test_stream",
"test_topic",
100,
IggyDuration::from_str("5ms").unwrap(),
IggyDuration::from_str("5ms").unwrap(),
);
assert!(res.is_ok());
let config = res.unwrap();
assert_eq!(config.stream_name(), "test_stream");
assert_eq!(config.topic_name(), "test_topic");
assert_eq!(config.consumer_config().batch_length(), 100);
assert_eq!(config.producer_config().batch_length(), 100);
assert_eq!(
config.consumer_config().polling_interval(),
IggyDuration::from_str("5ms").unwrap()
);
assert_eq!(
config.producer_config().linger_time(),
IggyDuration::from_str("5ms").unwrap()
);
}
}