1use crate::Router;
35use std::cell::RefCell;
36use std::sync::Arc;
37use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
38
/// Process-wide source of worker ids; monotonically increasing, never reused.
static WORKER_ID_COUNTER: AtomicUsize = AtomicUsize::new(0);

/// Allocates the next unused worker id.
///
/// `Relaxed` is sufficient here: callers only need uniqueness of the returned
/// value, not ordering with respect to other memory operations.
#[inline]
pub fn next_worker_id() -> usize {
    WORKER_ID_COUNTER.fetch_add(1, Ordering::Relaxed)
}

/// Number of worker ids handed out so far.
///
/// NOTE(review): this counts ids ever allocated, not currently-live workers —
/// ids are never returned to the counter.
#[inline]
pub fn total_workers() -> usize {
    WORKER_ID_COUNTER.load(Ordering::Relaxed)
}

thread_local! {
    // Per-thread router handle; `None` until `init_worker_router` runs on
    // this thread, and again after `clear_worker_router`.
    static WORKER_ROUTER: RefCell<Option<Arc<Router>>> = const { RefCell::new(None) };

    // Per-thread worker id; assigned once on the first `init_worker_router`
    // call and kept for the life of the thread.
    static WORKER_ID: RefCell<Option<usize>> = const { RefCell::new(None) };
}
69
70#[inline]
83pub fn init_worker_router(router: Arc<Router>) {
84 WORKER_ROUTER.with(|r| {
85 *r.borrow_mut() = Some(router);
86 });
87 WORKER_ID.with(|id| {
88 if id.borrow().is_none() {
89 *id.borrow_mut() = Some(next_worker_id());
90 }
91 });
92 WORKER_STATS.record_init();
93}
94
95#[inline]
97pub fn clear_worker_router() {
98 WORKER_ROUTER.with(|r| {
99 *r.borrow_mut() = None;
100 });
101}
102
103#[inline]
105pub fn worker_id() -> Option<usize> {
106 WORKER_ID.with(|id| *id.borrow())
107}
108
109#[inline]
111pub fn has_worker_router() -> bool {
112 WORKER_ROUTER.with(|r| r.borrow().is_some())
113}
114
115pub struct WorkerRouter;
123
124impl WorkerRouter {
125 #[inline]
143 pub fn with<F, R>(f: F) -> R
144 where
145 F: FnOnce(&Router) -> R,
146 {
147 WORKER_ROUTER.with(|r| {
148 let router_ref = r.borrow();
149 let router = router_ref
150 .as_ref()
151 .expect("WorkerRouter not initialized. Call init_worker_router first.");
152 WORKER_STATS.record_access();
153 f(router)
154 })
155 }
156
157 #[inline]
161 pub fn try_with<F, R>(f: F) -> Option<R>
162 where
163 F: FnOnce(&Router) -> R,
164 {
165 WORKER_ROUTER.with(|r| {
166 let router_ref = r.borrow();
167 router_ref.as_ref().map(|router| {
168 WORKER_STATS.record_access();
169 f(router)
170 })
171 })
172 }
173
174 #[inline]
180 pub fn clone_arc() -> Option<Arc<Router>> {
181 WORKER_ROUTER.with(|r| {
182 let router_ref = r.borrow();
183 router_ref.as_ref().map(|router| {
184 WORKER_STATS.record_clone();
185 Arc::clone(router)
186 })
187 })
188 }
189
190 #[inline]
192 pub fn clone_arc_or_panic() -> Arc<Router> {
193 Self::clone_arc().expect("WorkerRouter not initialized")
194 }
195}
196
/// Settings for spawning a pool of worker threads.
#[derive(Debug, Clone)]
pub struct WorkerConfig {
    /// Desired worker count; `0` means auto-detect from the CPU count.
    pub num_workers: usize,
    /// Whether worker threads should be pinned to CPU cores.
    pub cpu_affinity: bool,
    /// Stack size per worker thread in bytes; `None` uses the platform default.
    pub stack_size: Option<usize>,
    /// Prefix for worker thread names.
    pub name_prefix: String,
}

impl Default for WorkerConfig {
    /// Auto-sized pool, no pinning, default stacks, "armature-worker" names.
    fn default() -> Self {
        Self {
            num_workers: 0,
            cpu_affinity: false,
            stack_size: None,
            name_prefix: "armature-worker".to_string(),
        }
    }
}

impl WorkerConfig {
    /// Same as [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets an explicit worker count (`0` restores auto-detection).
    #[inline]
    pub fn workers(mut self, n: usize) -> Self {
        self.num_workers = n;
        self
    }

