// scirs2_linalg/parallel/advanced_work_stealing/mod.rs

use super::*;
use crate::parallel::numa::NumaTopology;
use std::cmp::Ordering as CmpOrdering;
use std::collections::{BinaryHeap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// A unit of work annotated with scheduling metadata: a priority score,
/// an estimated execution cost, and the identifiers of tasks it depends on.
#[derive(Debug, Clone)]
pub struct PriorityWorkItem<T> {
    pub data: T,
    pub priority: u32,
    pub estimated_cost: Duration,
    pub dependencies: Vec<usize>,
    pub task_id: usize,
}

impl<T> PartialEq for PriorityWorkItem<T> {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl<T> Eq for PriorityWorkItem<T> {}

impl<T> PartialOrd for PriorityWorkItem<T> {
    fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
        Some(self.cmp(other))
    }
}

impl<T> Ord for PriorityWorkItem<T> {
    fn cmp(&self, other: &Self) -> CmpOrdering {
        // Order by priority; on ties, the task with the smaller estimated
        // cost compares as greater, so a max-heap pops it first.
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.estimated_cost.cmp(&self.estimated_cost))
    }
}
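
// Illustrative sketch, not part of the original module: a small test showing
// how the `Ord` impl above behaves inside a `BinaryHeap` (higher priority
// first, then smaller estimated cost on ties). Module and test names are
// arbitrary.
#[cfg(test)]
mod priority_order_sketch {
    use super::*;

    #[test]
    fn heap_orders_by_priority_then_cost() {
        let item = |priority: u32, ms: u64, task_id: usize| PriorityWorkItem {
            data: (),
            priority,
            estimated_cost: Duration::from_millis(ms),
            dependencies: Vec::new(),
            task_id,
        };

        let mut heap = BinaryHeap::new();
        heap.push(item(50, 10, 0));
        heap.push(item(80, 10, 1)); // highest priority tier
        heap.push(item(80, 1, 2)); // same priority, cheaper -> wins the tie

        assert_eq!(heap.pop().unwrap().task_id, 2);
        assert_eq!(heap.pop().unwrap().task_id, 1);
        assert_eq!(heap.pop().unwrap().task_id, 0);
    }
}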

/// A multi-level work-stealing queue. Tasks are routed into a high-priority
/// heap, a normal FIFO deque, or a low-priority deque based on a simple
/// cost/dependency heuristic; thieves steal from the back of the deques.
pub struct AdvancedWorkStealingQueue<T> {
    /// High-priority tasks, ordered by the `PriorityWorkItem` comparator.
    high_priority: Mutex<BinaryHeap<PriorityWorkItem<T>>>,
    /// Normal-priority tasks, popped FIFO by the owner, stolen from the back.
    normal_priority: Mutex<VecDeque<PriorityWorkItem<T>>>,
    /// Low-priority tasks, popped FIFO by the owner, stolen from the back.
    low_priority: Mutex<VecDeque<PriorityWorkItem<T>>>,
    /// Recent `(task_id, measured duration)` pairs, capped at 1000 entries.
    completion_history: Mutex<VecDeque<(usize, Duration)>>,
    #[allow(dead_code)]
    active_workers: AtomicUsize,
    stats: Mutex<WorkStealingStats>,
}

/// Aggregate statistics for the work-stealing queue.
#[derive(Debug, Clone, Default)]
pub struct WorkStealingStats {
    pub tasks_completed: usize,
    pub successful_steals: usize,
    pub failed_steals: usize,
    pub average_completion_time: Duration,
    pub load_imbalance_ratio: f64,
    pub prediction_accuracy: f64,
}

impl<T> Default for AdvancedWorkStealingQueue<T> {
    fn default() -> Self {
        Self::new()
    }
}

impl<T> AdvancedWorkStealingQueue<T> {
    /// Creates an empty queue with a bounded completion history.
    pub fn new() -> Self {
        Self {
            high_priority: Mutex::new(BinaryHeap::new()),
            normal_priority: Mutex::new(VecDeque::new()),
            low_priority: Mutex::new(VecDeque::new()),
            completion_history: Mutex::new(VecDeque::with_capacity(1000)),
            active_workers: AtomicUsize::new(0),
            stats: Mutex::new(WorkStealingStats::default()),
        }
    }

    /// Enqueues a task, classifying it into one of the three priority tiers
    /// from its estimated cost and dependency count. Returns the task id.
    pub fn push(&self, item: T, estimated_cost: Duration, dependencies: Vec<usize>) -> usize {
        let task_id = self.generate_task_id();
        let priority = self.classify_priority(&estimated_cost, &dependencies);

        let work_item = PriorityWorkItem {
            data: item,
            priority,
            estimated_cost,
            dependencies,
            task_id,
        };

        // Route the item to the tier matching its priority score.
        match priority {
            0..=33 => {
                self.low_priority.lock().unwrap().push_back(work_item);
            }
            34..=66 => {
                self.normal_priority.lock().unwrap().push_back(work_item);
            }
            _ => {
                self.high_priority.lock().unwrap().push(work_item);
            }
        }

        task_id
    }

    /// Pops the next task for the owning worker, draining tiers from high to
    /// low priority. Uses `try_lock` so a contended tier is simply skipped.
    pub fn try_pop(&self) -> Option<PriorityWorkItem<T>> {
        if let Ok(mut high_queue) = self.high_priority.try_lock() {
            if let Some(item) = high_queue.pop() {
                return Some(item);
            }
        }

        if let Ok(mut normal_queue) = self.normal_priority.try_lock() {
            if let Some(item) = normal_queue.pop_front() {
                return Some(item);
            }
        }

        if let Ok(mut low_queue) = self.low_priority.try_lock() {
            if let Some(item) = low_queue.pop_front() {
                return Some(item);
            }
        }

        None
    }

    /// Attempts to steal a task on behalf of another worker. Steals from the
    /// back of the normal and low tiers (the opposite end from the owner) and
    /// leaves the high-priority heap to the owning worker.
    pub fn try_steal(&self) -> Option<PriorityWorkItem<T>> {
        if let Ok(mut stats) = self.stats.try_lock() {
            if let Ok(mut normal_queue) = self.normal_priority.try_lock() {
                if let Some(item) = normal_queue.pop_back() {
                    stats.successful_steals += 1;
                    return Some(item);
                }
            }

            if let Ok(mut low_queue) = self.low_priority.try_lock() {
                if let Some(item) = low_queue.pop_back() {
                    stats.successful_steals += 1;
                    return Some(item);
                }
            }

            stats.failed_steals += 1;
        }

        None
    }

    /// Maps an estimated cost and dependency list to a priority score in
    /// `0..=80`: expensive tasks score higher, and each dependency subtracts
    /// 5 points (capped at 30) so heavily dependent tasks are deferred.
    fn classify_priority(&self, estimated_cost: &Duration, dependencies: &[usize]) -> u32 {
        let base_priority: u32 = if estimated_cost.as_millis() > 100 {
            80
        } else if estimated_cost.as_millis() > 10 {
            50
        } else {
            20
        };

        let dependency_penalty = (dependencies.len() as u32 * 5).min(30);
        base_priority.saturating_sub(dependency_penalty)
    }

    /// Produces a process-wide unique, monotonically increasing task id.
    fn generate_task_id(&self) -> usize {
        static TASK_COUNTER: AtomicUsize = AtomicUsize::new(0);
        TASK_COUNTER.fetch_add(1, Ordering::Relaxed)
    }

    /// Records the measured duration of a finished task, keeping only the
    /// most recent 1000 completions.
    pub fn record_completion(&self, task_id: usize, actual_duration: Duration) {
        if let Ok(mut history) = self.completion_history.try_lock() {
            history.push_back((task_id, actual_duration));

            if history.len() > 1000 {
                history.pop_front();
            }
        }
    }

    /// Returns a snapshot of the current work-stealing statistics.
    pub fn get_stats(&self) -> WorkStealingStats {
        self.stats.lock().unwrap().clone()
    }

    /// Rough estimate of queued work, weighting tiers at 100/50/10 ms per
    /// task for high/normal/low priority respectively.
    pub fn estimated_remaining_work(&self) -> Duration {
        let high_count = self.high_priority.lock().unwrap().len();
        let normal_count = self.normal_priority.lock().unwrap().len();
        let low_count = self.low_priority.lock().unwrap().len();

        Duration::from_millis((high_count * 100 + normal_count * 50 + low_count * 10) as u64)
    }
}
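
// Illustrative sketch, not part of the original module: the intended
// push / try_pop / record_completion flow for a single worker. The task
// labels, durations, and test names below are arbitrary.
#[cfg(test)]
mod queue_usage_sketch {
    use super::*;

    #[test]
    fn long_tasks_are_popped_before_short_ones() {
        let queue: AdvancedWorkStealingQueue<&str> = AdvancedWorkStealingQueue::new();

        // > 100 ms with no dependencies classifies as high priority (80);
        // <= 10 ms classifies as low priority (20).
        let long_id = queue.push("gemm tile", Duration::from_millis(200), Vec::new());
        let _short_id = queue.push("axpy chunk", Duration::from_millis(1), Vec::new());

        // The owner drains the high-priority tier first.
        let first = queue.try_pop().expect("queue is not empty");
        assert_eq!(first.task_id, long_id);

        // Report the measured runtime so the completion history stays useful.
        queue.record_completion(long_id, Duration::from_millis(180));
        assert!(queue.estimated_remaining_work() > Duration::ZERO);
    }
}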

/// Chooses matrix chunk (tile) sizes adaptively: a per-operation heuristic,
/// refined by a short history of observed throughput.
pub struct MatrixAdaptiveChunking {
    #[allow(dead_code)]
    cache_linesize: usize,
    #[allow(dead_code)]
    numa_info: Option<NumaTopology>,
    /// Recent chunking observations, capped at 100 entries.
    performance_history: Mutex<VecDeque<ChunkingPerformance>>,
}

/// One observed data point relating a chunk size to achieved throughput.
#[derive(Debug, Clone)]
struct ChunkingPerformance {
    chunksize: usize,
    matrix_dimensions: (usize, usize),
    throughput: f64,
    #[allow(dead_code)]
    cache_misses: usize,
    #[allow(dead_code)]
    timestamp: Instant,
}

impl Default for MatrixAdaptiveChunking {
    fn default() -> Self {
        Self::new()
    }
}

impl MatrixAdaptiveChunking {
    /// Creates a chunker with a 64-byte cache-line assumption and the
    /// detected NUMA topology.
    pub fn new() -> Self {
        Self {
            cache_linesize: 64,
            numa_info: Some(NumaTopology::detect()),
            performance_history: Mutex::new(VecDeque::with_capacity(100)),
        }
    }

    /// Returns a suggested chunk size for the given matrix dimensions and
    /// operation, combining a per-operation heuristic with past observations.
    pub fn optimal_chunksize(
        &self,
        matrix_dims: (usize, usize),
        operation_type: MatrixOperation,
    ) -> usize {
        let (rows, cols) = matrix_dims;

        let base_chunk = match operation_type {
            MatrixOperation::MatrixMultiply => {
                // Size square tiles so one tile fits in a 32 KiB L1 data cache.
                let l1_cachesize = 32 * 1024;
                let elementsize = std::mem::size_of::<f64>();
                let elements_per_cache = l1_cachesize / elementsize;

                ((elements_per_cache as f64).sqrt() as usize).clamp(32, 512)
            }
            MatrixOperation::ElementWise => {
                // Bandwidth-bound: scale the chunk with estimated memory bandwidth.
                let memory_bandwidth = self.estimate_memory_bandwidth();
                (memory_bandwidth / 8).clamp(64, 1024)
            }
            MatrixOperation::Reduction => {
                // Split the longer dimension into several chunks per core;
                // guard against a zero chunk size for small matrices.
                let num_cores = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(4);
                (rows.max(cols) / (num_cores * 4)).max(1)
            }
            MatrixOperation::Decomposition => {
                // One chunk of the shorter dimension per core, at least 1.
                let num_cores = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(4);
                (rows.min(cols) / num_cores.max(1)).max(1)
            }
        };

        self.adjust_for_history(base_chunk, matrix_dims, operation_type)
    }

    /// Estimated memory bandwidth, read from the `SCIRS_MEMORY_BANDWIDTH`
    /// environment variable if set, otherwise a fixed default of 100_000.
    fn estimate_memory_bandwidth(&self) -> usize {
        match std::env::var("SCIRS_MEMORY_BANDWIDTH") {
            Ok(val) => val.parse().unwrap_or(100_000),
            Err(_) => 100_000,
        }
    }

    /// Blends the heuristic chunk size with the best-performing chunk size
    /// observed on matrices of comparable dimensions (weighted 70% history,
    /// 30% heuristic). Falls back to the heuristic when no history matches.
    fn adjust_for_history(
        &self,
        base_chunk: usize,
        matrix_dims: (usize, usize),
        _operation_type: MatrixOperation,
    ) -> usize {
        if let Ok(history) = self.performance_history.lock() {
            // Keep only observations whose dimensions are within a 2x ratio
            // of the requested matrix.
            let similar_ops: Vec<_> = history
                .iter()
                .filter(|perf| {
                    let (h_rows, h_cols) = perf.matrix_dimensions;
                    (h_rows as f64 / matrix_dims.0 as f64).abs() < 2.0
                        && (h_cols as f64 / matrix_dims.1 as f64).abs() < 2.0
                })
                .collect();

            if !similar_ops.is_empty() {
                let best_perf = similar_ops
                    .iter()
                    .max_by(|a, b| a.throughput.partial_cmp(&b.throughput).unwrap());

                if let Some(best) = best_perf {
                    let weight = 0.7;
                    return (base_chunk as f64 * (1.0 - weight) + best.chunksize as f64 * weight)
                        as usize;
                }
            }
        }

        base_chunk
    }

    /// Records an observed `(chunk size, dimensions, throughput)` data point,
    /// keeping only the most recent 100 observations.
    pub fn record_performance(
        &self,
        chunksize: usize,
        matrix_dims: (usize, usize),
        throughput: f64,
    ) {
        if let Ok(mut history) = self.performance_history.lock() {
            let perf = ChunkingPerformance {
                chunksize,
                matrix_dimensions: matrix_dims,
                throughput,
                cache_misses: 0,
                timestamp: Instant::now(),
            };

            history.push_back(perf);

            if history.len() > 100 {
                history.pop_front();
            }
        }
    }
}

/// The broad classes of matrix operation the chunker distinguishes between.
#[derive(Debug, Clone, Copy)]
pub enum MatrixOperation {
    MatrixMultiply,
    ElementWise,
    Reduction,
    Decomposition,
}
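
// Illustrative sketch, not part of the original module: query a chunk size,
// run the (omitted) chunked kernel, then report the measured throughput so
// later queries can adapt. Names and the throughput value are arbitrary.
#[cfg(test)]
mod chunking_usage_sketch {
    use super::*;

    #[test]
    fn chunk_size_query_and_feedback() {
        let chunker = MatrixAdaptiveChunking::new();
        let dims = (1024, 1024);

        let chunk = chunker.optimal_chunksize(dims, MatrixOperation::MatrixMultiply);
        // Matrix-multiply tiles are clamped to the 32..=512 range.
        assert!((32..=512).contains(&chunk));

        // ... run the kernel with `chunk`-sized tiles, measure throughput ...
        chunker.record_performance(chunk, dims, 1.5e9);
    }
}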

/// Predicts task execution times with a small linear model and assigns tasks
/// to the currently least-loaded worker.
pub struct PredictiveLoadBalancer {
    /// Measured durations grouped by a coarse task-type key.
    execution_history: Mutex<std::collections::HashMap<String, Vec<Duration>>>,
    /// Estimated outstanding work (in seconds) per worker.
    worker_loads: Mutex<Vec<f64>>,
    /// Weights of the linear prediction model, one per task feature.
    model_weights: Mutex<Vec<f64>>,
}

impl PredictiveLoadBalancer {
    /// Creates a balancer for `num_workers` workers with uniform initial
    /// model weights.
    pub fn new(num_workers: usize) -> Self {
        Self {
            execution_history: Mutex::new(std::collections::HashMap::new()),
            worker_loads: Mutex::new(vec![0.0; num_workers]),
            model_weights: Mutex::new(vec![1.0; 4]),
        }
    }

    /// Predicts a task's execution time as a weighted sum of its features,
    /// clamped to at least one millisecond.
    pub fn predict_execution_time(&self, task_features: &TaskFeatures) -> Duration {
        let weights = self.model_weights.lock().unwrap();

        let features = [
            task_features.datasize as f64,
            task_features.complexity_factor,
            task_features.memory_access_pattern as f64,
            task_features.arithmetic_intensity,
        ];

        let predicted_ms = features
            .iter()
            .zip(weights.iter())
            .map(|(f, w)| f * w)
            .sum::<f64>()
            .max(1.0);

        Duration::from_millis(predicted_ms as u64)
    }

    /// Assigns a task to the worker with the smallest estimated outstanding
    /// load and charges that worker with the task's predicted time.
    pub fn assign_task(&self, task_features: &TaskFeatures) -> usize {
        let predicted_time = self.predict_execution_time(task_features);
        let mut loads = self.worker_loads.lock().unwrap();

        let (best_worker, _) = loads
            .iter()
            .enumerate()
            .min_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap())
            .unwrap();

        loads[best_worker] += predicted_time.as_secs_f64();

        best_worker
    }

    /// Records a measured execution time under a coarse task-type key and
    /// nudges the prediction model toward the observed value.
    pub fn update_model(&self, task_features: &TaskFeatures, actual_time: Duration) {
        let task_type = format!(
            "{}_{}",
            task_features.datasize, task_features.complexity_factor as u32
        );

        if let Ok(mut history) = self.execution_history.lock() {
            history
                .entry(task_type)
                .or_insert_with(Vec::new)
                .push(actual_time);
        }

        self.update_weights(task_features, actual_time);
    }

    /// Credits a worker for completed work, never letting its load estimate
    /// drop below zero.
    pub fn update_worker_load(&self, worker_id: usize, completed_time: Duration) {
        if let Ok(mut loads) = self.worker_loads.lock() {
            if worker_id < loads.len() {
                loads[worker_id] -= completed_time.as_secs_f64();
                loads[worker_id] = loads[worker_id].max(0.0);
            }
        }
    }

    /// Simple online gradient step: moves each weight in proportion to the
    /// prediction error and the corresponding feature value.
    fn update_weights(&self, task_features: &TaskFeatures, actual_time: Duration) {
        let predicted_time = self.predict_execution_time(task_features);
        let error = actual_time.as_secs_f64() - predicted_time.as_secs_f64();

        if let Ok(mut weights) = self.model_weights.lock() {
            let learning_rate = 0.001;
            let features = [
                task_features.datasize as f64,
                task_features.complexity_factor,
                task_features.memory_access_pattern as f64,
                task_features.arithmetic_intensity,
            ];

            for (weight, feature) in weights.iter_mut().zip(features.iter()) {
                *weight += learning_rate * error * feature;
            }
        }
    }
}
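
// Illustrative sketch, not part of the original module: the intended
// predict -> assign -> report-back cycle. The feature values and durations
// below are arbitrary.
#[cfg(test)]
mod load_balancer_usage_sketch {
    use super::*;

    #[test]
    fn assign_then_feed_back_measured_time() {
        let balancer = PredictiveLoadBalancer::new(4);
        let features = TaskFeatures {
            datasize: 4096,
            complexity_factor: 2.0,
            memory_access_pattern: 1,
            arithmetic_intensity: 2.0,
        };

        // Pick the least-loaded worker for this task.
        let worker = balancer.assign_task(&features);
        assert!(worker < 4);

        // After the task finishes, report reality to the model and the loads.
        let measured = Duration::from_millis(250);
        balancer.update_model(&features, measured);
        balancer.update_worker_load(worker, measured);
    }
}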

/// Features of a task used by the prediction model.
#[derive(Debug, Clone)]
pub struct TaskFeatures {
    /// Number of elements the task touches.
    pub datasize: usize,
    /// Rough measure of the amount of work (e.g. an operation count).
    pub complexity_factor: f64,
    /// Coarse memory-access-pattern class used as a model feature.
    pub memory_access_pattern: u32,
    /// Approximate arithmetic operations per memory access.
    pub arithmetic_intensity: f64,
}

impl TaskFeatures {
    /// Derives task features from matrix dimensions and the operation kind.
    pub fn formatrix_operation(matrix_dims: (usize, usize), operation: MatrixOperation) -> Self {
        let (rows, cols) = matrix_dims;
        let datasize = rows * cols;

        // (complexity, memory-access class, arithmetic intensity) per operation.
        let (complexity_factor, memory_pattern, arithmetic_intensity) = match operation {
            MatrixOperation::MatrixMultiply => (rows as f64 * cols as f64 * 2.0, 1, 2.0),
            MatrixOperation::ElementWise => (datasize as f64, 0, 1.0),
            MatrixOperation::Reduction => (datasize as f64, 0, 1.0),
            MatrixOperation::Decomposition => (datasize as f64 * 1.5, 2, 3.0),
        };

        Self {
            datasize,
            complexity_factor,
            memory_access_pattern: memory_pattern,
            arithmetic_intensity,
        }
    }
}
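
// Illustrative sketch, not part of the original module: features derived by
// `formatrix_operation` scale with the operation kind, so a GEMM on the same
// matrix is modelled as more work than an element-wise pass.
#[cfg(test)]
mod task_features_sketch {
    use super::*;

    #[test]
    fn features_scale_with_operation_kind() {
        let gemm = TaskFeatures::formatrix_operation((512, 512), MatrixOperation::MatrixMultiply);
        let axpy = TaskFeatures::formatrix_operation((512, 512), MatrixOperation::ElementWise);

        assert_eq!(gemm.datasize, 512 * 512);
        // GEMM is modelled as ~2 * rows * cols work vs rows * cols for
        // element-wise operations.
        assert!(gemm.complexity_factor > axpy.complexity_factor);
    }
}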