// qubit_thread_pool/fixed_thread_pool.rs
/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
// qubit-style: allow inline-tests
// qubit-style: allow explicit-imports
12use std::{
13    future::Future,
14    pin::Pin,
15    sync::{
16        Arc,
17        atomic::{AtomicBool, AtomicUsize, Ordering},
18    },
19};
20
21use crossbeam_deque::Injector;
22use qubit_function::Callable;
23
24use qubit_executor::{TaskCompletionPair, TaskHandle};
25use qubit_lock::Monitor;
26
27use super::fixed_thread_pool_builder::FixedThreadPoolBuilder;
28use super::queue_steal_source::{steal_batch_and_pop, steal_one};
29use super::thread_pool::{ThreadPoolBuildError, ThreadPoolStats};
30use super::worker_queue::WorkerQueue;
31use super::worker_runtime::WorkerRuntime;
32use crate::thread_pool::PoolJob;
33use qubit_executor::service::{ExecutorService, RejectedExecution, ShutdownReport};
34
/// Maximum number of worker-local queues probed by one submit call.
///
/// Bounds the per-submit routing cost: after this many inactive queues are
/// probed, the job falls back to the global injector instead of scanning
/// every worker.
const LOCAL_ENQUEUE_MAX_PROBES: usize = 4;
/// Maximum worker count that uses worker-local batch queues.
///
/// Pools larger than this route all submissions through the global injector;
/// see `FixedThreadPoolInner::use_worker_local_queues` for the rationale.
const LOCAL_QUEUE_WORKER_LIMIT: usize = 4;
39
/// Lifecycle state for a fixed-size thread pool.
///
/// Transitions only move forward: `Running` -> `Shutdown` (graceful) or
/// `Running`/`Shutdown` -> `Stopping` (immediate); there is no way back to
/// `Running`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FixedThreadPoolLifecycle {
    /// The pool accepts new tasks and workers wait for queued work.
    Running,

    /// The pool rejects new tasks but drains queued work.
    Shutdown,

    /// The pool rejects new tasks and cancels queued work.
    Stopping,
}
52
53impl FixedThreadPoolLifecycle {
54    /// Returns whether this lifecycle still accepts submissions.
55    ///
56    /// # Returns
57    ///
58    /// `true` only while the pool is running.
59    const fn is_running(self) -> bool {
60        matches!(self, Self::Running)
61    }
62}
63
/// Mutable state protected by the fixed pool monitor.
///
/// All fields are read and written only while holding the pool's `Monitor`
/// lock; fast-path counters live in the atomics on `FixedThreadPoolInner`.
struct FixedThreadPoolState {
    /// Current lifecycle state.
    lifecycle: FixedThreadPoolLifecycle,
    /// Number of worker loops that have not exited.
    live_workers: usize,
    /// Number of workers currently blocked waiting for work.
    idle_workers: usize,
}
73
74impl FixedThreadPoolState {
75    /// Creates an empty running state.
76    ///
77    /// # Returns
78    ///
79    /// A running state before any worker has been reserved.
80    fn new() -> Self {
81        Self {
82            lifecycle: FixedThreadPoolLifecycle::Running,
83            live_workers: 0,
84            idle_workers: 0,
85        }
86    }
87}
88
/// Shared state for a fixed-size thread pool.
///
/// One instance is shared (via `Arc`) between the public `FixedThreadPool`
/// handle and every worker thread. Hot-path counters are atomics; lifecycle
/// transitions and termination waits go through the `Monitor`.
struct FixedThreadPoolInner {
    /// Number of workers in this fixed pool.
    pool_size: usize,
    /// Mutable lifecycle and worker counters.
    state: Monitor<FixedThreadPoolState>,
    /// Admission gate used by submitters.
    accepting: AtomicBool,
    /// Whether immediate shutdown has requested workers to stop taking jobs.
    stop_now: AtomicBool,
    /// Submit calls that have passed the first admission check.
    inflight_submissions: AtomicUsize,
    /// Number of workers currently blocked or about to block waiting for work.
    idle_worker_count: AtomicUsize,
    /// Number of idle-worker wakeups already requested but not yet consumed.
    pending_worker_wakes: AtomicUsize,
    /// Lock-free queue for externally submitted jobs.
    global_queue: Injector<PoolJob>,
    /// Worker-local queues used for submit routing and work stealing.
    worker_queues: Vec<Arc<WorkerQueue>>,
    /// Round-robin cursor used for submit-path local queue selection.
    next_enqueue_worker: AtomicUsize,
    /// Optional maximum number of queued jobs; `None` means unbounded.
    queue_capacity: Option<usize>,
    /// Number of queued jobs not yet started or cancelled.
    queued_task_count: AtomicUsize,
    /// Number of jobs currently running.
    running_task_count: AtomicUsize,
    /// Total number of accepted jobs.
    submitted_task_count: AtomicUsize,
    /// Total number of finished worker-held jobs.
    completed_task_count: AtomicUsize,
    /// Total number of queued jobs cancelled by immediate shutdown.
    cancelled_task_count: AtomicUsize,
}
124
impl FixedThreadPoolInner {
    /// Creates shared state for a fixed-size pool.
    ///
    /// # Parameters
    ///
    /// * `pool_size` - Number of workers that will be prestarted.
    /// * `queue_capacity` - Optional queue capacity.
    /// * `worker_queues` - Pre-built worker-local queues, one per worker.
    ///
    /// # Returns
    ///
    /// A shared state object ready for worker startup.
    fn new(
        pool_size: usize,
        queue_capacity: Option<usize>,
        worker_queues: Vec<Arc<WorkerQueue>>,
    ) -> Self {
        Self {
            pool_size,
            state: Monitor::new(FixedThreadPoolState::new()),
            accepting: AtomicBool::new(true),
            stop_now: AtomicBool::new(false),
            inflight_submissions: AtomicUsize::new(0),
            idle_worker_count: AtomicUsize::new(0),
            pending_worker_wakes: AtomicUsize::new(0),
            global_queue: Injector::new(),
            worker_queues,
            next_enqueue_worker: AtomicUsize::new(0),
            queue_capacity,
            queued_task_count: AtomicUsize::new(0),
            running_task_count: AtomicUsize::new(0),
            submitted_task_count: AtomicUsize::new(0),
            completed_task_count: AtomicUsize::new(0),
            cancelled_task_count: AtomicUsize::new(0),
        }
    }

