1use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use std::time::{Duration, Instant};
11use tokio::sync::RwLock;
12use tracing;
13
14use crate::error::Result;
15use crate::fingerprint::FingerprintProfile;
16use crate::transport::connector::MaybeHttpsStream;
17use crate::transport::h2::PseudoHeaderOrder;
18use crate::version::HttpVersion;
19
20#[derive(Debug, Clone, Hash, Eq, PartialEq)]
22pub struct PoolKey {
23 pub host: String,
24 pub port: u16,
25 pub is_https: bool,
26 pub fingerprint: FingerprintProfile,
27 pub pseudo_order: PseudoHeaderOrder,
28}
29
30impl PoolKey {
31 pub fn new(
33 host: String,
34 port: u16,
35 is_https: bool,
36 fingerprint: FingerprintProfile,
37 pseudo_order: PseudoHeaderOrder,
38 ) -> Self {
39 Self {
40 host,
41 port,
42 is_https,
43 fingerprint,
44 pseudo_order,
45 }
46 }
47
48 pub fn origin_key(&self) -> OriginKey {
49 OriginKey {
50 host: self.host.clone(),
51 port: self.port,
52 is_https: self.is_https,
53 }
54 }
55}
56
/// Identifies an origin: host, port and whether HTTPS is used.
///
/// Unlike a full pool key this carries no fingerprint information, so keys
/// that differ only in fingerprint map to the same origin.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct OriginKey {
    /// Host name of the origin.
    pub host: String,
    /// Port of the origin.
    pub port: u16,
    /// Whether the origin is reached over HTTPS.
    pub is_https: bool,
}
63
64#[derive(Debug, Clone)]
65pub struct OriginFairQueue<K = PoolKey> {
66 order: VecDeque<OriginKey>,
67 queues: HashMap<OriginKey, VecDeque<K>>,
68 len: usize,
69}
70
71impl<K> Default for OriginFairQueue<K> {
72 fn default() -> Self {
73 Self {
74 order: VecDeque::new(),
75 queues: HashMap::new(),
76 len: 0,
77 }
78 }
79}
80
81impl<K> OriginFairQueue<K> {
82 pub fn push_with_origin(&mut self, origin: OriginKey, key: K) {
83 let queue = self.queues.entry(origin.clone()).or_default();
84 if queue.is_empty() {
85 self.order.push_back(origin);
86 }
87 queue.push_back(key);
88 self.len += 1;
89 }
90
91 pub fn pop_next(&mut self) -> Option<K> {
92 while let Some(origin) = self.order.pop_front() {
93 let Some((key, has_more)) = self.pop_origin_front(&origin) else {
94 continue;
95 };
96 if has_more {
97 self.order.push_back(origin);
98 } else {
99 self.queues.remove(&origin);
100 }
101 self.len = self.len.saturating_sub(1);
102 return Some(key);
103 }
104 None
105 }
106
107 pub fn len(&self) -> usize {
108 self.len
109 }
110
111 pub fn is_empty(&self) -> bool {
112 self.len == 0
113 }
114
115 fn pop_origin_front(&mut self, origin: &OriginKey) -> Option<(K, bool)> {
116 let queue = self.queues.get_mut(origin)?;
117 let key = queue.pop_front()?;
118 Some((key, !queue.is_empty()))
119 }
120}
121
122impl OriginFairQueue<PoolKey> {
123 pub fn push(&mut self, key: PoolKey) {
124 let origin = key.origin_key();
125 self.push_with_origin(origin, key);
126 }
127}
128
129#[derive(Debug)]
131pub struct H1PoolEntry {
132 pub stream: MaybeHttpsStream,
133 pub last_used: Instant,
134}
135
136impl H1PoolEntry {
137 pub fn new(stream: MaybeHttpsStream) -> Self {
138 Self {
139 stream,
140 last_used: Instant::now(),
141 }
142 }
143
144 pub fn is_expired(&self, max_idle: Duration) -> bool {
145 self.last_used.elapsed() >= max_idle
146 }
147}
148
149#[derive(Debug, Clone)]
151pub struct PoolEntry {
152 pub version: HttpVersion,
153 pub established_at: Instant,
154 pub last_used: Instant,
155 pub active_streams: u32,
157 pub max_streams: u32,
159 pub is_valid: bool,
161}
162
163impl PoolEntry {
164 pub fn new(version: HttpVersion, max_streams: u32) -> Self {
166 let now = Instant::now();
167 Self {
168 version,
169 established_at: now,
170 last_used: now,
171 active_streams: 0,
172 max_streams,
173 is_valid: true,
174 }
175 }
176
177 pub fn can_multiplex(&self) -> bool {
179 matches!(
180 self.version,
181 HttpVersion::Http2 | HttpVersion::Http3 | HttpVersion::Http3Only
182 ) && self.active_streams < self.max_streams
183 && self.is_valid
184 }
185
186 pub fn acquire_stream(&mut self) -> bool {
188 if self.can_multiplex() {
189 self.active_streams += 1;
190 self.last_used = Instant::now();
191 true
192 } else {
193 false
194 }
195 }
196
197 pub fn release_stream(&mut self) {
199 if self.active_streams > 0 {
200 self.active_streams -= 1;
201 self.last_used = Instant::now();
202 }
203 }
204
205 pub fn invalidate(&mut self) {
207 self.is_valid = false;
208 }
209
210 pub fn is_expired(&self, max_idle: Duration) -> bool {
212 let age = Instant::now().duration_since(self.last_used);
213 age >= max_idle
214 }
215}
216
217pub struct ConnectionPool {
219 entries: Arc<RwLock<HashMap<PoolKey, PoolEntry>>>,
220 h1_idle: Arc<RwLock<HashMap<PoolKey, Vec<H1PoolEntry>>>>,
221 max_idle_duration: Duration,
222 #[allow(dead_code)] max_connections_per_host: usize,
224 default_max_streams: u32,
225}
226
227impl ConnectionPool {
228 const DEFAULT_MAX_IDLE: Duration = Duration::from_secs(30);
230
231 const DEFAULT_MAX_PER_HOST: usize = 6;
233
234 const DEFAULT_MAX_STREAMS: u32 = 100;
236
237 pub fn new() -> Self {
239 Self {
240 entries: Arc::new(RwLock::new(HashMap::new())),
241 h1_idle: Arc::new(RwLock::new(HashMap::new())),
242 max_idle_duration: Self::DEFAULT_MAX_IDLE,
243 max_connections_per_host: Self::DEFAULT_MAX_PER_HOST,
244 default_max_streams: Self::DEFAULT_MAX_STREAMS,
245 }
246 }
247
248 pub fn with_config(max_idle: Duration, max_per_host: usize, max_streams: u32) -> Self {
250 Self {
251 entries: Arc::new(RwLock::new(HashMap::new())),
252 h1_idle: Arc::new(RwLock::new(HashMap::new())),
253 max_idle_duration: max_idle,
254 max_connections_per_host: max_per_host,
255 default_max_streams: max_streams,
256 }
257 }
258
259 pub async fn get_h1(&self, key: &PoolKey) -> Option<MaybeHttpsStream> {
261 let start = Instant::now();
262 let mut pool = self.h1_idle.write().await;
263 if let Some(entries) = pool.get_mut(key) {
264 tracing::debug!("H1 Pool: {} entries for key {:?}", entries.len(), key);
265 let initial_count = entries.len();
266 while let Some(entry) = entries.pop() {
267 if !entry.is_expired(self.max_idle_duration) {
268 tracing::debug!(
269 "H1 Pool: Reusing connection for {:?} (checked {} entries, took {:?})",
270 key,
271 initial_count - entries.len(),
272 start.elapsed()
273 );
274 return Some(entry.stream);
275 }
276 tracing::debug!(
277 "H1 Pool: Connection expired for {:?} (age: {:?})",
278 key,
279 entry.last_used.elapsed()
280 );
281 }
282 } else {
283 tracing::debug!("H1 Pool: No entries for key {:?}", key);
284 }
285 tracing::debug!(
286 "H1 Pool: No reusable connection found for {:?} (took {:?})",
287 key,
288 start.elapsed()
289 );
290 None
291 }
292
293 pub async fn put_h1(&self, key: PoolKey, stream: MaybeHttpsStream) {
295 if self.max_connections_per_host == 0 {
296 return;
297 }
298 let start = Instant::now();
299 tracing::debug!("H1 Pool: Returning connection for {:?}", key);
300 let mut pool = self.h1_idle.write().await;
301 let entries = pool.entry(key.clone()).or_default();
302 let count_before = entries.len();
303 while entries.len() >= self.max_connections_per_host {
304 entries.remove(0);
305 }
306 entries.push(H1PoolEntry::new(stream));
307 tracing::debug!(
308 "H1 Pool: Returned connection for {:?} (pool size: {} -> {}, took {:?})",
309 key,
310 count_before,
311 entries.len(),
312 start.elapsed()
313 );
314 }
315
316 pub fn try_put_h1(&self, key: PoolKey, stream: MaybeHttpsStream) -> bool {
323 if self.max_connections_per_host == 0 {
324 return false;
325 }
326 let Ok(mut pool) = self.h1_idle.try_write() else {
327 return false;
328 };
329 let entries = pool.entry(key).or_default();
330 while entries.len() >= self.max_connections_per_host {
331 entries.remove(0);
332 }
333 entries.push(H1PoolEntry::new(stream));
334 true
335 }
336
337 pub async fn get_or_create(
343 &self,
344 key: &PoolKey,
345 version: HttpVersion,
346 ) -> Result<Option<PoolEntry>> {
347 let start = Instant::now();
348 let mut entries = self.entries.write().await;
349
350 if version == HttpVersion::Http1_1 {
352 return Ok(None);
353 }
354
355 if let Some(entry) = entries.get_mut(key) {
357 let active_before = entry.active_streams;
358 if entry.acquire_stream() {
359 tracing::debug!(
360 "H2/H3 Pool: Reusing connection for {:?} (active streams: {} -> {}, took {:?})",
361 key,
362 active_before,
363 entry.active_streams,
364 start.elapsed()
365 );
366 return Ok(Some(entry.clone()));
367 } else {
368 tracing::debug!(
369 "H2/H3 Pool: Connection exists for {:?} but cannot multiplex (active: {}/{}, valid: {}, took {:?})",
370 key,
371 active_before,
372 entry.max_streams,
373 entry.is_valid,
374 start.elapsed()
375 );
376 }
377 } else {
378 tracing::debug!("H2/H3 Pool: No existing connection for {:?}", key);
379 }
380
381 tracing::debug!(
383 "H2/H3 Pool: Creating new connection entry for {:?} (took {:?})",
384 key,
385 start.elapsed()
386 );
387 let entry = PoolEntry::new(version, self.default_max_streams);
388 entries.insert(key.clone(), entry.clone());
389
390 Ok(Some(entry))
391 }
392
393 pub async fn release(&self, key: &PoolKey) {
395 let mut entries = self.entries.write().await;
396 if let Some(entry) = entries.get_mut(key) {
397 let active_before = entry.active_streams;
398 entry.release_stream();
399 tracing::debug!(
400 "H2/H3 Pool: Released stream for {:?} (active streams: {} -> {})",
401 key,
402 active_before,
403 entry.active_streams
404 );
405 } else {
406 tracing::warn!(
407 "H2/H3 Pool: Attempted to release stream for non-existent connection {:?}",
408 key
409 );
410 }
411 }
412
413 pub async fn invalidate(&self, key: &PoolKey) {
415 let mut entries = self.entries.write().await;
416 if let Some(entry) = entries.get_mut(key) {
417 entry.invalidate();
418 }
419 }
420
421 pub async fn cleanup(&self) {
423 {
425 let mut entries = self.entries.write().await;
426 entries
427 .retain(|_key, entry| entry.is_valid && !entry.is_expired(self.max_idle_duration));
428 }
429
430 {
432 let mut h1_pool = self.h1_idle.write().await;
433 for entries in h1_pool.values_mut() {
434 entries.retain(|e| !e.is_expired(self.max_idle_duration));
435 }
436 h1_pool.retain(|_, entries| !entries.is_empty());
437 }
438 }
439
440 pub fn spawn_cleanup_task(self: Arc<Self>, interval: Duration) -> tokio::task::JoinHandle<()> {
444 tokio::spawn(async move {
445 let mut interval_timer = tokio::time::interval(interval);
446 loop {
447 interval_timer.tick().await;
448 self.cleanup().await;
449 }
450 })
451 }
452
453 pub async fn stats(&self) -> PoolStats {
455 let entries = self.entries.read().await;
456 let h1_pool = self.h1_idle.read().await;
457
458 let h1_idle_count = h1_pool.values().map(|v| v.len()).sum();
459
460 PoolStats {
461 total_connections: entries.len() + h1_idle_count,
462 active_streams: entries.values().map(|e| e.active_streams).sum(),
463 http2_connections: entries
464 .values()
465 .filter(|e| matches!(e.version, HttpVersion::Http2))
466 .count(),
467 http3_connections: entries
468 .values()
469 .filter(|e| matches!(e.version, HttpVersion::Http3 | HttpVersion::Http3Only))
470 .count(),
471 http1_idle_connections: h1_idle_count,
472 }
473 }
474}
475
476impl Default for ConnectionPool {
477 fn default() -> Self {
478 Self::new()
479 }
480}
481
/// Point-in-time counters describing the contents of a connection pool.
#[derive(Clone, Debug)]
pub struct PoolStats {
    /// Multiplexed entries plus idle HTTP/1.x connections.
    pub total_connections: usize,
    /// Sum of active streams over all multiplexed entries.
    pub active_streams: u32,
    /// Number of entries whose version is HTTP/2.
    pub http2_connections: usize,
    /// Number of entries whose version is HTTP/3.
    pub http3_connections: usize,
    /// Number of idle HTTP/1.x connections.
    pub http1_idle_connections: usize,
}
491
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a key with the Chrome fingerprint and pseudo-header order.
    fn chrome_key(host: &str, port: u16, is_https: bool) -> PoolKey {
        PoolKey::new(
            host.to_string(),
            port,
            is_https,
            FingerprintProfile::Chrome142,
            PseudoHeaderOrder::Chrome,
        )
    }

    /// Key for `https://example.com:443` with the Chrome fingerprint.
    fn example_https_key() -> PoolKey {
        chrome_key("example.com", 443, true)
    }

    // Keys are equal only when every component matches.
    #[test]
    fn test_pool_key_equality() {
        let key1 = example_https_key();
        let key2 = example_https_key();
        let key3 = chrome_key("example.com", 80, false);

        assert_eq!(key1, key2);
        assert_ne!(key1, key3);
    }

    // Acquiring and releasing a stream moves the counter up and back down.
    #[test]
    fn test_pool_entry_multiplexing() {
        let mut entry = PoolEntry::new(HttpVersion::Http2, 100);

        assert!(entry.can_multiplex());
        assert!(entry.acquire_stream());
        assert_eq!(entry.active_streams, 1);

        entry.release_stream();
        assert_eq!(entry.active_streams, 0);
    }

    // Once the stream limit is reached further acquisitions are refused.
    #[test]
    fn test_pool_entry_max_streams() {
        let mut entry = PoolEntry::new(HttpVersion::Http2, 2);

        assert!(entry.acquire_stream());
        assert!(entry.acquire_stream());
        assert!(!entry.acquire_stream());
        assert_eq!(entry.active_streams, 2);
    }

    // An invalidated entry can no longer multiplex.
    #[test]
    fn test_pool_entry_invalidation() {
        let mut entry = PoolEntry::new(HttpVersion::Http2, 100);

        assert!(entry.can_multiplex());
        entry.invalidate();
        assert!(!entry.can_multiplex());
    }

    // A fresh entry is not expired for a long timeout but is for a zero one.
    #[test]
    fn test_pool_entry_expiration() {
        let entry = PoolEntry::new(HttpVersion::Http2, 100);

        assert!(!entry.is_expired(Duration::from_secs(30)));

        assert!(entry.is_expired(Duration::from_secs(0)));
    }

    // HTTP/1.1 requests are not tracked in the multiplexed entry map.
    #[tokio::test]
    async fn test_connection_pool_http11() {
        let pool = ConnectionPool::new();
        let key = example_https_key();

        let result = pool
            .get_or_create(&key, HttpVersion::Http1_1)
            .await
            .unwrap();
        assert!(result.is_none());
    }

    // Two requests for the same key share a single HTTP/2 entry.
    #[tokio::test]
    async fn test_connection_pool_http2_multiplexing() {
        let pool = ConnectionPool::new();
        let key = example_https_key();

        let entry1 = pool.get_or_create(&key, HttpVersion::Http2).await.unwrap();
        assert!(entry1.is_some());

        let entry2 = pool.get_or_create(&key, HttpVersion::Http2).await.unwrap();
        assert!(entry2.is_some());

        let stats = pool.stats().await;
        assert_eq!(stats.total_connections, 1);
        assert_eq!(stats.http2_connections, 1);
    }

    // Releasing a stream keeps the entry in the pool.
    #[tokio::test]
    async fn test_connection_pool_release() {
        let pool = ConnectionPool::new();
        let key = example_https_key();

        let _entry = pool.get_or_create(&key, HttpVersion::Http2).await.unwrap();

        pool.release(&key).await;

        let stats = pool.stats().await;
        assert_eq!(stats.total_connections, 1);
    }

    // Cleanup removes entries that were invalidated.
    #[tokio::test]
    async fn test_connection_pool_invalidation() {
        let pool = ConnectionPool::new();
        let key = example_https_key();

        let _entry = pool.get_or_create(&key, HttpVersion::Http2).await.unwrap();

        pool.invalidate(&key).await;

        pool.cleanup().await;

        let stats = pool.stats().await;
        assert_eq!(stats.total_connections, 0);
    }

    // With two items queued for alpha and one for beta, beta is served
    // between the two alpha items.
    #[test]
    fn origin_fair_queue_rotates_ready_origins_before_same_origin_reuse() {
        let alpha_chrome = chrome_key("alpha.example", 443, true);
        let alpha_firefox = PoolKey::new(
            "alpha.example".to_string(),
            443,
            true,
            FingerprintProfile::Firefox142,
            PseudoHeaderOrder::Firefox,
        );
        let beta_chrome = chrome_key("beta.example", 443, true);
        let mut queue = OriginFairQueue::default();

        queue.push(alpha_chrome.clone());
        queue.push(alpha_firefox.clone());
        queue.push(beta_chrome.clone());

        assert_eq!(queue.pop_next(), Some(alpha_chrome));
        assert_eq!(
            queue.pop_next(),
            Some(beta_chrome),
            "pool-level scheduling must give another ready origin a turn before reusing alpha"
        );
        assert_eq!(queue.pop_next(), Some(alpha_firefox));
        assert!(queue.is_empty());
    }
}