use std::{fs, net::SocketAddr, path::PathBuf, time::Duration};
use libafl::{
bolts::{
core_affinity::Cores,
current_nanos,
launcher::Launcher,
rands::StdRand,
shmem::{ShMem, ShMemProvider, UnixShMemProvider},
tuples::{tuple_list, Merge},
AsMutSlice,
},
corpus::{CachedOnDiskCorpus, Corpus, OnDiskCorpus},
events::{EventConfig, EventRestarter, LlmpRestartingEventManager},
executors::{forkserver::ForkserverExecutorBuilder, TimeoutForkserverExecutor},
feedback_or, feedback_or_fast,
feedbacks::{CrashFeedback, MaxMapFeedback, TimeFeedback, TimeoutFeedback},
fuzzer::{Fuzzer, StdFuzzer},
generators::RandBytesGenerator,
monitors::MultiMonitor,
mutators::{
scheduled::{havoc_mutations, tokens_mutations, StdScheduledMutator},
token_mutations::Tokens,
},
observers::{ConstMapObserver, HitcountsMapObserver, TimeObserver},
schedulers::{IndexesLenTimeMinimizerScheduler, QueueScheduler},
stages::StdMutationalStage,
state::{HasCorpus, HasMetadata, StdState},
Error,
};
use typed_builder::TypedBuilder;
use crate::{CORPUS_CACHE_SIZE, DEFAULT_TIMEOUT_SECS};
pub const DEFAULT_MAP_SIZE: usize = 65536;
#[derive(Debug, TypedBuilder)]
pub struct ForkserverBytesCoverageSugar<'a, const MAP_SIZE: usize> {
#[builder(default = None, setter(strip_option))]
configuration: Option<String>,
#[builder(default = None)]
timeout: Option<u64>,
input_dirs: &'a [PathBuf],
output_dir: PathBuf,
#[builder(default = None)]
tokens_file: Option<PathBuf>,
#[builder(default = None)]
use_cmplog: Option<bool>,
#[builder(default = 1337_u16)]
broker_port: u16,
cores: &'a Cores,
#[builder(default = None, setter(strip_option))]
remote_broker_addr: Option<SocketAddr>,
program: String,
arguments: &'a [String],
#[builder(default = false)]
shmem_testcase: bool,
#[builder(default = false)]
debug_output: bool,
#[builder(default = None)]
iterations: Option<u64>,
}
#[allow(clippy::similar_names)]
impl<'a, const MAP_SIZE: usize> ForkserverBytesCoverageSugar<'a, MAP_SIZE> {
#[allow(clippy::too_many_lines, clippy::similar_names)]
pub fn run(&mut self) {
let conf = match self.configuration.as_ref() {
Some(name) => EventConfig::from_name(name),
None => EventConfig::AlwaysUnique,
};
if self.use_cmplog.unwrap_or(false) {
println!("[WARNING] use of cmplog not currently supported, use_cmplog ignored.");
}
let timeout = Duration::from_secs(self.timeout.unwrap_or(DEFAULT_TIMEOUT_SECS));
let mut out_dir = self.output_dir.clone();
if fs::create_dir(&out_dir).is_err() {
println!("Out dir at {:?} already exists.", &out_dir);
assert!(
out_dir.is_dir(),
"Out dir at {:?} is not a valid directory!",
&out_dir
);
}
let mut crashes = out_dir.clone();
crashes.push("crashes");
out_dir.push("queue");
let shmem_provider = UnixShMemProvider::new().expect("Failed to init shared memory");
let mut shmem_provider_client = shmem_provider.clone();
let monitor = MultiMonitor::new(|s| println!("{s}"));
let mut run_client = |state: Option<_>,
mut mgr: LlmpRestartingEventManager<_, _>,
_core_id| {
let mut shmem = shmem_provider_client.new_shmem(MAP_SIZE).unwrap();
shmem.write_to_env("__AFL_SHM_ID").unwrap();
let shmem_map = shmem.as_mut_slice();
let edges_observer = unsafe {
HitcountsMapObserver::new(ConstMapObserver::<_, MAP_SIZE>::from_mut_ptr(
"shared_mem",
shmem_map.as_mut_ptr(),
))
};
let time_observer = TimeObserver::new("time");
let mut feedback = feedback_or!(
MaxMapFeedback::new_tracking(&edges_observer, true, false),
TimeFeedback::with_observer(&time_observer)
);
let mut objective = feedback_or_fast!(CrashFeedback::new(), TimeoutFeedback::new());
let mut state = state.unwrap_or_else(|| {
StdState::new(
StdRand::with_seed(current_nanos()),
CachedOnDiskCorpus::new(out_dir.clone(), CORPUS_CACHE_SIZE).unwrap(),
OnDiskCorpus::new(crashes.clone()).unwrap(),
&mut feedback,
&mut objective,
)
.unwrap()
});
if let Some(tokens_file) = &self.tokens_file {
if state.metadata().get::<Tokens>().is_none() {
state.add_metadata(Tokens::from_file(tokens_file)?);
}
}
let scheduler = IndexesLenTimeMinimizerScheduler::new(QueueScheduler::new());
let mut fuzzer = StdFuzzer::new(scheduler, feedback, objective);
let forkserver = if self.shmem_testcase {
ForkserverExecutorBuilder::new()
.program(self.program.clone())
.args(self.arguments)
.debug_child(self.debug_output)
.shmem_provider(&mut shmem_provider_client)
.build(tuple_list!(edges_observer, time_observer))
} else {
ForkserverExecutorBuilder::new()
.program(self.program.clone())
.args(self.arguments)
.debug_child(self.debug_output)
.build(tuple_list!(edges_observer, time_observer))
};
let mut executor = TimeoutForkserverExecutor::new(
forkserver.expect("Failed to create the executor."),
timeout,
)
.expect("Failed to create the executor.");
if state.corpus().count() < 1 {
if self.input_dirs.is_empty() {
let mut generator = RandBytesGenerator::new(32);
state
.generate_initial_inputs(
&mut fuzzer,
&mut executor,
&mut generator,
&mut mgr,
8,
)
.expect("Failed to generate the initial corpus");
println!(
"We imported {} inputs from the generator.",
state.corpus().count()
);
} else {
println!("Loading from {:?}", &self.input_dirs);
state
.load_initial_inputs(&mut fuzzer, &mut executor, &mut mgr, self.input_dirs)
.unwrap_or_else(|_| {
panic!("Failed to load initial corpus at {:?}", &self.input_dirs);
});
println!("We imported {} inputs from disk.", state.corpus().count());
}
}
if self.tokens_file.is_some() {
let mutator = StdScheduledMutator::new(havoc_mutations().merge(tokens_mutations()));
let mutational = StdMutationalStage::new(mutator);
let mut stages = tuple_list!(mutational);
if let Some(iters) = self.iterations {
fuzzer.fuzz_loop_for(
&mut stages,
&mut executor,
&mut state,
&mut mgr,
iters,
)?;
mgr.on_restart(&mut state)?;
std::process::exit(0);
} else {
fuzzer.fuzz_loop(&mut stages, &mut executor, &mut state, &mut mgr)?;
}
} else {
let mutator = StdScheduledMutator::new(havoc_mutations());
let mutational = StdMutationalStage::new(mutator);
let mut stages = tuple_list!(mutational);
if let Some(iters) = self.iterations {
fuzzer.fuzz_loop_for(
&mut stages,
&mut executor,
&mut state,
&mut mgr,
iters,
)?;
mgr.on_restart(&mut state)?;
std::process::exit(0);
} else {
fuzzer.fuzz_loop(&mut stages, &mut executor, &mut state, &mut mgr)?;
}
}
Ok(())
};
let launcher = Launcher::builder()
.shmem_provider(shmem_provider)
.configuration(conf)
.monitor(monitor)
.run_client(&mut run_client)
.cores(self.cores)
.broker_port(self.broker_port)
.remote_broker_addr(self.remote_broker_addr);
#[cfg(unix)]
let launcher = launcher.stdout_file(Some("/dev/null"));
match launcher.build().launch() {
Ok(()) => (),
Err(Error::ShuttingDown) => println!("\nFuzzing stopped by user. Good Bye."),
Err(err) => panic!("Fuzzingg failed {err:?}"),
}
}
}
#[cfg(feature = "python")]
pub mod pybind {
use std::path::PathBuf;
use libafl::bolts::core_affinity::Cores;
use pyo3::prelude::*;
use crate::forkserver;
#[pyclass(unsendable)]
struct ForkserverBytesCoverageSugar {
input_dirs: Vec<PathBuf>,
output_dir: PathBuf,
broker_port: u16,
cores: Cores,
use_cmplog: Option<bool>,
iterations: Option<u64>,
tokens_file: Option<PathBuf>,
timeout: Option<u64>,
}
#[pymethods]
impl ForkserverBytesCoverageSugar {
#[new]
#[allow(clippy::too_many_arguments)]
fn new(
input_dirs: Vec<PathBuf>,
output_dir: PathBuf,
broker_port: u16,
cores: Vec<usize>,
use_cmplog: Option<bool>,
iterations: Option<u64>,
tokens_file: Option<PathBuf>,
timeout: Option<u64>,
) -> Self {
Self {
input_dirs,
output_dir,
broker_port,
cores: cores.into(),
use_cmplog,
iterations,
tokens_file,
timeout,
}
}
#[allow(clippy::needless_pass_by_value)]
pub fn run(&self, program: String, arguments: Vec<String>) {
forkserver::ForkserverBytesCoverageSugar::<{ forkserver::DEFAULT_MAP_SIZE }>::builder()
.input_dirs(&self.input_dirs)
.output_dir(self.output_dir.clone())
.broker_port(self.broker_port)
.cores(&self.cores)
.program(program)
.arguments(&arguments)
.use_cmplog(self.use_cmplog)
.timeout(self.timeout)
.tokens_file(self.tokens_file.clone())
.iterations(self.iterations)
.build()
.run();
}
}
pub fn register(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_class::<ForkserverBytesCoverageSugar>()?;
Ok(())
}
}