use crate::error::LinalgResult;
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant};
/// Named workload profiles used as keys for thread-pool configurations.
///
/// Each profile can be given its own tuned [`AdvancedThreadPoolConfig`] via
/// `ThreadPoolManager::configure_profile`; `auto_optimize_pools` also treats
/// some variants specially (NUMA affinity for `MatrixMultiplication`,
/// `CpuIntensive`, and `MemoryBound`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ThreadPoolProfile {
    /// Compute-heavy workloads; auto-optimization may pin NUMA-spread affinity.
    CpuIntensive,
    /// Bandwidth-limited workloads; auto-optimization prefers NUMA-compact affinity.
    MemoryBound,
    /// General-purpose default profile.
    Balanced,
    LowLatency,
    HighThroughput,
    LinearAlgebra,
    /// GEMM-style workloads; treated like `CpuIntensive` by the optimizer.
    MatrixMultiplication,
    EigenComputation,
    Decomposition,
    IterativeSolver,
    NumaOptimized,
    HybridComputing,
    /// User-defined profile identified by an arbitrary name.
    Custom(String),
}
/// CPU-affinity placement strategy for worker threads.
///
/// NOTE(review): no code in this file actually applies the affinity to OS
/// threads; the value is only stored and compared — confirm where it is used.
#[derive(Debug, Clone, PartialEq)]
pub enum AffinityStrategy {
    /// No affinity; let the OS scheduler place threads.
    None,
    /// Pin workers to the listed CPU ids.
    Pinned(Vec<usize>),
    /// Spread workers across NUMA nodes.
    NumaSpread,
    /// Pack workers onto as few NUMA nodes as possible.
    NumaCompact,
    /// Per-thread placement; `None` entries are left unpinned.
    Custom(Vec<Option<usize>>),
}
/// Static configuration of a single thread pool.
#[derive(Debug, Clone, PartialEq)]
pub struct ThreadPoolConfig {
    /// Profile this configuration belongs to.
    pub profile: ThreadPoolProfile,
    /// Lower bound on the thread count (enforced by `calculate_optimal_threads`).
    pub min_threads: usize,
    /// Upper bound on the thread count.
    pub max_threads: usize,
    /// Thread count the pool starts with.
    pub active_threads: usize,
    /// Intended idle-thread reclamation timeout (not read anywhere in this file).
    pub idle_timeout: Duration,
    /// CPU-affinity strategy for the workers.
    pub affinity: AffinityStrategy,
    /// Whether NUMA topology should be considered (flag only; not read here).
    pub numa_aware: bool,
    /// Whether work stealing is enabled (flag only; not read here).
    pub work_stealing: bool,
    /// Capacity of the task queue (flag only; not read here).
    pub queue_capacity: usize,
    /// Per-thread stack size override; `None` means platform default.
    pub stacksize: Option<usize>,
}
impl Default for ThreadPoolConfig {
fn default() -> Self {
let num_cpus = thread::available_parallelism()
.map(|n| n.get())
.unwrap_or(4);
Self {
profile: ThreadPoolProfile::Balanced,
min_threads: 1,
max_threads: num_cpus,
active_threads: num_cpus,
idle_timeout: Duration::from_secs(60),
affinity: AffinityStrategy::None,
numa_aware: false,
work_stealing: true,
queue_capacity: 1024,
stacksize: None,
}
}
}
/// Full configuration bundle for an [`AdvancedPerformanceThreadPool`]:
/// the static pool settings plus the dynamic tuning sub-configurations.
#[derive(Debug, Clone)]
pub struct AdvancedThreadPoolConfig {
    /// Static pool sizing/affinity settings.
    pub base_config: ThreadPoolConfig,
    /// Automatic scale-up/scale-down behavior.
    pub dynamic_sizing: DynamicSizingConfig,
    /// Metrics-collection settings.
    pub monitoring: MonitoringConfig,
    /// CPU/memory isolation settings.
    pub resource_isolation: ResourceIsolationConfig,
    /// Workload pattern learning / prediction settings.
    pub workload_adaptation: WorkloadAdaptationConfig,
}
/// Settings controlling automatic pool resizing.
#[derive(Debug, Clone)]
pub struct DynamicSizingConfig {
    /// Master switch for dynamic sizing.
    pub enabled: bool,
    /// Utilization fraction above which the pool should grow.
    pub scale_up_threshold: f64,
    /// Utilization fraction below which the pool should shrink.
    pub scale_down_threshold: f64,
    /// How long to observe before making a scaling decision.
    pub observation_period: Duration,
    /// Maximum multiplicative size change per scaling step.
    pub max_scaling_factor: f64,
}
impl Default for DynamicSizingConfig {
fn default() -> Self {
Self {
enabled: true,
scale_up_threshold: 0.8,
scale_down_threshold: 0.3,
observation_period: Duration::from_secs(5),
max_scaling_factor: 1.5,
}
}
}
/// Settings for runtime metrics collection.
#[derive(Debug, Clone)]
pub struct MonitoringConfig {
    /// Master switch for monitoring.
    pub enabled: bool,
    /// Interval between metric samples.
    pub collection_interval: Duration,
    /// Maximum number of history entries retained.
    pub max_history_entries: usize,
    /// Collect per-CPU detail (more overhead).
    pub detailed_cpu_metrics: bool,
    /// Track memory usage alongside CPU metrics.
    pub memory_tracking: bool,
}
impl Default for MonitoringConfig {
fn default() -> Self {
Self {
enabled: true,
collection_interval: Duration::from_secs(1),
max_history_entries: 1000,
detailed_cpu_metrics: false,
memory_tracking: true,
}
}
}
/// Settings isolating a pool's CPU/memory resources from other pools.
/// NOTE(review): these values are stored but not enforced in this file.
#[derive(Debug, Clone)]
pub struct ResourceIsolationConfig {
    /// Cache/memory allocation policy for the pool.
    pub memory_policy: CacheAllocationPolicy,
    /// Whether the pool's CPUs should be isolated from other work.
    pub cpu_isolation: bool,
    /// Optional cap on memory bandwidth (units not defined here).
    pub memory_bandwidth_limit: Option<f64>,
    /// Optional cache partition id for the pool.
    pub cache_partition: Option<usize>,
}
/// Policy for how a pool's cache/memory allocation is shared with others.
#[derive(Debug, Clone)]
pub enum CacheAllocationPolicy {
    /// Platform default behavior.
    Default,
    /// Share cache with other pools.
    Shared,
    /// Reserve cache exclusively for this pool.
    Isolated,
    /// Named allocation fractions (semantics defined by the consumer).
    Custom(HashMap<String, f64>),
}
/// Settings for learning from observed workloads.
#[derive(Debug, Clone)]
pub struct WorkloadAdaptationConfig {
    /// Detect recurring workload patterns.
    pub pattern_detection: bool,
    /// Learning rate for the adaptation model (tuned by `auto_optimize_pools`).
    pub learning_rate: f64,
    /// Parameters for the workload-prediction model.
    pub prediction_model: PredictionModelParams,
    /// Track per-operation workload characteristics.
    pub characteristic_tracking: bool,
}
/// Parameters describing a workload-prediction model.
#[derive(Debug, Clone)]
pub struct PredictionModelParams {
    /// Model identifier (e.g. "exponential_smoothing").
    pub model_type: String,
    /// Named numeric hyper-parameters of the model.
    pub parameters: HashMap<String, f64>,
    /// Minimum acceptable prediction accuracy.
    pub accuracy_threshold: f64,
}
impl Default for PredictionModelParams {
fn default() -> Self {
let mut parameters = HashMap::new();
parameters.insert("windowsize".to_string(), 10.0);
parameters.insert("smoothing_factor".to_string(), 0.1);
Self {
model_type: "exponential_smoothing".to_string(),
parameters,
accuracy_threshold: 0.8,
}
}
}
/// Registry of per-profile configurations and lazily created pools.
///
/// All state is behind `Arc<RwLock<_>>`, so a shared `&ThreadPoolManager`
/// can be used concurrently.
#[derive(Debug)]
pub struct ThreadPoolManager {
    /// Configurations registered via `configure_profile`; consulted when a
    /// pool is first created.
    pool_configs: Arc<RwLock<HashMap<ThreadPoolProfile, AdvancedThreadPoolConfig>>>,
    /// Pools created on demand by `get_pool`, keyed by profile.
    active_pools: Arc<RwLock<HashMap<ThreadPoolProfile, Arc<AdvancedPerformanceThreadPool>>>>,
    // Never read or written in this file; presumably reserved for a
    // self-referencing singleton — TODO confirm before removing.
    #[allow(dead_code)]
    instance: Arc<Mutex<Option<Self>>>,
}
impl Default for ThreadPoolManager {
fn default() -> Self {
Self::new()
}
}
impl ThreadPoolManager {
pub fn new() -> Self {
Self {
pool_configs: Arc::new(RwLock::new(HashMap::new())),
active_pools: Arc::new(RwLock::new(HashMap::new())),
instance: Arc::new(Mutex::new(None)),
}
}
pub fn get_pool(
&self,
profile: ThreadPoolProfile,
) -> LinalgResult<Arc<AdvancedPerformanceThreadPool>> {
let pools = self.active_pools.read().expect("Operation failed");
if let Some(pool) = pools.get(&profile) {
return Ok(pool.clone());
}
drop(pools);
let config = {
let configs = self.pool_configs.read().expect("Operation failed");
configs.get(&profile).cloned().unwrap_or_else(|| {
let base_config = ThreadPoolConfig {
profile: profile.clone(),
..Default::default()
};
AdvancedThreadPoolConfig {
base_config,
dynamic_sizing: DynamicSizingConfig::default(),
monitoring: MonitoringConfig::default(),
resource_isolation: ResourceIsolationConfig {
memory_policy: CacheAllocationPolicy::Default,
cpu_isolation: false,
memory_bandwidth_limit: None,
cache_partition: None,
},
workload_adaptation: WorkloadAdaptationConfig {
pattern_detection: true,
learning_rate: 0.1,
prediction_model: PredictionModelParams::default(),
characteristic_tracking: true,
},
}
})
};
let pool = Arc::new(AdvancedPerformanceThreadPool::new(config)?);
let mut pools = self.active_pools.write().expect("Operation failed");
pools.insert(profile, pool.clone());
Ok(pool)
}
pub fn configure_profile(&self, profile: ThreadPoolProfile, config: AdvancedThreadPoolConfig) {
let mut configs = self.pool_configs.write().expect("Operation failed");
configs.insert(profile, config);
}
pub fn get_all_stats(&self) -> HashMap<ThreadPoolProfile, AdvancedPerformanceStats> {
let pools = self.active_pools.read().expect("Operation failed");
let mut stats = HashMap::new();
for (profile, pool) in pools.iter() {
stats.insert(profile.clone(), pool.get_stats());
}
stats
}
pub fn auto_optimize_pools(
&self,
) -> LinalgResult<Vec<(ThreadPoolProfile, AdvancedThreadPoolConfig)>> {
let pools = self.active_pools.read().expect("Operation failed");
let mut optimizations = Vec::new();
for (profile, pool) in pools.iter() {
let stats = pool.get_stats();
let current_config = {
let configs = self.pool_configs.read().expect("Operation failed");
configs
.get(profile)
.cloned()
.unwrap_or_else(|| AdvancedThreadPoolConfig {
base_config: ThreadPoolConfig {
profile: profile.clone(),
..Default::default()
},
dynamic_sizing: DynamicSizingConfig::default(),
monitoring: MonitoringConfig::default(),
resource_isolation: ResourceIsolationConfig {
memory_policy: CacheAllocationPolicy::Default,
cpu_isolation: false,
memory_bandwidth_limit: None,
cache_partition: None,
},
workload_adaptation: WorkloadAdaptationConfig {
pattern_detection: true,
learning_rate: 0.1,
prediction_model: PredictionModelParams::default(),
characteristic_tracking: true,
},
})
};
let mut optimized_config = current_config.clone();
if stats.thread_pool_stats.total_tasks > 100 {
if stats.thread_pool_stats.thread_utilization < 0.6 {
optimized_config.base_config.active_threads =
(optimized_config.base_config.active_threads as f64 * 0.8) as usize;
} else if stats.thread_pool_stats.thread_utilization > 0.9
&& stats.thread_pool_stats.queue_length > 2
{
optimized_config.base_config.active_threads =
(optimized_config.base_config.active_threads as f64 * 1.2) as usize;
}
if stats.thread_pool_stats.total_tasks > 500 {
let task_completion_variance = stats
.max_task_duration
.saturating_sub(stats.min_task_duration);
if task_completion_variance > Duration::from_millis(100) {
optimized_config.dynamic_sizing.scale_up_threshold = 0.7;
optimized_config.dynamic_sizing.scale_down_threshold = 0.4;
} else {
optimized_config.dynamic_sizing.scale_up_threshold = 0.85;
optimized_config.dynamic_sizing.scale_down_threshold = 0.25;
}
}
match profile {
ThreadPoolProfile::MatrixMultiplication | ThreadPoolProfile::CpuIntensive
if optimized_config.base_config.affinity == AffinityStrategy::None => {
optimized_config.base_config.affinity = AffinityStrategy::NumaSpread;
}
ThreadPoolProfile::MemoryBound => {
optimized_config.base_config.affinity = AffinityStrategy::NumaCompact;
}
_ => {}
}
if stats.average_throughput_ops_per_sec > 1000.0 {
optimized_config.workload_adaptation.learning_rate = 0.15;
} else {
optimized_config.workload_adaptation.learning_rate = 0.05;
}
if optimized_config.base_config != current_config.base_config
|| optimized_config.dynamic_sizing.scale_up_threshold
!= current_config.dynamic_sizing.scale_up_threshold
|| optimized_config.workload_adaptation.learning_rate
!= current_config.workload_adaptation.learning_rate
{
optimizations.push((profile.clone(), optimized_config));
}
}
}
Ok(optimizations)
}
pub fn apply_optimizations(
&self,
optimizations: Vec<(ThreadPoolProfile, AdvancedThreadPoolConfig)>,
) {
let mut configs = self.pool_configs.write().expect("Operation failed");
for (profile, config) in optimizations {
configs.insert(profile, config);
}
}
}
/// A pool plus its monitoring/prediction machinery.
///
/// NOTE(review): `execute` runs tasks inline on the caller's thread; no
/// worker threads are spawned in this file — confirm that is intended.
#[derive(Debug)]
pub struct AdvancedPerformanceThreadPool {
    /// Configuration the pool was created with (currently only stored).
    #[allow(dead_code)]
    config: AdvancedThreadPoolConfig,
    /// Accumulated execution statistics, updated by `execute`.
    stats: Arc<Mutex<AdvancedPerformanceStats>>,
    /// Dynamic thread-count controller, consulted before each task.
    thread_manager: Arc<Mutex<DynamicThreadManager>>,
    /// Predicts workload characteristics per operation type.
    workload_predictor: Arc<Mutex<WorkloadPredictor>>,
    /// Profiler with anomaly detection (not wired into `execute` yet).
    #[allow(dead_code)]
    profiler: Arc<Mutex<ThreadPoolProfiler>>,
}
impl AdvancedPerformanceThreadPool {
    /// Builds a pool and its monitoring/prediction state from `config`.
    ///
    /// # Errors
    /// Propagates any error from `DynamicThreadManager::new`.
    pub fn new(config: AdvancedThreadPoolConfig) -> LinalgResult<Self> {
        let manager = DynamicThreadManager::new(&config.base_config)?;
        Ok(Self {
            config,
            stats: Arc::new(Mutex::new(AdvancedPerformanceStats::default())),
            thread_manager: Arc::new(Mutex::new(manager)),
            workload_predictor: Arc::new(Mutex::new(WorkloadPredictor::new())),
            profiler: Arc::new(Mutex::new(ThreadPoolProfiler::new())),
        })
    }

