use alloc::{string::String, vec::Vec};
use core::{
fmt::{self, Debug},
marker::PhantomData,
};
use serde::{Deserialize, Serialize};
pub use crate::mutators::{mutations::*, token_mutations::*};
use crate::{
bolts::{
rands::Rand,
tuples::{tuple_list, tuple_list_type, NamedTuple},
AsMutSlice, AsSlice,
},
corpus::Corpus,
inputs::Input,
mutators::{MutationResult, Mutator, MutatorsTuple},
state::{HasCorpus, HasMetadata, HasRand},
Error,
};
#[derive(Debug, Serialize, Deserialize)]
pub struct LogMutationMetadata {
pub list: Vec<String>,
}
crate::impl_serdeany!(LogMutationMetadata);
impl AsSlice<String> for LogMutationMetadata {
#[must_use]
fn as_slice(&self) -> &[String] {
self.list.as_slice()
}
}
impl AsMutSlice<String> for LogMutationMetadata {
#[must_use]
fn as_mut_slice(&mut self) -> &mut [String] {
self.list.as_mut_slice()
}
}
impl LogMutationMetadata {
#[must_use]
pub fn new(list: Vec<String>) -> Self {
Self { list }
}
}
pub trait ComposedByMutations<I, MT, S>
where
I: Input,
MT: MutatorsTuple<I, S>,
{
fn mutations(&self) -> &MT;
fn mutations_mut(&mut self) -> &mut MT;
}
pub trait ScheduledMutator<I, MT, S>: ComposedByMutations<I, MT, S> + Mutator<I, S>
where
I: Input,
MT: MutatorsTuple<I, S>,
{
fn iterations(&self, state: &mut S, input: &I) -> u64;
fn schedule(&self, state: &mut S, input: &I) -> usize;
fn scheduled_mutate(
&mut self,
state: &mut S,
input: &mut I,
stage_idx: i32,
) -> Result<MutationResult, Error> {
let mut r = MutationResult::Skipped;
let num = self.iterations(state, input);
for _ in 0..num {
let idx = self.schedule(state, input);
let outcome = self
.mutations_mut()
.get_and_mutate(idx, state, input, stage_idx)?;
if outcome == MutationResult::Mutated {
r = MutationResult::Mutated;
}
}
Ok(r)
}
}
pub struct StdScheduledMutator<I, MT, S>
where
I: Input,
MT: MutatorsTuple<I, S>,
S: HasRand,
{
mutations: MT,
max_stack_pow: u64,
phantom: PhantomData<(I, S)>,
}
impl<I, MT, S> Debug for StdScheduledMutator<I, MT, S>
where
I: Input,
MT: MutatorsTuple<I, S>,
S: HasRand,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"StdScheduledMutator with {} mutations for Input type {}",
self.mutations.len(),
core::any::type_name::<I>()
)
}
}
impl<I, MT, S> Mutator<I, S> for StdScheduledMutator<I, MT, S>
where
I: Input,
MT: MutatorsTuple<I, S>,
S: HasRand,
{
#[inline]
fn mutate(
&mut self,
state: &mut S,
input: &mut I,
stage_idx: i32,
) -> Result<MutationResult, Error> {
self.scheduled_mutate(state, input, stage_idx)
}
}
impl<I, MT, S> ComposedByMutations<I, MT, S> for StdScheduledMutator<I, MT, S>
where
I: Input,
MT: MutatorsTuple<I, S>,
S: HasRand,
{
#[inline]
fn mutations(&self) -> &MT {
&self.mutations
}
#[inline]
fn mutations_mut(&mut self) -> &mut MT {
&mut self.mutations
}
}
impl<I, MT, S> ScheduledMutator<I, MT, S> for StdScheduledMutator<I, MT, S>
where
I: Input,
MT: MutatorsTuple<I, S>,
S: HasRand,
{
fn iterations(&self, state: &mut S, _: &I) -> u64 {
1 << (1 + state.rand_mut().below(self.max_stack_pow))
}
fn schedule(&self, state: &mut S, _: &I) -> usize {
debug_assert!(!self.mutations().is_empty());
state.rand_mut().below(self.mutations().len() as u64) as usize
}
}
impl<I, MT, S> StdScheduledMutator<I, MT, S>
where
I: Input,
MT: MutatorsTuple<I, S>,
S: HasRand,
{
pub fn new(mutations: MT) -> Self {
StdScheduledMutator {
mutations,
max_stack_pow: 7,
phantom: PhantomData,
}
}
pub fn with_max_stack_pow(mutations: MT, max_stack_pow: u64) -> Self {
StdScheduledMutator {
mutations,
max_stack_pow,
phantom: PhantomData,
}
}
}
pub type HavocMutationsType = tuple_list_type!(
BitFlipMutator,
ByteFlipMutator,
ByteIncMutator,
ByteDecMutator,
ByteNegMutator,
ByteRandMutator,
ByteAddMutator,
WordAddMutator,
DwordAddMutator,
QwordAddMutator,
ByteInterestingMutator,
WordInterestingMutator,
DwordInterestingMutator,
BytesDeleteMutator,
BytesDeleteMutator,
BytesDeleteMutator,
BytesDeleteMutator,
BytesExpandMutator,
BytesInsertMutator,
BytesRandInsertMutator,
BytesSetMutator,
BytesRandSetMutator,
BytesCopyMutator,
BytesInsertCopyMutator,
BytesSwapMutator,
CrossoverInsertMutator,
CrossoverReplaceMutator,
);
#[must_use]
pub fn havoc_mutations() -> HavocMutationsType {
tuple_list!(
BitFlipMutator::new(),
ByteFlipMutator::new(),
ByteIncMutator::new(),
ByteDecMutator::new(),
ByteNegMutator::new(),
ByteRandMutator::new(),
ByteAddMutator::new(),
WordAddMutator::new(),
DwordAddMutator::new(),
QwordAddMutator::new(),
ByteInterestingMutator::new(),
WordInterestingMutator::new(),
DwordInterestingMutator::new(),
BytesDeleteMutator::new(),
BytesDeleteMutator::new(),
BytesDeleteMutator::new(),
BytesDeleteMutator::new(),
BytesExpandMutator::new(),
BytesInsertMutator::new(),
BytesRandInsertMutator::new(),
BytesSetMutator::new(),
BytesRandSetMutator::new(),
BytesCopyMutator::new(),
BytesInsertCopyMutator::new(),
BytesSwapMutator::new(),
CrossoverInsertMutator::new(),
CrossoverReplaceMutator::new(),
)
}
#[must_use]
pub fn tokens_mutations() -> tuple_list_type!(TokenInsert, TokenReplace) {
tuple_list!(TokenInsert::new(), TokenReplace::new(),)
}
pub struct LoggerScheduledMutator<I, MT, S, SM>
where
I: Input,
MT: MutatorsTuple<I, S> + NamedTuple,
S: HasRand + HasCorpus<I>,
SM: ScheduledMutator<I, MT, S>,
{
scheduled: SM,
mutation_log: Vec<usize>,
phantom: PhantomData<(I, MT, S)>,
}
impl<I, MT, S, SM> Debug for LoggerScheduledMutator<I, MT, S, SM>
where
I: Input,
MT: MutatorsTuple<I, S> + NamedTuple,
S: HasRand + HasCorpus<I>,
SM: ScheduledMutator<I, MT, S>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"LoggerScheduledMutator with {} mutations for Input type {}",
self.scheduled.mutations().len(),
core::any::type_name::<I>()
)
}
}
impl<I, MT, S, SM> Mutator<I, S> for LoggerScheduledMutator<I, MT, S, SM>
where
I: Input,
MT: MutatorsTuple<I, S> + NamedTuple,
S: HasRand + HasCorpus<I>,
SM: ScheduledMutator<I, MT, S>,
{
fn mutate(
&mut self,
state: &mut S,
input: &mut I,
stage_idx: i32,
) -> Result<MutationResult, Error> {
self.scheduled_mutate(state, input, stage_idx)
}
fn post_exec(
&mut self,
state: &mut S,
_stage_idx: i32,
corpus_idx: Option<usize>,
) -> Result<(), Error> {
if let Some(idx) = corpus_idx {
let mut testcase = (*state.corpus_mut().get(idx)?).borrow_mut();
let mut log = Vec::<String>::new();
while let Some(idx) = self.mutation_log.pop() {
let name = String::from(self.scheduled.mutations().name(idx).unwrap()); log.push(name);
}
let meta = LogMutationMetadata::new(log);
testcase.add_metadata(meta);
};
self.mutation_log.clear();
Ok(())
}
}
impl<I, MT, S, SM> ComposedByMutations<I, MT, S> for LoggerScheduledMutator<I, MT, S, SM>
where
I: Input,
MT: MutatorsTuple<I, S> + NamedTuple,
S: HasRand + HasCorpus<I>,
SM: ScheduledMutator<I, MT, S>,
{
#[inline]
fn mutations(&self) -> &MT {
self.scheduled.mutations()
}
#[inline]
fn mutations_mut(&mut self) -> &mut MT {
self.scheduled.mutations_mut()
}
}
impl<I, MT, S, SM> ScheduledMutator<I, MT, S> for LoggerScheduledMutator<I, MT, S, SM>
where
I: Input,
MT: MutatorsTuple<I, S> + NamedTuple,
S: HasRand + HasCorpus<I>,
SM: ScheduledMutator<I, MT, S>,
{
fn iterations(&self, state: &mut S, _: &I) -> u64 {
1 << (1 + state.rand_mut().below(6))
}
fn schedule(&self, state: &mut S, _: &I) -> usize {
debug_assert!(!self.scheduled.mutations().is_empty());
state
.rand_mut()
.below(self.scheduled.mutations().len() as u64) as usize
}
fn scheduled_mutate(
&mut self,
state: &mut S,
input: &mut I,
stage_idx: i32,
) -> Result<MutationResult, Error> {
let mut r = MutationResult::Skipped;
let num = self.iterations(state, input);
self.mutation_log.clear();
for _ in 0..num {
let idx = self.schedule(state, input);
self.mutation_log.push(idx);
let outcome = self
.mutations_mut()
.get_and_mutate(idx, state, input, stage_idx)?;
if outcome == MutationResult::Mutated {
r = MutationResult::Mutated;
}
}
Ok(r)
}
}
impl<I, MT, S, SM> LoggerScheduledMutator<I, MT, S, SM>
where
I: Input,
MT: MutatorsTuple<I, S> + NamedTuple,
S: HasRand + HasCorpus<I>,
SM: ScheduledMutator<I, MT, S>,
{
pub fn new(scheduled: SM) -> Self {
Self {
scheduled,
mutation_log: vec![],
phantom: PhantomData,
}
}
}
#[cfg(test)]
mod tests {
use crate::{
bolts::rands::{Rand, StdRand, XkcdRand},
corpus::{Corpus, InMemoryCorpus, Testcase},
inputs::{BytesInput, HasBytesVec},
mutators::{
mutations::SpliceMutator,
scheduled::{havoc_mutations, StdScheduledMutator},
Mutator,
},
state::StdState,
};
#[test]
fn test_mut_scheduled() {
let mut rand = XkcdRand::with_seed(5);
let mut corpus: InMemoryCorpus<BytesInput> = InMemoryCorpus::new();
corpus.add(Testcase::new(vec![b'a', b'b', b'c'])).unwrap();
corpus.add(Testcase::new(vec![b'd', b'e', b'f'])).unwrap();
let testcase = corpus.get(0).expect("Corpus did not contain entries");
let mut input = testcase.borrow_mut().load_input().unwrap().clone();
let mut state =
StdState::new(rand, corpus, InMemoryCorpus::new(), &mut (), &mut ()).unwrap();
rand.set_seed(5);
let mut splice = SpliceMutator::new();
splice.mutate(&mut state, &mut input, 0).unwrap();
#[cfg(feature = "std")]
println!("{:?}", input.bytes());
assert_eq!(input.bytes(), &[b'a', b'b', b'f']);
}
#[test]
fn test_havoc() {
let rand = StdRand::with_seed(0x1337);
let mut corpus: InMemoryCorpus<BytesInput> = InMemoryCorpus::new();
corpus.add(Testcase::new(vec![b'a', b'b', b'c'])).unwrap();
corpus.add(Testcase::new(vec![b'd', b'e', b'f'])).unwrap();
let testcase = corpus.get(0).expect("Corpus did not contain entries");
let mut input = testcase.borrow_mut().load_input().unwrap().clone();
let input_prior = input.clone();
let mut state =
StdState::new(rand, corpus, InMemoryCorpus::new(), &mut (), &mut ()).unwrap();
let mut havoc = StdScheduledMutator::new(havoc_mutations());
assert_eq!(input, input_prior);
let mut equal_in_a_row = 0;
for i in 0..42 {
havoc.mutate(&mut state, &mut input, i).unwrap();
equal_in_a_row = if input == input_prior {
equal_in_a_row + 1
} else {
0
};
assert_ne!(equal_in_a_row, 5);
}
}
}
#[cfg(feature = "python")]
#[allow(missing_docs)]
pub mod pybind {
use pyo3::prelude::*;
use super::{havoc_mutations, Debug, HavocMutationsType, StdScheduledMutator};
use crate::{
inputs::BytesInput, mutators::pybind::PythonMutator, state::pybind::PythonStdState,
};
#[pyclass(unsendable, name = "StdHavocMutator")]
#[derive(Debug)]
pub struct PythonStdHavocMutator {
pub inner: StdScheduledMutator<BytesInput, HavocMutationsType, PythonStdState>,
}
#[pymethods]
impl PythonStdHavocMutator {
#[new]
fn new() -> Self {
Self {
inner: StdScheduledMutator::new(havoc_mutations()),
}
}
fn as_mutator(slf: Py<Self>) -> PythonMutator {
PythonMutator::new_std_havoc(slf)
}
}
pub fn register(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PythonStdHavocMutator>()?;
Ok(())
}
}