use std::path::Path;
use std::sync::Arc;
use tonic::transport::{Certificate, ClientTlsConfig, Identity, Uri};
use crate::{
auth_service::AuthService,
connection_manager::{ConnectionManager, ConnectionOptions},
consumer::ConsumerBuilder,
errors::Result,
health_check::HealthCheckService,
lookup_service::{LookupResult, LookupService},
producer::ProducerBuilder,
schema_registry_client::SchemaRegistryClient,
};
/// Handle to a Danube deployment.
///
/// Instances are produced by [`DanubeClientBuilder::build`]. The client bundles the
/// parsed service URI with the connection manager and the services built on top of it,
/// and acts as the factory for producers, consumers and the schema-registry client.
#[derive(Debug, Clone)]
pub struct DanubeClient {
    /// Service URI parsed from the URL given to the builder.
    pub(crate) uri: Uri,
    /// Connection manager shared (via `Arc`) with every service created from this client.
    pub(crate) cnx_manager: Arc<ConnectionManager>,
    /// Service used by [`DanubeClient::lookup_topic`].
    pub(crate) lookup_service: LookupService,
    /// Health-check service constructed alongside the client.
    pub(crate) health_check_service: HealthCheckService,
    /// Authentication service; also handed to the other services when they are created.
    pub(crate) auth_service: AuthService,
}
impl DanubeClient {
pub fn builder() -> DanubeClientBuilder {
DanubeClientBuilder::default()
}
pub fn new_producer(&self) -> ProducerBuilder {
ProducerBuilder::new(self)
}
pub fn new_consumer(&self) -> ConsumerBuilder {
ConsumerBuilder::new(self)
}
pub fn schema(&self) -> SchemaRegistryClient {
SchemaRegistryClient::new(
self.cnx_manager.clone(),
self.auth_service.clone(),
self.uri.clone(),
)
}
pub fn auth_service(&self) -> &AuthService {
&self.auth_service
}
pub async fn lookup_topic(&self, addr: &Uri, topic: impl Into<String>) -> Result<LookupResult> {
self.lookup_service.lookup_topic(addr, topic).await
}
}
/// Builder for [`DanubeClient`].
///
/// Collects the service URL, the connection/TLS options and an optional API key.
/// `Debug` is implemented by hand (below) rather than derived so that the API key is
/// never written out in clear text.
#[derive(Clone, Default)]
pub struct DanubeClientBuilder {
    /// Service URL exactly as supplied by the caller; it is parsed into a `Uri` only
    /// when the client is built.
    uri: String,
    /// Connection settings (TLS configuration and flags) handed to the connection manager.
    connection_options: ConnectionOptions,
    /// API key used to authenticate the client at build time, if one was provided.
    api_key: Option<String>,
}

// A derived `Debug` would print the API key verbatim, so any `{:?}` of the builder in
// logs or panic messages would leak the credential. The struct and field names match
// what the derive would emit; only the key's value is masked.
impl std::fmt::Debug for DanubeClientBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DanubeClientBuilder")
            .field("uri", &self.uri)
            .field("connection_options", &self.connection_options)
            // Still distinguishes `Some` from `None` without exposing the secret.
            .field("api_key", &self.api_key.as_ref().map(|_| "<redacted>"))
            .finish()
    }
}
impl DanubeClientBuilder {
    /// Sets the service URL the client will connect to.
    ///
    /// The string is stored as-is; it is validated only when [`Self::build`] parses it.
    pub fn service_url(self, url: impl Into<String>) -> Self {
        Self {
            uri: url.into(),
            ..self
        }
    }

    /// Enables TLS, using the PEM file at `ca_cert` as the CA certificate.
    ///
    /// Any TLS configuration set earlier on this builder is replaced.
    ///
    /// # Errors
    ///
    /// Returns an error if the file at `ca_cert` cannot be read.
    pub fn with_tls(mut self, ca_cert: impl AsRef<Path>) -> Result<Self> {
        let ca_pem = std::fs::read(ca_cert)?;
        let trust_root = Certificate::from_pem(ca_pem);

        let options = &mut self.connection_options;
        options.tls_config = Some(ClientTlsConfig::new().ca_certificate(trust_root));
        options.use_tls = true;
        Ok(self)
    }

    /// Enables mutual TLS: `ca_cert` is the CA certificate, while `client_cert` and
    /// `client_key` form the client identity. All three are paths to PEM files.
    ///
    /// Any TLS configuration set earlier on this builder is replaced.
    ///
    /// # Errors
    ///
    /// Returns an error if any of the three files cannot be read. The files are read in
    /// parameter order, so the first unreadable one determines the error.
    pub fn with_mtls(
        mut self,
        ca_cert: impl AsRef<Path>,
        client_cert: impl AsRef<Path>,
        client_key: impl AsRef<Path>,
    ) -> Result<Self> {
        let ca_pem = std::fs::read(ca_cert)?;
        let cert_pem = std::fs::read(client_cert)?;
        let key_pem = std::fs::read(client_key)?;

        let trust_root = Certificate::from_pem(ca_pem);
        let client_identity = Identity::from_pem(cert_pem, key_pem);

        let options = &mut self.connection_options;
        options.tls_config = Some(
            ClientTlsConfig::new()
                .ca_certificate(trust_root)
                .identity(client_identity),
        );
        options.use_tls = true;
        Ok(self)
    }

    /// Stores the API key used to authenticate the client during [`Self::build`].
    ///
    /// Supplying a key always switches TLS on. A TLS configuration that is already
    /// present is kept; otherwise a default `ClientTlsConfig` is installed.
    // NOTE(review): which trust roots a bare `ClientTlsConfig::new()` uses depends on
    // the tonic version and enabled features — confirm it can verify the broker.
    pub fn with_api_key(mut self, api_key: impl Into<String>) -> Self {
        self.api_key = Some(api_key.into());

        let options = &mut self.connection_options;
        options.tls_config.get_or_insert_with(ClientTlsConfig::new);
        options.use_tls = true;
        self
    }

    /// Consumes the builder and produces a ready-to-use [`DanubeClient`].
    ///
    /// The connection manager and authentication service are created first. When an API
    /// key was configured, the client is authenticated against the service URI before
    /// the lookup and health-check services are created.
    ///
    /// # Errors
    ///
    /// Returns an error if the service URL is not a valid URI, or if an API key was
    /// configured and authenticating with it fails.
    pub async fn build(mut self) -> Result<DanubeClient> {
        let uri: Uri = self.uri.parse()?;

        // Move the key out of the builder: it is needed both inside the connection
        // options and, further down, for the authentication call.
        let api_key = self.api_key.take();
        if let Some(key) = &api_key {
            self.connection_options.api_key = Some(key.clone());
        }

        let cnx_manager = Arc::new(ConnectionManager::new(self.connection_options));
        let auth_service = AuthService::new(Arc::clone(&cnx_manager));

        // Authenticate up front so an invalid key is reported here rather than later.
        if let Some(key) = &api_key {
            auth_service.authenticate_client(&uri, key).await?;
        }

        let lookup_service = LookupService::new(Arc::clone(&cnx_manager), auth_service.clone());
        let health_check_service =
            HealthCheckService::new(Arc::clone(&cnx_manager), auth_service.clone());

        Ok(DanubeClient {
            uri,
            cnx_manager,
            lookup_service,
            health_check_service,
            auth_service,
        })
    }
}