Skip to main content

qubit_thread_pool/
fixed_thread_pool.rs

1/*******************************************************************************
2 *
3 *    Copyright (c) 2025 - 2026.
4 *    Haixing Hu, Qubit Co. Ltd.
5 *
6 *    All rights reserved.
7 *
8 ******************************************************************************/
9// qubit-style: allow multiple-public-types
10// qubit-style: allow inline-tests
11use std::{
12    future::Future,
13    pin::Pin,
14    sync::{
15        Arc,
16        atomic::{
17            AtomicBool,
18            AtomicUsize,
19            Ordering,
20        },
21    },
22    thread,
23};
24
25use crossbeam_deque::Injector;
26use qubit_function::Callable;
27
28use qubit_executor::{
29    TaskCompletionPair,
30    TaskHandle,
31};
32use qubit_lock::Monitor;
33
34use super::thread_pool::{
35    ThreadPoolBuildError,
36    ThreadPoolStats,
37};
38use super::worker_queue::{
39    WorkerQueue,
40    WorkerRuntime,
41    steal_batch_and_pop,
42    steal_one,
43};
44use crate::thread_pool::PoolJob;
45use qubit_executor::service::{
46    ExecutorService,
47    RejectedExecution,
48    ShutdownReport,
49};
50
51/// Default thread name prefix used by [`FixedThreadPoolBuilder`].
52const DEFAULT_FIXED_THREAD_NAME_PREFIX: &str = "qubit-fixed-thread-pool";
53
54/// Maximum number of worker-local queues probed by one submit call.
55const LOCAL_ENQUEUE_MAX_PROBES: usize = 4;
56/// Maximum worker count that uses worker-local batch queues.
57const LOCAL_QUEUE_WORKER_LIMIT: usize = 4;
58
/// Lifecycle state for a fixed-size thread pool.
///
/// Transitions only move toward stricter states: `Running` -> `Shutdown`
/// (graceful, via `shutdown`) and `Running`/`Shutdown` -> `Stopping`
/// (immediate, via `shutdown_now` or a failed build).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FixedThreadPoolLifecycle {
    /// The pool accepts new tasks and workers wait for queued work.
    Running,

    /// The pool rejects new tasks but drains queued work.
    Shutdown,

    /// The pool rejects new tasks and cancels queued work.
    Stopping,
}
71
72impl FixedThreadPoolLifecycle {
73    /// Returns whether this lifecycle still accepts submissions.
74    ///
75    /// # Returns
76    ///
77    /// `true` only while the pool is running.
78    const fn is_running(self) -> bool {
79        matches!(self, Self::Running)
80    }
81}
82
/// Mutable state protected by the fixed pool monitor.
///
/// Only lifecycle transitions and worker counts live here; hot-path counters
/// are kept as atomics on `FixedThreadPoolInner` to avoid the lock.
struct FixedThreadPoolState {
    /// Current lifecycle state.
    lifecycle: FixedThreadPoolLifecycle,
    /// Number of worker loops that have not exited.
    live_workers: usize,
    /// Number of workers currently blocked waiting for work.
    idle_workers: usize,
}
92
93impl FixedThreadPoolState {
94    /// Creates an empty running state.
95    ///
96    /// # Returns
97    ///
98    /// A running state before any worker has been reserved.
99    fn new() -> Self {
100        Self {
101            lifecycle: FixedThreadPoolLifecycle::Running,
102            live_workers: 0,
103            idle_workers: 0,
104        }
105    }
106}
107
/// Shared state for a fixed-size thread pool.
///
/// Submitters and workers coordinate mostly through the atomic counters; the
/// monitor lock is reserved for lifecycle transitions and blocking waits.
struct FixedThreadPoolInner {
    /// Number of workers in this fixed pool.
    pool_size: usize,
    /// Mutable lifecycle and worker counters.
    state: Monitor<FixedThreadPoolState>,
    /// Admission gate used by submitters.
    accepting: AtomicBool,
    /// Whether immediate shutdown has requested workers to stop taking jobs.
    stop_now: AtomicBool,
    /// Submit calls that have passed the first admission check.
    inflight_submissions: AtomicUsize,
    /// Number of workers currently blocked or about to block waiting for work.
    idle_worker_count: AtomicUsize,
    /// Number of idle-worker wakeups already requested but not yet consumed.
    pending_worker_wakes: AtomicUsize,
    /// Lock-free queue for externally submitted jobs.
    global_queue: Injector<PoolJob>,
    /// Worker-local queues used for submit routing and work stealing.
    worker_queues: Vec<Arc<WorkerQueue>>,
    /// Round-robin cursor used for submit-path local queue selection.
    next_enqueue_worker: AtomicUsize,
    /// Optional maximum number of queued jobs; `None` means unbounded.
    queue_capacity: Option<usize>,
    /// Number of queued jobs not yet started or cancelled.
    queued_task_count: AtomicUsize,
    /// Number of jobs currently running.
    running_task_count: AtomicUsize,
    /// Total number of accepted jobs.
    submitted_task_count: AtomicUsize,
    /// Total number of finished worker-held jobs.
    completed_task_count: AtomicUsize,
    /// Total number of queued jobs cancelled by immediate shutdown.
    cancelled_task_count: AtomicUsize,
}
143
144impl FixedThreadPoolInner {
145    /// Creates shared state for a fixed-size pool.
146    ///
147    /// # Parameters
148    ///
149    /// * `pool_size` - Number of workers that will be prestarted.
150    /// * `queue_capacity` - Optional queue capacity.
151    ///
152    /// # Returns
153    ///
154    /// A shared state object ready for worker startup.
155    fn new(
156        pool_size: usize,
157        queue_capacity: Option<usize>,
158        worker_queues: Vec<Arc<WorkerQueue>>,
159    ) -> Self {
160        Self {
161            pool_size,
162            state: Monitor::new(FixedThreadPoolState::new()),
163            accepting: AtomicBool::new(true),
164            stop_now: AtomicBool::new(false),
165            inflight_submissions: AtomicUsize::new(0),
166            idle_worker_count: AtomicUsize::new(0),
167            pending_worker_wakes: AtomicUsize::new(0),
168            global_queue: Injector::new(),
169            worker_queues,
170            next_enqueue_worker: AtomicUsize::new(0),
171            queue_capacity,
172            queued_task_count: AtomicUsize::new(0),
173            running_task_count: AtomicUsize::new(0),
174            submitted_task_count: AtomicUsize::new(0),
175            completed_task_count: AtomicUsize::new(0),
176            cancelled_task_count: AtomicUsize::new(0),
177        }
178    }
179
180    /// Returns the fixed worker count.
181    ///
182    /// # Returns
183    ///
184    /// Number of workers owned by this pool.
185    #[inline]
186    fn pool_size(&self) -> usize {
187        self.pool_size
188    }
189
190    /// Returns the queued task count.
191    ///
192    /// # Returns
193    ///
194    /// Number of accepted tasks waiting to run.
195    #[inline]
196    fn queued_count(&self) -> usize {
197        self.queued_task_count.load(Ordering::Acquire)
198    }
199
200    /// Returns the running task count.
201    ///
202    /// # Returns
203    ///
204    /// Number of tasks currently held by workers.
205    #[inline]
206    fn running_count(&self) -> usize {
207        self.running_task_count.load(Ordering::Acquire)
208    }
209
210    /// Returns the number of in-flight submit calls.
211    ///
212    /// # Returns
213    ///
214    /// Number of submit calls that may still publish or roll back a queued job.
215    #[inline]
216    fn inflight_count(&self) -> usize {
217        self.inflight_submissions.load(Ordering::Acquire)
218    }
219
    /// Attempts to enter submit admission.
    ///
    /// Uses a check / register / re-check pattern: the counter increment makes
    /// this submission visible to `shutdown_now`, and the second load catches
    /// an admission close that raced the increment.
    ///
    /// # Returns
    ///
    /// A guard that leaves admission on drop.
    ///
    /// # Errors
    ///
    /// Returns [`RejectedExecution::Shutdown`] when admission is closed.
    fn begin_submit(&self) -> Result<FixedSubmitGuard<'_>, RejectedExecution> {
        // Fast-path rejection before touching the shared counter.
        if !self.accepting.load(Ordering::Acquire) {
            return Err(RejectedExecution::Shutdown);
        }
        self.inflight_submissions.fetch_add(1, Ordering::AcqRel);
        if self.accepting.load(Ordering::Acquire) {
            Ok(FixedSubmitGuard { inner: self })
        } else {
            // Admission closed between the first load and the increment:
            // back out and, as the possibly-last in-flight submitter, wake
            // shutdown waiters blocked on the counter reaching zero.
            let previous = self.inflight_submissions.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool submit counter underflow");
            if previous == 1 {
                self.state.notify_all();
            }
            Err(RejectedExecution::Shutdown)
        }
    }
