use alloc::collections::vec_deque::VecDeque;
use core::cell::RefCell;
use std::path::Path;
use serde::{Deserialize, Serialize};
use crate::{
corpus::{
ondisk::{OnDiskCorpus, OnDiskMetadataFormat},
Corpus, CorpusId, Testcase,
},
inputs::{Input, UsesInput},
Error,
};
/// A corpus that delegates storage to an [`OnDiskCorpus`] while limiting how many
/// testcase inputs are kept loaded in memory at once.
///
/// Inputs are loaded on access; once the number of tracked entries reaches
/// `cache_max_len`, the oldest tracked entry has its input dropped first.
///
/// Note: the constructors of this type reject a `cache_max_len` of `0`, but the derived
/// `Default` (and deserialization) can still produce a value with `cache_max_len == 0`.
#[cfg(feature = "std")]
#[derive(Default, Serialize, Deserialize, Clone, Debug)]
#[serde(bound = "I: serde::de::DeserializeOwned")]
pub struct CachedOnDiskCorpus<I>
where
I: Input,
{
// The wrapped corpus; every `Corpus` operation is forwarded to it.
inner: OnDiskCorpus<I>,
// Ids of the testcases whose input was loaded through this cache, oldest first.
// Kept in a `RefCell` because it is updated from `get(&self, ..)`.
cached_indexes: RefCell<VecDeque<CorpusId>>,
// Number of tracked entries at which eviction starts.
cache_max_len: usize,
}
// The input type handled by this corpus is simply its generic parameter `I`.
impl<I> UsesInput for CachedOnDiskCorpus<I>
where
I: Input,
{
type Input = I;
}
impl<I> Corpus for CachedOnDiskCorpus<I>
where
    I: Input,
{
    /// Returns the number of testcases, as reported by the wrapped corpus.
    #[inline]
    fn count(&self) -> usize {
        self.inner.count()
    }

    /// Adds a testcase to the wrapped corpus and returns its id.
    ///
    /// # Errors
    /// Propagates any error returned by the wrapped corpus.
    #[inline]
    fn add(&mut self, testcase: Testcase<I>) -> Result<CorpusId, Error> {
        self.inner.add(testcase)
    }

    /// Replaces the testcase stored under `idx` in the wrapped corpus and returns the old one.
    ///
    /// The cache bookkeeping is not touched by this call.
    ///
    /// # Errors
    /// Propagates any error returned by the wrapped corpus.
    #[inline]
    fn replace(&mut self, idx: CorpusId, testcase: Testcase<I>) -> Result<Testcase<I>, Error> {
        self.inner.replace(idx, testcase)
    }

    /// Removes the testcase stored under `idx` and forgets it in the cache bookkeeping.
    ///
    /// # Errors
    /// Propagates any error returned by the wrapped corpus; in that case the cache
    /// bookkeeping is left unchanged.
    #[inline]
    fn remove(&mut self, idx: CorpusId) -> Result<Testcase<I>, Error> {
        let testcase = self.inner.remove(idx)?;
        self.cached_indexes.borrow_mut().retain(|e| *e != idx);
        Ok(testcase)
    }

    /// Returns the testcase stored under `idx`, loading its input if it is not in memory.
    ///
    /// When an input has to be loaded, the oldest tracked entries have their input dropped
    /// until fewer than `cache_max_len` entries are tracked. Entries that are currently
    /// borrowed cannot be unloaded; they are re-queued and skipped.
    ///
    /// # Errors
    /// Propagates errors from looking up a testcase in the wrapped corpus and from loading
    /// the input.
    #[inline]
    fn get(&self, idx: CorpusId) -> Result<&RefCell<Testcase<I>>, Error> {
        let testcase = self.inner.get(idx)?;
        if testcase.borrow().input().is_none() {
            let _ = testcase.borrow_mut().load_input()?;
            let mut borrowed_num = 0;
            while self.cached_indexes.borrow().len() >= self.cache_max_len {
                // Bind the popped value first so the `RefMut` is released before the
                // bookkeeping is borrowed again below.
                let evicted = self.cached_indexes.borrow_mut().pop_front();
                let removed = match evicted {
                    Some(id) => id,
                    // Nothing left to evict. Only reachable when `cache_max_len` is 0
                    // (e.g. a `Default`-constructed or deserialized corpus); previously
                    // this case panicked on `unwrap`.
                    None => break,
                };
                if let Ok(mut borrowed) = self.inner.get(removed)?.try_borrow_mut() {
                    *borrowed.input_mut() = None;
                } else {
                    // Still in use elsewhere: keep tracking it and try the next entry.
                    self.cached_indexes.borrow_mut().push_back(removed);
                    borrowed_num += 1;
                    // `>=` rather than `==` so that a limit of 0 cannot spin forever on a
                    // borrowed entry; for limits >= 1 the two comparisons are equivalent.
                    if borrowed_num >= self.cache_max_len {
                        break;
                    }
                }
            }
            self.cached_indexes.borrow_mut().push_back(idx);
        }
        Ok(testcase)
    }

    /// Returns the current corpus id, as stored by the wrapped corpus.
    #[inline]
    fn current(&self) -> &Option<CorpusId> {
        self.inner.current()
    }

    /// Returns a mutable reference to the current corpus id of the wrapped corpus.
    #[inline]
    fn current_mut(&mut self) -> &mut Option<CorpusId> {
        self.inner.current_mut()
    }

    /// Returns the id following `idx`, as reported by the wrapped corpus.
    #[inline]
    fn next(&self, idx: CorpusId) -> Option<CorpusId> {
        self.inner.next(idx)
    }

    /// Returns the id preceding `idx`, as reported by the wrapped corpus.
    #[inline]
    fn prev(&self, idx: CorpusId) -> Option<CorpusId> {
        self.inner.prev(idx)
    }

    /// Returns the first id of the wrapped corpus, if any.
    #[inline]
    fn first(&self) -> Option<CorpusId> {
        self.inner.first()
    }

    /// Returns the last id of the wrapped corpus, if any.
    #[inline]
    fn last(&self) -> Option<CorpusId> {
        self.inner.last()
    }

    /// Returns the `nth` id of the wrapped corpus.
    #[inline]
    fn nth(&self, nth: usize) -> CorpusId {
        self.inner.nth(nth)
    }
}
impl<I> CachedOnDiskCorpus<I>
where
I: Input,
{
pub fn new<P>(dir_path: P, cache_max_len: usize) -> Result<Self, Error>
where
P: AsRef<Path>,
{
Self::_new(OnDiskCorpus::new(dir_path)?, cache_max_len)
}
pub fn no_meta<P>(dir_path: P, cache_max_len: usize) -> Result<Self, Error>
where
P: AsRef<Path>,
{
Self::_new(OnDiskCorpus::no_meta(dir_path)?, cache_max_len)
}
pub fn with_meta_format<P>(
dir_path: P,
cache_max_len: usize,
meta_format: OnDiskMetadataFormat,
) -> Result<Self, Error>
where
P: AsRef<Path>,
{
Self::_new(
OnDiskCorpus::with_meta_format(dir_path, meta_format)?,
cache_max_len,
)
}
fn _new(on_disk_corpus: OnDiskCorpus<I>, cache_max_len: usize) -> Result<Self, Error> {
if cache_max_len == 0 {
return Err(Error::illegal_argument(
"The max cache len in CachedOnDiskCorpus cannot be 0",
));
}
Ok(Self {
inner: on_disk_corpus,
cached_indexes: RefCell::new(VecDeque::new()),
cache_max_len,
})
}
}
/// Python bindings for [`CachedOnDiskCorpus`], specialised to `BytesInput`.
#[cfg(feature = "python")]
pub mod pybind {
    use alloc::string::String;
    use std::path::PathBuf;

