ds_event_stream_rs_sdk/utils/mod.rs
1//! Utils module.
2//!
3//! ## Overview
4//! This module contains utility functions for the DS Event Stream.
5//!
6//! ## Features
7//! * Get the topics for the DS Event Stream.
8//!
9//! ### Example
10//! ```no_run
11//! use ds_event_stream_rs_sdk::utils::{list_topics, get_topic, get_bootstrap_servers, Environment, ClientCredentials};
12//!
13//! let bootstrap_servers = get_bootstrap_servers(Environment::Development, false);
14//! let credentials = ClientCredentials { username: "username".to_string(), password: "password".to_string() };
15//!
16//! let topic = get_topic(&bootstrap_servers, &credentials, "topic_name");
17//! let topics = list_topics(&bootstrap_servers, &credentials);
18//! ```
19pub mod error;
20
21use rdkafka::admin::AdminClient;
22use rdkafka::client::DefaultClientContext;
23use rdkafka::config::ClientConfig;
24use tracing::debug;
25
26use crate::error::{Result, SDKError};
27
28use error::UtilsError;
29
30// region: --> helpers
31
// Kafka bootstrap endpoints, one constant per (environment, network view) pair.
// "Internal" values are `*.svc.cluster.local` host names on port 9092, "external"
// values are `*.ds.local` host names on port 9095. Multi-broker lists are
// comma-separated; `concat!` only splits the literal across lines, the resulting
// string contains no extra whitespace.
const DEV_INTERNAL_BOOTSTRAP: &str =
    "kafka-broker-0.kafka-broker-headless.kafka-dev.svc.cluster.local:9092";
const DEV_EXTERNAL_BOOTSTRAP: &str = "b0.dev.kafka.ds.local:9095";
const PROD_INTERNAL_BOOTSTRAP: &str = concat!(
    "kafka-broker-0.kafka-broker-headless.kafka.svc.cluster.local:9092,",
    "kafka-broker-1.kafka-broker-headless.kafka.svc.cluster.local:9092,",
    "kafka-broker-2.kafka-broker-headless.kafka.svc.cluster.local:9092",
);
const PROD_EXTERNAL_BOOTSTRAP: &str = concat!(
    "b0.kafka.ds.local:9095,",
    "b1.kafka.ds.local:9095,",
    "b2.kafka.ds.local:9095",
);
37
/// Client credentials for Kafka authentication.
///
/// `Debug` is implemented by hand instead of derived so that the password can
/// never end up in logs or panic messages: formatting a value with `{:?}` shows
/// the username and a fixed `<redacted>` placeholder in place of the secret.
///
/// # Fields
///
/// * `username` - The username to use for authentication
/// * `password` - The password to use for authentication
///
#[derive(Clone)]
pub struct ClientCredentials {
    /// The username to use for authentication.
    pub username: String,
    /// The password to use for authentication. Never printed by `Debug`.
    pub password: String,
}