    /// Enables CPU pinning for workers.
    #[inline]
    pub fn with_cpu_affinity(mut self) -> Self {
        self.cpu_affinity = true;
        self
    }

    /// Sets the per-worker stack size in bytes.
    #[inline]
    pub fn stack_size(mut self, size: usize) -> Self {
        self.stack_size = Some(size);
        self
    }

    /// Sets the thread-name prefix.
    #[inline]
    pub fn name_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.name_prefix = prefix.into();
        self
    }

    /// Resolves the actual worker count: the explicit value, or the number of
    /// logical CPUs (minimum 1) when `num_workers` is 0.
    #[inline]
    pub fn effective_workers(&self) -> usize {
        match self.num_workers {
            0 => std::thread::available_parallelism().map_or(1, |n| n.get()),
            n => n,
        }
    }
}
278
/// Controls how workers are pinned to CPU cores.
#[derive(Debug, Clone)]
pub struct AffinityConfig {
    /// Master switch; when false no pinning is attempted.
    pub enabled: bool,
    /// Explicit core list; empty means "derive cores from `mode`".
    pub cores: Vec<usize>,
    /// Placement strategy used when `cores` is empty.
    pub mode: AffinityMode,
}

impl Default for AffinityConfig {
    /// Disabled, no explicit cores, round-robin placement.
    fn default() -> Self {
        Self {
            enabled: false,
            cores: Vec::new(),
            mode: AffinityMode::RoundRobin,
        }
    }
}

impl AffinityConfig {
    /// Same as [`Default::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Turns pinning on.
    #[inline]
    pub fn enable(mut self) -> Self {
        self.enabled = true;
        self
    }

    /// Turns pinning off.
    #[inline]
    pub fn disable(mut self) -> Self {
        self.enabled = false;
        self
    }

    /// Restricts placement to an explicit list of cores.
    #[inline]
    pub fn cores(mut self, cores: Vec<usize>) -> Self {
        self.cores = cores;
        self
    }

    /// Selects the placement strategy for the automatic (empty-list) case.
    #[inline]
    pub fn mode(mut self, mode: AffinityMode) -> Self {
        self.mode = mode;
        self
    }

    /// Picks the core for `worker_id`.
    ///
    /// With an explicit core list, workers cycle through it; otherwise the
    /// core is derived from `mode` and the machine's logical core count.
    #[inline]
    pub fn core_for_worker(&self, worker_id: usize) -> usize {
        // `.max(1)` keeps the modulus nonzero; with an empty list `get(0)`
        // yields `None` and we fall through to the mode-based placement.
        if let Some(&core) = self.cores.get(worker_id % self.cores.len().max(1)) {
            return core;
        }

        let num_cores = num_cpus();
        match self.mode {
            // Cycle 0, 1, 2, ... across all cores.
            AffinityMode::RoundRobin => worker_id % num_cores,
            // Fill low cores first; overflow workers share the last core.
            AffinityMode::Packed => worker_id.min(num_cores - 1),
            // Stride by half the core count to space workers apart.
            AffinityMode::Spread => (worker_id * (num_cores / 2).max(1)) % num_cores,
        }
    }
}

/// Strategy for mapping worker ids onto cores when no explicit list is given.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AffinityMode {
    /// Worker `i` goes to core `i % num_cores`.
    RoundRobin,
    /// Workers fill cores from 0 upward, saturating at the last core.
    Packed,
    /// Workers stride across cores to maximize spacing.
    Spread,
}

/// Logical CPU count, falling back to 1 when it cannot be queried.
#[inline]
pub fn num_cpus() -> usize {
    std::thread::available_parallelism().map_or(1, |n| n.get())
}

