use std::sync::Arc;
use std::time::Duration;
use tracing::info;
use crate::auth::AuthConfig;
use crate::error::{KrafkaError, Result};
use crate::metadata::{ClusterMetadata, MetadataRecoveryStrategy};
use crate::network::{ConnectionConfig, ConnectionConfigBuilder, ConnectionPool};
/// Handle that bundles a shared [`ConnectionPool`] with shared [`ClusterMetadata`].
///
/// Both fields are held behind `Arc`, so the derived `Clone` only bumps reference
/// counts and every clone refers to the same pool and the same metadata.
#[derive(Clone)]
pub struct KrafkaClient {
/// Connection pool shared by all clones of this client.
pool: Arc<ConnectionPool>,
/// Cluster metadata shared by all clones of this client.
metadata: Arc<ClusterMetadata>,
}
impl KrafkaClient {
    /// Starts a [`KrafkaClientBuilder`] for the given bootstrap server string.
    ///
    /// The returned builder is pre-populated with these defaults:
    ///
    /// - client id `"krafka"`
    /// - no authentication
    /// - request timeout of 30 seconds
    /// - metadata max age of 300 seconds
    /// - [`MetadataRecoveryStrategy::Rebootstrap`] with a 300-second trigger
    /// - topic cache TTL of 300 seconds
    /// - no proxy (only present with the `socks5` feature)
    pub fn builder(bootstrap_servers: impl Into<String>) -> KrafkaClientBuilder {
        // Several metadata-related defaults share the same five-minute value.
        let five_minutes = Duration::from_secs(300);

        KrafkaClientBuilder {
            bootstrap_servers: bootstrap_servers.into(),
            client_id: String::from("krafka"),
            auth: None,
            request_timeout: Duration::from_secs(30),
            metadata_max_age: five_minutes,
            metadata_recovery_strategy: MetadataRecoveryStrategy::Rebootstrap,
            metadata_recovery_rebootstrap_trigger: five_minutes,
            metadata_topic_cache_ttl: Some(five_minutes),
            #[cfg(feature = "socks5")]
            proxy: None,
        }
    }

    /// Returns the shared connection pool held by this client.
    pub fn pool(&self) -> &Arc<ConnectionPool> {
        let Self { pool, .. } = self;
        pool
    }

    /// Returns the shared cluster metadata held by this client.
    pub fn metadata(&self) -> &Arc<ClusterMetadata> {
        let Self { metadata, .. } = self;
        metadata
    }
}
/// Builder that collects the configuration for a [`KrafkaClient`].
///
/// Created by [`KrafkaClient::builder`]. Each setter consumes and returns the
/// builder; the async `build` method turns the collected settings into a client.
#[must_use = "builders do nothing until .build().await is called"]
pub struct KrafkaClientBuilder {
/// Bootstrap server string as supplied by the caller; parsed during `build`.
bootstrap_servers: String,
/// Client id forwarded to the connection configuration.
client_id: String,
/// Optional authentication settings forwarded to the connection configuration.
auth: Option<AuthConfig>,
/// Request timeout forwarded to the connection configuration.
request_timeout: Duration,
/// Maximum metadata age passed to `ClusterMetadata::new`.
metadata_max_age: Duration,
/// Recovery strategy applied to the cluster metadata.
metadata_recovery_strategy: MetadataRecoveryStrategy,
/// Rebootstrap trigger duration applied to the cluster metadata.
metadata_recovery_rebootstrap_trigger: Duration,
/// Topic cache TTL; `None` disables the TTL on the cluster metadata.
metadata_topic_cache_ttl: Option<Duration>,
/// Optional proxy settings forwarded to the connection configuration.
#[cfg(feature = "socks5")]
proxy: Option<crate::network::ProxyConfig>,
}
impl KrafkaClientBuilder {
    /// Sets the client id forwarded to the connection configuration.
    pub fn client_id(mut self, id: impl Into<String>) -> Self {
        self.client_id = id.into();
        self
    }

    /// Sets the authentication configuration, replacing any previously set one.
    pub fn auth(mut self, auth: AuthConfig) -> Self {
        self.auth = Some(auth);
        self
    }

