use alloc::{
borrow::{Cow, ToOwned},
vec::Vec,
};
use core::{marker::PhantomData, time::Duration};
use std::path::{Path, PathBuf};
use libafl_bolts::{
Named, current_time,
fs::find_new_files_rec,
shmem::{ShMem, ShMemProvider},
};
use serde::{Deserialize, Serialize};
use crate::{
Error, HasMetadata, HasNamedMetadata,
corpus::{Corpus, CorpusId, HasCurrentCorpusId},
events::{Event, EventConfig, EventFirer, EventWithStats, llmp::LlmpEventConverter},
executors::{Executor, ExitKind, HasObservers},
fuzzer::{Evaluator, EvaluatorObservers, ExecutionProcessor, HasObjective},
inputs::{Input, InputConverter},
stages::{Restartable, RetryCountRestartHelper, Stage},
state::{
HasCorpus, HasCurrentTestcase, HasExecutions, HasRand, HasSolutions,
MaybeHasClientPerfMonitor, Stoppable,
},
};
/// Base name of the disk sync stage.
///
/// `SyncFromDiskStage::with_from_file` uses it verbatim, while
/// `SyncFromDiskStage::new` uses it as the prefix of `sync:<name>`.
pub const SYNC_FROM_DISK_STAGE_NAME: &str = "sync";
/// Metadata stored in the state by the disk sync stage to remember when it last
/// ran and which files it still has to import.
// The `expect` documents that the `Deserialize` derive trips
// `clippy::unsafe_derive_deserialize` when `serdeany_autoreg` is disabled or
// when running under miri.
#[cfg_attr(
any(not(feature = "serdeany_autoreg"), miri),
expect(clippy::unsafe_derive_deserialize)
)] #[derive(Serialize, Deserialize, Debug)]
pub struct SyncFromDiskMetadata {
/// Timestamp (as returned by `current_time()`) taken at the start of the most recent sync.
pub last_time: Duration,
/// Paths collected by the most recent sync that have not been handed to the load callback yet.
pub left_to_sync: Vec<PathBuf>,
}
// Macro from `libafl_bolts`; presumably provides the `SerdeAny` plumbing that
// lets this type be stored in the state's metadata map — confirm in libafl_bolts.
libafl_bolts::impl_serdeany!(SyncFromDiskMetadata);
impl SyncFromDiskMetadata {
#[must_use]
pub fn new(last_time: Duration, left_to_sync: Vec<PathBuf>) -> Self {
Self {
last_time,
left_to_sync,
}
}
}
/// A stage that scans a set of directories for files and evaluates the inputs
/// loaded from them, at most once per `interval`.
#[derive(Debug)]
pub struct SyncFromDiskStage<CB, E, EM, I, S, Z> {
// Name reported through `Named` and used as the key for restart bookkeeping.
name: Cow<'static, str>,
// Directories that are scanned on every sync.
sync_dirs: Vec<PathBuf>,
// Callback turning a path into an input; invoked with the fuzzer, the state and the path.
load_callback: CB,
// Minimum time that has to pass between two syncs.
interval: Duration,
// Ties the otherwise unused generic parameters to the struct.
phantom: PhantomData<(E, EM, I, S, Z)>,
}
impl<CB, E, EM, I, S, Z> Named for SyncFromDiskStage<CB, E, EM, I, S, Z> {
    /// Returns the name this stage was given at construction time.
    fn name(&self) -> &Cow<'static, str> {
        let Self { name, .. } = self;
        name
    }
}
impl<CB, E, EM, I, S, Z> Stage<E, EM, S, Z> for SyncFromDiskStage<CB, E, EM, I, S, Z>
where
    CB: FnMut(&mut Z, &mut S, &Path) -> Result<I, Error>,
    Z: Evaluator<E, EM, I, S>,
    S: HasCorpus<I>
        + HasRand
        + HasMetadata
        + HasNamedMetadata
        + HasCurrentCorpusId
        + MaybeHasClientPerfMonitor,
{
    /// Imports inputs from the configured sync directories.
    ///
    /// The stage returns immediately when less than `interval` has elapsed since
    /// the `last_time` recorded in [`SyncFromDiskMetadata`]. Otherwise it asks
    /// `find_new_files_rec` for the files of every sync directory (passing the
    /// previous sync time, if any), records them in the metadata, and feeds each
    /// one through the load callback and `Evaluator::evaluate_input`.
    ///
    /// # Errors
    ///
    /// Propagates errors from `find_new_files_rec`, from `evaluate_input`, and
    /// from the load callback. The exception is `Error::InvalidInput`, which is
    /// logged and causes the offending file to be skipped.
    ///
    /// # Panics
    ///
    /// Panics if the [`SyncFromDiskMetadata`] inserted by this method is no longer
    /// present in the state while the files are being processed.
    #[inline]
    fn perform(
        &mut self,
        fuzzer: &mut Z,
        executor: &mut E,
        state: &mut S,
        manager: &mut EM,
    ) -> Result<(), Error> {
        // `None` means the stage has never synced with this state before.
        let last = state
            .metadata_map()
            .get::<SyncFromDiskMetadata>()
            .map(|m| m.last_time);

        // Rate limit: skip the whole stage until `interval` has passed.
        if last.is_some_and(|last| current_time().saturating_sub(last) < self.interval) {
            return Ok(());
        }

        // Taken before scanning, so the next sync uses the start of this one as its reference.
        let new_max_time = current_time();

        let mut new_files = vec![];
        for dir in &self.sync_dirs {
            log::debug!("Syncing from dir: {}", dir.display());
            let new_dir_files = find_new_files_rec(dir, &last)?;
            new_files.extend(new_dir_files);
        }

        // Both fields are overwritten right below, so a freshly inserted entry
        // only needs a cheap placeholder list instead of a clone of `new_files`.
        let sync_from_disk_metadata = state
            .metadata_or_insert_with(|| SyncFromDiskMetadata::new(new_max_time, Vec::new()));
        sync_from_disk_metadata.last_time = new_max_time;
        // NOTE(review): this replaces whatever was still pending in `left_to_sync`
        // from an earlier (possibly interrupted) run — confirm that is intended.
        sync_from_disk_metadata.left_to_sync = new_files;

        // Iterate over a snapshot: the list stored in the state shrinks while we go.
        let to_sync = sync_from_disk_metadata.left_to_sync.clone();
        log::debug!("Number of files to sync: {:?}", to_sync.len());
        for path in to_sync {
            // Drop the path from the persisted list *before* loading and evaluating
            // it, so the stored metadata never lists a file that is being processed.
            // The metadata was inserted above, hence the `unwrap`.
            state
                .metadata_mut::<SyncFromDiskMetadata>()
                .unwrap()
                .left_to_sync
                .retain(|p| p != &path);
            let input = match (self.load_callback)(fuzzer, state, &path) {
                Ok(input) => input,
                // A file that cannot be turned into an input is not fatal.
                Err(Error::InvalidInput(reason, _)) => {
                    log::warn!(
                        "Invalid input found in {} when syncing; reason {reason}; skipping;",
                        path.display()
                    );
                    continue;
                }
                Err(e) => return Err(e),
            };
            log::debug!("Syncing and evaluating {}", path.display());
            fuzzer.evaluate_input(state, executor, manager, &input)?;
        }
        Ok(())
    }
}
impl<CB, E, EM, I, S, Z> Restartable<S> for SyncFromDiskStage<CB, E, EM, I, S, Z>
where
    S: HasMetadata + HasNamedMetadata + HasCurrentCorpusId,
{
    /// Asks `RetryCountRestartHelper::no_retry`, keyed by this stage's name,
    /// whether the stage should run.
    #[inline]
    fn should_restart(&mut self, state: &mut S) -> Result<bool, Error> {
        let stage_name = &self.name;
        RetryCountRestartHelper::no_retry(state, stage_name)
    }

    /// Clears the restart bookkeeping that `RetryCountRestartHelper` keeps under
    /// this stage's name.
    #[inline]
    fn clear_progress(&mut self, state: &mut S) -> Result<(), Error> {
        let stage_name = &self.name;
        RetryCountRestartHelper::clear_progress(state, stage_name)
    }
}
impl<CB, E, EM, I, S, Z> SyncFromDiskStage<CB, E, EM, I, S, Z> {
    /// Creates a new [`SyncFromDiskStage`].
    ///
    /// * `sync_dirs` - directories scanned on every sync.
    /// * `load_callback` - turns a file path into an input.
    /// * `interval` - minimum time between two syncs.
    /// * `name` - suffix of the stage name; the full name is
    ///   [`SYNC_FROM_DISK_STAGE_NAME`], a colon, then `name`.
    #[must_use]
    pub fn new(sync_dirs: Vec<PathBuf>, load_callback: CB, interval: Duration, name: &str) -> Self {
        let mut stage_name = SYNC_FROM_DISK_STAGE_NAME.to_owned();
        stage_name.push(':');
        stage_name.push_str(name);

        Self {
            name: Cow::Owned(stage_name),
            sync_dirs,
            load_callback,
            interval,
            phantom: PhantomData,
        }
    }
}
/// Plain function pointer form of the load callback: takes the fuzzer, the state
/// and a file path, and returns the loaded input or an error.
pub type SyncFromDiskFunction<I, S, Z> = fn(&mut Z, &mut S, &Path) -> Result<I, Error>;
impl<E, EM, I, S, Z> SyncFromDiskStage<SyncFromDiskFunction<I, S, Z>, E, EM, I, S, Z>
where
I: Input,
S: HasCorpus<I>,
Z: Evaluator<E, EM, I, S>,
{
#[must_use]
pub fn with_from_file(sync_dirs: Vec<PathBuf>, interval: Duration) -> Self {
fn load_callback<I, S, Z>(_: &mut Z, _: &mut S, p: &Path) -> Result<I, Error>
where
I: Input,
S: HasCorpus<I>,
{
Input::from_file(p)
}
Self {
interval,
name: Cow::Borrowed(SYNC_FROM_DISK_STAGE_NAME),
sync_dirs,
load_callback: load_callback::<_, _, _>,
phantom: PhantomData,
}
}
}
/// Metadata stored in the state by the broker sync stage to remember how far
/// into the corpus it has already forwarded testcases.
// The `expect` documents that the `Deserialize` derive trips
// `clippy::unsafe_derive_deserialize` when `serdeany_autoreg` is disabled or
// when running under miri.
#[cfg_attr(
any(not(feature = "serdeany_autoreg"), miri),
expect(clippy::unsafe_derive_deserialize)
)] #[derive(Serialize, Deserialize, Debug)]
pub struct SyncFromBrokerMetadata {
/// Last corpus id at the time of the previous sync; `None` if the corpus had no last entry then.
pub last_id: Option<CorpusId>,
}
// Macro from `libafl_bolts`; presumably provides the `SerdeAny` plumbing that
// lets this type be stored in the state's metadata map — confirm in libafl_bolts.
libafl_bolts::impl_serdeany!(SyncFromBrokerMetadata);
impl SyncFromBrokerMetadata {
#[must_use]
pub fn new(last_id: Option<CorpusId>) -> Self {
Self { last_id }
}
}
/// A stage that forwards corpus entries through an `LlmpEventConverter` and
/// then lets that converter process what it has pending.
#[derive(Debug)]
pub struct SyncFromBrokerStage<I, IC, ICB, S, SHM, SP> {
// Converter used both to fire `NewTestcase` events and to process events.
client: LlmpEventConverter<I, IC, ICB, S, SHM, SP>,
}
impl<E, EM, I, IC, ICB, DI, S, SHM, SP, Z> Stage<E, EM, S, Z>
    for SyncFromBrokerStage<I, IC, ICB, S, SHM, SP>