/// Rough physical-core estimate.
///
/// Heuristic only: assumes 2-way SMT whenever the logical count is even and
/// greater than 4. NOTE(review): this misreports machines without SMT or
/// with asymmetric core layouts.
#[inline]
pub fn num_physical_cpus() -> usize {
    let logical = num_cpus();
    if logical > 4 && logical % 2 == 0 {
        logical / 2
    } else {
        logical
    }
}
394
/// Pins the calling thread to `core`.
///
/// On Linux this issues `sched_setaffinity`; on every other platform it is a
/// no-op that reports success. NOTE(review): callers cannot distinguish
/// "pinned" from "unsupported" — `AffinityError::NotSupported` is never
/// returned here; confirm that is intended.
///
/// # Errors
/// On Linux, returns an error when `core` is out of range or the syscall
/// fails.
#[inline]
pub fn set_thread_affinity(core: usize) -> Result<(), AffinityError> {
    #[cfg(target_os = "linux")]
    {
        set_thread_affinity_linux(core)
    }

    #[cfg(not(target_os = "linux"))]
    {
        // Silence the unused-parameter warning on non-Linux targets.
        let _ = core;
        Ok(())
    }
}
424
425#[cfg(target_os = "linux")]
427fn set_thread_affinity_linux(core: usize) -> Result<(), AffinityError> {
428 use std::mem;
429
430 let num_cores = num_cpus();
432 if core >= num_cores {
433 return Err(AffinityError::InvalidCore {
434 core,
435 max: num_cores - 1,
436 });
437 }
438
439 let mut mask: u64 = 0;
442 mask |= 1u64 << core;
443
444 unsafe {
446 let result = libc::sched_setaffinity(
447 0, mem::size_of::<u64>(),
449 &mask as *const u64 as *const libc::cpu_set_t,
450 );
451
452 if result == 0 {
453 AFFINITY_STATS.record_set(true);
454 Ok(())
455 } else {
456 AFFINITY_STATS.record_set(false);
457 Err(AffinityError::SystemError {
458 errno: *libc::__errno_location(),
459 })
460 }
461 }
462}
463
/// Failure modes when changing or querying thread affinity.
#[derive(Debug, Clone)]
pub enum AffinityError {
    /// The requested core index exceeds what the machine offers.
    InvalidCore { core: usize, max: usize },
    /// The underlying syscall failed; `errno` holds the raw OS error code.
    SystemError { errno: i32 },
    /// The current platform has no affinity support.
    NotSupported,
}

impl std::fmt::Display for AffinityError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidCore { core, max } => {
                write!(f, "Invalid core {core}, max is {max}")
            }
            Self::SystemError { errno } => write!(f, "System error: errno {errno}"),
            Self::NotSupported => {
                f.write_str("CPU affinity not supported on this platform")
            }
        }
    }
}

impl std::error::Error for AffinityError {}
490
/// Returns the set of cores the calling thread is allowed to run on.
///
/// On non-Linux platforms the affinity mask cannot be queried, so this falls
/// back to reporting every logical core.
///
/// # Errors
/// On Linux, returns an error when the underlying syscall fails.
#[inline]
pub fn get_thread_affinity() -> Result<Vec<usize>, AffinityError> {
    #[cfg(target_os = "linux")]
    {
        get_thread_affinity_linux()
    }

    #[cfg(not(target_os = "linux"))]
    {
        Ok((0..num_cpus()).collect())
    }
}
507
508#[cfg(target_os = "linux")]
510fn get_thread_affinity_linux() -> Result<Vec<usize>, AffinityError> {
511 use std::mem;
512
513 let mut mask: u64 = 0;
514
515 unsafe {
517 let result = libc::sched_getaffinity(
518 0,
519 mem::size_of::<u64>(),
520 &mut mask as *mut u64 as *mut libc::cpu_set_t,
521 );
522
523 if result == 0 {
524 let mut cores = Vec::new();
525 for i in 0..64 {
526 if (mask & (1u64 << i)) != 0 {
527 cores.push(i);
528 }
529 }
530 Ok(cores)
531 } else {
532 Err(AffinityError::SystemError {
533 errno: *libc::__errno_location(),
534 })
535 }
536 }
537}
538
/// True when this build can actually change thread affinity (Linux only).
#[inline]
pub fn affinity_supported() -> bool {
    cfg!(target_os = "linux")
}

/// Convenience initializer: optionally pins the calling thread, then installs
/// the thread-local router.
///
/// NOTE(review): if pinning fails the router is NOT installed — the error
/// returns before `init_worker_router` runs. Confirm callers expect that.
///
/// # Errors
/// Propagates any failure from `set_thread_affinity`.
#[inline]
pub fn init_worker_with_affinity(
    worker_id: usize,
    config: &AffinityConfig,
    router: Arc<Router>,
) -> Result<(), AffinityError> {
    if config.enabled && affinity_supported() {
        let core = config.core_for_worker(worker_id);
        set_thread_affinity(core)?;
    }

    init_worker_router(router);

    Ok(())
}
578
/// Counters tracking the outcome of affinity pin attempts.
#[derive(Debug, Default)]
pub struct AffinityStats {
    /// Successful pin attempts.
    successful: AtomicU64,
    /// Failed pin attempts.
    failed: AtomicU64,
}

