Skip to main content

rustfs_kafka_async/
client.rs

//! Async Kafka client for metadata and connection management.
2
3use rustfs_kafka::client::SecurityConfig;
4use rustfs_kafka::error::{ConnectionError, Error, Result};
5use tokio::task::JoinSet;
6use tracing::{debug, info};
7
8use crate::connection::{AsyncConnection, AsyncConnectionPool};
9
10/// An async Kafka client for bootstrap and connection management.
11///
12/// This lightweight client manages a pool of [`AsyncConnection`]s and is
13/// intended to be used by other async wrappers (producer/consumer) to obtain
14/// connections to brokers without imposing `Sync`/`'static` constraints on the
15/// higher-level code. It will attempt to connect to the provided bootstrap
16/// hosts on creation (unless the host list is empty), but will not continuously
17/// maintain metadata — callers can use [`ensure_connected`] to trigger a
18/// reconnection to bootstrap hosts when necessary.
19pub struct AsyncKafkaClient {
20    pool: AsyncConnectionPool,
21    bootstrap_hosts: Vec<String>,
22    client_id: String,
23    security: Option<SecurityConfig>,
24}
25
26impl AsyncKafkaClient {
27    /// Creates a new async client and connects to the bootstrap brokers.
28    pub async fn new(hosts: Vec<String>) -> Result<Self> {
29        Self::with_client_id_and_security(hosts, "rustfs-kafka-async".to_owned(), None).await
30    }
31
32    /// Creates a new async client with a specific client ID.
33    ///
34    /// Attempts to connect to the provided `hosts` in order until a
35    /// connection succeeds. If no hosts are reachable and the `hosts` list is
36    /// non-empty, an error `Error::Connection(ConnectionError::NoHostReachable)`
37    /// is returned.
38    pub async fn with_client_id(hosts: Vec<String>, client_id: String) -> Result<Self> {
39        Self::with_client_id_and_security(hosts, client_id, None).await
40    }
41
42    /// Creates a new async client with optional TLS security.
43    pub async fn with_client_id_and_security(
44        hosts: Vec<String>,
45        client_id: String,
46        security: Option<SecurityConfig>,
47    ) -> Result<Self> {
48        let mut pool = AsyncConnectionPool::new_with_security(security.clone());
49        let connected = connect_any_bootstrap(&mut pool, &hosts, security.as_ref()).await;
50
51        if !connected && !hosts.is_empty() {
52            return Err(Error::Connection(ConnectionError::NoHostReachable));
53        }
54
55        info!(
56            "AsyncKafkaClient created with {} bootstrap hosts",
57            hosts.len()
58        );
59
60        Ok(Self {
61            pool,
62            bootstrap_hosts: hosts,
63            client_id,
64            security,
65        })
66    }
67
68    /// Returns the client ID.
69    #[must_use]
70    pub fn client_id(&self) -> &str {
71        &self.client_id
72    }
73
74    /// Returns the bootstrap hosts.
75    #[must_use]
76    pub fn bootstrap_hosts(&self) -> &[String] {
77        &self.bootstrap_hosts
78    }
79
80    /// Returns the configured optional security settings.
81    #[must_use]
82    pub fn security(&self) -> Option<&SecurityConfig> {
83        self.security.as_ref()
84    }
85
86    /// Gets (or creates) a mutable reference to a connection for `host`.
87    ///
88    /// If a connection for `host` does not yet exist, the underlying
89    /// [`AsyncConnection::connect`] is attempted and the connection is stored in
90    /// the internal pool. The returned reference is tied to the mutable
91    /// borrow of `self` and therefore short-lived.
92    pub async fn get_connection(&mut self, host: &str) -> Result<&mut AsyncConnection> {
93        self.pool.get(host).await
94    }
95
96    /// Gets the list of currently connected hosts.
97    ///
98    /// This returns the host addresses for which there is an established
99    /// connection in the internal pool. The returned `Vec<&str>` is a snapshot
100    /// of the current keys and does not hold any borrow on `self` afterwards.
101    #[must_use]
102    pub fn connected_hosts(&self) -> Vec<&str> {
103        self.pool.hosts()
104    }
105
106    /// Ensures the client has at least one active connection.
107    ///
108    /// If the client was created with bootstrap hosts and the internal pool is
109    /// currently empty, this will attempt to connect to the bootstrap hosts in
110    /// order until one succeeds. It is a no-op when `bootstrap_hosts` is empty
111    /// or when the pool already contains connections.
112    pub async fn ensure_connected(&mut self) -> Result<()> {
113        if !self.bootstrap_hosts.is_empty() && self.pool.hosts().is_empty() {
114            let security = self.security.clone();
115            let connected =
116                connect_any_bootstrap(&mut self.pool, &self.bootstrap_hosts, security.as_ref())
117                    .await;
118            if !connected {
119                return Err(Error::Connection(ConnectionError::NoHostReachable));
120            }
121        }
122        Ok(())
123    }
124}
125
126async fn connect_any_bootstrap(
127    pool: &mut AsyncConnectionPool,
128    hosts: &[String],
129    security: Option<&SecurityConfig>,
130) -> bool {
131    let mut set = JoinSet::new();
132    for host in hosts {
133        let host = host.clone();
134        let security = security.cloned();
135        set.spawn(async move {
136            let connection =
137                crate::connection::AsyncConnection::connect(&host, security.as_ref()).await;
138            (host, connection)
139        });
140    }
141
142    while let Some(joined) = set.join_next().await {
143        match joined {
144            Ok((host, Ok(connection))) => {
145                pool.insert(host, connection);
146                return true;
147            }
148            Ok((host, Err(e))) => {
149                debug!("Failed to connect to {}: {}", host, e);
150            }
151            Err(e) => {
152                debug!("Bootstrap connect task failed to join: {}", e);
153            }
154        }
155    }
156
157    false
158}
159
#[cfg(test)]
mod tests {
    use rustfs_kafka::error::{ConnectionError, Error};

