pi_weight_task/
lib.rs

1//! 任务池
2//! 可以向任务池中放入不同权重的任务,任务池提供弹出功能,任务池大概率会弹出权重高的任务。
3//! 任务池支持的任务可以大致分为三类:
4//!     1. 串行任务:插入串行任务需要先创建队列,放入到同一个队列的任务,会按顺序弹出。
5//!         创建队列会返回队列key,可以通过该key获取队列引用。
6//!         可以获得队列状态-是否被锁定,可以从队列头或尾放入任务,也可弹出任务。
7//!         可设置队列权重。调整队列权重立刻在下一次弹出队列时生效。
8//!         队列可以设置为弹出任务后自动锁定,直到外部将队列解开锁定,锁定的队列不会在弹出任务。
9//!     2. 并行任务:在任务池中,如果不是队列任务,那一定是一个并行任务。
10//!         并行任务与串行任务的区别是,并行任务不需要排队,并行任务的权重越高,弹出的概率越大。
11//!     3. 定时任务,该任务先被存在定时器中,超时后,才能有机会被弹出。
12//!         定时任务分为可撤销和不可撤销两类,可撤销的定时任务放入时会返回唯一key,通过该key可撤销该任务。
13//!
14
15use pi_ext_heap::empty;
16use pi_weight::{WeightHeap, WeightItem};
17use pi_wy_rng::WyRng;
18use rand_core::{RngCore, SeedableRng};
19use slotmap::{new_key_type, Key, SlotMap};
20use std::{collections::VecDeque, fmt, num::NonZeroU32};
21
22// 定义队列键类型
23new_key_type! {
24    pub struct DequeKey;
25}
26
27/// 队列权重类型
28#[derive(Clone, Copy, PartialEq, Eq, Debug)]
29pub enum WeightType {
30    // 标准权重
31    Normal(NonZeroU32),
32    // 单位权重,总权重为队列长度*单位权重
33    Unit(NonZeroU32),
34}
35/// 任务队列
36pub struct Deque<T, D> {
37    /// 队列
38    pub deque: VecDeque<T>,
39    /// 队列权重类型
40    weight_type: WeightType,
41    /// 所在的权重堆的位置
42    weight_index: usize,
43    /// 锁定状态, None表示不自动锁定, true表示锁定,false表示无锁定
44    lock_state: Option<bool>,
45    /// 旧的长度
46    old_deque_len: usize,
47    /// 关联的数据
48    pub data: D,
49}
50impl<T, D> Deque<T, D> {
51    pub fn new(weight_type: WeightType, data: D) -> Self {
52        Deque {
53            deque: Default::default(),
54            weight_type,
55            weight_index: usize::MAX,
56            lock_state: None,
57            old_deque_len: 0,
58            data,
59        }
60    }
61    pub fn with_lock(weight_type: WeightType, data: D) -> Self {
62        Deque {
63            deque: Default::default(),
64            weight_type,
65            weight_index: usize::MAX,
66            lock_state: Some(true),
67            old_deque_len: 0,
68            data,
69        }
70    }
71    /// 获得权重类型
72    pub fn weight_type(&self) -> WeightType {
73        self.weight_type
74    }
75    /// 获得队列锁定状态
76    pub fn lock_state(&self) -> Option<bool> {
77        self.lock_state
78    }
79    /// 获得队列长度
80    pub fn queue_len(&self) -> usize {
81        self.deque.len()
82    }
83    /// 获得队列状态
84    pub fn state(&self) -> DequeState {
85        DequeState {
86            new_deque_len: self.deque.len(),
87            weight_type: self.weight_type,
88            weight_index: self.weight_index,
89            lock_state: self.lock_state,
90            old_deque_len: self.old_deque_len,
91        }
92    }
93}
94impl<T: fmt::Debug, D: fmt::Debug> fmt::Debug for Deque<T, D> {
95    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96        f.debug_struct("Deque")
97            .field("deque", &self.deque)
98            .field("weight_type", &self.weight_type)
99            .field("weight_index", &self.weight_index)
100            .field("lock_state", &self.lock_state)
101            .field("data", &self.data)
102            .finish()
103    }
104}
105/// 队列状态,修复队列在权重堆上时需要
106pub struct DequeState {
107    /// 新的队列长度
108    new_deque_len: usize,
109    /// 队列权重类型
110    weight_type: WeightType,
111    /// 所在的权重堆的位置
112    weight_index: usize,
113    /// 锁定状态, None表示不自动锁定, true表示锁定,false表示无锁定
114    lock_state: Option<bool>,
115    /// 旧的队列长度
116    old_deque_len: usize,
117}
118
119/// 任务池
120pub struct TaskPool<T, D, const N0: usize, const N: usize, const L: usize> {
121    slot: SlotMap<DequeKey, Deque<T, D>>,
122    // 串行任务队列池
123    sync_pool: WeightHeap<DequeKey>,
124    // 并行任务池
125    async_pool: WeightHeap<T>,
126    // 不可撤销定时器
127    timer: pi_timer::Timer<T, N0, N, L>,
128    // 可撤销定时器
129    cancel_timer: pi_cancel_timer::Timer<T, N0, N, L>,
130    // 两个定时器的权重
131    timer_weight: usize,
132    // 随机数
133    rng: WyRng,
134    // 串行任务的添加数量
135    sync_add_count: usize,
136    // 串行任务的移除数量
137    sync_remove_count: usize,
138    // 并行任务的添加数量
139    async_add_count: usize,
140    // 并行任务的移除数量
141    async_remove_count: usize,
142}
143
144impl<T: fmt::Debug, D, const N0: usize, const N: usize, const L: usize> fmt::Debug
145    for TaskPool<T, D, N0, N, L>
146{
147    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
148        f.debug_struct("TaskPool")
149            .field("sync_pool", &self.sync_pool)
150            .field("async_pool", &self.async_pool)
151            .field("timer", &self.timer)
152            .field("cancel_timer", &self.cancel_timer)
153            .field("timer_weight", &self.timer_weight)
154            .field("rng", &self.rng)
155            .field("sync_add_count", &self.sync_add_count)
156            .field("sync_remove_count", &self.sync_remove_count)
157            .field("async_add_count", &self.async_add_count)
158            .field("async_remove_count", &self.async_remove_count)
159            .finish()
160    }
161}
162
163impl<T, D, const N0: usize, const N: usize, const L: usize> Default for TaskPool<T, D, N0, N, L> {
164    fn default() -> Self {
165        TaskPool {
166            slot: Default::default(),
167            sync_pool: Default::default(),
168            async_pool: Default::default(),
169            timer: Default::default(),
170            cancel_timer: Default::default(),
171            timer_weight: 65535,
172            rng: Default::default(),
173            sync_add_count: 0,
174            sync_remove_count: 0,
175            async_add_count: 0,
176            async_remove_count: 0,
177        }
178    }
179}
180impl<T, D, const N0: usize, const N: usize, const L: usize> TaskPool<T, D, N0, N, L> {
181    /// 获得所有任务的添加数量
182    pub fn add_count(&self) -> usize {
183        self.sync_add_count
184            + self.async_add_count
185            + self.timer.add_count()
186            + self.cancel_timer.add_count()
187    }
188    /// 获得所有任务的移除数量
189    pub fn remove_count(&self) -> usize {
190        self.sync_remove_count
191            + self.async_remove_count
192            + self.timer.remove_count()
193            + self.cancel_timer.remove_count()
194    }
195    /// 获得串行任务的添加数量
196    pub fn sync_add_count(&self) -> usize {
197        self.sync_add_count
198    }
199    /// 获得串行任务的移除数量
200    pub fn sync_remove_count(&self) -> usize {
201        self.sync_remove_count
202    }
203    /// 获得并行任务的添加数量
204    pub fn async_add_count(&self) -> usize {
205        self.async_add_count
206    }
207    /// 获得并行任务的移除数量
208    pub fn async_remove_count(&self) -> usize {
209        self.async_remove_count
210    }
211    /// 重置随机种子
212    pub fn reset_rng(&mut self, seed: u64) {
213        self.rng = WyRng::seed_from_u64(seed);
214    }
215    /// 获得串行任务队列的数量
216    pub fn deque_len(&self) -> usize {
217        self.slot.len()
218    }
219    /// 将指定的串行任务队列加入任务池,返回队列key
220    pub fn push_deque(&mut self, deque: Deque<T, D>) -> DequeKey {
221        let w = match deque.weight_type {
222            WeightType::Normal(w) => {
223                if deque.deque.len() > 0 {
224                    w.get() as usize
225                } else {
226                    0
227                }
228            }
229            WeightType::Unit(w) => (w.get() as usize) * deque.deque.len(),
230        };
231        let lock_state = deque.lock_state;
232        let key = self.slot.insert(deque);
233        if Some(true) != lock_state && w > 0 {
234            // 如果队列没有锁定, 并且有任务, 则放入权重池中
235            self.sync_pool
236                .push_weight(w, key, &mut self.slot, set_index);
237        }
238        key
239    }
240    /// 重设置队列的权重
241    pub fn reset_deque_weight(&mut self, key: DequeKey, weight_type: WeightType) -> bool {
242        if let Some(it) = self.slot.get_mut(key) {
243            if it.weight_type == weight_type {
244                return true;
245            }
246            it.weight_type = weight_type;
247            // 当前队列没有任务
248            if it.deque.len() == 0 {
249                return true;
250            }
251            // 当前队列被锁定
252            if Some(true) == it.lock_state {
253                return true;
254            }
255            let w = match it.weight_type {
256                WeightType::Normal(w) => w.get() as usize,
257                WeightType::Unit(w) => (w.get() as usize) * it.deque.len(),
258            };
259            // 修正权重堆中的权重
260            self.sync_pool
261                .modify_weight(it.weight_index, w, &mut self.slot, set_index);
262            return true;
263        }
264        false
265    }
266    /// 获得任务队列的只读引用
267    pub fn get_deque(&self, key: DequeKey) -> Option<&Deque<T, D>> {
268        self.slot.get(key)
269    }
270    /// 获得任务队列的可写引用
271    pub fn get_deque_mut(&mut self, key: DequeKey) -> Option<&mut Deque<T, D>> {
272        let mut r = self.slot.get_mut(key);
273        if let Some(it) = &mut r {
274            it.old_deque_len = it.deque.len();
275        }
276        r
277    }
278    /// 增删任务队列的任务后,应修复队列状态
279    pub fn repair_deque_state(&mut self, key: DequeKey, state: DequeState) {
280        // 当前队列被锁定
281        if Some(true) == state.lock_state {
282            return;
283        }
284        if state.new_deque_len > 0 {
285            if state.old_deque_len > 0 {
286                if state.new_deque_len == state.old_deque_len {
287                    return;
288                }
289                match state.weight_type {
290                    WeightType::Unit(w) => {
291                        // 单位权重情况下, 任务数量变化要修正权重堆中的权重
292                        let w = (w.get() as usize) * state.new_deque_len;
293                        self.sync_pool.modify_weight(
294                            state.weight_index,
295                            w,
296                            &mut self.slot,
297                            set_index,
298                        );
299                    }
300                    _ => (),
301                };
302                if state.new_deque_len > state.old_deque_len {
303                    self.sync_add_count += state.new_deque_len - state.old_deque_len;
304                } else {
305                    self.sync_remove_count += state.old_deque_len - state.new_deque_len;
306                }
307            } else {
308                let w = match state.weight_type {
309                    WeightType::Normal(w) => w.get() as usize,
310                    WeightType::Unit(w) => (w.get() as usize) * state.new_deque_len,
311                };
312                // 插入到权重堆
313                self.sync_pool
314                    .push_weight(w, key, &mut self.slot, set_index);
315                self.sync_add_count += state.new_deque_len;
316            }
317        } else if state.old_deque_len > 0 {
318            // 移除出权重堆
319            self.sync_pool
320                .remove_index(state.weight_index, &mut self.slot, set_index);
321            self.sync_remove_count += state.old_deque_len;
322        }
323    }
324    /// 释放队列的锁,成功释放,则返回true, 否则返回false
325    pub fn deque_unlock(&mut self, key: DequeKey) -> bool {
326        if let Some(it) = self.slot.get_mut(key) {
327            if Some(true) == it.lock_state {
328                // 解锁
329                it.lock_state = Some(false);
330                if it.deque.len() > 0 {
331                    // 队列中有任务,则将队列放入到权重堆
332                    let w = match it.weight_type {
333                        WeightType::Normal(w) => w.get() as usize,
334                        WeightType::Unit(w) => (w.get() as usize) * it.deque.len(),
335                    };
336                    self.sync_pool
337                        .push_weight(w, key, &mut self.slot, set_index);
338                }
339            }
340            true
341        } else {
342            false
343        }
344    }
345
346    /// 删除一个任务队列,如果删除成功,返回true, 否则返回false
347    pub fn remove_deque(&mut self, key: DequeKey) -> bool {
348        if let Some(it) = self.slot.remove(key) {
349            // 当前队列没有任务
350            if it.deque.len() == 0 {
351                return true;
352            }
353            self.sync_remove_count += it.deque.len();
354            // 当前队列被锁定
355            if Some(true) == it.lock_state {
356                return true;
357            }
358            self.sync_pool
359                .remove_index(it.weight_index, &mut self.slot, set_index);
360            true
361        } else {
362            false
363        }
364    }
365    /// 插入一个指定任务权重的并行任务
366    pub fn push_async(&mut self, task: T, weight: u32) {
367        self.async_pool
368            .push_weight(weight as usize, task, &mut (), empty);
369        self.async_add_count += 1;
370    }
371    /// 获得不可删除的定时器
372    pub fn get_timer(&self) -> &pi_timer::Timer<T, N0, N, L> {
373        &self.timer
374    }
375    /// 获得可删除的定时器
376    pub fn get_cancel_timer(&self) -> &pi_cancel_timer::Timer<T, N0, N, L> {
377        &self.cancel_timer
378    }
379    /// 获得不可删除的定时器
380    pub fn get_timer_mut(&mut self) -> &mut pi_timer::Timer<T, N0, N, L> {
381        &mut self.timer
382    }
383    /// 获得可删除的定时器
384    pub fn get_cancel_timer_mut(&mut self) -> &mut pi_cancel_timer::Timer<T, N0, N, L> {
385        &mut self.cancel_timer
386    }
387    /// 获得定时器的权重
388    #[deprecated]
389    pub fn get_timer_weight(&self) -> usize {
390        self.timer_weight
391    }
392    /// 设置定时器的权重
393    #[deprecated]
394    pub fn set_timer_weight(&mut self, weight: usize) {
395        self.timer_weight = weight;
396    }
397    /// 弹出一个任务,如果任务存在,返回任务及所在队列, 否则返回None
398    /// 如果该任务是一个串行队列任务,并且为自动加锁状态,则会对该任务所在的队列加锁,此后,该队列的任务无法弹出,
399    /// 直到外部调用free_deque方法解锁该队列,该队列的任务在后续的弹出过程中才有机会被弹出
400    pub fn pop(&mut self, now: u64) -> (Option<T>, DequeKey) {
401        let sync_w = if let Some(r) = self.sync_pool.peek() {
402            r.amount() as u64
403        } else {
404            0
405        };
406        let async_w = if let Some(r) = self.async_pool.peek() {
407            r.amount() as u64
408        } else {
409            0
410        };
411        // 定时器中任务权重总是占总权重的一半
412        let mut cancel_timer_w = if self.cancel_timer.is_ok(now) {
413            sync_w + async_w + 2
414        } else {
415            0
416        };
417        let timer_w = if self.timer.is_ok(now) {
418            if cancel_timer_w > 0 {
419                cancel_timer_w = cancel_timer_w >> 1;
420                cancel_timer_w
421            } else {
422                sync_w + async_w + 1
423            }
424        } else {
425            0
426        };
427        let amount = sync_w + async_w + timer_w + cancel_timer_w;
428        if amount == 0 {
429            return (None, DequeKey::null());
430        }
431        let mut w = self.rng.next_u64() % amount;
432        if w < cancel_timer_w {
433            return (self.cancel_timer.pop(now), DequeKey::null());
434        } else {
435            w -= cancel_timer_w;
436        }
437        if w < timer_w {
438            return (self.timer.pop(now), DequeKey::null());
439        } else {
440            w -= timer_w;
441        }
442        if w < async_w {
443            if let Some(r) = self.async_pool.pop(&mut (), empty) {
444                self.async_remove_count += 1;
445                return (Some(r.el), DequeKey::null());
446            }
447        } else {
448            w -= async_w;
449        }
450        // 从串行任务队列的权重堆中根据权重查找队列
451        let index = self.sync_pool.find_weight(w as usize);
452        let key = self.sync_pool.as_slice()[index].el;
453        let it = &mut self.slot[key];
454        // 弹出任务
455        let r = it.deque.pop_front();
456        self.sync_remove_count += 1;
457        if it.lock_state.is_some() {
458            // 如果队列为自动锁定状态,则改为锁定, 并移出权重堆
459            it.lock_state = Some(true);
460            self.sync_pool
461                .remove_index(index, &mut self.slot, set_index);
462        } else if it.deque.is_empty() {
463            // 如果队列为空,则移出权重堆
464            self.sync_pool
465                .remove_index(index, &mut self.slot, set_index);
466        } else if let WeightType::Unit(uw) = it.weight_type {
467            // 如果队列权重类型为单位权重,则调整队列在权重堆上的权重
468            let ww = (uw.get() as usize) * it.deque.len();
469            self.sync_pool
470                .modify_weight(index, ww, &mut self.slot, set_index);
471        }
472        (r, key)
473    }
474
475    /// 弹出一个任务(忽略定时器的任务)
476    pub fn pop_ignore_timer(&mut self) -> (Option<T>, DequeKey) {
477        let sync_w = if let Some(r) = self.sync_pool.peek() {
478            r.amount() as u64
479        } else {
480            0
481        };
482        let async_w = if let Some(r) = self.async_pool.peek() {
483            r.amount() as u64
484        } else {
485            0
486        };
487        let amount = sync_w + async_w;
488        if amount == 0 {
489            return (None, DequeKey::null());
490        }
491        let mut w = self.rng.next_u64() % amount;
492        if w < async_w {
493            if let Some(r) = self.async_pool.pop(&mut (), empty) {
494                self.async_remove_count += 1;
495                return (Some(r.el), DequeKey::null());
496            }
497        } else {
498            w -= async_w;
499        }
500        // 从串行任务队列的权重堆中根据权重查找队列
501        let index = self.sync_pool.find_weight(w as usize);
502        let key = self.sync_pool.as_slice()[index].el;
503        let it = &mut self.slot[key];
504        // 弹出任务
505        let r = it.deque.pop_front();
506        self.sync_remove_count += 1;
507        if it.lock_state.is_some() {
508            // 如果队列为自动锁定状态,则改为锁定, 并移出权重堆
509            it.lock_state = Some(true);
510            self.sync_pool
511                .remove_index(index, &mut self.slot, set_index);
512        } else if it.deque.is_empty() {
513            // 如果队列为空,则移出权重堆
514            self.sync_pool
515                .remove_index(index, &mut self.slot, set_index);
516        } else if let WeightType::Unit(uw) = it.weight_type {
517            // 如果队列权重类型为单位权重,则调整队列在权重堆上的权重
518            let ww = (uw.get() as usize) * it.deque.len();
519            self.sync_pool
520                .modify_weight(index, ww, &mut self.slot, set_index);
521        }
522        (r, key)
523    }
524    /// 判断当前时间内是否还有可以弹出的任务
525    pub fn is_ok(&mut self, now: u64) -> bool {
526        self.sync_pool.len() > 0
527            || self.async_pool.len() > 0
528            || self.timer.is_ok(now)
529            || self.cancel_timer.is_ok(now)
530    }
531}
532
533fn set_index<T, D>(
534    slot: &mut SlotMap<DequeKey, Deque<T, D>>,
535    arr: &mut [WeightItem<DequeKey>],
536    loc: usize,
537) {
538    let i = &arr[loc];
539    unsafe {
540        slot.get_unchecked_mut(i.el).weight_index = loc;
541    }
542}
543
544// 测试定时器得延时情况
545#[cfg(test)]
546mod test_mod {
547    //extern crate rand_core;
548
549    use std::{
550        thread,
551        time::{Duration, Instant},
552    };
553
554    //use self::rand_core::{RngCore, SeedableRng};
555    use crate::*;
556
557    #[test]
558    fn test() {
559        let mut pool: TaskPool<(u64, u64), u64, 128, 16, 1> = Default::default();
560        let arr = [
561            pool.push_deque(Deque::new(
562                WeightType::Unit(unsafe { NonZeroU32::new_unchecked(1000) }),
563                1,
564            )),
565            pool.push_deque(Deque::new(
566                WeightType::Unit(unsafe { NonZeroU32::new_unchecked(200) }),
567                2,
568            )),
569            pool.push_deque(Deque::new(
570                WeightType::Normal(unsafe { NonZeroU32::new_unchecked(20000) }),
571                3,
572            )),
573        ];
574        let mut rng = WyRng::seed_from_u64(22222);
575        let start = Instant::now();
576        for i in 1..100000 {
577            let t = (rng.next_u32() % 16100) as u64;
578            let now = Instant::now();
579            let tt = now.duration_since(start).as_millis() as u64;
580            if i < 100 {
581                if t % 4 == 0 {
582                    println!("push: timeout:{} r:{:?}", t, (i, t));
583                    pool.get_timer_mut().push(t as usize, (i, t));
584                } else if t % 4 == 1 {
585                    println!("push: cancel:{} r:{:?}", t, (i, t));
586                    pool.get_cancel_timer_mut().push(t as usize, (i, t));
587                } else if t % 4 == 2 {
588                    println!("push: async:{} r:{:?}", t, (i, t));
589                    pool.push_async((i, t), t as u32);
590                    continue;
591                } else if t % 4 == 3 {
592                    println!("push: sync:{} r:{:?}", t, (i, t));
593                    let k = arr[(t % 3) as usize];
594                    let d = pool.get_deque_mut(k).unwrap();
595                    d.deque.push_back((i, t));
596                    let state = d.state();
597                    pool.repair_deque_state(k, state);
598                    continue;
599                }
600            }
601            while let (Some(it), dk) = pool.pop(tt) {
602                println!("ppp:{:?}, now:{}, dk:{:?}", it, tt, dk);
603            }
604            if i > 100 && pool.add_count() == pool.remove_count() {
605                //println!("vec:{:?}", vec);
606                println!("return: add_count:{:?}", pool.add_count());
607                return;
608            }
609            thread::sleep(Duration::from_millis(1 as u64));
610        }
611    }
612}