// specter/pool/multiplexer.rs

//! Connection pool for HTTP/1.1 keep-alive reuse and HTTP/2 / HTTP/3 multiplexing
//!
//! This module provides connection pooling with support for:
//! - HTTP/1.1: Idle connections are kept per pool key and handed out to one request at a time (no multiplexing)
//! - HTTP/2: Connection reuse with stream multiplexing
//! - HTTP/3: QUIC connection reuse with stream multiplexing
7
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::RwLock;
12use tracing;
13
14use crate::error::Result;
15use crate::fingerprint::FingerprintProfile;
16use crate::transport::connector::MaybeHttpsStream;
17use crate::transport::h2::PseudoHeaderOrder;
18use crate::version::HttpVersion;
19
20/// Connection pool key identifying a unique host/port combination with fingerprint settings
21#[derive(Debug, Clone, Hash, Eq, PartialEq)]
22pub struct PoolKey {
23    pub host: String,
24    pub port: u16,
25    pub is_https: bool,
26    pub fingerprint: FingerprintProfile,
27    pub pseudo_order: PseudoHeaderOrder,
28}
29
30impl PoolKey {
31    /// Create a new pool key
32    pub fn new(
33        host: String,
34        port: u16,
35        is_https: bool,
36        fingerprint: FingerprintProfile,
37        pseudo_order: PseudoHeaderOrder,
38    ) -> Self {
39        Self {
40            host,
41            port,
42            is_https,
43            fingerprint,
44            pseudo_order,
45        }
46    }
47
48    pub fn origin_key(&self) -> OriginKey {
49        OriginKey {
50            host: self.host.clone(),
51            port: self.port,
52            is_https: self.is_https,
53        }
54    }
55}
56
/// Origin identity: the host / port / HTTPS-flag triple of a target,
/// independent of any fingerprint settings.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct OriginKey {
    /// Host component of the origin.
    pub host: String,
    /// Port component of the origin.
    pub port: u16,
    /// HTTPS flag for the origin.
    pub is_https: bool,
}
63
64#[derive(Debug, Clone)]
65pub struct OriginFairQueue<K = PoolKey> {
66    order: VecDeque<OriginKey>,
67    queues: HashMap<OriginKey, VecDeque<K>>,
68    len: usize,
69}
70
71impl<K> Default for OriginFairQueue<K> {
72    fn default() -> Self {
73        Self {
74            order: VecDeque::new(),
75            queues: HashMap::new(),
76            len: 0,
77        }
78    }
79}
80
81impl<K> OriginFairQueue<K> {
82    pub fn push_with_origin(&mut self, origin: OriginKey, key: K) {
83        let queue = self.queues.entry(origin.clone()).or_default();
84        if queue.is_empty() {
85            self.order.push_back(origin);
86        }
87        queue.push_back(key);
88        self.len += 1;
89    }
90
91    pub fn pop_next(&mut self) -> Option<K> {
92        while let Some(origin) = self.order.pop_front() {
93            let Some((key, has_more)) = self.pop_origin_front(&origin) else {
94                continue;
95            };
96            if has_more {
97                self.order.push_back(origin);
98            } else {
99                self.queues.remove(&origin);
100            }
101            self.len = self.len.saturating_sub(1);
102            return Some(key);
103        }
104        None
105    }
106
107    pub fn len(&self) -> usize {
108        self.len
109    }
110
111    pub fn is_empty(&self) -> bool {
112        self.len == 0
113    }
114
115    fn pop_origin_front(&mut self, origin: &OriginKey) -> Option<(K, bool)> {
116        let queue = self.queues.get_mut(origin)?;
117        let key = queue.pop_front()?;
118        Some((key, !queue.is_empty()))
119    }
120}
121
122impl OriginFairQueue<PoolKey> {
123    pub fn push(&mut self, key: PoolKey) {
124        let origin = key.origin_key();
125        self.push_with_origin(origin, key);
126    }
127}
128
129/// Pool entry for HTTP/1.1 connections
130#[derive(Debug)]
131pub struct H1PoolEntry {
132    pub stream: MaybeHttpsStream,
133    pub last_used: Instant,
134}
135
136impl H1PoolEntry {
137    pub fn new(stream: MaybeHttpsStream) -> Self {
138        Self {
139            stream,
140            last_used: Instant::now(),
141        }
142    }
143
144    pub fn is_expired(&self, max_idle: Duration) -> bool {
145        self.last_used.elapsed() >= max_idle
146    }
147}
148
149/// Pool entry tracking connection state and stream usage
150#[derive(Debug, Clone)]
151pub struct PoolEntry {
152    pub version: HttpVersion,
153    pub established_at: Instant,
154    pub last_used: Instant,
155    /// Number of active streams (for HTTP/2 and HTTP/3)
156    pub active_streams: u32,
157    /// Maximum concurrent streams (from SETTINGS for HTTP/2)
158    pub max_streams: u32,
159    /// Connection is still valid
160    pub is_valid: bool,
161}
162
163impl PoolEntry {
164    /// Create a new pool entry
165    pub fn new(version: HttpVersion, max_streams: u32) -> Self {
166        let now = Instant::now();
167        Self {
168            version,
169            established_at: now,
170            last_used: now,
171            active_streams: 0,
172            max_streams,
173            is_valid: true,
174        }
175    }
176
177    /// Check if this connection can handle another multiplexed stream
178    pub fn can_multiplex(&self) -> bool {
179        matches!(
180            self.version,
181            HttpVersion::Http2 | HttpVersion::Http3 | HttpVersion::Http3Only
182        ) && self.active_streams < self.max_streams
183            && self.is_valid
184    }
185
186    /// Attempt to acquire a stream slot
187    pub fn acquire_stream(&mut self) -> bool {
188        if self.can_multiplex() {
189            self.active_streams += 1;
190            self.last_used = Instant::now();
191            true
192        } else {
193            false
194        }
195    }
196
197    /// Release a stream slot
198    pub fn release_stream(&mut self) {
199        if self.active_streams > 0 {
200            self.active_streams -= 1;
201            self.last_used = Instant::now();
202        }
203    }
204
205    /// Mark connection as invalid (connection error, GOAWAY frame, etc.)
206    pub fn invalidate(&mut self) {
207        self.is_valid = false;
208    }
209
210    /// Check if connection is expired based on idle time
211    pub fn is_expired(&self, max_idle: Duration) -> bool {
212        let age = Instant::now().duration_since(self.last_used);
213        age >= max_idle
214    }
215}
216
217/// Connection pool for reusing HTTP/1.1, HTTP/2, and HTTP/3 connections
218pub struct ConnectionPool {
219    entries: Arc<RwLock<HashMap<PoolKey, PoolEntry>>>,
220    h1_idle: Arc<RwLock<HashMap<PoolKey, Vec<H1PoolEntry>>>>,
221    max_idle_duration: Duration,
222    #[allow(dead_code)] // Reserved for future connection limiting per host
223    max_connections_per_host: usize,
224    default_max_streams: u32,
225}
226
227impl ConnectionPool {
228    /// Default maximum idle duration (30 seconds)
229    const DEFAULT_MAX_IDLE: Duration = Duration::from_secs(30);
230
231    /// Default maximum connections per host
232    const DEFAULT_MAX_PER_HOST: usize = 6;
233
234    /// Default maximum concurrent streams for HTTP/2 and HTTP/3
235    const DEFAULT_MAX_STREAMS: u32 = 100;
236
237    /// Create a new connection pool with default settings
238    pub fn new() -> Self {
239        Self {
240            entries: Arc::new(RwLock::new(HashMap::new())),
241            h1_idle: Arc::new(RwLock::new(HashMap::new())),
242            max_idle_duration: Self::DEFAULT_MAX_IDLE,
243            max_connections_per_host: Self::DEFAULT_MAX_PER_HOST,
244            default_max_streams: Self::DEFAULT_MAX_STREAMS,
245        }
246    }
247
248    /// Create a connection pool with custom configuration
249    pub fn with_config(max_idle: Duration, max_per_host: usize, max_streams: u32) -> Self {
250        Self {
251            entries: Arc::new(RwLock::new(HashMap::new())),
252            h1_idle: Arc::new(RwLock::new(HashMap::new())),
253            max_idle_duration: max_idle,
254            max_connections_per_host: max_per_host,
255            default_max_streams: max_streams,
256        }
257    }
258
259    /// Get an idle HTTP/1.1 connection from the pool
260    pub async fn get_h1(&self, key: &PoolKey) -> Option<MaybeHttpsStream> {
261        let start = Instant::now();
262        let mut pool = self.h1_idle.write().await;
263        if let Some(entries) = pool.get_mut(key) {
264            tracing::debug!("H1 Pool: {} entries for key {:?}", entries.len(), key);
265            let initial_count = entries.len();
266            while let Some(entry) = entries.pop() {
267                if !entry.is_expired(self.max_idle_duration) {
268                    tracing::debug!(
269                        "H1 Pool: Reusing connection for {:?} (checked {} entries, took {:?})",
270                        key,
271                        initial_count - entries.len(),
272                        start.elapsed()
273                    );
274                    return Some(entry.stream);
275                }
276                tracing::debug!(
277                    "H1 Pool: Connection expired for {:?} (age: {:?})",
278                    key,
279                    entry.last_used.elapsed()
280                );
281            }
282        } else {
283            tracing::debug!("H1 Pool: No entries for key {:?}", key);
284        }
285        tracing::debug!(
286            "H1 Pool: No reusable connection found for {:?} (took {:?})",
287            key,
288            start.elapsed()
289        );
290        None
291    }
292
293    /// Return an HTTP/1.1 connection to the pool
294    pub async fn put_h1(&self, key: PoolKey, stream: MaybeHttpsStream) {
295        if self.max_connections_per_host == 0 {
296            return;
297        }
298        let start = Instant::now();
299        tracing::debug!("H1 Pool: Returning connection for {:?}", key);
300        let mut pool = self.h1_idle.write().await;
301        let entries = pool.entry(key.clone()).or_default();
302        let count_before = entries.len();
303        while entries.len() >= self.max_connections_per_host {
304            entries.remove(0);
305        }
306        entries.push(H1PoolEntry::new(stream));
307        tracing::debug!(
308            "H1 Pool: Returned connection for {:?} (pool size: {} -> {}, took {:?})",
309            key,
310            count_before,
311            entries.len(),
312            start.elapsed()
313        );
314    }
315
316    /// Try to return an HTTP/1.1 connection to the pool without awaiting.
317    ///
318    /// This is used by poll-based response bodies, where parking the caller on
319    /// an async pool lock would couple body EOF to an unrelated task wake. If
320    /// the pool lock is temporarily unavailable, the connection is safely
321    /// discarded instead of being returned late or through a spawned shim.
322    pub fn try_put_h1(&self, key: PoolKey, stream: MaybeHttpsStream) -> bool {
323        if self.max_connections_per_host == 0 {
324            return false;
325        }
326        let Ok(mut pool) = self.h1_idle.try_write() else {
327            return false;
328        };
329        let entries = pool.entry(key).or_default();
330        while entries.len() >= self.max_connections_per_host {
331            entries.remove(0);
332        }
333        entries.push(H1PoolEntry::new(stream));
334        true
335    }
336
337    /// Get an existing connection or signal that a new one should be created
338    ///
339    /// Returns:
340    /// - `Ok(Some(entry))`: Reusable connection found (HTTP/2 or HTTP/3)
341    /// - `Ok(None)`: No reusable connection, create new one
342    pub async fn get_or_create(
343        &self,
344        key: &PoolKey,
345        version: HttpVersion,
346    ) -> Result<Option<PoolEntry>> {
347        let start = Instant::now();
348        let mut entries = self.entries.write().await;
349
350        // HTTP/1.1 doesn't support multiplexing in this map - managed via get_h1/put_h1
351        if version == HttpVersion::Http1_1 {
352            return Ok(None);
353        }
354
355        // Check for existing valid connection with available stream slots
356        if let Some(entry) = entries.get_mut(key) {
357            let active_before = entry.active_streams;
358            if entry.acquire_stream() {
359                tracing::debug!(
360                    "H2/H3 Pool: Reusing connection for {:?} (active streams: {} -> {}, took {:?})",
361                    key,
362                    active_before,
363                    entry.active_streams,
364                    start.elapsed()
365                );
366                return Ok(Some(entry.clone()));
367            } else {
368                tracing::debug!(
369                    "H2/H3 Pool: Connection exists for {:?} but cannot multiplex (active: {}/{}, valid: {}, took {:?})",
370                    key,
371                    active_before,
372                    entry.max_streams,
373                    entry.is_valid,
374                    start.elapsed()
375                );
376            }
377        } else {
378            tracing::debug!("H2/H3 Pool: No existing connection for {:?}", key);
379        }
380
381        // No reusable connection found - create new entry
382        tracing::debug!(
383            "H2/H3 Pool: Creating new connection entry for {:?} (took {:?})",
384            key,
385            start.elapsed()
386        );
387        let entry = PoolEntry::new(version, self.default_max_streams);
388        entries.insert(key.clone(), entry.clone());
389
390        Ok(Some(entry))
391    }
392
393    /// Release a stream slot back to the pool
394    pub async fn release(&self, key: &PoolKey) {
395        let mut entries = self.entries.write().await;
396        if let Some(entry) = entries.get_mut(key) {
397            let active_before = entry.active_streams;
398            entry.release_stream();
399            tracing::debug!(
400                "H2/H3 Pool: Released stream for {:?} (active streams: {} -> {})",
401                key,
402                active_before,
403                entry.active_streams
404            );
405        } else {
406            tracing::warn!(
407                "H2/H3 Pool: Attempted to release stream for non-existent connection {:?}",
408                key
409            );
410        }
411    }
412
413    /// Invalidate a connection (due to error, GOAWAY, etc.)
414    pub async fn invalidate(&self, key: &PoolKey) {
415        let mut entries = self.entries.write().await;
416        if let Some(entry) = entries.get_mut(key) {
417            entry.invalidate();
418        }
419    }
420
421    /// Remove expired and invalid connections
422    pub async fn cleanup(&self) {
423        // Cleanup H2/H3 entries
424        {
425            let mut entries = self.entries.write().await;
426            entries
427                .retain(|_key, entry| entry.is_valid && !entry.is_expired(self.max_idle_duration));
428        }
429
430        // Cleanup H1 entries
431        {
432            let mut h1_pool = self.h1_idle.write().await;
433            for entries in h1_pool.values_mut() {
434                entries.retain(|e| !e.is_expired(self.max_idle_duration));
435            }
436            h1_pool.retain(|_, entries| !entries.is_empty());
437        }
438    }
439
440    /// Spawn a background cleanup task that runs periodically
441    ///
442    /// Returns a handle to the spawned task
443    pub fn spawn_cleanup_task(self: Arc<Self>, interval: Duration) -> tokio::task::JoinHandle<()> {
444        tokio::spawn(async move {
445            let mut interval_timer = tokio::time::interval(interval);
446            loop {
447                interval_timer.tick().await;
448                self.cleanup().await;
449            }
450        })
451    }
452
453    /// Get current pool statistics (for debugging/monitoring)
454    pub async fn stats(&self) -> PoolStats {
455        let entries = self.entries.read().await;
456        let h1_pool = self.h1_idle.read().await;
457
458        let h1_idle_count = h1_pool.values().map(|v| v.len()).sum();
459
460        PoolStats {
461            total_connections: entries.len() + h1_idle_count,
462            active_streams: entries.values().map(|e| e.active_streams).sum(),
463            http2_connections: entries
464                .values()
465                .filter(|e| matches!(e.version, HttpVersion::Http2))
466                .count(),
467            http3_connections: entries
468                .values()
469                .filter(|e| matches!(e.version, HttpVersion::Http3 | HttpVersion::Http3Only))
470                .count(),
471            http1_idle_connections: h1_idle_count,
472        }
473    }
474}
475
476impl Default for ConnectionPool {
477    fn default() -> Self {
478        Self::new()
479    }
480}
481
/// Pool statistics for monitoring.
///
/// A plain snapshot of counters; `Default` yields all zeros and values can
/// be compared with `==`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PoolStats {
    /// Multiplexed connection entries plus idle HTTP/1.1 connections.
    pub total_connections: usize,
    /// Sum of active streams over all multiplexed entries.
    pub active_streams: u32,
    /// Number of entries recorded as HTTP/2.
    pub http2_connections: usize,
    /// Number of entries recorded as HTTP/3 (including HTTP/3-only).
    pub http3_connections: usize,
    /// Number of idle HTTP/1.1 connections held in the pool.
    pub http1_idle_connections: usize,
}
491
#[cfg(test)]
mod tests {
    use super::*;

