async_wrr_queue/
wrr_queue.rs1use crate::instance::Instance;
2use log::error;
3use num::integer::lcm;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6pub struct WrrQueue<T: PartialEq> {
28 instance_list: Vec<Instance<T>>,
29 cur_idx: AtomicUsize,
30 #[cfg(feature = "tokio")]
31 select_queue: tokio::sync::RwLock<Vec<usize>>,
32 #[cfg(feature = "blocking")]
33 select_queue: std::sync::RwLock<Vec<usize>>,
34}
35
36impl<T: PartialEq> Default for WrrQueue<T> {
37 fn default() -> Self {
39 WrrQueue {
40 instance_list: Vec::new(),
41 cur_idx: AtomicUsize::new(0),
42
43 #[cfg(feature = "tokio")]
44 select_queue: tokio::sync::RwLock::new(Vec::new()),
45 #[cfg(feature = "blocking")]
46 select_queue: std::sync::RwLock::new(Vec::new()),
47 }
48 }
49}
50
51impl<T: PartialEq> WrrQueue<T> {
52 pub fn new() -> Self {
53 Self::default()
54 }
55
56 fn insert_uncalculated(&mut self, instance: Instance<T>) -> bool {
57 if self.instance_list.contains(&instance) {
58 false
59 } else {
60 self.instance_list.push(instance);
61 true
62 }
63 }
64
65 fn clear_instance_uncalculated(&mut self) {
66 self.instance_list = Default::default();
67 self.cur_idx = Default::default();
68 self.select_queue = Default::default();
69 }
70
71 fn delete_uncalculated(&mut self, instance: Instance<T>) -> bool {
72 if self.instance_list.contains(&instance) {
73 false
74 } else {
75 let index = self
76 .instance_list
77 .iter()
78 .position(|x| *x == instance)
79 .unwrap();
80 self.instance_list.remove(index);
81 true
82 }
83 }
84}
85
#[cfg(feature = "tokio")]
impl<T: PartialEq> WrrQueue<T> {
    /// Registers one instance and rebuilds the selection schedule.
    ///
    /// Returns `false` when an equal instance was already registered (the
    /// list is left unchanged), `true` otherwise. The schedule is rebuilt in
    /// both cases.
    pub async fn insert(&mut self, instance: impl Into<Instance<T>>) -> bool {
        let inserted = self.insert_uncalculated(instance.into());
        self.recalculate_queue().await;
        inserted
    }

    /// Registers several instances and rebuilds the selection schedule once.
    ///
    /// Returns `true` only if every item was newly inserted; duplicates are
    /// skipped and make the result `false`.
    pub async fn insert_many<U>(&mut self, instance_list: impl Into<Vec<U>>) -> bool
    where
        T: PartialEq,
        U: Into<Instance<T>>,
    {
        let candidates: Vec<U> = instance_list.into();
        let mut all_inserted = true;
        for candidate in candidates {
            // Every item must be attempted. The previous `.map(..).all(..)`
            // chain short-circuited on the first duplicate, so the remaining
            // items were silently never inserted.
            all_inserted &= self.insert_uncalculated(candidate.into());
        }
        self.recalculate_queue().await;
        all_inserted
    }

    /// Picks the next instance according to the precomputed schedule.
    ///
    /// Returns `None` when no instance is registered or the schedule is
    /// empty. Otherwise the cursor is advanced by one and mapped, modulo the
    /// schedule length, to an index into the instance list.
    pub async fn select(&mut self) -> Option<&Instance<T>> {
        if self.instance_list.is_empty() {
            return None;
        }
        let ticket = self.cur_idx.fetch_add(1, Ordering::Relaxed);
        let schedule = self.select_queue.read().await;
        if schedule.is_empty() {
            // Guards the modulo below against a division by zero.
            return None;
        }
        let instance_idx = *schedule.get(ticket % schedule.len())?;
        self.instance_list.get(instance_idx)
    }

    /// Removes every instance and resets the cursor and the schedule.
    pub fn clear_instance(&mut self) {
        self.clear_instance_uncalculated();
    }

    /// Removes `instance` via `delete_uncalculated` and returns its result;
    /// the selection schedule is rebuilt only when a removal is reported.
    pub async fn delete_instance(&mut self, instance: Instance<T>) -> bool {
        let removed = self.delete_uncalculated(instance);
        if removed {
            self.recalculate_queue().await;
        }
        removed
    }

