async_wrr_queue/
wrr_queue.rs

1use crate::instance::Instance;
2use log::error;
3use num::integer::lcm;
4use std::sync::atomic::{AtomicUsize, Ordering};
5
6/// weighted round robin queue struct
7///
8/// WRR queue, each time new instance is inserted, balance queue need to be recalculated.
9/// So minimizing the insert operation can improve performance.
10///
11/// `select` method requires only an atomic usize and a Read access to the RwLock.
12/// There should be of no runtime performance issue.
13///
14/// example:
15///
16/// ```ignore
17/// use async_wrr_queue::{WrrQueue, Instance};
18/// use std::num::NonZeroUsize;
19///
20/// let mut queue = WrrQueue::new().insert_many([("data1", 1), ("data2", 2), ("data3", 3)]).await;
21/// queue.insert(Instance::new_with_weight("data4", NonZeroUsize::new(4).unwrap())).await;
22///
23/// let selected1 = queue.select();
24/// let selected2 = queue.select();
25/// let selected3 = queue.select();
26/// ```
27pub struct WrrQueue<T: PartialEq> {
28    instance_list: Vec<Instance<T>>,
29    cur_idx: AtomicUsize,
30    #[cfg(feature = "tokio")]
31    select_queue: tokio::sync::RwLock<Vec<usize>>,
32    #[cfg(feature = "blocking")]
33    select_queue: std::sync::RwLock<Vec<usize>>,
34}
35
36impl<T: PartialEq> Default for WrrQueue<T> {
37    /// create a default WRR Queue, with no data
38    fn default() -> Self {
39        WrrQueue {
40            instance_list: Vec::new(),
41            cur_idx: AtomicUsize::new(0),
42
43            #[cfg(feature = "tokio")]
44            select_queue: tokio::sync::RwLock::new(Vec::new()),
45            #[cfg(feature = "blocking")]
46            select_queue: std::sync::RwLock::new(Vec::new()),
47        }
48    }
49}
50
51impl<T: PartialEq> WrrQueue<T> {
52    pub fn new() -> Self {
53        Self::default()
54    }
55
56    fn insert_uncalculated(&mut self, instance: Instance<T>) -> bool {
57        if self.instance_list.contains(&instance) {
58            false
59        } else {
60            self.instance_list.push(instance);
61            true
62        }
63    }
64
65    fn clear_instance_uncalculated(&mut self) {
66        self.instance_list = Default::default();
67        self.cur_idx = Default::default();
68        self.select_queue = Default::default();
69    }
70
71    fn delete_uncalculated(&mut self, instance: Instance<T>) -> bool {
72        if self.instance_list.contains(&instance) {
73            false
74        } else {
75            let index = self
76                .instance_list
77                .iter()
78                .position(|x| *x == instance)
79                .unwrap();
80            self.instance_list.remove(index);
81            true
82        }
83    }
84}
85
#[cfg(feature = "tokio")]
impl<T: PartialEq> WrrQueue<T> {
    /// Inserts one instance and rebuilds the selection sequence.
    ///
    /// Returns `true` if the instance was added, `false` if an equal instance was
    /// already registered.
    pub async fn insert(&mut self, instance: impl Into<Instance<T>>) -> bool {
        let inserted = self.insert_uncalculated(instance.into());
        self.recalculate_queue().await;
        inserted
    }

    /// Inserts several instances and rebuilds the selection sequence once.
    /// Recommended over repeated `insert` calls when adding multiple instances.
    ///
    /// Returns `true` only if every instance was added; duplicates are skipped and
    /// make the result `false`, but the remaining instances are still inserted.
    pub async fn insert_many<U>(&mut self, instance_list: impl Into<Vec<U>>) -> bool
    where
        T: PartialEq,
        U: Into<Instance<T>>,
    {
        let mut all_inserted = true;
        for item in instance_list.into() {
            // `&=` always evaluates the insertion; a short-circuiting `all` would stop
            // at the first duplicate and silently drop everything after it.
            all_inserted &= self.insert_uncalculated(item.into());
        }
        self.recalculate_queue().await;
        all_inserted
    }

    /// Returns the next instance of the weighted sequence, or `None` if no instance
    /// is registered.
    ///
    /// Only an atomic increment and read access to the sequence lock are needed, so
    /// this takes `&self` and can be called concurrently.
    pub async fn select(&self) -> Option<&Instance<T>> {
        if self.instance_list.is_empty() {
            return None;
        }
        let schedule = self.select_queue.read().await;
        if schedule.is_empty() {
            // Guards the modulo below against a division by zero.
            return None;
        }
        let ticket = self.cur_idx.fetch_add(1, Ordering::Relaxed);
        let instance_idx = *schedule.get(ticket % schedule.len())?;
        self.instance_list.get(instance_idx)
    }

    /// Removes every instance and resets the cursor and the selection sequence.
    pub fn clear_instance(&mut self) {
        self.clear_instance_uncalculated();
    }

    /// Deletes the given instance and rebuilds the selection sequence.
    ///
    /// Returns `true` if an instance was removed, `false` otherwise.
    pub async fn delete_instance(&mut self, instance: Instance<T>) -> bool {
        if self.delete_uncalculated(instance) {
            self.recalculate_queue().await;
            true
        } else {
            false
        }
    }

