concurrent_round_robin/
lib.rs1
2use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
3use chrono::{Local};
4use dashmap::{DashMap};
5
6use dashmap::mapref::one::RefMut;
7
8
9struct WeightedRoundRobin{
10 weight: i64,
11 current: AtomicI64,
12 last_update: i64,
13}
14
15impl WeightedRoundRobin {
16 pub fn new_with_weight(weight: i64) -> WeightedRoundRobin{
17 WeightedRoundRobin{
18 weight,
19 current: AtomicI64::new(0),
20 last_update: 0
21 }
22 }
23 fn increase_current(&self) -> i64{
24 self.current.fetch_add(self.weight, Ordering::SeqCst)
25 }
26 fn sel(&self, total: i64) {
27 let _ = self.current.fetch_add(-1 * total, Ordering::SeqCst);
28 }
29}
30
31pub struct WeightedRoundRobinSelector<T> {
45 size: AtomicUsize,
46 elements_map: DashMap<usize, T>,
47 weight_map: DashMap<usize, usize>,
48 weighted_round_robin_map: DashMap<usize, WeightedRoundRobin>,
49}
50
51unsafe impl <T> Send for WeightedRoundRobinSelector<T> {}
52
53impl <T> Drop for WeightedRoundRobinSelector<T>{
54 fn drop(&mut self) {
55 self.size.store(0, Ordering::Release);
56 }
57}
58
59
60impl <T> WeightedRoundRobinSelector<T>{
61
62 pub fn new(elements_with_weight: Vec<(T, usize)>) -> WeightedRoundRobinSelector<T>{
63 let elements_map = DashMap::new();
64 let weight_map = DashMap::new();
65 let mut i = 0;
66 for (e, w) in elements_with_weight {
67 elements_map.insert( i, e);
68 weight_map.insert(i, w);
69 i+=1;
70 }
71 WeightedRoundRobinSelector {
72 size: AtomicUsize::new(i),
73 elements_map, weight_map, weighted_round_robin_map: DashMap::new()
74 }
75 }
76
77 pub fn close(self) -> Vec<(T, usize)> {
78 let size = self.size.load(Ordering::Acquire);
79 self.size.store(0, Ordering::Release);
80 let mut vec = Vec::new();
81 for i in 0..size {
82 let x = self.elements_map.remove(&i).unwrap();
83 vec.push((x.1, x.0));
84 }
85 return vec;
86 }
87
88 pub fn select(&self) -> Option<RefMut<usize, T>> {
89 let mut total_weight: i64 = 0;
90 let mut max_current = -1 << 63;
91 let now = Local::now().timestamp();
92 let mut selected_index: isize = -1;
93 for i in 0..self.size.load(Ordering::Acquire) {
94 let weight = self.weight_map.get(&i).unwrap().value().to_owned() as i64;
95 let mut entry = self.weighted_round_robin_map.entry(i)
96 .or_insert(WeightedRoundRobin::new_with_weight(
97 weight
98 ));
99 let value_mut = entry.value_mut();
100 let cur = value_mut.increase_current();
101 value_mut.last_update = now;
102 if cur > max_current {
103 max_current = cur;
104 selected_index = i as isize;
105 }
106 total_weight += weight;
107 }
108 return if selected_index > -1 {
109 let i = selected_index as usize;
110 self.weighted_round_robin_map.get_mut(&i).unwrap().sel(total_weight);
111 self.elements_map.get_mut(&i)
112 } else {
113 Option::None
114 }
115 }
116}
117
118#[test]
119pub fn t(){
120 use std::sync::Arc;
121 let balancer = Arc::new(WeightedRoundRobinSelector::new(
122 vec![("1", 1), ("2", 2)]
123 ));
124 for _i in 0..100000 {
125 let arc = balancer.clone();
126 std::thread::spawn(move ||{
127 println!("{}", arc.select().unwrap().value());
128 });
129 }
130}