use super::*;
use crate::parallel::numa::NumaTopology;
use std::cmp::Ordering as CmpOrdering;
use std::collections::{BinaryHeap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
/// A unit of work tagged with scheduling metadata.
///
/// Ordered by `priority` (see the `Ord` impl) so items can live in a
/// max-heap lane of the queue.
#[derive(Debug, Clone)]
pub struct PriorityWorkItem<T> {
    /// The payload to execute.
    pub data: T,
    /// Scheduling priority; higher values are dequeued first.
    pub priority: u32,
    /// Predicted execution time; used for classification and tie-breaking.
    pub estimated_cost: Duration,
    /// Ids of tasks this item waits on; each one lowers the priority
    /// (see `classify_priority`).
    pub dependencies: Vec<usize>,
    /// Unique id assigned by the queue at push time.
    pub task_id: usize,
}
impl<T> PartialEq for PriorityWorkItem<T> {
fn eq(&self, other: &Self) -> bool {
self.priority == other.priority
}
}
// `eq` is a total equivalence relation on the compared fields, so the
// `Eq` marker is sound.
impl<T> Eq for PriorityWorkItem<T> {}
impl<T> PartialOrd for PriorityWorkItem<T> {
    /// Delegates to `Ord` (the canonical pattern), so the partial and
    /// total orderings can never disagree.
    fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
        Some(self.cmp(other))
    }
}
impl<T> Ord for PriorityWorkItem<T> {
    /// Orders by ascending `priority`, so a `BinaryHeap` (a max-heap)
    /// pops the highest-priority item first. Ties break on
    /// `estimated_cost` with the operands deliberately reversed: among
    /// equal priorities the *cheaper* task compares greater, and is
    /// therefore popped first.
    fn cmp(&self, other: &Self) -> CmpOrdering {
        self.priority
            .cmp(&other.priority)
            .then_with(|| other.estimated_cost.cmp(&self.estimated_cost))
    }
}
/// A three-tier priority work queue with lock-based lanes.
///
/// High-priority items live in a max-heap; the normal and low lanes are
/// FIFO deques that owners pop from the front and stealers take from the
/// back. `Mutex` comes in via `use super::*` — presumably
/// `std::sync::Mutex`; confirm in the parent module.
pub struct AdvancedWorkStealingQueue<T> {
    /// Max-heap lane for the highest-priority items (see `push`).
    high_priority: Mutex<BinaryHeap<PriorityWorkItem<T>>>,
    /// FIFO lane for mid-priority (34..=66) items.
    normal_priority: Mutex<VecDeque<PriorityWorkItem<T>>>,
    /// FIFO lane for low-priority (0..=33) items.
    low_priority: Mutex<VecDeque<PriorityWorkItem<T>>>,
    /// Ring of recent `(task_id, duration)` completions, capped at 1000.
    completion_history: Mutex<VecDeque<(usize, Duration)>>,
    #[allow(dead_code)]
    active_workers: AtomicUsize,
    /// Aggregate steal/completion statistics returned by `get_stats`.
    stats: Mutex<WorkStealingStats>,
}
/// Aggregate counters describing queue behavior; snapshot via `get_stats`.
#[derive(Debug, Clone, Default)]
pub struct WorkStealingStats {
    /// Number of tasks recorded as completed.
    pub tasks_completed: usize,
    /// Steal attempts that yielded an item (incremented in `try_steal`).
    pub successful_steals: usize,
    /// Steal attempts that found nothing to take.
    pub failed_steals: usize,
    /// Running mean of recorded completion durations.
    pub average_completion_time: Duration,
    /// Load spread across workers — NOTE(review): not computed in this
    /// chunk; confirm where it is maintained.
    pub load_imbalance_ratio: f64,
    /// Cost-prediction accuracy — NOTE(review): not computed in this
    /// chunk; confirm where it is maintained.
    pub prediction_accuracy: f64,
}
impl<T> Default for AdvancedWorkStealingQueue<T> {
    /// Equivalent to [`AdvancedWorkStealingQueue::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl<T> AdvancedWorkStealingQueue<T> {
    /// Creates an empty queue with all three lanes and zeroed stats.
    pub fn new() -> Self {
        Self {
            high_priority: Mutex::new(BinaryHeap::new()),
            normal_priority: Mutex::new(VecDeque::new()),
            low_priority: Mutex::new(VecDeque::new()),
            completion_history: Mutex::new(VecDeque::with_capacity(1000)),
            active_workers: AtomicUsize::new(0),
            stats: Mutex::new(WorkStealingStats::default()),
        }
    }

