1use std::{
13 future::Future,
14 pin::Pin,
15 sync::{
16 Arc,
17 atomic::{
18 AtomicBool,
19 AtomicUsize,
20 Ordering,
21 },
22 },
23};
24
25use crossbeam_deque::Injector;
26use qubit_function::Callable;
27
28use qubit_executor::{
29 TaskCompletionPair,
30 TaskHandle,
31};
32use qubit_lock::Monitor;
33
34use super::fixed_thread_pool_builder::FixedThreadPoolBuilder;
35use super::queue_steal_source::{
36 steal_batch_and_pop,
37 steal_one,
38};
39use super::thread_pool::{
40 ThreadPoolBuildError,
41 ThreadPoolStats,
42};
43use super::worker_queue::WorkerQueue;
44use super::worker_runtime::WorkerRuntime;
45use crate::thread_pool::PoolJob;
46use qubit_executor::service::{
47 ExecutorService,
48 RejectedExecution,
49 ShutdownReport,
50};
51
/// Maximum number of worker inboxes probed (round-robin) when trying to place
/// a new job on a worker-local queue before falling back to the global queue.
const LOCAL_ENQUEUE_MAX_PROBES: usize = 4;
/// Pools with more workers than this skip the worker-local queues entirely and
/// funnel all jobs through the global queue (see `use_worker_local_queues`).
const LOCAL_QUEUE_WORKER_LIMIT: usize = 4;
56
/// Lifecycle phase of a fixed thread pool, stored inside the `Monitor`-guarded
/// pool state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FixedThreadPoolLifecycle {
    /// Accepting submissions and executing queued work.
    Running,

    /// Graceful shutdown: no new submissions, but queued work still drains.
    Shutdown,

    /// Immediate stop: queued work is cancelled and workers exit as soon as
    /// they observe the flag.
    Stopping,
}
69
70impl FixedThreadPoolLifecycle {
71 const fn is_running(self) -> bool {
77 matches!(self, Self::Running)
78 }
79}
80
/// Mutable pool bookkeeping protected by the pool's `Monitor` lock.
struct FixedThreadPoolState {
    // Current lifecycle phase; transitions Running -> Shutdown / Stopping.
    lifecycle: FixedThreadPoolLifecycle,
    // Worker threads that hold a reserved slot and have not yet exited.
    live_workers: usize,
    // Workers currently parked in `wait_for_fixed_pool_work` (lock-side
    // mirror of the atomic `idle_worker_count`).
    idle_workers: usize,
}
90
91impl FixedThreadPoolState {
92 fn new() -> Self {
98 Self {
99 lifecycle: FixedThreadPoolLifecycle::Running,
100 live_workers: 0,
101 idle_workers: 0,
102 }
103 }
104}
105
/// Shared core of a fixed thread pool, referenced by the pool handle and by
/// every worker thread via `Arc`.
struct FixedThreadPoolInner {
    // Number of worker threads this pool was built with (immutable).
    pool_size: usize,
    // Lock + condvar guarding lifecycle and worker counts.
    state: Monitor<FixedThreadPoolState>,
    // Admission flag: cleared by shutdown()/shutdown_now() to reject submits.
    accepting: AtomicBool,
    // Hard-stop flag: set by shutdown_now(); workers cancel instead of run.
    stop_now: AtomicBool,
    // Submissions currently inside submit(); shutdown_now() waits for zero.
    inflight_submissions: AtomicUsize,
    // Lock-free mirror of state.idle_workers, readable without the lock.
    idle_worker_count: AtomicUsize,
    // Outstanding wake tokens handed out by wake_one_idle_worker(); bounds
    // notify_one() calls so each enqueue wakes at most one parked worker.
    pending_worker_wakes: AtomicUsize,
    // MPMC overflow/global queue shared by all workers.
    global_queue: Injector<PoolJob>,
    // Per-worker inbox queues, indexed by worker index.
    worker_queues: Vec<Arc<WorkerQueue>>,
    // Round-robin cursor for picking a worker queue on enqueue.
    next_enqueue_worker: AtomicUsize,
    // Optional bound on queued (not yet running) jobs; None = unbounded.
    queue_capacity: Option<usize>,
    // Jobs accepted but not yet claimed by a worker.
    queued_task_count: AtomicUsize,
    // Jobs currently executing on a worker.
    running_task_count: AtomicUsize,
    // Monotonic counters for stats() reporting.
    submitted_task_count: AtomicUsize,
    completed_task_count: AtomicUsize,
    cancelled_task_count: AtomicUsize,
}
141
impl FixedThreadPoolInner {
    /// Builds the shared core. `worker_queues` must hold one queue per worker,
    /// in worker-index order (the builder guarantees this).
    fn new(
        pool_size: usize,
        queue_capacity: Option<usize>,
        worker_queues: Vec<Arc<WorkerQueue>>,
    ) -> Self {
        Self {
            pool_size,
            state: Monitor::new(FixedThreadPoolState::new()),
            accepting: AtomicBool::new(true),
            stop_now: AtomicBool::new(false),
            inflight_submissions: AtomicUsize::new(0),
            idle_worker_count: AtomicUsize::new(0),
            pending_worker_wakes: AtomicUsize::new(0),
            global_queue: Injector::new(),
            worker_queues,
            next_enqueue_worker: AtomicUsize::new(0),
            queue_capacity,
            queued_task_count: AtomicUsize::new(0),
            running_task_count: AtomicUsize::new(0),
            submitted_task_count: AtomicUsize::new(0),
            completed_task_count: AtomicUsize::new(0),
            cancelled_task_count: AtomicUsize::new(0),
        }
    }

