net_pool/strategy/
ch_strategy.rs

1use crate::backend::{Address, BackendState};
2use crate::utils::bytes_to_hash_code;
3use std::collections::{BTreeMap, HashSet};
4use std::sync::RwLock;
5
6/// 一致性hash负载策略
7
8pub struct CHStrategy {
9    /// usize:虚拟的后端个数
10    /// BTreeMap<u64, BackendState>: 排序后用来表示环形
11    /// HashSet<BackendState>: 真实的节点
12    inner: RwLock<(usize, BTreeMap<u64, BackendState>, HashSet<BackendState>)>,
13}
14
15impl CHStrategy {
16    pub fn new(virtual_backend_count: usize) -> Self {
17        if virtual_backend_count == 0 {
18            return CHStrategy::default();
19        }
20        CHStrategy {
21            inner: RwLock::new((virtual_backend_count, BTreeMap::new(), HashSet::new())),
22        }
23    }
24
25    fn sub_address(id: usize, addr: &Address) -> String {
26        let mut flag = match addr {
27            Address::Ori(ori) => ori.clone(),
28            Address::Addr(addr) => crate::utils::socketaddr_to_ip_string(addr),
29        };
30
31        flag.push('#');
32        flag.push_str(&id.to_string());
33        flag
34    }
35}
36
37impl Default for CHStrategy {
38    fn default() -> Self {
39        CHStrategy {
40            inner: RwLock::new((64, BTreeMap::new(), HashSet::new())),
41        }
42    }
43}
44
45impl super::LbStrategy for CHStrategy {
46    fn strategy(&self) -> super::Strategy {
47        super::Strategy::ConsistentHash
48    }
49
50    fn contain(&self, hash_code: u64) -> bool {
51        let g = self.inner.read().unwrap();
52        g.2.iter()
53            .find(|inner| inner.hash_code() == hash_code)
54            .is_some()
55    }
56
57    fn add_backend(&self, addr: Address) {
58        let mut g = self.inner.write().unwrap();
59        let bs = BackendState::new(addr.clone());
60
61        for id in 0..g.0 {
62            let address = Self::sub_address(id, &addr);
63            let code = bytes_to_hash_code(address.as_bytes());
64            g.1.insert(code, bs.clone());
65        }
66
67        g.2.insert(bs);
68    }
69
70    fn remove_backend(&self, addr: &Address) -> bool {
71        let mut g = self.inner.write().unwrap();
72
73        for id in 0..g.0 {
74            let address = Self::sub_address(id, addr);
75            let code = bytes_to_hash_code(address.as_bytes());
76            if let None = g.1.remove(&code) {
77                return false;
78            }
79        }
80
81        g.2.remove(&BackendState::new(addr.clone()))
82    }
83
84    fn get_backend(&self, key: &str) -> Option<BackendState> {
85        let g = self.inner.read().unwrap();
86        if g.1.is_empty() {
87            return None;
88        }
89
90        let code = bytes_to_hash_code(key.as_bytes());
91
92        let o =
93            g.1.iter()
94                .find(|(key, _)| if code <= **key { true } else { false });
95
96        if o.is_none() {
97            g.1.iter().min().map(|(_, bs)| bs.clone())
98        } else {
99            o.map(|(_, bs)| bs.clone())
100        }
101    }
102
103    fn get_backends(&self) -> Vec<BackendState> {
104        let g = self.inner.read().unwrap();
105        let bss: Vec<BackendState> = g.1.iter().map(|kv| kv.1.clone()).collect();
106        bss
107    }
108
109    fn get_origin_backends(&self) -> Vec<BackendState> {
110        let g = self.inner.read().unwrap();
111        g.2.iter().map(|bs| bs.clone()).collect()
112    }
113}