245
246    /// Attempts to reserve one queue slot.
247    ///
248    /// # Returns
249    ///
250    /// `true` if one queued slot was reserved, otherwise `false`.
251    fn reserve_queue_slot(&self) -> bool {
252        if let Some(capacity) = self.queue_capacity {
253            loop {
254                let current = self.queued_count();
255                if current >= capacity {
256                    return false;
257                }
258                if self
259                    .queued_task_count
260                    .compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire)
261                    .is_ok()
262                {
263                    return true;
264                }
265            }
266        }
267        self.queued_task_count.fetch_add(1, Ordering::AcqRel);
268        true
269    }
270
    /// Submits one job to this fixed pool.
    ///
    /// # Parameters
    ///
    /// * `job` - Type-erased job accepted by the pool.
    ///
    /// # Returns
    ///
    /// `Ok(())` when the job is accepted.
    ///
    /// # Errors
    ///
    /// Returns [`RejectedExecution::Shutdown`] after shutdown or
    /// [`RejectedExecution::Saturated`] when the bounded queue is full.
    fn submit(&self, job: PoolJob) -> Result<(), RejectedExecution> {
        // The guard keeps shutdown_now() from draining queues while this
        // submission is still publishing or rolling back.
        let _guard = self.begin_submit()?;
        if !self.reserve_queue_slot() {
            return Err(RejectedExecution::Saturated);
        }
        // Admission may have closed after the slot reservation; roll the
        // slot back instead of publishing a job nobody will drain.
        if !self.accepting.load(Ordering::Acquire) {
            let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
            debug_assert!(previous > 0, "fixed pool queued counter underflow");
            return Err(RejectedExecution::Shutdown);
        }
        self.submitted_task_count.fetch_add(1, Ordering::Relaxed);
        self.enqueue_job(job);
        Ok(())
    }
299
300    /// Enqueues one accepted job to a worker inbox or the global fallback.
301    ///
302    /// # Parameters
303    ///
304    /// * `job` - Job whose queued slot has already been reserved.
305    fn enqueue_job(&self, job: PoolJob) {
306        if self.use_worker_local_queues() {
307            match self.try_enqueue_to_worker(job) {
308                Ok(()) => {}
309                Err(job) => self.global_queue.push(job),
310            }
311        } else {
312            self.global_queue.push(job);
313        }
314        self.wake_one_idle_worker();
315    }
316
    /// Wakes one idle worker if no already-requested wakeup covers it.
    ///
    /// Pending wake tokens close the lost-notification window: a worker that
    /// has marked itself idle but has not yet parked will observe the token and
    /// retry work without relying on the condition-variable notification.
    fn wake_one_idle_worker(&self) {
        loop {
            let idle_workers = self.idle_worker_count.load(Ordering::Acquire);
            if idle_workers == 0 {
                // Nobody is parked or about to park; an actively looping
                // worker will find the job.
                return;
            }
            let pending_wakes = self.pending_worker_wakes.load(Ordering::Acquire);
            if pending_wakes >= idle_workers {
                // Every idle worker is already covered by a token.
                return;
            }
            // Publish the token before notifying so a worker leaving the
            // wait path always observes it.
            if self
                .pending_worker_wakes
                .compare_exchange_weak(
                    pending_wakes,
                    pending_wakes + 1,
                    Ordering::AcqRel,
                    Ordering::Acquire,
                )
                .is_ok()
            {
                self.state.notify_one();
                return;
            }
        }
    }
