net_pool/strategy/
ch_strategy.rs

1use crate::backend::{Address, BackendState};
2use crate::utils::bytes_to_hash_code;
3use std::collections::BTreeMap;
4use std::sync::RwLock;
5
6/// 一致性hash负载策略
7pub struct CHStrategy {
8    inner: RwLock<Inner>,
9}
10
11struct Inner {
12    /// usize:虚拟的后端个数
13    virtual_count: usize,
14    /// 排序后用来表示环形
15    tree: BTreeMap<u64, u64>,
16    /// HashSet<BackendState>: 真实的节点
17    bss: super::backends::BackendStates,
18}
19
20impl CHStrategy {
21    pub fn new(virtual_count: usize) -> Self {
22        if virtual_count == 0 {
23            return CHStrategy::default();
24        }
25        CHStrategy {
26            inner: RwLock::new(Inner {
27                virtual_count,
28                tree: BTreeMap::new(),
29                bss: super::backends::BackendStates::new(),
30            }),
31        }
32    }
33
34    fn sub_address(id: usize, addr: &Address) -> String {
35        let mut flag = match addr {
36            Address::Ori(ori) => ori.clone(),
37            Address::Addr(addr) => crate::utils::socketaddr_to_ip_string(addr),
38        };
39
40        flag.push('#');
41        flag.push_str(&id.to_string());
42        flag
43    }
44}
45
46impl Default for CHStrategy {
47    fn default() -> Self {
48        CHStrategy {
49            inner: RwLock::new(Inner {
50                virtual_count: 64,
51                tree: BTreeMap::new(),
52                bss: super::backends::BackendStates::new(),
53            }),
54        }
55    }
56}
57
58impl super::Strategy for CHStrategy {
59    fn contain(&self, addr: &Address) -> bool {
60        let g = self.inner.read().unwrap();
61        g.bss.contain(addr)
62    }
63
64    fn add_backend(&self, id: Option<u32>, addr: Address) {
65        let src_code = addr.hash_code();
66        let mut g = self.inner.write().unwrap();
67        // 添加虚拟节点
68        for idx in 0..g.virtual_count {
69            let address = Self::sub_address(idx, &addr);
70            let code = bytes_to_hash_code(address.as_bytes());
71            g.tree.insert(code, src_code);
72        }
73        // 添加真实节点
74        g.bss.add(id, addr);
75    }
76
77    fn remove_backend(&self, addr: &Address) -> bool {
78        let mut g = self.inner.write().unwrap();
79        // 删除虚拟节点
80        for id in 0..g.virtual_count {
81            let address = Self::sub_address(id, addr);
82            let code = bytes_to_hash_code(address.as_bytes());
83            if let None = g.tree.remove(&code) {
84                return false;
85            }
86        }
87        // 删除真实节点
88        g.bss.remove(addr)
89    }
90
91    /// 获取一个后端节点
92    fn get_backend(&self, key: &str) -> Option<BackendState> {
93        let g = self.inner.read().unwrap();
94        if g.tree.is_empty() {
95            return None;
96        }
97
98        // 算hash值
99        let code = bytes_to_hash_code(key.as_bytes());
100
101        // 获取虚拟节点
102        let mut o = g
103            .tree
104            .iter()
105            .find(|(key, _)| if code <= **key { true } else { false });
106
107        if o.is_none() {
108            // 使用第一个虚拟节点
109            o = g.tree.iter().min();
110        };
111
112        g.bss.get_by_code(*o.unwrap().1).map(|b| b.clone())
113    }
114
115    fn get_backend_by_id(&self, id: u32) -> Option<BackendState> {
116        let g = self.inner.read().unwrap();
117        g.bss.get_by_id(id).map(|b| b.clone())
118    }
119
120    fn get_backend_by_code(&self, code: u64) -> Option<BackendState> {
121        let g = self.inner.read().unwrap();
122        g.bss.get_by_code(code).map(|b| b.clone())
123    }
124
125    fn get_backends(&self) -> Vec<BackendState> {
126        let g = self.inner.read().unwrap();
127        let bss = g.bss.get_all();
128        bss.iter().map(|bs| bs.clone()).collect()
129    }
130}
131
/// Smoke test: adds two backends, prints several lookups, removes one
/// backend and prints the lookups again.
#[test]
fn test() {
    use super::Strategy as _;

    let code: u64 = 15291072903040886511;
    let strategy = CHStrategy::default();

    strategy.add_backend(None, Address::from(":10001"));
    strategy.add_backend(Some(2), Address::from(":10002"));

    for _ in 0..3 {
        println!("{:?}", strategy.get_backend("123"));
    }
    for _ in 0..2 {
        println!("{:?}", strategy.get_backend("1234"));
    }
    println!("{:?}", strategy.get_backend_by_id(2));
    println!("{:?}", strategy.get_backend_by_code(code));

    strategy.remove_backend(&Address::from(":10002"));

    println!("{:?}", strategy.get_backend_by_id(2));
    println!("{:?}", strategy.get_backend("1234"));
    println!("{:?}", strategy.get_backend_by_code(code));
}