1use hyper::client::conn::{http1, http2};
7use hyper_util::rt::TokioIo;
8use std::collections::VecDeque;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use thiserror::Error;
12use tokio::net::TcpStream;
13use tokio::sync::Mutex;
14use tracing::debug;
15
16#[derive(Debug, Clone)]
18pub struct PoolConfig {
19 pub max_idle_per_host: usize,
21 pub idle_timeout: Duration,
23 pub prefer_h2: bool,
25}
26
27impl Default for PoolConfig {
28 fn default() -> Self {
29 Self {
30 max_idle_per_host: 32,
31 idle_timeout: Duration::from_secs(90),
32 prefer_h2: false,
33 }
34 }
35}
36
37#[derive(Debug, Error)]
39pub enum ConnectionPoolError {
40 #[error("Connection error: {0}")]
41 Connection(String),
42 #[error("Handshake error: {0}")]
43 Handshake(String),
44 #[error("Pool is full")]
45 PoolFull,
46 #[error("No available connection")]
47 NoConnection,
48}
49
50struct PooledH1Connection {
52 sender: http1::SendRequest<BoxBody>,
53 last_used: Instant,
54}
55
56pub struct ConnectionPool {
58 target_addr: String,
59 config: PoolConfig,
60 h1_pool: Arc<Mutex<VecDeque<PooledH1Connection>>>,
62 h2_connection: Arc<Mutex<Option<http2::SendRequest<BoxBody>>>>,
64}
65
66type BoxBody =
69 http_body_util::combinators::BoxBody<bytes::Bytes, Box<dyn std::error::Error + Send + Sync>>;
70
71impl ConnectionPool {
72 pub fn new(target_addr: String, config: PoolConfig) -> Self {
74 let pool = Self {
75 target_addr,
76 config,
77 h1_pool: Arc::new(Mutex::new(VecDeque::new())),
78 h2_connection: Arc::new(Mutex::new(None)),
79 };
80
81 if tokio::runtime::Handle::try_current().is_ok() {
83 let eviction_pool = pool.h1_pool.clone();
84 let eviction_timeout = pool.config.idle_timeout;
85 tokio::spawn(async move {
86 loop {
87 tokio::time::sleep(Duration::from_secs(30)).await;
88 Self::evict_expired_internal(eviction_pool.clone(), eviction_timeout).await;
89 }
90 });
91 }
92
93 pool
94 }
95
96 pub async fn acquire_h1(&self) -> Result<http1::SendRequest<BoxBody>, ConnectionPoolError> {
98 loop {
100 let mut pool = self.h1_pool.lock().await;
101
102 if let Some(mut conn) = pool.pop_back() {
103 if !conn.sender.is_closed() && conn.last_used.elapsed() < self.config.idle_timeout {
105 debug!("Reusing HTTP/1.1 connection from pool");
106 conn.last_used = Instant::now();
107 return Ok(conn.sender);
108 }
109 debug!("Discarding expired/closed HTTP/1.1 connection");
111 continue;
112 }
113
114 break;
116 }
117
118 debug!("Creating new HTTP/1.1 connection to {}", self.target_addr);
119 let stream = TcpStream::connect(&self.target_addr)
120 .await
121 .map_err(|e| ConnectionPoolError::Connection(e.to_string()))?;
122
123 ferrotunnel_core::transport::socket_tuning::configure_socket_silent(&stream);
124 let io = TokioIo::new(stream);
125
126 let (sender, conn) = http1::handshake(io)
127 .await
128 .map_err(|e| ConnectionPoolError::Handshake(e.to_string()))?;
129
130 tokio::spawn(async move {
132 if let Err(e) = conn.with_upgrades().await {
133 debug!("HTTP/1.1 connection error: {:?}", e);
134 }
135 });
136
137 Ok(sender)
138 }
139
140 pub async fn release_h1(&self, sender: http1::SendRequest<BoxBody>) {
142 if sender.is_closed() {
144 debug!("Not returning closed connection to pool");
145 return;
146 }
147
148 let mut pool = self.h1_pool.lock().await;
149
150 if pool.len() >= self.config.max_idle_per_host {
152 debug!("HTTP/1.1 pool full, dropping connection");
153 return;
154 }
155
156 pool.push_back(PooledH1Connection {
157 sender,
158 last_used: Instant::now(),
159 });
160 debug!(
161 "Released HTTP/1.1 connection to pool (size: {})",
162 pool.len()
163 );
164 }
165
166 pub async fn acquire_h2(&self) -> Result<http2::SendRequest<BoxBody>, ConnectionPoolError> {
168 let mut h2_conn = self.h2_connection.lock().await;
169
170 if let Some(ref sender) = *h2_conn {
172 if sender.is_ready() {
173 debug!("Reusing existing HTTP/2 connection");
174 return Ok(sender.clone());
175 }
176 debug!("HTTP/2 connection not ready, creating new one");
177 }
178
179 debug!("Creating new HTTP/2 connection to {}", self.target_addr);
181 let stream = TcpStream::connect(&self.target_addr)
182 .await
183 .map_err(|e| ConnectionPoolError::Connection(e.to_string()))?;
184
185 ferrotunnel_core::transport::socket_tuning::configure_socket_silent(&stream);
186 let io = TokioIo::new(stream);
187
188 let (sender, conn) = http2::handshake(hyper_util::rt::TokioExecutor::new(), io)
189 .await
190 .map_err(|e| ConnectionPoolError::Handshake(e.to_string()))?;
191
192 tokio::spawn(async move {
194 if let Err(e) = conn.await {
195 debug!("HTTP/2 connection error: {:?}", e);
196 }
197 });
198
199 *h2_conn = Some(sender.clone());
200 Ok(sender)
201 }
202
203 pub async fn evict_expired(&self) {
205 Self::evict_expired_internal(self.h1_pool.clone(), self.config.idle_timeout).await;
206 }
207
208 async fn evict_expired_internal(
209 pool: Arc<Mutex<VecDeque<PooledH1Connection>>>,
210 timeout: Duration,
211 ) {
212 let mut pool = pool.lock().await;
213 let original_len = pool.len();
214
215 pool.retain(|conn| !conn.sender.is_closed() && conn.last_used.elapsed() < timeout);
216
217 let evicted = original_len - pool.len();
218 if evicted > 0 {
219 debug!("Evicted {} expired HTTP/1.1 connections", evicted);
220 }
221 }
222}
223
224#[cfg(test)]
225mod tests {
226 use super::*;
227
228 #[test]
229 fn test_pool_config_default() {
230 let config = PoolConfig::default();
231 assert_eq!(config.max_idle_per_host, 32);
232 assert_eq!(config.idle_timeout, Duration::from_secs(90));
233 assert!(!config.prefer_h2);
234 }
235
236 #[test]
237 fn test_connection_pool_new() {
238 let config = PoolConfig::default();
239 let pool = ConnectionPool::new("127.0.0.1:8080".to_string(), config);
240 assert_eq!(pool.target_addr, "127.0.0.1:8080");
241 }
242
243 #[test]
244 fn test_pool_config_custom() {
245 let config = PoolConfig {
246 max_idle_per_host: 10,
247 idle_timeout: Duration::from_secs(60),
248 prefer_h2: true,
249 };
250 assert_eq!(config.max_idle_per_host, 10);
251 assert_eq!(config.idle_timeout, Duration::from_secs(60));
252 assert!(config.prefer_h2);
253 }
254}