347
348    /// Returns whether an idle-worker wakeup has been requested.
349    ///
350    /// # Returns
351    ///
352    /// `true` when at least one idle worker should leave the wait path and
353    /// retry taking work.
354    fn has_pending_worker_wake(&self) -> bool {
355        self.pending_worker_wakes.load(Ordering::Acquire) > 0
356    }
357
358    /// Consumes one requested idle-worker wakeup if one exists.
359    fn consume_pending_worker_wake(&self) {
360        let mut current = self.pending_worker_wakes.load(Ordering::Acquire);
361        while current > 0 {
362            match self.pending_worker_wakes.compare_exchange_weak(
363                current,
364                current - 1,
365                Ordering::AcqRel,
366                Ordering::Acquire,
367            ) {
368                Ok(_) => return,
369                Err(actual) => current = actual,
370            }
371        }
372    }
373
374    /// Attempts to route one job directly to an active worker queue.
375    ///
376    /// # Parameters
377    ///
378    /// * `job` - Job to route.
379    ///
380    /// # Returns
381    ///
382    /// `Ok(())` when the job was published to a worker inbox; otherwise the
383    /// original job is returned for global fallback.
384    fn try_enqueue_to_worker(&self, job: PoolJob) -> Result<(), PoolJob> {
385        let queue_count = self.worker_queues.len();
386        debug_assert!(queue_count > 0, "fixed pool must have worker queues");
387        let probe_count = queue_count.min(LOCAL_ENQUEUE_MAX_PROBES);
388        for _ in 0..probe_count {
389            let index = self.next_enqueue_worker.fetch_add(1, Ordering::Relaxed) % queue_count;
390            let queue = &self.worker_queues[index];
391            if queue.is_active() {
392                queue.push_back(job);
393                return Ok(());
394            }
395        }
396        Err(job)
397    }
398
    /// Attempts to claim one queued job for a worker.
    ///
    /// The worker first checks its local queue, then its cross-thread inbox,
    /// then the global fallback queue, and finally steals from other workers.
    /// This matches the dynamic pool's hot path and avoids forcing all fixed
    /// workers through one global injector under skewed workloads.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime owned by the current worker.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn try_take_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if self.stop_now.load(Ordering::Acquire) {
            // Immediate shutdown: cancel everything this worker still holds.
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        // Large pools never route to local queues, so only the injector can
        // hold work for them.
        if !self.use_worker_local_queues() {
            return self.steal_single_global_job(worker_runtime);
        }
        if let Some(job) = worker_runtime.local.pop() {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = worker_runtime.queue.pop_inbox_into(&worker_runtime.local) {
            return self.accept_claimed_job(job, worker_runtime);
        }
        if let Some(job) = self.steal_global_job(worker_runtime) {
            return Some(job);
        }
        self.steal_worker_job(worker_runtime)
    }
432
    /// Attempts to batch-steal one job from the global injector.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime receiving any stolen batch remainder.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn steal_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        if let Some(job) = steal_batch_and_pop(&self.global_queue, &worker_runtime.local) {
            // The batch left extra jobs in the local queue; wake a peer so it
            // can steal them instead of sitting idle.
            if !worker_runtime.local.is_empty() {
                self.state.notify_one();
            }
            return self.accept_claimed_job(job, worker_runtime);
        }
        self.steal_single_global_job(worker_runtime)
    }
451
452    /// Attempts to steal exactly one job from the global injector.
453    ///
454    /// # Parameters
455    ///
456    /// * `worker_runtime` - Queue runtime owned by the current worker.
457    ///
458    /// # Returns
459    ///
460    /// `Some(job)` when a job was claimed, otherwise `None`.
461    fn steal_single_global_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
462        steal_one(&self.global_queue).and_then(|job| self.accept_claimed_job(job, worker_runtime))
463    }
464
    /// Attempts to steal one job from another worker's local queue.
    ///
    /// # Parameters
    ///
    /// * `worker_runtime` - Queue runtime owned by the current worker.
    ///
    /// # Returns
    ///
    /// `Some(job)` when a job was claimed, otherwise `None`.
    fn steal_worker_job(&self, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        let queue_count = self.worker_queues.len();
        if queue_count <= 1 {
            // No other worker exists to steal from.
            return None;
        }
        let worker_index = worker_runtime.worker_index();
        // Rotate the starting victim so stealers do not all converge on the
        // same queue.
        let start = worker_runtime.next_steal_start(queue_count);
        for offset in 0..queue_count {
            let victim = &self.worker_queues[(start + offset) % queue_count];
            if victim.worker_index() == worker_index {
                continue;
            }
            if !victim.is_active() {
                continue;
            }
            if let Some(job) = victim.steal_into(&worker_runtime.local) {
                // Extra stolen jobs landed in the local queue; wake a peer to
                // help drain them.
                if !worker_runtime.local.is_empty() {
                    self.state.notify_one();
                }
                return self.accept_claimed_job(job, worker_runtime);
            }
        }
        None
    }
498
499    /// Returns whether this pool should use worker-local queues.
500    ///
501    /// # Returns
502    ///
503    /// `true` for small fixed pools where local batching reduces global queue
504    /// contention; `false` for larger pools where inbox routing and victim
505    /// scans cost more than they save.
506    fn use_worker_local_queues(&self) -> bool {
507        self.pool_size <= LOCAL_QUEUE_WORKER_LIMIT
508    }
509
    /// Accepts a claimed queued job or cancels it after immediate shutdown.
    ///
    /// # Parameters
    ///
    /// * `job` - Job claimed from a queue.
    /// * `worker_runtime` - Queue runtime drained if stopping.
    ///
    /// # Returns
    ///
    /// `Some(job)` when the job may run, otherwise `None`.
    fn accept_claimed_job(&self, job: PoolJob, worker_runtime: &WorkerRuntime) -> Option<PoolJob> {
        // Re-check the stop flag after claiming: shutdown_now may have raced
        // the claim and this job must not start.
        if self.stop_now.load(Ordering::Acquire) {
            self.cancel_claimed_job(job);
            self.cancel_worker_jobs(worker_runtime);
            return None;
        }
        self.mark_queued_job_running();
        Some(job)
    }
