// qubit_thread_pool/fixed_thread_pool.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026 Haixing Hu.
4 *
5 *    SPDX-License-Identifier: Apache-2.0
6 *
7 *    Licensed under the Apache License, Version 2.0.
8 *
9 ******************************************************************************/
10// qubit-style: allow inline-tests
11// qubit-style: allow explicit-imports
12use std::{
13    future::Future,
14    pin::Pin,
15    sync::{
16        Arc,
17        atomic::{
18            AtomicBool,
19            AtomicUsize,
20            Ordering,
21        },
22    },
23};
24
25use crossbeam_deque::Injector;
26use qubit_function::Callable;
27
28use qubit_executor::{
29    TaskCompletionPair,
30    TaskHandle,
31};
32use qubit_lock::Monitor;
33
34use super::fixed_thread_pool_builder::FixedThreadPoolBuilder;
35use super::queue_steal_source::{
36    steal_batch_and_pop,
37    steal_one,
38};
39use super::thread_pool::{
40    ThreadPoolBuildError,
41    ThreadPoolStats,
42};
43use super::worker_queue::WorkerQueue;
44use super::worker_runtime::WorkerRuntime;
45use crate::thread_pool::PoolJob;
46use qubit_executor::service::{
47    ExecutorService,
48    RejectedExecution,
49    ShutdownReport,
50};
51
/// Maximum number of worker-local queues probed by one submit call.
///
/// Bounds the cost of the round-robin probe in `try_enqueue_to_worker`
/// before the job falls back to the global injector.
const LOCAL_ENQUEUE_MAX_PROBES: usize = 4;
/// Maximum worker count that uses worker-local batch queues.
///
/// Pools larger than this route every job through the global injector; see
/// `use_worker_local_queues`.
const LOCAL_QUEUE_WORKER_LIMIT: usize = 4;
56
/// Lifecycle state for a fixed-size thread pool.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FixedThreadPoolLifecycle {
    /// The pool accepts new tasks and workers wait for queued work.
    Running,

    /// The pool rejects new tasks but drains queued work.
    Shutdown,

    /// The pool rejects new tasks and cancels queued work.
    Stopping,
}

