1use std::{
13 future::Future,
14 pin::Pin,
15 sync::{
16 Arc,
17 atomic::{AtomicBool, AtomicUsize, Ordering},
18 },
19};
20
21use crossbeam_deque::Injector;
22use qubit_function::Callable;
23
24use qubit_executor::{TaskCompletionPair, TaskHandle};
25use qubit_lock::Monitor;
26
27use super::fixed_thread_pool_builder::FixedThreadPoolBuilder;
28use super::queue_steal_source::{steal_batch_and_pop, steal_one};
29use super::thread_pool::{ThreadPoolBuildError, ThreadPoolStats};
30use super::worker_queue::WorkerQueue;
31use super::worker_runtime::WorkerRuntime;
32use crate::thread_pool::PoolJob;
33use qubit_executor::service::{ExecutorService, RejectedExecution, ShutdownReport};
34
/// Maximum number of worker queues probed (round-robin) when trying to
/// place a new job on a worker-local queue before falling back to the
/// global injector queue.
const LOCAL_ENQUEUE_MAX_PROBES: usize = 4;

/// Pools with at most this many workers route submissions through
/// per-worker local queues; larger pools use only the global queue.
const LOCAL_QUEUE_WORKER_LIMIT: usize = 4;
39
/// Lifecycle phases of the fixed thread pool.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FixedThreadPoolLifecycle {
    /// Accepting submissions and executing work.
    Running,

    /// Graceful shutdown: no new work accepted, queued work still drains.
    Shutdown,

    /// Immediate stop: remaining work is cancelled and workers exit.
    Stopping,
}

impl FixedThreadPoolLifecycle {
    /// Returns `true` only while the pool is in the `Running` phase.
    const fn is_running(self) -> bool {
        match self {
            Self::Running => true,
            Self::Shutdown | Self::Stopping => false,
        }
    }
}
63
/// Monitor-guarded bookkeeping shared between pool handles and workers.
struct FixedThreadPoolState {
    /// Current lifecycle phase of the pool.
    lifecycle: FixedThreadPoolLifecycle,
    /// Worker threads registered (reserved or spawned) and not yet exited.
    live_workers: usize,
    /// Workers currently parked waiting for work.
    idle_workers: usize,
}
73
74impl FixedThreadPoolState {
75 fn new() -> Self {
81 Self {
82 lifecycle: FixedThreadPoolLifecycle::Running,
83 live_workers: 0,
84 idle_workers: 0,
85 }
86 }
87}
88
/// State shared by every `FixedThreadPool` handle and worker thread.
struct FixedThreadPoolInner {
    /// Fixed number of worker threads the pool was built with.
    pool_size: usize,
    /// Lifecycle and worker bookkeeping; workers block on this monitor.
    state: Monitor<FixedThreadPoolState>,
    /// `false` once any shutdown begins; gates new submissions.
    accepting: AtomicBool,
    /// `true` once `shutdown_now` (or a failed build) demands immediate stop.
    stop_now: AtomicBool,
    /// Submissions currently inside `submit` (see `FixedSubmitGuard`).
    inflight_submissions: AtomicUsize,
    /// Lock-free mirror of `FixedThreadPoolState::idle_workers`.
    idle_worker_count: AtomicUsize,
    /// Wake notifications issued but not yet consumed by a worker.
    pending_worker_wakes: AtomicUsize,
    /// Shared injector queue every worker can steal from.
    global_queue: Injector<PoolJob>,
    /// One queue per worker, indexed by worker index.
    worker_queues: Vec<Arc<WorkerQueue>>,
    /// Round-robin cursor used to pick a local enqueue target.
    next_enqueue_worker: AtomicUsize,
    /// Optional bound on queued tasks (`None` = unbounded).
    queue_capacity: Option<usize>,
    /// Tasks accepted but not yet claimed by a worker.
    queued_task_count: AtomicUsize,
    /// Tasks currently executing on a worker.
    running_task_count: AtomicUsize,
    /// Total tasks ever accepted for execution.
    submitted_task_count: AtomicUsize,
    /// Total tasks that finished running.
    completed_task_count: AtomicUsize,
    /// Total tasks cancelled before they could run.
    cancelled_task_count: AtomicUsize,
}
124
impl FixedThreadPoolInner {
    /// Builds the shared pool state.
    ///
    /// `worker_queues` must contain one queue per worker (the pool relies
    /// on index == worker index); `queue_capacity` of `None` means the
    /// queue is unbounded.
    fn new(
        pool_size: usize,
        queue_capacity: Option<usize>,
        worker_queues: Vec<Arc<WorkerQueue>>,
    ) -> Self {
        Self {
            pool_size,
            state: Monitor::new(FixedThreadPoolState::new()),
            accepting: AtomicBool::new(true),
            stop_now: AtomicBool::new(false),
            inflight_submissions: AtomicUsize::new(0),
            idle_worker_count: AtomicUsize::new(0),
            pending_worker_wakes: AtomicUsize::new(0),
            global_queue: Injector::new(),
            worker_queues,
            next_enqueue_worker: AtomicUsize::new(0),
            queue_capacity,
            queued_task_count: AtomicUsize::new(0),
            running_task_count: AtomicUsize::new(0),
            submitted_task_count: AtomicUsize::new(0),
            completed_task_count: AtomicUsize::new(0),
            cancelled_task_count: AtomicUsize::new(0),
        }
    }

