net_pool/
pool.rs

1use crate::backend::{Address, BackendState};
2use crate::error::Error;
3use crate::strategy::Strategy;
4use std::sync::Arc;
5use std::sync::atomic::Ordering::Relaxed;
6use std::sync::atomic::{AtomicU64, AtomicUsize};
7use std::time::Duration;
8
9pub trait Pool: Send + Sync {
10    /// id
11    fn id(&self) -> &str {
12        ""
13    }
14
15    /// 设置最大连接数
16    fn set_max_conn(&self, max: Option<usize>);
17
18    /// 获取最大连接数
19    fn get_max_conn(&self) -> Option<usize>;
20
21    /// 获取当前的连接数
22    fn get_cur_conn(&self) -> usize;
23
24    /// 设置空闲连接保留时长
25    fn set_keepalive(&self, _: Option<Duration>) {}
26
27    /// 获取空闲连接保留时长
28    fn get_keepalive(&self) -> Option<Duration> {
29        None
30    }
31
32    /// 获取转发策略
33    fn get_strategy(&self) -> Arc<dyn Strategy>;
34
35    /// 添加一个后端地址
36    fn add_backend(&self, id: Option<u32>, addr: Address) {
37        self.get_strategy().add_backend(id, addr)
38    }
39
40    /// 根据策略获取一个后端
41    fn get_backend(&self, key: &str) -> Option<BackendState> {
42        self.get_strategy().get_backend(key)
43    }
44
45    /// 移除一个后端地址
46    fn remove_backend(&self, addr: &Address) -> bool {
47        self.get_strategy().remove_backend(addr)
48    }
49
50    /// 获取所有后端地址切片
51    fn get_backends(&self) -> Vec<BackendState> {
52        self.get_strategy().get_backends()
53    }
54
55    fn get_backend_by_id(&self, id: u32) -> Option<BackendState> {
56        self.get_strategy().get_backend_by_id(id)
57    }
58
59    fn get_backend_by_code(&self, code: u64) -> Option<BackendState> {
60        self.get_strategy().get_backend_by_code(code)
61    }
62
63    // 是否使用tls
64    fn use_tls(&self, _: bool) {}
65
66    fn tls(&self) -> bool {
67        false
68    }
69}
70
71pub struct BaseState {
72    pub max_conn: AtomicUsize, // usize::MAX表示无设置
73    pub cur_conn: AtomicUsize,
74    pub keepalive: AtomicU64, // 精度只到秒级别, u64::MAX表示无设置
75    pub lb_strategy: Arc<dyn Strategy>,
76}
77
78impl BaseState {
79    pub fn new(strategy: Arc<dyn Strategy>) -> Self {
80        BaseState {
81            max_conn: AtomicUsize::new(usize::MAX),
82            cur_conn: AtomicUsize::new(0),
83            keepalive: AtomicU64::new(u64::MAX),
84            lb_strategy: strategy,
85        }
86    }
87}
88
89pub fn increase_current(max: &AtomicUsize, cur: &AtomicUsize) -> Result<(), Error> {
90    let m = max.load(Relaxed);
91    if m == usize::MAX {
92        cur.fetch_add(1, Relaxed);
93        Ok(())
94    } else {
95        let mut c = cur.load(Relaxed);
96        loop {
97            if c == m {
98                break Err(Error::PoolFull);
99            }
100
101            match cur.compare_exchange(c, c + 1, Relaxed, Relaxed) {
102                Ok(_) => break Ok(()),
103                Err(s) => c = s,
104            }
105        }
106    }
107}