impl FixedThreadPoolLifecycle {
    /// Reports whether submissions are still admitted in this state.
    ///
    /// # Returns
    ///
    /// `true` only for [`Self::Running`].
    const fn is_running(self) -> bool {
        match self {
            Self::Running => true,
            Self::Shutdown | Self::Stopping => false,
        }
    }
}
80
81/// Mutable state protected by the fixed pool monitor.
82struct FixedThreadPoolState {
83    /// Current lifecycle state.
84    lifecycle: FixedThreadPoolLifecycle,
85    /// Number of worker loops that have not exited.
86    live_workers: usize,
87    /// Number of workers currently blocked waiting for work.
88    idle_workers: usize,
89}
90
91impl FixedThreadPoolState {
92    /// Creates an empty running state.
93    ///
94    /// # Returns
95    ///
96    /// A running state before any worker has been reserved.
97    fn new() -> Self {
98        Self {
99            lifecycle: FixedThreadPoolLifecycle::Running,
100            live_workers: 0,
101            idle_workers: 0,
102        }
103    }
104}
105
/// Shared state for a fixed-size thread pool.
struct FixedThreadPoolInner {
    /// Number of workers in this fixed pool.
    pool_size: usize,
    /// Mutable lifecycle and worker counters.
    state: Monitor<FixedThreadPoolState>,
    /// Admission gate used by submitters.
    ///
    /// Cleared (`Release`) before the lifecycle transitions in `shutdown` and
    /// `shutdown_now`, so submitters re-check it after registering in-flight.
    accepting: AtomicBool,
    /// Whether immediate shutdown has requested workers to stop taking jobs.
    stop_now: AtomicBool,
    /// Submit calls that have passed the first admission check.
    ///
    /// `shutdown_now` waits for this to reach zero before draining queues.
    inflight_submissions: AtomicUsize,
    /// Number of workers currently blocked or about to block waiting for work.
    ///
    /// NOTE(review): lock-free counterpart of
    /// `FixedThreadPoolState::idle_workers`; updated by the worker loop
    /// outside this chunk — confirm the two stay in step.
    idle_worker_count: AtomicUsize,
    /// Number of idle-worker wakeups already requested but not yet consumed.
    pending_worker_wakes: AtomicUsize,
    /// Lock-free queue for externally submitted jobs.
    global_queue: Injector<PoolJob>,
    /// Worker-local queues used for submit routing and work stealing.
    worker_queues: Vec<Arc<WorkerQueue>>,
    /// Round-robin cursor used for submit-path local queue selection.
    next_enqueue_worker: AtomicUsize,
    /// Optional maximum number of queued jobs.
    queue_capacity: Option<usize>,
    /// Number of queued jobs not yet started or cancelled.
    queued_task_count: AtomicUsize,
    /// Number of jobs currently running.
    running_task_count: AtomicUsize,
    /// Total number of accepted jobs.
    submitted_task_count: AtomicUsize,
    /// Total number of finished worker-held jobs.
    completed_task_count: AtomicUsize,
    /// Total number of queued jobs cancelled by immediate shutdown.
    cancelled_task_count: AtomicUsize,
}
141
142impl FixedThreadPoolInner {
143    /// Creates shared state for a fixed-size pool.
144    ///
145    /// # Parameters
146    ///
147    /// * `pool_size` - Number of workers that will be prestarted.
148    /// * `queue_capacity` - Optional queue capacity.
149    ///
150    /// # Returns
151    ///
152    /// A shared state object ready for worker startup.
153    fn new(
154        pool_size: usize,
155        queue_capacity: Option<usize>,
156        worker_queues: Vec<Arc<WorkerQueue>>,
157    ) -> Self {
158        Self {
159            pool_size,
160            state: Monitor::new(FixedThreadPoolState::new()),
161            accepting: AtomicBool::new(true),
162            stop_now: AtomicBool::new(false),
163            inflight_submissions: AtomicUsize::new(0),
164            idle_worker_count: AtomicUsize::new(0),
165            pending_worker_wakes: AtomicUsize::new(0),
166            global_queue: Injector::new(),
167            worker_queues,
168            next_enqueue_worker: AtomicUsize::new(0),
169            queue_capacity,
170            queued_task_count: AtomicUsize::new(0),
171            running_task_count: AtomicUsize::new(0),
172            submitted_task_count: AtomicUsize::new(0),
173            completed_task_count: AtomicUsize::new(0),
174            cancelled_task_count: AtomicUsize::new(0),
175        }
176    }
177
    /// Returns the fixed worker count.
    ///
    /// # Returns
    ///
    /// Number of workers owned by this pool.
    #[inline]
    fn pool_size(&self) -> usize {
        self.pool_size
    }

    /// Returns the queued task count.
    ///
    /// # Returns
    ///
    /// Number of accepted tasks waiting to run.
    #[inline]
    fn queued_count(&self) -> usize {
        // Acquire pairs with the AcqRel updates in reserve/cancel/mark paths.
        self.queued_task_count.load(Ordering::Acquire)
    }

    /// Returns the running task count.
    ///
    /// # Returns
    ///
    /// Number of tasks currently held by workers.
    #[inline]
    fn running_count(&self) -> usize {
        self.running_task_count.load(Ordering::Acquire)
    }

    /// Returns the number of in-flight submit calls.
    ///
    /// # Returns
    ///
    /// Number of submit calls that may still publish or roll back a queued job.
    #[inline]
    fn inflight_count(&self) -> usize {
        self.inflight_submissions.load(Ordering::Acquire)
    }
217
    /// Attempts to enter submit admission.
    ///
    /// # Returns
    ///
    /// A guard that leaves admission on drop.
    ///
    /// # Errors
    ///
    /// Returns [`RejectedExecution::Shutdown`] when admission is closed.
    fn begin_submit(&self) -> Result<FixedSubmitGuard<'_>, RejectedExecution> {
        // Fast-path rejection before touching the in-flight counter.
        if !self.accepting.load(Ordering::Acquire) {
            return Err(RejectedExecution::Shutdown);
        }
        self.inflight_submissions.fetch_add(1, Ordering::AcqRel);
        // Re-check after registering: `shutdown_now` waits for the in-flight
        // counter to reach zero, so a submitter that raced the gate closing
        // must undo its registration instead of proceeding.
        if self.accepting.load(Ordering::Acquire) {
            Ok(FixedSubmitGuard { inner: self })
        } else {
            let previous = self.inflight_submissions.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool submit counter underflow");
            // The last in-flight submitter wakes any thread blocked in
            // `shutdown_now` waiting for submissions to settle.
            if previous == 1 {
                self.state.notify_all();
            }
            Err(RejectedExecution::Shutdown)
        }
    }
243
    /// Attempts to reserve one queue slot.
    ///
    /// # Returns
    ///
    /// `true` if one queued slot was reserved, otherwise `false`.
    fn reserve_queue_slot(&self) -> bool {
        if let Some(capacity) = self.queue_capacity {
            // Bounded queue: CAS loop so concurrent submitters cannot
            // overshoot the configured capacity.
            loop {
                let current = self.queued_count();
                if current >= capacity {
                    return false;
                }
                if self
                    .queued_task_count
                    .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
                    .is_ok()
                {
                    return true;
                }
            }
        }
        // Unbounded queue: a plain increment always succeeds.
        self.queued_task_count.fetch_add(1, Ordering::AcqRel);
        true
    }
268
    /// Submits one job to this fixed pool.
    ///
    /// # Parameters
    ///
    /// * `job` - Type-erased job accepted by the pool.
    ///
    /// # Returns
    ///
    /// `Ok(())` when the job is accepted.
    ///
    /// # Errors
    ///
    /// Returns [`RejectedExecution::Shutdown`] after shutdown or
    /// [`RejectedExecution::Saturated`] when the bounded queue is full.
    fn submit(&self, job: PoolJob) -> Result<(), RejectedExecution> {
        // Guard keeps the in-flight counter raised until this call returns.
        let _guard = self.begin_submit()?;
        if !self.reserve_queue_slot() {
            return Err(RejectedExecution::Saturated);
        }
        // Final admission re-check: if shutdown closed the gate after the
        // slot was reserved, roll the reservation back and reject.
        if !self.accepting.load(Ordering::Acquire) {
            let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool queued counter underflow");
            return Err(RejectedExecution::Shutdown);
        }
        self.submitted_task_count.fetch_add(1, Ordering::Relaxed);
        self.enqueue_job(job);
        Ok(())
    }