    /// Configured number of worker threads.
    #[inline]
    fn pool_size(&self) -> usize {
        self.pool_size
    }

    /// Jobs accepted but not yet claimed by a worker.
    #[inline]
    fn queued_count(&self) -> usize {
        self.queued_task_count.load(Ordering::Acquire)
    }

    /// Jobs currently executing.
    #[inline]
    fn running_count(&self) -> usize {
        self.running_task_count.load(Ordering::Acquire)
    }

    /// Submissions currently inside `submit()`.
    #[inline]
    fn inflight_count(&self) -> usize {
        self.inflight_submissions.load(Ordering::Acquire)
    }

    /// Admission gate for a submission. Uses a double-checked pattern:
    /// `accepting` is re-read *after* bumping `inflight_submissions` so that
    /// `shutdown_now()`, which flips `accepting` and then waits for the
    /// inflight counter to reach zero, cannot race past a submission it did
    /// not see. On the losing side of the race the counter is rolled back and
    /// waiters are notified if this was the last inflight submission.
    fn begin_submit(&self) -> Result<FixedSubmitGuard<'_>, RejectedExecution> {
        if !self.accepting.load(Ordering::Acquire) {
            return Err(RejectedExecution::Shutdown);
        }
        self.inflight_submissions.fetch_add(1, Ordering::AcqRel);
        if self.accepting.load(Ordering::Acquire) {
            Ok(FixedSubmitGuard { inner: self })
        } else {
            let previous = self.inflight_submissions.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool submit counter underflow");
            if previous == 1 {
                self.state.notify_all();
            }
            Err(RejectedExecution::Shutdown)
        }
    }

    /// Reserves one queued-job slot. With a capacity bound this is a CAS loop
    /// that fails (returns `false`) when the queue is full; without a bound it
    /// is a plain increment.
    fn reserve_queue_slot(&self) -> bool {
        if let Some(capacity) = self.queue_capacity {
            loop {
                let current = self.queued_count();
                if current >= capacity {
                    return false;
                }
                if self
                    .queued_task_count
                    .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
                {
                    return true;
                }
            }
        }
        self.queued_task_count.fetch_add(1, Ordering::AcqRel);
        true
    }

    /// Full submission path: admission guard, capacity reservation, a final
    /// admission re-check (rolling back the reserved slot on a concurrent
    /// shutdown), then enqueue. The guard's `Drop` releases the inflight
    /// ticket on every exit path, including the error returns.
    fn submit(&self, job: PoolJob) -> Result<(), RejectedExecution> {
        let _guard = self.begin_submit()?;
        if !self.reserve_queue_slot() {
            return Err(RejectedExecution::Saturated);
        }
        if !self.accepting.load(Ordering::Acquire) {
            let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool queued counter underflow");
            return Err(RejectedExecution::Shutdown);
        }
        self.submitted_task_count.fetch_add(1, Ordering::Relaxed);
        self.enqueue_job(job);
        Ok(())
    }

    /// Routes a job to a worker-local queue when the pool is small enough,
    /// falling back to the global queue, then wakes at most one idle worker.
    fn enqueue_job(&self, job: PoolJob) {
        if self.use_worker_local_queues() {
            match self.try_enqueue_to_worker(job) {
                Ok(()) => {}
                Err(job) => self.global_queue.push(job),
            }
        } else {
            self.global_queue.push(job);
        }
        self.wake_one_idle_worker();
    }

    /// Hands out a "wake token" and signals the condvar, but only when there
    /// is an idle worker that does not already have a pending token. This
    /// bounds `notify_one` calls so concurrent enqueues don't all collapse
    /// onto the same already-woken worker.
    fn wake_one_idle_worker(&self) {
        loop {
            let idle_workers = self.idle_worker_count.load(Ordering::Acquire);
            if idle_workers == 0 {
                return;
            }
            let pending_wakes = self.pending_worker_wakes.load(Ordering::Acquire);
            if pending_wakes >= idle_workers {
                return;
            }
            if self
                .pending_worker_wakes
                .compare_exchange_weak(
                    pending_wakes,
                    pending_wakes + 1,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                )
                .is_ok()
            {
                self.state.notify_one();
                return;
            }
        }
    }

    /// True when at least one wake token is outstanding.
    fn has_pending_worker_wake(&self) -> bool {
        self.pending_worker_wakes.load(Ordering::Acquire) > 0
    }

    /// Consumes one wake token if any exist (CAS loop; no-op at zero).
    /// Called by a worker leaving the idle state so its token is retired.
    fn consume_pending_worker_wake(&self) {
        let mut current = self.pending_worker_wakes.load(Ordering::Acquire);
        while current > 0 {
            match self.pending_worker_wakes.compare_exchange_weak(
                current,
                current - 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                Err(actual) => current = actual,
            }
        }
    }

    /// Tries to push the job onto an *active* worker queue, probing up to
    /// `LOCAL_ENQUEUE_MAX_PROBES` queues round-robin. Returns the job back to
    /// the caller if every probed queue is inactive (e.g. during startup or
    /// after a worker exits) so it can go to the global queue instead.
    fn try_enqueue_to_worker(&self, job: PoolJob) -> Result<(), PoolJob> {
        let queue_count = self.worker_queues.len();
        debug_assert!(queue_count > 0, "fixed pool must have worker queues");
        let probe_count = queue_count.min(LOCAL_ENQUEUE_MAX_PROBES);
        for _ in 0..probe_count {
            let index = self.next_enqueue_worker.fetch_add(1, Ordering::Relaxed) % queue_count;
            let queue = &self.worker_queues[index];
            if queue.is_active() {
                queue.push_back(job);
                return Ok(());
            }
        }
        Err(job)
    }

    /// Worker-side job acquisition, in priority order: hard-stop check (which
    /// cancels everything this worker can see), global-only mode for large
    /// pools, then local deque -> own inbox -> global queue -> stealing from
    /// sibling workers.
    fn try_take_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if self.stop_now.load(Ordering::Acquire) {
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        if !self.use_worker_local_queues() {
            return self.steal_single_global_job(worker_runtime);
        }
        if let Some(job) = worker_runtime.local.pop() {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = worker_runtime.queue.pop_inbox_into(&worker_runtime.local) {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = self.steal_global_job(worker_runtime) {
            return Some(job);
        }
        self.steal_worker_job(worker_runtime)
    }

    /// Batch-steals from the global queue into this worker's local deque.
    /// If the batch left extra jobs behind locally, another worker is woken
    /// so those jobs can in turn be stolen rather than sit idle.
    fn steal_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if let Some(job) = steal_batch_and_pop(&self.global_queue, &worker_runtime.local) {
            if !worker_runtime.local.is_empty() {
                self.state.notify_one();
            }
            return self.accept_claimed_job(job, worker_runtime);
        }
        self.steal_single_global_job(worker_runtime)
    }

    /// Steals exactly one job from the global queue (no batching).
    fn steal_single_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        steal_one(&self.global_queue).and_then(|job| self.accept_claimed_job(job, worker_runtime))
    }

    /// Steals from sibling workers' queues, scanning all queues starting from
    /// a rotating offset (to spread contention), skipping self and inactive
    /// victims. Wakes another worker if the batch left extra local jobs.
    fn steal_worker_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        let queue_count = self.worker_queues.len();
        if queue_count <= 1 {
            return None;
        }
        let worker_index = worker_runtime.worker_index();
        let start = worker_runtime.next_steal_start(queue_count);
        for offset in 0..queue_count {
            let victim = &self.worker_queues[(start + offset) % queue_count];
            if victim.worker_index() == worker_index {
                continue;
            }
            if !victim.is_active() {
                continue;
            }
            if let Some(job) = victim.steal_into(&worker_runtime.local) {
                if !worker_runtime.local.is_empty() {
                    self.state.notify_one();
                }
                return self.accept_claimed_job(job, worker_runtime);
            }
        }
        None
    }

    /// Worker-local queues only pay off for small pools; larger pools route
    /// everything through the global queue.
    fn use_worker_local_queues(&self) -> bool {
        self.pool_size <= LOCAL_QUEUE_WORKER_LIMIT
    }

    /// Final gate on a claimed job: if a hard stop raced in after the claim,
    /// the claimed job and everything else visible to this worker is
    /// cancelled; otherwise the job transitions from queued to running.
    fn accept_claimed_job(&self, job: PoolJob, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if self.stop_now.load(Ordering::Acquire) {
            self.cancel_claimed_job(job);
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        self.mark_queued_job_running();
        Some(job)
    }

    /// Cancels every job in this worker's local deque and inbox queue.
    fn cancel_worker_jobs(&self, worker_runtime: &WorkerRuntime) {
        while let Some(job) = worker_runtime.local.pop() {
            self.cancel_claimed_job(job);
        }
        for job in worker_runtime.queue.drain() {
            self.cancel_claimed_job(job);
        }
    }

    /// Counter transition queued -> running for one job.
    fn mark_queued_job_running(&self) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.running_task_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Cancels a job that still holds a queued slot: releases the slot, bumps
    /// the cancelled counter, runs the job's cancel callback, and notifies
    /// termination waiters (queued may have just reached zero).
    fn cancel_claimed_job(&self, job: PoolJob) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.cancelled_task_count.fetch_add(1, Ordering::Relaxed);
        job.cancel();
        self.state.notify_all();
    }

    /// Counter transition running -> completed; notifies termination waiters
    /// when the last running job finishes and nothing is queued.
    fn finish_running_job(&self) {
        let previous = self.running_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool running counter underflow");
        self.completed_task_count.fetch_add(1, Ordering::Relaxed);
        if previous == 1 && self.queued_count() == 0 {
            self.state.notify_all();
        }
    }

    /// Registers a worker slot before its thread is spawned, so a spawn
    /// failure can be rolled back deterministically.
    pub(crate) fn reserve_worker_slot(&self) {
        self.state.write(|state| {
            state.live_workers += 1;
        });
    }

    /// Undoes `reserve_worker_slot` after a failed thread spawn.
    pub(crate) fn rollback_worker_slot(&self) {
        self.state.write(|state| {
            state.live_workers = state
                .live_workers
                .checked_sub(1)
                .expect("fixed pool live worker counter underflow");
        });
    }

    /// Aborts the pool after a partial build: closes admission, sets the
    /// hard-stop flag, forces `Stopping`, and wakes any workers that already
    /// started so they exit.
    pub(crate) fn stop_after_failed_build(&self) {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        self.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        });
        self.state.notify_all();
    }

    /// Blocks until the pool is fully terminated (see `is_terminated_locked`).
    fn wait_for_termination(&self) {
        self.state
            .wait_until(|state| self.is_terminated_locked(state), |_| ());
    }

    /// Graceful shutdown: closes admission and moves Running -> Shutdown
    /// (never downgrades a `Stopping` pool). Queued work still drains.
    fn shutdown(&self) {
        self.accepting.store(false, Ordering::Release);
        self.state.write(|state| {
            if state.lifecycle.is_running() {
                state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
            }
        });
        self.state.notify_all();
    }

    /// Hard shutdown: closes admission, sets the hard-stop flag, waits (under
    /// the lock) for in-flight submissions to drain so no job can slip past
    /// the cancellation sweep, then cancels everything visible in the queues.
    /// NOTE(review): `running` is sampled before the wait, and the report is
    /// built as `new(cancelled, running, cancelled)` — queued and cancelled
    /// are intentionally equal here since every drained job is cancelled;
    /// confirm against `ShutdownReport::new`'s parameter order.
    fn shutdown_now(&self) -> ShutdownReport {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        let running = self.running_count();
        let mut state = self.state.lock();
        state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        while self.inflight_count() > 0 {
            state = state.wait();
        }
        drop(state);
        let jobs = self.drain_visible_queued_jobs();
        let cancelled = jobs.len();
        for job in jobs {
            self.cancel_claimed_job(job);
        }
        self.state.notify_all();
        ShutdownReport::new(cancelled, running, cancelled)
    }

    /// Repeatedly drains the global and worker queues until a full pass finds
    /// nothing new — jobs can migrate between queues while draining.
    fn drain_visible_queued_jobs(&self) -> Vec<PoolJob> {
        let mut jobs = Vec::new();
        loop {
            let previous_count = jobs.len();
            self.drain_global_queue(&mut jobs);
            self.drain_worker_queues(&mut jobs);
            if jobs.len() == previous_count {
                return jobs;
            }
        }
    }

    /// Moves every job currently stealable from the global queue into `jobs`.
    fn drain_global_queue(&self, jobs: &mut Vec<PoolJob>) {
        while let Some(job) = steal_one(&self.global_queue) {
            jobs.push(job);
        }
    }

    /// Drains every worker inbox queue into `jobs`.
    fn drain_worker_queues(&self, jobs: &mut Vec<PoolJob>) {
        for queue in &self.worker_queues {
            jobs.extend(queue.drain());
        }
    }

    /// True once the pool has left the `Running` phase.
    fn is_shutdown(&self) -> bool {
        self.state.read(|state| !state.lifecycle.is_running())
    }

    /// True once the pool is fully terminated.
    fn is_terminated(&self) -> bool {
        self.state.read(|state| self.is_terminated_locked(state))
    }

    /// Termination predicate, evaluated with the state lock held: shut down,
    /// all workers exited, and no queued, running, or in-flight work remains.
    fn is_terminated_locked(&self, state: &FixedThreadPoolState) -> bool {
        !state.lifecycle.is_running()
            && state.live_workers == 0
            && self.queued_count() == 0
            && self.running_count() == 0
            && self.inflight_count() == 0
    }

    /// Snapshot of pool statistics. Atomic counters are sampled before taking
    /// the state lock, so the snapshot is approximate under concurrency.
    fn stats(&self) -> ThreadPoolStats {
        let queued_tasks = self.queued_count();
        let running_tasks = self.running_count();
        let submitted_tasks = self.submitted_task_count.load(Ordering::Relaxed);
        let completed_tasks = self.completed_task_count.load(Ordering::Relaxed);
        let cancelled_tasks = self.cancelled_task_count.load(Ordering::Relaxed);
        self.state.read(|state| ThreadPoolStats {
            core_pool_size: self.pool_size,
            maximum_pool_size: self.pool_size,
            live_workers: state.live_workers,
            idle_workers: state.idle_workers,
            queued_tasks,
            running_tasks,
            submitted_tasks,
            completed_tasks,
            cancelled_tasks,
            shutdown: !state.lifecycle.is_running(),
            terminated: self.is_terminated_locked(state),
        })
    }
}
740
/// RAII ticket for one in-flight submission. Created by `begin_submit`;
/// dropping it decrements `inflight_submissions` on every exit path of
/// `submit`, including early error returns.
struct FixedSubmitGuard<'a> {
    // Pool core whose inflight counter this guard holds elevated.
    inner: &'a FixedThreadPoolInner,
}
746
impl Drop for FixedSubmitGuard<'_> {
    /// Releases this submission's inflight ticket. If admission has been
    /// closed and this was the last in-flight submission, waiters (e.g.
    /// `shutdown_now`, termination waits) are notified so they can proceed.
    fn drop(&mut self) {
        let previous = self
            .inner
            .inflight_submissions
            .fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool submit counter underflow");
        if previous == 1 && !self.inner.accepting.load(Ordering::Acquire) {
            self.inner.state.notify_all();
        }
    }
}
760
/// A thread pool with a fixed number of worker threads and an optional bound
/// on the number of queued jobs. Dropping the pool triggers a graceful
/// shutdown (queued work still drains) without waiting for workers to exit.
pub struct FixedThreadPool {
    // Shared core; each worker thread holds another `Arc` to it.
    inner: Arc<FixedThreadPoolInner>,
}
770
771impl FixedThreadPool {
772 pub(crate) fn build_with_options(
789 pool_size: usize,
790 queue_capacity: Option<usize>,
791 thread_name_prefix: String,
792 stack_size: Option<usize>,
793 ) -> Result<Self, ThreadPoolBuildError> {
794 let mut worker_runtimes = Vec::with_capacity(pool_size);
795 let mut worker_queues = Vec::with_capacity(pool_size);
796 for index in 0..pool_size {
797 let worker_runtime = WorkerRuntime::new(index);
798 worker_queues.push(Arc::clone(&worker_runtime.queue));
799 worker_runtimes.push(worker_runtime);
800 }
801 let inner = Arc::new(FixedThreadPoolInner::new(
802 pool_size,
803 queue_capacity,
804 worker_queues,
805 ));
806 for (index, worker_runtime) in worker_runtimes.into_iter().enumerate() {
807 inner.reserve_worker_slot();
808 let worker_inner = Arc::clone(&inner);
809 let mut builder =
810 std::thread::Builder::new().name(format!("{}-{}", thread_name_prefix, index));
811 if let Some(stack_size) = stack_size {
812 builder = builder.stack_size(stack_size);
813 }
814 if let Err(source) =
815 builder.spawn(move || run_fixed_worker(worker_inner, worker_runtime))
816 {
817 inner.rollback_worker_slot();
818 inner.stop_after_failed_build();
819 return Err(ThreadPoolBuildError::SpawnWorker { index, source });
820 }
821 }
822 Ok(Self { inner })
823 }
824
825 pub fn new(pool_size: usize) -> Result<Self, ThreadPoolBuildError> {
840 Self::builder().pool_size(pool_size).build()
841 }
842
843 pub fn builder() -> FixedThreadPoolBuilder {
849 FixedThreadPoolBuilder::new()
850 }
851
852 pub fn pool_size(&self) -> usize {
858 self.inner.pool_size()
859 }
860
861 pub fn queued_count(&self) -> usize {
867 self.inner.queued_count()
868 }
869
870 pub fn running_count(&self) -> usize {
876 self.inner.running_count()
877 }
878
879 pub fn live_worker_count(&self) -> usize {
885 self.inner.state.read(|state| state.live_workers)
886 }
887
888 pub fn stats(&self) -> ThreadPoolStats {
894 self.inner.stats()
895 }
896}
897
impl Drop for FixedThreadPool {
    /// Starts a graceful shutdown when the handle is dropped. Does not block:
    /// queued work still drains on the (detached) worker threads.
    fn drop(&mut self) {
        self.inner.shutdown();
    }
}
904
impl ExecutorService for FixedThreadPool {
    // Per-task handle type exposed to submitters.
    type Handle<R, E>
        = TaskHandle<R, E>
    where
        R: Send + 'static,
        E: Send + 'static;

