use alloc::{
string::{String, ToString},
vec::Vec,
};
use core::{marker::PhantomData, time::Duration};
use core_affinity::CoreId;
use serde::{de::DeserializeOwned, Serialize};
#[cfg(feature = "std")]
use core::ptr::{addr_of, read_volatile};
#[cfg(feature = "std")]
use crate::bolts::{
llmp::{LlmpClient, LlmpConnection, LlmpReceiver},
shmem::StdShMemProvider,
};
#[cfg(feature = "std")]
use std::net::{SocketAddr, ToSocketAddrs};
use crate::{
bolts::{
llmp::{self, Flags, LlmpClientDescription, LlmpSender, Tag},
shmem::ShMemProvider,
},
events::{
BrokerEventResult, Event, EventFirer, EventManager, EventManagerId, EventProcessor,
EventRestarter, HasEventManagerId,
},
executors::{Executor, HasObservers},
fuzzer::{EvaluatorObservers, ExecutionProcessor},
inputs::Input,
observers::ObserversTuple,
stats::Stats,
Error,
};
#[cfg(feature = "llmp_compression")]
use crate::bolts::{
compress::GzipCompressor,
llmp::{LLMP_FLAG_COMPRESSED, LLMP_FLAG_INITIALIZED},
};
#[cfg(all(feature = "std", windows))]
use crate::bolts::os::startable_self;
#[cfg(all(feature = "std", unix))]
use crate::bolts::os::{fork, ForkResult};
#[cfg(all(target_os = "android", feature = "std"))]
use crate::bolts::os::ashmem_server::AshmemService;
#[cfg(feature = "std")]
use typed_builder::TypedBuilder;
// LLMP message tags used by the event managers in this file.
// NOTE(review): several underscore-prefixed tags are not referenced in the visible code; the
// descriptions marked "presumably" are inferred from the names only — confirm elsewhere.
/// Presumably the tag for events addressed to clients only (unused in the visible code).
const _LLMP_TAG_EVENT_TO_CLIENT: llmp::Tag = 0x2C11E471;
/// Tag that must never reach a client: `LlmpEventManager::process` panics if it receives it.
const _LLMP_TAG_EVENT_TO_BROKER: llmp::Tag = 0x2B80438;
/// Tag attached to every event sent by `fire`; the broker hook decodes messages carrying it.
const LLMP_TAG_EVENT_TO_BOTH: llmp::Tag = 0x2B0741;
/// Tag used by `on_restart` when sending the serialized state over the restart channel.
const _LLMP_TAG_RESTART: llmp::Tag = 0x8357A87;
/// Presumably marks "do not restart" messages (unused in the visible code).
const _LLMP_TAG_NO_RESTART: llmp::Tag = 0x57A7EE71;
/// Value handed to every `GzipCompressor::new` call in this file.
/// NOTE(review): presumably the minimum buffer size (in bytes) worth compressing — confirm in
/// `bolts::compress`.
#[cfg(feature = "llmp_compression")]
const COMPRESS_THRESHOLD: usize = 1024;
/// Broker-side event handler: wraps an [`llmp::LlmpBroker`] and a [`Stats`] instance that is
/// updated from the events seen in `broker_loop`.
#[derive(Debug)]
pub struct LlmpEventBroker<I, SP, ST>
where
I: Input,
SP: ShMemProvider + 'static,
ST: Stats,
{
/// Statistics sink updated and displayed by `handle_in_broker`.
stats: ST,
/// The underlying LLMP broker that runs the message loop.
llmp: llmp::LlmpBroker<SP>,
/// Decompresses messages whose flags contain `LLMP_FLAG_COMPRESSED`.
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor,
/// Marker for the input type `I` of the events this broker deserializes.
phantom: PhantomData<I>,
}
impl<I, SP, ST> LlmpEventBroker<I, SP, ST>
where
I: Input,
SP: ShMemProvider + 'static,
ST: Stats,
{
pub fn new(llmp: llmp::LlmpBroker<SP>, stats: ST) -> Result<Self, Error> {
Ok(Self {
stats,
llmp,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData,
})
}
#[cfg(feature = "std")]
pub fn new_on_port(shmem_provider: SP, stats: ST, port: u16) -> Result<Self, Error> {
Ok(Self {
stats,
llmp: llmp::LlmpBroker::create_attach_to_tcp(shmem_provider, port)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
phantom: PhantomData,
})
}
#[cfg(feature = "std")]
pub fn connect_b2b<A>(&mut self, addr: A) -> Result<(), Error>
where
A: ToSocketAddrs,
{
self.llmp.connect_b2b(addr)
}
pub fn broker_loop(&mut self) -> Result<(), Error> {
let stats = &mut self.stats;
#[cfg(feature = "llmp_compression")]
let compressor = &self.compressor;
self.llmp.loop_forever(
&mut |sender_id: u32, tag: Tag, _flags: Flags, msg: &[u8]| {
if tag == LLMP_TAG_EVENT_TO_BOTH {
#[cfg(not(feature = "llmp_compression"))]
let event_bytes = msg;
#[cfg(feature = "llmp_compression")]
let compressed;
#[cfg(feature = "llmp_compression")]
let event_bytes = if _flags & LLMP_FLAG_COMPRESSED == LLMP_FLAG_COMPRESSED {
compressed = compressor.decompress(msg)?;
&compressed
} else {
msg
};
let event: Event<I> = postcard::from_bytes(event_bytes)?;
match Self::handle_in_broker(stats, sender_id, &event)? {
BrokerEventResult::Forward => Ok(llmp::LlmpMsgHookResult::ForwardToClients),
BrokerEventResult::Handled => Ok(llmp::LlmpMsgHookResult::Handled),
}
} else {
Ok(llmp::LlmpMsgHookResult::ForwardToClients)
}
},
Some(Duration::from_millis(5)),
);
Ok(())
}
#[allow(clippy::unnecessary_wraps)]
fn handle_in_broker(
stats: &mut ST,
sender_id: u32,
event: &Event<I>,
) -> Result<BrokerEventResult, Error> {
match &event {
Event::NewTestcase {
input: _,
client_config: _,
exit_kind: _,
corpus_size,
observers_buf: _,
time,
executions,
} => {
let client = stats.client_stats_mut_for(sender_id);
client.update_corpus_size(*corpus_size as u64);
client.update_executions(*executions as u64, *time);
stats.display(event.name().to_string(), sender_id);
Ok(BrokerEventResult::Forward)
}
Event::UpdateStats {
time,
executions,
phantom: _,
} => {
let client = stats.client_stats_mut_for(sender_id);
client.update_executions(*executions as u64, *time);
stats.display(event.name().to_string(), sender_id);
Ok(BrokerEventResult::Handled)
}
Event::UpdateUserStats {
name,
value,
phantom: _,
} => {
let client = stats.client_stats_mut_for(sender_id);
client.update_user_stats(name.clone(), value.clone());
stats.display(event.name().to_string(), sender_id);
Ok(BrokerEventResult::Handled)
}
#[cfg(feature = "introspection")]
Event::UpdatePerfStats {
time,
executions,
introspection_stats,
phantom: _,
} => {
let client = stats.client_stats_mut_for(sender_id);
client.update_executions(*executions as u64, *time);
client.update_introspection_stats((**introspection_stats).clone());
stats.display(event.name().to_string(), sender_id);
Ok(BrokerEventResult::Handled)
}
Event::Objective { objective_size } => {
let client = stats.client_stats_mut_for(sender_id);
client.update_objective_size(*objective_size as u64);
stats.display(event.name().to_string(), sender_id);
Ok(BrokerEventResult::Handled)
}
Event::Log {
severity_level,
message,
phantom: _,
} => {
let (_, _) = (severity_level, message);
#[cfg(feature = "std")]
println!("[LOG {}]: {}", severity_level, message);
Ok(BrokerEventResult::Handled)
} }
}
}
/// Client-side event manager that sends and receives [`Event`]s over an [`llmp::LlmpClient`].
#[derive(Debug)]
pub struct LlmpEventManager<I, OT, S, SP>
where
I: Input,
OT: ObserversTuple<I, S>,
SP: ShMemProvider + 'static,
{
/// The LLMP client used to send and receive serialized events.
llmp: llmp::LlmpClient<SP>,
/// Compresses outgoing events in `fire` and decompresses incoming ones in `process`.
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor,
/// Configuration string; `handle_in_client` compares it against the `client_config` of a
/// received testcase to decide whether the sender's observers can be reused.
configuration: String,
/// Marker for the otherwise unused type parameters.
phantom: PhantomData<(I, OT, S)>,
}
/// On drop, calls `await_restart_safe`, which in this file forwards to
/// `LlmpClient::await_save_to_unmap_blocking`.
impl<I, OT, S, SP> Drop for LlmpEventManager<I, OT, S, SP>
where
I: Input,
OT: ObserversTuple<I, S>,
SP: ShMemProvider + 'static,
{
fn drop(&mut self) {
// NOTE(review): presumably blocks until the shared maps may be unmapped — confirm in `llmp`.
self.await_restart_safe();
}
}
impl<I, OT, S, SP> LlmpEventManager<I, OT, S, SP>
where
I: Input,
OT: ObserversTuple<I, S>,
SP: ShMemProvider + 'static,
{
pub fn new(llmp: llmp::LlmpClient<SP>, configuration: String) -> Result<Self, Error> {
Ok(Self {
llmp,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
configuration,
phantom: PhantomData,
})
}
#[cfg(feature = "std")]
pub fn new_on_port(
shmem_provider: SP,
port: u16,
configuration: String,
) -> Result<Self, Error> {
Ok(Self {
llmp: llmp::LlmpClient::create_attach_to_tcp(shmem_provider, port)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
configuration,
phantom: PhantomData,
})
}
#[cfg(feature = "std")]
pub fn existing_client_from_env(
shmem_provider: SP,
env_name: &str,
configuration: String,
) -> Result<Self, Error> {
Ok(Self {
llmp: LlmpClient::on_existing_from_env(shmem_provider, env_name)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
configuration,
phantom: PhantomData,
})
}
pub fn describe(&self) -> Result<LlmpClientDescription, Error> {
self.llmp.describe()
}
pub fn existing_client_from_description(
shmem_provider: SP,
description: &LlmpClientDescription,
configuration: String,
) -> Result<Self, Error> {
Ok(Self {
llmp: llmp::LlmpClient::existing_client_from_description(shmem_provider, description)?,
#[cfg(feature = "llmp_compression")]
compressor: GzipCompressor::new(COMPRESS_THRESHOLD),
configuration,
phantom: PhantomData,
})
}
#[cfg(feature = "std")]
pub fn to_env(&self, env_name: &str) {
self.llmp.to_env(env_name).unwrap();
}
#[allow(clippy::unused_self)]
fn handle_in_client<E, Z>(
&mut self,
fuzzer: &mut Z,
executor: &mut E,
state: &mut S,
_sender_id: u32,
event: Event<I>,
) -> Result<(), Error>
where
OT: ObserversTuple<I, S> + serde::de::DeserializeOwned,
E: Executor<Self, I, S, Z> + HasObservers<I, OT, S>,
Z: ExecutionProcessor<I, OT, S> + EvaluatorObservers<I, OT, S>,
{
match event {
Event::NewTestcase {
input,
client_config,
exit_kind,
corpus_size: _,
observers_buf,
time: _,
executions: _,
} => {
#[cfg(feature = "std")]
println!(
"Received new Testcase from {} ({}) {:?}",
_sender_id, client_config, input
);
let _res = if client_config == self.configuration {
let observers: OT = postcard::from_bytes(&observers_buf)?;
fuzzer.process_execution(state, self, input, &observers, &exit_kind, false)?
} else {
fuzzer.evaluate_input_with_observers(state, executor, self, input, false)?
};
#[cfg(feature = "std")]
if let Some(item) = _res.1 {
println!("Added received Testcase as item #{}", item);
}
Ok(())
}
_ => Err(Error::Unknown(format!(
"Received illegal message that message should not have arrived: {:?}.",
event.name()
))),
}
}
}
impl<I, OT, S, SP> EventFirer<I, S> for LlmpEventManager<I, OT, S, SP>
where
    I: Input,
    OT: ObserversTuple<I, S>,
    SP: ShMemProvider,
{
    /// Serializes `event` with postcard and sends it tagged `LLMP_TAG_EVENT_TO_BOTH`.
    ///
    /// If the compressor returns a compressed buffer, that buffer is sent with the
    /// `LLMP_FLAG_COMPRESSED` flag set; otherwise the plain serialization is sent.
    #[cfg(feature = "llmp_compression")]
    fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> {
        let payload = postcard::to_allocvec(&event)?;

        if let Some(packed) = self.compressor.compress(&payload)? {
            let flags: Flags = LLMP_FLAG_INITIALIZED | LLMP_FLAG_COMPRESSED;
            self.llmp
                .send_buf_with_flags(LLMP_TAG_EVENT_TO_BOTH, flags, &packed)?;
        } else {
            self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &payload)?;
        }
        Ok(())
    }

    /// Serializes `event` with postcard and sends it tagged `LLMP_TAG_EVENT_TO_BOTH`.
    #[cfg(not(feature = "llmp_compression"))]
    fn fire(&mut self, _state: &mut S, event: Event<I>) -> Result<(), Error> {
        let payload = postcard::to_allocvec(&event)?;
        self.llmp.send_buf(LLMP_TAG_EVENT_TO_BOTH, &payload)?;
        Ok(())
    }

    /// Returns the configuration string this manager was created with.
    fn configuration(&self) -> &str {
        self.configuration.as_str()
    }
}
impl<I, OT, S, SP> EventRestarter<S> for LlmpEventManager<I, OT, S, SP>
where
I: Input,
OT: ObserversTuple<I, S>,
SP: ShMemProvider,
{
/// Forwards to `LlmpClient::await_save_to_unmap_blocking`.
/// NOTE(review): presumably blocks until the broker has consumed our pages — confirm in `llmp`.
fn await_restart_safe(&mut self) {
self.llmp.await_save_to_unmap_blocking();
}
}
impl<E, I, OT, S, SP, Z> EventProcessor<E, I, S, Z> for LlmpEventManager<I, OT, S, SP>
where
    SP: ShMemProvider,
    E: Executor<Self, I, S, Z> + HasObservers<I, OT, S>,
    I: Input,
    OT: ObserversTuple<I, S> + serde::de::DeserializeOwned,
    Z: ExecutionProcessor<I, OT, S> + EvaluatorObservers<I, OT, S>,
{
    /// Receives all pending messages, decodes them into events and handles each one.
    ///
    /// Messages sent by this client itself are skipped. Returns the number of events
    /// that were decoded.
    ///
    /// # Panics
    /// Panics if a message tagged `_LLMP_TAG_EVENT_TO_BROKER` is received.
    fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> {
        let own_id = self.llmp.sender.id;
        // Events are buffered first because `msg` borrows from `self.llmp`, while handling
        // an event needs `&mut self`.
        let mut pending = Vec::new();

        while let Some((sender_id, tag, _flags, msg)) = self.llmp.recv_buf_with_flags()? {
            assert!(
                tag != _LLMP_TAG_EVENT_TO_BROKER,
                "EVENT_TO_BROKER parcel should not have arrived in the client!"
            );
            if sender_id == own_id {
                continue;
            }

            #[cfg(not(feature = "llmp_compression"))]
            let event_bytes = msg;
            // Declared outside the `if` so the decompressed buffer outlives the borrow.
            #[cfg(feature = "llmp_compression")]
            let compressed;
            #[cfg(feature = "llmp_compression")]
            let event_bytes = if (_flags & LLMP_FLAG_COMPRESSED) == LLMP_FLAG_COMPRESSED {
                compressed = self.compressor.decompress(msg)?;
                &compressed
            } else {
                msg
            };

            let event: Event<I> = postcard::from_bytes(event_bytes)?;
            pending.push((sender_id, event));
        }

        let count = pending.len();
        for (sender_id, event) in pending {
            self.handle_in_client(fuzzer, executor, state, sender_id, event)?;
        }
        Ok(count)
    }
}
// Empty impl: `EventManager` has no methods that need to be provided here, so this only
// declares that `LlmpEventManager` fulfils the trait under these bounds.
impl<E, I, OT, S, SP, Z> EventManager<E, I, S, Z> for LlmpEventManager<I, OT, S, SP>
where
E: Executor<Self, I, S, Z> + HasObservers<I, OT, S>,
I: Input,
OT: ObserversTuple<I, S> + serde::de::DeserializeOwned,
SP: ShMemProvider,
Z: ExecutionProcessor<I, OT, S> + EvaluatorObservers<I, OT, S>, {
}
impl<I, OT, S, SP> HasEventManagerId for LlmpEventManager<I, OT, S, SP>
where
    I: Input,
    OT: ObserversTuple<I, S> + serde::de::DeserializeOwned,
    SP: ShMemProvider,
{
    /// Returns the manager id, which is the id of the underlying LLMP sender.
    fn mgr_id(&self) -> EventManagerId {
        let id = self.llmp.sender.id as usize;
        EventManagerId { id }
    }
}
/// Serializes `state` together with the description of `mgr`'s LLMP client into one
/// postcard buffer, in that order. [`deserialize_state_mgr`] is the inverse.
///
/// # Errors
/// Propagates errors from `describe` and from postcard serialization.
pub fn serialize_state_mgr<I, OT, S, SP>(
    state: &S,
    mgr: &LlmpEventManager<I, OT, S, SP>,
) -> Result<Vec<u8>, Error>
where
    I: Input,
    OT: ObserversTuple<I, S>,
    S: Serialize,
    SP: ShMemProvider,
{
    let description = mgr.describe()?;
    let serialized = postcard::to_allocvec(&(&state, &description))?;
    Ok(serialized)
}
#[allow(clippy::type_complexity)]
pub fn deserialize_state_mgr<I, OT, S, SP>(
shmem_provider: SP,
state_corpus_serialized: &[u8],
configuration: String,
) -> Result<(S, LlmpEventManager<I, OT, S, SP>), Error>
where
I: Input,
OT: ObserversTuple<I, S>,
S: DeserializeOwned,
SP: ShMemProvider,
{
let tuple: (S, _) = postcard::from_bytes(state_corpus_serialized)?;
Ok((
tuple.0,
LlmpEventManager::existing_client_from_description(
shmem_provider,
&tuple.1,
configuration,
)?,
))
}
/// Wraps an [`LlmpEventManager`] and adds a dedicated [`LlmpSender`] over which the
/// serialized state is sent in `on_restart`.
#[derive(Debug)]
pub struct LlmpRestartingEventManager<I, OT, S, SP>
where
I: Input,
OT: ObserversTuple<I, S>,
SP: ShMemProvider + 'static,
{
/// The wrapped manager; event firing and processing are delegated to it.
llmp_mgr: LlmpEventManager<I, OT, S, SP>,
/// Sender used by `on_restart` to hand the serialized state to the next instance.
sender: LlmpSender<SP>,
}
impl<I, OT, S, SP> EventFirer<I, S> for LlmpRestartingEventManager<I, OT, S, SP>
where
I: Input,
OT: ObserversTuple<I, S>,
S: Serialize,
SP: ShMemProvider,
{
/// Delegates to the wrapped `LlmpEventManager::fire`.
fn fire(&mut self, state: &mut S, event: Event<I>) -> Result<(), Error> {
self.llmp_mgr.fire(state, event)
}
/// Returns the configuration string of the wrapped manager.
fn configuration(&self) -> &str {
self.llmp_mgr.configuration()
}
}
impl<I, OT, S, SP> EventRestarter<S> for LlmpRestartingEventManager<I, OT, S, SP>
where
    I: Input,
    OT: ObserversTuple<I, S>,
    S: Serialize,
    SP: ShMemProvider,
{
    /// Delegates to the wrapped manager's `await_restart_safe`.
    #[inline]
    fn await_restart_safe(&mut self) {
        self.llmp_mgr.await_restart_safe();
    }

    /// Resets the restart sender, then sends the serialized state and client description
    /// tagged `_LLMP_TAG_RESTART`.
    ///
    /// # Errors
    /// Propagates errors from serialization and from `send_buf`.
    fn on_restart(&mut self, state: &mut S) -> Result<(), Error> {
        // NOTE(review): `reset` is an unsafe fn; presumably it rewinds the sender's page so
        // the next instance reads from the start — confirm its contract in `LlmpSender`.
        unsafe {
            self.sender.reset();
        }
        let snapshot = serialize_state_mgr(state, &self.llmp_mgr)?;
        self.sender.send_buf(_LLMP_TAG_RESTART, &snapshot)
    }
}
impl<E, I, OT, S, SP, Z> EventProcessor<E, I, S, Z> for LlmpRestartingEventManager<I, OT, S, SP>
where
E: Executor<LlmpEventManager<I, OT, S, SP>, I, S, Z> + HasObservers<I, OT, S>,
I: Input,
Z: ExecutionProcessor<I, OT, S> + EvaluatorObservers<I, OT, S>,
OT: ObserversTuple<I, S> + serde::de::DeserializeOwned,
SP: ShMemProvider + 'static,
{
/// Delegates to the wrapped `LlmpEventManager::process` and returns its event count.
fn process(&mut self, fuzzer: &mut Z, state: &mut S, executor: &mut E) -> Result<usize, Error> {
self.llmp_mgr.process(fuzzer, state, executor)
}
}
// Empty impl: only declares that `LlmpRestartingEventManager` fulfils `EventManager`
// under these bounds; no methods are provided here.
impl<E, I, OT, S, SP, Z> EventManager<E, I, S, Z> for LlmpRestartingEventManager<I, OT, S, SP>
where
E: Executor<LlmpEventManager<I, OT, S, SP>, I, S, Z> + HasObservers<I, OT, S>,
I: Input,
S: Serialize,
Z: ExecutionProcessor<I, OT, S> + EvaluatorObservers<I, OT, S>,
OT: ObserversTuple<I, S> + serde::de::DeserializeOwned,
SP: ShMemProvider + 'static,
{
}
impl<I, OT, S, SP> HasEventManagerId for LlmpRestartingEventManager<I, OT, S, SP>
where
I: Input,
OT: ObserversTuple<I, S> + serde::de::DeserializeOwned,
S: Serialize,
SP: ShMemProvider + 'static,
{
/// Returns the id of the wrapped manager.
fn mgr_id(&self) -> EventManagerId {
self.llmp_mgr.mgr_id()
}
}
// Names of the environment variables used by `RestartingMgr::launch` to pass LLMP endpoints
// from the respawner to the fuzzing instance.
/// Holds the restart-channel sender; its absence marks the process as the respawner.
const _ENV_FUZZER_SENDER: &str = "_AFL_ENV_FUZZER_SENDER";
/// Holds the restart-channel receiver.
const _ENV_FUZZER_RECEIVER: &str = "_AFL_ENV_FUZZER_RECEIVER";
/// Holds the initial broker client, read back on the first run of a fuzzing instance.
const _ENV_FUZZER_BROKER_CLIENT_INITIAL: &str = "_AFL_ENV_FUZZER_BROKER_CLIENT";
impl<I, OT, S, SP> LlmpRestartingEventManager<I, OT, S, SP>
where
I: Input,
OT: ObserversTuple<I, S> + serde::de::DeserializeOwned,
SP: ShMemProvider + 'static,
{
/// Creates a restarting manager from an event manager and the restart-channel sender.
pub fn new(llmp_mgr: LlmpEventManager<I, OT, S, SP>, sender: LlmpSender<SP>) -> Self {
Self { llmp_mgr, sender }
}
/// Returns a shared reference to the restart-channel sender.
pub fn sender(&self) -> &LlmpSender<SP> {
&self.sender
}
/// Returns a mutable reference to the restart-channel sender.
pub fn sender_mut(&mut self) -> &mut LlmpSender<SP> {
&mut self.sender
}
}
/// Selects the role `RestartingMgr::launch` takes on.
#[derive(Debug, Clone, Copy)]
pub enum ManagerKind {
/// Let `LlmpConnection::on_port` decide: the process becomes broker or client depending on
/// the connection it returns.
Any,
/// Connect as a client on the broker port; if `cpu_core` is set, the process is pinned to
/// that core via `core_affinity::set_for_current`.
Client { cpu_core: Option<CoreId> },
/// Create a broker on the broker port and run its loop.
Broker,
}
/// Sets up a restarting event manager using [`StdShMemProvider`] and default builder
/// settings, then launches it.
///
/// This builds a [`RestartingMgr`] with the given `stats`, `broker_port` and
/// `configuration` (the manager kind stays at its default) and calls `launch` on it.
/// On Android the `AshmemService` is started first.
///
/// # Returns
/// The restored state, if any (`None` on a first run), and the restarting manager.
///
/// # Errors
/// Propagates errors from creating the shared memory provider and from `launch`.
///
/// # Panics
/// On Android, panics if the `AshmemService` cannot be started.
#[cfg(feature = "std")]
#[allow(clippy::type_complexity)]
pub fn setup_restarting_mgr_std<I, OT, S, ST>(
    stats: ST,
    broker_port: u16,
    configuration: String,
) -> Result<
    (
        Option<S>,
        LlmpRestartingEventManager<I, OT, S, StdShMemProvider>,
    ),
    Error,
