scirs2_ndimage/threading.rs

//! Thread pool integration for shared worker management
//!
//! This module provides integration with a shared thread pool that can be
//! used across different scirs2 modules for consistent thread management
//! and resource control.

// NOTE: `scirs2_core::parallel_ops::prelude` is intentionally not used;
// this module imports from `scirs2_core::parallel_ops` directly, per project policy.
use scirs2_core::parallel_ops::*;
use std::sync::{Arc, Mutex, OnceLock};

/// Global thread pool configuration, shared across the process
static THREAD_POOL_CONFIG: OnceLock<Arc<Mutex<ThreadPoolConfig>>> = OnceLock::new();

/// Thread pool configuration
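///
/// Typically constructed with struct-update syntax over the defaults
/// (illustrative sketch, marked `ignore`):
///
/// ```ignore
/// let config = ThreadPoolConfig {
///     num_threads: Some(8),
///     ..Default::default()
/// };
/// ```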
#[derive(Debug, Clone)]
pub struct ThreadPoolConfig {
    /// Number of worker threads
    pub num_threads: Option<usize>,
    /// Stack size for worker threads
    pub stack_size: Option<usize>,
    /// Thread name prefix
    pub thread_name_prefix: String,
    /// Whether to pin threads to CPUs
    pub pin_threads: bool,
}

impl Default for ThreadPoolConfig {
    fn default() -> Self {
        Self {
            num_threads: None,                 // Use system default
            stack_size: Some(8 * 1024 * 1024), // 8 MiB
            thread_name_prefix: "scirs2-worker".to_string(),
            pin_threads: false,
        }
    }
}

/// Initialize the global thread pool configuration
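///
/// Initialization can happen at most once per process; later calls return an
/// error. A usage sketch (marked `ignore`: the exact crate path is an
/// assumption):
///
/// ```ignore
/// use scirs2_ndimage::threading::{init_thread_pool, ThreadPoolConfig};
///
/// init_thread_pool(ThreadPoolConfig {
///     num_threads: Some(4),
///     ..Default::default()
/// })
/// .expect("first initialization succeeds");
/// ```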
#[allow(dead_code)]
pub fn init_thread_pool(config: ThreadPoolConfig) -> Result<(), String> {
    THREAD_POOL_CONFIG
        .set(Arc::new(Mutex::new(config)))
        .map_err(|_| "Thread pool already initialized".to_string())
}

/// Get the current thread pool configuration, falling back to the defaults
/// if the pool has not been initialized
#[allow(dead_code)]
pub fn get_thread_pool_config() -> ThreadPoolConfig {
    THREAD_POOL_CONFIG
        .get()
        .map(|config| config.lock().unwrap().clone())
        .unwrap_or_default()
}

/// Update the thread pool configuration
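///
/// A usage sketch (marked `ignore`; requires `init_thread_pool` to have been
/// called first, otherwise an `Err` is returned):
///
/// ```ignore
/// update_thread_pool_config(|config| {
///     config.num_threads = Some(2);
/// })
/// .expect("pool was initialized");
/// ```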
#[allow(dead_code)]
pub fn update_thread_pool_config<F>(update_fn: F) -> Result<(), String>
where
    F: FnOnce(&mut ThreadPoolConfig),
{
    if let Some(config) = THREAD_POOL_CONFIG.get() {
        let mut config = config.lock().unwrap();
        update_fn(&mut *config);
        Ok(())
    } else {
        Err("Thread pool not initialized".to_string())
    }
}

/// Thread-local worker information
#[derive(Debug, Clone)]
pub struct WorkerInfo {
    /// Worker thread ID
    pub thread_id: usize,
    /// Total number of workers
    pub num_workers: usize,
    /// CPU affinity if pinned
    pub cpu_affinity: Option<usize>,
}

thread_local! {
    static WORKER_INFO: std::cell::RefCell<Option<WorkerInfo>> = const { std::cell::RefCell::new(None) };
}

/// Get current worker information
#[allow(dead_code)]
pub fn current_worker_info() -> Option<WorkerInfo> {
    WORKER_INFO.with(|info| info.borrow().clone())
}

/// Set worker information for the current thread
#[allow(dead_code)]
pub fn set_worker_info(info: WorkerInfo) {
    WORKER_INFO.with(|cell| {
        *cell.borrow_mut() = Some(info);
    });
}

/// Extension methods for parallel iterators with thread pool integration
#[allow(dead_code)]
pub trait ParallelIteratorExt: ParallelIterator {
    /// Configure the number of threads for this operation
    fn with_threads(self, num_threads: usize) -> Self;

    /// Configure thread-local initialization
    fn with_thread_init<F>(self, init: F) -> Self
    where
        F: Fn() + Send + Sync + 'static;
}

/// Extension trait for arrays to use the shared thread pool
#[allow(dead_code)]
pub trait ThreadPoolArrayExt<T, D> {
    /// Apply a function to each element in parallel using the shared thread pool
    fn par_map_inplace<F>(&mut self, f: F)
    where
        F: Fn(&mut T) + Send + Sync;

    /// Apply a function to chunks in parallel
    fn par_chunks_mut<F>(&mut self, chunk_size: usize, f: F)
    where
        F: Fn(&mut [T]) + Send + Sync;
}
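
// A minimal sketch of how `ThreadPoolArrayExt` could be implemented, shown
// for `Vec<T>` with plain `std::thread::scope` workers. This is illustrative
// only: a real implementation would dispatch through the shared scirs2-core
// pool, and the chunking scheme below is an assumption, not the crate's API.
impl<T: Send> ThreadPoolArrayExt<T, ()> for Vec<T> {
    fn par_map_inplace<F>(&mut self, f: F)
    where
        F: Fn(&mut T) + Send + Sync,
    {
        // Derive a worker count from the shared configuration, falling back
        // to the machine's available parallelism.
        let workers = get_thread_pool_config()
            .num_threads
            .unwrap_or_else(|| std::thread::available_parallelism().map_or(1, |n| n.get()));
        let chunk_size = self.len().div_ceil(workers.max(1)).max(1);
        let f = &f;
        std::thread::scope(|scope| {
            for chunk in self.chunks_mut(chunk_size) {
                // Each scoped worker mutates a disjoint chunk in place.
                scope.spawn(move || chunk.iter_mut().for_each(f));
            }
        });
    }

    fn par_chunks_mut<F>(&mut self, chunk_size: usize, f: F)
    where
        F: Fn(&mut [T]) + Send + Sync,
    {
        let f = &f;
        std::thread::scope(|scope| {
            for chunk in self.chunks_mut(chunk_size.max(1)) {
                scope.spawn(move || f(chunk));
            }
        });
    }
}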

/// Thread pool aware execution context
#[allow(dead_code)]
pub struct ThreadPoolContext {
    config: ThreadPoolConfig,
}

impl ThreadPoolContext {
    pub fn new() -> Self {
        Self {
            config: get_thread_pool_config(),
        }
    }

    /// Execute a parallel operation with the configured thread pool
    pub fn execute_parallel<F, R>(&self, operation: F) -> R
    where
        F: FnOnce() -> R + Send,
        R: Send,
    {
        // In a real implementation, this would configure the thread pool
        // before executing the operation.
        operation()
    }

    /// Execute with a specific number of threads
    pub fn execute_with_threads<F, R>(&self, thread_count: usize, operation: F) -> R
    where
        F: FnOnce() -> R + Send,
        R: Send,
    {
        // Record the current thread count so a real implementation could
        // restore it after the operation.
        let _prev_threads = num_threads();
        // In a real implementation, the pool would be resized to
        // `thread_count` here and restored afterwards.
        let _ = thread_count;
        operation()
    }
}

/// Adaptive thread pool that adjusts its size based on workload: threads are
/// added while load exceeds `load_threshold` and removed once load falls
/// below half of it, which provides simple hysteresis
#[allow(dead_code)]
pub struct AdaptiveThreadPool {
    min_threads: usize,
    max_threads: usize,
    current_threads: Arc<Mutex<usize>>,
    load_threshold: f64,
}

impl AdaptiveThreadPool {
    pub fn new(min_threads: usize, max_threads: usize) -> Self {
        Self {
            min_threads,
            max_threads,
            current_threads: Arc::new(Mutex::new(min_threads)),
            load_threshold: 0.8,
        }
    }

    /// Adjust thread count based on current load
    pub fn adjust_threads(&self, current_load: f64) {
        let mut threads = self.current_threads.lock().unwrap();

        if current_load > self.load_threshold && *threads < self.max_threads {
            *threads = (*threads + 1).min(self.max_threads);
        } else if current_load < self.load_threshold * 0.5 && *threads > self.min_threads {
            *threads = (*threads - 1).max(self.min_threads);
        }
    }

    /// Get current thread count
    pub fn current_thread_count(&self) -> usize {
        *self.current_threads.lock().unwrap()
    }
}

/// Work-stealing queue for load balancing
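///
/// A minimal usage sketch (marked `ignore`): each worker pushes to and pops
/// from its own queue, falling back to stealing when that queue is empty.
///
/// ```ignore
/// let q: WorkStealingQueue<u32> = WorkStealingQueue::new(2);
/// q.push(0, 7);
/// // Worker 1 has no local work, so it steals worker 0's item.
/// assert_eq!(q.pop(1), Some(7));
/// ```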
#[allow(dead_code)]
pub struct WorkStealingQueue<T> {
    queues: Vec<Arc<Mutex<Vec<T>>>>,
}

impl<T: Send> WorkStealingQueue<T> {
    pub fn new(num_queues: usize) -> Self {
        let queues = (0..num_queues)
            .map(|_| Arc::new(Mutex::new(Vec::new())))
            .collect();

        Self { queues }
    }

    /// Push work to a specific queue
    pub fn push(&self, queue_id: usize, item: T) {
        if let Some(queue) = self.queues.get(queue_id) {
            queue.lock().unwrap().push(item);
        }
    }

    /// Try to pop from a queue, stealing from others if empty
    pub fn pop(&self, queue_id: usize) -> Option<T> {
        // Try own queue first (LIFO order)
        if let Some(queue) = self.queues.get(queue_id) {
            if let Some(item) = queue.lock().unwrap().pop() {
                return Some(item);
            }
        }

        // Fall back to stealing from the other queues
        for (i, queue) in self.queues.iter().enumerate() {
            if i != queue_id {
                if let Some(item) = queue.lock().unwrap().pop() {
                    return Some(item);
                }
            }
        }

        None
    }
}

/// Integration with scirs2-core parallel operations
#[allow(dead_code)]
pub fn configure_parallel_ops() {
    let config = get_thread_pool_config();

    // Configure based on thread pool settings
    if let Some(num_threads) = config.num_threads {
        // In a real implementation, this would configure the core parallel
        // ops thread count.
        let _ = num_threads;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_thread_pool_config() {
        let config = ThreadPoolConfig {
            num_threads: Some(4),
            stack_size: Some(4 * 1024 * 1024),
            thread_name_prefix: "test-worker".to_string(),
            pin_threads: true,
        };

        assert_eq!(config.num_threads, Some(4));
        assert_eq!(config.thread_name_prefix, "test-worker");
    }

    #[test]
    fn test_adaptive_thread_pool() {
        let pool = AdaptiveThreadPool::new(2, 8);

        assert_eq!(pool.current_thread_count(), 2);

        // High load should increase threads
        pool.adjust_threads(0.9);
        assert_eq!(pool.current_thread_count(), 3);

        // Low load should decrease threads
        pool.adjust_threads(0.3);
        assert_eq!(pool.current_thread_count(), 2);
    }

    #[test]
    fn test_work_stealing_queue() {
        let queue: WorkStealingQueue<i32> = WorkStealingQueue::new(2);

        // Push to queue 0
        queue.push(0, 1);
        queue.push(0, 2);

        // Pop from queue 0
        assert_eq!(queue.pop(0), Some(2));

        // Queue 1 steals from queue 0
        assert_eq!(queue.pop(1), Some(1));

        // Both empty now
        assert_eq!(queue.pop(0), None);
        assert_eq!(queue.pop(1), None);
    }
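
    // Additional coverage for pieces of this module that previously had no
    // tests; the assertions only rely on behavior visible above.

    #[test]
    fn test_worker_info_round_trip() {
        // Worker info is thread-local, so a set on this thread must be
        // visible to a subsequent get on the same thread.
        set_worker_info(WorkerInfo {
            thread_id: 0,
            num_workers: 4,
            cpu_affinity: None,
        });
        let info = current_worker_info().expect("worker info was just set");
        assert_eq!(info.thread_id, 0);
        assert_eq!(info.num_workers, 4);
    }

    #[test]
    fn test_init_thread_pool_is_once() {
        // The global config lives in a `OnceLock`, so at most one call can
        // succeed per process; a second call must report an error.
        let _ = init_thread_pool(ThreadPoolConfig::default());
        assert!(init_thread_pool(ThreadPoolConfig::default()).is_err());
    }

    #[test]
    fn test_thread_pool_context_passthrough() {
        // Both execution paths are currently pass-throughs, so they must
        // return the closure's result unchanged.
        let ctx = ThreadPoolContext::new();
        assert_eq!(ctx.execute_parallel(|| 21 * 2), 42);
        assert_eq!(ctx.execute_with_threads(4, || "done"), "done");
    }

    #[test]
    fn test_vec_par_map_inplace() {
        // Exercises the sketch `ThreadPoolArrayExt` impl for `Vec<T>` above.
        let mut data = vec![1, 2, 3, 4];
        data.par_map_inplace(|x| *x *= 2);
        assert_eq!(data, vec![2, 4, 6, 8]);
    }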
}