use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::panic::{self, AssertUnwindSafe};
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::{Duration, Instant};

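/// Scheduling priority for submitted tasks.
///
/// Higher-priority tasks are dequeued before lower-priority ones; tasks with
/// equal priority run in submission (FIFO) order.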
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum Priority {
    Low,
    Normal,
    High,
}

impl Priority {
    fn as_u8(self) -> u8 {
        match self {
            Priority::Low => 0,
            Priority::Normal => 1,
            Priority::High => 2,
        }
    }
}

impl Ord for Priority {
    fn cmp(&self, other: &Self) -> Ordering {
        self.as_u8().cmp(&other.as_u8())
    }
}

impl PartialOrd for Priority {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

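/// Why a task failed to produce a value.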
#[derive(Debug)]
pub enum TaskError {
    /// The task body panicked; the panic was caught by the worker.
    Panicked,
    /// The task was dropped without running (shutdown, or a submission
    /// rejected because the queue was draining or shutting down).
    Cancelled,
    /// The queue was created with `with_capacity` and is at its limit.
    QueueFull,
}
impl std::fmt::Display for TaskError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TaskError::Panicked => write!(f, "task panicked"),
            TaskError::Cancelled => write!(f, "task cancelled"),
            TaskError::QueueFull => write!(f, "task rejected: queue is full"),
        }
    }
}

impl std::error::Error for TaskError {}

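/// A point-in-time snapshot of queue activity, as returned by [`TaskQueue::stats`].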
#[derive(Debug, Clone)]
pub struct TaskQueueStats {
    /// Tasks accepted into the queue (rejected submissions are not counted).
    pub total_submitted: u64,
    /// Tasks that ran to completion without panicking.
    pub completed: u64,
    /// Tasks whose body panicked.
    pub failed: u64,
    /// Tasks currently executing on a worker thread.
    pub in_flight: u64,
    /// Sum of enqueue-to-completion latency over all finished tasks, in nanoseconds.
    pub total_latency_nanos: u128,
    /// Number of finished tasks contributing to `total_latency_nanos`.
    pub completed_latency_samples: u64,
}

impl TaskQueueStats {
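    /// Average enqueue-to-completion latency, or `None` if no task has
    /// finished yet. Panicked tasks are included in the average.
    ///
    /// A minimal sketch of reading it (assuming a `queue: TaskQueue` in scope):
    ///
    /// ```ignore
    /// if let Some(avg) = queue.stats().average_latency() {
    ///     println!("average task latency: {avg:?}");
    /// }
    /// ```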
    pub fn average_latency(&self) -> Option<Duration> {
        if self.completed_latency_samples == 0 {
            return None;
        }
        let avg_nanos = self.total_latency_nanos / self.completed_latency_samples as u128;
        // Duration::from_nanos takes a u64, so cap the (extremely unlikely)
        // overflow at u64::MAX nanoseconds rather than wrapping.
        let capped = avg_nanos.min(u64::MAX as u128) as u64;
        Some(Duration::from_nanos(capped))
    }
}

struct StatsCounters {
    total_submitted: AtomicU64,
    completed: AtomicU64,
    failed: AtomicU64,
    in_flight: AtomicU64,
    // The latency sum is a u128, which has no std atomic, so it lives behind
    // a Mutex together with its sample count.
    latency: Mutex<LatencyAccumulator>,
}

#[derive(Default)]
struct LatencyAccumulator {
    total_nanos: u128,
    samples: u64,
}

impl StatsCounters {
    fn new() -> Self {
        Self {
            total_submitted: AtomicU64::new(0),
            completed: AtomicU64::new(0),
            failed: AtomicU64::new(0),
            in_flight: AtomicU64::new(0),
            latency: Mutex::new(LatencyAccumulator::default()),
        }
    }

    fn record_latency(&self, elapsed: Duration) {
        if let Ok(mut acc) = self.latency.lock() {
            acc.total_nanos = acc.total_nanos.saturating_add(elapsed.as_nanos());
            acc.samples = acc.samples.saturating_add(1);
        }
    }

    fn snapshot_latency(&self) -> (u128, u64) {
        match self.latency.lock() {
            Ok(acc) => (acc.total_nanos, acc.samples),
            // A poisoned lock degrades to an empty snapshot instead of panicking.
            Err(_) => (0, 0),
        }
    }
}


/// Completion callback signature: `(success, run time of the task body)`.
type CompletionCallback = dyn Fn(bool, Duration) + Send + Sync;

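/// A handle to a submitted task's eventual result.
///
/// A minimal sketch of the round trip, assuming a `queue: TaskQueue` and a
/// stand-in `expensive_computation`:
///
/// ```ignore
/// let handle = queue.submit(|| expensive_computation());
/// // ... do other work ...
/// let value = handle.join().unwrap();
/// ```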
pub struct TaskHandle<T> {
    inner: Arc<TaskResultSlot<T>>,
}

struct TaskResultSlot<T> {
    mutex: Mutex<Option<Result<T, TaskError>>>,
    condvar: Condvar,
}

impl<T> TaskResultSlot<T> {
    fn set(&self, value: Result<T, TaskError>) {
        let mut guard = self.mutex.lock().unwrap();
        *guard = Some(value);
        self.condvar.notify_one();
    }
}

impl<T> TaskHandle<T> {
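    /// Blocks the calling thread until the task finishes, returning its
    /// output or the error recorded for it (panic, cancellation, or rejection).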
    pub fn join(self) -> Result<T, TaskError> {
        let mut guard = self.inner.mutex.lock().unwrap();
        while guard.is_none() {
            guard = self.inner.condvar.wait(guard).unwrap();
        }
        guard.take().unwrap()
    }

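    /// Returns `true` once a result (or error) has been recorded, without blocking.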
    pub fn is_done(&self) -> bool {
        self.inner.mutex.lock().unwrap().is_some()
    }
}

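/// Marks a result slot as cancelled if the task is dropped before it runs,
/// e.g. when `shutdown` clears the queue. This keeps `join()` from blocking
/// forever on a task that will never execute.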
struct CancelGuard<T> {
    slot: Arc<TaskResultSlot<T>>,
}

impl<T> Drop for CancelGuard<T> {
    fn drop(&mut self) {
        let mut guard = self.slot.mutex.lock().unwrap();
        if guard.is_none() {
            *guard = Some(Err(TaskError::Cancelled));
            self.slot.condvar.notify_one();
        }
    }
}

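// Tasks run in two phases: the type-erased task body executes first and
// returns a completion closure; the worker invokes that closure only after
// stats and the user callback have been handled, which is the moment the
// result becomes visible to `join()`.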
type TaskCompletion = Box<dyn FnOnce() + Send>;
type BoxedTask = Box<dyn FnOnce() -> TaskCompletion + Send>;

struct QueueEntry {
    priority: Priority,
    sequence: u64,
    task: BoxedTask,
    enqueued_at: Instant,
}

impl Eq for QueueEntry {}

impl PartialEq for QueueEntry {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority && self.sequence == other.sequence
    }
}

impl Ord for QueueEntry {
    fn cmp(&self, other: &Self) -> Ordering {
        // BinaryHeap is a max-heap: higher priority wins, and ties go to the
        // lower sequence number (earlier submission) via the reversed compare.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.sequence.cmp(&self.sequence))
    }
}

impl PartialOrd for QueueEntry {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

struct SharedState {
    queue: BinaryHeap<QueueEntry>,
    shutdown: bool,
    draining: bool,
    next_sequence: u64,
    max_queued: Option<usize>,
    paused: bool,
}

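/// A priority task queue backed by a fixed pool of worker threads.
///
/// Tasks execute highest-priority first (FIFO within a priority level),
/// panics are caught and surfaced as [`TaskError::Panicked`], and the queue
/// can be paused, drained, or shut down.
///
/// A minimal usage sketch:
///
/// ```ignore
/// let queue = TaskQueue::new(4);
/// let handle = queue.submit_with_priority(Priority::High, || 2 + 2);
/// assert_eq!(handle.join().unwrap(), 4);
/// queue.shutdown();
/// ```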
pub struct TaskQueue {
    shared: Arc<(Mutex<SharedState>, Condvar)>,
    workers: Option<Vec<thread::JoinHandle<()>>>,
    stats: Arc<StatsCounters>,
    callback: Arc<Mutex<Option<Arc<CompletionCallback>>>>,
}

impl TaskQueue {
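    /// Creates a queue with `concurrency` worker threads and no queue-length limit.
    ///
    /// # Panics
    ///
    /// Panics if `concurrency` is zero.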
    pub fn new(concurrency: usize) -> Self {
        assert!(concurrency > 0, "concurrency must be at least 1");

        let shared = Arc::new((
            Mutex::new(SharedState {
                queue: BinaryHeap::new(),
                shutdown: false,
                draining: false,
                next_sequence: 0,
                max_queued: None,
                paused: false,
            }),
            Condvar::new(),
        ));

        let stats = Arc::new(StatsCounters::new());
        let callback: Arc<Mutex<Option<Arc<CompletionCallback>>>> = Arc::new(Mutex::new(None));

        let mut workers = Vec::with_capacity(concurrency);
        for _ in 0..concurrency {
            let shared = Arc::clone(&shared);
            let stats = Arc::clone(&stats);
            let callback = Arc::clone(&callback);
            let handle = thread::spawn(move || {
                worker_loop(&shared, &stats, &callback);
            });
            workers.push(handle);
        }

        TaskQueue {
            shared,
            workers: Some(workers),
            stats,
            callback,
        }
    }

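    /// Like [`TaskQueue::new`], but rejects submissions with
    /// [`TaskError::QueueFull`] once `max_queued` tasks are waiting.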
    pub fn with_capacity(concurrency: usize, max_queued: usize) -> Self {
        let queue = Self::new(concurrency);
        {
            let (ref mutex, _) = *queue.shared;
            mutex.lock().unwrap().max_queued = Some(max_queued);
        }
        queue
    }

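    /// Stops workers from picking up new tasks; tasks already running finish
    /// normally. Submissions are still accepted while paused.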
    pub fn pause(&self) {
        let (ref mutex, _) = *self.shared;
        mutex.lock().unwrap().paused = true;
    }

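    /// Resumes task execution after [`TaskQueue::pause`].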
    pub fn resume(&self) {
        let (ref mutex, ref condvar) = *self.shared;
        mutex.lock().unwrap().paused = false;
        condvar.notify_all();
    }

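    /// Returns `true` while the queue is paused.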
    pub fn is_paused(&self) -> bool {
        let (ref mutex, _) = *self.shared;
        mutex.lock().unwrap().paused
    }

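    /// Number of tasks waiting in the queue, not counting tasks currently running.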
    pub fn pending_count(&self) -> usize {
        let (ref mutex, _) = *self.shared;
        mutex.lock().unwrap().queue.len()
    }

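    /// Submits a task at [`Priority::Normal`]. See [`TaskQueue::submit_with_priority`].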
    pub fn submit<F, T>(&self, task: F) -> TaskHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        self.submit_with_priority(Priority::Normal, task)
    }

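    /// Submits a task at the given priority and returns a handle to its result.
    ///
    /// Submission never blocks. If the queue is shutting down or draining, the
    /// handle resolves to [`TaskError::Cancelled`]; if a capacity limit is set
    /// and reached, it resolves to [`TaskError::QueueFull`]. A panic inside
    /// `task` is caught and reported as [`TaskError::Panicked`].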
    pub fn submit_with_priority<F, T>(&self, priority: Priority, task: F) -> TaskHandle<T>
    where
        F: FnOnce() -> T + Send + 'static,
        T: Send + 'static,
    {
        let slot = Arc::new(TaskResultSlot {
            mutex: Mutex::new(None),
            condvar: Condvar::new(),
        });

        let cancel_guard = CancelGuard {
            slot: Arc::clone(&slot),
        };

        let boxed: BoxedTask = Box::new(move || {
            let outcome = panic::catch_unwind(AssertUnwindSafe(task));
            let success = outcome.is_ok();
            TASK_SUCCESS.with(|s| s.set(success));
            let value = match outcome {
                Ok(v) => Ok(v),
                Err(_) => Err(TaskError::Panicked),
            };
            // The task ran, so disarm the cancel guard without running its Drop.
            let slot = Arc::clone(&cancel_guard.slot);
            std::mem::forget(cancel_guard);
            // Phase two: the worker calls this to publish the result.
            Box::new(move || slot.set(value))
        });

        // Admission check and enqueue happen under a single lock acquisition
        // so a concurrent submitter cannot slip past the capacity limit.
        let (ref mutex, ref condvar) = *self.shared;
        let mut state = mutex.lock().unwrap();
        if state.draining || state.shutdown {
            drop(state);
            slot.set(Err(TaskError::Cancelled));
            return TaskHandle { inner: slot };
        }
        if let Some(max) = state.max_queued {
            if state.queue.len() >= max {
                drop(state);
                slot.set(Err(TaskError::QueueFull));
                return TaskHandle { inner: slot };
            }
        }

        self.stats
            .total_submitted
            .fetch_add(1, AtomicOrdering::Relaxed);

        let sequence = state.next_sequence;
        state.next_sequence += 1;
        state.queue.push(QueueEntry {
            priority,
            sequence,
            task: boxed,
            enqueued_at: Instant::now(),
        });
        condvar.notify_one();

        TaskHandle { inner: slot }
    }

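    /// Returns a snapshot of the queue's counters.
    ///
    /// Counters are read individually with relaxed ordering, so the snapshot
    /// is approximate while tasks are in flight; it is exact once the queue
    /// is idle.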
    pub fn stats(&self) -> TaskQueueStats {
        let (total_latency_nanos, completed_latency_samples) = self.stats.snapshot_latency();
        TaskQueueStats {
            total_submitted: self.stats.total_submitted.load(AtomicOrdering::Relaxed),
            completed: self.stats.completed.load(AtomicOrdering::Relaxed),
            failed: self.stats.failed.load(AtomicOrdering::Relaxed),
            in_flight: self.stats.in_flight.load(AtomicOrdering::Relaxed),
            total_latency_nanos,
            completed_latency_samples,
        }
    }

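    /// Rejects new submissions, waits for every queued and running task to
    /// finish, then shuts the queue down. Draining overrides a pause.
    ///
    /// A minimal sketch (`jobs` is a stand-in iterator of closures):
    ///
    /// ```ignore
    /// let queue = TaskQueue::new(2);
    /// for job in jobs {
    ///     queue.submit(job);
    /// }
    /// queue.drain(); // returns once all submitted work has completed
    /// ```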
    pub fn drain(mut self) {
        self.do_drain();
    }

    fn do_drain(&mut self) {
        let (ref mutex, ref condvar) = *self.shared;
        {
            let mut state = mutex.lock().unwrap();
            state.draining = true;
            // Wake paused workers: draining overrides pause.
            condvar.notify_all();
        }

        {
            // Workers signal the condvar after each completion; wait until
            // the queue is empty and nothing is running.
            let mut state = mutex.lock().unwrap();
            while !state.queue.is_empty() || self.stats.in_flight.load(AtomicOrdering::SeqCst) > 0 {
                state = condvar.wait(state).unwrap();
            }
        }

        self.do_shutdown();
    }

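    /// Registers a callback invoked after each task finishes, with whether it
    /// succeeded and the run time of the task body. Registering a new callback
    /// replaces the previous one. The callback runs on the worker thread,
    /// before the task's result becomes visible to `join()`.
    ///
    /// A minimal sketch:
    ///
    /// ```ignore
    /// queue.on_complete(|success, elapsed| {
    ///     eprintln!("task finished: success={success}, took {elapsed:?}");
    /// });
    /// ```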
    pub fn on_complete<F>(&self, callback: F)
    where
        F: Fn(bool, Duration) + Send + Sync + 'static,
    {
        let mut guard = self.callback.lock().unwrap();
        *guard = Some(Arc::new(callback));
    }

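    /// Shuts the queue down: pending tasks are dropped (their handles resolve
    /// to [`TaskError::Cancelled`]), running tasks finish, and worker threads
    /// are joined before this returns.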
    pub fn shutdown(mut self) {
        self.do_shutdown();
    }

    fn do_shutdown(&mut self) {
        let (ref mutex, ref condvar) = *self.shared;

        {
            let mut state = mutex.lock().unwrap();
            state.shutdown = true;
            condvar.notify_all();
            // Dropping the pending entries runs their CancelGuards, so any
            // join() waiting on them sees TaskError::Cancelled.
            state.queue.clear();
        }

        if let Some(workers) = self.workers.take() {
            for w in workers {
                let _ = w.join();
            }
        }
    }
}

impl Drop for TaskQueue {
    fn drop(&mut self) {
        let (ref mutex, ref condvar) = *self.shared;
        {
            let mut state = mutex.lock().unwrap();
            if !state.shutdown {
                state.shutdown = true;
                // A drain in progress keeps its queued tasks; a plain drop
                // cancels them, matching shutdown().
                if !state.draining {
                    state.queue.clear();
                }
                condvar.notify_all();
            }
        }
        if let Some(workers) = self.workers.take() {
            for w in workers {
                let _ = w.join();
            }
        }
    }
}

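// The boxed task records its panic status here because BoxedTask erases the
// result type. The worker runs the task and reads the flag on the same
// thread, immediately afterwards, so a thread-local is sufficient.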
thread_local! {
    static TASK_SUCCESS: std::cell::Cell<bool> = const { std::cell::Cell::new(true) };
}

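// Worker body: pop the highest-priority entry, run it, record stats, fire the
// completion callback, then publish the result and wake any drain() waiter.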
fn worker_loop(
    shared: &(Mutex<SharedState>, Condvar),
    stats: &StatsCounters,
    callback: &Mutex<Option<Arc<CompletionCallback>>>,
) {
    let (ref mutex, ref condvar) = *shared;
    loop {
        let task = {
            let mut state = mutex.lock().unwrap();
            loop {
                // Draining overrides pause so queued work can still finish.
                if !state.paused || state.draining {
                    if let Some(entry) = state.queue.pop() {
                        break Some((entry.task, entry.enqueued_at));
                    }
                }
                if state.shutdown || (state.draining && state.queue.is_empty()) {
                    break None;
                }
                state = condvar.wait(state).unwrap();
            }
        };
        match task {
            Some((task, enqueued_at)) => {
                stats.in_flight.fetch_add(1, AtomicOrdering::SeqCst);
                let start = Instant::now();
                let completion = task();
                let elapsed = start.elapsed();
                // Latency covers the full enqueue-to-completion span, not just
                // the run time of the task body.
                let total_latency = enqueued_at.elapsed();
                stats.record_latency(total_latency);
                stats.in_flight.fetch_sub(1, AtomicOrdering::SeqCst);

                let success = TASK_SUCCESS.with(|s| s.get());
                if success {
                    stats.completed.fetch_add(1, AtomicOrdering::Relaxed);
                } else {
                    stats.failed.fetch_add(1, AtomicOrdering::Relaxed);
                }

                // Clone the callback out of the lock so a slow callback (or
                // one that re-registers via on_complete) cannot block the
                // mutex for other workers.
                let cb = callback
                    .lock()
                    .ok()
                    .and_then(|guard| guard.as_ref().map(Arc::clone));
                if let Some(cb) = cb {
                    cb(success, elapsed);
                }

                // Publish the result last, so join() observers run after the
                // stats and callback updates above.
                completion();

                // Wake drain(), which waits for an empty queue and zero in-flight.
                condvar.notify_all();
            }
            None => return,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::mpsc;
    use std::sync::Barrier;
    use std::time::Duration;

    #[test]
    fn submit_and_join() {
        let queue = TaskQueue::new(1);
        let handle = queue.submit(|| 42);
        assert_eq!(handle.join().unwrap(), 42);
        queue.shutdown();
    }

    #[test]
    fn submit_multiple_tasks_all_complete() {
        let queue = TaskQueue::new(2);
        let handles: Vec<_> = (0..10).map(|i| queue.submit(move || i * 2)).collect();
        let results: Vec<_> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        for (i, r) in results.iter().enumerate() {
            assert_eq!(*r, i * 2);
        }
        queue.shutdown();
    }

    #[test]
    fn priority_ordering() {
        let queue = TaskQueue::new(1);
        let barrier = Arc::new(Barrier::new(2));
        let order = Arc::new(Mutex::new(Vec::new()));

        // Occupy the single worker so the next submissions stay queued.
        let b = barrier.clone();
        queue.submit(move || {
            b.wait();
        });

        // Give the worker time to dequeue the blocking task.
        thread::sleep(Duration::from_millis(50));

        let o = order.clone();
        let h_low = queue.submit_with_priority(Priority::Low, move || {
            o.lock().unwrap().push("low");
        });

        let o = order.clone();
        let h_high = queue.submit_with_priority(Priority::High, move || {
            o.lock().unwrap().push("high");
        });

        let o = order.clone();
        let h_normal = queue.submit_with_priority(Priority::Normal, move || {
            o.lock().unwrap().push("normal");
        });

        // Release the blocker; the queued tasks now run in priority order.
        barrier.wait();

        h_low.join().unwrap();
        h_high.join().unwrap();
        h_normal.join().unwrap();

        let final_order = order.lock().unwrap();
        assert_eq!(*final_order, vec!["high", "normal", "low"]);

        queue.shutdown();
    }

    #[test]
    fn is_done_returns_false_then_true() {
        let queue = TaskQueue::new(1);
        let barrier = Arc::new(Barrier::new(2));

        let b = barrier.clone();
        let handle = queue.submit(move || {
            b.wait();
            99
        });

        // The task is parked on the barrier, so it cannot have finished.
        assert!(!handle.is_done());

        barrier.wait();

        let result = handle.join().unwrap();
        assert_eq!(result, 99);

        queue.shutdown();
    }

    #[test]
    fn shutdown_completes_running_tasks() {
        let queue = TaskQueue::new(1);
        let (tx, rx) = mpsc::channel();

        queue.submit(move || {
            thread::sleep(Duration::from_millis(50));
            tx.send(true).unwrap();
        });

        // Let the worker pick the task up before shutting down.
        thread::sleep(Duration::from_millis(10));

        // shutdown() joins the workers, so the in-flight task must finish.
        queue.shutdown();

        assert!(rx.recv_timeout(Duration::from_millis(100)).unwrap());
    }

    #[test]
    fn panicking_task_returns_panicked_error() {
        let queue = TaskQueue::new(1);
        let handle = queue.submit(|| {
            panic!("intentional panic");
        });
        match handle.join() {
            Err(TaskError::Panicked) => {}
            other => panic!("expected TaskError::Panicked, got {:?}", other.err()),
        }

        // The worker survives the panic and keeps processing tasks.
        let handle = queue.submit(|| 123);
        assert_eq!(handle.join().unwrap(), 123);

        queue.shutdown();
    }

    #[test]
    fn concurrency_limit_is_respected() {
        let concurrency = 3;
        let queue = TaskQueue::new(concurrency);
        let running = Arc::new(AtomicUsize::new(0));
        let max_running = Arc::new(AtomicUsize::new(0));

        let mut handles = Vec::new();
        for _ in 0..concurrency * 2 {
            let r = running.clone();
            let m = max_running.clone();
            handles.push(queue.submit(move || {
                let current = r.fetch_add(1, Ordering::SeqCst) + 1;
                // CAS loop: record the high-water mark of concurrently running tasks.
                loop {
                    let prev_max = m.load(Ordering::SeqCst);
                    if current <= prev_max {
                        break;
                    }
                    if m.compare_exchange(prev_max, current, Ordering::SeqCst, Ordering::SeqCst)
                        .is_ok()
                    {
                        break;
                    }
                }
                thread::sleep(Duration::from_millis(50));
                r.fetch_sub(1, Ordering::SeqCst);
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        let observed_max = max_running.load(Ordering::SeqCst);
        assert!(
            observed_max <= concurrency,
            "max concurrent tasks ({observed_max}) exceeded concurrency limit ({concurrency})"
        );

        queue.shutdown();
    }

    #[test]
    fn stats_tracks_submitted_and_completed() {
        let queue = TaskQueue::new(2);

        let handles: Vec<_> = (0..5).map(|i| queue.submit(move || i)).collect();
        for h in handles {
            h.join().unwrap();
        }

        let s = queue.stats();
        assert_eq!(s.total_submitted, 5);
        assert_eq!(s.completed, 5);
        assert_eq!(s.failed, 0);
        assert_eq!(s.in_flight, 0);

        queue.shutdown();
    }

    #[test]
    fn stats_tracks_failures() {
        let queue = TaskQueue::new(1);

        let h1 = queue.submit(|| panic!("boom"));
        let _ = h1.join();

        let h2 = queue.submit(|| 42);
        h2.join().unwrap();

        let s = queue.stats();
        assert_eq!(s.total_submitted, 2);
        assert_eq!(s.completed, 1);
        assert_eq!(s.failed, 1);

        queue.shutdown();
    }

    #[test]
    fn drain_completes_all_pending_tasks() {
        let queue = TaskQueue::new(1);
        let counter = Arc::new(AtomicUsize::new(0));

        for _ in 0..10 {
            let c = counter.clone();
            queue.submit(move || {
                c.fetch_add(1, Ordering::SeqCst);
            });
        }

        queue.drain();
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }

    #[test]
    fn drain_rejects_new_submissions() {
        let queue = TaskQueue::new(1);
        let barrier = Arc::new(Barrier::new(2));

        // Block the single worker so the next submission stays queued.
        let b = barrier.clone();
        queue.submit(move || {
            b.wait();
        });

        thread::sleep(Duration::from_millis(50));

        // Submitted before the drain, so it must still run.
        let counter = Arc::new(AtomicUsize::new(0));
        let c = counter.clone();
        queue.submit(move || {
            c.fetch_add(1, Ordering::SeqCst);
        });

        // Release the blocker, then drain: the queued task completes, and no
        // further submissions are possible because drain() consumes the queue.
        barrier.wait();
        queue.drain();
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn on_complete_callback_fires_on_success() {
        let queue = TaskQueue::new(1);
        let call_count = Arc::new(AtomicUsize::new(0));
        let success_count = Arc::new(AtomicUsize::new(0));

        let cc = call_count.clone();
        let sc = success_count.clone();
        queue.on_complete(move |success, dur| {
            cc.fetch_add(1, Ordering::SeqCst);
            if success {
                sc.fetch_add(1, Ordering::SeqCst);
            }
            assert!(dur.as_nanos() > 0);
        });

        let h = queue.submit(|| 42);
        h.join().unwrap();

        assert_eq!(call_count.load(Ordering::SeqCst), 1);
        assert_eq!(success_count.load(Ordering::SeqCst), 1);

        queue.shutdown();
    }

    #[test]
    fn on_complete_callback_fires_on_failure() {
        let queue = TaskQueue::new(1);
        let failure_count = Arc::new(AtomicUsize::new(0));

        let fc = failure_count.clone();
        queue.on_complete(move |success, _dur| {
            if !success {
                fc.fetch_add(1, Ordering::SeqCst);
            }
        });

        let h = queue.submit(|| panic!("intentional"));
        let _ = h.join();

        assert_eq!(failure_count.load(Ordering::SeqCst), 1);

        queue.shutdown();
    }

    #[test]
    fn on_complete_callback_reports_duration() {
        let queue = TaskQueue::new(1);
        let observed_duration = Arc::new(Mutex::new(Duration::ZERO));

        let od = observed_duration.clone();
        queue.on_complete(move |_success, dur| {
            *od.lock().unwrap() = dur;
        });

        let h = queue.submit(|| {
            thread::sleep(Duration::from_millis(50));
        });
        h.join().unwrap();

        let dur = *observed_duration.lock().unwrap();
        assert!(dur >= Duration::from_millis(40), "duration was {dur:?}");

        queue.shutdown();
    }

    #[test]
    fn replacing_callback() {
        let queue = TaskQueue::new(1);
        let first_count = Arc::new(AtomicUsize::new(0));
        let second_count = Arc::new(AtomicUsize::new(0));

        let fc = first_count.clone();
        queue.on_complete(move |_, _| {
            fc.fetch_add(1, Ordering::SeqCst);
        });

        queue.submit(|| {}).join().unwrap();

        let sc = second_count.clone();
        queue.on_complete(move |_, _| {
            sc.fetch_add(1, Ordering::SeqCst);
        });

        queue.submit(|| {}).join().unwrap();

        assert_eq!(first_count.load(Ordering::SeqCst), 1);
        assert_eq!(second_count.load(Ordering::SeqCst), 1);

        queue.shutdown();
    }

    #[test]
    fn test_with_capacity_rejects_when_full() {
        let queue = TaskQueue::with_capacity(1, 2);
        // Pause so submissions accumulate in the queue.
        queue.pause();

        let h1 = queue.submit(|| 1);
        let h2 = queue.submit(|| 2);
        // The queue already holds two tasks, so the third is rejected.
        let h3 = queue.submit(|| 3);
        queue.resume();
        assert!(matches!(h3.join(), Err(TaskError::QueueFull)));

        assert!(h1.join().is_ok());
        assert!(h2.join().is_ok());
        queue.shutdown();
    }

    #[test]
    fn test_with_capacity_allows_within_limit() {
        let queue = TaskQueue::with_capacity(2, 10);
        let handles: Vec<_> = (0..10).map(|i| queue.submit(move || i)).collect();
        for (i, h) in handles.into_iter().enumerate() {
            assert_eq!(h.join().unwrap(), i);
        }
        queue.shutdown();
    }

    #[test]
    fn test_pause_and_resume() {
        let queue = TaskQueue::new(2);
        queue.pause();

        let counter = Arc::new(AtomicUsize::new(0));
        let c = counter.clone();
        queue.submit(move || {
            c.fetch_add(1, Ordering::SeqCst);
        });

        // While paused, the task must not run.
        thread::sleep(Duration::from_millis(50));
        assert_eq!(counter.load(Ordering::SeqCst), 0);

        queue.resume();
        // After resuming, the task should complete promptly.
        thread::sleep(Duration::from_millis(100));
        assert_eq!(counter.load(Ordering::SeqCst), 1);

        queue.shutdown();
    }

    #[test]
    fn test_is_paused() {
        let queue = TaskQueue::new(1);
        assert!(!queue.is_paused());
        queue.pause();
        assert!(queue.is_paused());
        queue.resume();
        assert!(!queue.is_paused());
        queue.shutdown();
    }

    #[test]
    fn test_drain_overrides_pause() {
        let queue = TaskQueue::new(2);
        queue.pause();

        let counter = Arc::new(AtomicUsize::new(0));
        for _ in 0..5 {
            let c = counter.clone();
            queue.submit(move || {
                c.fetch_add(1, Ordering::SeqCst);
            });
        }

        queue.drain();
        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    #[test]
    fn test_pending_count() {
        let queue = TaskQueue::new(1);
        queue.pause();

        assert_eq!(queue.pending_count(), 0);
        queue.submit(|| 1);
        queue.submit(|| 2);
        assert_eq!(queue.pending_count(), 2);

        queue.resume();
        thread::sleep(Duration::from_millis(100));
        assert_eq!(queue.pending_count(), 0);

        queue.shutdown();
    }

    #[test]
    fn test_queue_full_error_display() {
        assert_eq!(
            format!("{}", TaskError::QueueFull),
            "task rejected: queue is full"
        );
    }
}