1use std::{
12 future::Future,
13 pin::Pin,
14 sync::{
15 Arc,
16 atomic::{
17 AtomicBool,
18 AtomicUsize,
19 Ordering,
20 },
21 },
22 thread,
23};
24
25use crossbeam_deque::Injector;
26use qubit_function::Callable;
27
28use qubit_executor::{
29 TaskCompletionPair,
30 TaskHandle,
31};
32use qubit_lock::Monitor;
33
34use super::thread_pool::{
35 ThreadPoolBuildError,
36 ThreadPoolStats,
37};
38use super::worker_queue::{
39 WorkerQueue,
40 WorkerRuntime,
41 steal_batch_and_pop,
42 steal_one,
43};
44use crate::thread_pool::PoolJob;
45use qubit_executor::service::{
46 ExecutorService,
47 RejectedExecution,
48 ShutdownReport,
49};
50
/// Thread-name prefix used by `FixedThreadPoolBuilder` when the caller does
/// not provide one; worker threads are named `{prefix}-{index}`.
const DEFAULT_FIXED_THREAD_NAME_PREFIX: &str = "qubit-fixed-thread-pool";

/// Maximum number of worker inboxes probed round-robin by
/// `try_enqueue_to_worker` before falling back to the global queue.
const LOCAL_ENQUEUE_MAX_PROBES: usize = 4;
/// Pools with more workers than this skip per-worker local enqueueing and
/// push straight to the global queue (see `use_worker_local_queues`).
const LOCAL_QUEUE_WORKER_LIMIT: usize = 4;
58
/// Coarse lifecycle phase of the pool, stored inside the state `Monitor`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FixedThreadPoolLifecycle {
    /// Normal operation: submissions accepted, workers run and park.
    Running,

    /// Graceful shutdown: no new submissions, queued work still drains.
    Shutdown,

    /// Immediate stop (`shutdown_now` or failed build): workers cancel
    /// outstanding work and exit.
    Stopping,
}
71
72impl FixedThreadPoolLifecycle {
73 const fn is_running(self) -> bool {
79 matches!(self, Self::Running)
80 }
81}
82
/// Mutable pool bookkeeping protected by the state `Monitor` lock.
struct FixedThreadPoolState {
    /// Current lifecycle phase (Running / Shutdown / Stopping).
    lifecycle: FixedThreadPoolLifecycle,
    /// Spawned worker threads that have not yet exited.
    live_workers: usize,
    /// Workers currently parked waiting for work (lock-held counter;
    /// mirrored lock-free by `FixedThreadPoolInner::idle_worker_count`).
    idle_workers: usize,
}
92
93impl FixedThreadPoolState {
94 fn new() -> Self {
100 Self {
101 lifecycle: FixedThreadPoolLifecycle::Running,
102 live_workers: 0,
103 idle_workers: 0,
104 }
105 }
106}
107
/// Shared core of the fixed-size pool; one `Arc` is held by the public
/// handle and one clone by every worker thread.
struct FixedThreadPoolInner {
    /// Fixed worker count (core size == maximum size).
    pool_size: usize,
    /// Lock + condition variable guarding the mutable bookkeeping state.
    state: Monitor<FixedThreadPoolState>,
    /// Cleared on shutdown; gates admission of new submissions.
    accepting: AtomicBool,
    /// Set by `shutdown_now`/`stop_after_failed_build`; makes workers cancel
    /// claimed jobs instead of running them.
    stop_now: AtomicBool,
    /// Submissions currently inside `submit` (guard-counted so shutdown can
    /// wait for them to finish).
    inflight_submissions: AtomicUsize,
    /// Lock-free mirror of `FixedThreadPoolState::idle_workers`.
    idle_worker_count: AtomicUsize,
    /// Wake tokens handed out by `wake_one_idle_worker` and consumed by
    /// workers in `unmark_fixed_worker_idle`.
    pending_worker_wakes: AtomicUsize,
    /// Shared overflow/global queue all workers can steal from.
    global_queue: Injector<PoolJob>,
    /// Per-worker inbox queues, indexed by worker index.
    worker_queues: Vec<Arc<WorkerQueue>>,
    /// Round-robin cursor used by `try_enqueue_to_worker`.
    next_enqueue_worker: AtomicUsize,
    /// `Some(cap)` bounds queued (not yet running) tasks; `None` = unbounded.
    queue_capacity: Option<usize>,
    /// Tasks accepted but not yet claimed by a worker.
    queued_task_count: AtomicUsize,
    /// Tasks currently executing.
    running_task_count: AtomicUsize,
    /// Monotonic totals reported by `stats()`.
    submitted_task_count: AtomicUsize,
    completed_task_count: AtomicUsize,
    cancelled_task_count: AtomicUsize,
}
143
impl FixedThreadPoolInner {
    /// Creates the shared pool core. Worker threads are spawned separately by
    /// the builder; each receives a clone of the surrounding `Arc`.
    fn new(
        pool_size: usize,
        queue_capacity: Option<usize>,
        worker_queues: Vec<Arc<WorkerQueue>>,
    ) -> Self {
        Self {
            pool_size,
            state: Monitor::new(FixedThreadPoolState::new()),
            accepting: AtomicBool::new(true),
            stop_now: AtomicBool::new(false),
            inflight_submissions: AtomicUsize::new(0),
            idle_worker_count: AtomicUsize::new(0),
            pending_worker_wakes: AtomicUsize::new(0),
            global_queue: Injector::new(),
            worker_queues,
            next_enqueue_worker: AtomicUsize::new(0),
            queue_capacity,
            queued_task_count: AtomicUsize::new(0),
            running_task_count: AtomicUsize::new(0),
            submitted_task_count: AtomicUsize::new(0),
            completed_task_count: AtomicUsize::new(0),
            cancelled_task_count: AtomicUsize::new(0),
        }
    }