impl AffinityStats {
    /// Zeroed counters.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one pin attempt outcome.
    #[inline]
    fn record_set(&self, success: bool) {
        let counter = if success {
            &self.successful
        } else {
            &self.failed
        };
        counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Total successful pin attempts.
    pub fn successful(&self) -> u64 {
        self.successful.load(Ordering::Relaxed)
    }

    /// Total failed pin attempts.
    pub fn failed(&self) -> u64 {
        self.failed.load(Ordering::Relaxed)
    }

    /// Success percentage, or 0.0 before any attempt was recorded.
    pub fn success_rate(&self) -> f64 {
        match self.successful() + self.failed() {
            0 => 0.0,
            total => (self.successful() as f64 / total as f64) * 100.0,
        }
    }
}

/// Process-wide counters shared by every thread.
static AFFINITY_STATS: AffinityStats = AffinityStats {
    successful: AtomicU64::new(0),
    failed: AtomicU64::new(0),
};

/// Read-only handle to the process-wide affinity counters.
pub fn affinity_stats() -> &'static AffinityStats {
    &AFFINITY_STATS
}
638
/// Counters describing how the thread-local router is used.
#[derive(Debug, Default)]
pub struct WorkerStats {
    /// Router initializations.
    inits: AtomicU64,
    /// Borrow-based router accesses (no `Arc` clone).
    accesses: AtomicU64,
    /// `Arc` clones handed out.
    clones: AtomicU64,
}

impl WorkerStats {
    /// Zeroed counters.
    pub fn new() -> Self {
        Self::default()
    }

    #[inline]
    fn record_init(&self) {
        self.inits.fetch_add(1, Ordering::Relaxed);
    }

    #[inline]
    fn record_access(&self) {
        self.accesses.fetch_add(1, Ordering::Relaxed);
    }

    #[inline]
    fn record_clone(&self) {
        self.clones.fetch_add(1, Ordering::Relaxed);
    }

    /// Total router initializations across all threads.
    pub fn inits(&self) -> u64 {
        self.inits.load(Ordering::Relaxed)
    }

    /// Total borrow-based accesses across all threads.
    pub fn accesses(&self) -> u64 {
        self.accesses.load(Ordering::Relaxed)
    }

    /// Total `Arc` clones across all threads.
    pub fn clones(&self) -> u64 {
        self.clones.load(Ordering::Relaxed)
    }

    /// Percentage of accesses that did not require an `Arc` clone, or 0.0
    /// before anything was accessed.
    pub fn clone_avoidance_ratio(&self) -> f64 {
        let accesses = self.accesses() as f64;
        let clones = self.clones() as f64;
        if accesses > 0.0 {
            ((accesses - clones) / accesses) * 100.0
        } else {
            0.0
        }
    }
}

/// Process-wide counters shared by every thread.
static WORKER_STATS: WorkerStats = WorkerStats {
    inits: AtomicU64::new(0),
    accesses: AtomicU64::new(0),
    clones: AtomicU64::new(0),
};

/// Read-only handle to the process-wide worker counters.
pub fn worker_stats() -> &'static WorkerStats {
    &WORKER_STATS
}
715
/// Identity of a spawned worker thread.
#[derive(Debug, Clone)]
pub struct WorkerHandle {
    /// Unique worker id.
    pub id: usize,
    /// Thread name, formatted as `"<prefix>-<id>"`.
    pub name: String,
}

impl WorkerHandle {
    /// Builds a handle whose name is `"{name_prefix}-{id}"`.
    pub fn new(id: usize, name_prefix: &str) -> Self {
        let name = format!("{name_prefix}-{id}");
        Self { id, name }
    }
}
738
/// Typed accessor for per-worker (thread-local) state of type `T`.
///
/// Zero-sized; all storage lives in the thread-local `WORKER_STATE` map.
pub struct WorkerState<T: 'static> {
    _marker: std::marker::PhantomData<T>,
}

thread_local! {
    // One type-keyed map of state values per thread.
    static WORKER_STATE: RefCell<WorkerStateStorage> = RefCell::new(WorkerStateStorage::new());
}

/// Type-erased, type-keyed storage backing [`WorkerState`].
#[derive(Default)]
struct WorkerStateStorage {
    data: std::collections::HashMap<std::any::TypeId, Box<dyn std::any::Any + Send>>,
}

impl WorkerStateStorage {
    /// Creates an empty store.
    fn new() -> Self {
        Self::default()
    }

