scirs2_linalg/parallel/advanced_work_stealing/mod.rs

//! Advanced work-stealing algorithms with intelligent scheduling
//!
//! This module provides sophisticated work-stealing implementations with
//! priority-based scheduling, predictive load balancing, and adaptive chunking.

use super::*;
use crate::parallel::numa::NumaTopology;
use std::cmp::Ordering as CmpOrdering;
use std::collections::{BinaryHeap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};

/// Work item with priority for the advanced scheduler
#[derive(Debug, Clone)]
pub struct PriorityWorkItem<T> {
    pub data: T,
    pub priority: u32,
    pub estimated_cost: Duration,
    pub dependencies: Vec<usize>,
    pub task_id: usize,
}

// Equality must agree with `Ord` below (which compares priority, then
// estimated cost), so both fields participate here.
impl<T> PartialEq for PriorityWorkItem<T> {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority && self.estimated_cost == other.estimated_cost
    }
}

impl<T> Eq for PriorityWorkItem<T> {}

impl<T> PartialOrd for PriorityWorkItem<T> {
    fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
        Some(self.cmp(other))
    }
}

impl<T> Ord for PriorityWorkItem<T> {
    fn cmp(&self, other: &Self) -> CmpOrdering {
        // Higher priority values are greater, so they surface first in the
        // max-heap; ties go to the task with the smaller estimated cost.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.estimated_cost.cmp(&self.estimated_cost))
    }
}
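
// A minimal ordering sanity sketch (illustrative only, not used elsewhere in
// this module): with equal priorities, the cheaper task pops from the
// max-heap first.
#[allow(dead_code)]
fn example_priority_ordering() {
    let mut heap = BinaryHeap::new();
    for (priority, ms) in [(10u32, 5u64), (80, 100), (80, 1)] {
        heap.push(PriorityWorkItem {
            data: (),
            priority,
            estimated_cost: Duration::from_millis(ms),
            dependencies: Vec::new(),
            task_id: 0,
        });
    }
    // Priority 80 with the 1 ms estimate wins the tie-break and pops first.
    let first = heap.pop().expect("heap is non-empty");
    assert_eq!(first.estimated_cost, Duration::from_millis(1));
}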

/// Advanced work-stealing queue with priority and prediction capabilities
pub struct AdvancedWorkStealingQueue<T> {
    /// High priority work items (processed first)
    high_priority: Mutex<BinaryHeap<PriorityWorkItem<T>>>,
    /// Normal priority work items
    normal_priority: Mutex<VecDeque<PriorityWorkItem<T>>>,
    /// Low priority work items (processed when idle)
    low_priority: Mutex<VecDeque<PriorityWorkItem<T>>>,
    /// Completion time history for prediction
    completion_history: Mutex<VecDeque<(usize, Duration)>>,
    /// Number of active workers
    #[allow(dead_code)]
    active_workers: AtomicUsize,
    /// Queue statistics
    stats: Mutex<WorkStealingStats>,
}

/// Statistics for work-stealing performance analysis
#[derive(Debug, Clone, Default)]
pub struct WorkStealingStats {
    pub tasks_completed: usize,
    pub successful_steals: usize,
    pub failed_steals: usize,
    pub average_completion_time: Duration,
    pub load_imbalance_ratio: f64,
    pub prediction_accuracy: f64,
}

impl<T> Default for AdvancedWorkStealingQueue<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> AdvancedWorkStealingQueue<T> {
    /// Create a new advanced work-stealing queue
    pub fn new() -> Self {
        Self {
            high_priority: Mutex::new(BinaryHeap::new()),
            normal_priority: Mutex::new(VecDeque::new()),
            low_priority: Mutex::new(VecDeque::new()),
            completion_history: Mutex::new(VecDeque::with_capacity(1000)),
            active_workers: AtomicUsize::new(0),
            stats: Mutex::new(WorkStealingStats::default()),
        }
    }

    /// Add a work item with automatic priority classification
    pub fn push(&self, item: T, estimated_cost: Duration, dependencies: Vec<usize>) -> usize {
        let task_id = self.generate_task_id();
        let priority = self.classify_priority(&estimated_cost, &dependencies);

        let work_item = PriorityWorkItem {
            data: item,
            priority,
            estimated_cost,
            dependencies,
            task_id,
        };

        match priority {
            0..=33 => {
                self.low_priority.lock().unwrap().push_back(work_item);
            }
            34..=66 => {
                self.normal_priority.lock().unwrap().push_back(work_item);
            }
            _ => {
                self.high_priority.lock().unwrap().push(work_item);
            }
        }

        task_id
    }

    /// Try to pop a work item using intelligent scheduling
    pub fn try_pop(&self) -> Option<PriorityWorkItem<T>> {
        // First try high priority tasks
        if let Ok(mut high_queue) = self.high_priority.try_lock() {
            if let Some(item) = high_queue.pop() {
                return Some(item);
            }
        }

        // Then try normal priority tasks
        if let Ok(mut normal_queue) = self.normal_priority.try_lock() {
            if let Some(item) = normal_queue.pop_front() {
                return Some(item);
            }
        }

        // Finally fall back to low priority tasks
        if let Ok(mut low_queue) = self.low_priority.try_lock() {
            if let Some(item) = low_queue.pop_front() {
                return Some(item);
            }
        }

        None
    }

    /// Attempt to steal work from this queue (called by other workers)
    pub fn try_steal(&self) -> Option<PriorityWorkItem<T>> {
        // Hold the stats lock for the whole attempt so the steal counters
        // stay consistent with the outcome.
        if let Ok(mut stats) = self.stats.try_lock() {
            // Steal from the back of the normal-priority queue first; taking
            // the opposite end from `try_pop` reduces contention with the owner.
            if let Ok(mut normal_queue) = self.normal_priority.try_lock() {
                if let Some(item) = normal_queue.pop_back() {
                    stats.successful_steals += 1;
                    return Some(item);
                }
            }

            // Then try low priority
            if let Ok(mut low_queue) = self.low_priority.try_lock() {
                if let Some(item) = low_queue.pop_back() {
                    stats.successful_steals += 1;
                    return Some(item);
                }
            }

            stats.failed_steals += 1;
        }

        None
    }

    /// Classify task priority based on cost and dependencies
    fn classify_priority(&self, estimated_cost: &Duration, dependencies: &[usize]) -> u32 {
        let base_priority: u32 = if estimated_cost.as_millis() > 100 {
            80 // high-cost tasks get high priority
        } else if estimated_cost.as_millis() > 10 {
            50 // medium-cost tasks
        } else {
            20 // low-cost tasks
        };

        // Adjust for dependencies (more dependencies = lower priority)
        let dependency_penalty = (dependencies.len() as u32 * 5).min(30);
        base_priority.saturating_sub(dependency_penalty)
    }

    /// Generate a unique task ID
    fn generate_task_id(&self) -> usize {
        static TASK_COUNTER: AtomicUsize = AtomicUsize::new(0);
        TASK_COUNTER.fetch_add(1, Ordering::Relaxed)
    }

    /// Record task completion for performance prediction
    pub fn record_completion(&self, task_id: usize, actual_duration: Duration) {
        if let Ok(mut history) = self.completion_history.try_lock() {
            history.push_back((task_id, actual_duration));

            // Keep history bounded
            if history.len() > 1000 {
                history.pop_front();
            }
        }
    }

    /// Get current queue statistics
    pub fn get_stats(&self) -> WorkStealingStats {
        self.stats.lock().unwrap().clone()
    }

    /// Get the estimated remaining work across all three queues
    pub fn estimated_remaining_work(&self) -> Duration {
        let high_count = self.high_priority.lock().unwrap().len();
        let normal_count = self.normal_priority.lock().unwrap().len();
        let low_count = self.low_priority.lock().unwrap().len();

        // Rough per-task estimates keyed to the priority bands
        Duration::from_millis((high_count * 100 + normal_count * 50 + low_count * 10) as u64)
    }
}
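
// A minimal end-to-end sketch of the queue (illustrative only): push a task,
// drain via pop-or-steal, and feed the measured duration back into the
// completion history. Real task execution is elided.
#[allow(dead_code)]
fn example_queue_roundtrip() {
    let queue: AdvancedWorkStealingQueue<u64> = AdvancedWorkStealingQueue::new();
    queue.push(42, Duration::from_millis(50), Vec::new());

    while let Some(item) = queue.try_pop().or_else(|| queue.try_steal()) {
        let start = Instant::now();
        let _ = item.data; // ... process the item here ...
        queue.record_completion(item.task_id, start.elapsed());
    }
}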

/// Matrix-specific adaptive chunking strategy
pub struct MatrixAdaptiveChunking {
    /// Cache line size for optimal memory access
    #[allow(dead_code)]
    cache_line_size: usize,
    /// NUMA node information
    #[allow(dead_code)]
    numa_info: Option<NumaTopology>,
    /// Historical performance data
    performance_history: Mutex<VecDeque<ChunkingPerformance>>,
}

#[derive(Debug, Clone)]
struct ChunkingPerformance {
    chunk_size: usize,
    matrix_dimensions: (usize, usize),
    throughput: f64, // operations per second
    #[allow(dead_code)]
    cache_misses: usize,
    #[allow(dead_code)]
    timestamp: Instant,
}

impl Default for MatrixAdaptiveChunking {
    fn default() -> Self {
        Self::new()
    }
}

impl MatrixAdaptiveChunking {
    /// Create a new adaptive chunking strategy
    pub fn new() -> Self {
        Self {
            cache_line_size: 64, // typical cache line size in bytes
            numa_info: Some(NumaTopology::detect()),
            performance_history: Mutex::new(VecDeque::with_capacity(100)),
        }
    }

    /// Calculate the optimal chunk size for a matrix operation
    pub fn optimal_chunk_size(
        &self,
        matrix_dims: (usize, usize),
        operation_type: MatrixOperation,
    ) -> usize {
        let (rows, cols) = matrix_dims;

        // Base chunk size calculation
        let base_chunk = match operation_type {
            MatrixOperation::MatrixMultiply => {
                // For matrix multiplication, consider cache blocking
                let l1_cache_size = 32 * 1024; // 32 KiB typical L1 data cache
                let element_size = std::mem::size_of::<f64>();
                let elements_per_cache = l1_cache_size / element_size;

                // Aim for square blocks that fit in cache
                ((elements_per_cache as f64).sqrt() as usize).clamp(32, 512)
            }
            MatrixOperation::ElementWise => {
                // For element-wise operations, optimize for memory bandwidth
                let memory_bandwidth = self.estimate_memory_bandwidth();
                (memory_bandwidth / 8).clamp(64, 1024) // 8 bytes per f64
            }
            MatrixOperation::Reduction => {
                // For reductions, use smaller chunks to balance load;
                // never return a zero-sized chunk for small matrices
                let num_cores = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(4);
                (rows.max(cols) / (num_cores * 4)).max(1)
            }
            MatrixOperation::Decomposition => {
                // For decompositions, use larger chunks for better locality
                let num_cores = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(4);
                (rows.min(cols) / num_cores.max(1)).max(1)
            }
        };

        // Adjust based on historical performance
        self.adjust_for_history(base_chunk, matrix_dims, operation_type)
    }

    /// Estimate memory bandwidth in MB/s (simplified)
    fn estimate_memory_bandwidth(&self) -> usize {
        // This is a simplified estimation - in practice, this would
        // involve actual benchmarking
        match std::env::var("SCIRS_MEMORY_BANDWIDTH") {
            Ok(val) => val.parse().unwrap_or(100_000), // MB/s
            Err(_) => 100_000, // default assumption: 100,000 MB/s (100 GB/s)
        }
    }

    /// Adjust chunk size based on historical performance
    fn adjust_for_history(
        &self,
        base_chunk: usize,
        matrix_dims: (usize, usize),
        _operation_type: MatrixOperation,
    ) -> usize {
        if let Ok(history) = self.performance_history.lock() {
            // Find operations on similar-sized matrices: each dimension
            // within a factor of two in either direction.
            let similar_ops: Vec<_> = history
                .iter()
                .filter(|perf| {
                    let (h_rows, h_cols) = perf.matrix_dimensions;
                    let row_ratio = h_rows as f64 / matrix_dims.0.max(1) as f64;
                    let col_ratio = h_cols as f64 / matrix_dims.1.max(1) as f64;
                    (0.5..2.0).contains(&row_ratio) && (0.5..2.0).contains(&col_ratio)
                })
                .collect();

            if !similar_ops.is_empty() {
                // Find the chunk size with the best throughput
                let best_perf = similar_ops
                    .iter()
                    .max_by(|a, b| a.throughput.partial_cmp(&b.throughput).unwrap());

                if let Some(best) = best_perf {
                    // Interpolate between the base chunk and the historical best
                    let weight = 0.7; // favor historical data
                    return (base_chunk as f64 * (1.0 - weight) + best.chunk_size as f64 * weight)
                        as usize;
                }
            }
        }

        base_chunk
    }

    /// Record performance data for future optimization
    pub fn record_performance(
        &self,
        chunk_size: usize,
        matrix_dims: (usize, usize),
        throughput: f64,
    ) {
        if let Ok(mut history) = self.performance_history.lock() {
            let perf = ChunkingPerformance {
                chunk_size,
                matrix_dimensions: matrix_dims,
                throughput,
                cache_misses: 0, // would be measured in practice
                timestamp: Instant::now(),
            };

            history.push_back(perf);

            // Keep history bounded
            if history.len() > 100 {
                history.pop_front();
            }
        }
    }
}
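
// A minimal feedback-loop sketch (illustrative only): ask for a chunk size,
// run the chunked kernel (elided), then record the measured throughput so
// later calls can interpolate toward whatever actually performed best.
#[allow(dead_code)]
fn example_adaptive_chunking() {
    let chunking = MatrixAdaptiveChunking::new();
    let dims = (1024, 1024);
    let chunk = chunking.optimal_chunk_size(dims, MatrixOperation::MatrixMultiply);

    // ... run the operation in `chunk`-sized blocks and measure ops/sec ...
    let measured_throughput = 1.0e9; // placeholder measurement
    chunking.record_performance(chunk, dims, measured_throughput);
}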

/// Types of matrix operations for chunking optimization
#[derive(Debug, Clone, Copy)]
pub enum MatrixOperation {
    MatrixMultiply,
    ElementWise,
    Reduction,
    Decomposition,
}

/// Predictive load balancer using machine learning-like predictions
pub struct PredictiveLoadBalancer {
    /// Historical execution times for different task types
    execution_history: Mutex<std::collections::HashMap<String, Vec<Duration>>>,
    /// Current load per worker
    worker_loads: Mutex<Vec<f64>>,
    /// Prediction model weights (simplified linear model)
    model_weights: Mutex<Vec<f64>>,
}

impl PredictiveLoadBalancer {
    /// Create a new predictive load balancer
    pub fn new(num_workers: usize) -> Self {
        Self {
            execution_history: Mutex::new(std::collections::HashMap::new()),
            worker_loads: Mutex::new(vec![0.0; num_workers]),
            model_weights: Mutex::new(vec![1.0; 4]), // simple 4-feature model
        }
    }

    /// Predict the execution time for a task
    pub fn predict_execution_time(&self, task_features: &TaskFeatures) -> Duration {
        let weights = self.model_weights.lock().unwrap();

        // Extract features
        let features = [
            task_features.data_size as f64,
            task_features.complexity_factor,
            task_features.memory_access_pattern as f64,
            task_features.arithmetic_intensity,
        ];

        // Simple linear prediction
        let predicted_ms = features
            .iter()
            .zip(weights.iter())
            .map(|(f, w)| f * w)
            .sum::<f64>()
            .max(1.0); // minimum 1 ms

        Duration::from_millis(predicted_ms as u64)
    }

    /// Assign a task to the optimal worker based on predicted load
    pub fn assign_task(&self, task_features: &TaskFeatures) -> usize {
        let predicted_time = self.predict_execution_time(task_features);
        let mut loads = self.worker_loads.lock().unwrap();

        // Find the worker with the minimum predicted finish time
        let (best_worker, _) = loads
            .iter()
            .enumerate()
            .min_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap())
            .unwrap();

        // Update the predicted load
        loads[best_worker] += predicted_time.as_secs_f64();

        best_worker
    }

    /// Update the model with the actual execution time
    pub fn update_model(&self, task_features: &TaskFeatures, actual_time: Duration) {
        // Record the execution time under a coarse task-type key
        let task_type = format!(
            "{}_{}",
            task_features.data_size, task_features.complexity_factor as u32
        );

        if let Ok(mut history) = self.execution_history.lock() {
            history
                .entry(task_type)
                .or_insert_with(Vec::new)
                .push(actual_time);
        }

        // Simple model update (in practice, would use more sophisticated ML)
        self.update_weights(task_features, actual_time);
    }

    /// Update a worker's load (when a task completes)
    pub fn update_worker_load(&self, worker_id: usize, completed_time: Duration) {
        if let Ok(mut loads) = self.worker_loads.lock() {
            if worker_id < loads.len() {
                loads[worker_id] -= completed_time.as_secs_f64();
                loads[worker_id] = loads[worker_id].max(0.0);
            }
        }
    }

    /// Simple weight update using a gradient descent-like approach
    fn update_weights(&self, task_features: &TaskFeatures, actual_time: Duration) {
        let predicted_time = self.predict_execution_time(task_features);
        let error = actual_time.as_secs_f64() - predicted_time.as_secs_f64();

        if let Ok(mut weights) = self.model_weights.lock() {
            let learning_rate = 0.001;
            let features = [
                task_features.data_size as f64,
                task_features.complexity_factor,
                task_features.memory_access_pattern as f64,
                task_features.arithmetic_intensity,
            ];

            // Update weights in the direction that reduces the error
            for (weight, feature) in weights.iter_mut().zip(features.iter()) {
                *weight += learning_rate * error * feature;
            }
        }
    }
}
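
// A minimal predict -> assign -> observe -> update sketch (illustrative only).
// `actual` stands in for a measured duration from the worker that ran the task.
#[allow(dead_code)]
fn example_predictive_balancing() {
    let balancer = PredictiveLoadBalancer::new(8);
    let features = TaskFeatures::for_matrix_operation((512, 512), MatrixOperation::ElementWise);

    let worker = balancer.assign_task(&features);
    let actual = Duration::from_millis(12); // placeholder measurement
    balancer.update_model(&features, actual);
    balancer.update_worker_load(worker, actual);
}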

/// Features describing a computational task for prediction
#[derive(Debug, Clone)]
pub struct TaskFeatures {
    pub data_size: usize,
    pub complexity_factor: f64,
    pub memory_access_pattern: u32, // 0 = sequential, 1 = random, 2 = strided
    pub arithmetic_intensity: f64,  // operations per byte
}

impl TaskFeatures {
    /// Create task features for a matrix operation
    pub fn for_matrix_operation(matrix_dims: (usize, usize), operation: MatrixOperation) -> Self {
        let (rows, cols) = matrix_dims;
        let data_size = rows * cols;

        let (complexity_factor, memory_pattern, arithmetic_intensity) = match operation {
            MatrixOperation::MatrixMultiply => {
                // Simplified quadratic cost proxy (a full n*m*k multiply is
                // cubic); random access, 2 ops per element
                (rows as f64 * cols as f64 * 2.0, 1, 2.0)
            }
            MatrixOperation::ElementWise => {
                (data_size as f64, 0, 1.0) // O(n) complexity, sequential access, 1 op per element
            }
            MatrixOperation::Reduction => {
                (data_size as f64, 0, 1.0) // O(n) complexity, sequential access
            }
            MatrixOperation::Decomposition => {
                (data_size as f64 * 1.5, 2, 3.0) // higher complexity, strided access
            }
        };

        Self {
            data_size,
            complexity_factor,
            memory_access_pattern: memory_pattern,
            arithmetic_intensity,
        }
    }
}
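
#[cfg(test)]
mod tests {
    use super::*;

    // Sanity check of the classifier and pop order: with the thresholds in
    // `classify_priority`, a >100 ms dependency-free task lands in the
    // high-priority heap and should pop before a cheap one.
    #[test]
    fn high_cost_tasks_pop_first() {
        let queue = AdvancedWorkStealingQueue::new();
        queue.push("cheap", Duration::from_millis(1), Vec::new());
        queue.push("expensive", Duration::from_millis(500), Vec::new());

        let first = queue.try_pop().expect("queue is non-empty");
        assert_eq!(first.data, "expensive");
    }

    // The balancer should spread identical tasks across workers rather than
    // piling them all on worker 0.
    #[test]
    fn assignments_rotate_across_workers() {
        let balancer = PredictiveLoadBalancer::new(2);
        let features = TaskFeatures::for_matrix_operation((64, 64), MatrixOperation::ElementWise);
        let a = balancer.assign_task(&features);
        let b = balancer.assign_task(&features);
        assert_ne!(a, b);
    }
}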