    // Future returned by `await_termination`; boxed because the concrete
    // async-block type cannot be named.
    type Termination<'a>
        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
    where
        Self: 'a;

    /// Wraps the callable in a pool job plus completion channel, submits it,
    /// and returns the handle. Fails with `RejectedExecution::Shutdown` when
    /// admission is closed or `Saturated` when the queue capacity is reached.
    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let (handle, completion) = TaskCompletionPair::new().into_parts();
        let job = PoolJob::from_task(task, completion);
        self.inner.submit(job)?;
        Ok(handle)
    }

    /// Graceful shutdown: stop accepting, drain queued work.
    fn shutdown(&self) {
        self.inner.shutdown();
    }

    /// Hard shutdown: stop accepting and cancel queued work; returns a report
    /// of what was cancelled / still running.
    fn shutdown_now(&self) -> ShutdownReport {
        self.inner.shutdown_now()
    }

    /// True once the pool has left the running phase.
    fn is_shutdown(&self) -> bool {
        self.inner.is_shutdown()
    }

    /// True once shut down with all workers exited and no outstanding work.
    fn is_terminated(&self) -> bool {
        self.inner.is_terminated()
    }

    /// Resolves once the pool is terminated. NOTE(review): the wait inside is
    /// a *blocking* condvar wait, so this future blocks its executor thread
    /// while pending — confirm callers run it on a thread that may block.
    fn await_termination(&self) -> Self::Termination<'_> {
        Box::pin(async move {
            self.inner.wait_for_termination();
        })
    }
}
986
987fn run_fixed_worker(inner: Arc<FixedThreadPoolInner>, worker_runtime: WorkerRuntime) {
994 worker_runtime.queue.activate();
995 loop {
996 if let Some(job) = inner.try_take_job(&worker_runtime) {
997 job.run();
998 inner.finish_running_job();
999 continue;
1000 }
1001 if !wait_for_fixed_pool_work(&inner) {
1002 break;
1003 }
1004 }
1005 worker_exited(&inner, &worker_runtime.queue);
1006}
1007
/// Parks a worker until work appears or the pool winds down. Returns `true`
/// when the worker should retry taking a job, `false` when it should exit.
///
/// The idle mark / re-check / wait sequence runs under the state lock: after
/// marking itself idle the worker re-checks the queues and wake tokens, so a
/// job enqueued between the first check and the mark cannot be lost to a
/// missed notification.
fn wait_for_fixed_pool_work(inner: &FixedThreadPoolInner) -> bool {
    let mut state = inner.state.lock();
    loop {
        match state.lifecycle {
            FixedThreadPoolLifecycle::Running => {
                if inner.queued_count() > 0 {
                    return true;
                }
                mark_fixed_worker_idle(inner, &mut state);
                // Re-check after marking idle to close the wake-up race.
                if inner.queued_count() > 0 || inner.has_pending_worker_wake() {
                    unmark_fixed_worker_idle(inner, &mut state);
                    return true;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            FixedThreadPoolLifecycle::Shutdown => {
                if inner.queued_count() > 0 {
                    return true;
                }
                // Graceful shutdown: exit only when both the queue and the
                // in-flight submission count have drained; otherwise a late
                // submission could still add work.
                if inner.queued_count() == 0 && inner.inflight_count() == 0 {
                    return false;
                }
                mark_fixed_worker_idle(inner, &mut state);
                if inner.queued_count() > 0
                    || inner.inflight_count() == 0
                    || inner.has_pending_worker_wake()
                {
                    unmark_fixed_worker_idle(inner, &mut state);
                    continue;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            // Hard stop: leave immediately; cancellation happens in
            // try_take_job / cancel_worker_jobs.
            FixedThreadPoolLifecycle::Stopping => return false,
        }
    }
}
1056
/// Marks the calling worker idle in both the lock-guarded state (for stats)
/// and the atomic mirror (read lock-free by `wake_one_idle_worker`).
/// Caller must hold the state lock via `state`.
fn mark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
    state.idle_workers += 1;
    inner.idle_worker_count.fetch_add(1, Ordering::AcqRel);
}
1067
/// Clears the calling worker's idle marks and retires one pending wake token
/// (if any), since this worker is the one answering the wake-up.
/// Caller must hold the state lock via `state`.
fn unmark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
    state.idle_workers = state
        .idle_workers
        .checked_sub(1)
        .expect("fixed pool idle worker counter underflow");
    let previous = inner.idle_worker_count.fetch_sub(1, Ordering::AcqRel);
    debug_assert!(previous > 0, "fixed pool idle worker counter underflow");
    inner.consume_pending_worker_wake();
}
1083
/// Deregisters an exiting worker: deactivates its queue so no new jobs are
/// routed to it, releases its live-worker slot, and notifies termination
/// waiters (this may complete `is_terminated`).
fn worker_exited(inner: &FixedThreadPoolInner, worker_queue: &WorkerQueue) {
    worker_queue.deactivate();
    inner.state.write(|state| {
        state.live_workers = state
            .live_workers
            .checked_sub(1)
            .expect("fixed pool live worker counter underflow");
    });
    inner.state.notify_all();
}
1100
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{
        Arc,
        atomic::{
            AtomicUsize,
            Ordering,
        },
    };
    use std::thread;
    use std::time::Duration;

    // Builds a PoolJob whose run/cancel callbacks increment the matching
    // shared counters, so tests can observe which path each job took.
    fn counted_job(cancelled: Arc<AtomicUsize>, ran: Arc<AtomicUsize>) -> PoolJob {
        PoolJob::new(
            Box::new(move || {
                ran.fetch_add(1, Ordering::AcqRel);
            }),
            Box::new(move || {
                cancelled.fetch_add(1, Ordering::AcqRel);
            }),
        )
    }

    // With stop_now set, accept_claimed_job must cancel the claimed job AND
    // sweep both the local deque and the worker inbox (3 cancellations total,
    // zero runs), draining the queued counter to zero.
    #[test]
    fn test_accept_claimed_job_stop_now_cancels_claimed_and_worker_queues() {
        let runtime = WorkerRuntime::new(0);
        runtime.queue.activate();
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        inner.stop_now.store(true, Ordering::Release);

        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        runtime
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(3, Ordering::Release);

        let accepted =
            inner.accept_claimed_job(counted_job(cancelled.clone(), ran.clone()), &runtime);
        assert!(accepted.is_none());
        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 3);
        assert_eq!(cancelled.load(Ordering::Acquire), 3);
        assert_eq!(ran.load(Ordering::Acquire), 0);
    }

    // A batch steal from the global queue that leaves a job in the local
    // deque must still hand back exactly one claimed job; the leftover local
    // job remains poppable afterwards.
    #[test]
    fn test_steal_global_job_notifies_when_batch_leaves_local_jobs() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner
            .global_queue
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        let claimed = inner
            .steal_global_job(&runtime)
            .expect("global queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        let remaining = runtime
            .local
            .pop()
            .expect("preloaded local job should remain queued");
        inner.cancel_claimed_job(remaining);

        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    // Stealing from a sibling worker's queue must yield one claimed job while
    // the thief's preloaded local job stays in its deque.
    #[test]
    fn test_steal_worker_job_notifies_when_batch_leaves_local_jobs() {
        let thief = WorkerRuntime::new(0);
        let victim = WorkerRuntime::new(1);
        thief.queue.activate();
        victim.queue.activate();
        let inner = FixedThreadPoolInner::new(
            2,
            None,
            vec![Arc::clone(&thief.queue), Arc::clone(&victim.queue)],
        );
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        thief
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        victim
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        let claimed = inner
            .steal_worker_job(&thief)
            .expect("victim queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        let remaining = thief
            .local
            .pop()
            .expect("batch steal should leave one local job");
        inner.cancel_claimed_job(remaining);

        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    // Dropping the guard while admission is closed must release the last
    // inflight ticket (and, per the Drop impl, notify waiters) without panic.
    #[test]
    fn test_fixed_submit_guard_drop_notifies_when_shutdown_closes_admission() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.accepting.store(false, Ordering::Release);

        {
            let guard = FixedSubmitGuard { inner: &inner };
            drop(guard);
        }

        assert_eq!(inner.inflight_count(), 0);
    }

    // In the Shutdown phase a worker must keep waiting while a submission is
    // in flight (even with a stale wake token) and exit once it drains.
    #[test]
    fn test_wait_for_fixed_pool_work_shutdown_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        inner.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
        });
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.pending_worker_wakes.store(1, Ordering::Release);

        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        assert!(!wait_for_fixed_pool_work(&inner));
        releaser.join().expect("releaser thread should finish");
    }

    // shutdown_now must block until in-flight submissions drain, then report
    // zero queued/running/cancelled for an empty pool.
    #[test]
    fn test_shutdown_now_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        inner.inflight_submissions.store(1, Ordering::Release);

        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        let report = inner.shutdown_now();
        releaser.join().expect("releaser thread should finish");
        assert_eq!(report.running, 0);
        assert_eq!(report.queued, 0);
        assert_eq!(report.cancelled, 0);
    }
}
1291}