use std::sync::Arc;
use thiserror::Error;
use crate::{
backoff::BackoffConfig,
build_info::DEFAULT_CLIENT_ID,
client::partition::PartitionClient,
connection::{BrokerConnector, MetadataLookupMode, TlsConfig},
protocol::primitives::Boolean,
topic::Topic,
};
pub mod consumer;
pub mod controller;
pub mod error;
pub(crate) mod metadata_cache;
pub mod partition;
pub mod producer;
use error::{Error, Result};
use self::{controller::ControllerClient, partition::UnknownTopicHandling};
pub use crate::connection::{Credentials, OauthBearerCredentials, OauthCallback, SaslConfig};
#[derive(Debug, Error)]
pub enum ProduceError {
#[error("Broker error: {0}")]
BrokerError(#[from] crate::connection::Error),
#[error("Request error: {0}")]
RequestError(#[from] crate::messenger::RequestError),
#[error("Got duplicate results for topic '{topic}' and partition {partition}")]
DuplicateResult { topic: String, partition: i32 },
#[error("No result for record {index}")]
NoResult { index: usize },
}
pub struct ClientBuilder {
bootstrap_brokers: Vec<String>,
client_id: Option<Arc<str>>,
max_message_size: usize,
socks5_proxy: Option<String>,
tls_config: TlsConfig,
sasl_config: Option<SaslConfig>,
backoff_config: Arc<BackoffConfig>,
}
impl ClientBuilder {
pub fn new(bootstrap_brokers: Vec<String>) -> Self {
Self {
bootstrap_brokers,
client_id: None,
max_message_size: 100 * 1024 * 1024, socks5_proxy: None,
tls_config: TlsConfig::default(),
sasl_config: None,
backoff_config: Default::default(),
}
}
pub fn client_id(mut self, client_id: impl Into<Arc<str>>) -> Self {
self.client_id = Some(client_id.into());
self
}
pub fn max_message_size(mut self, max_message_size: usize) -> Self {
self.max_message_size = max_message_size;
self
}
pub fn backoff_config(mut self, backoff_config: BackoffConfig) -> Self {
self.backoff_config = Arc::from(backoff_config);
self
}
#[cfg(feature = "transport-socks5")]
pub fn socks5_proxy(mut self, proxy: String) -> Self {
self.socks5_proxy = Some(proxy);
self
}
#[cfg(feature = "transport-tls")]
pub fn tls_config(mut self, tls_config: Arc<rustls::ClientConfig>) -> Self {
self.tls_config = Some(tls_config);
self
}
pub fn sasl_config(mut self, sasl_config: SaslConfig) -> Self {
self.sasl_config = Some(sasl_config);
self
}
pub async fn build(self) -> Result<Client> {
let brokers = Arc::new(BrokerConnector::new(
self.bootstrap_brokers,
self.client_id
.unwrap_or_else(|| Arc::from(DEFAULT_CLIENT_ID)),
self.tls_config,
self.socks5_proxy,
self.sasl_config,
self.max_message_size,
Arc::clone(&self.backoff_config),
));
brokers.refresh_metadata().await?;
Ok(Client {
brokers,
backoff_config: self.backoff_config,
})
}
}
impl std::fmt::Debug for ClientBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ClientBuilder").finish_non_exhaustive()
}
}
#[derive(Debug)]
pub struct Client {
brokers: Arc<BrokerConnector>,
backoff_config: Arc<BackoffConfig>,
}
impl Client {
pub fn controller_client(&self) -> Result<ControllerClient> {
Ok(ControllerClient::new(
Arc::clone(&self.brokers),
Arc::clone(&self.backoff_config),
))
}
pub async fn partition_client(
&self,
topic: impl Into<String> + Send,
partition: i32,
unknown_topic_handling: UnknownTopicHandling,
) -> Result<PartitionClient> {
PartitionClient::new(
topic.into(),
partition,
Arc::clone(&self.brokers),
unknown_topic_handling,
Arc::clone(&self.backoff_config),
)
.await
}
pub async fn list_topics(&self) -> Result<Vec<Topic>> {
let (response, _gen) = self
.brokers
.request_metadata(&MetadataLookupMode::ArbitraryBroker, None)
.await?;
Ok(response
.topics
.into_iter()
.filter(|t| !matches!(t.is_internal, Some(Boolean(true))))
.map(|t| Topic {
name: t.name.0,
partitions: t
.partitions
.into_iter()
.map(|p| p.partition_index.0)
.collect(),
})
.collect())
}
}