use rustfs_kafka::client::SecurityConfig;
use rustfs_kafka::error::{ConnectionError, Error, Result};
use tokio::task::JoinSet;
use tracing::{debug, info};
use crate::connection::{AsyncConnection, AsyncConnectionPool};
/// Asynchronous Kafka client that owns a pool of broker connections.
///
/// The constructors race a connection attempt against every bootstrap host
/// and keep the first connection that succeeds in the pool.
pub struct AsyncKafkaClient {
/// Pool of connections; seeded with the first bootstrap host that connects.
pool: AsyncConnectionPool,
/// Host strings supplied at construction; reused by `ensure_connected`.
bootstrap_hosts: Vec<String>,
/// Client identifier supplied at construction (`"rustfs-kafka-async"` via `new`).
client_id: String,
/// Optional security settings, handed to the pool and to every bootstrap connect.
security: Option<SecurityConfig>,
}
impl AsyncKafkaClient {
pub async fn new(hosts: Vec<String>) -> Result<Self> {
Self::with_client_id_and_security(hosts, "rustfs-kafka-async".to_owned(), None).await
}
pub async fn with_client_id(hosts: Vec<String>, client_id: String) -> Result<Self> {
Self::with_client_id_and_security(hosts, client_id, None).await
}
pub async fn with_client_id_and_security(
hosts: Vec<String>,
client_id: String,
security: Option<SecurityConfig>,
) -> Result<Self> {
let mut pool = AsyncConnectionPool::new_with_security(security.clone());
let connected = connect_any_bootstrap(&mut pool, &hosts, security.as_ref()).await;
if !connected && !hosts.is_empty() {
return Err(Error::Connection(ConnectionError::NoHostReachable));
}
info!(
"AsyncKafkaClient created with {} bootstrap hosts",
hosts.len()
);
Ok(Self {
pool,
bootstrap_hosts: hosts,
client_id,
security,
})
}
#[must_use]
pub fn client_id(&self) -> &str {
&self.client_id
}
#[must_use]
pub fn bootstrap_hosts(&self) -> &[String] {
&self.bootstrap_hosts
}
#[must_use]
pub fn security(&self) -> Option<&SecurityConfig> {
self.security.as_ref()
}
pub async fn get_connection(&mut self, host: &str) -> Result<&mut AsyncConnection> {
self.pool.get(host).await
}
#[must_use]
pub fn connected_hosts(&self) -> Vec<&str> {
self.pool.hosts()
}
pub async fn ensure_connected(&mut self) -> Result<()> {
if !self.bootstrap_hosts.is_empty() && self.pool.hosts().is_empty() {
let security = self.security.clone();
let connected =
connect_any_bootstrap(&mut self.pool, &self.bootstrap_hosts, security.as_ref())
.await;
if !connected {
return Err(Error::Connection(ConnectionError::NoHostReachable));
}
}
Ok(())
}
}
/// Races a connection attempt against every host and keeps the first winner.
///
/// One task is spawned per entry in `hosts`. The first task to finish with a
/// live connection has that connection inserted into `pool` under its host
/// string, and the function returns `true` immediately. Returning drops the
/// `JoinSet`, which aborts any attempts still in flight. Failed attempts and
/// join failures are logged at debug level and otherwise ignored.
///
/// Returns `false` when `hosts` is empty or every attempt failed.
async fn connect_any_bootstrap(
    pool: &mut AsyncConnectionPool,
    hosts: &[String],
    security: Option<&SecurityConfig>,
) -> bool {
    let mut attempts = JoinSet::new();

    for addr in hosts.iter().cloned() {
        // Spawned tasks must own their inputs, so each gets its own copy.
        let config = security.cloned();
        attempts.spawn(async move {
            let outcome = crate::connection::AsyncConnection::connect(&addr, config.as_ref()).await;
            (addr, outcome)
        });
    }

    while let Some(finished) = attempts.join_next().await {
        // First separate "the task itself died" from "the task ran to completion".
        let (addr, outcome) = match finished {
            Ok(pair) => pair,
            Err(e) => {
                debug!("Bootstrap connect task failed to join: {}", e);
                continue;
            }
        };

        match outcome {
            Ok(connection) => {
                pool.insert(addr, connection);
                return true;
            }
            Err(e) => {
                debug!("Failed to connect to {}: {}", addr, e);
            }
        }
    }

    false
}
#[cfg(test)]
mod tests {
    use rustfs_kafka::error::{ConnectionError, Error};

    use super::*;

    /// Builds a client directly, bypassing the bootstrap connect done by the
    /// public constructors.
    fn unconnected_client(bootstrap_hosts: Vec<String>) -> AsyncKafkaClient {
        AsyncKafkaClient {
            pool: AsyncConnectionPool::new(),
            bootstrap_hosts,
            client_id: "test".to_owned(),
            security: None,
        }
    }

    /// An empty host list is accepted and yields a client with no connections.
    #[tokio::test]
    async fn new_with_empty_hosts_succeeds() {
        let result = AsyncKafkaClient::new(vec![]).await;
        assert!(result.is_ok());
        let client = result.unwrap();
        assert!(client.bootstrap_hosts().is_empty());
        assert!(client.connected_hosts().is_empty());
    }

    /// Construction fails when the only bootstrap host cannot be reached.
    #[tokio::test]
    async fn new_with_unreachable_hosts_returns_error() {
        let result = AsyncKafkaClient::new(vec!["127.0.0.1:1".to_owned()]).await;
        assert!(matches!(
            result,
            Err(Error::Connection(ConnectionError::NoHostReachable))
        ));
    }

    /// A custom client id does not change the unreachable-host outcome.
    #[tokio::test]
    async fn with_client_id_unreachable_returns_error() {
        let result = AsyncKafkaClient::with_client_id(
            vec!["127.0.0.1:1".to_owned()],
            "my-custom-client".to_owned(),
        )
        .await;
        assert!(matches!(
            result,
            Err(Error::Connection(ConnectionError::NoHostReachable))
        ));
    }

    /// With no bootstrap hosts, `ensure_connected` succeeds and connects nothing.
    #[tokio::test]
    async fn ensure_connected_with_empty_hosts_is_ok() {
        let mut client = unconnected_client(vec![]);
        // Call the method under test; the earlier version only inspected fields.
        assert!(client.ensure_connected().await.is_ok());
        assert!(client.bootstrap_hosts.is_empty());
        assert!(client.connected_hosts().is_empty());
    }

    /// With an empty pool and an unreachable bootstrap host, the reconnect
    /// attempt reports that no host is reachable.
    #[tokio::test]
    async fn ensure_connected_with_unreachable_hosts_returns_error() {
        let mut client = unconnected_client(vec!["127.0.0.1:1".to_owned()]);
        assert!(matches!(
            client.ensure_connected().await,
            Err(Error::Connection(ConnectionError::NoHostReachable))
        ));
        assert!(client.connected_hosts().is_empty());
    }
}