    /// Rebuilds the selection schedule from the current instance weights.
    ///
    /// With no instances left the whole queue state is reset instead.
    async fn recalculate_queue(&mut self) {
        if self.instance_list.is_empty() {
            self.clear_instance();
            return;
        }
        let weights: Vec<usize> = self
            .instance_list
            .iter()
            .map(|instance| instance.weight().get())
            .collect();
        // The schedule gets one slot per unit of total weight, so every
        // instance can receive as many slots as its weight. The previous
        // `lcm + 1` length was too short in general: weights `[1, 1, 1]`
        // produced only two slots and the third instance was never selected.
        let total_weight: usize = weights.iter().sum();
        let mut current: Vec<isize> = weights.iter().map(|&weight| weight as isize).collect();
        let schedule: Vec<usize> = (0..total_weight)
            .map(|_| select_instance(&weights, &mut current))
            .collect();

        *self.select_queue.write().await = schedule;
    }
}
169
#[cfg(feature = "blocking")]
impl<T: PartialEq> WrrQueue<T> {
    /// Registers one instance and rebuilds the selection schedule.
    ///
    /// Returns `false` when an equal instance was already registered (the
    /// list is left unchanged), `true` otherwise. The schedule is rebuilt in
    /// both cases.
    pub fn insert(&mut self, instance: impl Into<Instance<T>>) -> bool {
        let inserted = self.insert_uncalculated(instance.into());
        self.recalculate_queue();
        inserted
    }

    /// Registers several instances and rebuilds the selection schedule once.
    ///
    /// Returns `true` only if every item was newly inserted; duplicates are
    /// skipped and make the result `false`.
    pub fn insert_many<U>(&mut self, instance_list: impl Into<Vec<U>>) -> bool
    where
        T: PartialEq,
        U: Into<Instance<T>>,
    {
        let candidates: Vec<U> = instance_list.into();
        let mut all_inserted = true;
        for candidate in candidates {
            // Every item must be attempted. The previous `.map(..).all(..)`
            // chain short-circuited on the first duplicate, so the remaining
            // items were silently never inserted.
            all_inserted &= self.insert_uncalculated(candidate.into());
        }
        self.recalculate_queue();
        all_inserted
    }

    /// Picks the next instance according to the precomputed schedule.
    ///
    /// Returns `None` when no instance is registered or the schedule is
    /// empty. Otherwise the cursor is advanced by one and mapped, modulo the
    /// schedule length, to an index into the instance list.
    ///
    /// # Panics
    ///
    /// Panics if the schedule lock is poisoned.
    pub fn select(&mut self) -> Option<&Instance<T>> {
        if self.instance_list.is_empty() {
            return None;
        }
        let ticket = self.cur_idx.fetch_add(1, Ordering::Relaxed);
        let schedule = self
            .select_queue
            .read()
            .expect("Read access acquired failed");
        if schedule.is_empty() {
            // Guards the modulo below against a division by zero.
            return None;
        }
        let instance_idx = *schedule.get(ticket % schedule.len())?;
        self.instance_list.get(instance_idx)
    }

    /// Removes every instance and resets the cursor and the schedule.
    pub fn clear_instance(&mut self) {
        self.clear_instance_uncalculated();
    }

    /// Removes `instance` via `delete_uncalculated` and returns its result;
    /// the selection schedule is rebuilt only when a removal is reported.
    pub fn delete_instance(&mut self, instance: Instance<T>) -> bool {
        let removed = self.delete_uncalculated(instance);
        if removed {
            self.recalculate_queue();
        }
        removed
    }

    /// Rebuilds the selection schedule from the current instance weights.
    ///
    /// With no instances left the whole queue state is reset instead.
    ///
    /// # Panics
    ///
    /// Panics if the schedule lock is poisoned.
    fn recalculate_queue(&mut self) {
        // Without this early return an empty list would still call
        // `select_instance`, which logs an error for empty input, and the
        // schedule would be filled with index 0 for a list with no entries.
        if self.instance_list.is_empty() {
            self.clear_instance();
            return;
        }
        let weights: Vec<usize> = self
            .instance_list
            .iter()
            .map(|instance| instance.weight().get())
            .collect();
        // The schedule gets one slot per unit of total weight, so every
        // instance can receive as many slots as its weight. The previous
        // `lcm + 1` length was too short in general: weights `[1, 1, 1]`
        // produced only two slots and the third instance was never selected.
        let total_weight: usize = weights.iter().sum();
        let mut current: Vec<isize> = weights.iter().map(|&weight| weight as isize).collect();
        let schedule: Vec<usize> = (0..total_weight)
            .map(|_| select_instance(&weights, &mut current))
            .collect();

        *self
            .select_queue
            .write()
            .expect("Write lock acquired failed") = schedule;
    }
}
255
256fn select_instance(weight_vec: &Vec<usize>, cur_weight: &mut [isize]) -> usize {
257 if weight_vec.is_empty() {
258 error!("failed to select an instance: instance list is empty");
259 return 0;
260 }
261 let mut selected = 0;
262 let mut acc = 0isize;
263 for i in 0..weight_vec.len() {
264 cur_weight[i] += weight_vec[i] as isize;
265 acc += weight_vec[i] as isize;
266 if cur_weight[selected] < cur_weight[i] {
267 selected = i;
268 }
269 }
270 cur_weight[selected] -= acc;
271 selected
272}