    /// Snapshot of the accumulated performance statistics.
    pub fn get_stats(&self) -> AdvancedPerformanceStats {
        self.stats.lock().expect("Operation failed").clone()
    }

    /// Runs `task` synchronously, adapting the thread manager to the
    /// predicted workload beforehand and recording the timing afterwards.
    ///
    /// # Errors
    /// Propagates any error from `DynamicThreadManager::adapt_to_workload`.
    pub fn execute<F, R>(&self, operationtype: OperationType, task: F) -> LinalgResult<R>
    where
        F: FnOnce() -> R + Send,
        R: Send,
    {
        let started = Instant::now();
        // Predict first, releasing the predictor lock before adapting.
        let predicted = {
            let predictor = self.workload_predictor.lock().expect("Operation failed");
            predictor.predict_workload(&operationtype)
        };
        self.thread_manager
            .lock()
            .expect("Operation failed")
            .adapt_to_workload(&predicted)?;
        let result = task();
        let elapsed = started.elapsed();
        // Fold the measurement into both the stats and the predictor,
        // taking each lock only for the duration of its update.
        self.stats
            .lock()
            .expect("Operation failed")
            .record_execution(operationtype, elapsed);
        self.workload_predictor
            .lock()
            .expect("Operation failed")
            .update_performance(operationtype, elapsed);
        Ok(result)
    }
}
/// Tracks and adjusts the logical thread count for a pool.
///
/// NOTE(review): `current_threads` is bookkeeping only — nothing in this
/// file spawns or retires OS threads when it changes.
#[derive(Debug)]
pub struct DynamicThreadManager {
    /// Pool configuration providing the min/max thread bounds.
    config: ThreadPoolConfig,
    /// Current logical thread count.
    current_threads: usize,
    // Collected nowhere in this file yet.
    #[allow(dead_code)]
    cpu_utilization_history: Vec<f64>,
    // Set at construction; never consulted in this file.
    #[allow(dead_code)]
    last_scaling_time: Instant,
    /// Log of every resize decision made by `adapt_to_workload`.
    scaling_history: Vec<ScalingDecision>,
}
impl DynamicThreadManager {
    /// Starts with `config.active_threads` threads and empty histories.
    pub fn new(config: &ThreadPoolConfig) -> LinalgResult<Self> {
        Ok(Self {
            current_threads: config.active_threads,
            config: config.clone(),
            cpu_utilization_history: Vec::new(),
            last_scaling_time: Instant::now(),
            scaling_history: Vec::new(),
        })
    }

