armature_core/
worker.rs

1//! Per-Worker State Management
2//!
3//! This module provides per-worker (thread-local) state to avoid Arc cloning
4//! overhead on the hot path. Instead of cloning `Arc<Router>` for every
5//! request, each worker thread maintains its own reference.
6//!
7//! ## Performance Benefits
8//!
9//! ```text
10//! Arc clone path:
11//! Request → Arc::clone(&router) → atomic increment → handle
12//!
13//! Per-worker path:
14//! Request → thread_local router ref → handle (no atomic ops)
15//! ```
16//!
17//! This eliminates atomic reference counting on every request, which can
18//! save 2-3% throughput under high concurrency.
19//!
20//! ## Usage
21//!
22//! ```rust,ignore
23//! use armature_core::worker::{WorkerRouter, init_worker_router};
24//!
25//! // Initialize once per worker thread
26//! init_worker_router(router.clone());
27//!
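//! // Access router without cloning Arc (the closure is synchronous)
//! WorkerRouter::with(|router| {
//!     // ... synchronous work with `router` ...
//! });
//!
//! // Async handlers cannot `.await` inside the closure; clone the Arc once
//! // and move it into the future instead
//! let router = WorkerRouter::clone_arc_or_panic();
//! router.route(request).await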
32//! ```
33
34use crate::Router;
35use std::cell::RefCell;
36use std::sync::Arc;
37use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
38
39// ============================================================================
40// Worker ID Generation
41// ============================================================================
42
/// Process-wide counter from which worker IDs are handed out, starting at 0.
static WORKER_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);

/// Reserve the next unused worker ID and return it.
///
/// IDs are issued sequentially; every call consumes one, so two calls never
/// return the same value.
#[inline]
pub fn next_worker_id() -> usize {
    // The counter is only used to hand out unique numbers, so no ordering
    // with other memory operations is needed.
    AtomicUsize::fetch_add(&WORKER_ID_COUNTER, 1, Ordering::Relaxed)
}

/// Number of worker IDs issued so far by [`next_worker_id`].
#[inline]
pub fn total_workers() -> usize {
    AtomicUsize::load(&WORKER_ID_COUNTER, Ordering::Relaxed)
}
57
58// ============================================================================
59// Per-Worker Router Storage
60// ============================================================================
61
62thread_local! {
63    /// Thread-local router storage
64    static WORKER_ROUTER: RefCell<Option<Arc<Router>>> = const { RefCell::new(None) };
65
66    /// Thread-local worker ID
67    static WORKER_ID: RefCell<Option<usize>> = const { RefCell::new(None) };
68}
69
70/// Initialize the thread-local router for the current worker.
71///
72/// Call this once when spawning a new worker task.
73///
74/// # Example
75///
76/// ```rust,ignore
77/// tokio::spawn(async move {
78///     init_worker_router(router.clone());
79///     // Now can use WorkerRouter::with() without cloning
80/// });
81/// ```
82#[inline]
83pub fn init_worker_router(router: Arc<Router>) {
84    WORKER_ROUTER.with(|r| {
85        *r.borrow_mut() = Some(router);
86    });
87    WORKER_ID.with(|id| {
88        if id.borrow().is_none() {
89            *id.borrow_mut() = Some(next_worker_id());
90        }
91    });
92    WORKER_STATS.record_init();
93}
94
95/// Clear the thread-local router (for cleanup/testing).
96#[inline]
97pub fn clear_worker_router() {
98    WORKER_ROUTER.with(|r| {
99        *r.borrow_mut() = None;
100    });
101}
102
103/// Get the current worker's ID.
104#[inline]
105pub fn worker_id() -> Option<usize> {
106    WORKER_ID.with(|id| *id.borrow())
107}
108
109/// Check if the current thread has a worker router initialized.
110#[inline]
111pub fn has_worker_router() -> bool {
112    WORKER_ROUTER.with(|r| r.borrow().is_some())
113}
114
115// ============================================================================
116// Worker Router Access
117// ============================================================================
118
119/// Per-worker router accessor.
120///
121/// This provides zero-cost access to the router without Arc cloning.
122pub struct WorkerRouter;
123
124impl WorkerRouter {
125    /// Execute a closure with the worker's router.
126    ///
127    /// This is the primary way to access the router without cloning.
128    /// The closure receives a reference to the router.
129    ///
130    /// # Panics
131    ///
132    /// Panics if called from a thread without an initialized worker router.
133    /// Use `try_with` for a non-panicking version.
134    ///
135    /// # Example
136    ///
137    /// ```rust,ignore
138    /// let response = WorkerRouter::with(|router| {
139    ///     router.route(request).await
140    /// });
141    /// ```
142    #[inline]
143    pub fn with<F, R>(f: F) -> R
144    where
145        F: FnOnce(&Router) -> R,
146    {
147        WORKER_ROUTER.with(|r| {
148            let router_ref = r.borrow();
149            let router = router_ref
150                .as_ref()
151                .expect("WorkerRouter not initialized. Call init_worker_router first.");
152            WORKER_STATS.record_access();
153            f(router)
154        })
155    }
156
157    /// Try to execute a closure with the worker's router.
158    ///
159    /// Returns `None` if no worker router is initialized.
160    #[inline]
161    pub fn try_with<F, R>(f: F) -> Option<R>
162    where
163        F: FnOnce(&Router) -> R,
164    {
165        WORKER_ROUTER.with(|r| {
166            let router_ref = r.borrow();
167            router_ref.as_ref().map(|router| {
168                WORKER_STATS.record_access();
169                f(router)
170            })
171        })
172    }
173
174    /// Get a clone of the worker's router (fallback for async contexts).
175    ///
176    /// Use this when you need to move the router into an async block.
177    /// This still clones the Arc, but only once per request instead of
178    /// multiple times in nested closures.
179    #[inline]
180    pub fn clone_arc() -> Option<Arc<Router>> {
181        WORKER_ROUTER.with(|r| {
182            let router_ref = r.borrow();
183            router_ref.as_ref().map(|router| {
184                WORKER_STATS.record_clone();
185                Arc::clone(router)
186            })
187        })
188    }
189
190    /// Get a clone of the worker's router, or panic if not initialized.
191    #[inline]
192    pub fn clone_arc_or_panic() -> Arc<Router> {
193        Self::clone_arc().expect("WorkerRouter not initialized")
194    }
195}
196
197// ============================================================================
198// Worker Configuration
199// ============================================================================
200
/// Settings describing how worker threads should be created.
///
/// Start from [`WorkerConfig::new`] and chain the setters; each setter takes
/// the configuration by value and returns the updated copy.
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Requested worker count; `0` means "one per available CPU core".
    pub num_workers: usize,
    /// Whether workers should be pinned to CPU cores.
    pub cpu_affinity: bool,
    /// Stack size for worker threads in bytes, if overridden.
    pub stack_size: Option<usize>,
    /// Prefix used when naming worker threads.
    pub name_prefix: String,
}

impl Default for WorkerConfig {
    /// Auto-detected worker count, no pinning, default stack size and the
    /// `armature-worker` name prefix.
    fn default() -> Self {
        Self {
            num_workers: 0,
            cpu_affinity: false,
            stack_size: None,
            name_prefix: String::from("armature-worker"),
        }
    }
}

impl WorkerConfig {
    /// Create a configuration holding the default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Request `n` worker threads.
    ///
    /// Passing 0 selects auto-detection (number of CPU cores).
    #[inline]
    pub fn workers(self, n: usize) -> Self {
        Self {
            num_workers: n,
            ..self
        }
    }

    /// Turn on CPU core affinity, so that workers are pinned to specific
    /// cores for better cache locality.
    #[inline]
    pub fn with_cpu_affinity(self) -> Self {
        Self {
            cpu_affinity: true,
            ..self
        }
    }

    /// Override the worker thread stack size (in bytes).
    #[inline]
    pub fn stack_size(self, size: usize) -> Self {
        Self {
            stack_size: Some(size),
            ..self
        }
    }

    /// Replace the worker thread name prefix.
    #[inline]
    pub fn name_prefix(self, prefix: impl Into<String>) -> Self {
        Self {
            name_prefix: prefix.into(),
            ..self
        }
    }

    /// Resolve the number of workers that should actually be started.
    ///
    /// An explicit non-zero `num_workers` is returned as is. Otherwise the
    /// available parallelism reported by the standard library is used,
    /// falling back to 1 when it cannot be determined.
    #[inline]
    pub fn effective_workers(&self) -> usize {
        match self.num_workers {
            0 => std::thread::available_parallelism().map_or(1, |cores| cores.get()),
            explicit => explicit,
        }
    }
}
278
279// ============================================================================
280// CPU Core Affinity
281// ============================================================================
282
/// CPU core affinity configuration.
///
/// Decides which core a worker should be pinned to, either from an explicit
/// list of cores or from all available cores according to an
/// [`AffinityMode`].
#[derive(Debug, Clone)]
pub struct AffinityConfig {
    /// Enable core pinning
    pub enabled: bool,
    /// Specific cores to use (empty = all available)
    pub cores: Vec<usize>,
    /// Affinity mode (only consulted when `cores` is empty)
    pub mode: AffinityMode,
}

impl Default for AffinityConfig {
    /// Pinning disabled, no explicit core list, round-robin assignment.
    fn default() -> Self {
        Self {
            enabled: false,
            cores: Vec::new(),
            mode: AffinityMode::RoundRobin,
        }
    }
}

impl AffinityConfig {
    /// Create a new affinity configuration with the default values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Enable CPU affinity.
    #[inline]
    pub fn enable(mut self) -> Self {
        self.enabled = true;
        self
    }

    /// Disable CPU affinity.
    #[inline]
    pub fn disable(mut self) -> Self {
        self.enabled = false;
        self
    }

    /// Set specific cores to use.
    ///
    /// An empty list means "use every available core".
    #[inline]
    pub fn cores(mut self, cores: Vec<usize>) -> Self {
        self.cores = cores;
        self
    }

    /// Set affinity mode.
    #[inline]
    pub fn mode(mut self, mode: AffinityMode) -> Self {
        self.mode = mode;
        self
    }

    /// Get the core to pin a worker to based on worker ID.
    ///
    /// With an explicit core list, workers cycle through that list in order
    /// and the mode is not consulted. Otherwise the result is always a valid
    /// index in `0..num_cpus()`:
    ///
    /// - `RoundRobin`: `worker_id` modulo the core count.
    /// - `Packed`: `worker_id`, capped at the last core.
    /// - `Spread`: even-numbered cores first, then odd-numbered ones.
    #[inline]
    pub fn core_for_worker(&self, worker_id: usize) -> usize {
        if !self.cores.is_empty() {
            // The list is non-empty, so the modulo keeps the index in bounds.
            return self.cores[worker_id % self.cores.len()];
        }

        let num_cores = num_cpus();
        match self.mode {
            AffinityMode::RoundRobin => worker_id % num_cores,
            AffinityMode::Packed => worker_id.min(num_cores - 1),
            AffinityMode::Spread => Self::spread_core(worker_id, num_cores),
        }
    }

    /// Map a worker onto `0..num_cores`, leaving a one-core gap between
    /// consecutive workers.
    ///
    /// A fixed stride of `num_cores / 2` would only ever reach two cores when
    /// the core count is even. Walking the even cores first and the odd cores
    /// afterwards keeps the gaps while still reaching every core once per
    /// `num_cores` workers. `num_cores` must be at least 1.
    fn spread_core(worker_id: usize, num_cores: usize) -> usize {
        let slot = worker_id % num_cores;
        let even_slots = num_cores.div_ceil(2);
        if slot < even_slots {
            slot * 2
        } else {
            (slot - even_slots) * 2 + 1
        }
    }
}

/// CPU affinity mode - how workers are assigned to cores.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AffinityMode {
    /// Round-robin assignment: worker 0 → core 0, worker 1 → core 1, etc.
    RoundRobin,
    /// Pack workers on first N cores
    Packed,
    /// Spread workers across cores with gaps (better for hyper-threading)
    Spread,
}

/// Get the number of CPU cores.
///
/// This is the available parallelism reported by the standard library, or 1
/// when it cannot be determined, so the result is never 0.
#[inline]
pub fn num_cpus() -> usize {
    std::thread::available_parallelism().map_or(1, |cores| cores.get())
}
378
379/// Get the number of physical CPU cores (excluding hyper-threads).
380///
381/// On systems without hyper-threading, this returns the same as `num_cpus()`.
382#[inline]
383pub fn num_physical_cpus() -> usize {
384    // On most systems, physical cores = total cores / 2 if hyper-threading
385    // This is a heuristic; for accurate info, use platform-specific APIs
386    let total = num_cpus();
387    // Assume hyper-threading if > 4 cores and even number
388    if total > 4 && total.is_multiple_of(2) {
389        total / 2
390    } else {
391        total
392    }
393}
394
395/// Set CPU affinity for the current thread.
396///
397/// This pins the current thread to the specified CPU core.
398///
399/// # Platform Support
400///
401/// - Linux: Uses `sched_setaffinity`
402/// - macOS/Windows: No-op (returns Ok but doesn't pin)
403///
404/// # Example
405///
406/// ```rust,ignore
407/// // Pin current thread to core 0
408/// set_thread_affinity(0)?;
409/// ```
410#[inline]
411pub fn set_thread_affinity(core: usize) -> Result<(), AffinityError> {
412    #[cfg(target_os = "linux")]
413    {
414        set_thread_affinity_linux(core)
415    }
416
417    #[cfg(not(target_os = "linux"))]
418    {
419        // No-op on non-Linux platforms
420        let _ = core;
421        Ok(())
422    }
423}
424
425/// Set CPU affinity on Linux using sched_setaffinity.
426#[cfg(target_os = "linux")]
427fn set_thread_affinity_linux(core: usize) -> Result<(), AffinityError> {
428    use std::mem;
429
430    // Check if core is valid
431    let num_cores = num_cpus();
432    if core >= num_cores {
433        return Err(AffinityError::InvalidCore {
434            core,
435            max: num_cores - 1,
436        });
437    }
438
439    // cpu_set_t is 1024 bits = 128 bytes on Linux
440    // We use a simplified version that supports up to 64 cores
441    let mut mask: u64 = 0;
442    mask |= 1u64 << core;
443
444    // SAFETY: sched_setaffinity is a safe syscall when called with correct parameters
445    unsafe {
446        let result = libc::sched_setaffinity(
447            0, // 0 = current thread
448            mem::size_of::<u64>(),
449            &mask as *const u64 as *const libc::cpu_set_t,
450        );
451
452        if result == 0 {
453            AFFINITY_STATS.record_set(true);
454            Ok(())
455        } else {
456            AFFINITY_STATS.record_set(false);
457            Err(AffinityError::SystemError {
458                errno: *libc::__errno_location(),
459            })
460        }
461    }
462}
463
/// Reasons why reading or changing a thread's CPU affinity can fail.
#[derive(Debug, Clone)]
pub enum AffinityError {
    /// The requested core index is out of range; `max` is the highest
    /// accepted index.
    InvalidCore { core: usize, max: usize },
    /// The operating system rejected the request (Linux errno)
    SystemError { errno: i32 },
    /// Platform not supported
    NotSupported,
}

impl std::fmt::Display for AffinityError {
    /// Write a short human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            Self::InvalidCore { core, max } => {
                write!(f, "Invalid core {}, max is {}", core, max)
            }
            Self::SystemError { errno } => write!(f, "System error: errno {}", errno),
            Self::NotSupported => f.write_str("CPU affinity not supported on this platform"),
        }
    }
}

impl std::error::Error for AffinityError {}
490
491/// Get the CPU affinity of the current thread.
492///
493/// Returns the set of cores the thread is allowed to run on.
494#[inline]
495pub fn get_thread_affinity() -> Result<Vec<usize>, AffinityError> {
496    #[cfg(target_os = "linux")]
497    {
498        get_thread_affinity_linux()
499    }
500
501    #[cfg(not(target_os = "linux"))]
502    {
503        // Return all cores on non-Linux
504        Ok((0..num_cpus()).collect())
505    }
506}
507
508/// Get CPU affinity on Linux.
509#[cfg(target_os = "linux")]
510fn get_thread_affinity_linux() -> Result<Vec<usize>, AffinityError> {
511    use std::mem;
512
513    let mut mask: u64 = 0;
514
515    // SAFETY: sched_getaffinity is a safe syscall
516    unsafe {
517        let result = libc::sched_getaffinity(
518            0,
519            mem::size_of::<u64>(),
520            &mut mask as *mut u64 as *mut libc::cpu_set_t,
521        );
522
523        if result == 0 {
524            let mut cores = Vec::new();
525            for i in 0..64 {
526                if (mask & (1u64 << i)) != 0 {
527                    cores.push(i);
528                }
529            }
530            Ok(cores)
531        } else {
532            Err(AffinityError::SystemError {
533                errno: *libc::__errno_location(),
534            })
535        }
536    }
537}
538
/// Report whether thread pinning is actually implemented for the platform
/// this crate was compiled for (currently only Linux).
#[inline]
pub fn affinity_supported() -> bool {
    // Resolved at compile time from the target OS.
    const SUPPORTED: bool = cfg!(target_os = "linux");
    SUPPORTED
}
544
545/// Initialize a worker with CPU affinity.
546///
547/// This is a convenience function that:
548/// 1. Sets CPU affinity based on worker ID
549/// 2. Initializes the thread-local router
550///
551/// # Example
552///
553/// ```rust,ignore
554/// let config = AffinityConfig::new().enable();
555///
556/// tokio::spawn(async move {
557///     init_worker_with_affinity(worker_id, &config, router.clone())?;
558///     // Worker is now pinned to a core and has router access
559/// });
560/// ```
561#[inline]
562pub fn init_worker_with_affinity(
563    worker_id: usize,
564    config: &AffinityConfig,
565    router: Arc<Router>,
566) -> Result<(), AffinityError> {
567    // Set CPU affinity if enabled
568    if config.enabled && affinity_supported() {
569        let core = config.core_for_worker(worker_id);
570        set_thread_affinity(core)?;
571    }
572
573    // Initialize thread-local router
574    init_worker_router(router);
575
576    Ok(())
577}
578
579// ============================================================================
580// Affinity Statistics
581// ============================================================================
582
/// Statistics for CPU affinity operations.
///
/// Counters are plain atomics updated with relaxed ordering; they are meant
/// for monitoring, not for synchronization.
#[derive(Debug, Default)]
pub struct AffinityStats {
    /// Successful affinity sets
    successful: AtomicU64,
    /// Failed affinity sets
    failed: AtomicU64,
}

impl AffinityStats {
    /// Create new stats with both counters at zero.
    ///
    /// This is a `const fn` so the global instance can be built with it.
    pub const fn new() -> Self {
        Self {
            successful: AtomicU64::new(0),
            failed: AtomicU64::new(0),
        }
    }

    /// Count one attempt to set the affinity as a success or a failure.
    #[inline]
    fn record_set(&self, success: bool) {
        let counter = if success {
            &self.successful
        } else {
            &self.failed
        };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Get successful sets.
    pub fn successful(&self) -> u64 {
        self.successful.load(Ordering::Relaxed)
    }

    /// Get failed sets.
    pub fn failed(&self) -> u64 {
        self.failed.load(Ordering::Relaxed)
    }

    /// Get success rate as a percentage in `0.0..=100.0`.
    ///
    /// Returns `0.0` when nothing has been recorded yet.
    pub fn success_rate(&self) -> f64 {
        // Read each counter once. Re-reading `successful` after the total was
        // computed could report more than 100% while other threads are
        // recording.
        let successful = self.successful();
        let failed = self.failed();
        let total = successful.saturating_add(failed);
        if total == 0 {
            return 0.0;
        }
        (successful as f64 / total as f64) * 100.0
    }
}

/// Global affinity statistics.
static AFFINITY_STATS: AffinityStats = AffinityStats::new();

/// Get global affinity statistics.
pub fn affinity_stats() -> &'static AffinityStats {
    &AFFINITY_STATS
}
638
639// ============================================================================
640// Statistics
641// ============================================================================
642
/// Statistics for worker router operations.
///
/// Counters are plain atomics updated with relaxed ordering; they are meant
/// for monitoring, not for synchronization.
#[derive(Debug, Default)]
pub struct WorkerStats {
    /// Number of worker initializations
    inits: AtomicU64,
    /// Number of router accesses (via with/try_with)
    accesses: AtomicU64,
    /// Number of Arc clones (via clone_arc)
    clones: AtomicU64,
}

impl WorkerStats {
    /// Create new stats with all counters at zero.
    ///
    /// This is a `const fn` so the global instance can be built with it.
    pub const fn new() -> Self {
        Self {
            inits: AtomicU64::new(0),
            accesses: AtomicU64::new(0),
            clones: AtomicU64::new(0),
        }
    }

    /// Count one worker initialization.
    #[inline]
    fn record_init(&self) {
        self.inits.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one clone-free router access.
    #[inline]
    fn record_access(&self) {
        self.accesses.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one `Arc` clone of the router.
    #[inline]
    fn record_clone(&self) {
        self.clones.fetch_add(1, Ordering::Relaxed);
    }

    /// Get number of initializations.
    pub fn inits(&self) -> u64 {
        self.inits.load(Ordering::Relaxed)
    }

    /// Get number of accesses.
    pub fn accesses(&self) -> u64 {
        self.accesses.load(Ordering::Relaxed)
    }

    /// Get number of Arc clones.
    pub fn clones(&self) -> u64 {
        self.clones.load(Ordering::Relaxed)
    }

    /// Get clone avoidance ratio as a percentage in `0.0..=100.0`.
    ///
    /// This is the share of router lookups that did not clone the `Arc`:
    /// `accesses / (accesses + clones)`. Higher is better. Returns `0.0` when
    /// there have been no clone-free accesses.
    pub fn clone_avoidance_ratio(&self) -> f64 {
        let accesses = self.accesses();
        let clones = self.clones();
        // Accesses and clones are counted separately, so the total number of
        // lookups is their sum. Subtracting clones from accesses would go
        // negative as soon as clones outnumber accesses.
        let lookups = accesses.saturating_add(clones);
        if lookups == 0 {
            return 0.0;
        }
        (accesses as f64 / lookups as f64) * 100.0
    }
}

/// Global worker statistics.
static WORKER_STATS: WorkerStats = WorkerStats::new();

/// Get global worker statistics.
pub fn worker_stats() -> &'static WorkerStats {
    &WORKER_STATS
}
715
716// ============================================================================
717// Worker Handle
718// ============================================================================
719
/// Lightweight description of a worker, used for tracking and management.
#[derive(Debug, Clone)]
pub struct WorkerHandle {
    /// Numeric worker ID
    pub id: usize,
    /// Display name, built as `<prefix>-<id>`
    pub name: String,
}

impl WorkerHandle {
    /// Build a handle for worker `id`, naming it `<name_prefix>-<id>`.
    pub fn new(id: usize, name_prefix: &str) -> Self {
        let name = format!("{}-{}", name_prefix, id);
        Self { id, name }
    }
}
738
739// ============================================================================
740// Per-Worker State
741// ============================================================================
742
/// Typed handle to per-worker (thread-local) state.
///
/// The type itself stores nothing; `T` only selects which value in the
/// current thread's storage the associated functions operate on. Each thread
/// holds at most one value per type, so hot-path code can reach its state
/// without cloning an `Arc`.
///
/// ## Use Cases
///
/// - Database connection pools (one per worker)
/// - Caches (per-worker to avoid contention)
/// - Metrics collectors
/// - Random number generators
/// - Pre-allocated buffers
///
/// ## Example
///
/// ```rust,ignore
/// use armature_core::worker::{WorkerState, init_worker_state};
///
/// // Define state
/// struct MyState {
///     counter: u64,
///     buffer: Vec<u8>,
/// }
///
/// // Initialize once per worker
/// init_worker_state(MyState {
///     counter: 0,
///     buffer: Vec::with_capacity(4096),
/// });
///
/// // Access without Arc cloning
/// WorkerState::<MyState>::with_mut(|state| {
///     state.counter += 1;
/// });
/// ```
pub struct WorkerState<T: 'static> {
    /// Ties the handle to `T` without storing a value of that type.
    _marker: std::marker::PhantomData<T>,
}
781
// One storage map per thread. Values are type-erased and keyed by `TypeId`,
// so a thread can hold at most one value of each type.
thread_local! {
    static WORKER_STATE: RefCell<WorkerStateStorage> = RefCell::new(WorkerStateStorage::new());
}

/// Type-erased container behind the per-worker state API.
#[derive(Default)]
struct WorkerStateStorage {
    /// Boxed values, indexed by the `TypeId` of their concrete type.
    data: std::collections::HashMap<std::any::TypeId, Box<dyn std::any::Any + Send>>,
}

impl WorkerStateStorage {
    /// Create an empty storage.
    fn new() -> Self {
        Self::default()
    }

    /// Store `value`, replacing any value of the same type.
    fn insert<T: 'static + Send>(&mut self, value: T) {
        self.data
            .insert(std::any::TypeId::of::<T>(), Box::new(value));
    }

    /// Borrow the stored value of type `T`, if there is one.
    fn get<T: 'static>(&self) -> Option<&T> {
        let erased = self.data.get(&std::any::TypeId::of::<T>())?;
        erased.downcast_ref::<T>()
    }

    /// Mutably borrow the stored value of type `T`, if there is one.
    fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
        let erased = self.data.get_mut(&std::any::TypeId::of::<T>())?;
        erased.downcast_mut::<T>()
    }

    /// Take the stored value of type `T` out of the storage, if there is one.
    fn remove<T: 'static>(&mut self) -> Option<T> {
        let erased = self.data.remove(&std::any::TypeId::of::<T>())?;
        erased.downcast::<T>().ok().map(|boxed| *boxed)
    }

    /// Report whether a value of type `T` is stored.
    fn contains<T: 'static>(&self) -> bool {
        self.data.contains_key(&std::any::TypeId::of::<T>())
    }

    /// Drop every stored value.
    fn clear(&mut self) {
        self.data.clear();
    }
}
835
836impl<T: 'static + Send> WorkerState<T> {
837    /// Initialize state for this worker.
838    ///
839    /// Call once per worker thread during startup.
840    #[inline]
841    pub fn init(value: T) {
842        WORKER_STATE.with(|storage| {
843            storage.borrow_mut().insert(value);
844        });
845        WORKER_STATE_STATS.record_init();
846    }
847
848    /// Access state immutably.
849    ///
850    /// # Panics
851    ///
852    /// Panics if state was not initialized. Use `try_with` for non-panicking.
853    #[inline]
854    pub fn with<F, R>(f: F) -> R
855    where
856        F: FnOnce(&T) -> R,
857    {
858        WORKER_STATE.with(|storage| {
859            let storage_ref = storage.borrow();
860            let state = storage_ref
861                .get::<T>()
862                .expect("WorkerState not initialized for this type");
863            WORKER_STATE_STATS.record_access();
864            f(state)
865        })
866    }
867
868    /// Access state mutably.
869    ///
870    /// # Panics
871    ///
872    /// Panics if state was not initialized.
873    #[inline]
874    pub fn with_mut<F, R>(f: F) -> R
875    where
876        F: FnOnce(&mut T) -> R,
877    {
878        WORKER_STATE.with(|storage| {
879            let mut storage_ref = storage.borrow_mut();
880            let state = storage_ref
881                .get_mut::<T>()
882                .expect("WorkerState not initialized for this type");
883            WORKER_STATE_STATS.record_access();
884            f(state)
885        })
886    }
887
888    /// Try to access state immutably.
889    ///
890    /// Returns `None` if state was not initialized.
891    #[inline]
892    pub fn try_with<F, R>(f: F) -> Option<R>
893    where
894        F: FnOnce(&T) -> R,
895    {
896        WORKER_STATE.with(|storage| {
897            let storage_ref = storage.borrow();
898            storage_ref.get::<T>().map(|state| {
899                WORKER_STATE_STATS.record_access();
900                f(state)
901            })
902        })
903    }
904
905    /// Try to access state mutably.
906    ///
907    /// Returns `None` if state was not initialized.
908    #[inline]
909    pub fn try_with_mut<F, R>(f: F) -> Option<R>
910    where
911        F: FnOnce(&mut T) -> R,
912    {
913        WORKER_STATE.with(|storage| {
914            let mut storage_ref = storage.borrow_mut();
915            storage_ref.get_mut::<T>().map(|state| {
916                WORKER_STATE_STATS.record_access();
917                f(state)
918            })
919        })
920    }
921
922    /// Check if state is initialized.
923    #[inline]
924    pub fn is_initialized() -> bool {
925        WORKER_STATE.with(|storage| storage.borrow().contains::<T>())
926    }
927
928    /// Remove state and return it.
929    #[inline]
930    pub fn take() -> Option<T> {
931        WORKER_STATE.with(|storage| storage.borrow_mut().remove::<T>())
932    }
933
934    /// Replace state with a new value, returning the old one.
935    #[inline]
936    pub fn replace(value: T) -> Option<T> {
937        let old = Self::take();
938        Self::init(value);
939        old
940    }
941}
942
943/// Initialize per-worker state.
944///
945/// Convenience function equivalent to `WorkerState::<T>::init(value)`.
946#[inline]
947pub fn init_worker_state<T: 'static + Send>(value: T) {
948    WorkerState::<T>::init(value);
949}
950
951/// Clear all per-worker state.
952///
953/// Use for testing or worker cleanup.
954pub fn clear_worker_state() {
955    WORKER_STATE.with(|storage| {
956        storage.borrow_mut().clear();
957    });
958}
959
960// ============================================================================
961// Worker State Statistics
962// ============================================================================
963
/// Counters describing how per-worker state is being used.
///
/// Both counters are atomics updated with relaxed ordering; they are meant
/// for monitoring only.
#[derive(Debug, Default)]
pub struct WorkerStateStats {
    /// How many times state was initialized
    inits: AtomicU64,
    /// How many times state was accessed
    accesses: AtomicU64,
}

impl WorkerStateStats {
    /// Create a fresh set of counters, all at zero.
    pub fn new() -> Self {
        Self::default()
    }

    /// Count one state initialization.
    #[inline]
    fn record_init(&self) {
        let _ = self.inits.fetch_add(1, Ordering::Relaxed);
    }

    /// Count one state access.
    #[inline]
    fn record_access(&self) {
        let _ = self.accesses.fetch_add(1, Ordering::Relaxed);
    }

    /// Number of initializations recorded so far.
    pub fn inits(&self) -> u64 {
        AtomicU64::load(&self.inits, Ordering::Relaxed)
    }

    /// Number of accesses recorded so far.
    pub fn accesses(&self) -> u64 {
        AtomicU64::load(&self.accesses, Ordering::Relaxed)
    }
}

/// Process-wide counters shared by all workers.
static WORKER_STATE_STATS: WorkerStateStats = WorkerStateStats {
    accesses: AtomicU64::new(0),
    inits: AtomicU64::new(0),
};

/// Borrow the process-wide worker state statistics.
pub fn worker_state_stats() -> &'static WorkerStateStats {
    &WORKER_STATE_STATS
}
1010
1011// ============================================================================
1012// Cloneable State Factory
1013// ============================================================================
1014
1015/// Factory for creating per-worker clones of shared state.
1016///
1017/// This is useful for state that needs to be cloned once per worker
1018/// rather than once per request.
1019///
1020/// ## Example
1021///
1022/// ```rust,ignore
1023/// let pool = DatabasePool::new("postgres://...");
1024/// let factory = StateFactory::new(pool);
1025///
1026/// // In worker initialization
1027/// factory.init_for_worker(); // Clones pool once per worker
1028///
1029/// // In request handler - no clone needed
1030/// WorkerState::<DatabasePool>::with(|pool| {
1031///     pool.get_connection()
1032/// });
1033/// ```
1034pub struct StateFactory<T: Clone + Send + 'static> {
1035    /// Shared state to clone from
1036    state: Arc<T>,
1037}
1038
1039impl<T: Clone + Send + 'static> StateFactory<T> {
1040    /// Create a new state factory.
1041    pub fn new(state: T) -> Self {
1042        Self {
1043            state: Arc::new(state),
1044        }
1045    }
1046
1047    /// Create from an existing Arc.
1048    pub fn from_arc(state: Arc<T>) -> Self {
1049        Self { state }
1050    }
1051
1052    /// Initialize state for the current worker.
1053    ///
1054    /// This clones the shared state once per worker.
1055    pub fn init_for_worker(&self) {
1056        let cloned = (*self.state).clone();
1057        WorkerState::<T>::init(cloned);
1058    }
1059
1060    /// Get a reference to the shared state.
1061    pub fn shared(&self) -> &T {
1062        &self.state
1063    }
1064
1065    /// Get the Arc to the shared state.
1066    pub fn arc(&self) -> Arc<T> {
1067        Arc::clone(&self.state)
1068    }
1069}
1070
1071impl<T: Clone + Send + 'static> Clone for StateFactory<T> {
1072    fn clone(&self) -> Self {
1073        Self {
1074            state: Arc::clone(&self.state),
1075        }
1076    }
1077}
1078
1079// ============================================================================
1080// Worker-Local Cache
1081// ============================================================================
1082
/// A simple per-worker key/value cache with hit/miss accounting.
///
/// The cache never holds more than `max_entries` entries: inserting a new
/// key while full first evicts one existing entry. Lookups through
/// [`WorkerCache::get`] and [`WorkerCache::get_mut`] update the hit/miss
/// counters, which is why both take `&mut self`.
#[derive(Debug)]
pub struct WorkerCache<K, V>
where
    K: std::hash::Hash + Eq + Clone,
{
    /// Cache entries
    data: std::collections::HashMap<K, V>,
    /// Maximum number of entries kept at any time
    max_entries: usize,
    /// Number of lookups that found their key
    hits: u64,
    /// Number of lookups that did not find their key
    misses: u64,
}

impl<K, V> WorkerCache<K, V>
where
    K: std::hash::Hash + Eq + Clone,
{
    /// Create an empty cache that holds at most `max_entries` entries.
    ///
    /// Storage for `max_entries` entries is reserved up front. A capacity of
    /// `0` yields a cache that never stores anything.
    pub fn new(max_entries: usize) -> Self {
        Self {
            data: std::collections::HashMap::with_capacity(max_entries),
            max_entries,
            hits: 0,
            misses: 0,
        }
    }

    /// Look up `key`, recording a hit or a miss.
    ///
    /// Returns a shared reference to the cached value, or `None` if the key
    /// is absent.
    pub fn get(&mut self, key: &K) -> Option<&V> {
        // Single map lookup; the counters live in separate fields, so they
        // can be updated while the value borrow is still alive.
        match self.data.get(key) {
            Some(value) => {
                self.hits += 1;
                Some(value)
            }
            None => {
                self.misses += 1;
                None
            }
        }
    }

    /// Look up `key` for mutation, recording a hit or a miss.
    ///
    /// Returns a mutable reference to the cached value, or `None` if the key
    /// is absent.
    pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
        match self.data.get_mut(key) {
            Some(value) => {
                self.hits += 1;
                Some(value)
            }
            None => {
                self.misses += 1;
                None
            }
        }
    }

    /// Insert `value` under `key`, returning the value it replaced (if any).
    ///
    /// If the cache is full and `key` is not already present, one existing
    /// entry is evicted first. The victim is whichever key the map's
    /// iterator yields first, i.e. an arbitrary entry (not LRU, not FIFO).
    /// Overwriting an existing key never evicts.
    ///
    /// A cache created with `max_entries == 0` stores nothing: `value` is
    /// dropped and `None` is returned.
    pub fn insert(&mut self, key: K, value: V) -> Option<V> {
        // With zero capacity there is nothing to evict, so falling through
        // would let the map grow past its bound.
        if self.max_entries == 0 {
            return None;
        }

        if self.data.len() >= self.max_entries && !self.data.contains_key(&key) {
            // The key is cloned so the iterator borrow ends before `remove`.
            if let Some(victim) = self.data.keys().next().cloned() {
                self.data.remove(&victim);
            }
        }
        self.data.insert(key, value)
    }

    /// Remove `key` from the cache, returning its value if it was present.
    ///
    /// Does not affect the hit/miss counters.
    pub fn remove(&mut self, key: &K) -> Option<V> {
        self.data.remove(key)
    }

    /// Check whether `key` is cached without touching the hit/miss counters.
    pub fn contains(&self, key: &K) -> bool {
        self.data.contains_key(key)
    }

    /// Remove every entry and reset the hit/miss counters to zero.
    pub fn clear(&mut self) {
        self.data.clear();
        self.hits = 0;
        self.misses = 0;
    }

    /// Number of entries currently cached.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Whether the cache currently holds no entries.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Number of lookups that found their key since creation or the last
    /// [`WorkerCache::clear`].
    pub fn hits(&self) -> u64 {
        self.hits
    }

    /// Number of lookups that missed since creation or the last
    /// [`WorkerCache::clear`].
    pub fn misses(&self) -> u64 {
        self.misses
    }

    /// Hit rate as a percentage in the range `0.0..=100.0`.
    ///
    /// Returns `0.0` when no lookups have been recorded.
    pub fn hit_ratio(&self) -> f64 {
        let total = self.hits + self.misses;
        if total > 0 {
            (self.hits as f64 / total as f64) * 100.0
        } else {
            0.0
        }
    }
}

impl<K, V> Default for WorkerCache<K, V>
where
    K: std::hash::Hash + Eq + Clone,
{
    /// Create a cache with a capacity of 1000 entries.
    fn default() -> Self {
        Self::new(1000)
    }
}
1206
1207// ============================================================================
1208// Macros for ergonomic usage
1209// ============================================================================
1210
/// Run a block with the current worker's router bound to a name.
///
/// Expands to `$crate::worker::WorkerRouter::with(|$router| $body)`:
/// `$router` becomes the closure parameter and `$body` the closure body.
/// The macro itself performs no initialization; it only delegates to
/// `WorkerRouter::with`.
///
/// NOTE(review): presumably the worker router must already have been set up
/// on the current thread (e.g. via `init_worker_router`) before this is
/// used — confirm against `WorkerRouter::with`.
///
/// # Example
///
/// NOTE(review): `$body` becomes the body of a plain (non-`async`) closure,
/// so the `.await` below may not compile as written — confirm.
///
/// ```rust,ignore
/// with_worker_router!(router, {
///     router.route(request).await
/// });
/// ```
#[macro_export]
macro_rules! with_worker_router {
    ($router:ident, $body:block) => {{ $crate::worker::WorkerRouter::with(|$router| $body) }};
}
1226
1227// ============================================================================
1228// Tests
1229// ============================================================================
1230
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Worker IDs and configuration ----

    /// Two consecutive IDs must be strictly increasing.
    #[test]
    fn test_worker_id_generation() {
        let first = next_worker_id();
        let second = next_worker_id();
        assert!(second > first);
    }

    /// A default config requests zero workers and no CPU affinity.
    #[test]
    fn test_worker_config_default() {
        let defaults = WorkerConfig::default();
        assert_eq!(defaults.num_workers, 0);
        assert!(!defaults.cpu_affinity);
    }

    /// Every builder method must be reflected in the resulting config.
    #[test]
    fn test_worker_config_builder() {
        let built = WorkerConfig::new()
            .workers(4)
            .with_cpu_affinity()
            .name_prefix("test-worker");

        assert_eq!(built.num_workers, 4);
        assert!(built.cpu_affinity);
        assert_eq!(built.name_prefix, "test-worker");
    }

    /// An explicit worker count is used verbatim; an unset one resolves to
    /// at least one worker.
    #[test]
    fn test_effective_workers() {
        let explicit = WorkerConfig::new().workers(8);
        assert_eq!(explicit.effective_workers(), 8);

        let automatic = WorkerConfig::new();
        assert!(automatic.effective_workers() >= 1);
    }

    // ---- CPU affinity ----

    /// Affinity is off by default, with no pinned cores and round-robin mode.
    #[test]
    fn test_affinity_config_default() {
        let defaults = AffinityConfig::default();
        assert!(!defaults.enabled);
        assert!(defaults.cores.is_empty());
        assert_eq!(defaults.mode, AffinityMode::RoundRobin);
    }

    /// Every builder method must be reflected in the resulting config.
    #[test]
    fn test_affinity_config_builder() {
        let built = AffinityConfig::new()
            .enable()
            .cores(vec![0, 2, 4])
            .mode(AffinityMode::Spread);

        assert!(built.enabled);
        assert_eq!(built.cores, vec![0, 2, 4]);
        assert_eq!(built.mode, AffinityMode::Spread);
    }

    /// Round-robin assignment walks the cores in order and wraps around.
    #[test]
    fn test_core_for_worker_round_robin() {
        let affinity = AffinityConfig::new()
            .enable()
            .mode(AffinityMode::RoundRobin);

        let core_count = num_cpus();
        assert_eq!(affinity.core_for_worker(0), 0);
        assert_eq!(affinity.core_for_worker(1), 1 % core_count);
        assert_eq!(affinity.core_for_worker(core_count), 0);
    }

    /// With an explicit core list, workers cycle through exactly that list.
    #[test]
    fn test_core_for_worker_specific_cores() {
        let affinity = AffinityConfig::new().enable().cores(vec![0, 4, 8]);

        assert_eq!(affinity.core_for_worker(0), 0);
        assert_eq!(affinity.core_for_worker(1), 4);
        assert_eq!(affinity.core_for_worker(2), 8);
        // Fourth worker wraps back to the first listed core
        assert_eq!(affinity.core_for_worker(3), 0);
    }

    /// There is always at least one logical CPU.
    #[test]
    fn test_num_cpus() {
        let logical = num_cpus();
        assert!(logical >= 1);
    }

    /// Physical core count is at least one and never exceeds the logical count.
    #[test]
    fn test_num_physical_cpus() {
        let physical_count = num_physical_cpus();
        let logical_count = num_cpus();
        assert!(physical_count >= 1);
        assert!(physical_count <= logical_count);
    }

    /// Smoke test: the support query must not panic, whatever it returns.
    #[test]
    fn test_affinity_supported() {
        let _ = affinity_supported();
    }

    /// Querying the current affinity succeeds and reports at least one core.
    #[test]
    fn test_get_thread_affinity() {
        let queried = get_thread_affinity();
        assert!(queried.is_ok());
        let allowed_cores = queried.unwrap();
        assert!(!allowed_cores.is_empty());
    }

    /// Smoke test: all affinity statistics accessors are callable.
    #[test]
    fn test_affinity_stats() {
        let affinity = affinity_stats();
        let _ = affinity.successful();
        let _ = affinity.failed();
        let _ = affinity.success_rate();
    }

    /// Error messages mention the offending core / the unsupported condition.
    #[test]
    fn test_affinity_error_display() {
        let invalid_core = AffinityError::InvalidCore { core: 100, max: 7 };
        assert!(invalid_core.to_string().contains("100"));

        let unsupported = AffinityError::NotSupported;
        assert!(unsupported.to_string().contains("not supported"));
    }

    // ---- Worker router ----

    /// Without an installed router every fallible accessor reports absence.
    #[test]
    fn test_worker_router_not_initialized() {
        // Start from a known-empty slot
        clear_worker_router();

        assert!(!has_worker_router());
        assert!(WorkerRouter::try_with(|_| ()).is_none());
        assert!(WorkerRouter::clone_arc().is_none());
    }

    // ---- Per-worker state ----

    /// Init, read, mutate, and read again through the typed state slot.
    #[test]
    fn test_worker_state_basic() {
        // Start from a clean slate
        clear_worker_state();

        WorkerState::<u64>::init(42);

        // Shared access sees the initial value
        let initial = WorkerState::<u64>::with(|slot| *slot);
        assert_eq!(initial, 42);

        // Mutable access is visible to subsequent reads
        WorkerState::<u64>::with_mut(|slot| *slot += 1);
        let bumped = WorkerState::<u64>::with(|slot| *slot);
        assert_eq!(bumped, 43);

        // Leave nothing behind
        clear_worker_state();
    }

    /// State slots of different types coexist independently.
    #[test]
    fn test_worker_state_multiple_types() {
        clear_worker_state();

        WorkerState::<u64>::init(100);
        WorkerState::<String>::init("hello".to_string());

        assert_eq!(WorkerState::<u64>::with(|slot| *slot), 100);
        assert_eq!(WorkerState::<String>::with(|slot| slot.clone()), "hello");

        clear_worker_state();
    }

    /// `try_with` yields `None` before init and the closure result after.
    #[test]
    fn test_worker_state_try_with() {
        clear_worker_state();

        // Nothing stored yet
        assert!(WorkerState::<i32>::try_with(|_| ()).is_none());

        // Once stored, the closure runs and its result is returned
        WorkerState::<i32>::init(123);
        assert!(WorkerState::<i32>::try_with(|slot| *slot).is_some());
        assert_eq!(WorkerState::<i32>::try_with(|slot| *slot), Some(123));

        clear_worker_state();
    }

    /// `take` hands back the stored value and leaves the slot uninitialized.
    #[test]
    fn test_worker_state_take() {
        clear_worker_state();

        WorkerState::<String>::init("test".to_string());
        assert!(WorkerState::<String>::is_initialized());

        let removed = WorkerState::<String>::take();
        assert_eq!(removed, Some("test".to_string()));
        assert!(!WorkerState::<String>::is_initialized());

        clear_worker_state();
    }

    /// `replace` returns the previous value and stores the new one.
    #[test]
    fn test_worker_state_replace() {
        clear_worker_state();

        WorkerState::<u32>::init(10);
        let previous = WorkerState::<u32>::replace(20);
        assert_eq!(previous, Some(10));
        assert_eq!(WorkerState::<u32>::with(|slot| *slot), 20);

        clear_worker_state();
    }

    // ---- Worker-local cache ----

    /// Inserted keys are retrievable; unknown keys are not.
    #[test]
    fn test_worker_cache_basic() {
        let mut lookup = WorkerCache::<String, u32>::new(10);

        lookup.insert("key1".to_string(), 100);
        lookup.insert("key2".to_string(), 200);

        assert_eq!(lookup.get(&"key1".to_string()), Some(&100));
        assert_eq!(lookup.get(&"key3".to_string()), None);
        assert_eq!(lookup.len(), 2);
    }

    /// Inserting past capacity evicts one entry and keeps the new key.
    #[test]
    fn test_worker_cache_eviction() {
        let mut bounded = WorkerCache::<u32, u32>::new(3);

        bounded.insert(1, 100);
        bounded.insert(2, 200);
        bounded.insert(3, 300);
        assert_eq!(bounded.len(), 3);

        // One over capacity: size stays put, newest key survives
        bounded.insert(4, 400);
        assert_eq!(bounded.len(), 3);
        assert!(bounded.contains(&4));
    }

    /// Two hits and one miss give a hit ratio of roughly 66.67 percent.
    #[test]
    fn test_worker_cache_hit_ratio() {
        let mut counted = WorkerCache::<u32, u32>::new(10);

        counted.insert(1, 100);
        counted.get(&1); // hit
        counted.get(&1); // hit
        counted.get(&2); // miss

        assert_eq!(counted.hits(), 2);
        assert_eq!(counted.misses(), 1);
        assert!((counted.hit_ratio() - 66.67).abs() < 1.0);
    }

    // ---- State factory and statistics ----

    /// The factory installs a clone of its value as this worker's state.
    #[test]
    fn test_state_factory() {
        clear_worker_state();

        let source = StateFactory::new(vec![1, 2, 3]);
        source.init_for_worker();

        WorkerState::<Vec<i32>>::with(|installed| {
            assert_eq!(installed, &vec![1, 2, 3]);
        });

        clear_worker_state();
    }

    /// Smoke test: worker-state statistics accessors are callable.
    #[test]
    fn test_worker_state_stats() {
        let state_stats = worker_state_stats();
        let _ = state_stats.inits();
        let _ = state_stats.accesses();
    }

    /// Installing a router makes it, and a worker ID, available on this thread.
    #[test]
    fn test_worker_router_initialization() {
        let shared_router = Arc::new(Router::new());

        init_worker_router(shared_router);

        assert!(has_worker_router());
        assert!(worker_id().is_some());

        WorkerRouter::with(|installed| {
            assert!(installed.routes.is_empty());
        });

        // Leave the thread-local slot empty again
        clear_worker_router();
    }

    /// The handle keeps its ID and derives its name as `<prefix>-<id>`.
    #[test]
    fn test_worker_handle() {
        let worker = WorkerHandle::new(5, "test-worker");
        assert_eq!(worker.id, 5);
        assert_eq!(worker.name, "test-worker-5");
    }

    /// Smoke test: all router statistics accessors are callable.
    #[test]
    fn test_worker_stats() {
        let router_stats = worker_stats();

        let _ = router_stats.inits();
        let _ = router_stats.accesses();
        let _ = router_stats.clones();
        let _ = router_stats.clone_avoidance_ratio();
    }
}