Skip to main content

task_exec_queue/
lib.rs

1use std::collections::HashSet;
2use std::collections::VecDeque;
3use std::fmt::Debug;
4use std::sync::atomic::{AtomicIsize, Ordering};
5use std::sync::Arc;
6
7use futures::channel::mpsc;
8use once_cell::sync::OnceCell;
9use parking_lot::RwLock;
10
11pub use builder::{Builder, SpawnDefaultExt, SpawnExt};
12pub use exec::{TaskExecQueue, TaskType};
13pub use local::LocalTaskExecQueue;
14pub use local::LocalTaskType;
15pub use local_builder::{LocalBuilder, LocalSender, LocalSpawnExt};
16pub use local_spawner::{LocalGroupSpawner, LocalSpawner, TryLocalGroupSpawner, TryLocalSpawner};
17pub use spawner::{GroupSpawner, Spawner, TryGroupSpawner, TrySpawner};
18
19mod builder;
20mod close;
21mod exec;
22mod flush;
23mod spawner;
24
25mod local;
26mod local_builder;
27mod local_spawner;
28
/// Signed counter backed by an atomic integer.
///
/// Cloning yields another handle to the same underlying value, because the
/// atomic lives behind an `Arc`. All operations use `Ordering::SeqCst`.
#[derive(Clone, Debug)]
struct Counter(Arc<AtomicIsize>);

impl Counter {
    /// Creates a counter whose value is `0`.
    #[inline]
    fn new() -> Self {
        let start = AtomicIsize::new(0);
        Self(Arc::new(start))
    }

    /// Adds one to the counter.
    #[inline]
    fn inc(&self) {
        let Self(cell) = self;
        cell.fetch_add(1, Ordering::SeqCst);
    }

    /// Subtracts one from the counter; the value may become negative.
    #[inline]
    fn dec(&self) {
        let Self(cell) = self;
        cell.fetch_sub(1, Ordering::SeqCst);
    }

    /// Returns the current value.
    #[inline]
    fn value(&self) -> isize {
        let Self(cell) = self;
        cell.load(Ordering::SeqCst)
    }
}
53
54#[derive(Clone)]
55struct IndexSet(Arc<RwLock<HashSet<usize, ahash::RandomState>>>);
56
57impl IndexSet {
58    #[inline]
59    fn new() -> Self {
60        Self(Arc::new(RwLock::new(HashSet::default())))
61    }
62
63    #[inline]
64    #[allow(dead_code)]
65    fn len(&self) -> usize {
66        self.0.read().len()
67    }
68
69    #[inline]
70    fn is_empty(&self) -> bool {
71        self.0.read().is_empty()
72    }
73
74    #[inline]
75    fn insert(&self, v: usize) {
76        self.0.write().insert(v);
77    }
78
79    #[inline]
80    fn pop(&self) -> Option<usize> {
81        let mut set = self.0.write();
82        if let Some(idx) = set.iter().next().copied() {
83            set.remove(&idx);
84            Some(idx)
85        } else {
86            None
87        }
88    }
89}
90
/// FIFO buffer of tasks paired with a flag recording whether it is running.
struct GroupTaskExecQueue<TT> {
    tasks: VecDeque<TT>,
    is_running: bool,
}

impl<TT> GroupTaskExecQueue<TT> {
    /// Creates an empty buffer with the running flag cleared.
    #[inline]
    fn new() -> Self {
        let tasks = VecDeque::default();
        Self {
            tasks,
            is_running: false,
        }
    }

    /// Appends `task` at the back of the buffer.
    #[inline]
    fn push(&mut self, task: TT) {
        self.tasks.push_back(task);
    }

    /// Takes the task at the front of the buffer.
    ///
    /// When the buffer is empty this returns `None` and also clears the
    /// running flag.
    #[inline]
    fn pop(&mut self) -> Option<TT> {
        let next = self.tasks.pop_front();
        if next.is_none() {
            self.set_running(false);
        }
        next
    }

    /// Sets the running flag to `b`.
    #[inline]
    fn set_running(&mut self, b: bool) {
        self.is_running = b;
    }

