Skip to main content

danube_client/
client.rs

1use std::path::Path;
2use std::sync::Arc;
3use tonic::transport::{Certificate, ClientTlsConfig, Identity, Uri};
4
5use crate::{
6    auth_service::AuthService,
7    connection_manager::{BrokerAddress, ConnectionManager, ConnectionOptions},
8    consumer::ConsumerBuilder,
9    errors::Result,
10    health_check::HealthCheckService,
11    lookup_service::{LookupResult, LookupService},
12    producer::ProducerBuilder,
13    schema_registry_client::SchemaRegistryClient,
14};
15
16/// The main client for interacting with the Danube messaging system.
17///
18/// The `DanubeClient` struct is designed to facilitate communication with the Danube messaging system.
19/// It provides various methods for managing producers and consumers, performing topic lookups, and retrieving schema information. This client acts as the central interface for interacting with the messaging system and managing connections and services.
20#[derive(Debug, Clone)]
21pub struct DanubeClient {
22    pub(crate) uri: Uri,
23    pub(crate) cnx_manager: Arc<ConnectionManager>,
24    pub(crate) lookup_service: LookupService,
25    pub(crate) health_check_service: HealthCheckService,
26    pub(crate) auth_service: AuthService,
27}
28
29impl DanubeClient {
30    /// Initializes a new `DanubeClientBuilder` instance.
31    ///
32    /// The builder pattern allows for configuring and constructing a `DanubeClient` instance with optional settings and options.
33    /// Using the builder, you can customize various aspects of the `DanubeClient`, such as connection settings, timeouts, and other configurations before creating the final `DanubeClient` instance.
34    pub fn builder() -> DanubeClientBuilder {
35        DanubeClientBuilder::default()
36    }
37
38    /// Returns a new `ProducerBuilder` for configuring and creating a `Producer` instance.
39    ///
40    /// This method initializes a `ProducerBuilder`, which is used to set up various options and settings for a `Producer`.
41    /// The builder pattern allows you to specify details such as the topic, producer name, partitions, schema, and other configurations before creating the final `Producer` instance.
42    pub fn new_producer(&self) -> ProducerBuilder {
43        ProducerBuilder::new(self)
44    }
45
46    /// Returns a new `ConsumerBuilder` for configuring and creating a `Consumer` instance.
47    ///
48    /// This method initializes a `ConsumerBuilder`, which is used to set up various options and settings for a `Consumer`.
49    /// The builder pattern allows you to specify details such as the topic, consumer name, subscription, subscription type, and other configurations before creating the final `Consumer` instance.
50    pub fn new_consumer(&self) -> ConsumerBuilder {
51        ConsumerBuilder::new(self)
52    }
53
54    /// Returns a `SchemaRegistryClient` for schema registry operations.
55    ///
56    /// The returned client shares the same connection manager and auth service as the `DanubeClient`
57    pub fn schema(&self) -> SchemaRegistryClient {
58        SchemaRegistryClient::new(
59            self.cnx_manager.clone(),
60            self.auth_service.clone(),
61            self.uri.clone(),
62        )
63    }
64
65    /// Returns a reference to the `AuthService`.
66    ///
67    /// This method provides access to the `AuthService` instance used by the `DanubeClient`.
68    pub fn auth_service(&self) -> &AuthService {
69        &self.auth_service
70    }
71
72    /// Retrieves the address of the broker responsible for a specified topic.
73    ///
74    /// This asynchronous method performs a lookup to find the broker that is responsible for the given topic. The `addr` parameter specifies the address of the broker to connect to for performing the lookup. The method returns information about the broker handling the topic.
75    ///
76    /// # Parameters
77    ///
78    /// - `addr`: The address of the broker to connect to for the lookup. This is provided as a `&Uri`, which specifies where the request should be sent.
79    /// - `topic`: The name of the topic for which to look up the broker.
80    ///
81    /// # Returns
82    ///
83    /// - `Ok(LookupResult)`: Contains the result of the lookup operation, including the broker address.
84    /// - `Err(e)`: An error if the lookup fails or if there are issues during the operation. This could include connectivity problems, invalid topic names, or other errors related to the lookup process.
85    pub async fn lookup_topic(&self, addr: &Uri, topic: impl Into<String>) -> Result<LookupResult> {
86        self.lookup_service.lookup_topic(addr, topic).await
87    }
88
89    /// Resolve which broker owns a topic in the cluster.
90    ///
91    /// Performs a topic lookup via the Discovery service and follows
92    /// redirects to find the owning broker. Returns the broker address
93    /// that the caller should connect to for this topic.
94    ///
95    /// This is the recommended way for edge brokers and external services
96    /// to discover topic-to-broker assignment without needing direct access
97    /// to internal `LookupService` or `ConnectionManager`.
98    pub async fn resolve_topic_broker(&self, topic: &str) -> Result<BrokerAddress> {
99        self.lookup_service.handle_lookup(&self.uri, topic).await
100    }
101
102    /// Get a gRPC channel to a specific broker.
103    ///
104    /// Uses the client's internal connection pool with TLS/mTLS already
105    /// configured (from `DanubeClientBuilder`). Connections are cached —
106    /// repeated calls for the same broker return the same channel.
107    ///
108    /// The returned `Channel` can be used to construct any gRPC service
109    /// client (e.g., `EdgeReplicatorServiceClient::new(channel)`).
110    pub async fn get_broker_channel(
111        &self,
112        broker: &BrokerAddress,
113    ) -> Result<tonic::transport::Channel> {
114        let cnx = self
115            .cnx_manager
116            .get_connection(&broker.broker_url, &broker.connect_url)
117            .await?;
118        Ok(cnx.grpc_cnx.clone())
119    }
120}
121
122/// A builder for configuring and creating a `DanubeClient` instance.
123///
124/// The `DanubeClientBuilder` struct provides methods for setting various options needed to construct a `DanubeClient`. This includes configuring the base URI for the Danube service, connection settings.
125///
126/// # Fields
127///
128/// - `uri`: The base URI for the Danube service. This is a required field and specifies the address of the service that the client will connect to. It is essential for constructing the `DanubeClient`.
129/// - `connection_options`: Optional connection settings that define how the grpc client connects to the Danube service. These settings can include parameters such as timeouts, retries, and other connection-related configurations.
130#[derive(Debug, Clone, Default)]
131pub struct DanubeClientBuilder {
132    uri: String,
133    connection_options: ConnectionOptions,
134    token: Option<String>,
135    internal_broker: Option<String>,
136}
137
138impl DanubeClientBuilder {
139    /// Sets the base URI for the Danube service in the builder.
140    ///
141    /// This method configures the base URI that the `DanubeClient` will use to connect to the Danube service. The base URI is a required parameter for establishing a connection and interacting with the service.
142    ///
143    /// # Parameters
144    ///
145    /// - `url`: The base URI to use for connecting to the Danube service. The URI should include the protocol and address of the Danube service.
146    pub fn service_url(mut self, url: impl Into<String>) -> Self {
147        self.uri = url.into();
148        self
149    }
150
151    /// Sets the TLS configuration for the client in the builder.
152    pub fn with_tls(mut self, ca_cert: impl AsRef<Path>) -> Result<Self> {
153        let tls_config =
154            ClientTlsConfig::new().ca_certificate(Certificate::from_pem(std::fs::read(ca_cert)?));
155        self.connection_options.tls_config = Some(tls_config);
156        self.connection_options.use_tls = true;
157        Ok(self)
158    }
159
160    /// Sets the mutual TLS configuration for the client in the builder.
161    pub fn with_mtls(
162        mut self,
163        ca_cert: impl AsRef<Path>,
164        client_cert: impl AsRef<Path>,
165        client_key: impl AsRef<Path>,
166    ) -> Result<Self> {
167        let ca_data = std::fs::read(ca_cert)?;
168        let cert_data = std::fs::read(client_cert)?;
169        let key_data = std::fs::read(client_key)?;
170
171        let tls_config = ClientTlsConfig::new()
172            .ca_certificate(Certificate::from_pem(ca_data))
173            .identity(Identity::from_pem(cert_data, key_data));
174
175        self.connection_options.tls_config = Some(tls_config);
176        self.connection_options.use_tls = true;
177        Ok(self)
178    }
179
180    /// Sets the authentication token (JWT) for the client.
181    ///
182    /// Use `danube-admin security tokens create` to generate a token.
183    /// Automatically enables TLS. If no TLS config has been set via `with_tls()` or
184    /// `with_mtls()`, a default TLS config using system root certificates is applied.
185    ///
186    /// For tokens that expire, consider [`with_token_supplier`](Self::with_token_supplier)
187    /// instead, which allows runtime token refresh.
188    pub fn with_token(mut self, token: impl Into<String>) -> Self {
189        self.token = Some(token.into());
190        if self.connection_options.tls_config.is_none() {
191            self.connection_options.tls_config = Some(ClientTlsConfig::new());
192        }
193        self.connection_options.use_tls = true;
194        self
195    }
196
197    /// Sets a dynamic token supplier for the client.
198    ///
199    /// The supplier function is called on **every gRPC request** to obtain the current
200    /// token, enabling runtime token refresh without restarting the client. This is
201    /// useful for:
202    ///
203    /// - **File-based tokens**: Read from a file that is updated by infrastructure
204    ///   (e.g., K8s projected volumes, sidecar token refreshers)
205    /// - **Environment-based tokens**: Read from an environment variable
206    /// - **Custom refresh logic**: Implement your own token rotation
207    ///
208    /// Automatically enables TLS (same as `with_token`).
209    ///
210    /// # Examples
211    ///
212    /// ```rust,no_run
213    /// // Read token from a file on each request
214    /// let client = DanubeClient::builder()
215    ///     .service_url("https://broker:6650")
216    ///     .with_token_supplier(|| {
217    ///         std::fs::read_to_string("/var/run/secrets/danube-token")
218    ///             .unwrap_or_default()
219    ///             .trim()
220    ///             .to_string()
221    ///     })
222    ///     .build()
223    ///     .await?;
224    /// ```
225    pub fn with_token_supplier(mut self, supplier: impl Fn() -> String + Send + Sync + 'static) -> Self {
226        self.connection_options.token_supplier = Some(Arc::new(supplier));
227        if self.connection_options.tls_config.is_none() {
228            self.connection_options.tls_config = Some(ClientTlsConfig::new());
229        }
230        self.connection_options.use_tls = true;
231        self
232    }
233
234    /// Deprecated: Use `with_token()` instead. This method treats the input as a JWT token.
235    #[deprecated(
236        since = "0.11.0",
237        note = "Use with_token() — Danube now uses JWT tokens instead of API keys"
238    )]
239    pub fn with_api_key(self, api_key: impl Into<String>) -> Self {
240        self.with_token(api_key)
241    }
242
243    /// Sets the internal broker identity for the client in the builder.
244    pub fn with_internal_broker(mut self, broker_name: impl Into<String>) -> Self {
245        self.internal_broker = Some(broker_name.into());
246        self
247    }
248
249    /// Constructs and returns a `DanubeClient` instance based on the configuration specified in the builder.
250    ///
251    /// This method finalizes the configuration and creates a new `DanubeClient` instance. It uses the settings and options that were configured using the `DanubeClientBuilder` methods.
252    ///
253    /// # Returns
254    ///
255    /// - `Ok(DanubeClient)`: A new instance of `DanubeClient` configured with the specified options.
256    /// - `Err(e)`: An error if the configuration is invalid or incomplete.
257    pub async fn build(mut self) -> Result<DanubeClient> {
258        let uri = self.uri.parse::<Uri>()?;
259
260        // Set token on connection_options before creating the single ConnectionManager
261        if let Some(ref token) = self.token {
262            self.connection_options.token = Some(token.clone());
263        }
264
265        if let Some(ref internal_broker) = self.internal_broker {
266            self.connection_options.internal_broker = Some(internal_broker.clone());
267        }
268
269        let cnx_manager = Arc::new(ConnectionManager::new(self.connection_options));
270        let auth_service = AuthService::new(cnx_manager.clone());
271
272        let lookup_service = LookupService::new(cnx_manager.clone(), auth_service.clone());
273        let health_check_service =
274            HealthCheckService::new(cnx_manager.clone(), auth_service.clone());
275
276        Ok(DanubeClient {
277            uri,
278            cnx_manager,
279            lookup_service,
280            health_check_service,
281            auth_service,
282        })
283    }
284}