#![deny(unsafe_code)]
#![deny(missing_docs)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::panic)]
use crate::prelude::error::Result;
use std::time::{Duration, Instant};
/// Measures how long a utility operation takes over repeated runs.
///
/// The analyzer first executes the operation `warmup_iterations` times without
/// timing it, then records `samples` timed executions.
#[derive(Debug, Clone)]
pub struct UtilityTimingAnalyzer {
    /// Number of timed executions recorded per analysis.
    samples: usize,
    /// Number of untimed executions performed before sampling starts.
    warmup_iterations: usize,
}
impl UtilityTimingAnalyzer {
#[must_use]
pub fn new(samples: usize, warmup_iterations: usize) -> Self {
Self { samples, warmup_iterations }
}
pub fn analyze_utility_timing<F>(&self, operation: F) -> Result<TimingAnalysis>
where
F: Fn() -> Result<()>,
{
let mut execution_times = Vec::with_capacity(self.samples);
for _ in 0..self.warmup_iterations {
let _ = operation();
}
for _ in 0..self.samples {
let start = Instant::now();
operation()?;
let duration = start.elapsed();
execution_times.push(duration);
}
Ok(TimingAnalysis {
samples: execution_times.clone(),
mean: calculate_mean(&execution_times),
std_dev: calculate_std_dev(&execution_times),
min: *execution_times.iter().min().unwrap_or(&Duration::ZERO),
max: *execution_times.iter().max().unwrap_or(&Duration::ZERO),
})
}
pub fn test_hex_timing_succeeds(&self) -> Result<Vec<SideChannelAssessment>> {
let mut assessments = Vec::new();
let sizes = [16, 64, 256, 1024];
for &size in &sizes {
let data = vec![0u8; size];
let encode_analysis = self.analyze_utility_timing(|| {
let _encoded = hex::encode(&data);
Ok(())
})?;
let hex_string = hex::encode(&data);
let decode_analysis = self.analyze_utility_timing(|| {
let _decoded = hex::decode(&hex_string)?;
Ok(())
})?;
#[allow(clippy::cast_precision_loss)]
let encode_cv =
encode_analysis.std_dev.as_nanos() as f64 / encode_analysis.mean.as_nanos() as f64;
#[allow(clippy::cast_precision_loss)]
let decode_cv =
decode_analysis.std_dev.as_nanos() as f64 / decode_analysis.mean.as_nanos() as f64;
if encode_cv > 0.05 {
assessments.push(SideChannelAssessment {
vulnerability_type: SideChannelType::Timing,
severity: if encode_cv > 0.1 { Severity::Medium } else { Severity::Low },
confidence: (encode_cv * 20.0).min(1.0),
description: format!(
"High timing variation in hex encoding for {} bytes (CV: {:.3})",
size, encode_cv
),
mitigation_suggestions: vec!["Hex encoding timing appears stable".to_string()],
});
}
if decode_cv > 0.05 {
assessments.push(SideChannelAssessment {
vulnerability_type: SideChannelType::Timing,
severity: if decode_cv > 0.1 { Severity::Medium } else { Severity::Low },
confidence: (decode_cv * 20.0).min(1.0),
description: format!(
"High timing variation in hex decoding for {} bytes (CV: {:.3})",
size, decode_cv
),
mitigation_suggestions: vec!["Hex decoding timing appears stable".to_string()],
});
}
}
Ok(assessments)
}
pub fn test_uuid_timing_succeeds(&self) -> Result<Vec<SideChannelAssessment>> {
let mut assessments = Vec::new();
let analysis = self.analyze_utility_timing(|| {
let _uuid = uuid::Uuid::new_v4();
Ok(())
})?;
#[allow(clippy::cast_precision_loss)]
let cv = analysis.std_dev.as_nanos() as f64 / analysis.mean.as_nanos() as f64;
if cv > 0.1 {
assessments.push(SideChannelAssessment {
vulnerability_type: SideChannelType::Timing,
severity: Severity::Low,
confidence: (cv * 10.0).min(1.0),
description: format!("UUID generation timing variation detected (CV: {:.3})", cv),
mitigation_suggestions: vec![
"UUID generation timing is within expected bounds".to_string(),
],
});
}
Ok(assessments)
}
}
/// Timing statistics produced by `UtilityTimingAnalyzer::analyze_utility_timing`.
#[derive(Debug, Clone)]
pub struct TimingAnalysis {
/// Every timed measurement, in the order it was recorded.
pub samples: Vec<Duration>,
/// Mean of `samples`, as computed by `calculate_mean`.
pub mean: Duration,
/// Standard deviation of `samples`, as computed by `calculate_std_dev`.
pub std_dev: Duration,
/// Shortest measurement in `samples` (`Duration::ZERO` when there are none).
pub min: Duration,
/// Longest measurement in `samples` (`Duration::ZERO` when there are none).
pub max: Duration,
}
/// A single finding reported by one of the side-channel checks.
#[derive(Debug, Clone)]
pub struct SideChannelAssessment {
/// Kind of side channel the finding relates to.
pub vulnerability_type: SideChannelType,
/// Severity assigned to the finding.
pub severity: Severity,
/// Confidence score; the checks in this file cap it at 1.0 and the report
/// renders it as a percentage.
pub confidence: f64,
/// Human-readable description of what was observed.
pub description: String,
/// Free-text suggestions listed under the finding in the report.
pub mitigation_suggestions: Vec<String>,
}
/// Category of side channel a finding relates to.
///
/// Only `Timing` and `MemoryAccess` are produced by the checks in this file.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum SideChannelType {
    /// Timing side channel.
    Timing,
    /// Cache side channel.
    Cache,
    /// Power side channel.
    Power,
    /// Electromagnetic side channel.
    Electromagnetic,
    /// Acoustic side channel.
    Acoustic,
    /// Memory-access side channel.
    MemoryAccess,
}
/// Severity assigned to a side-channel finding.
///
/// The checks in this file only ever assign `Low` or `Medium`.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Severity {
    /// Low severity.
    Low,
    /// Medium severity.
    Medium,
    /// High severity.
    High,
    /// Critical severity.
    Critical,
}
/// Returns the arithmetic mean of `durations`, truncated to whole nanoseconds.
///
/// An empty slice yields [`Duration::ZERO`] instead of dividing by zero.
// The casts below are lossless (see the inline comments); the division cannot
// fail because the empty case returns early.
#[allow(
    clippy::cast_precision_loss,
    clippy::cast_possible_truncation,
    clippy::arithmetic_side_effects
)]
fn calculate_mean(durations: &[Duration]) -> Duration {
    const NANOS_PER_SEC: u128 = 1_000_000_000;

    if durations.is_empty() {
        return Duration::ZERO;
    }

    let total_nanos: u128 = durations.iter().map(Duration::as_nanos).sum();
    let mean_nanos = total_nanos / durations.len() as u128;

    // A mean never exceeds the largest input, so its whole-second part fits in
    // `u64` and the remainder is below one second. Splitting it this way avoids
    // the truncation a direct `as u64` nanosecond cast would cause.
    Duration::new((mean_nanos / NANOS_PER_SEC) as u64, (mean_nanos % NANOS_PER_SEC) as u32)
}
/// Returns the population standard deviation of `durations`, truncated to
/// whole nanoseconds.
///
/// An empty slice yields [`Duration::ZERO`].
// Nanosecond counts are converted to `f64` for the statistics and the result is
// cast back; the float-to-integer cast saturates and the value is non-negative.
#[allow(clippy::cast_precision_loss, clippy::cast_possible_truncation, clippy::cast_sign_loss)]
fn calculate_std_dev(durations: &[Duration]) -> Duration {
    if durations.is_empty() {
        return Duration::ZERO;
    }

    let count = durations.len() as f64;
    // The mean is computed here in floating point rather than through the
    // integer-truncating mean helper, so deviations are taken around the exact mean.
    let mean_nanos = durations.iter().map(|d| d.as_nanos() as f64).sum::<f64>() / count;
    let variance = durations
        .iter()
        .map(|d| {
            let diff = d.as_nanos() as f64 - mean_nanos;
            diff * diff
        })
        .sum::<f64>()
        / count;

    // The samples were taken in nanoseconds, so the square root of the
    // variance is already a nanosecond value and needs no unit conversion.
    Duration::from_nanos(variance.sqrt() as u64)
}
/// Runs the utility side-channel checks and renders their findings as a report.
pub struct UtilitySideChannelTester {
/// Analyzer used to time every checked operation.
timing_analyzer: UtilityTimingAnalyzer,
}
impl Default for UtilitySideChannelTester {
fn default() -> Self {
Self::new()
}
}
impl UtilitySideChannelTester {
#[must_use]
pub fn new() -> Self {
Self { timing_analyzer: UtilityTimingAnalyzer::new(1000, 100) }
}
pub fn run_analysis(&self) -> Result<Vec<SideChannelAssessment>> {
tracing::info!("Running utility side-channel analysis");
let mut assessments = Vec::new();
let hex_assessments = self.timing_analyzer.test_hex_timing_succeeds()?;
assessments.extend(hex_assessments);
let uuid_assessments = self.timing_analyzer.test_uuid_timing_succeeds()?;
assessments.extend(uuid_assessments);
let domain_assessments = self.test_domain_access_timing_succeeds()?;
assessments.extend(domain_assessments);
Ok(assessments)
}
fn test_domain_access_timing_succeeds(&self) -> Result<Vec<SideChannelAssessment>> {
let mut assessments = Vec::new();
let analysis = self.timing_analyzer.analyze_utility_timing(|| {
let _domain = crate::types::domains::HYBRID_KEM;
let _domain = crate::types::domains::CASCADE_OUTER;
let _domain = crate::types::domains::CASCADE_INNER;
let _domain = crate::types::domains::SIGNATURE_BIND;
Ok(())
})?;
#[allow(clippy::cast_precision_loss)]
let cv = analysis.std_dev.as_nanos() as f64 / analysis.mean.as_nanos() as f64;
if cv > 0.05 {
assessments.push(SideChannelAssessment {
vulnerability_type: SideChannelType::MemoryAccess,
severity: Severity::Low,
confidence: (cv * 20.0).min(1.0),
description: format!("Domain constant access timing variation (CV: {:.3})", cv),
mitigation_suggestions: vec![
"Domain constants are static and should be constant time".to_string(),
],
});
}
Ok(assessments)
}
#[must_use]
pub fn generate_security_report(&self, assessments: &[SideChannelAssessment]) -> String {
let mut report = String::from("# Utility Side-Channel Security Assessment Report\n\n");
let critical_count =
assessments.iter().filter(|a| a.severity == Severity::Critical).count();
let high_count = assessments.iter().filter(|a| a.severity == Severity::High).count();
let medium_count = assessments.iter().filter(|a| a.severity == Severity::Medium).count();
let low_count = assessments.iter().filter(|a| a.severity == Severity::Low).count();
report.push_str("## Summary\n\n");
report.push_str(&format!("- Critical Vulnerabilities: {}\n", critical_count));
report.push_str(&format!("- High Vulnerabilities: {}\n", high_count));
report.push_str(&format!("- Medium Vulnerabilities: {}\n", medium_count));
report.push_str(&format!("- Low Vulnerabilities: {}\n\n", low_count));
report.push_str("## Detailed Findings\n\n");
for assessment in assessments {
report.push_str(&format!(
"### {} ({:?})\n",
assessment.vulnerability_type.clone() as u8,
assessment.severity
));
report.push_str(&format!("**Confidence:** {:.1}%\n", assessment.confidence * 100.0));
report.push_str(&format!("**Description:** {}\n", assessment.description));
report.push_str("**Mitigation Suggestions:**\n");
for suggestion in &assessment.mitigation_suggestions {
report.push_str(&format!("- {}\n", suggestion));
}
report.push('\n');
}
report
}
}
#[cfg(test)]
#[allow(clippy::panic_in_result_fn)]
mod tests {
    use super::*;

