pub mod kafka_client;
pub mod kafka_consumer;
pub mod with_consumer;
pub mod with_kafka;
pub use kafka_client::*;
pub use kafka_consumer::*;
pub use with_consumer::*;
pub use with_kafka::*;
/// Kafka connection settings that can be deserialized with serde.
///
/// Every field is an `Option`, so a deserialized value may have any of them
/// unset. `KafkaSettings::get` fills unset fields from the `Default` impl;
/// `bootstrap_servers()`, `seeds()` and the `Display` impl unwrap
/// `bootstrap_servers` directly, so resolve a value through `get` before
/// using them.
#[derive(serde::Deserialize, Clone, Debug)]
pub struct KafkaSettings {
/// Broker address string; `Default` supplies `"127.0.0.1:9092"`.
/// NOTE(review): presumably a comma-separated `host:port` list in the usual
/// Kafka client format — confirm against the code that consumes it.
bootstrap_servers: Option<String>,
/// Message timeout; `Default` supplies `5000`.
/// NOTE(review): unit is presumably milliseconds, going by the field name;
/// the consumer of this value is not visible here — confirm.
message_timeout_ms: Option<u32>,
/// Queue timeout; `Default` supplies `10`.
/// NOTE(review): unit is presumably seconds, going by the field name;
/// the consumer of this value is not visible here — confirm.
queue_timeout_secs: Option<u32>,
}
impl Default for KafkaSettings {
fn default() -> Self {
Self {
bootstrap_servers: Some("127.0.0.1:9092".to_string()),
message_timeout_ms: Some(5000),
queue_timeout_secs: Some(10),
}
}
}
impl KafkaSettings {
    /// Returns the configured broker address string.
    ///
    /// If the field is unset (for example, it was absent from the
    /// deserialized input and the value was not resolved through
    /// [`KafkaSettings::get`]), the address from the `Default` impl is
    /// returned instead of panicking.
    pub fn bootstrap_servers(&self) -> String {
        self.bootstrap_servers
            .clone()
            // Only build the defaults when the field is actually missing.
            .or_else(|| Self::default().bootstrap_servers)
            // Unreachable while `Default` populates the field; keeps this total.
            .unwrap_or_default()
    }

    /// Alias for [`KafkaSettings::bootstrap_servers`]; returns the same string.
    pub fn seeds(&self) -> String {
        self.bootstrap_servers()
    }

    /// Resolves optional settings into a value with defaults filled in.
    ///
    /// * `None` yields `Self::default()`.
    /// * `Some(s)` yields a copy of `s` in which each unset field is replaced
    ///   by the corresponding field of `Self::default()`; fields that are set
    ///   are kept as given.
    pub fn get(settings: &Option<KafkaSettings>) -> Self {
        let defaults = Self::default();
        match settings {
            None => defaults,
            Some(given) => Self {
                bootstrap_servers: given.bootstrap_servers.clone().or(defaults.bootstrap_servers),
                message_timeout_ms: given.message_timeout_ms.or(defaults.message_timeout_ms),
                queue_timeout_secs: given.queue_timeout_secs.or(defaults.queue_timeout_secs),
            },
        }
    }
}
impl std::fmt::Display for KafkaSettings {
    /// Writes the broker address string and nothing else.
    ///
    /// If `bootstrap_servers` is unset, the address from the `Default` impl is
    /// written instead, so formatting a partially deserialized value does not
    /// panic. The text is emitted with `write_str`, so formatter width and
    /// padding flags are ignored.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Borrow the configured value; no clone is needed just to print it.
        if let Some(servers) = self.bootstrap_servers.as_deref() {
            return f.write_str(servers);
        }
        let fallback = Self::default().bootstrap_servers;
        f.write_str(fallback.as_deref().unwrap_or(""))
    }
}