    /// Resizes toward the optimum for `characteristics`, appending a
    /// `ScalingDecision` whenever the target differs from the current count.
    pub fn adapt_to_workload(
        &mut self,
        characteristics: &WorkloadCharacteristics,
    ) -> LinalgResult<()> {
        let target = self.calculate_optimal_threads(characteristics);
        if target == self.current_threads {
            return Ok(());
        }
        self.scaling_history.push(ScalingDecision {
            timestamp: Instant::now(),
            from_threads: self.current_threads,
            to_threads: target,
            reason: ScalingReason::WorkloadAdaptation,
            confidence: characteristics.complexity_estimate,
        });
        self.current_threads = target;
        Ok(())
    }

    /// Heuristic thread count: a pattern-dependent fraction of `max_threads`,
    /// scaled by complexity (capped at 2x) and bounded to [min, max].
    fn calculate_optimal_threads(&self, characteristics: &WorkloadCharacteristics) -> usize {
        let ceiling = self.config.max_threads;
        let base = match characteristics.pattern {
            WorkloadPattern::CpuBound => (ceiling as f64 * 0.9) as usize,
            WorkloadPattern::MemoryBound => (ceiling as f64 * 0.6) as usize,
            WorkloadPattern::Balanced => ceiling / 2,
            WorkloadPattern::IoWait => ceiling,
        };
        let factor = characteristics.complexity_estimate.min(2.0);
        let scaled = (base as f64 * factor) as usize;
        // max-then-min (not `clamp`) so a misconfigured min > max cannot panic.
        scaled.max(self.config.min_threads).min(ceiling)
    }
}
/// Predicted resource profile of one workload/operation.
#[derive(Debug, Clone)]
pub struct WorkloadCharacteristics {
    /// Dominant resource bound of the workload.
    pub pattern: WorkloadPattern,
    /// Relative complexity multiplier; doubles as the scaling confidence and
    /// is capped at 2.0 by `calculate_optimal_threads`.
    pub complexity_estimate: f64,
    /// Expected memory access pattern.
    pub memory_usage: MemoryUsagePattern,
    /// Fraction of work that parallelizes well (0.0–1.0 by convention here).
    pub parallelization_potential: f64,
    /// Expected cache friendliness (0.0–1.0 by convention here).
    pub cache_locality: f64,
}
/// Dominant resource bound of a workload; drives the thread-count heuristic
/// in `DynamicThreadManager::calculate_optimal_threads`.
#[derive(Debug, Clone)]
pub enum WorkloadPattern {
    /// Compute-limited (90% of max threads).
    CpuBound,
    /// Memory-bandwidth-limited (60% of max threads).
    MemoryBound,
    /// Mixed (half of max threads).
    Balanced,
    /// I/O-wait dominated (all threads).
    IoWait,
}
/// Expected memory-access pattern of a workload.
#[derive(Debug, Clone)]
pub enum MemoryUsagePattern {
    /// Contiguous, in-order access.
    Sequential,
    /// Unpredictable access locations.
    Random,
    /// Fixed-stride access; the payload is the stride in elements
    /// (presumably — not consumed anywhere in this file).
    Strided(usize),
    /// Combination of the above.
    Mixed,
}
/// Linear-algebra operation categories used as keys for prediction,
/// profiling, and statistics maps.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum OperationType {
    MatrixMultiplication,
    /// Matrix factorization, tagged with the concrete algorithm.
    Decomposition(DecompositionType),
    /// Iterative linear-system solver, tagged with the concrete method.
    IterativeSolver(IterativeSolverType),
    VectorOps,
    EigenComputation,
    LinearSolve,
    /// User-defined operation identified by an arbitrary numeric tag.
    Custom(u32),
}
/// Matrix-factorization algorithms distinguished for profiling purposes.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum DecompositionType {
    LU,
    QR,
    SVD,
    Cholesky,
    Eigen,
    Schur,
}
/// Iterative solver methods distinguished for profiling purposes.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum IterativeSolverType {
    /// Conjugate gradient.
    CG,
    /// Generalized minimal residual.
    GMRES,
    /// Biconjugate gradient stabilized.
    BiCGSTAB,
    Jacobi,
    GaussSeidel,
}
/// Predicts workload characteristics per operation type from recorded
/// execution history.
#[derive(Debug)]
pub struct WorkloadPredictor {
    /// Most recent execution times per operation (bounded to 100 samples).
    performance_history: HashMap<OperationType, Vec<Duration>>,
    /// Cached characteristics, refreshed on every `update_performance`.
    characteristics_cache: HashMap<OperationType, WorkloadCharacteristics>,
    // Declared but never populated in this file.
    #[allow(dead_code)]
    prediction_accuracy: HashMap<OperationType, f64>,
}
impl Default for WorkloadPredictor {
fn default() -> Self {
Self::new()
}
}
impl WorkloadPredictor {
    /// Creates a predictor with no recorded history.
    pub fn new() -> Self {
        Self {
            performance_history: HashMap::new(),
            characteristics_cache: HashMap::new(),
            prediction_accuracy: HashMap::new(),
        }
    }

