use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
use crate::parallel::scheduler::{SchedulerConfigBuilder, WorkStealingScheduler};
use rayon::prelude::*;
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};

thread_local! {
    /// Nesting depth of the parallel scope currently executing on this thread.
    static NESTING_LEVEL: RefCell<usize> = const { RefCell::new(0) };

    /// Context of the enclosing nested scope on this thread, if any.
    static PARENT_CONTEXT: RefCell<Option<Arc<NestedContext>>> = const { RefCell::new(None) };
}

static GLOBAL_RESOURCE_MANAGER: std::sync::OnceLock<Arc<ResourceManager>> =
    std::sync::OnceLock::new();

/// Returns the process-wide [`ResourceManager`], initializing it on first use.
#[allow(dead_code)]
fn get_resource_manager() -> Arc<ResourceManager> {
    GLOBAL_RESOURCE_MANAGER
        .get_or_init(|| Arc::new(ResourceManager::new()))
        .clone()
}

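/// Limits governing how much parallelism nested scopes may consume.
///
/// A minimal construction sketch (the field values here are illustrative,
/// not recommendations): override only what matters and keep the rest of
/// the defaults.
///
/// ```ignore
/// let limits = ResourceLimits {
///     max_nesting_depth: 2,
///     threads_per_level: vec![8, 2],
///     ..ResourceLimits::default()
/// };
/// ```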
#[derive(Debug, Clone)]
pub struct ResourceLimits {
    /// Maximum number of threads across all nesting levels.
    pub max_total_threads: usize,
    /// Maximum allowed nesting depth.
    pub max_nesting_depth: usize,
    /// Thread budget per nesting level; levels beyond the end of the
    /// vector fall back to a single thread.
    pub threads_per_level: Vec<usize>,
    /// Maximum memory usage in bytes.
    pub max_memory_bytes: usize,
    /// Maximum CPU utilization as a fraction in `[0.0, 1.0]`.
    pub max_cpu_usage: f64,
    /// Whether threads are pooled and reused across scopes.
    pub enable_thread_pooling: bool,
    /// Whether workers may steal tasks across nesting levels.
    pub enable_cross_level_stealing: bool,
}

impl Default for ResourceLimits {
    fn default() -> Self {
        let num_cpus = num_cpus::get();
        Self {
            max_total_threads: num_cpus * 2,
            max_nesting_depth: 3,
            threads_per_level: vec![num_cpus, num_cpus / 2, 1],
            max_memory_bytes: 4 * 1024 * 1024 * 1024, // 4 GiB
            max_cpu_usage: 0.9,
            enable_thread_pooling: true,
            enable_cross_level_stealing: false,
        }
    }
}

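/// Per-level execution context that tracks nesting depth, its parent
/// context, and the threads currently active at this level.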
pub struct NestedContext {
    /// Nesting depth of this context (0 = root).
    level: usize,
    /// Parent context, if this is not the root.
    parent: Option<Arc<NestedContext>>,
    /// Resource limits inherited from the root scope.
    limits: ResourceLimits,
    /// Number of threads currently acquired at this level.
    active_threads: AtomicUsize,
    /// Lazily created scheduler for this level, if any.
    scheduler: Option<Arc<Mutex<WorkStealingScheduler>>>,
}

impl NestedContext {
    /// Creates a root context (level 0) with the given limits.
    pub fn new(limits: ResourceLimits) -> Self {
        Self {
            level: 0,
            parent: None,
            limits,
            active_threads: AtomicUsize::new(0),
            scheduler: None,
        }
    }

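    /// Creates a child context one level deeper, failing once
    /// `max_nesting_depth` would be exceeded.
    ///
    /// A usage sketch (error handling elided):
    ///
    /// ```ignore
    /// let root = NestedContext::new(ResourceLimits::default());
    /// let child = root.create_child()?; // level 1
    /// let grandchild = child.create_child()?; // level 2
    /// ```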
    pub fn create_child(&self) -> CoreResult<Arc<NestedContext>> {
        if self.level >= self.limits.max_nesting_depth {
            return Err(CoreError::ConfigError(
                ErrorContext::new(format!(
                    "Maximum nesting depth {} exceeded",
                    self.limits.max_nesting_depth
                ))
                .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        let child = NestedContext {
            level: self.level + 1,
            parent: Some(Arc::new(self.clone())),
            limits: self.limits.clone(),
            active_threads: AtomicUsize::new(0),
            scheduler: None,
        };

        Ok(Arc::new(child))
    }

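    /// Returns the thread budget configured for this context's level,
    /// defaulting to one thread for levels deeper than `threads_per_level`.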
    pub fn max_threads_at_level(&self) -> usize {
        if self.level < self.limits.threads_per_level.len() {
            self.limits.threads_per_level[self.level]
        } else {
            1
        }
    }

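    /// Attempts to acquire up to `requested` threads, bounded by both this
    /// level's budget and the global pool. Returns the number actually
    /// granted, which may be zero; the caller must pair every grant with a
    /// matching [`release_threads`](Self::release_threads).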
    pub fn try_acquire_threads(&self, requested: usize) -> usize {
        let max_at_level = self.max_threads_at_level();
        let resource_manager = get_resource_manager();

        // Reserve from the global pool first, then clamp to this level's budget.
        let available_global = resource_manager.try_acquire_threads(requested);

        let current = self.active_threads.load(Ordering::Relaxed);
        let available_at_level = max_at_level.saturating_sub(current);

        let granted = requested.min(available_global).min(available_at_level);

        if granted > 0 {
            self.active_threads.fetch_add(granted, Ordering::Relaxed);
        }

        // Return any globally reserved threads that could not be granted at
        // this level, so a partial grant does not leak global capacity.
        resource_manager.release_threads(available_global - granted);

        granted
    }

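    /// Releases `count` previously acquired threads back to this level and
    /// to the global pool.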
    pub fn release_threads(&self, count: usize) {
        self.active_threads.fetch_sub(count, Ordering::Relaxed);
        get_resource_manager().release_threads(count);
    }

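    /// Returns this level's scheduler, or builds a fresh one sized to the
    /// level's thread budget. Note that a freshly built scheduler is not
    /// cached back into `self.scheduler`, since this method takes `&self`.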
    pub fn get_scheduler(&self) -> CoreResult<Arc<Mutex<WorkStealingScheduler>>> {
        if let Some(ref scheduler) = self.scheduler {
            return Ok(scheduler.clone());
        }

        let config = SchedulerConfigBuilder::new()
            .workers(self.max_threads_at_level())
            .adaptive(true)
            .enable_stealing_heuristics(true)
            .enable_priorities(true)
            .build();

        let scheduler = WorkStealingScheduler::new(config);
        Ok(Arc::new(Mutex::new(scheduler)))
    }
}

impl Clone for NestedContext {
    fn clone(&self) -> Self {
        Self {
            level: self.level,
            parent: self.parent.clone(),
            limits: self.limits.clone(),
            // Snapshot the current count; the clone tracks its threads
            // independently from this point on.
            active_threads: AtomicUsize::new(self.active_threads.load(Ordering::Relaxed)),
            scheduler: self.scheduler.clone(),
        }
    }
}

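/// Process-wide accounting of threads, memory, and CPU usage shared by all
/// nested scopes.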
pub struct ResourceManager {
    /// Threads currently acquired across the whole process.
    total_threads: AtomicUsize,
    /// Estimated memory in use, in bytes.
    memory_used: AtomicUsize,
    /// Most recently observed CPU utilization, as a fraction.
    cpu_usage: RwLock<f64>,
    /// Number of active contexts per nesting level.
    active_contexts: RwLock<Vec<usize>>,
}

impl Default for ResourceManager {
    fn default() -> Self {
        Self::new()
    }
}

impl ResourceManager {
    pub fn new() -> Self {
        let max_levels = 10;
        Self {
            total_threads: AtomicUsize::new(0),
            memory_used: AtomicUsize::new(0),
            cpu_usage: RwLock::new(0.0),
            active_contexts: RwLock::new(vec![0; max_levels]),
        }
    }

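    /// Attempts to acquire up to `requested` threads from the global pool
    /// via a compare-and-swap loop, returning how many were acquired.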
    pub fn try_acquire_threads(&self, requested: usize) -> usize {
        let mut acquired = 0;
        let max_threads = num_cpus::get() * 2;

        for _ in 0..requested {
            let current = self.total_threads.load(Ordering::Relaxed);
            if current >= max_threads {
                break;
            }
            // A failed compare-exchange means another thread raced us; the
            // attempt is simply forfeited rather than retried.
            if self
                .total_threads
                .compare_exchange(current, current + 1, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                acquired += 1;
            }
        }

        acquired
    }

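    /// Returns `count` threads to the global pool.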
    pub fn release_threads(&self, count: usize) {
        self.total_threads.fetch_sub(count, Ordering::Release);
    }

    /// Adjusts the memory-usage counter by a signed delta in bytes.
    pub fn update_memory_usage(&self, bytes: isize) {
        if bytes > 0 {
            self.memory_used
                .fetch_add(bytes as usize, Ordering::Relaxed);
        } else {
            // `unsigned_abs` avoids the negation overflow that
            // `(-bytes) as usize` would hit for `isize::MIN`.
            self.memory_used
                .fetch_sub(bytes.unsigned_abs(), Ordering::Relaxed);
        }
    }

    /// Returns a point-in-time snapshot of global resource usage.
    pub fn get_usage_stats(&self) -> ResourceUsageStats {
        ResourceUsageStats {
            total_threads: self.total_threads.load(Ordering::Relaxed),
            memory_bytes: self.memory_used.load(Ordering::Relaxed),
            cpu_usage: *self.cpu_usage.read().expect("cpu_usage lock poisoned"),
            active_contexts_per_level: self
                .active_contexts
                .read()
                .expect("active_contexts lock poisoned")
                .clone(),
        }
    }
}

/// Snapshot of resource usage, as reported by [`ResourceManager::get_usage_stats`].
#[derive(Debug, Clone)]
pub struct ResourceUsageStats {
    /// Threads currently acquired across the process.
    pub total_threads: usize,
    /// Estimated memory in use, in bytes.
    pub memory_bytes: usize,
    /// Most recently observed CPU utilization, as a fraction.
    pub cpu_usage: f64,
    /// Number of active contexts per nesting level.
    pub active_contexts_per_level: Vec<usize>,
}

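/// RAII handle for a nested parallel scope; releases its acquired threads
/// on drop.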
pub struct NestedScope<'a> {
    context: Arc<NestedContext>,
    acquired_threads: usize,
    phantom: std::marker::PhantomData<&'a ()>,
}

impl NestedScope<'_> {
    /// Runs `f` with this scope installed as the thread's current context,
    /// restoring the previous context and nesting level afterwards.
    pub fn execute<F, R>(&self, f: F) -> CoreResult<R>
    where
        F: FnOnce() -> R + Send,
        R: Send,
    {
        let old_context =
            PARENT_CONTEXT.with(|ctx| ctx.borrow_mut().replace(self.context.clone()));

        let old_level = NESTING_LEVEL.with(|level| {
            let old = *level.borrow();
            *level.borrow_mut() = self.context.level;
            old
        });

        let result = f();

        NESTING_LEVEL.with(|level| {
            *level.borrow_mut() = old_level;
        });

        PARENT_CONTEXT.with(|ctx| {
            *ctx.borrow_mut() = old_context;
        });

        Ok(result)
    }

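    /// Maps `f` over `items` in parallel and collects the results. This
    /// currently delegates to rayon's global pool rather than a scheduler
    /// scoped to this level's thread budget.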
    pub fn par_iter<I, F, R>(&self, items: I, f: F) -> CoreResult<Vec<R>>
    where
        I: IntoParallelIterator,
        I::Item: Send,
        F: Fn(I::Item) -> R + Send + Sync,
        R: Send,
    {
        let results: Vec<R> = items.into_par_iter().map(f).collect();

        Ok(results)
    }
}

impl Drop for NestedScope<'_> {
    fn drop(&mut self) {
        if self.acquired_threads > 0 {
            self.context.release_threads(self.acquired_threads);
        }
    }
}

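/// Runs `f` inside a nested parallel scope with default [`ResourceLimits`].
///
/// A usage sketch (mirrors the tests below):
///
/// ```ignore
/// let doubled = nested_scope(|scope| {
///     scope.par_iter((0..100).collect::<Vec<i32>>(), |x| x * 2)
/// })?;
/// ```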
#[allow(dead_code)]
pub fn nested_scope<F, R>(f: F) -> CoreResult<R>
where
    F: FnOnce(&NestedScope) -> CoreResult<R>,
{
    nested_scope_with_limits(ResourceLimits::default(), f)
}

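/// Runs `f` inside a nested parallel scope governed by `limits`. If the
/// calling thread is already inside a scope, a child context one level
/// deeper is created; otherwise a fresh root context is used.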
#[allow(dead_code)]
pub fn nested_scope_with_limits<F, R>(limits: ResourceLimits, f: F) -> CoreResult<R>
where
    F: FnOnce(&NestedScope) -> CoreResult<R>,
{
    // Inside an existing scope, derive a child context (which inherits the
    // parent's limits); otherwise start a new root context.
    let context = match PARENT_CONTEXT
        .with(|ctx| ctx.borrow().as_ref().map(|parent| parent.create_child()))
    {
        Some(child_result) => child_result?,
        None => Arc::new(NestedContext::new(limits.clone())),
    };

    let requested_threads = context.max_threads_at_level();
    let acquired_threads = context.try_acquire_threads(requested_threads);

    let scope = NestedScope {
        context: context.clone(),
        acquired_threads,
        phantom: std::marker::PhantomData,
    };

    // Install this scope's level and context, remembering the previous
    // values so they can be restored after `f` runs.
    let old_level = NESTING_LEVEL.with(|level| {
        let old = *level.borrow();
        *level.borrow_mut() = context.level;
        old
    });

    let old_context = PARENT_CONTEXT.with(|ctx| ctx.borrow_mut().replace(context));

    let result = f(&scope);

    NESTING_LEVEL.with(|level| {
        *level.borrow_mut() = old_level;
    });

    PARENT_CONTEXT.with(|ctx| {
        *ctx.borrow_mut() = old_context;
    });

    result
}

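/// Returns the nesting level of the scope currently executing on this
/// thread (0 when outside any scope).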
#[allow(dead_code)]
pub fn current_nesting_level() -> usize {
    NESTING_LEVEL.with(|level| *level.borrow())
}

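/// Returns whether spawning further nested parallelism is permitted: always
/// true outside a scope, and true inside a scope only while the current
/// level is below the configured maximum depth.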
#[allow(dead_code)]
pub fn is_nested_parallelism_allowed() -> bool {
    PARENT_CONTEXT.with(|ctx| {
        if let Some(ref context) = *ctx.borrow() {
            context.level < context.limits.max_nesting_depth
        } else {
            true
        }
    })
}

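/// Applies `f` to every element, in parallel while nested parallelism is
/// still allowed and sequentially once the depth limit has been reached.
///
/// A usage sketch:
///
/// ```ignore
/// adaptive_par_for_each(vec![1, 2, 3], |x| {
///     println!("{x}");
/// })?;
/// ```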
#[allow(dead_code)]
pub fn adaptive_par_for_each<T, F>(data: Vec<T>, f: F) -> CoreResult<()>
where
    T: Send,
    F: Fn(T) + Send + Sync,
{
    if is_nested_parallelism_allowed() {
        data.into_par_iter().for_each(f);
    } else {
        data.into_iter().for_each(f);
    }
    Ok(())
}

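/// Maps `f` over `data`, in parallel while nested parallelism is still
/// allowed and sequentially otherwise; the ordering of results is
/// preserved in both cases.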
#[allow(dead_code)]
pub fn adaptive_par_map<T, F, R>(data: Vec<T>, f: F) -> CoreResult<Vec<R>>
where
    T: Send,
    F: Fn(T) -> R + Send + Sync,
    R: Send,
{
    if is_nested_parallelism_allowed() {
        Ok(data.into_par_iter().map(f).collect())
    } else {
        Ok(data.into_iter().map(f).collect())
    }
}

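/// Policy controlling how nested parallel regions behave.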
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NestedPolicy {
    /// Allow nested parallelism, subject to the configured limits.
    Allow,
    /// Force nested regions to run sequentially.
    Sequential,
    /// Delegate work to the enclosing scope.
    Delegate,
    /// Reject nested parallelism with an error.
    Deny,
}

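/// Configuration combining a [`NestedPolicy`] with [`ResourceLimits`].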
#[derive(Debug, Clone)]
pub struct NestedConfig {
    /// How nested parallel regions are handled.
    pub policy: NestedPolicy,
    /// Resource limits applied when nesting is allowed.
    pub limits: ResourceLimits,
    /// Whether resource usage is tracked.
    pub track_usage: bool,
    /// Whether scheduling adapts to observed load.
    pub adaptive_scheduling: bool,
}

impl Default for NestedConfig {
    fn default() -> Self {
        Self {
            policy: NestedPolicy::Allow,
            limits: ResourceLimits::default(),
            track_usage: true,
            adaptive_scheduling: true,
        }
    }
}

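/// Runs `f` under the given [`NestedConfig`], applying its policy.
///
/// A usage sketch (mirrors the tests below):
///
/// ```ignore
/// let config = NestedConfig {
///     policy: NestedPolicy::Sequential,
///     ..Default::default()
/// };
/// let sum = with_nested_policy(config, || Ok((0..10).sum::<i32>()))?;
/// ```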
#[allow(dead_code)]
pub fn with_nested_policy<F, R>(config: NestedConfig, f: F) -> CoreResult<R>
where
    F: FnOnce() -> CoreResult<R>,
{
    match config.policy {
        NestedPolicy::Allow => nested_scope_with_limits(config.limits, |_scope| f()),
        NestedPolicy::Sequential => {
            // Mark this thread as maximally nested for the duration of `f`,
            // restoring the previous level afterwards.
            let old_level = NESTING_LEVEL.with(|level| {
                let old = *level.borrow();
                *level.borrow_mut() = usize::MAX;
                old
            });
            let result = f();
            NESTING_LEVEL.with(|level| {
                *level.borrow_mut() = old_level;
            });
            result
        }
        NestedPolicy::Delegate => f(),
        NestedPolicy::Deny => {
            let is_nested = PARENT_CONTEXT.with(|ctx| ctx.borrow().is_some());
            if is_nested {
                Err(CoreError::ConfigError(
                    ErrorContext::new("Nested parallelism not allowed".to_string())
                        .with_location(ErrorLocation::new(file!(), line!())),
                ))
            } else {
                f()
            }
        }
    }
}

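/// Returns the scheduler of the enclosing scope's context, if one exists.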
#[allow(dead_code)]
fn get_parent_scheduler() -> Option<Arc<Mutex<WorkStealingScheduler>>> {
    PARENT_CONTEXT.with(|ctx| {
        ctx.borrow()
            .as_ref()
            .and_then(|context| context.scheduler.clone())
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_nested_execution() {
        let result = nested_scope(|scope| {
            let data: Vec<i32> = (0..100).collect();
            scope.par_iter(data, |x| x * 2)
        })
        .expect("nested scope should succeed");

        assert_eq!(result.len(), 100);
        assert_eq!(result[0], 0);
        assert_eq!(result[50], 100);
    }

    #[test]
    fn test_nesting_levels() {
        nested_scope(|outer_scope| {
            assert_eq!(current_nesting_level(), 0);

            outer_scope.execute(|| {
                nested_scope(|inner_scope| {
                    assert_eq!(current_nesting_level(), 1);

                    inner_scope.execute(|| {
                        nested_scope(|_deepest_scope| {
                            assert_eq!(current_nesting_level(), 2);
                            Ok(())
                        })
                        .expect("deepest scope should succeed")
                    })
                })
                .expect("inner scope should succeed")
            })
        })
        .expect("outer scope should succeed");
    }

    #[test]
    fn test_resource_limits() {
        let limits = ResourceLimits {
            max_total_threads: 4,
            max_nesting_depth: 2,
            threads_per_level: vec![2, 1],
            ..Default::default()
        };

        let result = nested_scope_with_limits(limits, |scope| {
            let context = &scope.context;
            assert!(context.max_threads_at_level() <= 2);
            Ok(42)
        });

        assert_eq!(result.expect("scoped closure should succeed"), 42);
    }

    #[test]
    fn test_sequential_policy() {
        let config = NestedConfig {
            policy: NestedPolicy::Sequential,
            ..Default::default()
        };

        let result = with_nested_policy(config, || {
            let data: Vec<i32> = (0..10).collect();
            // The summation still goes through rayon's global pool; the
            // policy only raises the thread-local nesting level.
            let sum: i32 = data.into_par_iter().sum();
            Ok(sum)
        });

        assert_eq!(result.expect("sequential policy should succeed"), 45);
    }

    #[test]
    fn test_deny_policy() {
        let config = NestedConfig {
            policy: NestedPolicy::Deny,
            ..Default::default()
        };

        // Outside any scope the closure runs normally.
        let result = with_nested_policy(config.clone(), || Ok(1));
        assert!(result.is_ok());

        // Inside a scope the Deny policy rejects execution.
        let result = nested_scope(|_scope| with_nested_policy(config, || Ok(2)));

        assert!(result.is_err());
    }
}