mco_rpc/
balance.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use mco::std::sync::SyncVec;
4
/// Minimal contract a client must satisfy to be managed by [`LoadBalance`].
///
/// Every selection strategy only needs to identify a client by the address of
/// the remote endpoint it talks to, so that is the single required method.
pub trait RpcClient {
    /// Returns the remote address of this client; it is used as the client's
    /// identity when registering, replacing, looking up and removing clients.
    fn addr(&self) -> &str;
}
11
12#[derive(Debug)]
13pub struct LoadBalance<C> where C: RpcClient {
14    pub index: AtomicUsize,
15    pub rpc_clients: SyncVec<Arc<C>>,
16}
17
/// Strategy used to choose a client for a call.
#[derive(Debug, Clone, Copy)]
pub enum LoadBalanceType {
    /// Clients take turns, one after another.
    Round,
    /// A client is picked at random.
    Random,
    /// A client is picked from a hash of the caller's IP, so the same caller
    /// IP maps to the same client while the client list is unchanged.
    Hash,
    /// The client with the fewest live `Arc` handles is picked, i.e. the one
    /// currently held by the fewest users.
    MinConnect,
}
30
31impl<C> LoadBalance<C> where C: RpcClient {
32    pub fn new() -> Self {
33        Self {
34            index: AtomicUsize::new(0),
35            rpc_clients: SyncVec::new(),
36        }
37    }
38
39    /// put client,and return old client
40    pub fn put(&self, arg: C) -> Option<Arc<C>> {
41        let arg = Some(Arc::new(arg));
42        let addr = arg.as_deref().unwrap().addr();
43        let mut idx = 0;
44        for x in &self.rpc_clients {
45            if x.addr().eq(addr) {
46                let rm = self.rpc_clients.remove(idx);
47                if rm.is_none() {
48                    self.rpc_clients.push(arg.unwrap());
49                    return None;
50                }
51                return rm;
52            }
53            idx += 1;
54        }
55        if let Some(arg) = arg {
56            self.rpc_clients.push(arg);
57        }
58        return None;
59    }
60
61    pub fn remove(&self, address: &str) -> Option<Arc<C>> {
62        let mut idx = 0;
63        for x in &self.rpc_clients {
64            if x.addr().eq(address) {
65                return self.rpc_clients.remove(idx);
66            }
67            idx += 1;
68        }
69        return None;
70    }
71
72    pub fn contains(&self, address: &str) -> bool {
73        for x in &self.rpc_clients {
74            if x.addr().eq(address) {
75                return true;
76            }
77        }
78        return false;
79    }
80
81    pub fn clear(&self) {
82        self.rpc_clients.clear();
83    }
84
85    pub fn do_balance(&self, b: LoadBalanceType, client_ip: &str) -> Option<Arc<C>> {
86        match b {
87            LoadBalanceType::Round => {
88                self.round_pick_client()
89            }
90            LoadBalanceType::Random => {
91                self.random_pick_client()
92            }
93            LoadBalanceType::Hash => {
94                self.hash_pick_client(client_ip)
95            }
96            LoadBalanceType::MinConnect => {
97                self.min_connect_client()
98            }
99        }
100    }
101
102    fn hash_pick_client(&self, client_ip: &str) -> Option<Arc<C>> {
103        let length = self.rpc_clients.len() as i64;
104        if length == 0 {
105            return None;
106        }
107        let def_key: String;
108        if client_ip.is_empty() {
109            def_key = format!("{}", rand::random::<i32>());
110        } else {
111            def_key = client_ip.to_string();
112        }
113        let hash = {
114            let mut value = 0i64;
115            let mut i = 0;
116            for x in def_key.as_bytes() {
117                i += 1;
118                value += (*x as i64) * i;
119            }
120            value
121        };
122        return Some(self.rpc_clients.get((hash % length) as usize).unwrap().clone());
123    }
124
125    fn random_pick_client(&self) -> Option<Arc<C>> {
126        let length = self.rpc_clients.len();
127        if length == 0 {
128            return None;
129        }
130        use rand::{thread_rng, Rng};
131        let mut rng = thread_rng();
132        let rand_index: usize = rng.gen_range(0..length);
133        if rand_index < length {
134            return Some(self.rpc_clients.get(rand_index).unwrap().clone());
135        }
136        return None;
137    }
138
139    fn round_pick_client(&self) -> Option<Arc<C>> {
140        let length = self.rpc_clients.len();
141        if length == 0 {
142            return None;
143        }
144        let idx = self.index.load(Ordering::SeqCst);
145        if (idx + 1) >= length {
146            self.index.store(0, Ordering::SeqCst)
147        } else {
148            self.index.store(idx + 1, Ordering::SeqCst);
149        }
150        let return_obj = self.rpc_clients.get(idx).unwrap().clone();
151        return Some(return_obj);
152    }
153
154    fn min_connect_client(&self) -> Option<Arc<C>> {
155        let mut min = -1i64;
156        let mut result = None;
157        for x in &self.rpc_clients {
158            if min == -1 || Arc::strong_count(x) < min as usize {
159                min = Arc::strong_count(x) as i64;
160                result = Some(x.clone());
161            }
162        }
163        result
164    }
165}
166
#[cfg(test)]
mod test {
    // `super::` resolves to the enclosing module in every Rust edition and
    // regardless of where this file is mounted in the crate.
    use super::{LoadBalance, LoadBalanceType, RpcClient};

    /// Lets a plain address string stand in for an RPC client.
    impl RpcClient for String {
        fn addr(&self) -> &str {
            self
        }
    }

    /// Builds a balancer holding one `String` client per address.
    fn balancer_with(addrs: &[&str]) -> LoadBalance<String> {
        let load = LoadBalance::new();
        for addr in addrs {
            load.put(addr.to_string());
        }
        load
    }

    /// Putting an address that is already registered hands back the old client.
    #[test]
    fn test_put() {
        let load = balancer_with(&["127.0.0.1:13000", "127.0.0.1:13001"]);

        let old = load.put("127.0.0.1:13001".to_string()).unwrap();
        assert_eq!(old.addr(), "127.0.0.1:13001");
    }

    /// Removing a registered address returns that client.
    #[test]
    fn test_remove() {
        let load = balancer_with(&["127.0.0.1:13000", "127.0.0.1:13001"]);

        let old = load.remove("127.0.0.1:13000").unwrap();
        assert_eq!(old.addr(), "127.0.0.1:13000");
    }

    /// Repeated `MinConnect` picks always yield a client.
    #[test]
    fn test_min_connect() {
        let load = balancer_with(&[
            "127.0.0.1:13000",
            "127.0.0.1:13001",
            "127.0.0.1:13002",
            "127.0.0.1:13003",
        ]);
        // Keep every pick alive so the reference counts seen by later picks
        // reflect the earlier ones.
        let mut v = vec![];
        for _ in 0..6 {
            let item = load.do_balance(LoadBalanceType::MinConnect, "");
            println!("select:{}", item.as_ref().unwrap().addr());
            v.push(item);
        }
    }
}