use std::collections::HashMap;
use super::types::{
ActivationHeatmap, AttentionVisualization, ClusteringResults, DriftInfo, HiddenStateAnalysis,
LayerActivationStats, LayerAnalysis, RepresentationStability, TemporalDynamics,
WeightDistribution,
};
#[derive(Debug)]
pub struct LayerAnalyzer {
layer_activations: HashMap<String, Vec<LayerActivationStats>>,
config: LayerAnalysisConfig,
layer_states: HashMap<String, LayerState>,
}
#[derive(Debug, Clone)]
pub struct LayerAnalysisConfig {
pub dead_neuron_threshold: f64,
pub saturated_neuron_threshold: f64,
pub max_activation_variance: f64,
pub min_health_score: f64,
pub history_length: usize,
}
impl Default for LayerAnalysisConfig {
fn default() -> Self {
Self {
dead_neuron_threshold: 0.1,
saturated_neuron_threshold: 0.1,
max_activation_variance: 2.0,
min_health_score: 0.7,
history_length: 100,
}
}
}
#[derive(Debug, Clone, Default)]
struct LayerState {
health_scores: Vec<f64>,
#[allow(dead_code)]
detected_issues: Vec<String>,
last_analysis_step: usize,
}
impl LayerAnalyzer {
pub fn new() -> Self {
Self {
layer_activations: HashMap::new(),
config: LayerAnalysisConfig::default(),
layer_states: HashMap::new(),
}
}
pub fn with_config(config: LayerAnalysisConfig) -> Self {
Self {
layer_activations: HashMap::new(),
config,
layer_states: HashMap::new(),
}
}
pub fn record_layer_activations(&mut self, layer_name: &str, stats: LayerActivationStats) {
let health_score = self.calculate_layer_health_score(&stats);
let layer_stats = self.layer_activations.entry(layer_name.to_string()).or_default();
layer_stats.push(stats);
if layer_stats.len() > self.config.history_length {
layer_stats.remove(0);
}
let layer_state = self.layer_states.entry(layer_name.to_string()).or_default();
layer_state.health_scores.push(health_score);
if layer_state.health_scores.len() > 50 {
layer_state.health_scores.remove(0);
}
layer_state.last_analysis_step += 1;
}
pub fn record_layer_stats(&mut self, stats: LayerActivationStats) {
let layer_name = stats.layer_name.clone();
self.record_layer_activations(&layer_name, stats);
}
pub fn get_layer_activations(&self, layer_name: &str) -> Option<&[LayerActivationStats]> {
self.layer_activations.get(layer_name).map(|v| v.as_slice())
}
pub fn perform_layer_by_layer_analysis(&self) -> Vec<LayerAnalysis> {
let mut analyses = Vec::new();
for (layer_name, stats_history) in &self.layer_activations {
if let Some(latest_stats) = stats_history.last() {
let analysis = self.analyze_single_layer(layer_name, latest_stats, stats_history);
analyses.push(analysis);
}
}
analyses.sort_by(|a, b| {
a.health_score.partial_cmp(&b.health_score).unwrap_or(std::cmp::Ordering::Equal)
});
analyses
}
pub fn analyze_single_layer(
&self,
layer_name: &str,
current_stats: &LayerActivationStats,
stats_history: &[LayerActivationStats],
) -> LayerAnalysis {
let layer_type = self.infer_layer_type(layer_name);
let health_score = self.calculate_layer_health_score(current_stats);
let issues = self.identify_layer_issues(current_stats, stats_history);
let recommendations = self.generate_layer_recommendations(&issues, &layer_type);
let activation_summary = self.generate_activation_summary(current_stats);
LayerAnalysis {
layer_name: layer_name.to_string(),
layer_type,
health_score,
issues,
recommendations,
activation_summary,
}
}
pub fn calculate_layer_health_score(&self, stats: &LayerActivationStats) -> f64 {
let mut score = 1.0;
if stats.dead_neurons_ratio > self.config.dead_neuron_threshold {
score -= stats.dead_neurons_ratio * 0.5;
}
if stats.saturated_neurons_ratio > self.config.saturated_neuron_threshold {
score -= stats.saturated_neurons_ratio * 0.3;
}
let activation_range = stats.max_activation - stats.min_activation;
if activation_range > 10.0 {
score -= 0.2;
}
if stats.std_activation > self.config.max_activation_variance {
score -= 0.2;
}
if stats.sparsity > 0.1 && stats.sparsity < 0.8 {
score += 0.1;
}
score.max(0.0).min(1.0)
}
pub fn identify_layer_issues(
&self,
current_stats: &LayerActivationStats,
stats_history: &[LayerActivationStats],
) -> Vec<String> {
let mut issues = Vec::new();
if current_stats.dead_neurons_ratio > self.config.dead_neuron_threshold {
issues.push(format!(
"High dead neuron ratio: {:.1}%",
current_stats.dead_neurons_ratio * 100.0
));
}
if current_stats.saturated_neurons_ratio > self.config.saturated_neuron_threshold {
issues.push(format!(
"High saturated neuron ratio: {:.1}%",
current_stats.saturated_neurons_ratio * 100.0
));
}
if current_stats.max_activation - current_stats.min_activation > 100.0 {
issues.push("Extremely wide activation range detected".to_string());
}
if current_stats.std_activation > self.config.max_activation_variance {
issues.push("High activation variance detected".to_string());
}
if stats_history.len() > 5 {
let variance_trend = self.analyze_variance_trend(stats_history);
if variance_trend > 0.1 {
issues.push("Increasing activation variance over time".to_string());
}
}
if current_stats.mean_activation.abs() < 1e-6 {
issues.push("Near-zero mean activation detected".to_string());
}
issues
}
pub fn generate_layer_recommendations(
&self,
issues: &[String],
layer_type: &str,
) -> Vec<String> {
let mut recommendations = Vec::new();
for issue in issues {
if issue.contains("dead neuron") {
match layer_type {
"Linear" => recommendations
.push("Consider using LeakyReLU or ELU activation".to_string()),
"Convolutional" => recommendations.push(
"Consider batch normalization or different initialization".to_string(),
),
_ => recommendations.push(
"Consider different activation function or initialization".to_string(),
),
}
}
if issue.contains("saturated neuron") {
recommendations
.push("Consider gradient clipping or learning rate reduction".to_string());
recommendations.push("Consider batch normalization".to_string());
}
if issue.contains("activation range") {
recommendations.push("Consider activation clipping or normalization".to_string());
}
if issue.contains("variance") {
recommendations.push("Consider weight initialization adjustment".to_string());
recommendations.push("Consider adding regularization".to_string());
}
if issue.contains("zero activation") {
recommendations
.push("Check weight initialization and input preprocessing".to_string());
}
}
recommendations.dedup();
recommendations
}
pub fn analyze_weight_distributions(&self) -> HashMap<String, WeightDistribution> {
let mut distributions = HashMap::new();
for layer_name in self.layer_activations.keys() {
let distribution = self.analyze_layer_weight_distribution(layer_name);
distributions.insert(layer_name.clone(), distribution);
}
distributions
}
pub fn generate_activation_heatmaps(&self) -> HashMap<String, ActivationHeatmap> {
let mut heatmaps = HashMap::new();
for (layer_name, stats_history) in &self.layer_activations {
if let Some(latest_stats) = stats_history.last() {
let heatmap = self.create_activation_heatmap(layer_name, latest_stats);
heatmaps.insert(layer_name.clone(), heatmap);
}
}
heatmaps
}
pub fn generate_attention_visualizations(&self) -> HashMap<String, AttentionVisualization> {
let mut visualizations = HashMap::new();
for layer_name in self.layer_activations.keys() {
if self.infer_layer_type(layer_name) == "Attention" {
let visualization = self.create_attention_visualization(layer_name);
visualizations.insert(layer_name.clone(), visualization);
}
}
visualizations
}
pub fn analyze_hidden_states(&self) -> HashMap<String, HiddenStateAnalysis> {
let mut analyses = HashMap::new();
for layer_name in self.layer_activations.keys() {
let analysis = self.analyze_layer_hidden_states(layer_name);
analyses.insert(layer_name.clone(), analysis);
}
analyses
}
fn infer_layer_type(&self, layer_name: &str) -> String {
let name_lower = layer_name.to_lowercase();
if name_lower.contains("attention") || name_lower.contains("attn") {
"Attention".to_string()
} else if name_lower.contains("linear")
|| name_lower.contains("dense")
|| name_lower.contains("fc")
{
"Linear".to_string()
} else if name_lower.contains("conv") {
"Convolutional".to_string()
} else if name_lower.contains("norm")
|| name_lower.contains("bn")
|| name_lower.contains("ln")
{
"Normalization".to_string()
} else if name_lower.contains("dropout") {
"Dropout".to_string()
} else if name_lower.contains("embed") {
"Embedding".to_string()
} else {
"Unknown".to_string()
}
}
fn generate_activation_summary(&self, stats: &LayerActivationStats) -> String {
format!(
"Mean: {:.3}, Std: {:.3}, Range: [{:.3}, {:.3}], Dead: {:.1}%, Saturated: {:.1}%, Sparsity: {:.1}%",
stats.mean_activation,
stats.std_activation,
stats.min_activation,
stats.max_activation,
stats.dead_neurons_ratio * 100.0,
stats.saturated_neurons_ratio * 100.0,
stats.sparsity * 100.0
)
}
fn analyze_variance_trend(&self, stats_history: &[LayerActivationStats]) -> f64 {
if stats_history.len() < 2 {
return 0.0;
}
let variances: Vec<f64> = stats_history.iter().map(|s| s.std_activation.powi(2)).collect();
self.calculate_trend(&variances)
}
fn calculate_trend(&self, values: &[f64]) -> f64 {
if values.len() < 2 {
return 0.0;
}
let n = values.len() as f64;
let x_mean = (n - 1.0) / 2.0;
let y_mean = values.iter().sum::<f64>() / n;
let mut numerator = 0.0;
let mut denominator = 0.0;
for (i, &y) in values.iter().enumerate() {
let x = i as f64;
numerator += (x - x_mean) * (y - y_mean);
denominator += (x - x_mean).powi(2);
}
if denominator == 0.0 {
0.0
} else {
numerator / denominator
}
}
fn analyze_layer_weight_distribution(&self, layer_name: &str) -> WeightDistribution {
use scirs2_core::random::*; let mut rng = thread_rng();
let layer_type = self.infer_layer_type(layer_name);
let (mean, std_dev) = match layer_type.as_str() {
"Linear" => (rng.random_range(-0.1..0.1), rng.random_range(0.1..0.5)),
"Convolutional" => (rng.random_range(-0.05..0.05), rng.random_range(0.05..0.3)),
"Attention" => (rng.random_range(-0.02..0.02), rng.random_range(0.02..0.2)),
_ => (rng.random_range(-0.1..0.1), rng.random_range(0.1..0.4)),
};
let min = mean - 3.0 * std_dev;
let max = mean + 3.0 * std_dev;
let sparsity = rng.random_range(0.0..0.3);
WeightDistribution {
mean,
std_dev,
min,
max,
sparsity,
distribution_shape: "Normal".to_string(),
}
}
fn create_activation_heatmap(
&self,
layer_name: &str,
stats: &LayerActivationStats,
) -> ActivationHeatmap {
use scirs2_core::random::*; let mut rng = thread_rng();
let (height, width) = if stats.output_shape.len() >= 2 {
(stats.output_shape[0].min(64), stats.output_shape[1].min(64))
} else {
(32, 32)
};
let data: Vec<Vec<f64>> = (0..height)
.map(|_| {
(0..width)
.map(|_| rng.random_range(stats.min_activation..stats.max_activation))
.collect()
})
.collect();
ActivationHeatmap {
data,
dimensions: (height, width),
value_range: (stats.min_activation, stats.max_activation),
interpretation: format!(
"Activation pattern for {} layer",
self.infer_layer_type(layer_name)
),
}
}
fn create_attention_visualization(&self, _layer_name: &str) -> AttentionVisualization {
use scirs2_core::random::*; let mut rng = thread_rng();
let seq_length = rng.random_range(10..50);
let attention_weights: Vec<Vec<f64>> = (0..seq_length)
.map(|_| (0..seq_length).map(|_| rng.random_range(0.0..1.0)).collect())
.collect();
let input_tokens: Vec<String> = (0..seq_length).map(|i| format!("token_{}", i)).collect();
let output_tokens = input_tokens.clone();
let patterns = vec![
"Self-attention pattern detected".to_string(),
"Local attention focused".to_string(),
"Global attention pattern".to_string(),
];
AttentionVisualization {
attention_weights,
input_tokens,
output_tokens,
patterns,
}
}
fn analyze_layer_hidden_states(&self, layer_name: &str) -> HiddenStateAnalysis {
use scirs2_core::random::*; let _rng = thread_rng();
let dimensionality = self.get_hidden_dimensions(layer_name);
let information_content = self.compute_information_content(layer_name);
let clustering_results = self.perform_clustering_analysis(layer_name);
let temporal_dynamics = self.analyze_temporal_dynamics(layer_name);
let representation_stability = self.assess_representation_stability(layer_name);
HiddenStateAnalysis {
dimensionality,
information_content,
clustering_results,
temporal_dynamics,
representation_stability,
}
}
fn get_hidden_dimensions(&self, layer_name: &str) -> usize {
if let Some(stats_history) = self.layer_activations.get(layer_name) {
if let Some(latest_stats) = stats_history.last() {
return latest_stats.output_shape.iter().product();
}
}
512 }
fn compute_information_content(&self, layer_name: &str) -> f64 {
use scirs2_core::random::*; let mut rng = thread_rng();
let layer_type = self.infer_layer_type(layer_name);
match layer_type.as_str() {
"Attention" => rng.random_range(0.6..0.9),
"Linear" => rng.random_range(0.4..0.7),
"Convolutional" => rng.random_range(0.3..0.6),
_ => rng.random_range(0.4..0.7),
}
}
fn perform_clustering_analysis(&self, layer_name: &str) -> ClusteringResults {
use scirs2_core::random::*; let mut rng = thread_rng();
let hidden_dims = self.get_hidden_dimensions(layer_name);
let num_clusters = rng.random_range(5..20);
let cluster_centers: Vec<Vec<f64>> = (0..num_clusters)
.map(|_| (0..hidden_dims.min(10)).map(|_| rng.random_range(-1.0..1.0)).collect())
.collect();
let cluster_assignments: Vec<usize> =
(0..100).map(|_| rng.random_range(0..num_clusters)).collect();
ClusteringResults {
num_clusters,
cluster_centers,
cluster_assignments,
silhouette_score: rng.random_range(0.2..0.8),
inertia: rng.random_range(100.0..1000.0),
}
}
fn analyze_temporal_dynamics(&self, _layer_name: &str) -> TemporalDynamics {
use scirs2_core::random::*; let mut rng = thread_rng();
let consistency = rng.random_range(0.5..0.9);
let change_rate = rng.random_range(0.01..0.1);
let num_windows = rng.random_range(3..8);
let stability_windows: Vec<(usize, usize)> = (0..num_windows)
.map(|i| {
let start = i * 100;
let end = start + rng.random_range(50..150);
(start, end)
})
.collect();
let drift_detected = rng.gen_bool(0.2);
let drift_info = DriftInfo {
drift_detected,
drift_magnitude: if drift_detected { rng.random_range(0.1..0.5) } else { 0.0 },
drift_direction: if drift_detected {
["increasing", "decreasing", "oscillating"][rng.random_range(0..3)].to_string()
} else {
"stable".to_string()
},
onset_step: if drift_detected { Some(rng.random_range(100..1000)) } else { None },
};
TemporalDynamics {
temporal_consistency: consistency,
change_rate,
stability_windows,
drift_detection: drift_info,
}
}
fn assess_representation_stability(&self, layer_name: &str) -> RepresentationStability {
use scirs2_core::random::*; let mut rng = thread_rng();
let layer_type = self.infer_layer_type(layer_name);
let stability_score = match layer_type.as_str() {
"Normalization" => rng.random_range(0.8..0.95),
"Attention" => rng.random_range(0.6..0.85),
"Linear" => rng.random_range(0.5..0.8),
_ => rng.random_range(0.4..0.7),
};
RepresentationStability {
stability_score,
variance_across_batches: rng.random_range(0.01..0.1),
consistency_measure: rng.random_range(0.6..0.9),
robustness_to_noise: rng.random_range(0.3..0.8),
}
}
pub fn clear(&mut self) {
self.layer_activations.clear();
self.layer_states.clear();
}
}
impl Default for LayerAnalyzer {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn create_test_layer_stats(layer_name: &str) -> LayerActivationStats {
LayerActivationStats {
layer_name: layer_name.to_string(),
mean_activation: 0.5,
std_activation: 0.2,
min_activation: 0.0,
max_activation: 1.0,
dead_neurons_ratio: 0.05,
saturated_neurons_ratio: 0.03,
sparsity: 0.3,
output_shape: vec![128, 256],
}
}
#[test]
fn test_layer_analyzer_creation() {
let analyzer = LayerAnalyzer::new();
assert_eq!(analyzer.layer_activations.len(), 0);
}
#[test]
fn test_record_layer_activations() {
let mut analyzer = LayerAnalyzer::new();
let stats = create_test_layer_stats("test_layer");
analyzer.record_layer_activations("test_layer", stats);
assert_eq!(analyzer.layer_activations.len(), 1);
assert!(analyzer.layer_activations.contains_key("test_layer"));
}
#[test]
fn test_layer_health_score_calculation() {
let analyzer = LayerAnalyzer::new();
let stats = create_test_layer_stats("test_layer");
let health_score = analyzer.calculate_layer_health_score(&stats);
assert!(health_score > 0.0 && health_score <= 1.0);
}
#[test]
fn test_layer_type_inference() {
let analyzer = LayerAnalyzer::new();
assert_eq!(analyzer.infer_layer_type("attention_layer"), "Attention");
assert_eq!(analyzer.infer_layer_type("linear_projection"), "Linear");
assert_eq!(analyzer.infer_layer_type("conv2d_layer"), "Convolutional");
assert_eq!(analyzer.infer_layer_type("batch_norm"), "Normalization");
}
#[test]
fn test_issue_identification() {
let analyzer = LayerAnalyzer::new();
let mut stats = create_test_layer_stats("test_layer");
stats.dead_neurons_ratio = 0.2;
let issues = analyzer.identify_layer_issues(&stats, &[]);
assert!(!issues.is_empty());
assert!(issues[0].contains("dead neuron"));
}
#[test]
fn test_layer_analysis() {
let analyzer = LayerAnalyzer::new();
let stats = create_test_layer_stats("attention_layer");
let history = vec![stats.clone()];
let analysis = analyzer.analyze_single_layer("attention_layer", &stats, &history);
assert_eq!(analysis.layer_name, "attention_layer");
assert_eq!(analysis.layer_type, "Attention");
assert!(analysis.health_score > 0.0);
}
}