    /// Returns the running flag.
    #[inline]
    fn is_running(&self) -> bool {
        self.is_running
    }
}
130
/// Error values exposed by this module.
///
/// The three send-related variants wrap an [`ErrorType`] that records why the
/// operation failed and may hand a `T` back to the caller. Use
/// [`Error::is_full`], [`Error::is_closed`] and [`Error::is_timeout`] to
/// classify an error without matching on the outer variant.
#[derive(thiserror::Error, Debug)]
pub enum Error<T> {
    /// Built by the `From<mpsc::SendError>` conversion in this file.
    #[error("send error")]
    SendError(ErrorType<T>),
    /// Built by the `From<mpsc::TrySendError<T>>` conversion in this file.
    #[error("try send error")]
    TrySendError(ErrorType<T>),
    /// NOTE(review): not constructed in the visible part of this file;
    /// presumably reported by a send that has a deadline — confirm.
    #[error("send timeout error")]
    SendTimeoutError(ErrorType<T>),
    /// Carries no payload. NOTE(review): presumably reported when a task's
    /// result could not be received — confirm against the spawner modules.
    #[error("recv result error")]
    RecvResultError,
}
142
/// Reason attached to the send-related [`Error`] variants.
///
/// Each variant holds an optional `T`. The `From` conversions in this file
/// store the value recovered from `mpsc::TrySendError` as `Some(..)` and use
/// `None` for `mpsc::SendError`.
#[derive(Debug, Eq, PartialEq)]
pub enum ErrorType<T> {
    /// Reported as "full" by [`Error::is_full`].
    Full(Option<T>),
    /// Reported as "closed" by [`Error::is_closed`].
    Closed(Option<T>),
    /// Reported as "timeout" by [`Error::is_timeout`].
    Timeout(Option<T>),
}
149
150impl<T> Error<T> {
151    #[inline]
152    pub fn is_full(&self) -> bool {
153        matches!(
154            self,
155            Error::SendError(ErrorType::Full(_))
156                | Error::TrySendError(ErrorType::Full(_))
157                | Error::SendTimeoutError(ErrorType::Full(_))
158        )
159    }
160
161    #[inline]
162    pub fn is_closed(&self) -> bool {
163        matches!(
164            self,
165            Error::SendError(ErrorType::Closed(_))
166                | Error::TrySendError(ErrorType::Closed(_))
167                | Error::SendTimeoutError(ErrorType::Closed(_))
168        )
169    }
170
171    #[inline]
172    pub fn is_timeout(&self) -> bool {
173        matches!(
174            self,
175            Error::SendError(ErrorType::Timeout(_))
176                | Error::TrySendError(ErrorType::Timeout(_))
177                | Error::SendTimeoutError(ErrorType::Timeout(_))
178        )
179    }
180}
181
182impl<T> From<mpsc::TrySendError<T>> for Error<T> {
183    fn from(e: mpsc::TrySendError<T>) -> Self {
184        if e.is_full() {
185            Error::TrySendError(ErrorType::Full(Some(e.into_inner())))
186        } else {
187            Error::TrySendError(ErrorType::Closed(Some(e.into_inner())))
188        }
189    }
190}
191
192impl<T> From<mpsc::SendError> for Error<T> {
193    fn from(e: mpsc::SendError) -> Self {
194        if e.is_full() {
195            Error::SendError(ErrorType::Full(None))
196        } else {
197            Error::SendError(ErrorType::Closed(None))
198        }
199    }
200}
201
202// Just a helper function to ensure the futures we're returning all have the
203// right implementations.
204pub(crate) fn assert_future<T, F>(future: F) -> F
205where
206    F: futures::Future<Output = T>,
207{
208    future
209}
210
// Process-wide default queue. It starts empty, is written through
// `OnceCell::set` by `set_default` / `init_default`, and is read by `default`.
static DEFAULT_EXEC_QUEUE: OnceCell<TaskExecQueue> = OnceCell::new();
212
/// Installs `queue` as the process-wide default queue.
///
/// # Errors
/// Forwards the result of `OnceCell::set`: if a default has already been
/// installed, `queue` is handed back in `Err` and the existing default is
/// kept.
pub fn set_default(queue: TaskExecQueue) -> Result<(), TaskExecQueue> {
    DEFAULT_EXEC_QUEUE.set(queue)
}
216
217pub fn init_default() -> impl futures::Future<Output = ()> {
218    let (queue, runner) = Builder::default().workers(100).queue_max(100_000).build();
219    DEFAULT_EXEC_QUEUE.set(queue).ok().unwrap();
220    runner
221}
222
223pub fn default() -> &'static TaskExecQueue {
224    DEFAULT_EXEC_QUEUE
225        .get()
226        .expect("default task execution queue must be set first")
227}
228
/// `IndexSet` bookkeeping: three inserts give `len() == 3`, and each `pop`
/// removes exactly one of the inserted indices until the set is empty.
#[test]
fn test_index_set() {
    let indices = IndexSet::new();
    [1, 10, 100].iter().copied().for_each(|v| indices.insert(v));
    assert_eq!(indices.len(), 3);

    // Pop order is not defined, so any of the inserted values is accepted.
    let first = indices.pop();
    assert!(matches!(first, Some(1) | Some(10) | Some(100)));
    assert_eq!(indices.len(), 2);

    let _ = indices.pop();
    let _ = indices.pop();
    assert_eq!(indices.len(), 0);
}
242
243// ---------------------------------------------------------------------------
244// Tests
245// ---------------------------------------------------------------------------
246#[cfg(test)]
247mod tests {
248    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
249    use std::sync::Arc;
250    use std::time::Duration;
251
252    use futures::Future;
253
254    use super::*;
255    use crate::builder::{Builder, SpawnExt};
256    use crate::local_builder::LocalBuilder;
257
258    // ------------------------------------------------------------------
259    // 1. Error type tests
260    // ------------------------------------------------------------------
261    #[test]
262    fn test_error_type_variants() {
263        let err = Error::<i32>::TrySendError(ErrorType::Full(Some(42)));
264        assert!(err.is_full());
265        assert!(!err.is_closed());
266        assert!(!err.is_timeout());
267        assert!(matches!(
268            err,
269            Error::TrySendError(ErrorType::Full(Some(42)))
270        ));
271
272        let err = Error::<i32>::SendError(ErrorType::Full(Some(42)));
273        assert!(err.is_full());
274        assert!(matches!(err, Error::SendError(ErrorType::Full(Some(42)))));
275
276        let err = Error::<i32>::SendTimeoutError(ErrorType::Full(Some(42)));
277        assert!(err.is_full());
278        assert!(matches!(
279            err,
280            Error::SendTimeoutError(ErrorType::Full(Some(42)))
281        ));
282
283        let err = Error::<i32>::TrySendError(ErrorType::Closed(Some(99)));
284        assert!(err.is_closed());
285        assert!(!err.is_full());
286
287        let err = Error::<i32>::SendError(ErrorType::Closed(Some(99)));
288        assert!(err.is_closed());
289
290        let err = Error::<i32>::SendTimeoutError(ErrorType::Closed(Some(99)));
291        assert!(err.is_closed());
292
293        let err = Error::<i32>::SendTimeoutError(ErrorType::Timeout(Some(77)));
294        assert!(err.is_timeout());
295        assert!(!err.is_full());
296        assert!(!err.is_closed());
297
298        let err = Error::<i32>::RecvResultError;
299        assert!(!err.is_full());
300        assert!(!err.is_closed());
301        assert!(!err.is_timeout());
302    }
303
304    #[test]
305    fn test_error_type_debug_and_display() {
306        let err = Error::<String>::TrySendError(ErrorType::Full(Some("hello".into())));
307        assert!(!format!("{:?}", err).is_empty());
308        assert!(!format!("{}", err).is_empty());
309    }
310
311    #[test]
312    fn test_error_type_eq() {
313        assert_eq!(
314            ErrorType::<i32>::Full(Some(1)),
315            ErrorType::<i32>::Full(Some(1))
316        );
317        assert_ne!(
318            ErrorType::<i32>::Full(Some(1)),
319            ErrorType::<i32>::Full(Some(2))
320        );
321        assert_ne!(
322            ErrorType::<i32>::Full(Some(1)),
323            ErrorType::<i32>::Closed(Some(1))
324        );
325    }
326
327    #[test]
328    fn test_error_none_values() {
329        assert!(Error::<()>::SendError(ErrorType::Full(None)).is_full());
330        assert!(Error::<()>::SendError(ErrorType::Closed(None)).is_closed());
331        assert!(Error::<()>::SendTimeoutError(ErrorType::Timeout(None)).is_timeout());
332    }
333
334    // ------------------------------------------------------------------
335    // 2. Builder tests (public API only - fields are private)
336    // ------------------------------------------------------------------
    /// A queue built from `Builder::default()` reports 100 workers.
    #[test]
    fn test_builder_build_default_queue_workers() {
        let (queue, _runner) = Builder::default().build();
        assert_eq!(queue.workers(), 100);
    }
342
    /// `workers(42)` on the builder is reflected by `queue.workers()`.
    #[test]
    fn test_builder_custom_workers() {
        let (queue, _runner) = Builder::default().workers(42).build();
        assert_eq!(queue.workers(), 42);
    }
348
    /// A queue built with `queue_max(10)` starts with a waiting count of 0.
    ///
    /// NOTE(review): despite the name, `is_full()` is not asserted here.
    #[test]
    fn test_builder_custom_queue_max_is_full_check() {
        let (queue, _runner) = Builder::default().queue_max(10).build();
        assert_eq!(queue.waiting_count(), 0);
    }
354
    /// Setting both `workers` and `queue_max` still yields the requested
    /// worker count; the queue limit itself is not asserted.
    #[test]
    fn test_builder_custom_both() {
        let (q, _r) = Builder::default().workers(8).queue_max(2048).build();
        assert_eq!(q.workers(), 8);
    }
360
    /// Compile-and-construct check: `group().build::<String>()` produces a
    /// `TaskExecQueue<_, String, ()>`. Nothing is asserted at runtime.
    #[test]
    fn test_group_builder_build() {
        let (_queue, _runner): (TaskExecQueue<_, String, ()>, _) =
            Builder::default().group().build::<String>();
    }
366
    /// A grouped queue built from the default builder reports 100 workers.
    #[test]
    fn test_group_builder_default_workers() {
        let (queue, _runner): (TaskExecQueue<_, String, ()>, _) =
            Builder::default().group().build::<String>();
        assert_eq!(queue.workers(), 100);
    }
373
    /// Construct-only check: a queue can be built on a caller-supplied
    /// `futures` mpsc channel carrying `((), TaskType)` items.
    #[test]
    fn test_builder_with_channel() {
        let (tx, rx) = futures::channel::mpsc::channel::<((), TaskType)>(10);
        let (_queue, _runner) = Builder::default().with_channel::<_, _, ()>(tx, rx).build();
    }
379
    /// Construct-only check: `with_channel` followed by `group()` builds a
    /// grouped queue keyed by `String`.
    #[test]
    fn test_builder_with_channel_group() {
        let (tx, rx) = futures::channel::mpsc::channel::<((), TaskType)>(10);
        let (_queue, _runner) = Builder::default()
            .with_channel::<_, _, ()>(tx, rx)
            .group()
            .build::<String>();
    }
388
    /// Type-level check: the default build result can be annotated both as
    /// bare `TaskExecQueue` and as `TaskExecQueue<_, (), ()>`.
    #[test]
    fn test_builder_build_default_types() {
        let (_queue, _runner): (TaskExecQueue, _) = Builder::default().build();
        let (_queue, _runner): (TaskExecQueue<_, (), ()>, _) = Builder::default().build();
    }
394
    /// Construct-only check: `workers`, `queue_max`, `with_channel` and
    /// `group` can all be chained on one builder.
    #[test]
    fn test_group_channel_builder_combined() {
        let (tx, rx) = futures::channel::mpsc::channel::<((), TaskType)>(50);
        let (_queue, _runner) = Builder::default()
            .workers(3)
            .queue_max(50)
            .with_channel::<_, _, ()>(tx, rx)
            .group()
            .build::<String>();
    }
405
406    // ------------------------------------------------------------------
407    // 3. Queue state tests (non-async)
408    // ------------------------------------------------------------------
    /// A freshly built queue whose runner has not been started reports the
    /// configured worker count, zero for every counter, and `false` for every
    /// state flag.
    #[test]
    fn test_queue_state_initial() {
        let (queue, _runner) = Builder::default().workers(4).queue_max(1000).build();
        assert_eq!(queue.workers(), 4);
        assert_eq!(queue.active_count(), 0);
        assert_eq!(queue.waiting_count(), 0);
        assert!(!queue.is_active());
        assert!(!queue.is_closed());
        assert!(!queue.is_full());
        assert!(!queue.is_flushing());
        assert_eq!(queue.pending_wakers_count(), 0);
        assert_eq!(queue.waiting_wakers_count(), 0);
    }
422
    /// Even with `queue_max(1)`, an empty queue is not reported as full.
    #[test]
    fn test_queue_state_is_full_initially_false() {
        let (queue, _runner) = Builder::default().queue_max(1).build();
        assert!(!queue.is_full());
    }
428
    /// A freshly built queue is not reported as closed.
    #[test]
    fn test_queue_state_is_closed_initially_false() {
        let (queue, _runner) = Builder::default().build();
        assert!(!queue.is_closed());
    }
434
435    // ------------------------------------------------------------------
    // 4. Spawner basic tests (async; plain #[tokio::test] runs on a current-thread runtime)
437    // ------------------------------------------------------------------
    /// A task submitted with `spawn` is accepted and has set its flag within
    /// 100 ms.
    #[tokio::test]
    async fn test_spawn_task_executes() {
        let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
        tokio::spawn(runner);

        let flag = Arc::new(AtomicBool::new(false));
        let f = flag.clone();
        let result = queue
            .spawn(async move { f.store(true, Ordering::SeqCst) })
            .await;
        assert!(result.is_ok());
        // Time-based wait: gives the spawned runner a chance to execute the task.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(flag.load(Ordering::SeqCst));
    }
452
    /// Same as `test_spawn_task_executes`, but the task is submitted with
    /// `try_spawn`.
    #[tokio::test]
    async fn test_try_spawn_task_executes() {
        let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
        tokio::spawn(runner);

        let flag = Arc::new(AtomicBool::new(false));
        let f = flag.clone();
        let result = queue
            .try_spawn(async move { f.store(true, Ordering::SeqCst) })
            .await;
        assert!(result.is_ok());
        // Time-based wait: gives the spawned runner a chance to execute the task.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(flag.load(Ordering::SeqCst));
    }
467
    /// Ten tasks submitted one after another are all accepted, and all have
    /// incremented the shared counter within 200 ms.
    #[tokio::test]
    async fn test_spawn_multiple_tasks() {
        let (queue, runner) = Builder::default().workers(5).queue_max(100).build();
        tokio::spawn(runner);

        let counter = Arc::new(AtomicUsize::new(0));
        for _ in 0..10 {
            let c = counter.clone();
            let result = queue
                .spawn(async move {
                    c.fetch_add(1, Ordering::SeqCst);
                })
                .await;
            assert!(result.is_ok());
        }
        // Time-based wait for the submitted tasks to run.
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }
486
    /// `spawn_with` on a queue whose channel carries a `&'static str`
    /// alongside each task: the task is accepted and runs.
    #[tokio::test]
    async fn test_spawn_with_name() {
        // Custom channel with D = &'static str
        let (tx, rx) = futures::channel::mpsc::channel::<(&'static str, TaskType)>(100);
        let (queue, runner) = Builder::default()
            .workers(2)
            .queue_max(100)
            .with_channel::<_, _, &'static str>(tx, rx)
            .build();
        tokio::spawn(runner);

        let flag = Arc::new(AtomicBool::new(false));
        let f = flag.clone();
        let result = queue
            .spawn_with(async move { f.store(true, Ordering::SeqCst) }, "named")
            .await;
        assert!(result.is_ok());
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(flag.load(Ordering::SeqCst));
    }
507
    /// `try_spawn_with` counterpart of `test_spawn_with_name`.
    #[tokio::test]
    async fn test_try_spawn_with_name() {
        // Custom channel whose items carry a `&'static str` next to the task.
        let (tx, rx) = futures::channel::mpsc::channel::<(&'static str, TaskType)>(100);
        let (queue, runner) = Builder::default()
            .workers(2)
            .queue_max(100)
            .with_channel::<_, _, &'static str>(tx, rx)
            .build();
        tokio::spawn(runner);

        let flag = Arc::new(AtomicBool::new(false));
        let f = flag.clone();
        let result = queue
            .try_spawn_with(async move { f.store(true, Ordering::SeqCst) }, "try_named")
            .await;
        assert!(result.is_ok());
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(flag.load(Ordering::SeqCst));
    }
527
    /// Submitting an empty task to a running queue returns `Ok`.
    #[tokio::test]
    async fn test_spawn_result_returns_ok() {
        let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
        tokio::spawn(runner);
        let result = queue.spawn(async {}).await;
        assert!(result.is_ok());
    }
535
536    // ------------------------------------------------------------------
537    // 5. Group spawner tests (async) — uses GroupBuilder for G=String
538    // ------------------------------------------------------------------
    /// Two tasks submitted under the same group key record their values in
    /// submission order (`[1, 2]`).
    ///
    /// The second task is only submitted after the first submission has been
    /// awaited.
    #[tokio::test]
    async fn test_group_same_key_sequential_order() {
        let (queue, runner): (TaskExecQueue<_, String, ()>, _) = Builder::default()
            .workers(2)
            .queue_max(100)
            .group()
            .build::<String>();
        tokio::spawn(runner);

        let results = Arc::new(parking_lot::Mutex::new(Vec::new()));
        let r = results.clone();
        let result = queue
            .spawn(async move { r.lock().push(1) })
            .group("group_a".to_string())
            .await;
        assert!(result.is_ok());

        let r = results.clone();
        let result = queue
            .spawn(async move { r.lock().push(2) })
            .group("group_a".to_string())
            .await;
        assert!(result.is_ok());

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(*results.lock(), vec![1, 2]);
    }
566
    /// Tasks submitted under two different group keys both run.
    ///
    /// NOTE(review): despite the name, the assertions only check that both
    /// results are present, not that the two tasks overlapped in time.
    #[tokio::test]
    async fn test_group_different_keys_concurrent() {
        let (queue, runner): (TaskExecQueue<_, String, ()>, _) = Builder::default()
            .workers(5)
            .queue_max(100)
            .group()
            .build::<String>();
        tokio::spawn(runner);

        let results = Arc::new(parking_lot::Mutex::new(Vec::new()));

        let r = results.clone();
        let result = queue
            .spawn(async move { r.lock().push("a1") })
            .group("grp_a".to_string())
            .await;
        assert!(result.is_ok());

        let r = results.clone();
        let result = queue
            .spawn(async move { r.lock().push("b1") })
            .group("grp_b".to_string())
            .await;
        assert!(result.is_ok());

        tokio::time::sleep(Duration::from_millis(200)).await;
        let vec = results.lock();
        assert_eq!(vec.len(), 2);
        assert!(vec.contains(&"a1"));
        assert!(vec.contains(&"b1"));
    }
598
    /// `try_spawn(..).group(..)` on a grouped queue: the task is accepted and
    /// runs.
    #[tokio::test]
    async fn test_try_group_spawner() {
        let (queue, runner): (TaskExecQueue<_, String, ()>, _) = Builder::default()
            .workers(2)
            .queue_max(100)
            .group()
            .build::<String>();
        tokio::spawn(runner);

        let flag = Arc::new(AtomicBool::new(false));
        let f = flag.clone();
        let result = queue
            .try_spawn(async move { f.store(true, Ordering::SeqCst) })
            .group("grp_x".to_string())
            .await;
        assert!(result.is_ok());
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(flag.load(Ordering::SeqCst));
    }
618
    /// Three tasks of 50 ms each, all under the same group key, have all run
    /// within the 500 ms wait.
    #[tokio::test]
    async fn test_group_spawner_consecutive() {
        let (queue, runner): (TaskExecQueue<_, String, ()>, _) = Builder::default()
            .workers(2)
            .queue_max(100)
            .group()
            .build::<String>();
        tokio::spawn(runner);

        let counter = Arc::new(AtomicUsize::new(0));
        for _ in 0..3 {
            let c = counter.clone();
            let result = queue
                .spawn(async move {
                    c.fetch_add(1, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(50)).await;
                })
                .group("seq".to_string())
                .await;
            assert!(result.is_ok());
        }
        tokio::time::sleep(Duration::from_millis(500)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 3);
    }
643
644    // ------------------------------------------------------------------
645    // 6. Close / Flush tests (async)
646    // ------------------------------------------------------------------
    /// After `close()` returns `Ok`, the queue reports closed, and the task
    /// submitted before the close has completed by the time of the final
    /// assertion.
    #[tokio::test]
    async fn test_close_drains_and_closes() {
        let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
        tokio::spawn(runner);

        let counter = Arc::new(AtomicUsize::new(0));
        let c = counter.clone();
        let _ = queue
            .spawn(async move {
                tokio::time::sleep(Duration::from_millis(50)).await;
                c.fetch_add(1, Ordering::SeqCst);
            })
            .await;

        let result = queue.close().await;
        assert!(result.is_ok());
        assert!(queue.is_closed());
        tokio::time::sleep(Duration::from_millis(50)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }
667
    /// `flush()` returns `Ok` only after all three previously submitted tasks
    /// have finished: the counter is checked right after it, with no extra
    /// wait.
    #[tokio::test]
    async fn test_flush_waiting_tasks() {
        let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
        tokio::spawn(runner);

        let counter = Arc::new(AtomicUsize::new(0));
        for _ in 0..3 {
            let c = counter.clone();
            let _ = queue
                .spawn(async move {
                    tokio::time::sleep(Duration::from_millis(30)).await;
                    c.fetch_add(1, Ordering::SeqCst);
                })
                .await;
        }

        let result = queue.flush().await;
        assert!(result.is_ok());
        assert_eq!(counter.load(Ordering::SeqCst), 3);
    }
688
    /// `spawn` on a queue that has been closed returns an error.
    #[tokio::test]
    async fn test_spawn_after_close_returns_error() {
        let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
        tokio::spawn(runner);
        let _ = queue.close().await;

        let result = queue.spawn(async {}).await;
        assert!(result.is_err());
    }
698
    /// `try_spawn` on a queue that has been closed returns an error.
    #[tokio::test]
    async fn test_try_spawn_after_close_fails() {
        let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
        tokio::spawn(runner);
        let _ = queue.close().await;

        let result = queue.try_spawn(async {}).await;
        assert!(result.is_err());
    }
708
    /// Closing a queue that never received a task succeeds and marks it
    /// closed.
    ///
    /// NOTE(review): the runner is spawned before `close()` is called, and the
    /// result of the one-second timeout on the runner handle is discarded, so
    /// this test does not assert that the runner actually terminates.
    #[tokio::test]
    async fn test_close_before_start() {
        let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
        let runner_handle = tokio::spawn(runner);

        let result = queue.close().await;
        assert!(result.is_ok());
        assert!(queue.is_closed());
        let _ = tokio::time::timeout(Duration::from_secs(1), runner_handle).await;
    }
719
720    // ------------------------------------------------------------------
721    // 7. Local variant tests (single-threaded, uses LocalSet + spawn_local)
722    // ------------------------------------------------------------------
723    fn run_local<F, T>(f: F) -> T
724    where
725        F: Future<Output = T>,
726    {
727        let rt = tokio::runtime::Builder::new_current_thread()
728            .enable_time()
729            .build()
730            .unwrap();
731        let local = tokio::task::LocalSet::new();
732        rt.block_on(local.run_until(f))
733    }
734
    /// Local variant: a task submitted with `spawn` to a `LocalBuilder` queue
    /// is accepted and runs; the runner is started with `spawn_local`.
    #[test]
    fn test_local_spawn_task_executes() {
        run_local(async {
            let (queue, runner) = LocalBuilder::default().workers(2).queue_max(100).build();
            tokio::task::spawn_local(runner);

            let flag = Arc::new(AtomicBool::new(false));
            let f = flag.clone();
            let result = queue
                .spawn(async move { f.store(true, Ordering::SeqCst) })
                .await;
            assert!(result.is_ok());
            tokio::time::sleep(Duration::from_millis(100)).await;
            assert!(flag.load(Ordering::SeqCst));
        });
    }
751
    /// Local variant: a task submitted with `try_spawn` is accepted and runs.
    #[test]
    fn test_local_try_spawn() {
        run_local(async {
            let (queue, runner) = LocalBuilder::default().workers(2).queue_max(100).build();
            tokio::task::spawn_local(runner);

            let flag = Arc::new(AtomicBool::new(false));
            let f = flag.clone();
            let result = queue
                .try_spawn(async move { f.store(true, Ordering::SeqCst) })
                .await;
            assert!(result.is_ok());
            tokio::time::sleep(Duration::from_millis(100)).await;
            assert!(flag.load(Ordering::SeqCst));
        });
    }
768
    /// Local variant: two tasks under the same group key record their values
    /// in submission order (`[1, 2]`).
    #[test]
    fn test_local_group_spawner() {
        run_local(async {
            let (queue, runner): (LocalTaskExecQueue<_, String, ()>, _) = LocalBuilder::default()
                .workers(2)
                .queue_max(100)
                .group()
                .build::<String>();
            tokio::task::spawn_local(runner);

            let results = Arc::new(parking_lot::Mutex::new(Vec::new()));
            let r = results.clone();
            let result = queue
                .spawn(async move { r.lock().push(1) })
                .group("g".to_string())
                .await;
            assert!(result.is_ok());

            let r = results.clone();
            let result = queue
                .spawn(async move { r.lock().push(2) })
                .group("g".to_string())
                .await;
            assert!(result.is_ok());

            tokio::time::sleep(Duration::from_millis(200)).await;
            assert_eq!(*results.lock(), vec![1, 2]);
        });
    }
798
    /// A freshly built local queue reports the configured worker count, is
    /// neither closed nor full, and has zero active and waiting tasks.
    #[test]
    fn test_local_queue_state() {
        let (queue, _runner) = LocalBuilder::default().workers(3).queue_max(500).build();
        assert_eq!(queue.workers(), 3);
        assert!(!queue.is_closed());
        assert!(!queue.is_full());
        assert_eq!(queue.active_count(), 0);
        assert_eq!(queue.waiting_count(), 0);
    }
808
    /// Local variant: `close()` succeeds and marks the queue closed, and the
    /// task submitted beforehand has run by the time of the final assertion.
    #[test]
    fn test_local_close() {
        run_local(async {
            let (queue, runner) = LocalBuilder::default().workers(2).queue_max(100).build();
            tokio::task::spawn_local(runner);

            let counter = Arc::new(AtomicUsize::new(0));
            let c = counter.clone();
            let _ = queue
                .spawn(async move { c.fetch_add(1, Ordering::SeqCst) })
                .await;

            let result = queue.close().await;
            assert!(result.is_ok());
            assert!(queue.is_closed());
            tokio::time::sleep(Duration::from_millis(100)).await;
            assert_eq!(counter.load(Ordering::SeqCst), 1);
        });
    }
828
    /// Local variant: after `flush()` returns `Ok`, all three previously
    /// submitted tasks have run; the counter is checked with no extra wait.
    #[test]
    fn test_local_flush() {
        run_local(async {
            let (queue, runner) = LocalBuilder::default().workers(2).queue_max(100).build();
            tokio::task::spawn_local(runner);

            let counter = Arc::new(AtomicUsize::new(0));
            for _ in 0..3 {
                let c = counter.clone();
                let _ = queue
                    .spawn(async move { c.fetch_add(1, Ordering::SeqCst) })
                    .await;
            }

            let result = queue.flush().await;
            assert!(result.is_ok());
            assert_eq!(counter.load(Ordering::SeqCst), 3);
        });
    }
848
    /// Local variant of `spawn_with`: the queue is built on a custom channel
    /// carrying `(&'static str, LocalTaskType)` items, and the task is
    /// accepted and runs.
    #[test]
    fn test_local_spawn_with_name() {
        run_local(async {
            let (tx, rx) = futures::channel::mpsc::channel::<(&'static str, LocalTaskType)>(100);
            let (queue, runner) = LocalBuilder::default()
                .workers(2)
                .queue_max(100)
                .with_channel::<_, _, &'static str>(tx, rx)
                .build();
            tokio::task::spawn_local(runner);

            let flag = Arc::new(AtomicBool::new(false));
            let f = flag.clone();
            let result = queue
                .spawn_with(
                    async move { f.store(true, Ordering::SeqCst) },
                    "local_named",
                )
                .await;
            assert!(result.is_ok());
            tokio::time::sleep(Duration::from_millis(100)).await;
            assert!(flag.load(Ordering::SeqCst));
        });
    }
873
    /// Local variant: `try_spawn(..).group(..)` on a grouped local queue is
    /// accepted, and the task runs.
    #[test]
    fn test_local_try_group_spawner() {
        run_local(async {
            let (queue, runner): (LocalTaskExecQueue<_, String, ()>, _) = LocalBuilder::default()
                .workers(2)
                .queue_max(100)
                .group()
                .build::<String>();
            tokio::task::spawn_local(runner);

            let flag = Arc::new(AtomicBool::new(false));
            let f = flag.clone();
            let result = queue
                .try_spawn(async move { f.store(true, Ordering::SeqCst) })
                .group("g".to_string())
                .await;
            assert!(result.is_ok());
            tokio::time::sleep(Duration::from_millis(100)).await;
            assert!(flag.load(Ordering::SeqCst));
        });
    }
895
896    // ------------------------------------------------------------------
897    // 8. Default spawner tests
898    // ------------------------------------------------------------------
    /// Installs a queue through `set_default` and runs a task through
    /// `default()`.
    ///
    /// The default cell is a process-wide static, so if another test has
    /// already installed a default this test returns early without asserting
    /// anything.
    #[tokio::test]
    async fn test_spawn_default_with_manual_set() {
        let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
        tokio::spawn(runner);

        // set_default returns Result<(), TaskExecQueue> — handle without unwrap
        if set_default(queue).is_err() {
            // already set by another test — skip
            return;
        }

        let flag = Arc::new(AtomicBool::new(false));
        let f = flag.clone();
        let result = default()
            .spawn(async move { f.store(true, Ordering::SeqCst) })
            .await;
        assert!(result.is_ok());
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert!(flag.load(Ordering::SeqCst));
    }
919
920    #[tokio::test]
921    async fn test_default_already_set() {
922        let (q1, r1) = Builder::default().workers(1).queue_max(10).build();
923        let (q2, _r2) = Builder::default().workers(1).queue_max(10).build();
924        tokio::spawn(r1);
925
926        assert!(set_default(q1).is_ok());
927        assert!(set_default(q2).is_err());
928    }
929
930    // ------------------------------------------------------------------
931    // 9. SpawnExt trait tests (qualified calls to avoid ambiguity)
932    // ------------------------------------------------------------------
933    #[tokio::test]
934    async fn test_spawn_ext_trait() {
935        let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
936        tokio::spawn(runner);
937
938        let flag = Arc::new(AtomicBool::new(false));
939        let f = flag.clone();
940        let result = SpawnExt::spawn(async move { f.store(true, Ordering::SeqCst) }, &queue).await;
941        assert!(result.is_ok());
942        tokio::time::sleep(Duration::from_millis(100)).await;
943        assert!(flag.load(Ordering::SeqCst));
944    }
945
946    #[tokio::test]
947    async fn test_spawn_ext_with_name() {
948        let (tx, rx) = futures::channel::mpsc::channel::<(&'static str, TaskType)>(100);
949        let (queue, runner) = Builder::default()
950            .workers(2)
951            .queue_max(100)
952            .with_channel::<_, _, &'static str>(tx, rx)
953            .build();
954        tokio::spawn(runner);
955
956        let flag = Arc::new(AtomicBool::new(false));
957        let f = flag.clone();
958        let result = SpawnExt::spawn_with(
959            async move { f.store(true, Ordering::SeqCst) },
960            &queue,
961            "named",
962        )
963        .await;
964        assert!(result.is_ok());
965        tokio::time::sleep(Duration::from_millis(100)).await;
966        assert!(flag.load(Ordering::SeqCst));
967    }
968
969    // ------------------------------------------------------------------
970    // 10. Counter tests (internal helper)
971    // ------------------------------------------------------------------
972    #[test]
973    fn test_counter_inc_dec_value() {
974        let c = Counter::new();
975        assert_eq!(c.value(), 0);
976        c.inc();
977        assert_eq!(c.value(), 1);
978        c.inc();
979        assert_eq!(c.value(), 2);
980        c.dec();
981        assert_eq!(c.value(), 1);
982        c.dec();
983        assert_eq!(c.value(), 0);
984    }
985
986    // ------------------------------------------------------------------
987    // 11. Rate feature tests (conditional)
988    // ------------------------------------------------------------------
989    #[cfg(feature = "rate")]
990    #[tokio::test]
991    async fn test_rate_completed_count() {
992        let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
993        tokio::spawn(runner);
994
995        for _ in 0..5 {
996            let _ = queue.spawn(async {}).await;
997        }
998
999        tokio::time::sleep(Duration::from_millis(200)).await;
1000        assert!(queue.completed_count().await >= 0);
1001    }
1002
1003    #[cfg(feature = "rate")]
1004    #[tokio::test]
1005    async fn test_rate_method() {
1006        let (queue, runner) = Builder::default().workers(2).queue_max(100).build();
1007        tokio::spawn(runner);
1008
1009        for _ in 0..10 {
1010            let _ = queue.spawn(async {}).await;
1011        }
1012
1013        tokio::time::sleep(Duration::from_millis(300)).await;
1014        assert!(queue.rate().await >= 0.0);
1015    }
1016
1017    // ------------------------------------------------------------------
1018    // 12. LocalBuilder tests (public API only)
1019    // ------------------------------------------------------------------
1020    #[test]
1021    fn test_local_builder_default_workers() {
1022        let (queue, _runner) = LocalBuilder::default().build();
1023        assert_eq!(queue.workers(), 100);
1024    }
1025
1026    #[test]
1027    fn test_local_builder_custom_workers() {
1028        let (queue, _runner) = LocalBuilder::default().workers(4).queue_max(500).build();
1029        assert_eq!(queue.workers(), 4);
1030    }
1031
1032    #[test]
1033    fn test_local_group_builder() {
1034        let (_queue, _runner): (LocalTaskExecQueue<_, String, ()>, _) =
1035            LocalBuilder::default().workers(2).group().build::<String>();
1036    }
1037
1038    #[test]
1039    fn test_local_channel_builder() {
1040        let (tx, rx) = futures::channel::mpsc::channel::<((), LocalTaskType)>(10);
1041        let (_queue, _runner) = LocalBuilder::default()
1042            .with_channel::<_, _, ()>(tx, rx)
1043            .build();
1044    }
1045
1046    #[test]
1047    fn test_local_channel_builder_group() {
1048        let (tx, rx) = futures::channel::mpsc::channel::<((), LocalTaskType)>(10);
1049        let (_queue, _runner): (LocalTaskExecQueue<_, String, ()>, _) = LocalBuilder::default()
1050            .with_channel::<_, _, ()>(tx, rx)
1051            .group()
1052            .build::<String>();
1053    }
1054}