    /// Returns the cached characteristics for the operation, or the static
    /// per-operation defaults when nothing has been recorded yet.
    pub fn predict_workload(&self, operationtype: &OperationType) -> WorkloadCharacteristics {
        match self.characteristics_cache.get(operationtype) {
            Some(cached) => cached.clone(),
            None => self.default_characteristics_for_operation(operationtype),
        }
    }

    /// Appends `executiontime` to the operation's history (keeping only the
    /// 100 most recent samples) and refreshes the cached characteristics.
    pub fn update_performance(&mut self, operation_type: OperationType, executiontime: Duration) {
        let samples = self.performance_history.entry(operation_type).or_default();
        samples.push(executiontime);
        if samples.len() > 100 {
            samples.remove(0);
        }
        self.update_characteristics(operation_type);
    }

    /// Recomputes and caches the characteristics for `operationtype`.
    /// Currently this just re-derives the static defaults; the recorded
    /// history is not yet folded into the estimate.
    fn update_characteristics(&mut self, operationtype: OperationType) {
        let refreshed = self.default_characteristics_for_operation(&operationtype);
        self.characteristics_cache.insert(operationtype, refreshed);
    }

    /// Built-in workload profiles for known operation kinds; anything not
    /// listed falls back to a balanced, mid-complexity profile.
    fn default_characteristics_for_operation(
        &self,
        operation_type: &OperationType,
    ) -> WorkloadCharacteristics {
        match operation_type {
            OperationType::MatrixMultiplication => WorkloadCharacteristics {
                pattern: WorkloadPattern::CpuBound,
                complexity_estimate: 2.0,
                memory_usage: MemoryUsagePattern::Sequential,
                parallelization_potential: 0.9,
                cache_locality: 0.7,
            },
            OperationType::Decomposition(DecompositionType::LU) => WorkloadCharacteristics {
                pattern: WorkloadPattern::Balanced,
                complexity_estimate: 1.8,
                memory_usage: MemoryUsagePattern::Random,
                parallelization_potential: 0.7,
                cache_locality: 0.5,
            },
            OperationType::EigenComputation => WorkloadCharacteristics {
                pattern: WorkloadPattern::CpuBound,
                complexity_estimate: 2.5,
                memory_usage: MemoryUsagePattern::Mixed,
                parallelization_potential: 0.6,
                cache_locality: 0.4,
            },
            _ => WorkloadCharacteristics {
                pattern: WorkloadPattern::Balanced,
                complexity_estimate: 1.0,
                memory_usage: MemoryUsagePattern::Sequential,
                parallelization_potential: 0.5,
                cache_locality: 0.5,
            },
        }
    }
}
/// Collects per-operation profiling metrics and flags anomalous samples.
#[derive(Debug)]
pub struct ThreadPoolProfiler {
    /// Aggregated metrics keyed by operation type.
    profile_metrics: HashMap<OperationType, ProfileMetrics>,
    /// Baseline-ratio anomaly detector fed every recorded sample.
    anomaly_detector: PerformanceAnomalyDetector,
}
impl Default for ThreadPoolProfiler {
fn default() -> Self {
Self::new()
}
}
impl ThreadPoolProfiler {
    /// Creates a profiler with no recorded metrics.
    pub fn new() -> Self {
        Self {
            profile_metrics: HashMap::new(),
            anomaly_detector: PerformanceAnomalyDetector::new(),
        }
    }

