use crate::error::{StatsError, StatsResult};
use scirs2_core::ndarray::ArrayView2;
use scirs2_core::numeric::{Float, NumCast, One, Zero};
use scirs2_core::{
parallel_ops::*,
simd_ops::{PlatformCapabilities, SimdUnifiedOps},
};
use std::collections::{HashMap, VecDeque};
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant};
/// Top-level configuration for the advanced parallel processing engine.
///
/// Aggregates every tunable group: detected hardware, the execution
/// strategy, memory limits, runtime optimization knobs, fault tolerance,
/// and GPU settings. Built with auto-detected values via `Default`.
#[derive(Debug, Clone)]
pub struct AdvancedParallelConfig {
    /// Detected (or user-supplied) hardware topology.
    pub hardware: HardwareConfig,
    /// Which execution strategy to use (default: `Adaptive`).
    pub strategy: ParallelStrategy,
    /// Memory budgets and out-of-core behavior.
    pub memory: MemoryConfig,
    /// Runtime scheduling/optimization switches.
    pub optimization: OptimizationConfig,
    /// Checkpointing/retry/degradation policy.
    pub fault_tolerance: FaultToleranceConfig,
    /// GPU enablement and transfer thresholds.
    pub gpu: GpuConfig,
}
/// Snapshot of the machine's compute topology used for scheduling decisions.
#[derive(Debug, Clone)]
pub struct HardwareConfig {
    /// Number of usable CPU cores (from the parallel runtime's thread count).
    pub cpu_cores: usize,
    /// Number of NUMA nodes (1 when detection finds none).
    pub numa_nodes: usize,
    /// Per-level CPU cache sizes in bytes.
    pub cachesizes: CacheSizes,
    /// Estimated sustained memory bandwidth in GiB/s.
    pub memory_bandwidth: f64,
    /// SIMD feature set detected by `scirs2_core`.
    pub simd_capabilities: PlatformCapabilities,
    /// Enumerated GPU devices (currently always empty — see `detect_gpu_devices`).
    pub gpu_devices: Vec<GpuDevice>,
}
/// CPU cache sizes in bytes, per level.
#[derive(Debug, Clone)]
pub struct CacheSizes {
    /// L1 data cache size.
    pub l1data: usize,
    /// L1 instruction cache size.
    pub l1_instruction: usize,
    /// Unified L2 cache size.
    pub l2_unified: usize,
    /// Shared L3 cache size.
    pub l3_shared: usize,
}
/// Description of one GPU device.
///
/// NOTE(review): field semantics mirror CUDA device properties
/// (compute capability, SMs, threads/block) — confirm once a real
/// enumeration backend replaces the empty `detect_gpu_devices` stub.
#[derive(Debug, Clone)]
pub struct GpuDevice {
    /// Ordinal of the device as exposed by the driver.
    pub device_id: usize,
    /// Device memory in GiB.
    pub memory_gb: f64,
    /// Compute capability (e.g. 8.6) — presumably CUDA-style; verify.
    pub compute_capability: f64,
    /// Number of multiprocessors (SMs).
    pub multiprocessors: usize,
    /// Maximum threads per block supported by the device.
    pub max_threads_per_block: usize,
}
/// Execution strategy chosen per dataset (see `select_optimal_strategy`).
#[derive(Debug, Clone, Copy)]
pub enum ParallelStrategy {
    /// Plain multi-threaded CPU chunking.
    CpuOptimal,
    /// CPU execution with SIMD acceleration.
    CpuSimd,
    /// Split work between CPU and GPU.
    HybridCpuGpu,
    /// Run primarily on the GPU.
    GpuPrimary,
    /// Distribute across machines (currently falls back to CPU).
    Distributed,
    /// Let the processor pick and record timing feedback.
    Adaptive,
}
/// Memory budgets and out-of-core processing settings. All sizes in bytes.
#[derive(Debug, Clone, Default)]
pub struct MemoryConfig {
    /// Total system RAM detected at startup.
    pub system_ram: usize,
    /// Optional hard cap on memory use (default: 3/4 of system RAM).
    pub memory_limit: Option<usize>,
    /// Allow spilling to disk when data exceeds the budget.
    pub enable_out_of_core: bool,
    /// Chunk size used when streaming out-of-core data.
    pub out_of_core_chunksize: usize,
    /// Use memory-mapped files where possible.
    pub enable_memory_mapping: bool,
    /// Size of the internal memory pool.
    pub memory_poolsize: usize,
    /// Enable the internal garbage-collection pass.
    pub enable_gc: bool,
}
/// Runtime scheduling and self-tuning switches.
#[derive(Debug, Clone)]
pub struct OptimizationConfig {
    /// Rebalance work between threads based on observed load.
    pub adaptive_load_balancing: bool,
    /// Allow idle workers to steal queued tasks.
    pub work_stealing: bool,
    /// Schedule with cache locality in mind.
    pub cache_aware_scheduling: bool,
    /// Prefer NUMA-local allocations.
    pub numa_aware_allocation: bool,
    /// Grow/shrink the thread pool dynamically.
    pub dynamic_thread_scaling: bool,
    /// How often the performance monitor samples metrics.
    pub monitoring_interval: Duration,
    /// 0.0 (conservative) .. 1.0 (aggressive) tuning factor.
    pub optimization_aggressiveness: f64,
}
/// Checkpointing, retry, and degradation policy.
#[derive(Debug, Clone)]
pub struct FaultToleranceConfig {
    /// Periodically checkpoint long computations.
    pub enable_checkpointing: bool,
    /// Interval between checkpoints.
    pub checkpoint_interval: Duration,
    /// Retry failed tasks.
    pub enable_retry: bool,
    /// Maximum retry attempts per task.
    pub max_retries: usize,
    /// Fall back to a degraded (e.g. single-threaded) mode on failure.
    pub enable_degradation: bool,
    /// Interval between worker health checks.
    pub health_check_interval: Duration,
}
/// GPU offload settings. GPU use is opt-in (`enable_gpu`, default false).
#[derive(Debug, Clone)]
pub struct GpuConfig {
    /// Master switch for any GPU usage.
    pub enable_gpu: bool,
    /// Preferred device ordinal; `None` lets the context pick device 0.
    pub preferred_device: Option<usize>,
    /// Optional cap on GPU memory use, in bytes.
    pub gpu_memory_limit: Option<usize>,
    /// Minimum data size (bytes) that justifies a host↔device transfer.
    pub transfer_threshold: usize,
    /// Use unified (managed) memory when available.
    pub enable_unified_memory: bool,
    /// Number of concurrent GPU streams to create.
    pub stream_count: usize,
}
impl Default for AdvancedParallelConfig {
    /// Builds a configuration from auto-detected hardware characteristics.
    ///
    /// CPU-only by default (`enable_gpu: false`, adaptive strategy), with a
    /// memory limit of three quarters of detected system RAM and an
    /// out-of-core chunk size scaled to the target's pointer width.
    fn default() -> Self {
        let cpu_cores = num_threads();
        let system_ram = Self::detect_system_ram();
        // Chunk size for out-of-core streaming: stay small on 32-bit targets
        // where the address space is the limiting factor.
        #[cfg(target_pointer_width = "32")]
        let out_of_core_chunksize = 64 * 1024 * 1024;
        #[cfg(target_pointer_width = "64")]
        let out_of_core_chunksize = 1024 * 1024 * 1024;

        let hardware = HardwareConfig {
            cpu_cores,
            numa_nodes: Self::detect_numa_nodes(),
            cachesizes: Self::detect_cachesizes(),
            memory_bandwidth: Self::detect_memory_bandwidth(),
            simd_capabilities: PlatformCapabilities::detect(),
            gpu_devices: Self::detect_gpu_devices(),
        };
        let memory = MemoryConfig {
            system_ram,
            // Leave a quarter of RAM for the OS and other processes.
            memory_limit: Some(system_ram * 3 / 4),
            enable_out_of_core: true,
            out_of_core_chunksize,
            enable_memory_mapping: true,
            memory_poolsize: system_ram / 8,
            enable_gc: true,
        };
        let optimization = OptimizationConfig {
            adaptive_load_balancing: true,
            work_stealing: true,
            cache_aware_scheduling: true,
            numa_aware_allocation: true,
            dynamic_thread_scaling: true,
            monitoring_interval: Duration::from_millis(100),
            optimization_aggressiveness: 0.8,
        };
        let fault_tolerance = FaultToleranceConfig {
            enable_checkpointing: false,
            checkpoint_interval: Duration::from_secs(60),
            enable_retry: true,
            max_retries: 3,
            enable_degradation: true,
            health_check_interval: Duration::from_secs(10),
        };
        let gpu = GpuConfig {
            enable_gpu: false,
            preferred_device: None,
            gpu_memory_limit: None,
            transfer_threshold: 1024 * 1024,
            enable_unified_memory: false,
            stream_count: 4,
        };
        Self {
            hardware,
            strategy: ParallelStrategy::Adaptive,
            memory,
            optimization,
            fault_tolerance,
            gpu,
        }
    }
}
impl AdvancedParallelConfig {
fn detect_system_ram() -> usize {
#[cfg(target_os = "linux")]
{
use std::fs;
if let Ok(meminfo) = fs::read_to_string("/proc/meminfo") {
for line in meminfo.lines() {
if line.starts_with("MemTotal:") {
if let Some(kb_str) = line.split_whitespace().nth(1) {
if let Ok(kb) = kb_str.parse::<usize>() {
return kb * 1024; }
}
}
}
}
}
#[cfg(target_os = "windows")]
{
if let Ok(mem_str) = std::env::var("SCIRS_SYSTEM_RAM") {
if let Ok(mem_gb) = mem_str.parse::<usize>() {
#[cfg(target_pointer_width = "32")]
{
return (mem_gb * 1024 * 1024 * 1024).min(2 * 1024 * 1024 * 1024);
}
#[cfg(target_pointer_width = "64")]
{
return mem_gb * 1024 * 1024 * 1024;
}
}
}
}
#[cfg(target_os = "macos")]
{
if let Ok(mem_str) = std::env::var("SCIRS_SYSTEM_RAM") {
if let Ok(mem_gb) = mem_str.parse::<usize>() {
#[cfg(target_pointer_width = "32")]
{
return (mem_gb * 1024 * 1024 * 1024).min(2 * 1024 * 1024 * 1024);
}
#[cfg(target_pointer_width = "64")]
{
return mem_gb * 1024 * 1024 * 1024;
}
}
}
}
let num_cores = num_threads().max(1);
#[cfg(target_pointer_width = "32")]
{
if num_cores >= 16 {
2 * 1024 * 1024 * 1024 } else if num_cores >= 8 {
1024 * 1024 * 1024 } else if num_cores >= 4 {
512 * 1024 * 1024 } else {
256 * 1024 * 1024 }
}
#[cfg(target_pointer_width = "64")]
{
if num_cores >= 16 {
32usize * 1024 * 1024 * 1024 } else if num_cores >= 8 {
16usize * 1024 * 1024 * 1024 } else if num_cores >= 4 {
8usize * 1024 * 1024 * 1024 } else {
4usize * 1024 * 1024 * 1024 }
}
}
fn detect_numa_nodes() -> usize {
#[cfg(target_os = "linux")]
{
use std::fs;
if let Ok(entries) = fs::read_dir("/sys/devices/system/node") {
let mut numa_count = 0;
for entry in entries {
if let Ok(entry) = entry {
let name = entry.file_name();
if let Some(name_str) = name.to_str() {
if name_str.starts_with("node")
&& name_str[4..].parse::<usize>().is_ok()
{
numa_count += 1;
}
}
}
}
if numa_count > 0 {
return numa_count;
}
}
if let Ok(output) = std::process::Command::new("lscpu").output() {
if let Ok(output_str) = String::from_utf8(output.stdout) {
for line in output_str.lines() {
if line.contains("NUMA node(s):") {
if let Some(numa_str) = line.split(':').nth(1) {
if let Ok(numa_count) = numa_str.trim().parse::<usize>() {
return numa_count;
}
}
}
}
}
}
}
let num_cores = num_threads();
if num_cores >= 32 {
4 } else if num_cores >= 16 {
2 } else {
1 }
}
fn detect_cachesizes() -> CacheSizes {
#[cfg(target_os = "linux")]
{
use std::fs;
let mut l1data = 32 * 1024;
let mut l1_instruction = 32 * 1024;
let mut l2_unified = 256 * 1024;
let mut l3_shared = 8 * 1024 * 1024;
if let Ok(entries) = fs::read_dir("/sys/devices/system/cpu/cpu0/cache") {
for entry in entries {
if let Ok(entry) = entry {
let cache_path = entry.path();
if let Ok(level_str) = fs::read_to_string(cache_path.join("level")) {
if let Ok(level) = level_str.trim().parse::<u32>() {
if let Ok(size_str) = fs::read_to_string(cache_path.join("size")) {
let size_str = size_str.trim();
let size = if size_str.ends_with('K') {
size_str[..size_str.len() - 1].parse::<usize>().unwrap_or(0)
* 1024
} else if size_str.ends_with('M') {
size_str[..size_str.len() - 1].parse::<usize>().unwrap_or(0)
* 1024
* 1024
} else {
size_str.parse::<usize>().unwrap_or(0)
};
if let Ok(type_str) =
fs::read_to_string(cache_path.join("type"))
{
match (level, type_str.trim()) {
(1, "Data") => l1data = size,
(1, "Instruction") => l1_instruction = size,
(2, "Unified") => l2_unified = size,
(3, "Unified") => l3_shared = size,
_ => {} }
}
}
}
}
}
}
}
CacheSizes {
l1data,
l1_instruction,
l2_unified,
l3_shared,
}
}
#[cfg(not(target_os = "linux"))]
{
let num_cores = num_threads();
if num_cores >= 16 {
CacheSizes {
l1data: 48 * 1024, l1_instruction: 32 * 1024, l2_unified: 512 * 1024, l3_shared: 32 * 1024 * 1024, }
} else if num_cores >= 8 {
CacheSizes {
l1data: 32 * 1024, l1_instruction: 32 * 1024, l2_unified: 256 * 1024, l3_shared: 16 * 1024 * 1024, }
} else {
CacheSizes {
l1data: 32 * 1024, l1_instruction: 32 * 1024, l2_unified: 256 * 1024, l3_shared: 6 * 1024 * 1024, }
}
}
}
fn detect_memory_bandwidth() -> f64 {
let testsize = 64 * 1024 * 1024; let iterations = 10;
let mut total_bandwidth = 0.0;
let mut successful_tests = 0;
for _ in 0..iterations {
if let Some(bandwidth) = Self::measure_memory_bandwidth(testsize) {
total_bandwidth += bandwidth;
successful_tests += 1;
}
}
if successful_tests > 0 {
let avg_bandwidth = total_bandwidth / successful_tests as f64;
avg_bandwidth.min(200.0) } else {
let num_cores = num_threads();
if num_cores >= 16 {
100.0 } else if num_cores >= 8 {
50.0 } else {
25.6 }
}
}
fn measure_memory_bandwidth(size: usize) -> Option<f64> {
use std::time::Instant;
let source = vec![1.0f64; size / 8]; let mut dest = vec![0.0f64; size / 8];
for i in 0..source.len().min(1000) {
dest[i] = source[i];
}
let start = Instant::now();
for _ in 0..4 {
dest.copy_from_slice(&source);
std::hint::black_box(&dest);
}
let duration = start.elapsed();
if duration.as_nanos() > 0 {
let bytes_transferred = (size * 4 * 2) as f64; let seconds = duration.as_secs_f64();
let bandwidth_gbps = (bytes_transferred / seconds) / (1024.0 * 1024.0 * 1024.0);
Some(bandwidth_gbps)
} else {
None
}
}
fn detect_gpu_devices() -> Vec<GpuDevice> {
vec![]
}
}
/// Parallel processing engine parameterized over the element type `F`.
///
/// Owns the configuration, an optional worker thread pool, performance
/// monitoring, a memory manager, and an optional GPU context.
pub struct AdvancedParallelProcessor<F> {
    /// Active configuration (see [`AdvancedParallelConfig`]).
    config: AdvancedParallelConfig,
    /// Worker pool; `None` on single-core machines.
    thread_pool: Option<ThreadPool>,
    /// Shared metrics collector updated by adaptive processing.
    performance_monitor: Arc<PerformanceMonitor>,
    /// Shared allocation tracker.
    memory_manager: Arc<MemoryManager>,
    /// GPU context when `config.gpu.enable_gpu` is set and init succeeds.
    gpu_context: Option<GpuContext>,
    /// Marks the element type without storing one.
    _phantom: PhantomData<F>,
}
/// Worker pool sharing a single FIFO task queue.
///
/// NOTE(review): workers currently never spawn OS threads (see
/// `Worker::new`), so the pool is a scaffold, not a running executor.
pub struct ThreadPool {
    /// One worker slot per configured CPU core.
    workers: Vec<Worker>,
    /// Shared FIFO queue of pending tasks.
    work_queue: Arc<Mutex<VecDeque<Task>>>,
    /// Cooperative shutdown flag for workers.
    shutdown: Arc<AtomicBool>,
    /// Count of workers currently executing a task.
    active_workers: Arc<AtomicUsize>,
}
/// One pool worker. The backing OS thread is not spawned yet (`thread` stays
/// `None` after construction).
pub struct Worker {
    /// Worker index within the pool.
    id: usize,
    /// Join handle of the worker thread, once spawned.
    thread: Option<thread::JoinHandle<()>>,
    /// Worker-local queue for work-stealing.
    local_queue: VecDeque<Task>,
    /// NUMA node this worker is pinned to, if any.
    numa_node: Option<usize>,
}
/// A unit of work queued on the thread pool.
pub struct Task {
    /// Unique task identifier.
    id: u64,
    /// Scheduling priority (interpretation not fixed here — TODO confirm
    /// whether higher means more urgent).
    priority: u8,
    /// Estimated computational cost, used for load balancing.
    complexity: f64,
    /// Size of the data the task touches, in bytes.
    datasize: usize,
    /// The work itself; consumed on execution.
    function: Box<dyn FnOnce() -> TaskResult + Send>,
}
/// Outcome of executing a [`Task`].
#[derive(Debug)]
pub struct TaskResult {
    /// Whether the task completed without error.
    pub success: bool,
    /// Wall-clock time spent executing.
    pub execution_time: Duration,
    /// Peak memory used by the task, in bytes.
    pub memory_used: usize,
    /// Error description when `success` is false.
    pub error: Option<String>,
}
/// Thread-safe collector of runtime performance metrics.
pub struct PerformanceMonitor {
    /// Current aggregate metrics (read-mostly).
    metrics: RwLock<PerformanceMetrics>,
    /// Historical snapshots for trend analysis.
    history: RwLock<VecDeque<PerformanceSnapshot>>,
    /// Whether background monitoring is enabled.
    monitoring_active: AtomicBool,
}
/// Point-in-time memory accounting snapshot from [`MemoryManager`].
#[derive(Debug, Clone)]
pub struct MemoryUsageStats {
    /// Bytes currently allocated.
    pub current_allocated: usize,
    /// High-water mark of allocated bytes.
    pub peak_allocated: usize,
    /// Cumulative allocation count (currently always 0 — not tracked yet).
    pub total_allocations: usize,
    /// Cumulative deallocation count (currently always 0 — not tracked yet).
    pub total_deallocations: usize,
    /// Pool fragmentation estimate (currently always 0.0 — not tracked yet).
    pub fragmentation_ratio: f64,
}
/// Aggregate runtime metrics maintained by [`PerformanceMonitor`].
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
    /// Elements processed per second in the most recent task.
    pub throughput_ops_per_sec: f64,
    /// CPU utilization fraction (not currently updated by this module).
    pub cpu_utilization: f64,
    /// Memory utilization fraction (not currently updated by this module).
    pub memory_utilization: f64,
    /// Cache hit ratio (not currently updated by this module).
    pub cache_hit_ratio: f64,
    /// Load balance factor; 1.0 means perfectly balanced.
    pub load_balance_factor: f64,
    /// Duration of completed tasks (see `update_metrics` for semantics).
    pub average_task_time: Duration,
    /// Number of threads currently working.
    pub active_threads: usize,
    /// Total tasks completed successfully.
    pub completed_tasks: u64,
    /// Total tasks that failed.
    pub failed_tasks: u64,
}
/// Timestamped copy of the metrics, stored in the monitor's history.
#[derive(Debug, Clone)]
pub struct PerformanceSnapshot {
    /// When the snapshot was taken.
    pub timestamp: Instant,
    /// Metrics at that instant.
    pub metrics: PerformanceMetrics,
}
/// Tracks allocation counters and owns size-class memory pools.
pub struct MemoryManager {
    /// Bytes currently allocated.
    allocated_memory: AtomicUsize,
    /// High-water mark of `allocated_memory`.
    peak_memory: AtomicUsize,
    /// Pools keyed by chunk size.
    memory_pools: RwLock<HashMap<usize, MemoryPool>>,
    /// Whether the garbage-collection pass is enabled.
    gc_enabled: AtomicBool,
}
/// Fixed-chunk-size allocation pool.
///
/// Holds raw chunk pointers; all access to them goes through the
/// `available_chunks` mutex (this is what the manual `Send`/`Sync` impls
/// below rely on).
pub struct MemoryPool {
    /// Size in bytes of every chunk in this pool.
    chunksize: usize,
    /// Free chunks, guarded by a mutex because raw pointers are not Sync.
    available_chunks: Mutex<Vec<*mut u8>>,
    /// Total number of chunks ever created by this pool.
    total_chunks: AtomicUsize,
}
/// Lightweight description of an initialized GPU context.
///
/// NOTE(review): no real device API is called anywhere in this file —
/// this is bookkeeping only; confirm before documenting as a live handle.
pub struct GpuContext {
    /// Selected device ordinal.
    device_id: usize,
    /// Memory budget for this context, in bytes.
    available_memory: usize,
    /// Created streams (currently none are created).
    stream_handles: Vec<GpuStream>,
    /// Whether unified (managed) memory was requested.
    unified_memory_enabled: bool,
}
/// Bookkeeping for one GPU stream.
pub struct GpuStream {
    /// Stream ordinal within the context.
    stream_id: usize,
    /// Whether the stream is currently in use.
    active: AtomicBool,
    /// Number of operations queued but not yet completed.
    pending_operations: AtomicUsize,
}
impl<F> AdvancedParallelProcessor<F>
where
F: Float
+ NumCast
+ SimdUnifiedOps
+ Zero
+ One
+ PartialOrd
+ Copy
+ Send
+ Sync
+ 'static
+ std::fmt::Display
+ scirs2_core::ndarray::ScalarOperand,
{
pub fn new() -> Self {
Self::with_config(AdvancedParallelConfig::default())
}
pub fn with_config(config: AdvancedParallelConfig) -> Self {
let performance_monitor = Arc::new(PerformanceMonitor::new());
let memory_manager = Arc::new(MemoryManager::new(&config.memory));
let thread_pool = if config.hardware.cpu_cores > 1 {
Some(ThreadPool::new(&config))
} else {
None
};
let gpu_context = if config.gpu.enable_gpu {
GpuContext::new(&config.gpu).ok()
} else {
None
};
Self {
config,
thread_pool,
performance_monitor,
memory_manager,
gpu_context: None,
_phantom: PhantomData,
}
}
pub fn process_massivedataset<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
where
T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
R: Send + Sync + 'static,
{
let strategy = self.select_optimal_strategy(data)?;
match strategy {
ParallelStrategy::CpuOptimal => self.process_cpu_optimal(data, operation),
ParallelStrategy::CpuSimd => self.process_cpu_simd(data, operation),
ParallelStrategy::HybridCpuGpu => self.process_hybrid_cpu_gpu(data, operation),
ParallelStrategy::GpuPrimary => self.process_gpu_primary(data, operation),
ParallelStrategy::Distributed => self.process_distributed(data, operation),
ParallelStrategy::Adaptive => self.process_adaptive(data, operation),
}
}
fn select_optimal_strategy(&self, data: &ArrayView2<F>) -> StatsResult<ParallelStrategy> {
let datasize = data.len() * std::mem::size_of::<F>();
let (rows, cols) = data.dim();
if datasize > self.config.memory.system_ram {
Ok(ParallelStrategy::CpuOptimal)
} else if self.config.gpu.enable_gpu && datasize > self.config.gpu.transfer_threshold {
Ok(ParallelStrategy::HybridCpuGpu)
} else if rows * cols > 1_000_000 {
Ok(ParallelStrategy::CpuSimd)
} else {
Ok(ParallelStrategy::CpuOptimal)
}
}
fn process_cpu_optimal<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
where
T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
R: Send + Sync + 'static,
{
let (rows, cols) = data.dim();
let num_threads = self.config.hardware.cpu_cores;
let chunksize = rows.div_ceil(num_threads);
let results: Vec<_> = (0..num_threads)
.into_par_iter()
.map(|thread_id| {
let start_row = thread_id * chunksize;
let end_row = ((thread_id + 1) * chunksize).min(rows);
if start_row < rows {
let chunk = data.slice(scirs2_core::ndarray::s![start_row..end_row, ..]);
operation(&chunk)
} else {
Err(StatsError::InvalidArgument("Empty chunk".to_string()))
}
})
.filter_map(|result| result.ok())
.collect();
results.into_iter().next().ok_or_else(|| {
StatsError::ComputationError("No successful parallel results".to_string())
})
}
fn process_cpu_simd<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
where
T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
R: Send + Sync + 'static,
{
let _simd_processor =
crate::simd_comprehensive::AdvancedComprehensiveSimdProcessor::<F>::new();
operation(data)
}
fn process_hybrid_cpu_gpu<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
where
T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
R: Send + Sync + 'static,
{
if let Some(_gpu_context) = &self.gpu_context {
self.process_cpu_optimal(data, operation)
} else {
self.process_cpu_optimal(data, operation)
}
}
fn process_gpu_primary<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
where
T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
R: Send + Sync + 'static,
{
self.process_cpu_optimal(data, operation)
}
fn process_distributed<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
where
T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
R: Send + Sync + 'static,
{
self.process_cpu_optimal(data, operation)
}
fn process_adaptive<T, R>(&self, data: &ArrayView2<F>, operation: T) -> StatsResult<R>
where
T: Fn(&ArrayView2<F>) -> StatsResult<R> + Send + Sync + Clone + 'static,
R: Send + Sync + 'static,
{
let start_time = Instant::now();
let result = self.process_cpu_optimal(data, operation)?;
let duration = start_time.elapsed();
self.performance_monitor
.update_metrics(duration, data.len());
Ok(result)
}
pub fn get_performance_metrics(&self) -> PerformanceMetrics {
self.performance_monitor.get_current_metrics()
}
pub fn get_config(&self) -> &AdvancedParallelConfig {
&self.config
}
pub fn update_config(&mut self, config: AdvancedParallelConfig) {
self.config = config;
}
}
impl PerformanceMonitor {
    /// Creates a monitor with zeroed metrics, empty history, and monitoring
    /// enabled.
    fn new() -> Self {
        Self {
            metrics: RwLock::new(PerformanceMetrics::default()),
            history: RwLock::new(VecDeque::new()),
            monitoring_active: AtomicBool::new(true),
        }
    }