>
where
    I: Input,
    S: DeserializeOwned,
    ST: Stats + Clone,
    OT: ObserversTuple<I, S> + serde::de::DeserializeOwned,
{
    #[cfg(target_os = "android")]
    AshmemService::start().expect("Error starting Ashmem Service");

    RestartingMgr::builder()
        .shmem_provider(StdShMemProvider::new()?)
        .stats(Some(stats))
        .broker_port(broker_port)
        .configuration(configuration)
        .build()
        .launch()
}
/// Builder-configured launcher that turns the current process into a broker, or into a
/// respawning client that yields an [`LlmpRestartingEventManager`] (see `launch`).
#[cfg(feature = "std")]
#[allow(clippy::default_trait_access)]
#[derive(TypedBuilder, Debug)]
pub struct RestartingMgr<I, OT, S, SP, ST>
where
I: Input,
OT: ObserversTuple<I, S> + serde::de::DeserializeOwned,
S: DeserializeOwned,
SP: ShMemProvider + 'static,
ST: Stats,
{
/// Shared memory provider; cloned for every LLMP endpoint created in `launch`.
shmem_provider: SP,
/// Configuration string handed to every `LlmpEventManager` that `launch` creates.
configuration: String,
/// Stats for the broker; `launch` takes it out of the option when it acts as a broker.
#[builder(default = None)]
stats: Option<ST>,
/// Port of the broker; defaults to 1337.
#[builder(default = 1337_u16)]
broker_port: u16,
/// Optional address of another broker to connect to via `connect_b2b`.
#[builder(default = None)]
remote_broker_addr: Option<SocketAddr>,
/// Role to take on; defaults to `ManagerKind::Any`.
#[builder(default = ManagerKind::Any)]
kind: ManagerKind,
/// Marker for the otherwise unused type parameters; not settable through the builder.
#[builder(setter(skip), default = PhantomData {})]
_phantom: PhantomData<(I, OT, S)>,
}
#[cfg(feature = "std")]
#[allow(clippy::type_complexity, clippy::too_many_lines)]
impl<I, OT, S, SP, ST> RestartingMgr<I, OT, S, SP, ST>
where
    I: Input,
    OT: ObserversTuple<I, S> + serde::de::DeserializeOwned,
    S: DeserializeOwned,
    SP: ShMemProvider,
    ST: Stats + Clone,
{
    /// Launches the restarting manager.
    ///
    /// If the `_ENV_FUZZER_SENDER` environment variable is not set, this process acts
    /// according to `self.kind`:
    /// * as a broker, it runs the broker loop and afterwards returns
    ///   `Err(Error::ShuttingDown)`;
    /// * as a client, it becomes the respawner: it creates a restart channel (sender and
    ///   receiver on the same map), stores all endpoints in the environment and then
    ///   repeatedly spawns the actual fuzzing instance (fork on Unix, re-executing itself
    ///   on Windows). On Unix the forked child leaves the loop and continues below.
    ///
    /// If the variable is set, the sender and receiver are restored from the environment.
    ///
    /// The fuzzing instance then checks the restart channel. Without a message it is the
    /// first run and the state is `None`; otherwise the state and manager are
    /// deserialized from the message left by the previous instance.
    ///
    /// # Errors
    /// Returns `Error::ShuttingDown` after acting as a broker, `Error::Unknown` if a broker
    /// is requested but no `stats` were configured, and propagates LLMP, shared memory,
    /// fork and deserialization errors.
    ///
    /// # Panics
    /// The respawner panics if a child exited without leaving any data on the restart
    /// channel (with a dedicated message for exit status 137 on Unix).
    pub fn launch(
        &mut self,
    ) -> Result<(Option<S>, LlmpRestartingEventManager<I, OT, S, SP>), Error> {
        let (sender, mut receiver, new_shmem_provider, core_id) = if std::env::var(
            _ENV_FUZZER_SENDER,
        )
        .is_err()
        {
            // Shared by both broker paths: optionally connect to a remote broker, then loop.
            let broker_things = |mut broker: LlmpEventBroker<I, SP, ST>, remote_broker_addr| {
                if let Some(remote_broker_addr) = remote_broker_addr {
                    println!("B2b: Connecting to {:?}", &remote_broker_addr);
                    broker.connect_b2b(remote_broker_addr)?;
                };
                broker.broker_loop()
            };

            let (mgr, core_id) = match self.kind {
                ManagerKind::Any => {
                    let connection =
                        LlmpConnection::on_port(self.shmem_provider.clone(), self.broker_port)?;
                    match connection {
                        LlmpConnection::IsBroker { broker } => {
                            // A broker cannot run without stats: report it instead of panicking.
                            let stats = self.stats.take().ok_or_else(|| {
                                Error::Unknown(
                                    "RestartingMgr: `stats` must be set to run as a broker"
                                        .to_string(),
                                )
                            })?;
                            let event_broker = LlmpEventBroker::<I, SP, ST>::new(broker, stats)?;
                            println!(
                                "Doing broker things. Run this tool again to start fuzzing in a client."
                            );
                            broker_things(event_broker, self.remote_broker_addr)?;
                            return Err(Error::ShuttingDown);
                        }
                        LlmpConnection::IsClient { client } => {
                            let mgr = LlmpEventManager::<I, OT, S, SP>::new(
                                client,
                                self.configuration.clone(),
                            )?;
                            (mgr, None)
                        }
                    }
                }
                ManagerKind::Broker => {
                    // A broker cannot run without stats: report it instead of panicking.
                    let stats = self.stats.take().ok_or_else(|| {
                        Error::Unknown(
                            "RestartingMgr: `stats` must be set to run as a broker".to_string(),
                        )
                    })?;
                    let event_broker = LlmpEventBroker::<I, SP, ST>::new_on_port(
                        self.shmem_provider.clone(),
                        stats,
                        self.broker_port,
                    )?;
                    broker_things(event_broker, self.remote_broker_addr)?;
                    return Err(Error::ShuttingDown);
                }
                ManagerKind::Client { cpu_core } => {
                    let mgr = LlmpEventManager::<I, OT, S, SP>::new_on_port(
                        self.shmem_provider.clone(),
                        self.broker_port,
                        self.configuration.clone(),
                    )?;
                    (mgr, cpu_core)
                }
            };

            if let Some(core_id) = core_id {
                println!("Setting core affinity to {:?}", core_id);
                core_affinity::set_for_current(core_id);
            }

            // From here on we are the respawner: publish the broker client for the children.
            mgr.to_env(_ENV_FUZZER_BROKER_CLIENT_INITIAL);

            // Restart channel: a sender and a receiver on the same map, used to hand the
            // state from one fuzzing instance to the next.
            let sender = { LlmpSender::new(self.shmem_provider.clone(), 0, false)? };
            let map = {
                self.shmem_provider
                    .clone_ref(&sender.out_maps.last().unwrap().shmem)?
            };
            let receiver = LlmpReceiver::on_existing_map(self.shmem_provider.clone(), map, None)?;
            sender.to_env(_ENV_FUZZER_SENDER)?;
            receiver.to_env(_ENV_FUZZER_RECEIVER)?;

            let mut ctr: u64 = 0;
            // Respawn loop; on Unix only the forked child ever breaks out of it.
            loop {
                println!("Spawning next client (id {})", ctr);

                #[cfg(unix)]
                let child_status = {
                    self.shmem_provider.pre_fork()?;
                    // NOTE(review): `fork` is unsafe; soundness depends on the process state at
                    // this point — confirm against `bolts::os::fork`.
                    match unsafe { fork() }? {
                        ForkResult::Parent(handle) => {
                            self.shmem_provider.post_fork(false)?;
                            handle.status()
                        }
                        ForkResult::Child => {
                            self.shmem_provider.post_fork(true)?;
                            break (sender, receiver, self.shmem_provider.clone(), core_id);
                        }
                    }
                };

                #[cfg(windows)]
                let child_status = startable_self()?.status()?;

                // The child is gone. If it left nothing on the restart page, its state was
                // not stored and spawning another instance is pointless.
                // NOTE(review): the volatile read assumes `page()` returns a valid pointer to
                // the mapped page — confirm in `LlmpReceiver`.
                if unsafe { read_volatile(addr_of!((*receiver.current_recv_map.page()).size_used)) }
                    == 0
                {
                    // Exit status 137 is reported as an out-of-memory crash.
                    #[cfg(unix)]
                    if child_status == 137 {
                        panic!("Fuzzer-respawner: The fuzzed target crashed with an out of memory error! Fix your harness, or switch to another executor (for example, a forkserver).");
                    }
                    panic!("Fuzzer-respawner: Storing state in crashed fuzzer instance did not work, no point to spawn the next client! (Child exited with: {})", child_status);
                }

                ctr = ctr.wrapping_add(1);
            }
        } else {
            // The env vars are already set: we were started by a respawner, so restore the
            // restart channel from the environment.
            (
                LlmpSender::on_existing_from_env(self.shmem_provider.clone(), _ENV_FUZZER_SENDER)?,
                LlmpReceiver::on_existing_from_env(
                    self.shmem_provider.clone(),
                    _ENV_FUZZER_RECEIVER,
                )?,
                self.shmem_provider.clone(),
                None,
            )
        };

        if let Some(core_id) = core_id {
            core_affinity::set_for_current(core_id);
        }

        println!("We're a client, let's fuzz :)");

        // A message on the restart channel means a previous instance stored its state.
        let (state, mut mgr) = match receiver.recv_buf()? {
            None => {
                println!("First run. Let's set it all up");
                let mgr = LlmpEventManager::<I, OT, S, SP>::existing_client_from_env(
                    new_shmem_provider,
                    _ENV_FUZZER_BROKER_CLIENT_INITIAL,
                    self.configuration.clone(),
                )?;
                (None, LlmpRestartingEventManager::new(mgr, sender))
            }
            Some((_sender, _tag, msg)) => {
                println!("Subsequent run. Let's load all data from shmem (received {} bytes from previous instance)", msg.len());
                let (state, mgr): (S, LlmpEventManager<I, OT, S, SP>) =
                    deserialize_state_mgr(new_shmem_provider, msg, self.configuration.clone())?;
                (Some(state), LlmpRestartingEventManager::new(mgr, sender))
            }
        };

        // NOTE(review): `reset` is an unsafe fn; presumably it rewinds the sender's page so the
        // next restart message starts at the beginning — confirm its contract in `LlmpSender`.
        unsafe {
            mgr.sender_mut().reset();
        }

        Ok((state, mgr))
    }
}