where
    DI: Input,
    EM: EventFirer<I, S>,
    E: HasObservers + Executor<EM, I, S, Z>,
    for<'a> E::Observers: Deserialize<'a>,
    I: Input + Clone,
    IC: InputConverter<From = I, To = DI>,
    ICB: InputConverter<From = DI, To = I>,
    S: HasExecutions
        + HasRand
        + HasMetadata
        + HasSolutions<I>
        + HasCurrentTestcase<I>
        + Stoppable
        + MaybeHasClientPerfMonitor,
    SHM: ShMem,
    SP: ShMemProvider<ShMem = SHM>,
    Z: EvaluatorObservers<E, EM, I, S> + ExecutionProcessor<EM, I, E::Observers, S> + HasObjective,
{
    /// Forwards new corpus entries through the converter, then processes the
    /// converter's pending events.
    ///
    /// When the converter reports `can_convert()`, every corpus entry after the
    /// `last_id` stored in [`SyncFromBrokerMetadata`] (or every entry, if none
    /// is stored) is fired as an `Event::NewTestcase`, and the metadata is
    /// updated to the corpus' current last id. `client.process` is called in
    /// every case.
    ///
    /// # Errors
    ///
    /// Propagates errors from reading the corpus, from firing events and from
    /// `client.process`.
    #[inline]
    fn perform(
        &mut self,
        fuzzer: &mut Z,
        executor: &mut E,
        state: &mut S,
        manager: &mut EM,
    ) -> Result<(), Error> {
        if self.client.can_convert() {
            let last_id = state
                .metadata_map()
                .get::<SyncFromBrokerMetadata>()
                .and_then(|m| m.last_id);

            // Resume right after the last forwarded entry, or start at the first one.
            let mut cur_id =
                last_id.map_or_else(|| state.corpus().first(), |id| state.corpus().next(id));

            while let Some(id) = cur_id {
                let input = state.corpus().cloned_input_for_id(id)?;

                self.client.fire(
                    state,
                    EventWithStats::with_current_time(
                        Event::NewTestcase {
                            input,
                            observers_buf: None,
                            exit_kind: ExitKind::Ok,
                            // NOTE(review): always sent as 0, not the real corpus
                            // size — confirm receivers do not rely on it.
                            corpus_size: 0,
                            client_config: EventConfig::AlwaysUnique,
                            forward_id: None,
                            #[cfg(all(unix, feature = "multi_machine"))]
                            node_id: None,
                        },
                        *state.executions(),
                    ),
                )?;

                cur_id = state.corpus().next(id);
            }

            // Record how far we got. Get-or-insert covers both the first run and
            // later runs without needing an `unwrap` on the metadata lookup.
            let last = state.corpus().last();
            state
                .metadata_or_insert_with(|| SyncFromBrokerMetadata::new(last))
                .last_id = last;
        }

        self.client.process(fuzzer, state, executor, manager)?;
        Ok(())
    }
}
// This stage keeps no restart bookkeeping of its own: it always agrees to run
// and has nothing to clear afterwards.
impl<I, IC, ICB, S, SHM, SP> Restartable<S> for SyncFromBrokerStage<I, IC, ICB, S, SHM, SP> {
/// Always returns `Ok(true)`; the state is not consulted.
#[inline]
fn should_restart(&mut self, _state: &mut S) -> Result<bool, Error> {
Ok(true)
}
/// Does nothing and returns `Ok(())`; the state is not touched.
#[inline]
fn clear_progress(&mut self, _state: &mut S) -> Result<(), Error> {
Ok(())
}
}
impl<I, IC, ICB, S, SHM, SP> SyncFromBrokerStage<I, IC, ICB, S, SHM, SP> {
#[must_use]
pub fn new(client: LlmpEventConverter<I, IC, ICB, S, SHM, SP>) -> Self {
Self { client }
}
}