Skip to main content

crabka_client_core/
pool.rs

1//! `BrokerPool`: a `DashMap<broker_id, Arc<Connection>>` with lazy
2//! connect on first use.
3
4use std::net::SocketAddr;
5use std::sync::Arc;
6
7use dashmap::DashMap;
8
9use crate::connection::{Connection, ConnectionOptions};
10use crate::error::ClientError;
11
12/// Information about a single Kafka broker, as reported by a `MetadataResponse`.
13#[derive(Debug, Clone)]
14pub struct BrokerInfo {
15    pub id: i32,
16    pub host: String,
17    pub port: i32,
18    pub rack: Option<String>,
19}
20
21/// Pool of `Arc<Connection>` keyed by broker id. Connections are opened lazily
22/// on first use and cached thereafter.
23pub struct BrokerPool {
24    by_id: DashMap<i32, Arc<Connection>>,
25    by_addr: DashMap<i32, SocketAddr>,
26    bootstrap: Vec<SocketAddr>,
27    options: ConnectionOptions,
28}
29
30impl BrokerPool {
31    /// Create a new pool with the given bootstrap addresses and connection options.
32    #[must_use]
33    pub fn new(bootstrap: Vec<SocketAddr>, options: ConnectionOptions) -> Self {
34        Self {
35            by_id: DashMap::new(),
36            by_addr: DashMap::new(),
37            bootstrap,
38            options,
39        }
40    }
41
42    /// Get-or-connect to a specific broker id. The pool must have already
43    /// learned the (id, address) mapping via [`refresh_brokers`].
44    ///
45    /// [`refresh_brokers`]: BrokerPool::refresh_brokers
46    pub async fn get(&self, broker_id: i32) -> Result<Arc<Connection>, ClientError> {
47        if let Some(entry) = self.by_id.get(&broker_id) {
48            return Ok(entry.clone());
49        }
50        let addr = self
51            .by_addr
52            .get(&broker_id)
53            .map(|e| *e)
54            .ok_or(ClientError::Disconnected)?;
55        let conn = Arc::new(Connection::connect_with_options(addr, self.options.clone()).await?);
56        self.by_id.insert(broker_id, conn.clone());
57        Ok(conn)
58    }
59
60    /// Get-or-connect to the first reachable bootstrap address. The bootstrap
61    /// connection is cached under the synthetic broker id `-1`.
62    pub async fn bootstrap_connection(&self) -> Result<Arc<Connection>, ClientError> {
63        const BOOTSTRAP_ID: i32 = -1;
64        if let Some(entry) = self.by_id.get(&BOOTSTRAP_ID) {
65            return Ok(entry.clone());
66        }
67        let mut last_err: Option<ClientError> = None;
68        for addr in &self.bootstrap {
69            match Connection::connect_with_options(*addr, self.options.clone()).await {
70                Ok(c) => {
71                    let arc = Arc::new(c);
72                    self.by_id.insert(BOOTSTRAP_ID, arc.clone());
73                    return Ok(arc);
74                }
75                Err(e) => last_err = Some(e),
76            }
77        }
78        Err(last_err.unwrap_or(ClientError::Disconnected))
79    }
80
81    /// Update the (id, addr) address registry from a list of brokers, typically
82    /// sourced from a `MetadataResponse`. Does not open any new connections.
83    ///
84    /// Brokers advertising port `0` are skipped: that is not a dialable address
85    /// (it shows up for in-process test brokers whose advertised port never got
86    /// rewritten to the real bound port). Leaving such an entry out means
87    /// [`get`](BrokerPool::get) reports `Disconnected` for that id, letting a
88    /// caller fall back to the bootstrap connection rather than attempting a
89    /// doomed `host:0` connect.
90    pub fn refresh_brokers(&self, brokers: &[BrokerInfo]) {
91        for b in brokers {
92            if b.port == 0 {
93                continue;
94            }
95            let addr_str = format!("{}:{}", b.host, b.port);
96            if let Ok(addr) = addr_str.parse::<SocketAddr>() {
97                self.by_addr.insert(b.id, addr);
98            }
99        }
100    }
101
102    /// Whether the (id → addr) registry knows a dialable address for this
103    /// broker id (i.e. [`refresh_brokers`](BrokerPool::refresh_brokers) learned
104    /// it and the port was not `0`). A caller can use this to decide between
105    /// routing to a specific broker and falling back to the bootstrap
106    /// connection, without a speculative connect.
107    #[must_use]
108    pub fn knows_broker(&self, broker_id: i32) -> bool {
109        self.by_addr.contains_key(&broker_id)
110    }
111
112    /// Close every open connection in the pool. Consumes the pool.
113    pub fn close_all(self) {
114        let conns: Vec<_> = self.by_id.iter().map(|e| e.value().clone()).collect();
115        drop(self.by_id);
116        // Drop each Arc; when the last reference goes away the background tasks
117        // shut down naturally via the CancellationToken in ConnectionInner.
118        drop(conns);
119    }
120}
121
122#[cfg(test)]
123mod tests {
124    use super::*;
125    use assert2::assert;
126
127    #[test]
128    fn refresh_inserts_addresses() {
129        let pool = BrokerPool::new(vec![], ConnectionOptions::default());
130        pool.refresh_brokers(&[
131            BrokerInfo {
132                id: 1,
133                host: "127.0.0.1".into(),
134                port: 9092,
135                rack: None,
136            },
137            BrokerInfo {
138                id: 2,
139                host: "127.0.0.1".into(),
140                port: 9093,
141                rack: None,
142            },
143        ]);
144        assert!(pool.by_addr.contains_key(&1));
145        assert!(pool.by_addr.contains_key(&2));
146        assert!(*pool.by_addr.get(&1).unwrap() == "127.0.0.1:9092".parse().unwrap());
147        assert!(*pool.by_addr.get(&2).unwrap() == "127.0.0.1:9093".parse().unwrap());
148    }
149}