1use std::collections::HashSet;
2use std::collections::VecDeque;
3use std::fmt::Debug;
4use std::sync::atomic::{AtomicIsize, Ordering};
5use std::sync::Arc;
6
7use futures::channel::mpsc;
8use once_cell::sync::OnceCell;
9use parking_lot::RwLock;
10
11pub use builder::{Builder, SpawnDefaultExt, SpawnExt};
12pub use exec::{TaskExecQueue, TaskType};
13pub use local::LocalTaskExecQueue;
14pub use local::LocalTaskType;
15pub use local_builder::{LocalBuilder, LocalSender, LocalSpawnExt};
16pub use local_spawner::{LocalGroupSpawner, LocalSpawner, TryLocalGroupSpawner, TryLocalSpawner};
17pub use spawner::{GroupSpawner, Spawner, TryGroupSpawner, TrySpawner};
18
19mod builder;
20mod close;
21mod exec;
22mod flush;
23mod spawner;
24
25mod local;
26mod local_builder;
27mod local_spawner;
28
/// Signed counter backed by an atomic integer.
///
/// Cloning is cheap: every clone points at the same underlying atomic, so an
/// update made through one handle is visible through all of them.
#[derive(Debug, Clone)]
struct Counter(Arc<AtomicIsize>);

impl Counter {
    /// Creates a counter whose value starts at zero.
    #[inline]
    fn new() -> Self {
        let zero = AtomicIsize::new(0);
        Self(Arc::new(zero))
    }

    /// Adds one to the shared value.
    #[inline]
    fn inc(&self) {
        let _previous = self.0.fetch_add(1, Ordering::SeqCst);
    }

    /// Subtracts one from the shared value; the value may become negative.
    #[inline]
    fn dec(&self) {
        let _previous = self.0.fetch_sub(1, Ordering::SeqCst);
    }

    /// Returns the current value.
    #[inline]
    fn value(&self) -> isize {
        let Counter(cell) = self;
        cell.load(Ordering::SeqCst)
    }
}
53
54#[derive(Clone)]
55struct IndexSet(Arc<RwLock<HashSet<usize, ahash::RandomState>>>);
56
57impl IndexSet {
58 #[inline]
59 fn new() -> Self {
60 Self(Arc::new(RwLock::new(HashSet::default())))
61 }
62
63 #[inline]
64 #[allow(dead_code)]
65 fn len(&self) -> usize {
66 self.0.read().len()
67 }
68
69 #[inline]
70 fn is_empty(&self) -> bool {
71 self.0.read().is_empty()
72 }
73
74 #[inline]
75 fn insert(&self, v: usize) {
76 self.0.write().insert(v);
77 }
78
79 #[inline]
80 fn pop(&self) -> Option<usize> {
81 let mut set = self.0.write();
82 if let Some(idx) = set.iter().next().copied() {
83 set.remove(&idx);
84 Some(idx)
85 } else {
86 None
87 }
88 }
89}
90
/// FIFO backlog of tasks together with a flag recording whether the backlog
/// is currently marked as running.
struct GroupTaskExecQueue<TT> {
    tasks: VecDeque<TT>,
    is_running: bool,
}

impl<TT> GroupTaskExecQueue<TT> {
    /// Creates an empty backlog that is not marked as running.
    #[inline]
    fn new() -> Self {
        Self {
            tasks: VecDeque::new(),
            is_running: false,
        }
    }

    /// Appends `task` to the back of the backlog.
    #[inline]
    fn push(&mut self, task: TT) {
        self.tasks.push_back(task);
    }

    /// Takes the oldest task off the backlog.
    ///
    /// When the backlog is empty this returns `None` and, as a side effect,
    /// clears the running flag.
    #[inline]
    fn pop(&mut self) -> Option<TT> {
        let next = self.tasks.pop_front();
        if next.is_none() {
            self.is_running = false;
        }
        next
    }

    /// Sets the running flag to `b`.
    #[inline]
    fn set_running(&mut self, b: bool) {
        self.is_running = b;
    }

