net_pool/
pool.rs

1use crate::backend::{Address, BackendState};
2use crate::error::Error;
3use crate::strategy::Strategy;
4use std::sync::Arc;
5use std::sync::atomic::Ordering::Relaxed;
6use std::sync::atomic::{AtomicU64, AtomicUsize};
7use std::time::Duration;
8
9pub trait Pool: Send + Sync {
10    /// id
11    fn get_id(&self) -> &str;
12
13    /// 设置id
14    fn set_id(&mut self, id: &str);
15    
16    /// 设置最大连接数
17    fn set_max_conn(&self, max: Option<usize>);
18
19    /// 获取最大连接数
20    fn get_max_conn(&self) -> Option<usize>;
21
22    /// 获取当前的连接数
23    fn get_cur_conn(&self) -> usize;
24
25    /// 设置空闲连接保留时长
26    fn set_keepalive(&self, _: Option<Duration>) {}
27
28    /// 获取空闲连接保留时长
29    fn get_keepalive(&self) -> Option<Duration> {
30        None
31    }
32
33    /// 获取转发策略
34    fn get_strategy(&self) -> Arc<dyn Strategy>;
35
36    /// 设置策略
37    fn set_strategy(&mut self, strategy: Arc<dyn Strategy>);
38    
39    /// 添加一个后端地址
40    fn add_backend(&self, id: Option<u32>, addr: Address) {
41        self.get_strategy().add_backend(id, addr)
42    }
43
44    /// 根据策略获取一个后端
45    fn get_backend(&self, key: &str) -> Option<BackendState> {
46        self.get_strategy().get_backend(key)
47    }
48
49    /// 移除一个后端地址
50    fn remove_backend(&self, addr: &Address) -> bool {
51        self.get_strategy().remove_backend(addr)
52    }
53
54    /// 获取所有后端地址切片
55    fn get_backends(&self) -> Vec<BackendState> {
56        self.get_strategy().get_backends()
57    }
58
59    fn get_backend_by_id(&self, id: u32) -> Option<BackendState> {
60        self.get_strategy().get_backend_by_id(id)
61    }
62
63    fn get_backend_by_code(&self, code: u64) -> Option<BackendState> {
64        self.get_strategy().get_backend_by_code(code)
65    }
66
67    // 是否使用tls
68    fn use_tls(&self, _: bool) {}
69
70    fn tls(&self) -> bool {
71        false
72    }
73}
74
75pub struct BaseState {
76    /// id标识
77    pub id: String,
78    /// usize::MAX表示无设置
79    pub max_conn: AtomicUsize, 
80    pub cur_conn: AtomicUsize,
81    /// 精度只到秒级别, u64::MAX表示无设置
82    pub keepalive: AtomicU64, 
83    pub lb_strategy: Arc<dyn Strategy>,
84}
85
86impl BaseState {
87    pub fn new(strategy: Arc<dyn Strategy>) -> Self {
88        BaseState {
89            id: String::new(),
90            max_conn: AtomicUsize::new(usize::MAX),
91            cur_conn: AtomicUsize::new(0),
92            keepalive: AtomicU64::new(u64::MAX),
93            lb_strategy: strategy,
94        }
95    }
96}
97
98pub fn increase_current(max: &AtomicUsize, cur: &AtomicUsize) -> Result<(), Error> {
99    let m = max.load(Relaxed);
100    if m == usize::MAX {
101        cur.fetch_add(1, Relaxed);
102        Ok(())
103    } else {
104        let mut c = cur.load(Relaxed);
105        loop {
106            if c == m {
107                break Err(Error::PoolFull);
108            }
109
110            match cur.compare_exchange(c, c + 1, Relaxed, Relaxed) {
111                Ok(_) => break Ok(()),
112                Err(s) => c = s,
113            }
114        }
115    }
116}