#![cfg_attr(not(feature = "std"), no_std)]
#![forbid(unsafe_code)]
#![doc = include_str!("../README.md")]
#![cfg_attr(not(test), warn(clippy::pedantic))]
#![allow(clippy::must_use_candidate, clippy::module_name_repetitions)]
extern crate alloc;
use core::fmt::{self, Debug, Display};
use core::mem::replace;
use core::ops::Deref;
use core::sync::atomic::{AtomicBool, Ordering};
use alloc::boxed::Box;
use alloc::sync::Arc;
use alloc::vec::Vec;
use arc_swap::{ArcSwap, ArcSwapOption, DefaultStrategy, Guard};
mod options;
pub mod spawner;
pub use options::Options;
pub use spawner::{DefaultSpawner, Spawner};
/// An intent log over a [`Reproducible`] state `T`.
///
/// Intents are buffered in `pending` and, once `options.chunk_size` of
/// them accumulate, sealed into an immutable chunk chained onto `base`
/// (see `append`). Past states are reproduced by replaying intents on
/// top of the chain (see `eval`).
pub struct Retrace<T: Reproducible, R = DefaultSpawner> {
    // Tuning knobs: chunk size plus optional soft/hard length limits.
    options: Options,
    // Shared handle used to run background work (tail caching, deferred
    // drops of old chains).
    spawner: Arc<R>,
    // Intents not yet sealed into a chunk; kept at `chunk_size` capacity.
    pending: Vec<T::Intent>,
    // Head of the chunk chain. Always `Some` between method calls; it is
    // only `take`n transiently inside `append` / `pop` /
    // `rollback_chunk_inner` and put back before they return.
    base: Option<Arc<Base<T, R>>>,
    // Guard so only one `housekeep` pass runs at a time.
    housekeeping: AtomicBool,
}
impl<T: Reproducible> Retrace<T, DefaultSpawner> {
    /// Creates a `Retrace` rooted at `state`, using the default spawner
    /// for background tasks.
    pub fn new(state: T, options: Options) -> Self {
        Self::with_spawner(state, options, DefaultSpawner::default())
    }
}
impl<T: Reproducible, R: Spawner> Retrace<T, R> {
    /// Creates a `Retrace` rooted at `state`, running background work
    /// (tail caching, deferred drops) on the supplied `spawner`.
    pub fn with_spawner(state: T, options: Options, spawner: R) -> Self {
        // Reserve a full chunk's worth of capacity so `append` does not
        // reallocate while filling one chunk.
        let pending = Vec::with_capacity(options.chunk_size.get());
        Retrace {
            options,
            spawner: Arc::new(spawner),
            pending,
            base: Some(Arc::new(Base::Root(Arc::new(state)))),
            housekeeping: AtomicBool::new(false),
        }
    }

    /// Appends a single intent to the log.
    ///
    /// Once `pending` reaches `chunk_size`, the buffered intents are
    /// sealed into an immutable [`Chunk`] layered on the current base,
    /// and a housekeeping pass runs.
    pub fn append(&mut self, intent: T::Intent) {
        self.pending.push(intent);
        let chunk_size = self.options.chunk_size.get();
        if self.pending.len() == chunk_size {
            // Seal the buffer, leaving a fresh one with full capacity.
            let intents = replace(&mut self.pending, Vec::with_capacity(chunk_size));
            let intents = intents.into_boxed_slice();
            let base = self
                .base
                .take()
                .expect("base is always put back after this");
            // Query before `base` is moved into the new chunk.
            let base_is_cached = base.is_cached();
            let chunk = Chunk::new(intents, base, Arc::clone(&self.spawner));
            if base_is_cached {
                // The base tail is already available (root, error, or a
                // cached chunk tail), so this chunk's tail can be
                // computed without first evaluating the chain below —
                // start caching it right away.
                chunk.spawn_cache_task_if_not_spawned();
            }
            self.base = Some(Arc::new(Base::Chunk(chunk)));
            self.housekeep();
        }
    }

