drpc/
balance.rs

1use dark_std::sync::SyncVec;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::sync::Arc;
4
/// The minimal view of an RPC client that the load balancer needs.
///
/// Every balancing strategy only has to know the remote address of a client,
/// so any client type stored in a `LoadBalance` must implement this trait.
pub trait RpcClient {
    /// Returns the remote address this client is connected to.
    fn addr(&self) -> &str;
}
11
12#[derive(Debug)]
13pub struct LoadBalance<C>
14where
15    C: RpcClient,
16{
17    pub index: AtomicUsize,
18    pub rpc_clients: SyncVec<Arc<C>>,
19}
20
/// The strategy used to choose a client.
#[derive(Debug, Clone, Copy)]
pub enum LoadBalanceType {
    /// Clients take turns, one after another.
    Round,
    /// A client is picked at random.
    Random,
    /// A client is picked from the hash code of the caller's address, so a given
    /// caller address is paired with the same client.
    Hash,
    /// The client with the minimum number of connections is picked.
    MinConnect,
}
33
34impl<C> LoadBalance<C>
35where
36    C: RpcClient,
37{
38    pub fn new() -> Self {
39        Self {
40            index: AtomicUsize::new(0),
41            rpc_clients: SyncVec::new(),
42        }
43    }
44
45    /// Put a client and return the old one.
46    pub fn put(&self, arg: C) -> Option<Arc<C>> {
47        let arg = Some(Arc::new(arg));
48        let addr = arg.as_deref().unwrap().addr();
49        let mut idx = 0;
50        for x in &self.rpc_clients {
51            if x.addr().eq(addr) {
52                let rm = self.rpc_clients.remove(idx);
53                if rm.is_none() {
54                    self.rpc_clients.push(arg.unwrap());
55                    return None;
56                }
57                return rm;
58            }
59            idx += 1;
60        }
61        if let Some(arg) = arg {
62            self.rpc_clients.push(arg);
63        }
64        return None;
65    }
66
67    pub fn remove(&self, address: &str) -> Option<Arc<C>> {
68        let mut idx = 0;
69        for x in &self.rpc_clients {
70            if x.addr().eq(address) {
71                return self.rpc_clients.remove(idx);
72            }
73            idx += 1;
74        }
75        return None;
76    }
77
78    pub fn contains(&self, address: &str) -> bool {
79        for x in &self.rpc_clients {
80            if x.addr().eq(address) {
81                return true;
82            }
83        }
84        return false;
85    }
86
87    pub fn clear(&self) {
88        self.rpc_clients.clear();
89    }
90
91    pub fn do_balance(&self, b: LoadBalanceType, from: &str) -> Option<Arc<C>> {
92        match b {
93            LoadBalanceType::Round => self.round_pick_client(),
94            LoadBalanceType::Random => self.random_pick_client(),
95            LoadBalanceType::Hash => self.hash_pick_client(from),
96            LoadBalanceType::MinConnect => self.min_connect_client(),
97        }
98    }
99
100    fn hash_pick_client(&self, from: &str) -> Option<Arc<C>> {
101        let length = self.rpc_clients.len() as i64;
102        if length == 0 {
103            return None;
104        }
105        let def_key: String;
106        if from.is_empty() {
107            def_key = format!("{}", rand::random::<i32>());
108        } else {
109            def_key = from.to_string();
110        }
111        let hash = {
112            let mut value = 0i64;
113            let mut i = 0;
114            for x in def_key.as_bytes() {
115                i += 1;
116                value += (*x as i64) * i;
117            }
118            value
119        };
120        return Some(
121            self.rpc_clients
122                .get((hash % length) as usize)
123                .unwrap()
124                .clone(),
125        );
126    }
127
128    fn random_pick_client(&self) -> Option<Arc<C>> {
129        let length = self.rpc_clients.len();
130        if length == 0 {
131            return None;
132        }
133        use rand::{thread_rng, Rng};
134        let mut rng = thread_rng();
135        let rand_index: usize = rng.gen_range(0..length);
136        if rand_index < length {
137            return Some(self.rpc_clients.get(rand_index).unwrap().clone());
138        }
139        return None;
140    }
141
142    fn round_pick_client(&self) -> Option<Arc<C>> {
143        let length = self.rpc_clients.len();
144        if length == 0 {
145            return None;
146        }
147        let idx = self.index.load(Ordering::SeqCst);
148        if (idx + 1) >= length {
149            self.index.store(0, Ordering::SeqCst)
150        } else {
151            self.index.store(idx + 1, Ordering::SeqCst);
152        }
153        let return_obj = self.rpc_clients.get(idx).unwrap().clone();
154        return Some(return_obj);
155    }
156
157    fn min_connect_client(&self) -> Option<Arc<C>> {
158        let mut min = -1i64;
159        let mut result = None;
160        for x in &self.rpc_clients {
161            if min == -1 || Arc::strong_count(x) < min as usize {
162                min = Arc::strong_count(x) as i64;
163                result = Some(x.clone());
164            }
165        }
166        result
167    }
168}