    /// Stores `value`, replacing any existing value of the same type.
    fn insert<T: 'static + Send>(&mut self, value: T) {
        self.data
            .insert(std::any::TypeId::of::<T>(), Box::new(value));
    }

    /// Borrows the stored `T`, if present.
    fn get<T: 'static>(&self) -> Option<&T> {
        self.data
            .get(&std::any::TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }

    /// Mutably borrows the stored `T`, if present.
    fn get_mut<T: 'static>(&mut self) -> Option<&mut T> {
        self.data
            .get_mut(&std::any::TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_mut::<T>())
    }

    /// Removes and returns the stored `T`, if present.
    fn remove<T: 'static>(&mut self) -> Option<T> {
        let boxed = self.data.remove(&std::any::TypeId::of::<T>())?;
        boxed.downcast::<T>().ok().map(|value| *value)
    }

    /// True when a `T` is currently stored.
    fn contains<T: 'static>(&self) -> bool {
        self.data.contains_key(&std::any::TypeId::of::<T>())
    }

    /// Drops every stored value.
    fn clear(&mut self) {
        self.data.clear();
    }
}
835
836impl<T: 'static + Send> WorkerState<T> {
837 #[inline]
841 pub fn init(value: T) {
842 WORKER_STATE.with(|storage| {
843 storage.borrow_mut().insert(value);
844 });
845 WORKER_STATE_STATS.record_init();
846 }
847
848 #[inline]
854 pub fn with<F, R>(f: F) -> R
855 where
856 F: FnOnce(&T) -> R,
857 {
858 WORKER_STATE.with(|storage| {
859 let storage_ref = storage.borrow();
860 let state = storage_ref
861 .get::<T>()
862 .expect("WorkerState not initialized for this type");
863 WORKER_STATE_STATS.record_access();
864 f(state)
865 })
866 }
867
868 #[inline]
874 pub fn with_mut<F, R>(f: F) -> R
875 where
876 F: FnOnce(&mut T) -> R,
877 {
878 WORKER_STATE.with(|storage| {
879 let mut storage_ref = storage.borrow_mut();
880 let state = storage_ref
881 .get_mut::<T>()
882 .expect("WorkerState not initialized for this type");
883 WORKER_STATE_STATS.record_access();
884 f(state)
885 })
886 }
887
888 #[inline]
892 pub fn try_with<F, R>(f: F) -> Option<R>
893 where
894 F: FnOnce(&T) -> R,
895 {
896 WORKER_STATE.with(|storage| {
897 let storage_ref = storage.borrow();
898 storage_ref.get::<T>().map(|state| {
899 WORKER_STATE_STATS.record_access();
900 f(state)
901 })
902 })
903 }
904
905 #[inline]
909 pub fn try_with_mut<F, R>(f: F) -> Option<R>
910 where
911 F: FnOnce(&mut T) -> R,
912 {
913 WORKER_STATE.with(|storage| {
914 let mut storage_ref = storage.borrow_mut();
915 storage_ref.get_mut::<T>().map(|state| {
916 WORKER_STATE_STATS.record_access();
917 f(state)
918 })
919 })
920 }
921
922 #[inline]
924 pub fn is_initialized() -> bool {
925 WORKER_STATE.with(|storage| storage.borrow().contains::<T>())
926 }
927
928 #[inline]
930 pub fn take() -> Option<T> {
931 WORKER_STATE.with(|storage| storage.borrow_mut().remove::<T>())
932 }
933
934 #[inline]
936 pub fn replace(value: T) -> Option<T> {
937 let old = Self::take();
938 Self::init(value);
939 old
940 }
941}
942
/// Free-function alias for [`WorkerState::init`].
#[inline]
pub fn init_worker_state<T: 'static + Send>(value: T) {
    WorkerState::<T>::init(value);
}

/// Drops every typed state value stored for the current thread.
pub fn clear_worker_state() {
    WORKER_STATE.with(|storage| {
        storage.borrow_mut().clear();
    });
}
959
/// Counters for typed per-worker state usage.
#[derive(Debug, Default)]
pub struct WorkerStateStats {
    /// State initializations.
    inits: AtomicU64,
    /// Successful state accesses.
    accesses: AtomicU64,
}

impl WorkerStateStats {
    /// Zeroed counters.
    pub fn new() -> Self {
        Self::default()
    }

    #[inline]
    fn record_init(&self) {
        self.inits.fetch_add(1, Ordering::Relaxed);
    }

    #[inline]
    fn record_access(&self) {
        self.accesses.fetch_add(1, Ordering::Relaxed);
    }

    /// Total state initializations across all threads.
    pub fn inits(&self) -> u64 {
        self.inits.load(Ordering::Relaxed)
    }

    /// Total state accesses across all threads.
    pub fn accesses(&self) -> u64 {
        self.accesses.load(Ordering::Relaxed)
    }
}