    /// Configures authentication from an OAuth bearer token provider.
    ///
    /// This writes the same setting as [`Self::auth`], so whichever of the two
    /// is called last wins.
    pub fn sasl_oauthbearer_provider(
        mut self,
        provider: impl crate::auth::OAuthBearerTokenProvider + 'static,
    ) -> Self {
        self.auth = Some(AuthConfig::sasl_oauthbearer_provider(provider));
        self
    }

    /// Sets the request timeout forwarded to the connection configuration.
    pub fn request_timeout(mut self, timeout: Duration) -> Self {
        self.request_timeout = timeout;
        self
    }

    /// Sets the maximum metadata age passed to the cluster metadata.
    pub fn metadata_max_age(mut self, duration: Duration) -> Self {
        self.metadata_max_age = duration;
        self
    }

    /// Sets the metadata recovery strategy applied to the cluster metadata.
    pub fn metadata_recovery_strategy(mut self, strategy: MetadataRecoveryStrategy) -> Self {
        self.metadata_recovery_strategy = strategy;
        self
    }

    /// Sets the rebootstrap trigger duration applied to the cluster metadata.
    pub fn metadata_recovery_rebootstrap_trigger(mut self, duration: Duration) -> Self {
        self.metadata_recovery_rebootstrap_trigger = duration;
        self
    }

    /// Enables the topic cache TTL with the given duration.
    pub fn metadata_topic_cache_ttl(mut self, ttl: Duration) -> Self {
        self.metadata_topic_cache_ttl = Some(ttl);
        self
    }

    /// Disables the topic cache TTL.
    pub fn disable_metadata_topic_cache_ttl(mut self) -> Self {
        self.metadata_topic_cache_ttl = None;
        self
    }

    /// Sets the proxy configuration forwarded to the connection configuration.
    #[cfg(feature = "socks5")]
    #[cfg_attr(docsrs, doc(cfg(feature = "socks5")))]
    pub fn proxy(mut self, proxy: crate::network::ProxyConfig) -> Self {
        self.proxy = Some(proxy);
        self
    }

    /// Builds the client: validates the bootstrap servers, creates the
    /// connection pool, and performs an initial metadata refresh.
    ///
    /// # Errors
    ///
    /// Returns a configuration error when `bootstrap_servers` is empty or
    /// contains only whitespace. Also propagates any error from parsing the
    /// bootstrap servers, building the connection configuration, initializing
    /// TLS, or the initial metadata refresh.
    pub async fn build(self) -> Result<KrafkaClient> {
        // A whitespace-only string carries no server address either.
        if self.bootstrap_servers.trim().is_empty() {
            return Err(KrafkaError::config("bootstrap_servers must not be empty"));
        }

        // Validate the bootstrap list before any I/O or pool creation, so that
        // malformed input fails fast and leaves nothing running behind.
        let bootstrap_servers = crate::util::parse_bootstrap_servers(&self.bootstrap_servers)?;

        let mut pool_config_builder: ConnectionConfigBuilder = ConnectionConfig::builder()
            .client_id(&self.client_id)
            .request_timeout(self.request_timeout);
        // `self` is owned, so optional settings are moved rather than cloned.
        if let Some(auth) = self.auth {
            pool_config_builder = pool_config_builder.auth(auth);
        }
        #[cfg(feature = "socks5")]
        if let Some(proxy) = self.proxy {
            pool_config_builder = pool_config_builder.proxy(proxy);
        }

        let mut pool_config = pool_config_builder.build()?;
        pool_config.init_tls().await?;

        let pool = Arc::new(ConnectionPool::new(pool_config));
        // NOTE(review): presumably starts background eviction of idle
        // connections — confirm against `ConnectionPool`.
        pool.start_idle_evictor();

        let meta = ClusterMetadata::new(bootstrap_servers, pool.clone(), self.metadata_max_age)
            .with_recovery_strategy(self.metadata_recovery_strategy)
            .with_rebootstrap_trigger(self.metadata_recovery_rebootstrap_trigger);
        let meta = match self.metadata_topic_cache_ttl {
            Some(ttl) => meta.with_topic_cache_ttl(ttl),
            None => meta.with_topic_cache_ttl_disabled(),
        };

        let metadata = Arc::new(meta);
        // Fetch metadata once up front; a failure here aborts the build.
        metadata.refresh().await?;

        info!(
            bootstrap_servers = %self.bootstrap_servers,
            brokers = metadata.brokers().len(),
            "KrafkaClient initialized"
        );

        Ok(KrafkaClient { pool, metadata })
    }
}