    /// Folds one execution into the per-operation metrics, recording an
    /// anomaly alongside it when the detector flags the sample.
    pub fn record_profile(
        &mut self,
        operation_type: OperationType,
        execution_time: Duration,
        thread_count: usize,
    ) {
        let entry = self.profile_metrics.entry(operation_type).or_default();
        entry.record_execution(execution_time, thread_count);
        let flagged = self
            .anomaly_detector
            .detect_anomaly(operation_type, execution_time);
        if let Some(anomaly) = flagged {
            entry.record_anomaly(anomaly);
        }
    }

    /// Metrics recorded so far for `operationtype`, if any.
    pub fn get_metrics(&self, operationtype: &OperationType) -> Option<&ProfileMetrics> {
        self.profile_metrics.get(operationtype)
    }
}
/// Running aggregate of execution metrics for one operation type.
#[derive(Debug)]
pub struct ProfileMetrics {
    /// Number of executions folded in so far.
    pub total_executions: u64,
    /// Incrementally maintained mean execution time.
    pub avg_execution_time: Duration,
    /// Incrementally maintained variance of execution time, in ms^2.
    pub execution_time_variance: f64,
    /// Thread count of the most recent faster-than-average execution.
    pub optimal_thread_count: usize,
    /// Most recent anomalies (bounded to 50 entries).
    pub anomalies: Vec<PerformanceAnomaly>,
}
impl Default for ProfileMetrics {
fn default() -> Self {
Self::new()
}
}
impl ProfileMetrics {
    /// Zeroed metrics for an operation that has not run yet.
    pub fn new() -> Self {
        Self {
            total_executions: 0,
            avg_execution_time: Duration::ZERO,
            execution_time_variance: 0.0,
            optimal_thread_count: 1,
            anomalies: Vec::new(),
        }
    }

