pub struct FuzzMutationStrategy {
mutation_engine: MutationEngine,
fuzz_config: FuzzConfig,
}
impl FuzzMutationStrategy {
#[provable_contracts_macros::contract("pmat-core.yaml", equation = "check_compliance")]
pub fn new(mutation_engine: MutationEngine, fuzz_config: FuzzConfig) -> Self {
Self {
mutation_engine,
fuzz_config,
}
}
#[provable_contracts_macros::contract("pmat-core.yaml", equation = "check_compliance")]
pub fn config(&self) -> &FuzzConfig {
&self.fuzz_config
}
#[provable_contracts_macros::contract("pmat-core.yaml", equation = "check_compliance")]
pub fn engine(&self) -> &MutationEngine {
&self.mutation_engine
}
#[provable_contracts_macros::contract("pmat-core.yaml", equation = "check_compliance")]
pub fn generate_inputs(&self, count: usize) -> Vec<Vec<u8>> {
use rand::Rng;
let mut rng = rand::rng();
(0..count)
.map(|_| {
let len = rng.random_range(1..=256);
(0..len).map(|_| rng.random::<u8>()).collect()
})
.collect()
}
#[provable_contracts_macros::contract("pmat-core.yaml", equation = "check_compliance")]
pub fn generate_grammar_based_inputs(&self, count: usize, _format: &str) -> Vec<Vec<u8>> {
(0..count)
.map(|i| {
if i % 3 == 0 {
b"{}".to_vec()
} else if i % 3 == 1 {
b"[]".to_vec()
} else {
vec![]
}
})
.collect()
}
#[provable_contracts_macros::contract("pmat-core.yaml", equation = "check_compliance")]
pub async fn fuzz_mutant(&self, mutant: &Mutant) -> Result<FuzzResult> {
use std::panic;
use std::time::Instant;
use tokio::time::timeout;
let mut crashes = Vec::new();
let mut hangs = Vec::new();
let baseline_coverage = CoverageInfo::new();
let mut corpus = CoverageCorpus::new(baseline_coverage);
let inputs = match self.fuzz_config.input_generator {
InputGeneratorType::Random => self.generate_inputs(self.fuzz_config.iterations),
InputGeneratorType::GrammarBased => {
self.generate_grammar_based_inputs(self.fuzz_config.iterations, "generic")
}
InputGeneratorType::MutationBased => {
self.generate_inputs(self.fuzz_config.iterations)
}
InputGeneratorType::CoverageGuided => {
self.generate_inputs(std::cmp::min(100, self.fuzz_config.iterations))
}
};
let is_coverage_guided = matches!(
self.fuzz_config.input_generator,
InputGeneratorType::CoverageGuided
);
let mut iteration = 0;
let max_iterations = self.fuzz_config.iterations;
while iteration < max_iterations {
let input = if iteration < inputs.len() {
inputs[iteration].clone()
} else if is_coverage_guided && !corpus.interesting_inputs.is_empty() {
let seed = &corpus.get_seeds(1)[0];
mutate_input(seed)
} else {
break;
};
let _start = Instant::now();
let result = timeout(
self.fuzz_config.iteration_timeout,
tokio::task::spawn_blocking({
let mutant_source = mutant.mutated_source.clone();
let input_clone = input.clone();
move || {
let exec_result = panic::catch_unwind(|| {
execute_mutant_with_input(&mutant_source, &input_clone)
});
let coverage = CoverageTracker::simulate_coverage(&input_clone);
(exec_result, coverage)
}
}),
)
.await;
match result {
Ok(Ok((Ok(_), coverage))) => {
if is_coverage_guided {
corpus.add_if_interesting(input.clone(), coverage);
}
}
Ok(Ok((Err(_), coverage))) => {
if self.fuzz_config.crash_detection {
crashes.push(format!("crash_at_input_{}", iteration));
}
if is_coverage_guided {
corpus.add_if_interesting(input.clone(), coverage);
}
}
Ok(Err(_)) => {
}
Err(_) => {
hangs.push(input.clone());
}
}
iteration += 1;
if !crashes.is_empty() && !hangs.is_empty() {
break;
}
}
let coverage_increase = if is_coverage_guided {
corpus.total_coverage_increase()
} else {
0.0
};
Ok(FuzzResult {
crashes,
hangs,
coverage_increase,
})
}
#[provable_contracts_macros::contract("pmat-core.yaml", equation = "check_compliance")]
pub async fn execute_from_source(&self, source: &str) -> Result<FuzzMutationReport> {
use std::time::Instant;
let start = Instant::now();
let mutants = self
.mutation_engine
.generate_mutants_from_source(std::path::Path::new("fuzz_target.rs"), source)
.await?;
let total_mutants = mutants.len();
let mut results = Vec::new();
let mut mutants_with_crashes = 0;
let mut mutants_with_hangs = 0;
for mutant in mutants {
let fuzz_result = self.fuzz_mutant(&mutant).await?;
if fuzz_result.has_crashes() {
mutants_with_crashes += 1;
}
if fuzz_result.has_hangs() {
mutants_with_hangs += 1;
}
results.push((mutant, fuzz_result));
}
let execution_time = start.elapsed();
Ok(FuzzMutationReport {
total_mutants,
mutants_with_crashes,
mutants_with_hangs,
execution_time,
results,
})
}
#[provable_contracts_macros::contract("pmat-core.yaml", equation = "check_compliance")]
pub async fn execute_from_source_parallel(
&self,
source: &str,
workers: usize,
) -> Result<FuzzMutationReport> {
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Semaphore;
let start = Instant::now();
let mutants = self
.mutation_engine
.generate_mutants_from_source(std::path::Path::new("fuzz_target.rs"), source)
.await?;
let total_mutants = mutants.len();
let semaphore = Arc::new(Semaphore::new(workers));
let mut tasks = Vec::new();
for mutant in mutants {
let sem = semaphore.clone();
let config = self.fuzz_config.clone();
let engine = self.mutation_engine.clone();
let task = tokio::spawn(async move {
let _permit = sem
.acquire()
.await
.expect("Semaphore must not be closed during fuzzing");
let strategy = FuzzMutationStrategy::new(engine, config);
let fuzz_result = strategy
.fuzz_mutant(&mutant)
.await
.expect("Fuzz mutant operation must succeed");
(mutant, fuzz_result)
});
tasks.push(task);
}
let mut results = Vec::new();
let mut mutants_with_crashes = 0;
let mut mutants_with_hangs = 0;
for task in tasks {
let (mutant, fuzz_result) = task.await?;
if fuzz_result.has_crashes() {
mutants_with_crashes += 1;
}
if fuzz_result.has_hangs() {
mutants_with_hangs += 1;
}
results.push((mutant, fuzz_result));
}
let execution_time = start.elapsed();
Ok(FuzzMutationReport {
total_mutants,
mutants_with_crashes,
mutants_with_hangs,
execution_time,
results,
})
}
}