    /// Enqueues `item`, routing it into a lane by classified priority.
    /// Returns the freshly assigned task id.
    pub fn push(&self, item: T, estimatedcost: Duration, dependencies: Vec<usize>) -> usize {
        let task_id = self.generate_task_id();
        let priority = self.classify_priority(&estimatedcost, &dependencies);
        let work_item = PriorityWorkItem {
            data: item,
            priority,
            estimated_cost: estimatedcost,
            dependencies,
            task_id,
        };
        // Lane thresholds mirror the 20/50/80 bases in `classify_priority`.
        match priority {
            0..=33 => {
                self.low_priority
                    .lock()
                    .expect("low-priority lane lock poisoned")
                    .push_back(work_item);
            }
            34..=66 => {
                self.normal_priority
                    .lock()
                    .expect("normal-priority lane lock poisoned")
                    .push_back(work_item);
            }
            _ => {
                self.high_priority
                    .lock()
                    .expect("high-priority lane lock poisoned")
                    .push(work_item);
            }
        }
        task_id
    }

    /// Owner-side pop: tries lanes from high to low, using `try_lock` so a
    /// contended lane is skipped rather than waited on.
    pub fn try_pop(&self) -> Option<PriorityWorkItem<T>> {
        if let Ok(mut high_queue) = self.high_priority.try_lock() {
            if let Some(item) = high_queue.pop() {
                return Some(item);
            }
        }
        if let Ok(mut normal_queue) = self.normal_priority.try_lock() {
            if let Some(item) = normal_queue.pop_front() {
                return Some(item);
            }
        }
        if let Ok(mut low_queue) = self.low_priority.try_lock() {
            if let Some(item) = low_queue.pop_front() {
                return Some(item);
            }
        }
        None
    }

    /// Thief-side pop: steals from the *back* of the normal/low lanes
    /// (opposite end from the owner) and records steal outcomes. The
    /// high-priority lane is intentionally not stolen from.
    pub fn try_steal(&self) -> Option<PriorityWorkItem<T>> {
        if let Ok(mut stats) = self.stats.try_lock() {
            if let Ok(mut normal_queue) = self.normal_priority.try_lock() {
                if let Some(item) = normal_queue.pop_back() {
                    stats.successful_steals += 1;
                    return Some(item);
                }
            }
            if let Ok(mut low_queue) = self.low_priority.try_lock() {
                if let Some(item) = low_queue.pop_back() {
                    stats.successful_steals += 1;
                    return Some(item);
                }
            }
            stats.failed_steals += 1;
        }
        None
    }

    /// Maps cost and dependency count to a 0..=80 priority: expensive
    /// tasks start higher, and each dependency subtracts 5 (capped at 30).
    fn classify_priority(&self, estimatedcost: &Duration, dependencies: &[usize]) -> u32 {
        let base_priority: u32 = if estimatedcost.as_millis() > 100 {
            80
        } else if estimatedcost.as_millis() > 10 {
            50
        } else {
            20
        };
        let dependency_penalty = (dependencies.len() as u32 * 5).min(30);
        base_priority.saturating_sub(dependency_penalty)
    }

    /// Returns a process-wide monotonically increasing task id.
    fn generate_task_id(&self) -> usize {
        static TASK_COUNTER: AtomicUsize = AtomicUsize::new(0);
        TASK_COUNTER.fetch_add(1, Ordering::Relaxed)
    }

    /// Records a finished task. Best-effort: `try_lock` is used so a
    /// contended caller never blocks, at the cost of occasionally
    /// dropping a sample.
    pub fn record_completion(&self, task_id: usize, actualduration: Duration) {
        if let Ok(mut history) = self.completion_history.try_lock() {
            history.push_back((task_id, actualduration));
            // Bound the ring buffer so memory use stays constant.
            if history.len() > 1000 {
                history.pop_front();
            }
        }
        // Fix: keep aggregate stats in sync — previously `tasks_completed`
        // and `average_completion_time` were never written anywhere, so
        // `get_stats` always reported zeros for them.
        if let Ok(mut stats) = self.stats.try_lock() {
            stats.tasks_completed += 1;
            // Truncating cast is acceptable: the mean degrades gracefully
            // only after 2^32 completions.
            let n = stats.tasks_completed as u32;
            // Incremental mean: avg' = (avg * (n - 1) + sample) / n.
            stats.average_completion_time =
                (stats.average_completion_time * (n - 1) + actualduration) / n;
        }
    }