    /// Folds one execution into the running mean and variance (both kept in
    /// milliseconds) using Welford's online algorithm.
    ///
    /// Fix: the previous variance update used `(x - new_mean)^2` in place of
    /// Welford's cross term `(x - old_mean) * (x - new_mean)`, which
    /// systematically underestimates the variance.
    pub fn record_execution(&mut self, execution_time: Duration, threadcount: usize) {
        self.total_executions += 1;
        let n = self.total_executions as f64;
        let x_ms = execution_time.as_secs_f64() * 1000.0;
        let old_mean_ms = self.avg_execution_time.as_secs_f64() * 1000.0;
        let new_mean_ms = old_mean_ms + (x_ms - old_mean_ms) / n;
        self.avg_execution_time = Duration::from_secs_f64(new_mean_ms / 1000.0);
        // Welford: S_n = S_{n-1} + (x - old_mean)(x - new_mean);
        // population variance = S_n / n.
        let s_prev = self.execution_time_variance * (n - 1.0);
        self.execution_time_variance =
            (s_prev + (x_ms - old_mean_ms) * (x_ms - new_mean_ms)) / n;
        // Heuristic: remember the thread count of any faster-than-average run.
        if execution_time < self.avg_execution_time {
            self.optimal_thread_count = threadcount;
        }
    }

    /// Appends an anomaly, discarding the oldest once 50 are retained.
    pub fn record_anomaly(&mut self, anomaly: PerformanceAnomaly) {
        self.anomalies.push(anomaly);
        if self.anomalies.len() > 50 {
            self.anomalies.remove(0);
        }
    }
}
/// Flags executions that run much slower than a per-operation baseline.
#[derive(Debug)]
pub struct PerformanceAnomalyDetector {
    /// First observed execution time per operation, used as the baseline
    /// for all later comparisons (never refined afterwards).
    baselines: HashMap<OperationType, Duration>,
    /// Ratio (measured / baseline) above which a sample is anomalous.
    sensitivity_threshold: f64,
}
impl Default for PerformanceAnomalyDetector {
fn default() -> Self {
Self::new()
}
}
impl PerformanceAnomalyDetector {
    /// Detector with no baselines; flags samples slower than 2x baseline.
    pub fn new() -> Self {
        Self {
            baselines: HashMap::new(),
            sensitivity_threshold: 2.0,
        }
    }