    /// Pool key using the Chrome fingerprint / pseudo-header pairing.
    fn chrome_key(host: &str, port: u16, is_https: bool) -> PoolKey {
        PoolKey::new(
            host.to_string(),
            port,
            is_https,
            FingerprintProfile::Chrome142,
            PseudoHeaderOrder::Chrome,
        )
    }

    /// Pool key using the Firefox fingerprint / pseudo-header pairing.
    fn firefox_key(host: &str, port: u16, is_https: bool) -> PoolKey {
        PoolKey::new(
            host.to_string(),
            port,
            is_https,
            FingerprintProfile::Firefox142,
            PseudoHeaderOrder::Firefox,
        )
    }

    /// The HTTPS `example.com:443` Chrome key shared by the pool tests.
    fn example_key() -> PoolKey {
        chrome_key("example.com", 443, true)
    }

    #[test]
    fn test_pool_key_equality() {
        let key1 = example_key();
        let key2 = example_key();
        // Same host, but a different port and scheme flag.
        let key3 = chrome_key("example.com", 80, false);

        assert_eq!(key1, key2);
        assert_ne!(key1, key3);
    }

    #[test]
    fn test_pool_entry_multiplexing() {
        let mut entry = PoolEntry::new(HttpVersion::Http2, 100);

        // A fresh HTTP/2 entry hands out a stream slot.
        assert!(entry.can_multiplex());
        assert!(entry.acquire_stream());
        assert_eq!(entry.active_streams, 1);

        // Releasing gives the slot back.
        entry.release_stream();
        assert_eq!(entry.active_streams, 0);
    }

