Skip to main content

ferrotunnel_http/
pool.rs

1//! Connection pooling for HTTP/1.1 and HTTP/2 client connections
2//!
3//! This module provides connection reuse to avoid TCP handshake and HTTP protocol overhead.
4//! HTTP/1.1 connections are pooled in a LIFO queue, while HTTP/2 uses a single multiplexed connection.
5
6use hyper::client::conn::{http1, http2};
7use hyper_util::rt::TokioIo;
8use std::collections::VecDeque;
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use thiserror::Error;
12use tokio::net::TcpStream;
13use tokio::sync::Mutex;
14use tracing::debug;
15
/// Tunable settings for a connection pool.
///
/// `PoolConfig::default()` yields 32 idle connections, a 90 second idle
/// timeout and HTTP/2 preference turned off.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Upper bound on idle connections retained per host (default: 32)
    pub max_idle_per_host: usize,
    /// How long a connection may sit idle before it counts as expired (default: 90s)
    pub idle_timeout: Duration,
    /// Prefer HTTP/2 when available (default: false)
    // NOTE(review): nothing in this module reads this flag — presumably callers
    // use it to choose between the HTTP/1.1 and HTTP/2 acquire paths; confirm.
    pub prefer_h2: bool,
}

impl Default for PoolConfig {
    /// Returns the documented defaults: 32 idle connections, 90s idle timeout, no HTTP/2 preference.
    fn default() -> Self {
        let idle_timeout = Duration::from_secs(90);
        PoolConfig {
            prefer_h2: false,
            idle_timeout,
            max_idle_per_host: 32,
        }
    }
}
36
/// Connection pool errors
///
/// The `Connection` and `Handshake` variants carry the underlying error
/// rendered to a `String`; the original error value is not preserved.
#[derive(Debug, Error)]
pub enum ConnectionPoolError {
    /// Opening the TCP connection to the target address failed.
    #[error("Connection error: {0}")]
    Connection(String),
    /// The HTTP/1.1 or HTTP/2 handshake on a freshly opened connection failed.
    #[error("Handshake error: {0}")]
    Handshake(String),
    /// The pool cannot hold more connections.
    // NOTE(review): never constructed in this file — a full pool silently drops
    // the released connection instead. Presumably reserved for callers; confirm.
    #[error("Pool is full")]
    PoolFull,
    /// No connection could be provided.
    // NOTE(review): never constructed in this file; confirm whether it is still needed.
    #[error("No available connection")]
    NoConnection,
}
49
/// Pooled HTTP/1.1 connection with metadata
///
/// One entry of the idle queue: the request sender plus the bookkeeping
/// needed to decide whether the connection is too old to reuse.
struct PooledH1Connection {
    /// Handle used to send requests on the underlying connection.
    sender: http1::SendRequest<BoxBody>,
    /// When the connection was placed into the idle queue; compared against
    /// the configured idle timeout on acquire and during eviction.
    last_used: Instant,
}
55
/// Connection pool for HTTP/1.1 and HTTP/2
///
/// All connections go to the single `target_addr` given at construction.
pub struct ConnectionPool {
    /// Address passed to `TcpStream::connect` whenever a new connection is needed.
    target_addr: String,
    /// Limits and timeouts applied to the idle HTTP/1.1 queue.
    config: PoolConfig,
    /// HTTP/1.1 idle connections (LIFO for cache warmth)
    ///
    /// Released connections are pushed to the back and acquired from the back.
    h1_pool: Arc<Mutex<VecDeque<PooledH1Connection>>>,
    /// HTTP/2 multiplexed connection (shared across all requests)
    ///
    /// Holds the most recently established sender; callers receive clones of it.
    h2_connection: Arc<Mutex<Option<http2::SendRequest<BoxBody>>>>,
}
65
/// Boxed body type used for both HTTP/1.1 and HTTP/2 connections.
/// Uses `bytes::Bytes` for data chunks and a boxed error for flexibility.
///
/// The alias itself is private to this module, although it appears in the
/// signatures of the public acquire/release methods.
type BoxBody =
    http_body_util::combinators::BoxBody<bytes::Bytes, Box<dyn std::error::Error + Send + Sync>>;