    /// Records one completed task of `datasize` elements.
    ///
    /// Fix: `average_task_time` previously stored only the *last* task's
    /// duration despite its name; it now maintains a true running mean over
    /// all completed tasks. Throughput remains the most recent task's
    /// elements-per-second. Silently skips the update if the lock is
    /// poisoned (best-effort metrics, as before).
    fn update_metrics(&self, execution_time: Duration, datasize: usize) {
        if let Ok(mut metrics) = self.metrics.write() {
            metrics.completed_tasks += 1;
            // Incremental mean in Duration arithmetic:
            // avg = (avg * (n - 1) + x) / n. The u32 cast is safe for any
            // realistic task count; it would only truncate past 2^32 tasks.
            let n = metrics.completed_tasks as u32;
            metrics.average_task_time =
                (metrics.average_task_time * (n - 1) + execution_time) / n;
            metrics.throughput_ops_per_sec = if execution_time.as_secs_f64() > 0.0 {
                datasize as f64 / execution_time.as_secs_f64()
            } else {
                0.0
            };
        }
    }

    /// Returns a clone of the current metrics.
    ///
    /// Robustness fix: a poisoned lock now yields default metrics instead of
    /// panicking the caller (the previous `expect` aborted on poison).
    fn get_current_metrics(&self) -> PerformanceMetrics {
        self.metrics
            .read()
            .map(|metrics| metrics.clone())
            .unwrap_or_default()
    }
}
impl Default for PerformanceMetrics {
fn default() -> Self {
Self {
throughput_ops_per_sec: 0.0,
cpu_utilization: 0.0,
memory_utilization: 0.0,
cache_hit_ratio: 0.0,
load_balance_factor: 1.0,
average_task_time: Duration::from_secs(0),
active_threads: 0,
completed_tasks: 0,
failed_tasks: 0,
}
}
}
impl<F> Default for AdvancedParallelProcessor<F>
where
F: Float
+ NumCast
+ SimdUnifiedOps
+ Zero
+ One
+ PartialOrd
+ Copy
+ Send
+ Sync
+ 'static
+ std::fmt::Display
+ scirs2_core::ndarray::ScalarOperand,
{
fn default() -> Self {
Self::new()
}
}
/// Convenience alias for a double-precision (`f64`) processor.
pub type F64AdvancedParallelProcessor = AdvancedParallelProcessor<f64>;
/// Convenience alias for a single-precision (`f32`) processor.
pub type F32AdvancedParallelProcessor = AdvancedParallelProcessor<f32>;
/// Builds a processor with auto-detected defaults; free-function form of
/// [`AdvancedParallelProcessor::new`].
#[allow(dead_code)]
pub fn create_advanced_parallel_processor<F>() -> AdvancedParallelProcessor<F>
where
    F: Float
        + NumCast
        + SimdUnifiedOps
        + Zero
        + One
        + PartialOrd
        + Copy
        + Send
        + Sync
        + 'static
        + std::fmt::Display
        + scirs2_core::ndarray::ScalarOperand,
{
    // Delegate through the Default impl, which itself calls `new()`.
    AdvancedParallelProcessor::default()
}
/// Builds a processor from a caller-supplied configuration; free-function
/// form of [`AdvancedParallelProcessor::with_config`].
#[allow(dead_code)]
pub fn create_optimized_parallel_processor<F>(
    config: AdvancedParallelConfig,
) -> AdvancedParallelProcessor<F>
where
    F: Float + NumCast + SimdUnifiedOps + Zero + One + PartialOrd + Copy,
    F: Send + Sync + 'static + std::fmt::Display,
    F: scirs2_core::ndarray::ScalarOperand,
{
    AdvancedParallelProcessor::<F>::with_config(config)
}
// SAFETY: NOTE(review): MemoryPool holds raw `*mut u8` chunk pointers, which
// are `!Send`/`!Sync` by default. These impls assert that all access to the
// pointers goes through the `available_chunks` Mutex, which serializes use
// across threads. Confirm no code hands out a chunk pointer that is used
// outside that lock before relying on this.
unsafe impl Send for MemoryPool {}
unsafe impl Sync for MemoryPool {}
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::Array2;

    // The default config must reflect a sane detected machine.
    #[test]
    fn test_advanced_parallel_config_default() {
        let config = AdvancedParallelConfig::default();
        assert!(config.hardware.cpu_cores > 0);
        assert!(config.memory.system_ram > 0);
    }

    // Bandwidth is positive and below an implausibly high ceiling
    // (detect_memory_bandwidth already clamps to 200 GiB/s).
    #[test]
    fn test_memory_bandwidth_detection() {
        let bandwidth = AdvancedParallelConfig::detect_memory_bandwidth();
        assert!(bandwidth > 0.0);
        assert!(bandwidth < 1000.0);
    }

    // Cache sizes must be positive and strictly increasing per level.
    #[test]
    fn test_cachesize_detection() {
        let cachesizes = AdvancedParallelConfig::detect_cachesizes();
        assert!(cachesizes.l1data > 0);
        assert!(cachesizes.l2_unified > cachesizes.l1data);
        assert!(cachesizes.l3_shared > cachesizes.l2_unified);
    }

    // At least one NUMA node; 16 is a generous upper bound for test machines.
    #[test]
    fn test_numa_detection() {
        let numa_nodes = AdvancedParallelConfig::detect_numa_nodes();
        assert!(numa_nodes > 0);
        assert!(numa_nodes <= 16);
    }

    #[test]
    fn test_advanced_parallel_processor_creation() {
        let processor = AdvancedParallelProcessor::<f64>::new();
        assert!(processor.config.hardware.cpu_cores > 0);
    }

    // A tiny in-memory dataset must select the plain CPU strategy.
    #[test]
    fn test_strategy_selection() {
        let processor = AdvancedParallelProcessor::<f64>::new();
        let smalldata = Array2::<f64>::zeros((10, 10));
        let strategy = processor
            .select_optimal_strategy(&smalldata.view())
            .expect("Operation failed");
        assert!(matches!(strategy, ParallelStrategy::CpuOptimal));
    }

    // Fresh monitor starts with zero completed tasks.
    #[test]
    fn test_performance_monitor() {
        let monitor = PerformanceMonitor::new();
        let metrics = monitor.get_current_metrics();
        assert_eq!(metrics.completed_tasks, 0);
    }

    // Fresh manager starts with nothing allocated.
    #[test]
    fn test_memory_manager() {
        let config = MemoryConfig::default();
        let manager = MemoryManager::new(&config);
        assert_eq!(manager.allocated_memory.load(Ordering::Relaxed), 0);
    }
}
impl MemoryManager {
fn new(config: &MemoryConfig) -> Self {
Self {
allocated_memory: AtomicUsize::new(0),
peak_memory: AtomicUsize::new(0),
memory_pools: RwLock::new(HashMap::new()),
gc_enabled: AtomicBool::new(config.enable_gc),
}
}
fn get_usage_stats(&self) -> MemoryUsageStats {
MemoryUsageStats {
current_allocated: self.allocated_memory.load(Ordering::Acquire),
peak_allocated: self.peak_memory.load(Ordering::Acquire),
total_allocations: 0, total_deallocations: 0, fragmentation_ratio: 0.0, }
}
}
impl ThreadPool {
fn new(config: &AdvancedParallelConfig) -> Self {
let num_workers = config.hardware.cpu_cores;
let work_queue = Arc::new(Mutex::new(VecDeque::new()));
let shutdown = Arc::new(AtomicBool::new(false));
let active_workers = Arc::new(AtomicUsize::new(0));
let workers = (0..num_workers)
.map(|id| Worker::new(id, work_queue.clone(), shutdown.clone()))
.collect();
Self {
workers,
work_queue,
shutdown,
active_workers,
}
}
}
impl Worker {
fn new(
_id: usize,
_work_queue: Arc<Mutex<VecDeque<Task>>>,
_shutdown: Arc<AtomicBool>,
) -> Self {
Self {
id: _id,
thread: None, local_queue: VecDeque::new(),
numa_node: None,
}
}
}
impl GpuContext {
fn new(config: &GpuConfig) -> Result<Self, String> {
#[cfg(target_pointer_width = "32")]
let default_gpu_memory = 256 * 1024 * 1024; #[cfg(target_pointer_width = "64")]
let default_gpu_memory = 1024 * 1024 * 1024;
Ok(Self {
device_id: config.preferred_device.unwrap_or(0),
available_memory: config.gpu_memory_limit.unwrap_or(default_gpu_memory),
stream_handles: Vec::new(),
unified_memory_enabled: config.enable_unified_memory,
})
}
}