    /// Fixed number of worker threads.
    #[inline]
    fn pool_size(&self) -> usize {
        self.pool_size
    }

    /// Tasks accepted but not yet claimed by a worker.
    #[inline]
    fn queued_count(&self) -> usize {
        self.queued_task_count.load(Ordering::Acquire)
    }

    /// Tasks currently executing on worker threads.
    #[inline]
    fn running_count(&self) -> usize {
        self.running_task_count.load(Ordering::Acquire)
    }

    /// Submissions currently inside `submit`.
    #[inline]
    fn inflight_count(&self) -> usize {
        self.inflight_submissions.load(Ordering::Acquire)
    }

    /// Admission check for `submit`: registers an in-flight submission and
    /// re-checks `accepting` afterwards, so a concurrent shutdown either sees
    /// the increment or this call observes the closed gate and backs out.
    fn begin_submit(&self) -> Result<FixedSubmitGuard<'_>, RejectedExecution> {
        if !self.accepting.load(Ordering::Acquire) {
            return Err(RejectedExecution::Shutdown);
        }
        self.inflight_submissions.fetch_add(1, Ordering::AcqRel);
        if self.accepting.load(Ordering::Acquire) {
            Ok(FixedSubmitGuard { inner: self })
        } else {
            // Shutdown raced in between the two checks: undo the increment
            // and wake anyone waiting for the in-flight count to hit zero
            // (e.g. `shutdown_now`).
            let previous = self.inflight_submissions.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool submit counter underflow");
            if previous == 1 {
                self.state.notify_all();
            }
            Err(RejectedExecution::Shutdown)
        }
    }

    /// Reserves one slot in the queued-task count. With a bounded queue this
    /// is a CAS loop that fails (returns `false`) at capacity; unbounded
    /// queues always succeed.
    fn reserve_queue_slot(&self) -> bool {
        if let Some(capacity) = self.queue_capacity {
            loop {
                let current = self.queued_count();
                if current >= capacity {
                    return false;
                }
                if self
                    .queued_task_count
                    .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
                {
                    return true;
                }
            }
        }
        self.queued_task_count.fetch_add(1, Ordering::AcqRel);
        true
    }

    /// Full submission path: admission guard, capacity reservation, a final
    /// shutdown re-check, then enqueue + worker wake-up.
    fn submit(&self, job: PoolJob) -> Result<(), RejectedExecution> {
        let _guard = self.begin_submit()?;
        if !self.reserve_queue_slot() {
            return Err(RejectedExecution::Saturated);
        }
        // Shutdown may have closed admission after the slot was reserved;
        // give the slot back rather than enqueue into a draining pool.
        if !self.accepting.load(Ordering::Acquire) {
            let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool queued counter underflow");
            return Err(RejectedExecution::Shutdown);
        }
        self.submitted_task_count.fetch_add(1, Ordering::Relaxed);
        self.enqueue_job(job);
        Ok(())
    }

    /// Places a job on a worker inbox (small pools) or the global queue, then
    /// wakes at most one idle worker.
    fn enqueue_job(&self, job: PoolJob) {
        if self.use_worker_local_queues() {
            match self.try_enqueue_to_worker(job) {
                Ok(()) => {}
                // No active worker inbox found within the probe budget.
                Err(job) => self.global_queue.push(job),
            }
        } else {
            self.global_queue.push(job);
        }
        self.wake_one_idle_worker();
    }

    /// Wakes one parked worker, but never issues more outstanding wake tokens
    /// than there are idle workers — this bounds spurious notifications.
    fn wake_one_idle_worker(&self) {
        loop {
            let idle_workers = self.idle_worker_count.load(Ordering::Acquire);
            if idle_workers == 0 {
                return;
            }
            let pending_wakes = self.pending_worker_wakes.load(Ordering::Acquire);
            if pending_wakes >= idle_workers {
                // Every idle worker already has a wake token pending.
                return;
            }
            if self
                .pending_worker_wakes
                .compare_exchange_weak(
                    pending_wakes,
                    pending_wakes + 1,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                )
                .is_ok()
            {
                self.state.notify_one();
                return;
            }
        }
    }

    /// `true` when at least one wake token is outstanding.
    fn has_pending_worker_wake(&self) -> bool {
        self.pending_worker_wakes.load(Ordering::Acquire) > 0
    }

    /// Consumes one wake token if any are outstanding (CAS loop; no-op when
    /// the count is already zero).
    fn consume_pending_worker_wake(&self) {
        let mut current = self.pending_worker_wakes.load(Ordering::Acquire);
        while current > 0 {
            match self.pending_worker_wakes.compare_exchange_weak(
                current,
                current - 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                Err(actual) => current = actual,
            }
        }
    }

    /// Attempts to push the job onto an active worker inbox, probing at most
    /// `LOCAL_ENQUEUE_MAX_PROBES` queues round-robin. Returns the job back to
    /// the caller when no active inbox was found.
    fn try_enqueue_to_worker(&self, job: PoolJob) -> Result<(), PoolJob> {
        let queue_count = self.worker_queues.len();
        debug_assert!(queue_count > 0, "fixed pool must have worker queues");
        let probe_count = queue_count.min(LOCAL_ENQUEUE_MAX_PROBES);
        for _ in 0..probe_count {
            let index = self.next_enqueue_worker.fetch_add(1, Ordering::Relaxed) % queue_count;
            let queue = &self.worker_queues[index];
            if queue.is_active() {
                queue.push_back(job);
                return Ok(());
            }
        }
        Err(job)
    }

    /// Worker-side dequeue. Search order: local deque, own inbox, global
    /// queue, then stealing from other workers. Returns `None` when nothing
    /// is available or the pool is stopping (in which case pending local work
    /// is cancelled).
    fn try_take_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if self.stop_now.load(Ordering::Acquire) {
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        if !self.use_worker_local_queues() {
            // Large pools route everything through the global queue.
            return self.steal_single_global_job(worker_runtime);
        }
        if let Some(job) = worker_runtime.local.pop() {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = worker_runtime.queue.pop_inbox_into(&worker_runtime.local) {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = self.steal_global_job(worker_runtime) {
            return Some(job);
        }
        self.steal_worker_job(worker_runtime)
    }

    /// Batch-steals from the global queue into this worker's local deque,
    /// returning one claimed job. If the batch left extra jobs in the local
    /// deque, another worker is notified.
    // NOTE(review): jobs moved into `worker_runtime.local` can only be popped
    // by this worker; the notify_one here appears to wake a peer so it can
    // steal elsewhere — confirm against WorkerQueue/steal semantics.
    fn steal_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if let Some(job) = steal_batch_and_pop(&self.global_queue, &worker_runtime.local) {
            if !worker_runtime.local.is_empty() {
                self.state.notify_one();
            }
            return self.accept_claimed_job(job, worker_runtime);
        }
        self.steal_single_global_job(worker_runtime)
    }

    /// Steals exactly one job from the global queue (no batching).
    fn steal_single_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        steal_one(&self.global_queue).and_then(|job| self.accept_claimed_job(job, worker_runtime))
    }

    /// Steals from other workers' queues, starting at a rotating index so
    /// victims are probed fairly. Skips self and inactive queues.
    fn steal_worker_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        let queue_count = self.worker_queues.len();
        if queue_count <= 1 {
            return None;
        }
        let worker_index = worker_runtime.worker_index();
        let start = worker_runtime.next_steal_start(queue_count);
        for offset in 0..queue_count {
            let victim = &self.worker_queues[(start + offset) % queue_count];
            if victim.worker_index() == worker_index {
                continue;
            }
            if !victim.is_active() {
                continue;
            }
            if let Some(job) = victim.steal_into(&worker_runtime.local) {
                if !worker_runtime.local.is_empty() {
                    // Batch steal left surplus local work; wake a peer.
                    self.state.notify_one();
                }
                return self.accept_claimed_job(job, worker_runtime);
            }
        }
        None
    }

    /// Small pools (<= LOCAL_QUEUE_WORKER_LIMIT workers) use per-worker
    /// inboxes; larger pools funnel everything through the global queue.
    fn use_worker_local_queues(&self) -> bool {
        self.pool_size <= LOCAL_QUEUE_WORKER_LIMIT
    }

    /// Final gate after a job is claimed: if a hard stop raced in, the job
    /// (and any other pending local work) is cancelled instead of run;
    /// otherwise the job transitions queued -> running.
    fn accept_claimed_job(&self, job: PoolJob, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if self.stop_now.load(Ordering::Acquire) {
            self.cancel_claimed_job(job);
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        self.mark_queued_job_running();
        Some(job)
    }

    /// Cancels everything in this worker's local deque and inbox.
    fn cancel_worker_jobs(&self, worker_runtime: &WorkerRuntime) {
        while let Some(job) = worker_runtime.local.pop() {
            self.cancel_claimed_job(job);
        }
        for job in worker_runtime.queue.drain() {
            self.cancel_claimed_job(job);
        }
    }

    /// Moves one task from the queued count to the running count.
    fn mark_queued_job_running(&self) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.running_task_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Cancels a job that was counted as queued: decrements the queue count,
    /// bumps the cancelled total, runs the job's cancel path, and wakes
    /// waiters (e.g. `wait_for_termination`).
    fn cancel_claimed_job(&self, job: PoolJob) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.cancelled_task_count.fetch_add(1, Ordering::Relaxed);
        job.cancel();
        self.state.notify_all();
    }

    /// Marks one running job complete; wakes waiters when this was the last
    /// running job and nothing is queued (possible termination point).
    fn finish_running_job(&self) {
        let previous = self.running_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool running counter underflow");
        self.completed_task_count.fetch_add(1, Ordering::Relaxed);
        if previous == 1 && self.queued_count() == 0 {
            self.state.notify_all();
        }
    }

    /// Registers a worker slot before its thread is spawned, so a spawn
    /// failure can be rolled back symmetrically.
    fn reserve_worker_slot(&self) {
        self.state.write(|state| {
            state.live_workers += 1;
        });
    }

    /// Undoes `reserve_worker_slot` after a failed spawn.
    fn rollback_worker_slot(&self) {
        self.state.write(|state| {
            state.live_workers = state
                .live_workers
                .checked_sub(1)
                .expect("fixed pool live worker counter underflow");
        });
    }

    /// Hard-stops the pool when `build` fails partway: closes admission, sets
    /// the stop flag, marks the lifecycle `Stopping`, and wakes all workers
    /// already spawned so they exit.
    fn stop_after_failed_build(&self) {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        self.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        });
        self.state.notify_all();
    }

    /// Blocks until the pool is fully terminated (see `is_terminated_locked`).
    fn wait_for_termination(&self) {
        self.state
            .wait_until(|state| self.is_terminated_locked(state), |_| ());
    }

    /// Graceful shutdown: closes admission and moves Running -> Shutdown
    /// (already-stopping pools keep their stronger state), then wakes workers
    /// so they drain the queues and exit.
    fn shutdown(&self) {
        self.accepting.store(false, Ordering::Release);
        self.state.write(|state| {
            if state.lifecycle.is_running() {
                state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
            }
        });
        self.state.notify_all();
    }

    /// Immediate shutdown: closes admission, sets the stop flag, waits for
    /// in-flight submissions to settle, then drains and cancels every job
    /// still visible in the queues.
    fn shutdown_now(&self) -> ShutdownReport {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        // Snapshot running count before cancellation begins.
        let running = self.running_count();
        let mut state = self.state.lock();
        state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        // Submitters in `begin_submit`/`submit` notify when their count
        // drops to zero; wait so no job can slip in after the drain.
        while self.inflight_count() > 0 {
            state = state.wait();
        }
        drop(state);
        let jobs = self.drain_visible_queued_jobs();
        let cancelled = jobs.len();
        for job in jobs {
            self.cancel_claimed_job(job);
        }
        self.state.notify_all();
        // NOTE(review): first and third arguments are both `cancelled` —
        // confirm ShutdownReport::new's parameter order matches this intent.
        ShutdownReport::new(cancelled, running, cancelled)
    }

    /// Repeatedly drains the global and worker queues until a full pass finds
    /// nothing new (jobs can move between queues while draining).
    fn drain_visible_queued_jobs(&self) -> Vec<PoolJob> {
        let mut jobs = Vec::new();
        loop {
            let previous_count = jobs.len();
            self.drain_global_queue(&mut jobs);
            self.drain_worker_queues(&mut jobs);
            if jobs.len() == previous_count {
                return jobs;
            }
        }
    }

    /// Steals every job currently visible in the global queue.
    fn drain_global_queue(&self, jobs: &mut Vec<PoolJob>) {
        while let Some(job) = steal_one(&self.global_queue) {
            jobs.push(job);
        }
    }

    /// Drains every worker inbox queue.
    fn drain_worker_queues(&self, jobs: &mut Vec<PoolJob>) {
        for queue in &self.worker_queues {
            jobs.extend(queue.drain());
        }
    }

    /// `true` once the lifecycle has left `Running` (graceful or immediate).
    fn is_shutdown(&self) -> bool {
        self.state.read(|state| !state.lifecycle.is_running())
    }

    /// `true` once shutdown has fully completed (see `is_terminated_locked`).
    fn is_terminated(&self) -> bool {
        self.state.read(|state| self.is_terminated_locked(state))
    }

    /// Termination predicate evaluated while holding the state lock: shut
    /// down, no live workers, and no queued/running/in-flight work.
    fn is_terminated_locked(&self, state: &FixedThreadPoolState) -> bool {
        !state.lifecycle.is_running()
            && state.live_workers == 0
            && self.queued_count() == 0
            && self.running_count() == 0
            && self.inflight_count() == 0
    }

    /// Point-in-time snapshot of the pool's counters. The atomic counters are
    /// read individually before taking the lock, so the snapshot is not a
    /// single consistent cut across all values.
    fn stats(&self) -> ThreadPoolStats {
        let queued_tasks = self.queued_count();
        let running_tasks = self.running_count();
        let submitted_tasks = self.submitted_task_count.load(Ordering::Relaxed);
        let completed_tasks = self.completed_task_count.load(Ordering::Relaxed);
        let cancelled_tasks = self.cancelled_task_count.load(Ordering::Relaxed);
        self.state.read(|state| ThreadPoolStats {
            core_pool_size: self.pool_size,
            maximum_pool_size: self.pool_size,
            live_workers: state.live_workers,
            idle_workers: state.idle_workers,
            queued_tasks,
            running_tasks,
            submitted_tasks,
            completed_tasks,
            cancelled_tasks,
            shutdown: !state.lifecycle.is_running(),
            terminated: self.is_terminated_locked(state),
        })
    }
}
742
/// RAII guard returned by `begin_submit`; decrements the pool's in-flight
/// submission counter when the submission attempt ends (success or failure).
struct FixedSubmitGuard<'a> {
    /// Pool whose in-flight counter is released on drop.
    inner: &'a FixedThreadPoolInner,
}
748
749impl Drop for FixedSubmitGuard<'_> {
750 fn drop(&mut self) {
752 let previous = self
753 .inner
754 .inflight_submissions
755 .fetch_sub(1, Ordering::AcqRel);
756 debug_assert!(previous > 0, "fixed pool submit counter underflow");
757 if previous == 1 && !self.inner.accepting.load(Ordering::Acquire) {
758 self.inner.state.notify_all();
759 }
760 }
761}
762
/// Configuration builder for a fixed-size thread pool.
#[derive(Debug, Clone)]
pub struct FixedThreadPoolBuilder {
    /// Number of worker threads to spawn (must be non-zero).
    pool_size: usize,
    /// Bound on queued tasks; `None` means unbounded.
    queue_capacity: Option<usize>,
    /// Prefix for worker thread names (`{prefix}-{index}`).
    thread_name_prefix: String,
    /// Optional per-worker stack size in bytes (must be non-zero when set).
    stack_size: Option<usize>,
}
778
779impl FixedThreadPoolBuilder {
780 pub fn new() -> Self {
786 Self::default()
787 }
788
789 pub fn pool_size(mut self, pool_size: usize) -> Self {
799 self.pool_size = pool_size;
800 self
801 }
802
803 pub fn queue_capacity(mut self, capacity: usize) -> Self {
813 self.queue_capacity = Some(capacity);
814 self
815 }
816
817 pub fn unbounded_queue(mut self) -> Self {
823 self.queue_capacity = None;
824 self
825 }
826
827 pub fn thread_name_prefix(mut self, prefix: &str) -> Self {
837 self.thread_name_prefix = prefix.to_owned();
838 self
839 }
840
841 pub fn stack_size(mut self, stack_size: usize) -> Self {
851 self.stack_size = Some(stack_size);
852 self
853 }
854
855 pub fn build(self) -> Result<FixedThreadPool, ThreadPoolBuildError> {
866 self.validate()?;
867 let mut worker_runtimes = Vec::with_capacity(self.pool_size);
868 let mut worker_queues = Vec::with_capacity(self.pool_size);
869 for index in 0..self.pool_size {
870 let worker_runtime = WorkerRuntime::new(index);
871 worker_queues.push(Arc::clone(&worker_runtime.queue));
872 worker_runtimes.push(worker_runtime);
873 }
874 let inner = Arc::new(FixedThreadPoolInner::new(
875 self.pool_size,
876 self.queue_capacity,
877 worker_queues,
878 ));
879 for (index, worker_runtime) in worker_runtimes.into_iter().enumerate() {
880 inner.reserve_worker_slot();
881 let worker_inner = Arc::clone(&inner);
882 let mut builder =
883 thread::Builder::new().name(format!("{}-{}", self.thread_name_prefix, index));
884 if let Some(stack_size) = self.stack_size {
885 builder = builder.stack_size(stack_size);
886 }
887 if let Err(source) =
888 builder.spawn(move || run_fixed_worker(worker_inner, worker_runtime))
889 {
890 inner.rollback_worker_slot();
891 inner.stop_after_failed_build();
892 return Err(ThreadPoolBuildError::SpawnWorker { index, source });
893 }
894 }
895 Ok(FixedThreadPool { inner })
896 }
897
898 fn validate(&self) -> Result<(), ThreadPoolBuildError> {
909 if self.pool_size == 0 {
910 return Err(ThreadPoolBuildError::ZeroMaximumPoolSize);
911 }
912 if self.queue_capacity == Some(0) {
913 return Err(ThreadPoolBuildError::ZeroQueueCapacity);
914 }
915 if self.stack_size == Some(0) {
916 return Err(ThreadPoolBuildError::ZeroStackSize);
917 }
918 Ok(())
919 }
920}
921
922impl Default for FixedThreadPoolBuilder {
923 fn default() -> Self {
929 Self {
930 pool_size: default_fixed_pool_size(),
931 queue_capacity: None,
932 thread_name_prefix: DEFAULT_FIXED_THREAD_NAME_PREFIX.to_owned(),
933 stack_size: None,
934 }
935 }
936}
937
/// Handle to a fixed-size thread pool. Cloning is not exposed; the workers
/// hold their own `Arc` clones of the shared inner state, so dropping this
/// handle triggers a graceful shutdown (see the `Drop` impl).
pub struct FixedThreadPool {
    /// Shared state, also held by every worker thread.
    inner: Arc<FixedThreadPoolInner>,
}
947
948impl FixedThreadPool {
949 pub fn new(pool_size: usize) -> Result<Self, ThreadPoolBuildError> {
964 Self::builder().pool_size(pool_size).build()
965 }
966
967 pub fn builder() -> FixedThreadPoolBuilder {
973 FixedThreadPoolBuilder::new()
974 }
975
976 pub fn pool_size(&self) -> usize {
982 self.inner.pool_size()
983 }
984
985 pub fn queued_count(&self) -> usize {
991 self.inner.queued_count()
992 }
993
994 pub fn running_count(&self) -> usize {
1000 self.inner.running_count()
1001 }
1002
1003 pub fn live_worker_count(&self) -> usize {
1009 self.inner.state.read(|state| state.live_workers)
1010 }
1011
1012 pub fn stats(&self) -> ThreadPoolStats {
1018 self.inner.stats()
1019 }
1020}
1021
impl Drop for FixedThreadPool {
    /// Dropping the handle triggers a graceful shutdown: admission closes,
    /// but already-queued work is still drained by the workers.
    fn drop(&mut self) {
        self.inner.shutdown();
    }
}
1028
impl ExecutorService for FixedThreadPool {
    type Handle<R, E>
        = TaskHandle<R, E>
    where
        R: Send + 'static,
        E: Send + 'static;