    /// Number of worker threads the pool was built with.
    #[inline]
    fn pool_size(&self) -> usize {
        self.pool_size
    }

    /// Tasks accepted but not yet claimed by a worker.
    #[inline]
    fn queued_count(&self) -> usize {
        self.queued_task_count.load(Ordering::Acquire)
    }

    /// Tasks currently executing on a worker thread.
    #[inline]
    fn running_count(&self) -> usize {
        self.running_task_count.load(Ordering::Acquire)
    }

    /// Submissions currently inside `submit`.
    #[inline]
    fn inflight_count(&self) -> usize {
        self.inflight_submissions.load(Ordering::Acquire)
    }

    /// Admission check for a new submission.
    ///
    /// Increments `inflight_submissions` and then re-checks `accepting`,
    /// so a shutdown that raced past the first check is still observed.
    /// On that race the increment is rolled back and, if this was the last
    /// in-flight submission, shutdown waiters are notified.
    fn begin_submit(&self) -> Result<FixedSubmitGuard<'_>, RejectedExecution> {
        if !self.accepting.load(Ordering::Acquire) {
            return Err(RejectedExecution::Shutdown);
        }
        self.inflight_submissions.fetch_add(1, Ordering::AcqRel);
        if self.accepting.load(Ordering::Acquire) {
            Ok(FixedSubmitGuard { inner: self })
        } else {
            let previous = self.inflight_submissions.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool submit counter underflow");
            if previous == 1 {
                self.state.notify_all();
            }
            Err(RejectedExecution::Shutdown)
        }
    }

    /// Reserves one slot in the queued-task count.
    ///
    /// With a bounded queue this is a CAS loop that refuses to exceed
    /// `queue_capacity`; unbounded queues just increment.
    fn reserve_queue_slot(&self) -> bool {
        if let Some(capacity) = self.queue_capacity {
            loop {
                let current = self.queued_count();
                if current >= capacity {
                    return false;
                }
                if self
                    .queued_task_count
                    .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
                {
                    return true;
                }
            }
        }
        self.queued_task_count.fetch_add(1, Ordering::AcqRel);
        true
    }

    /// Admits and enqueues a job.
    ///
    /// Order matters: admission guard, capacity reservation, a final
    /// `accepting` re-check (rolling back the reservation on shutdown),
    /// and only then the actual enqueue.
    fn submit(&self, job: PoolJob) -> Result<(), RejectedExecution> {
        let _guard = self.begin_submit()?;
        if !self.reserve_queue_slot() {
            return Err(RejectedExecution::Saturated);
        }
        if !self.accepting.load(Ordering::Acquire) {
            let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool queued counter underflow");
            return Err(RejectedExecution::Shutdown);
        }
        self.submitted_task_count.fetch_add(1, Ordering::Relaxed);
        self.enqueue_job(job);
        Ok(())
    }

    /// Routes a job to a worker-local queue (small pools) or the global
    /// queue, then wakes at most one idle worker to claim it.
    fn enqueue_job(&self, job: PoolJob) {
        if self.use_worker_local_queues() {
            match self.try_enqueue_to_worker(job) {
                Ok(()) => {}
                Err(job) => self.global_queue.push(job),
            }
        } else {
            self.global_queue.push(job);
        }
        self.wake_one_idle_worker();
    }

    /// Signals one idle worker, issuing at most one pending wake per idle
    /// worker so repeated enqueues do not stack redundant notifications.
    fn wake_one_idle_worker(&self) {
        loop {
            let idle_workers = self.idle_worker_count.load(Ordering::Acquire);
            if idle_workers == 0 {
                return;
            }
            let pending_wakes = self.pending_worker_wakes.load(Ordering::Acquire);
            if pending_wakes >= idle_workers {
                // Every idle worker already has a wake on the way.
                return;
            }
            if self
                .pending_worker_wakes
                .compare_exchange_weak(
                    pending_wakes,
                    pending_wakes + 1,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                )
                .is_ok()
            {
                self.state.notify_one();
                return;
            }
        }
    }

    /// True when at least one wake has been issued but not yet consumed.
    fn has_pending_worker_wake(&self) -> bool {
        self.pending_worker_wakes.load(Ordering::Acquire) > 0
    }

    /// Consumes one pending wake token if any exist (CAS loop; a
    /// concurrent consumer may win, in which case nothing is consumed).
    fn consume_pending_worker_wake(&self) {
        let mut current = self.pending_worker_wakes.load(Ordering::Acquire);
        while current > 0 {
            match self.pending_worker_wakes.compare_exchange_weak(
                current,
                current - 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                Err(actual) => current = actual,
            }
        }
    }

    /// Probes up to `LOCAL_ENQUEUE_MAX_PROBES` worker queues round-robin
    /// and pushes onto the first active one; hands the job back in `Err`
    /// when no active queue is found.
    fn try_enqueue_to_worker(&self, job: PoolJob) -> Result<(), PoolJob> {
        let queue_count = self.worker_queues.len();
        debug_assert!(queue_count > 0, "fixed pool must have worker queues");
        let probe_count = queue_count.min(LOCAL_ENQUEUE_MAX_PROBES);
        for _ in 0..probe_count {
            let index = self.next_enqueue_worker.fetch_add(1, Ordering::Relaxed) % queue_count;
            let queue = &self.worker_queues[index];
            if queue.is_active() {
                queue.push_back(job);
                return Ok(());
            }
        }
        Err(job)
    }

    /// Claims the next job for a worker.
    ///
    /// Search order: the worker's local deque, its inbox, the global
    /// queue, then stealing from peers. Returns `None` (after cancelling
    /// the worker's held jobs) once `stop_now` is set. Pools above
    /// `LOCAL_QUEUE_WORKER_LIMIT` only consult the global queue.
    fn try_take_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if self.stop_now.load(Ordering::Acquire) {
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        if !self.use_worker_local_queues() {
            return self.steal_single_global_job(worker_runtime);
        }
        if let Some(job) = worker_runtime.local.pop() {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = worker_runtime.queue.pop_inbox_into(&worker_runtime.local) {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = self.steal_global_job(worker_runtime) {
            return Some(job);
        }
        self.steal_worker_job(worker_runtime)
    }

    /// Batch-steals from the global queue into the worker's local deque.
    ///
    /// If the batch left surplus local jobs, another worker is notified so
    /// the surplus can be picked up; falls back to a single-job steal.
    fn steal_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if let Some(job) = steal_batch_and_pop(&self.global_queue, &worker_runtime.local) {
            if !worker_runtime.local.is_empty() {
                self.state.notify_one();
            }
            return self.accept_claimed_job(job, worker_runtime);
        }
        self.steal_single_global_job(worker_runtime)
    }

    /// Steals exactly one job from the global queue.
    fn steal_single_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        steal_one(&self.global_queue).and_then(|job| self.accept_claimed_job(job, worker_runtime))
    }

    /// Steals work from a peer worker's queue.
    ///
    /// Victims are probed starting from a rotating offset for fairness;
    /// the worker's own queue and inactive queues are skipped. Surplus
    /// jobs pulled into the local deque trigger a peer notification.
    fn steal_worker_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        let queue_count = self.worker_queues.len();
        if queue_count <= 1 {
            return None;
        }
        let worker_index = worker_runtime.worker_index();
        let start = worker_runtime.next_steal_start(queue_count);
        for offset in 0..queue_count {
            let victim = &self.worker_queues[(start + offset) % queue_count];
            if victim.worker_index() == worker_index {
                continue;
            }
            if !victim.is_active() {
                continue;
            }
            if let Some(job) = victim.steal_into(&worker_runtime.local) {
                if !worker_runtime.local.is_empty() {
                    self.state.notify_one();
                }
                return self.accept_claimed_job(job, worker_runtime);
            }
        }
        None
    }

    /// Small pools distribute work through per-worker queues; larger pools
    /// rely solely on the global queue.
    fn use_worker_local_queues(&self) -> bool {
        self.pool_size <= LOCAL_QUEUE_WORKER_LIMIT
    }

    /// Final stop check for a freshly claimed job.
    ///
    /// If the pool is stopping, the claimed job and the worker's remaining
    /// backlog are cancelled; otherwise the job moves from the queued to
    /// the running counter and is handed to the caller.
    fn accept_claimed_job(&self, job: PoolJob, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if self.stop_now.load(Ordering::Acquire) {
            self.cancel_claimed_job(job);
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        self.mark_queued_job_running();
        Some(job)
    }

    /// Cancels everything still held in the worker's local deque and inbox.
    fn cancel_worker_jobs(&self, worker_runtime: &WorkerRuntime) {
        while let Some(job) = worker_runtime.local.pop() {
            self.cancel_claimed_job(job);
        }
        for job in worker_runtime.queue.drain() {
            self.cancel_claimed_job(job);
        }
    }

    /// Transfers one task from the queued counter to the running counter.
    fn mark_queued_job_running(&self) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.running_task_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Cancels a claimed-but-unstarted job: adjusts counters, runs the
    /// job's cancel path, and wakes termination waiters.
    fn cancel_claimed_job(&self, job: PoolJob) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.cancelled_task_count.fetch_add(1, Ordering::Relaxed);
        job.cancel();
        self.state.notify_all();
    }

    /// Records a job completion; when this was the last runner and the
    /// queue is empty, termination waiters are notified.
    fn finish_running_job(&self) {
        let previous = self.running_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool running counter underflow");
        self.completed_task_count.fetch_add(1, Ordering::Relaxed);
        if previous == 1 && self.queued_count() == 0 {
            self.state.notify_all();
        }
    }

    /// Registers a worker slot before its thread is spawned, so a spawn
    /// failure can be rolled back precisely.
    pub(crate) fn reserve_worker_slot(&self) {
        self.state.write(|state| {
            state.live_workers += 1;
        });
    }

    /// Undoes `reserve_worker_slot` after a failed thread spawn.
    pub(crate) fn rollback_worker_slot(&self) {
        self.state.write(|state| {
            state.live_workers = state
                .live_workers
                .checked_sub(1)
                .expect("fixed pool live worker counter underflow");
        });
    }

    /// Moves the pool straight to `Stopping` when worker spawning failed
    /// mid-build, so already-started workers exit promptly.
    pub(crate) fn stop_after_failed_build(&self) {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        self.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        });
        self.state.notify_all();
    }

    /// Blocks the calling thread until the pool is fully terminated.
    fn wait_for_termination(&self) {
        self.state
            .wait_until(|state| self.is_terminated_locked(state), |_| ());
    }

    /// Graceful shutdown: stop accepting new work and let queued work
    /// drain. Only transitions `Running -> Shutdown`; a `Stopping` pool
    /// stays stopping.
    fn shutdown(&self) {
        self.accepting.store(false, Ordering::Release);
        self.state.write(|state| {
            if state.lifecycle.is_running() {
                state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
            }
        });
        self.state.notify_all();
    }

    /// Immediate shutdown.
    ///
    /// Rejects new work, flags workers to stop, waits under the lock for
    /// in-flight submissions to settle, then drains and cancels every job
    /// still visible in the queues. Drained jobs are reported as both the
    /// queued-at-shutdown and cancelled counts.
    fn shutdown_now(&self) -> ShutdownReport {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        let running = self.running_count();
        let mut state = self.state.lock();
        state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        while self.inflight_count() > 0 {
            state = state.wait();
        }
        drop(state);
        let jobs = self.drain_visible_queued_jobs();
        let cancelled = jobs.len();
        for job in jobs {
            self.cancel_claimed_job(job);
        }
        self.state.notify_all();
        ShutdownReport::new(cancelled, running, cancelled)
    }

    /// Repeatedly drains the global and worker queues until a full pass
    /// finds nothing new (workers may be moving jobs concurrently).
    fn drain_visible_queued_jobs(&self) -> Vec<PoolJob> {
        let mut jobs = Vec::new();
        loop {
            let previous_count = jobs.len();
            self.drain_global_queue(&mut jobs);
            self.drain_worker_queues(&mut jobs);
            if jobs.len() == previous_count {
                return jobs;
            }
        }
    }

    /// Moves every currently stealable global-queue job into `jobs`.
    fn drain_global_queue(&self, jobs: &mut Vec<PoolJob>) {
        while let Some(job) = steal_one(&self.global_queue) {
            jobs.push(job);
        }
    }

    /// Drains every worker queue into `jobs`.
    fn drain_worker_queues(&self, jobs: &mut Vec<PoolJob>) {
        for queue in &self.worker_queues {
            jobs.extend(queue.drain());
        }
    }

    /// True once any shutdown (graceful or immediate) has begun.
    fn is_shutdown(&self) -> bool {
        self.state.read(|state| !state.lifecycle.is_running())
    }

    /// True once the pool has fully terminated.
    fn is_terminated(&self) -> bool {
        self.state.read(|state| self.is_terminated_locked(state))
    }

    /// Termination predicate; the caller must already hold the state lock
    /// (it receives the locked state).
    fn is_terminated_locked(&self, state: &FixedThreadPoolState) -> bool {
        !state.lifecycle.is_running()
            && state.live_workers == 0
            && self.queued_count() == 0
            && self.running_count() == 0
            && self.inflight_count() == 0
    }

    /// Snapshot of pool statistics. The atomic counters are read before
    /// the state lock is taken, so the snapshot is only approximately
    /// consistent under concurrent activity.
    fn stats(&self) -> ThreadPoolStats {
        let queued_tasks = self.queued_count();
        let running_tasks = self.running_count();
        let submitted_tasks = self.submitted_task_count.load(Ordering::Relaxed);
        let completed_tasks = self.completed_task_count.load(Ordering::Relaxed);
        let cancelled_tasks = self.cancelled_task_count.load(Ordering::Relaxed);
        self.state.read(|state| ThreadPoolStats {
            core_pool_size: self.pool_size,
            maximum_pool_size: self.pool_size,
            live_workers: state.live_workers,
            idle_workers: state.idle_workers,
            queued_tasks,
            running_tasks,
            submitted_tasks,
            completed_tasks,
            cancelled_tasks,
            shutdown: !state.lifecycle.is_running(),
            terminated: self.is_terminated_locked(state),
        })
    }
}
723
/// RAII guard that keeps a submission counted as in-flight for the
/// duration of `FixedThreadPoolInner::submit`.
struct FixedSubmitGuard<'a> {
    /// Pool whose `inflight_submissions` counter this guard holds.
    inner: &'a FixedThreadPoolInner,
}

