1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use mco::std::sync::SyncVec;
4
/// A client that a [`LoadBalance`] pool can hold.
///
/// The pool tells clients apart by the address string they report.
pub trait RpcClient {
    /// Returns the address this client reports, borrowed from the client.
    fn addr(&self) -> &str;
}
11
12#[derive(Debug)]
13pub struct LoadBalance<C> where C: RpcClient {
14 pub index: AtomicUsize,
15 pub rpc_clients: SyncVec<Arc<C>>,
16}
17
/// Strategy used to choose one client out of the registered set.
///
/// The type is a plain field-less enum, so it is cheap to copy, can be
/// compared for equality and can be used as a hash-map/set key.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum LoadBalanceType {
    /// Hand out clients one after another, wrapping around at the end.
    Round,
    /// Pick a client at a random position.
    Random,
    /// Pick a client from a hash of the caller-supplied IP string.
    Hash,
    /// Pick the client whose `Arc` currently has the lowest strong count.
    MinConnect,
}
30
31impl<C> LoadBalance<C> where C: RpcClient {
32 pub fn new() -> Self {
33 Self {
34 index: AtomicUsize::new(0),
35 rpc_clients: SyncVec::new(),
36 }
37 }
38
39 pub fn put(&self, arg: C) -> Option<Arc<C>> {
41 let arg = Some(Arc::new(arg));
42 let addr = arg.as_deref().unwrap().addr();
43 let mut idx = 0;
44 for x in &self.rpc_clients {
45 if x.addr().eq(addr) {
46 let rm = self.rpc_clients.remove(idx);
47 if rm.is_none() {
48 self.rpc_clients.push(arg.unwrap());
49 return None;
50 }
51 return rm;
52 }
53 idx += 1;
54 }
55 if let Some(arg) = arg {
56 self.rpc_clients.push(arg);
57 }
58 return None;
59 }
60
61 pub fn remove(&self, address: &str) -> Option<Arc<C>> {
62 let mut idx = 0;
63 for x in &self.rpc_clients {
64 if x.addr().eq(address) {
65 return self.rpc_clients.remove(idx);
66 }
67 idx += 1;
68 }
69 return None;
70 }
71
72 pub fn contains(&self, address: &str) -> bool {
73 for x in &self.rpc_clients {
74 if x.addr().eq(address) {
75 return true;
76 }
77 }
78 return false;
79 }
80
81 pub fn clear(&self) {
82 self.rpc_clients.clear();
83 }
84
85 pub fn do_balance(&self, b: LoadBalanceType, client_ip: &str) -> Option<Arc<C>> {
86 match b {
87 LoadBalanceType::Round => {
88 self.round_pick_client()
89 }
90 LoadBalanceType::Random => {
91 self.random_pick_client()
92 }
93 LoadBalanceType::Hash => {
94 self.hash_pick_client(client_ip)
95 }
96 LoadBalanceType::MinConnect => {
97 self.min_connect_client()
98 }
99 }
100 }
101
102 fn hash_pick_client(&self, client_ip: &str) -> Option<Arc<C>> {
103 let length = self.rpc_clients.len() as i64;
104 if length == 0 {
105 return None;
106 }
107 let def_key: String;
108 if client_ip.is_empty() {
109 def_key = format!("{}", rand::random::<i32>());
110 } else {
111 def_key = client_ip.to_string();
112 }
113 let hash = {
114 let mut value = 0i64;
115 let mut i = 0;
116 for x in def_key.as_bytes() {
117 i += 1;
118 value += (*x as i64) * i;
119 }
120 value
121 };
122 return Some(self.rpc_clients.get((hash % length) as usize).unwrap().clone());
123 }
124
125 fn random_pick_client(&self) -> Option<Arc<C>> {
126 let length = self.rpc_clients.len();
127 if length == 0 {
128 return None;
129 }
130 use rand::{thread_rng, Rng};
131 let mut rng = thread_rng();
132 let rand_index: usize = rng.gen_range(0..length);
133 if rand_index < length {
134 return Some(self.rpc_clients.get(rand_index).unwrap().clone());
135 }
136 return None;
137 }
138
139 fn round_pick_client(&self) -> Option<Arc<C>> {
140 let length = self.rpc_clients.len();
141 if length == 0 {
142 return None;
143 }
144 let idx = self.index.load(Ordering::SeqCst);
145 if (idx + 1) >= length {
146 self.index.store(0, Ordering::SeqCst)
147 } else {
148 self.index.store(idx + 1, Ordering::SeqCst);
149 }
150 let return_obj = self.rpc_clients.get(idx).unwrap().clone();
151 return Some(return_obj);
152 }
153
154 fn min_connect_client(&self) -> Option<Arc<C>> {
155 let mut min = -1i64;
156 let mut result = None;
157 for x in &self.rpc_clients {
158 if min == -1 || Arc::strong_count(x) < min as usize {
159 min = Arc::strong_count(x) as i64;
160 result = Some(x.clone());
161 }
162 }
163 result
164 }
165}
166
#[cfg(test)]
mod test {
    // `super::` resolves regardless of the crate edition or the name of the
    // enclosing module.
    use super::{LoadBalance, LoadBalanceType, RpcClient};

    /// A bare address string stands in for a client in these tests.
    impl RpcClient for String {
        fn addr(&self) -> &str {
            self.as_str()
        }
    }

    /// Builds a balancer holding one `String` client per address, in order.
    fn balancer_with(addrs: &[&str]) -> LoadBalance<String> {
        let load: LoadBalance<String> = LoadBalance::new();
        for addr in addrs {
            load.put(addr.to_string());
        }
        load
    }

    #[test]
    fn test_put() {
        let load = balancer_with(&["127.0.0.1:13000", "127.0.0.1:13001"]);

        // Putting an already-registered address hands back the old client.
        let old = load.put("127.0.0.1:13001".to_string()).unwrap();
        assert_eq!(old.addr(), "127.0.0.1:13001".to_string());
    }

    #[test]
    fn test_remove() {
        let load = balancer_with(&["127.0.0.1:13000", "127.0.0.1:13001"]);

        let old = load.remove("127.0.0.1:13000").unwrap();
        assert_eq!(old.addr(), "127.0.0.1:13000".to_string());
    }

    #[test]
    fn test_min_connect() {
        let load = balancer_with(&[
            "127.0.0.1:13000",
            "127.0.0.1:13001",
            "127.0.0.1:13002",
            "127.0.0.1:13003",
        ]);

        // Every returned handle is kept alive, so each pick raises that
        // client's strong count and the next pick moves on to the following
        // client; ties go to the earliest client in the list.
        let expected = [
            "127.0.0.1:13000",
            "127.0.0.1:13001",
            "127.0.0.1:13002",
            "127.0.0.1:13003",
            "127.0.0.1:13000",
            "127.0.0.1:13001",
        ];
        let mut held = Vec::new();
        for want in expected.iter() {
            let item = load.do_balance(LoadBalanceType::MinConnect, "");
            println!("select:{}", item.as_ref().unwrap().addr());
            assert_eq!(item.as_ref().unwrap().addr(), *want);
            held.push(item);
        }
    }
}