#![deny(unsafe_code)]
#![deny(missing_docs)]
#![deny(clippy::unwrap_used)]
#![deny(clippy::panic)]
use crate::prelude::error::{LatticeArcError, Result};
/// Longest run of identical consecutive bytes that `repetition_test` accepts;
/// a longer run makes it return an error.
const MAX_CONSECUTIVE_IDENTICAL_BYTES: usize = 5;
/// Smallest input length, in bytes, that `frequency_test` accepts.
const MIN_SAMPLE_SIZE: usize = 32;
/// Number of bytes `run_entropy_health_tests` requests from `random_bytes`
/// for one health check.
const DEFAULT_SAMPLE_SIZE: usize = 256;
/// Fraction above the expected per-value count that `frequency_test` tolerates
/// for samples of 4096 bytes or more (the ceiling is `expected * (1 + ratio)`).
const MAX_FREQUENCY_DEVIATION_RATIO: f64 = 0.5;
/// Rejects input that contains too long a run of one repeated byte value.
///
/// The longest run of identical consecutive bytes is measured over the whole
/// slice; a single-byte input therefore always passes.
///
/// # Errors
///
/// Returns [`LatticeArcError::ValidationError`] when `bytes` is empty, or when
/// the longest run is greater than `MAX_CONSECUTIVE_IDENTICAL_BYTES`.
pub fn repetition_test(bytes: &[u8]) -> Result<()> {
    let (head, tail) = match bytes.split_first() {
        Some(parts) => parts,
        None => {
            return Err(LatticeArcError::ValidationError {
                message: "Repetition test requires non-empty input".to_string(),
            });
        }
    };

    // Carry (previous byte, length of the current run, longest run so far).
    let (_, _, longest) = tail.iter().fold(
        (head, 1usize, 1usize),
        |(previous, run, longest), byte| {
            let run = if byte == previous {
                run.saturating_add(1)
            } else {
                1
            };
            (byte, run, longest.max(run))
        },
    );

    if longest <= MAX_CONSECUTIVE_IDENTICAL_BYTES {
        return Ok(());
    }
    Err(LatticeArcError::ValidationError {
        message: format!(
            "Repetition test failed: found {} consecutive identical bytes (max allowed: {})",
            longest, MAX_CONSECUTIVE_IDENTICAL_BYTES
        ),
    })
}
pub fn frequency_test(bytes: &[u8]) -> Result<()> {
if bytes.len() < MIN_SAMPLE_SIZE {
return Err(LatticeArcError::ValidationError {
message: format!(
"Frequency test requires at least {} bytes, got {}",
MIN_SAMPLE_SIZE,
bytes.len()
),
});
}
let mut counts = [0u32; 256];
for byte in bytes {
let index = usize::from(*byte);
if let Some(count) = counts.get_mut(index) {
*count = count.saturating_add(1);
}
}
let total_bytes = bytes.len();
#[allow(clippy::cast_precision_loss)]
let expected_count_f64 = total_bytes as f64 / 256.0;
let max_deviation = if total_bytes < 512 {
(expected_count_f64 * 6.0).max(8.0)
} else if total_bytes < 1024 {
(expected_count_f64 * 4.0).max(6.0)
} else if total_bytes < 4096 {
expected_count_f64 * 3.0
} else {
expected_count_f64 * (1.0 + MAX_FREQUENCY_DEVIATION_RATIO)
};
let mut max_count = 0u32;
let mut max_byte = 0u8;
for (byte_val, count) in counts.iter().enumerate() {
if *count > max_count {
max_count = *count;
max_byte = u8::try_from(byte_val).unwrap_or(0);
}
}
let max_count_f64 = f64::from(max_count);
if max_count_f64 > max_deviation {
return Err(LatticeArcError::ValidationError {
message: format!(
"Frequency test failed: byte 0x{:02X} appears {} times \
(expected ~{:.1}, max allowed {:.1})",
max_byte, max_count, expected_count_f64, max_deviation
),
});
}
if total_bytes >= 512 {
let zero_count = counts.iter().filter(|&&c| c == 0).count();
let max_zeros = if total_bytes >= 2048 {
64 } else {
128 };
if zero_count > max_zeros {
return Err(LatticeArcError::ValidationError {
message: format!(
"Frequency test failed: {} out of 256 byte values never appeared \
(max allowed {} for {} byte sample)",
zero_count, max_zeros, total_bytes
),
});
}
}
Ok(())
}
/// Checks that the proportion of one-bits in `bytes` is close to one half.
///
/// Inputs shorter than 1000 bits must have between 35% and 65% ones; longer
/// inputs must have between 40% and 60% ones (bounds inclusive).
///
/// # Errors
///
/// Returns [`LatticeArcError::ValidationError`] when `bytes` is empty or the
/// proportion of one-bits falls outside the range that applies to its length.
/// The message names the range that was actually applied.
// The float division below cannot overflow and the precision loss of the
// integer-to-float casts is irrelevant at these magnitudes.
#[allow(clippy::arithmetic_side_effects)]
#[allow(clippy::cast_precision_loss)]
pub fn monobit_test(bytes: &[u8]) -> Result<()> {
    if bytes.is_empty() {
        return Err(LatticeArcError::ValidationError {
            message: "Monobit test requires non-empty input".to_string(),
        });
    }

    let one_count: u64 = bytes.iter().map(|b| u64::from(b.count_ones())).sum();
    let total_bits = (bytes.len() as u64).saturating_mul(8);
    let proportion = one_count as f64 / total_bits as f64;

    // Keep the numeric bounds and their human-readable form together so the
    // error message can never disagree with the check that produced it.
    let (min_proportion, max_proportion, expected_range) = if total_bits < 1000 {
        (0.35, 0.65, "35-65%")
    } else {
        (0.40, 0.60, "40-60%")
    };

    if proportion < min_proportion || proportion > max_proportion {
        return Err(LatticeArcError::ValidationError {
            message: format!(
                "Monobit test failed: {:.1}% ones (expected {} for {} bits)",
                proportion * 100.0,
                expected_range,
                total_bits
            ),
        });
    }
    Ok(())
}
/// Checks that the number of runs (maximal stretches of equal bits) in `bytes`
/// is close to half the number of bits.
///
/// Bits are read most-significant first within each byte. The run count must
/// lie within 35% of `total_bits / 2` for inputs shorter than 1000 bits and
/// within 30% otherwise; both bounds are truncated to whole numbers.
///
/// # Errors
///
/// Returns [`LatticeArcError::ValidationError`] when fewer than 8 bytes are
/// supplied or the run count lies outside the allowed interval.
#[allow(clippy::arithmetic_side_effects)]
#[allow(clippy::cast_precision_loss)]
#[allow(clippy::cast_possible_truncation)]
#[allow(clippy::cast_sign_loss)]
#[allow(clippy::indexing_slicing)]
pub fn runs_test(bytes: &[u8]) -> Result<()> {
    if bytes.len() < 8 {
        return Err(LatticeArcError::ValidationError {
            message: "Runs test requires at least 8 bytes".to_string(),
        });
    }
    let bit_count = bytes.len().saturating_mul(8);

    // Every bit of the input, most-significant bit of each byte first.
    let mut bit_stream = bytes
        .iter()
        .flat_map(|&byte| (0..8u32).rev().map(move |shift| (byte >> shift) & 1));
    // The length check above guarantees a first bit; the fallback is never used.
    let first_bit = bit_stream.next().unwrap_or(0);
    // The first bit opens run number one; each change of value opens another.
    let (_, runs) = bit_stream.fold((first_bit, 1u64), |(previous, runs), bit| {
        if bit == previous {
            (previous, runs)
        } else {
            (bit, runs.saturating_add(1))
        }
    });

    let midpoint = bit_count as f64 / 2.0;
    let tolerance = if bit_count < 1000 { 0.35 } else { 0.30 };
    let lower = (midpoint * (1.0 - tolerance)) as u64;
    let upper = (midpoint * (1.0 + tolerance)) as u64;

    if (lower..=upper).contains(&runs) {
        return Ok(());
    }
    Err(LatticeArcError::ValidationError {
        message: format!(
            "Runs test failed: {} runs found (expected {}-{} for {} bits)",
            runs, lower, upper, bit_count
        ),
    })
}
/// Runs [`adaptive_proportion_test_with_params`] with a 512-byte window and a
/// cutoff ratio of 0.4.
///
/// # Errors
///
/// Propagates whatever [`adaptive_proportion_test_with_params`] returns.
pub fn adaptive_proportion_test(bytes: &[u8]) -> Result<()> {
    const DEFAULT_WINDOW_SIZE: usize = 512;
    const DEFAULT_CUTOFF_RATIO: f64 = 0.4;
    adaptive_proportion_test_with_params(bytes, DEFAULT_WINDOW_SIZE, DEFAULT_CUTOFF_RATIO)
}
/// Slides a window of `window_size` bytes over `bytes` one byte at a time and
/// fails as soon as a single byte value makes up more than `cutoff_ratio` of a
/// window.
///
/// Returns `Ok(())` without inspecting anything when `window_size` is zero or
/// `bytes` is shorter than one window.
///
/// Byte counts are maintained incrementally while the window slides, so the
/// whole scan is linear in `bytes.len()` instead of recounting every window.
///
/// # Errors
///
/// Returns [`LatticeArcError::ValidationError`] for the first (lowest-offset)
/// window whose most common byte exceeds `cutoff_ratio`; the message contains
/// that offset and the observed proportion.
// Counts never exceed `window_size`, indices come from `u8` values (< 256), and
// the usize-to-f64 casts only feed a ratio.
#[allow(clippy::arithmetic_side_effects)]
#[allow(clippy::cast_precision_loss)]
#[allow(clippy::indexing_slicing)]
pub fn adaptive_proportion_test_with_params(
    bytes: &[u8],
    window_size: usize,
    cutoff_ratio: f64,
) -> Result<()> {
    if window_size == 0 || bytes.len() < window_size {
        return Ok(());
    }

    let window_len = window_size as f64;
    let ratio_of = |count: usize| count as f64 / window_len;
    let failure = |offset: usize, ratio: f64| LatticeArcError::ValidationError {
        message: format!(
            "Adaptive proportion test failed at offset {}: \
             most common byte appears {:.1}% of window (max {:.1}%)",
            offset,
            ratio * 100.0,
            cutoff_ratio * 100.0
        ),
    };

    // Count the first window in full; the bytes after it enter one at a time.
    let (first_window, entering_bytes) = bytes.split_at(window_size);
    let mut counts = [0usize; 256];
    for &byte in first_window {
        let slot = usize::from(byte);
        counts[slot] = counts[slot].saturating_add(1);
    }
    let first_ratio = ratio_of(counts.iter().copied().max().unwrap_or(0));
    if first_ratio > cutoff_ratio {
        return Err(failure(0, first_ratio));
    }

    // Moving the window by one byte removes `bytes[i]` and adds
    // `entering_bytes[i]`. Every earlier window passed, so only the count that
    // just grew can exceed the cutoff, and if it does it is the window maximum.
    for (index, (&outgoing, &incoming)) in bytes.iter().zip(entering_bytes).enumerate() {
        let leaving = usize::from(outgoing);
        counts[leaving] = counts[leaving].saturating_sub(1);
        let entering = usize::from(incoming);
        counts[entering] = counts[entering].saturating_add(1);

        let ratio = ratio_of(counts[entering]);
        if ratio > cutoff_ratio {
            return Err(failure(index.saturating_add(1), ratio));
        }
    }
    Ok(())
}
/// Checks that the longest stretch of identical bits in `bytes` is not
/// implausibly long for the amount of data.
///
/// Bits are read most-significant first within each byte, and runs may span
/// byte boundaries. The longest permitted run is 12 bits for inputs under 100
/// bits, 16 under 1000 bits, 20 under 10000 bits and 26 otherwise.
///
/// # Errors
///
/// Returns [`LatticeArcError::ValidationError`] when `bytes` is empty or the
/// longest run is greater than the limit for its length.
#[allow(clippy::arithmetic_side_effects)]
#[allow(clippy::indexing_slicing)]
pub fn longest_run_test(bytes: &[u8]) -> Result<()> {
    if bytes.is_empty() {
        return Err(LatticeArcError::ValidationError {
            message: "Longest run test requires non-empty input".to_string(),
        });
    }

    let bit_count = bytes.len().saturating_mul(8);
    let run_limit: usize = match bit_count {
        0..=99 => 12,
        100..=999 => 16,
        1000..=9999 => 20,
        _ => 26,
    };

    // Every bit of the input, most-significant bit of each byte first.
    let bit_stream = bytes
        .iter()
        .flat_map(|&byte| (0..8u32).rev().map(move |shift| (byte >> shift) & 1));

    let mut previous: Option<u8> = None;
    let mut run = 0usize;
    let mut longest = 0usize;
    for bit in bit_stream {
        // A bit equal to its predecessor extends the run; anything else,
        // including the very first bit, starts a new run of length one.
        run = if previous == Some(bit) {
            run.saturating_add(1)
        } else {
            1
        };
        longest = longest.max(run);
        previous = Some(bit);
    }

    if longest <= run_limit {
        return Ok(());
    }
    Err(LatticeArcError::ValidationError {
        message: format!(
            "Longest run test failed: found run of {} bits (max allowed {} for {} total bits)",
            longest, run_limit, bit_count
        ),
    })
}
pub fn run_entropy_health_tests() -> Result<()> {
use super::random_bytes;
let bytes = random_bytes(DEFAULT_SAMPLE_SIZE);
repetition_test(&bytes).map_err(|e| LatticeArcError::ValidationError {
message: format!("Entropy health check failed - {}", e),
})?;
frequency_test(&bytes).map_err(|e| LatticeArcError::ValidationError {
message: format!("Entropy health check failed - {}", e),
})?;
monobit_test(&bytes).map_err(|e| LatticeArcError::ValidationError {
message: format!("Entropy health check failed - {}", e),
})?;
runs_test(&bytes).map_err(|e| LatticeArcError::ValidationError {
message: format!("Entropy health check failed - {}", e),
})?;
longest_run_test(&bytes).map_err(|e| LatticeArcError::ValidationError {
message: format!("Entropy health check failed - {}", e),
})?;
adaptive_proportion_test(&bytes).map_err(|e| LatticeArcError::ValidationError {
message: format!("Entropy health check failed - {}", e),
})?;
Ok(())
}
/// Runs every health test on caller-supplied `bytes`, stopping at the first
/// failure.
///
/// The tests run in this order: repetition, frequency, monobit, runs, longest
/// run, adaptive proportion.
///
/// # Errors
///
/// Returns the error of the first test that fails, unchanged.
pub fn run_entropy_health_tests_on_bytes(bytes: &[u8]) -> Result<()> {
    repetition_test(bytes)
        .and_then(|()| frequency_test(bytes))
        .and_then(|()| monobit_test(bytes))
        .and_then(|()| runs_test(bytes))
        .and_then(|()| longest_run_test(bytes))
        .and_then(|()| adaptive_proportion_test(bytes))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a 256-byte sample holding every byte value once, in ascending order.
    fn every_byte_value_once() -> Vec<u8> {
        (0..=255u8).collect()
    }

    // ---- repetition_test -------------------------------------------------

    /// Distinct neighbouring bytes never form a run.
    #[test]
    fn test_repetition_test_passes_on_varied_input_succeeds() {
        let sample: [u8; 8] = [0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08];
        assert!(repetition_test(&sample).is_ok());
    }

    /// A run of exactly five identical bytes is still accepted.
    #[test]
    fn test_repetition_test_passes_with_max_allowed_consecutive_succeeds() {
        let sample: [u8; 8] = [0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x03];
        assert!(repetition_test(&sample).is_ok());
    }

    /// A run of six identical bytes is rejected with a validation error.
    #[test]
    fn test_repetition_test_fails_with_6_consecutive_fails() {
        let sample: [u8; 8] = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02];
        let outcome = repetition_test(&sample);
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(LatticeArcError::ValidationError { .. })));
    }

    /// A constant buffer is one long run.
    #[test]
    fn test_repetition_test_fails_on_all_same_fails() {
        let sample = [0xFFu8; 100];
        assert!(repetition_test(&sample).is_err());
    }

    /// Empty input is rejected.
    #[test]
    fn test_repetition_test_empty_input_fails() {
        let sample: [u8; 0] = [];
        assert!(repetition_test(&sample).is_err());
    }

    /// A single byte cannot repeat.
    #[test]
    fn test_repetition_test_single_byte_succeeds() {
        let sample = [0x42u8];
        assert!(repetition_test(&sample).is_ok());
    }

    /// Two identical bytes are well under the limit.
    #[test]
    fn test_repetition_test_two_same_bytes_succeeds() {
        let sample = [0x42u8, 0x42];
        assert!(repetition_test(&sample).is_ok());
    }

    // ---- frequency_test --------------------------------------------------

    /// Every value exactly once is perfectly uniform.
    #[test]
    fn test_frequency_test_passes_on_uniform_succeeds() {
        let sample = every_byte_value_once();
        assert!(frequency_test(&sample).is_ok());
    }

    /// A constant buffer puts all weight on one value.
    #[test]
    fn test_frequency_test_fails_on_all_same_fails() {
        let sample = [0x00u8; 256];
        assert!(frequency_test(&sample).is_err());
    }

    /// Samples below the minimum size are rejected with an explanatory message.
    #[test]
    fn test_frequency_test_too_small_sample_fails() {
        let sample = [0x01u8, 0x02, 0x03];
        let outcome = frequency_test(&sample);
        assert!(outcome.is_err());
        if let Err(LatticeArcError::ValidationError { message }) = outcome {
            assert!(message.contains("at least"));
        }
    }

    /// A sample of exactly the minimum size with distinct values passes.
    #[test]
    #[allow(clippy::cast_possible_truncation)]
    fn test_frequency_test_min_sample_size_has_correct_size() {
        let sample: Vec<u8> = (0..MIN_SAMPLE_SIZE).map(|i| (i % 256) as u8).collect();
        assert!(frequency_test(&sample).is_ok());
    }

    /// 200 zero bytes followed by 56 one bytes is far too skewed.
    #[test]
    fn test_frequency_test_heavily_biased_succeeds() {
        let mut sample = vec![0x00u8; 200];
        sample.resize(256, 0x01);
        assert!(frequency_test(&sample).is_err());
    }

    // ---- combined runners ------------------------------------------------

    /// Fresh random output should pass; one retry absorbs a rare false positive.
    #[test]
    fn test_run_entropy_health_tests_passes_on_csprng_output_succeeds() {
        let first_attempt = run_entropy_health_tests();
        assert!(
            first_attempt.is_ok() || run_entropy_health_tests().is_ok(),
            "entropy health tests failed twice in a row"
        );
    }

    /// A deterministically shuffled permutation of all byte values passes every test.
    #[test]
    fn test_run_entropy_health_tests_on_good_bytes_succeeds() {
        let mut sample = every_byte_value_once();
        let len = sample.len();
        for i in 0..len {
            let j = i.wrapping_mul(7).wrapping_add(13) % len;
            sample.swap(i, j);
        }
        assert!(run_entropy_health_tests_on_bytes(&sample).is_ok());
    }

    /// A constant buffer fails the combined runner.
    #[test]
    fn test_run_entropy_health_tests_on_bad_bytes_returns_error() {
        let sample = [0x42u8; 256];
        assert!(run_entropy_health_tests_on_bytes(&sample).is_err());
    }

    // ---- position of the repeated run -----------------------------------

    /// A too-long run at the very end of the input is detected.
    #[test]
    fn test_repetition_at_end_fails_detection_fails() {
        let prefix: [u8; 4] = [0x01, 0x02, 0x03, 0x04];
        let sample: Vec<u8> = [&prefix[..], &[0xFFu8; 6][..]].concat();
        assert!(repetition_test(&sample).is_err());
    }

    /// A too-long run surrounded by other bytes is detected.
    #[test]
    fn test_repetition_in_middle_fails_detection_fails() {
        let sample: Vec<u8> = [&[0x01u8, 0x02][..], &[0xAAu8; 6][..], &[0x03u8, 0x04][..]].concat();
        assert!(repetition_test(&sample).is_err());
    }
}