impl Drop for FixedSubmitGuard<'_> {
    fn drop(&mut self) {
        // Release the in-flight slot; if this was the last submission and
        // admission is already closed, wake shutdown waiters blocked on
        // the in-flight count reaching zero.
        let previous = self
            .inner
            .inflight_submissions
            .fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool submit counter underflow");
        if previous == 1 && !self.inner.accepting.load(Ordering::Acquire) {
            self.inner.state.notify_all();
        }
    }
}
743
/// Fixed-size thread pool with a global injector queue and, for small
/// pools, per-worker work-stealing queues.
///
/// Dropping the pool triggers a graceful shutdown.
pub struct FixedThreadPool {
    /// State shared with every worker thread.
    inner: Arc<FixedThreadPoolInner>,
}
753
impl FixedThreadPool {
    /// Builds the pool and spawns `pool_size` worker threads named
    /// `"{thread_name_prefix}-{index}"`, optionally with a custom stack
    /// size.
    ///
    /// On a spawn failure the reserved worker slot is rolled back, the
    /// pool is put into the stopping state so already-running workers
    /// exit, and `ThreadPoolBuildError::SpawnWorker` is returned.
    pub(crate) fn build_with_options(
        pool_size: usize,
        queue_capacity: Option<usize>,
        thread_name_prefix: String,
        stack_size: Option<usize>,
    ) -> Result<Self, ThreadPoolBuildError> {
        // Create every runtime/queue pair up front so the inner state
        // knows all worker queues before any thread starts.
        let mut worker_runtimes = Vec::with_capacity(pool_size);
        let mut worker_queues = Vec::with_capacity(pool_size);
        for index in 0..pool_size {
            let worker_runtime = WorkerRuntime::new(index);
            worker_queues.push(Arc::clone(&worker_runtime.queue));
            worker_runtimes.push(worker_runtime);
        }
        let inner = Arc::new(FixedThreadPoolInner::new(
            pool_size,
            queue_capacity,
            worker_queues,
        ));
        for (index, worker_runtime) in worker_runtimes.into_iter().enumerate() {
            // Reserve before spawning so live_workers never undercounts.
            inner.reserve_worker_slot();
            let worker_inner = Arc::clone(&inner);
            let mut builder =
                std::thread::Builder::new().name(format!("{}-{}", thread_name_prefix, index));
            if let Some(stack_size) = stack_size {
                builder = builder.stack_size(stack_size);
            }
            if let Err(source) =
                builder.spawn(move || run_fixed_worker(worker_inner, worker_runtime))
            {
                inner.rollback_worker_slot();
                inner.stop_after_failed_build();
                return Err(ThreadPoolBuildError::SpawnWorker { index, source });
            }
        }
        Ok(Self { inner })
    }