    /// Snapshot of the current statistics (blocking lock).
    pub fn get_stats(&self) -> WorkStealingStats {
        self.stats.lock().expect("stats lock poisoned").clone()
    }

    /// Rough remaining-work estimate: weights each lane's length by a
    /// nominal per-item cost (100 ms high, 50 ms normal, 10 ms low).
    pub fn estimated_remaining_work(&self) -> Duration {
        let high_count = self.high_priority.lock().expect("high-priority lane lock poisoned").len();
        let normal_count = self
            .normal_priority
            .lock()
            .expect("normal-priority lane lock poisoned")
            .len();
        let low_count = self.low_priority.lock().expect("low-priority lane lock poisoned").len();
        Duration::from_millis((high_count * 100 + normal_count * 50 + low_count * 10) as u64)
    }
}
/// Chooses matrix chunk sizes from static heuristics blended with past
/// measured throughput (see `optimal_chunksize`).
pub struct MatrixAdaptiveChunking {
    /// Assumed cache-line size in bytes; set to 64 in `new`.
    #[allow(dead_code)]
    cache_linesize: usize,
    /// NUMA topology detected at construction time, if available.
    #[allow(dead_code)]
    numa_info: Option<NumaTopology>,
    /// Ring of recent chunking measurements, capped at 100 entries.
    performance_history: Mutex<VecDeque<ChunkingPerformance>>,
}
/// One recorded chunking measurement, used to bias future chunk sizes.
#[derive(Debug, Clone)]
struct ChunkingPerformance {
    /// Chunk size that was used for this run.
    chunksize: usize,
    /// `(rows, cols)` of the matrix the run operated on.
    matrix_dimensions: (usize, usize),
    /// Measured throughput; higher is better.
    throughput: f64,
    /// Always recorded as 0 in this chunk; reserved for future use.
    #[allow(dead_code)]
    cache_misses: usize,
    /// When the sample was recorded.
    #[allow(dead_code)]
    timestamp: Instant,
}
impl Default for MatrixAdaptiveChunking {
    /// Equivalent to [`MatrixAdaptiveChunking::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl MatrixAdaptiveChunking {
    /// Builds a chunker assuming 64-byte cache lines, detecting the NUMA
    /// topology, and keeping room for 100 performance samples.
    pub fn new() -> Self {
        Self {
            cache_linesize: 64,
            numa_info: Some(NumaTopology::detect()),
            performance_history: Mutex::new(VecDeque::with_capacity(100)),
        }
    }

    /// Heuristic chunk size for `operation_type` on a `rows x cols`
    /// matrix, blended with history from similarly shaped runs.
    pub fn optimal_chunksize(
        &self,
        matrix_dims: (usize, usize),
        operation_type: MatrixOperation,
    ) -> usize {
        let (rows, cols) = matrix_dims;
        let base_chunk = match operation_type {
            MatrixOperation::MatrixMultiply => {
                // Aim for a square tile of f64s that fits a 32 KiB L1.
                let l1_cachesize = 32 * 1024;
                let elementsize = std::mem::size_of::<f64>();
                let elements_per_cache = l1_cachesize / elementsize;
                ((elements_per_cache as f64).sqrt() as usize).clamp(32, 512)
            }
            MatrixOperation::ElementWise => {
                // Bandwidth-bound: scale the chunk with available bandwidth.
                let memory_bandwidth = self.estimate_memory_bandwidth();
                (memory_bandwidth / 8).clamp(64, 1024)
            }
            MatrixOperation::Reduction => {
                let num_cores = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(4);
                // Fix: guard against a zero chunk size for small matrices,
                // which would produce degenerate (empty) work units.
                (rows.max(cols) / (num_cores * 4)).max(1)
            }
            MatrixOperation::Decomposition => {
                let num_cores = std::thread::available_parallelism()
                    .map(|n| n.get())
                    .unwrap_or(4);
                // Fix: same zero-chunk guard as for Reduction.
                (rows.min(cols) / num_cores.max(1)).max(1)
            }
        };
        self.adjust_for_history(base_chunk, matrix_dims, operation_type)
    }