    /// Removes and returns the most recently appended intent, or `None`
    /// when the log is empty.
    pub fn pop(&mut self) -> Option<T::Intent> {
        // While the buffer is empty, unseal chunks (newest first) back
        // into `pending` until there is something to pop or only a root
        // (or error) base remains.
        while self.pending.is_empty() {
            if !matches!(self.base.as_deref(), Some(Base::Chunk(_))) {
                return None;
            }
            let base = self.base.take();
            let chunk = if let Some(Base::Chunk(chunk)) = base.as_deref() {
                chunk
            } else {
                unreachable!()
            };
            // Copy the chunk's intents back into the mutable buffer and
            // rebase onto whatever the chunk was layered on.
            self.pending = chunk.intents.to_vec();
            self.base = Some(Arc::clone(&*chunk.base.load()));
        }
        self.pending.pop()
    }

    /// Returns the number of intents currently recorded (pending buffer
    /// plus every chunk in the chain). Runs in O(number of chunks).
    pub fn len(&self) -> usize {
        let mut current = RefOrGuard::Ref(self.base.as_ref().expect("base is set here"));
        let mut len = 0;
        while let Base::Chunk(chunk) = &*current {
            len += chunk.len();
            current = RefOrGuard::Guard(chunk.base.load());
        }
        self.pending.len() + len
    }

    /// Returns `true` when no intents are recorded.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Reproduces the state as it was `steps` intents before the tip
    /// (`steps == 0` is the tip itself).
    ///
    /// # Errors
    ///
    /// [`EvalError::OutOfRange`] if `steps` reaches past the recorded
    /// history; [`EvalError::Internal`] if replaying an intent fails.
    pub fn eval(&self, steps: usize) -> Result<T, EvalError<T::Error>> {
        // Fast path: the target state lies within the pending buffer —
        // evaluate the base tail and replay all but the last `steps`
        // pending intents.
        if steps <= self.pending.len() {
            let base = self.base.as_ref().expect("base is set here");
            let mut state = base.eval().map_err(EvalError::Internal)?;
            state
                .apply_all(&self.pending[0..(self.pending.len() - steps)])
                .map_err(EvalError::Internal)?;
            return Ok(state);
        }
        // Otherwise walk down the chunk chain, consuming `steps`.
        let mut current = RefOrGuard::Ref(self.base.as_ref().expect("base is set here"));
        let mut steps = steps - self.pending.len();
        while let Base::Chunk(chunk) = &*current {
            if steps == 0 {
                // Exactly this chunk's tail; may be served from cache.
                return chunk.eval_tail().map_err(EvalError::Internal);
            } else if steps <= chunk.len() {
                // Target lies inside this chunk.
                return chunk.eval(steps).map_err(EvalError::Internal);
            }
            steps -= chunk.len();
            current = RefOrGuard::Guard(chunk.base.load());
        }
        // Walked past the root with steps still remaining.
        Err(EvalError::OutOfRange)
    }

    /// Reproduces the current tip state; equivalent to `eval(0)`.
    pub fn tip(&self) -> Result<T, EvalError<T::Error>> {
        self.eval(0)
    }

    /// Rolls back to the newest reproducible state at single-intent
    /// granularity.
    ///
    /// First discards whole layers until a known-good base is found
    /// (like [`Self::rollback_chunk`]), then re-applies the newest
    /// discarded batch one intent at a time, keeping each intent that
    /// still applies cleanly and dropping everything from the first
    /// failure onward.
    ///
    /// # Errors
    ///
    /// [`RollbackFailure`] if no valid state exists anywhere in the log.
    pub fn rollback(&mut self) -> Result<(), RollbackFailure> {
        // Nothing to do if the tip already evaluates cleanly.
        if self.tip().is_ok() {
            return Ok(());
        }
        let intents = self.rollback_chunk_inner()?;
        let mut state = self.tip().unwrap_or_else(|_| {
            unreachable!("rollback_chunk_inner should leave a valid tip or return Err")
        });
        for intent in intents {
            // Try each intent on a scratch copy so a failing intent
            // leaves `state` (and therefore the log) untouched.
            let mut try_state = state.clone();
            match try_state.apply(&intent) {
                Ok(()) => {
                    self.pending.push(intent);
                    state = try_state;
                }
                Err(_) => {
                    break;
                }
            }
        }
        debug_assert!(self.tip().is_ok());
        Ok(())
    }

