1use pi_ext_heap::empty;
16use pi_weight::{WeightHeap, WeightItem};
17use pi_wy_rng::WyRng;
18use rand_core::{RngCore, SeedableRng};
19use slotmap::{new_key_type, Key, SlotMap};
20use std::{collections::VecDeque, fmt, num::NonZeroU32};
21
new_key_type! {
    /// Slot-map key identifying a [`Deque`] registered in a [`TaskPool`].
    ///
    /// Returned by `TaskPool::push_deque`; the null key is what the `pop`
    /// methods return alongside tasks that did not come from a deque.
    pub struct DequeKey;
}
26
/// How a [`Deque`] contributes weight to the synchronous task heap.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum WeightType {
    /// Flat weight: a non-empty deque weighs exactly this value, regardless
    /// of how many tasks it holds.
    Normal(NonZeroU32),
    /// Per-task weight: the deque weighs this value multiplied by the number
    /// of tasks it currently holds.
    Unit(NonZeroU32),
}
/// A queue of tasks `T` with attached user data `D`, scheduled as one
/// weighted entry of a [`TaskPool`].
pub struct Deque<T, D> {
    /// The queued tasks; `TaskPool` pops from the front.
    pub deque: VecDeque<T>,
    /// Rule used to derive this deque's weight in the sync heap.
    weight_type: WeightType,
    /// Position of this deque's entry in the sync heap, kept up to date by
    /// the heap callback. Starts as `usize::MAX`.
    weight_index: usize,
    /// `None`: the deque does not use locking. `Some(true)`: locked (kept out
    /// of the sync heap). `Some(false)`: unlocked.
    lock_state: Option<bool>,
    /// Queue length recorded by `TaskPool::get_deque_mut`, used later to work
    /// out how the length changed.
    old_deque_len: usize,
    /// Caller-defined payload attached to the deque.
    pub data: D,
}
50impl<T, D> Deque<T, D> {
51 pub fn new(weight_type: WeightType, data: D) -> Self {
52 Deque {
53 deque: Default::default(),
54 weight_type,
55 weight_index: usize::MAX,
56 lock_state: None,
57 old_deque_len: 0,
58 data,
59 }
60 }
61 pub fn with_lock(weight_type: WeightType, data: D) -> Self {
62 Deque {
63 deque: Default::default(),
64 weight_type,
65 weight_index: usize::MAX,
66 lock_state: Some(true),
67 old_deque_len: 0,
68 data,
69 }
70 }
71 pub fn weight_type(&self) -> WeightType {
73 self.weight_type
74 }
75 pub fn lock_state(&self) -> Option<bool> {
77 self.lock_state
78 }
79 pub fn queue_len(&self) -> usize {
81 self.deque.len()
82 }
83 pub fn state(&self) -> DequeState {
85 DequeState {
86 new_deque_len: self.deque.len(),
87 weight_type: self.weight_type,
88 weight_index: self.weight_index,
89 lock_state: self.lock_state,
90 old_deque_len: self.old_deque_len,
91 }
92 }
93}
94impl<T: fmt::Debug, D: fmt::Debug> fmt::Debug for Deque<T, D> {
95 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
96 f.debug_struct("Deque")
97 .field("deque", &self.deque)
98 .field("weight_type", &self.weight_type)
99 .field("weight_index", &self.weight_index)
100 .field("lock_state", &self.lock_state)
101 .field("data", &self.data)
102 .finish()
103 }
104}
105pub struct DequeState {
107 new_deque_len: usize,
109 weight_type: WeightType,
111 weight_index: usize,
113 lock_state: Option<bool>,
115 old_deque_len: usize,
117}
118
/// Pool that hands out tasks of type `T` from four sources: weighted deques
/// (sync tasks), a weighted heap of standalone async tasks, and two timers.
///
/// `D` is the per-deque user data; `N0`, `N` and `L` are forwarded unchanged
/// to both timer types.
pub struct TaskPool<T, D, const N0: usize, const N: usize, const L: usize> {
    /// Every registered deque, addressed by its key.
    slot: SlotMap<DequeKey, Deque<T, D>>,
    /// Weighted heap of the deques that can currently be popped from.
    sync_pool: WeightHeap<DequeKey>,
    /// Weighted heap of standalone async tasks.
    async_pool: WeightHeap<T>,
    /// Timer holding delayed tasks.
    timer: pi_timer::Timer<T, N0, N, L>,
    /// Second timer holding delayed tasks, from `pi_cancel_timer`
    /// (presumably supports cancellation; confirm against that crate).
    cancel_timer: pi_cancel_timer::Timer<T, N0, N, L>,
    /// Value exposed through the deprecated timer-weight accessors; not read
    /// by the pop logic in this file.
    timer_weight: usize,
    /// Random source used to choose which task source to pop from.
    rng: WyRng,
    /// Number of sync tasks counted as added.
    sync_add_count: usize,
    /// Number of sync tasks counted as removed.
    sync_remove_count: usize,
    /// Number of async tasks pushed.
    async_add_count: usize,
    /// Number of async tasks popped.
    async_remove_count: usize,
}
143
144impl<T: fmt::Debug, D, const N0: usize, const N: usize, const L: usize> fmt::Debug
145 for TaskPool<T, D, N0, N, L>
146{
147 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
148 f.debug_struct("TaskPool")
149 .field("sync_pool", &self.sync_pool)
150 .field("async_pool", &self.async_pool)
151 .field("timer", &self.timer)
152 .field("cancel_timer", &self.cancel_timer)
153 .field("timer_weight", &self.timer_weight)
154 .field("rng", &self.rng)
155 .field("sync_add_count", &self.sync_add_count)
156 .field("sync_remove_count", &self.sync_remove_count)
157 .field("async_add_count", &self.async_add_count)
158 .field("async_remove_count", &self.async_remove_count)
159 .finish()
160 }
161}
162
163impl<T, D, const N0: usize, const N: usize, const L: usize> Default for TaskPool<T, D, N0, N, L> {
164 fn default() -> Self {
165 TaskPool {
166 slot: Default::default(),
167 sync_pool: Default::default(),
168 async_pool: Default::default(),
169 timer: Default::default(),
170 cancel_timer: Default::default(),
171 timer_weight: 65535,
172 rng: Default::default(),
173 sync_add_count: 0,
174 sync_remove_count: 0,
175 async_add_count: 0,
176 async_remove_count: 0,
177 }
178 }
179}
180impl<T, D, const N0: usize, const N: usize, const L: usize> TaskPool<T, D, N0, N, L> {
181 pub fn add_count(&self) -> usize {
183 self.sync_add_count
184 + self.async_add_count
185 + self.timer.add_count()
186 + self.cancel_timer.add_count()
187 }
188 pub fn remove_count(&self) -> usize {
190 self.sync_remove_count
191 + self.async_remove_count
192 + self.timer.remove_count()
193 + self.cancel_timer.remove_count()
194 }
195 pub fn sync_add_count(&self) -> usize {
197 self.sync_add_count
198 }
199 pub fn sync_remove_count(&self) -> usize {
201 self.sync_remove_count
202 }
203 pub fn async_add_count(&self) -> usize {
205 self.async_add_count
206 }
207 pub fn async_remove_count(&self) -> usize {
209 self.async_remove_count
210 }
211 pub fn reset_rng(&mut self, seed: u64) {
213 self.rng = WyRng::seed_from_u64(seed);
214 }
215 pub fn deque_len(&self) -> usize {
217 self.slot.len()
218 }
219 pub fn push_deque(&mut self, deque: Deque<T, D>) -> DequeKey {
221 let w = match deque.weight_type {
222 WeightType::Normal(w) => {
223 if deque.deque.len() > 0 {
224 w.get() as usize
225 } else {
226 0
227 }
228 }
229 WeightType::Unit(w) => (w.get() as usize) * deque.deque.len(),
230 };
231 let lock_state = deque.lock_state;
232 let key = self.slot.insert(deque);
233 if Some(true) != lock_state && w > 0 {
234 self.sync_pool
236 .push_weight(w, key, &mut self.slot, set_index);
237 }
238 key
239 }
240 pub fn reset_deque_weight(&mut self, key: DequeKey, weight_type: WeightType) -> bool {
242 if let Some(it) = self.slot.get_mut(key) {
243 if it.weight_type == weight_type {
244 return true;
245 }
246 it.weight_type = weight_type;
247 if it.deque.len() == 0 {
249 return true;
250 }
251 if Some(true) == it.lock_state {
253 return true;
254 }
255 let w = match it.weight_type {
256 WeightType::Normal(w) => w.get() as usize,
257 WeightType::Unit(w) => (w.get() as usize) * it.deque.len(),
258 };
259 self.sync_pool
261 .modify_weight(it.weight_index, w, &mut self.slot, set_index);
262 return true;
263 }
264 false
265 }
266 pub fn get_deque(&self, key: DequeKey) -> Option<&Deque<T, D>> {
268 self.slot.get(key)
269 }
270 pub fn get_deque_mut(&mut self, key: DequeKey) -> Option<&mut Deque<T, D>> {
272 let mut r = self.slot.get_mut(key);
273 if let Some(it) = &mut r {
274 it.old_deque_len = it.deque.len();
275 }
276 r
277 }
278 pub fn repair_deque_state(&mut self, key: DequeKey, state: DequeState) {
280 if Some(true) == state.lock_state {
282 return;
283 }
284 if state.new_deque_len > 0 {
285 if state.old_deque_len > 0 {
286 if state.new_deque_len == state.old_deque_len {
287 return;
288 }
289 match state.weight_type {
290 WeightType::Unit(w) => {
291 let w = (w.get() as usize) * state.new_deque_len;
293 self.sync_pool.modify_weight(
294 state.weight_index,
295 w,
296 &mut self.slot,
297 set_index,
298 );
299 }
300 _ => (),
301 };
302 if state.new_deque_len > state.old_deque_len {
303 self.sync_add_count += state.new_deque_len - state.old_deque_len;
304 } else {
305 self.sync_remove_count += state.old_deque_len - state.new_deque_len;
306 }
307 } else {
308 let w = match state.weight_type {
309 WeightType::Normal(w) => w.get() as usize,
310 WeightType::Unit(w) => (w.get() as usize) * state.new_deque_len,
311 };
312 self.sync_pool
314 .push_weight(w, key, &mut self.slot, set_index);
315 self.sync_add_count += state.new_deque_len;
316 }
317 } else if state.old_deque_len > 0 {
318 self.sync_pool
320 .remove_index(state.weight_index, &mut self.slot, set_index);
321 self.sync_remove_count += state.old_deque_len;
322 }
323 }
324 pub fn deque_unlock(&mut self, key: DequeKey) -> bool {
326 if let Some(it) = self.slot.get_mut(key) {
327 if Some(true) == it.lock_state {
328 it.lock_state = Some(false);
330 if it.deque.len() > 0 {
331 let w = match it.weight_type {
333 WeightType::Normal(w) => w.get() as usize,
334 WeightType::Unit(w) => (w.get() as usize) * it.deque.len(),
335 };
336 self.sync_pool
337 .push_weight(w, key, &mut self.slot, set_index);
338 }
339 }
340 true
341 } else {
342 false
343 }
344 }
345
346 pub fn remove_deque(&mut self, key: DequeKey) -> bool {
348 if let Some(it) = self.slot.remove(key) {
349 if it.deque.len() == 0 {
351 return true;
352 }
353 self.sync_remove_count += it.deque.len();
354 if Some(true) == it.lock_state {
356 return true;
357 }
358 self.sync_pool
359 .remove_index(it.weight_index, &mut self.slot, set_index);
360 true
361 } else {
362 false
363 }
364 }
365 pub fn push_async(&mut self, task: T, weight: u32) {
367 self.async_pool
368 .push_weight(weight as usize, task, &mut (), empty);
369 self.async_add_count += 1;
370 }
371 pub fn get_timer(&self) -> &pi_timer::Timer<T, N0, N, L> {
373 &self.timer
374 }
375 pub fn get_cancel_timer(&self) -> &pi_cancel_timer::Timer<T, N0, N, L> {
377 &self.cancel_timer
378 }
379 pub fn get_timer_mut(&mut self) -> &mut pi_timer::Timer<T, N0, N, L> {
381 &mut self.timer
382 }
383 pub fn get_cancel_timer_mut(&mut self) -> &mut pi_cancel_timer::Timer<T, N0, N, L> {
385 &mut self.cancel_timer
386 }
387 #[deprecated]
389 pub fn get_timer_weight(&self) -> usize {
390 self.timer_weight
391 }
392 #[deprecated]
394 pub fn set_timer_weight(&mut self, weight: usize) {
395 self.timer_weight = weight;
396 }
397 pub fn pop(&mut self, now: u64) -> (Option<T>, DequeKey) {
401 let sync_w = if let Some(r) = self.sync_pool.peek() {
402 r.amount() as u64
403 } else {
404 0
405 };
406 let async_w = if let Some(r) = self.async_pool.peek() {
407 r.amount() as u64
408 } else {
409 0
410 };
411 let mut cancel_timer_w = if self.cancel_timer.is_ok(now) {
413 sync_w + async_w + 2
414 } else {
415 0
416 };
417 let timer_w = if self.timer.is_ok(now) {
418 if cancel_timer_w > 0 {
419 cancel_timer_w = cancel_timer_w >> 1;
420 cancel_timer_w
421 } else {
422 sync_w + async_w + 1
423 }
424 } else {
425 0
426 };
427 let amount = sync_w + async_w + timer_w + cancel_timer_w;
428 if amount == 0 {
429 return (None, DequeKey::null());
430 }
431 let mut w = self.rng.next_u64() % amount;
432 if w < cancel_timer_w {
433 return (self.cancel_timer.pop(now), DequeKey::null());
434 } else {
435 w -= cancel_timer_w;
436 }
437 if w < timer_w {
438 return (self.timer.pop(now), DequeKey::null());
439 } else {
440 w -= timer_w;
441 }
442 if w < async_w {
443 if let Some(r) = self.async_pool.pop(&mut (), empty) {
444 self.async_remove_count += 1;
445 return (Some(r.el), DequeKey::null());
446 }
447 } else {
448 w -= async_w;
449 }
450 let index = self.sync_pool.find_weight(w as usize);
452 let key = self.sync_pool.as_slice()[index].el;
453 let it = &mut self.slot[key];
454 let r = it.deque.pop_front();
456 self.sync_remove_count += 1;
457 if it.lock_state.is_some() {
458 it.lock_state = Some(true);
460 self.sync_pool
461 .remove_index(index, &mut self.slot, set_index);
462 } else if it.deque.is_empty() {
463 self.sync_pool
465 .remove_index(index, &mut self.slot, set_index);
466 } else if let WeightType::Unit(uw) = it.weight_type {
467 let ww = (uw.get() as usize) * it.deque.len();
469 self.sync_pool
470 .modify_weight(index, ww, &mut self.slot, set_index);
471 }
472 (r, key)
473 }
474
475 pub fn pop_ignore_timer(&mut self) -> (Option<T>, DequeKey) {
477 let sync_w = if let Some(r) = self.sync_pool.peek() {
478 r.amount() as u64
479 } else {
480 0
481 };
482 let async_w = if let Some(r) = self.async_pool.peek() {
483 r.amount() as u64
484 } else {
485 0
486 };
487 let amount = sync_w + async_w;
488 if amount == 0 {
489 return (None, DequeKey::null());
490 }
491 let mut w = self.rng.next_u64() % amount;
492 if w < async_w {
493 if let Some(r) = self.async_pool.pop(&mut (), empty) {
494 self.async_remove_count += 1;
495 return (Some(r.el), DequeKey::null());
496 }
497 } else {
498 w -= async_w;
499 }
500 let index = self.sync_pool.find_weight(w as usize);
502 let key = self.sync_pool.as_slice()[index].el;
503 let it = &mut self.slot[key];
504 let r = it.deque.pop_front();
506 self.sync_remove_count += 1;
507 if it.lock_state.is_some() {
508 it.lock_state = Some(true);
510 self.sync_pool
511 .remove_index(index, &mut self.slot, set_index);
512 } else if it.deque.is_empty() {
513 self.sync_pool
515 .remove_index(index, &mut self.slot, set_index);
516 } else if let WeightType::Unit(uw) = it.weight_type {
517 let ww = (uw.get() as usize) * it.deque.len();
519 self.sync_pool
520 .modify_weight(index, ww, &mut self.slot, set_index);
521 }
522 (r, key)
523 }
524 pub fn is_ok(&mut self, now: u64) -> bool {
526 self.sync_pool.len() > 0
527 || self.async_pool.len() > 0
528 || self.timer.is_ok(now)
529 || self.cancel_timer.is_ok(now)
530 }
531}
532
533fn set_index<T, D>(
534 slot: &mut SlotMap<DequeKey, Deque<T, D>>,
535 arr: &mut [WeightItem<DequeKey>],
536 loc: usize,
537) {
538 let i = &arr[loc];
539 unsafe {
540 slot.get_unchecked_mut(i.el).weight_index = loc;
541 }
542}
543
#[cfg(test)]
mod test_mod {
    use std::{
        thread,
        time::{Duration, Instant},
    };