    /// Memory-bandwidth estimate, overridable via the
    /// `SCIRS_MEMORY_BANDWIDTH` env var; defaults to 100_000 when unset
    /// or unparsable.
    fn estimate_memory_bandwidth(&self) -> usize {
        std::env::var("SCIRS_MEMORY_BANDWIDTH")
            .ok()
            .and_then(|val| val.parse().ok())
            .unwrap_or(100_000)
    }

    /// Blends `base_chunk` 30/70 toward the best-performing chunk size
    /// observed on similarly shaped matrices.
    fn adjust_for_history(
        &self,
        base_chunk: usize,
        matrix_dims: (usize, usize),
        _operation_type: MatrixOperation,
    ) -> usize {
        if let Ok(history) = self.performance_history.lock() {
            // Fix: the old test `(hist / cur).abs() < 2.0` admitted
            // arbitrarily *smaller* history matrices (ratio near 0).
            // "Similar" now means each dimension ratio lies in [0.5, 2.0).
            let similar = |hist: usize, cur: usize| {
                let ratio = hist as f64 / cur as f64;
                (0.5..2.0).contains(&ratio)
            };
            let best = history
                .iter()
                .filter(|perf| {
                    let (h_rows, h_cols) = perf.matrix_dimensions;
                    similar(h_rows, matrix_dims.0) && similar(h_cols, matrix_dims.1)
                })
                // Fix: `total_cmp` is NaN-safe where the previous
                // `partial_cmp(..).expect(..)` would panic on NaN throughput.
                .max_by(|a, b| a.throughput.total_cmp(&b.throughput));
            if let Some(best) = best {
                // Weight history heavily (70%) over the static heuristic.
                let weight = 0.7;
                return (base_chunk as f64 * (1.0 - weight) + best.chunksize as f64 * weight)
                    as usize;
            }
        }
        base_chunk
    }

    /// Appends a measurement, evicting the oldest beyond 100 samples.
    pub fn record_performance(
        &self,
        chunksize: usize,
        matrix_dims: (usize, usize),
        throughput: f64,
    ) {
        if let Ok(mut history) = self.performance_history.lock() {
            let perf = ChunkingPerformance {
                chunksize,
                matrix_dimensions: matrix_dims,
                throughput,
                cache_misses: 0, // not measured yet; reserved field
                timestamp: Instant::now(),
            };
            history.push_back(perf);
            if history.len() > 100 {
                history.pop_front();
            }
        }
    }
}
/// Kind of matrix operation being chunked; each kind gets its own
/// chunk-size heuristic and task-feature profile.
#[derive(Debug, Clone, Copy)]
pub enum MatrixOperation {
    /// Dense matrix-matrix multiply (cache-tiled).
    MatrixMultiply,
    /// Per-element map (bandwidth-bound).
    ElementWise,
    /// Fold/reduce over elements.
    Reduction,
    /// Factorization-style operation (e.g. LU/QR — confirm with callers).
    Decomposition,
}
/// Assigns tasks to workers using a 4-feature linear model of execution
/// time, updated online by gradient descent (see `update_weights`).
pub struct PredictiveLoadBalancer {
    /// Observed durations keyed by a "datasize_complexity" bucket string.
    execution_history: Mutex<std::collections::HashMap<String, Vec<Duration>>>,
    /// Outstanding predicted load (in seconds) per worker.
    worker_loads: Mutex<Vec<f64>>,
    /// Linear-model weights, one per task feature.
    model_weights: Mutex<Vec<f64>>,
}
impl PredictiveLoadBalancer {
    /// Creates a balancer for `_numworkers` workers with a 4-feature
    /// linear model initialized to all-ones weights.
    pub fn new(_numworkers: usize) -> Self {
        Self {
            execution_history: Mutex::new(std::collections::HashMap::new()),
            worker_loads: Mutex::new(vec![0.0; _numworkers]),
            model_weights: Mutex::new(vec![1.0; 4]),
        }
    }

    /// Predicts execution time as the weighted sum of the task's features,
    /// floored at 1 ms so no task is ever predicted to be free.
    pub fn predict_execution_time(&self, taskfeatures: &TaskFeatures) -> Duration {
        let weights = self
            .model_weights
            .lock()
            .expect("model weights lock poisoned");
        let features = [
            taskfeatures.datasize as f64,
            taskfeatures.complexity_factor,
            taskfeatures.memory_access_pattern as f64,
            taskfeatures.arithmetic_intensity,
        ];
        let predicted_ms = features
            .iter()
            .zip(weights.iter())
            .map(|(f, w)| f * w)
            .sum::<f64>()
            .max(1.0);
        Duration::from_millis(predicted_ms as u64)
    }