    /// Rolls back to the newest reproducible state at chunk granularity:
    /// everything newer than the last chunk with a cached `Ok` tail (or
    /// the root) is discarded.
    ///
    /// # Errors
    ///
    /// [`RollbackFailure`] if no valid state exists anywhere in the log.
    pub fn rollback_chunk(&mut self) -> Result<(), RollbackFailure> {
        if self.tip().is_ok() {
            return Ok(());
        }
        self.rollback_chunk_inner().map(|_| ())
    }

    /// Shared implementation of both rollback entry points.
    ///
    /// Walks from the newest layer down, discarding the pending buffer
    /// and then each chunk in turn, until it reaches a chunk whose
    /// cached tail is `Ok` (which becomes the new head) or the root.
    /// Returns the intents of the newest discarded layer so `rollback`
    /// can replay them individually.
    fn rollback_chunk_inner(&mut self) -> Result<Vec<T::Intent>, RollbackFailure> {
        let mut current = RefOrGuard::Ref(self.base.as_ref().expect("base is set here"));
        // Discard the pending buffer first; keep it as the initial
        // candidate batch to hand back to the caller.
        let mut previous = replace(
            &mut self.pending,
            Vec::with_capacity(self.options.chunk_size.get()),
        );
        while let Base::Chunk(chunk) = &*current {
            let state = chunk.cached_tail.load();
            if let Some(state) = &*state {
                if state.is_ok() {
                    // Found a chunk with a known-good cached tail: make
                    // it the head of the chain.
                    match current {
                        RefOrGuard::Ref(_) => {
                            // Already the head; `self.base` is unchanged.
                        }
                        RefOrGuard::Guard(base) => {
                            self.base = Some(Arc::clone(&*base));
                        }
                    }
                    return Ok(previous);
                }
            }
            // Not known-good: discard this chunk, remembering its
            // intents as the batch to hand back.
            previous = chunk.intents.to_vec();
            current = RefOrGuard::Guard(chunk.base.load());
        }
        // Walked past every chunk: fall back to the root state.
        // NOTE(review): this returns an empty batch rather than
        // `previous`, so `rollback` cannot re-apply the good prefix of
        // the oldest chunk — possibly intentional coarsening; confirm.
        if let Base::Root(state) = &*current {
            self.base = Some(Arc::new(Base::Root(Arc::new(T::clone(&*state)))));
            return Ok(Vec::new());
        }
        // Only `Base::Error` remains: nothing in the log evaluates.
        Err(RollbackFailure { _private: () })
    }