529
530    /// Cancels all jobs remaining in one worker runtime.
531    ///
532    /// # Parameters
533    ///
534    /// * `worker_runtime` - Worker-owned runtime to drain.
535    fn cancel_worker_jobs(&self, worker_runtime: &WorkerRuntime) {
536        while let Some(job) = worker_runtime.local.pop() {
537            self.cancel_claimed_job(job);
538        }
539        for job in worker_runtime.queue.drain() {
540            self.cancel_claimed_job(job);
541        }
542    }
543
    /// Marks one claimed queued job as running.
    ///
    /// Moves one unit of accounting from the queued counter (reserved at
    /// submit time) to the running counter.
    fn mark_queued_job_running(&self) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.running_task_count.fetch_add(1, Ordering::AcqRel);
    }

    /// Cancels one job claimed after immediate shutdown started.
    ///
    /// Releases the job's queued slot, records the cancellation, runs the
    /// job's cancel hook, and wakes waiters that may now observe termination.
    ///
    /// # Parameters
    ///
    /// * `job` - Queued job that must not be run.
    fn cancel_claimed_job(&self, job: PoolJob) {
        let previous = self.queued_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool queued counter underflow");
        self.cancelled_task_count.fetch_add(1, Ordering::Relaxed);
        job.cancel();
        self.state.notify_all();
    }

    /// Marks one running job as finished.
    ///
    /// Broadcasts only when this was the last running job and nothing is
    /// queued, so termination waiters are not woken once per completed task.
    fn finish_running_job(&self) {
        let previous = self.running_task_count.fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool running counter underflow");
        self.completed_task_count.fetch_add(1, Ordering::Relaxed);
        if previous == 1 && self.queued_count() == 0 {
            self.state.notify_all();
        }
    }
573
574    /// Reserves one worker slot before spawning a worker thread.
575    fn reserve_worker_slot(&self) {
576        self.state.write(|state| {
577            state.live_workers += 1;
578        });
579    }
580
581    /// Rolls back one worker slot after spawn failure.
582    fn rollback_worker_slot(&self) {
583        self.state.write(|state| {
584            state.live_workers = state
585                .live_workers
586                .checked_sub(1)
587                .expect("fixed pool live worker counter underflow");
588        });
589    }
590
    /// Stops the pool after a build-time worker spawn failure.
    ///
    /// Closes admission, raises the stop flag so workers refuse further jobs,
    /// forces the lifecycle to `Stopping`, and wakes every waiter so already
    /// spawned workers can observe the flags and exit.
    fn stop_after_failed_build(&self) {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        self.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        });
        self.state.notify_all();
    }

    /// Blocks until the pool is fully terminated.
    ///
    /// Termination requires a non-running lifecycle, zero live workers, and no
    /// queued, running, or in-flight work (see `is_terminated_locked`).
    fn wait_for_termination(&self) {
        self.state
            .wait_until(|state| self.is_terminated_locked(state), |_| ());
    }

    /// Requests graceful shutdown.
    ///
    /// Admission closes immediately while queued jobs keep draining. The
    /// lifecycle only moves to `Shutdown` from `Running`, so an earlier
    /// `Stopping` transition is never weakened back to a draining state.
    fn shutdown(&self) {
        self.accepting.store(false, Ordering::Release);
        self.state.write(|state| {
            if state.lifecycle.is_running() {
                state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
            }
        });
        self.state.notify_all();
    }
617
    /// Requests immediate shutdown and cancels visible queued jobs.
    ///
    /// Closes admission, raises the stop flag, waits for in-flight submit
    /// calls to finish publishing or rolling back, then drains and cancels
    /// every job still visible in the global and worker-local queues.
    ///
    /// # Returns
    ///
    /// Count-based shutdown report.
    fn shutdown_now(&self) -> ShutdownReport {
        self.accepting.store(false, Ordering::Release);
        self.stop_now.store(true, Ordering::Release);
        // Sampled before draining: jobs already claimed by workers keep
        // running and are reported separately from the cancelled ones.
        let running = self.running_count();
        let mut state = self.state.lock();
        state.lifecycle = FixedThreadPoolLifecycle::Stopping;
        // Wait out racing submitters so no job is published after the drain
        // below and silently lost.
        while self.inflight_count() > 0 {
            state = state.wait();
        }
        drop(state);
        let jobs = self.drain_visible_queued_jobs();
        let cancelled = jobs.len();
        for job in jobs {
            self.cancel_claimed_job(job);
        }
        self.state.notify_all();
        // NOTE(review): the first and third arguments are both `cancelled`;
        // confirm this matches ShutdownReport::new's parameter order.
        ShutdownReport::new(cancelled, running, cancelled)
    }
641
642    /// Drains all jobs currently visible in global and worker-local queues.
643    ///
644    /// # Returns
645    ///
646    /// Drained queued jobs.
647    fn drain_visible_queued_jobs(&self) -> Vec<PoolJob> {
648        let mut jobs = Vec::new();
649        loop {
650            let previous_count = jobs.len();
651            self.drain_global_queue(&mut jobs);
652            self.drain_worker_queues(&mut jobs);
653            if jobs.len() == previous_count {
654                return jobs;
655            }
656        }
657    }
658
659    /// Drains visible jobs from the global injector.
660    ///
661    /// # Parameters
662    ///
663    /// * `jobs` - Destination for drained jobs.
664    fn drain_global_queue(&self, jobs: &mut Vec<PoolJob>) {
665        while let Some(job) = steal_one(&self.global_queue) {
666            jobs.push(job);
667        }
668    }
669
670    /// Drains visible jobs from all worker-local queues.
671    ///
672    /// # Parameters
673    ///
674    /// * `jobs` - Destination for drained jobs.
675    fn drain_worker_queues(&self, jobs: &mut Vec<PoolJob>) {
676        for queue in &self.worker_queues {
677            jobs.extend(queue.drain());
678        }
679    }
680
681    /// Returns whether shutdown has started.
682    ///
683    /// # Returns
684    ///
685    /// `true` when lifecycle is not running.
686    fn is_shutdown(&self) -> bool {
687        self.state.read(|state| !state.lifecycle.is_running())
688    }
689
690    /// Returns whether the pool is terminated.
691    ///
692    /// # Returns
693    ///
694    /// `true` after shutdown and after all workers and jobs are gone.
695    fn is_terminated(&self) -> bool {
696        self.state.read(|state| self.is_terminated_locked(state))
697    }
698
699    /// Checks termination against one locked state snapshot.
700    ///
701    /// # Parameters
702    ///
703    /// * `state` - Locked state snapshot.
704    ///
705    /// # Returns
706    ///
707    /// `true` when the pool is terminal.
708    fn is_terminated_locked(&self, state: &FixedThreadPoolState) -> bool {
709        !state.lifecycle.is_running()
710            && state.live_workers == 0
711            && self.queued_count() == 0
712            && self.running_count() == 0
713            && self.inflight_count() == 0
714    }
715
    /// Returns a point-in-time stats snapshot.
    ///
    /// Atomic counters are sampled before taking the monitor lock, so the
    /// snapshot is approximate: counters may move between the loads and the
    /// locked read of lifecycle and worker counts.
    ///
    /// # Returns
    ///
    /// Snapshot using fixed pool size for both core and maximum sizes.
    fn stats(&self) -> ThreadPoolStats {
        let queued_tasks = self.queued_count();
        let running_tasks = self.running_count();
        let submitted_tasks = self.submitted_task_count.load(Ordering::Relaxed);
        let completed_tasks = self.completed_task_count.load(Ordering::Relaxed);
        let cancelled_tasks = self.cancelled_task_count.load(Ordering::Relaxed);
        self.state.read(|state| ThreadPoolStats {
            core_pool_size: self.pool_size,
            maximum_pool_size: self.pool_size,
            live_workers: state.live_workers,
            idle_workers: state.idle_workers,
            queued_tasks,
            running_tasks,
            submitted_tasks,
            completed_tasks,
            cancelled_tasks,
            shutdown: !state.lifecycle.is_running(),
            terminated: self.is_terminated_locked(state),
        })
    }