    #[test]
    fn test_pool_entry_max_streams() {
        let mut entry = PoolEntry::new(HttpVersion::Http2, 2);

        assert!(entry.acquire_stream());
        assert!(entry.acquire_stream());
        // The third attempt exceeds the limit of two.
        assert!(!entry.acquire_stream());
        assert_eq!(entry.active_streams, 2);
    }

    #[test]
    fn test_pool_entry_invalidation() {
        let mut entry = PoolEntry::new(HttpVersion::Http2, 100);

        assert!(entry.can_multiplex());
        entry.invalidate();
        assert!(!entry.can_multiplex());
    }

    #[test]
    fn test_pool_entry_expiration() {
        let entry = PoolEntry::new(HttpVersion::Http2, 100);

        // A just-created entry is well inside a 30 second idle window.
        assert!(!entry.is_expired(Duration::from_secs(30)));

        // A zero idle limit makes every entry count as expired.
        assert!(entry.is_expired(Duration::from_secs(0)));
    }

    #[tokio::test]
    async fn test_connection_pool_http11() {
        let pool = ConnectionPool::new();
        let key = example_key();

        // HTTP/1.1 is never tracked in the multiplexed map.
        let result = pool
            .get_or_create(&key, HttpVersion::Http1_1)
            .await
            .unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_connection_pool_http2_multiplexing() {
        let pool = ConnectionPool::new();
        let key = example_key();

        // The first request registers the connection.
        let entry1 = pool.get_or_create(&key, HttpVersion::Http2).await.unwrap();
        assert!(entry1.is_some());

        // The second request is served from the same entry.
        let entry2 = pool.get_or_create(&key, HttpVersion::Http2).await.unwrap();
        assert!(entry2.is_some());

        // Still exactly one HTTP/2 connection in the pool.
        let stats = pool.stats().await;
        assert_eq!(stats.total_connections, 1);
        assert_eq!(stats.http2_connections, 1);
    }

    #[tokio::test]
    async fn test_connection_pool_release() {
        let pool = ConnectionPool::new();
        let key = example_key();

        let _entry = pool.get_or_create(&key, HttpVersion::Http2).await.unwrap();

        // Releasing a stream must not remove the connection entry.
        pool.release(&key).await;

        let stats = pool.stats().await;
        assert_eq!(stats.total_connections, 1);
    }

    #[tokio::test]
    async fn test_connection_pool_invalidation() {
        let pool = ConnectionPool::new();
        let key = example_key();

        let _entry = pool.get_or_create(&key, HttpVersion::Http2).await.unwrap();

        // Mark the connection unusable...
        pool.invalidate(&key).await;

        // ...and let cleanup sweep it out.
        pool.cleanup().await;

        let stats = pool.stats().await;
        assert_eq!(stats.total_connections, 0);
    }

    #[test]
    fn origin_fair_queue_rotates_ready_origins_before_same_origin_reuse() {
        let alpha_chrome = chrome_key("alpha.example", 443, true);
        let alpha_firefox = firefox_key("alpha.example", 443, true);
        let beta_chrome = chrome_key("beta.example", 443, true);
        let mut queue = OriginFairQueue::<PoolKey>::default();

        for key in [&alpha_chrome, &alpha_firefox, &beta_chrome] {
            queue.push(key.clone());
        }

        assert_eq!(queue.pop_next(), Some(alpha_chrome));
        assert_eq!(
            queue.pop_next(),
            Some(beta_chrome),
            "pool-level scheduling must give another ready origin a turn before reusing alpha"
        );
        assert_eq!(queue.pop_next(), Some(alpha_firefox));
        assert!(queue.is_empty());
    }
}