rustfs_kafka_async/
client.rs1use rustfs_kafka::client::SecurityConfig;
4use rustfs_kafka::error::{ConnectionError, Error, Result};
5use tokio::task::JoinSet;
6use tracing::{debug, info};
7
8use crate::connection::{AsyncConnection, AsyncConnectionPool};
9
10pub struct AsyncKafkaClient {
20 pool: AsyncConnectionPool,
21 bootstrap_hosts: Vec<String>,
22 client_id: String,
23 security: Option<SecurityConfig>,
24}
25
26impl AsyncKafkaClient {
27 pub async fn new(hosts: Vec<String>) -> Result<Self> {
29 Self::with_client_id_and_security(hosts, "rustfs-kafka-async".to_owned(), None).await
30 }
31
32 pub async fn with_client_id(hosts: Vec<String>, client_id: String) -> Result<Self> {
39 Self::with_client_id_and_security(hosts, client_id, None).await
40 }
41
42 pub async fn with_client_id_and_security(
44 hosts: Vec<String>,
45 client_id: String,
46 security: Option<SecurityConfig>,
47 ) -> Result<Self> {
48 let mut pool = AsyncConnectionPool::new_with_security(security.clone());
49 let connected = connect_any_bootstrap(&mut pool, &hosts, security.as_ref()).await;
50
51 if !connected && !hosts.is_empty() {
52 return Err(Error::Connection(ConnectionError::NoHostReachable));
53 }
54
55 info!(
56 "AsyncKafkaClient created with {} bootstrap hosts",
57 hosts.len()
58 );
59
60 Ok(Self {
61 pool,
62 bootstrap_hosts: hosts,
63 client_id,
64 security,
65 })
66 }
67
68 #[must_use]
70 pub fn client_id(&self) -> &str {
71 &self.client_id
72 }
73
74 #[must_use]
76 pub fn bootstrap_hosts(&self) -> &[String] {
77 &self.bootstrap_hosts
78 }
79
80 #[must_use]
82 pub fn security(&self) -> Option<&SecurityConfig> {
83 self.security.as_ref()
84 }
85
86 pub async fn get_connection(&mut self, host: &str) -> Result<&mut AsyncConnection> {
93 self.pool.get(host).await
94 }
95
96 #[must_use]
102 pub fn connected_hosts(&self) -> Vec<&str> {
103 self.pool.hosts()
104 }
105
106 pub async fn ensure_connected(&mut self) -> Result<()> {
113 if !self.bootstrap_hosts.is_empty() && self.pool.hosts().is_empty() {
114 let security = self.security.clone();
115 let connected =
116 connect_any_bootstrap(&mut self.pool, &self.bootstrap_hosts, security.as_ref())
117 .await;
118 if !connected {
119 return Err(Error::Connection(ConnectionError::NoHostReachable));
120 }
121 }
122 Ok(())
123 }
124}
125
126async fn connect_any_bootstrap(
127 pool: &mut AsyncConnectionPool,
128 hosts: &[String],
129 security: Option<&SecurityConfig>,
130) -> bool {
131 let mut set = JoinSet::new();
132 for host in hosts {
133 let host = host.clone();
134 let security = security.cloned();
135 set.spawn(async move {
136 let connection =
137 crate::connection::AsyncConnection::connect(&host, security.as_ref()).await;
138 (host, connection)
139 });
140 }
141
142 while let Some(joined) = set.join_next().await {
143 match joined {
144 Ok((host, Ok(connection))) => {
145 pool.insert(host, connection);
146 return true;
147 }
148 Ok((host, Err(e))) => {
149 debug!("Failed to connect to {}: {}", host, e);
150 }
151 Err(e) => {
152 debug!("Bootstrap connect task failed to join: {}", e);
153 }
154 }
155 }
156
157 false
158}
159
#[cfg(test)]
mod tests {
    use rustfs_kafka::error::{ConnectionError, Error};

    use super::*;

    // An empty bootstrap list is accepted and yields a client with no
    // connections.
    #[tokio::test]
    async fn new_with_empty_hosts_succeeds() {
        let result = AsyncKafkaClient::new(vec![]).await;
        assert!(result.is_ok());
        let client = result.unwrap();
        assert!(client.bootstrap_hosts().is_empty());
        assert!(client.connected_hosts().is_empty());
    }

    // Port 1 on localhost is expected to refuse connections, so construction
    // must fail with `NoHostReachable`.
    #[tokio::test]
    async fn new_with_unreachable_hosts_returns_error() {
        let result = AsyncKafkaClient::new(vec!["127.0.0.1:1".to_owned()]).await;
        assert!(matches!(
            result,
            Err(Error::Connection(ConnectionError::NoHostReachable))
        ));
    }

    #[tokio::test]
    async fn with_client_id_unreachable_returns_error() {
        let result = AsyncKafkaClient::with_client_id(
            vec!["127.0.0.1:1".to_owned()],
            "my-custom-client".to_owned(),
        )
        .await;
        assert!(matches!(
            result,
            Err(Error::Connection(ConnectionError::NoHostReachable))
        ));
    }

    // With no bootstrap hosts configured, `ensure_connected` has nothing to
    // do and must succeed without touching the pool.
    #[tokio::test]
    async fn ensure_connected_with_empty_hosts_is_ok() {
        let mut client = AsyncKafkaClient {
            pool: AsyncConnectionPool::new(),
            bootstrap_hosts: vec![],
            client_id: "test".to_owned(),
            security: None,
        };
        assert!(client.bootstrap_hosts.is_empty());
        assert!(client.ensure_connected().await.is_ok());
        assert!(client.connected_hosts().is_empty());
    }

    // With an empty pool and only an unreachable bootstrap host,
    // `ensure_connected` must report `NoHostReachable`.
    #[tokio::test]
    async fn ensure_connected_with_unreachable_hosts_returns_error() {
        let mut client = AsyncKafkaClient {
            pool: AsyncConnectionPool::new(),
            bootstrap_hosts: vec!["127.0.0.1:1".to_owned()],
            client_id: "test".to_owned(),
            security: None,
        };
        let result = client.ensure_connected().await;
        assert!(matches!(
            result,
            Err(Error::Connection(ConnectionError::NoHostReachable))
        ));
        assert!(client.connected_hosts().is_empty());
    }
}
209}