/// Process-wide counters shared by every thread.
static WORKER_STATE_STATS: WorkerStateStats = WorkerStateStats {
    inits: AtomicU64::new(0),
    accesses: AtomicU64::new(0),
};

/// Read-only handle to the process-wide state counters.
pub fn worker_state_stats() -> &'static WorkerStateStats {
    &WORKER_STATE_STATS
}
1010
1011pub struct StateFactory<T: Clone + Send + 'static> {
1035 state: Arc<T>,
1037}
1038
1039impl<T: Clone + Send + 'static> StateFactory<T> {
1040 pub fn new(state: T) -> Self {
1042 Self {
1043 state: Arc::new(state),
1044 }
1045 }
1046
1047 pub fn from_arc(state: Arc<T>) -> Self {
1049 Self { state }
1050 }
1051
1052 pub fn init_for_worker(&self) {
1056 let cloned = (*self.state).clone();
1057 WorkerState::<T>::init(cloned);
1058 }
1059
1060 pub fn shared(&self) -> &T {
1062 &self.state
1063 }
1064
1065 pub fn arc(&self) -> Arc<T> {
1067 Arc::clone(&self.state)
1068 }
1069}
1070
1071impl<T: Clone + Send + 'static> Clone for StateFactory<T> {
1072 fn clone(&self) -> Self {
1073 Self {
1074 state: Arc::clone(&self.state),
1075 }
1076 }
1077}
1078
/// Bounded map with hit/miss accounting, intended for per-worker use.
///
/// Eviction removes an arbitrary entry (hash-map iteration order), not LRU.
/// NOTE(review): a `max_entries` of 0 still admits one entry per insert.
#[derive(Debug)]
pub struct WorkerCache<K, V>
where
    K: std::hash::Hash + Eq + Clone,
{
    /// Backing storage.
    data: std::collections::HashMap<K, V>,
    /// Soft capacity; inserting a new key at capacity evicts one entry first.
    max_entries: usize,
    /// Successful lookups via `get`/`get_mut`.
    hits: u64,
    /// Failed lookups via `get`/`get_mut`.
    misses: u64,
}

impl<K, V> WorkerCache<K, V>
where
    K: std::hash::Hash + Eq + Clone,
{
    /// Creates a cache that holds at most `max_entries` items.
    pub fn new(max_entries: usize) -> Self {
        Self {
            data: std::collections::HashMap::with_capacity(max_entries),
            max_entries,
            hits: 0,
            misses: 0,
        }
    }

    /// Looks up `key`, counting the outcome as a hit or miss.
    ///
    /// Single hash lookup (the previous `contains_key` + `get` pair hashed
    /// the key twice); hit/miss accounting is unchanged.
    pub fn get(&mut self, key: &K) -> Option<&V> {
        match self.data.get(key) {
            Some(value) => {
                self.hits += 1;
                Some(value)
            }
            None => {
                self.misses += 1;
                None
            }
        }
    }

    /// Mutable lookup of `key`, counting the outcome as a hit or miss.
    pub fn get_mut(&mut self, key: &K) -> Option<&mut V> {
        match self.data.get_mut(key) {
            Some(value) => {
                self.hits += 1;
                Some(value)
            }
            None => {
                self.misses += 1;
                None
            }
        }
    }

    /// Inserts `key -> value`, evicting one arbitrary entry first when the
    /// cache is full and `key` is not already present. Returns the value
    /// previously stored under `key`, if any.
    pub fn insert(&mut self, key: K, value: V) -> Option<V> {
        let is_new = !self.data.contains_key(&key);
        if is_new && self.data.len() >= self.max_entries {
            // Evict whichever entry the map yields first; order is arbitrary.
            if let Some(victim) = self.data.keys().next().cloned() {
                self.data.remove(&victim);
            }
        }
        self.data.insert(key, value)
    }

    /// Removes `key`, returning its value if present.
    pub fn remove(&mut self, key: &K) -> Option<V> {
        self.data.remove(key)
    }

    /// True if `key` is cached; does not affect the hit/miss counters.
    pub fn contains(&self, key: &K) -> bool {
        self.data.contains_key(key)
    }

    /// Drops all entries and resets the hit/miss counters.
    pub fn clear(&mut self) {
        self.data.clear();
        self.hits = 0;
        self.misses = 0;
    }

    /// Number of cached entries.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// True when the cache holds no entries.
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }

    /// Lookup hits recorded since creation or the last `clear`.
    pub fn hits(&self) -> u64 {
        self.hits
    }

    /// Lookup misses recorded since creation or the last `clear`.
    pub fn misses(&self) -> u64 {
        self.misses
    }

    /// Percentage of lookups that hit, or 0.0 when nothing was looked up.
    pub fn hit_ratio(&self) -> f64 {
        let total = self.hits + self.misses;
        if total > 0 {
            (self.hits as f64 / total as f64) * 100.0
        } else {
            0.0
        }
    }
}