    /// Returns the fixed worker count.
    ///
    /// # Returns
    ///
    /// Number of workers owned by this pool.
    #[inline]
    fn pool_size(&self) -> usize {
        self.pool_size
    }

    /// Returns the queued task count.
    ///
    /// # Returns
    ///
    /// Number of accepted tasks waiting to run.
    #[inline]
    fn queued_count(&self) -> usize {
        self.queued_task_count.load(Ordering::Acquire)
    }

    /// Returns the running task count.
    ///
    /// # Returns
    ///
    /// Number of tasks currently held by workers.
    #[inline]
    fn running_count(&self) -> usize {
        self.running_task_count.load(Ordering::Acquire)
    }

    /// Returns the number of in-flight submit calls.
    ///
    /// # Returns
    ///
    /// Number of submit calls that may still publish or roll back a queued job.
    #[inline]
    fn inflight_count(&self) -> usize {
        self.inflight_submissions.load(Ordering::Acquire)
    }

    /// Attempts to enter submit admission.
    ///
    /// # Returns
    ///
    /// A guard that leaves admission on drop.
    ///
    /// # Errors
    ///
    /// Returns [`RejectedExecution::Shutdown`] when admission is closed.
    fn begin_submit(&self) -> Result<FixedSubmitGuard<'_>, RejectedExecution> {
        // Cheap pre-check: avoid touching the in-flight counter once closed.
        if !self.accepting.load(Ordering::Acquire) {
            return Err(RejectedExecution::Shutdown);
        }
        self.inflight_submissions.fetch_add(1, Ordering::AcqRel);
        // Re-check after publishing the increment: `shutdown_now` waits for
        // the in-flight counter to drain, so a submitter that raced with
        // shutdown must roll back and wake any waiter it may have blocked.
        if self.accepting.load(Ordering::Acquire) {
            Ok(FixedSubmitGuard { inner: self })
        } else {
            let previous = self.inflight_submissions.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool submit counter underflow");
            if previous == 1 {
                self.state.notify_all();
            }
            Err(RejectedExecution::Shutdown)
        }
    }

    /// Attempts to reserve one queue slot.
    ///
    /// # Returns
    ///
    /// `true` if one queued slot was reserved, otherwise `false`.
    fn reserve_queue_slot(&self) -> bool {
        if let Some(capacity) = self.queue_capacity {
            // Bounded queue: CAS loop so concurrent submitters cannot push
            // the count past `capacity`.
            loop {
                let current = self.queued_count();
                if current >= capacity {
                    return false;
                }
                if self
                    .queued_task_count
                    .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
                {
                    return true;
                }
            }
        }
        // Unbounded queue: a plain increment always succeeds.
        self.queued_task_count.fetch_add(1, Ordering::AcqRel);
        true
    }

    /// Submits one job to this fixed pool.
    ///
    /// # Parameters
    ///
    /// * `job` - Type-erased job accepted by the pool.
    ///
    /// # Returns
    ///
    /// `Ok(())` when the job is accepted.
    ///
    /// # Errors
    ///
    /// Returns [`RejectedExecution::Shutdown`] after shutdown or
    /// [`RejectedExecution::Saturated`] when the bounded queue is full.
    fn submit(&self, job: PoolJob) -> Result<(), RejectedExecution> {
        let _guard = self.begin_submit()?;
        if !self.reserve_queue_slot() {
            return Err(RejectedExecution::Saturated);
        }
        // Final admission check after the slot reservation: if shutdown began
        // in between, undo the reservation so termination accounting stays
        // balanced, then reject.
        if !self.accepting.load(Ordering::Acquire) {
            let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool queued counter underflow");
            return Err(RejectedExecution::Shutdown);
        }
        self.submitted_task_count.fetch_add(1, Ordering::Relaxed);
        self.enqueue_job(job);
        Ok(())
    }

    /// Enqueues one accepted job to a worker inbox or the global fallback.
    ///
    /// # Parameters
    ///
    /// * `job` - Job whose queued slot has already been reserved.
    fn enqueue_job(&self, job: PoolJob) {
        if self.use_worker_local_queues() {
            match self.try_enqueue_to_worker(job) {
                Ok(()) => {}
                // No active worker queue was found within the probe budget;
                // fall back to the shared injector.
                Err(job) => self.global_queue.push(job),
            }
        } else {
            self.global_queue.push(job);
        }
        self.wake_one_idle_worker();
    }

    /// Wakes one idle worker if no already-requested wakeup covers it.
    ///
    /// Pending wake tokens close the lost-notification window: a worker that
    /// has marked itself idle but has not yet parked will observe the token and
    /// retry work without relying on the condition-variable notification.
    fn wake_one_idle_worker(&self) {
        loop {
            let idle_workers = self.idle_worker_count.load(Ordering::Acquire);
            if idle_workers == 0 {
                return;
            }
            let pending_wakes = self.pending_worker_wakes.load(Ordering::Acquire);
            // Every idle worker is already covered by an outstanding token;
            // another notification would be redundant.
            if pending_wakes >= idle_workers {
                return;
            }
            if self
                .pending_worker_wakes
                .compare_exchange_weak(
                    pending_wakes,
                    pending_wakes + 1,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                )
                .is_ok()
            {
                self.state.notify_one();
                return;
            }
            // CAS failed (contention or spurious weak failure): re-read and retry.
        }
    }

    /// Returns whether an idle-worker wakeup has been requested.
    ///
    /// # Returns
    ///
    /// `true` when at least one idle worker should leave the wait path and
    /// retry taking work.
    fn has_pending_worker_wake(&self) -> bool {
        self.pending_worker_wakes.load(Ordering::Acquire) > 0
    }

    /// Consumes one requested idle-worker wakeup if one exists.
    fn consume_pending_worker_wake(&self) {
        let mut current = self.pending_worker_wakes.load(Ordering::Acquire);
        while current > 0 {
            match self.pending_worker_wakes.compare_exchange_weak(
                current,
                current - 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                Err(actual) => current = actual,
            }
        }
    }

    /// Attempts to route one job directly to an active worker queue.
    ///
    /// # Parameters
    ///
    /// * `job` - Job to route.
    ///
    /// # Returns
    ///
    /// `Ok(())` when the job was published to a worker inbox; otherwise the
    /// original job is returned for global fallback.
    fn try_enqueue_to_worker(&self, job: PoolJob) -> Result<(), PoolJob> {
        let queue_count = self.worker_queues.len();
        debug_assert!(queue_count > 0, "fixed pool must have worker queues");
        let probe_count = queue_count.min(LOCAL_ENQUEUE_MAX_PROBES);
        for _ in 0..probe_count {
            // Round-robin cursor spreads jobs across workers; Relaxed is
            // enough because only distribution fairness depends on it.
            let index = self.next_enqueue_worker.fetch_add(1, Ordering::Relaxed) % queue_count;
            let queue = &self.worker_queues[index];
            if queue.is_active() {
                queue.push_back(job);
                return Ok(());
            }
        }
        Err(job)
    }

    /// Attempts to claim one queued job for a worker.
    ///
    /// The worker first checks its local queue, then its cross-thread inbox,
    /// then the global fallback queue, and finally steals from other workers.
    /// This matches the dynamic pool's hot path and avoids forcing all fixed
    /// workers through one global injector under skewed workloads.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime owned by the current worker.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn try_take_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if self.stop_now.load(Ordering::Acquire) {
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        if !self.use_worker_local_queues() {
            return self.steal_single_global_job(worker_runtime);
        }
        if let Some(job) = worker_runtime.local.pop() {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = worker_runtime.queue.pop_inbox_into(&worker_runtime.local) {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = self.steal_global_job(worker_runtime) {
            return Some(job);
        }
        self.steal_worker_job(worker_runtime)
    }

    /// Attempts to batch-steal one job from the global injector.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime receiving any stolen batch remainder.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn steal_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if let Some(job) = steal_batch_and_pop(&self.global_queue, &worker_runtime.local) {
            // The batch left extra jobs in this worker's local queue; wake
            // another worker so they can be stolen promptly.
            if !worker_runtime.local.is_empty() {
                self.state.notify_one();
            }
            return self.accept_claimed_job(job, worker_runtime);
        }
        self.steal_single_global_job(worker_runtime)
    }

    /// Attempts to steal exactly one job from the global injector.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime owned by the current worker.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn steal_single_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        steal_one(&self.global_queue).and_then(|job| self.accept_claimed_job(job, worker_runtime))
    }

    /// Attempts to steal one job from another worker's local queue.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime owned by the current worker.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn steal_worker_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        let queue_count = self.worker_queues.len();
        if queue_count <= 1 {
            return None;
        }
        let worker_index = worker_runtime.worker_index();
        // Rotate the starting victim so steal pressure is spread evenly.
        let start = worker_runtime.next_steal_start(queue_count);
        for offset in 0..queue_count {
            let victim = &self.worker_queues[(start + offset) % queue_count];
            if victim.worker_index() == worker_index {
                continue;
            }
            if !victim.is_active() {
                continue;
            }
            if let Some(job) = victim.steal_into(&worker_runtime.local) {
                if !worker_runtime.local.is_empty() {
                    self.state.notify_one();
                }
                return self.accept_claimed_job(job, worker_runtime);
            }
        }
        None
    }

    /// Returns whether this pool should use worker-local queues.
    ///
    /// # Returns
    ///
    /// `true` for small fixed pools where local batching reduces global queue
    /// contention; `false` for larger pools where inbox routing and victim
    /// scans cost more than they save.
    fn use_worker_local_queues(&self) -> bool {
        self.pool_size <= LOCAL_QUEUE_WORKER_LIMIT
    }

    /// Accepts a claimed queued job or cancels it after immediate shutdown.
    ///
    /// # Parameters
    ///
    /// * `job` - Job claimed from a queue.
    /// * `worker_runtime` - Queue runtime drained if stopping.
    ///
    /// # Returns
    ///
    /// `Some(job)` when the job may run, otherwise `None`.
    fn accept_claimed_job(&self, job: PoolJob, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if self.stop_now.load(Ordering::Acquire) {
            self.cancel_claimed_job(job);
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        self.mark_queued_job_running();
        Some(job)
    }

    /// Cancels all jobs remaining in one worker runtime.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Worker-owned runtime to drain.
    fn cancel_worker_jobs(&self, worker_runtime: &WorkerRuntime) {
        while let Some(job) = worker_runtime.local.pop() {
            self.cancel_claimed_job(job);
        }
        for job in worker_runtime.queue.drain() {
            self.cancel_claimed_job(job);
        }
    }

    /// Marks one claimed queued job as running.
    ///
    /// Moves one unit of accounting from the queued counter to the running
    /// counter; the matching decrement happens in `finish_running_job`.
    fn mark_queued_job_running(&self) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.running_task_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Cancels one job claimed after immediate shutdown started.
    ///
    /// # Parameters
    ///
    /// * `job` - Queued job that must not be run.
    fn cancel_claimed_job(&self, job: PoolJob) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.cancelled_task_count.fetch_add(1, Ordering::Relaxed);
        job.cancel();
        // Termination waiters re-check their predicate on every cancellation.
        self.state.notify_all();
    }

    /// Marks one running job as finished.
    fn finish_running_job(&self) {
        let previous = self.running_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool running counter underflow");
        self.completed_task_count.fetch_add(1, Ordering::Relaxed);
        // Only the last running job with an empty queue can complete
        // termination, so notify waiters just in that case.
        if previous == 1 && self.queued_count() == 0 {
            self.state.notify_all();
        }
    }

    /// Reserves one worker slot before spawning a worker thread.
    pub(crate) fn reserve_worker_slot(&self) {
        self.state.write(|state| {
            state.live_workers += 1;
        });
    }

    /// Rolls back one worker slot after spawn failure.
    pub(crate) fn rollback_worker_slot(&self) {
        self.state.write(|state| {
            state.live_workers = state
                .live_workers
                .checked_sub(1)
                .expect("fixed pool live worker counter underflow");
        });
    }

    /// Stops the pool after a build-time worker spawn failure.
    pub(crate) fn stop_after_failed_build(&self) {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        self.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        });
        self.state.notify_all();
    }

    /// Blocks until the pool is fully terminated.
    fn wait_for_termination(&self) {
        self.state
            .wait_until(|state| self.is_terminated_locked(state), |_| ());
    }

    /// Requests graceful shutdown.
    ///
    /// Closes admission but lets workers drain all already-queued jobs.
    /// Idempotent: repeated calls leave a `Stopping` lifecycle untouched.
    fn shutdown(&self) {
        self.accepting.store(false, Ordering::Release);
        self.state.write(|state| {
            // Never downgrade Stopping back to Shutdown.
            if state.lifecycle.is_running() {
                state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
            }
        });
        self.state.notify_all();
    }

    /// Requests immediate shutdown and cancels visible queued jobs.
    ///
    /// # Returns
    ///
    /// Count-based shutdown report.
    fn shutdown_now(&self) -> ShutdownReport {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        let running = self.running_count();
        let mut state = self.state.lock();
        state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        // Wait for racing submit calls to publish or roll back their jobs so
        // the drain below sees every job that was actually accepted.
        while self.inflight_count() > 0 {
            state = state.wait();
        }
        drop(state);
        let jobs = self.drain_visible_queued_jobs();
        let cancelled = jobs.len();
        for job in jobs {
            self.cancel_claimed_job(job);
        }
        self.state.notify_all();
        ShutdownReport::new(cancelled, running, cancelled)
    }

    /// Drains all jobs currently visible in global and worker-local queues.
    ///
    /// # Returns
    ///
    /// Drained queued jobs.
    fn drain_visible_queued_jobs(&self) -> Vec<PoolJob> {
        let mut jobs = Vec::new();
        // Repeat until one full pass finds nothing: draining a worker queue
        // may race with a worker spilling back into the global queue.
        loop {
            let previous_count = jobs.len();
            self.drain_global_queue(&mut jobs);
            self.drain_worker_queues(&mut jobs);
            if jobs.len() == previous_count {
                return jobs;
            }
        }
    }

    /// Drains visible jobs from the global injector.
    ///
    /// # Parameters
    ///
    /// * `jobs` - Destination for drained jobs.
    fn drain_global_queue(&self, jobs: &mut Vec<PoolJob>) {
        while let Some(job) = steal_one(&self.global_queue) {
            jobs.push(job);
        }
    }

    /// Drains visible jobs from all worker-local queues.
    ///
    /// # Parameters
    ///
    /// * `jobs` - Destination for drained jobs.
    fn drain_worker_queues(&self, jobs: &mut Vec<PoolJob>) {
        for queue in &self.worker_queues {
            jobs.extend(queue.drain());
        }
    }

    /// Returns whether shutdown has started.
    ///
    /// # Returns
    ///
    /// `true` when lifecycle is not running.
    fn is_shutdown(&self) -> bool {
        self.state.read(|state| !state.lifecycle.is_running())
    }

    /// Returns whether the pool is terminated.
    ///
    /// # Returns
    ///
    /// `true` after shutdown and after all workers and jobs are gone.
    fn is_terminated(&self) -> bool {
        self.state.read(|state| self.is_terminated_locked(state))
    }

    /// Checks termination against one locked state snapshot.
    ///
    /// # Parameters
    ///
    /// * `state` - Locked state snapshot.
    ///
    /// # Returns
    ///
    /// `true` when the pool is terminal.
    fn is_terminated_locked(&self, state: &FixedThreadPoolState) -> bool {
        !state.lifecycle.is_running()
            && state.live_workers == 0
            && self.queued_count() == 0
            && self.running_count() == 0
            && self.inflight_count() == 0
    }

    /// Returns a point-in-time stats snapshot.
    ///
    /// # Returns
    ///
    /// Snapshot using fixed pool size for both core and maximum sizes.
    ///
    /// The atomic counters are read before the lock is taken, so the snapshot
    /// is internally consistent only approximately — fields may reflect
    /// slightly different instants under concurrent activity.
    fn stats(&self) -> ThreadPoolStats {
        let queued_tasks = self.queued_count();
        let running_tasks = self.running_count();
        let submitted_tasks = self.submitted_task_count.load(Ordering::Relaxed);
        let completed_tasks = self.completed_task_count.load(Ordering::Relaxed);
        let cancelled_tasks = self.cancelled_task_count.load(Ordering::Relaxed);
        self.state.read(|state| ThreadPoolStats {
            core_pool_size: self.pool_size,
            maximum_pool_size: self.pool_size,
            live_workers: state.live_workers,
            idle_workers: state.idle_workers,
            queued_tasks,
            running_tasks,
            submitted_tasks,
            completed_tasks,
            cancelled_tasks,
            shutdown: !state.lifecycle.is_running(),
            terminated: self.is_terminated_locked(state),
        })
    }
}
723
/// Submit guard that leaves in-flight accounting on drop.
///
/// Created by `FixedThreadPoolInner::begin_submit`; holding it keeps the
/// pool's in-flight submission counter elevated so `shutdown_now` waits for
/// this submit call to finish before draining queues.
struct FixedSubmitGuard<'a> {
    /// Pool whose in-flight counter was entered.
    inner: &'a FixedThreadPoolInner,
}
729
730impl Drop for FixedSubmitGuard<'_> {
731    /// Leaves submit accounting and wakes shutdown waiters if needed.
732    fn drop(&mut self) {
733        let previous = self
734            .inner
735            .inflight_submissions
736            .fetch_sub(1, Ordering::AcqRel);
737        debug_assert!(previous > 0, "fixed pool submit counter underflow");
738        if previous == 1 && !self.inner.accepting.load(Ordering::Acquire) {
739            self.inner.state.notify_all();
740        }
741    }
742}
743
/// Fixed-size thread pool implementing [`ExecutorService`].
///
/// `FixedThreadPool` prestarts a fixed number of worker threads and does not
/// support runtime pool-size changes. Use [`super::ThreadPool`] when dynamic
/// core/maximum sizes or keep-alive policies are required.
///
/// Dropping the handle requests graceful shutdown (queued work still drains).
pub struct FixedThreadPool {
    /// Shared fixed pool state, also held by every worker thread.
    inner: Arc<FixedThreadPoolInner>,
}
753
impl FixedThreadPool {
    /// Builds a fixed pool from validated builder options.
    ///
    /// # Parameters
    ///
    /// * `pool_size` - Number of workers to prestart.
    /// * `queue_capacity` - Optional maximum queued task count.
    /// * `thread_name_prefix` - Prefix used for worker thread names.
    /// * `stack_size` - Optional worker stack size.
    ///
    /// # Returns
    ///
    /// A fixed thread-pool handle with workers already started.
    ///
    /// # Errors
    ///
    /// Returns [`ThreadPoolBuildError`] when a worker thread cannot be spawned.
    pub(crate) fn build_with_options(
        pool_size: usize,
        queue_capacity: Option<usize>,
        thread_name_prefix: String,
        stack_size: Option<usize>,
    ) -> Result<Self, ThreadPoolBuildError> {
        // Create every worker runtime up front so all worker queues are
        // registered with the shared state before any worker thread starts.
        let mut worker_runtimes = Vec::with_capacity(pool_size);
        let mut worker_queues = Vec::with_capacity(pool_size);
        for index in 0..pool_size {
            let worker_runtime = WorkerRuntime::new(index);
            worker_queues.push(Arc::clone(&worker_runtime.queue));
            worker_runtimes.push(worker_runtime);
        }
        let inner = Arc::new(FixedThreadPoolInner::new(
            pool_size,
            queue_capacity,
            worker_queues,
        ));
        for (index, worker_runtime) in worker_runtimes.into_iter().enumerate() {
            // Reserve the slot before spawning so a worker that exits
            // immediately still decrements a counted slot.
            inner.reserve_worker_slot();
            let worker_inner = Arc::clone(&inner);
            let mut builder =
                std::thread::Builder::new().name(format!("{}-{}", thread_name_prefix, index));
            if let Some(stack_size) = stack_size {
                builder = builder.stack_size(stack_size);
            }
            if let Err(source) =
                builder.spawn(move || run_fixed_worker(worker_inner, worker_runtime))
            {
                // Roll back the slot for the worker that never started, then
                // stop the pool so already-spawned workers exit promptly.
                inner.rollback_worker_slot();
                inner.stop_after_failed_build();
                return Err(ThreadPoolBuildError::SpawnWorker { index, source });
            }
        }
        Ok(Self { inner })
    }

