1use crate::backend::{Address, BackendState};
2use crate::error::Error;
3use crate::strategy::Strategy;
4use std::sync::Arc;
5use std::sync::atomic::Ordering::Relaxed;
6use std::sync::atomic::{AtomicU64, AtomicUsize};
7use std::time::Duration;
8
9pub trait Pool: Send + Sync {
10 fn id(&self) -> &str {
12 ""
13 }
14
15 fn set_max_conn(&self, max: Option<usize>);
17
18 fn get_max_conn(&self) -> Option<usize>;
20
21 fn get_cur_conn(&self) -> usize;
23
24 fn set_keepalive(&self, _: Option<Duration>) {}
26
27 fn get_keepalive(&self) -> Option<Duration> {
29 None
30 }
31
32 fn get_strategy(&self) -> Arc<dyn Strategy>;
34
35 fn add_backend(&self, id: Option<u32>, addr: Address) {
37 self.get_strategy().add_backend(id, addr)
38 }
39
40 fn get_backend(&self, key: &str) -> Option<BackendState> {
42 self.get_strategy().get_backend(key)
43 }
44
45 fn remove_backend(&self, addr: &Address) -> bool {
47 self.get_strategy().remove_backend(addr)
48 }
49
50 fn get_backends(&self) -> Vec<BackendState> {
52 self.get_strategy().get_backends()
53 }
54
55 fn get_backend_by_id(&self, id: u32) -> Option<BackendState> {
56 self.get_strategy().get_backend_by_id(id)
57 }
58
59 fn get_backend_by_code(&self, code: u64) -> Option<BackendState> {
60 self.get_strategy().get_backend_by_code(code)
61 }
62
63 fn use_tls(&self, _: bool) {}
65
66 fn tls(&self) -> bool {
67 false
68 }
69}
70
71pub struct BaseState {
72 pub max_conn: AtomicUsize, pub cur_conn: AtomicUsize,
74 pub keepalive: AtomicU64, pub lb_strategy: Arc<dyn Strategy>,
76}
77
78impl BaseState {
79 pub fn new(strategy: Arc<dyn Strategy>) -> Self {
80 BaseState {
81 max_conn: AtomicUsize::new(usize::MAX),
82 cur_conn: AtomicUsize::new(0),
83 keepalive: AtomicU64::new(u64::MAX),
84 lb_strategy: strategy,
85 }
86 }
87}
88
89pub fn increase_current(max: &AtomicUsize, cur: &AtomicUsize) -> Result<(), Error> {
90 let m = max.load(Relaxed);
91 if m == usize::MAX {
92 cur.fetch_add(1, Relaxed);
93 Ok(())
94 } else {
95 let mut c = cur.load(Relaxed);
96 loop {
97 if c == m {
98 break Err(Error::PoolFull);
99 }
100
101 match cur.compare_exchange(c, c + 1, Relaxed, Relaxed) {
102 Ok(_) => break Ok(()),
103 Err(s) => c = s,
104 }
105 }
106 }
107}