297
298    /// Enqueues one accepted job to a worker inbox or the global fallback.
299    ///
300    /// # Parameters
301    ///
302    /// * `job` - Job whose queued slot has already been reserved.
303    fn enqueue_job(&self, job: PoolJob) {
304        if self.use_worker_local_queues() {
305            match self.try_enqueue_to_worker(job) {
306                Ok(()) => {}
307                Err(job) => self.global_queue.push(job),
308            }
309        } else {
310            self.global_queue.push(job);
311        }
312        self.wake_one_idle_worker();
313    }
314
    /// Wakes one idle worker if no already-requested wakeup covers it.
    ///
    /// Pending wake tokens close the lost-notification window: a worker that
    /// has marked itself idle but has not yet parked will observe the token and
    /// retry work without relying on the condition-variable notification.
    fn wake_one_idle_worker(&self) {
        loop {
            let idle_workers = self.idle_worker_count.load(Ordering::Acquire);
            // Nobody is idle, so nobody needs waking.
            if idle_workers == 0 {
                return;
            }
            let pending_wakes = self.pending_worker_wakes.load(Ordering::Acquire);
            // Every idle worker already has an unconsumed token.
            if pending_wakes >= idle_workers {
                return;
            }
            // Weak CAS may fail spuriously; the loop retries from fresh loads.
            if self
                .pending_worker_wakes
                .compare_exchange_weak(
                    pending_wakes,
                    pending_wakes + 1,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                )
                .is_ok()
            {
                self.state.notify_one();
                return;
            }
        }
    }

    /// Returns whether an idle-worker wakeup has been requested.
    ///
    /// # Returns
    ///
    /// `true` when at least one idle worker should leave the wait path and
    /// retry taking work.
    fn has_pending_worker_wake(&self) -> bool {
        self.pending_worker_wakes.load(Ordering::Acquire) > 0
    }

    /// Consumes one requested idle-worker wakeup if one exists.
    ///
    /// Silently does nothing when no token is pending; a token consumed by a
    /// concurrent worker is simply not re-taken here.
    fn consume_pending_worker_wake(&self) {
        let mut current = self.pending_worker_wakes.load(Ordering::Acquire);
        while current > 0 {
            match self.pending_worker_wakes.compare_exchange_weak(
                current,
                current - 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return,
                // CAS lost the race (or failed spuriously); retry with the
                // freshly observed value.
                Err(actual) => current = actual,
            }
        }
    }
371
372    /// Attempts to route one job directly to an active worker queue.
373    ///
374    /// # Parameters
375    ///
376    /// * `job` - Job to route.
377    ///
378    /// # Returns
379    ///
380    /// `Ok(())` when the job was published to a worker inbox; otherwise the
381    /// original job is returned for global fallback.
382    fn try_enqueue_to_worker(&self, job: PoolJob) -> Result<(), PoolJob> {
383        let queue_count = self.worker_queues.len();
384        debug_assert!(queue_count > 0, "fixed pool must have worker queues");
385        let probe_count = queue_count.min(LOCAL_ENQUEUE_MAX_PROBES);
386        for _ in 0..probe_count {
387            let index = self.next_enqueue_worker.fetch_add(1, Ordering::Relaxed) % queue_count;
388            let queue = &self.worker_queues[index];
389            if queue.is_active() {
390                queue.push_back(job);
391                return Ok(());
392            }
393        }
394        Err(job)
395    }
396
    /// Attempts to claim one queued job for a worker.
    ///
    /// The worker first checks its local queue, then its cross-thread inbox,
    /// then the global fallback queue, and finally steals from other workers.
    /// This matches the dynamic pool's hot path and avoids forcing all fixed
    /// workers through one global injector under skewed workloads.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime owned by the current worker.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn try_take_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if self.stop_now.load(Ordering::Acquire) {
            // Immediate shutdown: cancel everything this worker still holds.
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        if !self.use_worker_local_queues() {
            // Large pools bypass local queues; only the injector holds work.
            return self.steal_single_global_job(worker_runtime);
        }
        if let Some(job) = worker_runtime.local.pop() {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = worker_runtime.queue.pop_inbox_into(&worker_runtime.local) {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = self.steal_global_job(worker_runtime) {
            return Some(job);
        }
        self.steal_worker_job(worker_runtime)
    }
430
    /// Attempts to batch-steal one job from the global injector.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime receiving any stolen batch remainder.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn steal_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if let Some(job) = steal_batch_and_pop(&self.global_queue, &worker_runtime.local) {
            // The batch left surplus work in this worker's local queue; wake
            // one peer so it can steal a share.
            if !worker_runtime.local.is_empty() {
                self.state.notify_one();
            }
            return self.accept_claimed_job(job, worker_runtime);
        }
        // Batch steal found nothing; fall back to a single-item steal.
        self.steal_single_global_job(worker_runtime)
    }

    /// Attempts to steal exactly one job from the global injector.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime owned by the current worker.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn steal_single_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        steal_one(&self.global_queue).and_then(|job| self.accept_claimed_job(job, worker_runtime))
    }