    /// Return type shared by the tests so `?` can propagate crate errors.
    type TestResult = std::result::Result<(), Box<dyn std::error::Error>>;

    /// The analyzer records the requested number of samples and its
    /// statistics are mutually consistent.
    #[test]
    fn test_timing_analyzer_produces_valid_statistics_succeeds() -> TestResult {
        let analyzer = UtilityTimingAnalyzer::new(10, 5);
        let stats = analyzer.analyze_utility_timing(|| {
            let _encoded = hex::encode(b"test");
            Ok(())
        })?;

        assert!(stats.samples.len() == 10);
        assert!(stats.mean > Duration::ZERO);
        assert!(stats.mean >= stats.min);
        assert!(stats.mean <= stats.max);
        Ok(())
    }

    /// The hex timing check never reports a critical finding.
    #[test]
    fn test_hex_timing_analysis_has_no_critical_issues_succeeds() -> TestResult {
        let findings = UtilityTimingAnalyzer::new(10, 5).test_hex_timing_succeeds()?;
        assert!(!findings.iter().any(|f| f.severity == Severity::Critical));
        Ok(())
    }

    /// The UUID timing check never reports a critical finding.
    #[test]
    fn test_uuid_timing_analysis_has_no_critical_issues_succeeds() -> TestResult {
        let findings = UtilityTimingAnalyzer::new(10, 5).test_uuid_timing_succeeds()?;
        assert!(!findings.iter().any(|f| f.severity == Severity::Critical));
        Ok(())
    }

    /// The full analysis yields a titled report and no critical finding.
    #[test]
    fn test_side_channel_tester_has_no_critical_vulnerabilities_succeeds() -> TestResult {
        let tester = UtilitySideChannelTester::new();
        let findings = tester.run_analysis()?;
        let rendered = tester.generate_security_report(&findings);

        assert!(rendered.contains("Utility Side-Channel Security Assessment Report"));
        assert!(!findings.iter().any(|f| f.severity == Severity::Critical));
        Ok(())
    }
}