    use crate::*;

    /// Builds a non-zero weight without `unsafe`; panics if `value` is zero.
    fn weight(value: u32) -> NonZeroU32 {
        NonZeroU32::new(value).expect("test weights must be non-zero")
    }

    /// Pushes a random mix of timer, cancel-timer, async and sync tasks during
    /// the first 99 iterations, then keeps popping (sleeping 1 ms per round)
    /// until the pool's add and remove counters agree.
    #[test]
    fn test() {
        let mut pool: TaskPool<(u64, u64), u64, 128, 16, 1> = Default::default();
        let arr = [
            pool.push_deque(Deque::new(WeightType::Unit(weight(1000)), 1)),
            pool.push_deque(Deque::new(WeightType::Unit(weight(200)), 2)),
            pool.push_deque(Deque::new(WeightType::Normal(weight(20000)), 3)),
        ];
        let mut rng = WyRng::seed_from_u64(22222);
        let start = Instant::now();
        for i in 1..100000 {
            let t = (rng.next_u32() % 16100) as u64;
            let now = Instant::now();
            let tt = now.duration_since(start).as_millis() as u64;
            if i < 100 {
                match t % 4 {
                    0 => {
                        println!("push: timeout:{} r:{:?}", t, (i, t));
                        pool.get_timer_mut().push(t as usize, (i, t));
                    }
                    1 => {
                        println!("push: cancel:{} r:{:?}", t, (i, t));
                        pool.get_cancel_timer_mut().push(t as usize, (i, t));
                    }
                    2 => {
                        println!("push: async:{} r:{:?}", t, (i, t));
                        pool.push_async((i, t), t as u32);
                        continue;
                    }
                    _ => {
                        println!("push: sync:{} r:{:?}", t, (i, t));
                        let k = arr[(t % 3) as usize];
                        let d = pool.get_deque_mut(k).unwrap();
                        d.deque.push_back((i, t));
                        let state = d.state();
                        pool.repair_deque_state(k, state);
                        continue;
                    }
                }
            }
            // Drain everything that is available at this point in time.
            while let (Some(it), dk) = pool.pop(tt) {
                println!("ppp:{:?}, now:{}, dk:{:?}", it, tt, dk);
            }
            if i > 100 && pool.add_count() == pool.remove_count() {
                println!("return: add_count:{:?}", pool.add_count());
                return;
            }
            thread::sleep(Duration::from_millis(1));
        }
    }
}