    /// Runs one housekeeping pass over the chunk chain.
    ///
    /// Beyond the effective soft limit (`min(soft, hard)`), chunks are
    /// opportunistically merged away ("unchained") using already-cached
    /// tails; at or beyond the hard limit the merge is forced
    /// synchronously. Cache tasks are spawned along the walk so later
    /// passes can merge without blocking. The `housekeeping` flag keeps
    /// concurrent passes from overlapping.
    pub fn housekeep(&self) {
        // Claim the flag; bail if another pass is already in flight.
        if self
            .housekeeping
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            return;
        }
        // Effective soft limit: the smaller of the two limits, or the
        // hard limit alone when no soft limit is configured.
        let soft_limit = match (self.options.soft_limit, self.options.hard_limit) {
            (None, hard) => hard,
            (Some(soft), None) => Some(soft),
            (Some(soft), Some(hard)) => Some(soft.min(hard)),
        };
        let soft_limit = if let Some(soft_limit) = soft_limit {
            soft_limit
        } else {
            // No limits configured: nothing to housekeep.
            // NOTE(review): this early return leaves `housekeeping` set
            // to `true`, permanently short-circuiting future passes.
            // Harmless while `Options` cannot change after construction
            // (a pass would be a no-op anyway) — confirm if options
            // ever become mutable.
            return;
        };
        let hard_limit = self.options.hard_limit;
        let mut current = RefOrGuard::Ref(self.base.as_ref().expect("base is set here"));
        // Cumulative intent count from the tip down to (and including)
        // the chunk currently being inspected.
        let mut len = self.pending.len();
        while let Base::Chunk(chunk) = &*current {
            len += chunk.len();
            if hard_limit.map_or(false, |limit| len >= limit) {
                // Hard limit hit: collapse the layer below this chunk
                // synchronously (evaluates on this thread) and stop.
                chunk.force_unchain();
                break;
            } else if len >= soft_limit {
                // Soft limit hit: collapse the layer below only if its
                // tail is already cached; stop after one merge.
                if chunk.unchain() {
                    break;
                }
                // Not ready — cache this chunk's tail so the chunk
                // above can merge it away in a later pass.
                chunk.spawn_cache_task_if_not_spawned();
            } else if chunk.base.load().is_cached() {
                // Below the limits: propagate tail caching up the chain
                // whenever the layer below is already cached.
                chunk.spawn_cache_task_if_not_spawned();
            }
            current = RefOrGuard::Guard(chunk.base.load());
        }
        self.housekeeping.store(false, Ordering::Release);
    }

    /// Drops every cached tail along the chain (benchmark helper, so
    /// measurements can start from a cold cache).
    #[cfg(feature = "bench")]
    pub fn decache(&mut self) {
        let mut current = RefOrGuard::Ref(self.base.as_ref().expect("base is set here"));
        while let Base::Chunk(chunk) = &*current {
            chunk.cached_tail.store(None);
            current = RefOrGuard::Guard(chunk.base.load());
        }
    }
}
/// Error returned by [`Retrace::eval`] and related queries.
#[derive(Debug)]
#[non_exhaustive]
pub enum EvalError<E> {
    /// The requested step count reaches past the recorded history.
    OutOfRange,
    /// Replaying intents failed with the underlying error `E`.
    Internal(E),
}
impl<E: Display> Display for EvalError<E> {
    /// Renders a human-readable description, delegating to the inner
    /// error for the `Internal` variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Internal(e) => write!(f, "internal error: {}", e),
            Self::OutOfRange => f.write_str("the requested step is out of range"),
        }
    }
}
/// Error returned by the rollback operations when no valid state could
/// be recovered anywhere in the log.
#[derive(Debug)]
pub struct RollbackFailure {
    // Prevents construction outside this crate while keeping the type
    // itself public.
    _private: (),
}
impl Display for RollbackFailure {
    /// Renders the (fixed) human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("no good state could be found in the log")
    }
}
/// A state whose history can be reproduced by replaying intents.
pub trait Reproducible: 'static + Clone + Send + Sync {
    /// A single replayable operation on the state.
    type Intent: Clone + Send + Sync;
    /// Error produced when applying an intent fails.
    type Error: Clone + Send + Sync;
    /// Applies one intent to the state in place.
    fn apply(&mut self, intent: &Self::Intent) -> Result<(), Self::Error>;
    /// Applies each intent in order, stopping at the first error.
    fn apply_all<'a, I>(&mut self, it: I) -> Result<(), Self::Error>
    where
        I: IntoIterator<Item = &'a Self::Intent>,
    {
        it.into_iter().try_for_each(|intent| self.apply(intent))
    }
}
/// Either a plain borrow of a [`Base`] or an `arc_swap` load guard
/// holding one; lets the chain-walking loops treat the head (owned by
/// `Retrace`) and inner links (loaded from `ArcSwap`) uniformly via
/// `Deref`.
enum RefOrGuard<'a, T: Reproducible, R> {
    /// Direct borrow (the head of the chain).
    Ref(&'a Base<T, R>),
    /// Guard keeping a chunk's loaded base alive while inspected.
    Guard(Guard<Arc<Base<T, R>>, DefaultStrategy>),
}
impl<'a, T: Reproducible, R> Deref for RefOrGuard<'a, T, R> {
    type Target = Base<T, R>;
    /// Borrows the underlying [`Base`] regardless of which variant
    /// holds it (deref coercion unwraps the guard and the `Arc`).
    fn deref(&self) -> &Self::Target {
        match self {
            Self::Ref(base) => base,
            Self::Guard(guard) => guard,
        }
    }
}
/// An immutable, sealed batch of intents layered on top of a base.
struct Chunk<T: Reproducible, R> {
    // Runs tail caching and deferred drops off-thread.
    spawner: Arc<R>,
    // Lazily computed result of evaluating this chunk's tail (base tail
    // with all intents applied); `None` until cached.
    cached_tail: Arc<ArcSwapOption<Result<Arc<T>, T::Error>>>,
    // Ensures the background cache task is spawned at most once.
    cache_task_spawned: AtomicBool,
    // The sealed intents; shared so the cache task can replay them.
    intents: Arc<[T::Intent]>,
    // What this chunk applies its intents on top of; swapped in place
    // when the chain below is collapsed ("unchained").
    base: ArcSwap<Base<T, R>>,
}
impl<T: Reproducible, R: Spawner> Chunk<T, R> {
    /// Creates a chunk of sealed `intents` layered on `base`, with an
    /// empty tail cache.
    fn new(intents: Box<[T::Intent]>, base: Arc<Base<T, R>>, spawner: Arc<R>) -> Self {
        Chunk {
            spawner,
            cached_tail: Arc::new(ArcSwapOption::from_pointee(None)),
            cache_task_spawned: AtomicBool::new(false),
            intents: Arc::from(intents),
            base: ArcSwap::new(base),
        }
    }