impl<K, V> Default for WorkerCache<K, V>
where
    K: std::hash::Hash + Eq + Clone,
{
    /// Defaults to a 1000-entry cache.
    fn default() -> Self {
        Self::new(1000)
    }
}
1206
/// Binds the thread-local router to `$router` and evaluates `$body`.
///
/// Expands to `WorkerRouter::with`, so it panics when no router has been
/// installed on the current thread.
#[macro_export]
macro_rules! with_worker_router {
    ($router:ident, $body:block) => {{ $crate::worker::WorkerRouter::with(|$router| $body) }};
}
1226
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_worker_id_generation() {
        // The global counter only moves forward.
        let first = next_worker_id();
        let second = next_worker_id();
        assert!(second > first);
    }

    #[test]
    fn test_worker_config_default() {
        let config = WorkerConfig::default();
        assert_eq!(config.num_workers, 0);
        assert!(!config.cpu_affinity);
    }

    #[test]
    fn test_worker_config_builder() {
        let config = WorkerConfig::new()
            .workers(4)
            .with_cpu_affinity()
            .name_prefix("test-worker");

        assert_eq!(config.num_workers, 4);
        assert!(config.cpu_affinity);
        assert_eq!(config.name_prefix, "test-worker");
    }

    #[test]
    fn test_effective_workers() {
        // An explicit count wins; zero falls back to the CPU count (>= 1).
        let explicit = WorkerConfig::new().workers(8);
        assert_eq!(explicit.effective_workers(), 8);

        let auto = WorkerConfig::new();
        assert!(auto.effective_workers() >= 1);
    }

    #[test]
    fn test_affinity_config_default() {
        let config = AffinityConfig::default();
        assert!(!config.enabled);
        assert!(config.cores.is_empty());
        assert_eq!(config.mode, AffinityMode::RoundRobin);
    }

    #[test]
    fn test_affinity_config_builder() {
        let config = AffinityConfig::new()
            .enable()
            .cores(vec![0, 2, 4])
            .mode(AffinityMode::Spread);

        assert!(config.enabled);
        assert_eq!(config.cores, vec![0, 2, 4]);
        assert_eq!(config.mode, AffinityMode::Spread);
    }

    #[test]
    fn test_core_for_worker_round_robin() {
        let config = AffinityConfig::new()
            .enable()
            .mode(AffinityMode::RoundRobin);

        let cores = num_cpus();
        assert_eq!(config.core_for_worker(0), 0);
        assert_eq!(config.core_for_worker(1), 1 % cores);
        // Wraps back to core 0 after a full cycle.
        assert_eq!(config.core_for_worker(cores), 0);
    }

    #[test]
    fn test_core_for_worker_specific_cores() {
        let config = AffinityConfig::new().enable().cores(vec![0, 4, 8]);

        assert_eq!(config.core_for_worker(0), 0);
        assert_eq!(config.core_for_worker(1), 4);
        assert_eq!(config.core_for_worker(2), 8);
        // Cycles back through the explicit list.
        assert_eq!(config.core_for_worker(3), 0);
    }

    #[test]
    fn test_num_cpus() {
        assert!(num_cpus() >= 1);
    }

    #[test]
    fn test_num_physical_cpus() {
        let physical = num_physical_cpus();
        assert!(physical >= 1);
        assert!(physical <= num_cpus());
    }

    #[test]
    fn test_affinity_supported() {
        // Smoke test: callable on every platform.
        let _ = affinity_supported();
    }

    #[test]
    fn test_get_thread_affinity() {
        let result = get_thread_affinity();
        assert!(result.is_ok());
        assert!(!result.unwrap().is_empty());
    }

    #[test]
    fn test_affinity_stats() {
        // Global counters; only check the accessors are callable.
        let stats = affinity_stats();
        let _ = stats.successful();
        let _ = stats.failed();
        let _ = stats.success_rate();
    }

    #[test]
    fn test_affinity_error_display() {
        let invalid = AffinityError::InvalidCore { core: 100, max: 7 };
        assert!(invalid.to_string().contains("100"));

        let unsupported = AffinityError::NotSupported;
        assert!(unsupported.to_string().contains("not supported"));
    }

    #[test]
    fn test_worker_router_not_initialized() {
        clear_worker_router();

        assert!(!has_worker_router());
        assert!(WorkerRouter::try_with(|_| ()).is_none());
        assert!(WorkerRouter::clone_arc().is_none());
    }

    #[test]
    fn test_worker_state_basic() {
        clear_worker_state();

        WorkerState::<u64>::init(42);
        assert_eq!(WorkerState::<u64>::with(|v| *v), 42);

        WorkerState::<u64>::with_mut(|v| *v += 1);
        assert_eq!(WorkerState::<u64>::with(|v| *v), 43);

        clear_worker_state();
    }

    #[test]
    fn test_worker_state_multiple_types() {
        clear_worker_state();

        // Values of distinct types live side by side.
        WorkerState::<u64>::init(100);
        WorkerState::<String>::init("hello".to_string());

        assert_eq!(WorkerState::<u64>::with(|v| *v), 100);
        assert_eq!(WorkerState::<String>::with(|v| v.clone()), "hello");

        clear_worker_state();
    }

    #[test]
    fn test_worker_state_try_with() {
        clear_worker_state();

        assert!(WorkerState::<i32>::try_with(|_| ()).is_none());

        WorkerState::<i32>::init(123);
        assert!(WorkerState::<i32>::try_with(|v| *v).is_some());
        assert_eq!(WorkerState::<i32>::try_with(|v| *v), Some(123));

        clear_worker_state();
    }

    #[test]
    fn test_worker_state_take() {
        clear_worker_state();

        WorkerState::<String>::init("test".to_string());
        assert!(WorkerState::<String>::is_initialized());

        assert_eq!(WorkerState::<String>::take(), Some("test".to_string()));
        assert!(!WorkerState::<String>::is_initialized());

        clear_worker_state();
    }

    #[test]
    fn test_worker_state_replace() {
        clear_worker_state();

        WorkerState::<u32>::init(10);
        assert_eq!(WorkerState::<u32>::replace(20), Some(10));
        assert_eq!(WorkerState::<u32>::with(|v| *v), 20);

        clear_worker_state();
    }

    #[test]
    fn test_worker_cache_basic() {
        let mut cache = WorkerCache::<String, u32>::new(10);

        cache.insert("key1".to_string(), 100);
        cache.insert("key2".to_string(), 200);

        assert_eq!(cache.get(&"key1".to_string()), Some(&100));
        assert_eq!(cache.get(&"key3".to_string()), None);
        assert_eq!(cache.len(), 2);
    }

    #[test]
    fn test_worker_cache_eviction() {
        let mut cache = WorkerCache::<u32, u32>::new(3);

        cache.insert(1, 100);
        cache.insert(2, 200);
        cache.insert(3, 300);
        assert_eq!(cache.len(), 3);

        // A fourth insert evicts one entry to stay at capacity.
        cache.insert(4, 400);
        assert_eq!(cache.len(), 3);
        assert!(cache.contains(&4));
    }

    #[test]
    fn test_worker_cache_hit_ratio() {
        let mut cache = WorkerCache::<u32, u32>::new(10);

        cache.insert(1, 100);
        cache.get(&1);
        cache.get(&1);
        cache.get(&2);

        assert_eq!(cache.hits(), 2);
        assert_eq!(cache.misses(), 1);
        assert!((cache.hit_ratio() - 66.67).abs() < 1.0);
    }

    #[test]
    fn test_state_factory() {
        clear_worker_state();

        let factory = StateFactory::new(vec![1, 2, 3]);
        factory.init_for_worker();

        WorkerState::<Vec<i32>>::with(|v| {
            assert_eq!(v, &vec![1, 2, 3]);
        });

        clear_worker_state();
    }

    #[test]
    fn test_worker_state_stats() {
        let stats = worker_state_stats();
        let _ = stats.inits();
        let _ = stats.accesses();
    }

    #[test]
    fn test_worker_router_initialization() {
        let router = Arc::new(Router::new());

        init_worker_router(router);

        assert!(has_worker_router());
        assert!(worker_id().is_some());

        WorkerRouter::with(|r| {
            assert!(r.routes.is_empty());
        });

        clear_worker_router();
    }

    #[test]
    fn test_worker_handle() {
        let handle = WorkerHandle::new(5, "test-worker");
        assert_eq!(handle.id, 5);
        assert_eq!(handle.name, "test-worker-5");
    }

    #[test]
    fn test_worker_stats() {
        let stats = worker_stats();
        let _ = stats.inits();
        let _ = stats.accesses();
        let _ = stats.clones();
        let _ = stats.clone_avoidance_ratio();
    }
}