use core::{fmt::Debug, marker::PhantomData, time::Duration};
#[cfg(feature = "std")]
use std::{
fs,
path::{Path, PathBuf},
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use crate::{
bolts::{
rands::Rand,
serdeany::{NamedSerdeAnyMap, SerdeAny, SerdeAnyMap},
},
corpus::Corpus,
events::{Event, EventFirer, LogSeverity},
feedbacks::Feedback,
fuzzer::{Evaluator, ExecuteInputResult},
generators::Generator,
inputs::Input,
monitors::ClientPerfMonitor,
Error,
};
pub const DEFAULT_MAX_SIZE: usize = 1_048_576;
pub trait State: Serialize + DeserializeOwned {}
pub trait HasCorpus<I: Input> {
type Corpus: Corpus<I>;
fn corpus(&self) -> &Self::Corpus;
fn corpus_mut(&mut self) -> &mut Self::Corpus;
}
pub trait HasMaxSize {
fn max_size(&self) -> usize;
fn set_max_size(&mut self, max_size: usize);
}
pub trait HasSolutions<I: Input> {
type Solutions: Corpus<I>;
fn solutions(&self) -> &Self::Solutions;
fn solutions_mut(&mut self) -> &mut Self::Solutions;
}
pub trait HasRand {
type Rand: Rand;
fn rand(&self) -> &Self::Rand;
fn rand_mut(&mut self) -> &mut Self::Rand;
}
pub trait HasClientPerfMonitor {
fn introspection_monitor(&self) -> &ClientPerfMonitor;
fn introspection_monitor_mut(&mut self) -> &mut ClientPerfMonitor;
}
pub trait HasMetadata {
fn metadata(&self) -> &SerdeAnyMap;
fn metadata_mut(&mut self) -> &mut SerdeAnyMap;
#[inline]
fn add_metadata<M>(&mut self, meta: M)
where
M: SerdeAny,
{
self.metadata_mut().insert(meta);
}
#[inline]
fn has_metadata<M>(&self) -> bool
where
M: SerdeAny,
{
self.metadata().get::<M>().is_some()
}
}
pub trait HasNamedMetadata {
fn named_metadata(&self) -> &NamedSerdeAnyMap;
fn named_metadata_mut(&mut self) -> &mut NamedSerdeAnyMap;
#[inline]
fn add_named_metadata<M>(&mut self, meta: M, name: &str)
where
M: SerdeAny,
{
self.named_metadata_mut().insert(meta, name);
}
#[inline]
fn has_named_metadata<M>(&self, name: &str) -> bool
where
M: SerdeAny,
{
self.named_metadata().contains::<M>(name)
}
}
pub trait HasExecutions {
fn executions(&self) -> &usize;
fn executions_mut(&mut self) -> &mut usize;
}
pub trait HasStartTime {
fn start_time(&self) -> &Duration;
fn start_time_mut(&mut self) -> &mut Duration;
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(bound = "C: serde::Serialize + for<'a> serde::Deserialize<'a>")]
pub struct StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
rand: R,
executions: usize,
start_time: Duration,
corpus: C,
solutions: SC,
metadata: SerdeAnyMap,
named_metadata: NamedSerdeAnyMap,
max_size: usize,
#[cfg(feature = "introspection")]
introspection_monitor: ClientPerfMonitor,
phantom: PhantomData<I>,
}
impl<C, I, R, SC> State for StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
}
impl<C, I, R, SC> HasRand for StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
type Rand = R;
#[inline]
fn rand(&self) -> &Self::Rand {
&self.rand
}
#[inline]
fn rand_mut(&mut self) -> &mut Self::Rand {
&mut self.rand
}
}
impl<C, I, R, SC> HasCorpus<I> for StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
type Corpus = C;
#[inline]
fn corpus(&self) -> &C {
&self.corpus
}
#[inline]
fn corpus_mut(&mut self) -> &mut C {
&mut self.corpus
}
}
impl<C, I, R, SC> HasSolutions<I> for StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
type Solutions = SC;
#[inline]
fn solutions(&self) -> &SC {
&self.solutions
}
#[inline]
fn solutions_mut(&mut self) -> &mut SC {
&mut self.solutions
}
}
impl<C, I, R, SC> HasMetadata for StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
#[inline]
fn metadata(&self) -> &SerdeAnyMap {
&self.metadata
}
#[inline]
fn metadata_mut(&mut self) -> &mut SerdeAnyMap {
&mut self.metadata
}
}
impl<C, I, R, SC> HasNamedMetadata for StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
#[inline]
fn named_metadata(&self) -> &NamedSerdeAnyMap {
&self.named_metadata
}
#[inline]
fn named_metadata_mut(&mut self) -> &mut NamedSerdeAnyMap {
&mut self.named_metadata
}
}
impl<C, I, R, SC> HasExecutions for StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
#[inline]
fn executions(&self) -> &usize {
&self.executions
}
#[inline]
fn executions_mut(&mut self) -> &mut usize {
&mut self.executions
}
}
impl<C, I, R, SC> HasMaxSize for StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
fn max_size(&self) -> usize {
self.max_size
}
fn set_max_size(&mut self, max_size: usize) {
self.max_size = max_size;
}
}
impl<C, I, R, SC> HasStartTime for StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
#[inline]
fn start_time(&self) -> &Duration {
&self.start_time
}
#[inline]
fn start_time_mut(&mut self) -> &mut Duration {
&mut self.start_time
}
}
#[cfg(feature = "std")]
impl<C, I, R, SC> StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
pub fn load_from_directory<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
manager: &mut EM,
in_dir: &Path,
forced: bool,
loader: &mut dyn FnMut(&mut Z, &mut Self, &Path) -> Result<I, Error>,
) -> Result<(), Error>
where
Z: Evaluator<E, EM, I, Self>,
{
for entry in fs::read_dir(in_dir)? {
let entry = entry?;
let path = entry.path();
let attributes = fs::metadata(&path);
if attributes.is_err() {
continue;
}
let attr = attributes?;
if attr.is_file() && attr.len() > 0 {
println!("Loading file {:?} ...", &path);
let input = loader(fuzzer, self, &path)?;
if forced {
let _ = fuzzer.add_input(self, executor, manager, input)?;
} else {
let (res, _) = fuzzer.evaluate_input(self, executor, manager, input)?;
if res == ExecuteInputResult::None {
println!("File {:?} was not interesting, skipped.", &path);
}
}
} else if attr.is_dir() {
self.load_from_directory(fuzzer, executor, manager, &path, forced, loader)?;
}
}
Ok(())
}
fn load_initial_inputs_internal<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
manager: &mut EM,
in_dirs: &[PathBuf],
forced: bool,
) -> Result<(), Error>
where
Z: Evaluator<E, EM, I, Self>,
EM: EventFirer<I>,
{
for in_dir in in_dirs {
self.load_from_directory(
fuzzer,
executor,
manager,
in_dir,
forced,
&mut |_, _, path| I::from_file(path),
)?;
}
manager.fire(
self,
Event::Log {
severity_level: LogSeverity::Debug,
message: format!("Loaded {} initial testcases.", self.corpus().count()), phantom: PhantomData,
},
)?;
Ok(())
}
pub fn load_initial_inputs_forced<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
manager: &mut EM,
in_dirs: &[PathBuf],
) -> Result<(), Error>
where
Z: Evaluator<E, EM, I, Self>,
EM: EventFirer<I>,
{
self.load_initial_inputs_internal(fuzzer, executor, manager, in_dirs, true)
}
pub fn load_initial_inputs<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
manager: &mut EM,
in_dirs: &[PathBuf],
) -> Result<(), Error>
where
Z: Evaluator<E, EM, I, Self>,
EM: EventFirer<I>,
{
self.load_initial_inputs_internal(fuzzer, executor, manager, in_dirs, false)
}
}
impl<C, I, R, SC> StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
fn generate_initial_internal<G, E, EM, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
generator: &mut G,
manager: &mut EM,
num: usize,
forced: bool,
) -> Result<(), Error>
where
G: Generator<I, Self>,
Z: Evaluator<E, EM, I, Self>,
EM: EventFirer<I>,
{
let mut added = 0;
for _ in 0..num {
let input = generator.generate(self)?;
if forced {
let _ = fuzzer.add_input(self, executor, manager, input)?;
added += 1;
} else {
let (res, _) = fuzzer.evaluate_input(self, executor, manager, input)?;
if res != ExecuteInputResult::None {
added += 1;
}
}
}
manager.fire(
self,
Event::Log {
severity_level: LogSeverity::Debug,
message: format!("Loaded {added} over {num} initial testcases"),
phantom: PhantomData,
},
)?;
Ok(())
}
pub fn generate_initial_inputs_forced<G, E, EM, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
generator: &mut G,
manager: &mut EM,
num: usize,
) -> Result<(), Error>
where
G: Generator<I, Self>,
Z: Evaluator<E, EM, I, Self>,
EM: EventFirer<I>,
{
self.generate_initial_internal(fuzzer, executor, generator, manager, num, true)
}
pub fn generate_initial_inputs<G, E, EM, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
generator: &mut G,
manager: &mut EM,
num: usize,
) -> Result<(), Error>
where
G: Generator<I, Self>,
Z: Evaluator<E, EM, I, Self>,
EM: EventFirer<I>,
{
self.generate_initial_internal(fuzzer, executor, generator, manager, num, false)
}
pub fn new<F, O>(
rand: R,
corpus: C,
solutions: SC,
feedback: &mut F,
objective: &mut O,
) -> Result<Self, Error>
where
F: Feedback<I, Self>,
O: Feedback<I, Self>,
{
let mut state = Self {
rand,
executions: 0,
start_time: Duration::from_millis(0),
metadata: SerdeAnyMap::default(),
named_metadata: NamedSerdeAnyMap::default(),
corpus,
solutions,
max_size: DEFAULT_MAX_SIZE,
#[cfg(feature = "introspection")]
introspection_monitor: ClientPerfMonitor::new(),
phantom: PhantomData,
};
feedback.init_state(&mut state)?;
objective.init_state(&mut state)?;
Ok(state)
}
}
#[cfg(feature = "introspection")]
impl<C, I, R, SC> HasClientPerfMonitor for StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
fn introspection_monitor(&self) -> &ClientPerfMonitor {
&self.introspection_monitor
}
fn introspection_monitor_mut(&mut self) -> &mut ClientPerfMonitor {
&mut self.introspection_monitor
}
}
#[cfg(not(feature = "introspection"))]
impl<C, I, R, SC> HasClientPerfMonitor for StdState<C, I, R, SC>
where
C: Corpus<I>,
I: Input,
R: Rand,
SC: Corpus<I>,
{
fn introspection_monitor(&self) -> &ClientPerfMonitor {
unimplemented!()
}
fn introspection_monitor_mut(&mut self) -> &mut ClientPerfMonitor {
unimplemented!()
}
}
#[cfg(feature = "python")]
#[allow(missing_docs)]
pub mod pybind {
use alloc::{boxed::Box, vec::Vec};
use std::path::PathBuf;
use pyo3::{prelude::*, types::PyDict};
use crate::{
bolts::{ownedref::OwnedPtrMut, rands::pybind::PythonRand},
corpus::pybind::PythonCorpus,
events::pybind::PythonEventManager,
executors::pybind::PythonExecutor,
feedbacks::pybind::PythonFeedback,
fuzzer::pybind::PythonStdFuzzerWrapper,
generators::pybind::PythonGenerator,
inputs::BytesInput,
pybind::PythonMetadata,
state::{
HasCorpus, HasExecutions, HasMaxSize, HasMetadata, HasRand, HasSolutions, StdState,
},
};
pub type PythonStdState = StdState<PythonCorpus, BytesInput, PythonRand, PythonCorpus>;
#[pyclass(unsendable, name = "StdState")]
#[derive(Debug)]
pub struct PythonStdStateWrapper {
pub inner: OwnedPtrMut<PythonStdState>,
}
impl PythonStdStateWrapper {
pub fn wrap(r: &mut PythonStdState) -> Self {
Self {
inner: OwnedPtrMut::Ptr(r),
}
}
#[must_use]
pub fn unwrap(&self) -> &PythonStdState {
self.inner.as_ref()
}
pub fn unwrap_mut(&mut self) -> &mut PythonStdState {
self.inner.as_mut()
}
}
#[pymethods]
impl PythonStdStateWrapper {
#[new]
fn new(
py_rand: PythonRand,
corpus: PythonCorpus,
solutions: PythonCorpus,
feedback: &mut PythonFeedback,
objective: &mut PythonFeedback,
) -> Self {
Self {
inner: OwnedPtrMut::Owned(Box::new(
StdState::new(py_rand, corpus, solutions, feedback, objective)
.expect("Failed to create a new StdState"),
)),
}
}
fn metadata(&mut self) -> PyObject {
let meta = self.inner.as_mut().metadata_mut();
if !meta.contains::<PythonMetadata>() {
Python::with_gil(|py| {
let dict: Py<PyDict> = PyDict::new(py).into();
meta.insert(PythonMetadata::new(dict.to_object(py)));
});
}
meta.get::<PythonMetadata>().unwrap().map.clone()
}
fn rand(&self) -> PythonRand {
self.inner.as_ref().rand().clone()
}
fn corpus(&self) -> PythonCorpus {
self.inner.as_ref().corpus().clone()
}
fn solutions(&self) -> PythonCorpus {
self.inner.as_ref().solutions().clone()
}
fn executions(&self) -> usize {
*self.inner.as_ref().executions()
}
fn max_size(&self) -> usize {
self.inner.as_ref().max_size()
}
fn generate_initial_inputs(
&mut self,
py_fuzzer: &mut PythonStdFuzzerWrapper,
py_executor: &mut PythonExecutor,
py_generator: &mut PythonGenerator,
py_mgr: &mut PythonEventManager,
num: usize,
) {
self.inner
.as_mut()
.generate_initial_inputs(
py_fuzzer.unwrap_mut(),
py_executor,
py_generator,
py_mgr,
num,
)
.expect("Failed to generate the initial corpus");
}
#[allow(clippy::needless_pass_by_value)]
fn load_initial_inputs(
&mut self,
py_fuzzer: &mut PythonStdFuzzerWrapper,
py_executor: &mut PythonExecutor,
py_mgr: &mut PythonEventManager,
in_dirs: Vec<PathBuf>,
) {
self.inner
.as_mut()
.load_initial_inputs(py_fuzzer.unwrap_mut(), py_executor, py_mgr, &in_dirs)
.expect("Failed to load the initial corpus");
}
}
pub fn register(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<PythonStdStateWrapper>()?;
Ok(())
}
}