Skip to main content

soli_proxy/
pool.rs

1use http::Uri;
2use hyper_util::client::legacy::connect::HttpConnector;
3use hyper_util::client::legacy::Client;
4use hyper_util::rt::TokioExecutor;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use tokio::sync::Mutex;
9
10#[derive(Clone)]
11pub struct PooledConnection {
12    pub client: Client<HttpConnector, hyper::body::Incoming>,
13    pub last_used: Instant,
14    pub uri: Uri,
15}
16
17pub struct ConnectionPool {
18    pools: Arc<Mutex<HashMap<String, PooledConnection>>>,
19    _max_idle: usize,
20    max_age: Duration,
21    connector: HttpConnector,
22}
23
24impl ConnectionPool {
25    pub fn new(max_idle: usize, max_age: Duration) -> Self {
26        Self {
27            pools: Arc::new(Mutex::new(HashMap::new())),
28            _max_idle: max_idle,
29            max_age,
30            connector: HttpConnector::new(),
31        }
32    }
33
34    pub async fn get_client(&self, uri: &Uri) -> Client<HttpConnector, hyper::body::Incoming> {
35        let uri_str = uri.to_string();
36        let mut pools = self.pools.lock().await;
37
38        if let Some(conn) = pools.get(&uri_str) {
39            if conn.last_used.elapsed() < self.max_age {
40                let client = Client::builder(TokioExecutor::new())
41                    .http2_only(true)
42                    .build(self.connector.clone());
43                return client;
44            }
45            pools.remove(&uri_str);
46        }
47
48        Client::builder(TokioExecutor::new())
49            .http2_only(true)
50            .build(self.connector.clone())
51    }
52
53    pub async fn cleanup(&self) {
54        let mut pools = self.pools.lock().await;
55        let now = Instant::now();
56        pools.retain(|_, conn| now.duration_since(conn.last_used) < self.max_age);
57    }
58}
59
60pub fn create_optimized_client() -> Client<HttpConnector, hyper::body::Incoming> {
61    let mut connector = HttpConnector::new();
62    connector.set_nodelay(true);
63    connector.set_keepalive(Some(Duration::from_secs(30)));
64
65    Client::builder(TokioExecutor::new())
66        .http2_only(true)
67        .build(connector)
68}
69
70pub struct BackendPool {
71    clients: Vec<Client<HttpConnector, hyper::body::Incoming>>,
72    current: usize,
73}
74
75impl BackendPool {
76    pub fn new(targets: &[Uri]) -> Self {
77        let clients: Vec<_> = targets
78            .iter()
79            .map(|_| {
80                let mut connector = HttpConnector::new();
81                connector.set_nodelay(true);
82                connector.set_keepalive(Some(Duration::from_secs(30)));
83                Client::builder(TokioExecutor::new())
84                    .http2_only(true)
85                    .build(connector)
86            })
87            .collect();
88
89        Self {
90            clients,
91            current: 0,
92        }
93    }
94
95    pub fn get_client(&mut self) -> &mut Client<HttpConnector, hyper::body::Incoming> {
96        let idx = self.current;
97        self.current = (self.current + 1) % self.clients.len();
98        &mut self.clients[idx]
99    }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    /// A new pool stores the limits it was constructed with.
    #[tokio::test]
    async fn test_pool_creation() {
        let max_age = Duration::from_secs(60);
        let pool = ConnectionPool::new(10, max_age);

        assert_eq!(pool._max_idle, 10);
        assert_eq!(pool.max_age, max_age);
    }
}