    /// Spawns a background task that computes and caches this chunk's
    /// tail state. At most one task is ever spawned per chunk.
    fn spawn_cache_task_if_not_spawned(&self) {
        // The first caller flips the flag; everyone else backs off.
        if self
            .cache_task_spawned
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Relaxed)
            .is_err()
        {
            return;
        }
        // Capture everything the task needs by value so the closure can
        // outlive `&self`.
        let base = self.base.load();
        let cached_tail = Arc::clone(&self.cached_tail);
        let intents = Arc::clone(&self.intents);
        self.spawner.spawn(move || {
            // Tail = base tail with all of this chunk's intents applied.
            let result = base.eval().and_then(move |mut state| {
                state.apply_all(&*intents)?;
                Ok(state)
            });
            cached_tail.store(Some(Arc::new(result.map(Arc::new))));
        });
    }

    /// Attempts to collapse the layer below this chunk: if the base is
    /// a chunk whose tail result is already cached, replace it with a
    /// plain `Root` (or `Error`) carrying that result.
    ///
    /// Returns `true` when the base is no longer a chunk afterwards
    /// (it already was flat, or the merge succeeded) and `false` when
    /// the base chunk's tail is not cached yet.
    fn unchain(&self) -> bool {
        let base = self.base.load();
        let chunk = match &**base {
            Base::Chunk(chunk) => chunk,
            // Already flat: nothing left to merge below this chunk.
            Base::Root(_) | Base::Error(_) => return true,
        };
        // Claim the cached result, leaving `None` behind.
        let result = if let Some(result) = chunk.cached_tail.swap(None) {
            result
        } else {
            return false;
        };
        let base = match &*result {
            Ok(t) => Base::Root(Arc::clone(t)),
            Err(e) => Base::Error(T::Error::clone(e)),
        };
        let old = self.base.swap(Arc::new(base));
        // Free the replaced (possibly deep) old chain off the caller's
        // thread.
        self.spawner.spawn(move || drop(old));
        true
    }

    /// Like [`Self::unchain`], but evaluates the base chunk's tail on
    /// the current thread instead of requiring a cached result.
    fn force_unchain(&self) {
        let base = self.base.load();
        let chunk = match &**base {
            Base::Chunk(chunk) => chunk,
            Base::Root(_) | Base::Error(_) => return,
        };
        // Synchronous evaluation — may replay many intents.
        let base = match chunk.eval_tail() {
            Ok(t) => Base::Root(Arc::new(t)),
            Err(e) => Base::Error(e),
        };
        let old = self.base.swap(Arc::new(base));
        self.spawner.spawn(move || drop(old));
    }

    /// Number of intents sealed in this chunk.
    fn len(&self) -> usize {
        self.intents.len()
    }

    /// Evaluates this chunk's tail state, serving from — and filling —
    /// the `cached_tail` cache.
    fn eval_tail(&self) -> Result<T, T::Error> {
        let cached = self.cached_tail.load();
        if let Some(result) = &*cached {
            // Cache hit: clone out the stored state or error.
            return match &**result {
                Ok(t) => Ok(T::clone(t)),
                Err(e) => Err(T::Error::clone(e)),
            };
        }
        // Cache miss: compute the tail (zero steps back from the end).
        let result = self.eval(0);
        // Install the result only if no other thread stored one since
        // our load above.
        self.cached_tail.compare_and_swap(
            cached,
            Some(Arc::new(match &result {
                Ok(t) => Ok(Arc::new(T::clone(t))),
                Err(e) => Err(T::Error::clone(e)),
            })),
        );
        result
    }

    /// Evaluates the state `steps` intents before this chunk's tail by
    /// replaying the first `len - steps` intents on the base tail.
    fn eval(&self, steps: usize) -> Result<T, T::Error> {
        let base = self.base.load();
        let mut state = base.eval()?;
        let intents = self.intents.as_ref();
        assert!(steps <= intents.len());
        state.apply_all(&intents[0..(intents.len() - steps)])?;
        Ok(state)
    }
}
/// One link in the evaluation chain beneath the pending buffer.
enum Base<T: Reproducible, R> {
    /// A fully materialized state.
    Root(Arc<T>),
    /// A further batch of intents; the chain continues below it.
    Chunk(Chunk<T, R>),
    /// Evaluation failed earlier with this error; replays propagate it.
    Error(T::Error),
}
impl<T: Reproducible, R: Spawner> Base<T, R> {
fn is_cached(&self) -> bool {
match self {
Base::Root(_) | Base::Error(_) => true,
Base::Chunk(chunk) => chunk.cached_tail.load().is_some(),
}
}
fn eval(&self) -> Result<T, T::Error> {
match self {
Base::Root(t) => Ok(T::clone(t)),
Base::Error(e) => Err(T::Error::clone(e)),
Base::Chunk(chunk) => chunk.eval_tail(),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use core::num::NonZeroUsize;

    /// Minimal infallible state: an accumulator over `i32` intents.
    #[derive(Clone, PartialEq, Eq, Debug)]
    struct Foo(i32);
    impl Reproducible for Foo {
        type Intent = i32;
        type Error = ();
        fn apply(&mut self, intent: &Self::Intent) -> Result<(), Self::Error> {
            self.0 += *intent;
            Ok(())
        }
    }

    // Compile-time check that the core types stay Send + Sync.
    #[test]
    fn is_send_sync() {
        fn test<T: Send + Sync>() {}
        test::<Foo>();
        test::<Retrace<Foo, DefaultSpawner>>();
        test::<Chunk<Foo, DefaultSpawner>>();
        test::<Base<Foo, DefaultSpawner>>();
    }

    // Append/pop round-trips and backward evaluation across chunks.
    #[test]
    fn basic_functions() {
        const MAX: i32 = 100;
        let mut retrace = Retrace::new(
            Foo(0),
            Options::new().chunk_size(NonZeroUsize::new(4).unwrap()),
        );
        // 1..=MAX real intents, then MAX dummies we pop straight back.
        for i in 1..=MAX {
            retrace.append(i);
        }
        for _ in 0..MAX {
            retrace.append(42);
        }
        for _ in 0..MAX {
            assert_eq!(Some(42), retrace.pop());
        }
        // eval(k) reproduces the state k intents before the tip: after
        // i appends the accumulator holds the triangular number.
        for i in (1..=MAX).rev() {
            assert_eq!(
                Foo((1 + i) * i / 2),
                retrace.eval((MAX - i) as usize).unwrap()
            );
        }
        // One step past the recorded history is out of range.
        assert!(matches!(
            retrace.eval((MAX + 1) as usize).unwrap_err(),
            EvalError::OutOfRange
        ));
        // Pop everything back down to the root.
        for i in (1..=MAX).rev() {
            assert_eq!(Some(i), retrace.pop());
        }
        assert_eq!(None, retrace.pop());
    }

    // Same evaluation checks, but querying the deepest states first.
    #[test]
    fn backward_query() {
        const MAX: i32 = 100;
        let mut retrace = Retrace::new(
            Foo(0),
            Options::new().chunk_size(NonZeroUsize::new(4).unwrap()),
        );
        for i in 1..=MAX {
            retrace.append(i);
        }
        for i in 1..=MAX {
            assert_eq!(
                Foo((1 + i) * i / 2),
                retrace.eval((MAX - i) as usize).unwrap()
            );
        }
    }

    // With only a soft limit and a spawner that discards its tasks,
    // tails are never cached, so soft-limit merges can never complete
    // and the full history is retained.
    #[test]
    fn no_force_rebase_for_soft_limit() {
        let mut retrace = Retrace::with_spawner(
            Foo(0),
            Options::new()
                .chunk_size(NonZeroUsize::new(4).unwrap())
                .soft_limit(Some(20)),
            spawner::DropSpawner::default(),
        );
        for _ in 0..100 {
            retrace.append(42);
        }
        assert_eq!(100, retrace.len());
    }

    // The hard limit forces synchronous merges even though background
    // tasks never run: length stays within one chunk of the limit.
    #[test]
    fn force_rebase_for_hard_limit() {
        let mut retrace = Retrace::with_spawner(
            Foo(0),
            Options::new()
                .chunk_size(NonZeroUsize::new(4).unwrap())
                .hard_limit(Some(20)),
            spawner::DropSpawner::default(),
        );
        for _ in 0..20 {
            retrace.append(42);
        }
        for _ in 0..100 {
            retrace.append(42);
            assert!((20..=24).contains(&retrace.len()));
        }
    }

    // A spawner that runs tasks inline makes tails available right
    // away, so soft-limit merges happen during housekeeping.
    #[test]
    fn deferred_rebase_for_soft_limit() {
        let mut retrace = Retrace::with_spawner(
            Foo(0),
            Options::new()
                .chunk_size(NonZeroUsize::new(4).unwrap())
                .soft_limit(Some(20)),
            spawner::ImmediateSpawner::default(),
        );
        for _ in 0..100 {
            retrace.append(42);
        }
        assert!((20..=24).contains(&retrace.len()));
    }

    /// Fallible state: `None` intents always fail to apply.
    #[derive(Clone, PartialEq, Eq, Debug)]
    struct Bar(i32);
    impl Reproducible for Bar {
        type Intent = Option<i32>;
        type Error = ();
        fn apply(&mut self, intent: &Self::Intent) -> Result<(), Self::Error> {
            match intent {
                Some(intent) => {
                    self.0 += *intent;
                    Ok(())
                }
                None => Err(()),
            }
        }
    }

    // Chunk-granular rollback lands on the newest chunk with a good
    // cached tail: 5 full chunks of Some(1) = Bar(20).
    #[test]
    fn can_rollback_chunk() {
        let mut retrace = Retrace::new(
            Bar(0),
            Options::new().chunk_size(NonZeroUsize::new(4).unwrap()),
        );
        for _ in 0..22 {
            retrace.append(Some(1));
        }
        retrace.append(None);
        for _ in 0..22 {
            retrace.append(Some(1));
        }
        assert!(retrace.tip().is_err());
        retrace.rollback_chunk().unwrap();
        assert_eq!(Bar(20), retrace.tip().unwrap());
    }

    // Intent-granular rollback additionally re-applies the good
    // intents after the chunk boundary, keeping all 22 increments.
    #[test]
    fn can_rollback_precise() {
        let mut retrace = Retrace::new(
            Bar(0),
            Options::new().chunk_size(NonZeroUsize::new(4).unwrap()),
        );
        for _ in 0..22 {
            retrace.append(Some(1));
        }
        retrace.append(None);
        for _ in 0..22 {
            retrace.append(Some(1));
        }
        assert!(retrace.tip().is_err());
        retrace.rollback().unwrap();
        assert_eq!(Bar(22), retrace.tip().unwrap());
    }

    // When the only intent is a failing one, both rollback flavors
    // fall all the way back to the root state.
    #[test]
    fn can_rollback_to_root() {
        let mut retrace = Retrace::new(
            Bar(0),
            Options::new().chunk_size(NonZeroUsize::new(4).unwrap()),
        );
        retrace.append(None);
        assert!(retrace.tip().is_err());
        retrace.rollback().unwrap();
        assert_eq!(Bar(0), retrace.tip().unwrap());
        let mut retrace = Retrace::new(
            Bar(0),
            Options::new().chunk_size(NonZeroUsize::new(4).unwrap()),
        );
        retrace.append(None);
        assert!(retrace.tip().is_err());
        retrace.rollback_chunk().unwrap();
        assert_eq!(Bar(0), retrace.tip().unwrap());
    }
}