crabka_client_core/
pool.rs1use std::net::SocketAddr;
5use std::sync::Arc;
6
7use dashmap::DashMap;
8
9use crate::connection::{Connection, ConnectionOptions};
10use crate::error::ClientError;
11
12#[derive(Debug, Clone)]
14pub struct BrokerInfo {
15 pub id: i32,
16 pub host: String,
17 pub port: i32,
18 pub rack: Option<String>,
19}
20
21pub struct BrokerPool {
24 by_id: DashMap<i32, Arc<Connection>>,
25 by_addr: DashMap<i32, SocketAddr>,
26 bootstrap: Vec<SocketAddr>,
27 options: ConnectionOptions,
28}
29
30impl BrokerPool {
31 #[must_use]
33 pub fn new(bootstrap: Vec<SocketAddr>, options: ConnectionOptions) -> Self {
34 Self {
35 by_id: DashMap::new(),
36 by_addr: DashMap::new(),
37 bootstrap,
38 options,
39 }
40 }
41
42 pub async fn get(&self, broker_id: i32) -> Result<Arc<Connection>, ClientError> {
47 if let Some(entry) = self.by_id.get(&broker_id) {
48 return Ok(entry.clone());
49 }
50 let addr = self
51 .by_addr
52 .get(&broker_id)
53 .map(|e| *e)
54 .ok_or(ClientError::Disconnected)?;
55 let conn = Arc::new(Connection::connect_with_options(addr, self.options.clone()).await?);
56 self.by_id.insert(broker_id, conn.clone());
57 Ok(conn)
58 }
59
60 pub async fn bootstrap_connection(&self) -> Result<Arc<Connection>, ClientError> {
63 const BOOTSTRAP_ID: i32 = -1;
64 if let Some(entry) = self.by_id.get(&BOOTSTRAP_ID) {
65 return Ok(entry.clone());
66 }
67 let mut last_err: Option<ClientError> = None;
68 for addr in &self.bootstrap {
69 match Connection::connect_with_options(*addr, self.options.clone()).await {
70 Ok(c) => {
71 let arc = Arc::new(c);
72 self.by_id.insert(BOOTSTRAP_ID, arc.clone());
73 return Ok(arc);
74 }
75 Err(e) => last_err = Some(e),
76 }
77 }
78 Err(last_err.unwrap_or(ClientError::Disconnected))
79 }
80
81 pub fn refresh_brokers(&self, brokers: &[BrokerInfo]) {
91 for b in brokers {
92 if b.port == 0 {
93 continue;
94 }
95 let addr_str = format!("{}:{}", b.host, b.port);
96 if let Ok(addr) = addr_str.parse::<SocketAddr>() {
97 self.by_addr.insert(b.id, addr);
98 }
99 }
100 }
101
102 #[must_use]
108 pub fn knows_broker(&self, broker_id: i32) -> bool {
109 self.by_addr.contains_key(&broker_id)
110 }
111
112 pub fn close_all(self) {
114 let conns: Vec<_> = self.by_id.iter().map(|e| e.value().clone()).collect();
115 drop(self.by_id);
116 drop(conns);
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use super::*;
125 use assert2::assert;
126
127 #[test]
128 fn refresh_inserts_addresses() {
129 let pool = BrokerPool::new(vec![], ConnectionOptions::default());
130 pool.refresh_brokers(&[
131 BrokerInfo {
132 id: 1,
133 host: "127.0.0.1".into(),
134 port: 9092,
135 rack: None,
136 },
137 BrokerInfo {
138 id: 2,
139 host: "127.0.0.1".into(),
140 port: 9093,
141 rack: None,
142 },
143 ]);
144 assert!(pool.by_addr.contains_key(&1));
145 assert!(pool.by_addr.contains_key(&2));
146 assert!(*pool.by_addr.get(&1).unwrap() == "127.0.0.1:9092".parse().unwrap());
147 assert!(*pool.by_addr.get(&2).unwrap() == "127.0.0.1:9093".parse().unwrap());
148 }
149}