impl std::fmt::Debug for ClientCredentials {
    /// Formats the credentials with the password replaced by a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ClientCredentials")
            .field("username", &self.username)
            // Deliberately not `self.password`: the secret must not be printable.
            .field("password", &"<redacted>")
            .finish()
    }
}
50
/// The environment of the DS Event Stream.
///
/// Passed to [`get_bootstrap_servers`] to select which set of bootstrap
/// servers is returned.
///
/// The enum carries no data, so it is `Copy` and can be compared and hashed.
///
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Environment {
    /// Selects the development bootstrap servers.
    Development,
    /// Selects the production bootstrap servers.
    Production,
}
66
67/// Get the bootstrap servers for the DS Event Stream.
68///
69/// # Arguments
70///
71/// * `env` - The environment to get the bootstrap servers for.
72/// * `use_internal_hostnames` - Whether to use the internal hostnames for the bootstrap servers.
73///
74/// # Returns
75///
76/// * `String` - The bootstrap servers for the DS Event Stream.
77///
78pub fn get_bootstrap_servers(env: Environment, use_internal_hostnames: bool) -> String {
79 match env {
80 Environment::Development => {
81 if use_internal_hostnames {
82 DEV_INTERNAL_BOOTSTRAP.to_string()
83 } else {
84 DEV_EXTERNAL_BOOTSTRAP.to_string()
85 }
86 }
87 Environment::Production => {
88 if use_internal_hostnames {
89 PROD_INTERNAL_BOOTSTRAP.to_string()
90 } else {
91 PROD_EXTERNAL_BOOTSTRAP.to_string()
92 }
93 }
94 }
95}
96
97/// Create an admin client.
98///
99/// # Arguments
100///
101/// * `bootstrap_servers` - The bootstrap servers to create the admin client for.
102/// * `username` - The username for the DS Event Stream.
103/// * `password` - The password for the DS Event Stream.
104///
105/// # Returns
106///
107/// * `AdminClient<DefaultClientContext>` - The admin client.
108///
109/// # Errors
110///
111/// * [`UtilsError::Kafka`] - If the Kafka client fails to create.
112///
113fn create_admin_client(
114 bootstrap_servers: &str,
115 credentials: &ClientCredentials,
116) -> Result<AdminClient<DefaultClientContext>> {
117 ClientConfig::new()
118 .set("bootstrap.servers", bootstrap_servers)
119 .set("session.timeout.ms", "6000")
120 .set("request.timeout.ms", "30000")
121 .set("connections.max.idle.ms", "540000")
122 .set("metadata.max.age.ms", "300000")
123 .set("security.protocol", "SASL_PLAINTEXT")
124 .set("sasl.mechanism", "SCRAM-SHA-512")
125 .set("sasl.username", credentials.username.clone())
126 .set("sasl.password", credentials.password.clone())
127 .create()
128 .map_err(|e| SDKError::from(UtilsError::Kafka(e)))
129}
130
131// endregion: --> helpers
132
133// region: --> Utils
134
135/// Get the name of a specific topic.
136///
137/// # Arguments
138///
139/// * `bootstrap_servers` - The bootstrap servers to get the topic metadata for.
140/// * `credentials` - The credentials for the DS Event Stream.
141/// * `topic_name` - The name of the topic to get metadata for.
142///
143/// # Returns
144///
145/// * `String` - The topic name if found.
146///
147/// # Errors
148///
149/// * [`UtilsError::Kafka`] - If the Kafka client fails to fetch metadata.
150/// * [`UtilsError::TopicNotFound`] - If the topic is not found.
151///
152pub fn get_topic(bootstrap_servers: &str, credentials: &ClientCredentials, topic_name: &str) -> Result<String> {
153 debug!("Getting topic metadata for topic: {}", topic_name);
154 let admin: AdminClient<DefaultClientContext> = create_admin_client(bootstrap_servers, credentials)?;
155 let metadata = admin
156 .inner()
157 .fetch_metadata(Some(topic_name), std::time::Duration::from_secs(10))
158 .map_err(UtilsError::Kafka)?;
159
160 let result = metadata
161 .topics()
162 .iter()
163 .find(|topic| topic.name() == topic_name)
164 .map(|topic| topic.name().to_string());
165
166 match result {
167 Some(topic_name) => Ok(topic_name),
168 None => Err(SDKError::from(UtilsError::TopicNotFound {
169 topic_name: topic_name.to_string(),
170 client_id: credentials.username.to_string(),
171 })),
172 }
173}
174
175/// List the topics for the DS Event Stream.
176///
177/// # Arguments
178///
179/// * `bootstrap_servers` - The bootstrap servers to list the topics for.
180/// * `credentials` - The credentials for the DS Event Stream.
181///
182/// # Returns
183///
184/// * `Vec<String>` - The topic names for the DS Event Stream.
185///
186/// # Errors
187///
188/// * [`UtilsError::Kafka`] - If the Kafka client fails to fetch metadata.
189/// * [`UtilsError::TopicsNotFound`] - If the topics are not found.
190///
191pub fn list_topics(bootstrap_servers: &str, credentials: &ClientCredentials) -> Result<Vec<String>> {
192 debug!("Getting topics for the DS Event Stream");
193 let admin: AdminClient<DefaultClientContext> = create_admin_client(bootstrap_servers, credentials)?;
194 let metadata = admin
195 .inner()
196 .fetch_metadata(None, std::time::Duration::from_secs(10))
197 .map_err(UtilsError::Kafka)?;
198
199 let topics: Vec<String> = metadata.topics().iter().map(|topic| topic.name().to_string()).collect();
200 if topics.is_empty() {
201 return Err(SDKError::from(UtilsError::TopicsNotFound {
202 client_id: credentials.username.to_string(),
203 }));
204 }
205
206 Ok(topics)
207}
208
209// endregion: --> Utils