    use pyo3::{exceptions::PyValueError, prelude::*};
    use serde::{Deserialize, Serialize};

    use crate::{
        corpus::{pybind::PythonCorpus, CachedOnDiskCorpus},
        inputs::BytesInput,
    };

    /// Python-visible wrapper around a `CachedOnDiskCorpus<BytesInput>`.
    #[pyclass(unsendable, name = "CachedOnDiskCorpus")]
    // NOTE(review): presumably needed because the `#[pyclass]`-generated code contains
    // `unsafe` while the type derives `Deserialize` — confirm.
    #[allow(clippy::unsafe_derive_deserialize)]
    #[derive(Serialize, Deserialize, Debug, Clone)]
    pub struct PythonCachedOnDiskCorpus {
        /// The wrapped Rust corpus.
        pub inner: CachedOnDiskCorpus<BytesInput>,
    }

    #[pymethods]
    impl PythonCachedOnDiskCorpus {
        /// Creates a corpus stored under `path` with a cache limit of `cache_max_len`.
        ///
        /// Raises `ValueError` if the corpus cannot be created (for example when
        /// `cache_max_len` is 0) instead of panicking across the Python boundary.
        #[new]
        fn new(path: String, cache_max_len: usize) -> PyResult<Self> {
            CachedOnDiskCorpus::new(PathBuf::from(path), cache_max_len)
                .map(|inner| Self { inner })
                .map_err(|err| PyValueError::new_err(format!("{:?}", err)))
        }

        /// Wraps this object into the generic Python corpus type.
        fn as_corpus(slf: Py<Self>) -> PythonCorpus {
            PythonCorpus::new_cached_on_disk(slf)
        }
    }

    /// Registers the Python classes of this module in `m`.
    pub fn register(_py: Python, m: &PyModule) -> PyResult<()> {
        m.add_class::<PythonCachedOnDiskCorpus>()?;
        Ok(())
    }
}