    /// Creates a fixed thread pool with `pool_size` prestarted workers.
    ///
    /// # Parameters
    ///
    /// * `pool_size` - Number of worker threads.
    ///
    /// # Returns
    ///
    /// A fixed thread pool.
    ///
    /// # Errors
    ///
    /// Returns [`ThreadPoolBuildError`] if the worker count is zero or a worker
    /// cannot be spawned.
    pub fn new(pool_size: usize) -> Result<Self, ThreadPoolBuildError> {
        Self::builder().pool_size(pool_size).build()
    }

    /// Creates a fixed pool builder.
    ///
    /// # Returns
    ///
    /// Builder with CPU parallelism defaults.
    pub fn builder() -> FixedThreadPoolBuilder {
        FixedThreadPoolBuilder::new()
    }

    /// Returns the fixed worker count.
    ///
    /// # Returns
    ///
    /// Number of workers in this pool.
    pub fn pool_size(&self) -> usize {
        self.inner.pool_size()
    }

    /// Returns the queued task count.
    ///
    /// # Returns
    ///
    /// Number of accepted tasks waiting to run.
    pub fn queued_count(&self) -> usize {
        self.inner.queued_count()
    }

    /// Returns the running task count.
    ///
    /// # Returns
    ///
    /// Number of tasks currently held by workers.
    pub fn running_count(&self) -> usize {
        self.inner.running_count()
    }