462
    /// Attempts to steal one job from another worker's local queue.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime owned by the current worker.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn steal_worker_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        let queue_count = self.worker_queues.len();
        if queue_count <= 1 {
            // A single-worker pool has no victims to steal from.
            return None;
        }
        let worker_index = worker_runtime.worker_index();
        // Rotate the starting victim so steal pressure spreads across workers.
        let start = worker_runtime.next_steal_start(queue_count);
        for offset in 0..queue_count {
            let victim = &self.worker_queues[(start + offset) % queue_count];
            // Never steal from our own queue.
            if victim.worker_index() == worker_index {
                continue;
            }
            if !victim.is_active() {
                continue;
            }
            if let Some(job) = victim.steal_into(&worker_runtime.local) {
                // Stolen surplus landed locally; wake one peer to share it.
                if !worker_runtime.local.is_empty() {
                    self.state.notify_one();
                }
                return self.accept_claimed_job(job, worker_runtime);
            }
        }
        None
    }
496
    /// Returns whether this pool should use worker-local queues.
    ///
    /// # Returns
    ///
    /// `true` for small fixed pools where local batching reduces global queue
    /// contention; `false` for larger pools where inbox routing and victim
    /// scans cost more than they save.
    fn use_worker_local_queues(&self) -> bool {
        self.pool_size <= LOCAL_QUEUE_WORKER_LIMIT
    }

    /// Accepts a claimed queued job or cancels it after immediate shutdown.
    ///
    /// # Parameters
    ///
    /// * `job` - Job claimed from a queue.
    /// * `worker_runtime` - Queue runtime drained if stopping.
    ///
    /// # Returns
    ///
    /// `Some(job)` when the job may run, otherwise `None`.
    fn accept_claimed_job(&self, job: PoolJob, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        // Re-check the stop flag after claiming: `shutdown_now` may have
        // raced with the claim, and a claimed job must then be cancelled,
        // never run.
        if self.stop_now.load(Ordering::Acquire) {
            self.cancel_claimed_job(job);
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        self.mark_queued_job_running();
        Some(job)
    }
527
528    /// Cancels all jobs remaining in one worker runtime.
529    ///
530    /// # Parameters
531    ///
532    /// * `worker_runtime` - Worker-owned runtime to drain.
533    fn cancel_worker_jobs(&self, worker_runtime: &WorkerRuntime) {
534        while let Some(job) = worker_runtime.local.pop() {
535            self.cancel_claimed_job(job);
536        }
537        for job in worker_runtime.queue.drain() {
538            self.cancel_claimed_job(job);
539        }
540    }
541
    /// Marks one claimed queued job as running.
    ///
    /// Moves one unit of accounting from the queued counter to the running
    /// counter.
    fn mark_queued_job_running(&self) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.running_task_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Cancels one job claimed after immediate shutdown started.
    ///
    /// # Parameters
    ///
    /// * `job` - Queued job that must not be run.
    fn cancel_claimed_job(&self, job: PoolJob) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.cancelled_task_count.fetch_add(1, Ordering::Relaxed);
        job.cancel();
        // The queued count changed; termination waiters must re-evaluate.
        self.state.notify_all();
    }

    /// Marks one running job as finished.
    fn finish_running_job(&self) {
        let previous = self.running_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool running counter underflow");
        self.completed_task_count.fetch_add(1, Ordering::Relaxed);
        // Only the last running job with an empty queue can move the pool
        // toward termination, so notify waiters only in that case.
        if previous == 1 && self.queued_count() == 0 {
            self.state.notify_all();
        }
    }
571
    /// Reserves one worker slot before spawning a worker thread.
    ///
    /// Called before `spawn` so a successfully started worker is always
    /// accounted for in `live_workers`.
    pub(crate) fn reserve_worker_slot(&self) {
        self.state.write(|state| {
            state.live_workers += 1;
        });
    }

    /// Rolls back one worker slot after spawn failure.
    ///
    /// # Panics
    ///
    /// Panics if no slot was previously reserved.
    pub(crate) fn rollback_worker_slot(&self) {
        self.state.write(|state| {
            state.live_workers = state
                .live_workers
                .checked_sub(1)
                .expect("fixed pool live worker counter underflow");
        });
    }

    /// Stops the pool after a build-time worker spawn failure.
    ///
    /// Closes admission, raises the stop flag, marks the lifecycle as
    /// stopping, and wakes every waiter so already-spawned workers exit.
    pub(crate) fn stop_after_failed_build(&self) {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        self.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        });
        self.state.notify_all();
    }