    /// Picks the least-loaded worker and charges the predicted time to it.
    ///
    /// # Panics
    /// Panics if the balancer was constructed with zero workers.
    pub fn assign_task(&self, taskfeatures: &TaskFeatures) -> usize {
        let predicted_time = self.predict_execution_time(taskfeatures);
        let mut loads = self
            .worker_loads
            .lock()
            .expect("worker loads lock poisoned");
        // Fix: `total_cmp` is NaN-safe, where the previous
        // `partial_cmp(..).expect(..)` would panic if any load became NaN;
        // the unused `min_load` binding is also dropped.
        let best_worker = loads
            .iter()
            .enumerate()
            .min_by(|(_, a), (_, b)| a.total_cmp(b))
            .map(|(idx, _)| idx)
            .expect("assign_task requires at least one worker");
        loads[best_worker] += predicted_time.as_secs_f64();
        best_worker
    }

    /// Appends the observed duration to the per-task-type history and
    /// nudges the model weights toward the observation.
    pub fn update_model(&self, task_features: &TaskFeatures, actualtime: Duration) {
        // Bucket key: data size plus the truncated complexity factor.
        let task_type = format!(
            "{}_{}",
            task_features.datasize, task_features.complexity_factor as u32
        );
        if let Ok(mut history) = self.execution_history.lock() {
            history.entry(task_type).or_default().push(actualtime);
        }
        self.update_weights(task_features, actualtime);
    }

    /// Credits a worker for finished work, clamping its load at zero.
    /// Out-of-range worker ids are ignored.
    pub fn update_worker_load(&self, worker_id: usize, completedtime: Duration) {
        if let Ok(mut loads) = self.worker_loads.lock() {
            if worker_id < loads.len() {
                loads[worker_id] = (loads[worker_id] - completedtime.as_secs_f64()).max(0.0);
            }
        }
    }

    /// One step of online gradient descent on the prediction error
    /// (seconds), with a small fixed learning rate.
    fn update_weights(&self, task_features: &TaskFeatures, actualtime: Duration) {
        let predicted_time = self.predict_execution_time(task_features);
        let error = actualtime.as_secs_f64() - predicted_time.as_secs_f64();
        if let Ok(mut weights) = self.model_weights.lock() {
            let learning_rate = 0.001;
            let features = [
                task_features.datasize as f64,
                task_features.complexity_factor,
                task_features.memory_access_pattern as f64,
                task_features.arithmetic_intensity,
            ];
            for (weight, feature) in weights.iter_mut().zip(features.iter()) {
                *weight += learning_rate * error * feature;
            }
        }
    }
}
/// Feature vector describing a task for the load balancer's linear model.
#[derive(Debug, Clone)]
pub struct TaskFeatures {
    /// Number of elements the task touches.
    pub datasize: usize,
    /// Relative computational complexity estimate.
    pub complexity_factor: f64,
    /// Access-pattern code (0, 1, or 2 in this file — see
    /// `formatrix_operation`); exact semantics defined by callers.
    pub memory_access_pattern: u32,
    /// Ratio-like measure of arithmetic per memory access.
    pub arithmetic_intensity: f64,
}
impl TaskFeatures {
    /// Derives the feature vector for a matrix operation on a
    /// `rows x cols` matrix. The complexity factor, access-pattern code,
    /// and arithmetic intensity are fixed per operation kind.
    pub fn formatrix_operation(matrix_dims: (usize, usize), operation: MatrixOperation) -> Self {
        let (rows, cols) = matrix_dims;
        let datasize = rows * cols;
        let elements = datasize as f64;
        let (complexity_factor, memory_access_pattern, arithmetic_intensity) = match operation {
            // Multiply scales with rows*cols work per output pass.
            MatrixOperation::MatrixMultiply => (rows as f64 * cols as f64 * 2.0, 1, 2.0),
            // Both are one-pass over the data with unit intensity.
            MatrixOperation::ElementWise | MatrixOperation::Reduction => (elements, 0, 1.0),
            // Decomposition: extra passes and higher arithmetic density.
            MatrixOperation::Decomposition => (elements * 1.5, 2, 3.0),
        };
        Self {
            datasize,
            complexity_factor,
            memory_access_pattern,
            arithmetic_intensity,
        }
    }
}