1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::BalancingStrategy;
use tokio::sync::RwLock;
use std::ops::DerefMut;
/// Selection order used by [`Polling`].
///
/// The variant decides both how `add` stores entries and which traversal
/// `select` runs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Priority {
    /// Hand out the same entry repeatedly until its weight is used up, then
    /// move on to the next entry.
    DepthFirst,
    /// Sweep across the entries, handing each eligible entry out once per
    /// sweep.
    BreadthFirst,
}
/// Weighted polling (round-robin) selector over keys of type `K`.
///
/// Every entry is a `(key, weight)` pair. The selection cursor is kept in two
/// atomics so that it can be advanced through a shared reference.
pub struct Polling<K> {
    /// Traversal order; chosen through the `depth_first` / `breadth_first`
    /// builder methods.
    priority: Priority,
    /// Depth-first: how many times the entry under the cursor has been handed
    /// out during its current turn. Breadth-first: the current round number,
    /// which an entry's weight must exceed for the entry to be eligible.
    weight: AtomicUsize,
    /// Position of the cursor inside `list`.
    index: AtomicUsize,
    /// Registered `(key, weight)` pairs.
    list: RwLock<Vec<(K, usize)>>,
}
impl<K:Clone + Eq + Send > Polling<K>{
pub fn new()->Self{
let priority = Priority::DepthFirst;
let weight = AtomicUsize::new(0);
let index = AtomicUsize::new(0);
let list= RwLock::new(vec![]);
Self{priority,weight,index,list}
}
pub fn depth_first(mut self) ->Self{
self.priority = Priority::DepthFirst;self
}
pub fn breadth_first(mut self) ->Self{
self.priority = Priority::BreadthFirst;self
}
async fn depth(&self) ->Option<K>{
let mut list = self.list.write().await;
let list = list.deref_mut();
if list.len() <= 0 {
return None
}
if let Some(node) = list.get(self.index.load(Ordering::Relaxed)){
if self.weight.load(Ordering::Relaxed) < node.1 {
self.weight.fetch_add(1,Ordering::Relaxed);
return Some((&node.0).clone())
}
}
self.index.fetch_add(1,Ordering::Relaxed);
self.weight.store(1,Ordering::Relaxed);
if self.index.load(Ordering::Relaxed) >= list.len() {
self.index.store(0,Ordering::Relaxed);
}
return Some((&list[self.index.load(Ordering::Relaxed)].0).clone())
}
async fn breadth(&self) ->Option<K>{
let mut list = self.list.write().await;
let list = list.deref_mut();
if list.len() <= 0 {
return None
}
if self.index.load(Ordering::Relaxed) >= list.len() {
self.index.store(0,Ordering::Relaxed);
self.weight.fetch_add(1,Ordering::Relaxed);
if self.weight.load(Ordering::Relaxed) >= list.last().unwrap().1 {
self.weight.store(0,Ordering::Relaxed);
}
}
let weight = self.weight.load(Ordering::Relaxed);
for index in self.index.load(Ordering::Relaxed)..list.len() {
if list[index].1 > weight {
self.index.store(index+1,Ordering::Relaxed);
return Some(list[index].0.clone())
}
}
None
}
}
#[async_trait::async_trait]
impl<K:Clone + Eq + Send + Sync > BalancingStrategy<K> for Polling<K>{
async fn add(&self, k: K, n: usize) {
let mut list = self.list.write().await;
let list = list.deref_mut();
match self.priority {
Priority::DepthFirst => {
list.push((k,n));
}
Priority::BreadthFirst => {
for index in 0..list.len() {
if n <= list[index].1 {
list.insert(index,(k,n));
return;
}
}
list.push((k,n));
}
}
}
async fn remove(&self, k: K) {
let mut list = self.list.write().await;
let list = list.deref_mut();
if list.len() < 1 {
return
}
for index in 0..list.len(){
if k == list[index].0 {
list.remove(index);
break;
}
}
}
async fn select(&self) -> Option<K> {
match self.priority {
Priority::DepthFirst => self.depth().await,
Priority::BreadthFirst => self.breadth().await,
}
}
}