use crate::{ProfileEvent, TorshError, TorshResult};
use serde::{Deserialize, Serialize};
/// Collects [`WorkloadSample`]s and derives a [`WorkloadAnalysis`] from them.
pub struct WorkloadCharacterizer {
    /// Raw samples accumulated via `add_sample` / `add_samples_from_events`.
    samples: Vec<WorkloadSample>,
    /// Cached result of the most recent successful `analyze()` call.
    analysis: Option<WorkloadAnalysis>,
    /// Thresholds and feature toggles for the analysis passes.
    config: CharacterizationConfig,
}
/// Tuning knobs for [`WorkloadCharacterizer`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CharacterizationConfig {
    /// Minimum number of samples required before `analyze()` will run.
    pub min_sample_count: usize,
    /// Length of the analysis window, in seconds.
    pub analysis_window_seconds: u64,
    // NOTE(review): the four enable_* flags below are not consulted by the
    // visible analysis code — confirm whether the passes should honor them.
    pub enable_memory_analysis: bool,
    pub enable_compute_analysis: bool,
    pub enable_io_analysis: bool,
    pub enable_parallelism_analysis: bool,
}
/// One observation of a profiled operation, used as input to the analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkloadSample {
    /// Sample timestamp — presumably milliseconds (events convert
    /// `start_us / 1000` into this field); TODO confirm the unit.
    pub timestamp: u64,
    /// Name of the profiled operation.
    pub operation_name: String,
    /// Free-form category label for the operation.
    pub category: String,
    /// Operation duration, in milliseconds.
    pub duration_ms: f64,
    /// CPU utilization during the sample (0.0–1.0 scale, per the thresholds
    /// used in classification).
    pub cpu_utilization: f64,
    /// Memory footprint, in mebibytes.
    pub memory_mb: f64,
    /// Fraction of cache accesses that missed (0.0–1.0).
    pub cache_miss_rate: f64,
    /// IO operation rate during the sample.
    pub io_ops_per_sec: f64,
    /// Number of parallel threads active.
    pub parallel_threads: u32,
    /// Floating-point operations performed.
    pub flops: u64,
    /// Bytes read/written by the operation.
    pub bytes_accessed: u64,
    /// Energy consumed, in joules.
    pub energy_joules: f64,
}
/// Aggregate result of a full characterization pass over the sample set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkloadAnalysis {
    /// Coarse classification of the workload.
    pub workload_type: WorkloadType,
    /// CPU/memory/IO utilization statistics.
    pub resource_patterns: ResourcePatterns,
    /// Compute-side metrics (arithmetic intensity, vectorization, ...).
    pub compute_characteristics: ComputeCharacteristics,
    /// Memory access pattern metrics.
    pub memory_patterns: MemoryPatterns,
    /// IO behavior metrics.
    pub io_patterns: IOPatterns,
    /// Parallel efficiency and communication metrics.
    pub parallelism_analysis: ParallelismAnalysis,
    /// Detected performance bottlenecks, possibly empty.
    pub bottlenecks: Vec<PerformanceBottleneck>,
    /// Optimization suggestions derived from type + bottlenecks.
    pub recommendations: Vec<OptimizationRecommendation>,
    /// Run-to-run stability statistics.
    pub stability_metrics: StabilityMetrics,
}
/// Coarse classification of a workload's dominant resource demand.
///
/// NOTE(review): the visible classifier only ever produces
/// `ComputeIntensive`, `MemoryBound`, `IOIntensive`, or `Balanced`; the
/// remaining variants are presumably reserved for future heuristics.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum WorkloadType {
    ComputeIntensive,
    MemoryBound,
    IOIntensive,
    Balanced,
    GPUAccelerated,
    NetworkBound,
    CacheSensitive,
    Irregular,
}
/// Utilization statistics aggregated over all samples.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourcePatterns {
    /// Mean CPU utilization across samples.
    pub avg_cpu_utilization: f64,
    /// Population variance of CPU utilization.
    pub cpu_utilization_variance: f64,
    /// Mean memory usage, in mebibytes.
    pub avg_memory_usage_mb: f64,
    /// Peak-to-average memory ratio (1.0 when average is zero).
    pub memory_peak_factor: f64,
    /// Memory locality score (0.0–1.0) — currently a placeholder value.
    pub memory_locality_score: f64,
    /// 1.0 minus the mean cache miss rate.
    pub cache_efficiency_score: f64,
    /// IO throughput — NOTE(review): currently populated from the mean of
    /// `io_ops_per_sec`, not an MB/s figure; confirm intended semantics.
    pub io_throughput_mbps: f64,
    /// Network utilization — currently always 0.0 (not measured).
    pub network_utilization: f64,
}
/// Compute-side characteristics of the workload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComputeCharacteristics {
    /// FLOPs per byte accessed (roofline-style arithmetic intensity).
    pub arithmetic_intensity: f64,
    /// Vectorization efficiency estimate — currently a placeholder value.
    pub vectorization_efficiency: f64,
    /// ILP score — presumably instruction-level parallelism; currently a
    /// placeholder value.
    pub ilp_score: f64,
    /// Branch prediction efficiency estimate — currently a placeholder value.
    pub branch_prediction_efficiency: f64,
    /// Compute-to-memory ratio; currently equal to `arithmetic_intensity`.
    pub compute_to_memory_ratio: f64,
    /// Operation types dominating the workload.
    pub dominant_operations: Vec<OperationType>,
}
/// Broad categories of machine operations a workload may be dominated by.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum OperationType {
    IntegerArithmetic,
    FloatingPointArithmetic,
    VectorOperations,
    MatrixOperations,
    MemoryOperations,
    BranchOperations,
    IOOperations,
    SynchronizationOperations,
}
/// Memory access pattern metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryPatterns {
    /// Fraction of accesses that are sequential (0.0–1.0).
    pub sequential_access_ratio: f64,
    /// Fraction of accesses that are random (0.0–1.0).
    pub random_access_ratio: f64,
    /// Observed stride patterns with their frequencies.
    pub stride_patterns: Vec<StridePattern>,
    /// Working set size, in mebibytes.
    pub working_set_size_mb: f64,
    /// Reuse distance distribution — presumably (distance, fraction) pairs;
    /// TODO confirm the tuple semantics.
    pub reuse_distance_distribution: Vec<(u64, f64)>,
    /// Cache line utilization efficiency (0.0–1.0).
    pub cache_line_efficiency: f64,
    /// Fraction of available memory bandwidth in use (0.0–1.0).
    pub memory_bandwidth_utilization: f64,
}
/// A single recurring memory stride and how cache-friendly it is.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StridePattern {
    /// Stride length, in bytes.
    pub stride_bytes: u64,
    /// Fraction of accesses exhibiting this stride (0.0–1.0).
    pub frequency: f64,
    /// How well this stride matches cache-line granularity (0.0–1.0).
    pub cache_friendliness: f64,
}
/// IO behavior metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IOPatterns {
    /// Ratio of reads to writes.
    pub read_write_ratio: f64,
    /// Fraction of IO that is sequential (0.0–1.0).
    pub sequential_io_ratio: f64,
    /// Mean IO request size, in bytes.
    pub avg_io_size_bytes: u64,
    /// Detected bursts of IO activity.
    pub burst_patterns: Vec<IOBurstPattern>,
    /// Fraction of storage bandwidth in use (0.0–1.0).
    pub storage_utilization: f64,
}
/// One burst of IO activity followed by an idle period.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IOBurstPattern {
    /// Burst duration, in milliseconds.
    pub duration_ms: f64,
    /// IO operation rate during the burst.
    pub intensity_ops_per_sec: f64,
    /// Idle time following the burst, in milliseconds.
    pub idle_time_ms: f64,
}
/// Parallel execution efficiency and communication metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParallelismAnalysis {
    /// How effectively threads are kept busy (0.0–1.0).
    pub thread_utilization_efficiency: f64,
    /// How evenly work is distributed across threads (0.0–1.0).
    pub load_balancing_score: f64,
    /// Fraction of time lost to synchronization (0.0–1.0).
    pub synchronization_overhead: f64,
    /// Overall parallel efficiency (0.0–1.0).
    pub parallel_efficiency: f64,
    /// Critical-path breakdown of the execution.
    pub critical_path_analysis: CriticalPathAnalysis,
    /// Observed inter-thread/process communication patterns.
    pub communication_patterns: Vec<CommunicationPattern>,
}
/// Critical-path breakdown: how much of the execution is inherently serial.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CriticalPathAnalysis {
    /// Length of the critical path, in milliseconds.
    pub critical_path_length_ms: f64,
    /// Fraction of work that can run in parallel (0.0–1.0).
    pub parallelizable_portion: f64,
    /// Names of serial phases limiting parallel speedup.
    pub serial_bottlenecks: Vec<String>,
}
/// One observed communication pattern and its cost.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CommunicationPattern {
    /// Topology of the communication.
    pub pattern_type: CommunicationPatternType,
    /// Data volume moved per occurrence, in bytes.
    pub data_volume_bytes: u64,
    /// How often the pattern occurs per second.
    pub frequency_per_sec: f64,
    /// Latency cost attributed to this pattern, in milliseconds.
    pub latency_impact_ms: f64,
}
/// Topologies of inter-worker communication.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum CommunicationPatternType {
    PointToPoint,
    Broadcast,
    GatherReduce,
    AllToAll,
    ProducerConsumer,
}
/// A detected performance bottleneck with an estimated impact.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceBottleneck {
    /// Which resource is the bottleneck.
    pub bottleneck_type: BottleneckType,
    /// Severity — populated from the triggering metric (e.g. mean CPU
    /// utilization or mean cache miss rate), so presumably a 0.0–1.0 scale.
    pub severity: f64,
    /// Human-readable explanation of the bottleneck.
    pub description: String,
    /// Names of operations exceeding the detection threshold.
    pub affected_operations: Vec<String>,
    /// Estimated performance loss, in percent.
    pub performance_impact_percent: f64,
}
/// Resource categories a bottleneck can be attributed to.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum BottleneckType {
    CPUCompute,
    MemoryBandwidth,
    CacheMiss,
    IOThroughput,
    NetworkLatency,
    Synchronization,
    LoadBalancing,
    ResourceContention,
}
/// A suggested optimization with expected payoff and effort estimates.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationRecommendation {
    /// Category of the optimization.
    pub recommendation_type: OptimizationType,
    /// Priority — presumably higher is more important (generated values are
    /// 7–9); TODO confirm the scale.
    pub priority: u8,
    /// Human-readable recommendation summary.
    pub description: String,
    /// Expected speedup, in percent.
    pub expected_improvement_percent: f64,
    /// Implementation effort estimate (generated values are 5–7).
    pub implementation_complexity: u8,
    /// Concrete steps to apply the recommendation.
    pub actions: Vec<String>,
}
/// Categories of optimization a recommendation can belong to.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum OptimizationType {
    Algorithmic,
    MemoryAccess,
    CacheOptimization,
    Vectorization,
    Parallelization,
    IOOptimization,
    DataStructure,
    Compiler,
    HardwareUtilization,
}
/// Run-to-run stability statistics of the workload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StabilityMetrics {
    /// Coefficient of variation of sample durations (stddev / mean).
    pub performance_variance: f64,
    /// Resource usage stability score — currently a placeholder value.
    pub resource_stability: f64,
    /// 1.0 minus the (capped) performance variance.
    pub predictability_score: f64,
    /// Detected phase transitions — currently always empty.
    pub phase_changes: Vec<PhaseChange>,
}
/// A detected change in workload behavior over time.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PhaseChange {
    /// When the phase change occurred — same unit as
    /// [`WorkloadSample::timestamp`].
    pub timestamp: u64,
    /// Human-readable description of the change.
    pub description: String,
    /// Magnitude of the behavioral shift.
    pub magnitude: f64,
    /// Duration of the transition, in milliseconds.
    pub duration_ms: f64,
}
impl Default for CharacterizationConfig {
    /// Defaults: 100-sample minimum, 60-second window, all analysis passes
    /// enabled.
    fn default() -> Self {
        Self {
            min_sample_count: 100,
            analysis_window_seconds: 60,
            enable_memory_analysis: true,
            enable_compute_analysis: true,
            enable_io_analysis: true,
            enable_parallelism_analysis: true,
        }
    }
}
impl Default for WorkloadCharacterizer {
    /// Equivalent to [`WorkloadCharacterizer::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl WorkloadCharacterizer {
pub fn new() -> Self {
Self {
samples: Vec::new(),
analysis: None,
config: CharacterizationConfig::default(),
}
}
pub fn with_config(config: CharacterizationConfig) -> Self {
Self {
samples: Vec::new(),
analysis: None,
config,
}
}
pub fn add_sample(&mut self, sample: WorkloadSample) -> TorshResult<()> {
self.samples.push(sample);
Ok(())
}
pub fn add_samples_from_events(&mut self, events: &[ProfileEvent]) -> TorshResult<()> {
for event in events {
let sample = WorkloadSample {
timestamp: event.start_us / 1000, operation_name: event.name.clone(),
category: event.category.clone(),
duration_ms: event.duration_us as f64 / 1000.0,
cpu_utilization: 0.7, memory_mb: event.bytes_transferred.unwrap_or(0) as f64 / (1024.0 * 1024.0),
cache_miss_rate: 0.05, io_ops_per_sec: 0.0, parallel_threads: 1, flops: event.flops.unwrap_or(0),
bytes_accessed: event.bytes_transferred.unwrap_or(0),
energy_joules: 0.0, };
self.add_sample(sample)?;
}
Ok(())
}
pub fn analyze(&mut self) -> TorshResult<WorkloadAnalysis> {
if self.samples.len() < self.config.min_sample_count {
return Err(TorshError::InvalidArgument(format!(
"Insufficient samples for analysis: {} < {}",
self.samples.len(),
self.config.min_sample_count
)));
}
let workload_type = self.classify_workload_type()?;
let resource_patterns = self.analyze_resource_patterns()?;
let compute_characteristics = self.analyze_compute_characteristics()?;
let memory_patterns = self.analyze_memory_patterns()?;
let io_patterns = self.analyze_io_patterns()?;
let parallelism_analysis = self.analyze_parallelism()?;
let bottlenecks = self.identify_bottlenecks()?;
let recommendations = self.generate_recommendations(&workload_type, &bottlenecks)?;
let stability_metrics = self.analyze_stability()?;
let analysis = WorkloadAnalysis {
workload_type,
resource_patterns,
compute_characteristics,
memory_patterns,
io_patterns,
parallelism_analysis,
bottlenecks,
recommendations,
stability_metrics,
};
self.analysis = Some(analysis.clone());
Ok(analysis)
}
pub fn get_analysis(&self) -> Option<&WorkloadAnalysis> {
self.analysis.as_ref()
}
pub fn export_analysis(&self, filename: &str) -> TorshResult<()> {
if let Some(analysis) = &self.analysis {
let json = serde_json::to_string_pretty(analysis)
.map_err(|e| TorshError::SerializationError(e.to_string()))?;
std::fs::write(filename, json).map_err(|e| TorshError::IoError(e.to_string()))?;
}
Ok(())
}
pub fn reset(&mut self) {
self.samples.clear();
self.analysis = None;
}
fn classify_workload_type(&self) -> TorshResult<WorkloadType> {
let avg_cpu =
self.samples.iter().map(|s| s.cpu_utilization).sum::<f64>() / self.samples.len() as f64;
let avg_memory =
self.samples.iter().map(|s| s.memory_mb).sum::<f64>() / self.samples.len() as f64;
let avg_io =
self.samples.iter().map(|s| s.io_ops_per_sec).sum::<f64>() / self.samples.len() as f64;
let avg_flops =
self.samples.iter().map(|s| s.flops).sum::<u64>() / self.samples.len() as u64;
if avg_flops > 1000000 && avg_cpu > 0.8 {
Ok(WorkloadType::ComputeIntensive)
} else if avg_memory > 1000.0 {
Ok(WorkloadType::MemoryBound)
} else if avg_io > 1000.0 {
Ok(WorkloadType::IOIntensive)
} else {
Ok(WorkloadType::Balanced)
}
}
fn analyze_resource_patterns(&self) -> TorshResult<ResourcePatterns> {
let cpu_utilizations: Vec<f64> = self.samples.iter().map(|s| s.cpu_utilization).collect();
let avg_cpu = cpu_utilizations.iter().sum::<f64>() / cpu_utilizations.len() as f64;
let cpu_variance = cpu_utilizations
.iter()
.map(|x| (x - avg_cpu).powi(2))
.sum::<f64>()
/ cpu_utilizations.len() as f64;
let avg_memory =
self.samples.iter().map(|s| s.memory_mb).sum::<f64>() / self.samples.len() as f64;
let max_memory = self.samples.iter().map(|s| s.memory_mb).fold(0.0, f64::max);
let memory_peak_factor = if avg_memory > 0.0 {
max_memory / avg_memory
} else {
1.0
};
Ok(ResourcePatterns {
avg_cpu_utilization: avg_cpu,
cpu_utilization_variance: cpu_variance,
avg_memory_usage_mb: avg_memory,
memory_peak_factor,
memory_locality_score: 0.8, cache_efficiency_score: 1.0
- (self.samples.iter().map(|s| s.cache_miss_rate).sum::<f64>()
/ self.samples.len() as f64),
io_throughput_mbps: self.samples.iter().map(|s| s.io_ops_per_sec).sum::<f64>()
/ self.samples.len() as f64,
network_utilization: 0.0, })
}
fn analyze_compute_characteristics(&self) -> TorshResult<ComputeCharacteristics> {
let total_flops: u64 = self.samples.iter().map(|s| s.flops).sum();
let total_bytes: u64 = self.samples.iter().map(|s| s.bytes_accessed).sum();
let arithmetic_intensity = if total_bytes > 0 {
total_flops as f64 / total_bytes as f64
} else {
0.0
};
Ok(ComputeCharacteristics {
arithmetic_intensity,
vectorization_efficiency: 0.7, ilp_score: 0.6, branch_prediction_efficiency: 0.9, compute_to_memory_ratio: arithmetic_intensity,
dominant_operations: vec![OperationType::FloatingPointArithmetic], })
}
fn analyze_memory_patterns(&self) -> TorshResult<MemoryPatterns> {
Ok(MemoryPatterns {
sequential_access_ratio: 0.6, random_access_ratio: 0.4,
stride_patterns: vec![
StridePattern {
stride_bytes: 64,
frequency: 0.4,
cache_friendliness: 0.9,
},
StridePattern {
stride_bytes: 1024,
frequency: 0.3,
cache_friendliness: 0.5,
},
],
working_set_size_mb: self.samples.iter().map(|s| s.memory_mb).fold(0.0, f64::max),
reuse_distance_distribution: vec![(64, 0.3), (1024, 0.4), (65536, 0.3)],
cache_line_efficiency: 0.8,
memory_bandwidth_utilization: 0.7,
})
}
fn analyze_io_patterns(&self) -> TorshResult<IOPatterns> {
Ok(IOPatterns {
read_write_ratio: 2.0, sequential_io_ratio: 0.7,
avg_io_size_bytes: 4096,
burst_patterns: vec![IOBurstPattern {
duration_ms: 100.0,
intensity_ops_per_sec: 10000.0,
idle_time_ms: 50.0,
}],
storage_utilization: 0.6,
})
}
fn analyze_parallelism(&self) -> TorshResult<ParallelismAnalysis> {
Ok(ParallelismAnalysis {
thread_utilization_efficiency: 0.8,
load_balancing_score: 0.9,
synchronization_overhead: 0.05,
parallel_efficiency: 0.85,
critical_path_analysis: CriticalPathAnalysis {
critical_path_length_ms: 100.0,
parallelizable_portion: 0.8,
serial_bottlenecks: vec!["initialization".to_string(), "finalization".to_string()],
},
communication_patterns: vec![CommunicationPattern {
pattern_type: CommunicationPatternType::PointToPoint,
data_volume_bytes: 1024,
frequency_per_sec: 100.0,
latency_impact_ms: 0.1,
}],
})
}
fn identify_bottlenecks(&self) -> TorshResult<Vec<PerformanceBottleneck>> {
let mut bottlenecks = Vec::new();
let avg_cpu =
self.samples.iter().map(|s| s.cpu_utilization).sum::<f64>() / self.samples.len() as f64;
if avg_cpu > 0.9 {
bottlenecks.push(PerformanceBottleneck {
bottleneck_type: BottleneckType::CPUCompute,
severity: avg_cpu,
description: "High CPU utilization indicates compute bottleneck".to_string(),
affected_operations: self
.samples
.iter()
.filter(|s| s.cpu_utilization > 0.9)
.map(|s| s.operation_name.clone())
.collect(),
performance_impact_percent: 25.0,
});
}
let avg_cache_miss =
self.samples.iter().map(|s| s.cache_miss_rate).sum::<f64>() / self.samples.len() as f64;
if avg_cache_miss > 0.1 {
bottlenecks.push(PerformanceBottleneck {
bottleneck_type: BottleneckType::CacheMiss,
severity: avg_cache_miss,
description: "High cache miss rate indicates memory access inefficiency"
.to_string(),
affected_operations: self
.samples
.iter()
.filter(|s| s.cache_miss_rate > 0.1)
.map(|s| s.operation_name.clone())
.collect(),
performance_impact_percent: 15.0,
});
}
Ok(bottlenecks)
}
fn generate_recommendations(
&self,
workload_type: &WorkloadType,
bottlenecks: &[PerformanceBottleneck],
) -> TorshResult<Vec<OptimizationRecommendation>> {
let mut recommendations = Vec::new();
match workload_type {
WorkloadType::ComputeIntensive => {
recommendations.push(OptimizationRecommendation {
recommendation_type: OptimizationType::Vectorization,
priority: 8,
description:
"Consider vectorizing compute-intensive loops for better SIMD utilization"
.to_string(),
expected_improvement_percent: 20.0,
implementation_complexity: 6,
actions: vec![
"Identify hot loops in compute kernels".to_string(),
"Apply SIMD intrinsics or auto-vectorization hints".to_string(),
"Ensure data alignment for optimal vector operations".to_string(),
],
});
}
WorkloadType::MemoryBound => {
recommendations.push(OptimizationRecommendation {
recommendation_type: OptimizationType::MemoryAccess,
priority: 9,
description: "Optimize memory access patterns to reduce bandwidth pressure"
.to_string(),
expected_improvement_percent: 30.0,
implementation_complexity: 7,
actions: vec![
"Implement data prefetching strategies".to_string(),
"Reorganize data structures for better locality".to_string(),
"Consider memory pooling to reduce allocation overhead".to_string(),
],
});
}
_ => {}
}
for bottleneck in bottlenecks {
if bottleneck.bottleneck_type == BottleneckType::CacheMiss {
recommendations.push(OptimizationRecommendation {
recommendation_type: OptimizationType::CacheOptimization,
priority: 7,
description: "Improve cache efficiency to reduce memory latency".to_string(),
expected_improvement_percent: bottleneck.performance_impact_percent * 0.7,
implementation_complexity: 5,
actions: vec![
"Analyze memory access patterns".to_string(),
"Implement cache-friendly data layouts".to_string(),
"Add prefetch instructions for predictable accesses".to_string(),
],
});
}
}
Ok(recommendations)
}
fn analyze_stability(&self) -> TorshResult<StabilityMetrics> {
let durations: Vec<f64> = self.samples.iter().map(|s| s.duration_ms).collect();
let avg_duration = durations.iter().sum::<f64>() / durations.len() as f64;
let variance = durations
.iter()
.map(|x| (x - avg_duration).powi(2))
.sum::<f64>()
/ durations.len() as f64;
let performance_variance = variance.sqrt() / avg_duration;
Ok(StabilityMetrics {
performance_variance,
resource_stability: 0.8, predictability_score: 1.0 - performance_variance.min(1.0),
phase_changes: Vec::new(), })
}
}
#[cfg(test)]
mod tests {
    use super::*;
    // A fresh characterizer starts with no samples and no cached analysis.
    #[test]
    fn test_workload_characterizer_creation() {
        let characterizer = WorkloadCharacterizer::new();
        assert_eq!(characterizer.samples.len(), 0);
        assert!(characterizer.analysis.is_none());
    }
    // add_sample stores the sample and reports success.
    #[test]
    fn test_add_sample() {
        let mut characterizer = WorkloadCharacterizer::new();
        let sample = WorkloadSample {
            timestamp: 0,
            operation_name: "test_op".to_string(),
            category: "test".to_string(),
            duration_ms: 10.0,
            cpu_utilization: 0.8,
            memory_mb: 100.0,
            cache_miss_rate: 0.05,
            io_ops_per_sec: 0.0,
            parallel_threads: 1,
            flops: 1000000,
            bytes_accessed: 1024,
            energy_joules: 0.1,
        };
        characterizer.add_sample(sample).unwrap();
        assert_eq!(characterizer.samples.len(), 1);
    }
    // 100 high-CPU, high-FLOPS samples (satisfying the default
    // min_sample_count of 100) should classify as ComputeIntensive per the
    // avg_flops > 1_000_000 && avg_cpu > 0.8 rule.
    #[test]
    fn test_workload_classification() {
        let mut characterizer = WorkloadCharacterizer::new();
        for i in 0..100 {
            let sample = WorkloadSample {
                timestamp: i,
                operation_name: format!("compute_op_{i}"),
                category: "compute".to_string(),
                duration_ms: 10.0,
                cpu_utilization: 0.95,
                memory_mb: 50.0,
                cache_miss_rate: 0.02,
                io_ops_per_sec: 0.0,
                parallel_threads: 1,
                flops: 2000000,
                bytes_accessed: 1024,
                energy_joules: 0.2,
            };
            characterizer.add_sample(sample).unwrap();
        }
        let analysis = characterizer.analyze().unwrap();
        assert_eq!(analysis.workload_type, WorkloadType::ComputeIntensive);
    }
}