nsq_async_rs/
connection_pool.rs1use crate::connection::Connection;
2use crate::error::Result;
3use crate::protocol::IdentifyConfig;
4use dashmap::DashMap;
5use log::{info, warn};
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8
/// Tunable settings for a [`ConnectionPool`].
#[derive(Clone, Debug)]
pub struct ConnectionPoolConfig {
    /// Timeout associated with establishing a connection.
    ///
    /// NOTE(review): no code in this file reads this field — confirm where it
    /// is meant to be applied.
    pub connection_timeout: Duration,

    /// A pooled connection whose last use is older than this is considered
    /// idle and is dropped by the pool's cleanup pass.
    pub max_idle_time: Duration,

    /// Period of the background health-check task; a connection is only
    /// re-checked once its previous check is older than this.
    pub health_check_interval: Duration,

    /// Upper bound on the number of connections kept in one pool bucket.
    ///
    /// The pool buckets connections by a fingerprint built from the address,
    /// the identify configuration and the auth secret, so the bound applies
    /// per fingerprint rather than strictly per host.
    pub max_connections_per_host: usize,
}
21
22impl Default for ConnectionPoolConfig {
23 fn default() -> Self {
24 Self {
25 connection_timeout: Duration::from_secs(5),
26 max_idle_time: Duration::from_secs(60),
27 health_check_interval: Duration::from_secs(30),
28 max_connections_per_host: 10,
29 }
30 }
31}
32
/// Result of the most recent health check performed on a pooled connection.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HealthStatus {
    /// No health check has been recorded yet (the state of a new record).
    Unknown,
    /// The last health check succeeded.
    Healthy,
    /// The last health check failed.
    Unhealthy,
}
43
44#[derive(Debug)]
46pub struct PooledConnection {
47 pub connection: Arc<Connection>,
49 pub last_used: Instant,
51 pub last_checked: Instant,
53 pub health_status: HealthStatus,
55 pub fingerprint: String,
57}
58
59impl PooledConnection {
60 pub fn new(connection: Arc<Connection>, fingerprint: String) -> Self {
62 let now = Instant::now();
63 Self {
64 connection,
65 last_used: now,
66 last_checked: now,
67 health_status: HealthStatus::Unknown,
68 fingerprint,
69 }
70 }
71
72 pub async fn check_health(&mut self) -> bool {
74 self.last_checked = Instant::now();
75
76 match self.connection.handle_heartbeat().await {
77 Ok(_) => {
78 self.health_status = HealthStatus::Healthy;
79 true
80 }
81 Err(e) => {
82 warn!("连接健康检查失败: {}, 地址: {}", e, self.connection.addr());
83 self.health_status = HealthStatus::Unhealthy;
84 false
85 }
86 }
87 }
88
89 pub fn update_last_used(&mut self) {
91 self.last_used = Instant::now();
92 }
93
94 pub fn is_idle(&self, max_idle_time: Duration) -> bool {
96 self.last_used.elapsed() > max_idle_time
97 }
98}
99
100#[derive(Debug)]
102pub struct ConnectionPool {
103 config: ConnectionPoolConfig,
105 connections: DashMap<String, Vec<PooledConnection>>,
107}
108
109impl ConnectionPool {
110 pub fn new(config: ConnectionPoolConfig) -> Self {
112 Self {
113 config,
114 connections: DashMap::new(),
115 }
116 }
117
118 fn generate_fingerprint(
120 addr: &str,
121 identify_config: &Option<IdentifyConfig>,
122 auth_secret: &Option<String>,
123 ) -> String {
124 format!(
126 "{}:{}:{}",
127 addr,
128 identify_config
129 .as_ref()
130 .map(|c| format!("{:?}", c))
131 .unwrap_or_default(),
132 auth_secret.as_ref().unwrap_or(&String::new())
133 )
134 }
135
136 pub async fn get_connection(
138 &self,
139 addr: &str,
140 identify_config: Option<IdentifyConfig>,
141 auth_secret: Option<String>,
142 ) -> Result<Arc<Connection>> {
143 let fingerprint = Self::generate_fingerprint(addr, &identify_config, &auth_secret);
144
145 if let Some(mut connections) = self.connections.get_mut(&fingerprint) {
147 for conn in connections.value_mut().iter_mut() {
149 if conn.health_status != HealthStatus::Unhealthy {
150 conn.update_last_used();
151
152 return Ok(Arc::clone(&conn.connection));
153 }
154 }
155
156 for conn in connections.value_mut().iter_mut() {
158 if conn.check_health().await {
159 conn.update_last_used();
160
161 return Ok(Arc::clone(&conn.connection));
162 }
163 }
164
165 if connections.value().len() < self.config.max_connections_per_host {
167 return self
168 .create_and_store_connection(addr, identify_config, auth_secret, &fingerprint)
169 .await;
170 }
171
172 if let Some(oldest_index) = connections
174 .value()
175 .iter()
176 .enumerate()
177 .min_by_key(|(_, conn)| conn.last_used)
178 .map(|(i, _)| i)
179 {
180 connections.value_mut().remove(oldest_index);
181 return self
182 .create_and_store_connection(addr, identify_config, auth_secret, &fingerprint)
183 .await;
184 }
185 }
186
187 self.create_and_store_connection(addr, identify_config, auth_secret, &fingerprint)
189 .await
190 }
191
192 async fn create_and_store_connection(
194 &self,
195 addr: &str,
196 identify_config: Option<IdentifyConfig>,
197 auth_secret: Option<String>,
198 fingerprint: &str,
199 ) -> Result<Arc<Connection>> {
200 let connection = Connection::new(
202 addr,
203 identify_config.clone(),
204 auth_secret.clone(),
205 Duration::from_secs(60), Duration::from_secs(5), )
208 .await?;
209
210 let connection = Arc::new(connection);
211 let pooled_connection =
212 PooledConnection::new(Arc::clone(&connection), fingerprint.to_string());
213
214 self.connections
216 .entry(fingerprint.to_string())
217 .or_default()
218 .push(pooled_connection);
219
220 Ok(connection)
221 }
222
223 pub fn start_cleanup_task(pool: Arc<ConnectionPool>) {
225 tokio::spawn(async move {
226 let mut interval = tokio::time::interval(Duration::from_secs(30));
227 loop {
228 interval.tick().await;
229 pool.cleanup_idle_connections().await;
230 }
231 });
232 }
233
234 pub fn start_health_check_task(pool: Arc<ConnectionPool>) {
236 tokio::spawn(async move {
237 let mut interval = tokio::time::interval(pool.config.health_check_interval);
238 loop {
239 interval.tick().await;
240 pool.check_connections_health().await;
241 }
242 });
243 }
244
245 pub async fn cleanup_idle_connections(&self) {
247 let max_idle_time = self.config.max_idle_time;
248
249 for mut entry in self.connections.iter_mut() {
250 let before_count = entry.value().len();
251 entry
252 .value_mut()
253 .retain(|conn| !conn.is_idle(max_idle_time));
254 let after_count = entry.value().len();
255
256 if before_count > after_count {}
257 }
258 }
259
260 pub async fn check_connections_health(&self) {
262 for mut entry in self.connections.iter_mut() {
263 for conn in entry.value_mut().iter_mut() {
264 if conn.last_checked.elapsed() > self.config.health_check_interval {
266 let _ = conn.check_health().await;
267 }
268 }
269 }
270 }
271
272 pub fn get_stats(&self) -> ConnectionPoolStats {
274 let mut stats = ConnectionPoolStats::default();
275
276 for entry in self.connections.iter() {
277 stats.total_connections += entry.value().len();
278
279 for conn in entry.value() {
280 match conn.health_status {
281 HealthStatus::Healthy => stats.healthy_connections += 1,
282 HealthStatus::Unhealthy => stats.unhealthy_connections += 1,
283 HealthStatus::Unknown => stats.unknown_status_connections += 1,
284 }
285
286 if conn.is_idle(self.config.max_idle_time) {
287 stats.idle_connections += 1;
288 }
289 }
290 }
291
292 stats.host_count = self.connections.len();
293 stats
294 }
295}
296
/// Point-in-time counters describing the contents of a [`ConnectionPool`].
#[derive(Clone, Debug, Default)]
pub struct ConnectionPoolStats {
    /// Number of pooled connections across all buckets.
    pub total_connections: usize,

    /// Connections whose last health check succeeded.
    pub healthy_connections: usize,

    /// Connections whose last health check failed.
    pub unhealthy_connections: usize,

    /// Connections with no recorded health-check result.
    pub unknown_status_connections: usize,

    /// Connections that have been unused for longer than the configured
    /// maximum idle time.
    pub idle_connections: usize,

    /// Number of buckets (distinct fingerprints) in the pool.
    pub host_count: usize,
}
313
314pub fn create_connection_pool(config: ConnectionPoolConfig) -> Arc<ConnectionPool> {
316 let pool = Arc::new(ConnectionPool::new(config));
317
318 ConnectionPool::start_cleanup_task(Arc::clone(&pool));
320 ConnectionPool::start_health_check_task(Arc::clone(&pool));
321
322 info!("NSQ连接池已初始化");
323 pool
324}
325
326pub fn create_default_connection_pool() -> Arc<ConnectionPool> {
328 create_connection_pool(ConnectionPoolConfig::default())
329}