598
    /// Blocks until the pool is fully terminated.
    fn wait_for_termination(&self) {
        self.state
            .wait_until(|state| self.is_terminated_locked(state), |_| ());
    }

    /// Requests graceful shutdown.
    ///
    /// Closes admission first, then moves a running lifecycle to `Shutdown`;
    /// queued work is left in place for workers to drain.
    fn shutdown(&self) {
        self.accepting.store(false, Ordering::Release);
        self.state.write(|state| {
            if state.lifecycle.is_running() {
                state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
            }
        });
        self.state.notify_all();
    }

    /// Requests immediate shutdown and cancels visible queued jobs.
    ///
    /// # Returns
    ///
    /// Count-based shutdown report.
    fn shutdown_now(&self) -> ShutdownReport {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        // NOTE(review): snapshot taken before the lock, so the reported
        // running count may race with workers finishing jobs.
        let running = self.running_count();
        let mut state = self.state.lock();
        state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        // Wait out in-flight submits so no job can be published to a queue
        // after the drain below has swept it.
        while self.inflight_count() > 0 {
            state = state.wait();
        }
        drop(state);
        let jobs = self.drain_visible_queued_jobs();
        let cancelled = jobs.len();
        for job in jobs {
            self.cancel_claimed_job(job);
        }
        self.state.notify_all();
        ShutdownReport::new(cancelled, running, cancelled)
    }
639
640    /// Drains all jobs currently visible in global and worker-local queues.
641    ///
642    /// # Returns
643    ///
644    /// Drained queued jobs.
645    fn drain_visible_queued_jobs(&self) -> Vec<PoolJob> {
646        let mut jobs = Vec::new();
647        loop {
648            let previous_count = jobs.len();
649            self.drain_global_queue(&mut jobs);
650            self.drain_worker_queues(&mut jobs);
651            if jobs.len() == previous_count {
652                return jobs;
653            }
654        }
655    }
656
657    /// Drains visible jobs from the global injector.
658    ///
659    /// # Parameters
660    ///
661    /// * `jobs` - Destination for drained jobs.
662    fn drain_global_queue(&self, jobs: &mut Vec<PoolJob>) {
663        while let Some(job) = steal_one(&self.global_queue) {
664            jobs.push(job);
665        }
666    }
667
668    /// Drains visible jobs from all worker-local queues.
669    ///
670    /// # Parameters
671    ///
672    /// * `jobs` - Destination for drained jobs.
673    fn drain_worker_queues(&self, jobs: &mut Vec<PoolJob>) {
674        for queue in &self.worker_queues {
675            jobs.extend(queue.drain());
676        }
677    }
678
679    /// Returns whether shutdown has started.
680    ///
681    /// # Returns
682    ///
683    /// `true` when lifecycle is not running.
684    fn is_shutdown(&self) -> bool {
685        self.state.read(|state| !state.lifecycle.is_running())
686    }
687
688    /// Returns whether the pool is terminated.
689    ///
690    /// # Returns
691    ///
692    /// `true` after shutdown and after all workers and jobs are gone.
693    fn is_terminated(&self) -> bool {
694        self.state.read(|state| self.is_terminated_locked(state))
695    }
696
697    /// Checks termination against one locked state snapshot.
698    ///
699    /// # Parameters
700    ///
701    /// * `state` - Locked state snapshot.
702    ///
703    /// # Returns
704    ///
705    /// `true` when the pool is terminal.
706    fn is_terminated_locked(&self, state: &FixedThreadPoolState) -> bool {
707        !state.lifecycle.is_running()
708            && state.live_workers == 0
709            && self.queued_count() == 0
710            && self.running_count() == 0
711            && self.inflight_count() == 0
712    }
713
714    /// Returns a point-in-time stats snapshot.
715    ///
716    /// # Returns
717    ///
718    /// Snapshot using fixed pool size for both core and maximum sizes.
719    fn stats(&self) -> ThreadPoolStats {
720        let queued_tasks = self.queued_count();
721        let running_tasks = self.running_count();
722        let submitted_tasks = self.submitted_task_count.load(Ordering::Relaxed);
723        let completed_tasks = self.completed_task_count.load(Ordering::Relaxed);
724        let cancelled_tasks = self.cancelled_task_count.load(Ordering::Relaxed);
725        self.state.read(|state| ThreadPoolStats {
726            core_pool_size: self.pool_size,
727            maximum_pool_size: self.pool_size,
728            live_workers: state.live_workers,
729            idle_workers: state.idle_workers,
730            queued_tasks,
731            running_tasks,
732            submitted_tasks,
733            completed_tasks,
734            cancelled_tasks,
735            shutdown: !state.lifecycle.is_running(),
736            terminated: self.is_terminated_locked(state),
737        })
738    }
739}
740
/// Submit guard that leaves in-flight accounting on drop.
///
/// Created by `FixedThreadPoolInner::begin_submit`; dropping the guard is
/// what decrements the in-flight counter on the success path.
struct FixedSubmitGuard<'a> {
    /// Pool whose in-flight counter was entered.
    inner: &'a FixedThreadPoolInner,
}