741}
742
/// Submit guard that leaves in-flight accounting on drop.
struct FixedSubmitGuard<'a> {
    /// Pool whose in-flight counter was entered.
    inner: &'a FixedThreadPoolInner,
}

impl Drop for FixedSubmitGuard<'_> {
    /// Leaves submit accounting and wakes shutdown waiters if needed.
    ///
    /// `shutdown_now` blocks until the in-flight counter reaches zero after
    /// admission closes, so the last guard to drop must broadcast.
    fn drop(&mut self) {
        let previous = self
            .inner
            .inflight_submissions
            .fetch_sub(1, Ordering::AcqRel);
        debug_assert!(previous > 0, "fixed pool submit counter underflow");
        if previous == 1 && !self.inner.accepting.load(Ordering::Acquire) {
            self.inner.state.notify_all();
        }
    }
}
762
/// Builder for [`FixedThreadPool`].
///
/// The fixed pool prestarts exactly `pool_size` workers and never changes that
/// count during runtime.
#[derive(Debug, Clone)]
pub struct FixedThreadPoolBuilder {
    /// Number of workers to prestart.
    pool_size: usize,
    /// Optional maximum queued task count; `None` means unbounded.
    queue_capacity: Option<usize>,
    /// Prefix used for worker thread names (a `-{index}` suffix is appended).
    thread_name_prefix: String,
    /// Optional worker stack size in bytes.
    stack_size: Option<usize>,
}
778
779impl FixedThreadPoolBuilder {
780    /// Creates a builder with CPU parallelism defaults.
781    ///
782    /// # Returns
783    ///
784    /// A builder with a fixed worker count equal to available parallelism.
785    pub fn new() -> Self {
786        Self::default()
787    }
788
789    /// Sets the fixed worker count.
790    ///
791    /// # Parameters
792    ///
793    /// * `pool_size` - Number of workers to create.
794    ///
795    /// # Returns
796    ///
797    /// This builder for fluent configuration.
798    pub fn pool_size(mut self, pool_size: usize) -> Self {
799        self.pool_size = pool_size;
800        self
801    }
802
803    /// Sets a bounded queue capacity.
804    ///
805    /// # Parameters
806    ///
807    /// * `capacity` - Maximum number of queued tasks.
808    ///
809    /// # Returns
810    ///
811    /// This builder for fluent configuration.
812    pub fn queue_capacity(mut self, capacity: usize) -> Self {
813        self.queue_capacity = Some(capacity);
814        self
815    }
816
817    /// Uses an unbounded queue.
818    ///
819    /// # Returns
820    ///
821    /// This builder for fluent configuration.
822    pub fn unbounded_queue(mut self) -> Self {
823        self.queue_capacity = None;
824        self
825    }
826
827    /// Sets the worker thread name prefix.
828    ///
829    /// # Parameters
830    ///
831    /// * `prefix` - Prefix used for worker thread names.
832    ///
833    /// # Returns
834    ///
835    /// This builder for fluent configuration.
836    pub fn thread_name_prefix(mut self, prefix: &str) -> Self {
837        self.thread_name_prefix = prefix.to_owned();
838        self
839    }
840
841    /// Sets the worker stack size.
842    ///
843    /// # Parameters
844    ///
845    /// * `stack_size` - Stack size in bytes.
846    ///
847    /// # Returns
848    ///
849    /// This builder for fluent configuration.
850    pub fn stack_size(mut self, stack_size: usize) -> Self {
851        self.stack_size = Some(stack_size);
852        self
853    }
854
    /// Builds the configured fixed thread pool.
    ///
    /// # Returns
    ///
    /// A fixed pool with all workers prestarted.
    ///
    /// # Errors
    ///
    /// Returns [`ThreadPoolBuildError`] when configuration is invalid or a
    /// worker thread cannot be spawned; on spawn failure the pool is stopped
    /// so already-spawned workers exit.
    pub fn build(self) -> Result<FixedThreadPool, ThreadPoolBuildError> {
        self.validate()?;
        // Create every worker runtime up front so the shared state holds all
        // queue handles before any thread starts.
        let mut worker_runtimes = Vec::with_capacity(self.pool_size);
        let mut worker_queues = Vec::with_capacity(self.pool_size);
        for index in 0..self.pool_size {
            let worker_runtime = WorkerRuntime::new(index);
            worker_queues.push(Arc::clone(&worker_runtime.queue));
            worker_runtimes.push(worker_runtime);
        }
        let inner = Arc::new(FixedThreadPoolInner::new(
            self.pool_size,
            self.queue_capacity,
            worker_queues,
        ));
        for (index, worker_runtime) in worker_runtimes.into_iter().enumerate() {
            // Reserve before spawning so termination accounting never sees a
            // live thread without a slot.
            inner.reserve_worker_slot();
            let worker_inner = Arc::clone(&inner);
            let mut builder =
                thread::Builder::new().name(format!("{}-{}", self.thread_name_prefix, index));
            if let Some(stack_size) = self.stack_size {
                builder = builder.stack_size(stack_size);
            }
            if let Err(source) =
                builder.spawn(move || run_fixed_worker(worker_inner, worker_runtime))
            {
                // Undo this worker's slot, then stop the whole pool so the
                // workers spawned so far shut down.
                inner.rollback_worker_slot();
                inner.stop_after_failed_build();
                return Err(ThreadPoolBuildError::SpawnWorker { index, source });
            }
        }
        Ok(FixedThreadPool { inner })
    }
897
898    /// Validates this builder configuration.
899    ///
900    /// # Returns
901    ///
902    /// `Ok(())` when configuration is valid.
903    ///
904    /// # Errors
905    ///
906    /// Returns [`ThreadPoolBuildError`] for zero pool size, zero queue capacity,
907    /// or zero stack size.
908    fn validate(&self) -> Result<(), ThreadPoolBuildError> {
909        if self.pool_size == 0 {
910            return Err(ThreadPoolBuildError::ZeroMaximumPoolSize);
911        }
912        if self.queue_capacity == Some(0) {
913            return Err(ThreadPoolBuildError::ZeroQueueCapacity);
914        }
915        if self.stack_size == Some(0) {
916            return Err(ThreadPoolBuildError::ZeroStackSize);
917        }
918        Ok(())
919    }
920}
921
922impl Default for FixedThreadPoolBuilder {
923    /// Creates a builder using available CPU parallelism.
924    ///
925    /// # Returns
926    ///
927    /// Default fixed-pool builder.
928    fn default() -> Self {
929        Self {
930            pool_size: default_fixed_pool_size(),
931            queue_capacity: None,
932            thread_name_prefix: DEFAULT_FIXED_THREAD_NAME_PREFIX.to_owned(),
933            stack_size: None,
934        }
935    }
936}
937
/// Fixed-size thread pool implementing [`ExecutorService`].
///
/// `FixedThreadPool` prestarts a fixed number of worker threads and does not
/// support runtime pool-size changes. Use [`super::ThreadPool`] when dynamic
/// core/maximum sizes or keep-alive policies are required.
///
/// Dropping the handle requests a graceful shutdown (see the [`Drop`] impl):
/// new submissions are rejected while already-queued tasks keep draining.
pub struct FixedThreadPool {
    /// Shared fixed pool state; also held by every worker thread via `Arc`.
    inner: Arc<FixedThreadPoolInner>,
}
947
948impl FixedThreadPool {
949    /// Creates a fixed thread pool with `pool_size` prestarted workers.
950    ///
951    /// # Parameters
952    ///
953    /// * `pool_size` - Number of worker threads.
954    ///
955    /// # Returns
956    ///
957    /// A fixed thread pool.
958    ///
959    /// # Errors
960    ///
961    /// Returns [`ThreadPoolBuildError`] if the worker count is zero or a worker
962    /// cannot be spawned.
963    pub fn new(pool_size: usize) -> Result<Self, ThreadPoolBuildError> {
964        Self::builder().pool_size(pool_size).build()
965    }
966
967    /// Creates a fixed pool builder.
968    ///
969    /// # Returns
970    ///
971    /// Builder with CPU parallelism defaults.
972    pub fn builder() -> FixedThreadPoolBuilder {
973        FixedThreadPoolBuilder::new()
974    }
975
976    /// Returns the fixed worker count.
977    ///
978    /// # Returns
979    ///
980    /// Number of workers in this pool.
981    pub fn pool_size(&self) -> usize {
982        self.inner.pool_size()
983    }
984
985    /// Returns the queued task count.
986    ///
987    /// # Returns
988    ///
989    /// Number of accepted tasks waiting to run.
990    pub fn queued_count(&self) -> usize {
991        self.inner.queued_count()
992    }
993
994    /// Returns the running task count.
995    ///
996    /// # Returns
997    ///
998    /// Number of tasks currently held by workers.
999    pub fn running_count(&self) -> usize {
1000        self.inner.running_count()
1001    }
1002
1003    /// Returns the live worker count.
1004    ///
1005    /// # Returns
1006    ///
1007    /// Number of worker loops that have not exited.
1008    pub fn live_worker_count(&self) -> usize {
1009        self.inner.state.read(|state| state.live_workers)
1010    }
1011
1012    /// Returns a point-in-time stats snapshot.
1013    ///
1014    /// # Returns
1015    ///
1016    /// Snapshot containing queue, worker, and lifecycle counters.
1017    pub fn stats(&self) -> ThreadPoolStats {
1018        self.inner.stats()
1019    }
1020}
1021
1022impl Drop for FixedThreadPool {
1023    /// Requests graceful shutdown when the pool handle is dropped.
1024    fn drop(&mut self) {
1025        self.inner.shutdown();
1026    }
1027}
1028
1029impl ExecutorService for FixedThreadPool {
1030    type Handle<R, E>
1031        = TaskHandle<R, E>
1032    where
1033        R: Send + 'static,
1034        E: Send + 'static;
1035
1036    type Termination<'a>
1037        = Pin<Box<dyn Future<Output = ()> + Send + 'a>>
1038    where
1039        Self: 'a;
1040
1041    /// Accepts a callable and queues it for fixed pool workers.
1042    ///
1043    /// # Parameters
1044    ///
1045    /// * `task` - Callable to execute on a fixed pool worker.
1046    ///
1047    /// # Returns
1048    ///
1049    /// A [`TaskHandle`] for the accepted task.
1050    ///
1051    /// # Errors
1052    ///
1053    /// Returns [`RejectedExecution::Shutdown`] after shutdown or
1054    /// [`RejectedExecution::Saturated`] when a bounded queue is full.
1055    fn submit_callable<C, R, E>(&self, task: C) -> Result<Self::Handle<R, E>, RejectedExecution>
1056    where
1057        C: Callable<R, E> + Send + 'static,
1058        R: Send + 'static,
1059        E: Send + 'static,
1060    {
1061        let (handle, completion) = TaskCompletionPair::new().into_parts();
1062        let job = PoolJob::from_task(task, completion);
1063        self.inner.submit(job)?;
1064        Ok(handle)
1065    }
1066
1067    /// Stops accepting new work and drains accepted queued tasks.
1068    fn shutdown(&self) {
1069        self.inner.shutdown();
1070    }
1071
1072    /// Stops accepting work and cancels queued tasks.
1073    ///
1074    /// # Returns
1075    ///
1076    /// A count-based shutdown report.
1077    fn shutdown_now(&self) -> ShutdownReport {
1078        self.inner.shutdown_now()
1079    }
1080
1081    /// Returns whether shutdown has been requested.
1082    ///
1083    /// # Returns
1084    ///
1085    /// `true` when this pool no longer accepts new work.
1086    fn is_shutdown(&self) -> bool {
1087        self.inner.is_shutdown()
1088    }
1089
1090    /// Returns whether this pool is fully terminated.
1091    ///
1092    /// # Returns
1093    ///
1094    /// `true` after shutdown and after all workers have exited.
1095    fn is_terminated(&self) -> bool {
1096        self.inner.is_terminated()
1097    }
1098
1099    /// Waits until this fixed pool has terminated.
1100    ///
1101    /// # Returns
1102    ///
1103    /// A future that blocks the polling thread until termination.
1104    fn await_termination(&self) -> Self::Termination<'_> {
1105        Box::pin(async move {
1106            self.inner.wait_for_termination();
1107        })
1108    }
1109}
1110
1111/// Runs one fixed-pool worker loop.
1112///
1113/// # Parameters
1114///
1115/// * `inner` - Shared fixed-pool state.
1116/// * `worker_runtime` - Queue runtime owned by this worker.
1117fn run_fixed_worker(inner: Arc<FixedThreadPoolInner>, worker_runtime: WorkerRuntime) {
1118    worker_runtime.queue.activate();
1119    loop {
1120        if let Some(job) = inner.try_take_job(&worker_runtime) {
1121            job.run();
1122            inner.finish_running_job();
1123            continue;
1124        }
1125        if !wait_for_fixed_pool_work(&inner) {
1126            break;
1127        }
1128    }
1129    worker_exited(&inner, &worker_runtime.queue);
1130}
1131
/// Waits until visible work exists or the worker should exit.
///
/// Uses a mark-idle / re-check / wait protocol: the worker marks itself idle
/// under the lock, then re-checks the queue (and any pending wake token)
/// before blocking, so a submission racing with the first check cannot be
/// lost.
///
/// # Parameters
///
/// * `inner` - Shared fixed-pool state.
///
/// # Returns
///
/// `true` when the worker should try to take work again, or `false` when it
/// should exit.
fn wait_for_fixed_pool_work(inner: &FixedThreadPoolInner) -> bool {
    let mut state = inner.state.lock();
    loop {
        match state.lifecycle {
            FixedThreadPoolLifecycle::Running => {
                // Fast path: work is already visible, go take it.
                if inner.queued_count() > 0 {
                    return true;
                }
                mark_fixed_worker_idle(inner, &mut state);
                // Re-check after marking idle: a submitter may have enqueued
                // work (or left a wake token) between the check and the mark.
                if inner.queued_count() > 0 || inner.has_pending_worker_wake() {
                    unmark_fixed_worker_idle(inner, &mut state);
                    return true;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            FixedThreadPoolLifecycle::Shutdown => {
                if inner.queued_count() > 0 {
                    return true;
                }
                // Drain is complete only when the queue is empty AND no
                // submission is still in flight; then this worker may exit.
                if inner.queued_count() == 0 && inner.inflight_count() == 0 {
                    return false;
                }
                mark_fixed_worker_idle(inner, &mut state);
                // Re-check before blocking; when inflight drops to zero we
                // loop back so the exit condition above is re-evaluated.
                if inner.queued_count() > 0
                    || inner.inflight_count() == 0
                    || inner.has_pending_worker_wake()
                {
                    unmark_fixed_worker_idle(inner, &mut state);
                    continue;
                }
                state = state.wait();
                unmark_fixed_worker_idle(inner, &mut state);
            }
            // Stopping cancels queued work, so workers exit immediately.
            FixedThreadPoolLifecycle::Stopping => return false,
        }
    }
}
1180
/// Marks a fixed-pool worker as idle in locked and lock-free state.
///
/// # Parameters
///
/// * `inner` - Fixed pool whose idle counter is updated.
/// * `state` - Locked mutable state containing authoritative idle workers.
fn mark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
    // The locked counter is authoritative; the atomic mirrors it so readers
    // that do not hold the lock can observe idleness.
    state.idle_workers += 1;
    inner.idle_worker_count.fetch_add(1, Ordering::AcqRel);
}
1191
/// Marks a fixed-pool worker as no longer idle.
///
/// # Parameters
///
/// * `inner` - Fixed pool whose idle counter is updated.
/// * `state` - Locked mutable state containing authoritative idle workers.
fn unmark_fixed_worker_idle(inner: &FixedThreadPoolInner, state: &mut FixedThreadPoolState) {
    // Underflow means an unbalanced mark/unmark pair — a pool bug, not a
    // recoverable condition — so both counters panic/assert on it.
    state.idle_workers = state
        .idle_workers
        .checked_sub(1)
        .expect("fixed pool idle worker counter underflow");
    let previous = inner.idle_worker_count.fetch_sub(1, Ordering::AcqRel);
    debug_assert!(previous > 0, "fixed pool idle worker counter underflow");
    // Consume at most one pending wake token (presumably left by a
    // submitter — see the has_pending_worker_wake checks in the wait loop).
    inner.consume_pending_worker_wake();
}
1207
1208/// Marks one fixed-pool worker as exited.
1209///
1210/// # Parameters
1211///
1212/// * `inner` - Shared fixed-pool state.
1213/// * `worker_queue` - Queue owned by the exiting worker.
1214fn worker_exited(inner: &FixedThreadPoolInner, worker_queue: &WorkerQueue) {
1215    worker_queue.deactivate();
1216    inner.state.write(|state| {
1217        state.live_workers = state
1218            .live_workers
1219            .checked_sub(1)
1220            .expect("fixed pool live worker counter underflow");
1221    });
1222    inner.state.notify_all();
1223}
1224
/// Computes the default number of fixed-pool workers.
///
/// # Returns
///
/// Available CPU parallelism, or `1` if it cannot be detected.
fn default_fixed_pool_size() -> usize {
    match thread::available_parallelism() {
        Ok(parallelism) => parallelism.get(),
        Err(_) => 1,
    }
}
1235
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{
        Arc,
        atomic::{
            AtomicUsize,
            Ordering,
        },
    };
    use std::thread;
    use std::time::Duration;

    /// Builds a job that bumps `ran` when executed and `cancelled` when
    /// cancelled, so each test can observe which path every job took.
    fn counted_job(cancelled: Arc<AtomicUsize>, ran: Arc<AtomicUsize>) -> PoolJob {
        PoolJob::new(
            Box::new(move || {
                ran.fetch_add(1, Ordering::AcqRel);
            }),
            Box::new(move || {
                cancelled.fetch_add(1, Ordering::AcqRel);
            }),
        )
    }

    #[test]
    fn test_accept_claimed_job_stop_now_cancels_claimed_and_worker_queues() {
        let runtime = WorkerRuntime::new(0);
        runtime.queue.activate();
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        // Simulate shutdown_now having been requested before the worker
        // accepts a job it already claimed.
        inner.stop_now.store(true, Ordering::Release);

        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        // Preload one local job and one worker-queue job; together with the
        // claimed job below that is three jobs the counter must account for.
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        runtime
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(3, Ordering::Release);

        let accepted =
            inner.accept_claimed_job(counted_job(cancelled.clone(), ran.clone()), &runtime);
        // Under stop_now every job must be cancelled and none executed.
        assert!(accepted.is_none());
        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 3);
        assert_eq!(cancelled.load(Ordering::Acquire), 3);
        assert_eq!(ran.load(Ordering::Acquire), 0);
    }

    #[test]
    fn test_steal_global_job_notifies_when_batch_leaves_local_jobs() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        // One job already in the worker's local queue, one in the global
        // injector; the counter is primed to match.
        runtime
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner
            .global_queue
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        // Stealing from the global queue yields one claimed job to run.
        let claimed = inner
            .steal_global_job(&runtime)
            .expect("global queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        // The preloaded local job must still be there; cancel it to drain.
        let remaining = runtime
            .local
            .pop()
            .expect("preloaded local job should remain queued");
        inner.cancel_claimed_job(remaining);

        // One job completed, one cancelled, nothing left queued or running.
        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    #[test]
    fn test_steal_worker_job_notifies_when_batch_leaves_local_jobs() {
        // Two workers: the thief steals from the victim's queue.
        let thief = WorkerRuntime::new(0);
        let victim = WorkerRuntime::new(1);
        thief.queue.activate();
        victim.queue.activate();
        let inner = FixedThreadPoolInner::new(
            2,
            None,
            vec![Arc::clone(&thief.queue), Arc::clone(&victim.queue)],
        );
        let cancelled = Arc::new(AtomicUsize::new(0));
        let ran = Arc::new(AtomicUsize::new(0));
        thief
            .local
            .push(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        victim
            .queue
            .push_back(counted_job(Arc::clone(&cancelled), Arc::clone(&ran)));
        inner.queued_task_count.store(2, Ordering::Release);

        let claimed = inner
            .steal_worker_job(&thief)
            .expect("victim queue should provide one claimed job");
        claimed.run();
        inner.finish_running_job();
        // The batch steal must not have consumed the thief's local job.
        let remaining = thief
            .local
            .pop()
            .expect("batch steal should leave one local job");
        inner.cancel_claimed_job(remaining);

        assert_eq!(inner.queued_count(), 0);
        assert_eq!(inner.running_count(), 0);
        assert_eq!(inner.completed_task_count.load(Ordering::Acquire), 1);
        assert_eq!(inner.cancelled_task_count.load(Ordering::Acquire), 1);
        assert_eq!(ran.load(Ordering::Acquire), 1);
        assert_eq!(cancelled.load(Ordering::Acquire), 1);
    }

    #[test]
    fn test_fixed_submit_guard_drop_notifies_when_shutdown_closes_admission() {
        let runtime = WorkerRuntime::new(0);
        let inner = FixedThreadPoolInner::new(1, None, vec![Arc::clone(&runtime.queue)]);
        // Simulate one in-flight submission while admission is closed.
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.accepting.store(false, Ordering::Release);

        {
            // Dropping the guard must release the in-flight slot.
            let guard = FixedSubmitGuard { inner: &inner };
            drop(guard);
        }

        assert_eq!(inner.inflight_count(), 0);
    }

    #[test]
    fn test_wait_for_fixed_pool_work_shutdown_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        // Put the pool in Shutdown with one submission still in flight and a
        // pending wake token, so the wait loop cannot exit immediately.
        inner.state.write(|state| {
            state.lifecycle = FixedThreadPoolLifecycle::Shutdown;
        });
        inner.inflight_submissions.store(1, Ordering::Release);
        inner.pending_worker_wakes.store(1, Ordering::Release);

        // A second thread releases the in-flight submission shortly after,
        // which should let the waiter observe drain-complete and exit.
        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        // `false` means the worker decided to exit rather than take work.
        assert!(!wait_for_fixed_pool_work(&inner));
        releaser.join().expect("releaser thread should finish");
    }

    #[test]
    fn test_shutdown_now_waits_for_inflight_submissions() {
        let runtime = WorkerRuntime::new(0);
        let inner = Arc::new(FixedThreadPoolInner::new(
            1,
            None,
            vec![Arc::clone(&runtime.queue)],
        ));
        // One submission is mid-flight when shutdown_now is requested.
        inner.inflight_submissions.store(1, Ordering::Release);

        let inner_for_release = Arc::clone(&inner);
        let releaser = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            inner_for_release
                .inflight_submissions
                .store(0, Ordering::Release);
            inner_for_release.state.notify_all();
        });

        // shutdown_now must block until the in-flight count reaches zero and
        // then report an empty pool.
        let report = inner.shutdown_now();
        releaser.join().expect("releaser thread should finish");
        assert_eq!(report.running, 0);
        assert_eq!(report.queued, 0);
        assert_eq!(report.cancelled, 0);
    }
}