danube_client/client.rs
1use std::path::Path;
2use std::sync::Arc;
3use tonic::transport::{Certificate, ClientTlsConfig, Identity, Uri};
4
5use crate::{
6 auth_service::AuthService,
7 connection_manager::{ConnectionManager, ConnectionOptions},
8 consumer::ConsumerBuilder,
9 errors::Result,
10 health_check::HealthCheckService,
11 lookup_service::{LookupResult, LookupService},
12 producer::ProducerBuilder,
13 schema::Schema,
14 schema_service::SchemaService,
15};
16
17/// The main client for interacting with the Danube messaging system.
18///
19/// The `DanubeClient` struct is designed to facilitate communication with the Danube messaging system.
20/// It provides various methods for managing producers and consumers, performing topic lookups, and retrieving schema information. This client acts as the central interface for interacting with the messaging system and managing connections and services.
21#[derive(Debug, Clone)]
22pub struct DanubeClient {
23 pub(crate) uri: Uri,
24 pub(crate) cnx_manager: Arc<ConnectionManager>,
25 pub(crate) lookup_service: LookupService,
26 pub(crate) schema_service: SchemaService,
27 pub(crate) health_check_service: HealthCheckService,
28 pub(crate) auth_service: AuthService,
29}
30
31impl DanubeClient {
32 fn new_client(builder: DanubeClientBuilder, uri: Uri) -> Self {
33 let cnx_manager = ConnectionManager::new(builder.connection_options);
34 let cnx_manager = Arc::new(cnx_manager);
35
36 let auth_service = AuthService::new(cnx_manager.clone());
37
38 let lookup_service = LookupService::new(cnx_manager.clone(), auth_service.clone());
39
40 let schema_service = SchemaService::new(cnx_manager.clone(), auth_service.clone());
41
42 let health_check_service =
43 HealthCheckService::new(cnx_manager.clone(), auth_service.clone());
44
45 DanubeClient {
46 uri: uri,
47 cnx_manager,
48 lookup_service,
49 schema_service,
50 health_check_service,
51 auth_service,
52 }
53 }
54
55 /// Initializes a new `DanubeClientBuilder` instance.
56 ///
57 /// The builder pattern allows for configuring and constructing a `DanubeClient` instance with optional settings and options.
58 /// Using the builder, you can customize various aspects of the `DanubeClient`, such as connection settings, timeouts, and other configurations before creating the final `DanubeClient` instance.
59 pub fn builder() -> DanubeClientBuilder {
60 DanubeClientBuilder::default()
61 }
62
63 /// Returns a new `ProducerBuilder` for configuring and creating a `Producer` instance.
64 ///
65 /// This method initializes a `ProducerBuilder`, which is used to set up various options and settings for a `Producer`.
66 /// The builder pattern allows you to specify details such as the topic, producer name, partitions, schema, and other configurations before creating the final `Producer` instance.
67 pub fn new_producer(&self) -> ProducerBuilder {
68 ProducerBuilder::new(self)
69 }
70
71 /// Returns a new `ConsumerBuilder` for configuring and creating a `Consumer` instance.
72 ///
73 /// This method initializes a `ConsumerBuilder`, which is used to set up various options and settings for a `Consumer`.
74 /// The builder pattern allows you to specify details such as the topic, consumer name, subscription, subscription type, and other configurations before creating the final `Consumer` instance.
75 pub fn new_consumer(&self) -> ConsumerBuilder {
76 ConsumerBuilder::new(self)
77 }
78
79 /// Returns a reference to the `AuthService`.
80 ///
81 /// This method provides access to the `AuthService` instance used by the `DanubeClient`.
82 pub fn auth_service(&self) -> &AuthService {
83 &self.auth_service
84 }
85
86 /// Retrieves the address of the broker responsible for a specified topic.
87 ///
88 /// This asynchronous method performs a lookup to find the broker that is responsible for the given topic. The `addr` parameter specifies the address of the broker to connect to for performing the lookup. The method returns information about the broker handling the topic.
89 ///
90 /// # Parameters
91 ///
92 /// - `addr`: The address of the broker to connect to for the lookup. This is provided as a `&Uri`, which specifies where the request should be sent.
93 /// - `topic`: The name of the topic for which to look up the broker.
94 ///
95 /// # Returns
96 ///
97 /// - `Ok(LookupResult)`: Contains the result of the lookup operation, including the broker address.
98 /// - `Err(e)`: An error if the lookup fails or if there are issues during the operation. This could include connectivity problems, invalid topic names, or other errors related to the lookup process.
99 pub async fn lookup_topic(&self, addr: &Uri, topic: impl Into<String>) -> Result<LookupResult> {
100 self.lookup_service.lookup_topic(addr, topic).await
101 }
102
103 /// Retrieves the schema associated with a specified topic from the schema service.
104 ///
105 /// This asynchronous method fetches the schema for the given topic from the schema service. The schema describes the structure and format of the messages for the specified topic. The method returns the schema details or an error if the retrieval fails.
106 ///
107 /// # Parameters
108 ///
109 /// - `topic`: The name of the topic for which the schema is to be retrieved.
110 ///
111 /// # Returns
112 ///
113 /// - `Ok(Schema)`: The schema associated with the specified topic. This includes information about the schema type and its definition, if available.
114 /// - `Err(e)`: An error if the schema retrieval fails or if there are issues during the operation. This could include errors such as non-existent topics, connectivity issues, or internal service errors.
115 pub async fn get_schema(&self, topic: impl Into<String>) -> Result<Schema> {
116 self.schema_service.get_schema(&self.uri, topic).await
117 }
118}
119
120/// A builder for configuring and creating a `DanubeClient` instance.
121///
122/// The `DanubeClientBuilder` struct provides methods for setting various options needed to construct a `DanubeClient`. This includes configuring the base URI for the Danube service, connection settings.
123///
124/// # Fields
125///
126/// - `uri`: The base URI for the Danube service. This is a required field and specifies the address of the service that the client will connect to. It is essential for constructing the `DanubeClient`.
127/// - `connection_options`: Optional connection settings that define how the grpc client connects to the Danube service. These settings can include parameters such as timeouts, retries, and other connection-related configurations.
128#[derive(Debug, Clone, Default)]
129pub struct DanubeClientBuilder {
130 uri: String,
131 connection_options: ConnectionOptions,
132 api_key: Option<String>,
133}
134
135impl DanubeClientBuilder {
136 /// Sets the base URI for the Danube service in the builder.
137 ///
138 /// This method configures the base URI that the `DanubeClient` will use to connect to the Danube service. The base URI is a required parameter for establishing a connection and interacting with the service.
139 ///
140 /// # Parameters
141 ///
142 /// - `url`: The base URI to use for connecting to the Danube service. The URI should include the protocol and address of the Danube service.
143 pub fn service_url(mut self, url: impl Into<String>) -> Self {
144 self.uri = url.into();
145 self
146 }
147
148 /// Sets the TLS configuration for the client in the builder.
149 pub fn with_tls(mut self, ca_cert: impl AsRef<Path>) -> Result<Self> {
150 let tls_config =
151 ClientTlsConfig::new().ca_certificate(Certificate::from_pem(std::fs::read(ca_cert)?));
152 self.connection_options.tls_config = Some(tls_config);
153 self.connection_options.use_tls = true;
154 Ok(self)
155 }
156
157 /// Sets the mutual TLS configuration for the client in the builder.
158 pub fn with_mtls(
159 mut self,
160 ca_cert: impl AsRef<Path>,
161 client_cert: impl AsRef<Path>,
162 client_key: impl AsRef<Path>,
163 ) -> Result<Self> {
164 let ca_data = std::fs::read(ca_cert)?;
165 let cert_data = std::fs::read(client_cert)?;
166 let key_data = std::fs::read(client_key)?;
167
168 let tls_config = ClientTlsConfig::new()
169 .ca_certificate(Certificate::from_pem(ca_data))
170 .identity(Identity::from_pem(cert_data, key_data));
171
172 self.connection_options.tls_config = Some(tls_config);
173 self.connection_options.use_tls = true;
174 Ok(self)
175 }
176
177 /// Sets the API key for the client in the builder.
178 pub fn with_api_key(mut self, api_key: impl Into<String>) -> Self {
179 self.api_key = Some(api_key.into());
180 self.connection_options.use_tls = true; // API key automatically enables TLS
181 self
182 }
183
184 /// Constructs and returns a `DanubeClient` instance based on the configuration specified in the builder.
185 ///
186 /// This method finalizes the configuration and creates a new `DanubeClient` instance. It uses the settings and options that were configured using the `DanubeClientBuilder` methods.
187 ///
188 /// # Returns
189 ///
190 /// - `Ok(DanubeClient)`: A new instance of `DanubeClient` configured with the specified options.
191 /// - `Err(e)`: An error if the configuration is invalid or incomplete.
192 pub async fn build(mut self) -> Result<DanubeClient> {
193 let uri = self.uri.parse::<Uri>()?;
194 let cnx_manager = Arc::new(ConnectionManager::new(self.connection_options.clone()));
195 let auth_service = AuthService::new(cnx_manager.clone());
196
197 if let Some(api_key) = self.api_key.clone() {
198 self.connection_options.api_key = Some(api_key.clone());
199 let token = auth_service.authenticate_client(&uri, &api_key).await?;
200 self.connection_options.jwt_token = Some(token);
201 }
202
203 Ok(DanubeClient::new_client(self, uri))
204 }
205}