ppaass_v3_common/connection/proxy/
pool.rs

1use crate::config::{RetrieveConnectionConfig, RetrieveConnectionPoolConfig};
2use crate::error::CommonError;
3use crate::user::UserInfo;
4use crate::{FramedConnection, ProxyTcpConnectionNewState, ProxyTcpConnectionTunnelCtlState};
5use chrono::{DateTime, Utc};
6use std::cmp::Ordering;
7use std::fmt::Debug;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::mpsc::channel;
11use tokio::sync::{Mutex, RwLock};
12use tokio::time::sleep;
13use tracing::{debug, error};
14struct ProxyTcpConnectionPoolElement<C>
15where
16    C: RetrieveConnectionPoolConfig + RetrieveConnectionConfig + Debug + Send + Sync + 'static,
17{
18    proxy_tcp_connection: FramedConnection<ProxyTcpConnectionTunnelCtlState>,
19    create_time: DateTime<Utc>,
20    last_check_time: DateTime<Utc>,
21    last_check_duration: i64,
22    config: Arc<C>,
23}
24impl<C> ProxyTcpConnectionPoolElement<C>
25where
26    C: RetrieveConnectionPoolConfig + RetrieveConnectionConfig + Debug + Send + Sync + 'static,
27{
28    pub fn new(
29        proxy_tcp_connection: FramedConnection<ProxyTcpConnectionTunnelCtlState>,
30        config: Arc<C>,
31    ) -> Self {
32        Self {
33            proxy_tcp_connection,
34            last_check_time: Utc::now(),
35            create_time: Utc::now(),
36            last_check_duration: 0,
37            config,
38        }
39    }
40    pub fn need_check(&self) -> bool {
41        let now = Utc::now();
42        let delta = now - self.last_check_time;
43        delta.num_seconds() > self.config.check_interval() as i64
44    }
45    pub fn need_close(&self) -> bool {
46        let now = Utc::now();
47        let delta = now - self.create_time;
48        delta.num_seconds() > self.config.connection_max_alive()
49    }
50}
51/// The connection pool for proxy connection.
52pub struct ProxyTcpConnectionPool<C>
53where
54    C: RetrieveConnectionPoolConfig + RetrieveConnectionConfig + Debug + Send + Sync + 'static,
55{
56    /// The pool to store the proxy connection
57    pool: Arc<Mutex<Vec<ProxyTcpConnectionPoolElement<C>>>>,
58    config: Arc<C>,
59    user_info: Arc<RwLock<UserInfo>>,
60    username: String,
61}
62impl<C> ProxyTcpConnectionPool<C>
63where
64    C: RetrieveConnectionPoolConfig + RetrieveConnectionConfig + Debug + Send + Sync + 'static,
65{
66    /// Create the proxy connection pool
67    pub async fn new(
68        config: Arc<C>,
69        username: &str,
70        user_info: Arc<RwLock<UserInfo>>,
71    ) -> Result<Self, CommonError> {
72        let pool = Arc::new(Mutex::new(Vec::new()));
73        let interval = config.fill_interval();
74        let pool_clone = pool.clone();
75        let user_info_clone = user_info.clone();
76        let config_clone = config.clone();
77        let username_clone = username.to_owned();
78        tokio::spawn(async move {
79            loop {
80                debug!("Starting connection pool auto filling loop.");
81                Self::fill_pool(
82                    pool_clone.clone(),
83                    config_clone.clone(),
84                    user_info_clone.clone(),
85                    &username_clone,
86                )
87                .await;
88                sleep(Duration::from_secs(interval)).await;
89            }
90        });
91        Self::start_connection_check_task(config.clone(), pool.clone());
92        Ok(Self {
93            pool,
94            config,
95            user_info,
96            username: username.to_owned(),
97        })
98    }
99    pub async fn take_proxy_connection(
100        &self,
101    ) -> Result<FramedConnection<ProxyTcpConnectionTunnelCtlState>, CommonError> {
102        Self::concrete_take_proxy_connection(
103            self.pool.clone(),
104            self.config.clone(),
105            self.user_info.clone(),
106            &self.username,
107        )
108        .await
109    }
110
111    async fn check_proxy_connection(
112        proxy_tcp_connection_pool_element: &mut ProxyTcpConnectionPoolElement<C>,
113        config: &C,
114    ) -> Result<(), CommonError> {
115        let check_duration = proxy_tcp_connection_pool_element
116            .proxy_tcp_connection
117            .heartbeat(config.heartbeat_timeout())
118            .await?;
119        proxy_tcp_connection_pool_element.last_check_duration = check_duration;
120        Ok(())
121    }
122
123    fn start_connection_check_task(
124        config: Arc<C>,
125        pool: Arc<Mutex<Vec<ProxyTcpConnectionPoolElement<C>>>>,
126    ) {
127        tokio::spawn(async move {
128            loop {
129                let mut pool_lock = pool.lock().await;
130                debug!(
131                    "Start checking connection pool loop, current pool size: {} ",
132                    pool_lock.len()
133                );
134                let (checking_tx, mut checking_rx) =
135                    channel::<ProxyTcpConnectionPoolElement<C>>(config.max_pool_size());
136                channel::<ProxyTcpConnectionPoolElement<C>>(config.max_pool_size());
137                'for_each_connection: loop {
138                    let mut proxy_tcp_connection_pool_element = match pool_lock.pop() {
139                        None => break 'for_each_connection,
140                        Some(proxy_tcp_connection_pool_element) => {
141                            proxy_tcp_connection_pool_element
142                        }
143                    };
144                    if !proxy_tcp_connection_pool_element.need_check() {
145                        if let Err(e) = checking_tx.send(proxy_tcp_connection_pool_element).await {
146                            error!("Fail to push proxy connection back to pool: {}", e);
147                        }
148                        continue 'for_each_connection;
149                    }
150                    if proxy_tcp_connection_pool_element.need_close() {
151                        continue 'for_each_connection;
152                    }
153                    let checking_tx = checking_tx.clone();
154                    let config = config.clone();
155                    tokio::spawn(async move {
156                        if let Err(e) = Self::check_proxy_connection(
157                            &mut proxy_tcp_connection_pool_element,
158                            &config,
159                        )
160                        .await
161                        {
162                            error!("Failed to check proxy connection: {}", e);
163                            return;
164                        };
165                        if let Err(e) = checking_tx.send(proxy_tcp_connection_pool_element).await {
166                            error!("Fail to push proxy connection back to pool: {}", e);
167                        };
168                    });
169                }
170                drop(checking_tx);
171                while let Some(mut proxy_connection) = checking_rx.recv().await {
172                    if pool_lock.len() >= config.max_pool_size() {
173                        tokio::spawn(async move {
174                            if let Err(e) = proxy_connection.proxy_tcp_connection.close().await {
175                                error!("Failed to close proxy connection: {}", e);
176                            };
177                        });
178                        continue;
179                    }
180                    pool_lock.push(proxy_connection);
181                }
182                pool_lock.sort_by(|a, b| {
183                    let comp = a.last_check_duration.cmp(&b.last_check_duration);
184                    if Ordering::Equal == comp {
185                        a.last_check_time.cmp(&b.last_check_time)
186                    } else {
187                        comp
188                    }
189                });
190                drop(pool_lock);
191                sleep(Duration::from_secs(config.check_interval())).await;
192            }
193        });
194    }
195    /// The concrete take proxy connection implementation
196    async fn concrete_take_proxy_connection(
197        pool: Arc<Mutex<Vec<ProxyTcpConnectionPoolElement<C>>>>,
198        config: Arc<C>,
199        user_info: Arc<RwLock<UserInfo>>,
200        username: &str,
201    ) -> Result<FramedConnection<ProxyTcpConnectionTunnelCtlState>, CommonError> {
202        loop {
203            let mut pool_lock = pool.lock().await;
204            debug!(
205                "Taking proxy connection, current pool size: {}",
206                pool_lock.len()
207            );
208            let proxy_tcp_connection_element = pool_lock.pop();
209            match proxy_tcp_connection_element {
210                None => {
211                    drop(pool_lock);
212                    Self::fill_pool(pool.clone(), config.clone(), user_info.clone(), username)
213                        .await;
214                    sleep(Duration::from_secs(config.retake_interval())).await;
215                    continue;
216                }
217                Some(proxy_tcp_connection_element) => {
218                    debug!(
219                        "Proxy connection available, current pool size before take: {}",
220                        pool_lock.len()
221                    );
222                    return Ok(proxy_tcp_connection_element.proxy_tcp_connection);
223                }
224            }
225        }
226    }
227    /// Fill the pool with proxy connection
228    async fn fill_pool(
229        pool: Arc<Mutex<Vec<ProxyTcpConnectionPoolElement<C>>>>,
230        config: Arc<C>,
231        user_info: Arc<RwLock<UserInfo>>,
232        username: &str,
233    ) {
234        let max_pool_size = config.max_pool_size();
235        let mut pool_lock = pool.lock().await;
236        if pool_lock.len() >= max_pool_size {
237            debug!(
238                "Cancel filling proxy connection pool, because the pool size exceed max, current pool size: {}, max pool size: {}",
239                pool_lock.len(),
240                max_pool_size
241            );
242            return;
243        }
244        debug!(
245            "Begin to fill proxy connection pool, current pool size:{}",
246            pool_lock.len()
247        );
248        let (proxy_tcp_connection_tx, mut proxy_tcp_connection_rx) =
249            channel::<FramedConnection<ProxyTcpConnectionTunnelCtlState>>(max_pool_size);
250
251        for _ in pool_lock.len()..max_pool_size {
252            let proxy_tcp_connection_tx = proxy_tcp_connection_tx.clone();
253
254            let user_info = user_info.clone();
255            let config = config.clone();
256            let username = username.to_owned();
257            tokio::spawn(async move {
258                let user_info = user_info.read().await;
259                match FramedConnection::<ProxyTcpConnectionNewState>::create(
260                    &username,
261                    &user_info,
262                    config.frame_size(),
263                    config.connect_timeout(),
264                )
265                .await
266                {
267                    Ok(proxy_tcp_connection) => {
268                        if let Err(e) = proxy_tcp_connection_tx.send(proxy_tcp_connection).await {
269                            error!("Fail to send proxy tcp connection: {e:?}")
270                        }
271                    }
272                    Err(e) => {
273                        error!("Failed to create proxy connection: {e}");
274                    }
275                }
276            });
277        }
278
279        drop(proxy_tcp_connection_tx);
280        debug!("Waiting for proxy connection creation result.");
281        while let Some(proxy_tcp_connection) = proxy_tcp_connection_rx.recv().await {
282            let proxy_tcp_connection_element_to_push =
283                ProxyTcpConnectionPoolElement::new(proxy_tcp_connection, config.clone());
284            pool_lock.push(proxy_tcp_connection_element_to_push)
285        }
286    }
287}