concurrent_round_robin/
lib.rs

1
2use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
3use chrono::{Local};
4use dashmap::{DashMap};
5
6use dashmap::mapref::one::RefMut;
7
8
/// Per-element bookkeeping for smooth weighted round-robin selection.
struct WeightedRoundRobin {
    /// Amount added to `current` on every selection round.
    weight: i64,
    /// Running score: grows by `weight` each round and shrinks by the total
    /// weight whenever this entry is the one selected.
    current: AtomicI64,
    /// Timestamp written by the selector when it last visited this entry;
    /// stays 0 until the first visit.
    last_update: i64,
}

impl WeightedRoundRobin {
    /// Creates an entry with the given weight, a zero score and no recorded update.
    pub fn new_with_weight(weight: i64) -> WeightedRoundRobin {
        WeightedRoundRobin {
            weight,
            current: AtomicI64::new(0),
            last_update: 0,
        }
    }

    /// Adds this entry's weight to its score and returns the *updated* score.
    ///
    /// `fetch_add` hands back the value from before the addition, so the weight
    /// is added once more to the returned value. Without this, the first round
    /// would compare all-zero scores and pick index 0 regardless of its weight.
    /// `wrapping_add` mirrors the wrap-around behaviour of the atomic itself.
    fn increase_current(&self) -> i64 {
        self.current
            .fetch_add(self.weight, Ordering::SeqCst)
            .wrapping_add(self.weight)
    }

    /// Marks this entry as selected by subtracting the round's total weight
    /// from its score.
    fn sel(&self, total: i64) {
        self.current.fetch_sub(total, Ordering::SeqCst);
    }
}
30
31/// # Examples
32/// ```
33/// use concurrent_round_robin::WeightedRoundRobinSelector;
34/// let balancer = Arc::new(WeightedRoundRobinSelector::new(
35///     vec![("1", 1), ("2", 2)]
36/// ));
37/// for _i in 0..100 {
38///     let arc = balancer.clone();
39///     std::thread::spawn(move ||{
40///         println!("{}", arc.select().unwrap().value());
41///     });
42/// }
43/// ```
44pub struct WeightedRoundRobinSelector<T> {
45    size: AtomicUsize,
46    elements_map: DashMap<usize, T>,
47    weight_map: DashMap<usize, usize>,
48    weighted_round_robin_map: DashMap<usize, WeightedRoundRobin>,
49}
50
51unsafe impl <T> Send for WeightedRoundRobinSelector<T> {}
52
53impl <T> Drop for WeightedRoundRobinSelector<T>{
54    fn drop(&mut self) {
55        self.size.store(0, Ordering::Release);
56    }
57}
58
59
60impl  <T> WeightedRoundRobinSelector<T>{
61
62    pub fn new(elements_with_weight: Vec<(T, usize)>) ->  WeightedRoundRobinSelector<T>{
63        let elements_map = DashMap::new();
64        let weight_map = DashMap::new();
65        let mut i = 0;
66        for  (e, w) in elements_with_weight {
67            elements_map.insert( i, e);
68            weight_map.insert(i, w);
69            i+=1;
70        }
71        WeightedRoundRobinSelector {
72            size: AtomicUsize::new(i),
73            elements_map, weight_map, weighted_round_robin_map: DashMap::new()
74        }
75    }
76
77    pub fn close(self) -> Vec<(T, usize)> {
78        let size = self.size.load(Ordering::Acquire);
79        self.size.store(0, Ordering::Release);
80        let mut vec = Vec::new();
81        for i in 0..size {
82            let x = self.elements_map.remove(&i).unwrap();
83            vec.push((x.1, x.0));
84        }
85        return vec;
86    }
87
88    pub fn select(&self) -> Option<RefMut<usize, T>> {
89        let mut total_weight: i64 = 0;
90        let mut max_current = -1 << 63;
91        let now = Local::now().timestamp();
92        let mut selected_index: isize = -1;
93        for i in 0..self.size.load(Ordering::Acquire) {
94            let weight = self.weight_map.get(&i).unwrap().value().to_owned() as i64;
95            let mut entry = self.weighted_round_robin_map.entry(i)
96                .or_insert(WeightedRoundRobin::new_with_weight(
97                    weight
98                ));
99            let value_mut = entry.value_mut();
100            let cur = value_mut.increase_current();
101            value_mut.last_update = now;
102            if cur > max_current {
103                max_current = cur;
104                selected_index = i as isize;
105            }
106            total_weight += weight;
107        }
108        return if selected_index > -1 {
109            let i = selected_index as usize;
110            self.weighted_round_robin_map.get_mut(&i).unwrap().sel(total_weight);
111            self.elements_map.get_mut(&i)
112        } else {
113            Option::None
114        }
115    }
116}
117
/// Exercises the selector sequentially (exact weight proportions) and from
/// several threads at once (every call yields a registered element).
#[test]
pub fn t() {
    use std::sync::Arc;

    // Sequential selection is deterministic: with weights 1 and 2 the scores
    // return to their starting point every three picks, so 300 picks split
    // exactly 100 / 200.
    let sequential = WeightedRoundRobinSelector::new(vec![("1", 1), ("2", 2)]);
    let mut light = 0;
    let mut heavy = 0;
    for _ in 0..300 {
        // Copy the `&str` out so the entry guard is released before the next pick.
        let picked: &str = *sequential
            .select()
            .expect("non-empty selector must yield an element")
            .value();
        match picked {
            "1" => light += 1,
            "2" => heavy += 1,
            other => panic!("unexpected element {}", other),
        }
    }
    assert_eq!((light, heavy), (100, 200));

    // Concurrent selection: a bounded number of workers, all joined so that a
    // panic inside any of them fails the test instead of being lost.
    let balancer = Arc::new(WeightedRoundRobinSelector::new(
        vec![("1", 1), ("2", 2)]
    ));
    let workers: Vec<_> = (0..8)
        .map(|_| {
            let shared = Arc::clone(&balancer);
            std::thread::spawn(move || {
                for _ in 0..1_000 {
                    let picked: &str = *shared
                        .select()
                        .expect("non-empty selector must yield an element")
                        .value();
                    assert!(picked == "1" || picked == "2");
                }
            })
        })
        .collect();
    for worker in workers {
        worker.join().expect("selector worker panicked");
    }
}