    /// Compares `execution_time` against the operation's baseline.
    ///
    /// The first sample for an operation establishes the baseline and is
    /// never itself flagged. Samples slower than `sensitivity_threshold`
    /// times the baseline are anomalies; > 5x is `Critical`.
    ///
    /// Fix: a zero-duration first sample previously became a permanent zero
    /// baseline, making the ratio infinite and every later sample a
    /// `Critical` anomaly. A zero baseline is now replaced by the current
    /// sample instead of being compared against.
    pub fn detect_anomaly(
        &mut self,
        operation_type: OperationType,
        execution_time: Duration,
    ) -> Option<PerformanceAnomaly> {
        match self.baselines.get(&operation_type) {
            Some(&baseline) if !baseline.is_zero() => {
                let ratio = execution_time.as_secs_f64() / baseline.as_secs_f64();
                if ratio > self.sensitivity_threshold {
                    return Some(PerformanceAnomaly {
                        operation_type,
                        severity: if ratio > 5.0 {
                            AnomalySeverity::Critical
                        } else {
                            AnomalySeverity::Warning
                        },
                        anomaly_type: AnomalyType::SlowExecution,
                        measured_time: execution_time,
                        expected_time: baseline,
                        deviation_factor: ratio,
                    });
                }
            }
            _ => {
                // No usable baseline yet: record this sample as the baseline.
                self.baselines.insert(operation_type, execution_time);
            }
        }
        None
    }
}
/// A single flagged performance deviation.
#[derive(Debug, Clone)]
pub struct PerformanceAnomaly {
    /// Operation during which the anomaly was observed.
    pub operation_type: OperationType,
    /// How severe the deviation is (Critical above 5x baseline).
    pub severity: AnomalySeverity,
    /// Kind of deviation detected.
    pub anomaly_type: AnomalyType,
    /// Observed execution time.
    pub measured_time: Duration,
    /// Baseline execution time that was expected.
    pub expected_time: Duration,
    /// measured / expected ratio.
    pub deviation_factor: f64,
}
/// Severity grades for a [`PerformanceAnomaly`].
#[derive(Debug, Clone)]
pub enum AnomalySeverity {
    /// Informational only (declared but not produced in this file).
    Info,
    /// Notable deviation (> threshold, <= 5x baseline).
    Warning,
    /// Severe deviation (> 5x baseline).
    Critical,
}
/// Categories of performance anomalies. Only `SlowExecution` is produced by
/// the detector in this file; the rest are declared for future detectors.
#[derive(Debug, Clone)]
pub enum AnomalyType {
    SlowExecution,
    HighMemoryUsage,
    CpuUtilizationAnomaly,
    ThreadContention,
}
/// Record of one thread-count change made by `DynamicThreadManager`.
#[derive(Debug, Clone)]
pub struct ScalingDecision {
    /// When the decision was made.
    pub timestamp: Instant,
    /// Thread count before the change.
    pub from_threads: usize,
    /// Thread count after the change.
    pub to_threads: usize,
    /// Why the change was made.
    pub reason: ScalingReason,
    /// Confidence in the decision (the workload's complexity estimate).
    pub confidence: f64,
}
/// Why a scaling decision was taken. Only `WorkloadAdaptation` is emitted in
/// this file; the other variants are declared for future policies.
#[derive(Debug, Clone)]
pub enum ScalingReason {
    HighCpuUtilization,
    LowCpuUtilization,
    WorkloadAdaptation,
    PerformanceOptimization,
    ResourceConstraints,
}
/// Aggregated performance statistics for one pool; read by
/// `ThreadPoolManager::auto_optimize_pools` to derive tuning decisions.
#[derive(Debug, Clone)]
pub struct AdvancedPerformanceStats {
    /// Task counters and utilization figures.
    pub thread_pool_stats: ThreadPoolStats,
    /// Memory usage counters (not updated anywhere in this file).
    pub memory_metrics: MemoryMetrics,
    /// Per-operation resource usage traces (not updated in this file).
    pub resource_patterns: HashMap<OperationType, ResourceUsagePattern>,
    /// Per-operation execution times in ms, most recent 100 per operation.
    pub performance_trends: HashMap<OperationType, Vec<f64>>,
    /// Longest observed single-task duration.
    pub max_task_duration: Duration,
    /// Shortest observed single-task duration (sentinel-initialized huge).
    pub min_task_duration: Duration,
    /// Mean task throughput in operations per second.
    pub average_throughput_ops_per_sec: f64,
}
impl Default for AdvancedPerformanceStats {
fn default() -> Self {
Self {
thread_pool_stats: ThreadPoolStats::default(),
memory_metrics: MemoryMetrics::default(),
resource_patterns: HashMap::new(),
performance_trends: HashMap::new(),
max_task_duration: Duration::from_millis(0),
min_task_duration: Duration::from_millis(u64::MAX),
average_throughput_ops_per_sec: 0.0,
}
}
}
impl AdvancedPerformanceStats {
    /// Folds one task execution into the counters and the per-operation
    /// trend window (bounded to the 100 most recent samples).
    ///
    /// Fix: the original never updated `max_task_duration`,
    /// `min_task_duration`, or `average_throughput_ops_per_sec`, yet
    /// `ThreadPoolManager::auto_optimize_pools` reads all three — they stayed
    /// frozen at their `Default` sentinels forever.
    pub fn record_execution(&mut self, operation_type: OperationType, executiontime: Duration) {
        self.thread_pool_stats.total_tasks += 1;
        self.thread_pool_stats.total_execution_time += executiontime;
        // Keep the per-task extrema in sync for the optimizer's spread check.
        self.max_task_duration = self.max_task_duration.max(executiontime);
        self.min_task_duration = self.min_task_duration.min(executiontime);
        // Mean throughput over all recorded busy time (left at 0.0 until
        // some measurable time has accumulated, avoiding division by zero).
        let busy_secs = self.thread_pool_stats.total_execution_time.as_secs_f64();
        if busy_secs > 0.0 {
            self.average_throughput_ops_per_sec =
                self.thread_pool_stats.total_tasks as f64 / busy_secs;
        }
        let trends = self.performance_trends.entry(operation_type).or_default();
        trends.push(executiontime.as_secs_f64() * 1000.0);
        if trends.len() > 100 {
            trends.remove(0);
        }
    }
}
/// Basic task counters for one pool.
#[derive(Debug, Clone, Default)]
pub struct ThreadPoolStats {
    /// Total tasks recorded via `record_execution`.
    pub total_tasks: u64,
    /// Sum of all recorded task durations.
    pub total_execution_time: Duration,
    /// Current worker count (not updated anywhere in this file).
    pub active_threads: usize,
    /// Pending task count (not updated anywhere in this file).
    pub queue_length: usize,
    /// Worker utilization fraction (not updated anywhere in this file).
    pub thread_utilization: f64,
}
/// Memory usage counters. Declared for monitoring, but nothing in this file
/// writes to them yet.
#[derive(Debug, Clone, Default)]
pub struct MemoryMetrics {
    /// Current memory usage in bytes (presumably — units not defined here).
    pub current_usage: u64,
    /// Peak memory usage observed.
    pub peak_usage: u64,
    /// Average memory usage over the observation window.
    pub average_usage: u64,
    /// Allocation rate (units not defined in this file).
    pub allocation_rate: f64,
}
/// Time-series resource traces for one operation type. Declared for
/// monitoring, but nothing in this file populates it yet.
#[derive(Debug, Clone)]
pub struct ResourceUsagePattern {
    /// CPU utilization samples.
    pub cpu_pattern: Vec<f64>,
    /// Memory usage samples.
    pub memory_pattern: Vec<u64>,
    /// Optional I/O activity samples.
    pub io_pattern: Option<Vec<f64>>,
    /// Optional cache-miss-rate samples.
    pub cache_miss_pattern: Option<Vec<f64>>,
}
/// A pool with a bounded lifetime: callers poll `should_cleanup` to decide
/// when to tear it down.
#[derive(Debug)]
pub struct ScopedThreadPool {
    /// Configuration the scope was created with (currently only stored).
    #[allow(dead_code)]
    config: ThreadPoolConfig,
    /// Creation time, used to measure the pool's age.
    created_at: Instant,
    /// Age after which `should_cleanup` returns true.
    cleanup_timeout: Duration,
}
impl ScopedThreadPool {
    /// Creates a scoped pool that should be torn down once it has existed
    /// for longer than `cleanup_timeout`.
    ///
    /// Fix: the `config` parameter was previously named `_config` — the
    /// leading underscore signals "intentionally unused" yet the value is
    /// stored — and `cleanuptimeout` violated snake_case. Rust parameter
    /// names are not part of the call syntax, so renaming is caller-safe.
    pub fn new(config: ThreadPoolConfig, cleanup_timeout: Duration) -> Self {
        Self {
            config,
            created_at: Instant::now(),
            cleanup_timeout,
        }
    }

