danube_client/client.rs
1use std::path::Path;
2use std::sync::Arc;
3use tonic::transport::{Certificate, ClientTlsConfig, Identity, Uri};
4
5use crate::{
6 auth_service::AuthService,
7 connection_manager::{ConnectionManager, ConnectionOptions},
8 consumer::ConsumerBuilder,
9 errors::Result,
10 health_check::HealthCheckService,
11 lookup_service::{LookupResult, LookupService},
12 producer::ProducerBuilder,
13 schema_registry_client::SchemaRegistryClient,
14};
15
16/// The main client for interacting with the Danube messaging system.
17///
18/// The `DanubeClient` struct is designed to facilitate communication with the Danube messaging system.
19/// It provides various methods for managing producers and consumers, performing topic lookups, and retrieving schema information. This client acts as the central interface for interacting with the messaging system and managing connections and services.
20#[derive(Debug, Clone)]
21pub struct DanubeClient {
22 pub(crate) uri: Uri,
23 pub(crate) cnx_manager: Arc<ConnectionManager>,
24 pub(crate) lookup_service: LookupService,
25 pub(crate) health_check_service: HealthCheckService,
26 pub(crate) auth_service: AuthService,
27}
28
29impl DanubeClient {
30 /// Initializes a new `DanubeClientBuilder` instance.
31 ///
32 /// The builder pattern allows for configuring and constructing a `DanubeClient` instance with optional settings and options.
33 /// Using the builder, you can customize various aspects of the `DanubeClient`, such as connection settings, timeouts, and other configurations before creating the final `DanubeClient` instance.
34 pub fn builder() -> DanubeClientBuilder {
35 DanubeClientBuilder::default()
36 }
37
38 /// Returns a new `ProducerBuilder` for configuring and creating a `Producer` instance.
39 ///
40 /// This method initializes a `ProducerBuilder`, which is used to set up various options and settings for a `Producer`.
41 /// The builder pattern allows you to specify details such as the topic, producer name, partitions, schema, and other configurations before creating the final `Producer` instance.
42 pub fn new_producer(&self) -> ProducerBuilder {
43 ProducerBuilder::new(self)
44 }
45
46 /// Returns a new `ConsumerBuilder` for configuring and creating a `Consumer` instance.
47 ///
48 /// This method initializes a `ConsumerBuilder`, which is used to set up various options and settings for a `Consumer`.
49 /// The builder pattern allows you to specify details such as the topic, consumer name, subscription, subscription type, and other configurations before creating the final `Consumer` instance.
50 pub fn new_consumer(&self) -> ConsumerBuilder {
51 ConsumerBuilder::new(self)
52 }
53
54 /// Returns a `SchemaRegistryClient` for schema registry operations.
55 ///
56 /// The returned client shares the same connection manager and auth service as the `DanubeClient`
57 pub fn schema(&self) -> SchemaRegistryClient {
58 SchemaRegistryClient::new(
59 self.cnx_manager.clone(),
60 self.auth_service.clone(),
61 self.uri.clone(),
62 )
63 }
64
65 /// Returns a reference to the `AuthService`.
66 ///
67 /// This method provides access to the `AuthService` instance used by the `DanubeClient`.
68 pub fn auth_service(&self) -> &AuthService {
69 &self.auth_service
70 }
71
72 /// Retrieves the address of the broker responsible for a specified topic.
73 ///
74 /// This asynchronous method performs a lookup to find the broker that is responsible for the given topic. The `addr` parameter specifies the address of the broker to connect to for performing the lookup. The method returns information about the broker handling the topic.
75 ///
76 /// # Parameters
77 ///
78 /// - `addr`: The address of the broker to connect to for the lookup. This is provided as a `&Uri`, which specifies where the request should be sent.
79 /// - `topic`: The name of the topic for which to look up the broker.
80 ///
81 /// # Returns
82 ///
83 /// - `Ok(LookupResult)`: Contains the result of the lookup operation, including the broker address.
84 /// - `Err(e)`: An error if the lookup fails or if there are issues during the operation. This could include connectivity problems, invalid topic names, or other errors related to the lookup process.
85 pub async fn lookup_topic(&self, addr: &Uri, topic: impl Into<String>) -> Result<LookupResult> {
86 self.lookup_service.lookup_topic(addr, topic).await
87 }
88}
89
90/// A builder for configuring and creating a `DanubeClient` instance.
91///
92/// The `DanubeClientBuilder` struct provides methods for setting various options needed to construct a `DanubeClient`. This includes configuring the base URI for the Danube service, connection settings.
93///
94/// # Fields
95///
96/// - `uri`: The base URI for the Danube service. This is a required field and specifies the address of the service that the client will connect to. It is essential for constructing the `DanubeClient`.
97/// - `connection_options`: Optional connection settings that define how the grpc client connects to the Danube service. These settings can include parameters such as timeouts, retries, and other connection-related configurations.
98#[derive(Debug, Clone, Default)]
99pub struct DanubeClientBuilder {
100 uri: String,
101 connection_options: ConnectionOptions,
102 api_key: Option<String>,
103}
104
105impl DanubeClientBuilder {
106 /// Sets the base URI for the Danube service in the builder.
107 ///
108 /// This method configures the base URI that the `DanubeClient` will use to connect to the Danube service. The base URI is a required parameter for establishing a connection and interacting with the service.
109 ///
110 /// # Parameters
111 ///
112 /// - `url`: The base URI to use for connecting to the Danube service. The URI should include the protocol and address of the Danube service.
113 pub fn service_url(mut self, url: impl Into<String>) -> Self {
114 self.uri = url.into();
115 self
116 }
117
118 /// Sets the TLS configuration for the client in the builder.
119 pub fn with_tls(mut self, ca_cert: impl AsRef<Path>) -> Result<Self> {
120 let tls_config =
121 ClientTlsConfig::new().ca_certificate(Certificate::from_pem(std::fs::read(ca_cert)?));
122 self.connection_options.tls_config = Some(tls_config);
123 self.connection_options.use_tls = true;
124 Ok(self)
125 }
126
127 /// Sets the mutual TLS configuration for the client in the builder.
128 pub fn with_mtls(
129 mut self,
130 ca_cert: impl AsRef<Path>,
131 client_cert: impl AsRef<Path>,
132 client_key: impl AsRef<Path>,
133 ) -> Result<Self> {
134 let ca_data = std::fs::read(ca_cert)?;
135 let cert_data = std::fs::read(client_cert)?;
136 let key_data = std::fs::read(client_key)?;
137
138 let tls_config = ClientTlsConfig::new()
139 .ca_certificate(Certificate::from_pem(ca_data))
140 .identity(Identity::from_pem(cert_data, key_data));
141
142 self.connection_options.tls_config = Some(tls_config);
143 self.connection_options.use_tls = true;
144 Ok(self)
145 }
146
147 /// Sets the API key for the client in the builder.
148 ///
149 /// Automatically enables TLS. If no TLS config has been set via `with_tls()` or
150 /// `with_mtls()`, a default TLS config using system root certificates is applied.
151 pub fn with_api_key(mut self, api_key: impl Into<String>) -> Self {
152 self.api_key = Some(api_key.into());
153 if self.connection_options.tls_config.is_none() {
154 self.connection_options.tls_config = Some(ClientTlsConfig::new());
155 }
156 self.connection_options.use_tls = true;
157 self
158 }
159
160 /// Constructs and returns a `DanubeClient` instance based on the configuration specified in the builder.
161 ///
162 /// This method finalizes the configuration and creates a new `DanubeClient` instance. It uses the settings and options that were configured using the `DanubeClientBuilder` methods.
163 ///
164 /// # Returns
165 ///
166 /// - `Ok(DanubeClient)`: A new instance of `DanubeClient` configured with the specified options.
167 /// - `Err(e)`: An error if the configuration is invalid or incomplete.
168 pub async fn build(mut self) -> Result<DanubeClient> {
169 let uri = self.uri.parse::<Uri>()?;
170
171 // Set api_key on connection_options before creating the single ConnectionManager
172 if let Some(ref api_key) = self.api_key {
173 self.connection_options.api_key = Some(api_key.clone());
174 }
175
176 let cnx_manager = Arc::new(ConnectionManager::new(self.connection_options));
177 let auth_service = AuthService::new(cnx_manager.clone());
178
179 // Authenticate if api_key is present; token is cached inside AuthService
180 if let Some(ref api_key) = self.api_key {
181 auth_service.authenticate_client(&uri, api_key).await?;
182 }
183
184 let lookup_service = LookupService::new(cnx_manager.clone(), auth_service.clone());
185 let health_check_service =
186 HealthCheckService::new(cnx_manager.clone(), auth_service.clone());
187
188 Ok(DanubeClient {
189 uri,
190 cnx_manager,
191 lookup_service,
192 health_check_service,
193 auth_service,
194 })
195 }
196}