    /// Builds a pool with `pool_size` workers and default options.
    pub fn new(pool_size: usize) -> Result<Self, ThreadPoolBuildError> {
        Self::builder().pool_size(pool_size).build()
    }

    /// Returns a builder for configuring a fixed pool.
    pub fn builder() -> FixedThreadPoolBuilder {
        FixedThreadPoolBuilder::new()
    }

    /// Number of worker threads the pool was built with.
    pub fn pool_size(&self) -> usize {
        self.inner.pool_size()
    }

    /// Tasks accepted but not yet running.
    pub fn queued_count(&self) -> usize {
        self.inner.queued_count()
    }

    /// Tasks currently executing.
    pub fn running_count(&self) -> usize {
        self.inner.running_count()
    }

    /// Worker threads currently registered with the pool.
    pub fn live_worker_count(&self) -> usize {
        self.inner.state.read(|state| state.live_workers)
    }

    /// Snapshot of pool statistics.
    pub fn stats(&self) -> ThreadPoolStats {
        self.inner.stats()
    }
}
880
impl Drop for FixedThreadPool {
    /// Initiates a graceful shutdown; does not wait for workers to finish.
    fn drop(&mut self) {
        self.inner.shutdown();
    }
}
887
impl ExecutorService for FixedThreadPool {
    // Results and errors only need to be Send to cross worker threads.
    type Handle<R, E>
        = TaskHandle<R, E>
    where
        R: Send + 'static,
        E: Send + 'static;

