1use log::{debug, warn};
18use parking_lot::{Mutex, RwLock};
19use pingora_timeout::{sleep, timeout};
20use std::collections::HashMap;
21use std::io;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::io::{AsyncRead, AsyncReadExt};
25use tokio::sync::{oneshot, watch, Notify, OwnedMutexGuard};
26
27use super::lru::Lru;
28
29type GroupKey = u64;
30#[cfg(unix)]
31type ID = i32;
32#[cfg(windows)]
33type ID = usize;
34
35#[derive(Clone, Debug)]
37pub struct ConnectionMeta {
38 pub key: GroupKey,
40 pub id: ID,
42}
43
44impl ConnectionMeta {
45 pub fn new(key: GroupKey, id: ID) -> Self {
47 ConnectionMeta { key, id }
48 }
49}
50
51struct PoolConnection<S> {
52 pub notify_use: oneshot::Sender<bool>,
53 pub connection: S,
54}
55
56impl<S> PoolConnection<S> {
57 pub fn new(notify_use: oneshot::Sender<bool>, connection: S) -> Self {
58 PoolConnection {
59 notify_use,
60 connection,
61 }
62 }
63
64 pub fn release(self) -> S {
65 let _ = self.notify_use.send(true);
67 self.connection
69 }
70}
71
72use crossbeam_queue::ArrayQueue;
73
74pub struct PoolNode<T> {
76 connections: Mutex<HashMap<ID, T>>,
77 hot_queue: ArrayQueue<(ID, T)>,
79 hot_queue_remove_lock: Mutex<()>,
81 }
83
84const HOT_QUEUE_SIZE: usize = 16;
86
87impl<T> PoolNode<T> {
88 pub fn new() -> Self {
90 PoolNode {
91 connections: Mutex::new(HashMap::new()),
92 hot_queue: ArrayQueue::new(HOT_QUEUE_SIZE),
93 hot_queue_remove_lock: Mutex::new(()),
94 }
95 }
96
97 pub fn get_any(&self) -> Option<(ID, T)> {
99 let hot_conn = self.hot_queue.pop();
100 if hot_conn.is_some() {
101 return hot_conn;
102 }
103 let mut connections = self.connections.lock();
104 let id = match connections.iter().next() {
106 Some((k, _)) => *k, None => return None,
108 };
109 let connection = connections.remove(&id).unwrap();
111 Some((id, connection))
115 }
117
118 pub fn insert(&self, id: ID, conn: T) {
120 if let Err(node) = self.hot_queue.push((id, conn)) {
121 let mut connections = self.connections.lock();
123 connections.insert(node.0, node.1); }
125 }
126
127 pub fn remove(&self, id: ID) -> Option<T> {
132 let removed = self.connections.lock().remove(&id);
134 if removed.is_some() {
135 return removed;
136 } let _queue_lock = self.hot_queue_remove_lock.lock();
139 let max_len = self.hot_queue.len();
141 for _ in 0..max_len {
142 if let Some((conn_id, conn)) = self.hot_queue.pop() {
143 if conn_id == id {
144 return Some(conn);
146 } else {
147 self.insert(conn_id, conn);
149 }
150 } else {
151 return None;
153 }
154 }
155 None
156 }
158}
159
160pub struct ConnectionPool<S> {
165 pool: RwLock<HashMap<GroupKey, Arc<PoolNode<PoolConnection<S>>>>>,
167 lru: Lru<ID, ConnectionMeta>,
168}
169
170impl<S> ConnectionPool<S> {
171 pub fn new(size: usize) -> Self {
175 ConnectionPool {
176 pool: RwLock::new(HashMap::with_capacity(size)), lru: Lru::new(size),
178 }
179 }
180
181 fn get_pool_node(&self, key: GroupKey) -> Arc<PoolNode<PoolConnection<S>>> {
183 {
184 let pool = self.pool.read();
185 if let Some(v) = pool.get(&key) {
186 return (*v).clone();
187 }
188 } {
191 let mut pool = self.pool.write();
193 if let Some(v) = pool.get(&key) {
195 return (*v).clone();
196 }
197 let node = Arc::new(PoolNode::new());
198 let node_ret = node.clone();
199 pool.insert(key, node); node_ret
201 }
202 }
203
204 fn pop_evicted(&self, meta: &ConnectionMeta) {
206 let pool_node = {
207 let pool = self.pool.read();
208 match pool.get(&meta.key) {
209 Some(v) => (*v).clone(),
210 None => {
211 warn!("Fail to get pool node for {:?}", meta);
212 return;
213 } }
215 }; pool_node.remove(meta.id);
218 debug!("evict fd: {} from key {}", meta.id, meta.key);
219 }
220
221 pub fn pop_closed(&self, meta: &ConnectionMeta) {
222 self.pop_evicted(meta);
224 self.lru.pop(&meta.id);
225 }
226
227 pub fn get(&self, key: &GroupKey) -> Option<S> {
229 let pool_node = {
230 let pool = self.pool.read();
231 match pool.get(key) {
232 Some(v) => (*v).clone(),
233 None => return None,
234 }
235 }; if let Some((id, connection)) = pool_node.get_any() {
238 self.lru.pop(&id); Some(connection.release())
240 } else {
241 None
242 }
243 }
244
245 pub fn put(
250 &self,
251 meta: &ConnectionMeta,
252 connection: S,
253 ) -> (Arc<Notify>, oneshot::Receiver<bool>) {
254 let (notify_close, replaced) = self.lru.add(meta.id, meta.clone());
255 if let Some(meta) = replaced {
256 self.pop_evicted(&meta);
257 };
258 let pool_node = self.get_pool_node(meta.key);
259 let (notify_use, watch_use) = oneshot::channel();
260 let connection = PoolConnection::new(notify_use, connection);
261 pool_node.insert(meta.id, connection);
262 (notify_close, watch_use)
263 }
264
265 pub async fn idle_poll<Stream>(
272 &self,
273 connection: OwnedMutexGuard<Stream>,
274 meta: &ConnectionMeta,
275 timeout: Option<Duration>,
276 notify_evicted: Arc<Notify>,
277 watch_use: oneshot::Receiver<bool>,
278 ) where
279 Stream: AsyncRead + Unpin + Send,
280 {
281 let read_result = tokio::select! {
282 biased;
283 _ = watch_use => {
284 debug!("idle connection is being picked up");
285 return
286 },
287 _ = notify_evicted.notified() => {
288 debug!("idle connection is being evicted");
289 return
291 }
292 read_result = read_with_timeout(connection , timeout) => read_result
293 };
294
295 match read_result {
296 Ok(n) => {
297 if n > 0 {
298 warn!("Data received on idle client connection, close it")
299 } else {
300 debug!("Peer closed the idle connection or timeout")
301 }
302 }
303
304 Err(e) => {
305 debug!("error with the idle connection, close it {:?}", e);
306 }
307 }
308 self.pop_closed(meta);
310 }
311
312 pub async fn idle_timeout(
317 &self,
318 meta: &ConnectionMeta,
319 timeout: Duration,
320 notify_evicted: Arc<Notify>,
321 mut notify_closed: watch::Receiver<bool>,
322 watch_use: oneshot::Receiver<bool>,
323 ) {
324 tokio::select! {
325 biased;
326 _ = watch_use => {
327 debug!("idle connection is being picked up");
328 },
329 _ = notify_evicted.notified() => {
330 debug!("idle connection is being evicted");
331 }
333 _ = notify_closed.changed() => {
334 debug!("idle connection is being closed");
336 self.pop_closed(meta);
337 }
338 _ = sleep(timeout) => {
339 debug!("idle connection is being evicted");
340 self.pop_closed(meta);
341 }
342 };
343 }
344}
345
346async fn read_with_timeout<S>(
347 mut connection: OwnedMutexGuard<S>,
348 timeout_duration: Option<Duration>,
349) -> io::Result<usize>
350where
351 S: AsyncRead + Unpin + Send,
352{
353 let mut buf = [0; 1];
354 let read_event = connection.read(&mut buf[..]);
355 match timeout_duration {
356 Some(d) => match timeout(d, read_event).await {
357 Ok(res) => res,
358 Err(e) => {
359 debug!("keepalive timeout {:?} reached, {:?}", d, e);
360 Ok(0)
361 }
362 },
363 _ => read_event.await,
364 }
365}
366
367#[cfg(test)]
368mod tests {
369 use super::*;
370 use log::debug;
371 use tokio::sync::Mutex as AsyncMutex;
372 use tokio_test::io::{Builder, Mock};
373
374 #[tokio::test]
375 async fn test_lookup() {
376 let meta1 = ConnectionMeta::new(101, 1);
377 let value1 = "v1".to_string();
378 let meta2 = ConnectionMeta::new(102, 2);
379 let value2 = "v2".to_string();
380 let meta3 = ConnectionMeta::new(101, 3);
381 let value3 = "v3".to_string();
382 let cp: ConnectionPool<String> = ConnectionPool::new(3); cp.put(&meta1, value1.clone());
384 cp.put(&meta2, value2.clone());
385 cp.put(&meta3, value3.clone());
386
387 let found_b = cp.get(&meta2.key).unwrap();
388 assert_eq!(found_b, value2);
389
390 let found_a1 = cp.get(&meta1.key).unwrap();
391 let found_a2 = cp.get(&meta1.key).unwrap();
392
393 assert!(
394 found_a1 == value1 && found_a2 == value3 || found_a2 == value1 && found_a1 == value3
395 );
396 }
397
398 #[tokio::test]
399 async fn test_pop() {
400 let meta1 = ConnectionMeta::new(101, 1);
401 let value1 = "v1".to_string();
402 let meta2 = ConnectionMeta::new(102, 2);
403 let value2 = "v2".to_string();
404 let meta3 = ConnectionMeta::new(101, 3);
405 let value3 = "v3".to_string();
406 let cp: ConnectionPool<String> = ConnectionPool::new(3); cp.put(&meta1, value1);
408 cp.put(&meta2, value2);
409 cp.put(&meta3, value3.clone());
410
411 cp.pop_closed(&meta1);
412
413 let found_a1 = cp.get(&meta1.key).unwrap();
414 assert_eq!(found_a1, value3);
415
416 cp.pop_closed(&meta1);
417 assert!(cp.get(&meta1.key).is_none())
418 }
419
420 #[tokio::test]
421 async fn test_eviction() {
422 let meta1 = ConnectionMeta::new(101, 1);
423 let value1 = "v1".to_string();
424 let meta2 = ConnectionMeta::new(102, 2);
425 let value2 = "v2".to_string();
426 let meta3 = ConnectionMeta::new(101, 3);
427 let value3 = "v3".to_string();
428 let cp: ConnectionPool<String> = ConnectionPool::new(2);
429 let (notify_close1, _) = cp.put(&meta1, value1.clone());
430 let (notify_close2, _) = cp.put(&meta2, value2.clone());
431 let (notify_close3, _) = cp.put(&meta3, value3.clone()); let closed_item = tokio::select! {
434 _ = notify_close1.notified() => {debug!("notifier1"); 1},
435 _ = notify_close2.notified() => {debug!("notifier2"); 2},
436 _ = notify_close3.notified() => {debug!("notifier3"); 3},
437 };
438 assert_eq!(closed_item, 1);
439
440 let found_a1 = cp.get(&meta1.key).unwrap();
441 assert_eq!(found_a1, value3);
442 assert_eq!(cp.get(&meta1.key), None)
443 }
444
445 #[tokio::test]
446 #[should_panic(expected = "There is still data left to read.")]
447 async fn test_read_close() {
448 let meta1 = ConnectionMeta::new(101, 1);
449 let mock_io1 = Arc::new(AsyncMutex::new(Builder::new().read(b"garbage").build()));
450 let meta2 = ConnectionMeta::new(102, 2);
451 let mock_io2 = Arc::new(AsyncMutex::new(
452 Builder::new().wait(Duration::from_secs(99)).build(),
453 ));
454 let meta3 = ConnectionMeta::new(101, 3);
455 let mock_io3 = Arc::new(AsyncMutex::new(
456 Builder::new().wait(Duration::from_secs(99)).build(),
457 ));
458 let cp: ConnectionPool<Arc<AsyncMutex<Mock>>> = ConnectionPool::new(3);
459 let (c1, u1) = cp.put(&meta1, mock_io1.clone());
460 let (c2, u2) = cp.put(&meta2, mock_io2.clone());
461 let (c3, u3) = cp.put(&meta3, mock_io3.clone());
462
463 let closed_item = tokio::select! {
464 _ = cp.idle_poll(mock_io1.try_lock_owned().unwrap(), &meta1, None, c1, u1) => {debug!("notifier1"); 1},
465 _ = cp.idle_poll(mock_io2.try_lock_owned().unwrap(), &meta1, None, c2, u2) => {debug!("notifier2"); 2},
466 _ = cp.idle_poll(mock_io3.try_lock_owned().unwrap(), &meta1, None, c3, u3) => {debug!("notifier3"); 3},
467 };
468 assert_eq!(closed_item, 1);
469
470 let _ = cp.get(&meta1.key).unwrap(); assert!(cp.get(&meta1.key).is_none()) }
473
474 #[tokio::test]
475 async fn test_read_timeout() {
476 let meta1 = ConnectionMeta::new(101, 1);
477 let mock_io1 = Arc::new(AsyncMutex::new(
478 Builder::new().wait(Duration::from_secs(99)).build(),
479 ));
480 let meta2 = ConnectionMeta::new(102, 2);
481 let mock_io2 = Arc::new(AsyncMutex::new(
482 Builder::new().wait(Duration::from_secs(99)).build(),
483 ));
484 let meta3 = ConnectionMeta::new(101, 3);
485 let mock_io3 = Arc::new(AsyncMutex::new(
486 Builder::new().wait(Duration::from_secs(99)).build(),
487 ));
488 let cp: ConnectionPool<Arc<AsyncMutex<Mock>>> = ConnectionPool::new(3);
489 let (c1, u1) = cp.put(&meta1, mock_io1.clone());
490 let (c2, u2) = cp.put(&meta2, mock_io2.clone());
491 let (c3, u3) = cp.put(&meta3, mock_io3.clone());
492
493 let closed_item = tokio::select! {
494 _ = cp.idle_poll(mock_io1.try_lock_owned().unwrap(), &meta1, Some(Duration::from_secs(1)), c1, u1) => {debug!("notifier1"); 1},
495 _ = cp.idle_poll(mock_io2.try_lock_owned().unwrap(), &meta1, Some(Duration::from_secs(2)), c2, u2) => {debug!("notifier2"); 2},
496 _ = cp.idle_poll(mock_io3.try_lock_owned().unwrap(), &meta1, Some(Duration::from_secs(3)), c3, u3) => {debug!("notifier3"); 3},
497 };
498 assert_eq!(closed_item, 1);
499
500 let _ = cp.get(&meta1.key).unwrap(); assert!(cp.get(&meta1.key).is_none()) }
503
504 #[tokio::test]
505 async fn test_evict_poll() {
506 let meta1 = ConnectionMeta::new(101, 1);
507 let mock_io1 = Arc::new(AsyncMutex::new(
508 Builder::new().wait(Duration::from_secs(99)).build(),
509 ));
510 let meta2 = ConnectionMeta::new(102, 2);
511 let mock_io2 = Arc::new(AsyncMutex::new(
512 Builder::new().wait(Duration::from_secs(99)).build(),
513 ));
514 let meta3 = ConnectionMeta::new(101, 3);
515 let mock_io3 = Arc::new(AsyncMutex::new(
516 Builder::new().wait(Duration::from_secs(99)).build(),
517 ));
518 let cp: ConnectionPool<Arc<AsyncMutex<Mock>>> = ConnectionPool::new(2);
519 let (c1, u1) = cp.put(&meta1, mock_io1.clone());
520 let (c2, u2) = cp.put(&meta2, mock_io2.clone());
521 let (c3, u3) = cp.put(&meta3, mock_io3.clone()); let closed_item = tokio::select! {
524 _ = cp.idle_poll(mock_io1.try_lock_owned().unwrap(), &meta1, None, c1, u1) => {debug!("notifier1"); 1},
525 _ = cp.idle_poll(mock_io2.try_lock_owned().unwrap(), &meta1, None, c2, u2) => {debug!("notifier2"); 2},
526 _ = cp.idle_poll(mock_io3.try_lock_owned().unwrap(), &meta1, None, c3, u3) => {debug!("notifier3"); 3},
527 };
528 assert_eq!(closed_item, 1);
529
530 let _ = cp.get(&meta1.key).unwrap(); assert!(cp.get(&meta1.key).is_none()) }
533}