    use super::*;

    /// An empty host list is accepted and yields a client with no connections
    /// and the default client ID.
    #[tokio::test]
    async fn new_with_empty_hosts_succeeds() {
        let result = AsyncKafkaClient::new(vec![]).await;
        assert!(result.is_ok());
        let client = result.unwrap();
        assert!(client.bootstrap_hosts().is_empty());
        assert!(client.connected_hosts().is_empty());
        assert_eq!(client.client_id(), "rustfs-kafka-async");
        assert!(client.security().is_none());
    }

    /// A non-empty host list where nothing accepts the connection must fail.
    /// Assumes nothing is listening on 127.0.0.1:1 in the test environment.
    #[tokio::test]
    async fn new_with_unreachable_hosts_returns_error() {
        let result = AsyncKafkaClient::new(vec!["127.0.0.1:1".to_owned()]).await;
        assert!(matches!(
            result,
            Err(Error::Connection(ConnectionError::NoHostReachable))
        ));
    }

    /// The custom-client-ID constructor reports unreachable hosts the same way.
    #[tokio::test]
    async fn with_client_id_unreachable_returns_error() {
        let result = AsyncKafkaClient::with_client_id(
            vec!["127.0.0.1:1".to_owned()],
            "my-custom-client".to_owned(),
        )
        .await;
        assert!(matches!(
            result,
            Err(Error::Connection(ConnectionError::NoHostReachable))
        ));
    }

    /// `ensure_connected` is a successful no-op when there are no bootstrap
    /// hosts to reconnect to.
    #[tokio::test]
    async fn ensure_connected_with_empty_hosts_is_ok() {
        let mut client = AsyncKafkaClient {
            pool: AsyncConnectionPool::new(),
            bootstrap_hosts: vec![],
            client_id: "test".to_owned(),
            security: None,
        };

        // Actually exercise the method under test; previously it was never called.
        assert!(client.ensure_connected().await.is_ok());

        // The no-op must leave the client untouched.
        assert!(client.bootstrap_hosts().is_empty());
        assert!(client.connected_hosts().is_empty());
    }
}