    /// Returns the live worker count.
    ///
    /// # Returns
    ///
    /// Number of worker loops that have not exited.
    pub fn live_worker_count(&self) -> usize {
        self.inner.state.read(|state| state.live_workers)
    }

    /// Returns a point-in-time stats snapshot.
    ///
    /// # Returns
    ///
    /// Snapshot containing queue, worker, and lifecycle counters.
    pub fn stats(&self) -> ThreadPoolStats {
        self.inner.stats()
    }
}
880
881impl Drop for FixedThreadPool {
882    /// Requests graceful shutdown when the pool handle is dropped.
883    fn drop(&mut self) {
884        self.inner.shutdown();
885    }
886}
887
888impl ExecutorService for FixedThreadPool {
889    type Handle<R, E>
890        = TaskHandle<R, E>
891    where
892        R: Send + 'static,
893        E: Send + 'static;
894
895    type Termination<'a>
896        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
897    where
898        Self: 'a;
899
900    /// Accepts a callable and queues it for fixed pool workers.
901    ///
902    /// # Parameters
903    ///
904    /// * `task` - Callable to execute on a fixed pool worker.
905    ///
906    /// # Returns
907    ///
908    /// A [`TaskHandle`] for the accepted task.
909    ///
910    /// # Errors
911    ///
912    /// Returns [`RejectedExecution::Shutdown`] after shutdown or
913    /// [`RejectedExecution::Saturated`] when a bounded queue is full.
914    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
915    where
916        C: Callable<R, E> + Send + 'static,
917        R: Send + 'static,
918        E: Send + 'static,
919    {
920        let (handle, completion) = TaskCompletionPair::new().into_parts();
921        let job = PoolJob::from_task(task, completion);
922        self.inner.submit(job)?;
923        Ok(handle)
924    }
925
926    /// Stops accepting new work and drains accepted queued tasks.
927    fn shutdown(&self) {
928        self.inner.shutdown();
929    }
930
931    /// Stops accepting work and cancels queued tasks.
932    ///
933    /// # Returns
934    ///
935    /// A count-based shutdown report.
936    fn shutdown_now(&self) -> ShutdownReport {
937        self.inner.shutdown_now()
938    }
939
940    /// Returns whether shutdown has been requested.
941    ///
942    /// # Returns
943    ///
944    /// `true` when this pool no longer accepts new work.
945    fn is_shutdown(&self) -> bool {
946        self.inner.is_shutdown()
947    }
948
949    /// Returns whether this pool is fully terminated.
950    ///
951    /// # Returns
952    ///
953    /// `true` after shutdown and after all workers have exited.
954    fn is_terminated(&self) -> bool {
955        self.inner.is_terminated()
956    }
957
958    /// Waits until this fixed pool has terminated.
959    ///
960    /// # Returns
961    ///
962    /// A future that blocks the polling thread until termination.
963    fn await_termination(&self) -> Self::Termination<'_> {
964        Box::pin(async move {
965            self.inner.wait_for_termination();
966        })
967    }
968}
969
970/// Runs one fixed-pool worker loop.
971///
972/// # Parameters
973///
974/// * `inner` - Shared fixed-pool state.
975/// * `worker_runtime` - Queue runtime owned by this worker.
976fn run_fixed_worker(inner: Arc<FixedThreadPoolInner>, worker_runtime: WorkerRuntime) {
977    worker_runtime.queue.activate();
978    loop {
979        if let Some(job) = inner.try_take_job(&worker_runtime) {
980            job.run();
981            inner.finish_running_job();
982            continue;
983        }
984        if !wait_for_fixed_pool_work(&inner) {
985            break;
986        }
987    }
988    worker_exited(&inner, &worker_runtime.queue);
989}
990
/// Waits until visible work exists or the worker should exit.
///
/// Implements the classic mark-idle / re-check / wait protocol: idleness is
/// published before the final emptiness check so a submitter racing with this
/// worker cannot enqueue work between the check and the park.
///
/// # Parameters
///
/// * `inner` - Shared fixed-pool state.
///
/// # Returns
///
/// `true` when the worker should try to take work again, or `false` when it
/// should exit.
fn wait_for_fixed_pool_work(inner: &FixedThreadPoolInner) -> bool {
    let mut state = inner.state.lock();
    loop {
        match state.lifecycle {
            FixedThreadPoolLifecycle::Running => {
                // Fast path: visible queued work, resume claiming jobs.
                if inner.queued_count() > 0 {
                    return true;
                }
                mark_fixed_worker_idle(inner, &mut state);
                // Re-check after publishing idleness so a submission or wake
                // that raced with the mark is not missed before parking.
                if inner.queued_count() > 0 || inner.has_pending_worker_wake() {
                    unmark_fixed_worker_idle(inner, &mut state);
                    return true;
                }
                // Park on the monitor; wait() consumes the guard and hands
                // back the re-acquired one.
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            FixedThreadPoolLifecycle::Shutdown => {
                // Graceful shutdown still drains already-accepted work.
                if inner.queued_count() > 0 {
                    return true;
                }
                // Nothing queued and no submissions mid-flight: fully
                // drained, tell the worker loop to exit.
                if inner.queued_count() == 0 && inner.inflight_count() == 0 {
                    return false;
                }
                mark_fixed_worker_idle(inner, &mut state);
                // Re-check before parking; `continue` loops back to the top
                // so the drained/exit conditions are re-evaluated under the
                // current lifecycle instead of claiming work directly.
                if inner.queued_count() > 0
                    || inner.inflight_count() == 0
                    || inner.has_pending_worker_wake()
                {
                    unmark_fixed_worker_idle(inner, &mut state);
                    continue;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            // Stop-now: exit immediately; queued work is cancelled on the
            // shutdown_now path, not here.
            FixedThreadPoolLifecycle::Stopping => return false,
        }
    }
}
1039
/// Marks a fixed-pool worker as idle in locked and lock-free state.
///
/// The caller holds the state lock. The locked counter is the authoritative
/// value; the atomic presumably mirrors it so other paths can observe
/// idleness without taking the lock — confirm against the submit path.
///
/// # Parameters
///
/// * `inner` - Fixed pool whose idle counter is updated.
/// * `state` - Locked mutable state containing authoritative idle workers.
fn mark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
    // Locked counter first, then the lock-free mirror; keep this order in
    // sync with unmark_fixed_worker_idle.
    state.idle_workers += 1;
    inner.idle_worker_count.fetch_add(1, Ordering::AcqRel);
}
1050
/// Marks a fixed-pool worker as no longer idle.
///
/// Reverses [`mark_fixed_worker_idle`] on both the locked counter and the
/// lock-free mirror, then retires one pending worker wake.
///
/// # Parameters
///
/// * `inner` - Fixed pool whose idle counter is updated.
/// * `state` - Locked mutable state containing authoritative idle workers.
fn unmark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
    // Underflow would mean an unmark without a matching mark: a bug, so
    // panic rather than wrap.
    state.idle_workers = state
        .idle_workers
        .checked_sub(1)
        .expect("fixed pool idle worker counter underflow");
    // The atomic mirror is checked more cheaply: hard assert only in debug.
    let previous = inner.idle_worker_count.fetch_sub(1, Ordering::AcqRel);
    debug_assert!(previous > 0, "fixed pool idle worker counter underflow");
    // NOTE(review): presumably consumes one wake token now that this worker
    // is awake — verify against the has_pending_worker_wake producers.
    inner.consume_pending_worker_wake();
}
1066
1067/// Marks one fixed-pool worker as exited.
1068///
1069/// # Parameters
1070///
1071/// * `inner` - Shared fixed-pool state.
1072/// * `worker_queue` - Queue owned by the exiting worker.
1073fn worker_exited(inner: &FixedThreadPoolInner, worker_queue: &WorkerQueue) {
1074    worker_queue.deactivate();
1075    inner.state.write(|state| {
1076        state.live_workers = state
1077            .live_workers
1078            .checked_sub(1)
1079            .expect("fixed pool live worker counter underflow");
1080    });
1081    inner.state.notify_all();
1082}
1083
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{
        Arc,
        atomic::{AtomicUsize, Ordering},
    };
    use std::thread;
    use std::time::Duration;

    /// Builds a [`PoolJob`] whose run and cancel paths bump shared counters.
    ///
    /// # Parameters
    ///
    /// * `cancelled` - Counter incremented when the job is cancelled.
    /// * `ran` - Counter incremented when the job runs.
    ///
    /// # Returns
    ///
    /// A counting job suitable for queue-bookkeeping assertions.
    fn counted_job(cancelled: Arc<AtomicUsize>, ran: Arc<AtomicUsize>) -> PoolJob {
        PoolJob::new(
            Box::new(move || {
                ran.fetch_add(1, Ordering::AcqRel);
            }),
            Box::new(move || {
                cancelled.fetch_add(1, Ordering::AcqRel);
            }),
        )
    }

    #[test]
    fn test_accept_claimed_job_stop_now_cancels_claimed_and_worker_queues() {
        let runtime = WorkerRuntime::new(0);
        runtime.queue.activate();
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        // Simulate a shutdown_now request issued before the claim lands.
        inner.stop_now.store(true, Ordering::Release);

        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        // Preload one job in the worker-local deque and one in the worker
        // queue; together with the claimed job below that is three jobs.
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        runtime
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(3, Ordering::Release);

        let accepted =
            inner.accept_claimed_job(counted_job(cancelled.clone(), ran.clone()), &runtime);
        // Under stop-now the claimed job must be rejected and all three jobs
        // cancelled; nothing may run.
        assert!(accepted.is_none());
        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 3);
        assert_eq!(cancelled.load(Ordering::Acquire), 3);
        assert_eq!(ran.load(Ordering::Acquire), 0);
    }

    #[test]
    fn test_steal_global_job_notifies_when_batch_leaves_local_jobs() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        // One job preloaded locally and one in the global injector.
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner
            .global_queue
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        // Stealing from the global queue should yield exactly one claimed job
        // while leaving the preloaded local job in place.
        let claimed = inner
            .steal_global_job(&runtime)
            .expect("global queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        let remaining = runtime
            .local
            .pop()
            .expect("preloaded local job should remain queued");
        inner.cancel_claimed_job(remaining);

        // One job completed, one cancelled; all counters drained to zero.
        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    #[test]
    fn test_steal_worker_job_notifies_when_batch_leaves_local_jobs() {
        let thief = WorkerRuntime::new(0);
        let victim = WorkerRuntime::new(1);
        thief.queue.activate();
        victim.queue.activate();
        let inner = FixedThreadPoolInner::new(
            2,
            None,
            vec![Arc::clone(&thief.queue), Arc::clone(&victim.queue)],
        );
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        // Thief already holds a local job; victim's queue holds another.
        thief
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        victim
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        // Stealing from the victim should yield one claimed job and leave
        // one job behind in the thief's local deque.
        let claimed = inner
            .steal_worker_job(&thief)
            .expect("victim queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        let remaining = thief
            .local
            .pop()
            .expect("batch steal should leave one local job");
        inner.cancel_claimed_job(remaining);

        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    #[test]
    fn test_fixed_submit_guard_drop_notifies_when_shutdown_closes_admission() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        // One submission in flight while admission is already closed.
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.accepting.store(false, Ordering::Release);

        {
            let guard = FixedSubmitGuard { inner: &inner };
            drop(guard);
        }

        // Dropping the guard must release the in-flight submission slot.
        assert_eq!(inner.inflight_count(), 0);
    }

    #[test]
    fn test_wait_for_fixed_pool_work_shutdown_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        // Shutdown lifecycle with one submission still in flight and a wake
        // token pending, so the first pass re-loops instead of exiting.
        inner.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
        });
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.pending_worker_wakes.store(1, Ordering::Release);

        // Background thread releases the in-flight submission after a short
        // delay and notifies the monitor so the waiter can re-check.
        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        // The wait must block until the release, then report "exit" (false).
        assert!(!wait_for_fixed_pool_work(&inner));
        releaser.join().expect("releaser thread should finish");
    }

    #[test]
    fn test_shutdown_now_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        inner.inflight_submissions.store(1, Ordering::Release);

        // Release the in-flight submission from another thread so that
        // shutdown_now can finish draining.
        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        let report = inner.shutdown_now();
        releaser.join().expect("releaser thread should finish");
        // Nothing was queued or running, so the report is all zeros.
        assert_eq!(report.running, 0);
        assert_eq!(report.queued, 0);
        assert_eq!(report.cancelled, 0);
    }
}