    type Termination<'a>
        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
    where
        Self: 'a;

    /// Wraps the callable in a `PoolJob` linked to a completion handle and
    /// submits it; returns the handle on success, or the rejection reason
    /// (`Shutdown` / `Saturated`) from the admission path.
    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
    where
        C: Callable<R, E> + Send + 'static,
        R: Send + 'static,
        E: Send + 'static,
    {
        let (handle, completion) = TaskCompletionPair::new().into_parts();
        let job = PoolJob::from_task(task, completion);
        self.inner.submit(job)?;
        Ok(handle)
    }

    /// Graceful shutdown: stop accepting work, drain what is queued.
    fn shutdown(&self) {
        self.inner.shutdown();
    }

    /// Immediate shutdown: cancels queued work and reports the counts.
    fn shutdown_now(&self) -> ShutdownReport {
        self.inner.shutdown_now()
    }

    /// `true` once the pool has left its `Running` lifecycle phase.
    fn is_shutdown(&self) -> bool {
        self.inner.is_shutdown()
    }

    /// `true` once all workers have exited and no work remains.
    fn is_terminated(&self) -> bool {
        self.inner.is_terminated()
    }

    /// Future that resolves once the pool is terminated.
    // NOTE(review): `wait_for_termination` blocks on a monitor, so this
    // future blocks its executor thread while polled — confirm callers run
    // it on a thread where blocking is acceptable.
    fn await_termination(&self) -> Self::Termination<'_> {
        Box::pin(async move {
            self.inner.wait_for_termination();
        })
    }
}
1110
1111fn run_fixed_worker(inner: Arc<FixedThreadPoolInner>, worker_runtime: WorkerRuntime) {
1118 worker_runtime.queue.activate();
1119 loop {
1120 if let Some(job) = inner.try_take_job(&worker_runtime) {
1121 job.run();
1122 inner.finish_running_job();
1123 continue;
1124 }
1125 if !wait_for_fixed_pool_work(&inner) {
1126 break;
1127 }
1128 }
1129 worker_exited(&inner, &worker_runtime.queue);
1130}
1131
/// Parks the worker on the state monitor until work may be available or the
/// pool decides the worker should exit. Returns `true` when the caller should
/// poll for jobs again, `false` when the worker loop should terminate.
fn wait_for_fixed_pool_work(inner: &FixedThreadPoolInner) -> bool {
    let mut state = inner.state.lock();
    loop {
        match state.lifecycle {
            FixedThreadPoolLifecycle::Running => {
                // Fast path: work already queued.
                if inner.queued_count() > 0 {
                    return true;
                }
                mark_fixed_worker_idle(inner, &mut state);
                // Re-check after publishing idleness to close the race with
                // a producer that enqueued before observing this worker as
                // idle (and therefore skipped the wake).
                if inner.queued_count() > 0 || inner.has_pending_worker_wake() {
                    unmark_fixed_worker_idle(inner, &mut state);
                    return true;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            FixedThreadPoolLifecycle::Shutdown => {
                // Graceful shutdown: keep draining queued work.
                if inner.queued_count() > 0 {
                    return true;
                }
                // Nothing queued and no submission can still land: exit.
                if inner.queued_count() == 0 && inner.inflight_count() == 0 {
                    return false;
                }
                mark_fixed_worker_idle(inner, &mut state);
                if inner.queued_count() > 0
                    || inner.inflight_count() == 0
                    || inner.has_pending_worker_wake()
                {
                    // Conditions changed while marking idle; loop to re-run
                    // the shutdown checks rather than parking.
                    unmark_fixed_worker_idle(inner, &mut state);
                    continue;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            FixedThreadPoolLifecycle::Stopping => return false,
        }
    }
}
1180
/// Records the calling worker as idle in both the lock-held counter (caller
/// holds the monitor lock via `state`) and the lock-free atomic mirror used
/// by `wake_one_idle_worker`.
fn mark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
    state.idle_workers += 1;
    inner.idle_worker_count.fetch_add(1, Ordering::AcqRel);
}
1191
/// Removes the calling worker from both idle counters and consumes one
/// pending wake token (if any), keeping the token count bounded by the
/// number of workers that are actually parked.
fn unmark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
    state.idle_workers = state
        .idle_workers
        .checked_sub(1)
        .expect("fixed pool idle worker counter underflow");
    let previous = inner.idle_worker_count.fetch_sub(1, Ordering::AcqRel);
    debug_assert!(previous > 0, "fixed pool idle worker counter underflow");
    inner.consume_pending_worker_wake();
}
1207
/// Final bookkeeping when a worker thread leaves its run loop: deactivates
/// its inbox queue, decrements the live-worker count, and wakes waiters so
/// `wait_for_termination` can observe the change.
fn worker_exited(inner: &FixedThreadPoolInner, worker_queue: &WorkerQueue) {
    worker_queue.deactivate();
    inner.state.write(|state| {
        state.live_workers = state
            .live_workers
            .checked_sub(1)
            .expect("fixed pool live worker counter underflow");
    });
    inner.state.notify_all();
}
1224
/// Default worker count: the machine's available parallelism, falling back
/// to a single worker when the query fails (unsupported platform, sandbox).
fn default_fixed_pool_size() -> usize {
    match thread::available_parallelism() {
        Ok(workers) => workers.get(),
        Err(_) => 1,
    }
}
1235
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{
        Arc,
        atomic::{
            AtomicUsize,
            Ordering,
        },
    };
    use std::thread;
    use std::time::Duration;

    /// Builds a `PoolJob` whose run path bumps `ran` and whose cancel path
    /// bumps `cancelled`, so tests can observe which path each job took.
    fn counted_job(cancelled: Arc<AtomicUsize>, ran: Arc<AtomicUsize>) -> PoolJob {
        PoolJob::new(
            Box::new(move || {
                ran.fetch_add(1, Ordering::AcqRel);
            }),
            Box::new(move || {
                cancelled.fetch_add(1, Ordering::AcqRel);
            }),
        )
    }

    // With `stop_now` set, accepting a claimed job must cancel the claimed
    // job plus everything in the worker's local deque and inbox (3 total).
    #[test]
    fn test_accept_claimed_job_stop_now_cancels_claimed_and_worker_queues() {
        let runtime = WorkerRuntime::new(0);
        runtime.queue.activate();
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        inner.stop_now.store(true, Ordering::Release);

        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        runtime
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        // Preload the queued counter to match the three jobs in play.
        inner.queued_task_count.store(3, Ordering::Release);

        let accepted =
            inner.accept_claimed_job(counted_job(cancelled.clone(), ran.clone()), &runtime);
        assert!(accepted.is_none());
        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 3);
        assert_eq!(cancelled.load(Ordering::Acquire), 3);
        assert_eq!(ran.load(Ordering::Acquire), 0);
    }

    // A batch steal from the global queue that leaves extra jobs in the
    // thief's local deque should still hand back exactly one claimed job;
    // counters must balance after running one and cancelling the leftover.
    #[test]
    fn test_steal_global_job_notifies_when_batch_leaves_local_jobs() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner
            .global_queue
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        let claimed = inner
            .steal_global_job(&runtime)
            .expect("global queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        let remaining = runtime
            .local
            .pop()
            .expect("preloaded local job should remain queued");
        inner.cancel_claimed_job(remaining);

        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    // Same balance check as above, but stealing from a sibling worker's
    // inbox rather than the global queue.
    #[test]
    fn test_steal_worker_job_notifies_when_batch_leaves_local_jobs() {
        let thief = WorkerRuntime::new(0);
        let victim = WorkerRuntime::new(1);
        thief.queue.activate();
        victim.queue.activate();
        let inner = FixedThreadPoolInner::new(
            2,
            None,
            vec![Arc::clone(&thief.queue), Arc::clone(&victim.queue)],
        );
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        thief
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        victim
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        let claimed = inner
            .steal_worker_job(&thief)
            .expect("victim queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        let remaining = thief
            .local
            .pop()
            .expect("batch steal should leave one local job");
        inner.cancel_claimed_job(remaining);

        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    // Dropping the guard when it holds the last in-flight submission while
    // admission is closed must bring the counter to zero (and notify).
    #[test]
    fn test_fixed_submit_guard_drop_notifies_when_shutdown_closes_admission() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.accepting.store(false, Ordering::Release);

        {
            let guard = FixedSubmitGuard { inner: &inner };
            drop(guard);
        }

        assert_eq!(inner.inflight_count(), 0);
    }

    // In the Shutdown phase a worker must keep waiting while a submission is
    // in flight, and return `false` (exit) once the count drops to zero.
    #[test]
    fn test_wait_for_fixed_pool_work_shutdown_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        inner.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
        });
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.pending_worker_wakes.store(1, Ordering::Release);

        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        assert!(!wait_for_fixed_pool_work(&inner));
        releaser.join().expect("releaser thread should finish");
    }

    // `shutdown_now` must block until in-flight submissions settle, then
    // report zero queued/running/cancelled for an empty pool.
    #[test]
    fn test_shutdown_now_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        inner.inflight_submissions.store(1, Ordering::Release);

        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        let report = inner.shutdown_now();
        releaser.join().expect("releaser thread should finish");
        assert_eq!(report.running, 0);
        assert_eq!(report.queued, 0);
        assert_eq!(report.cancelled, 0);
    }
}
1426}