    type Termination<'a>
        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
    where
        Self: 'a;

    /// Wraps the callable in a pool job and submits it, returning a
    /// handle for the eventual result.
    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let (handle, completion) = TaskCompletionPair::new().into_parts();
        let job = PoolJob::from_task(task, completion);
        self.inner.submit(job)?;
        Ok(handle)
    }

    /// Graceful shutdown: stop accepting, let queued work drain.
    fn shutdown(&self) {
        self.inner.shutdown();
    }

    /// Immediate shutdown: cancel queued work and report what was dropped.
    fn shutdown_now(&self) -> ShutdownReport {
        self.inner.shutdown_now()
    }

    /// True once any shutdown has begun.
    fn is_shutdown(&self) -> bool {
        self.inner.is_shutdown()
    }

    /// True once all workers have exited and all work has drained.
    fn is_terminated(&self) -> bool {
        self.inner.is_terminated()
    }

    /// Future resolving once the pool terminates.
    ///
    /// NOTE(review): when polled this performs a *blocking* wait on the
    /// pool monitor rather than yielding — confirm callers only await it
    /// from contexts where blocking the executor thread is acceptable.
    fn await_termination(&self) -> Self::Termination<'_> {
        Box::pin(async move {
            self.inner.wait_for_termination();
        })
    }
}
969
/// Worker thread main loop.
///
/// Activates the worker's queue so enqueuers can target it, then
/// repeatedly claims and runs jobs, parking via
/// `wait_for_fixed_pool_work` when none are available. Deregisters the
/// worker on exit.
fn run_fixed_worker(inner: Arc<FixedThreadPoolInner>, worker_runtime: WorkerRuntime) {
    worker_runtime.queue.activate();
    loop {
        if let Some(job) = inner.try_take_job(&worker_runtime) {
            job.run();
            inner.finish_running_job();
            continue;
        }
        // No job claimable: park until work arrives or the pool winds down.
        if !wait_for_fixed_pool_work(&inner) {
            break;
        }
    }
    worker_exited(&inner, &worker_runtime.queue);
}
990
/// Blocks a worker until there is work to claim (`true`) or the worker
/// should exit (`false`).
///
/// While `Running` the worker parks until notified; while `Shutdown` it
/// keeps draining until both the queue and in-flight submissions are
/// empty; `Stopping` exits immediately.
fn wait_for_fixed_pool_work(inner: &FixedThreadPoolInner) -> bool {
    let mut state = inner.state.lock();
    loop {
        match state.lifecycle {
            FixedThreadPoolLifecycle::Running => {
                if inner.queued_count() > 0 {
                    return true;
                }
                // Publish idleness, then re-check the queue and pending
                // wakes to close the race with an enqueue that happened
                // before this worker became visible as idle.
                mark_fixed_worker_idle(inner, &mut state);
                if inner.queued_count() > 0 || inner.has_pending_worker_wake() {
                    unmark_fixed_worker_idle(inner, &mut state);
                    return true;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            FixedThreadPoolLifecycle::Shutdown => {
                if inner.queued_count() > 0 {
                    return true;
                }
                // Graceful shutdown completes only once queued work and
                // in-flight submissions have both drained.
                if inner.queued_count() == 0 && inner.inflight_count() == 0 {
                    return false;
                }
                mark_fixed_worker_idle(inner, &mut state);
                if inner.queued_count() > 0
                    || inner.inflight_count() == 0
                    || inner.has_pending_worker_wake()
                {
                    // Conditions changed while marking idle; re-evaluate
                    // from the top instead of sleeping.
                    unmark_fixed_worker_idle(inner, &mut state);
                    continue;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            // Immediate stop: exit without draining.
            FixedThreadPoolLifecycle::Stopping => return false,
        }
    }
}
1039
/// Records a worker as idle in both the locked state and the lock-free
/// mirror counter. Caller must hold the state lock.
fn mark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
    state.idle_workers += 1;
    inner.idle_worker_count.fetch_add(1, Ordering::AcqRel);
}
1050
/// Removes a worker from the idle counts and consumes one pending wake
/// token (if any), since this worker is now the one acting on the wake.
/// Caller must hold the state lock.
fn unmark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
    state.idle_workers = state
        .idle_workers
        .checked_sub(1)
        .expect("fixed pool idle worker counter underflow");
    let previous = inner.idle_worker_count.fetch_sub(1, Ordering::AcqRel);
    debug_assert!(previous > 0, "fixed pool idle worker counter underflow");
    inner.consume_pending_worker_wake();
}
1066
/// Final deregistration for an exiting worker: deactivates its queue so
/// enqueuers stop targeting it, decrements the live-worker count, and
/// wakes anyone waiting on termination.
fn worker_exited(inner: &FixedThreadPoolInner, worker_queue: &WorkerQueue) {
    worker_queue.deactivate();
    inner.state.write(|state| {
        state.live_workers = state
            .live_workers
            .checked_sub(1)
            .expect("fixed pool live worker counter underflow");
    });
    inner.state.notify_all();
}
1083
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };
    use std::thread;
    use std::time::Duration;

    /// Builds a job that bumps `ran` when executed and `cancelled` when
    /// cancelled, so tests can count both outcomes.
    fn counted_job(cancelled: Arc<AtomicUsize>, ran: Arc<AtomicUsize>) -> PoolJob {
        PoolJob::new(
            Box::new(move || {
                ran.fetch_add(1, Ordering::AcqRel);
            }),
            Box::new(move || {
                cancelled.fetch_add(1, Ordering::AcqRel);
            }),
        )
    }

    // With stop_now set, accepting a claimed job must cancel it AND drain
    // the worker's local queue and inbox, without running anything.
    #[test]
    fn test_accept_claimed_job_stop_now_cancels_claimed_and_worker_queues() {
        let runtime = WorkerRuntime::new(0);
        runtime.queue.activate();
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        inner.stop_now.store(true, Ordering::Release);

        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        runtime
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        // Pretend all three jobs were counted as queued at submission.
        inner.queued_task_count.store(3, Ordering::Release);

        let accepted =
            inner.accept_claimed_job(counted_job(cancelled.clone(), ran.clone()), &runtime);
        assert!(accepted.is_none());
        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 3);
        assert_eq!(cancelled.load(Ordering::Acquire), 3);
        assert_eq!(ran.load(Ordering::Acquire), 0);
    }

    // A global batch steal that leaves surplus jobs in the local deque
    // must still hand back exactly one claimed job, with counters
    // balancing out after the surplus is cancelled.
    #[test]
    fn test_steal_global_job_notifies_when_batch_leaves_local_jobs() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner
            .global_queue
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        let claimed = inner
            .steal_global_job(&runtime)
            .expect("global queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        let remaining = runtime
            .local
            .pop()
            .expect("preloaded local job should remain queued");
        inner.cancel_claimed_job(remaining);

        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    // Same invariant as above, but for a steal from a peer worker's queue.
    #[test]
    fn test_steal_worker_job_notifies_when_batch_leaves_local_jobs() {
        let thief = WorkerRuntime::new(0);
        let victim = WorkerRuntime::new(1);
        thief.queue.activate();
        victim.queue.activate();
        let inner = FixedThreadPoolInner::new(
            2,
            None,
            vec![Arc::clone(&thief.queue), Arc::clone(&victim.queue)],
        );
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        thief
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        victim
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        let claimed = inner
            .steal_worker_job(&thief)
            .expect("victim queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        let remaining = thief
            .local
            .pop()
            .expect("batch steal should leave one local job");
        inner.cancel_claimed_job(remaining);

        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    // Dropping the last submit guard after admission has closed must
    // release the in-flight slot (and notify shutdown waiters).
    #[test]
    fn test_fixed_submit_guard_drop_notifies_when_shutdown_closes_admission() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.accepting.store(false, Ordering::Release);

        {
            let guard = FixedSubmitGuard { inner: &inner };
            drop(guard);
        }

        assert_eq!(inner.inflight_count(), 0);
    }

    // A worker parked in the Shutdown phase must stay parked while a
    // submission is in flight and exit once it drains.
    #[test]
    fn test_wait_for_fixed_pool_work_shutdown_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        inner.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
        });
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.pending_worker_wakes.store(1, Ordering::Release);

        // Release the in-flight submission from another thread shortly
        // after the worker parks.
        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        assert!(!wait_for_fixed_pool_work(&inner));
        releaser.join().expect("releaser thread should finish");
    }

    // shutdown_now must block until in-flight submissions settle and
    // report zero work when nothing was queued or running.
    #[test]
    fn test_shutdown_now_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        inner.inflight_submissions.store(1, Ordering::Release);

        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        let report = inner.shutdown_now();
        releaser.join().expect("releaser thread should finish");
        assert_eq!(report.running, 0);
        assert_eq!(report.queued, 0);
        assert_eq!(report.cancelled, 0);
    }
}
1271}