    /// Returns the current running flag.
    #[inline]
    fn is_running(&self) -> bool {
        self.is_running
    }
}
130
131#[derive(thiserror::Error, Debug)]
132pub enum Error<T> {
133 #[error("send error")]
134 SendError(ErrorType<T>),
135 #[error("try send error")]
136 TrySendError(ErrorType<T>),
137 #[error("send timeout error")]
138 SendTimeoutError(ErrorType<T>),
139 #[error("recv result error")]
140 RecvResultError,
141}
142
/// Reason attached to a send-related [`Error`].
///
/// Each variant optionally carries a value of type `T`; `None` means no value
/// is attached.
#[derive(Debug, PartialEq, Eq)]
pub enum ErrorType<T> {
    /// The queue was full.
    Full(Option<T>),
    /// The queue was closed.
    Closed(Option<T>),
    /// The operation timed out.
    Timeout(Option<T>),
}
149
150impl<T> Error<T> {
151 #[inline]
152 pub fn is_full(&self) -> bool {
153 matches!(
154 self,
155 Error::SendError(ErrorType::Full(_))
156 | Error::TrySendError(ErrorType::Full(_))
157 | Error::SendTimeoutError(ErrorType::Full(_))
158 )
159 }
160
161 #[inline]
162 pub fn is_closed(&self) -> bool {
163 matches!(
164 self,
165 Error::SendError(ErrorType::Closed(_))
166 | Error::TrySendError(ErrorType::Closed(_))
167 | Error::SendTimeoutError(ErrorType::Closed(_))
168 )
169 }
170
171 #[inline]
172 pub fn is_timeout(&self) -> bool {
173 matches!(
174 self,
175 Error::SendError(ErrorType::Timeout(_))
176 | Error::TrySendError(ErrorType::Timeout(_))
177 | Error::SendTimeoutError(ErrorType::Timeout(_))
178 )
179 }
180}
181
182impl<T> From<mpsc::TrySendError<T>> for Error<T> {
183 fn from(e: mpsc::TrySendError<T>) -> Self {
184 if e.is_full() {
185 Error::TrySendError(ErrorType::Full(Some(e.into_inner())))
186 } else {
187 Error::TrySendError(ErrorType::Closed(Some(e.into_inner())))
188 }
189 }
190}
191
192impl<T> From<mpsc::SendError> for Error<T> {
193 fn from(e: mpsc::SendError) -> Self {
194 if e.is_full() {
195 Error::SendError(ErrorType::Full(None))
196 } else {
197 Error::SendError(ErrorType::Closed(None))
198 }
199 }
200}
201
202pub(crate) fn assert_future<T, F>(future: F) -> F
205where
206 F: futures::Future<Output = T>,
207{
208 future
209}
210
211static DEFAULT_EXEC_QUEUE: OnceCell<TaskExecQueue> = OnceCell::new();
212
213pub fn set_default(queue: TaskExecQueue) -> Result<(), TaskExecQueue> {
214 DEFAULT_EXEC_QUEUE.set(queue)
215}
216
217pub fn init_default() -> impl futures::Future<Output = ()> {
218 let (queue, runner) = Builder::default().workers(100).queue_max(100_000).build();
219 DEFAULT_EXEC_QUEUE.set(queue).ok().unwrap();
220 runner
221}
222
223pub fn default() -> &'static TaskExecQueue {
224 DEFAULT_EXEC_QUEUE
225 .get()
226 .expect("default task execution queue must be set first")
227}
228
/// `IndexSet` tracks its length across inserts and pops, and `pop` only ever
/// returns a value that was inserted.
#[test]
fn test_index_set() {
    let indices = IndexSet::new();
    for &value in &[1, 10, 100] {
        indices.insert(value);
    }
    assert_eq!(indices.len(), 3);

    // Pop order is unspecified, so accept any of the inserted values.
    let first = indices.pop();
    assert!(matches!(first, Some(1) | Some(10) | Some(100)));
    assert_eq!(indices.len(), 2);

    indices.pop();
    indices.pop();
    assert_eq!(indices.len(), 0);
}
242
243#[cfg(test)]
247mod tests {
248 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
249 use std::sync::Arc;
250 use std::time::Duration;
251
252 use futures::Future;
253
254 use super::*;
255 use crate::builder::{Builder, SpawnExt};
256 use crate::local_builder::LocalBuilder;
257
/// The `is_full` / `is_closed` / `is_timeout` predicates follow the inner
/// `ErrorType`, whichever `Error` variant wraps it.
#[test]
fn test_error_type_variants() {
    // Full, wrapped by each send flavour.
    let full_try = Error::<i32>::TrySendError(ErrorType::Full(Some(42)));
    assert!(full_try.is_full());
    assert!(!full_try.is_closed());
    assert!(!full_try.is_timeout());
    assert!(matches!(
        full_try,
        Error::TrySendError(ErrorType::Full(Some(42)))
    ));

    let full_send = Error::<i32>::SendError(ErrorType::Full(Some(42)));
    assert!(full_send.is_full());
    assert!(matches!(
        full_send,
        Error::SendError(ErrorType::Full(Some(42)))
    ));

    let full_timeout = Error::<i32>::SendTimeoutError(ErrorType::Full(Some(42)));
    assert!(full_timeout.is_full());
    assert!(matches!(
        full_timeout,
        Error::SendTimeoutError(ErrorType::Full(Some(42)))
    ));

    // Closed, wrapped by each send flavour.
    let closed_try = Error::<i32>::TrySendError(ErrorType::Closed(Some(99)));
    assert!(closed_try.is_closed());
    assert!(!closed_try.is_full());

    let closed_send = Error::<i32>::SendError(ErrorType::Closed(Some(99)));
    assert!(closed_send.is_closed());

    let closed_timeout = Error::<i32>::SendTimeoutError(ErrorType::Closed(Some(99)));
    assert!(closed_timeout.is_closed());

    // Timeout.
    let timed_out = Error::<i32>::SendTimeoutError(ErrorType::Timeout(Some(77)));
    assert!(timed_out.is_timeout());
    assert!(!timed_out.is_full());
    assert!(!timed_out.is_closed());

    // The payload-free variant matches none of the predicates.
    let recv = Error::<i32>::RecvResultError;
    assert!(!recv.is_full());
    assert!(!recv.is_closed());
    assert!(!recv.is_timeout());
}
303
/// Both the `Debug` and the `Display` rendering of an error are non-empty.
#[test]
fn test_error_type_debug_and_display() {
    let payload = Some(String::from("hello"));
    let err = Error::<String>::TrySendError(ErrorType::Full(payload));

    let debug_text = format!("{:?}", err);
    assert!(!debug_text.is_empty());

    let display_text = format!("{}", err);
    assert!(!display_text.is_empty());
}
310
/// `ErrorType` equality takes both the variant and the payload into account.
#[test]
fn test_error_type_eq() {
    // Same variant, same payload.
    assert_eq!(
        ErrorType::<i32>::Full(Some(1)),
        ErrorType::<i32>::Full(Some(1))
    );

    // Same variant, different payload.
    let lhs = ErrorType::<i32>::Full(Some(1));
    let other_payload = ErrorType::<i32>::Full(Some(2));
    assert_ne!(lhs, other_payload);

    // Different variant, same payload.
    let other_variant = ErrorType::<i32>::Closed(Some(1));
    assert_ne!(lhs, other_variant);
}
326
/// The predicates also work when the `ErrorType` carries no payload.
#[test]
fn test_error_none_values() {
    let full = Error::<()>::SendError(ErrorType::Full(None));
    assert!(full.is_full());

    let closed = Error::<()>::SendError(ErrorType::Closed(None));
    assert!(closed.is_closed());

    let timed_out = Error::<()>::SendTimeoutError(ErrorType::Timeout(None));
    assert!(timed_out.is_timeout());
}
333
/// A queue built from an untouched `Builder` reports 100 workers.
#[test]
fn test_builder_build_default_queue_workers() {
    let (q, _keep_runner) = Builder::default().build();
    let workers = q.workers();
    assert_eq!(workers, 100);
}
342
/// The worker count passed to the builder is reported back by the queue.
#[test]
fn test_builder_custom_workers() {
    let builder = Builder::default().workers(42);
    let (q, _keep_runner) = builder.build();
    assert_eq!(q.workers(), 42);
}
348
/// A freshly built queue with a custom `queue_max` has nothing waiting and
/// does not report itself as full.
#[test]
fn test_builder_custom_queue_max_is_full_check() {
    let (queue, _runner) = Builder::default().queue_max(10).build();
    assert_eq!(queue.waiting_count(), 0);
    // The check the test name promises: an idle queue with room for ten
    // entries must not be full.
    assert!(!queue.is_full());
}
354
/// Setting both `workers` and `queue_max` still yields the requested worker count.
#[test]
fn test_builder_custom_both() {
    let builder = Builder::default().workers(8).queue_max(2048);
    let (queue, _keep_runner) = builder.build();
    assert_eq!(queue.workers(), 8);
}
360
/// Building a group queue keyed by `String` type-checks and does not panic.
#[test]
fn test_group_builder_build() {
    let group_builder = Builder::default().group();
    let (_queue, _runner): (TaskExecQueue<_, String, ()>, _) = group_builder.build::<String>();
}
366
/// A group queue built with default settings reports 100 workers.
#[test]
fn test_group_builder_default_workers() {
    let group_builder = Builder::default().group();
    let (q, _keep_runner): (TaskExecQueue<_, String, ()>, _) = group_builder.build::<String>();
    assert_eq!(q.workers(), 100);
}
373
/// A queue can be built on top of a caller-supplied channel.
#[test]
fn test_builder_with_channel() {
    let (sender, receiver) = futures::channel::mpsc::channel::<((), TaskType)>(10);
    let channel_builder = Builder::default().with_channel::<_, _, ()>(sender, receiver);
    let (_queue, _runner) = channel_builder.build();
}
379
/// A group queue can be built on top of a caller-supplied channel.
#[test]
fn test_builder_with_channel_group() {
    let (sender, receiver) = futures::channel::mpsc::channel::<((), TaskType)>(10);
    let group_builder = Builder::default()
        .with_channel::<_, _, ()>(sender, receiver)
        .group();
    let (_queue, _runner) = group_builder.build::<String>();
}
388
/// `Builder::build` type-checks against both the defaulted and the spelled-out
/// `TaskExecQueue` type parameters.
#[test]
fn test_builder_build_default_types() {
    // Relying on the default type parameters.
    let (_queue, _runner): (TaskExecQueue, _) = Builder::default().build();

    // Naming the last two type parameters explicitly.
    let (_queue, _runner): (TaskExecQueue<_, (), ()>, _) = Builder::default().build();
}
394
/// Workers, queue size, a custom channel and grouping can all be combined on one builder.
#[test]
fn test_group_channel_builder_combined() {
    let (sender, receiver) = futures::channel::mpsc::channel::<((), TaskType)>(50);
    let sized = Builder::default().workers(3).queue_max(50);
    let grouped = sized.with_channel::<_, _, ()>(sender, receiver).group();
    let (_queue, _runner) = grouped.build::<String>();
}
405
/// A freshly built queue that has never run reports an idle, open, empty state.
#[test]
fn test_queue_state_initial() {
    let (q, _keep_runner) = Builder::default().workers(4).queue_max(1000).build();

    // Configuration.
    assert_eq!(q.workers(), 4);

    // Counters.
    assert_eq!(q.active_count(), 0);
    assert_eq!(q.waiting_count(), 0);

    // Flags.
    assert!(!q.is_active());
    assert!(!q.is_closed());
    assert!(!q.is_full());
    assert!(!q.is_flushing());

    // Wakers.
    assert_eq!(q.pending_wakers_count(), 0);
    assert_eq!(q.waiting_wakers_count(), 0);
}
422
/// Even with `queue_max(1)` a fresh queue is not full.
#[test]
fn test_queue_state_is_full_initially_false() {
    let (q, _keep_runner) = Builder::default().queue_max(1).build();
    let full = q.is_full();
    assert!(!full);
}
428
/// A fresh queue is not closed.
#[test]
fn test_queue_state_is_closed_initially_false() {
    let (q, _keep_runner) = Builder::default().build();
    let closed = q.is_closed();
    assert!(!closed);
}
434
435 #[tokio::test]
439 async fn test_spawn_task_executes() {
440 let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
441 tokio::spawn(runner);
442
443 let flag = Arc::new(AtomicBool::new(false));
444 let f = flag.clone();
445 let result = queue
446 .spawn(async move { f.store(true, Ordering::SeqCst) })
447 .await;
448 assert!(result.is_ok());
449 tokio::time::sleep(Duration::from_millis(100)).await;
450 assert!(flag.load(Ordering::SeqCst));
451 }
452
453 #[tokio::test]
454 async fn test_try_spawn_task_executes() {
455 let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
456 tokio::spawn(runner);
457
458 let flag = Arc::new(AtomicBool::new(false));
459 let f = flag.clone();
460 let result = queue
461 .try_spawn(async move { f.store(true, Ordering::SeqCst) })
462 .await;
463 assert!(result.is_ok());
464 tokio::time::sleep(Duration::from_millis(100)).await;
465 assert!(flag.load(Ordering::SeqCst));
466 }
467
468 #[tokio::test]
469 async fn test_spawn_multiple_tasks() {
470 let (queue, runner) = Builder::default().workers(5).queue_max(100).build();
471 tokio::spawn(runner);
472
473 let counter = Arc::new(AtomicUsize::new(0));
474 for _ in 0..10 {
475 let c = counter.clone();
476 let result = queue
477 .spawn(async move {
478 c.fetch_add(1, Ordering::SeqCst);
479 })
480 .await;
481 assert!(result.is_ok());
482 }
483 tokio::time::sleep(Duration::from_millis(200)).await;
484 assert_eq!(counter.load(Ordering::SeqCst), 10);
485 }
486
487 #[tokio::test]
488 async fn test_spawn_with_name() {
489 let (tx, rx) = futures::channel::mpsc::channel::<(&'static str, TaskType)>(100);
491 let (queue, runner) = Builder::default()
492 .workers(2)
493 .queue_max(100)
494 .with_channel::<_, _, &'static str>(tx, rx)
495 .build();
496 tokio::spawn(runner);
497
498 let flag = Arc::new(AtomicBool::new(false));
499 let f = flag.clone();
500 let result = queue
501 .spawn_with(async move { f.store(true, Ordering::SeqCst) }, "named")
502 .await;
503 assert!(result.is_ok());
504 tokio::time::sleep(Duration::from_millis(100)).await;
505 assert!(flag.load(Ordering::SeqCst));
506 }
507
508 #[tokio::test]
509 async fn test_try_spawn_with_name() {
510 let (tx, rx) = futures::channel::mpsc::channel::<(&'static str, TaskType)>(100);
511 let (queue, runner) = Builder::default()
512 .workers(2)
513 .queue_max(100)
514 .with_channel::<_, _, &'static str>(tx, rx)
515 .build();
516 tokio::spawn(runner);
517
518 let flag = Arc::new(AtomicBool::new(false));
519 let f = flag.clone();
520 let result = queue
521 .try_spawn_with(async move { f.store(true, Ordering::SeqCst) }, "try_named")
522 .await;
523 assert!(result.is_ok());
524 tokio::time::sleep(Duration::from_millis(100)).await;
525 assert!(flag.load(Ordering::SeqCst));
526 }
527
528 #[tokio::test]
529 async fn test_spawn_result_returns_ok() {
530 let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
531 tokio::spawn(runner);
532 let result = queue.spawn(async {}).await;
533 assert!(result.is_ok());
534 }
535
536 #[tokio::test]
540 async fn test_group_same_key_sequential_order() {
541 let (queue, runner): (TaskExecQueue<_, String, ()>, _) = Builder::default()
542 .workers(2)
543 .queue_max(100)
544 .group()
545 .build::<String>();
546 tokio::spawn(runner);
547
548 let results = Arc::new(parking_lot::Mutex::new(Vec::new()));
549 let r = results.clone();
550 let result = queue
551 .spawn(async move { r.lock().push(1) })
552 .group("group_a".to_string())
553 .await;
554 assert!(result.is_ok());
555
556 let r = results.clone();
557 let result = queue
558 .spawn(async move { r.lock().push(2) })
559 .group("group_a".to_string())
560 .await;
561 assert!(result.is_ok());
562
563 tokio::time::sleep(Duration::from_millis(200)).await;
564 assert_eq!(*results.lock(), vec![1, 2]);
565 }
566
567 #[tokio::test]
568 async fn test_group_different_keys_concurrent() {
569 let (queue, runner): (TaskExecQueue<_, String, ()>, _) = Builder::default()
570 .workers(5)
571 .queue_max(100)
572 .group()
573 .build::<String>();
574 tokio::spawn(runner);
575
576 let results = Arc::new(parking_lot::Mutex::new(Vec::new()));
577
578 let r = results.clone();
579 let result = queue
580 .spawn(async move { r.lock().push("a1") })
581 .group("grp_a".to_string())
582 .await;
583 assert!(result.is_ok());
584
585 let r = results.clone();
586 let result = queue
587 .spawn(async move { r.lock().push("b1") })
588 .group("grp_b".to_string())
589 .await;
590 assert!(result.is_ok());
591
592 tokio::time::sleep(Duration::from_millis(200)).await;
593 let vec = results.lock();
594 assert_eq!(vec.len(), 2);
595 assert!(vec.contains(&"a1"));
596 assert!(vec.contains(&"b1"));
597 }
598
599 #[tokio::test]
600 async fn test_try_group_spawner() {
601 let (queue, runner): (TaskExecQueue<_, String, ()>, _) = Builder::default()
602 .workers(2)
603 .queue_max(100)
604 .group()
605 .build::<String>();
606 tokio::spawn(runner);
607
608 let flag = Arc::new(AtomicBool::new(false));
609 let f = flag.clone();
610 let result = queue
611 .try_spawn(async move { f.store(true, Ordering::SeqCst) })
612 .group("grp_x".to_string())
613 .await;
614 assert!(result.is_ok());
615 tokio::time::sleep(Duration::from_millis(100)).await;
616 assert!(flag.load(Ordering::SeqCst));
617 }
618
619 #[tokio::test]
620 async fn test_group_spawner_consecutive() {
621 let (queue, runner): (TaskExecQueue<_, String, ()>, _) = Builder::default()
622 .workers(2)
623 .queue_max(100)
624 .group()
625 .build::<String>();
626 tokio::spawn(runner);
627
628 let counter = Arc::new(AtomicUsize::new(0));
629 for _ in 0..3 {
630 let c = counter.clone();
631 let result = queue
632 .spawn(async move {
633 c.fetch_add(1, Ordering::SeqCst);
634 tokio::time::sleep(Duration::from_millis(50)).await;
635 })
636 .group("seq".to_string())
637 .await;
638 assert!(result.is_ok());
639 }
640 tokio::time::sleep(Duration::from_millis(500)).await;
641 assert_eq!(counter.load(Ordering::SeqCst), 3);
642 }
643
644 #[tokio::test]
648 async fn test_close_drains_and_closes() {
649 let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
650 tokio::spawn(runner);
651
652 let counter = Arc::new(AtomicUsize::new(0));
653 let c = counter.clone();
654 let _ = queue
655 .spawn(async move {
656 tokio::time::sleep(Duration::from_millis(50)).await;
657 c.fetch_add(1, Ordering::SeqCst);
658 })
659 .await;
660
661 let result = queue.close().await;
662 assert!(result.is_ok());
663 assert!(queue.is_closed());
664 tokio::time::sleep(Duration::from_millis(50)).await;
665 assert_eq!(counter.load(Ordering::SeqCst), 1);
666 }
667
668 #[tokio::test]
669 async fn test_flush_waiting_tasks() {
670 let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
671 tokio::spawn(runner);
672
673 let counter = Arc::new(AtomicUsize::new(0));
674 for _ in 0..3 {
675 let c = counter.clone();
676 let _ = queue
677 .spawn(async move {
678 tokio::time::sleep(Duration::from_millis(30)).await;
679 c.fetch_add(1, Ordering::SeqCst);
680 })
681 .await;
682 }
683
684 let result = queue.flush().await;
685 assert!(result.is_ok());
686 assert_eq!(counter.load(Ordering::SeqCst), 3);
687 }
688
689 #[tokio::test]
690 async fn test_spawn_after_close_returns_error() {
691 let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
692 tokio::spawn(runner);
693 let _ = queue.close().await;
694
695 let result = queue.spawn(async {}).await;
696 assert!(result.is_err());
697 }
698
699 #[tokio::test]
700 async fn test_try_spawn_after_close_fails() {
701 let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
702 tokio::spawn(runner);
703 let _ = queue.close().await;
704
705 let result = queue.try_spawn(async {}).await;
706 assert!(result.is_err());
707 }
708
709 #[tokio::test]
710 async fn test_close_before_start() {
711 let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
712 let runner_handle = tokio::spawn(runner);
713
714 let result = queue.close().await;
715 assert!(result.is_ok());
716 assert!(queue.is_closed());
717 let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
718 }
719
720 fn run_local<F, T>(f: F) -> T
724 where
725 F: Future<Output = T>,
726 {
727 let rt = tokio::runtime::Builder::new_current_thread()
728 .enable_time()
729 .build()
730 .unwrap();
731 let local = tokio::task::LocalSet::new();
732 rt.block_on(local.run_until(f))
733 }
734
/// A task submitted to a local queue with `spawn` is accepted and runs.
#[test]
fn test_local_spawn_task_executes() {
    run_local(async {
        let (queue, runner) = LocalBuilder::default().workers(2).queue_max(100).build();
        tokio::task::spawn_local(runner);

        let executed = Arc::new(AtomicBool::new(false));
        let task = {
            let executed = Arc::clone(&executed);
            async move { executed.store(true, Ordering::SeqCst) }
        };
        let submitted = queue.spawn(task).await;
        assert!(submitted.is_ok());

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(executed.load(Ordering::SeqCst));
    });
}
751
/// A task submitted to a local queue with `try_spawn` is accepted and runs.
#[test]
fn test_local_try_spawn() {
    run_local(async {
        let (queue, runner) = LocalBuilder::default().workers(2).queue_max(100).build();
        tokio::task::spawn_local(runner);

        let executed = Arc::new(AtomicBool::new(false));
        let task = {
            let executed = Arc::clone(&executed);
            async move { executed.store(true, Ordering::SeqCst) }
        };
        let submitted = queue.try_spawn(task).await;
        assert!(submitted.is_ok());

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(executed.load(Ordering::SeqCst));
    });
}
768
/// On a local group queue, two tasks under the same key run in submission order.
#[test]
fn test_local_group_spawner() {
    run_local(async {
        let (queue, runner): (LocalTaskExecQueue<_, String, ()>, _) = LocalBuilder::default()
            .workers(2)
            .queue_max(100)
            .group()
            .build::<String>();
        tokio::task::spawn_local(runner);

        let order = Arc::new(parking_lot::Mutex::new(Vec::new()));
        for step in 1..=2 {
            let sink = Arc::clone(&order);
            let submitted = queue
                .spawn(async move { sink.lock().push(step) })
                .group(String::from("g"))
                .await;
            assert!(submitted.is_ok());
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(*order.lock(), vec![1, 2]);
    });
}
798
/// A freshly built local queue reports its configured workers and an idle, open state.
#[test]
fn test_local_queue_state() {
    let (q, _keep_runner) = LocalBuilder::default().workers(3).queue_max(500).build();

    assert_eq!(q.workers(), 3);

    assert!(!q.is_closed());
    assert!(!q.is_full());

    assert_eq!(q.active_count(), 0);
    assert_eq!(q.waiting_count(), 0);
}
808
/// Closing a local queue succeeds, marks it closed, and the task submitted
/// beforehand has run by the time the final check is made.
#[test]
fn test_local_close() {
    run_local(async {
        let (queue, runner) = LocalBuilder::default().workers(2).queue_max(100).build();
        tokio::task::spawn_local(runner);

        let hits = Arc::new(AtomicUsize::new(0));
        let task = {
            let hits = Arc::clone(&hits);
            async move { hits.fetch_add(1, Ordering::SeqCst) }
        };
        let _ = queue.spawn(task).await;

        let closed = queue.close().await;
        assert!(closed.is_ok());
        assert!(queue.is_closed());

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(hits.load(Ordering::SeqCst), 1);
    });
}
828
/// Once `flush` returns on a local queue, every submitted task has finished.
#[test]
fn test_local_flush() {
    run_local(async {
        const TASKS: usize = 3;

        let (queue, runner) = LocalBuilder::default().workers(2).queue_max(100).build();
        tokio::task::spawn_local(runner);

        let hits = Arc::new(AtomicUsize::new(0));
        for _ in 0..TASKS {
            let hits = Arc::clone(&hits);
            let task = async move { hits.fetch_add(1, Ordering::SeqCst) };
            let _ = queue.spawn(task).await;
        }

        let flushed = queue.flush().await;
        assert!(flushed.is_ok());
        assert_eq!(hits.load(Ordering::SeqCst), TASKS);
    });
}
848
/// `spawn_with` on a local queue accepts a task together with a `&'static str` name and runs it.
#[test]
fn test_local_spawn_with_name() {
    run_local(async {
        let (sender, receiver) =
            futures::channel::mpsc::channel::<(&'static str, LocalTaskType)>(100);
        let builder = LocalBuilder::default()
            .workers(2)
            .queue_max(100)
            .with_channel::<_, _, &'static str>(sender, receiver);
        let (queue, runner) = builder.build();
        tokio::task::spawn_local(runner);

        let executed = Arc::new(AtomicBool::new(false));
        let task = {
            let executed = Arc::clone(&executed);
            async move { executed.store(true, Ordering::SeqCst) }
        };
        let submitted = queue.spawn_with(task, "local_named").await;
        assert!(submitted.is_ok());

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(executed.load(Ordering::SeqCst));
    });
}
873
/// `try_spawn(..).group(..)` on a local group queue accepts a task and runs it.
#[test]
fn test_local_try_group_spawner() {
    run_local(async {
        let (queue, runner): (LocalTaskExecQueue<_, String, ()>, _) = LocalBuilder::default()
            .workers(2)
            .queue_max(100)
            .group()
            .build::<String>();
        tokio::task::spawn_local(runner);

        let executed = Arc::new(AtomicBool::new(false));
        let task = {
            let executed = Arc::clone(&executed);
            async move { executed.store(true, Ordering::SeqCst) }
        };
        let submitted = queue.try_spawn(task).group(String::from("g")).await;
        assert!(submitted.is_ok());

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(executed.load(Ordering::SeqCst));
    });
}
895
896 #[tokio::test]
900 async fn test_spawn_default_with_manual_set() {
901 let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
902 tokio::spawn(runner);
903
904 if set_default(queue).is_err() {
906 return;
908 }
909
910 let flag = Arc::new(AtomicBool::new(false));
911 let f = flag.clone();
912 let result = default()
913 .spawn(async move { f.store(true, Ordering::SeqCst) })
914 .await;
915 assert!(result.is_ok());
916 tokio::time::sleep(Duration::from_millis(100)).await;
917 assert!(flag.load(Ordering::SeqCst));
918 }
919
920 #[tokio::test]
921 async fn test_default_already_set() {
922 let (q1, r1) = Builder::default().workers(1).queue_max(10).build();
923 let (q2, _r2) = Builder::default().workers(1).queue_max(10).build();
924 tokio::spawn(r1);
925
926 assert!(set_default(q1).is_ok());
927 assert!(set_default(q2).is_err());
928 }
929
930 #[tokio::test]
934 async fn test_spawn_ext_trait() {
935 let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
936 tokio::spawn(runner);
937
938 let flag = Arc::new(AtomicBool::new(false));
939 let f = flag.clone();
940 let result = SpawnExt::spawn(async move { f.store(true, Ordering::SeqCst) }, &queue).await;
941 assert!(result.is_ok());
942 tokio::time::sleep(Duration::from_millis(100)).await;
943 assert!(flag.load(Ordering::SeqCst));
944 }
945
946 #[tokio::test]
947 async fn test_spawn_ext_with_name() {
948 let (tx, rx) = futures::channel::mpsc::channel::<(&'static str, TaskType)>(100);
949 let (queue, runner) = Builder::default()
950 .workers(2)
951 .queue_max(100)
952 .with_channel::<_, _, &'static str>(tx, rx)
953 .build();
954 tokio::spawn(runner);
955
956 let flag = Arc::new(AtomicBool::new(false));
957 let f = flag.clone();
958 let result = SpawnExt::spawn_with(
959 async move { f.store(true, Ordering::SeqCst) },
960 &queue,
961 "named",
962 )
963 .await;
964 assert!(result.is_ok());
965 tokio::time::sleep(Duration::from_millis(100)).await;
966 assert!(flag.load(Ordering::SeqCst));
967 }
968
/// `Counter` starts at zero and follows each `inc` / `dec` step exactly.
#[test]
fn test_counter_inc_dec_value() {
    let counter = Counter::new();
    assert_eq!(counter.value(), 0);

    // Count up: 1, 2.
    for expected in 1..=2 {
        counter.inc();
        assert_eq!(counter.value(), expected);
    }

    // Count back down: 1, 0.
    for expected in (0..=1).rev() {
        counter.dec();
        assert_eq!(counter.value(), expected);
    }
}
985
/// With the `rate` feature enabled, `completed_count` can be queried after
/// tasks have run.
// NOTE(review): the assertion only checks that the value is non-negative; the
// exact count expected after five tasks is not pinned down here.
#[cfg(feature = "rate")]
#[tokio::test]
async fn test_rate_completed_count() {
    let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
    tokio::spawn(runner);

    for _ in 0..5 {
        let noop = async {};
        let _ = queue.spawn(noop).await;
    }

    tokio::time::sleep(Duration::from_millis(200)).await;
    let completed = queue.completed_count().await;
    assert!(completed >= 0);
}
1002
/// With the `rate` feature enabled, `rate` reports a non-negative value after
/// tasks have run.
#[cfg(feature = "rate")]
#[tokio::test]
async fn test_rate_method() {
    let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
    tokio::spawn(runner);

    for _ in 0..10 {
        let noop = async {};
        let _ = queue.spawn(noop).await;
    }

    tokio::time::sleep(Duration::from_millis(300)).await;
    let observed_rate = queue.rate().await;
    assert!(observed_rate >= 0.0);
}
1016
/// A local queue built from an untouched `LocalBuilder` reports 100 workers.
#[test]
fn test_local_builder_default_workers() {
    let (q, _keep_runner) = LocalBuilder::default().build();
    let workers = q.workers();
    assert_eq!(workers, 100);
}
1025
/// The worker count passed to `LocalBuilder` is reported back by the queue.
#[test]
fn test_local_builder_custom_workers() {
    let builder = LocalBuilder::default().workers(4).queue_max(500);
    let (q, _keep_runner) = builder.build();
    assert_eq!(q.workers(), 4);
}
1031
/// Building a local group queue keyed by `String` type-checks and does not panic.
#[test]
fn test_local_group_builder() {
    let group_builder = LocalBuilder::default().workers(2).group();
    let (_queue, _runner): (LocalTaskExecQueue<_, String, ()>, _) =
        group_builder.build::<String>();
}
1037
/// A local queue can be built on top of a caller-supplied channel.
#[test]
fn test_local_channel_builder() {
    let (sender, receiver) = futures::channel::mpsc::channel::<((), LocalTaskType)>(10);
    let channel_builder = LocalBuilder::default().with_channel::<_, _, ()>(sender, receiver);
    let (_queue, _runner) = channel_builder.build();
}
1045
/// A local group queue can be built on top of a caller-supplied channel.
#[test]
fn test_local_channel_builder_group() {
    let (sender, receiver) = futures::channel::mpsc::channel::<((), LocalTaskType)>(10);
    let group_builder = LocalBuilder::default()
        .with_channel::<_, _, ()>(sender, receiver)
        .group();
    let (_queue, _runner): (LocalTaskExecQueue<_, String, ()>, _) =
        group_builder.build::<String>();
}
1054}