    /// True once the pool has outlived its cleanup timeout.
    pub fn should_cleanup(&self) -> bool {
        self.created_at.elapsed() > self.cleanup_timeout
    }
}
/// Lazily initialized process-wide manager instance.
static GLOBAL_MANAGER: std::sync::OnceLock<Arc<Mutex<ThreadPoolManager>>> =
    std::sync::OnceLock::new();
/// Returns the shared global [`ThreadPoolManager`], creating it on first call.
///
/// NOTE(review): the manager's fields are already individually lock-guarded,
/// so the outer `Mutex` serializes all callers — confirm it is required
/// before relying on it for throughput-sensitive paths.
#[allow(dead_code)]
pub fn get_global_manager() -> Arc<Mutex<ThreadPoolManager>> {
    GLOBAL_MANAGER
        .get_or_init(|| Arc::new(Mutex::new(ThreadPoolManager::new())))
        .clone()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults must describe a usable pool: positive counts, min <= max.
    #[test]
    fn test_thread_pool_config_default() {
        let config = ThreadPoolConfig::default();
        assert!(config.max_threads > 0);
        assert!(config.min_threads > 0);
        assert!(config.min_threads <= config.max_threads);
    }

    /// Sizing thresholds must be ordered and the scaling factor must allow
    /// actual growth.
    #[test]
    fn test_dynamic_sizing_config() {
        let config = DynamicSizingConfig::default();
        assert!(config.scale_up_threshold > config.scale_down_threshold);
        assert!(config.max_scaling_factor > 1.0);
    }

    /// Matrix multiplication is classified CPU-bound both before and after
    /// recording a sample.
    #[test]
    fn test_workload_predictor() {
        let mut predictor = WorkloadPredictor::new();
        let characteristics = predictor.predict_workload(&OperationType::MatrixMultiplication);
        assert!(matches!(characteristics.pattern, WorkloadPattern::CpuBound));
        assert!(characteristics.complexity_estimate > 0.0);
        predictor.update_performance(
            OperationType::MatrixMultiplication,
            Duration::from_millis(100),
        );
        let updated_characteristics =
            predictor.predict_workload(&OperationType::MatrixMultiplication);
        assert!(matches!(
            updated_characteristics.pattern,
            WorkloadPattern::CpuBound
        ));
    }

    /// First sample sets the baseline (no anomaly); a 3x-slower sample
    /// exceeds the 2x threshold and must be flagged as slow execution.
    #[test]
    fn test_performance_anomaly_detection() {
        let mut detector = PerformanceAnomalyDetector::new();
        let baseline = Duration::from_millis(100);
        assert!(detector
            .detect_anomaly(OperationType::MatrixMultiplication, baseline)
            .is_none());
        let slow_execution = Duration::from_millis(300);
        let anomaly = detector.detect_anomaly(OperationType::MatrixMultiplication, slow_execution);
        assert!(anomaly.is_some());
        if let Some(anomaly) = anomaly {
            assert_eq!(anomaly.operation_type, OperationType::MatrixMultiplication);
            assert!(matches!(anomaly.anomaly_type, AnomalyType::SlowExecution));
        }
    }

    /// Pools are created lazily on `get_pool`, configuring a profile does
    /// not disturb existing pools, and stats cover every active pool.
    #[test]
    fn test_thread_pool_manager() {
        let manager = ThreadPoolManager::new();
        let pool = manager.get_pool(ThreadPoolProfile::LinearAlgebra);
        assert!(pool.is_ok());
        let config = AdvancedThreadPoolConfig {
            base_config: ThreadPoolConfig::default(),
            dynamic_sizing: DynamicSizingConfig::default(),
            monitoring: MonitoringConfig::default(),
            resource_isolation: ResourceIsolationConfig {
                memory_policy: CacheAllocationPolicy::Default,
                cpu_isolation: false,
                memory_bandwidth_limit: None,
                cache_partition: None,
            },
            workload_adaptation: WorkloadAdaptationConfig {
                pattern_detection: true,
                learning_rate: 0.1,
                prediction_model: PredictionModelParams::default(),
                characteristic_tracking: true,
            },
        };
        manager.configure_profile(ThreadPoolProfile::MatrixMultiplication, config);
        let stats = manager.get_all_stats();
        assert!(stats.contains_key(&ThreadPoolProfile::LinearAlgebra));
    }
}