1pub mod load_balancer;
65pub mod parallel_algorithms;
66pub mod parallel_allocator;
67pub mod scheduler;
68pub mod thread_pool;
69pub mod work_stealing;
70
71pub use load_balancer::{
73 BalancingStrategy, LoadBalancer, LoadBalancingAdvisor, LoadBalancingAnalysis, WorkloadMetrics,
74};
75
76pub use parallel_algorithms::{
78 parallel_prefix_sum, parallel_scan, ParallelArrayOps, ParallelConfig, ParallelFFT,
79 ParallelMatrixOps, ParallelPipeline, ParallelQuickSort, ScanMode,
80};
81
82pub use parallel_allocator::{ParallelAllocator, ParallelAllocatorConfig, ThreadLocalAllocator};
84
85pub use scheduler::{ParallelScheduler, SchedulerConfig, SchedulerStats, TaskPriority};
87
88pub use thread_pool::{Priority, ThreadPool, ThreadPoolConfig, ThreadPoolStats};
90
91pub use work_stealing::{task, PoolStats, Task, TaskResult, WorkStealingConfig, WorkStealingPool};
93
94use crate::error::{NumRs2Error, Result};
95use std::sync::Arc;
96use std::time::Duration;
97
98pub struct ParallelContext {
100 scheduler: Arc<ParallelScheduler>,
101 load_balancer: Arc<LoadBalancer>,
102 work_stealing_pool: Arc<WorkStealingPool>,
103}
104
105impl ParallelContext {
106 pub fn new() -> Result<Self> {
108 let num_cores = std::thread::available_parallelism()
109 .map(|n| n.get())
110 .unwrap_or(4);
111
112 let scheduler_config = SchedulerConfig::optimal_for_cores(num_cores);
113 let scheduler = Arc::new(ParallelScheduler::new(scheduler_config)?);
114
115 let load_balancer = Arc::new(LoadBalancer::new(BalancingStrategy::Adaptive, num_cores)?);
116
117 let work_stealing_pool = Arc::new(WorkStealingPool::new(num_cores)?);
118
119 Ok(Self {
120 scheduler,
121 load_balancer,
122 work_stealing_pool,
123 })
124 }
125
126 pub fn with_config(
128 scheduler_config: SchedulerConfig,
129 balancing_strategy: BalancingStrategy,
130 num_threads: usize,
131 ) -> Result<Self> {
132 let scheduler = Arc::new(ParallelScheduler::new(scheduler_config)?);
133 let load_balancer = Arc::new(LoadBalancer::new(balancing_strategy, num_threads)?);
134 let work_stealing_pool = Arc::new(WorkStealingPool::new(num_threads)?);
135
136 Ok(Self {
137 scheduler,
138 load_balancer,
139 work_stealing_pool,
140 })
141 }
142
143 pub fn scheduler(&self) -> &Arc<ParallelScheduler> {
145 &self.scheduler
146 }
147
148 pub fn load_balancer(&self) -> &Arc<LoadBalancer> {
150 &self.load_balancer
151 }
152
153 pub fn work_stealing_pool(&self) -> &Arc<WorkStealingPool> {
155 &self.work_stealing_pool
156 }
157
158 pub fn shutdown(&self) -> Result<()> {
160 self.work_stealing_pool.shutdown()?;
161 self.scheduler.shutdown()?;
162 Ok(())
163 }
164
165 pub fn workload_stats(&self) -> WorkloadMetrics {
167 self.load_balancer.current_metrics()
168 }
169}
170
171impl Default for ParallelContext {
172 fn default() -> Self {
173 Self::new().unwrap_or_else(|_| {
174 let num_cores = 1; let scheduler_config = SchedulerConfig::optimal_for_cores(num_cores);
178
179 let scheduler = ParallelScheduler::new(scheduler_config)
182 .ok()
183 .map(Arc::new)
184 .unwrap_or_else(|| {
185 Arc::new(
187 ParallelScheduler::new(SchedulerConfig {
188 num_threads: 1,
189 max_queue_size: 100,
190 enable_thread_affinity: false,
191 enable_adaptive_scheduling: false,
192 time_slice_ms: 10,
193 work_stealing_threshold: 5,
194 cache_aware_scheduling: false,
195 })
196 .unwrap_or_else(|_| {
197 panic!("Cannot create even minimal ParallelScheduler - system unusable")
198 }),
199 )
200 });
201
202 let load_balancer = LoadBalancer::new(BalancingStrategy::Adaptive, num_cores)
203 .ok()
204 .map(Arc::new)
205 .unwrap_or_else(|| panic!("Cannot create LoadBalancer - system unusable"));
206
207 let work_stealing_pool = WorkStealingPool::new(num_cores)
208 .ok()
209 .map(Arc::new)
210 .unwrap_or_else(|| panic!("Cannot create WorkStealingPool - system unusable"));
211
212 Self {
213 scheduler,
214 load_balancer,
215 work_stealing_pool,
216 }
217 })
218 }
219}
220
221lazy_static::lazy_static! {
222 static ref GLOBAL_PARALLEL_CONTEXT: std::sync::Mutex<Option<Arc<ParallelContext>>> =
224 std::sync::Mutex::new(None);
225
226 static ref GLOBAL_THREAD_POOL: Arc<ThreadPool> = {
229 let num_threads = std::env::var("NUMRS2_THREAD_COUNT")
230 .ok()
231 .and_then(|s| s.parse().ok())
232 .or_else(|| std::thread::available_parallelism().ok().map(|n| n.get()))
233 .unwrap_or(2); let config = ThreadPoolConfig {
236 num_threads: Some(num_threads),
237 enable_thread_pinning: false,
238 adaptive_threads: false,
239 min_threads: 1,
240 max_threads: num_threads,
241 queue_capacity: 10000,
242 steal_interval: Duration::from_micros(100),
243 idle_timeout: Duration::from_millis(100),
244 };
245
246 Arc::new(ThreadPool::with_config(config).unwrap_or_else(|_| {
247 ThreadPool::with_config(ThreadPoolConfig {
249 num_threads: Some(1),
250 ..Default::default()
251 }).expect("Failed to create fallback thread pool")
252 }))
253 };
254}
255
256pub fn initialize_parallel_context() -> Result<()> {
258 let context = Arc::new(ParallelContext::new()?);
259 let mut global = GLOBAL_PARALLEL_CONTEXT.lock().map_err(|e| {
260 NumRs2Error::RuntimeError(format!("Failed to acquire global context lock: {}", e))
261 })?;
262 *global = Some(context);
263 Ok(())
264}
265
266pub fn global_parallel_context() -> Result<Arc<ParallelContext>> {
268 let global = GLOBAL_PARALLEL_CONTEXT.lock().map_err(|e| {
269 NumRs2Error::RuntimeError(format!("Failed to acquire global context lock: {}", e))
270 })?;
271 global.clone().ok_or_else(|| {
272 NumRs2Error::RuntimeError("Global parallel context not initialized".to_string())
273 })
274}
275
276pub fn shutdown_parallel_context() -> Result<()> {
278 let mut global = GLOBAL_PARALLEL_CONTEXT.lock().map_err(|e| {
279 NumRs2Error::RuntimeError(format!("Failed to acquire global context lock: {}", e))
280 })?;
281 if let Some(context) = global.take() {
282 context.shutdown()?;
283 }
284 Ok(())
285}
286
287pub fn global_thread_pool() -> Arc<ThreadPool> {
292 Arc::clone(&GLOBAL_THREAD_POOL)
293}
294
295pub fn global_thread_count() -> usize {
297 GLOBAL_THREAD_POOL.num_threads()
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    /// A newly built context reports at least one scheduler thread and at
    /// least one load-balancer worker.
    #[test]
    fn test_parallel_context_creation() {
        let ctx = ParallelContext::new().expect("ParallelContext creation should succeed in test");

        assert!(ctx.scheduler().num_threads() > 0);
        assert!(ctx.load_balancer().num_workers() > 0);
    }

    /// The global context can be initialized, fetched, and shut down again.
    #[test]
    fn test_global_context_initialization() {
        initialize_parallel_context().expect("initialize_parallel_context should succeed in test");

        let shared_ctx =
            global_parallel_context().expect("global_parallel_context should succeed in test");
        assert!(shared_ctx.scheduler().num_threads() > 0);

        shutdown_parallel_context().expect("shutdown_parallel_context should succeed in test");
    }

    /// A fresh context reports no active tasks and a non-negative throughput.
    #[test]
    fn test_workload_stats() {
        let ctx = ParallelContext::new().expect("ParallelContext creation should succeed in test");
        let metrics = ctx.workload_stats();

        assert_eq!(metrics.active_tasks, 0);
        assert!(metrics.total_throughput >= 0.0);
    }
}