use alloc::{
boxed::Box,
string::{String, ToString},
vec::Vec,
};
#[cfg(feature = "std")]
use core::sync::atomic::{compiler_fence, Ordering};
use core::{marker::PhantomData, time::Duration};
#[cfg(feature = "std")]
use std::net::{SocketAddr, ToSocketAddrs};
use serde::Deserialize;
#[cfg(feature = "std")]
use serde::{de::DeserializeOwned, Serialize};
#[cfg(feature = "std")]
use typed_builder::TypedBuilder;
use super::{CustomBufEventResult, CustomBufHandlerFn};
#[cfg(feature = "std")]
use crate::bolts::core_affinity::CoreId;
#[cfg(all(feature = "std", any(windows, not(feature = "fork"))))]
use crate::bolts::os::startable_self;
#[cfg(all(feature = "std", feature = "fork", unix))]
use crate::bolts::os::{fork, ForkResult};
#[cfg(feature = "llmp_compression")]
use crate::bolts::{
compress::GzipCompressor,
llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED},
};
#[cfg(feature = "std")]
use crate::bolts::{llmp::LlmpConnection, shmem::StdShMemProvider, staterestore::StateRestorer};
#[cfg(all(unix, feature = "std"))]
use crate::{
bolts::os::unix_signals::setup_signal_handler,
events::{shutdown_handler, SHUTDOWN_SIGHANDLER_DATA},
};
use crate::{
bolts::{
llmp::{self, LlmpClient, LlmpClientDescription, Tag},
shmem::ShMemProvider,
},
events::{
BrokerEventResult, Event, EventConfig, EventFirer, EventManager, EventManagerId,
EventProcessor, EventRestarter, HasCustomBufHandlers, HasEventManagerId, ProgressReporter,
},
executors::{Executor, HasObservers},
fuzzer::{EvaluatorObservers, ExecutionProcessor},
inputs::{Input, InputConverter, UsesInput},
monitors::Monitor,
state::{HasClientPerfMonitor, HasExecutions, HasMetadata, UsesState},
Error,
};
const _LLMP_TAG_EVENT_TO_CLIENT: Tag = 0x2C11E471;
const _LLMP_TAG_EVENT_TO_BROKER: Tag = 0x2B80438;
const LLMP_TAG_EVENT_TO_BOTH: Tag = 0x2B0741;
const _LLMP_TAG_RESTART: Tag = 0x8357A87;
const _LLMP_TAG_NO_RESTART: Tag = 0x57A7EE71;
#[cfg(feature = "llmp_compression")]
const COMPRESS_THRESHOLD: usize = 1024;
#[derive(Debug)]
pub struct LlmpEventBroker<I, MT, SP>
where
I: Input,
SP: ShMemProvider + 'static,
MT: Monitor,
{
monitor: MT,
llmp: llmp::LlmpBroker<SP>,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor,
phantom: PhantomData<I>,
}
impl<I, MT, SP> LlmpEventBroker<I, MT, SP>
where
I: Input,
SP: ShMemProvider + 'static,
MT: Monitor,
{
pub fn new(llmp: llmp::LlmpBroker<SP>, monitor: MT) -> Result<Self, Error> {
Ok(Self {
monitor,
llmp,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData,
})
}
#[cfg(feature = "std")]
pub fn new_on_port(shmem_provider: SP, monitor: MT, port: u16) -> Result<Self, Error> {
Ok(Self {
monitor,
llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData,
})
}
#[cfg(feature = "std")]
pub fn connect_b2b<A>(&mut self, addr: A) -> Result<(), Error>
where
A: ToSocketAddrs,
{
self.llmp.connect_b2b(addr)
}
#[cfg(not(feature = "llmp_broker_timeouts"))]
pub fn broker_loop(&mut self) -> Result<(), Error> {
let monitor = &mut self.monitor;
#[cfg(feature = "llmp_compression")]
let compressor = &self.compressor;
self.llmp.loop_forever(
&mut |client_id, tag, _flags, msg| {
if tag == LLMP_TAG_EVENT_TO_BOTH {
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = compressor.decompress(msg)?;
&compressed
} else {
msg
};
let event: Event<I> = postcard::from_bytes(event_bytes)?;
match Self::handle_in_broker(monitor, client_id, &event)? {
BrokerEventResult::Forward => Ok(llmp::LlmpMsgHookResult::ForwardToClients),
BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled),
}
} else {
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
}
},
Some(Duration::from_millis(5)),
);
Ok(())
}
#[cfg(feature = "llmp_broker_timeouts")]
pub fn broker_loop(&mut self) -> Result<(), Error> {
let monitor = &mut self.monitor;
#[cfg(feature = "llmp_compression")]
let compressor = &self.compressor;
self.llmp.loop_with_timeouts(
&mut |msg_or_timeout| {
if let Some((client_id, tag, _flags, msg)) = msg_or_timeout {
if tag == LLMP_TAG_EVENT_TO_BOTH {
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = compressor.decompress(msg)?;
&compressed
} else {
msg
};
let event: Event<I> = postcard::from_bytes(event_bytes)?;
match Self::handle_in_broker(monitor, client_id, &event)? {
BrokerEventResult::Forward => {
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
}
BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled),
}
} else {
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
}
} else {
monitor.display("Timeout".into(), 0);
Ok(llmp::LlmpMsgHookResult::Handled)
}
},
Duration::from_millis(1000),
Some(Duration::from_millis(5)),
);
Ok(())
}
#[allow(clippy::unnecessary_wraps)]
fn handle_in_broker(
monitor: &mut MT,
client_id: u32,
event: &Event<I>,
) -> Result<BrokerEventResult, Error> {
match &event {
Event::NewTestcase {
input: _,
client_config: _,
exit_kind: _,
corpus_size,
observers_buf: _,
time,
executions,
} => {
let client = monitor.client_stats_mut_for(client_id);
client.update_corpus_size(*corpus_size as u64);
client.update_executions(*executions as u64, *time);
monitor.display(event.name().to_string(), client_id);
Ok(BrokerEventResult::Forward)
}
Event::UpdateExecStats {
time,
executions,
phantom: _,
} => {
let client = monitor.client_stats_mut_for(client_id);
client.update_executions(*executions as u64, *time);
monitor.display(event.name().to_string(), client_id);
Ok(BrokerEventResult::Handled)
}
Event::UpdateUserStats {
name,
value,
phantom: _,
} => {
let client = monitor.client_stats_mut_for(client_id);
client.update_user_stats(name.clone(), value.clone());
monitor.display(event.name().to_string(), client_id);
Ok(BrokerEventResult::Handled)
}
#[cfg(feature = "introspection")]
Event::UpdatePerfMonitor {
time,
executions,
introspection_monitor,
phantom: _,
} => {
let client = monitor.client_stats_mut_for(client_id);
client.update_executions(*executions as u64, *time);
client.update_introspection_monitor((**introspection_monitor).clone());
monitor.display(event.name().to_string(), client_id);
Ok(BrokerEventResult::Handled)
}
Event::Objective { objective_size } => {
let client = monitor.client_stats_mut_for(client_id);
client.update_objective_size(*objective_size as u64);
monitor.display(event.name().to_string(), client_id);
Ok(BrokerEventResult::Handled)
}
Event::Log {
severity_level,
message,
phantom: _,
} => {
let (_, _) = (severity_level, message);
#[cfg(feature = "std")]
println!("[LOG {severity_level}]: {message}");
Ok(BrokerEventResult::Handled)
}
Event::CustomBuf { .. } => Ok(BrokerEventResult::Forward),
}
}
}
pub struct LlmpEventManager<S, SP>
where
S: UsesInput,
SP: ShMemProvider + 'static,
{
llmp: LlmpClient<SP>,
custom_buf_handlers: Vec<Box<CustomBufHandlerFn<S>>>,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor,
configuration: EventConfig,
phantom: PhantomData<S>,
}
impl<S, SP> core::fmt::Debug for LlmpEventManager<S, SP>
where
SP: ShMemProvider + 'static,
S: UsesInput,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut debug_struct = f.debug_struct("LlmpEventManager");
let debug = debug_struct.field("llmp", &self.llmp);
#[cfg(feature = "llmp_compression")]
let debug = debug.field("compressor", &self.compressor);
debug
.field("configuration", &self.configuration)
.field("phantom", &self.phantom)
.finish_non_exhaustive()
}
}
impl<S, SP> Drop for LlmpEventManager<S, SP>
where
SP: ShMemProvider + 'static,
S: UsesInput,
{
fn drop(&mut self) {
self.await_restart_safe();
}
}
impl<S, SP> LlmpEventManager<S, SP>
where
S: UsesInput + HasExecutions + HasClientPerfMonitor,
SP: ShMemProvider + 'static,
{
pub fn new(llmp: LlmpClient<SP>, configuration: EventConfig) -> Result<Self, Error> {
Ok(Self {
llmp,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
configuration,
phantom: PhantomData,
custom_buf_handlers: vec![],
})
}
#[cfg(feature = "std")]
pub fn new_on_port(
shmem_provider: SP,
port: u16,
configuration: EventConfig,
) -> Result<Self, Error> {
Ok(Self {
llmp: LlmpClient::create_attach_to_tcp(shmem_provider, port)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
configuration,
phantom: PhantomData,
custom_buf_handlers: vec![],
})
}
#[cfg(feature = "std")]
pub fn existing_client_from_env(
shmem_provider: SP,
env_name: &str,
configuration: EventConfig,
) -> Result<Self, Error> {
Ok(Self {
llmp: LlmpClient::on_existing_from_env(shmem_provider, env_name)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
configuration,
phantom: PhantomData,
custom_buf_handlers: vec![],
})
}
pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
self.llmp.describe()
}
pub fn existing_client_from_description(
shmem_provider: SP,
description: &LlmpClientDescription,
configuration: EventConfig,
) -> Result<Self, Error> {
Ok(Self {
llmp: LlmpClient::existing_client_from_description(shmem_provider, description)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
configuration,
phantom: PhantomData,
custom_buf_handlers: vec![],
})
}
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) {
self.llmp.to_env(env_name).unwrap();
}
#[allow(clippy::unused_self)]
fn handle_in_client<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
_client_id: u32,
event: Event<S::Input>,
) -> Result<(), Error>
where
E: Executor<Self, Z> + HasObservers<State = S>,
for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<E::Observers, State = S> + EvaluatorObservers<E::Observers>,
{
match event {
Event::NewTestcase {
input,
client_config,
exit_kind,
corpus_size: _,
observers_buf,
time: _,
executions: _,
} => {
#[cfg(feature = "std")]
println!("Received new Testcase from {_client_id} ({client_config:?})");
let _res = if client_config.match_with(&self.configuration)
&& observers_buf.is_some()
{
let observers: E::Observers =
postcard::from_bytes(observers_buf.as_ref().unwrap())?;
fuzzer.process_execution(state, self, input, &observers, &exit_kind, false)?
} else {
fuzzer.evaluate_input_with_observers::<E, Self>(
state, executor, self, input, false,
)?
};
#[cfg(feature = "std")]
if let Some(item) = _res.1 {
println!("Added received Testcase as item #{item}");
}
Ok(())
}
Event::CustomBuf { tag, buf } => {
for handler in &mut self.custom_buf_handlers {
if handler(state, &tag, &buf)? == CustomBufEventResult::Handled {
break;
}
}
Ok(())
}
_ => Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
))),
}
}
}
impl<S, SP> UsesState for LlmpEventManager<S, SP>
where
S: UsesInput,
SP: ShMemProvider,
{
type State = S;
}
impl<S, SP> EventFirer for LlmpEventManager<S, SP>
where
S: UsesInput,
SP: ShMemProvider,
{
#[cfg(feature = "llmp_compression")]
fn fire(
&mut self,
_state: &mut Self::State,
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?;
let flags = LLMP_FLAG_INITIALIZED;
match self.compressor.compress(&serialized)? {
Some(comp_buf) => {
self.llmp.send_buf_with_flags(
LLMP_TAG_EVENT_TO_BOTH,
flags | LLMP_FLAG_COMPRESSED,
&comp_buf,
)?;
}
None => {
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
}
}
Ok(())
}
#[cfg(not(feature = "llmp_compression"))]
fn fire(
&mut self,
_state: &mut Self::State,
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
let serialized = postcard::to_allocvec(&event)?;
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
Ok(())
}
fn configuration(&self) -> EventConfig {
self.configuration
}
}
impl<S, SP> EventRestarter for LlmpEventManager<S, SP>
where
S: UsesInput,
SP: ShMemProvider,
{
fn await_restart_safe(&mut self) {
self.llmp.await_safe_to_unmap_blocking();
}
}
impl<E, S, SP, Z> EventProcessor<E, Z> for LlmpEventManager<S, SP>
where
S: UsesInput + HasClientPerfMonitor + HasExecutions,
SP: ShMemProvider,
E: HasObservers<State = S> + Executor<Self, Z>,
for<'a> E::Observers: Deserialize<'a>,
Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers, State = S>,
{
fn process(
&mut self,
fuzzer: &mut Z,
state: &mut Self::State,
executor: &mut E,
) -> Result<usize, Error> {
let self_id = self.llmp.sender.id;
let mut count = 0;
while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? {
assert!(
tag != _LLMP_TAG_EVENT_TO_BROKER,
"EVENT_TO_BROKER parcel should not have arrived in the client!"
);
if client_id == self_id {
continue;
}
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = self.compressor.decompress(msg)?;
&compressed
} else {
msg
};
let event: Event<S::Input> = postcard::from_bytes(event_bytes)?;
self.handle_in_client(fuzzer, executor, state, client_id, event)?;
count += 1;
}
Ok(count)
}
}
impl<E, S, SP, Z> EventManager<E, Z> for LlmpEventManager<S, SP>
where
E: HasObservers<State = S> + Executor<Self, Z>,
for<'a> E::Observers: Deserialize<'a>,
S: UsesInput + HasExecutions + HasClientPerfMonitor + HasMetadata,
SP: ShMemProvider,
Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers, State = S>,
{
}
impl<S, SP> HasCustomBufHandlers for LlmpEventManager<S, SP>
where
S: UsesInput,
SP: ShMemProvider,
{
fn add_custom_buf_handler(
&mut self,
handler: Box<dyn FnMut(&mut S, &String, &[u8]) -> Result<CustomBufEventResult, Error>>,
) {
self.custom_buf_handlers.push(handler);
}
}
impl<S, SP> ProgressReporter for LlmpEventManager<S, SP>
where
S: UsesInput + HasExecutions + HasClientPerfMonitor + HasMetadata,
SP: ShMemProvider,
{
}
impl<S, SP> HasEventManagerId for LlmpEventManager<S, SP>
where
S: UsesInput,
SP: ShMemProvider,
{
fn mgr_id(&self) -> EventManagerId {
EventManagerId {
id: self.llmp.sender.id as usize,
}
}
}
#[cfg(feature = "std")]
#[derive(Debug)]
pub struct LlmpRestartingEventManager<S, SP>
where
S: UsesInput,
SP: ShMemProvider + 'static,
{
llmp_mgr: LlmpEventManager<S, SP>,
staterestorer: StateRestorer<SP>,
}
#[cfg(feature = "std")]
impl<S, SP> UsesState for LlmpRestartingEventManager<S, SP>
where
S: UsesInput,
SP: ShMemProvider + 'static,
{
type State = S;
}
#[cfg(feature = "std")]
impl<S, SP> ProgressReporter for LlmpRestartingEventManager<S, SP>
where
S: UsesInput + HasExecutions + HasClientPerfMonitor + HasMetadata + Serialize,
SP: ShMemProvider,
{
}
#[cfg(feature = "std")]
impl<S, SP> EventFirer for LlmpRestartingEventManager<S, SP>
where
SP: ShMemProvider,
S: UsesInput,
{
fn fire(
&mut self,
state: &mut Self::State,
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
self.llmp_mgr.fire(state, event)
}
fn configuration(&self) -> EventConfig {
self.llmp_mgr.configuration()
}
}
#[cfg(feature = "std")]
impl<S, SP> EventRestarter for LlmpRestartingEventManager<S, SP>
where
S: UsesInput + HasExecutions + HasClientPerfMonitor + Serialize,
SP: ShMemProvider,
{
#[inline]
fn await_restart_safe(&mut self) {
self.llmp_mgr.await_restart_safe();
}
fn on_restart(&mut self, state: &mut S) -> Result<(), Error> {
self.staterestorer.reset();
self.staterestorer
.save(&(state, &self.llmp_mgr.describe()?))
}
}
#[cfg(feature = "std")]
impl<E, S, SP, Z> EventProcessor<E, Z> for LlmpRestartingEventManager<S, SP>
where
E: HasObservers<State = S> + Executor<LlmpEventManager<S, SP>, Z>,
for<'a> E::Observers: Deserialize<'a>,
S: UsesInput + HasExecutions + HasClientPerfMonitor,
SP: ShMemProvider + 'static,
Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers>, {
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> {
self.llmp_mgr.process(fuzzer, state, executor)
}
}
#[cfg(feature = "std")]
impl<E, S, SP, Z> EventManager<E, Z> for LlmpRestartingEventManager<S, SP>
where
E: HasObservers<State = S> + Executor<LlmpEventManager<S, SP>, Z>,
for<'a> E::Observers: Deserialize<'a>,
S: UsesInput + HasExecutions + HasClientPerfMonitor + HasMetadata + Serialize,
SP: ShMemProvider + 'static,
Z: EvaluatorObservers<E::Observers, State = S> + ExecutionProcessor<E::Observers>, {
}
#[cfg(feature = "std")]
impl<S, SP> HasEventManagerId for LlmpRestartingEventManager<S, SP>
where
S: UsesInput + Serialize,
SP: ShMemProvider + 'static,
{
fn mgr_id(&self) -> EventManagerId {
self.llmp_mgr.mgr_id()
}
}
const _ENV_FUZZER_SENDER: &str = "_AFL_ENV_FUZZER_SENDER";
const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER";
const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT";
#[cfg(feature = "std")]
impl<S, SP> LlmpRestartingEventManager<S, SP>
where
S: UsesInput,
SP: ShMemProvider + 'static,
{
pub fn new(llmp_mgr: LlmpEventManager<S, SP>, staterestorer: StateRestorer<SP>) -> Self {
Self {
llmp_mgr,
staterestorer,
}
}
pub fn staterestorer(&self) -> &StateRestorer<SP> {
&self.staterestorer
}
pub fn staterestorer_mut(&mut self) -> &mut StateRestorer<SP> {
&mut self.staterestorer
}
}
#[cfg(feature = "std")]
#[derive(Debug, Clone, Copy)]
pub enum ManagerKind {
Any,
Client {
cpu_core: Option<CoreId>,
},
Broker,
}
#[cfg(feature = "std")]
#[allow(clippy::type_complexity)]
pub fn setup_restarting_mgr_std<MT, S>(
monitor: MT,
broker_port: u16,
configuration: EventConfig,
) -> Result<(Option<S>, LlmpRestartingEventManager<S, StdShMemProvider>), Error>
where
MT: Monitor + Clone,
S: DeserializeOwned + UsesInput + HasClientPerfMonitor + HasExecutions,
{
RestartingMgr::builder()
.shmem_provider(StdShMemProvider::new()?)
.monitor(Some(monitor))
.broker_port(broker_port)
.configuration(configuration)
.build()
.launch()
}
#[cfg(feature = "std")]
#[allow(clippy::default_trait_access)]
#[derive(TypedBuilder, Debug)]
pub struct RestartingMgr<MT, S, SP>
where
S: UsesInput + DeserializeOwned,
SP: ShMemProvider + 'static,
MT: Monitor,
{
shmem_provider: SP,
configuration: EventConfig,
#[builder(default = None)]
monitor: Option<MT>,
#[builder(default = 1337_u16)]
broker_port: u16,
#[builder(default = None)]
remote_broker_addr: Option<SocketAddr>,
#[builder(default = ManagerKind::Any)]
kind: ManagerKind,
#[builder(setter(skip), default = PhantomData)]
phantom_data: PhantomData<S>,
}
#[cfg(feature = "std")]
#[allow(clippy::type_complexity, clippy::too_many_lines)]
impl<MT, S, SP> RestartingMgr<MT, S, SP>
where
SP: ShMemProvider,
S: UsesInput + HasExecutions + HasClientPerfMonitor + DeserializeOwned,
MT: Monitor + Clone,
{
pub fn launch(&mut self) -> Result<(Option<S>, LlmpRestartingEventManager<S, SP>), Error> {
let (staterestorer, new_shmem_provider, core_id) = if std::env::var(_ENV_FUZZER_SENDER)
.is_err()
{
let broker_things = |mut broker: LlmpEventBroker<S::Input, MT, SP>,
remote_broker_addr| {
if let Some(remote_broker_addr) = remote_broker_addr {
println!("B2b: Connecting to {:?}", &remote_broker_addr);
broker.connect_b2b(remote_broker_addr)?;
};
broker.broker_loop()
};
let (mgr, core_id) = match self.kind {
ManagerKind::Any => {
let connection =
LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?;
match connection {
LlmpConnection::IsBroker { broker } => {
let event_broker = LlmpEventBroker::<S::Input, MT, SP>::new(
broker,
self.monitor.take().unwrap(),
)?;
println!(
"Doing broker things. Run this tool again to start fuzzing in a client."
);
broker_things(event_broker, self.remote_broker_addr)?;
return Err(Error::shutting_down());
}
LlmpConnection::IsClient { client } => {
let mgr = LlmpEventManager::<S, SP>::new(client, self.configuration)?;
(mgr, None)
}
}
}
ManagerKind::Broker => {
let event_broker = LlmpEventBroker::<S::Input, MT, SP>::new_on_port(
self.shmem_provider.clone(),
self.monitor.take().unwrap(),
self.broker_port,
)?;
broker_things(event_broker, self.remote_broker_addr)?;
return Err(Error::shutting_down());
}
ManagerKind::Client { cpu_core } => {
let mgr = LlmpEventManager::<S, SP>::new_on_port(
self.shmem_provider.clone(),
self.broker_port,
self.configuration,
)?;
(mgr, cpu_core)
}
};
if let Some(core_id) = core_id {
let core_id: CoreId = core_id;
println!("Setting core affinity to {core_id:?}");
core_id.set_affinity()?;
}
mgr.to_env(_ENV_FUZZER_BROKER_CLIENT_INITIAL);
#[cfg(unix)]
let mut staterestorer: StateRestorer<SP> =
StateRestorer::new(self.shmem_provider.new_shmem(256 * 1024 * 1024)?);
#[cfg(not(unix))]
let staterestorer: StateRestorer<SP> =
StateRestorer::new(self.shmem_provider.new_shmem(256 * 1024 * 1024)?);
staterestorer.write_to_env(_ENV_FUZZER_SENDER)?;
#[cfg(unix)]
unsafe {
let data = &mut SHUTDOWN_SIGHANDLER_DATA;
core::ptr::write_volatile(
&mut data.staterestorer_ptr,
&mut staterestorer as *mut _ as *mut std::ffi::c_void,
);
data.allocator_pid = std::process::id() as usize;
data.shutdown_handler = shutdown_handler::<SP> as *const std::ffi::c_void;
}
#[cfg(unix)]
if let Err(_e) = unsafe { setup_signal_handler(&mut SHUTDOWN_SIGHANDLER_DATA) } {
#[cfg(feature = "std")]
println!("Failed to setup signal handlers: {_e}");
}
let mut ctr: u64 = 0;
loop {
println!("Spawning next client (id {ctr})");
#[cfg(all(unix, feature = "fork"))]
let child_status = {
self.shmem_provider.pre_fork()?;
match unsafe { fork() }? {
ForkResult::Parent(handle) => {
self.shmem_provider.post_fork(false)?;
handle.status()
}
ForkResult::Child => {
self.shmem_provider.post_fork(true)?;
break (staterestorer, self.shmem_provider.clone(), core_id);
}
}
};
#[cfg(any(windows, not(feature = "fork")))]
let child_status = startable_self()?.status()?;
#[cfg(all(unix, not(feature = "fork")))]
let child_status = child_status.code().unwrap_or_default();
compiler_fence(Ordering::SeqCst);
#[allow(clippy::manual_assert)]
if !staterestorer.has_content() {
#[cfg(unix)]
if child_status == 137 {
panic!("Fuzzer-respawner: The fuzzed target crashed with an out of memory error! Fix your harness, or switch to another executor (for example, a forkserver).");
}
panic!("Fuzzer-respawner: Storing state in crashed fuzzer instance did not work, no point to spawn the next client! This can happen if the child calls `exit()`, in that case make sure it uses `abort()`, if it got killed unrecoverable (OOM), or if there is a bug in the fuzzer itself. (Child exited with: {child_status})");
}
ctr = ctr.wrapping_add(1);
}
} else {
(
StateRestorer::from_env(&mut self.shmem_provider, _ENV_FUZZER_SENDER)?,
self.shmem_provider.clone(),
None,
)
};
if let Some(core_id) = core_id {
let core_id: CoreId = core_id;
core_id.set_affinity()?;
}
let (state, mut mgr) = if let Some((state, mgr_description)) = staterestorer.restore()? {
(
Some(state),
LlmpRestartingEventManager::new(
LlmpEventManager::existing_client_from_description(
new_shmem_provider,
&mgr_description,
self.configuration,
)?,
staterestorer,
),
)
} else {
println!("First run. Let's set it all up");
let mgr = LlmpEventManager::<S, SP>::existing_client_from_env(
new_shmem_provider,
_ENV_FUZZER_BROKER_CLIENT_INITIAL,
self.configuration,
)?;
(None, LlmpRestartingEventManager::new(mgr, staterestorer))
};
mgr.staterestorer.reset();
Ok((state, mgr))
}
}
pub struct LlmpEventConverter<IC, ICB, DI, S, SP>
where
S: UsesInput,
SP: ShMemProvider + 'static,
IC: InputConverter<From = S::Input, To = DI>,
ICB: InputConverter<From = DI, To = S::Input>,
DI: Input,
{
llmp: LlmpClient<SP>,
custom_buf_handlers: Vec<Box<CustomBufHandlerFn<S>>>,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor,
converter: Option<IC>,
converter_back: Option<ICB>,
phantom: PhantomData<S>,
}
impl<IC, ICB, DI, S, SP> core::fmt::Debug for LlmpEventConverter<IC, ICB, DI, S, SP>
where
SP: ShMemProvider + 'static,
S: UsesInput,
IC: InputConverter<From = S::Input, To = DI>,
ICB: InputConverter<From = DI, To = S::Input>,
DI: Input,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
let mut debug_struct = f.debug_struct("LlmpEventConverter");
let debug = debug_struct.field("llmp", &self.llmp);
#[cfg(feature = "llmp_compression")]
let debug = debug.field("compressor", &self.compressor);
debug
.field("converter", &self.converter)
.field("converter_back", &self.converter_back)
.field("phantom", &self.phantom)
.finish_non_exhaustive()
}
}
impl<IC, ICB, DI, S, SP> LlmpEventConverter<IC, ICB, DI, S, SP>
where
S: UsesInput + HasExecutions + HasClientPerfMonitor,
SP: ShMemProvider + 'static,
IC: InputConverter<From = S::Input, To = DI>,
ICB: InputConverter<From = DI, To = S::Input>,
DI: Input,
{
pub fn new(
llmp: LlmpClient<SP>,
converter: Option<IC>,
converter_back: Option<ICB>,
) -> Result<Self, Error> {
Ok(Self {
llmp,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
converter,
converter_back,
phantom: PhantomData,
custom_buf_handlers: vec![],
})
}
#[cfg(feature = "std")]
pub fn new_on_port(
shmem_provider: SP,
port: u16,
converter: Option<IC>,
converter_back: Option<ICB>,
) -> Result<Self, Error> {
Ok(Self {
llmp: LlmpClient::create_attach_to_tcp(shmem_provider, port)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
converter,
converter_back,
phantom: PhantomData,
custom_buf_handlers: vec![],
})
}
#[cfg(feature = "std")]
pub fn existing_client_from_env(
shmem_provider: SP,
env_name: &str,
converter: Option<IC>,
converter_back: Option<ICB>,
) -> Result<Self, Error> {
Ok(Self {
llmp: LlmpClient::on_existing_from_env(shmem_provider, env_name)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData,
converter,
converter_back,
custom_buf_handlers: vec![],
})
}
pub fn can_convert(&self) -> bool {
self.converter.is_some()
}
pub fn can_convert_back(&self) -> bool {
self.converter_back.is_some()
}
pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
self.llmp.describe()
}
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) {
self.llmp.to_env(env_name).unwrap();
}
fn handle_in_client<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
manager: &mut EM,
_client_id: u32,
event: Event<DI>,
) -> Result<(), Error>
where
E: Executor<EM, Z> + HasObservers<State = S>,
EM: UsesState<State = S> + EventFirer,
for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<E::Observers, State = S> + EvaluatorObservers<E::Observers>,
{
match event {
Event::NewTestcase {
input,
client_config: _,
exit_kind: _,
corpus_size: _,
observers_buf: _, time: _,
executions: _,
} => {
#[cfg(feature = "std")]
println!("Received new Testcase to convert from {_client_id}");
let Some(converter) = self.converter_back.as_mut() else {
return Ok(());
};
let _res = fuzzer.evaluate_input_with_observers::<E, EM>(
state,
executor,
manager,
converter.convert(input)?,
false,
)?;
#[cfg(feature = "std")]
if let Some(item) = _res.1 {
println!("Added received Testcase as item #{item}");
}
Ok(())
}
Event::CustomBuf { tag, buf } => {
for handler in &mut self.custom_buf_handlers {
if handler(state, &tag, &buf)? == CustomBufEventResult::Handled {
break;
}
}
Ok(())
}
_ => Err(Error::unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
))),
}
}
#[allow(clippy::unused_self)]
pub fn process<E, EM, Z>(
&mut self,
fuzzer: &mut Z,
state: &mut S,
executor: &mut E,
manager: &mut EM,
) -> Result<usize, Error>
where
E: Executor<EM, Z> + HasObservers<State = S>,
EM: UsesState<State = S> + EventFirer,
for<'a> E::Observers: Deserialize<'a>,
Z: ExecutionProcessor<E::Observers, State = S> + EvaluatorObservers<E::Observers>,
{
let self_id = self.llmp.sender.id;
let mut count = 0;
while let Some((client_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? {
assert!(
tag != _LLMP_TAG_EVENT_TO_BROKER,
"EVENT_TO_BROKER parcel should not have arrived in the client!"
);
if client_id == self_id {
continue;
}
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = self.compressor.decompress(msg)?;
&compressed
} else {
msg
};
let event: Event<DI> = postcard::from_bytes(event_bytes)?;
self.handle_in_client(fuzzer, executor, state, manager, client_id, event)?;
count += 1;
}
Ok(count)
}
}
impl<IC, ICB, DI, S, SP> UsesState for LlmpEventConverter<IC, ICB, DI, S, SP>
where
S: UsesInput,
SP: ShMemProvider,
IC: InputConverter<From = S::Input, To = DI>,
ICB: InputConverter<From = DI, To = S::Input>,
DI: Input,
{
type State = S;
}
impl<IC, ICB, DI, S, SP> EventFirer for LlmpEventConverter<IC, ICB, DI, S, SP>
where
S: UsesInput,
SP: ShMemProvider,
IC: InputConverter<From = S::Input, To = DI>,
ICB: InputConverter<From = DI, To = S::Input>,
DI: Input,
{
#[cfg(feature = "llmp_compression")]
fn fire(
&mut self,
_state: &mut Self::State,
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
if self.converter.is_none() {
return Ok(());
}
let converted_event = match event {
Event::NewTestcase {
input,
client_config,
exit_kind,
corpus_size,
observers_buf,
time,
executions,
} => Event::NewTestcase {
input: self.converter.as_mut().unwrap().convert(input)?,
client_config,
exit_kind,
corpus_size,
observers_buf,
time,
executions,
},
Event::CustomBuf { buf, tag } => Event::CustomBuf { buf, tag },
_ => {
return Ok(());
}
};
let serialized = postcard::to_allocvec(&converted_event)?;
let flags = LLMP_FLAG_INITIALIZED;
match self.compressor.compress(&serialized)? {
Some(comp_buf) => {
self.llmp.send_buf_with_flags(
LLMP_TAG_EVENT_TO_BOTH,
flags | LLMP_FLAG_COMPRESSED,
&comp_buf,
)?;
}
None => {
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
}
}
Ok(())
}
#[cfg(not(feature = "llmp_compression"))]
fn fire(
&mut self,
_state: &mut Self::State,
event: Event<<Self::State as UsesInput>::Input>,
) -> Result<(), Error> {
if self.converter.is_none() {
return Ok(());
}
let converted_event = match event {
Event::NewTestcase {
input,
client_config,
exit_kind,
corpus_size,
observers_buf,
time,
executions,
} => Event::NewTestcase {
input: self.converter.as_mut().unwrap().convert(input)?,
client_config,
exit_kind,
corpus_size,
observers_buf,
time,
executions,
},
Event::CustomBuf { buf, tag } => Event::CustomBuf { buf, tag },
_ => {
return Ok(());
}
};
let serialized = postcard::to_allocvec(&converted_event)?;
self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &serialized)?;
Ok(())
}
}
#[cfg(test)]
#[cfg(feature = "std")]
mod tests {
use core::sync::atomic::{compiler_fence, Ordering};
use serial_test::serial;
use crate::{
bolts::{
llmp::{LlmpClient, LlmpSharedMap},
rands::StdRand,
shmem::{ShMemProvider, StdShMemProvider},
staterestore::StateRestorer,
tuples::tuple_list,
},
corpus::{Corpus, InMemoryCorpus, Testcase},
events::{llmp::_ENV_FUZZER_SENDER, LlmpEventManager},
executors::{ExitKind, InProcessExecutor},
feedbacks::ConstFeedback,
fuzzer::Fuzzer,
inputs::BytesInput,
mutators::BitFlipMutator,
schedulers::RandScheduler,
stages::StdMutationalStage,
state::StdState,
StdFuzzer,
};
#[test]
#[serial]
fn test_mgr_state_restore() {
let rand = StdRand::with_seed(0);
let mut corpus = InMemoryCorpus::<BytesInput>::new();
let testcase = Testcase::new(vec![0; 4].into());
corpus.add(testcase).unwrap();
let solutions = InMemoryCorpus::<BytesInput>::new();
let mut feedback = ConstFeedback::new(false);
let mut objective = ConstFeedback::new(false);
let mut state =
StdState::new(rand, corpus, solutions, &mut feedback, &mut objective).unwrap();
let mut shmem_provider = StdShMemProvider::new().unwrap();
let mut llmp_client = LlmpClient::new(
shmem_provider.clone(),
LlmpSharedMap::new(0, shmem_provider.new_shmem(1024).unwrap()),
0,
)
.unwrap();
unsafe {
llmp_client.mark_safe_to_unmap();
}
let mut llmp_mgr = LlmpEventManager::new(llmp_client, "fuzzer".into()).unwrap();
let scheduler = RandScheduler::new();
let feedback = ConstFeedback::new(true);
let objective = ConstFeedback::new(false);
let mut fuzzer = StdFuzzer::new(scheduler, feedback, objective);
let mut harness = |_buf: &BytesInput| ExitKind::Ok;
let mut executor = InProcessExecutor::new(
&mut harness,
tuple_list!(),
&mut fuzzer,
&mut state,
&mut llmp_mgr,
)
.unwrap();
let mutator = BitFlipMutator::new();
let mut stages = tuple_list!(StdMutationalStage::new(mutator));
let mut staterestorer = StateRestorer::<StdShMemProvider>::new(
shmem_provider.new_shmem(256 * 1024 * 1024).unwrap(),
);
staterestorer.reset();
staterestorer
.save(&(&mut state, &llmp_mgr.describe().unwrap()))
.unwrap();
assert!(staterestorer.has_content());
staterestorer.write_to_env(_ENV_FUZZER_SENDER).unwrap();
compiler_fence(Ordering::SeqCst);
let sc_cpy = StateRestorer::from_env(&mut shmem_provider, _ENV_FUZZER_SENDER).unwrap();
assert!(sc_cpy.has_content());
let (mut state_clone, mgr_description) = staterestorer.restore().unwrap().unwrap();
let mut llmp_clone = LlmpEventManager::existing_client_from_description(
shmem_provider,
&mgr_description,
"fuzzer".into(),
)
.unwrap();
if false {
fuzzer
.fuzz_one(
&mut stages,
&mut executor,
&mut state_clone,
&mut llmp_clone,
)
.unwrap();
}
}
}