70
71impl ConnectionPool {
72    /// Create a new connection pool
73    pub fn new(target_addr: String, config: PoolConfig) -> Self {
74        let pool = Self {
75            target_addr,
76            config,
77            h1_pool: Arc::new(Mutex::new(VecDeque::new())),
78            h2_connection: Arc::new(Mutex::new(None)),
79        };
80
81        // Spawn background eviction task only if we're in a tokio runtime
82        if tokio::runtime::Handle::try_current().is_ok() {
83            let eviction_pool = pool.h1_pool.clone();
84            let eviction_timeout = pool.config.idle_timeout;
85            tokio::spawn(async move {
86                loop {
87                    tokio::time::sleep(Duration::from_secs(30)).await;
88                    Self::evict_expired_internal(eviction_pool.clone(), eviction_timeout).await;
89                }
90            });
91        }
92
93        pool
94    }
95
96    /// Acquire an HTTP/1.1 connection from the pool or create a new one
97    pub async fn acquire_h1(&self) -> Result<http1::SendRequest<BoxBody>, ConnectionPoolError> {
98        // Try to reuse an idle connection
99        loop {
100            let mut pool = self.h1_pool.lock().await;
101
102            if let Some(mut conn) = pool.pop_back() {
103                // Check if connection is still valid
104                if !conn.sender.is_closed() && conn.last_used.elapsed() < self.config.idle_timeout {
105                    debug!("Reusing HTTP/1.1 connection from pool");
106                    conn.last_used = Instant::now();
107                    return Ok(conn.sender);
108                }
109                // Connection expired or closed, try next one
110                debug!("Discarding expired/closed HTTP/1.1 connection");
111                continue;
112            }
113
114            // No valid connection in pool, create a new one
115            break;
116        }
117
118        debug!("Creating new HTTP/1.1 connection to {}", self.target_addr);
119        let stream = TcpStream::connect(&self.target_addr)
120            .await
121            .map_err(|e| ConnectionPoolError::Connection(e.to_string()))?;
122
123        ferrotunnel_core::transport::socket_tuning::configure_socket_silent(&stream);
124        let io = TokioIo::new(stream);
125
126        let (sender, conn) = http1::handshake(io)
127            .await
128            .map_err(|e| ConnectionPoolError::Handshake(e.to_string()))?;
129
130        // Spawn connection driver
131        tokio::spawn(async move {
132            if let Err(e) = conn.with_upgrades().await {
133                debug!("HTTP/1.1 connection error: {:?}", e);
134            }
135        });
136
137        Ok(sender)
138    }
139
140    /// Release an HTTP/1.1 connection back to the pool
141    pub async fn release_h1(&self, sender: http1::SendRequest<BoxBody>) {
142        // Don't return closed connections to the pool
143        if sender.is_closed() {
144            debug!("Not returning closed connection to pool");
145            return;
146        }
147
148        let mut pool = self.h1_pool.lock().await;
149
150        // Enforce per-host limit
151        if pool.len() >= self.config.max_idle_per_host {
152            debug!("HTTP/1.1 pool full, dropping connection");
153            return;
154        }
155
156        pool.push_back(PooledH1Connection {
157            sender,
158            last_used: Instant::now(),
159        });
160        debug!(
161            "Released HTTP/1.1 connection to pool (size: {})",
162            pool.len()
163        );
164    }
165
166    /// Acquire an HTTP/2 connection (multiplexed, shared)
167    pub async fn acquire_h2(&self) -> Result<http2::SendRequest<BoxBody>, ConnectionPoolError> {
168        let mut h2_conn = self.h2_connection.lock().await;
169
170        // Check if we have a valid H2 connection
171        if let Some(ref sender) = *h2_conn {
172            if sender.is_ready() {
173                debug!("Reusing existing HTTP/2 connection");
174                return Ok(sender.clone());
175            }
176            debug!("HTTP/2 connection not ready, creating new one");
177        }
178
179        // Create new HTTP/2 connection
180        debug!("Creating new HTTP/2 connection to {}", self.target_addr);
181        let stream = TcpStream::connect(&self.target_addr)
182            .await
183            .map_err(|e| ConnectionPoolError::Connection(e.to_string()))?;
184
185        ferrotunnel_core::transport::socket_tuning::configure_socket_silent(&stream);
186        let io = TokioIo::new(stream);
187
188        let (sender, conn) = http2::handshake(hyper_util::rt::TokioExecutor::new(), io)
189            .await
190            .map_err(|e| ConnectionPoolError::Handshake(e.to_string()))?;
191
192        // Spawn connection driver
193        tokio::spawn(async move {
194            if let Err(e) = conn.await {
195                debug!("HTTP/2 connection error: {:?}", e);
196            }
197        });
198
199        *h2_conn = Some(sender.clone());
200        Ok(sender)
201    }
202
203    /// Evict expired connections from the pool
204    pub async fn evict_expired(&self) {
205        Self::evict_expired_internal(self.h1_pool.clone(), self.config.idle_timeout).await;
206    }
207
208    async fn evict_expired_internal(
209        pool: Arc<Mutex<VecDeque<PooledH1Connection>>>,
210        timeout: Duration,
211    ) {
212        let mut pool = pool.lock().await;
213        let original_len = pool.len();
214
215        pool.retain(|conn| !conn.sender.is_closed() && conn.last_used.elapsed() < timeout);
216
217        let evicted = original_len - pool.len();
218        if evicted > 0 {
219            debug!("Evicted {} expired HTTP/1.1 connections", evicted);
220        }
221    }
222}
223
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration carries the documented values.
    #[test]
    fn test_pool_config_default() {
        let PoolConfig {
            max_idle_per_host,
            idle_timeout,
            prefer_h2,
        } = PoolConfig::default();

        assert_eq!(max_idle_per_host, 32);
        assert_eq!(idle_timeout, Duration::from_secs(90));
        assert!(!prefer_h2);
    }

    /// Constructing a pool outside a runtime stores the target address as given.
    #[test]
    fn test_connection_pool_new() {
        let addr = "127.0.0.1:8080".to_string();
        let pool = ConnectionPool::new(addr, PoolConfig::default());
        assert_eq!(pool.target_addr, "127.0.0.1:8080");
    }

    /// Explicitly supplied field values are kept as-is.
    #[test]
    fn test_pool_config_custom() {
        let timeout = Duration::from_secs(60);
        let config = PoolConfig {
            max_idle_per_host: 10,
            idle_timeout: timeout,
            prefer_h2: true,
        };

        assert_eq!(config.max_idle_per_host, 10);
        assert_eq!(config.idle_timeout, Duration::from_secs(60));
        assert!(config.prefer_h2);
    }
}