ppaass_v3_common/connection/proxy/
pool.rs1use crate::config::{RetrieveConnectionConfig, RetrieveConnectionPoolConfig};
2use crate::error::CommonError;
3use crate::user::UserInfo;
4use crate::{FramedConnection, ProxyTcpConnectionNewState, ProxyTcpConnectionTunnelCtlState};
5use chrono::{DateTime, Utc};
6use std::cmp::Ordering;
7use std::fmt::Debug;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::mpsc::channel;
11use tokio::sync::{Mutex, RwLock};
12use tokio::time::sleep;
13use tracing::{debug, error};
14struct ProxyTcpConnectionPoolElement<C>
15where
16 C: RetrieveConnectionPoolConfig + RetrieveConnectionConfig + Debug + Send + Sync + 'static,
17{
18 proxy_tcp_connection: FramedConnection<ProxyTcpConnectionTunnelCtlState>,
19 create_time: DateTime<Utc>,
20 last_check_time: DateTime<Utc>,
21 last_check_duration: i64,
22 config: Arc<C>,
23}
24impl<C> ProxyTcpConnectionPoolElement<C>
25where
26 C: RetrieveConnectionPoolConfig + RetrieveConnectionConfig + Debug + Send + Sync + 'static,
27{
28 pub fn new(
29 proxy_tcp_connection: FramedConnection<ProxyTcpConnectionTunnelCtlState>,
30 config: Arc<C>,
31 ) -> Self {
32 Self {
33 proxy_tcp_connection,
34 last_check_time: Utc::now(),
35 create_time: Utc::now(),
36 last_check_duration: 0,
37 config,
38 }
39 }
40 pub fn need_check(&self) -> bool {
41 let now = Utc::now();
42 let delta = now - self.last_check_time;
43 delta.num_seconds() > self.config.check_interval() as i64
44 }
45 pub fn need_close(&self) -> bool {
46 let now = Utc::now();
47 let delta = now - self.create_time;
48 delta.num_seconds() > self.config.connection_max_alive()
49 }
50}
51pub struct ProxyTcpConnectionPool<C>
53where
54 C: RetrieveConnectionPoolConfig + RetrieveConnectionConfig + Debug + Send + Sync + 'static,
55{
56 pool: Arc<Mutex<Vec<ProxyTcpConnectionPoolElement<C>>>>,
58 config: Arc<C>,
59 user_info: Arc<RwLock<UserInfo>>,
60 username: String,
61}
62impl<C> ProxyTcpConnectionPool<C>
63where
64 C: RetrieveConnectionPoolConfig + RetrieveConnectionConfig + Debug + Send + Sync + 'static,
65{
66 pub async fn new(
68 config: Arc<C>,
69 username: &str,
70 user_info: Arc<RwLock<UserInfo>>,
71 ) -> Result<Self, CommonError> {
72 let pool = Arc::new(Mutex::new(Vec::new()));
73 let interval = config.fill_interval();
74 let pool_clone = pool.clone();
75 let user_info_clone = user_info.clone();
76 let config_clone = config.clone();
77 let username_clone = username.to_owned();
78 tokio::spawn(async move {
79 loop {
80 debug!("Starting connection pool auto filling loop.");
81 Self::fill_pool(
82 pool_clone.clone(),
83 config_clone.clone(),
84 user_info_clone.clone(),
85 &username_clone,
86 )
87 .await;
88 sleep(Duration::from_secs(interval)).await;
89 }
90 });
91 Self::start_connection_check_task(config.clone(), pool.clone());
92 Ok(Self {
93 pool,
94 config,
95 user_info,
96 username: username.to_owned(),
97 })
98 }
99 pub async fn take_proxy_connection(
100 &self,
101 ) -> Result<FramedConnection<ProxyTcpConnectionTunnelCtlState>, CommonError> {
102 Self::concrete_take_proxy_connection(
103 self.pool.clone(),
104 self.config.clone(),
105 self.user_info.clone(),
106 &self.username,
107 )
108 .await
109 }
110
111 async fn check_proxy_connection(
112 proxy_tcp_connection_pool_element: &mut ProxyTcpConnectionPoolElement<C>,
113 config: &C,
114 ) -> Result<(), CommonError> {
115 let check_duration = proxy_tcp_connection_pool_element
116 .proxy_tcp_connection
117 .heartbeat(config.heartbeat_timeout())
118 .await?;
119 proxy_tcp_connection_pool_element.last_check_duration = check_duration;
120 Ok(())
121 }
122
123 fn start_connection_check_task(
124 config: Arc<C>,
125 pool: Arc<Mutex<Vec<ProxyTcpConnectionPoolElement<C>>>>,
126 ) {
127 tokio::spawn(async move {
128 loop {
129 let mut pool_lock = pool.lock().await;
130 debug!(
131 "Start checking connection pool loop, current pool size: {} ",
132 pool_lock.len()
133 );
134 let (checking_tx, mut checking_rx) =
135 channel::<ProxyTcpConnectionPoolElement<C>>(config.max_pool_size());
136 channel::<ProxyTcpConnectionPoolElement<C>>(config.max_pool_size());
137 'for_each_connection: loop {
138 let mut proxy_tcp_connection_pool_element = match pool_lock.pop() {
139 None => break 'for_each_connection,
140 Some(proxy_tcp_connection_pool_element) => {
141 proxy_tcp_connection_pool_element
142 }
143 };
144 if !proxy_tcp_connection_pool_element.need_check() {
145 if let Err(e) = checking_tx.send(proxy_tcp_connection_pool_element).await {
146 error!("Fail to push proxy connection back to pool: {}", e);
147 }
148 continue 'for_each_connection;
149 }
150 if proxy_tcp_connection_pool_element.need_close() {
151 continue 'for_each_connection;
152 }
153 let checking_tx = checking_tx.clone();
154 let config = config.clone();
155 tokio::spawn(async move {
156 if let Err(e) = Self::check_proxy_connection(
157 &mut proxy_tcp_connection_pool_element,
158 &config,
159 )
160 .await
161 {
162 error!("Failed to check proxy connection: {}", e);
163 return;
164 };
165 if let Err(e) = checking_tx.send(proxy_tcp_connection_pool_element).await {
166 error!("Fail to push proxy connection back to pool: {}", e);
167 };
168 });
169 }
170 drop(checking_tx);
171 while let Some(mut proxy_connection) = checking_rx.recv().await {
172 if pool_lock.len() >= config.max_pool_size() {
173 tokio::spawn(async move {
174 if let Err(e) = proxy_connection.proxy_tcp_connection.close().await {
175 error!("Failed to close proxy connection: {}", e);
176 };
177 });
178 continue;
179 }
180 pool_lock.push(proxy_connection);
181 }
182 pool_lock.sort_by(|a, b| {
183 let comp = a.last_check_duration.cmp(&b.last_check_duration);
184 if Ordering::Equal == comp {
185 a.last_check_time.cmp(&b.last_check_time)
186 } else {
187 comp
188 }
189 });
190 drop(pool_lock);
191 sleep(Duration::from_secs(config.check_interval())).await;
192 }
193 });
194 }
195 async fn concrete_take_proxy_connection(
197 pool: Arc<Mutex<Vec<ProxyTcpConnectionPoolElement<C>>>>,
198 config: Arc<C>,
199 user_info: Arc<RwLock<UserInfo>>,
200 username: &str,
201 ) -> Result<FramedConnection<ProxyTcpConnectionTunnelCtlState>, CommonError> {
202 loop {
203 let mut pool_lock = pool.lock().await;
204 debug!(
205 "Taking proxy connection, current pool size: {}",
206 pool_lock.len()
207 );
208 let proxy_tcp_connection_element = pool_lock.pop();
209 match proxy_tcp_connection_element {
210 None => {
211 drop(pool_lock);
212 Self::fill_pool(pool.clone(), config.clone(), user_info.clone(), username)
213 .await;
214 sleep(Duration::from_secs(config.retake_interval())).await;
215 continue;
216 }
217 Some(proxy_tcp_connection_element) => {
218 debug!(
219 "Proxy connection available, current pool size before take: {}",
220 pool_lock.len()
221 );
222 return Ok(proxy_tcp_connection_element.proxy_tcp_connection);
223 }
224 }
225 }
226 }
227 async fn fill_pool(
229 pool: Arc<Mutex<Vec<ProxyTcpConnectionPoolElement<C>>>>,
230 config: Arc<C>,
231 user_info: Arc<RwLock<UserInfo>>,
232 username: &str,
233 ) {
234 let max_pool_size = config.max_pool_size();
235 let mut pool_lock = pool.lock().await;
236 if pool_lock.len() >= max_pool_size {
237 debug!(
238 "Cancel filling proxy connection pool, because the pool size exceed max, current pool size: {}, max pool size: {}",
239 pool_lock.len(),
240 max_pool_size
241 );
242 return;
243 }
244 debug!(
245 "Begin to fill proxy connection pool, current pool size:{}",
246 pool_lock.len()
247 );
248 let (proxy_tcp_connection_tx, mut proxy_tcp_connection_rx) =
249 channel::<FramedConnection<ProxyTcpConnectionTunnelCtlState>>(max_pool_size);
250
251 for _ in pool_lock.len()..max_pool_size {
252 let proxy_tcp_connection_tx = proxy_tcp_connection_tx.clone();
253
254 let user_info = user_info.clone();
255 let config = config.clone();
256 let username = username.to_owned();
257 tokio::spawn(async move {
258 let user_info = user_info.read().await;
259 match FramedConnection::<ProxyTcpConnectionNewState>::create(
260 &username,
261 &user_info,
262 config.frame_size(),
263 config.connect_timeout(),
264 )
265 .await
266 {
267 Ok(proxy_tcp_connection) => {
268 if let Err(e) = proxy_tcp_connection_tx.send(proxy_tcp_connection).await {
269 error!("Fail to send proxy tcp connection: {e:?}")
270 }
271 }
272 Err(e) => {
273 error!("Failed to create proxy connection: {e}");
274 }
275 }
276 });
277 }
278
279 drop(proxy_tcp_connection_tx);
280 debug!("Waiting for proxy connection creation result.");
281 while let Some(proxy_tcp_connection) = proxy_tcp_connection_rx.recv().await {
282 let proxy_tcp_connection_element_to_push =
283 ProxyTcpConnectionPoolElement::new(proxy_tcp_connection, config.clone());
284 pool_lock.push(proxy_tcp_connection_element_to_push)
285 }
286 }
287}