impl Drop for FixedSubmitGuard<'_> {
    /// Leaves submit accounting and wakes shutdown waiters if needed.
    fn drop(&mut self) {
        let previous = self
            .inner
            .inflight_submissions
            .fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool submit counter underflow");
        // `shutdown_now` blocks until in-flight submissions reach zero; the
        // last guard dropped after the admission gate closed must wake it.
        if previous == 1 && !self.inner.accepting.load(Ordering::Acquire) {
            self.inner.state.notify_all();
        }
    }
}
760
/// Fixed-size thread pool implementing [`ExecutorService`].
///
/// `FixedThreadPool` prestarts a fixed number of worker threads and does not
/// support runtime pool-size changes. Use [`super::ThreadPool`] when dynamic
/// core/maximum sizes or keep-alive policies are required.
pub struct FixedThreadPool {
    /// Shared fixed pool state, also held by every worker thread.
    inner: Arc<FixedThreadPoolInner>,
}
770
771impl FixedThreadPool {
    /// Builds a fixed pool from validated builder options.
    ///
    /// # Parameters
    ///
    /// * `pool_size` - Number of workers to prestart.
    /// * `queue_capacity` - Optional maximum queued task count.
    /// * `thread_name_prefix` - Prefix used for worker thread names.
    /// * `stack_size` - Optional worker stack size.
    ///
    /// # Returns
    ///
    /// A fixed thread-pool handle with workers already started.
    ///
    /// # Errors
    ///
    /// Returns [`ThreadPoolBuildError`] when a worker thread cannot be spawned.
    pub(crate) fn build_with_options(
        pool_size: usize,
        queue_capacity: Option<usize>,
        thread_name_prefix: String,
        stack_size: Option<usize>,
    ) -> Result<Self, ThreadPoolBuildError> {
        // Create every worker runtime first so the shared state owns all
        // worker queues before any thread starts running.
        let mut worker_runtimes = Vec::with_capacity(pool_size);
        let mut worker_queues = Vec::with_capacity(pool_size);
        for index in 0..pool_size {
            let worker_runtime = WorkerRuntime::new(index);
            worker_queues.push(Arc::clone(&worker_runtime.queue));
            worker_runtimes.push(worker_runtime);
        }
        let inner = Arc::new(FixedThreadPoolInner::new(
            pool_size,
            queue_capacity,
            worker_queues,
        ));
        for (index, worker_runtime) in worker_runtimes.into_iter().enumerate() {
            // Reserve the live-worker slot before spawning so termination
            // accounting never observes a running thread without a slot.
            inner.reserve_worker_slot();
            let worker_inner = Arc::clone(&inner);
            let mut builder =
                std::thread::Builder::new().name(format!("{}-{}", thread_name_prefix, index));
            if let Some(stack_size) = stack_size {
                builder = builder.stack_size(stack_size);
            }
            if let Err(source) =
                builder.spawn(move || run_fixed_worker(worker_inner, worker_runtime))
            {
                // Roll back the unused slot and stop the pool; workers that
                // already started observe the stop flags and exit.
                inner.rollback_worker_slot();
                inner.stop_after_failed_build();
                return Err(ThreadPoolBuildError::SpawnWorker { index, source });
            }
        }
        Ok(Self { inner })
    }
824
825    /// Creates a fixed thread pool with `pool_size` prestarted workers.
826    ///
827    /// # Parameters
828    ///
829    /// * `pool_size` - Number of worker threads.
830    ///
831    /// # Returns
832    ///
833    /// A fixed thread pool.
834    ///
835    /// # Errors
836    ///
837    /// Returns [`ThreadPoolBuildError`] if the worker count is zero or a worker
838    /// cannot be spawned.
839    pub fn new(pool_size: usize) -> Result<Self, ThreadPoolBuildError> {
840        Self::builder().pool_size(pool_size).build()
841    }
842
843    /// Creates a fixed pool builder.
844    ///
845    /// # Returns
846    ///
847    /// Builder with CPU parallelism defaults.
848    pub fn builder() -> FixedThreadPoolBuilder {
849        FixedThreadPoolBuilder::new()
850    }
851
    /// Returns the fixed worker count.
    ///
    /// # Returns
    ///
    /// Number of workers in this pool.
    pub fn pool_size(&self) -> usize {
        self.inner.pool_size()
    }

    /// Returns the queued task count.
    ///
    /// # Returns
    ///
    /// Number of accepted tasks waiting to run.
    pub fn queued_count(&self) -> usize {
        self.inner.queued_count()
    }

    /// Returns the running task count.
    ///
    /// # Returns
    ///
    /// Number of tasks currently held by workers.
    pub fn running_count(&self) -> usize {
        self.inner.running_count()
    }

    /// Returns the live worker count.
    ///
    /// Reads under the pool monitor, so the value is consistent with the
    /// lifecycle state at the moment of the read.
    ///
    /// # Returns
    ///
    /// Number of worker loops that have not exited.
    pub fn live_worker_count(&self) -> usize {
        self.inner.state.read(|state| state.live_workers)
    }

    /// Returns a point-in-time stats snapshot.
    ///
    /// # Returns
    ///
    /// Snapshot containing queue, worker, and lifecycle counters.
    pub fn stats(&self) -> ThreadPoolStats {
        self.inner.stats()
    }
896}
897
impl Drop for FixedThreadPool {
    /// Requests graceful shutdown when the pool handle is dropped.
    ///
    /// This only closes admission and notifies workers; it does not block
    /// waiting for queued or running tasks to finish.
    fn drop(&mut self) {
        self.inner.shutdown();
    }
}
904
905impl ExecutorService for FixedThreadPool {
906    type Handle<R, E>
907        = TaskHandle<R, E>
908    where
909        R: Send + 'static,
910        E: Send + 'static;
911
912    type Termination<'a>
913        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
914    where
915        Self: 'a;
916
917    /// Accepts a callable and queues it for fixed pool workers.
918    ///
919    /// # Parameters
920    ///
921    /// * `task` - Callable to execute on a fixed pool worker.
922    ///
923    /// # Returns
924    ///
925    /// A [`TaskHandle`] for the accepted task.
926    ///
927    /// # Errors
928    ///
929    /// Returns [`RejectedExecution::Shutdown`] after shutdown or
930    /// [`RejectedExecution::Saturated`] when a bounded queue is full.
931    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
932    where
933        C: Callable<R, E> + Send + 'static,
934        R: Send + 'static,
935        E: Send + 'static,
936    {
937        let (handle, completion) = TaskCompletionPair::new().into_parts();
938        let job = PoolJob::from_task(task, completion);
939        self.inner.submit(job)?;
940        Ok(handle)
941    }
942
943    /// Stops accepting new work and drains accepted queued tasks.
944    fn shutdown(&self) {
945        self.inner.shutdown();
946    }
947
948    /// Stops accepting work and cancels queued tasks.
949    ///
950    /// # Returns
951    ///
952    /// A count-based shutdown report.
953    fn shutdown_now(&self) -> ShutdownReport {
954        self.inner.shutdown_now()
955    }
956
957    /// Returns whether shutdown has been requested.
958    ///
959    /// # Returns
960    ///
961    /// `true` when this pool no longer accepts new work.
962    fn is_shutdown(&self) -> bool {
963        self.inner.is_shutdown()
964    }
965
966    /// Returns whether this pool is fully terminated.
967    ///
968    /// # Returns
969    ///
970    /// `true` after shutdown and after all workers have exited.
971    fn is_terminated(&self) -> bool {
972        self.inner.is_terminated()
973    }
974
975    /// Waits until this fixed pool has terminated.
976    ///
977    /// # Returns
978    ///
979    /// A future that blocks the polling thread until termination.
980    fn await_termination(&self) -> Self::Termination<'_> {
981        Box::pin(async move {
982            self.inner.wait_for_termination();
983        })
984    }
985}
986
987/// Runs one fixed-pool worker loop.
988///
989/// # Parameters
990///
991/// * `inner` - Shared fixed-pool state.
992/// * `worker_runtime` - Queue runtime owned by this worker.
993fn run_fixed_worker(inner: Arc<FixedThreadPoolInner>, worker_runtime: WorkerRuntime) {
994    worker_runtime.queue.activate();
995    loop {
996        if let Some(job) = inner.try_take_job(&worker_runtime) {
997            job.run();
998            inner.finish_running_job();
999            continue;
1000        }
1001        if !wait_for_fixed_pool_work(&inner) {
1002            break;
1003        }
1004    }
1005    worker_exited(&inner, &worker_runtime.queue);
1006}
1007
/// Waits until visible work exists or the worker should exit.
///
/// Uses a mark-idle / double-check / wait protocol: the worker publishes its
/// idle mark, re-checks for work that may have arrived concurrently, and only
/// then blocks. This ordering appears designed to avoid lost wakeups from
/// submitters that race with the idle transition — confirm against the
/// submit path before reordering anything here.
///
/// # Parameters
///
/// * `inner` - Shared fixed-pool state.
///
/// # Returns
///
/// `true` when the worker should try to take work again, or `false` when it
/// should exit.
fn wait_for_fixed_pool_work(inner: &FixedThreadPoolInner) -> bool {
    let mut state = inner.state.lock();
    loop {
        match state.lifecycle {
            FixedThreadPoolLifecycle::Running => {
                // Fast path: work is already visible, no need to go idle.
                if inner.queued_count() > 0 {
                    return true;
                }
                mark_fixed_worker_idle(inner, &mut state);
                // Re-check after publishing the idle mark: a submitter may
                // have enqueued work or posted a wake in the meantime.
                if inner.queued_count() > 0 || inner.has_pending_worker_wake() {
                    unmark_fixed_worker_idle(inner, &mut state);
                    return true;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            FixedThreadPoolLifecycle::Shutdown => {
                // During drain, run remaining queued tasks first.
                if inner.queued_count() > 0 {
                    return true;
                }
                // Nothing queued and no in-flight submissions: drain done.
                if inner.queued_count() == 0 && inner.inflight_count() == 0 {
                    return false;
                }
                mark_fixed_worker_idle(inner, &mut state);
                // Same double-check as Running, plus inflight reaching zero,
                // which loops back so the exit condition above is re-evaluated.
                if inner.queued_count() > 0
                    || inner.inflight_count() == 0
                    || inner.has_pending_worker_wake()
                {
                    unmark_fixed_worker_idle(inner, &mut state);
                    continue;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            // Stopping cancels queued work elsewhere; the worker just exits.
            FixedThreadPoolLifecycle::Stopping => return false,
        }
    }
}
1056
1057/// Marks a fixed-pool worker as idle in locked and lock-free state.
1058///
1059/// # Parameters
1060///
1061/// * `inner` - Fixed pool whose idle counter is updated.
1062/// * `state` - Locked mutable state containing authoritative idle workers.
1063fn mark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
1064    state.idle_workers += 1;
1065    inner.idle_worker_count.fetch_add(1, Ordering::AcqRel);
1066}
1067
1068/// Marks a fixed-pool worker as no longer idle.
1069///
1070/// # Parameters
1071///
1072/// * `inner` - Fixed pool whose idle counter is updated.
1073/// * `state` - Locked mutable state containing authoritative idle workers.
1074fn unmark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
1075    state.idle_workers = state
1076        .idle_workers
1077        .checked_sub(1)
1078        .expect("fixed pool idle worker counter underflow");
1079    let previous = inner.idle_worker_count.fetch_sub(1, Ordering::AcqRel);
1080    debug_assert!(previous > 0, "fixed pool idle worker counter underflow");
1081    inner.consume_pending_worker_wake();
1082}
1083
1084/// Marks one fixed-pool worker as exited.
1085///
1086/// # Parameters
1087///
1088/// * `inner` - Shared fixed-pool state.
1089/// * `worker_queue` - Queue owned by the exiting worker.
1090fn worker_exited(inner: &FixedThreadPoolInner, worker_queue: &WorkerQueue) {
1091    worker_queue.deactivate();
1092    inner.state.write(|state| {
1093        state.live_workers = state
1094            .live_workers
1095            .checked_sub(1)
1096            .expect("fixed pool live worker counter underflow");
1097    });
1098    inner.state.notify_all();
1099}
1100
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{
        Arc,
        atomic::{
            AtomicUsize,
            Ordering,
        },
    };
    use std::thread;
    use std::time::Duration;

    /// Builds a job that bumps `ran` when executed and `cancelled` when
    /// its cancel path runs, so tests can count both outcomes.
    fn counted_job(cancelled: Arc<AtomicUsize>, ran: Arc<AtomicUsize>) -> PoolJob {
        PoolJob::new(
            Box::new(move || {
                ran.fetch_add(1, Ordering::AcqRel);
            }),
            Box::new(move || {
                cancelled.fetch_add(1, Ordering::AcqRel);
            }),
        )
    }

    #[test]
    fn test_accept_claimed_job_stop_now_cancels_claimed_and_worker_queues() {
        let runtime = WorkerRuntime::new(0);
        runtime.queue.activate();
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        // Simulate a stop-now request already being in effect.
        inner.stop_now.store(true, Ordering::Release);

        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        runtime
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        // Three jobs total: local + worker queue + the claimed one below.
        inner.queued_task_count.store(3, Ordering::Release);

        let accepted =
            inner.accept_claimed_job(counted_job(cancelled.clone(), ran.clone()), &runtime);
        // Under stop-now, nothing is accepted and all three are cancelled.
        assert!(accepted.is_none());
        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 3);
        assert_eq!(cancelled.load(Ordering::Acquire), 3);
        assert_eq!(ran.load(Ordering::Acquire), 0);
    }

    #[test]
    fn test_steal_global_job_notifies_when_batch_leaves_local_jobs() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        // One job preloaded locally, one in the global injector.
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner
            .global_queue
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        // Stealing from the global queue should yield exactly one job.
        let claimed = inner
            .steal_global_job(&runtime)
            .expect("global queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        // The preloaded local job must survive the steal; cancel it to
        // balance the queued counter.
        let remaining = runtime
            .local
            .pop()
            .expect("preloaded local job should remain queued");
        inner.cancel_claimed_job(remaining);

        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    #[test]
    fn test_steal_worker_job_notifies_when_batch_leaves_local_jobs() {
        // Two workers: the thief steals from the victim's queue.
        let thief = WorkerRuntime::new(0);
        let victim = WorkerRuntime::new(1);
        thief.queue.activate();
        victim.queue.activate();
        let inner = FixedThreadPoolInner::new(
            2,
            None,
            vec![Arc::clone(&thief.queue), Arc::clone(&victim.queue)],
        );
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        thief
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        victim
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        let claimed = inner
            .steal_worker_job(&thief)
            .expect("victim queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        // The thief's own preloaded job must still be in its local queue.
        let remaining = thief
            .local
            .pop()
            .expect("batch steal should leave one local job");
        inner.cancel_claimed_job(remaining);

        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    #[test]
    fn test_fixed_submit_guard_drop_notifies_when_shutdown_closes_admission() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        // One in-flight submission while admission is already closed.
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.accepting.store(false, Ordering::Release);

        {
            // Dropping the guard should release the in-flight slot.
            let guard = FixedSubmitGuard { inner: &inner };
            drop(guard);
        }

        assert_eq!(inner.inflight_count(), 0);
    }

    #[test]
    fn test_wait_for_fixed_pool_work_shutdown_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        inner.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
        });
        // Pending submission keeps the worker waiting even during shutdown.
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.pending_worker_wakes.store(1, Ordering::Release);

        // Release the in-flight submission from another thread after a
        // short delay so the wait has something to block on first.
        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        // With the drain complete, the worker is told to exit (false).
        assert!(!wait_for_fixed_pool_work(&inner));
        releaser.join().expect("releaser thread should finish");
    }

    #[test]
    fn test_shutdown_now_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        inner.inflight_submissions.store(1, Ordering::Release);

        // Clear the in-flight submission after a delay so shutdown_now
        // must actually wait before it can report.
        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        let report = inner.shutdown_now();
        releaser.join().expect("releaser thread should finish");
        // Nothing was queued or running, so all counts come back zero.
        assert_eq!(report.running, 0);
        assert_eq!(report.queued, 0);
        assert_eq!(report.cancelled, 0);
    }
}