scirs2_core/parallel/nested.rs

//! Nested parallelism with controlled resource usage
//!
//! This module provides support for nested parallel operations with proper
//! resource management to prevent thread explosion and maintain performance.
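//!
//! # Example
//!
//! A minimal usage sketch (the crate path is assumed here and the snippet is
//! not compiled as a doctest):
//!
//! ```ignore
//! use scirs2_core::parallel::nested::nested_scope;
//!
//! let doubled = nested_scope(|scope| {
//!     let data: Vec<i32> = (0..100).collect();
//!     scope.par_iter(data, |x| x * 2)
//! })
//! .expect("nested scope failed");
//! assert_eq!(doubled.len(), 100);
//! ```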

use crate::error::{CoreError, CoreResult, ErrorContext, ErrorLocation};
use crate::parallel::scheduler::{SchedulerConfigBuilder, WorkStealingScheduler};
use rayon::iter::ParallelIterator;
use rayon::prelude::*;
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};

thread_local! {
    /// Thread-local nesting level
    static NESTING_LEVEL: RefCell<usize> = const { RefCell::new(0) };

    /// Thread-local parent context
    static PARENT_CONTEXT: RefCell<Option<Arc<NestedContext>>> = const { RefCell::new(None) };
}

/// Global resource manager for nested parallelism
static GLOBAL_RESOURCE_MANAGER: std::sync::OnceLock<Arc<ResourceManager>> =
    std::sync::OnceLock::new();

/// Get or create the global resource manager
#[allow(dead_code)]
fn get_resource_manager() -> Arc<ResourceManager> {
    GLOBAL_RESOURCE_MANAGER
        .get_or_init(|| Arc::new(ResourceManager::new()))
        .clone()
}

/// Resource limits for nested parallelism
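///
/// # Example
///
/// A minimal sketch of customizing limits via struct-update syntax (crate
/// path assumed, not compiled as a doctest):
///
/// ```ignore
/// use scirs2_core::parallel::nested::ResourceLimits;
///
/// let limits = ResourceLimits {
///     max_total_threads: 4,
///     max_nesting_depth: 2,
///     threads_per_level: vec![2, 1],
///     ..Default::default()
/// };
/// ```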
#[derive(Debug, Clone)]
pub struct ResourceLimits {
    /// Maximum total number of threads across all nesting levels
    pub max_total_threads: usize,
    /// Maximum nesting depth
    pub max_nesting_depth: usize,
    /// Thread limit per nesting level
    pub threads_per_level: Vec<usize>,
    /// Memory limit in bytes
    pub max_memory_bytes: usize,
    /// CPU usage limit (0.0 to 1.0)
    pub max_cpu_usage: f64,
    /// Whether to enable thread pooling
    pub enable_thread_pooling: bool,
    /// Whether to enable work stealing across levels
    pub enable_cross_level_stealing: bool,
}

impl Default for ResourceLimits {
    fn default() -> Self {
        let num_cpus = num_cpus::get();
        Self {
            max_total_threads: num_cpus * 2,
            max_nesting_depth: 3,
            threads_per_level: vec![num_cpus, (num_cpus / 2).max(1), 1],
            max_memory_bytes: 4 * 1024 * 1024 * 1024, // 4GB
            max_cpu_usage: 0.9,
            enable_thread_pooling: true,
            enable_cross_level_stealing: false,
        }
    }
}

/// Context for nested parallel execution
pub struct NestedContext {
    /// Current nesting level (0 = top level)
    level: usize,
    /// Parent context (if any)
    parent: Option<Arc<NestedContext>>,
    /// Resource limits for this context
    limits: ResourceLimits,
    /// Number of active threads at this level
    active_threads: AtomicUsize,
    /// Scheduler for this level
    scheduler: Option<Arc<Mutex<WorkStealingScheduler>>>,
}

impl NestedContext {
    /// Create a new top-level context
    pub fn new(limits: ResourceLimits) -> Self {
        Self {
            level: 0,
            parent: None,
            limits,
            active_threads: AtomicUsize::new(0),
            scheduler: None,
        }
    }

    /// Create a child context
    pub fn create_child(&self) -> CoreResult<Arc<NestedContext>> {
        if self.level >= self.limits.max_nesting_depth {
            return Err(CoreError::ConfigError(
                ErrorContext::new(format!(
                    "Maximum nesting depth {} exceeded",
                    self.limits.max_nesting_depth
                ))
                .with_location(ErrorLocation::new(file!(), line!())),
            ));
        }

        let child = NestedContext {
            level: self.level + 1,
            parent: Some(Arc::new(self.clone())),
            limits: self.limits.clone(),
            active_threads: AtomicUsize::new(0),
            scheduler: None,
        };

        Ok(Arc::new(child))
    }

    /// Get the maximum number of threads allowed at this level
    pub fn max_threads_at_level(&self) -> usize {
        if self.level < self.limits.threads_per_level.len() {
            self.limits.threads_per_level[self.level]
        } else {
            1 // Default to single thread for deep nesting
        }
    }

    /// Try to acquire threads for parallel execution
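    ///
    /// # Example
    ///
    /// A minimal sketch of the acquire/release pairing (crate path assumed,
    /// not compiled as a doctest):
    ///
    /// ```ignore
    /// use scirs2_core::parallel::nested::{NestedContext, ResourceLimits};
    ///
    /// let ctx = NestedContext::new(ResourceLimits::default());
    /// let granted = ctx.try_acquire_threads(4);
    /// // ... run work on up to `granted` threads ...
    /// ctx.release_threads(granted);
    /// ```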
    pub fn try_acquire_threads(&self, requested: usize) -> usize {
        let max_at_level = self.max_threads_at_level();
        let resource_manager = get_resource_manager();

        // Acquire from the global pool (may grant fewer than requested)
        let available_global = resource_manager.try_acquire_threads(requested);

        // Check level limit
        let current = self.active_threads.load(Ordering::Relaxed);
        let available_at_level = max_at_level.saturating_sub(current);

        // Take minimum of all constraints
        let granted = available_global.min(available_at_level);

        if granted > 0 {
            self.active_threads.fetch_add(granted, Ordering::Relaxed);
        }

        // Return any globally acquired threads we cannot use at this level
        if available_global > granted {
            resource_manager.release_threads(available_global - granted);
        }

        granted
    }

    /// Release acquired threads
    pub fn release_threads(&self, count: usize) {
        self.active_threads.fetch_sub(count, Ordering::Relaxed);
        get_resource_manager().release_threads(count);
    }

    /// Get the scheduler for this level, creating a new one if none has been
    /// set (the newly created scheduler is not cached on this context)
    pub fn get_scheduler(&self) -> CoreResult<Arc<Mutex<WorkStealingScheduler>>> {
        if let Some(ref scheduler) = self.scheduler {
            return Ok(scheduler.clone());
        }

        // Create a scheduler with an appropriate configuration for this level
        let config = SchedulerConfigBuilder::new()
            .workers(self.max_threads_at_level())
            .adaptive(true)
            .enable_stealing_heuristics(true)
            .enable_priorities(true)
            .build();

        let scheduler = WorkStealingScheduler::new(config);
        Ok(Arc::new(Mutex::new(scheduler)))
    }
}

impl Clone for NestedContext {
    fn clone(&self) -> Self {
        Self {
            level: self.level,
            parent: self.parent.clone(),
            limits: self.limits.clone(),
            active_threads: AtomicUsize::new(self.active_threads.load(Ordering::Relaxed)),
            scheduler: self.scheduler.clone(),
        }
    }
}

/// Global resource manager for tracking system-wide resource usage
pub struct ResourceManager {
    /// Total threads in use across all levels
    total_threads: AtomicUsize,
    /// Memory usage tracking
    memory_used: AtomicUsize,
    /// CPU usage tracking
    cpu_usage: RwLock<f64>,
    /// Active contexts by level
    active_contexts: RwLock<Vec<usize>>,
}

impl Default for ResourceManager {
    fn default() -> Self {
        Self::new()
    }
}

impl ResourceManager {
    /// Create a new resource manager
    pub fn new() -> Self {
        let max_levels = 10;
        Self {
            total_threads: AtomicUsize::new(0),
            memory_used: AtomicUsize::new(0),
            cpu_usage: RwLock::new(0.0),
            active_contexts: RwLock::new(vec![0; max_levels]),
        }
    }

    /// Try to acquire threads from the global pool
    pub fn try_acquire_threads(&self, requested: usize) -> usize {
        let max_threads = num_cpus::get() * 2; // Global limit
        let mut acquired = 0;

        // CAS loop: claim one thread at a time until the request is filled
        // or the global limit is reached
        while acquired < requested {
            let current = self.total_threads.load(Ordering::Relaxed);
            if current >= max_threads {
                break;
            }

            if self
                .total_threads
                .compare_exchange(current, current + 1, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                acquired += 1;
            }
            // On failure another thread changed the count; retry with the
            // refreshed value on the next iteration
        }

        acquired
    }

    /// Release threads back to the global pool
    pub fn release_threads(&self, count: usize) {
        self.total_threads.fetch_sub(count, Ordering::Release);
    }

    /// Update memory usage by a signed delta in bytes
    pub fn update_memory_usage(&self, bytes: isize) {
        if bytes >= 0 {
            self.memory_used
                .fetch_add(bytes as usize, Ordering::Relaxed);
        } else {
            self.memory_used
                .fetch_sub(bytes.unsigned_abs(), Ordering::Relaxed);
        }
    }

    /// Get current resource usage statistics
    pub fn get_usage_stats(&self) -> ResourceUsageStats {
        ResourceUsageStats {
            total_threads: self.total_threads.load(Ordering::Relaxed),
            memory_bytes: self.memory_used.load(Ordering::Relaxed),
            cpu_usage: *self.cpu_usage.read().expect("Operation failed"),
            active_contexts_per_level: self
                .active_contexts
                .read()
                .expect("Operation failed")
                .clone(),
        }
    }
}

/// Resource usage statistics
#[derive(Debug, Clone)]
pub struct ResourceUsageStats {
    /// Total threads in use
    pub total_threads: usize,
    /// Memory usage in bytes
    pub memory_bytes: usize,
    /// CPU usage (0.0 to 1.0)
    pub cpu_usage: f64,
    /// Number of active contexts at each nesting level
    pub active_contexts_per_level: Vec<usize>,
}

/// Nested parallel execution scope
pub struct NestedScope<'a> {
    context: Arc<NestedContext>,
    acquired_threads: usize,
    phantom: std::marker::PhantomData<&'a ()>,
}

impl NestedScope<'_> {
    /// Execute a function within this scope, making the scope's context
    /// visible to the current thread for the duration of the call
    pub fn execute<F, R>(&self, f: F) -> CoreResult<R>
    where
        F: FnOnce() -> R + Send,
        R: Send,
    {
        // Install this scope's context and nesting level, remembering the
        // previous thread-local state so it can be restored afterwards
        let old_context =
            PARENT_CONTEXT.with(|ctx| ctx.borrow_mut().replace(self.context.clone()));
        let old_level = NESTING_LEVEL.with(|level| {
            let old = *level.borrow();
            *level.borrow_mut() = self.context.level;
            old
        });

        // Execute the function
        let result = f();

        // Restore the previous thread-local state
        NESTING_LEVEL.with(|level| {
            *level.borrow_mut() = old_level;
        });
        PARENT_CONTEXT.with(|ctx| {
            *ctx.borrow_mut() = old_context;
        });

        Ok(result)
    }

    /// Execute parallel iterator within this scope
    pub fn par_iter<I, F, R>(&self, items: I, f: F) -> CoreResult<Vec<R>>
    where
        I: IntoParallelIterator,
        I::Item: Send,
        F: Fn(I::Item) -> R + Send + Sync,
        R: Send,
    {
        // Convert to parallel iterator once
        let results: Vec<R> = items.into_par_iter().map(f).collect();

        Ok(results)
    }
}

impl Drop for NestedScope<'_> {
    fn drop(&mut self) {
        // Release acquired threads
        if self.acquired_threads > 0 {
            self.context.release_threads(self.acquired_threads);
        }
    }
}

/// Execute a function with nested parallelism support
#[allow(dead_code)]
pub fn nested_scope<F, R>(f: F) -> CoreResult<R>
where
    F: FnOnce(&NestedScope) -> CoreResult<R>,
{
    nested_scope_with_limits(ResourceLimits::default(), f)
}

/// Execute a function with nested parallelism support and custom limits
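///
/// # Example
///
/// A minimal sketch with custom limits (crate path assumed, not compiled as a
/// doctest):
///
/// ```ignore
/// use scirs2_core::parallel::nested::{nested_scope_with_limits, ResourceLimits};
///
/// let limits = ResourceLimits {
///     max_nesting_depth: 2,
///     threads_per_level: vec![2, 1],
///     ..Default::default()
/// };
/// let answer = nested_scope_with_limits(limits, |_scope| Ok(42))
///     .expect("nested scope failed");
/// assert_eq!(answer, 42);
/// ```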
#[allow(dead_code)]
pub fn nested_scope_with_limits<F, R>(limits: ResourceLimits, f: F) -> CoreResult<R>
where
    F: FnOnce(&NestedScope) -> CoreResult<R>,
{
    // Check if we're already in a nested context
    let context = match PARENT_CONTEXT
        .with(|ctx| ctx.borrow().as_ref().map(|parent| parent.create_child()))
    {
        Some(child_result) => child_result?,
        None => {
            // No parent context, create a new root context with level 0
            Arc::new(NestedContext::new(limits.clone()))
        }
    };

    // Try to acquire threads
    let requested_threads = context.max_threads_at_level();
    let acquired_threads = context.try_acquire_threads(requested_threads);

    // Create scope
    let scope = NestedScope {
        context: context.clone(),
        acquired_threads,
        phantom: std::marker::PhantomData,
    };

    // Set the nesting level for the current thread
    let old_level = NESTING_LEVEL.with(|level| {
        let old = *level.borrow();
        *level.borrow_mut() = context.level;
        old
    });

    // Set parent context
    let old_context = PARENT_CONTEXT.with(|ctx| ctx.borrow_mut().replace(context));

    // Execute function
    let result = f(&scope);

    // Restore previous nesting level
    NESTING_LEVEL.with(|level| {
        *level.borrow_mut() = old_level;
    });

    // Restore previous context
    PARENT_CONTEXT.with(|ctx| {
        *ctx.borrow_mut() = old_context;
    });

    result
}

/// Get the current nesting level
#[allow(dead_code)]
pub fn current_nesting_level() -> usize {
    NESTING_LEVEL.with(|level| *level.borrow())
}

/// Check if nested parallelism is allowed at the current level
#[allow(dead_code)]
pub fn is_nested_parallelism_allowed() -> bool {
    PARENT_CONTEXT.with(|ctx| {
        if let Some(ref context) = *ctx.borrow() {
            context.level < context.limits.max_nesting_depth
        } else {
            true // Top level always allowed
        }
    })
}

/// Adaptive parallel execution based on nesting level
#[allow(dead_code)]
pub fn adaptive_par_for_each<T, F>(data: Vec<T>, f: F) -> CoreResult<()>
where
    T: Send,
    F: Fn(T) + Send + Sync,
{
    if is_nested_parallelism_allowed() {
        data.into_par_iter().for_each(f);
    } else {
        // Fall back to sequential at deep nesting levels
        data.into_iter().for_each(f);
    }
    Ok(())
}

/// Adaptive parallel map based on nesting level
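///
/// # Example
///
/// A minimal sketch (crate path assumed, not compiled as a doctest):
///
/// ```ignore
/// use scirs2_core::parallel::nested::adaptive_par_map;
///
/// let squares = adaptive_par_map(vec![1, 2, 3], |x| x * x)
///     .expect("adaptive map failed");
/// assert_eq!(squares, vec![1, 4, 9]);
/// ```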
#[allow(dead_code)]
pub fn adaptive_par_map<T, F, R>(data: Vec<T>, f: F) -> CoreResult<Vec<R>>
where
    T: Send,
    F: Fn(T) -> R + Send + Sync,
    R: Send,
{
    if is_nested_parallelism_allowed() {
        Ok(data.into_par_iter().map(f).collect())
    } else {
        // Fall back to sequential at deep nesting levels
        Ok(data.into_iter().map(f).collect())
    }
}

/// Policy for handling nested parallelism
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NestedPolicy {
    /// Allow nested parallelism with resource limits
    Allow,
    /// Convert to sequential execution at nested levels
    Sequential,
    /// Distribute work to parent level scheduler
    Delegate,
    /// Return an error if nested parallelism is attempted
    Deny,
}

/// Configuration for nested parallel execution
#[derive(Debug, Clone)]
pub struct NestedConfig {
    /// Policy for handling nested parallelism
    pub policy: NestedPolicy,
    /// Resource limits
    pub limits: ResourceLimits,
    /// Whether to track resource usage
    pub track_usage: bool,
    /// Whether to enable adaptive scheduling
    pub adaptive_scheduling: bool,
}

impl Default for NestedConfig {
    fn default() -> Self {
        Self {
            policy: NestedPolicy::Allow,
            limits: ResourceLimits::default(),
            track_usage: true,
            adaptive_scheduling: true,
        }
    }
}

/// Execute with specific nested parallelism policy
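///
/// # Example
///
/// A minimal sketch forcing sequential execution (crate path assumed, not
/// compiled as a doctest):
///
/// ```ignore
/// use scirs2_core::parallel::nested::{with_nested_policy, NestedConfig, NestedPolicy};
///
/// let config = NestedConfig {
///     policy: NestedPolicy::Sequential,
///     ..Default::default()
/// };
/// let sum = with_nested_policy(config, || Ok((0..10).sum::<i32>()))
///     .expect("policy execution failed");
/// assert_eq!(sum, 45);
/// ```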
#[allow(dead_code)]
pub fn with_nested_policy<F, R>(config: NestedConfig, f: F) -> CoreResult<R>
where
    F: FnOnce() -> CoreResult<R>,
{
    match config.policy {
        NestedPolicy::Allow => nested_scope_with_limits(config.limits, |_scope| f()),
        NestedPolicy::Sequential => {
            // Mark this thread as maximally nested so adaptive helpers fall
            // back to sequential execution, then restore the previous level
            let old_level = NESTING_LEVEL.with(|level| {
                let old = *level.borrow();
                *level.borrow_mut() = usize::MAX;
                old
            });
            let result = f();
            NESTING_LEVEL.with(|level| {
                *level.borrow_mut() = old_level;
            });
            result
        }
        NestedPolicy::Delegate => {
            // Delegate to parent scheduler if available
            // For now, just execute directly as delegation is complex
            f()
        }
        NestedPolicy::Deny => {
            // Check if we're inside any nested scope (even level 0)
            let is_nested = PARENT_CONTEXT.with(|ctx| ctx.borrow().is_some());
            if is_nested {
                Err(CoreError::ConfigError(
                    ErrorContext::new("Nested parallelism not allowed".to_string())
                        .with_location(ErrorLocation::new(file!(), line!())),
                ))
            } else {
                f()
            }
        }
    }
}

/// Get parent level scheduler if available
#[allow(dead_code)]
fn get_parent_scheduler() -> Option<Arc<Mutex<WorkStealingScheduler>>> {
    PARENT_CONTEXT.with(|ctx| {
        ctx.borrow()
            .as_ref()
            .and_then(|context| context.scheduler.clone())
    })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_basic_nested_execution() {
        let result = nested_scope(|scope| {
            let data: Vec<i32> = (0..100).collect();
            scope.par_iter(data, |x| x * 2)
        })
        .expect("Operation failed");

        assert_eq!(result.len(), 100);
        assert_eq!(result[0], 0);
        assert_eq!(result[50], 100);
    }

    #[test]
    fn test_nesting_levels() {
        nested_scope(|outer_scope| {
            assert_eq!(current_nesting_level(), 0);

            outer_scope.execute(|| {
                nested_scope(|inner_scope| {
                    assert_eq!(current_nesting_level(), 1);

                    inner_scope.execute(|| {
                        nested_scope(|_deepest_scope| {
                            assert_eq!(current_nesting_level(), 2);
                            Ok(())
                        })
                        .expect("Operation failed")
                    })
                })
                .expect("Operation failed")
            })
        })
        .expect("Operation failed");
    }

    #[test]
    fn test_resource_limits() {
        let limits = ResourceLimits {
            max_total_threads: 4,
            max_nesting_depth: 2,
            threads_per_level: vec![2, 1],
            ..Default::default()
        };

        let result = nested_scope_with_limits(limits, |scope| {
            let context = &scope.context;
            assert!(context.max_threads_at_level() <= 2);
            Ok(42)
        });

        assert_eq!(result.expect("Operation failed"), 42);
    }

    #[test]
    fn test_sequential_policy() {
        let config = NestedConfig {
            policy: NestedPolicy::Sequential,
            ..Default::default()
        };

        let result = with_nested_policy(config, || {
            // This should run sequentially even if we try parallel
            let data: Vec<i32> = (0..10).collect();
            let sum: i32 = data.into_par_iter().sum();
            Ok(sum)
        });

        assert_eq!(result.expect("Operation failed"), 45);
    }

    #[test]
    fn test_deny_policy() {
        let config = NestedConfig {
            policy: NestedPolicy::Deny,
            ..Default::default()
        };

        // Top level should work
        let result = with_nested_policy(config.clone(), || Ok(1));
        assert!(result.is_ok());

        // Nested should fail - first establish a nested context, then try to use deny policy
        let result = nested_scope(|_scope| {
            // Now we're at nesting level 1
            // This should fail because deny policy forbids nested parallelism
            with_nested_policy(config, || Ok(2))
        });

        assert!(result.is_err());
    }
}