    /// Rebuilds the pre-computed selection sequence from the current instance list.
    async fn recalculate_queue(&mut self) {
        if self.instance_list.is_empty() {
            self.clear_instance();
            return;
        }
        let weights: Vec<usize> = self
            .instance_list
            .iter()
            .map(|instance| instance.weight().get())
            .collect();
        // One full cycle picks every instance exactly `weight` times, so its length is
        // the SUM of the weights. The LCM is not a cycle length (e.g. weights 1,1,1
        // have LCM 1), and an inclusive range would add one extra, skewing pick.
        let cycle_len: usize = weights.iter().sum();
        let mut current: Vec<isize> = weights.iter().map(|&w| w as isize).collect();
        let schedule: Vec<usize> = (0..cycle_len)
            .map(|_| select_instance(&weights, &mut current))
            .collect();

        *self.select_queue.write().await = schedule;
    }
}
169
#[cfg(feature = "blocking")]
impl<T: PartialEq> WrrQueue<T> {
    /// Inserts one instance and rebuilds the selection sequence.
    ///
    /// Returns `true` if the instance was added, `false` if an equal instance was
    /// already registered.
    pub fn insert(&mut self, instance: impl Into<Instance<T>>) -> bool {
        let inserted = self.insert_uncalculated(instance.into());
        self.recalculate_queue();
        inserted
    }

    /// Inserts several instances and rebuilds the selection sequence once.
    /// Recommended over repeated `insert` calls when adding multiple instances.
    ///
    /// Returns `true` only if every instance was added; duplicates are skipped and
    /// make the result `false`, but the remaining instances are still inserted.
    pub fn insert_many<U>(&mut self, instance_list: impl Into<Vec<U>>) -> bool
    where
        T: PartialEq,
        U: Into<Instance<T>>,
    {
        let mut all_inserted = true;
        for item in instance_list.into() {
            // `&=` always evaluates the insertion; a short-circuiting `all` would stop
            // at the first duplicate and silently drop everything after it.
            all_inserted &= self.insert_uncalculated(item.into());
        }
        self.recalculate_queue();
        all_inserted
    }

    /// Returns the next instance of the weighted sequence, or `None` if no instance
    /// is registered.
    ///
    /// Only an atomic increment and read access to the sequence lock are needed, so
    /// this takes `&self` and can be called concurrently.
    ///
    /// # Panics
    ///
    /// Panics if the sequence lock is poisoned.
    pub fn select(&self) -> Option<&Instance<T>> {
        if self.instance_list.is_empty() {
            return None;
        }
        let schedule = self
            .select_queue
            .read()
            .expect("Read access acquired failed");
        if schedule.is_empty() {
            // Guards the modulo below against a division by zero.
            return None;
        }
        let ticket = self.cur_idx.fetch_add(1, Ordering::Relaxed);
        let instance_idx = *schedule.get(ticket % schedule.len())?;
        self.instance_list.get(instance_idx)
    }

    /// Removes every instance and resets the cursor and the selection sequence.
    pub fn clear_instance(&mut self) {
        self.clear_instance_uncalculated();
    }

    /// Deletes the given instance and rebuilds the selection sequence.
    ///
    /// Returns `true` if an instance was removed, `false` otherwise.
    pub fn delete_instance(&mut self, instance: Instance<T>) -> bool {
        if self.delete_uncalculated(instance) {
            self.recalculate_queue();
            true
        } else {
            false
        }
    }

    /// Rebuilds the pre-computed selection sequence from the current instance list.
    ///
    /// # Panics
    ///
    /// Panics if the sequence lock is poisoned.
    fn recalculate_queue(&mut self) {
        if self.instance_list.is_empty() {
            // Nothing to schedule: reset instead of asking `select_instance` to pick
            // from an empty list (which only logs an error and yields a bogus index).
            self.clear_instance();
            return;
        }
        let weights: Vec<usize> = self
            .instance_list
            .iter()
            .map(|instance| instance.weight().get())
            .collect();
        // One full cycle picks every instance exactly `weight` times, so its length is
        // the SUM of the weights. The LCM is not a cycle length (e.g. weights 1,1,1
        // have LCM 1), and an inclusive range would add one extra, skewing pick.
        let cycle_len: usize = weights.iter().sum();
        let mut current: Vec<isize> = weights.iter().map(|&w| w as isize).collect();
        let schedule: Vec<usize> = (0..cycle_len)
            .map(|_| select_instance(&weights, &mut current))
            .collect();

        let mut queue_lock = self
            .select_queue
            .write()
            .expect("Write lock acquired failed");
        *queue_lock = schedule;
    }
}
255
256fn select_instance(weight_vec: &Vec<usize>, cur_weight: &mut [isize]) -> usize {
257    if weight_vec.is_empty() {
258        error!("failed to select an instance: instance list is empty");
259        return 0;
260    }
261    let mut selected = 0;
262    let mut acc = 0isize;
263    for i in 0..weight_vec.len() {
264        cur_weight[i] += weight_vec[i] as isize;
265        acc += weight_vec[i] as isize;
266        if cur_weight[selected] < cur_weight[i] {
267            selected = i;
268        }
269    }
270    cur_weight[selected] -= acc;
271    selected
272}